Apache Kafka コネクタリファレンス - Mule 4

サポートカテゴリ: 選択

Kafka コネクタ v3.0

Apache Kafka 用 Anypoint コネクタ (Kafka コネクタ) を使用すると、Apache Kafka メッセージングシステムとやりとりでき、Mule Runtime を使用した Mule アプリケーションと Apache Kafka クラスタ間のシームレスなインテグレーションが可能になります。

リリースノート: ​Apache Kafka Connector Release Notes - Mule 4

設定


Consumer Configuration (コンシューマ設定)

パラメータ

名前 説明 デフォルト値 必須

Name (名前)

String (文字列)

この設定の名前。コネクタはこの名前の設定を参照します。

x

Connection (接続)

この設定に指定する接続型。

x

Expiration Policy (有効期限ポリシー)

動的設定インスタンスがアイドル状態を続けられる最小時間を設定します。この時間が経過すると、Runtime で期限切れに相当するとみなされます。これは、インスタンスが有効期限の対象となった瞬間にプラットフォームでそのインスタンスが期限切れになるということではありません。必要に応じて、インスタンスがパージされます。

接続種別

Kafka の基本コンシューマ接続
パラメータ
名前 説明 デフォルト値 必須

Consumer Partitions (コンシューマパーティション)

Number (数値)

このコンシューマに使用するパーティションの数。

1

Group Id (グループ ID)

String (文字列)

このコンシューマが属するコンシューマグループを識別する一意の文字列。

x

Bootstrap Servers (ブートストラップサーバ)

String (文字列)

Kafka クラスタへの初期接続の確立に使用するカンマ区切りのホストとポートのペア。Kafka クライアント (プロデューサおよびコンシューマ) に指定する bootstrap.servers​ 値と同じです。

x

Additional Properties (追加プロパティ)

Object (オブジェクト)

接続に必要なキー-値ペアなどの追加プロパティ。

Reconnection (再接続)

アプリケーションがデプロイされると、すべてのコネクタで接続テストが実行されます。true​ に設定されている場合、関連する再接続戦略をすべて実行した後にテストに合格しないと、デプロイが失敗します。

Kafka の Kerberos コンシューマ接続
パラメータ
名前 説明 デフォルト値 必須

Consumer Partitions (コンシューマパーティション)

Number (数値)

このコンシューマに使用するパーティションの数。

1

Group Id (グループ ID)

String (文字列)

このコンシューマが属するコンシューマグループを識別する一意の文字列。

x

Bootstrap Servers (ブートストラップサーバ)

String (文字列)

Kafka クラスタへの初期接続の確立に使用するカンマ区切りのホストとポートのペア。Kafka クライアント (プロデューサおよびコンシューマ) に指定する bootstrap.servers​ 値と同じです。

x

Additional Properties (追加プロパティ)

Object (オブジェクト)

接続に必要なキー-値ペアなどの追加プロパティ。

Reconnection (再接続)

アプリケーションがデプロイされると、すべてのコネクタで接続テストが実行されます。true​ に設定されている場合、関連する再接続戦略をすべて実行した後にテストに合格しないと、デプロイが失敗します。

Kerberos Config File (Kerberos 設定ファイル)

String (文字列)

Kerberos 設定ファイル。不可欠な Kerberos 設定情報は、デフォルトの領域とデフォルトの KDC です。

Principal (プリンシパル)

String (文字列)

Kerberos プリンシパル。

x

Keytab (キータブ)

String (文字列)

principal​ に関連付けられた keytab ファイルへのパス。.

x

Service Name (サービス名)

String (文字列)

Kafka ブローカーが実行時に使用する Kerberos プリンシパル名。

x

Additional JAAS Properties (JAAS の追加プロパティ)

Object (オブジェクト)

キー-値ペアなどの追加プロパティ。sasl.jaas.config​ で設定する必要があり、通常は JAAS 設定ファイルに格納します。

Kafka の Kerberos SSL コンシューマ接続
パラメータ
名前 説明 デフォルト値 必須

Consumer Partitions (コンシューマパーティション)

Number (数値)

このコンシューマに使用するパーティションの数。

1

Group Id (グループ ID)

String (文字列)

このコンシューマが属するコンシューマグループを識別する一意の文字列。

x

Bootstrap Servers (ブートストラップサーバ)

String (文字列)

