AMQP Connector リファレンス

AMQP Connector v1.6

リリースノート: AMQP Connector リリースノート

AMQP 用 Anypoint Connector は AMQP 0.9.1 準拠の MuleSoft 拡張機能であり、AMQP メッセージをコンシュームおよび生成するために使用されます。この拡張機能では、エクスチェンジとキュー、コンシューマー、肯定応答モード、ローカルトランザクションを含む AMQP 機能がサポートされます。

設定


設定

AMQP Connector の基本設定パラメーターは次のとおりです。

パラメーター

名前 説明 デフォルト値 必須

Name (名前)

String (文字列)

この設定の名前。コネクタはこの名前の設定を参照します。

x

Encoding (エンコード)

String (文字列)

メッセージにエンコードが指定されていない場合に使用するメッセージ本文のデフォルトエンコード。

Content Type (コンテンツタイプ)

String (文字列)

メッセージにエンコードが指定されていない場合に使用するメッセージ本文のデフォルトコンテンツタイプ。

/

Create Fallback Queue (代替キューを作成)

String (文字列)

代替定義が指定されている場合にその定義に従って存在しないキューを作成するかどうか。

true

Create Fallback Exchange (代替エクスチェンジを作成)

String (文字列)

代替定義が指定されている場合にその定義に従って存在しないエクスチェンジを作成するかどうか。

true

接続の設定。

パラメーター

名前 説明 デフォルト値 必須

Host (ホスト)

String (文字列)

接続先のブローカーのホスト。

x

Port (ポート)

Number (数値)

接続先の AMQP ブローカーのポート。

  • セキュアでない接続の場合は ​5672

  • セキュアな接続の場合は ​5671

Username (ユーザー名)

String (文字列)

認証のログイン情報を提供するときに使用するユーザー名。

x

Password (パスワード)

String (文字列)

認証のログイン情報を提供するときに使用するパスワード。

x

useTls

Boolean (ブール)

TLS が必要かどうか。指定しない場合、AMQP 接続のデフォルトが使用されます。

false

heartbeatTimeout

Boolean (ブール)

ハートビートタイムアウト。ハートビートフレームは、タイムアウトの約 1/2 の間隔で送信されます。(v1.2.0 以降)

60

handshakeTimeoutTimeUnit

Enumeration (列挙)。次のいずれかになります。

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

ハンドシェイクタイムアウト AMQP 接続ソケット設定の時間単位。(v1.5.0 以降)

MILLISECONDS

handshakeTimeout

Integer (整数)

基盤となる AMQP Connector に設定する AMQP 0.9.1 タイムアウト。(v1.5.0 以降)

AMQP クライアントによるデフォルト (10000 ミリ秒)

fallbackAddresses

List of Fallback Addresses (host, port) (代替アドレスのリスト (ホスト、ポート))

メインブローカーへの接続に失敗した場合に接続を試行するブローカーノードのアドレス。(v1.3.0 以降)

代替アドレスなし

コンシューマーの設定。

パラメーター

名前 説明 デフォルト値 必須

Ack Mode (肯定応答モード)

Enumeration (列挙)。次のいずれかになります。

  • IMMEDIATE

  • AUTO

  • MANUAL

メッセージをコンシュームするときに使用する ConsumerAckMode。メッセージソースレベルで上書きできます。

IMMEDIATE

No Local (非ローカル)

Boolean (ブール)

true​ に設定されている場合、サーバーはメッセージをパブリッシュした接続にメッセージを送信しません。

false

Exclusive Consumers (排他的なコンシューマー)

Boolean (ブール)

コネクタで排他的なコンシューマーのみを作成する必要がある場合に ​true​ に設定します。つまり、作成されたコンシューマーのみがキューにアクセスできます。

false

Number of Consumers (コンシューマー数)

Integer (整数)

AMQP メッセージを受信するためにメッセージソースによって実行されるコンシューマーの数。各コンシューマーでチャネルが作成されます。

4

パブリッシャーの設定。

