Flex Gateway新着情報
Governance新着情報
Monitoring API Manager操作と同様に、メッセージソースではトランザクションがサポートされています。メッセージソースの例としては、JMS や VM Connector リスナーがあります。どちらのリスナーも、キューからメッセージを取得してフローにプッシュします。メッセージ処理が成功すると、トランザクションがコミットされます。失敗すると、トランザクションがロールバックされ、メッセージがキューに戻ります。
操作と同様に、メッセージソースはトランザクションになるように TransactionalConnection
インターフェースを実装する接続種別で動作する必要があります。接続で XATransactionalConnection
が実装されている場合、XA トランザクションも自動的にサポートされます。
public class TransactionalSource extends Source<Serializable, Void> {
@Connection
private ConnectionProvider<MyTransactionalConnection> connectionProvider;
....
}
メッセージソースがトランザクションの場合、transactionalAction
という名前の合成パラメーターが自動的に追加されます。そのパラメーターは、次の値を取ることができる enum
型です。
ALWAYS_BEGIN: 呼び出しごとに新しいトランザクションが作成されます。
NONE: ソースがトランザクションを開始せずに、フローで開いているトランザクションに参加しないことを示します。
ソースで XA トランザクションがサポートされている場合、transactionType
パラメーターが追加されます。このパラメーターは、次の値を取ることができます。
LOCAL: 通常のトランザクションを開始します。
XA: 代わりに XA トランザクションを開始します。
SDK では、トランザクションは接続を介してモデル化されます。そのため、トランザクションごとに異なる接続インスタンスが必要になります。これは、メッセージソースでは並列性がサポートされている必要がありますが、同じ接続で異なる 2 つのトランザクションを同時に提供できないためです。
そのため、SDK ドキュメントの他の場所で使用されている HTTP リスナーサンプルとは異なるモデルが必要になります。そこで、VM Connector の <vm:listener />
を使用する次の簡略化された例を考えてみましょう。
public VMListener extends Source<Serializable, VMMessageAttributes> {
@Connection
private ConnectionProvider<QueueSession> sessionProvider;
@Override
public void onStart(SourceCallback<Serializable, VMMessageAttributes> sourceCallback) throws MuleException {
while(notStopped()) {
QueueSession session = sessionProvider.connect(); (1)
CallbackContext ctx = callback.createContext(); (2)
TransactionHandle status = ctx.bindConnection(session); (3)
try {
callback.handle(session.poll(), ctx); (4)
} catch (Exception e) {
status.rollback();
}
}
}
@OnSuccess
public void onSuccess(SourceCallbackContext context) {
handleSuccess(context.getConnection()); (5)
}
@OnError
public void onError(SourceCallbackContext context, Error error) {
handleError(context, error);
}
}
1 | 複数の接続が取得されます。 |
2 | 接続ごとに新しいCallbackContext が作成されます。 |
3 | 作成された SourceCallbackContext の bindConnection() メソッドを介して各接続がコンテキストに登録されます。 |
4 | コンテキストが handle メソッドに渡されます。 |
5 | 接続は後で SourceCallbackContext.getConnection() メソッドを介してすべての onSuccess 、onError 、onTerminate methods で使用できるようになります。 |
接続はコンテキストにバインドされているため、Runtime は必要に応じて自動的に接続を終了します。接続がトランザクションで、ソースもトランザクションになるように設定されている場合、bindConnection()
メソッドをコールすると、その接続で自動的にトランザクションが開始されます。また、接続がトランザクションの場合、Runtime は onSuccess()
メソッドの後に自動的にトランザクションをコミットしたり、onError()
メソッドの後にロールバックしたりします。
このケースでは、トランザクションはキューからメッセージをポーリングする前に開始する必要があります。接続の取得直後に bindConnection() をコールすることをお勧めします。
|
コネクタによっては、カスタムトランザクション処理を提供することが必要になる場合があります。たとえば、エラーの場合にエラー応答をパブリッシュしたいとします。この場合、ロールバックされると応答が宛先に到達しなくなるため、トランザクションをコミットする必要があります。
@OnError
public void onError(SourceCallbackContext context, Error error) {
ctx.getConnection().publish(buildErrorResponse(error)); (1)
ctx.getTransactionHandle().commit(); (2)
}
1 | onSuccess() および onError() メソッドを実行するとき、接続はまだオープン状態です。 |
2 | コンテキストには、手動でトランザクションを操作できる TransactionHandle が設定されています。トランザクションを手動で解決する場合、Runtime でそのことが考慮され、後で自動的に解決されなくなります。最も一般的なユースケースでは、この作業を行う必要はありません。 |