PHPでアクターモデルを使ってES+CQRSを実装してみよう

PHPでES+CQRS

PHPで完全なES+CQRSを表現するには、CDC(Change Data Capture)+Outbox形式にするのが定番でした。

単純なQueueではリプレイができず、
伝播させて終わり、いわゆるEvent Streamingの形になってしまうことが多いと思います。
とはいえCDC+Outboxでも環境によってはN層コミットがどうしても発生してしまいます。

ではCDC+Outboxを使わずにES+CQRSを実現するにはどのような方法があるのでしょうか?

ということで早速他言語でもお馴染みのアクターモデルを導入して実装してみましょう。

アクターモデル自体を知らないと本エントリは少しばかり難しいと思いますが、
ChatGPTなりClaudeなどに解説してもらってください。

アクターは並行で動作してメッセージのみでやり取りを行い、
ローカルでも物理的な別サーバでもk8s上で散らばったものでも、
場所関係なくアクターを利用することができるものです。
単純なQueueではありません(内部にMailboxというQueueがありますがそれ自体ではないです)。

今回はめちゃくちゃ長いので覚悟して読みましょう。

アクターモデル導入でES+CQRSを体験

PHPでアクターモデルを導入するにはほぼ選択肢がありません。
ので、拙作のPhluxor を使って解説していきます。

例としてユーザーの作成とユーザー一覧みたいなものをテーマに進めていきましょう。

サンプルコードは下記

github.com

作るものはこんな感じのイメージです(実際にはちょっと異なりますが)

Command

graph TD;
    Mezzio(Mezzio) -.- Phluxor(Phluxor);
    Phluxor --> |Spawn| RestAPIActor(RestAPI Actor);
    Phluxor --> |Spawn| TypedChannelActor(Stream Actor);
    TypedChannelActor(TypedChannel Actor) --> Mezzio(Mezzio);
    RestAPIActor(RestAPI Actor) --> |Spawn| UserRegistration/UserActor(User Registration Actor);
    RestAPIActor(RestAPI Actor)  --> |Spawn| ReadModelUpdateActor(Read Model Update Actor);
    UserRegistration/UserActor --> |Send| TypedChannelActor(TypedChannel Actor);
    UserRegistration/UserActor <--> |Event Sourcing/Replay| Persistence(Persistence);
    Persistence <--> MySQL(MySQL);
    ReadModelUpdateActor(Read Model Update Actor)  <--> MySQL(MySQL);

Query

graph TD;
    Mezzio(Mezzio) <--> |DTO|MySQL(MySQL);
    Mezzio(Mezzio) --> |HTTP Response|Response(JSON);

サンプルなのでデータベースはMySQLのみですが、
イベントの保管やアプリケーションで必要なデータはテーブル単位で分け、
共有しない状態になっています。

実際には別のデータベースで問題ありません。

アクターシステムを起動しよう

早速ですが、Webアプリケーション形式で動かす場合、アクターモデルは基本的に並行処理が前提になりますので
coroutineに対応したものを利用していきます。

docs.mezzio.dev

docs.mezzio.dev

mezzio以外でも動作させることはできますが、
Laravel Octaneはcoroutineが無効になっていますので
Laravelですんなり導入できるかどうかは試していません。
ぜひ試してみてください。

mezzioの場合、
アプリケーションサーバとして起動すると WorkerStartEvent イベントが送られてきますので、
これをListenしてアクターシステムを起動できます。

こうすることでアプリケーションサーバが起動中はアクターシステムが常時起動したままの状態になります。
一般的なPHPアプリケーションのようにリクエストを受ける度にアクターシステムを起動すると
並行処理としてもリソースが勿体なく、パフォーマンスにも悪いため起動中はアクターシステムは常時動いたままにしておきましょう。
(あとに解説しますが、アクターシステム起動時にすべて元通りに復元されますので安心してデプロイできます)

<?php

declare(strict_types=1);

namespace App\ActorSystem;

use Closure;
use Laminas\ServiceManager\ServiceManager;
use Mezzio\Swoole\Event\WorkerStartEvent;
use Phluxor\ActorSystem;
use Phluxor\ActorSystem\Props;
use Phluxor\Persistence\Mysql\DefaultSchema;
use Phluxor\Persistence\Mysql\MysqlProvider;
use Phluxor\Persistence\ProviderInterface;
use Phluxor\Persistence\ProviderStateInterface;
use Psr\Container\ContainerInterface;
use Psr\Log\LoggerInterface;
use RuntimeException;
use Swoole\Database\PDOProxy;

readonly class BootAppActor
{
    public function __construct(
        private ContainerInterface $container
    ) {
    }

    public function __invoke(WorkerStartEvent $event): void
    {
        $system = ActorSystem::create();
        $proxy = $this->container->get(PDOProxy::class);
        $spawned = $system->root()->spawnNamed(
            Props::fromProducer(
                fn() => new RestApiActor(
                    $proxy,
                    $this->stateProvider($proxy, $system->getLogger()),
                )
            ),
            AppActor::NAME
        );
        if ($this->container instanceof ServiceManager) {
            $this->container->setService(
                AppActor::class,
                new AppActor($system, $spawned->getRef())
            );
            return;
        }
        throw new RuntimeException('Container is not a ServiceManager');
    }

    /**
     * @param PDOProxy $proxy
     * @param LoggerInterface $logger
     * @return Closure(int): ProviderStateInterface&ProviderInterface
     */
    private function stateProvider(
        PDOProxy $proxy,
        LoggerInterface $logger
    ): Closure {
        return function (int $snapshotInterval) use ($proxy, $logger) {
            return new MysqlProvider(
                $proxy,
                new DefaultSchema(),
                $snapshotInterval,
                $logger
            );
        };
    }
}

