PHP Conference Japan2024でアクターモデル完全ガイドワークショップを実施!と解説

2日連続カンファレンスの二日目!

今年は久しぶりにPHPのカンファレンスで登壇しました。
沖縄のカンファレンスに続いて今年のPHPは2回目。

今回は今年作ったPHP向けアクターモデルのツールキット
Phluxorでアクターモデルを体験し
PHPではあまりおなじみではない概念を少しだけ理解するための
ガイド的な形でハンズオン・ワークショップを開催しました。

前日のCQRS+ESカンファレンスからの似たような話を
PHPの方向けに体験してもらうという流れにしていました。

cqrs-es-con.connpass.com

CQRS+ESカンファレンスの当日の様子は前回のブログや下記からどうぞ

togetter.com

ワークショップは録画等もないのでスライドの公開はしませんが、
当日の内容を解説していこうと思います。

大作なので気を付けて!

ワークショップ解説

まずは当日利用したリポジトリです

github.com

実際に手元で写経しながらやりたい方はDockerなどをそのままご利用ください。

ディレクトリごとに難易度が異なっています。

basicディレクトリ

codezine.jp

上記の翔泳社で公開されている解説の流れに従って進めるものです。
これは実際に読みながらやってみるといいでしょう。

calculatorディレクトリ

計算機を途中で落としても状態を復元して元の状態に戻る、
アクターモデルならではの動作を体験するものです。

アクターシステムでは
それぞれのアクターは並行で稼働するわけですが、
状態復元もすべて並行で動作します。

またメモリ共有も一切行わない、という体験ができるものです。
このあたりはこれまでのPHPのプログラミングパラダイムでは存在しなかったもの、
と言っても過言ではないものです。

cartディレクトリ

計算機での復元方法を体験したあとに、
複数のアクターモデルを使用して実際のショッピングカートっぽい実装を体験するものです。
複数のアクターによるドメインイベントの保管と、復元
アクター間のやり取りのあとに元の処理に戻す、という形になっています。

アクターモデルに自身がある方はこれから挑んでもいいと思いますが、
前述の通りこれまでのPHPのプログラミングパラダイムと別物なので、
Akka実践バイブルを読みながらの実装を強くおすすめします。
知識なしで挑むと多分挫折します・・

ではせっかくなので読みながら理解できるように解説していきましょう。
実装済みのものも公開してありますので、
そちらを見ながらでもどうぞ

github.com

calculator実装

まずは実装済みの処理を使って、
アクターに計算による状態変更を実装します。

計算の実装自体はこちらに用意されています。

Phluxorではアクターの永続化には
Protocol Buffersを利用することが制限として設けてあります。
(というかAkka/Pekko、ProtoActorも)

そのあたりも事前に実装済みとなっています。
Protocol Buffersで自動生成したコードですが、
PHP版では継承したり拡張したりはできませんのでラップした形になっています。

まずは 事前に用意されているアクター
この計算に対応した処理を実装します。

まずはアクターに対しての命令、つまりコマンドですが
こちらもすべて用意済みなのでこちらを組み合わせます。
これもこちらのディレクトリに含まれています

アクタークラスのstate に状態を保持する仕組みになっていますので、
そのまま愚直に実装します。

<?php

declare(strict_types=1);

namespace Calculator;

use Phluxor\ActorSystem\Context\ContextInterface;
use Phluxor\ActorSystem\Message\ActorInterface;

class Actor implements ActorInterface
{
    public function __construct(
        private CalculationResult $state = new CalculationResult()
    ) {
    }

    public function receive(ContextInterface $context): void
    {
        $message = $context->message();
        switch (true) {
            case $message instanceof Command\Add:
                $this->state = $this->state->add($message->value);
                break;
            case $message instanceof Command\Subtract:
                $this->state = $this->state->subtract($message->value);
                break;
            case $message instanceof Command\Multiply:
                $this->state = $this->state->multiply($message->value);
                break;
            case $message instanceof Command\Divide:
                $this->state = $this->state->divide($message->value);
                break;
            case $message instanceof Command\Clear:
                $this->state = $this->state->reset();
                break;
            case $message instanceof Command\PrintResult:
                $context->logger()->info('CalculationResult is ' . $this->state->getResult());
                break;
            case $message instanceof Command\GetResult:
                $context->respond($this->state->getResult());
                break;
        }
    }
}

