メッセージの肯定応答

このガイドは Mule 3 および Studio 6 にのみ適用されます。Mule 4 の肯定応答情報については、Anypoint MQ ACK and NACK Operations - Mule 4を参照してください。

Anypoint MQ では、Anypoint MQ コネクタを使用して Anypoint Studio でメッセージを処理する方法を決定できます。

次の 2 つの応答が発生する可能性があります。

  • 肯定応答 (ack) - アプリケーションがメッセージを受信したことを示します。

  • 否定応答 (nack) - アプリケーションが現在のメッセージを拒否することを示します。Anypoint MQ は、別のアプリケーションがメッセージを受信できるように、キューにメッセージを返します。

Anypoint MQ コネクタには、Studio アプリケーションでこれらの状態をグローバルまたはローカルに設定するためのツールが用意されています。

  • Anypoint MQ コネクタ - Mule 4 コネクタでは、Ack および Nack コンポーネントを使用するときに、AnypointMQMessageContext オブジェクトを渡す必要があります。このオブジェクトは、サブスクライバまたはコンシューマコンポーネントに対する Anypoint MQ コネクタ属性内にあります。サブスクライバまたはコンシューマに続く Ack または Nack コンポーネントに #[attributes] を挿入すると、Anypoint MQ コネクタは自動的に AnypointMQMessageContext オブジェクトを渡します。

  • 他の操作によって #[attributes] が新しい値で上書きされる可能性があるため、サブスクライバまたはコンシューマと Ack/Nack 間のフローにそのような演算子が存在する場合は、サブスクライバまたはコンシューマの属性を変数に保存して属性が失われないようにします。

フローの例:

<flow name="flow">
    <anypoint-mq:subscriber doc:name="Subscriber" config-ref="Anypoint_MQ_Default_subscriber" destination="test"/>
    <logger level="INFO" doc:name="Logger" message="Received MQ msg: #[payload]"/>
    <anypoint-mq:ack doc:name="Ack" config-ref="Anypoint_MQ_Default_subscriber" messageContext="#[attributes]"/>
</flow>

グローバルおよびローカル状態

Anypoint MQ コネクタは、Studioアプリケーションがキューからメッセージを受信する方法を指定するためのグローバルおよびローカル状態を提供します。

グローバル状態

Studio の [Global Element Properties (グローバル要素のプロパティ)] メニューを使用して、フロー内のすべてのメッセージを処理する戦略を指定できます。グローバル状態には次の選択肢があります。

  • AUTO (自動) - Anypoint MQ コネクタは、例外のキャッチ戦略 (ACK) と例外のロールバック戦略 (NACK) のどちらを検出したかに応じて、フローの最後に Ack または Nack を自動的に送信します。「Anypoint Studio での自動肯定応答フローの作成」の例を参照してください。[AUTO (自動)] は Mule 4 コネクタのデフォルトです。

    考えられる結果は 2 つあります。

    • ACK - フローが成功するか、例外のキャッチ戦略が発生します。次のコードは、例外のキャッチ戦略を示しています。

      <catch-exception-strategy doc:name="Catch Exception Strategy"
          when="exception.causedBy(UnsupportedOperationException)">
          <logger message="Will ACK" level="ERROR" doc:name="Logger"/>
      </catch-exception-strategy>
    • NACK - 例外のロールバック戦略が発生します。

      <rollback-exception-strategy doc:name="Rollback Exception Strategy">
          <logger message="Will NACK" level="ERROR" doc:name="Logger"/>
      </rollback-exception-strategy>
  • MANUAL (手動) - アプリケーションは Anypoint MQ コネクタのローカル状態を使用して独自の Ack または Nack を送信することを示します。Anypoint MQ コネクタはアクションを自動的に提供しません。「Anypoint Studio での手動肯定応答フローの作成」の例を参照してください。

  • NONE (なし) - アプリケーションがメッセージを受信し、Anypoint MQ コネクタはそのメッセージをすぐに肯定応答して削除します。この状態は、メッセージが適切に使用されない場合はメッセージが失われるリスクがありますが、通常は、後続の各メッセージがその前のメッセージに関する詳細を提供するニュースフィードなど、メッセージを常に更新するフローがある場合に使用されます。この状態はリスクがありますが、コーディングが簡単です。「[NONE (なし)] 肯定応答モードの設定」の例を参照してください。

    Anypoint MQ Mule コネクタ - Mule 4 では、[NONE (なし)] は [IMMEDIATE (即時)] と呼ばれますが、機能は同じです。

    グローバル状態は [Acknowledgment Mode (肯定応答モード)] 項目で設定できます。

    mq global mode set

