To Consume AMQP Messages
The Consume operation in the AMQP connector lets you consume a Message at any point in a flow, from any given AMQP Queue.
The syntax to consume a message from a Queue is:
<amqp:consume config-ref="AMQP_config" queueName="#[vars.queue]"/>
The operation above consumes the first available message in the Queue identified by queueName and converts it to an AmqpMessage with the following structure:
The message’s content as the payload.
The message’s metadata in the message attributes.
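The structure above can be sketched as a small flow that stores both parts of the consumed message. This is only an illustration: the flow name and the variable names are hypothetical, and the flow is assumed to be invoked from elsewhere (for example through a flow-ref) with vars.queue already set.

```xml
<!-- Hypothetical flow name; assumes vars.queue was set by the caller -->
<flow name="consume-and-store-flow">
    <amqp:consume config-ref="AMQP_config" queueName="#[vars.queue]"/>
    <!-- payload now holds the content of the consumed message -->
    <set-variable variableName="messageContent" value="#[payload]"/>
    <!-- attributes now holds the metadata of the consumed message -->
    <set-variable variableName="messageMetadata" value="#[attributes]"/>
</flow>
```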
Once received, the Message is ACKed immediately by default. To control the ACK of the Message after some processing, set ackMode to MANUAL.
For more information regarding a Message ACK, see How to handle Acknowledgement.
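A minimal sketch of deferring the ACK until processing has finished is shown below. It assumes that the manual mode is selected with ackMode="MANUAL", that the message attributes expose an ackId field, and that the connector provides an amqp:ack operation taking that ackId. Verify these names against the acknowledgement documentation before relying on them. The flow and variable names are hypothetical.

```xml
<!-- Hypothetical flow name; ackMode value, attributes.ackId and amqp:ack are assumptions -->
<flow name="manual-ack-flow">
    <amqp:consume config-ref="AMQP_config" queueName="#[vars.queue]" ackMode="MANUAL"/>
    <!-- Save the ackId before later processors replace the attributes -->
    <set-variable variableName="ackId" value="#[attributes.ackId]"/>
    <!-- Process the message here -->
    <logger level="INFO" message="Processing finished, acknowledging the message"/>
    <!-- Acknowledge only after processing succeeded -->
    <amqp:ack ackId="#[vars.ackId]"/>
</flow>
```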
By default, the maximum wait time is 10 seconds, and an AMQP:TIMEOUT error is raised if no message is available in that period.
To configure a timeout that better fits your use case, customize the maximumWait parameter.
To wait indefinitely for a Message to arrive, set the maximumWait value to -1. In this case, no TIMEOUT error is thrown.
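The two timeout behaviours described above might be configured as in the following sketch. The maximumWaitUnit attribute and its SECONDS value are assumptions not stated in this section, and the flow names are hypothetical. The first flow handles the AMQP:TIMEOUT error locally; the second waits without a limit.

```xml
<!-- Waits up to 30 seconds, then handles AMQP:TIMEOUT instead of failing the flow -->
<flow name="consume-with-timeout-flow">
    <try>
        <!-- maximumWaitUnit is an assumed companion parameter for the time unit -->
        <amqp:consume config-ref="AMQP_config" queueName="#[vars.queue]"
                      maximumWait="30" maximumWaitUnit="SECONDS"/>
        <error-handler>
            <on-error-continue type="AMQP:TIMEOUT">
                <logger level="WARN" message="No message arrived within 30 seconds"/>
            </on-error-continue>
        </error-handler>
    </try>
</flow>

<!-- Blocks until a message arrives; no TIMEOUT error is raised -->
<flow name="consume-wait-forever-flow">
    <amqp:consume config-ref="AMQP_config" queueName="#[vars.queue]" maximumWait="-1"/>
</flow>
```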
The AMQP Connector does its best to automatically determine a Message’s MIME type (contentType) based on the contentType property of the message. However, there are cases in which that best guess is not enough, and you need first-hand knowledge of the Message’s content.
In such cases, you can force the content type to a particular value by using the contentType parameter.
The same process works for encoding. If no other information is provided, the connector assumes that the runtime’s default encoding matches the one in the Message. You can set this by using the encoding parameter.
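As an illustration, forcing both values on the operation could look like the sketch below. It assumes the overrides are exposed as contentType and encoding attributes on amqp:consume; the chosen values are only examples.

```xml
<!-- Treat the consumed content as UTF-8 encoded JSON, regardless of the message's own properties -->
<amqp:consume config-ref="AMQP_config" queueName="#[vars.queue]"
              contentType="application/json" encoding="UTF-8"/>
```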
By default, the consume operation fails with an error if the defined queue does not exist.
When the queue has to be declared, reference a definition for the entity or define it inline so that the queue is declared.
<amqp:consume config-ref="Amqp_Config" queueName="testQueue">
    <amqp:fallback-queue-definition removalStrategy="SHUTDOWN" exchangeToBind="exchangeToBindToQueue" />
</amqp:consume>
Notice that the queue definition can also create a binding by using the exchangeToBind parameter.
The queue can also be defined as a top-level element and referenced by name:
<amqp:queue-definition name="targetQueueDefinition" exchangeToBind="testExchange" />
<amqp:consume config-ref="AMQP_Config" queueName="testQueue" fallbackQueueDefinition="targetQueueDefinition" />
To prevent fallback queue definitions from changing the AMQP topography, set the createFallbackQueue global config. See How to Avoid Changing the AMQP Topography.
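A rough sketch of that global setting follows. It assumes createFallbackQueue is a boolean attribute of the amqp:config element and that false disables the declaration of fallback queues; the connection details are left out because they depend on your broker.

```xml
<!-- Assumption: createFallbackQueue="false" stops the connector from declaring fallback queues -->
<amqp:config name="AMQP_Config" createFallbackQueue="false">
    <!-- Connection settings for your broker go here -->
</amqp:config>
```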
As stated before, each Message received consists of two parts:
The payload, containing the content of the Message.
The attributes, containing metadata regarding the Message.
This metadata has four parts that map all the information available in an AMQP Message.
Check the AMQP Reference for details regarding the Attributes structure.