PHPerKaigi2025でCQRS+ESやマイクロサービスアーキテクチャ実践のヒントになるトークをしました

長いので目次!

まぁまぁ難しい話をしてきました

事前に本ブログの読み方のアドバイスです。
ちょっと読んでわからない場合は、Google NotebookLMにURLか文章そんまま突っ込んでみてください。

notebooklm.google

毎回楽しいカンファレンスありがとうございました!
皆さんお疲れ様でした!

ということで去年久しぶりにPHPで書いてみたらできてしまったアクターモデル導入ツールキットの Phluxorをベースに、
アクターモデルの活用方法の一つとして、表題にかなり関連するトークをしてきました。

内容的にはPHPを問わずアクターモデルの活用や、
CQRS+ESやマイクロサービスアーキテクチャの
具体的な実装方法も含まれる内容になっています。
(最近では導入する企業は少ないと思いますが)

speakerdeck.com

本ブログで内容の解説をしていきますが、前提知識が要求されますので要注意です。

また一般的なPHPのプログラミングや設計手法だけでは真似できない形になっています。
見様見真似で作ってもオブジェクトなどがすべて並行で非同期、
かつ位置透過性などを担保する仕組みに乗ったアクターシステムでなければ同じことはできません。

PHPと入っていますが実はPHPはあんまり関係ありません・・!
アクターシステムがある言語であれば基本は同じです。

イメージをお伝えしておくと、
一つ一つのオブジェクトがSupervisorやDaemonのような仕組みで動いており、
このオブジェクトはreturnなどではなく、Queueを用いたような体験で処理をするイメージです。
すべて並行・非同期でかつ自律的に動く仕組みになっています。
障害発生時は自動復旧したり、監視の仕組みを自分でデザインしたり一般的なQueueとはちょっと異なります。

また状態復元ができる仕組みを持っていますので、処理して終わりではなく世代管理のような形で
オブジェクトが自動的に直前の状態に戻ったりする挙動やライフサイクルを持っています。
多分全然わからないと思いますが、PHPにはないモノなのでわからないのは普通です
PHPの既存のWebフレームワークでなどアクターモデルを導入しているものは存在していません。
アルゴリズムなどに該当するものです

Java、ScalaやErlangなどでは広く使われているものでもありますので、
興味がある方は調べてみるといいでしょう!

このブログを読んだあとに動画を見られる方もいると思いますので補足を書いておきますが、
CQRS+ES・マイクロサービスアーキテクチャを実際に取り入れていくには、
PHPを用いた一般的なWebアプリケーション以外の手法や知識を持っていないと
複雑な割には障害にも強くなかったり不整合が多く発生したり、
スループットを出そうと思ったらメッセージングのアンチパターンを歩んだりすることが多くなります。

そういった問題に取り組む場合にアクターモデルを理解していれば
武器になる知識や手法が多く手に入ります。

が、アクターモデルはPHPでは一般的ではなく
実現できるものがほぼなかった概念となっていますので、
アタクーモデルについてある程度事前知識が必要なセッションとなっています。

事前に以下の記事などを読んでおくことをおすすめします。
知らないとトークを後で見ても、言葉や考え方がほとんどわかりません!

zenn.dev

codezine.jp

speakerdeck.com

アクターモデルを体験できるのはPHPでは拙作のものしかありませんので、
こちらも事前に少し体験しておくといいとおもいます。
*あくまで普段の言語で体験できる学習・トレーニング用だと思ってください。
普段は他言語のものを使っているのもあり、重厚な機能などの実装予定はありません!

GoやJVM系の言語を普段から触られている方は、Proto Actor (Go)やAkka / Pekkoを参照ください。
Phluxorは概念や使い方もほとんど同じようになるように実装しています。
PHP的な実装コードになっていないのもそういった側面からです。

codezine.jp

blog.ytake.jp.net

ドキュメントは途中までしか書いておりませんが、
アクターモデルについて簡単な解説もあります。

phluxor.github.io

speakerdeck.com

トークで利用したコードはこちらです。

github.com

こちらもどうぞ

コードは古いですが、内容を理解して写経するには十分です
(前のAkkaのバージョンをお使いください)

PDOとかのトランザクション内でSQSとか複数処理すればいいんじゃない?

そういった疑問を持たれる方も多くいるのかもしれません。

実はこれはコード上はそう見えるだけで、
プロセスなどをコントロールしているわけではないので、
2相コミット・分散トランザクションなどの問題には対応できないのです。

RDBトランザクションと外部システム間でのコミットが同期されていない

RDB(リレーショナルデータベース)のトランザクションは、PDOによって開始・コミット・ロールバックが行われます。

しかし、RedisやSQSといった外部システムに対する操作は独立した処理基盤を持っており、
RDBと同一のトランザクション制御下にないため、
RDBトランザクションのコミットと完全に同期して処理をアトミックに確定させることができません。

RDBのコミットが成功しても、
RedisやSQSへのパブリッシュが一時的な障害で失敗する可能性がありその逆も起こり得ます。

冪等性や再送制御を考慮した仕組みが不足している

分散トランザクションを実現するうえでは、各システム間でのデータの不整合を回避するため、
メッセージの再送制御や冪等性を担保する仕組みが不可欠です。

単純にRDBトランザクション内部でRedisやSQSへパブリッシュしているだけでは、
万一の障害発生時に「RDBはコミットされているがメッセージキューは書き込まれていない」
「重複して書き込んでしまう」などのシナリオを防ぎきれません。

アプリケーション側で手動リトライ等を行う場合にも、
どの状態から再処理を行うかによってデータ不整合が生じるリスクが高まります。

Exceptionがスローされてないからと言って担保できているとは限らないのです・・。

本来は2相コミットやサガパターンなどが必要

複数の分散システムで原子的なトランザクションを実現するには、
2相コミット(Two-Phase Commit, 2PC)やサガ(Saga)パターンなどを用いたワークフロー制御が一般的です。

これらの手法は、
外部システムとの間でコミット・ロールバックを含む高度な整合性を保つためのプロトコル・設計パターンとなっています。

単純に1つのRDBトランザクション内で外部システムを呼び出すだけでは、
こうした分散トランザクション制御の要件を満たせません。

整合性が担保されないままの実装はリスクが大きい

分散システム間でデータの齟齬が生じると、最悪の場合、ビジネス上の重大インシデントにつながる恐れがあります。
たとえば、「在庫が引き当てられていないのに出荷指示が出てしまう」
「決済が完了していないのにメッセージが送信され、後続の処理が進んでしまう」など、
サービスの信頼性に直結する不具合が発生し得ます。

これは実際に体験するとわかると思います・・(わりと大事なときに起こるあるある)

従来のPHPプログラミングとの違い

長い前置きのように従来のPHPプログラミングとアクターモデルには、
いくつかの根本的な違いがあります。

またこれらは一般的なPHPの処理ランタイムの違いも含まれます。

1. 処理モデル

従来のPHP:
- リクエスト・レスポンス型の同期処理
- 1リクエスト = 1プロセス
- 処理が完了するまでブロック

// 従来のPHPプログラミング
$result = $database->query("SELECT * FROM users");
$users = $result->fetchAll();
foreach ($users as $user) {
    echo $user['name'];
}

アクターモデル:
- メッセージ駆動型の非同期処理
- 多数のアクターが並行して動作
- メッセージを送信して次の処理に進む

// アクターモデルの考え方(概念的な例)
$databaseActor->send(new QueryMessage("SELECT * FROM users"));
// 応答を待たずに次の処理に進む

