Knative Eventing with Apache Kafka & Camel K Quarkus

Kenta Kosugi
17 min readMay 15, 2020

--

Camel K

Red Hat Fuse のプロダクトの中に Fuse Online があります。この Fuse Online は Integration Platform as a Service の一種で OpenShift Container Platform 上に構築した Fuse Online 上でグラフィカルにシステムの統合ができるようデザインされたものになっています。

Fuse Online

Fuse Online には Camel K と呼ばれる Kubernetes 用に設計された Camel の Tech Preview が含まれています。

今回はこの Camel K に注目し、Apache Kafka と Camel K を利用した Knative Eventing をご紹介します。前回 Serving ご紹介時はサンプルコード主体でしたが、今回は主にアーキテクチャや仕組みのご紹介となります。

イベントドリヴンなアプリケーション

AWS では Amazon S3 のバケットに画像が格納されるとイベントが発生し、これを契機として Amazon Lambda が起動して様々な処理をさせる、という仕組みがよく利用されます。

S3 は Simple Storage Service の略で、オブジェクトストレージです。ストレージと聞くと昔ながらのアプリケーション開発者は自分とは無縁だと思うかもしれませんが、AWS を使用するアプリケーション開発者はガシガシ S3 を使って開発しています。API でデータが操作でき、画像ファイルを格納することもできますし、json ファイルや xml ファイルを格納し、Hadoop/Spark から解析することもできます。

Lambda はイベントが発生したときのみ起動するサーバーレスアプリケーションです。これにより、サーバーのプロビジョニングや管理を気にすることのないアプリケーション開発を可能としています。

S3 と Lambda の組み合わせは AWS 上でしか使えません。様々なプラットフォームでイベントドリヴンなアプリケーションを構築するにはどうしたらよいでしょうか。

Knative Eventing

Knative Eventing は Knative のコンポーネントの一つです。前回ご紹介した Knative Serving と、Knative Eventing の合わせて二つが Knative のコンポーネントです。Knative は Kubernetes 上に構築されます。

Knative Eventing を利用すると、S3 + Lambda で実現しているようなイベントドリヴンな仕組みを様々な環境で構築できるようになります。

こちらをご覧ください。

S3 の代わりに OSS のオブジェクトストレージである Ceph を使用しています。Ceph に胸部レントゲン写真が格納されると notification が Apache Kafka に格納されます。Kafka に notification が格納されると Kafka を イベントソースとして(KafkaSource と言います)、サーバーレスアプリケーションが起動します。肺炎かどうか機械学習の結果を反映させて再度 Ceph に格納するというサンプルになります。

このようにサーバーレスアプリケーションをベンダーに依存しない形で OSS のみで実現できます。イベントドリヴンなアプリケーションの基盤となるのが Knative Eventing になります。

※ 今後、Knative Eventing が流行るとオブジェクトストレージである Ceph は更に重要度を増すことになるでしょう。

Knative Eventing のアーキテクチャ

knative.dev より引用

ソースとシンク

上記の図の Simple Delivery と記載されているパターンです。イベントソースが直接イベントシンクにイベントを届けます。

チャネルとサブスクリプション

イベントを生成するイベントソースとイベントを消費するイベントシンクの間にチャネルを定義し、イベントの永続化領域として使用できます。

利用できるチャネルには NatssChannel や InMemoryChannel、KafkaChannel などを使用することができます。デフォルトでは InMmoryChannel が利用されます。

現在 Knative Cookbook が無料で公開されており、その中で Knative のデフォルトチャンネルを KafkaChannel にする方法をはじめ(Knative のオフィシャルサイトにも記載されていますが)、役に立つ情報が記載されているので、興味があれば是非ダウンロードしてみてください。

各チャンネルにはイベントシンクとしてサブスクライバを設定できます。

【参考】Kafka Channel を作成してみる

OpenShift 上で KafkaChannel を作成してみます。手順としては以下になります。※ OpenShift Serverless Operator はインストール済みで Knative Serving と Knative Eventing も構成済みであることが前提です。

  1. AMQ Streams Operator をインストールして AMQ Streams Operator 経由で Kafka Broker を生成
  2. Knative Apache Kafka Operator をインストールして Knative Components For Apache Kafka インスタンスの生成
    ※このとき、Kafka の接続先を1で作成した Kafka BootStrap のサービスにすることに注意します
  3. KafkaChannel のデプロイ
    以下のコマンドを実行します。パーティションやレプリケーションファクターは適宜変更してください。
