AMQP メッセージのコンシューム方法

AMQP コネクタの Consume 操作により、任意の AMQP キューからフローの任意の時点でメッセージをコンシュームする機能が提供されます。

メッセージのコンシューム

キューからメッセージをコンシュームするための構文は、次のとおりです。

<amqp:consume config-ref="AMQP_config" queueName="#[vars.queue]"/>

上記の操作では、queueName によって識別されるキューで最初に使用可能なメッセージをコンシュームし、AmqpMessage に変換するため、次の構造が得られます。

  • ペイロードとしてのメッセージのコンテンツ。

  • メッセージ属性に含まれるメッセージのメタデータ

メッセージは受信されると、デフォルトでは直ちに肯定応答されます。何らかの処理を行った後でメッセージの肯定応答を制御する場合は、ackModeMANUAL に設定する必要があります。 メッセージ肯定応答についての詳細は、「メッセージ肯定応答の処理」を参照してください。

メッセージの待機

デフォルトでは、最大待機時間は 10 秒に設定されており、その期間中に使用できるメッセージがない場合は AMQP:TIMEOUT エラーが発生します。 ユースケースに最適なタイムアウトを設定するには、maximumWait パラメータと maximumWaitUnit パラメータをカスタマイズします。

メッセージが到着するのを永久に待機するには、maximumWait の値を -1 に設定する必要があります。この場合、TIMEOUT エラーが発生することはありません。

Mime タイプとエンコーディング

AMQP コネクタは、メッセージの contentType プロパティに基づいてメッセージの MIME タイプ (contentType) を自動的に判断するために最善を尽くします。ただし、最善の推測では不十分な場合もあり、こういった場合にはメッセージのコンテンツを実際に把握しておく必要があります。

このような場合には、contentType パラメータを使用して、コンテンツタイプを特定の値に設定します。

同じプロセスが文字コードでも機能します。デフォルトでは、他に情報が提供されない限り、コネクタはランタイムのデフォルトエンコーディングがメッセージのエンコーディングに一致することを前提とします。これは encoding パラメータで設定できます。

Consume 操作でのキューの宣言

デフォルトでは、定義されたキューが存在しないと AMQP:QUEUE_NOT_FOUND エラーで consume 操作に失敗します。

キューを宣言する必要がある場合、キューを宣言するためにエンティティの定義を参照するか、インラインで定義する必要があります。

<amqp:consume config-ref="Amqp_Config" queueName="testQueue">
	<amqp:fallback-queue-definition removalStrategy="SHUTDOWN" exchangeToBind="exchangeToBindToQueue" />
</amqp:consume>

キューの定義では、パラメータ exchangeToBind を使用してバインドを作成できます。

キューは、高レベル要素として定義することもできます。

<amqp:queue-definition name="targetQueueDefinition" exchangeToBind="testExchange" />

<amqp:consume  config-ref="AMQP_Config" queueName="testQueue" fallbackQueueDefinition="targetQueueDefinition">

AMQP トポグラフィーの変更を回避する方法

createFallbackQueue グローバル設定を指定して、代替キューの定義による AMQP トポグラフィーの変更を回避できます。「AMQP トポグラフィーの変更を回避する方法」を参照してください。

受信メッセージのメタデータ

前述のとおり、受信された各メッセージは 2 つの部分で構成されます。

  • メッセージのコンテンツが含まれるペイロード

  • メッセージに関するメタデータが含まれる属性

このメタデータには、AMQP メッセージで使用できるすべての情報をマップする 4 つの部分があります。

  • エンベロープ

  • AckId

  • ヘッダー

  • Properties (プロパティ)

属性の構造についての詳細は、「AMQP リファレンス」をご覧ください。

Was this article helpful?

💙 Thanks for your feedback!

Edit on GitHub