Batch Step 処理の絞り込み

Batch Step が処理するレコードに対して実行する作業を絞り込むことができます。

  • 一部のレコードの処理のみを受け入れるように、Batch Step に​検索条件​を設定できます。

  • レコードをグループに​集約​し、それらを一括更新として外部ソースまたはサービスに送信できます。

このドキュメントでは、バッチ検索条件とバッチコミットの使用方法およびそのタイミングについて説明します。

バッチ検索条件

1 つ以上の検索条件を属性として任意の数の Batch Step に適用できます。
レコードに Salesforce 取引先責任者が存在するかどうかを最初の Batch Step で確認するバッチジョブと、既存の各 Salesforce 取引先責任者を新しい情報で更新する 2 番目の Batch Step があるとします。2 番目の Batch Step に検索条件を適用し、最初の Batch Step で失敗しなかったレコードのみを処理するように設定できます。
Batch Step で一部のレコードのみを受け入れて処理するように、Batch Job を合理化することで、Mule Runtime Engine は特定の Batch Step で関連データのみに集中できます。

Batch Step は、レコードの絞り込みに次の 2 つの属性を使用します。

  • acceptExpression

  • acceptPolicy

各 Batch Step では、レコードの絞り込みに 1 つの acceptExpression 属性と 1 つの acceptPolicy 属性を使用できます。

acceptExpression 属性を使用して、true と評価されたレコードのみを処理します。レコードが false と評価された場合、Batch Step はそのレコードをスキップして次のレコードでこの属性を使用します。つまり、受け入れ式で false に解決されるレコードは、Mule では​除外​されます。

次の例は、年齢が 21 歳未満のすべてのレコードを除外し、Batch Step はそれらのレコードを処理しません。

<batch:job jobName="batchJob">
	<batch:process-records >
		<batch:step name="adultsOnlyStep" acceptExpression="#[payload.age > 21]">
			...
		</batch:step>
	</batch:process-records>
 </batch:job>

Batch Step の acceptPolicy 属性を使用して、受け入れポリシー属性の値に対して true と評価されたレコードのみを処理します。受け入れポリシーに使用できる値のリストについては、次の表を参照してください。

Accept Policy (受け入れポリシー) TRUE と評価された場合

NO_FAILURES

デフォルト
Batch Step は、先行するすべてのステップで処理に​成功​したレコードのみを処理します。

ONLY_FAILURES

Batch Step は、先行する Batch Step で処理に​失敗​したレコードのみを処理します。

ALL (すべて)

Batch Step は、先行する Batch Step で処理に失敗したかどうかに関係なく、すべてのレコードを処理します。

Batch Step に検索条件を適用しない場合、バッチは先行するすべてのステップで処理に​成功​したレコードのみを処理します。つまり、すべての Batch Step に適用されるデフォルトの Accept Policy (受け入れポリシー) は ​NO_FAILURES​ です。

次の例は、先行するステップで処理に失敗したレコードのみを処理する Batch Job の 2 番目の Batch Step を示しています。最初の Batch Step では、ランタイムは各レコードに既存の Salesforce 取引先責任者があるかどうかを確認し、各レコードの取引先責任者を作成する 2 番目の Batch Step は、失敗したレコード (既存の取引先責任者がなかったレコード) のみを処理します。

<batch:job jobName="batchJob">
	<batch:process-records >
		<batch:step name="batchStep1">
			...
		</batch:step>
		<batch:step name="batchStep2" accept-policy="ONLY_FAILURES">
			...
		</batch:step>
	</batch:process-records>
 </batch:job>

各 Batch Job には、Batch Job で受け入れる失敗したレコード数を制御する ​maxFailedRecords​ 属性があります。
Batch Job インスタンスが ​maxFailedRecords​ 値を超えると、Batch Step で設定された検索条件に関係なく、ステップはレコードを処理せず、失敗した Batch Job インスタンスを On Complete (完了時) フェーズに転送します。
詳細は、​「Batch Job 中のエラー処理」​を参照してください。

