Proto Actor(Go)でスキャッタ・ギャザーを実装してみよう

メッセージパッシングの世界にようこそ。
ここしばらくはアクターモデルばっかりやってるおじさんです。

アクターモデルといえば、多くの方々はAkka/Pekkoを連想すると思います。

pekko.apache.org

akka.io

が、実はGoでも実践に投入できるレベルのものがいくつかあります。

proto.actor

cloud.ergo.services

今回はprotoactor-goを使って、スキャッタ・ギャザー実装の解説をしていきます。

アクターモデル自体の解説はしませんので、
わかっている方 or 興味ある方向けにはなりますのでご了承ください。

スキャッタ・ギャザーって何?

エンタープライズ統合パターンという、
テクノロジーに依存しないソリューションとしてメッセージングを扱ったパターンがあり、
そのうちの一つがこのスキャッタ・ギャザーです。

access.redhat.com

www.enterpriseintegrationpatterns.com

スキャッタ・ギャザーは、
特定のグループにメッセージを送り、
グループ内のそれぞれが処理を行うなどして集約する仕組みのことです。

参考 https://access.redhat.com/documentation/ja-jp/red_hat_fuse/7.8/html/apache_camel_development_guide/scatter-gather

分散システムデザインパターンの本にもあるものなので、
目にした方も多いと思います。

ミドルウェアなどにも多く採用されているもので、
通常のWebアプリケーションではあまり見ることがないものかもしれませんが、
マイクロサービスアーキテクチャや、メッセージ駆動系のアプリケーションでは必須のテクニックとなっています。

この機会にぜひ覚えましょう!

どうやって実装するの?

Goはgroutineなどがありますので、特に特殊な何かをしなくとも処理の流れを作ることはできます。

が、実際にメッセージングを扱う以上、障害などにも強く弾力性もあるものでなければなりません。
一つのプロセスだけでなく、複数のプロセス、
もしくは物理的に異なるものに対しても変わらずに動くシステムにしなければなりません。

www.reactivemanifesto.org

アクターモデルを導入することで、これらの問題を解決することができるということで、

github.com

ということで今回はprotoactor-goの出番です。

protoactor-goはAkka/Pekkoほど強力ではありませんが、
実践投入しても問題ないもので、
当然1プロセスでもサーバ間でもアクター同士がきちんと関係性を持ち、
状態変化(become/unbecome)や監督(supervise)、クラスタリングなども当然サポートされていて
Goでは割とコスト低めでおすすめなものです。(が、Goの開発サイクルはあまり早くなく.NET版の方が盛ん)

goroutineとアクターの違いは、
どちらもプロセス間通信ものもではありますが、
アクターモデルは位置の把握や識別などが可能で、
channelとは差異があります。
解説するとそれだけで長くなるので、ここでは似たような感じのもの と思っていただいて良いです。

同じ販売商品でも、形態によって異なる中の最安値のデータを判断するとか、
店舗に問い合わせて在庫を持っているところだけに発注を出すとか、
そういうものを連想していただければ良いです。

今回は、少し古い本ですがAkka実践バイブルにある交通監視カメラの画像ファイル名を扱う例をそのまま再現します。

画像ファイルを扱うメッセージの定義は下記のものとします。

type PhotoMessage struct {
    ID              string
    Photo           string
    CreationTime    *time.Time
    Speed           *int
}

なんてことはありません。
画像識別のためのIDと、画像ファイル名などがあります。
スピードや撮影日はあったりなかったりする例です。

画像ファイルには撮影日やその時の速度が入っている、という形になります。
この画像ファイルから撮影日と速度を取り出し、
最後に集約して一つのメッセージにしていきます。

この撮影日抽出と速度抽出を二つのアクターがそれぞれを担当します。
スキャッタ・ギャザーを実装するにはこの二つだけでは当然足りません。

画像メッセージをディスパッチするスキャッタ機能を担当するアクター、
結果を集約するアクターが必要になります。

スキャッタ

実装する方法はいくつかありますが、
ここでは受信したメッセージを複数のアクターに送信するRecipient List
を利用します。

type RecipientList struct {
    PIDs []*actor.PID
}

撮影日抽出と速度抽出のアクターにメッセージを送信するため、それぞれのアクターを渡せるようにしています。
RecipientListアクターはメッセージを受け取ると
対象のアクターにメッセージを送信します。

func (state *RecipientList) Receive(context actor.Context) {
    switch msg := context.Message().(type) {
    case *message.PhotoMessage:
        for _, recipient := range state.PIDs {
            context.Send(recipient, msg)
        }
    }
}

