Batch コンポーネントの設定

Mule バッチ処理コンポーネントをセットアップおよび設定して、一般的なバッチ処理のユースケースを実行したり、デフォルトを調整する必要がある場合にバッチジョブのパフォーマンスを改善したりできます。

Batch Step コンポーネント内で処理するレコードの絞り込み

1 つ以上のレコード検索条件を属性として任意の数の Batch Step コンポーネントに適用できます。たとえば、特定の Batch Job コンポーネント内の最初の Batch Step コンポーネントで各レコードの Salesforce 取引先責任者をチェックして、2 番目の Batch Step コンポーネントでそれらのレコードの取引先責任者情報を更新します。2 番目の Batch Step で最初のステップで成功したレコードのみを処理するようにするには、2 番目の Batch Step で検索条件を設定します。検索条件によって処理が合理化されるため、Mule は特定のステップの関連するデータにのみ集中できます。

レコードを絞り込むために、Batch Step コンポーネントでは、1 つの ​acceptExpression​ と 1 つの ​acceptPolicy​ がサポートされています。どちらも省略可能で、他の Mule コンポーネントではこれらの検索条件を使用できません。同じコンポーネントで両方の検索条件を使用する場合、Mule は検索条件を次の順序で評価します。

  1. 受け入れポリシー: acceptExpression

  2. 受け入れ式: acceptPolicy

acceptExpression​ 属性を使用する Batch Step コンポーネントは、このコンポーネントに到達する各レコードに DataWeave 式を適用し、式が ​true​ に評価されたレコードのみを受け入れて、コンポーネント内で処理します。レコードが ​false​ に評価される場合、Batch Step コンポーネントはレコードをスキップし、そのレコードは Batch Job コンポーネント内の次の Batch Step コンポーネント (存在する場合) で使用できるようになります。

以下の例では、​age​ の値が ​21​ 以下のすべてのレコードを除外します。

<batch:job jobName="batchJob">
  <batch:process-records >
    <batch:step name="adultsOnlyStep" acceptExpression="#[payload.age > 21]">
      ...
    </batch:step>
  </batch:process-records>
</batch:job>

Batch Step コンポーネントは、​acceptPolicy​ 属性を使用して、以前の Batch Step コンポーネントで処理されたレコードにポリシーを適用します。Batch Step は、ポリシーが ​true​ に評価されたレコードのみを受け入れて、コンポーネント内で処理します。デフォルトのポリシーは ​NO_FAILURES​ です。他のポリシーは ​ONLY_FAILURES​ と ​ALL​ です。これらのポリシーの詳細は、​「Batch Step コンポーネント (<batch:step/>)」​を参照してください。

次の例では、2 番目の Batch Step コンポーネント (​batchStep2​) で、先行するステップ (​batchStep1​) で正常に処理できなかったレコードのみを受け入れます。たとえば、​batchStep1​ 内のプロセッサで Salesforce 取引先責任者の各レコードをチェックするとします。チェックで取引先責任者が見つからなかった場合、​batchStep2​ は、取引先責任者のないレコードのみを受け入れて、取引先責任者のあるレコードはスキップします。​batchStep2​ 内のメッセージプロセッサは取引先責任者をそれらのレコードに追加することができます。

<batch:job jobName="batchJob">
  <batch:process-records >
	  <batch:step name="batchStep1" >
      <!-- Check for contact -->
		  ...
    </batch:step>
    <batch:step name="batchStep2" accept-policy="ONLY_FAILURES">
      <!-- Accept records that failed -->
		  ...
    </batch:step>
  </batch:process-records>
</batch:job>

次の例では、デモ目的で Raise Error コンポーネント (​<raise-error />​) を使用して、最初の Batch Step (​batchStep1​) で失敗が発生するようにします。2 番目の Batch Step (​batchStep2​) は、失敗したレコード (​ONLY_FAILURES​) のみを受け入れ、それらのレコードのペイロードを記録します。3 番目は、デフォルトの ​acceptPolicy​ を使用して ​NO_FAILURES​ を受け入れます。

