Kafka 3.0 Source Notes: Kafka Cluster Leader Election

Table of Contents
1. The Kafka cluster election flow
2. Source analysis of Kafka cluster election
2.1 Initialization of KafkaRaftManager
2.2 Startup of KafkaRaftManager
2.3 The cluster leader election flow
2.4 Breaking election deadlock: the backoff mechanism

1. The Kafka cluster election flow
In Kafka 3.0 Source Notes (1) - The Network Communication Architecture of the Kafka Server, I mentioned that in KRaft mode Kafka's cluster metadata is managed autonomously by the Controller cluster. In a distributed environment this inevitably involves interaction between cluster nodes, including leader election and metadata synchronization. The state transitions involved in Kafka cluster election are shown in the figure below, and the key requests are:

- Vote: sent by a Candidate node to ask the other nodes to vote for it
- BeginQuorumEpoch: sent by the Leader node to inform the other nodes of the current Leader
- EndQuorumEpoch: sent when the current Leader steps down, triggering a new election
- Fetch: sent by Followers to replicate the Leader's log; the Fetch request also lets a Follower probe whether the Leader is alive
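The role transitions described above can be sketched as a small state model. This is a simplified illustration only (the class and enum names are hypothetical, the edge set is a reading of the flow above, and Kafka's real QuorumState additionally enforces epoch and voter-membership invariants omitted here):

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

// Roles a KRaft quorum node can take, as in the election diagram
enum RaftRole { UNATTACHED, VOTED, CANDIDATE, LEADER, FOLLOWER, RESIGNED }

// Hypothetical transition table: which role changes the flow above allows
final class RoleTransitions {
    private static final Map<RaftRole, Set<RaftRole>> ALLOWED = new EnumMap<>(RaftRole.class);
    static {
        // Election timeout fires (become candidate), or a Vote/BeginQuorumEpoch arrives
        ALLOWED.put(RaftRole.UNATTACHED, EnumSet.of(RaftRole.CANDIDATE, RaftRole.VOTED, RaftRole.FOLLOWER));
        // A voted node learns the elected leader, or times out and runs itself
        ALLOWED.put(RaftRole.VOTED, EnumSet.of(RaftRole.FOLLOWER, RaftRole.CANDIDATE));
        // A candidate wins (leader), loses (follower), or retries after backoff
        ALLOWED.put(RaftRole.CANDIDATE, EnumSet.of(RaftRole.LEADER, RaftRole.FOLLOWER, RaftRole.CANDIDATE));
        // A leader steps down by sending EndQuorumEpoch
        ALLOWED.put(RaftRole.LEADER, EnumSet.of(RaftRole.RESIGNED));
        // A follower whose Fetch requests time out starts a new election
        ALLOWED.put(RaftRole.FOLLOWER, EnumSet.of(RaftRole.CANDIDATE, RaftRole.UNATTACHED));
        // A resigned leader waits for the next election outcome
        ALLOWED.put(RaftRole.RESIGNED, EnumSet.of(RaftRole.UNATTACHED, RaftRole.FOLLOWER));
    }

    static boolean canTransition(RaftRole from, RaftRole to) {
        return ALLOWED.getOrDefault(from, EnumSet.noneOf(RaftRole.class)).contains(to);
    }
}
```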

2. Source analysis of Kafka cluster election
The inter-node interaction of the Controller cluster relies on the KafkaRaftManager component, which coordinates cluster information. This article takes the leader-election scenario as its entry point and breaks the cluster's working mechanism into the following parts:

- Initialization of the cluster component KafkaRaftManager
- Startup of the cluster component KafkaRaftManager
- The main flow of cluster leader election
- Handling of election deadlock

2.1 Initialization of KafkaRaftManager
The initialization of the KafkaRaftManager component is triggered during the creation of the KafkaRaftServer instance. The key steps of its initialization are:

- Call RaftManager.scala#buildNetworkChannel() to create the underlying network communication component KafkaNetworkChannel
- Call RaftManager.scala#buildRaftClient() to create the cluster client KafkaRaftClient
- Create the RaftIoThread thread, which continuously drives request/response processing in the local network client
class KafkaRaftManager[T](
metaProperties: MetaProperties,
config: KafkaConfig,
recordSerde: RecordSerde[T],
topicPartition: TopicPartition,
topicId: Uuid,
time: Time,
metrics: Metrics,
threadNamePrefixOpt: Option[String],
val controllerQuorumVotersFuture: CompletableFuture[util.Map[Integer, AddressSpec]]
) extends RaftManager[T] with Logging {

private val raftConfig = new RaftConfig(config)
private val threadNamePrefix = threadNamePrefixOpt.getOrElse("kafka-raft")
private val logContext = new LogContext(s"[RaftManager nodeId=${config.nodeId}] ")
this.logIdent = logContext.logPrefix()

private val scheduler = new KafkaScheduler(threads = 1, threadNamePrefix + "-scheduler")
scheduler.startup()

private val dataDir = createDataDir()
override val replicatedLog: ReplicatedLog = buildMetadataLog()
private val netChannel = buildNetworkChannel()
override val client: KafkaRaftClient[T] = buildRaftClient()
private val raftIoThread = new RaftIoThread(client, threadNamePrefix)

}
The core of RaftManager.scala#buildNetworkChannel() consists of the following two steps:

- Call RaftManager.scala#buildNetworkClient() to create the network client; this method creates the NetworkClient instance that handles the underlying network connections
- Use the NetworkClient instance from the previous step to construct a KafkaNetworkChannel object, which provides the upper layer with an entry point for sending cluster-interaction requests
private def buildNetworkChannel(): KafkaNetworkChannel = {
val netClient = buildNetworkClient()
new KafkaNetworkChannel(time, netClient, config.quorumRequestTimeoutMs, threadNamePrefix)
}

private def buildNetworkClient(): NetworkClient = {
val controllerListenerName = new ListenerName(config.controllerListenerNames.head)
val controllerSecurityProtocol = config.listenerSecurityProtocolMap.getOrElse(controllerListenerName, SecurityProtocol.forName(controllerListenerName.value()))
val channelBuilder = ChannelBuilders.clientChannelBuilder(
controllerSecurityProtocol,
JaasContext.Type.SERVER,
config,
controllerListenerName,
config.saslMechanismControllerProtocol,
time,
config.saslInterBrokerHandshakeRequestEnable,
logContext
)

val metricGroupPrefix = "raft-channel"
val collectPerConnectionMetrics = false

val selector = new Selector(
NetworkReceive.UNLIMITED,
config.connectionsMaxIdleMs,
metrics,
time,
metricGroupPrefix,
Map.empty[String, String].asJava,
collectPerConnectionMetrics,
channelBuilder,
logContext
)

val clientId = s"raft-client-${config.nodeId}"
val maxInflightRequestsPerConnection = 1
val reconnectBackoffMs = 50
val reconnectBackoffMsMs = 500
val discoverBrokerVersions = true

new NetworkClient(
selector,
new ManualMetadataUpdater(),
clientId,
maxInflightRequestsPerConnection,
reconnectBackoffMs,
reconnectBackoffMsMs,
Selectable.USE_DEFAULT_BUFFER_SIZE,
config.socketReceiveBufferBytes,
config.quorumRequestTimeoutMs,
config.connectionSetupTimeoutMs,
config.connectionSetupTimeoutMaxMs,
time,
discoverBrokerVersions,
new ApiVersions,
logContext
)
}
When the KafkaNetworkChannel instance is created, its own key components are created along with it, including the RaftSendThread request-sending thread. RaftSendThread extends InterBrokerSendThread; its role is analyzed later in this article.

class KafkaNetworkChannel(
time: Time,
client: KafkaClient,
requestTimeoutMs: Int,
threadNamePrefix: String
) extends NetworkChannel with Logging {
import KafkaNetworkChannel._

type ResponseHandler = AbstractResponse => Unit

private val correlationIdCounter = new AtomicInteger(0)
private val endpoints = mutable.HashMap.empty[Int, Node]

private val requestThread = new RaftSendThread(
name = threadNamePrefix + "-outbound-request-thread",
networkClient = client,
requestTimeoutMs = requestTimeoutMs,
time = time,
isInterruptible = false
)
}
Returning to step 2 of the list in item 1 of this section, RaftManager.scala#buildRaftClient() performs two core steps:

- Create the KafkaRaftClient cluster client instance
- Call KafkaRaftClient.java#initialize() to initialize the quorum state of the current node
private def buildRaftClient(): KafkaRaftClient[T] = {
val expirationTimer = new SystemTimer("raft-expiration-executor")
val expirationService = new TimingWheelExpirationService(expirationTimer)
val quorumStateStore = new FileBasedStateStore(new File(dataDir, "quorum-state"))

val nodeId = if (config.processRoles.contains(ControllerRole)) {
OptionalInt.of(config.nodeId)
} else {
OptionalInt.empty()
}

val client = new KafkaRaftClient(
recordSerde,
netChannel,
replicatedLog,
quorumStateStore,
time,
metrics,
expirationService,
logContext,
metaProperties.clusterId,
nodeId,
raftConfig
)
client.initialize()
client
}
The KafkaRaftClient constructor is shown below; the points to note are:

- Call RaftConfig.java#quorumVoterIds() to obtain the ids of the voting nodes configured by the controller.quorum.voters property in the configuration file
- Create the QuorumState instance that represents this node's role state within the cluster
- Call KafkaNetworkChannel.java#updateEndpoint() to push the connection addresses of the other voting nodes into the cluster request-sending component
KafkaRaftClient(
RecordSerde<T> serde,
NetworkChannel channel,
RaftMessageQueue messageQueue,
ReplicatedLog log,
QuorumStateStore quorumStateStore,
MemoryPool memoryPool,
Time time,
Metrics metrics,
ExpirationService expirationService,
int fetchMaxWaitMs,
String clusterId,
OptionalInt nodeId,
LogContext logContext,
Random random,
RaftConfig raftConfig
) {
this.serde = serde;
this.channel = channel;
this.messageQueue = messageQueue;
this.log = log;
this.memoryPool = memoryPool;
this.fetchPurgatory = new ThresholdPurgatory<>(expirationService);
this.appendPurgatory = new ThresholdPurgatory<>(expirationService);
this.time = time;
this.clusterId = clusterId;
this.fetchMaxWaitMs = fetchMaxWaitMs;
this.logger = logContext.logger(KafkaRaftClient.class);
this.random = random;
this.raftConfig = raftConfig;
this.snapshotCleaner = new RaftMetadataLogCleanerManager(logger, time, 60000, log::maybeClean);
Set<Integer> quorumVoterIds = raftConfig.quorumVoterIds();
this.requestManager = new RequestManager(quorumVoterIds, raftConfig.retryBackoffMs(),
raftConfig.requestTimeoutMs(), random);
this.quorum = new QuorumState(
nodeId,
quorumVoterIds,
raftConfig.electionTimeoutMs(),
raftConfig.fetchTimeoutMs(),
quorumStateStore,
time,
logContext,
random);
this.kafkaRaftMetrics = new KafkaRaftMetrics(metrics, "raft", quorum);
kafkaRaftMetrics.updateNumUnknownVoterConnections(quorum.remoteVoters().size());

// Update the voter endpoints with what's in RaftConfig
Map<Integer, RaftConfig.AddressSpec> voterAddresses = raftConfig.quorumVoterConnections();
voterAddresses.entrySet().stream()
.filter(e -> e.getValue() instanceof RaftConfig.InetAddressSpec)
.forEach(e -> this.channel.updateEndpoint(e.getKey(), (RaftConfig.InetAddressSpec) e.getValue()));
}
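The controller.quorum.voters value referenced in the constructor uses entries of the form id@host:port. As a rough sketch of what RaftConfig's parsing produces (the QuorumVoters class here is hypothetical; the real parser additionally handles the 0.0.0.0 unroutable placeholder and validation):

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical happy-path parser for a voters string such as
// "1@ctrl1:9093,2@ctrl2:9093,3@ctrl3:9093" -> {1: "ctrl1:9093", ...}
final class QuorumVoters {
    static Map<Integer, String> parse(String voters) {
        Map<Integer, String> endpoints = new LinkedHashMap<>();
        for (String entry : voters.split(",")) {
            String[] idAndAddress = entry.split("@", 2); // "id@host:port"
            endpoints.put(Integer.parseInt(idAndAddress[0].trim()), idAndAddress[1].trim());
        }
        return endpoints;
    }
}
```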
Returning to step 2 of item 4 of this section, the core of KafkaRaftClient.java#initialize() is to call QuorumState.java#initialize() to initialize the role state of the current node within the cluster.

public void initialize() {
quorum.initialize(new OffsetAndEpoch(log.endOffset().offset, log.lastFetchedEpoch()));

long currentTimeMs = time.milliseconds();
if (quorum.isLeader()) {
throw new IllegalStateException("Voter cannot initialize as a Leader");
} else if (quorum.isCandidate()) {
onBecomeCandidate(currentTimeMs);
} else if (quorum.isFollower()) {
onBecomeFollower(currentTimeMs);
}

// When there is only a single voter, become candidate immediately
if (quorum.isVoter()
&& quorum.remoteVoters().isEmpty()
&& !quorum.isCandidate()) {

transitionToCandidate(currentTimeMs);
}
}
QuorumState.java#initialize() is fairly long, but its core comes down to two points:

- First call QuorumStateStore#readElectionState() to read the persisted election state from the local quorum-state file; this mainly covers the node-restart scenario
- Then initialize the node's cluster state from that election state; a node starting for the first time is put into the UnattachedState
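For reference, the quorum-state file read in step 1 is a small JSON document in the metadata log directory. On a 3.0-era node it looks roughly like the sample below (shown for illustration only; the exact field names and contents may differ between versions):

```json
{
  "clusterId": "",
  "leaderId": 1,
  "leaderEpoch": 29,
  "votedId": -1,
  "appliedOffset": 0,
  "currentVoters": [{"voterId": 1}, {"voterId": 2}, {"voterId": 3}],
  "data_version": 0
}
```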
public void initialize(OffsetAndEpoch logEndOffsetAndEpoch) throws IllegalStateException {
// We initialize in whatever state we were in on shutdown. If we were a leader
// or candidate, probably an election was held, but we will find out about it
// when we send Vote or BeginEpoch requests.

ElectionState election;
try {
election = store.readElectionState();
if (election == null) {
election = ElectionState.withUnknownLeader(0, voters);
}
} catch (final UncheckedIOException e) {
// For exceptions during state file loading (missing or not readable),
// we could assume the file is corrupted already and should be cleaned up.
log.warn("Clearing local quorum state store after error loading state {}",
store.toString(), e);
store.clear();
election = ElectionState.withUnknownLeader(0, voters);
}

final EpochState initialState;
if (!election.voters().isEmpty() && !voters.equals(election.voters())) {
throw new IllegalStateException("Configured voter set: " + voters
+ " is different from the voter set read from the state file: " + election.voters()
+ ". Check if the quorum configuration is up to date, "
+ "or wipe out the local state file if necessary");
} else if (election.hasVoted() && !isVoter()) {
String localIdDescription = localId.isPresent() ?
localId.getAsInt() + " is not a voter" :
"is undefined";
throw new IllegalStateException("Initialized quorum state " + election
+ " with a voted candidate, which indicates this node was previously "
+ " a voter, but the local id " + localIdDescription);
} else if (election.epoch < logEndOffsetAndEpoch.epoch) {
log.warn("Epoch from quorum-state file is {}, which is " +
"smaller than last written epoch {} in the log",
election.epoch, logEndOffsetAndEpoch.epoch);
initialState = new UnattachedState(
time,
logEndOffsetAndEpoch.epoch,
voters,
Optional.empty(),
randomElectionTimeoutMs(),
logContext
);
} else if (localId.isPresent() && election.isLeader(localId.getAsInt())) {
// If we were previously a leader, then we will start out as resigned
// in the same epoch. This serves two purposes:
// 1. It ensures that we cannot vote for another leader in the same epoch.
// 2. It protects the invariant that each record is uniquely identified by
// offset and epoch, which might otherwise be violated if unflushed data
// is lost after restarting.
initialState = new ResignedState(
time,
localId.getAsInt(),
election.epoch,
voters,
randomElectionTimeoutMs(),
Collections.emptyList(),
logContext
);
} else if (localId.isPresent() && election.isVotedCandidate(localId.getAsInt())) {
initialState = new CandidateState(
time,
localId.getAsInt(),
election.epoch,
voters,
Optional.empty(),
1,
randomElectionTimeoutMs(),
logContext
);
} else if (election.hasVoted()) {
initialState = new VotedState(
time,
election.epoch,
election.votedId(),
voters,
Optional.empty(),
randomElectionTimeoutMs(),
logContext
);
} else if (election.hasLeader()) {
initialState = new FollowerState(
time,
election.epoch,
election.leaderId(),
voters,
Optional.empty(),
fetchTimeoutMs,
logContext
);
} else {
initialState = new UnattachedState(
time,
election.epoch,
voters,
Optional.empty(),
randomElectionTimeoutMs(),
logContext
);
}

transitionTo(initialState);
}

This concludes the instantiation of the KafkaRaftClient object. Returning to step 3 of item 1 of this section, the definition of RaftIoThread is simple: its core method doWork() just calls KafkaRaftClient.java#poll(), which is analyzed in detail later. With this, the initialization of the cluster component KafkaRaftManager is complete.

class RaftIoThread(
client: KafkaRaftClient[_],
threadNamePrefix: String
) extends ShutdownableThread(
name = threadNamePrefix + "-io-thread",
isInterruptible = false
) {
override def doWork(): Unit = {
client.poll()
}
......
}

2.2 Startup of KafkaRaftManager
The startup of the KafkaRaftManager component is triggered during the startup of the KafkaRaftServer instance. The key steps of KafkaRaftManager.scala#startup() are:

- Call KafkaNetworkChannel.scala#start() to start the cluster request-sending component
- Call RaftIoThread.scala#start() to start the thread that continuously drives request/response processing in the local network client

def startup(): Unit = {
// Update the voter endpoints (if valid) with what's in RaftConfig
val voterAddresses: util.Map[Integer, AddressSpec] = controllerQuorumVotersFuture.get()
for (voterAddressEntry <- voterAddresses.entrySet.asScala) {
voterAddressEntry.getValue match {
case spec: InetAddressSpec =>
netChannel.updateEndpoint(voterAddressEntry.getKey, spec)
case _: UnknownAddressSpec =>
logger.info(s"Skipping channel update for destination ID: ${voterAddressEntry.getKey} " +
s"because of non-routable endpoint: ${NON_ROUTABLE_ADDRESS.toString}")
case invalid: AddressSpec =>
logger.warn(s"Unexpected address spec (type: ${invalid.getClass}) for channel update for " +
s"destination ID: ${voterAddressEntry.getKey}")
}
}
netChannel.start()
raftIoThread.start()
}
KafkaNetworkChannel.scala#start() simply starts the RaftSendThread thread. This thread class extends InterBrokerSendThread, which in turn extends ShutdownableThread; its working mechanism is analyzed below.

def start(): Unit = {
requestThread.start()
}
RaftIoThread.scala#start() is key: in terms of flow, it is effectively the entry point at which the entire KafkaRaftManager component starts working. RaftIoThread indirectly extends Thread, so once the thread starts it calls RaftIoThread.scala#run(), which is implemented by the parent class ShutdownableThread.scala#run().

As shown below, the core logic of ShutdownableThread.scala#run() is to repeatedly call the subclass's RaftIoThread.scala#doWork() method:
override def run(): Unit = {
isStarted = true
info("Starting")
try {
while (isRunning)
doWork()
} catch {
case e: FatalExitError =>
shutdownInitiated.countDown()
shutdownComplete.countDown()
info("Stopped")
Exit.exit(e.statusCode())
case e: Throwable =>
if (isRunning)
error("Error due to", e)
} finally {
shutdownComplete.countDown()
}
info("Stopped")
}
RaftIoThread.scala#doWork() is very concise; its key logic is to call KafkaRaftClient.java#poll() and begin the interaction between cluster nodes. This wraps up the startup of the core components.

override def doWork(): Unit = {
client.poll()
}
2.3 The cluster leader election flow
The source of KafkaRaftClient.java#poll() is easy to follow; the core is the following steps:

- First call KafkaRaftClient.java#pollCurrentState() to take the next action based on the node's current state, driving cluster state transitions
- Poll the message queue via messageQueue.poll() and hand any retrieved message to KafkaRaftClient.java#handleInboundMessage() for processing. Note that this queue only ever holds two kinds of messages: RaftRequest.Inbound requests from other nodes, and RaftResponse.Inbound responses made by other nodes to this node's requests
public void poll() {
pollListeners();

long currentTimeMs = time.milliseconds();
if (maybeCompleteShutdown(currentTimeMs)) {
return;
}

long pollStateTimeoutMs = pollCurrentState(currentTimeMs);
long cleaningTimeoutMs = snapshotCleaner.maybeClean(currentTimeMs);
long pollTimeoutMs = Math.min(pollStateTimeoutMs, cleaningTimeoutMs);

kafkaRaftMetrics.updatePollStart(currentTimeMs);

RaftMessage message = messageQueue.poll(pollTimeoutMs);

currentTimeMs = time.milliseconds();
kafkaRaftMetrics.updatePollEnd(currentTimeMs);

if (message != null) {
handleInboundMessage(message, currentTimeMs);
}
}
KafkaRaftClient.java#pollCurrentState() is the hub of a node's cluster state transitions. As mentioned above, a node starting for the first time is in the UnattachedState, so KafkaRaftClient.java#pollUnattached() is invoked here.

private long pollCurrentState(long currentTimeMs) {
if (quorum.isLeader()) {
return pollLeader(currentTimeMs);
} else if (quorum.isCandidate()) {
return pollCandidate(currentTimeMs);
} else if (quorum.isFollower()) {
return pollFollower(currentTimeMs);
} else if (quorum.isVoted()) {
return pollVoted(currentTimeMs);
} else if (quorum.isUnattached()) {
return pollUnattached(currentTimeMs);
} else if (quorum.isResigned()) {
return pollResigned(currentTimeMs);
} else {
throw new IllegalStateException("Unexpected quorum state " + quorum);
}
}
The key logic of KafkaRaftClient.java#pollUnattached() is as follows:

- First check whether the current node has the Controller role and is in the list of voting nodes configured by the controller.quorum.voters property; if so, call KafkaRaftClient.java#pollUnattachedAsVoter()
- The handling in KafkaRaftClient.java#pollUnattachedAsVoter() is very simple: its core is to call UnattachedState#hasElectionTimeoutExpired() to check whether the node's election timeout has expired; if it has, call KafkaRaftClient.java#transitionToCandidate() to switch the node's role to candidate. During the transition the node votes for itself
private long pollUnattached(long currentTimeMs) {
UnattachedState state = quorum.unattachedStateOrThrow();
if (quorum.isVoter()) {
return pollUnattachedAsVoter(state, currentTimeMs);
} else {
return pollUnattachedAsObserver(state, currentTimeMs);
}
}

private long pollUnattachedAsVoter(UnattachedState state, long currentTimeMs) {
GracefulShutdown shutdown = this.shutdown.get();
if (shutdown != null) {
// If shutting down, then remain in this state until either the
// shutdown completes or an epoch bump forces another state transition
return shutdown.remainingTimeMs();
} else if (state.hasElectionTimeoutExpired(currentTimeMs)) {
transitionToCandidate(currentTimeMs);
return 0L;
} else {
return state.remainingElectionTimeMs(currentTimeMs);
}
}
After the node's state changes to candidate, the next round of KafkaRaftClient.java#poll() eventually triggers KafkaRaftClient.java#pollCandidate(). Leaving this method's exceptional branches aside for the moment, let's look at how KafkaRaftClient.java#maybeSendVoteRequests() initiates Vote requests to the other nodes.

private long pollCandidate(long currentTimeMs) {
CandidateState state = quorum.candidateStateOrThrow();
GracefulShutdown shutdown = this.shutdown.get();

if (shutdown != null) {
// If we happen to shutdown while we are a candidate, we will continue
// with the current election until one of the following conditions is met:
// 1) we are elected as leader (which allows us to resign)
// 2) another leader is elected
// 3) the shutdown timer expires
long minRequestBackoffMs = maybeSendVoteRequests(state, currentTimeMs);
return Math.min(shutdown.remainingTimeMs(), minRequestBackoffMs);
} else if (state.isBackingOff()) {
if (state.isBackoffComplete(currentTimeMs)) {
logger.info("Re-elect as candidate after election backoff has completed");
transitionToCandidate(currentTimeMs);
return 0L;
}
return state.remainingBackoffMs(currentTimeMs);
} else if (state.hasElectionTimeoutExpired(currentTimeMs)) {
long backoffDurationMs = binaryExponentialElectionBackoffMs(state.retries());
logger.debug("Election has timed out, backing off for {}ms before becoming a candidate again",
backoffDurationMs);
state.startBackingOff(currentTimeMs, backoffDurationMs);
return backoffDurationMs;
} else {
long minRequestBackoffMs = maybeSendVoteRequests(state, currentTimeMs);
return Math.min(minRequestBackoffMs, state.remainingElectionTimeMs(currentTimeMs));
}
}
The source of KafkaRaftClient.java#maybeSendVoteRequests() is shown below. After layers of checks, the Vote request is sent to every node with voting rights. The core logic of the method ultimately invoked is:

- First build the RaftRequest.Outbound outbound request object and set a completion callback on it. The callback wraps the peer's response into a RaftResponse.Inbound inbound object and, via messageQueue.add(), places the response onto the message queue mentioned in step 2 of item 1 of this section
- Call KafkaNetworkChannel.scala#send() to put the outbound request onto the queue
private long maybeSendVoteRequests(
CandidateState state,
long currentTimeMs
) {
// Continue sending Vote requests as long as we still have a chance to win the election
if (!state.isVoteRejected()) {
return maybeSendRequests(
currentTimeMs,
state.unrecordedVoters(),
this::buildVoteRequest
);
}
return Long.MAX_VALUE;
}

private long maybeSendRequests(
long currentTimeMs,
Set<Integer> destinationIds,
Supplier<ApiMessage> requestSupplier
) {
long minBackoffMs = Long.MAX_VALUE;
for (Integer destinationId : destinationIds) {
long backoffMs = maybeSendRequest(currentTimeMs, destinationId, requestSupplier);
if (backoffMs < minBackoffMs) {
minBackoffMs = backoffMs;
}
}
return minBackoffMs;
}

private long maybeSendRequest(
long currentTimeMs,
int destinationId,
Supplier<ApiMessage> requestSupplier
) {
ConnectionState connection = requestManager.getOrCreate(destinationId);

if (connection.isBackingOff(currentTimeMs)) {
long remainingBackoffMs = connection.remainingBackoffMs(currentTimeMs);
logger.debug("Connection for {} is backing off for {} ms", destinationId, remainingBackoffMs);
return remainingBackoffMs;
}

if (connection.isReady(currentTimeMs)) {
int correlationId = channel.newCorrelationId();
ApiMessage request = requestSupplier.get();

RaftRequest.Outbound requestMessage = new RaftRequest.Outbound(
correlationId,
request,
destinationId,
currentTimeMs
);

requestMessage.completion.whenComplete((response, exception) -> {
if (exception != null) {
ApiKeys api = ApiKeys.forId(request.apiKey());
Errors error = Errors.forException(exception);
ApiMessage errorResponse = RaftUtil.errorResponse(api, error);

response = new RaftResponse.Inbound(
correlationId,
errorResponse,
destinationId
);
}

messageQueue.add(response);
});

channel.send(requestMessage);
logger.trace("Sent outbound request: {}", requestMessage);
connection.onRequestSent(correlationId, currentTimeMs);
return Long.MAX_VALUE;
}

return connection.remainingRequestTimeMs(currentTimeMs);
}
The implementation of KafkaNetworkChannel.scala#send() is below. Its core action is to call RaftSendThread.scala#sendRequest() to put the request onto the send queue. Note that a callback for the asynchronous request is set up here: once the request completes and the peer's response is received, the callbacks chain all the way back to step 1 of item 5 of this section.

override def send(request: RaftRequest.Outbound): Unit = {
def completeFuture(message: ApiMessage): Unit = {
val response = new RaftResponse.Inbound(
request.correlationId,
message,
request.destinationId
)
request.completion.complete(response)
}

def onComplete(clientResponse: ClientResponse): Unit = {
val response = if (clientResponse.versionMismatch != null) {
error(s"Request $request failed due to unsupported version error",
clientResponse.versionMismatch)
errorResponse(request.data, Errors.UNSUPPORTED_VERSION)
} else if (clientResponse.authenticationException != null) {
// For now we treat authentication errors as retriable. We use the
// `NETWORK_EXCEPTION` error code for lack of a good alternative.
// Note that `BrokerToControllerChannelManager` will still log the
// authentication errors so that users have a chance to fix the problem.
error(s"Request $request failed due to authentication error",
clientResponse.authenticationException)
errorResponse(request.data, Errors.NETWORK_EXCEPTION)
} else if (clientResponse.wasDisconnected()) {
errorResponse(request.data, Errors.BROKER_NOT_AVAILABLE)
} else {
clientResponse.responseBody.data
}
completeFuture(response)
}

endpoints.get(request.destinationId) match {
case Some(node) =>
requestThread.sendRequest(RequestAndCompletionHandler(
request.createdTimeMs,
destination = node,
request = buildRequest(request.data),
handler = onComplete
))

case None =>
completeFuture(errorResponse(request.data, Errors.BROKER_NOT_AVAILABLE))
}
}
RaftSendThread.scala#sendRequest() is just an enqueue operation. Given the inheritance structure of RaftSendThread mentioned in step 2 of section 2.2, once this thread starts, the core logic lives in InterBrokerSendThread.scala#doWork().

def sendRequest(request: RequestAndCompletionHandler): Unit = {
queue.add(request)
wakeup()
}
The key of InterBrokerSendThread.scala#doWork() is to trigger InterBrokerSendThread.scala#pollOnce(), whose core steps are:

- Call InterBrokerSendThread.scala#drainGeneratedRequests() to build the request objects required by the protocol
- Call InterBrokerSendThread.scala#sendRequests() to put the requests into the send buffer of the underlying network client NetworkClient
- Call NetworkClient.java#poll() to drive the underlying network send and receive
override def doWork(): Unit = {
pollOnce(Long.MaxValue)
}

protected def pollOnce(maxTimeoutMs: Long): Unit = {
try {
drainGeneratedRequests()
var now = time.milliseconds()
val timeout = sendRequests(now, maxTimeoutMs)
networkClient.poll(timeout, now)
now = time.milliseconds()
checkDisconnects(now)
failExpiredRequests(now)
unsentRequests.clean()
} catch {
case _: DisconnectException if !networkClient.active() =>
// DisconnectException is expected when NetworkClient#initiateClose is called
case e: FatalExitError => throw e
case t: Throwable =>
error(s"unhandled exception caught in InterBrokerSendThread", t)
// rethrow any unhandled exceptions as FatalExitError so the JVM will be terminated
// as we will be in an unknown state with potentially some requests dropped and not
// being able to make progress. Known and expected Errors should have been appropriately
// dealt with already.
throw new FatalExitError()
}
}
The key handling in InterBrokerSendThread.scala#drainGeneratedRequests() falls into two steps:

- Call the subclass's RaftSendThread.scala#generateRequests() to drain the request data from the upper-layer send queue into a Buffer object
- Call NetworkClient.java#newClientRequest() to convert each upper-layer request into the network-client request object ClientRequest and store it in the collection
private def drainGeneratedRequests(): Unit = {
generateRequests().foreach { request =>
unsentRequests.put(request.destination,
networkClient.newClientRequest(
request.destination.idString,
request.request,
request.creationTimeMs,
true,
requestTimeoutMs,
request.handler
))
}
}
The handling in RaftSendThread.scala#generateRequests() is straightforward and needs no further comment.

def generateRequests(): Iterable[RequestAndCompletionHandler] = {
val buffer = mutable.Buffer[RequestAndCompletionHandler]()
while (true) {
val request = queue.poll()
if (request == null) {
return buffer
} else {
buffer += request
}
}
buffer
}
Back at step 2 of item 8 of this section, InterBrokerSendThread.scala#sendRequests() simply takes requests out of the unsent collection and stores them into the network client's send buffer via NetworkClient.java#send(). I analyzed the working mechanism of the underlying network client in detail in Kafka 3.0 Source Notes (3) - Core Flow of the Kafka Consumer, so it is not repeated here.

private def sendRequests(now: Long, maxTimeoutMs: Long): Long = {
var pollTimeout = maxTimeoutMs
for (node <- unsentRequests.nodes.asScala) {
val requestIterator = unsentRequests.requestIterator(node)
while (requestIterator.hasNext) {
val request = requestIterator.next
if (networkClient.ready(node, now)) {
networkClient.send(request, now)
requestIterator.remove()
} else
pollTimeout = Math.min(pollTimeout, networkClient.connectionDelay(node, now))
}
}
pollTimeout
}

At this point the Vote request has been sent out. After the peer node receives the request, a chain of dispatching triggers KafkaRaftClient.java#handleVoteRequest(), whose core handling is:

- First validate the parameters carried by the request; if invalid, respond immediately
- Next compare the cluster epoch carried by the request with the local epoch. If the local epoch is smaller, the current node has fallen behind and must transition to the UnattachedState. Note that this is an important foundation of the backoff mechanism
- Finally compare the log end offset carried by the request with the local one, and decide the vote based on the node's current role state. If the current node grants the vote and is in the UnattachedState, it transitions to the VotedState

private VoteResponseData handleVoteRequest(
RaftRequest.Inbound requestMetadata
) {
VoteRequestData request = (VoteRequestData) requestMetadata.data;

if (!hasValidClusterId(request.clusterId())) {
return new VoteResponseData().setErrorCode(Errors.INCONSISTENT_CLUSTER_ID.code());
}

if (!hasValidTopicPartition(request, log.topicPartition())) {
// Until we support multi-raft, we treat individual topic partition mismatches as invalid requests
return new VoteResponseData().setErrorCode(Errors.INVALID_REQUEST.code());
}

VoteRequestData.PartitionData partitionRequest = request.topics().get(0).partitions().get(0);

int candidateId = partitionRequest.candidateId();
int candidateEpoch = partitionRequest.candidateEpoch();

int lastEpoch = partitionRequest.lastOffsetEpoch();
long lastEpochEndOffset = partitionRequest.lastOffset();
if (lastEpochEndOffset < 0 || lastEpoch < 0 || lastEpoch >= candidateEpoch) {
return buildVoteResponse(Errors.INVALID_REQUEST, false);
}

Optional<Errors> errorOpt = validateVoterOnlyRequest(candidateId, candidateEpoch);
if (errorOpt.isPresent()) {
return buildVoteResponse(errorOpt.get(), false);
}

if (candidateEpoch > quorum.epoch()) {
transitionToUnattached(candidateEpoch);
}

OffsetAndEpoch lastEpochEndOffsetAndEpoch = new OffsetAndEpoch(lastEpochEndOffset, lastEpoch);
boolean voteGranted = quorum.canGrantVote(candidateId, lastEpochEndOffsetAndEpoch.compareTo(endOffset()) >= 0);

if (voteGranted && quorum.isUnattached()) {
transitionToVoted(candidateId, candidateEpoch);
}

logger.info("Vote request {} with epoch {} is {}", request, candidateEpoch, voteGranted ? "granted" : "rejected");
return buildVoteResponse(Errors.NONE, voteGranted);
}
After the peer node finishes processing the Vote request, its response is put onto the message queue by the request-completion callback mentioned in step 5 of this section, eventually triggering KafkaRaftClient.java#handleVoteResponse() on the current node. The core flow of this method is:

- If the peer voted yes, call CandidateState#recordGrantedVote() to tally the vote, then call KafkaRaftClient.java#maybeTransitionToLeader() to check whether the granted votes exceed half of the voting nodes; if so, the current node transitions to the LeaderState and becomes the cluster Leader
- If the peer voted no, record that too, and check whether the rejections have reached a majority; if so, call CandidateState#startBackingOff() to back off and avoid an election deadlock among multiple candidates
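The vote-counting rule above (win on a strict majority of grants, back off once a majority can no longer be reached) can be sketched as follows. The VoteTally class is hypothetical and only models the counting, not Kafka's actual CandidateState:

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical tally modeling the majority checks described above
final class VoteTally {
    private final int voterCount;
    private final Set<Integer> granted = new HashSet<>();
    private final Set<Integer> rejected = new HashSet<>();

    VoteTally(int voterCount, int localId) {
        this.voterCount = voterCount;
        granted.add(localId); // a candidate always votes for itself first
    }

    void recordGrantedVote(int voterId) { granted.add(voterId); }
    void recordRejectedVote(int voterId) { rejected.add(voterId); }

    // Won: granted votes form a strict majority of all voters
    boolean isVoteGranted() { return granted.size() > voterCount / 2; }

    // Lost: even if every remaining voter granted, a majority is unreachable
    boolean isVoteRejected() { return voterCount - rejected.size() < voterCount / 2 + 1; }
}
```

For a 3-voter quorum, one grant from a peer (plus the candidate's own vote) wins the election, while two rejections make it unwinnable and trigger the backoff.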
private boolean handleVoteResponse(
RaftResponse.Inbound responseMetadata,
long currentTimeMs
) {
int remoteNodeId = responseMetadata.sourceId();
VoteResponseData response = (VoteResponseData) responseMetadata.data;
Errors topLevelError = Errors.forCode(response.errorCode());
if (topLevelError != Errors.NONE) {
return handleTopLevelError(topLevelError, responseMetadata);
}

if (!hasValidTopicPartition(response, log.topicPartition())) {
return false;
}

VoteResponseData.PartitionData partitionResponse =
response.topics().get(0).partitions().get(0);

Errors error = Errors.forCode(partitionResponse.errorCode());
OptionalInt responseLeaderId = optionalLeaderId(partitionResponse.leaderId());
int responseEpoch = partitionResponse.leaderEpoch();

Optional<Boolean> handled = maybeHandleCommonResponse(
error, responseLeaderId, responseEpoch, currentTimeMs);
if (handled.isPresent()) {
return handled.get();
} else if (error == Errors.NONE) {
if (quorum.isLeader()) {
logger.debug("Ignoring vote response {} since we already became leader for epoch {}",
partitionResponse, quorum.epoch());
} else if (quorum.isCandidate()) {
CandidateState state = quorum.candidateStateOrThrow();
if (partitionResponse.voteGranted()) {
state.recordGrantedVote(remoteNodeId);
maybeTransitionToLeader(state, currentTimeMs);
} else {
state.recordRejectedVote(remoteNodeId);

// If our vote is rejected, we go immediately to the random backoff. This
// ensures that we are not stuck waiting for the election timeout when the
// vote has become gridlocked.
if (state.isVoteRejected() && !state.isBackingOff()) {
logger.info("Insufficient remaining votes to become leader (rejected by {}). " +
"We will backoff before retrying election again", state.rejectingVoters());

state.startBackingOff(
currentTimeMs,
binaryExponentialElectionBackoffMs(state.retries())
);
}
}
} else {
logger.debug("Ignoring vote response {} since we are no longer a candidate in epoch {}",
partitionResponse, quorum.epoch());
}
return true;
} else {
return handleUnexpectedError(error, responseMetadata);
}
}
2.4 Breaking election deadlock: the backoff mechanism
Imagine multiple voting nodes in the cluster starting up at the same time, all for the first time: very likely every one of them transitions to the candidate state. Even when they receive Vote requests from other candidates, they will not grant their votes, and the election is deadlocked. Kafka handles this situation with a backoff mechanism, roughly as shown in the figure below:

The core of the backoff mechanism is to use the controller.quorum.election.backoff.max.ms configuration to set a random backoff timeout. KafkaRaftClient.java#pollCandidate() checks whether a candidate node is backing off, and a backing-off candidate stops sending Vote requests. Once its backoff timeout expires, the candidate that leaves the backoff state first re-initiates the Vote; its request now carries a cluster epoch bumped by one version, so the other candidates that receive it fall back to the UnattachedState because their epoch is behind, and can then grant the vote. The election deadlock is broken.
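The randomized exponential backoff described above can be sketched as follows. The exact formula inside KafkaRaftClient#binaryExponentialElectionBackoffMs differs in detail; this illustration only shows the two properties that matter, doubling with each failed round and random jitter bounded by controller.quorum.election.backoff.max.ms (the class and base delay here are hypothetical):

```java
import java.util.Random;

// Hypothetical sketch of a randomized binary-exponential election backoff
final class ElectionBackoff {
    private final int backoffMaxMs; // controller.quorum.election.backoff.max.ms
    private final Random random;

    ElectionBackoff(int backoffMaxMs, Random random) {
        this.backoffMaxMs = backoffMaxMs;
        this.random = random;
    }

    // Doubles a base delay per retry, caps it at the configured maximum,
    // and adds jitter so competing candidates rarely wake at the same time
    int backoffMs(int retries) {
        int baseMs = 50; // illustrative base delay
        long doubled = (long) baseMs << Math.min(retries, 20); // avoid overflow
        int bound = (int) Math.min(doubled, backoffMaxMs);
        return 1 + random.nextInt(bound); // uniform in [1, bound]
    }
}
```

Because each candidate draws a different random delay, one of them usually times out first, bumps its epoch, and wins the election before the others resume.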