AMQP コネクタリファレンス - Mule 4

サポートカテゴリ: 選択

AMQP コネクタ v1.5

リリースノート: ​AMQP Connector Release Notes - Mule 4

AMQP 用 Anypoint コネクタは AMQP 0.9.1 準拠の MuleSoft 拡張機能であり、AMQP メッセージをコンシュームおよび生成するために使用されます。この拡張機能では、エクスチェンジとキュー、コンシューマ、肯定応答モード、ローカルトランザクションを含む AMQP 機能がサポートされます。

設定


設定

AMQP コネクタの基本設定パラメータは次のとおりです。

パラメータ

名前 説明 デフォルト値 必須

Name (名前)

String (文字列)

この設定の名前。コネクタはこの名前の設定を参照します。

x

Encoding (エンコード)

String (文字列)

メッセージにエンコードが指定されていない場合に使用するメッセージ本文のデフォルトエンコード。

Content-Type (コンテンツタイプ)

String (文字列)

メッセージにエンコードが指定されていない場合に使用するメッセージ本文のデフォルトコンテンツタイプ。

/

Create Fallback Queue (代替キューを作成)

String (文字列)

代替定義が指定されている場合にその定義に従って存在しないキューを作成するかどうか。

true

Create Fallback Exchange (代替エクスチェンジを作成)

String (文字列)

代替定義が指定されている場合にその定義に従って存在しないエクスチェンジを作成するかどうか。

true

接続の設定。

パラメータ

名前 説明 デフォルト値 必須

Host (ホスト)

String (文字列)

接続先のブローカーのホスト。

x

Port (ポート)

Number (数値)

接続先の AMQP ブローカーのポート。

  • セキュアでない接続の場合は 5672

  • セキュアな接続の場合は 5671

Username (ユーザ名)

String (文字列)

認証のログイン情報を提供するときに使用するユーザ名。

x

Password (パスワード)

String (文字列)

認証のログイン情報を提供するときに使用するパスワード。

x

useTls

Boolean (ブール)

TLS が必要かどうか。指定しない場合、AMQP 接続のデフォルトが使用されます。

false

heartbeatTimeout

Boolean (ブール)

ハートビートタイムアウト。ハートビートフレームは、タイムアウトの約 1/2 の間隔で送信されます。(v1.2.0 以降)

60

handshakeTimeoutTimeUnit

Enumeration (列挙)。次のいずれかになります。

  • NANOSECONDS (ナノ秒)

  • MICROSECONDS (マイクロ秒)

  • MILLISECONDS (ミリ秒)

  • SECONDS (秒)

  • MINUTES (分)

  • HOURS (時間)

  • DAYS (日)

ハンドシェイクタイムアウト AMQP 接続ソケット設定の時間単位。(v1.5.0 以降)

MILLISECONDS (ミリ秒)

handshakeTimeout

Integer (整数)

基盤となる AMQP コネクタに設定する AMQP 0.9.1 タイムアウト。(v1.5.0 以降)

AMQP クライアントによるデフォルト (10000 ミリ秒)

fallbackAddresses

List of Fallback Addresses (host, port) (代替アドレスのリスト (ホスト、ポート))

メインブローカーへの接続に失敗した場合に接続を試行するブローカーノードのアドレス。(v1.3.0 以降)

代替アドレスなし

コンシューマの設定。

パラメータ

名前 説明 デフォルト値 必須

Ack Mode (肯定応答モード)

Enumeration (列挙)。次のいずれかになります。

  • IMMEDIATE

  • AUTO

  • MANUAL

メッセージをコンシュームするときに使用する ConsumerAckMode。メッセージソースレベルで上書きできます。

IMMEDIATE

No Local (非ローカル)

Boolean (ブール)

true​ に設定されている場合、サーバはメッセージをパブリッシュした接続にメッセージを送信しません。

false

Exclusive Consumers (排他的なコンシューマ)

Boolean (ブール)

コネクタで排他的なコンシューマのみを作成する必要がある場合に true​ に設定します。つまり、作成されたコンシューマのみがキューにアクセスできます。

false

Number of Consumers (コンシューマ数)

Integer (整数)

AMQP メッセージを受信するためにメッセージソースによって実行されるコンシューマの数。各コンシューマでチャネルが作成されます。