PrintResultやGetResultはアクターの現在の状態を出力するものです。
アクターではreturnを使うことができませんので、
中身を知りたい場合はこのようにします。

このあたりを実装すると
calculator直下のmain.phpを実行すると色々と計算処理が動くのが確認できると思います。

<?php

declare(strict_types=1);

use Calculator\Actor;
use Calculator\Command\Add;
use Calculator\Command\Subtract;
use Calculator\Command\PrintResult;
use Phluxor\ActorSystem;

use function Swoole\Coroutine\go;
use function Swoole\Coroutine\run;

require_once __DIR__ . '/vendor/autoload.php';

run(function () {
    go(function () {
        $system = ActorSystem::create();
        $ref = $system->root()->spawn(
            ActorSystem\Props::fromProducer(
                fn() => new Actor()
            )
        );
        $system->root()->send($ref, new Add(10));
        $system->root()->send($ref, new Subtract(10));
        $system->root()->send($ref, new Add(10));
        $system->root()->send($ref, new PrintResult());
        $system->shutdown();
    });
});

Phluxorの各メソッドの解説は リファレンスにもありますので、
気になる方はそちらも参考にどうぞ

phluxor.github.io

アクターはヒエラルキーを構成しますので、root()はヒエラルキーの最上位にあるものと思ってください。
spawn は最上位のアクターを生成するメソッドです。
(アクターは子アクターを生成しますが、APIはすべて同じです)

        $ref = $system->root()->spawn(
            ActorSystem\Props::fromProducer(
                fn() => new Actor()
            )
        );

上記を記述すると、
アクター生成時にアクターのアドレスである位置情報が払い出されます。
(アクターリファレンスというもの)

この位置情報に対してメッセージを送信するとそのアクターにメッセージが届けられる仕組みになっています。

        $system->root()->send($ref, new Add(10));
        $system->root()->send($ref, new Subtract(10));
        $system->root()->send($ref, new Add(10));
        $system->root()->send($ref, new PrintResult());
        $system->shutdown();

これは生成したアクターにメッセージを送信することを表しています。
最後のshutdownはアクターシステムを停止して、
すべてのリソースを開放する動作となります。

ここまでは単純なメッセージ送信の仕組みです。

ここからアクターモデルの復元を実装します。

永続化するアクターの実装をしてみましょう。

できたコードが次のものです。

<?php

declare(strict_types=1);

namespace Calculator;

use Calculator\ProtoBuf\Added;
use Calculator\ProtoBuf\Divided;
use Calculator\ProtoBuf\Multiplied;
use Calculator\ProtoBuf\Reset;
use Calculator\ProtoBuf\Subtracted;
use Google\Protobuf\Internal\Message;
use Phluxor\ActorSystem\Context\ContextInterface;
use Phluxor\ActorSystem\Message\ActorInterface;
use Phluxor\Persistence\Message\OfferSnapshot;
use Phluxor\Persistence\Message\RequestSnapshot;
use Phluxor\Persistence\Mixin;
use Phluxor\Persistence\PersistentInterface;

class PersistenceActor implements ActorInterface, PersistentInterface
{
    use Mixin;

    public function __construct(
        private CalculationResult $state = new CalculationResult()
    ) {
    }

