Nav

AMQP Connector

amqp connector icon

Introduction

The Advanced Message Queuing Protocol (AMQP) is an open standard that defines a protocol for passing business messages between applications or organizations. It is an application-level, vendor-agnostic protocol designed for interoperability. The defining features of AMQP are message orientation, queuing, routing (including point-to-point and publish-and-subscribe), reliability, and security (Wikipedia). The protocol’s specification is available online and several broker implementations exist, such as the very popular VMware RabbitMQ and Apache Qpid.

You can "test drive" this connector in Anypoint Studio using the instructions in Installing a Connector from Anypoint Exchange.  

AMQP is built around several basic concepts:

  • Clients connect via channels to AMQP brokers to send or receive messages.

  • Clients can publish messages to exchanges.

  • Messages published to exchanges are routed to queues, where they accumulate for later consuming.

  • The message publisher does not know which queue is the final destination of a message; the queue is determined by the type of exchange and by a piece of metadata known as the "routing key".

  • It is possible for a message to end-up no where if no queue is bound to the targeted exchange or if no existing queues are matched by the routing rules.

  • There are four main types of exchanges: direct, fanout, topic, and headers. For more information, see Exchange types and the effect of bindings.

  • Clients interested in consuming messages must create queues and bind these queues to exchanges.

  • Queue and exchange declaration are idempotent operations; hence in common practice they usually take place on each client startup.

AMQP for the JMS-savvy

If you’re a Java developer, chances are you have been exposed to JMS and are wondering how AMQP differs from JMS.

The main differences are:

  • AMQP defines both an API and a wire format, ensuring compatibility between implementations (JMS only defines an API)

  • In JMS you publish directly to destinations (queues or topic); in AMQP you publish to exchanges to which queues are bound (or not); this decouples the producer from the final destination of its messages

  • For some types of exchanges, delivery to the final destination depends on a routing key, a simple string that provides the necessary metadata for successfully routing the message (unlike in JMS where all that’s needed is the name of the destination)

Supported AMQP Versions

This transport is based on the RabbitMQ Java Client, which is compatible with brokers supporting AMQP version 0.9.1.

Features

The AMQP connector receives inbound messages via a subscription to existing or declared exchanges and queues and publishes outbound messages to existing or declared exchanges.

The AMQP connector supports:

  • Outbound request-response patterns via temporary reply queues

  • Inbound and outbound transactions, with optional channel self-recovery

  • Synchronous Message requesting, with timeout

  • Passive or active-only exchange and queue declarations

  • Connection fallback across a list of AMQP hosts.

  • All AMQP message properties, including custom headers

  • Reply to (publishing replies to the default exchange)

  • Automatic, Mule-driven and manual message acknowledgment

  • Manual message rejection

  • Manual channel recovery

  • Default exchange semantics in outbound endpoints

  • Mandatory and immediate publishing parameters and handling of returned (undelivered) messages

  • Prefetch size and count "quality of service" settings

  • noLocal and exclusive consumers

  • Custom exchange and queue arguments

  • SSL connectivity

Core Connector Principles

The AMQP connector is an abstraction built on top of the previously introduced AMQP constructs: connection, channel, exchanges, queues and messages.

The connector hides the low-level concepts, like dealing with channels, but gives a great deal of control over all the constructs it encapsulates, allowing you to access the richness of AMQP without the need to code to its API.

Main configuration elements with the AMQP connector:

Element Features

connector

  • Establishes connections to AMQP brokers

  • Deals with channels

  • Manages a set of common properties shared by all consumers or publishers that use this connector

inbound-endpoint

  • Consumes messages from AMQP queues

  • Routes these messages to your components, transformers or other outbound endpoints, as defined in your Mule application configuration

outbound-endpoint

  • Publishes messages to AMQP exchanges from your Mule configuration

Mapping AMQP Messages to Mule Messages

The AMQP connector works with another abstraction that is very important to understand: the Mule message. A Mule message is a transport-agnostic abstraction that encapsulates a payload and meta-information defined in properties. This allows the different configuration elements in your application to deal with messages without knowing their source or destination.

