1、ToBeAppliedRequestProcessor简介
ToBeAppliedRequestProcessor是Zookeeper中的一个请求处理器,它实现了ZooKeeperRequestProcessor接口,主要负责处理客户端发送的请求,并将请求放入到leader的待处理队列,等待leader处理。
2、ToBeAppliedRequestProcessor的工作流程
ToBeAppliedRequestProcessor的工作流程如下:
(1)当客户端发送请求到服务端时,服务端的ToBeAppliedRequestProcessor接收到请求;
(2)ToBeAppliedRequestProcessor将请求封装成Request对象,并将该Request对象放入本地的待处理队列;
(3)ToBeAppliedRequestProcessor将Request对象发送给leader;
(4)leader收到Request对象,并将其放入自己的待处理队列;
(5)leader处理完Request对象后,将处理结果发送给ToBeAppliedRequestProcessor;
(6)ToBeAppliedRequestProcessor收到处理结果后,将结果返回给客户端。
3、ToBeAppliedRequestProcessor的源码
ToBeAppliedRequestProcessor的源码如下:
public class ToBeAppliedRequestProcessor implements RequestProcessor {private final RequestProcessor nextProcessor;private final RequestQueue toBeApplied;private final CommitProcessor commitProcessor;private final Thread me;private volatile boolean finished = false;private final ZooKeeperServer zks;private final AtomicLong requestsProcessed = new AtomicLong(0);private final long maxCommands;private final LinkedList<Request> toBeAppliedRequests = new LinkedList<Request>();private final StatsTrack strack;private final RequestController requestController;public ToBeAppliedRequestProcessor(ZooKeeperServer zks, RequestProcessor nextProcessor,final CommitProcessor commitProcessor, long maxCommands, RequestController requestController)throws RequestProcessorException {this.nextProcessor = nextProcessor;this.commitProcessor = commitProcessor;this.zks = zks;this.toBeApplied = new RequestQueue();this.maxCommands = maxCommands;this.strack = new StatsTrack();this.requestController = requestController;String name = "ToBeAppliedRequestProcessor:" + zks.getServerId();this.me = new ZooKeeperThread(name, this, zks.getZooKeeperServerListener());this.me.start();}public void run() {try {while (!finished) {Request si = toBeApplied.take();if (si == Request.requestOfDeath) {break;}// track the number of read requestsif (si.type == OpCode.sync) {strack.updateSync();} else if (si.type == OpCode.create || si.type == OpCode.delete ||si.type == OpCode.setData || si.type == OpCode.multi) {strack.updateWrite();} else {strack.updateRead();}if (requestController != null) {if (!requestController.shouldProcessRequest(si)) {continue;}}if (requestsProcessed.get() > maxCommands) {LOG.warn("Too many requests processed (" + requestsProcessed.get() + " > " + maxCommands + ") - dropping connection");break;}requestsProcessed.incrementAndGet();nextProcessor.processRequest(si);if (si.getHdr() != null && si.getHdr().getType() == OpCode.closeSession) {break;}}} catch (Exception e) {handleException(this.getName(), e);}LOG.info("ToBeApplied processor exited loop!");}public void processRequest(Request request) throws RequestProcessorException {toBeApplied.add(request);}public void shutdown() {LOG.info("Shutting down");finished = true;toBeApplied.add(Request.requestOfDeath);try {me.join();} catch (InterruptedException e) {LOG.warn("Got interrupted before joining ToBeAppliedRequestProcessor thread");}}}