Batch Step 処理の絞り込み

Batch Step が処理するレコードに対して実行する作業を絞り込むことができます。

  • 一部のレコードの処理のみを受け入れるように、Batch Step に​検索条件​を設定できます。

  • レコードをグループに​集約​し、それらを一括更新として外部ソースまたはサービスに送信できます。

このドキュメントでは、バッチ検索条件とバッチコミットの使用方法およびそのタイミングについて説明します。

バッチ検索条件

1 つ以上の検索条件を属性として任意の数の Batch Step に適用できます。
レコードに Salesforce 取引先責任者が存在するかどうかを最初の Batch Step で確認するバッチジョブと、既存の各 Salesforce 取引先責任者を新しい情報で更新する 2 番目の Batch Step があるとします。2 番目の Batch Step に検索条件を適用し、最初の Batch Step で失敗しなかったレコードのみを処理するように設定できます。
Batch Step で一部のレコード処理のみを受け入れるようにすることで、Batch Job を合理化し、ランタイムが特定の Batch Step で関連データのみに集中できるようにします。

Batch Step は、レコードの絞り込みに次の 2 つの属性を使用します。
  • acceptExpression

  • acceptPolicy

各 Batch Step では、レコードの絞り込みに 1 つの acceptExpression 属性と 1 つの acceptPolicy 属性を使用できます。

acceptExpression 属性を使用して、true と評価されたレコードのみを処理します。レコードが false と評価された場合、Batch Step はそのレコードをスキップして次のレコードでこの属性を使用します。つまり、受け入れ式で false に解決されるレコードは、Mule では​除外​されます。

次の例は、年齢が 21 歳未満のすべてのレコードを除外し、Batch Step はそれらのレコードを処理しません。

<batch:job jobName="batchJob">
	<batch:process-records >
		<batch:step name="adultsOnlyStep" acceptExpression="#[payload.age > 21]">
			...
		</batch:step>
	</batch:process-records>
 </batch:job>

Batch Step の acceptPolicy 属性を使用して、受け入れポリシー属性の値に対して true と評価されたレコードのみを処理します。受け入れポリシーに使用できる値のリストについては、次の表を参照してください。

Accept Policy (受け入れポリシー) TRUE と評価された場合

NO_FAILURES

Default
Batch Step は、先行するすべてのステップで処理に 成功 したレコードのみを処理します。

ONLY_FAILURES

Batch Step は、先行する Batch Step で処理に 失敗 したレコードのみを処理します。

ALL (すべて)

Batch Step は、先行する Batch Step で処理に失敗したかどうかに関係なく、すべてのレコードを処理します。

Batch Step に検索条件を適用しない場合、バッチは先行するすべてのステップで 成功 したレコードのみを処理します。つまり、すべての Batch Step に適用されるデフォルトの Accept Policy (受け入れポリシー) は NO_FAILURES です。

次の例は、先行するステップで処理に失敗したレコードのみを処理する Batch Job の 2 番目の Batch Step を示しています。最初の Batch Step では、ランタイムは各レコードに既存の Salesforce 取引先責任者があるかどうかを確認し、各レコードの取引先責任者を作成する 2 番目の Batch Step は、失敗したレコード (既存の取引先責任者がなかったレコード) のみを処理します。

<batch:job jobName="batchJob">
	<batch:process-records >
		<batch:step name="batchStep1">
			...
		</batch:step>
		<batch:step name="batchStep2" accept-policy="ONLY_FAILURES">
			...
		</batch:step>
	</batch:process-records>
 </batch:job>

各 Batch Job には、Batch Job で受け入れる失敗したレコード数を制御する maxFailedRecords 属性があります。
Batch Job インスタンスが maxFailedRecords 値を超えると、Batch Step で設定された検索条件に関係なく、ステップはレコードを処理せず、失敗した Batch Job インスタンスを On Complete (完了時) フェーズに転送します。
詳細は、「Batch Job 中のエラー処理」を参照してください。