// 別のメッセージハンドラで結果を受け取る
public function receive(ContextInterface $context): void {
    $message = $context->message();
    if ($message instanceof QueryResultMessage) {
        $users = $message->getResult();
        // 処理を続行
    }
}

Queueなどを使うイメージだと思いますが、
ミドルウェアは使わずにこれらが実現できるようになっています。

2. 状態管理

従来のPHP:
- セッションやデータベースに状態を保存
- グローバルな状態にアクセス可能
- 状態の共有が容易

アクターモデル:
- 各アクターが自身の状態を完全にカプセル化
- 状態へのアクセスはメッセージ経由のみ
- shared nothing な通信モデル

3. エラー処理

従来のPHP:
- try-catchによる例外処理
- エラー発生時にトランザクションをロールバック
- エラーが発生したリクエストのみが影響を受ける

アクターモデル:
- 監督階層によるエラー処理
- 親アクターが子アクターの障害を処理
- 「Let it crash」哲学(問題が発生したらクラッシュさせて再起動する)

そんな違いがあります。

利点は以下のものがあります。

  1. スケーラビリティ: アクターは独立して動作するため、水平スケーリングが容易
  2. 耐障害性: 監督階層により、障害が発生しても自動的に回復
  3. モジュール性: アクターは明確な責務を持ち、システムの結合度を下げる
  4. 並行性: 多数のアクターが並行して動作し、システム全体のスループットを向上

一般的なPHPのプログラミングパラダイムとは
かなり違うもの、と認識しておくほうが逆にわかりやすいと思います。
MVCでいうと・・みたいな比較はできませんのでリセットしましょう。

ではSagaパターンとは?

ここからがトークの中身ですね・・w

Sagaパターンは、分散システムにおける長時間実行トランザクションを管理するためのパターンとなってます。

複数のサービスやリソースにまたがる処理を、一連の小さなローカルトランザクションに分割し、
各ステップが失敗した場合に補償トランザクション(補償アクション)を実行することで、
システム全体の一貫性を保つというものです。

Sagaパターンの名前は、Hector Garcia-MolinaとKenneth Salemによって発表された論文「Sagas」に由来しています。
当初はデータベーストランザクションの文脈で提案されましたが、
現在ではマイクロサービスアーキテクチャにおける分散トランザクション管理の手法として広く採用されているものです。

割と古いものですね。

Sagaパターンの特徴

  1. 分割されたトランザクション: 大きなトランザクションを小さなローカルトランザクションに分割
  2. 補償トランザクション: 失敗時に既に完了したトランザクションを元に戻す処理
  3. イベント駆動: 各トランザクションの完了がイベントとなり、次のステップをトリガー
  4. 状態管理: トランザクションの進行状態を管理
graph LR
    A[トランザクション1] --> B[トランザクション2]
    B --> C[トランザクション3]
    
    B -- 失敗 --> D[補償トランザクション1]
    C -- 失敗 --> E[補償トランザクション2]
    E --> D
    
    style D fill:#ffcccc
    style E fill:#ffcccc

こんな感じ

従来のトランザクション処理との違い

2フェーズコミット(2PC)

従来の分散トランザクション管理では、2フェーズコミット(2PC)プロトコルが一般的でした。

  1. 準備フェーズ: コーディネーターが全参加者に準備を要求し、各参加者が準備完了を応答
  2. コミットフェーズ: 全参加者が準備完了なら、コーディネーターがコミットを指示

PHPで実際に実装するのはかなり難しいです。
Apache CamelやScalarDBなどを使いましょう。

2PCの問題は明らかですが、
- 参加者の1つが応答しないとブロックする
- コーディネーターの単一障害点
- 長時間のリソースロック

というものがあります。

Sagaパターン

Sagaパターンでは各ステップが独立したトランザクションとなり、
失敗時には補償アクションを実行されます。

Sagaパターンの利点は下記のとおりです。

  • リソースのロック時間が短い
  • 各サービスの独立性を保持
  • 障害からの回復メカニズムが組み込まれている

多分さっぱりわからないと思います。

Sagaパターンの実装方法

Sagaパターンには主に2つの実装アプローチがあります。
多分聞いたことがある方もいるでしょう!

1. コレオグラフィ(Choreography)

コレオグラフィアのSagaでは各サービスがイベントを発行し、
他のサービスがそれに反応する形で処理が進行します。
中央のコーディネーターは存在せずサービス間の直接的な連携によって全体のフローが形成されます。

graph LR
    A[サービスA] -->|イベント発行| B[サービスB]
    B -->|イベント発行| C[サービスC]
    C -->|イベント発行| D[サービスD]
    
    B -- 失敗イベント --> A
    C -- 失敗イベント --> B
    D -- 失敗イベント --> C

PHPの文脈で解説されるのはほぼこちらのパターンで、
SQSなどのミドルウェアなどを多用して作る形になります。

これらのメッセージング系はアンチパターンもたくさんありますので、
興味のある方はOOCのトークなどもどうぞ。

2. オーケストレーション(Orchestration)

オーケストレーションのSagaでは、
中央のコーディネーター(オーケストレーター)が全体のフローを制御する形になります。
各サービスはオーケストレーターからの指示に従って処理を実行し結果を報告します。

graph TD
    O[オーケストレーター] -->|コマンド| A[サービスA]
    O -->|コマンド| B[サービスB]
    O -->|コマンド| C[サービスC]
    
    A -->|結果| O
    B -->|結果| O
    C -->|結果| O

PHPでのSagaパターン実装の課題と解決策

課題1: 状態管理

Sagaは基本的に長時間実行トランザクションになるため状態の永続化が重要になります。
PHPの従来のリクエスト・レスポンスモデルでは、リクエスト間で状態を保持することが難しいという
そもそもの課題があります。

解決策としては下記のようなものになります。

  • データベースやRedisなどを使用して状態を永続化
  • イベントソーシングパターンを採用して、全イベントを記録
  • アクターモデルと組み合わせて、アクター内に状態を保持

課題2: 冪等性(べきとうせい)

Sagaの各ステップは、ネットワーク障害などにより複数回実行される可能性があります。
そのため各ステップは冪等(何度実行しても同じ結果になる)である必要がでてきます。

解決策としては下記のようなものです。

  • 一意のトランザクションIDを使用
  • 処理済みのトランザクションを記録
  • 条件付き更新を使用

このあたりからどんどん難しくなっていきますね!

課題3: 非同期処理

Sagaパターンは本質的に非同期であり、
PHPの同期処理モデルとの相性が良くないという課題があります。

このあたりは言語的な問題が若干あるところです。
そのうち解決されるとは思いますが!

このため解決策は下記のどれかになるわけですね。

  • メッセージキュー(RabbitMQ、Kafkaなど)を使用
  • Swoole/OpenSwooleを活用した非同期処理
  • アクターモデルと組み合わせる
// Swooleを使用した非同期処理の例
use Swoole\Coroutine as Co;

Co\run(function() {
    $channel = new Co\Channel(1);
    
    // 非同期で支払い処理を実行
    go(function() use ($channel) {
        echo "支払い処理を開始します\n";
        Co::sleep(2); // 処理時間をシミュレート
        
        $success = (rand(0, 10) > 2); // 80%の確率で成功
        $channel->push($success);
    });
    
    // 結果を待機
    $paymentResult = $channel->pop();
    
    if ($paymentResult) {
        echo "支払いが成功しました\n";
        
        // 非同期で出荷処理を実行
        go(function() {
            echo "出荷処理を開始します\n";
            Co::sleep(3); // 処理時間をシミュレート
            
            $success = (rand(0, 10) > 1); // 90%の確率で成功
            
            if ($success) {
                echo "出荷が成功しました\n";
            } else {
                echo "出荷に失敗しました\n";
                echo "補償アクション: 支払いを返金します\n";
            }
        });
    } else {
        echo "支払いに失敗しました\n";
        echo "補償アクション: 注文をキャンセルします\n";
    }
});

