ytake blog

Web Application Developer

Proto Actor(Go)でスキャッタ・ギャザーを実装してみよう

メッセージパッシングの世界にようこそ。
ここしばらくはアクターモデルばっかりやってるおじさんです。

アクターモデルといえば、多くの方々はAkka/Pekkoを連想すると思います。

pekko.apache.org

akka.io

が、実はGoでも実践に投入できるレベルのものがいくつかあります。

proto.actor

cloud.ergo.services

今回はprotoactor-goを使って、スキャッタ・ギャザー実装の解説をしていきます。

アクターモデル自体の解説はしませんので、
わかっている方 or 興味ある方向けにはなりますのでご了承ください。

スキャッタ・ギャザーって何?

エンタープライズ統合パターンという、
テクノロジーに依存しないソリューションとしてメッセージングを扱ったパターンがあり、
そのうちの一つがこのスキャッタ・ギャザーです。

access.redhat.com

www.enterpriseintegrationpatterns.com

スキャッタ・ギャザーは、
特定のグループにメッセージを送り、
グループ内のそれぞれが処理を行うなどして集約する仕組みのことです。

参考 https://access.redhat.com/documentation/ja-jp/red_hat_fuse/7.8/html/apache_camel_development_guide/scatter-gather

分散システムデザインパターンの本にもあるものなので、
目にした方も多いと思います。

ミドルウェアなどにも多く採用されているもので、
通常のWebアプリケーションではあまり見ることがないものかもしれませんが、
マイクロサービスアーキテクチャや、メッセージ駆動系のアプリケーションでは必須のテクニックとなっています。

この機会にぜひ覚えましょう!

どうやって実装するの?

Goはgroutineなどがありますので、特に特殊な何かをしなくとも処理の流れを作ることはできます。

が、実際にメッセージングを扱う以上、障害などにも強く弾力性もあるものでなければなりません。
一つのプロセスだけでなく、複数のプロセス、
もしくは物理的に異なるものに対しても変わらずに動くシステムにしなければなりません。

www.reactivemanifesto.org

アクターモデルを導入することで、これらの問題を解決することができるということで、

github.com

ということで今回はprotoactor-goの出番です。

protoactor-goはAkka/Pekkoほど強力ではありませんが、
実践投入しても問題ないもので、
当然1プロセスでもサーバ間でもアクター同士がきちんと関係性を持ち、
状態変化(become/unbecome)や監督(supervise)、クラスタリングなども当然サポートされていて
Goでは割とコスト低めでおすすめなものです。(が、Goの開発サイクルはあまり早くなく.NET版の方が盛ん)

goroutineとアクターの違いは、
どちらもプロセス間通信ものもではありますが、
アクターモデルは位置の把握や識別などが可能で、
channelとは差異があります。
解説するとそれだけで長くなるので、ここでは似たような感じのもの と思っていただいて良いです。

同じ販売商品でも、形態によって異なる中の最安値のデータを判断するとか、
店舗に問い合わせて在庫を持っているところだけに発注を出すとか、
そういうものを連想していただければ良いです。

今回は、少し古い本ですがAkka実践バイブルにある交通監視カメラの画像ファイル名を扱う例をそのまま再現します。

画像ファイルを扱うメッセージの定義は下記のものとします。

type PhotoMessage struct {
    ID              string
    Photo           string
    CreationTime    *time.Time
    Speed           *int
}

なんてことはありません。
画像識別のためのIDと、画像ファイル名などがあります。
スピードや撮影日はあったりなかったりする例です。

画像ファイルには撮影日やその時の速度が入っている、という形になります。
この画像ファイルから撮影日と速度を取り出し、
最後に集約して一つのメッセージにしていきます。

この撮影日抽出と速度抽出を二つのアクターがそれぞれを担当します。
スキャッタ・ギャザーを実装するにはこの二つだけでは当然足りません。

画像メッセージをディスパッチするスキャッタ機能を担当するアクター、
結果を集約するアクターが必要になります。

スキャッタ

実装する方法はいくつかありますが、
ここでは受信したメッセージを複数のアクターに送信するRecipient List
を利用します。

type RecipientList struct {
    PIDs []*actor.PID
}

撮影日抽出と速度抽出のアクターにメッセージを送信するため、それぞれのアクターを渡せるようにしています。
RecipientListアクターはメッセージを受け取ると
対象のアクターにメッセージを送信します。

func (state *RecipientList) Receive(context actor.Context) {
    switch msg := context.Message().(type) {
    case *message.PhotoMessage:
        for _, recipient := range state.PIDs {
            context.Send(recipient, msg)
        }
    }
}

定義したメッセージのみを指定したそれぞれのアクターに送信する形になります。
特別なことは特にありません。

Akka/Pekkoの(!)はProto ActorではSendになります。

実際の送られているかどうかは下記のテストコードで確認できます。

func stubActorProps(ref *actor.PID) *actor.Props {
    return actor.PropsFromFunc(func(ctx actor.Context) {
        switch msg := ctx.Message().(type) {
        case *message.PhotoMessage:
            ctx.Send(ref, msg)
        default:
        }
    })
}

func TestRecipientList_Receive(t *testing.T) {
    t.Run("scatter the message", func(t *testing.T) {

        system := actor.NewActorSystem()
        p := stream.NewTypedStream[*message.PhotoMessage](system)
        ti := time.Date(2001, 2, 3, 4, 5, 6, 0, time.UTC)
        expectMsg := &message.PhotoMessage{
            ID:    "id1",
            Photo: makeCreatePhotoString(ti, 60)}

        go func() {
            p1 := stubActorProps(p.PID())
            p2 := stubActorProps(p.PID())
            pa1 := system.Root.Spawn(p1)
            pa2 := system.Root.Spawn(p2)
            re := system.Root.Spawn(actor.PropsFromProducer(func() actor.Actor {
                return &RecipientList{
                    PIDs: []*actor.PID{pa1, pa2}}
            }))
            system.Root.Send(re, expectMsg)
        }()
        p1msg := <-p.C()
        if p1msg != expectMsg {
            t.Errorf("expected %v, got %v", expectMsg, p1msg)
        }
        p2msg := <-p.C()
        if p2msg != expectMsg {
            t.Errorf("expected %v, got %v", expectMsg, p2msg)
        }
    })
}

stream.NewTypedStream はAkka Typedと大体似たような形になっており、
ストリームの中から型指定でメッセージを取り出せるようになっているものです。
値を取り出す場合は、Proto ActorではRequestFuture(いわゆるFutureです)を使うなどして、
送信もとのアクターで取り出すことができますが、
同様にストリームのパイプラインとして扱うことでgoroutineを使って取り出すことができます。
この辺りはGoっぽい感じですんなり利用できると思います。

ストリームについてわからない方は、
時間軸の流れの中心になる道、というようなイメージで問題ありません。

ギャザー

集約を担当するAggregatorを実装します。

これは名前の通り複数のメッセージを集約するのに利用し、ギャザーとして作用させます。
複数のメッセージを保持し、決まった数を受信したらその分だけ処理するなどができます。

早速なので、2つメッセージを受信したら処理を行うようにします。

構造体は下記のようにしておきます。
集約したメッセージを受け取るアクターをpipeとしています。
messagesは処理していないメッセージを保持するためのスライスです。

type Aggregator struct {
    pipe     *actor.PID
    messages []*message.PhotoMessage
}

メッセージを一つにまとめる処理は次のとおりです。

func (state *Aggregator) Receive(ctx actor.Context) {
    switch msg := ctx.Message().(type) {
    case *message.PhotoMessage:
        var found *message.PhotoMessage
        for _, m := range state.messages {
            if m.ID == msg.ID {
                found = m
                break
            }
        }
        if found != nil {
            newCombinedMsg := &message.PhotoMessage{
                ID:           msg.ID,
                Photo:        msg.Photo,
                CreationTime: msg.CreationTime,
                Speed:        msg.Speed,
            }
            newCombinedMsg.UpdatePhotoMessage(found)
            ctx.Send(state.pipe, newCombinedMsg)
            var newMessages []*message.PhotoMessage
            for _, m := range state.messages {
                if m.ID != found.ID {
                    newMessages = append(newMessages, m)
                }
            }
            state.messages = newMessages
        } else {
            state.messages = append(state.messages, msg)
        }
}

仕組み上Akka/Pekkoよりはコード量が多いですが、Goに慣れている人であれば特に普通のコードです。

下記の部分はメッセージを二つ以上受信するとできる処理であることを示しています。
無闇に関係ないメッセージを処理することはできないため、IDが一致するもののみを対象としています。

       var found *message.PhotoMessage
        for _, m := range state.messages {
            if m.ID == msg.ID {
                found = m
                break
            }
        }

次の部分では、処理して良いメッセージが二つ以上ある場合は、
先に来たメッセージを基準として、後に来たメッセージで更新して
二つをマージする処理となっています。
処理後はpipeで指定されている他のアクターの送信されます(結果として受け取れます)

そしてこのアクターはメッセージの状態を持っていますので、処理したメッセージを
再処理しないように処理済みを取り除いてアクターの状態を更新しています。
(アクターのライフサイクルについては今回触れませんので、どこかで理解するとよりわかると思います)

       if found != nil {
            newCombinedMsg := &message.PhotoMessage{
                ID:                     msg.ID,
                Photo:              msg.Photo,
                CreationTime: msg.CreationTime,
                Speed:             msg.Speed,
            }
            newCombinedMsg.UpdatePhotoMessage(found)
            ctx.Send(state.pipe, newCombinedMsg)
            var newMessages []*message.PhotoMessage
            for _, m := range state.messages {
                if m.ID != found.ID {
                    newMessages = append(newMessages, m)
                }
            }
            state.messages = newMessages
        } else {
            state.messages = append(state.messages, msg)
        }

elseではまとめて処理できるものが見つからない、つまり1件しかないということになりますので、
アクターが保持して、次のメッセージ受信まで保管しています。

アクター同士の関係を作っていきますので、returnなどはできません。
(やるとしても違う方法になります。)

この動作を確認するテストコードは次のとおりです。

   t.Run("aggregate two messages", func(t *testing.T) {
        ti := time.Date(2001, 2, 3, 4, 5, 6, 0, time.UTC)
        create := time.Date(2023, 2, 3, 4, 5, 6, 0, time.UTC)
        ps := makeCreatePhotoString(ti, 60)

        msg1 := &message.PhotoMessage{ID: "id1", CreationTime: &create, Photo: ps}
        s := 60
        msg2 := &message.PhotoMessage{ID: "id1", Photo: ps, Speed: &s}
        expect := &message.PhotoMessage{ID: "id1", Photo: ps, CreationTime: msg1.CreationTime, Speed: msg2.Speed}

        system := actor.NewActorSystem()
        p := stream.NewTypedStream[*message.PhotoMessage](system)
        go func() {
            re := system.Root.Spawn(actor.PropsFromProducer(func() actor.Actor {
                return NewAggregator(1*time.Second, p.PID())
            }))
            system.Root.Send(re, msg1)
            system.Root.Send(re, msg2)
        }()
        res := <-p.C()
        if !reflect.DeepEqual(res, expect) {
            t.Errorf("expected %v, got %v", expect, res)
        }
    })

二つのメッセージが、撮影日と速度を別々に持っており、
Aggregatorのアクターが二つのメッセージを集約することを期待するコードとなっています。

できた気がします。
正常に動くことだけを担保するのであればこれで問題ありませんが、
これだけでは不十分です。

仮にメッセージの集約処理に時間がかかりすぎた場合、
他のメッセージ処理ができずにスタックしてしまうかもしれません。
アクターにメッセージを保持しすぎてメモリーを逼迫してしまう可能性もあります。

解決方法はいろいろありますが、ここではアクターモデル採用しているツールキットなどでお馴染みの
タイムアウトを利用してみましょう。
指定した時間以上経過した場合は、未処理のものをそのままpipeに送信するようにします。

       if found != nil {
            newCombinedMsg := &message.PhotoMessage{
                ID:           msg.ID,
                Photo:        msg.Photo,
                CreationTime: msg.CreationTime,
                Speed:        msg.Speed,
            }
            newCombinedMsg.UpdatePhotoMessage(found)
            ctx.Send(state.pipe, newCombinedMsg)
            var newMessages []*message.PhotoMessage
            for _, m := range state.messages {
                if m.ID != found.ID {
                    newMessages = append(newMessages, m)
                }
            }
            state.messages = newMessages
        } else {
            // メッセージが一件しかない場合はタイムアウトを設定した後に
            // アクターにメッセージを持たせている
            state.messages = append(state.messages, msg)
            ctx.SetReceiveTimeout(state.timeout)
        }

先ほどのコードに ctx.SetReceiveTimeout(state.timeout) で指定した時間以上待機する場合はタイムアウトを発生するように
アクターに指定します。

外から指定できるようにする場合は、先ほどの構造体を次のようにするといいでしょう。

type Aggregator struct {
    timeout  time.Duration
    pipe     *actor.PID
    messages []*message.PhotoMessage
}

Akka/Pekkoと異なり、Timeout指定時にメッセージをくっつけられないため、
タイムアウトだけ指定しています。
同様の動きにしたい場合は、少し工夫が必要になります(ここでは簡単なものにしておきます)

アクターでタイムアウトが発生すると *actor.ReceiveTimeout メッセージが自アクターに送信されます。
これを受け取った詩に、保管しているメッセージを全てpipeのアクターの送信し、
保持しているメッセージの領域を開けるようにしてみます。
(前述の通り実践に投入する場合は、タイムアウトしたメッセージだけを送信して、その分だけ解放するようにしましょう)

   case *actor.ReceiveTimeout:
        for _, m := range state.messages {
            ctx.Send(state.pipe, m)
        }
        state.messages = nil

手抜きですが、これで長い時間がかかる処理に対してのリソース問題を手抜きして解決しました。
これだけでは実はまだ予期せぬ事態に対応できません。

例えばアクターは何かしらの問題が起きると再起動し、その時の状態がなくなってしまいます。
Goの場合はpanicを投げるとアクターは再起動されます。
(振る舞いは該当のアクターだけ再起動するか、親も含めて再起動するかなど変更できます。)

予期せぬ事態にも対応できるようにしておきましょう。
とは言ってもメッセージの内部で意図的にpanicを起こすのは面倒なため、
特定のメッセージを受け取ったらpanicを発生させられるように、
ここでは意図的に実行できるようにします。

   case *message.IllegalStatePanicMessage:
        // 意図的にパニックを起こすメッセージを受信するとパニックを検知して、
        // Aggregatorのメッセージを持っている状態で停止する
        panic("this is a scheduled panic")
    }

