Contact Us 1-800-596-4880

VM Connector Propagate Message Attributes Example - Mule 4

The following example illustrates how you can propagate message attributes through a VM queue via a message payload. The first flow takes incoming HTTP message attributes and additionally creates a custom attribute of its own, and then adds both of these attributes into a message payload so the VM Publish operation can publish the message to a VM queue. In the second flow, the VM Listener source receives the message payload from the previous flow and sets its own message attributes, and the message payload is finally stored in an object store.

The following screenshot shows the Anypoint Studio app flow for this example:

VM Connector example flow
Figure 1. VM Connector Propagate message attributes flow

Create the VM Publishing Flow

The VM publishing flow first listens for incoming HTTP message attributes and then transforms the message attributes into a JSON structure message. Finally the VM Publish operation publishes the message in a VM queue.

To create the VM publishing flow:

  1. Create a new Mule project in Studio.

  2. In the Mule Palette view, select the HTTP Listener source and drag it into the canvas.
    The source listens for incoming HTTP messages attributes.

  3. On the HTTP Listener configuration screen, set the Path field to /message.

  4. Click the plus sign (+) next to the Connector configuration field to configure a global element that can be used by all instances of the HTTP Listener in the app.

  5. In the Connection section, set Protocol to HTTP (Default), Host to All Interfaces [0.0.0.0] (default), and Port to 8081.

  6. Set Base path to /publish.

  7. Click Test Connection to confirm that Mule can connect with the specified server.

  8. Click OK.

  9. Drag the Transform Message component to the right of HTTP Listener.
    The component transforms the incoming HTTP message attributes into a JSON structure that contains the listenerPath and method attributes, as well as a new custom sizeAttribute attribute.

  10. In the Output Payload script view of the Transform Message component, define the listenerPath and method attributes using the following DataWeave code:

%dw 2.0
output application/json
---
{
listenerPath: attributes.listenerPath,
method: attributes.method
}
  1. In the Output Attributes script view of the Transform Message component, define a new custom sizeAttribute attribute using the following DataWeave code:

%dw 2.0
output application/json
---
{
sizeAttribute: sizeOf(attributes.listenerPath) + sizeOf(attributes.method)
}
  1. Drag the VM Publish operation to the right of the Transform Message component.
    The operation publishes the created JSON message in the VM queue.

  2. On the VM Publish configuration screen, click the plus sign (+) next to the Connector configuration field to configure a global element for the VM queue.

  3. On the VM Config window, for Queues select Edit inline.

  4. On the Queue window, set Queue name to testqueue1, Queue type to TRANSIENT (Default), and Max outstanding message to o.

  5. Click Finish.

  6. Click OK.

In Studio, the VM Config global element configuration looks like this:

VM Config global element configuration
Figure 2. VM Config global element configuration
  1. In the General > Content field of the VM Publish configuration screen, define the following DataWeave code:

%dw 2.0
output application/json
---
{'listenerPath': payload.listenerPath, 'method': payload.method, 'messageSize': attributes.sizeAttribute }

In Studio, the VM Publish operation configuration looks like this:

VM Publish operation configuration
Figure 3. VM Publish operation configuration

Create the VM Listening Flow

The VM listening flow listens for incoming messages attributes in the VM queue from the previous VM Publish flow, then set its own attributes via the VM Listener source. Finally stores the message payload in an object store.

To create the VM listening flow:

  1. In the Mule Palette view, select the VM Listener source and drag it onto the canvas below the first VM Publish flow.
    The source listens for the incoming message attributes from the previous flow.

  2. On the VM Listener configuration screen, click the plus sign (+) next to the Connector configuration field and select the VM queue configuration created in the VM Publish flow.

In Studio, the VM Listener source configuration looks like this:

