Kafka+Spark Structured Streamingとマイクロサービスアーキテクチャ

Kafka+Spark Structured StreamingとProtocol Buffers

マイクロサービスアーキテクチャというと、
適切なドメインモデルを参考にシステムを切り出して、
小さくしてチームを分けてAPI開発すればいいんだな、と思う方も多いかもしれませんが、
実はこのまま鵜呑みにしてやってしまうと、事業をうまく支えられるかどうか厳しい面が多くあります。

ビジネスを進める上でアプリケーションが動いていることは当たり前ですが、
事業を拡大したり推進していくには、エビデンスとなるデータが必要不可欠です。

つまりAPI連携をしてうまく動いてるように見えても、
あくまでアプリケーションレベルでの動きだけで、
マーケティングやさまざまなエビデンスになるデータを取得することは困難である
という状況になってしまうことが多々あります。

愚直にこれに対応しようとなると、数多くのバッチ処理、
バッチ処理の依存を解決するための仕組みづくりや、
深夜を超えて処理されるバッチ処理についての問題解決などなど、
実は最初に目をつぶって後回しにして、数年後ににっちもさっちもどうにもいかなくなる、
というのがデータ処理関連では非常によくある問題であったりします。
そもそもグローバルになると深夜という概念がなくなりますので、
なんでも日本時間の夜中に定期実行させる、というのが難しくなりますね。

事業をうまく支えるには マイクロサービスアーキテクチャというキーワードが注目されて数年経ちますが、
我々が目指すのはマイクロサービスアーキテクチャではなくて、
リアクティブシステムでなくてはいけません。
*リアクティブシステムはリアクティブ宣言(Reactive Manifesto)で定義されていますので、
詳しくはそちらを参照ください。

www.reactivemanifesto.org

リアクティブシステムを目指していけば、
自然とマイクロサービスアーキテクチャになっていくというのがなんとなくわかるかと思います。
マイクロサービスアーキテクチャが主で考えてしまうと、
GraphQLなどのBFF的な解決策やAPIそのものについての思考が優先されてしまうため、
リアクティブシステムの文脈はあまり見えてこないかと思います。

ということで、今回はリアクティブシステムを支えることができる
メッセージブローカーとしても強力なApache Kafkaを中心に、
Spark Structured Streamingと、アプリケーション関連で利用ケースも多いProtocol Buffersを扱う例の解説です。

なんでメッセージブローカー?

そもそもマイクロサービスアーキテクチャ等でKafkaのようなメッセージブローカーを使え、
と多くの実践者が発信するのはREST APIやGraphQLなどで通信を行う場合、
副作用のないReadを扱うには非常に強力で、むしろこれ以外の選択を取るのは難しいわけですが、
副作用のあるPOST、PUT、PATCH、DELETEのようなものへのアプローチであったりします。

単一のアプリケーション観点でみるとこれらはただのHTTPメソッドだったりするかもしれませんが、
システム全体でドメインモデルなどと照らし合わせてみると、
多くがドメインイベントだったり(イベントストーミングなどで導き出したものなど)がほとんどのはずです。
つまりWebアプリケーションの裏にあるデータの流れ(ストリーム)の観点では、
ビジネスを推進して行ったり、エビデンスとしていく信頼できるデータは
このドメインイベントのスナップショットを活用していくことであったりします。

もちろん永続されたRDBMS上のデータもそうですが、
ほとんどは副作用があるデータとRDBMSやS3やその他たくさんのデータを組み合わせなければなりません。

ここの解決策に、例えばAPIにPOSTリクエストを送った後に、
分析とか機械学習とか、マーケティング等に必要なデータを他のストレージに入れていきますね、
という処理ばかりを作っていくと全て同期的に処理しなければならなくなりますし、
なにしろn層コミット問題をAPI側で作り込まねばなりません。
データ連携がうまくいかないのを検知してSlackなどで通知して手動対応、もあるかもしれませんが、
アプリケーションが使われれば使われるほど手動対応がかなりの割合を占めていきます。
これではマイクロサービスアーキテクチャ的な側面だけを導入しても、
数年後にはうまく機能しないシステムと評価されてしまうでしょう。(それもひとつの経験ですが!)

