
AMQP Connector Examples

There are many ways to use the AMQP connector and endpoints; the following examples demonstrate some common use cases.

Connection Fallback

You can define a list of fallback addresses, each written as <host>:<port> or <host>, that the connector tries when the connection to the main broker fails. If only the host is specified, the default port is used.


         
      
<amqp:connector name="amqpConnectorWithFallback"
                host="rabbit1"
                port="9876"
                fallbackAddresses="rabbit1:9875,rabbit2:5672,rabbit3"
                virtualHost="mule-test"
                username="my-user"
                password="my-pwd" />

Listening to Messages with Exchange Re-declaration and Queue Creation

This is a typical AMQP pattern where consumers redeclare the exchanges they intend to bind queues to.


         
      
<amqp:connector name="amqpAutoAckLocalhostConnector"
                virtualHost="my-vhost"
                username="my-user"
                password="my-pwd"
                activeDeclarationsOnly="true" />
 
<amqp:inbound-endpoint exchangeName="my-exchange"
                       exchangeType="fanout"
                       exchangeAutoDelete="false"
                       exchangeDurable="true"
                       queueName="my-queue"
                       queueDurable="false"
                       queueExclusive="false"
                       queueAutoDelete="true"
                       connector-ref="amqpAutoAckLocalhostConnector" />

Listening to Messages with Exchange Re-declaration and Private Queue Creation

In this variation of the previous example, Mule creates an exclusive, server-named, auto-delete, non-durable queue and binds it to the re-declared exchange.


         
      
<amqp:connector name="amqpAutoAckLocalhostConnector"
                virtualHost="my-vhost"
                username="my-user"
                password="my-pwd"
                activeDeclarationsOnly="true" />
 
<amqp:inbound-endpoint exchangeName="my-exchange"
                       exchangeType="fanout"
                       exchangeAutoDelete="false"
                       exchangeDurable="true"
                       connector-ref="amqpAutoAckLocalhostConnector" />

Listening to Messages on a Pre-existing Exchange

In this mode, the inbound connection fails if the exchange does not already exist.

This behavior is enforced by activeDeclarationsOnly="false", which means that Mule strictly checks that the exchange exists before trying to subscribe to it.


         
      
<amqp:connector name="amqpAutoAckStrictLocalhostConnector"
                virtualHost="my-vhost"
                username="my-user"
                password="my-pwd"
                activeDeclarationsOnly="false" />
 
<amqp:inbound-endpoint exchangeName="my-exchange"
                       queueName="my-queue"
                       queueDurable="false"
                       queueExclusive="false"
                       queueAutoDelete="true"
                       connector-ref="amqpAutoAckStrictLocalhostConnector" />

Listening to Messages on a Pre-existing Queue

As in the previous example, the inbound connection fails if the queue does not already exist.


         
      
<amqp:connector name="amqpAutoAckStrictLocalhostConnector"
                virtualHost="my-vhost"
                username="my-user"
                password="my-pwd"
                activeDeclarationsOnly="false" />
 
<amqp:inbound-endpoint queueName="my-queue"
                       connector-ref="amqpAutoAckStrictLocalhostConnector" />

Listening to Messages on a Declared But Unbound Queue

In this case, the queue is declared but not bound to any custom exchange. It is bound only to the default exchange, to which every declared queue is bound automatically.

To trigger queue creation, configure at least one of the queueDurable, queueAutoDelete or queueExclusive attributes.


         
      
<amqp:inbound-endpoint queueName="my-queue"
                       queueDurable="true" />

Manual Message Acknowledgment and Rejection

In the examples so far, all incoming messages have been automatically acknowledged by the AMQP client.

The following example shows how to manually acknowledge or reject messages within a flow, based on criteria of your choice.


         
      
<amqp:connector name="amqpManualAckLocalhostConnector"
                virtualHost="my-vhost"
                username="my-user"
                password="my-pwd"
                ackMode="MANUAL" />
 
<flow name="amqpChoiceAckNackService">
  <amqp:inbound-endpoint queueName="my-queue"
                         connector-ref="amqpManualAckLocalhostConnector" />
  <choice>
    <when ...condition...>
      <amqp:acknowledge-message />
    </when>
    <otherwise>
      <amqp:reject-message requeue="true" />
    </otherwise>
  </choice>
</flow>
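
As a hedged illustration of what the condition placeholder in the flow above could look like, the sketch below acknowledges a message only when a hypothetical AMQP header named x-valid equals "true", and rejects it with requeue otherwise. The flow name, the header name and the MEL expression are assumptions made for this example, and it assumes AMQP headers are exposed to the flow as inbound message properties.

```xml
<flow name="amqpHeaderBasedAckService">
  <amqp:inbound-endpoint queueName="my-queue"
                         connector-ref="amqpManualAckLocalhostConnector" />
  <choice>
    <!-- hypothetical criterion: header "x-valid" read as an inbound property -->
    <when expression="#[message.inboundProperties['x-valid'] == 'true']">
      <amqp:acknowledge-message />
    </when>
    <otherwise>
      <!-- requeue="true" asks the broker to put the message back on the queue -->
      <amqp:reject-message requeue="true" />
    </otherwise>
  </choice>
</flow>
```

Keep in mind that a message which can never satisfy the condition is redelivered again and again when it is rejected with requeue="true".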

Manual Channel Recovery

To manually recover the channel that is associated with the current message, use:


         
      
<amqp:reject-message />

If you want the messages to be re-queued, use:


         
      
<amqp:reject-message requeue="true" />

Flow Control

Expanding on the previous example, it is possible to throttle the delivery of messages by configuring the connector accordingly.

The following demonstrates a connector that fetches messages one by one and a flow that uses manual acknowledgment to throttle message delivery.


         
      
<amqp:connector name="amqpThrottledConnector"
                virtualHost="my-vhost"
                username="my-user"
                password="my-pwd"
                prefetchCount="1"
                ackMode="MANUAL" />
 
<flow name="amqpManualAckService">
  <amqp:inbound-endpoint queueName="my-queue"
                         connector-ref="amqpThrottledConnector" />
  <!--
  components, routers... go here
  -->
  <amqp:acknowledge-message />
</flow>

Publishing Messages to a Redeclared Exchange

This is a typical AMQP pattern where producers redeclare the exchanges they intend to publish to.


         
      
<amqp:connector name="amqpLocalhostConnector"
                virtualHost="my-vhost"
                username="my-user"
                password="my-pwd"
                activeDeclarationsOnly="true" />
 
<amqp:outbound-endpoint routingKey="my-key"
                        exchangeName="my-exchange"
                        exchangeType="fanout"
                        exchangeAutoDelete="false"
                        exchangeDurable="false"
                        connector-ref="amqpLocalhostConnector" />

Publishing Messages to a Pre-existing Exchange

It is also possible to publish to a pre-existing exchange:


         
      
<amqp:outbound-endpoint exchangeName="my-exchange"
                        connector-ref="amqpLocalhostConnector" />

It may be desirable to strictly enforce the existence of this exchange before publishing to it. This is done by configuring the connector to perform passive declarations:


         
      
<amqp:connector name="amqpStrictLocalhostConnector"
                virtualHost="my-vhost"
                username="my-user"
                password="my-pwd"
                activeDeclarationsOnly="false" />
 
<amqp:outbound-endpoint routingKey="my-key"
                        exchangeName="my-exchange"
                        connector-ref="amqpStrictLocalhostConnector" />

Declaring and Binding an Outbound Queue

It’s also possible to declare the queue in outbound endpoints, as shown below:


         
      
<amqp:outbound-endpoint exchangeName="amqpOutBoundQueue-exchange"
                        exchangeType="fanout"
                        queueName="amqpOutBoundQueue-queue"
                        queueDurable="true" />

Note that the queue is declared and bound lazily, that is, only when the outbound endpoint is used.

Message-level Override of Exchange and Routing Key

It’s possible to override some outbound endpoint attributes with outbound-scoped message properties:

  • routing-key overrides the routingKey attribute,

  • exchange overrides the exchangeName attribute.
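
A minimal sketch of such an override, assuming the outbound-scoped property is set with Mule 3's set-property message processor. The flow name, the VM path and the value "orders.eu" are hypothetical; the connector is the one declared in the earlier publishing examples.

```xml
<flow name="amqpRoutingKeyOverrideFlow">
  <vm:inbound-endpoint path="amqp.override.in" />
  <!-- outbound-scoped property: overrides routingKey="my-key" on the endpoint below -->
  <set-property propertyName="routing-key" value="orders.eu" />
  <amqp:outbound-endpoint routingKey="my-key"
                          exchangeName="my-exchange"
                          connector-ref="amqpLocalhostConnector" />
</flow>
```

With this configuration the message would be published with the routing key taken from the property rather than from the endpoint attribute. Setting a property named exchange in the same way would override exchangeName.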

Mandatory and Immediate Deliveries and Returned Message Handling

The connector supports the mandatory and immediate publication flags, as described below.

If a message sent with this connector can’t be delivered, the AMQP broker will return it asynchronously.

The AMQP transport offers the possibility of dispatching these returned messages to user-defined endpoints for custom processing.

You can define the endpoint in charge of handling returned messages at the connector level. Here is an example that targets a VM endpoint:


         
      
<vm:endpoint name="flowReturnedMessageChannel" path="flow.returnedMessages" />
 
<flow name="amqpMandatoryDeliveryFailureFlowHandler">
  <!--
  inbound endpoint, components, routers ...
  -->
 
  <amqp:return-handler>      
    <defaultReturnListener-ref="flowReturnedMessageChannel" />
  </amqp:return-handler>
 
  <amqp:outbound-endpoint routingKey="my-key"
                          exchangeName="my-exchange"
                          connector-ref="mandatoryAmqpConnector" />
</flow>

It’s also possible to define the returned message endpoint at flow level:


         
      
<vm:endpoint name="flowReturnedMessageChannel" path="flow.returnedMessages" />
 
<flow name="amqpMandatoryDeliveryFailureFlowHandler">
  <!--
  inbound endpoint, components, routers ...
  -->
 
  <amqp:return-handler>
    <vm:outbound-endpoint ref="flowReturnedMessageChannel" />
  </amqp:return-handler>
 
  <amqp:outbound-endpoint routingKey="my-key"
                          exchangeName="my-exchange"
                          connector-ref="mandatoryAmqpConnector" />
</flow>

If both are configured, the handler defined in the flow supersedes the one defined in the connector. If neither is configured, Mule logs a warning with the full details of the returned message.
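
The VM endpoint used above only receives the returned messages; something still has to consume them. A minimal sketch, assuming a separate flow that simply logs each returned message (the flow name and the log text are hypothetical):

```xml
<flow name="returnedMessageLoggingFlow">
  <!-- consumes whatever the return handler dispatched to flow.returnedMessages -->
  <vm:inbound-endpoint ref="flowReturnedMessageChannel" />
  <logger level="WARN" message="AMQP broker returned a message: #[payload]" />
</flow>
```

In a real application this flow would more likely store the message, retry the publication or raise an alert.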

Request-response Publication

It’s possible to perform synchronous (request-response) outbound operations:


         
      
<amqp:outbound-endpoint routingKey="my-key"
                        exchange-pattern="request-response"
                        exchangeName="my-exchange"
                        connector-ref="amqpLocalhostConnector" />

In this case, Mule will:

  • Create a temporary auto-delete private reply queue

  • Set it as the reply-to property of the current message

  • Publish the message to the specified exchange

  • Wait for a response to be sent to the reply-queue (via the default exchange)

Transaction Support

AMQP local transactions are supported by using the standard Mule transaction configuration element. For example, the following code declares an AMQP inbound endpoint that starts a new transaction for each newly-received message:


         
      
<amqp:inbound-endpoint queueName="amqpTransactedBridge-queue"
                       connector-ref="amqpConnector">
    <amqp:transaction action="ALWAYS_BEGIN" />
</amqp:inbound-endpoint>

The following declares a transacted AMQP bridge:


         
      
<bridge name="amqpTransactedBridge" exchange-pattern="one-way" transacted="true">
    <amqp:inbound-endpoint queueName="amqpTransactedBridge-queue"
                           connector-ref="amqpConnector">
        <amqp:transaction action="ALWAYS_BEGIN" />
    </amqp:inbound-endpoint>
    <amqp:outbound-endpoint exchangeName="amqpOneWayBridgeTarget-exchange"
                            connector-ref="amqpConnector">
        <amqp:transaction action="ALWAYS_JOIN" />
    </amqp:outbound-endpoint>
</bridge>

If an error occurs while processing the message after the inbound endpoint, the transaction will automatically be rolled back. Otherwise the transaction will be committed after successful dispatch in the outbound endpoint.

By default, no channel recovery is performed upon rollback. To modify this behavior, configure a recoverStrategy attribute on the transaction element, as shown below.


         
      
<amqp:transaction action="ALWAYS_BEGIN" recoverStrategy="REQUEUE" />

Valid values for the recoverStrategy option are: NONE, NO_REQUEUE and REQUEUE.
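
For context, here is a sketch of the recoverStrategy attribute on a complete inbound endpoint. It only combines elements already shown in this section and reuses the queue and connector names from the transacted examples; the comment describes the assumed effect of REQUEUE.

```xml
<amqp:inbound-endpoint queueName="amqpTransactedBridge-queue"
                       connector-ref="amqpConnector">
    <!-- assumed effect: on rollback, recover the channel and requeue its messages -->
    <amqp:transaction action="ALWAYS_BEGIN" recoverStrategy="REQUEUE" />
</amqp:inbound-endpoint>
```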

Transactions in AMQP do not behave like JMS transactions. It is strongly recommended that you read this overview of transaction support in AMQP 0-9-1 before using transactions. It is important to understand that when a transaction starts on a Mule-managed channel (for example, via <amqp:transaction action="ALWAYS_BEGIN"/>), this channel remains transactional throughout its lifetime.

Exchange and Queue Declaration Arguments

AMQP supports custom arguments during the declaration of exchanges and queues. The AMQP connector supports these custom arguments, which you must pass as endpoint properties with names prefixed with amqp-exchange. or amqp-queue., for exchange or queue arguments respectively.

The following example declares a global endpoint that uses the alternate-exchange argument during the exchange declaration and the x-dead-letter-exchange argument during the queue declaration:


         
      
<amqp:endpoint name="amqpEndpointWithArguments" exchangeName="target-exchange"
    exchangeType="fanout" exchangeDurable="true" exchangeAutoDelete="false"
    queueName="target-queue" queueDurable="true" queueAutoDelete="false"
    queueExclusive="true" routingKey="a.b.c">
    <properties>
        <spring:entry key="amqp-exchange.alternate-exchange"
            value="some-exchange" />
        <spring:entry key="amqp-queue.x-dead-letter-exchange"
            value="some-queue" />
    </properties>
</amqp:endpoint>

Programmatic Message Requesting

It is possible to programmatically get messages from an AMQP queue.

To achieve this, you first need to build a URI that identifies the AMQP queue that you want to consume from. Here is the syntax to use, with optional parameters in square brackets:


         
      
amqp://[${exchangeName}/]amqp-queue.${queueName}[?connector=${connectorName}[&...other parameters...]]

For example, the following identifies a pre-existing queue named "my-queue", to be consumed with the only AMQP connector defined in the Mule configuration:


         
      
amqp://amqp-queue.my-queue

The following example creates a non-durable, auto-delete, non-exclusive queue named "new-queue" and binds it to a pre-existing exchange named "my-exchange", using the specified connector:


         
      
amqp://my-exchange/amqp-queue.new-queue?connector=amqpAutoAckLocalhostConnector&queueDurable=false&queueExclusive=false&queueAutoDelete=true
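
For comparison, the URI above appears to carry the same settings as the endpoint attributes used earlier in this section. Assuming the query parameters map one-to-one onto attributes of the same name, an equivalent inbound endpoint would look like this sketch:

```xml
<!-- assumed XML counterpart of the URI above; attribute names mirror its query parameters -->
<amqp:inbound-endpoint exchangeName="my-exchange"
                       queueName="new-queue"
                       queueDurable="false"
                       queueExclusive="false"
                       queueAutoDelete="true"
                       connector-ref="amqpAutoAckLocalhostConnector" />
```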

Once the URI is defined, it is possible to retrieve a message from the queue using the Mule Client, as shown in the following code sample.


         
      
MuleMessage message = new MuleClient(muleContext).request("amqp://amqp-queue.my-queue", 2500L);

The call above waits up to 2.5 seconds for a message and returns null if none arrives in the queue within that time.

SSL Connectivity

The transport can connect to the broker using SSLv3 or TLS. To do so, use the AMQPS connector with the XML namespace declaration listed below.


         
      
xmlns:amqps="http://www.mulesoft.org/schema/mule/amqps"
xsi:schemaLocation="http://www.mulesoft.org/schema/mule/amqps http://www.mulesoft.org/schema/mule/amqps/current/mule-amqps.xsd"

Connect using SSLv3 (default) and use a trust manager that accepts all certificates as valid:


         
      
<amqps:connector name="amqpsDefaultSslConnector" />

Connect using TLS and use a trust manager that accepts all certificates as valid:


         
      
<amqps:connector name="amqpsTlsConnector" sslProtocol="TLS" />

Connect using SSLv3 (default) and use a custom trust manager:


         
      
<amqps:connector name="amqpsTrustManagerConnector" sslTrustManager-ref="myTrustManager" />

Connect using TLS and use a custom trust manager:


         
      
<amqps:connector name="amqpsTlsTrustManagerConnector" sslProtocol="TLS" sslTrustManager-ref="myTrustManager" />

Connect with key and trust stores:


         
      
<amqps:connector name="amqpsTlsKeyStores">
    <amqps:ssl-key-store path="keycert.p12"
                         type="PKCS12"
                         algorithm="SunX509"
                         keyPassword="MySecretPassword"
                         storePassword="MySecretPassword" />
    <amqps:ssl-trust-store path="trustStore.jks"
                           type="JKS"
                           algorithm="SunX509"
                           storePassword="rabbitstore" />
</amqps:connector>

See Also

  • Access the AMQP Connector Reference for a complete list and description of all AMQP connector configuration attributes.