Group-Based Aggregator Example - Mule 4
The Group based aggregator scope of the Aggregators module enables you to aggregate elements into groups by group ID. The following example shows how to configure this scope to aggregate up to 3 HTTP request calls per group, based on the group ID sent in each request. The application uses DataWeave to set the payload of each call with a random ID number. For each group that completes the aggregation of 3 elements, the application logs all the aggregated elements of the group, each of which carries its random ID number.
To test this example in your own environment, you must create the Mule application and test it with curl commands.
Create the Mule Application
To create the Mule flow in Anypoint Studio:
-
In Mule Palette, select HTTP Listener and drag it onto the canvas.
The source initiates the flow by listening for incoming HTTP requests. -
In Connector configuration, select HTTP_Listener_config.
-
Set Path to
/test
. -
Drag a Transform message component to the right of Listener.
-
In the DataSense preview window of the Transform message component, add the following DataWeave expression, which sets Source Name to "group-based" and Source ID to random() as String in the payload of each HTTP request. The expression also obtains the GroupId value of the request from message.attributes['queryParams'].groupId:
%dw 2.0
output application/json
---
{
  "Source Name": "group-based",
  "GroupId": message.attributes['queryParams'].groupId,
  "Source ID": (random() as String)
}
-
Drag the Group based aggregator scope to the right of Transform message.
-
Set Name to
groupBasedAggregator
. -
Set Content to
payload
, which is the expression that defines what to aggregate: in this case, the transformed HTTP requests. -
Set Group id to
payload.GroupId
, which is the expression evaluated for every new message to resolve the ID of the group in which the element is aggregated. -
Set Group size to
3
, which is the maximum number of elements to aggregate in the group identified by the resolved group ID. -
Drag a Logger component into the Incremental aggregation route.
-
In the Logger configuration screen, set Message to
Doing incremental aggregation.
-
Drag a Logger component into the Aggregation complete route.
-
Set Message to
Aggregation complete
. -
Drag another Logger component to the right of the first Logger component in Aggregation complete.
-
Set Message to the following expression, which returns all the aggregated elements:
output application/json
---
payload
-
Save the project.
-
In Package Explorer, click Run > Run As > Mule Application.
-
Test the app by sending the following curl command twice:
curl -X POST http://localhost:8081/test?groupId=1
This generates two HTTP requests to aggregate to group ID 1. -
Send the command
curl -X POST http://localhost:8081/test?groupId=2
to aggregate one request to group ID 2. -
Send another command
curl -X POST http://localhost:8081/test?groupId=1
to aggregate a third request to group ID 1. -
Navigate to the Console view to read the logger message:
INFO 2021-06-14 16:34:33,921 [[MuleRuntime].uber.12: [aggregator-group-demo].aggregator-group-demoFlow.CPU_INTENSIVE @2c3478b5] [processor: aggregator-group-demoFlow/processors/1/route/0/processors/0; event: 8beae010-cd47-11eb-8b82-f01898ad2638] org.mule.runtime.core.internal.processor.LoggerMessageProcessor: Doing incremental aggregation
INFO 2021-06-14 16:34:43,618 [[MuleRuntime].uber.12: [aggregator-group-demo].aggregator-group-demoFlow.CPU_INTENSIVE @2c3478b5] [processor: aggregator-group-demoFlow/processors/1/route/0/processors/0; event: 91b93be0-cd47-11eb-8b82-f01898ad2638] org.mule.runtime.core.internal.processor.LoggerMessageProcessor: Doing incremental aggregation
INFO 2021-06-14 16:34:47,203 [[MuleRuntime].uber.12: [aggregator-group-demo].aggregator-group-demoFlow.CPU_INTENSIVE @2c3478b5] [processor: aggregator-group-demoFlow/processors/1/route/0/processors/0; event: 93d8e790-cd47-11eb-8b82-f01898ad2638] org.mule.runtime.core.internal.processor.LoggerMessageProcessor: Doing incremental aggregation
INFO 2021-06-14 16:34:51,380 [[MuleRuntime].uber.12: [aggregator-group-demo].aggregator-group-demoFlow.CPU_INTENSIVE @2c3478b5] [processor: aggregator-group-demoFlow/processors/1/route/1/processors/0; event: 96566ab0-cd47-11eb-8b82-f01898ad2638] org.mule.runtime.core.internal.processor.LoggerMessageProcessor: Aggregation complete
INFO 2021-06-14 16:34:51,409 [[MuleRuntime].uber.12: [aggregator-group-demo].aggregator-group-demoFlow.CPU_INTENSIVE @2c3478b5] [processor: aggregator-group-demoFlow/processors/1/route/1/processors/1; event: 96566ab0-cd47-11eb-8b82-f01898ad2638] org.mule.runtime.core.internal.processor.LoggerMessageProcessor: [
{
"Source Name": "group-based",
"GroupId": "1",
"Source ID": "0.5127046416902801"
},
{
"Source Name": "group-based",
"GroupId": "1",
"Source ID": "0.20239307122656847"
},
{
"Source Name": "group-based",
"GroupId": "1",
"Source ID": "0.7505233647798346"
}
]
These logger messages show that for the first two HTTP requests with group ID 1 and the third request with group ID 2, the Logger component in the Incremental aggregation route logs the message Doing incremental aggregation.
After you send the fourth HTTP request, which is the third request with group ID 1, aggregation group 1 is complete because the group size is 3 calls per group. The first Logger component in the Aggregation complete route logs the message Aggregation complete, and the second Logger component returns the aggregated payload elements of the group, each of which includes its random Source ID value.
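The routing behavior described above can be modeled outside Mule. The following Python sketch is not the Aggregators module implementation. It is a minimal model that assumes each group keeps its elements in memory and is forgotten as soon as it reaches the group size (which is how the evictionTime="0" setting in the example XML is read here). The names GroupBasedAggregator, add, and make_payload are hypothetical and chosen only for illustration.

```python
import random
from collections import defaultdict


class GroupBasedAggregator:
    """Toy model of a group-based aggregator (hypothetical, not the Mule API)."""

    def __init__(self, group_size):
        self.group_size = group_size
        self.groups = defaultdict(list)  # group ID -> elements collected so far

    def add(self, group_id, element):
        """Add an element and return (route, elements) for the route that would run."""
        elements = self.groups[group_id]
        elements.append(element)
        if len(elements) < self.group_size:
            return "incremental-aggregation", list(elements)
        # Group is full: hand over its elements and forget the group (assumption).
        del self.groups[group_id]
        return "aggregation-complete", elements


def make_payload(group_id):
    """Mirror the payload built by the Transform message component."""
    return {
        "Source Name": "group-based",
        "GroupId": group_id,
        "Source ID": str(random.random()),
    }


aggregator = GroupBasedAggregator(group_size=3)
routes = []
# Same order as the curl commands in the steps: groups 1, 1, 2, 1.
for group_id in ["1", "1", "2", "1"]:
    route, elements = aggregator.add(group_id, make_payload(group_id))
    routes.append(route)
    print(group_id, route, len(elements))
```

Under these assumptions, the sketch reports incremental-aggregation for the first three requests and aggregation-complete with 3 elements for the fourth, which matches the order of the logger messages in the Console view. Group 2 still holds one element at that point.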
If you send the command curl -X POST http://localhost:8081/test?groupId=2 two more times to aggregate calls to group 2, the Console view shows the following logger messages:
INFO 2021-06-14 16:53:56,262 [[MuleRuntime].uber.12: [aggregator-group-demo].aggregator-group-demoFlow.CPU_INTENSIVE @2c3478b5] [processor: aggregator-group-demoFlow/processors/1/route/0/processors/0; event: 40c07200-cd4a-11eb-8b82-f01898ad2638] org.mule.runtime.core.internal.processor.LoggerMessageProcessor: Doing incremental aggregation
INFO 2021-06-14 16:53:58,686 [[MuleRuntime].uber.01: [aggregator-group-demo].aggregator-group-demoFlow.CPU_INTENSIVE @2c3478b5] [processor: aggregator-group-demoFlow/processors/1/route/1/processors/0; event: 42329fa0-cd4a-11eb-8b82-f01898ad2638] org.mule.runtime.core.internal.processor.LoggerMessageProcessor: Aggregation complete
INFO 2021-06-14 16:53:58,689 [[MuleRuntime].uber.01: [aggregator-group-demo].aggregator-group-demoFlow.CPU_INTENSIVE @2c3478b5] [processor: aggregator-group-demoFlow/processors/1/route/1/processors/1; event: 42329fa0-cd4a-11eb-8b82-f01898ad2638] org.mule.runtime.core.internal.processor.LoggerMessageProcessor: [
{
"Source Name": "group-based",
"GroupId": "2",
"Source ID": "0.8390326267445475"
},
{
"Source Name": "group-based",
"GroupId": "2",
"Source ID": "0.8931962767228987"
},
{
"Source Name": "group-based",
"GroupId": "2",
"Source ID": "0.6514448303865373"
}
]
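To replay the request sequence without retyping the curl commands, the same POST requests can be sent from a short script. The following Python sketch assumes the Mule app is running locally with the listener on port 8081 and path /test, as configured in this example. The function names build_request and send_sequence are hypothetical, and the sketch uses only the Python standard library.

```python
from urllib.parse import urlencode
from urllib.request import Request, urlopen

# Listener host, port, and path used by the example application.
BASE_URL = "http://localhost:8081/test"


def build_request(group_id):
    """Build the POST request that the curl command sends for a group ID."""
    url = f"{BASE_URL}?{urlencode({'groupId': group_id})}"
    return Request(url, method="POST")


def send_sequence(group_ids):
    """Send one POST per group ID, in order, and return the HTTP status codes."""
    statuses = []
    for group_id in group_ids:
        with urlopen(build_request(group_id), timeout=10) as response:
            statuses.append(response.status)
    return statuses


# Example calls (they require the Mule app to be running):
# send_sequence([1, 1, 2, 1])  # three requests for group 1 complete that group
# send_sequence([2, 2])        # two more requests complete group 2
```

The calls at the bottom are left commented out so that the script does not fail when the application is not running.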
XML for Group-Based Aggregator Example
Paste this code into your Studio XML editor to quickly load the flow for this example into your Mule app:
<?xml version="1.0" encoding="UTF-8"?>
<mule xmlns:aggregators="http://www.mulesoft.org/schema/mule/aggregators"
      xmlns:ee="http://www.mulesoft.org/schema/mule/ee/core"
      xmlns:http="http://www.mulesoft.org/schema/mule/http"
      xmlns="http://www.mulesoft.org/schema/mule/core"
      xmlns:doc="http://www.mulesoft.org/schema/mule/documentation"
      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xsi:schemaLocation="http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd
http://www.mulesoft.org/schema/mule/http http://www.mulesoft.org/schema/mule/http/current/mule-http.xsd
http://www.mulesoft.org/schema/mule/ee/core http://www.mulesoft.org/schema/mule/ee/core/current/mule-ee.xsd
http://www.mulesoft.org/schema/mule/aggregators http://www.mulesoft.org/schema/mule/aggregators/current/mule-aggregators.xsd">
    <http:listener-config name="HTTP_Listener_config" doc:name="HTTP Listener config" doc:id="c4a97ca0-c01f-4ce9-9922-380eb0defda2">
        <http:listener-connection host="0.0.0.0" port="8081"/>
    </http:listener-config>
    <!-- A flow requires a name; this one is taken from the log output of this example. -->
    <flow name="aggregator-group-demoFlow">
        <http:listener config-ref="HTTP_Listener_config" path="/test" allowedMethods="POST"/>
        <!-- Builds the payload to aggregate, including the group ID from the query parameters. -->
        <ee:transform>
            <ee:message>
                <ee:set-payload><![CDATA[%dw 2.0
output application/json
---
{
    "Source Name": "group-based",
    "GroupId": message.attributes['queryParams'].groupId,
    "Source ID": (random() as String)
}]]></ee:set-payload>
            </ee:message>
        </ee:transform>
        <aggregators:group-based-aggregator name="groupBasedAggregator" evictionTime="0" groupSize="3" groupId="#[payload.GroupId]">
            <aggregators:incremental-aggregation>
                <logger level="INFO" message="Doing incremental aggregation"/>
            </aggregators:incremental-aggregation>
            <aggregators:aggregation-complete>
                <logger level="INFO" message="Aggregation complete"/>
                <logger level="INFO" message="#[output application/json --- payload]"/>
            </aggregators:aggregation-complete>
        </aggregators:group-based-aggregator>
    </flow>
</mule>