Flex Gateway新着情報
Governance新着情報
Monitoring API Managerメッセージの肯定応答 (ACK) は、メッセージが処理されたので再配信されないようにキューから削除する必要があることを Anypoint MQ ブローカーに通知します。メッセージの否定応答 (NACK) は、メッセージを処理せずに、任意のコンシューマーに再配信するためにキューに戻すようブローカーに通知します。
Anypoint MQ Connector には 3 つの肯定応答モードがあります。
Immediate (即時)
アプリケーションがメッセージを処理する前にコンシュームするときにそのメッセージの肯定応答を行うには、即時モードを使用します。
自動
フローの実行が正常に完了した場合にのみ受信メッセージの自動的な肯定応答を行うには、自動モードを使用します。
手動
メッセージの肯定応答を行うタイミングを決定するアプリケーションロジックにすべての責任を委譲するには、手動モードを使用します。
IMMEDIATE
肯定応答モードを使用して、アプリケーションがメッセージを処理する前にコンシュームするときにそのメッセージの肯定応答を行います。
Anypoint MQ はメッセージの肯定応答を行うと、キューからそのメッセージを削除し、処理中にエラーが発生した場合は再配信しません。
アプリケーションが肯定応答とディスパッチの間で再起動した場合、メッセージはキューから削除されているため、再度使用できない可能性があります。
処理されるまでメッセージが削除されないようにするには、代わりに [AUTO (自動)]
または [MANUAL (手動)]
肯定応答モードを使用します。
IMMEDIATE
モードを使用したメッセージの処理中にエラーが発生した場合、Anypoint MQ によって ANYPOINT-MQ:ACKING
エラーがスローされます。
メッセージを失うことなくエラーを管理するには、デッドレターキューを使用します。
AUTO
肯定応答モードを使用して、フローの実行が正常に完了した場合にのみ受信メッセージの自動的な肯定応答を行います。
フローの実行中にエラーが発生すると、メッセージは否定応答 (NACK) され、再配信のためにキューに戻されます。
MANUAL
肯定応答モードを使用して、アプリケーションロジックにメッセージの肯定応答のすべての責任を委譲します。
この設定を使用すると、受信したすべてのメッセージでは ackToken
値が含まれています。メッセージはそれによって特定の接続で一意に識別されます。
この ackToken
値を保存して、後で ACK または NACK 操作への入力として使用します。
AUTO
または MANUAL
肯定応答モードを使用する場合、acknowledgementTimeout
パラメーターを使用して、自動的にキューに戻る前にメッセージが肯定応答を待機する時間を制御します。
acknowledgementTimeout
タイマーはメッセージがコンシュームされたときに開始されます。
acknowledgementTimeout
が設定されていない場合、Anypoint MQ Connector では 2 分が使用されます。
別の TTL が必要な場合は、サブスクライバーの acknowledgementTimeout
パラメーターでそれを明示的に定義する必要があります。
acknowledgementTimeout
を使用する場合、外部システムの遅延や高負荷によるアプリケーションのバックプレッシャーなどの予期しない遅延の時間を含む、予想されるアプリケーション時間処理を考慮してください。たとえば、コンシュームされたメッセージが 10 秒で処理されると予想される場合は、acknowledgementTimeout
を 15 秒以上に設定します。
Subscriber ソースで このため、 |
サブスクライバーがメッセージをコンシュームした後にタイマーが起動し、肯定応答を待機中のブローカーでメッセージがインフライトである期間を示します。acknowledgementTimeout
制限に達する前にフロー処理が成功または失敗で終了しない場合、メッセージは再配信のために自動的にキューに戻されます。
Subscriber または Consume 操作で MANUAL
肯定応答モードを使用してメッセージをコンシュームしている場合、アプリケーションは acknowledgementTimeout
制限に達する前にメッセージ ackToken
を使用して、ACK または NACK 操作を実行する必要があります。タイムアウトの前に指定したトークンでどちらの操作も実行されないと、メッセージは再配信のために自動的にキューに戻されます。タイムアウト制限後にメッセージの肯定応答を試行すると、期限切れのトークンが原因で失敗します。
メッセージの肯定応答 (ACK) は、メッセージが処理されたので再配信されないようにキューから削除する必要があることをブローカーに通知します。
手動モードを使用して ACK 操作を実行する場合、肯定応答トークン (ackToken
) を使用してメッセージを識別する必要があります。これはメッセージ属性の ackToken
項目にあります。
<flow name="consumerWithManualAck">
<scheduler>
<scheduling-strategy >
<fixed-frequency />
</scheduling-strategy>
</scheduler>
<!-- Consume a message -->
<anypoint-mq:consume destination="${destination}"
acknowledgementMode="MANUAL"
config-ref="Anypoint_MQ_Config"/>
<!-- Save the ackToken for future acknowledgment -->
<set-variable variableName="currentAckToken"
value="#[attributes.ackToken]"/>
<!-- Do the required processing of the message -->
<flow-ref name="process-message"/>
<!-- Use the ackToken to ACK the message -->
<anypoint-mq:ack ackToken="#[vars.currentAckToken]"
config-ref="Anypoint_MQ_Config" />
</flow>
xml
メッセージの処理中に void でない操作が起動された場合、Mule メッセージのペイロードと属性が変更されます。処理後に ACK 操作を実行するには、変数に ackToken
を保存する必要があります。
後で使用するために属性を保存するには、target
および targetValue
パラメーターを使用して変数にメッセージ全体を保存します。
<flow name="consumerWithManualAck">
<scheduler>
<scheduling-strategy >
<fixed-frequency />
</scheduling-strategy>
</scheduler>
<!-- Consume a message -->
<anypoint-mq:consume destination="${destination}"
acknowledgementMode="MANUAL"
config-ref="Anypoint_MQ_Config"
target="mqMessage"
targetValue="#[message]"/>
<!--Do any message processing-->
<jms:publish-consume destination="#[vars.mqMessage.attributes.targetDestination]"
config-ref="JMS_Config">
<jms:message>
<jms:body>#[vars.mqMessage.payload]</jms:body>
</jms:message>
</jms:publish-consume>
<!-- Use the ackToken to ACK the message -->
<anypoint-mq:ack ackToken="#[vars.mqMessage.attributes.ackToken]"
config-ref="Anypoint_MQ_Config" />
</flow>
xml
メッセージの否定応答 (NACK) は、メッセージが正常に処理されなかったので、任意のコンシューマーに再配信するためにキューに戻すようブローカーに通知します。
肯定応答トークン ackToken
は、NACK 操作を実行するときに使用する必要があるメッセージの一意の識別子です。これはメッセージ属性の ackToken
項目にあります。
<flow name="consumerWithManualAck">
<scheduler>
<scheduling-strategy >
<fixed-frequency />
</scheduling-strategy>
</scheduler>
<!-- Consume a message -->
<anypoint-mq:consume destination="${destination}"
acknowledgementMode="MANUAL"
config-ref="Anypoint_MQ_Config"
target="mqMessage"
targetValue="#[message]"/>
<!--Do message processing -->
<logger message="#[payload]"/>
<!-- Use the ackToken to NACK the message -->
<anypoint-mq:nack ackToken="#[vars.mqMessage.attributes.ackToken]"
config-ref="Anypoint_MQ_Config" />
</flow>
xml