    public function receive(ContextInterface $context): void
    {
        $message = $context->message();
        switch (true) {
            case $message instanceof RequestSnapshot:
                $this->persistenceSnapshot($this->state->getProtobufCalculationResult());
                break;
            case $message instanceof Command\Add:
                $this->updateState(new Added(['result' => $message->value]));
                break;
            case $message instanceof Command\Subtract:
                $this->updateState(new Subtracted(['result' => $message->value]));
                break;
            case $message instanceof Command\Multiply:
                $this->updateState(new Multiplied(['result' => $message->value]));
                break;
            case $message instanceof Command\Divide:
                $this->updateState(new Divided(['result' => $message->value]));
                break;
            case $message instanceof Command\Clear:
                $this->updateState(new Reset());
                break;
            case $message instanceof Command\PrintResult:
                $context->logger()->info('CalculationResult is ' . $this->state->getResult());
                break;
            case $message instanceof Command\GetResult:
                $context->respond($this->state->getResult());
                break;
        }
    }

    public function receiveRecover(mixed $message): void
    {
        switch (true) {
            case $message instanceof OfferSnapshot:
                $this->state = new CalculationResult($message->snapshot);
                break;
            case $message instanceof Message:
                $this->updateState($message);
                break;
        }
    }

    private function updateState(Message $message): void
    {
        if (!$this->recovering()) {
            $this->persistenceReceive($message);
        }
        $this->state = match (true) {
            $message instanceof Reset => $this->state->reset(),
            $message instanceof Added => $this->state->add($message->getResult()),
            $message instanceof Subtracted => $this->state->subtract($message->getResult()),
            $message instanceof Divided => $this->state->divide($message->getResult()),
            $message instanceof Multiplied => $this->state->multiply($message->getResult()),
        };
    }
}

早速解説します。

まずアクターを永続化する場合は次のインターフェースとtraitを利用します。

use Phluxor\Persistence\Mixin;
use Phluxor\Persistence\PersistentInterface;

stateのもたせ方は最初の実装と変わりません。
違いはコマンドを受け取ったあとにイベントに変換している部分です。

    public function receive(ContextInterface $context): void
    {
        $message = $context->message();
        switch (true) {
            case $message instanceof RequestSnapshot:
                $this->persistenceSnapshot($this->state->getProtobufCalculationResult());
                break;
            case $message instanceof Command\Add:
                $this->updateState(new Added(['result' => $message->value]));
                break;
            case $message instanceof Command\Subtract:
                $this->updateState(new Subtracted(['result' => $message->value]));
                break;
            case $message instanceof Command\Multiply:
                $this->updateState(new Multiplied(['result' => $message->value]));
                break;
            case $message instanceof Command\Divide:
                $this->updateState(new Divided(['result' => $message->value]));
                break;
            case $message instanceof Command\Clear:
                $this->updateState(new Reset());
                break;
            case $message instanceof Command\PrintResult:
                $context->logger()->info('CalculationResult is ' . $this->state->getResult());
                break;
            case $message instanceof Command\GetResult:
                $context->respond($this->state->getResult());
                break;
        }
    }

事前に用意されているコードではこのイベントもProtocol Buffersで事前に生成してあるものです。

実はイベントソーシングも一緒に実装される形になりますが、
このイベントをすべて保管する形になっており、
アクターを再起動するとこのイベントを頭から全て読みだす形になっています。
これは自動的にこの動作になるようにアクターシステム側で実装されています。
このあたりはアクターのライフサイクルの理解も必要になりますので、 リファレンスなどで理解しておくといいでしょう。

内部でコールしているupdateStateは大した処理ではありません。

    private function updateState(Message $message): void
    {
        if (!$this->recovering()) {
            $this->persistenceReceive($message);
        }
        $this->state = match (true) {
            $message instanceof Reset => $this->state->reset(),
            $message instanceof Added => $this->state->add($message->getResult()),
            $message instanceof Subtracted => $this->state->subtract($message->getResult()),
            $message instanceof Divided => $this->state->divide($message->getResult()),
            $message instanceof Multiplied => $this->state->multiply($message->getResult()),
        };
    }

イベントの中身を見てアクター自身の状態に反映しているだけです。
ほぼ最初の実装そのままで少しだけ変わっていきますね。

注目するものは下記の部分です。

        if (!$this->recovering()) {
            $this->persistenceReceive($message);
        }

