<amqp:listener
config-ref="AMQP_Config"
queueName="${originQueue}"
transactionalAction="ALWAYS_BEGIN"/>
Handle Transactions in AMQP - Mule 4
Transactional connections in AMQP allow you to execute a series of operations that are actually performed only when the transaction is committed. This has two main scenarios:
-
Publishing a message as part of a transaction: When performing a
publish
, the message will be effectively sent to the destination once the transaction is committed. If for some reason the transaction fails and a rollback occurs, then the Message will never be sent. -
Consuming a message as part of a transaction: In case of consuming a AMQP Message, either with the
listener
orconsume
, the message will be acknowledged only after the transacion is committed, and instead it will be returned to the destination for redelivery if the transaction is rolledback.
The AMQP Connector provides support for executing its operations in a transactional way out of the box using Mule’s transactionalAction
configuration.
In order to execute an operation as part of the transaction, we have the following two options.
Process New Messages With a Transactional Session
If you want a AMQP listener
to use a transactional session when dispatching the messages, you should set the transactionalAction
to 'ALWAYS_BEGIN':
With this configuration, each new message will be processed in a transaction that is propagated to all the flow’s components and is committed once the flow execution is completed successfully. The transaction will be rolledback if the flow execution completes with an error.
By default, other components in the flow will not join the transaction created by the listener. In order to execute other operations using the listener’s transaction, you must declare them either with 'ALWAYS_JOIN', or 'JOIN_IF_POSSIBLE'.
Execute Operations As Part Of A Transaction
To execute an operation like publish
or consume
as part of a transaction, set the transactionalAction
to ALWAYS_JOIN
, or JOIN_IF_POSSIBLE
:
These operations can join a transaction initiated by the listener:
<flow name="joiningToListenerTransaction">
<amqp:listener
config-ref="AMQP_Config"
queueName="${originQueue}"
transactionalAction="ALWAYS_BEGIN"/>
<amqp:publish
config-ref="AMQP_Config"
exchangeName="#[attributes.properties.userProperties.redirectDestination]"
transactionalAction="JOIN_IF_POSSIBLE"/>
<amqp:consume
config-ref="AMQP_Config"
queueName="#[attributes.properties.userProperties.callbackDestination]"
transactionalAction="JOIN_IF_POSSIBLE"/>
</flow>
Or they can join a scoped transaction:
<flow name="nonTxPublishMustNotJoinCurrentTx">
<http:listener config-ref="HTTP_Config" path="/orders"/>
<try transactionalAction="ALWAYS_BEGIN">
<amqp:publish
config-ref="config"
exchangeName="${billingService}"
transactionalAction="ALWAYS_JOIN"/>
<amqp:publish
config-ref="config"
exchangeName="${shipmentService}"
transactionalAction="ALWAYS_JOIN"/>
<amqp:publish-consume
config-ref="AMQP_Config"
exchangeName="${invoicesVerificationService}"/>
<validation:is-true expression="#[payload]"/>
</try>
</flow>