IllegalStatePanicMessageを受け取るとpanicを起こして再起動が走るようにします。
ではこの再起動時にメッセージが消失しないようにするにはどうしたらいいのでしょうか。

これはProto Actorのライフサイクルを把握しておくと簡単に対応できます。

Akka/PekkoにはPreRestartがありますが、Proto Actorでは *actor.Restarting メッセージをアクターが受け取るタイミングで
同じような処理ができます。

   case *actor.Restarting:
        for _, v := range state.messages {
            ctx.Send(ctx.Self(), v)
        }
        state.messages = nil

自アクターにメッセージを送信して再起動後に続きから処理できるようにする、という解決方法を取ることができます。
これである程度の事態に対応できるようになりました。

2件以上で処理したいのに1件のみ受信、1秒以上経過するとタイムアウトを起こし、
持っているメッセージをそのまま戻すことを下記のようなコードでテストできます。

   t.Run("send message after timeout", func(t *testing.T) {
        ti := time.Date(2001, 2, 3, 4, 5, 6, 0, time.UTC)
        ps := makeCreatePhotoString(ti, 60)
        msg1 := &message.PhotoMessage{ID: "id1", CreationTime: &ti, Photo: ps}

        system := actor.NewActorSystem()
        p := stream.NewTypedStream[*message.PhotoMessage](system)
        go func() {
            re := system.Root.Spawn(actor.PropsFromProducer(func() actor.Actor {
                return NewAggregator(1*time.Second, p.PID())
            }))
            system.Root.Send(re, msg1)
        }()
        res := <-p.C()
        if !reflect.DeepEqual(res, msg1) {
            t.Errorf("expected %v, got %v", msg1, res)
        }
    })

意図的にパニックを起こし、アクターを再起動させた後にもう1件メッセージを送り
処理を再開できることをテストするには次のようになります。

   t.Run("aggregate two messages when restarting", func(t *testing.T) {
        ti := time.Date(2001, 2, 3, 4, 5, 6, 0, time.UTC)
        ps := makeCreatePhotoString(ti, 60)
        create := time.Date(2023, 2, 3, 4, 5, 6, 0, time.UTC)

        msg1 := &message.PhotoMessage{ID: "id1", CreationTime: &create, Photo: ps}
        s := 60
        msg2 := &message.PhotoMessage{ID: "id1", Photo: ps, Speed: &s}
        expect := &message.PhotoMessage{ID: "id1", Photo: ps, CreationTime: msg1.CreationTime, Speed: msg2.Speed}

        system := actor.NewActorSystem()
        p := stream.NewTypedStream[*message.PhotoMessage](system)
        go func() {
            re := system.Root.Spawn(actor.PropsFromProducer(func() actor.Actor {
                return NewAggregator(1*time.Second, p.PID())
            }))
            system.Root.Send(re, msg1)
            system.Root.Send(re, &message.IllegalStatePanicMessage{})
            system.Root.Send(re, msg2)
        }()
        res := <-p.C()
        if !reflect.DeepEqual(res, expect) {
            t.Errorf("expected %v, got %v", expect, res)
        }
    })

これで現在想定しているAggregatorの振る舞いは大丈夫そうです。

撮影日抽出と速度抽出 2つのアクター

この二つは大したことはありません。
画像名が 01012022 12:34:56.789|55|XYZ123 のようなものを想定して、
撮影時間|速度|ナンバープレートの番号 というような形式を例としています。
(この辺りは書籍などを確認ください)

それぞれの抽出のみ担当するアクターの例です。

撮影日抽出アクター

type GetTime struct {
    pipe *actor.PID
}

func NewGetTimeActor(pipe *actor.PID) actor.Actor {
    return &GetTime{
        pipe: pipe,
    }
}

func (g *GetTime) Receive(context actor.Context) {
    switch msg := context.Message().(type) {
    case *message.PhotoMessage:
        i := &process.ImageProcessing{}
        t, _ := i.GetTime(msg.Photo)
        msg.CreationTime = &t
        context.Send(g.pipe, msg)
    }
}

速度抽出アクター

type GetSpeed struct {
    pipe *actor.PID
}

func NewGetSpeedActor(pipe *actor.PID) actor.Actor {
    return &GetSpeed{
        pipe: pipe,
    }
}

func (g *GetSpeed) Receive(context actor.Context) {
    switch msg := context.Message().(type) {
    case *message.PhotoMessage:
        i := &process.ImageProcessing{}
        speed, _ := i.GetSpeed(msg.Photo)
        msg.Speed = &speed
        context.Send(g.pipe, msg)
    }
}

結合

これで最後に各アクターをスキャッタ・ギャザーに結合させます。

結合も大して大変ではありません。
これまで同様にTypedStreamを使ってメッセージを受信します。
一つの画像名から撮影日抽出アクターと速度抽出アクターがそれぞれ担当します。
送るメッセージは1つだけです。

       system := actor.NewActorSystem()
        p := stream.NewTypedStream[*message.PhotoMessage](system)
        d := time.Date(2023, 9, 10, 4, 5, 6, 0, time.UTC)
        speed := 60
        ps := makeCreatePhotoString(d, speed)
        msg := &message.PhotoMessage{
            ID:    "id1",
            Photo: ps,
        }

結合は下記の通りです。

結合イメージ

これに合わせて組み合わせます。
Aggregatorがメッセージを保持しますので、
GetSpeedActor、GetTimeActorそれぞれのメッセージを集約するようにします。
(2つのアクターにいるpipe つまりAggregatorにそれぞれメッセージを送信します)
RecipientListはこの2つのアクターに同じメッセージを送します。

Aggregatorで指定されているp.PIDは集約結果を受け取る、TypedStreamのアクターです。

       go func() {
            ref := system.Root.Spawn(actor.PropsFromProducer(func() actor.Actor {
                return NewAggregator(timeOut, p.PID())
            }))
            speedRef := system.Root.Spawn(actor.PropsFromProducer(func() actor.Actor {
                return NewGetSpeedActor(ref)
            }))
            timeRef := system.Root.Spawn(actor.PropsFromProducer(func() actor.Actor {
                return NewGetTimeActor(ref)
            }))
            actorRef := system.Root.Spawn(actor.PropsFromProducer(func() actor.Actor {
                return &RecipientList{
                    PIDs: []*actor.PID{speedRef, timeRef}}
            }))
            system.Root.Send(actorRef, msg)
        }()

集約したメッセージは下記の通り二つの処理結果をマージしたものになります。

       expect := &message.PhotoMessage{
            ID:           msg.ID,
            Photo:        msg.Photo,
            Speed:        &speed,
            CreationTime: &d,
        }
        res := <-p.C()
        if !reflect.DeepEqual(res, expect) {
            t.Errorf("expected %v, got %v", expect, res)
        }

送信時は速度や撮影日がありませんでしたが、それぞれ抽出した後に結合されるので
上記のメッセージが受け取れるようになります。

これでスキャッタ・ギャザーを一通り実装することができました。
実際に実装する場合は、アクターは何個いても構いません。
アクターの紐づく子アクターを生成してメッセージをそれぞれ保持することもできますし、
それぞれのアクターがまた他のアクターと関係性を持っても構いません。
最終的にメッセージが集約されて期待通りになればどんな組み合わせでも良いです。

おわりに

アクターモデルを使わずともKakfaなどを合わせてやれば作ることは確かにできますが、
やはりこうした処理はアクターモデルのツールキットを使うことで並列でメッセージをうまく扱えるようになります。
(Kafkaを使ってつくるのであればKafka StreamsやApache Sparkなどを併用することになります)

Akkaの本を元にProto Actor(Go)に置き換えるのはさほど苦労なくできるのがわかるかと思います。
対象内容が古くなっていてもProto Actorを通じてアクターモデルを習得することが十分できますので、
ぜひやってみてください。

とはいいつつなんだかんだAkka/Pekkoの方が楽です

SREはインフラエンジニアだけでなく、みんなの活動

みなさんSREしてますか?

サービスなどの品質を維持していくために切っても切り離せないSREですが、
日本でもSREという言葉が定着しつつあるかと思います。

このSREについて書いていきたいと思います。

SRE NextのCFP忘れてたのでその代わりに・・

SREってインフラですよね?

非常によくあるケース、というか多分ほとんどがこうなっていると思います。
もちろん会社としてインフラのことを指しても問題はありませんが、
SREとはどういうものなのか、正しく認識して今一度現状を振り返ることで
さらに良い活動に繋がることが多いと思います。

なんのこっちゃ、という方も多いかもしれません。
SREはエラーバジェットなどの話が必ず出てきますので、
モニタリングや監視などが必ずセットにはなっていきます。

ですが、この部分が強調されているのかどうしてもインフラエンジニアでしょ、
というのが定着している場面が多いような気がしています。
もちろんこうしたツールがなければ、実践は難しいのは明らかです。

本来やるべき活動はそれではありません。
ではDevOpsなのでしょうか?

DevOpsは、開発と運用が連携する・もしくはその境界線をなくしていく開発手法のひとつです。
SREの活動には含まれますが、
CI/CDなどを作りこれを実践できるための環境を作ることや、その手法が目的ではありません。

SREが一番重要視しなくてはいけないものは、
サービスの利用者がハッピーになるための、あらゆる秩序や品質維持に関する活動ではないでしょうか?

sre.google

ではDevOpsやモニタリングはどういう関わりがあるのでしょうか? ちょっと整理してみましょう。

品質が高いとは?

まず目指すゴールはサービスの利用者がハッピーになること、満足できるサービス提供すること。
抽象度が少し高いですが「品質が高い」、ということになります。

ではこの「品質が高い」とはどういうことを指すのでしょうか?
みなさんは自社における品質の高さはどういうものなのか理解していますか?

たしかにアプリケーションのパフォーマンス(レスポンスなど)が良いことかもしれません。

果たして本当にそれだけでしょうか?
SREというキーワードで見るとほとんどこの観点のものが多いように思います。

速さ以外に本当に重要なものはありませんか?

サービスの形態や事業ドメインによってはまちまちです。
ニュース配信やコンテンツ提供を行っている会社では、パフォーマンスが良いことはもちろんですが、
「読みたいと思った記事が確実に読めること」などではないでしょうか?

たとえば高速なレスポンスを返すシステムがあり、負荷にも強いシステムができている、
たまに意図しないコンテンツが返ることがあり、
利用者が読みたいコンテンツを返却できずに違うものが返却される状態になると
利用者が満足できる状態になるのでしょうか?

高速にレスポンスを返すのも当然必要ですが、
重要視するのは速さだけではなく読みたいものを確実に届けること、になるのではないでしょうか?
読みたいものが確実に読める、届けられるようになりユーザーの満足度が上がり、
サービスへの信頼度も上がり、購読会員などになっていく重要なポイントになるはずです。
レスポンスが速ければ会員登録や、売り上げが上がるのでしょうか?
(広告配信などはそれが一番重要ですね)

購入などを提供するサービスでは確実にユーザー購入できること、が重要になるかもしれません。
配送系のサービスであれば、確実に住所が指定できて配送会社に連携ができること、かもしれません。
(リアルな配達は道路事情などもあるので満足度を高くするにはアプリケーションだけでは不可能ですが)

確かにレスポンスなどの監視だけではこの辺りをすべて満たすのは難しいかも、というのがわかってくると思います。

では再び考えてみましょう。

品質が低くなってしまう理由は?

ここでようやくDevOpsなどの話が少し出てきます。

上記のようなコンテンツが正しく返却できずに、読みたいものが届けられなかった原因を探ってみましょう。
もしかすると開発チームが十分にテストできていなかった。
キャッシュサーバが故障していた。
色々原因があると思いますが、DevOpsっぽく前者のテストができていなかった、に注目してみましょう。

たとえば
「開発がすごい速さで開発している、モダンで自動化されたフローでリリースは毎日、
運用チームは保守やカスタマーサポートと連携して色々な不具合を調査したり改修をしている。」
こんな感じになっているとしましょう(こういった体制のところも多いはず)

ではテストができなかった原因を探ってみましょう。

一方が開発速度が早く毎日自動でリリースしていて、ユニットテストも書いている、結合テストもできている
テストがすべて通ったので自動でリリースされる、
が実はDBのレコードにミスがあり
気が付かずに自動でどんどんリリースされてしまった。

運用チームがリリース内容などをチェックするよりも早くリリースされていくため、
当然気がつくのはユーザーからのクレームだったり、もしくは他の部署の人たちから運用チームに連絡が来た。
という流れだとしましょう。

当然これはテストが十分にできていなかった、ということになります。
これはDevOpsとして考えるとどうでしょうか?

テストコードが足りなかったのかもしれませんし、
自動化することだけを目的にしてしまったため起きてしまった事故なのかもしれませんし、
もしくは開発と運用が仲が悪くてリリース時に連携したくなかったのかもしれません。
(ほとんどArt of SLOsの例そのまんまですねw)

こうしたことがきっかけで結果としてユーザーの満足度が下がっていくことに繋がるわけです。
この場合解決しないといけないものは、
リリース時の体制や自動化によるリリースの頻度、
普段のコミュニケーションの改善かもしれません。
もしくはとにかくリリースしろ!というような方針が招いた結果かもしれません。

SREは
こうした事態を招かないようにユーザーの満足度低下に繋がる原因を突き止め、
守るべきものを文章化して会社全体に共有しながら文化を作り、
秩序を維持していくための活動
なのです。

これはインフラエンジニアだけがするものなのでしょうか?

これはポジションや領域関係なくできる活動のはずです。
もちろんインフラエンジニアが関わることが多い監視やモニタリングはありますが、
これらはあくまで手段であり、目的ではありません。
開発チームと運用チームがうまく連携し、品質を維持するために活動しなければいけません。

SLI/SLOはただ設定するだけでなく、
現在の開発サイクルや運用、サービスの利用状況や事業ドメインなどを考慮して設定する必要があります。
ユーザーの満足度を上げるためにどうすれば良いかを第一に考え、
それを実現するための活動をすることがSREの本質です。

つまり逆算して考えると、
活動が十分にできるように社内における信頼関係を築くこと、
職種問わずドメインに関する知識を共通認識とすること、
そしてユーザーの満足度を上げる・維持のために品質を維持するルール策定、
同時にモニタリングやそれらを実現するためのツール・環境整備などをする(実装ですね)、
これらがSREの活動になります。

インフラエンジニア = SREとすると、
多くの場合は手段のひとつであるモニタリングや監視に偏ってしまい、
本来の目的であるユーザーの満足度を上げるための活動ができていない、ということになります。
これではエラーバジェットを設定しても、活用が難しくなるのではないでしょうか。

HowではなくWhyを考えることが大事です。