4

パブリッシャーの設定

パラメータ

名前 説明 デフォルト値 必須

Delivery Mode (配信モード)

Enumeration (列挙)。次のいずれかになります。

  • PERSISTENT

  • TRANSIENT

AMQP ブローカーにパブリッシュするときに使用する配信モード。

PERSISTENT

Priority (優先度)

Integer (整数)

AMQP ブローカーにパブリッシュするときに使用する優先度。

0

Request Broker Confirms (ブローカーの確認を要求)

Boolean (ブール)

確認が行われない場合に失敗するかどうか。

false

必須

Boolean (ブール)

メッセージをキューにルーティングできない場合に操作に失敗するかどうか。

false

Immediate (即時)

Boolean (ブール)

メッセージをキューコンシューマに即時にルーティングできない場合に操作に失敗するかどうか。

false

Returned Message Exchange (返されるメッセージエクスチェンジ)

String (文字列)

返されるメッセージをパブリッシュするエクスチェンジ。

サービス品質の設定。

パラメータ

名前 説明 デフォルト値 必須

Prefetch Size (プリフェッチサイズ)

Integer (整数)

この項目では、プリフェッチサイズウィンドウを定義します。ブローカーは、prefetchSize ウィンドウ (バイト) を超えない範囲でできるだけ多くのメッセージを送信します。特定の上限を設定しない場合は、0​ を使用します。

0

Prefetch Count (プリフェッチ数)

Integer (整数)

メッセージ全体に関してグローバルプリフェッチウィンドウを指定します。この項目は、プリフェッチサイズ項目と組み合わせて使用します。両方のプリフェッチウィンドウで許可されている場合にのみメッセージが事前に送信されます。特定の上限を設定しない場合は、0​ を使用します。

0

AMQP フレームハンドラのソケット設定。

パラメータ

名前 説明 デフォルト値 必須

Keep Alive (キープアライブ)

Boolean (ブール)

基盤となる AMQP コネクタに設定するキープアライブ (v1.5.0 以降)。

false

soTimeoutTimeUnit

Enumeration (列挙)。次のいずれかになります。

  • NANOSECONDS (ナノ秒)

  • MICROSECONDS (マイクロ秒)

  • MILLISECONDS (ミリ秒)

  • SECONDS (秒)

  • MINUTES (分)

  • HOURS (時間)

  • DAYS (日)

SO_TIMEOUT​ AMQP 接続ソケット設定の時間単位 (v1.5.0 以降)。

MILLISECONDS (ミリ秒)

soTimeout

Integer (整数)

基盤となる AMQP コネクタに設定する SO_TIMEOUT​ (v1.5.0 以降)。

ユーザ環境のデフォルト

receiveBufferSize

Integer (整数)

基盤となる AMQP コネクタに設定する受信バッファサイズ (v1.5.0 以降)。

ユーザ環境のデフォルト

sendBufferSize

Integer (整数)

基盤となる AMQP コネクタに設定する送信バッファサイズ (v1.5.0 以降)。

ユーザ環境のデフォルト

操作

ソース

Consume

<amqp:consume>

ユーザが任意のキューから単一の AMQP メッセージをコンシュームできるようにする操作。

パラメータ

名前 説明 デフォルト値 必須

Configuration (設定)

String (文字列)

使用する設定の名前。

x

Queue name (キュー名)

String (文字列)

メッセージがコンシュームされるキューの名前。

x

Content-Type (コンテンツタイプ)

String (文字列)

メッセージのコンテンツタイプ。

Encoding (エンコード)

String (文字列)

メッセージのコンテンツエンコード。

Fallback Queue Definition (代替キュー定義)

Definition of a Queue (キューの定義)

queueName のキューがない場合にキューを宣言するために使用されるキュー定義。

Ack Mode (肯定応答モード)

Enumeration (列挙)。次のいずれかになります。

  • IMMEDIATE

  • MANUAL

メッセージとセッションで設定する ConsumerAckMode。

Maximum Wait (最大待機)

Number (数値)

タイムアウトとなる前にメッセージを待つ最大時間。

10000

Maximum Wait Unit (最大待機単位)

