フロー出力の取得とメッセージソースからの応答の送信

ソースの主要なプロパティの 1 つは、応答が生成されるかどうかを示します。

応答を生成しないソースの例:

  • ファイルリスナ

  • Salesforce ストリーミング API

  • Scheduler 要素

  • 非同期プッシュ通知をリスンするソース

応答を生成するソース:

  • TCP サーバ

  • HTTP サーバ

  • JMS サーバ

このように、この分類はあまり確定的ではありません。次に例を示します。

  • HTTP サーバでは応答の送信がサポートされているが、ユーザが送信しないように選択できる。

  • JMS リスナがメッセージを応答キューに送信して応答するかどうかは、メッセージヘッダーによって異なる。

ソースが応答を送信できて、メッセージが正常に処理されても、ソースが応答を送信しない場合があります。応答の送信プロセスがスキップされる可能性のあるロジックまたは設定状況があります。

ソースが応答を生成できることを指定するには、@EmitsResponse アノテーションを付加する必要があります。 これにより、ソースにそれを行う機能があることが Runtime やツールに通知されます。

ただし、SDK も Runtime も次の作業を行わない点に注意してください。

  • 応答を生成するソースが実際に応答を生成するかどうかを検証する。

  • ソースが応答を生成することを宣言していなくても、ソースが応答を送信しないようにする。

コールバックパラメータ

ソースで生成されたメッセージの処理がフローで完了するたびに、フローで生成された応答をソースで取得することが必要になる場合があります。これは、​コールバックメソッド​を介して行われます。 コールバックメソッドは、ソースを所有するフローで生成された応答を利用するために使用されます。これを行うには、@OnSucess@OnError@OnTerminate アノテーションを付加してメソッドを宣言します。これらのメソッドは操作と同じように宣言されます。ただし、これらのメソッドは応答をリスンしますが、応答を変更することはできないため void にする必要があります。

これらのコールバックメソッドの各パラメータは、操作と同じように評価されます (自動変換を含む)。ただし、これらのコールバックはメッセージを処理する Runtime に応じて実行されるため、コールバックは式を受け入れることができます (デフォルト)。

ソースが応答を生成しない場合でも、リソースの解放、ログ、監査などでコールバックメソッドが必要になることがあります。

正常出力のリスン

ソースでは @OnSuccess アノテーションが付加されたメソッドを宣言できます。こうすることで、フローでメッセージが正常に処理された (エラーがスローされなかったか、すべて適切に処理された) 場合にメソッドが実行されます。

このメソッドには、ソースパラメータとして処理される引数を設定できます。コネクタのユーザは、通常のパラメータとコールバックパラメータを区別できません (区別する必要がありません)。この用途では、どちらでも問題ありません。ツール (Studio や Flow Designer) により、式を受け入れるパラメータと受け入れないパラメータがユーザに通知されます。ユーザは、他に何も心配する必要はありません。

別の重要な考慮事項は、ソースがメッセージをフローにプッシュした後のメッセージ処理が非同期であるということです。そのため、@OnSuccess メソッドも非同期で呼び出されます。

簡略化された HTTP コネクタの例を次に示します。

@EmitsResponse
public class HttpListener extends Source<InputStream, HttpRequestAttributes> {

    @Override
    public void onStart(SourceCallback<InputStream, HttpRequestAttributes> sourceCallback) throws MuleException {
        httpServer = serverProvider.connect();
        httpServer.listen(path).onRequest(request -> {
            SourceCallbackContext ctx = sourceCallback.createContext(); (2)
            ctx.setVariable("responseContext", new HttpResponseContext(request.getResponseSender())); (3)
            Result<InputStream, HttpRequestAttributes> result = requestToResult(request);
            sourceCallback.handle(result, ctx); (4)
        });
    }