ローカル状態

アプリケーションがメッセージを受信するか拒否するかに応じて [Operation (操作)] 項目が [ack] または [nack] に設定された Anypoint MQ コネクタインスタンスを手動でフローに追加できます。

Anypoint MQ コネクタの [Operation (操作)] 項目を使用して、ローカル状態を設定できます。

mq connector ops

Anypoint Studio での [AUTO (自動)] 肯定応答フローの作成

[AUTO (自動)] 状態を使用している場合、choice-exception-strategy を設定すると、選択した例外に応じて Anypoint MQ コネクタが ack または nack を送信します。次のコードで [AUTO (自動)] 状態の使用方法を示します。

次のコードは Studio のフローを示しています。

MQ 肯定応答モードのフロー

この例では、次の 2 つのフローがあります。

  • autoHttp - 0.0.0.0:8081 でリスンする HTTP コネクタをセットアップし、メッセージをパブリッシュするように Anypoint MQ コネクタを設定し、肯定応答モードを [AUTO (自動)] に設定します。

  • autoMq - メッセージをコンシュームするように Anypoint MQ コネクタを設定し、Java クラス RandomError を使用して、選択に応じて自動機能が肯定応答または否定応答を送信するための考えられる例外を作成します。ロガーは、コンソールに ACK または NACK の選択を表示します。

    フローの [Error handling (エラー処理)] セクションには、Anypoint MQ コネクタが ACK を送信する [Catch Exception Strategy (例外のキャッチ戦略)]、および Anypoint MQ コネクタが NACK を送信する [Rollback Exception Strategy (例外のロールバック戦略)] が含まれます。

[AUTO (自動)] の XML コード

[AUTO (自動)] 肯定応答モードの XML コードは次のとおりです。

<?xml version="1.0" encoding="UTF-8"?>
<mule xmlns:http="http://www.mulesoft.org/schema/mule/http"
xmlns:anypoint-mq="http://www.mulesoft.org/schema/mule/anypoint-mq"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:dw="http://www.mulesoft.org/schema/mule/ee/dw"
xmlns:metadata="http://www.mulesoft.org/schema/mule/metadata"
xmlns:tracking="http://www.mulesoft.org/schema/mule/ee/tracking"
xmlns="http://www.mulesoft.org/schema/mule/core"
xmlns:doc="http://www.mulesoft.org/schema/mule/documentation"
	xmlns:spring="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://www.springframework.org/schema/beans
	http://www.springframework.org/schema/beans/spring-beans-current.xsd
