DebeziumとKafka ConnectとSnowflakeを使ってニアリアルタイムなデータパイプラインを作る

ニアリアルタイムで更新されるデータ基盤を作るために

データ基盤をはじめとするデータ処理やその辺りに興味がある、
もしくは 作らないといけない!という形にもぴったりだと思う実際にやっている方法を紹介します。

Snowflakeとは?

すでにご存知の方は次の次くらいまで飛ばしてください。

www.snowflake.com

そもそもSnowflakeとは何か、というお話です。
Webアプリケーション開発を主としているエンジニアの方には、あまりお馴染みではないものだと思いますが、
ここでいうSnowflakeとはSaasのデータプラットフォームサービスです。
データ設計のスノーフレークスキーマのことではありません。

データプラットフォームとはいえ、RDBMSとかとなんか違うの?という話ですが、
これはただのリレーショナルなデータベースではなく、
PrestoやAthenaなどを利用している方、もしくはPostgreSQLのFDWとかを使っている方は連想しやすいと思いますが、
複数のデータソースを結合することに加えて(検索などもRDBMSとS3のparquetとか、CSVとかJOINできる等)、
データガバナンスとしてのデータアクセスに関する権限管理などができる、
データレイクやデータウェアハウス、データ活用や権限などデータに関するあらゆるものを管理できるものです。

サービスが成長するとあっちこっちのプロジェクトのデータベースを結合したり、
RDBMSではない全く異なる何かとRDBMSを結合したデータを作りたい、判断できる状態にしたい、
対象が何千万件、何億とあって結合や算出がアプリケーションのコードだけでは難しい、
などそういった状態をデータ基盤で解決することも多いと思いますが、
そういった場合などでも権限管理などと共に強力にサポートしてくれるプラットフォームとなっています。

データ運用していると、データマネジメントが必要不可欠になります。
例えばある権限を持つアカウントのアクセス以外は特定のカラムをマスクするなど、
通常のデータベースのみではなかなか難しいものを解決することができたり(データを書き換えるわけではない)、 Sparkなどを利用している方はSnowflake上でシュッと加工ができたり、
タスクを使って集計処理をSnowflake上で完結したりできるものです。
BigQueryと似たようなもんだと思うといいでしょう。

ここでいうタスクとはCronを用いたいような定期的に何かをするJOBだと思っていただければ良いです。

データ基盤は基本的にすべてのデータをイミュータブルとして扱いますので、
通常は更新などはあまり用意されていないんですが、Viiewやタスクなどを組み合わせて完結できるのは非常にありがたいです。
普段からHudiを使っている方にはイメージしやすいかと思います。

hudi.apache.org

なぜイミュータブルで持つ必要があるのかというと、
会社などによって色々差異はありますが、どのような時にどんなことが起きたか、
どんなことをしたらどうなったか、を基本としてビジネスなどの意思決定が行われます。

これが何もエビデンスがなく、誰かの勘だけで決定されていたらどうなってしまうのでしょうか?
施策などを実施したあとにうまくいった、うまくいかなかった、などはどのようにして判断できるのでしょうか?

結果の数値はスプレッドシートなどで共有されるのかもしれませんが、
その細部はどのようにしてわかるのでしょうか? アプリケーションがよく利用するデータベースでは、パフォーマンスや実装の観点から
ほとんどは直近のデータが多く格納されていると思います。
うまくパーティショニングなどがされて数年保管している、というのはごく一部のアプリケーションだと思います。

ほとんどの場合においてアプリケーションとして必要なデータはあるが、
過去は追えない(例えばいつ削除されていつ戻ったのか、などもフラグでは管理できません)、
データドリブンな意思決定の観点の設計をアプリケーションに多く取り込むことは、
不確定要素が多すぎるため難しいわけです。

ある程度見越せるものはありますが、
リリースされてからどんなものがあれば判断ができるのかはほとんど後で決まります。

こうした判断はさまざまな出来事の変化の過程や、
事実を判断材料にするためデータをエビデンスとして誰もが同じ認識で判断できるようにする必要があります。

なのでイミュータブルに保管する、という選択肢しかないわけです。

