public class TransactionalSource extends Source<Serializable, Void> {

    @Connection
    private ConnectionProvider<MyTransactionalConnection> connectionProvider;
    ....
}
Transactional Message Sources
Just like operations, Message Sources support transactions. Examples of Message Sources include the JMS and VM connector listeners. Both listeners take messages out of a queue and push them to a flow. If the message processing is successful, the transaction is committed. Otherwise, the transaction is rolled back, and the message goes back to the queue.
To be transactional, a Message Source needs to operate over a connection type that implements the TransactionalConnection interface, just like operations. XA transactions are also supported automatically if the connection implements XATransactionalConnection.
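As a rough illustration of what such a connection has to provide, the sketch below models a transactional connection over an in-memory queue. To stay self-contained, it declares a local stand-in interface, TxConnection, instead of importing the SDK's TransactionalConnection; the begin/commit/rollback shape is an assumption of this sketch, and InMemoryQueueConnection with its poll() method is hypothetical.

```java
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Local stand-in for the SDK's TransactionalConnection (assumed shape, not the real type).
interface TxConnection {
    void begin();
    void commit();
    void rollback();
}

// Hypothetical connection that takes messages from an in-memory queue.
class InMemoryQueueConnection implements TxConnection {
    private final Deque<String> queue;
    private final List<String> taken = new ArrayList<>(); // messages consumed in the current transaction
    private boolean active;

    InMemoryQueueConnection(Deque<String> queue) {
        this.queue = queue;
    }

    @Override
    public void begin() {
        taken.clear();
        active = true;
    }

    // Removes the next message; remembers it so a rollback can put it back.
    String poll() {
        String message = queue.poll();
        if (message != null && active) {
            taken.add(message);
        }
        return message;
    }

    @Override
    public void commit() {
        // The consumed messages are gone for good.
        taken.clear();
        active = false;
    }

    @Override
    public void rollback() {
        // Return the consumed messages to the head of the queue in their original order.
        for (int i = taken.size() - 1; i >= 0; i--) {
            queue.addFirst(taken.get(i));
        }
        taken.clear();
        active = false;
    }
}
```

In this sketch, a message polled after begin() returns to the queue on rollback() and is removed permanently on commit(), which mirrors the behavior described for the JMS and VM listeners.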
Message Source Transactional Actions
When a Message Source is transactional, a synthetic parameter named transactionalAction is automatically added to it. That parameter is an enum type that can take these values:
- ALWAYS_BEGIN: Ensures that a new transaction is created for each invocation.
- NONE: Indicates that the source will not start any transaction and will not participate in one that is opened in the flow.
Message Source Transaction Types
When the source supports XA transactions, a transactionType parameter is added. This parameter can take these values:
- LOCAL: Starts a regular transaction.
- XA: Starts an XA transaction instead.
Handling Transactions
In the SDK, transactions are modeled through connections, so each transaction requires its own connection instance. Message Sources need to support parallelism, and a single connection cannot serve two different transactions at the same time.
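The one-connection-per-transaction rule can be pictured with a small concurrency sketch. All names here (SketchSession, SketchSessionProvider, ParallelPollers) are hypothetical and only model the idea; they are not SDK types. Each worker obtains its own session instead of sharing one, so no two in-flight transactions ever touch the same connection.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical connection; the id only makes instances distinguishable.
class SketchSession {
    final int id;

    SketchSession(int id) {
        this.id = id;
    }
}

// Hypothetical provider that hands out a fresh session on every connect().
class SketchSessionProvider {
    private final AtomicInteger nextId = new AtomicInteger();

    SketchSession connect() {
        return new SketchSession(nextId.incrementAndGet());
    }
}

class ParallelPollers {
    // Runs the given number of workers in parallel and returns the ids of the sessions they used.
    static Set<Integer> run(int workers) {
        SketchSessionProvider provider = new SketchSessionProvider();
        Set<Integer> used = ConcurrentHashMap.newKeySet();
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        for (int i = 0; i < workers; i++) {
            pool.submit(() -> {
                // One session per unit of work: it would carry exactly one transaction.
                SketchSession session = provider.connect();
                used.add(session.id);
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for workers", e);
        }
        return used;
    }
}
```

Calling ParallelPollers.run(4) yields four distinct session ids, one per worker.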
This requires a model different from that of the HTTP Listener example used elsewhere in the SDK documentation. Consider this oversimplified example that uses the <vm:listener /> from the VM Connector:
public class VMListener extends Source<Serializable, VMMessageAttributes> {

    @Connection
    private ConnectionProvider<QueueSession> sessionProvider;

    @Override
    public void onStart(SourceCallback<Serializable, VMMessageAttributes> sourceCallback) throws MuleException {
        while (notStopped()) {
            QueueSession session = sessionProvider.connect(); // (1)
            SourceCallbackContext ctx = sourceCallback.createContext(); // (2)
            TransactionHandle status = ctx.bindConnection(session); // (3)
            try {
                sourceCallback.handle(session.poll(), ctx); // (4)
            } catch (Exception e) {
                // Dispatching to the flow failed, so undo the poll.
                status.rollback();
            }
        }
    }

    @OnSuccess
    public void onSuccess(SourceCallbackContext context) {
        handleSuccess(context.getConnection()); // (5)
    }

    @OnError
    public void onError(SourceCallbackContext context, Error error) {
        handleError(context, error);
    }
}
(1) Multiple connections are obtained, one per loop iteration.
(2) For each connection, a new SourceCallbackContext is created.
(3) Each connection is registered with the newly created context through its bindConnection() method.
(4) The context is passed to the handle method.
(5) The connection is later available to the onSuccess, onError, and onTerminate methods through the SourceCallbackContext.getConnection() method.
Because the connection has been bound to the context, the runtime will automatically close the connection when needed. If the connection is transactional and the source is also configured to be transactional, then calling the bindConnection() method will automatically start the transaction on that connection. Also, if the connection is transactional, the runtime will automatically commit the transaction after the onSuccess() method or roll it back after the onError() method.
Note: In this case, the transaction needs to be started before polling any message from the queue. It is good practice to call bindConnection() immediately after the connection has been obtained.
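The ordering described above (bind first, poll second, then commit or roll back depending on the outcome) can be traced with a small model. Everything in this sketch is hypothetical: RecordingConnection, SketchContext, and SketchRuntime are stand-ins that only imitate the described behavior, and closing the connection right after the transaction is resolved is an assumption of the sketch, not a statement about the runtime's actual timing.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical connection that records every call made on it.
class RecordingConnection {
    final List<String> events = new ArrayList<>();

    void begin()    { events.add("begin"); }
    String poll()   { events.add("poll"); return "payload"; }
    void commit()   { events.add("commit"); }
    void rollback() { events.add("rollback"); }
    void close()    { events.add("close"); }
}

// Stand-in for the callback context: binding a connection starts its transaction.
class SketchContext {
    private RecordingConnection connection;

    void bindConnection(RecordingConnection connection) {
        this.connection = connection;
        connection.begin();
    }

    RecordingConnection getConnection() {
        return connection;
    }
}

// Stand-in for the runtime: resolves the transaction once the flow has finished.
class SketchRuntime {
    static void process(SketchContext ctx, String message, Consumer<String> flow) {
        RecordingConnection connection = ctx.getConnection();
        try {
            flow.accept(message);
            connection.commit();   // success path
        } catch (RuntimeException e) {
            connection.rollback(); // error path
        } finally {
            connection.close();    // assumed here: close once the transaction is resolved
        }
    }
}
```

With a flow that completes normally, the recorded sequence is begin, poll, commit, close; with a flow that throws, it is begin, poll, rollback, close. In both cases the poll happens inside the transaction because the connection was bound before it was used.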
Custom Transaction Handling
Some connectors might need to provide custom transaction handling. For example, suppose that in the case of an error, you want to publish an error response. In that case, you need the transaction to be committed, because if it gets rolled back, the response will never reach its destination.
@OnError
public void onError(SourceCallbackContext context, Error error) {
    context.getConnection().publish(buildErrorResponse(error)); // (1)
    context.getTransactionHandle().commit(); // (2)
}
(1) The connection is still open when the onSuccess() and onError() methods are executed.
(2) The context has a TransactionHandle that allows you to manually manipulate the transaction. If you resolve the transaction manually, the runtime will not try to resolve it again later. You do not need to do this for the most common use cases.
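One way to picture why a manual commit is not undone by the runtime is a handle that resolves at most once. The OnceOnlyHandle class below is a hypothetical model of that idea, not the SDK's TransactionHandle: the first commit() or rollback() wins, and any later call is ignored.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical handle: the first resolution wins, later calls do nothing.
class OnceOnlyHandle {
    final List<String> log = new ArrayList<>(); // resolutions that actually took effect
    private boolean resolved;

    void commit() {
        resolve("commit");
    }

    void rollback() {
        resolve("rollback");
    }

    private void resolve(String action) {
        if (resolved) {
            return; // already resolved, for example manually inside onError()
        }
        resolved = true;
        log.add(action);
    }
}
```

In this model, an onError() method that calls commit() leaves the log containing only "commit", even if a rollback() is attempted afterwards on the same handle.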