Contact Free trial Login

Anypoint MQ Subscriber Source - Mule 4

The Subscriber source in the Anypoint MQ connector enables the app to listen for new messages and consume them as they arrive at the destination. You can configure different listening strategies that enable you to tune the consumption for performance, predictability, and schedules.

Maximum Message Throughput

By default, the Subscriber source works in continuous listening mode, providing maximum throughput by consuming messages as soon as they arrive in the queue. This is called the prefetch subscriber-type configuration. When using the prefetch mode, the Subscriber source attempts to keep a local buffer of messages full, making them available to be dispatched to the Mule flow as soon as the app can accept them.

For subscriber-type, you configure the maxLocalMessages parameter to specify the target size for a full buffer, which controls how quickly messages are taken from the queue.

Anypoint MQ Subscriber With Prefetch
<anypoint-mq:subscriber doc:name="Subscriber"
     config-ref="Anypoint_MQ_Config" destination="myQueue">
  <anypoint-mq:subscriber-type>
      <anypoint-mq:prefetch maxLocalMessages="30"/>
  </anypoint-mq:subscriber-type>
</anypoint-mq:subscriber>
  • Use a larger buffer size for maximum performance, keeping in mind that the messages kept locally won’t be available for any other consumer.

    When a consumer takes a message from the queue and stores it in the local buffer, the message appears as in-flight for any other consumer and prevents them from consuming it. The message remains in this in-flight state for as long as necessary for the app to begin the processing.

  • Use a smaller buffer size to process messages as they are published to the queue.

    The smaller buffer restricts app-level throughput and avoids blocking competing consumers.

When using the prefetch mode, the acknowledgementTimeout timer begins from the moment the message is dispatched to the flow, not when it is taken from the queue. The time the message remains in the local buffer doesn’t count as part of the timeout.

Predictable Message Consumption

If your app requires predictable and controlled message consumption rather than maximum throughput, you can configure Subscriber as a polling source. The Subscriber source checks for new messages in the queue at a fixed scheduling rate. Every time the poll is triggered, the Subscriber source retrieves up to 10 messages to be dispatched to the flow in order, one by one. To do so, configure polling in subscriber-type.

Messages aren’t consumed immediately. Instead, they remain in the queue until the next poll of the Subscriber source is triggered. The queue contains up to 10 messages to be retrieved. For example, if your queue has 15 messages, one poll consumes the first 10 messages and the remaining 5 messages wait for the next poll.

The most basic subscriber declaration looks like this:

Anypoint MQ Default Subscriber
<anypoint-mq:subscriber destination="myQueue" config-ref="Anypoint_MQ_Config"/>

This subscriber declaration attempts to retrieve 10 messages from the declared destination every 1 second, dispatching the retrieved messages to the flow one-by-one as a MuleMessage instance.

The resulting MuleMessage has:

  • The message body as its payload.

  • The message metadata in the message attributes.

Anypoint MQ Subscriber Output Message

You can customize the Subscriber source polling strategy using the standard Mule runtime engine scheduling strategies:

Customized Fixed Polling

You can customize the frequency at which the Subscriber source polls for new messages by declaring an anypoint-mq:polling anypoint-mq:subscriber-type strategy with a customized fixed-frequency scheduling strategy:

Anypoint MQ Subscriber With Custom Fixed Frequency
<anypoint-mq:subscriber destination="myQueue"
   config-ref="Anypoint_MQ_Config">
    <anypoint-mq:subscriber-type>
        <anypoint-mq:polling>
            <scheduling-strategy>
                <fixed-frequency frequency="2000"
                 startDelay="100"/>
            </scheduling-strategy>
        </anypoint-mq:polling>
    </anypoint-mq:subscriber-type>
</anypoint-mq:subscriber>

Cron-Based Polling

The Anypoint MQ Subscriber can also use a cron scheduling strategy, which enables you to schedule jobs such as "Run every minute starting at 2pm and ending at 2:59pm, every day". To use this scheduling strategy, declare an anypoint-mq:polling anypoint-mq:subscriber-type strategy with a customized cron scheduling strategy:

