Nav
You are viewing an older version of this section. Click here to navigate to the latest version.

Until Successful Scope

The until-successful scope processes messages through its processors until the process succeeds. By default, until-successful’s processing occurs asynchronously from the main flow. After passing a message into the until-successful scope, the main flow immediately regains control of the thread. However, you can configure until-successful to run synchronously relative to the main flow.

Until-successful repeatedly retries to process a message that is attempting to complete an activity such as:

  • Dispatching to outbound endpoints, for example, when calling a remote web service that may have availability issues.

  • Executing a component method, for example, when executing on a Spring bean that may depend on unreliable resources.

  • A sub-flow execution, to keep re-executing several actions until they all succeed,

  • Any other message processor execution, to allow more complex scenarios.

The until successful component requires a mandatory object store to work. However Anypoint Studio doesn’t enforce this. If an object store is not declared, Studio throws an InitialisationException. For more information, see Until-Successful and Object Store. The object store needs to be exclusive for each until-successful instance.

See Also:

Success and Failure

As this scope continues to processes messages until successful, it is important to understand the definition of successful in context of Mule message processing.

Result Description

FAILURE

A message processor within the until-successful scope throws and exception or contains an exception payload. Also, if an expression is provided in the attribute failureExpression and it evaluates to true.

SUCCESS

None of the message processors within the until-successful scope throw any exceptions or contain an exception payload, or they do not return any message at all (i.e. the flow ends in a one-way outbound endpoint)

conditional

Where you have configured a failure expression (see below), Mule evaluates the return message against the expression to dynamically determine if the action has failed or succeeded.

Basic Attributes

Below is a table with a description of the main configurable attributes of the element:

Attribute Description

objectStore-ref

Reference to the org.mule.api.store.ListableObjectStore that is used to store events pending to process or reprocess

maxRetries

Specifies the maximum number of retries that will be attempted

millisBetweenRetries

Specifies the minimum interval between two attempts to process, in milliseconds. The actual interval depends on the previous execution, but should not exceed twice this number. The default value is 60000 milliseconds (one minute)

failureExpression

Specifies an expression that, when it evaluated to true, determines that the processing of one route was a failure. If no expression is provided, only an exception will be treated as a processing failure.

ackExpression

Specifies an expression that, when evaluated to true, determines the synchronous response of until-successful.

deadLetterQueue-ref

The endpoint or message processor to which undeliverable messages are sent after all retries have been executed unsuccessfully.

Configuring failureExpression

If the scope fails, a RetryPolicyExhaustedException is created, wrapped as a MessagingException and passed to the exception handler of the flow that contains the until-successful element.

The following illustrates how to configure the failureExpression returned by an until-successful scope:


          
       
1
2
3
4
5
6
<until-successful objectStore-ref="objectStore"
    failureExpression="#[message.inboundProperties['http.status'] != 202]"
    maxRetries="6" secondsBetweenRetries="600">
    <http:outbound-endpoint host="localhost" port="${port2}" path="flakey"
method="POST" exchange-pattern="one-way" />
</until-successful>

When all Else has Failed

If message processing keeps failing and the maximum number of retries is exceeded, the default behavior of the Until Successful message processor consists in logging the message details and dropping it.

Should you want to perform a specific action on the discarded message (for example, storing it in a file or database), it is possible to configure a Dead Letter Queue endpoint” where dropped messages are sent.

For more information, see Configuring a Dead Letter Queue.

Configuring a Dead Letter Queue

To manage messages which have exhausted the number of maxRetries within the until-successful scope, you can define a DLQ (dead letter queue) endpoint to which Mule can send such messages. Refer to the code sample below for an example configuration.


          
       
1
2
3
4
5
6
<until-successful objectStore-ref="objectStore"
                  deadLetterQueue-ref="dlqChannel"
                  maxRetries="3"
                  secondsBetweenRetries="10">
...
</until-successful>

Asynchronous Until-Successful

By default, until-successful processes messages asynchronously relative to the main flow in which it resides. The section below describe configurations you can customize in your asynchronous until-successful.

Until-Successful and Object Store

This message processor needs an http://www.mulesoft.org/docs/site/3.5.0/apidocs/index.html?org/mule/api/store/ListableObjectStore.html[ListableObjectStore] instance in order to persist messages pending (re)processing. There are several implementations available in Mule, including the following:

  • DefaultInMemoryObjectStore: default in-memory store

  • DefaultPersistentObjectStore: default persistent store

  • FileObjectStore: file-based store

  • QueuePersistenceObjectStore: global queue store

  • SimpleMemoryObjectStore: in-memory store