http://www.mulesoft.org/schema/mule/core
http://www.mulesoft.org/schema/mule/core/current/mule.xsd
http://www.mulesoft.org/schema/mule/ee/tracking
http://www.mulesoft.org/schema/mule/ee/tracking/current/mule-tracking-ee.xsd
http://www.mulesoft.org/schema/mule/ee/dw
http://www.mulesoft.org/schema/mule/ee/dw/current/dw.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-current.xsd
http://www.mulesoft.org/schema/mule/anypoint-mq
http://www.mulesoft.org/schema/mule/anypoint-mq/current/mule-anypoint-mq.xsd
http://www.mulesoft.org/schema/mule/http
http://www.mulesoft.org/schema/mule/http/current/mule-http.xsd">

    <context:property-placeholder location="ackmodes.properties"/>

    <anypoint-mq:config name="Anypoint_MQ_Configuration"
    doc:name="Anypoint MQ Configuration">
        <anypoint-mq:provider
        url="https://mq-us-east-1.anypoint.mulesoft.com/api/v1"
        clientId="${mq.clientId}" clientSecret="${mq.clientSecret}"/>
    </anypoint-mq:config>

    <http:listener-config name="HTTP_Listener_Configuration"
    host="0.0.0.0" port="8081" doc:name="HTTP Listener Configuration"/>

    <flow name="autoHttp">
        <http:listener config-ref="HTTP_Listener_Configuration"
        path="/" doc:name="HTTP"/>
        <anypoint-mq:publish config-ref="Anypoint_MQ_Configuration"
        destination="auto" doc:name="Anypoint MQ"/>
    </flow>

    <flow name="autoMq">
        <anypoint-mq:subscriber config-ref="Anypoint_MQ_Configuration"
        destination="auto" doc:name="Anypoint MQ" pollingTime="10000"/>
        <component class="ackmodes.RandomError" doc:name="Java"/>
        <logger level="ERROR" doc:name="Logger" message="Will ACK"/>
        <choice-exception-strategy doc:name="holaChoice_Exception_Strategy">
            <catch-exception-strategy doc:name="Catch Exception Strategy"
            when="exception.causedBy(UnsupportedOperationException)">
                <logger message="Will ACK" level="ERROR" doc:name="Logger"/>
            </catch-exception-strategy>
            <rollback-exception-strategy doc:name="Rollback Exception Strategy">
                <logger message="Will NACK" level="ERROR" doc:name="Logger"/>
            </rollback-exception-strategy>
        </choice-exception-strategy>
    </flow>

</mule>

プロパティファイルでクライアント ID (mq.clientId) とクライアントシークレット (mq.clientSecret) の値を設定していることを確認します。この場合、それらの値は /ackmodes/classes/ackmodes.properties ファイルで設定され、次の内容が含まれています。

# Contents of this file are not meant to be shared with the wide public

mq.clientId=<Client_ID>
mq.clientSecret=<Client_Secret>

このステートメントでコールされる Java テストプログラムを使用して例外を発生させる方法についての説明は、「ランダムエラージェネレータ」を参照してください。

<component class="ackmodes.RandomError" doc:name="Java"/>

Anypoint Studio での [MANUAL (手動)] 肯定応答フローの作成

Studio の [MANUAL (手動)] 肯定応答モードは、Anypoint MQ コネクタの [Global Element Properties (グローバル要素のプロパティ)] から設定できます。

mq global mode set

[MANUAL (手動)] フローでも、choice-exception-strategy がフローで同様に設定されます。この場合、アプリケーションは「ランダムエラー Java クラス」で送信する例外に応じて、操作が Ack または Nack に設定された個々の Anypoint MQ コネクタインスタンスを使用します。

mq manual flow

[AUTO (自動)] の XML コード

[AUTO (自動)] 肯定応答モードの XML コードは次のとおりです。

<?xml version="1.0" encoding="UTF-8"?>
<mule xmlns:http="http://www.mulesoft.org/schema/mule/http"
xmlns:anypoint-mq="http://www.mulesoft.org/schema/mule/anypoint-mq"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:dw="http://www.mulesoft.org/schema/mule/ee/dw"
xmlns:metadata="http://www.mulesoft.org/schema/mule/metadata"
xmlns:tracking="http://www.mulesoft.org/schema/mule/ee/tracking"
xmlns="http://www.mulesoft.org/schema/mule/core"
xmlns:doc="http://www.mulesoft.org/schema/mule/documentation"
	xmlns:spring="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://www.springframework.org/schema/beans
	http://www.springframework.org/schema/beans/spring-beans-current.xsd