アクターモデルとSagaパターンの組み合わせ

ということで、他言語のアクターモデルのツールキットを
PHP向けに移植したものがPhluxorというわけです。
(内部はSwooleのCoroutineやTimerを使って色々やっています)

アクターモデルとSagaパターンは相性が良く、
組み合わせることで強力な分散トランザクション管理システムを構築できます。

利点としては以下のとおりです。

  1. 状態管理: アクターは内部状態を持つため、Sagaの状態管理に適しています
  2. 非同期処理: アクターモデルは本質的に非同期であり、Sagaの非同期性と合致します
  3. 監督階層: アクターの監督機能を使用して、障害からの回復を管理できます
  4. メッセージパッシング: アクター間のメッセージパッシングは、Sagaのステップ間の通信に適しています

苦手だった部分をこれまでのPHPではなかったアプローチで体験してみる、ということですね。
トークでもお話していますが、商用でPhluxorを使わないようにお願いします。。
他言語のものと比べると機能が足りなかったり、クラスタ対応の実装をしていません!

トークの送金Sagaの概要

このトークで扱った送金Sagaは、以下のステップを仮のものとしています。

  1. 送金元口座から金額を引き落とす(デビット)
  2. 送金先口座に金額を入金する(クレジット)
  3. 入金が失敗した場合、送金元口座に返金する(ロールバック)

これらは別なサービスなどで動いていると過程してもらっていいです。
サービスというか別なサーバにあるアクターなどをイメージしてください。
(アクターシステム的にも別サーバにあるアクターとやり取りができます)

これらのステップをSagaパターンに基づいて実装していくわけです。
各ステップは独立したトランザクションとして実行されて、
かつ障害が発生した場合はアクターの仕組みを使って一貫性を回復するしくみになっています。

graph LR
    A[送金元口座からデビット] --> B[送金先口座へクレジット]
    B -->|成功| C[送金完了]
    B -->|失敗| D[送金元口座へロールバック]
    D --> E[送金失敗(一貫性維持)]
    
    style D fill:#ffcccc

アクターの設計

Phluxorを使った送金Sagaの実装では、以下のようなヒエラルキー・階層を設計します:

graph TD
    Root[Root Actor] --> Runner[Runner]
    
    Runner --> Account1[FromAccount]
    Runner --> Account2[ToAccount]
    Runner --> TP[TransferProcess]
    
    TP --> DA[DebitAttempt<br>AccountProxy]
    TP --> CA[CreditAttempt<br>AccountProxy]
    TP --> RD[RollbackDebit<br>AccountProxy]
    
    DA -.-> Account1
    CA -.-> Account2
    RD -.-> Account1
    
    classDef actor fill:#ddf,stroke:#333,stroke-width:1px
    class Root,Account1,Account2,DA,CA,RD,TP,Runner actor

下記のアクターで構成されます。

  1. Root Actor: システムのルートアクター
  2. Runner: 送金プロセスのオーケストレーター
  3. Account: 口座を表すアクター(FromAccount, ToAccount)
  4. TransferProcess: 送金Sagaの状態機械
  5. AccountProxy: アカウントとの通信を仲介するアクター(DebitAttempt, CreditAttempt, RollbackDebit)

主要なコンポーネントの役割について

1. Runner

Runnerは送金プロセスのオーケストレーターになります。
口座アクターとTransferProcessアクターを作成し結果を集計することになります。

<?php
// src/PhluxorSaga/Runner.php
declare(strict_types=1);

namespace PhluxorSaga;

use Phluxor\ActorSystem\Context\ContextInterface;
use Phluxor\ActorSystem\Message\ActorInterface;
use Phluxor\ActorSystem\Message\Started;
use Phluxor\ActorSystem\Props;
use Phluxor\ActorSystem\Ref;
use Phluxor\Persistence\InMemoryProvider;
use PhluxorSaga\Internal\ForWithProgress;
use PhluxorSaga\ProtoBuf\SuccessResult;
use PhluxorSaga\ProtoBuf\FailedAndInconsistent;
use PhluxorSaga\ProtoBuf\FailedButConsistentResult;

class Runner implements ActorInterface
{
    private array $transfers = [];
    private int $successResults = 0;
    private int $failedAndInconsistentResults = 0;
    private int $failedButConsistentResults = 0;
    private int $unknownResults = 0;

    public function __construct(
        private readonly int $numberOfIterations,
        private readonly int $intervalBetweenConsoleUpdates,
        private readonly float $uptime,
        private readonly float $refusalProbability,
        private readonly float $busyProbability,
        private readonly int $retryAttempts,
    ) {
    }

    public function receive(ContextInterface $context): void
    {
        $message = $context->message();

        switch (true) {
            case $message instanceof SuccessResult:
                // 送金成功の処理
                $this->successResults++;
                $this->checkForCompletion($message->getFrom());
                break;

            case $message instanceof FailedAndInconsistent:
                // 不整合状態での送金失敗の処理
                $this->failedAndInconsistentResults++;
                $this->checkForCompletion($message->getFrom());
                break;

            case $message instanceof FailedButConsistentResult:
                // 一貫性を保った送金失敗の処理
                $this->failedButConsistentResults++;
                $this->checkForCompletion($message->getFrom());
                break;

            case $message instanceof Started:
                // アクターシステム起動時の処理
                $inMemoryProvider = $this->inMemoryProvider();
                (new ForWithProgress(
                    $this->numberOfIterations,
                    $this->intervalBetweenConsoleUpdates,
                    true,
                    false
                ))->everyNth(
                    fn($i) => print("Started {$i}/{$this->numberOfIterations} processes\n"),
                    function ($i, $nth) use ($context, $inMemoryProvider) {
                        // 送金元と送金先の口座アクターを作成
                        $fromAccount = $this->createAccount($context, "FromAccount{$i}");
                        $toAccount = $this->createAccount($context, "ToAccount{$i}");
                        
                        // 送金プロセスアクターを作成
                        $actorName = "Transfer Process {$i}";
                        $factory = new TransferFactory(
                            $context,
                            $this->uptime,
                            $this->retryAttempts,
                            $inMemoryProvider
                        );
                        $transfer = $factory->createTransfer($actorName, $fromAccount, $toAccount, 10);
                        $this->transfers[] = (string) $transfer->getRef();
                        
                        if ($i === $this->numberOfIterations && !$nth) {
                            print("Started {$i}/{$this->numberOfIterations} processes\n");
                        }
                    }
                );
                break;
        }
    }

    private function checkForCompletion($pid): void
    {
        // 送金プロセスの完了をチェックし、結果を表示
        $this->transfers = array_filter($this->transfers, fn($t) => $t !== $pid->getId());
        $remaining = count($this->transfers);
        
        // 進捗表示
        if ($this->numberOfIterations >= $this->intervalBetweenConsoleUpdates) {
            print(".");
            if ($remaining % ($this->numberOfIterations / $this->intervalBetweenConsoleUpdates) === 0) {
                print("\n{$remaining} processes remaining\n");
            }
        } else {
            print("{$remaining} processes remaining\n");
        }

        // すべての送金プロセスが完了したら結果を表示
        if ($remaining === 0) {
            \Swoole\Coroutine::sleep(2);
            print("\nRESULTS:\n");
            printf(
                "%.2f%% (%d/%d) successful transfers\n",
                $this->asPercentage($this->successResults),
                $this->successResults,
                $this->numberOfIterations
            );
            printf(
                "%.2f%% (%d/%d) failures leaving a consistent system\n",
                $this->asPercentage($this->failedButConsistentResults),
                $this->failedButConsistentResults,
                $this->numberOfIterations
            );
            printf(
                "%.2f%% (%d/%d) failures leaving an inconsistent system\n",
                $this->asPercentage($this->failedAndInconsistentResults),
                $this->failedAndInconsistentResults,
                $this->numberOfIterations
            );
            printf(
                "%.2f%% (%d/%d) unknown results\n",
                $this->asPercentage($this->unknownResults),
                $this->unknownResults,
                $this->numberOfIterations
            );
        }
    }

