Aggregate Messages Pending in Queue with VM Connector Examples

The following examples show two scenarios on how to aggregate messages pending in a queue using Anypoint Connector for Virtual Machine (VM Connector).

Note that VM is a queue and you cannot perform a for each operation over every message pending on a queue.

First Example of Aggregating Messages in a Queue

In this example, you use the Aggregators module to aggregate the received messages by the VM Listener operation and then iterate over the result of the aggregation.

Aggregate Messages Pending in Queue first flow in Anypoint Studio canvas

Create the First Flow

In the first flow, an HTTP Listener source listens for incoming HTTP messages. Then a VM Publish operation publishes specific content in a VM queue:

  1. In Studio, create a new Mule project.

  2. In Mule Palette, select the HTTP > Listener source and drag it on to the canvas.

  3. On the HTTP Listener configuration screen, set the Path field to /.

  4. Click the plus sign (+) next to the Connector configuration field to configure a global element that can be used by all instances of the HTTP Listener in the app.

  5. Complete the required fields for the HTTP global configuration.

  6. Click OK.

  7. Drag a For Each component to the right of the HTTP Listener source.

  8. Set Collection to 1 to 200.

  9. Drag a VM > Publish operation into the For Each component.

  10. Set Content to "hello world".

  11. Click the plus sign (+) next to the Connector configuration field to configure a global element that can be used by all instances of the VM Publish operation in the app.

  12. In the VM Config window, click the plus sign (+) to add a new queue.

  13. Set Queue name to demo.

  14. Click Finish.

  15. Click OK.

Create the Second Flow

In the second flow, a VM Listener operation listens to the messages published in the VM queue. The Time based aggregator scope aggregates the received messages until the time condition is reached.

  1. In Studio, drag a VM > Listener operation below the first flow.

  2. For Connector configuration, select the previously created VM_Config configuration.

  3. Set Queue name to demo.

  4. Drag a Time based aggregator scope to the right of the VM Listener.

  5. Set Name to demoaggregator.

  6. Set Period to 5 to aggregate the received messages until 5 seconds is reached.

Create the Third Flow

