AMQP Connector

The Advanced Message Queuing Protocol (AMQP) is an open standard that defines a protocol for passing business messages between applications or organizations. It is an application-level, vendor-agnostic protocol designed for interoperability. The defining features of AMQP are message orientation, queuing, routing (including point-to-point and publish-and-subscribe), reliability, and security (Wikipedia). The protocol’s specification is available online at http://www.amqp.org/resources/download, and several broker implementations exist, such as the very popular VMware RabbitMQ and Apache Qpid.

AMQP is built around easy-to-grasp concepts:

  • Clients connect via channels to AMQP brokers to send or receive messages.

  • Clients can publish messages to exchanges.

  • Messages published to exchanges are routed to queues, where they accumulate for later consumption.

  • The message publisher does not know which queue will be the final destination of a message; the queue is determined by the type of exchange and by a piece of metadata known as the "routing key".

  • It is possible for a message to end up nowhere if no queue is bound to the targeted exchange or if no existing queue is matched by the routing rules.

  • There are four main types of exchanges: direct, fanout, topic, and headers. For more information, see Exchange types and the effect of bindings.

  • Clients interested in consuming messages must create queues and bind these queues to exchanges.

  • Queue and exchange declarations are idempotent operations, so in common practice they take place on each client startup.
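The routing rules described in the list above can be illustrated with a small in-memory model. This is only a sketch under stated assumptions: it does not talk to a broker and does not use the RabbitMQ Java Client, the class and method names (ExchangeRoutingSketch, Binding, route, topicMatches) are hypothetical, and the headers exchange type is left out. It assumes the usual topic wildcards, where `*` stands for exactly one word and `#` for zero or more words.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Toy in-memory model of AMQP routing; it never connects to a broker. */
final class ExchangeRoutingSketch {

    enum ExchangeType { DIRECT, FANOUT, TOPIC }

    /** A binding attaches a queue to an exchange under a binding key. */
    static final class Binding {
        final String queue;
        final String bindingKey;

        Binding(String queue, String bindingKey) {
            this.queue = queue;
            this.bindingKey = bindingKey;
        }
    }

    /** Returns the names of the queues that receive a message published with routingKey. */
    static List<String> route(ExchangeType type, List<Binding> bindings, String routingKey) {
        List<String> queues = new ArrayList<>();
        for (Binding b : bindings) {
            boolean matches;
            switch (type) {
                case FANOUT:
                    matches = true; // the routing key is ignored
                    break;
                case DIRECT:
                    matches = b.bindingKey.equals(routingKey);
                    break;
                default: // TOPIC
                    matches = topicMatches(b.bindingKey.split("\\."), 0,
                                           routingKey.split("\\."), 0);
            }
            if (matches && !queues.contains(b.queue)) {
                queues.add(b.queue);
            }
        }
        return queues; // an empty list means the message ends up nowhere
    }

    /** Topic matching: '*' stands for exactly one word, '#' for zero or more words. */
    static boolean topicMatches(String[] pattern, int p, String[] key, int k) {
        if (p == pattern.length) {
            return k == key.length;
        }
        if (pattern[p].equals("#")) {
            // Try letting '#' absorb 0, 1, 2, ... of the remaining words.
            for (int next = k; next <= key.length; next++) {
                if (topicMatches(pattern, p + 1, key, next)) {
                    return true;
                }
            }
            return false;
        }
        return k < key.length
                && (pattern[p].equals("*") || pattern[p].equals(key[k]))
                && topicMatches(pattern, p + 1, key, k + 1);
    }

    public static void main(String[] args) {
        List<Binding> bindings = Arrays.asList(
                new Binding("orders", "order.created"),
                new Binding("audit", "order.#"));
        System.out.println(route(ExchangeType.DIRECT, bindings, "order.created"));
        System.out.println(route(ExchangeType.TOPIC, bindings, "order.cancelled.eu"));
        System.out.println(route(ExchangeType.DIRECT, bindings, "invoice.paid"));
    }
}
```

The publisher only supplies an exchange type and a routing key; which queues receive the message depends entirely on the bindings, which is why the last call in main can legitimately return no queue at all.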

AMQP for the JMS-savvy

If you’re a Java developer, chances are you have been exposed to JMS and are wondering how AMQP differs from JMS.

The main differences are:

  • AMQP defines a wire-level protocol as well as a messaging model, which ensures compatibility between implementations (JMS defines only an API)

  • In JMS you publish directly to destinations (queues or topics); in AMQP you publish to exchanges to which queues are bound (or not); this decouples the producer from the final destination of its messages

  • For some types of exchanges, delivery to the final destination depends on a routing key, a simple string that provides the necessary metadata for successfully routing the message (unlike in JMS where all that’s needed is the name of the destination)

Supported AMQP Versions

This transport is based on the RabbitMQ Java Client, which is compatible with brokers supporting AMQP version 0.9.1.

Features

Among others, the AMQP connector includes the following features:

  • Inbound message reception via subscription to existing or declared exchanges and queues

  • Outbound message publication to existing or declared exchanges

  • Outbound request-response pattern supported via temporary reply queues

  • Support for inbound and outbound transactions, with optional channel self-recovery

  • Synchronous message requesting, with timeout

  • Passive or active-only exchange and queue declarations

  • Connection fallback across a list of AMQP hosts

  • All AMQP message properties, including custom headers

  • Reply to (publishing replies to the default exchange)

  • Automatic, Mule-driven and manual message acknowledgment

  • Manual message rejection

  • Manual channel recovery

  • Default exchange semantics in outbound endpoints

  • Mandatory and immediate publishing parameters and handling of returned (undelivered) messages

  • Prefetch size and count "quality of service" settings

  • noLocal and exclusive consumers

  • Custom exchange and queue arguments

  • SSL connectivity

Core Transport Principles

The Mule AMQP Transport is an abstraction built on top of the previously introduced AMQP constructs: connections, channels, exchanges, queues, and messages.

The transport hides low-level concerns, such as dealing with channels, but gives you a great deal of control over all the constructs it encapsulates, letting you use the richness of AMQP without coding to its API.

Here is a quick review of the main configuration elements you’ll deal with when using the transport:

  • The connector element:

    • establishes connections to AMQP brokers

    • deals with channels

    • manages a set of common properties shared by all consumers or publishers that use this connector

  • The inbound-endpoint element:

    • consumes messages from AMQP queues

    • routes these messages to your components, transformers or other outbound endpoints, as defined in your Mule application configuration

  • The outbound-endpoint element:

    • publishes messages to AMQP exchanges from your Mule configuration

Message Payload and Properties

The AMQP transport works with another abstraction that is very important to understand: the Mule message. A Mule message is a transport-agnostic abstraction that encapsulates a payload and meta-information defined in properties. This allows the different configuration elements in your application to deal with messages without knowing their source or destination.

An AMQP message also has a payload (in bytes) and message properties. Message properties are composed of a set of pre-defined properties (known as basic properties) and any additional custom properties. Moreover, when a message is delivered, extra properties, known as envelope properties, can be added to the message.

From an inbound AMQP message, the AMQP transport creates a Mule message with a byte[] payload. For a Mule message destined for an outbound AMQP message, Mule uses its auto transformation infrastructure to extract the Mule message’s byte[] payload. Should you need a particular payload representation (for example XML or JSON), it is up to you to add the transformers that perform the required serialization and deserialization steps.
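To make the byte[] boundary concrete, here is a minimal sketch of the conversion that a serialization or deserialization step has to perform for a text payload such as JSON. It is not Mule transformer code: the class and method names are hypothetical, and UTF-8 is an assumption chosen for the example, not something the transport is stated to apply.

```java
import java.nio.charset.StandardCharsets;

/** Illustrates text <-> byte[] conversion at the AMQP payload boundary. */
final class PayloadBytesSketch {

    /** Serializes a text payload (for example a JSON document) to bytes. UTF-8 is assumed. */
    static byte[] toAmqpPayload(String text) {
        return text.getBytes(StandardCharsets.UTF_8);
    }

    /** Deserializes a received byte[] payload back to text. UTF-8 is assumed. */
    static String fromAmqpPayload(byte[] payload) {
        return new String(payload, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] payload = toAmqpPayload("{\"orderId\": 42}");
        System.out.println(payload.length + " bytes on the wire");
        System.out.println(fromAmqpPayload(payload));
    }
}
```

Both sides of an exchange have to agree on the encoding; naming the charset explicitly, instead of relying on the platform default, keeps the round trip stable across hosts.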

The transport also takes care of making inbound message properties available as standard Mule Message properties and, conversely, converting properties of Mule messages into AMQP properties for outbound messages.

Here is the list of properties supported by the transport:

Basic Properties    Envelope Properties    Technical Properties
----------------    -------------------    -----------------------
app-id              delivery-tag           amqp.headers
content-encoding    exchange               consumer-tag
content-type        redelivered            amqp.channel
correlation-id      routing-key            amqp.delivery-tag
delivery_mode                              amqp.return.listener
expiration                                 amqp.return.reply-code
message-id                                 amqp.return.reply-text
priority                                   amqp.return.exchange
reply-to                                   amqp.return.routing-key
timestamp
type
user-id

Additionally, all custom headers defined in the AMQP basic properties – which are available in a map under the amqp.headers inbound property – are added as standard inbound properties.
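The effect described above, where custom headers are reachable both through the amqp.headers map and as individual inbound properties, can be pictured with plain Java maps. This is an illustrative model only, not the transport's implementation; the class and method names are hypothetical, and the way a name clash between a custom header and a basic property is resolved here (the custom header wins) is an arbitrary assumption.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Models how custom AMQP headers could surface as inbound message properties. */
final class InboundPropertiesSketch {

    static Map<String, Object> toInboundProperties(Map<String, Object> basicProperties,
                                                   Map<String, Object> customHeaders) {
        Map<String, Object> inbound = new LinkedHashMap<>(basicProperties);
        // The whole header map stays available under a single property...
        inbound.put("amqp.headers", customHeaders);
        // ...and each custom header is also exposed as its own property.
        // Assumption: on a name clash the custom header overwrites the basic property.
        inbound.putAll(customHeaders);
        return inbound;
    }

    public static void main(String[] args) {
        Map<String, Object> basic = new LinkedHashMap<>();
        basic.put("content-type", "application/json");
        Map<String, Object> headers = new LinkedHashMap<>();
        headers.put("tenant", "acme"); // hypothetical custom header
        System.out.println(toInboundProperties(basic, headers));
    }
}
```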

Installing the AMQP Connector

Maven Support

To install the AMQP connector, add the following repository to your Maven installation:


          
       
<repository>
  <id>mule-releases</id>
  <name>Mule Releases Repository</name>
  <url>https://repository-master.mulesoft.org/nexus/content/repositories/releases</url>
  <layout>default</layout>
</repository>

To add the Mule AMQP connector to a Maven project, add the following dependency:


          
       
<dependency>
  <groupId>org.mule.transports</groupId>
  <artifactId>mule-transport-amqp</artifactId>
  <version>x.y.z</version>
</dependency>

Studio Plugin

The AMQP connector is available as a Studio plugin in Anypoint Exchange.

  1. In Anypoint Studio, click the Exchange icon in the Studio taskbar.

  2. Click Login in Anypoint Exchange.

  3. Search for the connector and click Install.

  4. Follow the prompts to install the connector.

AMQP Notes

  • The AMQP connector defines which broker to connect to, which credentials to use when doing so, and all the common properties used by the inbound and outbound endpoints that use this connector.

  • You can create several connectors connected to the same broker for the purpose of having different sets of common properties that the endpoints use.

  • The AMQP connector accepts a receiver-threading-profile, which it uses to set the consumer thread pool as per the RabbitMQ Consumer Thread Pool. For more information on how to set a receiver threading profile, see the Mule Tuning Performance guide.

  • The AMQP connector is not cluster-aware. Every inbound endpoint acts as an AMQP consumer associated with a queue, and messages from a queue are distributed among the consumers attached to it using a round-robin strategy. As a result, even though the inbound endpoints are not clustered, a given message is not consumed twice by different consumers. If you need publish-subscribe semantics, consider using one queue per consumer.
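The difference between consumers sharing one queue and consumers that each own a queue can be sketched with an idealized simulation. The class and method names below are hypothetical, no broker is involved, and strict alternation is a simplification: with a real broker, delivery order also depends on settings such as prefetch and on acknowledgment timing.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Idealized comparison of competing consumers versus one queue per consumer. */
final class QueueConsumptionSketch {

    /** Consumers share ONE queue: each message goes to exactly one consumer, in turn. */
    static Map<String, List<String>> roundRobin(List<String> messages, List<String> consumers) {
        Map<String, List<String>> received = new LinkedHashMap<>();
        for (String consumer : consumers) {
            received.put(consumer, new ArrayList<String>());
        }
        if (consumers.isEmpty()) {
            return received; // nobody is attached, so nothing is delivered
        }
        for (int i = 0; i < messages.size(); i++) {
            String consumer = consumers.get(i % consumers.size());
            received.get(consumer).add(messages.get(i));
        }
        return received;
    }

    /** Each consumer has its OWN queue bound to the exchange: every consumer gets every message. */
    static Map<String, List<String>> oneQueuePerConsumer(List<String> messages, List<String> consumers) {
        Map<String, List<String>> received = new LinkedHashMap<>();
        for (String consumer : consumers) {
            received.put(consumer, new ArrayList<>(messages));
        }
        return received;
    }

    public static void main(String[] args) {
        List<String> messages = Arrays.asList("m1", "m2", "m3", "m4");
        List<String> nodes = Arrays.asList("node-a", "node-b"); // hypothetical Mule nodes
        System.out.println("shared queue:   " + roundRobin(messages, nodes));
        System.out.println("queue per node: " + oneQueuePerConsumer(messages, nodes));
    }
}
```

With a shared queue the two nodes split the work and no message is handled twice; with one queue per node, every node sees every message, which is the publish-subscribe behavior the note refers to.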

See Also