メッセージの肯定応答 - Mule 3

このガイドは Mule 3 および Studio 6 にのみ適用されます。Mule 4 の肯定応答情報については、​「ACK 操作と NACK 操作」​を参照してください。

Anypoint MQ では、Anypoint MQ Connector を使用して Anypoint Studio でメッセージを処理する方法を決定できます。

次の 2 つの応答が発生する可能性があります。

  • 肯定応答 (ACK) は、アプリケーションがメッセージを受信したことを示します。

  • 否定応答 (NACK) は、アプリケーションが現在のメッセージを拒否することを示します。Anypoint MQ は、別のアプリケーションがメッセージを受信できるように、キューにメッセージを返します。

Anypoint MQ Connector には、Studio アプリケーションでこれらの状態をグローバルまたはローカルに設定するためのツールが用意されています。

  • Anypoint MQ Connector - Mule 4 では、ACK および NACK コンポーネントを使用するときに、​AnypointMQMessageContext​ オブジェクトを渡す必要があります。このオブジェクトは、サブスクライバーまたはコンシューマーコンポーネントに対する Anypoint MQ Connector 属性内にあります。サブスクライバーまたはコンシューマーに続く ACK または NACK コンポーネントに ​#[attributes]​ を挿入すると、Anypoint MQ Connector は自動的に AnypointMQMessageContext オブジェクトを渡します。

  • 他の操作によって ​#[attributes]​ が新しい値で上書きされる可能性があるため、サブスクライバーまたはコンシューマーと ACK/NACK 間のフローにそのような演算子が存在する場合は、サブスクライバーまたはコンシューマーの属性を変数に保存して属性が失われないようにします。

フローの例:

<flow name="flow">
    <anypoint-mq:subscriber doc:name="Subscriber" config-ref="Anypoint_MQ_Default_subscriber" destination="test"/>
    <logger level="INFO" doc:name="Logger" message="Received MQ msg: #[payload]"/>
    <anypoint-mq:ack doc:name="Ack" config-ref="Anypoint_MQ_Default_subscriber" messageContext="#[attributes]"/>
</flow>

グローバルおよびローカル状態

Anypoint MQ Connector は、Studio アプリケーションがキューからメッセージを受信する方法を指定するためのグローバルおよびローカル状態を提供します。

グローバル状態

Studio の ​[Global Element Properties (グローバル要素のプロパティ)]​ メニューを使用して、フロー内のすべてのメッセージを処理する戦略を指定できます。グローバル状態には次の選択肢があります。

  • AUTO

    Anypoint MQ Connector は、例外のキャッチ戦略 (ACK) と例外のロールバック戦略 (NACK) のどちらを検出したかに応じて、フローの最後に Ack または Nack を自動的に送信します。​「Anypoint Studio での自動肯定応答フローの作成」​の例を参照してください。[AUTO (自動)] は Mule 4 Connector のデフォルトです。

    考えられる結果は 2 つあります。

    • ACK

      フローが成功するか、例外のキャッチ戦略が発生します。次のコードは、例外のキャッチ戦略を示しています。

      <catch-exception-strategy doc:name="Catch Exception Strategy"
          when="exception.causedBy(UnsupportedOperationException)">
          <logger message="Will ACK" level="ERROR" doc:name="Logger"/>
      </catch-exception-strategy>
    • NACK

      例外のロールバック戦略が発生します。

      <rollback-exception-strategy doc:name="Rollback Exception Strategy">
          <logger message="Will NACK" level="ERROR" doc:name="Logger"/>
      </rollback-exception-strategy>
  • MANUAL

    アプリケーションは Anypoint MQ Connector のローカル状態を使用して独自の ACK または NACK を送信することを示します。Anypoint MQ Connector はアクションを自動的に提供しません。​「Anypoint Studio での手動肯定応答フローの作成」​の例を参照してください。

  • NONE

    アプリケーションがメッセージを受信し、Anypoint MQ Connector はそのメッセージをすぐに肯定応答して削除します。この状態は、メッセージが適切にコンシュームされない場合はメッセージが失われるリスクがありますが、通常は、後続の各メッセージがその前のメッセージに関する詳細を提供するニュースフィードなど、メッセージを常に更新するフローがある場合に使用されます。この状態はリスクがありますが、コーディングが簡単です。​「[NONE (なし)] 肯定応答モードの設定」​の例を参照してください。

    Anypoint MQ Mule Connector - Mule 4 では、[NONE (なし)] は [IMMEDIATE (即時)] と呼ばれますが、機能は同じです。

    グローバル状態は ​[Acknowledgment Mode (肯定応答モード)]​ 項目で設定できます。

    mq global mode set

