OpenTelemetry + New Relicを使ってGoで作る分散システムのメトリクスを収集しよう

このエントリはスターフェスティバル株式会社の スターフェスティバル Advent Calendar 20233日目、
New Relic Advent Calendar 20235日目兼ねての記事でもあります。

qiita.com

qiita.com

今回は少しだけ技術的な内容をお届けします。

分散システム

クラウドをベースにプロダクトを開発しているところでは、
分散システムにしたくなくてもどうしてもそうなってしまうことが多いと思います。

難しい分散トランザクションなど様々な問題があるかと思いますが、
Goでは 難しい分散処理問題を解決できる仕組みを簡単に取り入れて開発することができますので、
その辺りの入り口を簡単に紹介します。
アクターモデルとドメインモデリングなどもそのうち紹介します。

ここ数年アクターモデルを利用した分散システム、
メッセージングを活用したリアクティブシステムなどに取り組んでいまして、
難しい技術的な問題を解決することでスケールしやすく障害にもつよいプロダクト開発を行っています。

そんなわけでここではGoを使った3台構成の分散システムの実装方法を簡単に紹介しますが、
分散システムになるとどうしても難しいのが監視ではないでしょうか?

そこで今回はシステムを作るだけでなく
OpenTelemetryを使ってNewRelicにしっかりと転送していく方法も紹介します。

全体像としては下記のようなものです。
                               

構成図

シンプルなものとして、Fizz / Buzzをそれぞれのノードで処理をし
処理を依頼するクライアントになるノードが1台、
クラスター間はgRPCでやりとりを行うアクターシステム(Proto Actor)を利用したものです。

proto.actor

構成管理などはConsulとなっています。

アクターシステムの解説は割愛しますが(また別の機会に)
ある程度システム開発をしている方にはそんなに難しくない内容です。

サンプルコードはこちら

github.com

アクター自身がクラッシュしたり通信がうまくいかなくとも
自身で復旧してリプレイできますので安心な分散システムです。
(今回のサンプルではそこまで含んでいません)

Fizz Grain

今回の例はバーチャルアクターをつかってFizzBuzzを提供する形になっていますが、
一般的なアクターモデルを利用したシステムと同様なものです。

// FizzGrain / Virtual Actor
type FizzGrain struct{}

func (s *FizzGrain) Init(ctx cluster.GrainContext)      {}
func (s *FizzGrain) Terminate(ctx cluster.GrainContext) {}
func (s *FizzGrain) ReceiveDefault(ctx cluster.GrainContext) {}

func (s *FizzGrain) SayFizz(request *shared.FizzRequest, ctx cluster.GrainContext) (*shared.FizzResponse, error) {
    r := &shared.FizzResponse{Message: ""}
    r.Number = request.Number
    if request.Number%3 == 0 {
        r.Message = "Fizz"
    }
    return r, nil
}

メッセージでFizzRequestが送られてくるとSayFizzが対応します。
下記のようにこのノードにFizzGrainを配置しておきます。(バーチャルアクターのアクター)
Proto Actorではcluster.WithKinds を使うことで、
このノードではどのようなアクターを提供するか登録することができます。

この例ではクラスタの管理にConsulを利用するので consul.New() を利用していますが、
zookeeperやetcdなども利用できます。

func main() {
    system := actor.NewActorSystemWithConfig(
        actor.Configure(
            actor.WithLoggerFactory(logger.New)))
    provider, _ := consul.New()
    lookup := disthash.New()
    config := remote.Configure("localhost", 0)
    clusterConfig := cluster.Configure("fizzbuzz-cluster", provider, lookup, config,
        cluster.WithKinds(shared.NewFizzServiceKind(func() shared.FizzService {
            return &FizzGrain{}
        }, 100)))

    c := cluster.New(system, clusterConfig)
    c.StartMember()

    _, _ = console.ReadLine()
    c.Shutdown(true)
}

これでFizzGrainが起動し処理ができるようになります。

Buzz Grain

FizzGrain同様に実装します。

// BuzzGrain / Virtual Actor
type BuzzGrain struct{}

func (s *BuzzGrain) Init(_ cluster.GrainContext)           {}
func (s *BuzzGrain) Terminate(_ cluster.GrainContext)      {}
func (s *BuzzGrain) ReceiveDefault(_ cluster.GrainContext) {}