    private function asPercentage(int $results): float
    {
        return ($results / $this->numberOfIterations) * 100;
    }

    private function inMemoryProvider(): InMemoryStateProvider
    {
        return new InMemoryStateProvider(new InMemoryProvider(2));
    }

    private function createAccount(ContextInterface $context, string $name): Ref
    {
        $accountProps = Props::fromProducer(
            fn() => new Account($this->uptime, $this->refusalProbability, $this->busyProbability)
        );
        return $context->spawnNamed($accountProps, $name)->getRef();
    }
}

Runnerアクターは、以下の役割を担います。

  1. 指定された数の送金プロセスを開始
  2. 各プロセスの結果(成功、一貫性を保った失敗、不整合状態での失敗)を集計
  3. 進捗状況と最終結果を表示

2. Account

Accountは口座を表すアクターで、残高の管理と取引処理を担当します。

<?php
// src/PhluxorSaga/Account.php
declare(strict_types=1);

namespace PhluxorSaga;

use Phluxor\ActorSystem\Context\ContextInterface;
use Phluxor\ActorSystem\Message\ActorInterface;
use Phluxor\ActorSystem\Ref;
use PhluxorSaga\Message;
use Random\RandomException;

class Account implements ActorInterface
{
    private float $balance = 10.0;

    /** @var array<string, mixed> */
    private array $processedMessages = [];

    public function __construct(
        private readonly float $serviceUptime,
        private readonly float $refusalProbability,
        private readonly float $busyProbability
    ) {
    }

    /**
     * @throws RandomException
     */
    public function receive(ContextInterface $context): void
    {
        $message = $context->message();
        switch (true) {
            case $message instanceof Message\Credit:
            case $message instanceof Message\Debit:
                $this->handleBalanceChange($context, $message);
                break;
            case $message instanceof Message\GetBalance:
                $context->respond($this->balance);
                break;
        }
    }

    /**
     * @throws RandomException
     */
    private function handleBalanceChange(
        ContextInterface $context,
        Message\ChangeBalance $message
    ): void {
        // すでに処理済みのメッセージかチェック(冪等性の確保)
        if ($this->alreadyProcessed($message->replyTo)) {
            $context->send($message->replyTo, $this->processedMessages[(string) $message->replyTo]);
            return;
        }

        // 残高不足のチェック
        if ($message instanceof Message\Debit && ($message->amount + $this->balance) < 0) {
            $context->send($message->replyTo, new Message\InsufficientFunds());
            return;
        }

        // 永続的な拒否(確率ベース)
        if ($this->refusePermanently()) {
            $this->processedMessages[(string) $message->replyTo] = new Message\Refused();
            $context->send($message->replyTo, new Message\Refused());
            return;
        }

        // サービス利用不可(確率ベース)
        if ($this->isBusy()) {
            $context->send($message->replyTo, new Message\ServiceUnavailable());
            return;
        }

        // 処理前の障害シミュレーション
        if ($this->shouldFailBeforeProcessing()) {
            $context->send($message->replyTo, new Message\InternalServerError());
            return;
        }
        
        // 処理の実行(遅延をシミュレート)
        usleep(random_int(0, 150) * 1000);
        $this->balance += $message->amount;
        $this->processedMessages[(string) $message->replyTo] = new Message\Ok();

        // 処理後の障害シミュレーション
        if ($this->shouldFailAfterProcessing()) {
            $context->send($message->replyTo, new Message\InternalServerError());
            return;
        }
        
        // 成功応答
        $context->send($message->replyTo, new Message\Ok());
    }

    /**
     * @return bool
     * @throws RandomException
     */
    private function isBusy(): bool
    {
        return random_int(0, 100) / 100.0 <= $this->busyProbability;
    }

    /**
     * @return bool
     * @throws RandomException
     */
    private function refusePermanently(): bool
    {
        return random_int(0, 100) / 100.0 <= $this->refusalProbability;
    }

    /**
     * @return bool
     * @throws RandomException
     */
    private function shouldFailBeforeProcessing(): bool
    {
        return random_int(0, 100) / 100.0 > $this->serviceUptime / 2;
    }

    /**
     * @return bool
     * @throws RandomException
     */
    private function shouldFailAfterProcessing(): bool
    {
        return random_int(0, 100) / 100.0 > $this->serviceUptime;
    }

    private function alreadyProcessed(Ref $replyTo): bool
    {
        return isset($this->processedMessages[(string) $replyTo]);
    }
}

Accountアクターは、以下の役割を担います。

  1. 口座残高の管理
  2. 入金(Credit)と引き落とし(Debit)の処理
  3. 残高照会(GetBalance)への応答
  4. 様々な障害シナリオのシミュレーション:
    • 残高不足
    • 永続的な拒否
    • サービス利用不可
    • 内部エラー
  5. 冪等性の確保(同じメッセージが複数回送信されても一度だけ処理)

各エラーは実際に起きそうなものをランダムで発生させるような形になってます。

3. TransferProcess

TransferProcessは送金プロセス全体を管理するステートマシンです。
デビット、クレジット、ロールバックの各ステップを調整します。

このアクターはBecome/Unbecomeという振る舞いを変更する仕組みで
送金プロセスに対応する仕組みになっています。
このあたりはAkka 実践バイブルなどを読むといいかなと思います。

<?php
// src/PhluxorSaga/TransferProcess.php(一部抜粋)
declare(strict_types=1);

namespace PhluxorSaga;

use Exception;
use Google\Protobuf\Internal\Message;
use Phluxor\ActorSystem\Behavior;
use Phluxor\ActorSystem\Context\ContextInterface;
use Phluxor\ActorSystem\Message\{ActorInterface, ReceiveFunction, Restarting, Started, Stopped, Stopping};
use Phluxor\ActorSystem\Props;
use Phluxor\ActorSystem\ProtoBuf\Terminated;
use Phluxor\ActorSystem\Ref;
use Phluxor\Persistence\Mixin;
use Phluxor\Persistence\PersistentInterface;
use PhluxorSaga\Message\Credit;
use PhluxorSaga\Message\Debit;
use PhluxorSaga\Message\GetBalance;
use PhluxorSaga\Message\Ok;
use PhluxorSaga\Message\Refused;
use PhluxorSaga\ProtoBuf\SuccessResult;

class TransferProcess implements ActorInterface, PersistentInterface
{
    use Mixin; // イベントソーシングのためのミックスイン

    private bool $processCompleted = false;
    private bool $restarting = false;
    private bool $stopping = false;

    public function __construct(
        private readonly Ref $from,
        private readonly Ref $to,
        private readonly float $amount,
        private readonly float $availability,
        private readonly Behavior $behavior = new Behavior()
    ) {
        // 初期状態を設定
        $this->behavior->become(
            new ReceiveFunction(
                fn($context) => $this->starting($context)
            )
        );
    }

    // 初期状態: 開始
    private function starting(ContextInterface $context): void
    {
        if ($context->message() instanceof Started) {
            // デビット試行アクターを作成
            $context->spawnNamed($this->tryDebit($this->from, -$this->amount), 'DebitAttempt');
            // 送金開始イベントを永続化
            $this->persistEvent(new ProtoBuf\TransferStarted());
        }
    }