See Mule Object Stores for further information about object stores in Mule. The following code sample illustrates how to configure an in-memory store:

<spring:bean id="objectStore" class="org.mule.util.store.SimpleMemoryObjectStore" />

Customizing the Threading Profile of Asynchronous Until-Successful

This feature enables you to customize the threading profile of an asynchronous until-successful scope. 

  1. In the Properties Editor of the Until Successful Scope in your flow, click to access the Threading tab.

  2. Click to select the Configure threading profile radio button.

  3. Enter values in the threading profile fields to customize the threading behavior.

    configure_threading

Attribute

Type

Required

Default Value

Description

Max Buffer Size

integer

no

Determines how many requests are queued when the pool is at maximum usage capacity and the pool exhausted action is WAIT. The buffer is used as an overflow.*

Max Active Threads

integer

no

The maximum number of threads that will be used.

Max Idle Threads

integer

no

The maximum number of idle or inactive threads that can be in the pool before they are destroyed.

Pool Exhausted Action

WAIT/DISCARD/DISCARD_OLDEST/ABORT/RUN

no

When the maximum pool size or queue size is bounded, this value determines how to handle incoming tasks. Possible values are: WAIT (wait until a thread becomes available; don’t use this value if the minimum number of threads is zero, in which case a thread may never become available), DISCARD (throw away the current request and return), DISCARD_OLDEST (throw away the oldest request and return), ABORT (throw a RuntimeException), and RUN (the default; the thread making the execute request runs the task itself, which helps guard against lockup).

Thread TTL

integer

no

Determines how long an inactive thread is kept in the pool before being discarded.

Thread Wait Timeout

integer

no

How long to wait in milliseconds when the pool exhausted action is WAIT. If the value is negative, it will wait indefinitely.

Any BlockingQueue may be used to transfer and hold submitted tasks. The use of this queue interacts with pool sizing: * If fewer than corePoolSize threads are running, the Executor always prefers adding a new thread rather than queuing. * *If corePoolSize or more threads are running, the Executor always prefers queuing a request rather than adding a new thread. * If a request cannot be queued, a new thread is created unless this would exceed maximumPoolSize, in which case, the task will be rejected.

If you configure a threading profile with poolExhaustedAction=WAIT and a maxBufferSize of a positive value, the thread pool does not grow from maxThreadsIdle (corePoolSize) towards maxThreadsActive (maxPoolSize) unless the queue is completely filled up.

To the until-successful element, add child element  threading-profile . Configure the attributes of the child element according to the table below.


    
             
          
1
2
3
4
&lt;until-successful&gt;
     &lt;threading-profile maxThreadsActive="1" maxThreadsIdle="1" poolExhaustedAction="RUN"/&gt;
     &lt;set-payload/&gt;
&lt;until-successful&gt;

Attribute

Type

Required

Default Value

Description

maxBufferSize

integer

no

Determines how many requests are queued when the pool is at maximum usage capacity and the pool exhausted action is WAIT. The buffer is used as an overflow.*

maxThreadsActive

integer

no

The maximum number of threads that will be used.

maxThreadsIdle

integer

no

The maximum number of idle or inactive threads that can be in the pool before they are destroyed.

poolExhaustedAction

WAIT/DISCARD/DISCARD_OLDEST/ABORT/RUN

no

When the maximum pool size or queue size is bounded, this value determines how to handle incoming tasks. Possible values are: WAIT (wait until a thread becomes available; don’t use this value if the minimum number of threads is zero, in which case a thread may never become available), DISCARD (throw away the current request and return), DISCARD_OLDEST (throw away the oldest request and return), ABORT (throw a RuntimeException), and RUN (the default; the thread making the execute request runs the task itself, which helps guard against lockup).

threadTTL

integer

no

Determines how long an inactive thread is kept in the pool before being discarded.

threadWaitTimeout

integer

no

How long to wait in milliseconds when the pool exhausted action is WAIT. If the value is negative, it will wait indefinitely.

*Any BlockingQueue may be used to transfer and hold submitted tasks. The use of this queue interacts with pool sizing:

  • If fewer than corePoolSize threads are running, the Executor always prefers adding a new thread rather than queuing.

  • If corePoolSize or more threads are running, the Executor always prefers queuing a request rather than adding a new thread.

  • If a request cannot be queued, a new thread is created unless this would exceed maximumPoolSize, in which case, the task will be rejected.