コンテナへの登録方法はいくつかありますが、
ここでは Laminas\ServiceManager\ServiceManager 前提のコードとなっています。

dependencies.global.php や、App\ConfigProviderを使っていないのはなぜかというと、
それらを使った通常の依存解決方法はリクエストの度に実行されるため
前述したようにリソースの効率が悪いからです。。
ここはPHPということもあって仕方がありません。

少しコードの解説をしましょう。

まずは下記の部分です。

<?php

    public function __invoke(WorkerStartEvent $event): void
    {
        $system = ActorSystem::create();
        $proxy = $this->container->get(PDOProxy::class);
        $spawned = $system->root()->spawnNamed(
            Props::fromProducer(
                fn() => new RestApiActor(
                    $proxy,
                    $this->stateProvider($proxy, $system->getLogger()),
                )
            ),
            AppActor::NAME
        );
        if ($this->container instanceof ServiceManager) {
            $this->container->setService(
                AppActor::class,
                new AppActor($system, $spawned->getRef())
            );
            return;
        }
        throw new RuntimeException('Container is not a ServiceManager');
    }

まずmezzioのイベントをListenするには __invoke を実装する必要があります。

ActorSystem::create() はPhluxorのアクターシステム起動のためのコードです。
$proxy = $this->container->get(PDOProxy::class); は、
コンテナからデータベースアクセスのインスタンスを取得しています(Queryでも利用)。
ここでのデータベース接続には持続接続(Pool)を利用しています。

<?php

        $spawned = $system->root()->spawnNamed(
            Props::fromProducer(
                fn() => new RestApiActor(
                    $proxy,
                    $this->stateProvider($proxy, $system->getLogger()),
                )
            ),
            AppActor::NAME
        );

この部分はルートアクターの定義です。
当ブログでも書いているGoで実践するアクターモデルシリーズにも詳細がありますが、
アクターはヒエラルキーを形成します。
その際にトップレベルに位置するアクターを定義しなければなりません。

ここではこのアプリケーションと合わせてRestApiアクターとしています。
spawnNamedは、アクターに意図した名前を付与するためのものです。
(指定がなければ自動で適当な名前が割り当てられます)

Props::fromProducer はアクターの設定を割り当てるためのものです。
RestApiActorの第2引数は、アクター永続化のためのデータストレージの設定です。

Phluxorではアクターの永続化にインメモリ、MySQLをデフォルトで用意しています。
(PostgreSQLにはすぐ対応する予定)

<?php

    /**
     * @param PDOProxy $proxy
     * @param LoggerInterface $logger
     * @return Closure(int): ProviderStateInterface&ProviderInterface
     */
    private function stateProvider(
        PDOProxy $proxy,
        LoggerInterface $logger
    ): Closure {
        return function (int $snapshotInterval) use ($proxy, $logger) {
            return new MysqlProvider(
                $proxy,
                new DefaultSchema(),
                $snapshotInterval,
                $logger
            );
        };
    }

MySQLを利用する場合はPhluxor\Persistence\Mysql\MysqlProvider を利用し、
データベースアクセスや、ログ、スナップショットのタイミングを指定します。
Phluxor\Persistence\Mysql\DefaultSchemaはデータベース定義情報になっています。

Phluxorでは永続化に2つのテーブルを利用します。
一つはjournals、
もう一つはsnapshots です。
Event Sourcingをする際、都度イベントを記録する仕組みが必要になります。
イベントは真のデータソースになる必要があり、どの時点にでも復元できなければなりません。

このため更新か作成か、という方法は利用できません(最新しか残らないため)。
復元時に頭からリプレイして戻す事ができますが、
大量にある場合は効率が悪くなるため、
ある時点をスナップショットとして保管してそこから復元するようになっています。

これで下記の様になります。

graph TD;
    Mezzio(Mezzio) -.- Phluxor(Phluxor);
    Phluxor --> |Spawn| RestAPIActor(RestAPI Actor);

イベントを永続化しよう!

イベントは過去におきた事実となるわけですが、
アクターはステートフルで最新のイベントの結果が今のアクター、と考えておくと良いでしょう。
誰でも誕生日を迎えることで1歳ずつ歳を取り、
学校の卒業だったり、引っ越しだったり様々なライフイベントがあり今の皆さんになっているわけですが、
思い出すこともできれば、今の状態にも戻れます。
アクターもそれと同じです。

状態を保存する場合、
他言語のアクターモデル ツールキット(Akka / Pekko / Proto Actor)と同様に、
イベントを永続化する場合は自由な型のJSONではなくProtocol Buffersを利用します。

