Goで実践 アクターモデル vol.6 Supervision / スーパーバイザー戦略

Supervisionの続き

前回はアクターモデルの監視・監督について説明していきました。
特に親子関係による監視・監督についてはなんとなく理解することができたのではないかと思います。

今回は親子関係にないアクター間の監視について触れていきます。

blog.ytake.jp.net

ライフサイクルモニタリングとは

前回までの親子の関係とは異なり、
各アクターは他のアクターを監視することができます。

これまで説明してきたようにアクターはこれまでの伝統的なアプリケーション、もしくはオブジェクトというよりも
有機的ななにかに見えて来ると思います(そのようにイメージしたほうが習得は早いと思います)。

まず、アクターは生成時に完全に生きている状態となります。
そしてアクターは停止などをすることによって生存していない、死んだ状態となります。
これの状態を理解できるのは監督者に当たる親以外は通常わかりません。

しかしこの「生きている状態」から「死んでいる状態」への変化は監視で利用することができます。
監督は何かしらの障害に反応しますが、
監視はこの「死んだ状態」 つまりアクターの終了に反応するために利用できます。

「生きている状態」から「死んでいる状態」への変化をライフサイクルとしてこれを監視するのが、
ライフサイクルモニタリングというわけです。

ライフサイクル監視は監視アクターが受け取る actor.Terminated メッセージを使用して実装できます。

actor.Terminated メッセージを受信するために監視を開始しなければなりません。
これは Context.Watch(targetPID) をコールすることで開始できます。
最後に監視の停止ですが、Context.Unwatch(targetPID) をコールするだけです。

重要なのは 監視リクエストと対象の終了がどの順序で発生してもメッセージが配信されることです。
つまり、登録時に対象が停止状態にあってもメッセージを受け取ることができます。

監督者に当たるアクターが子アクターを単に再起動できない場合、
例えばアクターの初期化中にエラーが発生したときなどに、子アクターを終了させる必要があります。

そんなとき、監視は特に役立ちます。
監督者はその子アクターを監視し、終了したら再作成するか、後で再試行するようにスケジュールすることができます。

例として、Webサーバーアクターがあるとしましょう。
Webサーバーアクターは複数の子アクターを使ってリクエストを処理しますが、初期化時にデータベース接続が失敗した場合、
子アクターを再起動しても問題は解決しません。

この場合、監督者は子アクターの終了を監視し、データベース接続の再試行をスケジュールすることができます。

もう一つの一般的な使用例は、アクターが外部リソースの不在時に失敗する必要がある場合です。
この外部リソースはそのアクター自身の子であることもあります。
例えば、監督者アクターがデータベース接続アクターを持っていて、
第三者が Context.Stop(pid) メソッドや Context.Poison(pid) を使ってその子アクターを終了させると、
監督者もその影響を受ける可能性があります。
そんなときに利用することができます。

実装の仕方

実際にどのようにWatchするのか、少しだけ見てみましょう。

コード自体は非常にシンプルです。
前回のコードを使って少し変更してみましょう。

まずは前回のparentActorを少しだけ変えていきます。
と言っても特定のメッセージ受信時にアクターを停止させるだけです。

func newParentActor() actor.Actor {
    return &parentActor{}
}

func (state *parentActor) Receive(context actor.Context) {
    switch msg := context.Message().(type) {
    case *hello:
        if msg.Who == "ytake" {
            context.Stop(context.Self())
        }
    }
}

上記の様に helloにytake があった場合に停止するコードとしました。

次に親子関係にもならない otherActor を実装します。
Proto Actorの内部でも利用している actor.Watch(何でもいいです)を受信したら、
メッセージのWatcherにあるPIDの監視を行うように
context.Watch(msg.Watcher) をコールします。
解説の文章にあったものと全く同じです。

そして actor.Terminated が流れてきたら誰が停止したのかわかるように出力できるようにしてみます。

func newOtherActor() actor.Actor {
    return &otherActor{}
}

func (state *otherActor) Receive(context actor.Context) {
    switch msg := context.Message().(type) {
    case *actor.Watch:
        fmt.Printf("watch: %s\n", msg.Watcher)
        context.Watch(msg.Watcher)
    case *actor.Terminated:
        fmt.Println("terminated")
        fmt.Println(msg.Who)
    }
}

この2つのアクターをRootに配置して独立させ、
otherActorにparentActorを監視させる、ということです。
親子関係にないアクター間はどのようになるのでしょうか、と。

Rootにはいくつアクターを配置しても構いませんので、下記のように生成してみましょう。

