PHPを使ってEvent Streaming + CQRSをざっくり理解してみよう(Laravel)

これはさりげなく スターフェスティバル Advent Calendar 2020の20日目です。

PHPカンファレンス2020

2019年は登壇などを控えて一休みの期間としていたので一年振りくらいの
と登壇となりました。

発表の内容としてはここ3、4年注力しているデータ処理まわりから、
PHPにおけるWebアプリケーションなどでも活用することができる題材を取り上げてお話させていただきました。

要するに事業に関わっている開発は年々要件も複雑になっていき、
問題解決するためにはいろんな手法があるけど、きちんと分析して
開発しやすいよう、フレームワークにべったり依存してつくるのではなく、
数年先を見越してつくったり、改善する方法の一つにCQRSもありますよ、という話です。

お話したように、全てのアプリケーションでペイできるものではありませんし、
ある程度大きな規模だったりある程度複雑な機能だったり、
または周辺サービスや事業自体の構想によって初めて導入するかしないかという話になります。

なんかカッコ良さそうな方法があるから採用しよう!では失敗しますので
よく見極めて導入するのが良いと思います。

さてアドベントカレンダーということもあり、実際に自分自身が手がけてきたものを取り入れて、 CQRSをざっくり理解してみよう ということで概念だけではなく、
PHPのコードも交えて簡単に解説します。

スタフェスではある部分をES+CQRSに置き換える途中だったり、
データ基盤的なものをこれらの手法の発展形ともいえるラムダアーキテクチャなどを用いていろんなものを
作り上げようとしている段階です。

今回で取り上げるものは、Event Sourcingのエッセンスを流用した
Event Streamingによる流れになります。
期限付きで状態をKSQL(Kafkaで提供されているもの)などを組み合わせて
以前の状態に復元したり、リプレイすることは可能ですが このソフトウェアレベルだけでは実現できませんので、Outboxパターンを併用しましょう。

ではざっくりといきましょう。

仮のお話

*ブログか、レビューか、何かそういうものを想像してください。

ユーザーが記事にキーワードを投稿して、頻度の高いキーワードをサジェストしたり、
ワードクラウドみたいなものを実現したいんですよね!
難しいかもしれませんが、キーワードのサジェストはリアルタイムに近いくらいの速度で変えたいです。 もちろんデータ集計や将来的にはレコメンデーションで使いたいです!

すごい雑な内容ですがこんな要望を実現しようという場合に、
キーワードはおそらくタグ的な用途だと思われますが、
記事などのコンテンツと同時にキーワードが投稿されるらしいものというのはざっくりわかります。

おそらくキーワードと、それを投稿したユーザー情報が分ければ良さそうです。
大きくするとキリがないのでここでは例としてユーザーIDだけにします。

ワードクラウドみたいなものやサジェストはどう実現したらよいでしょう?
ワードクラウドみたいなものやサジェスト はどうやら投稿したユーザー向けというわけではなく、
UIやAPIを含めて、不特定多数のユーザー向けの機能なようです。

ユーザーが投稿する時のキーワードとは違う概念になりそう(利用数の概念などが加わります)で、
サジェストはさすがにRDBMSでは無理でしょう。
ElasticsearchやSolrを使えば実現できそうです。

ざっくりとこういう内容だとします。

RDBMSでも近しいことはできなくはないと思いますが、
情報取得にはLIKE検索と集計を多用することになり、アナライザーなどはないため
日本語サジェストに対応するにはかなり厳しいものがあります。

ワードクラウド的なものは集計するだけでできそうですね。
ただし上記のサジェスト、ワードクラウドはユーザーIDは不用そうです。

同じキーワード、という文字でもアプリケーションに関わるユーザーの角度から
多少コンテキストが違うのがわかります。

例としてかなり簡単ですので、CQRSを使うほどでもないですが
Streamingと組み合わせてどういう風に作っていくか簡単に見ていきましょう。

