PHPでアクターモデルを体験 / ツールキット公開とサンプルコードの巻

PHPでアクターモデルが導入できない・・?

長い間そう言われてきました(自分も言ってました)。
実際に並行システムのためのものでもあり、
PHPでは簡単な並行処理以外は難しくもありました。

ja.wikipedia.org

PHP主体の方はOOPに慣れ親しんでいると思いますが、
アクターモデルはOOPとは異なるプログラミングパラダイムとなることもあり、
アクターモデルは実際に触りまくって概念等を覚えないとなかなかシュッと使えるものでもありません。

そのためにPHP以外の言語をわざわざ触るのも・・・
などもあるかなと思います。

そして月日が流れ、PHPでも並行処理が比較的手軽に利用することができるようになりました。
今なら作れるのでは、と思い実際にPHPのみでアクターモデルが導入できるツールキットを作って公開しました。

github.com

フレームワークではありませんので好みのフレームワーク内で利用できます(値はchannelなどを使って取り出してください)
ちなみに読み方はフラクサー

packagist.org

$ composer require phluxor/phluxor

で試せます。

現在はSwooleが導入済みであれば特別なことをしなくても、並行処理+アクターモデルを体験できるようにしています。

PHPでちゃんとしたアクターモデル ツールキットが見当たらなかったため、
+日常的にアクターモデルと触れているので割としっかりしたものを作ってみました。

個人的には思ったよりもしっかりできてしまって笑っております・・

実装済みのもの

アクターモデルでは当たり前ですが、
他の処理からアクターを直接操作することは一切できません。
状態などが戻り値としてほしい場合は、メッセージ送受信を行います。

PHPなどでは一般的?なリポジトリ・集約はアクターが担当することになりますので、
このあたりの理解が難しい方は矯正ギブスかのように理解できるようになると思います。
後述の永続化をサポートしていますので、本格的に利用できます。

アクターライフサイクル

アクターモデルといえば当たり前ですが、ライフサイクルを実装しています。
起動や停止はもちろんですが、再起動も自動で行います。

再起動ができるということはSupervisionも実装しています。
アクターの再起動方法は、OneForOneStrategy AllForOneStrategy ExponentialBackoffStrategy RestartStrategy
があります。

細かい解説は Goで実践アクターモデル シリーズのブログで解説しますが、
多くのアクターシステムで導入されているものを用意しています。

アクターは基本的にクラッシュさせる前提ですが、
子アクターがクラッシュしたときに同階層(アクターモデルはヒエラルキーを形成します)のアクターをすべて再起動させるか、
クラッシュした子アクターのみを再起動させるか、
または全て再起動させるか、
再起動の時間を徐々に延長するか、などを選択できます。

Mailbox

Akka、Pekko、Proto Actorなどと同様に、アクターごとにMailboxを持つようになっています。
解説については下記を参考にしてください。

blog.ytake.jp.net

またバックプレッシャーがありますので、 量が多すぎるなどは配信数を調整できます。
2以上の設定がおすすめです(高トラフィックにならないと多分使いません)

DeadLetter

停止済みのアクターに送信してしまった場合などのDeadLetter配信も実装済みです。

blog.ytake.jp.net

PubSub

上記のDeadLetterはアクターシステム内のEventStreamを使って配信されます。
つまりアクターシステム内のアクター全てに配信・購読ができます。

Become Unbecome

いわゆるアクターの状態変化ですが、
これも同様に実装しています。

アクターモデルの場合、Webアプリケーションのような一方向の流れだけに存在するわけではなく、
処理のスループットなどに応じて分散させたり、
特定の処理が終わるまで処理をスキップさせたり、on offスイッチのように利用できるものが必要になります。

これもAkka、Pekko、Proto Actorなどと同様ですので、
ネット上の記事を参考に実装できるようになっています。

Future

いわゆるasync/ awaitのようなもので、
あとから決定される値が今あるかのように使えるものです。

