バッチコンポーネントリファレンス

Mule バッチコンポーネントでは、デフォルトのコンポーネント設定に対する調整を最小限に抑えてレコードのバッチ処理を管理します。コンポーネントは、処理およびパフォーマンスに関する特定の要件に合わせて高度に設定可能です。

Batch Job コンポーネントには 1 つ以上の Batch Step コンポーネントが含まれている必要があります。必要に応じて、各 Batch Step コンポーネントには 1 つの Batch Aggregator コンポーネントを含めることができます。次の図の Mule フローはバッチコンポーネントを示しています。

Studio 内の Mule フローのバッチ処理コンポーネント

Batch Job コンポーネントでは Mule メッセージのペイロードをレコードに分割します。Batch Step コンポーネントおよび Batch Aggregator コンポーネント内で、これらのレコードはキーワード ​payload​ で使用でき、Mule 変数は ​vars​ キーワードを使用してアクセスすることもできます。ただし、Mule 属性はこれらのコンポーネント内からアクセスできません。どちらのコンポーネントも ​attributes​ で ​null​ を返します。

このリファレンスでは、読者が Batch Job インスタンスの​バッチ処理フェーズ​に精通していることを前提としています。

Batch Job コンポーネント (<batch:job />)

このコンポーネントでは、処理用の入力を準備し、その処理の結果が記載されたレポートを出力します。準備は Batch Job インスタンスの ​Load and Dispatch​ フェーズ中に行われ、レポートは ​On Complete​ フェーズで使用できます。

Studio では、これらのプロパティをコンポーネントの ​[General (一般)]​ タブと ​[History (履歴)]​ タブで設定できます。

Batch Job のプロパティ

Batch Job コンポーネントには、バッチ処理の実行方法を決定する設定可能な多数のプロパティが用意されています。また、入力ペイロードをフローの後続のコンポーネントで使用できるように対象変数に格納することもできます。Batch Job コンポーネントはメッセージペイロードをコンシュームするため、これは便利な可能性があります。

Anypoint Studio の ​[General (一般)]​ タブには次の項目があります。

Studio の Batch Job の項目

次の表に、各項目と項目の XML 属性を示します。

Field Name (項目名) XML 説明

Name (名前)

name

Batch Job コンポーネントの設定可能な名前。デフォルトの名前はアプリケーションの名前に ​Batch_Job​ を追加したものです。

Max Failed Records (最大失敗レコード数)

maxFailedRecords

Batch Job インスタンスの実行が停止する前にその Batch Job インスタンス内で失敗できるレコードの最大数。デフォルトは ​0​ です。無制限にする場合は、​-1​ を使用します。この設定は、Batch Job コンポーネント内の ​Batch Step コンポーネント​のどの受け入れポリシー (​acceptPolicy​) または式 (​acceptExpression​) 検索条件よりも優先されます。Batch Job インスタンスが ​maxFailedRecords​ 値を超えると、Batch Job コンポーネントでエラーが発生した Batch Job インスタンスの処理が停止され、インスタンスに状況 ​FAILED_PROCESS_RECORDS​ が割り当てられ、インスタンスで失敗したレコードの数が報告されます。Batch Job インスタンスがこのしきい値に達すると、コンソールでは次のようなメッセージが出力されます (読みやすいように編集済み)。

INFO  ... DefaultBatchEngine:
 instance '6d85c380-f332-11ec-bb5f-147ddaaf4f97' of job 'Batch_Job_in_Mule_Flow'
 has reached the max allowed number of failed records. Record will be added to
 failed list and the instance will be removed from execution pool.
...
INFO  ... DefaultBatchEngine:
 instance 6d85c380-f332-11ec-bb5f-147ddaaf4f97 of job Batch_Job_in_Mule_Flow
 has been stopped. Instance status is FAILED_PROCESS_RECORDS

Batch Job インスタンス内の並列処理により、複数のレコードで同時に失敗できるようになるため、エラー数が設定した最大値を超えることができます。

Scheduling Strategy (スケジュール戦略)

schedulingStrategy

