AMQP コネクタドキュメントリファレンス

AmqpConnector は AMQP 0.9.1 準拠の MuleSoft 拡張機能であり、AMQP メッセージをコンシュームおよび生成するために使用されます。この拡張機能では、交換とキュー、コンシューマ、肯定応答モード、ローカルトランザクションを含む AMQP 機能がサポートされます。

設定


設定

AmqpConnector の基本設定

パラメータ

名前 説明 デフォルト値 必須

Name (名前)

String (文字列)

この設定の名前。コネクタはこの名前の設定を参照します。

x 

Encoding (エンコーディング)

String (文字列)

メッセージが通信しない場合に使用されるメッセージ本文のデフォルトエンコーディング。

 

Content Type (コンテンツタイプ)

String (文字列)

メッセージが通信しない場合に使用されるメッセージ本文のデフォルトの contentType。

*/*

 

Create Fallback Queue (代替キューを作成)

String (文字列)

代替定義が指定されている場合にその定義に従って存在しないキューを作成するかどうか。

true

 

Create Fallback Exchange (代替交換を作成)

String (文字列)

代替定義が指定されている場合にその定義に従って存在しないキューを作成するかどうか。

true

 

接続の設定

パラメータ

名前 説明 デフォルト値 必須

Host (ホスト)

String (文字列)

接続先のブローカーのホスト。

x 

Port (ポート)

Number (数値)

接続先の AMQP ブローカーのポート。

5672 (セキュアでない接続)/5671 (セキュアな接続)

 

Username (ユーザ名)

String (文字列)

認証のログイン情報を提供するときに使用されるユーザ名。

 

x 

Password (パスワード)

String (文字列)

認証のログイン情報を提供するときに使用されるパスワード。

 

x 

useTls

Boolean (ブール)

TLS を使用する必要があるかどうか。指定しない場合、AMQP 接続のデフォルトが使用されます。

false

 

heartbeatTimeout

Boolean (ブール)

ハートビートタイムアウト。ハートビートフレームは、タイムアウトの約 1/2 の間隔で送信されます。(1.2.0 以降)。

60

 

fallbackAddresses

List of Fallback Addresses (host, port) (代替アドレスのリスト (ホスト、ポート))

メインブローカーへの接続に失敗した場合に接続を試行するブローカーノードのアドレス。(1.3.0 以降)。

代替アドレスなし

 

コンシューマの設定

パラメータ

名前 説明 デフォルト値 必須

Ack Mode (肯定応答モード)

Enumeration (列挙)。次のいずれかになります。

  • IMMEDIATE (即時)

  • AUTO (自動)

  • MANUAL (手動)

メッセージをコンシュームするときに使用する ConsumerAckMode。メッセージソースレベルで上書きできます。

AUTO (自動)

 

No Local (非ローカル)

Boolean (ブール)

true に設定されている場合、サーバはメッセージをパブリッシュした接続にメッセージを送信しません。

false

 

Exclusive Consumers (排他的なコンシューマ)

Boolean (ブール)

コネクタで排他的なコンシューマのみを作成する必要がある場合に true に設定します。つまり、作成されたコンシューマのみがキューにアクセスできます。

false

 

Number of Consumers (コンシューマ数)

Integer (整数)

AMQP メッセージを受信するためにメッセージソースによって実行されるコンシューマの数。各コンシューマでチャネルが作成されます。

4

 

パブリッシャーの設定

パラメータ

名前 説明 デフォルト値 必須

Delivery Mode (配信モード)

Enumeration (列挙)。次のいずれかになります。

  • PERSISTENT (永続的)

  • TRANSIENT (一時的)

AMQP ブローカーにパブリッシュするときに使用する配信モード。

PERSISTENT (永続的)

 

Priority (優先度)

Integer (整数)

AMQP ブローカーにパブリッシュするときに使用する優先度。

0

 

Request Broker Confirms (ブローカーの確認を要求)

Boolean (ブール)

確認が行われない場合に失敗するかどうか。

false

 

Mandatory (必須)

Boolean (ブール)

メッセージをキューにルーティングできない場合に操作に失敗するかどうか。

false

 

Immediate (即時)

Boolean (ブール)

メッセージをキューコンシューマに即時にルーティングできない場合に操作に失敗するかどうか。

false

 

Returned Message Exchange (返されるメッセージ交換)

String (文字列)

返されるメッセージをパブリッシュする交換。

 

 

サービス品質の設定

パラメータ

名前 説明 デフォルト値 必須

Prefetch Size (プリフェッチサイズ)

Integer (整数)

この項目では、プリフェッチサイズウィンドウを定義します。ブローカーは、prefetchSize ウィンドウ (バイト) を超えない範囲でできるだけ多くのメッセージを送信します。特定の上限を設定しない場合は、0 を使用します。

0

 

Prefetch Count (プリフェッチ数)

Integer (整数)

メッセージ全体に関してグローバルプリフェッチウィンドウを指定します。この項目は、プリフェッチサイズ項目と組み合わせて使用できます。両方のプリフェッチウィンドウで許可されている場合にのみメッセージが事前の送信されます。特定の上限を設定しない場合は、0 を使用します。

0

 

関連操作

関連付けられたソース

操作

Consume

<amqp:consume>

ユーザが任意のキューから単一の AmqpMessage をコンシュームできるようにする操作。

パラメータ

名前 説明 デフォルト値 必須

Configuration (設定)

String (文字列)

使用する設定の名前。

x 

Queue name (キュー名)

String (文字列)

メッセージがコンシュームされるキューの名前

x 

Content Type (コンテンツタイプ)

String (文字列)

メッセージのコンテンツ種別。

 

Encoding (エンコーディング)

String (文字列)

メッセージのコンテンツエンコーディング。

 

Fallback Queue Definition (代替キュー定義)

Definition of a Queue (キューの定義)

queueName のキューがない場合にキューを宣言するために使用されるキュー定義

 

Ack Mode (肯定応答モード)

Enumeration (列挙)。次のいずれかになります。

  • IMMEDIATE (即時)

  • MANUAL (手動)

メッセージおよびセッションで設定される ConsumerAckMode。

 

Maximum Wait (最大待機)

Number (数値)

タイムアウトになる前にメッセージを待機する最大時間。

10000

 

Maximum Wait Unit (最大待機単位)

Enumeration (列挙)。次のいずれかになります。

  • NANOSECONDS (ナノ秒)

  • MICROSECONDS (マイクロ秒)

  • MILLISECONDS (ミリ秒)

  • SECONDS (秒)

  • MINUTES (分)

  • HOURS (時間)

  • DAYS (日)

maximumWaitTime 設定で使用する時間の単位。

MILLISECONDS (ミリ秒)

 

Transactional Action (トランザクションアクション)

Enumeration (列挙)。次のいずれかになります。

  • ALWAYS_JOIN

  • JOIN_IF_POSSIBLE

  • NOT_SUPPORTED

操作で実行できる、トランザクションに関する結合アクションの種別。

JOIN_IF_POSSIBLE

 

Reconnection Strategy (再接続戦略)

接続エラー時の再試行戦略

 

出力

Any (いずれか)

Attributes Type (属性型)

設定

スロー

  • AMQP:TIMEOUT  

  • AMQP:CONNECTIVITY  

  • AMQP:CONSUMING  

  • AMQP:RETRY_EXHAUSTED  

  • AMQP:QUEUE_NOT_FOUND  

  • AMQP:CREATION_NOT_ALLOWED  

Publish

<amqp:publish>

ユーザが単一の AmqpMessage を任意の交換にパブリッシュできるようにする操作。

パラメータ

名前 説明 デフォルト値 必須

Configuration (設定)

String (文字列)

使用する設定の名前。

x 

Exchange Name (交換名)

String (文字列)

メッセージのパブリッシュ先の交換の名前

x 

Fallback Exchange Definition (代替交換定義)

Definition of an Exchange (交換の定義)

exchangeName の交換がない場合に交換を宣言するために使用される交換定義

 

Routing Keys (ルーティングキー)

LIST (リスト)

ルーティングキーのリスト

 

Delivery Mode (配信モード)

Enumeration (列挙)。次のいずれかになります。

  • PERSISTENT (永続的)

  • TRANSIENT (一時的)

AMQP ブローカーにパブリッシュするときに使用する配信モード。

PERSISTENT (永続的)

 

Correlation Id (相関 ID)

String (文字列)

メッセージの AMQPCorrelationID ヘッダー

 

ContentType

String (文字列)

本文のコンテンツ種別。

 

Encoding (エンコーディング)

String (文字列)

メッセージの本文の outboundEncoding。

 

Reply To (返信先)

String (文字列)

このメッセージの返信先となるキューの AMQP replyTo プロパティ情報

 

User Properties (ユーザプロパティ)

Object (オブジェクト)

このメッセージについて設定する必要があるカスタムユーザプロパティ。各プロパティは他のデフォルトの AMQP ユーザプロパティとマージされます。すべての AMQP ユーザプロパティは単一のオブジェクトで一度に設定されます。このオブジェクトは、`#[output application/json --- { userName: vars.user, appName: 'myApp'}]` のように DataWeave オブジェクトとして書き込めます。続いて、ユーザプロパティオブジェクトのそれぞれのキー/値が個別の AMQP ユーザプロパティとして設定されます。

 

Reconnection Strategy (再接続戦略)

接続エラー時の再試行戦略

 

設定

スロー

  • AMQP:PUBLISHING  

  • AMQP:UNROUTABLE_MESSAGE  

  • AMQP:CREATION_NOT_ALLOWED  

  • AMQP:ILLEGAL_BODY  

  • AMQP:RETRY_EXHAUSTED  

Publish Consume

<amqp:publish-consume>

ユーザが AMQP 交換にメッセージを送信し、指定された replyTo 宛先または動的に作成される一時的な宛先への応答を待機できるようにする操作。

パラメータ

名前 説明 デフォルト値 必須

Configuration (設定)

String (文字列)

使用する設定の名前。

 

x 

Exchange Name (交換名)

String (文字列)

メッセージのパブリッシュ先の交換の名前

 

x 

Correlation Id (相関 ID)

String (文字列)

メッセージの AMQPCorrelationID ヘッダー

 

 

ContentType

String (文字列)

本文のコンテンツタイプ

*/*

 