Enumeration (列挙)。次のいずれかになります。

  • NANOSECONDS (ナノ秒)

  • MICROSECONDS (マイクロ秒)

  • MILLISECONDS (ミリ秒)

  • SECONDS (秒)

  • MINUTES (分)

  • HOURS (時間)

  • DAYS (日)

[Maximum wait time (最大待機時間)]​ 設定で使用する時間単位。

MILLISECONDS

Transactional Action (トランザクションアクション)

Enumeration (列挙)。次のいずれかになります。

  • ALWAYS_JOIN​

  • JOIN_IF_POSSIBLE​

  • NOT_SUPPORTED

トランザクションに関する操作で実行できる結合アクションの種別。

JOIN_IF_POSSIBLE

Reconnection Strategy (再接続戦略)

接続エラーが発生した場合の再試行戦略。

出力

Any (いずれか)

Attributes Type (属性型)

次の設定の場合

スロー

  • AMQP:CONNECTIVITY

  • AMQP:CONSUMING

  • AMQP:​CREATION_NOT_ALLOWED​

  • AMQP:​QUEUE_NOT_FOUND​

  • AMQP:​RETRY_EXHAUSTED​

  • AMQP:TIMEOUT

Publish

<amqp:publish>

ユーザが単一の AMQP メッセージ を任意のエクスチェンジにパブリッシュできるようにする操作。

パラメータ

名前 説明 デフォルト値 必須

Configuration (設定)

String (文字列)

使用する設定の名前。

x

Exchange Name (エクスチェンジ名)

String (文字列)

メッセージのパブリッシュ先のエクスチェンジの名前。

x

Fallback Exchange Definition (代替エクスチェンジ定義)

Definition of an Exchange (エクスチェンジの定義)

exchangeName のエクスチェンジがない場合にエクスチェンジを宣言するために使用されるエクスチェンジ定義。

Routing Keys (ルーティングキー)

LIST

ルーティングキーのリスト。

Delivery Mode (配信モード)

Enumeration (列挙)。次のいずれかになります。

  • PERSISTENT

  • TRANSIENT

AMQP ブローカーにパブリッシュするときに使用する配信モード。

PERSISTENT

Correlation Id (相関 ID)

String (文字列)

メッセージの AMQPCorrelationID ヘッダー。

ContentType

String (文字列)

本文のコンテンツタイプ。

Encoding (エンコード)

String (文字列)

メッセージの本文の outboundEncoding。

Reply To (返信先)

String (文字列)

このメッセージの返信先となるキューの AMQP replyTo​ プロパティ情報。

User Properties (ユーザプロパティ)

Object (オブジェクト)

このメッセージについて設定するカスタムユーザプロパティ。各プロパティは他のデフォルトの AMQP ユーザプロパティとマージされます。すべての AMQP ユーザプロパティは単一のオブジェクトで一度に設定されます。このオブジェクトは、#[output application/json --- { userName: vars.user, appName: 'myApp'}]​ のように DataWeave オブジェクトとして書き込めます。. 続いて、ユーザプロパティオブジェクトのそれぞれのキー/値が個別の AMQP ユーザプロパティとして設定されます。

Reconnection Strategy (再接続戦略)

接続エラーが発生した場合の再試行戦略。

次の設定の場合

スロー

  • AMQP:​CREATION_NOT_ALLOWED​

  • AMQP:​ILLEGAL_BODY​

  • AMQP:PUBLISHING

  • AMQP:​RETRY_EXHAUSTED​

  • AMQP:​UNROUTABLE_MESSAGE​

Publish Consume

<amqp:publish-consume>

AMQP エクスチェンジにメッセージを送信し、指定された replyTo 宛先または動的に作成される一時的な宛先への応答を待機します。

パラメータ

名前 説明 デフォルト値 必須

Configuration (設定)

String (文字列)

使用する設定の名前。

x

Exchange Name (エクスチェンジ名)

String (文字列)

メッセージのパブリッシュ先のエクスチェンジの名前。

x

Correlation Id (相関 ID)

String (文字列)

メッセージの AMQPCorrelationID ヘッダー。

ContentType

String (文字列)

本文のコンテンツタイプ。

/

/

String (文字列)

メッセージの本文の outboundEncoding。

User Properties (ユーザプロパティ)

