Flex Gateway新着情報
Governance新着情報
Monitoring API Managerアグリゲーターの一般的なユースケースは、複数の値を保存して後でバッチとして処理することです。 この集約を実現するには、[Size based aggregator (サイズベースのアグリゲーター)] スコープを使用できます。
集約するデータは、以前に personName
という変数に保存された人物名であり、100 要素のバッチを処理するとします。すべてのデータが収集されたら、whatToDoWithAllTheseNames
というフロー内で定義されたいくつかのロジックを実行します。
この場合、次を使用できます。
<aggregators:size-based-aggregator name="personsNameAggregator" maxSize="100">
<aggregators:content>#[vars.personName]</aggregators:content>
<aggregators:aggregation-complete>
<flow-ref name="whatToDoWithAllTheseNames"/>
</aggregators:aggregation-complete>
</aggregators:size-based-aggregator>
実行数が多すぎてしまうため、すべての新しい要素でアクションを実行しないようにするには、incremental-aggregation
を使用します。たとえば、次に示すように名前のリストが 10% ずつ増加するたびにアクションを実行するとします。
<aggregators:size-based-aggregator name="personsNameAggregator" maxSize="100">
<aggregators:content>#[vars.personName]</aggregators:content>
<aggregators:incremental-aggregation>
<choice>
<when expression="#[(sizeOf(payload) mod 10) == 0]">
<flow-ref name="partialAction"/>
</when>
</choice>
</aggregators:incremental-aggregation>
<aggregators:aggregation-complete>
<flow-ref name="whatToDoWithAllTheseNames"/>
</aggregators:aggregation-complete>
</aggregators:size-based-aggregator>
実行されるロジックは partialAction
という別のフロー内にあるとします。
すべてのバッチが埋まるまで永久に待つことを避けるには、アグリゲーターのタイムアウトを設定して、maxSize
に達していなくても特定の時点ですべてのイベントを処理します。次の例では、タイムアウトを 10 秒に設定しています。
<aggregators:size-based-aggregator name="personsNameAggregator"
maxSize="100"
timeout="10"
timeoutUnit="SECONDS">
<aggregators:content>#[vars.personName]</aggregators:content>
<aggregators:incremental-aggregation>
<choice>
<when expression="#[(sizeOf(payload) mod 10) == 0]">
<flow-ref name="partialAction"/>
</when>
</choice>
</aggregators:incremental-aggregation>
<aggregators:aggregation-complete>
<flow-ref name="whatToDoWithAllTheseNames"/>
</aggregators:aggregation-complete>
</aggregators:size-based-aggregator>
タイムアウトが発生したときに、すべての集約された要素を処理するロジックが実行されるように、次のようなリスナーも含める必要があります。
<aggregators:aggregator-listener aggregatorName="personsNameAggregator" includeTimedOutGroups="true"/>
<flow-ref name="whatToDoWithAllTheseNames"/>
この場合、aggregation-complete
ルートと aggregator-listener
から取得されるデータ型は同じです。そのため、フロー whatToDoWithAllTheseNames
の実行では、データの取得元は関係ありません。
ただしこの設定では、集約が完了した場合、フロー whatToDoWithAllTheseNames
は aggregation-complete
ルート flow-ref
から 1 回、aggregator-listener
でのフロー内の flow-ref
からもう 1 回の計 2 回トリガーされます。
同じロジックが複数回実行されないようにするには、不完全な集約を確認するため、isAggregationComplete
属性を使用する choice
コンポーネントで flow-ref
値 aggregator-listener
をラップする必要があります:
<flow name="test-flow">
...
<aggregators:size-based-aggregator name="personsNameAggregator"
maxSize="100"
timeout="10"
timeoutUnit="SECONDS">
<aggregators:content>#[vars.personName]</aggregators:content>
<aggregators:incremental-aggregation>
<choice>
<when expression="#[(sizeOf(payload) mod 10) == 0]">
<flow-ref name="partialAction"/>
</when>
</choice>
</aggregators:incremental-aggregation>
<aggregators:aggregation-complete>
<flow-ref name="whatToDoWithAllTheseNames"/>
</aggregators:aggregation-complete>
</aggregators:size-based-aggregator>
...
</flow>
<flow name="aggregator-listener-flow">
<aggregators:aggregator-listener aggregatorName="personsNameAggregator" includeTimedOutGroups="true"/>
<choice>
<when expression="#[not attributes.isAggregationComplete]">
<flow-ref name="whatToDoWithAllTheseNames"/>
</when>
<choice>
</flow>
この例では、flow-ref
はタイムアウトでそのフローがトリガーされたときにのみコールされます。
最後に、国籍に従って各人物を処理する必要があります。この情報は変数で渡されます。ただし、この場合は同じ国のすべての人物を同じバッチでまとめて処理するとします。
グループベースのアグリゲーターでそれらを適宜分割できます。たとえば、人物の国籍が以前に personNac
という変数に保存されたとします。この場合、フローは次のようになります。
<aggregators:group-based-aggregator name="personsNameByNacAggregator"
groupId="#[vars.personNac]"
groupSize="100"
timeout="10"
timeoutUnit="SECONDS"
evictionTime="0">
<aggregators:content>#[vars.personName]</aggregators:content>
<aggregators:incremental-aggregation>
<choice>
<when expression="#[(sizeOf(payload) mod 10) == 0]">
<flow-ref name="partialAction"/>
</when>
</choice>
</aggregators:incremental-aggregation>
<aggregators:aggregation-complete>
<flow-ref name="whatToDoWithAllTheseNamesFromSomeCountry"/>
</aggregators:aggregation-complete>
</aggregators:group-based-aggregator>
このフローにはタイムアウト用のリスナーも必要です。
<aggregators:aggregator-listener aggregatorName="personsNameByNacAggregator" includeTimedOutGroups="true"/>
<flow-ref name="whatToDoWithAllTheseNamesFromSomeCountry"/>
この例では、アグリゲーターで除去時間を設定していません。これは、デフォルト値が望ましい動作の 0
(グループの解放直後に除去) であるためです。そのため、グループが完了またはタイムアウトした場合、後続の値は拒否されることなくグループに追加されます。