    @OnSuccess
    public void onSuccess(@Content String responseBody,
                          @Optional String responseStatusCode,
                          SourceCallbackContext callbackContext) // <5
    throws Exception {
        if (hasResponse(responseStatusCode)) { (6)
            HttpResponseContext responseContext = callbackContext.getVariable("responseContext"); (7)
            responseContext.sendResponse(responseBody, statusCode); (8)
        }
    }
}
1 この例では、この HttpListener が応答を送信する可能性があるため、@EmitsResponse アノテーションをソースに付加します。
2 @OnSuccess メソッドは非同期 (多くの場合は並列) で呼び出されるため、状態をコールバックメソッドに伝える必要があります。より正確に言えば、応答する実際の要求に関連する何らかの参照を提供する必要があります。それを行うために、この例では SourceCallback を使用して新しい SourceCallbackContext を作成しています。
3 この例では、HttpResponseContext を保持するコンテキストに変数を追加します。これは、応答の送信方法を認識しているコネクタで定義されたカスタムオブジェクトです。
4 この例では、Result オブジェクトと新しく作成されたコンテキストを使用して、SourceCallback でメッセージを生成およびプッシュします。
5 コールバックメソッドには、2 つのパラメータ (送信される応答の本文に対応する @Content と応答の HTTP 状況コードに対応する @Optional) があります。SourceCallbackContext 型の 3 番目の引数もあります。これはパラメータではありませんが、handle() メソッドに渡された同じコンテキストを復元するように SDK に指示する手段です。
6 HTTP プロトコルでは応答が送信されない場合もあるため、この例では、応答の状況コードで応答を送信することが要求されているのかどうかをチェックします。
7 応答が必要な場合、この例では最初にコンテキストに設定された応答コンテキストを復元します。
8 応答が送信されます。

エラー出力のリスン

@OnSuccess と同様に、@OnError はフローで未処理のエラーがスローされたときに実行されます。次に例を示します。

@OnError
public void onError(@Optional @DisplayName(ERROR_RESPONSE_SETTINGS) @NullSafe HttpListenerErrorResponseBuilder errorResponseBuilder,
                   SourceCallbackContext callbackContext,
                   Error error) {

}

このケースのメソッドは、成功の例と非常に似ています。また、パラメータとコールバックコンテキストを受け取って、エラー応答をリクエスタに送り返します。さらに、エラーオブジェクトを受け取ることもできます。これは、取得した失敗に関する情報が含まれる Mule エラーオブジェクトです。

エラーオブジェクトは、Java java.lang.Error オブジェクトではなく、SDK org.mule.runtime.api.message.Error です。

コンテンツパラメータ

コールバックは、操作と同様にコンテンツパラメータを受け入れます。HTTP コネクタの場合、HttpListenerResponseBuilder でこれらが使用されます。これにより、DataWeave を使用して、フローの結果をコネクタで返す形式に変換できます。コンテンツやプライマリコンテンツのすべての概念は、関連する制限にも適用されます。

OnTerminate

ソースで @OnSuccess または @OnError メソッドを提供する必要はありません。ソースでは、必要なメソッドを宣言できますが、まったく宣言しないこともできます。ただし、これらの 1 つ以上が定義されている場合、@OnTerminate メソッドも定義する必要があります。

@OnSuccess または @OnError は失敗する可能性があるため、このメソッドが必要になります。これらのいずれかで例外がスローされると、応答を待機してリモートシステムがハングしたり、リソースが漏洩したり、監査ログが不完全なままになったりします。

onTerminate メソッドの例を次に示します。

  public void onTerminate(SourceResult sourceResult) {
    Boolean sendingResponse = (Boolean) sourceResult.getSourceCallbackContext().getVariable(RESPONSE_SEND_ATTEMPT).orElse(false);
    if (FALSE.equals(sendingResponse)) {
      sourceResult
          .getInvocationError()
          .ifPresent(error -> sendErrorResponse(new
                                HttpListenerErrorResponseBuilder(),
                                sourceResult.getSourceCallbackContext(),
                                error,
                                null));
    }
  }

メソッドは、onSuccess または onError メソッドで発生した可能性のある省略可能なエラーと、関連する SourceCallbackContext が含まれる SourceResult オブジェクトを受け取ります。

上記の例では、メソッドはエラーが見つかった場合に汎用エラー応答を送信します。

Was this article helpful?

💙 Thanks for your feedback!

Edit on GitHub