Batch Job 中のエラー処理

Mule 4.1 の標準サポートは 2020 年 11 月 2 日に終了しました。このバージョンの Mule は、拡張サポートが終了する 2022 年 11 月 2 日にその すべてのサポート​が終了しました。

このバージョンの Mule を使用する CloudHub には新しいアプリケーションをデプロイできなくなります。許可されるのはアプリケーションへのインプレース更新のみになります。

標準サポートが適用されている最新バージョンの Mule 4 にアップグレード​することをお勧めします。これにより、最新の修正とセキュリティ機能強化を備えたアプリケーションが実行されます。

概要

このスコープで取得するように設計されている大量のデータを考慮すると、Batch Job 内のエラー処理は複雑になる可能性があります。
100 万件のレコードを処理する場合、すべてをログに記録することはほぼ不可能です。ログが巨大になって判読できなくなり、処理のパフォーマンスに多大な悪影響を及ぼします。
一方、ログが少なすぎると、発生した問題を知ることができなくなります。また、その 100 万件のレコードのうち 3 万件のレコードが失敗し、その原因がわからない場合、デバッグ作業が非常に困難になる可能性があります。このトレードオフを克服することは容易ではありません。

Batch Step 内の失敗したレコードのログ

Batch Step インスタンスを処理するときに、Batch Step 内のプロセッサが失敗したりエラーを発生させたりすることがあります。おそらくレコードデータが破損しているか不完全なことが原因でこれが発生すると、Mule では次のロジックに従ってスタック追跡のログが記録されます。

  1. Mule が例外の完全なスタック追跡を取得します。

  2. Mule はメッセージからそのスタック追跡を削除します。
    すべてのレコードで同じエラーが発生したとしても、処理中のメッセージにはおそらくそれらのレコードに関連する特定の情報が含まれています。たとえば、リードを Salesforce アカウントに転送していて、特定のリードはすでにアップロード済みであるため 1 つのレコードがエラーになった場合、別の繰り返されるリードのレコード情報は異なりますが、同じエラーになります。

  3. Mule はスタック追跡がすでに現在のステップに記録されているかどうかを検証します。
    ランタイムでこのエラーが発生したのは初めての場合、エラーがログに記録され、次のようなメッセージが表示されます。

    com.mulesoft.mule.runtime.module.batch.internal.DefaultBatchStep: Found exception processing record on step 'batchStep1' for job instance 'Batch Job Example' of job 'CreateLeadsBatch'.
    
    This is the first record to show this exception on this step for this job instance. Subsequent records with the same failures will not be logged for performance and log readability reasons:

    Mule は「ステップごと」にログを記録します。別のステップでも同じエラーが発生した場合、ランタイムはそのステップに対してもログを記録します。

  4. Batch Job が On Complete (完了時) フェーズに達すると、Mule にはエラー概要とすべてのエラーの種別、および各 Batch Step でそれが発生した回数が表示されます。
    batch.exception​ 種別が発生した 2 つの Batch Step が含まれる Batch Job のエラー概要を次に示します。

    ************************************************************************************************************************
    *             - - + Exception Type + - -             *         - - + Step + - -        *       - - + Count + - -       *
    ************************************************************************************************************************
    * com.mulesoft.mule.runtime.module.batch.exception.B *            batchStep1           *                10             *
    * com.mulesoft.mule.runtime.module.batch.exception.B *            batchStep2           *                9              *
    ************************************************************************************************************************

    ここでは、最初のステップが 10 回失敗し、2 番目のステップが 9 回失敗したことがわかります。

Mule のデフォルト設定ではこの動作を使用してバッチエラーをログに記録します。これは INFO レベルのメッセージのみをログに記録し、大量なデータをログに記録する場合の効率性のトレードオフとのバランスを取ります。
ただし、バッチ処理を使用する動機は、大規模なデータセットを処理できることではなく、クラッシュから回復し、「ほぼリアルタイム」のデータインテグレーションを実行したときに、中断された場所から Batch Job を処理し続けることかもしれません。または、デバッグ用にさらに詳細なログレベルが必要かもしれません。
そのような場合は、次のようにアプリケーションの log4j 設定でバッチカテゴリを DEBUG レベルに設定します。

log4j.logger.com.mulesoft.module.batch=DEBUG
DEBUG レベルは慎重に使用してください。ジョブが膨大なデータセットを処理しないことが明確でない限り、本番環境で使用することはお勧めしません。

エラー処理用の DataWeave 関数

Mule 4.x には、Batch Step のコンテキストで使用できる DataWeave 関数のセットが含まれています。

DataWeave 関数 説明

#[Batch::isSuccessfulRecord()]

現在のレコードが前のステップで例外をスローしなかった場合に true を返すブール関数。

#[Batch::isFailedRecord()]

現在のレコードが前のステップで例外をスローした場合に true を返すブール関数。