会社の体制などにもよりますがインフラエンジニアは
ドメインの知識を持つための機会に参加することが難しい場合も多く、
それよりも障害対応や環境整備・コスト削減や場合によっては情シスなどの活動が多くなってしまうことも多いと思います。
そしてそれを期待してインフラエンジニアを採用している会社も多いと思います。

この体制ではSREとしての活動は難しいでしょう。
SREとしての活動をするためには、
企業のこうした体制や文化を変えていくことが必要になります。

SLIメニューを振り返ろう

SREはどのような活動をしていくものなのか、というのを簡単にピックアップしていきました。
それに基づいてSLIメニューを振り返ってみましょう。

サービスの種類 SLIの種類 説明
Request/Response 可⽤性(Availability) 正常に応答したリクエストの⽐率
Request/Response 遅延(Latency) しきい値より早く応答したリクエストの⽐率
Request/Response 品質(Quality) 特定の品質を満たしたリクエストの⽐率
データ処理 新鮮さ(Freshness) ある特定の時間をしきい値にして、それより最近に更新されたデータの⽐率
データ処理 正確性(Correctness) 正しい値の出⼒につながったデータ処理への⼊⼒レコードの⽐率
データ処理 カバレッジ(Coverage) 処理したジョブの⽐率、処理に成功した⼊⼒レコードの⽐率など
ストレージ Durability(耐久性) 書き込まれたレコードのうち、正しく読み出せるものの⽐率

参考

www.coursera.org

* New RelicやGoogleでも開催されていますので、問い合わせなどをしてみてください。

各社でやっているワークショップに参加したことがある方は理解しているかもしれませんが、
SLIメニューをみてもわかるように
正しさや比率などがそれぞれのサービスによって設けられています。

SLIの種類はこれ以外にも独自に作ることももちろんできると思いますが、
大事なのは何を持って正しいとするのか、良いとするのか、ということです。
このSLIメニューは、DREなどの活動にも当てはまりますのでぜひ意識してみてください。

これらは教科書通りに作るのではなく、
事業ドメインやサービスの特性を考慮して作る必要があります。
これはSREの活動をする上で非常に重要なことですので、
ぜひ意識して、プロダクトオーナーや開発チームと一緒に考えてみてください。
またCTOやCEOなどの経営陣とも一緒に考えてみるのも良いかもしれません(割とオススメです)。

インフラエンジニアがこうした活動をうまくするには?

せっかくなので、この観点でどう進んでいくと良いのでしょうか?
会社の体制や文化などもありますので、すべてにおいてこうすると良い、というのはありませんが
こうすると良いかもしれません、というものをピックアップします。
(もちろんインフラエンジニア以外にもオススメのものでもあります)

1. ドメインの知識を深める

これはインフラエンジニアだけでなく、開発チームや運用チーム、会社に属するすべての人に当てはまります。
ドメインの知識を深めることで、
それぞれの立場でどういうことが重要なのか、どういうことがユーザーの満足度に繋がるのか、
どういうことがユーザーの満足度を下げるのかを理解するできます。

プロジェクトやプロダクトの立ち上げ時には、
ドメインの知識を深めるためのワークショップなどを行うことが多いと思います。
これは非常に重要なことですが、
プロジェクトが進んでいくとドメインの知識を深める機会がなくなってしまうことが多いです。
インフラエンジニアの場合、とくに何か困った状態になった場合などに呼ばれることが多く
ドメインの知識を深める機会もなく、単純に困ったことを解決するためのタスクをこなすことが多くなってしまいます。

これではドメインの知識を深めることができません。
ドメインの知識がないと、ユーザーの満足度を上げるためにどうすれば良いのか、
どういうことがユーザーの満足度を下げるのか、を理解することができません。
つまりモニタリングなども単純なAPIのレスポンスの監視になってしまい、
何をもっていして品質が良いとするのか、ということがわからなくなってしまいます。

こうした状態にならないように、インフラエンジニアもドメインの知識を深める機会にぜひ加わりましょう。
また会社自体がエンジニアとそれ以外などで分断している場合は、
ドメインの知識を深める機会を作ることも大事です。

2. インタラクションを理解する

ドメインの知識を深めることで、
自分達のサービスはどのようにインタラクションをしているのか、
どのようなことがユーザーの満足度を上げるのか、下げるのかを理解できます。
UI/UXの観点からも考えることができます。
この観点から考えられるようになると、必然的に監視の観点も変わってきます。

冒頭にも書いたようにリクエストレスポンスの速さだけではなく、
提供しているコンテンツが正しいものになっているか、
データ提供までの鮮度や正確性なども考えることができるようになります。

3. 分析に参加する

これはいわゆるインフラ的なモニタリングだけを指しているわけではなく、
GAなどの分析ツールを使ってユーザーの行動を分析することも含みます。

ユーザーの行動を分析することで、
どのようなことがユーザーの満足度を上げるのか、下げるのかを理解することできます。
またどのように分析できればユーザーの満足度やユーザーの行動を理解できるのか、
設計から考えることができるようになります。

これはSREとしての活動においても非常に重要なことだと思っています。
自分自身はSRE/DRE、事業戦略などにも関わることが多く(サポートさせていただいてる会社でもよくやっています)、
同時にドメインの知識も深めることができます。

これはマーケティングやプロダクトオーナーなどがやっていることですが、
一緒に行動したり加わることで信頼関係も生まれ、活動しやすい環境になると思います。
このような活動の繰り返しによって文化が生まれ、
品質がよく、満足度の高いサービスを提供することができるようになると思います。

4. 当事者意識を持つ

SREとしての活動をする上で、当事者意識を持たずに動いていくのは不可能だと思っています。
自分たちのサービスがユーザーにどういう影響を与えているのか、
どういうことがユーザーの満足度を上げるのか、下げるのか、
クレームなどから理解することも当然できます。
ですがそれだけでは不十分で、これまで述べてきたようにあらゆることが満足度低下に繋がる可能性があります。

これらを知り、それを防ぐための秩序などを作るのは誰かがやってくれるのではなく、
SREなのです。
色々な領域に関わる必要性も出てきますし、コミュニケーションも必要になります。
そして自分たちSREはどうしていきたいのか、をきちんと伝えて伝播させていったり文化を作っていく必要があります。
当事者意識を持たないままでいると、いい感じにしてくれるインフラエンジニアというイメージが定着してしまいます。

ほんの少しの前に進む力と、やるかやらないかだけです!

5. 仲間作り

ドメインを理解し、インタラクションを理解し、分析などにも参加することで
自然と当事者意識を持つようになると思います。
もしくは当事者意識を持っているからこそ、ドメインを理解し、インタラクションを理解し、
分析などにも参加することができるのかもしれません。

ここまで意識できるようになればあとは仲間作りです。
もしSREの活動をしている人がいるのであれば、合流して一緒に活動していくのが良いと思います。
もしSREの活動をしている人がいないのであれば、
仲間を作るための活動が必要になります。

ここまで書いたようにSREは活動の幅が広く、一人でできることは限られています。
文化作りや秩序を守るためのルール作りなどに専念できる環境であれば問題ありませんが、
インフラエンジニアとしての活動もある程度はしなければならないでしょう。
もしこれを読んでいる方がインフラエンジニアでなくても、日常的に携わっている業務があるはずです。

エネルギーが必要な活動なため、一人では難しいのです。
仲間を作り、一緒に活動していくことで文化ができていきます。 仲間作りが難しい、でもSREの活動をしたい、という場合は
SREとして活動している外部のエンジニアに相談してみるのも良いかもしれません。

まとめ

SREはサービスの利用者がハッピーになるための、あらゆる秩序や品質維持に関する活動です。
これはインフラエンジニアだけがするものではなく、
開発チームや運用チーム、会社に属するすべての人がするべき活動です。
決してコンテナーのオーケストレーションやクラウドの運用などの活動ではありません。
(が、コンテナーのオーケストレーションによってユーザーの満足度が上がるのであればやりましょう)

モニタリングなどはたしかにインフラエンジニアがやることが多いですが、
これはあくまで手段であり、目的ではなくいろんなエンジニアの方が協力してできる活動です。

自分自身でもできているのかというと、
すべての場所で、すべてがうまくできているのは当然なく、
体制の問題やSREについての認識の差異などもありできていないことも多いです。

ですがどんなドメインであっても当事者意識を持ち、事業に対してコミットしていくことは大事にしており
実際にやっていることも多いです。
アプリケーション開発をする場合においても、こうした活動をするときでも、
データエンジニアとしての活動をするときも、まずはドメインの知識を深めることから始めています。
そして怪しいと思ったら分析をして、ユーザーの行動を理解し、
自分自身でPDCAを回していくことで「これができたらよいな」、を実際に自分で取り組むようにしています。
(文章化やモニタリングの整備がまだまだですが、一人では全然できません・・)

もしかするとこれはSREとは違うのでは?と思うかもしれませんが、
SREとは何か、ということを考えるとこうした活動が一番近いのではないかと思っています。
もちろんいろんな環境によって差異やベストプラクティスはあると思いますが、
ユーザーのことを考えながら、社内のコミュニケーションや文化作りがセットになるため、
教科書通りにすることは難しいでしょう。

しかし行動した分だけ、ユーザーの満足度が上がり、
みなさんにとっても有益な楽しい活動になると思います。
ぜひ皆さんでやってみましょう!

思考の整理などにどうぞ

技術的負債と向き合うための取り組みでよかったもの例

技術的負債はどこにでもある

タイトルにあるように、
いくつかの開発チームと一緒に技術的負債を改善する開発や、それらに関する活動を行うことが多く
いろんな取り組みをしていく中で、よかったことがいくつかありました。
もちろん技術的負債を返すのは数ヶ月で終わるレベルのモノは多くなく、
何年から十数年もかかるものの方が多いはずですので、
すべて完了しているわけではないですが、その活動の中であくまで「今のところよさそう」というレベルのものです。

何番煎じかわからないくらいのものですが、
これを読んだ方が取り組んでいくにあたってヒントになればと思います。

普通の話しかありません。

会社全体で合意とSRE

これは当たり前ですが、念の為・・

以前もイベントでお話しさせてもらったりしましたが、
技術的負債は開発体験が悪くなり、モチベーションが上がらなくなるものでもあり、
そこから招く生産性の低下や色々なネガティブなものから、ビジネスに対してもコミットメントが著しく低下するようになります。
いろんな方面から見ても良いことにはつながらないことは明らかです。

しかし、会社や組織によっては技術そのものについてや、
エンジニアによる事業価値の最大化につながる基盤作りなどが
なかなか伝わらないこともあります。

これは諦めずにコミュニケーションをとってしっかり説明し、時間や予算をきちんと確保しなければならないです。
すでにサービス提供をしているシステムが対象になることが多く(サービス提供していないのに負債っていうのはないですね)
運用チームと負債返却などに当たるチームの二つが必要だったりします。
新しいサービスを提供したらもっと収益が伸びる!競合に!
色々あります。

しかし現状がその繰り返しの結果であることはきちんと理解してもらいます。
*もちろん予算確保などもあります

そしてビジネス側の観点でもしっかりと説明責任を果たすようにしていきます。

新しいサービスがなぜ必要なのか、収益を伸ばすのは新しい仕組みが必要なのか、
今あるものや、活動の仕方を変えるだけで最大化できないかどうか、
少ない時間で最大効果を発揮するものは何か、みたいなところも一緒に・・などなど
この辺りは書くと長くなるので、割愛します
うまくできない場合はみなさんの会社のCTOなどを巻き込んでみてください。

なぜすぐ開発にはいれなくなるのか、すぐできるのでは?
という定番の話などには、個人的にはAmazonなどで物を購入する生活で未開封が多くなった時に、
数年後に一発でみつけられるかどうかとか、家族の荷物混じってたらとか想像しやすい話を例えにすることが多いです。

まずは整理整頓などの大事さをしっかりと伝え、この辺りまでいくと会社としてはこうなっていきそうだ、
という将来の姿にいくためのものとして、あくまで事業の最大化・加速が目的であることをしっかりと。
こうした活動を実際にしてスタートしています。
(こういった支援もしております)

そして大事なのがSREの存在です。

お金や時間を使って進めていくものですから、今のサービスが実際にどうなっているかをしっかりと
理解できる状態にします。
これはサービスなどインタラクションの中で、どこがどうなっているのか、
それはこういった作り方をしてしまっているから、
などなどきちんと説明できるようにするためのものにもなり、
SLI/SLOのためでもあります。

憶測ではなくしっかりと数値化や可視化できる状態にします。
インフラレベルであれば色々なサービスがありますが、
サービスや体験としてどうなのかはツールの画面だけでは分かりません。

なのでしっかりSLI/SLOを文章化するなりが効果があります。
経営層も巻き込むのがよいことにつながることが多い印象でした。
作ったものはきちんと全体にも共有していきます。

SLIメニューはたくさんありますから、どれも参考にできるはずです。

この辺りの理解がそこまでない組織やチームの場合は、
Art of SLOsのワークショップを何十人も対象にして何回も実施しました。

意識作りと文化を作っていくのもやはり結局のところセットです。

この辺りを関係者でやることで、運用チームと負債返却チームなどできちんと連携をとり、
自動化して早くリリースできたけど、バグも増えて運用チーム辛いみたいな状態にならないように
しっかりやります。(ケースとして結構あります)

思想やコンセプトをしっかり・一緒に走る

負債返却となると、リプレイスなども多くなるので言語の選定だったりアーキテクチャ云々の話が必ずあります。
当たり前の話ですが、この時になぜそのアーキテクチャにするのかをしっかりと説明できるものにします。
これは手法が目的にならないように、
ドメインに対する理解がどのくらいあって、何を解決するために選んでいるか、などの観点でもあります。

この時に何かとてもガッチガチなものにしすぎないように気をつけています。

ある程度大きなものであったり、基盤システムのようなものは
しっかり作らないといけないため、ある程度堅くしなければなりません。
しかし大体方針を決めてから完成まで数年かかることが多く、
後になればなるほど当時考えたものがもう古い状態になります。

もちろん流行りに乗った方がいいものも多くありますし、
後知恵バイアスにならないように気をつけないといけないものでもありますが、
あくまである程度変わりゆくものだという前提で考えます。

ガッチガチなアーキテクチャを作り上げてしまうより、
思想やコンセプト・グランドデザインをしっかりと共有します。

リプレイスだ!こういうアーキテクチャでやろう、はもちろんみなさんやると思いますが、
チームの人が変わったりすれば、ガイドが目の前にあるコードだけになってしまいます。

もちろんドメインの理解や分析などができていない、
またはすっ飛ばしてしまっている場合は事前にしっかりと行います。
戦術パターンを実践するためではなく、全社できちんとコミュニケーションができるようにするため、
という温度感です。
そしてこの辺りの分析やモデリングは必ずワークショップを何回も実施します。

