AMQP Connector Examples

There are many ways to use the AMQP connector and endpoints; the following examples demonstrate common use cases.

Connection Fallback

You can define a list of fallback brokers, each written as <host>:<port> or <host>, that the connector tries when it cannot connect to the main broker. If only the host is specified, the default port is used.

<amqp:connector name="amqpConnectorWithFallback"
  host="rabbit1"
  port="9876"
  fallbackAddresses="rabbit1:9875,rabbit2:5672,rabbit3"
  virtualHost="mule-test"
  username="my-user"
  password="my-pwd" />

The fallbackAddresses attribute is a comma-separated list of brokers, each specified as "host:port" or "host". The list defines the fallback brokers that the connector attempts to connect to if the connection to the main broker (master broker) fails. The fallback addresses are used together with the main address each time a connection is attempted: if the first attempt (to the main address) fails, the next address is tried, and so on. This means that the connection only changes if there’s a failure and the reconnection strategy is triggered.

Connection attempts follow the order in which you define the addresses: the main address is always tried first, followed by the fallback addresses.

Reconnection Strategy

The fallback reconnection strategy makes connection attempts in the order of the brokers that you define in the fallbackAddresses attribute. The main address, which is always tried first, is also called the master broker. The order in which you specify the fallback brokers defines the order in which they are tried if the master broker fails. The connector connects to a non-master broker only if the master broker fails. If the master broker fails, control passes to the next broker in the list. If that broker fails and the master broker is back online, control passes back to the master broker. If the master broker is still offline, control passes to the next broker in the list that is online.

If fallback is not set and the master broker is not responding, the connection fails. If fallback is set and the master broker is not responding, fallback occurs, and control moves to the next broker in the list.
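
To illustrate how fallback and reconnection could be combined, here is a minimal sketch. It assumes that the AMQP connector accepts Mule's standard reconnect child element, as other Mule 3 connectors do. The connector name and the frequency and count values are hypothetical and chosen only for illustration.

```xml
<amqp:connector name="amqpConnectorWithFallbackAndReconnect"
  host="rabbit1"
  port="5672"
  fallbackAddresses="rabbit2:5672,rabbit3"
  virtualHost="mule-test"
  username="my-user"
  password="my-pwd">
  <!-- Assumption: standard Mule reconnection strategy.
       Retries every 5000 ms, up to 10 times. -->
  <reconnect frequency="5000" count="10" />
</amqp:connector>
```

With a configuration along these lines, each reconnection attempt would start at rabbit1 and move on to rabbit2 and then rabbit3 (on the default port) only if the preceding address cannot be reached.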

Listening to Messages with Exchange Redeclaration and Queue Creation

This is a typical AMQP pattern where consumers redeclare the exchanges they intend to bind queues to.

<amqp:connector name="amqpAutoAckLocalhostConnector"
  virtualHost="my-vhost"
  username="my-user"
  password="my-pwd"
  activeDeclarationsOnly="true" />

<amqp:inbound-endpoint exchangeName="my-exchange"
  exchangeType="fanout"
  exchangeAutoDelete="false"
  exchangeDurable="true"
  queueName="my-queue"
  queueDurable="false"
  queueExclusive="false"
  queueAutoDelete="true"
  connector-ref="amqpAutoAckLocalhostConnector" />

Listening to Messages with Exchange Redeclaration and Private Queue Creation

In this variation of the previous example, Mule creates an exclusive, server-named, auto-delete, non-durable queue and binds it to the redeclared exchange.

<amqp:connector name="amqpAutoAckLocalhostConnector"
  virtualHost="my-vhost"
  username="my-user"
  password="my-pwd"
  activeDeclarationsOnly="true" />

<amqp:inbound-endpoint exchangeName="my-exchange"
  exchangeType="fanout"
  exchangeAutoDelete="false"
  exchangeDurable="true"
  connector-ref="amqpAutoAckLocalhostConnector" />

Listening to Messages on a Pre-Existing Exchange

In this mode, the inbound connection fails if the exchange doesn’t pre-exist.

This behavior is enforced by activeDeclarationsOnly=false, which means that Mule strictly ensures the pre-existence of the exchange before trying to subscribe to it.

<amqp:connector name="amqpAutoAckStrictLocalhostConnector"
  virtualHost="my-vhost"
  username="my-user"
  password="my-pwd"
  activeDeclarationsOnly="false" />

<amqp:inbound-endpoint exchangeName="my-exchange"
  queueName="my-queue"
  queueDurable="false"
  queueExclusive="false"
  queueAutoDelete="true"
  connector-ref="amqpAutoAckStrictLocalhostConnector" />

Listening to Messages on a Pre-existing Queue

Similarly to the previous example, the inbound connection fails if the queue doesn’t pre-exist.

<amqp:connector name="amqpAutoAckStrictLocalhostConnector"
  virtualHost="my-vhost"
  username="my-user"
  password="my-pwd"
  activeDeclarationsOnly="false" />

<amqp:inbound-endpoint queueName="my-queue"
  connector-ref="amqpAutoAckStrictLocalhostConnector" />

Listening to Messages on a Declared But Unbound Queue

In this case, the queue is declared but not bound to any custom exchange, except the default exchange to which all declared queues are bound by default.

To trigger queue creation, it’s necessary to configure any of the queueDurable, queueAutoDelete or queueExclusive attributes.

<amqp:inbound-endpoint queueName="my-queue"
  queueDurable="true" />

Manual Message Acknowledgment and Rejection

So far, all incoming messages were automatically acknowledged by the AMQP client.

The following example shows how to manually acknowledge or reject messages within a flow, based on criteria of your choice.

<amqp:connector name="amqpManualAckLocalhostConnector"
  virtualHost="my-vhost"
  username="my-user"
  password="my-pwd"
  ackMode="MANUAL" />

<flow name="amqpChoiceAckNackService">
  <amqp:inbound-endpoint queueName="my-queue"
     connector-ref="amqpManualAckLocalhostConnector" />
  <choice>
    <when ...condition...>
      <amqp:acknowledge-message />
    </when>
    <otherwise>
      <amqp:reject-message requeue="true" />
    </otherwise>
  </choice>
</flow>

Manual Channel Recovery

To manually recover the channel that is associated with the current message, use:

<amqp:reject-message />

If you want the messages to be re-queued, use:

<amqp:reject-message requeue="true" />

Flow Control

Expanding on the previous example, it is possible to throttle the delivery of messages by configuring the connector accordingly.

The following demonstrates a connector that fetches messages one by one and a flow that uses manual acknowledgment to throttle message delivery.

<amqp:connector name="amqpThrottledConnector"
  virtualHost="my-vhost"
  username="my-user"
  password="my-pwd"
  prefetchCount="1"
  ackMode="MANUAL" />

<flow name="amqpManualAckService">
  <amqp:inbound-endpoint queueName="my-queue"
  connector-ref="amqpThrottledConnector" />
  <!--
  components, routers... go here
  -->
  <amqp:acknowledge-message />
</flow>

Publishing Messages to a Redeclared Exchange

This is a typical AMQP pattern where producers redeclare the exchanges they intend to publish to.

<amqp:connector name="amqpLocalhostConnector"
  virtualHost="my-vhost"
  username="my-user"
  password="my-pwd"
  activeDeclarationsOnly="true" />

<amqp:outbound-endpoint routingKey="my-key"
  exchangeName="my-exchange"
  exchangeType="fanout"
  exchangeAutoDelete="false"
  exchangeDurable="false"
  connector-ref="amqpLocalhostConnector" />

Publishing Messages to a Pre-Existing Exchange

It is also possible to publish to a pre-existing exchange:

<amqp:outbound-endpoint exchangeName="my-exchange"
  connector-ref="amqpLocalhostConnector" />

It may be desirable to strictly enforce the existence of this exchange before publishing to it. This is done by configuring the connector to perform passive declarations:

<amqp:connector name="amqpStrictLocalhostConnector"
  virtualHost="my-vhost"
  username="my-user"
  password="my-pwd"
  activeDeclarationsOnly="false" />

<amqp:outbound-endpoint routingKey="my-key"
  exchangeName="my-exchange"
  connector-ref="amqpStrictLocalhostConnector" />

Declaring and Binding an Outbound Queue

It’s also possible to declare the queue in outbound endpoints, as shown below:

<amqp:outbound-endpoint exchangeName="amqpOutBoundQueue-exchange"
  exchangeType="fanout"
  queueName="amqpOutBoundQueue-queue"
  queueDurable="true" />

Note that the queue is declared and bound in a lazy fashion, that is, only when the outbound endpoint is used.

Message-Level Override of Exchange and Routing Key

It’s possible to override some outbound endpoint attributes with outbound-scoped message properties:

  • routing-key overrides the routingKey attribute,

  • exchange overrides the exchangeName attribute.
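
As a hedged illustration of the overrides listed above, the following sketch sets both properties with Mule's set-property message processor, which writes outbound-scoped properties, before the message reaches the outbound endpoint. The flow name and the override values (override-key, other-exchange) are hypothetical; the connector is the amqpLocalhostConnector declared in an earlier example.

```xml
<flow name="amqpOverrideRoutingFlow">
  <!--
  inbound endpoint, components, routers ...
  -->

  <!-- Outbound-scoped properties that take precedence over the endpoint attributes -->
  <set-property propertyName="routing-key" value="override-key" />
  <set-property propertyName="exchange" value="other-exchange" />

  <!-- routingKey and exchangeName act as defaults when no override is set -->
  <amqp:outbound-endpoint routingKey="my-key"
    exchangeName="my-exchange"
    connector-ref="amqpLocalhostConnector" />
</flow>
```

The value attributes could also hold expressions, which makes it possible to choose the routing key or exchange per message.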

Mandatory and Immediate Deliveries and Returned Message Handling

The connector supports the mandatory and immediate publication flags, as described below.

If a message sent with this connector can’t be delivered, the AMQP broker returns it asynchronously.

The AMQP transport offers the possibility of dispatching these returned messages to user-defined endpoints for custom processing.

You can define the endpoint in charge of handling returned messages at the connector level. Here is an example that targets a VM endpoint:

<vm:endpoint name="globalReturnedMessageChannel" path="global.returnedMessages" />

<amqp:connector name="mandatoryAmqpConnector"
  virtualHost="my-vhost"
  username="my-user"
  password="my-pwd"
  mandatory="true"
  default-return-endpoint-ref="globalReturnedMessageChannel" />

It’s also possible to define the returned message endpoint at flow level:

<vm:endpoint name="flowReturnedMessageChannel" path="flow.returnedMessages" />

<flow name="amqpMandatoryDeliveryFailureFlowHandler">
  <!--
  inbound endpoint, components, routers ...
  -->

  <amqp:return-handler>
    <vm:outbound-endpoint ref="flowReturnedMessageChannel" />
  </amqp:return-handler>

  <amqp:outbound-endpoint routingKey="my-key"
    exchangeName="my-exchange"
    connector-ref="mandatoryAmqpConnector" />
</flow>

If both are configured, the handler defined in the flow supersedes the one defined in the connector. If neither is configured, Mule logs a warning with the full details of the returned message.
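
The custom processing itself happens in whatever flow listens on the return endpoint. The following is a minimal sketch of such a flow, assuming the flowReturnedMessageChannel VM endpoint declared above. The flow name and the logging step are hypothetical placeholders for real handling logic.

```xml
<flow name="amqpReturnedMessageLoggingFlow">
  <!-- Receives the messages that the broker returned as undeliverable -->
  <vm:inbound-endpoint ref="flowReturnedMessageChannel" />

  <!-- Placeholder processing: log the returned payload as text -->
  <logger level="WARN"
    message="AMQP returned message: #[message.payloadAs(java.lang.String)]" />
</flow>
```

In practice, the logger could be replaced with steps that retry the publication, store the message, or raise an alert.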

Request-Response Publication

It’s possible to perform synchronous (request-response) outbound operations:

<amqp:outbound-endpoint routingKey="my-key"
  exchange-pattern="request-response"
  exchangeName="my-exchange"
  connector-ref="amqpLocalhostConnector" />

In this case, Mule:

  • Creates a temporary auto-delete private reply queue

  • Sets it as the reply-to property of the current message

  • Publishes the message to the specified exchange

  • Waits for a response to be sent to the reply-queue (via the default exchange)
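
The time Mule waits for the reply can presumably be bounded. The sketch below assumes that the standard Mule endpoint attribute responseTimeout, expressed in milliseconds, also applies to AMQP outbound endpoints; the 10-second value is only illustrative.

```xml
<!-- Assumption: responseTimeout limits the wait for a message on the reply queue -->
<amqp:outbound-endpoint routingKey="my-key"
  exchange-pattern="request-response"
  responseTimeout="10000"
  exchangeName="my-exchange"
  connector-ref="amqpLocalhostConnector" />
```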

Transaction Support

AMQP local transactions are supported by using the standard Mule transaction configuration element. For example, the following code declares an AMQP inbound endpoint that starts a new transaction for each newly-received message:

<amqp:inbound-endpoint queueName="amqpTransactedBridge-queue"
        connector-ref="amqpConnector">
    <amqp:transaction action="ALWAYS_BEGIN" />
</amqp:inbound-endpoint>

The following declares a transacted AMQP bridge:

<bridge name="amqpTransactedBridge" exchange-pattern="one-way" transacted="true">
    <amqp:inbound-endpoint queueName="amqpTransactedBridge-queue"
           connector-ref="amqpConnector">
        <amqp:transaction action="ALWAYS_BEGIN" />
    </amqp:inbound-endpoint>
    <amqp:outbound-endpoint exchangeName="amqpOneWayBridgeTarget-exchange"
           connector-ref="amqpConnector">
        <amqp:transaction action="ALWAYS_JOIN" />
    </amqp:outbound-endpoint>
</bridge>

If an error occurs while processing the message after the inbound endpoint, the transaction automatically rolls back. Otherwise, the transaction commits after successful dispatch in the outbound endpoint.

By default, no channel recovery is performed upon rollback. To modify this behavior, configure a recoverStrategy attribute on the transaction element, as shown below.

<amqp:transaction action="ALWAYS_BEGIN" recoverStrategy="REQUEUE" />

Valid values for the recoverStrategy option are: NONE, NO_REQUEUE and REQUEUE.

Transactions in AMQP do not behave like JMS transactions. It is strongly recommended that you read an overview of transaction support in AMQP 0-9-1 before using transactions. It is important to understand that when a transaction starts on a Mule-managed channel (for example, via <amqp:transaction action="ALWAYS_BEGIN"/>), this channel remains transactional throughout its lifetime.

Exchange and Queue Declaration Arguments

AMQP supports custom arguments during the declaration of exchanges and queues. The AMQP connector supports these custom arguments, which you must pass as endpoint properties with names prefixed with amqp-exchange. or amqp-queue., for exchange or queue arguments respectively.

The following example declares a global endpoint that uses the alternate-exchange argument during the exchange declaration and the x-dead-letter-exchange argument during the queue declaration:

<amqp:endpoint name="amqpEndpointWithArguments" exchangeName="target-exchange"
    exchangeType="fanout" exchangeDurable="true" exchangeAutoDelete="false"
    queueName="target-queue" queueDurable="true" queueAutoDelete="false"
    queueExclusive="true" routingKey="a.b.c">
    <properties>
        <spring:entry key="amqp-exchange.alternate-exchange"
            value="some-exchange" />
        <spring:entry key="amqp-queue.x-dead-letter-exchange"
            value="some-queue" />
    </properties>
</amqp:endpoint>

Programmatic Message Requesting

It is possible to programmatically get messages from an AMQP queue.

To achieve this, you first need to build a URI that identifies the AMQP queue that you want to consume from. Here is the syntax to use, with optional parameters in square brackets:

amqp://[${exchangeName}/]amqp-queue.${queueName}[?connector=${connectorName}[&...other parameters...]]

For example, the following identifies a pre-existing queue named "my-queue", to be consumed with a unique AMQP connector available in the Mule configuration:

amqp://amqp-queue.my-queue

The following example creates a non-durable, auto-delete, non-exclusive queue named "new-queue" and binds it to a pre-existing exchange named "my-exchange" on the specified connector:

amqp://my-exchange/amqp-queue.new-queue?connector=amqpAutoAckLocalhostConnector&queueDurable=false&queueExclusive=false&queueAutoDelete=true

Once the URI is defined, it is possible to retrieve a message from the queue using the Mule Client, as shown in the following code sample.

MuleMessage message = new MuleClient(muleContext).request("amqp://amqp-queue.my-queue", 2500L);

The call above waits up to 2.5 seconds for a message and returns null if no message arrives in the queue within that time.

SSL Connectivity

The transport can connect to the broker using SSLv3 or TLS. To do so, use the AMQPS connector with the XML namespace declaration listed below.

xmlns:amqps="http://www.mulesoft.org/schema/mule/amqps"
http://www.mulesoft.org/schema/mule/amqps
http://www.mulesoft.org/schema/mule/amqps/current/mule-amqps.xsd

Connect using SSLv3 (default) and use a trust manager that accepts all certificates as valid:

<amqps:connector name="amqpsDefaultSslConnector" />

Connect using TLS and use a trust manager that accepts all certificates as valid:

<amqps:connector name="amqpsTlsConnector" sslProtocol="TLS" />

Connect using SSLv3 (default) and use a custom trust manager:

<amqps:connector name="amqpsTrustManagerConnector" sslTrustManager-ref="myTrustManager" />

Connect using TLS and use a custom trust manager:

<amqps:connector name="amqpsTlsTrustManagerConnector" sslProtocol="TLS" sslTrustManager-ref="myTrustManager" />

Connect with key and trust stores:

<amqps:connector name="amqpsTlsKeyStores">
  <amqps:ssl-key-store path="keycert.p12" type="PKCS12" algorithm="SunX509" keyPassword="MySecretPassword" storePassword="MySecretPassword" />
  <amqps:ssl-trust-store path="trustStore.jks" type="JKS" algorithm="SunX509" storePassword="rabbitstore" />
</amqps:connector>

See Also

  • Access the AMQP Connector Reference for a complete list and description of all AMQP connector configuration attributes.
