.推模式是消费者客户端初始化时利用重平衡线程去拉取消息,拉取消息的方法会注册回调函数,拉取到消息后,由回调函数触发监听器(定义处理逻辑)进行消息处理。
客户端建立连接后,发送消息拉取请求,如果服务端有新消息,则返回消息。如果服务端没有新消息,则挂起连接,等待新消息到来后给客户端返回。客户端如果连接超时,则断开连接。
二. RocketMQ 实现if (this.defaultLitePullConsumer.getConsumerTimeoutMillisWhenSuspend() < this.defaultLitePullConsumer.getBrokerSuspendMaxTimeMillis()) { throw new MQClientException( "Long polling mode, the consumer consumerTimeoutMillisWhenSuspend must greater than brokerSuspendMaxTimeMillis" + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), null); }2.2 Broker
//PullMessageProcessor#processRequest case ResponseCode.PULL_NOT_FOUND: if (brokerAllowSuspend && hasSuspendFlag) { //suspendTimeoutMillisLong 这个参数就是消费端发来的 consumerTimeoutMillisWhenSuspend long pollingTimeMills = suspendTimeoutMillisLong; if (!this.brokerController.getBrokerConfig().isLongPollingEnable()) { pollingTimeMills = this.brokerController.getBrokerConfig().getShortPollingTimeMills(); } String topic = requestHeader.getTopic(); long offset = requestHeader.getQueueOffset(); int queueId = requestHeader.getQueueId(); PullRequest pullRequest = new PullRequest(request, channel, pollingTimeMills, this.brokerController.getMessageStore().now(), offset, subscriptionData, messageFilter); //这里挂起消息 this.brokerController.getPullRequestHoldService().suspendPullRequest(topic, queueId, pullRequest); response = null; break; }上面的 suspendPullRequest 调用了 PullRequestHoldService#suspendPullRequest,将请求保存在 pullRequestTable。
public void run() { log.info("{} service started", this.getServiceName()); while (!this.isStopped()) { try { //长轮询模式,等待 5s 后处理 if (this.brokerController.getBrokerConfig().isLongPollingEnable()) { this.waitForRunning(5 * 1000); } //堆代码 duidaima.com //这里处理被挂起的请求 this.checkHoldRequest(); } catch (Throwable e) { log.warn(this.getServiceName() + " service has exception. ", e); } }//... }处理请求的逻辑参考下面代码:
protected void checkHoldRequest() { for (String key : this.pullRequestTable.keySet()) { String[] kArray = key.split(TOPIC_QUEUEID_SEPARATOR); if (2 == kArray.length) { String topic = kArray[0]; int queueId = Integer.parseInt(kArray[1]); finallong offset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId); try { this.notifyMessageArriving(topic, queueId, offset); } catch (Throwable e) { log.error("check hold request failed. topic={}, queueId={}", topic, queueId, e); } } } }notifyMessageArriving 方法逻辑如下:
否则, 继续挂起,等待 5s 后重复执行上面逻辑。
三. 总结