余談ですが、こうしたことを繰り返したことで文化的に良い方向になったところも多くあります。
エンジニアチーム以外とのコミュニケーションや、事業戦略にエンジニアが混ざったり色々変化が
これはまた違う機会に

むしろ戦術パターンをガッチガチに採用しないようにするのも選択肢としてやっています。
DOPなどであったり、ある程度モダンな技術に合わせたり、
ETL/ELT・イベントソーシングなどを利用することで切り離したりもすることも組み合わせるため、
全てに置いてガッチガチに採用するのは良いことではないこともあったからです。

話を戻して・・
思想やコンセプトは自分だけしかわからない状態にはせず、
かならず同じように理解して同じ景色がある程度見えるように意思疎通などを重ねます。
これは1on1とかもそうですし、色々やります。
コンセプトがないと色々破綻してしまうことが多かった・・

そしてチーム全体で共通認識を持ち、
入れ替わった時に同じ認識を持てるようにみんながしっかりコンセプトを話せる状態にします。

これは抽象度が高すぎないようにして、手法がガッチガチになりすぎない程よい抽象度に。
中長期すぎて想像しづらいものよりももうちょっと手前なものが認識しやすいです。

加えて実際に幾つかのリファクタリングであったり、リプレイスであったり、
インフラ設計やデータベースリファクタリング、
ソフトウェア以外のリプレイスなども一緒にやり、
目的に進むための技や知識を育てることをかならず取り組んでいます。
(チームによってはできないこともあります)
当たり前ですが言うだけの存在であればChatGPTだけで良いです

ドメイン知識と向かう方向を示すコンセプト、
そこに向かうための方法を実際に体験して、大変だけどやれそうだぞ!面白そう!
という状態をしっかりと作るように心がけます。

信頼関係づくりとかそういう観点でも良いと思います。(個人的には)

レビュー

色々やってみた中で一番しっくりきたのが、
コードレビューなどをGitHub上だけで終わらせず、
プロダクトやプロジェクトに関わるメンバーを集めてみんなでレビューをする、です。

圧迫面接のようなものではなく、あくまで平和的に実施することが条件ですが(当たり前)、
指摘というよりもコードの背景を教えてもらう、という趣旨です。
というのもコードだけであれば、何々っていう作り方の方が、とかこういう書き方の方が、
という話になりがちなんですが、
背景を知ることで「なるほど!」ということがたくさん知れたり、隠されたドメインの知識や
フローや組織の問題などが見えてきます。

技術的負債はそういったものにも原因がたくさんあり、
コードや実装の仕方だけではうまく付き合えないことがまぁまぁあります。
事態は複雑なのです。

他にも色々

他にもたくさんありますが、長すぎると読まなくなるのでこのあたりで。
負債返却のための技術的なアプローチは、
アクターモデルの導入がなんだかんだ非常に体験が良かったりしますので(規模によって合う合わないがありますが)
これはまたの機会に。

楽しく向き合いましょう!

DebeziumとKafka ConnectとSnowflakeを使ってニアリアルタイムなデータパイプラインを作る

ニアリアルタイムで更新されるデータ基盤を作るために

データ基盤をはじめとするデータ処理やその辺りに興味がある、
もしくは 作らないといけない!という形にもぴったりだと思う実際にやっている方法を紹介します。

Snowflakeとは?

すでにご存知の方は次の次くらいまで飛ばしてください。

www.snowflake.com

そもそもSnowflakeとは何か、というお話です。
Webアプリケーション開発を主としているエンジニアの方には、あまりお馴染みではないものだと思いますが、
ここでいうSnowflakeとはSaasのデータプラットフォームサービスです。
データ設計のスノーフレークスキーマのことではありません。

データプラットフォームとはいえ、RDBMSとかとなんか違うの?という話ですが、
これはただのリレーショナルなデータベースではなく、
PrestoやAthenaなどを利用している方、もしくはPostgreSQLのFDWとかを使っている方は連想しやすいと思いますが、
複数のデータソースを結合することに加えて(検索などもRDBMSとS3のparquetとか、CSVとかJOINできる等)、
データガバナンスとしてのデータアクセスに関する権限管理などができる、
データレイクやデータウェアハウス、データ活用や権限などデータに関するあらゆるものを管理できるものです。

サービスが成長するとあっちこっちのプロジェクトのデータベースを結合したり、
RDBMSではない全く異なる何かとRDBMSを結合したデータを作りたい、判断できる状態にしたい、
対象が何千万件、何億とあって結合や算出がアプリケーションのコードだけでは難しい、
などそういった状態をデータ基盤で解決することも多いと思いますが、
そういった場合などでも権限管理などと共に強力にサポートしてくれるプラットフォームとなっています。

データ運用していると、データマネジメントが必要不可欠になります。
例えばある権限を持つアカウントのアクセス以外は特定のカラムをマスクするなど、
通常のデータベースのみではなかなか難しいものを解決することができたり(データを書き換えるわけではない)、 Sparkなどを利用している方はSnowflake上でシュッと加工ができたり、
タスクを使って集計処理をSnowflake上で完結したりできるものです。
BigQueryと似たようなもんだと思うといいでしょう。

ここでいうタスクとはCronを用いたいような定期的に何かをするJOBだと思っていただければ良いです。

データ基盤は基本的にすべてのデータをイミュータブルとして扱いますので、
通常は更新などはあまり用意されていないんですが、Viiewやタスクなどを組み合わせて完結できるのは非常にありがたいです。
普段からHudiを使っている方にはイメージしやすいかと思います。

hudi.apache.org

なぜイミュータブルで持つ必要があるのかというと、
会社などによって色々差異はありますが、どのような時にどんなことが起きたか、
どんなことをしたらどうなったか、を基本としてビジネスなどの意思決定が行われます。

これが何もエビデンスがなく、誰かの勘だけで決定されていたらどうなってしまうのでしょうか?
施策などを実施したあとにうまくいった、うまくいかなかった、などはどのようにして判断できるのでしょうか?

結果の数値はスプレッドシートなどで共有されるのかもしれませんが、
その細部はどのようにしてわかるのでしょうか? アプリケーションがよく利用するデータベースでは、パフォーマンスや実装の観点から
ほとんどは直近のデータが多く格納されていると思います。
うまくパーティショニングなどがされて数年保管している、というのはごく一部のアプリケーションだと思います。

ほとんどの場合においてアプリケーションとして必要なデータはあるが、
過去は追えない(例えばいつ削除されていつ戻ったのか、などもフラグでは管理できません)、
データドリブンな意思決定の観点の設計をアプリケーションに多く取り込むことは、
不確定要素が多すぎるため難しいわけです。

ある程度見越せるものはありますが、
リリースされてからどんなものがあれば判断ができるのかはほとんど後で決まります。

こうした判断はさまざまな出来事の変化の過程や、
事実を判断材料にするためデータをエビデンスとして誰もが同じ認識で判断できるようにする必要があります。

なのでイミュータブルに保管する、という選択肢しかないわけです。

が、データを必要とする領域や人によっては積み上げたものをひたすら眺めるだけでなく、
ある程度の頻度で更新されるビューの方が都合がよくなります。

そういったものを解決する方法の一つがSnowflakeというわけです。

SnowflakeはS3などを定期的に取り込んで差分更新したり(S3は不変でビューとしてのSnowflakeが変更される)、
いくつかの便利なデータパイプライン機能がありますが、
RDBMSをうまくいい感じに差分更新したりといったものは単体では難しいです。

よろしい、ならばDebeziumだ

RDBMSの変更をリアルタイムで検知してその中身のスナップショットをとるぞ!という場合は
日本でもお馴染みになってきたと思いますが、CDCを利用するのが一般的です。
もちろんPostgreSQLのようにデータベース自身が様々なデータソースと接続できる機能があれば
問題ないものもありますが、アプリケーションの観点と全く異なるデータ分析やエビデンス作りなどを吸収して
うまく設計するのは非常に難易度が高いです。
このためデータをCOPYしてどこかに持っていくことが多いと思います。
CDCはデータが変更されたタイミングで変更後と変更前を吸い上げてくれる仕組みです。

Debeziumの動かし方や細かい解説はスタフェスのメンバーが解説していたりしていますので、
参考にしてください。

zenn.dev

kaz29.hatenablog.com

debezium.io

Debeziumなどを利用しない場合は、アプルケーション側のバッチ処理でなんとかしたり、
EmbulkなどのETL/ELTツールを使って転送することもできますが、
洗い替えの方法や、パーティション設計や、転送量(コストにもなります)、
メンテナンス性などを加味して選択するといいと思います。

DebeziumはDebezium Serverなどもありますが、ここではKafka ConnectのSource Connectorとして 動作させていきます。 Kafka(or MSK)を利用している環境であればSnowflakeへの転送もSink Connectorで解決できますので、
理にかなった選択肢ではないかと思います。

データの流れ

Debezium、Kafka、Snowflakeを組み合わせた場合に流れは次のようになります。

Debeziumを通じてMySQLなどからキャプチャーされると次のようなJSONがKafkaのメッセージとして送られてきます。

{
  "schema": { 
    "type": "struct",
    "fields": [
      {
        "type": "struct",
        "fields": [
          {
            "type": "int32",
            "optional": false,
            "field": "id"
          },
          {
            "type": "string",
            "optional": false,
            "field": "first_name"
          },
          {
            "type": "string",
            "optional": false,
            "field": "last_name"
          },
          {
            "type": "string",
            "optional": false,
            "field": "email"
          }
        ],
        "optional": true,
        "name": "mysql-server-1.inventory.customers.Value", 
        "field": "before"
      },
      {
        "type": "struct",
        "fields": [
          {
            "type": "int32",
            "optional": false,
            "field": "id"
          },
          {
            "type": "string",
            "optional": false,
            "field": "first_name"
          },
          {
            "type": "string",
            "optional": false,
            "field": "last_name"
          },
          {
            "type": "string",
            "optional": false,
            "field": "email"
          }
        ],
        "optional": true,
        "name": "mysql-server-1.inventory.customers.Value",
        "field": "after"
      },
      {
        "type": "struct",
        "fields": [
          {
            "type": "string",
            "optional": false,
            "field": "version"
          },
          {
            "type": "string",
            "optional": false,
            "field": "connector"
          },
          {
            "type": "string",
            "optional": false,
            "field": "name"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "ts_ms"
          },
          {
            "type": "boolean",
            "optional": true,
            "default": false,
            "field": "snapshot"
          },
          {
            "type": "string",
            "optional": false,
            "field": "db"
          },
          {
            "type": "string",
            "optional": true,
            "field": "table"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "server_id"
          },
          {
            "type": "string",
            "optional": true,
            "field": "gtid"
          },
          {
            "type": "string",
            "optional": false,
            "field": "file"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "pos"
          },
          {
            "type": "int32",
            "optional": false,
            "field": "row"
          },
          {
            "type": "int64",
            "optional": true,
            "field": "thread"
          },
          {
            "type": "string",
            "optional": true,
            "field": "query"
          }
        ],
        "optional": false,
        "name": "io.debezium.connector.mysql.Source", 
        "field": "source"
      },
      {
        "type": "string",
        "optional": false,
        "field": "op"
      },
      {
        "type": "int64",
        "optional": true,
        "field": "ts_ms"
      }
    ],
    "optional": false,
    "name": "mysql-server-1.inventory.customers.Envelope" 
  },
  "payload": { 
    "op": "c", 
    "ts_ms": 1465491411815, 
    "before": null, 
    "after": { 
      "id": 1004,
      "first_name": "Anne",
      "last_name": "Kretchmar",
      "email": "annek@noanswer.org"
    },
    "source": { 
      "version": "2.3.0.Final",
      "connector": "mysql",
      "name": "mysql-server-1",
      "ts_ms": 0,
      "snapshot": false,
      "db": "inventory",
      "table": "customers",
      "server_id": 0,
      "gtid": null,
      "file": "mysql-bin.000003",
      "pos": 154,
      "row": 0,
      "thread": 7,
      "query": "INSERT INTO customers (first_name, last_name, email) VALUES ('Anne', 'Kretchmar', 'annek@noanswer.org')"
    }
  }
}

これに対して、Kafka ConnectのSnowflake Sink Connectorは次の 公式ページにあるような構造でデータ転送が行われます。

docs.snowflake.com

この中で下記のものにまずは注目します。

VARIANT Type
RECORD_CONTENT Kafkaメッセージが含まれる
RECORD_METADATA メッセージに関するメタデータ(例えば、メッセージが読み取られたトピック)が含まれる

これはSnowflake Sink ConnectorがSnowflakeのどのカラム名(VARIANT型)に書き込むかということを表していて、
DebeziumのJSONにある payload がRECORD_CONTENTに対応する形となります。

つまり、DebeziumでキャプチャーされるとSnowflake側の対応するテーブルの
RECORD_CONTENTにそのままVARIANTで転送されるということです。

これでできそうな感じがしてくると思います。

もう一つ、実際に気をつけなければいけないことは、
Debeziumから送られてくるメッセージの中の payload.opです。

このopの値によってRECORD_CONTENTのどの値を利用しなければならない、というのが決定されます。

payload.op payload.before payload.after
c = create レコード作成前は状態がないためnull 作成後のデータ構造
u = update 変更前のデータ構造  現在のデータ構造
d = delete 物理削除前のデータ構造 物理削除時はデータがなくなるためnull
r = read スナップショット取得前の状態はないためnull 現在のデータ構造

readはSELECTが発行された、ではなく
データベースのスナップショットから読み込んだもので全て r で送られてきます。
スナップショット取得以降は c, u, dで送られてきます。

何かあってDebeziumから再取得すると以前c, u, dで送られてきたものもrになりますが、Topicにあるものから再送するとc, u, dのままです。
この辺りはKakfaの仕組みを理解していれば違いがわかると思います。

大雑把にいうとc, u, dの場合は payload.afterを利用する
dの場合はpayload.before を利用する

ということになります。

物理削除がある場合はこのおおまかに分けた2つの違いを意識して
Snowflakeのテーブル操作による更新を行えるようにします。

いくつか方法はありますが、
ここでは分けて考えやすい、Kafkaから送られてきたメッセージを格納しているSnowflakeのテーブルから
物理削除とそれ以外に分けた2つのViewを作り、
その2つのViewを利用してテーブルの追加更新削除タスクを作成し、
スケジュールによるニアリアルタイム更新をしていく、
という流れをつくります。

でかい実践的なデータ構造の方が面白いんですが、
解説が大変なので、ここでは下記のすごいシンプルな実際にはないテーブルにします。