Batch Job コンポーネントが繰り返しトリガーする場合、複数の Batch Job インスタンスを同時に実行するように準備できます。インスタンスは作成タイムスタンプに基づいて (デフォルト)、または使用可能なリソースの使用を最大化しようとするラウンドロビンアルゴリズムに従って順次実行できます。この設定は、処理順序が保証されないため、他のレコードに対する副作用や連動関係がないジョブのみに適しています。

  • ORDERED_SQUENTIAL​ (デフォルト): この設定では、Batch Job インスタンスは作成タイムスタンプで確立された順序で一度に 1 つずつ実行されます。先に作成されたインスタンスが後の日時に作成されたインスタンスの前に実行されます。

  • ROUND_ROBIN​: この設定では、すべての Batch Job インスタンスが使用可能なリソースを割り当てるラウンドロビンアルゴリズムに基づいて実行されます。実行の順序は事前に決定されません。バッチインスタンスはどの順序でも実行できます。このオプションが適しているのは、インスタンスの実行で別のインスタンスの実行に対する副作用が発生しないことが確かな場合のみです。ベストプラクティスは次のとおりです。

    • 同時に実行される Batch Job インスタンスで同じレコードが更新されるのを避けるために、この戦略をデータ同期ジョブに使用しないでください。データを同期するときにこの戦略を使用すると、事前に誤ったバージョンのデータで結果を返すことが可能になります。

    • Batch Job でデータベースから新しいレコードのみを取得する場合など、Batch Job コンポーネント内の別のレコードの処理に依存するレコードがないことが確かな場合に、この戦略を使用して Batch Job インスタンスを並列実行するようにしてください。

Job Instance ID (ジョブインスタンス ID)

jobInstanceId

バッチインスタンスの認識可能な一意の名前を提供する省略可能な式。名前を一意にするには、この項目内でたとえば ​#[jobInstanceId="#['Job From ' ++ now() as String {format: 'dd/MM/yy hh:mm:ss'}]​ などの DataWeave 式を使用する必要があります。式で同じ値が 2 回以上返される場合、Mule では ​Batch Job 'my-batch-job-example' already has an instance with id 'my-repeated-id'​ のようなメッセージでエラーがスローされます。Batch Job コンポーネントではこの値を使用して Mule フローが Batch Job コンポーネントに達するたびに新しい識別子を作成します。ID を指定しないと、Mule によって各 Batch Job インスタンスの識別子となる UUID が自動的に作成されます。

Batch Block Size (バッチブロックサイズ)

blockSize

レコードブロックあたりの処理するレコードの件数。デフォルトはブロックあたり ​100​ 件のレコードです。処理できるレコードが設定したサイズより少ない場合、Mule によってレコードプロセッサーに送信されるブロックのサイズが小さくなります。このような状況は、たとえば、1010 件のレコードがあり、ブロックサイズが 100 の場合に発生する可能性があります。これは、剰余が 0 ではなく 10 であるためです。ブロックサイズの設定により、各ブロックのレコードのサイズと ​maxConcurrency​ 設定の影響を受けるパフォーマンスを調整できます。

Max Concurrency (最大同時実行)

maxConcurrency

コンポーネント内でレコードブロックを処理するときに許可する並列性の最大レベルを設定するプロパティ。デフォルトは CPU で使用可能なコア数の 2 倍です。Mule インスタンスを実行しているシステムの容量によっても、同時実行が制限されます。​blockSize​ 設定と同様、この設定により、パフォーマンスを調整できます。

変換先

target

Batch Job コンポーネントの​​に配置されたコンポーネントの Batch Job インスタンスで受け取られたペイロードを使用するには、​target​ の一意の値を指定して、ダウンストリームコンポーネントの ​payload​ を使用してそのペイロードにアクセスします。対象値がない場合、Batch Job コンポーネントへの入力ペイロードをダウンストリームコンポーネントで使用できません。対象変数についての詳細は、対象変数を使用したデータの強化 を参照してください。デフォルト値はありません。

Batch Job 履歴 (<batch:history />)

Batch Job 履歴設定により、Batch Job インスタンスの履歴データが一時的な Mule ディレクトリで保持される期間を調整することにより、​No space left on device​ エラーを解決できます。このエラーは、空きディスクスペース、または指定したサイズの CloudHub ワーカーに対して処理するレコードの頻度が高すぎるか数が多すぎて処理できない場合に発生する可能性があります。

デフォルトでは、Batch Job インスタンスの履歴は 7 日間保持されます。この期間が経過すると、監視プロセスによりその履歴が自動的に削除されます。

Studio の Batch Job 履歴の項目

次の表に、各項目と項目の XML 属性を示します。

Name (名前) XML 説明

Max Age (最長有効期間)

maxAge

ジョブインスタンスが期限切れになる前の最長有効期間。例: expiration maxAge="10"​。デフォルト: maxAge=7​。

Time Unit (時間単位)

ageUnit

maxAge​ に適用する時間の単位。有効な値: DAYS​、​HOURS​、​MILLISECONDS​、​MINUTES​、​SECONDS​。デフォルト: ageUnit=DAYS​。

これらの項目は、​<batch:history/>​ 内に埋め込まれる ​<batch:expiration/>​ 要素に対する属性です。

<batch:job ...>
  <batch:history >
    <batch:expiration maxAge="10" ageUnit="MINUTES" />
  </batch:history>
  <batch:process-records>
    <batch:step />
  </batch:process-records>
<batch:job />