In this flow, an Aggregator listener operation listens to the messages aggregated by the Time based aggregator scope. The For Each component iterates over the messages and a Logger component logs the content of each message.

  1. In Studio, drag an Aggregator listener source above the second flow.

  2. Set Aggregator name to demoaggregator.

  3. Drag a For Each component to the right of the Aggregator listener source.

  4. Set Collection to [#payload].

  5. Drag a Logger component into the For Each component.

  6. Set Message to [#payload].

  7. Save and run your Mule app.

  8. In your terminal, run the curl command curl http://localhost:8081/.

XML for the First Example of Aggregating Messages in a Queue

Paste this code into the Studio XML editor to quickly load the flow for this example into your Mule app:

<?xml version="1.0" encoding="UTF-8"?>

<mule xmlns:aggregators="http://www.mulesoft.org/schema/mule/aggregators" xmlns:http="http://www.mulesoft.org/schema/mule/http"
	xmlns:vm="http://www.mulesoft.org/schema/mule/vm"
	xmlns="http://www.mulesoft.org/schema/mule/core" xmlns:doc="http://www.mulesoft.org/schema/mule/documentation" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd
http://www.mulesoft.org/schema/mule/vm http://www.mulesoft.org/schema/mule/vm/current/mule-vm.xsd
http://www.mulesoft.org/schema/mule/http http://www.mulesoft.org/schema/mule/http/current/mule-http.xsd
http://www.mulesoft.org/schema/mule/aggregators http://www.mulesoft.org/schema/mule/aggregators/current/mule-aggregators.xsd">
	<http:listener-config name="HTTP_Listener_config" basePath="/" >
		<http:listener-connection host="0.0.0.0" port="8081" />
	</http:listener-config>
	<vm:config name="VM_Config">
		<vm:connection />
		<vm:queues >
			<vm:queue queueName="demo" />
		</vm:queues>
	</vm:config>
	<flow name="demo-vm2Flow" >
		<http:listener config-ref="HTTP_Listener_config" path="/"/>
		<foreach collection="1 to 200">
			<vm:publish config-ref="VM_Config" queueName="demo" >
				<vm:content ><![CDATA[#["hello world"]]]></vm:content>
			</vm:publish>
		</foreach>
	</flow>
	<flow name="demo-vm2Flow2" >
		<aggregators:aggregator-listener aggregatorName="demoaggregator" includeTimedOutGroups="true"/>
		<foreach collection="#[payload]">
			<logger level="INFO" message="#[payload]"/>
		</foreach>
	</flow>
	<flow name="demo-vm2Flow1" >
		<vm:listener queueName="demo" config-ref="VM_Config"/>
		<aggregators:time-based-aggregator  name="demoaggregator" period="5">
		</aggregators:time-based-aggregator>
	</flow>
</mule>

Second Example of Aggregating Messages in a Queue

In this example, a For Each component iterates over messages that were consumed by the VM Consume operation. The Mule app appends every received message into a variable, and logs the payload content for each message. Note that in this example scenario you get slices of messages until no messages are available on the queue. When this occurs, the VM Consume operation throws a timeout message handled by the On Error Continue component and then the results are available for iteration.

Aggregate Messages Pending in Queue second flow in Anypoint Studio canvas

Create the First Flow

In the first flow, an HTTP Listener source listens for incoming HTTP messages, and the VM Publish operation publishes specific content in a VM queue:

  1. In Studio, create a new Mule project.

  2. In Mule Palette, select the HTTP > Listener source and drag it on to the canvas.

  3. On the HTTP Listener configuration screen, set the Path field to /.

  4. Click the plus sign (+) next to the Connector configuration field to configure a global element that can be used by all instances of the HTTP Listener in the app.

  5. Complete the required fields for the HTTP global configuration.

  6. Click OK.

  7. Drag a For Each component to the right of the HTTP Listener source.

  8. Set Collection to 1 to 55.

  9. Drag a VM > Publish operation into the For Each component.

  10. Set Content to "test world".

  11. Click the plus sign (+) next to the Connector configuration field to configure a global element that can be used by all instances of the VM Publish operation in the app.

  12. In the VM Config window, click the plus sign (+) to add a new queue.

  13. Set Queue name to demo.

  14. Click Finish.

  15. Click OK.

Create the Second Flow

In the second flow, a VM Consume operation pulls the published messages from the queue. The Transform Message component transforms the payload content and save it in a variable. The Mule app logs each variable payload.

  1. In Studio, drag a Scheduler source below the first flow.

  2. Set Frequency to 5000.

  3. Drag a Set Variable component to the right of the Scheduler component.

  4. Set Name to result.

  5. Set Value to [].

  6. Drag a Try scope component to the right of Set Variable.

  7. Drag a For Each component into the Try scope.

  8. Set Collection to 1 to 10000.

  9. Drag a VM Consume operation into the For Each component.

  10. For Connector configuration, select the previously created VM_Config configuration.

  11. Set Queue name to demo.

  12. Drag a Transform Message component to the right of Consume.

  13. Set the DataWeave script to:

%dw 2.0
output application/java
---
vars.result << payload
  1. Drag an On Error Continue component into the Error handling section of the Try scope component.

  2. Set Type to VM:EMPTY_QUEUE.

  3. Drag a Set Payload component to the right of the Try scope component.

  4. Set Value to vars.result.

  5. Drag a For Each component to the right of the Set Payload component.

  6. Set Collection to payload.

  7. Drag a Logger component into the For each scope component.

  8. Set Message to payload.

  9. Save and run your Mule app.

  10. In your terminal, run the curl command curl http://localhost:8081/.

XML for the Second Example of Aggregating Messages in a Queue

Paste this code into the Studio XML editor to quickly load the flow for this example into your Mule app:

<?xml version="1.0" encoding="UTF-8"?>

<mule xmlns:ee="http://www.mulesoft.org/schema/mule/ee/core" xmlns:http="http://www.mulesoft.org/schema/mule/http"
	xmlns:vm="http://www.mulesoft.org/schema/mule/vm"
	xmlns="http://www.mulesoft.org/schema/mule/core" xmlns:doc="http://www.mulesoft.org/schema/mule/documentation" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd
http://www.mulesoft.org/schema/mule/vm http://www.mulesoft.org/schema/mule/vm/current/mule-vm.xsd
http://www.mulesoft.org/schema/mule/http http://www.mulesoft.org/schema/mule/http/current/mule-http.xsd
http://www.mulesoft.org/schema/mule/ee/core http://www.mulesoft.org/schema/mule/ee/core/current/mule-ee.xsd">
	<http:listener-config name="HTTP_Listener_config" basePath="/" >
		<http:listener-connection host="0.0.0.0" port="8081" />
	</http:listener-config>
	<vm:config name="VM_Config" >
		<vm:connection />
		<vm:queues >
			<vm:queue queueName="demo" />
		</vm:queues>
	</vm:config>
	<flow name="demo-vmFlow1" >
		<http:listener config-ref="HTTP_Listener_config" path="/"/>
		<foreach collection="1 to 55">
			<vm:publish config-ref="VM_Config" queueName="demo" >
				<vm:content ><![CDATA[#["test message"]]]></vm:content>
			</vm:publish>
		</foreach>
	</flow>
	<flow name="demo-vmFlow">
		<scheduler>
			<scheduling-strategy>
				<fixed-frequency frequency="5000" />
			</scheduling-strategy>
		</scheduler>
		<set-variable value="#[[]]" variableName="result" />
		<try>
			<foreach collection="#[1 to 10000]">
				<vm:consume config-ref="VM_Config" queueName="demo" />
				<ee:transform>
					<ee:message>
						<ee:set-payload><![CDATA[%dw 2.0
output application/java
---
vars.result << payload
]]></ee:set-payload>
					</ee:message>
					<ee:variables>
						<ee:set-variable variableName="result"><![CDATA[%dw 2.0
output application/java
---
vars.result<<payload]]></ee:set-variable>
					</ee:variables>
				</ee:transform>
			</foreach>
			<error-handler>
				<on-error-continue enableNotifications="true" logException="false" type="VM:EMPTY_QUEUE" />
			</error-handler>
		</try>
		<set-payload value="#[vars.result]" />
		<foreach collection="#[payload]" counterVariableName="message">
			<logger level="INFO" message="#[payload]" />
		</foreach>
	</flow>
</mule>