
About the AMQP Connector

The AMQP connector enables your application to do messaging using an AMQP 0.9.1 compliant broker. Its main features include:

  • RPC pattern through request-reply consumption using private temporary queues

  • SDK Transaction management with optional channel self-recovery

  • Synchronous Message requesting, with timeout

  • Management operations for exchange and queue declarations

  • All AMQP message properties, including custom headers

  • Reply to (publishing replies to the default exchange)

  • Automatic, Mule-driven and manual message acknowledgement

  • Manual message rejection

  • Mandatory and immediate publishing parameters and handling of returned (undelivered) messages through error handling

  • Prefetch size and count "quality of service" settings

  • noLocal and exclusive consumers

  • TLS/SSL connectivity

  • Message Builder functionality similar to the one provided by the JMS Connector

Configuring the Connector

Out of the box, the AMQP connector comes with a fine-tuned set of default values for both publishing and consuming messages. The only thing you are required to configure is the connection to use.

This example sets a minimal connection to a broker in localhost:

<amqp:config name="AMQP_Config">
  <amqp:connection host="localhost" username="guest" password="guest" />
</amqp:config>

Defining Global Defaults

The AMQP config lets you define multiple parameters that are used by default when consuming or publishing messages. This way, you can define a global default behavior for all the operations associated with the config and then override individual parameters only in the operations that require custom behavior.

For example, you can define a prefetch window both in terms of size (prefetchSize, in octets) and in terms of whole messages (prefetchCount), together with a default acknowledgement mode for consumers and a default confirmation policy for publishers:

<amqp:config name="AMQP_Config">
   <amqp:connection host="localhost" username="guest" password="guest" />
   <amqp:quality-of-service-config prefetchSize="4" prefetchCount="4" />
   <amqp:consumer-config ackMode="IMMEDIATE" />
   <amqp:publisher-config requestBrokerConfirms="true"/>
</amqp:config>

In this case, every consume operation or listener associated with this config uses the IMMEDIATE acknowledgement mode, and every publish or publish-consume operation requests a confirmation from the broker for the outgoing message.

All of these properties can be overridden at the operation level when required. For example, to publish a message that does not require a confirmation from the broker:

<amqp:publish config-ref="AMQP_Config" requestBrokerConfirms="false"/>
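The same override mechanism applies on the consuming side. The following is a minimal sketch, not a definitive configuration: the flow name and queue name are hypothetical, and it assumes that the listener accepts an ackMode parameter and that the delivery's ackId is available in the message attributes.

<flow name="manualAckFlow">
  <!-- Overrides the IMMEDIATE default from AMQP_Config for this listener only -->
  <amqp:listener config-ref="AMQP_Config" queueName="ordersQueue" ackMode="MANUAL" />
  <logger level="INFO" message="#[payload]" />
  <!-- Assumption: the ackId of the delivery is exposed as attributes.ackId -->
  <amqp:ack ackId="#[attributes.ackId]" />
</flow>

Other listeners and consume operations that reference AMQP_Config keep the config-level default.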

Connecting to a Broker

An AMQP connection is defined by a set of general parameters that apply to any connection.

Providing Credentials For Authentication

To establish an authenticated connection, set the username, password, and virtualHost parameters in the connection element:

<amqp:config name="AMQP_Config">
  <amqp:connection host="localhost" virtualHost="/" username="guest" password="guest" />
</amqp:config>
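To keep credentials out of the configuration file, the same connection could be written with property placeholders. This is only a sketch: the properties file name and the amqp.* keys are hypothetical names chosen for illustration, and the file is assumed to be available on the application's classpath.

<!-- Assumption: amqp.properties defines amqp.host, amqp.virtualHost, amqp.username and amqp.password -->
<configuration-properties file="amqp.properties" />

<amqp:config name="AMQP_Config">
  <amqp:connection host="${amqp.host}" virtualHost="${amqp.virtualHost}"
    username="${amqp.username}" password="${amqp.password}" />
</amqp:config>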

Setting TLS/SSL Connectivity to the Broker

With the useTls parameter you can create a secure connection to the broker using the default TLS context. You can also provide a custom TLS context, either as a child element of the connection or as a global element, as in the following example:

<tls:context name="globalTlsContext">
  <tls:trust-store path="tls/tlstest-cacerts.jks"
    password="changeit" />
  <tls:key-store path="tls/tlstest-keystore.jks"
    keyPassword="changeit" password="changeit" />
</tls:context>

<amqp:config name="config">
  <amqp:connection host="localhost" port="5671"
    virtualHost="/" username="guest" password="guest" useTls="true"
    tlsContext="globalTlsContext" />
</amqp:config>
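The example above uses a global TLS context. As a sketch of the child-element alternative, the context could instead be declared inline in the connection. This assumes the connection accepts a nested tls:context element, as is usual for Mule connectors; it reuses the trust store path and password from the example above.

<amqp:config name="config">
  <amqp:connection host="localhost" port="5671"
    virtualHost="/" username="guest" password="guest" useTls="true">
    <!-- Assumption: inline TLS context, used only by this connection -->
    <tls:context>
      <tls:trust-store path="tls/tlstest-cacerts.jks" password="changeit" />
    </tls:context>
  </amqp:connection>
</amqp:config>

An inline context suits a single connection; a global context is the better fit when several configs share the same stores.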

SNI

As of AMQP connector version 1.1.0, Server Name Indication (SNI) is supported.

Use the useSni parameter to indicate that the AMQP client should add the hostname as an extension in the ClientHello message during the TLS handshake. If the broker supports SNI, it can use the hostname to verify the request and to present different certificates for the same IP address.

<tls:context name="globalTlsContext">
  <tls:trust-store path="tls/tlstest-cacerts.jks"
    password="changeit" />
  <tls:key-store path="tls/tlstest-keystore.jks"
    keyPassword="changeit" password="changeit" />
</tls:context>

<amqp:config name="config">
  <amqp:connection host="localhost" port="5671"
    virtualHost="/" username="guest" password="guest" useTls="true" useSni="true"
    tlsContext="globalTlsContext" />
</amqp:config>