コマンド実装例

今回の内容のコードは下記で公開しています。

github.com

CQRSについては、取り入れる環境やアプリケーションによって広義の意味だったりすることもありますので、
まずはCQRS Documents by Greg Youngを参照してください。

ベースの考え方としては、副作用のある書き込み処理と副作用のない読み込み処理を分離しましょう。
というものです。
当然DDDとシステムを利用するUI的な問題もともに解決しなければならないため、
スマートUI的な解決方法も取り入れなければなりません。
画面を構成する要素と要件分析は必ずしも一致しません。
(データベースの物理的な設計もデータモデルも異なります。)

このあたりはTask Based User Interface考察の下記の記事もわかりやすいと思います。

qiita.com

現実的にはうまくいろんな要素を取り入れなければ、
アプリケーションの規模によってはパフォーマンス面で深刻なボトルネックがあったり、
多様性のないデータベースなどが溢れてしまいます。

さてこの例のアプリケーションでは、
書き込みとしてはキーワード投稿(通常はブログかなにかの一部ですが、それだけを抜き出したものとして)が、
該当することがわかります。
読み込みとしてはサジェスト、ワードクラウドが該当します。

まずこれらを分割します。

まずはユーザーが投稿するキーワードです。

<?php
declare(strict_types=1);

namespace SampleDomain\Keyword\Entity;

use SampleDomain\User\ValueObject\UserId;

final class Keyword
{
    /**
     * @param UserId $user
     * @param string $word
     */
    public function __construct(
        private UserId $user,
        private string $word
    ) {
    }

    public function getUserId(): UserId
    {
        return $this->user;
    }

    public function getWord(): string
    {
        return $this->word;
    }
}

実際に必要なのはこれだけです。
これがアクションからユースケース(アプリケーションサービス)が実行されます。

<?php
// 省略
    /**
     * キーワードを登録する
     * @LogExceptions()
     * @param int $id
     * @param string $text
     */
    public function register(
        int $id,
        string $text
    ): void {
        $keyword = new Keyword(new UserId($id), $text);
        $this->dbRepository->save($keyword);
    }

書き込みを実行する実装で、簡単に実装できました。
ですが、ここに一つの問題があります。
それは書き込みの処理です。

サジェストがあるので RDBMS(ここではMySQLを使います)とElasticsearchに書き込まなければいけません。
ここで2フェーズコミットの問題が出てきます。

<?php
// 省略
        $this->dbRepository->save($keyword);
        $this->esRepository->save($keyword);

この二つに書き込む場合、MySQLへの書き込みが失敗した場合に
次のElasticsearchへの書き込みを停止することはまだ簡単です。

Elasticsearchが失敗した場合、MySQLへ書き込んだ内容を削除しなければなりません。
この順序を逆さまにした場合でも同じです。

2つとも無事に書き込めたことを確認し、正常終了したものとしてコミット扱いにしなければなりません。
これを回避するにはどれか一つにだけ書き込んで、
バッチ処理か何かでデータを同期してあげれば良さそうです。

ただリアルタイムに近しい頻度で更新して欲しい、という話がありました。
そのあたり要件の調整をしろ、というのもありますが、
そこで止めては今回のサンプルの意味がありません。

<?php
// 省略
    /**
     * キーワードを登録する
     * @LogExceptions()
     * @param int $id
     * @param string $text
     */
    public function register(
        int $id,
        string $text
    ): void {
        $keyword = new Keyword(new UserId($id), $text);
        $this->dbRepository->save($keyword);
        $this->dispatcher->dispatch(new KeywordRegistered($keyword));
    }

リアルタイムに近しい速度を実現させるためにpubsubのメッセージブローカーを使うことにしました。
dispatchを使ってメッセージブローカーへの通知をするように実装してみました。

<?php
declare(strict_types=1);

namespace App\Listeners;