    // デビット試行アクターの作成
    private function tryDebit(Ref $targetActor, float $amount): Props
    {
        return Props::fromProducer(
            fn() => new AccountProxy(
                $targetActor,
                fn($sender) => new Debit($amount, $sender)
            )
        );
    }

    // メインのメッセージ処理
    public function receive(ContextInterface $context): void
    {
        $message = $context->message();
        switch (true) {
            case $message instanceof Started:
                $this->behavior->become(
                    new ReceiveFunction(
                        fn($context) => $this->starting($context)
                    )
                );
                break;
            case $message instanceof Stopping:
                $this->stopping = true;
                break;
            case $message instanceof Restarting:
                $this->restarting = true;
                break;
            case $message instanceof Stopped && !$this->processCompleted:
                // 予期せぬ停止の処理
                $parent = $context->parent();
                $self = $context->self();
                if (!$this->recovering()) {
                    $this->persistEvent(
                        new ProtoBuf\TransferFailed([
                            'reason' => 'Process stopped unexpectedly'
                        ])
                    );
                    $this->persistEvent(
                        new ProtoBuf\EscalateTransfer([
                            'reason' => 'Unknown failure. Transfer Process crashed'
                        ])
                    );
                }
                $context->send($parent, new Message\UnknownResult($self));
                break;
            // その他のケース...
        }
        // 現在の状態に基づくメッセージ処理
        $this->behavior->receive($context);
    }

    // イベント適用による状態遷移
    private function applyEvent(Message $event): void
    {
        switch (true) {
            case $event instanceof ProtoBuf\TransferStarted:
                // 状態遷移: デビット確認待ちへ
                $this->behavior->become(
                    new ReceiveFunction(
                        fn($context) => $this->awaitingDebitConfirmation($context)
                    )
                );
                break;
            case $event instanceof ProtoBuf\AccountDebited:
                // 状態遷移: クレジット確認待ちへ
                $this->behavior->become(
                    new ReceiveFunction(
                        fn($context) => $this->awaitingCreditConfirmation($context)
                    )
                );
                break;
            case $event instanceof ProtoBuf\CreditRefused:
                // 状態遷移: デビットロールバックへ
                $this->behavior->become(
                    new ReceiveFunction(
                        fn($context) => $this->rollingBackDebit($context)
                    )
                );
                break;
            case $event instanceof ProtoBuf\AccountCredited:
            case $event instanceof ProtoBuf\DebitRolledBack:
            case $event instanceof ProtoBuf\TransferFailed:
                // プロセス完了
                $this->processCompleted = true;
                break;
        }
    }

    // 状態: デビット確認待ち
    private function awaitingDebitConfirmation(ContextInterface $context): void
    {
        $message = $context->message();
        $self = $context->self();
        switch (true) {
            // 再起動時の処理
            case $message instanceof Started:
                $context->spawnNamed(
                    $this->tryDebit($this->from, -$this->amount),
                    'DebitAttempt'
                );
                break;
            // デビット成功
            case $message instanceof Ok:
                // デビット成功イベントを永続化
                $this->persistEvent(new ProtoBuf\AccountDebited());
                // クレジット試行アクターを作成
                $context->spawnNamed(
                    $this->tryCredit($this->to, +$this->amount),
                    'CreditAttempt'
                );
                break;
            // デビット拒否
            case $message instanceof Refused:
                $parent = $context->parent();
                // 送金失敗イベントを永続化
                $this->persistEvent(new ProtoBuf\TransferFailed([
                    'reason' => 'Debit refused'
                ]));
                // 親アクターに結果を通知
                $context->send($parent, new ProtoBuf\FailedButConsistentResult([
                    'from' => $self->protobufPid()
                ]));
                $this->stopAll($context, $self);
                break;
            // アクター終了
            case $message instanceof Terminated:
                $this->persistEvent(new ProtoBuf\StatusUnknown());
                $this->stopAll($context, $self);
                break;
        }
    }

    // クレジット試行アクターの作成
    private function tryCredit(Ref $targetActor, float $amount): Props
    {
        return Props::fromProducer(
            fn() => new AccountProxy(
                $targetActor,
                fn($sender) => new Credit($amount, $sender)
            )
        );
    }

    // 状態: クレジット確認待ち
    private function awaitingCreditConfirmation(ContextInterface $context): void
    {
        $message = $context->message();
        $self = $context->self();
        switch (true) {
            // 再起動時の処理
            case $message instanceof Started:
                $context->spawnNamed(
                    $this->tryCredit($this->to, +$this->amount),
                    'CreditAttempt'
                );
                break;
            // クレジット成功
            case $message instanceof Ok:
                $parent = $context->parent();
                // 残高照会
                $fromBalance = $context->requestFuture($this->from, new GetBalance(), 2000);
                $fromBalanceResult = $fromBalance->result()->value();
                $toBalance = $context->requestFuture($this->to, new GetBalance(), 2000);
                $toBalanceResult = $toBalance->result()->value();
                // クレジット成功イベントを永続化
                $this->persistEvent(new ProtoBuf\AccountCredited());
                $completed = new ProtoBuf\TransferCompleted();
                $this->persistEvent(
                    $completed->setFromBalance((float)$fromBalanceResult)
                        ->setToBalance((float)$toBalanceResult)
                        ->setFrom($this->from->protobufPid())
                        ->setTo($this->to->protobufPid())
                );
                // 親アクターに成功を通知
                $context->send($parent, new SuccessResult([
                    'from' => $self->protobufPid()
                ]));
                $this->stopAll($context, $self);
                break;
            // クレジット拒否
            case $message instanceof Refused:
                // クレジット拒否イベントを永続化
                $this->persistEvent(new ProtoBuf\CreditRefused());
                // ロールバックアクターを作成
                $context->spawnNamed(
                    $this->tryCredit($this->from, +$this->amount),
                    'RollbackDebit'
                );
                break;
            // アクター終了
            case $message instanceof Terminated:
                $this->persistEvent(new ProtoBuf\StatusUnknown());
                $this->stopAll($context, $self);
                break;
        }
    }

    // 状態: デビットロールバック中
    private function rollingBackDebit(ContextInterface $context): void
    {
        $message = $context->message();
        $self = $context->self();
        switch (true) {
            // 再起動時の処理
            case $message instanceof Started:
                $context->spawnNamed(
                    $this->tryCredit($this->from, +$this->amount),
                    'RollbackDebit'
                );
                break;
            // ロールバック成功
            case $message instanceof Ok:
                $parent = $context->parent();
                // ロールバック成功イベントを永続化
                $this->persistEvent(new ProtoBuf\DebitRolledBack());
                $this->persistEvent(
                    new ProtoBuf\TransferFailed([
                        'reason' => sprintf('Unable to rollback debit to %s', $this->to->protobufPid()->getId())
                    ])
                );
                // 親アクターに結果を通知
                $context->send(
                    $parent,
                    new ProtoBuf\FailedAndInconsistent([
                        'from' => $self->protobufPid(),
                    ])
                );
                $this->stopAll($context, $self);
                break;
            // ロールバック失敗
            case $message instanceof Refused:
            case $message instanceof Terminated:
                $parent = $context->parent();
                $self = $context->self();
                // 送金失敗イベントを永続化
                $this->persistEvent(
                    new ProtoBuf\TransferFailed([
                        'reason' => sprintf(
                            'Unable to rollback process. %s is owed %s',
                            $this->to->protobufPid()->getId(),
                            $this->amount
                        )
                    ])
                );
                // エスカレーションイベントを永続化
                $this->persistEvent(
                    new ProtoBuf\EscalateTransfer(
                        sprintf(
                            '%s is owed %s',
                            $this->to->protobufPid()->getId(),
                            $this->amount
                        )
                    )
                );
                // 親アクターに結果を通知
                $context->send(
                    $parent,
                    new ProtoBuf\FailedAndInconsistent([
                        'from' => $self->protobufPid(),
                    ])
                );
                $this->stopAll($context, $self);
                break;
        }
    }