Encoding (エンコード)

String (文字列)

メッセージの本文の outboundEncoding

 

 

User Properties (ユーザプロパティ)

Object (オブジェクト)

このメッセージについて設定する必要があるカスタムユーザプロパティ。各プロパティは他のデフォルトの AMQP ユーザプロパティとマージされます。すべての AMQP ユーザプロパティは単一のオブジェクトで一度に設定されます。このオブジェクトは、`#[output application/json --- { userName: vars.user, appName: 'myApp'}]` のように DataWeave オブジェクトとして書き込めます。続いて、ユーザプロパティオブジェクトのそれぞれのキー/値が個別の AMQP ユーザプロパティとして設定されます。

 

Maximum Wait (最大待機)

Number (数値)

タイムアウトになる前にメッセージを待機する最大時間。

10000

 

Maximum Wait Unit (最大待機単位)

Enumeration (列挙)。次のいずれかになります。

  • NANOSECONDS (ナノ秒)

  • MICROSECONDS (マイクロ秒)

  • MILLISECONDS (ミリ秒)

  • SECONDS (秒)

  • MINUTES (分)

  • HOURS (時間)

  • DAYS (日)

maximumWaitTime 設定で使用する時間の単位。

MILLISECONDS (ミリ秒)

 

Reconnection Strategy (再接続戦略)

