/**
 * Field declarations excerpted from CommitLog (constructors and methods are not shown in this excerpt).
 */
public class CommitLog {
    public final static int MESSAGE_MAGIC_CODE = -626843481;
    protected static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
    // Marker for the blank end of a file
    protected final static int BLANK_MAGIC_CODE = -875286124;
    // File queue holding the messages stored on disk
    protected final MappedFileQueue mappedFileQueue;
    // The default message store object
    protected final DefaultMessageStore defaultMessageStore;
    // Service used for flushing to disk and committing
    private final FlushCommitLogService flushCommitLogService;
    // If TransientStorePool is enabled, messages must be flushed to the FileChannel at a fixed interval
    private final FlushCommitLogService commitLogService;
    // Callback used when appending (sending) a message
    private final AppendMessageCallback appendMessageCallback;
    private final ThreadLocal<MessageExtBatchEncoder> batchEncoderThreadLocal;
    // Offset table kept per topic queue
    protected HashMap<String/* topic-queueid */, Long/* offset */> topicQueueTable = new HashMap<String, Long>(1024);
    // Confirm offset
    protected volatile long confirmOffset = -1L;
    // Start time of the period spent holding the lock
    private volatile long beginTimeInLock = 0;
    // Lock for putting messages; prevents concurrent puts
    protected final PutMessageLock putMessageLock;
}
DLedgerCommitLog 是 RocketMQ 用作持久化存储的一种实现方式,它基于 OpenMessaging 的 DLedger(一个基于 Raft 协议的 commitlog 存储库)实现了高可靠、高性能的分布式日志存储,也正是它,使得 CommitLog 拥有了选举复制的能力。
/**
 * Field declarations excerpted from DLedgerCommitLog, the CommitLog subclass backed by DLedger
 * (constructors and methods are not shown in this excerpt).
 */
