スレッドと非同期処理

ほとんどのモジュールではスレッドセーフなデザインを心掛けるという点を除いては、同時実行の問題を抱える​べきではありません​。

ただし、モジュールがタスクを非同期に実行したり、繰り返し発生するタスクをスケジュールしたりする必要のあるユースケースもあります。このセクションでは、そのためのベストプラクティスについて詳しく説明します。

このセクションの説明では、「Scheduler」は org.mule.runtime.api.scheduler.Scheduler クラスのインスタンスを指します。

非同期処理と非ブロック I/O を混同しないでください。これらは同じではありません。

モジュールで Threads や Executors を作成しない

モジュールが自身の Threads、Executors、または ExecutorServices を作成しては​なりません​。すべての非同期タスクは、Mule の SchedulerService から Scheduler を取得して実行する​必要があります​。

SchedulerService API については、その Javadocs で詳細に説明されています。記載されているコントラクトに留意して、正しく使用してください。

連動関係によって作成されるスレッド

モジュールが外部配信元と連動しなければ自身のスレッドを作成できない場合があります。次に例を示します。

  • ブローカーライブラリと連動して自身のコンシューマー Threads を作成する JMS Connector

  • 自身の I/O セレクター Threads を生成するクライアントライブラリ

  • 自身の Threads を生成する JDBC ドライバー

これらのケースには回避策がないため、ルールの例外だと言えます。ただし、次の条件を満足する​必要があります​。

  • 他に実行可能な代替手段がなく、Threads の制御下に留まることだけを目的として自身の JMS ブローカーを作成することがないこと

  • これらのライブラリによって作成された Threads には意味のある名前を付ける​必要がある

一部のライブラリでは、Executors のインスタンスを渡して、Threads が提供されていない場合はフォールバックして自身の Threads を作成することができます。このような場合には、使用するメソッドで Executors を渡す​べきです​。詳細は次のカスタム Scheduler のセクションを参照してください。

配信元ではポーリングに Scheduler を使用しない

配信元では、ポーリングの目的で Scheduler を作成しては​なりません​。その代わりに PollingSource クラスを拡張してください。

Scheduler ライフサイクルの管理

Mule のほとんどのコンポーネントと同じように、Scheduler にはライフサイクルがあり、慎重に管理する​必要があります​。

  • 正しい Scheduler 種別の選択

    Scheduler サービスでは、3 種類の Scheduler を提供しています。

    • CPU Light (CPU 負荷小)

      これらの Scheduler は、CPU をそれほど消費せずに短時間で完了する非同期タスクに使用す​べきです​。非ブロック I/O を実行するタスクにも使用できます。たとえば、単純な検証、簡単な計算、I/O を実行せずに 10 ms 前後で完了するあらゆるロジックなどです。

    • CPU Intensive (CPU 負荷大)

      大量の CPU リソースを使用するタスク向けです。I/O 操作や長時間のブロック (I/O スレッドが I/O の実行中に保持したロックを待機する場合など) は、ここで実行す​べきです​。たとえば、暗号化、高負荷の返還、リソースを大量に消費する検証、I/O を実行せずに大量の CPU リソースを 20 ms 以上に渡って使用するあらゆるロジックなどです。

    • IO Scheduler (I/O スケジューラー)

      FTP サイトへの書き込みやリレーショナルデータベースへのアクセスといったブロック I/O 操作向けです。

      これらの Scheduler に送信するタスクは、各 Scheduler が処理対象とする種別の作業に準拠している​必要があります​。これらの Scheduler は、Mule 4 ​実行エンジン​が使用するメインスレッドプールのスライスです 。 各 Scheduler のワークプロファイルに準拠しないと、Mule から最適なパフォーマンスを引き出すことはできません。

  • カスタム Scheduler

    3 つの Scheduler 種別のいずれにも完全には適合しない Scheduler があります。カスタム Scheduler は以下のケースで使用す​べきです​。

    • 一定したレートで繰り返し実行されるタスク (PollingSource の一部とす​べきではない​)。例 (一部):

      • キャッシュ期限タスク

      • リソースリリースタスク

      • I/O セレクター

      • キューコンシューマー

      • 自身のスレッドプールを必要とするタスク

      • 上記の条件のいずれとも完全には一致しない混成プロファイルを持つタスク

  • Scheduler には意味のある名前を付ける

    SchedulerService で取得するすべての Scheduler には、以下を反映した意味のある名前を付ける​必要があります​。

    • Scheduler を作成したモジュールの名前

    • Scheduler を作成したコンポーネントの設定オブジェクトの名前またはフロー内での位置

    • Scheduler の目的

  • 所有コンポーネントの定義

    自身の Scheduler を管理する上で重要な側面は、どのコンポーネントが Scheduler を所有す​べきか​を定義することです。

    • Scheduler を配信元で使用する場合は、その配信元の一部として Scheduler を定義する​必要があります​。

    • Scheduler を接続リリース関連のリソースで使用する場合は、ConnectionProvider が Scheduler を所有す​べきです​。

    • Scheduler がどのコンポーネントにも特有ではないタスクを実行したり、多くのコンポーネントが異なるコンポーネントからタスクを受け取ったりする場合には、設定オブジェクトが Scheduler を所有す​べきです​。

      所有コンポーネントは、ライフサイクルを意識したものである​必要があります​。つまり、次のいずれかを行う​必要があります​。

    • Initialisable および Disposable インターフェースを実装する

    • Startable および Stoppable インターフェースを実装する

  • Scheduler を操作レベルでは作成しない

    Scheduler を操作クラス内で作成しては​なりません​。操作で Scheduler を使用する必要がある場合は、Scheduler を設定要素レベルで作成して、すべての操作で共有する​必要があります​。

  • Scheduler はできる限り共有する

    モジュールは、Scheduler を必要なだけ作成すべきですが、できるだけ少なく抑える​べきです​。別々のコンポーネントがそれぞれ Scheduler を所有したほうが良いユースケースもありますが、できる限りコンポーネント間で Scheduler を共有して再利用する​必要があります​。

  • すべての Scheduler が停止することを保証する

    作成されたすべての Scheduler が最後に stop() メソッドを呼び出すことを保証する​必要があります​。このメソッドの呼び出しは、Scheduler を所有するコンポーネントのライフサイクルと連携させる​べきです​。たとえば、Scheduler が設定要素によって所有される場合、そのコンポーネントは Stoppable インターフェースを実装して、自身の stop() コールの一環としてこのインターフェースを呼び出す​べきです​。配信元は、onStop() メソッドでこの処理を行う​べきです​ (以下同じように続きます)。

Scheduler のライフサイクルの例

次の配信元が所有する Scheduler の例は、すべてのライフサイクルの概念を反映しています。

public class ExampleSourceScheduler extends Source<Serializable, VMMessageAttributes> {

 @Inject
 private SchedulerService schedulerService;

 @Inject
 private SchedulerConfig schedulerConfig;

 private ComponentLocation location;
 private Scheduler scheduler;

 @Override
 public void onStart(SourceCallback<Serializable, VMMessageAttributes> sourceCallback) throws MuleException {
   scheduler = schedulerService.customScheduler(schedulerConfig
       .withMaxConcurrentTasks(numberOfConsumers)
       .withName("vm-listener-flow " + location.getRootContainerName())
       .withWaitAllowed(true)
       .withShutdownTimeout(5, SECONDS));
 }

 @Override
 public void onStop() {
   if (scheduler != null) {
     scheduler.stop();
   }
 }
}