が、データを必要とする領域や人によっては積み上げたものをひたすら眺めるだけでなく、
ある程度の頻度で更新されるビューの方が都合がよくなります。

そういったものを解決する方法の一つがSnowflakeというわけです。

SnowflakeはS3などを定期的に取り込んで差分更新したり(S3は不変でビューとしてのSnowflakeが変更される)、
いくつかの便利なデータパイプライン機能がありますが、
RDBMSをうまくいい感じに差分更新したりといったものは単体では難しいです。

よろしい、ならばDebeziumだ

RDBMSの変更をリアルタイムで検知してその中身のスナップショットをとるぞ!という場合は
日本でもお馴染みになってきたと思いますが、CDCを利用するのが一般的です。
もちろんPostgreSQLのようにデータベース自身が様々なデータソースと接続できる機能があれば
問題ないものもありますが、アプリケーションの観点と全く異なるデータ分析やエビデンス作りなどを吸収して
うまく設計するのは非常に難易度が高いです。
このためデータをCOPYしてどこかに持っていくことが多いと思います。
CDCはデータが変更されたタイミングで変更後と変更前を吸い上げてくれる仕組みです。

Debeziumの動かし方や細かい解説はスタフェスのメンバーが解説していたりしていますので、
参考にしてください。

zenn.dev

kaz29.hatenablog.com

debezium.io

Debeziumなどを利用しない場合は、アプルケーション側のバッチ処理でなんとかしたり、
EmbulkなどのETL/ELTツールを使って転送することもできますが、
洗い替えの方法や、パーティション設計や、転送量(コストにもなります)、
メンテナンス性などを加味して選択するといいと思います。

DebeziumはDebezium Serverなどもありますが、ここではKafka ConnectのSource Connectorとして 動作させていきます。 Kafka(or MSK)を利用している環境であればSnowflakeへの転送もSink Connectorで解決できますので、
理にかなった選択肢ではないかと思います。

データの流れ

Debezium、Kafka、Snowflakeを組み合わせた場合に流れは次のようになります。

Debeziumを通じてMySQLなどからキャプチャーされると次のようなJSONがKafkaのメッセージとして送られてきます。

{
  "schema": { 
    "type": "struct",
    "fields": [
      {
        "type": "struct",
        "fields": [
          {
            "type": "int32",
            "optional": false,
            "field": "id"
          },
          {
            "type": "string",
            "optional": false,
            "field": "first_name"
          },
          {
            "type": "string",
            "optional": false,
            "field": "last_name"
          },
          {
            "type": "string",
            "optional": false,
            "field": "email"
          }
        ],
        "optional": true,
        "name": "mysql-server-1.inventory.customers.Value", 
        "field": "before"
      },
      {
        "type": "struct",
        "fields": [
          {
            "type": "int32",
            "optional": false,
            "field": "id"
          },
          {
            "type": "string",
            "optional": false,
            "field": "first_name"
          },
          {
            "type": "string",
            "optional": false,
            "field": "last_name"
          },
          {
            "type": "string",
            "optional": false,
            "field": "email"
          }
        ],
        "optional": true,
        "name": "mysql-server-1.inventory.customers.Value",
        "field": "after"
      },
      {
        "type": "struct",
        "fields": [
          {
            "type": "string",
            "optional": false,
            "field": "version"
          },
          {
            "type": "string",
            "optional": false,
            "field": "connector"
          },
          {
            "type": "string",
            "optional": false,
            "field": "name"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "ts_ms"
          },
          {
            "type": "boolean",
            "optional": true,
            "default": false,
            "field": "snapshot"
          },
          {
            "type": "string",
            "optional": false,
            "field": "db"
          },
          {
            "type": "string",
            "optional": true,
            "field": "table"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "server_id"
          },
          {
            "type": "string",
            "optional": true,
            "field": "gtid"
          },
          {
            "type": "string",
            "optional": false,
            "field": "file"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "pos"
          },
          {
            "type": "int32",
            "optional": false,
            "field": "row"
          },
          {
            "type": "int64",
            "optional": true,
            "field": "thread"
          },
          {
            "type": "string",
            "optional": true,
            "field": "query"
          }
        ],
        "optional": false,
        "name": "io.debezium.connector.mysql.Source", 
        "field": "source"
      },
      {
        "type": "string",
        "optional": false,
        "field": "op"
      },
      {
        "type": "int64",
        "optional": true,
        "field": "ts_ms"
      }
    ],
    "optional": false,
    "name": "mysql-server-1.inventory.customers.Envelope" 
  },
  "payload": { 
    "op": "c", 
    "ts_ms": 1465491411815, 
    "before": null, 
    "after": { 
      "id": 1004,
      "first_name": "Anne",
      "last_name": "Kretchmar",
      "email": "annek@noanswer.org"
    },
    "source": { 
      "version": "2.3.0.Final",
      "connector": "mysql",
      "name": "mysql-server-1",
      "ts_ms": 0,
      "snapshot": false,
      "db": "inventory",
      "table": "customers",
      "server_id": 0,
      "gtid": null,
      "file": "mysql-bin.000003",
      "pos": 154,
      "row": 0,
      "thread": 7,
      "query": "INSERT INTO customers (first_name, last_name, email) VALUES ('Anne', 'Kretchmar', 'annek@noanswer.org')"
    }
  }
}

