Nav

Splitter Flow Control Reference

The Splitter Flow Control splits a message into separate fragments, then sends these fragments one at a time to the next message processor in the flow. Segments are identified based on an expression parameter, usually written in Mule Expression Language (MEL), but other formats can be employed also. You can then use a Collection Aggregator Flow Control to reassemble the parts of the original message. You can also include a Resequencer Flow Control to put the parts back into the original sequence in case they are shuffled out of order.

Splitting and aggregating the message is especially useful when you intend to process the split parts in asynchronous flows running on separate servers. Together, the splitter and aggregator flow controls allow you to share the workload among several servers and still be able to reassemble the message after it’s processed.

Splitter Configuration

Splitter

Field Description

Display Name

Customize to display a unique name for the splitter in your application. Default is Splitter
Example: doc:name="Splitter"

Enable Correlation

Specifies whether Mule should give outgoing messages a correlation ID. Options are:

  • IF_NOT_SET(existing correlation IDs are maintained)

  • ALWAYS (existing correlation IDs are overridden)

  • NEVER (no action)

Default is IF_NOT_SET
Example: enableCorrelation="IF_NOT_SET"

Message Info Mapping

Optional. If this child element is not configured, MuleMessage.getCorrelationId() is used, which is optimal for most use cases. Maps attributes from incoming data to construct Correlation ID and Message ID on outgoing messages. No default value.

Example:


         
                 
              
1
2
3
<expression-message-info-mapping
  messageIdExpression="#[java.util.UUID.randomUUID().toString()]"
  correlationIdExpression="#[xpath3('//order/@id')]"/>

Expression

Expression to define how to split the message. This is a required field. No default value.
Example: `expression="#[xpath3('//item')]" `

Simple Splitter


     
             
          
1
<splitter expression="#[xpath3('//item')]" doc:name="Splitter" enableCorrelation="IF_NOT_SET"/>
Element Description

splitter

Splits a message into separate fragments, then sends these fragments one at a time to the next message processor in the flow.

Attribute Description

doc:name

Customize to display a unique name for the splitter in your application.

Note: Attribute not required in Mule Standalone configuration.

expression

Expression to define how to split the message. This is a required field.

enableCorrelation

Specifies whether Mule should give outgoing messages a correlation ID. Options are:

  • IF_NOT_SET (Default. Existing correlation IDs are maintained)

  • `ALWAYS `(Existing correlation IDs are overridden)

  • `NEVER `(No action)

Advanced Splitter Including a Child Element

Note that this example includes the optional child element, expression-message-info-mapping. Use this child element only if your aggregation (later in your flow) is extremely customized and the standard correlation id set by Mule does not meet your needs.

     
             
          
1
2
3
<splitter expression="#[xpath3('//item')]" doc:name="Splitter" enableCorrelation="IF_NOT_SET">
   <expression-message-info-mapping messageIdExpression="#[java.util.UUID.randomUUID().toString()]" correlationIdExpression="#[xpath3('//order/@id')]"/>
</splitter>
Child Element Description

expression-message-info-mapping

Optional. If this child element is not configured, MuleMessage.getCorrelationId() is used, which is optimal for most use cases. Maps attributes from incoming data to construct Correlation ID and Message ID on outgoing messages, according to the expressions in the attributes listed below.

Attribute Description

messageIdExpression

An expression that sets a custom message ID for each of the split messages. Must result in unique message Ids.

correlationIdExpression

An expression that sets a custom correlation ID for the split messages.

Basic Splitter Example

