Aggregators Module の複数の例 - Mule 4

アグリゲーターの一般的なユースケースは、複数の値を保存して後でバッチとして処理することです。 この集約を実現するには、​[Size based aggregator (サイズベースのアグリゲーター)]​ スコープを使用できます。

集約するデータは、以前に ​personName​ という変数に保存された人物名であり、100 要素のバッチを処理するとします。すべてのデータが収集されたら、​whatToDoWithAllTheseNames​ というフロー内で定義されたいくつかのロジックを実行します。

この場合、次を使用できます。

<aggregators:size-based-aggregator name="personsNameAggregator" maxSize="100">
    <aggregators:content>#[vars.personName]</aggregators:content>
    <aggregators:aggregation-complete>
        <flow-ref name="whatToDoWithAllTheseNames"/>
    </aggregators:aggregation-complete>
</aggregators:size-based-aggregator>

実行数が多すぎてしまうため、すべての新しい要素でアクションを実行しないようにするには、​incremental-aggregation​ を使用します。たとえば、次に示すように名前のリストが 10% ずつ増加するたびにアクションを実行するとします。

<aggregators:size-based-aggregator name="personsNameAggregator" maxSize="100">
    <aggregators:content>#[vars.personName]</aggregators:content>
    <aggregators:incremental-aggregation>
        <choice>
            <when expression="#[(sizeOf(payload) mod 10) == 0]">
                <flow-ref name="partialAction"/>
            </when>
        </choice>
    </aggregators:incremental-aggregation>
    <aggregators:aggregation-complete>
        <flow-ref name="whatToDoWithAllTheseNames"/>
    </aggregators:aggregation-complete>
</aggregators:size-based-aggregator>

実行されるロジックは ​partialAction​ という別のフロー内にあるとします。

すべてのバッチが埋まるまで永久に待つことを避けるには、アグリゲーターのタイムアウトを設定して、​maxSize​ に達していなくても特定の時点ですべてのイベントを処理します。次の例では、タイムアウトを 10 秒に設定しています。

<aggregators:size-based-aggregator name="personsNameAggregator"
                                   maxSize="100"
                                   timeout="10"
                                   timeoutUnit="SECONDS">
    <aggregators:content>#[vars.personName]</aggregators:content>
    <aggregators:incremental-aggregation>
        <choice>
            <when expression="#[(sizeOf(payload) mod 10) == 0]">
                <flow-ref name="partialAction"/>
            </when>
        </choice>
    </aggregators:incremental-aggregation>
    <aggregators:aggregation-complete>
        <flow-ref name="whatToDoWithAllTheseNames"/>
    </aggregators:aggregation-complete>
</aggregators:size-based-aggregator>

タイムアウトが発生したときに、すべての集約された要素を処理するロジックが実行されるように、次のようなリスナーも含める必要があります。

<aggregators:aggregator-listener aggregatorName="personsNameAggregator" includeTimedOutGroups="true"/>
<flow-ref name="whatToDoWithAllTheseNames"/>

この場合、​aggregation-complete​ ルートと ​aggregator-listener​ から取得されるデータ型は同じです。そのため、フロー ​whatToDoWithAllTheseNames​ の実行では、データの取得元は関係ありません。 ただしこの設定では、集約が完了した場合、フロー ​whatToDoWithAllTheseNames​ は ​aggregation-complete​ ルート ​flow-ref​ から 1 回、​aggregator-listener​ でのフロー内の ​flow-ref​ からもう 1 回の計 2 回トリガーされます。

同じロジックが複数回実行されないようにするには、不完全な集約を確認するため、​isAggregationComplete​ 属性を使用する ​choice​ コンポーネントで ​flow-ref​ 値 ​aggregator-listener​ をラップする必要があります:

<flow name="test-flow">
    ...
    <aggregators:size-based-aggregator name="personsNameAggregator"
                                       maxSize="100"
                                       timeout="10"
                                       timeoutUnit="SECONDS">
        <aggregators:content>#[vars.personName]</aggregators:content>
        <aggregators:incremental-aggregation>
            <choice>
                <when expression="#[(sizeOf(payload) mod 10) == 0]">
                    <flow-ref name="partialAction"/>
                </when>
            </choice>
        </aggregators:incremental-aggregation>
        <aggregators:aggregation-complete>
            <flow-ref name="whatToDoWithAllTheseNames"/>
        </aggregators:aggregation-complete>
    </aggregators:size-based-aggregator>
    ...
</flow>

<flow name="aggregator-listener-flow">
    <aggregators:aggregator-listener aggregatorName="personsNameAggregator" includeTimedOutGroups="true"/>
    <choice>
        <when expression="#[not attributes.isAggregationComplete]">
            <flow-ref name="whatToDoWithAllTheseNames"/>
        </when>
    <choice>
</flow>

この例では、​flow-ref​ はタイムアウトでそのフローがトリガーされたときにのみコールされます。

最後に、国籍に従って各人物を処理する必要があります。この情報は変数で渡されます。ただし、この場合は同じ国のすべての人物を同じバッチでまとめて処理するとします。

グループベースのアグリゲーターでそれらを適宜分割できます。たとえば、人物の国籍が以前に ​personNac​ という変数に保存されたとします。この場合、フローは次のようになります。

<aggregators:group-based-aggregator name="personsNameByNacAggregator"
                                    groupId="#[vars.personNac]"
                                    groupSize="100"
                                    timeout="10"
                                    timeoutUnit="SECONDS"
                                    evictionTime="0">
    <aggregators:content>#[vars.personName]</aggregators:content>
    <aggregators:incremental-aggregation>
        <choice>
            <when expression="#[(sizeOf(payload) mod 10) == 0]">
                <flow-ref name="partialAction"/>
            </when>
        </choice>
    </aggregators:incremental-aggregation>
    <aggregators:aggregation-complete>
        <flow-ref name="whatToDoWithAllTheseNamesFromSomeCountry"/>
    </aggregators:aggregation-complete>
</aggregators:group-based-aggregator>

このフローにはタイムアウト用のリスナーも必要です。

<aggregators:aggregator-listener aggregatorName="personsNameByNacAggregator" includeTimedOutGroups="true"/>
<flow-ref name="whatToDoWithAllTheseNamesFromSomeCountry"/>

この例では、アグリゲーターで除去時間を設定していません。これは、デフォルト値が望ましい動作の ​0​ (グループの解放直後に除去) であるためです。そのため、グループが完了またはタイムアウトした場合、後続の値は拒否されることなくグループに追加されます。