syntax = "proto3";

package protobuf;

option php_namespace = "App\\Event\\ProtoBuf";
option php_metadata_namespace = "App\\Event\\Metadata";

message UserCreated {
  string userName = 1;
  string email = 2;
  string userID = 3;
  int64 version = 4;
}

protoc コマンドを利用するとコードが生成されます。

メッセージをいくつか

システムへの指示をコマンドとして下記のようなものを用意します。

<?php

declare(strict_types=1);

namespace App\Command;

use Phluxor\ActorSystem\Ref;

readonly class CreateUser
{
    public function __construct(
        public string $email,
        public string $userName,
        public Ref $ref
    ) {
    }
}

これはユーザー作成の指示です。
第3引数にある Phluxor\ActorSystem\Ref はユーザー作成完了したことを受け取りたいアクターとなっています。
アクターの組み合わせ方は色々ありますので、色々試してみてください。

あとはシステムからの応答メッセージです。
ユーザー作成の成功と失敗を表現したものです。

<?php

declare(strict_types=1);

namespace App\Message;

readonly class UserCreateResponse implements UserCreateMessageInterface
{
    public function __construct(
        public string $userID
    ) {
    }

    public function isSuccess(): bool
    {
        return true;
    }
}
<?php

declare(strict_types=1);

namespace App\Message;

readonly class UserCreateError implements UserCreateMessageInterface
{
    public function __construct(
        public string $message
    ) {
    }

    public function isSuccess(): bool
    {
        return false;
    }
}

メッセージの流れとしては、Webアプリケーション(mezzio)側から CreateUser をアクターシステムに送信します。
アクターはこのコマンドを受け取り、Protocol Buffersで定義した UserCreated イベントを発行して
アクターの状態として保存します(アクターが作成ユーザーとなる)。
そのイベントのままWebアプリケーション側にイベントを返却しても良いのですが、
生成できたかどうかを判断できるように、UserCreateResponse または UserCreateError (名前がイマイチですが・・・)を
メッセージとして送るようにします。

この状態(イベント)をQueryとして利用するためには、リードモデルを作るための仕組みが必要になります。
今回はリードモデル更新アクターを生成し、Queryのためのデータ成形を依頼するようにします。

アクターの状態保存以外はアクター間のメッセージパッシングになりますので、
ここではデータベース以外のミドルウェアは一切登場しません。

尚、これらのメッセージングはすべて並行で動作します。

RestApiActorを実装しよう

ある程度の流れがわかったところで、
ルートアクターとして作用するRestApiActorの中身を実装していきます。

<?php

declare(strict_types=1);

namespace App\ActorSystem;

use App\ActorSystem\UserRegistration\ReadModelUpdateActor;
use App\ActorSystem\UserRegistration\UserActor;
use App\Command\CreateUser;
use App\Database\RegisteredUser;
use App\Message\UserCreateError;
use Closure;
use Phluxor\ActorSystem\Context\ContextInterface;
use Phluxor\ActorSystem\Exception\NameExistsException;
use Phluxor\ActorSystem\Message\ActorInterface;
use Phluxor\ActorSystem\Message\Started;
use Phluxor\ActorSystem\Props;
use Phluxor\ActorSystem\Ref;
use Phluxor\ActorSystem\SpawnResult;
use Phluxor\Persistence\EventSourcedReceiver;
use Phluxor\Persistence\ProviderInterface;
use Phluxor\Persistence\ProviderStateInterface;
use Swoole\Database\PDOProxy;

class RestApiActor implements ActorInterface
{
    private ?Ref $readModelUpdater = null;

    /**
     * @param Closure(int): ProviderStateInterface&ProviderInterface $providerState
     */
    public function __construct(
        private readonly PDOProxy $proxy,
        private readonly Closure $providerState
    ) {
    }

    public function receive(ContextInterface $context): void
    {
        $msg = $context->message();
        switch (true) {
            case $msg instanceof Started:
                $this->readModelUpdater = $context->spawn(
                    Props::fromProducer(
                        fn() => new ReadModelUpdateActor(new RegisteredUser($this->proxy))
                    )
                );
                break;
            case $msg instanceof CreateUser:
                $provider = $this->providerState;
                $result = $this->spawnUserActor($context, $this->readModelUpdater, $provider, $msg);
                if ($result->isError() instanceof NameExistsException) {
                    $context->send(
                        $msg->ref,
                        new UserCreateError(
                            sprintf("user %s already exists", $msg->userName)
                        )
                    );
                    return;
                }
                $context->send($result->getRef(), $msg);
                break;
        }
    }

    public function spawnUserActor(
        ContextInterface $context,
        Ref $readModelUpdater,
        Closure $provider,
        mixed $msg
    ): SpawnResult {
        $stateProvider = $provider(3);
        return $context->spawnNamed(
            Props::fromProducer(fn() => new UserActor($readModelUpdater),
                Props::withReceiverMiddleware(
                    new EventSourcedReceiver(
                        $stateProvider
                    )
                )
            ),
            sprintf("user:%s", $msg->email)
        );
    }
}