これに対して、Kafka ConnectのSnowflake Sink Connectorは次の 公式ページにあるような構造でデータ転送が行われます。

docs.snowflake.com

この中で下記のものにまずは注目します。

VARIANT Type
RECORD_CONTENT Kafkaメッセージが含まれる
RECORD_METADATA メッセージに関するメタデータ(例えば、メッセージが読み取られたトピック)が含まれる

これはSnowflake Sink ConnectorがSnowflakeのどのカラム名(VARIANT型)に書き込むかということを表していて、
DebeziumのJSONにある payload がRECORD_CONTENTに対応する形となります。

つまり、DebeziumでキャプチャーされるとSnowflake側の対応するテーブルの
RECORD_CONTENTにそのままVARIANTで転送されるということです。

これでできそうな感じがしてくると思います。

もう一つ、実際に気をつけなければいけないことは、
Debeziumから送られてくるメッセージの中の payload.opです。

このopの値によってRECORD_CONTENTのどの値を利用しなければならない、というのが決定されます。

payload.op payload.before payload.after
c = create レコード作成前は状態がないためnull 作成後のデータ構造
u = update 変更前のデータ構造  現在のデータ構造
d = delete 物理削除前のデータ構造 物理削除時はデータがなくなるためnull
r = read スナップショット取得前の状態はないためnull 現在のデータ構造

readはSELECTが発行された、ではなく
データベースのスナップショットから読み込んだもので全て r で送られてきます。
スナップショット取得以降は c, u, dで送られてきます。

何かあってDebeziumから再取得すると以前c, u, dで送られてきたものもrになりますが、Topicにあるものから再送するとc, u, dのままです。
この辺りはKakfaの仕組みを理解していれば違いがわかると思います。

大雑把にいうとc, u, dの場合は payload.afterを利用する
dの場合はpayload.before を利用する

ということになります。

物理削除がある場合はこのおおまかに分けた2つの違いを意識して
Snowflakeのテーブル操作による更新を行えるようにします。

いくつか方法はありますが、
ここでは分けて考えやすい、Kafkaから送られてきたメッセージを格納しているSnowflakeのテーブルから
物理削除とそれ以外に分けた2つのViewを作り、
その2つのViewを利用してテーブルの追加更新削除タスクを作成し、
スケジュールによるニアリアルタイム更新をしていく、
という流れをつくります。

でかい実践的なデータ構造の方が面白いんですが、
解説が大変なので、ここでは下記のすごいシンプルな実際にはないテーブルにします。

CREATE TABLE tests(
  id INT NOT NULL AUTO_INCREMENT PRIMARY KEY,
  name VARCHAR(25) NOT NULL
) DEFAULT CHARACTER SET=utf8mb4;