接続エラー時の再試行戦略

 

出力

Any (いずれか)

Attributes Type (属性型)

設定

スロー

  • AMQP:PUBLISHING_CONSUMING  

  • AMQP:PUBLISHING  

  • AMQP:TIMEOUT  

  • AMQP:CONNECTIVITY  

  • AMQP:CONSUMING  

  • AMQP:ILLEGAL_BODY  

  • AMQP:RETRY_EXHAUSTED  

  • AMQP:QUEUE_NOT_FOUND  

  • AMQP:CREATION_NOT_ALLOWED  

Ack

<amqp:ack>

ユーザが配信された AmqpMessage を肯定応答できるようにする操作。

パラメータ

名前 説明 デフォルト値 必須

Ack Id (肯定応答 ID)

String (文字列)

肯定応答するメッセージの AckId。

x 

Reject

<amqp:reject>

ユーザが配信された AmqpMessage を拒否できるようにする操作。

パラメータ

名前 説明 デフォルト値 必須

Ack Id (肯定応答 ID)

String (文字列)

肯定応答するメッセージの AckId。

x 

Requeue (キューに再登録)

Boolean (ブール)

拒否されたメッセージをキューに再登録する必要があるかどうかを示します。

false

 

Sources (ソース)

リスナ

<amqp:listener>

キューの AMQP リスナ。受信メッセージをリスンできるようにします。

リスナ設定パラメータ

名前 説明 デフォルト値 必須

Configuration (設定)

String (文字列)

使用する設定の名前。

x 

Queue Name (キュー名)

String (文字列)

コンシュームするキューの名前

x 

Number Of consumers (コンシューマ数)

Number (数値)

AMQP メッセージを受信するために使用される同時コンシューマ数

4

 

Consumer Tag (コンシューマタグ)

String (文字列)

コンテキストを確立するためにクライアントによって生成されたコンシューマタグ。

4

 

Recovery Strategy (回復戦略)

Enumeration (列挙)。次のいずれかになります。

  • NONE (なし)

  • NO_REQUEUE

  • REQUEUE (キューに再登録)

チャネル回復またはロールバックの実行時に使用する戦略。

REQUEUE

 

Inbound content type (インバウンドコンテンツ種別)

String (文字列)

メッセージ本文のコンテンツ種別。

 

Inbound encoding (インバウンドエンコーディング)

String (文字列)

メッセージ本文の inboundEncoding。

 

Redelivery Policy (再配信ポリシー)

項目 説明 デフォルト値 必須

Max Redelivery Count (最大再配信数)

Number (数値)