<flow name="batch-exampleFlow" >
  <scheduler doc:name="Scheduler" >
    <scheduling-strategy >
      <fixed-frequency frequency="75" timeUnit="SECONDS"/>
    </scheduling-strategy>
  </scheduler>
  <!-- Set Payload -->
  <set-payload value="#[[1,2,3,4,5]]" doc:name="Set Payload" />
  <!-- Batch Job Component -->
  <batch:job jobName="Batch_Job" maxFailedRecords="-1">
    <batch:process-records >
      <!-- First Batch Step defaults to NO_FAILURES -->
      <batch:step name="batchStep1" acceptPolicy="ALL"
                  acceptExpression="#[payload &lt; 3]">
        <!-- Raising Error -->
        <raise-error doc:name="Raise error"
                     type="MY:APP" description="Example: Raising Error "/>
      </batch:step>
      <!-- Second Batch Step: ONLY FAILURES -->
      <batch:step name="batchStep2" acceptPolicy="ONLY_FAILURES">
        <logger level="INFO" doc:name="Logger" message="#[payload]"
                category="LOGGER IN SECOND BATCH STEP"/>
      </batch:step>
      <batch:step name="batchStep3" >
        <logger level="INFO" doc:name="Logger" message="#[payload]"
                category="LOGS IN THIRD BATCH STEP"/>
      </batch:step>
    </batch:process-records>
  </batch:job>
</flow>
  • Set Payload コンポーネント (​<set-payload />​) は、配列 ​[1,2,3,4,5]​ を Batch Job コンポーネントに送信します。

  • Batch Job コンポーネント (​<batch:job />​) は、最初の Batch Step コンポーネント内で失敗したレコードによって Batch Job インスタンスの処理が停止しないように ​maxFailedRecords​ を ​-1​ に設定します。

  • 最初の Batch Step (​batchStep1​) は、​acceptExpression="#[payload < 3]"​ を設定して、ペイロードが ​3​ 未満のレコードのみを受け入れ、そのコンポーネント内の Raise Error コンポーネントにより、それらのレコードでエラーをスローします。

    INFO  ...DefaultBatchStep: Found exception processing record on step 'batchStep1' for job instance 'f9bb6d60-5ef2-11ed-81e7-147ddaaf4f97'
    of job 'batch-example-policiesBatch_Job'.
  • 2 番目の Batch Step (​batchStep2​) は、次のように失敗したレコードのペイロードを出力します。

    INFO  ... LOGGER IN SECOND BATCH STEP: 1
    INFO  ... LOGGER IN SECOND BATCH STEP: 2
  • 3 番目の Batch Step (​batchStep3​) は、次のように成功したレコードのペイロードを出力します。

    INFO ... LOGS IN THIRD BATCH STEP: 3
    INFO  ...LOGS IN THIRD BATCH STEP: 4
    INFO  ...LOGS IN THIRD BATCH STEP: 5
  • Batch Job インスタンスの完了後、​3​ つのレコードが正常に処理されて、​2​ つのレコードが失敗したことがログに示されます。

    INFO  ...engine.DefaultBatchEngine: Finished execution for instance ...
    Total Records processed: 5. Successful records: 3. Failed Records: 2

詳細は、「Batch Job 中のエラー処理」を参照してください。

Batch Aggregator コンポーネントからの一括操作の実行

集約は、配列内の複数のレコードを外部サーバに送信する場合に便利です。Batch Aggregator コンポーネント内で操作 (一括更新、挿入、更新など) を追加することで、レコードごとに個別に操作を実行する代わりに、操作の 1 つの実行で複数のレコードをサーバに読み込むことができます。

固定サイズの個別の配列でレコードを処理することも、Batch Job インスタンスから 1 つのレコード配列をストリーミングすることもできます。たとえば、200 件の処理済みレコードを更新/挿入するように、Salesforce 用 Anypoint Connector (Salesforce Connector) で Upsert 操作を設定できます。または、インスタンス内のすべてのレコードをサーバにストリーミングすることもできます。

次の例では、更新/挿入あたり 200 件のレコードの個別の配列でレコードを一括更新/挿入します。

<batch:job jobName="batchJob">
  <batch:process-records >
    <batch:step name="batchStep">
      <batch:aggregator size="200">
        <salesforce:upsert doc:name="Upsert" ... />
      </batch:aggregator>
    </batch:step>
  </batch:process-records>
