Contact Us 1-800-596-4880

Anypoint MQ Subscriber Source - Mule 4

The Subscriber source in the Anypoint MQ connector enables the app to listen for new messages and consume them as they arrive at the destination. You can configure different listening strategies that enable you to tune the consumption for performance, predictability, and schedules.

Maximum Message Throughput

By default, the Subscriber source works in continuous listening mode, providing maximum throughput by consuming messages as soon as they arrive in the queue. This is called the prefetch subscriber-type configuration. When using the prefetch mode, the Subscriber source attempts to keep a local buffer of messages full, making them available to be dispatched to the Mule flow as soon as the app can accept them.

Prefetch subscriber type
Figure 1. The arrow shows Subscriber type set to Prefetch in the Subscriber properties window.

For the prefetch subscriber-type configuration, you configure the maxLocalMessages parameter to specify the target size for a full buffer, which controls how quickly messages are taken from the queue.

<anypoint-mq:subscriber config-ref="Anypoint_MQ_Config" destination="myQueue">
</anypoint-mq:subscriber>
  • Use a larger buffer size for maximum performance, keeping in mind that the messages kept locally won’t be available for any other consumer.

    When a consumer takes a message from the queue and stores it in the local buffer, the message appears as in-flight for any other consumer and prevents them from consuming it. The message remains in this in-flight state for as long as necessary for the app to begin the processing.

  • Use a smaller buffer size to process messages as they are published to the queue.

    The smaller buffer restricts app-level throughput and avoids blocking competing consumers.

FIFO Queues and Prefetch Mode

When using the prefetch mode, the acknowledgementTimeout timer begins when the message is taken from the queue. Anypoint MQ attempts to honor the expiration time specified by acknowledgementTimeout. However, Anypoint MQ extends the expiration timeout if the time that the message remains in the buffer reaches 80% of the timeout value.

Don’t use prefetch mode in the following situations:

  • Applications with long processing times that use the maxConcurrency attribute to limit concurrent processing.

    These options can cause messages to remain in the buffer longer, which can result in the messages expiring and being sent back to the queue.

  • When using FIFO queues

    Because Anypoint MQ can’t guarantee ordering when using prefetch mode, don’t use it with FIFO queues.

    Using prefetch mode for applications with long processing times can consume significant resources because Anypoint MQ monitors the ACK TTL for all messages in the local buffer and extends the TTL to avoid sending messages back to the queue.

Predictable Message Consumption

If your app requires predictable and controlled message consumption rather than maximum throughput, you can configure the Subscriber as a polling source. The Subscriber source checks for new messages in the queue at a fixed scheduling rate. Every time the poll is triggered, the Subscriber source retrieves between 1 and 10 messages to dispatch to the flow individually in order.

Messages aren’t consumed immediately. Instead, they remain in the queue until the next poll of the Subscriber source is triggered. The queue contains up to 10 messages to be retrieved. For example, if your queue has 15 messages, one poll consumes the first 10 messages and the remaining 5 messages wait for the next poll.

To configure the Subscriber as a polling source, specify the polling subscriber-type.

The default polling subscriber-type declaration looks like this:

Polling subscriber type
Figure 2. The arrow shows Subscriber type set to Polling in the Subscriber properties window.
<anypoint-mq:subscriber doc:name="Subscriber" config-ref="Anypoint_MQ_Config" destination="myQueue">
  <anypoint-mq:subscriber-type >
    <anypoint-mq:polling >
      <scheduling-strategy >
        <fixed-frequency />
      </scheduling-strategy>
    </anypoint-mq:polling>
  </anypoint-mq:subscriber-type>
</anypoint-mq:subscriber>

The default polling subscriber-type declaration attempts to retrieve 10 messages from the declared destination every 1 second, dispatching the retrieved messages to the flow individually as a MuleMessage instance.

The resulting MuleMessage has:

  • The message body as its payload

  • The message metadata in the message attributes

Message output payload and attributes
Figure 3. The screenshot shows (1) Payload and (2) Attributes of the message in the Output tab.

You can customize the Subscriber source polling strategy using the standard Mule runtime engine scheduling strategies:

Customized Fixed Polling

You can customize the frequency at which the Subscriber source polls for new messages by declaring a polling subscriber-type strategy with a customized fixed-frequency scheduling strategy.

The default fixed-frequency scheduling strategy declaration looks like this:

Fixed Frequency Scheduling Strategy
Figure 4. The arrow shows Scheduling Strategy set to Fixed Frequency in the Subscriber properties window.
<anypoint-mq:subscriber config-ref="Anypoint_MQ_Config" destination="myQueue">
  <anypoint-mq:subscriber-type >
    <anypoint-mq:polling>
      <scheduling-strategy >
        <fixed-frequency/>
      </scheduling-strategy>
    </anypoint-mq:polling>
  </anypoint-mq:subscriber-type>