ja.wikipedia.org

なにげに一番たいへんだったかもしれません・・・。

時間制限ももちろん使えますので(秒単位の指定になりますが)、1秒以内に処理が終わらない場合は強制的に処理を停止し、
Timeoutしたことを伝えるメッセージがアクターに流れるようになっています。

いわゆる情報伝達のパイプライン設計にも利用するもので、
どちらか早い方を利用したり、複数のアクターの時間軸になるようなものだったり
いろんな使い方ができます。

まだ調整中ですが、アクターで処理を待ちつつ他の処理を先にするなどのものも実装済みです。

ja.wikipedia.org

Persistent

これがないとPHPのシステムでは大変!
ということでアクターの永続化も実装済みです。

アクターがメッセージを受け取るとそれを永続化することができ、
アクター再起動時にその当時の状態に自動で戻ります。

ジャーナルとスナップショット両方がありますので、近い方から状態復元を行います。

あれ、これってEvent Sourcingってやつなのでは?

はい、そのとおりです。
PHPではCDCなどを併用しないと実現できませんでしたが
PHPだけで実現できるようになっています。

使い方などはそのうち解説か、ツールキットのドキュメントページを作成予定なので載せます。
(サンプルコードにもありますのでどうぞ)

メッセージの保管にはProtocol Buffersを使うように制限をかけていますので、
不安定なJSONを使わずに型指定をしていきましょう。

protobuf.dev

実装予定のもの

クラスタ / バーチャルアクター

PHPではアクターモデルはあまり使われないとは思いますが、
せっかくなので他言語でも提供されている仕組みは提供しようと思っています。

アクターシステムと言えば、物理的にことなるサーバにあるプロセスでも
どこにあっても同じように利用できるわけですが、
Phluxorでも同様にクラスタをサポートする予定です。

一般的なアクターのクラスタ、Orleansなどでおなじみのバーチャルアクターを実装予定です。

https://www.microsoft.com/en-us/research/wp-content/uploads/2016/02/Orleans-MSR-TR-2014-41.pdf:title

OpenTelemetry

もちろん OpenTelemetry対応を行います。

並行処理ということもありデバックが多少むずかしいため(慣れるとわかりますが)、
ある程度可視化もできるように対応します。

簡単に取り出せるストリーム

Akka Streamのようなものではありませんが、
Proto Actorにもあるような指定した型のメッセージを簡単に取り出せるものを用意します。
個人的にあると楽なので実装しますw

ルーター

情報伝達経路の設計やスループットの改善、
効率の良いリソース利用などでは欠かせないルーターですが、まだ未実装です。
こちらも一般的なラウンドロビンやconsistent hash、スキャッターギャザーなどができるように提供します。

永続化プロバイダー

いくつかのデータベース向けに用意する予定です

ほかいくつか考えています。

ということで、まぁまぁちゃんとしたものができつつありますが、
久しぶりにPHPを書いたのもあり、微妙なコードも多くありますのでPRお待ちしております。

せっかく作るならある程度ガチなものにしたい!ということで以下サンプルです。

サンプルコード紹介

Proto Actor(Go)で実践するシリーズにも書いたサンプルと全く同じ挙動ができます。

blog.ytake.jp.net

前半の細かい解説は元記事を読んでいただければと思います。

サンプルコードは下記のものです。

github.com

まずは先生アクター、生徒アクター、教室アクターの3つを用意します。

生徒アクター解説

<?php

declare(strict_types=1);

namespace PhluxorExample\Student;

use Phluxor\ActorSystem\Context\ContextInterface;
use Phluxor\ActorSystem\Message\ActorInterface;
use PhluxorExample\Command\StartTest;
use PhluxorExample\Command\SubmitTest;
use Random\RandomException;

