Aggregators Module Multiple Examples - Mule 4
A common use case for an aggregator is to store multiple values, then process them as a batch later. You can accomplish this aggregation using the size-based aggregator scope.
Suppose that the data that you want to aggregate is a person’s name that was previously stored in a variable named personName, and you want to process batches of 100 elements. Once all the data is collected, you want to execute some logic defined in a flow named whatToDoWithAllTheseNames.
So, you could use this:
<aggregators:size-based-aggregator name="personsNameAggregator" maxSize="100">
    <aggregators:content>#[vars.personName]</aggregators:content>
    <aggregators:aggregation-complete>
        <flow-ref name="whatToDoWithAllTheseNames"/>
    </aggregators:aggregation-complete>
</aggregators:size-based-aggregator>
If you want to act on intermediate results, but not for every new element because that would cause too many executions, you can use incremental-aggregation together with a condition. For example, you could execute an action every time your list of names grows by another 10 elements, that is, by 10% of maxSize, as shown here:
<aggregators:size-based-aggregator name="personsNameAggregator" maxSize="100">
    <aggregators:content>#[vars.personName]</aggregators:content>
    <aggregators:incremental-aggregation>
        <choice>
            <when expression="#[(sizeOf(payload) mod 10) == 0]">
                <flow-ref name="partialAction"/>
            </when>
        </choice>
    </aggregators:incremental-aggregation>
    <aggregators:aggregation-complete>
        <flow-ref name="whatToDoWithAllTheseNames"/>
    </aggregators:aggregation-complete>
</aggregators:size-based-aggregator>
You can assume that the logic to be executed is in another flow named partialAction.
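To make the partial step concrete, here is a minimal sketch of what such a flow could look like. The logger and its message are illustrative assumptions, not part of the original example. The sketch assumes that the payload passed by the incremental-aggregation route is the list of names collected so far, which is what the sizeOf(payload) condition above relies on.

```xml
<!-- Hypothetical implementation of partialAction; the logger is illustrative only -->
<flow name="partialAction">
    <!-- Assumes the payload is the list of names aggregated so far -->
    <logger level="INFO"
            message="#['Collected ' ++ (sizeOf(payload) as String) ++ ' names so far']"/>
</flow>
```

Any other processing, such as publishing a progress notification, could replace the logger.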
If you do not want to wait indefinitely for every batch to fill, you can process all the elements collected so far, even if the maxSize value is not reached, by setting a timeout for your aggregator. The following example sets the timeout to 10 seconds.
<aggregators:size-based-aggregator name="personsNameAggregator"
                                   maxSize="100"
                                   timeout="10"
                                   timeoutUnit="SECONDS">
    <aggregators:content>#[vars.personName]</aggregators:content>
    <aggregators:incremental-aggregation>
        <choice>
            <when expression="#[(sizeOf(payload) mod 10) == 0]">
                <flow-ref name="partialAction"/>
            </when>
        </choice>
    </aggregators:incremental-aggregation>
    <aggregators:aggregation-complete>
        <flow-ref name="whatToDoWithAllTheseNames"/>
    </aggregators:aggregation-complete>
</aggregators:size-based-aggregator>
To make sure that the logic that processes all aggregated elements executes once a timeout occurs, you also need to include a listener, for example:
<aggregators:aggregator-listener aggregatorName="personsNameAggregator" includeTimedOutGroups="true"/>
<flow-ref name="whatToDoWithAllTheseNames"/>
For this case, the data types that come from the aggregation-complete route and from the aggregator-listener are the same. So, for the execution of the flow whatToDoWithAllTheseNames, it does not matter where the data comes from.
However, with this configuration, when the aggregation is complete, the flow whatToDoWithAllTheseNames triggers twice: once from the flow-ref in the aggregation-complete route, and again from the flow-ref in the flow with the aggregator-listener.
To avoid multiple executions of the same logic, wrap the flow-ref that follows the aggregator-listener in a choice component that uses the isAggregationComplete attribute to check for an incomplete aggregation:
<flow name="test-flow">
    ...
    <aggregators:size-based-aggregator name="personsNameAggregator"
                                       maxSize="100"
                                       timeout="10"
                                       timeoutUnit="SECONDS">
        <aggregators:content>#[vars.personName]</aggregators:content>
        <aggregators:incremental-aggregation>
            <choice>
                <when expression="#[(sizeOf(payload) mod 10) == 0]">
                    <flow-ref name="partialAction"/>
                </when>
            </choice>
        </aggregators:incremental-aggregation>
        <aggregators:aggregation-complete>
            <flow-ref name="whatToDoWithAllTheseNames"/>
        </aggregators:aggregation-complete>
    </aggregators:size-based-aggregator>
    ...
</flow>
<flow name="aggregator-listener-flow">
    <aggregators:aggregator-listener aggregatorName="personsNameAggregator" includeTimedOutGroups="true"/>
    <choice>
        <!-- Run the logic only when the aggregation was released before completing -->
        <when expression="#[not attributes.isAggregationComplete]">
            <flow-ref name="whatToDoWithAllTheseNames"/>
        </when>
    </choice>
</flow>
In this example, the flow-ref in the listener flow is called only when a timeout triggers that flow.
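Because both routes deliver the same kind of data, the target flow does not need to know which one invoked it. The following is a minimal sketch of such a flow, assuming the payload is the list of aggregated names; the foreach and logger are illustrative placeholders for the real processing logic, which the original example does not show.

```xml
<!-- Hypothetical implementation of whatToDoWithAllTheseNames; placeholder logic only -->
<flow name="whatToDoWithAllTheseNames">
    <!-- Assumes the payload is the list of aggregated names, whether it arrives
         from the aggregation-complete route or from the aggregator-listener -->
    <foreach collection="#[payload]">
        <!-- Inside foreach, the payload is a single aggregated element -->
        <logger level="INFO" message="#[payload]"/>
    </foreach>
</flow>
```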
Lastly, suppose that you need to process each person according to nationality, which is available in a variable, and that you want to process every person from the same country together in the same batch.
You can split them accordingly with a group-based aggregator. For example, assume that the person’s nationality was previously stored in a variable named personNac. In this case, the flow should contain this:
<aggregators:group-based-aggregator name="personsNameByNacAggregator"
                                    groupId="#[vars.personNac]"
                                    groupSize="100"
                                    timeout="10"
                                    timeoutUnit="SECONDS"
                                    evictionTime="0">
    <aggregators:content>#[vars.personName]</aggregators:content>
    <aggregators:incremental-aggregation>
        <choice>
            <when expression="#[(sizeOf(payload) mod 10) == 0]">
                <flow-ref name="partialAction"/>
            </when>
        </choice>
    </aggregators:incremental-aggregation>
    <aggregators:aggregation-complete>
        <flow-ref name="whatToDoWithAllTheseNamesFromSomeCountry"/>
    </aggregators:aggregation-complete>
</aggregators:group-based-aggregator>
You also need a listener, placed as the source of its own flow, to handle timeouts:
<aggregators:aggregator-listener aggregatorName="personsNameByNacAggregator" includeTimedOutGroups="true"/>
<flow-ref name="whatToDoWithAllTheseNamesFromSomeCountry"/>
Notice that the group-based aggregator example sets evictionTime to 0, which means the eviction occurs directly after the group is released, which is the desired behavior here. Therefore, if a group finishes or is timed out, a subsequent value with the same group ID can be added without being rejected.
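As with the size-based aggregator, the listener for the group-based aggregator can guard its flow-ref with the isAggregationComplete attribute so that the country-specific logic does not run twice for a completed group. The following sketch mirrors the earlier listener flow; the flow name is a hypothetical choice, and the aggregators namespace is assumed to be declared on the enclosing mule element.

```xml
<!-- Hypothetical flow name; the listener is the source of this flow -->
<flow name="aggregator-by-nac-listener-flow">
    <aggregators:aggregator-listener aggregatorName="personsNameByNacAggregator" includeTimedOutGroups="true"/>
    <choice>
        <!-- Run the logic only for groups that were released before completing -->
        <when expression="#[not attributes.isAggregationComplete]">
            <flow-ref name="whatToDoWithAllTheseNamesFromSomeCountry"/>
        </when>
    </choice>
</flow>
```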