Goで実践 アクターモデル vol.7 ローカルとリモートで分散させてみよう

ちょっと時間が空いてしまいましたが・・

これまではローカルのみでアクターを動かす簡単なアプリケーションと、
基本的な機能や考え方について解説しました。

今回はProto Actorを使って、ネットワーク上でアクターシステム同士がコミュニケーションを行い、
分散して処理行うRemoteについて簡単に解説していきます。

小さなアプリケーションを分散してみよう

下記の小さなアプリケーションを例にRemoteに一部を置き換えてみましょう。

blog.ytake.jp.net

これは学校と先生と生徒をアクターで実装する内容となっていました。
今回はこの生徒を他のアクターシステムに移動して、
複数のアクターシステムで稼働するように変更し、どのように動作をするのか理解していきます。

このあたりの仕組みはAkka / Pekkoよりも多少簡単なしくみにはなっていますが、
理解するには十分な内容になっていると思います。

おさらいですが、例としてはこのような形でした。

flowchart TD
subgraph ActorSystem
教室アクター --> |*command.PrepareTest|先生アクター
先生アクター --> |*command.StartTest|生徒アクター
生徒アクター --> |*command.SubmitTest|先生アクター
end

今回は次の形に変更します。

flowchart TD
 subgraph RemoteActorSystem["リモートアクターシステム"]
        生徒アクター["生徒アクター"]
  end
 subgraph s1[Endpoint]
        gRPC["gRPC"]
  end
 subgraph LocalActorSystem["ローカルアクターシステム"]
        先生アクター["先生アクター"]
        教室アクター["教室アクター"]
      end

教室アクター -- *command.PrepareTest --> 先生アクター
先生アクター -- *command.StartTest --> gRPC
gRPC -- *command.StartTest --> 生徒アクター
生徒アクター -- *command.SubmitTest --> gRPC
gRPC -- *command.SubmitTest --> 先生アクター

Proto Actorでは処理を分散させる場合、
gRPCを利用するためProtocol Buffersを利用したメッセージ変更する必要があるため、
先生アクターと生徒アクターが利用していたメッセージを
protoファイルに記述してprotocコマンドでコンパイルします。

この2つの間で利用されていたのはcommand.StartTestcommand.SubmitTest でした。
下記のように記述してみましょう。

syntax = "proto3";

package message;

option go_package = "github.com/your/path/to";

message StartTest {
    string subject = 1;
}

message SubmitTest {
    string subject = 1;
    string name = 2;
}

go_packageなどはお使いの環境に合わせて変更してください。

生徒アクターはRemoteのアクターシステムへ

難しそうに感じるかもしれませんが簡単です。

以前のコードを別なアクターシステムで実行するように、
main関数を違うファイルに記述しましょう。

例えばremote.goのようなファイルにしておきましょう。

package main

import (
    console "github.com/asynkron/goconsole"
    "github.com/asynkron/protoactor-go/actor"
    "github.com/asynkron/protoactor-go/remote"
    "github.com/acme/sample/student"
)

func main() {
    system := actor.NewActorSystem()
    remoteConfig := remote.Configure("127.0.0.1", 50052,
        remote.WithKinds(&remote.Kind{
            Kind:  "student",
            Props: actor.PropsFromProducer(student.NewActor),
        }))
    rt := remote.NewRemote(system, remoteConfig)
    rt.Start()
    _, _ = console.ReadLine()
}

これがremoteで起動する生徒アクターが存在するアクターシステムとなります。

remoteを利用する場合はローカルだけで実行する場合と異なり、
アクターシステムの設定を少しだけ記述する必要があります。

remote.Configureを使ってこのアクターシステムで起動するgRPCサーバのホストとポートを指定します。

次のremote.WithKinds はこのアクターシステムで生成されるアクターの種類を登録するものです。

ここでは一つだけですが、remote.WithKindsは複数のアクターを登録することができます。

リモートを使ったアクター間の設定方法はこれ以外にもいくつか用意されていますが、
ここではアクターを生成せずに、「どのようなアクターがリモートに存在するか」を記述しています。

あとはgRPCサーバとして起動しているだけです(ここでは127.0.0.1:50052)。

   rt := remote.NewRemote(system, remoteConfig)
    rt.Start()

なぜこのような指定になるのかというと、
図のように内部でEndpoint ReaderとEndpoint Writerが起動し、
アクターシステム間で相互通信を行う仕組みになっています。

つまりこのホストとポートの指定は背後で動くgRPCに関する設定ということです。