class Actor implements ActorInterface
{
    /**
     * @throws RandomException
     */
    public function receive(ContextInterface $context): void
    {
        $msg = $context->message();
        if ($msg instanceof StartTest) {
            sleep(random_int(1, 9));
            $context->logger()->info(
                sprintf(
                    '%s が %s テストの解答を提出します',
                    $context->self()?->protobufPid()->getId(),
                    $msg->getSubject()
                )
            );
            $context->send($context->parent(), new SubmitTest([
                'subject' => $msg->getSubject(),
                'name' => $context->self()?->protobufPid()->getId(),
            ]));
            $context->poison($context->self());
        }
    }
}

アクターとして機能させたいものには、Phluxor\ActorSystem\Message\ActorInterface を実装します。
とは言ってもメソッドは一つだけです。

receive(ContextInterface $context): void

$contextにはアクターに対する情報がいくつか流れてきます。
主なものはアクター向けのメッセージですね。

$context->message(); で取り出せるようになっています。
PHPの場合はジェネリクスがないため、Akka等でいうクラシックスタイルになります。
matchなどでもうまく活用していただければと思いますが、
上記のように流れてくるメッセージの形に合わせて処理を記述します。

ユーザーメッセージとして利用するものは、Protocol Buffersから生成済みですのでこちらと、Proto Actorの記事を参考にしてください。

この生徒アクターはランダムでsleepするようになっています。
複数アクターを生成しても最大9秒しか待たないはずですね。

<?php
            $context->send($context->parent(), new SubmitTest([
                'subject' => $msg->getSubject(),
                'name' => $context->self()?->protobufPid()->getId(),
            ]));

上記のコードはメッセージを受け取ったあとに $context->parent()
つまり生成もとである親アクターにメッセージを送るという意味です。

第2引数に SubmitTest とありますがテストの提出を表現したものです。
長いですが、自アクターのID/名前は $context->self()?->protobufPid()->getId() で取得できます。
Pid はAkka/PekkoのActorRefと同じものです。
このツールキットはProto Actorをモチーフに再現して作ったようなものなので 今はPidという名前を採用していますが、
ActorRefに変更するかもしれません(他言語のものに移行しやすいように)

最後の $context->poison($context->self()); は自アクターの停止です。
これを指定することでプロセスレジストリから削除され、参照がなくなります。

先生アクター解説

長くなってしまうので割愛しますが、
下記のものとほとんど同じです。

先生アクターを実装する

この先生アクターが生徒アクターの親になっています。
生徒からのメッセージは先生が受け取るようになっていますので、SubmitTest の処理があります。
全生徒が解答用紙を提出するとテストが終わったことを replyTo にある他のアクターに送信して
先生アクターが停止します。

<?php

declare(strict_types=1);

namespace PhluxorExample\Teacher;

use Phluxor\ActorSystem\Context\ContextInterface;
use Phluxor\ActorSystem\Message\ActorInterface;
use Phluxor\ActorSystem\Message\Restarting;
use Phluxor\ActorSystem\Pid;
use Phluxor\ActorSystem\Props;
use PhluxorExample\Command\FinishTest;
use PhluxorExample\Command\PrepareTest;
use PhluxorExample\Command\StartTest;
use PhluxorExample\Command\SubmitTest;
use PhluxorExample\Student\Actor as StudentActor;

class Actor implements ActorInterface
{
    /** @var SubmitTest[] */
    private array $endOfTests = [];

    /**
     * @param int[] $students
     * @param Pid $replyTo
     */
    public function __construct(
        private readonly array $students,
        private readonly Pid $replyTo
    ) {
    }