#[Batch::failureExceptionForStep(String)]

ステップの名前を文字列引数として受け取ります。現在のレコードがそのステップで例外をスローした場合、実際の例外オブジェクトを返します。それ以外の場合は null を返します。

#[Batch::getStepExceptions()]

Java ​Map<String, Exception>`​ を返します。キーは現在のレコードが失敗した Batch Step の名前で、値は例外自体です。
レコードがどのステップでも失敗しなかった場合、この Map は空になりますが null になることはありません。また、Map にはレコードが失敗しなかったステップのエントリは含まれません。

#[Batch::getFirstException()]

現在のレコードが失敗した最初のステップの例外を返します。レコードがどのステップでも失敗しなかった場合、null を返します。

#[Batch::getLastException()]

現在のレコードが失敗した最後のステップの例外を返します。レコードがどのステップでも失敗しなかった場合、null を返します。

連絡先情報を含むファイルをポーリングする Batch Job があるとします。
最初のステップでは、Batch Job は連絡先情報を集約し、Transform Message コンポーネントを使用してそれらを変換してから Salesforce に転送します。
2 番目のステップでは、ジョブはその同じ連絡先を別のサードパーティ連絡先アプリケーション (Google コンタクトなど) のデータ構造と一致するように再度変換し、HTTP 要求を使用してこのサービスに転送します。
次に、3 番目のステップとして、失敗したレコードごとに JMS デッドレターキューに書き込むことができる必要があるとします。簡略化するため、例外自体をメッセージにするとします。この要件には注意が必要です。各レコードが両方のステップで失敗し、同じレコードが 2 つの JMS メッセージに変換される可能性があります。

このアプリケーションは次のようになります。

batch error handling faq f34d5

目標は失敗を収集することであるため、​ONLY_FAILURES​ 検索条件を使用して ​Failures​ ステップを設定することが理にかなっています。(バッチ検索条件についての詳細は、​「Batch Step 処理の絞り込み」​を参照してください)。
このステップの set-payload プロセッサは、​#Batch::getStepExceptions()​ 関数を使用するように設定できます。

batch error handling faq 6269e

上述のように、この関数はすべてのステップで見つかったすべてのエラーを含むマップを返します。ここでの目標は JMS を介して例外を送信することであり、ステップを気にする必要はないため、foreach スコープを使用してマップの値 (エラー) を反復処理し、JMS アウトバウンドエンドポイントを介して送信できます。

batch error handling faq b822e

エラー処理用のバッチ処理戦略

Mule には、レコードレベルのエラーを処理するための 3 つのオプションがあります。

  1. 処理を終了​。現在のジョブインスタンスの実行を停止します。現在インフライト状態のレコードの実行を終了しますが、キューからそれ以上レコードを取得せずにジョブインスタンスを ​FAILURE​ 状態に設定します。On Complete (完了時) フェーズが呼び出されます。

  2. 失敗したレコード数に関係なく​処理を続行​。​acceptExpression​ および ​acceptPolicy​ 属性を使用して、失敗したレコードの処理方法を後続の Batch Step に指示します。

  3. Batch Job が​失敗したレコードの最大数​に達するまで、失敗したレコード数に関係なく​処理を続行​ (​acceptExpression​ および ​acceptPolicy​ 属性を使用して、失敗したレコードの処理方法を後続の Batch Job に指示します)。最大数に達した時点で、オプション 1 と同様に実行を停止します。

デフォルトでは、Mule の Batch Job はバッチインスタンスの実行を停止する最初のエラー処理戦略に従います。上記の動作は、​maxFailedRecords​ 属性を使用して制御されます。

失敗したレコードの処理オプション Batch Job

属性

失敗したレコードが見つかったときに処理を停止。

maxFailedRecords

0

失敗したレコード数に関係なく、無期限に処理を続行。

maxFailedRecords

-1

失敗したレコードの最大数に達するまで処理を続行。

maxFailedRecords

integer

<batch:job jobName="Batch1" maxFailedRecords="0">

最大失敗しきい値の超過

Batch Job で ​maxFailedRecords​ しきい値を超えるのに十分な失敗したレコード数が蓄積されると、Mule は残りの Batch Job の処理を中止し、On Complete (完了時) フェーズに直接移動します。

たとえば、​maxFailedRecords​ の値を「10」に設定し、Batch Job が 3 つの Batch Step 最初のステップで失敗したレコードを 10 件蓄積した場合、Mule は残り 2 つの Batch Step ではバッチを処理しません。代わりに、それ以降の処理を中止し、On Complete (完了時) に直接移動して Batch Job の失敗を報告します。

Batch Job が ​maxFailedRecords​ しきい値を超えるのに十分な失敗したレコード数を蓄積​しない​場合、​すべての​レコード (成功と失敗) が引き続き Batch Step 間を移動します。各 Batch Step で処理するレコードを制御するには、検索条件を使用します。