Goで実践 アクターモデル vol.2 小さなActor アプリケーション

小さなアプリケーションを実装してみよう

今回は、前回のhello worldから少しだけアクターについて掘り下げていきます。

Proto Actorでの用語で記述しますが、Akka/Pekkoと用語がちょっと異なるだけなので、
読み替えてみると良いと思います。

構成を考えよう

アクターは生成(spawn / create)、送受信(send | receive)、状態変化(become)、監督(Supervisor)などを操作します。

リクエスト・レスポンスを中心に考えるHTTP周りのものとはちょっと違うものがいくつかありますね。
はじめて触る方は頭をリセットして取り組んでいきましょう。

このうち生成(spawn / create)と送受信(send | receive)だけを使って
小さなアプリケーションとして、学校生活でお馴染みの学力テストを実装します。

まずはざっくりと学力テストの構造を考えてみましょう。

どんな構造でも構いませんが、
今回の例では教室(クラスルーム)、先生、生徒で構成されるものとして考えます。
HTTPの流れなどは今回含みませんので非常に簡単です。

どんなものを表現したいか、モデリングはどうかなどでアクターの構成が変わります。
どのアクターから何を生成するかは、アプリケーション次第ですのであくまで例だと思ってください。
今回はアクター生成の流れとしては下記のものとします。

flowchart TD
subgraph ActorSystem
教室-->先生
先生-->生徒
end

これらはすべてアクターとして考えます。
担任の先生が固定でいるものを表現する場合は先生から教室を生成しても構いません。
その場合は教室がオーケストレーションを行うものとして考えることができます。

今回はテストだけの観点で教室を中心にやり取りを行う形で実装します。

次に実際の流れがどのようになるか例を示します。
学生時代を思い出していきましょう(電子化されている学校だとフローが違いますが・・)。

  • 授業時間が始まる
  • 教室に先生が来る
  • テスト開始
  • 問題を解く
  • テストの解答用紙を提出する
  • 全員の提出が終わり授業が終了する

授業時間の中でテストを行うものとしての流れがこのようになっているとします。
これをそのままアクターシステムに落とし込んでいきましょう。

授業時間が始まる

細かく考えるとキリがないため、
今回は 授業時間の開始を指示されると教室アクターを生成する
とします。

なんの授業か、大事ですね。
ここでは算数にしておきましょう。

すべててのフローはここから始まると考えても問題がないでしょう。

flowchart TD
subgraph ActorSystem
ルート -->|生成\n授業開始指示| ClassroomActor("教室アクター\n(算数)")
end

繰り返しになりますが、何を起点にしていくかはどのようにモデリングしていくかで変わっていきます。
先生を最初のアクターにしても構いませんし、学校にしても構いません。

教室に先生が来る

小さなアプリケーションで、どのように作っていくかを解説するため、
この場では「先生が来る」を教室アクターが先生アクターを生成すると考えます。

flowchart TD
subgraph ActorSystem
ClassroomActor("教室アクター\n(算数)") --> |生成| Teacher("先生アクター")
end

この文章のまま、先生もルートのアクターとし、
依存として教室アクターに外部から渡すこともできます。

しかしアクターには監督の概念があり、
複数のルートのアクターを利用するといくつか考慮しなければいけない点が出てきます。
この場でそこまで触れると、大きく道が外れてしまうのでいったんは触れずにおきましょう。

テスト開始

細かいテスト内容はさておき、テスト開始だけを表現します。
先生アクターが生成された後何かしらのメッセージを送らなければ、
先生アクターはただ存在しているだけ、となります。

ここでは教室アクターから先生アクターにテスト開始準備の指示をメッセージで送信します。

flowchart TD
subgraph ActorSystem
ClassroomActor("教室アクター\n(算数)") --> |生成\nテスト開始準備| Teacher("先生アクター")
end

テスト準備開始を受け取ると先生は生徒アクターにテスト開始を指示します。

問題を解く

これまで生徒アクターを生成していませんでしたね。
生徒アクターを生成するタイミングはどこでも構いませんが、
今回はテスト開始の指示が来たタイミングで先生アクターが生徒アクターたちを生成します。

flowchart TD
subgraph ActorSystem
Teacher("先生アクター") --> |テスト開始\n生成|StudentActor("生徒アクター")
end

