新規メッセージのリスン - Mule 4

AMQP 用 Anypoint Connector (AMQP Connector) のリスナーソースにより、メッセージが AMQP キューに到着した時点でコンシュームできるようになります。

新規メッセージのリスン

キューからの新規メッセージをリスンするための構文は次のとおりです。

<amqp:listener config-ref="config" queueName="targetQueue"/>
xml

このソースは、​queueName​ パラメーターによって識別されるキューの新しいメッセージをリスンし、AMQP メッセージがキューで使用できるようになるたびに ​AmqpMessage​ を返します。

AmqpMessage​ には以下が含まれます。

  • メッセージのペイロードとしてのコンテンツ。

  • メッセージ属性に含まれるメッセージのメタデータ。

デフォルトでは、コンシュームされたメッセージは、メッセージを受信するフローの実行が正常に完了した場合にのみ肯定応答されます。 代わりに、フローの実行中にエラーが発生した場合、recoverStrategy に従ってセッションが回復され、メッセージが再配信されます。

メッセージ肯定応答についての詳細は、​「メッセージ肯定応答の処理」​を参照してください。

メッセージスループットの設定

より多くの処理能力が必要である場合、AMQP リスナーでは ​numberOfConsumers​ を設定することで、特定のリスナーがメッセージをコンシュームするために同時に使用するコンシューマーの数を指定できます。 デフォルトでは、各リスナーは 4 つのコンシューマーを使用し、これらの各コンシューマーによって個別のチャネルでメッセージが同時に生成されます。各コンシューマーは処理するメッセージを待つため、同時に最大 4 件のメッセージが転送されることになります。 メッセージの同時処理数を増やす必要がある場合は、リスナーの ​numberOfConsumers​ 設定を増やしてください。各コンシューマーは異なるチャネルを使用します。

MIME タイプおよび文字コード

AMQP Connector は、メッセージの ​contentType​ プロパティに基づいてメッセージの MIME タイプ (​contentType​) を自動的に判断します。ただし、このプロセスでは不十分な場合もあり、こういった場合にはメッセージのコンテンツを実際に把握しておく必要があります。

このような場合には、​contentType​ パラメーターを使用して、コンテンツタイプを特定の値に設定します。

同じプロセスが文字コードでも機能します。デフォルトでは、他の情報が提供されていない場合、コレクターはランタイムのデフォルトエンコードがメッセージのエンコードと一致するものと想定します。これは ​inboundEncoding​ パラメーターで設定できます。

AMQP リスナーでのキューの宣言

デフォルトでは、定義されたキューが存在しないと ​AMQP:QUEUE_NOT_FOUND​ エラーで ​consume​ 操作に失敗します。

キューを宣言する必要がある場合、キューを宣言するためにエンティティの定義を参照するか、インラインで定義する必要があります。

<amqp:listener config-ref="Amqp_Config"
	queueName="testQueue">
	<amqp:fallback-queue-definition
	  removalStrategy="SHUTDOWN"
	  exchangeToBind="echangeToBind" />
</amqp:listener>
xml

キューの定義では、パラメーター ​exchangeToBind​ を使用してバインドを作成できます。

キューは、高レベル要素として定義することもできます。

<amqp:queue-definition
  name="targetQueueDefinition"
  exchangeToBind="testExchange" />

<amqp:listener config-ref="Amqp_Config"
  queueName="testQueue"
  fallbackQueueDefinition="targetQueueDefinition">
xml

AMQP トポグラフィの変更を回避する方法

createFallbackQueue​ グローバル設定を指定して、代替キューの定義による AMQP トポグラフィの変更を回避できます。

受信メッセージへの応答

受信 AMQP メッセージで ​REPLY_TO​ プロパティが宣言されている場合、AMQP リスナーはメッセージが正常に処理された場合に自動的に応答を生成します。つまり、フロー実行中にエラーは発生しません。 その場合、フローが完了すると、応答が処理済みのメッセージプロパティで指定されたエクスチェンジにパブリッシュされます。エラーの場合、回復戦略が適用され、メッセージが正常に処理されるまで応答は ​REPLY_TO​ プロパティに送信されません。

エラーの場合の回復戦略

エラーが発生した場合、​recoverStrategy​ が適用されます。 デフォルトでは、​recoverStrategy​ が ​REQUEUE​ に設定されます。つまり、メッセージがコンシューム元の ​queue​ に送信され、他の AMQP コンシューマーがそのメッセージを取得する可能性があります。

たとえば、​NO_REQUEUE​ に設定されている場合、次のようになります。

<amqp:listener config-ref="Amqp_Config"
		queueName="testQueue"
		recoverStrategy="NO_REQUEUE">
	<amqp:fallback-queue-definition
		removalStrategy="SHUTDOWN"
		exchangeToBind="exchangeToBindToQueue" />
</amqp:listener>
xml

メッセージは、メッセージを取得したコンシューマーに直接送信されます。 NONE​ が ​recoverStrategy​ として定義されている場合、アクションは実行されません。

クラスター環境

デフォルトでは、AMQP リスナーはクラスター環境のすべてのノードからメッセージを受信します。他のソースと同様に、​primaryNodeOnly​ 属性を使用してこれを変更できます。

<amqp:listener config-ref="Amqp_Config"
	queueName="testQueue"
	recoverStrategy="NO_REQUEUE"
	primaryNodeOnly="true" />
xml