新規メッセージのリスン方法

AMQP コネクタのリスナソースにより、メッセージが AMQP キューに到着した時点でコンシュームできるようになります。

新規メッセージのリスン

キューからの新しいメッセージをリスンするための構文は、次のとおりです。

<amqp:listener config-ref="config" queueName="targetQueue"/>

上記のソースは、queueName パラメータによって識別されるキューの新しいメッセージをリスンし、AMQP メッセージがキューで使用できるようになるたびに AmqpMessage を返します。

AmqpMessage には以下が含まれます。

  • ペイロードとしてのメッセージのコンテンツ。

  • メッセージ属性に含まれるメッセージのメタデータ。

デフォルトでは、コンシュームされたメッセージが肯定応答されるのは、メッセージを受信するフローの実行が正常に完了した場合のみです。 代わりに、フローの実行中にエラーが発生した場合、recoverStrategy に従ってセッションが回復され、メッセージが再配信されます。

メッセージ肯定応答についての詳細は、「メッセージ肯定応答の処理」を参照してください。

メッセージスループットの設定

追加の処理力が必要な場合、AMQP リスナでは任意のリスナがメッセージを同時にコンシュームするために使用する numberOfConsumers を設定できます。 デフォルトでは、各リスナは 4 つのコンシューマを使用し、これらの各コンシューマによって個別のチャネルでメッセージが同時に生成されます。各コンシューマはメッセージが処理されるまで待機するため、最大で同時に 4 件のメッセージを送信中の状態にできることになります。 メッセージの同時処理数を増やす必要がある場合は、リスナの numberOfConsumers 設定を増やしてください。各コンシューマは異なるチャネルを使用します。

Mime タイプとエンコーディング

AMQP コネクタは、メッセージの contentType プロパティに基づいてメッセージの MIME タイプ (contentType) を自動的に判断するために最善を尽くします。ただし、最善の推測では不十分な場合もあり、こういった場合にはメッセージのコンテンツを実際に把握しておく必要があります。

このような場合には、contentType パラメータを使用して、コンテンツタイプを特定の値に設定します。

同じプロセスが文字コードでも機能します。デフォルトでは、他に情報が提供されない限り、コネクタはランタイムのデフォルトエンコーディングがメッセージのエンコーディングに一致することを前提とします。これは inboundEncoding パラメータで設定できます。

AMQP リスナでのキューの宣言

デフォルトでは、定義されたキューが存在しないと AMQP:QUEUE_NOT_FOUND エラーで consume 操作に失敗します。

キューを宣言する必要がある場合、キューを宣言するためにエンティティの定義を参照するか、インラインで定義する必要があります。

<amqp:listener config-ref="Amqp_Config" queueName="testQueue">
	<amqp:fallback-queue-definition removalStrategy="SHUTDOWN" exchangeToBind="echangeToBind" />
</amqp:listener>

キューの定義では、パラメータ exchangeToBind を使用してバインドを作成できます。

キューは、高レベル要素として定義することもできます。

<amqp:queue-definition name="targetQueueDefinition" exchangeToBind="testExchange" />

<amqp:listener config-ref="Amqp_Config" queueName="testQueue" fallbackQueueDefinition="targetQueueDefinition">

AMQP トポグラフィーの変更を回避する方法

createFallbackQueue グローバル設定を指定して、代替キューの定義による AMQP トポグラフィーの変更を回避できます。「AMQP トポグラフィーの変更を回避する方法」を参照してください。

受信メッセージへの応答

受信 AMQP メッセージで REPLY_TO プロパティが宣言されている場合、AMQP リスナは メッセージが正常に処理された場合に 自動的に応答を生成します。つまり、フロー実行中にエラーは発生しません。 その場合、フローが完了すると、応答が処理済みのメッセージプロパティで指定された交換にパブリッシュされます。エラーの場合、回復戦略が適用され、メッセージが正常に処理されるまで応答は REPLY_TO プロパティに送信されません。

エラーの場合の回復戦略

エラーが発生した場合、recoverStrategy が適用されます。 デフォルトでは、recoverStrategyREQUEUE に設定されます。つまり、メッセージがコンシューム元の queue に送信され、他の AMQP コンシューマがそのメッセージを取得する可能性があります。 たとえば、NO_REQUEUE に設定されている場合、次のようになります。

<amqp:listener config-ref="Amqp_Config" queueName="testQueue" recoverStrategy="NO_REQUEUE">
	<amqp:fallback-queue-definition removalStrategy="SHUTDOWN" exchangeToBind="exchangeToBindToQueue" />
</amqp:listener>

メッセージは、メッセージを取得したコンシューマに直接送信されます。 NONErecoverStrategy として定義されている場合、アクションは実行されません。

Was this article helpful?

💙 Thanks for your feedback!

Edit on GitHub