Kafka クラスタへの初期接続の確立に使用するカンマ区切りのホストとポートのペア。Kafka クライアント (プロデューサおよびコンシューマ) に指定する bootstrap.servers​ 値と同じです。

x

Additional Properties (追加プロパティ)

Object (オブジェクト)

接続に必要なキー-値ペアなどの追加プロパティ。

Reconnection (再接続)

アプリケーションがデプロイされると、すべてのコネクタで接続テストが実行されます。true​ に設定されている場合、関連する再接続戦略をすべて実行した後にテストに合格しないと、デプロイが失敗します。

Key Store Type (キーストア種別)

String (文字列)

キーストアファイルのファイル形式。省略可能で、デフォルト値は JKS​ です。.

JKS

Key Store Password (キーストアのパスワード)

String (文字列)

キーストアファイルのストアのパスワード。省略可能で、[Key Store Location (キーストアの場所)]​ が設定されている場合にのみ必要です。

Key Store Location (キーストアの場所)

String (文字列)

キーストアファイルの場所。省略可能で、コネクタの双方向認証に使用できます。

Trust Store Type (トラストストア種別)

String (文字列)

トラストストアファイルのファイル形式。

JKS

Trust Store Password (トラストストアのパスワード)

String (文字列)

トラストストアファイルのパスワード。

x

Trust Store Location (トラストストアの場所)

String (文字列)

トラストストアファイルの場所。

x

Kerberos Config File (Kerberos 設定ファイル)

String (文字列)

Kerberos 設定ファイル。不可欠な Kerberos 設定情報は、デフォルトの領域とデフォルトの KDC です。

Principal (プリンシパル)

String (文字列)

Kerberos プリンシパル。

x

Keytab (キータブ)

String (文字列)

principal​ に関連付けられた keytab ファイルへのパス。.

x

Service Name (サービス名)

String (文字列)

Kafka ブローカーが実行時に使用する Kerberos プリンシパル名。

x

Additional JAAS Properties (JAAS の追加プロパティ)

Object (オブジェクト)

キー-値ペアなどの追加プロパティ。sasl.jaas.config​ で設定する必要があり、通常は JAAS 設定ファイルに格納します。

Kafka の SSL コンシューマ接続
パラメータ
名前 説明 デフォルト値 必須

Consumer Partitions (コンシューマパーティション)

Number (数値)

このコンシューマに使用するパーティションの数。

1

Group Id (グループ ID)

String (文字列)

このコンシューマが属するコンシューマグループを識別する一意の文字列。

x

Bootstrap Servers (ブートストラップサーバ)

String (文字列)

Kafka クラスタへの初期接続の確立に使用するカンマ区切りのホストとポートのペア。Kafka クライアント (プロデューサおよびコンシューマ) に指定する bootstrap.servers​ 値と同じです。

x

Additional Properties (追加プロパティ)

Object (オブジェクト)

接続に必要なキー-値ペアなどの追加プロパティ。

Reconnection (再接続)

アプリケーションがデプロイされると、すべてのコネクタで接続テストが実行されます。true​ に設定されている場合、関連する再接続戦略をすべて実行した後にテストに合格しないと、デプロイが失敗します。

Key Store Type (キーストア種別)

String (文字列)

キーストアファイルのファイル形式。省略可能で、デフォルト値は JKS​ です。.

JKS

Key Store Password (キーストアのパスワード)

String (文字列)

キーストアファイルのストアのパスワード。省略可能で、[Key Store Location (キーストアの場所)]​ が設定されている場合にのみ必要です。

Key Store Location (キーストアの場所)

String (文字列)

キーストアファイルの場所。省略可能で、コネクタの双方向認証に使用できます。

Trust Store Type (トラストストア種別)

String (文字列)

トラストストアファイルのファイル形式。

JKS

Trust Store Password (トラストストアのパスワード)

String (文字列)

トラストストアファイルのパスワード。

x

Trust Store Location (トラストストアの場所)

String (文字列)

トラストストアファイルの場所。

x

関連付けられたソース


プロデューサ設定

パラメータ

名前 説明 デフォルト値 必須

Name (名前)

String (文字列)

この設定の名前。コネクタはこの名前の設定を参照します。

x

Connection (接続)

この設定に指定する接続型。

x

Expiration Policy (有効期限ポリシー)

