<amqp:consume config-ref="AMQP_config" queueName="#[vars.queue]"/>
Consume AMQP Messages - Mule 4
The consume
operation in AMQP Connector provides the ability to consume a message at any given time in the flow, from any given AMQP queue.
Consume a Message
The syntax to consume a message from a queue is:
This operation consumes the first available message in the queue identified by the queueName
value, and converts it to an AmqpMessage
, which results in the following structure:
-
The message’s content as payload
-
The message’s metadata in the message attributes
After it is received, the message is immediately acknowledged by default. If you want to control the ACK of the message after some processing, then set ackMode
to MANUAL
.
For more information about message ACK operations, see How to Handle Acknowledgment.
Default Configuration for the Consume Operation
Because the default AMQP global configuration element <amqp:config>
is optimized for the AMQP Listener source, the configuration does not have the suitable parameters for the Consume Operation.
Using the default AMQP global configuration with the Consume Operation can return the following error:
com.mule.extensions.amqp.internal.connection.provider.GenericConnectionProvider.initialise:221 @415c7a8a ERROR
Consumer com.mule.extensions.amqp.internal.client.SingleMessageQueueingConsumer@741fd13b (amq.ctag-BsDMf9v86wa9v3e_mo1p8g) method handleDelivery for channel AMQChannel(amqp://xxxx@xxxx/xxxx,1) threw an exception for channel AMQChannel(amqp://xxxx@xxxx:5672/xxxx,1)
com.rabbitmq.client.AlreadyClosedException: channel is already closed due to clean channel shutdown; protocol method: #method<channel.close>(reply-code=200, reply-text=OK, class-id=0, method-id=0)
at com.rabbitmq.client.impl.AMQChannel.ensureIsOpen(AMQChannel.java:258)
at com.rabbitmq.client.impl.AMQChannel.transmit(AMQChannel.java:427)
at com.rabbitmq.client.impl.AMQChannel.transmit(AMQChannel.java:421)
at com.rabbitmq.client.impl.recovery.RecoveryAwareChannelN.basicReject(RecoveryAwareChannelN.java:114)
at com.rabbitmq.client.impl.recovery.AutorecoveringChannel.basicReject(AutorecoveringChannel.java:438)
at com.mule.extensions.amqp.internal.connection.channel.MuleAmqpChannel.basicReject(MuleAmqpChannel.java:383)
at com.mule.extensions.amqp.internal.client.SingleMessageQueueingConsumer.handleDelivery(SingleMessageQueueingConsumer.java:47)
at com.rabbitmq.client.impl.ConsumerDispatcher$5.run(ConsumerDispatcher.java:149)
at com.rabbitmq.client.impl.ConsumerWorkService$WorkPoolRunnable.run(ConsumerWorkService.java:104)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at org.mule.service.scheduler.internal.AbstractRunnableFutureDecorator.doRun(AbstractRunnableFutureDecorator.java:111)
at org.mule.service.scheduler.internal.RunnableFutureDecorator.run(RunnableFutureDecorator.java:54)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Usually, this occurs because the broker sends all the messages in the queue to the requester while the Consume Operation processes only the first message and discards the remaining ones. To avoid this behavior, change one or both of the following parameters depending upon your use case:
-
Clean the Maximum Wait Parameter
-
In Studio, select the Publish consume operation of your flow.
-
Clean any value for the Maximum wait parameter by leaving it empty.
If you are using the XML editor, clean any value for the
maximumWait
parameter.
-
-
Update the Prefetch Count Parameter
-
In Studio, go to the Global Elements tab.
-
Select the AMQP Config (Configuration) configuration.
-
Click Edit.
-
In the Quality of Service tab, set the Prefetch count to
1
.If you are using the XML editor, in the
quality-of-service-config
section of the AMQP global configuration element<amqp:config>
, set theprefetchCount
parameter to1
.
-
Wait for a Message
By default, the maximum wait time is configured to 10 seconds. If no message is available during the specified wait time, an AMQP:TIMEOUT
error is thrown. You can customize the wait time by configuring the maximumWait
and maximumWaitUnit
parameters.
To create an indefinite wait time for a message to arrive, set the maximumWait
value to -1
. In this case, no TIMEOUT
error is thrown.
MIME Types and Encoding
AMQP Connector is designed to auto-determine a message’s MIME type (contentType
) based on the contentType
property of the message. However, there are cases in which it cannot make this determination, and you need firsthand knowledge of the message’s content.
In such cases, you can force the content type to a particular value by using the contentType
parameter.
The same process works for encoding. By default, the connector assumes that the default encoding of Mule runtime engine matches that of the message, if no other information is provided. You can set this by using the encoding
parameter.
Declare a Queue in the Consume Operation
By default, the consume
operation fails with an AMQP:QUEUE_NOT_FOUND
error if the defined queue does not exist.
If a queue must be declared, a definition for the entity should be referenced or defined inline:
<amqp:consume config-ref="Amqp_Config" queueName="testQueue">
<amqp:fallback-queue-definition removalStrategy="SHUTDOWN" exchangeToBind="exchangeToBindToQueue" />
</amqp:consume>
Notice that in the definition of the queue a binding can be created using the parameter exchangeToBind
.
The queue can also be defined as a high level element:
<amqp:queue-definition
name="targetQueueDefinition"
exchangeToBind="testExchange" />
<amqp:consume
config-ref="AMQP_Config"
queueName="testQueue"
fallbackQueueDefinition="targetQueueDefinition">
Avoid Changing the AMQP Topography
You can set the createFallbackQueue
global config to prevent changes to the AMQP topography resulting from the definition of fallback queues, see How to Avoid Changing the AMQP Topography.
Incoming Message Metadata
As stated earlier, each message received consists of two parts:
-
The payload, containing the content of the message
-
The attributes, containing metadata regarding the message
This metadata has four parts that map all the information available in a AMQP Message:
-
Envelope
-
AckId
-
Headers
-
Properties
See the AMQP Reference for information on the structure for attributes.