Red Hat Integration – Change Data Capture Tech Preview

Kenta Kosugi
26 min readDec 4, 2019

--

Change Data Capture(CDC)とは

Cange Data Capture(以降 CDC) はあるデータベースに起こったイベントをストリームを使用して他のデータベースやソフトウェア(例えば Elastic Search)に伝播させ、データベースの反映、インデックスの更新など様々なアクションを取ることができます。

Debezium

Debezium は CDC を実現する Open Source プロダクトです。Open Source の Apach Kafka の Connector として開発されています。Apach Kafka Connector を利用するということはイベントに利用するストリーミングプロダクトは Apach Kafka になります。

Red Hat Integration 2019–12 で Debezium を UpStream とする Change Data Capture が Tech Preview となりました。CDC は Apache Kafka のエンタープライズ版である AMQ Streams を利用しますが、CDC を利用したい場合は Red Hat Integration のサブスクリプションが必要になります(Red Hat AMQ のサブスクリプションではない点に注意が必要です)。

Debezium のバージョンは 1.0、使用できる Connector は以下の 4 つです。

  1. MySQL Connector
  2. PostgreSQL Connector
  3. MongoDB Connector
  4. SQL Server Connector

今回は CDC のユースケースをいくつか紹介し、OpenShift 上で CDC の Tech Preview をためしてみたいと思います。

Debezium 0.8 の記事が今回試してみる内容も含めて詳細に記述されていますので、併せてご参照ください。

CDC は一体何に使えるのか — ユースケース

データベースのレプリケーション

一番単純なケースでは、データベースへ反映されたデータを解析用のデータベースに複製したり、参照用のスケール可能な Infinispan のような NoSQL(Red Hat JBoss Data Grid)にデータを反映させるなどのケースが考えれます。

データの複製

マルチプルライトを避ける

アプリケーションが複数のデータソースに対する更新処理を実施する必要があると、考慮すべき項目(どこか一つがコケたらどうするのか等)が途端に複雑になります。更新処理の対象が RDBMS だけであれば XA を使用してロールバックさせるなどの処理が思いつきますが、Elastic Search や Cache などの RDBMS ではないソフトウェアに対しては使えません。

本来の業務とは関係ない処理をアプリケーションにごりごり記述する必要が出てきます。

CDC を使用しない場合

CDC を利用すればシンプルです。更新対象のデータソースを RDBMS 1つに限定し、他のアプリケーションはそのデータソースへの変更をイベントとして受け取って処理をすればよいことになります。

CDCを使った場合

金融機関などの Audit Log

内部統制目的やセキュリティインシデント防止のため、データベースに対する操作・トランザクションのログを CDC を使用して簡単に保存できるようになります。従来のアプリケーションに手を入れる必要はありません。こうしたログを AMQ Streams(Apache Kafka) に溜めることによって、将来 AI/ML を適用し、犯罪の早期発見にも繋げることが可能です。

AuditLog

CQRS

ドメイン駆動開発をご存知でしょうか。ドメイン駆動開発における CQRS に CDC + Data Virtualization を適用することができます(Data Virtualization を用いない方法も可能です)。

CQRS は Command Query Responsibility Segregation の略です。ドメイン駆動開発をここで語ってしまうとそれだけで数回分のブログ量になってしまいますので簡単に言うと、更新系の Command 処理と参照系の Query 処理は要件が異なるので、責務を分離しましょうということです。

VDB と記載されている部分が Data Virtualization による仮想データベースを示しています。更新処理を RDBMS に対して実施、RDBMS に発生したイベントを Event Handler が NoSQL のようなスケール可能なものに反映させます。

ただし、更新処理系と参照処理系に同一データベースを用いる場合のようにデータベースにロックをかけて整合性をとるような ACID 特性ではなく結果整合性になる点に注意が必要です。

※ CQRS は必ずしもデータベースを分ける必要はありません。詳しくは以下の記事をご参照ください。

CDC Tech Preview を試す