    public function receive(ContextInterface $context): void
    {
        $msg = $context->message();
        switch (true) {
            case $msg instanceof Restarting:
                $context->send($context->self(), new PrepareTest(['subject' => 'math']));
                break;
            case $msg instanceof PrepareTest:
                $context->logger()->info(sprintf('先生が%sテストを出しました', $msg->getSubject()));
                foreach ($this->students as $student) {
                    $ref = $context->spawnNamed(
                        Props::fromProducer(fn() => new StudentActor()),
                        sprintf('student-%d', $student)
                    );
                    if ($ref->isError()) {
                        // $context->logger()->error(sprintf('生徒 %d 生成できませんでした', $student));
                        throw new \RuntimeException('生徒生成失敗');
                    }
                    $context->send($ref->getPid(), new StartTest(['subject' => $msg->getSubject()]));
                }
                $this->endOfTests[] = $context->self();
                break;
            case $msg instanceof SubmitTest:
                $this->endOfTests[] = $msg;
                if (count($this->endOfTests) === count($this->students)) {
                    $context->send($this->replyTo, new FinishTest(['subject' => 'math']));
                    $context->poison($context->self());
                }
                break;
        }
    }
}

多くのツールキット同様にアクターの名前を自分で付与するか自動で付与するかを選択できます。

spawnNamedspawn を選択できます。
Props::fromProducer はアクターに関する定義などを指定できます。
こちらもいくつかの定義方法があり、永続化の指定などもここのオプション(第2引数以降)でできます。

教室アクター

こちらも詳しい解説は元記事を参照してください。(記法の差くらいで全く同じです)

<?php

declare(strict_types=1);

namespace PhluxorExample\Classroom;

use Phluxor\ActorSystem\Context\ContextInterface;
use Phluxor\ActorSystem\Message\ActorInterface;
use Phluxor\ActorSystem\Pid;
use Phluxor\ActorSystem\Props;
use PhluxorExample\Command\FinishTest;
use PhluxorExample\Command\PrepareTest;
use PhluxorExample\Command\StartsClass;
use PhluxorExample\Event\ClassFinished;
use PhluxorExample\Teacher\Actor as TeacherActor;

readonly class Actor implements ActorInterface
{
    /**
     * @param Pid $stream
     * @param int[] $students
     */
    public function __construct(
        private Pid $stream,
        private array $students
    ) {
    }

    public function receive(ContextInterface $context): void
    {
        $msg = $context->message();
        switch (true) {
            case $msg instanceof StartsClass:
                $ref = $context->spawn(
                    Props::fromProducer(
                        fn() => new TeacherActor(
                            $this->students, $context->self()
                        )
                    )
                );
                $context->send($ref, new PrepareTest(['subject' => $msg->getSubject()]));
                break;
            case $msg instanceof FinishTest:
                $context->send($this->stream, new ClassFinished([
                    'subject' => $msg->getSubject(),
                ]));
                $context->poison($context->self());
                break;
        }
    }
}

これらを実装すると下記のようなヒエラルキーや伝達経路を構築します。

flowchart TD
subgraph ActorSystem
教室アクター --> |PrepareTest|先生アクター
先生アクター --> |StartTest|生徒アクター
生徒アクター --> |SubmitTest|先生アクター
end
flowchart TD
subgraph ActorSystem
Teacher("先生アクター") --> |生成|student-1
Teacher("先生アクター") --> |生成|student-2
Teacher("先生アクター") --> |生成|student-3
Teacher("先生アクター") --> |生成|student-4
Teacher("先生アクター") --> |生成|student-5
end

動かす!

最後にコンソールで簡単に実行できるようにします。

<?php

declare(strict_types=1);

require_once 'vendor/autoload.php';

function main(): void
{
    \Swoole\Coroutine\run(function () {
        \Swoole\Coroutine\go(function () {
            $system = \Phluxor\ActorSystem::create();
            $pipe = $system->root()->spawn(
                \Phluxor\ActorSystem\Props::fromFunction(
                    new \Phluxor\ActorSystem\Message\ReceiveFunction(
                        function (\Phluxor\ActorSystem\Context\ContextInterface $context): void {
                            $msg = $context->message();
                            if ($msg instanceof \PhluxorExample\Event\ClassFinished) {
                                $context->logger()->info(
                                    sprintf('クラスが終了しました: %s', $msg->getSubject())
                                );
                            }
                        }
                    )
                )
            );
            $stream = $system->root()->spawnNamed(
                \Phluxor\ActorSystem\Props::fromProducer(
                    fn()  => new \PhluxorExample\Classroom\Actor($pipe, range(1, 20))
                ),
                'math-classroom'
            );
            $system->root()->send($stream->getPid(), new \PhluxorExample\Command\StartsClass(['subject' => 'math']));
        });
    });
}

