Flex Gateway新着情報
Governance新着情報
Monitoring API ManagerAnypoint Studio で、Apache Kafka 用 Anypoint Connector (Apache Kafka Connector) を Mule プロジェクトに追加し、Kafka クラスターへの接続を設定し、コネクタの入力元を設定します。
Anypoint Studio でコネクタを設定する手順は、次のとおりです。
コネクタを Mule プロジェクトに追加します。
コネクタを設定します。
コネクタの入力元を設定します。
Studio で Mule プロジェクトを作成します。
[Mule Palette (Mule パレット)] で、[(X) Search in Exchange ((X) Exchange 内を検索)] をクリックします。
[Add Modules to Project (モジュールをプロジェクトに追加)] で、検索項目にコネクタの名前を入力します。
[Available modules (使用可能なモジュール)] で、そのコネクタ名をクリックします。
[Add (追加)] をクリックします。
[Finish (完了)] をクリックします。
Studio で Mule プロジェクトを作成します。
Studio タスクバーの左上にある Exchange (X) アイコンをクリックします。
Exchange で、[Login (ログイン)] をクリックし、Anypoint Platform のユーザー名とパスワードを指定します。
Exchange で、[All assets (すべてのアセット)] を選択して Apache Kafka
を検索します。
コネクタを選択して [Add to project (プロジェクトに追加)] をクリックします。
画面の指示に従って、コネクタを Mule プロジェクトに追加します。
Apache Kafka Connector から参照するコンシューマー設定のグローバル要素を作成できます。これにより、設定の詳細をフロー内の複数のローカル要素に適用できます。
Studio キャンバスで [Global Elements (グローバル要素)] をクリックします。
[Create (作成)] をクリックして、[Connector Configuration (コネクタ設定)] を展開します。
[Apache Kafka Consumer configuration (Apache Kafka コンシューマー設定)] を選択して、[OK] をクリックします。
[Connection (接続)] 項目で接続種別を選択します。
次の項目の値を入力します。
接続種別 | Field Name (項目名) | 説明 |
---|---|---|
すべての接続種別 |
Bootstrap Server URLs (ブートストラップサーバー URL) |
Kafka クラスターへの初期接続を確立する、ホストとポートのペアのリスト。この値を、Kafka コンシューマークライアントに提供する必要がある |
Bean reference (bean 参照) |
コンシューマーが Kafka クラスターに接続するために使用できる URL |
|
グループ ID |
この設定を使用するすべての Kafka コンシューマーのデフォルトのグループ ID |
|
コンシューマー Kerberos |
Principal (プリンシパル) |
認証する Apache Kafka システムのエンティティ |
Service name (サービス名) |
Kafka Connector に関連付けられる Kerberos プリンシパル名。 |
|
Kafka configuration file (krb5.conf) (Kafka 設定ファイル (krb5.conf)) |
Kerberos 設定情報を含むファイル |
|
コンシューマー SASL/SCRAM |
Username (ユーザー名) |
Kafka へのログインに使用するユーザー名 |
Password (パスワード) |
Kafka へのログインに使用するパスワード |
|
Encryption type (暗号化種別) |
SCRAM で使用される暗号化アルゴリズム |
|
コンシューマー SASL/PLAIN |
Username (ユーザー名) |
Kafka へのログインに使用するユーザー名 |
Password (パスワード) |
Kafka へのログインに使用するパスワード |
プロデューサー設定のグローバル要素を作成します。
Studio キャンバスで [Global Elements (グローバル要素)] をクリックします。
[Create (作成)] をクリックして、[Connector Configuration (コネクタ設定)] を展開します。
[Apache Kafka Producer configuration (Apache Kafka プロデューサー設定)] を選択して、[OK] をクリックします。
[Connection (接続)] 項目で接続種別を選択します。
次の項目の値を入力します。
接続種別 |
Field Name (項目名) |
説明 |
すべての接続種別 |
Bootstrap Server URLs (ブートストラップサーバー URL) |
Kafka クラスターへの初期接続を確立する、ホストとポートのペアのリスト。この値を、Kafka プロデューサークライアントに提供する必要がある |
Bean reference (bean 参照) |
プロデューサーが Kafka クラスターに接続するために使用できる URL |
|
プロデューサー Kerberos |
Principal (プリンシパル) |
認証する Apache Kafka システムのエンティティ |
Service name (サービス名) |
Kafka Connector に関連付けられる Kerberos プリンシパル名。 |
|
Kerberos 設定ファイル (krb5.conf) |
Kerberos 設定情報を含むファイル |
|
コンシューマー SASL/SCRAM |
Username (ユーザー名) |
Kafka へのログインに使用するユーザー名 |
Password (パスワード) |
Kafka へのログインに使用するパスワード |
|
Encryption type (暗号化種別) |
SCRAM で使用される暗号化アルゴリズム |
|
コンシューマー SASL/PLAIN |
Username (ユーザー名) |
Kafka へのログインに使用するユーザー名 |
Password (パスワード) |
Kafka へのログインに使用するパスワード |
アプリケーション用に TLS を有効にする手順は、次のとおりです。
[TLS] タブをクリックし、トラストストアとキーストアを設定します。
Trust Store Configuration (トラストストア設定)
Path (パス)
トラストストアファイルの場所。
Password (パスワード)
トラストストアファイルのパスワード。
型
トラストストアファイルのファイル形式。
Algorithm (アルゴリズム)
トラストストアが使用するアルゴリズム。
Insecure (安全ではない)
トラストストアを検証するかどうかを決定するブール値。true
に設定すると検証は行われません。デフォルト値は false
です。
Key Store Configuration (キーストア設定)
型
キーストアファイルのファイル形式を指定します (省略可能)。デフォルト値は JKS
です。
Path (パス)
キーストアファイルの場所。省略可能で、コネクタの双方向認証に使用できます。
Alias (別名)
キーストアに複数の非公開キーが含まれている場合に使用するキーの別名を示す属性。定義しない場合、ファイルにある最初のキーがデフォルトで使用されます。
Key password (キーパスワード)
キーマネージャーのパスワード。キーストア内の非公開キーのパスワードです。
Password (パスワード)
キーストアファイルのストアのパスワード。省略可能で、[Key Store Location (キーストアの場所)] が設定されている場合にのみ必要です。
Algorithm (アルゴリズム)
キーストアで使用するアルゴリズム。
Commit 操作を Studio キャンバスにドラッグします。
[General (一般)] タブで、Commit 操作を設定します。
名前 |
型 |
説明 |
デフォルト値 |
必須 |
Configuration (設定) |
String (文字列) |
使用する設定の名前。 |
x |
|
Consumer commit key (コンシューマーコミットキー) |
String (文字列) |
前回のポーリングの commitKey。この操作は、この値を属性として Mule イベントに挿入するリスナーソース (Batch Message Listener または Message Listener) のいずれかを使用しているフロー内で使用される場合のみ有効です。 |
x |
[Advanced (詳細)] タブで、再接続戦略を設定します。
Consume 操作を Studio キャンバスにドラッグします。
[General (一般)] タブで、Consume 操作を設定します。
名前 | 型 | 説明 | デフォルト値 | 必須 |
---|---|---|---|---|
Configuration (設定) |
String (文字列) |
使用する設定の名前。 |
x |
|
Consumption timeout (コンシュームタイムアウト) |
Number (数値) |
この操作でメッセージの受信を待機する時間単位数。 |
||
Timeout time unit (タイムアウト時間単位) |
Enumeration (列挙)。次のいずれかになります。
|
タイムアウトプロパティの時間単位。 |
[Advanced (詳細)] タブで、次の設定を定義します。
名前 | 型 | 説明 | デフォルト値 | 必須 |
---|---|---|---|---|
Operation Timeout (操作タイムアウト) |
Number (数値) |
|||
Operation Timeout Time Unit (操作タイムアウト時間単位) |
Enumeration (列挙)。次のいずれかになります。
|
|||
Streaming Strategy (ストリーミング戦略) |
|
反復可能なストリームを使用するように設定します。 |
||
Target Variable (対象変数) |
String (文字列) |
操作の出力を保存する変数の名前。 |
||
Target Value (対象値) |
String (文字列) |
操作の出力を評価する式。評価の結果は対象変数に保存されます。 |
|
|
Reconnection Strategy (再接続戦略) |
|
接続エラーが発生した場合の再試行戦略。 |
Publish 操作を Studio キャンバスにドラッグします。
[General (一般)] タブで、Publish 操作を設定します。
名前 | 型 | 説明 | デフォルト値 | 必須 |
---|---|---|---|---|
Configuration (設定) |
String (文字列) |
使用する設定の名前。 |
x |
|
Topic (トピック) |
String (文字列) |
パブリッシュするトピック。 |
||
Partition (パーティション) |
Number (数値) |
(省略可能) トピックパーティション。 |
||
Key (キー) |
Binary (バイナリ) |
(省略可能) パブリッシュされたメッセージのキー。 |
||
Message (メッセージ) |
Binary (バイナリ) |
(省略可能) メッセージのメッセージコンテンツ。 |
|
|
Headers (ヘッダー) |
Object (オブジェクト) |
(省略可能) メッセージのヘッダー。 |
[Advanced (詳細)] タブで、次の設定を定義します。
名前 | 型 | 説明 | デフォルト値 | 必須 |
---|---|---|---|---|
Transactional Action (トランザクションアクション) |
Enumeration (列挙)。次のいずれかになります。
|
トランザクションに関する操作で実行できる結合アクションの種別。 |
|
|
Target Variable (対象変数) |
String (文字列) |
操作の出力を保存する変数の名前。 |
||
Target Value (対象値) |
String (文字列) |
操作の出力に対して評価される式。この式の結果が対象変数に保存されます。 |
|
|
Reconnection Strategy (再接続戦略) |
|
接続エラーが発生した場合の再試行戦略。 |
Seek 操作を Studio キャンバスにドラッグします。
[General (一般)] タブで、Seek 操作を設定します。
名前 | 型 | 説明 | デフォルト値 | 必須 |
---|---|---|---|---|
Configuration (設定) |
String (文字列) |
使用する設定の名前。 |
x |
|
Topic (トピック) |
String (文字列) |
Seek 操作の実行の基盤となるトピックの名前。 |
x |
|
Partition (パーティション) |
Number (数値) |
オフセットの変更の対象であるパーティション番号。 |
x |
|
オフセット |
Number (数値) |
設定済みパーティションに対してコミットするオフセット値。 |
[Advanced (詳細)] タブで、次の設定を定義します。
名前 | 型 | 説明 | デフォルト値 | 必須 |
---|---|---|---|---|
Operation Timeout (操作タイムアウト) |
Number (数値) |
|||
Operation Timeout Time Unit (操作タイムアウト時間単位) |
Enumeration (列挙)。次のいずれかになります。
|
|||
Reconnection Strategy (再接続戦略) |
|
接続エラーが発生した場合の再試行戦略。 |
Message Consumer 操作など、コネクタの入力元を設定します。
名前 | 説明 |
---|---|
Configuration (設定) |
使用する設定の名前。 |
Topic (トピック) |
メッセージのコンシューム元のトピックの名前。 |
Primary Node Only (プライマリノードのみ) |
クラスターでの実行時、このソースをプライマリノード上でのみ実行するかどうか。 |
Streaming Strategy (ストリーミング戦略) |
反復可能なストリームを使用するように設定します。 |
Redelivery Policy (再配信ポリシー) |
同じメッセージの再配信を処理するためのポリシーを定義します。 |
Reconnection Strategy (再接続戦略) |
接続エラーが発生した場合の再試行戦略。
|
Apache Kafka Connector の入力元として Batch Message Listener 操作を使用することもできます。
名前 | 説明 |
---|---|
Connector Configuration (コネクタ設定) |
使用する設定の名前。 |
Poll timeout (ポーリングタイムアウト) |
ブロックする時間。 |
Poll timeout time unit (ポーリングタイムアウト時間単位) |
ポーリングタイムアウトの時間単位。これをポーリングタイムアウトと組み合わせて、ポーリングの合計タイムアウトを定義します。 |
Acknowledgment mode (肯定応答モード) |
サポートされる肯定応答モード種別を宣言します。 |
並列コンシューマーの量。 |
並列で使用するコンシューマーの数を宣言します。 |
Primary Node Only (プライマリノードのみ) |
クラスターでの実行時、このソースをプライマリノード上でのみ実行する必要があるかどうか。 |
Streaming Strategy (ストリーミング戦略) |
ストリーミング戦略を定義します。
|
Redelivery Policy (再配信ポリシー) |
同じメッセージの再配信を処理するためのポリシーを定義します。 |
Reconnection Strategy (再接続戦略) |
接続エラーが発生した場合の再試行戦略。
|