Object (オブジェクト)

このメッセージについて設定する必要があるカスタムユーザプロパティ。各プロパティは他のデフォルトの AMQP ユーザプロパティとマージされます。すべての AMQP ユーザプロパティは単一のオブジェクトで一度に設定されます。このオブジェクトは、#[output application/json --- { userName: vars.user, appName: 'myApp'}]​ のように DataWeave オブジェクトとして書き込めます。. 続いて、ユーザプロパティオブジェクトのそれぞれのキー/値が個別の AMQP ユーザプロパティとして設定されます。

Maximum Wait (最大待機)

Number (数値)

タイムアウトとなる前にメッセージを待つ最大時間。

10000

Maximum Wait Unit (最大待機単位)

Enumeration (列挙)。次のいずれかになります。

  • NANOSECONDS (ナノ秒)

  • MICROSECONDS (マイクロ秒)

  • MILLISECONDS (ミリ秒)

  • SECONDS (秒)

  • MINUTES (分)

  • HOURS (時間)

  • DAYS (日)

maximumWaitTime 設定で使用する時間単位。

MILLISECONDS (ミリ秒)

Reconnection Strategy (再接続戦略)

接続エラーが発生した場合の再試行戦略。

出力

Any (いずれか)

Attributes Type (属性型)

次の設定の場合

スロー

  • AMQP:CONNECTIVITY

  • AMQP:CONSUMING

  • AMQP:​CREATION_NOT_ALLOWED​

  • AMQP:​ILLEGAL_BODY​

  • AMQP:PUBLISHING

  • AMQP:​PUBLISHING_CONSUMING​

  • AMQP:​QUEUE_NOT_FOUND​

  • AMQP:​RETRY_EXHAUSTED​

  • AMQP:TIMEOUT

Ack

<amqp:ack>

ユーザが配信された AMQP メッセージを肯定応答できるようにする操作。

パラメータ

名前 説明 デフォルト値 必須

Ack Id

String (文字列)

肯定応答するメッセージの AckId。

x

Reject

<amqp:reject>

ユーザが配信された AMQP メッセージを拒否できるようにする操作。

パラメータ

名前 説明 デフォルト値 必須

Ack Id

String (文字列)

肯定応答するメッセージの AckId。

x

Requeue (キューに再登録)

Boolean (ブール)

拒否されたメッセージをキューに再登録する必要があるかどうかを示します。

false

ソース

リスナ

<amqp:listener>

受信メッセージをリスンするためのキューの AMQP リスナ。

パラメータ

名前 説明 デフォルト値 必須

Configuration (設定)

String (文字列)

使用する設定の名前。

x

Queue Name (キュー名)

String (文字列)

コンシュームするキューの名前。

x

Number Of Consumers (コンシューマ数)

Number (数値)

AMQP メッセージを受信するために使用する同時コンシューマ数。

4

Consumer Tag (コンシューマタグ)

String (文字列)

コンテキストを確立するためにクライアントによって生成されたコンシューマタグ。

4

Recovery Strategy (回復戦略)

Enumeration (列挙)。次のいずれかになります。

  • None (なし)

  • NO_REQUEUE​

  • REQUEUE

チャネル回復またはロールバックの実行時に使用する戦略。

REQUEUE

Inbound content type (インバウンドコンテンツタイプ)

String (文字列)

メッセージ本文のコンテンツタイプ。

Inbound encoding (インバウンドエンコード)

String (文字列)

メッセージの本文の inboundEncoding。

Redelivery Policy (再配信ポリシー)

項目 説明 デフォルト値 必須

Max Redelivery Count (最大再配信数)

Number (数値)

正常に処理されずにプロセス失敗メッセージがトリガされるまでにメッセージを再配信できる最大回数。

Use Secure Hash (セキュアハッシュを使用)

Boolean (ブール)

再配信されたメッセージの識別にセキュアハッシュアルゴリズムを使用するかどうか。

Message Digest Algorithm (メッセージダイジェストアルゴリズム)

String (文字列)

使用するセキュアハッシュアルゴリズム。設定しない場合、デフォルトの SHA-256 になります。

Id Expression (ID 式)

String (文字列)