</anypoint-mq:subscriber>

Cron-Based Polling

The Anypoint MQ Subscriber can also use a cron scheduling strategy, which enables you to schedule jobs such as "Run every minute starting at 2pm and ending at 2:59pm, every day". To use this scheduling strategy, declare a polling subscriber-type strategy with a customized cron scheduling strategy:

Cron Scheduling Strategy
Figure 5. The arrow shows Scheduling Strategy set to Cron in the Subscriber properties window.
<anypoint-mq:subscriber config-ref="Anypoint_MQ_Config" destination="myQueue">
  <anypoint-mq:subscriber-type >
    <anypoint-mq:polling >
      <scheduling-strategy >
        <cron expression="0 * 14 * * ?" timeZone="America/Los_Angeles" />
      </scheduling-strategy>
    </anypoint-mq:polling>
  </anypoint-mq:subscriber-type>
</anypoint-mq:subscriber>

Message Acknowledgment

You can acknowledge received messages automatically only if the flow execution finishes successfully or as your app consumes them, before processing them.

  • Automatic

    By default, the Subscriber source uses the AUTO acknowledgment mode. With this mode, the messages that the Subscriber source retrieves are acknowledged automatically after message flow processing succeeds. This means that the Subscriber source receives a message, dispatches it to the flow, and waits to see how the message processing finishes. It executes an ACK only when the processing finishes without exceptions.

    If the execution of the processing flow finishes with a propagated exception, the message is automatically not acknowledged and is returned to the queue for redelivery.

    For more information, see Automatic Acknowledgment.

  • Immediate

    When you use the IMMEDIATE acknowledgment mode, the consumed message is acknowledged (deleted from the queue) and then dispatched to the Mule flow for processing. If the message acknowledgment fails, the message is discarded. The message isn’t dispatched to the flow and remains in-flight until the acknowledgment timeout.

    If the app restarts between being acknowledged and dispatched, the message might not be available again because it was deleted from the queue. To prevent message deletion until after processing, use the AUTO or MANUAL acknowledgment mode instead.

    Using the Subscriber source in Prefetch mode with the IMMEDIATE acknowledgment mode sometimes results in thread accumulation and an unresponsive application.

    For more information, see Immediate Acknowledgment.

  • Manual

    When you use the MANUAL acknowledgment mode, the app logic decides when to perform the acknowledgment of the message, using the ACK or NACK sources.

    To perform the manual acknowledgment, you need the value of ackToken provided as part of the resulting message attributes.

    For more information, see Manual Acknowledgment.

For information about acknowledgment timeouts, see Acknowledgment Timeout.

Circuit Breaker Capability

The Subscriber source provides circuit breaking capability, which enables you to control how the connector handles errors that occur while processing a consumed message.

For example, when connecting to an external service, you can use the circuit breaker to handle any downtime of that service. The circuit breaker allows the system to stop making requests and allows the external service to recover under a reduced load.

Using Anypoint MQ in a Mule 4 app means having a Mule flow with an MQ subscriber that consumes messages from a queue and processes them using an external service. When this service isn’t available:

  1. The request fails.

  2. An error results.

  3. Message processing finishes as either a failure or as a custom error for handling the message, such as sending it to a dead letter queue (DLQ).

When the external service is not available, every attempt to process a message results in a failure, forcing the app to loop, consuming messages that cannot succeed. You can avoid this behavior by notifying the subscriber of the error in a way that prevents it from consuming more messages for a certain period.

Circuit Breaker Processes

The circuit breaker capability that the Subscriber source provides is bound to the error handling mechanism provided by Mule. It uses the error notification mechanism to count errors related to an external service, which is known as a circuit failure. You can bind any error to a circuit failure. For example, you can bind HTTP:TIMEOUT, FTP:SERVICE_NOT_AVAILABLE, or even a custom error from your app, such as ORG:EXTERNAL_ERROR.

If a Mule flow finishes its execution with an error, the Subscriber source checks if the error is one of the onErrorTypes that indicates an external service error, and counts consecutive occurrences until errorsThreshold is reached.

When errorsThreshold is reached, the circuit trips and remains open for the duration specified by tripTimeout. While the circuit is open, all polled or consumed messages are not acknowledged (NACK) and Anypoint MQ halts any future operations (poll, ACK, NACK, and Publish) until tripTimeout elapses.

After tripTimeout elapses, the circuit breaker transitions to a new state. If the new state is Closed, clients can consume messages on the next poll.

The behavior varies depending on whether you deploy the app to CloudHub with a single worker or multiple workers:

Single worker

