Migrating to the AMQP Connector

This version of Mule reached its End of Life on May 2, 2023, when Extended Support ended.

Deployments of new applications to CloudHub that use this version of Mule are no longer allowed. Only in-place updates to applications are permitted.

MuleSoft recommends that you upgrade to the latest version of Mule 4 that is in Standard Support so that your applications run with the latest fixes and security enhancements.

As with other transports, the AMQP connector was completely redesigned for Mule 4, moving away from the Mule 3 transport model to an operation-based connector with a simplified UX and fully DataSense-enabled Message information. As a result, connecting and configuring AMQP now works the same way as with any other connector, and publishing and consuming messages is simpler because it builds on structured Message information.

Configuring The Connector

Moving from a 3.x transport configuration to an AMQP Connector configuration in Mule 4 mostly means using the same parameters, declared in a more cohesive way, although some parameters changed in the Mule 4 AMQP Connector. For example, parameters used for consuming Messages are declared in a 'consumer-config' group, while general authentication parameters are configured in the General group. Parameters that affect the behavior of the connector (declared in the config) are also separated from those that affect only how the connection is established (declared at the connection level):

Mule 3 Example: Configuring The Connector
<amqp:connector name="AMQP_Config"
     fallbackAddresses="192.168.0.1,192.168.0.2"
     virtualHost="/"
     username="guest"
     password="guest"
     deliveryMode="TRANSIENT"
     priority="1"
     ackMode="AUTO"
     activeDeclarationsOnly="false"
     mandatory="false"
     immediate="false"
     prefetchSize="16"
     prefetchCount="16"
     noLocal="false"
     exclusiveConsumers="false"
     requestBrokerConfirms="false"
     numberOfChannels="10" />

The same configuration is achieved in Mule 4 by setting the same parameters in the context where they actually apply:

Mule 4 Example: Configuring The Connector
<amqp:config name="AMQP_Config">
	<amqp:connection host="localhost" port="5672"
		virtualHost="/" username="guest" password="guest" />
	<amqp:consumer-config numberOfConsumers="10" exclusiveConsumers="false" noLocal="false" ackMode="AUTO"/>
	<amqp:publisher-config requestBrokerConfirms="false" mandatory="false" immediate="false" priority="1" deliveryMode="TRANSIENT"/>
	<amqp:quality-of-service prefetchSize="16" prefetchCount="16" />
</amqp:config>

Notice that fallbackAddresses is not supported anymore.
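
Because connection parameters now live in their own element, they are straightforward to externalize. The following is a minimal sketch, assuming a properties file named amqp.properties and property keys such as amqp.host; both the file name and the keys are hypothetical and not part of the original configuration.

```xml
<!-- Hypothetical properties file that holds the amqp.* keys used below -->
<configuration-properties file="amqp.properties" />

<amqp:config name="AMQP_Config">
    <!-- Only connection-level parameters are externalized in this sketch -->
    <amqp:connection host="${amqp.host}" port="${amqp.port}"
        virtualHost="${amqp.virtualHost}"
        username="${amqp.username}" password="${amqp.password}" />
</amqp:config>
```

With this layout, switching brokers between environments only requires a different properties file, while the consumer and publisher settings in the config stay the same.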

Sending Messages

With the new Mule 4 approach, the AMQP 'publish' operation relies only on its input parameters to completely build the AMQP Message to be published.

For example, to send a high-priority AMQP Message with only a part of the payload in the body, and to associate that Message with a group, in Mule 3 we need to:

1 Use transform to set the payload to what the Message body is expected to be.
2 Convert the resulting stream into a String to send it as a text message.
3 Set an AMQP property with priority as key to set the AMQP Message Priority.

Mule 3 Example: Sending a prioritized Message as part of a Group
<flow name="AmqpTransportOutbound">
    <http:listener config-ref="HTTP_Listener_Configuration" path="/orders"/>
    <dw:transform-message> <!-- (1) -->
        <dw:set-payload><![CDATA[%dw 1.0
%output application/json
---
{
order_id: payload.id,
supplier: payload.warehouse
}]]></dw:set-payload>
    </dw:transform-message>
    <object-to-string-transformer/> <!-- (2) -->
    <amqp:outbound-endpoint exchangeName="testExchange" connector-ref="AMQP_Connector" >
      <message-properties-transformer scope="outbound">
          <add-message-property key="priority" value="9"/> <!-- (3) -->
      </message-properties-transformer>
    </amqp:outbound-endpoint>
</flow>

The same results can be achieved in Mule 4 using the AMQP Connector with the following configuration:

Mule 4 Example: Sending a prioritized Message as part of a Group
<flow name="AMQPConnectorPublish">
    <http:listener config-ref="HTTP_Listener_config" path="/orders"/>
    <!-- (2) -->
    <amqp:publish config-ref="AMQP_Config" exchangeName="targetExchange">
        <amqp:message> <!-- (1) -->
            <amqp:body>#[output application/json ---
            {
              order_id: payload.id,
              supplier: payload.warehouse
            }]</amqp:body>
            <amqp:properties priority="9"/> <!-- (3) -->
        </amqp:message>
    </amqp:publish>
</flow>

Differences to be noted:

1 There’s no need for the transform component, since the body of the Message is created inline, so the payload remains unmodified.
2 The object-to-string transformer was also removed, since the Connector automatically handles the transformation output.
3 Priority is set as a property of the AMQP Message in the operation and does not rely on the user knowing the exact key.

In summary, publishing a Message in 3.x with the AMQP transport relied on the AmqpMessage payload and on outbound properties to configure the creation of the AMQP Message, which required deeper knowledge of how the transport worked. In 4.x, the AMQP Connector exposes every configurable element as a parameter in the scope where it belongs, exposing all the AMQP functionality in a clearer way.
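
As a hedged illustration of this parameter-based model, the sketch below adds a routing key and a correlation ID to a publish operation. The flow name and the routing key value are hypothetical, and the correlationId attribute on the properties element is an assumption to verify against the connector version in use.

```xml
<flow name="AMQPConnectorPublishWithRoutingKey">
    <http:listener config-ref="HTTP_Listener_config" path="/orders"/>
    <amqp:publish config-ref="AMQP_Config" exchangeName="targetExchange">
        <!-- Routing key the exchange uses to route the message (hypothetical value) -->
        <amqp:routing-keys>
            <amqp:routing-key value="orders.new" />
        </amqp:routing-keys>
        <amqp:message>
            <amqp:body>#[output application/json --- { order_id: payload.id }]</amqp:body>
            <!-- correlationId is assumed to be settable next to priority;
                 the expression reuses the correlation ID of the current Mule event -->
            <amqp:properties priority="9" correlationId="#[correlationId]" />
        </amqp:message>
    </amqp:publish>
</flow>
```

Everything that shapes the outgoing Message is declared on the operation itself, so no outbound properties or intermediate transformers are involved.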

Listening For New Messages

The AMQP transport inbound-endpoint allows you to wait for new AMQP Messages on a given queue. The output of this listener will contain the body of the message in the payload, and all the AMQP headers and properties encompassed in the AMQP Attributes.

Mule 3 Example: Listening For Messages
<flow name="AMQPTransportInbound">
  <amqp:inbound-endpoint connector-ref="AMQP_Connector" queueName="in" />
  <dw:transform-message> <!-- (1) -->
      <dw:set-payload><![CDATA[%dw 1.0
        %output application/json
        ---
        {
        items: payload,
        costumer: message.inboundProperties.'costumer_id'
        }]]></dw:set-payload>
  </dw:transform-message>
  <object-to-string-transformer/> <!-- (2) -->
  <amqp:outbound-endpoint exchangeName="v2/prime/orders" connector-ref="AMQP_Connector"/> <!-- (3) -->
</flow>

In this case, we are listening for Messages and then adapting them to the new format required:

1 Transform the MuleMessage using the metadata contained in the inboundProperties so the payload matches the new JSON format we need for the new API.
2 Convert the transformed payload to a JSON String.
3 Publish the payload to the defined exchange.

Implementing the same in Mule 4 looks like this:

Mule 4 Example: Listening For Messages
<flow name="AMQPConnectorPublish">
  <amqp:listener config-ref="AMQP_Config" queueName="in" />
  <amqp:publish config-ref="AMQP_Config" exchangeName="ordersExchange">
    <amqp:message>
      <!-- (1) -->
      <amqp:body>#[output application/json ---
      {
        items: payload,
        costumer: attributes.properties.userProperties.costumer_id, // (2)
        type: attributes.headers.type
      }]</amqp:body>
    </amqp:message>
  </amqp:publish>
</flow>

Now, the flow has fewer components and does not need to modify the Message payload to publish it in a different format:

1 Definition of the new message is done inline, so it only creates the JSON for the new Message body.
2 We use the message 'attributes' POJO instead of the 'inboundProperties', which now differentiates the 'headers' of the AMQP Message from the 'properties'.
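
To make the attributes structure more concrete, the following sketch logs one AMQP property of each incoming message before any further processing. It reuses the attributes.properties.correlationId expression that appears elsewhere in this guide; the flow name is a placeholder, and the fallback text 'none' is only there to keep the string concatenation safe when the property is absent.

```xml
<flow name="AMQPConnectorLogAttributes">
    <amqp:listener config-ref="AMQP_Config" queueName="in" />
    <!-- payload holds the message body; attributes holds the AMQP metadata -->
    <logger level="INFO"
            message="#['Received message with correlationId: ' ++ (attributes.properties.correlationId default 'none')]"/>
</flow>
```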

Consuming Messages

Consuming Messages mid-flow from a given destination was not supported by the Mule 3 AMQP transport. The workaround was to add the 'Mule Requester Module' to your application, which would then handle the mid-flow message consumption.

So, for example, if you wanted to expose your AMQP Queue, your application would be similar to this:

Mule 3 Example: Consuming Messages Mid-Flow
<flow name="ordersFromAMQP">
  <http:inbound-endpoint exchange-pattern="request-response" path="orders" host="localhost" port="8081"/>
  <scripting:transformer doc:name="AMQP Message Listening">
    <scripting:script engine="Groovy"><![CDATA[
org.mule.api.MuleMessage message = new org.mule.module.client.MuleClient(muleContext).request('amqp://recordsyntactic_exchange/amqp-queue?connector=AMQP_0_9_Connector&exchangeType=direct&queueDurable=true&exchangeDurable=true&queueAutoDelete=true', 10000L);
]]></scripting:script>
  </scripting:transformer>
</flow>

Some things to notice here are:

  • All metadata regarding AMQP Message is completely lost, so logging the CorrelationId relies on you knowing the syntax to obtain the header.

  • The request URI must include both the AMQP connector reference and the configuration for the queue.

Mule 4 supports consuming messages mid-flow out of the box through the 'consume' operation. This operation is very similar to the listener we saw before, with the difference that it can be used anywhere in the flow:

Mule 4 Example: Consuming Messages Mid-Flow
<flow name="ordersFromAMQP">
  <http:listener config-ref="HTTP_Listener_config" path="/orders"/>
  <amqp:consume config-ref="AMQP_Config" queueName="Orders" />
  <logger level="INFO" message="#['CorrelationId: ' ++ attributes.properties.correlationId]"/>
</flow>
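
If the flow still needs the original HTTP payload after the consume, one option is the target parameter that Mule 4 operations generally expose. The sketch below assumes that amqp:consume supports target and targetValue like other SDK-based operations; the flow name and the variable name order are hypothetical.

```xml
<flow name="ordersFromAMQPKeepingPayload">
    <http:listener config-ref="HTTP_Listener_config" path="/orders"/>
    <!-- Store the whole consumed message in vars.order; the HTTP payload stays untouched -->
    <amqp:consume config-ref="AMQP_Config" queueName="Orders"
                  target="order" targetValue="#[message]" />
    <logger level="INFO"
            message="#['CorrelationId: ' ++ (vars.order.attributes.properties.correlationId default 'none')]"/>
    <!-- Replace the payload with the AMQP message body only once it is needed -->
    <set-payload value="#[vars.order.payload]"/>
</flow>
```

Setting targetValue to the whole message keeps both the body and the AMQP attributes available in the variable, rather than only the body.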

Doing Request-Reply

AMQP allows you to use the reply_to property to implement the RPC pattern. This can be done either with a temporary exclusive reply queue that is created on the fly by the client, or with an already existing queue.

Request Reply With Temporary Auto-delete Private Reply Queue

In Mule 3, for the first case where the reply queue is a temporary exclusive queue that will be discarded once the message arrives, we have the "request-response" exchange-pattern in the outbound endpoint:

Mule 3 Example: Doing Request-Reply With Temporary Reply Destination
<flow name="amqpRequestReplyTemporaryDestination">
  <http:inbound-endpoint exchange-pattern="request-response" host="localhost" port="8080" path="invoices"/>
  <dw:transform-message>
      <dw:set-payload><![CDATA[%dw 1.0
        %output application/xml
        ---
        {
        data: payload,
        costumer: message.inboundProperties."http.query.params".costumer_id
        }]]></dw:set-payload>
  </dw:transform-message>
  <object-to-string-transformer/>
  <amqp:outbound-endpoint exchange-pattern="request-response" queueName="invoiceProcessor" connector-ref="AMQP_Connector"/>
  <logger level="INFO" message="Status: #[payload]"/>
</flow>

Instead, in Mule 4 you have a brand new operation called publish-consume which aims to solve this specific use case:

Mule 4 Example: Doing Request-Reply With Temporary Reply Destination
<flow name="amqpRequestReplyTemporaryDestination">
  <http:listener config-ref="HTTP_Listener_config" path="/invoices"/>
  <amqp:publish-consume config-ref="AMQP_Config" exchangeName="invoiceProcessor">
    <amqp:message>
      <amqp:body>#[output application/xml ---
      {
        data: payload,
        costumer: attributes.queryParams.costumer_id
      }]</amqp:body>
    </amqp:message>
  </amqp:publish-consume>
  <logger level="INFO" message="#['Status: ' ++ payload]"/>
</flow>

As before, the Message is built inline in the operation, in the message element, and any transformation or configuration that affects the outgoing Message is done as part of that element.

Request Reply With Explicit Reply To Queue

Doing a request-reply with an explicit reply-to queue can be done in Mule 4 through the replyTo property:

Mule 4 Example: Doing Request-Reply With Explicit Reply To
<flow name="amqpRequestReplyExplicitDestination">
  <http:listener config-ref="HTTP_Listener_config" path="/invoices"/>
  <amqp:publish-consume config-ref="AMQP_Config" exchangeName="targetExchange">
    <amqp:message>
      <amqp:body>#[output application/xml ---
      {
        data: payload,
        costumer: attributes.queryParams.costumer_id
      }]</amqp:body>
      <amqp:properties replyTo="replyToQueue" />
    </amqp:message>
  </amqp:publish-consume>
  <logger level="INFO" message="#['Status: ' ++ payload]"/>
</flow>

Using Transactions

Transaction support is configured in much the same way in 3.x and 4.x. The expected change is that the configuration moves from the inbound-endpoint and outbound-endpoint to the normalized Mule 4 approach for operations and sources:

Mule 3 Example: Using Transactions
<flow name="transactedAmqpFlow">
    <amqp:inbound-endpoint queueName="${in}">
        <amqp:transaction action="ALWAYS_BEGIN" /> <!-- (1) -->
    </amqp:inbound-endpoint>
    <set-variable variableName="originalPayload" value="#[payload]"/> <!-- (2) -->
    <dw:transform-message> <!-- (3) -->
        <dw:set-payload><![CDATA[%dw 1.0
          %output application/xml
          ---
          payload
          ]]></dw:set-payload>
    </dw:transform-message>
    <object-to-string-transformer/>
    <amqp:outbound-endpoint exchangeName="${out}"> <!-- (4) -->
        <amqp:transaction action="ALWAYS_JOIN"/>
    </amqp:outbound-endpoint>
    <default-exception-strategy>
        <commit-transaction exception-pattern="*"/> <!-- (5) -->
        <set-payload value="#[flowVars.originalPayload]"/> <!-- (6) -->
        <amqp:outbound-endpoint queueName="dead.letter"> <!-- (7) -->
            <amqp:transaction action="JOIN_IF_POSSIBLE"/>
        </amqp:outbound-endpoint>
    </default-exception-strategy>
</flow>

Things to note are:

1 Transaction is initiated by the inbound endpoint with ALWAYS_BEGIN
2 We make sure not to lose the original payload
3 Payload is transformed so it can be sent through the outbound endpoint
4 The outbound endpoint is configured to ALWAYS_JOIN
5 We set up the exception strategy to catch all exceptions
6 Original payload is restored so the original message is published to the dead.letter
7 Finally we send the original message to the dead.letter attempting to join to the current transaction.

The same scenario can be implemented in Mule 4 with the following approach:

Mule 4 Example: Using Transactions
<flow name="transactedAmqpFlow">
    <amqp:listener config-ref="AMQP_Config" queueName="${in}" transactionalAction="ALWAYS_BEGIN"/> <!-- (1) -->
    <amqp:publish config-ref="AMQP_Config" exchangeName="${out}" transactionalAction="ALWAYS_JOIN"> <!-- (2) -->
        <amqp:message>
            <amqp:body>#[output application/xml --- payload]</amqp:body>
        </amqp:message>
    </amqp:publish>
    <error-handler>
        <on-error-continue type="ANY"> <!-- (3) -->
            <amqp:publish config-ref="AMQP_Config" exchangeName="dead.letter" transactionalAction="JOIN_IF_POSSIBLE"> <!-- (4) -->
                <amqp:routing-keys>
                    <amqp:routing-key value="dead.letter" />
                </amqp:routing-keys>
            </amqp:publish>
        </on-error-continue>
    </error-handler>
</flow>

1 Transaction is initiated by the listener with ALWAYS_BEGIN
2 Publishing of the payload in XML format is done by the publish operation without modifying the current payload, also joining the transaction with ALWAYS_JOIN
3 An error handler that catches any error that occurs is used to make sure the Message is not lost
4 Since the current payload is still the original Message received, we just publish it to the dead.letter using the JOIN_IF_POSSIBLE transactional action

Enforcing the Existence of Queue and Exchanges

In Mule 3, if you set activeDeclarationsOnly to false and the declared queue or exchange does not exist, the AMQP connector throws a ShutdownSignalException involving a 404 error. You can enforce the existence of a queue or exchange in Mule 4 using the createFallbackQueue and createFallbackExchange parameters.

<amqp:config name="Amqp_Config" createFallbackQueue="false" createFallbackExchange="false">
    <amqp:connection host="localhost" username="guest" password="guest" />
</amqp:config>