ここでは特に触れませんが、
対象の環境によってはMySQLのDATETIMEなどに対して、
ZERO DATE系の値が入っている場合(0000-00-00、0000-00-00 00:00:00など)は、
そのままの値で変換されません。
1970-01-01 などになりますので注意しておきましょう。

その前にZERO DATEはDebeziumの設定でもいくつか対応しておかなければ転送時にエラーになりますので、
設定周りで対応するか、ZERO DATE利用をやめるなどをおすすめします。

Snowflakeで事前に必要なもの

まずはKafka ConnectのSnowflake Sink Connectorが書きこめるように
ロールの設定などを事前に終わらせておきます。

-- Use a role that can create and manage roles and privileges.
USE ROLE securityadmin;

-- Create a Snowflake role with the privileges to work with the connector.
CREATE ROLE kafka_connector_role_1;

-- Grant privileges on the database.
GRANT USAGE ON DATABASE kafka_db TO ROLE kafka_connector_role_1;

-- Grant privileges on the schema.
GRANT USAGE ON SCHEMA kafka_schema TO ROLE kafka_connector_role_1;
GRANT CREATE TABLE ON SCHEMA kafka_schema TO ROLE kafka_connector_role_1;
GRANT CREATE STAGE ON SCHEMA kafka_schema TO ROLE kafka_connector_role_1;
GRANT CREATE PIPE ON SCHEMA kafka_schema TO ROLE kafka_connector_role_1;

公式リファレンスにあるものですが、Snowflake Sink Connectorが
テーブルを作成したり、連携のためのPIPEやSTAGEが作成できるようにしておきます。
(事前に作成して異なる権限を割り当ててもOKです。)

Kakfa からみるとただのConnectorですが、Snowflake上ではPIPEなどを組み合わせて
連携を実現するためこの辺りの権限が必要になります。

またKafka Connectorで利用するSnowflakeのユーザーにはデフォルトの権限やWarehouseなども割り当てておきます。

ALTER USER kafka_connector_user_1 SET DEFAULT_ROLE = kafka_connector_role_1;
-- etc

このあたりを事前に割り当てておかないと連携ができず、Kafka Connect上でStatusがFailedとなります。

Snowflake Sink Connector

基本的な設定方法は公式にありますので、こちらを参考にしてください。

docs.snowflake.com

ハマりがちなポイントだけ紹介します。
connectorの設定のうち下記のものを注目します。

    "connector.class":"com.snowflake.kafka.connector.SnowflakeSinkConnector",
    "tasks.max":"8",
    "topics":"topic1,topic2",
    "snowflake.topic2table.map": "topic1:table1,topic2:table2",
    "buffer.count.records":"10000",
    "buffer.flush.time":"60",
    "buffer.size.bytes":"5000000",
    "snowflake.url.name":"myorganization-myaccount.snowflakecomputing.com:443",
    "snowflake.user.name":"jane.smith",
    "snowflake.private.key":"xyz123",
    "snowflake.private.key.passphrase":"jkladu098jfd089adsq4r",
    "snowflake.database.name":"mydb",
    "snowflake.schema.name":"myschema",
    "key.converter":"org.apache.kafka.connect.storage.StringConverter",
    "value.converter":"com.snowflake.kafka.connector.records.SnowflakeAvroConverter",

connector.class はSnowflakeのSink Connectorを指定するのでこれ以外はありません。(独自拡張している場合は除く)
tasks.max は環境に合わせて設定してください。
topics はDebeziumのメッセージが格納されているTopicを指定します。
複数ある場合はカンマ繋ぎで指定します。
複数テーブルへの書き込みをトランザクションに合わせて確実に欲しい場合はOutboxなどを利用してください。

snowflake.topic2table.map はtopicに対応したsnowflakeのテーブルをマッピングした表現で指定します。
ここで記述するsnowflakeのテーブルのRECORD_CONTENT、RECORD_METADATAにメッセージが転送されます。

snowflake.url.name ここはsnowflakeのURLですが契約プランなどによって指定方法に多少違いがあります。
事前に確認しておきましょう。
snowflake.user.name はSink Connector用のユーザーなどを割り当てておくと良いでしょう。
(なんでもできるアカウントは基本的に作れない、作らないと思うのでセレように作りましょう)

