
Message Splitting and Aggregation

Message Splitters

List Message Splitter

A message splitter can be used to break down an outgoing message into parts and dispatch those parts over different endpoints configured on the router. The List Message Splitter accepts a list of objects that will be routed to different endpoints. The actual endpoint used for each object in the list is determined by a filter configured on the endpoint itself. If the endpoint’s filter accepts the object, the endpoint will be used to route the object.

By default the AbstractMessageSplitter sets a correlation ID and correlation sequence on the outbound messages so that inbound routers such as the Collection Aggregator or Correlation Resequencer are able to resequence or combine the split messages.

The router configuration below expects the message payload to be a java.util.List and will route objects in the list that are of type com.foo.Order, com.foo.Item, and com.foo.Customer. The router will allow any number and combination of these objects.

Configuration for this router is as follows:


          
       
<outbound>
  <list-message-splitter-router>
    <jms:outbound-endpoint queue="order.queue">
      <payload-type-filter expectedType="com.foo.Order"/>
    </jms:outbound-endpoint>
    <jms:outbound-endpoint queue="item.queue">
      <payload-type-filter expectedType="com.foo.Item"/>
    </jms:outbound-endpoint>
    <jms:outbound-endpoint queue="customer.queue">
      <payload-type-filter expectedType="com.foo.Customer"/>
    </jms:outbound-endpoint>
    <payload-type-filter expectedType="java.util.List"/>
  </list-message-splitter-router>
</outbound>

Note that there is also a filter on the router itself that ensures that the message payload received is of type java.util.List. If there are objects in the list that do not match any of the endpoint filters, a warning is written to the log and processing continues. To route any non-matching object types to another endpoint, add the endpoint at the end of the list without a filter.
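The catch-all arrangement described above might look like the following sketch. The queue name unmatched.queue is hypothetical and namespace declarations are omitted; the only change from the configuration above is a final endpoint with no filter.

```xml
<outbound>
  <list-message-splitter-router>
    <jms:outbound-endpoint queue="order.queue">
      <payload-type-filter expectedType="com.foo.Order"/>
    </jms:outbound-endpoint>
    <jms:outbound-endpoint queue="item.queue">
      <payload-type-filter expectedType="com.foo.Item"/>
    </jms:outbound-endpoint>
    <!-- Hypothetical catch-all: no filter, listed last, so it takes
         objects that none of the endpoints above accepted. -->
    <jms:outbound-endpoint queue="unmatched.queue"/>
    <payload-type-filter expectedType="java.util.List"/>
  </list-message-splitter-router>
</outbound>
```

Because endpoints are tried in order, the unfiltered endpoint has to come last; placed earlier, it would accept objects before the filtered endpoints were consulted.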

Expression Splitter Router

This router is similar to the list message splitter router, but it splits the message based on an expression. The expression must return one or more message parts to be effective.


          
       
<outbound>
  <expression-splitter-router evaluator="xpath" expression="/mule:mule/mule:model/mule:service" disableRoundRobin="true" failIfNoMatch="false">
    <outbound-endpoint ref="service1">
      <expression-filter evaluator="xpath" expression="/mule:service/@name = 'service splitter'"/>
    </outbound-endpoint>
    <outbound-endpoint ref="service2">
      <expression-filter evaluator="xpath" expression="/mule:service/@name = 'round robin deterministic'"/>
    </outbound-endpoint>
  </expression-splitter-router>
</outbound>

Filtering XML Message Splitter

The XML Message Splitter is deprecated. Use the Expression Message Splitter with an xpath expression instead.

This router is similar to the List Message Splitter but operates on XML documents. Supported payload types are:

  • org.dom4j.Document objects

  • byte[]

  • java.lang.String

If no match is found, it is ignored and logged at the WARN level.

The router splits the payload into nodes based on the splitExpression property. The endpoint used for each node is determined by a filter configured on the endpoint itself: if the endpoint's filter accepts the node, that endpoint is used to route it. Each part is returned as a new DOM4J document.

The router can optionally perform a validation against an external XML schema document. To perform the validation, set externalSchemaLocation to the XSD file in your classpath. Setting this property overrides whatever schema document you declare in the XML header.

By default, the router fails if none of the endpoint filters match the payload. To prevent the router from failing in this case, you can set the failIfNoMatch attribute to false.

Configuration for this router is as follows:


          
       
<outbound>
  <mulexml:filter-based-splitter splitExpression="root/nodes" validateSchema="true" externalSchemaLocation="/com/example/TheSchema.xsd">
    <vm:outbound-endpoint path="order">
      <payload-type-filter expectedType="com.foo.Order"/>
    </vm:outbound-endpoint>
    <vm:outbound-endpoint path="item">
      <payload-type-filter expectedType="com.foo.Item"/>
    </vm:outbound-endpoint>
    <vm:outbound-endpoint path="customer">
      <payload-type-filter expectedType="com.foo.Customer"/>
    </vm:outbound-endpoint>
    <payload-type-filter expectedType="org.dom4j.Document"/>
  </mulexml:filter-based-splitter>
</outbound>
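As a sketch of the failIfNoMatch setting mentioned earlier, the fragment below assumes the attribute is placed on the splitter element itself, as it is in the Expression Splitter Router example. Namespace declarations are omitted, and only one endpoint is shown for brevity.

```xml
<outbound>
  <!-- Assumption: failIfNoMatch goes on the splitter element.
       With "false", a node that matches no endpoint filter
       should not cause the router to fail. -->
  <mulexml:filter-based-splitter splitExpression="root/nodes" failIfNoMatch="false">
    <vm:outbound-endpoint path="order">
      <payload-type-filter expectedType="com.foo.Order"/>
    </vm:outbound-endpoint>
    <payload-type-filter expectedType="org.dom4j.Document"/>
  </mulexml:filter-based-splitter>
</outbound>
```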

The splitters above simply split a message into multiple parts and pass each part on to the next message processor or endpoint. There are also message routers that split the message before routing, such as the Round-Robin Message Splitter. See Message Routers for more information about these.

Message Aggregation

Collection Aggregator

The Collection Aggregator groups incoming messages that have matching group IDs before forwarding them. The group ID can come from the correlation ID or another property that links messages together.

You can specify the timeout attribute to set how long, in milliseconds, the router waits for messages to complete the group. By default, if the expected messages are not received before the timeout expires, an exception is thrown and the messages are not forwarded. As of Mule 2.2, you can set the failOnTimeout attribute to false to prevent the exception from being thrown and simply forward whatever messages have been received so far.

The aggregator is based on the Selective Consumer, so you can also apply filters to the messages. Configuration for this router is as follows:


          
       
<inbound>
  <vm:inbound-endpoint path="foo.bar"/>
  <collection-aggregator-router timeout="6000" failOnTimeout="false">
    <payload-type-filter expectedType="org.foo.some.Object"/>
  </collection-aggregator-router>
</inbound>
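To show how a splitter and this aggregator could fit together, here is a rough sketch of two services connected by a VM path. The service names (orderSplitter, orderCollector) and the paths (orders.in, parts) are hypothetical, and namespace declarations and components are omitted. It relies on the default behavior described earlier, where the splitter sets the correlation ID on each outbound part.

```xml
<!-- Hypothetical service: splits an incoming java.util.List into parts. -->
<service name="orderSplitter">
  <inbound>
    <vm:inbound-endpoint path="orders.in"/>
  </inbound>
  <outbound>
    <list-message-splitter-router>
      <vm:outbound-endpoint path="parts">
        <payload-type-filter expectedType="com.foo.Order"/>
      </vm:outbound-endpoint>
      <payload-type-filter expectedType="java.util.List"/>
    </list-message-splitter-router>
  </outbound>
</service>

<!-- Hypothetical service: regroups the parts by correlation ID. -->
<service name="orderCollector">
  <inbound>
    <vm:inbound-endpoint path="parts"/>
    <collection-aggregator-router timeout="6000" failOnTimeout="false"/>
  </inbound>
</service>
```

With failOnTimeout set to false, the collector forwards whatever parts have arrived once six seconds elapse rather than throwing an exception.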

Message Chunking Aggregator

After an outbound router such as the List Message Splitter splits a message into parts, the message chunking aggregator router reassembles those parts back into a single message. The aggregator uses the correlation ID, which is set by the outbound router, to identify which parts belong to the same message. This aggregator is based on the Selective Consumer, so filters can also be applied to messages.

Configuration for this router is as follows:


          
       
<inbound>
  <vm:inbound-endpoint path="foo.bar"/>
  <message-chunking-aggregator-router>
    <expression-message-info-mapping correlationIdExpression="#[header:correlation]"/>
    <payload-type-filter expectedType="org.foo.some.Object"/>
  </message-chunking-aggregator-router>
</inbound>

The optional expression-message-info-mapping element allows you to identify the correlation ID in the message using an expression. If this element is not specified, MuleMessage.getCorrelationId() is used.

The Message Chunking aggregator also accepts the timeout and (as of Mule 2.2) failOnTimeout attributes as described under Collection Aggregator.
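A minimal sketch of those two attributes on this router follows. It assumes they are written the same way as on the Collection Aggregator, and the 6000 ms value is only illustrative.

```xml
<inbound>
  <vm:inbound-endpoint path="foo.bar"/>
  <!-- Wait up to 6 seconds for all chunks; on timeout, forward what has
       arrived instead of throwing an exception (Mule 2.2 and later). -->
  <message-chunking-aggregator-router timeout="6000" failOnTimeout="false">
    <payload-type-filter expectedType="org.foo.some.Object"/>
  </message-chunking-aggregator-router>
</inbound>
```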

Custom Correlation Aggregator

This router is used to configure a custom message aggregator. Mule provides an abstract implementation that has a template method that performs the message aggregation. A common use of the aggregator router is to combine the results of multiple requests such as "ask this set of vendors for the best price of X".

The aggregator is based on the Selective Consumer, so you can also apply filters to messages. It also accepts the timeout and (as of Mule 2.2) failOnTimeout attributes as described under Collection Aggregator.

Configuration for this router is as follows:


          
       
<inbound>
  <vm:inbound-endpoint path="foo.bar"/>
  <custom-correlation-aggregator-router class="org.mule.CustomAgregator">
    <payload-type-filter expectedType="org.foo.some.Object"/>
  </custom-correlation-aggregator-router>
</inbound>

There is an AbstractEventAggregator that provides a thread-safe implementation for custom aggregators, which you can use to write a custom aggregator router. For example, the Loan Broker examples included in the Mule distribution use a custom BankQuotesInboundAggregator router to aggregate bank quotes.