Goで実践 アクターモデル vol.5 Supervision 1

はじめに

今回は、アクターモデルの監視・監督について説明します。
監督は親アクターが子アクターの状態を監督する機能で、
監視はアクターモデルをサポートするツールキットの特徴のひとつで、アクターが他のアクターの状態を監視する機能です。

一般的なWebアプリケーションフレームワークや、
一般的な処理フローからは想像しにくいかもしれませんが、
とあるアクターが異常終了した際に監視しているアクターに異常終了自体を通知したり、
親アクターが子アクターの再起動方法を指定できます。

これまで簡単にヒエラルキーについて解説しましたが、このヒエラルキーと監督は関連しており、
しっかり把握するのがアクターモデルを理解するための一歩となります。

そんな監督・監視の概念と、Proto.Actorにおいて監視とはどのような意味を持つかについて説明します。

前回

blog.ytake.jp.net

監督

これまでの説明からも親アクターは子アクターにタスクを委任することは理解できたと思いますが、
その子アクターの失敗に対応する責務を持っています(監督者になる)。
(Akka、Pekkoなども同様に監督・監視機能を持っています)
*ここで指す子アクターとは、自身で生成 (spawn) した処理を移譲するアクターすべてを指します。

子アクターの失敗を検知すると(Proto Actorではpanicを検知します)、
自分自身とすべての子アクターを一時停止し、失敗を通知するメッセージを送信します。
これには4つのオプションがあります。

  • Resume : アクターを再開させ、蓄積された内部状態を保持
  • Restart : アクターを再起動し、蓄積された内部状態をクリア
  • Stop : アクターを永久に停止し、メッセージ処理を行わない
  • Escalate : 階層内における自アクターに失敗を親アクターにエスカレートする

アクターモデルにおけるアクターは常にヒエラルキーの一部として考えなければなりません。
失敗に合わせてどのように子アクターに指示を出すのかを実装しながら考えておきましょう。
とりあえず再起動させてしまえ!でも構いません。

伝統的なアプリケーションのように、Exceptionをキャッチして処理するのではなく(GoなのでExceptionの仕組みはありませんが)、
アクターモデルでは失敗を受け取り、それに対応する戦略を決定することが重要です。

ではどんなことに気をつけておくべきなのか、少し説明します。
下記の動作は、ヒエラルキー内のアクターを操作するときに頭に入れておく必要があります。

アクターを再開すると、そのすべての子アクターが再開されます。
アクターを再起動すると、そのすべての子アクターが再起動されます。
同様に、アクターを終了すると、そのすべての子アクターも終了します。

アクターの再起動イベントのデフォルト動作は、再起動前にすべての子アクターを終了すること、となっていますが、
Proto Actorではデフォルトの動作を変更することができるようになっています。
ある程度理解してから自分のアプリケーションに合わせてカスタマイズしてみてください。

監督はpanicを検知すると上記の4つの選択肢のいずれかに変換できるように構成されています。
厳密にいうとpanic / recoverを内部的に利用して、アクターの失敗を検知しています。

この大本の処理は、mailboxのrun関数内で行われています。
開発者が直接的にこの関数を呼び出すことはありませんが、
独自のMailboxを実装する場合はこの仕組みを理解しておくと良いでしょう!

func (m *defaultMailbox) run() {
    var msg interface{}

    defer func() {
        if r := recover(); r != nil {
            m.invoker.EscalateFailure(r, msg)
        }
    }()
    i, t := 0, m.dispatcher.Throughput()
    for {
        if i > t {
            i = 0
            runtime.Gosched()
        }

        i++
        // keep processing system messages until queue is empty
        if msg = m.systemMailbox.Pop(); msg != nil {
            atomic.AddInt32(&m.sysMessages, -1)
            switch msg.(type) {
            case *SuspendMailbox:
                atomic.StoreInt32(&m.suspended, 1)
            case *ResumeMailbox:
                atomic.StoreInt32(&m.suspended, 0)
            default:
                m.invoker.InvokeSystemMessage(msg)
            }
            for _, ms := range m.middlewares {
                ms.MessageReceived(msg)
            }
            continue
        }

        // didn't process a system message, so break until we are resumed
        if atomic.LoadInt32(&m.suspended) == 1 {
            return
        }

        if msg = m.userMailbox.Pop(); msg != nil {
            atomic.AddInt32(&m.userMessages, -1)
            m.invoker.InvokeUserMessage(msg)
            for _, ms := range m.middlewares {
                ms.MessageReceived(msg)
            }
        } else {
            return
        }
    }
}