おっと忘れていました。
アクターモデルの基本は「全てのものはアクターである」という哲学がありましたね。
ということでEndpoint ReaderとEndpoint Writerは、アクターとして実装されています。
興味がある方はぜひコードを読んでみましょう。

さてさて、次にリモートで稼働する生徒アクターの実装を少しだけ変更します。

package student

import (
    "fmt"
    "math/rand"
    "time"

    "github.com/asynkron/protoactor-go/actor"
    "github.com/acme/sample/message"
)

type Actor struct{}

func NewActor() actor.Actor {
    return &Actor{}
}

// Receive is sent messages to be processed from the mailbox associated with the instance of the actor
func (a *Actor) Receive(context actor.Context) {
    switch msg := context.Message().(type) {
    case *message.StartTest:
        // ランダムで解答時間を1-9秒で設定する
        randTime := rand.Intn(9) + 1
        time.Sleep(time.Duration(randTime) * time.Second)
        // 生徒がテストの問題を解く
        context.Logger().Info(
            fmt.Sprintf("%s が %s テストの解答を提出します", context.Self().GetId(), msg.Subject))
        context.Send(context.Sender(), &message.SubmitTest{Subject: msg.Subject, Name: context.Self().GetId()})
        context.Poison(context.Self())
    }
}

変更点が多少間違い探しのようなものですが・・・:)

前述のようにメッセージとして利用する構造体をprotoファイルから生成したものに変更しています。
message.StartTestmessage.SubmitTest ですね。

次に context.Send(context.Sender(), &message.SubmitTest{Subject: msg.Subject, Name: context.Self().GetId()}) の部分です。

変更前のコードは、送信先のPid(アクターリファレンス)をcontext.Parent() としていましたが、
ローカルとリモートで分割されたアクターはcontext.Parent()で参照ができないため、
ローカルから返信先のアクターリファレンスを指定し、それをcontext.Sender() で取り出す形に変更しています。
多少指定方法が変わりましたが、そのくらいの変更です。

ローカルのアクターシステムを変更しよう

まずはmain関数からです。

package main

import (
    "fmt"

    "github.com/asynkron/protoactor-go/actor"
    "github.com/asynkron/protoactor-go/remote"
    "github.com/asynkron/protoactor-go/stream"
    "github.com/acme/sample/classroom"
    "github.com/acme/sample/command"
    "github.com/acme/sample/event"
)

func main() {
    system := actor.NewActorSystem()
    remoteConfig := remote.Configure("127.0.0.1", 50053)
    rt := remote.NewRemote(system, remoteConfig)
    rt.Start()
    p := stream.NewTypedStream[*event.ClassFinished](system)
    cr, err := system.Root.SpawnNamed(
        actor.PropsFromProducer(
            classroom.NewActor(p.PID(), students(), rt)),
        "math-classroom")
    if err != nil {
        return
    }
    go func() {
        system.Root.Send(cr, &command.StartsClass{Subject: "算数"})
    }()
    r := <-p.C()
    fmt.Printf("%s テストが終了しました\n", r.Subject)
}

func students() []int {
    sts := make([]int, 20)
    for i := 0; i < 20; i++ {
        sts[i] = i
    }
    return sts
}

ローカルのアクターシステムもリモートと同様に、
どのホストでポートは何番でgRPCサーバを起動しますよ、と指定します(ここでは127.0.0.1:50053)。
これでEndpoint ReaderとEndpoint Writerを担当するアクターが起動します。
この例ではローカルとリモートの2つとして紹介していきますが、
サーバがいくつあっても構いません。

次に大きな変更点ではありませんが、
教室アクターから先生アクターを生成するようになっていましたので、
protoactor-go/remote*remote.Remote を追加します。

これはリモートに登録したアクターを生成するために利用するもので、
先生アクターからリモートに登録した生徒アクターを複数生成する、
という挙動へ変更するために使用します。

package classroom

import (
    "github.com/asynkron/protoactor-go/actor"
    "github.com/asynkron/protoactor-go/remote"
    "github.com/acme/sample/command"
    "github.com/acme/sample/event"
    "github.com/acme/sample/teacher"
)

type Actor struct {
    stream   *actor.PID
    students []int
    remote   *remote.Remote
}

func NewActor(stream *actor.PID, students []int, remote *remote.Remote) func() actor.Actor {
    return func() actor.Actor {
        return &Actor{
            stream:   stream,
            students: students,
            remote:   remote,
        }
    }
}

// 省略

これ以外の変更はありません。

次に先生アクターを少しだけ変更しましょう。

元は先生アクターから生徒アクターを生成していましたので、
同じ流れのまま少しだけ変えていきます。

package teacher