CREATE TABLE tests(
  id INT NOT NULL AUTO_INCREMENT PRIMARY KEY,
  name VARCHAR(25) NOT NULL
) DEFAULT CHARACTER SET=utf8mb4;

ここでは特に触れませんが、
対象の環境によってはMySQLのDATETIMEなどに対して、
ZERO DATE系の値が入っている場合(0000-00-00、0000-00-00 00:00:00など)は、
そのままの値で変換されません。
1970-01-01 などになりますので注意しておきましょう。

その前にZERO DATEはDebeziumの設定でもいくつか対応しておかなければ転送時にエラーになりますので、
設定周りで対応するか、ZERO DATE利用をやめるなどをおすすめします。

Snowflakeで事前に必要なもの

まずはKafka ConnectのSnowflake Sink Connectorが書きこめるように
ロールの設定などを事前に終わらせておきます。

-- Use a role that can create and manage roles and privileges.
USE ROLE securityadmin;

-- Create a Snowflake role with the privileges to work with the connector.
CREATE ROLE kafka_connector_role_1;

-- Grant privileges on the database.
GRANT USAGE ON DATABASE kafka_db TO ROLE kafka_connector_role_1;

-- Grant privileges on the schema.
GRANT USAGE ON SCHEMA kafka_schema TO ROLE kafka_connector_role_1;
GRANT CREATE TABLE ON SCHEMA kafka_schema TO ROLE kafka_connector_role_1;
GRANT CREATE STAGE ON SCHEMA kafka_schema TO ROLE kafka_connector_role_1;
GRANT CREATE PIPE ON SCHEMA kafka_schema TO ROLE kafka_connector_role_1;

公式リファレンスにあるものですが、Snowflake Sink Connectorが
テーブルを作成したり、連携のためのPIPEやSTAGEが作成できるようにしておきます。
(事前に作成して異なる権限を割り当ててもOKです。)

Kakfa からみるとただのConnectorですが、Snowflake上ではPIPEなどを組み合わせて
連携を実現するためこの辺りの権限が必要になります。

またKafka Connectorで利用するSnowflakeのユーザーにはデフォルトの権限やWarehouseなども割り当てておきます。

ALTER USER kafka_connector_user_1 SET DEFAULT_ROLE = kafka_connector_role_1;
-- etc

このあたりを事前に割り当てておかないと連携ができず、Kafka Connect上でStatusがFailedとなります。

Snowflake Sink Connector

基本的な設定方法は公式にありますので、こちらを参考にしてください。

docs.snowflake.com

ハマりがちなポイントだけ紹介します。
connectorの設定のうち下記のものを注目します。

    "connector.class":"com.snowflake.kafka.connector.SnowflakeSinkConnector",
    "tasks.max":"8",
    "topics":"topic1,topic2",
    "snowflake.topic2table.map": "topic1:table1,topic2:table2",
    "buffer.count.records":"10000",
    "buffer.flush.time":"60",
    "buffer.size.bytes":"5000000",
    "snowflake.url.name":"myorganization-myaccount.snowflakecomputing.com:443",
    "snowflake.user.name":"jane.smith",
    "snowflake.private.key":"xyz123",
    "snowflake.private.key.passphrase":"jkladu098jfd089adsq4r",
    "snowflake.database.name":"mydb",
    "snowflake.schema.name":"myschema",
    "key.converter":"org.apache.kafka.connect.storage.StringConverter",
    "value.converter":"com.snowflake.kafka.connector.records.SnowflakeAvroConverter",

connector.class はSnowflakeのSink Connectorを指定するのでこれ以外はありません。(独自拡張している場合は除く)
tasks.max は環境に合わせて設定してください。
topics はDebeziumのメッセージが格納されているTopicを指定します。
複数ある場合はカンマ繋ぎで指定します。
複数テーブルへの書き込みをトランザクションに合わせて確実に欲しい場合はOutboxなどを利用してください。

snowflake.topic2table.map はtopicに対応したsnowflakeのテーブルをマッピングした表現で指定します。
ここで記述するsnowflakeのテーブルのRECORD_CONTENT、RECORD_METADATAにメッセージが転送されます。

snowflake.url.name ここはsnowflakeのURLですが契約プランなどによって指定方法に多少違いがあります。
事前に確認しておきましょう。
snowflake.user.name はSink Connector用のユーザーなどを割り当てておくと良いでしょう。
(なんでもできるアカウントは基本的に作れない、作らないと思うのでセレように作りましょう)

snowflake.private.keysnowflake.user.name に割り当てられている秘密鍵を指定します。
terraformなどを使って生成している場合やツールなどで作ると多少差異はありますが、
ここで指定する秘密鍵はヘッダー・フッター、改行コードを全て取り除いて一行で指定する必要があります。

snowflake.database.namesnowflake.schema.name はSnowflakeで転送するテーブルが存在している
データベース、スキーマを指定します。
ここで指定するデータベース、スキーマ、テーブルも含め、snowflake.user.name に割り当てられている権限が
アクセスできる状態でなければいけません(デフォルトロールで含まれていること)

value.converter は転送するメッセージによって異なります。
Avroを利用している場合は com.snowflake.kafka.connector.records.SnowflakeAvroConverter になりますが、
Debezium+JSON形式のメッセージの場合は com.snowflake.kafka.connector.records.SnowflakeJsonConverter になります。
必ずデータフォーマットに合わせて指定してください(Kafka Connectを理解していないとハマります)

ここまで設定がきちんとできていると後はConnectorを起動すると転送が開始されます。

テーブルを更新するためにViewをつくる

例として使うテーブルは下記のようなメッセージでRECORD_COTENTに保存されます。

{
  "after": {
    "id": 1,
    "name": "転送テスト"
  },
  "before": null,
  "op": "c",
  "source": {
    "connector": "mysql",
    "db": "test-db",
    "file": "mysql-bin-changelog.12345",
    "gtid": null,
    "name": "raw-data",
    "pos": 1,
    "query": null,
    "row": 0,
    "sequence": null,
    "server_id": 1,
    "snapshot": "false",
    "table": "tests",
    "thread": 75448658,
    "ts_ms": 1688962410000,
    "version": "1.9.7.Final"
  },
  "transaction": null,
  "ts_ms": 1688962410471
}

opがd以外に対応するViewは次のようになります。

create or replace view HOGE_DB.RAW_DATA.TEST_IU_VIEW (
    ID,
    NAME,
    DEBEZIUM_PROCESSED_TS,
    SOURCE_PROCESSED_TS,
    SOURCE_SERVER,
    SOURCE_DB,
    SOURCE_TABLE,
    DML_OPERATOR
) as
SELECT
    record_content:"after"."id"::NUMBER as ID,
    record_content:"after"."name"::NUMBER as NAME,
    record_content:"ts_ms"::STRING::DATETIME as DEBEZIUM_PROCESSED_TS,
    record_content:"source"."ts_usec"::STRING::DATETIME as SOURCE_PROCESSED_TS,
    record_content:"source"."name"::STRING as SOURCE_SERVER,
    record_content:"source"."db"::STRING as SOURCE_DB,
    record_content:"source"."table"::STRING as SOURCE_TABLE,
    record_content:"op"::STRING as DML_OPERATOR
FROM HOGE_DB.RAW_DATA.CDC_RAW_TESTS
WHERE lower(DML_OPERATOR) in ('r', 'c', 'u');

HOGE_DB.RAW_DATA.CDC_RAW_TESTS はメッセージが格納されているテーブルだと思ってください。
Viewの名前はdとそれ以外を区別できる名前であればなんでも大丈夫です。
snowflakeのテーブルなどは大文字小文字は区別されませんが、大体大文字で表現されるのでそれに合わせた表記にしています。
小文字で表現しても問題ありません。

RECORD_CONTENTに格納される Debeziumのpayload.opは record_content:"op"::STRING as DML_OPERATOR とすることで、
値を取得することができます。
古いバージョンだとrecord_content:"payload"."op":の場合もありますので、
RECORD_COTENTを確認してください。

これでop d以外に対応したViewになります。
物理削除を使っていない環境の場合はこれだけでも可能ですが、
物理削除を使っていて かつテーブルからは除外したい場合は、
この後のTaskで追加更新削除を表現できるように記述する必要があります。

続いて op dに対応するViewです。
先述した対応表に合わせてRECORD_CONTENTからの取得方法を変えてあげます。

create or replace view HOGE_DB.RAW_DATA.TEST_D_VIEW (
    ID,
    NAME,
    DEBEZIUM_PROCESSED_TS,
    SOURCE_PROCESSED_TS,
    SOURCE_SERVER,
    SOURCE_DB,
    SOURCE_TABLE,
    DML_OPERATOR
) as
SELECT
    record_content:"before"."id"::NUMBER as ID,
    record_content:"before"."name"::NUMBER as NAME,
    record_content:"ts_ms"::STRING::DATETIME as DEBEZIUM_PROCESSED_TS,
    record_content:"source"."ts_usec"::STRING::DATETIME as SOURCE_PROCESSED_TS,
    record_content:"source"."name"::STRING as SOURCE_SERVER,
    record_content:"source"."db"::STRING as SOURCE_DB,
    record_content:"source"."table"::STRING as SOURCE_TABLE,
    record_content:"op"::STRING as DML_OPERATOR
FROM HOGE_DB.RAW_DATA.CDC_RAW_TESTS
WHERE lower(DML_OPERATOR) = 'd';

after指定だったものをbeforeにして、Viewを別名にして識別できるようにしました。
これで追加更新と削除を分けてニアリアルタイムで更新するタスクが作れるようになります。

更新対象のテーブル

更新対象にするテーブルはCDC対象のテーブルと同じ構造にしておきましょう。

create or replace TABLE HOGE_DB.STAGING_DATA.TESTS (
    ID NUMBER(38,0),
    NAME VARCHAR(100)
)COMMENT='ステージングデータ';

シンプルこの上ない形ですね。

このテーブルに対して先に作ったViewのデータを元に操作するタスクを作ります。
タスク実行用のユーザーやロールを作っておくのがおすすめです。
タスク実行には、それぞれのViewに対してのSELECT (FUTURE SELECTとかにしておくのもいいでしょう)、
ここでいうHOGE_DB.STAGING_DATA.TESTS に対する SELECT、INSERT、UPDATE、DELETE
タスク実行ロールにOWNERSHIPを割り当てる、などがあります。

タスク作り

タスクとしては、HOGE_DB.RAW_DATA.TEST_IU_VIEW、HOGE_DB.RAW_DATA.TEST_D_VIEWのレコードを使って
操作するだけですが、ここで意識しておかなければいけないのは時系列です。
Debeziumから送られてくるメッセージは時系列が担保されていますので、
送られてきた通りに実行すれば問題ありませんが、追加更新と削除は判断するカラムが異なります。
そのために2つのViewに分けたのはわかると思いますが、
この2つに分かれたものそれぞれを独立したタスクにすると、どうなってしまうのでしょうか。

追加更新は時系列順にViewが並んでいるので大きな問題はありませんが、
独立してしまった削除はどのタイミングでタスク実行されるのが確実なのでしょうか。
ID 1が追加され、ID 1のNAMEが更新、ID 1を削除し、再びID 1を作成した場合、
削除以外のViewでは追加、更新、追加とならびます。最後の追加は重複したレコードが作成されることになります。
削除が保管されているViewにはID 1の削除が保管されますので、タスク実行のタイミングによっては
重複して作られたレコード両方を削除するかもしれませんし、
先に削除が動いてしまうかもしれません。
単純に二つのタスクに分けてそれぞれ実行するのはリスクがあるわけです。

解決方法はいくつかありますが、ここでは一番簡単な二つに分けたテーブルをくっつけて並び替えてあげる、ことで
時系列もデータの状態も正しく表現するようにしていきます。

更新対象になる HOGE_DB.STAGING_DATA.TESTS に対して
データの有無を判断して操作するにはMERGEを利用します。

タスクで実行されるものは次の通りです。

MERGE INTO HOGE_DB.STAGING_DATA.TESTS as tgt
USING (
    SELECT * FROM (
        SELECT *,
            ROW_NUMBER() over (
                PARTITION BY ID
                ORDER BY DEBEZIUM_PROCESSED_TS DESC
        ) as row_num
    FROM (
        SELECT
            ID,
            NAME,
            DEBEZIUM_PROCESSED_TS,
            DML_OPERATOR
        FROM HOGE_DB.RAW_DATA.TEST_IU_VIEW
        UNION ALL
        SELECT
            ID,
            NAME,
            DEBEZIUM_PROCESSED_TS,
            DML_OPERATOR
        FROM HOGE_DB.RAW_DATA.TEST_D_VIEW
        ) as u
    ) as t1 WHERE t1.row_num = 1
) as src
ON tgt.ID = src.ID
WHEN MATCHED AND src.DML_OPERATOR = 'd' THEN DELETE
WHEN MATCHED AND src.DML_OPERATOR = 'u' THEN
UPDATE SET
    tgt.ID = src.ID,
    tgt.NAME = src.NAME
WHEN NOT MATCHED AND src.DML_OPERATOR IN ('c', 'r', 'u') THEN
INSERT (
    ID,
    NAME
)
VALUES  (
    src.ID,
    src.NAME
);

二つのViewをUNION ALLで結合し、かつ重複してはいけないIDでパーティション、
時系列で最新が並ぶようにDEBEZIUMで処理した最新のものを取得するようにします。
最新になったもので、DML_OPERATORの値で追加更新か削除が判定できるようになるので、
その条件によって追加か更新か削除かを判定します。
つまり結果整合の状態になるようにするわけです。
これでテーブルに反映すべき最新に状態がわかるようになります。

このクエリをタスクとして、このブログのタイトルがニアリアルタイムなので5分おきに最新の状態に更新する、
というものにします。

CREATE TASK TESTS_VIEW_MERGE
WAREHOUSE = HOGE_WH
SCHEDULE = '5 MINUTE'
AS MERGE INTO HOGE_DB.STAGING_DATA.TESTS as tgt
USING (
    SELECT * FROM (
        SELECT *,
            ROW_NUMBER() over (
                PARTITION BY ID
                ORDER BY DEBEZIUM_PROCESSED_TS DESC
        ) as row_num
    FROM (
        SELECT
            ID,
            NAME,
            DEBEZIUM_PROCESSED_TS,
            DML_OPERATOR
        FROM HOGE_DB.RAW_DATA.TEST_IU_VIEW
        UNION ALL
        SELECT
            ID,
            NAME,
            DEBEZIUM_PROCESSED_TS,
            DML_OPERATOR
        FROM HOGE_DB.RAW_DATA.TEST_D_VIEW
        ) as u
    ) as t1 WHERE t1.row_num = 1
) as src
ON tgt.ID = src.ID
WHEN MATCHED AND src.DML_OPERATOR = 'd' THEN DELETE
WHEN MATCHED AND src.DML_OPERATOR = 'u' THEN
UPDATE SET
    tgt.ID = src.ID,
    tgt.NAME = src.NAME