定義したメッセージのみを指定したそれぞれのアクターに送信する形になります。
特別なことは特にありません。

Akka/Pekkoの(!)はProto ActorではSendになります。

実際の送られているかどうかは下記のテストコードで確認できます。

func stubActorProps(ref *actor.PID) *actor.Props {
    return actor.PropsFromFunc(func(ctx actor.Context) {
        switch msg := ctx.Message().(type) {
        case *message.PhotoMessage:
            ctx.Send(ref, msg)
        default:
        }
    })
}

func TestRecipientList_Receive(t *testing.T) {
    t.Run("scatter the message", func(t *testing.T) {

        system := actor.NewActorSystem()
        p := stream.NewTypedStream[*message.PhotoMessage](system)
        ti := time.Date(2001, 2, 3, 4, 5, 6, 0, time.UTC)
        expectMsg := &message.PhotoMessage{
            ID:    "id1",
            Photo: makeCreatePhotoString(ti, 60)}

        go func() {
            p1 := stubActorProps(p.PID())
            p2 := stubActorProps(p.PID())
            pa1 := system.Root.Spawn(p1)
            pa2 := system.Root.Spawn(p2)
            re := system.Root.Spawn(actor.PropsFromProducer(func() actor.Actor {
                return &RecipientList{
                    PIDs: []*actor.PID{pa1, pa2}}
            }))
            system.Root.Send(re, expectMsg)
        }()
        p1msg := <-p.C()
        if p1msg != expectMsg {
            t.Errorf("expected %v, got %v", expectMsg, p1msg)
        }
        p2msg := <-p.C()
        if p2msg != expectMsg {
            t.Errorf("expected %v, got %v", expectMsg, p2msg)
        }
    })
}

stream.NewTypedStream はAkka Typedと大体似たような形になっており、
ストリームの中から型指定でメッセージを取り出せるようになっているものです。
値を取り出す場合は、Proto ActorではRequestFuture(いわゆるFutureです)を使うなどして、
送信もとのアクターで取り出すことができますが、
同様にストリームのパイプラインとして扱うことでgoroutineを使って取り出すことができます。
この辺りはGoっぽい感じですんなり利用できると思います。

ストリームについてわからない方は、
時間軸の流れの中心になる道、というようなイメージで問題ありません。

ギャザー

集約を担当するAggregatorを実装します。

これは名前の通り複数のメッセージを集約するのに利用し、ギャザーとして作用させます。
複数のメッセージを保持し、決まった数を受信したらその分だけ処理するなどができます。

早速なので、2つメッセージを受信したら処理を行うようにします。

構造体は下記のようにしておきます。
集約したメッセージを受け取るアクターをpipeとしています。
messagesは処理していないメッセージを保持するためのスライスです。

type Aggregator struct {
    pipe     *actor.PID
    messages []*message.PhotoMessage
}

メッセージを一つにまとめる処理は次のとおりです。