これもコード解説をしていきましょう。
まず下記の部分はお決まりのものですが、
アクターとして動作させるには Phluxor\ActorSystem\Message\ActorInterface を実装する必要があります。

<?php

class RestApiActor implements ActorInterface
{

    public function receive(ContextInterface $context): void 
// 略
}

コンストラクタにあるのは BootAppActor がコールするときに渡すオブジェクトで、
コネクションプールと永続化に利用する設定です。

spawnUserActorを軽く見ていきましょう。

    public function spawnUserActor(
        ContextInterface $context,
        Ref $readModelUpdater,
        Closure $provider,
        mixed $msg
    ): SpawnResult {
        $stateProvider = $provider(3);
        return $context->spawnNamed(
            Props::fromProducer(fn() => new UserActor($readModelUpdater),
                Props::withReceiverMiddleware(
                    new EventSourcedReceiver(
                        $stateProvider
                    )
                )
            ),
            sprintf("user:%s", $msg->email)
        );
    }

そんなに大した処理はありませんが、ここはユーザーアクターつまりユーザー作成を担当するアクター生成に関するものです。
ここのユーザーとは いわゆるログインやいろんなものを担当する包括した巨大なユーザーではなく、
あくまでユーザー作成に関するユーザーとなり、
かなり小さい範囲のものになります。

このユーザー作成アクターの状態を保存することで作成済みユーザーかどうかなどの判定に利用できたり、
もしくは作成済みユーザーリスト集約の生成に利用したりできるようになります。
集約がアクターによる管理なる、そんなイメージをするとよいかと思います。
ここでは一つ一つをアクターとしていますが、実際にはもっと設計すると良いと思います。
(簡単に体験できるようにこの単位にしています)

これだけが全てではなく、色んな方法があるので実際にアクターモデルを使ってみてください。

当たり前ですがたくさん作るとメモリを沢山消費しますので、しっかりと設計しましょう。

つまりこのユーザー作成アクターは必ず一意である必要があり、そのための識別子が必要となります。
ここではメールアドレスがユニークなものとして、
ユーザー作成アクター生成時にspawnNamedを利用して任意の名前をつけ、
メールアドレスを識別子として利用するようにしています。

なおアクターは位置透過性もサポートされますので、
このアクターがローカルにあったりリモートにあったりしても必ず一意なものとなります。
(Phluxorのリモートとクラスタは現在実装中)

このようにすることで同じ名前指定時は作成済みのアクターと判断され、
一意のアクターが保証されます。

下記の部分はユーザー作成アクター永続化利用のための設定です。

<?php

Props::withReceiverMiddleware(
    new EventSourcedReceiver(
        $stateProvider
    )
)

stateProviderは下記の通りで、イベントストアとしてMySQLを利用し
snapshot保管のタイミングはイベント3回に対して1回保管する、という意味になります。

$stateProvider = $provider(3);

次に下記の部分を見てみます。

<?php

    public function receive(ContextInterface $context): void
    {
        $msg = $context->message();
        switch (true) {
            case $msg instanceof Started:
                $this->readModelUpdater = $context->spawn(
                    Props::fromProducer(
                        fn() => new ReadModelUpdateActor(new RegisteredUser($this->proxy))
                    )
                );
                break;
            case $msg instanceof CreateUser:
                $provider = $this->providerState;
                $result = $this->spawnUserActor($context, $this->readModelUpdater, $provider, $msg);
                if ($result->isError() instanceof NameExistsException) {
                    $context->send(
                        $msg->ref,
                        new UserCreateError(
                            sprintf("user %s already exists", $msg->userName)
                        )
                    );
                    return;
                }
                $context->send($result->getRef(), $msg);
                break;
        }
    }

RestApiアクターが起動すると、receive メソッドの $contextにたくさんの情報が流れてきます。
情報の中からメッセージへアクセスするには $context->message() を利用します。

PHPにはGenericsなどはありませんので、
switchやif、matchなど好きなものを使ってメッセージの型に合わせて処理を記述します。

一つ一つの処理はクラスなどを利用しても構いませんが、
アクターは独立して並行で動作しますので単純なシングルトンでの共有などはできません

またアクターを通常のメソッドコールのように外部から強制的に叩くこともできませんので注意してください。
このあたりはアクターを使ったプログラミングパラダイムに慣れる必要があります。
中身の状態が知りたいときは状態を尋ねるメッセージなどを用意し、
送り返す仕組みが必要になります。

話を戻しましょう。

条件分岐の1つ目、Phluxor\ActorSystem\Message\Started に対応した処理です。
このメッセージはアクターが起動すると必ず流れてくるメッセージです。
アクターを再起動させても流れてきます(アクターは任意で停止させたり再起動させたりできます)

この仕組みを使ってリードモデル更新アクターをここで生成しています。
(アクター生成は他の場所でもOKですがメリット・デメリットがありますので調べてみるといいでしょう)

<?php

$this->readModelUpdater = $context->spawn(
    Props::fromProducer(
        fn() => new ReadModelUpdateActor(new RegisteredUser($this->proxy))
    )
);

RegisteredUserはQueryのためのデータ成形結果を保存するテーブル操作に関するものです。
このテーブルは永続化で軽く触れた2つのテーブルは利用しません。
完全に別のテーブルを利用します(他のデータストレージでもOK)