import (
    "fmt"
    "time"

    "github.com/asynkron/protoactor-go/actor"
    "github.com/asynkron/protoactor-go/remote"
    "github.com/acme/sample/command"
    "github.com/acme/sample/message"
)

type Actor struct {
    students   []int
    endOfTests []message.SubmitTest
    replyTo    *actor.PID
    remote     *remote.Remote
}

func NewActor(students []int, replyTo *actor.PID, remote *remote.Remote) actor.Actor {
    return &Actor{
        students:   students,
        replyTo:    replyTo,
        endOfTests: []message.SubmitTest{},
        remote:     remote,
    }
}

// Receive is sent messages to be processed from the mailbox associated with the instance of the actor
func (a *Actor) Receive(context actor.Context) {
    switch msg := context.Message().(type) {
    case *actor.Restarting:
        // リスタート時に実行される
        context.Send(context.Self(), &command.PrepareTest{Subject: "math"})
    case *command.PrepareTest:
        context.Logger().Info("先生が", msg.Subject, "テストを出しました")
        for _, st := range a.students {
            rp, err := a.remote.SpawnNamed(
                "127.0.0.1:50052",
                fmt.Sprintf("student-%d", st),
                "student",
                5*time.Second)
            if err != nil {
                context.Logger().Info("生徒が見つかりません")
            }
            context.RequestWithCustomSender(rp.Pid, &message.StartTest{Subject: msg.Subject}, context.Self())
        }
        // 生徒がテストを提出する
    case *message.SubmitTest:
        context.Logger().Info(
            fmt.Sprintf("先生が %s の %s テストの解答を受け取りました", msg.Name, msg.Subject))
        a.endOfTests = append(a.endOfTests, *msg)
        // 全員提出したら先生がテストの解答を受け取る
        if len(a.endOfTests) == len(a.students) {
            context.Send(a.replyTo, &command.FinishTest{Subject: msg.Subject})
        }
    }
}

生徒アクターとやり取りを行う部分は、message.StartTestmessage.SubmitTestに変更されました。
他にここで注目する点は下記の部分です。

       for _, st := range a.students {
            rp, err := a.remote.SpawnNamed(
                "127.0.0.1:50052",
                fmt.Sprintf("student-%d", st),
                "student",
                5*time.Second)
            if err != nil {
                context.Logger().Info("生徒が見つかりません")
            }
            context.RequestWithCustomSender(rp.Pid, &message.StartTest{Subject: msg.Subject}, context.Self())
        }

remote.SpawnNamedでリモートのアクターシステムに、アクター生成を指示しています。

ここで指定しているのは

  • 127.0.0.1:50052で起動しているgRPCサーバ、つまりリモートのアクターシステム
  • fmt.Sprintf("student-%d", st) として複数の生徒アクターとして名前をつける
  • リモートに登録されている student を使って生成する
  • 5秒以内に生成されるものとしてFutureを利用

ということになります。

Futureで指定した時間以上かかる場合はerrで通知されることになります。

他にもまだあります。

以前のコードはcontext.Sendを利用していましたが、
今回はcontext.RequestWithCustomSender に変更されました。

これはリモートで生成される生徒アクターが子アクターではなく、
リモートのアクターというコンテキストになるため、
RequestWithCustomSenderを使って返信先であるSenderを明示する、というものです。

これでリモートからローカルにメッセージが返ってくるようになります。

動かしてみよう

先生アクター、生徒アクター間で分散システムとなりますので、
ここでは先に生徒アクターを稼働させたいリモートのアクターシステムを起動しておきましょう。

go runやコンパイルしたバイナリでも構いませんので起動すると下記のように出力されます。

ローカルだけで起動していたアクターシステムとちょっとだけ出力されるメッセージが異なりますが、
前述した図の構成で起動しています。

次にローカルのアクターシステムを起動して先生アクターから生徒へ算数テストの開始を指示します。
コンソールを複数使って起動してください。コンテナでもおk

どうやら生徒たちが算数テストに向き合い始めたようです。

このときリモートのアクターシステムのコンソールには次のように出力されるはずです。

activator/Remote$student-1のようなログは、これまでのヒエラルキーの表現と少しだけ変わっていますが
リモートを使ったアクターシステム間の表現となっています。
SpawnNamedで指定した名前で識別されているのが確認できます(並行・非同期であることは変わりません)。

Supervisionの解説でもあったようにローカル・リモート間で監視も可能ですので、
試しに動かす場合には色々試してみると良いでしょう。

大きな変更なくアクターシステム間で分散が簡単に実現できるのが理解できたと思います。

次回はアクターモデルを活用できるヒントになるものを紹介します。