KafkaController is the control and management module of a Kafka cluster, responsible for cluster-management functions such as Topic creation, partition reassignment, and re-election of partition replica leaders. Although every Broker Server runs a KafkaController module, exactly one KafkaController is in the leader state and provides management services externally. The following describes KafkaController's election strategy.
Inside Kafka, every Broker Server starts a KafkaController module, but only one KafkaController becomes the Leader while the rest remain Followers. KafkaController election is implemented through ZooKeeper: when a KafkaController module starts, it registers an ephemeral node at the same ZooKeeper path as every other controller. Since only one registration can succeed, the controller that registers successfully becomes the Leader and the others become Followers. Because the node is ephemeral, it disappears when the Leader KafkaController goes offline; the KafkaControllers in the Follower state observe this change and each try to create the node again. In short, ZooKeeper guarantees the atomicity of multiple ZooKeeper clients creating an ephemeral node at the same path. The rough flow is as follows:
The KafkaController election flow
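To make the flow concrete, here is a minimal sketch of ephemeral-node leader election using the zkclient library that Kafka builds on; the ElectionCandidate class is illustrative, not Kafka's actual code:

import org.I0Itec.zkclient.ZkClient
import org.I0Itec.zkclient.exception.ZkNodeExistsException

// Every broker races to create the same ephemeral znode. ZooKeeper
// guarantees that at most one concurrent create on a path succeeds,
// so the winner becomes the leader; the node vanishes with its session.
class ElectionCandidate(zkClient: ZkClient, brokerId: Int) {
  def tryBecomeLeader(): Boolean =
    try {
      zkClient.createEphemeral("/controller", brokerId.toString)
      true // our create won the race: this broker is the leader
    } catch {
      case _: ZkNodeExistsException => false // another broker is already leader
    }
}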
The class responsible for leader election in the KafkaController module is ZookeeperLeaderElector, whose constructor is as follows:
class ZookeeperLeaderElector(controllerContext: ControllerContext,
                             electionPath: String,
                             onBecomingLeader: () => Unit,
                             onResigningAsLeader: () => Unit,
                             brokerId: Int)
  extends LeaderElector with Logging {
  ……
}
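For context, KafkaController wires the elector up roughly like this (a sketch based on Kafka 0.8.x; the callback names and config field are assumptions, not quoted source):

// Sketch: how KafkaController might construct its elector
val controllerElector = new ZookeeperLeaderElector(
  controllerContext,
  "/controller",           // electionPath: the znode all controllers race for
  onControllerFailover,    // invoked when this broker wins the election
  onControllerResignation, // invoked when this broker gives up leadership
  config.brokerId)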
Here ControllerContext is the Controller's context, containing the metadata of the current Topics as well as the metadata of the cluster; electionPath is the path that multiple controllers compete to write, whose value is /controller; onBecomingLeader is the callback invoked on becoming the Leader; onResigningAsLeader is the callback invoked when resigning leadership (returning to the Follower state); and brokerId is the id of the current Broker Server. ZookeeperLeaderElector's startup function is as follows:
def startup {
  inLock(controllerContext.controllerLock) {
    // Watch the data node so that another election can be triggered
    // when the node disappears
    controllerContext.zkClient.subscribeDataChanges(electionPath, leaderChangeListener)
    // Initial election; once it finishes, this broker is either Leader or Follower
    elect
  }
}
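leaderChangeListener, registered above, reacts to the /controller node changing or disappearing. Here is a sketch modeled on the inner LeaderChangeListener in Kafka 0.8.x (simplified, so treat the details as assumptions):

class LeaderChangeListener extends IZkDataListener {
  // Another broker (re)wrote /controller: record who the new leader is
  def handleDataChange(dataPath: String, data: Object) {
    inLock(controllerContext.controllerLock) {
      leaderId = KafkaController.parseControllerId(data.toString)
    }
  }
  // /controller disappeared: the old leader is gone, so run another election
  def handleDataDeleted(dataPath: String) {
    inLock(controllerContext.controllerLock) {
      if (amILeader)
        onResigningAsLeader() // we were leader but our node vanished
      elect
    }
  }
}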
As the code above shows, the election process is mainly implemented in elect:
def elect: Boolean = {
  val timestamp = SystemTime.milliseconds.toString
  // Assemble the data to be written to /controller
  val electString = Json.encode(Map("version" -> 1, "brokerid" -> brokerId, "timestamp" -> timestamp))
  // Read the data from the /controller node
  leaderId = getControllerID
  // If data was read, a leader already exists
  if (leaderId != -1) {
    debug("Broker %d has been elected as leader, so stopping the election process.".format(leaderId))
    return amILeader
  }
  try {
    // Try to write the data to /controller
    createEphemeralPathExpectConflictHandleZKBug(controllerContext.zkClient,
      electionPath,
      electString,
      brokerId,
      (controllerString: String, leaderId: Any) =>
        KafkaController.parseControllerId(controllerString) == leaderId.asInstanceOf[Int],
      controllerContext.zkSessionTimeout)
    // The write succeeded
    info(brokerId + " successfully elected as leader")
    leaderId = brokerId
    onBecomingLeader()
  } catch {
    // Lost the race: some other broker has already written the path
    case e: ZkNodeExistsException =>
      leaderId = getControllerID
      if (leaderId != -1)
        debug("Broker %d was elected as leader instead of broker %d".format(leaderId, brokerId))
      else
        warn("A leader has been elected but just resigned, this will result in another round of election")
    case e2: Throwable =>
      error("Error while electing or becoming leader on broker %d".format(brokerId), e2)
      resign()
  }
  amILeader
}
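elect also relies on a few small helpers. Here is a sketch of what they look like, modeled on Kafka 0.8.x's ZookeeperLeaderElector (simplified, so treat the details as assumptions):

def getControllerID: Int = {
  // Read /controller; -1 means the node does not exist, i.e. no leader yet
  ZkUtils.readDataMaybeNull(controllerContext.zkClient, electionPath)._1 match {
    case Some(controller) => KafkaController.parseControllerId(controller)
    case None => -1
  }
}

def amILeader: Boolean = leaderId == brokerId

def resign() = {
  // Forget the leader and delete our znode so a fresh election can start
  leaderId = -1
  ZkUtils.deletePath(controllerContext.zkClient, electionPath)
}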
When a ZooKeeper client session times out, ZooKeeper releases the ephemeral node that the client previously created. If ZooKeeper is suspended at that moment, deletion of the ephemeral node is delayed; if the client reconnects in the meantime, it assumes its previous node has already been deleted and tries to create it again, at which point it receives a NodeExists exception. Once ZooKeeper recovers from the suspended state, it deletes the old ephemeral node, and the reconnected client then observes the ephemeral node disappearing even though it has not disconnected from ZooKeeper.
Therefore, when KafkaController creates its ephemeral node, it needs to work around this problem in how the ZooKeeper cluster releases ephemeral nodes. The process is as follows:
def createEphemeralPathExpectConflictHandleZKBug(zkClient: ZkClient,
                                                 path: String,
                                                 data: String,
                                                 expectedCallerData: Any,
                                                 checker: (String, Any) => Boolean,
                                                 backoffTime: Int): Unit = {
  while (true) {
    try {
      createEphemeralPathExpectConflict(zkClient, path, data)
      return
    } catch {
      case e: ZkNodeExistsException =>
        ZkUtils.readDataMaybeNull(zkClient, path)._1 match {
          case Some(writtenData) =>
            if (checker(writtenData, expectedCallerData)) {
              // The stale node is our own: wait for ZooKeeper to delete it,
              // then retry the creation
              Thread.sleep(backoffTime)
            } else {
              throw e
            }
          case None => // the node disappeared; retry creating the ephemeral node immediately
        }
      case e2: Throwable => throw e2
    }
  }
}
As we can see, when KafkaController writes the ephemeral node's data and hits a ZkNodeExistsException, it checks whether the existing ephemeral node is its own. If so, it waits for a while so that ZooKeeper can delete the stale ephemeral node, and then writes the ephemeral node data again.
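The retry loop above delegates the actual write to createEphemeralPathExpectConflict. For completeness, here is a sketch of that helper modeled on Kafka 0.8.x's ZkUtils; treat the details as assumptions rather than quoted source:

def createEphemeralPathExpectConflict(client: ZkClient, path: String, data: String): Unit = {
  try {
    createEphemeralPath(client, path, data)
  } catch {
    case e: ZkNodeExistsException =>
      // This can happen after a connection loss: our earlier create may have
      // actually succeeded. Only treat it as a conflict if the stored data
      // is not what we intended to write.
      var storedData: String = null
      try {
        storedData = ZkUtils.readData(client, path)._1
      } catch {
        case _: ZkNoNodeException => // the node just disappeared; fall through
      }
      if (storedData == null || storedData != data)
        throw e
      // otherwise the data is ours: the create effectively succeeded
  }
}

Combined with the backoff loop, this distinguishes "my own stale node from a previous session" (wait and retry) from "a different broker genuinely won the race" (propagate the exception).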