Until Successful Scope

The Until Successful scope processes messages through its processors until the entire operation succeeds. By default, Until Successful’s processing occurs asynchronously from the main flow. After passing a message into the Until Successful scope, the main flow immediately regains control of the thread. However, you can configure Until Successful to run synchronously relative to the main flow.

Until Successful repeatedly retries the processing of a message that is attempting to complete an activity such as:

  • Dispatching to outbound endpoints, for example, when calling a remote web service that may have availability issues.

  • Executing a component method, for example, when executing on a Spring bean that may depend on unreliable resources.

  • A sub-flow execution, to keep re-executing several actions until they all succeed.

  • Any other message processor execution, to allow more complex scenarios.

The Until Successful component requires an object store to work. However, Anypoint Studio doesn’t enforce this. If an object store is not declared, Studio throws an InitialisationException. For more information, see Until Successful and Object Store. The object store needs to be exclusive for each Until Successful instance.

Success and Failure

Because this scope continues to process messages until it is successful, it is important to understand the definition of successful in the context of Mule message processing.

Process Result Conditions that determine the result

FAILURE

A message processor within the Until Successful scope throws an exception or contains an exception payload. The result is also a failure if an expression is provided in the failureExpression attribute and it evaluates to true.

This scope does not check the failureExpression if there is an exception. The scope will schedule the message for the next retry, as long as it has not exceeded the max retry limit.

SUCCESS

None of the message processors within the Until Successful scope throw any exceptions or contain an exception payload, or they do not return any message at all (that is, the flow ends in a one-way outbound endpoint).

CONDITIONAL

Where you have configured a failure expression (see below), Mule evaluates the return message against the expression to dynamically determine if the action has failed or succeeded.

Basic Attributes

The following table describes the main configurable attributes of the until-successful element:

Attribute Description

objectStore-ref

Reference to the org.mule.api.store.ListableObjectStore that is used to store events that are pending processing or reprocessing.

maxRetries

Specifies the maximum number of retries that are attempted.

millisBetweenRetries

Specifies the minimum interval between two attempts to process, in milliseconds. The actual interval depends on the previous execution, but should not exceed twice this number. The default value is 60000 milliseconds (one minute).

failureExpression

Specifies an expression that, when it evaluates to true, determines that the processing of one route was a failure. If no expression is provided, only an exception is treated as a processing failure.

ackExpression

Specifies an expression that, when evaluated to true, determines the synchronous response of Until Successful.

deadLetterQueue-ref

The endpoint or message processor to which undeliverable messages are sent after all retries have been executed unsuccessfully. For more information, see Configuring a Dead Letter Queue.
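
As a rough sketch, the attributes above might be combined as follows. The bean id objectStore, the endpoint name dlqChannel, the request configuration HTTP_Request_Configuration, the flow name, and the orders path are placeholders assumed for illustration, and the fragment assumes the usual vm, http, and spring namespace declarations on the enclosing mule element.

```xml
<!-- Hypothetical names: objectStore, dlqChannel, HTTP_Request_Configuration, orders -->
<spring:bean id="objectStore" class="org.mule.util.store.SimpleMemoryObjectStore"/>
<vm:endpoint name="dlqChannel" path="dlq"/>

<flow name="attribute-overview-flow">
    <vm:inbound-endpoint path="orders" exchange-pattern="request-response"/>
    <!-- Up to 6 retries, at least 10 minutes (600000 ms) apart: roughly one hour in total.
         A response status other than 202 counts as a failure.
         The caller immediately receives the correlation ID as an acknowledgement.
         Messages that exhaust all retries go to the dlqChannel endpoint. -->
    <until-successful objectStore-ref="objectStore"
                      maxRetries="6"
                      millisBetweenRetries="600000"
                      failureExpression="#[message.inboundProperties['http.status'] != 202]"
                      ackExpression="#[message.correlationId]"
                      deadLetterQueue-ref="dlqChannel">
        <http:request config-ref="HTTP_Request_Configuration" path="orders" method="POST"/>
    </until-successful>
</flow>
```

Because millisBetweenRetries is a minimum interval, the total retry window in a configuration like this is approximate rather than exact.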

Configuring failureExpression

If the scope fails, a RetryPolicyExhaustedException is created, wrapped as a MessagingException and passed to the exception handler of the flow that contains the until-successful element.

The following example shows how to configure failureExpression on an Until Successful scope:

<until-successful objectStore-ref="objectStore"
   failureExpression="#[message.inboundProperties['http.status'] != 202]"
   maxRetries="6" secondsBetweenRetries="600">
   <http:request config-ref="HTTP_Request_Configuration" path="flakey"
     method="POST" doc:name="HTTP"/>
</until-successful>

When All Else Has Failed

If message processing keeps failing and the maximum number of retries is exceeded, the default behavior of the Until Successful message processor is to log the message details and drop the message.

To perform a specific action on a discarded message (for example, storing it in a file or database), you can configure a dead letter queue endpoint to which dropped messages are sent.

For more information, see Configuring a Dead Letter Queue.

Configuring a Dead Letter Queue

To manage messages which have exhausted the number of maxRetries within the Until Successful scope, you can define a DLQ (dead letter queue) endpoint to which Mule can send such messages. The following code sample shows how a VM endpoint can be used to receive messages that have been discarded.

<vm:endpoint name="dlqChannel" path="dlq" />

<until-successful objectStore-ref="objectStore"
                  deadLetterQueue-ref="dlqChannel"
                  maxRetries="3"
                  secondsBetweenRetries="10">
...
</until-successful>

A common way to configure a DLQ is to define a global endpoint:

<vm:endpoint exchange-pattern="one-way" path="dlqChannel" name="dlqChannel" doc:name="dlqChannel"/>

Then define a flow that receives messages from that endpoint:

<flow name="dead-letter-queue-testFlow2" doc:name="dead-letter-queue-testFlow2">
    <vm:inbound-endpoint exchange-pattern="one-way" ref="dlqChannel" doc:name="VM"/>
    <logger level="WARN" doc:name="logger"/>
</flow>

The deadLetterQueue-ref="dlqChannel" attribute in until-successful then refers to the global endpoint.

Asynchronous Until Successful

By default, Until Successful processes messages asynchronously relative to the main flow in which it resides. The sections below describe the configurations you can customize for an asynchronous Until Successful scope.

Until Successful and Object Store

This message processor needs a ListableObjectStore instance in order to persist messages pending (re)processing. For an example of ListableObjectStore, see XML Example.

There are several implementations available in Mule, including the following:

  • DefaultInMemoryObjectStore - Default in-memory store

  • DefaultPersistentObjectStore - Default persistent store

  • FileObjectStore - File-based store

  • QueuePersistenceObjectStore - Global queue store

  • SimpleMemoryObjectStore - In-memory store

See Mule Object Stores for further information about object stores in Mule. The following code sample illustrates how to configure an in-memory store:

<spring:bean id="objectStore" class="org.mule.util.store.SimpleMemoryObjectStore" />
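
Because the object store needs to be exclusive for each Until Successful instance, an application with two scopes might declare one store per scope. The following is a minimal sketch under that assumption. The bean ids, flow names, VM paths, and the referenced flows submit-order and submit-invoice are hypothetical.

```xml
<!-- One store per scope; all ids and names here are hypothetical -->
<spring:bean id="ordersRetryStore" class="org.mule.util.store.SimpleMemoryObjectStore"/>
<spring:bean id="invoicesRetryStore" class="org.mule.util.store.SimpleMemoryObjectStore"/>

<flow name="orders-flow">
    <vm:inbound-endpoint path="orders" exchange-pattern="one-way"/>
    <!-- Pending order events are kept only in ordersRetryStore -->
    <until-successful objectStore-ref="ordersRetryStore" maxRetries="3">
        <flow-ref name="submit-order"/>
    </until-successful>
</flow>

<flow name="invoices-flow">
    <vm:inbound-endpoint path="invoices" exchange-pattern="one-way"/>
    <!-- Pending invoice events are kept only in invoicesRetryStore -->
    <until-successful objectStore-ref="invoicesRetryStore" maxRetries="3">
        <flow-ref name="submit-invoice"/>
    </until-successful>
</flow>
```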

Customizing the Threading Profile of Asynchronous Until Successful

This feature enables you to customize the threading profile of an asynchronous Until Successful scope.

Studio Visual Editor

For an example of using Until Successful in Anypoint Studio see the XML Example and the canvas flow thereafter.

To customize the threading profile:

  1. In the Properties Editor of the Until Successful Scope in your flow, click to access the Threading tab.

  2. Click to select the Configure threading profile radio button.

  3. Enter values in the threading profile fields to customize the threading behavior. In this example, Pool Exhausted Action is set to WAIT and all other fields are left empty:

    Configure Threading Profile
    Attribute Description

    Max Buffer Size

    Determines how many requests are queued when the pool is at maximum usage capacity and the Pool Exhausted Action is WAIT. The buffer is used as an overflow.

    Type: Integer
    Required: No

    Max Active Threads

    The maximum number of threads to use.

    Type: Integer
    Required: No

    Max Idle Threads

    The maximum number of idle or inactive threads that can be in the pool before they are destroyed.

    Type: Integer
    Required: No

    Pool Exhausted Action

    When the maximum pool size or queue size is bounded, this value determines how to handle incoming tasks.

    Possible values are:

    • WAIT - Wait until a thread becomes available. Don’t use this value if the minimum number of threads is zero, in which case a thread may never become available.

    • DISCARD - Throw away the current request and return.

    • DISCARD_OLDEST - Throw away the oldest request and return.

    • ABORT - Throw a RuntimeException.

    • RUN - (Default). The thread that makes an execute request runs the task itself, which helps guard against lockup.

    Type: String
    Required: No

    Thread TTL

    Determines how long an inactive thread is kept in the pool before being discarded.

    Type: Integer
    Required: No

    Thread Wait Timeout

    How long to wait in milliseconds when the Pool Exhausted Action is WAIT. If the value is negative, it waits indefinitely.

    Type: Integer
    Required: No

Notes

  • Any BlockingQueue may be used to transfer and hold submitted tasks. The use of this queue interacts with pool sizing:

    • If fewer than corePoolSize threads are running, the Executor always prefers adding a new thread rather than queuing.

    • If corePoolSize or more threads are running, the Executor always prefers queuing a request rather than adding a new thread.

    • If a request cannot be queued, a new thread is created unless this would exceed maximumPoolSize, in which case, the task is rejected.

  • If you configure a threading profile with poolExhaustedAction=WAIT and a maxBufferSize of a positive value, the thread pool does not grow from maxThreadsIdle (corePoolSize) towards maxThreadsActive (maxPoolSize) unless the queue is completely filled up.

XML Editor or Standalone

Add the child element threading-profile to the until-successful element. Configure the attributes of threading-profile according to the table below.

<until-successful>
     <threading-profile maxThreadsActive="1" maxThreadsIdle="1" poolExhaustedAction="RUN"/>
     <set-payload/>
</until-successful>

Attribute Description

maxBufferSize

Determines how many requests are queued when the pool is at maximum usage capacity and the poolExhaustedAction is WAIT. The buffer is used as an overflow.

Type: Integer
Required: No

maxThreadsActive

The maximum number of threads to use.

Type: Integer
Required: No

maxThreadsIdle

The maximum number of idle or inactive threads that can be in the pool before they are destroyed.

Type: Integer
Required: No

poolExhaustedAction

When the maximum pool size or queue size is bounded, this value determines how to handle incoming tasks.

Possible values are:

  • WAIT - Wait until a thread becomes available. Don’t use this value if the minimum number of threads is zero, in which case a thread may never become available.

  • DISCARD - Throw away the current request and return.

  • DISCARD_OLDEST - Throw away the oldest request and return.

  • ABORT - Throw a RuntimeException.

  • RUN - (Default). The thread making the execute request runs the task itself, which helps guard against lockup.

Type: String
Required: No

threadTTL

Determines how long an inactive thread is kept in the pool before being discarded.

Type: Integer
Required: No

threadWaitTimeout

How long to wait in milliseconds when the poolExhaustedAction is WAIT. If the value is negative, it waits indefinitely.

Type: Integer
Required: No

Notes

  • Any BlockingQueue may be used to transfer and hold submitted tasks. The use of this queue interacts with pool sizing:

    • If fewer than corePoolSize threads are running, the Executor always prefers adding a new thread rather than queuing.

    • If corePoolSize or more threads are running, the Executor always prefers queuing a request rather than adding a new thread.

    • If a request cannot be queued, a new thread is created unless this would exceed maximumPoolSize, in which case, the task is rejected.

  • If you configure a threading profile with poolExhaustedAction=WAIT and a maxBufferSize of a positive value, the thread pool does not grow from maxThreadsIdle (corePoolSize) towards maxThreadsActive (maxPoolSize) unless the queue is completely filled up.
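
To illustrate the interaction described in the notes, here is a hedged sketch of a threading profile that uses WAIT together with a bounded buffer. The numeric values, the objectStore bean id, and the signup-flow reference are illustrative assumptions, not recommended settings.

```xml
<until-successful objectStore-ref="objectStore" maxRetries="5">
    <!-- maxThreadsIdle (corePoolSize) = 2, maxThreadsActive (maxPoolSize) = 4.
         Up to 100 requests are queued once the pool is busy.
         With WAIT, a caller waits up to 30000 ms for capacity. -->
    <threading-profile maxThreadsActive="4"
                       maxThreadsIdle="2"
                       maxBufferSize="100"
                       poolExhaustedAction="WAIT"
                       threadWaitTimeout="30000"/>
    <flow-ref name="signup-flow"/>
</until-successful>
```

With values like these, the pool would stay at two threads until all 100 buffer slots are full, and only then grow towards four. If more concurrency is needed sooner, a smaller maxBufferSize is one option to consider.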

Synchronous Until Successful

Out of the box, the Until Successful scope processes messages asynchronously. After passing a message into the Until Successful scope, the main flow immediately regains control of the thread, which prevents the scope from returning a response from the processing activities that occur within it.

However, in some situations, you may need Until Successful to process messages synchronously so that the main flow waits for processing within the scope to complete before continuing. To address these needs, Mule enables you to configure the scope to process messages synchronously.

When set to process messages synchronously, Until Successful executes within the thread of the main flow, then returns the result of the scope’s processing on the same thread.

Studio Visual Editor

In the Threading tab of the Until Successful’s Properties Editor, click to select Synchronous.

XML Editor or Standalone

Add the synchronous attribute with the value set to true to the until-successful element.

<until-successful synchronous="true">
     <set-payload/>
</until-successful>

When set to process synchronously, the Until Successful scope does not accept the configuration of the following child element and attributes:

  • threading-profile - Synchronous Until Successful does not need a ThreadPool.

  • objectStore-ref - Synchronous Until Successful is not required to persist messages between retries.

  • deadLetterQueue-ref - When the retry count is exhausted, Mule executes the exception strategy.
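
The restrictions above might look like this in practice. This is a sketch only: the flow name, VM path, retry values, and HTTP_Request_Configuration are assumed for illustration, and the catch-exception-strategy stands in for whatever exception strategy the flow actually uses.

```xml
<flow name="sync-retry-flow">
    <vm:inbound-endpoint path="sync-retry" exchange-pattern="request-response"/>
    <!-- Synchronous mode: no objectStore-ref, threading-profile, or deadLetterQueue-ref -->
    <until-successful synchronous="true" maxRetries="3" millisBetweenRetries="2000">
        <http:request config-ref="HTTP_Request_Configuration" path="submit" method="POST"/>
    </until-successful>
    <!-- Runs when the scope gives up after its retries
         (and for any other exception raised in this flow) -->
    <catch-exception-strategy>
        <logger level="ERROR" message="Retries exhausted: #[exception.message]"/>
        <set-payload value="Service unavailable"/>
    </catch-exception-strategy>
</flow>
```

Because the scope runs on the calling thread, the caller of this flow is blocked for the duration of all attempts, so short retry intervals are usually a better fit for synchronous use.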

Example - Until Successful Usage

<until-successful objectStore-ref="objectStore" maxRetries="5" secondsBetweenRetries="60" doc:name="Until Successful">
    <http:request config-ref="HTTP_Request_Configuration" path="submit" method="POST" doc:name="HTTP"/>
</until-successful>

Example - Retry Sending to an HTTP-based Service

This example demonstrates how to retry sending to an HTTP-based service until success:

<?xml version="1.0" encoding="UTF-8"?>

<mule xmlns:vm="http://www.mulesoft.org/schema/mule/vm"
	xmlns:http="http://www.mulesoft.org/schema/mule/http"
	xmlns:tracking="http://www.mulesoft.org/schema/mule/ee/tracking"
	xmlns="http://www.mulesoft.org/schema/mule/core"
	xmlns:doc="http://www.mulesoft.org/schema/mule/documentation"
	xmlns:spring="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://www.springframework.org/schema/beans
	http://www.springframework.org/schema/beans/spring-beans-current.xsd
	http://www.mulesoft.org/schema/mule/core
	http://www.mulesoft.org/schema/mule/core/current/mule.xsd
	http://www.mulesoft.org/schema/mule/http
	http://www.mulesoft.org/schema/mule/http/current/mule-http.xsd
	http://www.mulesoft.org/schema/mule/ee/tracking
	http://www.mulesoft.org/schema/mule/ee/tracking/current/mule-tracking-ee.xsd
	http://www.mulesoft.org/schema/mule/vm
	http://www.mulesoft.org/schema/mule/vm/current/mule-vm.xsd">
    <http:request-config name="HTTP_Request_Configuration"
    	host="acme.com" port="8082" basePath="/api/flakey"
    	doc:name="HTTP Request Configuration"/>
    <spring:bean id="objectStore"
    	class="org.mule.util.store.SimpleMemoryObjectStore" />
    <flow name="retrying-http-bridge">
        <vm:inbound-endpoint exchange-pattern="one-way"
        	path="acme-bridge" doc:name="VM"/>
        <until-successful objectStore-ref="objectStore" maxRetries="5"
        	failureExpression="#[message.inboundProperties['http.status'] != 202]"
        	doc:name="Until Successful">
            <http:request config-ref="HTTP_Request_Configuration"
            	path="/" method="POST" doc:name="HTTP"/>
        </until-successful>
    </flow>
</mule>

The Until Successful message processor relies on a Mule ObjectStore to persist the events it processes. This example uses an in-memory implementation; a persistent implementation would be required to ensure that nothing gets lost in case of a restart or crash.

Because maxRetries is set to 5 and no retry interval is specified, this example retries up to five times at the default interval of one minute. Afterwards, the message is discarded.

This example interacts synchronously (request-response) with the outbound HTTP endpoint to ensure that the remote web service correctly accepted the POSTed message (that is, that it replied with a 202 status code).

Example - Retry Other Flows

The following example shows that other flows can be retried the same way:

<flow name="subflow-retrier">
    <vm:inbound-endpoint path="signup"
        exchange-pattern="request-response"/>
    <until-successful objectStore-ref="objectStore"
        ackExpression="#[message.correlationId]"
        maxRetries="3"
        secondsBetweenRetries="10">
        <flow-ref name="signup-flow" />
    </until-successful>
</flow>

Notice how the Until Successful message processor has been configured to synchronously acknowledge it has accepted the inbound event for processing by returning the current message correlation ID. Sending to the “signup” VM endpoint therefore returns the correlation ID of the message whose processing by the sub-flow named “signup-flow” is tried (and retried).

Example - Configure Object Stores

The following example demonstrates how to configure object stores in the following three situations:

  1. Idempotent filter with an in-memory object store

  2. Idempotent filter with a persistent object store

  3. Until Successful scope with an in-memory object store

<?xml version="1.0" encoding="UTF-8"?>
<mule xmlns:http="http://www.mulesoft.org/schema/mule/http" xmlns="http://www.mulesoft.org/schema/mule/core" xmlns:doc="http://www.mulesoft.org/schema/mule/documentation" xmlns:spring="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-current.xsd http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd http://www.mulesoft.org/schema/mule/http http://www.mulesoft.org/schema/mule/http/current/mule-http.xsd">

<!-- Global object store definition for a Listable Object Store, used in Flow 3 below. -->

    <spring:beans>
        <spring:bean id="myListableObjectStore" class="org.mule.util.store.SimpleMemoryObjectStore"/>
    </spring:beans>

    <http:listener-config name="HTTP_Listener_Configuration" host="localhost" port="8081" doc:name="HTTP Listener Configuration"/>

<!--  Idempotent Filter with In Memory Object Store -->

    <flow name="Flow1_idempotentWithInMemoryStore" doc:name="Flow1_idempotentWithInMemoryStore">
        <http:listener config-ref="HTTP_Listener_Configuration" path="idempotentInMemory" doc:name="HTTP"/>
        <idempotent-message-filter idExpression="#[message.payload]" throwOnUnaccepted="true" storePrefix="Idempotent_Message" doc:name="Idempotent Message">
            <in-memory-store name="myInMemoryObjectStore" entryTTL="120" expirationInterval="3600" maxEntries="60000" />
        </idempotent-message-filter>
        <set-payload value="YAY!" doc:name="Set Payload" />
        <catch-exception-strategy doc:name="Catch Exception Strategy">
            <set-payload value="NAY!" doc:name="Set Payload" />
        </catch-exception-strategy>
    </flow>

<!--  Idempotent Filter with Persistent File Store -->

    <flow name="Flow2_idempotentWithTextFileStore" doc:name="Flow2_idempotentWithTextFileStore">
        <http:listener config-ref="HTTP_Listener_Configuration" path="idempotentTextFile" doc:name="HTTP"/>
        <idempotent-message-filter idExpression="#[message.payload]" throwOnUnaccepted="true" storePrefix="Idempotent_Message" doc:name="Idempotent Message">
            <simple-text-file-store name="mySimpleTextFileStore" directory="#[server.tmpDir + '/objectstore']" entryTTL="120" expirationInterval="3600" maxEntries="60000" />
        </idempotent-message-filter>
        <set-payload value="YAY!" doc:name="Set Payload" />
        <catch-exception-strategy doc:name="Catch Exception Strategy">
            <set-payload value="NAY!" doc:name="Set Payload" />
        </catch-exception-strategy>
    </flow>

<!--  Until Successful Scope with In Memory Object Store -->

    <flow name="Flow3_UntilSuccessfulWithListableObjectStore" doc:name="UntilSuccessfulWithListableObjectStore">
        <http:listener config-ref="HTTP_Listener_Configuration" path="hey" doc:name="HTTP"/>
        <until-successful objectStore-ref="myListableObjectStore" maxRetries="15" secondsBetweenRetries="1" doc:name="Until Successful">
            <processor-chain doc:name="Processor Chain">
                <message-filter throwOnUnaccepted="true">
                    <expression-filter expression="return Math.random() &lt; 0.1" doc:name="Expression" />
                </message-filter>
                <logger message="This eventually happens." doc:name="Logger" />
            </processor-chain>
        </until-successful>
        <set-payload value="Completed" doc:name="Set Payload" />
    </flow>

</mule>

Anypoint Studio canvas flows for this example:

(Images: Until Successful XML example, parts 1 and 2)