    // すべてのアクターを停止
    private function stopAll(ContextInterface $context, Ref $self): void
    {
        $context->stop($this->from);
        $context->stop($this->to);
        $context->stop($self);
    }

    // イベントの永続化と適用
    private function persistEvent(Message $event): void
    {
        $this->persistenceReceive($event);
        $this->applyEvent($event);
    }

    // イベントソーシングのためのリカバリーメソッド
    public function receiveRecover(mixed $message): void
    {
        if ($message instanceof Message) {
            $this->applyEvent($message);
        }
    }
}

TransferProcessアクターは、以下の役割を担います。

  1. 送金プロセス全体の状態管理
  2. 状態遷移の制御
    (starting → awaitingDebitConfirmation → awaitingCreditConfirmation → rollingBackDebit)
  3. AccountProxyアクターの作成と管理
  4. イベントの永続化(イベントソーシング)
  5. 親アクター(Runner)への結果通知

TransferProcessはイベントソーシングを使用して状態を管理していますので、
各状態変更はイベントとして永続化され障害発生時にはイベントを再生して状態が復元されます。

このあたりはアクターのライフサイクルなどと関係がありますので、
これも書籍や実際に動かしてみると良いと思います。

TransferProcessのProcess Manager(プロセスマネージャー)としての役割

TransferProcessはSagaパターンにおける「Process Manager」としての役割も担っています。
Process Managerは、複数のサービスやコンポーネント間の調整を行い全体のプロセスを管理する責務を持つもので、
EIP(enterprise integration patterns)でもおなじみのものです。

具体的には、

  1. オーケストレーション: 送金プロセスの各ステップ(デビット、クレジット、ロールバック)を順序立てて実行
  2. 状態追跡: プロセス全体の進行状況を追跡し、現在の状態に基づいて次のアクションを決定
  3. 障害処理: 障害発生時の補償アクション(ロールバック)を調整
  4. 結果通知: プロセスの結果を親アクターに通知

というものでProcess Managerとしての役割により、
TransferProcessは送金Sagaの中心的な調整役として機能し、分散トランザクションの一貫性を確保する仕組みです。
これは、マイクロサービスアーキテクチャなどの分散システムにおいて特に重要なもので、
このあたりを知っているか実践できるかで障害に強いシステム作りができるかどうかに関わってきます。

www.enterpriseintegrationpatterns.com

オーケストレーション型のSagaパターンでは、このようなProcess Managerが中央の調整役として機能し、
各サービスに対してコマンドを発行し、応答に基づいて次のステップを決定していきます。

コレオグラフィ型のSagaパターンでは、
中央の調整役は存在せず各サービスがイベントを発行し、
他のサービスがそれに反応する形で処理していくものになります。

トークのSagaの例は、TransferProcessがオーケストレーション型のProcess Managerとして実装してあり、
AccountProxyアクターを通じてAccountアクターとの通信を調整するようになっています。

このあたりは一般的なPHPの手法から大きく異なるところかもしれません。
LaravelやSymfonyなどを採用しても実現するのは相当難しいと思います。

4. AccountProxy

AccountProxyはTransferProcessとAccountの間の通信を仲介するアクターです。
タイムアウト処理や再試行ロジックを担当します。

<?php
// src/PhluxorSaga/AccountProxy.php
declare(strict_types=1);

namespace PhluxorSaga;

use Closure;
use DateInterval;
use Phluxor\ActorSystem\Context\ContextInterface;
use Phluxor\ActorSystem\Message\ActorInterface;
use Phluxor\ActorSystem\Message\ReceiveTimeout;
use Phluxor\ActorSystem\Message\Started;
use Phluxor\ActorSystem\Ref;
use PhluxorSaga\Message\InsufficientFunds;
use PhluxorSaga\Message\InternalServerError;
use PhluxorSaga\Message\Ok;
use PhluxorSaga\Message\Refused;
use PhluxorSaga\Message\ServiceUnavailable;
use RuntimeException;

readonly class AccountProxy implements ActorInterface
{
    /**
     * @param Ref $target
     * @param Closure(Ref, mixed): mixed $createMessage
     */
    public function __construct(
        private Ref $target,
        private Closure $createMessage
    ) {
    }

    public function receive(ContextInterface $context): void
    {
        $message = $context->message();
        switch (true) {
            case $message instanceof Started:
                // アクター起動時の処理
                $context->send($this->target, ($this->createMessage)($context->self()));
                $context->setReceiveTimeout(new DateInterval('PT2S'));
                break;
            case $message instanceof Refused:
            case $message instanceof Ok:
                // 成功または拒否の応答を親アクターに転送
                $context->cancelReceiveTimeout();
                $context->send($context->parent(), $message);
                break;
            case $message instanceof InsufficientFunds:
            case $message instanceof InternalServerError:
            case $message instanceof ReceiveTimeout:
            case $message instanceof ServiceUnavailable:
                // エラー時の処理
                throw new RuntimeException('Unexpected message');
        }
    }
}

AccountProxyアクターは、以下の役割を担ってます。

  1. TransferProcessとAccountの間の通信を仲介
  2. メッセージの送信とタイムアウト設定
  3. 応答の転送
  4. エラー処理

Error Kernel Patternを採用していて、
大事な処理は絶対に落とさないような形になっています。

このあたりを理解するにはReactive Design Patterns にて・・

www.reactivedesignpatterns.com

メッセージフロー

Sagaのメッセージフローを図示すると、以下のようになります:

sequenceDiagram
    participant R as Runner
    participant TP as TransferProcess
    participant DA as DebitAttempt
    participant CA as CreditAttempt
    participant RD as RollbackDebit
    participant FA as FromAccount
    participant TA as ToAccount
    
    R->>TP: 作成
    TP->>DA: 作成
    DA->>FA: Debit(-amount)
    FA-->>DA: Ok
    DA-->>TP: Ok
    TP->>CA: 作成
    CA->>TA: Credit(+amount)
    
    alt 成功シナリオ
        TA-->>CA: Ok
        CA-->>TP: Ok
        TP->>FA: GetBalance
        TP->>TA: GetBalance
        FA-->>TP: balance
        TA-->>TP: balance
        TP-->>R: SuccessResult
    else 失敗シナリオ(クレジット拒否)
        TA-->>CA: Refused
        CA-->>TP: Refused
        TP->>RD: 作成
        RD->>FA: Credit(+amount)
        FA-->>RD: Ok
        RD-->>TP: Ok
        TP-->>R: FailedButConsistentResult
    end

このフローの流れは下記のようなになります。

  1. Runnerが送金プロセスを開始
  2. TransferProcessがDebitAttemptを作成して送金元口座からデビット
  3. デビット成功後、CreditAttemptを作成して送金先口座にクレジット
  4. クレジット成功の場合、送金完了
  5. クレジット失敗の場合、RollbackDebitを作成して送金元口座にロールバック

なんかよくわからなくなってきますね。
TransferProcessはデビット・クレジットの関係をそのまま並行で独立して持ちますので、
10人が振り込んでいると
10個独立したTransferProcessと各Accountアクターのセットで稼働します。 (コルーチンで制御されています)