The worker processes messages individually from the queue and opens the circuit breaker when the number of errors reaches the value of errorsThreshold.

Multiple workers

Each worker has an associated circuit breaker and tracks the values of errorsThreshold and tripTimeout individually. Each worker processes messages individually from the queue and opens the circuit breaker when the number of errors reaches the value of errorsThreshold set on the worker.

By default, the circuit breaking feature is disabled.

Circuit Breaker States

The circuit breaker has three states: Closed, Open, and Half Open. The behavior of the app changes based on the current state. See the Microsoft Circuit Breaker pattern for more information.

Circuit Breaker States Diagram
  • Closed

    The starting state where the Subscriber source retrieves messages normally from MQ based on its configuration, effectively working as if the circuit breaker is not present.

  • Closed-Open Transition

    When the number of failures occurs in succession during message processing, without successes, and reaches the errorsThreshold value, the circuit breaker trips and the circuit breaker transitions to an Open state.

    Messages that were already dispatched to the flow then finish processing, regardless of whether the result is success or failure.

    Messages kept locally that are in-flight for the broker but haven’t been dispatched yet are not acknowledged and returned to the queue for redelivery to another consumer.

  • Open

    The Subscriber source doesn’t attempt to retrieve messages, and skips the iterations silently until tripTimeout is reached.

  • Half Open

    After tripTimeout elapses, the Subscriber source goes to a Half Open state. In the next poll for messages, the Subscriber source retrieves a single message from the service and uses that message to check if the system has recovered before going back to the normal Closed state.

    When the Subscriber source successfully fetches a single message, dispatches it to the flow, and processing finishes successfully, the Subscriber source returns to normal and immediately attempts to fetch more messages.

    If Mule flow processing fails with one of the expected onErrorTypes, the circuit goes back to an Open state and resets the tripTimeout timer.

Configure the Circuit Breaker

You can configure a Circuit Breaker as either a Global Circuit Breaker or a Private Circuit Breaker.

Either way, the configuration parameters are the same:

  • onErrorTypes

    The error types that count as a failure during the flow execution. An error occurrence counts only when the flow finishes with an error propagation. By default, all errors count as a circuit failure.

  • errorsThreshold

    The number of onErrorTypes errors that must occur for the circuit breaker to open.

  • tripTimeout

    How long the circuit remains open once errorsThreshold is reached.

  • circuitName

    The name of a circuit breaker to bind to this configuration. By default, each queue has its own circuit breaker.

Global Circuit Breaker

Use a Global Circuit Breaker when you want to share the circuit state across multiple subscribers, as if subscribers are part of the same "circuit".

  1. In Anypoint Studio, click the Global Elements tab in the canvas.

  2. Select Create > Component Configuration > Circuit Breaker.

    Circuit breaker configuration properties in the Global Element Properties window
    Figure 6. The screenshot shows the circuit breaker configuration properties in the Global Element Properties window.

    In the configuration wizard, populate the following fields as needed. Once the configuration is complete, you can reference this Circuit_breaker declaration from any Anypoint MQ Subscriber.

To reference a circuit breaker:

  1. Select the Subscriber source in the canvas.

  2. Click the Advanced tab.

  3. Select Circuit Breaker > Global Reference and select a global circuit breaker configuration from the list.

    Circuit breaker global reference in the Advanced tab of the Subscriber properties window
    Figure 7. The arrow shows the circuit breaker global reference in the Advanced tab of the Subscriber properties window.

Private Circuit Breaker

You declare a Private Circuit Breaker internally on a single subscriber. This circuit declaration is used only in the flow where the Subscriber source is declared, isolated from all the other circuits.

To use this configuration:

  1. Select the Subscriber source in the canvas.

  2. Click the Advanced tab.

  3. Select Circuit breaker > Edit Inline, and then complete the fields.

    Edit inline selected for the circuit breaker in the Advanced tab of the Subscriber properties window
    Figure 8. The arrow shows Edit inline selected for the circuit breaker in the Advanced tab of the Subscriber properties window.

Circuit Breaker Examples

Circuit Configuration for a Single Subscriber

In this example, a single subscriber consumes messages from a queue and posts the messages to another service using its REST API. You can stop processing messages after 5 requests to the external service result in a timeout. Once processing stops, the Subscriber source waits for 30 seconds for the service to recover before retrying with a new message.

For this example, you need one config with these circuit breaker parameters:

<anypoint-mq:config name="Anypoint_MQ_Config">
    <anypoint-mq:connection url="${providerUrl}"
    clientId="${clientId}"
    clientSecret="${clientSecret}"/>
</anypoint-mq:config>