func (s *BuzzGrain) SayBuzz(request *shared.BuzzRequest, _ cluster.GrainContext) (*shared.BuzzResponse, error) {
    response := &shared.BuzzResponse{Message: request.Message}
    response.Number = request.Number
    if request.Number%5 == 0 {
        response.Message = response.Message + "Buzz"
    }
    return response, nil
}
func main() {
    system := actor.NewActorSystemWithConfig(
        actor.Configure(
            actor.WithLoggerFactory(logger.New)))
    provider, _ := consul.New()
    lookup := disthash.New()
    config := remote.Configure("localhost", 0)
    clusterConfig := cluster.Configure("fizzbuzz-cluster", provider, lookup, config,
        cluster.WithKinds(shared.NewBuzzServiceKind(func() shared.BuzzService {
            return &BuzzGrain{}
        }, 100)))
    c := cluster.New(system, clusterConfig)
    c.StartMember()

    _, _ = console.ReadLine()
    c.Shutdown(true)
}

これでFizzとBuzzの準備が整いました。

簡単なサンプルコードにしていますので、アクターの細かい挙動は実装していませんが、
これにpersistenceなどを加えると任意のイベントやスナップショットから復元ができます。

そのままマイクロサービスアーキテクチャやEvent Sourcingなどに転用できます。

クライアントノード

FizzとBuzzにメッセージを送るクライアントになるアクターを実装します。

実装方法は色々あり、EIPのパターンなども組み合わせることも当然できますが、
ここではシンプルにクライアントがgRPCでFizzGrainにメッセージを送り、
返ってきたメッセージを使って同様にBuzzGrainにメッセージを送ります。
その後BuzzGrainからのメッセージをパイプラインとして機能するアクターに送ります。

Proto Actorのバーチャルアタターを使ったメッセージ送信方法としては、
他にも通常のアクターシステムのようにアドレスを指定することもできます(Proto ActorではPID)

type FizzBuzz struct {
    system *actor.ActorSystem
    pipe   *actor.PID
}

type Say struct {
    Number int64
}

type Response struct {
    Number  int64
    Message string
}

func (state *FizzBuzz) Receive(ctx actor.Context) {
    switch msg := ctx.Message().(type) {
    case *Say:
        fc := shared.GetFizzServiceGrainClient(
            cluster.GetCluster(state.system), "grain1")
        res, _ := fc.SayFizz(&shared.FizzRequest{
            Number: msg.Number,
        })
        nb := shared.GetBuzzServiceGrainClient(
            cluster.GetCluster(state.system), "grain1")
        buzz, _ := nb.SayBuzz(&shared.BuzzRequest{
            Number:  msg.Number,
            Message: res.Message,
        })
        ctx.Send(state.pipe, &Response{Number: msg.Number, Message: buzz.Message})
    }
}

このアクターにメッセージを流すために下記のように実装します。

const rangeTo = 100

func main() {
    system := actor.NewActorSystemWithConfig(
        actor.Configure(
            actor.WithLoggerFactory(logger.New)))
    provider, _ := consul.New()
    config := remote.Configure("localhost", 0)
    clusterConfig := cluster.Configure("fizzbuzz-cluster", provider, disthash.New(), config)
    c := cluster.New(system, clusterConfig)
    c.StartMember()
    defer c.Shutdown(false)
    fmt.Print("\nBoot other nodes and press Enter\n")
    _, _ = console.ReadLine()

    p := stream.NewTypedStream[*Response](system)
    go func() {
        fizzbuzz := actor.PropsFromProducer(func() actor.Actor {
            return &FizzBuzz{
                system: system,
                pipe:   p.PID(),
            }
        })
        pid := system.Root.Spawn(fizzbuzz)
        for v := range [rangeTo]int64{} {
            system.Root.Send(pid, &Say{Number: int64(v + 1)})
        }
    }()
    for range [rangeTo]int{} {
        fmt.Println(<-p.C())
    }
    _, _ = console.ReadLine()
}

FizzGrainとBuzzGrainとやりとりを行うアクター、
パイプラインとして機能するアクターをそれぞれ生成します。