次に App\Command\CreateUser に対応した処理です。

<?php

            case $msg instanceof CreateUser:
                $provider = $this->providerState;
                $result = $this->spawnUserActor($context, $this->readModelUpdater, $provider, $msg);
                if ($result->isError() instanceof NameExistsException) {
                    $context->send(
                        $msg->ref,
                        new UserCreateError(
                            sprintf("user %s already exists", $msg->userName)
                        )
                    );
                    return;
                }
                $context->send($result->getRef(), $msg);
                break;

ここではメールアドレスで識別されるユーザー作成アクターの生成を実行しています。
同じメールアドレスを指定すると、NameExistsException が返却されますので、
生成済みかどうかを判定する事ができます。
ただし復元した瞬間は判断できませんので、実際にはこことユーザー作成アクターの両方で判断することが必要になります。
(アクター・イベントの復元はアクター生成時に行われるため)

アクターが生成済みの場合は UserCreateError メッセージが返却されます。
CreateUser は返信先が指定されたメッセージとなっていますので、sendの第1引数を使って $msg-ref 宛に返信します。

アクターが生成されるとメッセージを受け付ける状態になりますので(メールボックスが起動する)
$context->send($result->getRef(), $msg); で生成されたユーザー作成アクターにメッセージを送信しています。

これで下記くらいが用意されました。

graph TD;
    Mezzio(Mezzio) -.- Phluxor(Phluxor);
    Phluxor --> |Spawn| RestAPIActor(RestAPI Actor);
    RestAPIActor(RestAPI Actor) --> |Spawn| UserRegistration/UserActor(User Registration Actor);
    RestAPIActor(RestAPI Actor)  --> |Spawn| ReadModelUpdateActor(Read Model Update Actor);
    Persistence <--> MySQL(MySQL);

リードモデル更新アクターを実装しよう

リードモデル更新アクターは非常に簡単です。

<?php

declare(strict_types=1);

namespace App\ActorSystem\UserRegistration;

use App\Database\RegisteredUser;
use App\Event\ProtoBuf\UserCreated;
use Phluxor\ActorSystem\Context\ContextInterface;
use Phluxor\ActorSystem\Message\ActorInterface;

readonly class ReadModelUpdateActor implements ActorInterface
{
    public function __construct(
        private RegisteredUser $registeredUser
    ) {
    }

    public function receive(ContextInterface $context): void
    {
        $msg = $context->message();
        if ($msg instanceof UserCreated) {
            $row = $this->registeredUser->findByEmail($msg->getEmail());
            if (!$row) {
                $result = $this->registeredUser->addUser($msg->getUserID(), $msg->getUserName(), $msg->getEmail());
                if (!$result) {
                    $context->logger()->error(
                        'failed to add user',
                        [
                            'user_id' => $msg->getUserID(),
                            'user_name' => $msg->getUserName(),
                            'email' => $msg->getEmail()
                        ]
                    );
                }
            }
        }
        // no send message
        // read model update actor is a final actor
    }
}

このリードモデル更新アクターとして作用するアクターは、このアクターシステム内では一つしかありません。

他のアクターと同様にメッセージを受け取り、Queryのためのデータ成形を行うだけですので
伝統的なアプリケーションと同じように実装するだけです。

ここで流れてくるのはイベントとなりますので、このイベントを加工して保存するだけのものです。
前述の通りここで操作するテーブルはjournals、snapshots以外のものになりますので、
事実を変更するようなものはありません。

任意の処理で色々できますので、いわゆるPubSubで受けるConsumerのような感じだと思っていただけるといいかなと思います。
成形の完了は返信する必要はなく、
Queryとして作用するAPIエンドポイントなどを通じてリストが表示されればよい、という具合です。

ポイントとして、アクター内のメッセージは at-most-once となります。
受け取り側の保証などはしていません(自分で実装する必要があります)ので、必要であれば独自で作り込む必要があります。
これは多くのアクターシステムと共通です

ヒエラルキー等を理解すれば再送の伝播などが簡単にできるのは理解できると思いますが、
同じようにQuery側で利用済みの過去のイベントも流れてきますので、
バージョンなどを利用したり工夫してみてください(のでここでは単純なデータ存在確認を実行しています)

現在はここまで実装できました。

graph TD;
    Mezzio(Mezzio) -.- Phluxor(Phluxor);
    Phluxor --> |Spawn| RestAPIActor(RestAPI Actor);
    RestAPIActor(RestAPI Actor) --> |Spawn| UserRegistration/UserActor(User Registration Actor);
    RestAPIActor(RestAPI Actor)  --> |Spawn| ReadModelUpdateActor(Read Model Update Actor);
    Persistence <--> MySQL(MySQL);
    ReadModelUpdateActor(Read Model Update Actor)  <--> MySQL(MySQL);

永続化するユーザー作成アクターを実装しよう

最後にアクターの永続化です。

アクターはステートフルとなりますので、
private ?UserCreated $state = null; がアクターの状態を示すもので、
かつProtocol Buffersを利用したもの、となります。

<?php

declare(strict_types=1);