<flow name="subscribe">
   <anypoint-mq:subscriber config-ref="Anypoint_MQ_Config"
      destination="${subscribedQueue}">
        <anypoint-mq:circuit-breaker
          onErrorTypes="HTTP:TIMEOUT"   <!-- (1) -->
          errorsThreshold="5"           <!-- (2) -->
          tripTimeout="30"              <!-- (3) -->
          tripTimeoutUnit="SECONDS"/>
   </anypoint-mq:subscriber>
    <http:request config-ref="RequesterConfig"
       path="/external" method="POST"/> <!-- (4) -->
</flow>

(1)

Configures the error types to trip the circuit. When an error occurs for an errorsThreshold amount of times, polling stops.

(2)

Sets the threshold for how many consequent messages must occur to consider the circuit to be in a failure state.

(3)

Specifies how long to wait before resuming new message polling after the circuit breaker trips because errorsThreshold is reached.

(4)

Defines the operation to throw the error expected by the onErrorTypes parameters.

The circuit breaker ignores all errors that aren’t listed in the onErrorTypes parameter. In this example, the circuit breaker ignores errors such as HTTP:BAD_REQUEST.

Share a Circuit from Different Queues

In many cases, a single common service processes messages from different queues. This example configures the circuitName parameter to bind both subscribers to a single circuit:

<anypoint-mq:config name="Anypoint_MQ_Config">
    <anypoint-mq:connection url="${providerUrl}"
       clientId="${clientId}"
       clientSecret="${clientSecret}"/>
</anypoint-mq:config>

<anypoint-mq:circuit-breaker
    name="InvoiceProcess"                    <!-- (1) -->
    onErrorTypes="FTP:RETRY_EXHAUSTED, HTTP:SERVICE_UNAVAILABLE" <!-- (2) -->
    errorsThreshold="10"
    tripTimeout="5"
    tripTimeoutUnit="MINUTES"/>

<flow name="subscribe">
    <anypoint-mq:subscriber destination="${reservationsQueue}"
       config-ref="Anypoint_MQ_Config"
        circuitBreaker="GlobalCircuit"/>     <!-- (3) -->
    <flow-ref name="invoiceProcess">
</flow>

<flow name="otherSubscribe">
    <anypoint-mq:subscriber
      destination="${paymentsQueue}"
        config-ref="Anypoint_MQ_Config"
        circuitBreaker="GlobalCircuit"/>    <!-- (3) -->
    <flow-ref name="invoiceProcess">
</flow>

<sub-flow name="invoiceProcess">
  <ftp:write path="${auditFolder}"          <!-- (4) -->
	   config-ref="ftp-config"/>
  <http:request config-ref="requestConfig"  <!-- (5) -->
	   path="/external"/>
</sub-flow>

(1)

Sets the name parameter to share a common circuit breaker on multiple queues.

(2)

Identifies two errors that can affect the processing of messages from the Subscriber source and passes each as a CSV list.

(3)

For both subscribers, references the global circuit breaker configuration.

(4)

Might throw several errors, but only FTP:RETRY_EXHAUSTED is relevant to the circuit breaker.

(5)

The HTTP connector might throw an HTTP:SERVICE_UNAVAILABLE error, preventing the message from being processed.

In this scenario, both subscribers stop polling for messages as soon as the error count reaches the errorsThreshold="10" value, counting both FTP:RETRY_EXHAUSTED and HTTP:SERVICE_UNAVAILABLE errors. When the tripTimeout value elapses, one of the subscribers polls for a message and uses it to test the circuit, enabling the polling for both subscribers if the processing of that message succeeds.

FIFO Queues

FIFO queues are most suitable for single-consumer scenarios. When one consumer is accessing a message, all other consumers are blocked until the first batch is processed. No messages are delivered until all in-flight messages are acknowledged or not acknowledged.

With message groups, multiple consumers can access messages in a FIFO queue at the same time. In this case, one consumer accesses messages in a group and another consumer accesses messages in another group. Message order is preserved within each message group.

FIFO queues do not support retrieving messages by message group ID.

To preserve message processing order, set the value of maxConcurrency to 1 for flows that consume messages from a FIFO queue.

FIFO Queues and Clustering

FIFO queues behave the same in a clustered environment as in a non-clustered environment.

FIFO queues consume messages in the specified order. After the message is consumed, any further message processing can be distributed to other nodes. In this case, if the consumer acknowledges a message before it is fully processed, message order might be lost during message processing.

  • In an on-premises, high availability clustering environment, the Subscriber source in the Anypoint MQ connector runs on all nodes by default.

    You can change the behavior to run as a primary node by selecting Primary node only in the Advanced tab.

  • In CloudHub with multiple workers, all workers are run as a primary node.

    In this case, all workers running the application consume from the same FIFO queue.

View on GitHub