use App\DataAccess\Kafka\KeywordCreatedParameter;
use App\DataAccess\KeywordProducerInterface;
use SampleDomain\Keyword\Event\KeywordRegistered;

class KeywordRegisteredListener
{
    /**
     * @param KeywordProducerInterface $producer
     */
    public function __construct(
        private KeywordProducerInterface $producer
    ) {
    }

    /**
     * @param KeywordRegistered $event
     */
    public function handle(
        KeywordRegistered $event
    ): void {
        $this->producer->add(
            new KeywordCreatedParameter(
                $event->getKeyword()
            )
        );
    }
}

リスナーで受け取ってpublishしているだけです。 これならいけそうです。

とはなりません。
結局、MySQLかElasticsearchのどちらかに書き込んだあとに、
メッセージブローカーへの書き込みが失敗したら戻さなければなりません。
ということで、ここではメッセージブローカーへのpublishに注力すれば良さそうです。

<?php
declare(strict_types=1);

namespace App\AppService;

use Psr\Log\LoggerInterface;
use SampleDomain\Keyword\Event\KeywordRegistered;
use Illuminate\Contracts\Events\Dispatcher;
use SampleDomain\Keyword\Entity\Keyword;
use SampleDomain\User\ValueObject\UserId;
use Ytake\LaravelAspect\Annotation\LogExceptions;

/**
 * Usecase
 */
class KeywordRegistration
{
    /**
     * @param LoggerInterface $logger
     * @param Dispatcher $dispatcher
     */
    public function __construct(
        private LoggerInterface $logger,
        private Dispatcher $dispatcher
    ) {
    }

    /**
     * キーワードを登録する
     * @LogExceptions()
     * @param int $id
     * @param string $text
     */
    public function register(
        int $id,
        string $text
    ): void {
        $keyword = new Keyword(new UserId($id), $text);
        $this->dispatcher->dispatch(new KeywordRegistered($keyword));
        $this->logger->info('publish', ['object' => $keyword]);
    }
}

DebeziumやDynamoDB ストリームを使うことで、この問題を回避することができますが、
ここではどこかの環境に依存していないアプリケーションのアプローチとして簡略化します。
あくまで非常にゆるい結果整合になることに注意してください。

このアプローチを採用する場合に、当然重要なのがメッセージブローカーに何を使うか、になると思います。
PHPに焦点を当てたこの例では、有力なのはApache Kafkaとなります。
大きな問題としてKafkaへの送信失敗時の挙動です。

PHPの場合、librdkafkaを使ったrdkafkaを利用するわけですが
このlibrdkafkaは通信失敗時のリトライがサポート、
それに加え通信のトランザクションもサポートされています。

<?php
declare(strict_types=1);

namespace App\DataAccess;

use App\DataAccess\Kafka\ParameterInterface;
use App\Foundation\Serializer\SerializerInterface;
use RdKafka\Producer;
use RdKafka\ProducerTopic;
use function is_null;
use const RD_KAFKA_PARTITION_UA;

final class KeywordProducer implements KeywordProducerInterface
{
    /**
     * @param Producer $producer
     * @param ProducerTopic $topic
     * @param SerializerInterface $serializer
     */
    public function __construct(
        private Producer $producer,
        private ProducerTopic $topic,
        private SerializerInterface $serializer
    ) {
    }

    /**
     * to Kafka
     * @param ParameterInterface $parameter
     */
    public function add(
        ParameterInterface $parameter
    ): void {
        $this->producer->initTransactions(10000);
        $this->producer->beginTransaction();
        $this->topic->produce(
            RD_KAFKA_PARTITION_UA,
            0,
            $this->serializer->serialize($parameter->toArray())
        );
        $this->producer->poll(0);
        $error = $this->producer->commitTransaction(10000);
        if (!is_null($error)) {
            throw new \RuntimeException('Kafka Transaction Error.');
        }
    }
}