func (state *Aggregator) Receive(ctx actor.Context) {
    switch msg := ctx.Message().(type) {
    case *message.PhotoMessage:
        var found *message.PhotoMessage
        for _, m := range state.messages {
            if m.ID == msg.ID {
                found = m
                break
            }
        }
        if found != nil {
            newCombinedMsg := &message.PhotoMessage{
                ID:           msg.ID,
                Photo:        msg.Photo,
                CreationTime: msg.CreationTime,
                Speed:        msg.Speed,
            }
            newCombinedMsg.UpdatePhotoMessage(found)
            ctx.Send(state.pipe, newCombinedMsg)
            var newMessages []*message.PhotoMessage
            for _, m := range state.messages {
                if m.ID != found.ID {
                    newMessages = append(newMessages, m)
                }
            }
            state.messages = newMessages
        } else {
            state.messages = append(state.messages, msg)
        }
}

仕組み上Akka/Pekkoよりはコード量が多いですが、Goに慣れている人であれば特に普通のコードです。

下記の部分はメッセージを二つ以上受信するとできる処理であることを示しています。
無闇に関係ないメッセージを処理することはできないため、IDが一致するもののみを対象としています。

       var found *message.PhotoMessage
        for _, m := range state.messages {
            if m.ID == msg.ID {
                found = m
                break
            }
        }

次の部分では、処理して良いメッセージが二つ以上ある場合は、
先に来たメッセージを基準として、後に来たメッセージで更新して
二つをマージする処理となっています。
処理後はpipeで指定されている他のアクターの送信されます(結果として受け取れます)

そしてこのアクターはメッセージの状態を持っていますので、処理したメッセージを
再処理しないように処理済みを取り除いてアクターの状態を更新しています。
(アクターのライフサイクルについては今回触れませんので、どこかで理解するとよりわかると思います)

       if found != nil {
            newCombinedMsg := &message.PhotoMessage{
                ID:                     msg.ID,
                Photo:              msg.Photo,
                CreationTime: msg.CreationTime,
                Speed:             msg.Speed,
            }
            newCombinedMsg.UpdatePhotoMessage(found)
            ctx.Send(state.pipe, newCombinedMsg)
            var newMessages []*message.PhotoMessage
            for _, m := range state.messages {
                if m.ID != found.ID {
                    newMessages = append(newMessages, m)
                }
            }
            state.messages = newMessages
        } else {
            state.messages = append(state.messages, msg)
        }

elseではまとめて処理できるものが見つからない、つまり1件しかないということになりますので、
アクターが保持して、次のメッセージ受信まで保管しています。

アクター同士の関係を作っていきますので、returnなどはできません。
(やるとしても違う方法になります。)

この動作を確認するテストコードは次のとおりです。

   t.Run("aggregate two messages", func(t *testing.T) {
        ti := time.Date(2001, 2, 3, 4, 5, 6, 0, time.UTC)
        create := time.Date(2023, 2, 3, 4, 5, 6, 0, time.UTC)
        ps := makeCreatePhotoString(ti, 60)

        msg1 := &message.PhotoMessage{ID: "id1", CreationTime: &create, Photo: ps}
        s := 60
        msg2 := &message.PhotoMessage{ID: "id1", Photo: ps, Speed: &s}
        expect := &message.PhotoMessage{ID: "id1", Photo: ps, CreationTime: msg1.CreationTime, Speed: msg2.Speed}

        system := actor.NewActorSystem()
        p := stream.NewTypedStream[*message.PhotoMessage](system)
        go func() {
            re := system.Root.Spawn(actor.PropsFromProducer(func() actor.Actor {
                return NewAggregator(1*time.Second, p.PID())
            }))
            system.Root.Send(re, msg1)
            system.Root.Send(re, msg2)
        }()
        res := <-p.C()
        if !reflect.DeepEqual(res, expect) {
            t.Errorf("expected %v, got %v", expect, res)
        }
    })

二つのメッセージが、撮影日と速度を別々に持っており、
Aggregatorのアクターが二つのメッセージを集約することを期待するコードとなっています。

できた気がします。
正常に動くことだけを担保するのであればこれで問題ありませんが、
これだけでは不十分です。

仮にメッセージの集約処理に時間がかかりすぎた場合、
他のメッセージ処理ができずにスタックしてしまうかもしれません。
アクターにメッセージを保持しすぎてメモリーを逼迫してしまう可能性もあります。

解決方法はいろいろありますが、ここではアクターモデル採用しているツールキットなどでお馴染みの
タイムアウトを利用してみましょう。
指定した時間以上経過した場合は、未処理のものをそのままpipeに送信するようにします。

       if found != nil {
            newCombinedMsg := &message.PhotoMessage{
                ID:           msg.ID,
                Photo:        msg.Photo,
                CreationTime: msg.CreationTime,
                Speed:        msg.Speed,
            }
            newCombinedMsg.UpdatePhotoMessage(found)
            ctx.Send(state.pipe, newCombinedMsg)
            var newMessages []*message.PhotoMessage
            for _, m := range state.messages {
                if m.ID != found.ID {
                    newMessages = append(newMessages, m)
                }
            }
            state.messages = newMessages
        } else {
            // メッセージが一件しかない場合はタイムアウトを設定した後に
            // アクターにメッセージを持たせている
            state.messages = append(state.messages, msg)
            ctx.SetReceiveTimeout(state.timeout)
        }

先ほどのコードに ctx.SetReceiveTimeout(state.timeout) で指定した時間以上待機する場合はタイムアウトを発生するように
アクターに指定します。

外から指定できるようにする場合は、先ほどの構造体を次のようにするといいでしょう。

type Aggregator struct {
    timeout  time.Duration
    pipe     *actor.PID
    messages []*message.PhotoMessage
}

Akka/Pekkoと異なり、Timeout指定時にメッセージをくっつけられないため、
タイムアウトだけ指定しています。
同様の動きにしたい場合は、少し工夫が必要になります(ここでは簡単なものにしておきます)

アクターでタイムアウトが発生すると *actor.ReceiveTimeout メッセージが自アクターに送信されます。
これを受け取った詩に、保管しているメッセージを全てpipeのアクターの送信し、
保持しているメッセージの領域を開けるようにしてみます。
(前述の通り実践に投入する場合は、タイムアウトしたメッセージだけを送信して、その分だけ解放するようにしましょう)

   case *actor.ReceiveTimeout:
        for _, m := range state.messages {
            ctx.Send(state.pipe, m)
        }
        state.messages = nil

手抜きですが、これで長い時間がかかる処理に対してのリソース問題を手抜きして解決しました。
これだけでは実はまだ予期せぬ事態に対応できません。

例えばアクターは何かしらの問題が起きると再起動し、その時の状態がなくなってしまいます。
Goの場合はpanicを投げるとアクターは再起動されます。
(振る舞いは該当のアクターだけ再起動するか、親も含めて再起動するかなど変更できます。)

予期せぬ事態にも対応できるようにしておきましょう。
とは言ってもメッセージの内部で意図的にpanicを起こすのは面倒なため、
特定のメッセージを受け取ったらpanicを発生させられるように、
ここでは意図的に実行できるようにします。

   case *message.IllegalStatePanicMessage:
        // 意図的にパニックを起こすメッセージを受信するとパニックを検知して、
        // Aggregatorのメッセージを持っている状態で停止する
        panic("this is a scheduled panic")
    }