検索条件の特性

  • バッチ検索条件は Batch Step にのみ適用され、Batch Job のバッチ処理フェーズ内でのみ使用可能。Input (入力) または On Complete (完了時) フェーズで検索条件を適用することはできません。

  • Batch Step に検索条件を適用しない場合、バッチは先行するすべてのステップで処理に 成功 したレコードのみを処理する。つまり、すべての Batch Step に適用されるデフォルトの Accept Policy (受け入れポリシー) は NO_FAILURES です。

  • Batch Job インスタンスが max-failed-records 値を超えると、Batch Step で設定された検索条件に関係なく、ステップはレコードを処理せず、失敗した Batch Job インスタンスを On Complete (完了時) フェーズに転送します。

  • 両方の種別の検索条件を適用する場合、Mule はそれらを次の順序で評価します。

    1. Accept Policy (受け入れポリシー)

    2. Accept Expression (受け入れ式)

Batch Aggregator

Batch Aggregator スコープを使用して、Batch Step からレコードのサブセットを蓄積し、それらを外部ソースまたはサービスに一括更新/挿入できます。
たとえば、各リード (レコード) を Salesforce にバッチで更新/挿入する代わりに、200 件のレコードを蓄積するように Batch Commit を設定して、そのすべてを 1 つのチャンクで Salesforce に更新/挿入できます。

<batch:step name="Step2">
	<batch:aggregator size="200">
     <salesforce:create type="Lead" .../>
	</batch:aggregator>
</batch:step>

レコードをストリーミングするように Batch Aggregator スコープを設定することもできます。

<batch:step name="Strep2">
	<batch:aggregator streaming="true">
     <salesforce:create type="Lead" .../>
	</batch:aggregator>
</batch:step>

固定量のレコードの処理、およびすべてのレコードのストリーミングは相互に排他的な設定です。詳細は、以下のそれぞれのセクションを参照してください。

Batch Aggregator は可変です。つまり、Batch Aggregator でグループ化されたレコードのペイロードと変数にアクセスできます。
固定量のレコードを集約するときは、各レコードに順番にアクセスすることも、変更するランダムレコードを指定することもできます。
ただし、コンテンツをストリーミングするように Batch Aggregator を設定した場合、それらのレコードには順番にアクセスすることしかできません。

固定サイズを使用したレコードの集約

Batch Aggregator スコープ内のレコードの固定サイズグループを処理するように、Batch Aggregator スコープを設定できます。

たとえば、一度に 100 件のレコードを更新/挿入するように Batch Aggregator スコープを設定できます。

<batch:job jobName="batchJob">
	<batch:process-records >
		<batch:step name="batchStep">
			<batch:aggregator size="100">
				...
			</batch:aggregator>
		</batch:step>
	</batch:process-records>
</batch:job>

固定サイズのアグリゲータを使用する場合、各レコードのペイロードと変数データの置換、変更、または保存を実行できます。

上述のように Batch Aggregator は可変であるため、foreach スコープを追加することで固定サイズのアグリゲータブロックを反復処理し、各レコードのデータを順番に調べて各レコードのペイロードと変数を永続的に保存できます。Batch Aggregator 内でレコードにアクセスするこの方式は、順次アクセスと呼ばれます。
たとえば、Groovy スクリプティングモジュールを使用してペイロードを変更し、収集したレコードごとに変数を作成できます。

<batch:job jobName="batchJob">
	<batch:process-records>
		<batch:step name="batchStep">
			<batch:aggregator doc:name="batchAggregator" size="10">
				<foreach doc:name="For Each">
					<script:execute engine="groovy">
			    	<script:code>
			        		vars['marco'] = 'polo'
							    vars['record'].payload = 'foo'
			    	</script:code>
					</script:execute>
				</foreach>
			</batch:aggregator>
		</batch:step>
	</batch:process-records>
</batch:job>

順次アクセス方式は、次の条件を前提としています。

  1. アグリゲータのサイズが集約されたレコードの量と一致する。

  2. 集約されたレコードとリスト内の項目の間に直接の相関関係がある。

foreach の反復回数を指定することで、ランダムレコードにアクセスすることもできます。これにより、すべてのレコードを反復処理する必要がなくなります。
この foreach スコープは records 変数を公開します。この変数は、反復を追跡するために foreach によって使用される不変リストであり、Batch Aggregator 全体でアクセス可能なランダムアクセスリストを提供します。

各レコードに順番にアクセスする代わりに、レコードリストで任意のインデックス番号を指定することで、上記の例と同じ結果を得ることができます。たとえば、次に示すように変数を作成して最初のレコードのペイロードを変更できます。