func main() {
    system := actor.NewActorSystem()
    rootContext := system.Root
    pid := rootContext.Spawn(
        actor.PropsFromProducer(newParentActor))
    other := rootContext.Spawn(
        actor.PropsFromProducer(newOtherActor))
    rootContext.Send(other, &actor.Watch{Watcher: pid})
    rootContext.Send(pid, &hello{Who: "ytake"})
    _, _ = console.ReadLine()
}

ここではotherActorにはWatchのためのメッセージを送信し、
parentActorには ytakeが含まれるメッセージを送信しています。
つまり停止と監視の依頼をそれぞれにお願いしています。

これを実行すると下記のようになります。

10:58PM INF actor system started lib=Proto.Actor system=hyzNdq69x5Qm9tCJqXbGhc id=hyzNdq69x5Qm9tCJqXbGhc
watch: Address:"nonhost"  Id:"$1"
10:58PM INF [DeadLetter] lib=Proto.Actor system=hyzNdq69x5Qm9tCJqXbGhc pid="Address:\"nonhost\"  Id:\"$1\"" message="Watcher:{Address:\"nonhost\"  Id:\"$2\"}" sender=<nil>
terminated
Address:"nonhost"  Id:"$1"

いつものアクターシステムが起動したことを示すメッセージのあと、
Watchを受け取ったotherActorがどのアクターを監視対象にするのか、を出力しています。
Address:"nonhost" Id:"$1" これがparentActorのアドレスになります(識別子)

このあとparentActorが停止、
DeadLetterにparentActorが停止してメッセージが届かなくなったことがイベントストリームとして流れています(Infoなので出ているだけです)

その後の下記の部分がotherActorのactor.Terminatedに流れてきたことがわかる出力となっています。

terminated
Address:"nonhost"  Id:"$1"

このようにして親子関係にないアクター間であっても監視することができます。
簡単!

コード全体は以下の通りです。

package main

import (
    "fmt"

    console "github.com/asynkron/goconsole"
    "github.com/asynkron/protoactor-go/actor"
)

type (
    hello       struct{ Who string }
    parentActor struct{}
    otherActor  struct{}
)

func newParentActor() actor.Actor {
    return &parentActor{}
}

func (state *parentActor) Receive(context actor.Context) {
    switch msg := context.Message().(type) {
    case *hello:
        if msg.Who == "ytake" {
            context.Stop(context.Self())
        }
    }
}

func newOtherActor() actor.Actor {
    return &otherActor{}
}

func (state *otherActor) Receive(context actor.Context) {
    switch msg := context.Message().(type) {
    case *actor.Watch:
        fmt.Printf("watch: %s\n", msg.Watcher)
        context.Watch(msg.Watcher)
    case *actor.Terminated:
        fmt.Println("terminated")
        fmt.Println(msg.Who)
    }
}

func main() {
    system := actor.NewActorSystem()
    rootContext := system.Root
    pid := rootContext.Spawn(
        actor.PropsFromProducer(newParentActor))
    other := rootContext.Spawn(
        actor.PropsFromProducer(newOtherActor))
    rootContext.Send(other, &actor.Watch{Watcher: pid})
    rootContext.Send(pid, &hello{Who: "ytake"})
    _, _ = console.ReadLine()
}

スーパーバイザー戦略

これまで監視に関連したものを解説しましたが、
外部リソースなどの影響でアクターが障害時にどのように再起動させていくかを実装できます。

例えばデータベースなどの高負荷状態にあるものに影響してアクターがクラッシュしてしまう場合、
少しずつ時間を空けて再起動と再接続を行うような戦略などがあります。

Proto Actorではpanicを発生させるとクラッシュした、とみなされるようになっていますので、
errorの返却ではなくpanicでなければ補足されない、と思っておくと良いでしょう。

ExponentialBackoffStrategy

これは指数関数的に失敗からリスタートするまでの時間を増やしながらアクターを再起動していく戦略です。

この戦略は下記のものが提供されています。

actor.NewExponentialBackoffStrategy(10*time.Second, 1*time.Second))))

第1引数はbackoffWindow (time.Duration) となっており、どんな時間枠で再起動を行うかを指定します。
第2引数はinitialBackoff (time.Duration) で、最初に再起動を開始する時間です。
その後リトライ期間が伸びていくようになります。

実際の利用例は下記の通りです。

package main

import (
    "fmt"
    "time"

    console "github.com/asynkron/goconsole"
    "github.com/asynkron/protoactor-go/actor"
)

