
Aggregators Module Reference

An Aggregator is a component that receives some data, processes it to extract some value and then adds that value to a list of aggregated elements. After that, and depending on the configuration, the list of elements is sent to a set of components.

Aggregators are pass-through routers: the data received by the aggregator is the same data processed by the components that follow it. The only change is that variables set while executing an aggregation route are propagated.

When an aggregator releases the stored values, the list of aggregated elements is processed either by a route within the aggregator itself or in another Mule Flow, through an aggregator listener. All of that depends on the type and configuration of the aggregator being used.

Configurations

There is no configuration for Aggregators because the module is based only on operations. Each type of aggregator is a different operation that is configurable through its parameters.

However, there are a couple of common parameters that are worth mentioning beforehand:

  • Content:

    The expression that defines what to aggregate. The result of the evaluation is the value stored in the aggregation. Since all data related to an aggregator is stored in an Object Store, it’s really important that the content value is Serializable. Otherwise, there is no assurance that the module will work properly.

  • Object Store:

    All information related to an aggregator is stored in an Object Store. The type of Object Store should relate to the expected behavior of the component.

    Generally speaking:

    • Persistent Object Store (Default): More reliable, permits data recovery after restart. Slower.

    • In Memory Object Store: Faster. Loses all the data if the application is restarted.
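As a rough sketch of the two options above, a global object store of each kind could be declared with the Object Store connector and then referenced by name from an aggregator. The store names are hypothetical, and the `os` namespace is assumed to be declared in the application.

```xml
<!-- Persistent store: data can be recovered after a restart, at the cost of slower access. -->
<os:object-store name="persistentAggregatorStore" persistent="true"/>

<!-- In-memory store: faster, but all data is lost if the application is restarted. -->
<os:object-store name="inMemoryAggregatorStore" persistent="false"/>
```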

Aggregator Types

There are size-based, time-based, and group-based aggregators.

Size-Based Aggregator


          
       
<aggregators:size-based-aggregator  name="sizeBasedAggregator"
                                    maxSize="10"
                                    timeout="60"
                                    timeoutUnit="MINUTES"
                                    objectStore="exampleObjectStore">
    <aggregators:content>
        #[payload]
    </aggregators:content>
    <aggregators:incremental-aggregation>
        ...
    </aggregators:incremental-aggregation>
    <aggregators:aggregation-complete>
        ...
    </aggregators:aggregation-complete>
</aggregators:size-based-aggregator>

This example aggregates elements until a pre-defined size is reached, executing the routes and listeners.
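To make the elided routes more concrete, the following sketch fills them with loggers. The flow and aggregator names are hypothetical, the default object store is assumed, and the flow would still need a source or a flow reference to receive messages.

```xml
<flow name="collectReadingsFlow">
    <aggregators:size-based-aggregator name="readingsAggregator" maxSize="3">
        <aggregators:incremental-aggregation>
            <!-- Runs as new elements are aggregated; payload is the list aggregated so far. -->
            <logger level="DEBUG" message='#["Aggregated so far: $(sizeOf(payload))"]'/>
        </aggregators:incremental-aggregation>
        <aggregators:aggregation-complete>
            <!-- Runs when maxSize is reached; payload is the complete list. -->
            <logger level="INFO" message='#["Releasing $(sizeOf(payload)) elements"]'/>
        </aggregators:aggregation-complete>
    </aggregators:size-based-aggregator>
</flow>
```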

Parameters

| Name | Type | Description | Default Value | Required |
|---|---|---|---|---|
| Name | String | The name of the aggregator. It can later be referenced by an aggregator listener. | | x |
| Content | Expression | The expression that defines what to aggregate. The result of the evaluation is the value stored in the aggregation. | #[payload] | |
| Max Size | Number | The total number of elements to be aggregated before considering the aggregation complete. | | x |
| Timeout | Number | A maximum time to wait for the aggregation to complete. If the timeout is reached before the total number of elements is equal to max size, the aggregation is considered complete. A value of 0 is not supported to avoid timing out the group constantly. | -1(UNLIMITED) | |
| Timeout unit | Time Unit | The time unit in which to measure the timeout. | SECONDS | |
| Object Store | Object Store | Either a name to reference a global object store or a definition of a private object store where the aggregated elements will be stored. | Default Object Store partition. | |
| Aggregation Complete Route | Route | Components chain to execute once the aggregation is complete. | | x |
| Incremental Aggregation Route | Route | Components chain to execute for every new element that is aggregated. The payload is the list of all the elements that have been aggregated from the first one (in this aggregation) to the one that is being aggregated now. | | |

Raises

  • AGGREGATORS:AGGREGATOR_CONFIG

    Whenever maxSize or timeout has an invalid value (for example, maxSize < 0).

  • AGGREGATORS:OBJECT_STORE_ACCESS

    If there is any issue accessing the ObjectStore used to store the aggregated values.

Time-Based Aggregator


          
       
<aggregators:time-based-aggregator  name="timeBasedAggregator"
                                    period="60"
                                    periodUnit="MINUTES"
                                    maxSize="10"
                                    objectStore="exampleObjectStore">
    <aggregators:content>
        #[payload]
    </aggregators:content>
    <aggregators:incremental-aggregation>
        ...
    </aggregators:incremental-aggregation>
</aggregators:time-based-aggregator>

Aggregates elements until a time period is completed, executing the routes and listeners.

Parameters

| Name | Type | Description | Default Value | Required |
|---|---|---|---|---|
| Name | String | The name of the aggregator. It can later be referenced by an aggregator listener. | | x |
| Content | Expression | The expression that defines what to aggregate. The result of the evaluation is the value stored in the aggregation. | #[payload] | |
| Period | Number | A time period to wait before considering the aggregation to be complete. | | x |
| Period unit | Time Unit | The time unit in which to measure the time period. | SECONDS | |
| Max Size | Number | The total number of elements to be aggregated before considering the aggregation to be complete. | -1(UNLIMITED) | |
| Object Store | Object Store | Either a name to reference a global object store or a definition of a private object store where the aggregated elements will be stored. | Default Object Store partition. | |
| Incremental Aggregation Route | Route | Components chain to be executed for every new element that is aggregated. The payload is the list of all the elements that have been aggregated from the first one (in this aggregation) to the one that is being aggregated now. | | |

Raises

  • AGGREGATORS:AGGREGATOR_CONFIG

    Whenever period or maxSize has invalid values (for example, Period = 0).

  • AGGREGATORS:OBJECT_STORE_ACCESS

    If there is any issue accessing the ObjectStore used to store the aggregated values.

Group-Based Aggregator


          
       
<aggregators:group-based-aggregator name="groupBasedAggregator"
                                    groupId="#[correlationId]"
                                    groupSize="#[itemSequenceInfo.sequenceSize]"
                                    evictionTime="180"
                                    evictionTimeUnit="SECONDS"
                                    timeout="60"
                                    timeoutUnit="MINUTES"
                                    objectStore="exampleObjectStore">
    <aggregators:content>
        #[payload]
    </aggregators:content>
    <aggregators:incremental-aggregation>
        ...
    </aggregators:incremental-aggregation>
    <aggregators:aggregation-complete>
        ...
    </aggregators:aggregation-complete>
</aggregators:group-based-aggregator>

Aggregates elements in different groups according to a group ID.

Every time a new element reaches the aggregator, an ID will be resolved. If a group with that ID already exists in the aggregator, the value will be added to that group. Otherwise, a new group with that ID will be created and the received element will be the first element in that group’s aggregation.

Some important concepts appear with the group-based aggregator:

  • Group timeout: A group is released because not all of its expected elements arrived within the configured time. If a group has timed out but is not yet evicted, it rejects attempts to add new values to it.

  • Group eviction: When a group is removed from the aggregator, regardless of whether it was completed or timed out. If a new element with that group’s ID is received by the aggregator, the group will be created again.

Lastly, when the elements that reach a group-based aggregator come from a sequence that was split (by a ForEach component, for example), each element is assigned a different sequenceNumber. In that case, the elements are sorted in increasing order before the aggregation is released.
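A minimal sketch of that split-and-regroup case follows. It assumes the default groupId and groupSize expressions listed in the parameters for this aggregator, so neither attribute is set explicitly; the flow and aggregator names are hypothetical.

```xml
<flow name="splitAndRegroupFlow">
    <!-- The ForEach splits the incoming collection, giving each element its sequence information. -->
    <foreach collection="#[payload]">
        <!-- With the defaults, elements are grouped by correlationId and the group size is the sequence size. -->
        <aggregators:group-based-aggregator name="regroupAggregator">
            <aggregators:aggregation-complete>
                <!-- payload is the list of regrouped elements. -->
                <logger level="INFO" message='#["Regrouped $(sizeOf(payload)) elements"]'/>
            </aggregators:aggregation-complete>
        </aggregators:group-based-aggregator>
    </foreach>
</flow>
```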

Parameters

| Name | Type | Description | Default Value | Required |
|---|---|---|---|---|
| Name | String | The name of the aggregator. It can later be referenced by an aggregator listener. | | x |
| Content | Expression | The expression that defines what to aggregate. The result of the evaluation is the value stored in the aggregation. | #[payload] | |
| Group Id | Expression | The expression to be evaluated for every new message received in order to get the ID for the group where it should be aggregated. | #[correlationId] | |
| Group Size | Number | The maximum size to assign to the group with the group ID resolved. All messages with the same group ID must have the same group size. If not, only the first resolved group size will be considered correct, and a warning will be logged for every one that does not match it. | #[itemSequenceInfo.sequenceSize] | |
| Eviction Time | Number | The time to remember a group ID after the group completes or times out (0: do not remember; -1: remember forever). | 180 | |
| Eviction Time Unit | Time Unit | The time unit for the Eviction Time. | SECONDS | |
| Timeout | Number | A maximum time to wait for the aggregation of a group to complete. If the timeout is reached before the total number of elements in that group is equal to the group’s size, the aggregation will be considered complete. To avoid constant group timeouts, a value of 0 is not supported. | -1(UNLIMITED) | |
| Timeout unit | Time Unit | The time unit in which to measure the timeout. | SECONDS | |
| Object Store | Object Store | Either a name to reference a global object store or a definition of a private object store where the aggregated elements will be stored. | Default Object Store partition. | |
| Aggregation Complete Route | Route | Components chain to execute once the aggregation is complete. | | x |
| Incremental Aggregation Route | Route | Components chain to execute for every new element that is aggregated. The payload is the list of all the elements that have been aggregated from the first one (in this aggregation) to the one that is being aggregated now. | | |

Raises

  • AGGREGATORS:GROUP_COMPLETED

    When a new element has to be added to an already completed group (and the group was not yet evicted).

  • AGGREGATORS:GROUP_TIMED_OUT

    When a new element has to be added to a group that timed out (and the group was not yet evicted).

  • AGGREGATORS:NO_GROUP_ID

    When the expression that resolves to the group ID returns null.

  • AGGREGATORS:NO_GROUP_SIZE

    When the expression that resolves to the group size returns null.

  • AGGREGATORS:AGGREGATOR_CONFIG

    When the group size or timeout has invalid values (for example, groupSize < 0).

  • AGGREGATORS:OBJECT_STORE_ACCESS

    If there is any issue accessing the ObjectStore used to store the aggregated values.
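As an illustration of handling the group-related errors above, the sketch below wraps a group-based aggregator in a Try scope and continues when an element arrives for a group that was already completed or timed out. The aggregator name, timeout value, and log messages are hypothetical.

```xml
<try>
    <aggregators:group-based-aggregator name="lateTolerantAggregator"
                                        timeout="30"
                                        timeoutUnit="SECONDS">
        <aggregators:aggregation-complete>
            <logger level="INFO" message="Group completed"/>
        </aggregators:aggregation-complete>
    </aggregators:group-based-aggregator>
    <error-handler>
        <!-- Late elements for a released (but not yet evicted) group are logged and dropped. -->
        <on-error-continue type="AGGREGATORS:GROUP_COMPLETED, AGGREGATORS:GROUP_TIMED_OUT">
            <logger level="WARN" message="Element arrived for a group that was already released"/>
        </on-error-continue>
    </error-handler>
</try>
```

Using on-error-continue keeps the surrounding flow running; on-error-propagate would be the choice if a late element should fail the flow instead.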

Sources

Aggregator Listener

<aggregators:aggregator-listener aggregatorName="exampleAggregator" includeTimedOutGroups="false">

Once the aggregator that is referenced by the listener completes an aggregation, the listener is triggered with a list of all the elements. Though the aggregation listener can be used for any kind of aggregator, it is important for time-driven (Async) aggregations. Such aggregations are triggered asynchronously, so they do not execute an aggregator route and can only reach components in flows with an aggregator listener as the source.
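The pairing described above might look like the following sketch, in which a time-based aggregator collects elements in one flow and a listener processes each release in another. All names and the 30-second period are hypothetical; includeTimedOutGroups is set to true on the assumption that time-triggered releases should reach the listener.

```xml
<flow name="collectEventsFlow">
    <aggregators:time-based-aggregator name="eventsAggregator"
                                       period="30"
                                       periodUnit="SECONDS">
        <aggregators:incremental-aggregation>
            <logger level="DEBUG" message='#["Aggregated so far: $(sizeOf(payload))"]'/>
        </aggregators:incremental-aggregation>
    </aggregators:time-based-aggregator>
</flow>

<flow name="processEventsFlow">
    <!-- Triggered when eventsAggregator releases its elements; payload is the list of elements. -->
    <aggregators:aggregator-listener aggregatorName="eventsAggregator" includeTimedOutGroups="true"/>
    <logger level="INFO" message='#["Received $(sizeOf(payload)) aggregated elements"]'/>
</flow>
```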

Parameters

| Name | Type | Description | Default Value | Required |
|---|---|---|---|---|
| Aggregator Name | String | The name of the aggregator to listen to. Once that aggregator releases its elements, the listener will be executed. Each listener can only reference one aggregator, and each aggregator can only be referenced by at most one listener. | | x |
| Include Timed Out Groups | Boolean | Indicates whether the listener should be triggered when a group is released due to a timeout. | false | |

Aggregation Attributes

Each time a message goes through an aggregation, some attributes with information about the aggregation are added to it.

| Name | Type | Description |
|---|---|---|
| Aggregation ID | String | The ID of the group where the element was aggregated. If the aggregation strategy does not aggregate by group (as with size-based and time-based aggregators), this field is an autogenerated value that is kept until the aggregation is released. |
| First Item Arrival Time | Date | The time when the first value was aggregated. |
| Last Item Arrival Time | Date | The time when the last value was aggregated. |
| Is Aggregation Complete | Boolean | True if the aggregation is complete, False otherwise. |
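As a small sketch of reading these attributes, the route below logs two of them when an aggregation is released. The field name isAggregationComplete appears elsewhere in this reference; aggregationId is an assumed camel-case form of "Aggregation ID" and should be checked against the module version in use.

```xml
<aggregators:aggregation-complete>
    <!-- attributes.aggregationId is an assumed field name, not confirmed by this reference. -->
    <logger level="INFO"
            message='#["Aggregation $(attributes.aggregationId) released, complete: $(attributes.isAggregationComplete)"]'/>
</aggregators:aggregation-complete>
```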

Async Versus Sync Aggregations

There are two kinds of triggers for an aggregation completion: Synchronous and Asynchronous.
As seen in the configurations, an aggregation can be considered complete based on a new element being added to the list (as when a max size is specified) or because some timeout or time period was completed. This is important because the type of aggregation determines which chain of components to execute with the list of elements.

Each time counter associated with an aggregator starts counting from the moment the first message of a group arrives. Once the aggregation is complete, the counter resets and waits for the next element to arrive.
For single-group aggregators (time-based and size-based aggregators), there is only one time counter, but for the group-based aggregator, there is one counter per group.

For aggregations completed because a new element arrives (Sync), at least one of the following things will happen:

  • If the aggregator was configured with an aggregation-complete route, the components inside that route will be executed with the payload being the list of aggregated elements.

  • If the aggregator has a listener hooked, the flow which that listener belongs to will be executed, with the payload being the list of aggregated elements.

For aggregations completed due to a time period or timeout being reached (Async), the only thing that can happen is:

  • If the aggregator has a listener hooked and the listener accepts timed out aggregations, the flow which that listener belongs to will be executed, with the payload being the list of aggregated elements.

Keep this distinction in mind whenever you build an application that contains an aggregator.

As a general rule:

  • If the aggregation depends on time, all the logic for processing it should be in a different flow, with an aggregation listener as source.

  • If it depends on a size being reached, the logic can be declared in the aggregation-complete route.

  • If it can be both, then a good approach is to:

    • Add the main logic in a sub-flow with no source. You can check the isAggregationComplete attribute to determine how the aggregation was released.

    • Add a flow reference to the main logic flow in the aggregation-complete route. If the sub-flow is executed by this flow reference, then isAggregationComplete will be true.

    • Add another flow with an aggregation listener that listens to the aggregator and accepts timed out groups. The listener should be followed by a flow reference that calls the main logic sub-flow. In this case, isAggregationComplete will be false.
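The combined approach in the last rule could be sketched as follows, using a size-based aggregator that can also time out. Flow names, the aggregator name, and the size and timeout values are hypothetical.

```xml
<!-- Main logic, shared by both release paths. -->
<sub-flow name="processAggregationSubFlow">
    <choice>
        <when expression="#[attributes.isAggregationComplete]">
            <logger level="INFO" message="Aggregation released because it reached its size"/>
        </when>
        <otherwise>
            <logger level="WARN" message="Aggregation released by timeout before reaching its size"/>
        </otherwise>
    </choice>
</sub-flow>

<flow name="aggregateFlow">
    <aggregators:size-based-aggregator name="mixedAggregator"
                                       maxSize="10"
                                       timeout="5"
                                       timeoutUnit="MINUTES">
        <!-- Sync path: the size was reached. -->
        <aggregators:aggregation-complete>
            <flow-ref name="processAggregationSubFlow"/>
        </aggregators:aggregation-complete>
    </aggregators:size-based-aggregator>
</flow>

<flow name="timedOutAggregationFlow">
    <!-- Async path: the listener accepts groups released by timeout. -->
    <aggregators:aggregator-listener aggregatorName="mixedAggregator" includeTimedOutGroups="true"/>
    <flow-ref name="processAggregationSubFlow"/>
</flow>
```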

Aggregators in a Cluster

The module is developed to work in a cluster out-of-the-box. However, to prevent unexpected behavior, you need to take into account these configuration details:

When an Async aggregation is defined and the first element arrives, it is scheduled in the primary node of the cluster. Because new values can arrive in any node of the cluster, we need a way to notify and make the primary node schedule that aggregation. To do that, another task in the primary node periodically determines whether it is necessary to schedule a new aggregation or not. This can lead to a problem if the interval between checks for new aggregation scheduling is much longer than the actual timeout of the aggregation, because the aggregation could be over before it is scheduled, or there might be errors in the time computation.

To avoid this issue, you can configure the frequency at which the primary node checks for new aggregations to be scheduled. You can define this value using either:

  • The global configuration property (in ms) aggregatorsSchedulingPeriod

  • The system property -M-Dmule.aggregatorsSchedulingPeriod
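For the first option, a sketch of the global property declaration is shown below. The value of 1000 ms is purely illustrative; a suitable value depends on the shortest timeout or period configured in the application.

```xml
<!-- Illustrative value: the primary node checks for aggregations to schedule every 1000 ms. -->
<global-property name="aggregatorsSchedulingPeriod" value="1000"/>
```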

Object Store Configuration

For any aggregator, an object store can be configured either by referencing a global OS or creating a private one.

Global

         
      
<aggregators:size-based-aggregator name="globalOSAggregator"
                                   maxSize="10"
                                   objectStore="aGlobalObjectStore">

Private

         
      
<aggregators:size-based-aggregator  name="privateOSAggregator" maxSize="10">
    ...
    <aggregators:object-store>
        <os:private-object-store alias="privateObjectStore" persistent="false"/>
    </aggregators:object-store>
</aggregators:size-based-aggregator>
