@@ -23,7 +23,7 @@ import scala.util.matching.Regex
2323import scala .util .{Failure , Try }
2424
2525object KafkaJMX extends Logging {
26-
26+
2727 private [this ] val defaultJmxConnectorProperties = Map [String , Any ] (
2828 " jmx.remote.x.request.waiting.timeout" -> " 3000" ,
2929 " jmx.remote.x.notification.fetch.timeout" -> " 3000" ,
@@ -99,7 +99,7 @@ object KafkaMetrics {
9999 private def getBrokerTopicMeterMetrics (kafkaVersion : KafkaVersion , mbsc : MBeanServerConnection , metricName : String , topicOption : Option [String ]) = {
100100 getMeterMetric(mbsc, getObjectName(kafkaVersion, metricName, topicOption))
101101 }
102-
102+
103103 private def getSep (kafkaVersion : KafkaVersion ) : String = {
104104 kafkaVersion match {
105105 case Kafka_0_8_1_1 => " \" "
@@ -110,7 +110,7 @@ object KafkaMetrics {
110110 def getObjectName (kafkaVersion : KafkaVersion , name : String , topicOption : Option [String ] = None ) = {
111111 val sep = getSep(kafkaVersion)
112112 val topicAndName = kafkaVersion match {
113- case Kafka_0_8_1_1 =>
113+ case Kafka_0_8_1_1 =>
114114 topicOption.map( topic => s " ${sep}$topic- $name${sep}" ).getOrElse(s " ${sep}AllTopics $name${sep}" )
115115 case _ =>
116116 val topicProp = topicOption.map(topic => s " ,topic= $topic" ).getOrElse(" " )
@@ -127,11 +127,11 @@ object KafkaMetrics {
127127 /* Gauge, Value : 0 */
128128 private val replicaFetcherManagerMaxLag = new ObjectName (
129129 " kafka.server:type=ReplicaFetcherManager,name=MaxLag,clientId=Replica" )
130-
130+
131131 /* Gauge, Value : 0 */
132132 private val kafkaControllerActiveControllerCount = new ObjectName (
133133 " kafka.controller:type=KafkaController,name=ActiveControllerCount" )
134-
134+
135135 /* Gauge, Value : 0 */
136136 private val kafkaControllerOfflinePartitionsCount = new ObjectName (
137137 " kafka.controller:type=KafkaController,name=OfflinePartitionsCount" )
@@ -144,16 +144,18 @@ object KafkaMetrics {
144144 private val operatingSystemObjectName = new ObjectName (" java.lang:type=OperatingSystem" )
145145
146146 /* Log Segments */
147- private val logSegmentObjectName = new ObjectName (" kafka.log:type=Log,name=*- LogSegments" )
147+ private val logSegmentObjectName = new ObjectName (" kafka.log:type=Log,name=LogSegments,topic=*,partition=* " )
148148
149- private val directoryObjectName = new ObjectName (" kafka.log:type=Log,name=*- Directory" )
149+ private val directoryObjectName = new ObjectName (" kafka.log:type=Log,name=Directory,topic=*,partition=* " )
150150
151- private val LogSegmentsNameRegex = new Regex (" %s-LogSegments" .format(""" (.*)-(\d*)""" ), " topic" , " partition" )
151+ // exp: kafka.log:type=Log,name=LogSegments,topic=WEB_post,partition=19/Value (ArrayList) = [baseOffset=0, created=1527666489299, logSize=55232565, indexSize=252680]
152+ // private val LogSegmentsNameRegex = new Regex("%s-LogSegments".format("""(.*)-(\d*)"""), "topic", "partition")
152153
153- private val DirectoryNameRegex = new Regex (" %s-Directory" .format(""" (.*)-(\d*)""" ), " topic" , " partition" )
154+ // exp: kafka.log:type=Log,name=Directory,topic=WEB_post,partition=16/Value (String) = /log/kafka/WEB_post-16
155+ // private val DirectoryNameRegex = new Regex("%s-Directory".format("""(.*)-(\d*)"""), "topic", "partition")
154156
155157 val LogSegmentRegex = new Regex (
156- " baseOffset=(.*), created=(.*), logSize=(.*), indexSize=(.*)" ,
158+ " baseOffset=(.*); created=(.*); logSize=(.*); indexSize=(.*)" ,
157159 " baseOffset" , " created" , " logSize" , " indexSize"
158160 )
159161
@@ -172,7 +174,7 @@ object KafkaMetrics {
172174 case _ : InstanceNotFoundException => OSMetric (0D , 0D )
173175 }
174176 }
175-
177+
176178 private def getMeterMetric (mbsc : MBeanServerConnection , name : ObjectName ) = {
177179 import scala .collection .JavaConverters ._
178180 try {
@@ -187,7 +189,7 @@ object KafkaMetrics {
187189 case _ : InstanceNotFoundException => MeterMetric (0 ,0 ,0 ,0 ,0 )
188190 }
189191 }
190-
192+
191193 private def getLongValue (attributes : Seq [Attribute ], name : String ) = {
192194 attributes.find(_.getName == name).map(_.getValue.asInstanceOf [Long ]).getOrElse(0L )
193195 }
@@ -196,27 +198,22 @@ object KafkaMetrics {
196198 attributes.find(_.getName == name).map(_.getValue.asInstanceOf [Double ]).getOrElse(0D )
197199 }
198200
199- private def topicAndPartition (name : String , regex : Regex ) = {
201+ private def topicAndPartition (objectName : ObjectName ) = {
200202 try {
201- val matches = regex.findAllIn(name).matchData.toSeq
202- require(matches.size == 1 )
203- val m = matches.head
204-
205- val topic = m.group(" topic" )
206- val partition = m.group(" partition" ).toInt
207-
203+ val topic = objectName.getKeyProperty(" topic" )
204+ val partition = objectName.getKeyProperty(" partition" ).toInt
208205 (topic, partition)
209206 }
210207 catch {
211208 case e : Exception =>
212- throw new IllegalStateException (" Can't parse topic and partition from: <%s>" .format(name ), e)
209+ throw new IllegalStateException (" Can't parse topic and partition from: <%s>" .format(objectName ), e)
213210 }
214211 }
215212
216213 private def queryValues [K , V ](
217214 mbsc : MBeanServerConnection ,
218215 objectName : ObjectName ,
219- keyConverter : String => K ,
216+ keyConverter : ObjectName => K ,
220217 valueConverter : Object => V
221218 ) = {
222219 val logsSizeObjectNames = mbsc.queryNames(objectName, null ).asScala.toSeq
@@ -228,12 +225,12 @@ object KafkaMetrics {
228225 private def queryValue [K , V ](
229226 mbsc : MBeanServerConnection ,
230227 objectName : ObjectName ,
231- keyConverter : String => K ,
228+ keyConverter : ObjectName => K ,
232229 valueConverter : Object => V
233230 ) = {
234- val name = objectName.getKeyProperty(" name" )
231+ // val name = objectName.getKeyProperty("name")
235232 val mbean = MBeanServerInvocationHandler .newProxyInstance(mbsc, objectName, classOf [GaugeMBean ], true )
236- (keyConverter(name ), valueConverter(mbean.getValue))
233+ (keyConverter(objectName ), valueConverter(mbean.getValue))
237234 }
238235
239236 private def parseLogSegment (str : String ): LogSegment = {
@@ -259,7 +256,7 @@ object KafkaMetrics {
259256 queryValues(
260257 mbsc,
261258 logSegmentObjectName,
262- key => topicAndPartition(key, LogSegmentsNameRegex ),
259+ key => topicAndPartition(key),
263260 value => {
264261 val lst = value.asInstanceOf [ju.List [String ]]
265262 lst.asScala.map(parseLogSegment).toSeq
@@ -271,7 +268,7 @@ object KafkaMetrics {
271268 queryValues(
272269 mbsc,
273270 directoryObjectName,
274- key => topicAndPartition(key, DirectoryNameRegex ),
271+ key => topicAndPartition(key),
275272 value => value.asInstanceOf [String ]
276273 )
277274 }.toMap
@@ -355,10 +352,10 @@ case class MeterMetric(count: Long,
355352
356353 def + (o : MeterMetric ) : MeterMetric = {
357354 MeterMetric (
358- o.count + count,
359- o.fifteenMinuteRate + fifteenMinuteRate,
360- o.fiveMinuteRate + fiveMinuteRate,
361- o.oneMinuteRate + oneMinuteRate,
355+ o.count + count,
356+ o.fifteenMinuteRate + fifteenMinuteRate,
357+ o.fiveMinuteRate + fiveMinuteRate,
358+ o.oneMinuteRate + oneMinuteRate,
362359 o.meanRate + meanRate)
363360 }
364361}
0 commit comments