Scatter-Gather ルーター

Scatter-Gather コンポーネントは、異なるイベントプロセッサーを持つ異なる並列処理ルートで Mule イベントを処理するルーティングイベントプロセッサーです。各ルートは、Mule イベントへの参照を受け取り、1 つ以上の一連のイベントプロセッサーを実行します。各ルートは個別のスレッドを使用してイベントプロセッサーを実行します。実行後の Mule イベントは、元と同じ未変更の Mule イベントか、もしくは独自のペイロード、属性、変数を持つ新しい Mule イベントになります。Scatter-Gather コンポーネントは、各処理ルートから返された Mule イベントを新しい Mule イベントとして統合し、すべてのルートが正常に完了した場合にのみ、この Mule イベントを次のイベントプロセッサーに渡します。

Scatter-Gather コンポーネントは、複数のルートを (順次ではなく) 並列に実行します。ルートを並列実行することで、Mule アプリケーションの効率が大幅に向上し、順次処理よりも多くの情報を提供できます。

Scatter-Gather コンポーネントは、反復可能ストリームを処理します。失われる前に 1 回だけ読み取ることができる反復不可能なストリームは処理しません。コンポーネントのストリーム戦略が反復不可能に設定されていない限り、デフォルトではすべてのストリームが Mule で反復可能です。

Scatter-Gather コンポーネントの詳細な動作を次の図に示します。

Scatter-Gather コンポーネントの図
1 Scatter-Gather コンポーネントは Mule イベントを受け取って、この Mule イベントの参照を各処理ルートに送信します。
2 各処理ルートは並列に実行を開始します。ルート内のすべてのプロセッサーが処理を完了すると、そのルートは Mule イベントを返します。この Mule イベントは、元と同じ未変更の Mule イベントか、もしくはルート内のプロセッサーによって変更が加えられて作成された新しい Mule イベントになります。
3 すべての処理ルートが処理を完了すると、Scatter-Gather コンポーネントはすべてのルートから返された Mule イベントを統合して、フロー内の次のコンポーネントに渡します。
Scatter-Gather コンポーネントは、2 本以上のルートで設定してください。そうしないと、Mule アプリケーションは例外を返して実行されません。

結果

すべてのルートが完了すると、コンポーネントによって次の形式で結果が出力されます: {0: messageFromRoute0, 1: messageFromRoute1, …​}

生成されたすべてのペイロードを配列に抽出するには、次のような DataWeave スクリプトを使用します。

flatten(valuesOf(payload) map ((item, index) -> item.*payload))

変数の伝播

すべてのルートは同じ初期変数値で開始します。特定のルート内の変数の変更は他のルートに影響しません。そのため、1 つのルートで変数が追加されたり変更されたりした場合、集約後に、その値はそのルートによって定義されます。複数のルートによって変数が追加されたり変更されたりした場合、その値はすべてのルート内のその変数について定義されたすべての値のリストに追加されます。例:

<set-variable variableName="var1" value="var1"/>
<set-variable variableName="var2" value="var2"/>
<scatter-gather doc:name="Scatter-Gather" doc:id="abc665e0-6119-4ecb-9f8b-52dbcbb1d488" >
	<route >
		<set-variable variableName="var2" value="newValue"/>
        <set-variable variableName="var3" value="appleVal"/>
	</route>
	<route >
		<set-variable variableName="var3" value="bananaVal"/>
	</route>
	<route >
		<set-variable variableName="var3" value="otherVal"/>
        <set-variable variableName="var4" value="val4"/>
	</route>
</scatter-gather>

集約後、変数は次のようになります。

{var1: "var1", var2: "newValue", var3: ["appleVal, bananaVal, otherVal"], var4: "val4"}

Scatter-Gather ルート内のエラー処理

Scatter-Gather コンポーネントの各ルートで Try スコープを使用して、ルートのイベントプロセッサーで生成されるエラーを処理できます。実行されたいずれかのルートでエラーが発生すると、Scatter-Gather コンポーネントは ​MULE:COMPOSITE_ROUTING​ 種別のエラーをスローし、フローの処理はその Scatter-Gather コンポーネントから先には進みません。その代わりに、フローはエラー処理イベントプロセッサーに分岐します。

MULE:COMPOSITE_ROUTING​ エラーオブジェクトは、失敗したルートからのエラーだけではなく、正常に完了したルートからの Mule イベントも収集します。そのためアプリケーションは、エラー処理イベントプロセッサーを使用して、完了したルートからの Mule イベントを処理できます。

