Time-Based Aggregator Example - Mule 4
The Time based aggregator scope of the Aggregators module enables you to aggregate elements within a predefined time limit. The following example shows how to configure this scope to aggregate the payloads of HTTP requests received within a 5-second period. The application uses DataWeave to set the payload of each request to an object that contains a random ID number. When the 5-second period ends, an Aggregator listener source receives the list of aggregated elements, and the application logs all the aggregated random ID numbers.
To test this example in your own environment, you must create the Mule application and test it with curl commands.
Create the Mule Application
To create the Mule flow in Anypoint Studio:
- In Mule Palette, select HTTP Listener and drag it onto the canvas. The source initiates the flow by listening for incoming HTTP requests.
- In Connector configuration, select HTTP_Listener_config. In this example, the configuration listens on host 0.0.0.0 and port 8086.
- Set Path to /test.
- Drag a Transform message component to the right of Listener.
- In the DataSense preview window of the Transform message component, add the following DataWeave expression, which sets the payload of each HTTP request to an object with a Source Name of "time-based" and a Source ID of random() as String:
%dw 2.0
output application/json
---
{
  "Source Name": "time-based",
  "Source ID": (random() as String)
}
- Drag the Time based aggregator scope to the right of Transform message.
- Set Name to timeBasedAggregator.
- Set Period to 5.
- Set Period unit to SECONDS (Default).
- Set Content to payload, which is the expression that defines what to aggregate; in this case, the transformed HTTP requests.
- Drag a Logger component into the Incremental aggregation route.
- In the Logger configuration screen, set Message to Doing incremental step.
- Drag an Aggregator listener source below the first flow. This source listens to the previously defined time-based aggregator. Once the aggregator releases its elements, the listener is executed.
- Set Aggregator name to timeBasedAggregator.
- Drag a Logger component to the right of Aggregator listener.
- Set Message to Aggregation complete.
- Drag another Logger component to the right of the previously added Logger component.
- Set Message to the following expression, which returns all the aggregated elements:
output application/json
---
payload
- Save the project.
- In Package Explorer, click Run > Run As > Mule Application.
- Test the app by sending the following curl command twice within 5 seconds:
curl -X POST http://localhost:8086/test
This generates two HTTP requests for the Time based aggregator to process.
- Navigate to the Console view to read the logger messages:
INFO 2021-06-09 12:02:04,203 [[MuleRuntime].uber.04: [aggregator-time-demo].aggregator-time-demoFlow.CPU_INTENSIVE @59b15997] [processor: aggregator-time-demoFlow/processors/1/route/0/processors/0; event: a6b00280-c933-11eb-a825-f01898ad2638] org.mule.runtime.core.internal.processor.LoggerMessageProcessor: Doing incremental step.
INFO 2021-06-09 12:02:06,071 [[MuleRuntime].uber.04: [aggregator-time-demo].aggregator-time-demoFlow.CPU_INTENSIVE @59b15997] [processor: aggregator-time-demoFlow/processors/1/route/0/processors/0; event: a7cd0b40-c933-11eb-a825-f01898ad2638] org.mule.runtime.core.internal.processor.LoggerMessageProcessor: Doing incremental step.
INFO 2021-06-09 12:02:09,206 [[MuleRuntime].uber.04: [aggregator-time-demo].aggregator-time-demoOnCompleteFlow.CPU_LITE @2ab3c271] [processor: aggregator-time-demoOnCompleteFlow/processors/0; event: 827c9591-c933-11eb-a825-f01898ad2638] org.mule.runtime.core.internal.processor.LoggerMessageProcessor: Aggregation complete
INFO 2021-06-09 12:02:09,208 [[MuleRuntime].uber.04: [aggregator-time-demo].aggregator-time-demoOnCompleteFlow.CPU_LITE @2ab3c271] [processor: aggregator-time-demoOnCompleteFlow/processors/1; event: 827c9591-c933-11eb-a825-f01898ad2638] org.mule.runtime.core.internal.processor.LoggerMessageProcessor: [
{
"Source Name": "time-based",
"Source ID": "0.7095971146328517"
},
{
"Source Name": "time-based",
"Source ID": "0.6900116288202388"
}
]
This example log shows that for each HTTP request, the Logger component in the Incremental aggregation route logs the message Doing incremental step. About 5 seconds after the first request arrives (12:02:04 to 12:02:09 in this log), the timeBasedAggregator aggregator releases its elements and triggers the Aggregator listener source that references it, passing a list of all the aggregated elements. The first Logger component in the listener flow logs the message Aggregation complete, and the second Logger component logs the aggregated payload elements, which contain the random ID numbers:
{
"Source Name": "time-based",
"Source ID": "0.7095971146328517"
},
{
"Source Name": "time-based",
"Source ID": "0.6900116288202388"
}
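Because the listener receives the aggregated elements as a list, it can also report how many elements arrived during the period. The following is a minimal sketch of a variant of the listener flow, not part of the original example: it assumes the listener payload can be read as a DataWeave array, and the extra Logger and its message text are illustrative additions. It relies on the same namespace declarations as the full configuration.

```xml
<flow name="aggregator-time-demoOnCompleteFlow">
    <aggregators:aggregator-listener aggregatorName="timeBasedAggregator"/>
    <logger level="INFO" message="Aggregation complete"/>
    <!-- Illustrative addition: sizeOf() is a DataWeave core function;
         the payload is assumed to be the list of aggregated elements. -->
    <logger level="INFO" message="#['Aggregated ' ++ (sizeOf(payload) as String) ++ ' elements']"/>
    <logger level="INFO" message="#[output application/json
---
payload]"/>
</flow>
```

With the two curl requests from the test step, this extra Logger would be expected to report two elements, matching the two objects in the logged list.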
XML for Time-Based Aggregator Example
Paste this code into your Studio XML editor to quickly load the flow for this example into your Mule app:
<?xml version="1.0" encoding="UTF-8"?>
<mule xmlns:aggregators="http://www.mulesoft.org/schema/mule/aggregators" xmlns:ee="http://www.mulesoft.org/schema/mule/ee/core"
xmlns:http="http://www.mulesoft.org/schema/mule/http"
xmlns="http://www.mulesoft.org/schema/mule/core" xmlns:doc="http://www.mulesoft.org/schema/mule/documentation" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd
http://www.mulesoft.org/schema/mule/http http://www.mulesoft.org/schema/mule/http/current/mule-http.xsd
http://www.mulesoft.org/schema/mule/ee/core http://www.mulesoft.org/schema/mule/ee/core/current/mule-ee.xsd
http://www.mulesoft.org/schema/mule/aggregators http://www.mulesoft.org/schema/mule/aggregators/current/mule-aggregators.xsd">
    <http:listener-config name="HTTP_Listener_config" >
        <http:listener-connection host="0.0.0.0" port="8086" />
    </http:listener-config>
    <flow name="aggregator-time-demoFlow" >
        <!-- Accepts POST requests on port 8086 at /test -->
        <http:listener config-ref="HTTP_Listener_config" path="/test" allowedMethods="POST"/>
        <!-- Sets the payload of each request to an object with a random Source ID -->
        <ee:transform >
            <ee:message >
                <ee:set-payload ><![CDATA[%dw 2.0
output application/json
---
{
    "Source Name": "time-based",
    "Source ID": (random() as String)
}]]></ee:set-payload>
            </ee:message>
        </ee:transform>
        <!-- Aggregates payloads for a period of 5; the period unit defaults to SECONDS -->
        <aggregators:time-based-aggregator period="5" name="timeBasedAggregator">
            <!-- Runs once for each element added to the aggregation -->
            <aggregators:incremental-aggregation >
                <logger level="INFO" message="Doing incremental step."/>
            </aggregators:incremental-aggregation>
        </aggregators:time-based-aggregator>
    </flow>
    <flow name="aggregator-time-demoOnCompleteFlow" >
        <!-- Triggered when timeBasedAggregator releases its elements -->
        <aggregators:aggregator-listener aggregatorName="timeBasedAggregator"/>
        <logger level="INFO" message="Aggregation complete" />
        <!-- Logs the list of aggregated elements as JSON -->
        <logger level="INFO" message="#[output application/json
---
payload]" />
    </flow>
</mule>
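The configuration above omits the Period unit and Content settings from the Studio steps, presumably because both are left at their defaults. The following is a hedged sketch of the same scope with those settings written out explicitly. It assumes the module exposes Period unit as a periodUnit attribute and Content as an aggregators:content child element; compare it with the XML that Studio generates before relying on it.

```xml
<!-- Sketch under stated assumptions: periodUnit and aggregators:content
     are assumed names for the Period unit and Content settings. -->
<aggregators:time-based-aggregator name="timeBasedAggregator" period="5" periodUnit="SECONDS">
    <!-- The expression that defines what to aggregate -->
    <aggregators:content>#[payload]</aggregators:content>
    <aggregators:incremental-aggregation>
        <logger level="INFO" message="Doing incremental step."/>
    </aggregators:incremental-aggregation>
</aggregators:time-based-aggregator>
```

Writing the defaults out makes the time limit readable without opening Studio, at the cost of a slightly longer configuration.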