Anypoint Studio を使用した Apache Kafka 4.7 の設定 - Mule 4

Anypoint Studio で、Apache Kafka 用 Anypoint Connector (Apache Kafka Connector) を Mule プロジェクトに追加し、Kafka クラスターへの接続を設定し、コネクタの入力元を設定します。

Anypoint Studio でコネクタを設定する手順は、次のとおりです。

  1. コネクタを Mule プロジェクトに追加します。

  2. コネクタを設定します。

  3. コネクタの入力元を設定します。

Studio でコネクタに追加する

  1. Studio で Mule プロジェクトを作成します。

  2. [Mule Palette (Mule パレット)] で、​[(X) Search in Exchange ((X) Exchange 内を検索)]​ をクリックします。

  3. [Add Modules to Project (モジュールをプロジェクトに追加)]​ で、検索項目にコネクタの名前を入力します。

  4. [Available modules (使用可能なモジュール)]​ で、そのコネクタ名をクリックします。

  5. [Add (追加)]​ をクリックします。

  6. [Finish (完了)]​ をクリックします。

Exchange を使用してコネクタを追加する

  1. Studio で Mule プロジェクトを作成します。

  2. Studio タスクバーの左上にある Exchange ​(X)​ アイコンをクリックします。

  3. Exchange で、​[Login (ログイン)]​ をクリックし、Anypoint Platform のユーザー名とパスワードを指定します。

  4. Exchange で、​[All assets (すべてのアセット)]​ を選択して ​Apache Kafka​ を検索します。

  5. コネクタを選択して ​[Add to project (プロジェクトに追加)]​ をクリックします。

  6. 画面の指示に従って、コネクタを Mule プロジェクトに追加します。

Consumer Configuration (コンシューマー設定)

Apache Kafka Connector から参照するコンシューマー設定のグローバル要素を作成できます。これにより、設定の詳細をフロー内の複数のローカル要素に適用できます。

  1. Studio キャンバスで ​[Global Elements (グローバル要素)]​ をクリックします。

  2. [Create (作成)]​ をクリックして、​[Connector Configuration (コネクタ設定)]​ を展開します。

  3. [Apache Kafka Consumer configuration (Apache Kafka コンシューマー設定)]​ を選択して、​[OK]​ をクリックします。

  4. [Connection (接続)]​ 項目で接続種別を選択します。

  5. 次の項目の値を入力します。

    接続種別 Field Name (項目名) 説明

    すべての接続種別

    Bootstrap Server URLs (ブートストラップサーバー URL)

    Kafka クラスターへの初期接続を確立する、ホストとポートのペアのリスト。この値を、Kafka コンシューマークライアントに提供する必要がある ​bootstrap.servers​ 値に設定します。

    Bean reference (bean 参照)

    コンシューマーが Kafka クラスターに接続するために使用できる URL

    グループ ID

    この設定を使用するすべての Kafka コンシューマーのデフォルトのグループ ID

    コンシューマー Kerberos

    Principal (プリンシパル)

    認証する Apache Kafka システムのエンティティ

    Service name (サービス名)

    Kafka Connector に関連付けられる Kerberos プリンシパル名。

    Kafka configuration file (krb5.conf) (Kafka 設定ファイル (krb5.conf))

    Kerberos 設定情報を含むファイル

    コンシューマー SASL/SCRAM

    Username (ユーザー名)

    Kafka へのログインに使用するユーザー名

    Password (パスワード)

    Kafka へのログインに使用するパスワード

    Encryption type (暗号化種別)

    SCRAM で使用される暗号化アルゴリズム

    コンシューマー SASL/PLAIN

    Username (ユーザー名)

    Kafka へのログインに使用するユーザー名

    Password (パスワード)

    Kafka へのログインに使用するパスワード

プロデューサー設定

プロデューサー設定のグローバル要素を作成します。

  1. Studio キャンバスで ​[Global Elements (グローバル要素)]​ をクリックします。

  2. [Create (作成)]​ をクリックして、​[Connector Configuration (コネクタ設定)]​ を展開します。

  3. [Apache Kafka Producer configuration (Apache Kafka プロデューサー設定)]​ を選択して、​[OK]​ をクリックします。

  4. [Connection (接続)]​ 項目で接続種別を選択します。

  5. 次の項目の値を入力します。

接続種別

Field Name (項目名)

説明

すべての接続種別

Bootstrap Server URLs (ブートストラップサーバー URL)

Kafka クラスターへの初期接続を確立する、ホストとポートのペアのリスト。この値を、Kafka プロデューサークライアントに提供する必要がある ​bootstrap.servers​ 値に設定します。

Bean reference (bean 参照)

プロデューサーが Kafka クラスターに接続するために使用できる URL

プロデューサー Kerberos

Principal (プリンシパル)

認証する Apache Kafka システムのエンティティ

Service name (サービス名)

Kafka Connector に関連付けられる Kerberos プリンシパル名。

Kerberos 設定ファイル (krb5.conf)

