Contact Us 1-800-596-4880

Reconnection on Connectors

The SDK and Mule provide out-of-the-box reconnection strategies to be applied to all connectors that work with connections. This support is generic, so the experience of configuring and adjusting reconnection rules is the same across connectors.

HTTP Requester does not use the reconnection strategy for retries. See HTTP Request Configuration Reference for additional information.

How the SDK Knows When to Trigger a Reconnection

The SDK and Mule provide the reconnection engine, but the connector is responsible for communicating when a reconnection should occur, when a connection is down, and when a new connection is required.

What is a Connection Error?

A reconnection mechanism can obtain a new, valid connection when the existing one does not work anymore and does not allow the connector to execute its business logic. These errors are connection or connectivity errors.

It is very important to understand when an error is a connectivity one and when it is not. Some examples follow below.

Examples of Connection Errors

  • Connection Timeout

  • Invalid Credentials

  • Token Expired

Examples of Non-connection Errors

  • Missing required HTTP Query Param when doing a HTTP request

  • Directory does not exist when trying to copy a file

  • Publish a Null message to Slack

  • Syntax error when executing a Database Query

How to Trigger a Reconnection

The main thing on reconnection is to be able to communicate correctly when a connectivity error occurred and is required to reconnect. How to communicate the error might differ depending on the component where the error has been detected.

The sections below explain how to trigger a reconnection for each component of a connector.

Connection Provider

Creating Connection

Connection Providers are the first moments of a connection life. At this moment, the connection provider can fail to create a connection because of several problems, so in this moment, the Connection Provider is able to communicate the connectivity error and trigger a reconnection, which means try to create the connection again.

When creating the connection, connect() method, to declare a connection problem and is required a reconnection a ConnectionException() should be thrown.

@Override
public Connection connect() throws ConnectionException {
  Connection connection;
  try {
    connection = new Connection();
  } catch (Exception e) {
    throw new ConnectionException("Error occurred trying to connect.", e);
  }
  return connection;
}

Validating Connection

To check that a connection is still valid, the SDK can grab a live connection at any time and validate it against the Connection Provider that created it.

To validate the connection, validate(Connection) is invoked, and the connection provider should check if the given connection is valid or not. If the Connection Provider detects that the Connection is not valid anymore, it should communicate a connection validation result with a failure result ConnectionValidationResult.failure(Cause, Exception). This will disconnect and invalidate the current connection, if possible, and a new connection will be created again.

Example: Validation Over Connection
@Override
public ConnectionValidationResult validate(Connection connection) {
  if(connection.isValid()){
    return ConnectionValidationResult.success();
  } else {
    return ConnectionValidationResult.failure("Connection Problem", new RuntimeException());
  }
}

Operations

Operations can fail due to multiple causes, such as bad parameters values and unexpected errors. Errors can also be due to connectivity errors. To indicate that the error comes from a non-working connection, a ConnectionException() is thrown. This exception triggers the reconnection, and the operation is executed again.

Operation Example

Example
public void operationTriggersReconnection(@Connection Connection connection, boolean connectionException){
    if(connectionException){
      throw new ConnectionException("Connection Error"); (1)
    } else {
      throw new RuntimeException(); (2)
    }
}
1 If a ConnectionException is thrown, the used connection will be invalidated. The connection provider will try to create a new valid connection, and if a connection is available, the operation will be executed again.
2 If any other exception is thrown, the error will be considered as a business error, the connection will not be invalidated, and the operation will not be executed again.

Sources

Because a Sources are responsible of creating their own Connections, the way a Source triggers reconnections differs somewhat from Operations. If a successfully created connection is later disconnected, the source should communicate the connection failure: ConnectionException(Exception, Connection) .

Sources have two stages, when they are starting and when they are running. Each moment has to communicate their connectivity failures in different ways.

Not communicating the Connection failure in the ConnectionException could make the reconnection mechanism fail to work properly.

Starting Source

A source is in the starting source stage when the onStart() method executes. Connection errors must be communicated through the SourceCallback using the onConnectionException() method. The source stops and attempts to reconnect.

Example
@Override
public void onStart(SourceCallback<String, Void> sourceCallback) throws MuleException {
Connection connection = connectionProvider.connect();
  try {
    connection.startStreaming();
  } catch(Exception e){
    sourceCallback.onConnectionException(new ConnectionException(e, connection));
  }
}

Running Source

When the Source starts correctly, it means that the onStart() method finishes correctly, and the remaining logic is running on other threads.

When running on other threads, the SDK is unable to detect any kind of error raised with a throw statement. So the errors must be communicated through the SourceCallback, using the onConnectionException().

Example
@Connection
ConnectionProvider<Connection> connectionProvider;

@Inject
SchedulerService schedulerService;

private Scheduler scheduler;

@Override
public void onStart(SourceCallback<Connection, Void> sourceCallback) throws MuleException {
  Connection connection = connectionProvider.connect();
    scheduler = schedulerService.ioScheduler(); (1)
    scheduler.execute(() -> {
      while (true) {
        try {
          connection.receiveMessage();
        } catch (Exception e){
          sourceCallback.onConnectionException(new ConnectionException(e, connection)); (2)
        }
      }
    });
}
1 Creating a scheduler to run the Source logic in a new Thread.
2 Communicating to the SourceCallback the ConnectionException containing the failure connection that should be replaced.

Accessing the Retry Policy Template from Within an Operation

Available since version 1.3.

There are advanced cases in which an operation might need to access the RetryPolicyTemplate object that has been configured for it. For example, an operation might be leveraging an existing library that handles retries independently, or your use case might require applying retries without necessarily reconnecting.

To support these cases, operations can request that the effective RetryPolicyTemplate instance be used by adding it as an operation parameter. Consider this example from Anypoint Connector for WebSockets (WebSockets Connector):

public void send(String socketId,
                   @Content TypedValue<InputStream> content,
                   @Connection WebSocketConnection conn,
                   CompletionCallback<Void, Void> callback,
                   RetryPolicyTemplate retryPolicyTemplate) {

                   }

The retryPolicyTemplate argument is not translated into an operation parameter, but it is injected with the effective value upon execution.

This feature is intended for advanced scenarios only. Typically, you should throw a ConnectionException and allow Mule to handle reconnection and retries automatically. Use this feature only when absolutely necessary.

Connection Exception

A ConnectionException can be built with the following properties:

  • Message : String that describes the current error.

  • Cause : If present, Throwable indicates the cause the current error.

  • ErrorType : If present, ErrorType describing the current error.

  • Connection : If present, Object representing the connection failure. This will be used to disconnect and destroy it.