Nav
You are viewing an older version of this section. Click here to navigate to the latest version.

Scatter-Gather

The routing message processor Scatter-Gather sends a request message to multiple targets concurrently. It collects the responses from all routes, and aggregates them into a single message.

Scatter-Gather replaces the All message processor, which is deprecated as of Mule 3.5.0. Note that, unlike All, Scatter-Gather executes routes concurrently instead of sequentially. Parallel execution of routes can greatly increase the efficiency of your application and provides more information than sequential processing. Refer to Migrating to Scatter-Gather from the All Message Router below for more information about the differences you can expect.

Assumptions

This document assumes you are familiar with Mule applications and message processors, in particular routing message processors.

Scatter-Gather Behavior and Exceptions

scatter-gather

The Scatter-Gather router sends a message for concurrent processing to all configured routes. The thread executing the flow that owns the router waits until all routes complete or time out.

If there are no failures, Mule aggregates the results from each of the routes into a message collection (MessageCollection class). Failure in one route does not stop the Scatter-Gather from sending messages to its other configured routes, so it is possible that many, or all routes may fail concurrently.

By default, if any route fails, Scatter-Gather performs the following actions:

  • sets the exception payload accordingly for each route

  • throws a CompositeRoutingException, which maps each exception to its corresponding route using a sequential route ID

Catching the CompositeRoutingException allows you to gather information on all failed routes. 

CompositeRoutingException

The CompositeRoutingException is new to the 3.5.0 Runtime. It extends the Mule MessagingException to aggregate exceptions from different routes in the context of a single message router. Exceptions are correlated to each route through a sequential ID.

This exception exposes two methods which allow you to obtain the IDs of failed routes and the exceptions returned by each route.

  • The getExceptions method returns a map where the key is an integer that identifies the index of the failed route, and the value is the exception itself.

  • The getExceptionForRouteIndex(int) method returns the exception of the requested route ID.

Customizing Aggregation Strategies

Scatter-Gather allows you to define a custom aggregation strategy which overrides its default aggregation strategy. Among other things, custom gathering strategies allow you to:

  • discard message responses

  • merge message properties that originated in different routes

  • discard failed messages without throwing an exception

  • select only one from multiple responses

Configuration

Scatter-Gather is designed to direct messages through two or more processing routes. Using Scatter-Gather to direct messages through only one processing route will throw an exception.

In Mule 3.5.1 and above, the application will not start if Scatter-Gather has been configured with less than two processing routes.

Modeling Scatter-Gather in a Flow

Add a scatter-gather element to your flow, then, inside the dashed-line scope area of the scatter-gather element, drag and drop two or more message processors or connectors, placing them parallel to one another, as shown.

scatter-gather

General Tab

scatter-gather

Field Description Default Value Example XML

Display Name

Customize to display a unique name for the splitter in your application.

Scatter-Gather


          
                  
               
1
doc:name="Scatter-Gather"

Aggregation Strategy

Specify an aggregation strategy to use:

  • Default: Use Mule’s default strategy.

  • From Class: Provide the canonical name of a class that defines a custom aggregation strategy.

  • From Reference: Provide the name of a spring bean that implements a custom aggregation strategy.

Default


          
                  
               
1
<custom-aggregation-strategy class= "org.my.CustomAggregationStrategy"/>

Advanced Tab

scatter-gather

Field Description Default Value Example XML

Timeout

Sets the timeout for responses from sent messages, in milliseconds. A value of 0 or lower than 0 means no timeout.

0


          
                  
               
1
timeout="6000"

Threading Profile

Optionally, use to customize the threading profile. Refer to Tuning Performance for a description of the configurable attributes. For a brief discussion of threading profiles in Scatter-Gather, see the section below.

Default threading profile


          
                  
               
1
<threading-profile maxBufferSize="100"/>

    
            
         
1
2
3
4
5
6
7
<scatter-gather doc:name="Scatter-Gather" timeout="6000">
  <custom-aggregation-strategy class="org.my.CustomAggregationStrategy"/>
    <threading-profile poolExhaustedAction="WAIT" maxBufferSize="100"/>
    <http:request path="/path" method="POST" doc:name="HTTP"/>
    <tcp:outbound-endpoint exchange-pattern="request-response" host="localhost" port="80" responseTimeout="10000" doc:name="TCP">
    <jms:outbound-endpoint connector-ref="JMS1" ref="JMS" doc:name="JMS"/>
