配信元の定義

再起動性の保証

Anypoint Runtime Manager などのツールでは、個別の配信元やフローを、アプリケーションの他の部分から独立して開始および終了できます。

つまり、すべての配信元は以下の条件を満たす必要があります。

  • 機能が影響を受けることなく、いつでも開始および終了できる

  • 停止時は接続、スレッド、ファイルなど、すべてのリソースを開放する

手動ポーリングの禁止

ポーリングによって機能する配信元は、ポーリングロジックを手動で実装しては​なりません​。その代わりに PollingSource クラスを拡張する​必要があります​。

応答が生成された場合の宣言

一部の配信元は、フローに対してメッセージをトリガーするだけです。一方で、外部システムに応答を送信するソースもあります (例: HTTP Connector)。

応答を発信する配信元には、​@EmitsResponse​ アノテーションを付ける​必要があります​。

非同期コールバックの処理

配信元は、非同期タスクが実行されるコールバック (@OnSuccess や @OnError のアノテーションが付けられたメソッド) を定義します。これらの非同期タスクは、非ブロック I/O を使用した応答の送信から、通知の送信やタスクの監査までさまざまです。

非同期コールバックでは、配信元は SourceCompletionCallback を使用してタスクの完了を通知する​必要があります​。​「メッセージ配信元からの非同期応答の送信」​を参照してください。

トランザクションを使用する

配信元がトランザクションシステムからメッセージを取得する場合は、​トランザクション​を使用する​必要があります​。

バックプレッシャーの処理

Mule ​実行エンジン​は、バッファが満杯になると、メッセージ配信元に対してバックプレッシャーを適用する場合があります。デフォルトでは、バックプレッシャーが発生すると、SDK はメソッドをブロックし、Mule が要求を処理するためのリソースを用意するまで配信元を強制的に待機させます。

org.mule.runtime.extension.api.runtime.source.SourceCallback#handle(org.mule.runtime.extension.api.runtime.operation.Result<T,A>)

このデフォルト動作はほとんどのケースに適しますが、モジュールが異なるアクションを必要とする場合もあります。次に例を示します。

  • HTTP Connector は、待機するのではなく 503 応答を送信します。

  • JMS Connector の場合は、メッセージングドメインの有効なオプションとして待機、失敗、メッセージの削除があるため、これらのどれを実行す​べきか​を配信元で設定できます。

モジュールのドメインに、待機をデフォルトとす​べきではない​、あるいは追加のオプションを用意す​べきである​ユースケースがある場合には、配信元で @BackPressure アノテーションを使用して、必要な動作を提供する​必要があります​。

バックプレッシャーが発生した場合のカスタムアクション

一部のモジュールでは、バックプレッシャーエラーが発生した場合に、次のようなカスタムアクションを実行する必要があります。

  • トランザクションの解決

  • リソースの開放

  • 外部システムへの通知の送信

これらのモジュールでは、@OnBackPressure アノテーションを付けたコールバックメソッドを含めることで、必要な処理を実行する​必要があります​。次に例を示します。

@OnBackPressure
public void onBackPressure(BackPressureContext ctx) {
  // .. implementation
}

実行するアクションが非同期の場合は、SourceCompletionCallback も使用する​必要があります​ので注意してください。

@OnBackPressure
public void onBackPressure(BackPressureContext ctx, SourceCompletionCallback completionCallback) {
  // .. implementation
}

トランザクションの解決

配信元がトランザクションの場合は、トランザクションを解決するバックプレッシャーハンドラーを追加する​必要があります​。

クラスター動作の想定

すべての配信元は、Mule がクラスターモードで動作し、同じアプリケーションが CloudHub の複数のワーカーにデプロイされた場合の動作を想定する​必要があります​。

一部のケースでは、すべてのレプリカで配信元を実行しても問題ありません。このケースの例としては <http:listener> があります。基礎となるプロトコルによって、2 つのノードが同じファイルを同時に選択しないことが保証されているためです。これはデフォルトの動作です。

一方で、配信元をすべてのレプリカで実行すると動作不良やデータ破壊を招くため、プライマリノードでのみ実行す​べき​ケースもあります。このケースの例としては、トピックをリスンする ​<jms:listener>​ や、イベントストリームを読み取る配信元があります。これらの場合には、配信元に @ClusterSupport アノテーションを付けて、​NOT_SUPPORTED​ または ​DEFAULT_PRIMARY_NODE_ONLY​ オプションを指定する​必要があります​。

表示名

配信元では、@DisplayName を使用して、「On …​」プレフィックスによって、配信元、リスナー、トリガーを実行またはトリガーするイベントやアクションを指定す​べき​です。例を示します。

  • JMS Connector

    配信元は On New Message で呼び出され、新しいメッセージがキューに到着するとメッセージを配信元がディスパッチすることを通知します。

  • データベースコネクタ

    On Table Row により、各テーブル行でトリガーされることを通知します。

  • Salesforce Connector

    On Created Object により、コネクタが Salesforce で新しいオブジェクトを検出するたびにフローをトリガーすることを通知します。

DSL の命名

DSL レベルでは、配信元は代わりに ​listener​ サフィックスを使用するか、または単純にリスナー名を使用す​べき​です。

次に例を示します。

  • HTTP Connector は、インバウンド HTTP エンドポイントを公開するために <http:listener> メッセージ配信元を定義しています。

  • WebSockets Connector は、<ws:inbound-listener> 配信元と <ws:outbound-listener> 配信元を定義しています。

これらは @Alias アノテーションを使用して制御できます。