Listen For New Messages
The Listener source in the AMQP connector lets you consume messages as they arrive in an AMQP queue.
The syntax to listen for new messages from a queue is:
<amqp:listener config-ref="config" queueName="targetQueue"/>
The source above listens for new messages in the queue identified by the queueName parameter, returning an AmqpMessage each time an AMQP message is available in the queue. Each AmqpMessage carries:
The message’s content as its payload.
The message’s metadata in the message attributes.
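As a minimal sketch of how the payload and attributes described above might be read inside a flow, the listener can be followed by two loggers. The flow name is hypothetical, and the way the attributes object is rendered in the log is not specified by this page.

```xml
<!-- Hypothetical flow name; config and queue names are taken from the example above -->
<flow name="logIncomingAmqpMessage">
    <!-- Source: triggers the flow once per message arriving in targetQueue -->
    <amqp:listener config-ref="config" queueName="targetQueue"/>
    <!-- The message content is available as the payload -->
    <logger level="INFO" message="#[payload]"/>
    <!-- The message metadata is available as the attributes -->
    <logger level="INFO" message="#[attributes]"/>
</flow>
```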
By default, the consumed message is ACKed only when the flow that receives it completes successfully. If an error occurs during the execution of the flow instead, the session is recovered and the message is redelivered according to the recoverStrategy.
For more information regarding a Message ACK, see How To Handle Message Acknowledgement.
When extra processing power is needed, the AMQP Listener allows you to configure the numberOfConsumers that a given listener uses to consume messages concurrently.
By default, each listener uses four consumers that consume messages concurrently, each on a separate channel. Because each consumer waits for its message to be processed before taking the next one, at most four messages can be in flight at the same time.
To increase concurrent message processing, increase the numberOfConsumers in the Listener. Each consumer uses a different channel.
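For illustration, raising the consumer count could look like the sketch below. The value 8 and the flow name are arbitrary assumptions. With this setting, up to eight messages could be in flight at once.

```xml
<!-- Hypothetical flow name; 8 is an arbitrary example value -->
<flow name="consumeWithMoreConsumers">
    <!-- Eight consumers, each on its own channel, instead of the default four -->
    <amqp:listener config-ref="config" queueName="targetQueue" numberOfConsumers="8"/>
    <logger level="INFO" message="#[payload]"/>
</flow>
```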
The AMQP connector automatically determines a message’s MIME type (contentType) based on the contentType property of the message. However, there are cases where this process is not enough, and you need first-hand knowledge of a message’s content.
In such cases, you can force the content type to a particular value.
The same process works for encoding. By default, if no other information is provided, the connector assumes that the message uses the runtime’s default encoding. You can also set the encoding explicitly.
By default, the consume operation fails with an error when the defined queue does not exist.
When the queue has to be declared, reference a definition for the entity or define it inline so that the queue is declared.
<amqp:listener config-ref="Amqp_Config" queueName="testQueue">
    <amqp:fallback-queue-definition removalStrategy="SHUTDOWN" exchangeToBind="exchangeToBind" />
</amqp:listener>
Notice that in the definition of the queue, a binding can be created using the exchangeToBind parameter.
The queue can also be defined as a top-level element:
<amqp:queue-definition name="targetQueueDefinition" exchangeToBind="testExchange" />
<amqp:listener config-ref="Amqp_Config" queueName="testQueue" fallbackQueueDefinition="targetQueueDefinition" />
You can use the createFallbackQueue global configuration parameter to prevent changes to the AMQP topology resulting from the definition of fallback queues.
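A sketch of how this might look in the global configuration follows. It assumes that createFallbackQueue is an attribute of the amqp:config element and that setting it to false disables fallback queue creation. Neither detail is confirmed by this page, so check both against the connector reference.

```xml
<!-- Assumption: createFallbackQueue is set on the global config element -->
<amqp:config name="Amqp_Config" createFallbackQueue="false">
    <!-- Connection settings omitted; use the ones from your existing configuration -->
</amqp:config>
```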
When an incoming AMQP message declares a REPLY_TO property, the AMQP Listener automatically produces a response when the message is processed successfully, that is, when no error occurs during the flow execution. In that case, when the flow completes, a response is published to the exchange specified in the processed message property. If an error occurs, no response is sent to the REPLY_TO destination until the message is processed successfully.
In case an error occurs, the recoverStrategy is applied.
By default, the recoverStrategy is set to REQUEUE, which means that the message is sent back to the queue it was consumed from, and another AMQP consumer can potentially retrieve it.
If NO_REQUEUE is set, for example:
<amqp:listener config-ref="Amqp_Config" queueName="testQueue" recoverStrategy="NO_REQUEUE">
    <amqp:fallback-queue-definition removalStrategy="SHUTDOWN" exchangeToBind="exchangeToBindToQueue" />
</amqp:listener>
the message is sent directly to the same consumer that retrieved it.
Setting NONE as the recoverStrategy does not perform any action.
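To make the error path concrete, the sketch below sets the default REQUEUE strategy explicitly and forces a failure with the Mule core raise-error component. The flow name and the APP:PROCESSING_FAILED error type are hypothetical. Because the flow always fails, the message is never ACKed and the recoverStrategy is applied on each attempt.

```xml
<!-- Hypothetical flow name and error type, for illustration only -->
<flow name="alwaysFailingConsumer">
    <!-- REQUEUE is the default; it is set explicitly here for clarity -->
    <amqp:listener config-ref="Amqp_Config" queueName="testQueue" recoverStrategy="REQUEUE"/>
    <!-- Simulates a processing failure so that the flow does not complete successfully -->
    <raise-error type="APP:PROCESSING_FAILED" description="Simulated processing failure"/>
</flow>
```

In a real flow, the failure would come from actual processing steps rather than from an unconditional raise-error, which would otherwise cause the same message to be redelivered repeatedly.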
By default, the AMQP listener receives messages from every node in a clustered environment. As in other sources, this can be modified using the primaryNodeOnly parameter:
<amqp:listener config-ref="Amqp_Config" queueName="testQueue" recoverStrategy="NO_REQUEUE" primaryNodeOnly="true" />