http://www.mulesoft.org/schema/mule/core
http://www.mulesoft.org/schema/mule/core/current/mule.xsd
http://www.mulesoft.org/schema/mule/ee/tracking
http://www.mulesoft.org/schema/mule/ee/tracking/current/mule-tracking-ee.xsd
http://www.mulesoft.org/schema/mule/ee/dw
http://www.mulesoft.org/schema/mule/ee/dw/current/dw.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-current.xsd
http://www.mulesoft.org/schema/mule/anypoint-mq
http://www.mulesoft.org/schema/mule/anypoint-mq/current/mule-anypoint-mq.xsd
http://www.mulesoft.org/schema/mule/http
http://www.mulesoft.org/schema/mule/http/current/mule-http.xsd">

    <http:listener-config name="HTTP_Listener_Configuration_manual"
    host="0.0.0.0" port="8082" doc:name="HTTP Listener Configuration"/>

    <flow name="manualHttp">
        <http:listener config-ref="HTTP_Listener_Configuration_manual"
        path="/" doc:name="HTTP"/>
        <anypoint-mq:publish config-ref="Anypoint_MQ_Configuration"
        destination="manual" doc:name="Anypoint MQ"/>
    </flow>

    <flow name="manualMq">
        <anypoint-mq:subscriber config-ref="Anypoint_MQ_Configuration"
        destination="manual" doc:name="Anypoint MQ" pollingTime="10000"
        AcknowledgmentMode="MANUAL"/>
        <component class="ackmodes.RandomError" doc:name="Java"/>
        <logger level="ERROR" doc:name="Logger" message="Will ACK"/>
        <anypoint-mq:ack config-ref="Anypoint_MQ_Configuration"
        doc:name="Anypoint MQ"/>
        <choice-exception-strategy doc:name="holaChoice_Exception_Strategy">
            <catch-exception-strategy doc:name="Catch Exception Strategy"
            when="exception.causedBy(UnsupportedOperationException)">
                <logger message="Will ACK" level="ERROR" doc:name="Logger"/>
                <anypoint-mq:ack config-ref="Anypoint_MQ_Configuration"
                doc:name="Anypoint MQ"/>
            </catch-exception-strategy>
            <catch-exception-strategy doc:name="Rollback Exception Strategy">
                <logger message="Will NACK" level="ERROR" doc:name="Logger"/>
                <anypoint-mq:nack config-ref="Anypoint_MQ_Configuration"
                doc:name="Anypoint MQ"/>
            </catch-exception-strategy>
        </choice-exception-strategy>
    </flow>

</mule>

このステートメントでコールされる Java テストプログラムを使用して例外を発生させる方法についての説明は、「ランダムエラージェネレータ」を参照してください。

<component class="ackmodes.RandomError" doc:name="Java"/>

[NONE (なし)] 肯定応答モードの設定

Anypoint MQ コネクタの Studio の [Global Element Properties (グローバル要素のプロパティ)] メニューで [Acknowledgment Mode (肯定応答モード)][NONE (なし)] に設定できます。

mq global mode set

[NONE (なし)] フローでは、何が発生した場合でも、Anypoint MQ コネクタが常に ACK を送信します。

[NONE (なし)] は Anypoint MQ コネクタ - Mule 4 では [IMMEDIATE (即時)] と呼ばれますが、機能は同じです。

次の例は、Java クラス RandomError を使用して考えられる例外を作成することでこのリスクを強調しますが、それらの例外は無視して ACK を送信します。

<component class="ackmodes.RandomError" doc:name="Java"/>
<logger level="ERROR" doc:name="Logger"
message="Always ACKs as soon as a message is received"/>
mq none flow

このモードを使用するとアプリケーションはシンプルになりますが、リスクが高まります。このモードは、ニュースの詳細が明らかになっていくニュースフィードのように、メッセージが常に相互に更新される場合に最適です。例外が発生した場合、次のメッセージが適切なコンテンツを取得する機会があります。

