Anypoint MQ Subscriber Source - Mule 4
The Subscriber source in the Anypoint MQ connector enables the app to listen for new messages and consume them as they arrive at the destination. You can configure different listening strategies that enable you to tune the consumption for performance, predictability, and schedules.
Maximum Message Throughput
By default, the Subscriber source works in continuous listening mode, providing maximum throughput by consuming messages as soon as they arrive in the queue. This is called the prefetch
subscriber-type configuration. When using the prefetch
mode, the Subscriber source attempts to keep a local buffer of messages full, making them available to be dispatched to the Mule flow as soon as the app can accept them.
Due to a known issue, using prefetch mode with maxConcurrency="1" set on the flow element might result in message-processing latency.
|
For the prefetch
subscriber-type configuration, you configure the maxLocalMessages
parameter to specify the target size for a full buffer, which controls how quickly messages are taken from the queue.
<anypoint-mq:subscriber config-ref="Anypoint_MQ_Config" destination="myQueue">
</anypoint-mq:subscriber>
-
Use a larger buffer size for maximum performance, keeping in mind that the messages kept locally won’t be available for any other consumer.
When a consumer takes a message from the queue and stores it in the local buffer, the message appears as in-flight for any other consumer and prevents them from consuming it. The message remains in this in-flight state for as long as necessary for the app to begin the processing.
-
Use a smaller buffer size to process messages as they are published to the queue.
The smaller buffer restricts app-level throughput and avoids blocking competing consumers.
When using
prefetch
mode, setmaxLocalMessages
to a value greater than1
to avoid message-processing latency issues. WhenmaxLocalMessages
is set to1
, latency can occur when the connector prefetches the message but the app doesn’t consume it, and the message returns to the queue for reprocessing.
FIFO Queues and Prefetch Mode
When using the prefetch
mode, the acknowledgementTimeout
timer begins when the message is taken from the queue.
Anypoint MQ attempts to honor the expiration time specified by acknowledgementTimeout
.
However, Anypoint MQ extends the expiration timeout if the time that the message remains in the buffer reaches 80% of the timeout value.
Don’t use prefetch
mode in the following situations:
-
Applications with long processing times that use the
maxConcurrency
attribute to limit concurrent processing.These options can cause messages to remain in the buffer longer, which can result in the messages expiring and being sent back to the queue.
-
When using FIFO queues
Because Anypoint MQ can’t guarantee ordering when using
prefetch
mode, don’t use it with FIFO queues.Using
prefetch
mode for applications with long processing times can consume significant resources because Anypoint MQ monitors the ACK TTL for all messages in the local buffer and extends the TTL to avoid sending messages back to the queue.
Predictable Message Consumption
If your app requires predictable and controlled message consumption rather than maximum throughput, you can configure the Subscriber as a polling source. The Subscriber source checks for new messages in the queue at a fixed scheduling rate. Every time the poll is triggered, the Subscriber source retrieves between 1 and 10 messages to dispatch to the flow individually in order.
Messages aren’t consumed immediately. Instead, they remain in the queue until the next poll of the Subscriber source is triggered. The queue contains up to 10 messages to be retrieved. For example, if your queue has 15 messages, one poll consumes the first 10 messages and the remaining 5 messages wait for the next poll.
To configure the Subscriber as a polling source, specify the polling
subscriber-type.
The default polling
subscriber-type declaration looks like this:
<anypoint-mq:subscriber doc:name="Subscriber" config-ref="Anypoint_MQ_Config" destination="myQueue">
<anypoint-mq:subscriber-type >
<anypoint-mq:polling >
<scheduling-strategy >
<fixed-frequency />
</scheduling-strategy>
</anypoint-mq:polling>
</anypoint-mq:subscriber-type>
</anypoint-mq:subscriber>
The default polling
subscriber-type declaration attempts to retrieve 10 messages from the declared destination every 1 second, dispatching the retrieved messages to the flow individually as a MuleMessage
instance.
The resulting MuleMessage
has:
-
The message body as its payload
-
The message metadata in the message attributes
You can customize the Subscriber source polling strategy using the standard Mule runtime engine scheduling strategies:
Customized Fixed Polling
You can customize the frequency at which the Subscriber source polls for new messages by declaring a polling
subscriber-type strategy with a customized fixed-frequency
scheduling strategy.
The default fixed-frequency
scheduling strategy declaration looks like this:
<anypoint-mq:subscriber config-ref="Anypoint_MQ_Config" destination="myQueue">
<anypoint-mq:subscriber-type >
<anypoint-mq:polling>
<scheduling-strategy >
<fixed-frequency/>
</scheduling-strategy>
</anypoint-mq:polling>
</anypoint-mq:subscriber-type>
</anypoint-mq:subscriber>
Cron-Based Polling
The Anypoint MQ Subscriber can also use a cron
scheduling strategy, which enables you to schedule jobs such as "Run every minute starting at 2pm and ending at 2:59pm, every day". To use this scheduling strategy, declare a polling
subscriber-type strategy with a customized cron
scheduling strategy:
<anypoint-mq:subscriber config-ref="Anypoint_MQ_Config" destination="myQueue">
<anypoint-mq:subscriber-type >
<anypoint-mq:polling >
<scheduling-strategy >
<cron expression="0 * 14 * * ?" timeZone="America/Los_Angeles" />
</scheduling-strategy>
</anypoint-mq:polling>
</anypoint-mq:subscriber-type>
</anypoint-mq:subscriber>
Message Acknowledgment
You can acknowledge received messages automatically only if the flow execution finishes successfully or as your app consumes them, before processing them.
-
Automatic
By default, the Subscriber source uses the AUTO acknowledgment mode. With this mode, the messages that the Subscriber source retrieves are acknowledged automatically after message flow processing succeeds. This means that the Subscriber source receives a message, dispatches it to the flow, and waits to see how the message processing finishes. It executes an ACK only when the processing finishes without exceptions.
If the execution of the processing flow finishes with a propagated exception, the message is automatically not acknowledged and is returned to the queue for redelivery.
For more information, see Automatic Acknowledgment.
-
Immediate
When you use the
IMMEDIATE
acknowledgment mode, the consumed message is acknowledged (deleted from the queue) and then dispatched to the Mule flow for processing. If the message acknowledgment fails, the message is discarded. The message isn’t dispatched to the flow and remains in-flight until the acknowledgment timeout.If the app restarts between being acknowledged and dispatched, the message might not be available again because it was deleted from the queue. To prevent message deletion until after processing, use the
AUTO
orMANUAL
acknowledgment mode instead.Using the Subscriber source in Prefetch mode with the
IMMEDIATE
acknowledgment mode sometimes results in thread accumulation and an unresponsive application.For more information, see Immediate Acknowledgment.
-
Manual
When you use the MANUAL acknowledgment mode, the app logic decides when to perform the acknowledgment of the message, using the ACK or NACK sources.
To perform the manual acknowledgment, you need the value of
ackToken
provided as part of the resulting message attributes.For more information, see Manual Acknowledgment.
For information about acknowledgment timeouts, see Acknowledgment Timeout.
Circuit Breaker Capability
The Subscriber source provides circuit breaking capability, which enables you to control how the connector handles errors that occur while processing a consumed message.
For example, when connecting to an external service, you can use the circuit breaker to handle any downtime of that service. The circuit breaker allows the system to stop making requests and allows the external service to recover under a reduced load.
Using Anypoint MQ in a Mule 4 app means having a Mule flow with an MQ subscriber that consumes messages from a queue and processes them using an external service. When this service isn’t available:
-
The request fails.
-
An error results.
-
Message processing finishes as either a failure or as a custom error for handling the message, such as sending it to a dead letter queue (DLQ).
When the external service is not available, every attempt to process a message results in a failure, forcing the app to loop, consuming messages that cannot succeed. You can avoid this behavior by notifying the subscriber of the error in a way that prevents it from consuming more messages for a certain period.
Circuit Breaker Processes
The circuit breaker capability that the Subscriber source provides is bound to the error handling mechanism provided by Mule. It uses the error notification mechanism to count errors related to an external service, which is known as a circuit failure. You can bind any error to a circuit failure. For example, you can bind HTTP:TIMEOUT
, FTP:SERVICE_NOT_AVAILABLE
, or even a custom error from your app, such as ORG:EXTERNAL_ERROR
.
If a Mule flow finishes its execution with an error, the Subscriber source checks if the error is one of the onErrorTypes
that indicates an external service error, and counts consecutive occurrences until errorsThreshold
is reached.
When errorsThreshold
is reached, the circuit trips and remains open for the duration specified by tripTimeout
.
While the circuit is open, all polled or consumed messages are not acknowledged (NACK)
and Anypoint MQ halts any future operations (poll, ACK, NACK, and Publish) until tripTimeout
elapses.
After tripTimeout
elapses, the circuit breaker transitions to a new state.
If the new state is Closed, clients can consume messages on the next poll.
The behavior varies depending on whether you deploy the app to CloudHub with a single worker or multiple workers:
- Single worker
-
The worker processes messages individually from the queue and opens the circuit breaker when the number of errors reaches the value of
errorsThreshold
. - Multiple workers
-
Each worker has an associated circuit breaker and tracks the values of
errorsThreshold
andtripTimeout
individually. Each worker processes messages individually from the queue and opens the circuit breaker when the number of errors reaches the value oferrorsThreshold
set on the worker.
By default, the circuit breaking feature is disabled.
Circuit Breaker States
The circuit breaker has three states: Closed, Open, and Half Open. The behavior of the app changes based on the current state. See the Microsoft Circuit Breaker pattern for more information.
-
Closed
The starting state where the Subscriber source retrieves messages normally from MQ based on its configuration, effectively working as if the circuit breaker is not present.
-
Closed-Open Transition
When the number of failures occurs in succession during message processing, without successes, and reaches the
errorsThreshold
value, the circuit breaker trips and the circuit breaker transitions to an Open state.Messages that were already dispatched to the flow then finish processing, regardless of whether the result is success or failure.
Messages kept locally that are in-flight for the broker but haven’t been dispatched yet are not acknowledged and returned to the queue for redelivery to another consumer.
-
Open
The Subscriber source doesn’t attempt to retrieve messages, and skips the iterations silently until
tripTimeout
is reached. -
Half Open
After
tripTimeout
elapses, the Subscriber source goes to a Half Open state. In the next poll for messages, the Subscriber source retrieves a single message from the service and uses that message to check if the system has recovered before going back to the normal Closed state.When the Subscriber source successfully fetches a single message, dispatches it to the flow, and processing finishes successfully, the Subscriber source returns to normal and immediately attempts to fetch more messages.
If Mule flow processing fails with one of the expected
onErrorTypes
, the circuit goes back to an Open state and resets thetripTimeout
timer.
Configure the Circuit Breaker
You can configure a Circuit Breaker as either a Global Circuit Breaker or a Private Circuit Breaker.
Either way, the configuration parameters are the same:
-
onErrorTypes
The error types that count as a failure during the flow execution. An error occurrence counts only when the flow finishes with an error propagation. By default, all errors count as a circuit failure.
-
errorsThreshold
The number of
onErrorTypes
errors that must occur for the circuit breaker to open. -
tripTimeout
How long the circuit remains open once
errorsThreshold
is reached. -
circuitName
The name of a circuit breaker to bind to this configuration. By default, each queue has its own circuit breaker.
Global Circuit Breaker
Use a Global Circuit Breaker when you want to share the circuit state across multiple subscribers, as if subscribers are part of the same "circuit".
-
In Anypoint Studio, click the Global Elements tab in the canvas.
-
Select Create > Component Configuration > Circuit Breaker.
Figure 6. The screenshot shows the circuit breaker configuration properties in the Global Element Properties window.In the configuration wizard, populate the following fields as needed. Once the configuration is complete, you can reference this
Circuit_breaker
declaration from any Anypoint MQ Subscriber.
To reference a circuit breaker:
-
Select the Subscriber source in the canvas.
-
Click the Advanced tab.
-
Select Circuit Breaker > Global Reference and select a global circuit breaker configuration from the list.
Figure 7. The arrow shows the circuit breaker global reference in the Advanced tab of the Subscriber properties window.
Private Circuit Breaker
You declare a Private Circuit Breaker internally on a single subscriber. This circuit declaration is used only in the flow where the Subscriber source is declared, isolated from all the other circuits.
To use this configuration:
-
Select the Subscriber source in the canvas.
-
Click the Advanced tab.
-
Select Circuit breaker > Edit Inline, and then complete the fields.
Figure 8. The arrow shows Edit inline selected for the circuit breaker in the Advanced tab of the Subscriber properties window.
Circuit Breaker Examples
Circuit Configuration for a Single Subscriber
In this example, a single subscriber consumes messages from a queue and posts the messages to another service using its REST API. You can stop processing messages after 5 requests to the external service result in a timeout. Once processing stops, the Subscriber source waits for 30 seconds for the service to recover before retrying with a new message.
For this example, you need one config with these circuit breaker parameters:
<anypoint-mq:config name="Anypoint_MQ_Config">
<anypoint-mq:connection url="${providerUrl}"
clientId="${clientId}"
clientSecret="${clientSecret}"/>
</anypoint-mq:config>
<flow name="subscribe">
<anypoint-mq:subscriber config-ref="Anypoint_MQ_Config"
destination="${subscribedQueue}">
<anypoint-mq:circuit-breaker
onErrorTypes="HTTP:TIMEOUT" <!-- (1) -->
errorsThreshold="5" <!-- (2) -->
tripTimeout="30" <!-- (3) -->
tripTimeoutUnit="SECONDS"/>
</anypoint-mq:subscriber>
<http:request config-ref="RequesterConfig"
path="/external" method="POST"/> <!-- (4) -->
</flow>
(1) |
Configures the error types to trip the circuit. When an error occurs for an |
(2) |
Sets the threshold for how many consequent messages must occur to consider the circuit to be in a failure state. |
(3) |
Specifies how long to wait before resuming new message polling after the circuit breaker trips because |
(4) |
Defines the operation to throw the error expected by the |
The circuit breaker ignores all errors that aren’t listed in the onErrorTypes
parameter. In this example, the circuit breaker ignores errors such as HTTP:BAD_REQUEST
.
Share a Circuit from Different Queues
In many cases, a single common service processes messages from different queues. This example configures the circuitName
parameter to bind both subscribers to a single circuit:
<anypoint-mq:config name="Anypoint_MQ_Config">
<anypoint-mq:connection url="${providerUrl}"
clientId="${clientId}"
clientSecret="${clientSecret}"/>
</anypoint-mq:config>
<anypoint-mq:circuit-breaker
name="InvoiceProcess" <!-- (1) -->
onErrorTypes="FTP:RETRY_EXHAUSTED, HTTP:SERVICE_UNAVAILABLE" <!-- (2) -->
errorsThreshold="10"
tripTimeout="5"
tripTimeoutUnit="MINUTES"/>
<flow name="subscribe">
<anypoint-mq:subscriber destination="${reservationsQueue}"
config-ref="Anypoint_MQ_Config"
circuitBreaker="GlobalCircuit"/> <!-- (3) -->
<flow-ref name="invoiceProcess">
</flow>
<flow name="otherSubscribe">
<anypoint-mq:subscriber
destination="${paymentsQueue}"
config-ref="Anypoint_MQ_Config"
circuitBreaker="GlobalCircuit"/> <!-- (3) -->
<flow-ref name="invoiceProcess">
</flow>
<sub-flow name="invoiceProcess">
<ftp:write path="${auditFolder}" <!-- (4) -->
config-ref="ftp-config"/>
<http:request config-ref="requestConfig" <!-- (5) -->
path="/external"/>
</sub-flow>
(1) |
Sets the |
(2) |
Identifies two errors that can affect the processing of messages from the Subscriber source and passes each as a CSV list. |
(3) |
For both subscribers, references the global circuit breaker configuration. |
(4) |
Might throw several errors, but only |
(5) |
The HTTP connector might throw an |
In this scenario, both subscribers stop polling for messages as soon as the error count reaches the errorsThreshold="10"
value, counting both FTP:RETRY_EXHAUSTED
and HTTP:SERVICE_UNAVAILABLE
errors. When the tripTimeout
value elapses, one of the subscribers polls for a message and uses it to test the circuit, enabling the polling for both subscribers if the processing of that message succeeds.
FIFO Queues
FIFO queues are most suitable for single-consumer scenarios. When one consumer is accessing a message, all other consumers are blocked until the first batch is processed. No messages are delivered until all in-flight messages are acknowledged or not acknowledged.
With message groups, multiple consumers can access messages in a FIFO queue at the same time. In this case, one consumer accesses messages in a group and another consumer accesses messages in another group. Message order is preserved within each message group.
FIFO queues do not support retrieving messages by message group ID.
To preserve message processing order, set the value of maxConcurrency to 1 for flows that consume messages from a FIFO queue.
|
FIFO Queues and Clustering
FIFO queues behave the same in a clustered environment as in a non-clustered environment.
FIFO queues consume messages in the specified order. After the message is consumed, any further message processing can be distributed to other nodes. In this case, if the consumer acknowledges a message before it is fully processed, message order might be lost during message processing.
-
In an on-premises, high availability clustering environment, the Subscriber source in the Anypoint MQ connector runs on all nodes by default.
You can change the behavior to run as a primary node by selecting Primary node only in the Advanced tab.
-
In CloudHub with multiple workers, all workers are run as a primary node.
In this case, all workers running the application consume from the same FIFO queue.