namespace App\ActorSystem\UserRegistration;

use App\Command\CreateUser;
use App\Event\ProtoBuf\UserCreated;
use App\Message\UserCreateError;
use App\Message\UserCreateResponse;
use Google\Protobuf\Internal\Message;
use Phluxor\ActorSystem\Context\ContextInterface;
use Phluxor\ActorSystem\Message\ActorInterface;
use Phluxor\ActorSystem\Ref;
use Phluxor\Persistence\Message\ReplayComplete;
use Phluxor\Persistence\Message\RequestSnapshot;
use Phluxor\Persistence\PersistentInterface;
use Phluxor\Persistence\Mixin;
use Symfony\Component\Uid\Ulid;

class UserActor implements ActorInterface, PersistentInterface
{
    use Mixin;

    private ?UserCreated $state = null;
    private $version = 0;

    public function __construct(
        private readonly Ref $readModelUpdater
    ) {
    }

    public function receive(ContextInterface $context): void
    {
        $msg = $context->message();
        switch (true) {
            case $msg instanceof RequestSnapshot:
                // save the state
                if ($this->state != null) {
                    $this->persistenceSnapshot($this->state);
                }
                break;
            case $msg instanceof ReplayComplete:
                if ($this->state != null) {
                    $context->logger()->info(
                        sprintf('Replay complete for %s', $this->state->serializeToJsonString())
                    );
                }
                break;
            case $msg instanceof CreateUser:
                if ($this->isStateExists($msg->email)) {
                    $context->send($msg->ref, new UserCreateError('user already exists'));
                    return;
                }
                // サンプルでは省いていますがversion を利用して楽観的ロックを実現することができます
                $id = Ulid::generate();
                $ev = new UserCreated([
                    'userID' => $id,
                    'email' => $msg->email,
                    'userName' => $msg->userName,
                    'version' => $this->version++,
                ]);
                $this->persist($ev, $context);
                $context->send($msg->ref, new UserCreateResponse($id));
                $context->stop($context->self());
                break;
            case $msg instanceof Message:
                // event がリプレイされた場合は状態を更新する
                if ($msg->serializeToJsonString() != '') {
                    $this->persist($msg, $context);
                }
                break;
            default:
        }
    }

    private function isStateExists(string $email): bool
    {
        if ($this->state === null) {
            return false;
        }
        return $this->state->getEmail() == $email;
    }

    private function persist(Message $msg, ContextInterface $context): void
    {
        if (!$this->recovering()) {
            $this->persistenceReceive($msg);
        }
        if ($msg instanceof UserCreated) {
            $this->state = $msg;
            $context->send($this->readModelUpdater, $msg);
        }
    }
}

アクターを永続化する場合は、Phluxor\Persistence\PersistentInterface インターフェースを実装する必要があります。
が、Phluxor\Persistence\Mixin トレイトに実装済みメソッドがありますのでこのトレイトを利用します。

これを利用することでスナップショット取得の計算や永続化などが実行されます。

これまで同様にメッセージに対応した処理をみていきましょう。

最初に Phluxor\Persistence\Message\RequestSnapshot メッセージに対応した処理です。

<?php

            case $msg instanceof RequestSnapshot:
                // save the state
                if ($this->state != null) {
                    $this->persistenceSnapshot($this->state);
                }
                break;

これは永続化を利用する際に、snapshotのタイミングで内部的に送られてくるメッセージとなっています。
このメッセージが来たタイミングでスナップショット保管を行います。
persistenceSnapshot で現在の状態がスナップショットとして扱われます。

サンプルでは省いてますが、アクターとはいえ楽観ロックなどは必要ですのでversionなどを利用してください。

<?php

            case $msg instanceof ReplayComplete:
                if ($this->state != null) {
                    $context->logger()->info(
                        sprintf('Replay complete for %s', $this->state->serializeToJsonString())
                    );
                }
                break;

この例では大事な意味を持ちませんが、
Phluxor\Persistence\Message\ReplayComplete は状態復元が完了すると流れてくるメッセージとなっています。
journalsとsnapshotsを使ってアクターの復元を自動で行いますが、
アクターが復元されたタイミングが取得できます。

繰り返しになりますが、アクターの状態復元は、
基本的にはアクター再生成時(このため任意の名前をつけておくのがポイント)となりますので
停止からの再起動や、時間がたったあとに再生成したり、
アクターがクラッシュした場合の自動復旧(Supervisor Strategyによる)などがあります。

次に App\Command\CreateUser に対応した処理です。

<?php

            case $msg instanceof CreateUser:
                if ($this->isStateExists($msg->email)) {
                    $context->send($msg->ref, new UserCreateError('user already exists'));
                    return;
                }
                // サンプルでは省いていますがversion を利用して楽観的ロックを実現することができます
                $id = Ulid::generate();
                $ev = new UserCreated([
                    'userID' => $id,
                    'email' => $msg->email,
                    'userName' => $msg->userName,
                    'version' => $this->version++,
                ]);
                $this->persist($ev, $context);
                $context->send($msg->ref, new UserCreateResponse($id));
                $context->stop($context->self());
                break;

内部でコールされているメソッドは下記のとおりです。

