Kakfa ConnectでSinkが動かないときの確認メモ

Apache KafkaからHadoopのHDFS転送で、
うまく転送できないものがあり、忘れないようにするためのメモ

アプリケーション自体はいわゆるWebアクセス系のトラッキング

  • メタデータを付与したログデータをFluentdで収集
  • Apache Kafkaのtopicに格納
  • Kafka StreamsでTopicのデータ内で完結する前処理後、他topicへ転送
  • Kafka Streams処理後のデータをHDFSへ転送

connect-distributed

distributedで動かすのは、connect-distributedコマンドで直接実行するか、
または

$ systemctl start confluent-kafka-connect

通常のアプリケーションから送信されるときはJSONにして送ることが多く、
そのままの設定でconnectへ登録しました(Webアプリケーション側にはAvro導入していない)

hdfsへのsink connect例は下記

name=hdfs-sink
connector.class=io.confluent.connect.hdfs.HdfsSinkConnector
tasks.max=1
topics=streamed.purchase_promotion
hdfs.url=hdfs://hdfshostname/path
flush.size=3

key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
 
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
# 省略
format.class=io.confluent.connect.hdfs.json.JsonFormat

正常に登録されたかどうかは、RESTで問い合わせることで判定ができます。

$ curl localhost:8083/connectors
["hdfs-sink"]

対象のconnectorだけを確認する場合は、
curl localhost:8083/connectors/connector-name で問い合わせます。
Fluentdからの転送とKafka Streamsの動作を確認して、
Connectだけが動かない・・・。

確認するところ

connectorのtask状況もRESTで問い合わせて確認できます。

curl localhost:8083/connectors/connector-name/tasks/0/status | jq
{
  "state": "FAILED",
  "trace": "org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler\n\tat ....(省略",
}

何かエラーが出ているので、その内容を探します。

jacksonでパースエラー発生

Caused by: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'hoge': was expecting

topicへ挿入されるメッセージ自体はconsumeコマンドで確認できます。

$ kafka-console-consumer --from-beginning --topic topic.name --bootstrap-server hostname:9092
{"referer":null,"event": "blog"}

JSONで挿入されているので問題はありませんでした。
このデータはいわゆるValueなため、Keyも確認します。
keyを取得する場合は、property print.key=true を指定します。

$ kafka-console-consumer --property print.key=true --from-beginning --topic topic.name --bootstrap-server hostname:9092
hoge.stdout  {"referer":null,"event": "blog"}

どう云ったキーで送信されているかがわかります。
kafka-connectのkey.converterをStringに対応させるには以下のConverterを指定

key.converter=org.apache.kafka.connect.storage.StringConverter

再度登録し直して、転送がスタートされました。