動的設定インスタンスがアイドル状態を続けられる最小時間を設定します。この時間が経過すると、Runtime で期限切れに相当するとみなされます。これは、インスタンスが有効期限の対象となった瞬間にプラットフォームでそのインスタンスが期限切れになるということではありません。必要に応じて、インスタンスがパージされます。

接続種別

Kafka の基本プロデューサ接続
パラメータ
名前 説明 デフォルト値 必須

Bootstrap Servers (ブートストラップサーバ)

String (文字列)

Kafka クラスタへの初期接続の確立に使用するカンマ区切りのホストとポートのペア。Kafka クライアント (プロデューサおよびコンシューマ) に指定する bootstrap.servers​ 値と同じです。

x

Additional Properties (追加プロパティ)

Object (オブジェクト)

接続に必要なキー-値ペアなどの追加プロパティ。

Reconnection (再接続)

アプリケーションがデプロイされると、すべてのコネクタで接続テストが実行されます。true​ に設定されている場合、関連する再接続戦略をすべて実行した後にテストに合格しないと、デプロイが失敗します。

Kafka の Kerberos プロデューサ接続
パラメータ
名前 説明 デフォルト値 必須

Bootstrap Servers (ブートストラップサーバ)

String (文字列)

Kafka クラスタへの初期接続の確立に使用するカンマ区切りのホストとポートのペア。Kafka クライアント (プロデューサおよびコンシューマ) に指定する bootstrap.servers​ 値と同じです。

x

Additional Properties (追加プロパティ)

Object (オブジェクト)

接続に必要なキー-値ペアなどの追加プロパティ。

Reconnection (再接続)

アプリケーションがデプロイされると、すべてのコネクタで接続テストが実行されます。true​ に設定されている場合、関連する再接続戦略をすべて実行した後にテストに合格しないと、デプロイが失敗します。

Kerberos Config File (Kerberos 設定ファイル)

String (文字列)

Kerberos 設定ファイル。不可欠な Kerberos 設定情報は、デフォルトの領域とデフォルトの KDC です。

Principal (プリンシパル)

String (文字列)

Kerberos プリンシパル。

x

Keytab (キータブ)

String (文字列)

principal​ に関連付けられた keytab ファイルへのパス。.

x

Service Name (サービス名)

String (文字列)

Kafka ブローカーが実行時に使用する Kerberos プリンシパル名。

x

Additional JAAS Properties (JAAS の追加プロパティ)

Object (オブジェクト)

キー-値ペアなどの追加プロパティ。sasl.jaas.config​ で設定する必要があり、通常は JAAS 設定ファイルに格納します。

Kafka の Kerberos SSL プロデューサ接続
パラメータ
名前 説明 デフォルト値 必須

Bootstrap Servers (ブートストラップサーバ)

String (文字列)

Kafka クラスタへの初期接続の確立に使用するカンマ区切りのホストとポートのペア。Kafka クライアント (プロデューサおよびコンシューマ) に指定する bootstrap.servers​ 値と同じです。

x

Additional Properties (追加プロパティ)

Object (オブジェクト)

接続に必要なキー-値ペアなどの追加プロパティ。

Reconnection (再接続)

アプリケーションがデプロイされると、すべてのコネクタで接続テストが実行されます。true​ に設定されている場合、関連する再接続戦略をすべて実行した後にテストに合格しないと、デプロイが失敗します。

Key Store Type (キーストア種別)

String (文字列)

キーストアファイルのファイル形式。省略可能で、デフォルト値は JKS​ です。.

JKS

Key Store Password (キーストアのパスワード)

String (文字列)

キーストアファイルのストアのパスワード。省略可能で、[Key Store Location (キーストアの場所)]​ が設定されている場合にのみ必要です。

Key Store Location (キーストアの場所)

String (文字列)

キーストアファイルの場所。省略可能で、コネクタの双方向認証に使用できます。

Trust Store Type (トラストストア種別)

String (文字列)

トラストストアファイルのファイル形式。

JKS

Trust Store Password (トラストストアのパスワード)

String (文字列)

トラストストアファイルのパスワード。

x

Trust Store Location (トラストストアの場所)

String (文字列)

トラストストアファイルの場所。

x

Kerberos Config File (Kerberos 設定ファイル)

String (文字列)

Kerberos 設定ファイル。不可欠な Kerberos 設定情報は、デフォルトの領域とデフォルトの KDC です。

Principal (プリンシパル)

String (文字列)

