PHP with Apache Kafka

Apache Foundation

ビッグデータ系の処理向けにApache Kafkaを利用し始めました。

これまでもMessage Queueなどにzmq、Redis(PubSub)、ActiveMQ/RabbitMQなどを利用はしていましたが、

スケールのしやすさや、運用面や機能など今後フル活用できそうなためKafkaに、と。
HadoopやHbase、Cassandraといったミドルウェアを扱う機会も増えていているため、
親和性なども当然あります

Kafkaどうなのよ

保存機能

Kafkaでは指定した期間、メッセージを保存する機能が用意されています。
これまでもその機能を持つMessage Queueはありましたが、
配信後にアプリケーション側から再配信させるといったことができます。
デフォルトで2週間ほど保持することができるので、
障害発生時に、再度処理を実行させることも、配信したメッセージの内容を取得することも容易です。

ただ、たしかに最近のPHP、特にLaravelなどにみられる非同期処理を溜め込むためのQueueレベルであれば、
かなりのオーバースペックとなります。
ビッグデータではデファクトスタンダードと言っても良いくらいのラムダアーキテクチャの上で動くアプリケーションであれば、
集計処理やリアルタイム処理など全てが異なるミドルウェアで実行されますので、
速度層などでは重宝します。

Sparkと一緒に利用することでストリーム処理がより強力になるのも魅力的です
(そのために最近scalaをやり始めました)

実績もあるZookeeperを利用したスケールのしやすさなどは現時点ではKafka一択でした。

Kafkaから直接CassandraやHbaseに値を挿入することもできます

www.confluent.io

Partition

Kafkaは簡単にクラスタを構築することができるのはもちろんですが、
このパーティションを利用してアプリケーションレベルで並列や分散処理が設計できます。

ごく一般的なPubSubやQueueではChannel(KafkaではTopicと呼びます)を利用して、
Round-Robinや、同一のQueueを複数のSubscriberが同時に処理するなどがあげられます。

Kafkaでは上記はもちろんですが、アプリケーションで自由に決めることができ、
場合によっては木の枝の様にどんどん細分化して効率的に処理を行うこともできます。

f:id:ytakezawa:20170521001922p:plain

これはSubscriber側だけではなく、PublishするProducer側からも指定することができます。
一つのTopicに対してPartitionで分割し、特定のサーバからはpartition0, あるサーバからはpartition1に対して、
Publishするなどが可能で、
複数のConsumerはこれらをまとめて処理することもProducer同様に、
アプリケーションに合わせてConsumerを異なるサーバで動かすなどができます。

Chatの様なリアルタイム性が重視されるアプリケーションにおいても、
TopicとPartitionがいきてくるのではないかと思います。

Kafka Install

インストール方法はいろんなサイトに記載されていますので省きます。

ただし、アプリケーション側から接続ができない、などがあれば、
confg/server.properties の中身を確認しましょう。

必要に応じて、 advertised.listeners=PLAINTEXT://ipアドレス指定:9092 などデフォルトのままではなく、
正しい情報になっているかどうかを確認します。

Zookeeperの利用が必須になりますので、Kafkaとは別に起動が必要です。
こちらもあわせてZookpeerのアドレスが正しいかどうかを確認してください。

# root directory for all kafka znodes.
zookeeper.connect=ipアドレス指定:2181

# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000

*Zookeeperのクラスタの構築方法は他のサイトを参照ください

PHPから利用する

Kafkaを細かく利用していくのであれば、rdkafkaがおすすめです。
libkafkaをラップしたphpエクステンションで、
かなり自由に設定を変更できます。

arnaud-lb/php-rdkafka

設定についはアプリケーションの規模や構成によって異なりますので、
公式のドキュイメントかconfluentなどのドキュメントを読み込むのをおすすめします。

Kafka Consumers — Confluent Platform 3.2.1 documentation

Producerは queue.buffering.max.messages などをアプリケーションに合わせて変更してください。
処理速度が早すぎる場合は、指定した件数以上は処理されなくなってしまいます。