recoveringはアクターが再起動中かどうかがわかる処理です。
persistenceReceive は受け取ったメッセージを保管する処理です。
つまり再起動中以外は受け取ったメッセージをジャーナルと呼ばれる場所に保管する形になります。
この例ではインメモリで動かしますが、RDBMSを使う場合はテーブルに自動で書き込まれます。

このジャーナルはイベントをすべて保管する場所で、
アクターが起動するとこのジャーナルから、該当するイベントをすべて読み込みます。

ただしこのイベントが100万件あったりすると100万回読まれることになります。
そこで利用するのがスナップショットで、ジャーナル何回おきに最新状態として保管するという機能があります。

これはreceiveに記載されている下記の部分です。

            case $message instanceof RequestSnapshot:
                $this->persistenceSnapshot($this->state->getProtobufCalculationResult());
                break;

スナップショットを設定するとアクターに対して自動で指示が送られてきます。
RequestSnapshot がそれです。

このときに今のアクターの状態を最新としてpersistenceSnapshot で保存します。
前述の通りProtocol Buffersを噛ませたものでなければいけませんので、
ラップしているクラスからアクセスしています。

これで最新のスナップショットと差分のジャーナルを組み合わせ、
アクター起動時に最短で復元されるようになります。

この仕組みはもちろんAkka/Pekko、Proto Actorと同じです。

次に下記の部分です。

    public function receiveRecover(mixed $message): void
    {
        switch (true) {
            case $message instanceof OfferSnapshot:
                $this->state = new CalculationResult($message->snapshot);
                break;
            case $message instanceof Message:
                $this->updateState($message);
                break;
        }
    }

永続化する場合は receiveRecover メソッドを必ず実装しなければなりません。

これはアクターが復元されるときに実行されるメソッドで、
復元時にアクターの送られてくるメッセージに対応した処理になっています。
OfferSnapshot は最新のスナップショットが勝手に送られてきます。
それを利用してアクターの状態に反映させます。
その後差分のジャーナルが送られてきますので、updateStateを内部でコールし、
状態を完全に戻します。

最後にアクターを永続化するための設定を記述します。

実装済みのリポジトリを見ていただくとわかりますが、
main.php を少し変えてあります。

<?php

declare(strict_types=1);

use Calculator\Command\Add;
use Calculator\Command\PrintResult;
use Calculator\InMemoryStateProvider;
use Calculator\PersistenceActor;
use Phluxor\ActorSystem;
use Phluxor\Persistence\EventSourcedBehavior;
use Phluxor\Persistence\InMemoryProvider;
use Swoole\Coroutine;

use function Swoole\Coroutine\go;
use function Swoole\Coroutine\run;

require_once __DIR__ . '/vendor/autoload.php';

run(function () {
    go(function () {
        $system = ActorSystem::create();
        $props = ActorSystem\Props::fromProducer(
            fn() => new PersistenceActor(),
            ActorSystem\Props::withReceiverMiddleware(
                new EventSourcedBehavior(
                    new InMemoryStateProvider(new InMemoryProvider(1))
                )
            )
        );
        $spawn = $system->root()->spawnNamed($props, 'calculator');
        $system->root()->send($spawn->getRef(), new Add(10));
        $system->root()->send($spawn->getRef(), new Add(10));
        $system->root()->send($spawn->getRef(), new Add(10));
        $system->root()->send($spawn->getRef(), new PrintResult());
        $system->root()->poison($spawn->getRef());
        \Swoole\Coroutine::sleep(0.1);
        $spawn = $system->root()->spawnNamed($props, 'calculator');
        $system->root()->send($spawn->getRef(), new PrintResult());
    });
});

大きく違うのは下記の部分です。

        $props = ActorSystem\Props::fromProducer(
            fn() => new PersistenceActor(),
            ActorSystem\Props::withReceiverMiddleware(
                new EventSourcedBehavior(
                    new InMemoryStateProvider(new InMemoryProvider(1))
                )
            )
        );

ActorSystem\Props はアクターの生成方法についての定義を記述するものです。

Phluxorでは事前に永続化ように Phluxor\Persistence\EventSourcedBehavior を用意していますので、
これをそのまま利用します。