この状態の場合は、DebeziumやAWSのDMSなどを使ってCDCを導入して解決していくしか無くなってしまうわけですが、
これにしても結局Apache KafkaやAmazon Kinesisといった
大規模な分散処理に対応したメッセージブローカーを活用することとなります。 (Akkaなどで解決するのももちろんあります)

今回はタイトルにもある通り、Apache Kafkaです。
AWS上ではMSKとして提供されていますし、オンプレでもクラウドでもConfluentなどで構築することができます。

kafka.apache.org

www.confluent.io

KafkaとProtocol Buffers

アプリケーション間でやり取りをするにあたって、すべてのシーンで型の制限がないJSONを使い続けるのはやはり難しいです。
ということでProtocol Buffersを利用することも多くあるでしょう。

developers.google.com

今回は例として、単純なユーザー作成に関するものを扱うものを使うとします。

syntax = "proto3";

package sample;
import "google/protobuf/timestamp.proto";

option java_package = "com.github.acme.sample.combine";
option go_package = "github.com/acme/sample/pbd";

message UserAction {
  uint64 correlationId = 1;
  enum EventType {
    CREATED = 0;
    DELETED = 1;
  }
  EventType event = 2;
  uint32 userId = 3;
  string name = 4;
  google.protobuf.Timestamp created = 5;
}

副作用が大事といいつつUPDATEがないのはただの例だからです。

ただのprotoファイルですがマイクロサービスアーキテクチャだったり、データ処理観点が混ざっているので
APIなどのWebに近いアプリケーションはGo、データ処理はSpark+Scalaでやるという体です。

Apache Kafkaとの組み合わせでよく質問されますが、
Protocol Buffersはただのバイナリとして扱われますので、Kafkaでも問題なく扱えます。

Goでは下記のようにして扱えます。

protoファイルで定義した通りに詰め込みます。

package message

import (
    pbd "github.com/acme/sample/publisher/pbdef"
    "google.golang.org/protobuf/types/known/timestamppb"
    "math/rand"
    "time"
)

func makeTimestampForProto() *timestamppb.Timestamp {
    return timestamppb.Now()
}

func ExampleMessages() []*pbd.UserAction {
    var sua []*pbd.UserAction
    return append(sua, &pbd.UserAction{
        UserId:  uint32(1),
        Event:   pbd.UserAction_CREATED,
        Name:    "aaa1",
        Created: makeTimestampForProto(),
    }, &pbd.UserAction{
        UserId:  uint32(2),
        Event:   pbd.UserAction_CREATED,
        Name:    "aaa2",
        Created: makeTimestampForProto(),
    })
}

Kafkaへの送信はこんな感じです。

(複数のtopicへのメッセージをうまく扱うにはインターフェースなどにしておくといいでしょう)

package message

import "github.com/confluentinc/confluent-kafka-go/kafka"

// Publisher interface
type Publisher interface {
    Client() *kafka.Producer
    RetrieveTopic() *string
    RetrievePartition() int32
}
package pub

import (
    "fmt"
    "github.com/confluentinc/confluent-kafka-go/kafka"
    "github.com/acme/sample/publisher/message"
)

// Client Kafka client struct
type Client struct {
    Producer *kafka.Producer
}

type Messenger struct {
    publisher message.Publisher
}

// RequestParameter for publisher
type RequestParameter struct {
    Byte []byte
    Key  []byte
}

// NewProducer create producer
func NewProducer(broker string) (*Client, error) {
    p, err := kafka.NewProducer(&kafka.ConfigMap{
        "bootstrap.servers":        broker,
        "api.version.request":      "false",
        "message.timeout.ms":       "300000",
        "socket.timeout.ms":        "30000",
        "message.send.max.retries": "5",
    })
    return &Client{Producer: p}, err
}