snowflake.private.keysnowflake.user.name に割り当てられている秘密鍵を指定します。
terraformなどを使って生成している場合やツールなどで作ると多少差異はありますが、
ここで指定する秘密鍵はヘッダー・フッター、改行コードを全て取り除いて一行で指定する必要があります。

snowflake.database.namesnowflake.schema.name はSnowflakeで転送するテーブルが存在している
データベース、スキーマを指定します。
ここで指定するデータベース、スキーマ、テーブルも含め、snowflake.user.name に割り当てられている権限が
アクセスできる状態でなければいけません(デフォルトロールで含まれていること)

value.converter は転送するメッセージによって異なります。
Avroを利用している場合は com.snowflake.kafka.connector.records.SnowflakeAvroConverter になりますが、
Debezium+JSON形式のメッセージの場合は com.snowflake.kafka.connector.records.SnowflakeJsonConverter になります。
必ずデータフォーマットに合わせて指定してください(Kafka Connectを理解していないとハマります)

ここまで設定がきちんとできていると後はConnectorを起動すると転送が開始されます。

テーブルを更新するためにViewをつくる

例として使うテーブルは下記のようなメッセージでRECORD_COTENTに保存されます。

{
  "after": {
    "id": 1,
    "name": "転送テスト"
  },
  "before": null,
  "op": "c",
  "source": {
    "connector": "mysql",
    "db": "test-db",
    "file": "mysql-bin-changelog.12345",
    "gtid": null,
    "name": "raw-data",
    "pos": 1,
    "query": null,
    "row": 0,
    "sequence": null,
    "server_id": 1,
    "snapshot": "false",
    "table": "tests",
    "thread": 75448658,
    "ts_ms": 1688962410000,
    "version": "1.9.7.Final"
  },
  "transaction": null,
  "ts_ms": 1688962410471
}

opがd以外に対応するViewは次のようになります。

create or replace view HOGE_DB.RAW_DATA.TEST_IU_VIEW (
    ID,
    NAME,
    DEBEZIUM_PROCESSED_TS,
    SOURCE_PROCESSED_TS,
    SOURCE_SERVER,
    SOURCE_DB,
    SOURCE_TABLE,
    DML_OPERATOR
) as
SELECT
    record_content:"after"."id"::NUMBER as ID,
    record_content:"after"."name"::NUMBER as NAME,
    record_content:"ts_ms"::STRING::DATETIME as DEBEZIUM_PROCESSED_TS,
    record_content:"source"."ts_usec"::STRING::DATETIME as SOURCE_PROCESSED_TS,
    record_content:"source"."name"::STRING as SOURCE_SERVER,
    record_content:"source"."db"::STRING as SOURCE_DB,
    record_content:"source"."table"::STRING as SOURCE_TABLE,
    record_content:"op"::STRING as DML_OPERATOR
FROM HOGE_DB.RAW_DATA.CDC_RAW_TESTS
WHERE lower(DML_OPERATOR) in ('r', 'c', 'u');

HOGE_DB.RAW_DATA.CDC_RAW_TESTS はメッセージが格納されているテーブルだと思ってください。
Viewの名前はdとそれ以外を区別できる名前であればなんでも大丈夫です。
snowflakeのテーブルなどは大文字小文字は区別されませんが、大体大文字で表現されるのでそれに合わせた表記にしています。
小文字で表現しても問題ありません。

RECORD_CONTENTに格納される Debeziumのpayload.opは record_content:"op"::STRING as DML_OPERATOR とすることで、
値を取得することができます。
古いバージョンだとrecord_content:"payload"."op":の場合もありますので、
RECORD_COTENTを確認してください。

これでop d以外に対応したViewになります。
物理削除を使っていない環境の場合はこれだけでも可能ですが、
物理削除を使っていて かつテーブルからは除外したい場合は、
この後のTaskで追加更新削除を表現できるように記述する必要があります。

続いて op dに対応するViewです。
先述した対応表に合わせてRECORD_CONTENTからの取得方法を変えてあげます。

