アグリゲータの例

アグリゲータの一般的なユースケースは、複数の値を保存して後でバッチとして処理することです。 サイズベースのアグリゲータを使用することで、これを簡単に実現できます。

集約するデータは、以前に personName という変数に保存された人物名であり、100 要素のバッチを処理するとします。すべてのデータが収集されたら、whatToDoWithAllTheseNames というフロー内で定義されたいくつかのロジックを実行します。

この場合、次を使用できます。

<aggregators:size-based-aggregator name="personsNameAggregator" maxSize="100">
    <aggregators:content>#[vars.personName]</aggregators:content>
    <aggregators:aggregation-complete>
        <flow-ref name="whatToDoWithAllTheseNames"/>
    </aggregators:aggregation-complete>
</aggregators:size-based-aggregator>

ここで、名前のリストが 10% 埋められるたびにアクションを実行するとします。すべての新しい要素でこれを実行することは、実行数が多すぎてしまうため望ましくありません。実行されるロジックは partialAction という別のフロー内にあるとします。これには incremental-aggregation ルートを使用できます。

<aggregators:size-based-aggregator name="personsNameAggregator" maxSize="100">
    <aggregators:content>#[vars.personName]</aggregators:content>
    <aggregators:incremental-aggregation>
        <choice>
            <when expression="#[(sizeOf(payload) mod 10) == 0]">
                <flow-ref name="partialAction"/>
            </when>
        </choice>
    </aggregators:incremental-aggregation>
    <aggregators:aggregation-complete>
        <flow-ref name="whatToDoWithAllTheseNames"/>
    </aggregators:aggregation-complete>
</aggregators:size-based-aggregator>

ただし、すべてのバッチが埋まるまで永久に待つことを避けるにはどうすればよいでしょうか? その場合、maxSize に達していなくても特定の時点ですべてのイベントを処理すると役立ちます。これを行うには、アグリゲータのタイムアウトを設定します。この例では、待機制限を 10 秒に設定します。

<aggregators:size-based-aggregator name="personsNameAggregator"
                                   maxSize="100"
                                   timeout="10"
                                   timeoutUnit="SECONDS">
    <aggregators:content>#[vars.personName]</aggregators:content>
    <aggregators:incremental-aggregation>
        <choice>
            <when expression="#[(sizeOf(payload) mod 10) == 0]">
                <flow-ref name="partialAction"/>
            </when>
        </choice>
    </aggregators:incremental-aggregation>
    <aggregators:aggregation-complete>
        <flow-ref name="whatToDoWithAllTheseNames"/>
    </aggregators:aggregation-complete>
</aggregators:size-based-aggregator>

タイムアウトが発生したときに aggregation-complete ルートが実行されるように、次のようなリスナも含める必要があります。

<aggregators:aggregator-listener aggregatorName="personsNameAggregator" includeTimedOutGroups="true"/>
<flow-ref name="whatToDoWithAllTheseNames"/>

この場合、aggregation-complete ルートと aggregator-listener から取得されるデータ型は同じです。そのため、フロー whatToDoWithAllTheseNames の実行では、データの取得元は関係ありません。

最後に、国籍に従って各人物を処理する必要があります。この情報は変数で渡されます。ただし、この場合は同じ国のすべての人物を同じバッチでまとめて処理するとします。

グループベースのアグリゲータでそれらを適宜分割できます。たとえば、人物の国籍が以前に personNac という変数に保存されたとします。この場合、フローは次のようになります。

<aggregators:group-based-aggregator name="personsNameByNacAggregator"
                                    groupId="#[vars.personNac]"
                                    groupSize="100"
                                    timeout="10"
                                    timeoutUnit="SECONDS"
                                    evictionTime="0">
    <aggregators:content>#[vars.personName]</aggregators:content>
    <aggregators:incremental-aggregation>
        <choice>
            <when expression="#[(sizeOf(payload) mod 10) == 0]">
                <flow-ref name="partialAction"/>
            </when>
        </choice>
    </aggregators:incremental-aggregation>
    <aggregators:aggregation-complete>
        <flow-ref name="whatToDoWithAllTheseNamesFromSomeCountry"/>
    </aggregators:aggregation-complete>
</aggregators:group-based-aggregator>

このフローにはタイムアウト用のリスナも必要です。

<aggregators:aggregator-listener aggregatorName="personsNameByNacAggregator" includeTimedOutGroups="true"/>
<flow-ref name="whatToDoWithAllTheseNamesFromSomeCountry"/>

この例では、アグリゲータで除去時間を設定していません。これは、デフォルト値が望ましい動作の 0 (グループの解放直後に除去) であるためです。そのため、グループが完了またはタイムアウトした場合、後続の値は拒否されることなくグループに追加されます。

Was this article helpful?

💙 Thanks for your feedback!