Time-Based Aggregator Example - Mule 4

The Aggregators module Time based aggregator scope enables you to aggregate elements within a predefined time limit. The following example shows how to configure this scope to aggregate HTTP request calls within 5 seconds. The application uses DataWeave to set the payload of each call with a random ID number. When the aggregation reaches 5 seconds, an Aggregator listener source gets the list of elements and the application logs all the aggregated random ID numbers.

Time-Based Aggregator Flow Example in Anypoint Studio Canvas
Figure 1. Time-Based Aggregator Flow Example

To test this example in your own environment, you must create the Mule application and test it with curl commands.

Create the Mule Application

To create the Mule flow in Anypoint Studio:

  1. In Mule Palette, select HTTP Listener and drag it onto the canvas.
    The source initiates the flow by listening for incoming HTTP message attributes.

  2. In Connector configuration, select HTTP_Listener_config.

  3. Set Path to /test.

  4. Drag a Transform message component to the right of Listener.

  5. In the DataSense preview window of the Transform message component, add the following DataWeave expression, which sets to each HTTP request payload a Source Name as "time-based" and a Source ID as random() as String:

    %dw 2.0
    output application/json
    ---
    {
        "Source Name": "time-based",
        "Source ID": (random() as String)
    }
  6. Drag the Time based aggregator scope to the right of Transform message.

  7. Set Name to timeBasedAggregator.

  8. Set Period to 5.

  9. Set Period unit to SECONDS(Default).

  10. Set Content to payload, which is the expression that defines what to aggregate; in this case, the transformed HTTP requests.

  11. Drag a Logger component into the Incremental aggregation route.

  12. In the Logger configuration screen, set Message to Doing incremental step.

  13. Drag an Aggregator listener source below the first flow.
    This source listens to the previous defined time-based aggregator. Once the aggregator releases its elements, the listener is executed.

  14. Set Aggregator name to timeBasedAggregator.

  15. Drag a Logger component to the right of Aggregator listener

  16. Set Message to Aggregation complete.

  17. Drag another Logger component to the right of the previous added Logger component.

  18. Set Message to the following expression, which returns all the aggregated elements:

    output application/json
    ---
    payload
  19. Save the project.

  20. In Package Explorer, click Run > Run As > Mule Application.

  21. Test the app by sending the following curl command twice within 5 seconds:
    curl -X POST http://localhost:8086/test
    This generates two HTTP requests for the Time based aggregator to process.

  22. Navigate to the Console view to read the logger message:

INFO  2021-06-09 12:02:04,203 [[MuleRuntime].uber.04: [aggregator-time-demo].aggregator-time-demoFlow.CPU_INTENSIVE @59b15997] [processor: aggregator-time-demoFlow/processors/1/route/0/processors/0; event: a6b00280-c933-11eb-a825-f01898ad2638] org.mule.runtime.core.internal.processor.LoggerMessageProcessor: Doing incremental step.
INFO  2021-06-09 12:02:06,071 [[MuleRuntime].uber.04: [aggregator-time-demo].aggregator-time-demoFlow.CPU_INTENSIVE @59b15997] [processor: aggregator-time-demoFlow/processors/1/route/0/processors/0; event: a7cd0b40-c933-11eb-a825-f01898ad2638] org.mule.runtime.core.internal.processor.LoggerMessageProcessor: Doing incremental step.
INFO  2021-06-09 12:02:09,206 [[MuleRuntime].uber.04: [aggregator-time-demo].aggregator-time-demoOnCompleteFlow.CPU_LITE @2ab3c271] [processor: aggregator-time-demoOnCompleteFlow/processors/0; event: 827c9591-c933-11eb-a825-f01898ad2638] org.mule.runtime.core.internal.processor.LoggerMessageProcessor: Aggregation complete
INFO  2021-06-09 12:02:09,208 [[MuleRuntime].uber.04: [aggregator-time-demo].aggregator-time-demoOnCompleteFlow.CPU_LITE @2ab3c271] [processor: aggregator-time-demoOnCompleteFlow/processors/1; event: 827c9591-c933-11eb-a825-f01898ad2638] org.mule.runtime.core.internal.processor.LoggerMessageProcessor: [
  {
    "Source Name": "time-based",
    "Source ID": "0.7095971146328517"
  },
  {
    "Source Name": "time-based",
    "Source ID": "0.6900116288202388"
  }
]

This example logger message shows that for each HTTP request, the Logger component in the Incremental aggregator route logs the message Doing incremental step. After the timeBasedAggregator value that is referenced by the Aggregator listener source completes the incremental aggregations within 5 seconds, the listener is triggered with a list of all the elements. The subsequent Logger component logs the message Aggregation complete, and the second Logger component returns the aggregated payload elements, which are the random ID numbers:

{
  "Source Name": "time-based",
  "Source ID": "0.7095971146328517"
},
{
  "Source Name": "time-based",
  "Source ID": "0.6900116288202388"
}

XML for Time-Based Aggregator Example

Paste this code into your Studio XML editor to quickly load the flow for this example into your Mule app:

<?xml version="1.0" encoding="UTF-8"?>

<mule xmlns:aggregators="http://www.mulesoft.org/schema/mule/aggregators" xmlns:ee="http://www.mulesoft.org/schema/mule/ee/core"
	xmlns:http="http://www.mulesoft.org/schema/mule/http"
	xmlns="http://www.mulesoft.org/schema/mule/core" xmlns:doc="http://www.mulesoft.org/schema/mule/documentation" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd
http://www.mulesoft.org/schema/mule/http http://www.mulesoft.org/schema/mule/http/current/mule-http.xsd
http://www.mulesoft.org/schema/mule/ee/core http://www.mulesoft.org/schema/mule/ee/core/current/mule-ee.xsd
http://www.mulesoft.org/schema/mule/aggregators http://www.mulesoft.org/schema/mule/aggregators/current/mule-aggregators.xsd">
	<http:listener-config name="HTTP_Listener_config" >
		<http:listener-connection host="0.0.0.0" port="8086" />
	</http:listener-config>
	<flow name="aggregator-time-demoFlow" >
		<http:listener config-ref="HTTP_Listener_config" path="/test" allowedMethods="POST"/>
		<ee:transform >
			<ee:message >
				<ee:set-payload ><![CDATA[%dw 2.0
output application/json
---
{
    "Source Name": "time-based",
    "Source ID": (random() as String)
}]]></ee:set-payload>
			</ee:message>
		</ee:transform>
		<aggregators:time-based-aggregator period="5" name="timeBasedAggregator">
			<aggregators:incremental-aggregation >
				<logger level="INFO" message="Doing incremental step."/>
			</aggregators:incremental-aggregation>
		</aggregators:time-based-aggregator>
	</flow>
	<flow name="aggregator-time-demoOnCompleteFlow" >
		<aggregators:aggregator-listener  aggregatorName="timeBasedAggregator"/>
		<logger level="INFO" message="Aggregation complete" />
		<logger level="INFO" message="#[output application/json
---
payload]" />
	</flow>
</mule>