create or replace view HOGE_DB.RAW_DATA.TEST_D_VIEW (
    ID,
    NAME,
    DEBEZIUM_PROCESSED_TS,
    SOURCE_PROCESSED_TS,
    SOURCE_SERVER,
    SOURCE_DB,
    SOURCE_TABLE,
    DML_OPERATOR
) as
SELECT
    record_content:"before"."id"::NUMBER as ID,
    record_content:"before"."name"::NUMBER as NAME,
    record_content:"ts_ms"::STRING::DATETIME as DEBEZIUM_PROCESSED_TS,
    record_content:"source"."ts_usec"::STRING::DATETIME as SOURCE_PROCESSED_TS,
    record_content:"source"."name"::STRING as SOURCE_SERVER,
    record_content:"source"."db"::STRING as SOURCE_DB,
    record_content:"source"."table"::STRING as SOURCE_TABLE,
    record_content:"op"::STRING as DML_OPERATOR
FROM HOGE_DB.RAW_DATA.CDC_RAW_TESTS
WHERE lower(DML_OPERATOR) = 'd';

after指定だったものをbeforeにして、Viewを別名にして識別できるようにしました。
これで追加更新と削除を分けてニアリアルタイムで更新するタスクが作れるようになります。

更新対象のテーブル

更新対象にするテーブルはCDC対象のテーブルと同じ構造にしておきましょう。

create or replace TABLE HOGE_DB.STAGING_DATA.TESTS (
    ID NUMBER(38,0),
    NAME VARCHAR(100)
)COMMENT='ステージングデータ';

シンプルこの上ない形ですね。

このテーブルに対して先に作ったViewのデータを元に操作するタスクを作ります。
タスク実行用のユーザーやロールを作っておくのがおすすめです。
タスク実行には、それぞれのViewに対してのSELECT (FUTURE SELECTとかにしておくのもいいでしょう)、
ここでいうHOGE_DB.STAGING_DATA.TESTS に対する SELECT、INSERT、UPDATE、DELETE
タスク実行ロールにOWNERSHIPを割り当てる、などがあります。

タスク作り

タスクとしては、HOGE_DB.RAW_DATA.TEST_IU_VIEW、HOGE_DB.RAW_DATA.TEST_D_VIEWのレコードを使って
操作するだけですが、ここで意識しておかなければいけないのは時系列です。
Debeziumから送られてくるメッセージは時系列が担保されていますので、
送られてきた通りに実行すれば問題ありませんが、追加更新と削除は判断するカラムが異なります。
そのために2つのViewに分けたのはわかると思いますが、
この2つに分かれたものそれぞれを独立したタスクにすると、どうなってしまうのでしょうか。

追加更新は時系列順にViewが並んでいるので大きな問題はありませんが、
独立してしまった削除はどのタイミングでタスク実行されるのが確実なのでしょうか。
ID 1が追加され、ID 1のNAMEが更新、ID 1を削除し、再びID 1を作成した場合、
削除以外のViewでは追加、更新、追加とならびます。最後の追加は重複したレコードが作成されることになります。
削除が保管されているViewにはID 1の削除が保管されますので、タスク実行のタイミングによっては
重複して作られたレコード両方を削除するかもしれませんし、
先に削除が動いてしまうかもしれません。
単純に二つのタスクに分けてそれぞれ実行するのはリスクがあるわけです。

解決方法はいくつかありますが、ここでは一番簡単な二つに分けたテーブルをくっつけて並び替えてあげる、ことで
時系列もデータの状態も正しく表現するようにしていきます。

更新対象になる HOGE_DB.STAGING_DATA.TESTS に対して
データの有無を判断して操作するにはMERGEを利用します。

タスクで実行されるものは次の通りです。

