バッチ処理

バッチ処理は Mule Enterprise ランタイム専用です。

Mule では、メッセージをバッチで処理できます。

アプリケーション内で Batch Job スコープを開始できます。これは、メッセージを個々のレコードに分割し、各レコードに対してアクションを実行してから、結果を報告し、場合によっては処理された出力を他のシステムまたはキューに転送するコードブロックです。

たとえば、次の場合にバッチ処理を使用できます。

  • NetSuite と Salesforce 間の連絡先の同期など、ビジネスアプリケーション間でデータセットを同期する。

  • フラットファイル (CSV) から Hadoop へのデータのアップロードなど、対象システムへの情報の抽出、変換、および読み込み (ETL) を行う。

  • API から従来のシステムへの大量の受信データを処理する。

Mule 3.x のバッチ処理にすでに精通している場合は、記事「バッチモジュールの移行」​で Mule 4.x のバッチとの違いの概要を確認してください。

概要

Mule アプリケーション内では、バッチ処理は、個々のレコードに分割される、メモリ量を上回るデータセットを非同期で処理するための構造を提供します。バッチジョブを使用すると、ソースデータを自動的に分割して永続的なキューに保存する信頼できるプロセスを記述できるため、大量のデータセットを処理しながら信頼性を提供できます。アプリケーションを再デプロイする場合、または Mule がクラッシュした場合、ジョブが停止した位置でジョブの実行を再開できます。

batch main3

これで、個々のレコードの処理の観点でジョブが表わされるため、レコードレベルの変数、集約、およびエラー処理のセマンティクスが提供されます。

基本構造

Mule のバッチ処理の中心は Batch Job です。Batch Job は、大きなメッセージを Mule が非同期で処理するレコードに分割するスコープです。フローがメッセージを処理する方法と同様に、Batch Job はレコードを処理します。

batch processing concept d1bdd

バッチ XML 構造は Mule 4.0 で変更されました。次の例は、バッチ要素を強調表示するため省略された詳細を示しています。

<flow name="flowOne">
	<batch:job jobName="batchJob">
		<batch:process-records>

			<batch:step name="batchStep1">
				<event processor/>
				<event processor/>
			</batch:step>

			<batch:step name="batchStep2">
				<event processor/>
				<event processor/>
			</batch:step>
		</batch:process-records>
	</batch:job>
</flow>

Batch Job には、レコードが Batch Job 内を移動するときにレコードに対して実行する 1 つ以上の Batch Step が含まれます。

Batch Job の各 Batch Step には、レコードに対してその中に含まれるデータを変換、転送、強化、または別の方法で処理するプロセッサが含まれます。既存の Mule プロセッサの機能を利用することで、Batch Job がレコードを処理する方法を Batch Step で柔軟に制御できます。詳細は、「Batch Step 処理の絞り込み」​を参照してください。

フローが Batch Job の process-records セクションに到達すると、その Batch Job が実行されます。トリガされると、Mule は新しい Batch Job インスタンスを作成します。

Batch Job の実行が開始されると、Mule は受信メッセージをレコードに分割して永続的なキューに保存し、処理するレコードのブロックでそれらのレコードのクエリとスケジュールを実行します。デフォルトでは、ランタイムは各 Batch Step に 100 件のレコードを保存します。必要なパフォーマンスに応じて、このサイズをカスタマイズできます。詳細は、「Batch Step 処理の絞り込み」​を参照してください。

すべてのレコードがすべての Batch Step を通過したら、ランタイムは Batch Job インスタンスを終了し、処理中に成功したレコードと失敗したレコードを示す Batch Job の結果を報告します。

エラー処理

Batch Job は、Batch Job 全体の失敗を回避するため、処理中に発生する可能性があるレコードレベルの失敗を処理できます。また、バッチ処理中に Mule がレコード変数に従って Batch Job 内のレコードの転送または他のアクションを実行できるように、個々のレコードの変数を設定または削除できます。詳細は、「Batch Job 中のエラー処理」​を参照してください。

Batch Job と Batch Job インスタンスの比較

上記のコンテキストで定義されていますが、Batch Job​ と Batch Job インスタンス​という用語は相互に関連するため、詳しく説明します。

  • Batch Job は、Mule がメッセージペイロードをレコードのバッチとして処理する、アプリケーションのスコープ要素です。Batch Job という用語は、Load and Dispatch (読み込みおよびディスパッチ)、Process (プロセス)、On Complete (完了時) の 3 つの処理フェーズすべてを含みます。

  • Batch Job インスタンスは、Mule フローが Batch Job を実行するたびに Mule アプリケーションで発生します。Mule は Load and Dispatch (読み込みおよびディスパッチ) フェーズで Batch Job インスタンスを作成します。すべての Batch Job インスタンスは、Batch Job インスタンス ID​ と呼ばれる一意の文字列を使用して内部的に識別されます。

この識別子は、たとえばデータを参照および管理するためにローカルジョブインスタンス ID を外部システムに渡したり、ジョブのカスタムログを改善したり、特定の Batch Job インスタンスに関する重要なイベントのメールまたは SMS 通知を送信したりする場合に役立ちます。この識別子とそのカスタマイズ方法についての詳細は、「Batch Job インスタンス ID」​を参照してください。

Batch Job 処理フェーズ

各 Batch Job には 3 つの異なるフェーズがあります。

  1. Load and Dispatch (読み込みおよびディスパッチ)。

  2. Process (プロセス)。

  3. On Complete (完了時)。

Load and Dispatch (読み込みおよびディスパッチ)