WHEN NOT MATCHED AND src.DML_OPERATOR IN ('c', 'r', 'u') THEN
INSERT (
    ID,
    NAME
)
VALUES  (
    src.ID,
    src.NAME
);

ここでは5分おきに実行されるようにしています。
このタスクを実行するロールからRESUMEしてあげれば指定したSCHEDULE通りに実行されます。

SCHEDULEの指定方法はいくつかあり、
慣れ親しんだCRON形式で指定することもできます(SCHEDULE = 'USING CRON 0 */1 * * * Asia/Tokyo' とか)。

docs.snowflake.com

タスク実行時のエラーなどはAWSやGCP、Azureなどを介して通知することもできますので、
環境に合わせて設定するといいでしょう。

これ以外にも実現方法はいくつかありますが、
今回はコードすら書かずにDebeziumとKafka Connect、Snowflakeを使って設定のみで更新される
データパイプラインの作り方を紹介しました。

さいごに

ここではある程度リアルタイムにデータ更新したい、なんか実装とかしたくない、
Snowflake上でなんとかうまいことしたい、件数が多すぎてアプリケーションのスクリプトでやるにはちょっと、
みたいな時を背景としたものです。
規模やそもそものミドルウェアなどの利用状況、組織や文化によっては
良い選択肢ではないことも多くありますので、
ここにあるものをそっくりそのままではなく皆さんの環境に合わせて最適化してみてください。

今年買って大変良かったもの 2022

今年買って良かったものです。
何かの参考にあれば幸いですが個人的なものも含みますので、
ふーん くらいに捉えていただければ。

書籍

事業分析・データ設計のためのモデル作成技術入門

読んだ感想を書こうと思ってすっかり忘れてましたが、
概念的な要素も多分に含むモデリングですが、
わかりやすい数学的な解説(集合論とかもありますよ!)と、
読み進めていくと体系的に学べるようになっている内容でおすすめです。

データ設計とタイトルにありまして、
データモデルを学ぶ上でも非常に有用ですが、
ソフトウェア設計にもマイクロサービスアーキテクチャ的にも
自分たちのドメインモデルを作っていく上でもどうやって考えていけばいいか、
がまさにここにある、という具合です。

めちゃくちゃ良い本なので、年末年始のお供にぜひ

継続的デリバリーのソフトウェア工学

気がついたら2022年12月に発売されていた書籍。
原語版でもかなり好評だったもので、日本語で読めます。

タイトルはCI/CDをイメージする方も多いかもしれませんが、
そのまんまの内容ではありません。
工学というキーワードを見聞きすると、アカデミックな難しいものと想像する方も多いと思いますが、
著者がソフトウェアにおける工学を定義して(誰でもわかる内容です)、
様々な事柄を明らかにしていく内容です。

学びのエキスパート、複雑さ管理エキスパートという言葉が最初に出てきますが、
読み進める度にそのための要素を探っていくのが読んでいて非常に楽しく、
これからものづくりをやっていくぞ!という方にも、
ソフトウェアの本質を知ってより良い設計に繋げていきたい!という方にも非常に良いのではないかと思います。

年末年始や輪読会にも

ちょうぜつソフトウェア設計入門

ひさてるさんの著書です。
サンプルコードはPHPですが、他の言語を使っている方にも読めますし、
サンプルコードがJavaの本と同じテイストなので、
PHPに偏見がある方にもぜひ読んで欲しい、というのはありますがw
書影はキャッチーですが、ソフトウェア設計について真正面から向き合っている本です。

ひさてるさんらしい わかりやすい文章でオブジェクト指向を掘っていき、
密接な関係がある開発サイクルも含めていろんなプラクティスが散りばめられており、
クラスベースの言語や関数ベースの言語などにも役に立つと思います。

もちろん読んだだけで何かが解決されて全てうまくいく、というわけでもなく
コピペしてコードが書けるようになるわけではありませんが、
中級者になっていくエンジニアの方には大きな助けにもなると思いますし、
知識の補完やエンジニアとしての思考方法を広げてくれるものでもあると思います。

ほんとに書影からは想像できないくらいの内容で、
キャッチーでありながらも、ひさてるさんの深くて広い知識と経験から出てくる文章が
非常に素晴らしいです。
ミノ駆動さんの本と合わせて横に置いておくと、いろんなものに対応できるようになると思います。

Domain Modeling Made Functional

発売してから4年くらい立っていますが、思い出して買った書籍

ドメイン駆動設計を体系的に学べて、かつ関数型言語のF#を使って表現していく内容ですが、
本の半分以上はどのようにドメイン駆動設計を進めていくか、という点に割かれていて
注文を受け取って請求書を送るシステムを例に、
イベントストーミングでドメインを分析していくのが重要としています。
データ構造よりもイベントやワークフローを軸にしていくので、
メッセージングを扱うリアクティブシステムに興味がある、ES+CQRSも!という方にも非常に良いと思います。
あとデータ基盤などを作る上で、イベントストーミングは避けて通れず、
何がエビデンスとして価値に繋がるのか、などの観点からも本書は有用だと思います。
表現としての型を重視しているのも読み進めていくと納得できるものですし、
関数型でなくても書籍を読んでいくとコンテキストが共有されるので、
自分たちの場合はどうしていくのがいいか、と考えられると思います。

もちろんコピペして他の言語に移植するだけではこの本の価値はあまりありません。
むしろそれはドメイン駆動でもなんでもないものですので、
翻訳をしながらでもきちんと読み進めていきましょう!
これを読んだ後にF#のドキュメントを読みまくって、ちょっと触ってみたりして
ぜひ導入してみたいなと思いつつ、Scalaだけでも一杯一杯なのでやめましたw
(Scalaが難しいという意味ではありません)

そのうちこれらの本について深掘りした記事でも書こうかな

楽器

IT界隈では楽器演奏する方も多いと思いますが、
今年買って良かったものが多くあったのでついでに

Roland V-Drums TD-27KV2

ドラム自体は結構長くやっていて、エンジニアになる前はドラムばっかりやっていまして。
自宅に生ドラムがあったんですが(1セットはスタフェスCTOに譲った)子供が興味を持ち始めたのもあり、
ちびっこに生ドラムは流石に音量もデカくて、大きさも大人サイズなので叩けないし、
ということで思い切って購入。

www.roland.com

楽器屋をやっていた時の知識のまんまだったので、
生のドラムと全然別物だしなぁ、と悩んでいて実際に店頭でも試奏させてもらったりして、
ハイハットが一枚で表現されてるし、スネアもなんかなぁ、と思っていたんですが
drumtecの動画をみたりして、購入をしたわけです。

www.youtube.com

セットアップしてみると、たしかに生ドラムのようにチューニングでパッと音が作れるとかではないですが、
結構細かくトリガーの設定もできるし、音も自分で組み合わせたりできて、
一番感動したのは重ねシンバルがあらかじめある!という点。
元々terry bozzioが大好きだったり、最近だとほとんどが重ねシンバルで構成してるbenny grebだったり、
そういうドラムスタイルに影響けていたりして、
20代の頃からシンバルの半分は重ねシンバルでセットしていたので、
普段と変わらず叩けるっていうのはいいですね。

www.youtube.com

youtu.be

あとスネアドラムもデジタルではありますけど、かなりよく再現されていて、
メッシュのヘッドもかなり満足度が高い。
生ドラムではなくこの手の電子ドラムから始める方は、
ベロシティなどを少し調整して、強めに叩かないとしっかり音が出ない、とかにすれば良いかもしれません。
太鼓を鳴らす感覚はもちろんないので、それは生ドラムで養えばいいくらいですね。
クリックももちろんあるし、コーチ的なモードもいいです。
もっと早く買えば良かったなぁっていうくらいのもの!

Ibanez QX527PB 7弦ギター

20代の最初のうちから7弦ギターをずっと触っていて、
いろんな事情で手放してからは6弦ギターだけ手元に残していたんですが、
やっぱりどうしても7弦ギターが欲しくなり、
個人的にはフロイドローズ系のトレモロで24フレットで、というこだわりがあったんですが、
最近はそうしたギターもあまり販売されていおらずという時代でもあるなかで、
strandbergが最近の楽器メーカーに影響を与えているのか、
いろんなところからヘッドレススタイルのギターが出ているわけですね。

結構いい値段するし、趣味ギターおじさんにはちょっともったいないなぁ、ということで
このQX527PBがスポット販売された時から気になっていて、
でてもすぐ売り切れという状態だったんですが、夏ぐらいから再生産されたタイミングで遂に購入!

www.ibanez.com

スラントフレット(人間の手首の傾きに合わせて斜めにフレットが打たれているもの)は非常に楽で、
ちょうどいい7弦ギターのネックの太さと、めちゃくちゃ軽量で、
薄さゆえにしっかり生音がする、というのも非常に気に入ってしまいました。

元々ついてるピックアップもまぁまぁ良いものではあるんですが、
リアは購入してすぐにSEYMOUR DUNCAN Pegasus Passiveに変えてしまいました。

https://www.soundhouse.co.jp/products/detail/item/184229/

手元にある6弦ギターには、ブリッジにDuncan DistortionとDuncan Customをのせたものが一本ずつありますが、
Pegasusはそれらよりも出力がちょっと低めで、
パワフルに歪むという系統ではないんですが、音の分離もよく、倍音がまた綺麗なやつでこれもまた満足。

ちなみにこのギターのピックアップ交換は非常に楽な仕組みになっているので、
自分で交換したことがある方は感動すると思います。

Ibanez PIA

Steve Vaiは高校生の頃からずっと聴いていて、いまだに憧れのギタリストなんですね。
そんなにたくさんは弾けませんがw(弾けたらエンジニアになってませんねw)

前のモデルのJEMの方が印象が強くて、当時買えれば良かったんですが、
シュッと購入できる状態ではなかったんですが、
上記にあるQX527PBを購入して、昔のようにいろんな曲を練習したり、
運指のトレーニングだったりをやっていくうちに、
憧れのギタリストのモデルが欲しいよね、という欲望と、
IbanezのEdgeトレモロの安定感が素晴らしく、手元にフロイドローズオリジナル搭載のFender ストラトがあるんですが、
24フレットも欲しいし、ということもあって遂に買ってしまった。

www.ibanez.com

フロイド系のトレモロは構造上胴鳴りはあまり期待できないんですが、
アルダーボディで日本製、しかもj.custom的なグレードのものでもあるためか、
予想外に鳴りがあってネックも鳴るんですが、この鳴りの一体感がとても素晴らしくて、
ついつい弾くのが楽しくなってしまうギター!

買って良かった!と思えるものですね。
これは素晴らしい!

Line6 Pod Goも実は買ったんですけど、これ一台あれば完璧ですね。
オーディオインターフェースに突っ込んでもいいし、ヘッドホンを繋いでシミュレーターを堪能してもいいし。
自分はPod Goをプリアンプにして(キャビネットのシミュレーターをオフにして、アンプをプリアンプにします)、
ギターアンプのリターンに突っ込んで使ったりもしていますが、
音がとにかく幅広いのでなんでもできます。
すごい時代だ・・!

来年はあまり買わないようにしよう・・

今年も大変だった 2022年振り返り

2022年は大変だった

年末でいつもの振り返り。

去年に引き続きブログ等のアウトプットもあまりしていなかったので、
振り返りとどんなことをしていたのか、 毎年の殴り書きです。

アウトプット

今年もアウトプットは意図的に抑えていました。
やはりコミュニティには新陳代謝は必要だと思うので、
いつものおじさんにならないようにCFPなどはほとんど出しませんでした。

登壇はPHPerKaigi 2022の「入門 境界づけられたコンテキスト」のみ

fortee.jp

トークイベントとしては
ミノ駆動さんと
ビジネスとアーキテクチャとアーキテクトなどのテーマを扱ったトークをしました。

flxy.jp

上記で内容はサマってあるものの、結構脂っこい話をしたりしました。
ここ数年はオフラインイベントが難しかったりしていますので、久しぶりに面白い話がたくさんできた・・。

もう一つは
あらたまさんと リアーキテクティングについて、これまでの取り組みの内容やどういった観点で判断していくか、
ビジネスはどうなっていくのか、などなどこちらも楽しいテーマでトークしました。

flxy.jp

もっと時間があれば、もっとたくさん具体的な話ができるのに!という所感ですが
これはこれでまた時期がきたらどっぷり肩まで浸かって話すみたいなことをしたいなと思いました。

スタフェスの現在進行形についてのリアーキテクチャ等については後述。

スターフェスティバル

メインでコミットしているスターフェスティバルでは、
エンジニアが増えてきて これからが面白そうな取り組みができる下地が少しずつ構築されてきた、という具合。
これまでエンジニアが多いような組織体系の会社ではなかったところに、
プロダクト開発体制に変わりながら一緒に未来を作るエンジニアが少しずつ増えてきて、
ついに今年はアドベントカレンダーが余裕で埋まってしまうという状態に。

qiita.com

みんな日常の開発等が忙しい中、アウトプットが好きな人たちが多く
投稿前にみんなでレビューしあったりして、大変素晴らしいエンジニア組織になってきたなぁという一年。

自分は引き続き将来の事業にフィードバックしていくための仕組みづくりや、
エンジニア外に対しての意識作りなどを継続して活動 & 実践をしたような気がする。
ドメイン駆動ということで、開発だけでなく会社全体に認知されるようにコミュニケーションを取ったり、
エンジニアとそれ以外、自分たちだけ、という関係性じゃないよ的なことも取り組んだりしてました。

去年インフラエンジニアが抜けたあとに、自分がしばらくやっていたところに
同じようなベクトルを持つエンジニアがジョイン! (koonagi)

インフラ周りはもちろんデータ基盤関係も一緒に取り組むことができて、
ひたすら地道に続けていたデータをとある形でプロダクトとして繋がりはじめて
これからどんどん面白いことができるのではないかなぁというのが見えてきたところ。

来年はsnowflake等を使いながらクラスタリングや機械学習などを使って、
価値あるものを作っていくぞ!と。

Webアプリケーション側はアドベントカレンダーにも書いたので、そこを参照してください。

blog.ytake.jp.net

マイクロサービスアーキテクチャはやる予定はなく、そっち方向に倒すつもりも現在ありませんが、
仕組み上どうしても分散トランザクションを扱うものがちらほら見え隠れしているため、
Scalar DB(KotlinかScalaと合わせる)を導入するか、Akka等でリアクティブな骨組みを作り、
その上でNode.jsやPHPのアプリケーションが構築される、みたいなのを準備していこうかなとかやんわり考えています。

