Group-Based Aggregator Example - Mule 4

The Group based aggregator scope of the Aggregators module enables you to aggregate elements into groups by group ID. The following example shows how to configure this scope to aggregate up to 3 HTTP requests per group, based on the group ID sent in each request. The application uses DataWeave to set the payload of each request to an object that includes a random ID number. When a group completes the aggregation of 3 elements, the application logs all the aggregated elements of the group, including their random ID numbers.

Figure 1. Group-Based Aggregator Flow Example (flow shown in the Anypoint Studio canvas)

To test this example in your own environment, you must create the Mule application and test it with curl commands.

Create the Mule Application

To create the Mule flow in Anypoint Studio:

  1. In Mule Palette, select HTTP Listener and drag it onto the canvas.
    The source initiates the flow by listening for incoming HTTP requests.

  2. In Connector configuration, select HTTP_Listener_config.

  3. Set Path to /test.

  4. Drag a Transform message component to the right of Listener.

  5. In the DataSense preview window of the Transform message component, add the following DataWeave expression, which sets the payload of each HTTP request to an object with Source Name set to "group-based" and Source ID set to random() as String. The expression also reads the GroupId value from the groupId query parameter of the request, using message.attributes['queryParams'].groupId:

    %dw 2.0
    output application/json
    ---
    {
        "Source Name": "group-based",
        "GroupId": message.attributes['queryParams'].groupId,
        "Source ID": (random() as String)
    }

  6. Drag the Group based aggregator scope to the right of Transform message.

  7. Set Name to groupBasedAggregator.

  8. Set Content to payload, which is the expression that defines what to aggregate: in this case, the transformed HTTP requests.

  9. Set Group id to payload.GroupId, which is the expression that is evaluated for every new message to determine the group in which the element is aggregated.

  10. Set Group size to 3, which is the maximum number of elements to aggregate in each group.

  11. Drag a Logger component into the Incremental aggregation route.

  12. In the Logger configuration screen, set Message to Doing incremental aggregation.

  13. Drag a Logger component into the Aggregation complete route.

  14. Set Message to Aggregation complete.

  15. Drag another Logger component to the right of the first Logger component in Aggregation complete.

  16. Set Message to the following expression, which returns all the aggregated elements:

    output application/json
    ---
    payload

  17. Save the project.

  18. In Package Explorer, click Run > Run As > Mule Application.

  19. Test the app by sending the following curl command twice: curl -X POST http://localhost:8081/test?groupId=1
    This generates two HTTP requests to aggregate to the group ID 1.

  20. Send the command curl -X POST http://localhost:8081/test?groupId=2 to aggregate one request to the group ID 2.

  21. Send another command curl -X POST http://localhost:8081/test?groupId=1 to aggregate a request to group ID 1.

  22. Navigate to the Console view to read the logger messages:

INFO  2021-06-14 16:34:33,921 [[MuleRuntime].uber.12: [aggregator-group-demo].aggregator-group-demoFlow.CPU_INTENSIVE @2c3478b5] [processor: aggregator-group-demoFlow/processors/1/route/0/processors/0; event: 8beae010-cd47-11eb-8b82-f01898ad2638] org.mule.runtime.core.internal.processor.LoggerMessageProcessor: Doing incremental aggregation
INFO  2021-06-14 16:34:43,618 [[MuleRuntime].uber.12: [aggregator-group-demo].aggregator-group-demoFlow.CPU_INTENSIVE @2c3478b5] [processor: aggregator-group-demoFlow/processors/1/route/0/processors/0; event: 91b93be0-cd47-11eb-8b82-f01898ad2638] org.mule.runtime.core.internal.processor.LoggerMessageProcessor: Doing incremental aggregation
INFO  2021-06-14 16:34:47,203 [[MuleRuntime].uber.12: [aggregator-group-demo].aggregator-group-demoFlow.CPU_INTENSIVE @2c3478b5] [processor: aggregator-group-demoFlow/processors/1/route/0/processors/0; event: 93d8e790-cd47-11eb-8b82-f01898ad2638] org.mule.runtime.core.internal.processor.LoggerMessageProcessor: Doing incremental aggregation
INFO  2021-06-14 16:34:51,380 [[MuleRuntime].uber.12: [aggregator-group-demo].aggregator-group-demoFlow.CPU_INTENSIVE @2c3478b5] [processor: aggregator-group-demoFlow/processors/1/route/1/processors/0; event: 96566ab0-cd47-11eb-8b82-f01898ad2638] org.mule.runtime.core.internal.processor.LoggerMessageProcessor: Aggregation complete
INFO  2021-06-14 16:34:51,409 [[MuleRuntime].uber.12: [aggregator-group-demo].aggregator-group-demoFlow.CPU_INTENSIVE @2c3478b5] [processor: aggregator-group-demoFlow/processors/1/route/1/processors/1; event: 96566ab0-cd47-11eb-8b82-f01898ad2638] org.mule.runtime.core.internal.processor.LoggerMessageProcessor: [
  {
    "Source Name": "group-based",
    "GroupId": "1",
    "Source ID": "0.5127046416902801"
  },
  {
    "Source Name": "group-based",
    "GroupId": "1",
    "Source ID": "0.20239307122656847"
  },
  {
    "Source Name": "group-based",
    "GroupId": "1",
    "Source ID": "0.7505233647798346"
  }
]

This example log output shows that for the first two HTTP requests with group ID 1 and the third request with group ID 2, the Logger component in the Incremental aggregation route logs the message Doing incremental aggregation.
The fourth HTTP request is the third request with group ID 1, so it completes group 1, which has a group size of 3. The first Logger component in the Aggregation complete route logs the message Aggregation complete, and the second Logger component logs the aggregated payload elements of the group, which include the random ID numbers.
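
The four-request test sequence described above can also be replayed with a short script. This is a minimal sketch, assuming the app runs locally with the listener on port 8081 and the /test path used in this example. The helper names request_url and send_to_group are illustrative only and are not part of Mule or curl.

```shell
#!/bin/sh
# Base URL of the HTTP Listener from this example (assumed to run locally).
BASE_URL="http://localhost:8081/test"

# Build the request URL for a given group ID (illustrative helper).
request_url() {
  printf '%s?groupId=%s\n' "$BASE_URL" "$1"
}

# Send one POST request to the given group (illustrative helper).
# Quoting the URL keeps the shell from treating "?" as a glob character.
send_to_group() {
  curl -s --max-time 5 -X POST "$(request_url "$1")"
}

# Replay the sequence from the steps: group 1 twice, group 2 once, group 1 again.
for group_id in 1 1 2 1; do
  send_to_group "$group_id" \
    || echo "Request to group $group_id failed; is the Mule app running?" >&2
done
```

With the app running, the fourth request should complete group 1, so the Console view should show the Aggregation complete message after three Doing incremental aggregation messages.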

If you send the command curl -X POST http://localhost:8081/test?groupId=2 two more times, group 2 also reaches three elements, and the Console view shows the following logger messages:

INFO  2021-06-14 16:53:56,262 [[MuleRuntime].uber.12: [aggregator-group-demo].aggregator-group-demoFlow.CPU_INTENSIVE @2c3478b5] [processor: aggregator-group-demoFlow/processors/1/route/0/processors/0; event: 40c07200-cd4a-11eb-8b82-f01898ad2638] org.mule.runtime.core.internal.processor.LoggerMessageProcessor: Doing incremental aggregation
INFO  2021-06-14 16:53:58,686 [[MuleRuntime].uber.01: [aggregator-group-demo].aggregator-group-demoFlow.CPU_INTENSIVE @2c3478b5] [processor: aggregator-group-demoFlow/processors/1/route/1/processors/0; event: 42329fa0-cd4a-11eb-8b82-f01898ad2638] org.mule.runtime.core.internal.processor.LoggerMessageProcessor: Aggregation complete
INFO  2021-06-14 16:53:58,689 [[MuleRuntime].uber.01: [aggregator-group-demo].aggregator-group-demoFlow.CPU_INTENSIVE @2c3478b5] [processor: aggregator-group-demoFlow/processors/1/route/1/processors/1; event: 42329fa0-cd4a-11eb-8b82-f01898ad2638] org.mule.runtime.core.internal.processor.LoggerMessageProcessor: [
  {
    "Source Name": "group-based",
    "GroupId": "2",
    "Source ID": "0.8390326267445475"
  },
  {
    "Source Name": "group-based",
    "GroupId": "2",
    "Source ID": "0.8931962767228987"
  },
  {
    "Source Name": "group-based",
    "GroupId": "2",
    "Source ID": "0.6514448303865373"
  }
]
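
The number of requests needed to complete group 2 follows from the group size: the group already holds one element, so two more requests reach the size of 3. The following sketch makes that arithmetic explicit. It assumes the same local URL as the curl commands in this example, and the remaining_requests helper and GROUP_SIZE variable are hypothetical names introduced for illustration.

```shell
#!/bin/sh
# Matches the Group size value set on the aggregator in this example.
GROUP_SIZE=3

# Number of requests still needed before a group reaches GROUP_SIZE,
# given how many elements the group already holds (illustrative helper).
remaining_requests() {
  echo $(( GROUP_SIZE - $1 ))
}

# Group 2 already holds one element from the earlier test step.
count=$(remaining_requests 1)
i=0
while [ "$i" -lt "$count" ]; do
  curl -s --max-time 5 -X POST "http://localhost:8081/test?groupId=2" \
    || echo "Request failed; is the Mule app running?" >&2
  i=$(( i + 1 ))
done
```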

XML for Group-Based Aggregator Example

Paste this code into your Studio XML editor to quickly load the flow for this example into your Mule app:

<?xml version="1.0" encoding="UTF-8"?>

<mule xmlns:aggregators="http://www.mulesoft.org/schema/mule/aggregators" xmlns:ee="http://www.mulesoft.org/schema/mule/ee/core"
	xmlns:http="http://www.mulesoft.org/schema/mule/http"
	xmlns="http://www.mulesoft.org/schema/mule/core" xmlns:doc="http://www.mulesoft.org/schema/mule/documentation" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd
http://www.mulesoft.org/schema/mule/http http://www.mulesoft.org/schema/mule/http/current/mule-http.xsd
http://www.mulesoft.org/schema/mule/ee/core http://www.mulesoft.org/schema/mule/ee/core/current/mule-ee.xsd
http://www.mulesoft.org/schema/mule/aggregators http://www.mulesoft.org/schema/mule/aggregators/current/mule-aggregators.xsd">
	<http:listener-config name="HTTP_Listener_config" doc:name="HTTP Listener config" doc:id="c4a97ca0-c01f-4ce9-9922-380eb0defda2" >
		<http:listener-connection host="0.0.0.0" port="8081" />
	</http:listener-config>
	<flow name="aggregator-group-demoFlow">
		<http:listener config-ref="HTTP_Listener_config" path="/test" allowedMethods="POST"/>
		<ee:transform  >
			<ee:message >
				<ee:set-payload ><![CDATA[%dw 2.0
output application/json
---
{
    "Source Name": "group-based",
    "GroupId": message.attributes['queryParams'].groupId,
    "Source ID": (random() as String)
}]]></ee:set-payload>
			</ee:message>
		</ee:transform>
		<aggregators:group-based-aggregator name="groupBasedAggregator" evictionTime="0" groupSize="3" groupId="#[payload.GroupId]">
			<aggregators:incremental-aggregation >
				<logger level="INFO" message="Doing incremental aggregation"/>
			</aggregators:incremental-aggregation>
			<aggregators:aggregation-complete >
				<logger level="INFO" message="Aggregation complete"/>
				<logger level="INFO" message="#[output application/json
---
payload]"/>
			</aggregators:aggregation-complete>
		</aggregators:group-based-aggregator>
	</flow>
</mule>