Kerberos プリンシパル。

x

Keytab (キータブ)

String (文字列)

principal​ に関連付けられた keytab ファイルへのパス。.

x

Service Name (サービス名)

String (文字列)

Kafka ブローカーが実行時に使用する Kerberos プリンシパル名。

x

Additional JAAS Properties (JAAS の追加プロパティ)

Object (オブジェクト)

キー-値ペアなどの追加プロパティ。sasl.jaas.config​ で設定する必要があり、通常は JAAS 設定ファイルに格納します。

Kafka の SSL プロデューサ接続
パラメータ
名前 説明 デフォルト値 必須

Bootstrap Servers (ブートストラップサーバ)

String (文字列)

Kafka クラスタへの初期接続の確立に使用するカンマ区切りのホストとポートのペア。Kafka クライアント (プロデューサおよびコンシューマ) に指定する bootstrap.servers​ 値と同じです。

x

Additional Properties (追加プロパティ)

Object (オブジェクト)

接続に必要なキー-値ペアなどの追加プロパティ。

Reconnection (再接続)

アプリケーションがデプロイされると、すべてのコネクタで接続テストが実行されます。true​ に設定されている場合、関連する再接続戦略をすべて実行した後にテストに合格しないと、デプロイが失敗します。

Key Store Type (キーストア種別)

String (文字列)

キーストアファイルのファイル形式。省略可能で、デフォルト値は JKS​ です。.

JKS

Key Store Password (キーストアのパスワード)

String (文字列)

キーストアファイルのストアのパスワード。省略可能で、[Key Store Location (キーストアの場所)]​ が設定されている場合にのみ必要です。

Key Store Location (キーストアの場所)

String (文字列)

キーストアファイルの場所。省略可能で、コネクタの双方向認証に使用できます。

Trust Store Type (トラストストア種別)

String (文字列)

トラストストアファイルのファイル形式。

JKS

Trust Store Password (トラストストアのパスワード)

String (文字列)

トラストストアファイルのパスワード。

x

Trust Store Location (トラストストアの場所)

String (文字列)

トラストストアファイルの場所。

x

サポートされている操作

操作

Publish Message

<kafka:producer>

Kafka トピックにメッセージを送信します。

パラメータ

名前 説明 デフォルト値 必須

Configuration (設定)

String (文字列)

使用する設定の名前。

x

Topic (トピック)

String (文字列)

メッセージの送信先のトピック。

x

Key (キー)

String (文字列)

送信するメッセージに属するキー。

x

Message (メッセージ)

Binary (バイナリ)

送信するメッセージ。

#[payload]

Reconnection Strategy (再接続戦略)

接続エラーが発生した場合の再試行戦略。

次の設定の場合

スロー

  • KAFKA:CONNECTIVITY

  • KAFKA:RETRY_EXHAUSTED

  • KAFKA:UNABLE_TO_SEND_MESSAGE

  • KAFKA:UNKNOWN

  • KAFKA:VALIDATION

ソース

Message Consumer

<kafka:consumer>

指定したトピックからメッセージをコンシュームするソース。

パラメータ

KAFKA:VALIDATION 説明 デフォルト値 必須

Configuration (設定)

String (文字列)

使用する設定の名前。

x

Topic (トピック)

String (文字列)

メッセージのコンシューム元のトピックの名前。

x

Partition Offsets (パーティションオフセット)

オフセット​ の配列

設定するパーティションのオフセットのリスト。リストの要素ごとに、パーティションのインデックスとオフセットを指定する必要があります。

Primary Node Only (プライマリノードのみ)

Boolean (ブール)

クラスタでの実行時、このソースをプライマリノード上でのみ実行する必要があるかどうか。

Streaming Strategy (ストリーミング戦略)

反復可能なストリームを使用するように設定します。

Redelivery Policy (再配信ポリシー)

同じメッセージの再配信を処理するためのポリシーを定義します。

Reconnection Strategy (再接続戦略)

接続エラーが発生した場合の再試行戦略。

出力

Binary (バイナリ)

Attributes Type (属性型)

Reconnection (再接続)

項目 説明 デフォルト値 必須

Fails Deployment (デプロイに失敗)

Boolean (ブール)

アプリケーションがデプロイされると、すべてのコネクタで接続テストが実行されます。true​ に設定されている場合、関連する再接続戦略をすべて実行した後にテストに合格しないと、デプロイが失敗します。

