Goで実践 アクターモデル vol.4 メッセージが届かない時に何が起きるか

メッセージが届かない時って?

前回はアクターモデルのMailboxにメッセージが届いた時にどの様な仕組みで処理されるかを見ていきました。
今回はメッセージが届かない時に何が起きるかを見ていきます。

DeadLetters

メッセージが届かない時にはDeadLettersと呼ばれるアクターにメッセージが配信されます。
この配信はベストエフォートで行われます。

通常のHTTPのリクエストレスポンスを中心に扱うアプリケーション作りとは異なり、
アクターモデルではアクターの終了も意図的に行うことができるため、
アクター終了を指示後に誤ってメッセージを配信してしまった、なんてことも実際に起こり得ます。

また、ネットワーク越しにメッセージを送信する場合、メッセージが届かないこともあります。
アクターモデルではこうした「届かないかもしれない」、という状況に対してもDeadLettersという仕組みで対応しています。

AWSのSQSなどでもメッセージが届かない場合にはDeadLetterQueueにメッセージが転送されるような仕組みがあります。
それと同じような仕組みだと思ってもらえれば良いかと思います。

とくにアクターが送信者または受信者が間違って設定されていないかどうか、
デッドレターを調査することで問題を特定できます。

が、
あくまで問題がある場合にのみDeadLetterが利用されるものです。
デッドレターサービスは、他のすべてのメッセージ送信と同じ配信保証ルールに従うため、保証された配信を実装するために使用することはできません。

DeadLetterの使い方

アクターは、イベントストリーム(Event Stream)を利用することでDeadLetterを購読でき、
購読時点からシステム内のすべてのDeadLetterを受け取ります。

DeadLetterはネットワークを介して伝播されませんので、
1つのネットワークノードごとに1つのアクターで購読して、手動で転送するなどの工夫が必要になります。
これはクラスターの説明時に詳しく説明します。

基本的にDeadLetterは配信されなかったメッセージのためのチャネルで、
処理されなかった、もしくはアクターに配信されなかったすべてのメッセージが格納されます。
ほとんどの場合、メッセージを送信するためには使用されませんので、
なにか特別なユースケースのために拡張するような使い方などはせずに
メッセージに問題がある場合にのみ使用されるものとしっかりと認識しておきましょう。

それは早速DeadLetterへの配信を実際に購読してみましょう。

package main

import (
    "fmt"

    console "github.com/asynkron/goconsole"
    "github.com/asynkron/protoactor-go/actor"
)

type hello struct{}

func (h *hello) Receive(context actor.Context) {
    switch msg := context.Message().(type) {
    case string:
        fmt.Println("hello ", msg)
    }
}

func main() {
    system := actor.NewActorSystem()
    pid := system.Root.Spawn(actor.PropsFromProducer(func() actor.Actor {
        return &hello{}
    }))
    system.EventStream.Subscribe(
        func(evt interface{}) {
            fmt.Printf("Event %T\n", evt)
            fmt.Println("Event", evt)
        })
    system.Root.Send(pid, "world")
    f := system.Root.PoisonFuture(pid)
    f.Wait()
    system.Root.Send(pid, "world")
    _, _ = console.ReadLine()
}

まず購読方法についてです。
EventStreamはアクターシステムを経由してアクセスすることができます。
Rootから生成されたアクターなど、全てがアクセスすることができます。

EventStreamを購読する場合は、Subscribe を利用します。
Subscribeは引数が一つだけで、コールバックで指定します。

コールバックは type Handler func(interface{}) となり、
interface{} にはEventStreamに流れてくるメッセージとなっています。

このコード例はEventStreamに流れてくる全てのメッセージを出力しています。
DeadLetterだけを利用する場合は、*actor.DeadLetterEvent を指定します。

   system.EventStream.Subscribe(
        func(evt interface{}) {
            switch msg := evt.(type) {
            case *actor.DeadLetterEvent:
                fmt.Println("DeadLetterEvent", msg)
            }
        })

購読後にメッセージを送信、その後生成されたアクターを破棄しています。

   system.Root.Send(pid, "world")
    f := system.Root.PoisonFuture(pid)
    f.Wait()

アクターそのものを破棄する場合は Poison、またはPoisonFutureが利用できます。
PoisonFutureはWaitを利用することで削除終了を待つことができますので、
確実にDeadLetterへの配信を再現できます。