MERGE INTO HOGE_DB.STAGING_DATA.TESTS as tgt
USING (
    SELECT * FROM (
        SELECT *,
            ROW_NUMBER() over (
                PARTITION BY ID
                ORDER BY DEBEZIUM_PROCESSED_TS DESC
        ) as row_num
    FROM (
        SELECT
            ID,
            NAME,
            DEBEZIUM_PROCESSED_TS,
            DML_OPERATOR
        FROM HOGE_DB.RAW_DATA.TEST_IU_VIEW
        UNION ALL
        SELECT
            ID,
            NAME,
            DEBEZIUM_PROCESSED_TS,
            DML_OPERATOR
        FROM HOGE_DB.RAW_DATA.TEST_D_VIEW
        ) as u
    ) as t1 WHERE t1.row_num = 1
) as src
ON tgt.ID = src.ID
WHEN MATCHED AND src.DML_OPERATOR = 'd' THEN DELETE
WHEN MATCHED AND src.DML_OPERATOR = 'u' THEN
UPDATE SET
    tgt.ID = src.ID,
    tgt.NAME = src.NAME
WHEN NOT MATCHED AND src.DML_OPERATOR IN ('c', 'r', 'u') THEN
INSERT (
    ID,
    NAME
)
VALUES  (
    src.ID,
    src.NAME
);

二つのViewをUNION ALLで結合し、かつ重複してはいけないIDでパーティション、
時系列で最新が並ぶようにDEBEZIUMで処理した最新のものを取得するようにします。
最新になったもので、DML_OPERATORの値で追加更新か削除が判定できるようになるので、
その条件によって追加か更新か削除かを判定します。
つまり結果整合の状態になるようにするわけです。
これでテーブルに反映すべき最新に状態がわかるようになります。

このクエリをタスクとして、このブログのタイトルがニアリアルタイムなので5分おきに最新の状態に更新する、
というものにします。

CREATE TASK TESTS_VIEW_MERGE
WAREHOUSE = HOGE_WH
SCHEDULE = '5 MINUTE'
AS MERGE INTO HOGE_DB.STAGING_DATA.TESTS as tgt
USING (
    SELECT * FROM (
        SELECT *,
            ROW_NUMBER() over (
                PARTITION BY ID
                ORDER BY DEBEZIUM_PROCESSED_TS DESC
        ) as row_num
    FROM (
        SELECT
            ID,
            NAME,
            DEBEZIUM_PROCESSED_TS,
            DML_OPERATOR
        FROM HOGE_DB.RAW_DATA.TEST_IU_VIEW
        UNION ALL
        SELECT
            ID,
            NAME,
            DEBEZIUM_PROCESSED_TS,
            DML_OPERATOR
        FROM HOGE_DB.RAW_DATA.TEST_D_VIEW
        ) as u
    ) as t1 WHERE t1.row_num = 1
) as src
ON tgt.ID = src.ID
WHEN MATCHED AND src.DML_OPERATOR = 'd' THEN DELETE
WHEN MATCHED AND src.DML_OPERATOR = 'u' THEN
UPDATE SET
    tgt.ID = src.ID,
    tgt.NAME = src.NAME
WHEN NOT MATCHED AND src.DML_OPERATOR IN ('c', 'r', 'u') THEN
INSERT (
    ID,
    NAME
)
VALUES  (
    src.ID,
    src.NAME
);

ここでは5分おきに実行されるようにしています。
このタスクを実行するロールからRESUMEしてあげれば指定したSCHEDULE通りに実行されます。

SCHEDULEの指定方法はいくつかあり、
慣れ親しんだCRON形式で指定することもできます(SCHEDULE = 'USING CRON 0 */1 * * * Asia/Tokyo' とか)。

docs.snowflake.com

タスク実行時のエラーなどはAWSやGCP、Azureなどを介して通知することもできますので、
環境に合わせて設定するといいでしょう。

これ以外にも実現方法はいくつかありますが、
今回はコードすら書かずにDebeziumとKafka Connect、Snowflakeを使って設定のみで更新される
データパイプラインの作り方を紹介しました。

さいごに

ここではある程度リアルタイムにデータ更新したい、なんか実装とかしたくない、
Snowflake上でなんとかうまいことしたい、件数が多すぎてアプリケーションのスクリプトでやるにはちょっと、
みたいな時を背景としたものです。
規模やそもそものミドルウェアなどの利用状況、組織や文化によっては
良い選択肢ではないことも多くありますので、
ここにあるものをそっくりそのままではなく皆さんの環境に合わせて最適化してみてください。