Kerberos 設定情報を含むファイル

コンシューマー SASL/SCRAM

Username (ユーザー名)

Kafka へのログインに使用するユーザー名

Password (パスワード)

Kafka へのログインに使用するパスワード

Encryption type (暗号化種別)

SCRAM で使用される暗号化アルゴリズム

コンシューマー SASL/PLAIN

Username (ユーザー名)

Kafka へのログインに使用するユーザー名

Password (パスワード)

Kafka へのログインに使用するパスワード

TLS を設定する

アプリケーション用に TLS を有効にする手順は、次のとおりです。

  1. [TLS]​ タブをクリックし、トラストストアとキーストアを設定します。

    • Trust Store Configuration (トラストストア設定)

      • Path (パス)
        トラストストアファイルの場所。

      • Password (パスワード)
        トラストストアファイルのパスワード。


      • トラストストアファイルのファイル形式。

      • Algorithm (アルゴリズム)
        トラストストアが使用するアルゴリズム。

      • Insecure (安全ではない)
        トラストストアを検証するかどうかを決定するブール値。​true​ に設定すると検証は行われません。デフォルト値は ​false​ です。

    • Key Store Configuration (キーストア設定)


      • キーストアファイルのファイル形式を指定します (省略可能)。デフォルト値は ​JKS​ です。

      • Path (パス)
        キーストアファイルの場所。省略可能で、コネクタの双方向認証に使用できます。

      • Alias (別名)
        キーストアに複数の非公開キーが含まれている場合に使用するキーの別名を示す属性。定義しない場合、ファイルにある最初のキーがデフォルトで使用されます。

      • Key password (キーパスワード)
        キーマネージャーのパスワード。キーストア内の非公開キーのパスワードです。

      • Password (パスワード)
        キーストアファイルのストアのパスワード。省略可能で、​[Key Store Location (キーストアの場所)]​ が設定されている場合にのみ必要です。

      • Algorithm (アルゴリズム)
        キーストアで使用するアルゴリズム。

        kafka tls studio config

Commit 操作の設定

  1. Commit​ 操作を Studio キャンバスにドラッグします。

  2. [General (一般)]​ タブで、​Commit​ 操作を設定します。

    名前

    説明

    デフォルト値

    必須

    Configuration (設定)

    String (文字列)

    使用する設定の名前。

    x

    Consumer commit key (コンシューマーコミットキー)

    String (文字列)

    前回のポーリングの commitKey。この操作は、この値を属性として Mule イベントに挿入するリスナーソース (​Batch Message Listener​ または ​Message Listener​) のいずれかを使用しているフロー内で使用される場合のみ有効です。

    x

    kafka commit studio config general
  3. [Advanced (詳細)]​ タブで、再接続戦略を設定します。

Consume 操作の設定

  1. Consume​ 操作を Studio キャンバスにドラッグします。

  2. [General (一般)]​ タブで、​Consume​ 操作を設定します。

    名前 説明 デフォルト値 必須

    Configuration (設定)

    String (文字列)

    使用する設定の名前。

    x

    Consumption timeout (コンシュームタイムアウト)

    Number (数値)

    この操作でメッセージの受信を待機する時間単位数。

    Timeout time unit (タイムアウト時間単位)

    Enumeration (列挙)。次のいずれかになります。

    • NANOSECONDS (ナノ秒)

    • MICROSECONDS (マイクロ秒)

    • MILLISECONDS (ミリ秒)

    • SECONDS (秒)

    • MINUTES (分)

    • HOURS (時間)

    • DAYS (日)

    タイムアウトプロパティの時間単位。

    kafka consume studio config
  3. [Advanced (詳細)]​ タブで、次の設定を定義します。

    名前 説明 デフォルト値 必須

    Operation Timeout (操作タイムアウト)

    Number (数値)

    Operation Timeout Time Unit (操作タイムアウト時間単位)

    Enumeration (列挙)。次のいずれかになります。

    • NANOSECONDS (ナノ秒)

    • MICROSECONDS (マイクロ秒)

    • MILLISECONDS (ミリ秒)

    • SECONDS (秒)

    • MINUTES (分)

    • HOURS (時間)

    • DAYS (日)

    Streaming Strategy (ストリーミング戦略)

    • repeatable-in-memory-stream

    • repeatable-file-store-stream

    • non-repeatable-stream

    反復可能なストリームを使用するように設定します。

    Target Variable (対象変数)

    String (文字列)

    操作の出力を保存する変数の名前。

    Target Value (対象値)

    String (文字列)

    操作の出力を評価する式。評価の結果は対象変数に保存されます。

    #[payload]

    Reconnection Strategy (再接続戦略)

    • reconnect (再接続)

    • reconnect-forever (繰り返し再接続)

    接続エラーが発生した場合の再試行戦略。

