@OnBackPressure
public void onBackPressure(BackPressureContext ctx) {
// .. implementation
}
Define Sources
Assure Restartability
Tools like Anypoint Runtime Manager allow you to start and stop individual sources or flows, independently from the rest of the application.
This means that all sources must:
-
Be able to be stopped and restarted at any time, without their functionality being affected
-
Release all resources when stopped including connections, threads, files, and so on.
No Manual Polling
Sources that work by doing polling must not implement the polling logic manually. They must extend the PollingSource class instead.
Declare When a Response Is Emitted
There are sources that only trigger messages into a flow. Others, also send a response to an external system, like the HTTP connector.
Sources that emit a response must be annotated with @EmitsResponse.
Handle Asynchronous Callbacks
Sources define callbacks (methods annotated with @OnSuccess and/or @OnError) in which asynchronous tasks are performed. These async tasks can range from sending a response using non-blocking I/O to sending notifications or auditing tasks.
For asynchronous callbacks, the source must use SourceCompletionCallback to signal the completion of such tasks. See Sending Asynchronous Responses from a Message Source.
Use Transactions
When the source obtains its messages from a transactional system, it must do so using transactions.
Handle Back Pressure
The Mule execution engine may apply back pressure to a message source when at capacity. By default, the SDK forces the source to wait when back pressure occurs by making this method block and wait for Mule to have resources available to handle the request:
org.mule.runtime.extension.api.runtime.source.SourceCallback#handle(org.mule.runtime.extension.api.runtime.operation.Result<T,A>)
Although this default behavior suits most cases, some modules require different actions. For example:
-
Instead of waiting, the HTTP connector sends a 503 response
-
The JMS connector makes it configurable if the source should wait, fail, or drop the message, as those are valid options in the messaging domain
When the module’s domain includes use cases in which waiting should not be the default, or additional options should be available, then the source must use the @BackPressure annotation to provide that behavior.
Custom Actions When Back Pressure Occurs
Some modules may need to take custom actions when back pressure errors occur, such as:.
-
Resolving transactions
-
Releasing resources
-
Sending notifications to external systems
Modules with this functional need must do so by including a callback method annotated with @OnBackPressure. For example:
Notice that if the action to be taken is asynchronous, a SourceCompletionCallback must be leveraged too:
@OnBackPressure
public void onBackPressure(BackPressureContext ctx, SourceCompletionCallback completionCallback) {
// .. implementation
}
Mind the Cluster Behavior
All sources must consider how they are going to behave when Mule operates in cluster mode or the same app is deployed to several workers in CloudHub.
In some cases, it’s okay for the source to run on all replicas. An example of this is the <http:listener>. The underlying protocol already guarantees that no two nodes will pick up the same file at the same time. This is the default behavior.
However, there are some cases in which the source should run on the primary node only, as running on all replicas could lead to bogus behavior or data corruption. An example of this is a <jms:listener>
that is listening to a topic, or a source reading from a stream of events. In these cases, a source must be annotated with the @ClusterSupport annotation, and specifying the NOT_SUPPORTED or DEFAULT_PRIMARY_NODE_ONLY options.
Display Name
Sources should have a @DisplayName that uses the “On ..” prefix to communicate in which event or action the source/listener/trigger will be executed/triggered, for example:
-
JMS Connector
The source is called On New Message communicating that the source will dispatch a message when a new message arrives to a Queue.
-
Database Connector
On Table Row communicates that triggers for each table row.
-
Salesforce Connector
On Created Object, every time the connector detects a new object in Salesforce, the connector triggers the flow.
DSL Naming
At the DSL level, the sources should instead use the listener
suffix, or simply use the listener name.
For example:
-
The HTTP connector defines the <http:listener> message source for exposing inbound HTTP endpoints
-
The WebSockets connector defines the <ws:inbound-listener> and <ws:outbound-listener> sources
These can be controlled using the @Alias annotation.