フロー出力の取得とメッセージソースからの応答の送信

ソースの主要なプロパティの 1 つは、応答が生成されるかどうかを示します。

応答を生成しないソースの例:

  • ファイルリスナー

  • Salesforce ストリーミング API

  • Scheduler 要素

  • 非同期プッシュ通知をリスンするソース

応答を生成するソース:

  • TCP サーバー

  • HTTP サーバー

  • JMS サーバー

このように、この分類はあまり確定的ではありません。次に例を示します。

  • HTTP サーバーでは応答の送信がサポートされているが、ユーザーが送信しないように選択できる。

  • JMS リスナーがメッセージを応答キューに送信して応答するかどうかは、メッセージヘッダーによって異なる。

ソースが応答を送信できて、メッセージが正常に処理されても、ソースが応答を送信しない場合があります。応答の送信プロセスがスキップされる可能性のあるロジックまたは設定状況があります。

ソースが応答を生成できることを指定するには、​@EmitsResponse​ アノテーションを付加する必要があります。 これにより、ソースにそれを行う機能があることが Runtime やツールに通知されます。

ただし、SDK も Runtime も次の作業を行わない点に注意してください。

  • 応答を生成するソースが実際に応答を生成するかどうかを検証する。

  • ソースが応答を生成することを宣言していなくても、ソースが応答を送信しないようにする。

コールバックパラメーター

ソースで生成されたメッセージの処理がフローで完了するたびに、フローで生成された応答をソースで取得することが必要になる場合があります。これは、​コールバックメソッド​​を介して行われます。 コールバックメソッドは、ソースを所有するフローで生成された応答を利用するために使用されます。これを行うには、​@OnSuccess​、​@OnError​、​@OnTerminate​ アノテーションを付加してメソッドを宣言します。これらのメソッドは操作と同じように宣言されます。ただし、これらのメソッドは応答をリスンしますが、応答を変更することはできないため void にする必要があります。

これらのコールバックメソッドの各パラメーターは、操作と同じように評価されます (自動変換を含む)。ただし、これらのコールバックはメッセージを処理する Runtime に応じて実行されるため、コールバックは式を受け入れることができます (デフォルト)。

ソースが応答を生成しない場合でも、リソースの解放、ログ、監査などでコールバックメソッドが必要になることがあります。

正常出力のリスン

ソースでは ​@OnSuccess​ アノテーションが付加されたメソッドを宣言できます。こうすることで、フローでメッセージが正常に処理された (エラーがスローされなかったか、すべて適切に処理された) 場合にメソッドが実行されます。

このメソッドには、ソースパラメーターとして処理される引数を設定できます。コネクタのユーザーは、通常のパラメーターとコールバックパラメーターを区別できません (区別する必要がありません)。この用途では、どちらでも問題ありません。ツール (Studio や Flow Designer) により、式を受け入れるパラメーターと受け入れないパラメーターがユーザーに通知されます。ユーザーは、他に何も心配する必要はありません。

別の重要な考慮事項は、ソースがメッセージをフローにプッシュした後のメッセージ処理が非同期であるということです。そのため、​@OnSuccess​ メソッドも非同期で呼び出されます。

簡略化された HTTP Connector の例を次に示します。

@EmitsResponse (1)
public class HttpListener extends Source<InputStream, HttpRequestAttributes> {

    @Override
    public void onStart(SourceCallback<InputStream, HttpRequestAttributes> sourceCallback) throws MuleException {
        httpServer = serverProvider.connect();
        httpServer.listen(path).onRequest(request -> {
            SourceCallbackContext ctx = sourceCallback.createContext(); (2)
            ctx.setVariable("responseContext", new HttpResponseContext(request.getResponseSender())); (3)
            Result<InputStream, HttpRequestAttributes> result = requestToResult(request);
            sourceCallback.handle(result, ctx); (4)
        });
    }