ローカル状態

アプリケーションがメッセージを受信するか拒否するかに応じて ​[Operation (操作)]​ 項目が ​[ack]​ または ​[nack]​ に設定された Anypoint MQ Connector インスタンスを手動でフローに追加できます。

Anypoint MQ Connector の ​[Operation (操作)]​ 項目を使用して、ローカル状態を設定できます。

mq connector ops

Anypoint Studio での [AUTO (自動)] 肯定応答フローの作成

[AUTO (自動)]​ 状態を使用している場合、​choice-exception-strategy​ を設定すると、選択した例外に応じて Anypoint MQ Connector が ack または nack を送信します。次のコードで ​[AUTO (自動)]​ 状態の使用方法を示します。

次のコードは Studio のフローを示しています。

MQ 肯定応答モードのフロー

この例では、次の 2 つのフローがあります。

  • autoHttp

    0.0.0.0:8081​ でリスンする HTTP Connector をセットアップし、メッセージをパブリッシュするように Anypoint MQ Connector を設定し、肯定応答モードを ​[AUTO (自動)]​ に設定します。

  • autoMq

    メッセージをコンシュームするように Anypoint MQ Connector を設定し、Java クラス ​RandomError​ を使用して、選択に応じて自動機能が肯定応答または否定応答を送信するための考えられる例外を作成します。ロガーは、コンソールに ACK または NACK の選択を表示します。

    フローの [Error handling (エラー処理)] セクションには、Anypoint MQ Connector が ACK を送信する [Catch Exception Strategy (例外のキャッチ戦略)]、および Anypoint MQ Connector が NACK を送信する [Rollback Exception Strategy (例外のロールバック戦略)] が含まれます。

[AUTO (自動)] の XML コード

[AUTO (自動)] 肯定応答モードの XML コードは次のとおりです。

<?xml version="1.0" encoding="UTF-8"?>
<mule xmlns:http="http://www.mulesoft.org/schema/mule/http"
xmlns:anypoint-mq="http://www.mulesoft.org/schema/mule/anypoint-mq"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:dw="http://www.mulesoft.org/schema/mule/ee/dw"
xmlns:metadata="http://www.mulesoft.org/schema/mule/metadata"
xmlns:tracking="http://www.mulesoft.org/schema/mule/ee/tracking"
xmlns="http://www.mulesoft.org/schema/mule/core"
xmlns:doc="http://www.mulesoft.org/schema/mule/documentation"
	xmlns:spring="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://www.springframework.org/schema/beans
	http://www.springframework.org/schema/beans/spring-beans-current.xsd
