メッセージ肯定応答

Anypoint MQ コネクタは、さまざまな肯定応答設定を提供します。

即時肯定応答

anypoint-mq:subscriber または anypoint-mq:consume のいずれかの操作で acknowledgementModeIMMEDIATE に設定すると、メッセージがコンシュームされたらすぐに、アプリケーションによってメッセージの処理が行われる前に自動的に肯定応答されます。 受信したらすぐに自動的にメッセージが肯定応答されるということは、そのメッセージの処理中にエラーが発生しても再配信されないことを意味します。考えられるメッセージエラーを処理するには、メッセージを失わずにエラーを処理するデッドレターキューを作成するのと同様にアプリケーションロジックを処理する必要があります。

自動肯定応答

anypoint-mq:subscriber コンポーネントで AUTO acknowledgementMode を使用すると、フロー実行が正常に完了した場合に限り、受信したメッセージが自動肯定応答されます。 フローの実行中にエラーが発生すると、実行は完了する前に終了し、メッセージは否定応答され、再配信のためにキューに戻されます。

subscriber または consume のいずれかの操作でメッセージがコンシュームされた後、acknowledgementTimeout が始まります。これは、肯定応答の発生を待機中のブローカーでメッセージがインフライトである期間を示します。この期間が経過した後、メッセージは再配信のために自動的にキューに戻されます。 タイムアウトの期限が切れる前に、メッセージで ACK または NACK のいずれかを実行する必要があります。

subscriber によってメッセージがコンシュームされた後、acknowledgementTimeout が始まります。これは、肯定応答の発生を待機中のブローカーでメッセージがインフライトである期間を示します。この期間が経過する前にフロー処理が終了しない場合、メッセージは再配信のために自動的にキューに戻されます。

手動肯定応答

MANUAL acknowledgementMode は、メッセージで ACK を実行する責任をすべてアプリケーションロジックに委任します。

この設定を使用すると、subscriber または consume のいずれかの操作で受信したすべてのメッセージでは Mule メッセージ attributesAnypointMQMessageContext が含まれています。メッセージはそれによって特定の接続で一意に識別されます。

subscriber または consume のいずれかの操作でメッセージがコンシュームされた後、acknowledgementTimeout が始まります。これは、肯定応答の発生を待機中のブローカーでメッセージがインフライトである期間を示します。この期間が経過した後、メッセージは再配信のために自動的にキューに戻されます。 タイムアウトの期限が切れる前に、メッセージで ACK または NACK のいずれかを実行する必要があります。

ACK の実行

メッセージの肯定応答は、メッセージが処理されたのでキューから削除するようブローカーに命令します。 AnypointMQMessageContextanypoint-mq:ack 操作に渡すメッセージを識別します。

<flow name="consumerWithManualAck">
    <anypoint-mq:consume
       destination="${destination}"
       acknowledgementMode="MANUAL"
       config-ref="AMQ_Config"/>

    <!-- Process messages without modifying the payload-->
    <jms:publish config="JMS_config" destination="${bridgedDestination}">

    <anypoint-mq:ack messageContext="#[attributes]" config-ref="AMQ_Config"/>
</flow>
MQ メッセージの処理中に void でない操作が起動された場合、Mule メッセージのペイロードと属性が変更されます。処理後に ACK を行うには、変数に元の属性を保存する必要があります。

後で使用するために属性を保存するには、target および targetValue パラメータを使用して変数にメッセージ全体を保存すると便利です。

<flow name="consumerWithManualAck">
    <anypoint-mq:consume destination="${destination}"
                 acknowledgementMode="MANUAL"
                 config-ref="AMQ_Config"
                 target="mqMessage"
                 targetValue="#[message]"/>

    <!--Do message processing changing the payload-->
    <http:request method="POST" path="/invoicesProcessing" config-ref="httpRequestConfig">
            <http:body>#[vars.mqMessage.payload]</http:body>
    </http:request>

    <anypoint-mq:ack messageContext="#[vars.mqMessage.attributes]" config-ref="AMQ_Config"/>
</flow>

NACK の実行

メッセージの否定応答は、メッセージが処理されなかったので、任意のコンシューマに再配信するためにキューに戻すようブローカーに命令します。 メッセージを識別する AnypointMQMessageContextanypoint-mq:nack 操作に渡されます。

<flow name="consumerWithManualAck">
    <anypoint-mq:consume
                 destination="${destination}"
                 acknowledgementMode="MANUAL"
                 config-ref="AMQ_Config"/>

    <!--Do message processing-->
    <logger message="#[payload]">

    <anypoint-mq:nack messageContext="#[attributes]" config-ref="AMQ_Config"/>
</flow>

Was this article helpful?

💙 Thanks for your feedback!

Edit on GitHub