Apache Kafka Connector 4.8 - Mule 4

Apache Kafka 用 Anypoint Connector (Apache Kafka Connector) を使用すると、Apache Kafka メッセージングシステムとやりとりでき、Mule Runtime Engine (Mule) を使用した Mule アプリケーションと Kafka クラスター間のシームレスなインテグレーションを実現できます。

互換性に関する情報は「Apache Kafka Connector リリースノート」を参照してください。

始める前に

アプリケーションを作成する前に、次の作業を行う必要があります。

  • Apache Kafka の対象リソースおよび Anypoint Platform へのアクセス権を取得する

  • Anypoint Studio を使用した Mule アプリケーションの作成方法を理解する

  • Studio に表示される項目の値を取得するために Apache Kafka へのアクセス権を取得する

コネクタの一般的なユースケース

Apache Kafka Connector の一般的なユースケースを次に挙げます。

  • ログ集約

    Apache Kafka の低レイテンシー処理を活用して、複数のサービスからログを収集し、それらを標準形式で複数のコンシューマーが使用できるようにします。たとえば、メッセージを Apache Kafka にパブリッシュし、その後、メッセージを取得することができます。

  • 分析およびメトリクス

    広告予算を最適化するには、Apache Kafka とビッグデータ分析ソリューションを統合し、ページビュー、クリック、共有などのエンドユーザーアクティビティを分析して、関連する広告を提供します。たとえば、百貨店は Web サイトアクティビティトラッカーを使用してオンラインショッピング体験を改善することができます。収集されたデータは複数の部門に送信されてさまざまな計算が行われます。各部門は、受信した情報を確認して、顧客が探している商品に関する最新情報を入手し、それに応じておすすめを提案します。

  • 通知とアラート

    信用調査機関との統合や場所などに基づいて、さまざまな財務イベント (最近の取引金額など) やより複雑なイベント (今後の投資の提案など) について顧客に通知します。

  • 時間的制約のあるアプリケーション

    時間的制約のあるインテグレーションアプリケーションを構築します。たとえば、患者の緊急の入院要求を処理したり、指定された条件により決定される優先度に基づいて要求を並び替えたりする病院のサーバーを使用して、病院の患者が必要なケアをタイムリーに受けられるようにするアプリケーションを構築できます。このシナリオでは、アプリケーションは、キューを介して送信されるメッセージの順序と羃等性を使用して、受信した順序で Apache Kafka メッセージを処理します。たとえば、ニュースルームでは Apache Kafka システムを使用して最新のニュースを配信することができます。インテグレーションアプリケーションを構築し、最初に Apache Kafka キューの末尾から読み取ることで、最新のニュースを取得します。

これらのユースケースの例については、​「Apache Kafka Connector Connector の例」​を参照してください。

パブリッシュ先のユースケース

Kafka Connector を使用すると、アプリケーションから Kafka サーバーにバイナリメッセージをパブリッシュできます。ユースケースの例を次に示します。

  • バイナリデータを Kafka サーバーおよび基盤となるトピックにパブリッシュする。

  • 大量のデータを処理する場合に Kafka にパブリッシュするデータのバッチサイズを設定する。

    • パフォーマンスを最適化するために、会社は送信するメッセージをまとめることができます。Kafka は、バッチサイズに達するまでメッセージを保存してから送信を実行します。

  • 最適なパフォーマンスに焦点を合わせて、送信するメッセージサイズを設定する。

    • Kafka は、大きなメッセージを処理することを目的としていません。通常、メッセージは 1 MB の範囲内に収まり、ユーザーはパフォーマンスが低下しないようにサイズを設定します。エラーの場合、メッセージは拒否されます。

  • データトランスミッションが成功するように、Kafka サーバーにパブリッシュされるメッセージの設定可能なレベルの肯定応答に基づいて後続のコミットをブロックする。

    • 肯定応答により、ユーザーは追加のメッセージをパブリッシュする前にメッセージが送信されたことを確認できます。これは、時間順またはイベント順のメッセージの場合 (ログや患者の投薬順など) に特に役立ちます。メッセージが順番どおりに送信されるようにすると、ダウンストリームアプリケーションで適切な順序でメッセージを処理できます。

  • パブリッシュ中のデータのシリアル化方法を設定する。

  • 相互 TLS 中に送信されたメッセージを保護する。

パブリッシュの承認条件を次に示します。

  • Kafka トピックにメッセージをパブリッシュする。

  • バッチサイズおよびメッセージサイズを設定する。

  • 設定したレベルでパブリッシュが完了したら肯定応答を取得する。

  • 相互 TLS 経由でメッセージを安全に送信する。

  • シリアル化種別を設定する。

サブスクライブ元のユースケース

Kafka Connector を使用すると、アプリケーションは後続のコンシュームダウンストリームのためのメッセージを Kafka から取得できます。ユースケースの例を次に示します。

Kafka からの読み取り:

  • Kafka サーバーと基盤となるグループをサブスクライブし、ダウンストリームで処理するためにメッセージを読み取ってコンシュームします。

  • 特定の Kafka パーティションをサブスクライブしてメッセージを処理する。

    • 高可用性マイクロサービスを使用するユーザーは、失敗時に Kafka の再調整を使用したくないと考えている。

    • パーティションに関連付けられたローカル状態があるユーザーは、そのパーティションのみを処理したいと考えている。

  • 一連の値やトピックの先頭または末尾に対するオフセット値を設定する。

    • ローカルストアがあるユーザーは、その情報を使用してレコードの処理を開始したいと考えている。

    • 時間的制約のあるレコードがあるユーザーは、アプリケーションを最新の状態にする必要がある場合は末尾までスキップし、新しいアプリケーションでレコードを処理する場合は先頭までスキップしたいと考えている。

  • 再起動時に代替ストレージシステムからオフセット値を読み取る。

    • オフセット情報が含まれるローカルストアがあるユーザーは、ローカルストレージから値を読み取りたいと考えている。

  • 変化するダウンストリーム処理期間に対応するためにメッセージの受信をブロックする期間を設定する。

メッセージの処理:

  • より詳細に制御するために、コミットを手動で設定して、ダウンストリームアプリケーションのクラッシュ時にデータ損失がないことを確認する。

  • ローカルストアまたは Kafka ストレージにコミット値を保存したり、保存が成功するまでブロックしたりする。

  • オフセットをローカルストアまたは Kafka ストレージに非同期に保存する。

  • 同じグループ ID から読み取りを行う複数のアプリケーションで Kafka の組み込みの負荷分散機能を使用する。

  • 1 回のコールで処理するレコード数を設定する。

サブスクライブの承認条件を次に示します。

  • 特定のパーティションまたはトピックから Kafka のメッセージを読み取る。

  • ローカルストアからオフセットを取得した後に Kafka のメッセージを読み取る。

  • キューの先頭、キューの末尾、および事前に指定したオフセットからメッセージを読み取る。

  • スレッドあたりのコンシューマーが 1 人の場合に高いレベルのパフォーマンスを確保する。1 つのグループには複数のコンシューマーが存在します。

対象者

次のステップ

前提条件を満たしたら、​Anypoint Studio​ でアプリケーションを作成できます。