Supervisor戦略

このSagaではエラー処理のためにSupervisor戦略を設定しています。
Daemonなどに利用するツールではなくて、
アクターシステム内に存在するアクターに対する戦略です。
PHPで似た概念はありません。

初めて見る方は全くわからないと思います。
過去に解説していますのでそちらを参照ください。
またはAkkaなどのドキュメントにも解説があります。

blog.ytake.jp.net

blog.ytake.jp.net

TransferFactoryクラスでは、以下のようにsupervisor戦略を設定しています:

<?php
// src/PhluxorSaga/TransferFactory.php
declare(strict_types=1);

namespace PhluxorSaga;

use DateInterval;
use Phluxor\ActorSystem\Context\ContextInterface;
use Phluxor\ActorSystem\Props;
use Phluxor\ActorSystem\Ref;
use Phluxor\ActorSystem\SpawnResult;
use Phluxor\ActorSystem\Strategy\OneForOneStrategy;
use Phluxor\ActorSystem\Supervision\DefaultDecider;
use Phluxor\Persistence\ProviderInterface;

readonly class TransferFactory
{
    public function __construct(
        private ContextInterface $context,
        private float $availability,
        private int $retryAttempts,
        private ProviderInterface $provider,
    ) {
    }

    /**
     * @param string $actorName
     * @param Ref $fromAccount
     * @param Ref $toAccount
     * @param float $amount
     * @return SpawnResult
     */
    public function createTransfer(
        string $actorName,
        Ref $fromAccount,
        Ref $toAccount,
        float $amount,
    ): SpawnResult {
        $props = Props::fromProducer(
            fn() => new TransferProcess($fromAccount, $toAccount, $amount, $this->availability),
            Props::withReceiverMiddleware(
                new EventSourcedFactory($this->provider)
            ),
            Props::withSupervisor(
                new OneForOneStrategy(
                    $this->retryAttempts,
                    new \DateInterval('PT10S'),
                    new DefaultDecider(),
                )
            )
        );
        return $this->context->spawnNamed($props, $actorName);
    }
}

このSupervisor戦略は下記の意味を持ちます。

  1. OneForOneStrategy: 失敗した子アクターのみを再起動
  2. retryAttempts: 最大再試行回数
  3. DateInterval('PT10S'): 10秒間の時間枠内での再試行
  4. DefaultDecider: 例外発生時のデフォルト対応(再起動)

これを指定しておくと、アクターが予期せぬクラッシュなどが起きた場合に
どのように復旧させる自分で設計できるわけです。
そしてクラッシュすると、アクターが前の状態に自動で復元されたりします。
外部の監視ツールなどを使うわけでなく、アクターシステムが面倒をみるわけです(Phluxorもそうです)。

これでアクターモデルとSagaパターンを組み合わせることで、
堅牢で柔軟な分散トランザクション管理システムが構築できるようになります。

主要なコンポーネントとして下記のとおりです。

  1. Runner: 送金プロセスのオーケストレーター
  2. Account: 口座を表すアクター
  3. TransferProcess: 送金Sagaの状態機械
  4. AccountProxy: アカウントとの通信を仲介するアクター

解説は長いですが、仕組みの割には実装コードはかなり少ないです。

さぁ、もうちょっとで終わりですw

TransferProcessの状態遷移

TransferProcessは、送金プロセス全体を管理するステートマシンという説明をしました。

これは以下の状態を持ちます。

  1. Starting: 送金プロセスの開始状態
  2. AwaitingDebitConfirmation: 送金元口座からのデビット確認待ち
  3. AwaitingCreditConfirmation: 送金先口座へのクレジット確認待ち
  4. RollingBackDebit: デビットのロールバック中

これらの状態間の遷移を図示すると、以下のようになります。
実際にトークにもあった例ですね。

stateDiagram-v2
    [*] --> Starting
    Starting --> AwaitingDebitConfirmation: TransferStarted
    
    AwaitingDebitConfirmation --> AwaitingCreditConfirmation: AccountDebited
    AwaitingDebitConfirmation --> [*]: TransferFailed (Refused)
    
    AwaitingCreditConfirmation --> [*]: AccountCredited
    AwaitingCreditConfirmation --> RollingBackDebit: CreditRefused
    
    RollingBackDebit --> [*]: DebitRolledBack
    RollingBackDebit --> [*]: TransferFailed (Rollback Failed)

各状態では特定のメッセージに対して異なる処理を行います。
例えば、AwaitingDebitConfirmation状態では、
Okメッセージを受け取るとAccountDebitedイベントを永続化し、
AwaitingCreditConfirmation状態に遷移する

そんな挙動になります。

状態遷移の実装

TransferProcessではBehaviorを使用して状態遷移を実装しています。
各状態は、ReceiveFunctionとして定義されたメッセージハンドラに対応します。

// 初期状態の設定
$this->behavior->become(
    new ReceiveFunction(
        fn($context) => $this->starting($context)
    )
);

// イベント適用による状態遷移
private function applyEvent(Message $event): void
{
    switch (true) {
        case $event instanceof ProtoBuf\TransferStarted:
            // 状態遷移: デビット確認待ちへ
            $this->behavior->become(
                new ReceiveFunction(
                    fn($context) => $this->awaitingDebitConfirmation($context)
                )
            );
            break;
        case $event instanceof ProtoBuf\AccountDebited:
            // 状態遷移: クレジット確認待ちへ
            $this->behavior->become(
                new ReceiveFunction(
                    fn($context) => $this->awaitingCreditConfirmation($context)
                )
            );
            break;
        case $event instanceof ProtoBuf\CreditRefused:
            // 状態遷移: デビットロールバックへ
            $this->behavior->become(
                new ReceiveFunction(
                    fn($context) => $this->rollingBackDebit($context)
                )
            );
            break;
        // その他の状態...
    }
}

イベントソーシングによる状態管理

TransferProcessはイベントソーシングとして状態を管理しています。
コード例ではインメモリで保管しているだけなので、
プロセス自体を落とすと消えますが実際のデータベースなどに変更することもできます。

アクターの永続化については色々なドキュメントや
各言語のアクターシステムのリファレンスを読まれると良いと思います。

永続化されているとアクターが再起動すると、永続化されたイベントを頭から再生して状態が復元されます。

障害シナリオと回復メカニズム

このSagaでは以下のような障害シナリオを考慮しています。

1. デビット拒否

送金元口座からのデビットが拒否された場合:

case $message instanceof Refused:
    $parent = $context->parent();
    // 送金失敗イベントを永続化
    $this->persistEvent(new ProtoBuf\TransferFailed([
        'reason' => 'Debit refused'
    ]));
    // 親アクターに結果を通知
    $context->send($parent, new ProtoBuf\FailedButConsistentResult([
        'from' => $self->protobufPid()
    ]));
    $this->stopAll($context, $self);
    break;

この場合、送金プロセスは失敗しますがシステムは一貫性を保っています
(送金元口座からデビットされていないため)。

2. クレジット拒否

送金先口座へのクレジットが拒否された場合

case $message instanceof Refused:
    // クレジット拒否イベントを永続化
    $this->persistEvent(new ProtoBuf\CreditRefused());
    // ロールバックアクターを作成
    $context->spawnNamed(
        $this->tryCredit($this->from, +$this->amount),
        'RollbackDebit'
    );
    break;

この場合、送金元口座へのデビットをロールバックするために、
RollbackDebitアクターを作成します。

3. ロールバック失敗

送金元口座へのデビットロールバックが失敗した場合