もちろん選択肢としてApache Pulsarもあるかと思います。
この辺はみなさんのアプリケーションによって最適なものを採用できればいいかと思います。
サンプルコードではjsonで送信していますが、
より強固なアプリケーションにする場合はApache Avroを利用します。
ほかにもKafka Streamsを使うことでメッセージがsubscribeされる前にフィルターしたり(バリデーションなど)、
メッセージの中身によって振り分けたりさまざまなことができます。
この辺りもアプリケーションに合わせて採用するといいでしょう。

これでメッセージブローカーへの送信が確かなものになりました。 このメッセージをサブスクライブしてそのまま補完していけばイベントを再発行することもでき、
遡ることもできます。
またKafka自体にもメッセージを消失させずに30日まで補完する機能がありますので、
その辺りもうまく利用できます。
また手抜きをするならば、kafkaで受け取ったメッセージをそのまま他のデータベースに保存することもできます。
(Kafka Connect)
これでイベントソーシングと組み合わせることができました。

読み込みモデル更新処理

次に読み込みモデルの更新です。
ここではメッセージブローカーをsubscribeしたプロセスの処理が該当します。

PHPではこれを行うにはsystemdかsupervisorなどで常駐プロセスにするしかありません。

ここでLaravelならQueueがあるのに、という話になりますが、
今回はMySQLとElasticsearchを使って二つのデータベースに書き込まなければいけません。
つまり一つのQueueに異なるプロセスが同時にアクセスできる必要があります。
これを一つのプロセスで実装すると2フェーズコミットの問題がそのままくっついてきます。
(同じデータベースであれば問題ありません)

ラウンドロビンというよりも並行して処理が走る、ということになります。
これはもうフレームワークの機能でどうにかなる話ではありません。
それにパフォーマンス面でPHPの読み込み処理をGoやScalaなどに変更する、というのは
事業サービス形の会社ではよくある話だと思います。
この場合、LaravelのPHPシリアライズしたQueueを使い続けていては足枷になってしまいます。

こういった処理をする場合はフレームワーク依存の機能を使わないことが一番です。

MySQLとElasticsearchに書き込むということで、MySQLの方は簡単ですので、
サンプルを見るなり自身で作りなりしてもらえれば良いですが、
問題はElasticsearchです。

サジェストなどを実装しなければなりませんので、LIKE検索の延長で使うだけではできません。
これを解決するにはアナライザーを利用することです。
kuromojiやicu、ngramなどを組み合わせると対応できます。
今回のサンプルでは最低限の構成になっていますので、下記のようなmappingで十分です。