テスト開始を指示されると、生徒アクターたちが問題に向き合います。

テストの解答用紙を提出する

問題を解くと生徒アクターが解答用紙を先生アクターに提出します。

flowchart TD
subgraph ActorSystem
StudentActor("生徒アクター") --> |解答用紙提出|Teacher("先生アクター")
end

先生アクターが生徒アクターたちを周回して回答用紙を回収するようなパターンも表現できます。
この辺りはEIP(Enterprise Integration Patterns)に寄っていきますので、
後の回にしましょう。

全員の提出が終わり授業が終了する

生徒アクター全員が解答用紙を提出すると、テストが終わったとみなされます。
テストが終わると、授業時間も終了するものとして表現します。

先生アクターが全生徒アクターの回答を受け取り、その情報を教室アクターに通知します。

flowchart TD
subgraph ActorSystem
StudentActor("生徒アクター") --> |解答用紙提出|Teacher("先生アクター") 
Teacher("先生アクター") --> |テスト終了|ClassroomActor("教室アクター\n(算数)")
end

これで全体の流れでざっと作れそうですね。
さっそく実装に入っていきましょう。

メッセージを決める

前回の説明にもあったように、アクターはメッセージのみやり取りができます。
慣れ親しんだプログラミングのように、他アクターのメソッドを直接コールすることはできません。

何かをして欲しい場合はすべてメッセージにする必要があります。
落とし込んだものを例にメッセージをざっくりと考えていきましょう。
今回は下記のようにします。

package command

type StartClass struct {
    Subject string
}

type PrepareTest struct {
    Subject  string
}

type StartTest struct {
    Subject string
}

type SubmitTest struct {
    Subject string
    Name    string
}

type FinishTest struct {
    Subject string
}

ClassStarts は授業開始、
PrepareTest はテストの準備、
StartTest はテストの開始、
SubmitTest は解答用紙提出。
ReceiveTest は解答用紙の受け取り、
FinishTest は授業終了

これらを指示するものとしました。

最後に授業が終了したことを示すものを下記のようにします。

package event

type ClassFinished struct {
    Subject string
}

command(コマンド)とevent(イベント)に分かれているのはなぜ?
と思う方もいると思います。

今回はES+CQRSのような作りにはしないため、messageという形で統一してもよいのですが、
せっかくなのでコマンドとイベントの違いを覚えておきます。

コマンド はアクターが実行すべきアクション、状態変更などを指示するものとして表現します。
例にある授業開始などはアクションの指示になっています。
ES+CQRSに興味がある方はご存知かもしれませんが、このアクションは一般的に永続化対象にはなりません。

イベント はアクターやシステムの状態変更を表すものとして表現します。
コマンドを受け取った後、システムがどうなったか、をイベントとします。
通常はxxした、などの過去形で表し、ES(イベントソーシング)で永続化の対象とするものです。
今回の例では、授業終了の指示がありますが、
指示の結果どうなったか、を後々に取り出すため「授業が終了した」として
イベントをひとつだけ用意しています。
永続化の詳しい解説は後の回で解説予定です。
今はそういうものなんだな、と覚えておきましょう。

生徒アクターを実装する

まずは一番簡単な生徒アクターを実装します。
細かい名称はどのようなものでも構いません。
package名などは環境に合わせて適当に置き換えて実装してみてください。

前回の解説にもあったように、
アクターとして機能させるには actor.Actor インタフェースを満たす必要があります。
Receive(context actor.Context) メソッドを実装します。

ただ動作させるだけであれば下記のもので十分ですが、 これだけでは何もしないアクターになってしまいます。

package student

import (
    "github.com/asynkron/protoactor-go/actor"
)

type Actor struct{}

func NewActor() actor.Actor {
    return &Actor{}
}

func (a *Actor) Receive(context actor.Context) {}

生徒はテスト開始指示を受けて解答するのは、決まっています。
つまりcommand.StartTestを受け取った時に動作するようにすれば良いということがわかります。

生徒は一人一人提出までの時間が異なるものとして、
解答時間を1-9秒で設定してみます。
並行で動作しますので、生徒が何人いても最大時間は変わりません。
そして解答結果を送り返せば期待する生徒アクターとして満たせるということになります。