In this simple example, Mule splits an XML payload it into several items.

  1. Create a new Anypoint Studio project and drag a File Connector onto an empty canvas.

  2. Set the Path field to ./src/main/resources/.

  3. Add a new Connector Configuration, uncheck the Auto Delete Setting and click OK.

    1. Set the Polling Frequency to 15000.

    2. Set the File Name Regex Filter with the expression: file:filename-regex-filter caseSensitive=true, pattern=vip.xml

      splitter flow control reference ed1f8
  4. Drag a Splitter Flow Control into the flow.

  5. Configure the Splitter as shown. In the Expression parameter, provide the XPath expression //*:actor/text()',payload,'NODESET', wrapped inside a MEL expression. The splitter uses the xpath expression to select every XML element named actor, and makes each of these (together with its children) into a new message.

  6. Add a Logger after the Splitter and set its message to #[payload] so that it logs the entire payload of each message that it receives.

    splitter_example

    Parameter Value

    Display Name

    Splitter

    Enable Correlation

    IF_NOT_SET

    Expression

    #[xpath3('//*:actor/text()',payload,'NODESET')]

  1. Add an File Connector to the flow and configure it as follows:

    
           
                   
                
    1
    2
    3
    
    <file:inbound-endpoint path="./src/main/resources/" connector-ref="File" pollingFrequency="5000" responseTimeout="10000" doc:name="File">
      <file:filename-regex-filter pattern="vip.xml" caseSensitive="true"/>
    </file:inbound-endpoint>
  2. For this element to work, you must include a connector-ref. The attribute in the connector named config-ref references this connector configuration element.

    <file:connector name="File" autoDelete="false" streaming="true" validateConnections="true" doc:name="File"/>
    Attribute Value

    name

    File

    autoDelete

    false

    streaming

    true

    doc:name

    File

  3. Add a Splitter below, to receive messages File connector. In the Expression parameter provide the XPath expression //*:actor/text(), wrapped inside a MEL expression. This XPath expression selects every XML element named 'actor'. The splitter makes each of these (together with its children) into a new message.

    <splitter expression="#[xpath3('//*:actor/text()',payload,'NODESET')]" doc:name="Splitter"/>
    Attribute Value

    expression

    #[xpath3('//*:actor/text()',payload,'NODESET')]

    doc:name

    Splitter

    enableCorrelation

    IF_NOT_SET

  4. Include a logger after the splitter to log the entire payload of each message received.

    <logger message="#[message.payload]" level="INFO" doc:name="Logger"/>
    Attribute Value

    message

    #[message.payload]

    level

    INFO

    doc:name

    Logger

  5. The finished flow should look like this:

    
           
                   
                
    1
    2
    3
    4
    5
    6
    7
    
    <flow name="SplitterExampleFlow1" >
      <file:inbound-endpoint path="./src/main/resources/" connector-ref="File" pollingFrequency="5000" responseTimeout="10000" doc:name="File">
          <file:filename-regex-filter pattern="vip.xml" caseSensitive="true"/>
      </file:inbound-endpoint>
      <splitter expression="#[xpath3('//*:actor/text()',payload,'NODESET')]" doc:name="Splitter"/>
      <logger message="#[payload]" level="INFO" doc:name="Logger"/>
    </flow>

Full Example Code


          
       
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
<?xml version="1.0" encoding="UTF-8"?>

<mule xmlns:file="http://www.mulesoft.org/schema/mule/file" xmlns:dw="http://www.mulesoft.org/schema/mule/ee/dw" xmlns:tracking="http://www.mulesoft.org/schema/mule/ee/tracking" xmlns:vm="http://www.mulesoft.org/schema/mule/vm"
                xmlns:mulexml="http://www.mulesoft.org/schema/mule/xml" xmlns:http="http://www.mulesoft.org/schema/mule/http" xmlns="http://www.mulesoft.org/schema/mule/core" xmlns:doc="http://www.mulesoft.org/schema/mule/documentation" xmlns:spring="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.mulesoft.org/schema/mule/vm http://www.mulesoft.org/schema/mule/vm/current/mule-vm.xsd
        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-current.xsd
        http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd
        http://www.mulesoft.org/schema/mule/http http://www.mulesoft.org/schema/mule/http/current/mule-http.xsd
        http://www.mulesoft.org/schema/mule/xml http://www.mulesoft.org/schema/mule/xml/current/mule-xml.xsd
        http://www.mulesoft.org/schema/mule/ee/tracking http://www.mulesoft.org/schema/mule/ee/tracking/current/mule-tracking-ee.xsd
        http://www.mulesoft.org/schema/mule/file http://www.mulesoft.org/schema/mule/file/current/mule-file.xsd
        http://www.mulesoft.org/schema/mule/ee/dw http://www.mulesoft.org/schema/mule/ee/dw/current/dw.xsd">

            <file:connector name="File" autoDelete="false" streaming="true" validateConnections="true" doc:name="File"/>

            <flow name="SplitterExampleFlow1" >
                <file:inbound-endpoint path="./src/main/resources/" connector-ref="File" pollingFrequency="5000" responseTimeout="10000" doc:name="File">
                    <file:filename-regex-filter pattern="vip.xml" caseSensitive="true"/>
                </file:inbound-endpoint>
                <splitter expression="#[xpath3('//*:actor/text()',payload,'NODESET')]" doc:name="Splitter"/>
                <logger message="#[payload]" level="INFO" doc:name="Logger"/>
            </flow>
</mule>

Test Splitter Example

Navigate to the /src/main/resources directory in your project and create a vip.xml file with the following content:


           
        
