Configure Size-Based Aggregator Timeout and Max Size Fields - Mule 4

To understand the Aggregators module Size based aggregator scope timeout behavior, review the following diagram that explains how the Timeout and Max size fields work. Then learn how to configure these fields in Anypoint Studio and in the XML editor in a Mule application example.

aggregator size timeout diagram
Figure 1. Size-Based Aggregator Timeout and Max Size behavior diagram
  • The Max size field indicates the total number of elements to be aggregated before considering the aggregation complete.

  • The Timeout field indicates the maximum time to wait for the aggregation to complete.

  • The first process indicates that the aggregation is complete because the total aggregated elements reached the number required before the timeout occurred.

  • The second process indicates that the aggregation is not complete because the total aggregated elements did not reach the number required before the timeout occurred, which causes an aggregation reset.

  • The third process indicates that you can use an Aggregator listener source to capture the elements that were aggregated before the timeout occurred. The source triggers when the timeout occurs.

If you choose to use an Aggregator listener source to capture incomplete aggregation elements, you must select the Include timed out group field to indicate that the source is triggered when the timeout occurs.

Configure the Mule Application

The following example shows how to configure the Size based Aggregator scope to aggregate up to 3 HTTP request calls within 10 seconds. The application uses DataWeave to set the payload of each call with a random ID number. When the aggregation reaches the 3 calls required for completion and before the timeout occurs, the application logs all the aggregated elements. Otherwise, the application logs only the aggregated elements captured by an Aggregator listener source when the timeout occurred.

Size-Based Aggregator Flow Example in Anypoint Studio Canvas
Figure 2. Size-Based Aggregator Flow Example

To test this example in your own environment, you must create the Mule application and test it with curl commands.

To create the Mule flow in Anypoint Studio:

  1. In Mule Palette, select HTTP Listener and drag it onto the canvas.
    The source initiates the flow by listening for incoming HTTP message attributes.

  2. In Connector configuration, select HTTP_Listener_config.

  3. Set Path to /test.

  4. Drag a Transform message component to the right of Listener.

  5. In the DataSense preview window of the Transform message component, add the following DataWeave expression, which sets to each HTTP request payload a Source Name as "size-based" and a Source ID as random() as String:

    %dw 2.0
    output application/json
    ---
    {
        "Source Name": "size-based",
        "Source ID": (random() as String)
    }
  6. Drag the Size based aggregator scope to the right of Transform message.

  7. Set Name to sizeBasedAggregator.

  8. Set Max size to 3.

  9. Set Timeout to 10.

  10. Set Content to payload, which is the expression that defines what to aggregate: in this case, the transformed HTTP requests.

  11. Drag a Logger component into the Incremental aggregation route.

  12. In the Logger configuration screen, set Message to Doing incremental aggregation step.

  13. Drag a Logger component into the Aggregation complete route.

  14. Set Message to Aggregation complete.

  15. Drag another Logger component to the right of the first Logger component in Aggregation complete.

  16. Set Message to the following expression, which returns all the aggregated elements:

    output application/json
    ---
    payload
  17. Drag an Aggregator listener source below the first flow.

  18. Set Aggregator name to sizeBasedAggregator, which listens to the previous aggregator.

  19. Select Include timed out groups, which indicates whether the listener should be triggered when the timeout occurs.

  20. Drag a Logger component to the right of the Aggregation listener source.

  21. Set Message to the following expression, which returns only the aggregated elements before the timeout occurs:

    output application/json
    ---
    payload
  22. Save the project.

  23. In Package Explorer, click Run > Run As > Mule Application.

  24. Test the app by sending the following curl command within 10 seconds:
    curl -X POST http://localhost:8086/test

  25. Navigate to the Console view to read the logger message:

INFO  2021-06-22 12:59:16,319 [[MuleRuntime].uber.12: [aggregator-size-demo].aggregator-size-demoFlow.CPU_INTENSIVE @7992f689] [processor: aggregator-size-demoFlow/processors/1/route/0/processors/0; event: cbc219c0-d372-11eb-b975-f01898ad2638] org.mule.runtime.core.internal.processor.LoggerMessageProcessor: Doing incremental aggregation step.
INFO  2021-06-22 12:59:26,328 [[MuleRuntime].uber.12: [aggregator-size-demo].aggregator-size-demoFlow1.CPU_LITE @684a1bef] [processor: aggregator-size-demoFlow1/processors/0; event: d6b48f00-d36f-11eb-b975-f01898ad2638] org.mule.runtime.core.internal.processor.LoggerMessageProcessor: [
  {
    "Source Name": "size-based",
    "Source ID": "0.6568747259174192"
  }
]

This example logger message shows that for the first HTTP request, the Logger component in the Incremental aggregator route logs the message Doing incremental aggregation step. After the aggregation reaches the 10 seconds, the timeout occurs. The Aggregation listener source captures the only HTTP request call and the subsequent Logger component returns the aggregated payload element, which is the random ID number.

XML for Size-Based Aggregator Timeout Max Size Example

Paste this code into your Studio XML editor to quickly load the flow for this example into your Mule app:

<?xml version="1.0" encoding="UTF-8"?>

<mule xmlns:db="http://www.mulesoft.org/schema/mule/db" xmlns:aggregators="http://www.mulesoft.org/schema/mule/aggregators"
	xmlns:ee="http://www.mulesoft.org/schema/mule/ee/core"
	xmlns:http="http://www.mulesoft.org/schema/mule/http" xmlns="http://www.mulesoft.org/schema/mule/core" xmlns:doc="http://www.mulesoft.org/schema/mule/documentation" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd
http://www.mulesoft.org/schema/mule/http http://www.mulesoft.org/schema/mule/http/current/mule-http.xsd
http://www.mulesoft.org/schema/mule/ee/core http://www.mulesoft.org/schema/mule/ee/core/current/mule-ee.xsd
http://www.mulesoft.org/schema/mule/aggregators http://www.mulesoft.org/schema/mule/aggregators/current/mule-aggregators.xsd
http://www.mulesoft.org/schema/mule/db http://www.mulesoft.org/schema/mule/db/current/mule-db.xsd">
	<http:listener-config name="HTTP_Listener_config">
		<http:listener-connection host="0.0.0.0" port="8086" />
	</http:listener-config>
	<flow name="aggregator-size-demoFlow"  >
		<http:listener doc:name="Listener"  config-ref="HTTP_Listener_config" path="/test" allowedMethods="POST"/>
		<ee:transform doc:name="Transform Message" >
			<ee:message >
				<ee:set-payload ><![CDATA[%dw 2.0
output application/json
---
{
    "Source Name": "size-based",
    "Source ID": (random() as String)
}]]></ee:set-payload>
			</ee:message>
		</ee:transform>
		<aggregators:size-based-aggregator doc:name="Size based aggregator"  name="sizeBasedAggregator" maxSize="3" timeout="10">
			<aggregators:incremental-aggregation>
				<logger level="INFO"  message="Doing incremental aggregation step." />
			</aggregators:incremental-aggregation>
			<aggregators:aggregation-complete>
				<logger level="INFO"  message="Aggregation complete" />
				<logger level="INFO" message="#[output application/json
---
payload]" />
			</aggregators:aggregation-complete>
		</aggregators:size-based-aggregator>
	</flow>
	<flow name="aggregator-size-demoFlow1"  >
		<aggregators:aggregator-listener  aggregatorName="sizeBasedAggregator" includeTimedOutGroups="true"/>
		<logger level="INFO" message="#[output application/json&#10;---&#10;payload]"/>
	</flow>
</mule>