Mule Runtime High Availability (HA) Cluster Overview

A cluster is a set of Mule runtimes that acts as a unit. In other words, a cluster is a virtual server composed of multiple nodes. The nodes (Mule runtimes) in a cluster communicate and share information through a distributed shared memory grid. This means that the data is replicated across memory in different physical machines.

[Figure: cluster]
Contact your customer service representative about pricing for this feature.

The Benefits of Clustering

By default, clustering Mule runtimes ensures high system availability. (If you wish to use clusters for high performance instead, see Clustering for High Performance below.) If a Mule runtime node becomes unavailable due to failure or planned downtime, another node in the cluster can assume the workload and continue to process existing events and messages. The following figure illustrates the processing of incoming messages by a cluster of two nodes. Notice that the processing load is balanced across nodes – Node 1 processes message 1 while Node 2 simultaneously processes message 2.

[Figure: a two-node cluster with no failure; Node 1 processes message 1 and Node 2 processes message 2]

If one node fails, the other available nodes pick up the work of the failing node. As shown in the following figure, if Node 2 fails, Node 1 processes both message 1 and message 2.

[Figure: the same cluster after Node 2 fails; Node 1 processes both message 1 and message 2]

Because all nodes in a cluster of Mule runtimes process messages simultaneously, clusters can also improve performance and scalability. Compared to a single node instance, clusters can support more users or improve application performance by sharing the workload across multiple nodes or by adding nodes to the cluster.

The following figure illustrates workload sharing in more detail. Both nodes process messages related to order fulfillment. However, when one node is heavily loaded, it can move the processing for one or more steps in the process to another node. Here, processing of the Process order discount step is moved to Node 1, and processing of the Fulfill order step is moved to Node 2.

[Figure: cluster diagram showing order fulfillment steps shared between Node 1 and Node 2]

Beyond benefits such as high availability through automatic failover, improved performance, and enhanced scalability, clustering Mule runtimes offers the following benefits:

  • Automatic coordination of access to resources such as files, databases, and FTP sources. The Mule runtime cluster automatically manages which node (Mule runtime) will handle communication from a data source.

  • Automatic load balancing of processing within a cluster. If you divide your flows into a series of steps and connect these steps with a transport such as VM, each step is put in a queue, which makes it cluster-enabled. The cluster of Mule runtimes can then process each step in any node, and so better balance the load across nodes.

  • Raised alerts. You can set up an alert to appear when a node goes down and when a node comes back up.

All Mule runtimes in a cluster actively process messages. Note that each Mule runtime is also internally scalable – a single Mule runtime can scale easily by taking advantage of multiple cores. Even when a Mule runtime takes advantage of multiple cores, it still operates as a single node (Mule runtime) in a cluster.

Concurrency Issues Solved by Clusters

The following problems may exist when you have a server group composed of multiple servers that aren’t bound as a cluster. You don’t have to worry about any of them if you group your servers as a cluster:

  • File-based transports: All Mule instances access the same Mule file folders concurrently, which can lead to duplicate file processing and even to failures if a file is deleted or modified by the Mule application.

  • Multicast transport: All Mule instances get the same TCP requests and then process duplicate messages.

  • JMS Topics: All Mule instances connect to the same JMS topic, which may lead to repeated processing of messages when scaling the non-clustered Mule instance horizontally.

  • JMS request-reply/request-response: All Mule instances listen for messages in the same response queue, so a Mule instance might obtain a response that isn’t correlated to the request it sent. This can result in incorrect responses or make a flow fail with a timeout.

  • Idempotent-redelivery-policy: Idempotency doesn’t work if the same request is received by different Mule instances, so duplicated messages cannot be detected.

  • Salesforce streaming API: If multiple instances of the same application are deployed, they will fail because the API supports only a single consumer. There is also no failover support if the connected instance is stopped or crashes.

About Clustering

All the Mule runtimes in a cluster group together to form a single unit. Thus, you can deploy, monitor, or stop all the Mule runtimes in a cluster as if they were a single Mule runtime.

All the Mule runtimes in a cluster share memory, as illustrated below:

[Figure: cluster topology with shared memory]

Mule uses an active-active model to cluster Mule runtimes, rather than an active-passive model.

In an active-passive model, one node in a cluster acts as the primary, or active node, while the others are secondary, or passive nodes. The application in such a model runs on the primary node, and only ever runs on the secondary node if the first one fails. In this model, the processing power of the secondary node(s) is mostly wasted in passive waiting for the primary node to fail.

In an active-active model, no one node in the cluster acts as the primary node; all nodes in the cluster support the application. The application in this model runs on all the nodes, even splitting message processing between nodes to expedite processing.

About Queues

You can set up a VM queue explicitly to load balance across Mule runtimes (nodes). Thus, if your entire application flow contains a sequence of child flows, the cluster can assign each successive child flow to whichever Mule runtime (node) happens to be available at the time. Potentially, the cluster can process a single message on multiple nodes as it passes through the VM endpoints in the application flow, as illustrated below:

[Figure: load balancing across nodes through VM queues]
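
As a rough in-process analogy for the shared queue described above, the following Java sketch lets several worker threads (standing in for cluster nodes) take messages from one queue, so whichever worker is free picks up the next message. It does not use any Mule API; the class name, node names, and stop marker are all hypothetical, and a real cluster shares the queue across machines rather than across threads.

```java
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

// Illustration only: threads stand in for cluster nodes, one queue stands in for a VM queue.
final class SharedQueueDemo {
    private static final String STOP = "__stop__"; // hypothetical shutdown marker

    /** Runs nodeCount workers against one shared queue; returns message -> node that handled it. */
    static Map<String, String> run(int nodeCount, Iterable<String> messages) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        Map<String, String> processedBy = new ConcurrentHashMap<>();
        Thread[] nodes = new Thread[nodeCount];
        for (int i = 0; i < nodeCount; i++) {
            String name = "node-" + (i + 1);
            nodes[i] = new Thread(() -> {
                try {
                    // Each worker takes whatever message is next in the shared queue.
                    for (String m = queue.take(); !m.equals(STOP); m = queue.take()) {
                        processedBy.put(m, name);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            nodes[i].start();
        }
        for (String m : messages) {
            queue.add(m);
        }
        for (int i = 0; i < nodeCount; i++) {
            queue.add(STOP); // one marker per worker so every worker exits
        }
        try {
            for (Thread t : nodes) {
                t.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for workers", e);
        }
        return processedBy;
    }
}
```

Calling `SharedQueueDemo.run(2, Arrays.asList("m1", "m2", "m3", "m4"))` handles every message exactly once, but which worker handles which message varies from run to run, much as the cluster assigns each step to whichever node is available.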

About High-Reliability Applications

A high-reliability application must meet the following requirements:

  1. Zero tolerance for message loss

  2. A reliable underlying enterprise service bus (Mule)

  3. Highly reliable individual connections

The feature known as transactionality tracks application event sequences to ensure that each message-processing step completes successfully, so that no messages are lost or processed incorrectly. If a step fails for any reason, the transactional mechanism rolls back all previous processing events, then restarts the message processing sequence.

Transports such as JMS, VM, and JDBC provide built-in transactional support, thus ensuring that messages get processed reliably. For example, you can configure a transaction on an inbound JMS connection endpoint to remove messages from the JMS server only after the transaction has been committed. This ensures that the original message remains available for reprocessing if an error occurs during the processing flow.
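
The commit-then-remove behavior described above can be sketched in plain Java. This is a simplified illustration that uses an in-memory queue, not the JMS API or Mule's transaction support; the class and method names are hypothetical.

```java
import java.util.Deque;
import java.util.function.Consumer;

// Illustration only: a message leaves the queue only after its handler succeeds.
final class TransactionalReceive {
    /**
     * Processes the message at the head of the queue. The message is removed
     * (the "commit") only if the handler returns normally. If the handler
     * throws, the message stays at the head so it can be reprocessed.
     * Returns true when a message was committed.
     */
    static boolean processNext(Deque<String> queue, Consumer<String> handler) {
        String message = queue.peekFirst();   // read without removing
        if (message == null) {
            return false;                     // nothing to process
        }
        try {
            handler.accept(message);
        } catch (RuntimeException e) {
            return false;                     // "rollback": the message remains queued
        }
        queue.removeFirst();                  // "commit": remove the message now
        return true;
    }
}
```

If the handler throws on the first attempt, the queue still holds the message, and a later call with a working handler processes and removes it.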

You must use XA transactions to move messages between dissimilar transports that support transactions. This ensures that the Mule runtime commits associated transactions from all the dissimilar transports as a single unit. See Transaction Management for more information on XA and other types of transactions supported by Mule runtimes.

Cluster Support for Transports

All Mule transports are supported within a cluster. Because of differences in the way different transports access inbound traffic, the details of this support vary. In general, outbound traffic acts the same way inside and outside a cluster. Mule runtimes support three basic types of transports:

Socket-based Transports

Socket-based transports read input sent to network sockets that Mule owns. Examples include TCP, UDP, and HTTP[S].

Socket-based transports are supported in clusters as described below:

  • Since each clustered Mule runtime runs on a different network node, each instance receives only the socket-based traffic sent to its node. Incoming socket-based traffic should be load balanced (see Clustering and Load Balancing) to distribute it among the clustered instances.

  • Output to socket-based transports is written to a specific host/port combination. If the host/port combination is an external host, no special considerations apply. If it is a port on the local host, consider using that port on the load balancer instead to better distribute traffic among the cluster.

Listener-based Transports

Listener-based transports read data using a protocol that fully supports concurrent multiple accessors. Examples include JMS and VM.

Listener-based transports are supported in clusters as described below:

  • Listener-based transports fully support multiple readers and writers. No special considerations apply either to input or to output.

  • Note that, in a cluster, VM transport queues are a shared, cluster-wide resource. The cluster will automatically synchronize access to the VM transport queues. Because of this, a message written to a VM queue can be processed by any cluster node. This makes VM ideal for sharing work among cluster nodes.

Resource-based Transports

Resource-based transports read data from a resource that allows multiple concurrent accessors, but does not natively coordinate their use of the resource. Examples of resource-based transports include File, FTP, SFTP, E-mail, and JDBC.

Resource-based transports are supported in clusters as described below:

  • Mule HA Clustering automatically coordinates access to each resource only for nodes within the cluster, ensuring that only one clustered instance accesses each resource at a time. Because of this, it is generally a good idea to immediately write messages read from a resource-based transport to VM queues. This allows the other cluster nodes to take part in processing the messages.

    If you run other applications (either Mule applications outside the cluster or non-Mule applications) that access the same resources as the nodes in the cluster, you must implement logic to lock those resources.

    For example, if multiple programs process files in the same shared directory by reading, modifying, and then deleting the files, these programs must use an explicit, application-level locking strategy to prevent the same file from being processed more than once.

  • There are no special considerations in writing to resource-based clustered transports:

    • When writing to file-based transports (File, FTP, SFTP), Mule will generate unique file names.

    • When writing to JDBC, Mule can generate unique keys.

    • Writing e-mail is effectively listener-based rather than resource-based.
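
One possible application-level locking strategy for programs outside the cluster that share a directory is to claim a file by renaming it before processing it. The Java sketch below assumes that each program has its own work directory on the same file system as the shared directory, so that the rename is atomic. The class name and directory layout are hypothetical, and this is not how Mule coordinates access inside a cluster.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.Optional;

// Illustration only: claim a shared file by atomically moving it to a private directory.
final class FileClaim {
    /**
     * Tries to claim a file. Returns the new path if this caller won the claim,
     * or an empty Optional if the file was already taken by another program.
     */
    static Optional<Path> tryClaim(Path file, Path workDir) throws IOException {
        Files.createDirectories(workDir);
        Path target = workDir.resolve(file.getFileName());
        try {
            // Only one competing rename of the same source file can succeed.
            return Optional.of(Files.move(file, target, StandardCopyOption.ATOMIC_MOVE));
        } catch (NoSuchFileException e) {
            return Optional.empty(); // the file is gone: someone else claimed it first
        }
    }
}
```

A program processes a file only when `tryClaim` returns a path, and then reads, modifies, and deletes the file from its own work directory.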

Clustering and Reliable Applications

High-reliability applications (ones that have zero tolerance for message loss) require not only a reliable underlying Mule runtime but also reliable individual connections. Reliability Patterns give you the tools to build fully reliable applications in your clusters.

Current Mule documentation provides code examples that show how you can implement a reliability pattern for a number of different non-transactional transports, including HTTP, FTP, File, and IMAP. If your application uses a non-transactional transport, follow the reliability pattern. These patterns ensure that a message is accepted and successfully processed or that it generates an "unsuccessful" response allowing the client to retry.

If your application uses transactional transports, such as JMS, VM, and JDBC, use transactions. Mule’s built-in support for transactional transports enables reliable messaging for applications that use these transports.

These actions can also apply to non-clustered applications.

Clustering and Networking

Single Data-Center Clustering

To ensure reliable connectivity between cluster nodes, all nodes of a cluster should be located on the same LAN. Implementing a cluster with nodes across geographically separated locations, such as different data centers that are connected through a VPN, is possible but not recommended.

Ensuring that all cluster nodes reside on the same LAN is the best practice to lower the possibility of network interruptions and unintended consequences, such as duplicated messages.

Distributed Data-center Clustering

Linking cluster nodes through a WAN introduces many possible points of failure, such as external routers and firewalls, which can prevent proper synchronization between cluster nodes. This not only affects performance but also requires you to plan for possible side effects in your app. For example, when two cluster nodes reconnect after getting cut off by a failed network link, the ensuing synchronization process can cause messages to be processed twice, which creates duplicates that must be handled in your application logic.
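
One way to handle such duplicates in application logic is to record the ID of every message already processed and skip repeats. The Java sketch below shows only the check itself and keeps the IDs in memory, which is an assumption made for brevity. In a real deployment the IDs would have to live in a store that every consumer shares. The class and method names are hypothetical.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Illustration only: remember processed message IDs and reject repeats.
final class DuplicateGuard {
    private final Set<String> seen = ConcurrentHashMap.newKeySet();

    /** Returns true the first time an ID is offered and false for every repeat. */
    boolean firstDelivery(String messageId) {
        return seen.add(messageId);
    }
}
```

The application calls `firstDelivery` with a stable message identifier and processes the message only when the call returns true.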

Note that it is possible to use nodes of a cluster located in different data centers and not necessarily located on the same LAN, but some restrictions apply.

To prevent duplicate processing after a network partition, you must enable the Quorum Protocol. This protocol allows one set of nodes to continue processing data while the other sets do nothing with the shared data until they reconnect. When a disconnection occurs, only the partition with the most nodes continues to function. For instance, assume two data centers, one with three nodes and another with two nodes. If a connectivity problem occurs in the data center with two nodes, the data center with three nodes continues to function and the second data center does not. If the three-node data center goes offline, none of your nodes function. To prevent this outage, you must create the cluster across at least three data centers with the same number of nodes in each. Because two data centers are unlikely to fail at the same time, the cluster remains functional if just one data center goes offline.
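
The arithmetic behind these scenarios can be sketched as a majority check. This is a simplified model written for illustration, not Mule's implementation: it assumes that a partition may continue only when it holds more than half of the cluster's nodes, and the class and method names are hypothetical.

```java
// Illustration only: a partition keeps working when it holds a strict majority of nodes.
final class QuorumCheck {
    /** Smallest node count that at most one partition of the cluster can reach. */
    static int majority(int clusterSize) {
        return clusterSize / 2 + 1;
    }

    /** True when a partition with reachableNodes members may keep processing shared data. */
    static boolean hasQuorum(int reachableNodes, int clusterSize) {
        return reachableNodes >= majority(clusterSize);
    }
}
```

Under this model, the three-node data center in a five-node cluster has quorum (3 of 5) and the two-node data center does not (2 of 5). With three data centers of two nodes each, losing any one data center leaves four of six nodes, which still satisfies the majority.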

A cluster partition that does not have enough nodes to function will continue reacting to external system calls, but all operations over the object stores will fail, preventing data generation.

Limitations

  • Quorum is only supported in Object Store-related operations.

  • Distributed locking is not supported, which affects:

    • Concurrent polling for files by the File and FTP transports

    • Idempotent Redelivery Policy component

    • Idempotent Message Filter component

  • In-memory messaging is not supported, which affects:

    • VM transport

    • SEDA queues

  • The Quorum feature can only be configured manually. See Creating and Managing a Cluster Manually.

Clustering and Load Balancing

When Mule clusters are used to serve TCP requests (where TCP includes SSL/TLS, UDP, Multicast, HTTP, and HTTPS), some load balancing is needed to distribute the requests among the clustered instances. Various software load balancers are available.

There are also many hardware load balancers that can route both TCP and HTTP(S) traffic.

Clustering for High Performance

High performance is implemented differently on CloudHub, so this section applies only to on-premises deployments.

If high performance is your primary goal (rather than reliability), you can configure a Mule cluster or an individual application for maximum performance using a performance profile. By implementing the performance profile for specific applications within a cluster, you can maximize the scalability of your deployments while deploying applications with different performance and reliability requirements in the same cluster. By implementing the performance profile at the container level, you apply it to all applications within that container. Application-level configuration overrides container-level configuration.

Setting the performance profile has two effects:

  • It disables distributed queues, using local queues instead to prevent data serialization/deserialization and distribution in the shared data grid.

  • It implements the object store without backups, to avoid replication.

To configure the performance profile at the container level, add the following property to mule-cluster.properties, or set it as a system property from the command line or in wrapper.conf:

mule.cluster.storeprofile=performance

To configure the performance profile at the individual application level, add the profile inside a configuration wrapper, as shown below.

Performance Store Profile

<mule>
   <configuration>
      <cluster:cluster-config>
         <cluster:performance-store-profile/>
      </cluster:cluster-config>
   </configuration>
</mule>

Remember that application-level configuration overrides container-level configuration. If you would like to configure the container for high performance but make one or more individual applications within that container prioritize reliability, include the following code in those applications:

Reliable Store Profile

<mule>
    <configuration>
        <cluster:cluster-config>
            <cluster:reliable-store-profile/>
        </cluster:cluster-config>
    </configuration>
</mule>

In cases of high load with endpoints that do not support load balancing, applying the performance profile may degrade performance. If you are using a File-based transport with an asynchronous processing strategy, JMS topics, multicasting, or HTTP connectors without a load balancer, the high volume of messages entering a single node can cause bottlenecks, and thus it can be better for performance to turn off the performance profile for these applications.

You can also define a minimum number of machines that a cluster requires to remain in an operational state. This improves consistency. Find more information in our quorum management section.

Best Practices

There are a number of recommended practices related to clustering. These include:

  • As much as possible, organize your application into a series of steps where each step moves the message from one transactional store to another.

  • If your application processes messages from a non-transactional transport, use a reliability pattern to move them to a transactional store such as a VM or JMS store.

  • Use transactions to process messages from a transactional transport. This ensures that if an error is encountered, the message is reprocessed.

  • Use distributed stores such as those used with the VM or JMS transport – these stores are available to an entire cluster. This is preferable to the non-distributed stores used with transports such as File, FTP, and JDBC – these stores are read by a single node at a time.

  • Use the VM transport to get optimal performance. Use the JMS transport for applications where data needs to be saved after the entire cluster exits.

  • Create the number of nodes within a cluster that best meets your needs.

  • Implement reliability patterns to create high reliability applications.

Prerequisites and Limitations

  • To date, MuleSoft has only tested eight-node clusters. If you plan to scale beyond eight nodes, or you plan to store large message payloads, research Hazelcast cluster scaling in the Hazelcast documentation or contact MuleSoft Support.

  • You must have at least two Mule runtimes in a cluster, each running on a different physical (or virtual) machine.

  • To maintain synchronization between the nodes in the cluster, Mule HA requires a reliable network connection between servers.

  • You must keep the following ports open in order to set up a Mule cluster: port 5701 and port 54327.

  • Because new cluster member discovery is performed using multicast, you need to enable the multicast IP 224.2.2.3.

  • To serve TCP requests, some load balancing across a Mule cluster is needed. See Clustering and Load Balancing for more information about third-party load balancers that you can use. You can also load balance the processing within a cluster by separating your flows into a series of steps and connecting each step with a transport such as VM. Doing so makes each step cluster-enabled, allowing Mule to better balance the load across nodes.

  • If your custom message source does not use a message receiver to define node polling, then you must configure your message source to implement the ClusterizableMessageSource interface.
    ClusterizableMessageSource dictates that only one application node inside a cluster contains the active (that is, started) instance of the message source; this is the ACTIVE node. If the active node fails, the ClusterizableMessageSource selects a new active node, then starts the message source in that node.

See Also