検索条件の特性

  • バッチ検索条件は Batch Step にのみ適用され、Batch Job のバッチ処理フェーズ内でのみ使用可能。Input (入力) または On Complete (完了時) フェーズで検索条件を適用することはできません。

  • Batch Step に検索条件を適用しない場合、バッチは先行するすべてのステップで処理に​成功​したレコードのみを処理します。つまり、すべての Batch Step に適用されるデフォルトの Accept Policy (受け入れポリシー) は ​NO_FAILURES​ です。

  • Batch Job インスタンスが ​max-failed-records​ 値を超えると、Batch Step で設定された検索条件に関係なく、ステップはレコードを処理せず、失敗した Batch Job インスタンスを On Complete (完了時) フェーズに転送します。

  • 両方の種別の検索条件を適用する場合、Mule はそれらを次の順序で評価します。

    1. Accept Policy (受け入れポリシー)

    2. Accept Expression (受け入れ式)

Batch Aggregator

Batch Aggregator スコープを使用して、Batch Step からレコードのサブセットを蓄積し、それらを外部ソースまたはサービスに一括更新/挿入できます。
たとえば、各リード (レコード) を Salesforce にバッチで更新/挿入する代わりに、200 件のレコードを蓄積するように Batch Commit を設定して、そのすべてを 1 つのチャンクで Salesforce に更新/挿入できます。

<batch:step name="Step2">
	<batch:aggregator size="200">
     <salesforce:create type="Lead" .../>
	</batch:aggregator>
</batch:step>

レコードをストリーミングするように Batch Aggregator スコープを設定することもできます。

<batch:step name="Strep2">
	<batch:aggregator streaming="true">
     <salesforce:create type="Lead" .../>
	</batch:aggregator>
</batch:step>

固定量のレコードの処理、およびすべてのレコードのストリーミングは相互に排他的な設定です。詳細は、以下のそれぞれのセクションを参照してください。

Batch Aggregator は可変です。つまり、Batch Aggregator でグループ化されたレコードのペイロードと変数にアクセスできます。
固定量のレコードを集約するときは、各レコードに順番にアクセスすることも、変更するランダムレコードを指定することもできます。
ただし、コンテンツをストリーミングするように Batch Aggregator を設定した場合、それらのレコードには順番にアクセスすることしかできません。

Aggregator を使用するときには、​ImmutableMap​ などの Guava データ型を処理しようとしないでください。代わりに、Java Map を使用してシリアル化の問題を回避します。

固定サイズを使用したレコードの集約

Batch Aggregator スコープ内のレコードの固定サイズグループを処理するように、Batch Aggregator スコープを設定できます。

たとえば、一度に 100 件のレコードを更新/挿入するように Batch Aggregator スコープを設定できます。

<batch:job jobName="batchJob">
	<batch:process-records >
		<batch:step name="batchStep">
			<batch:aggregator size="100">
				...
			</batch:aggregator>
		</batch:step>
	</batch:process-records>
</batch:job>

固定サイズのアグリゲーターを使用する場合、各レコードのペイロードと変数データの置換、変更、または保存を実行できます。

上述のように Batch Aggregator は可変であるため、foreach スコープを追加することで固定サイズのアグリゲーターブロックを反復処理し、各レコードのデータを順番に調べて各レコードのペイロードと変数を永続的に保存できます。Batch Aggregator 内でレコードにアクセスするこの方式は、順次アクセスと呼ばれます。
たとえば、Groovy Scripting Module を使用してペイロードを変更し、収集したレコードごとに変数を作成できます。

<batch:job jobName="batchJob">
	<batch:process-records>
		<batch:step name="batchStep">
			<batch:aggregator doc:name="batchAggregator" size="10">
				<foreach doc:name="For Each">
					<script:execute engine="groovy">
			    	<script:code>
			        		vars['marco'] = 'polo'
							    vars['record'].payload = 'foo'
			    	</script:code>
					</script:execute>
				</foreach>
			</batch:aggregator>
		</batch:step>
	</batch:process-records>
</batch:job>

順次アクセス方式は、次の条件を前提としています。

  1. アグリゲーターのサイズが集約されたレコードの量と一致する。

  2. 集約されたレコードとリスト内の項目の間に直接の相関関係がある。