XML の例は、バッチ履歴項目の設定に焦点を絞るために編集されています。

Batch Step コンポーネント (<batch:step/>)

Batch Step コンポーネントは、Load and Dispatch フェーズや On Complete フェーズ中ではなく、Batch Job インスタンスの ​Process フェーズ​中に実行されます。

Batch Job コンポーネント内には 1 つ以上の Batch Step コンポーネントが必要です。Batch Step コンポーネントの名前を変更するだけでなく、コンポーネントがコンポーネント内の処理に受け入れるレコードを決定する​検索条件​を設定できます。コンポーネントに追加したプロセッサーは、Batch Job コンポーネントが受け入れるレコードに対するアクションを実行します。

Studio の Batch Step の項目

次の表に、各項目と項目の XML 属性を示します。

Field Name (項目名) XML 説明

Name (名前)

name

Batch Step コンポーネントの設定可能な名前。Batch Job コンポーネント内の最初の Batch Step コンポーネントでは、デフォルトは ​Batch_Step​ です。後続のステップでは、​Batch_Step1​、​Batch_Step2​、といった順序に従い、一意の番号が名前に付加されます。

Accept Expression (受け入れ式)

acceptExpression

コンポーネントでレコードを処理するかどうかを決定する検索条件の省略可能な DataWeave 式。レコードの ​acceptExpression​ ​acceptPolicy​ の​両方​の値が ​true​ に評価される場合、コンポーネントでは処理するためにレコードを受け入れます。それ以外の場合、コンポーネントではレコードをスキップし、レコードはダウンストリームの Batch Step コンポーネントで使用可能になります。例:

  • age​ 項目があるペイロードでは ​acceptExpression="#[payload.age > 21]"

  • name​ 項目があるペイロードでは ​acceptExpression="#[not isBlank(payload.name)]"​。

Accept Policy (受け入れポリシー)

acceptPolicy

ポリシーがそのレコードで ​true​ に評価される場合のみ処理するためにレコードを受け入れます。受け入れの基準を、Batch Job コンポーネント内の​先行する​ (アップストリームの) Batch Step コンポーネントでレコードの処理に成功したか失敗したかにできます。コンポーネントは、正常に処理されたレコードのみを処理する、正常に処理できなかったレコードのみを処理する、または正常に処理されたかどうかに関係なくすべてのレコードを処理するように設定できます。デフォルトは ​NO_FAILURES​ です。​acceptPolicy​ はコンポーネントのどの ​acceptExpression​ よりも前に評価され、​Batch Job コンポーネント​の ​maxFailedRecords​ 設定は ​acceptPolicy​ 設定よりも優先される点に注意してください。

  • NO_FAILURES​: デフォルト設定。レコードが先行する Batch Step コンポーネント内で処理に失敗したことがない場合、またはアップストリームの Batch Step コンポーネントがない場合にレコードを処理するために受け入れます。

  • ONLY_FAILURES​: レコードが先行する Batch Step コンポーネント内で処理に失敗した場合にレコードを処理するために受け入れます。

  • ALL​: 先行する Batch Step コンポーネント内での処理に成功したか失敗したかに関係なくレコードを処理するために受け入れます。

Batch Aggregator コンポーネント (<batch:aggregator />)

Batch Aggregator コンポーネントはレコードの配列に対してアクションを実行する省略可能なコンポーネントです。集計は Batch Job インスタンスの ​Process フェーズ​中に行われます。

Batch Aggregator コンポーネントにはデフォルトの設定はありません。このコンポーネントを使用する場合は、レコードをストリーミングするか (​streaming​)、キューから固定サイズ (​size​) の別の配列にレコードを取り込むか、どちらも行わないかを指定する必要があります。これらの設定は相互に排他的です。ストリーミングされたレコードへのランダムアクセスはできません。

Batch Aggregator コンポーネントではメッセージ ​payload​ として 1 つ以上のプロセッサーを受け入れます。ただし、​For Each​ スコープなど、アグリゲーター内のスコープを使用して各レコードに対して反復し、スコープ内の子プロセッサーが個別に各レコードに対するアクションを実行できるようにするのが一般的です。

Batch Aggregator コンポーネントにより、固定サイズ (​size​) として集計しているレコードの一部にアクセスする場合に必要な MIME タイプのレコード (​preserveMimeTypes​) を保持することもできます。​「集約されたレコードの MIME タイプの保持」​を参照してください。このコンポーネント内のレコード処理についての詳細は、​「Process フェーズ」​のバッチ集計に関する情報を参照してください。

Salesforce 用 Anypoint Connector (Salesforce Connector)​ や ​NetSuite 用 Anypoint Connector (NetSuite Connector)​ などの一部のコネクタには、バッチ集計プロセスを失敗させることなくレコードレベルのエラーを処理できる操作が用意されています。

