Flex Gateway新着情報
Governance新着情報
Monitoring API ManagerAMQP Connector の consume 操作により、任意の AMQP キューからフローの任意の時点でメッセージをコンシュームする機能が提供されます。
キューからメッセージをコンシュームするための構文は次のとおりです。
<amqp:consume config-ref="AMQP_config" queueName="#[vars.queue]"/>
この操作では、queueName 値によって識別されるキューで最初に使用可能なメッセージをコンシュームし、AmqpMessage に変換するため、次の構造が得られます。
ペイロードとしてのメッセージのコンテンツ
メッセージ属性に含まれるメッセージのメタデータ
メッセージは、デフォルトでは受信されるとすぐに肯定応答されます。何らかの処理を行った後でメッセージの肯定応答を制御する場合は、ackMode を MANUAL に設定します。
メッセージ肯定応答操作についての詳細は、「メッセージ肯定応答の処理」を参照してください。
デフォルトの AMQP グローバル設定要素 <amqp:config> は AMQP Listener ソース用に最適化されているため、この設定には Consume 操作に適したパラメーターがありません。
デフォルトの AMQP グローバル設定を Consume 操作で使用すると、次のエラーが返される場合があります。
com.mule.extensions.amqp.internal.connection.provider.GenericConnectionProvider.initialise:221 @415c7a8a ERROR
Consumer com.mule.extensions.amqp.internal.client.SingleMessageQueueingConsumer@741fd13b (amq.ctag-BsDMf9v86wa9v3e_mo1p8g) method handleDelivery for channel AMQChannel(amqp://xxxx@xxxx/xxxx,1) threw an exception for channel AMQChannel(amqp://xxxx@xxxx:5672/xxxx,1)
com.rabbitmq.client.AlreadyClosedException: channel is already closed due to clean channel shutdown; protocol method: #method<channel.close>(reply-code=200, reply-text=OK, class-id=0, method-id=0)
at com.rabbitmq.client.impl.AMQChannel.ensureIsOpen(AMQChannel.java:258)
at com.rabbitmq.client.impl.AMQChannel.transmit(AMQChannel.java:427)
at com.rabbitmq.client.impl.AMQChannel.transmit(AMQChannel.java:421)
at com.rabbitmq.client.impl.recovery.RecoveryAwareChannelN.basicReject(RecoveryAwareChannelN.java:114)
at com.rabbitmq.client.impl.recovery.AutorecoveringChannel.basicReject(AutorecoveringChannel.java:438)
at com.mule.extensions.amqp.internal.connection.channel.MuleAmqpChannel.basicReject(MuleAmqpChannel.java:383)
at com.mule.extensions.amqp.internal.client.SingleMessageQueueingConsumer.handleDelivery(SingleMessageQueueingConsumer.java:47)
at com.rabbitmq.client.impl.ConsumerDispatcher$5.run(ConsumerDispatcher.java:149)
at com.rabbitmq.client.impl.ConsumerWorkService$WorkPoolRunnable.run(ConsumerWorkService.java:104)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at org.mule.service.scheduler.internal.AbstractRunnableFutureDecorator.doRun(AbstractRunnableFutureDecorator.java:111)
at org.mule.service.scheduler.internal.RunnableFutureDecorator.run(RunnableFutureDecorator.java:54)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
通常、Consume 操作が最初のメッセージのみを処理し、残りのメッセージを破棄する一方で、ブローカーがキュー内のすべてのメッセージをリクエスターに送信するため、このエラーが発生します。この動作を回避するには、ユースケースに応じて次のパラメーターの 1 つまたは両方を変更します。
[Maximum Wait (最大待機)] パラメーターをクリーンアップする
Studio で、フローの [Publish consume] 操作を選択します。
[Maximum Wait (最大待機)] パラメーターの値を空のままにして、すべての値をクリーンアップします。
XML エディターを使用している場合、maximumWait パラメーターのすべての値をクリーンアップします。
[Prefetch Count (プリフェッチ数)] パラメーターを更新する
Studio で、[Global Elements (グローバル要素)] タブに移動します。
[AMQP Config (Configuration) (AMQP 設定)] 設定を選択します。
[Edit (編集)] をクリックします。
[Quality of Service (サービス品質)] タブで、[Prefetch Count (プリフェッチ数)] を 1 に設定します。
XML エディターを使用している場合、AMQP グローバル設定要素 <amqp:config> の quality-of-service-config セクションで、prefetchCount パラメーターを 1 に設定します。
デフォルトの最大待機時間は 10 秒に設定されています。指定した待機時間中にメッセージが受信できなければ、AMQP:TIMEOUT エラーがスローされます。maximumWait および maximumWaitUnit パラメーターを設定して、待機時間をカスタマイズできます。
到着するメッセージの無限待機時間を作成するには、maximumWait 値を -1 に設定します。この場合は TIMEOUT エラーは発生しません。
AMQP Connector は、メッセージの contentType プロパティに基づいてメッセージの MIME タイプ (contentType) を自動的に判断するように設計されています。ただし、この判断をできない場合もあり、こういった場合にはメッセージのコンテンツを実際に把握しておく必要があります。
このような場合には、contentType パラメーターを使用して、コンテンツタイプを特定の値に設定します。
同じプロセスが文字コードでも機能します。デフォルトでは、他の情報が提供されていない場合、コレクターは Mule Runtime Engine のデフォルトエンコードがメッセージのエンコードと一致するものと想定します。これは encoding パラメーターで設定できます。
デフォルトでは、定義されたキューが存在しないと AMQP:QUEUE_NOT_FOUND エラーで consume 操作に失敗します。
キューを宣言する必要がある場合、エンティティの定義を参照するか、インラインで定義する必要があります。
<amqp:consume config-ref="Amqp_Config" queueName="testQueue">
<amqp:fallback-queue-definition removalStrategy="SHUTDOWN" exchangeToBind="exchangeToBindToQueue" />
</amqp:consume>
キューの定義では、パラメーター exchangeToBind を使用してバインドを作成できます。
キューは、高レベル要素として定義することもできます。
<amqp:queue-definition
name="targetQueueDefinition"
exchangeToBind="testExchange" />
<amqp:consume
config-ref="AMQP_Config"
queueName="testQueue"
fallbackQueueDefinition="targetQueueDefinition">
createFallbackQueue グローバル設定を指定して、代替キューの定義による AMQP トポグラフィの変更を回避できます。「AMQP トポグラフィの変更を回避する方法」を参照してください。
前述のとおり、受信した各メッセージは次の 2 つで構成されています。
メッセージのコンテンツが含まれるペイロード
メッセージに関するメタデータが含まれる属性
このメタデータには、AMQP メッセージで使用できるすべての情報をマップする 4 つの部分があります。
エンベロープ
AckId
ヘッダー
プロパティ
属性の構造についての詳細は、「AMQP リファレンス」をご覧ください。