
Using Message Processors to Control Message Flow

Message Processors are used within flows to control how messages are sent and received within that flow. This is further described in Using Flows for Service Orchestration. (Within services, the same job is performed by message routers – see Using Mule Services and Using Message Routers for further details.)

The Quick Reference table below summarizes each message processor. See the section of the same name for details on a specific message processor.

Quick Reference

Message Processor | Description
All | Broadcast a message to multiple targets
Async | Run a chain of message processors in a separate thread
Choice | Send a message to the first matching message processor
Collection Aggregator | Aggregate messages into a message collection
Collection Splitter | Split a message that is a collection
Combine Collections | Convert a payload consisting of a collection of collections into a single list
Custom Aggregator | A custom-written class that aggregates messages
Custom Processor | A custom-written message processor
First Successful | Iterate through message processors until one succeeds (added in 3.0.1)
Idempotent Message Filter | Filter out duplicate messages by message ID
Idempotent Secure Hash Message Filter | Filter out duplicate messages by message content
Message Chunk Aggregator | Aggregate messages into a single message
Message Chunk Splitter | Split a message into fixed-size chunks
Message Filter | Filter messages using a filter
Processor Chain | Create a message chain from multiple targets
Recipient List | Send a message to multiple endpoints
Request Reply (Mule 3.2) | Receive a message for asynchronous processing and accept the asynchronous response on a different channel
Resequencer | Reorder a list of messages
Round Robin | Round-robin among a list of message processors (added in 3.0.1)
Until Successful (Mule 3.2) | Repeatedly attempt to process a message until successful
Splitter | Split a message using an expression
WireTap | Send a message to an extra message processor as well as to the next message processor in the chain

All

The All message processor can be used to send the same message to multiple targets.

Configuration for this router is as follows:

<all>
  <jms:endpoint queue="test.queue" transformer-refs="StringToJmsMessage"/>
  <http:endpoint host="10.192.111.11" transformer-refs="StringToHttpClientRequest"/>
  <tcp:endpoint host="10.192.111.12" transformer-refs="StringToByteArray"/>
</all>

If any of the targets specified is an endpoint that has a filter configured on it, only messages accepted by that filter are sent to that endpoint.

All messages (if any) returned by the targets are aggregated together and form the response from this processor.
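
As a rough illustration of where All sits inside a flow, the sketch below broadcasts one request to two targets and hands the aggregated responses back to the caller. The flow name and VM paths are hypothetical, and request-response exchange patterns are assumed so that each target actually returns a message.

```xml
<!-- Sketch only: flow name and VM paths are made up for illustration -->
<flow name="broadcast-quote-request">
  <vm:inbound-endpoint path="quote.in" exchange-pattern="request-response"/>
  <all>
    <!-- Each target receives the same message -->
    <vm:outbound-endpoint path="supplier.a" exchange-pattern="request-response"/>
    <vm:outbound-endpoint path="supplier.b" exchange-pattern="request-response"/>
  </all>
  <!-- From here on, the current message is the aggregate of the targets' responses -->
</flow>
```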

Async

The Async message processor runs a chain of message processors in another thread, optionally specifying a threading profile for the thread to be used. The message processor is configured as follows:

<async>
  <append-string-transformer message="-async" />
  <vm:outbound-endpoint path="async-async-out"
                        exchange-pattern="one-way" />
  <threading-profile doThreading="true" maxThreadsActive="16"/>
</async>

This transforms the current message and sends it to the specified endpoint, using a threadpool that contains up to 16 concurrent threads.
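
To show how Async relates to the rest of a flow, here is a minimal sketch in which an audit copy is dispatched on a separate thread while the main flow carries on with its own processing. The flow name, VM paths, and appended strings are assumptions made for the example.

```xml
<!-- Sketch only: names and paths are hypothetical -->
<flow name="order-intake">
  <vm:inbound-endpoint path="orders.in" exchange-pattern="request-response"/>
  <async>
    <!-- Runs in another thread; the main flow does not wait for it -->
    <append-string-transformer message="-audited" />
    <vm:outbound-endpoint path="orders.audit" exchange-pattern="one-way" />
  </async>
  <!-- Continues in the original thread with the original message -->
  <append-string-transformer message="-accepted" />
</flow>
```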

Choice

The Choice message processor sends a message to the first message processor that matches. If none match and a message processor has been configured as "otherwise", the message is sent there. If none match and no otherwise message processor has been configured, an exception is thrown.

Choice is configured as follows:

<choice>
  <when expression="payload=='foo'" evaluator="groovy">
    <append-string-transformer message=" Hello foo" />
  </when>
  <when expression="payload=='bar'" evaluator="groovy">
    <append-string-transformer message=" Hello bar" />
  </when>
  <otherwise>
    <append-string-transformer message=" Hello ?" />
  </otherwise>
</choice>

If the message payload is "foo" or "bar", the corresponding transformer is run. If not, the transformer specified under "otherwise" is run.

Collection Aggregator

The Collection Aggregator groups incoming messages that have matching group IDs before forwarding them. The group ID can come from the correlation ID or another property that links messages together.

You can specify the timeout attribute to determine how long the router waits in milliseconds for messages to complete the group. By default, if the expected messages are not received by the timeout time, an exception is thrown and the messages are not forwarded. You can also set the failOnTimeout attribute to false to prevent the exception from being thrown and simply forward whatever messages have been received so far.

Configuration for the Collection Aggregator is as follows:

<collection-aggregator timeout="6000" failOnTimeout="false"/>

Collection Splitter

The Collection Splitter acts on messages whose payload is a Collection type. It sends each member of the collection to the next message processor as separate messages. You can specify the attribute enableCorrelation to determine whether a correlation ID is set on each individual message.

Configuration for the Collection Splitter is as follows:

<collection-splitter enableCorrelation="IF_NOT_SET"/>
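
The Collection Splitter and Collection Aggregator are often used as a pair. The sketch below assumes a flow that receives a collection, processes each member individually, and then regroups the results. The flow name, VM paths, and the LineItemPricer component class are hypothetical.

```xml
<!-- Sketch only: flow name, paths and component class are hypothetical -->
<flow name="price-order-lines">
  <vm:inbound-endpoint path="order.lines.in"/>
  <!-- One message per collection member; correlation data is set if missing -->
  <collection-splitter enableCorrelation="IF_NOT_SET"/>
  <component class="com.mycompany.LineItemPricer"/>
  <!-- Regroups messages that share a group ID; forwards a partial group after 6 seconds -->
  <collection-aggregator timeout="6000" failOnTimeout="false"/>
  <vm:outbound-endpoint path="order.lines.out"/>
</flow>
```

With failOnTimeout set to false, a slow or lost member does not fail the group; whatever has arrived by the timeout is forwarded.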

Combine Collections

Converts a payload consisting of a collection of collections into a single list. For example, if the payload contains a collection holding A and B as well as another collection containing C and D, the output collection will contain A, B, C, and D.

This router (flow control) also converts MuleMessageCollections. In such a case, the collection payloads from more than one MuleMessage get merged into a single collection for a new MuleMessage.

<combine-collections-router/>

Custom Aggregator

A Custom Aggregator is an instance of a user-written class that aggregates messages. This class must implement the interface MessageProcessor. Often, it will be useful for it to subclass AbstractAggregator, which provides the skeleton of a thread-safe aggregator implementation, requiring only specific correlation logic. As with most custom objects in Mule, it can be configured either with a fully specified class name or as a reference to a Spring bean. It can also be configured with the same timeout and failOnTimeout attributes described under Collection Aggregator.

Configuration for a Custom Aggregator is as follows:

<custom-aggregator failOnTimeout="true" class="com.mycompany.utils.PurchaseOrderAggregator"/>

Custom Processor

A Custom Processor is an instance of a user-written class that acts as a message processor. This class must implement the interface MessageProcessor. As with most custom objects in Mule, it can be configured either with a fully specified class name or as a reference to a Spring bean.

Configuration for a Custom Processor is as follows:

<processor ref="HighSpeedRouter"/>

or

<custom-processor class="com.mycompany.utils.HighSpeedRouter"/>
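
For the Spring bean variant, the referenced bean has to be declared somewhere in the configuration. A minimal sketch, assuming the spring namespace prefix is declared and that com.mycompany.utils.HighSpeedRouter implements MessageProcessor (the flow name and VM path are hypothetical):

```xml
<!-- Sketch only: the bean must be an instance of a class that implements MessageProcessor -->
<spring:bean id="HighSpeedRouter" class="com.mycompany.utils.HighSpeedRouter"/>

<flow name="route-fast">
  <vm:inbound-endpoint path="fast.in"/>
  <!-- Refers to the bean above by its id -->
  <processor ref="HighSpeedRouter"/>
</flow>
```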

First Successful

The First Successful message processor iterates through its list of child message processors, routing a received message to each of them in order until one processes the message successfully. If none succeed, an exception is thrown.

Success is defined as:

  • If the child message processor throws an exception, this is a failure.

  • Otherwise:

    • If the child message processor returns a message that contains an exception payload, this is a failure.

    • If the child message processor returns a message that does not contain an exception payload, this is a success.

    • If the child message processor does not return a message (e.g. is a one-way endpoint), this is a success.

This message processor was added in Mule 3.0.1.

<first-successful>
  <http:outbound-endpoint address="http://localhost:6090/weather-forecast" />
  <http:outbound-endpoint address="http://localhost:6091/weather-forecast" />
  <http:outbound-endpoint address="http://localhost:6092/weather-forecast" />
  <vm:outbound-endpoint path="dead-letter-queue" />
</first-successful>

From 3.1.0 you can further customize the behavior of this router by specifying a 'failureExpression' that allows you to use Mule Expressions to define a failure. The failureExpression attribute is configured as follows:

<first-successful failureExpression="exception-type:java.net.SocketTimeoutException">
  <http:outbound-endpoint address="http://localhost:6090/weather-forecast" />
  <http:outbound-endpoint address="http://localhost:6091/weather-forecast" />
  <vm:outbound-endpoint path="dead-letter-queue" />
</first-successful>

In the example above, the failure expression narrows the failure condition to a specific exception type. Alternatively, you can use any other Mule expression that is valid in an expression filter. Remember that the expression denotes failure rather than success.

Idempotent Message Filter

An idempotent filter checks the unique message ID of the incoming message to ensure that only unique messages are received by the flow. The ID can be generated from the message using an expression defined in the idExpression attribute. By default, the expression used is #[message:id], which means the underlying endpoint must support unique message IDs for this to work. Otherwise, a UniqueIdNotSupportedException is thrown.

There is a simple idempotent filter implementation provided at org.mule.routers.IdempotentMessageFilter. The default implementation uses a simple file-based mechanism for storing message IDs, but you can extend this class to store the IDs in a database instead by implementing the ObjectStore interface.

Configuration for this router is as follows:

<idempotent-message-filter idExpression="#[message:id]-#[header:foo]">
    <simple-text-file-store directory="./idempotent"/>
</idempotent-message-filter>

The optional idExpression attribute determines what should be used as the unique message ID. If this attribute is not used, #[message:id] is used by default.

The nested element shown above configures the location where the received message IDs are stored. In this example, they are stored to disk so that the router can remember state between restarts. If the directory attribute is not specified, the default value used is ${mule.working.dir}/objectstore where mule.working.dir is the working directory configured for the Mule instance.

If no store is configured, the InMemoryObjectStore is used by default.
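
For comparison, here is a sketch of the filter with no nested store element, so the default in-memory store applies and remembered IDs are lost on restart. The flow name, VM paths, and the orderId inbound property are assumptions for illustration.

```xml
<!-- Sketch only: flow name, paths and the orderId property are hypothetical -->
<flow name="accept-each-order-once">
  <vm:inbound-endpoint path="orders.in"/>
  <!-- No store configured: IDs are kept in memory only -->
  <idempotent-message-filter idExpression="#[header:INBOUND:orderId]"/>
  <vm:outbound-endpoint path="orders.unique"/>
</flow>
```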

Idempotent Secure Hash Message Filter

This filter calculates the hash of the message itself using a message digest algorithm to ensure that only unique messages are received by the flow. This approach provides a value with an infinitesimally small chance of a collision and can be used to filter message duplicates. Note that the hash is calculated over the entire byte array representing the message, so any leading or trailing spaces or extraneous bytes (like padding) can produce different hash values for the same semantic message content. Therefore, you should ensure that messages do not contain extraneous bytes. This router is useful when the message does not support unique identifiers.

Configuration for this filter is as follows:

<idempotent-secure-hash-filter messageDigestAlgorithm="SHA-256">
    <simple-text-file-store directory="./idempotent"/>
</idempotent-secure-hash-filter>

Idempotent Secure Hash Message Filter also uses object stores, which are configured the same way as the Idempotent Message Filter. The optional messageDigestAlgorithm attribute determines the hashing algorithm that will be used. If this attribute is not specified, the default algorithm SHA-256 is used.

Message Chunk Aggregator

After a splitter such as the Message Chunk Splitter splits a message into parts, the message chunk aggregator router reassembles those parts back into a single message. The aggregator uses the message’s correlation ID to identify which parts belong to the same message.

Configuration for the Message Chunk Aggregator is as follows:

<message-chunk-aggregator>
  <expression-message-info-mapping messageIdExpression="#[header:id]" correlationIdExpression="#[header:correlation]"/>
</message-chunk-aggregator>

The optional expression-message-info-mapping element allows you to identify the correlation ID in the message using an expression. If this element is not specified, MuleMessage.getCorrelationId() is used.

The Message Chunk Aggregator also accepts the timeout and failOnTimeout attributes as described under Collection Aggregator.

Message Chunk Splitter

The Message Chunk Splitter allows you to split a single message into a number of fixed-length messages that will all be sent to the same message processor. It will split the message up into a number of smaller chunks according to the messageSize attribute that you configure for the router. The message is split by first converting it to a byte array and then splitting this array into chunks. If the message cannot be converted into a byte array, a RoutingException is raised.

A message chunk splitter is useful if you have bandwidth problems (or size limitations) when using a particular transport.

To put the chunked items back together again, you can use the Message Chunk Aggregator.

Configuration for the Message Chunk Splitter is as follows:

<message-chunk-splitter messageSize="512"/>
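
The splitter and aggregator normally sit on opposite sides of the size-limited transport. The sketch below assumes two flows joined by a VM endpoint purely for illustration; in practice the connecting endpoint would be whichever transport imposes the limit. Flow names and paths are hypothetical.

```xml
<!-- Sketch only: flow names and paths are hypothetical -->
<flow name="send-in-chunks">
  <vm:inbound-endpoint path="large.in"/>
  <!-- Converts the message to a byte array and emits chunks of the configured size -->
  <message-chunk-splitter messageSize="512"/>
  <vm:outbound-endpoint path="chunks"/>
</flow>

<flow name="reassemble-chunks">
  <vm:inbound-endpoint path="chunks"/>
  <!-- Groups chunks by correlation ID and rebuilds the original message -->
  <message-chunk-aggregator timeout="6000" failOnTimeout="true"/>
  <vm:outbound-endpoint path="large.out"/>
</flow>
```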

Message Filter

The Message Filter is used to control whether a message is processed by using a filter. In addition to the filter, you can configure whether to throw an exception if the filter does not accept the message and an optional message processor to send unaccepted messages to.

Configuration for the Message Filter is as follows:

<message-filter throwOnUnaccepted="false" onUnaccepted="rejectedMessageLogger">
  <message-property-filter pattern="Content-Type=text/xml" caseSensitive="false"/>
</message-filter>

Processor Chain

A Processor Chain is a linear chain of message processors which process a message in order. A Processor Chain can be configured wherever a message processor appears in a Mule Schema. For example, to allow a WireTap to transform the current message before sending it off, you can configure the following:

<wire-tap>
  <processor-chain>
    <append-string-transformer message="tap" />
    <vm:outbound-endpoint path="wiretap-tap" exchange-pattern="one-way" />
  </processor-chain>
</wire-tap>

Recipient List

The Recipient List message processor allows you to send a message to multiple endpoints by specifying an expression that, when evaluated, provides the list of endpoints. These messages can optionally be given a correlation ID, as in the Collection Splitter. An example is

<recipient-list enableCorrelation="ALWAYS" evaluator="header" expression="myRecipients"/>

which finds the list of endpoints in the message header named myRecipients.

Request Reply

Available as of Mule 3.2.

The Request Reply message processor receives a message on one channel, allows the back-end process to be forked to invoke other flows asynchronously, and accepts the asynchronous result on another channel.

Here is an example that uses the Request Reply message processor:

<flow name="main">
    <vm:inbound-endpoint path="input"/>
    <request-reply storePrefix="mainFlow">
        <vm:outbound-endpoint path="request"/>
        <vm:inbound-endpoint path="reply"/>
    </request-reply>
    <component class="com.mycompany.OrderProcessor"/>
</flow>

<flow name="handle-request-reply">
    <vm:inbound-endpoint path="request"/>
    <component class="com.mycompany.AsyncOrderGenerator"/>
</flow>

The request is received in the main flow and passed to the request-reply router, which implicitly sets the MULE_REPLYTO message property to the URL of its inbound endpoint (vm://reply) and asynchronously dispatches the message to the (one-way) vm://request endpoint, where it is processed by the handle-request-reply flow. The main flow then waits for a reply. The handle-request-reply flow passes the message to the AsyncOrderGenerator component. When this processing is complete, the message is sent to vm://reply (the value of the MULE_REPLYTO property). The asynchronous reply is received and given to the OrderProcessor component to complete the order processing.

In more advanced cases, you might not want the automatic forwarding of the second flow’s response to the request-reply inbound endpoint. For instance, the second flow might trigger the running of a third flow, which then generates and sends the reply. In these cases, you can remove the MULE_REPLYTO property with a Message Properties Transformer:

<request-reply storePrefix="mainFlow">
    <vm:outbound-endpoint path="request">
        <message-properties-transformer scope="outbound">
            <delete-property key="MULE_REPLYTO"/>
        </message-properties-transformer>
    </vm:outbound-endpoint>
    <vm:inbound-endpoint path="reply"/>
</request-reply>
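
With MULE_REPLYTO removed, some flow has to send the reply explicitly. The following sketch shows one way the three-flow arrangement described above could look. The flow names, the stage-two path, and the ReplyBuilder component are hypothetical, and it is assumed that the correlation ID is carried through so that the waiting request-reply can match the reply to its request.

```xml
<!-- Sketch only: flow names, the stage-two path and ReplyBuilder are hypothetical -->
<flow name="handle-request">
  <vm:inbound-endpoint path="request"/>
  <component class="com.mycompany.AsyncOrderGenerator"/>
  <!-- No automatic reply here: hand over to a third flow instead -->
  <vm:outbound-endpoint path="stage-two"/>
</flow>

<flow name="send-reply">
  <vm:inbound-endpoint path="stage-two"/>
  <component class="com.mycompany.ReplyBuilder"/>
  <!-- Explicitly deliver the result to the request-reply inbound endpoint -->
  <vm:outbound-endpoint path="reply"/>
</flow>
```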

Resequencer

The Resequencer sorts a set of received messages by their correlation sequence property and issues them in the correct order. It uses the timeout and failOnTimeout attributes described in Collection Aggregator to determine when all the messages in the set have been received.

The Resequencer is configured as follows:

<resequencer timeout="6000" failOnTimeout="false"/>

Round Robin

The Round Robin message processor iterates through a list of child message processors in round-robin fashion: the first message received is routed to the first child, the second message to the second child, and so on. After a message has been routed to each child, the next is routed to the first child again, restarting the iteration.

This message processor was added in Mule 3.0.1.

<round-robin>
  <http:outbound-endpoint address="http://localhost:6090/weather-forecast" />
  <http:outbound-endpoint address="http://localhost:6091/weather-forecast" />
  <http:outbound-endpoint address="http://localhost:6092/weather-forecast" />
</round-robin>

Splitter

A Splitter uses an expression to split a message into pieces, all of which are then sent to the next message processor. Like other splitters, it can optionally specify non-default locations within the message for the message ID and correlation ID.

The Splitter is configured as shown below:

<splitter evaluator="xpath" expression="//acme:Trade"/>

This uses the specified XPath expression to find a list of nodes in the current message and sends each of them as a separate message.

Until Successful

Available as of Mule 3.2.

The Until Successful message processor processes a message with its child message processor until the processing succeeds. This processing occurs asynchronously, so execution returns to the parent flow immediately.

The Until Successful message processor is able to retry:

  • Dispatching to outbound endpoints, for example, when you’re reaching out to a remote web service that may have availability issues.

  • Execution of a component method, for example, to retry an action on a Spring Bean that may depend on unreliable resources.

  • A sub-flow execution, to keep reexecuting several actions until they all succeed.

  • Any other message processor execution, to allow more complex scenarios.

<until-successful objectStore-ref="objectStore"
                  maxRetries="5"
                  secondsBetweenRetries="60">
    <outbound-endpoint ref="retriableEndpoint" />
</until-successful>

This message processor needs a ListableObjectStore instance in order to persist messages pending (re)processing. There are several implementations available in Mule, including the following:

  • DefaultInMemoryObjectStore. The default in-memory store.

  • DefaultPersistentObjectStore. The default persistent store.

  • FileObjectStore. A file-based store.

  • QueuePersistenceObjectStore. The global queue store.

  • SimpleMemoryObjectStore. An in-memory store.

See Mule Object Stores for further information about object stores in Mule.

Here is how you would create an in-memory store:

<spring:bean id="objectStore" class="org.mule.util.store.SimpleMemoryObjectStore" />

Success and failure are defined as follows:

  • If the child message processor throws an exception, this is a failure.

  • If the child message processor does not return a message (e.g. is a one-way endpoint), this is a success.

  • If a 'failure expression' (see below) has been configured, the return message is evaluated against this expression to determine failure or not.

  • Otherwise:

    • If the child message processor returns a message that contains an exception payload, this is a failure.

    • If the child message processor returns a message that does not contain an exception payload, this is a success.

Here is an example showing how to configure the failure expression:

<until-successful objectStore-ref="objectStore"
                  failureExpression="#[header:INBOUND:http.status != 202]"
                  maxRetries="6"
                  secondsBetweenRetries="600">
    <http:outbound-endpoint address="http://acme.com/api/flakey"
                            exchange-pattern="request-response"
                            method="POST" />
</until-successful>

The Until Successful message processor is also able to synchronously acknowledge that it has accepted a message and will try to process it repeatedly. The following is an example where the message correlation ID is used as an acknowledgement message:

<until-successful objectStore-ref="objectStore"
                  ackExpression="#[message:correlationId]"
                  maxRetries="3"
                  secondsBetweenRetries="10">
    <flow-ref name="signup-flow" />
</until-successful>

It is also possible to define a DLQ (dead letter queue) endpoint to which messages will be sent if they have failed processing too many times:

<until-successful objectStore-ref="objectStore"
                  dlqEndpoint-ref="dlqChannel"
                  maxRetries="3"
                  secondsBetweenRetries="10">
...
</until-successful>
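
To tie the pieces together, here is a sketch of a complete arrangement: the object store bean, a global endpoint to act as the dead letter queue, and a flow that uses both. It assumes the spring and vm namespace prefixes are declared and that a flow named signup-flow exists elsewhere; the surrounding flow name and the VM paths are hypothetical.

```xml
<!-- Sketch only: flow name and VM paths are hypothetical -->
<spring:bean id="objectStore" class="org.mule.util.store.SimpleMemoryObjectStore" />

<!-- Global endpoint referenced by dlqEndpoint-ref below -->
<vm:endpoint name="dlqChannel" path="dlq.channel" />

<flow name="reliable-signup">
  <vm:inbound-endpoint path="signup.in" />
  <!-- Up to 3 retries, 10 seconds apart; exhausted messages go to dlqChannel -->
  <until-successful objectStore-ref="objectStore"
                    dlqEndpoint-ref="dlqChannel"
                    maxRetries="3"
                    secondsBetweenRetries="10">
    <flow-ref name="signup-flow" />
  </until-successful>
</flow>
```

Because SimpleMemoryObjectStore keeps pending messages in memory, a persistent store would be the safer choice if messages must survive a restart.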

WireTap

The WireTap message processor allows you to route certain messages to a different message processor as well as to the next one in the chain. For instance, to copy all messages to a specific endpoint, you configure it as an outbound endpoint on the WireTap routing processor:

<wire-tap>
    <vm:outbound-endpoint path="tapped.channel"/>
</wire-tap>

Using Filters with the WireTap

The WireTap routing processor is useful both with and without filtering. If filtered, it can be used to record or take note of particular messages or to copy only messages that require additional processing. If filters aren’t used, you can make a backup copy of all messages received. The behavior here is similar to that of an interceptor, but interceptors can alter the message flow by preventing the message from reaching the component. WireTap routers cannot alter message flow but just copy on demand. In this example, only messages that match the filter expression are copied to the vm endpoint.

<wire-tap>
    <vm:outbound-endpoint path="tapped.channel"/>
    <wildcard-filter pattern="the quick brown*"/>
</wire-tap>