When Kafka data is unbalanced, or a broker needs to be decommissioned and its data migrated, the usual procedure is these 3 steps:
1)./kafka-reassign-partitions.sh --zookeeper xxx:2181/kafka --topics-to-move-json-file topics-to-move.json --broker-list "xxx" --generate
2)./kafka-reassign-partitions.sh --zookeeper xxx:2181/kafka --reassignment-json-file resign.json --execute
3)./kafka-reassign-partitions.sh --zookeeper xxx:2181/kafka --reassignment-json-file resign.json --verify
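For reference, topics-to-move.json in step 1 is the standard input format of kafka-reassign-partitions.sh: a JSON object listing the topics whose partitions are candidates for movement (topic names below are placeholders):

{
  "version": 1,
  "topics": [
    { "topic": "topicA" },
    { "topic": "topicB" }
  ]
}

resign.json in steps 2 and 3 is the reassignment plan itself, i.e. the "Proposed partition reassignment configuration" JSON printed by step 1 (or by the tool below) saved to a file.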
However, the reassignment plan produced by --generate in step 1 typically reassigns every partition of the listed topics, causing unnecessary migration. For example, suppose topicA's partition1 lives on node1 and we want to decommission node2: node2 holds partitions of topicA, so those must be moved, but there is no need to move the ones on node1. The tool below generates the reassignment plan itself, replacing only the offline broker in each replica list; it substitutes for the command in step 1. Run it as follows:
Package the source below into xxx.jar (with the kafka and scala dependencies on the classpath), then run: java -cp xxx.jar com.bigdata.tools.kafka.PartitionReassignment010 --zookeeper <kafka ZK root path> --offlineid <id of the broker to decommission>
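Before the full source, here is a minimal, self-contained sketch of the replacement rule it implements (the broker ids and replica lists below are made up; the real tool reads them from ZooKeeper, and it also gives up after two full passes over the broker list instead of searching forever):

object ReplaceSketch {
  def main(args: Array[String]): Unit = {
    val alive = List(1, 3, 4)                   // sorted surviving brokers
    val offline = 2                             // broker being decommissioned
    var rr = 0                                  // round-robin cursor shared across partitions
    val current = List(List(1, 2), List(1, 2), List(1, 3))
    val proposed = current.map { replicas =>
      if (!replicas.contains(offline)) replicas // partitions not on the offline broker stay put
      else {
        // pick the next surviving broker that does not already hold a replica
        val candidate = Iterator.continually { val b = alive(rr % alive.size); rr += 1; b }
          .find(b => !replicas.contains(b)).get
        replicas.updated(replicas.indexOf(offline), candidate)
      }
    }
    println(proposed)                           // List(List(1, 3), List(1, 4), List(1, 3))
  }
}

Note how the round-robin cursor carries over from one partition to the next, so the replicas leaving the offline broker are spread across the survivors rather than piling onto one of them.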
The source code is as follows:
package com.bigdata.tools.kafka

import joptsimple.OptionParser
import kafka.common.TopicAndPartition
import kafka.utils.ZkUtils
import org.apache.kafka.common.security.JaasUtils

import scala.collection.{Seq, mutable}

/**
 * @author :sfw
 * date: 2020/5/28 14:56.
 * description:
 * Given the id of the Kafka broker being decommissioned, generate a new assignment for
 * kafka-reassign-partitions.sh to consume.
 * We do not use the assignment produced by kafka-reassign-partitions.sh --generate because the
 * goal is not to reshuffle and redistribute these partitions, but to move the partitions hosted
 * on the broker being decommissioned onto the surviving brokers, minimizing unnecessary
 * partition movement.
 * The assignment logic: fetch the current partition assignment of each affected topic, then
 * replace the offline broker in each replica list, picking replacements round-robin.
 */
class PartitionReassignment010 {
}

object PartitionReassignment010 {

  def main(args: Array[String]) {
    val opts = new ReassignPartitionsOptions(args)
    val zkConnect = opts.options.valueOf(opts.zkConnectOpt)
    val offlineBrokerid = opts.options.valueOf(opts.offlineBrokerid)
    val zkUtils = ZkUtils.apply(zkConnect, 30000, 30000, JaasUtils.isZkSecurityEnabled)
    printPartitionReassignment(zkUtils, offlineBrokerid)
  }

  def printPartitionReassignment(zkUtils: ZkUtils, offlineBrokerid: Int): Unit = {
    // current replica assignment of every topic
    val allReplicaAssignment = zkUtils.getReplicaAssignmentForTopics(zkUtils.getAllTopics())
    // brokers currently registered in ZK; this may still include the offline broker,
    // hence the explicit offlineBrokerid != replaceBrokerid check below
    val aliveBrokerids = zkUtils.getSortedBrokerList()
    // topics with at least one replica on the broker being decommissioned
    var reassignTopic: Set[String] = Set()
    allReplicaAssignment.foreach(item => {
      if (item._2.contains(offlineBrokerid)) {
        reassignTopic += item._1.topic
      }
    })
    // current assignment of the affected topics
    val currentAssignments = zkUtils.getReplicaAssignmentForTopics(reassignTopic.toSeq)
    var proposedAssignments: mutable.Map[TopicAndPartition, Seq[Int]] = mutable.Map()
    // round-robin cursor over aliveBrokerids, shared across partitions to spread the load
    var roundIndex = 0
    // number of passes over the broker list while searching a replacement for the current
    // partition; at most 2 passes -- failing both means there are not enough brokers left
    // to hold all replicas
    var circleNum = 1
    currentAssignments.foreach(item => {
      var breakFlag = false
      while (roundIndex < aliveBrokerids.size && !breakFlag) {
        if (!item._2.contains(offlineBrokerid)) {
          // this partition has no replica on the offline broker; keep it as-is
          proposedAssignments += (item._1 -> item._2)
          breakFlag = true
          circleNum = 1
        } else {
          val replaceBrokerid = aliveBrokerids(roundIndex)
          roundIndex += 1
          // the replacement must not be the offline broker and must not already hold a replica
          if (offlineBrokerid != replaceBrokerid && !item._2.contains(replaceBrokerid)) {
            val replaceSeq = item._2.updated(item._2.indexOf(offlineBrokerid), replaceBrokerid)
            proposedAssignments += (item._1 -> replaceSeq)
            breakFlag = true
            circleNum = 1
          }
        }
        if (roundIndex >= aliveBrokerids.size) {
          if (2 == circleNum) {
            throw new Exception("Cannot find enough brokers to place the replicas")
          }
          roundIndex = 0
          circleNum += 1
        }
      }
    })
    println("Current partition replica assignment\n%s\n".format(ZkUtils.formatAsReassignmentJson(currentAssignments)))
    println("Proposed partition reassignment configuration\n%s".format(ZkUtils.formatAsReassignmentJson(proposedAssignments)))
  }

  class ReassignPartitionsOptions(args: Array[String]) {
    val parser = new OptionParser
    val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED: The connection string for the zookeeper connection in the " +
      "form host:port. Multiple URLS can be given to allow fail-over.")
      .withRequiredArg
      .describedAs("urls")
      .ofType(classOf[String])
    val offlineBrokerid = parser.accepts("offlineid", "REQUIRED: the offline brokerid")
      .withRequiredArg
      .describedAs("offlineBrokerid")
      .ofType(classOf[Int])
    val options = parser.parse(args: _*)
  }
}
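The two println calls emit the same JSON format that kafka-reassign-partitions.sh consumes. With the hypothetical setup from the sketch above (topicA with two partitions on brokers 1/2 and 1/3, broker 2 being decommissioned), the output would look roughly like:

Current partition replica assignment
{"version":1,"partitions":[{"topic":"topicA","partition":0,"replicas":[1,2]},{"topic":"topicA","partition":1,"replicas":[1,3]}]}

Proposed partition reassignment configuration
{"version":1,"partitions":[{"topic":"topicA","partition":0,"replicas":[1,3]},{"topic":"topicA","partition":1,"replicas":[1,3]}]}

Save the proposed JSON to resign.json and feed it to steps 2 and 3 above; only the replica lists that contained broker 2 change, so only the minimum necessary data is moved.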