github.com

基盤整理だったり業務フローに関係するものの再構築に関わっているチームは、
KafkaだったりCDCに徐々に慣れてきているところで、
イベントだったりメッセージングを実際に向き合っているので
来年等は面白いアウトプットがチームから出てくるのではないかと思ってます。

毎年やろうやろうと思っているフロントエンドアプリケーションはあまり手がつけられず、
AWS Amplifyで色々作ってみようと準備したところで年末になってしまった。
社内では開発をずっとしている、みたいな動きはしていないので
ytakeのコードとか実装をたくさん知れる!みたいな感じではありませんが、
お、面白そうじゃん?っていう方はぜひお話ししましょう!

stafes.notion.site

カジュアル面談は弊社エンジニアまで気軽に連絡してください!

副業

株式会社ネットプロテクションズ

以前から副業でお手伝いしていた株式会社ネットプロテクションズ(以下 np)のPRが出た!

prtimes.jp

npはネットで買い物をしている人なら大体見たことがあるのではないか、という
NP後払い等を展開している会社です。
技術とドメインを結びつけて次に進むためのお手伝いを継続的にさせていただいており、
ドメイン駆動・開発のサポートだったり、事業のところにも少しずつ入り込んだりしています。

システムのリプレイス、新たな言語選定だったりチームビルディングだったりを組織全体で取り組んでいるところで、
まだまだエンジニアが足りておらず、たくさんのボトムアップも必要な段階で絶賛エンジニア募集中なので、
ちょっと話聞きたいな!とかあればこちらも気軽にどうぞ!
今話題のKotlinやGo、Node.js(TS)だったりで色々やっています。
エンタープライズ的なアーキテクトになりたい!っていう方にも良いフェーズだと思います。

他にもいくつかの企業様でお手伝いを継続的にさせていただいてまして、
来年とかにもまた色々アウトプットが出てくるのでは、と思いますので
乞うご期待

プライベート

夏ぐらいから介護看護的な事象が発生したりして大変な中、
首がヘルニアになってしまって、しばらく動かないようにして寝ながら仕事したり、
勤務時間を少なくしたりして休んでいました。
ヘルニアからの神経痛と、ものすごい肩こりからくる神経痛の両方が併発してしまい
一ヶ月半くらいほとんどキーボードも打てない状態だったりしましたが、
12月に入ってからヘルニアが落ち着き、
ものすごい肩こりからの神経痛だけが残るというところまで回復してきたので
新年会等やりたいな!と思ってます。

そしてなぜか楽器熱が上がってきて(知っている方は知っているかもしれませんが、エンジニアになる前は音楽系だったりします)、
楽器が急に充実してしまった。
これはまた別のエントリとして書こうかなと思います。

来年はまた少し大きなアウトプットが出せそうなので
楽しみにしながら(大変な現実から目を背けている)、
引き続きビジネスに貢献を実践しながら同じような動きをしていくエンジニアを各所で育てて
圧倒的な成長に繋げるおじさんを継続していこうと思いますので、
関係者の皆様は引き続きよろしくお願いいたします。

現状と向き合ってシステムを考える時の頭の中

このエントリはスターフェスティバル株式会社の スターフェスティバル Advent Calendar 2022
16日目記事でもあります。

みなさんは開発する時にどう考えていますか?

大した内容ではありませんが、今回は開発をする上で
「どう考えて設計して表現していくか」、という永遠の悩みの中で
自分が複雑な物事に立ち向かう時の頭の中を少し書き出してみようと思います。
各カンファレンスなどで話しているものを結合したものではあります。

一緒に仕事をしたりしている方々にはお馴染みの話です

前半くらいは前提の話や分析の思考、
後半はイベント駆動などにおけるメッセージについて
という流れになってます。

ちなみに自身はスターフェスティバルではアプリケーション全般の開発には関わっていますが、
主にデータ基盤やデータドリブンなマインドを伝播させていくことや、
データを使った戦略を立てながらのプロダクト作りや、インフラ全般に携わっています。
(オフライン勉強会とかも少なくなったのでこういう話をする機会も減りましたねぇ・・)

細かいドメインモデルやデータモデル、設計手法の云々等については触れません(十分なボリュームすぎて・・)。

商品とは?

せっかくなのでスターフェスティバルで取り組んでいるテーマを例にしてみましょう。
*そのものずばりの内容ではなく、実際のものよりも単純にしていたり想像しやすい内容にしています

前提としてスターフェスティバルは下記の領域を取り組んでいる会社です。

企業理念
ごちそうで 人々を より 幸せに

飲食店が中食・デリバリーに参入するためのソリューションとして、
「製造」以外の部分にあたる「商品開発」「販路提供」「販売促進」「注文受付」「決済」
「配達」のすべてをスターフェスティバルが一気通貫でトータルサポートいたします。

また、最大級のモールを運営する弊社ならではの、製造・物流ネットワークや
販売チャネルを活用したサービスも行っています。

何らかのシステムを介して、おいしいごちそうを頼みたい!ユーザー(喫食者と呼んでいます)、
おいしいごちそうを提供したい製造元(以下製造パートナー)と
そしてそれを届ける配送が全国各地でリアルに動く、と大まかな関係があります。

では商品とは何を指すのでしょうか?

ドメイン知識などがない場合は、商品という言葉から
「8割くらいごちクルで販売している弁当とかのことを商品というんだな、
データベースにはこの商品があるだろう」
と思うはずです。

これは購入する喫食者の視点からみた時の認識であると同時に、
開発者としての視点での認識でもあります。

現状を理解せずに喫食者(らしい者)と開発者視点のままで開発をしていくと、
半分は飲食店のメニューとして並んでいるような商品で、
半分は商品の属性などが入り混じった商品という言葉から連想されるものが結合した何か、
データベース設計でいうならばポリモーフィック前提のような構造になってしまいます。

これを読んでいるみなさんの周りのアプリケーションにも似たような状態のものはたくさんあると思います。
アプリケーションがきちんと機能していればネガティブなものではありません。
多少見通しが悪いなどはありますが

これまではこれでも良かった。

が、物事は永遠に変わらないわけではなく、
色々と変化が必要なタイミングもあり業務フローとシステムを大きく見直す必要が出てきました。

取り組んでいる領域には「ごちそう」を軸に

  • 喫食者
  • 製造パートナー
  • 配送者

の3つがあることはわかっていましたが、本当にそれだけでしょうか?

実際は製造以外の分野を引き受けるため、
ごちそうを商品として登録する運用チーム、営業チームがあり、
このチームが製造パートナーの商品情報を受け取り、
サービスが提供する商品として成立するための情報を付与し、
はじめて喫食者が目にする「ごちそう」となります。

よく考えたら当たり前の構造ではありますが、
これまではよくても(雑にいうと)事業成長を支えていくためには現在の会社の状況を正しく理解し、
業務フローもスケールしやすくするためにはある程度ながく戦えるシステム・仕組み(例えです)を作らなければなりません。

事業を支えていくような基盤の場合は短い期間で大きなリプレイスが難しい場合もありますし、
サービスの特性や領域によっては簡単にリプレイスを繰り返すものもあります。
皆さんの環境や領域ではまた変わってきますので、皆さんの環境に置き換えて考えてみてください!
(これはあくまで例です)

業務フローとシステム(みなさんが想像しやすいアプリケーションと思っていただいて良いです)は
密接な関係があり、「運用でカバー」がそのまま謎の進化を遂げたり、
とりあえず今の要求通りで動けばいいやというシステムは、
やがてスケールできない業務フローや事業戦略が立てづらい企業と繋がっていきます。
(そのために継続的なリファクタリングや、変化に対応しやすく未来を一緒に作っていけるアジャイルなどを取り入れたり)

現状をより理解する

では長く戦うために考えることは、
マイクロサービスアーキテクチャ
でしょうか?

いやいや、そうではなくて現在の業務フローや構造などを再度見つめなおしてみます。

「ごちそう」と表現される商品の大元は、
弁当などを製造する店舗・製造パートナーがありそこで認識される商品は、
あくまで製造側が認識できる「xx弁当」という商品になります。

たとえば製造側は上記のような認識だとしましょう。

製造パートナーが認識できる商品に対してスターフェスティバルで必要な情報が付与されます。
配送に関連したものであったりさまざまですが、
店舗が認識している商品とはまた少し違った形となります。

同様に配送する場合に認識する商品もそれぞれ似たところもあるけど異なるもの、
となります。

(もちろん実際とは異なります)

つまり商品は3者の認識が混入しているっぽい、と当たりがつくようになります。
これは開発者だけではなく社内の各方面の方々とコミュニケーションをとりながら、
実際に言葉の齟齬がないか背景や事実確認などを行いながら現状を深掘りします。
社内で共通の言葉を使いながら、領域ごとに多少の背景を汲み取りながら
それぞれの領域で疎通ができるようになればはっきりと境界線が認識できます。
(これは実際に自分でやっています。)

これを認識せずに実際のコードやシステム設計に落とし込むと
データモデルとも大きく乖離してしまうデータベース設計(フラグを多用したり、ライブラリで使いやすい構造になってしまったり)や、
商品という名前を使った巨大ななにかに繋がるコードとなってしまいます。
これはクラスベースの言語でなくても、型演算多用で何者にでもなれてしまう何かになってしまったり
どういう指標で作っていくかはすべて実装者依存になってしまいます。

これでは10年戦えるシステム・仕組みどころか、
実装者が変わった途端に「なぜこう作ったんだろう?」のみが残ってしまい、
開発者の好みなどの粒度でさらに分割したり結合したりとどんどん複雑になってしまいますので、
みなさんも「あれ?」という瞬間があったらぜひ現状の分析をしてみてください。

起点を探す

さて、この記事はここからが本題です。

この境界線はそれぞれの立場や業務などの背景があるのは明確ですが、
それはどのような瞬間から分かれていくのでしょうか。

それは下記のものがキーとなります。

製造以外の分野を引き受けるため、  
ごちそうを商品として登録する運用チーム、営業チームがあり、  
このチームが製造パートナーの商品情報を受け取り、  
サービスが提供する商品として成立するための情報を付与し、  
はじめて喫食者が目にする「ごちそう」となります。  

つまり一番最初の事象は、
製造パートナーからシステムを通じて販売したい商品データをもらう・投入してもらう
という事象を起点に商品に対する認識が少しずつ変化します。

そして商品に対するさまざまな情報が加えられたりすることで「ごちそう」となるわけです。
雑に整理すると下記のようになるでしょう。

  • 製造パートナーが販売したい商品情報を入稿した
  • スターフェスティバルがサービスで販売するための商品として承認した
  • 商品が「ごちそう」へと生まれ変わった

(多くなるのでこの辺で・・)

もちろんこの事柄以外にもありますが、価値として大きなものはこのようになります。
ではなぜこの事象を取り上げたのでしょうか?

実はこの事象はWebアプリケーション開発の視野と異なる視点でみると
少し違う見方ができます。

これらの事象は製造パートナーとして販売したい商品がどのくらいあり、   販売できなかった商品がどのくらいあったのか、
「ごちそう」と変化した商品がどのくらいあったのか
という事業としても製造パートナーとのやりとりの改善や、
他にもさまざまなエビデンスとなり活用できる価値のあるデータへとつながります。

そして物事の始まりでもある「製造パートナーが販売したい商品情報を入稿した」事象をきっかけに
いろいろな世界へと繋がることが見えてきます。
人によってはまるで小さなビックバンを見つけたかのような感覚にもなるかもしれません。
自分はよく
「今ここでジャンプしたら地球の裏側にはどういうことが起こるかワクワクしながら想像するようなもの」と喩えたりしています。

そしてこの例の事象1つ1つは会社活動やサービス、「ごちそう」など、
それぞれの視点だけで完結するピュアな事象に近いということがわかってきます。
各領域のアプリケーション中ではもっとたくさんの事象がありますが、
会社活動としても重要な事象はどれか、という思考で分類します。

たとえば事象の起点から最終的に「ごちそう」へ変化するまでの道のりを俯瞰することで
物事を把握できるデータ・エビデンスとしても活用できる、
かつ社内外の業務フローからみても、この事象はピュアなまま活用できます。

あくまでたとえばの話ですので、みなさんのアプリケーションや会社活動の中ではどうなのか、
と向き合ってみるとまた違うものが見えてくると思います。
(時系列があることはしっかりと認識しておきましょう)

やがてはこの事象が「あの時どうだったっけ?」という真のデータソースとなり得るもので、
それぞれの立場・領域でこれを元に世界が広がっていく拡張されていくデータの流れとしても捉えることができます。

事象が保管されることでそれぞれの領域に閉じた構造を作ることができるようになります。
お互いを行き来する場合でも相関IDや真のデータソースを介することで
翻訳しあえることも見えてきます。

この事象を更新などで変化をさせずに的確に保管し、
読み込み時にそれぞれの領域の視点を取り入れた構造にすることもできそうです。
(複製しても再度やり直せば良いということにもなります。)

もちろん1つのDBですべてが完結していて
そこで分析やさまざまアプリケーションが動いていて問題がなければ難しくする必要はありません!
あくまで捉え方の例としてください。

どういう仕組みで作っていくか

ここでやっと実装の話に。

事象が発生したタイミングで複数の領域の視点や関心が加わっていくということはわかっています。
この事象が発生したよ、と複数の領域に伝播・指示できれば良さそうです。

もし領域ごとにアプリケーションが存在しているとするとどう取り組んでいくと良いでしょうか?
この場合は社内業務用のアプリケーションに社外用のデータ投入アプリケーション、
そして販売サイトとして作用するアプリケーションとなりますのでアプリケーションが分かれていることになります。
*注意 マイクロサービスアーキテクチャではありません。

気をつけなければならないのは、
複数の領域があるためHTTPを挟んだりデータベースへ直接書き込みにいったり、
データ処理のためにBigQueryやS3へ一緒に書き込むぞ!
という選択肢をとると難しい問題と立ち向かわなければなりません。

ここまで述べた中でいくつかの領域があるため、
たとえばシステムが分かれていた場合は3領域に対してデータベースに書き込んだり、
HTTPを挟んで書き込んだりする場合(Web APIなど)は、
どこかの領域で失敗した場合、どこで整合性が担保されるのかという問題があります。

リトライを挟んだとしてどこで3つの領域に書き込まれたよ、と担保できるのでしょうか。

なにかの障害などでどこかの領域に指示ができない場合はどうしていくのでしょうか?