VM Listener source configuration
Figure 4. VM Listener source configuration
  1. Drag a Logger component to the right of the VM Listener.

  2. On the Logger configuration screen, set the Message field to payload.

  3. Drag another Logger component to the right of the first Logger component.

  4. On the Logger configuration screen, set the Message field to 'VM message attributes: queue name ' ++ attributes.queueName as String.

  5. Drag an ObjectStore Store operation to the right of the second Logger component.
    The operation stores the received 'method':'listenerPath' pair attributes in an object store.

  6. On the ObjectStore Store configuration screen, set the Display Name field to Add Http Request.

  7. In the General section, set Key to payload.method and Value to payload.listenerPath.

  8. Click the plus sign (+) next to the Object Store field to set the global object store configuration.

  9. Set the default values and click OK.

  10. Save the project and run the app.

  11. Test the app by sending a request to http://127.0.0.1:8081/publish/message.

XML for Propagating Message Attributes Through the VM Queue

Paste this code into the Studio XML editor to quickly load the flow for this example into your Mule app:

<?xml version="1.0" encoding="UTF-8"?>

<mule xmlns:sockets="http://www.mulesoft.org/schema/mule/sockets"
	xmlns:os="http://www.mulesoft.org/schema/mule/os" xmlns:ee="http://www.mulesoft.org/schema/mule/ee/core"
	xmlns:vm="http://www.mulesoft.org/schema/mule/vm"
	xmlns:http="http://www.mulesoft.org/schema/mule/http" xmlns="http://www.mulesoft.org/schema/mule/core" xmlns:doc="http://www.mulesoft.org/schema/mule/documentation" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="
http://www.mulesoft.org/schema/mule/ee/core http://www.mulesoft.org/schema/mule/ee/core/current/mule-ee.xsd http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd
http://www.mulesoft.org/schema/mule/http http://www.mulesoft.org/schema/mule/http/current/mule-http.xsd
http://www.mulesoft.org/schema/mule/vm http://www.mulesoft.org/schema/mule/vm/current/mule-vm.xsd
http://www.mulesoft.org/schema/mule/os http://www.mulesoft.org/schema/mule/os/current/mule-os.xsd
http://www.mulesoft.org/schema/mule/sockets http://www.mulesoft.org/schema/mule/sockets/current/mule-sockets.xsd">

	<http:listener-config name="HTTP_Listener_config" doc:name="HTTP Listener config" basePath="/publish" >
		<http:listener-connection host="0.0.0.0" port="8081" />
	</http:listener-config>

	<vm:config name="VM_Config" doc:name="VM Config">
		<vm:connection />
		<vm:queues>
	        <vm:queue queueName="testQueue1" queueType="TRANSIENT"/>
	    </vm:queues>
	</vm:config>

	<os:object-store name="Object_store" doc:name="Object store"/>
	<flow name="vm-publishFlow" >
		<http:listener doc:name="Listener" config-ref="HTTP_Listener_config" path="/message"/>
		<ee:transform doc:name="Transform Message">
			<ee:message>
				<ee:set-payload><![CDATA[%dw 2.0
					output application/json
					---
					{
						listenerPath: attributes.listenerPath,
						method: attributes.method
					}
					]]>
				</ee:set-payload>
				<ee:set-attributes><![CDATA[%dw 2.0
				output application/json
				---
				{
					sizeAttribute: sizeOf(attributes.listenerPath) + sizeOf(attributes.method)
				}
				]]></ee:set-attributes>
			</ee:message>
		</ee:transform>
		<vm:publish doc:name="Publish" config-ref="VM_Config" queueName="testQueue1">
			<vm:content ><![CDATA[#[%dw 2.0
			output application/json
			---
			{'listenerPath': payload.listenerPath, 'method': payload.method, 'messageSize': attributes.sizeAttribute }]]]></vm:content>
		</vm:publish>
	</flow>
	<flow name="vm-listenerFlow" >
		<vm:listener doc:name="Listener" config-ref="VM_Config" queueName="testQueue1"/>
		<logger level="INFO" doc:name="Logger"  message="payload"/>
		<logger level="INFO" doc:name="Logger"  message="'VM message attributes: queue name ' ++ attributes.queueName as String"/>
		<os:store doc:name=" Add Http Request" key="payload.method" objectStore="Object_store">
			<os:value ><![CDATA[payload.listenerPath]]></os:value>
		</os:store>
	</flow>
</mule>
View on GitHub