<scatter-gather doc:name="Scatter-Gather"> <custom-aggregation-strategy class="com.example.poc.CustomSGAggregationStrategy"/> <processor-chain> ... </processor-chain> <processor-chain> ... </processor-chain> </scatter-gather> <logger message="#[payload]" level="INFO" doc:name="Logger"/>
Migrating the Scatter-Gather Router
The most important change in Scatter-Gather router between Mule 3 and Mule 4
is in the AggregationStrategy
used to consolidate the results of the
different routes. Instead of providing a Java class with the aggregation
logic, you can perform the aggregation with a DataWeave transformation.
The payload Scatter-Gather returns is a collection of messages, where each the result of each route. Those messages are accessible with DataWeave.
Example
This is a Mule 3 Scatter-Gather with a custom-aggregation-strategy
element referencing the Java class with the aggregation strategy:
And the aggregation strategy implementation is:
package com.example.poc; import ... public class CustomSGAggregationStrategy implements AggregationStrategy { @Override public MuleEvent aggregate(AggregationContext context) throws MuleException { StringBuilder responseBuilder = new StringBuilder(); MuleEvent result = null; ArrayList < MarketRate > marketRates = new ArrayList < > (); for (MuleEvent event: context.collectEventsWithoutExceptions()) { String response = (String) event.getMessage().getPayload(); String[] spiltResponse = StringUtils.split(response, ","); MarketRate marketRate = new MarketRate(); marketRate.setEvent(event); marketRate.setMarketName(spiltResponse[0]); marketRate.setMarketRate(new Integer(spiltResponse[1])); marketRates.add(marketRate); System.out.println(marketRate); } Collections.sort(marketRates); result = DefaultMuleEvent.copy(marketRates.get(0).getEvent()); result.getMessage().setPayload(marketRates.get(0).toString()); return result; } }
For Mule 4, the Java class is removed, and the flow elements become:
<scatter-gather doc:name="Scatter-Gather"> <route> ... </route> <route> ... </route> </scatter-gather> <ee:transform> <ee:message > <ee:set-payload ><![CDATA[%dw 2.0 output text/plain --- (payload map ((response, index) -> (response.payload splitBy(','))[1]) orderBy ((rate, index) -> rate))[0] ]]]></ee:set-payload> </ee:message> </ee:transform> <logger message="#[payload]" level="INFO" doc:name="Logger"/>
Notice the ee:transform
element that replicates the logic of the
CustomSGAggregationStrategy
, in DataWeave rather than Java.