正常に処理されずにプロセス失敗メッセージがトリガされるまでにメッセージを再配信できる最大回数。

Use Secure Hash (セキュアハッシュを使用)

Boolean (ブール)

再配信されたメッセージの識別にセキュアハッシュアルゴリズムを使用するかどうか。

Message Digest Algorithm (メッセージダイジェストアルゴリズム)

String (文字列)

使用するセキュアハッシュアルゴリズム。設定しない場合、デフォルトの SHA-256 になります。

Id Expression (ID 式)

String (文字列)

メッセージがいつ再配信されたのかを判断するために使用する 1 つ以上の式を定義します。このプロパティは、useSecureHash が false の場合にのみ設定できます。

Object Store (オブジェクトストア)

各メッセージの再配信カウンタが保存されるオブジェクトストア。

Reconnect (再接続)

項目 説明 デフォルト値 必須

Frequency (頻度)

Number (数値)

再接続の頻度 (ミリ秒)。

Count (カウント)

Number (数値)

再接続の試行回数。

blocking (ブロック)

Boolean (ブール)

false の場合、再接続戦略が個別の非ブロックスレッドで実行されます。

true

Reconnect Forever (繰り返し再接続)

項目 説明 デフォルト値 必須

Frequency (頻度)

Number (数値)

再接続の頻度 (ミリ秒)。

blocking (ブロック)

Boolean (ブール)

false の場合、再接続戦略が個別の非ブロックスレッドで実行されます。

true

キュー定義

パラメータ

名前 説明 デフォルト値 必須

Removal Strategy (削除戦略)

Enumeration (列挙)。次のいずれかになります。

  • EXPLICIT (明示的)

  • SHUTDOWN (シャットダウン)

  • UNUSED (未使用)

宣言されたキューをブローカーから削除するタイミングを定義します。

EXPLICIT (未使用)

 

Exchange to Bind (バインドする交換)

String (文字列)

キューのバインド先の交換を定義します。

 

 

交換定義

パラメータ

名前 説明 デフォルト値 必須

Removal Strategy (削除戦略)

Enumeration (列挙)。次のいずれかになります。

  • EXPLICIT (明示的)

  • SHUTDOWN (シャットダウン)

  • UNUSED (未使用)

宣言された交換をブローカーから削除するタイミングを定義します。

EXPLICIT (明示的)

 

Exchange Type (交換種別)

Enumeration (列挙)。次のいずれかになります。

  • DIRECT (直接)

  • TOPIC (トピック)

  • FANOUT (論理出力)

  • HEADERS (ヘッダー)

宣言する交換の種別。

FANOUT (論理出力)

 

AMQP 属性

パラメータ

名前 説明 デフォルト値 必須

Envelope (エンベロープ)

ENVELOPE (エンベロープ)

AMQP の基本的な方法で使用されるパラメータのグループをカプセル化します。

Properties (プロパティ)

PROPERTIES (プロパティ)

AMQP メッセージプロパティ

Headers (ヘッダー)

MAP (マップ)

AMQP メッセージヘッダー

エンベロープ

パラメータ

名前 説明 デフォルト値 必須

Delivery Tag (配信タグ)

Number (数値)

配信タグ

Redeliver (再配信)

Boolean (ブール)

これが失敗した肯定応答の後の再配信の場合は true

Exchange (交換)

String (文字列)

現在の操作で使用される交換。

routingKey

String (文字列)

関連付けられているルーティングキー

Properties (プロパティ)

パラメータ

名前 説明 デフォルト値 必須

Content Type (コンテンツタイプ)

String (文字列)

メッセージのコンテンツタイプ。

Content Encoding (コンテンツエンコード)

String (文字列)

メッセージのコンテンツエンコード。

Delivery Mode (配信モード)

DELIVERY MODE (配信モード)

AMQP ブローカーにパブリッシュするときに使用する配信モード。

Priority (優先度)

Number (数値)

AMQP ブローカーにパブリッシュするときに使用する優先度。

Correlation Id (相関 ID)

String (文字列)

要求-返信のメッセージで識別される RPC パターンを実装する場合に使用されます。

replyTo

String (文字列)

RPC の場合に設定される宛先。

expiration (有効期限)

String (文字列)

メッセージの有効期限 (ミリ秒)。

messageId

String (文字列)

メッセージの messageId

timestamp (タイムスタンプ)

TIMESTAMP (タイムスタンプ)

コンシュームされたメッセージのタイムスタンプ

type (種別)

String (文字列)

コンシュームされたメッセージの種別

userId

String (文字列)

メッセージのユーザ ID

appId

String (文字列)

メッセージのアプリケーション ID

clusterId

String (文字列)

メッセージのクラスタ ID

Was this article helpful?

💙 Thanks for your feedback!

Edit on GitHub