異常が発生したアクターを特別に識別するようなものはとくにありません。
特定のアクターにのみ異なる戦略を適用したいと思うこともあるかもしれませんが、
監督に関する理解が難しくなるため、この場合の推奨される方法は監督レベルを追加する、となります。

アクターを作成するときに監督についての戦略(スーパーバイザー戦略といいます)を設定できるため、
アクター毎に異なる戦略を持つアクターを事前に作成できる、ということになります。
このため、障害を受け取ったあとでなんとかするのではなく、
事前にどのように対処するかを決めておくことができます。

なおProto.Actor(Go)では「親の監督」のみがサポートされています。
アクターは他のアクター(親)によってのみ作成でき、トップレベルのアクターはアクターシステムによって提供されます。
このため親アクターは子アクターの監督者としての役割を果たします。

アクターが外部から監督されることはない、と保証されていますので、
意図しない別なところから不意にエラーが捕捉されることはない、としっかり理解しておきましょう。

なお監視はどのアクターでも担当できます。
そんな監視の動きを少しだけ見てみましょう。

監視の動きを見てみよう

親のアクターは子のアクターの監視者/ヒエラルキーの関係があると、と説明しましたが、
実際にどのような動きをするのか、少し見てみましょう。

親アクターは子アクターの停止状態を簡単に検知できます。

package main

import (
    "fmt"

    console "github.com/asynkron/goconsole"
    "github.com/asynkron/protoactor-go/actor"
)

type (
    hello       struct{ Who string }
    parentActor struct{}
)

func (state *parentActor) Receive(context actor.Context) {
    switch msg := context.Message().(type) {
    case *hello:
        props := actor.PropsFromProducer(newChildActor)
        child := context.Spawn(props)
        context.Send(child, msg)
    case *actor.Terminated:
        fmt.Println("child terminated", msg.Who)
    }
}

func newParentActor() actor.Actor {
    return &parentActor{}
}

type childActor struct{}

func (state *childActor) Receive(context actor.Context) {
    switch msg := context.Message().(type) {
    case *actor.Restarting:
        fmt.Println("restarting")
    case *hello:
        fmt.Printf("Hello %v\n", msg.Who)
        context.Stop(context.Self())
    }
}

func newChildActor() actor.Actor {
    return &childActor{}
}

func main() {
    system := actor.NewActorSystem()
    supervisor := actor.RestartingSupervisorStrategy()
    rootContext := system.Root
    props := actor.
        PropsFromProducer(newParentActor,
            actor.WithSupervisor(supervisor))
    pid := rootContext.Spawn(props)
    rootContext.Send(pid, &hello{Who: "ytake"})
    _, _ = console.ReadLine()
}

このコードは、親アクターが子アクターを生成し、子アクターがメッセージを受け取ると停止する、というものです。
とくに特別な動作はしていませんが、childActor が停止すると actor.Terminated が親アクターに送信されます。

actor.Terminated は、アクターが停止したことを示すメッセージで、Who に停止したアクターの名前が入ります。
デフォルトでは子アクター以外のアクターは停止したことを検知できませんが、
context.Watch(targetPID) を使うことで、他のアクターの停止を検知できます。
また監視対象から外す場合は context.Unwatch(targetPID) を使います。

この例では次のような出力になります。

実際にはアプリケーション設計にふさわしい方法を選択してください。
次に監督の詳細をみていく前にアクターの再起動について説明します。

アクターの再起動とは

特定のメッセージ処理中にアクターに何らかのエラー発生した場合、失敗の原因は大体以下3つのカテゴリに分類されます。

  • 特定のメッセージのプログラムエラー(システムエラー)
  • メッセージ処理中に使用される外部リソースの(一時的な)失敗
  • アクターの内部状態の破損