Publish 操作の設定

  1. Publish​ 操作を Studio キャンバスにドラッグします。

  2. [General (一般)]​ タブで、​Publish​ 操作を設定します。

    名前 説明 デフォルト値 必須

    Configuration (設定)

    String (文字列)

    使用する設定の名前。

    x

    Topic (トピック)

    String (文字列)

    パブリッシュするトピック。

    Partition (パーティション)

    Number (数値)

    (省略可能) トピックパーティション。

    Key (キー)

    Binary (バイナリ)

    (省略可能) パブリッシュされたメッセージのキー。

    Message (メッセージ)

    Binary (バイナリ)

    (省略可能) メッセージのメッセージコンテンツ。

    #[payload]

    Headers (ヘッダー)

    Object (オブジェクト)

    (省略可能) メッセージのヘッダー。

    kafka publish studio config
  3. [Advanced (詳細)]​ タブで、次の設定を定義します。

    名前 説明 デフォルト値 必須

    Transactional Action (トランザクションアクション)

    Enumeration (列挙)。次のいずれかになります。

    • ALWAYS_JOIN​

    • JOIN_IF_POSSIBLE​

    • NOT_SUPPORTED

    トランザクションに関する操作で実行できる結合アクションの種別。

    JOIN_IF_POSSIBLE

    Target Variable (対象変数)

    String (文字列)

    操作の出力を保存する変数の名前。

    Target Value (対象値)

    String (文字列)

    操作の出力に対して評価される式。この式の結果が対象変数に保存されます。

    #[payload]

    Reconnection Strategy (再接続戦略)

    • reconnect (再接続)

    • reconnect-forever (繰り返し再接続)

    接続エラーが発生した場合の再試行戦略。

Seek 操作の設定

  1. Seek​ 操作を Studio キャンバスにドラッグします。

  2. [General (一般)]​ タブで、​Seek​ 操作を設定します。

    名前 説明 デフォルト値 必須

    Configuration (設定)

    String (文字列)

    使用する設定の名前。

    x

    Topic (トピック)

    String (文字列)

    Seek 操作の実行の基盤となるトピックの名前。

    x

    Partition (パーティション)

    Number (数値)

    オフセットの変更の対象であるパーティション番号。

    x

    オフセット

    Number (数値)

    設定済みパーティションに対してコミットするオフセット値。

  3. [Advanced (詳細)]​ タブで、次の設定を定義します。

    名前 説明 デフォルト値 必須

    Operation Timeout (操作タイムアウト)

    Number (数値)

    Operation Timeout Time Unit (操作タイムアウト時間単位)

    Enumeration (列挙)。次のいずれかになります。

    • NANOSECONDS (ナノ秒)

    • MICROSECONDS (マイクロ秒)

    • MILLISECONDS (ミリ秒)

    • SECONDS (秒)

    • MINUTES (分)

    • HOURS (時間)

    • DAYS (日)

    Reconnection Strategy (再接続戦略)

    • reconnect (再接続)

    • reconnect-forever (繰り返し再接続)

    接続エラーが発生した場合の再試行戦略。

入力元の設定

Message Consumer​ 操作など、コネクタの入力元を設定します。

名前 説明

Configuration (設定)

使用する設定の名前。

Topic (トピック)

メッセージのコンシューム元のトピックの名前。

Primary Node Only (プライマリノードのみ)

クラスターでの実行時、このソースをプライマリノード上でのみ実行するかどうか。

Streaming Strategy (ストリーミング戦略)

  • repeatable-in-memory-stream

  • repeatable-file-store-stream

  • non-repeatable-stream

反復可能なストリームを使用するように設定します。

Redelivery Policy (再配信ポリシー)

同じメッセージの再配信を処理するためのポリシーを定義します。

Reconnection Strategy (再接続戦略)

接続エラーが発生した場合の再試行戦略。

  • reconnect (再接続)

  • reconnect-forever (繰り返し再接続)

Apache Kafka Connector の入力元として ​Batch Message Listener​ 操作を使用することもできます。

名前 説明

Connector Configuration (コネクタ設定)

使用する設定の名前。

Poll timeout (ポーリングタイムアウト)

ブロックする時間。

Poll timeout time unit (ポーリングタイムアウト時間単位)

ポーリングタイムアウトの時間単位。これをポーリングタイムアウトと組み合わせて、ポーリングの合計タイムアウトを定義します。

Acknowledgment mode (肯定応答モード)

サポートされる肯定応答モード種別を宣言します。

並列コンシューマーの量。

並列で使用するコンシューマーの数を宣言します。

Primary Node Only (プライマリノードのみ)

クラスターでの実行時、このソースをプライマリノード上でのみ実行する必要があるかどうか。

Streaming Strategy (ストリーミング戦略)

ストリーミング戦略を定義します。

  • repeatable-in-memory-stream

  • repeatable-file-store-stream

  • non-repeatable-stream

Redelivery Policy (再配信ポリシー)

同じメッセージの再配信を処理するためのポリシーを定義します。

Reconnection Strategy (再接続戦略)

接続エラーが発生した場合の再試行戦略。

  • reconnect (再接続)

  • reconnect-forever (繰り返し再接続)