new InMemoryStateProvider(new InMemoryProvider(1)) はメモリを使って
永続化するProviderです(テストなどにもどうぞ)。

これを指定して計算機永続化アクターを生成します。
永続化を利用する場合は、アクターにユニークな名前をつける必要があります。

名前をつけない場合は、Phluxor内部でランダムなユニークな名前をかってにつけるようになっています。
つまりランダムだと再起動時に同じ名前がつくとは限らないので、
状態を戻せない、ということになります。
このためここではcalculator という名前をつけています。

poisonは指定したアクターを停止させるメッセージです。
並行で動作するため、\Swoole\Coroutine::sleep(0.1) で少しwaitを設定しています。
その後同じ名前でアクターを生成し、
現在の状態を尋ねるメッセージを送信しています。

        $spawn = $system->root()->spawnNamed($props, 'calculator');
        $system->root()->send($spawn->getRef(), new Add(10));
        $system->root()->send($spawn->getRef(), new Add(10));
        $system->root()->send($spawn->getRef(), new Add(10));
        $system->root()->send($spawn->getRef(), new PrintResult());
        $system->root()->poison($spawn->getRef());
        \Swoole\Coroutine::sleep(0.1);
        $spawn = $system->root()->spawnNamed($props, 'calculator');
        $system->root()->send($spawn->getRef(), new PrintResult());

これを実行するとアクター停止前の状態に勝手に戻っているのがわかると思います。
また2回目のspawnNamedで名前を変えて生成すると、
別な初期状態のアクターとして生成されているのがわかると思います。
異なる名前や同じ名前のものを複数生成してみると、
状態を共有していないのもこれで体験できると思います。
ぜひやってみてください。

cart実装

計算機を踏まえたうえでやるものなので、これはちょっとだけ難しくなります。

これも実装済みの処理を解説したほうが理解しやすいかもしれません。

これで実装するのは、あらかじめ実装されている
Walletアクター(購入者の財布・予算)、
Basketアクターを組み合わせて、
Shopperアクターを実装します。

処理イメージはこんな感じです。

急に複数のアクターを組み合わせるのでちょっと難しそうですね。
複数のアクターを利用する場合は、 組み合わせ方や参考にする設計パターンなどが多少あります。

www.enterpriseintegrationpatterns.com

ここでのポイントの一つはアクターの監督・監視の仕組みの理解です。

翔泳社の記事なども事前に読んでおくとわかりますが、
このあたりはこれまでのPHPが扱っていたパラダイムと完全に別ものになりますので注意してください。

子アクターとして監視するか、
別アクターとして監視するかの違いですが、
下記の事象を事前にしっかりと理解しておく必要があります。

 - 子アクターの場合、親がダウンすると一緒にダウン
 - 別アクターの場合、アクター生成時に送信先アドレスを渡すか
メッセージに含める(ReplyTo など)

サンプルではどっちのパターンでも違いがわかるように実装例を用意してあります。

長くなるのでここではShopperの子アクターとして生成する例だけ紹介します。

<?php

declare(strict_types=1);

namespace Cart\Shopper;

use Cart\Basket\Command\Clear;
use Cart\Basket\Command\CommandInterface as BasketCommandInterface;
use Cart\Basket\Command\GetItems;
use Cart\Event\Paid;
use Cart\Items;
use Cart\Shopper\Command\PayBasket;
use Cart\Wallet\Command\CommandInterface as WalletCommandInterface;
use Cart\InMemoryStateProvider;
use Cart\Shopper\Value\Cash;
use Cart\Wallet\Command\Pay;
use Phluxor\ActorSystem\Context\ContextInterface;
use Phluxor\ActorSystem\Message\ActorInterface;
use Phluxor\ActorSystem\Message\Started;
use Phluxor\ActorSystem\Props;
use Phluxor\ActorSystem\Ref;
use Phluxor\Persistence\EventSourcedBehavior;
use Phluxor\Persistence\InMemoryProvider;
use Phluxor\Persistence\ProviderInterface;