Consumerは heartbeat.interval.ms, session.timeout.ms を適切な値に設定する必要があります。
(正しく設定していない場合は切断されるなどが発生します 切断時はsupervisorでプロセス再起動)
heartbeat.interval.msは session.timeoutの1/3くらいに設定すると良いかもしれません。

簡単にProducer/Consumerの動作確認、partition分割の動作確認などを行えるサンプルを用意していますので、
難しければ参考にしてみてください

github.com

RdKafka\Conf クラスを利用して、上記の設定値を反映させることができます。

<?php
declare(strict_types=1);

namespace Ytake\KafkaConsole\Foundation;

use RdKafka\Conf;
use RdKafka\Producer;
use RdKafka\Consumer;

/**
 * Class Configure
 */
final class Configure
{
    // 省略
    /**
     * @return Producer
     */
    public function producer(): Producer
    {
        $conf = new Conf();
        foreach ($this->producerConfigure as $key => $item) {
            $conf->set($key, $item);
        }
        return new Producer($conf);
    }
    /**
     * @return Consumer
     */
    public function consumer(): Consumer
    {
        $conf = new Conf();
        foreach ($this->consumerConfigure as $key => $item) {
            $conf->set($key, $item);
        }
        return new Consumer($conf);
    }
    // 
}

Produce自体は非常にシンプルです

<?php

    /**
     * @param AbstractProduceDefinition $definition
     */
    public function produce(AbstractProduceDefinition $definition)
    {
        $kafkaTopic = $this->producerTopic();
        $kafkaTopic->produce(RD_KAFKA_PARTITION_UA, 0, $definition->payload());
        if ($this->logger instanceof LoggerInterface) {
            $this->logger->info($definition->payload());
        }
        $this->producer->poll(0);
    }
    /**
     * @return ProducerTopic
     */
    protected function producerTopic(): ProducerTopic
    {
        $configure = $this->repository->find($this->topic);
        $this->producer = $configure->producer();
        $this->producer->setLogLevel(LOG_DEBUG);
        $this->producer->addBrokers($configure->brokers());
        return $this->producer->newTopic($this->topic);
    }

RD_KAFKA_PARTITION_UA で、Producer側から自動でpartitionを選択しpublishします。
複数のpartitionを意図した様に操作する場合は
RdKafka\ProducerTopic, RdKafka\Producer クラスのproduceメソッドの引数で意図したものに指定してください。

Consumerは下記の様になります

<?php

    /**
     * @param Consumable $callable
     *
     * @throws \Exception
     */
    public function handle(Consumable $callable)
    {
        $topic = $this->consumerTopic();
        $topic->consumeStart($this->partition, $this->offset);
        while (true) {
            $message = $topic->consume($this->partition, 120 * 10000);
            if ($message instanceof Message) {
                switch ($message->err) {
                    case RD_KAFKA_RESP_ERR_NO_ERROR:
                        call_user_func($callable, $message);
                        break;
                    case RD_KAFKA_RESP_ERR__TIMED_OUT:
                        throw new \Exception("time out.");
                        break;
                    default:
                        break;
                }
            }
        }
    }

サンプルのアプリケーションでは、
producerは $ php kafka-console kafka:produce message-topic hello
consumerは $ php kafka-console kafka:consume message-topic でそれぞれ簡単に動作確認ができます。

consumerを複数立ち上げて、produceを実行すると複数のconsumerが反応します。
またpartitionを指定してconsumerを起動すると、特定のconusmerのみ反応することが確認できると思います。

複雑なアプリケーションや、ビッグデータを扱うアプリケーションなどでは、
一つのアプリケーションだけで解決するには、困難が付きまといます。
マイクロサービスアーキテクチャなどはこうしたミドルウェアを用いたEvent Sourcingなども重要になってきます。
これらのミドルウェアを適切に使い、アプリケーション作りに役立てていきましょう。