http://www.mulesoft.org/schema/mule/core
http://www.mulesoft.org/schema/mule/core/current/mule.xsd
http://www.mulesoft.org/schema/mule/ee/tracking
http://www.mulesoft.org/schema/mule/ee/tracking/current/mule-tracking-ee.xsd
http://www.mulesoft.org/schema/mule/ee/dw
http://www.mulesoft.org/schema/mule/ee/dw/current/dw.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-current.xsd
http://www.mulesoft.org/schema/mule/anypoint-mq
http://www.mulesoft.org/schema/mule/anypoint-mq/current/mule-anypoint-mq.xsd
http://www.mulesoft.org/schema/mule/http
http://www.mulesoft.org/schema/mule/http/current/mule-http.xsd">

    <context:property-placeholder location="ackmodes.properties"/>

    <anypoint-mq:config name="Anypoint_MQ_Configuration"
    doc:name="Anypoint MQ Configuration">
        <anypoint-mq:provider
        url="https://mq-us-east-1.anypoint.mulesoft.com/api/v1"
        clientId="${mq.clientId}" clientSecret="${mq.clientSecret}"/>
    </anypoint-mq:config>

    <http:listener-config name="HTTP_Listener_Configuration"
    host="0.0.0.0" port="8081" doc:name="HTTP Listener Configuration"/>

    <flow name="autoHttp">
        <http:listener config-ref="HTTP_Listener_Configuration"
        path="/" doc:name="HTTP"/>
        <anypoint-mq:publish config-ref="Anypoint_MQ_Configuration"
        destination="auto" doc:name="Anypoint MQ"/>
    </flow>

    <flow name="autoMq">
        <anypoint-mq:subscriber config-ref="Anypoint_MQ_Configuration"
        destination="auto" doc:name="Anypoint MQ" pollingTime="10000"/>
        <component class="ackmodes.RandomError" doc:name="Java"/>
        <logger level="ERROR" doc:name="Logger" message="Will ACK"/>
        <choice-exception-strategy doc:name="holaChoice_Exception_Strategy">
            <catch-exception-strategy doc:name="Catch Exception Strategy"
            when="exception.causedBy(UnsupportedOperationException)">
                <logger message="Will ACK" level="ERROR" doc:name="Logger"/>
            </catch-exception-strategy>
            <rollback-exception-strategy doc:name="Rollback Exception Strategy">
                <logger message="Will NACK" level="ERROR" doc:name="Logger"/>
            </rollback-exception-strategy>
        </choice-exception-strategy>
    </flow>

</mule>

プロパティファイルでクライアント ID (​mq.clientId​) とクライアントシークレット (​mq.clientSecret​) の値を設定していることを確認します。この場合、それらの値は ​/ackmodes/classes/ackmodes.properties​ ファイルで設定され、次の内容が含まれています。

# Contents of this file are not meant to be shared with the wide public

mq.clientId=<Client_ID>
mq.clientSecret=<Client_Secret>

このステートメントでコールされる Java テストプログラムを使用して例外を発生させる方法についての説明は、​「ランダムエラージェネレーター」​を参照してください。

<component class="ackmodes.RandomError" doc:name="Java"/>

Anypoint Studio での [MANUAL (手動)] 肯定応答フローの作成

Studio の [MANUAL (手動)] 肯定応答モードは、Anypoint MQ Connector の [Global Element Properties (グローバル要素のプロパティ)] から設定できます。

mq global mode set

[MANUAL (手動)] フローでも、​choice-exception-strategy​ がフローで同様に設定されます。この場合、アプリケーションは​「ランダムエラー Java クラス」​で送信する例外に応じて、操作が ACK または NACK に設定された個々の Anypoint MQ Connector インスタンスを使用します。

mq manual flow

[AUTO (自動)] の XML コード

[AUTO (自動)] 肯定応答モードの XML コードは次のとおりです。

<?xml version="1.0" encoding="UTF-8"?>
<mule xmlns:http="http://www.mulesoft.org/schema/mule/http"
xmlns:anypoint-mq="http://www.mulesoft.org/schema/mule/anypoint-mq"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:dw="http://www.mulesoft.org/schema/mule/ee/dw"
xmlns:metadata="http://www.mulesoft.org/schema/mule/metadata"
xmlns:tracking="http://www.mulesoft.org/schema/mule/ee/tracking"
xmlns="http://www.mulesoft.org/schema/mule/core"
xmlns:doc="http://www.mulesoft.org/schema/mule/documentation"
	xmlns:spring="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://www.springframework.org/schema/beans
	http://www.springframework.org/schema/beans/spring-beans-current.xsd
