
Async Scope Reference

An async scope is a branch processing block that executes simultaneously with the parent message flow. It is useful for time-consuming operations, such as printing a file or connecting to a mail server, as long as those operations do not need to send a response back to the initiating flow. In other words, the main flow continues executing while the async scope is initiated and processed; it does not pause until the last message processor in the async scope has completed its task.

To facilitate this simultaneous branch processing, the async scope sends one copy of the message it has received to the first embedded message processor in its own processing block; at the same time it sends another copy of the message to the next message processor in the main flow (see below).

[Figure: async scope schematic]

Since they operate on a copy of the message on a different thread, async scopes cannot, by definition, support request-response exchange patterns. Instead, they must implement one of several supported one-way processing strategies, as detailed in the configuration section, below.

If no processing strategy is configured for the async scope, Mule applies a queued-asynchronous processing strategy.

Async Scopes versus Asynchronous Flows

An async scope is similar to an asynchronous flow in that:

  • Processes the message asynchronously, so work in the async scope proceeds without pausing processing in the main flow thread

  • Does not pass data from the scope back into the main flow thread

  • Can have its own processing strategy

However, unlike an asynchronous flow, an async scope:

  • Exists in-line with the main flow thread

  • Is not called by a flow reference component

  • Is not re-usable

  • Cannot have its own exception handling strategy – it inherits this from the flow in which it resides
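As a rough illustration of the last point, the sketch below declares a single exception strategy on the flow and none on the async scope. The flow name, the logger messages, and the choice of a catch-exception-strategy are assumptions made for this example; the surrounding mule element and its namespace declarations are omitted.

```xml
<http:listener-config name="listener-config" host="localhost" port="8081"/>

<flow name="asyncInheritsExceptionStrategy">
    <http:listener config-ref="listener-config" path="/" doc:name="HTTP"/>

    <!-- The scope sits in-line in the flow and declares no exception strategy of its own -->
    <async doc:name="Async">
        <logger level="INFO" message="Working in the background" doc:name="Background work"/>
    </async>

    <set-payload value="accepted" doc:name="Create Response"/>

    <!-- Declared once on the flow; per the list above, the async scope inherits it -->
    <catch-exception-strategy doc:name="Catch Exception Strategy">
        <logger level="ERROR" message="Processing failed: #[exception.message]" doc:name="Log failure"/>
    </catch-exception-strategy>
</flow>
```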

Async Scopes versus Subflows

An async scope is similar to a subflow in that it inherits the exception strategy of the main flow. 

However, unlike a subflow, an async scope:

  • Processes messages asynchronously

  • Does not pass data back to the main flow

  • Exists in-line with the main flow thread

  • Is not called by a flow reference component

  • Is not re-usable
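The contrast can be sketched in one flow that uses both constructs. The names mainFlow and enrichRequest and the literal values are hypothetical, and the surrounding mule element and namespace declarations are omitted.

```xml
<http:listener-config name="listener-config" host="localhost" port="8081"/>

<flow name="mainFlow">
    <http:listener config-ref="listener-config" path="/" doc:name="HTTP"/>

    <!-- Subflow: called by reference, runs synchronously, and its result returns to mainFlow -->
    <flow-ref name="enrichRequest" doc:name="Call subflow"/>

    <!-- Async scope: declared in-line, runs on its own thread, returns nothing to mainFlow -->
    <async doc:name="Async">
        <logger level="INFO" message="Background work on: #[payload]" doc:name="Background work"/>
    </async>

    <logger level="INFO" message="Main flow continues with: #[payload]" doc:name="Continue"/>
</flow>

<!-- Reusable: any flow can call this subflow with a flow-ref -->
<sub-flow name="enrichRequest">
    <set-payload value="enriched" doc:name="Enrich"/>
</sub-flow>
```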

Even though the async scope receives a copy of the Mule message, the payload is not copied. Both Mule messages, the one that continues down the original flow and the one processed by the async scope, reference the same payload object.

In other words, if the payload of your message is a mutable object (for example a bean with different fields in it) and a message processor in your async scope changes the value of one of the fields, the message processors outside of the Async scope see the changed values.
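One possible way to avoid this side effect, sketched here as an assumption rather than a documented recommendation, is to replace the payload with a copy as the first step inside the scope, so that later changes touch only the copy. This relies on the behavior described under Replacing versus Modifying Object References. The flow name and path are hypothetical, the payload is assumed to be a java.util.Map, and the surrounding mule element and namespace declarations are omitted.

```xml
<http:listener-config name="listener-config" host="localhost" port="8081"/>

<flow name="copyBeforeModify">
    <http:listener config-ref="listener-config" path="/copy" doc:name="HTTP"/>
    <set-payload value="#[['key':'originalvalue']]"/>

    <async>
        <!-- Replace the reference: the async branch now works on its own map -->
        <set-payload value="#[new java.util.HashMap(payload)]"/>
        <!-- Modify the copy only; the map held by the main flow is left alone -->
        <set-payload value="#[payload.key = 'changed in async'; return payload]"/>
        <logger level="WARN" message="Payload in async: #[payload]"/>
    </async>

    <logger level="WARN" message="Payload in main flow: #[payload]"/>
</flow>
```

The copy is shallow, so objects nested inside the map are still shared between the two threads. The copy is also made on the async thread, so it does not protect against changes the main flow makes to the original map at the same time.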

Configuration

  1. Drag the async scope to the canvas and position it within the sequence of message processors that make up your flow at the point where you want to initiate an asynchronous processing block.

    [Screenshot: async scope positioned in the flow]

  2. Within the scope area, add and configure the sequence of message processors that you want to execute asynchronously with the main flow. See example below.

    [Screenshot: message processors added inside the async scope]

  3. Optionally, configure the async scope itself with a Display Name. If you wish to specify a Processing Strategy, see the instructions in the next section.

    [Screenshot: async scope properties]

  1. Add an async element to your flow at the point where you want to initiate an asynchronous processing block. Refer to the code sample below.

  2. Configure the scope according to the tables below.

    Element Description

    async

    Use to create a block of message processors that execute asynchronously while the rest of the flow continues to execute in parallel.

    Attribute Description

    doc:name

    Customize to display a unique name for the async scope in your application.

    Note: Attribute not required in Mule Standalone configuration.

  3. Configure one or more child elements within the async scope to define the processing that should occur within the asynchronous processing block. Refer to code sample below. If you wish to specify a Processing Strategy, see the instructions in the next section.

    Child Element Description

    processingStrategy

    Optional. Specify the name of a processing strategy that you have defined as a global element.


    
            
         
<http:listener-config name="listener-config" host="localhost" port="8081"/>
<flow name="Async_Scope_ExampleFlow1" doc:name="Async_Scope_FlowFlow1">
    <http:listener config-ref="listener-config" path="/" doc:name="HTTP Connector"/>
    <!-- Runs on a separate thread; the main flow does not wait for it -->
    <async doc:name="Async">
        <component doc:name="Takes a long time"/>
        <jms:outbound-endpoint doc:name="Store result"/>
    </async>
    <!-- Executes without waiting for the async scope to finish -->
    <expression-transformer doc:name="Create Response"/>
</flow>

Configuring a Processing Strategy

Configuring a processing strategy is optional. Unless you explicitly define a different one, Mule applies the queued-asynchronous processing strategy to the scope. You can configure the Processing Strategy of the async scope to one of the following available processing strategies.

Strategy Description

Asynchronous Processing Strategy

Same as queued-asynchronous processing strategy (which is what Mule applies if no other processing strategy is configured) except that it doesn’t use a queue. Use this only if for some reason you do not want your processing to be distributed across nodes.

Custom Processing Strategy

A user-written processing strategy.

Queued-Asynchronous Processing Strategy

Uses a queue to decouple the flow’s receiver from the rest of the steps in the flow. It works the same way in a scope as in a flow. Mule applies this strategy unless another is specified. Select this if you want to fine-tune this processing strategy by:

  • Changing the number of threads available to the flow.

  • Limiting the number of messages that can be queued.

  • Specifying a queue store to persist data.

Queued Thread Per Processor Processing Strategy

Not applicable to most use cases. Writes messages to a queue, then every processor in the scope runs sequentially in a different thread.

Thread Per Processor Processing Strategy

Not applicable to most use cases. Every processor in the scope runs sequentially in a different thread.

For more information about processing strategies, see Flow Processing Strategies.
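The fine-tuning options listed for the queued-asynchronous strategy might look like the sketch below. The maxThreads attribute appears elsewhere in this document; the maxQueueSize attribute, the default-persistent-queue-store child element, and the asynchronous-processing-strategy element are recalled from the Mule 3 core schema and should be checked against Flow Processing Strategies for your runtime version. All names and values are hypothetical, and the surrounding mule element and namespace declarations are omitted.

```xml
<!-- Queued strategy: limits threads and queued messages, and persists the queue -->
<queued-asynchronous-processing-strategy name="TunedQueuedStrategy"
        maxThreads="8" maxQueueSize="100"
        doc:name="Queued Asynchronous Processing Strategy">
    <default-persistent-queue-store/>
</queued-asynchronous-processing-strategy>

<!-- Non-queued alternative: same threading model, but no queue in between -->
<asynchronous-processing-strategy name="NoQueueStrategy" maxThreads="8"
        doc:name="Asynchronous Processing Strategy"/>

<http:listener-config name="listener-config" host="localhost" port="8081"/>

<flow name="tunedAsyncFlow">
    <http:listener config-ref="listener-config" path="/" doc:name="HTTP"/>
    <!-- Reference either global strategy by name -->
    <async doc:name="Async" processingStrategy="TunedQueuedStrategy">
        <logger level="INFO" message="Background work" doc:name="Background work"/>
    </async>
    <set-payload value="accepted" doc:name="Create Response"/>
</flow>
```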

  1. Click the plus + sign to the right of the Processing Strategy field.

  2. In the Choose Global Type window, select from the list of available processing strategies, then click OK.

    [Screenshot: Choose Global Type window]

  3. Configure the processing strategy as needed. For more information, see Flow Processing Strategies.

  1. Define your processing strategy as a global element, with any necessary configuration or optional fine-tuning. (For more information, see Flow Processing Strategies.) Refer to code sample below.

  2. Add a processingStrategy attribute to your async element to specify the processing strategy by name, as in the code sample.


    
             
          
<!-- Global element: the strategy is defined once and referenced by name -->
<queued-asynchronous-processing-strategy name="Allow42Threads" maxThreads="42" doc:name="Queued Asynchronous Processing Strategy"/>

<http:listener-config name="listener-config" host="localhost" port="8081"/>
<flow name="Async_Scope_ExampleFlow1" doc:name="Async_Scope_FlowFlow1">
    <http:listener config-ref="listener-config" path="/" doc:name="HTTP Connector"/>
    <!-- processingStrategy refers to the global element above by its name -->
    <async doc:name="Async" processingStrategy="Allow42Threads">
        <component doc:name="Takes a long time"/>
        <jms:outbound-endpoint queue="myQueue" connector-ref="Active_MQ" doc:name="Store Result"/>
    </async>
    <expression-transformer doc:name="Create Response"/>
</flow>

Replacing versus Modifying Object References

If you replace the payload or a flow variable inside the async scope, that is, assign a completely new object reference, both the payload and the flow variable in the original thread keep their original values.

If you modify the referenced object in place and leave the reference unchanged, the change to the payload is visible in the original thread, but the flow variable is preserved. This is because the payload is not copied for the async scope, whereas the flow variable is.

Example:

The following example lets you test a replace versus a modify on an async scope.

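You can test the example by sending requests to the two endpoints the flows define: http://localhost:9000/replacepayload and http://localhost:9000/modifypayload.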


         
      
<flow name="replace">
  <http:inbound-endpoint address="http://localhost:9000/replacepayload" exchange-pattern="request-response" />
  <set-payload value="original payload" />
  <set-variable value="original flowvar" variableName="testflowvar"/>
  <logger level="WARN" message="original payload: #[payload]" />
  <logger level="WARN" message="original flowvar: #[flowVars['testflowvar']]" />
  <async>
    <set-payload value="new payload" />
    <set-variable value="new flowvar" variableName="testflowvar"/>
    <logger level="WARN" message="Payload in async: #[payload]" />
    <logger level="WARN" message="Flowvar in async: #[flowVars['testflowvar']]" />
  </async>
  <scripting:component>
    <scripting:script engine="groovy">
      <scripting:text>
Thread.sleep(3000)
return payload
      </scripting:text>
    </scripting:script>
  </scripting:component>
  <logger level="WARN" message="Payload after async: #[payload]" />
  <logger level="WARN" message="Flowvar after async: #[flowVars['testflowvar']]" />
</flow>

<flow name="modify">
  <http:inbound-endpoint address="http://localhost:9000/modifypayload" exchange-pattern="request-response" />
  <set-payload value="#[['key':'originalvalue']]" />
  <set-variable value="#[['key':'originalvalue']]" variableName="testflowvar"/>
  <logger level="WARN" message="original payload: #[payload]" />
  <logger level="WARN" message="original flowvar: #[flowVars['testflowvar']]" />
  <async>
    <set-payload value="#[payload.key = 'new payload'; return payload]" />
    <set-variable value="#[['key':'new value']]" variableName="testflowvar"/>
    <logger level="WARN" message="Payload in async: #[payload]" />
    <logger level="WARN" message="Flowvar in async: #[flowVars['testflowvar']]" />
  </async>
  <scripting:component>
    <scripting:script engine="groovy">
      <scripting:text>
Thread.sleep(3000)
return payload
      </scripting:text>
    </scripting:script>
  </scripting:component>
  <logger level="WARN" message="Payload after async: #[payload]" />
  <logger level="WARN" message="Flowvar after async: #[flowVars['testflowvar']]" />
</flow>

Complete Example Code

The following example shows a flow containing an async scope, in which a Java component does the long-running work and the result is sent to a Java Message Service (JMS) queue.


         
      
<?xml version="1.0" encoding="UTF-8"?>

<mule xmlns:http="http://www.mulesoft.org/schema/mule/http" xmlns:jms="http://www.mulesoft.org/schema/mule/jms" xmlns:wmq="http://www.mulesoft.org/schema/mule/ee/wmq" xmlns="http://www.mulesoft.org/schema/mule/core" xmlns:doc="http://www.mulesoft.org/schema/mule/documentation"
        xmlns:spring="http://www.springframework.org/schema/beans"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-current.xsd
http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd
http://www.mulesoft.org/schema/mule/http http://www.mulesoft.org/schema/mule/http/current/mule-http.xsd
http://www.mulesoft.org/schema/mule/jms http://www.mulesoft.org/schema/mule/jms/current/mule-jms.xsd
http://www.mulesoft.org/schema/mule/ee/wmq http://www.mulesoft.org/schema/mule/ee/wmq/current/mule-wmq-ee.xsd">
    <http:listener-config name="HTTP_Listener_Configuration" host="localhost" port="8081" doc:name="HTTP Listener Configuration"/>
    <jms:activemq-connector name="Active_MQ" username="myusername" password="mypassword" brokerURL="tcp://localhost:61616" validateConnections="true" doc:name="Active MQ"/>
    <queued-asynchronous-processing-strategy name="Queued_Asynchronous_Processing_Strategy" maxThreads="42" doc:name="Queued Asynchronous Processing Strategy"/>
    <flow name="asyncFlow">
        <http:listener config-ref="HTTP_Listener_Configuration" path="/" doc:name="HTTP"/>
        <async doc:name="Async" processingStrategy="Queued_Asynchronous_Processing_Strategy">
            <component doc:name="Takes a Long Time"/>
            <jms:outbound-endpoint queue="myQueue" connector-ref="Active_MQ" doc:name="Store Result"/>
        </async>
        <wmq:expression-transformer doc:name="Create Response"/>
    </flow>
</mule>

See Also