Anypoint MQ Subscriber With Custom Cron Strategy
<anypoint-mq:subscriber destination="myQueue"
    config-ref="Anypoint_MQ_Config">
    <anypoint-mq:subscriber-type>
        <anypoint-mq:polling>
            <scheduling-strategy>
                <cron expression="0 * 14 * * ?"
                timeZone="America/Los_Angeles"/>
            </scheduling-strategy>
        </anypoint-mq:polling>
    </anypoint-mq:subscriber-type>
</anypoint-mq:subscriber>

Message Acknowledgment

You can acknowledge received messages automatically only if the flow execution finishes successfully or as your app consumes them, before processing them.

  • Automatic

    By default, the Subscriber source uses the AUTO acknowledgment mode. With this mode, the messages that the Subscriber source retrieves are acknowledged automatically after message flow processing succeeds. This means that the Subscriber source receives a message, dispatches it to the flow, and waits to see how the message processing finishes. It executes an ACK only when the processing finishes without exceptions.

    If the execution of the processing flow finishes with a propagated exception, the message is automatically not acknowledged and is returned to the queue for redelivery.

    For more information, see Automatic Acknowledgment.

  • Immediate

    When you use the IMMEDIATE acknowledgment mode, the consumed message is acknowledged right before being dispatched to the Mule flow. If the message acknowledgment fails, the message is discarded. The message isn’t dispatched to the flow and remains in-flight until the acknowledgment timeout.

    For more information, see Manual Acknowledgment.

  • Manual

    When you use the MANUAL acknowledgment mode, the app logic decides when to perform the acknowledgment of the message, using the ACK or NACK sources.

    To perform the manual acknowledgment, you need the value of ackToken provided as part of the resulting message attributes.

    For more information, see Manual Acknowledgment.

For information about acknowledgment timeouts, see Acknowledgment Timeout.

Circuit Breaker Capability

The Subscriber source provides circuit breaking capability, which enables you to control how the connector handles errors that occur while processing a consumed message.

For example, when connecting to an external service, you can use the circuit breaker to handle any downtime of that service. The circuit breaker allows the system to stop making requests and allows the external service to recover under a reduced load.

Using Anypoint MQ in a Mule 4 app means having a Mule flow with an MQ subscriber that consumes messages from a queue and processes them using an external service. When this service isn’t available:

  1. The request fails.

  2. An error results.

  3. Message processing finishes as either a failure or as a custom error for handling the message, such as sending it to a dead letter queue (DLQ).

When the external service is not available, every attempt to process a message results in a failure, forcing the app to loop, consuming messages that cannot succeed. You can avoid this behavior by notifying the subscriber of the error in a way that prevents it from consuming more messages for a certain period.

Circuit Breaker Processes

The circuit breaker capability that the Subscriber source provides is bound to the error handling mechanism provided by Mule. It uses the error notification mechanism to count errors related to an external service, which is known as a circuit failure. You can bind any error to a circuit failure. For example, you can bind HTTP:TIMEOUT, FTP:SERVICE_NOT_AVAILABLE, or even a custom error from your app, such as ORG:EXTERNAL_ERROR.

If a Mule flow finishes its execution with an error, the Subscriber source checks if the error is one of onErrorTypes that indicates an external service error, and counts consecutive occurrences until errorsThreshold is reached. When errorsThreshold is reached, the circuit trips and stops polling for new messages the duration specified by tripTimeout. Messages are consumed again on the next poll after the tripTimeout elapses.

By default, the circuit breaking feature is disabled.

Circuit Breaker States

The circuit breaker has three states: Closed, Open, and Half Open. The behavior of the app changes based on the current state. See the Microsoft Circuit Breaker pattern for more information.

