Refining Batch Steps Processing

You can refine the work that a batch step performs upon the records it processes.

  • You can set filters upon batch steps to only accept some records for processing.

  • You can aggregate records in groups, sending them as bulk upserts to external sources or services.

This document outlines how and when to use batch filters and the batch aggregator.

Batch Filters

You can apply one or more filters as attributes to any number of batch steps.
Imagine a batch job whose first batch step checks if a Salesforce contact exists for a record, and a second batch step that updates each existing Salesforce contact with new information. You can apply a filter to the second batch step to ensure it only processes records that didn’t fail during the first batch step.
By having batch steps accept only some records for processing, you streamline the batch job so the runtime can focus only on the relevant data for a particular batch step.

A batch step uses two attributes to filter records:

  • acceptExpression

  • acceptPolicy

Each batch step accepts at most one acceptExpression attribute and one acceptPolicy attribute.

Use the acceptExpression attribute to process only the records for which the expression evaluates to true. If the expression evaluates to false for a record, the batch step skips that record and passes it on to the next batch step. In other words, the records whose accept expression resolves to false are the ones that Mule filters out.

The example below filters out all records where the age is 21 or less; the batch step does not process those records.

<batch:job jobName="batchJob">
	<batch:process-records>
		<batch:step name="adultsOnlyStep" acceptExpression="#[payload.age > 21]">
			...
		</batch:step>
	</batch:process-records>
</batch:job>

Use the acceptPolicy attribute to choose which records a batch step processes, based on whether those records failed in preceding steps. The table below lists the available values for the accept policy.

Accept Policy            Batch step processes a record when
NO_FAILURES (default)    The record was processed successfully in all preceding steps.
ONLY_FAILURES            The record failed to process in a preceding batch step.
ALL                      Always, regardless of whether the record failed to process in a preceding batch step.

If you don’t apply filters to a batch step, the batch step processes only those records that were processed successfully in all preceding steps. In other words, the default Accept Policy applied to all batch steps is NO_FAILURES.

The example below illustrates the second batch step in a batch job that processes only those records that failed to process during the preceding step. In the first batch step, the runtime checked each record to see if it had an existing Salesforce contact; the second batch step, which creates a contact for each record, processes only the failed records (that is, records for which no existing contact was found).

<batch:job jobName="batchJob">
	<batch:process-records>
		<batch:step name="batchStep1">
			...
		</batch:step>
		<batch:step name="batchStep2" acceptPolicy="ONLY_FAILURES">
			...
		</batch:step>
	</batch:process-records>
</batch:job>

Each batch job has a maxFailedRecords attribute that controls how many failed records you are willing to accept for a batch job.
When a batch job instance exceeds its maxFailedRecords value, regardless of the filter set on the batch step, the step does not process any records and pushes the failed batch job instance to the On Complete phase.
See Handling Errors During Batch Job for more information.
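
As a rough sketch of how this threshold might be set, the fragment below tolerates up to 10 failed records before the job instance stops processing. The value 10, the step name, and the logger in the On Complete phase are illustrative assumptions, as is the failedRecords property read from the job result; verify both against the batch reference for your runtime version.

```xml
<!-- Hypothetical threshold: tolerate up to 10 failed records per job instance. -->
<batch:job jobName="batchJob" maxFailedRecords="10">
	<batch:process-records>
		<batch:step name="batchStep1">
			...
		</batch:step>
	</batch:process-records>
	<batch:on-complete>
		<!-- Assumes the On Complete payload exposes a failedRecords count. -->
		<logger level="INFO" message="#[payload.failedRecords]"/>
	</batch:on-complete>
</batch:job>
```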

Filter Characteristics

  • Batch filters only apply to batch steps which, in turn, are only usable within the batch process phase of a batch job. You cannot apply filters to the Input or On Complete phases.

  • If you apply no filters to a batch step, the batch step processes only those records that were processed successfully in all preceding steps. In other words, the default Accept Policy applied to all batch steps is NO_FAILURES.

  • When a batch job instance exceeds its maxFailedRecords value, regardless of the filter set on the batch step, the step does not process any records and pushes the failed batch job instance to the On Complete phase.

  • Where you apply both types of filters, Mule evaluates them in the following order:

    1. Accept Policy

    2. Accept Expression
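
To illustrate the evaluation order above, here is a minimal sketch of a step that sets both attributes. The step names and the payload.email field are hypothetical and serve only to show the shape of the configuration.

```xml
<batch:job jobName="batchJob">
	<batch:process-records>
		<batch:step name="lookupContactStep">
			...
		</batch:step>
		<!-- Accept policy is checked first: only records that failed earlier qualify.
		     The accept expression is checked second: of those, only records with an email. -->
		<batch:step name="createContactStep" acceptPolicy="ONLY_FAILURES" acceptExpression="#[payload.email != null]">
			...
		</batch:step>
	</batch:process-records>
</batch:job>
```

A record that succeeded in lookupContactStep never reaches the expression check in this sketch, because the accept policy already excludes it.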

Batch Aggregator

You can use the batch aggregator scope to accumulate a subset of records from a batch step and bulk upsert them to an external source or service.
For example, rather than upserting each lead (i.e., record) in a batch to Salesforce individually, you can configure a batch aggregator to accumulate, say, 200 records and then upsert all of them to Salesforce in one chunk.

<batch:step name="Step2">
	<batch:aggregator size="200">
		<salesforce:create type="Lead" .../>
	</batch:aggregator>
</batch:step>

You can also configure the batch aggregator scope to stream your records:

<batch:step name="Step2">
	<batch:aggregator streaming="true">
		<salesforce:create type="Lead" .../>
	</batch:aggregator>
</batch:step>

Processing a fixed number of records and streaming all records are mutually exclusive configurations. Learn more about each one in its section below.

The batch aggregator is mutable, meaning that you can access and modify the payloads and variables of the records grouped in your batch aggregator.
Keep in mind that, when aggregating a fixed number of records, you can access each record sequentially, or you can access any record directly by its index.
However, if you configured your batch aggregator to stream its content, you can only access those records sequentially.

Aggregating Records using a Fixed Size

You can configure a batch aggregator scope to process records in fixed-size groups.

For example, you can configure the batch aggregator scope to upsert 100 records at a time.

<batch:job jobName="batchJob">
	<batch:process-records >
		<batch:step name="batchStep">
			<batch:aggregator size="100">
				...
			</batch:aggregator>
		</batch:step>
	</batch:process-records>
</batch:job>

When using a fixed-size aggregator, you can replace, change, or store the payload and variable data of each record.

As stated above, the batch aggregator is mutable. By adding a foreach scope inside a fixed-size aggregator, you can go over each record in the block sequentially and persistently store changes to each record’s payload and variables. This method of accessing records within the batch aggregator is called sequential access.
You can, for example, use the Groovy scripting module to modify the payload and create a variable for each collected record.

<batch:job jobName="batchJob">
	<batch:process-records>
		<batch:step name="batchStep">
			<batch:aggregator doc:name="batchAggregator" size="10">
				<foreach doc:name="For Each">
					<script:execute engine="groovy">
						<script:code>
							vars['marco'] = 'polo'
							vars['record'].payload = 'foo'
						</script:code>
					</script:execute>
				</foreach>
			</batch:aggregator>
		</batch:step>
	</batch:process-records>
</batch:job>

The sequential access method assumes that:

  1. The aggregator size matches the number of aggregated records.

  2. There is a direct correlation between the aggregated records and the items in the list.

You can also access any record directly by its index in the records list, which saves you from iterating through all records.
The foreach scope exposes a records variable. This variable is an immutable list that foreach uses to keep track of the iteration, and it provides random access to the records across the batch aggregator.

You can make the same kind of change as in the example above by specifying an index into the records list instead of accessing each record sequentially. For example, you can create a variable and modify the payload of the first record as shown below.

<batch:job jobName="batchJob">
	<batch:process-records>
		<batch:step name="batchStep">
			<batch:aggregator doc:name="batchAggregator" size="10">
				<foreach doc:name="For Each">
					<script:execute engine="groovy">
						<script:code>
							records[0].vars['marco'] = 'polo'
							records[0].vars['record'].payload = 'foo'
						</script:code>
					</script:execute>
				</foreach>
			</batch:aggregator>
		</batch:step>
	</batch:process-records>
</batch:job>

Using random access, you can change a record’s payload at any given index position in the aggregator block.

Considerations for Defining a Block Size

In a traditional online processing model, each request is usually mapped to a worker thread. Regardless of the processing type (synchronous, asynchronous, one-way, request-response, or even requests that are temporarily buffered before being processed), servers usually end up with a 1:1 relationship between a request and a running thread.
In a batch job, however, all records are first stored in a persistent queue before the Process phase begins, so the traditional threading model doesn’t apply.

To improve performance, the runtime queues and schedules batch records in blocks of 100 records. This reduces the number of I/O requests and the load of each operation.
The default threading profile of the runtime is 16 threads per job. Therefore, in a batch job with the default configuration, each of the 16 threads processes a block of 100 records.
Each thread iterates through its block, processing each record; the block is then queued back, and the process continues.

Consider having 1 million records to place in a queue for a three-step batch job. Without blocks, at least three million I/O operations would occur as the runtime takes and requeues each record while it moves through the job’s steps.
Good performance requires enough available memory to process the 16 threads in parallel, which means moving 1,600 records (16 threads × 100 records) from persistent storage into RAM. The larger your records and the more of them there are, the more available memory you need for batch processing.

Although the standard model of 16 threads with 100 records per block works for most use cases, consider three use cases where you might need to increase or decrease the block size:

  • Assume you have 200 records to process through a batch job. With the default 100-record block size, the records fit in just two blocks, so Mule can only process two records in parallel at a time. If the job has 100 records or fewer, they fit in a single block and processing becomes sequential. If you need to process heavy payloads, then queueing a hundred records demands a large amount of working memory.

  • Consider a batch job that needs to process images with an average size of 3 MB. Each block then holds 100 records with payloads of 3 MB each, and 16 threads process blocks in parallel. Hence your default threading-profile setting would require around 4.7 GB of working memory (16 × 100 × 3 MB = 4,800 MB) just to hold the blocks being processed. You should set a lower block size to distribute the payloads through more blocks and lessen the load on your available memory.

  • Suppose you have 5 million records with payloads so small that you can fit blocks of 500 records in your memory without problems. Setting a larger block size improves your batch job time without sacrificing working memory load.

To take full advantage of this feature, you need to understand how the block sizes affect your batch job. Running comparative tests with different values and testing performance helps you find an optimum block size before moving this change into production.

Remember that modifying the batch block size is optional. If you apply no changes, the default value is 100 records per block.
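
A minimal sketch of overriding the block size follows. It assumes the batch job exposes a blockSize attribute; the attribute name is not stated above, so verify it against the batch job reference for your runtime version. The value 50 is only a placeholder, to be replaced with whatever your comparative tests suggest.

```xml
<!-- Assumption: blockSize is the batch job attribute that sets the records per block. -->
<batch:job jobName="batchJob" blockSize="50">
	<batch:process-records>
		<batch:step name="batchStep">
			...
		</batch:step>
	</batch:process-records>
</batch:job>
```

With 16 threads, a block size of 50 would keep at most 800 records in memory at once instead of 1,600.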

Streaming Records in a Batch Aggregator

You can configure a batch aggregator scope to stream its content.
Setting your batch aggregator to stream the records enables you to aggregate all the records in the job instance, no matter how many or how large they are.

Instead of a list of elements that you receive with a fixed-size batch aggregator, the streaming functionality ensures that you receive all the records in the job instance without running out of memory.

For example, if you need to write millions of records to a CSV file, you can process the records with a streaming batch aggregator.

<batch:job jobName="batchJob">
	<batch:process-records >
		<batch:step name="batchStep">
			<batch:aggregator streaming="true">
				<file:write path="reallyLarge.csv">
					<file:content><![CDATA[%dw 2.0
						...
					}]]></file:content>
				</file:write>
			</batch:aggregator>
		</batch:step>
	</batch:process-records>
</batch:job>

Remember that since this batch aggregator is streaming, you can only access its content sequentially:

<batch:job jobName="batchJob">
	<batch:process-records>
		<batch:step name="batchStep">
			<batch:aggregator doc:name="batchAggregator" streaming="true">
				<foreach doc:name="For Each">
					<script:execute engine="groovy">
						<script:code>
							vars['marco'] = 'polo'
							vars['record'].payload = 'foo'
						</script:code>
					</script:execute>
				</foreach>
			</batch:aggregator>
		</batch:step>
	</batch:process-records>
</batch:job>

Due to memory restrictions, random access is not supported for streaming aggregators.
The record payloads for random access are exposed as an immutable List. Because a streaming aggregator gives access to the entire set of records, without a fixed aggregator size, the runtime can’t guarantee that all records fit in memory.

Tips

  • Streaming to SaaS providers: In general, you likely wouldn’t use batch streaming when sending data through an Anypoint Connector to a SaaS provider like Salesforce, because SaaS providers often have restrictions on accepting streaming input. Use streaming batch processing when writing to a file such as CSV, JSON, or XML.

  • Batch streaming and performance: Batch processing of streaming data does affect the performance of your application, slowing the pace at which it processes transactions. Though performance slows, the ability to batch process streaming data may make the trade-off worthwhile in your implementation.

  • Batch streaming and access to items: The biggest drawback to using batch streaming is that you have limited access to the items in the output. With a fixed-size aggregator, you get an unmodifiable list, which lets you access and iteratively process its items; with a streaming aggregator, you get a one-read, forward-only iterator.

Batch Aggregator Characteristics

  • The batch aggregator scope can only exist in batch steps which, in turn, are only usable within the batch process phase of a batch job. You cannot use batch aggregators during the On Complete phase of a batch job.

  • An aggregator can only wrap the final element within the batch step in which it resides.

  • Several Anypoint Connectors can handle record-level errors without failing a whole batch aggregation (i.e., upsert).
    At runtime, these connectors keep track of which records were successfully accepted by the target resource, and which failed to upsert. Thus, rather than failing a complete group of records, the connector upserts as many records as it can, and tracks any failures for notification. Some of these connectors are:

    • Salesforce

    • NetSuite

    • Database

      To make sure that the connector you are using supports record-level errors, check the connector’s documentation.

  • The batch aggregator scope does not support job-instance-wide transactions. You can define a transaction inside a batch step that processes each record in a separate transaction. Think of it as a step within a step.
    Such a transaction must start and end within the step’s boundaries.

  • You cannot share a transaction between a batch step and a batch aggregator that exists within the step. Any transaction that the batch step starts ends before the batch aggregator begins processing. In other words, a transaction cannot cross the barrier between a batch step and the batch aggregator scope it contains.
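
The transaction boundaries described in the last two items might look like the following sketch. It assumes a Try scope that begins a transaction and a Database connector insert that joins it; the config name, table, and column are hypothetical, and the exact attributes should be checked against the Try scope and connector references.

```xml
<batch:step name="batchStep">
	<!-- The transaction begins and ends inside the step, once per record. -->
	<try transactionalAction="ALWAYS_BEGIN">
		<!-- Hypothetical config, table, and column names. -->
		<db:insert config-ref="Database_Config" transactionalAction="ALWAYS_JOIN">
			<db:sql>INSERT INTO leads (name) VALUES (:name)</db:sql>
			<db:input-parameters>#[{ name: payload.name }]</db:input-parameters>
		</db:insert>
	</try>
	<!-- The aggregator runs outside the transaction started above. -->
	<batch:aggregator size="100">
		...
	</batch:aggregator>
</batch:step>
```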
