Skip to content

Commit 7f3f41f

Browse files
committed
[LI-HOTFIX] Add dynamic maintenance broker config
TICKET = KAFKA-8527 LI_DESCRIPTION = When a broker is marked as a maintenance broker, Kafka will not assign partitions of new topics to the broker if partition assignment is done by Kafka brokers. 1) add dynamic broker config to support maintenance brokers 2) add logic to update maintenance broker info in controller 3) support maintenance broker in Kafka API 4) support maintenance broker in AdminZkClient 5) add logic to rearrange replica assignment for new topics for maintenance brokers 6) test cases RB=1484649 G=Kafka-Code-Reviews R=agencer, amendhek, jkoshy, zahuang A=zahuang EXIT_CRITERIA = TICKET [KAFKA-8527]
1 parent ca4a695 commit 7f3f41f

File tree

8 files changed

+383
-12
lines changed

8 files changed

+383
-12
lines changed

core/src/main/scala/kafka/controller/KafkaController.scala

Lines changed: 43 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,7 @@ class KafkaController(val config: KafkaConfig,
9595
threadNamePrefix: Option[String] = None)
9696
extends ControllerEventProcessor with Logging with KafkaMetricsGroup {
9797

98+
val adminZkClient = new AdminZkClient(zkClient)
9899
this.logIdent = s"[Controller id=${config.brokerId}] "
99100

100101
@volatile private var brokerInfo = initialBrokerInfo
@@ -180,6 +181,13 @@ class KafkaController(val config: KafkaConfig,
180181
}
181182
)
182183

