PHPだけで分散処理ってできるの?
多分皆さんがそう思うはずです。
これまでPHPではリソースの操作などで並行処理を利用することは
エクステンションなどの介して実現はできていましたが、
物理的分かれているPHPに処理を依頼する、結果を戻すなどは
Kafkaなどに代表されるようなミドルウェアを介することが一般的でした。
そんなPHPですが、
PHPの進化を感じさせるような、完全にPHPだけで物理的な別サーバと連携しながら処理をする、
いわゆる分散システムの簡単な例を紹介します。
え!そんなことPHPでできんの?!
Phluxor Remote
ということで100%PHPのアクターモデルツールキット、PhluxorにRemoteという機能を仮で実装しました。
仮なのはまだまだ実用するにはちょっと難しい面もあるため仮、いわゆるExperimentalなものとしています。
サンプルコードは下記にあります
解説はあとにするとして、簡単に実装して動かしましょう
メッセージ
物理的にネットワークをまたぐシステムとやり取りを行う場合は
Protocol Buffersを利用することを必須としています。
下記の様な簡単なメッセージを用意し、protoc経由でコード生成します。
syntax = "proto3"; package protobuf; option php_namespace = "PhluxorExample\\ProtoBuf"; option php_metadata_namespace = "PhluxorExample\\Metadata"; message HelloRequest { string name = 1; } message HelloResponse { string message = 1; }
普通のprotoファイルなので、特別なものは何もありません。
あとは生成したクラスを利用して簡単な実装をするだけです。
HelloResponseActor
HelloRequest
を受け取ったら HelloResponse
を返すだけの簡単なアクターを実装します。
<?php declare(strict_types=1); namespace PhluxorExample; use Phluxor\ActorSystem\Context\ContextInterface; use Phluxor\ActorSystem\Message\ActorInterface; use PhluxorExample\ProtoBuf\HelloRequest; use PhluxorExample\ProtoBuf\HelloResponse; class HelloResponseActor implements ActorInterface { public function receive(ContextInterface $context): void { $message = $context->message(); if ($message instanceof HelloRequest) { $context->respond(new HelloResponse([ 'message' => sprintf( 'Hello %s from remote node. this is %s, %d', $message->getName(), $context->self(), time() ), ])); } } }
Proto Actorやakka、Apache Pekkoと
同じ概念になっていますので実装方法は基本的に同じです(独自のものではないです)。
Node 1
アクターシステムを起動して、アクターシステム同士でやり取りを行うように実装します。
といってもコード数はかなり少ないです。
<?php declare(strict_types=1); use Phluxor\ActorSystem; use Phluxor\Remote\Config; use Phluxor\Remote\Remote; use PhluxorExample\HelloResponseActor; use function Swoole\Coroutine\run; require_once __DIR__ . '/vendor/autoload.php'; run(function () { \Swoole\Coroutine\go(function () { $system = ActorSystem::create(); $remote = new Remote( $system, new Config( 'node1', 50052, Config::withUseWebSocket(true) ) ); $remote->start(); $props = ActorSystem\Props::fromProducer( fn() => new HelloResponseActor() ); $system->root()->spawnNamed($props, 'hello'); });
phpコマンドで直接起動する形になっていますが、なんとこれだけです。
下記の部分は、このノードで動かすアクターシステムの情報を与えています。
$remote = new Remote(
$system,
new Config(
'node1',
50052,
Config::withUseWebSocket(true)
)
);
$remote->start();
Phluxor\Remote\Config
の第1引数は自分自身のhost名
第2引数は利用するport
第3引数はWebScoketを指定していますが、gRPCに後で対応するのでどっちにするかのオプション指定です。
その後 remoteサーバを起動、という流れになっています。
これを実行するとcoroutineでWebSocketサーバが起動し接続待機状態になります。
次の部分は普通のアクター利用と同じで、
HelloRequestに対応するアクター生成を行っています。
このとき生成されるアクターは spawnNamed
を使って hello
という名前を与えています。
$props = ActorSystem\Props::fromProducer( fn() => new HelloResponseActor() ); $system->root()->spawnNamed($props, 'hello');
Node 1の実装はなんとこれだけです。
Node 2
続いてもう一つのNodeを実装します。
この例ではNode2からNode1へ仕事を依頼する形になります。
<?php require_once __DIR__ . '/vendor/autoload.php'; use Phluxor\ActorSystem; use Phluxor\Remote\Config; use Phluxor\Remote\Remote; use PhluxorExample\ProtoBuf\HelloRequest; use function Swoole\Coroutine\run; run(function () { \Swoole\Coroutine\go(function () { $system = ActorSystem::create(); $remote = new Remote( $system, new Config( 'node2', 50053, Config::withUseWebSocket(true) ) ); $remote->start(); $future = $system->root()->requestFuture( new ActorSystem\Ref(new ActorSystem\ProtoBuf\Pid([ 'address' => 'node1:50052', 'id' => 'hello', ])), new HelloRequest(['name' => 'world']), 1 ); $r = $future->result(); var_dump($r->value()->getMessage()); // Hello from remote node! $remote->shutdown(); // 処理が終わったらWebSocketサーバ停止 }); });
基本の部分はNode1と変わりません。
自身のhost名とportを指定しWebSocketサーバ起動を指示します。
次の部分も通常のアクターへのメッセージ送信と大きく変わりません。
唯一ちょっと違うのはアクターへのアドレスを担当するActorSystem\Ref
で指定している
'address' => 'node1:50052'
だけです。
これが送信先のアクターシステム指示となります。
$future = $system->root()->requestFuture( new ActorSystem\Ref(new ActorSystem\ProtoBuf\Pid([ 'address' => 'node1:50052', 'id' => 'hello', ])), new HelloRequest(['name' => 'world']), 1 ); $r = $future->result(); var_dump($r->value()->getMessage()); // Hello from remote node!
簡単にメッセージが受け取れるように Future を使って
最大1秒待機するように指定していますが、
受信用のアクターを用意してそちらで受け取っても構いません。
面倒なのでFuture(これもアクターですが)が受け取ってそのまま出力するだけの処理となっていますので、
特別なものではありません。
なんとこれだけでネットワークを挟んで他のアクターシステムに仕事を依頼することができてしまいます。
これを実行するとNode1には
[2024-09-12T10:20:05.381542+00:00] Phluxor.INFO: actor system started {"id":"672LaPa7iUjt5nuxxtHUeb"} [] [2024-09-12T10:20:05.391609+00:00] Phluxor.INFO: Started Activator [] [] [2024-09-12T10:20:05.395260+00:00] Phluxor.INFO: Started EndpointManager [] [] [2024-09-12T10:20:05.397406+00:00] Phluxor.INFO: Starting Phluxor remote server {"address":"node1:50052"} []
Node2ではNode1からの返信(処理結果)を受け取るようになります
[2024-09-12T10:22:00.629499+00:00] Phluxor.INFO: actor system started {"id":"sjxkMGdzsnBECgm5974HmK"} [] [2024-09-12T10:22:00.640914+00:00] Phluxor.INFO: Started Activator [] [] [2024-09-12T10:22:00.645342+00:00] Phluxor.INFO: Started EndpointManager [] [] [2024-09-12T10:22:00.647943+00:00] Phluxor.INFO: Starting Phluxor remote server {"address":"node2:50053"} [] [2024-09-12T10:22:00.653363+00:00] Phluxor.INFO: Started WebSocket.EndpointWriter. connecting {"address":"node1:50052"} [] [2024-09-12T10:22:00.655078+00:00] Phluxor.INFO: Started EndpointWatcher {"address":"node1:50052"} [] [2024-09-12T10:22:00.685907+00:00] Phluxor.INFO: WebSocket.EndpointWriter connected {"address":"node1:50052"} [] /var/app/node2.php:33: string(55) "Hello world from remote node. this is hello, 1726136520" [2024-09-12T10:22:02.230792+00:00] Phluxor.INFO: Killed Phluxor server [] [] [2024-09-12T10:22:03.224089+00:00] Phluxor.INFO: WebSocket.EndpointWriter closing connection {"address":"node1:50052"} [] [2024-09-12T10:22:03.228512+00:00] Phluxor.INFO: deadletter {"message":{"Phluxor\\ActorSystem\\ProtoBuf\\Terminated":[]},"sender":"","pid":"EndpointSupervisor"} []
何回送っても分散して処理ができる!
今までPHPではできなかった+アクターシステムが処理自体をコントロールする、
いわゆる最小のマイクロサービス的な形で動作してしまう、ということです。
もちろんSupervisionによるアクターの監視なども動作しますので、
アクターがクラッシュすると自動復旧なども動作します。
分散処理は複雑な挙動を作り込まなければなりませんが、
アクターシステムの特徴をフル活用することで、並行なのか分散処理なのか
どこで動いているのかはほぼ意識せずに体験できるようになりました。
Experimentalなので実現とは言えませんがw
PHPで開発しているときはあまり意識することはありませんが、
アクターシステムの特徴でもある位置透過性がきちんと実現されています。
Node2内で下記のような、ローカルで処理するアクターが追加であっても
もちろんそれはローカルだけでアクターが動作します。
run(function () { \Swoole\Coroutine\go(function () { $system = ActorSystem::create(); $remote = new Remote( $system, new Config( 'node2', 50053, Config::withUseWebSocket(true) ) ); $remote->start(); $future = $system->root()->requestFuture( new ActorSystem\Ref(new ActorSystem\ProtoBuf\Pid([ 'address' => 'node1:50052', 'id' => 'hello', ])), new HelloRequest(['name' => 'world']), 1 ); $r = $future->result(); var_dump($r->value()->getMessage()); // Hello from remote node! $ref = $system->root()->spawn( ActorSystem\Props::fromProducer(fn() => new \PhluxorExample\LocalActor()) ); $future = $system->root()->requestFuture( $ref, new \PhluxorExample\Message\LocalHello('node2 world'), 1 ); var_dump((string) $future->result()->value()); // local hello $remote->shutdown(); }); });
ActorSystem\Ref
を直接記述しているのはありますが、
$ref = $system->root()->spawn( ActorSystem\Props::fromProducer(fn() => new \PhluxorExample\LocalActor()) );
アクター生成をするとActorSystem\Ref
が返却されますので直接アクターへのアドレス(正しくはMailbox)を指定しているか、
自動で生成されるものを使うか、だけの違いしかありません。
アクターシステム内部としても自動生成か直生成かは特に区別もしていません。
あくまでただのアドレス情報となっており、これがローカルのものなのか別サーバにあるものなのかは全く登場せずに、
すべてアクターシステム内、もしくはアクターシステム間で解決するようになっています。
これまでのPHPには全く無かったアクターモデルを活用した、PHPの進化!という体験ができると思います。
どうなってるの?
これは Proto.Remoteの仕組みや構成を踏襲した形になっており、
PHPでできる範囲で表現しています。
アクターシステムの内部ではEndpoint ReaderとEndpoint Writerの2つがあり、
それぞれが別アクターシステムとやり取りを行う形になっています。
PHPの場合は、gRPCサーバとしてStreamがうまく扱えないためWebSocketで表現しています
最近SwooleがいろんなStream対応できるように機能追加してくれました
PHPの場合はGoなどのようにそこまで柔軟にCoroutineが扱えないなども実際あり
若干微妙な挙動などもありますが仕組み理解くらいはできるようになっています。
このあたりを活用することで、HTTPリクエストレスポンスを返すだけのサーバと
実処理を担当するサーバに分けたり、
AWSのLAmbdaなどを組み合わせて一気に処理したり、
マイクロサービスアーキテクチャに代表されるようなサーガ実装をPHPだけで実現したり、
これまでPHPでは実現が難しかった分野へのチャレンジのきっかけにもなるかもしれません。
ぜひいろいろ試したりしてみてください(ぜひPRも送ってください)
終わり