</batch:job>

次の例では、データベースに更新/挿入をストリーミングします。

<batch:job jobName="batchJob">
	<batch:process-records >
		<batch:step name="batchStep">
		  <batch:aggregator streaming="true">
		    salesforce:upsert doc:name="Upsert" ... />
		  </batch:aggregator>
		</batch:step>
	</batch:process-records>
</batch:job>

エラー処理:
Salesforce 用 Anypoint Connector (Salesforce Connector)​ や ​NetSuite 用 Anypoint Connector (NetSuite Connector)​ などの一部のコネクタは、バッチ集約プロセス全体を失敗させることなくレコードレベルのエラーを処理します。実行時に、これらのコネクタは対象リソースによって正常に受け入れられたレコード、および失敗したレコードを追跡します。レコードのグループ全体を失敗させる代わりに、できるだけ多くのレコードを更新/挿入し、通知目的で失敗を追跡します。

Batch Aggregator コンポーネント内の処理や、Batch Step コンポーネント内の処理との集約の違いについての詳細は、​「Process フェーズ」​を参照してください。

Batch Aggregator 内でのレコードの変更

Batch Step コンポーネントのプロセッサを使用して変更する場合と同様に、Batch Aggregator 内でレコードを変更できます。変更の実行は、順次または特定のレコードへのランダムアクセスで行うことができます。

Batch Aggregator 内の順次処理

次の例では、レコードへの順次アクセスを実行します。この例では、​Scripting Module​ 内の Groovy を使用して、​For Each​ コンポーネントの反復から各レコードのペイロードを変更し、収集したレコードごとに変数を作成できます。

<batch:job jobName="batchJob">
  <batch:process-records>
    <batch:step name="batchStep">
      <batch:aggregator doc:name="batchAggregator" size="10">
        <foreach doc:name="For Each">
          <script:execute engine="groovy">
            <script:code>
	      vars['marco'] = 'polo'
	      vars['record'].payload = 'hello'
	    </script:code>
	  </script:execute>
        </foreach>
      </batch:aggregator>
    </batch:step>
  </batch:process-records>
</batch:job>

Batch Aggregator 内でのレコードへのランダムアクセス

反復回数でレコードにランダムにアクセスする場合、For Each スコープを使用することもできます。For Each は、​records​ 変数 (反復を追跡するために使用する不変リスト) を公開します。この変数を使用して、Batch Aggregator コンポーネントからリストのレコードにランダムにアクセスできます。

次の例では、固定サイズの集約を使用するときのランダムなアクセスを示すために、レコードの For Each リストでレコードのインデックスを指定します。各レコードに順次アクセスする代わりに、レコードのインデックスが含まれる ​records​ 変数を使用して、リストから 1 つのレコードを選択します。この例では、Scripting Module を使用して、選択したレコードのペイロードを変更し、そのレコードの変数を作成します。

<batch:job jobName="batchJob">
  <batch:process-records>
    <batch:step name="batchStep">
      <batch:aggregator doc:name="batchAggregator" size="10">
        <foreach doc:name="For Each">
	  <script:execute engine="groovy">
	    <script:code>
	      records[0].vars['marco'] = 'polo'
	      records[0].vars['record'].payload = 'hello'
	    </script:code>
	  </script:execute>
	</foreach>
      </batch:aggregator>
    </batch:step>
  </batch:process-records>
</batch:job>

コンテンツをストリーミングするように Batch Aggregator コンポーネントを設定できます。レコードをストリーミングするようにこのコンポーネントを設定すると (​streaming="true"​)、レコードの数や大きさに関係なく、メモリ不足にならずに Batch Job インスタンス内のすべてのレコードの配列を処理できます。たとえば、CSV ファイルに数百万件のレコードを書き込む必要がある場合、Batch Aggregator コンポーネントを使用してレコードをストリーミングできます。

<batch:job jobName="batchJob">
  <batch:process-records >
    <batch:step name="batchStep">
      <batch:aggregator streaming="true">
        <file:write path="reallyLarge.csv">
	  <file:content><![CDATA[%dw 2.0
	    ...

	  }]]></file:content>
      </batch:aggregator>
    </batch:step>
  </batch:process-records>