IllegalStatePanicMessageを受け取るとpanicを起こして再起動が走るようにします。
ではこの再起動時にメッセージが消失しないようにするにはどうしたらいいのでしょうか。

これはProto Actorのライフサイクルを把握しておくと簡単に対応できます。

Akka/PekkoにはPreRestartがありますが、Proto Actorでは *actor.Restarting メッセージをアクターが受け取るタイミングで
同じような処理ができます。

   case *actor.Restarting:
        for _, v := range state.messages {
            ctx.Send(ctx.Self(), v)
        }
        state.messages = nil

自アクターにメッセージを送信して再起動後に続きから処理できるようにする、という解決方法を取ることができます。
これである程度の事態に対応できるようになりました。

2件以上で処理したいのに1件のみ受信、1秒以上経過するとタイムアウトを起こし、
持っているメッセージをそのまま戻すことを下記のようなコードでテストできます。

   t.Run("send message after timeout", func(t *testing.T) {
        ti := time.Date(2001, 2, 3, 4, 5, 6, 0, time.UTC)
        ps := makeCreatePhotoString(ti, 60)
        msg1 := &message.PhotoMessage{ID: "id1", CreationTime: &ti, Photo: ps}

        system := actor.NewActorSystem()
        p := stream.NewTypedStream[*message.PhotoMessage](system)
        go func() {
            re := system.Root.Spawn(actor.PropsFromProducer(func() actor.Actor {
                return NewAggregator(1*time.Second, p.PID())
            }))
            system.Root.Send(re, msg1)
        }()
        res := <-p.C()
        if !reflect.DeepEqual(res, msg1) {
            t.Errorf("expected %v, got %v", msg1, res)
        }
    })

意図的にパニックを起こし、アクターを再起動させた後にもう1件メッセージを送り
処理を再開できることをテストするには次のようになります。

   t.Run("aggregate two messages when restarting", func(t *testing.T) {
        ti := time.Date(2001, 2, 3, 4, 5, 6, 0, time.UTC)
        ps := makeCreatePhotoString(ti, 60)
        create := time.Date(2023, 2, 3, 4, 5, 6, 0, time.UTC)

        msg1 := &message.PhotoMessage{ID: "id1", CreationTime: &create, Photo: ps}
        s := 60
        msg2 := &message.PhotoMessage{ID: "id1", Photo: ps, Speed: &s}
        expect := &message.PhotoMessage{ID: "id1", Photo: ps, CreationTime: msg1.CreationTime, Speed: msg2.Speed}

        system := actor.NewActorSystem()
        p := stream.NewTypedStream[*message.PhotoMessage](system)
        go func() {
            re := system.Root.Spawn(actor.PropsFromProducer(func() actor.Actor {
                return NewAggregator(1*time.Second, p.PID())
            }))
            system.Root.Send(re, msg1)
            system.Root.Send(re, &message.IllegalStatePanicMessage{})
            system.Root.Send(re, msg2)
        }()
        res := <-p.C()
        if !reflect.DeepEqual(res, expect) {
            t.Errorf("expected %v, got %v", expect, res)
        }
    })

これで現在想定しているAggregatorの振る舞いは大丈夫そうです。

撮影日抽出と速度抽出 2つのアクター

この二つは大したことはありません。
画像名が 01012022 12:34:56.789|55|XYZ123 のようなものを想定して、
撮影時間|速度|ナンバープレートの番号 というような形式を例としています。
(この辺りは書籍などを確認ください)