2 つのケースを使用して、この仕組みを分かりやすく説明します。

  • Scatter-Gather コンポーネントの各ルートに Try スコープがあるケース。
    いずれか 1 本のルートでエラーが生成され、そのルートの Try スコープで ​on-error-continue​ エラーハンドラーを使用して正常に処理されたため、そのルートは正常に完了します。Scatter-Gather コンポーネントは、すべてのルートからの Mule イベントを統合して新しい Mule イベントを生成し、統合したイベントを次のイベントプロセッサーに渡します。

  • Scatter-Gather コンポーネントのいずれかのルートが Try スコープを持たないか、またはそのルートの Try スコープで該当のエラー種別を処理できないエラーハンドラーを使用しているか、あるいはエラーハンドラーの種別が ​on-error-propagate​ であるケース。
    このルートでエラーが発生すると、ルートは失敗して Scatter-Gather コンポーネントは ​MULE:COMPOSITE_ROUTING​ エラーをスローします。フローは、完了したルートからの Mule イベントを処理できるエラー処理イベントプロセッサーに分岐します。

こうしたエラーの処理の例:

<flow name="errorHandler">
    <scatter-gather>
        <route>
            <raise-error type="APP:MYERROR"/>
        </route>
        <route>
            <set-payload value="apple"/>
        </route>
    </scatter-gather>
    <error-handler>
        <on-error-continue type="MULE:COMPOSITE_ROUTING">
            <!-- This will have the error thrown by the first route -->
            <logger level="WARN" message="#[error.errorMessage.payload.failures['0']]"/>
            <!-- This will be a null value -->
            <logger level="WARN" message="#[error.errorMessage.payload.failures['1']]"/>

            <!-- This will be a null value -->
            <logger level="WARN" message="#[error.errorMessage.payload.results['0']]"/>
            <!-- This will have the result of the second (correctly executed) route -->
            <logger level="WARN" message="#[error.errorMessage.payload.results['1']]"/>
        </on-error-continue>
    </error-handler>
</flow>

Scatter-Gather でのタイムアウトエラーの処理

Scatter-Gather コンポーネントでタイムアウトを設定してある場合、そのタイムアウト時間内にルートが処理を完了しないと、そのルートは ​MULE:TIMEOUT​ エラーを返します。このエラーは、ルートで発生する他のエラーと同じように処理されます。各ルートが (処理を正常に完了するか、または ​MULE:TIMEOUT​ エラーをスローして) 完了すると、成功した結果とエラーが Scatter-Gather コンポーネントによって収集され、​MULE:COMPOSITE_ROUTING​ エラーとしてスローされます。このエラーは、設定されているエラーハンドラーで処理されます。

サンプルプロジェクト

Scatter-Gather コンポーネントの使い方の詳細を見るには、Anypoint Studio で Anypoint Exchange からサンプルプロジェクトの ​Scatter-Gather Flow Control (スキャッターギャザーフロー制御)​ をダウンロードします。このサンプルは、データを並列に集約して結果を JSON で返すための Scatter-Gather 制御フローを示します。

また、このサンプルは用意されているデータを、集約する 2 つのリソースへの入力として使用します。このデータは、2 人の取引先責任者の情報を表し、次の構造となっています。

リソース

firstname

surname

phone

email

contacts-1.csv

John

Doe

096548763

john.doe@texasComp.com

contacts-2.csv

Jane

Doe

091558780

jane.doe@texasComp.com

データの集約には DataWeave が使用されます。取引先責任者の情報は、両方のリソースからのデータを表す JSON 構造に集約されます。

Anypoint Studio でこのサンプルプロジェクトをダウンロードして開くには、左上隅にある [Exchange] アイコンをクリックします。次に、開いたウィンドウで、Anypoint Exchange にログインし、プロジェクトの名前を検索します。

Scatter-Gather XML リファレンス

要素 説明

scatter-gather

複数の対象に要求メッセージを同時に送信します。応答をすべてのルートから収集して、単一のメッセージに集約します。

属性

timeout

送信したメッセージからの応答のタイムアウトをミリ秒単位で指定します。0 または負の値を指定すると、タイムアウトは設定されません。

maxConcurrency

処理する並列ルートの最大数を指定します。
デフォルトでは、すべてのルートが並列処理されます。

この値を 1 に設定すると、scatter-gather はルートを 1 本ずつ順次処理します。

target

対象変数の名前。

targetValue

対象変数に格納するデータの値。
設定しないと、デフォルト値の ​#[payload]​ が使用されます。
この項目は、変数が受け入れるすべての値を受け入れます。

  • サポートされる任意のデータ型。

  • DataWeave 式。

  • キーワードの ​payload​、​attributes​、​message​。​vars​ は​受け入れられません​。

子要素 説明

route

scatter-gather ルーター内のいずれかのルート。

スロー

  • MULE:ROUTING

  • MULE:COMPOSITE_ROUTING