Consume AMQP Messages - Mule 4
The consume operation in AMQP Connector lets you consume a message from any AMQP queue at any point in a flow.
The syntax to consume a message from a queue is:
<amqp:consume config-ref="AMQP_config" queueName="#[vars.queue]"/>
This operation consumes the first available message in the queue identified by the queueName value and converts it to an AmqpMessage, which results in the following structure:
The message’s content as payload
The message’s metadata in the message attributes
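As a rough sketch of how the resulting message might be used, the flow below consumes one message and logs its two parts. The flow name, the queue name, and the loggers are illustrative assumptions, and the AMQP_config global element is assumed to be defined elsewhere.

```xml
<flow name="consumeOnDemandFlow">
    <!-- Hypothetical queue name, stored in a variable for the example -->
    <set-variable variableName="queue" value="ordersQueue"/>

    <!-- Consume the first available message from the queue -->
    <amqp:consume config-ref="AMQP_config" queueName="#[vars.queue]"/>

    <!-- The message content is now the payload -->
    <logger level="INFO" message="#[payload]"/>

    <!-- The message metadata is available in the attributes -->
    <logger level="INFO" message="#[attributes]"/>
</flow>
```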
After it is received, the message is immediately acknowledged by default. To control the ACK of the message after some processing, set the ackMode parameter to MANUAL.
For more information about message ACK operations, see How to Handle Acknowledgment.
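The sketch below shows one way manual acknowledgment might look. It assumes that the operation's ackMode parameter accepts MANUAL, that the received attributes expose an ackId, and that the connector's ack operation takes that ackId. The flow name and the processMessage flow are hypothetical. Check How to Handle Acknowledgment for the authoritative syntax.

```xml
<flow name="consumeWithManualAckFlow">
    <!-- Assumption: ackMode="MANUAL" disables the default immediate ACK -->
    <amqp:consume config-ref="AMQP_config" queueName="#[vars.queue]" ackMode="MANUAL"/>

    <!-- Keep the ackId, because later processors may replace the attributes -->
    <set-variable variableName="ackId" value="#[attributes.ackId]"/>

    <!-- Hypothetical processing step -->
    <flow-ref name="processMessage"/>

    <!-- Acknowledge only after processing succeeds -->
    <amqp:ack ackId="#[vars.ackId]"/>
</flow>
```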
By default, the maximum wait time is 10 seconds. If no message is available within the wait time, an AMQP:TIMEOUT error is thrown. You can customize the wait time by configuring the maximumWait parameter.
To wait indefinitely for a message to arrive, set the maximumWait value to -1. In this case, no TIMEOUT error is thrown.
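For illustration, the flow below waits up to 30 seconds and handles the timeout instead of letting the flow fail. The maximumWaitUnit parameter and its SECONDS value are assumptions about the operation's parameters, and the flow name and log text are made up for the example.

```xml
<flow name="consumeWithTimeoutFlow">
    <try>
        <!-- Wait up to 30 seconds (assumed unit parameter) for a message -->
        <amqp:consume config-ref="AMQP_config" queueName="#[vars.queue]"
                      maximumWait="30" maximumWaitUnit="SECONDS"/>
        <error-handler>
            <!-- Reached only when no message arrives within the wait time -->
            <on-error-continue type="AMQP:TIMEOUT">
                <logger level="WARN" message="No message arrived before the timeout"/>
            </on-error-continue>
        </error-handler>
    </try>
</flow>
```

Setting maximumWait to -1 on the same operation would block until a message arrives, so the timeout handler would never be reached.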
AMQP Connector is designed to auto-determine a message’s MIME type (contentType) based on the contentType property of the message. However, in some cases the connector cannot make this determination, and you need firsthand knowledge of the message’s content.
In such cases, you can force the content type to a particular value by using the contentType parameter.
The same process works for encoding. If no other information is provided, the connector assumes that the message’s encoding matches the default encoding of Mule runtime engine. You can override this by using the encoding parameter.
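A minimal sketch of forcing both values, assuming the parameters are named contentType and encoding on the consume operation. The JSON MIME type and UTF-8 encoding are example values only.

```xml
<!-- Treat the consumed content as UTF-8 encoded JSON, whatever the message's own contentType property says -->
<amqp:consume config-ref="AMQP_config" queueName="#[vars.queue]"
              contentType="application/json" encoding="UTF-8"/>
```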
By default, the consume operation fails with an AMQP:QUEUE_NOT_FOUND error if the defined queue does not exist.
If the queue must be declared, reference a queue definition or define one inline:
<amqp:consume config-ref="Amqp_Config" queueName="testQueue">
    <amqp:fallback-queue-definition removalStrategy="SHUTDOWN" exchangeToBind="exchangeToBindToQueue" />
</amqp:consume>
Notice that the queue definition can also create a binding by using the exchangeToBind parameter.
The queue can also be defined as a top-level element and referenced by name:
<amqp:queue-definition name="targetQueueDefinition" exchangeToBind="testExchange" />
<amqp:consume config-ref="AMQP_Config" queueName="testQueue" fallbackQueueDefinition="targetQueueDefinition" />
You can set the createFallbackQueue global config to prevent changes to the AMQP topography resulting from the definition of fallback queues. See How to Avoid Changing the AMQP Topography.
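As a hedged sketch, the global configuration below turns off fallback queue creation. It assumes createFallbackQueue is an attribute of the global config element that accepts false, and the connection values are placeholders for a local broker. See How to Avoid Changing the AMQP Topography for the confirmed syntax.

```xml
<!-- Assumption: createFallbackQueue is set on the global config element -->
<amqp:config name="AMQP_config" createFallbackQueue="false">
    <!-- Placeholder connection values for a local broker -->
    <amqp:connection host="localhost" username="guest" password="guest"/>
</amqp:config>
```

With a setting like this, the intent is that a consume operation on a missing queue fails with AMQP:QUEUE_NOT_FOUND instead of declaring the queue.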
As stated earlier, each message received consists of two parts:
The payload, containing the content of the message
The attributes, containing metadata regarding the message
This metadata has four parts that map all the information available in an AMQP message. See the AMQP Reference for information on the structure of the attributes.