失敗が特定可能でない場合、3番目の原因がもっとも一般的な原因と考え、
アクターは内部状態をクリアする必要があるという考え方があります。
つまり再起動することが一番安全かもしれない、ということです。

伝統的なアプリケーションでは再起動というのは難しい操作ですが、
Proto.Actorも含め、各言語のツールキットでは新しいアクターを生成し、
失敗したインスタンスを新しいものに置き換えることが簡単にできます。
そんなアクターモデルでは再起動が推奨されています。

再起動は他のアクターは他のアクターには影響も与えず、
新しいアクターはメールボックスの処理を再開しますが、
メッセージを過去に戻って再処理を行うということはありません。

過去に戻って再処理を行う場合は、
persistence、つまり永続化を利用することで実現できます。
永続化についてはまた別の機会に説明します。

実際にpanicを発生させて、再起動を確認してみましょう。

package main

import (
    "fmt"

    console "github.com/asynkron/goconsole"
    "github.com/asynkron/protoactor-go/actor"
)

type (
    hello       struct{ Who string }
    parentActor struct{}
)

func (state *parentActor) Receive(context actor.Context) {
    switch msg := context.Message().(type) {
    case *hello:
        props := actor.PropsFromProducer(newChildActor)
        child := context.Spawn(props)
        context.Send(child, msg)
    case string:
        props := actor.PropsFromProducer(newChildActor)
        child := context.Spawn(props)
        context.Send(child, msg)
    }
}

func newParentActor() actor.Actor {
    return &parentActor{}
}

type childActor struct{}

func (state *childActor) Receive(context actor.Context) {
    switch msg := context.Message().(type) {
    case *actor.Restarting:
        fmt.Println("restarting")
    case string:
        fmt.Printf("Hello %v\n", msg)
    case *hello:
        panic("Ouch")
    }
}

func newChildActor() actor.Actor {
    return &childActor{}
}

func main() {
    system := actor.NewActorSystem()
    supervisor := actor.RestartingSupervisorStrategy()
    rootContext := system.Root
    props := actor.
        PropsFromProducer(newParentActor,
            actor.WithSupervisor(supervisor))
    pid := rootContext.Spawn(props)
    rootContext.Send(pid, &hello{Who: "ytake"})
    rootContext.Send(pid, "ytake")

    _, _ = console.ReadLine()
}

では少し解説します。

上記のコードは、親アクターが子アクターを生成し、子アクターがpanicを発生させるというものです。

parentActorhellostring を受け取り、それぞれ子アクターを生成してメッセージを送信します。
このとき生成されるアクターは childActor です。
この例ではメッセージを受けるたびに生成されるのあまりメモリには優しくありませんが、
サンプルとしてはこのようにしています。

func (state *parentActor) Receive(context actor.Context) {
    switch msg := context.Message().(type) {
    case *hello:
        props := actor.PropsFromProducer(newChildActor)
        child := context.Spawn(props)
        context.Send(child, msg)
    case string:
        props := actor.PropsFromProducer(newChildActor)
        child := context.Spawn(props)
        context.Send(child, msg)
    }
}

次に、childActorhellostring を受け取り、それぞれメッセージを出力します。
このとき、hello を受け取った場合にpanicを発生させるようにしています。

func (state *childActor) Receive(context actor.Context) {
    switch msg := context.Message().(type) {
    case *actor.Restarting:
        fmt.Println("restarting")
    case string:
        fmt.Printf("Hello %v\n", msg)
    case *hello:
        panic("Ouch")
    }
}

片方のアクターがダウンしてももう片方のアクターは生きていることが確認できます。
この childActor がpanicを発生させると、再起動が走り actor.Restarting が受け取られることが確認できます。
再起動中を示すメッセージは actor.Restarting です。

このアクターに対してメッセージを送信しているのが下記のコードです。