</scatter-gather>
Element Description

scatter-gather

Sends a request message to multiple targets concurrently. It collects the responses from all routes, and aggregates them into a single message.

Attribute Description Default Value Required?

timeout

Sets the timeout for responses from sent messages, in milliseconds. A value of 0 or lower than 0 means no timeout.

0

Optional Child Element Description

custom-aggregation-strategy

Allows you to define a custom gathering strategy using either a custom class or a reference to a spring bean. Note that you cannot set class and ref at the same time. Doing so will result in an exception when starting the application. See Customizing Gather Strategies above and the [Complete Code Example] below.

Attribute Description Default Value Required?

class

A string with the canonical name of a class that implements the aggregation strategy. That class is required to have a default constructor.

-

ref

The name of a registered bean that implements the aggregation strategy.

-

Optional Child Element Description

threading-profile

Allows you to configure the underlying thread pool. Refer to Tuning Performance for a list of configurable attributes, all of which can be applied here. For a brief discussion of threading profiles in Scatter-Gather, see the section below.

Scatter-Gather Threading Profiles

Scatter-Gather’s default threading profile is designed to work in most scenarios, where the Scatter-Gather component is typically configured with between three to six routes. If the default threading profile is not best suited for your needs, Scatter-Gather allows you to define a custom threading profile for the component.

Scatter-Gather’s threading profile is specific to the Scatter-Gather router and does not define the threading profile for your whole Mule application; however, threads started by each Scatter-Gather router are shared across all messages passing through the flow. This means that a high number of threads configured in Scatter-Gather does not necessarily guarantee that enough processing power will be available to meet the requirements for all messages. For example, suppose two messages arrive two milliseconds apart from each other at a Scatter-Gather component with 20 routes and 20 threads. The first message will have access to the 20 threads and will execute promptly whereas the second message will have high latency while it waits for the first message to release these threads.

Ultimately, the optimum threading profile depends on each application. For most scenarios, MuleSoft recommends that the number of threads in Scatter-Gather should be the result of the number of routes times the value of maxThreadsActive for the flow where Scatter-Gather resides.

  • maxThreadsActive for Scatter-Gather = number of routes in Scatter-Gather * maxThreadsActive for flow

However, in some scenarios the above recommendation could result in a large number of threads which would consume a lot of memory and processing power. If this is the case, you will need to experiment in order to find the optimum tuning point, i.e. the exact point at which parallelism provides maximum gain before starting to become a bottleneck.

For scenarios in which routes execute very quickly (a couple of milliseconds per route) it’s probably better to do sequential processing.

For details on setting up threading profiles, see Tuning Performance.

Migrating to Scatter-Gather from the All Message Router

If you are currently using All routers in your application, you may wish to replace them with Scatter-Gather routers. This section details the differences you need to be aware of when considering migration.

Why Migrate?

Support for the All router will continue throughout the Mule 3.x series. However, MuleSoft recommends migrating to Scatter-Gather, mainly for two reasons:

  • Scatter-Gather is a better option for most cases

  • migrating to Scatter-Gather now will facilitate the transition to Mule 4

Differences Between Scatter-Gather and the All Router

The All router implements sequential multicasting to send a message through the specified routes. This works well in some situations, such as the following:

  • where route n depends on side effects generated on target systems by route n-1

  • where an exception in route n should prevent Mule from sending messages to route n+1

However, where the above situations do not apply, the only effect of sequential multicasting is to decrease application efficiency. In these cases, it is best to send the message to all routes concurrently. The scatter-gather routing message processor does precisely that, executing all message routes concurrently. It allows you to:

  • multicast a single message in parallel to several routes

  • configure a timeout after which a failed route causes the application to throw an exception

  • group exceptions in case of failed routes

scatter-gather

The table below compares the three main differences between the All and the Scatter-Gather message routers.

Compare All Scatter-Gather

Processing

Employs serial processing and one single thread to send the current Mule message across all specified routes. Hence, to access all of the responses returned by the routes, the application must wait until all of the routes have finished execution.

Uses parallel processing in a thread pool to concurrently execute all routes. Hence, to access all of the responses returned by the routes, the application need only wait until the slowest route has finished execution.

