Size-Based Aggregator Example - Mule 4

The Aggregators module Size based aggregator scope enables you to aggregate elements until a predefined size number of elements completes the aggregation. The following example shows how to configure this scope to aggregate up to 3 HTTP request calls. The application uses DataWeave to set the payload of each call with a random ID number. When the aggregation reaches the 3 calls required for completion, the application logs all the aggregated random ID number elements.

Size-Based Aggregator Flow Example in Anypoint Studio Canvas
Figure 1. Size-Based Aggregator Flow Example

To test this example in your own environment, you must create the Mule application and test it with curl commands.

Create the Mule Application

To create the Mule flow in Anypoint Studio:

  1. In Mule Palette, select HTTP Listener and drag it onto the canvas.
    The source initiates the flow by listening for incoming HTTP message attributes.

  2. In Connector configuration, select HTTP_Listener_config.

  3. Set Path to /test.

  4. Drag a Transform message component to the right of Listener.

  5. In the DataSense preview window of the Transform message component, add the following DataWeave expression, which sets to each HTTP request payload a Source Name as "size-based" and a Source ID as random() as String:

    %dw 2.0
    output application/json
    ---
    {
        "Source Name": "size-based",
        "Source ID": (random() as String)
    }
  6. Drag the Size based aggregator scope to the right of Transform message.

  7. Set Name to sizeBasedAggregator.

  8. Set Max size to 3.

  9. Set Content to payload, which is the expression that defines what to aggregate: in this case, the transformed HTTP requests.

  10. Drag a Logger component into the Incremental aggregation route.

  11. In the Logger configuration screen, set Message to Doing incremental aggregation step.

  12. Drag a Logger component into the Aggregation complete route.

  13. Set Message to Aggregation complete.

  14. Drag another Logger component to the right of the first Logger component in Aggregation complete.

  15. Set Message to the following expression, which returns all the aggregated elements:

    output application/json
    ---
    payload
  16. Save the project.

  17. In Package Explorer, click Run > Run As > Mule Application.

  18. Test the app by sending the following curl command three times:
    curl -X POST http://localhost:8086/test
    This generates three HTTP requests for the Size based aggregator to process.

  19. Navigate to the Console view to read the logger message:

INFO  2021-06-08 11:18:44,897 [[MuleRuntime].uber.02: [aggregator-size-demo].aggregator-size-demoFlow.CPU_INTENSIVE @20a8a5b4] [processor: aggregator-size-demoFlow/processors/1/route/0/processors/0; event: 6ee63ec0-c864-11eb-b5a7-f01898ad2638] org.mule.runtime.core.internal.processor.LoggerMessageProcessor: Doing incremental aggregation step.
INFO  2021-06-08 11:18:48,570 [[MuleRuntime].uber.02: [aggregator-size-demo].aggregator-size-demoFlow.CPU_INTENSIVE @20a8a5b4] [processor: aggregator-size-demoFlow/processors/1/route/0/processors/0; event: 71283f80-c864-11eb-b5a7-f01898ad2638] org.mule.runtime.core.internal.processor.LoggerMessageProcessor: Doing incremental aggregation step.
INFO  2021-06-08 11:18:50,189 [[MuleRuntime].uber.02: [aggregator-size-demo].aggregator-size-demoFlow.CPU_INTENSIVE @20a8a5b4] [processor: aggregator-size-demoFlow/processors/1/route/1/processors/0; event: 721efb90-c864-11eb-b5a7-f01898ad2638] org.mule.runtime.core.internal.processor.LoggerMessageProcessor: Aggregation complete
INFO  2021-06-08 11:18:50,254 [[MuleRuntime].uber.02: [aggregator-size-demo].aggregator-size-demoFlow.CPU_INTENSIVE @20a8a5b4] [processor: aggregator-size-demoFlow/processors/1/route/1/processors/1; event: 721efb90-c864-11eb-b5a7-f01898ad2638] org.mule.runtime.core.internal.processor.LoggerMessageProcessor: [
  {
    "Source Name": "size-based",
    "Source ID": "0.49127866969631506"
  },
  {
    "Source Name": "size-based",
    "Source ID": "0.4250959269275998"
  },
  {
    "Source Name": "size-based",
    "Source ID": "0.4160926509813687"
  }
]

This example logger message shows that for the first two HTTP requests, the Logger component in the Incremental aggregator route logs the message Doing incremental aggregation step. After the aggregation reaches the third HTTP request call, the first Logger component in the Aggregation complete route logs the message Aggregation complete, and the second Logger component returns the aggregated payload elements, which are the random ID numbers:

{
  "Source Name": "size-based",
  "Source ID": "0.49127866969631506"
},
{
  "Source Name": "size-based",
  "Source ID": "0.4250959269275998"
},
{
  "Source Name": "size-based",
  "Source ID": "0.4160926509813687"
}

XML for Size-Based Aggregator Example

Paste this code into your Studio XML editor to quickly load the flow for this example into your Mule app:

<?xml version="1.0" encoding="UTF-8"?>

<mule xmlns:db="http://www.mulesoft.org/schema/mule/db" xmlns:aggregators="http://www.mulesoft.org/schema/mule/aggregators"
	xmlns:ee="http://www.mulesoft.org/schema/mule/ee/core"
	xmlns:http="http://www.mulesoft.org/schema/mule/http" xmlns="http://www.mulesoft.org/schema/mule/core" xmlns:doc="http://www.mulesoft.org/schema/mule/documentation" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd
http://www.mulesoft.org/schema/mule/http http://www.mulesoft.org/schema/mule/http/current/mule-http.xsd
http://www.mulesoft.org/schema/mule/ee/core http://www.mulesoft.org/schema/mule/ee/core/current/mule-ee.xsd
http://www.mulesoft.org/schema/mule/aggregators http://www.mulesoft.org/schema/mule/aggregators/current/mule-aggregators.xsd
http://www.mulesoft.org/schema/mule/db http://www.mulesoft.org/schema/mule/db/current/mule-db.xsd">
	<http:listener-config name="HTTP_Listener_config">
		<http:listener-connection host="0.0.0.0" port="8086" />
	</http:listener-config>
	<flow name="aggregator-size-demoFlow" >
		<http:listener config-ref="HTTP_Listener_config" path="/test" allowedMethods="POST"/>
		<ee:transform >
			<ee:message >
				<ee:set-payload ><![CDATA[%dw 2.0
output application/json
---
{
    "Source Name": "size-based",
    "Source ID": (random() as String)
}]]></ee:set-payload>
			</ee:message>
		</ee:transform>
		<aggregators:size-based-aggregator name="sizeBasedAggregator" maxSize="3">
			<aggregators:incremental-aggregation>
				<logger level="INFO" message="Doing incremental aggregation step." />
			</aggregators:incremental-aggregation>
			<aggregators:aggregation-complete>
				<logger level="INFO" message="Aggregation complete" />
				<logger level="INFO" message="#[output application/json
---
payload]" />
			</aggregators:aggregation-complete>
		</aggregators:size-based-aggregator>
	</flow>
</mule>