Batch Aggregator を使用するときは、次の制限事項を考慮してください。

  • Batch Aggregator コンポーネントを使用するときには、​ImmutableMap​ などの Guava データ型を処理しようとしないでください。代わりに、Java ​Map​ を使用してシリアル化の問題を回避します。

  • Batch Aggregator コンポーネントでは、ジョブインスタンス全体の​トランザクション​がサポートされません。Batch Step コンポーネント内で、各レコードを個別のトランザクションで処理するトランザクションを定義できます。ただし、Batch Step コンポーネントによって開始されたトランザクションは、Batch Aggregator コンポーネントが処理を開始する前に終了します。トランザクションはコンポーネント間の境界を越えることができないため、コンポーネントはトランザクションを共有できます。

Studio の Batch Aggregator の項目

次の表に、各項目と項目の XML 要素を示します。

Field Name (項目名) XML 説明

Display Name (表示名)

doc:name

コンポーネントの設定可能な名前。例: doc:name="My Aggregator"​。デフォルトの名前は ​Batch Aggregator​ です。

Aggregator Size (Aggregator サイズ)

size

処理するレコードの各配列のサイズ。デフォルトのサイズはありません。コンポーネント内のプロセッサーは入力として配列を受け入れる必要があります。コンポーネントはレコードを残りがなくなるまでバッチステッピングキューから設定したサイズの配列に取り込み続けます。キューに含まれるレコードが設定したサイズより少ない場合、コンポーネントがこれらのレコードを処理するための取り込み先の配列が小さくなります。Size (サイズ) (​size​) または Streaming (ストリーミング) (​streaming​) プロパティを設定する必要がありますが、両方を設定することはできません。

この設定を使用するときは、コンポーネントは Batch Step コンポーネントがインスタンスまたはバッチのすべてのレコードの処理を完了するのを待機しません。ステッピングキューで設定した数のレコードが使用可能になると、Batch Aggregator コンポーネントはこれらのレコードを配列に取り込んで処理します。

ストリーミング

streaming

streaming=true​ の場合、コンポーネントは Batch Job インスタンスのすべてのレコードに対するアクションを実行し、インスタンスのレコード数と同じサイズの配列で結果を返します。Batch Job コンポーネントを使用してバッチサイズを設定しても、処理するレコード数または配列で返すレコード数は制限されません。たとえば、Aggregator が ​{"id": 1}​、​{"id": 2}​、といった形式の ​10​ 個のレコードのみを処理する小さな Batch Job インスタンスの数値の ID を返すとします。この場合、Batch Job コンポーネントの ​blockSize​ が ​5​ のようにインスタンスのレコード数より小さい数に設定されていたとしても、インスタンスの出力配列には ​[1, 6, 2, 7, 3, 8, 4, 9, 5, 10]​ などの 10 個の ID が含まれます。

プロセッサーはストリーミングされたレコードの配列に順次アクセスできます。Aggregator では処理されたレコードが保持されないため、ストリーミングされたレコードへのランダムアクセスはできません。

デフォルトはありません。Size (サイズ) (​size​) または Streaming (ストリーミング) (​streaming​) プロパティを設定する必要がありますが、両方を設定することはできません。

Preserve Mime Types (MIME タイプを保持)

preserveMimeTypes

デフォルトは ​false​ です。このコンポーネント内で処理されるレコードの MIME タイプを保持するには、この属性を ​true​ に設定します。 データの MIME タイプを判断できないプロセッサーは、​null​ や、​[[B@e596d5c, [B@1c4c3b32], …​]​ などの判読できないペイロードへの参照を出力として返します。​preserveMimeTypes=true​ の設定は、コンポーネントが固定数 (​size​) のレコードを処理するように設定されている場合に便利です。

集計されたレコードをストリーミングするかどうかを決定するときは、次の点を考慮してください。

  • SaaS プロバイダーでは、ストリーミング入力の受入に制約がある場合が多くあります。

  • バッチストリーミングは、CSV、JSON、XML などのファイルに書き込む場合のバッチ処理に便利な場合が多くあります。

  • レコードの配列は RAM に格納され、トランザクションの処理速度が低下する可能性があるため、バッチストリーミングはアプリケーションのパフォーマンスに影響します。パフォーマンスは低下しますが、その分、ストリーミングデータが確実に使用できるようになる可能性があります。

  • ストリーミングのコミットは 1 つずつの読み取りで前方移動のみのイテレーターであるため、ストリーミングではレコードへのランダムアクセスはできません。ただし、​For Each​ などのコンポーネントを使用した場合はストリーミング時にランダムアクセス操作を実行できます。For Each により、レコードの個別の配列が提供され、その ​records​ 変数を使用してアクセスできます。