Circuit Breaker States Diagram
  • Closed

    The starting state where the Subscriber source retrieves messages normally from MQ based on its configuration, effectively working as if the circuit breaker is not present.

  • Closed-Open Transition

    When the number of failures occurs in succession during message processing, without successes, and reaches the errorsThreshold value, the circuit breaker trips and the circuit breaker transitions to an Open state.

    Messages that were already dispatched to the flow then finish processing, regardless of whether the result is success or failure.

    Messages kept locally that are in-flight for the broker but haven’t been dispatched yet are not acknowledged and returned to the queue for redelivery to another consumer.

  • Open

    The Subscriber source doesn’t attempt to retrieve messages, and skips the iterations silently until tripTimeout is reached.

  • Half Open

    After tripTimeout elapses, the Subscriber source goes to a Half Open state. In the next poll for messages, the Subscriber source retrieves a single message from the service and uses that message to check if the system has recovered before going back to the normal Closed state.

    When the Subscriber source successfully fetches a single message, dispatches it to the flow, and processing finishes successfully, the Subscriber source returns to normal and immediately attempts to fetch more messages.

    If Mule flow processing fails with one of the expected onErrorTypes, the circuit goes back to an Open state and resets the tripTimeout timer.

Configure the Circuit Breaker

You can configure a Circuit Breaker as either a Global Circuit Breaker or a Private Circuit Breaker.

Either way, the configuration parameters are the same:

  • onErrorTypes

    The error types that count as a failure during the flow execution. An error occurrence counts only when the flow finishes with an error propagation. By default, all errors count as a circuit failure.

  • errorsThreshold

    The number of onErrorTypes errors that must occur for the circuit breaker to open.

  • tripTimeout

    How long the circuit remains open once errorsThreshold is reached.

  • circuitName

    The name of a circuit breaker to bind to this configuration. By default, each queue has its own circuit breaker.

Global Circuit Breaker

Use a Global Circuit Breaker when you want to share the circuit state across multiple subscribers, as if subscribers are part of the same "circuit".

  1. In Anypoint Studio, click the Global Elements tab in the canvas.

  2. Select Create > Component Configuration > Circuit Breaker.

amq 3x cb global canvas

In the configuration wizard, populate the following fields as needed. Once the configuration is complete, you can reference this Circuit_breaker declaration from any Anypoint MQ Subscriber.

To reference a circuit breaker:

  1. Select the Subscriber source in the canvas.

  2. Click the Advanced tab.

  3. Select Circuit Breaker > Global Reference, and then choose one from the list.

amq 3x cb global ref

Private Circuit Breaker

You declare a Private Circuit Breaker internally on a single subscriber. This circuit declaration is used only in the flow where the Subscriber source is declared, isolated from all the other circuits.

To use this configuration:

  1. Select the Subscriber source in the canvas.

  2. Click the Advanced tab.

  3. Select Circuit breaker > Edit Inline, and then choose one from the list.

amq 3x cb inline

Circuit Breaker Examples

Circuit Configuration for a Single Subscriber

In this example, a single subscriber consumes messages from a queue and posts the messages to another service using its REST API. You can stop processing messages after 5 requests to the external service result in a timeout. Once processing stops, the Subscriber source waits for 30 seconds for the service to recover before retrying with a new message.

For this example, you need one config with these circuit breaker parameters:

<anypoint-mq:config name="Anypoint_MQ_Config">
    <anypoint-mq:connection url="${providerUrl}"
    clientId="${clientId}"
    clientSecret="${clientSecret}"/>
</anypoint-mq:config>

<flow name="subscribe">
   <anypoint-mq:subscriber config-ref="Anypoint_MQ_Config"
      destination="${subscribedQueue}">
        <anypoint-mq:circuit-breaker
          onErrorTypes="HTTP:TIMEOUT"   <!-- (1) -->
          errorsThreshold="5"           <!-- (2) -->
          tripTimeout="30"              <!-- (3) -->
          tripTimeoutUnit="SECONDS"/>
   </anypoint-mq:subscriber>
    <http:request config-ref="RequesterConfig"
       path="/external" method="POST"/> <!-- (4) -->
</flow>

(1)

Configures the error types to trip the circuit. When an error occurs for an errorsThreshold amount of times, polling stops.

(2)

Sets the threshold for how many consequent messages must occur to consider the circuit to be in a failure state.

(3)

Specifies how long to wait before resuming new message polling after the circuit breaker trips because errorsThreshold is reached.