package student

import (
    "fmt"
    "math/rand"
    "time"

    "github.com/asynkron/protoactor-go/actor"
    "github.com/acme/sample/command"
)

type Actor struct{}

func NewActor() actor.Actor {
    return &Actor{}
}

func (a *Actor) Receive(context actor.Context) {
    switch msg := context.Message().(type) {
    case *command.StartTest:
        // ランダムで解答時間を1-9秒で設定する
        randTime := rand.Intn(9) + 1
        time.Sleep(time.Duration(randTime) * time.Second)
        // 生徒がテストの問題を解く
        context.Logger().Info(
            fmt.Sprintf("%s が %s テストの解答を提出します", context.Self().GetId(), msg.Subject))
        context.Send(context.Parent(), &command.SubmitTest{Subject: msg.Subject, Name: context.Self().GetId()})
        context.Poison(context.Self())
    }
}

実装はこのようにしました。

ここで注目しておくものがいくつかあります。

まずひとつは context.Self().GetId() です。

これは生徒アクター自身のアドレスを文字列で取得できます。
ここでは誰がテストの回答を提出するのか、を出力するために利用していますが、
context.Self() 自体がアドレスになる *actor.PID となっています。

アクターはアドレスによって識別されるため、
送信するメッセージに *actor.PID() があれば、メッセージの送信元、もしくは戻して欲しい宛先を指示できます。

次に context.Send です。
これは送信先アドレス(相手)を指定してメッセージを送信する、というメソッドです。
第一引数に送信先のアドレス、第二引数に送信したいメッセージを指定します。

このメソッドは、アクターのメッセージ送信でもっとも基本的なものです。

ただしあくまで1方向の送信であり、
このメソッドを使ってメッセージを受信した場合、
アドレスを直接指定しない限り、送信元に応答メッセージを返信することはできません。

この辺りは後の回で詳しく解説します。

さいごに context.Poison です。
これはアクターを終了させるメソッドになっています。
アクターを終了させるということは、そのアクターが持っているリソースを解放するということになります。
とくに理由がない限り、アクターは終了させるようにしましょう。

終了のさせ方はいくつかありますが、
このメソッドはアクターごとのメールボックス内のメッセージを処理した後に停止します。

メールボックスについても後の回で解説します。

先生アクターを実装する

次に先生アクターを実装します。

type Actor struct {
    students   []int
    endOfTests []command.SubmitTest
    replyTo    *actor.PID
    mutex      sync.Mutex
}

func NewActor(students []int, replyTo *actor.PID) actor.Actor {
    return &Actor{
        students:   students,
        replyTo:    replyTo,
        endOfTests: []command.SubmitTest{},
    }
}

先生アクターは生徒アクターを生成するための情報を持っています。
ここでは students というフィールドに生成するstudentアクターの数を持っています。
endOfTestsはテストが終わった生徒アクターの情報を持つためのものです。

アクターはすべて独立して動作するため、
他のアクターなどからアクセスすることはできず、
アクター自身で状態を持つように実装する必要があります。

先ほどの流れから、先生アクターは教室アクターからテスト開始準備と
生徒アクターからの解答用紙提出を受け取ります。
それぞれのメッセージを受け取った時の動作を実装します。

package teacher

import (
    "fmt"

    "github.com/asynkron/protoactor-go/actor"
    "github.com/acme/sample/command"
    "github.com/acme/sample/student"
)

type Actor struct {
    students   []int
    endOfTests []command.SubmitTest
    replyTo    *actor.PID
    mutex      sync.Mutex
}

func NewActor(students []int, replyTo *actor.PID) actor.Actor {
    return &Actor{
        students:   students,
        replyTo:    replyTo,
        endOfTests: []command.SubmitTest{},
    }
}