// Publish to kafka bootstrap server
func (c *Messenger) Publish(parameter RequestParameter) error {
    deliveryChan := make(chan kafka.Event)
    km := &kafka.Message{
        TopicPartition: kafka.TopicPartition{
            Topic:     c.publisher.RetrieveTopic(),
            Partition: c.publisher.RetrievePartition(),
        },
        Value: parameter.Byte,
    }
    if len(parameter.Key) != 0 {
        km.Key = parameter.Key
    }
    err := c.publisher.Client().Produce(km, deliveryChan)
    if err != nil {
        return err
    }
    e := <-deliveryChan
    m := e.(*kafka.Message)
    if m.TopicPartition.Error != nil {
        fmt.Printf("failed to deliver message: %v\n",
            m.TopicPartition)
    } else {
        fmt.Printf("delivered to topic %s [%d] at offset %v\n",
            *m.TopicPartition.Topic,
            m.TopicPartition.Partition,
            m.TopicPartition.Offset)
    }
    return nil
}

// SampleTopicClient for no key
type SampleTopicClient struct {
    kafka     *Client
    topic     *string
}

// NewSampleTopicClient client for no key example
func NewSampleTopicClient(topic string, c *Client) *Messenger {
    return &Messenger{publisher: &SampleTopicClient{kafka: c, topic: &topic}}
}

func (c SampleTopicClient) Client() *kafka.Producer {
    return c.kafka.Producer
}

func (c SampleTopicClient) RetrieveTopic() *string {
    return c.topic
}

func (c SampleTopicClient) RetrievePartition() int32 {
    return kafka.PartitionAny
}

このような感じで送信すると、Kafkaにはvalueはただのバイナリとして保管されます。
アプリケーション連携でAPI連携で使いたくなりますが、
これをドメインイベント、メッセージとして連携していきます。
Kafkaを使うのでサブスクライブするコンシューマは、
protoファイルを使って必要な値に変換するだけとなりますので、特に変わったことはありません。

Kafkaのtopicはgroup.idが分かれていれば並行処理をいくつも増やせることができますし、
partitionを分割することでround robin的に負荷分散をすることもできます。 Kafka Connectをそのまま使えば何も実装せずにS3やRDBMS、Elasticsearchや他のデータストレージへ保管できますので、
前述のCDC的なこともドメインイベント、メッセージを主としてできます。
(DynamoDB StreamsやDMSの場合は書き込んだタイミングでメッセージ送信になります。)

ECのカートのようなものへのアプローチとしてはまだ足りませんが、
だいたいのアプリケーションの副作用的な動作はこれでイベントを正として保管しつつ(イベントソーシングとしても)、
後続の処理は非同期で、リアクティブシステムの要素を取り込んで独立してきます。
CQRSだったりに繋がっていく話ですね。

ではデータ処理観点ではどうでしょう。

ScalaPB

データ処理観点ではメッセージブローカーに到達した時に、
何かのデータと組み合わせて拡張したようなデータソースをつくることはよくあります。
ウインドウ集計などもそうです。

マイクロサービスアーキテクチャなどで分割されたアプリケーションであっても、
メッセージブローカーでドメインイベントがスナップショット的に送信されていれば
こうしたことは容易です。
最初の頃にやってしまいちなのは、ドメインイベント送信だ!といいながら中身は
パラメータ化されていて、受け取った側はそれをもとに副問合せしてほしい、というパターン。
これにしてしまうと、他の処理が先に動いていたり、partition分割で他のコンシューマが処理していたりすると、
後続の処理全体がファントムリードやダーティリード的な状態に陥りやすくなりますので、
やらないように注意してください!コマンドバス的に信号だけを送るようなアプローチは場合はまた別ですが。

さてさて、ScalaではScalaPBを使ってProtocol Buffersを扱うことが多くあります。

scalapb.github.io

project/scalapb.sbtなどを作って下記のものを記述します。

addSbtPlugin("com.thesamet" % "sbt-protoc" % "1.0.3")
libraryDependencies += "com.thesamet.scalapb" %% "compilerplugin" % "0.11.3"

build.sbt にも同様に追記していくといいでしょう。
sparkと組み合わせるには以下のような具合です。