では CDC をためしてみましょう。全体の手順は以下の通りとなります。

  1. OpenShift で CDC の対象となる MySQL のデプロイ
  2. OpenShift へ AMQ Streams Operator のインストール
  3. AMQ Streams Operator を利用した Broker/Zookeeper の作成、Kafka Connect S2I の作成と各 Connector(MySQL/Postgresql/MongoDB)の登録
  4. MySQL Connector を MySQL に接続
  5. MySQL に変更を発生させてイベントをキャプチャ

Tech Preview の前提

使用する OpenShift は 4.2 を想定しています。

oc コマンドで事前に OpenShift にログインしておいてください。また、今回 OpenShift 上に予め「debezium」プロジェクトを作成し、このプロジェクトを対象に操作を進めます。

1. OpenShift へ CDC の対象となる MySQL のデプロイ

以下のコマンドを実行して MySQL をデプロイします。今回はサンプルなので Secrets は使っていません。ユーザー名やパスワードが直打ちされている点に注意です。

$ oc new-app --name=mysql debezium/example-mysql:1.0
// 略
$ oc set env dc/mysql MYSQL_ROOT_PASSWORD=debezium MYSQL_USER=mysqluser MYSQL_PASSWORD=mysqlpw

2. OpenShift へ AMQ Streams Operator のインストール

管理者権限で OpenShift にログインし、「Operator Hub」から 「AMQ Streams Operator」をインストールします。

Operator Hub

そのまま手順を進めていき、対象のプロジェクトを「debezium」プロジェクトにしてインストールを完了させてください。

続いて、Red Hat のコンテナレジストリである registry.redhat.io へアクセスできるユーザー(Red Hat Customer Portal にログイン可能なユーザー)を登録し、AMQ Streams Operator にリンクさせます。

$ oc create secret docker-registry amqstremas-pull-secret \
--docker-server=registry.redhat.io \
--docker-username=kkosugi \
--docker-password="password" \
--docker-email=kkosugi@redhat.com
secret/amqstremas-pull-secret created
$ oc secrets link strimzi-cluster-operator amqstremas-pull-secret --for=pull

3. AMQ Streams Operator を使用して Broker / Kafka Connect S2I の作成と Debezium Connector の登録

以下のコマンドを実行して Kafka Broker(x3) と Zookeeper(x3) を起動します。下の方の EOF までコピー & ペーストしてください。

$ cat << EOF | oc apply -f -
apiVersion: kafka.strimzi.io/v1beta1
kind: Kafka
metadata:
name: my-cluster
namespace: debezium
spec:
kafka:
version: 2.3.0
replicas: 3
listeners:
plain: {}
tls: {}
config:
offsets.topic.replication.factor: 3
transaction.state.log.replication.factor: 3
transaction.state.log.min.isr: 2
log.message.format.version: '2.3'
storage:
type: ephemeral
zookeeper:
replicas: 3
storage:
type: ephemeral
entityOperator:
topicOperator: {}
userOperator: {}
EOF
kafka.kafka.strimzi.io/my-cluster created

続いて Kafka Connect S2I を作成します。

$ cat << EOF | oc apply -f -
apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaConnectS2I
metadata:
name: my-connect-cluster
namespace: debezium
spec:
version: 2.3.0
replicas: 1
bootstrapServers: 'my-cluster-kafka-bootstrap:9093'
tls:
trustedCertificates:
- secretName: my-cluster-cluster-ca-cert
certificate: ca.crt
EOF
kafkaconnects2i.kafka.strimzi.io/my-connect-cluster created

続いて「MySQL Connector」「PostgreSQL Connector」「MongoDB Connector」をダウンロードして展開し、以下のディレクトリ構成で保存します。今回使用するのは MySQL Connector のみなのでそれだけダウンロードするのでも大丈夫です。