<batch:job jobName="batchJob">
	<batch:process-records>
		<batch:step name="batchStep">
			<batch:aggregator doc:name="batchAggregator" size="10">
				<foreach doc:name="For Each">
					<script:execute engine="groovy">
			    	<script:code>
			        	records[0].vars['marco'] = 'polo'
						    records[0].vars['record'].payload = 'foo'
			    	</script:code>
					</script:execute>
				</foreach>
			</batch:aggregator>
		</batch:step>
	</batch:process-records>
</batch:job>

ランダムアクセスを使用すると、コミットブロック内の特定のインデックス位置でレコードのペイロードを変更できます。

ブロックサイズを定義する場合の考慮事項

従来のオンライン処理モデルでは、各要求は通常、ワーカースレッドにマップされます。処理種別 (同期、非同期、一方向、要求 - 応答、または要求が処理前に一時的にバッファされる場合) に関係なく、サーバは通常、要求と実行中のスレッドの間で 1 対 1 の関係になります。
Batch Job の場合、Process (処理) フェーズが始まる前にすべてのレコードが最初に永続的なキューに保存されるため、従来のスレッドモデルは適用されません。

パフォーマンスを向上させるため、ランタイムはバッチレコードを 100 件のレコードのブロックでキューに追加し、スケジュールします。これにより I/O 要求数が削減され、操作の負荷が改善されます。
ランタイムのデフォルトのスレッドプロファイルは、ジョブあたり 16 スレッドです。したがって、デフォルトの設定済み Batch Job では、16 スレッドのそれぞれが 100 件のレコードのブロックを処理します。
各スレッドは各レコードを処理するそのブロックを反復処理し、その後各ブロックはキューに戻され、プロセスが続行されます。

3 ステップの Batch Job では、キューに 100 万件のレコードを追加することを検討してください。ランタイムがジョブのフェーズ間を移動するときに各レコードの取得と要求を行うと、少なくとも 300 万回の I/O 操作が発生します。
パフォーマンスには、16 スレッドを並行して処理するのに十分な使用可能メモリが必要です。つまり、1600 件のレコードを永続的なストレージから RAM に移動します。レコードとその数量が大きいほど、バッチ処理に必要な使用可能メモリが多くなります。

Batch Job あたり 100 件のレコードを含む 16 スレッドの標準モデルは、ほとんどのユースケースで機能しますが、ブロックサイズの増減が必要な可能性がある次の 3 つのユースケースを考慮してください。

  • Batch Job で処理するレコードが 200 件あるとします。デフォルトの 100 件のレコードのブロックサイズでは、Mule が同時に並行処理できるレコード数は 2 件のみです。101 件未満のレコードを要求した場合、順次処理になります。負荷の高いペイロードを処理する必要がある場合、100 件のレコードをキューに追加するには大量の作業メモリが必要です。

  • 画像を処理する必要がある Batch Job があり、平均画像サイズは 3 MB だとします。3 MB のペイロードを含む 100 ブロックがあり、16 スレッドで処理されます。したがって、デフォルトの threading-profile 設定では、ブロックをキューで保持するためだけに約 4.6 GB の作業メモリが必要になります。より多くのジョブを介して各ペイロードを分散し、使用可能メモリの負荷を減らすために、より低いブロックサイズを設定する必要があります。

  • ペイロードに 500 万件のレコードが含まれ、メモリ内に 500 件のレコードのブロックを問題なく入れることができるとします。より大きなブロックサイズを設定すると、作業メモリの負荷を犠牲にすることなく Batch Job の時間を短縮できます。

この機能を最大限に活用するには、ブロックサイズが Batch Job に与える影響を理解する必要があります。異なる値とテストパフォーマンスを使用して比較テストを実行すると、この変更を本番に移行する前に最適なブロックサイズを見つけるために役立ちます。

バッチブロックサイズの変更は省略可能です。変更を適用しない場合、デフォルト値はブロックあたり 100 件のレコードです。

Batch Aggregator でのレコードのストリーミング

コンテンツをストリーミングするように Batch Aggregator スコープを設定できます。
レコードをストリーミングするように Batch Aggregator を設定すると、レコードの数や大きさに関係なく、ジョブインスタンス内のすべてのレコードを集約できます。

固定サイズの Batch Aggregator で受信する要素のリストの代わりに、ストリーミング機能を使用して、メモリ不足になることなくジョブインスタンス内のすべてのレコードを受信できます。