If you configure a threading profile with poolExhaustedAction=WAIT and a maxBufferSize of a positive value, the thread pool does not grow from maxThreadsIdle (corePoolSize) towards maxThreadsActive (maxPoolSize) unless the queue is completely filled up.

Synchronous Until-Successful

Out of the box, the until-successful scope processes messages asynchronously. After passing a message into the until-successful scope, the main flow immediately regains control of the thread thus prohibiting any returned response from the processing activities which occur within the scope. 

However, in some situations, you may need until-successful to process messages synchronously so that the main flow waits for processing within the scope to complete before continuing processing. To address these needs, the Mule enables you to configure the scope to process messages synchronously.

When set to process message synchronously, until-successful executes within the thread of the main flow, then returns the result scope’s processing on the same thread. 

In the Threading tab of the Until Successful’s Properties Editor, click to select Synchronous.

until_successful

To the until-successful element, add the  synchronous  attribute with the value set to true.


    
            
         
1
2
3
&lt;until-successful synchronous="true"&gt;
     &lt;set-payload/&gt;
&lt;/until-successful&gt;

When set to process synchronously, the until-successful scope does not accept the configuration of the following child element and attributes:

  • threading-profile (synchronous until-successful does not need a ThreadPool)

  • objectStore-ref (synchronous until-successful is not required to persist messages between retries)

  • deadLetterQueue-ref (when the retry count is exhausted, Mule executes the exception strategy)

Example - Until-Successful Usage


         
      
1
2
3
<until-successful objectStore-ref="objectStore" maxRetries="5" secondsBetweenRetries="60" doc:name="Until Successful">
    <http:request config-ref="HTTP_Request_Configuration" path="submit" method="POST" doc:name="HTTP"/>
</until-successful>

Example - Retry Sending to an HTTP-based Service

This example demonstrates how to retry sending to an HTTP-based service until success:


         
      
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
<?xml version="1.0" encoding="UTF-8"?>

<mule xmlns:vm="http://www.mulesoft.org/schema/mule/vm"
        xmlns:http="http://www.mulesoft.org/schema/mule/http"
        xmlns:tracking="http://www.mulesoft.org/schema/mule/ee/tracking"
        xmlns="http://www.mulesoft.org/schema/mule/core"
        xmlns:doc="http://www.mulesoft.org/schema/mule/documentation"
        xmlns:spring="http://www.springframework.org/schema/beans"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans-current.xsd
        http://www.mulesoft.org/schema/mule/core
        http://www.mulesoft.org/schema/mule/core/current/mule.xsd
        http://www.mulesoft.org/schema/mule/http
        http://www.mulesoft.org/schema/mule/http/current/mule-http.xsd
        http://www.mulesoft.org/schema/mule/ee/tracking
        http://www.mulesoft.org/schema/mule/ee/tracking/current/mule-tracking-ee.xsd
        http://www.mulesoft.org/schema/mule/vm
        http://www.mulesoft.org/schema/mule/vm/current/mule-vm.xsd">
    <http:request-config name="HTTP_Request_Configuration"
            host="http://acme.com/api/flakey" port="8082"
            doc:name="HTTP Request Configuration"/>
    <spring:bean id="objectStore"
            class="org.mule.util.store.SimpleMemoryObjectStore" />
    <flow name="retrying-http-bridge">
        <vm:inbound-endpoint exchange-pattern="one-way"
                path="acme-bridge" doc:name="VM"/>
        <until-successful objectStore-ref="objectStore" maxRetries="5"
                failureExpression="#[header:INBOUND:http.status != 202]"
                doc:name="Until Successful">
            <http:request config-ref="HTTP_Request_Configuration"
                    path="/" method="POST" doc:name="HTTP"/>
        </until-successful>
    </flow>
</mule>

The Until Successful message processor relies on Mule ObjectStore for persisting the events it processes. In this example, we use an in-memory implementation: a persistent implementation would be required in order to ensure that nothing gets lost in case of a restart or crash.

This example retries every 10 minutes for an hour. Afterwards, the message is discarded.

This example interacts synchronously (request-response) with the outbound HTTP endpoint to ensure the remote web service correctly accepted the POSTed message (that is that it replied with a 202 status code).

Example - Retry Other Flows

The following example shows that other flows can be retried the same way:


         
      