その後、存在しなくなったアクターのPIDを指定してメッセージ送信、
つまり存在しないアクターへのメッセージ送信になります。
これを実行すると EventStreamに流れてくるのが出力されます。

DeadLetterへ意図的に送信することもできます。
不明なPIDや、伝達に関して何かが起きた場合に利用するなどが良いかもしれません。

func main() {
    system := actor.NewActorSystem()
    pid := system.Root.Spawn(actor.PropsFromProducer(func() actor.Actor {
        return &hello{}
    }))
    system.EventStream.Subscribe(
        func(evt interface{}) {
            switch msg := evt.(type) {
            case *actor.DeadLetterEvent:
                fmt.Println("DeadLetterEvent", msg)
            }
        })
    system.DeadLetter.SendUserMessage(pid, "hello")
    system.DeadLetter.SendSystemMessage(pid, "hello")
    _, _ = console.ReadLine()
}

このようにアクターシステムからDeadLetterへアクセスすることができます。
SendUserMessage は通常のアクターへ送信されるユーザーメッセージ同様に順番を保ち配信されます。
SendSystemMessage は即配信されるシステムメッセージと同じ扱いとなります。

またDeadLetterを停止させる Stop も用意されていますが、
リソースに問題があるとき以外はあまり利用することはないかもしれません。

子アクターもDeadLetterは使えるの?

EventStreamはアクターシステム内全てに流れるストリームとなっています。
ではRootでSpawnしたアクターから、さらに生成されたアクターなどのDeadLetterも受け取ることはできるのでしょうか?

先程までのコードに、helloアクターでメッセージを受信したタイミングで
helloアクターの子にあたるhello2アクターを生成し、
hello2アクターから存在しないPID向けにメッセージを送信してみましょう。

package main

import (
    "fmt"

    console "github.com/asynkron/goconsole"
    "github.com/asynkron/protoactor-go/actor"
)

type hello struct{}

type hello2 struct{}

func (h *hello) Receive(context actor.Context) {
    switch context.Message().(type) {
    case string:
        child := context.Spawn(actor.PropsFromProducer(func() actor.Actor {
            return &hello2{}
        }))
        context.Send(child, "hello")
    }
}

func (h *hello2) Receive(context actor.Context) {
    switch msg := context.Message().(type) {
    case string:
        fmt.Println("hello2 ", msg)
        context.Send(&actor.PID{Id: "sample"}, "hello2")
    }
}

func main() {
    system := actor.NewActorSystem()
    pid := system.Root.Spawn(actor.PropsFromProducer(func() actor.Actor {
        return &hello{}
    }))
    system.EventStream.Subscribe(
        func(evt interface{}) {
            switch msg := evt.(type) {
            case *actor.DeadLetterEvent:
                fmt.Println("DeadLetterEvent", msg)
            }
        })
    system.Root.Send(pid, "hello")
    _, _ = console.ReadLine()
}

おそらく下記のように出力されるはずです。

これまでと同様に system.EventStream.Subscribe に流れてきます。
つまりヒエラルキーを形成している全てのDeadLetterを把握することができるということになります。

また同様にこのhello2アクターでDeadLetterを購読することもできます。

func (h *hello2) Receive(context actor.Context) {
    switch msg := context.Message().(type) {
    case string:
        context.ActorSystem().EventStream.Subscribe(func(evt interface{}) {
            switch msg := evt.(type) {
            case *actor.DeadLetterEvent:
                fmt.Println("DeadLetterEvent", msg)
            }
        })
        fmt.Println("hello2 ", msg)
        context.Send(&actor.PID{Id: "sample"}, "hello2")
    }
}

RootでSubscribeする以外は、
Receiveの引数であるcontextからアクターシステムのEventStreamにアクセスできます。
こうすることでRootで購読するのと全く同じものを受け取ることができますので、
Rootで購読せずに、DeadLetterだけを監視するアクターという形で専任させることもできます。

さいごに

今回は届かなかったメッセージについて簡単に説明しました。

EventStream自体はまた違う機会でも解説しますが、アクターシステム内のPubSubとしての機能を使い、
DeadLetterが流れてくるという一般的なWebアプリケーションフレームワークなどには
あまりない仕組みとなっていますので、
構成要素として理解しておくといいでしょう!

次回はアクターシステムの中でも重要な要素、監督 / Supervisionについて解説します。