Anypoint MQ Consume Operation - Mule 4

The Consume operation in the Anypoint MQ connector enables the app to consume a single message from a queue at any point in the flow, using the acknowledgment strategy that fits your use case.

Anypoint MQ supports consuming from queues, not message exchanges.

If an app attempts to consume from a message exchange, the Anypoint MQ REST API returns the following error: "The destination should be a queue."

Incoming Message Structure

Every incoming message consists of two parts:

  • payload - Represents the message body

  • attributes - Contains metadata for the received message, such as its ID or other properties

Mule message, including payload and message attributes

Message Consumption

The simplest way to consume a message from a queue is to drag the Consume operation from the Mule palette to the canvas and configure the Queue parameter with the name of the queue from which to retrieve the message:

Connector configuration, including destination, acknowledgment mode and timeout, and polling timeout
<anypoint-mq:consume config-ref="Anypoint_MQ_config" destination="myQueue"/>

The Consume operation in this example consumes the first available message in myQueue and converts it to a MuleMessage instance with this structure:

  • The message body as payload.

  • The message metadata in the message attributes. Attributes might include messageId or redeliveryCount.

To select the queue at runtime, use a DataWeave expression in the Queue parameter (the destination attribute in the XML) instead of a fixed queue name.
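As a minimal sketch of a dynamic queue name, the following flow reads the queue from a flow variable. The flow name, the targetQueue variable, and the ordersQueue value are hypothetical and exist only for illustration:

```xml
<flow name="consume-from-dynamic-queue">
    <!-- Hypothetical variable holding the queue name; set it however your flow requires -->
    <set-variable variableName="targetQueue" value="ordersQueue"/>

    <!-- The DataWeave expression is evaluated at runtime to pick the queue -->
    <anypoint-mq:consume config-ref="Anypoint_MQ_config"
                         destination="#[vars.targetQueue]"/>
</flow>
```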

Consumed Message Acknowledgment

  • Automatic

    By default, acknowledgementMode is IMMEDIATE, meaning that the message is acknowledged as soon as it is consumed from the queue and before it’s delivered to the Mule flow. If the message acknowledgment fails, the Consume operation terminates with an ANYPOINT-MQ:ACKING error.

    For more information, see Immediate Acknowledgment.

  • Manual

    To control acknowledgment manually, change acknowledgementMode to MANUAL. In MANUAL mode, the app logic decides when to acknowledge the message by using the Ack or Nack operation.

    To perform the manual acknowledgment, you need the value of ackToken provided as part of the resulting message attributes.

    For more information, see Manual Acknowledgment.

For information about acknowledgment timeouts, see Acknowledgment Timeout.
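The manual mode described above might look like the following sketch. It assumes that the Ack and Nack operations accept the token through an ackToken parameter, and it uses a hypothetical process-message flow to stand in for your own processing logic; check the connector reference for the exact parameter names in your connector version:

```xml
<flow name="consume-with-manual-ack">
    <anypoint-mq:consume config-ref="Anypoint_MQ_config"
                         destination="myQueue"
                         acknowledgementMode="MANUAL"/>

    <!-- Save the token before later processors replace the message attributes -->
    <set-variable variableName="ackToken" value="#[attributes.ackToken]"/>

    <try>
        <!-- Hypothetical processing step -->
        <flow-ref name="process-message"/>

        <!-- Processing succeeded: acknowledge the message -->
        <anypoint-mq:ack config-ref="Anypoint_MQ_config" ackToken="#[vars.ackToken]"/>

        <error-handler>
            <on-error-propagate>
                <!-- Processing failed: negatively acknowledge the message -->
                <anypoint-mq:nack config-ref="Anypoint_MQ_config" ackToken="#[vars.ackToken]"/>
            </on-error-propagate>
        </error-handler>
    </try>
</flow>
```

Storing the token in a variable right after Consume is a design choice in this sketch: any later operation that produces a new message can overwrite the attributes that carry ackToken.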

Empty Queue Management

When a queue has messages, the Consume operation returns immediately with one of the messages.

When a queue is empty, the operation behavior depends on the value of the pollingTime parameter:

  • No value

    By default, the Consume operation waits for 10000 milliseconds (10 seconds) for a new message to arrive when consuming from an empty queue.

    If Anypoint MQ receives no messages in that time, Consume fails with an ANYPOINT-MQ:TIMEOUT error.

  • 0

    If the value of pollingTime is 0, the Consume operation waits for the default of 10000 milliseconds (10 seconds).

    If Anypoint MQ receives no messages in that time, Consume produces a null message as the result and doesn’t produce an error.

  • 1 to 20000

    If the value of pollingTime is between 1 and 20000 milliseconds (the maximum value), Consume waits for the time specified by pollingTime for a new message to arrive.

    If Anypoint MQ receives no messages within the given time window, Consume fails with an ANYPOINT-MQ:TIMEOUT error.

  • Less than 0 or greater than 20000

    If the value of pollingTime is outside the range of 0 to 20000 milliseconds (20 seconds), Consume fails with an ANYPOINT-MQ:CONSUMING error.
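To illustrate the timeout case, the following sketch sets pollingTime to an arbitrary 5000 milliseconds and handles ANYPOINT-MQ:TIMEOUT so that an empty queue doesn't fail the flow. The flow name and the log messages are hypothetical:

```xml
<flow name="consume-with-polling-time">
    <try>
        <!-- Wait up to 5000 ms for a message; an empty queue raises ANYPOINT-MQ:TIMEOUT -->
        <anypoint-mq:consume config-ref="Anypoint_MQ_config"
                             destination="myQueue"
                             pollingTime="5000"/>
        <logger level="INFO" message="#['Consumed message ' ++ attributes.messageId]"/>

        <error-handler>
            <!-- Treat an empty queue as a normal outcome instead of a failure -->
            <on-error-continue type="ANYPOINT-MQ:TIMEOUT">
                <logger level="INFO" message="No message arrived within the polling time"/>
            </on-error-continue>
        </error-handler>
    </try>
</flow>
```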

Output Message Metadata

Each received message consists of two parts: payload and attributes. The attributes contain the metadata for the message.

This metadata maps all the information available in an Anypoint MQ message, including:

  • Message ID

  • Headers Map

  • Properties Map

For information about the attributes structure, see the Consume operation in the Anypoint MQ Connector Reference - 4.x.
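As a rough sketch of reading this metadata in a flow, the following example logs the message ID and then the headers and properties maps. The messageId field is named earlier on this page; the headers and properties field names are assumptions based on the list above, so confirm them against the connector reference:

```xml
<flow name="log-message-metadata">
    <anypoint-mq:consume config-ref="Anypoint_MQ_config" destination="myQueue"/>

    <!-- messageId is part of the message attributes -->
    <logger level="INFO" message="#['Message ID: ' ++ attributes.messageId]"/>

    <!-- Assumed field names: attributes.headers and attributes.properties -->
    <logger level="INFO"
            message="#[output application/json --- {headers: attributes.headers, properties: attributes.properties}]"/>
</flow>
```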
