AMQP でのトランザクションの処理 - Mule4

AMQP のトランザクション接続により、実際にはトランザクションがコミットされたときにのみ実行される一連の操作を実行できます。主な 2 つのシナリオは次のとおりです。

  • トランザクションの一環でメッセージをパブリッシュする: publish​ を実行するときに、メッセージはトランザクションが​コミット​されたときに効果的に宛先に送信されます。何らかの理由でトランザクションが失敗して​ロールバック​が発生すると、メッセージは送信されません。

  • トランザクションの一環でメッセージをコンシュームする: AMQP メッセージをコンシュームする場合、​listener​ または ​consume​ を使用してメッセージが肯定応答されるのは​トランザクションがコミットされた後のみ​であり、トランザクションが​ロールバック​された場合は代わりに再配信のために宛先に返されます。

AMQP Connector は、Mule の ​transactionalAction​ 設定を使用して、トランザクションとしての操作の実行を標準でサポートしています。

トランザクションの一環で操作を実行するには、次の 2 つのオプションがあります。

新規メッセージのトランザクションセッションでの処理

メッセージをディスパッチするときに AMQP ​listener​ でトランザクションセッションを使用するには、​transactionalAction​ を 'ALWAYS_BEGIN' に設定する必要があります。

<amqp:listener
    config-ref="AMQP_Config"
    queueName="${originQueue}"
    transactionalAction="ALWAYS_BEGIN"/>
xml

この設定により、新しいメッセージはそれぞれフローのすべてのコンポーネントに伝播されるトランザクションで処理され、フロー実行が正常に完了したときにコミットされます。フロー実行が完了したがエラーが発生した場合、トランザクションはロールバックされます。

デフォルトでは、フローの​他のコンポーネント​はリスナーによって作成されたトランザクションに参加しません。リスナーのトランザクションを使用して他の操作を実行するには、'​ALWAYS_JOIN​' または '​JOIN_IF_POSSIBLE​' を使用して宣言する必要があります。

トランザクションの一部としての操作の実行

トランザクションの一環で ​publish​ や ​consume​ などの操作を実行するには、​transactionalAction​ を ​ALWAYS_JOIN​ または ​JOIN_IF_POSSIBLE​ に設定します。

これらの操作は、リスナーが開始するトランザクションの一部となります。

<flow name="joiningToListenerTransaction">
    <amqp:listener
        config-ref="AMQP_Config"
        queueName="${originQueue}"
        transactionalAction="ALWAYS_BEGIN"/>
    <amqp:publish
        config-ref="AMQP_Config"
        exchangeName="#[attributes.properties.userProperties.redirectDestination]"
        transactionalAction="JOIN_IF_POSSIBLE"/>
    <amqp:consume
        config-ref="AMQP_Config"
        queueName="#[attributes.properties.userProperties.callbackDestination]"
        transactionalAction="JOIN_IF_POSSIBLE"/>
</flow>
xml

または、スコープの絞られたトランザクションに参加できます。

<flow name="nonTxPublishMustNotJoinCurrentTx">
    <http:listener config-ref="HTTP_Config" path="/orders"/>
    <try transactionalAction="ALWAYS_BEGIN">
        <amqp:publish
            config-ref="config"
            exchangeName="${billingService}"
            transactionalAction="ALWAYS_JOIN"/>
        <amqp:publish
            config-ref="config"
            exchangeName="${shipmentService}"
            transactionalAction="ALWAYS_JOIN"/>
        <amqp:publish-consume
            config-ref="AMQP_Config"
            exchangeName="${invoicesVerificationService}"/>
        <validation:is-true expression="#[payload]"/>
    </try>
</flow>
xml