An AMQP message also has a payload (in bytes) and message properties. Message properties are composed of a set of pre-defined properties (know as basic properties) and any additional custom properties. Moreover, when a message is delivered, extra properties, known as envelope properties, can be added to the message.

From an inbound AMQP message, the AMQP connector creates a Mule message with byte[] payloads. For a Mule message destined for an outbound AMQP message, Mule uses its auto-transformation infrastructure to extract the Mule message’s byte[] payload. Should you need to use a particular payload representation (for example XML or JSON), add the necessary transformers to perform the desired serialization and deserialization steps.

The transport also takes care of making inbound message properties available as standard Mule Message properties and, conversely, converting properties of Mule messages into AMQP properties for outbound messages.

Here is the list of properties supported by the connector:

Basic Properties Envelope Properties Technical Properties

app-id

delivery-tag

amqp.headers

content-encoding

exchange

consumer-tag

content-type

redelivered

amqp.channel

correlation-id

routing-key

amqp.delivery-tag

delivery_mode

amqp.return.listener

expiration

amqp.return.reply-code

message-id

amqp.return.reply-text

priority

amqp.return.exchange

reply-to

amqp.return.routing-key

timestamp

 

type

 

user-id

 

Additionally, all custom headers defined in the AMQP basic properties – which are available in a map under the amqp.headers inbound property – are added as standard inbound properties.

Installing the AMQP Connector

Studio Plugin

The AMQP connector is available as a Studio plugin. Get the AMQP connector from Anypoint Exchange. Connector installation information is available at Anypoint Connectors.

Maven Support

To install the AMQP connector, add the following repository to your Maven installation:


          
       
1
2
3
4
5
6
<repository>
  <id>mule-releases</id>
  <name>Mule Releases Repository</name>
  <url>https://repository-master.mulesoft.org/nexus/content/repositories/releases</url>
  <layout>default</layout>
</repository>

To add the Mule AMQP connector to a Maven project, add the following dependency and inclusion:


          
       
1
2
3
4
5
<dependency>
  <groupId>org.mule.transports</groupId>
  <artifactId>mule-transport-amqp</artifactId>
  <version>x.y.z</version>
</dependency>

          
       
1
2
3
4
5
6
<inclusions>
  <inclusion>
  <groupId>org.mule.transports</groupId>
  <artifactId>mule-transport-amqp</artifactId>
  </inclusion>
</inclusions>

You also need to add JARs included in the ActiveMQ distribution. The following dependencies provide a Maven alternative to only adding JARs to an Anypoint Studio project.

Studio does not add these dependencies for you, so it’s important to add these dependencies.

After adding the dependencies to the POM file, add the files from the Active MQ distribution to $MULE_HOME/lib/user or $APP_HOME/lib. The latter can be done manually or by using Studio.

Dependencies:


          
       
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-kahadb-store</artifactId>
    <version>5.8.0</version>
</dependency>
<dependency>
    <groupId>org.apache.activemq.protobuf</groupId>
    <artifactId>activemq-protobuf</artifactId>
    <version>1.1</version>
</dependency>
<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-openwire-legacy</artifactId>
    <version>5.8.0</version>
</dependency>
<dependency>
  <groupId>org.fusesource.hawtbuf</groupId>
  <artifactId>hawtbuf</artifactId>
  <version>1.9</version>
</dependency>
<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-broker</artifactId>
    <version>5.8.0</version>
</dependency>
<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-client</artifactId>
    <version>5.8.0</version>
</dependency>

Configuring the AMQP Connector

Configure the connector via the tabs in the properties pane that appear after you’ve clicked an AMQP connector on the Anypoint Studio canvas.

general tab

AMQP Notes

The AMQP connector defines what broker to connect to, which credentials to use when doing so, and all the common properties used by the inbound and outbound endpoints using this connector.

It is possible to create several connectors connected to the same broker for the purpose of having different sets of common properties that the endpoints use.

The AMQP connector accepts and uses a receiver-threading-profile that is used to set the consumer thread pool as per the RabbitMQ Consumer Thread Pool. More information on how to set a receiver threading profile in the Mule Tuning Performance guide.

See Also