main();

そのうちSwooleを意識しないようなコードになる予定ですが、手軽に動かす場合は
\Swoole\Coroutine\run\Swoole\Coroutine\go を使ってその中でアクターシステムを生成します。

アクターシステム自体は $system = \Phluxor\ActorSystem::create(); で生成できます。
細かいオプション指定もいくつかありますが、Phluxorに詳しくない場合はそのままで利用してください。
LoggerはPSR3に対応していれば何でもおkですが、デフォルトではmonologの標準出力になっています。

アクター生成の始まりは、アクターシステムのルートからです。
$system->root()->spawn で教室アクターからの返信を受け取るパイプラインになるアクターを生成します。
Futureを利用したrequestFutureを使って取り出すこともできます。
それはまたいつか解説します。

<?php

            $pipe = $system->root()->spawn(
                \Phluxor\ActorSystem\Props::fromFunction(
                    new \Phluxor\ActorSystem\Message\ReceiveFunction(
                        function (\Phluxor\ActorSystem\Context\ContextInterface $context): void {
                            $msg = $context->message();
                            if ($msg instanceof \PhluxorExample\Event\ClassFinished) {
                                $context->logger()->info(
                                    sprintf('クラスが終了しました: %s', $msg->getSubject())
                                );
                            }
                        }
                    )
                )
            );

ここでは面倒なので生成と受取を一気に指定していますが、
前述したようにいくつか定義方法がありますのでシーンに合わせて選択してください。
ここではテスト終了の \PhluxorExample\Event\ClassFinished を受け取って、教室の終了を出力しているだけです。

<?php

            $stream = $system->root()->spawnNamed(
                \Phluxor\ActorSystem\Props::fromProducer(
                    fn()  => new \PhluxorExample\Classroom\Actor($pipe, range(1, 20))
                ),
                'math-classroom'
            );
            $system->root()->send($stream->getPid(), new \PhluxorExample\Command\StartsClass(['subject' => 'math']));

最後に今回のサンプルのメインでもある教室アクターを生成して、生徒と先生の生成を教室アクターにお願いします。
生徒の数は20人指定していますので、生徒アクターが20個それぞれが独立して動作するようになります。

最後にこれを実行すると・・・

上記のようなログが出力されて約9秒前後に処理が一気に終わるはずです。

ということで割とちゃんとしたアクターモデルが体験できますので気軽に利用してみてください。
まだまだ未実装機能がありますので、ちょいちょい変更されることを前提にどうぞ。

まだWebアプリケーションフレームワークとの結合等はやっていないですが、
Proto Actorの例やAkka HTTPなどを参考にするとできると思います。

github.com

最後に

ちょっと古い本ですが、このツールキットを通じてアクターモデルを習得するには十分な本です。
PHP向けということもあってクラシックスタイルになりますので、ほぼそのまま読み替える事ができます。
ルーターは少しずつ実装しますのでお待ちください。

DDDやES+CQRSなど、アクターモデルを使って表現するために
良い知識習得ができます。

academy.lightbend.com

このツールキット(Phluxor)で割とメッセージングが利用できるようになりますので、
知識の保管にもどうぞ。(ネットでも同タイトルページで閲覧できます)

アクターモデルを使ってDDDの戦術パターンやりたい!という方は今からでも間に合います。

本格的に使いたい方のPRお待ちしております(スポンサーもよければ・・)

エンジニア募集しております

findy-code.io

ついでにProto Actor(Go)を使ったES+CQRSの話もしますので是非どうぞ