<?php

    private function isStateExists(string $email): bool
    {
        if ($this->state === null) {
            return false;
        }
        return $this->state->getEmail() == $email;
    }

    private function persist(Message $msg, ContextInterface $context): void
    {
        if (!$this->recovering()) {
            $this->persistenceReceive($msg);
        }
        if ($msg instanceof UserCreated) {
            $this->state = $msg;
            $context->send($this->readModelUpdater, $msg);
        }
    }

大した処理ではありませんが、
isStateExists は状態をもっているかどうかの判断です。
このアクター例はユーザー作成に関するものとなっていますので、
送られてきたメールアドレスが自分のアドレスと同じものの場合は
作成済みであることを返却し、自身はなにもしないために利用するものです。

persist は状態保存のためのメソッドです。

PHPではProtocol Buffersを使ったメッセージは Google\Protobuf\Internal\Message を継承していますので、
このメッセージのみを保存します。
recovering はリカバリ中、つまり復元中の状態にあるかどうかで
復元中は重複して状態を保管しないようにする必要があります。

復元中でなければ送られてきたメッセージは 永続化対象として保存されます persistenceReceive

<?php

        if ($msg instanceof UserCreated) {
            $this->state = $msg;
            $context->send($this->readModelUpdater, $msg);
        }

この部分は送られてきたメッセージ、メッセージを自分の状態としている処理です。
そして自分の状態として利用したメッセージをリードモデル更新アクターの送信する、
という流れです。

処理内容に話を戻しましょう。

<?php

                $id = Ulid::generate();
                $ev = new UserCreated([
                    'userID' => $id,
                    'email' => $msg->email,
                    'userName' => $msg->userName,
                    'version' => $this->version++,
                ]);
                $this->persist($ev, $context);
                $context->send($msg->ref, new UserCreateResponse($id));
                $context->stop($context->self());

App\Command\CreateUserを受け取り、
メッセージを利用して、App\Event\ProtoBuf\UserCreatedを自身の状態・状態を変更したイベントとして利用します。
状態変更ができたことを App\Command\CreateUser に指定された送信先にメッセージを送り、
リードモデル更新アクターにもメッセージを送信しています。(persist内部で)

最後にstopで自身のアクターを停止しています。

最後に下記のものです。

<?php

            case $msg instanceof Message:
                // event がリプレイされた場合は状態を更新する
                if ($msg->serializeToJsonString() != '') {
                    $this->persist($msg, $context);
                }
                break;

復元時は永続化したアクターの状態が頭から読み出されるようになっていますので、
このメッセージを受けて状態を一つずつ反映
+状態(イベント)を利用するリードモデル更新アクターに送信(再送信という形になります)しています。

ここまでの流れでわかるようにイベントをデータソースとして状態を復元し、
任意で伝播させているのがわかると思います。

Phluxorはアクターモデルのツールキットですので、
他言語のツールキット同様にすべて並行で意図したように実行されます。

だんだん完成に近づいてきました。

graph TD;
    Mezzio(Mezzio) -.- Phluxor(Phluxor);
    Phluxor --> |Spawn| RestAPIActor(RestAPI Actor);
    Phluxor --> |Spawn| TypedChannelActor(Stream Actor);
    RestAPIActor(RestAPI Actor) --> |Spawn| UserRegistration/UserActor(User Registration Actor);
    RestAPIActor(RestAPI Actor)  --> |Spawn| ReadModelUpdateActor(Read Model Update Actor);
    UserRegistration/UserActor <--> |Event Sourcing/Replay| Persistence(Persistence);
    Persistence <--> MySQL(MySQL);
    ReadModelUpdateActor(Read Model Update Actor)  <--> MySQL(MySQL);

ユーザー作成コマンド送信エンドポイントを見てみよう

POST /user/registration 的なものに対応するエンドポイント例です。
mezzioからは App\ActorSystem\AppActor でアクターシステムを参照できる状態になっていますので、
これを使ってユーザー作成を依頼しています。

<?php

declare(strict_types=1);

namespace App\Handler\UserRegistration;

use App\ActorSystem\AppActor;
use App\Command\CreateUser;
use App\Message\UserCreateError;
use App\Message\UserCreateMessageInterface;
use App\Message\UserCreateResponse;
use Laminas\Diactoros\Response\JsonResponse;
use Phluxor\ActorSystem\Channel\TypedChannel;
use Psr\Http\Message\ResponseInterface;
use Psr\Http\Message\ServerRequestInterface;
use Psr\Http\Server\RequestHandlerInterface;

readonly class CreateUserHandler implements RequestHandlerInterface
{
    public function __construct(
        private AppActor $appActor
    ) {
    }

    public function handle(ServerRequestInterface $request): ResponseInterface
    {
        $params = $request->getParsedBody();
        if (!array_key_exists("email", $params) && !array_key_exists("username", $params)) {
            return new JsonResponse(['message' => 'missing email or username'], 400);
        }
        $system = $this->appActor->system;
        $c = new TypedChannel(
            $system,
            fn(mixed $message): bool => $message instanceof UserCreateMessageInterface
        );
        $system->root()->send(
            $this->appActor->actorRef,
            new CreateUser($params['email'], $params['username'], $c->getRef())
        );
        $result = $c->result();
        return match (true) {
            $result instanceof UserCreateResponse => new JsonResponse(
                ['message' => 'success', 'user' => $result->userID],
                200
            ),
            $result instanceof UserCreateError => new JsonResponse(
                ['message' => 'user already exists'],
                400
            ),
            default => new JsonResponse(['message' => 'failed'], 400),
        };
    }
}