パラメーター

名前 説明 デフォルト値 必須

Delivery Mode (配信モード)

Enumeration (列挙)。次のいずれかになります。

  • PERSISTENT

  • TRANSIENT

AMQP ブローカーにパブリッシュするときに使用する配信モード。

PERSISTENT

Priority (優先度)

Integer (整数)

AMQP ブローカーにパブリッシュするときに使用する優先度。

0

Request Broker Confirms (ブローカーの確認を要求)

Boolean (ブール)

確認が行われない場合に失敗するかどうか。

false

必須

Boolean (ブール)

メッセージをキューにルーティングできない場合に操作に失敗するかどうか。

false

Immediate (即時)

Boolean (ブール)

メッセージをキューコンシューマーに即時にルーティングできない場合に操作に失敗するかどうか。

false

Returned Message Exchange (返されるメッセージエクスチェンジ)

String (文字列)

返されるメッセージをパブリッシュするエクスチェンジ。

サービス品質の設定。

パラメーター

名前 説明 デフォルト値 必須

Prefetch Size (プリフェッチサイズ)

Integer (整数)

この項目では、プリフェッチサイズウィンドウを定義します。ブローカーは、prefetchSize ウィンドウ (バイト) を超えない範囲でできるだけ多くのメッセージを送信します。特定の上限を設定しない場合は、​0​ を使用します。

0

Prefetch Count (プリフェッチ数)

Integer (整数)

メッセージ全体に関してグローバルプリフェッチウィンドウを指定します。この項目は、プリフェッチサイズ項目と組み合わせて使用します。両方のプリフェッチウィンドウで許可されている場合にのみメッセージが事前に送信されます。特定の上限を設定しない場合は、​0​ を使用します。

0

AMQP フレームハンドラーのソケット設定。

パラメーター

名前 説明 デフォルト値 必須

Keep Alive (キープアライブ)

Boolean (ブール)

基盤となる AMQP Connector に設定するキープアライブ (v1.5.0 以降)。

false

soTimeoutTimeUnit

Enumeration (列挙)。次のいずれかになります。

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

SO_TIMEOUT​ AMQP 接続ソケット設定の時間単位 (v1.5.0 以降)。

MILLISECONDS

soTimeout

Integer (整数)

基盤となる AMQP Connector に設定する ​SO_TIMEOUT​ (v1.5.0 以降)。

ユーザー環境のデフォルト

receiveBufferSize

Integer (整数)

基盤となる AMQP Connector に設定する受信バッファサイズ (v1.5.0 以降)。

ユーザー環境のデフォルト

sendBufferSize

Integer (整数)

基盤となる AMQP Connector に設定する送信バッファサイズ (v1.5.0 以降)。

ユーザー環境のデフォルト

ソース

Consume

<amqp:consume>

ユーザーが任意のキューから単一の AMQP メッセージをコンシュームできるようにする操作。

パラメーター

名前 説明 デフォルト値 必須

Configuration (設定)

String (文字列)

使用する設定の名前。

x

Queue name (キュー名)

String (文字列)

メッセージがコンシュームされるキューの名前。

x

Content Type (コンテンツタイプ)

String (文字列)

メッセージのコンテンツタイプ。

Encoding (エンコード)

String (文字列)

メッセージのコンテンツエンコード。

Fallback Queue Definition (代替キュー定義)

Definition of a Queue (キューの定義)

queueName のキューがない場合にキューを宣言するために使用されるキュー定義。

Ack Mode (肯定応答モード)

Enumeration (列挙)。次のいずれかになります。

  • IMMEDIATE

  • MANUAL

メッセージとセッションで設定する ConsumerAckMode。

Maximum Wait (最大待機)

Number (数値)

タイムアウトとなる前にメッセージを待つ最大時間。

10000

Maximum Wait Unit (最大待機単位)

Enumeration (列挙)。次のいずれかになります。

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

[Maximum wait time (最大待機時間)]​ 設定で使用する時間単位。

MILLISECONDS