</batch:job>

次の例に示すように、Batch Aggregator コンポーネント内でストリーミングする場合は、レコードへの順次アクセスのみが可能です。ストリーミングでは、レコードセット全体にアクセスする必要があるため、ランダムアクセスはサポートされません。Mule では、固定サイズが指定されていない場合、すべてのレコードがメモリ内に収まることが保証されないため、この制限は必須です。

<batch:job jobName="batchJob">
  <batch:process-records>
    <batch:step name="batchStep">
      <batch:aggregator doc:name="batchAggregator" streaming="true">
        <foreach doc:name="For Each">
	  <script:execute engine="groovy">
	    <script:code>
	      vars['marco'] = 'polo'
	      vars['record'].payload = 'foo'
	    </script:code>
	  </script:execute>
	</foreach>
      </batch:aggregator>
    </batch:step>
  </batch:process-records>
</batch:job>

Batch Aggregator コンポーネント内の処理や、Batch Step コンポーネント内の処理との集約の違いについての詳細は、​「Process フェーズ」​を参照してください。

NOTE

Batch Aggregator コンポーネントを使用するときには、​ImmutableMap​ などの Guava データ型を処理しようとしないでください。代わりに、Java Map を使用してシリアル化の問題を回避します。

集約されたレコードの MIME タイプの保持

Mule 4.3 での導入

集約されたレコードは、各レコードのペイロードが含まれる配列コンポーネントとして Batch Aggregator コンポーネントに渡されます。ただし、デフォルトではそれらのペイロードに関連付けられている MIME タイプは保持されません。レコードの MIME タイプを保持するには、Batch Aggregator コンポーネントの ​preserveMimeTypes​ 属性を指定します。

次の JSON 配列を考えてみます。

[
  {
    "name": "Tony Stark",
    "alias": "Iron Man",
    "status": "DEAD"
  },
  {
    "name": "Steve Rodgers",
    "alias": "Captain America",
    "status": "RETIRED"
  },
  {
    "name": "Peter Parker",
    "alias": "SpiderMan",
    "status": "FUGITIVE"
  }
]

JSON 配列​が次の Batch Job コンポーネントの入力だとします。

<batch:job name="avengersLogger">
  <batch:process-records>
    <batch:step name="log">
      <batch:aggregator size="10">
        <foreach>
	  <logger message="Agent #[payload.alias] is #[payload.status]" />
	</foreach>
      </batch:aggregator>
    </batch:step>
  </batch:process-records>
</batch:job>

バッチエンジンは入力 JSON 配列を個々のレコードに分割します。つまり、アグリゲータブロックで 3 つの要素が含まれる配列を受信します。1 つ目は次のようになります。

{
  "name": "Tony Stark",
  "alias": "Iron Man",
  "status": "DEAD"
}

ただし、Logger コンポーネントで ​#[payload.alias]​ 式を評価しようとすると、次の結果のようなエラーが発生します。

