Anypoint MQ ACK および NACK 操作 - Mule 4

メッセージの肯定応答 (ACK) は、メッセージが処理されたので再配信されないようにキューから削除する必要があることを Anypoint MQ ブローカーに通知します。メッセージの否定応答 (NACK) は、メッセージを処理せずに、任意のコンシューマーに再配信するためにキューに戻すようブローカーに通知します。

肯定応答モード

Anypoint MQ Connector には 3 つの肯定応答モードがあります。

  • Immediate (即時)

    アプリケーションがメッセージを処理する前にコンシュームするときにそのメッセージの肯定応答を行うには、即時モードを使用します。

  • 自動

    フローの実行が正常に完了した場合にのみ受信メッセージの自動的な肯定応答を行うには、自動モードを使用します。

  • 手動

    メッセージの肯定応答を行うタイミングを決定するアプリケーションロジックにすべての責任を委譲するには、手動モードを使用します。

即時肯定応答

IMMEDIATE​ 肯定応答モードを使用して、アプリケーションがメッセージを処理する前にコンシュームするときにそのメッセージの肯定応答を行います。

メッセージ受信時の自動的な肯定応答は、処理中にエラーが発生した場合にメッセージを再配信しないことを意味します。メッセージを失うことなくエラーを管理するには、デッドレターキューを使用します。

IMMEDIATE​ モードを使用したメッセージの処理中にエラーが発生した場合、​ANYPOINT-MQ:ACKING​ エラーがスローされます。

自動肯定応答

AUTO​ 肯定応答モードを使用して、フローの実行が正常に完了した場合にのみ受信メッセージの自動的な肯定応答を行います。

フローの実行中にエラーが発生すると、メッセージは否定応答 (NACK) され、再配信のためにキューに戻されます。

手動肯定応答

MANUAL​ 肯定応答モードを使用して、アプリケーションロジックにメッセージの肯定応答のすべての責任を委譲します。

この設定を使用すると、受信したすべてのメッセージでは ​ackToken​ 値が含まれています。メッセージはそれによって特定の接続で一意に識別されます。

この ​ackToken​ 値を保存して、後で ACK または NACK 操作への入力として使用します。

肯定応答タイムアウト

AUTO​ または ​MANUAL​ 肯定応答モードを使用する場合、​acknowledgementTimeout​ パラメーターを使用して、自動的にキューに戻る前にメッセージが肯定応答を待機する時間を制御します。 acknowledgementTimeout​ タイマーはメッセージがコンシュームされたときに開始されます。

acknowledgementTimeout​ が設定されていない場合、Anypoint MQ Connector では 2 分が使用されます。 別の TTL が必要な場合は、サブスクライバーの ​acknowledgementTimeout​ パラメーターでそれを明示的に定義する必要があります。

acknowledgementTimeout​ を使用する場合、外部システムの遅延や高負荷によるアプリケーションのバックプレッシャーなどの予期しない遅延の時間を含む、予想されるアプリケーション時間処理を考慮してください。たとえば、コンシュームされたメッセージが 10 秒で処理されると予想される場合は、​acknowledgementTimeout​ を 15 秒以上に設定します。

Subscriber ソースで ​prefetch​ モードを使用する場合、Anypoint MQ は、​acknowledgementTimeout​ で指定された有効期限 (時間) を順守しようとします。 ただし、メッセージがバッファに保持されている時間がタイムアウト値の 80% に達すると、Anypoint MQ は有効期限タイムアウトを延長します。

このため、​acknowledgementTimeout​ を順守する必要がある場合は、​prefetch​ モードを使用しないでください。

肯定応答タイムアウト — AUTO 肯定応答モード

サブスクライバーがメッセージをコンシュームした後にタイマーが起動し、肯定応答を待機中のブローカーでメッセージがインフライトである期間を示します。​acknowledgementTimeout​ 制限に達する前にフロー処理が成功または失敗で終了しない場合、メッセージは再配信のために自動的にキューに戻されます。

肯定応答タイムアウト — MANUAL 肯定応答モード