libraryDependencies ++= Seq(
  "org.scala-lang" % "scala-library" % "2.12.10",
  "org.apache.spark" %% "spark-core" % "3.0.3" % "provided",
  "org.apache.spark" %% "spark-sql" % "3.0.3" % "provided",
  "org.apache.spark" %% "spark-sql-kafka-0-10" % "3.0.3",
  "com.thesamet.scalapb" %% "sparksql-scalapb" % "0.11.0",
  "com.thesamet.scalapb" %% "scalapb-runtime" % scalapb.compiler.Version.scalapbVersion % "protobuf",
  "junit" % "junit" % "4.13" % Test,
  "org.scalatest" %% "scalatest" % "3.2.7" % Test
)

assembly / assemblyShadeRules := Seq(
  ShadeRule.rename("com.google.protobuf.**" -> "shadeproto.@1").inAll,
  ShadeRule.rename("scala.collection.compat.**" -> "scalacompat.@1").inAll
)

Compile / PB.targets := Seq(
  scalapb.gen() -> (Compile / sourceManaged).value / "scalapb"
)
Compile / PB.protoSources += file("../protobuf")

ここで注意なんですが、最新のSpark 3.2.0だとScalaPB自体で対応が追いついていないものがあるので、
うまく動かなくなりますので、2022/1/2現在では 3.0.3 を使うようにしておきましょう。

これで、protoファイルに記述した option java_package = "com.github.acme.sample.combine"; がパッケージとして
そのまま利用できます。

Kafka+Spark Structured Streaming(with ScalaPB)

spark.apache.org

簡単なStream処理だけであればKafka Streamsだけでもできますが、
他のデータをくっつけて集計したり、ほかのストレージに書き出したりは
Apache Sparkに任せた方がより多くのことができます。

SparkSessionは特に変わったものはありません・・。

import org.apache.spark.sql.SparkSession
import org.apache.spark.{SparkConf, SparkContext}

/**
 * @param appName
 * @param checkPoint
 */
class SparkApplication(appName: String, checkPoint: String) {

  protected def context(): SparkContext = {
    val conf = new SparkConf().setAppName(appName)
    conf.set("spark.sql.session.timeZone", "Asia/Tokyo")
    new SparkContext(conf)
  }

  def createSession(): SparkSession = {
    val spark = context()
    spark.setCheckpointDir(checkPoint)
    spark.setLogLevel("WARN")
    SparkSession.builder
      .appName(appName)
      .getOrCreate
  }
}

ここではKafkaを接続してStructured StreamingでSelectだけする例です。
Spark Streamingでもできますが、
Spark SQL形式で記述できたりするのでこちらから入っていく方が簡単かなと思います。

Kafkaへの接続はreadStreamでformatにKafkaを指定するだけです。
DataFrameになりますので扱いも簡単に!

import org.apache.spark.sql.{DataFrame, SparkSession}

object KafkaDataFrame {

  def make(ss: SparkSession, bootstrapServers: String, topic: String): DataFrame = {
    ss.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", bootstrapServers)
      .option("subscribe", topic)
      .option("startingOffsets", "earliest")
      .load()
  }
}

SparkSessionを生成して、Kafkaと接続してストリーム処理はこれだけで始められます。

import com.github.acme.sample.combine.definition.UserAction
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{IntegerType, LongType, StringType, TimestampType}
import scalapb.spark.Implicits._
import scalapb.spark.ProtoSQL

import java.util.Properties

object StructuredStreamingRunner extends App {

  if (args.length < 1) {
    throw new IllegalArgumentException(
      "This program takes one argument: the path to an environment configuration file.")
  }
  val ss = new SparkApplication(getClass.getName, "/tmp/streaming_runner").createSession()

  val prop = new Properties
  prop.load(new java.io.FileInputStream(args(0)))
  val df = KafkaDataFrame.make(ss, "127.0.0.1:9092とかkafkaのbootstrap.servers", "topicの名前")
  // 省略
}