この最初のフェーズは暗黙的です。このフェーズでは、ランタイムはすべてのバックグラウンド処理を実行して Batch Job インスタンスを作成します。基本的に、これは Mule がバッチとして処理するためにシリアル化されたメッセージペイロードをレコードのコレクションに変換するフェーズです。このアクティビティが発生するために何も設定する必要はありませんが、Mule がこのフェーズ中に完了するタスクを理解しておくと役立ちます。

  1. Mule は DataWeave を使用してメッセージを分割します。この最初の手順では、新しい Batch Job インスタンス​ が作成されます。
    Mule は batchJobInstanceId​ 変数を使用して Batch Job インスタンス ID を公開します。この変数はすべてのステップと on-complete フェーズで使用可能です。

  2. Mule は永続的なキューを作成して新しい Batch Job インスタンスに関連付けます。

  3. スプリッタによって生成された各アイテムに対して、Mule はレコードを作成してキューに保存します。このアクティビティは「オールオアナッシング」です。つまり、Mule がすべての項目のレコードを正常に生成してキューに追加するか、このフェーズ中にメッセージ全体が失敗します。

  4. Mule は Batch Job のインスタンスとキュー内のすべてのレコードを処理用の最初の Batch Step に渡します。

このフェーズが完了したら、フローが続行します。

次のフェーズ「Process (プロセス)」は非同期です。つまり、フローは Batch Job がすべてのレコードの処理を完了するまで待ちません。

Process (プロセス)

「Process (プロセス)」フェーズ中、ランタイムはバッチ内のレコードの処理を非同期で処理します。各レコードは最初の Batch Step のプロセッサを通過し、2 番目の Batch Step で処理されるのを待つ間に元のキューに戻されます。すべてのレコードがすべての Batch Step を通過するまでこの処理が繰り返されます。1 つのキューのみが存在し、各 Batch Step でそこからレコードが選択され、処理されてからキューに戻されます。各レコードは、このキュー内にある間にどの段階の処理が完了したかを追跡します。Batch Job インスタンスは、キューに追加されたすべてのレコードが 1 つの Batch Step での処理を終了するのを待たずに、レコードを次の Batch Step に転送します。キューは永続的です。

Mule は、すべてのレコードが各 Batch Step での処理に成功したか失敗したときに、そのリストを保持します。レコードが Batch Step でイベントプロセッサによる処理に失敗した場合、ランタイムはバッチの処理を続行し、後続の各 Batch Step では失敗したレコードをスキップします。

このフェーズの終了時に Batch Job インスタンスは完了し、消滅します。

バッチ + 図

レコードの単純な処理以外に、Batch Step 内でレコードを使用してできることがいくつかあります。

  • 各 Batch Step 内に acceptExpressions を追加して検索条件を追加し、ステップが特定のレコードを処理しないようにする。 たとえば、先行するステップで処理に失敗したレコードがステップで処理されないようにする検索条件を設定できます。

  • Batch Aggregator プロセッサを使用して、レコードをグループに集約し、それらを一括更新/挿入で外部ソースまたはサービスに送信可能。 たとえば、各連絡先 (レコード) を Google コンタクトにバッチで更新/挿入する代わりに、100 件のレコードを蓄積するようにバッチアグリゲータを設定して、そのすべてを 1 つのチャンクで Google コンタクトに更新/挿入できます。

詳細は、「Batch Step 処理の絞り込み」​を参照してください。

On Complete (完了時)

このフェーズ中、必要に応じて、特定の Batch Job インスタンスで処理したレコードのレポートまたは概要を作成するようにランタイムを設定できます。このフェーズは、入力データに存在する可能性がある問題に対処できなかったレコードに関するインサイトをシステム管理者と開発者に提供するために存在します。

<batch:job name="Batch3">
        <batch:input>
            <poll doc:name="Poll">
                <sfdc:authorize/>
            </poll>
            <set-variable/>
        </batch:input>
        <batch:process-records>
            <batch:step name="Step1">
                <batch:record-variable-transformer/>
                <data-mapper:transform/>
            </batch:step>
            <batch:step name="Step2">
                <logger/>
                <http:request/>
            </batch:step>
        </batch:process-records>
        <batch:on-complete>
            <logger/>
        </batch:on-complete>
</batch:job>

Mule が Batch Job 全体を実行したら、出力は Batch Job 結果オブジェクト (BatchJobResult​) になります。Mule は Batch Job を非同期の一方向フローとして処理するため、バッチ処理の結果は、そのトリガ元のフローに戻されることも、呼び出し元への応答として返されることもありません。データを Batch Job に送り込むイベントソースは、要求 - 応答ではなく一方向である必要があります。

出力を操作する方法は 2 つあります。

  • On Complete (完了時) フェーズで、失敗したレコード数や正常に処理されたレコード数などの情報を使用する DataWeave を使用して、エラーが発生した可能性があるステップに関するレポートを作成する。

  • Mule アプリケーションの他の場所にある Batch Job 結果オブジェクトを参照し、特定の Batch Job インスタンスで処理に失敗したレコード数などのバッチメタデータをキャプチャおよび使用する。

On Complete (完了時) フェーズを空のままにし、アプリケーションの他の場所で Batch Job 結果オブジェクトを参照しない場合は、失敗したか成功したかに関係なく、Batch Job は単に完了します。

失敗または成功したレコードについて報告するための何らかのメカニズムを設定し、必要な場合は追加アクションの実行を促すことをお勧めします。

Was this article helpful?

💙 Thanks for your feedback!