Reconnection Strategy (再接続戦略)

使用する再接続戦略。

Reconnect (再接続)

項目 説明 デフォルト値 必須

Frequency (頻度)

Number (数値)

再接続する頻度 (ミリ秒)。

Count (数)

Number (数値)

再接続の試行回数。

blocking (ブロック)

Boolean (ブール)

false の場合、再接続戦略が個別の非ブロックスレッドで実行されます。

true

Reconnect Forever (繰り返し再接続)

項目 説明 デフォルト値 必須

Frequency (頻度)

Number (数値)

再接続する頻度 (ミリ秒)。

blocking (ブロック)

Boolean (ブール)

false の場合、再接続戦略が個別の非ブロックスレッドで実行されます。

true

Expiration Policy (有効期限ポリシー)

項目 説明 デフォルト値 必須

Max Idle Time (最大アイドル時間)

Number (数値)

有効期限の対象とみなされるまで、動的設定インスタンスがアイドル状態を維持できる最大時間のスカラー時間値。

Time Unit (時間単位)

Enumeration (列挙)。次のいずれかになります。

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAY

maxIdleTime​ 属性の時間単位。

Kafka メッセージ属性

項目 説明 デフォルト値 必須

Topic (トピック)

String (文字列)

x

Key (キー)

String (文字列)

x

Partition (パーティション)

Number (数値)

x

オフセット

Number (数値)

x

オフセット

項目 説明 デフォルト値 必須

Partition Number (パーティション番号)

String (文字列)

Partition Offset (パーティションオフセット)

String (文字列)

Repeatable In Memory Stream (反復可能なメモリ内ストリーム)

項目 説明 デフォルト値 必須

Initial Buffer Size (初期バッファサイズ)

Number (数値)

ランダムアクセスを提供し、ストリームをコンシュームするために割り当てられるメモリ量。ストリームのデータ量がこのバッファサイズを超える場合は、[Buffer Size Increment (バッファサイズ増分)]​ 属性の値に従って、maxInMemorySize​ を上限としてバッファが拡張されます。.

Buffer Size Increment (バッファサイズ増分)

Number (数値)

バッファサイズが初期サイズを超えた場合に拡張する量。値を 0 以下に設定すると、バッファが拡張されず、バッファがフルになると、STREAM_MAXIMUM_SIZE_EXCEEDED​ エラーが発生します。

Max Buffer Size (最大バッファサイズ)

Number (数値)

使用するメモリの最大量。これを超えると、STREAM_MAXIMUM_SIZE_EXCEEDED​ エラーが発生します。0 以下の値は無制限を意味します。

Buffer Unit (バッファ単位)

Enumeration (列挙)。次のいずれかになります。

  • BYTE

  • KB

  • MB

  • GB

これらのすべての属性の単位。

Repeatable File Store Stream (反復可能なファイルストアストリーム)

項目 説明 デフォルト値 必須

Max In Memory Size (最大メモリ内サイズ)

Number (数値)

データをメモリ内に保持するためにストリームで使用する最大メモリを定義します。これを超えると、ディスクへのコンテンツのバッファが開始されます。

Buffer Unit (バッファ単位)

Enumeration (列挙)。次のいずれかになります。

  • BYTE

  • KB

  • MB

  • GB

maxInMemorySize で表される単位。

Redelivery Policy (再配信ポリシー)

項目 説明 デフォルト値 必須

Max Redelivery Count (最大再配信数)

Number (数値)

正常に処理されずに process-failed​ メッセージがトリガされるまでにメッセージを再配信できる最大回数。

Use Secure Hash (セキュアハッシュを使用)

Boolean (ブール)

再配信されたメッセージの識別にセキュアハッシュアルゴリズムを使用するかどうか。

Message Digest Algorithm (メッセージダイジェストアルゴリズム)

String (文字列)

使用するセキュアハッシュアルゴリズム。設定しないと、デフォルトの SHA-256​ が使用されます。.

Id Expression (ID 式)

String (文字列)

メッセージがいつ再配信されたのかを判断するために使用する 1 つ以上の式を定義します。このプロパティは、useSecureHash が false の場合にのみ設定できます。

Object Store (オブジェクトストア)

Object Store (オブジェクトストア)

各メッセージの再配信カウンタが保存されるオブジェクトストア。

Was this article helpful?

💙 Thanks for your feedback!