Consume AMQP Messages - Mule 4
The consume operation in AMQP Connector lets you consume a message at any point in a flow, from any AMQP queue.
Consume a Message
The syntax to consume a message from a queue is:
<amqp:consume config-ref="AMQP_config" queueName="#[vars.queue]"/>
This operation consumes the first available message in the queue identified by the queueName value and converts it to an AmqpMessage, which has the following structure:
- The message’s content as the payload
- The message’s metadata in the message attributes
After it is received, the message is immediately acknowledged by default. To control when the message is acknowledged, for example after some processing, set ackMode to MANUAL.
For more information about message ACK operations, see How to Handle Acknowledgment.
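As a minimal sketch of manual acknowledgment, the following fragment consumes with ackMode set to MANUAL and acknowledges the message only after processing. The amqp:ack operation and the attributes.ackId expression are assumptions based on the connector's acknowledgment operations and the AckId attribute; the ackId variable name and the logger step are hypothetical placeholders.

```xml
<!-- Sketch only: consume without automatic ACK, process, then acknowledge explicitly. -->
<amqp:consume config-ref="AMQP_config" queueName="#[vars.queue]" ackMode="MANUAL"/>
<!-- Keep the ACK ID in a variable (hypothetical name) in case later steps replace the attributes. -->
<set-variable variableName="ackId" value="#[attributes.ackId]"/>
<!-- Placeholder for the actual processing of the message. -->
<logger level="INFO" message="#[payload]"/>
<!-- Assumed ACK operation: acknowledges the message consumed above. -->
<amqp:ack ackId="#[vars.ackId]"/>
```

Storing the ACK ID before processing is a defensive choice: any operation that produces new attributes would otherwise overwrite the value needed for the acknowledgment.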
Wait for a Message
By default, the maximum wait time is 10 seconds. If no message is available during the specified wait time, an AMQP:TIMEOUT error is thrown. You can customize the wait time by configuring the maximumWait and maximumWaitUnit parameters.
To wait indefinitely for a message to arrive, set the maximumWait value to -1. In this case, no AMQP:TIMEOUT error is thrown.
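The wait settings can be sketched as follows, assuming a 30-second wait and a Try scope that handles the timeout. The SECONDS value for maximumWaitUnit is assumed to follow the usual time unit names, and the log message is a hypothetical placeholder.

```xml
<try>
    <!-- Wait up to 30 seconds for a message instead of the default 10. -->
    <amqp:consume config-ref="AMQP_config" queueName="#[vars.queue]"
                  maximumWait="30" maximumWaitUnit="SECONDS"/>
    <error-handler>
        <!-- Reached only when no message arrives within the configured wait time. -->
        <on-error-continue type="AMQP:TIMEOUT">
            <logger level="WARN" message="No message arrived within 30 seconds"/>
        </on-error-continue>
    </error-handler>
</try>
```

With maximumWait set to -1, the error handler for AMQP:TIMEOUT would never run, because the operation blocks until a message arrives.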
MIME Types and Encoding
AMQP Connector is designed to auto-determine a message’s MIME type (contentType) based on the contentType property of the message. However, there are cases in which it cannot make this determination, and you need firsthand knowledge of the message’s content.
In such cases, you can force the content type to a particular value by using the contentType parameter.
Encoding works the same way. If no other information is provided, the connector assumes that the message uses the default encoding of Mule runtime engine. You can override this assumption by using the encoding parameter.
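For example, if the queue is known to carry JSON documents encoded as UTF-8 but the publisher does not set the contentType property, both values can be forced on the operation. This is a sketch; the chosen MIME type and encoding are assumptions about the message content.

```xml
<!-- Treat every consumed message as UTF-8 encoded JSON, regardless of its properties. -->
<amqp:consume config-ref="AMQP_config" queueName="#[vars.queue]"
              contentType="application/json" encoding="UTF-8"/>
```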
Declare a Queue in the Consume Operation
By default, the consume operation fails with an AMQP:QUEUE_NOT_FOUND error if the defined queue does not exist.
If the queue must be declared, reference a queue definition or define one inline:
<amqp:consume config-ref="Amqp_Config" queueName="testQueue">
    <amqp:fallback-queue-definition removalStrategy="SHUTDOWN" exchangeToBind="exchangeToBindToQueue" />
</amqp:consume>
Notice that the queue definition can also create a binding by using the exchangeToBind parameter.
The queue can also be defined as a top-level element:
<amqp:queue-definition
    name="targetQueueDefinition"
    exchangeToBind="testExchange" />
<amqp:consume
    config-ref="AMQP_Config"
    queueName="testQueue"
    fallbackQueueDefinition="targetQueueDefinition" />
Avoid Changing the AMQP Topography
You can set the createFallbackQueue global configuration parameter to prevent changes to the AMQP topography resulting from the definition of fallback queues. For more information, see How to Avoid Changing the AMQP Topography.
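A sketch of such a global configuration might look like the following. Setting createFallbackQueue to false is assumed to be the value that disables fallback queue creation, and the connection attributes are placeholder values for a local broker, not taken from this page.

```xml
<!-- Assumption: false disables the declaration of fallback queues. -->
<amqp:config name="AMQP_config" createFallbackQueue="false">
    <!-- Placeholder connection settings for a local broker. -->
    <amqp:connection host="localhost" username="guest" password="guest"/>
</amqp:config>
```

With a configuration like this, a consume operation on a missing queue would be expected to fail with AMQP:QUEUE_NOT_FOUND even if a fallback queue definition is present.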
Incoming Message Metadata
As stated earlier, each message received consists of two parts:
- The payload, containing the content of the message
- The attributes, containing metadata regarding the message
This metadata has four parts that map all the information available in an AMQP message:
- Envelope
- AckId
- Headers
- Properties
See the AMQP Reference for information on the structure for attributes.
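As an illustration, the four parts can be read from the attributes with DataWeave expressions after the message is consumed. The field names envelope, ackId, headers, and properties are assumed to mirror the parts listed above, and the variable names are hypothetical; check the AMQP Reference for the exact structure.

```xml
<amqp:consume config-ref="AMQP_config" queueName="#[vars.queue]"/>
<!-- Assumed field names, mirroring the four metadata parts. -->
<set-variable variableName="amqpEnvelope" value="#[attributes.envelope]"/>
<set-variable variableName="amqpAckId" value="#[attributes.ackId]"/>
<set-variable variableName="amqpHeaders" value="#[attributes.headers]"/>
<set-variable variableName="amqpProperties" value="#[attributes.properties]"/>
```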