    @OnSuccess
    public void onSuccess(@Content String responseBody,
                          @Optional String responseStatusCode,
                          SourceCallbackContext callbackContext) (5)
    throws Exception {
        if (hasResponse(responseStatusCode)) { (6)
            HttpResponseContext responseContext = callbackContext.getVariable("responseContext"); (7)
            responseContext.sendResponse(responseBody, statusCode); (8)
        }
    }
}
1 この例では、この ​HttpListener​ が応答を送信する可能性があるため、​@EmitsResponse​ アノテーションをソースに付加します。
2 @OnSuccess​ メソッドは非同期 (多くの場合は並列) で呼び出されるため、状態をコールバックメソッドに伝える必要があります。より正確に言えば、応答する実際の要求に関連する何らかの参照を提供する必要があります。それを行うために、この例では ​SourceCallback​ を使用して新しい ​SourceCallbackContext​ を作成しています。
3 この例では、​HttpResponseContext​ を保持するコンテキストに変数を追加します。これは、応答の送信方法を認識しているコネクタで定義されたカスタムオブジェクトです。
4 この例では、​Result​ オブジェクトと新しく作成されたコンテキストを使用して、​SourceCallback​ でメッセージを生成およびプッシュします。
5 コールバックメソッドには、2 つのパラメーター (送信される応答の本文に対応する ​@Content​ と応答の HTTP 状況コードに対応する @Optional) があります。​SourceCallbackContext​ 型の 3 番目の引数もあります。これはパラメーターではありませんが、​handle()​ メソッドに渡された同じコンテキストを復元するように SDK に指示する手段です。
6 HTTP プロトコルでは応答が送信されない場合もあるため、この例では、応答の状況コードで応答を送信することが要求されているのかどうかをチェックします。
7 応答が必要な場合、この例では最初にコンテキストに設定された応答コンテキストを復元します。
8 応答が送信されます。

エラー出力のリスン

@OnSuccess​ と同様に、​@OnError​ はフローで未処理のエラーがスローされたときに実行されます。次に例を示します。

@OnError
public void onError(@Optional @DisplayName(ERROR_RESPONSE_SETTINGS) @NullSafe HttpListenerErrorResponseBuilder errorResponseBuilder,
                   SourceCallbackContext callbackContext,
                   Error error) {

}

このケースのメソッドは、成功の例と非常に似ています。また、パラメーターとコールバックコンテキストを受け取って、エラー応答をリクエスターに送り返します。さらに、エラーオブジェクトを受け取ることもできます。これは、取得した失敗に関する情報が含まれる Mule エラーオブジェクトです。

Error​ オブジェクトは、Java ​java.lang.Error​ オブジェクトではなく、SDK ​org.mule.runtime.api.message.Error​ です。

コンテンツパラメーター

コールバックは、操作と同様にコンテンツパラメーターを受け入れます。HTTP Connector の場合、​HttpListenerResponseBuilder​ でこれらが使用されます。これにより、DataWeave を使用して、フローの結果をコネクタで返す形式に変換できます。コンテンツやプライマリコンテンツのすべての概念は、関連する制限にも適用されます。

OnTerminate

ソースで ​@OnSuccess​ または ​@OnError​ メソッドを提供する必要はありません。ソースでは、必要なメソッドを宣言できますが、まったく宣言しないこともできます。ただし、これらの 1 つ以上が定義されている場合、​@OnTerminate​ メソッドも定義する必要があります。

@OnSuccess​ または ​@OnError​ は失敗する可能性があるため、このメソッドが必要になります。これらのいずれかで例外がスローされると、応答を待機してリモートシステムがハングしたり、リソースが漏洩したり、監査ログが不完全なままになったりします。

onTerminate​ メソッドの例を次に示します。

  public void onTerminate(SourceResult sourceResult) {
    Boolean sendingResponse = (Boolean) sourceResult.getSourceCallbackContext().getVariable(RESPONSE_SEND_ATTEMPT).orElse(false);
    if (FALSE.equals(sendingResponse)) {
      sourceResult
          .getInvocationError()
          .ifPresent(error -> sendErrorResponse(new
                                HttpListenerErrorResponseBuilder(),
                                sourceResult.getSourceCallbackContext(),
                                error,
                                null));
    }
  }

メソッドは、​onSuccess​ または ​onError​ メソッドで発生した可能性のある省略可能なエラーと、関連する ​SourceCallbackContext​ が含まれる ​SourceResult​ オブジェクトを受け取ります。

上記の例では、メソッドはエラーが見つかった場合に汎用エラー応答を送信します。