foreach の反復回数を指定することで、ランダムレコードにアクセスすることもできます。これにより、すべてのレコードを反復処理する必要がなくなります。
この foreach スコープは ​records​ 変数を公開します。この変数は、反復を追跡するために foreach によって使用される不変リストであり、Batch Aggregator 全体でアクセス可能なランダムアクセスリストを提供します。

各レコードに順番にアクセスする代わりに、レコードリストで任意のインデックス番号を指定することで、上記の例と同じ結果を得ることができます。たとえば、次に示すように変数を作成して最初のレコードのペイロードを変更できます。

<batch:job jobName="batchJob">
	<batch:process-records>
		<batch:step name="batchStep">
			<batch:aggregator doc:name="batchAggregator" size="10">
				<foreach doc:name="For Each">
					<script:execute engine="groovy">
			    	<script:code>
			        	records[0].vars['marco'] = 'polo'
						    records[0].vars['record'].payload = 'foo'
			    	</script:code>
					</script:execute>
				</foreach>
			</batch:aggregator>
		</batch:step>
	</batch:process-records>
</batch:job>

ランダムアクセスを使用すると、コミットブロック内の特定のインデックス位置でレコードのペイロードを変更できます。

ブロックサイズを定義する場合の考慮事項

従来のオンライン処理モデルでは、各要求は通常、ワーカースレッドにマップされます。処理種別 (同期、非同期、一方向、要求 - 応答、または要求が処理前に一時的にバッファされる場合) に関係なく、サーバーは通常、要求と実行中のスレッドの間で 1 対 1 の関係になります。
Batch Job の場合、Process (処理) フェーズが始まる前にすべてのレコードが最初に永続的なキューに保存されるため、従来のスレッドモデルは適用されません。

パフォーマンスを向上させるため、ランタイムはバッチレコードをスレッドごとに最大 100 件のレコードのブロックでキューに追加し、スケジュールします。この動作により I/O 要求数が削減され、操作の負荷が改善されます。Batch Job は Mule Runtime Engine のスレッドプールを使用するため、ジョブのデフォルトはありません。各スレッドはそのブロックを反復処理して各レコードを処理し、その後各ブロックはキューに戻され、プロセスが続行されます。

3 ステップの Batch Job では、キューに 100 万件のレコードを追加することを検討してください。Mule Runtime Engine がジョブのフェーズ間を移動するときに各レコードの取得と要求を行うと、少なくとも 300 万回の I/O 操作が発生します。
パフォーマンスには、スレッドを並行して処理するのに十分な使用可能メモリが必要です。つまり、レコードを永続的なストレージから RAM に移動します。レコードとその数量が大きいほど、バッチ処理に必要な使用可能メモリが多くなります。

Batch Job の 1 スレッドあたり最大 100 件のレコードの標準モデルは、ほとんどのユースケースで機能しますが、ブロックサイズの増減が必要な次の 3 つのユースケースを考慮してください。

  • Batch Job で処理するレコードが 200 件あるとします。デフォルトの 100 件のレコードのブロックサイズでは、Mule が同時に並行処理できるレコード数は 2 件のみです。Batch Job で処理するレコードが 101 件未満の場合、順次処理になります。負荷の高いペイロードを処理する必要がある場合、100 件のレコードをキューに追加するには大量の作業メモリが必要です。

  • 画像を処理する必要がある Batch Job があり、平均画像サイズは 3 MB だとします。この場合、Mule は、各スレッドで 3 MB のペイロードを持つ 100 件のレコードのブロックを処理します。したがって、デフォルトの threading-profile 設定では、ブロックをキューで保持するためだけに大量の作業メモリが必要になります。この場合、より低いブロックサイズを設定して、より多くのジョブを介して各ペイロードを分散し、使用可能メモリの負荷を減らします。

  • 500 万件のレコードがあり、ペイロードが非常に小さいためにメモリ内に 500 件のレコードのブロックを問題なく入れることができるとします。より大きなブロックサイズを設定すると、作業メモリの負荷を犠牲にすることなく Batch Job の時間を短縮できます。

この機能を最大限に活用するには、ブロックサイズが Batch Job に与える影響を理解する必要があります。異なる値とテストパフォーマンスを使用して比較テストを実行すると、この変更を本番に移行する前に最適なブロックサイズを見つけるために役立ちます。

