Flex Gateway新着情報
Governance新着情報
Monitoring API ManagerAMQP 用 Anypoint Connector (AMQP Connector) のリスナーソースにより、メッセージが AMQP キューに到着した時点でコンシュームできるようになります。
キューからの新規メッセージをリスンするための構文は次のとおりです。
<amqp:listener config-ref="config" queueName="targetQueue"/>
xml
このソースは、queueName
パラメーターによって識別されるキューの新しいメッセージをリスンし、AMQP メッセージがキューで使用できるようになるたびに AmqpMessage
を返します。
AmqpMessage
には以下が含まれます。
メッセージのペイロードとしてのコンテンツ。
メッセージ属性に含まれるメッセージのメタデータ。
デフォルトでは、コンシュームされたメッセージは、メッセージを受信するフローの実行が正常に完了した場合にのみ肯定応答されます。 代わりに、フローの実行中にエラーが発生した場合、recoverStrategy に従ってセッションが回復され、メッセージが再配信されます。
メッセージ肯定応答についての詳細は、「メッセージ肯定応答の処理」を参照してください。
より多くの処理能力が必要である場合、AMQP リスナーでは numberOfConsumers
を設定することで、特定のリスナーがメッセージをコンシュームするために同時に使用するコンシューマーの数を指定できます。
デフォルトでは、各リスナーは 4 つのコンシューマーを使用し、これらの各コンシューマーによって個別のチャネルでメッセージが同時に生成されます。各コンシューマーは処理するメッセージを待つため、同時に最大 4 件のメッセージが転送されることになります。
メッセージの同時処理数を増やす必要がある場合は、リスナーの numberOfConsumers
設定を増やしてください。各コンシューマーは異なるチャネルを使用します。
AMQP Connector は、メッセージの contentType
プロパティに基づいてメッセージの MIME タイプ (contentType
) を自動的に判断します。ただし、このプロセスでは不十分な場合もあり、こういった場合にはメッセージのコンテンツを実際に把握しておく必要があります。
このような場合には、contentType
パラメーターを使用して、コンテンツタイプを特定の値に設定します。
同じプロセスが文字コードでも機能します。デフォルトでは、他の情報が提供されていない場合、コレクターはランタイムのデフォルトエンコードがメッセージのエンコードと一致するものと想定します。これは inboundEncoding
パラメーターで設定できます。
デフォルトでは、定義されたキューが存在しないと AMQP:QUEUE_NOT_FOUND
エラーで consume
操作に失敗します。
キューを宣言する必要がある場合、キューを宣言するためにエンティティの定義を参照するか、インラインで定義する必要があります。
<amqp:listener config-ref="Amqp_Config"
queueName="testQueue">
<amqp:fallback-queue-definition
removalStrategy="SHUTDOWN"
exchangeToBind="echangeToBind" />
</amqp:listener>
xml
キューの定義では、パラメーター exchangeToBind
を使用してバインドを作成できます。
キューは、高レベル要素として定義することもできます。
<amqp:queue-definition
name="targetQueueDefinition"
exchangeToBind="testExchange" />
<amqp:listener config-ref="Amqp_Config"
queueName="testQueue"
fallbackQueueDefinition="targetQueueDefinition">
xml
createFallbackQueue
グローバル設定を指定して、代替キューの定義による AMQP トポグラフィの変更を回避できます。
受信 AMQP メッセージで REPLY_TO プロパティが宣言されている場合、AMQP リスナーはメッセージが正常に処理された場合に自動的に応答を生成します。つまり、フロー実行中にエラーは発生しません。 その場合、フローが完了すると、応答が処理済みのメッセージプロパティで指定されたエクスチェンジにパブリッシュされます。エラーの場合、回復戦略が適用され、メッセージが正常に処理されるまで応答は REPLY_TO プロパティに送信されません。
エラーが発生した場合、recoverStrategy
が適用されます。
デフォルトでは、recoverStrategy
が REQUEUE
に設定されます。つまり、メッセージがコンシューム元の queue
に送信され、他の AMQP コンシューマーがそのメッセージを取得する可能性があります。
たとえば、NO_REQUEUE
に設定されている場合、次のようになります。
<amqp:listener config-ref="Amqp_Config"
queueName="testQueue"
recoverStrategy="NO_REQUEUE">
<amqp:fallback-queue-definition
removalStrategy="SHUTDOWN"
exchangeToBind="exchangeToBindToQueue" />
</amqp:listener>
xml
メッセージは、メッセージを取得したコンシューマーに直接送信されます。
NONE
が recoverStrategy
として定義されている場合、アクションは実行されません。
デフォルトでは、AMQP リスナーはクラスター環境のすべてのノードからメッセージを受信します。他のソースと同様に、primaryNodeOnly
属性を使用してこれを変更できます。
<amqp:listener config-ref="Amqp_Config"
queueName="testQueue"
recoverStrategy="NO_REQUEUE"
primaryNodeOnly="true" />
xml