Migrating the Scatter-Gather Router

The most important change to the Scatter-Gather router between Mule 3 and Mule 4 is how the results of the different routes are consolidated. In Mule 3 you provided a Java class implementing AggregationStrategy with the aggregation logic. In Mule 4 you can perform the aggregation with a DataWeave transformation instead.

The payload that Scatter-Gather returns is a collection of messages, one per route, each holding the result of that route. Those messages are accessible with DataWeave.

Example

This is a Mule 3 Scatter-Gather with a custom-aggregation-strategy element referencing the Java class with the aggregation strategy:

<scatter-gather doc:name="Scatter-Gather">
    <custom-aggregation-strategy class="com.example.poc.CustomSGAggregationStrategy"/>
    <processor-chain>
        ...
    </processor-chain>
    <processor-chain>
        ...
    </processor-chain>
</scatter-gather>
<logger message="#[payload]" level="INFO" doc:name="Logger"/>

And the aggregation strategy implementation is:

package com.example.poc;

import ...

public class CustomSGAggregationStrategy implements AggregationStrategy {
    @Override
    public MuleEvent aggregate(AggregationContext context) throws MuleException {
        ArrayList<MarketRate> marketRates = new ArrayList<>();
        // Build one MarketRate per route that completed without an exception.
        for (MuleEvent event : context.collectEventsWithoutExceptions()) {
            // Each route payload is split on "," into a market name and a rate.
            String response = (String) event.getMessage().getPayload();
            String[] splitResponse = StringUtils.split(response, ",");
            MarketRate marketRate = new MarketRate();
            marketRate.setEvent(event);
            marketRate.setMarketName(splitResponse[0]);
            marketRate.setMarketRate(Integer.valueOf(splitResponse[1]));
            marketRates.add(marketRate);
            System.out.println(marketRate);
        }
        // Sort using MarketRate's own ordering and return the first entry.
        Collections.sort(marketRates);
        MuleEvent result = DefaultMuleEvent.copy(marketRates.get(0).getEvent());
        result.getMessage().setPayload(marketRates.get(0).toString());
        return result;
    }
}

For Mule 4, the Java class is removed, and the flow elements become:

<scatter-gather doc:name="Scatter-Gather">
    <route>
        ...
    </route>
    <route>
        ...
    </route>
</scatter-gather>
<ee:transform>
    <ee:message>
        <ee:set-payload><![CDATA[%dw 2.0
output text/plain
---
(payload map ((response, index) -> (response.payload splitBy(','))[1]) orderBy ((rate, index) -> rate))[0]
]]></ee:set-payload>
    </ee:message>
</ee:transform>
<logger message="#[payload]" level="INFO" doc:name="Logger"/>

Notice that the ee:transform element replicates the logic of CustomSGAggregationStrategy in DataWeave rather than Java.
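To make the steps of that transformation easier to follow, here is a minimal plain-Java sketch of the same three steps: take the second comma-separated field of each route result, order the values, and keep the first one. It does not use any Mule API. The class name ScatterGatherAggregationSketch, the method lowestRate, and the sample payloads such as "NYSE,30" are hypothetical and exist only for illustration. The sketch also assumes that rates should be compared as integers, as the Mule 3 class parsed them; that is a choice of this sketch, not something the DataWeave expression states.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

// Illustrative only: plain strings stand in for the payloads of each route.
class ScatterGatherAggregationSketch {

    // Hypothetical helper. Each entry is assumed to look like "<marketName>,<rate>".
    static Optional<Integer> lowestRate(List<String> routePayloads) {
        return routePayloads.stream()
                // Step 1: keep the second comma-separated field (the rate).
                .map(payload -> payload.split(",")[1].trim())
                // Assumption of this sketch: compare rates numerically.
                .map(Integer::valueOf)
                // Steps 2 and 3: order ascending and take the first element.
                .min(Comparator.naturalOrder());
    }

    public static void main(String[] args) {
        // Sample data invented for this example.
        List<String> routePayloads = List.of("NYSE,30", "LSE,12", "TSE,25");
        System.out.println(lowestRate(routePayloads)); // prints Optional[12]
    }
}
```

Returning an Optional makes the empty case explicit: if no route produced a result, the sketch returns Optional.empty() rather than failing on a missing first element.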