バッチブロックサイズの変更は省略可能です。変更を適用しない場合、デフォルト値はブロックあたり 100 件のレコードです。

Batch Aggregator コンポーネントを使用してサイズを設定します。次に例を示します。

<batch:aggregator size="100">
 ...
</batch:aggregator>

Batch Aggregator でのレコードのストリーミング

コンテンツをストリーミングするように Batch Aggregator スコープを設定できます。
レコードをストリーミングするように Batch Aggregator を設定すると、レコードの数や大きさに関係なく、ジョブインスタンス内のすべてのレコードを集約できます。

固定サイズの Batch Aggregator で受信する要素のリストの代わりに、ストリーミング機能を使用して、メモリ不足になることなくジョブインスタンス内のすべてのレコードを受信できます。

たとえば、CSV ファイルに数百万件のレコードを書き込む必要がある場合、レコードをストリーミング Batch Aggregator として処理できます。

<batch:job jobName="batchJob">
	<batch:process-records >
		<batch:step name="batchStep">
			<batch:aggregator streaming="true">
				<file:write path="reallyLarge.csv">
					<file:content><![CDATA[%dw 2.0
						...

					}]]></file:content>
			</batch:aggregator>
		</batch:step>
	</batch:process-records>
</batch:job>

この Batch Aggregator はストリーミングであるため、そのコンテンツへの順次アクセスのみが可能です。

<batch:job jobName="batchJob">
	<batch:process-records>
		<batch:step name="batchStep">
			<batch:aggregator doc:name="batchAggregator" streaming="true">
				<foreach doc:name="For Each">
					<script:execute engine="groovy">
						<script:code>
              vars['marco'] = 'polo'
							vars['record'].payload = 'foo'
						</script:code>
					</script:execute>
				</foreach>
			</batch:aggregator>
		</batch:step>
	</batch:process-records>
</batch:job>

メモリの制限により、ストリーミングアグリゲーターではランダムアクセスはサポートされていません。
ランダムアクセス用のレコードペイロードは ​immutable List​ として公開され、ストリーミングアグリゲーターは、固定コミットサイズなしにレコードセット全体にアクセスできることを意味するため、ランタイムはすべてのレコードがメモリ内に入ることを保証できません。

ヒント

  • SaaS プロバイダーからのストリーミング:​ 通常、SaaS プロバイダーにはストリーミング入力の受け入れに対する制限があるため、Anypoint Connector を介して Salesforce などの SaaS プロバイダーにデータを送信する場合、バッチストリーミングは使用しません。CSV、JSON、XML などのファイルに書き込む場合は、ストリーミングバッチ処理を使用します。

  • バッチストリーミングおよびパフォーマンス:​ ストリーミングデータのバッチ処理はアプリケーションのパフォーマンスに影響し、トランザクションの処理速度が遅くなります。パフォーマンスは低下しますが、ストリーミングデータをバッチ処理できるようにするトレードオフにより、実装でこの使用が必要になる可能性があります。

  • バッチストリーミングおよび項目へのアクセス:​ バッチストリーミングを使用する最大の欠点は、出力の項目へのアクセスが制限されることです。つまり、​固定サイズのコミット​では、変更できないリストが作成されるため、その項目にアクセスして反復処理できます。​ストリーミングコミット​では、1 つずつの読み取りで前方移動のみのイテレーターを使用することになります。

Batch Aggregator の特性

  • Batch Aggregator スコープは Batch Step にのみ適用され、Batch Job のバッチ処理フェーズ内でのみ使用可能。Batch Job の On Complete (完了時) フェーズでは Batch Aggregator を使用できません。

  • アグリゲーターは、その Batch Step 内で最後の要素のみをラップ可能。

  • いくつかの ​Anypoint Connector​ は、バッチ集約全体 (更新/挿入) を失敗させることなくレコードレベルのエラーを処理可能。
    実行時に、これらのコネクタは対象リソースによって正常に受け入れられたレコード、および更新/挿入できなかったレコードを追跡します。したがって、レコードのグループ全体を失敗させる代わりに、できるだけ多くのレコードを更新/挿入し、通知用に失敗を追跡します。これらのコネクタの一部を次に示します。

    • Salesforce

    • NetSuite

      使用しているコネクタがレコードレベルのエラーをサポートすることを確認するには、コネクタのドキュメントを参照してください。

  • Batch Aggregator スコープは、ジョブインスタンス全体のトランザクションをサポートしない。各レコードを個別のトランザクションで処理する Batch Step 内にトランザクションを定義できます。これはステップ内のステップと考えてください。
    このようなトランザクションは、ステップの境界内で開始および終了する必要があります。

  • Batch Step とそのステップ内に存在する Batch Aggregator の間でトランザクションは共有できない。Batch Step が開始するトランザクションは、Batch Aggregator が処理を開始する前に終了します。つまり、トランザクションは Batch Step とそれに含まれる Batch Aggregator スコープの間の境界を越えることはできません。