Transactional Action (トランザクションアクション)

Enumeration (列挙)。次のいずれかになります。

  • ALWAYS_JOIN​

  • JOIN_IF_POSSIBLE​

  • NOT_SUPPORTED

トランザクションに関する操作で実行できる結合アクションの種別。

JOIN_IF_POSSIBLE

Reconnection Strategy (再接続戦略)

接続エラーが発生した場合の再試行戦略。

出力

Any (任意)

Attributes Type (属性型)

次の設定の場合

スロー

  • AMQP:CONNECTIVITY

  • AMQP:CONSUMING

  • AMQP:CREATION_NOT_ALLOWED

  • AMQP:QUEUE_NOT_FOUND

  • AMQP:RETRY_EXHAUSTED

  • AMQP:TIMEOUT

Publish

<amqp:publish>

ユーザーが単一の AMQP メッセージを任意のエクスチェンジにパブリッシュできるようにする操作。

パラメーター

名前 説明 デフォルト値 必須

Configuration (設定)

String (文字列)

使用する設定の名前。

x

Exchange Name (エクスチェンジ名)

String (文字列)

メッセージのパブリッシュ先のエクスチェンジの名前。

x

Fallback Exchange Definition (代替エクスチェンジ定義)

Definition of an Exchange (エクスチェンジの定義)

exchangeName のエクスチェンジがない場合にエクスチェンジを宣言するために使用されるエクスチェンジ定義。

Routing Keys (ルーティングキー)

LIST

ルーティングキーのリスト。

Delivery Mode (配信モード)

Enumeration (列挙)。次のいずれかになります。

  • PERSISTENT

  • TRANSIENT

AMQP ブローカーにパブリッシュするときに使用する配信モード。

PERSISTENT

Correlation Id (相関 ID)

String (文字列)

メッセージの AMQPCorrelationID ヘッダー。

ContentType

String (文字列)

本文のコンテンツタイプ。

Encoding (エンコード)

String (文字列)

メッセージの本文の outboundEncoding。

Reply To (返信先)

String (文字列)

このメッセージの返信先となるキューの AMQP ​replyTo​ プロパティ情報。

User Properties (ユーザープロパティ)

Object (オブジェクト)

このメッセージについて設定するカスタムユーザープロパティ。各プロパティは他のデフォルトの AMQP ユーザープロパティとマージされます。すべての AMQP ユーザープロパティは単一のオブジェクトで一度に設定されます。このオブジェクトは、​#[output application/json --- { userName: vars.user, appName: 'myApp'}]​ のように DataWeave オブジェクトとして書き込めます。続いて、ユーザープロパティオブジェクトのそれぞれのキー/値が個別の AMQP ユーザープロパティとして設定されます。

Reconnection Strategy (再接続戦略)

接続エラーが発生した場合の再試行戦略。

次の設定の場合

スロー

  • AMQP:CREATION_NOT_ALLOWED

  • AMQP:ILLEGAL_BODY

  • AMQP:PUBLISHING

  • AMQP:RETRY_EXHAUSTED

  • AMQP:UNROUTABLE_MESSAGE

Publish Consume

<amqp:publish-consume>

AMQP エクスチェンジにメッセージを送信し、指定された replyTo 宛先または動的に作成される一時的な宛先への応答を待機します。

パラメーター

名前 説明 デフォルト値 必須

Configuration (設定)

String (文字列)

使用する設定の名前。

x

Exchange Name (エクスチェンジ名)

String (文字列)

メッセージのパブリッシュ先のエクスチェンジの名前。

x

Correlation Id (相関 ID)

String (文字列)

メッセージの AMQPCorrelationID ヘッダー。

ContentType

String (文字列)

本文のコンテンツタイプ。

/

Encoding (エンコード)

String (文字列)

メッセージの本文の outboundEncoding。

User Properties (ユーザープロパティ)

Object (オブジェクト)