func (a *Actor) Receive(context actor.Context) {
    switch msg := context.Message().(type) {
    case *command.PrepareTest:
        context.Logger().Info("先生が", msg.Subject, "テストを出しました")
        for _, st := range a.students {
            sta, _ := context.SpawnNamed(
                actor.PropsFromProducer(student.NewActor),
                fmt.Sprintf("student-%d", st))
            context.Send(sta, &command.StartTest{Subject: msg.Subject})
        }
        // 生徒がテストを提出する
    case *command.SubmitTest:
        context.Logger().Info(
            fmt.Sprintf("先生が %s の %s テストの解答を受け取りました", msg.Name, msg.Subject))
        a.endOfTests = append(a.endOfTests, *msg)
        // 全員提出したら先生がテストの解答を受け取る
        if len(a.endOfTests) == len(a.students) {
            context.Send(a.replyTo, &command.FinishTest{Subject: msg.Subject})
        }
    }
}

Proto Actorはswitch文でメッセージに対応する処理を記述します。
*command.PrepareTest はテスト開始準備、
*command.SubmitTest は解答用紙提出に対応した処理になっています。
*command.PrepareTest 受信後に生徒アクターを生成してテスト開始指示を送信します。

flowchart TD
subgraph ActorSystem
教室アクター --> |*command.PrepareTest|先生アクター
先生アクター --> |*command.StartTest|生徒アクター
生徒アクター --> |*command.SubmitTest|先生アクター
end

ここではstudentsに格納されている数だけ生徒アクターを生成しています。
アクター生成には context.SpawnNamed を利用しています。
これはアクターに任意の名前をつけて生成するメソッドです。

ここではstudent-1, student-2, student-3, ... というように名前をつけています。

flowchart TD
subgraph ActorSystem
Teacher("先生アクター") --> |生成|student-1
Teacher("先生アクター") --> |生成|student-2
Teacher("先生アクター") --> |生成|student-3
Teacher("先生アクター") --> |生成|student-4
Teacher("先生アクター") --> |生成|student-5
end

名前をつけることでアドレス生徒アクター毎に付与され、
だれがどのような動作をしているかを把握しやすくなります。
名前をつけない場合は自動でアドレスが付与されます。

ここで名前を指定して払い出されたアドレスが生徒アクター側の context.Self().GetId() で取得できます。

context.Send(sta, &command.StartTest{Subject: msg.Subject}) は、
生成された生徒アクターたちにテスト開始を送信しています。

*command.SubmitTest は生徒アクターからの解答用紙提出メッセージを受け取った時の処理です。
受け取った生徒アクターの情報をendOfTestsに追加し、参加者全員が提出したら、
返信先であるreplyTo(送信先は教室アクター)にテスト終了を送信しています。

flowchart TD
subgraph ActorSystem
先生アクター --> |*command.FinishTest|教室アクター
end

これで先生アクターの実装は完了です。

アクターならではの実装

Proto ActorはAkka/Pekkoと同様に「let it crush(クラッシュするならさせておけ)」をポリシーとしています。
ここで詳しく説明すると大きくなるため、後の回で解説します。

アクターが復活するプロセスを少しだけ見てみましょう。

func (a *Actor) Receive(context actor.Context) {
    switch msg := context.Message().(type) {
    case *actor.Restarting:
        context.Send(context.Self(), &command.PrepareTest{Subject: "math"})
    case *command.PrepareTest:
        // 何かしらの処理
        panic("なにかしらのエラー")
        // 省略
    }
}

たとえば何かの処理が失敗した場合、
Goで実装する場合は気軽に panic を使うということはあまりありませんが、
Proto Actorの場合、panic はシステムの停止ではなく、アクターをクラッシュさせるために用いることができます。

この場合どのようになるでしょうか?
いつものようなアプリケーションであればコンテナーが停止するほどのものとして扱われますが、
アクターシステムではpanicでアクターが停止すると、そのアクターが再起動される仕組みになっています。
親アクターが、子アクターの状態を監視することでこうした復帰が可能になっており、
他のアクターにはなんら影響を与えません。

再起動はあくまでデフォルトの動作で、クラッシュ時に何をするかは設定で変更できます。
これも後の回で解説します。

アクターが再起動すると、特殊な *actor.Restarting メッセージが送信されてきます。
これを用いて再起動後の処理を記述できます。
このアクターがメモリに残っている情報は消えてしまいますので、永続化などがセットになりますが、
単純な再起動の場合はこのように記述できます。

この例では再起動後に自身にテスト開始指示を送信しています。
こうすることで再度テスト開始指示に対応した処理が勝手に行われるようになります。
(このまま実行すると停止するまで実行し続けるので気をつけてくださいw)

