Contact Us 1-800-596-4880

AMQP Connector

Select

The Advanced Message Queuing Protocol (AMQP) is an open standard that defines a protocol for passing business messages between applications or organizations. It is an application-level, vendor-agnostic protocol designed for interoperability. The defining features of AMQP are message orientation, queuing, routing (including point-to-point and publish-and-subscribe), reliability, and security (Wikipedia). The protocol’s specification is available online and several broker implementations exist, such as the very popular VMware RabbitMQ and Apache Qpid.

The AMQP Connector enables your application to publish and consume messages using an AMQP-compliant broker.

AMQP is built around several basic concepts:

  • Clients connect via channels to AMQP brokers to send or receive messages.

  • Clients can publish messages to exchanges.

  • Messages published to exchanges are routed to queues, where they accumulate for later consuming.

  • The message publisher does not know which queue is the final destination of a message; the queue is determined by the type of exchange and by a piece of metadata known as the "routing key".

  • It is possible for a message to end-up no where if no queue is bound to the targeted exchange or if no existing queues are matched by the routing rules.

  • There are four main types of exchanges: direct, fanout, topic, and headers. For more information, see Exchange types and the effect of bindings.

  • Clients interested in consuming messages must create queues and bind these queues to exchanges.

  • Queue and exchange declaration are idempotent operations; hence in common practice they usually take place on each client startup.

AMQP for the JMS-savvy

If you’re a Java developer, chances are you have been exposed to JMS and are wondering how AMQP differs from JMS.

The main differences are:

  • AMQP defines both an API and a wire format, ensuring compatibility between implementations (JMS only defines an API)

  • In JMS you publish directly to destinations (queues or topic); in AMQP you publish to exchanges to which queues are bound (or not); this decouples the producer from the final destination of its messages

  • For some types of exchanges, delivery to the final destination depends on a routing key, a simple string that provides the necessary metadata for successfully routing the message (unlike in JMS where all that’s needed is the name of the destination)

Supported AMQP Versions

This transport is based on the RabbitMQ Java Client, which is compatible with brokers supporting AMQP version 0.9.1.

Features

The AMQP connector receives inbound messages via a subscription to existing or declared exchanges and queues and publishes outbound messages to existing or declared exchanges.

The AMQP connector supports:

  • All AMQP message properties, including custom headers

  • Automatic, Mule-driven and manual message acknowledgment

  • Custom exchange and queue arguments

  • Default exchange semantics in outbound endpoints

  • Inbound and outbound transactions, with optional channel self-recovery

  • Connection fallback across a list of AMQP hosts.

  • Mandatory and immediate publishing parameters and handling of returned (undelivered) messages

  • Manual channel recovery

  • Manual message rejection

  • noLocal and exclusive consumers

  • Outbound request-response patterns via temporary reply queues

  • Passive or active-only exchange and queue declarations

  • Prefetch size and count "quality of service" settings

  • Reply to (publishing replies to the default exchange)

  • SSL connectivity

  • Synchronous Message requesting, with timeout

Core Connector Principles

The AMQP connector is an abstraction built on top of the previously introduced AMQP constructs: connection, channel, exchanges, queues and messages.

The connector hides the low-level concepts, like dealing with channels, but gives a great deal of control over all the constructs it encapsulates, allowing you to access the richness of AMQP without the need to code to its API.

Main configuration elements with the AMQP connector:

Element Features

connector

  • Establishes connections to AMQP brokers

  • Deals with channels

  • Manages a set of common properties shared by all consumers or publishers that use this connector

inbound-endpoint

  • Consumes messages from AMQP queues

  • Routes these messages to your components, transformers or other outbound endpoints, as defined in your Mule application configuration

outbound-endpoint

  • Publishes messages to AMQP exchanges from your Mule configuration

Mapping AMQP Messages to Mule Messages

The AMQP connector works with another abstraction that is very important to understand: the Mule message. A Mule message is a transport-agnostic abstraction that encapsulates a payload and meta-information defined in properties. This allows the different configuration elements in your application to deal with messages without knowing their source or destination.

An AMQP message also has a payload (in bytes) and message properties. Message properties are composed of a set of pre-defined properties (know as basic properties) and any additional custom properties. Moreover, when a message is delivered, extra properties, known as envelope properties, can be added to the message.

From an inbound AMQP message, the AMQP connector creates a Mule message with byte[] payloads. For a Mule message destined for an outbound AMQP message, Mule uses its auto-transformation infrastructure to extract the Mule message’s byte[] payload. Should you need to use a particular payload representation (for example XML or JSON), add the necessary transformers to perform the desired serialization and deserialization steps.

The transport also takes care of making inbound message properties available as standard Mule Message properties and, conversely, converting properties of Mule messages into AMQP properties for outbound messages.

Here is the list of properties supported by the connector:

Basic Properties Envelope Properties Technical Properties

app-id

delivery-tag

amqp.headers

content-encoding

exchange

consumer-tag

content-type

redelivered

amqp.channel

correlation-id

routing-key

amqp.delivery-tag

delivery_mode

amqp.return.listener

expiration

amqp.return.reply-code

message-id

amqp.return.reply-text

priority

amqp.return.exchange

reply-to

amqp.return.routing-key

timestamp

type

user-id

Additionally, all custom headers defined in the AMQP basic properties – which are available in a map under the amqp.headers inbound property – are added as standard inbound properties.

Installing the AMQP Connector

Studio Plugin

The AMQP connector is available as a Studio plugin. Get the AMQP connector from Anypoint Exchange. Connector installation information is available at Anypoint Connectors.

Maven Support

To install the AMQP connector, add the following repository to your Maven installation:

<repository>
  <id>mule-releases</id>
  <name>Mule Releases Repository</name>
  <url>https://repository-master.mulesoft.org/nexus/content/repositories/releases</url>
  <layout>default</layout>
</repository>

To add the Mule AMQP connector to a Maven project, add the following dependency and inclusion:

<dependency>
  <groupId>org.mule.transports</groupId>
  <artifactId>mule-transport-amqp</artifactId>
  <version>x.y.z</version>
</dependency>
<inclusions>
  <inclusion>
  <groupId>org.mule.transports</groupId>
  <artifactId>mule-transport-amqp</artifactId>
  </inclusion>
</inclusions>

Configuring the AMQP Connector

Configure the connector via the tabs in the properties pane that appear after you’ve clicked an AMQP connector on the Anypoint Studio canvas.

general tab

Studio Plugin

The AMQP connector is available as a Studio plugin in Anypoint Exchange.

  1. In Anypoint Studio, click the Exchange icon in the Studio taskbar.

  2. Click Login in Anypoint Exchange.

  3. Search for the connector and click Install.

  4. Follow the prompts to install the connector.

AMQP Notes

  • The AMQP connector defines what broker to connect to, which credentials to use when doing so, and all the common properties used by the inbound and outbound endpoints using this connector.

  • You can create several connectors connected to the same broker for the purpose of having different sets of common properties that the endpoints use.

  • The AMQP connector accepts and uses a receiver-threading-profile that is used to set the consumer thread pool as per the RabbitMQ Consumer Thread Pool. More information on how to set a receiver threading profile in the Mule Tuning Performance guide.

  • The AMQP connector is not cluster aware. Every inbound endpoint acts as an AMQP consumer associated to a queue and the consumption of messages from a queue is performed using a round robin strategy among the consumers attached to it. This implies that although the inbound endpoints are not clustered, this does not result in the same message being consumed twice by different consumers. If you need pub-sub semantics, you should consider using one queue per consumer.

See Also

View on GitHub