Migrating the Scatter-Gather Router
The most important change in the Scatter-Gather router between Mule 3 and Mule 4
is in the AggregationStrategy used to consolidate the results of the
different routes. Instead of providing a Java class with the aggregation
logic, you can perform the aggregation with a DataWeave transformation.
The payload Scatter-Gather returns is a collection of messages, where each message is the result of one route. Those messages are accessible with DataWeave.
Example
This is a Mule 3 Scatter-Gather with a custom-aggregation-strategy
element referencing the Java class with the aggregation strategy:
<scatter-gather doc:name="Scatter-Gather">
    <custom-aggregation-strategy class="com.example.poc.CustomSGAggregationStrategy"/>
    <processor-chain>
        ...
    </processor-chain>
    <processor-chain>
        ...
    </processor-chain>
</scatter-gather>
<logger message="#[payload]" level="INFO" doc:name="Logger"/>
And the aggregation strategy implementation is:
package com.example.poc;
import ...
public class CustomSGAggregationStrategy implements AggregationStrategy {
    @Override
    public MuleEvent aggregate(AggregationContext context) throws MuleException {
        MuleEvent result = null;
        ArrayList<MarketRate> marketRates = new ArrayList<>();
        // Build one MarketRate per route that completed without an exception.
        for (MuleEvent event : context.collectEventsWithoutExceptions()) {
            // Each route payload is a "name,rate" string.
            String response = (String) event.getMessage().getPayload();
            String[] splitResponse = StringUtils.split(response, ",");
            MarketRate marketRate = new MarketRate();
            marketRate.setEvent(event);
            marketRate.setMarketName(splitResponse[0]);
            marketRate.setMarketRate(Integer.valueOf(splitResponse[1]));
            marketRates.add(marketRate);
            System.out.println(marketRate);
        }
        // Sort by MarketRate's natural ordering and return the first entry.
        Collections.sort(marketRates);
        result = DefaultMuleEvent.copy(marketRates.get(0).getEvent());
        result.getMessage().setPayload(marketRates.get(0).toString());
        return result;
    }
}
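The class above relies on a MarketRate type that is not shown here. The following is a minimal, self-contained sketch of how such a type might look, assuming it orders entries by ascending rate and prints as "name,rate". The class name MarketRateSketch, the parse and lowest helpers, and the sample payloads are hypothetical and are not part of the original project or of any Mule API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class MarketRateSketch {

    // Hypothetical stand-in for MarketRate; the real class is not shown in this guide.
    static class MarketRate implements Comparable<MarketRate> {
        final String marketName;
        final int marketRate;

        MarketRate(String marketName, int marketRate) {
            this.marketName = marketName;
            this.marketRate = marketRate;
        }

        // Parses a "name,rate" payload, the format the aggregation strategy splits on.
        static MarketRate parse(String payload) {
            String[] parts = payload.split(",");
            return new MarketRate(parts[0].trim(), Integer.parseInt(parts[1].trim()));
        }

        // Assumption: natural ordering is ascending by rate.
        @Override
        public int compareTo(MarketRate other) {
            return Integer.compare(marketRate, other.marketRate);
        }

        @Override
        public String toString() {
            return marketName + "," + marketRate;
        }
    }

    // Mirrors the sort-then-get(0) step; like the original, it fails on an empty list.
    static MarketRate lowest(List<String> payloads) {
        List<MarketRate> rates = new ArrayList<>();
        for (String payload : payloads) {
            rates.add(MarketRate.parse(payload));
        }
        Collections.sort(rates);
        return rates.get(0);
    }

    public static void main(String[] args) {
        // Sample payloads are made up for illustration.
        System.out.println(lowest(Arrays.asList("A,120", "B,95", "C,101")));
    }
}
```

Under these assumptions, the entry with the smallest numeric rate is selected, which is the behavior a Mule 4 DataWeave replacement would need to reproduce.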
For Mule 4, the Java class is removed, and the flow elements become:
<scatter-gather doc:name="Scatter-Gather">
    <route>
        ...
    </route>
    <route>
        ...
    </route>
</scatter-gather>
<ee:transform>
    <ee:message>
        <ee:set-payload><![CDATA[%dw 2.0
output text/plain
---
(payload map ((response, index) -> (response.payload splitBy(','))[1]) orderBy ((rate, index) -> rate))[0]
]]></ee:set-payload>
    </ee:message>
</ee:transform>
<logger message="#[payload]" level="INFO" doc:name="Logger"/>
Notice that the ee:transform element replicates the logic of
CustomSGAggregationStrategy in DataWeave rather than Java.
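One detail worth checking when porting the sort is whether rates are compared as text or as numbers. The DataWeave expression orders the fields produced by splitBy, which are strings, while the Java class converts each rate to an Integer before sorting. The sketch below is plain Java, not Mule or DataWeave code. It mirrors the split, sort, and take-first pipeline under both interpretations. The class name, method names, and sample payloads are hypothetical.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

public class RateOrderingSketch {

    // Takes field [1] of each "name,rate" payload, as the transformation does.
    static List<String> rateFields(List<String> payloads) {
        return payloads.stream()
                .map(payload -> payload.split(",")[1])
                .collect(Collectors.toList());
    }

    // Orders the rate fields as text and returns the first one.
    static String firstAsText(List<String> payloads) {
        return rateFields(payloads).stream()
                .sorted()
                .findFirst()
                .get();
    }

    // Orders the rate fields by numeric value and returns the smallest one.
    static String firstAsNumber(List<String> payloads) {
        return rateFields(payloads).stream()
                .min(Comparator.comparingInt((String rate) -> Integer.parseInt(rate)))
                .get();
    }

    public static void main(String[] args) {
        // Sample payloads are made up for illustration.
        List<String> payloads = Arrays.asList("A,120", "B,95", "C,101");
        System.out.println(firstAsText(payloads));   // prints "101"
        System.out.println(firstAsNumber(payloads)); // prints "95"
    }
}
```

If the rates can differ in digit count, coercing each field to a number before ordering is likely needed to match the Mule 3 behavior. Note also that the transformation returns only the rate field, whereas the Java strategy returned the string form of the whole MarketRate.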



