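Monitoring Kafka on HDInsight with Jolokia (REST API)
Introduction
Kudo here.
When you want to monitor HDInsight, the Ambari UI is very convenient.
For Kafka, however, the Ambari UI exposes only a few metrics, so another approach is needed.
This post introduces one such approach, using Jolokia.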
What is Jolokia?
Jolokia
https://jolokia.org/index.html
Jolokia is an agent that collects JMX information and returns it over an HTTP REST API.
There is more than one agent; the following are available:
1. WAR Agent for deployment as web application in a JEE Server.
2. OSGi Agent for deployment in an OSGi container. This agent is packaged as a bundle and comes in two flavors (minimal, all-in-one).
3. JVM Agent which can be used with any Oracle/Sun based JVM, Version 6 or later.
4. Mule Agent for usage within a Mule ESB.
Details of each agent are available here:
https://jolokia.org/agent.html
For Kafka, we use the JVM Agent and monitor through the REST API.
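Architecture
[Figure: architecture diagram for this setup]
The HDInsight cluster type is Storm, the OS is Linux, and the version is Storm 0.10.0 (HDI 3.4).
Logging in and checking the Kafka packages shows:
hdinsight-kafka 2.4
kafka-2-4-2-0-258 0.9.0.2.4.2.0-258
HDInsight and the monitoring virtual machine (Linux) must be in the same VNet.
For Storm in the diagram, a REST API is available through the Storm UI.
For Kafka, we use Jolokia.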
Setting up Jolokia
First, Jolokia has to be installed on the HDInsight side.
Use the Ambari UI to find the IP address of the host where Kafka is running.
Log in to HDInsight over SSH.
From there, log in to the IP address shown in Summary.
Download the Jolokia JVM agent and move it to /usr/sbin:
# wget "http://search.maven.org/remotecontent?filepath=org/jolokia/jolokia-jvm/1.3.3/jolokia-jvm-1.3.3-agent.jar" -O jolokia-jvm-1.3.3-agent.jar
# mv /root/jolokia-jvm-1.3.3-agent.jar /usr/sbin/
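That completes the installation.
Next, load the agent as a javaagent through KAFKA_OPTS.
You could edit the config file directly, but let's set it from the Ambari UI.
Open Advanced kafka-env.
Add a line to the kafka-env template, directly below this existing line:
export KAFKA_KERBEROS_PARAMS={{kafka_kerberos_params}}
The line to add is:
export KAFKA_OPTS=-javaagent:/usr/sbin/jolokia-jvm-1.3.3-agent.jar=host=0.0.0.0
host=0.0.0.0 makes the agent listen on all network interfaces, so REST API requests are accepted from any IP address.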
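The text after the jar path is a comma-separated list of key=value agent options. As a rough sketch, assuming you would rather state the port explicitly than rely on the default, the same setting could be written with both `port` and `host`. The three helper variables are illustrative only; in the kafka-env template a single export line is enough.

```shell
# Illustrative helper variables (not part of the original setup).
JOLOKIA_JAR=/usr/sbin/jolokia-jvm-1.3.3-agent.jar
JOLOKIA_PORT=8778      # Jolokia's default port, written out explicitly
JOLOKIA_HOST=0.0.0.0   # listen on all interfaces, as in the line above

# Agent options follow the jar path as key=value pairs separated by commas.
export KAFKA_OPTS="-javaagent:${JOLOKIA_JAR}=port=${JOLOKIA_PORT},host=${JOLOKIA_HOST}"
echo "$KAFKA_OPTS"
```

Binding to a specific address instead of 0.0.0.0 would narrow where the agent can be reached from, which may be preferable outside a closed VNet.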
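Save the change. A -javaagent option is only read when the JVM starts, so Kafka has to be restarted before the agent becomes active.
The HDInsight side is now ready.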
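Before moving on, it can help to confirm that the agent is actually answering. The sketch below assumes the agent listens on port 8778 and uses Jolokia's `version` request as a cheap health check; the function names are made up for this example.

```shell
# Build the base URL of a Jolokia agent.
# $1 = host, $2 = port (optional, defaults to 8778).
jolokia_base_url() {
  printf 'http://%s:%s/jolokia' "$1" "${2:-8778}"
}

# Request the agent's version document.
# Returns non-zero if the agent is unreachable or answers with an HTTP error.
jolokia_check() {
  curl -sf --max-time 5 "$(jolokia_base_url "$1" "${2:-}")/version" >/dev/null
}

# Usage (replace the address with your broker's IP):
#   jolokia_check 10.1.0.13 && echo "Jolokia agent is up"
```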
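REST API
Now let's check from the monitoring virtual machine.
Log in to the monitoring virtual machine.
Install jq beforehand.
The default port is 8778.
To list the metrics that are available, use the search format:
http://[server IP]:8778/jolokia/search/[metric]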
[root@kafkamonitoring ~]# curl -s http://10.1.0.13:8778/jolokia/search/kafka.*:* | jq .
{
  "value": [
    "kafka.server:delayedOperation=Fetch,name=NumDelayedOperations,type=DelayedOperationPurgatory",
    "kafka.server:delayedOperation=Produce,name=PurgatorySize,type=DelayedOperationPurgatory",
    "kafka.network:name=RequestsPerSec,request=Heartbeat,type=RequestMetrics",
    "kafka.network:name=ResponseQueueTimeMs,request=SyncGroup,type=RequestMetrics",
    "kafka.network:name=ThrottleTimeMs,request=StopReplica,type=RequestMetrics",
    "kafka.server:name=UnderReplicatedPartitions,type=ReplicaManager",
    "kafka.network:name=RemoteTimeMs,request=LeaveGroup,type=RequestMetrics",
    "kafka.network:name=RequestQueueTimeMs,request=StopReplica,type=RequestMetrics",
    "kafka.network:name=LocalTimeMs,request=DescribeGroups,type=RequestMetrics",
    "kafka.network:name=ResponseSendTimeMs,request=ListGroups,type=RequestMetrics",
    "kafka.network:name=LocalTimeMs,request=LeaderAndIsr,type=RequestMetrics",
    "kafka.server:name=TotalProduceRequestsPerSec,type=BrokerTopicMetrics",
    "kafka.network:name=ResponseSendTimeMs,request=GroupCoordinator,type=RequestMetrics",
    "kafka.network:name=RequestQueueSize,type=RequestChannel",
    "kafka.network:name=LocalTimeMs,request=Metadata,type=RequestMetrics",
    "kafka.network:name=LocalTimeMs,request=Heartbeat,type=RequestMetrics",
    "kafka.network:name=ResponseSendTimeMs,request=LeaveGroup,type=RequestMetrics",
    "kafka.log:name=max-buffer-utilization-percent,type=LogCleaner",
    "kafka.network:name=RequestsPerSec,request=StopReplica,type=RequestMetrics",
    "kafka.server:broker-id=1001,type=controller-channel-metrics",
    "kafka.network:name=RequestQueueTimeMs,request=GroupCoordinator,type=RequestMetrics",
    "kafka.network:name=ResponseQueueTimeMs,request=Offsets,type=RequestMetrics",
    "kafka.network:name=LocalTimeMs,request=OffsetCommit,type=RequestMetrics",
    "kafka.network:name=RequestsPerSec,request=Offsets,type=RequestMetrics",
    "kafka.network:name=NetworkProcessorAvgIdlePercent,type=SocketServer",
    "kafka.controller:name=OfflinePartitionsCount,type=KafkaController",
    "kafka.network:name=ResponseQueueTimeMs,request=Metadata,type=RequestMetrics",
    "kafka.network:name=ResponseQueueTimeMs,request=ControlledShutdown,type=RequestMetrics",
    "kafka.network:name=RequestQueueTimeMs,request=LeaderAndIsr,type=RequestMetrics",
    "kafka.network:name=IdlePercent,networkProcessor=6,type=Processor",
    "kafka.network:name=ResponseSendTimeMs,request=OffsetCommit,type=RequestMetrics",
    "kafka.network:name=IdlePercent,networkProcessor=0,type=Processor",
    "kafka.network:name=RequestQueueTimeMs,request=Heartbeat,type=RequestMetrics",
    "kafka.network:name=ResponseSendTimeMs,request=ControlledShutdown,type=RequestMetrics",
    "kafka.network:name=ResponseSendTimeMs,request=DescribeGroups,type=RequestMetrics",
    "kafka.network:name=TotalTimeMs,request=SyncGroup,type=RequestMetrics",
    "kafka.network:name=ResponseQueueTimeMs,request=Heartbeat,type=RequestMetrics",
    "kafka.network:name=TotalTimeMs,request=DescribeGroups,type=RequestMetrics",
    "kafka.network:name=IdlePercent,networkProcessor=7,type=Processor",
    "kafka.network:name=TotalTimeMs,request=JoinGroup,type=RequestMetrics",
    "kafka.network:name=RequestsPerSec,request=FetchConsumer,type=RequestMetrics",
    "kafka.network:name=ResponseQueueTimeMs,request=Fetch,type=RequestMetrics",
    "kafka.network:name=ResponseQueueTimeMs,request=FetchConsumer,type=RequestMetrics",
    "kafka.controller:name=UncleanLeaderElectionsPerSec,type=ControllerStats",
    "kafka.network:name=TotalTimeMs,request=StopReplica,type=RequestMetrics",
    "kafka.network:name=RequestsPerSec,request=DescribeGroups,type=RequestMetrics",
    "kafka.network:name=RequestQueueTimeMs,request=JoinGroup,type=RequestMetrics",
    "kafka.network:name=ThrottleTimeMs,request=Heartbeat,type=RequestMetrics",
    "kafka.server:networkProcessor=6,type=socket-server-metrics",
    "kafka.network:name=TotalTimeMs,request=ControlledShutdown,type=RequestMetrics",
    "kafka.server:type=Produce",
    "kafka.network:name=ResponseQueueSize,processor=0,type=RequestChannel",
    "kafka.network:name=ResponseQueueTimeMs,request=LeaveGroup,type=RequestMetrics",
    "kafka.log:name=max-clean-time-secs,type=LogCleaner",
    "kafka.network:name=ThrottleTimeMs,request=Produce,type=RequestMetrics",
    "kafka.network:name=ResponseQueueSize,processor=7,type=RequestChannel",
    "kafka.network:name=IdlePercent,networkProcessor=2,type=Processor",
    "kafka.network:name=ResponseQueueTimeMs,request=OffsetFetch,type=RequestMetrics",
    "kafka.network:name=LocalTimeMs,request=StopReplica,type=RequestMetrics",
    "kafka.network:name=ThrottleTimeMs,request=UpdateMetadata,type=RequestMetrics",
    "kafka.network:name=LocalTimeMs,request=Fetch,type=RequestMetrics",
    "kafka.network:name=RemoteTimeMs,request=JoinGroup,type=RequestMetrics",
    "kafka.network:name=ResponseSendTimeMs,request=JoinGroup,type=RequestMetrics",
    "kafka.network:name=ResponseQueueSize,processor=3,type=RequestChannel",
    "kafka.server:name=PartitionCount,type=ReplicaManager",
    "kafka.network:name=RequestsPerSec,request=OffsetCommit,type=RequestMetrics",
    "kafka.server:networkProcessor=3,type=socket-server-metrics",
    "kafka.server:name=BrokerState,type=KafkaServer",
    "kafka.server:delayedOperation=Heartbeat,name=PurgatorySize,type=DelayedOperationPurgatory",
    "kafka.server:name=RequestHandlerAvgIdlePercent,type=KafkaRequestHandlerPool",
    "kafka.network:name=RequestQueueTimeMs,request=DescribeGroups,type=RequestMetrics",
    "kafka.network:name=ResponseQueueSize,processor=2,type=RequestChannel",
    "kafka.network:name=RemoteTimeMs,request=Produce,type=RequestMetrics",
    "kafka.network:name=TotalTimeMs,request=Produce,type=RequestMetrics",
    "kafka.network:name=ResponseSendTimeMs,request=StopReplica,type=RequestMetrics",
    "kafka.network:name=ThrottleTimeMs,request=SyncGroup,type=RequestMetrics",
    "kafka.server:name=BytesInPerSec,type=BrokerTopicMetrics",
    "kafka.network:name=LocalTimeMs,request=ControlledShutdown,type=RequestMetrics",
    "kafka.network:name=RemoteTimeMs,request=OffsetCommit,type=RequestMetrics",
    "kafka.network:name=ThrottleTimeMs,request=OffsetFetch,type=RequestMetrics",
    "kafka.network:name=ResponseSendTimeMs,request=OffsetFetch,type=RequestMetrics",
    "kafka.network:name=TotalTimeMs,request=LeaveGroup,type=RequestMetrics",
    "kafka.network:name=ThrottleTimeMs,request=ListGroups,type=RequestMetrics",
    "kafka.network:name=RequestsPerSec,request=OffsetFetch,type=RequestMetrics",
    "kafka.controller:name=PreferredReplicaImbalanceCount,type=KafkaController",
    "kafka.network:name=RequestsPerSec,request=LeaveGroup,type=RequestMetrics",
    "kafka.server:name=IsrShrinksPerSec,type=ReplicaManager",
    "kafka.network:name=TotalTimeMs,request=FetchFollower,type=RequestMetrics",
    "kafka.network:name=ResponseQueueTimeMs,request=StopReplica,type=RequestMetrics",
    "kafka.server:networkProcessor=1,type=socket-server-metrics",
    "kafka.server:networkProcessor=5,type=socket-server-metrics",
    "kafka.network:name=TotalTimeMs,request=Heartbeat,type=RequestMetrics",
    "kafka.server:name=FailedProduceRequestsPerSec,type=BrokerTopicMetrics",
    "kafka.network:name=ResponseQueueTimeMs,request=LeaderAndIsr,type=RequestMetrics",
    "kafka.network:name=RemoteTimeMs,request=OffsetFetch,type=RequestMetrics",
    "kafka.network:name=ThrottleTimeMs,request=Offsets,type=RequestMetrics",
    "kafka.network:name=ResponseSendTimeMs,request=Offsets,type=RequestMetrics",
    "kafka.network:name=LocalTimeMs,request=FetchConsumer,type=RequestMetrics",
    "kafka.network:name=ResponseSendTimeMs,request=LeaderAndIsr,type=RequestMetrics",
    "kafka.network:name=ResponseSendTimeMs,request=Produce,type=RequestMetrics",
    "kafka.server:name=IsrExpandsPerSec,type=ReplicaManager",
    "kafka.network:name=IdlePercent,networkProcessor=1,type=Processor",
    "kafka.network:name=RemoteTimeMs,request=ListGroups,type=RequestMetrics",
    "kafka.network:name=RequestQueueTimeMs,request=LeaveGroup,type=RequestMetrics",
    "kafka.server:delayedOperation=Produce,name=NumDelayedOperations,type=DelayedOperationPurgatory",
    "kafka.network:name=RequestsPerSec,request=Produce,type=RequestMetrics",
    "kafka.network:name=ResponseQueueTimeMs,request=Produce,type=RequestMetrics",
    "kafka.network:name=IdlePercent,networkProcessor=4,type=Processor",
    "kafka.network:name=RemoteTimeMs,request=UpdateMetadata,type=RequestMetrics",
    "kafka.network:name=TotalTimeMs,request=Metadata,type=RequestMetrics",
    "kafka.network:name=RequestQueueTimeMs,request=Fetch,type=RequestMetrics",
    "kafka.network:name=ThrottleTimeMs,request=LeaveGroup,type=RequestMetrics",
    "kafka.network:name=RequestsPerSec,request=LeaderAndIsr,type=RequestMetrics",
    "kafka.network:name=IdlePercent,networkProcessor=5,type=Processor",
    "kafka.network:name=ThrottleTimeMs,request=JoinGroup,type=RequestMetrics",
    "kafka.network:name=RequestsPerSec,request=SyncGroup,type=RequestMetrics",
    "kafka.network:name=LocalTimeMs,request=GroupCoordinator,type=RequestMetrics",
    "kafka.network:name=ResponseQueueSize,processor=5,type=RequestChannel",
    "kafka.network:name=RequestsPerSec,request=JoinGroup,type=RequestMetrics",
    "kafka.server:delayedOperation=Rebalance,name=PurgatorySize,type=DelayedOperationPurgatory",
    "kafka.network:name=RemoteTimeMs,request=ControlledShutdown,type=RequestMetrics",
    "kafka.network:name=RequestsPerSec,request=FetchFollower,type=RequestMetrics",
    "kafka.network:name=TotalTimeMs,request=GroupCoordinator,type=RequestMetrics",
    "kafka.network:name=RequestQueueTimeMs,request=SyncGroup,type=RequestMetrics",
    "kafka.network:name=LocalTimeMs,request=ListGroups,type=RequestMetrics",
    "kafka.network:name=ThrottleTimeMs,request=ControlledShutdown,type=RequestMetrics",
    "kafka.network:name=ResponseQueueSize,processor=4,type=RequestChannel",
    "kafka.server:networkProcessor=4,type=socket-server-metrics",
    "kafka.network:name=RemoteTimeMs,request=SyncGroup,type=RequestMetrics",
    "kafka.server:delayedOperation=Fetch,name=PurgatorySize,type=DelayedOperationPurgatory",
    "kafka.network:name=RemoteTimeMs,request=Heartbeat,type=RequestMetrics",
    "kafka.network:name=RemoteTimeMs,request=GroupCoordinator,type=RequestMetrics",
    "kafka.network:name=TotalTimeMs,request=Offsets,type=RequestMetrics",
    "kafka.network:name=ResponseQueueTimeMs,request=GroupCoordinator,type=RequestMetrics",
    "kafka.network:name=ResponseQueueTimeMs,request=DescribeGroups,type=RequestMetrics",
    "kafka.network:name=RequestQueueTimeMs,request=Metadata,type=RequestMetrics",
    "kafka.network:name=ThrottleTimeMs,request=Fetch,type=RequestMetrics",
    "kafka.network:name=RemoteTimeMs,request=FetchConsumer,type=RequestMetrics",
    "kafka.network:name=IdlePercent,networkProcessor=3,type=Processor",
    "kafka.network:name=ResponseQueueTimeMs,request=JoinGroup,type=RequestMetrics",
    "kafka.network:name=ResponseQueueTimeMs,request=OffsetCommit,type=RequestMetrics",
    "kafka.network:name=RequestsPerSec,request=Fetch,type=RequestMetrics",
    "kafka.server:name=BytesRejectedPerSec,type=BrokerTopicMetrics",
    "kafka.server:name=MessagesInPerSec,type=BrokerTopicMetrics",
    "kafka.network:name=TotalTimeMs,request=OffsetFetch,type=RequestMetrics",
    "kafka.network:name=RequestQueueTimeMs,request=Produce,type=RequestMetrics",
    "kafka.network:name=RequestQueueTimeMs,request=FetchConsumer,type=RequestMetrics",
    "kafka.network:name=TotalTimeMs,request=Fetch,type=RequestMetrics",
    "kafka.server:networkProcessor=7,type=socket-server-metrics",
    "kafka.server:name=LeaderCount,type=ReplicaManager",
    "kafka.network:name=ThrottleTimeMs,request=DescribeGroups,type=RequestMetrics",
    "kafka.network:name=ThrottleTimeMs,request=GroupCoordinator,type=RequestMetrics",
    "kafka.server:type=Fetch",
    "kafka.network:name=TotalTimeMs,request=FetchConsumer,type=RequestMetrics",
    "kafka.network:name=ResponseSendTimeMs,request=UpdateMetadata,type=RequestMetrics",
    "kafka.network:name=RemoteTimeMs,request=StopReplica,type=RequestMetrics",
    "kafka.network:name=ThrottleTimeMs,request=Metadata,type=RequestMetrics",
    "kafka.network:name=RequestQueueTimeMs,request=ListGroups,type=RequestMetrics",
    "kafka.server:delayedOperation=Heartbeat,name=NumDelayedOperations,type=DelayedOperationPurgatory",
    "kafka.network:name=LocalTimeMs,request=UpdateMetadata,type=RequestMetrics",
    "kafka.network:name=RequestQueueTimeMs,request=UpdateMetadata,type=RequestMetrics",
    "kafka.network:name=RequestsPerSec,request=ListGroups,type=RequestMetrics",
    "kafka.server:clientId=Replica,name=MaxLag,type=ReplicaFetcherManager",
    "kafka.network:name=LocalTimeMs,request=JoinGroup,type=RequestMetrics",
    "kafka.server:networkProcessor=0,type=socket-server-metrics",
    "kafka.network:name=TotalTimeMs,request=OffsetCommit,type=RequestMetrics",
    "kafka.network:name=RequestQueueTimeMs,request=ControlledShutdown,type=RequestMetrics",
    "kafka.network:name=ThrottleTimeMs,request=FetchFollower,type=RequestMetrics",
    "kafka.network:name=ResponseSendTimeMs,request=FetchFollower,type=RequestMetrics",
    "kafka.coordinator:name=NumGroups,type=GroupMetadataManager",
    "kafka.network:name=RequestQueueTimeMs,request=Offsets,type=RequestMetrics",
    "kafka.network:name=ResponseQueueSize,type=RequestChannel",
    "kafka.network:name=TotalTimeMs,request=UpdateMetadata,type=RequestMetrics",
    "kafka.server:name=FailedFetchRequestsPerSec,type=BrokerTopicMetrics",
    "kafka.log:name=max-dirty-percent,type=LogCleanerManager",
    "kafka.network:name=RemoteTimeMs,request=Fetch,type=RequestMetrics",
    "kafka.network:name=LocalTimeMs,request=LeaveGroup,type=RequestMetrics",
    "kafka.server:delayedOperation=Rebalance,name=NumDelayedOperations,type=DelayedOperationPurgatory",
    "kafka.network:name=ResponseSendTimeMs,request=Metadata,type=RequestMetrics",
    "kafka.network:name=LocalTimeMs,request=Offsets,type=RequestMetrics",
    "kafka.utils:name=cleaner-io,type=Throttler",
    "kafka.network:name=ResponseSendTimeMs,request=Heartbeat,type=RequestMetrics",
    "kafka.network:name=RequestQueueTimeMs,request=FetchFollower,type=RequestMetrics",
    "kafka.network:name=ThrottleTimeMs,request=LeaderAndIsr,type=RequestMetrics",
    "kafka.network:name=LocalTimeMs,request=SyncGroup,type=RequestMetrics",
    "kafka.network:name=ResponseQueueTimeMs,request=UpdateMetadata,type=RequestMetrics",
    "kafka.network:name=RequestsPerSec,request=Metadata,type=RequestMetrics",
    "kafka.network:name=RemoteTimeMs,request=Metadata,type=RequestMetrics",
    "kafka.server:networkProcessor=2,type=socket-server-metrics",
    "kafka.network:name=RequestQueueTimeMs,request=OffsetFetch,type=RequestMetrics",
    "kafka.network:name=RequestsPerSec,request=UpdateMetadata,type=RequestMetrics",
    "kafka.network:name=ResponseSendTimeMs,request=Fetch,type=RequestMetrics",
    "kafka.network:name=ResponseSendTimeMs,request=FetchConsumer,type=RequestMetrics",
    "kafka.network:name=LocalTimeMs,request=OffsetFetch,type=RequestMetrics",
    "kafka.controller:name=ActiveControllerCount,type=KafkaController",
    "kafka.network:name=LocalTimeMs,request=FetchFollower,type=RequestMetrics",
    "kafka.server:name=BytesOutPerSec,type=BrokerTopicMetrics",
    "kafka.network:name=ResponseQueueTimeMs,request=ListGroups,type=RequestMetrics",
    "kafka.network:name=ThrottleTimeMs,request=OffsetCommit,type=RequestMetrics",
    "kafka.controller:name=LeaderElectionRateAndTimeMs,type=ControllerStats",
    "kafka.network:name=ResponseQueueSize,processor=6,type=RequestChannel",
    "kafka.network:name=ResponseQueueTimeMs,request=FetchFollower,type=RequestMetrics",
    "kafka.log:name=cleaner-recopy-percent,type=LogCleaner",
    "kafka.network:name=LocalTimeMs,request=Produce,type=RequestMetrics",
    "kafka.network:name=ResponseQueueSize,processor=1,type=RequestChannel",
    "kafka.server:clientId=Replica,name=MinFetchRate,type=ReplicaFetcherManager",
    "kafka.network:name=ResponseSendTimeMs,request=SyncGroup,type=RequestMetrics",
    "kafka.network:name=TotalTimeMs,request=LeaderAndIsr,type=RequestMetrics",
    "kafka.network:name=RequestsPerSec,request=GroupCoordinator,type=RequestMetrics",
    "kafka.server:name=TotalFetchRequestsPerSec,type=BrokerTopicMetrics",
    "kafka.network:name=RemoteTimeMs,request=FetchFollower,type=RequestMetrics",
    "kafka.network:name=ThrottleTimeMs,request=FetchConsumer,type=RequestMetrics",
    "kafka.coordinator:name=NumOffsets,type=GroupMetadataManager",
    "kafka.server:id=1001,type=app-info",
    "kafka.network:name=RemoteTimeMs,request=Offsets,type=RequestMetrics",
    "kafka.network:name=RequestQueueTimeMs,request=OffsetCommit,type=RequestMetrics",
    "kafka.network:name=RemoteTimeMs,request=LeaderAndIsr,type=RequestMetrics",
    "kafka.network:name=RemoteTimeMs,request=DescribeGroups,type=RequestMetrics",
    "kafka.network:name=RequestsPerSec,request=ControlledShutdown,type=RequestMetrics",
    "kafka.network:name=TotalTimeMs,request=ListGroups,type=RequestMetrics"
  ],
  "request": {
    "type": "search",
    "mbean": "kafka.*:*"
  },
  "status": 200,
  "timestamp": 1465974055
}
To read the values of a metric, use the read format:
http://[server IP]:8778/jolokia/read/[metric]
[root@kafkamonitoring ~]# curl -s http://10.1.0.13:8778/jolokia/read/kafka.server:networkProcessor=4,type=socket-server-metrics | jq .
{
  "value": {
    "response-rate": 0,
    "outgoing-byte-rate": 0,
    "io-time-ns-avg": 4206.140350877193,
    "io-wait-time-ns-avg": 300293983.64912283,
    "incoming-byte-rate": 0,
    "connection-count": 0,
    "io-wait-ratio": 1.0025923015375604,
    "io-ratio": 1.4043051691316445e-05,
    "request-rate": 0,
    "request-size-avg": 0,
    "request-size-max": null,
    "select-rate": 3.338702591887538,
    "connection-creation-rate": 0,
    "connection-close-rate": 0,
    "network-io-rate": 0
  },
  "request": {
    "type": "read",
    "mbean": "kafka.server:networkProcessor=4,type=socket-server-metrics"
  },
  "status": 200,
  "timestamp": 1465980199
}
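When only one value is needed, Jolokia's read request also accepts an attribute name as an extra path segment after the MBean name. A small sketch, assuming port 8778 and using a hypothetical helper named `jolokia_read_url`:

```shell
# Build a Jolokia read URL.
# $1 = broker host, $2 = MBean name, $3 = attribute name (optional).
jolokia_read_url() {
  url="http://$1:8778/jolokia/read/$2"
  if [ -n "${3:-}" ]; then
    # Appending the attribute limits the response to that single value.
    url="$url/$3"
  fi
  printf '%s' "$url"
}

# Usage: print only connection-count for network processor 4.
#   curl -s "$(jolokia_read_url 10.1.0.13 \
#     'kafka.server:networkProcessor=4,type=socket-server-metrics' \
#     connection-count)" | jq '.value'
```

With the attribute in the path, `.value` in the response should hold just that attribute's value rather than the full map shown above.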
To read every Kafka metric at once, run:
curl -s "http://10.1.0.13:8778/jolokia/read/kafka.*:*" | jq .
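Reading everything returns a very large document, so a narrower MBean pattern combined with a jq filter is often easier to work with. The sketch below is only an illustration: the helper name and filter variable are made up, and it assumes the matched MBeans expose a `OneMinuteRate` attribute, as Kafka's rate meters such as BytesInPerSec typically do.

```shell
# Build a read URL for an MBean pattern. $1 = broker host, $2 = pattern.
jolokia_read_pattern_url() {
  printf 'http://%s:8778/jolokia/read/%s' "$1" "$2"
}

# jq filter: for a pattern read, .value is keyed by MBean name.
# Emit one line per MBean with its OneMinuteRate (assumed attribute).
RATE_FILTER='.value | to_entries[] | "\(.key) \(.value.OneMinuteRate)"'

# Usage: one-minute rates for all BrokerTopicMetrics MBeans.
#   curl -s "$(jolokia_read_pattern_url 10.1.0.13 \
#     'kafka.server:type=BrokerTopicMetrics,*')" | jq -r "$RATE_FILTER"
```

Quoting the URL keeps the shell from trying to expand the `*` wildcards as file names.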
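These requests can be run periodically, for example from cron, to collect metrics over time. With jq, awk, and similar tools, the results can also be written to a log.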
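As one possible shape for such a collector, the sketch below reads a single attribute and prints a timestamped line suitable for appending to a log. The function names, the log line layout, and the paths in the crontab example are all assumptions made for illustration.

```shell
# Format one log line: "<timestamp> <metric>=<value>".
format_log_line() {
  printf '%s %s=%s\n' "$1" "$2" "$3"
}

# Read one attribute from a broker and print it as a log line.
# $1 = broker host, $2 = MBean name, $3 = attribute name.
collect_metric() {
  value=$(curl -s --max-time 5 "http://$1:8778/jolokia/read/$2/$3" | jq -r '.value')
  # Skip the line when nothing came back (for example, the broker is unreachable).
  [ -n "$value" ] || return 1
  format_log_line "$(date -u +%Y-%m-%dT%H:%M:%SZ)" "$3" "$value"
}

# Example crontab entry (every minute), assuming this file is saved as
# /usr/local/bin/kafka-jolokia.sh and ends with a collect_metric call:
#   * * * * * /usr/local/bin/kafka-jolokia.sh >> /var/log/kafka-jolokia.log
```

Keeping the formatting in its own function makes it easy to switch to another layout, such as CSV, without touching the request logic.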
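Summary
Ideally, HDInsight monitoring would be possible entirely from the portal, but that is not realistic, so this post introduced monitoring with Jolokia instead.
The approach is not limited to HDInsight or Kafka. It is very handy whenever you want to monitor something through JMX.
See you next time.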