コネクタの再接続

SDK と Mule には標準の再接続戦略が用意されており、接続に使用するすべてのコネクタに適用されます。このサポートは汎用であるため、再接続ルールの設定と調整はどのコネクタでも同じです。

HTTP リクエスタでは、再試行のための再接続戦略は使用されません。詳細は、​HTTP Request 設定リファレンス を参照してください。

SDK が再接続のトリガのタイミングを知る方法

SDK と Mule は再接続エンジンを備えていますが、再接続を行うとき、接続が切断されたとき、新しい接続が必要なときは、コネクタから通知されます。

接続エラーとは?

既存の接続が機能しなくなり、コネクタがビジネスロジックの実行を継続できなくなると、再接続メカニズムによって新しい有効な接続を確立できます。これらのエラーは、接続エラーと呼ばれます。

エラーが接続エラーなのか他のエラーなのかを見極めることが非常に重要です。いくつかの例を示します。

接続エラーの例

  • Connection Timeout (接続タイムアウト)

  • ログイン情報が無効

  • トークンの期限切れ

接続エラー以外のエラーの例

  • HTTP 要求時に必要な HTTP クエリパラメータがない

  • ファイルのコピー時にディレクトリが存在しない

  • スラックに null メッセージをパブリッシュした

  • データベースクエリの実行時の構文エラー

再接続のトリガ方法

再接続において最も重要なことは、接続エラーが発生して再接続が必要な場合に、通知が正しく行えるということです。エラーの通知方法は、エラーが検出されたコンポーネントによって異なります。

以下のセクションでは、コネクタの各コンポーネントでの再接続のトリガ方法について説明します。

接続プロバイダ

接続の作成

接続プロバイダは、接続に最初に関わります。接続プロバイダが接続を作成できない理由にはいくつかありますが、接続エラーが発生した瞬間に接続プロバイダは接続エラーを通知して、再接続をトリガすることができます。つまり、接続を再び作成しようと試みます。

connect()​ メソッドが接続を作成しようとしたときにエラーを検出して再接続が必要になった場合は、​ConnectionException()​ をスローします。

@Override
public Connection connect() throws ConnectionException {
  Connection connection;
  try {
    connection = new Connection();
  } catch (Exception e) {
    throw new ConnectionException("Error occurred trying to connect.", e);
  }
  return connection;
}

接続の検証

接続がまだ有効であることを確認するため、SDK は稼働中の接続をいつでも取得して、接続を作成した接続プロバイダに対して検証できます。

接続を検証するため、​validate(Connection)​ が呼び出され、接続プロバイダが接続が有効かどうかをチェックします。 接続プロバイダが接続が有効でなくなっていることを検出すると、​接続検証結果​にエラー結果の ​ConnectionValidationResult.failure(Cause, Exception)​ を格納します。これによって現在の接続が切断されて無効化され、可能であれば再び作成されます。

例: 接続の検証
@Override
public ConnectionValidationResult validate(Connection connection) {
  if(connection.isValid()){
    return ConnectionValidationResult.success();
  } else {
    return ConnectionValidationResult.failure("Connection Problem", new RuntimeException());
  }
}

操作

操作が失敗する理由は、無効なパラメータ値や予期せぬエラーなどさまざまです。また、接続エラーに起因する場合もあります。エラーの原因が接続エラーであることを示すため、​ConnectionException()​ がスローされます。この例外により、​再接続がトリガされ、操作が再び実行されます​。

操作の例

public void operationTriggersReconnection(@Connection Connection connection, boolean connectionException){
    if(connectionException){
      throw new ConnectionException("Connection Error"); (1)
    } else {
      throw new RuntimeException(); (2)
    }
}
1 ConnectionException​ がスローされると、使用されていた接続が無効化されます。 接続プロバイダは新しい接続を作成しようと試み、接続が可能になると、操作が再び実行されます。
2 他の例外もスローされている場合は、エラーはビジネスエラーと見なされ、接続は無効化されず、操作も再実行されません。

ソース

ソースは自分自身の接続を作成する責任を持つ​ため、ソースによる再接続のトリガは操作とは少し異なります。正常に作成された接続がその後切断されると、ソースは接続エラーとして ​ConnectionException(Exception, Connection)​ をスローします。

ソースには​起動中​と​実行中​の 2 つのフェーズがあります。それぞれの段階では、接続エラーの通知方法が異なります。

接続エラーを ​ConnectionException​ で通知しないと、再接続メカニズムが正しく機能しません。

起動中のソース

onStart()​ メソッドが実行されるときに、ソースは起動中のソースフェーズにあります。 接続エラーは、​onConnectionException()​ メソッドを使用して ​SourceCallback​ で通知する必要があります。ソースは停止し、再接続を試みます。

@Override
public void onStart(SourceCallback<String, Void> sourceCallback) throws MuleException {
Connection connection = connectionProvider.connect();
  try {
    connection.startStreaming();
  } catch(Exception e){
    sourceCallback.onConnectionException(new ConnectionException(e, connection));
  }
}

実行中のソース

onStart()​ メソッドが正常に完了してソースが正常に起動すると、残りのロジックが他のスレッドで実行されます。

他のスレッドでの実行中は、SDK は ​throw​ ステートメントで通知されたエラーを検出することはできません。そのため、​onConnectionException()​ を使用して ​SourceCallback​ でエラーを通知する必要があります。

@Connection
ConnectionProvider<Connection> connectionProvider;

@Inject
SchedulerService schedulerService;

private Scheduler scheduler;

@Override
public void onStart(SourceCallback<Connection, Void> sourceCallback) throws MuleException {
  Connection connection = connectionProvider.connect();
    scheduler = schedulerService.ioScheduler(); (1)
    scheduler.execute(() -> {
      while (true) {
        try {
          connection.receiveMessage();
        } catch (Exception e){
          sourceCallback.onConnectionException(new ConnectionException(e, connection)); (2)
        }
      }
    });
}
1 新しいスレッドでソースロジックを実行するためのスケジューラを作成します。
2 置換すべきエラー接続を含む ​ConnectionException​ を ​SourceCallback​ に渡します。

操作内からの再試行ポリシーテンプレートへのアクセス

バージョン 1.3 以降で使用できます。

操作用に設定されている ​RetryPolicyTemplate​ オブジェクトに操作自身がアクセスする必要のある高度なケースがあります。たとえば、再試行を独立して処理する既存のライブラリを操作で使用している場合や、再接続なしで再試行を適用する必要のあるユースケースなどです。

これらのケースをサポートするには、有効な ​RetryPolicyTemplate​ インスタンスを操作パラメータとして追加して使用するように操作から要求します。次の WebSockets 用 Anypoint Connector (WebSockets Connector) の例を見てください。

public void send(String socketId,
                   @Content TypedValue<InputStream> content,
                   @Connection WebSocketConnection conn,
                   CompletionCallback<Void, Void> callback,
                   RetryPolicyTemplate retryPolicyTemplate) {

                   }

retryPolicyTemplate 引数は操作パラメータには変換されず、実行時に実効値と共に挿入されます。

この機能は、高度なシナリオのみを意図しています。通常は、ConnectionException をスローして、再接続と再試行を Mule に自動的に実行させます。この機能は、絶対に必要な場合にのみ使用してください。

接続例外

ConnectionException​ は以下のプロパティで構築できます。

  • Message​: 現在のエラーを説明した文字列です。

  • Cause​: 指定した場合、Throwable は現在のエラーの原因を示します。

  • ErrorType​: 指定した場合、ErrorType は現在のエラーを説明します。

  • Connection​: 指定した場合、Object は接続エラーを表します。切断と破棄に使用されます。