Apache Kafka(AMQ Streams on OpenShift)で Apache Avro with Service Registry を使う#1
Apicurio Registry
Red Hat Integration には Service Registry という Apache Kafka で利用可能なスキーマを管理する仕組みが備わっています。
https://access.redhat.com/documentation/en-us/red_hat_integration/2020-q4/html/getting_started_with_service_registry/
Service Registry は Apicurio Registry がアップストリームのプロジェクトです。Apicurio Registry の Web サイトにおいて「Schema Registry」のユースケースとして紹介されています。
メッセージングやイベントストリーミングの世界では、トピックやキューに公開されるデータは、しばしばスキーマ(Apache Avro、JSON スキーマ、Google プロトコルバッファなど)を使用してシリアライズまたは検証されなければなりません。スキーマは各アプリケーションでパッケージ化することができますが、代わりに外部システムに登録し、各アプリケーションから参照する方がより良いアーキテクチャパターンであることが多いです。これにより、以下のようなことが可能になります。
生産者と消費者からのスキーマのデカップリング
消費者をアップグレードせずに生産者をアップグレードする可能性
生産に使用されるすべてのスキーマを一元管理
各トピック/キューに必要なデータ形式を文書化する
データフォーマット(スキーマ)の進化を一元管理
Red Hat Integration Service Registry
Apicurio Registry をエンタープライズレディにしたものが Red Hat Integration Service Registry になります。
Service Registry では Kafka のプロデューサーとコンシューマーでのデータのシリアライズ / デシリアライズのためにスキーマを登録、管理することが可能となります。その際、利用可能なストレージはいくつかの選択肢から選択することができます。今回は Red Hat Integration 2020-Q4 バージョンを利用しますが、このバージョンで利用可能なストレージは以下3タイプになります。
- Kafka Streams based storage in AMQ Streams (General Available)
- JPA based storage in PostgreSQL (Tech Preview)
- Cache based storage in embedded Infinispan (Tech Preview)
今回は AMQ Streams を使ってみます(他は Tech Preview なので)。
Red Hat Integration Operator のインストール
Operator Hub から「Red Hat Integration Operator」をインストールします。
「Install」を選択します。
Red Hat Integration の Operator 一覧が表示されます。 「>」ボタンを押下して展開してみると、デフォルトではすべての Operator がインストールされるような設定になっていることがわかります。
「Red Hat Integration」Operator の「Status」が「Succeeded」になるまで待ちます。
「status」が「Succeeded」になると「rhi-service-registry」という OpenShift のプロジェクトに「Red Hat Integration Service Registry」Operator がデプロイされています。
事前準備 AMQ Streams (Kafka Broker)のデプロイ
OpenShift で「my-project」というプロジェクトを作成してください。このプロジェクトに Kafka Broker をデプロイします。
作成できたら、「Project」を「my-project」に設定して「Installed Operators」 → 「Kafka」の順に選択します。「my-project」を使わない場合は「Service Registry」の設定で Kafka Bootstrap Server の設定する箇所で適宜読み替えてください。
遷移した画面で「Create Kafka」を選択します。
今回はとりあえず試すだけなので、そのまま「Create」を押しましょう。
デフォルト設定では Kafka Broker 3台、Zookeeper 3台が起動します。
AMQ Streams Operator の画面から以下二つのトピックを作成しておきます。いずれも Service Registry から利用されるので、名前の打ち間違いや作成忘れにご注意ください。Service Registry の起動でエラーになります。
- storage-topic
- global-id-topic
Service Registry のデプロイ
OpenShift のプロジェクトを「rhi-service-registry」に変更します。「Installed Operators」から「ApicurioRegistry」を選択します。
遷移した画面で「Create ApicurioRegistry」を選択します。
以下の画面に遷移するのでラジオボタンの「YAML View」を選択します。
YAML エディターが開きますので、次のコードをコピー & ペーストしてください。
YAML で指定している bootstrapServers の項目は “クラスター名-kafka-bootstrap.プロジェクト名.svc:9092” になります。Kafka クラスター名を変更している場合は合わせて bootstrapServers を変更しておきましょう。
「Create」を押してデプロイしましょう。デプロイが完了すると、「example-apicurioregistry-deployment-xxxx-deploy」という Pod が起動し、しばらくすると「example-apicurioregistry-deployment-yyyy」という Pod が起動します。
Web Console 用の Route も作成されるので、アクセス可能か確認しておきます。
上記の URL にアクセスすると下記の画面が表示されます。
今回はここまでになります。次回は Service Registry を使用して Kafka クライアント(コンシューマー & プロデューサー)を開発していきます。