教室アクターを実装する

これまでの実装ととくに異なるところはありません。
同じように実装していきましょう。

package classroom

import (
    "fmt"

    "github.com/asynkron/protoactor-go/actor"
    "github.com/acme/sample/command"
    "github.com/acme/sample/event"
    "github.com/acme/sample/teacher"
)

type Actor struct {
    students []int
}

func NewActor(students []int) func() actor.Actor {
    return func() actor.Actor {
        return &Actor{
            students: students,
        }
    }
}

func (class *Actor) Receive(context actor.Context) {
    switch msg := context.Message().(type) {
    case *command.StartsClass:
        pid := context.Spawn(actor.PropsFromProducer(func() actor.Actor {
            return teacher.NewActor(class.students, context.Self())
        }))
        context.Send(pid, &command.PrepareTest{Subject: msg.Subject})
    case *command.FinishTest:
        fmt.Println(&event.ClassFinished{Subject: msg.Subject})
        context.Poison(context.Self())
    }
}

これまでと異なるところはとくにありません。
NewActor の第一引数に生徒の数を受け取るようにしています。
これは教室アクターが生成される際に生徒アクターを生成するための情報として利用します。

*command.StartsClass がルートから送られてきた場合に、
先生アクターをひとつだけ生成しています。
この例では context.Spawn で生成していますので、
先生アクターの説明にもあったようにランダムなアドレスが付与されます。

先生アクターを生成した後に *command.PrepareTest を送信しています。
これで先生アクターがテスト開始準備を行うようになります。

*command.FinishTest が送られてきた場合には、
授業が終了したことを示す *event.ClassFinished を出力しています。
そして context.Poison で自身を終了させています。

これで教室アクターの実装は完了です。

教室アクターを起動する

これまでの実装で教室アクターを起動してみましょう。

package main

import (
    "fmt"

    console "github.com/asynkron/goconsole"
    "github.com/asynkron/protoactor-go/actor"
    "github.com/acme/sample/classroom"
    "github.com/acme/sample/command"
)

func main() {
    system := actor.NewActorSystem()
    cr, err := system.Root.SpawnNamed(
        actor.PropsFromProducer(
            classroom.NewActor(students())),
        "math-classroom")
    if err != nil {
        return
    }
    system.Root.Send(cr, &command.StartsClass{Subject: "算数"})
    _ , _ = console.ReadLine()
}

func students() []int {
    sts := make([]int, 20)
    for i := 0; i < 20; i++ {
        sts[i] = i
    }
    return sts
}

アクターシステムを actor.NewActorSystem() で生成し、
これまでの実装と同じように system.Root.SpawnNamed で教室アクター("math-classroom")を生成しています。
このアドレスは、配下のアクターすべてに影響を与えるものとなります。
とはいえ、そこまで大袈裟なものではありません。

今回の例では、生徒アクターのアドレスは下記のようになります。

math-classroom/$2/student-9

math-classroom は教室アクターのアドレスで、
$2 は先生アクターのアドレス、student-9 は生徒アクターになります。

アクターの階層構造(ヒエラルキー)がわかるようになっていますので、
名前をあらかじめ決定することで階層構造をうまく扱うことができ、
ある階層のアクターすべてにメッセージを通じて現在の状態を返信してもらう、といったことができます。

ほかにこれまでの実装と異なるのは、ルートになるアクターは system.Root を利用しているところです。
このルートアクターはアクターシステムの中心となるわけです。

さいごに system.Root.Send(cr, &command.StartsClass{Subject: "算数"})
教室アクターに授業開始指示を送信しています。

これで教室アクターが起動され、授業が開始から終了まで処理されるようになります。
コンソールに出力されるのを確認しましょう。

コンソールで実行すると?

最後のメッセージをアクターシステム外で使いたいんだけど・・

コンソールに出力された *event.ClassFinished をアクターシステム外で使いたい場合はどうしたらいいでしょうか? goroutineを使ってアクターシステム外で使うこともできますが、
もっとアクターシステムらしいメッセージの取り出し方があります。

アクターモデルは「すべてのものはアクターである」ということを忘れないでください。