$ cat <<-EOF | oc apply -f -
---
apiVersion: messaging.knative.dev/v1alpha1
kind: KafkaChannel
metadata:
name: my-kafka-channel
spec:
numPartitions: 3
replicationFactor: 1
EOF

KafkaChannel を確認してみます。

$ oc get kafkachannel
NAME READY REASON URL AGE
my-kafka-channel True http://my-kafka-channel-kn-channel.default.svc.cluster.local 5m45s

KafkaChannel を作成したことで以下のように、KafkaChannel 用の Kafka Topic が自動で作成されます。
※ このことから分かるように、Kafka ブロカーを作成するタイミングで auto.create.topics.enable=false にした場合はおそらく Channel の作成に失敗するでしょう。

$ oc get kafkatopics
NAME PARTITIONS REPLICATION FACTOR
knative-messaging-kafka.project-user1-tekton.my-kafka-channel 3 1
Webコンソールで Topic を確認したところ

ブローカーとトリガー

knative.dev より引用

チャンネルとサブスクリプションに似ていますが、サブスクライバが興味のあるイベントだけをフィルタリングすることができます。

サービス

サービスは Knative Serving のリソースで、ワークロードのライフサイクル全体を自動的に管理します。前回の紹介で、リビジョンをあげて、トラフィックを分割する方法を紹介しました。

knative.dev から引用

イベントソース

イベントドリヴンなアプリケーションを構築する上で何をイベントにするかは重要です。例えば、GitHub 上の何かしらのイベントを契機にしたい場合は GitHubSource を使用することができます。

knative.dev から引用

Apache Kafka を使用している場合、Kafka をチャネルとして利用するだけではなく、イベントソースとして利用することもできます。胸部レントゲンの例では KafkaSource が使用されています。

Camel K

GitHub や Kafka がイベントソースとして利用できることがわかりました。しかし多くの場合、初めから用意されているソースでは不足することが考えられます。ServiceNow や salesforce などのビジネスで利用しているシステム、Slack や Twitter、Facebook などの SNS を契機にしたイベントを補足したい場合はどうしたら良いでしょうか。こちらを見てもそういった Source は存在しません。

こういったケースでは様々なシステムと連携するための Enterprise Integration Patterns を実現した Apache Camel がその役割を担うことになります。厳密にいうと Kubernetes 用の Camel である Camel K を使用します。

Camel K 自体を紹介すると非常に長くなってしまうので、詳細についてはこちらをご覧ください。また、Japan Camel User Group で発表されたこちらの資料についても今回の紹介内容が記載されているため合わせてご確認ください。

Japane Camel User Group は定期的に「Camel In Actions」の読書会も行っていますので、興味があれば参加してみてはいかがでしょうか。

CamelSource

Knative の CamelSource を使用すると、様々なシステムから発生するイベントへ対応が可能となります。対応しているコンポーネントは300以上あり、様々なシステムに接続できるようになるでしょう。

Camel Sources

【参考】CamelSource を作成してみる

OpenShift 上に CamelSource を作成してみます。手順としては以下になります。※ OpenShift Serverless Operator はインストール済みで Knative Serving と Knative Eventing も構成済みであることが前提です。

  1. Camle K Operator のインストール
  2. Knative Apache Camel Operator のインストール
  3. イベントシンク側の event-display をデプロイ
  4. ソース側 CamelSource のデプロイ

作成後のトポロジは以下のようになります。event-display は KSVC とあり、Knative Service であることがわかります。CamelSource 側は Camel K Operator によって作成されたものです。

トポロジ

event-display は以下のコマンドでデプロイします。

$ oc apply -f https://github.com/knative/eventing-contrib/releases/download/v0.14.1/event-display.yaml

CamelSource は以下の yaml を使用してデプロイします。

apiVersion: sources.knative.dev/v1alpha1
kind: CamelSource
metadata:
name: timed-greeter
spec:
integration:
dependencies:
- camel:log
source:
flow:
from:
uri: "timer:tick"
parameters:
period: "10s"
steps:
- set-body:
constant: "Welcome to Apache Camel-K"
- set-header:
name: ContentType
simple: text/plain
- transform:
simple: "${body.toUpperCase()}"
- log:
message: "${body}"
sink:
ref:
apiVersion: serving.knative.dev/v1
kind: Service
name: event-display