********************************************************************************
Message               : "You called the function 'Value Selector' with these arguments:
  1: Binary ("ewogICJmaXJzdE5hbWUiOiAiUmFtIiwKICAibGFzdE5hbWUiOiAiUmFtMSIsCiAgImFkZHJlc3Mi...)
  2: Name ("alias")

But it expects one of these combinations:
  (Array, Name)
  (Array, String)
  (Date, Name)
  (DateTime, Name)
  (LocalDateTime, Name)
  (LocalTime, Name)
  (Object, Name)
  (Object, String)
  (Period, Name)
  (Time, Name)

5|                                         name: payload.alias,
                                                 ^^^^^^^^^^^^^

このエラーは、MIME タイプが保持されないことが原因で発生します。これにより、Mule で JSON として読み取らなくなります。この問題を解決するには、Batch Aggregator コンポーネントの ​preserveMimeTypes​ 属性を ​true​ に設定します。次に例を示します。

<batch:aggregator size="10" preserveMimeTypes="true">
  <foreach>
    <logger message="Agent #[payload.alias] is #[payload.status]" />
  </foreach>
</batch:aggregator>

この設定により、Mule で各レコードのメディア種別が保持され、ペイロードが JSON ドキュメントとして処理されます。

レコードブロックサイズの変更

従来のオンライン処理モデルでは、各要求は通常、ワーカースレッドにマップされます。処理種別に関係なく、通常、要求と実行中のスレッドの間には 1 対 1 のリレーションがあります。このリレーションは、同期、非同期、一方向、要求-応答の処理種別や、処理前に要求が一時的にバッファされる場合に保持されます。

ただし、Batch Job の場合、Process (処理) フェーズが始まる前にレコードが最初に永続的なキューに保存されるため、従来のスレッドモデルは適用されません。

パフォーマンスを向上させるため、Mule Runtime はバッチレコードをスレッドごとに最大 100 件のレコードのブロックでキューに追加し、スケジュールします。この動作により I/O 要求数が削減され、操作の負荷が改善されます。Batch Job は Mule のスレッドプールを使用するため、ジョブのデフォルトはありません。各スレッドはそのブロックを反復処理して各レコードを処理し、その後各ブロックはキューに戻され、プロセスが続行されます。

3 ステップの Batch Job では、キューに 100 万件のレコードを追加することを検討してください。Mule Runtime Engine がジョブのフェーズ間を移動するときに各レコードの取得と要求を行うと、少なくとも 300 万回の I/O 操作が発生します。
パフォーマンスには、スレッドを並行して処理するのに十分な使用可能メモリが必要です。つまり、レコードを永続的なストレージから RAM に移動します。レコードとその数量が大きいほど、バッチ処理に必要な使用可能メモリが多くなります。

Batch Job の 1 スレッドあたり最大 100 件のレコードの標準モデルは、ほとんどのユースケースで機能しますが、ブロックサイズの増減が必要な次の 3 つのユースケースを考慮してください。

  • Batch Job で処理するレコードが 200 件あるとします。デフォルトの 100 件のレコードのブロックサイズでは、Mule が同時に並行処理できるレコード数は 2 件のみです。Batch Job で処理するレコードが 101 件未満の場合、順次処理になります。負荷の高いペイロードを処理する必要がある場合、100 件のレコードをキューに追加するには大量の作業メモリが必要です。

  • 画像を処理する必要がある Batch Job があり、平均画像サイズは 3 MB だとします。この場合、Mule は、各スレッドで 3 MB のペイロードを持つ 100 件のレコードのブロックを処理します。したがって、デフォルトの threading-profile 設定では、ブロックをキューで保持するためだけに大量の作業メモリが必要になります。この場合、より低いブロックサイズを設定して、より多くのジョブを介して各ペイロードを分散し、使用可能メモリの負荷を減らします。

  • 500 万件のレコードがあり、ペイロードが非常に小さいためにメモリ内に 500 件のレコードのブロックを問題なく入れることができるとします。より大きなブロックサイズを設定すると、作業メモリの負荷を犠牲にすることなく Batch Job の時間を短縮できます。

この機能を最大限に活用するには、ブロックサイズが Batch Job に与える影響を理解する必要があります。異なる値とテストパフォーマンスを使用して比較テストを実行すると、この変更を本番に移行する前に最適なブロックサイズを見つけるために役立ちます。

バッチブロックサイズの変更は省略可能です。変更を適用しない場合、デフォルト値はブロックあたり 100 件のレコードです。

Batch Job コンポーネントを使用してサイズを設定します。次に例を示します。

<batch:job jobName="atch_Job" blockSize="200">
  ...
</batch:job>

Batch Job インスタンスの最大同時実行の制限の設定

Max Concurrency (最大同時実行) (​maxConcurrency​) プロパティでは、同時に処理するレコードブロックの数が制限されます。

maxConcurrency​ プロパティを次の例のように設定できます。

<batch:job jobName="test-batch" maxConcurrency="${batch.max.concurrency}">
  ...
</batch:job>

デフォルトでは、Batch Job コンポーネントの最大同時実行は使用可能なコアの数の 2 倍に制限されます。Mule インスタンスを実行しているシステムの容量によっても、同時実行が制限されます。