public class DLedgerCommitLog extends CommitLog {
    // The DLedger server instance
    private final DLedgerServer dLedgerServer;
    // DLedger configuration
    private final DLedgerConfig dLedgerConfig;
    // Store class used for the mmap files
    private final DLedgerMmapFileStore dLedgerFileStore;
    // List of mmap files
    private final MmapFileList dLedgerFileList;
    // id identifies the broker role: 0 means master, any other value means slave
    private final int id;
    private final MessageSerializer messageSerializer;
    // Start time of entering the DLedger lock
    private volatile long beginTimeInDledgerLock = 0;
    // Offset that separates the old commitlog from the DLedger commitlog
    private long dividedCommitlogOffset = -1;
    // Whether the old commitlog is currently being recovered
    private boolean isInrecoveringOldCommitlog = false;
}
了解完 CommitLog,我们回到 broker 的启动,核心类 BrokerController 的 initialize 方法,首先会根据配置生成一个 DefaultMessageStore,这里会判断当前是否支持 DLedgerCommitLog,如果支持,则会创建一个 DLedgerRoleChangeHandler 对象并注册为 leader 选举的回调方法。接着与老版本一致会创建一个 BrokerStats 对象和一个 MessageStorePluginContext 对象。最后,会将 CommitLogDispatcherCalcBitMap 对象添加到 MessageStore 的 DispatcherList 中。
public class BrokerController {
    /**
     * Excerpt of the broker initialization that builds the message store.
     * Parts marked with …… are elided in this excerpt (including the end of the try statement).
     */
    public boolean initialize() throws CloneNotSupportedException {
        ……
        if (result) {
            try {
                this.messageStore = new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener, this.brokerConfig);
                // Only when the DLedger commit log is enabled in the store config
                if (messageStoreConfig.isEnableDLegerCommitLog()) {
                    // Create the DLedgerRoleChangeHandler
                    DLedgerRoleChangeHandler roleChangeHandler = new DLedgerRoleChangeHandler(this, (DefaultMessageStore) messageStore);
                    // Cast the CommitLog to DLedgerCommitLog and register the handler on its leader elector
                    ((DLedgerCommitLog)((DefaultMessageStore) messageStore).getCommitLog()).getdLedgerServer().getdLedgerLeaderElector().addRoleChangeHandler(roleChangeHandler);
                }
                this.brokerStats = new BrokerStats((DefaultMessageStore) this.messageStore);
                // Load plugins: the store is rebuilt through MessageStoreFactory with the plugin context
                MessageStorePluginContext context = new MessageStorePluginContext(messageStoreConfig, brokerStatsManager, messageArrivingListener, brokerConfig);
                this.messageStore = MessageStoreFactory.build(context, this.messageStore);
                // The bitmap-calculating dispatcher is put at the head of the dispatcher list
                this.messageStore.getDispatcherList().addFirst(new CommitLogDispatcherCalcBitMap(this.brokerConfig, this.consumerFilterManager));
            }
            ……
    }
}
接着我们进入 DLedgerCommitLog 的代码,这里可以看到使用了 openmessaging 包下的 DLedgerServer 组件
public class DLedgerCommitLog extends CommitLog { @Override public void start() { // 启动dLedgerServer dLedgerServer.startup(); } }基于 dLedgerLeaderElector 做了 Leader 选举的操作
public class DLedgerServer extends AbstractDLedgerServer {
    /**
     * Starts the server components once; later calls are no-ops while {@code isStarted} is set.
     * The method is synchronized, so concurrent callers are serialized.
     */
    public synchronized void startup() {
        if (isStarted) {
            return;
        }
        dLedgerStore.startup();
        fsmCaller.ifPresent(caller -> {
            // Start the state-machine caller, then load the existing snapshot for data recovery
            caller.start();
            caller.getSnapshotManager().loadSnapshot();
        });
        if (RpcServiceMode.EXCLUSIVE.equals(rpcServiceMode)) {
            dLedgerRpcService.startup();
        }
        dLedgerEntryPusher.startup();
        // Start the leader elector
        dLedgerLeaderElector.startup();
        // Check the preferred leader every 1000 ms, after an initial 1000 ms delay
        executorService.scheduleAtFixedRate(this::checkPreferredLeader, 1000, 1000, TimeUnit.MILLISECONDS);
        isStarted = true;
    }
}
启动状态机
public class DLedgerLeaderElector {
    /**
     * Starts the state maintainer first, then every registered role-change handler in iteration order.
     */
    public void startup() {
        // Start the state machine thread
        stateMaintainer.start();
        for (RoleChangeHandler handler : roleChangeHandlers) {
            handler.startup();
        }
    }
}
状态机
public class StateMaintainer extends ShutdownAbleThread { public StateMaintainer(String name, Logger logger) { super(name, logger); } @Override public void doWork() { try { // 是否支持Leader选举 if (DLedgerLeaderElector.this.dLedgerConfig.isEnableLeaderElector()) { DLedgerLeaderElector.this.refreshIntervals(dLedgerConfig); // 状态机核心方法 DLedgerLeaderElector.this.maintainState(); } sleep(10); } catch (Throwable t) { DLedgerLeaderElector.LOGGER.error("Error in heartbeat", t); } } }状态机核心方法,Raft 中的三个角色,Leader、Follower、Candidate,这里对三个角色不做过多叙述,可以参见《In Search of an Understandable Consensus Algorithm》 一文。
public class DLedgerLeaderElector { private void maintainState() throws Exception { // Leader角色 if (memberState.isLeader()) { maintainAsLeader(); } else if (memberState.isFollower()) { // Follower角色 maintainAsFollower(); } else { // Candidate角色 maintainAsCandidate(); } } }接下来我们来看 raft 的核心实现,首先看 Leader 角色的实现代码。当上次发送心跳时间大于心跳包间隔时,会重新发送心跳。
public class DLedgerLeaderElector { private void maintainAsLeader() throws Exception { // 上次发送心跳时间是否大于心跳包间隔时间 if (DLedgerUtils.elapsed(lastSendHeartBeatTime) > heartBeatTimeIntervalMs) { // 任期 long term; // 主节点id String leaderId; synchronized (memberState) { if (!memberState.isLeader()) { //stop sending return; } term = memberState.currTerm(); leaderId = memberState.getLeaderId(); lastSendHeartBeatTime = System.currentTimeMillis(); } // 发送心跳 sendHeartbeats(term, leaderId); } } }当我们看心跳代码前,首先回顾下 raft 理论中对于心跳返回的描述。
public class DLedgerLeaderElector { private void sendHeartbeats(long term, String leaderId) throws Exception { …… for (String id : memberState.getPeerMap().keySet()) { if (memberState.getSelfId().equals(id)) { continue; } HeartBeatRequest heartBeatRequest = new HeartBeatRequest(); heartBeatRequest.setGroup(memberState.getGroup()); heartBeatRequest.setLocalId(memberState.getSelfId()); heartBeatRequest.setRemoteId(id); // 主节点id heartBeatRequest.setLeaderId(leaderId); // 任期 heartBeatRequest.setTerm(term); // 异步发送心跳 CompletableFuture<HeartBeatResponse> future = dLedgerRpcService.heartBeat(heartBeatRequest); future.whenComplete((HeartBeatResponse x, Throwable ex) -> { try { if (ex != null) { memberState.getPeersLiveTable().put(id, Boolean.FALSE); throw ex; } // 获取心跳结果 switch (DLedgerResponseCode.valueOf(x.getCode())) { // 成功 case SUCCESS: succNum.incrementAndGet(); break; // 主节点的Term小于从节点 case EXPIRED_TERM: maxTerm.set(x.getTerm()); break; // 从节点的主节点非当前节点 case INCONSISTENT_LEADER: inconsistLeader.compareAndSet(false, true); break; // 从节点尚未准备完毕 case TERM_NOT_READY: notReadyNum.incrementAndGet(); break; default: break; } …… } catch (Throwable t) { LOGGER.error("heartbeat response failed", t); } finally { allNum.incrementAndGet(); if (allNum.get() == memberState.peerSize()) { beatLatch.countDown(); } } }); } long voteResultWaitTime = 10; beatLatch.await(heartBeatTimeIntervalMs - voteResultWaitTime, TimeUnit.MILLISECONDS); Thread.sleep(voteResultWaitTime); // 当从节点返回的term大于自身时,直接退化为candidate if (maxTerm.get() > term) { LOGGER.warn("[{}] currentTerm{} is not the biggest={}, deal with it", memberState.getSelfId(), term, maxTerm.get()); changeRoleToCandidate(maxTerm.get()); return; } // 当半数以上正常返回心跳时,Leader状态正常,重置心跳时间 if (memberState.isQuorum(succNum.get())) { lastSuccHeartBeatTime = System.currentTimeMillis(); } else { LOGGER.info("[{}] Parse heartbeat responses in cost={} term={} allNum={} succNum={} notReadyNum={} inconsistLeader={} maxTerm={} peerSize={} 
lastSuccHeartBeatTime={}", memberState.getSelfId(), DLedgerUtils.elapsed(startHeartbeatTimeMs), term, allNum.get(), succNum.get(), notReadyNum.get(), inconsistLeader.get(), maxTerm.get(), memberState.peerSize(), new Timestamp(lastSuccHeartBeatTime)); // 当正常心跳 + 未准备的心跳大于半数时,立即发送心跳 if (memberState.isQuorum(succNum.get() + notReadyNum.get())) { lastSendHeartBeatTime = -1; // 当从节点中有其他主节点时,直接退化为candidate } else if (inconsistLeader.get()) { changeRoleToCandidate(term); // 如果上次心跳包时间大于3次心跳间隔时间,直接退化为candidate } else if (DLedgerUtils.elapsed(lastSuccHeartBeatTime) > (long) maxHeartBeatLeak * heartBeatTimeIntervalMs) { changeRoleToCandidate(term); } } } }从简单上来说,Leader 只做了一件事,那就是发送心跳,根据心跳结果判断服务是否正常及自己的地位。接着让我们看 Follower 做了什么,Follower 在选举中的流程比较简单
public class DLedgerLeaderElector { private void maintainAsFollower() { // 如果上次心跳时间大于2次心跳间隔 if (DLedgerUtils.elapsed(lastLeaderHeartBeatTime) > 2L * heartBeatTimeIntervalMs) { synchronized (memberState) { // 如果当前角色是Follower,并且心跳大于3次心跳间隔,升级到candidate if (memberState.isFollower() && DLedgerUtils.elapsed(lastLeaderHeartBeatTime) > (long) maxHeartBeatLeak * heartBeatTimeIntervalMs) { LOGGER.info("[{}][HeartBeatTimeOut] lastLeaderHeartBeatTime: {} heartBeatTimeIntervalMs: {} lastLeader={}", memberState.getSelfId(), new Timestamp(lastLeaderHeartBeatTime), heartBeatTimeIntervalMs, memberState.getLeaderId()); changeRoleToCandidate(memberState.currTerm()); } } } } }最后我们来看 Candidate 角色,这块的代码比较多,让我们逐行来进行分析
public class DLedgerLeaderElector {
    /**
     * Candidate maintenance: decides whether to start a vote round now, requests votes,
     * tallies the asynchronous responses, and derives the next step (become leader,
     * revote, or wait) from the tally.
     * Parts marked with …… are elided in this excerpt (including the declarations of the
     * counters, the latch and startVoteTimeMs used below).
     */
    private void maintainAsCandidate() throws Exception {
        // Return if it is not yet time for the next vote request and no immediate term increase is needed
        if (System.currentTimeMillis() < nextTimeToRequestVote && !needIncreaseTermImmediately) {
            return;
        }
        // Term used for this vote round
        long term;
        // Read from memberState.getLedgerEndTerm() below
        long ledgerEndTerm;
        // Read from memberState.getLedgerEndIndex() below
        long ledgerEndIndex;
        // Not a candidate: nothing to do (checked again under the lock)
        if (!memberState.isCandidate()) {
            return;
        }
        synchronized (memberState) {
            if (!memberState.isCandidate()) {
                return;
            }
            // The last round ended with "wait to vote next", or the term must be increased immediately
            if (lastParseResult == VoteResponse.ParseResult.WAIT_TO_VOTE_NEXT || needIncreaseTermImmediately) {
                // Remember the current term for the log line
                long prevTerm = memberState.currTerm();
                // Move to the next term
                term = memberState.nextTerm();
                LOGGER.info("{}_[INCREASE_TERM] from {} to {}", memberState.getSelfId(), prevTerm, term);
                // Reset the last parse result to "wait to revote"
                lastParseResult = VoteResponse.ParseResult.WAIT_TO_REVOTE;
            } else {
                term = memberState.currTerm();
            }
            // Snapshot the ledger end index
            ledgerEndIndex = memberState.getLedgerEndIndex();
            // Snapshot the ledger end term
            ledgerEndTerm = memberState.getLedgerEndTerm();
        }
        // The term was increased on demand: schedule the next vote time, clear the flag, and skip voting this pass
        if (needIncreaseTermImmediately) {
            // NOTE(review): the original comment says the next vote time is randomized to reduce simultaneous votes — getNextTimeToRequestVote() is not shown here
            nextTimeToRequestVote = getNextTimeToRequestVote();
            // Clear the immediate-increase flag
            needIncreaseTermImmediately = false;
            return;
        }
        // Request votes, passing the term, the ledger end term and the ledger end index
        // NOTE(review): the original comment says the node also votes for itself here — not visible in this excerpt
        final List<CompletableFuture<VoteResponse>> quorumVoteResponses = voteForQuorumResponses(term, ledgerEndTerm, ledgerEndIndex);
        ……
        for (CompletableFuture<VoteResponse> future : quorumVoteResponses) {
            future.whenComplete((VoteResponse x, Throwable ex) -> {
                try {
                    if (ex != null) {
                        throw ex;
                    }
                    LOGGER.info("[{}][GetVoteResponse] {}", memberState.getSelfId(), JSON.toJSONString(x));
                    // Every result other than UNKNOWN counts as a valid response
                    if (x.getVoteResult() != VoteResponse.RESULT.UNKNOWN) {
                        validNum.incrementAndGet();
                    }
                    synchronized (knownMaxTermInGroup) {
                        switch (x.getVoteResult()) {
                            // Accepted: count one more accepted vote
                            case ACCEPT:
                                acceptedNum.incrementAndGet();
                                break;
                            // Rejected by a node that already has a leader
                            case REJECT_ALREADY_HAS_LEADER:
                                alreadyHasLeader.compareAndSet(false, true);
                                break;
                            // Rejected because of the term: keep track of the largest term reported
                            case REJECT_TERM_SMALL_THAN_LEDGER:
                            case REJECT_EXPIRED_VOTE_TERM:
                                if (x.getTerm() > knownMaxTermInGroup.get()) {
                                    // Maintain the maximum known term
                                    knownMaxTermInGroup.set(x.getTerm());
                                }
                                break;
                            // Our ledger term is behind the peer's
                            case REJECT_EXPIRED_LEDGER_TERM:
                            // Our ledger end index is behind the peer's
                            case REJECT_SMALL_LEDGER_END_INDEX:
                                biggerLedgerNum.incrementAndGet();
                                break;
                            // The peer is not ready for this term yet
                            case REJECT_TERM_NOT_READY:
                                notReadyTermNum.incrementAndGet();
                                break;
                            // The peer has already voted
                            case REJECT_ALREADY_VOTED:
                            // The peer rejects because of leadership taking
                            case REJECT_TAKING_LEADERSHIP:
                            default:
                                break;
                        }
                    }
                    // Release the waiter early when a leader already exists, or accepted votes form a quorum,
                    // or accepted plus not-ready votes form a quorum
                    if (alreadyHasLeader.get() || memberState.isQuorum(acceptedNum.get()) || memberState.isQuorum(acceptedNum.get() + notReadyTermNum.get())) {
                        voteLatch.countDown();
                    }
                } catch (Throwable t) {
                    LOGGER.error("vote response failed", t);
                } finally {
                    allNum.incrementAndGet();
                    // Release the waiter once the response count equals the peer count
                    if (allNum.get() == memberState.peerSize()) {
                        voteLatch.countDown();
                    }
                }
            });
        }
        try {
            // Wait at most 2000 ms plus a random amount below maxVoteIntervalMs
            voteLatch.await(2000 + random.nextInt(maxVoteIntervalMs), TimeUnit.MILLISECONDS);
        } catch (Throwable ignore) {
        }
        lastVoteCost = DLedgerUtils.elapsed(startVoteTimeMs);
        VoteResponse.ParseResult parseResult;
        if (knownMaxTermInGroup.get() > term) {
            // A larger term is known in the group: WAIT_TO_VOTE_NEXT, and switch to candidate with that term
            parseResult = VoteResponse.ParseResult.WAIT_TO_VOTE_NEXT;
            nextTimeToRequestVote = getNextTimeToRequestVote();
            changeRoleToCandidate(knownMaxTermInGroup.get());
        } else if (alreadyHasLeader.get()) {
            // A leader already exists: WAIT_TO_REVOTE, delayed by an extra maxHeartBeatLeak heartbeat intervals
            parseResult = VoteResponse.ParseResult.WAIT_TO_REVOTE;
            nextTimeToRequestVote = getNextTimeToRequestVote() + (long) heartBeatTimeIntervalMs * maxHeartBeatLeak;
        } else if (!memberState.isQuorum(validNum.get())) {
            // Valid responses do not form a quorum: WAIT_TO_REVOTE
            parseResult = VoteResponse.ParseResult.WAIT_TO_REVOTE;
            nextTimeToRequestVote = getNextTimeToRequestVote();
        } else if (!memberState.isQuorum(validNum.get() - biggerLedgerNum.get())) {
            // Valid responses minus peers with a bigger ledger do not form a quorum: WAIT_TO_REVOTE, delayed by an extra maxVoteIntervalMs
            parseResult = VoteResponse.ParseResult.WAIT_TO_REVOTE;
            nextTimeToRequestVote = getNextTimeToRequestVote() + maxVoteIntervalMs;
        } else if (memberState.isQuorum(acceptedNum.get())) {
            // Accepted votes form a quorum: this vote round passed
            parseResult = VoteResponse.ParseResult.PASSED;
        } else if (memberState.isQuorum(acceptedNum.get() + notReadyTermNum.get())) {
            // Accepted plus not-ready votes form a quorum: revote immediately (next vote time is left unchanged)
            parseResult = VoteResponse.ParseResult.REVOTE_IMMEDIATELY;
        } else {
            parseResult = VoteResponse.ParseResult.WAIT_TO_VOTE_NEXT;
            nextTimeToRequestVote = getNextTimeToRequestVote();
        }
        lastParseResult = parseResult;
        LOGGER.info("[{}] [PARSE_VOTE_RESULT] cost={} term={} memberNum={} allNum={} acceptedNum={} notReadyTermNum={} biggerLedgerNum={} alreadyHasLeader={} maxTerm={} result={}", memberState.getSelfId(), lastVoteCost, term, memberState.peerSize(), allNum, acceptedNum, notReadyTermNum, biggerLedgerNum, alreadyHasLeader, knownMaxTermInGroup.get(), parseResult);
        if (parseResult == VoteResponse.ParseResult.PASSED) {
            LOGGER.info("[{}] [VOTE_RESULT] has been elected to be the leader in term {}", memberState.getSelfId(), term);
            // The vote passed: become leader for this term
            changeRoleToLeader(term);
        }
    }
}
由于篇幅的问题,我们并没有一一将 Dledger 的核心代码在这里展现,在此仅展示了选主等基本流程,对于写入、复制、日志存储、消息传递等都不多加诉说。Dledger 算法的核心原理是 Raft 协议,当一个节点发起投票请求时,其他节点会收到请求并发送响应,响应的结果将根据投票数量判断是否达成共识。如果共识达成,则新的 leader 将被选举出来,同时新的日志将被追加到磁盘上。如果共识未达成,则需要等待一定时间后重新发起投票请求。