PHPでES+CQRS
PHPで完全なES+CQRSを表現するには、CDC(Change Data Capture)+Outbox形式にするのが定番でした。
単純なQueueではリプレイができず、
伝播させて終わり、いわゆるEvent Streamingの形になってしまうことが多いと思います。
とはいえCDC+Outboxでも環境によってはN層コミットがどうしても発生してしまいます。
ではCDC+Outboxを使わずにES+CQRSを実現するにはどのような方法があるのでしょうか?
ということで早速他言語でもお馴染みのアクターモデルを導入して実装してみましょう。
アクターモデル自体を知らないと本エントリは少しばかり難しいと思いますが、
ChatGPTなりClaudeなどに解説してもらってください。
アクターは並行で動作してメッセージのみでやり取りを行い、
ローカルでも物理的な別サーバでもk8s上で散らばったものでも、
場所関係なくアクターを利用することができるものです。
単純なQueueではありません(内部にMailboxというQueueがありますがそれ自体ではないです)。
今回はめちゃくちゃ長いので覚悟して読みましょう。
アクターモデル導入でES+CQRSを体験
PHPでアクターモデルを導入するにはほぼ選択肢がありません。
ので、拙作のPhluxor を使って解説していきます。
例としてユーザーの作成とユーザー一覧みたいなものをテーマに進めていきましょう。
サンプルコードは下記
作るものはこんな感じのイメージです(実際にはちょっと異なりますが)
Command
graph TD; Mezzio(Mezzio) -.- Phluxor(Phluxor); Phluxor --> |Spawn| RestAPIActor(RestAPI Actor); Phluxor --> |Spawn| TypedChannelActor(Stream Actor); TypedChannelActor(TypedChannel Actor) --> Mezzio(Mezzio); RestAPIActor(RestAPI Actor) --> |Spawn| UserRegistration/UserActor(User Registration Actor); RestAPIActor(RestAPI Actor) --> |Spawn| ReadModelUpdateActor(Read Model Update Actor); UserRegistration/UserActor --> |Send| TypedChannelActor(TypedChannel Actor); UserRegistration/UserActor <--> |Event Sourcing/Replay| Persistence(Persistence); Persistence <--> MySQL(MySQL); ReadModelUpdateActor(Read Model Update Actor) <--> MySQL(MySQL);
Query
graph TD; Mezzio(Mezzio) <--> |DTO|MySQL(MySQL); Mezzio(Mezzio) --> |HTTP Response|Response(JSON);
サンプルなのでデータベースはMySQLのみですが、
イベントの保管やアプリケーションで必要なデータはテーブル単位で分け、
共有しない状態になっています。
実際には別のデータベースで問題ありません。
アクターシステムを起動しよう
早速ですが、Webアプリケーション形式で動かす場合、アクターモデルは基本的に並行処理が前提になりますので
coroutineに対応したものを利用していきます。
mezzio以外でも動作させることはできますが、
Laravel Octaneはcoroutineが無効になっていますので
Laravelですんなり導入できるかどうかは試していません。
ぜひ試してみてください。
mezzioの場合、
アプリケーションサーバとして起動すると WorkerStartEvent
イベントが送られてきますので、
これをListenしてアクターシステムを起動できます。
こうすることでアプリケーションサーバが起動中はアクターシステムが常時起動したままの状態になります。
一般的なPHPアプリケーションのようにリクエストを受ける度にアクターシステムを起動すると
並行処理としてもリソースが勿体なく、パフォーマンスにも悪いため起動中はアクターシステムは常時動いたままにしておきましょう。
(あとに解説しますが、アクターシステム起動時にすべて元通りに復元されますので安心してデプロイできます)
<?php declare(strict_types=1); namespace App\ActorSystem; use Closure; use Laminas\ServiceManager\ServiceManager; use Mezzio\Swoole\Event\WorkerStartEvent; use Phluxor\ActorSystem; use Phluxor\ActorSystem\Props; use Phluxor\Persistence\Mysql\DefaultSchema; use Phluxor\Persistence\Mysql\MysqlProvider; use Phluxor\Persistence\ProviderInterface; use Phluxor\Persistence\ProviderStateInterface; use Psr\Container\ContainerInterface; use Psr\Log\LoggerInterface; use RuntimeException; use Swoole\Database\PDOProxy; readonly class BootAppActor { public function __construct( private ContainerInterface $container ) { } public function __invoke(WorkerStartEvent $event): void { $system = ActorSystem::create(); $proxy = $this->container->get(PDOProxy::class); $spawned = $system->root()->spawnNamed( Props::fromProducer( fn() => new RestApiActor( $proxy, $this->stateProvider($proxy, $system->getLogger()), ) ), AppActor::NAME ); if ($this->container instanceof ServiceManager) { $this->container->setService( AppActor::class, new AppActor($system, $spawned->getRef()) ); return; } throw new RuntimeException('Container is not a ServiceManager'); } /** * @param PDOProxy $proxy * @param LoggerInterface $logger * @return Closure(int): ProviderStateInterface&ProviderInterface */ private function stateProvider( PDOProxy $proxy, LoggerInterface $logger ): Closure { return function (int $snapshotInterval) use ($proxy, $logger) { return new MysqlProvider( $proxy, new DefaultSchema(), $snapshotInterval, $logger ); }; } }
コンテナへの登録方法はいくつかありますが、
ここでは Laminas\ServiceManager\ServiceManager
前提のコードとなっています。
dependencies.global.php
や、App\ConfigProvider
を使っていないのはなぜかというと、
それらを使った通常の依存解決方法はリクエストの度に実行されるため
前述したようにリソースの効率が悪いからです。。
ここはPHPということもあって仕方がありません。
少しコードの解説をしましょう。
まずは下記の部分です。
<?php public function __invoke(WorkerStartEvent $event): void { $system = ActorSystem::create(); $proxy = $this->container->get(PDOProxy::class); $spawned = $system->root()->spawnNamed( Props::fromProducer( fn() => new RestApiActor( $proxy, $this->stateProvider($proxy, $system->getLogger()), ) ), AppActor::NAME ); if ($this->container instanceof ServiceManager) { $this->container->setService( AppActor::class, new AppActor($system, $spawned->getRef()) ); return; } throw new RuntimeException('Container is not a ServiceManager'); }
まずmezzioのイベントをListenするには __invoke
を実装する必要があります。
ActorSystem::create()
はPhluxorのアクターシステム起動のためのコードです。
$proxy = $this->container->get(PDOProxy::class);
は、
コンテナからデータベースアクセスのインスタンスを取得しています(Queryでも利用)。
ここでのデータベース接続には持続接続(Pool)を利用しています。
<?php $spawned = $system->root()->spawnNamed( Props::fromProducer( fn() => new RestApiActor( $proxy, $this->stateProvider($proxy, $system->getLogger()), ) ), AppActor::NAME );
この部分はルートアクターの定義です。
当ブログでも書いているGoで実践するアクターモデルシリーズにも詳細がありますが、
アクターはヒエラルキーを形成します。
その際にトップレベルに位置するアクターを定義しなければなりません。
ここではこのアプリケーションと合わせてRestApiアクターとしています。
spawnNamedは、アクターに意図した名前を付与するためのものです。
(指定がなければ自動で適当な名前が割り当てられます)
Props::fromProducer
はアクターの設定を割り当てるためのものです。
RestApiActorの第2引数は、アクター永続化のためのデータストレージの設定です。
Phluxorではアクターの永続化にインメモリ、MySQLをデフォルトで用意しています。
(PostgreSQLにはすぐ対応する予定)
<?php /** * @param PDOProxy $proxy * @param LoggerInterface $logger * @return Closure(int): ProviderStateInterface&ProviderInterface */ private function stateProvider( PDOProxy $proxy, LoggerInterface $logger ): Closure { return function (int $snapshotInterval) use ($proxy, $logger) { return new MysqlProvider( $proxy, new DefaultSchema(), $snapshotInterval, $logger ); }; }
MySQLを利用する場合はPhluxor\Persistence\Mysql\MysqlProvider
を利用し、
データベースアクセスや、ログ、スナップショットのタイミングを指定します。
Phluxor\Persistence\Mysql\DefaultSchema
はデータベース定義情報になっています。
Phluxorでは永続化に2つのテーブルを利用します。
一つはjournals、
もう一つはsnapshots です。
Event Sourcingをする際、都度イベントを記録する仕組みが必要になります。
イベントは真のデータソースになる必要があり、どの時点にでも復元できなければなりません。
このため更新か作成か、という方法は利用できません(最新しか残らないため)。
復元時に頭からリプレイして戻す事ができますが、
大量にある場合は効率が悪くなるため、
ある時点をスナップショットとして保管してそこから復元するようになっています。
これで下記の様になります。
graph TD; Mezzio(Mezzio) -.- Phluxor(Phluxor); Phluxor --> |Spawn| RestAPIActor(RestAPI Actor);
イベントを永続化しよう!
イベントは過去におきた事実となるわけですが、
アクターはステートフルで最新のイベントの結果が今のアクター、と考えておくと良いでしょう。
誰でも誕生日を迎えることで1歳ずつ歳を取り、
学校の卒業だったり、引っ越しだったり様々なライフイベントがあり今の皆さんになっているわけですが、
思い出すこともできれば、今の状態にも戻れます。
アクターもそれと同じです。
状態を保存する場合、
他言語のアクターモデル ツールキット(Akka / Pekko / Proto Actor)と同様に、
イベントを永続化する場合は自由な型のJSONではなくProtocol Buffersを利用します。
syntax = "proto3"; package protobuf; option php_namespace = "App\\Event\\ProtoBuf"; option php_metadata_namespace = "App\\Event\\Metadata"; message UserCreated { string userName = 1; string email = 2; string userID = 3; int64 version = 4; }
protoc
コマンドを利用するとコードが生成されます。
メッセージをいくつか
システムへの指示をコマンドとして下記のようなものを用意します。
<?php declare(strict_types=1); namespace App\Command; use Phluxor\ActorSystem\Ref; readonly class CreateUser { public function __construct( public string $email, public string $userName, public Ref $ref ) { } }
これはユーザー作成の指示です。
第3引数にある Phluxor\ActorSystem\Ref
はユーザー作成完了したことを受け取りたいアクターとなっています。
アクターの組み合わせ方は色々ありますので、色々試してみてください。
あとはシステムからの応答メッセージです。
ユーザー作成の成功と失敗を表現したものです。
<?php declare(strict_types=1); namespace App\Message; readonly class UserCreateResponse implements UserCreateMessageInterface { public function __construct( public string $userID ) { } public function isSuccess(): bool { return true; } }
<?php declare(strict_types=1); namespace App\Message; readonly class UserCreateError implements UserCreateMessageInterface { public function __construct( public string $message ) { } public function isSuccess(): bool { return false; } }
メッセージの流れとしては、Webアプリケーション(mezzio)側から CreateUser
をアクターシステムに送信します。
アクターはこのコマンドを受け取り、Protocol Buffersで定義した UserCreated
イベントを発行して
アクターの状態として保存します(アクターが作成ユーザーとなる)。
そのイベントのままWebアプリケーション側にイベントを返却しても良いのですが、
生成できたかどうかを判断できるように、UserCreateResponse
または UserCreateError
(名前がイマイチですが・・・)を
メッセージとして送るようにします。
この状態(イベント)をQueryとして利用するためには、リードモデルを作るための仕組みが必要になります。
今回はリードモデル更新アクターを生成し、Queryのためのデータ成形を依頼するようにします。
アクターの状態保存以外はアクター間のメッセージパッシングになりますので、
ここではデータベース以外のミドルウェアは一切登場しません。
尚、これらのメッセージングはすべて並行で動作します。
RestApiActorを実装しよう
ある程度の流れがわかったところで、
ルートアクターとして作用するRestApiActorの中身を実装していきます。
<?php declare(strict_types=1); namespace App\ActorSystem; use App\ActorSystem\UserRegistration\ReadModelUpdateActor; use App\ActorSystem\UserRegistration\UserActor; use App\Command\CreateUser; use App\Database\RegisteredUser; use App\Message\UserCreateError; use Closure; use Phluxor\ActorSystem\Context\ContextInterface; use Phluxor\ActorSystem\Exception\NameExistsException; use Phluxor\ActorSystem\Message\ActorInterface; use Phluxor\ActorSystem\Message\Started; use Phluxor\ActorSystem\Props; use Phluxor\ActorSystem\Ref; use Phluxor\ActorSystem\SpawnResult; use Phluxor\Persistence\EventSourcedReceiver; use Phluxor\Persistence\ProviderInterface; use Phluxor\Persistence\ProviderStateInterface; use Swoole\Database\PDOProxy; class RestApiActor implements ActorInterface { private ?Ref $readModelUpdater = null; /** * @param Closure(int): ProviderStateInterface&ProviderInterface $providerState */ public function __construct( private readonly PDOProxy $proxy, private readonly Closure $providerState ) { } public function receive(ContextInterface $context): void { $msg = $context->message(); switch (true) { case $msg instanceof Started: $this->readModelUpdater = $context->spawn( Props::fromProducer( fn() => new ReadModelUpdateActor(new RegisteredUser($this->proxy)) ) ); break; case $msg instanceof CreateUser: $provider = $this->providerState; $result = $this->spawnUserActor($context, $this->readModelUpdater, $provider, $msg); if ($result->isError() instanceof NameExistsException) { $context->send( $msg->ref, new UserCreateError( sprintf("user %s already exists", $msg->userName) ) ); return; } $context->send($result->getRef(), $msg); break; } } public function spawnUserActor( ContextInterface $context, Ref $readModelUpdater, Closure $provider, mixed $msg ): SpawnResult { $stateProvider = $provider(3); return $context->spawnNamed( Props::fromProducer(fn() => new UserActor($readModelUpdater), Props::withReceiverMiddleware( new EventSourcedReceiver( $stateProvider ) ) ), sprintf("user:%s", $msg->email) ); } }
これもコード解説をしていきましょう。
まず下記の部分はお決まりのものですが、
アクターとして動作させるには Phluxor\ActorSystem\Message\ActorInterface
を実装する必要があります。
<?php class RestApiActor implements ActorInterface { public function receive(ContextInterface $context): void // 略 }
コンストラクタにあるのは BootAppActor
がコールするときに渡すオブジェクトで、
コネクションプールと永続化に利用する設定です。
spawnUserActorを軽く見ていきましょう。
public function spawnUserActor( ContextInterface $context, Ref $readModelUpdater, Closure $provider, mixed $msg ): SpawnResult { $stateProvider = $provider(3); return $context->spawnNamed( Props::fromProducer(fn() => new UserActor($readModelUpdater), Props::withReceiverMiddleware( new EventSourcedReceiver( $stateProvider ) ) ), sprintf("user:%s", $msg->email) ); }
そんなに大した処理はありませんが、ここはユーザーアクターつまりユーザー作成を担当するアクター生成に関するものです。
ここのユーザーとは いわゆるログインやいろんなものを担当する包括した巨大なユーザーではなく、
あくまでユーザー作成に関するユーザーとなり、
かなり小さい範囲のものになります。
このユーザー作成アクターの状態を保存することで作成済みユーザーかどうかなどの判定に利用できたり、
もしくは作成済みユーザーリスト集約の生成に利用したりできるようになります。
集約がアクターによる管理なる、そんなイメージをするとよいかと思います。
ここでは一つ一つをアクターとしていますが、実際にはもっと設計すると良いと思います。
(簡単に体験できるようにこの単位にしています)
これだけが全てではなく、色んな方法があるので実際にアクターモデルを使ってみてください。
当たり前ですがたくさん作るとメモリを沢山消費しますので、しっかりと設計しましょう。
つまりこのユーザー作成アクターは必ず一意である必要があり、そのための識別子が必要となります。
ここではメールアドレスがユニークなものとして、
ユーザー作成アクター生成時にspawnNamed
を利用して任意の名前をつけ、
メールアドレスを識別子として利用するようにしています。
なおアクターは位置透過性もサポートされますので、
このアクターがローカルにあったりリモートにあったりしても必ず一意なものとなります。
(Phluxorのリモートとクラスタは現在実装中)
このようにすることで同じ名前指定時は作成済みのアクターと判断され、
一意のアクターが保証されます。
下記の部分はユーザー作成アクター永続化利用のための設定です。
<?php Props::withReceiverMiddleware( new EventSourcedReceiver( $stateProvider ) )
stateProviderは下記の通りで、イベントストアとしてMySQLを利用し
snapshot保管のタイミングはイベント3回に対して1回保管する、という意味になります。
$stateProvider = $provider(3);
次に下記の部分を見てみます。
<?php public function receive(ContextInterface $context): void { $msg = $context->message(); switch (true) { case $msg instanceof Started: $this->readModelUpdater = $context->spawn( Props::fromProducer( fn() => new ReadModelUpdateActor(new RegisteredUser($this->proxy)) ) ); break; case $msg instanceof CreateUser: $provider = $this->providerState; $result = $this->spawnUserActor($context, $this->readModelUpdater, $provider, $msg); if ($result->isError() instanceof NameExistsException) { $context->send( $msg->ref, new UserCreateError( sprintf("user %s already exists", $msg->userName) ) ); return; } $context->send($result->getRef(), $msg); break; } }
RestApiアクターが起動すると、receive
メソッドの $contextにたくさんの情報が流れてきます。
情報の中からメッセージへアクセスするには $context->message()
を利用します。
PHPにはGenericsなどはありませんので、
switchやif、matchなど好きなものを使ってメッセージの型に合わせて処理を記述します。
一つ一つの処理はクラスなどを利用しても構いませんが、
アクターは独立して並行で動作しますので単純なシングルトンでの共有などはできません。
またアクターを通常のメソッドコールのように外部から強制的に叩くこともできませんので注意してください。
このあたりはアクターを使ったプログラミングパラダイムに慣れる必要があります。
中身の状態が知りたいときは状態を尋ねるメッセージなどを用意し、
送り返す仕組みが必要になります。
話を戻しましょう。
条件分岐の1つ目、Phluxor\ActorSystem\Message\Started
に対応した処理です。
このメッセージはアクターが起動すると必ず流れてくるメッセージです。
アクターを再起動させても流れてきます(アクターは任意で停止させたり再起動させたりできます)
この仕組みを使ってリードモデル更新アクターをここで生成しています。
(アクター生成は他の場所でもOKですがメリット・デメリットがありますので調べてみるといいでしょう)
<?php $this->readModelUpdater = $context->spawn( Props::fromProducer( fn() => new ReadModelUpdateActor(new RegisteredUser($this->proxy)) ) );
RegisteredUserはQueryのためのデータ成形結果を保存するテーブル操作に関するものです。
このテーブルは永続化で軽く触れた2つのテーブルは利用しません。
完全に別のテーブルを利用します(他のデータストレージでもOK)
次に App\Command\CreateUser
に対応した処理です。
<?php case $msg instanceof CreateUser: $provider = $this->providerState; $result = $this->spawnUserActor($context, $this->readModelUpdater, $provider, $msg); if ($result->isError() instanceof NameExistsException) { $context->send( $msg->ref, new UserCreateError( sprintf("user %s already exists", $msg->userName) ) ); return; } $context->send($result->getRef(), $msg); break;
ここではメールアドレスで識別されるユーザー作成アクターの生成を実行しています。
同じメールアドレスを指定すると、NameExistsException が返却されますので、
生成済みかどうかを判定する事ができます。
ただし復元した瞬間は判断できませんので、実際にはこことユーザー作成アクターの両方で判断することが必要になります。
(アクター・イベントの復元はアクター生成時に行われるため)
アクターが生成済みの場合は UserCreateError
メッセージが返却されます。
CreateUser
は返信先が指定されたメッセージとなっていますので、sendの第1引数を使って $msg-ref 宛に返信します。
アクターが生成されるとメッセージを受け付ける状態になりますので(メールボックスが起動する)
$context->send($result->getRef(), $msg);
で生成されたユーザー作成アクターにメッセージを送信しています。
これで下記くらいが用意されました。
graph TD; Mezzio(Mezzio) -.- Phluxor(Phluxor); Phluxor --> |Spawn| RestAPIActor(RestAPI Actor); RestAPIActor(RestAPI Actor) --> |Spawn| UserRegistration/UserActor(User Registration Actor); RestAPIActor(RestAPI Actor) --> |Spawn| ReadModelUpdateActor(Read Model Update Actor); Persistence <--> MySQL(MySQL);
リードモデル更新アクターを実装しよう
リードモデル更新アクターは非常に簡単です。
<?php declare(strict_types=1); namespace App\ActorSystem\UserRegistration; use App\Database\RegisteredUser; use App\Event\ProtoBuf\UserCreated; use Phluxor\ActorSystem\Context\ContextInterface; use Phluxor\ActorSystem\Message\ActorInterface; readonly class ReadModelUpdateActor implements ActorInterface { public function __construct( private RegisteredUser $registeredUser ) { } public function receive(ContextInterface $context): void { $msg = $context->message(); if ($msg instanceof UserCreated) { $row = $this->registeredUser->findByEmail($msg->getEmail()); if (!$row) { $result = $this->registeredUser->addUser($msg->getUserID(), $msg->getUserName(), $msg->getEmail()); if (!$result) { $context->logger()->error( 'failed to add user', [ 'user_id' => $msg->getUserID(), 'user_name' => $msg->getUserName(), 'email' => $msg->getEmail() ] ); } } } // no send message // read model update actor is a final actor } }
このリードモデル更新アクターとして作用するアクターは、このアクターシステム内では一つしかありません。
他のアクターと同様にメッセージを受け取り、Queryのためのデータ成形を行うだけですので
伝統的なアプリケーションと同じように実装するだけです。
ここで流れてくるのはイベントとなりますので、このイベントを加工して保存するだけのものです。
前述の通りここで操作するテーブルはjournals、snapshots以外のものになりますので、
事実を変更するようなものはありません。
任意の処理で色々できますので、いわゆるPubSubで受けるConsumerのような感じだと思っていただけるといいかなと思います。
成形の完了は返信する必要はなく、
Queryとして作用するAPIエンドポイントなどを通じてリストが表示されればよい、という具合です。
ポイントとして、アクター内のメッセージは at-most-once
となります。
受け取り側の保証などはしていません(自分で実装する必要があります)ので、必要であれば独自で作り込む必要があります。
これは多くのアクターシステムと共通です
ヒエラルキー等を理解すれば再送の伝播などが簡単にできるのは理解できると思いますが、
同じようにQuery側で利用済みの過去のイベントも流れてきますので、
バージョンなどを利用したり工夫してみてください(のでここでは単純なデータ存在確認を実行しています)
現在はここまで実装できました。
graph TD; Mezzio(Mezzio) -.- Phluxor(Phluxor); Phluxor --> |Spawn| RestAPIActor(RestAPI Actor); RestAPIActor(RestAPI Actor) --> |Spawn| UserRegistration/UserActor(User Registration Actor); RestAPIActor(RestAPI Actor) --> |Spawn| ReadModelUpdateActor(Read Model Update Actor); Persistence <--> MySQL(MySQL); ReadModelUpdateActor(Read Model Update Actor) <--> MySQL(MySQL);
永続化するユーザー作成アクターを実装しよう
最後にアクターの永続化です。
アクターはステートフルとなりますので、
private ?UserCreated $state = null;
がアクターの状態を示すもので、
かつProtocol Buffersを利用したもの、となります。
<?php declare(strict_types=1); namespace App\ActorSystem\UserRegistration; use App\Command\CreateUser; use App\Event\ProtoBuf\UserCreated; use App\Message\UserCreateError; use App\Message\UserCreateResponse; use Google\Protobuf\Internal\Message; use Phluxor\ActorSystem\Context\ContextInterface; use Phluxor\ActorSystem\Message\ActorInterface; use Phluxor\ActorSystem\Ref; use Phluxor\Persistence\Message\ReplayComplete; use Phluxor\Persistence\Message\RequestSnapshot; use Phluxor\Persistence\PersistentInterface; use Phluxor\Persistence\Mixin; use Symfony\Component\Uid\Ulid; class UserActor implements ActorInterface, PersistentInterface { use Mixin; private ?UserCreated $state = null; private $version = 0; public function __construct( private readonly Ref $readModelUpdater ) { } public function receive(ContextInterface $context): void { $msg = $context->message(); switch (true) { case $msg instanceof RequestSnapshot: // save the state if ($this->state != null) { $this->persistenceSnapshot($this->state); } break; case $msg instanceof ReplayComplete: if ($this->state != null) { $context->logger()->info( sprintf('Replay complete for %s', $this->state->serializeToJsonString()) ); } break; case $msg instanceof CreateUser: if ($this->isStateExists($msg->email)) { $context->send($msg->ref, new UserCreateError('user already exists')); return; } // サンプルでは省いていますがversion を利用して楽観的ロックを実現することができます $id = Ulid::generate(); $ev = new UserCreated([ 'userID' => $id, 'email' => $msg->email, 'userName' => $msg->userName, 'version' => $this->version++, ]); $this->persist($ev, $context); $context->send($msg->ref, new UserCreateResponse($id)); $context->stop($context->self()); break; case $msg instanceof Message: // event がリプレイされた場合は状態を更新する if ($msg->serializeToJsonString() != '') { $this->persist($msg, $context); } break; default: } } private function isStateExists(string $email): bool { if ($this->state === null) { return false; } return $this->state->getEmail() == $email; } private function persist(Message $msg, ContextInterface $context): void { if (!$this->recovering()) { $this->persistenceReceive($msg); } if ($msg instanceof UserCreated) { $this->state = $msg; $context->send($this->readModelUpdater, $msg); } } }
アクターを永続化する場合は、Phluxor\Persistence\PersistentInterface
インターフェースを実装する必要があります。
が、Phluxor\Persistence\Mixin
トレイトに実装済みメソッドがありますのでこのトレイトを利用します。
これを利用することでスナップショット取得の計算や永続化などが実行されます。
これまで同様にメッセージに対応した処理をみていきましょう。
最初に Phluxor\Persistence\Message\RequestSnapshot
メッセージに対応した処理です。
<?php case $msg instanceof RequestSnapshot: // save the state if ($this->state != null) { $this->persistenceSnapshot($this->state); } break;
これは永続化を利用する際に、snapshotのタイミングで内部的に送られてくるメッセージとなっています。
このメッセージが来たタイミングでスナップショット保管を行います。
persistenceSnapshot
で現在の状態がスナップショットとして扱われます。
サンプルでは省いてますが、アクターとはいえ楽観ロックなどは必要ですのでversionなどを利用してください。
<?php case $msg instanceof ReplayComplete: if ($this->state != null) { $context->logger()->info( sprintf('Replay complete for %s', $this->state->serializeToJsonString()) ); } break;
この例では大事な意味を持ちませんが、
Phluxor\Persistence\Message\ReplayComplete
は状態復元が完了すると流れてくるメッセージとなっています。
journalsとsnapshotsを使ってアクターの復元を自動で行いますが、
アクターが復元されたタイミングが取得できます。
繰り返しになりますが、アクターの状態復元は、
基本的にはアクター再生成時(このため任意の名前をつけておくのがポイント)となりますので
停止からの再起動や、時間がたったあとに再生成したり、
アクターがクラッシュした場合の自動復旧(Supervisor Strategyによる)などがあります。
次に App\Command\CreateUser
に対応した処理です。
<?php case $msg instanceof CreateUser: if ($this->isStateExists($msg->email)) { $context->send($msg->ref, new UserCreateError('user already exists')); return; } // サンプルでは省いていますがversion を利用して楽観的ロックを実現することができます $id = Ulid::generate(); $ev = new UserCreated([ 'userID' => $id, 'email' => $msg->email, 'userName' => $msg->userName, 'version' => $this->version++, ]); $this->persist($ev, $context); $context->send($msg->ref, new UserCreateResponse($id)); $context->stop($context->self()); break;
内部でコールされているメソッドは下記のとおりです。
<?php private function isStateExists(string $email): bool { if ($this->state === null) { return false; } return $this->state->getEmail() == $email; } private function persist(Message $msg, ContextInterface $context): void { if (!$this->recovering()) { $this->persistenceReceive($msg); } if ($msg instanceof UserCreated) { $this->state = $msg; $context->send($this->readModelUpdater, $msg); } }
大した処理ではありませんが、
isStateExists
は状態をもっているかどうかの判断です。
このアクター例はユーザー作成に関するものとなっていますので、
送られてきたメールアドレスが自分のアドレスと同じものの場合は
作成済みであることを返却し、自身はなにもしないために利用するものです。
persist
は状態保存のためのメソッドです。
PHPではProtocol Buffersを使ったメッセージは Google\Protobuf\Internal\Message
を継承していますので、
このメッセージのみを保存します。
recovering
はリカバリ中、つまり復元中の状態にあるかどうかで
復元中は重複して状態を保管しないようにする必要があります。
復元中でなければ送られてきたメッセージは 永続化対象として保存されます persistenceReceive
。
<?php if ($msg instanceof UserCreated) { $this->state = $msg; $context->send($this->readModelUpdater, $msg); }
この部分は送られてきたメッセージ、メッセージを自分の状態としている処理です。
そして自分の状態として利用したメッセージをリードモデル更新アクターの送信する、
という流れです。
処理内容に話を戻しましょう。
<?php $id = Ulid::generate(); $ev = new UserCreated([ 'userID' => $id, 'email' => $msg->email, 'userName' => $msg->userName, 'version' => $this->version++, ]); $this->persist($ev, $context); $context->send($msg->ref, new UserCreateResponse($id)); $context->stop($context->self());
App\Command\CreateUser
を受け取り、
メッセージを利用して、App\Event\ProtoBuf\UserCreated
を自身の状態・状態を変更したイベントとして利用します。
状態変更ができたことを App\Command\CreateUser
に指定された送信先にメッセージを送り、
リードモデル更新アクターにもメッセージを送信しています。(persist
内部で)
最後にstopで自身のアクターを停止しています。
最後に下記のものです。
<?php case $msg instanceof Message: // event がリプレイされた場合は状態を更新する if ($msg->serializeToJsonString() != '') { $this->persist($msg, $context); } break;
復元時は永続化したアクターの状態が頭から読み出されるようになっていますので、
このメッセージを受けて状態を一つずつ反映
+状態(イベント)を利用するリードモデル更新アクターに送信(再送信という形になります)しています。
ここまでの流れでわかるようにイベントをデータソースとして状態を復元し、
任意で伝播させているのがわかると思います。
Phluxorはアクターモデルのツールキットですので、
他言語のツールキット同様にすべて並行で意図したように実行されます。
だんだん完成に近づいてきました。
graph TD; Mezzio(Mezzio) -.- Phluxor(Phluxor); Phluxor --> |Spawn| RestAPIActor(RestAPI Actor); Phluxor --> |Spawn| TypedChannelActor(Stream Actor); RestAPIActor(RestAPI Actor) --> |Spawn| UserRegistration/UserActor(User Registration Actor); RestAPIActor(RestAPI Actor) --> |Spawn| ReadModelUpdateActor(Read Model Update Actor); UserRegistration/UserActor <--> |Event Sourcing/Replay| Persistence(Persistence); Persistence <--> MySQL(MySQL); ReadModelUpdateActor(Read Model Update Actor) <--> MySQL(MySQL);
ユーザー作成コマンド送信エンドポイントを見てみよう
POST /user/registration
的なものに対応するエンドポイント例です。
mezzioからは App\ActorSystem\AppActor
でアクターシステムを参照できる状態になっていますので、
これを使ってユーザー作成を依頼しています。
<?php declare(strict_types=1); namespace App\Handler\UserRegistration; use App\ActorSystem\AppActor; use App\Command\CreateUser; use App\Message\UserCreateError; use App\Message\UserCreateMessageInterface; use App\Message\UserCreateResponse; use Laminas\Diactoros\Response\JsonResponse; use Phluxor\ActorSystem\Channel\TypedChannel; use Psr\Http\Message\ResponseInterface; use Psr\Http\Message\ServerRequestInterface; use Psr\Http\Server\RequestHandlerInterface; readonly class CreateUserHandler implements RequestHandlerInterface { public function __construct( private AppActor $appActor ) { } public function handle(ServerRequestInterface $request): ResponseInterface { $params = $request->getParsedBody(); if (!array_key_exists("email", $params) && !array_key_exists("username", $params)) { return new JsonResponse(['message' => 'missing email or username'], 400); } $system = $this->appActor->system; $c = new TypedChannel( $system, fn(mixed $message): bool => $message instanceof UserCreateMessageInterface ); $system->root()->send( $this->appActor->actorRef, new CreateUser($params['email'], $params['username'], $c->getRef()) ); $result = $c->result(); return match (true) { $result instanceof UserCreateResponse => new JsonResponse( ['message' => 'success', 'user' => $result->userID], 200 ), $result instanceof UserCreateError => new JsonResponse( ['message' => 'user already exists'], 400 ), default => new JsonResponse(['message' => 'failed'], 400), }; } }
コマンド送信に該当するのは下記の部分です。
<?php $c = new TypedChannel( $system, fn(mixed $message): bool => $message instanceof UserCreateMessageInterface ); $system->root()->send( $this->appActor->actorRef, new CreateUser($params['email'], $params['username'], $c->getRef()) );
Phluxor\ActorSystem\Channel\TypedChannel
はアクターに流れてくるメッセージから、
任意の型メッセージを取り出すものですが、これもアクターになっていますので
独自に実装したアクターを使っても構いません。
PhluxorではFutureを使って返却を直接受け取る事もできます。
すべてのアクターは生成時に返却される Phluxor\ActorSystem\Ref
さえ利用できれば、
どこにでもメッセージを送ることができます。
アクターの名前が決まっていれば、Ref
オブジェクトを直接生成しても構いません。
これはアクター利用例の一つだと思ってください。
これで下記の状態になります。
graph TD; Mezzio(Mezzio) -.- Phluxor(Phluxor); Phluxor --> |Spawn| RestAPIActor(RestAPI Actor); Phluxor --> |Spawn| TypedChannelActor(Stream Actor); TypedChannelActor(TypedChannel Actor) --> Mezzio(Mezzio); RestAPIActor(RestAPI Actor) --> |Spawn| UserRegistration/UserActor(User Registration Actor); RestAPIActor(RestAPI Actor) --> |Spawn| ReadModelUpdateActor(Read Model Update Actor); UserRegistration/UserActor --> |Send| TypedChannelActor(TypedChannel Actor); UserRegistration/UserActor <--> |Event Sourcing/Replay| Persistence(Persistence); Persistence <--> MySQL(MySQL); ReadModelUpdateActor(Read Model Update Actor) <--> MySQL(MySQL);
ユーザー作成結果は
App\Message\UserCreateMessageInterface
を実装した成功か失敗のメッセージが送られてきますので、
それを受け取っているだけの処理です。
つまり度々登場しているRef、App\Command\CreateUser
を処理するアクターはこのアクターに返信する、という意味です。
順を追って読み解いていくと、そんなに難しい処理ではないのがわかります。
あとは返信されたメッセージを使ってHTTPのレスポンスを返しているだけですので、
Webアプリケーション側は非常に薄い処理となっています。
ユーザー作成済みリストエンドポイントを見てみよう
ついでにユーザー作成済みリスト返却の処理はもっと簡単です。
表示のための成形は済んでいますので、ただのDTOで十分なものになっています。
レイヤ構造にする必要もないくらいのものになります。
一番シンプルなこの部分の実装となります。
graph TD; Mezzio(Mezzio) <--> |DTO|MySQL(MySQL); Mezzio(Mezzio) --> |HTTP Response|Response(JSON);
<?php declare(strict_types=1); namespace App\Handler\UserRegistration; use App\Query\RegistrationUser; use Laminas\Diactoros\Response\JsonResponse; use Psr\Http\Message\ResponseInterface; use Psr\Http\Message\ServerRequestInterface; use Psr\Http\Server\RequestHandlerInterface; readonly class ListHandler implements RequestHandlerInterface { public function __construct( private RegistrationUser $registrationUser, ) { } public function handle(ServerRequestInterface $request): ResponseInterface { $result = $this->registrationUser->findAll(); return match (true) { $result !== null => new JsonResponse($result, 200), default => new JsonResponse(['message' => 'unknown'], 400), }; } }
長編でしたが、いかがでしたか?
Phluxorはアクターモデルを導入するための最小のツールキットというのが理解できたと思います。
(今回は紹介してませんがアクター再起動の設定や、エラーのエスカレーション、Become/Unbecome、ルーターなどもあります)
あとはサンプルコードなどを見ながら実際に実装してみてください。
(長くなってきたので最後は雑)
Webアプリケーションを作りながら
アクターモデルを導入することができますので
特定のアクターを物理的に別サーバの配置して負荷分散したり、
マイクロサービス的な実装をPHPだけで実装することができるようになります。
(クラスタの機能は実装中です)
他にも色々な使い方を導入することで、
これまでPHPだけでは実現が難しかった手法なども
表現できるようになると思いますので、
ぜひ色々トライして体験してみてください。
リトライ可能なタスクランナー的なものや、メッセージングに関するミドルウェア、
ブロックチェーンの仕組みも作ることができます。
他にも色々ありますので、AkkaやActor Model、Earlang OTPなどをネットで検索してみてください。
とはいえ、書くのがめんどうなので趣味でakka-projectionみたいなのとかも作ろうかな・・