Contact Free trial Login

Listen For New Messages

The Listener source in the AMQP connector provides the ability to consume messages as they arrive into an AMQP Queue.

Listen for New Messages

The syntax to listen for new messages from a queue is:

<amqp:listener config-ref="config" queueName="targetQueue"/>

The source above listens for new messages in the queue identified by the queueName parameter, returning a AmqpMessage each time a AMQP Message is available in the Queue.

The AmqpMessage has:

  • The message’s content as its payload.

  • The message’s metadata in the message attributes.

By default, the message consumed is ACKed only when the execution of the flow receiving the message completes successfully. If instead, an error occurs during the execution of the flow, the session is recovered and the message is redelivered according to the recoverStrategy.

For more information regarding a Message ACK, see How To Handle Message Acknowledgement.

Configure Message Throughput

When extra processing power is needed, the AMQP Listener allows you to configure the numberOfConsumers that a given listener uses to consume messages concurrently. By default, each listener uses four consumers that produce the messages concurrently, each using a separate channel. Since each consumer waits for a message to be processed, that means that you can have a maximum of four messages in-flight at the same time. If you need to increase the concurrent message processing, just increase the numberOfConsumers in the Listener. Each consumer uses a different channel.

MIME Types and Encoding

The AMQP connector automatically determines a message’s MIME type (contentType) based on the contentType property of the message. However, there are cases where this process is not enough, and you need first-hand knowledge of a message’s content.

In such cases, you can force that content type to a particular value by using the contentType parameter.

The same process works for encoding. By default, the connector assumes that the runtime’s default encoding matches the one in the message if no other information is provided. You can set this by using the inboundEncoding parameter.

Declare a Queue in the AMQP Listener

By default, the consume operation fails when the defined queue does not exist with an AMQP:QUEUE_NOT_FOUND error.

For cases where the queue has to be declared, a definition for the entity should be referenced or defined inline so that the queue is declared.

<amqp:listener config-ref="Amqp_Config"
	queueName="testQueue">
	<amqp:fallback-queue-definition
	  removalStrategy="SHUTDOWN"
	  exchangeToBind="echangeToBind" />
</amqp:listener>

Notice that in the definition of the queue, a binding can be created using the parameter exchangeToBind.

The queue can also be defined as a high level element:

<amqp:queue-definition
  name="targetQueueDefinition"
  exchangeToBind="testExchange" />

<amqp:listener config-ref="Amqp_Config"
  queueName="testQueue"
  fallbackQueueDefinition="targetQueueDefinition">

How to Avoid Changing the AMQP Topography

You can set the createFallbackQueue global config to prevent changes to the AMQP topography resulting from the definition of fallback queues.

Reply to Incoming Messages

When an incoming AMQP message declares a REPLY_TO property, the AMQP Listener automatically produces a response when the message is processed successfully, meaning that no error occurs during the flow execution. In that case, when the flow is completed, a response is published to the exchange specified in the processed message property. In case of error, the recover strategy is applied and a response is not sent to the REPLY_TO property until the message is successfully processed.

Recover Strategy in Case of Error

In case an error occurs, the recoverStrategy is applied. By default, the recoverStrategy is set to REQUEUE, which means that the message is sent to the queue where it was consumed from, and another AMQP consumer can potentially retrieve it.

In case NO_REQUEUE is set, for example:

<amqp:listener config-ref="Amqp_Config"
		queueName="testQueue"
		recoverStrategy="NO_REQUEUE">
	<amqp:fallback-queue-definition
		removalStrategy="SHUTDOWN"
		exchangeToBind="exchangeToBindToQueue" />
</amqp:listener>

The message is directly sent to the same consumer that retrieved it. NONE as in the defined recoverStrategy does not perform any action.

Cluster Environment

By default, the AMQP listener receives messages from every node in a clustered environment. As in other sources, this can be modified using the primaryNodeOnly attribute.

<amqp:listener config-ref="Amqp_Config"
	queueName="testQueue"
	recoverStrategy="NO_REQUEUE"
	primaryNodeOnly="true" />

See Also

Was this article helpful?

💙 Thanks for your feedback!

Edit on GitHub