Apache Kafka(AMQ Streams on OpenShift)で Apache Avro with Service Registry を使う#2
前回の続きになります。
使用するソースコードは以下になります。
https://github.com/k-kosugi/service-registry-demo
Kafka Producer 開発
Avro Schema の定義
Kafka で使用する Avro Schema を作成します。今回は producer/src/main/avro ディレクトリに machine.avsc という名称でスキーマを作成しています。中身は以下の通りです。
Avro スキーマを Service Registry に登録
Avro スキーマの定義が FIX したら、Service Registry に Avro スキーマを登録します。
producer/pom.xml の registryURL を書き換えましょう。
<plugin>
<groupId>io.apicurio</groupId>
<artifactId>apicurio-registry-maven-plugin</artifactId>
<version>${registry.version}</version>
<executions>
<execution>
<phase>generate-sources</phase>
<goals>
<goal>register</goal>
</goals>
<configuration>
<registryUrl>
http://example-apicurioregistry.rhi-service-registry.apps.cluster-1492.1492.sandbox1228.opentlc.com/api
</registryUrl>
<artifactType>AVRO</artifactType>
<artifacts>
<schema1>${project.basedir}/producer/src/main/avro/book.avsc</schema1>
</artifacts>
</configuration>
</execution>
</executions>
</plugin>
registryUrl には、Service Registry の Route に /api を付与したものを記載するよう変更してください。Service Registry の Route は OpenShift のコンソールから「Routes」 → 「Location」で確認することができます。
上記の場合、http://example-apicurioregistry.rhi-service-registry.apps.cluster-1492.1492.sandbox1228.opentlc.com/api が対象の URL になります(/api のつけ忘れにご注意ください)。
Avro スキーマからソースの生成
Avro スキーマを定義したあとは、Kafka Producer でこのスキーマを使用するためのコードを自動生成します(GitHub 上にはすでにプリコンパイルされたコードが登録済みです)。producer ディレクトリで mvn compile を実施すると、pom.xml で定義されている sourceDirectory の Avro スキーマを読み込み、ouptputDirectory にソースを自動生成します。
<plugin>
<groupId>org.apache.avro</groupId>
<artifactId>avro-maven-plugin</artifactId>
<version>1.10.0</version>
<executions>
<execution>
<id>schemas</id>
<phase>generate-sources</phase>
<goals>
<goal>schema</goal>
<goal>protocol</goal>
<goal>idl-protocol</goal>
</goals>
</execution>
</executions>
<configuration>
<sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory>
<outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
</configuration>
</plugin>
上記が自動生成されたコード部分になります。このコードを使用して Kafka Producer を開発していくことになりますが、今回は詳細については省略します。
producer ディレクトリで
$ ./mvnw compile
と打ってください。Maven のコンパイルの仮定で Avro スキーマが Service Registry に登録されるのと同時に、プリコンパイルが実行されます。
アクセスするには「Routes」の Service Registry の URL をクリックするだけです。
Kafka Consumer は Service Registry からダウンロードした AVRO スキーマを使用して開発することもできますし、直にスキーマファイルの受け渡しをすることでも開発することができます。
アプリケーションの設定変更
Kafka BootStrap Server の設定変更
producer ディレクトリ、consumer ディレクトリの配下にある、src/main/resources/application.properties を開きます。
必要があれば以下の1行を変更しましょう。Kafka Broker を起動したプロジェクトを「my-project」から変更している場合は、該当箇所を変更する必要があります。
kafka.bootstrap.servers=my-cluster-kafka-bootstrap.my-project.svc:9092
Service Registry の設定
同じく、producer と consumer ディレクトリの配下にある src/main/resources/application.properties の設定において、以下の1行を変更します。
mp.messaging.connector.smallrye-kafka.apicurio.registry.url=http://example-apicurioregistry.rhi-service-registry.apps.cluster-1492.1492.sandbox1228.opentlc.com/api
こちらに設定する値は「rhi-service-registry」プロジェクト →「Routes」 → 「example-apicurioregistry-ingress-xxxxxxxx」の「Location」に api を付与したものになります。api のつけ忘れに注意しましょう。
Producer / Consumer のビルド実行
producer ディレクトリ / consumer ディレクトリに移動して以下のコマンドを実行します。
$ ./mvnw clean package
ビルドが完了すると、それぞれのディレクトリ配下に target/producer-1.0.0-SNAPSHOT.jar と target/consumer-1.0.0-SNAPSHOT.jar が作成されます。実行可能 jar になってます。
OpenShift へのデプロイ
なんらかの方法で OpenShift の「my-project」プロジェクトへデプロイしてください。src/main/docker/Dockerfile.jvm の Dockerfile を使用して Docker イメージを作成してデプロイしました。このとき、producer 側は Route を作成するためのチェックを必ず付与してください。
アプリケーションの動作
producer 側の動作
prodcuer を実行します。このとき呼び出す URL は以下の場所にあります。
$ curl --header "Content-Type: application/json" \
--request POST \
--data '{"id":"11111","temp":60,"rotation":120}'
http://producer-my-project.apps.cluster-1492.1492.sandbox1228.opentlc.com/machines/
consumer 側の動作
consumer は consumer の Pod のログを表示してみましょう。
続いて「Logs」を選択します。
ID : 11111, Temperature: 60 C, Rotation 120
の出力がされていることを確認します。
Kafka Console Consumer を起動して値をチェック
これでは Apache Avro によるシリアライズ / デシリアライズが実行されているかがわからないため、Kafak Console Consumer を起動して確認してみましょう。
「Workloads」 →「Pods」から「my-cluser-kafka-0」を選択します。
「Terminal」を選択します。
「Terminal」で以下のコマンドを打って、Kafka Broker に格納されたメッセージを出力してみます。
$ ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic machines --from-beginning
少なくとも文字列はそのまま読めそうですが、int で定義した temp や rotation は解読ができそうにないことがわかりました。
後方互換性・前方互換性
Service Registry には後方互換性や前方互換性を備えているため、Producer 側がスキーマを更新して、Consumer 側が古いスキーマを使っているケースや逆のケースでもある程度スキーマによるエラーが発生しないような仕組みがとられています。詳細は以下をご確認ください。
今回はこれで終わりです。