このメッセージについて設定する必要があるカスタムユーザープロパティ。各プロパティは他のデフォルトの AMQP ユーザープロパティとマージされます。すべての AMQP ユーザープロパティは単一のオブジェクトで一度に設定されます。このオブジェクトは、​#[output application/json --- { userName: vars.user, appName: 'myApp'}]​ のように DataWeave オブジェクトとして書き込めます。続いて、ユーザープロパティオブジェクトのそれぞれのキー/値が個別の AMQP ユーザープロパティとして設定されます。

Maximum Wait (最大待機)

Number (数値)

タイムアウトとなる前にメッセージを待つ最大時間。

10000

Maximum Wait Unit (最大待機単位)

Enumeration (列挙)。次のいずれかになります。

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

maximumWaitTime 設定で使用する時間単位。

MILLISECONDS

Reconnection Strategy (再接続戦略)

接続エラーが発生した場合の再試行戦略。

出力

Any (任意)

Attributes Type (属性型)

次の設定の場合

スロー

  • AMQP:CONNECTIVITY

  • AMQP:CONSUMING

  • AMQP:CREATION_NOT_ALLOWED

  • AMQP:ILLEGAL_BODY

  • AMQP:PUBLISHING

  • AMQP:PUBLISHING_CONSUMING

  • AMQP:QUEUE_NOT_FOUND

  • AMQP:RETRY_EXHAUSTED

  • AMQP:TIMEOUT

Ack

<amqp:ack>

ユーザーが配信された AMQP メッセージを肯定応答できるようにする操作。

パラメーター

名前 説明 デフォルト値 必須

Ack Id

String (文字列)

肯定応答するメッセージの AckId。

x

Reject

<amqp:reject>

ユーザーが配信された AMQP メッセージを拒否できるようにする操作。

パラメーター

名前 説明 デフォルト値 必須

Ack Id

String (文字列)

肯定応答するメッセージの AckId。

x

Requeue (キューに再登録)

Boolean (ブール)

拒否されたメッセージをキューに再登録する必要があるかどうかを示します。

false

ソース

リスナー

<amqp:listener>

受信メッセージをリスンするためのキューの AMQP リスナー。

パラメーター

名前 説明 デフォルト値 必須

Configuration (設定)

String (文字列)

使用する設定の名前。

x

Queue Name (キュー名)

String (文字列)

コンシュームするキューの名前。

x

Number Of Consumers (コンシューマー数)

Number (数値)

AMQP メッセージを受信するために使用する同時コンシューマー数。

4

Consumer Tag (コンシューマータグ)

String (文字列)

コンテキストを確立するためにクライアントによって生成されたコンシューマータグ。

4

Recovery Strategy (回復戦略)

Enumeration (列挙)。次のいずれかになります。

  • None (なし)

  • NO_REQUEUE​

  • REQUEUE

チャネル回復またはロールバックの実行時に使用する戦略。

REQUEUE

Inbound content type (インバウンドコンテンツタイプ)

String (文字列)

メッセージ本文のコンテンツタイプ。

Inbound encoding (インバウンドエンコード)

String (文字列)

メッセージの本文の inboundEncoding。

Redelivery Policy (再配信ポリシー)

項目 説明 デフォルト値 必須

Max Redelivery Count (最大再配信数)

Number (数値)

正常に処理されずにプロセス失敗メッセージがトリガーされるまでにメッセージを再配信できる最大回数。

Use Secure Hash (セキュアハッシュを使用)

Boolean (ブール)

再配信されたメッセージの識別にセキュアハッシュアルゴリズムを使用するかどうか。

Message Digest Algorithm (メッセージダイジェストアルゴリズム)

String (文字列)

使用するセキュアハッシュアルゴリズム。設定されていない場合。

SHA-256

Id Expression (ID 式)

String (文字列)

メッセージがいつ再配信されたのかを判断するために使用する 1 つ以上の式を定義します。このプロパティは、useSecureHash が ​false​ の場合にのみ使用できます。

Object Store (オブジェクトストア)

ObjectStore

各メッセージの再配信カウンターが保存されるオブジェクトストア。