$ tree ./plugins plugins
├── debezium-connector-mongodb
│ ├── CHANGELOG.md
│ ├── CONTRIBUTE.md
│ ├── COPYRIGHT.txt
│ ├── LICENSE-3rd-PARTIES.txt
│ ├── LICENSE.txt
│ ├── README.md
│ ├── bson-3.10.1.redhat-00001.jar
│ ├── debezium-connector-mongodb-1.0.0.Beta2-redhat-00001.jar
│ ├── debezium-core-1.0.0.Beta2-redhat-00001.jar
│ ├── mongodb-driver-3.10.1.redhat-00001.jar
│ ├── mongodb-driver-core-3.10.1.redhat-00001.jar
│ └── util-3.10.1.redhat-00001.jar
├── debezium-connector-mysql
│ ├── CHANGELOG.md
│ ├── CONTRIBUTE.md
│ ├── COPYRIGHT.txt
│ ├── LICENSE-3rd-PARTIES.txt
│ ├── LICENSE.txt
│ ├── README.md
│ ├── antlr4-runtime-4.7.0.redhat-00013.jar
│ ├── debezium-connector-mysql-1.0.0.Beta2-redhat-00001.jar
│ ├── debezium-core-1.0.0.Beta2-redhat-00001.jar
│ ├── debezium-ddl-parser-1.0.0.Beta2-redhat-00001.jar
│ ├── mysql-binlog-connector-java-0.19.1.redhat-00002.jar
│ └── mysql-connector-java-8.0.16.redhat-00001.jar
└── debezium-connector-postgres
├── CHANGELOG.md
├── CONTRIBUTE.md
├── COPYRIGHT.txt
├── LICENSE-3rd-PARTIES.txt
├── LICENSE.txt
├── README.md
├── debezium-connector-postgres-1.0.0.Beta2-redhat-00001.jar
├── debezium-core-1.0.0.Beta2-redhat-00001.jar
├── postgresql-42.2.8.redhat-00001.jar
└── protobuf-java-3.8.0.redhat-00001.jar

ダウンロードした Connector 群を Kafka Connector S2I に登録します。

$ oc start-build my-connect-cluster-connect --from-dir=./plugins/
Uploading directory "plugins" as binary input for the build ...
...
Uploading finished
build.build.openshift.io/my-connect-cluster-connect-3 started

oc get pods -w コマンドを実行して Pod の STATUS が Running になるまで待ちましょう。Running になったら以下のコマンドを実行して、MongoDB Connector、PostgreSQL Connector、MySQL Connector が登録されていることを確認します。

$ oc exec -c kafka my-cluster-kafka-0 -- curl -s http://my-connect-cluster-connect-api:8083/connector-plugins
[{"class":"io.debezium.connector.mongodb.MongoDbConnector","type":"source","version":"1.0.0.Beta2-redhat-00001"},{"class":"io.debezium.connector.mysql.MySqlConnector","type":"source","version":"1.0.0.Beta2-redhat-00001"},{"class":"io.debezium.connector.postgresql.PostgresConnector","type":"source","version":"1.0.0.Beta2-redhat-00001"},{"class":"org.apache.kafka.connect.file.FileStreamSinkConnector","type":"sink","version":"2.3.0.redhat-00003"},{"class":"org.apache.kafka.connect.file.FileStreamSourceConnector","type":"source","version":"2.3.0.redhat-00003"}]

JSON 部分が見にくいので整形してみました。

すべての Connector が登録されています(FileStreamXXXConnector は今回無視してください)。

4. MySQL Connector を MySQL に接続

以下のコマンドを実行して MySQL Connector を MySQL に登録します。