{
  "settings": {
    "index": {
      "number_of_shards": 3,
      "number_of_replicas": 0
    },
    "analysis": {
      "char_filter": {
        "normalize": {
          "type": "icu_normalizer",
          "name": "nfkc",
          "mode": "compose"
        },
        "kana_to_romaji": {
          "type": "mapping",
          "mappings": [
            "あ=>a",
            "い=>i",
            "う=>u",
            "え=>e",
            "お=>o",
            "か=>ka",
            "き=>ki",
            "く=>ku",
            "け=>ke",
            "こ=>ko",
            "さ=>sa",
            "し=>shi",
            "す=>su",
            "せ=>se",
            "そ=>so",
            "た=>ta",
            "ち=>chi",
            "つ=>tsu",
            "て=>te",
            "と=>to",
            "な=>na",
            "に=>ni",
            "ぬ=>nu",
            "ね=>ne",
            "の=>no",
            "は=>ha",
            "ひ=>hi",
            "ふ=>fu",
            "へ=>he",
            "ほ=>ho",
            "ま=>ma",
            "み=>mi",
            "む=>mu",
            "め=>me",
            "も=>mo",
            "や=>ya",
            "ゆ=>yu",
            "よ=>yo",
            "ら=>ra",
            "り=>ri",
            "る=>ru",
            "れ=>re",
            "ろ=>ro",
            "わ=>wa",
            "を=>o",
            "ん=>n",
            "が=>ga",
            "ぎ=>gi",
            "ぐ=>gu",
            "げ=>ge",
            "ご=>go",
            "ざ=>za",
            "じ=>ji",
            "ず=>zu",
            "ぜ=>ze",
            "ぞ=>zo",
            "だ=>da",
            "ぢ=>ji",
            "づ=>zu",
            "で=>de",
            "ど=>do",
            "ば=>ba",
            "び=>bi",
            "ぶ=>bu",
            "べ=>be",
            "ぼ=>bo",
            "ぱ=>pa",
            "ぴ=>pi",
            "ぷ=>pu",
            "ぺ=>pe",
            "ぽ=>po",
            "きゃ=>kya",
            "きゅ=>kyu",
            "きょ=>kyo",
            "しゃ=>sha",
            "しゅ=>shu",
            "しょ=>sho",
            "ちゃ=>cha",
            "ちゅ=>chu",
            "ちょ=>cho",
            "にゃ=>nya",
            "にゅ=>nyu",
            "にょ=>nyo",
            "ひゃ=>hya",
            "ひゅ=>hyu",
            "ひょ=>hyo",
            "みゃ=>mya",
            "みゅ=>myu",
            "みょ=>myo",
            "りゃ=>rya",
            "りゅ=>ryu",
            "りょ=>ryo",
            "ぎゃ=>gya",
            "ぎゅ=>gyu",
            "ぎょ=>gyo",
            "じゃ=>ja",
            "じゅ=>ju",
            "じょ=>jo",
            "びゃ=>bya",
            "びゅ=>byu",
            "びょ=>byo",
            "ぴゃ=>pya",
            "ぴゅ=>pyu",
            "ぴょ=>pyo",
            "ふぁ=>fa",
            "ふぃ=>fi",
            "ふぇ=>fe",
            "ふぉ=>fo",
            "ふゅ=>fyu",
            "うぃ=>wi",
            "うぇ=>we",
            "うぉ=>wo",
            "つぁ=>tsa",
            "つぃ=>tsi",
            "つぇ=>tse",
            "つぉ=>tso",
            "ちぇ=>che",
            "しぇ=>she",
            "じぇ=>je",
            "てぃ=>ti",
            "でぃ=>di",
            "でゅ=>du",
            "とぅ=>tu",
            "ぢゃ=>ja",
            "ぢゅ=>ju",
            "ぢょ=>jo",
            "ぁ=>a",
            "ぃ=>i",
            "ぅ=>u",
            "ぇ=>e",
            "ぉ=>o",
            "っ=>t",
            "ゃ=>ya",
            "ゅ=>yu",
            "ょ=>yo",
            "ア=>a",
            "イ=>i",
            "ウ=>u",
            "エ=>e",
            "オ=>o",
            "カ=>ka",
            "キ=>ki",
            "ク=>ku",
            "ケ=>ke",
            "コ=>ko",
            "サ=>sa",
            "シ=>shi",
            "ス=>su",
            "セ=>se",
            "ソ=>so",
            "タ=>ta",
            "チ=>chi",
            "ツ=>tsu",
            "テ=>te",
            "ト=>to",
            "ナ=>na",
            "ニ=>ni",
            "ヌ=>nu",
            "ネ=>ne",
            "ノ=>no",
            "ハ=>ha",
            "ヒ=>hi",
            "フ=>fu",
            "ヘ=>he",
            "ホ=>ho",
            "マ=>ma",
            "ミ=>mi",
            "ム=>mu",
            "メ=>me",
            "モ=>mo",
            "ヤ=>ya",
            "ユ=>yu",
            "ヨ=>yo",
            "ラ=>ra",
            "リ=>ri",
            "ル=>ru",
            "レ=>re",
            "ロ=>ro",
            "ワ=>wa",
            "ヲ=>o",
            "ン=>n",
            "ガ=>ga",
            "ギ=>gi",
            "グ=>gu",
            "ゲ=>ge",
            "ゴ=>go",
            "ザ=>za",
            "ジ=>ji",
            "ズ=>zu",
            "ゼ=>ze",
            "ゾ=>zo",
            "ダ=>da",
            "ヂ=>ji",
            "ヅ=>zu",
            "デ=>de",
            "ド=>do",
            "バ=>ba",
            "ビ=>bi",
            "ブ=>bu",
            "ベ=>be",
            "ボ=>bo",
            "パ=>pa",
            "ピ=>pi",
            "プ=>pu",
            "ペ=>pe",
            "ポ=>po",
            "キャ=>kya",
            "キュ=>kyu",
            "キョ=>kyo",
            "シャ=>sha",
            "シュ=>shu",
            "ショ=>sho",
            "チャ=>cha",
            "チュ=>chu",
            "チョ=>cho",
            "ニャ=>nya",
            "ニュ=>nyu",
            "ニョ=>nyo",
            "ヒャ=>hya",
            "ヒュ=>hyu",
            "ヒョ=>hyo",
            "ミャ=>mya",
            "ミュ=>myu",
            "ミョ=>myo",
            "リャ=>rya",
            "リュ=>ryu",
            "リョ=>ryo",
            "ギャ=>gya",
            "ギュ=>gyu",
            "ギョ=>gyo",
            "ジャ=>ja",
            "ジュ=>ju",
            "ジョ=>jo",
            "ビャ=>bya",
            "ビュ=>byu",
            "ビョ=>byo",
            "ピャ=>pya",
            "ピュ=>pyu",
            "ピョ=>pyo",
            "ファ=>fa",
            "フィ=>fi",
            "フェ=>fe",
            "フォ=>fo",
            "フュ=>fyu",
            "ウィ=>wi",
            "ウェ=>we",
            "ウォ=>wo",
            "ヴァ=>va",
            "ヴィ=>vi",
            "ヴ=>v",
            "ヴェ=>ve",
            "ヴォ=>vo",
            "ツァ=>tsa",
            "ツィ=>tsi",
            "ツェ=>tse",
            "ツォ=>tso",
            "チェ=>che",
            "シェ=>she",
            "ジェ=>je",
            "ティ=>ti",
            "ディ=>di",
            "デュ=>du",
            "トゥ=>tu",
            "ヂャ=>ja",
            "ヂュ=>ju",
            "ヂョ=>jo",
            "ァ=>a",
            "ィ=>i",
            "ゥ=>u",
            "ェ=>e",
            "ォ=>o",
            "ッ=>t",
            "ャ=>ya",
            "ュ=>yu",
            "ョ=>yo"
          ]
        }
      },
      "tokenizer": {
        "kuromoji_normal": {
          "mode": "normal",
          "type": "kuromoji_tokenizer"
        }
      },
      "filter": {
        "readingform": {
          "type": "kuromoji_readingform",
          "use_romaji": true
        },
        "edge_ngram": {
          "type": "edge_ngram",
          "min_gram": 1,
          "max_gram": 10
        },
        "synonym": {
          "type": "synonym",
          "lenient": true,
          "synonyms": [
            "nippon, nihon"
          ]
        }
      },
      "analyzer": {
        "suggest_index_analyzer": {
          "type": "custom",
          "char_filter": [
            "normalize"
          ],
          "tokenizer": "kuromoji_normal",
          "filter": [
            "lowercase",
            "edge_ngram"
          ]
        },
        "suggest_search_analyzer": {
          "type": "custom",
          "char_filter": [
            "normalize"
          ],
          "tokenizer": "kuromoji_normal",
          "filter": [
            "lowercase"
          ]
        },
        "readingform_index_analyzer": {
          "type": "custom",
          "char_filter": [
            "normalize",
            "kana_to_romaji"
          ],
          "tokenizer": "kuromoji_normal",
          "filter": [
            "lowercase",
            "readingform",
            "asciifolding",
            "synonym",
            "edge_ngram"
          ]
        },
        "readingform_search_analyzer": {
          "type": "custom",
          "char_filter": [
            "normalize",
            "kana_to_romaji"
          ],
          "tokenizer": "kuromoji_normal",
          "filter": [
            "lowercase",
            "readingform",
            "asciifolding",
            "synonym"
          ]
        }
      }
    }
  },
  "mappings": {
    "properties": {
      "word_field": {
        "type": "keyword",
        "fields": {
          "suggest": {
            "type": "text",
            "search_analyzer": "suggest_search_analyzer",
            "analyzer": "suggest_index_analyzer"
          },
          "readingform": {
            "type": "text",
            "search_analyzer": "readingform_search_analyzer",
            "analyzer": "readingform_index_analyzer"
          }
        }
      }
    }
  }
}

