Anypoint MQ Subscriber Source - Mule 4

The Subscriber source in the Anypoint MQ connector consumes messages as they arrive at the destination. You tune the subscriber behavior in the connector’s configuration.

Message Prefetch

By default, the connector configuration is optimized for maximum message throughput: message prefetch is enabled and fetchSize is set to its maximum value.

When using prefetch mode, the subscriber attempts to keep a local buffer of three times the fetchSize filled at any given time, meaning that it makes as many service calls as needed (with up to three concurrent calls) to retrieve messages and make them available to the buffer. Messages are then taken from the local buffer and dispatched to the flow for processing.

The buffer is unlikely to ever be full, because Mule accepts messages quickly and uses as many threads as possible to process them concurrently. A request to the service does not always return the maximum number of messages defined by fetchSize, so more than three requests may be necessary to fill the buffer under low load.

Prefetch can be tuned using the configuration element:

<anypoint-mq:default-subscriber-config name="defaultPrefetchConfig"
        fetchSize="5"
        fetchTimeout="1000"
        frequency="4000">
    <anypoint-mq:connection url="${providerUrl}" clientId="${clientId}"
            clientSecret="${clientSecret}"/>
</anypoint-mq:default-subscriber-config>

<flow name="prefetchedListener">
    <anypoint-mq:subscriber config-ref="defaultPrefetchConfig"
    destination="${invoiceQueue}"/>

    <!-- Message processing-->
</flow>

In this example, reducing the number of messages retrieved per API call to 5 reduces the buffer size to 15 messages (three times the fetchSize), instead of the 30 that are kept by default.
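
For comparison, here is a minimal sketch of a configuration that sets fetchSize to 10, which under the three-times rule described above gives a local buffer of up to 30 messages. The config name maxPrefetchConfig and the flow name are hypothetical, and the value 10 is inferred from the 30-message default buffer rather than taken from the connector reference.

```xml
<!-- Hypothetical config name; fetchSize="10" is inferred from the 30-message default buffer -->
<anypoint-mq:default-subscriber-config name="maxPrefetchConfig"
        fetchSize="10">
    <anypoint-mq:connection url="${providerUrl}" clientId="${clientId}"
            clientSecret="${clientSecret}"/>
</anypoint-mq:default-subscriber-config>

<flow name="maxPrefetchListener">
    <!-- Buffer target: 3 x fetchSize = 30 messages -->
    <anypoint-mq:subscriber config-ref="maxPrefetchConfig"
            destination="${invoiceQueue}"/>

    <!-- Message processing -->
</flow>
```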

Other parameters like fetchTimeout and frequency modify the behavior of the subscriber when the queue is empty and when the connector is waiting for a message to arrive.

For details on how each parameter works, see the Anypoint MQ Connector Reference - 2.x.

Buffer, Acknowledgment, and Acknowledgment Timeout

While in the buffer, messages are kept as in-flight for the broker, so they are not redelivered while they wait to be dispatched to the flow, unless the Subscriber source is stopped and the buffer is cleared. Once dispatched, a message remains in-flight until the acknowledgementTimeout elapses.

By default, a consumed message is acknowledged only when the execution of the flow receiving the message finishes successfully. If an error occurs during the execution of the flow, the session recovers and the message is redelivered.

For more information, see Anypoint MQ ACK and NACK Operations.

Poll for New Messages

To have more control over how many messages each subscriber consumes, you can use a polling configuration, which polls the service for messages at a fixed rate.

To get this behavior, disable prefetch by setting fetchSize to zero. In polling mode, the connector always attempts to fetch 10 messages per request, and requests are made at the fixed rate defined by pollingTime. No overlapping requests are executed, and the service handles only one request per poll.

The syntax to listen for new messages from a queue at a fixed rate is:

<anypoint-mq:default-subscriber-config name="pollingConfig"
                                       fetchSize="0"
                                       pollingTime="1000"
                                       acknowledgementTimeout="5000">
    <anypoint-mq:connection url="${providerUrl}"
                                      clientId="${clientId}"
                                      clientSecret="${clientSecret}"/>
</anypoint-mq:default-subscriber-config>

<flow name="pollingListener">
    <anypoint-mq:subscriber config-ref="pollingConfig" destination="${invoiceQueue}"/>

    <!-- Message processing-->
</flow>

This source tries to retrieve 10 messages once per second from the queue identified by the destination, and then dispatches each one to the flow as a Mule message. In this case, the acknowledgementTimeout of 5000 milliseconds means that each message remains in-flight for 5 seconds.

The Mule message has:

  • The message’s content as its payload.

  • The message’s metadata in the message attributes.
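
As a minimal sketch of how a flow might read both parts of the message, the following example logs the payload and the attributes with the core Logger component. The flow name loggingListener is hypothetical, and the attributes are logged as a whole because their individual fields are not listed here.

```xml
<flow name="loggingListener">
    <anypoint-mq:subscriber config-ref="pollingConfig" destination="${invoiceQueue}"/>

    <!-- The message content is available as the payload -->
    <logger level="INFO" message="#[payload]"/>

    <!-- The message metadata is available as the attributes -->
    <logger level="DEBUG" message="#[attributes]"/>
</flow>
```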

By default, a consumed message is acknowledged only when the flow that receives it finishes successfully. If an error occurs during the execution of the flow instead, the session recovers and the message is redelivered.

For more information, see Anypoint MQ ACK and NACK Operations.

Circuit Breaker Capability

Since version 2.1.0, the Anypoint MQ connector Subscriber source provides circuit breaking capability, giving you more control over how the connector deals with errors that occur while processing a consumed message.

Circuit Breaker

In any scenario in which you need to connect to a service, consider what happens when that external service fails. A common pattern for dealing with downtime of an external service is the Circuit Breaker, which lets the system stop making requests that are doomed to fail and lets the external service recover under a reduced load. The Circuit Breaker goes through three states (Closed, Open, Half Open) and changes the behavior of the app based on the current state. See the Microsoft Circuit Breaker Pattern site for more information.

In a Mule 4 app that uses Anypoint MQ, this typically means a Mule flow with an Anypoint MQ Subscriber that consumes messages from a queue and processes them using an external service. When this service is not available and the request fails, an error propagates and processing of the message finishes either as a failure or with custom error handling, such as sending the message to a DLQ. While the external service is unavailable, every attempt to process a message fails, forcing the app to loop on consuming messages that cannot succeed. This can be avoided if the subscriber is notified of the error in a way that prevents it from consuming more messages for a certain period.

Circuit Breaker Processes

The circuit breaker capability provided by the anypoint-mq:subscriber is bound to Mule’s built-in error handling mechanism. It uses error notifications to keep a count of errors related to an external service, which are known as circuit failures. You can bind any error to a circuit failure. For example, you can bind HTTP:TIMEOUT, FTP:SERVICE_NOT_AVAILABLE, or even a custom error from your app, such as ORG:EXTERNAL_ERROR.

If a Mule flow finishes its execution with an error, the subscriber checks if the error is one of the onErrorTypes that indicate an external service error, and counts consecutive occurrences until the errorsThreshold is reached. When the errorsThreshold is reached, the circuit trips and stops polling for new messages for a configurable tripTimeout duration. Messages are consumed again on the next poll after the tripTimeout elapses. By default, the circuit breaking feature is disabled.

Circuit Breaker States

  • Closed

    The starting state where the subscriber retrieves messages normally from Anypoint MQ based on its configuration, effectively working as if the Circuit Breaker is not present.

  • Closed-Open Transition

    When failures occur consecutively during message processing, with no success in between, and their count reaches the errorsThreshold value, the circuit breaker trips and transitions to the Open state.

    Messages that were already dispatched to the flow finish processing, regardless of whether the result is success or failure.

    Messages kept locally that are in-flight for the broker but haven’t been dispatched yet are not acknowledged and are returned to the queue for redelivery to another consumer.

  • Open

    The subscriber doesn’t attempt to retrieve messages, and skips the iterations silently until the tripTimeout is reached.

  • Half Open

    After the tripTimeout elapses, the subscriber goes to a Half Open state. In the next poll for messages, it retrieves a single message from the service and uses that message to check whether the system has recovered before going back to the normal Closed state.

    If that single message is successfully fetched, dispatched to the flow, and processed successfully, the subscriber goes back to the Closed state and immediately attempts to fetch more messages.

    If processing of that message fails with one of the expected onErrorTypes, the circuit goes back to the Open state and resets the tripTimeout timer.

Configure the Circuit Breaker

You can configure a Circuit Breaker as part of the anypoint-mq:default-subscriber-config.

In Anypoint Studio in the Advanced tab, enable the Circuit Breaker group and populate the following fields as needed:

  • onErrorTypes

    The error types whose occurrence during the flow execution counts as a failure in the circuit. An error occurrence counts only when the flow finishes with an error propagation. By default all errors count as a circuit failure.

  • errorsThreshold

    The number of consecutive onErrorTypes errors that must occur for the circuit breaker to open.

  • tripTimeout

    How long the circuit remains open once the errorsThreshold is reached.

  • circuitName

    The name of a circuit breaker to bind to this configuration. By default each queue has its own circuit breaker.

Circuit Configuration for a Single Subscriber

In an example scenario where there’s a single anypoint-mq:subscriber consuming messages from a queue and posting the messages to another service using its REST API, you can stop processing messages after 5 requests to the external service result in a timeout. Once that happens, the example waits for 30 seconds for the service to recover before retrying with a new message.

This scenario needs a single config with these circuit breaker parameters:

<anypoint-mq:default-subscriber-config name="ConfigWithCircuit">
    <anypoint-mq:connection url="${providerUrl}" clientId="${clientId}" clientSecret="${clientSecret}"/>
    <!-- (1) onErrorTypes, (2) errorsThreshold, (3) tripTimeout -->
    <anypoint-mq:circuit-breaker
            onErrorTypes="HTTP:TIMEOUT"
            errorsThreshold="5"
            tripTimeout="30"
            tripTimeoutUnit="SECONDS"/>
</anypoint-mq:default-subscriber-config>

<flow name="subscribe">
    <!-- (4) -->
    <anypoint-mq:subscriber config-ref="ConfigWithCircuit" destination="${subscriberQueue}"/>
    <!-- (5) -->
    <http:request config-ref="RequesterConfig" path="/external" method="POST"/>
</flow>
1 Configure the error types that trip the circuit. When one of these errors occurs errorsThreshold times in a row, polling stops.
2 Set the threshold for how many consecutive errors must occur for the circuit to be considered in a failure state.
3 Configure how long to wait before polling for new messages resumes after the circuit breaker trips.
4 On the subscriber, reference the config that contains the circuit breaker.
5 Define the operation that can throw the error expected by the onErrorTypes parameter.

Note that the circuit breaker ignores any error not listed in the onErrorTypes parameter. In this example, errors like HTTP:BAD_REQUEST are ignored.
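
If several failure modes of the external service should trip the circuit, one option is to map them to a single custom error type and list only that type in onErrorTypes. The following sketch assumes Mule’s standard error-mapping element on the HTTP request operation. The ORG:EXTERNAL_ERROR type, the ConfigWithCustomErrorCircuit config name, the flow name, and the threshold values are illustrative.

```xml
<anypoint-mq:default-subscriber-config name="ConfigWithCustomErrorCircuit">
    <anypoint-mq:connection url="${providerUrl}" clientId="${clientId}" clientSecret="${clientSecret}"/>
    <!-- Only the custom error type counts as a circuit failure -->
    <anypoint-mq:circuit-breaker
            onErrorTypes="ORG:EXTERNAL_ERROR"
            errorsThreshold="5"
            tripTimeout="30"
            tripTimeoutUnit="SECONDS"/>
</anypoint-mq:default-subscriber-config>

<flow name="subscribeWithMappedErrors">
    <anypoint-mq:subscriber config-ref="ConfigWithCustomErrorCircuit" destination="${subscriberQueue}"/>
    <http:request config-ref="RequesterConfig" path="/external" method="POST">
        <!-- Both HTTP errors are re-raised as the custom type, so both count toward errorsThreshold -->
        <error-mapping sourceType="HTTP:TIMEOUT" targetType="ORG:EXTERNAL_ERROR"/>
        <error-mapping sourceType="HTTP:SERVICE_UNAVAILABLE" targetType="ORG:EXTERNAL_ERROR"/>
    </http:request>
</flow>
```

With this mapping, an HTTP:BAD_REQUEST error still propagates under its original type and is not counted by the circuit breaker.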

Share a Circuit From Different Queues

In many cases, messages from different queues are processed using a single common service. This example configures the circuitName parameter to bind both subscribers to a single circuit:

<anypoint-mq:default-subscriber-config name="ConfigWithCircuit">
    <anypoint-mq:connection url="${providerUrl}" clientId="${clientId}" clientSecret="${clientSecret}"/>
    <!-- (1) circuitName, (2) onErrorTypes -->
    <anypoint-mq:circuit-breaker
            circuitName="InvoiceProcess"
            onErrorTypes="FTP:RETRY_EXHAUSTED, HTTP:SERVICE_UNAVAILABLE"
            errorsThreshold="10"
            tripTimeout="5"
            tripTimeoutUnit="MINUTES"/>
</anypoint-mq:default-subscriber-config>

<flow name="subscribe">
    <!-- (3) -->
    <anypoint-mq:subscriber config-ref="ConfigWithCircuit" destination="${reservationsQueue}"/>
    <flow-ref name="invoiceProcess"/>
</flow>

<flow name="otherSubscribe">
    <!-- (3) -->
    <anypoint-mq:subscriber config-ref="ConfigWithCircuit" destination="${paymentsQueue}"/>
    <flow-ref name="invoiceProcess"/>
</flow>

<sub-flow name="invoiceProcess">
    <!-- (4) -->
    <ftp:write path="${auditFolder}" config-ref="ftp-config"/>
    <!-- (5) -->
    <http:request config-ref="requestConfig" path="/external"/>
</sub-flow>
1 Set the circuitName parameter to share a common circuit breaker across multiple queues.
2 List the two different errors that can affect the processing of messages from the subscriber, passing them as a comma-separated list.
3 For both subscribers, reference the config that contains the circuit breaker configuration.
4 This component may throw an FTP:RETRY_EXHAUSTED error along with many others. Only the FTP:RETRY_EXHAUSTED error is taken into account by the circuit breaker.
5 The HTTP connector may throw an HTTP:SERVICE_UNAVAILABLE error, preventing the message from being processed.

In this scenario, both subscribers stop polling for messages as soon as the error count reaches the errorsThreshold="10" value, counting both FTP:RETRY_EXHAUSTED and HTTP:SERVICE_UNAVAILABLE errors. When the tripTimeout value elapses, one of the subscribers polls for a message and uses it to test the circuit, enabling the polling for both subscribers if the processing of that message succeeds.

FIFO Queues

FIFO queues are most suitable for single-consumer scenarios. When one consumer is accessing a message, all other consumers are blocked until the first batch is processed. No messages are delivered until all in-flight messages are acknowledged or not acknowledged.

With message groups, multiple consumers can access messages in a FIFO queue at the same time. In this case, one consumer accesses messages in a group and another consumer accesses messages in another group. Message order is preserved within each message group.

Always use a single-threaded flow configuration when consuming messages from a FIFO queue. If flow processing is not single-threaded, message processing order might be lost.
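
One way to get single-threaded processing is to limit the concurrency of the flow. The following sketch assumes the standard Mule 4 maxConcurrency flow attribute. The fifoConfig config name, the fifoListener flow name, and the ${fifoQueue} property are hypothetical.

```xml
<anypoint-mq:default-subscriber-config name="fifoConfig">
    <anypoint-mq:connection url="${providerUrl}" clientId="${clientId}"
            clientSecret="${clientSecret}"/>
</anypoint-mq:default-subscriber-config>

<!-- maxConcurrency="1" lets the flow process only one message at a time -->
<flow name="fifoListener" maxConcurrency="1">
    <anypoint-mq:subscriber config-ref="fifoConfig" destination="${fifoQueue}"/>

    <!-- Message processing -->
</flow>
```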

FIFO Queues and Clustering

FIFO queues behave the same in a clustered environment as in a non-clustered environment.

FIFO queues consume messages in the specified order. After the message is consumed, any further message processing can be distributed to other nodes. In this case, if the consumer acknowledges a message before it is fully processed, message order might be lost during message processing.

  • In an on-premises, high availability clustering environment, the Subscriber source in the Anypoint MQ connector runs on all nodes by default.

    To run the source only on the primary node, select Primary node only in the Advanced tab.

  • In CloudHub with multiple workers, each worker runs as a primary node.

    In this case, all workers running the application consume from the same FIFO queue.
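
In the XML configuration, the Primary node only option is assumed here to correspond to the primaryNodeOnly attribute that Mule 4 sources commonly expose. Verify the attribute name against the connector reference for your version. The fifoConfig config name, the flow name, and the ${fifoQueue} property are hypothetical.

```xml
<flow name="primaryNodeFifoListener" maxConcurrency="1">
    <!-- Assumed attribute: primaryNodeOnly="true" restricts the source to the primary node of the cluster -->
    <anypoint-mq:subscriber config-ref="fifoConfig" destination="${fifoQueue}"
            primaryNodeOnly="true"/>

    <!-- Message processing -->
</flow>
```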
