Listen For New Messages - Mule 4
The Listener source in Anypoint Connector for AMQP (AMQP Connector) provides the ability to consume messages as they arrive in an AMQP queue.
Listen for New Messages
The syntax to listen for new messages from a queue is:
<amqp:listener config-ref="config" queueName="targetQueue"/>
The source listens for new messages in the queue identified by the queueName parameter, returning an AmqpMessage each time an AMQP message is available in the queue.
The AmqpMessage has:
- The message’s content as its payload.
- The message’s metadata in the message attributes.
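As a minimal sketch of how a flow might read both parts of the AmqpMessage, the fragment below logs the payload and then the attributes. The flow name and the Amqp_Config configuration name are illustrative assumptions, and the fragment is assumed to sit inside a Mule configuration file that already declares the amqp namespace.

```xml
<flow name="logIncomingAmqpMessageFlow">
    <!-- Source: triggers the flow for each message that arrives in targetQueue -->
    <amqp:listener config-ref="Amqp_Config" queueName="targetQueue"/>
    <!-- The message content is exposed as the payload -->
    <logger level="INFO" message="#[payload]"/>
    <!-- The message metadata is exposed as the attributes -->
    <logger level="INFO" message="#[attributes]"/>
</flow>
```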
By default, the consumed message is ACKed only when the flow that receives it completes successfully. If an error occurs during the execution of the flow instead, the session is recovered and the message is redelivered according to the recoverStrategy.
For more information regarding a Message ACK, see How To Handle Message Acknowledgement.
Configure Message Throughput
When extra processing power is needed, the AMQP Listener allows you to configure the numberOfConsumers that a given listener uses to consume messages concurrently.
By default, each listener uses four consumers that deliver messages to the flow concurrently, each on a separate channel. Because each consumer waits for its message to be processed before taking the next one, at most four messages can be in flight at the same time.
To increase concurrent message processing, increase the numberOfConsumers in the Listener. Each consumer uses a different channel.
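For illustration, a listener that raises the limit from the default of four to eight in-flight messages might look like the fragment below. The value 8 and the Amqp_Config configuration name are assumptions chosen for the example, not recommendations.

```xml
<!-- Eight consumers, each on its own channel, so up to eight messages can be processed at once -->
<amqp:listener config-ref="Amqp_Config"
               queueName="targetQueue"
               numberOfConsumers="8"/>
```

Because every consumer opens its own channel, a higher value also means more channels on the broker connection, so it is worth increasing the number gradually.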
MIME Types and Encoding
The AMQP connector automatically determines a message’s MIME type (contentType) based on the contentType property of the message. However, in some cases this is not enough, and you need first-hand knowledge of the message’s content.
In such cases, you can force the content type to a particular value by using the contentType parameter.
The same applies to encoding. If no other information is provided, the connector assumes that the message uses the runtime’s default encoding. You can override this by using the inboundEncoding parameter.
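As a sketch of forcing the content type, the fragment below treats every message from the queue as JSON regardless of the contentType property it carries. The MIME type and the Amqp_Config configuration name are assumptions for the example. Confirm the exact attribute names for content type and encoding against the connector reference for your connector version.

```xml
<!-- Treat every consumed message as JSON, whatever contentType the message declares -->
<amqp:listener config-ref="Amqp_Config"
               queueName="targetQueue"
               contentType="application/json"/>
```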
Declare a Queue in the AMQP Listener
By default, the consume operation fails with an AMQP:QUEUE_NOT_FOUND error when the defined queue does not exist.
For cases where the queue has to be declared, reference a queue definition or define one inline so that the queue is declared.
<amqp:listener config-ref="Amqp_Config"
               queueName="testQueue">
    <amqp:fallback-queue-definition
        removalStrategy="SHUTDOWN"
        exchangeToBind="exchangeToBind" />
</amqp:listener>
Notice that in the definition of the queue, a binding can be created using the parameter exchangeToBind.
The queue can also be defined as a top-level element:
<amqp:queue-definition
    name="targetQueueDefinition"
    exchangeToBind="testExchange" />
<amqp:listener config-ref="Amqp_Config"
               queueName="testQueue"
               fallbackQueueDefinition="targetQueueDefinition" />
How to Avoid Changing the AMQP Topology
You can set the createFallbackQueue parameter in the global configuration to prevent changes to the AMQP topology resulting from the definition of fallback queues.
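A global configuration that disables fallback queue creation might look like the sketch below. It assumes that createFallbackQueue is a boolean attribute of the configuration element and that false turns the behavior off. The connection values (host, username, password) are placeholders, not values from this page.

```xml
<!-- Assumption: createFallbackQueue="false" stops the connector from declaring missing queues -->
<amqp:config name="Amqp_Config" createFallbackQueue="false">
    <!-- Placeholder connection settings for a local broker -->
    <amqp:connection host="localhost" username="guest" password="guest"/>
</amqp:config>
```

With this setting in place, a listener on a queue that does not exist is expected to fail with the AMQP:QUEUE_NOT_FOUND error described earlier, even when a fallback queue definition is present.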
Reply to Incoming Messages
When an incoming AMQP message declares a REPLY_TO property, the AMQP Listener automatically produces a response once the message is processed successfully, that is, when no error occurs during the flow execution. In that case, when the flow completes, a response is published to the exchange specified in the REPLY_TO property of the processed message. If an error occurs, the recover strategy is applied, and no response is sent until the message is processed successfully.
Recover Strategy in Case of Error
In case an error occurs, the recoverStrategy is applied.
By default, the recoverStrategy is set to REQUEUE, which means that the message is returned to the queue from which it was consumed, and another AMQP consumer can potentially retrieve it.
In case NO_REQUEUE is set, for example:
<amqp:listener config-ref="Amqp_Config"
               queueName="testQueue"
               recoverStrategy="NO_REQUEUE">
    <amqp:fallback-queue-definition
        removalStrategy="SHUTDOWN"
        exchangeToBind="exchangeToBindToQueue" />
</amqp:listener>
The message is sent directly to the same consumer that retrieved it.
Setting the recoverStrategy to NONE does not perform any action.
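To see the recover strategy in action, a flow can fail on purpose. The sketch below sets the default REQUEUE strategy explicitly and uses the Mule core raise-error component to simulate a processing failure. The flow name and the APP:PROCESSING_FAILED error type are hypothetical names chosen for the example.

```xml
<flow name="requeueOnErrorFlow">
    <!-- REQUEUE is the default, set explicitly here for clarity -->
    <amqp:listener config-ref="Amqp_Config"
                   queueName="testQueue"
                   recoverStrategy="REQUEUE"/>
    <!-- Simulated failure: the flow ends with an error, so the message is not ACKed -->
    <raise-error type="APP:PROCESSING_FAILED"
                 description="Simulated processing failure"/>
</flow>
```

This flow fails for every message, so under REQUEUE the same message would likely be redelivered over and over. It is meant only to demonstrate the behavior. In a real flow the failure would be conditional.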
Cluster Environment
By default, the AMQP listener receives messages on every node in a clustered environment. As in other sources, this can be modified using the primaryNodeOnly attribute.
<amqp:listener config-ref="Amqp_Config"
               queueName="testQueue"
               recoverStrategy="NO_REQUEUE"
               primaryNodeOnly="true" />