AMQP Connector - Mule 4

Support Category: Select

Anypoint Connector for AMQP (AMQP Connector) v1.6

Release Notes: AMQP Connector Release Notes
Exchange: AMQP Connector

AMQP Connector enables your application to publish and consume messages using an AMQP 0.9.1-compliant broker. The connector’s main features include:

  • RPC Pattern through request-reply consumption using private temporal queues

  • SDK transaction management with optional channel self-recovery

  • Synchronous message requesting, with timeout

  • Management operations for exchange and queue declarations

  • All AMQP message properties, including custom headers

  • Reply to (publishing replies to the default exchange)

  • Automatic, Mule-driven and manual message acknowledgement

  • Manual message rejection

  • Mandatory and immediate publishing parameters and handling of returned (undelivered) messages through error handling

  • Prefetch size and count "quality of service" settings

  • noLocal and exclusive consumers

  • TLS/SSL connectivity

  • Message builder functionality similar to that provided by JMS Connector

Configure the Connector

AMQP Connector comes out of the box with a fine-tuned set of default values for both publishing and consuming messages. This means that the only requirement is for you to configure which connection to use.

This example sets a minimal connection to a broker in localhost:

<amqp:config name="AMQP_Config">
  <amqp:connection
    host="localhost"
    username="guest"
    password="guest" />
</amqp:config>

The Apache Maven pom.xml file dependency for AMQP Connector is as follows:

<dependency>
  <groupId>com.mulesoft.connectors</groupId>
  <artifactId>mule-amqp-connector</artifactId>
  <version>1.6.0</version>
  <classifier>mule-plugin</classifier>
</dependency>

Verify the <version> is the one you are using. To specify a version, view AMQP Connector in Anypoint Exchange and click Dependency Snippets.

Define Global Defaults

You can define a set of default parameters to use while consuming or publishing messages. This way, you can define a global default behavior for all the operations associated with the configuration and then override each parameter only in the operations that require a custom behavior.

For example, you can define a prefetch size window in terms of size and in terms of whole messages:

<amqp:config name="AMQP_Config">
   <amqp:connection host="localhost" username="guest" password="guest" />
   <amqp:quality-of-service-config prefetchSize="4" prefetchCount="4" />
   <amqp:consumer-config ackMode="IMMEDIATE" />
   <amqp:publisher-config requestBrokerConfirms="true"/>
</amqp:config>

In the previous example, every consume operation or listener associated with this configuration uses an acknowledgement mode of the type IMMEDIATE, while for every publish or publish-consume operation, the outgoing message requests a confirmation from the broker.

You can override all of these properties at the operation level when required. For example, if you need to publish a message that does not require confirmation from the broker:

<amqp:publish config-ref="AMQP_Config" requestBrokerConfirms="false"/>

Connect to a Broker

An AMQP connection is defined by a set of general parameters that apply for any connection.

Provide Credentials For Authentication

To establish an authenticated connection, you have to set the username, password, and the virtualhost in the connection element:

<amqp:config name="AMQP_Config">
  <amqp:connection host="localhost" virtualhost="/" username="guest" password="guest" />
</amqp:config>

Set TLS/SSL Connectivity to the Broker

You can use the parameter useTls to create a secure connection to the broker using the default TLS context. You can also provide a custom TLS context as a child element in the connection, or as global element in the following way:

<tls:context name="globalTlsContext">
  <tls:trust-store path="tls/tlstest-cacerts.jks"
    password="changeit" />
  <tls:key-store path="tls/tlstest-keystore.jks"
    keyPassword="changeit" password="changeit" />
</tls:context>

<amqp:config name="config">
  <amqp:connection host="localhost" port="5671"
    virtualHost="/" username="guest" password="guest" useTls="true"
    tlsContext="globalTlsContext" />
 </amqp:config>

Server Name Indication

AMQP Connector v1.1.0 and later supports Server Name Indication (SNI). If the broker supports SNI, it’s available to verify the hostname and present different certificates for the same IP address.

Use the parameter useSNI to indicate that the AMQP client should add the hostname as an extension in the ClientHello message during the SSL handshake:

<tls:context name="globalTlsContext">
  <tls:trust-store path="tls/tlstest-cacerts.jks"
    password="changeit" />
  <tls:key-store path="tls/tlstest-keystore.jks"
    keyPassword="changeit" password="changeit" />
</tls:context>

<amqp:config name="config">
  <amqp:connection host="localhost" port="5671"
    virtualHost="/" username="guest" password="guest" useTls="true" useSni="true"
    tlsContext="globalTlsContext" />
 </amqp:config>