Malboxとは
前回まではアクターを使った簡単な実装について解説しました。
一般的なWebアプリケーションで利用するHTTPのリクエストレスポンスを中心として考えるものと異なり、
情報伝達経路を中心に考えていくように感じられたかもしれません。
アクターの情報伝達には Mailbox の存在を理解しておく必要があります。
それぞれのアクターは現実世界の我々と同じようにアドレスを持ち、
他のアクターからのメッセージはMailboxに配信されます。
メッセージはPIDを指定して送信されますが、全てMailboxに格納され、
アクターは格納されたメッセージを取り出すことで、読み取ることができるようになります。
Mailboxは現実世界の郵便受けをイメージしてもらっても構いませんし、
ある程度開発に慣れている方はQueueというと伝わるかもしれません。
出典:
proto.actor
前回
Mailboxの基本的な挙動
Mailboxの基本的な挙動として、
アクターシステムのメッセージとみなさんがアクターに送信するユーザーメッセージの2 つで構成されます。
システムメッセージは障害が発生した場合にMailboxの処理を一時停止したり再開するために、
アクターコンテキストによって内部的に使用されます。
システムメッセージは、アクターを管理するために内部的にも使用され、
開始、停止、再起動といった重要なメッセージを扱います。
メールボックス内のメッセージは、常にFIFO順序で配信されますが、
システムメッセージがある場合はユーザーメッセージより前に処理され利用になっています。
この点は通常のFIFOと異なりますので覚えておきましょう。
アクターのMailboxには次のルールが適用されます。
メッセージの送信は複数のアクターによって同時に実行されます。
これはmpsc、つまりmultiple producer, single consumerを表します。
メッセージの受信はアクターによって順番に行われます。
特別な挙動が必要な場合は任意の他のルールを適用することもできます。
Mailboxはアクター間で共有することはできません。
デフォルトでは無制限でメッセージをMailboxに格納できるようになっています。
Mailbox Middleware
下記のシンプルなアクターの処理をベースにMailboxを実際に触ってみましょう。
package main import ( "fmt" console "github.com/asynkron/goconsole" "github.com/asynkron/protoactor-go/actor" ) type Hello struct{ Who string } type HelloActor struct{} func NewHelloActor() actor.Actor { return &HelloActor{} } func (state *HelloActor) Receive(context actor.Context) { switch msg := context.Message().(type) { case Hello: fmt.Printf("Hello %v\n", msg.Who) } } func main() { system := actor.NewActorSystem() props := actor.PropsFromProducer(NewHelloActor) pid := system.Root.Spawn(props) system.Root.Send(pid, Hello{Who: "ytake"}) _, _ = console.ReadLine() }
Proto ActorではMailboxに処理を挟み込みたい場合は、
下記のWithMailboxを利用して任意の処理を追加できます。
props := actor.FromProducer( MyActorProducer, actor.WithMailbox(MyMailboxProducer) )
Mailboxにロガーを追加してみよう
それでは早速シンプルなログ出力を追加してみましょう。
Mailboxに任意の処理を挟み込む場合は、下記のMailboxMiddleware
インターフェースを実装します。
最近のWebアプリケーションフレームワークでもよくあるミドルウェア的な表現がされています。
// MailboxMiddleware is an interface for intercepting messages and events in the mailbox type MailboxMiddleware interface { MailboxStarted() MessagePosted(message interface{}) MessageReceived(message interface{}) MailboxEmpty() }
MailboxStarted
は Mailbox自体が起動した時にコールされるメソッドです。
MessagePosted
はMailboxに対して
システムメッセージ・ユーザーメッセージが送信されるとコールされます。
コールされたあとにMailboxにpushされます。
MessageReceived
はMailboxに
システムメッセージ・ユーザーメッセージが到達・受け取るとコールされます。
MailboxEmpty
はMailboxが空になるたびにコールされます。
実際にシンプルなログ出力を追加して挙動を確認してみましょう。
ここでは下記の通りに実装します。
package main import ( "fmt" "log/slog" console "github.com/asynkron/goconsole" "github.com/asynkron/protoactor-go/actor" ) type mailboxLogger struct { logger *slog.Logger } func (m *mailboxLogger) MailboxStarted() { m.logger.Info("Mailbox started") } func (m *mailboxLogger) MessagePosted(msg interface{}) { m.logger.Info("Message posted", slog.String("type", fmt.Sprintf("%T", msg)), slog.Any("message", msg)) } func (m *mailboxLogger) MessageReceived(msg interface{}) { m.logger.Info("Message received", slog.String("type", fmt.Sprintf("%T", msg)), slog.Any("message", msg)) } func (m *mailboxLogger) MailboxEmpty() { m.logger.Info("No more messages") }
これで実際に MessagePosted
や MessageReceived
が受け取るメッセージについて理解することができます。
ここで実装した処理をMailboxで利用するには次の実装をします。
package main import ( "fmt" "log/slog" console "github.com/asynkron/goconsole" "github.com/asynkron/protoactor-go/actor" ) type Hello struct{ Who string } type HelloActor struct{} func NewHelloActor() actor.Actor { return &HelloActor{} } func (state *HelloActor) Receive(context actor.Context) { switch msg := context.Message().(type) { case Hello: fmt.Printf("Hello %v\n", msg.Who) } } type mailboxLogger struct { logger *slog.Logger } func (m *mailboxLogger) MailboxStarted() { m.logger.Info("Mailbox started") } func (m *mailboxLogger) MessagePosted(msg interface{}) { m.logger.Info("Message posted", slog.String("type", fmt.Sprintf("%T", msg)), slog.Any("message", msg)) } func (m *mailboxLogger) MessageReceived(msg interface{}) { m.logger.Info("Message received", slog.String("type", fmt.Sprintf("%T", msg)), slog.Any("message", msg)) } func (m *mailboxLogger) MailboxEmpty() { m.logger.Info("No more messages") } func main() { system := actor.NewActorSystem() props := actor.PropsFromProducer( NewHelloActor, actor.WithMailbox(actor.Unbounded(&mailboxLogger{logger: system.Logger()})), ) pid := system.Root.Spawn(props) system.Root.Send(pid, Hello{Who: "ytake"}) _, _ = console.ReadLine() }
actor.NewActorSystem
でアクターシステムを生成した後、
system(任意の変数名).Logger
でslogを利用したProto Actorのロガーにアクセスできます。
実際に利用するには事前に説明したようにWithMailboxを使います。
props := actor.PropsFromProducer( NewHelloActor, actor.WithMailbox(actor.Unbounded(&mailboxLogger{logger: system.Logger()})), )
アクター生成時に利用するPropsFromProducerの第2引数以降に指定します。
第2引数以降は全てオプション扱いとなっています。
actor.Unbounded
は、Mailboxに対して無制限に作用するものとなります。
これで冒頭のサンプルにあったHelloActorのMailbox全てに作用するロガーということになります。
actor.Unbounded
の第2引数以降はオプションとなりますが、
同様にMailboxMiddleware
インターフェースを実装した処理を追加できます。
actor system started
はアクターシステム起動時に出力されるメッセージですので、
それ以降のメッセージがMailboxに追加したログ出力となります。
その後の出力されている Message posted lib=Proto.Actor type=*actor.Started message=&{}
は、
*actor.Started
、つまりHelloアクターが起動したことを示すシステムメッセージが
HelloアクターのMailboxに送信されたことを意味しています。
その後 Mailbox started
は MailboxStarted
がコールされたことを表しています。
次の Message posted type=main.Hello message={Who:ytake}
は、
HelloアクターのMailboxに送信された、ということになります。
その次の Message received lib=Proto.Actor type=*actor.Started message=&{}
は、
Helloアクターが起動したことを示すシステムメッセージを受け取ったことを意味しています。
次の Hello ytake
はHelloアクター内でメッセージを出力したログです。
さらに Message received type=main.Hello message={Who:ytake}
、
Helloアクターがユーザーメッセージを受け取ったことを示しています。
最後に全てのメッセージを処理したため、No more messages
となっています。
これでMailbox自体の流れは理解できたと思います。
この仕組みを使って、任意でいろんな処理を追加することができます。
簡単ですね!
Mailboxで制限
デフォルトでは無制限でMailboxにメッセージを格納できると説明しましたが、
必ずしもそれがベストではない場合もあります。
何かしらの悪意ある攻撃やアプリケーションのメモリ不足など、
過負荷状態に陥ったり、クラッシュやメッセージ損失などが起こる可能性がある場合は、
無制限のままにしておくことはできません。
こうした場合はMailboxに制限を設けて処理できるサイズを指定できます。
制限を設ける場合は下記のようにactor.Bounded
を利用してサイズを指定します。
props := actor.PropsFromProducer(
NewHelloActor,
actor.WithMailbox(actor.Bounded(2)),
)
2は極端な例ですので、実際は環境に合わせて設定すると良いでしょう。
また、受け付けたもの以外をドロップする設定にする事もできます。
その場合は actor.BoundedDropping
を利用してください。
props := actor.PropsFromProducer(
NewHelloActor,
actor.WithMailbox(actor.BoundedDropping(2)),
)
actor.Bounded
とactor.BoundedDropping
の第1引数はサイズとなりますが、
第2引数以降はMailboxMiddleware
インターフェースを実装した処理を追加できますので、
前述したロガーのような処理や、任意でバッファを設けるなどができます。
併せてバックプレッシャーについて理解しておくといいでしょう。
(実践するには理解が必要です)
Dispatcher、Invokerを理解しよう
ここまでMailboxについて簡単に解説しましたが、
MailboxにはDispatcherとInvokerという2つのハンドラがあります。
Dispatcher
DispatcherはMailboxからメッセージを取り出し、
アクターが処理するための方法(スレッドやgoroutineなど)でスケジュールすることを担当します。
Proto Actor(Go)で提供されている方法は goroutine を使った呼び出しとなっています。
Proto Actor(C#)やAkkaなどでは、スレッドプールを利用したものとなっています。
DispatcherはProto ActorもAkka / Pekkoも基本的には同じ概念を指しています。
Mailboxのスループットを制限する責任もあり、
スループットを制限することで使用中のスレッドを解放して、
他Mailboxの実行をスケジュールする、などの役割を持っています。
Proto Actor(Go)ではDispatcherが元から2つ用意されています。
NewDefaultDispatcher
デフォルトで利用されているのがこのDispatcherです。
goroutineでコールされているだけのもので、
デフォルトではスループットが300に設定されています。
NewSynchronizedDispatcher
goroutineを利用せず、同期的な処理が必要な場合に利用します。
並行性が必要でない場合や、リソースが限られている環境などに利用できます。
Proto Actor(Go)はシンプルなDispatcherが提供されていますが、
Goの特性を生かしたものとして提供されています。
実現したいものに合わせて独自に実装したものを利用することもできますが、
Proto Actorについてある程度理解していることが前提ですので、
通常はどちらかを選ぶことをおすすめします。
用意されているものを使って、スループットなどを指定したい場合は、
これまでのactor.PropsFromProducerを使った指定と同様で、
actor.WithDispatcher
を使います。
props := actor.PropsFromProducer(
NewHelloActor,
actor.WithDispatcher(actor.NewDefaultDispatcher(1000)),
)
actor.NewDefaultDispatcher
、actor.NewSynchronizedDispatcher
が選択できます。
Invoker
InvokerはDispatcherによってスケジュールされたメッセージを
実際にアクターのメソッドとして呼び出す役割を担っています。
アクターコンテキストとしてメッセージを取得したあとにアクターのReceive メソッドを呼び出します。
つまり実装時によく目にするReceiveはInvokerによってコールされているということです。
メッセージの処理中にエラーが発生すると、
Mailboxは登録されているInvokerにエラーをエスカレートし、アクターの再起動などを実行します。
エスカレート時の振る舞いなどについては、アクターシステムにおける重要な要素の一つSupervisionが担当しますが、
こちらは長編なので他の機会で解説します。
Invokerについては簡単に任意の処理に変更することはできません。
さいごに
今回はProto Actorにおけるメッセージ処理パイプラインの中心を担う、
Mailbox、Dispatcher、Invokerについて解説しました。
これらはアクターモデルを実践するには必要不可欠な存在ですので、
実際に簡単なコードを書いて理解しておきましょう!