この辺りはこれを解説するだけで数記事になりますので、
興味ある方は下記のものをはじめとして色々読み込んでみてください。

www.elastic.co

これを利用すれば集計と利用頻度順のサジェストをうまく活用できます。
ここまでデータベースだけの世界で解決できました。

これを読み込む、クエリの処理は単純に問い合わせてDTO(Data Transfer Object)だけあれば十分でしょう。
例えばクエリの対象がUIや複雑なAPIであれば、それに合わせたデータを構築すれば済みます。
また対象のインデックスの中身のデータを更新したければ、
それ用のメッセージをpublishし、subscribeした処理で書き換えれば済みますので、
複雑な処理がWebアプリケーション側からも排除できます。
またチームで開発をする場合は、開発者の分担もうまくできるようになります。
(どこかの層にビジネスロジックが集中しすぎないように配慮する必要はあります。)

<?php
// 省略

    /**
     * @param HandlerInterface $handler
     * @param Consumer $consumer
     * @param int $offset
     */
    public function handle(
        HandlerInterface $handler,
        Consumer $consumer,
        int $offset = RD_KAFKA_OFFSET_STORED
    ): void {
        $consumer->addBrokers($this->broker);
        $topic = $consumer->newTopic($this->topic, $this->topicConf);
        $topic->consumeStart($this->partition, $offset);
        while (true) {
            $message = $topic->consume($this->partition, 120 * 10000);
            if ($message instanceof Message) {
                match ($message->err) {
                    RD_KAFKA_RESP_ERR_NO_ERROR => call_user_func($handler, $message),
                    RD_KAFKA_RESP_ERR__TIMED_OUT => throw new SubscriberTimeoutException('time out.'),
                };
            }
        }
    }