それぞれの領域で構成変更などが起きた・起きる場合は同時に対応しなければならなくなりそうです。

分かれているシステムへ直接データベース書き込みにいくと、
リリースやデータ構造の変更が発生する場合は一緒に行動する必要が出てきます。
加えてどれかの領域の関心が入り込んだテーブルやカラムなどが追加される可能性が高くなっていきます。
どこかで必要だけどどこかで必要ない、という形になっていきます。

このアプローチでは2相コミットが発生するのと、
相手が必ず生きていることを前提とした同期処理ということになってしまいます。

ja.wikipedia.org

つまり指示元が相手先を考慮しなければならなくなります。
お互いに仕様変更がある場合なども大変そうなことも見えてきます。

難しい場合はマイクロサービスアーキテクチャだ!分離だ!と意気込まずに、
まずは結合したアプリケーションとして着手していきましょう。

この例の場合で、各領域に影響を与えず、かつそれぞれの領域独自の視点を入れてもよい、と考えると
メッセージブローカーなどを介するのが良さそうです。

結果整合を選択することになりますが、この例の場合は問題ないという前提にしておきましょう。

事象が起きたことを記憶したい領域がメッセージブローカーに伝えることができれば
指示を受ける側に何かあっても気にする必要はなく、
各領域のタイミングで自由に受け取ることができます。

これにはSQSやMSK、Kinesisなどがありますが
弊社ではMSK(Kafka)を利用しています。
SQSなどのQueueとMSK、Kinesisとの違いなどはまた別の機会にするとして、
ここではKafkaを選択したとしましょう。

*受け取る領域が途中で増えたとしても過去の事象を再送することができるので、再送のために再度指示を投げる必要もありません

メッセージをどうするか?

メッセージブローカーに送信するにはメッセージが必要となります。

メッセージはIDだけを記述して受信側が再度問い合わせる、
という形にすると「あの時どうだったっけ?」ができているつもりでも、
「あの時」が更新された最新のものになってしまうことになります。
必ず事象をそのままスナップショットにしましょう。

肝心のメッセージの送信方法ですが、大まかに2つの方法があります。
1つ目はoutboxパターンを用いてメッセージブローカーに送信、
2つ目はメッセージブローカーに直接送信があります。
(Akka等はこの記事では触れません)

www.infoq.com

outboxパターンを用いる場合は、データベースのトランザクションを使って
送信したいメッセージをテーブルに書き込み、CDC(change data capture)を介してメッセージブローカーに送信されます。

この場合はoutboxとして利用するテーブル以外にも、
送信元のアプリケーションで必要なデータベースに書き込む必要がある場合などにも利用できます。
レガシーアプリケーション改善などにも活用できます。

注意点としてはCDCの対象はテーブル単位となりますので、
複数テーブルの変更をバラバラで受け取ることになりますのですべてを同時に受け取ることは難しくなりますので、
その場合は1つのメッセージで完結するような集約を作る必要があります。
AWSのDynamoDB Streamsなどで用いられている仕組みのようなもの、と捉えてもらえると良いと思います。
WAL(Write Ahead Log)で変更などを検知して吸い上げてメッセージとして送信されるものです。

Debeziumでoutboxパターンをサポートしてくれる機能がありますので、
興味のある方は下記をどうぞ。

debezium.io

すでにスターフェスティバルのアドベントカレンダーに一部がありますので、
こちらも合わせてどうぞ!

zenn.dev

これについてはまたどこかで解説しましょう。

メッセージブローカーに直接送信する場合は、
送信元のアプリケーションでリアルタイムにデータベースへ保存する必要がなく、
メッセージブローカーに送信さえできれば良いという場合に利用します。

この場合は上記にも記述しましたがメッセージブローカーに送ってデータベースにも書き込んで、
といった実装になる場合は2相コミットが発生しますのでなるべく選択することを避けてください。

データベースなどへの書き込みなどはコンシューマーとなるアプリケーションが担当、
もしくはKafka Connectなどを通じて転送して保存できます。

RDBMSやS3、BigQueryやさまざまなデータソースに転送したり、
もちろん前述のCDCとKafka Connectを組み合わせることができます。

なおMSK、Kinesisなどは送信時にトランザクションを利用できますので、確実に届けることができます。

1つの領域で閉じたメッセージ送信などの場合は、アプリケーション側で担保できますが、
本記事の例では複数の領域やデータ処理などにも繋がっていくため、いったんメッセージブローカーを介していきましょう。
(いろんな言語で解説するのが大変なのでGoの例にします)

メッセージを作る

すべての例を記載していくのは大変なので、下記のものだけをやっていきましょう。

  • 製造パートナーが販売したい商品情報を入稿した

特別な知識は排除した簡単な例ですが、おそらく次の構造になるでしょう。

{
  "correlation_id": "11111122223444",
  "store_name": "ytakeキッチン",
  "store_id": 2,
  "product_name": "めちゃくちゃおいしいカレー",
  "wholesale_price": 100,
  "price_including_tax": 108,
  "tax_rate": 1.08,
  "comment": "めちゃくちゃおいしい",
  "type": "new",
  "created_at":"2022-12-12T00:00:00+09:00"
}

だいぶ省略していますが税率が複数ある場合はこのままでは対応できませんので、税率を含めて構造化等を。
データベースに直接突っ込むものではありませんので、
実際に永続化するときは商品と価格などは分けておきましょう。

package message

import "time"

type EventType string

const (
    New    EventType = "new"
    UPDATE EventType = "update"
)

// Product 商品に関するメッセージを表現したもの
type Product struct {
    // CorrelationID 相関ID
    CorrelationID string `json:"correlation_id"`
    // StoreName 製造パートナー名
    StoreName string `json:"store_name"`
    // StoreID 製造パートナーID
    StoreID int `json:"store_id"`
    // ProductName 商品名
    ProductName string `json:"product_name"`
    // WholesalePrice 卸値 税抜
    WholesalePrice int `json:"wholesale_price"`
    // PriceIncludingTax 卸値 税込
    PriceIncludingTax int `json:"price_including_tax"`
    // TaxRate 税率
    TaxRate float64 `json:"tax_rate"`
    // Comment 何かあれば
    Comment string `json:"comment"`
    // Type データ入稿が新規か更新かなどなど
    Type EventType `json:"type"`
    // CreatedAt 事象の発生日時
    CreatedAt time.Time `json:"created_at"`
}

良さそうに見えますね。

待ってください?!

たとえばデータ投入で少し変わってきた場合にどうやって構造が変更されたことを担保するのでしょうか?
型指定があまり得意ではない(型指定しているように見えてもそれはあくまでコード上だけだったりも然り)場合はどうやったら?

という場合には
Apache AvroProtocol Buffersを利用するといいでしょう。

これらを使うことで各アプリケーションやデータ処理などで定義を共有できます。
下記はProtocol Buffersの例です。

syntax = "proto3";

package product;
import "google/protobuf/timestamp.proto";

option java_package = "com.github.ytake.example.protobuf";
option go_package = "github.com/ytake/example/pb";

message RegistrationAction {
  uint64 correlationId = 1;
  string storeName = 2;
  uint32 storeId = 3;
  string productName = 4;
  uint32 wholesalePrice = 5;
  uint32 priceIncludingTax = 6;
  float taxRate = 7;
  string comment = 8;
  enum EventType {
    NEW = 0;
    UPDATED = 1;
  }
  EventType event = 9;
  google.protobuf.Timestamp created = 10;
}

言語に対応したoptionなどもありますので必要に応じて足します。
AWS DMSやDebeziumなどでoutboxとしてこれらを利用する場合は下記を参照してみるといいでしょう。

medium.com

さてとこれで。。

おっと、まだ考慮しておくことがあります。
これまで述べた長い背景の中で事象には順序があるらしい、とわかっています。
たとえばこの事象ごとにQueueやTopicを分けて送信してしまうと、
送った側は時系列順に送ったつもりでも受け取り側はそうなるとは限りません。

どこかでスタックしてしまうと、時系列は想定通りにはならず簡単に順番が入れ替わってしまいます。
スーパーのレジを想像してみてください。
並んでる人が少ない列で待っていると、「あれ、あっちの方が多かったはずなのにもう捌けてる・・」
あの現象が起こります。
事象の分析や時系列を理解してメッセージ設計に含めましょう。

今回の例であれば最終的には付加情報をつけて「ごちそう」になるわけですから、
前の状態がなくとも突然「ごちそう」のメッセージが流れてきても問題ないかどうか、
時系列の判断などが可能かどうか、スナップショットと不整合が起きないかどうか、
結果整合となりますのでそちらで担保が可能で、
頭から読み出して再処理しても問題ないかどうか、などなどがあります。

上記のprotoファイルに、たとえば付加情報を含めてスナップショットとする、などなど
Protocol BuffersのOptional なども理解しておきましょう。

このあたりの設計について触れるとどんどん長くなってしまうので、今回は割愛します。
機会があればこれはこれでどこかで・・・

protoファイルを使ってGoのコードを吐き出してあげます。

# Goのプロジェクトルートから実行して./pbに吐き出すとしたら
$  protoc --go_out=./pb --go_opt=paths=source_relative ./product.proto

簡単なサンプルですが、下記のようになります。

import (
    "errors"
    "github.com/confluentinc/confluent-kafka-go/kafka"
    "github.com/ytake/example/pb"
    "golang.org/x/xerrors"
    "google.golang.org/protobuf/proto"
    "google.golang.org/protobuf/types/known/timestamppb"
    "strconv"
)

// 抜粋
    ra := pb.RegistrationAction{
        CorrelationID:     11111122223444,
        StoreName:         "ytakeキッチン",
        StoreId:           2,
        ProductName:       "めちゃくちゃおいしいカレー",
        WholesalePrice:    100,
        PriceIncludingTax: 108,
        TaxRate: 1.08,
        Comment:           "めちゃくちゃおいしい",
        Event:             pb.RegistrationAction_NEW,
        Created:           timestamppb.Now(),
    }
    tn := "topic-name"
    by, err := proto.Marshal(&ra)
    if err != nil {
        return xerrors.Errorf("error: %w", ErrProtobufMarshal)
    }
    deliveryChan := make(chan kafka.Event)
    err = p.Produce(&kafka.Message{
        TopicPartition: kafka.TopicPartition{
            Topic:     &tn,
            Partition: kafka.PartitionAny,
        },
        Value: by,
        Key:   []byte(strconv.Itoa(int(ra.CorrelationID))), // パーティションKey
        Headers:        []kafka.Header{{Key: "適切なヘッダーKey", Value: []byte("header values are binary")}},
    }, deliveryChan)
// 以下略
// close等していませんのでこれをコピペしても動きませんので注意

これで良さそうです。

いや、まだあります。

メッセージの定義は時の流れとともに変更されていきます。
開発していくなかで要件が変わったり、設計見直しなどがありますのでずっとこのままというわけにはいきません。

そんな時にオススメなのはSchema Registryです。

docs.confluent.io

AWSではGlue Schema Registryがあります。
(GCPは利用していないのでわかりませんがCloud Pub/Subとか?)

もちろんすべての変更に完璧に応えるというわけではなく、
どのように互換性を担保するかを指定しなければなりません。
互換性チェックのタイプとしては大まかに下記の通りです。

  • 後方互換性
  • 前方互換性
  • 完全互換性
  • 互換性チェックなし

コンシューマーから先にアップグレードするかプロデューサーから先か、など
ユースケースによって選ぶことができます。
詳細を解説すると本1冊分くらいになりますので、下記を参照してください。

docs.confluent.io

互換性の指示についてはSchema RegistryのREST APIを利用するか、
クライアントで指定できます。
Schema Registryを介してKafkaへ送信する場合は下記のように利用できます。

// 抜粋
    p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": "bootstrapServers"})
    if err != nil {
        return err
    }
    client, err := schemaregistry.NewClient(schemaregistry.NewConfig(url))
    if err != nil {
        return xerrors.Errorf("Failed to create schema registry client: %w", ErrSchemaRegistryClient)
    }

    /* subjectを指定して互換性を変更する場合は下記のように利用できます
    *
    *_, err = client.UpdateCompatibility("subject名", schemaregistry.Forward)
    *if err != nil {
    *  return xerrors.Errorf("Failed to change compatibility: %w", ErrSchemaRegistryCompatibility)
    *}
    */

    ser, err := protobuf.NewSerializer(client, serde.ValueSerde, protobuf.NewSerializerConfig())

    ra := pb.RegistrationAction{
        CorrelationID:     11111122223444,
        StoreName:         "ytakeキッチン",
        StoreId:           2,
        ProductName:       "めちゃくちゃおいしいカレー",
        WholesalePrice:    100,
        PriceIncludingTax: 108,
        TaxRate: 1.08,
        Comment:           "めちゃくちゃおいしい",
        Event:             pb.RegistrationAction_NEW,
        Created:           timestamppb.Now(),
    }
    tn := "topic-name"
    payload, err := ser.Serialize(tn, &ra)
    if err != nil {
        return xerrors.Errorf("error: %w", ErrProtobufMarshal)
    }
    deliveryChan := make(chan kafka.Event)
    err = p.Produce(&kafka.Message{
        TopicPartition: kafka.TopicPartition{
            Topic:     &tn,
            Partition: kafka.PartitionAny,
        },
        Value:   payload,
        Key:     []byte(strconv.Itoa(int(ra.CorrelationID))),
        Headers: []kafka.Header{{Key: "適切なヘッダーKey", Value: []byte("header values are binary")}},
    }, deliveryChan)
    // 以下略

Consumerも同様に実装できます。
長くなるので下記のサンプルなどを参考にしてください。

github.com

データ処理系はメッセージが格納されたTopicに対して
Kafka Connectを利用してS3やさまざまなデータストアに対して転送できます。
下記のconverterを利用して変換を行う形になります。

Kafka Connect Protobuf Converter | Confluent Hub

docs.confluent.io

と、ボリュームがえらいことになってきたので、駆け足的にここまでとなりますが、
どのように物事と向き合って実現するための方法など
両方が結びつきあうことで開発以外の領域に対しても広く視野が持てるようにもなります。

実際にはここまで複雑につくらずとも同期的な処理を利用した実装や
物理的にデータベースを共有しながらアカウントや権限で分離するなど、
シンプルに解決できる方法もありますので、
色々な角度で物事を見ながら抽斗をたくさん作っていきましょう。

採用情報

スターフェスティバルではエンジニアを絶賛採用中です。
気になる方は是非以下のページなりを見ながらインフラやデータ基盤の話、
アプリケーション開発周りの話などを聞きたい方がいましたら、カジュアルお話ししましょう!
弊社のTwitterやエンジニアにDMなどお気軽に!

stafes.notion.site