Reconnect (再接続)

項目 説明 デフォルト値 必須

Frequency (頻度)

Number (数値)

再接続する頻度 (ミリ秒)。

Count (数)

Number (数値)

再接続の試行回数。

blocking (ブロック)

Boolean (ブール)

false の場合、再接続戦略が個別の非ブロックスレッドで実行されます。

true

Reconnect Forever (繰り返し再接続)

項目 説明 デフォルト値 必須

Frequency (頻度)

Number (数値)

再接続する頻度 (ミリ秒)。

blocking (ブロック)

Boolean (ブール)

false の場合、再接続戦略が個別の非ブロックスレッドで実行されます。

true

キュー定義

パラメーター

名前 説明 デフォルト値 必須

Removal Strategy (削除戦略)

Enumeration (列挙)。次のいずれかになります。

  • EXPLICIT

  • SHUTDOWN

  • UNUSED

宣言されたキューをブローカーから削除するタイミングを定義します。

EXPLICIT

Exchange to Bind (バインドするエクスチェンジ)

String (文字列)

キューのバインド先のエクスチェンジを定義します。

Binding Routing Key (ルーティングキーのバインド)

String (文字列)

エクスチェンジへのバインドで使用するルーティングキーを定義します。(v1.4.0 以降)

交換定義

パラメーター

名前 説明 デフォルト値 必須

Removal Strategy (削除戦略)

Enumeration (列挙)。次のいずれかになります。

  • EXPLICIT

  • SHUTDOWN

  • UNUSED

宣言されたエクスチェンジをブローカーから削除するタイミングを定義します。

EXPLICIT

Exchange Type (エクスチェンジ種別)

Enumeration (列挙)。次のいずれかになります。

  • DIRECT

  • TOPIC

  • FANOUT (論理出力)

  • HEADERS

宣言するエクスチェンジの種別。

FANOUT

AMQP Attributes (SOAP 属性)

パラメーター

名前 説明 デフォルト値 必須

Envelope (エンベロープ)

ENVELOPE

AMQP の基本的な方法で使用されるパラメーターのグループをカプセル化します。

Properties (プロパティ)

PROPERTIES

AMQP メッセージのプロパティ。

ヘッダー

MAP

AMQP メッセージヘッダー。

Envelope (エンベロープ)

パラメーター

名前 説明 デフォルト値 必須

Delivery Tag (配信タグ)

Number (数値)

配信タグ。

Redeliver (再配信)

Boolean (ブール)

これが失敗した肯定応答の後の再配信の場合は true。

Exchange

String (文字列)

現在の操作で使用されるエクスチェンジ。

routingKey

String (文字列)

関連付けられたルーティングキー。

Properties (プロパティ)

パラメーター

名前 説明 デフォルト値 必須

Content-Type (コンテンツタイプ)

String (文字列)

メッセージのコンテンツタイプ。

Content Encoding (コンテンツエンコード)

String (文字列)

メッセージのコンテンツエンコード。

Priority (優先度)

Number (数値)

AMQP ブローカーにパブリッシュするときに使用する優先度。

Correlation Id (相関 ID)

String (文字列)

要求-返信のメッセージで識別される RPC パターンを実装する場合に使用されます。

Message Id (メッセージ ID)

String (文字列)

メッセージのメッセージ ID。

Reply To (返信先)

String (文字列)

RPC の場合に設定される宛先。

Expiration (有効期限)

String (文字列)

メッセージの有効期限 (時間) を指定します。

Expiration time unit (有効期限の時間単位)

メッセージの有効期限 (時間) の時間単位を指定します。

User Id (ユーザー ID)

String (文字列)

メッセージのユーザー ID。

App Id (アプリケーション ID)

String (文字列)

メッセージのアプリケーション ID。

Cluster Id (クラスター ID)

String (文字列)

メッセージのクラスター ID。

Timestamp (タイムスタンプ)

TIMESTAMP

コンシュームされたメッセージのタイムスタンプ。