消费者组:跟生产者一样,每个消费者都有所在的消费者组,一个消费者组可以有很多的消费者,不同的消费者组消费消息是互不影响的。
1)Broker 端需要维护 Consumer 的状态,这不利于 Broker 支持大量 Consumer 的场景。
2)Consumer 的消费速度是不一致的,单独通过 Broker 推送消息,难以处理不同的 Consumer 的状况。
3)Broker 难以处理 Consumer 无法消费消息的情况,因为Broker 无法确定 Consumer 只是暂时的故障还是永久性的故障。
4)大量的推送消息会加重 Consumer 的负载,甚至冲垮 Consumer。Pull模式由 Consumer 主动从 Broker 获取消息,其优点为:
1) Broker 不再需要维护 Consumer 的状态(每一次 Pull 都包含了其实偏移量等必要的信息)。
2)状态维护在 Consumer,所以 Consumer 可以很容易的根据自身的负载等状态来决定从 Broker 获取消息的频率。
3)因为 Broker 无法预测写一条消息产生的时间,所以在收到消息之后只能立即推送给 Consumer,所以无法对消息聚合后再推送给 Consumer。而 Pull 模式由 Consumer 主动来获取消息,每一次 Pull 时都尽可能多的获取已经在 Broker 上的消息。
8.ExecuteRequestWhenWakeup() 方法唤醒拉取请求,调用 ProcessRequest() 方法处理该请求。
private void pullMessage(final PullRequest pullRequest) { // 堆代码 duidaima.com //从pullRequest中获取消费者组 final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup()); if (consumer != null) { //强转为push模式消费者 DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer; //真正执行拉取消息的方法 impl.pullMessage(pullRequest); } else { log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest); } } /** * 处理拉取消息的请求 */ @Override public void run() { log.info(this.getServiceName() + " service started"); //在它的run方法中,循环不断的从pullRequestQueue中阻塞式的获取并移除队列的头部数据,即拉取消息的请求, // 然后调用pullMessage方法根据该请求去broker拉取消息。 while (!this.isStopped()) { try { PullRequest pullRequest = this.pullRequestQueue.take(); //调用pullMessage方法 this.pullMessage(pullRequest); } catch (InterruptedException ignored) { } catch (Exception e) { log.error("Pull Message Service Run Method exception", e); } } log.info(this.getServiceName() + " service end"); }在 Consumer 端, Push 模式的消息拉取由 PullMessageService 类实现, PullMessageService 继承了 ServiceThread 类,并实现了 Run 方法,通过异步的方式,循环从 PullRequestQueue 中阻塞式的获取并移除队列头部的数据,最终调用了 DefaultMQPushConsumerImpl 类的 PullMessage 方法。其中,PullRequestQueue 队列是在负载均衡之时对于新分配到的消息队列而创建的,因此只要该队列中有拉取的请求,就会去 Brocker 拉取消息,如果没有就会阻塞。
/** * 处理正在拉取消息的代码 */ public void pullMessage(final PullRequest pullRequest) { //服务状态校验 //... //流控校验 //获得processQueue中已缓存的消息总数量 long cachedMessageCount = processQueue.getMsgCount().get(); //获取processQueue中已缓存的消息总大小MB long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024); // 判断还未消息的数量,数量太多就等会再执行重新执行拉取消息的逻辑. if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()) { // 等会再执行重新执行拉取消息的逻辑. this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL); if ((queueFlowControlTimes++ % 1000) == 0) { log.warn("the cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}", this.defaultMQPushConsumer.getPullThresholdForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes); } return; } // 判断还未消息的大小,如果还未消息的消息占用的内存过大,就等会再执行重新执行拉取消息的逻辑. if (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) { // 等会再执行重新执行拉取消息的逻辑. this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL); if ((queueFlowControlTimes++ % 1000) == 0) { log.warn("the cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}", this.defaultMQPushConsumer.getPullThresholdSizeForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes); } return; } //... //顺序消费和并发消费的校验 //调用pullAPIWrapper.pullKernelImpl方法,拉取消息 }承接上文,这里是 DefaultMQPushConsumerImpl 的 PullMessage 方法的源码,该类中主要做了以下操作:
/*****************PullMessageProcessor#processRequest*****************/ case ResponseCode.PULL_NOT_FOUND: // 消息没找到,如果允许请求挂起的话,那么就会将请求挂起,等有消息的时候,再将消息返回给客户端. if (brokerAllowSuspend && hasSuspendFlag) { long pollingTimeMills = suspendTimeoutMillisLong; if (!this.brokerController.getBrokerConfig().isLongPollingEnable()) { pollingTimeMills = this.brokerController.getBrokerConfig().getShortPollingTimeMills(); } String topic = requestHeader.getTopic(); long offset = requestHeader.getQueueOffset(); int queueId = requestHeader.getQueueId(); PullRequest pullRequest = new PullRequest(request, channel, pollingTimeMills, this.brokerController.getMessageStore().now(), offset, subscriptionData, messageFilter); // 将拉消息的请求存起来 this.brokerController.getPullRequestHoldService().suspendPullRequest(topic, queueId, pullRequest); // response 设置为null,就不会给客户端响应的意思 response = null; break; } /********************PullRequestHoldService#suspendPullRequest**************************/ protected ConcurrentMap<String, ManyPullRequest> pullRequestTable = new ConcurrentHashMap<String, ManyPullRequest>(1024); /** * 将拉取消息的请求挂起 * * @param topic * @param queueId * @param pullRequest */ public void suspendPullRequest(final String topic, final int queueId, final PullRequest pullRequest) { String key = this.buildKey(topic, queueId); ManyPullRequest mpr = this.pullRequestTable.get(key); if (null == mpr) { mpr = new ManyPullRequest(); ManyPullRequest prev = this.pullRequestTable.putIfAbsent(key, mpr); if (prev != null) { mpr = prev; } } mpr.addPullRequest(pullRequest); }如果在 Broker 端中没有查询到消息,会通过响应码为 ResponseCode.PULL_NOT_FOUND 的代码块,并且启动长轮询。该代码块会调用 PullRequestHoldService 类的 SuspendPullRequest 方法将拉取消息的请求存储起来。PullRequestHoldService 是用来存储拉取请求的类,该方法会将请求进行分类并放在一个 ConcurrentHashMap 中。
protected void checkHoldRequest() { for (String key : this.pullRequestTable.keySet()) { String[] kArray = key.split(TOPIC_QUEUEID_SEPARATOR); if (2 == kArray.length) { String topic = kArray[0]; int queueId = Integer.parseInt(kArray[1]); //获取 这个topic 的 这个queueId的queue消息的最大的offset final long offset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId); try { //尝试唤醒等待线程. this.notifyMessageArriving(topic, queueId, offset); } catch (Throwable e) { log.error("check hold request failed. topic={}, queueId={}", topic, queueId, e); } } } } /** * 这个方法也会在 {@link NotifyMessageArrivingListener} 中调用,意思就是一旦有消息来了,那么就尝试唤醒长轮询的请求 */ public void notifyMessageArriving(final String topic, final int queueId, final long maxOffset, final Long tagsCode, long msgStoreTime, byte[] filterBitMap, Map<String, String> properties) { String key = this.buildKey(topic, queueId); ManyPullRequest mpr = this.pullRequestTable.get(key); if (mpr != null) { List<PullRequest> requestList = mpr.cloneListAndClear(); if (requestList != null) { List<PullRequest> replayList = new ArrayList<PullRequest>(); for (PullRequest request : requestList) { long newestOffset = maxOffset; if (newestOffset <= request.getPullFromThisOffset()) { //传过来的offset小于请求拉取消息的起始的offset,那么就重新读取消息最大的offset //这一步其实是为了保证一定能拉取的需要的消息 newestOffset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId); } if (newestOffset > request.getPullFromThisOffset()) { //只有当队列消息最大的offset大于消费者需要拉取的消息的offset,那么才执行 //其实很好理解,假设当前队列消息的最大offset是10,但是消费者要拉取第11位的消息,那么此时肯定没有消息,就不用处理了 boolean match = request.getMessageFilter().isMatchedByConsumeQueue(tagsCode, new ConsumeQueueExt.CqExtUnit(tagsCode, msgStoreTime, filterBitMap)); // match by bit map, need eval again when properties is not null. if (match && properties != null) { match = request.getMessageFilter().isMatchedByCommitLog(null, properties); } if (match) { try { // 重新执行一遍拉取的请求,这样就能拉取到消息了. this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup( request.getClientChannel(),request.getRequestCommand()); } catch (Throwable e) { log.error("execute request when wakeup failed.", e); } continue; } //... } //... } } }在 Broker 端,存在 PullRequestHoldService 服务来管理长轮询请求的线程。当一个拉取请求被挂起时,它将被保存在这个服务中。每隔一段时间(长轮询或短轮询等待时间),该服务会检查挂起的请求中是否有可拉取的消息。