$ cat << EOF | oc exec -i -c kafka my-cluster-kafka-0 -- curl -s -X POST -H "Accept:appl://my-connect-cluster-connect-api:8083/connectors -d @-
{
"name": "inventory-connector",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"tasks.max": "1",
"database.hostname": "mysql",
"database.port": "3306",
"database.user": "debezium",
"database.password": "dbz",
"database.server.id": "184054",
"database.server.name": "dbserver1",
"database.whitelist": "inventory",
"database.history.kafka.bootstrap.servers": "my-cluster-kafka-bootstrap:9092",
"database.history.kafka.topic": "schema-changes.inventory"
}
}
EOF
{"name":"inventory-connector","config":{"connector.class":"io.debezium.connector.mysql.MySqlConnector","tasks.max":"1","database.hostname":"mysql","database.port":"3306","database.user":"debezium","database.password":"dbz","database.server.id":"184054","database.server.name":"dbserver1","database.whitelist":"inventory","database.history.kafka.bootstrap.servers":"my-cluster-kafka-bootstrap:9092","database.history.kafka.topic":"schema-changes.inventory","name":"inventory-connector"},"tasks":[],"type":"source"}

一度登録すると、AMQ Streams 上にトピックがいくつか自動で生成されます。OpenShift の「Installed Operators」から「AMQ Streams」->「Kafka Topic」の順にアクセスすると自動生成されたトピックを確認することができます。

自動生成されたトピック(一部)

現時点で既に MySQL に発生していたイベントをみることができます。

$ oc exec -c kafka my-cluster-kafka-0 -- /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic dbserver1.inventory.customers --from-beginning --max-messages 4OpenJDK 64-Bit Server VM warning: If the number of processors is expected to increase from one, then you should configure the number of parallel GC threads appropriately using -XX:ParallelGCThreads=N
{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"dbserver1.inventory.customers.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"dbserver1.inventory.customers.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"}],"optional":false,"name":"dbserver1.inventory.customers.Envelope"},"payload":{"before":null,"after":{"id":1001,"first_name":"Sally","last_name":"Thomas","email":"sally.thomas@acme.com"},"source":{"version":"1.0.0.Beta2-redhat-00001","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"inventory","table":"customers","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1575468793348}}
// 略Processed a total of 4 messages

MySQL Connector は binlog を解析してイベントに変換するように実装されていますが、初期接続時にはテーブル作成時からのすべての変更を読み取るためこのような動作になります。上記コマンドの結果、全部で 4 件のメッセージが格納されていることがわかりました。

5. MySQL に変更を発生させてイベントをキャプチャ

MySQL に INSERT 文を実行してみましょう。

$ oc exec -i $(oc get pods -o custom-columns=NAME:.metadata.name --no-headers -l app=mysql) -- bash -c 'mysql -t -u mysqluser -pmysqlpw -e "INSERT INTO customers VALUES(default,\"John\",\"Doe\",\"john.doe@example.org\")" inventory'

先ほどと同様にトピックに格納されたメッセージを確認します。

$ oc exec -c kafka my-cluster-kafka-0 -- /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic dbserver1.inventory.customers --from-beginning --max-messages 5

INSERT 文を実行したため、出力されたログからメッセージ数が 5 件になっていることが確認できると思います。

出力された JSON のうち、最後の INSERT 文に当たるイベント部分に絞ってデータをみてみましょう。

payload の部分に注目すると、before が null だったものが after で値が格納されていることがわかります。INSERT 文で使用したカラムが JSON データに反映されています。

最後に

CDC は様々なユースケースで役に立つ技術となっています。Connector の数も現時点では MySQL や PostgreSQL などのオープンソース系のデータベースが中心となっていますが、その他データベースへも対応を急いでいます。

マイクロサービスを導入するに当たって巨大モノリス DB が壁になっているケースでは前回紹介の Data Virtualization が、マイクロサービス間のデータ連携には今回の CDC が欠かせない存在となると思います。組み合わせて使うとさらに効果が発揮できるソリューションとなっています。

興味があれば是非弊社営業にお問い合わせください。

--

--

Kenta Kosugi
Kenta Kosugi

Written by Kenta Kosugi

Javaアプリケーションサーバーの開発からCORBA製品のサポート、QA、証券外務員(第一種免許)、ストレージ屋、アーキテクト、SaaS屋と一貫性のない道を歩んでいます。Red Hatに復帰しました。

No responses yet