闽公网安备 35020302035485号
.推模式是消费者客户端初始化时利用重平衡线程去拉取消息,拉取消息的方法会注册回调函数,拉取到消息后,由回调函数触发监听器(定义处理逻辑)进行消息处理。

客户端建立连接后,发送消息拉取请求,如果服务端有新消息,则返回消息。如果服务端没有新消息,则挂起连接,等待新消息到来后给客户端返回。客户端如果连接超时,则断开连接。
二. RocketMQ 实现if (this.defaultLitePullConsumer.getConsumerTimeoutMillisWhenSuspend() < this.defaultLitePullConsumer.getBrokerSuspendMaxTimeMillis()) {
throw new MQClientException(
"Long polling mode, the consumer consumerTimeoutMillisWhenSuspend must greater than brokerSuspendMaxTimeMillis"
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
null);
}
2.2 Broker//PullMessageProcessor#processRequest
case ResponseCode.PULL_NOT_FOUND:
if (brokerAllowSuspend && hasSuspendFlag) {
//suspendTimeoutMillisLong 这个参数就是消费端发来的 consumerTimeoutMillisWhenSuspend
long pollingTimeMills = suspendTimeoutMillisLong;
if (!this.brokerController.getBrokerConfig().isLongPollingEnable()) {
pollingTimeMills = this.brokerController.getBrokerConfig().getShortPollingTimeMills();
}
String topic = requestHeader.getTopic();
long offset = requestHeader.getQueueOffset();
int queueId = requestHeader.getQueueId();
PullRequest pullRequest = new PullRequest(request, channel, pollingTimeMills,
this.brokerController.getMessageStore().now(), offset, subscriptionData, messageFilter);
//这里挂起消息
this.brokerController.getPullRequestHoldService().suspendPullRequest(topic, queueId, pullRequest);
response = null;
break;
}
上面的 suspendPullRequest 调用了 PullRequestHoldService#suspendPullRequest,将请求保存在 pullRequestTable。public void run() {
log.info("{} service started", this.getServiceName());
while (!this.isStopped()) {
try {
//长轮询模式,等待 5s 后处理
if (this.brokerController.getBrokerConfig().isLongPollingEnable()) {
this.waitForRunning(5 * 1000);
} //堆代码 duidaima.com
//这里处理被挂起的请求
this.checkHoldRequest();
} catch (Throwable e) {
log.warn(this.getServiceName() + " service has exception. ", e);
}
}//...
}
处理请求的逻辑参考下面代码:protected void checkHoldRequest() {
for (String key : this.pullRequestTable.keySet()) {
String[] kArray = key.split(TOPIC_QUEUEID_SEPARATOR);
if (2 == kArray.length) {
String topic = kArray[0];
int queueId = Integer.parseInt(kArray[1]);
finallong offset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId);
try {
this.notifyMessageArriving(topic, queueId, offset);
} catch (Throwable e) {
log.error("check hold request failed. topic={}, queueId={}", topic, queueId, e);
}
}
}
}
notifyMessageArriving 方法逻辑如下:否则, 继续挂起,等待 5s 后重复执行上面逻辑。
三. 总结