Skip to content

Commit feed875

Browse files
hzxa21xiowu0
authored andcommitted
[LI-HOTFIX] Reduce lock retention and improve broker shutdown time:
TICKET = [KAFKA-8667, KAFKA-8668] LI_DESCRIPTION = - Avoid acquiring partitionMap lock in shutdownIdleFetcherThread - Avoid appending to the time index during shutdown if the time index has not yet be initialized RB=1431408 BUG=LIKAFKA-19361 G=Kafka-Code-Reviews R=jkoshy,jonlee A=jkoshy,jonlee EXIT_CRITERIA = TICKET [KAFKA-8667, KAFKA-8668]
1 parent d3e3a0a commit feed875

File tree

3 files changed

+7
-8
lines changed

3 files changed

+7
-8
lines changed

core/src/main/scala/kafka/log/LogSegment.scala

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -598,7 +598,9 @@ class LogSegment private[log] (val log: FileRecords,
598598
* Close this log segment
599599
*/
600600
def close() {
601-
CoreUtils.swallow(timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestampSoFar, skipFullCheck = true), this)
601+
if (_maxTimestampSoFar.nonEmpty || _offsetOfMaxTimestampSoFar.nonEmpty) {
602+
CoreUtils.swallow(timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestampSoFar, skipFullCheck = true), this)
603+
}
602604
CoreUtils.swallow(offsetIndex.close(), this)
603605
CoreUtils.swallow(timeIndex.close(), this)
604606
CoreUtils.swallow(log.close(), this)

core/src/main/scala/kafka/server/AbstractFetcherManager.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -182,7 +182,7 @@ abstract class AbstractFetcherManager[T <: AbstractFetcherThread](val name: Stri
182182
lock synchronized {
183183
val keysToBeRemoved = new mutable.HashSet[BrokerIdAndFetcherId]
184184
for ((key, fetcher) <- fetcherThreadMap) {
185-
if (fetcher.partitionCount <= 0) {
185+
if (fetcher.idle) {
186186
fetcher.shutdown()
187187
keysToBeRemoved += key
188188
}

core/src/main/scala/kafka/server/AbstractFetcherThread.scala

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,8 @@ abstract class AbstractFetcherThread(name: String,
6868
val fetcherStats = new FetcherStats(metricId)
6969
val fetcherLagStats = new FetcherLagStats(metricId)
7070

71+
@volatile var idle = false
72+
7173
/* callbacks to be defined in subclass */
7274

7375
// process fetched data
@@ -628,15 +630,10 @@ abstract class AbstractFetcherThread(name: String,
628630
partitionStates.remove(topicPartition)
629631
fetcherLagStats.unregister(topicPartition)
630632
}
633+
idle = partitionStates.size() <= 0
631634
} finally partitionMapLock.unlock()
632635
}
633636

634-
def partitionCount() = {
635-
partitionMapLock.lockInterruptibly()
636-
try partitionStates.size
637-
finally partitionMapLock.unlock()
638-
}
639-
640637
// Visible for testing
641638
private[server] def fetchState(topicPartition: TopicPartition): Option[PartitionFetchState] = inLock(partitionMapLock) {
642639
Option(partitionStates.stateValue(topicPartition))

0 commit comments

Comments
 (0)