Contact Free trial Login

Aggregators Examples - Mule 4

A common use case for an aggregator is to store multiple values, then process them as a batch later. That is easily accomplished by using a size-based aggregator.

Suppose that the data that you want to aggregate is a person’s name that was previously stored in a variable with name personName, and you want to process batches of 100 elements. Once all the data is collected, you want to execute some logic defined in a flow with the name whatToDoWithAllTheseNames.

So, you could use this:

<aggregators:size-based-aggregator name="personsNameAggregator" maxSize="100">
    <aggregators:content>#[vars.personName]</aggregators:content>
    <aggregators:aggregation-complete>
        <flow-ref name="whatToDoWithAllTheseNames"/>
    </aggregators:aggregation-complete>
</aggregators:size-based-aggregator>

If you do not want to execute an action for every new element because that would cause too many executions, you can use incremental-aggregation. For example, you could execute an action every time your list of names increases by an increment of 10% as shown here:

<aggregators:size-based-aggregator name="personsNameAggregator" maxSize="100">
    <aggregators:content>#[vars.personName]</aggregators:content>
    <aggregators:incremental-aggregation>
        <choice>
            <when expression="#[(sizeOf(payload) mod 10) == 0]">
                <flow-ref name="partialAction"/>
            </when>
        </choice>
    </aggregators:incremental-aggregation>
    <aggregators:aggregation-complete>
        <flow-ref name="whatToDoWithAllTheseNames"/>
    </aggregators:aggregation-complete>
</aggregators:size-based-aggregator>

You can assume that the logic to be executed is in another flow named partialAction.

If you do not want to wait forever to fill every batch, you can process all the events that you have even if the maxSize value is not reached, by setting a timeout for your aggregator. The following example sets the timeout to 10 seconds.

<aggregators:size-based-aggregator name="personsNameAggregator"
                                   maxSize="100"
                                   timeout="10"
                                   timeoutUnit="SECONDS">
    <aggregators:content>#[vars.personName]</aggregators:content>
    <aggregators:incremental-aggregation>
        <choice>
            <when expression="#[(sizeOf(payload) mod 10) == 0]">
                <flow-ref name="partialAction"/>
            </when>
        </choice>
    </aggregators:incremental-aggregation>
    <aggregators:aggregation-complete>
        <flow-ref name="whatToDoWithAllTheseNames"/>
    </aggregators:aggregation-complete>
</aggregators:size-based-aggregator>

To make sure that the logic that processes all aggregated elements executes once a timeout occurs, you also need to include a listener, for example:

<aggregators:aggregator-listener aggregatorName="personsNameAggregator" includeTimedOutGroups="true"/>
<flow-ref name="whatToDoWithAllTheseNames"/>

For this case, the data types that come from the aggregation-complete route and aggregator-listener are the same. So, for the execution of the flow whatToDoWithAllTheseNames, it does not matter where the data comes from. However, with this configuration, when the aggregation is complete, the flow whatToDoWithAllTheseNames triggers twice: once from the aggregation-complete route flow-ref, and again from the flow-ref in the flow with the aggregator-listener.

To avoid multiple executions of the same logic, wrap the flow-ref value aggregator-listener in a choice component that uses the isAggregationComplete attribute to check for an incomplete aggregation:

<flow name="test-flow">
    ...
    <aggregators:size-based-aggregator name="personsNameAggregator"
                                       maxSize="100"
                                       timeout="10"
                                       timeoutUnit="SECONDS">
        <aggregators:content>#[vars.personName]</aggregators:content>
        <aggregators:incremental-aggregation>
            <choice>
                <when expression="#[(sizeOf(payload) mod 10) == 0]">
                    <flow-ref name="partialAction"/>
                </when>
            </choice>
        </aggregators:incremental-aggregation>
        <aggregators:aggregation-complete>
            <flow-ref name="whatToDoWithAllTheseNames"/>
        </aggregators:aggregation-complete>
    </aggregators:size-based-aggregator>
    ...
</flow>

<flow name="aggregator-listener-flow">
    <aggregators:aggregator-listener aggregatorName="personsNameAggregator" includeTimedOutGroups="true"/>
    <choice>
        <when expression="#[not attributes.isAggregationComplete]">
            <flow-ref name="whatToDoWithAllTheseNames"/>
        </when>
    <choice>
</flow>

In this example, the flow-ref is called only when a timeout triggers that flow.

Lastly, you realize that you need to process each person according to nationality. This information comes in a variable. However, in this case, assume that you want to process every person from the same country together in the same batch.

You can split them accordingly with a group-based aggregator. For example, assume that the person’s nationality was previously stored in a variable with name personNac. In this case, the flow should contain this:

<aggregators:group-based-aggregator name="personsNameByNacAggregator"
                                    groupId="#[vars.personNac]"
                                    groupSize="100"
                                    timeout="10"
                                    timeoutUnit="SECONDS"
                                    evictionTime="0">
    <aggregators:content>#[vars.personName]</aggregators:content>
    <aggregators:incremental-aggregation>
        <choice>
            <when expression="#[(sizeOf(payload) mod 10) == 0]">
                <flow-ref name="partialAction"/>
            </when>
        </choice>
    </aggregators:incremental-aggregation>
    <aggregators:aggregation-complete>
        <flow-ref name="whatToDoWithAllTheseNamesFromSomeCountry"/>
    </aggregators:aggregation-complete>
</aggregators:group-based-aggregator>

The flow also needs a listener for timeouts:

<aggregators:aggregator-listener aggregatorName="personsNameByNacAggregator" includeTimedOutGroups="true"/>
<flow-ref name="whatToDoWithAllTheseNamesFromSomeCountry"/>

Notice that the example does not configure an eviction time in the aggregator. That is because the default value is 0, which means the eviction occurs directly after the group is released, which is the desired behavior. Therefore, if a group finishes or is timed out, a subsequent value can be added to the group without being rejected.