Contact Free trial Login

Define Sources

Assure Restartability

Tools like Anypoint Runtime Manager allow you to start and stop individual sources or flows, independently from the rest of the application.

This means that all sources must:

  • Be able to be stopped and restarted at any time, without their functionality being affected

  • Release all resources when stopped including connections, threads, files, and so on.

No Manual Polling

Sources that work by doing polling must not implement the polling logic manually. They must extend the PollingSource class instead.

Declare When a Response Is Emitted

There are sources that only trigger messages into a flow. Others, also send a response to an external system, like the HTTP connector.

Sources that emit a response must be annotated with @EmitsResponse.

Handle Asynchronous Callbacks

Sources define callbacks (methods annotated with @OnSuccess and/or @OnError) in which asynchronous tasks are performed. These async tasks can range from sending a response using non-blocking I/O to sending notifications or auditing tasks.

For asynchronous callbacks, the source must use SourceCompletionCallback to signal the completion of such tasks. See Sending Asynchronous Responses from a Message Source.

Use Transactions

When the source obtains its messages from a transactional system, it must do so using transactions.

Handle Back Pressure

The Mule execution engine may apply back pressure to a message source when at capacity. By default, the SDK forces the source to wait when back pressure occurs by making this method block and wait for Mule to have resources available to handle the request:

org.mule.runtime.extension.api.runtime.source.SourceCallback#handle(org.mule.runtime.extension.api.runtime.operation.Result<T,A>)

Although this default behavior suits most cases, some modules require different actions. For example:

  • Instead of waiting, the HTTP connector sends a 503 response

  • The JMS connector makes it configurable if the source should wait, fail, or drop the message, as those are valid options in the messaging domain

When the moduleโ€™s domain includes use cases in which waiting should not be the default, or additional options should be available, then the source must use the @BackPressure annotation to provide that behavior.

Custom Actions When Back Pressure Occurs

Some modules may need to take custom actions when back pressure errors occur, such as:.

  • Resolving transactions

  • Releasing resources

  • Sending notifications to external systems

Modules with this functional need must do so by including a callback method annotated with @OnBackPressure. For example:

@OnBackPressure
public void onBackPressure(BackPressureContext ctx) {
  // .. implementation
}

Notice that if the action to be taken is asynchronous, a SourceCompletionCallback must be leveraged too:

@OnBackPressure
public void onBackPressure(BackPressureContext ctx, SourceCompletionCallback completionCallback) {
  // .. implementation
}

Resolving Transactions

If the source is transactional, then it must add a back pressure handler in which the transaction is resolved.

Mind the Cluster Behavior

All sources must consider how they are going to behave when Mule operates in cluster mode or the same app is deployed to several workers in CloudHub.

In some cases, itโ€™s okay for the source to run on all replicas. An example of this is the <http:listener>. The underlying protocol already guarantees that no two nodes will pick up the same file at the same time. This is the default behavior.

However, there are some cases in which the source should run on the primary node only, as running on all replicas could lead to bogus behavior or data corruption. An example of this is a <jms:listener> that is listening to a topic, or a source reading from a stream of events. In these cases, a source must be annotated with the @ClusterSupport annotation, and specifying the NOT_SUPPORTED or DEFAULT_PRIMARY_NODE_ONLY options.

Display Name

Sources should have a @DisplayName that uses the โ€œOn ..โ€ prefix to communicate in which event or action the source/listener/trigger will be executed/triggered, for example:

  • JMS Connector

    The source is called On New Message communicating that the source will dispatch a message when a new message arrives to a Queue.

  • Database Connector

    On Table Row communicates that triggers for each table row.

  • Salesforce Connector

    On Created Object, every time the connector detects a new object in Salesforce, the connector triggers the flow.

DSL Naming

At the DSL level, the sources should instead use the listener suffix, or simply use the listener name.

For example:

  • The HTTP connector defines the <http:listener> message source for exposing inbound HTTP endpoints

  • The WebSockets connector defines the <ws:inbound-listener> and <ws:outbound-listener> sources

These can be controlled using the @Alias annotation.

Was this article helpful?

๐Ÿ’™ Thanks for your feedback!

Edit on GitHub