<aggregators:size-based-aggregator name="sizeBasedAggregator"
maxSize="10"
timeout="60"
timeoutUnit="MINUTES"
objectStore="exampleObjectStore">
<aggregators:content>
#[payload]
</aggregators:content>
<aggregators:incremental-aggregation>
...
</aggregators:incremental-aggregation>
<aggregators:aggregation-complete>
...
</aggregators:aggregation-complete>
</aggregators:size-based-aggregator>
Aggregators Module Reference 1.1 - Mule 4
Release Notes: Aggregators Module Release Notes
An aggregator scope is a component that receives some data, processes the data to extract some value, and then adds that value to a list of aggregated elements. After that process is complete, and depending on the aggregator scope configuration, a set of components in the flow processes the list of elements.
Aggregators are pass-through routers. The same data that an aggregator scope receives is processed by the components that follow the aggregator scope. The only modification is variable propagation if any variables are set during the execution of any of the aggregation routes.
When an aggregator scope releases the stored values, a route component (Incremental aggregation or Aggregation complete) processes the list of aggregated elements within the aggregator scope itself or in another Mule flow, through an Aggregator listener source. The process depends on the configuration of the aggregator scope used.
Configurations
There is no global configuration for the Aggregator Module because the module uses scopes. Each type of aggregator is a different scope that is configurable through its parameters. The following parameters are common in each scope:
-
Content
The expression that defines what to aggregate. The result of the evaluation is the value stored in the aggregation. Because all data related to an aggregator scope is stored in an object store, the content value must be serializable. Otherwise, there the module might not work properly.
-
Object Store
All information related to an aggregator is stored in an object store. The type of object store relates to the expected behavior of the component.
-
Persistent Object Store (Default)
Slower but more reliable than an in-memory object store. Enables data recovery after the application restarts.
-
In-Memory Object Store
Faster than a persistent memory store, but loses all the data if the application restarts.
-
Aggregators Scopes
There are size-based, time-based, and group-based aggregators.
Size-Based Aggregator
The Size based aggregator scope enables you to aggregate elements until a predefined number of elements completes the aggregation.
If the elements reach the Max size specified, two things occur:
-
The elements in the storage are removed and the next element belongs to the new aggregation.
-
The Aggregation complete route executes with the aggregated elements.
Additionally, if an Aggregator listener is registered to the aggregator scope, the listener’s callback executes using the same set of elements.
If the Incremental aggregation route is not null, and the Max size is not reached, then the route component chain executes with all the aggregated elements, including the last element aggregated.
You can also define a Timeout for the aggregator scope. In that case, a scheduled task with that timeout as a delay is registered for execution. The time is computed from the time at which the first element arrives, and no extra task is scheduled if there is another task waiting to be executed. In the case of a timeout, the referenced listener executes only if it supports being called by timeout.
Parameters
Name | Type | Description | Default Value | Required |
---|---|---|---|---|
Name |
String |
The name of the aggregator. An aggregator listener can later reference the scope by name. |
x |
|
Content |
Expression |
The expression that defines what to aggregate. The result of the evaluation is the value stored in the aggregation. |
#[payload] |
|
Max Size |
Number |
The total number of elements to aggregate before considering the aggregation complete. |
x |
|
Timeout |
Number |
A maximum time to wait for the aggregation to complete. If the timeout is reached before the total number of elements is equal to Max Size, the aggregation is not considered complete. A value of |
-1(UNLIMITED) |
|
Timeout unit |
Time Unit |
The time unit in which to measure the timeout. |
SECONDS |
|
Object Store |
Object Store |
Either a name to reference a global object store or a definition of a private object store where the aggregated elements will be stored. |
Default Object Store partition. |
|
Aggregation Complete Route |
Route |
Components chain to execute once the aggregation is complete. The Aggregation Complete Route does not execute if the timeout is reached. |
x |
|
Incremental Aggregation Route |
Route |
Components chain to execute for every new element that is aggregated. The payload is the list of all the elements that have been aggregated in the aggregation from the first element to the one that is currently aggregated. |
Time-Based Aggregator
The Time based aggregator scope enables you to aggregate elements until a specified time limit is reached.
<aggregators:time-based-aggregator name="timeBasedAggregator"
period="60"
periodUnit="MINUTES"
maxSize="10"
objectStore="exampleObjectStore">
<aggregators:content>
#[payload]
</aggregators:content>
<aggregators:incremental-aggregation>
...
</aggregators:incremental-aggregation>
</aggregators:time-based-aggregator>
The period taken into account is computed from the time the first element arrives. After the aggregation is released, the timer does not start until the next element arrives.
The aggregator also enables an Incremental aggregation route to be executed every time a new element arrives, unless a Max size is set. If that is the case, the Incremental aggregation route executes every time except when the size of the aggregated elements is equal to the Max size. If an Aggregator listener is present at that moment, the listener callback is also executed.
Parameters
Name | Type | Description | Default Value | Required |
---|---|---|---|---|
Name |
String |
The name of the aggregator. An aggregator listener can later reference the scope by name. |
x |
|
Content |
Expression |
The expression that defines what to aggregate. The result of the evaluation is the value stored in the aggregation. |
#[payload] |
|
Period |
Number |
A time period to wait before considering the aggregation complete. |
x |
|
Period unit |
Time Unit |
The time unit in which to measure the time period. |
SECONDS |
|
Max Size |
Number |
The total number of elements to aggregate before considering the aggregation complete. |
-1(UNLIMITED) |
|
Object Store |
Object Store |
Either a name to reference a global object store or a definition of a private object store where the aggregated elements will be stored. |
Default Object Store partition. |
|
Incremental Aggregation Route |
Route |
Components chain to execute for every new element that is aggregated. The payload is the list of all the elements that have been aggregated in the aggregation from the first element to the one that is currently aggregated. |
Group-Based Aggregator
The Group based aggregator scope enables you to aggregate elements into groups by group ID.
<aggregators:group-based-aggregator name="groupBasedAggregator"
groupId="#[correlationId]"
groupSize="#[itemSequenceInfo.sequenceSize]"
evictionTime="180"
evictionTimeUnit="SECONDS"
timeout="60"
timeoutUnit="MINUTES"
objectStore="exampleObjectStore">
<aggregators:content>
#[payload]
</aggregators:content>
<aggregators:incremental-aggregation>
...
</aggregators:incremental-aggregation>
<aggregators:aggregation-complete>
...
</aggregators:aggregation-complete>
</aggregators:group-based-aggregator>
If the elements reach the Max size specified for the group, two things occur:
-
The elements in that group are removed from storage. The group is marked as complete and every new element that arrives at that group raises an exception.
-
The Aggregation complete route executes with the aggregated elements of that particular group.
Every time a new element reaches the aggregator, an ID is resolved. If a group with that ID already exists in the aggregator, the value is added to that group. Otherwise, a new group with that ID is created and the received element is the first element in that group’s aggregation.
Additionally, if an Aggregator listener is registered to the aggregator scope, the listener’s callback executes with the same set of elements.
If the Incremental aggregation route is not null, and the Max size is not reached, then the route component chain executes with all the aggregated elements, including the last element aggregated.
The Group based aggregator scope introduces some important concepts:
-
Group timeout
Specifies when a group must be released because all the necessary elements for the group did not arrive within the expected time. If a group has timed out but is not yet evicted, it rejects attempts to add any new values -
Group eviction
Specifies when a group is removed from the aggregator, regardless of whether it was completed or timed out. If a new element with that group’s ID is received by the aggregator, the group is created again.
Lastly, when elements that reach group-based aggregators come from a sequence that is split (by a ForEach component for example), each element is assigned a different sequenceNumber
. In that case, the elements are sorted in increasing order prior to the aggregation release.
Parameters
Name | Type | Description | Default Value | Required |
---|---|---|---|---|
Name |
String |
The name of the aggregator. An aggregator listener can later reference the scope by name. |
x |
|
Content |
Expression |
The expression that defines what to aggregate. The result of the evaluation is the value stored in the aggregation. |
#[payload] |
|
Group Id |
Expression |
The expression to evaluate for every new message received to obtain the ID for the group in which it should be aggregated.. |
#[correlationId] |
|
Group Size |
Number |
The maximum size to assign to the group with the group ID resolved. All messages with the same group ID must have the same group size. If not, only the first resolved group size is considered correct. A warning is logged for every group that does not match. |
#[itemSequenceInfo.sequenceSize] |
|
Eviction Time |
Number |
The time to remember a group ID after it is completed or timed out (0 means: don’t remember, -1: remember forever) |
180 |
|
Eviction Time Unit |
Time Unit |
The time unit for the Eviction Time. |
SECONDS |
|
Timeout |
Number |
The maximum time to wait for the aggregation of a group to complete. If the timeout is reached before the total number of elements in that group is equal to the group’s size, the aggregation is considered complete. To avoid constant group timeouts, a value of |
-1(UNLIMITED) |
|
Timeout unit |
Time Unit |
The time unit in which to measure the timeout. |
SECONDS |
|
Object Store |
Object Store |
Either a name to reference a global object store or a definition of a private object store where the aggregated elements are stored. |
Default Object Store partition |
|
Aggregation Complete Route |
Route |
Components chain to execute once the aggregation is complete. |
x |
|
Incremental Aggregation Route |
Route |
Components chain to execute for every new element that is aggregated. The payload is the list of all the elements that have been aggregated in the aggregation from the first element to the one that is currently aggregated. |
Raises
-
AGGREGATORS:GROUP_COMPLETED
An error occurred during an attempt to add a new element to an already completed group that was not yet evicted.
-
AGGREGATORS:GROUP_TIMED_OUT
An error occurred during an attempt to add a new element to a group that timed out but had not yet been evicted.
-
AGGREGATORS:NO_GROUP_ID
The expression that resolves to the group ID returns null.
-
AGGREGATORS:NO_GROUP_SIZE
The expression that resolves to the group size returns null.
-
AGGREGATORS:AGGREGATOR_CONFIG
The Group size or Timeout fields have invalid values, for example, groupSize < 0.
-
AGGREGATORS:OBJECT_STORE_ACCESS
An error occurred during an attempt to access the object store used to store the aggregated values.
Sources
Aggregator Listener
An Aggregator listener is a source for listening to elements triggered by an aggregator scope.
<aggregators:aggregator-listener aggregatorName="exampleAggregator" includeTimedOutGroups="false">
An Aggregator listener references only aggregator scopes that are inside a flow. Aggregator scopes declared in a subflow are not visible to Aggregator listeners. |
After the aggregator scope referenced by the listener completes an aggregation, the listener is triggered with a list of all the elements.
Because the Aggregator listener is a source, it is located in a different flow than the aggregator. The listener cannot access the context from the aggregator’s flow, and therefore cannot access the flow’s variables.
Although you can use aggregation listeners for any kind of aggregator, it is important for time-driven asynchronous aggregations, which do not execute an aggregator route and can reach components only in flows with an aggregator listener as the source.
Parameters
Name | Type | Description | Default Value | Required |
---|---|---|---|---|
Aggregator Name |
String |
The name of the aggregator to listen to. After that aggregator releases its elements, the listener is executed. Each listener can reference only one aggregator, and each aggregator can be referenced by at most one listener. |
x |
|
Include Timed Out Groups |
Boolean |
Indicates whether the listener should be triggered when a group is released due to a timeout. |
false |
Aggregation Attributes
Each time a message goes through an aggregation, some attributes with information about the aggregation are added to the message.
Name | Type | Description |
---|---|---|
Aggregation ID |
String |
The ID from the group in which the element is aggregated. If the aggregation strategy does not aggregate by group, this field is an autogenerated value that is kept until the aggregation is released (as with group-based and time-based aggregators). |
First Item Arrival Time |
Date |
The time when the first value is aggregated. |
Last Item Arrival Time |
Date |
The time when the last value is aggregated. |
Is Aggregation Complete |
Boolean |
True if the aggregation is complete, False otherwise. |
Redelivery Policy
Configures the redelivery policy for executing requests that generate errors. You can add a redelivery policy to any source in a flow.
Field | Type | Description | Default Value | Required |
---|---|---|---|---|
Max Redelivery Count |
Number |
Maximum number of times that a redelivered request can be processed unsuccessfully before returning a REDELIVERY_EXHAUSTED error. |
||
Use Secure Hash |
Boolean |
If |
||
Message Digest Algorithm |
String |
Secure hashing algorithm to use if the <b>Use Secure Hash<b/> field is |
||
Id Expression |
String |
One or more expressions that determine when a message was redelivered. This property can be set only if the <b>Use Secure Hash</b> field is |
||
Object Store |
Object Store |
Configures the object store that stores the redelivery counter for each message. |
System Properties
Property | Description | Default Value | Example |
---|---|---|---|
|
Currently a Mule app deployment fails, if the quorum constraint is not met. This system property sends the quorum poll to the background to deploy the Mule app. |
|
|
|
Configures the time (in milliseconds) between quorum polls, and the property takes effect if the |
|
|
|
Configures the scheduling period (in milliseconds) to determine if Size-Based or Time-Based aggregators reached the configured size. The module triggers a GET call every 1 second. |
|