Scatter-Gather Router
The Scatter-Gather component is a routing event processor that processes a Mule event through different parallel processing routes that contain different event processors. Each route receives a reference to the Mule event and executes a sequence of one or more event processors. Each of these routes uses a separate thread to execute the event processors, and the resulting Mule event can be either the same Mule event without modifications or a new Mule event with its own payload, attributes, and variables. The Scatter-Gather component then combines the Mule events returned by each processing route into a new Mule event that is passed to the next event processor only after every route completes successfully.
The Scatter-Gather component executes each route in parallel, not sequentially. Parallel execution of routes can greatly increase the efficiency of your Mule application and may provide more information than sequential processing.
The Scatter-Gather component works with repeatable streams. It does not process nonrepeatable streams, which can be read only once before they are lost. By default, all streams are repeatable in Mule unless a component’s streaming strategy is configured to be nonrepeatable.
The following diagram details the behavior of the Scatter-Gather component:
1 | The Scatter-Gather component receives a Mule event and sends a reference of this Mule event to each processing route. |
2 | Each of the processing routes starts executing in parallel. After all processors inside a route finish processing, the route returns a Mule event, which can be either the same Mule event without modifications or a new Mule event created by the processors in the route as a result of the modifications applied. |
3 | After all processing routes have finished execution, the Scatter-Gather component creates a new Mule event that combines all resulting Mule events from each route, and then passes the new Mule event to the next component in the flow. |
Configure your Scatter-Gather component to have at least two routes; otherwise, your Mule application throws an exception and does not start. |
Result
After all routes complete, the component outputs results in the following format:
{0: messageFromRoute0, 1: messageFromRoute1, …}
To extract all resulting payloads to an array, use a DataWeave script, such as the following one:
flatten(valuesOf(payload) map ((item, index) -> item.*payload))
Variable Propagation
Every route starts with the same initial variable values. Modifications to a variable within a specific route do not affect other routes. So, if a variable is added or modified in one route, then, after aggregation, the value is defined by that route. If a variable is added or modified by more than one route, the value is added to a list of all the values defined for that variable within all the routes, for example:
<set-variable variableName="var1" value="var1"/>
<set-variable variableName="var2" value="var2"/>
<scatter-gather doc:name="Scatter-Gather" doc:id="abc665e0-6119-4ecb-9f8b-52dbcbb1d488" >
<route >
<set-variable variableName="var2" value="newValue"/>
<set-variable variableName="var3" value="appleVal"/>
</route>
<route >
<set-variable variableName="var3" value="bananaVal"/>
</route>
<route >
<set-variable variableName="var3" value="otherVal"/>
<set-variable variableName="var4" value="val4"/>
</route>
</scatter-gather>
After aggregation, the variables are:
{var1: "var1", var2: "newValue", var3: ["appleVal, bananaVal, otherVal"], var4: "val4"}
Error Handling Inside Scatter-Gather Routes
You can use a Try scope in each route of a Scatter-Gather component to handle any errors that might be generated by a route’s event processors. After every route executes, if any has failed with an error, then the Scatter-Gather component throws an error of type MULE:COMPOSITE_ROUTING
, and event processing does not proceed past the Scatter-Gather component in the flow. Instead, the flow branches to your error-handling event processors.
Because the MULE:COMPOSITE_ROUTING
error object gathers not only errors from routes that failed but also Mule events from successfully completed routes, your application can use the error-handling event processors to process Mule events from the routes that completed
To illustrate how this works, consider the following two cases:
-
The routes in a Scatter-Gather component each contain a Try scope.
One of the routes generates an error that is successfully handled by that route’s Try scope through anon-error-continue
error handler, so the route is completed successfully. The Scatter-Gather component consolidates the Mule events from all routes into a new Mule event and passes the consolidated event to the next event processor. -
One of the routes in a Scatter-Gather component does not contain a Try scope or contains a Try scope with an error handler that cannot handle the error type, or the error handler is an
on-error-propagate
type.
An error occurs in this route, causing the route to fail, which in turn causes the Scatter-Gather component to throw aMULE:COMPOSITE_ROUTING
error. The flow branches to your error-handling event processors, which are able to process the Mule events from the completed routes.
Example of handling these errors:
<flow name="errorHandler">
<scatter-gather>
<route>
<raise-error type="APP:MYERROR"/>
</route>
<route>
<set-payload value="apple"/>
</route>
</scatter-gather>
<error-handler>
<on-error-continue type="MULE:COMPOSITE_ROUTING">
<!-- This will have the error thrown by the first route -->
<logger level="WARN" message="#[error.errorMessage.payload.failures['0']]"/>
<!-- This will be a null value -->
<logger level="WARN" message="#[error.errorMessage.payload.failures['1']]"/>
<!-- This will be a null value -->
<logger level="WARN" message="#[error.errorMessage.payload.results['0']]"/>
<!-- This will have the result of the second (correctly executed) route -->
<logger level="WARN" message="#[error.errorMessage.payload.results['1']]"/>
</on-error-continue>
</error-handler>
</flow>
Handle Timeout Errors in a Scatter-Gather
If you configure a timeout for a Scatter-Gather component and a route does not complete processing before the timeout expires, the route throws a MULE:TIMEOUT
error. This error is then handled the same way as any other error generated from a route: after each route completes (either by processing success or by throwing a MULE:TIMEOUT
error), the successful results and errors are collected together in the Scatter-Gather component MULE:COMPOSITE_ROUTING
error, which is then processed in your configured error handler.
Example Project
In Anypoint Studio, you can download and open the example project Scatter-Gather Flow Control from Anypoint Exchange to learn more about how to use the Scatter-Gather component. This example shows the usage of the scatter-gather control flow to aggregate data in parallel and return the result in JSON.
The example uses prepared data as input for two resources that should be aggregated. The data represents information about two contacts and has the following structure:
Resource |
|
|
|
|
contacts-1.csv |
John |
Doe |
096548763 |
|
contacts-2.csv |
Jane |
Doe |
091558780 |
DataWeave is used to aggregate the data. The information about the contacts is aggregated to a JSON structure that represents data from both resources.
To download and open this example project while you are in Anypoint Studio, click the Exchange icon in the upper-left corner. Then, in the window that opens, log into Anypoint Exchange and search on the name of the project.
Scatter-Gather XML Reference
Element | Description |
---|---|
|
Sends a request message to multiple targets concurrently. It collects the responses from all routes, and aggregates them into a single message. |
Attributes |
|
|
Sets the timeout for responses from sent messages, in milliseconds. A value of 0 or lower than 0 means no timeout. |
|
Determines the maximum amount of concurrent routes to process. By setting this value to 1, scatter-gather processes the routes sequentially. |
|
The name of the target variable. |
|
Value of the data to store in the target variable.
|
Child Element | Description |
---|---|
|
One of the routes in a the scatter-gather router. |