func main() {
    system := actor.NewActorSystem()
    supervisor := actor.RestartingSupervisorStrategy()
    rootContext := system.Root
    props := actor.
        PropsFromProducer(newParentActor,
            actor.WithSupervisor(supervisor))
    pid := rootContext.Spawn(props)
    rootContext.Send(pid, &hello{Who: "ytake"})
    rootContext.Send(pid, "ytake")

    _, _ = console.ReadLine()
}

parentActor に対して hellostring を送信しています。
hello を送信するとpanicが発生し、再起動が走ることが確認できます。
一方で string を送信すると他のアクターはpanicと無関係のため、正常にメッセージが出力されます。

ではもう少し変えて、同じアクターに対してpanicとその後にメッセージを受け取れるか確認してみましょう。

package main

import (
    "fmt"

    console "github.com/asynkron/goconsole"
    "github.com/asynkron/protoactor-go/actor"
)

type (
    hello       struct{ Who string }
    parentActor struct{}
)

func (state *parentActor) Receive(context actor.Context) {
    switch msg := context.Message().(type) {
    case *hello:
        props := actor.PropsFromProducer(newChildActor)
        child, err := context.SpawnNamed(
            props, "child")
        if err != nil {
            fmt.Println(err)
        }
        context.Send(child, msg)
    }
}

func newParentActor() actor.Actor {
    return &parentActor{}
}

type childActor struct{}

func (state *childActor) Receive(context actor.Context) {
    switch msg := context.Message().(type) {
    case *actor.Restarting:
        fmt.Println("restarting")
    case *hello:
        if msg.Who == "ytake" {
            panic("Ouch")
        }
        fmt.Printf("Hello %v\n", msg.Who)
    }
}

func newChildActor() actor.Actor {
    return &childActor{}
}

func main() {
    system := actor.NewActorSystem()
    supervisor := actor.RestartingSupervisorStrategy()
    rootContext := system.Root
    props := actor.
        PropsFromProducer(newParentActor,
            actor.WithSupervisor(supervisor))
    pid := rootContext.Spawn(props)
    rootContext.Send(pid, &hello{Who: "ytake"})
    rootContext.Send(pid, &hello{Who: "ytake1"})
    _, _ = console.ReadLine()
}

このコードは前回のコードとほぼ同じですが、少しだけコードを変更しています。

parentActorhello を受け取り、childActor を生成してメッセージを送信します。

func (state *parentActor) Receive(context actor.Context) {
    switch msg := context.Message().(type) {
    case *hello:
        props := actor.PropsFromProducer(newChildActor)
        child, err := context.SpawnNamed(
            props, "child")
        if err != nil {
            fmt.Println(err)
        }
        context.Send(child, msg)
    }
}

さきほどのコードとの違いは、SpawnとSpawnNamedの違いです。
SpawnNamedは名前を指定してアクターを生成できますので、
この例では child という名前でアクターを生成しています。
すでに生成済みの場合、つまり2回目以降の生成の場合は、すでにアクターが生成済みのためエラーが返されます。
そして生成された1アクターがメッセージを受け続ける形になります。

ここで生成されたアクターは先ほどと同じく childActor です。

func (state *childActor) Receive(context actor.Context) {
    switch msg := context.Message().(type) {
    case *actor.Restarting:
        fmt.Println("restarting")
    case *hello:
        if msg.Who == "ytake" {
            panic("Ouch")
        }
        fmt.Printf("Hello %v\n", msg.Who)
    }
}

childActorhello を受け取り、Whoytake の場合にpanicを発生させます。
それ以外は正常にメッセージを出力します。
つまりpanicが起きても再起動で復帰すれば、その後のメッセージは正常に受け取れる、ということです。

このコードを実行すると、panicが発生して再起動が走り、その後のメッセージが正常に出力されることが確認できます。

どちらの例も、アクター再起動後はメッセージの再処理は行われませず、
再起動後のアクターは新しいメッセージを受け取ることができる、という動作は変わりません。

これまではアクターの監視と簡単な再起動について説明しましたが、
この再起動のさせ方はいくつかの方法があります。
スーパーバイザー戦略についてですが、また長くなってしまうため次回に説明します。

次回はアクターのスーパーバイザー戦略について!