それぞれの抽出のみ担当するアクターの例です。

撮影日抽出アクター

type GetTime struct {
    pipe *actor.PID
}

func NewGetTimeActor(pipe *actor.PID) actor.Actor {
    return &GetTime{
        pipe: pipe,
    }
}

func (g *GetTime) Receive(context actor.Context) {
    switch msg := context.Message().(type) {
    case *message.PhotoMessage:
        i := &process.ImageProcessing{}
        t, _ := i.GetTime(msg.Photo)
        msg.CreationTime = &t
        context.Send(g.pipe, msg)
    }
}

速度抽出アクター

type GetSpeed struct {
    pipe *actor.PID
}

func NewGetSpeedActor(pipe *actor.PID) actor.Actor {
    return &GetSpeed{
        pipe: pipe,
    }
}

func (g *GetSpeed) Receive(context actor.Context) {
    switch msg := context.Message().(type) {
    case *message.PhotoMessage:
        i := &process.ImageProcessing{}
        speed, _ := i.GetSpeed(msg.Photo)
        msg.Speed = &speed
        context.Send(g.pipe, msg)
    }
}

結合

これで最後に各アクターをスキャッタ・ギャザーに結合させます。

結合も大して大変ではありません。
これまで同様にTypedStreamを使ってメッセージを受信します。
一つの画像名から撮影日抽出アクターと速度抽出アクターがそれぞれ担当します。
送るメッセージは1つだけです。

       system := actor.NewActorSystem()
        p := stream.NewTypedStream[*message.PhotoMessage](system)
        d := time.Date(2023, 9, 10, 4, 5, 6, 0, time.UTC)
        speed := 60
        ps := makeCreatePhotoString(d, speed)
        msg := &message.PhotoMessage{
            ID:    "id1",
            Photo: ps,
        }

結合は下記の通りです。

結合イメージ

これに合わせて組み合わせます。
Aggregatorがメッセージを保持しますので、
GetSpeedActor、GetTimeActorそれぞれのメッセージを集約するようにします。
(2つのアクターにいるpipe つまりAggregatorにそれぞれメッセージを送信します)
RecipientListはこの2つのアクターに同じメッセージを送します。

Aggregatorで指定されているp.PIDは集約結果を受け取る、TypedStreamのアクターです。

       go func() {
            ref := system.Root.Spawn(actor.PropsFromProducer(func() actor.Actor {
                return NewAggregator(timeOut, p.PID())
            }))
            speedRef := system.Root.Spawn(actor.PropsFromProducer(func() actor.Actor {
                return NewGetSpeedActor(ref)
            }))
            timeRef := system.Root.Spawn(actor.PropsFromProducer(func() actor.Actor {
                return NewGetTimeActor(ref)
            }))
            actorRef := system.Root.Spawn(actor.PropsFromProducer(func() actor.Actor {
                return &RecipientList{
                    PIDs: []*actor.PID{speedRef, timeRef}}
            }))
            system.Root.Send(actorRef, msg)
        }()

集約したメッセージは下記の通り二つの処理結果をマージしたものになります。

       expect := &message.PhotoMessage{
            ID:           msg.ID,
            Photo:        msg.Photo,
            Speed:        &speed,
            CreationTime: &d,
        }
        res := <-p.C()
        if !reflect.DeepEqual(res, expect) {
            t.Errorf("expected %v, got %v", expect, res)
        }

送信時は速度や撮影日がありませんでしたが、それぞれ抽出した後に結合されるので
上記のメッセージが受け取れるようになります。

これでスキャッタ・ギャザーを一通り実装することができました。
実際に実装する場合は、アクターは何個いても構いません。
アクターの紐づく子アクターを生成してメッセージをそれぞれ保持することもできますし、
それぞれのアクターがまた他のアクターと関係性を持っても構いません。
最終的にメッセージが集約されて期待通りになればどんな組み合わせでも良いです。

おわりに

アクターモデルを使わずともKakfaなどを合わせてやれば作ることは確かにできますが、
やはりこうした処理はアクターモデルのツールキットを使うことで並列でメッセージをうまく扱えるようになります。
(Kafkaを使ってつくるのであればKafka StreamsやApache Sparkなどを併用することになります)

Akkaの本を元にProto Actor(Go)に置き換えるのはさほど苦労なくできるのがわかるかと思います。
対象内容が古くなっていてもProto Actorを通じてアクターモデルを習得することが十分できますので、
ぜひやってみてください。

とはいいつつなんだかんだAkka/Pekkoの方が楽です