たとえば、CSV ファイルに数百万件のレコードを書き込む必要がある場合、レコードをストリーミング Batch Aggregator として処理できます。

<batch:job jobName="batchJob">
	<batch:process-records >
		<batch:step name="batchStep">
			<batch:aggregator streaming="true">
				<file:write path="reallyLarge.csv">
					<file:content><![CDATA[%dw 2.0
						...

					}]]></file:content>
			</batch:aggregator>
		</batch:step>
	</batch:process-records>
</batch:job>

この Batch Aggregator はストリーミングであるため、そのコンテンツへの順次アクセスのみが可能です。

<batch:job jobName="batchJob">
	<batch:process-records>
		<batch:step name="batchStep">
			<batch:aggregator doc:name="batchAggregator" streaming="true">
				<foreach doc:name="For Each">
					<script:execute engine="groovy">
						<script:code>
              vars['marco'] = 'polo'
							vars['record'].payload = 'foo'
						</script:code>
					</script:execute>
				</foreach>
			</batch:aggregator>
		</batch:step>
	</batch:process-records>
</batch:job>

メモリの制限により、ストリーミングアグリゲータではランダムアクセスはサポートされていません。
ランダムアクセス用のレコードペイロードは immutable List として公開され、ストリーミングアグリゲータは、固定コミットサイズなしにレコードセット全体にアクセスできることを意味するため、ランタイムはすべてのレコードがメモリ内に入ることを保証できません。

ヒント

  • SaaS プロバイダからのストリーミング: 通常、SaaS プロバイダにはストリーミング入力の受け入れに対する制限があるため、Anypoint コネクタを介して Salesforce などの SaaS プロバイダにデータを送信する場合、バッチストリーミングは使用しません。CSV、JSON、XML などのファイルに書き込む場合は、ストリーミングバッチ処理を使用します。

  • バッチストリーミングおよびパフォーマンス: ストリーミングデータのバッチ処理はアプリケーションのパフォーマンスに影響し、トランザクションの処理速度が遅くなります。パフォーマンスは低下しますが、ストリーミングデータをバッチ処理できるようにするトレードオフにより、実装でこの使用が必要になる可能性があります。

  • バッチストリーミングおよび項目へのアクセス: バッチストリーミングを使用する最大の欠点は、出力の項目へのアクセスが制限されることです。つまり、​固定サイズのコミット​では変更できないリストが作成されるため、その項目にアクセスして反復処理できます。​ストリーミングコミット​では、1 つずつの読み取りで前方移動のみのイテレータを使用することになります。

Batch Aggregator の特性

  • Batch Aggregator スコープは Batch Step にのみ適用され、Batch Job のバッチ処理フェーズ内でのみ使用可能。Batch Job の On Complete (完了時) フェーズでは Batch Aggregator を使用できません。

  • アグリゲータは、その Batch Step 内で最後の要素のみをラップ可能。

  • いくつかの Anypoint コネクタ​は、バッチ集約全体 (更新/挿入) を失敗させることなくレコードレベルのエラーを処理可能。
    実行時に、これらのコネクタは対象リソースによって正常に受け入れらたレコード、および更新/挿入できなかったレコードを追跡します。したがって、レコードのグループ全体を失敗させる代わりに、できるだけ多くのレコードを更新/挿入し、通知用に失敗を追跡します。これらのコネクタの一部を次に示します。

    • Salesforce

    • NetSuite

    • データベース

      使用しているコネクタがレコードレベルのエラーをサポートすることを確認するには、コネクタのドキュメントを参照してください。

  • Batch Aggregator スコープは、ジョブインスタンス全体のトランザクションをサポートしない。各レコードを個別のトランザクションで処理する Batch Step 内にトランザクションを定義できます。これはステップ内のステップと考えてください。
    このようなトランザクションは、ステップの境界内で開始および終了する必要があります。

  • Batch Step とそのステップ内に存在する Batch Aggregator の間でトランザクションは共有できない。Batch Step が開始するトランザクションは、Batch Aggregator が処理を開始する前に終了します。つまり、トランザクションは Batch Step とそれに含まれる Batch Aggregator スコープの間の境界を越えることはできません。

Was this article helpful?

💙 Thanks for your feedback!