
To Consume AMQP Messages

The Consume operation in the AMQP connector lets you consume a Message at any point in the flow, from any given AMQP Queue.

Consuming A Message

The syntax to consume a message from a Queue is:

<amqp:consume config-ref="AMQP_config" queueName="#[vars.queue]"/>

The operation above consumes the first available message in the Queue identified by queueName, and then converts it to an AmqpMessage with the following structure:

  • The message’s content as payload.

  • The message’s metadata in the message attributes.

Once received, the Message is ACKed immediately by default. If you want to control the ACK of the Message after some processing, set ackMode to MANUAL. For more information regarding a Message ACK, see How to handle Acknowledgement.
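
As a minimal sketch of manual acknowledgement, the flow below consumes with ackMode set to MANUAL and acknowledges only after a processing step. The flow name and the logger step are hypothetical, and the use of an amqp:ack operation fed by attributes.ackId is an assumption to confirm against How to handle Acknowledgement.

```xml
<!-- Flow name is hypothetical -->
<flow name="consumeWithManualAck">
    <!-- Consume without the default immediate ACK -->
    <amqp:consume config-ref="AMQP_config" queueName="#[vars.queue]" ackMode="MANUAL"/>

    <!-- Placeholder for whatever processing must succeed before the ACK -->
    <logger level="INFO" message="#[payload]"/>

    <!-- Assumed: the ACK operation takes the AckId exposed in the message attributes -->
    <amqp:ack ackId="#[attributes.ackId]"/>
</flow>
```

If processing fails before the ACK step is reached, the Message stays unacknowledged, which is the point of choosing MANUAL.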

Waiting for a Message

By default, the maximum wait time is 10 seconds, and an AMQP:TIMEOUT error is thrown if no message becomes available in that period. To configure a timeout that fits your use case better, customize the maximumWait and maximumWaitUnit parameters.

To wait indefinitely for a Message to arrive, set the maximumWait value to -1. In this case, no TIMEOUT error is thrown.
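
The following sketch shows one way to combine a custom wait time with handling of the resulting error. The 30-second value, the SECONDS unit name, and the logger message are assumptions chosen for illustration; try and on-error-continue are standard Mule error-handling elements.

```xml
<try>
    <!-- Wait up to 30 seconds (unit name SECONDS is assumed) instead of the default -->
    <amqp:consume config-ref="AMQP_config" queueName="#[vars.queue]"
                  maximumWait="30" maximumWaitUnit="SECONDS"/>
    <error-handler>
        <!-- Reached only when no message arrives within the wait time -->
        <on-error-continue type="AMQP:TIMEOUT">
            <logger level="WARN" message="No message arrived within the configured wait time"/>
        </on-error-continue>
    </error-handler>
</try>

<!-- Wait indefinitely: no TIMEOUT error is thrown -->
<amqp:consume config-ref="AMQP_config" queueName="#[vars.queue]" maximumWait="-1"/>
```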

Mime Types and Encoding

The AMQP Connector does its best to automatically determine a Message’s MIME type (contentType) based on the contentType property of the message. However, there are cases in which that best guess is not enough, and you need first-hand knowledge of the Message’s content.

In such cases, you can force that content type to a particular value by using the contentType parameter.

The same process works for encoding. If no other information is provided, the connector assumes that the Message uses the runtime’s default encoding. You can override this by using the encoding parameter.
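
For example, a consume operation that forces both values might look like the sketch below. The application/json and UTF-8 values are assumptions chosen for illustration; use whatever matches the content your producers actually send.

```xml
<!-- Treat the consumed content as UTF-8 encoded JSON, regardless of the message's own contentType property -->
<amqp:consume config-ref="AMQP_config" queueName="#[vars.queue]"
              contentType="application/json" encoding="UTF-8"/>
```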

Declaring a Queue in the Consume Operation

By default, the consume operation fails with an AMQP:QUEUE_NOT_FOUND error if the specified queue does not exist.

For cases where the queue has to be declared, reference a queue definition or define one inline:

<amqp:consume config-ref="Amqp_Config" queueName="testQueue">
	<amqp:fallback-queue-definition removalStrategy="SHUTDOWN" exchangeToBind="exchangeToBindToQueue" />
</amqp:consume>

Notice that the queue definition can also create a binding through the exchangeToBind parameter.

The queue can also be defined as a top-level (global) element and referenced by name:

<amqp:queue-definition name="targetQueueDefinition" exchangeToBind="testExchange" />

<amqp:consume config-ref="AMQP_Config" queueName="testQueue" fallbackQueueDefinition="targetQueueDefinition"/>

How to Avoid Changing the AMQP Topography

You can set the createFallbackQueue global config to prevent changes to the AMQP topography resulting from the definition of fallback queues. For details, see How to Avoid Changing the AMQP Topography.
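
As a rough sketch, the setting could be applied on the global configuration element as shown below. The assumption here is that createFallbackQueue is a boolean attribute of amqp:config and that false disables queue creation. Connection settings are omitted.

```xml
<!-- Assumed: false stops fallback queue definitions from declaring queues on the broker -->
<amqp:config name="AMQP_Config" createFallbackQueue="false">
    <!-- connection settings omitted -->
</amqp:config>
```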

Incoming Message Metadata

As stated before, each Message received will consist of two parts:

  • The payload, containing the content of the Message

  • The attributes, containing metadata regarding the Message

This metadata has four parts that map all the information available in an AMQP Message:

  • Envelope

  • AckId

  • Headers

  • Properties

Check the AMQP Reference for details regarding the Attributes structure.
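
To illustrate how the four parts might be read after a consume, the sketch below logs them as JSON. The field names envelope, ackId, headers, and properties are assumed lower-camel-case forms of the parts listed above. Verify them against the AMQP Reference before relying on them.

```xml
<amqp:consume config-ref="AMQP_config" queueName="#[vars.queue]"/>

<!-- Field names under attributes are assumptions based on the four parts listed above -->
<logger level="INFO" message="#[output application/json --- {
    envelope: attributes.envelope,
    ackId: attributes.ackId,
    headers: attributes.headers,
    properties: attributes.properties
}]"/>
```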