http://www.mulesoft.org/schema/mule/core
http://www.mulesoft.org/schema/mule/core/current/mule.xsd
http://www.mulesoft.org/schema/mule/ee/tracking
http://www.mulesoft.org/schema/mule/ee/tracking/current/mule-tracking-ee.xsd
http://www.mulesoft.org/schema/mule/ee/dw
http://www.mulesoft.org/schema/mule/ee/dw/current/dw.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-current.xsd
http://www.mulesoft.org/schema/mule/anypoint-mq
http://www.mulesoft.org/schema/mule/anypoint-mq/current/mule-anypoint-mq.xsd
http://www.mulesoft.org/schema/mule/http
http://www.mulesoft.org/schema/mule/http/current/mule-http.xsd">

    <http:listener-config name="HTTP_Listener_Configuration_manual"
    host="0.0.0.0" port="8082" doc:name="HTTP Listener Configuration"/>

    <flow name="manualHttp">
        <http:listener config-ref="HTTP_Listener_Configuration_manual"
        path="/" doc:name="HTTP"/>
        <anypoint-mq:publish config-ref="Anypoint_MQ_Configuration"
        destination="manual" doc:name="Anypoint MQ"/>
    </flow>

    <flow name="manualMq">
        <anypoint-mq:subscriber config-ref="Anypoint_MQ_Configuration"
        destination="manual" doc:name="Anypoint MQ" pollingTime="10000"
        AcknowledgmentMode="MANUAL"/>
        <component class="ackmodes.RandomError" doc:name="Java"/>
        <logger level="ERROR" doc:name="Logger" message="Will ACK"/>
        <anypoint-mq:ack config-ref="Anypoint_MQ_Configuration"
        doc:name="Anypoint MQ"/>
        <choice-exception-strategy doc:name="holaChoice_Exception_Strategy">
            <catch-exception-strategy doc:name="Catch Exception Strategy"
            when="exception.causedBy(UnsupportedOperationException)">
                <logger message="Will ACK" level="ERROR" doc:name="Logger"/>
                <anypoint-mq:ack config-ref="Anypoint_MQ_Configuration"
                doc:name="Anypoint MQ"/>
            </catch-exception-strategy>
            <catch-exception-strategy doc:name="Rollback Exception Strategy">
                <logger message="Will NACK" level="ERROR" doc:name="Logger"/>
                <anypoint-mq:nack config-ref="Anypoint_MQ_Configuration"
                doc:name="Anypoint MQ"/>
            </catch-exception-strategy>
        </choice-exception-strategy>
    </flow>

</mule>

このステートメントでコールされる Java テストプログラムを使用して例外を発生させる方法についての説明は、​「ランダムエラージェネレーター」​を参照してください。

<component class="ackmodes.RandomError" doc:name="Java"/>

[NONE (なし)] 肯定応答モードの設定

Anypoint MQ Connector の Studio の ​[Global Element Properties (グローバル要素のプロパティ)]​ メニューで ​[Acknowledgment Mode (肯定応答モード)]​ を ​[NONE (なし)]​ に設定できます。

mq global mode set

[NONE (なし)]​ フローでは、何が発生した場合でも、Anypoint MQ Connector が常に ACK を送信します。

[NONE (なし)]​ は Anypoint MQ Connector - Mule 4 では ​[IMMEDIATE (即時)]​ と呼ばれますが、機能は同じです。

次の例は、Java クラス ​RandomError​ を使用して考えられる例外を作成することでこのリスクを強調しますが、それらの例外は無視して ACK を送信します。

<component class="ackmodes.RandomError" doc:name="Java"/>
<logger level="ERROR" doc:name="Logger"
message="Always ACKs as soon as a message is received"/>
mq none flow

