
Consume AMQP Messages - Mule 4

The consume operation in AMQP Connector provides the ability to consume a message at any given time in the flow, from any given AMQP queue.

Consume a Message

The syntax to consume a message from a queue is:

<amqp:consume config-ref="AMQP_Config" queueName="#[vars.queue]"/>

This operation consumes the first available message in the queue identified by the queueName value and converts it to an AmqpMessage with the following structure:

  • The message’s content as payload

  • The message’s metadata in the message attributes

By default, the message is acknowledged immediately after it is received. To acknowledge the message only after some processing, set ackMode to MANUAL. For more information about message ACK operations, see How to Handle Acknowledgment.
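The following is a minimal sketch of manual acknowledgment, not a definitive configuration. It assumes that the connector's amqp:ack operation takes an ackId parameter and that the AckId is exposed as attributes.ackId; check both against How to Handle Acknowledgment. The variable name ackId is chosen here for illustration only.

```xml
<!-- Consume without automatic acknowledgment -->
<amqp:consume config-ref="AMQP_Config" queueName="#[vars.queue]" ackMode="MANUAL"/>

<!-- Keep the AckId, because later processors may replace the message attributes -->
<!-- Assumption: the AckId is available as attributes.ackId -->
<set-variable variableName="ackId" value="#[attributes.ackId]"/>

<!-- ... process the message here ... -->

<!-- Assumption: amqp:ack accepts the stored AckId through its ackId parameter -->
<amqp:ack ackId="#[vars.ackId]"/>
```

Storing the AckId in a variable right after the consume operation keeps it available even if an intermediate processor overwrites the attributes.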

Wait for a Message

By default, the maximum wait time is 10 seconds. If no message becomes available within the wait time, an AMQP:TIMEOUT error is thrown. You can customize the wait time by configuring the maximumWait and maximumWaitUnit parameters.

To create an indefinite wait time for a message to arrive, set the maximumWait value to -1. In this case, no TIMEOUT error is thrown.
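As an illustration of the wait parameters, the sketch below waits up to 30 seconds and handles the AMQP:TIMEOUT error locally. The 30-second value, the SECONDS unit, and the log message are example choices, not recommendations from the connector documentation.

```xml
<try>
    <!-- Wait at most 30 seconds for a message instead of the default 10 -->
    <amqp:consume config-ref="AMQP_Config" queueName="#[vars.queue]"
                  maximumWait="30" maximumWaitUnit="SECONDS"/>
    <error-handler>
        <!-- Runs only when no message arrived within the wait time -->
        <on-error-continue type="AMQP:TIMEOUT">
            <logger level="WARN" message="No message arrived within 30 seconds"/>
        </on-error-continue>
    </error-handler>
</try>
```

With on-error-continue, the flow carries on after the timeout; use on-error-propagate instead if the timeout should fail the flow.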

MIME Types and Encoding

AMQP Connector determines a message’s MIME type (contentType) automatically from the contentType property of the message. However, in some cases it cannot make this determination, and you must rely on your own knowledge of the message’s content.

In such cases, you can force the content type to a particular value by using the contentType parameter.

The same process works for encoding. If no other information is provided, the connector assumes that the message uses the default encoding of Mule runtime engine. You can override this assumption by using the encoding parameter.
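For example, if you know the queue carries UTF-8 encoded JSON but the messages arrive without a usable contentType property, the two parameters could be set as in this sketch. The values application/json and UTF-8 are illustrative.

```xml
<!-- Force the MIME type and encoding instead of relying on auto-detection -->
<amqp:consume config-ref="AMQP_Config" queueName="#[vars.queue]"
              contentType="application/json" encoding="UTF-8"/>
```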

Declare a Queue in the Consume Operation

By default, the consume operation fails with an AMQP:QUEUE_NOT_FOUND error if the defined queue does not exist.

If the queue must be declared, reference a queue definition or define one inline:

<amqp:consume config-ref="AMQP_Config" queueName="testQueue">
	<amqp:fallback-queue-definition removalStrategy="SHUTDOWN" exchangeToBind="exchangeToBindToQueue" />
</amqp:consume>

Note that the queue definition can also create a binding by using the exchangeToBind parameter.

The queue can also be defined as a top-level element:

<amqp:queue-definition
	name="targetQueueDefinition"
	exchangeToBind="testExchange" />

<amqp:consume
	config-ref="AMQP_Config"
	queueName="testQueue"
	fallbackQueueDefinition="targetQueueDefinition" />

Avoid Changing the AMQP Topography

You can set the createFallbackQueue parameter in the global configuration to prevent fallback queue definitions from changing the AMQP topography. For details, see How to Avoid Changing the AMQP Topography.
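A sketch of what this might look like follows. It assumes that createFallbackQueue is a Boolean attribute of the global amqp:config element and that false disables fallback queue creation; confirm both in How to Avoid Changing the AMQP Topography. Connection settings are omitted.

```xml
<!-- Assumption: createFallbackQueue="false" stops the connector from declaring fallback queues -->
<amqp:config name="AMQP_Config" createFallbackQueue="false">
    <!-- connection settings omitted -->
</amqp:config>
```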

Incoming Message Metadata

As stated earlier, each message received consists of two parts:

  • The payload, containing the content of the message

  • The attributes, containing metadata regarding the message

This metadata has four parts that map all the information available in an AMQP message:

  • Envelope

  • AckId

  • Headers

  • Properties

See the AMQP Reference for information on the structure for attributes.
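As a rough illustration, the four parts could be inspected right after a consume operation with a logger. The field names envelope, ackId, headers, and properties are assumed here to be the camel-case forms of the parts listed above; check the AMQP Reference for the exact names.

```xml
<amqp:consume config-ref="AMQP_Config" queueName="#[vars.queue]"/>

<!-- Assumption: the attribute fields are named envelope, ackId, headers, and properties -->
<logger level="INFO"
        message="#[output application/json --- { envelope: attributes.envelope, ackId: attributes.ackId, headers: attributes.headers, properties: attributes.properties }]"/>
```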