Subscriber または Consume 操作で ​MANUAL​ 肯定応答モードを使用してメッセージをコンシュームしている場合、アプリケーションは ​acknowledgementTimeout​ 制限に達する前にメッセージ ​ackToken​ を使用して、ACK または NACK 操作を実行する必要があります。タイムアウトの前に指定したトークンでどちらの操作も実行されないと、メッセージは再配信のために自動的にキューに戻されます。タイムアウト制限後にメッセージの肯定応答を試行すると、期限切れのトークンが原因で失敗します。

ACK 操作 — MANUAL 肯定応答モード

メッセージの肯定応答 (ACK) は、メッセージが処理されたので再配信されないようにキューから削除する必要があることをブローカーに通知します。

手動モードを使用して ACK 操作を実行する場合、肯定応答トークン (​ackToken​) を使用してメッセージを識別する必要があります。これはメッセージ属性の ​ackToken​ 項目にあります。

<flow name="consumerWithManualAck">
    <scheduler>
        <scheduling-strategy >
          <fixed-frequency />
        </scheduling-strategy>
    </scheduler>

    <!-- Consume a message -->
    <anypoint-mq:consume destination="${destination}"
         acknowledgementMode="MANUAL"
            config-ref="Anypoint_MQ_Config"/>

    <!-- Save the ackToken for future acknowledgment -->
    <set-variable variableName="currentAckToken"
       value="#[attributes.ackToken]"/>

    <!-- Do the required processing of the message -->
    <flow-ref name="process-message"/>

    <!-- Use the ackToken to ACK the message -->
    <anypoint-mq:ack ackToken="#[vars.currentAckToken]"
         config-ref="Anypoint_MQ_Config" />
</flow>

メッセージの処理中に void でない操作が起動された場合、Mule メッセージのペイロードと属性が変更されます。処理後に ACK 操作を実行するには、変数に ​ackToken​ を保存する必要があります。

後で使用するために属性を保存するには、​target​ および ​targetValue​ パラメーターを使用して変数にメッセージ全体を保存します。

<flow name="consumerWithManualAck">
      <scheduler>
        <scheduling-strategy >
          <fixed-frequency />
        </scheduling-strategy>
      </scheduler>

    <!-- Consume a message -->
    <anypoint-mq:consume destination="${destination}"
        acknowledgementMode="MANUAL"
        config-ref="Anypoint_MQ_Config"
        target="mqMessage"
        targetValue="#[message]"/>


    <!--Do any message processing-->
    <jms:publish-consume destination="#[vars.mqMessage.attributes.targetDestination]"
        config-ref="JMS_Config">
        <jms:message>
            <jms:body>#[vars.mqMessage.payload]</jms:body>
        </jms:message>
    </jms:publish-consume>

    <!-- Use the ackToken to ACK the message -->
    <anypoint-mq:ack ackToken="#[vars.mqMessage.attributes.ackToken]"
        config-ref="Anypoint_MQ_Config" />
</flow>

NACK 操作

メッセージの否定応答 (NACK) は、メッセージが正常に処理されなかったので、任意のコンシューマーに再配信するためにキューに戻すようブローカーに通知します。

肯定応答トークン ​ackToken​ は、NACK 操作を実行するときに使用する必要があるメッセージの一意の識別子です。これはメッセージ属性の ​ackToken​ 項目にあります。

<flow name="consumerWithManualAck">
    <scheduler>
      <scheduling-strategy >
        <fixed-frequency />
      </scheduling-strategy>
    </scheduler>

    <!-- Consume a message -->
    <anypoint-mq:consume destination="${destination}"
           acknowledgementMode="MANUAL"
           config-ref="Anypoint_MQ_Config"
           target="mqMessage"
          targetValue="#[message]"/>

    <!--Do message processing -->
    <logger message="#[payload]"/>

    <!-- Use the ackToken to NACK the message -->
    <anypoint-mq:nack ackToken="#[vars.mqMessage.attributes.ackToken]"
    config-ref="Anypoint_MQ_Config" />
</flow>