AMQP メッセージのコンシューム - Mule4

AMQP コネクタの consume​ 操作により、任意の AMQP キューからフローの任意の時点でメッセージをコンシュームする機能が提供されます。

メッセージのコンシューム

キューからメッセージをコンシュームするための構文は次のとおりです。

<amqp:consume config-ref="AMQP_config" queueName="#[vars.queue]"/>

この操作では、queueName​ 値によって識別されるキューで最初に使用可能なメッセージをコンシュームし、AmqpMessage​ に変換するため、次の構造が得られます。

  • ペイロードとしてのメッセージのコンテンツ

  • メッセージ属性に含まれるメッセージのメタデータ

メッセージは、デフォルトでは受信されるとすぐに肯定応答されます。何らかの処理を行った後でメッセージの肯定応答を制御する場合は、ackMode​ を MANUAL​ に設定します。 メッセージ肯定応答操作についての詳細は、「メッセージ肯定応答の処理」​を参照してください。

メッセージの待機

デフォルトの最大待機時間は 10 秒に設定されています。指定した待機時間中にメッセージが受信できなければ、AMQP:TIMEOUT​ エラーがスローされます。maximumWait​ および maximumWaitUnit​ パラメータを設定して、待機時間をカスタマイズできます。

到着するメッセージの無限待機時間を作成するには、maximumWait​ 値を -1​ に設定します。. この場合は TIMEOUT​ エラーは発生しません。

MIME タイプおよび文字コード

AMQP コネクタは、メッセージの contentType​ プロパティに基づいてメッセージの MIME タイプ (contentType​) を自動的に判断するように設計されています。ただし、この判断をできない場合もあり、こういった場合にはメッセージのコンテンツを実際に把握しておく必要があります。

このような場合には、contentType​ パラメータを使用して、コンテンツタイプを特定の値に設定します。

同じプロセスが文字コードでも機能します。デフォルトでは、他の情報が提供されていない場合、コレクタは Mule Runtime Engine のデフォルトエンコードがメッセージのエンコードと一致するものと想定します。これは encoding​ パラメータで設定できます。

Consume 操作でのキューの宣言

デフォルトでは、定義されたキューが存在しないと AMQP:QUEUE_NOT_FOUND​ エラーで consume​ 操作に失敗します。

キューを宣言する必要がある場合、エンティティの定義を参照するか、インラインで定義する必要があります。

<amqp:consume config-ref="Amqp_Config" queueName="testQueue">
	<amqp:fallback-queue-definition removalStrategy="SHUTDOWN" exchangeToBind="exchangeToBindToQueue" />
</amqp:consume>

キューの定義では、パラメータ exchangeToBind​ を使用してバインドを作成できます。

キューは、高レベル要素として定義することもできます。

<amqp:queue-definition
	name="targetQueueDefinition"
	exchangeToBind="testExchange" />

<amqp:consume
	config-ref="AMQP_Config"
	queueName="testQueue"
	fallbackQueueDefinition="targetQueueDefinition">

AMQP トポグラフィーの変更を回避

createFallbackQueue​ グローバル設定を指定して、代替キューの定義による AMQP トポグラフィーの変更を回避できます。「AMQP トポグラフィーの変更を回避する方法」​を参照してください。

受信メッセージのメタデータ

前述のとおり、受信した各メッセージは 次の 2 つで構成されています。

  • メッセージのコンテンツが含まれるペイロード

  • メッセージに関するメタデータが含まれる属性

このメタデータには、AMQP メッセージで使用できるすべての情報をマップする 4 つの部分があります。

  • エンベロープ

  • AckId

  • ヘッダー

  • プロパティ

属性の構造についての詳細は、「AMQP リファレンス」​をご覧ください。

Was this article helpful?

💙 Thanks for your feedback!