メッセージがいつ再配信されたのかを判断するために使用する 1 つ以上の式を定義します。このプロパティは、useSecureHash が false の場合にのみ設定できます。

Object Store (オブジェクトストア)

ObjectStore

各メッセージの再配信カウンタが保存されるオブジェクトストア。

Reconnect (再接続)

項目 説明 デフォルト値 必須

Frequency (頻度)

Number (数値)

再接続する頻度 (ミリ秒)。

Count (数)

Number (数値)

再接続を試みる回数。

blocking (ブロック)

Boolean (ブール)

false の場合、再接続戦略が個別の非ブロックスレッドで実行されます。

true

Reconnect Forever (繰り返し再接続)

項目 説明 デフォルト値 必須

Frequency (頻度)

Number (数値)

再接続する頻度 (ミリ秒)。

blocking (ブロック)

Boolean (ブール)

false の場合、再接続戦略が個別の非ブロックスレッドで実行されます。

true

キュー定義

パラメータ

名前 説明 デフォルト値 必須

Removal Strategy (削除戦略)

Enumeration (列挙)。次のいずれかになります。

  • EXPLICIT

  • SHUTDOWN

  • UNUSED

宣言されたキューをブローカーから削除するタイミングを定義します。

EXPLICIT

Exchange to Bind (バインドするエクスチェンジ)

String (文字列)

キューのバインド先のエクスチェンジを定義します。

Binding Routing Key (ルーティングキーのバインド)

String (文字列)

エクスチェンジへのバインドで使用するルーティングキーを定義します。(v1.4.0 以降)

交換定義

パラメータ

名前 説明 デフォルト値 必須

Removal Strategy (削除戦略)

Enumeration (列挙)。次のいずれかになります。

  • EXPLICIT

  • SHUTDOWN

  • UNUSED

宣言されたエクスチェンジをブローカーから削除するタイミングを定義します。

EXPLICIT

Exchange Type (エクスチェンジ種別)

Enumeration (列挙)。次のいずれかになります。

  • DIRECT

  • TOPIC

  • FANOUT (論理出力)

  • HEADERS

宣言するエクスチェンジの種別。

FANOUT

AMQP Attributes (SOAP 属性)

パラメータ

名前 説明 デフォルト値 必須

Envelope (エンベロープ)

ENVELOPE

AMQP の基本的な方法で使用されるパラメータのグループをカプセル化します。

Properties (プロパティ)

PROPERTIES

AMQP メッセージのプロパティ。

ヘッダー

MAP

AMQP メッセージヘッダー。

Envelope (エンベロープ)

パラメータ

名前 説明 デフォルト値 必須

Delivery Tag (配信タグ)

Number (数値)

配信タグ。

Redeliver (再配信)

Boolean (ブール)

これが失敗した肯定応答の後の再配信の場合は true。

Exchange

String (文字列)

現在の操作で使用されるエクスチェンジ。

routingKey

String (文字列)

関連付けられたルーティングキー。

Properties (プロパティ)

パラメータ

名前 説明 デフォルト値 必須

Content-Type (コンテンツタイプ)

String (文字列)

メッセージのコンテンツタイプ。

Content Encoding (コンテンツエンコード)

String (文字列)

メッセージのコンテンツエンコード。

Delivery Mode (配信モード)

DELIVERY MODE (配信モード)

AMQP ブローカーにパブリッシュするときに使用する配信モード。

Priority (優先度)

Number (数値)

AMQP ブローカーにパブリッシュするときに使用する優先度。

Correlation Id (相関 ID)

String (文字列)

要求-返信のメッセージで識別される RPC パターンを実装する場合に使用されます。

replyTo

String (文字列)

RPC の場合に設定される宛先。

expiration (有効期限)

String (文字列)

メッセージの有効期限 (ミリ秒)。

messageId

String (文字列)

メッセージの messageId。

timestamp (タイムスタンプ)

TIMESTAMP

コンシュームされたメッセージのタイムスタンプ。

String (文字列)

コンシュームされたメッセージの種別。

userId

String (文字列)

メッセージのユーザ ID。

appId

String (文字列)

メッセージのアプリケーション ID。

clusterId

String (文字列)

メッセージのクラスタ ID。

Was this article helpful?

💙 Thanks for your feedback!