コマンド送信に該当するのは下記の部分です。

<?php

        $c = new TypedChannel(
            $system,
            fn(mixed $message): bool => $message instanceof UserCreateMessageInterface
        );
        $system->root()->send(
            $this->appActor->actorRef,
            new CreateUser($params['email'], $params['username'], $c->getRef())
        );

Phluxor\ActorSystem\Channel\TypedChannel はアクターに流れてくるメッセージから、
任意の型メッセージを取り出すものですが、これもアクターになっていますので
独自に実装したアクターを使っても構いません。
PhluxorではFutureを使って返却を直接受け取る事もできます。

すべてのアクターは生成時に返却される Phluxor\ActorSystem\Ref さえ利用できれば、
どこにでもメッセージを送ることができます。
アクターの名前が決まっていれば、Refオブジェクトを直接生成しても構いません。
これはアクター利用例の一つだと思ってください。

これで下記の状態になります。

graph TD;
    Mezzio(Mezzio) -.- Phluxor(Phluxor);
    Phluxor --> |Spawn| RestAPIActor(RestAPI Actor);
    Phluxor --> |Spawn| TypedChannelActor(Stream Actor);
    TypedChannelActor(TypedChannel Actor) --> Mezzio(Mezzio);
    RestAPIActor(RestAPI Actor) --> |Spawn| UserRegistration/UserActor(User Registration Actor);
    RestAPIActor(RestAPI Actor)  --> |Spawn| ReadModelUpdateActor(Read Model Update Actor);
    UserRegistration/UserActor --> |Send| TypedChannelActor(TypedChannel Actor);
    UserRegistration/UserActor <--> |Event Sourcing/Replay| Persistence(Persistence);
    Persistence <--> MySQL(MySQL);
    ReadModelUpdateActor(Read Model Update Actor)  <--> MySQL(MySQL);

ユーザー作成結果は
App\Message\UserCreateMessageInterface を実装した成功か失敗のメッセージが送られてきますので、
それを受け取っているだけの処理です。

つまり度々登場しているRef、App\Command\CreateUser を処理するアクターはこのアクターに返信する、という意味です。
順を追って読み解いていくと、そんなに難しい処理ではないのがわかります。

あとは返信されたメッセージを使ってHTTPのレスポンスを返しているだけですので、
Webアプリケーション側は非常に薄い処理となっています。

ユーザー作成済みリストエンドポイントを見てみよう

ついでにユーザー作成済みリスト返却の処理はもっと簡単です。
表示のための成形は済んでいますので、ただのDTOで十分なものになっています。
レイヤ構造にする必要もないくらいのものになります。

一番シンプルなこの部分の実装となります。

graph TD;
    Mezzio(Mezzio) <--> |DTO|MySQL(MySQL);
    Mezzio(Mezzio) --> |HTTP Response|Response(JSON);
<?php

declare(strict_types=1);

namespace App\Handler\UserRegistration;

use App\Query\RegistrationUser;
use Laminas\Diactoros\Response\JsonResponse;
use Psr\Http\Message\ResponseInterface;
use Psr\Http\Message\ServerRequestInterface;
use Psr\Http\Server\RequestHandlerInterface;

readonly class ListHandler implements RequestHandlerInterface
{
    public function __construct(
        private RegistrationUser $registrationUser,
    ) {
    }

    public function handle(ServerRequestInterface $request): ResponseInterface
    {
        $result = $this->registrationUser->findAll();
        return match (true) {
            $result !== null => new JsonResponse($result, 200),
            default => new JsonResponse(['message' => 'unknown'], 400),
        };
    }
}

長編でしたが、いかがでしたか?
Phluxorはアクターモデルを導入するための最小のツールキットというのが理解できたと思います。
(今回は紹介してませんがアクター再起動の設定や、エラーのエスカレーション、Become/Unbecome、ルーターなどもあります)

あとはサンプルコードなどを見ながら実際に実装してみてください。
(長くなってきたので最後は雑)

Webアプリケーションを作りながら
アクターモデルを導入することができますので
特定のアクターを物理的に別サーバの配置して負荷分散したり、
マイクロサービス的な実装をPHPだけで実装することができるようになります。
(クラスタの機能は実装中です)

他にも色々な使い方を導入することで、
これまでPHPだけでは実現が難しかった手法なども
表現できるようになると思いますので、
ぜひ色々トライして体験してみてください。

リトライ可能なタスクランナー的なものや、メッセージングに関するミドルウェア、
ブロックチェーンの仕組みも作ることができます。
他にも色々ありますので、AkkaやActor Model、Earlang OTPなどをネットで検索してみてください。

とはいえ、書くのがめんどうなので趣味でakka-projectionみたいなのとかも作ろうかな・・