(4)

Defines the operation to throw the error expected by the onErrorTypes parameters.

The circuit breaker ignores all errors that aren’t listed in the onErrorTypes parameter. In this example, the circuit breaker ignores errors such as HTTP:BAD_REQUEST.

Share a Circuit from Different Queues

In many cases, a single common service processes messages from different queues. This example configures the circuitName parameter to bind both subscribers to a single circuit:

<anypoint-mq:config name="Anypoint_MQ_Config">
    <anypoint-mq:connection url="${providerUrl}"
       clientId="${clientId}"
       clientSecret="${clientSecret}"/>
</anypoint-mq:config>

<anypoint-mq:circuit-breaker
    name="InvoiceProcess"                    <!-- (1) -->
    onErrorTypes="FTP:RETRY_EXHAUSTED, HTTP:SERVICE_UNAVAILABLE" <!-- (2) -->
    errorsThreshold="10"
    tripTimeout="5"
    tripTimeoutUnit="MINUTES"/>

<flow name="subscribe">
    <anypoint-mq:subscriber destination="${reservationsQueue}"
       config-ref="Anypoint_MQ_Config"
        circuitBreaker="GlobalCircuit"/>     <!-- (3) -->
    <flow-ref name="invoiceProcess">
</flow>

<flow name="otherSubscribe">
    <anypoint-mq:subscriber
      destination="${paymentsQueue}"
        config-ref="Anypoint_MQ_Config"
        circuitBreaker="GlobalCircuit"/>    <!-- (3) -->
    <flow-ref name="invoiceProcess">
</flow>

<sub-flow name="invoiceProcess">
  <ftp:write path="${auditFolder}"          <!-- (4) -->
	   config-ref="ftp-config"/>
  <http:request config-ref="requestConfig"  <!-- (5) -->
	   path="/external"/>
</sub-flow>

(1)

Sets the name parameter to share a common circuit breaker on multiple queues.

(2)

Identifies two errors that can affect the processing of messages from the Subscriber source and passes each as a CSV list.

(3)

For both subscribers, references the global circuit breaker configuration.

(4)

Might throw several errors, but only FTP:RETRY_EXHAUSTED is relevant to the circuit breaker.

(5)

The HTTP connector might throw an HTTP:SERVICE_UNAVAILABLE error, preventing the message from being processed.

In this scenario, both subscribers stop polling for messages as soon as the error count reaches the errorsThreshold="10" value, counting both FTP:RETRY_EXHAUSTED and HTTP:SERVICE_UNAVAILABLE errors. When the tripTimeout value elapses, one of the subscribers polls for a message and uses it to test the circuit, enabling the polling for both subscribers if the processing of that message succeeds.

FIFO Queues

FIFO queues are most suitable for single-consumer scenarios. When one consumer is accessing a message, all other consumers are blocked until the first batch is processed. No messages are delivered until all in-flight messages are acknowledged or not acknowledged.

With message groups, multiple consumers can access messages in a FIFO queue at the same time. In this case, one consumer accesses messages in a group and another consumer accesses messages in another group. Message order is preserved within each message group.

Always use a single-thread flow configuration when consuming messages from a FIFO queue. If flow processing is not single-threaded, messages processing order might be lost.

FIFO Queues and Clustering

FIFO queues behave the same in a clustered environment as in a nonclustered environment.

FIFO queues consume messages in the specified order. After the message is consumed, any further message processing can be distributed to other nodes. In this case, if the consumer acknowledges a message before it is fully processed, message order might be lost during message processing.

  • In an on-premises, high availability clustering environment, the Subscriber source in the Anypoint MQ connector runs on all nodes by default.

    You can change the behavior to run as a primary node by selecting Primary node only in the Advanced tab.

  • In CloudHub with multiple workers, all workers are run as a primary node.

    In this case, all workers running the application consume from the same FIFO queue.

We use cookies to make interactions with our websites and services easy and meaningful, to better understand how they are used and to tailor advertising. You can read more and make your cookie choices here. By continuing to use this site you are giving us your consent to do this.