class Actor implements ActorInterface
{
    private ?Ref $basketRef = null;
    private ?Ref $walletRef = null;
    private ProviderInterface $provider;

    public function __construct(
        private readonly int $shopperId,
        ?ProviderInterface $provider = null
    ) {
        $this->provider = $provider ?? $this->stateProvider();
    }

    public function receive(ContextInterface $context): void
    {
        $message = $context->message();
        switch (true) {
            case $message instanceof Started:
                $this->basketRef = $this->spawnBasket($context);
                $this->walletRef = $this->spawnWallet($context);
                break;
            case $message instanceof BasketCommandInterface:
                $context->forward($this->basketRef);
                break;
            case $message instanceof WalletCommandInterface:
                $context->forward($this->walletRef);
                break;
            case $message instanceof PayBasket:
                $context->request($this->basketRef, new GetItems($message->shopperId));
                break;
            case $message instanceof Items:
                $context->request($this->walletRef, new Pay($message, $this->shopperId));
                break;
            case $message instanceof Paid:
                $context->send($this->basketRef, new Clear($message->getShopperId()));
                $context->actorSystem()->getEventStream()->publish($message);
                break;
        }
    }

    private function spawnWallet(
        ContextInterface $context
    ): ?Ref {
        $spawned = $context->spawnNamed(
            Props::fromProducer(
                fn() => new \Cart\Wallet\Actor((string)Cash::VALUE),
                Props::withReceiverMiddleware(
                    new EventSourcedBehavior($this->provider)
                )
            ),
            "wallet:$this->shopperId"
        );
        if ($spawned->isError()) {
            $context->logger()->error($spawned->isError()->getMessage());
            return null;
        }
        return $spawned->getRef();
    }

    private function spawnBasket(
        ContextInterface $context
    ): ?Ref {
        $spawned = $context->spawnNamed(
            Props::fromProducer(
                fn() => new \Cart\Basket\Actor(),
                Props::withReceiverMiddleware(
                    new EventSourcedBehavior($this->provider)
                )
            ),
            "basket:$this->shopperId"
        );
        if ($spawned->isError()) {
            $context->logger()->error($spawned->isError()->getMessage());
            return null;
        }
        return $spawned->getRef();
    }

    private function stateProvider(): InMemoryStateProvider
    {
        return new InMemoryStateProvider(new InMemoryProvider(1));
    }
}

spawnWalletspawnBasketは計算機の永続化で紹介したものと同じです。
shopperIdは任意のものを与え、
ユーザーごとで管理できるようなイメージを持ってもらえればと思います。

処理イメージ通りShopperアクターがメッセージを受けて
WalletとCartに振り分けます。

ちょうど下記の部分です。

    public function receive(ContextInterface $context): void
    {
        $message = $context->message();
        switch (true) {
            case $message instanceof Started:
                $this->basketRef = $this->spawnBasket($context);
                $this->walletRef = $this->spawnWallet($context);
                break;
            case $message instanceof BasketCommandInterface:
                $context->forward($this->basketRef);
                break;
            case $message instanceof WalletCommandInterface:
                $context->forward($this->walletRef);
                break;
            case $message instanceof PayBasket:
                $context->request($this->basketRef, new GetItems($message->shopperId));
                break;
            case $message instanceof Items:
                $context->request($this->walletRef, new Pay($message, $this->shopperId));
                break;
            case $message instanceof Paid:
                $context->send($this->basketRef, new Clear($message->getShopperId()));
                $context->actorSystem()->getEventStream()->publish($message);
                break;
        }
    }

用意済みのコマンドクラスにインターフェースが実装済みですので、
どっち向けのメッセージか判断できるようになっています。

            case $message instanceof BasketCommandInterface:
                $context->forward($this->basketRef);
                break;
            case $message instanceof WalletCommandInterface:
                $context->forward($this->walletRef);
                break;

ちょうどこの部分です。
forwardはメッセージの内容はそのままで、指定されたアクターに転送するメソッドです。