ここではパイプラインの用途にGenericsが利用できる stream.NewTypedStream を利用していますが、
他の方法でも問題なく利用できます。
Proto Actor(Go)ではAkka/PekkoでいうClassic Actorスタイルになりますが・・

stream.NewTypedStream を利用する場合、
送られてきたメッセージはチャネルで受け取ることができます。
(この例では <-p.C()

これでそれぞれのノードを起動させると
3台組み合わせたFizzBuzzを実行するアクターを使った分散システムになります。

やったね!

実運用するにはたりないもの

これで処理自体は問題ありませんが、
実際に運用する場合は監視やメトリクス収集は欠かせません。

ということでOpenTelemetryを利用し、そのバックエンドにNew Relicを利用できるようにしていきます。
サンプルコードにはPrometheus、Grafana、OpenTelemetry Collectorも使えるようにしてありますが、
実際に利用する場合はやはりちょっと大変・・。
この辺りはNewRelicを使って楽していきます。

newrelic.com

ここではそれぞれのノードから直接New Relicに送るように実装します。
Proto Actor(Go)ではOpenTelemetry メトリクスに対応していますので、
簡単に利用できます。

利用するにはアカウントはもちろんですが、api-keyが必要になりますので事前に発行が必要です。
あとは送信先も確認しておきましょう。

docs.newrelic.com

実装の仕方は色々ありますが、簡単に動かす実装にします。

package metrics

import (
    "context"
    "time"

    "go.opentelemetry.io/otel"
    "go.opentelemetry.io/otel/attribute"
    "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp"
    "go.opentelemetry.io/otel/sdk/metric"
    "go.opentelemetry.io/otel/sdk/resource"
)

type NrOpenTelemetry struct {
    endpoint  string
    attribute []attribute.KeyValue
    apiKey    string
}

// NewNrOpenTelemetry returns OpenTelemetry for newrelic
func NewNrOpenTelemetry(endpoint, apiKey, serviceName string) *NrOpenTelemetry {
    return &NrOpenTelemetry{
        endpoint: endpoint,
        attribute: []attribute.KeyValue{
            attribute.String("service.name", serviceName),
        },
        apiKey: apiKey,
    }
}

// Exporter returns metric.MeterProvider
func (o *NrOpenTelemetry) Exporter(ctx context.Context) (*metric.MeterProvider, error) {
    exporter, err := otlpmetrichttp.New(ctx,
        otlpmetrichttp.WithCompression(otlpmetrichttp.GzipCompression),
        otlpmetrichttp.WithEndpoint(o.endpoint),
        otlpmetrichttp.WithHeaders(map[string]string{
            "api-key": o.apiKey,
        }),
    )
    if err != nil {
        return nil, err
    }
    res, err := resource.New(ctx, resource.WithAttributes(o.attribute...))
    provider := metric.NewMeterProvider(
        metric.WithReader(metric.NewPeriodicReader(exporter, metric.WithInterval(1*time.Second))),
        metric.WithResource(res))
    otel.SetMeterProvider(provider)
    return provider, nil
}

特に気をつけなければいけないところはありませんが、
リクエスト時にheaderでapi-keyが必要になりますので忘れずに実装しましょう。

Proto Actorではmetric.MeterProviderをそのまま利用することができますので、
下記のようにactor.WithMetricProviders を使って指定します。

   system := actor.NewActorSystemWithConfig(
        actor.Configure(
            actor.WithMetricProviders(meterProvider),
            actor.WithLoggerFactory(logger.New)))

各ノードで変更すると全てのノードでNew Relicへ直接送信されます。
実行すると下記の通り・・!

あとは利用環境に合わせて色々利用することができます。

実装するだけだと障害発生時や意図通り処理が動いていない場合の調査が非常に大変になりますが、
こうした仕組みを利用することでしっかりと内部を理解できるようになりますので、
ぜひ活用してみてください。

最後に

下記のイベントでDRE/SRE的なプラクティスとマインド、
そしてクラウドネイティブなスタックを使いながら現状をよくしていく活動のお話をします。
実際に協力させていただいている企業などでも実施していることでもありますので
興味のある方はぜひ会場にて!

event.cloudnativedays.jp