ちょっと時間が空いてしまいましたが・・
これまではローカルのみでアクターを動かす簡単なアプリケーションと、
基本的な機能や考え方について解説しました。
今回はProto Actorを使って、ネットワーク上でアクターシステム同士がコミュニケーションを行い、
分散して処理行うRemoteについて簡単に解説していきます。
小さなアプリケーションを分散してみよう
下記の小さなアプリケーションを例にRemoteに一部を置き換えてみましょう。
これは学校と先生と生徒をアクターで実装する内容となっていました。
今回はこの生徒を他のアクターシステムに移動して、
複数のアクターシステムで稼働するように変更し、どのように動作をするのか理解していきます。
このあたりの仕組みはAkka / Pekkoよりも多少簡単なしくみにはなっていますが、
理解するには十分な内容になっていると思います。
おさらいですが、例としてはこのような形でした。
flowchart TD subgraph ActorSystem 教室アクター --> |*command.PrepareTest|先生アクター 先生アクター --> |*command.StartTest|生徒アクター 生徒アクター --> |*command.SubmitTest|先生アクター end
今回は次の形に変更します。
flowchart TD subgraph RemoteActorSystem["リモートアクターシステム"] 生徒アクター["生徒アクター"] end subgraph s1[Endpoint] gRPC["gRPC"] end subgraph LocalActorSystem["ローカルアクターシステム"] 先生アクター["先生アクター"] 教室アクター["教室アクター"] end 教室アクター -- *command.PrepareTest --> 先生アクター 先生アクター -- *command.StartTest --> gRPC gRPC -- *command.StartTest --> 生徒アクター 生徒アクター -- *command.SubmitTest --> gRPC gRPC -- *command.SubmitTest --> 先生アクター
Proto Actorでは処理を分散させる場合、
gRPCを利用するためProtocol Buffersを利用したメッセージ変更する必要があるため、
先生アクターと生徒アクターが利用していたメッセージを
protoファイルに記述してprotocコマンドでコンパイルします。
この2つの間で利用されていたのはcommand.StartTest
と command.SubmitTest
でした。
下記のように記述してみましょう。
syntax = "proto3"; package message; option go_package = "github.com/your/path/to"; message StartTest { string subject = 1; } message SubmitTest { string subject = 1; string name = 2; }
go_packageなどはお使いの環境に合わせて変更してください。
生徒アクターはRemoteのアクターシステムへ
難しそうに感じるかもしれませんが簡単です。
以前のコードを別なアクターシステムで実行するように、
main関数を違うファイルに記述しましょう。
例えばremote.go
のようなファイルにしておきましょう。
package main import ( console "github.com/asynkron/goconsole" "github.com/asynkron/protoactor-go/actor" "github.com/asynkron/protoactor-go/remote" "github.com/acme/sample/student" ) func main() { system := actor.NewActorSystem() remoteConfig := remote.Configure("127.0.0.1", 50052, remote.WithKinds(&remote.Kind{ Kind: "student", Props: actor.PropsFromProducer(student.NewActor), })) rt := remote.NewRemote(system, remoteConfig) rt.Start() _, _ = console.ReadLine() }
これがremoteで起動する生徒アクターが存在するアクターシステムとなります。
remoteを利用する場合はローカルだけで実行する場合と異なり、
アクターシステムの設定を少しだけ記述する必要があります。
remote.Configure
を使ってこのアクターシステムで起動するgRPCサーバのホストとポートを指定します。
次のremote.WithKinds
はこのアクターシステムで生成されるアクターの種類を登録するものです。
ここでは一つだけですが、remote.WithKinds
は複数のアクターを登録することができます。
リモートを使ったアクター間の設定方法はこれ以外にもいくつか用意されていますが、
ここではアクターを生成せずに、「どのようなアクターがリモートに存在するか」を記述しています。
あとはgRPCサーバとして起動しているだけです(ここでは127.0.0.1:50052)。
rt := remote.NewRemote(system, remoteConfig) rt.Start()
なぜこのような指定になるのかというと、
図のように内部でEndpoint ReaderとEndpoint Writerが起動し、
アクターシステム間で相互通信を行う仕組みになっています。
つまりこのホストとポートの指定は背後で動くgRPCに関する設定ということです。
おっと忘れていました。
アクターモデルの基本は「全てのものはアクターである」という哲学がありましたね。
ということでEndpoint ReaderとEndpoint Writerは、アクターとして実装されています。
興味がある方はぜひコードを読んでみましょう。
さてさて、次にリモートで稼働する生徒アクターの実装を少しだけ変更します。
package student import ( "fmt" "math/rand" "time" "github.com/asynkron/protoactor-go/actor" "github.com/acme/sample/message" ) type Actor struct{} func NewActor() actor.Actor { return &Actor{} } // Receive is sent messages to be processed from the mailbox associated with the instance of the actor func (a *Actor) Receive(context actor.Context) { switch msg := context.Message().(type) { case *message.StartTest: // ランダムで解答時間を1-9秒で設定する randTime := rand.Intn(9) + 1 time.Sleep(time.Duration(randTime) * time.Second) // 生徒がテストの問題を解く context.Logger().Info( fmt.Sprintf("%s が %s テストの解答を提出します", context.Self().GetId(), msg.Subject)) context.Send(context.Sender(), &message.SubmitTest{Subject: msg.Subject, Name: context.Self().GetId()}) context.Poison(context.Self()) } }
変更点が多少間違い探しのようなものですが・・・:)
前述のようにメッセージとして利用する構造体をprotoファイルから生成したものに変更しています。
message.StartTest
とmessage.SubmitTest
ですね。
次に context.Send(context.Sender(), &message.SubmitTest{Subject: msg.Subject, Name: context.Self().GetId()})
の部分です。
変更前のコードは、送信先のPid(アクターリファレンス)をcontext.Parent()
としていましたが、
ローカルとリモートで分割されたアクターはcontext.Parent()
で参照ができないため、
ローカルから返信先のアクターリファレンスを指定し、それをcontext.Sender()
で取り出す形に変更しています。
多少指定方法が変わりましたが、そのくらいの変更です。
ローカルのアクターシステムを変更しよう
まずはmain関数からです。
package main import ( "fmt" "github.com/asynkron/protoactor-go/actor" "github.com/asynkron/protoactor-go/remote" "github.com/asynkron/protoactor-go/stream" "github.com/acme/sample/classroom" "github.com/acme/sample/command" "github.com/acme/sample/event" ) func main() { system := actor.NewActorSystem() remoteConfig := remote.Configure("127.0.0.1", 50053) rt := remote.NewRemote(system, remoteConfig) rt.Start() p := stream.NewTypedStream[*event.ClassFinished](system) cr, err := system.Root.SpawnNamed( actor.PropsFromProducer( classroom.NewActor(p.PID(), students(), rt)), "math-classroom") if err != nil { return } go func() { system.Root.Send(cr, &command.StartsClass{Subject: "算数"}) }() r := <-p.C() fmt.Printf("%s テストが終了しました\n", r.Subject) } func students() []int { sts := make([]int, 20) for i := 0; i < 20; i++ { sts[i] = i } return sts }
ローカルのアクターシステムもリモートと同様に、
どのホストでポートは何番でgRPCサーバを起動しますよ、と指定します(ここでは127.0.0.1:50053)。
これでEndpoint ReaderとEndpoint Writerを担当するアクターが起動します。
この例ではローカルとリモートの2つとして紹介していきますが、
サーバがいくつあっても構いません。
次に大きな変更点ではありませんが、
教室アクターから先生アクターを生成するようになっていましたので、
protoactor-go/remote
の *remote.Remote
を追加します。
これはリモートに登録したアクターを生成するために利用するもので、
先生アクターからリモートに登録した生徒アクターを複数生成する、
という挙動へ変更するために使用します。
package classroom import ( "github.com/asynkron/protoactor-go/actor" "github.com/asynkron/protoactor-go/remote" "github.com/acme/sample/command" "github.com/acme/sample/event" "github.com/acme/sample/teacher" ) type Actor struct { stream *actor.PID students []int remote *remote.Remote } func NewActor(stream *actor.PID, students []int, remote *remote.Remote) func() actor.Actor { return func() actor.Actor { return &Actor{ stream: stream, students: students, remote: remote, } } } // 省略
これ以外の変更はありません。
次に先生アクターを少しだけ変更しましょう。
元は先生アクターから生徒アクターを生成していましたので、
同じ流れのまま少しだけ変えていきます。
package teacher import ( "fmt" "time" "github.com/asynkron/protoactor-go/actor" "github.com/asynkron/protoactor-go/remote" "github.com/acme/sample/command" "github.com/acme/sample/message" ) type Actor struct { students []int endOfTests []message.SubmitTest replyTo *actor.PID remote *remote.Remote } func NewActor(students []int, replyTo *actor.PID, remote *remote.Remote) actor.Actor { return &Actor{ students: students, replyTo: replyTo, endOfTests: []message.SubmitTest{}, remote: remote, } } // Receive is sent messages to be processed from the mailbox associated with the instance of the actor func (a *Actor) Receive(context actor.Context) { switch msg := context.Message().(type) { case *actor.Restarting: // リスタート時に実行される context.Send(context.Self(), &command.PrepareTest{Subject: "math"}) case *command.PrepareTest: context.Logger().Info("先生が", msg.Subject, "テストを出しました") for _, st := range a.students { rp, err := a.remote.SpawnNamed( "127.0.0.1:50052", fmt.Sprintf("student-%d", st), "student", 5*time.Second) if err != nil { context.Logger().Info("生徒が見つかりません") } context.RequestWithCustomSender(rp.Pid, &message.StartTest{Subject: msg.Subject}, context.Self()) } // 生徒がテストを提出する case *message.SubmitTest: context.Logger().Info( fmt.Sprintf("先生が %s の %s テストの解答を受け取りました", msg.Name, msg.Subject)) a.endOfTests = append(a.endOfTests, *msg) // 全員提出したら先生がテストの解答を受け取る if len(a.endOfTests) == len(a.students) { context.Send(a.replyTo, &command.FinishTest{Subject: msg.Subject}) } } }
生徒アクターとやり取りを行う部分は、message.StartTest
とmessage.SubmitTest
に変更されました。
他にここで注目する点は下記の部分です。
for _, st := range a.students { rp, err := a.remote.SpawnNamed( "127.0.0.1:50052", fmt.Sprintf("student-%d", st), "student", 5*time.Second) if err != nil { context.Logger().Info("生徒が見つかりません") } context.RequestWithCustomSender(rp.Pid, &message.StartTest{Subject: msg.Subject}, context.Self()) }
remote.SpawnNamed
でリモートのアクターシステムに、アクター生成を指示しています。
ここで指定しているのは
127.0.0.1:50052
で起動しているgRPCサーバ、つまりリモートのアクターシステムfmt.Sprintf("student-%d", st)
として複数の生徒アクターとして名前をつける- リモートに登録されている student を使って生成する
- 5秒以内に生成されるものとしてFutureを利用
ということになります。
Futureで指定した時間以上かかる場合はerr
で通知されることになります。
他にもまだあります。
以前のコードはcontext.Send
を利用していましたが、
今回はcontext.RequestWithCustomSender
に変更されました。
これはリモートで生成される生徒アクターが子アクターではなく、
リモートのアクターというコンテキストになるため、
RequestWithCustomSender
を使って返信先であるSenderを明示する、というものです。
これでリモートからローカルにメッセージが返ってくるようになります。
動かしてみよう
先生アクター、生徒アクター間で分散システムとなりますので、
ここでは先に生徒アクターを稼働させたいリモートのアクターシステムを起動しておきましょう。
go run
やコンパイルしたバイナリでも構いませんので起動すると下記のように出力されます。
ローカルだけで起動していたアクターシステムとちょっとだけ出力されるメッセージが異なりますが、
前述した図の構成で起動しています。
次にローカルのアクターシステムを起動して先生アクターから生徒へ算数テストの開始を指示します。
コンソールを複数使って起動してください。コンテナでもおk
どうやら生徒たちが算数テストに向き合い始めたようです。
このときリモートのアクターシステムのコンソールには次のように出力されるはずです。
activator/Remote$student-1
のようなログは、これまでのヒエラルキーの表現と少しだけ変わっていますが
リモートを使ったアクターシステム間の表現となっています。
SpawnNamedで指定した名前で識別されているのが確認できます(並行・非同期であることは変わりません)。
Supervisionの解説でもあったようにローカル・リモート間で監視も可能ですので、
試しに動かす場合には色々試してみると良いでしょう。
大きな変更なくアクターシステム間で分散が簡単に実現できるのが理解できたと思います。
次回はアクターモデルを活用できるヒントになるものを紹介します。