1
2
3
4
5
6
7
8
9
10
<flow name="subflow-retrier">
    <vm:inbound-endpoint path="signup"
        exchange-pattern="request-response"/>
    <until-successful objectStore-ref="objectStore"
        ackExpression="#[message:correlationId]"
        maxRetries="3"
        secondsBetweenRetries="10">
        <flow-ref name="signup-flow" />
    </until-successful>
</flow>

Notice how the Until Successful message processor has been configured to synchronously acknowledge it has accepted the inbound event for processing by returning the current message correlation ID. Sending to the “signup” VM endpoint therefore returns the correlation ID of the message whose processing by the sub-flow named “signup-flow” is tried (and retried).

Example - Configure Object Stores

The following example demonstrates how to configure object stores in the following three situations:

  1. idempotent filter with an in-memory object store

  2. idempotent filter with a persistent object store

  3. Until a successful scope occurs with an in-memory object store


         
      
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
<?xml version="1.0" encoding="UTF-8"?>
<mule xmlns:http="http://www.mulesoft.org/schema/mule/http" xmlns="http://www.mulesoft.org/schema/mule/core" xmlns:doc="http://www.mulesoft.org/schema/mule/documentation" xmlns:spring="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-current.xsd http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd http://www.mulesoft.org/schema/mule/http http://www.mulesoft.org/schema/mule/http/current/mule-http.xsd"> 
 
<!-- Global object store definition for a Listable Object Store, used in Flow 3 below. -->
 
    <spring:beans>
        <spring:bean id="myListableObjectStore" class="org.mule.util.store.SimpleMemoryObjectStore"/>
    </spring:beans>
 
    <http:listener-config name="HTTP_Listener_Configuration" host="localhost" port="8081" doc:name="HTTP Listener Configuration"/>
 
<!--  Idempotent Filter with In Memory Object Store -->
 
    <flow name="Flow1_idempotentWithInMemoryStore" doc:name="Flow1_idempotentWithInMemoryStore">
        <http:listener config-ref="HTTP_Listener_Configuration" path="idempotentInMemory" doc:name="HTTP"/>
        <idempotent-message-filter idExpression="#[message.payload]" throwOnUnaccepted="true" storePrefix="Idempotent_Message" doc:name="Idempotent Message">
            <in-memory-store name="myInMemoryObjectStore" entryTTL="120" expirationInterval="3600" maxEntries="60000" />
        </idempotent-message-filter>
        <set-payload value="YAY!" doc:name="Set Payload" />
        <catch-exception-strategy doc:name="Catch Exception Strategy">
            <set-payload value="NAY!" doc:name="Set Payload" />
        </catch-exception-strategy>
    </flow>

<!--  Idempotent Filter with Persistent File Store -->
 
    <flow name="Flow2_idempotentWithTextFileStore" doc:name="Flow2_idempotentWithTextFileStore">
        <http:listener config-ref="HTTP_Listener_Configuration" path="idempotentTextFile" doc:name="HTTP"/>
        <idempotent-message-filter idExpression="#[message.payload]" throwOnUnaccepted="true" storePrefix="Idempotent_Message" doc:name="Idempotent Message">
            <simple-text-file-store name="mySimpleTextFileStore"                directory="#[server.tmpDir + '/objectstore']" entryTTL="120" expirationInterval="3600" maxEntries="60000" />
        </idempotent-message-filter>
        <set-payload value="YAY!" doc:name="Set Payload" />
        <catch-exception-strategy doc:name="Catch Exception Strategy">
            <set-payload value="NAY!" doc:name="Set Payload" />
        </catch-exception-strategy>
    </flow>

<!--  Until Successful Scope with In Memory Object Store -->
 
    <flow name="Flow3_UntilSuccessfulWithListableObjectStore" doc:name="UntilSuccessfulWithListableObjectStore">
        <http:listener config-ref="HTTP_Listener_Configuration" path="hey" doc:name="HTTP"/>
        <until-successful objectStore-ref="myListableObjectStore" maxRetries="15" secondsBetweenRetries="1" doc:name="Until Successful">
            <processor-chain doc:name="Processor Chain">
                <message-filter throwOnUnaccepted="true">
                    <expression-filter expression="return Math.random() &lt; 0.1" doc:name="Expression" />
                </message-filter>
                <logger message="This eventually happens." doc:name="Logger" />
            </processor-chain>
        </until-successful>
        <set-payload value="Completed" doc:name="Set Payload" />
    </flow>
 
</mule>

See Also