Nav

Handling Transactions in AMQP

Transactional connections in AMQP allow you to execute a series of operations that are actually performed only when the transaction is committed. This has two main scenarios:

  • Publishing a Message as part of a transaction: When performing a publish, the Message will be effectively sent to the destination once the transaction is committed. If for some reason the transaction fails and a rollback occurs, then the Message will never be sent.

  • Consuming a Message as part of a transaction: In case of consuming a AMQP Message, either with the listener or consume, the Message will be acknowledged only after the transacion is committed, and instead it will be returned to the destination for redelivery if the transaction is rolledback.

The AMQP Connector provides support for executing its operations in a transactional way out of the box using Mule’s transactionalAction configuration.

In order to execute an operation as part of the transaction, we have the following two options.

Processing New Messages With Transactional Session

If you want a AMQP Listener to use a transactional session when dispatching the messages, you should set the transactionalAction to 'ALWAYS_BEGIN':


         
      
1
<amqp:listener config-ref="AMQP_Config" queueName="${originQueue}" transactionalAction="ALWAYS_BEGIN"/>

With this configuration, each new message will be processed in a transaction that is propagated to all the flow’s components and is committed once the flow execution is completed successfully. The transaction will be rolledback if the flow execution completes with an error.

By default, other components in the flow will not join the transaction created by the listener. In order to execute other operations using the listener’s transaction, you must declare them either with 'ALWAYS_JOIN', or 'JOIN_IF_POSSIBLE'.

Execute Operations As Part Of A Transaction

To execute an operation like publish or consume as part of a transaction, we have to set the transactionalAction to 'ALWAYS_JOIN', or 'JOIN_IF_POSSIBLE':

This operations can join a transaction initiated by the listener:


         
      
1
2
3
4
5
6
<flow name="joiningToListenerTransaction">
    <amqp:listener config-ref="AMQP_Config" queueName="${originQueue}" transactionalAction="ALWAYS_BEGIN"/>
    <amqp:publish config-ref="AMQP_Config" exchangeName="#[attributes.properties.userProperties.redirectDestination]" transactionalAction="JOIN_IF_POSSIBLE"/>
    <amqp:consume config-ref="AMQP_Config" queueName="#[attributes.properties.userProperties.callbackDestination]" transactionalAction="JOIN_IF_POSSIBLE"/>
</flow>

Or they can join a scoped transaction:


         
      
1
2
3
4
5
6
7
8
9
<flow name="nonTxPublishMustNotJoinCurrentTx">
    <http:listener config-ref="HTTP_Config" path="/orders"/>
    <try transactionalAction="ALWAYS_BEGIN">
        <amqp:publish config-ref="config" exchangeName="${billingService}" transactionalAction="ALWAYS_JOIN"/>
        <amqp:publish config-ref="config" exchangeName="${shipmentService}" transactionalAction="ALWAYS_JOIN"/>
        <amqp:publish-consume config-ref="AMQP_Config" exchangeName="${invoicesVerificationService}"/>
        <validation:is-true expression="#[payload]"/>
    </try>
</flow>

We use cookies to make interactions with our websites and services easy and meaningful, to better understand how they are used and to tailor advertising. You can read more and make your cookie choices here. By continuing to use this site you are giving us your consent to do this.

+