Contact Us 1-800-596-4880

Consume AMQP Messages - Mule 4

The consume operation in AMQP Connector provides the ability to consume a message at any given time in the flow, from any given AMQP queue.

Consume a Message

The syntax to consume a message from a queue is:

<amqp:consume config-ref="AMQP_config" queueName="#[vars.queue]"/>

This operation consumes the first available message in the queue identified by the queueName value, and converts it to an AmqpMessage, which results in the following structure:

  • The message’s content as payload

  • The message’s metadata in the message attributes

After it is received, the message is immediately acknowledged by default. If you want to control the ACK of the message after some processing, then set ackMode to MANUAL. For more information about message ACK operations, see How to Handle Acknowledgment.

Default Configuration for the Consume Operation

Because the default AMQP global configuration element <amqp:config> is optimized for the AMQP Listener source, the configuration does not have the suitable parameters for the Consume Operation.

Using the default AMQP global configuration with the Consume Operation can return the following error:

com.mule.extensions.amqp.internal.connection.provider.GenericConnectionProvider.initialise:221 @415c7a8a ERROR
Consumer com.mule.extensions.amqp.internal.client.SingleMessageQueueingConsumer@741fd13b (amq.ctag-BsDMf9v86wa9v3e_mo1p8g) method handleDelivery for channel AMQChannel(amqp://xxxx@xxxx/xxxx,1) threw an exception for channel AMQChannel(amqp://xxxx@xxxx:5672/xxxx,1)
com.rabbitmq.client.AlreadyClosedException: channel is already closed due to clean channel shutdown; protocol method: #method<channel.close>(reply-code=200, reply-text=OK, class-id=0, method-id=0)
at com.rabbitmq.client.impl.AMQChannel.ensureIsOpen(AMQChannel.java:258)
at com.rabbitmq.client.impl.AMQChannel.transmit(AMQChannel.java:427)
at com.rabbitmq.client.impl.AMQChannel.transmit(AMQChannel.java:421)
at com.rabbitmq.client.impl.recovery.RecoveryAwareChannelN.basicReject(RecoveryAwareChannelN.java:114)
at com.rabbitmq.client.impl.recovery.AutorecoveringChannel.basicReject(AutorecoveringChannel.java:438)
at com.mule.extensions.amqp.internal.connection.channel.MuleAmqpChannel.basicReject(MuleAmqpChannel.java:383)
at com.mule.extensions.amqp.internal.client.SingleMessageQueueingConsumer.handleDelivery(SingleMessageQueueingConsumer.java:47)
at com.rabbitmq.client.impl.ConsumerDispatcher$5.run(ConsumerDispatcher.java:149)
at com.rabbitmq.client.impl.ConsumerWorkService$WorkPoolRunnable.run(ConsumerWorkService.java:104)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at org.mule.service.scheduler.internal.AbstractRunnableFutureDecorator.doRun(AbstractRunnableFutureDecorator.java:111)
at org.mule.service.scheduler.internal.RunnableFutureDecorator.run(RunnableFutureDecorator.java:54)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

Usually, this occurs because the broker sends all the messages in the queue to the requester while the Consume Operation processes only the first message and discards the remaining ones. To avoid this behavior, change one or both of the following parameters depending upon your use case:

  • Clean the Maximum Wait Parameter

    1. In Studio, select the Publish consume operation of your flow.

    2. Clean any value for the Maximum wait parameter by leaving it empty.

      If you are using the XML editor, clean any value for the maximumWait parameter.

  • Update the Prefetch Count Parameter

    1. In Studio, go to the Global Elements tab.

    2. Select the AMQP Config (Configuration) configuration.

    3. Click Edit.

    4. In the Quality of Service tab, set the Prefetch count to 1.

      If you are using the XML editor, in the quality-of-service-config section of the AMQP global configuration element <amqp:config>, set the prefetchCount parameter to 1.

Wait for a Message

By default, the maximum wait time is configured to 10 seconds. If no message is available during the specified wait time, an AMQP:TIMEOUT error is thrown. You can customize the wait time by configuring the maximumWait and maximumWaitUnit parameters.

To create an indefinite wait time for a message to arrive, set the maximumWait value to -1. In this case, no TIMEOUT error is thrown.

MIME Types and Encoding

AMQP Connector is designed to auto-determine a message’s MIME type (contentType) based on the contentType property of the message. However, there are cases in which it cannot make this determination, and you need firsthand knowledge of the message’s content.

In such cases, you can force the content type to a particular value by using the contentType parameter.

The same process works for encoding. By default, the connector assumes that the default encoding of Mule runtime engine matches that of the message, if no other information is provided. You can set this by using the encoding parameter.

Declare a Queue in the Consume Operation

By default, the consume operation fails with an AMQP:QUEUE_NOT_FOUND error if the defined queue does not exist.

If a queue must be declared, a definition for the entity should be referenced or defined inline:

<amqp:consume config-ref="Amqp_Config" queueName="testQueue">
	<amqp:fallback-queue-definition removalStrategy="SHUTDOWN" exchangeToBind="exchangeToBindToQueue" />
</amqp:consume>

Notice that in the definition of the queue a binding can be created using the parameter exchangeToBind.

The queue can also be defined as a high level element:

<amqp:queue-definition
	name="targetQueueDefinition"
	exchangeToBind="testExchange" />

<amqp:consume
	config-ref="AMQP_Config"
	queueName="testQueue"
	fallbackQueueDefinition="targetQueueDefinition">

Avoid Changing the AMQP Topography

You can set the createFallbackQueue global config to prevent changes to the AMQP topography resulting from the definition of fallback queues, see How to Avoid Changing the AMQP Topography.

Incoming Message Metadata

As stated earlier, each message received consists of two parts:

  • The payload, containing the content of the message

  • The attributes, containing metadata regarding the message

This metadata has four parts that map all the information available in a AMQP Message:

  • Envelope

  • AckId

  • Headers

  • Properties

See the AMQP Reference for information on the structure for attributes.

View on GitHub