Until Successful Scope
The Until Successful scope processes messages through its processors until the entire operation succeeds. By default, Until Successful’s processing occurs asynchronously from the main flow. After passing a message into the Until Successful scope, the main flow immediately regains control of the thread. However, you can configure Until Successful to run synchronously relative to the main flow.
Until Successful repeatedly retries processing a message when it attempts to complete an activity such as:
- Dispatching to outbound endpoints, for example, when calling a remote web service that may have availability issues.
- Executing a component method, for example, when executing on a Spring bean that may depend on unreliable resources.
- A sub-flow execution, to keep re-executing several actions until they all succeed.
- Any other message processor execution, to allow more complex scenarios.
The Until Successful component requires an object store to work. However, Anypoint Studio doesn’t enforce this. If an object store is not declared, Studio throws an InitialisationException. For more information, see Until Successful and Object Store. The object store needs to be exclusive for each Until Successful instance.
Success and Failure
Because this scope continues to process messages until it succeeds, it is important to understand the definition of successful in the context of Mule message processing.
Process Result | Conditions that determine the result |
---|---|
FAILURE | A message processor within the Until Successful scope throws an exception or contains an exception payload. Also, if an expression is provided in the failureExpression attribute and it evaluates to true. |
SUCCESS | None of the message processors within the Until Successful scope throw any exceptions or contain an exception payload, or they do not return any message at all (that is, the flow ends in a one-way outbound endpoint). |
conditional | Where you have configured a failure expression (see below), Mule evaluates the return message against the expression to dynamically determine if the action has failed or succeeded. |
Basic Attributes
The following table provides a description of the main configurable attributes of the until-successful element:
Attribute | Description |
---|---|
objectStore-ref | Reference to the ListableObjectStore used to persist messages pending (re)processing. |
maxRetries | Specifies the maximum number of retries that are attempted. |
millisBetweenRetries | Specifies the minimum interval between two attempts to process, in milliseconds. The actual interval depends on the previous execution, but should not exceed twice this number. The default value is 60000 milliseconds (one minute). |
failureExpression | Specifies an expression that, when evaluated to true, determines that the processing of one route was a failure. If no expression is provided, only an exception is treated as a processing failure. |
ackExpression | Specifies an expression whose evaluated result is returned as the synchronous response of Until Successful. |
deadLetterQueue-ref | The endpoint or message processor to which undeliverable messages are sent after all retries have been executed unsuccessfully. For more information, see Configuring a Dead Letter Queue. |
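As a rough sketch of how these attributes fit together, the following fragment sets each of them on a single asynchronous scope. The names objectStore, dlqChannel, HTTP_Request_Configuration, and the path orders are placeholders chosen for illustration, and the fragment assumes a Mule 3 runtime that accepts millisBetweenRetries (other examples on this page use secondsBetweenRetries instead).

```xml
<!-- Hypothetical names: objectStore, dlqChannel, HTTP_Request_Configuration, orders -->
<!-- Retries up to 5 times, at least 30 seconds apart; any status other than 202 counts as a failure -->
<until-successful objectStore-ref="objectStore"
                  maxRetries="5"
                  millisBetweenRetries="30000"
                  failureExpression="#[message.inboundProperties['http.status'] != 202]"
                  ackExpression="#[message.correlationId]"
                  deadLetterQueue-ref="dlqChannel"
                  doc:name="Until Successful">
    <!-- The processor that is retried until it succeeds -->
    <http:request config-ref="HTTP_Request_Configuration" path="orders"
                  method="POST" doc:name="HTTP"/>
</until-successful>
```

In this sketch the caller receives the correlation ID as the immediate response, while messages that still fail after the last retry go to the dlqChannel endpoint instead of being dropped.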
Configuring failureExpression
If the scope fails, a RetryPolicyExhaustedException is created, wrapped as a MessagingException, and passed to the exception handler of the flow that contains the until-successful element.
The following illustrates how to configure the failureExpression evaluated by an Until Successful scope:
<until-successful objectStore-ref="objectStore"
failureExpression="#[message.inboundProperties['http.status'] != 202]"
maxRetries="6" secondsBetweenRetries="600">
<http:request config-ref="HTTP_Request_Configuration" path="flakey"
method="POST" doc:name="HTTP"/>
</until-successful>
When all Else has Failed
If message processing keeps failing and the maximum number of retries is exceeded, the default behavior of the Until Successful message processor is to log the message details and drop the message.
Should you want to perform a specific action on the discarded message (for example, storing it in a file or database), you can configure a Dead Letter Queue endpoint to which dropped messages are sent.
For more information, see Configuring a Dead Letter Queue.
Configuring a Dead Letter Queue
To manage messages that have exhausted the number of maxRetries within the Until Successful scope, you can define a DLQ (dead letter queue) endpoint to which Mule can send such messages. The following code sample shows how a VM endpoint can be used to receive messages that have been discarded.
<vm:endpoint name="dlqChannel" path="dlq" />
<until-successful objectStore-ref="objectStore"
deadLetterQueue-ref="dlqChannel"
maxRetries="3"
secondsBetweenRetries="10">
...
</until-successful>
One common option when configuring a DLQ is to declare a global endpoint:
<vm:endpoint exchange-pattern="one-way" path="dlqChannel" name="dlqChannel" doc:name="dlqChannel"/>
Then have a flow:
<flow name="dead-letter-queue-testFlow2" doc:name="dead-letter-queue-testFlow2">
<vm:inbound-endpoint exchange-pattern="one-way" ref="dlqChannel" doc:name="VM"/>
<logger level="WARN" doc:name="logger"/>
</flow>
The deadLetterQueue-ref="dlqChannel" attribute in until-successful therefore refers to the global endpoint.
Asynchronous Until Successful
By default, Until Successful processes messages asynchronously relative to the main flow in which it resides. The sections below describe the settings you can customize for an asynchronous Until Successful scope.
Until Successful and Object Store
This message processor needs a ListableObjectStore instance in order to persist messages pending (re)processing. For an example of ListableObjectStore, see XML Example.
There are several implementations available in Mule, including the following:
- DefaultInMemoryObjectStore - Default in-memory store
- DefaultPersistentObjectStore - Default persistent store
- FileObjectStore - File-based store
- QueuePersistenceObjectStore - Global queue store
- SimpleMemoryObjectStore - In-memory store
See Mule Object Stores for further information about object stores in Mule. The following code sample illustrates how to configure an in-memory store:
<spring:bean id="objectStore" class="org.mule.util.store.SimpleMemoryObjectStore" />
Customizing the Threading Profile of Asynchronous Until Successful
This feature enables you to customize the threading profile of an asynchronous Until Successful scope.
Studio Visual Editor
For an example of using Until Successful in Anypoint Studio, see the XML Example and the canvas flow thereafter.
To customize the threading profile:
- In the Properties Editor of the Until Successful Scope in your flow, click to access the Threading tab.
- Click to select the Configure threading profile radio button.
- Enter values in the threading profile fields to customize the threading behavior. In this example, Pool Exhausted Action is set to WAIT and all other fields are left empty:
Attribute | Description |
---|---|
Max Buffer Size | Determines how many requests are queued when the pool is at maximum usage capacity and the Pool Exhausted Action is WAIT. The buffer is used as an overflow. Type: Integer. Required: No |
Max Active Threads | The maximum number of threads to use. Type: Integer. Required: No |
Max Idle Threads | The maximum number of idle or inactive threads that can be in the pool before they are destroyed. Type: Integer. Required: No |
Pool Exhausted Action | When the maximum pool size or queue size is bounded, this value determines how to handle incoming tasks. Possible values are: WAIT - Wait until a thread becomes available. Don’t use this value if the minimum number of threads is zero, in which case a thread may never become available; DISCARD - Throw away the current request and return; DISCARD_OLDEST - Throw away the oldest request and return; ABORT - Throw a RuntimeException; RUN - (Default). The thread that makes an execute request runs the task itself, which helps guard against lockup. Type: String. Required: No |
Thread TTL | Determines how long an inactive thread is kept in the pool before being discarded. Type: Integer. Required: No |
Thread Wait Timeout | How long to wait in milliseconds when the Pool Exhausted Action is WAIT. If the value is negative, it waits indefinitely. Type: Integer. Required: No |
Notes
- Any BlockingQueue may be used to transfer and hold submitted tasks. The use of this queue interacts with pool sizing:
  - If fewer than corePoolSize threads are running, the Executor always prefers adding a new thread rather than queuing.
  - If corePoolSize or more threads are running, the Executor always prefers queuing a request rather than adding a new thread.
  - If a request cannot be queued, a new thread is created unless this would exceed maximumPoolSize, in which case, the task is rejected.
- If you configure a threading profile with poolExhaustedAction=WAIT and a maxBufferSize of a positive value, the thread pool does not grow from maxThreadsIdle (corePoolSize) towards maxThreadsActive (maxPoolSize) unless the queue is completely filled up.
XML Editor or Standalone
Add the child element threading-profile to the until-successful element. Configure the attributes of threading-profile according to the table below.
<until-successful>
<threading-profile maxThreadsActive="1" maxThreadsIdle="1" poolExhaustedAction="RUN"/>
<set-payload/>
</until-successful>
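Attribute | Description |
---|---|
maxBufferSize | Determines how many requests are queued when the pool is at maximum usage capacity and the poolExhaustedAction is WAIT. The buffer is used as an overflow. Type: Integer. Required: No |
maxThreadsActive | The maximum number of threads to use. Type: Integer. Required: No |
maxThreadsIdle | The maximum number of idle or inactive threads that can be in the pool before they are destroyed. Type: Integer. Required: No |
poolExhaustedAction | When the maximum pool size or queue size is bounded, this value determines how to handle incoming tasks. Possible values are: WAIT - Wait until a thread becomes available. Don’t use this value if the minimum number of threads is zero, in which case a thread may never become available; DISCARD - Throw away the current request and return; DISCARD_OLDEST - Throw away the oldest request and return; ABORT - Throw a RuntimeException; RUN - (Default). The thread that makes an execute request runs the task itself, which helps guard against lockup. Type: String. Required: No |
threadTTL | Determines how long an inactive thread is kept in the pool before being discarded. Type: Integer. Required: No |
threadWaitTimeout | How long to wait in milliseconds when the poolExhaustedAction is WAIT. If the value is negative, it waits indefinitely. Type: Integer. Required: No |
Notes
- Any BlockingQueue may be used to transfer and hold submitted tasks. The use of this queue interacts with pool sizing:
  - If fewer than corePoolSize threads are running, the Executor always prefers adding a new thread rather than queuing.
  - If corePoolSize or more threads are running, the Executor always prefers queuing a request rather than adding a new thread.
  - If a request cannot be queued, a new thread is created unless this would exceed maximumPoolSize, in which case, the task is rejected.
- If you configure a threading profile with poolExhaustedAction=WAIT and a maxBufferSize of a positive value, the thread pool does not grow from maxThreadsIdle (corePoolSize) towards maxThreadsActive (maxPoolSize) unless the queue is completely filled up.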
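To make the interaction between WAIT and the buffer more concrete, here is a minimal sketch of a threading profile that uses both. All numeric values, the objectStore bean ID, and the HTTP request details are assumptions picked for illustration, not recommended settings.

```xml
<!-- Assumed values for illustration only; tune them for your own workload -->
<until-successful objectStore-ref="objectStore" maxRetries="5" secondsBetweenRetries="60">
    <!-- With WAIT and a positive maxBufferSize, the pool works with up to
         2 threads (maxThreadsIdle) and queues up to 10 requests; it grows
         toward 4 threads (maxThreadsActive) only once the buffer is full.
         When both are exhausted, the submitting thread waits up to 30000 ms. -->
    <threading-profile maxThreadsActive="4"
                       maxThreadsIdle="2"
                       maxBufferSize="10"
                       poolExhaustedAction="WAIT"
                       threadWaitTimeout="30000"/>
    <http:request config-ref="HTTP_Request_Configuration" path="submit"
                  method="POST" doc:name="HTTP"/>
</until-successful>
```

If you expect the pool to reach maxThreadsActive quickly under load, a smaller maxBufferSize may be preferable, since the buffer has to fill before extra threads are created.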
Synchronous Until Successful
Out of the box, the Until Successful Scope processes messages asynchronously. After passing a message into the Until Successful scope, the main flow immediately regains control of the thread, so it cannot receive any response from the processing that occurs within the scope.
However, in some situations, you may need Until Successful to process messages synchronously so that the main flow waits for processing within the scope to complete before continuing. To address these needs, Mule enables you to configure the scope to process messages synchronously.
When set to process messages synchronously, Until Successful executes within the thread of the main flow, then returns the result of the scope’s processing on the same thread.
Studio Visual Editor
In the Threading tab of the Until Successful’s Properties Editor, click to select Synchronous.
XML Editor or Standalone
Add the synchronous attribute with the value set to true to the until-successful element.
<until-successful synchronous="true">
<set-payload/>
</until-successful>
When set to process synchronously, the Until Successful scope does not accept the configuration of the following child element and attributes:
- threading-profile - Synchronous Until Successful does not need a ThreadPool.
- objectStore-ref - Synchronous Until Successful is not required to persist messages between retries.
- deadLetterQueue-ref - When the retry count is exhausted, Mule executes the exception strategy.
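The restrictions above can be sketched in a small flow. This is only an illustration: the flow name, VM path, HTTP request details, and payload values are hypothetical, and the catch exception strategy stands in for whatever exception handling your flow already uses.

```xml
<!-- Hypothetical flow: names, paths, and payload values are placeholders -->
<flow name="sync-retry-flow">
    <vm:inbound-endpoint exchange-pattern="request-response" path="sync-retry" doc:name="VM"/>
    <!-- Synchronous mode: no objectStore-ref, deadLetterQueue-ref, or threading-profile -->
    <until-successful synchronous="true" maxRetries="3" secondsBetweenRetries="5"
                      doc:name="Until Successful">
        <http:request config-ref="HTTP_Request_Configuration" path="submit"
                      method="POST" doc:name="HTTP"/>
    </until-successful>
    <!-- Reached only after the scope has succeeded -->
    <set-payload value="Delivered" doc:name="Set Payload"/>
    <!-- Runs when every retry has failed, in place of a dead letter queue -->
    <catch-exception-strategy doc:name="Catch Exception Strategy">
        <logger level="ERROR" message="Retries exhausted: #[exception.message]" doc:name="Logger"/>
        <set-payload value="Failed" doc:name="Set Payload"/>
    </catch-exception-strategy>
</flow>
```

Because the scope blocks the calling thread between attempts, short intervals and a low maxRetries value are usually easier to reason about in synchronous mode.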
Example - Until Successful Usage
<until-successful objectStore-ref="objectStore" maxRetries="5" secondsBetweenRetries="60" doc:name="Until Successful">
<http:request config-ref="HTTP_Request_Configuration" path="submit" method="POST" doc:name="HTTP"/>
</until-successful>
Example - Retry Sending to an HTTP-based Service
This example demonstrates how to retry sending to an HTTP-based service until success:
<?xml version="1.0" encoding="UTF-8"?>
<mule xmlns:vm="http://www.mulesoft.org/schema/mule/vm"
xmlns:http="http://www.mulesoft.org/schema/mule/http"
xmlns:tracking="http://www.mulesoft.org/schema/mule/ee/tracking"
xmlns="http://www.mulesoft.org/schema/mule/core"
xmlns:doc="http://www.mulesoft.org/schema/mule/documentation"
xmlns:spring="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-current.xsd
http://www.mulesoft.org/schema/mule/core
http://www.mulesoft.org/schema/mule/core/current/mule.xsd
http://www.mulesoft.org/schema/mule/http
http://www.mulesoft.org/schema/mule/http/current/mule-http.xsd
http://www.mulesoft.org/schema/mule/ee/tracking
http://www.mulesoft.org/schema/mule/ee/tracking/current/mule-tracking-ee.xsd
http://www.mulesoft.org/schema/mule/vm
http://www.mulesoft.org/schema/mule/vm/current/mule-vm.xsd">
<http:request-config name="HTTP_Request_Configuration"
host="acme.com" port="8082" basePath="/api/flakey"
doc:name="HTTP Request Configuration"/>
<spring:bean id="objectStore"
class="org.mule.util.store.SimpleMemoryObjectStore" />
<flow name="retrying-http-bridge">
<vm:inbound-endpoint exchange-pattern="one-way"
path="acme-bridge" doc:name="VM"/>
<until-successful objectStore-ref="objectStore" maxRetries="5"
failureExpression="#[message.inboundProperties['http.status'] != 202]"
doc:name="Until Successful">
<http:request config-ref="HTTP_Request_Configuration"
path="/" method="POST" doc:name="HTTP"/>
</until-successful>
</flow>
</mule>
The Until Successful message processor relies on Mule ObjectStore for persisting the events it processes. In this example, we use an in-memory implementation: a persistent implementation would be required in order to ensure that nothing gets lost in case of a restart or crash.
Because maxRetries is set to 5 and no retry interval is configured, this example retries up to five times at the default interval of one minute. Afterwards, the message is discarded.
This example interacts synchronously (request-response) with the outbound HTTP endpoint to ensure the remote web service correctly accepted the POSTed message (that is, that it replied with a 202 status code).
Example - Retry Other Flows
The following example shows that other flows can be retried the same way:
<flow name="subflow-retrier">
<vm:inbound-endpoint path="signup"
exchange-pattern="request-response"/>
<until-successful objectStore-ref="objectStore"
ackExpression="#[message.correlationId]"
maxRetries="3"
secondsBetweenRetries="10">
<flow-ref name="signup-flow" />
</until-successful>
</flow>
Notice how the Until Successful message processor has been configured to synchronously acknowledge it has accepted the inbound event for processing by returning the current message correlation ID. Sending to the “signup” VM endpoint therefore returns the correlation ID of the message whose processing by the sub-flow named “signup-flow” is tried (and retried).
Example - Configure Object Stores
The following example demonstrates how to configure object stores in the following three situations:
- Idempotent filter with an in-memory object store
- Idempotent filter with a persistent object store
- Until Successful scope with an in-memory object store
<?xml version="1.0" encoding="UTF-8"?>
<mule xmlns:http="http://www.mulesoft.org/schema/mule/http" xmlns="http://www.mulesoft.org/schema/mule/core" xmlns:doc="http://www.mulesoft.org/schema/mule/documentation" xmlns:spring="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-current.xsd http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd http://www.mulesoft.org/schema/mule/http http://www.mulesoft.org/schema/mule/http/current/mule-http.xsd">
<!-- Global object store definition for a Listable Object Store, used in Flow 3 below. -->
<spring:beans>
<spring:bean id="myListableObjectStore" class="org.mule.util.store.SimpleMemoryObjectStore"/>
</spring:beans>
<http:listener-config name="HTTP_Listener_Configuration" host="localhost" port="8081" doc:name="HTTP Listener Configuration"/>
<!-- Idempotent Filter with In Memory Object Store -->
<flow name="Flow1_idempotentWithInMemoryStore" doc:name="Flow1_idempotentWithInMemoryStore">
<http:listener config-ref="HTTP_Listener_Configuration" path="idempotentInMemory" doc:name="HTTP"/>
<idempotent-message-filter idExpression="#[message.payload]" throwOnUnaccepted="true" storePrefix="Idempotent_Message" doc:name="Idempotent Message">
<in-memory-store name="myInMemoryObjectStore" entryTTL="120" expirationInterval="3600" maxEntries="60000" />
</idempotent-message-filter>
<set-payload value="YAY!" doc:name="Set Payload" />
<catch-exception-strategy doc:name="Catch Exception Strategy">
<set-payload value="NAY!" doc:name="Set Payload" />
</catch-exception-strategy>
</flow>
<!-- Idempotent Filter with Persistent File Store -->
<flow name="Flow2_idempotentWithTextFileStore" doc:name="Flow2_idempotentWithTextFileStore">
<http:listener config-ref="HTTP_Listener_Configuration" path="idempotentTextFile" doc:name="HTTP"/>
<idempotent-message-filter idExpression="#[message.payload]" throwOnUnaccepted="true" storePrefix="Idempotent_Message" doc:name="Idempotent Message">
<simple-text-file-store name="mySimpleTextFileStore" directory="#[server.tmpDir + '/objectstore']" entryTTL="120" expirationInterval="3600" maxEntries="60000" />
</idempotent-message-filter>
<set-payload value="YAY!" doc:name="Set Payload" />
<catch-exception-strategy doc:name="Catch Exception Strategy">
<set-payload value="NAY!" doc:name="Set Payload" />
</catch-exception-strategy>
</flow>
<!-- Until Successful Scope with In Memory Object Store -->
<flow name="Flow3_UntilSuccessfulWithListableObjectStore" doc:name="UntilSuccessfulWithListableObjectStore">
<http:listener config-ref="HTTP_Listener_Configuration" path="hey" doc:name="HTTP"/>
<until-successful objectStore-ref="myListableObjectStore" maxRetries="15" secondsBetweenRetries="1" doc:name="Until Successful">
<processor-chain doc:name="Processor Chain">
<message-filter throwOnUnaccepted="true">
<expression-filter expression="return Math.random() &lt; 0.1" doc:name="Expression" />
</message-filter>
<logger message="This eventually happens." doc:name="Logger" />
</processor-chain>
</until-successful>
<set-payload value="Completed" doc:name="Set Payload" />
</flow>
</mule>
Anypoint Studio canvas flows for this example: