Apache Kafka on K8s (Strimzi) をコントロールする Cruise Control
Red Hat からエンタープライズ版 K8s である OpenShift 上で動作可能な Apache Kafka(アップストリームは Strimzi) のエンタープライズ版 AMQ Streams 1.5.0 がリリースされました。
このリリースの中には様々な機能拡張が含まれています。
- Kafka 2.5.0 のサポート
- Zookeeper 3.5.8 のサポート
- disconnected な OpenShift 環境への AMQ Streams のインストールのサポート
- MirrorMaker2.0 のサポート
- などなど
その中でも今回は Cruise Control が Tech Preview になっており、Cruise Control に焦点を当ててみたいと思います。
※ Tech Preview なので本番環境でご利用にならないようお願いいたします。
Cruise Control を理解するために(Apache Kafka の簡単なご紹介)
Cruise Control を説明する際に出てくる用語がわからない方向けに Kafka のアーキテクチャを含め、Kafka の用語を簡単に説明します。
- Kafka は複数台のマシンでクラスターを作成します。
(図のケースでは5台) - 複数台のマシンで構成する分散システムであり、分散システムで設定情報を集中管理するための Apache Zookeeper も使用します。
- Zookeeper 自体も分散システムであり、図のケースでは3台で構成されています。
- Kafka はブローカー上に論理的なメッセージの容れ物をトピックとして持ちます。
(図では Topic1 と Topic2 が存在)
- ②トピックはパーティションに分割されます(シャーディング)。パーティション数はトピック作成時に決定できます。
(Topic1 ではパーティション数2、Topic2 ではパーティション数 3) - ①パーティションは複数ノード(下記のレプリカ数に相当)にそれぞれ同じ内容で保存されます。
- パーティションを持つノードの中にはリーダーレプリカ・フォロワーレプリカの2種類が存在します。
- クライアント(メッセージを生成するのがプロデューサー、メッセージを取得するのがコンシューマー)からのメッセージの読書を担うのはリーダーレプリカになり、パーティションの中に 1 つ存在します。
(Topic1 の Partition0のリーダーは Broker Node3、Topic1 の Partition1のリーダーは Broker Node4) - トピック作成時にレプリカ数(Replication Factor)を決定できます。
(Kafka を構成するブローカーの数を超えることはできません) - フォロワーレプリカはリーダーレプリカのレプリケーションのみを担当します。
パーティションを複数ノードで分散して保持しているため、特定のノードに障害が発生した場合でも、継続して処理をおこなうことができるようになっています。障害が発生したノードにリーダーレプリカが存在する場合は他のフォロワーレプリカがリーダーレプリカに昇格します。
Cruise Control 登場の背景
Apache Kafka を使用するとブローカーを追加することで大規模な負荷に対応することができます。
クラスターに新しいブローカーが追加されると、クラスター追加後に生成された新しいトピック-パーティションについてはリーダー・フォロワーとして追加された新ブローカーも対象として選択され、負荷が分散されるようになります。
しかし、ブローカー追加前に作成されたトピック-パーティションについては負荷が緩和されません。手動でパーティション再割り当てを実行する必要があります。
また、ブローカーを新規追加するケースに限らず、特定のブローカーがリーダーレプリカを偏って所持し、高負荷になっているケースでは負荷の高いトピック-パーティションを特定して、負荷のかかっていないフォロワーレプリカをリーダーにする必要が出てきます。
Cruise Control
LinkedIn では1800 を超える Kafka ブローカーが起動され、1日1台はブローカーが故障するという状況にあるとのことです。復旧作業やリバランス作業にかなりの工数がかかっていました。Cruise Control はこうした運用時の負荷を軽減するために開発されました。事前に定義されたパフォーマンス目標を満たすようクラスターに割り当てられたリソースを自動調整するシステムです。
(当初、社内プロジェクトとして開発されていましたが、2017 年(2016?)に OSS として公開されました)
運用初期は問題なくても、ブローカーの障害、ブローカーの追加、トピックの追加などを繰り返していくうちにクラスター全体の負荷が均等ではなくなってきます。こうした問題に対して、Cruise Control は CPU、ディスク、ネットワークの負荷に応じてクラスターのリソース利用率を均等になるように構成します。
これらが自動で計算できるよう、最適化目標を複数定義しておきます。最適化目標に基づいて最適化の提案が生成されます。最適化の提案を承認すると、Kafka クラスターのリバランスが実施されます。
Cruise Control のアーキテクチャ
Cruise Control は以下の 4 つの主要コンポーネントを持っており、それらと REST API で通信できるようになっています。※ AMQ Streams がベースとする Strimzi では Kafka Cluster Operator の制御外、つまり REST API を使って構成を変更できる可能性を排除しています。基本的には Kafka Cluster Operator を使用するよう現時点では設計されています。そのため、Cruise Control の REST API では利用できるにもかかわらず、Strimzi では利用できない機能も存在します。Strimzi で利用できない機能は AMQ Streams でも利用できません。
以下のサイトを参考に記述しています。
- ロードモニター
クラスターから Kafka のメトリックを収集します。直接取得することができないパーティションごとのリソースメトリックを導出します(たとえばパーティションごとの CPU 使用率を推定)。
ディスク使用率、CPU 使用率、byte-in 率、byte-out 率などのクラスターリソース使用率を正確にキャプチャするクラスターワークロードモデルを生成し、ディテクターとアナライザーに送信します。 - アナライザー
Cruise Control の頭脳にあたります。発見的手法を使用してユーザーが指定した最適化目標とロードモニターからのクラスターワークロードモデルに基づいて最適化の提案を生成します。 - ディテクター
ブローカーの障害、最適化目標の違反といった2種類の異常を検出します。 - エグゼキューター
アナライザーからの最適化提案を実施する責任があります。Kafka クラスターのリバランスにはパーティションの再割り当ても含まれます。エグゼキューターはどのブローカーにも負担をかけないように実行します。
最適化目標
最適化目標は Kafka クラスター全体でのワークロードの再分散とリソース使用率に対する制約です。使用できる最適化目標は以下のとおりです(デフォルトの優先順位)。
- Rack-awareness
- Replica capacity
- Capacity
ディスク、インバウンドネットワーク、アウトバウンドネットワーク - Replica distribution
- Potential network output
- Resource distribution
ディスク使用率、ネットワークインバウンド使用率、ネットワークアウトバウンド使用率
最適化目標は Kafka もしくは KafkaRebalance カスタムリソースで定義します。
ブローカーの追加 without Cruise Control
Cruise Control を使用しない場合において Kafka Broker の総数を 3 で Kafka クラスターを作成し、あとで 2 ブローカーを追加した場合にどうなるかをみてみましょう。ここでは OpenShift 4.4、AMQ Streams 1.5.0 を使用しています。
コマンドではなく GUI で操作している場面では Installed Operators から AMQ Streams Operator を介して操作しています。
クラスター作成時には name: my-cluster で作成しますが実態は StatefulSet で -kafka-X の接尾辞を付与した my-cluster-kafka-0、my-cluster-kafka-1、my-cluster-kafka-2 という Pod が起動してきます。
ここでは最初にパーティション数 5、レプリカ数 3 のトピックを作成してみます。
この状態でトピックの詳細情報を取得してみます。起動した my-cluster-kafka-0 の Pod 内に入って、kafka-topics.sh を打ちます。
$ oc exec -it my-cluster-kafka-0 -- ./bin/kafka-topics.sh --describe --topic my-topic --zookeeper localhost:2181
Defaulting container name to kafka.
Use 'oc describe pod/my-cluster-kafka-0 -n cruise-control-demo' to see all of the containers in this pod.
OpenJDK 64-Bit Server VM warning: If the number of processors is expected to increase from one, then you should configure the number of parallel GC threads appropriately using -XX:ParallelGCThreads=N
Topic: my-topic PartitionCount: 5 ReplicationFactor: 3 Configs: segment.bytes=1073741824,retention.ms=604800000
Topic: my-topic Partition: 0 Leader: 0 Replicas: 0,2,1 Isr: 0,2,1
Topic: my-topic Partition: 1 Leader: 2 Replicas: 2,1,0 Isr: 2,1,0
Topic: my-topic Partition: 2 Leader: 1 Replicas: 1,0,2 Isr: 1,0,2
Topic: my-topic Partition: 3 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2
Topic: my-topic Partition: 4 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1
すると作成した 5 つのパーティション、それに加えどのブローカーがリーダーなのかが表示されます。
- Partition0: リーダーは Broker0
- Partition1: リーダーは Broker2
- Partition2: リーダーは Broker1
- Partition3: リーダーは Broker0
- Partition4: リーダーは Broker2
ここでは Broker0 と Broker2 が 2 つのリーダーを持っているため Broker1 に比べて忙しそうです。
さてここでブローカーを 2 台追加してみます。これでリーダーがどう変わるかをみてみましょう。
上記の状態で Save を押すと、新たに my-cluster-kafka-3、my-cluster-kafka-4 の 2 Pod が起動してきます。
kafka-topics.sh を実行してリーダーが変更されたかどうかを確認してみましょう。
- Partition0: リーダーは Broker0
- Partition1: リーダーは Broker2
- Partition2: リーダーは Broker1
- Partition3: リーダーは Broker0
- Partition4: リーダーは Broker2
リーダーは変わっていません。Broker3 や Broker4 といった新規に追加したブローカーは既存のトピック、パーティションには何ら影響しないことがわかりました。
ブローカーの追加 with Cruise Control
Kafka クラスターを一度削除して、もう一度作成します。このとき、以下の1行を追加します。
spec:
cruiseControl: {}
この状態でクラスターを作成すると、my-cluster-cruise-control-xxxx という Pod が起動してきます。
この時点で、KafkaTopic がいくつか自動で生成されます。
上記のアーキテクチャ図における Kafka Cluster のトピックに相当するものと予想しますが、上図にはないトピックも存在しており、当初の Cruise Control からは拡張されているようです。
続いて、Kafka Rebalance を作成します。
spec を空で生成していますが、このように設定するとデフォルトの目標が設定されます。
さて、これで準備完了です。
まずはトピックを作成します。Cruise Control を使用していない場合と同じようにパーティション 5、レプリカ数 3を指定します。
まずはトピックの状態を表示してみましょう。
- Partition0: リーダーは Broker1
- Partition1: リーダーは Broker0
- Partition2: リーダーは Broker2
- Partition3: リーダーは Broker1
- Partition4: リーダーは Broker0
今度は Broker2 が暇そうです。では Kafka Broker を2台追加してみましょう。
replicas の数を 3 → 5 に変更して Save を押します。変更ができたら Kafka Rebalance の詳細を確認してみましょう。
$ oc describe kafkarebalance my-rebalance -n cruise-control-demo
出力された結果の中で 特に注目してもらいたいのが Status.Conditions.Status が ProposalReady になっている点です。提案の準備ができたことを表しています。この提案を承認するとリーダーの移動が 8、レプリカの移動が 48 実行されることがわかります。On Demand Balancedness Score も承認前後で 9 ポイント向上することがわかります。
この時点では、レプリカの状態は何も変わっていません。この提案を受けてみます。
$ oc annotate kafkarebalance my-rebalance strimzi.io/rebalance=approve -n cruise-control-demo
提案を受け入れると、リバランスが実行され以下のようにリーダーが変更されます。リバランス中は、KafkaRebalance の Status は Rebalancing になるようです。
また使用されているレプリカも Broker0〜Broker4 のすべてが利用されていることがわかります。
※ 皆さんの環境が私と全く同じ提案にはならないでしょう。OpenShift のネットワーク状況やワーカーの CPU 使用率などさまざまな要素で決められているためです。
これで Kafka クラスターのリバランスを実施することができました。
まだ私も本機能を調べ始めたばかりなので不明な点が山ほどありますが、今回はここまでとします。
まとめ
Kafka の運用で大変になるリバランス作業を Cruise Control が様々な情報を収集し、どう変更すれば良いのかを提案してくれるようになります。変更を受け入れるかどうかは運用者に委ねられているため、定期的に KafkaRebalance の状態をチェックして(状態をリフレッシュする必要もあるようです)、よりよい状態であれば受け入れてリバランスを走らせることができます。これにより運用の手間を大いに減らすことができるでしょう。
人手を介してパーティションの使用率や CPU 使用率を推定して出すよりもかなり楽です。