type (
    hello       struct{ Who string }
    parentActor struct{}
    childActor struct{}
)

func (state *parentActor) Receive(context actor.Context) {
    switch msg := context.Message().(type) {
    case *hello:
        child := context.Spawn(
            actor.PropsFromProducer(newChildActor))
        context.Send(child, msg)
    }
}

func newParentActor() actor.Actor {
    return &parentActor{}
}

func (state *childActor) Receive(context actor.Context) {
    switch context.Message().(type) {
    case *actor.Started:
        fmt.Println("Starting, initialize actor here")
    case *actor.Stopping:
        fmt.Println("Stopping, actor is about to shut down")
    case *actor.Stopped:
        fmt.Println("Stopped, actor and its children are stopped")
    case *actor.Restarting:
        fmt.Println("Restarting, actor is about to restart")
    case *hello:
        panic("oops")
    }
}

func newChildActor() actor.Actor {
    return &childActor{}
}

func main() {
    system := actor.NewActorSystem()

    rootContext := system.Root
    pid := rootContext.Spawn(
        actor.PropsFromProducer(newParentActor,
            actor.WithSupervisor(
                actor.NewExponentialBackoffStrategy(10*time.Second, 1*time.Second))))
    rootContext.Send(pid, &hello{Who: "ytake"})
    _, _ = console.ReadLine()
}

親に当たるアクターに対して、NewExponentialBackoffStrategy戦略利用を指定しています。

   pid := rootContext.Spawn(
        actor.PropsFromProducer(newParentActor,
            actor.WithSupervisor(
                actor.NewExponentialBackoffStrategy(10*time.Second, 1*time.Second))))

ほかはこれまでと同様ですが、childActorがどのような挙動をするのかわかるようにログ出力を追加しています。

実際に実行するとログ出力は下記のようになります(一気に複数回投げた例)。

ログ出力

panic発生時にRecoveringがすぐに出力されますが、
restartingは指定した1秒後に開始されます。
この例では1度再起動すると通常の安定したアクターとなりますので再起動は一度だけ行われます。
panic発生時にその時のメッセージを再処理する、という振る舞いにはならないため
再び親アクターから再送しなければいけませんので注意しておきましょう。

OneForOneStrategy と AllForOneStrategy

さらにProto.Actor にはOneForOneStrategy と AllForOneStrategy という 2 つの戦略が用意されています。
どちらも子アクターが完全に停止する前に失敗できる回数の制限が設定されています。

この2つの違いですが、
OneForOneStrategyは失敗した子アクターにのみ適用されるのに対し、 AllForOneStrategyは同じ階層に位置するすべての子アクターにも適用されます。

デフォルトはOneForOneStrategyとなっていますが、 利用シーンや負荷分散などの戦略に合わせて使い分けると良いでしょう。

OneForOneStrategyを利用する場合は actor.NewOneForOneStrategy をSpawn時に利用しますが、
ExponentialBackoffStrategyと異なり、再起動以外の指示が選択できるようになっています。
この選択肢・指示(ディレクティブ)として次のような違いがあります。

ResumeDirective

エラーが発生したアクターを再開し、メッセージの処理を続行するよう指示。
エラーは無視され、アクターはそのままの状態でメッセージ処理を続けます。

RestartDirective

エラーが発生したアクターを破棄し、新しいインスタンスに置き換える(再起動)指示。
アクターの状態は初期化され、再起動後に新しいインスタンスとして動作を続けます。

StopDirective

エラーが発生したアクターを停止するよう指示。
停止されたアクターはメッセージの処理を終了しシステムから削除されます。

EscalateDirective

エラーの処理をアクターの親アクターにエスカレーションするよう指示。
このディレクティブにより、エラーの処理が親に引き継がれます。

この4つのディレクティブを使ってどのようにしていくか、戦略を決めていくと良いでしょう。
上記のものを指定する場合は下記の型を利用するだけです。

// DeciderFunc is a function which is called by a SupervisorStrategy
type DeciderFunc func(reason interface{}) Directive
   decider := func(reason interface{}) actor.Directive {
        fmt.Println("handling failure for child")
        return actor.StopDirective
    }

reasonに合わせてディレクティブを複数与えることもできます(panicのメッセージが渡されます)。
この指定と actor.NewOneForOneStrategy を組み合わせて利用します。

   decider := func(reason interface{}) actor.Directive {
        fmt.Println("handling failure for child")
        return actor.StopDirective
    }
    supervisor := actor.NewOneForOneStrategy(10, time.Nanosecond, decider)
    rootContext := system.Root
    pid := rootContext.Spawn(
        actor.PropsFromProducer(newParentActor,
            actor.WithSupervisor(supervisor)))