今回の例ではsubsrcibeの仕組みと、そのデータを受け取って処理するクラスを分離するようにしました。
この辺りは単純にデータ処理だけになりますので、特に難しいものはありません。
なにかのビジネスロジックを解決するわけでもありませんので、
シンプルな仕組みになっています。

<?php
declare(strict_types=1);

namespace App\DataAccess\Elasticsearch;

use DateTime;
use App\Foundation\Kafka\HandlerInterface;
use Elasticsearch\Client;
use RdKafka\Message;

class RegisterKeyword implements HandlerInterface
{
    /**
     * @param Client $client
     * @param string $index
     */
    public function __construct(
        private Client $client,
        private string $index
    ) {
    }

    /**
     * @param Message $message
     * @throws \JsonException
     */
    public function __invoke(
        Message $message
    ): void {
        $decoded = json_decode($message->payload, false, 512, JSON_THROW_ON_ERROR);
        $word = '';
        if (isset($decoded->body)) {
            $word = $decoded->body;
        }
        if ($word === '') {
            return;
        }
        $d = new DateTime();
        $this->client->index([
            'index' => $this->index,
            'body' => [
                'word_field' => $word,
                'created' => $d->format('Y-m-d\TH:i:s')
            ]
        ]);
    }
}

クエリ実装例