sink の箇所を見てください。CamelSource は上記で作成した event-display に対してイベントを送信するよう設定しています。今回は Knative プロジェクトで用意された CloudEvents をみるための event-display というサービスを使用します。

sink の部分を event-display から KafkaChannel にすれば、CamelSource から Kafka にイベントを送信するよう設定することもできます。

source 部分ですが、今回は外部システムには接続せず、Camel 自信が 10 秒ごとに Welcome to Apache Camel-K というメッセージを大文字に変換して投げるものになっています。作り的には CronJobSource と似たものになっています。本来 source 部分には例えば ServiceNow、Facebook、Twitter 、 Kafka(Camel K から Kafka へももちろん接続可能)、ActiveMQ 等、他システムとの連携を記述することになります。

今回デプロイした event-display では CamelSource から届いたイベントを CloudEvents の形式でみることができます。

CloudEvents

サービス側での Camel K の利用

Camel K がイベントソース側で様々なシステムと連携することは上記でわかりました。様々なシステムと連携できるということはサービス側(つまり Knative Serving)でも使えるのではないでしょうか。

サーバーレスでは不要な時はサービスは落ち、イベントを契機にアプリケーションが起動されます。

Camel は主に Java 言語で利用されますが、Java は Go や JavaScript で記述されたアプリケーションに比べ起動処理が遅く、サーバーレスアプリケーションのワークロード(0 までスケールインされたアプリケーションがイベント発生に伴い起動する)には適さないような気がします。

Camel K + Quarkus

Quarkus を利用することでこの問題は解消されます。
Quarkus を利用すると Java で記述されたソースコードを Java バイトコードではなく Linux ネイティブな実行可能バイナリとして生成することを可能にする上、Java の技術者の知識を活かすことができます。

Camel K も Camel K Quarkus を使用することで起動時間が早く、メモリの消費量を抑えたサーバーレスアプリケーションを開発することができます。

これにより、リクエストの度に起動されても問題ないアプリケーションを実現することができます。しかも、Camel の様々なシステムへの接続性を活かすことができるため、Camel K + Quarkus をイベントシンク側のアプリケーションとして利用することは非常にメリットがあります。

Knative Serving としての Camel K

Wrap Up

Knative Eventing ではイベントの永続化領域で Apache Kafka が使用できます。また、Kafka 自体をイベントソースとしても利用することができます。これにより、以前にご紹介したこともある Change Data Capture でデータベースに発生したイベント(INSERT/UPDATE/DELETE)を契機に、KafkaSource を通してサーバーレスアプリケーションを動作させるということももちろん可能になります。

Apache Kafka を含めた様々なシステムに接続することができ、K8s 用に特化した Camel K は Apache Kafka と同様にイベントソースとしても活用できますし、サービス側でも活用できます。Camel がもともと持っているコンテンツベースのルーティングを使用して、イベントの分離も簡単におこなえるようになるでしょう。

イベントソース、サービス双方での Camel K の利用

サーバーレスアプリケーションを考慮する場合、そのワークロードの特性上、起動時間を高速にする Quarkus が重要になります。また、冒頭にあったオブジェクトストレージ Ceph もイベントドリヴンなシステムを構築する上では今後欠かせない要素となるでしょう。

それらの基盤となる Kubernetes、Knative も重要です。
Knative を使用するとサービスのリビジョン管理・トラフィック制御も実現でき、A/B テストも可能となります。

こうした技術を組み合わせることで、特定の環境に依存しないイベントドリヴンなシステムを構築することができるようになります。

Red Hat では今回ご紹介したマイクロサービス・サーバーレス含めた情報を Red Hat Developer Channel で Webinar 配信する予定です。

是非ご視聴ください。

--

--

Kenta Kosugi
Kenta Kosugi

Written by Kenta Kosugi

Javaアプリケーションサーバーの開発からCORBA製品のサポート、QA、証券外務員(第一種免許)、ストレージ屋、アーキテクト、SaaS屋と一貫性のない道を歩んでいます。Red Hatに復帰しました。

No responses yet