184+
newGauge(
185+
"MaintenanceBrokerCount",
186+
new Gauge[Int] {
187+
def value: Int = if (isActive) config.getMaintenanceBrokerList.size else 0
188+
}
189+
)
190+
183191
/**
184192
* Returns true if this broker is the current controller.
185193
*/
@@ -695,6 +703,7 @@ class KafkaController(val config: KafkaConfig,
695703
controllerContext.setLiveBrokerAndEpochs(curBrokerAndEpochs)
696704
info(s"Initialized broker epochs cache: ${controllerContext.liveBrokerIdAndEpochs}")
697705
controllerContext.allTopics = zkClient.getAllTopicsInCluster.toSet
706+
rearrangePartitionReplicaAssignmentForNewTopics(controllerContext.allTopics)
698707
registerPartitionModificationsHandlers(controllerContext.allTopics.toSeq)
699708
getReplicaAssignmentPolicyCompliant(controllerContext.allTopics.toSet).foreach {
700709
case (topicPartition, assignedReplicas) => controllerContext.updatePartitionReplicaAssignment(topicPartition, assignedReplicas)
@@ -770,6 +779,37 @@ class KafkaController(val config: KafkaConfig,
770779
}
771780
}
772781

// Rearrange partition and replica assignment for new topics whose replicas landed on
// maintenance brokers (brokers that must not take new partitions). Best-effort: any
// non-fatal failure is logged and swallowed so the controller keeps running.
private def rearrangePartitionReplicaAssignmentForNewTopics(topics: Set[String]) {
  import scala.util.control.NonFatal
  try {
    val noNewPartitionBrokerIds = config.getMaintenanceBrokerList
    if (noNewPartitionBrokerIds.nonEmpty) {
      // Topics whose partitions znode does not exist yet are "new": their assignment
      // has not been materialized, so it is still safe to rewrite it.
      val newTopics = zkClient.getPartitionNodeNonExistsTopics(topics)
      // Only rewrite topics that actually placed a replica on a maintenance broker.
      val newTopicsToBeArranged = zkClient.getPartitionAssignmentForTopics(newTopics).filter {
        case (_, partitionMap) =>
          partitionMap.exists { case (_, assignedReplicas) =>
            assignedReplicas.intersect(noNewPartitionBrokerIds).nonEmpty
          }
      }
      newTopicsToBeArranged.foreach { case (topic, partitionMap) =>
        val numPartitions = partitionMap.size
        // headOption guards against an (unexpected) empty assignment map instead of
        // throwing NoSuchElementException via .head.
        partitionMap.headOption.foreach { case (_, replicas) =>
          val numReplica = replicas.size
          val brokers = controllerContext.liveOrShuttingDownBrokers.map { b =>
            kafka.admin.BrokerMetadata(b.id, b.rack)
          }.toSeq

          val replicaAssignment =
            adminZkClient.assignReplicasToAvailableBrokers(brokers, noNewPartitionBrokerIds.toSet, numPartitions, numReplica)
          adminZkClient.writeTopicPartitionAssignment(topic, replicaAssignment, true)
          info(s"Rearrange partition and replica assignment for topic [$topic]")
        }
      }
    }
  } catch {
    // NonFatal: do not swallow OutOfMemoryError/InterruptedException etc.; log the
    // full stack trace rather than only the message.
    case NonFatal(e) =>
      error("Error during rearranging partition and replica assignment for new topics for maintenance brokers", e)
  }
}
773813
private def moveReassignedPartitionLeaderIfRequired(topicPartition: TopicPartition,
774814
reassignedPartitionContext: ReassignedPartitionsContext) {
775815
val reassignedReplicas = reassignedPartitionContext.newReplicas
@@ -1320,6 +1360,7 @@ class KafkaController(val config: KafkaConfig,
13201360
val newTopics = topics -- controllerContext.allTopics
13211361
val deletedTopics = controllerContext.allTopics -- topics
13221362
controllerContext.allTopics = topics
1363+
rearrangePartitionReplicaAssignmentForNewTopics(newTopics)
13231364

13241365
registerPartitionModificationsHandlers(newTopics.toSeq)
13251366
val addedPartitionReplicaAssignment = getReplicaAssignmentPolicyCompliant(newTopics)
@@ -1593,11 +1634,12 @@ class KafkaController(val config: KafkaConfig,
15931634

15941635
val replicationFactor = config.defaultReplicationFactor
15951636
val brokers = controllerContext.liveOrShuttingDownBrokers.map { sb => kafka.admin.BrokerMetadata(sb.id, sb.rack) }.toSeq
1637+
val noNewPartitionBrokerIds = config.getMaintenanceBrokerList.toSet
15961638

15971639
topicsReplicaAssignment.foreach{
15981640
case(topic, partitionAssignment) => {
15991641
val numPartitions = partitionAssignment.size
1600-
val assignment = AdminUtils.assignReplicasToBrokers(brokers, numPartitions, replicationFactor)
1642+
val assignment = adminZkClient.assignReplicasToAvailableBrokers(brokers, noNewPartitionBrokerIds, numPartitions, replicationFactor)
16011643
.map{ case(partition, replicas) => {
16021644
(new TopicPartition(topic, partition), replicas)
16031645
}}.toMap

core/src/main/scala/kafka/server/AdminManager.scala

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ package kafka.server
1818

1919
import java.util.{Collections, Properties}
2020

21-
import kafka.admin.{AdminOperationException, AdminUtils}
21+
import kafka.admin.AdminOperationException
2222
import kafka.common.TopicAlreadyMarkedForDeletionException
2323
import kafka.log.LogConfig
2424
import kafka.metrics.KafkaMetricsGroup
@@ -96,7 +96,7 @@ class AdminManager(val config: KafkaConfig,
9696
"Both cannot be used at the same time.")
9797
}
9898
val assignments = if (topic.assignments().isEmpty) {
99-
AdminUtils.assignReplicasToBrokers(brokers, topic.numPartitions, topic.replicationFactor)
99+
adminZkClient.assignReplicasToAvailableBrokers(brokers, config.getMaintenanceBrokerList.toSet, topic.numPartitions, topic.replicationFactor)
100100
} else {
101101
val assignments = new mutable.HashMap[Int, Seq[Int]]
102102
// Note: we don't check that replicaAssignment contains unknown brokers - unlike in add-partitions case,
@@ -276,7 +276,8 @@ class AdminManager(val config: KafkaConfig,
276276
}
277277

278278
val updatedReplicaAssignment = adminZkClient.addPartitions(topic, existingAssignment, allBrokers,
279-
newPartition.totalCount, reassignment, validateOnly = validateOnly)
279+
newPartition.totalCount, reassignment, validateOnly = validateOnly,
280+
noNewPartitionBrokerIds = config.getMaintenanceBrokerList.toSet)
280281
CreatePartitionsMetadata(topic, updatedReplicaAssignment, ApiError.NONE)
281282
} catch {
282283
case e: AdminOperationException =>
@@ -543,7 +544,10 @@ class AdminManager(val config: KafkaConfig,
543544
}
544545

545546
private def configType(name: String, synonyms: List[String]): ConfigDef.Type = {
546-
val configType = config.typeOf(name)
547+
var configType = config.typeOf(name)
548+
if (configType == null)
549+
configType = DynamicConfig.Broker.typeOf(name)
550+
547551
if (configType != null)
548552
configType
549553
else

core/src/main/scala/kafka/server/DynamicBrokerConfig.scala

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,10 @@ object DynamicBrokerConfig {
130130
checkInvalidProps(securityConfigsWithoutListenerPrefix(props),
131131
"These security configs can be dynamically updated only per-listener using the listener prefix")
132132
validateConfigTypes(props)
133-
if (!perBrokerConfig) {
133+
if (perBrokerConfig) {
134+
checkInvalidProps(clusterLevelConfigs(props),
135+
"Cannot update these configs at per broker level, broker id must not be specified")
136+
} else {
134137
checkInvalidProps(perBrokerConfigs(props),
135138
"Cannot update these configs at default cluster level, broker id must be specified")
136139
}
@@ -147,6 +150,11 @@ object DynamicBrokerConfig {
147150
configNames.intersect(PerBrokerConfigs) ++ configNames.filter(perBrokerListenerConfig)
148151
}
149152

// Names in `props` that may only be set at the cluster-wide default level
// (never per individual broker).
private def clusterLevelConfigs(props: Properties): Set[String] =
  props.asScala.keySet.intersect(DynamicConfig.Broker.ClusterLevelConfigs)
157+
150158
private def nonDynamicConfigs(props: Properties): Set[String] = {
151159
props.asScala.keySet.intersect(DynamicConfig.Broker.nonDynamicProps)
152160
}
@@ -311,6 +319,11 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
311319
}
312320
}
313321

// Cluster-default maintenance broker list parsed from the dynamic default configs.
// Read under the config read lock for a consistent snapshot.
private[server] def getMaintenanceBrokerList: Seq[Int] = CoreUtils.inReadLock(lock) {
  val raw = dynamicDefaultConfigs.getOrElse(DynamicConfig.Broker.MaintenanceBrokerListProp,
    DynamicConfig.Broker.DefaultMaintenanceBrokerList)
  DynamicConfig.Broker.getMaintenanceBrokerListFromString(raw.toString)
}
326+
314327
private def maybeCreatePasswordEncoder(secret: Option[Password]): Option[PasswordEncoder] = {
315328
secret.map { secret =>
316329
new PasswordEncoder(secret,

core/src/main/scala/kafka/server/DynamicConfig.scala

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,11 @@ import java.util.Properties
2121

2222
import kafka.log.LogConfig
2323
import kafka.security.CredentialProvider
24-
import org.apache.kafka.common.config.ConfigDef
24+
import org.apache.kafka.common.config.{ConfigDef, ConfigException}
2525
import org.apache.kafka.common.config.ConfigDef.Importance._
2626
import org.apache.kafka.common.config.ConfigDef.Range._
2727
import org.apache.kafka.common.config.ConfigDef.Type._
28+
import org.apache.kafka.common.config.ConfigDef.Validator
2829

2930
import scala.collection.JavaConverters._
3031

@@ -39,9 +40,11 @@ object DynamicConfig {
3940
val LeaderReplicationThrottledRateProp = "leader.replication.throttled.rate"
4041
val FollowerReplicationThrottledRateProp = "follower.replication.throttled.rate"
4142
val ReplicaAlterLogDirsIoMaxBytesPerSecondProp = "replica.alter.log.dirs.io.max.bytes.per.second"
43+
val MaintenanceBrokerListProp = "maintenance.broker.list"
4244

4345
//Defaults
4446
val DefaultReplicationThrottledRate = ReplicationQuotaManagerConfig.QuotaBytesPerSecondDefault
47+
val DefaultMaintenanceBrokerList: String = ""
4548

4649
//Documentation
4750
val LeaderReplicationThrottledRateDoc = "A long representing the upper bound (bytes/sec) on replication traffic for leaders enumerated in the " +
@@ -52,19 +55,45 @@ object DynamicConfig {
5255
s"limit be kept above 1MB/s for accurate behaviour."
5356
val ReplicaAlterLogDirsIoMaxBytesPerSecondDoc = "A long representing the upper bound (bytes/sec) on disk IO used for moving replica between log directories on the same broker. " +
5457
s"This property can be only set dynamically. It is suggested that the limit be kept above 1MB/s for accurate behaviour."
58+
val MaintenanceBrokerListDoc = "A list containing maintenance broker Ids, separated by comma"
5559

5660
//Definitions
5761
private val brokerConfigDef = new ConfigDef()
5862
//round minimum value down, to make it easier for users.
5963
.define(LeaderReplicationThrottledRateProp, LONG, DefaultReplicationThrottledRate, atLeast(0), MEDIUM, LeaderReplicationThrottledRateDoc)
6064
.define(FollowerReplicationThrottledRateProp, LONG, DefaultReplicationThrottledRate, atLeast(0), MEDIUM, FollowerReplicationThrottledRateDoc)
6165
.define(ReplicaAlterLogDirsIoMaxBytesPerSecondProp, LONG, DefaultReplicationThrottledRate, atLeast(0), MEDIUM, ReplicaAlterLogDirsIoMaxBytesPerSecondDoc)
66+
.define(MaintenanceBrokerListProp, STRING, DefaultMaintenanceBrokerList, MaintenanceBrokerListValidator, MEDIUM, MaintenanceBrokerListDoc)
6267
DynamicBrokerConfig.addDynamicConfigs(brokerConfigDef)
6368
val nonDynamicProps = KafkaConfig.configNames.toSet -- brokerConfigDef.names.asScala
6469

70+
// Configs that may only be set as a cluster-wide default, never per individual broker.
val ClusterLevelConfigs = Set(MaintenanceBrokerListProp)

// Declared ConfigDef.Type of a dynamic broker config key, or null when the key is
// not defined in brokerConfigDef (callers null-check, mirroring KafkaConfig.typeOf).
def typeOf(key: String): ConfigDef.Type =
  Option(brokerConfigDef.configKeys().get(key)).map(_.`type`).orNull
79+
6580
def names = brokerConfigDef.names
6681

6782
def validate(props: Properties) = DynamicConfig.validate(brokerConfigDef, props, customPropsAllowed = true)
83+
84+
/**
  * Parse a comma-separated list of broker ids (e.g. "1, 2,3") into a Seq[Int].
  * Blank entries are ignored; a non-numeric entry throws NumberFormatException.
  */
def getMaintenanceBrokerListFromString(brokerListStr: String): Seq[Int] = {
  val entries = brokerListStr.split(",").iterator.map(_.trim)
  entries.filter(_.nonEmpty).map(_.toInt).toSeq
}
87+
88+
// Validates that a candidate value for maintenance.broker.list parses as a
// comma-separated list of integer broker ids; rejects it as a ConfigException otherwise.
object MaintenanceBrokerListValidator extends Validator {
  override def ensureValid(name: String, value: Any): Unit =
    try getMaintenanceBrokerListFromString(value.toString)
    catch {
      case e: NumberFormatException =>
        throw new ConfigException(name, value.toString, e.getMessage)
    }
}
6897
}
6998

7099
object Client {

core/src/main/scala/kafka/server/KafkaConfig.scala

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1403,6 +1403,10 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
14031403
millis
14041404
}
14051405

// Broker ids currently flagged as maintenance brokers (dynamic, cluster-level config).
def getMaintenanceBrokerList: Seq[Int] = dynamicConfig.getMaintenanceBrokerList
1409+
14061410
private def getMap(propName: String, propValue: String): Map[String, String] = {
14071411
try {
14081412
CoreUtils.parseCsvMap(propValue)

core/src/main/scala/kafka/zk/AdminZkClient.scala

Lines changed: 51 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,8 @@ class AdminZkClient(zkClient: KafkaZkClient) extends Logging {
5252
topicConfig: Properties = new Properties,
5353
rackAwareMode: RackAwareMode = RackAwareMode.Enforced) {
5454
val brokerMetadatas = getBrokerMetadatas(rackAwareMode)
55-
val replicaAssignment = AdminUtils.assignReplicasToBrokers(brokerMetadatas, partitions, replicationFactor)
55+
val noNewPartitionBrokerIds = getMaintenanceBrokerList()
56+
val replicaAssignment = assignReplicasToAvailableBrokers(brokerMetadatas, noNewPartitionBrokerIds.toSet, partitions, replicationFactor)
5657
createTopicWithAssignment(topic, topicConfig, replicaAssignment)
5758
}
5859

@@ -80,6 +81,16 @@ class AdminZkClient(zkClient: KafkaZkClient) extends Logging {
8081
brokerMetadatas.sortBy(_.id)
8182
}
8283

84+
/**
  * Fetch the maintenance broker list from the cluster-default broker config entity
  * in ZooKeeper. Returns an empty Seq when the property is not set.
  */
def getMaintenanceBrokerList(): Seq[Int] = {
  val defaultBrokerConfig = fetchEntityConfig(ConfigType.Broker, ConfigEntityName.Default)
  val brokerListStr = defaultBrokerConfig.getProperty(DynamicConfig.Broker.MaintenanceBrokerListProp,
    DynamicConfig.Broker.DefaultMaintenanceBrokerList)
  DynamicConfig.Broker.getMaintenanceBrokerListFromString(brokerListStr)
}
93+
8394
def createTopicWithAssignment(topic: String,
8495
config: Properties,
8596
partitionReplicaAssignment: Map[Int, Seq[Int]]): Unit = {
@@ -134,7 +145,7 @@ class AdminZkClient(zkClient: KafkaZkClient) extends Logging {
134145
LogConfig.validate(config)
135146
}
136147

137-
private def writeTopicPartitionAssignment(topic: String, replicaAssignment: Map[Int, Seq[Int]], isUpdate: Boolean) {
148+
def writeTopicPartitionAssignment(topic: String, replicaAssignment: Map[Int, Seq[Int]], isUpdate: Boolean) {
138149
try {
139150
val assignment = replicaAssignment.map { case (partitionId, replicas) => (new TopicPartition(topic,partitionId), replicas) }.toMap
140151

@@ -168,6 +179,39 @@ class AdminZkClient(zkClient: KafkaZkClient) extends Logging {
168179
}
169180
}
170181

/**
  * Assign replicas for `nPartitions` partitions to brokers that are allowed to take
  * new partitions, i.e. all candidate brokers minus the maintenance brokers.
  *
  * If the requested replication factor exceeds the number of available
  * (non-maintenance) brokers, the restriction is relaxed and all brokers are used,
  * so assignment never fails solely because brokers are in maintenance.
  *
  * @param brokerMetadatas         metadata (id/rack) of all candidate brokers
  * @param noNewPartitionBrokerIds ids of maintenance brokers that must not receive new partitions
  * @param nPartitions             number of partitions to assign
  * @param replicationFactor       number of replicas per partition
  * @param fixedStartIndex         passed through to AdminUtils.assignReplicasToBrokers (-1 = pick randomly)
  * @param startPartitionId        passed through to AdminUtils.assignReplicasToBrokers (-1 = start at 0)
  * @return mapping of partition id to assigned replica (broker) ids
  */
def assignReplicasToAvailableBrokers(brokerMetadatas: Seq[BrokerMetadata],
                                     noNewPartitionBrokerIds: Set[Int],
                                     nPartitions: Int,
                                     replicationFactor: Int,
                                     fixedStartIndex: Int = -1,
                                     startPartitionId: Int = -1): Map[Int, Seq[Int]] = {
  // Keep only brokers that are not in maintenance (idiomatic filterNot instead of
  // a filter lambda returning if (...) false else true).
  val availableBrokerMetadata = brokerMetadatas.filterNot(b => noNewPartitionBrokerIds.contains(b.id))

  if (replicationFactor > availableBrokerMetadata.size) {
    info(s"Using all brokers for replica assignment since replicationFactor[$replicationFactor] " +
      s"is larger than the number of nonMaintenanceBroker[${availableBrokerMetadata.size}]")
    AdminUtils.assignReplicasToBrokers(brokerMetadatas, nPartitions, replicationFactor, fixedStartIndex, startPartitionId)
  } else
    AdminUtils.assignReplicasToBrokers(availableBrokerMetadata, nPartitions, replicationFactor, fixedStartIndex, startPartitionId)
}
214+
171215
/**
172216
* Add partitions to existing topic with optional replica assignment
173217
*
@@ -177,14 +221,16 @@ class AdminZkClient(zkClient: KafkaZkClient) extends Logging {
177221
* @param numPartitions Number of partitions to be set
178222
* @param replicaAssignment Manual replica assignment, or none
179223
* @param validateOnly If true, validate the parameters without actually adding the partitions
224+
* @param noNewPartitionBrokerIds Brokers that do not take new partitions
180225
* @return the updated replica assignment
181226
*/
182227
def addPartitions(topic: String,
183228
existingAssignment: Map[Int, Seq[Int]],
184229
allBrokers: Seq[BrokerMetadata],
185230
numPartitions: Int = 1,
186231
replicaAssignment: Option[Map[Int, Seq[Int]]] = None,
187-
validateOnly: Boolean = false): Map[Int, Seq[Int]] = {
232+
validateOnly: Boolean = false,
233+
noNewPartitionBrokerIds: Set[Int] = Set.empty[Int]): Map[Int, Seq[Int]] = {
188234
val existingAssignmentPartition0 = existingAssignment.getOrElse(0,
189235
throw new AdminOperationException(
190236
s"Unexpected existing replica assignment for topic '$topic', partition id 0 is missing. " +
@@ -204,8 +250,8 @@ class AdminZkClient(zkClient: KafkaZkClient) extends Logging {
204250

205251
val proposedAssignmentForNewPartitions = replicaAssignment.getOrElse {
206252
val startIndex = math.max(0, allBrokers.indexWhere(_.id >= existingAssignmentPartition0.head))
207-
AdminUtils.assignReplicasToBrokers(allBrokers, partitionsToAdd, existingAssignmentPartition0.size,
208-
startIndex, existingAssignment.size)
253+
assignReplicasToAvailableBrokers(allBrokers, noNewPartitionBrokerIds, partitionsToAdd,
254+
existingAssignmentPartition0.size, startIndex, existingAssignment.size)
209255
}
210256

211257
val proposedAssignment = existingAssignment ++ proposedAssignmentForNewPartitions

core/src/main/scala/kafka/zk/KafkaZkClient.scala

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -602,6 +602,23 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
602602
}.toMap
603603
}
604604

/**
  * Return the subset of `topics` whose partitions znode does not exist yet, i.e.
  * topics whose per-partition state has not been created in ZooKeeper.
  *
  * Throws the ZooKeeper result exception for any response code other than OK or NONODE.
  */
def getPartitionNodeNonExistsTopics(topics: Set[String]): Set[String] = {
  val existsRequests = topics.map(topic => ExistsRequest(TopicPartitionsZNode.path(topic), ctx = Some(topic)))
  val existsResponses = retryRequestsUntilConnected(existsRequests.toSeq)
  // Functional accumulation instead of a mutable Set: keep topics whose node is
  // missing, fail fast on any unexpected error code.
  existsResponses.flatMap { existsResponse =>
    val topic = existsResponse.ctx.get.asInstanceOf[String]
    existsResponse.resultCode match {
      case Code.OK => None
      case Code.NONODE => Some(topic)
      case _ => throw existsResponse.resultException.get
    }
  }.toSet
}
621+
605622
/**
606623
* Gets the partition numbers for the given topics
607624
* @param topics the topics whose partitions we wish to get.

0 commit comments

Comments
 (0)