case $message instanceof Refused:
case $message instanceof Terminated:
    $parent = $context->parent();
    $self = $context->self();
    // 送金失敗イベントを永続化
    $this->persistEvent(
        new ProtoBuf\TransferFailed([
            'reason' => sprintf(
                'Unable to rollback process. %s is owed %s',
                $this->to->protobufPid()->getId(),
                $this->amount
            )
        ])
    );
    // エスカレーションイベントを永続化
    $this->persistEvent(
        new ProtoBuf\EscalateTransfer(
            sprintf(
                '%s is owed %s',
                $this->to->protobufPid()->getId(),
                $this->amount
            )
        )
    );
    // 親アクターに結果を通知
    $context->send(
        $parent,
        new ProtoBuf\FailedAndInconsistent([
            'from' => $self->protobufPid(),
        ])
    );
    $this->stopAll($context, $self);
    break;

この場合、システムは不整合状態になり手動介入が必要です。
EscalateTransferイベントを永続化し親アクターにFailedAndInconsistentメッセージを送信します。
オペレーターの方などに連絡をするようなパターンになります。

4. 予期せぬ停止

TransferProcessが予期せず停止した場合

case $message instanceof Stopped && !$this->processCompleted:
    // 予期せぬ停止の処理
    $parent = $context->parent();
    $self = $context->self();
    if (!$this->recovering()) {
        $this->persistEvent(
            new ProtoBuf\TransferFailed([
                'reason' => 'Process stopped unexpectedly'
            ])
        );
        $this->persistEvent(
            new ProtoBuf\EscalateTransfer([
                'reason' => 'Unknown failure. Transfer Process crashed'
            ])
        );
    }
    $context->send($parent, new Message\UnknownResult($self));
    break;

この場合、TransferFailedイベントとEscalateTransferイベントを永続化し、
親アクターにUnknownResultメッセージが送信されます。

冪等性の確保

Sagaではメッセージの冪等性(何度実行しても同じ結果になる性質)を確保するために、
以下のような仕組みを使用しています:

private function handleBalanceChange(
    ContextInterface $context,
    Message\ChangeBalance $message
): void {
    // すでに処理済みのメッセージかチェック(冪等性の確保)
    if ($this->alreadyProcessed($message->replyTo)) {
        $context->send($message->replyTo, $this->processedMessages[(string) $message->replyTo]);
        return;
    }
    
    // 処理ロジック...
    
    // 結果を記録
    $this->processedMessages[(string) $message->replyTo] = new Message\Ok();
}

Accountアクターは、各メッセージの処理結果をprocessedMessages配列に記録し、
同じメッセージが再送された場合は、記録された結果を返します。
これにより同じメッセージが複数回送信されても一度だけ処理されることが保証されます。

安心して復元できます!

結果の分類

さいごにいろんな処理結果を以下のように分類してconsoleに出力するようになっています。

  1. SuccessResult: 送金成功
  2. FailedButConsistentResult: 一貫性を保った送金失敗(例:デビット拒否)
  3. FailedAndInconsistent: 不整合状態での送金失敗(例:ロールバック失敗)
  4. UnknownResult: 結果不明(例:予期せぬ停止)

実行例

送金Sagaを実行すると、以下のような出力が得られます:

Started 1/10 processes
Started 2/10 processes
Started 3/10 processes
Started 4/10 processes
Started 5/10 processes
Started 6/10 processes
Started 7/10 processes
Started 8/10 processes
Started 9/10 processes
Started 10/10 processes
..........
0 processes remaining

RESULTS:
80.00% (8/10) successful transfers
10.00% (1/10) failures leaving a consistent system
10.00% (1/10) failures leaving an inconsistent system
0.00% (0/10) unknown results

この例では、10回の送金プロセスのうち8回が成功し、
1回が一貫性を保った失敗、1回が不整合状態での失敗となっています。

もちろん規定回数以内の失敗はSupervisor戦略通り、
アクターの自動復旧が行われます(logにrecoverができます)。

こんな感じの全体像で、Sagaパターンが実装されています。
Proto ActorやPekko/Akkaを使った話は今後何処かで話す予定はありますので、
商用に近い話はその時にもぜひご参加ください。

実際にサンプルコードを使ってたくさん処理すると、
リソースの管理等にまだ問題があるままなのでそのまま停止したりしますので
ご注意ください

トークでのQA

Ask The Speakerでいただいた質問は省きます。

アクターモデルの話をしているとシステムになるのはなぜ?

アクターモデル自体は、「状態・振る舞い・メッセージ駆動」を基本とした抽象的な並行処理モデルであり、
モデル自体の理解は比較的シンプルです。
多分一般的なオブジェクト指向的なものよりもわかりやすいです(アクターモデルもOOPの一種です)。

ですがこのアクターモデルを実際のシステムとして、
スケーラブルかつフォールトトレラントに構築するには、 Akka等のアクターシステムツールキットやランタイムの活用が必要となります。

このためシステムの話がセットになる、という感じです。

アクター 一つ一つをAPIサーバで表現するのは?

アクターモデルの設計単位で変わりますが、
アクターはいろんなコンテキストで粒度が変わっていきます。
例えばxx Entityなどもアクターで表現されますし、
ドメイン駆動設計などのRepositoryもアクターで表現します(集約に対するイベントが簡単に表現できる)。

実際にそういう単位でもアクターが生成されますので、
これを1つのサーバと表現すると、一つのアプリケーションで何百台になるかもしれません。
またトークの内容通り、分散トランザクション・2相コミットなどの問題には結局対応できず、
どれかのサーバがダウンしていると処理がスタックすることにもなりますので、
より複雑な問題が発生してしまいます。

このあたりはアクターモデル自体を知っているかどうかというのもありますので、
良い観点の質問だと思います。

アクターが並行で動くということは、Split-brainなどには対応できるの?

これは今回のトークもすっ飛ばしたところですが(これだけで1講演できるくらい・・)、
実際にローカルだけのアクターシステムで構築する場合、
アクターシステム内部では同じアクターは一つだけになるように実はコントロールされています。
といっても何もしなければランダムでアクターの識別子が付与されて、
それがアクターのアドレス(位置情報になる)になります。

実際にアクター生成時に任意の名前をつけることもでき、
同じ名前は使用できないようになっています。
例えば "hoge_user:1"とか "hoge_users"とかなんでもいいんですが、
アクターでどう表現するか次第ですがこうした識別子を使うことで常に一つという形になります。
(これはどの言語のアクターシステムでも全く同じです)

これがアクターシステム同士が連携してクラスタになる場合も
該当のアクターは一つだけになる仕組みがあります。

アクターがダウンした際の引き継ぎ機能なども実はあります。

doc.akka.io

他にもCluster Singletonや、
Proto ActorやMicrosoft Orleansの場合はVirtual Actorを活用することで
Split-brainに対応することができます!

このあたりはトークから遠くなってしまうため、今回の内容には含めませんでした。

おわりに

というアクター仕組みを用いてSagaパターンを実践する内容でした。
これは他言語でアクターを利用する場合もある程度流用できると思います。

PHPerの方は馴染のない概念なのもありますのですごく難しく感じられるかもしれませんが、
うまく活用したりPHPに逆輸入することで
これまでのPHPとは大きく異なる新しいなにかが生まれるきっかけにもなるかもしれません。

長い文章でここまで読んだ方はいないかもしれませんが、
一部CQRS+ES実践のヒントも隠されていますので是非読み解いてチャレンジしてみてください。

繰り返しのような話になりますが、
後日動画やこのブログを読んだだけではおそらくほとんどのPHPの方は理解できないと思います。

ですが面白そうだなとかやってみたいな!とかありましたら是非質問してみてください!
DMなど送っていただいても大丈夫です。