集約されたレコードの MIME タイプの保持

Mule 4.3 でサポート

集約されたレコードは、各レコードのペイロードが含まれる配列としてアグリゲーターに渡されます。ただし、デフォルトではそれらのペイロードに関連付けられている MIME タイプは保持されません。Batch Aggregator の ​preserveMimeTypes​ 属性を指定して、レコードの MIME タイプを保持できます。

たとえば、次の JSON ドキュメントについて考えます。

[
	{
		"name": "Tony Stark",
		"alias": "Iron Man",
		"status": "DEAD"
	},
	{
		"name": "Steve Rodgers",
		"alias": "Captain America",
		"status": "RETIRED"
	},
	{
		"name": "Peter Parker",
		"alias": "SpiderMan",
		"status": "FUGITIVE"
	}
]

このドキュメントを次のジョブに取り込むとします。

<batch:job name="avengersLogger">
	<batch:process-records>
		<batch:step name="log">
			<batch:aggregator size="10">
				<foreach>
					<logger message="Agent #[payload.alias] is #[payload.status]" />
				</foreach>
			</batch:aggregator>
		</batch:step>
	</batch:process-records>
</batch:job>

バッチエンジンは入力 JSON 配列を個々のレコードに分割します。つまり、アグリゲーターブロックで 3 つの要素が含まれる配列を受信します。1 つ目は次のようになります。

{
 "name": "Tony Stark",
 "alias": "Iron Man",
 "status": "DEAD"
}

ただし、logger 要素で ​#[payload.alias]​ 式を評価しようとすると、次のようなエラーが発生します。

********************************************************************************
Message               : "You called the function 'Value Selector' with these arguments:
  1: Binary ("ewogICJmaXJzdE5hbWUiOiAiUmFtIiwKICAibGFzdE5hbWUiOiAiUmFtMSIsCiAgImFkZHJlc3Mi...)
  2: Name ("alias")

But it expects one of these combinations:
  (Array, Name)
  (Array, String)
  (Date, Name)
  (DateTime, Name)
  (LocalDateTime, Name)
  (LocalTime, Name)
  (Object, Name)
  (Object, String)
  (Period, Name)
  (Time, Name)

5|                                         name: payload.alias,
                                                 ^^^^^^^^^^^^^

前のエラーは、デフォルトでは MIME タイプが保持されず、Mule でこのレコードが実際に JSON であるかどうかを把握できないことが原因で発生します。これを修正するには、Batch Aggregator の ​preserveMimeTypes​ 属性を指定します。

<batch:aggregator size="10" preserveMimeTypes="true">
	<foreach>
	   <logger message="Agent #[payload.alias] is #[payload.status]" />
	</foreach>
</batch:aggregator>

この属性を設定すると、Mule で自動的に各レコードのメディア種別が保持され、ペイロードが実際に JSON ドキュメントを表していることを把握できます。

最大同時実行の制限

Max Concurrency (最大同時実行) (​maxConcurrency​) プロパティでは、バッチジョブが同時に処理できるブロックの数が制限されます。ブロック内のレコードは順番に処理されます。

maxConcurrency​ プロパティを次の例のように設定できます。

<batch:job jobName="test-batch" maxConcurrency="${batch.max.concurrency}">
  ...
</batch:job>

デフォルトでは、Batch Job コンポーネントの最大同時実行は使用可能なコアの数の 2 倍に制限されます。Mule インスタンスを実行しているシステムの容量によっても、同時実行が制限されます。