Contact Free trial Login

Aggregator Examples

A common use case for an aggregator is to store multiple values, then process them as a batch later. That is easily accomplished by using a size-based aggregator.

Suppose that the data that you want to aggregate is a person’s name that was previously stored in a variable with name personName, and you want to process batches of 100 elements. Once all the data is collected, you want to execute some logic defined in a flow with the name whatToDoWithAllTheseNames.

So, you could use this:

<aggregators:size-based-aggregator name="personsNameAggregator" maxSize="100">
    <aggregators:content>#[vars.personName]</aggregators:content>
    <aggregators:aggregation-complete>
        <flow-ref name="whatToDoWithAllTheseNames"/>
    </aggregators:aggregation-complete>
</aggregators:size-based-aggregator>

Now, suppose that you want to execute some action every time your list of names is filled by a 10%. You do not want to do this for every new element because that would cause too many executions. You can assume that the logic to be executed is in another flow named partialAction. We can you the incremental-aggregation route for this.

<aggregators:size-based-aggregator name="personsNameAggregator" maxSize="100">
    <aggregators:content>#[vars.personName]</aggregators:content>
    <aggregators:incremental-aggregation>
        <choice>
            <when expression="#[(sizeOf(payload) mod 10) == 0]">
                <flow-ref name="partialAction"/>
            </when>
        </choice>
    </aggregators:incremental-aggregation>
    <aggregators:aggregation-complete>
        <flow-ref name="whatToDoWithAllTheseNames"/>
    </aggregators:aggregation-complete>
</aggregators:size-based-aggregator>

But what if you do not want to wait forever to fill every batch. In that case, you might want to process all the events that you have even if the maxSize is not reached. For this, you can configure a timeout for your aggregator. This example sets the waiting limit to 10 seconds.

<aggregators:size-based-aggregator name="personsNameAggregator"
                                   maxSize="100"
                                   timeout="10"
                                   timeoutUnit="SECONDS">
    <aggregators:content>#[vars.personName]</aggregators:content>
    <aggregators:incremental-aggregation>
        <choice>
            <when expression="#[(sizeOf(payload) mod 10) == 0]">
                <flow-ref name="partialAction"/>
            </when>
        </choice>
    </aggregators:incremental-aggregation>
    <aggregators:aggregation-complete>
        <flow-ref name="whatToDoWithAllTheseNames"/>
    </aggregators:aggregation-complete>
</aggregators:size-based-aggregator>

To make sure that the logic that processes all aggregated elements executes once a timeout occurs, you also need to include a listener, for example:

<aggregators:aggregator-listener aggregatorName="personsNameAggregator" includeTimedOutGroups="true"/>
<flow-ref name="whatToDoWithAllTheseNames"/>

For this case, the data types that come from the aggregation-complete route and aggregator-listener are the same. So, for the execution of the flow whatToDoWithAllTheseNames, it does not matter where the data comes from. However, with this configuration, in the case the aggregation is complete, the flow whatToDoWithAllTheseNames will trigger twice, once from the aggregation-complete route flow-ref and again from the flow-ref in the flow with the aggregator-listener.

To avoid multiple executions of the same logic, the flow-ref after the aggregator-listener should be wrapped in a choice component that uses the isAggregationComplete attribute to check for an incomplete aggregation. That way, the flow-ref is only called when a timeout triggered that flow.

<flow name="test-flow">
    ...
    <aggregators:size-based-aggregator name="personsNameAggregator"
                                       maxSize="100"
                                       timeout="10"
                                       timeoutUnit="SECONDS">
        <aggregators:content>#[vars.personName]</aggregators:content>
        <aggregators:incremental-aggregation>
            <choice>
                <when expression="#[(sizeOf(payload) mod 10) == 0]">
                    <flow-ref name="partialAction"/>
                </when>
            </choice>
        </aggregators:incremental-aggregation>
        <aggregators:aggregation-complete>
            <flow-ref name="whatToDoWithAllTheseNames"/>
        </aggregators:aggregation-complete>
    </aggregators:size-based-aggregator>
    ...
</flow>

<flow name="aggregator-listener-flow">
    <aggregators:aggregator-listener aggregatorName="personsNameAggregator" includeTimedOutGroups="true"/>
    <choice>
        <when expression="#[not attributes.isAggregationComplete]">
            <flow-ref name="whatToDoWithAllTheseNames"/>
        </when>
    <choice>
</flow>

Lastly, you realize that you need to process each person according to nationality. This information comes in a variable. However, in this case, assume that you want to process every person from the same country together in the same batch.

You can split them accordingly with a group-based aggregator. For example, assume that the person’s nationality was previously stored in a variable with name personNac. In this case, the flow should contain this:

<aggregators:group-based-aggregator name="personsNameByNacAggregator"
                                    groupId="#[vars.personNac]"
                                    groupSize="100"
                                    timeout="10"
                                    timeoutUnit="SECONDS"
                                    evictionTime="0">
    <aggregators:content>#[vars.personName]</aggregators:content>
    <aggregators:incremental-aggregation>
        <choice>
            <when expression="#[(sizeOf(payload) mod 10) == 0]">
                <flow-ref name="partialAction"/>
            </when>
        </choice>
    </aggregators:incremental-aggregation>
    <aggregators:aggregation-complete>
        <flow-ref name="whatToDoWithAllTheseNamesFromSomeCountry"/>
    </aggregators:aggregation-complete>
</aggregators:group-based-aggregator>

The flow also needs a listener for timeouts:

<aggregators:aggregator-listener aggregatorName="personsNameByNacAggregator" includeTimedOutGroups="true"/>
<flow-ref name="whatToDoWithAllTheseNamesFromSomeCountry"/>

Notice that the example does not configure an eviction time in the aggregator. That is because the default value is 0 (Evict right after the group is released), which is the desired behavior. So if a group completed or timed out, a subsequent value can be added to the group without being rejected.

Was this article helpful?

💙 Thanks for your feedback!

Edit on GitHub