Apache Kafka(AMQ Streams on OpenShift)で Apache Avro with Service Registry を使う#2

Kenta Kosugi
11 min readApr 21, 2021

--

前回の続きになります。

使用するソースコードは以下になります。
https://github.com/k-kosugi/service-registry-demo

Kafka Producer 開発

Avro Schema の定義

Kafka で使用する Avro Schema を作成します。今回は producer/src/main/avro ディレクトリに machine.avsc という名称でスキーマを作成しています。中身は以下の通りです。

Avro スキーマを Service Registry に登録

Avro スキーマの定義が FIX したら、Service Registry に Avro スキーマを登録します。

producer/pom.xml の registryURL を書き換えましょう。

<plugin>
<groupId>io.apicurio</groupId>
<artifactId>apicurio-registry-maven-plugin</artifactId>
<version>${registry.version}</version>
<executions>
<execution>
<phase>generate-sources</phase>
<goals>
<goal>register</goal>
</goals>
<configuration>
<registryUrl>
http://example-apicurioregistry.rhi-service-registry.apps.cluster-1492.1492.sandbox1228.opentlc.com/api
</registryUrl>
<artifactType>AVRO</artifactType>
<artifacts>
<schema1>${project.basedir}/producer/src/main/avro/book.avsc</schema1>
</artifacts>
</configuration>
</execution>
</executions>
</plugin>

registryUrl には、Service Registry の Route に /api を付与したものを記載するよう変更してください。Service Registry の Route は OpenShift のコンソールから「Routes」 → 「Location」で確認することができます。

上記の場合、http://example-apicurioregistry.rhi-service-registry.apps.cluster-1492.1492.sandbox1228.opentlc.com/api が対象の URL になります(/api のつけ忘れにご注意ください)。

Avro スキーマからソースの生成

Avro スキーマを定義したあとは、Kafka Producer でこのスキーマを使用するためのコードを自動生成します(GitHub 上にはすでにプリコンパイルされたコードが登録済みです)。producer ディレクトリで mvn compile を実施すると、pom.xml で定義されている sourceDirectory の Avro スキーマを読み込み、ouptputDirectory にソースを自動生成します。

<plugin>
<groupId>org.apache.avro</groupId>
<artifactId>avro-maven-plugin</artifactId>
<version>1.10.0</version>
<executions>
<execution>
<id>schemas</id>
<phase>generate-sources</phase>
<goals>
<goal>schema</goal>
<goal>protocol</goal>
<goal>idl-protocol</goal>
</goals>
</execution>
</executions>
<configuration>
<sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory>
<outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
</configuration>
</plugin>
プリコンパイルされたコード

上記が自動生成されたコード部分になります。このコードを使用して Kafka Producer を開発していくことになりますが、今回は詳細については省略します。

producer ディレクトリで

$ ./mvnw compile

と打ってください。Maven のコンパイルの仮定で Avro スキーマが Service Registry に登録されるのと同時に、プリコンパイルが実行されます。

登録された Avro スキーマ

アクセスするには「Routes」の Service Registry の URL をクリックするだけです。

Kafka Consumer は Service Registry からダウンロードした AVRO スキーマを使用して開発することもできますし、直にスキーマファイルの受け渡しをすることでも開発することができます。

アプリケーションの設定変更

Kafka BootStrap Server の設定変更

producer ディレクトリ、consumer ディレクトリの配下にある、src/main/resources/application.properties を開きます。

必要があれば以下の1行を変更しましょう。Kafka Broker を起動したプロジェクトを「my-project」から変更している場合は、該当箇所を変更する必要があります。

kafka.bootstrap.servers=my-cluster-kafka-bootstrap.my-project.svc:9092

Service Registry の設定

同じく、producer と consumer ディレクトリの配下にある src/main/resources/application.properties の設定において、以下の1行を変更します。

mp.messaging.connector.smallrye-kafka.apicurio.registry.url=http://example-apicurioregistry.rhi-service-registry.apps.cluster-1492.1492.sandbox1228.opentlc.com/api

こちらに設定する値は「rhi-service-registry」プロジェクト →「Routes」 → 「example-apicurioregistry-ingress-xxxxxxxx」の「Location」に api を付与したものになります。api のつけ忘れに注意しましょう。

Producer / Consumer のビルド実行

producer ディレクトリ / consumer ディレクトリに移動して以下のコマンドを実行します。

$ ./mvnw clean package

ビルドが完了すると、それぞれのディレクトリ配下に target/producer-1.0.0-SNAPSHOT.jar と target/consumer-1.0.0-SNAPSHOT.jar が作成されます。実行可能 jar になってます。

OpenShift へのデプロイ

なんらかの方法で OpenShift の「my-project」プロジェクトへデプロイしてください。src/main/docker/Dockerfile.jvm の Dockerfile を使用して Docker イメージを作成してデプロイしました。このとき、producer 側は Route を作成するためのチェックを必ず付与してください。

アプリケーションの動作

producer 側の動作

prodcuer を実行します。このとき呼び出す URL は以下の場所にあります。

$ curl --header "Content-Type: application/json" \
--request POST \
--data '{"id":"11111","temp":60,"rotation":120}'
http://producer-my-project.apps.cluster-1492.1492.sandbox1228.opentlc.com/machines/

consumer 側の動作

consumer は consumer の Pod のログを表示してみましょう。

続いて「Logs」を選択します。

ID : 11111, Temperature: 60 C, Rotation 120 

の出力がされていることを確認します。

Kafka Console Consumer を起動して値をチェック

これでは Apache Avro によるシリアライズ / デシリアライズが実行されているかがわからないため、Kafak Console Consumer を起動して確認してみましょう。

「Workloads」 →「Pods」から「my-cluser-kafka-0」を選択します。

「Terminal」を選択します。

「Terminal」で以下のコマンドを打って、Kafka Broker に格納されたメッセージを出力してみます。

$ ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic machines --from-beginning
出力されたメッセージの中身

少なくとも文字列はそのまま読めそうですが、int で定義した temp や rotation は解読ができそうにないことがわかりました。

後方互換性・前方互換性

Service Registry には後方互換性や前方互換性を備えているため、Producer 側がスキーマを更新して、Consumer 側が古いスキーマを使っているケースや逆のケースでもある程度スキーマによるエラーが発生しないような仕組みがとられています。詳細は以下をご確認ください。

今回はこれで終わりです。

--

--

Kenta Kosugi
Kenta Kosugi

Written by Kenta Kosugi

Javaアプリケーションサーバーの開発からCORBA製品のサポート、QA、証券外務員(第一種免許)、ストレージ屋、アーキテクト、SaaS屋と一貫性のない道を歩んでいます。Red Hatに復帰しました。

No responses yet