Scatter-Gather
The routing message processor Scatter-Gather sends a request message to multiple targets concurrently. It collects the responses from all routes, and aggregates them into a single message.
Scatter-Gather replaced the All message processor, which was deprecated in Mule 3.5.0. Note that, unlike All, Scatter-Gather executes routes concurrently instead of sequentially. Parallel execution of routes can greatly increase the efficiency of your application, and because a failure in one route does not prevent the other routes from running, it yields information about every route rather than only the first one to fail. Refer to Migrating to Scatter-Gather from the All Message Router below for more information about the differences you can expect.
Prerequisites
This document assumes you are familiar with Mule applications and message processors, in particular routing message processors.
Scatter-Gather Behavior and Exceptions
The Scatter-Gather router sends a message for concurrent processing to all configured routes. The thread executing the flow that owns the router waits until all routes complete or time out.
If there are no failures, Mule aggregates the results from each of the routes into a message collection (MuleMessageCollection class). Failure in one route does not stop Scatter-Gather from sending messages to its other configured routes, so it is possible that many or even all routes fail concurrently.
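As a rough illustration of the default aggregation, the fragment below logs the aggregated result after the router completes. The flow and route names are hypothetical, the fragment is meant to sit inside a mule root element with the core namespace as default, and it assumes the aggregated payload can be read as a list with one entry per route in route order.

```xml
<flow name="defaultAggregationFlow">
    <scatter-gather>
        <!-- Hypothetical routes; each flow-ref is one route -->
        <flow-ref name="routeA"/>
        <flow-ref name="routeB"/>
    </scatter-gather>
    <!-- Assumption: the payload is now a list, one entry per route, in route order -->
    <logger level="INFO" message="All responses: #[payload]"/>
    <logger level="INFO" message="Response from the first route: #[payload[0]]"/>
</flow>
```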
By default, if any route fails, Scatter-Gather performs the following actions:
- Sets the exception payload accordingly for each route.
- Throws a CompositeRoutingException, which maps each exception to its corresponding route using a sequential route ID.
Catching the CompositeRoutingException allows you to gather information on all failed routes.
CompositeRoutingException
The CompositeRoutingException extends the Mule MessagingException to aggregate exceptions from different routes in the context of a single message router. Exceptions are correlated to each route through a sequential ID.
This exception exposes two methods which allow you to obtain the IDs of failed routes and the exceptions returned by each route.
- The getExceptions method returns a map where the key is an integer that identifies the index of the failed route, and the value is the exception itself.
- The getExceptionForRouteIndex(int) method returns the exception of the requested route ID.
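One possible way to use these methods from a flow is sketched below: a catch exception strategy that only handles the composite exception and logs the indexes of the failed routes. The flow and route names are hypothetical, and the fully qualified class name org.mule.routing.CompositeRoutingException and the MEL expressions are assumptions to verify against your Mule version.

```xml
<flow name="scatterGatherWithErrorHandling">
    <scatter-gather>
        <!-- Hypothetical routes -->
        <flow-ref name="routeA"/>
        <flow-ref name="routeB"/>
    </scatter-gather>
    <!-- Assumption: the exception class lives in org.mule.routing -->
    <catch-exception-strategy when="#[exception.causedBy(org.mule.routing.CompositeRoutingException)]">
        <!-- getExceptions() maps each failed route index to its exception -->
        <logger level="ERROR" message="Failed route indexes: #[exception.getExceptions().keySet()]"/>
    </catch-exception-strategy>
</flow>
```

Because the map is keyed by route index, the same handler could look up a single route with getExceptionForRouteIndex(int) when only one route needs special treatment.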
Customizing Aggregation Strategies
Scatter-Gather allows you to define a custom aggregation strategy which overrides its default aggregation strategy. Among other things, custom gathering strategies allow you to:
- Discard message responses
- Merge message properties that originated in different routes
- Discard failed messages without throwing an exception
- Select only one from multiple responses
Configuration
Scatter-Gather directs messages through two or more processing routes. Using Scatter-Gather to direct messages through only one processing route throws an exception. Applications do not start if Scatter-Gather has been configured with fewer than two processing routes.
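A minimal sketch of a two-route configuration follows. The route names are hypothetical, and the use of processor-chain to group several processors into a single route is an assumption based on general Mule 3 usage rather than something this page states.

```xml
<scatter-gather>
    <!-- Route 0: a single message processor -->
    <flow-ref name="routeA"/>
    <!-- Route 1: several processors grouped into one route (assumed pattern) -->
    <processor-chain>
        <set-payload value="#[payload.toString().toUpperCase()]"/>
        <flow-ref name="routeB"/>
    </processor-chain>
</scatter-gather>
```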
Studio Visual Editor
Modeling Scatter-Gather in a Flow
Add a scatter-gather element to your flow, then, inside the dashed-line scope area of the scatter-gather element, drag and drop two or more message processors or connectors, placing them parallel to one another.
General Tab
Field | Description | Example XML |
---|---|---|
Display Name | Customize to display a unique name for the router in your application. Default is | doc:name="Scatter-Gather" |
Aggregation Strategy | Specify an aggregation strategy to use: | <custom-aggregation-strategy class="org.my.CustomAggregationStrategy"/> |
Advanced Tab
Field | Description | Example XML |
---|---|---|
Timeout | Sets the timeout for responses from sent messages, in milliseconds. A value of 0 or lower than 0 means no timeout. Default is | timeout="6000" |
Threading Profile | Optionally, use to customize the threading profile. Refer to Tuning Performance for a description of the configurable attributes. For a brief discussion of threading profiles in Scatter-Gather, see the section below. Default is | <threading-profile maxBufferSize="100"/> |
XML Editor or Standalone
<?xml version="1.0" encoding="UTF-8"?>
<mule xmlns:http="http://www.mulesoft.org/schema/mule/http" xmlns:jms="http://www.mulesoft.org/schema/mule/jms" xmlns:tcp="http://www.mulesoft.org/schema/mule/tcp" xmlns="http://www.mulesoft.org/schema/mule/core" xmlns:doc="http://www.mulesoft.org/schema/mule/documentation"
xmlns:spring="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-current.xsd
http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd
http://www.mulesoft.org/schema/mule/tcp http://www.mulesoft.org/schema/mule/tcp/current/mule-tcp.xsd
http://www.mulesoft.org/schema/mule/http http://www.mulesoft.org/schema/mule/http/current/mule-http.xsd
http://www.mulesoft.org/schema/mule/jms http://www.mulesoft.org/schema/mule/jms/current/mule-jms.xsd">
    <http:request-config name="HTTP_Request_Configuration" host="localhost" port="8081" doc:name="HTTP Request Configuration"/>
    <jms:connector name="JMS" username="me" password="metoo" validateConnections="true" doc:name="JMS"/>
    <flow name="scatter-gather-Flow">
        <scatter-gather timeout="6000" doc:name="Scatter-Gather">
            <custom-aggregation-strategy class="org.my.CustomAggregationStrategy"/>
            <threading-profile maxThreadsActive="1" poolExhaustedAction="WAIT" maxBufferSize="100"/>
            <http:request config-ref="HTTP_Request_Configuration" path="/" method="GET" doc:name="HTTP"/>
            <tcp:outbound-endpoint exchange-pattern="request-response" host="localhost" port="80" responseTimeout="10000" doc:name="TCP"/>
            <jms:outbound-endpoint doc:name="JMS" address="127.0.0.1" connector-ref="JMS"/>
        </scatter-gather>
    </flow>
</mule>
Element | Description |
---|---|
scatter-gather | Sends a request message to multiple targets concurrently. It collects the responses from all routes, and aggregates them into a single message. |
Attribute | Description | Default Value | Required? |
---|---|---|---|
timeout | Sets the timeout for responses from sent messages, in milliseconds. A value of 0 or lower than 0 means no timeout. | | |
Optional Child Element | Description |
---|---|
custom-aggregation-strategy | Allows you to define a custom gathering strategy using either a custom class or a reference to a Spring bean. Note that you cannot set |
Attribute | Description | Default Value | Required? |
---|---|---|---|
class | A string with the canonical name of a class that implements the aggregation strategy. That class is required to have a default constructor. | - | |
ref | The name of a registered bean that implements the aggregation strategy. | - | |
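The ref attribute could be used roughly as sketched below, where the strategy is registered as a Spring bean and referenced by name. The bean id and flow name are hypothetical, and the fragment assumes the spring namespace prefix is declared on the mule root element as in the full example above.

```xml
<spring:beans>
    <!-- Hypothetical bean id; the class must implement the aggregation strategy -->
    <spring:bean id="cheapestFlightStrategy" class="org.myproject.CheapestFlightAggregationStrategy"/>
</spring:beans>

<flow name="flightSearchFlow">
    <scatter-gather timeout="5000">
        <!-- Reference the registered bean instead of naming a class -->
        <custom-aggregation-strategy ref="cheapestFlightStrategy"/>
        <flow-ref name="flightBroker1"/>
        <flow-ref name="flightBroker2"/>
    </scatter-gather>
</flow>
```

Using a bean reference may be preferable when the strategy needs injected dependencies, since the class attribute only relies on a default constructor.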
Optional Child Element | Description |
---|---|
threading-profile | Allows you to configure the underlying thread pool. Refer to Tuning Performance for a list of configurable attributes, all of which can be applied here. For a brief discussion of threading profiles in Scatter-Gather, see the section below. |
Scatter-Gather Threading Profiles
Scatter-Gather’s default threading profile is designed to work in most scenarios, where the Scatter-Gather component is typically configured with between three and six routes. If the default threading profile is not well suited to your needs, Scatter-Gather allows you to define a custom threading profile for the component.
Scatter-Gather’s threading profile is specific to the Scatter-Gather router and does not define the threading profile for your whole Mule application; however, threads started by each Scatter-Gather router are shared across all messages passing through the flow. This means that a high number of threads configured in Scatter-Gather does not necessarily guarantee that enough processing power is available to meet the requirements for all messages. For example, suppose two messages arrive two milliseconds apart from each other at a Scatter-Gather component with 20 routes and 20 threads. The first message has access to the 20 threads and executes promptly whereas the second message has high latency while it waits for the first message to release these threads.
Ultimately, the optimum threading profile depends on each application. For most scenarios, MuleSoft recommends setting the number of threads in Scatter-Gather to the number of routes multiplied by the value of maxThreadsActive for the flow where Scatter-Gather resides:
- maxThreadsActive for Scatter-Gather = number of routes in the Scatter-Gather * maxThreadsActive for the flow
However, in some scenarios the above recommendation could result in a large number of threads which would consume a lot of memory and processing power. If this is the case, you need to experiment to find the optimum tuning point, that is, the exact point at which parallelism provides maximum gain before starting to become a bottleneck.
For scenarios in which routes execute very quickly (a couple of milliseconds per route) it’s probably better to do sequential processing.
For details on setting up threading profiles, see Tuning Performance.
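As a worked example of the sizing recommendation in this section, the sketch below assumes a Scatter-Gather with three routes inside a flow whose maxThreadsActive is 8, which gives 3 * 8 = 24 threads. Both figures and the route names are illustrative assumptions, not recommended values.

```xml
<scatter-gather>
    <!-- 3 routes * 8 (assumed maxThreadsActive of the enclosing flow) = 24 -->
    <threading-profile maxThreadsActive="24" poolExhaustedAction="WAIT"/>
    <flow-ref name="routeA"/>
    <flow-ref name="routeB"/>
    <flow-ref name="routeC"/>
</scatter-gather>
```

If a value computed this way turns out to be too expensive in memory or CPU, it is a starting point for experimentation rather than a fixed rule.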
Migrating to Scatter-Gather from the All Message Router
If you are currently using All routers in your application, you may wish to replace them with Scatter-Gather routers. This section details the differences you need to be aware of when considering migration.
Why Migrate?
Support for the All router continues throughout the Mule 3.x series. However, MuleSoft recommends migrating to Scatter-Gather, mainly for two reasons:
- Scatter-Gather is a better option for most cases
- Migrating to Scatter-Gather now facilitates the transition to Mule 4
Differences Between Scatter-Gather and the All Router
The All router implements sequential multicasting to send a message through the specified routes. This works well in some situations, such as the following:
- where route n depends on side effects generated on target systems by route n-1
- where an exception in route n should prevent Mule from sending messages to route n+1
However, where the above situations do not apply, the only effect of sequential multicasting is to decrease application efficiency. In these cases, it is best to send the message to all routes concurrently. The scatter-gather routing message processor does precisely that, executing all message routes concurrently. It allows you to:
- multicast a single message in parallel to several routes
- configure a timeout after which a route that has not responded is treated as failed and an exception is thrown
- group exceptions in case of failed routes
The table below compares the three main differences between the All and the Scatter-Gather message routers.
Compare | All | Scatter-Gather |
---|---|---|
Processing | Employs serial processing and one single thread to send the current Mule message across all specified routes. Hence, to access all of the responses returned by the routes, the application must wait until all of the routes have finished execution, one after another. | Uses parallel processing in a thread pool to concurrently execute all routes. Hence, to access all of the responses returned by the routes, the application need only wait until the slowest route has finished execution. |
Error handling | If a route fails, successive routes do not execute. Likewise, if route n fails, it is not possible to obtain information about route n-1. You can only obtain information about the failed route. | Parallel execution means that even if one or many routes fail, the rest of the assigned routes still execute. If one or more routes throw an exception, Scatter-Gather throws a CompositeRoutingException, which maps each exception to its corresponding route. |
Customization | If successful, the All router always returns a MuleMessageCollection, and this is the only information that you can obtain from it. | Scatter-Gather uses an aggregator to combine responses from all routes. To provide backwards compatibility, by default Scatter-Gather returns a MuleMessageCollection, thereby facilitating migration for users who wish to take advantage of improved performance. However, Scatter-Gather also allows you to define your own custom aggregation strategy (see the Complete Code Example below). |
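A migration could look roughly like the before-and-after sketch below. The route names are hypothetical, and the fragment only illustrates the element swap; whether the routes are safe to run concurrently depends on the considerations listed in this section.

```xml
<!-- Before: routes run one after another on a single thread -->
<all>
    <flow-ref name="routeA"/>
    <flow-ref name="routeB"/>
</all>

<!-- After: the same routes run concurrently; timeout is optional -->
<scatter-gather timeout="5000">
    <flow-ref name="routeA"/>
    <flow-ref name="routeB"/>
</scatter-gather>
```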
Complete Code Example
In this example, a travel booking application selects direct flight routes between user-selected cities. The application contacts a list of airline brokers for available flights, then selects the least expensive flight. It uses Scatter-Gather to concurrently send the message to each airline broker, then waits for all routes to complete. Prior to selecting the least expensive flight, the app needs to eliminate (filter out) any routes that returned an error. To do this, it uses a custom aggregation strategy, which is configured using the custom-aggregation-strategy child element within Scatter-Gather. The complete Scatter-Gather XML is shown below.
<scatter-gather timeout="5000">
    <custom-aggregation-strategy class="org.myproject.CheapestFlightAggregationStrategy" />
    <flow-ref name="flightBroker1" />
    <flow-ref name="flightBroker2" />
    <flow-ref name="flightBroker3" />
</scatter-gather>
In the code above, scatter-gather’s custom-aggregation-strategy invokes the public class org.myproject.CheapestFlightAggregationStrategy, which contains the code shown below for filtering out failed routes.
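package org.myproject;

import org.mule.DefaultMuleEvent;
import org.mule.api.MuleEvent;
import org.mule.api.MuleException;
import org.mule.api.routing.AggregationContext;
import org.mule.routing.AggregationStrategy;

// Flight is an application-specific class (not shown).
public class CheapestFlightAggregationStrategy implements AggregationStrategy {

    @Override
    public MuleEvent aggregate(AggregationContext context) throws MuleException {
        MuleEvent result = null;
        long value = Long.MAX_VALUE;
        // Consider only the routes that completed without an exception.
        for (MuleEvent event : context.collectEventsWithoutExceptions()) {
            Flight flight = (Flight) event.getMessage().getPayload();
            if (flight.getCost() < value) {
                // Keep a copy of the cheapest event seen so far.
                result = DefaultMuleEvent.copy(event);
                value = flight.getCost();
            }
        }
        if (result != null) {
            return result;
        }
        throw new RuntimeException("no flights obtained");
    }
}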
Notice the line: result = DefaultMuleEvent.copy(event); Users running Mule 3.5.0 need to copy the event instead of simply referencing it, because the event was created in a thread other than the one processing the flow. Any attempt to modify the message after Scatter-Gather finishes executing would therefore result in an error. In Mule 3.5.1 and newer, Scatter-Gather automatically handles the message, saving you the task of manually copying it.
Serial Multicast with Scatter-Gather
An earlier section of this page discusses some situations where sequential multicast is preferable to concurrent execution of all message routes. If you need sequential multicast in your application, you can implement it by configuring Scatter-Gather with a custom threading profile of only one thread (the Max Active Threads field in Studio, or the maxThreadsActive attribute in XML).
In the context of the above Complete Code Example, the XML would look as follows:
<scatter-gather timeout="5000">
    <threading-profile maxThreadsActive="1"/>
    <custom-aggregation-strategy class="org.myproject.CheapestFlightAggregationStrategy" />
    <flow-ref name="flightBroker1" />
    <flow-ref name="flightBroker2" />
    <flow-ref name="flightBroker3" />
</scatter-gather>
Like the All router, this configuration ensures that the routes are invoked sequentially. However, there is one difference: unlike with the All router, with this configuration if one route fails the subsequent routes are still invoked.
Defining a threading profile of only one thread may yield below-par performance results in some situations, since the single thread used by Scatter-Gather is shared across all messages in the flow. If you find that this is the case, it may be desirable to fall back to using the All router for sequential processing. As of Mule version 3.6.0 this issue is fixed.