このモードを使用するとアプリケーションはシンプルになりますが、リスクが高まります。このモードは、ニュースの詳細が明らかになっていくニュースフィードのように、メッセージが常に相互に更新される場合に最適です。例外が発生した場合、次のメッセージが適切なコンテンツを取得する機会があります。

[NONE (なし)] の XML コード

[NONE (なし)]​ 肯定応答モードの XML コードは次のとおりです。

<?xml version="1.0" encoding="UTF-8"?>
<mule xmlns:http="http://www.mulesoft.org/schema/mule/http"
xmlns:anypoint-mq="http://www.mulesoft.org/schema/mule/anypoint-mq"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:dw="http://www.mulesoft.org/schema/mule/ee/dw"
xmlns:metadata="http://www.mulesoft.org/schema/mule/metadata"
xmlns:tracking="http://www.mulesoft.org/schema/mule/ee/tracking"
xmlns="http://www.mulesoft.org/schema/mule/core"
xmlns:doc="http://www.mulesoft.org/schema/mule/documentation"
	xmlns:spring="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://www.springframework.org/schema/beans
	http://www.springframework.org/schema/beans/spring-beans-current.xsd
http://www.mulesoft.org/schema/mule/core
http://www.mulesoft.org/schema/mule/core/current/mule.xsd
http://www.mulesoft.org/schema/mule/ee/tracking
http://www.mulesoft.org/schema/mule/ee/tracking/current/mule-tracking-ee.xsd
http://www.mulesoft.org/schema/mule/ee/dw
http://www.mulesoft.org/schema/mule/ee/dw/current/dw.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-current.xsd
http://www.mulesoft.org/schema/mule/anypoint-mq
http://www.mulesoft.org/schema/mule/anypoint-mq/current/mule-anypoint-mq.xsd
http://www.mulesoft.org/schema/mule/http
http://www.mulesoft.org/schema/mule/http/current/mule-http.xsd">

    <http:listener-config name="HTTP_Listener_Configuration_none"
    host="0.0.0.0" port="8083" doc:name="HTTP Listener Configuration"/>

    <flow name="noneHttp">
        <http:listener config-ref="HTTP_Listener_Configuration_none"
        path="/" doc:name="HTTP"/>
        <anypoint-mq:publish config-ref="Anypoint_MQ_Configuration"
        destination="none" doc:name="Anypoint MQ"/>
    </flow>

    <flow name="noneMq">
        <anypoint-mq:subscriber config-ref="Anypoint_MQ_Configuration"
        destination="none" doc:name="Anypoint MQ"
        pollingTime="10000"
        AcknowledgmentMode="NONE"/>
        <component class="ackmodes.RandomError" doc:name="Java"/>
        <logger level="ERROR" doc:name="Logger"
        message="Always ACKs as soon as a message is received"/>
    </flow>

</mule>

ランダムエラージェネレーター

次の Java テストプログラムは、アプリケーションのテストに使用できるランダムエラーを生成します。このプログラムは 0 から 100 の間のランダムな整数を取得し、その値に応じてこれらの選択を行います。

エラー状態 Studio フローの動作

0 ~ 32

エラーなし。渡されたイベントコンテキストを返します。

通過してアプリケーションは ACK を送信します。

33 ~ 65

エラー。無効な状態の例外を返します。

アプリケーションは NACK を送信します。

66 ~ 100

エラー。サポートされていない操作の例外を返します。

アプリケーションは ACK を送信します。

package ackmodes;

import java.util.Random;

import org.mule.api.MuleEventContext;
import org.mule.api.lifecycle.Callable;

public class RandomError implements Callable {

	@Override
	public Object onCall(MuleEventContext eventContext) throws Exception {
		int randomInt = new Random().nextInt(100);
		if (randomInt > 66) {
			throw new IllegalStateException("This should be retried");
		} else if (randomInt > 33) {
			throw new UnsupportedOperationException("This should not be retried");
		} else {
			return eventContext;
		}
	}
}

関連情報