新規メッセージのリスン - Mule 4

AMQP 用 Anypoint コネクタ (AMQP コネクタ) のリスナソースにより、メッセージが AMQP キューに到着した時点でコンシュームできるようになります。

新規メッセージのリスン

キューからの新規メッセージをリスンするための構文は次のとおりです。

<amqp:listener config-ref="config" queueName="targetQueue"/>

このソースは、queueName​ パラメータによって識別されるキューの新しいメッセージをリスンし、AMQP メッセージがキューで使用できるようになるたびに AmqpMessage​ を返します。

AmqpMessage​ には以下が含まれます。

  • メッセージのペイロードとしてのコンテンツ。

  • メッセージ属性に含まれるメッセージのメタデータ。

デフォルトでは、コンシュームされたメッセージは、メッセージを受信するフローの実行が正常に完了した場合にのみ肯定応答されます。 代わりに、フローの実行中にエラーが発生した場合、recoverStrategy に従ってセッションが回復され、メッセージが再配信されます。

メッセージ肯定応答についての詳細は、「メッセージ肯定応答の処理」​を参照してください。

メッセージスループットの設定

より多くの処理能力が必要である場合、AMQP リスナでは numberOfConsumers​ を設定することで、特定のリスナがメッセージをコンシュームするために同時に使用するコンシューマの数を指定できます。 デフォルトでは、各リスナは 4 つのコンシューマを使用し、これらの各コンシューマによって個別のチャネルでメッセージが同時に生成されます。各コンシューマは処理するメッセージを待つため、同時に最大 4 件のメッセージが転送されることになります。 メッセージの同時処理数を増やす必要がある場合は、リスナの numberOfConsumers​ 設定を増やしてください。各コンシューマは異なるチャネルを使用します。

MIME タイプおよび文字コード

AMQP コネクタは、メッセージの contentType​ プロパティに基づいてメッセージの MIME タイプ (contentType​) を自動的に判断します。ただし、このプロセスでは不十分な場合もあり、こういった場合にはメッセージのコンテンツを実際に把握しておく必要があります。

このような場合には、contentType​ パラメータを使用して、コンテンツタイプを特定の値に設定します。

同じプロセスが文字コードでも機能します。デフォルトでは、他の情報が提供されていない場合、コレクタはランタイムのデフォルトエンコードがメッセージのエンコードと一致するものと想定します。これは inboundEncoding​ パラメータで設定できます。

AMQP リスナでのキューの宣言

デフォルトでは、定義されたキューが存在しないと AMQP:QUEUE_NOT_FOUND​ エラーで consume​ 操作に失敗します。

キューを宣言する必要がある場合、キューを宣言するためにエンティティの定義を参照するか、インラインで定義する必要があります。

<amqp:listener config-ref="Amqp_Config"
	queueName="testQueue">
	<amqp:fallback-queue-definition
	  removalStrategy="SHUTDOWN"
	  exchangeToBind="echangeToBind" />
</amqp:listener>

キューの定義では、パラメータ exchangeToBind​ を使用してバインドを作成できます。

キューは、高レベル要素として定義することもできます。

<amqp:queue-definition
  name="targetQueueDefinition"
  exchangeToBind="testExchange" />

<amqp:listener config-ref="Amqp_Config"
  queueName="testQueue"
  fallbackQueueDefinition="targetQueueDefinition">

AMQP トポグラフィーの変更を回避する方法

createFallbackQueue​ グローバル設定を指定して、代替キューの定義による AMQP トポグラフィーの変更を回避できます。

受信メッセージへの応答

受信 AMQP メッセージで REPLY_TO​ プロパティが宣言されている場合、AMQP リスナは メッセージが正常に処理された場合に自動的に応答を生成します。つまり、フロー実行中にエラーは発生しません。 その場合、フローが完了すると、応答が処理済みのメッセージプロパティで指定されたエクスチェンジにパブリッシュされます。エラーの場合、回復戦略が適用され、メッセージが正常に処理されるまで応答は REPLY_TO​ プロパティに送信されません。

エラーの場合の回復戦略

エラーが発生した場合、recoverStrategy​ が適用されます。 デフォルトでは、recoverStrategy​ が REQUEUE​ に設定されます。つまり、メッセージがコンシューム元の queue​ に送信され、他の AMQP コンシューマがそのメッセージを取得する可能性があります。

たとえば、NO_REQUEUE​ に設定されている場合、次のようになります。

<amqp:listener config-ref="Amqp_Config"
		queueName="testQueue"
		recoverStrategy="NO_REQUEUE">
	<amqp:fallback-queue-definition
		removalStrategy="SHUTDOWN"
		exchangeToBind="exchangeToBindToQueue" />
</amqp:listener>

メッセージは、メッセージを取得したコンシューマに直接送信されます。 NONE​ が recoverStrategy​ として定義されている場合、アクションは実行されません。

クラスタ環境

デフォルトでは、AMQP リスナはクラスタ環境のすべてのノードからメッセージを受信します。他のソースと同様に、primaryNodeOnly​ 属性を使用してこれを変更できます。

<amqp:listener config-ref="Amqp_Config"
	queueName="testQueue"
	recoverStrategy="NO_REQUEUE"
	primaryNodeOnly="true" />

Was this article helpful?

💙 Thanks for your feedback!