[NONE (なし)] の XML コード

[NONE (なし)] 肯定応答モードの XML コードは次のとおりです。

<?xml version="1.0" encoding="UTF-8"?>
<mule xmlns:http="http://www.mulesoft.org/schema/mule/http"
xmlns:anypoint-mq="http://www.mulesoft.org/schema/mule/anypoint-mq"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:dw="http://www.mulesoft.org/schema/mule/ee/dw"
xmlns:metadata="http://www.mulesoft.org/schema/mule/metadata"
xmlns:tracking="http://www.mulesoft.org/schema/mule/ee/tracking"
xmlns="http://www.mulesoft.org/schema/mule/core"
xmlns:doc="http://www.mulesoft.org/schema/mule/documentation"
	xmlns:spring="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://www.springframework.org/schema/beans
	http://www.springframework.org/schema/beans/spring-beans-current.xsd
http://www.mulesoft.org/schema/mule/core
http://www.mulesoft.org/schema/mule/core/current/mule.xsd
http://www.mulesoft.org/schema/mule/ee/tracking
http://www.mulesoft.org/schema/mule/ee/tracking/current/mule-tracking-ee.xsd
http://www.mulesoft.org/schema/mule/ee/dw
http://www.mulesoft.org/schema/mule/ee/dw/current/dw.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-current.xsd
http://www.mulesoft.org/schema/mule/anypoint-mq
http://www.mulesoft.org/schema/mule/anypoint-mq/current/mule-anypoint-mq.xsd
http://www.mulesoft.org/schema/mule/http
http://www.mulesoft.org/schema/mule/http/current/mule-http.xsd">

    <http:listener-config name="HTTP_Listener_Configuration_none"
    host="0.0.0.0" port="8083" doc:name="HTTP Listener Configuration"/>

    <flow name="noneHttp">
        <http:listener config-ref="HTTP_Listener_Configuration_none"
        path="/" doc:name="HTTP"/>
        <anypoint-mq:publish config-ref="Anypoint_MQ_Configuration"
        destination="none" doc:name="Anypoint MQ"/>
    </flow>

    <flow name="noneMq">
        <anypoint-mq:subscriber config-ref="Anypoint_MQ_Configuration"
        destination="none" doc:name="Anypoint MQ"
        pollingTime="10000"
        AcknowledgmentMode="NONE"/>
        <component class="ackmodes.RandomError" doc:name="Java"/>
        <logger level="ERROR" doc:name="Logger"
        message="Always ACKs as soon as a message is received"/>
    </flow>

</mule>

ランダムエラージェネレータ

次の Java テストプログラムは、アプリケーションのテストに使用できるランダムエラーを生成します。このプログラムは 0 から 100 の間のランダムな整数を取得し、その値に応じてこれらの選択を行います。

エラー状態 Studio フローの動作

0 ~ 32

エラーなし。渡されたイベントコンテキストを返します。

通過してアプリケーションは ACK を送信します。

33 ~ 65

エラー。無効な状態の例外を返します。

アプリケーションは NACK を送信します。

66 ~ 100

エラー。サポートされていない操作の例外を返します。

アプリケーションは ACK を送信します。

package ackmodes;

import java.util.Random;

import org.mule.api.MuleEventContext;
import org.mule.api.lifecycle.Callable;

public class RandomError implements Callable {

	@Override
	public Object onCall(MuleEventContext eventContext) throws Exception {
		int randomInt = new Random().nextInt(100);
		if (randomInt > 66) {
			throw new IllegalStateException("This should be retried");
		} else if (randomInt > 33) {
			throw new UnsupportedOperationException("This should not be retried");
		} else {
			return eventContext;
		}
	}
}

関連情報

Was this article helpful?

💙 Thanks for your feedback!

Edit on GitHub