Apache KafkaからHadoopのHDFS転送で、
うまく転送できないものがあり、忘れないようにするためのメモ
アプリケーション自体はいわゆるWebアクセス系のトラッキング
- メタデータを付与したログデータをFluentdで収集
- Apache Kafkaのtopicに格納
- Kafka StreamsでTopicのデータ内で完結する前処理後、他topicへ転送
- Kafka Streams処理後のデータをHDFSへ転送
connect-distributed
distributedで動かすのは、connect-distributedコマンドで直接実行するか、
または
$ systemctl start confluent-kafka-connect
通常のアプリケーションから送信されるときはJSONにして送ることが多く、
そのままの設定でconnectへ登録しました(Webアプリケーション側にはAvro導入していない)
hdfsへのsink connect例は下記
name=hdfs-sink connector.class=io.confluent.connect.hdfs.HdfsSinkConnector tasks.max=1 topics=streamed.purchase_promotion hdfs.url=hdfs://hdfshostname/path flush.size=3 key.converter=org.apache.kafka.connect.json.JsonConverter key.converter.schemas.enable=false value.converter=org.apache.kafka.connect.json.JsonConverter value.converter.schemas.enable=false # 省略 format.class=io.confluent.connect.hdfs.json.JsonFormat
正常に登録されたかどうかは、RESTで問い合わせることで判定ができます。
$ curl localhost:8083/connectors ["hdfs-sink"]
対象のconnectorだけを確認する場合は、
curl localhost:8083/connectors/connector-name
で問い合わせます。
Fluentdからの転送とKafka Streamsの動作を確認して、
Connectだけが動かない・・・。
確認するところ
connectorのtask状況もRESTで問い合わせて確認できます。
curl localhost:8083/connectors/connector-name/tasks/0/status | jq { "state": "FAILED", "trace": "org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler\n\tat ....(省略", }
何かエラーが出ているので、その内容を探します。
jacksonでパースエラー発生
Caused by: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'hoge': was expecting
topicへ挿入されるメッセージ自体はconsumeコマンドで確認できます。
$ kafka-console-consumer --from-beginning --topic topic.name --bootstrap-server hostname:9092
{"referer":null,"event": "blog"}
JSONで挿入されているので問題はありませんでした。
このデータはいわゆるValueなため、Keyも確認します。
keyを取得する場合は、property print.key=true
を指定します。
$ kafka-console-consumer --property print.key=true --from-beginning --topic topic.name --bootstrap-server hostname:9092
hoge.stdout {"referer":null,"event": "blog"}
どう云ったキーで送信されているかがわかります。
kafka-connectのkey.converterをStringに対応させるには以下のConverterを指定
key.converter=org.apache.kafka.connect.storage.StringConverter
再度登録し直して、転送がスタートされました。