クエリとしては、特に何かに注力しなければいけないことはありません。
コマンドと同じクラスを使わない、なにかを変に汎用化させないなどができていれば良いでしょう。

<?php
declare(strict_types=1);

namespace App\DataAccess\Elasticsearch;

use App\DataAccess\GetSuggestKeywordInterface;
use Elasticsearch\Client;
use function array_merge;

final class SuggestKeyword implements GetSuggestKeywordInterface
{
    use AggregateQuery;

    /**
     * @param Client $client
     * @param string $index
     */
    public function __construct(
        private Client $client,
        private string $index
    ) {
    }

    /**
     * @param string $word
     * @return array
     */
    public function findByWord(
        string $word
    ): array {
        $params = [
            'index' => $this->index,
            'body' => $this->aggsQuery(),
        ];
        if ($word !== '') {
            $params['body'] = array_merge($params['body'], [
                'query' => $this->suggestMatchQuery($word)
            ]);
        }
        $result = $this->client->search($params);
        return $result['aggregations']['keyword']['buckets'];
    }

    /**
     * @param string $word
     * @return array
     */
    private function suggestMatchQuery(
        string $word
    ): array {
        return  [
            'bool' => [
                'should' => [
                    [
                        'match' => [
                            'word_field.suggest' => [
                                'query' => $word
                            ]
                        ]
                    ],
                    [
                        'match' => [
                            'word_field.readingform' => [
                                'query' => $word,
                                'fuzziness' => 'AUTO',
                                'operator' => 'and'
                            ]
                        ]
                    ]
                ]
            ]
        ];
    }
}

Elasticsearchへの問い合わせは上記のような形になります。
ここで取得したものをインターフェースを挟んで
DTOに変換してあげると要件を実現できます。

<?php
declare(strict_types=1);

namespace App\QueryProcessor;

use App\Transfer\Keyword;
use Generator;
use App\DataAccess\GetSuggestKeywordInterface;

final class SuggestKeywordQueryProcessor
{
    /**
     * @param GetSuggestKeywordInterface $suggestKeyword
     */
    public function __construct(
        private GetSuggestKeywordInterface $suggestKeyword
    ) {
    }

    /**
     * @param string $word
     * @return Generator
     */
    public function run(string $word): Generator
    {
        foreach ($this->suggestKeyword->findByWord($word) as $row) {
            yield new Keyword($row['key'], $row['doc_count']);
        }
    }
}

気になる方は実際のサンプルコードを見るなどすると理解できると思います。

最後に

一つ一つの処理は単純になることがわかると思います。
ただし作るものや関わるミドルウェアが増えるのは事実ですので、
利用するミドルウェアについてもしっかりと理解をする必要があります。

またこのKafkaのデータハブとして作用する仕組みを使って、
データ基盤系のデータパイプラインやSparkを使ったより高度なアプリケーションを開発することができます。
その辺りになると本格的な分散処理や、Webアプリケーションとは全く違う知識やテクニックが必要となりますので、
エンジニアとしてのスキルセットをさらに増やしたい方にはいい入り口になると思います。
これらはES+CQRSではなくてイベント駆動になりますが、その先にUIがあることも多々ありますので考え方は踏襲できます。

もうちょっと丁寧に図解とかしようと思いましたが力尽きたので今回はこの辺で。

会社のアドベントカレンダーということで、
こういった処理を作ったり考えたり、改善をしまくりたい!というTSをメインにして
GoやPHPをやりたい!というエンジニアの方や、
KafkaやSparkなどを使ったストリーム処理(Scala、Java)とかやりたいというエンジニアの方、
それらを支えるインフラをやりたい!SREみたいなことしたい!というインフラエンジニアの方を

募集しております!!!!!

お気軽に@ex_takezawaなどにご連絡ください。