1
2
3
4
5
6
7
8
9
10
11
12
<root>
    <actors>
        <actor id="1">Christian Bale</actor>
        <actor id="2">Liam Neeson</actor>
        <actor id="3">Will Ferrell</actor>
    </actors>
    <singers>
        <singer id="4">Dave Grohl</singer>
        <singer id="5">B.B. King</singer>
        <singer id="6">Weird Al</singer>
    </singers>
</root>

Run the application and note that every 15 seconds, your File connector pushes the XML to the splitter, and all the actor elements are separated in three different messages:


           
        
1
2
3
INFO  YYYY-DD-MM HH:MM:SS,SSS [] org.mule.api.processor.LoggerMessageProcessor: [#text: Christian Bale]
INFO  YYYY-DD-MM HH:MM:SS,SSS [] org.mule.api.processor.LoggerMessageProcessor: [#text: Liam Neeson]
INFO  YYYY-DD-MM HH:MM:SS,SSS [] org.mule.api.processor.LoggerMessageProcessor: [#text: Will Ferrell]

Every time a request is sent, the order of each element in the response might change. This is because

Aggregating the Payload

When the splitter splits a message, it adds three new outbound variables into each of the output fragments. These three variables are later used by the Aggregator to reassemble the message:

  • MULE_CORRELATION_GROUP_SIZE: number of fragments into which the original message was split.

  • MULE_CORRELATION_SEQUENCE: position of a fragment within the group.

  • MULE_CORRELATION_ID: single ID for entire group (all output fragments of the same original message share the same value).

    variables+diagramv2

    You can look at the values of these outbound variables by putting a break point after the splitter and running your flow with the Visual Debugger:

variables

Thanks to these variables, when an aggregator receives a single fragment, it knows what group to put it into and how large this group should be. Once all of the fragments have arrived, it passes on the complete group as a single message.

diagram+ag+2

Aggregator Configuration

collection_agg

Field Description

Display Name

Customize to display a unique name for the splitter in your application. Default value is`Collection Aggregator`
Example: doc:name="Collection Aggregator"

Timeout

Defines a timeout in milliseconds to wait for events to be aggregated. By default, the aggregator throws an exception if it is waiting for a correlation group and a timeout occurs before it receives all group entities.
Example: timeout="60000"

Fail On Timeout

If set, your app fails if the aggregator times out. Default is false
Example: failOnTimeout="true"

Message Info Mapping

Optional. If this child element is not configured, MuleMessage.getCorrelationId() is used, which is optimal for most use cases. Defines where to obtain Correlation ID and Message ID in incoming messages.

Example:

&lt;expression-message-info-mapping messageIdExpression="#[java.util.UUID.randomUUID().toString()]" correlationIdExpression="#[xpath3('//order/@id')]"/&gt;

Store Prefix

Defines the prefix of the ObjectStore names +  Example: storePrefix="split_"

Simple Collection Aggregator

&lt;collection-aggregator failOnTimeout="true" doc:name="Collection Aggregator" storePrefix="split_" timeout="60000"/&gt;
Element Description

collection-aggregator

Reassembles a message from separate fragments. Once all fragments have arrived it sends the full message to the next message processor in the flow.

Attribute Description

doc:name

Customize to display a unique name for the splitter in your application.

Note: Attribute not required in Mule Standalone configuration.

Timeout

Defines a timeout in milliseconds to wait for events to be aggregated. By default the aggregator throws an exception if it is waiting for a correlation group and a timeout occurs before it receives all group entities.

Fail On Timeout

If set, your app fails if the aggregator times out.

Message Info Mapping

Optional. If this child element is not configured, MuleMessage.getCorrelationId() is used, which is optimal for most use cases. Defines where to obtain Correlation ID and Message ID in incoming messages.

Prefix

Defines the prefix of the ObjectStore names

event-groups-object-store-ref

The object store where event groups are stored as a buffer. A default object store is used if none specified.

processed-groups-object-store-ref

The object store where processed groups are stored as a buffer. A default object store is used if none specified. It is recommended that if you assign a custom object store, that it has a max capacity and an expiration interval.

Advanced Collection Aggregator Including a Child Element

Note that this example includes the optional child element, expression-message-info-mapping. Use this child element only if your aggregation (later in your flow) is extremely customized and the standard correlation ID set by Mule does not meet your needs.

     
              
           
1
2
3
&lt;collection-aggregator failOnTimeout="true" doc:name="Collection Aggregator" storePrefix="split_" timeout="60000"&gt;
&lt;expression-message-info-mapping messageIdExpression="#[java.util.UUID.randomUUID().toString()]" correlationIdExpression="#[xpath3('//order/@id')]"/&gt;
&lt;/collection-aggregator&gt;
Child Element Description

expression-message-info-mapping

Optional. If this child element is not configured, MuleMessage.getCorrelationId() is used, which is optimal for most use cases. Maps attributes of the arriving messages to messageIdExpression and correlationIdExpression.

Attribute Description

messageIdExpression

An expression that maps attributes of the arriving messages to messageIdExpression. Must result in unique message IDs.

correlationIdExpression

An expression that maps attributes of the arriving messages to correlationIdExpression. Must result in unique message IDs.

Simple Aggregator Example

Following the Splitter example from earlier, the following steps guide you to aggregate the actor elements in one message again:

  1. Drag from the canvas a Collection Aggregator after the last Logger component in the SplitterExampleFlow1 flow.

  2. In order to see the results, add another logger to the end of the flow and set its message to #[payload].

    splitter flow control reference 06595
  1. Add a Collection Aggregator and another Logger component at the end of your Flow:

    
           
                   
                
    1
    2
    
    &lt;collection-aggregator failOnTimeout="true" doc:name="Collection Aggregator"/&gt;
    &lt;logger message="#[payload]" level="INFO" doc:name="Logger"/&gt;

Full Example Code


          
       
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
<?xml version="1.0" encoding="UTF-8"?>
<mule xmlns:file="http://www.mulesoft.org/schema/mule/file" xmlns:dw="http://www.mulesoft.org/schema/mule/ee/dw" xmlns:tracking="http://www.mulesoft.org/schema/mule/ee/tracking" xmlns:vm="http://www.mulesoft.org/schema/mule/vm"
                xmlns:mulexml="http://www.mulesoft.org/schema/mule/xml" xmlns:http="http://www.mulesoft.org/schema/mule/http" xmlns="http://www.mulesoft.org/schema/mule/core" xmlns:doc="http://www.mulesoft.org/schema/mule/documentation" xmlns:spring="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.mulesoft.org/schema/mule/vm http://www.mulesoft.org/schema/mule/vm/current/mule-vm.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-current.xsd
http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd
http://www.mulesoft.org/schema/mule/http http://www.mulesoft.org/schema/mule/http/current/mule-http.xsd
http://www.mulesoft.org/schema/mule/xml http://www.mulesoft.org/schema/mule/xml/current/mule-xml.xsd
http://www.mulesoft.org/schema/mule/ee/tracking http://www.mulesoft.org/schema/mule/ee/tracking/current/mule-tracking-ee.xsd
http://www.mulesoft.org/schema/mule/file http://www.mulesoft.org/schema/mule/file/current/mule-file.xsd
http://www.mulesoft.org/schema/mule/ee/dw http://www.mulesoft.org/schema/mule/ee/dw/current/dw.xsd">

  <file:connector name="File" autoDelete="false" streaming="true" validateConnections="true" doc:name="File"/>
  <flow name="SplitterExampleFlow1" >
    <file:inbound-endpoint path="./src/main/resources/" connector-ref="File" pollingFrequency="5000" responseTimeout="10000" doc:name="File">
      <file:filename-regex-filter pattern="vip.xml" caseSensitive="true"/>
    </file:inbound-endpoint>
    <splitter expression="#[xpath3('//*:actor/text()',payload,'NODESET')]" doc:name="Splitter"/>
    <logger message="#[payload]" level="INFO" doc:name="Logger"/>
    <collection-aggregator failOnTimeout="true" doc:name="Collection Aggregator"/>
    <logger message="#[payload]" level="INFO" doc:name="Logger"/>
  </flow>
</mule>

After running your code again, notice that the second logger picks up the message with all the names aggregated:


          
       
1
2
3
4
INFO  YYYY-DD-MM HH:MM:SS,SSS [] org.mule.api.processor.LoggerMessageProcessor: [#text: Christian Bale]
INFO  YYYY-DD-MM HH:MM:SS,SSS [] org.mule.api.processor.LoggerMessageProcessor: [#text: Liam Neeson]
INFO  YYYY-DD-MM HH:MM:SS,SSS [] org.mule.api.processor.LoggerMessageProcessor: [#text: Will Ferrell]
INFO  YYYY-DD-MM HH:MM:SS,SSS [] org.mule.api.processor.LoggerMessageProcessor: [[#text: Liam Neeson], [#text: Will Ferrell], [#text: Christian Bale]]