簡単!
このままの場合は、Spark内でのスキーマは

    root
    |-- key: binary (nullable = true)
    |-- value: binary (nullable = true)
    |-- topic: string (nullable = true)
    |-- partition: integer (nullable = true)
    |-- offset: long (nullable = true)
    |-- timestamp: timestamp (nullable = true)
    |-- timestampType: integer (nullable = true)

という定義になります。
valueがProtocol Buffersでバイナリになったものです。
これを取り出すためにScalaPBを介して分解します。 UDF (User Defined Function) を介してバイナリをStructに変換できます。
UDF自体は使い勝手がいいわけですが、Sparkは分散処理が基本なので多用すると
パフォーマンス劣化につながるので注意してください。
JSONなどはStructTypeでschemaを定義して
org.apache.spark.sql.functionsfrom_jsonなどを使うだけです。

val parseUserAction = ProtoSQL.udf { bytes: Array[Byte] => UserAction.parseFrom(bytes) }

このudfを適用すると、下記の通りprotoファイルで記述した通りの定義になります。

 |-- value: struct (nullable = false)
 |    |-- correlationId: long (nullable = true)
 |    |-- event: string (nullable = true)
 |    |-- userId: integer (nullable = true)
 |    |-- name: string (nullable = true)
 |    |-- created: struct (nullable = true)
 |    |    |-- seconds: long (nullable = true)
 |    |    |-- nanos: integer (nullable = true)

このままでも value.event のようにアクセスすることもできますが、他のDataFrameとJOINなどをする時にちょっと面倒です。
わかりやすく最適化して、value自体は削除してもいいでしょう。

  df
    .select(col("key"), col("value"))
    .withColumn("key", col("key").cast(StringType))
    .withColumn("value", parseUserAction(col("value")))
    .withColumn("correlation_id", col("value.correlationId").cast(LongType))
    .withColumn("event", col("value.event").cast(StringType))
    .withColumn("user_id", col("value.userId").cast(IntegerType))
    .withColumn("name", col("value.name").cast(StringType))
    .withColumn("created", col("value.created.seconds").cast(LongType))
    .withColumn("created_timestamp", col("value.created.seconds").cast(TimestampType))
    // ウインドウ集計したい場合などは ウォーターマークを付与するなどしましょう
    // .withWatermark("created_timestamp", "10 minutes")
    .drop("value")
    .createTempView("events")

例えば上記のようにすると、Spark SQL内ではeventsがテーブル名となりますので、
慣れ親しんだSQLで記述が自由にできるようになります。
keyはKafkaでpartitionが複数ある場合の判定などにも使うもので、この例ではkeyとvalueをDataFrameから取り出して、
そのDataFrameを基準にカラムなどの定義をする処理になっています。
必要なカラムを直感的に操作できるように定義した結果は下記のとおりです。

 |-- key: string (nullable = true)
 |-- correlation_id: long (nullable = true)
 |-- event: string (nullable = true)
 |-- user_id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- created: long (nullable = true)
 |-- created_timestamp: timestamp (nullable = true)

これがeventsテーブルとなるので、いかようにも分析なり、データ結合なりウインドウ集計などができます。
最後に下記を追加することで、指定したKafkaのトピックにデータが受信されると、
サブスクライブの処理的な感覚でマイクロバッチ的にストリーム処理されていくようになります。

  ss
    .sql("SELECT * FROM events")
    .writeStream
    .format("console")
    .start()
    .awaitTermination()

ドメインイベントなどをメッセージとしてアプリケーションに適用することで、
Protocol Buffersを使ったリアクティブシステム・マイクロサービスアーキテクチャ的なアプリケーションに対しても
データ観点で重要なスナップショットを活用してさまざまなビジネスや、もちろん他アプリケーションへ繋げることができます。

手法ばかりに注目していると、こうしたデータ処理やその先のビジネス的な戦略など、
裏側の世界にたいして有用でないものにもなり得ますので、
是非メッセージ・イベント駆動などに触れてみていただけるといいかなと思います。

Apache Kafka or Amazon Kinesis + Apache Sparkの組み合わせはまだしらばく定番かと思いますので、
これをきっかけに取り組んでもらえると仲間が増えて嬉しいです!