
Commit a378c89

Lincong Li authored and xiowu0 committed
[LI-HOTFIX] LIKAFKA-21968: Add broker-side observer interface and NoOpObserver implementation (#6)
TICKET = LI_DESCRIPTION = The observer interface lets us provide an implementation that provides the usage accounting data unit for the C2S V3 service. Reviewers: Radai Rosenblatt EXIT_CRITERIA = MANUAL [""]
1 parent 7431935 commit a378c89

File tree

7 files changed: +214 -2 lines changed


core/src/main/scala/kafka/server/KafkaApis.scala

Lines changed: 19 additions & 0 deletions

@@ -87,6 +87,7 @@ class KafkaApis(val requestChannel: RequestChannel,
                 val metadataCache: MetadataCache,
                 val metrics: Metrics,
                 val authorizer: Option[Authorizer],
+                val observer: Observer,
                 val quotas: QuotaManagers,
                 val fetchManager: FetchManager,
                 brokerTopicStats: BrokerTopicStats,
@@ -531,6 +532,14 @@ class KafkaApis(val requestChannel: RequestChannel,
     if (authorizedRequestInfo.isEmpty)
       sendResponseCallback(Map.empty)
     else {
+
+      try
+        observer.observeProduceRequest(request.context, request.body[ProduceRequest])
+      catch {
+        case e: Exception => error(s"Observer failed to observe the produce request " +
+          s"${Observer.describeRequestAndResponse(request, null)}", e)
+      }
+
       val internalTopicsAllowed = request.header.clientId == AdminUtils.AdminClientId

       // call the replica manager to append messages to the replicas
@@ -2597,8 +2606,11 @@ class KafkaApis(val requestChannel: RequestChannel,
         val responseString =
           if (RequestChannel.isRequestLoggingEnabled) Some(response.toString(request.context.apiVersion))
           else None
+        observeRequestResponse(request, response)
+
         new RequestChannel.SendResponse(request, responseSend, responseString, onComplete)
       case None =>
+        observeRequestResponse(request, null)
         new RequestChannel.NoOpResponse(request)
     }
     sendResponse(response)
@@ -2620,4 +2632,11 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
   }

+  private def observeRequestResponse(request: RequestChannel.Request, response: AbstractResponse): Unit = {
+    try {
+      observer.observe(request.context, request.body[AbstractRequest], response)
+    } catch {
+      case e: Exception => error(s"Observer failed to observe ${Observer.describeRequestAndResponse(request, response)}", e)
+    }
+  }
 }

core/src/main/scala/kafka/server/KafkaConfig.scala

Lines changed: 23 additions & 0 deletions

@@ -64,6 +64,10 @@ object Defaults {
   /************* Authorizer Configuration ***********/
   val AuthorizerClassName = ""

+  /** ********* Broker-side Observer Configuration ***********/
+  val ObserverClassName = "kafka.server.NoOpObserver"
+  val ObserverShutdownTimeoutMs = 2000
+
   /** ********* Socket Server Configuration ***********/
   val Port = 9092
   val HostName: String = new String("")
@@ -291,6 +295,11 @@
   val ProducerBatchDecompressionEnableProp = "producer.batch.decompression.enable"
   /************* Authorizer Configuration ***********/
   val AuthorizerClassNameProp = "authorizer.class.name"
+
+  /** ********* Broker-side Observer Configuration ****************/
+  val ObserverClassNameProp = "observer.class.name"
+  val ObserverShutdownTimeoutMsProp = "observer.shutdown.timeout"
+
   /** ********* Socket Server Configuration ***********/
   val PortProp = "port"
   val HostNameProp = "host.name"
@@ -866,6 +875,12 @@
   val PasswordEncoderKeyLengthDoc = "The key length used for encoding dynamically configured passwords."
   val PasswordEncoderIterationsDoc = "The iteration count used for encoding dynamically configured passwords."

+  /** ********* Broker-side Observer Configuration *********/
+  val ObserverClassNameDoc = "The name of the observer class that is used to observe requests and/or responses on the broker."
+  val ObserverShutdownTimeoutMsDoc = "The maximum time to wait when closing/shutting down an observer. This property cannot be less than or " +
+    "equal to zero. When closing/shutting down an observer, most of the time is spent flushing the observed stats, so a reasonable timeout " +
+    "should be close to the time it takes to flush the stats."
+
   private val configDef = {
     import ConfigDef.Importance._
     import ConfigDef.Range._
@@ -901,6 +916,10 @@
     /************* Authorizer Configuration ***********/
     .define(AuthorizerClassNameProp, STRING, Defaults.AuthorizerClassName, LOW, AuthorizerClassNameDoc)

+    /************* Broker-side Observer Configuration ***********/
+    .define(ObserverClassNameProp, STRING, Defaults.ObserverClassName, MEDIUM, ObserverClassNameDoc)
+    .define(ObserverShutdownTimeoutMsProp, LONG, Defaults.ObserverShutdownTimeoutMs, atLeast(1), MEDIUM, ObserverShutdownTimeoutMsDoc)
+
     /** ********* Socket Server Configuration ***********/
     .define(PortProp, INT, Defaults.Port, HIGH, PortDoc)
     .define(HostNameProp, STRING, Defaults.HostName, HIGH, HostNameDoc)
@@ -1203,6 +1222,10 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
   /************* Authorizer Configuration ***********/
   val authorizerClassName: String = getString(KafkaConfig.AuthorizerClassNameProp)

+  /************* Broker-side Observer Configuration ********/
+  val ObserverClassName: String = getString(KafkaConfig.ObserverClassNameProp)
+  val ObserverShutdownTimeoutMs: Long = getLong(KafkaConfig.ObserverShutdownTimeoutMsProp)
+
   /** ********* Socket Server Configuration ***********/
   val hostName = getString(KafkaConfig.HostNameProp)
   val port = getInt(KafkaConfig.PortProp)
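
As a usage sketch that is not part of this commit: with the definitions above, enabling a custom observer on a broker would come down to two entries in server.properties. The class name below is hypothetical, and 5000 is an arbitrary value above the enforced minimum of 1 ms (the defaults are kafka.server.NoOpObserver and 2000).

# Hypothetical server.properties entries for the new observer configs
observer.class.name=com.example.MyObserver
observer.shutdown.timeout=5000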

core/src/main/scala/kafka/server/KafkaServer.scala

Lines changed: 15 additions & 2 deletions

@@ -120,6 +120,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
   var controlPlaneRequestProcessor: KafkaApis = null

   var authorizer: Option[Authorizer] = None
+  var observer: Observer = null
   var socketServer: SocketServer = null
   var dataPlaneRequestHandlerPool: KafkaRequestHandlerPool = null
   var controlPlaneRequestHandlerPool: KafkaRequestHandlerPool = null
@@ -309,21 +310,30 @@
       authZ
     }

+    observer = try {
+      CoreUtils.createObject[Observer](config.ObserverClassName)
+    } catch {
+      case e: Exception =>
+        error(s"Creating observer instance from the given class name ${config.ObserverClassName} failed.", e)
+        new NoOpObserver
+    }
+    observer.configure(config.originals())
+
     val fetchManager = new FetchManager(Time.SYSTEM,
       new FetchSessionCache(config.maxIncrementalFetchSessionCacheSlots,
         KafkaServer.MIN_INCREMENTAL_FETCH_SESSION_EVICTION_MS))

     /* start processing requests */
     dataPlaneRequestProcessor = new KafkaApis(socketServer.dataPlaneRequestChannel, replicaManager, adminManager, groupCoordinator, transactionCoordinator,
-      kafkaController, zkClient, config.brokerId, config, metadataCache, metrics, authorizer, quotaManagers,
+      kafkaController, zkClient, config.brokerId, config, metadataCache, metrics, authorizer, observer, quotaManagers,
       fetchManager, brokerTopicStats, clusterId, time, tokenManager)

     dataPlaneRequestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.dataPlaneRequestChannel, dataPlaneRequestProcessor, time,
       config.numIoThreads, s"${SocketServer.DataPlaneMetricPrefix}RequestHandlerAvgIdlePercent", SocketServer.DataPlaneThreadPrefix)

     socketServer.controlPlaneRequestChannelOpt.foreach { controlPlaneRequestChannel =>
       controlPlaneRequestProcessor = new KafkaApis(controlPlaneRequestChannel, replicaManager, adminManager, groupCoordinator, transactionCoordinator,
-        kafkaController, zkClient, config.brokerId, config, metadataCache, metrics, authorizer, quotaManagers,
+        kafkaController, zkClient, config.brokerId, config, metadataCache, metrics, authorizer, observer, quotaManagers,
         fetchManager, brokerTopicStats, clusterId, time, tokenManager)

       controlPlaneRequestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.controlPlaneRequestChannelOpt.get, controlPlaneRequestProcessor, time,
@@ -630,6 +640,9 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
       if (controlPlaneRequestProcessor != null)
         CoreUtils.swallow(controlPlaneRequestProcessor.close(), this)
       CoreUtils.swallow(authorizer.foreach(_.close()), this)
+
+      CoreUtils.swallow(observer.close(config.ObserverShutdownTimeoutMs, TimeUnit.MILLISECONDS), this)
+
       if (adminManager != null)
         CoreUtils.swallow(adminManager.shutdown(), this)

core/src/main/scala/kafka/server/NoOpObserver.scala (new file)

Lines changed: 46 additions & 0 deletions

/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package kafka.server

import java.util.Map
import java.util.concurrent.TimeUnit
import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, ProduceRequest, RequestContext}

/**
 * An observer implementation that performs no operation and serves as a placeholder.
 */
class NoOpObserver extends Observer {

  def configure(configs: Map[String, _]): Unit = {}

  /**
   * Observe a request and its corresponding response.
   */
  def observe(requestContext: RequestContext, request: AbstractRequest, response: AbstractResponse): Unit = {}

  /**
   * Observe a produce request.
   */
  def observeProduceRequest(requestContext: RequestContext, produceRequest: ProduceRequest): Unit = {}

  /**
   * Close the observer with a timeout.
   */
  def close(timeout: Long, unit: TimeUnit): Unit = {}

}

core/src/main/scala/kafka/server/Observer.scala (new file)

Lines changed: 105 additions & 0 deletions

/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package kafka.server

import java.util.concurrent.TimeUnit
import kafka.network.RequestChannel
import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, ProduceRequest, RequestContext}
import org.apache.kafka.common.Configurable

/**
 * Top-level interface that all pluggable observers must implement. Kafka reads the 'observer.class.name' config
 * value at startup, creates an instance of the specified class using the default constructor, and calls its
 * 'configure' method.
 *
 * From that point onwards, every pair of request and response is routed to the 'observe' method.
 *
 * If 'observer.class.name' has no value specified or the specified class does not exist, the <code>NoOpObserver</code>
 * is used as a placeholder.
 */
trait Observer extends Configurable {

  /**
   * Observe a request and its corresponding response.
   *
   * @param requestContext the context information about the request
   * @param request the request being observed
   * @param response the response to the request
   */
  def observe(requestContext: RequestContext, request: AbstractRequest, response: AbstractResponse): Unit

  /**
   * Observe a produce request. This method handles only produce requests, because they are special in two ways.
   * Firstly, if acks is set to 0, there is no produce response associated with the produce request. Secondly, the
   * lifecycle of some inner fields of a ProduceRequest is shorter than the lifecycle of the produce request itself,
   * which means that by the time <code>observe</code> is called on a produce request and response pair, some fields
   * of the produce request may already have been nulled out, leaving nothing useful to observe. This method therefore
   * lets users observe the produce request before its corresponding response is created.
   *
   * @param requestContext the context information about the request
   * @param produceRequest the produce request being observed
   */
  def observeProduceRequest(requestContext: RequestContext, produceRequest: ProduceRequest): Unit

  /**
   * Close the observer with a timeout.
   *
   * @param timeout the maximum time to wait to close the observer.
   * @param unit the time unit.
   */
  def close(timeout: Long, unit: TimeUnit): Unit
}

object Observer {

  /**
   * Generates a description of the given request and response, mostly for debugging purposes.
   *
   * @param request the request being described
   * @param response the response to the request
   */
  def describeRequestAndResponse(request: RequestChannel.Request, response: AbstractResponse): String = {
    var requestDesc = "Request"
    var responseDesc = "Response"
    try {
      if (request == null) {
        requestDesc += " null"
      } else {
        requestDesc += (" header: " + request.header)
        requestDesc += (" from service with principal: " +
          request.session.sanitizedUser +
          " IP address: " + request.session.clientAddress)
      }
      requestDesc += " | " // Separate the response description from the request description

      if (response == null) {
        responseDesc += " null"
      } else {
        responseDesc += (if (response.errorCounts == null || response.errorCounts.size == 0) {
          " with no error"
        } else {
          " with errors: " + response.errorCounts
        })
      }
    } catch {
      case e: Exception => return e.toString // If describing fails, return the exception message directly
    }
    requestDesc + responseDesc
  }
}
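
To make the contract above concrete, here is a minimal, hypothetical implementation that only logs which principal issued which API call. It is a sketch written against the trait as defined in this commit, not code from the commit itself; the package and class name (com.example.LoggingObserver) are invented, and such a class would be enabled via observer.class.name.

package com.example

import java.util.Map
import java.util.concurrent.TimeUnit
import kafka.server.Observer
import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, ProduceRequest, RequestContext}

/** Hypothetical observer that logs the API key and principal of every observed request. */
class LoggingObserver extends Observer {

  def configure(configs: Map[String, _]): Unit = {}

  def observe(requestContext: RequestContext, request: AbstractRequest, response: AbstractResponse): Unit = {
    // response may be null, e.g. on the NoOpResponse path in KafkaApis.
    val outcome = if (response == null) "no response" else s"errors=${response.errorCounts}"
    println(s"${requestContext.header.apiKey} from ${requestContext.principal}: $outcome")
  }

  def observeProduceRequest(requestContext: RequestContext, produceRequest: ProduceRequest): Unit = {
    // Called before the produce response is built, while the request payload is still intact.
    println(s"PRODUCE from ${requestContext.principal}")
  }

  // Nothing is buffered in this sketch, so there is nothing to flush on close.
  def close(timeout: Long, unit: TimeUnit): Unit = {}
}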

core/src/test/scala/unit/kafka/server/KafkaApisTest.scala

Lines changed: 2 additions & 0 deletions

@@ -70,6 +70,7 @@ class KafkaApisTest {
   private val brokerId = 1
   private val metadataCache = new MetadataCache(brokerId)
   private val authorizer: Option[Authorizer] = None
+  private val observer: Observer = EasyMock.createNiceMock(classOf[Observer])
   private val clientQuotaManager: ClientQuotaManager = EasyMock.createNiceMock(classOf[ClientQuotaManager])
   private val clientRequestQuotaManager: ClientRequestQuotaManager = EasyMock.createNiceMock(classOf[ClientRequestQuotaManager])
   private val replicaQuotaManager: ReplicationQuotaManager = EasyMock.createNiceMock(classOf[ReplicationQuotaManager])
@@ -102,6 +103,7 @@
       metadataCache,
       metrics,
       authorizer,
+      observer,
       quotas,
       fetchManager,
       brokerTopicStats,
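
A possible way to exercise the new observer mock in this test class, shown only as a hedged sketch: the expectation/verify flow and the elided request setup below are assumptions, not part of this commit.

// Hypothetical fragment inside KafkaApisTest: expect the produce-request hook on the
// nice mock above, drive a produce request through KafkaApis, then verify the call.
observer.observeProduceRequest(EasyMock.anyObject[RequestContext](), EasyMock.anyObject[ProduceRequest]())
EasyMock.expectLastCall().once()
EasyMock.replay(observer)
// ... build a ProduceRequest, wrap it as a RequestChannel.Request, and call
// createKafkaApis().handleProduceRequest(request) here ...
EasyMock.verify(observer)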

core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala

Lines changed: 4 additions & 0 deletions

@@ -762,6 +762,10 @@ class KafkaConfigTest {
         case KafkaConfig.KafkaMetricsReporterClassesProp => // ignore
         case KafkaConfig.KafkaMetricsPollingIntervalSecondsProp => //ignore

+        // Broker-side observer configs
+        case KafkaConfig.ObserverClassNameProp => // ignore since even if the class name is invalid, a NoOpObserver class is used instead
+        case KafkaConfig.ObserverShutdownTimeoutMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "-1", "0")
+
         case _ => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "-1")
       }
     })