ではBasketアクターを見てみましょう。
このディレクトリでは永続は実装済みなので、特に実装は必要ありません。
receiveメソッドだけ見てみましょう。

    public function receive(ContextInterface $context): void
    {
        $message = $context->message();
        switch (true) {
            case $message instanceof RequestSnapshot:
                $this->persistenceSnapshot($this->state->getProtobufItems());
                break;
            case $message instanceof AddItem:
                $this->updateState(new Added([
                    'item' => $message->item->getProtobufItem()
                ]));
                break;
            case $message instanceof GetItems:
                $context->respond($this->state);
                break;
            case $message instanceof RemoveItem:
                $removed = new ItemRemoved([
                    'productId' => $message->productId
                ]);
                $context->respond($removed);
                $this->updateState($removed);
                break;
            case $message instanceof Clear:
                $this->updateState(new Cleared([
                    'items' => $this->state->getProtobufItems()
                ]));
                break;
            case $message instanceof CountRecoveredEvents:
                $context->respond(new RecoveredEventsCount($this->nrEventsRecovered));
                break;
        }
    }

ここで注目するのはrespondメソッドを利用している部分です。

            case $message instanceof GetItems:
                $context->respond($this->state);
                break;

これはメッセージを受け取ると、送信元にそのまま返信する処理になります。
Basketアクターに今かごにあるものを教える、というものです。

つまりShopperアクターに PayBasket メッセージを送り、
Basketアクターの中身を取得し、
Walletアクターに購入を指示する(Payメッセージ)流れにすると良いことがわかります。

実装もその流れになっています。
これはAkka実践バイブルの例をPhluxorで実装していますので、
書籍がある方は手元で見てみると良いでしょう。

Walletアクターの実装を見ていただければ、
購入指示が来た際に予算があるかどうか確認し、
購入する処理となっています。

これまでのコードで一切出てこなかったのが下記の部分です。

                $notEnough = new NotEnoughCash(intval(bcsub($this->cash, $this->amountSpent)));
                $context->actorSystem()->getEventStream()->publish($notEnough);

このEventStreamは
アクターシステム内全てに流れるストリームで、
各アクターが受け取ることができます。

ただし一般的なPubSubのような形で使うものではなく、
いくつか制約があるものですので、
アクターシステム内の障害検知やうまくいかなかった処理が流れるなどに利用してください。

ここでは決済できなかったらアクターシステム内に流れるということになっています。

細かいコードは実装例を見ていただければと思います。

アクター同士の処理経路がイメージできて、
それを具体的な実装に落とし込むことができればcartはOKです。

実際に動かしたい場合はこちらをどうぞ

Basketに購入したい商品を追加し、
Shopperアクターを生成したあと、
購入したい商品追加メッセージを送信、
その後PayBasket を送信すると、
2つのアクター間でやり取りを行い決済できるのが体験できます。

このあたりはアクターモデルならではの作りや考え方が含まれているもので、
クラスベースの実装方法とも大きく異なる体験ができると思います。

ぜひチャレンジしてみてください

ちなみに今回は触れませんでしたが、   アクターシステムならではしくみですが、Phluxorだけで別サーバにあるアクターシステムと連携して
分散処理が作れるようになっています。
今回の例を元にするとこんな感じに分割することもできます。
(デプロイが独立してできますね)

メッセージが到達せずともアクターが障害になることはありませんので、
このあたりもこれまでと大きく変わる部分かなと思います。
(必要に応じてメッセージ再送・冪等性とかは実装しましょう!)

やってみて

これは仕方がないですが、PHPで扱うような処理系とはことなり、
プログラミングパラダイムが異なるものなのでなかなかすぐに面白い!という感じには
ならなかったような気がします。

このあたりはデータプラットフォームのような分散処理や、
Queue(Laravelのではない)などを活用した仕組みを理解しているかどうかも
ポイントの一つかなぁと思います。
(Semanticsなどの理解)

ちなみに
みなさんの手元で動かしてもらったコードはすべて並行で動いてますので
そのあたりもPhluxorの実装コードなんかも読み解きながら
理解していただけるといいかなと思います。