Error handling

If a route fails, successive routes are not executed. Likewise, if route n fails, it is not possible to obtain information about route n-1; i.e. you can only obtain information about the failed route.

Parallel execution means that even if one or many routes fail, the rest of the assigned routes will still be executed. If one or more routes throw an exception, scatter-gather throws a CompositeRoutingException, which allows the application to retrieve information about both failed and successful routes.

Customization

If successful, the all router always returns a MuleMessageCollection, and this is the only information that you can obtain from it.

Scatter-Gather uses an aggregator to combine responses from all routes. To provide backwards compatibility, by default Scatter-Gather returns a MuleMessageCollection, thereby facilitating migration for users who wish to take advantage of improved performance. However, Scatter-Gather also allows you to define your own custom aggregation strategy (see the Complete Code Example below).

Complete Code Example

In this example, a travel booking application selects direct flight routes between user-selected cities. The application contacts a list of airline brokers for available flights, then selects the least expensive flight. It uses Scatter-Gather to concurrently send the message to each airline broker, then waits for all routes to complete. Prior to selecting the least expensive flight, the app needs to eliminate (filter out) any routes that returned an error. To do this, it uses a custom aggregation strategy, which is invoked using the custom-aggregation-strategy attribute within Scatter-Gather. The complete Scatter-Gather XML is shown below.


         
      
1
2
3
4
5
6
<scatter-gather timeout="5000">
    <custom-aggregation-strategy class="org.myproject.CheapestFlightAggregationStrategy" />   
    <flow-ref name="flightBroker1" />
    <flow-ref name="flightBroker2" />
    <flow-ref name="flightBroker3" />
</scatter-gather>

In the code above, scatter-gather’s custom-aggregation-strategy invokes public class org.myproject.CheapestFlightAggregationStrategy, which contains the code showed below, for filtering out failed routes.


         
      
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class CheapeastFlightAggregationStrategy implements AggregationStrategy {
 
    @Override
    public MuleEvent aggregate(AggregationContext context) throws MuleException {
        MuleEvent result = null;
        long value = Long.MAX_VALUE;
        for (MuleEvent event : context.collectEventsWithoutExceptions()) {
            Flight flight = (Flight) event.getMessage().getPayload();
            if (flight.getCost() < value) {
                result = DefaultMuleEvent.copy(event);
                value = flight.getCost();
            }
        }
         
        if (result != null)  {
            return result;
        }
         
        throw new  RuntimeException("no flights obtained");
    }
}

Notice the line:


              
           
1
result = DefaultMuleEvent.copy(event);

Users running Mule 3.5.0 need to copy the event instead of simply referencing it. The reason is that the event was created in a thread other than the one processing the flow. Therefore, any attempt at modifying the message after the Scatter-Gather finalizes its execution would result in an IllegalStateException, since for security reasons Mule does not allow modifying an event in a thread other than the one that created it.

In Mule 3.5.1 and above, Scatter-Gather will automatically handle the message, saving you the task of manually copying it.

Serial Multicast with Scatter-Gather

An earlier section of this page discusses some situations where sequential multicast is desirable over concurrent execution of all message routes. If you need sequential multicast in your application, you can implement it by configuring Scatter-Gather with a custom threading profile of only one thread, as shown below in the Max Active Threads field.

scatter-gather

In the context of the above Complete Code Example, the XML would look as follows:


         
      
1
2
3
4
5
6
7
<scatter-gather timeout="5000">
  <threading-profile maxThreadsActive="1"/>
  <custom-aggregation-strategy class="org.myproject.CheapestFlightAggregationStrategy" />
  <flow-ref name="flightBroker1" />
  <flow-ref name="flightBroker2" />
  <flow-ref name="flightBroker3" />
</scatter-gather>

Like the All router, this configuration ensures that the routes are invoked sequentially. However, there is one difference: unlike with the All router, with this configuration if one route fails the subsequent routes are still invoked.

Defining a threading profile of only one thread may yield below-par performance results in some situations, since the single thread used by Scatter-Gather will be shared across all messages in the flow. If you find that this is the case, it may be desirable to fall back to using the All router for sequential processing. As of Mule version 3.6.0 this issue is fixed.

See Also

  • Learn more about message routing.