AMQP メッセージのコンシューム

AMQP Connector の ​consume​ 操作により、任意の AMQP キューからフローの任意の時点でメッセージをコンシュームする機能が提供されます。

メッセージのコンシューム

キューからメッセージをコンシュームするための構文は次のとおりです。

<amqp:consume config-ref="AMQP_config" queueName="#[vars.queue]"/>

この操作では、​queueName​ 値によって識別されるキューで最初に使用可能なメッセージをコンシュームし、​AmqpMessage​ に変換するため、次の構造が得られます。

  • ペイロードとしてのメッセージのコンテンツ

  • メッセージ属性に含まれるメッセージのメタデータ

メッセージは、デフォルトでは受信されるとすぐに肯定応答されます。何らかの処理を行った後でメッセージの肯定応答を制御する場合は、​ackMode​ を ​MANUAL​ に設定します。 メッセージ肯定応答操作についての詳細は、​「メッセージ肯定応答の処理」​を参照してください。

Consume 操作のデフォルト設定

デフォルトの AMQP グローバル設定要素 ​<amqp:config>​ は AMQP Listener ソース用に最適化されているため、この設定には Consume 操作に適したパラメーターがありません。

デフォルトの AMQP グローバル設定を Consume 操作で使用すると、次のエラーが返される場合があります。

com.mule.extensions.amqp.internal.connection.provider.GenericConnectionProvider.initialise:221 @415c7a8a ERROR
Consumer com.mule.extensions.amqp.internal.client.SingleMessageQueueingConsumer@741fd13b (amq.ctag-BsDMf9v86wa9v3e_mo1p8g) method handleDelivery for channel AMQChannel(amqp://xxxx@xxxx/xxxx,1) threw an exception for channel AMQChannel(amqp://xxxx@xxxx:5672/xxxx,1)
com.rabbitmq.client.AlreadyClosedException: channel is already closed due to clean channel shutdown; protocol method: #method<channel.close>(reply-code=200, reply-text=OK, class-id=0, method-id=0)
at com.rabbitmq.client.impl.AMQChannel.ensureIsOpen(AMQChannel.java:258)
at com.rabbitmq.client.impl.AMQChannel.transmit(AMQChannel.java:427)
at com.rabbitmq.client.impl.AMQChannel.transmit(AMQChannel.java:421)
at com.rabbitmq.client.impl.recovery.RecoveryAwareChannelN.basicReject(RecoveryAwareChannelN.java:114)
at com.rabbitmq.client.impl.recovery.AutorecoveringChannel.basicReject(AutorecoveringChannel.java:438)
at com.mule.extensions.amqp.internal.connection.channel.MuleAmqpChannel.basicReject(MuleAmqpChannel.java:383)
at com.mule.extensions.amqp.internal.client.SingleMessageQueueingConsumer.handleDelivery(SingleMessageQueueingConsumer.java:47)
at com.rabbitmq.client.impl.ConsumerDispatcher$5.run(ConsumerDispatcher.java:149)
at com.rabbitmq.client.impl.ConsumerWorkService$WorkPoolRunnable.run(ConsumerWorkService.java:104)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at org.mule.service.scheduler.internal.AbstractRunnableFutureDecorator.doRun(AbstractRunnableFutureDecorator.java:111)
at org.mule.service.scheduler.internal.RunnableFutureDecorator.run(RunnableFutureDecorator.java:54)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

通常、Consume 操作が最初のメッセージのみを処理し、残りのメッセージを破棄する一方で、ブローカーがキュー内のすべてのメッセージをリクエスターに送信するため、このエラーが発生します。この動作を回避するには、ユースケースに応じて次のパラメーターの 1 つまたは両方を変更します。

  • [Maximum Wait (最大待機)] パラメーターをクリーンアップする

    1. Studio で、フローの ​[Publish consume]​ 操作を選択します。

    2. [Maximum Wait (最大待機)]​ パラメーターの値を空のままにして、すべての値をクリーンアップします。

      XML エディターを使用している場合、​maximumWait​ パラメーターのすべての値をクリーンアップします。

  • [Prefetch Count (プリフェッチ数)] パラメーターを更新する

    1. Studio で、​[Global Elements (グローバル要素)]​ タブに移動します。

    2. [AMQP Config (Configuration) (AMQP 設定)]​ 設定を選択します。

    3. [Edit (編集)]​ をクリックします。

    4. [Quality of Service (サービス品質)]​ タブで、​[Prefetch Count (プリフェッチ数)]​ を ​1​ に設定します。

      XML エディターを使用している場合、AMQP グローバル設定要素 ​<amqp:config>​ の ​quality-of-service-config​ セクションで、​prefetchCount​ パラメーターを ​1​ に設定します。

メッセージの待機

デフォルトの最大待機時間は 10 秒に設定されています。指定した待機時間中にメッセージが受信できなければ、​AMQP:TIMEOUT​ エラーがスローされます。​maximumWait​ および ​maximumWaitUnit​ パラメーターを設定して、待機時間をカスタマイズできます。

到着するメッセージの無限待機時間を作成するには、​maximumWait​ 値を ​-1​ に設定します。この場合は ​TIMEOUT​ エラーは発生しません。

MIME タイプおよび文字コード

AMQP Connector は、メッセージの ​contentType​ プロパティに基づいてメッセージの MIME タイプ (​contentType​) を自動的に判断するように設計されています。ただし、この判断をできない場合もあり、こういった場合にはメッセージのコンテンツを実際に把握しておく必要があります。

このような場合には、​contentType​ パラメーターを使用して、コンテンツタイプを特定の値に設定します。

同じプロセスが文字コードでも機能します。デフォルトでは、他の情報が提供されていない場合、コレクターは Mule Runtime Engine のデフォルトエンコードがメッセージのエンコードと一致するものと想定します。これは ​encoding​ パラメーターで設定できます。

Consume 操作でのキューの宣言

デフォルトでは、定義されたキューが存在しないと ​AMQP:QUEUE_NOT_FOUND​ エラーで ​consume​ 操作に失敗します。

キューを宣言する必要がある場合、エンティティの定義を参照するか、インラインで定義する必要があります。

<amqp:consume config-ref="Amqp_Config" queueName="testQueue">
	<amqp:fallback-queue-definition removalStrategy="SHUTDOWN" exchangeToBind="exchangeToBindToQueue" />
</amqp:consume>

キューの定義では、パラメーター ​exchangeToBind​ を使用してバインドを作成できます。

キューは、高レベル要素として定義することもできます。

<amqp:queue-definition
	name="targetQueueDefinition"
	exchangeToBind="testExchange" />

<amqp:consume
	config-ref="AMQP_Config"
	queueName="testQueue"
	fallbackQueueDefinition="targetQueueDefinition">

AMQP トポグラフィの変更を回避

createFallbackQueue​ グローバル設定を指定して、代替キューの定義による AMQP トポグラフィの変更を回避できます。​「AMQP トポグラフィの変更を回避する方法」​を参照してください。

受信メッセージのメタデータ

前述のとおり、受信した各メッセージは次の 2 つで構成されています。

  • メッセージのコンテンツが含まれるペイロード

  • メッセージに関するメタデータが含まれる属性

このメタデータには、AMQP メッセージで使用できるすべての情報をマップする 4 つの部分があります。

  • エンベロープ

  • AckId

  • ヘッダー

  • プロパティ

属性の構造についての詳細は、​「AMQP リファレンス」​をご覧ください。