
Handle Transactions in AMQP - Mule 4

Transactional connections in AMQP let you execute a series of operations that take effect only when the transaction is committed. There are two main scenarios:

  • Publishing a message as part of a transaction: The message is sent to the destination only once the transaction is committed. If the transaction fails and a rollback occurs, the message is never sent.

  • Consuming a message as part of a transaction: When an AMQP message is consumed, either with the listener or with the consume operation, it is acknowledged only after the transaction is committed. If the transaction is rolled back, the message is instead returned to the destination for redelivery.

The AMQP Connector provides support for executing its operations in a transactional way out of the box using Mule’s transactionalAction configuration.

To execute an operation as part of a transaction, you have the following two options.

Process New Messages With a Transactional Session

If you want an AMQP listener to use a transactional session when dispatching messages, set its transactionalAction to ALWAYS_BEGIN:

<amqp:listener
    config-ref="AMQP_Config"
    queueName="${originQueue}"
    transactionalAction="ALWAYS_BEGIN"/>

With this configuration, each new message is processed in a transaction that is propagated to all the flow’s components and is committed once the flow execution completes successfully. The transaction is rolled back if the flow execution completes with an error.
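The rollback case can be sketched as follows. This is a minimal illustration, not a prescribed pattern: the flow name and logger messages are hypothetical, and it assumes Mule's standard on-error-propagate handler is used so that the flow execution ends with an error.

```xml
<!-- Sketch only: the flow name and logger messages are hypothetical. -->
<flow name="rollbackOnErrorSketch">
    <!-- The listener begins a transaction for each incoming message. -->
    <amqp:listener
        config-ref="AMQP_Config"
        queueName="${originQueue}"
        transactionalAction="ALWAYS_BEGIN"/>
    <logger level="INFO" message="Processing message inside a transaction"/>
    <error-handler>
        <!-- on-error-propagate rethrows the error, so the flow completes
             with an error and the listener's transaction is rolled back. -->
        <on-error-propagate>
            <logger level="ERROR" message="Processing failed; the message is returned for redelivery"/>
        </on-error-propagate>
    </error-handler>
</flow>
```

By contrast, Mule's on-error-continue handler lets the flow finish successfully, so in that case the transaction would be committed and the message acknowledged.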

By default, other components in the flow will not join the transaction created by the listener. To execute other operations using the listener’s transaction, you must set their transactionalAction to either ALWAYS_JOIN or JOIN_IF_POSSIBLE.

Execute Operations As Part Of A Transaction

To execute an operation such as publish or consume as part of a transaction, set its transactionalAction to ALWAYS_JOIN or JOIN_IF_POSSIBLE.

These operations can join a transaction initiated by the listener:

<flow name="joiningToListenerTransaction">
    <amqp:listener
        config-ref="AMQP_Config"
        queueName="${originQueue}"
        transactionalAction="ALWAYS_BEGIN"/>
    <amqp:publish
        config-ref="AMQP_Config"
        exchangeName="#[attributes.properties.userProperties.redirectDestination]"
        transactionalAction="JOIN_IF_POSSIBLE"/>
    <amqp:consume
        config-ref="AMQP_Config"
        queueName="#[attributes.properties.userProperties.callbackDestination]"
        transactionalAction="JOIN_IF_POSSIBLE"/>
</flow>

Or they can join a scoped transaction:

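<flow name="nonTxPublishMustNotJoinCurrentTx">
    <http:listener config-ref="HTTP_Config" path="/orders"/>
    <!-- The try scope begins the transaction, since the HTTP listener does not. -->
    <try transactionalAction="ALWAYS_BEGIN">
        <!-- Both publish operations join the scope's transaction. -->
        <amqp:publish
            config-ref="AMQP_Config"
            exchangeName="${billingService}"
            transactionalAction="ALWAYS_JOIN"/>
        <amqp:publish
            config-ref="AMQP_Config"
            exchangeName="${shipmentService}"
            transactionalAction="ALWAYS_JOIN"/>
        <!-- No transactionalAction is set here, so this operation
             does not join the transaction. -->
        <amqp:publish-consume
            config-ref="AMQP_Config"
            exchangeName="${invoicesVerificationService}"/>
        <!-- If the validation fails, the scope ends with an error and the
             transaction is rolled back, so neither publish above is sent. -->
        <validation:is-true expression="#[payload]"/>
    </try>
</flow>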
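The two join actions differ when no transaction is active. The sketch below illustrates this under Mule's general semantics for these values: JOIN_IF_POSSIBLE joins a transaction if one exists and otherwise runs non-transactionally, while ALWAYS_JOIN expects a transaction to be in progress and raises an error if there is none. The flow names, paths, and the ${notificationsExchange} placeholder are hypothetical.

```xml
<!-- Sketch only: flow names, paths, and ${notificationsExchange} are hypothetical. -->
<flow name="joinIfPossibleWithoutTransaction">
    <!-- The HTTP listener does not begin a transaction. -->
    <http:listener config-ref="HTTP_Config" path="/notify"/>
    <!-- No transaction is active, so this publish runs non-transactionally. -->
    <amqp:publish
        config-ref="AMQP_Config"
        exchangeName="${notificationsExchange}"
        transactionalAction="JOIN_IF_POSSIBLE"/>
</flow>

<flow name="alwaysJoinWithoutTransaction">
    <http:listener config-ref="HTTP_Config" path="/notify-strict"/>
    <!-- ALWAYS_JOIN expects an active transaction; with none in progress,
         this operation is expected to fail instead of publishing. -->
    <amqp:publish
        config-ref="AMQP_Config"
        exchangeName="${notificationsExchange}"
        transactionalAction="ALWAYS_JOIN"/>
</flow>
```

JOIN_IF_POSSIBLE suits operations that are reused in both transactional and non-transactional flows, while ALWAYS_JOIN makes a missing transaction visible as an error rather than silently publishing outside one.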