Contact Us 1-800-596-4880

Obtaining Flow Output and Sending Responses from a Message Source

One of the main properties of a Source indicates whether it generates a response or not.

Examples of sources that do not emit a response:

  • A File listener

  • Salesforce streaming API

  • A Scheduler element

  • A Source that listens for asynchronous push notifications

Sources that do emit a response:

  • A TCP server

  • An HTTP server

  • A JMS server

As you can see, this classification is highly non-deterministic, for example:

  • Although an HTTP server supports sending a response, the user might choose not to do so.

  • Although a JMS listener might respond by sending a message to a reply queue, it might not, depending on the message headers.

Even if a Source can send a response, the Source might not send a response even if the message was processed successfully. There are logic or configuration circumstances that might skip the process of sending the response.

To specify that your source can generate the response, it must be annotated with @EmitsResponse. Notice that this simply tells the runtime and tooling that the source has the capability of doing so.

However, note that the neither the SDK nor the runtime will:

  • Validate that a source that emits a response actually emits a response.

  • Prevent a source from sending a response even when the source does not declare that it emits responses.

Callback Parameters

Each time the flow finishes processing a message generated by the source, the source might need to obtain the response that the flow has generated. You do this through callback methods. Callback methods are used to tap into the responses generated by the flow that owns the source. You can do this by declaring methods annotated with @OnSuccess , @OnError, and @OnTerminate. These methods are declared in a similar fashion as operations, but they need to be void because they listen for the response but cannot alter it.

Each of the parameters in these callback methods is evaluated in the same manner as operations (including auto-transformation). Notice, however, that because these callbacks are executed in response to the runtime processing a message, the callbacks can accept expressions (the default).

Even if the source does not emit responses, it might need callback methods for releasing resources, logging, auditing, and so on.

Listening for Successful Outputs

The source can declare a method with the @OnSuccess annotation so that the method is executed when the flow successfully processes the message (either because no errors were thrown or because they were all handled gracefully).

This method can have arguments that are treated as source parameters. Note that the user of the connector cannot (and does not need to) tell the difference between a regular parameter and a callback one. For that use, it is all the same, and Studio shows which parameters accept expressions and which ones do not. The user should not need to worry about anything else.

Another important thing to keep in mind is that the processing of the message once the source has pushed it to the flow is asynchronous. So the @OnSuccess method will also be invoked asynchronously.

Here is another oversimplified example from the HTTP connector:

@EmitsResponse (1)
public class HttpListener extends Source<InputStream, HttpRequestAttributes> {

    @Override
    public void onStart(SourceCallback<InputStream, HttpRequestAttributes> sourceCallback) throws MuleException {
        httpServer = serverProvider.connect();
        httpServer.listen(path).onRequest(request -> {
            SourceCallbackContext ctx = sourceCallback.createContext(); (2)
            ctx.setVariable("responseContext", new HttpResponseContext(request.getResponseSender())); (3)
            Result<InputStream, HttpRequestAttributes> result = requestToResult(request);
            sourceCallback.handle(result, ctx); (4)
        });
    }

    @OnSuccess
    public void onSuccess(@Content String responseBody,
                          @Optional String responseStatusCode,
                          SourceCallbackContext callbackContext) (5)
    throws Exception {
        if (hasResponse(responseStatusCode)) { (6)
            HttpResponseContext responseContext = callbackContext.getVariable("responseContext"); (7)
            responseContext.sendResponse(responseBody, statusCode); (8)
        }
    }
}
1 The example annotates the source with @EmitsResponse because this HttpListener might send a response.
2 Because the @OnSuccess method is invoked asynchronously (and most likely in parallel), it is necessary to communicate state to the callback method. More precisely, it is necessary to provide some kind of reference regarding the actual request to reply into. To accomplish that, the example creates a new SourceCallbackContext using the SourceCallback.
3 The example adds a variable into the context that holds an HttpResponseContext. This is a custom object defined in the connector that knows how to send responses.
4 The example generates and pushes the message through the SourceCallback, using both the Result object and the newly create context.
5 The callback method has two parameters, a @Content one for the body of the response to be sent and the HTTP status code of the response. There is a third argument of type SourceCallbackContext. This is not a parameter but a way to tell the SDK that to recover the same context that was passed to the handle() method.
6 Because the HTTP protocol does not always send a response, the example checks to see if the response status code requires the response to be sent or not.
7 If a response is needed, the example recovers the response context that was originally set on the context.
8 The response is sent.

Listening for Error Outputs

Similar to @OnSuccess, the @OnError is executed when an unhandled error is thrown by the flow, for example:

@OnError
public void onError(@Optional @DisplayName(ERROR_RESPONSE_SETTINGS) @NullSafe HttpListenerErrorResponseBuilder errorResponseBuilder,
                   SourceCallbackContext callbackContext,
                   Error error) {

}

In this case, the method is very similar to the success example. It also receives parameters and the callback context, and it sends an error response back to the requester. It can also receive an Error object. This is the Mule Error object that contains information about the obtained failure.

The Error object is not a Java java.lang.Error object but an SDK org.mule.runtime.api.message.Error.

Content Parameters

Callbacks accept content parameters just like any operation. In the case of the HTTP connector, the HttpListenerResponseBuilder uses them so that DataWeave can be used to transform the flow’s result into whatever format the connector intends to return. All the concepts of content and primary contents apply, as well as their associated restrictions.

On Terminate

A source does not need to provide @OnSuccess or @OnError methods. A source can declare the ones it needs or none at all. However, if at least one of them is defined, then you also need to define an @OnTerminate method.

This is required because the @OnSuccess or @OnError might fail. If either of them throws an exception, a remote system might hang as it waits for a response, resources might leak, audit logs might be left uncompleted, and so on.

Here is an example of an onTerminate method:

  public void onTerminate(SourceResult sourceResult) {
    Boolean sendingResponse = (Boolean) sourceResult.getSourceCallbackContext().getVariable(RESPONSE_SEND_ATTEMPT).orElse(false);
    if (FALSE.equals(sendingResponse)) {
      sourceResult
          .getInvocationError()
          .ifPresent(error -> sendErrorResponse(new
                                HttpListenerErrorResponseBuilder(),
                                sourceResult.getSourceCallbackContext(),
                                error,
                                null));
    }
  }

The method receives a SourceResult object that contains optional errors that might have occurred in either the onSuccess or onError methods and the associated SourceCallbackContext.

In the example above, the method sends a generic error response in case an error was found.