バッチ処理

Mule Runtime Engine (Mule) では、メッセージをバッチ処理できます。バッチジョブは、アプリケーション内で開始できます。これは、大きなメッセージを個々のレコードに分割し、各レコードに対してアクションを実行してから、結果を報告し、場合によっては処理された出力を他のシステムまたはキューに転送するコードブロックです。

100 万件のレコードであっても、バッチアプリケーションなら 3 つのステップで処理できるのです。Mule ジョブのフェーズを移動しながら各レコードを処理している間には、いくつかの I/O 操作が発生します。バッチジョブのパフォーマンスは、ディスクの特性とワークロードのサイズに大きく左右されます。特に入力フェーズでは、ディスク上にキューが作成され、処理するレコードが登録されます。このキューはフローによって継続的に読み書きされるため、I/O 負荷が大きくなります。

さらにバッチ処理には、スレッドを並行して処理するのに十分な使用可能メモリが必要です。つまり、レコードを永続的なストレージから RAM に移動します。レコードとその数量が大きいほど、バッチ処理に必要な使用可能メモリが多くなります。

Batch Block Size (バッチブロックサイズ)

デフォルトでは、バッチブロックサイズは 100 に設定されています。さまざまなレコードサイズで代表的なバッチのユースケースを分析した結果、このサイズが最もパフォーマンスとワーキングメモリ要件のバランスが取れていることが判明しています。ただし、各アプリケーションの最適な値は、ユースケースによって異なります。
このプロパティは、バッチジョブごとに設定できます。特定のユースケースでカスタムサイズが最適であると思われる場合は、いろいろなサイズで比較テストを実行して、ユースケースに最適なサイズを見つけてください。次の例にバッチブロックサイズを示します。

<batch:job jobName="test-batch" blockSize="${batch.block.size}">

Max Concurrency (最大同時実行)

Batch スコープ内の処理に使用されるスレッドの最大数は、デフォルトで JVM で検出されるコア数の 2 倍になります。CloudHub や Anypoint Runtime Fabric のような断片的コアデプロイメントの場合、JVM では​非断片的​コア数が検出されます。自動設定された同時実行設定を手動で増減できます。Max Concurrency (最大同時実行) 設定には、それを上回るスレッドを保証するにはレコード数が足りない場合などに、バッチジョブインスタンスに使用するスレッドの上限数を設定できます。

次の例では、自動設定された ​maxConcurrency​ (デフォルトでプロパティから読み込まれた設定になる) を上書きします。

<batch:job jobName="an-example-batch" maxConcurrency="${batch.block.maxConcurrency}">

Batch Aggregator

スレッドが 1 つか複数かに関係なく、100 万回ネットワークを往復すればネットワークに影響を与えます。SaaS システムとの通信ではすぐに API コールのクォータがコンシュームされることがあります。レコードを 1 件ずつ処理するのではなく、Batch Aggregator スコープを使用し、レコードの配列に対して一括操作を実行することでパフォーマンスを改善できます。このスコープではアグリゲータのサイズ設定を使用して各配列のレコード数を制限します。

1 つのレコード配列で処理するレコード数を決定するときには、通信に使用するシステムの操作で許可される最大レコード数を考慮する必要があります。アグリゲータのサイズ設定と​それより小さい値​でパフォーマンステストを実施することが重要です。ロジックは設定値以下のどのレコード数でも処理する必要があります。システムが受信するレコードの数が指定値よりも少ない可能性があるからです。たとえば、120 レコードがあり、アグリゲータのサイズ設定が 50 の場合、最後の送信は 20 レコードになります。

次の例は、Batch Aggregator スコープが固定サイズの ​50​ に設定された Batch Step スコープを示します。

<batch:step name="Batch_Step_PushToXYZ" >
  <batch:aggregator doc:name="Batch Aggregator" size="50">
     <!-- code for an array of 50 (or fewer) records -->
  </batch:aggregator>
</batch:step>

アグリゲータにデータのストリーミング操作が含まれる場合、アグリゲータのサイズ設定 (​streaming="true"​) を使用する代わりにスコープのストリーミングオプション (​size="50"​) をアクティブ化できます。

Batch Job の複数同時インスタンス

Batch Job スコープの多くのユースケースでは一度に 1 つの要求 (バッチジョブインスタンス) のみを処理しますが、同時に複数のバッチジョブインスタンスを実行するケースもあります。そうしたケースのスレッドプール共有をチューニングするために、次のいずれかのスケジュール戦略を設定できます。各戦略はバッチインスタンス ID で別個に追跡されます。

  • ORDERED_SEQUENTIAL​: インスタンスをトリガされた順に処理することで同時インスタンスの影響を制限するデフォルトのスケジュール戦略。この戦略は、同じバッチジョブを順不同で処理するとデータの不整合などの問題が発生したり、リソース制限に達したりする可能性がある場合に便利です。

  • ROUND_ROBIN​: スレッドを「インフライト」のバッチジョブインスタンスと共有するスケジュール戦略です。

関連情報