Proto Actorには TypedStream というものがあり、
情報の流れであるストリームもアクターとして扱い、
かつ型指定することでストリームに流れるメッセージを取り出すことができます。
(channelを使っているだけですが・・)

シンプルな仕組みの場合はこの TypedStream を使ってメッセージを取り出すことができます。
(ほかにも方法はもちろんあります)

一番最後に出力される *event.ClassFinished を取り出すようにしてみましょう。

package main

import (
    "fmt"

    "github.com/asynkron/protoactor-go/actor"
    "github.com/asynkron/protoactor-go/stream"
    "github.com/acme/sample/classroom"
    "github.com/acme/sample/command"
    "github.com/acme/sample/event"
)

func main() {
    system := actor.NewActorSystem()
    p := stream.NewTypedStream[*event.ClassFinished](system)
    cr, err := system.Root.SpawnNamed(
        actor.PropsFromProducer(
            classroom.NewActor(p.PID(), students())),
        "math-classroom")
    if err != nil {
        return
    }
    go func() {
        system.Root.Send(cr, &command.StartsClass{Subject: "算数"})
    }()
    r := <-p.C()
    fmt.Printf("%s テストが終了しました\n", r.Subject)
}

func students() []int {
    sts := make([]int, 20)
    for i := 0; i < 20; i++ {
        sts[i] = i
    }
    return sts
}

stream.NewTypedStream[*event.ClassFinished](system)TypedStream を生成し、
system.Root.SpawnNamed で教室アクターに TypedStreamアクターのアドレス(p.PID())を渡しています。

教室アクターは、さいごにストリームアクターに *event.ClassFinishedを送信すれば、
<-p.C() で取り出すことができます。

flowchart TD
subgraph ActorSystem
教室アクター --> |Send|ストリームアクター
end
ストリームアクター --> |channel|アクターシステム外

教室アクターの実装は下記のようになります。

type Actor struct {
    stream   *actor.PID
    students []int
}

func NewActor(stream *actor.PID, students []int) func() actor.Actor {
    return func() actor.Actor {
        return &Actor{
            stream:   stream,
            students: students,
        }
    }
}

func (class *Actor) Receive(context actor.Context) {
    switch msg := context.Message().(type) {
    // 省略
    case *command.FinishTest:
        context.Send(class.stream, &event.ClassFinished{Subject: msg.Subject})
        context.Poison(context.Self())
    }
}

これでアクターシステム内のメッセージを取り出すことができます。
ただしタイムアウトしそうなくらい長い時は?とか、タイムアウトしてしまった時は?など考慮する点はありますので、
注意しましょう。

サンプルコードはこちら

github.com

goroutineとアクターの違い

Go言語に慣れている方は、goroutineと何が違うんだろう?と思うかもしれません。
コード上でみると似たようなものに見えると思います。

Go自体はCommunicating Sequential Processes(CSP)(1978)をベースにしており、
アクターはそれよりも前の時代(1973)のものという違いもありますが。

goroutineはGo言語のランタイムに組み込まれている軽量で非同期に実行するしくみです。
channelを利用して高い並行性を実現します。
(channelにもアクターと同じようにIDがあります)

アクターモデルは個々のアクターの隔離を行い、
メッセージパッシングによる並行性を実現します。
今回の例ではローカルで動作するようにしていますが、
先生アクターや生徒アクターがそれぞれ別サーバで動作するようになっていても基本は変わりません。

今回は省いていますが、各アクターはそれぞれ自身のメールボックスを持ち、
そのメールボックスにあるもののみを処理するという特徴があります。
メモリの共有を行わずアクターがそれぞれ独立し、
言語のランタイム・物理的な配置にかかわらず動作するモデルです。
そうした隔離性と耐障害性(クラッシュ後の戦略など)によりシステムの堅牢性が高まる設計になっています。

コード上では似てそうですが問題解決の範囲が異なるため、
それぞれの特性を理解して使い分けられると良いのではないでしょうか。

回を追うごとにちょっとずつ別もの感が出てきますので、お楽しみに。

さいごに

今回はアクターモデルの基本的な実装を行いました。
なにやらおもしろそうなことがたくさんありますが、
次回はより詳しくなるために各コンポーネントなどについて解説します。(予定)

ちょっと古いけどProto Actorにはちょうどいい本

おすすめのエレキギター用 弦