第1引数は 試行回数、第2引数は試行する期間を指します。
第3引数はディレティブについてです。

これらを使って簡単に動かしてみましょう。
OneForOneとのことで子アクターを2つ生成するようにしてみましょう。
ここではchildActorOneとchildActorTwoを用意します。

type (
    hello         struct{ Who string }
    parentActor   struct{}
    childActorOne struct{}
    childActorTwo struct{}
)

func (state *childActorOne) Receive(context actor.Context) {
    switch context.Message().(type) {
    case *actor.Started:
        fmt.Println("Starting, child actor 1")
    case *actor.Stopping:
        fmt.Println("Stopping, child actor 1")
    case *actor.Stopped:
        fmt.Println("Stopped, child actor 1")
    case *actor.Restarting:
        fmt.Println("Restarting, child actor 1")
    case *hello:
        panic("oops")
    }
}

func newChildActorOne() actor.Actor {
    return &childActorOne{}
}

func (state *childActorTwo) Receive(context actor.Context) {
    switch context.Message().(type) {
    case *actor.Started:
        fmt.Println("Starting, child actor 2")
    case *actor.Stopping:
        fmt.Println("Stopping, child actor 2")
    case *actor.Stopped:
        fmt.Println("Stopped, child actor 2")
    case *actor.Restarting:
        fmt.Println("Restarting, child actor 2")
    case *hello:
        fmt.Println("Hello from parent")
    }
}
func newChildActorTwo() actor.Actor {
    return &childActorTwo{}
}

childActorOneはhelloメッセージを受信するとクラッシュするようにしています。
ログも識別できるように文字を入れています。

次にこれまで同様にparentActorを用意します。

func (state *parentActor) Receive(context actor.Context) {
    switch msg := context.Message().(type) {
    case *hello:
        child := context.Spawn(
            actor.PropsFromProducer(newChildActorOne))
        context.Forward(child)
        childTwo := context.Spawn(
            actor.PropsFromProducer(newChildActorTwo))
        context.Forward(childTwo)
    }
    }
}

func newParentActor() actor.Actor {
    return &parentActor{}
}

helloメッセージ受信時に前述のchildActorOneとchildActorTwoを生成してメッセージを転送(Forward)しています。
あとはいつもどおりメッセージを送信します。

func main() {
    system := actor.NewActorSystem()

    decider := func(reason interface{}) actor.Directive {
        fmt.Println("handling failure for child")
        return actor.RestartDirective
    }
    supervisor := actor.NewOneForOneStrategy(10, time.Nanosecond, decider)
    rootContext := system.Root
    pid := rootContext.Spawn(
        actor.PropsFromProducer(newParentActor,
            actor.WithSupervisor(supervisor)))
    rootContext.Send(pid, &hello{Who: "ytake"})
    _, _ = console.ReadLine()
}

childActorOneをクラッシュさせると
クラッシュしたアクターのみがディレクティブ(ここでは再起動)に従った挙動を行っているのがわかると思います。

次に actor.NewAllForOneStrategy に変更して実行してみましょう。

func main() {
    system := actor.NewActorSystem()

    decider := func(reason interface{}) actor.Directive {
        fmt.Println("handling failure for child")
        return actor.RestartDirective
    }
    supervisor := actor.NewAllForOneStrategy(10, time.Nanosecond, decider) // 変更
    rootContext := system.Root
    pid := rootContext.Spawn(
        actor.PropsFromProducer(newParentActor,
            actor.WithSupervisor(supervisor)))
    rootContext.Send(pid, &hello{Who: "ytake"})
    _, _ = console.ReadLine()
}

これを実行すると下記のログからわかるように、
childActorOneアクターがクラッシュすると、
同じ階層に位置するchildActorTwoアクターもディレクティブに合わせて実行されているのがわかると思います。

ここで紹介した戦略以外に、
中身を理解すると独自の戦略を実装することもできますので、
ぜひ挑戦してみてください。

おわりに

Proto Actorにはいくつかの監視や戦略が提供されているのが理解できたと思います。
このあたりは各ツールキットでほとんど同様に用意されていますので、
どれかを覚えると簡単に流用することができます。

アクターシステムを利用するうえでは避けては通れず、
このあたりを理解しているかどうかで障害に強くなるかどうかのポイントでもありますので、
簡単なコードを書いて実際に様々なパターンを試してみましょう!

次回はまたちょっと毛色の違うものを解説します。