<batch:job jobName="batchJob">
<batch:process-records >
<batch:step name="adultsOnlyStep" acceptExpression="#[payload.age > 21]">
...
</batch:step>
</batch:process-records>
</batch:job>
Refining Batch Steps Processing
You can refine the work that a batch step performs upon the records it processes.
-
You can set filters upon batch steps to only accept some records for processing.
-
You can aggregate records in groups, sending them as bulk upserts to external sources or services.
This document outlines how and when to use batch filters and the batch aggregator.
Batch Filters
You can apply one or more filters as attributes to any number of batch steps.
Imagine a batch job whose first batch step checks if a Salesforce contact exists for a record, and a second batch step that updates each existing Salesforce contact with new information. You can apply a filter to the second batch step to ensure it only processes records that didn’t fail during the first batch step.
By having batch steps accept only some records for processing, you streamline the batch job so the Mule runtime engine can focus only on the relevant data for a particular batch step.
A batch step uses two attributes to filter records:
-
acceptExpression
-
acceptPolicy
Each batch step accepts at most one acceptExpression attribute and one acceptPolicy attribute to filter records.
Use the acceptExpression attribute to process only the records for which the expression evaluates to true; if the expression evaluates to false for a record, the batch step skips that record and passes it to the next batch step. In other words, the records whose accept expression resolves to false are the ones that Mule filters out.
For example, setting acceptExpression="#[payload.age > 21]" on a batch step filters out all records where the age is 21 or less; the batch step does not process those records.
Use the acceptPolicy attribute of a batch step to process only the records whose outcome in the preceding batch steps matches the policy. Refer to the table below for a list of the available values for the accept policy.
Accept Policy | Records the Batch Step Processes |
---|---|
NO_FAILURES | Default. Batch step processes only those records that processed successfully in all preceding batch steps. |
ONLY_FAILURES | Batch step processes only those records that failed to process in a preceding batch step. |
ALL | Batch step processes all records, regardless of whether they failed to process in a preceding batch step. |
If you don’t apply filters to a batch step, the step processes only those records that processed successfully in all preceding steps. In other words, the default Accept Policy applied to all batch steps is NO_FAILURES.
The example below illustrates the second batch step in a batch job that processes only those records that failed to process during the preceding step. In the first batch step, the runtime checked each record to see if it had an existing Salesforce contact; the second batch step, which creates a contact for each record, processes only the failed records (that is, records that did not have an existing contact).
<batch:job jobName="batchJob">
<batch:process-records >
<batch:step name="batchStep1">
...
</batch:step>
<batch:step name="batchStep2" acceptPolicy="ONLY_FAILURES">
...
</batch:step>
</batch:process-records>
</batch:job>
Each batch job has a maxFailedRecords attribute that controls how many failed records you are willing to accept for a batch job.
When a batch job instance exceeds its maxFailedRecords value, regardless of the filter set on the batch step, the step does not process any records and pushes the failed batch job instance to the On Complete phase.
See Handling Errors During Batch Job for more information.
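As a minimal sketch of this setting, the job below tolerates up to 10 failed records before the job instance stops processing. The threshold of 10 and the logger are illustrative assumptions, and the failedRecords field read in the On Complete phase is assumed to be available on the job result payload.

```xml
<!-- Sketch only: 10 is an arbitrary example threshold -->
<batch:job jobName="batchJob" maxFailedRecords="10">
    <batch:process-records>
        <batch:step name="batchStep1">
            ...
        </batch:step>
    </batch:process-records>
    <batch:on-complete>
        <!-- Assumes the On Complete payload exposes a failedRecords count -->
        <logger level="INFO" message="Failed records: #[payload.failedRecords]" />
    </batch:on-complete>
</batch:job>
```

If the instance exceeds the threshold, the remaining steps process no records and the On Complete logger still runs, which makes it a convenient place to report the outcome.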
Filter Characteristics
-
Batch filters only apply to batch steps which, in turn, are only usable within the batch process phase of a batch job. You cannot apply filters to the Input or On Complete phases.
-
If you apply no filters to a batch step, the step processes only those records that processed successfully in all preceding steps. In other words, the default Accept Policy applied to all batch steps is NO_FAILURES.
-
When a batch job instance exceeds its maxFailedRecords value, regardless of the filter set on the batch step, the step does not process any records and pushes the failed batch job instance to the On Complete phase.
-
Where you apply both types of filters, Mule evaluates them in the following order: first the Accept Policy, then the Accept Expression.
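To illustrate the evaluation order, here is a sketch of a batch step that sets both attributes. Mule first applies the accept policy (here, ALL) and then evaluates the accept expression against the records that the policy let through. The step name and the age condition are illustrative.

```xml
<batch:job jobName="batchJob">
    <batch:process-records>
        <batch:step name="batchStep1">
            ...
        </batch:step>
        <!-- 1. acceptPolicy="ALL" admits records whether or not they failed earlier -->
        <!-- 2. acceptExpression then keeps only the records where age is over 21 -->
        <batch:step name="adultsStep" acceptPolicy="ALL" acceptExpression="#[payload.age > 21]">
            ...
        </batch:step>
    </batch:process-records>
</batch:job>
```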
Batch Aggregator
You can use the batch aggregator scope to accumulate a subset of records from a batch step, and bulk upsert them to an external source or service.
For example, rather than upserting each lead (i.e., record) in a batch to Salesforce individually, you can configure a batch aggregator to accumulate, say, 200 records and then upsert all of them to Salesforce in one chunk.
<batch:step name="Step2">
<batch:aggregator size="200">
<salesforce:create type="Lead" .../>
</batch:aggregator>
</batch:step>
You can also configure the batch aggregator scope to stream your records:
<batch:step name="Step2">
<batch:aggregator streaming="true">
<salesforce:create type="Lead" .../>
</batch:aggregator>
</batch:step>
Processing a fixed number of records and streaming all records are mutually exclusive configurations. Learn more about each in the sections below.
The batch aggregator is mutable, meaning that you can access the payloads and variables of the records grouped in your batch aggregator.
Keep in mind that, when aggregating a fixed number of records, you can access each record sequentially, or you can specify a random record to modify.
However, if you configured your batch aggregator to stream its content, you can only access those records sequentially.
When using an aggregator, do not attempt to process Guava data types, such as ImmutableMap. Instead, use a Java Map to avoid serialization issues.
Aggregating Records using a Fixed Size
You can configure a batch aggregator scope to process records in fixed-size groups.
For example, you can configure the batch aggregator scope to upsert 100 records at a time.
<batch:job jobName="batchJob">
<batch:process-records >
<batch:step name="batchStep">
<batch:aggregator size="100">
...
</batch:aggregator>
</batch:step>
</batch:process-records>
</batch:job>
When using a fixed-size aggregator, you can replace, change, or store the payload and variable data of each record.
As stated above, the batch aggregator is mutable: by adding a foreach scope, you can iterate through a fixed-size aggregator block, go over each record’s data sequentially, and persistently store each record’s payload and variables. This method of accessing records within the batch aggregator is called sequential access.
You can, for example, use the Groovy scripting module to modify the payload and create a variable for each collected record.
<batch:job jobName="batchJob">
<batch:process-records>
<batch:step name="batchStep">
<batch:aggregator doc:name="batchAggregator" size="10">
<foreach doc:name="For Each">
<script:execute engine="groovy">
<script:code>
vars['marco'] = 'polo'
vars['record'].payload = 'foo'
</script:code>
</script:execute>
</foreach>
</batch:aggregator>
</batch:step>
</batch:process-records>
</batch:job>
The sequential access method assumes that:
-
The aggregator size matches the number of aggregated records.
-
There is a direct correlation between the aggregated records and the items in the list.
You can also access random records by specifying an index in the records list, which saves you from iterating through all records.
The foreach scope exposes a records variable. This variable is an immutable list that foreach uses to keep track of the iteration, and it provides a random access list that is accessible across the batch aggregator.
You can achieve the same result as the example above by specifying an arbitrary index number for the records list instead of sequentially accessing each record. You can, for example, create a variable and modify the payload of the first record as shown below.
<batch:job jobName="batchJob">
<batch:process-records>
<batch:step name="batchStep">
<batch:aggregator doc:name="batchAggregator" size="10">
<foreach doc:name="For Each">
<script:execute engine="groovy">
<script:code>
records[0].vars['marco'] = 'polo'
records[0].vars['record'].payload = 'foo'
</script:code>
</script:execute>
</foreach>
</batch:aggregator>
</batch:step>
</batch:process-records>
</batch:job>
Using random access, you can change a record’s payload at any given index position in the aggregator block.
Considerations for Defining a Block Size
In a traditional online processing model, each request is usually mapped to a worker thread. Regardless of the processing type (either synchronous, asynchronous, one-way, request-response or even if the requests are temporarily buffered before being processed), servers usually end up in a 1:1 relationship between a request and a running thread.
When it comes to a batch job, all records are first stored in a persistent queue before the Process phase begins, so the traditional threading model doesn’t apply.
To improve performance, the runtime queues and schedules batch records in blocks of up to 100 records per thread. This behavior reduces the number of I/O requests and improves an operation’s load. Batch jobs use the Mule runtime engine’s thread pools, so there is no default for the job. Each thread iterates through that block to process each record, and then each block is queued back, and the process continues.
Consider having 1 million records to place in a queue for a 3-step batch job. At least three million I/O operations occur as the Mule runtime engine takes and requeues each record as it moves through the job’s phases.
Performance requires having enough available memory to process the threads in parallel, which means moving the records from persistent storage into RAM. The larger your records and their quantity, the more available memory you need for batch processing.
Although the standard model of up to 100 records per thread in the batch job works for most use cases, consider three use cases where you need to increase or decrease the block size:
-
Assume you have 200 records to process through a batch job. With the default 100-record block size, Mule creates only two blocks, so it can process only two records in parallel at a time (one per block). If a batch job has fewer than 101 records to process, then processing becomes sequential. If you need to process heavy payloads, then queueing a hundred records demands a large amount of working memory.
-
Consider a batch job that needs to process images with an average image size of 3 MB. In this case, Mule processes 100-record blocks with payloads of 3 MB in each thread. Hence, your default threading-profile setting would require a large amount of working memory just to keep the blocks in the queue. In this case, set a lower block size to distribute the payloads across more blocks and lessen the load on your available memory.
-
Suppose you have 5 million records with payloads so small that you can fit blocks of 500 records in your memory without problems. Setting a larger block size improves your batch job time without sacrificing working memory load.
To take full advantage of this feature, you must understand how the block sizes affect your batch job. Running comparative tests with different values and testing performance helps you find an optimum block size before moving this change into production.
Remember that modifying the batch block size is optional. If you apply no changes, the default value is 100 records per block.
You set the block size through the blockSize attribute of the Batch Job component, for example:
<batch:job jobName="batchJob" blockSize="100">
...
</batch:job>
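For the image-processing case described above, a sketch might lower the block size on the job while leaving the aggregator size as an independent setting. This assumes the job-level blockSize attribute; the job and step names are hypothetical, and the values 10 and 5 are arbitrary placeholders that should come from your own comparative tests.

```xml
<!-- blockSize: how many records the runtime queues per block (default 100) -->
<batch:job jobName="imageBatchJob" blockSize="10">
    <batch:process-records>
        <batch:step name="processImagesStep">
            <!-- size: how many records the aggregator groups; a separate setting from blockSize -->
            <batch:aggregator size="5">
                ...
            </batch:aggregator>
        </batch:step>
    </batch:process-records>
</batch:job>
```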
Streaming Records in a Batch Aggregator
You can configure a batch aggregator scope to stream its content.
Setting your batch aggregator to stream the records enables you to aggregate all the records in the job instance, no matter how many or how large they are.
Instead of a list of elements that you receive with a fixed-size batch aggregator, the streaming functionality ensures that you receive all the records in the job instance without running out of memory.
For example, if you need to write millions of records to a CSV file, you can process the records as a streaming batch aggregator.
<batch:job jobName="batchJob">
<batch:process-records >
<batch:step name="batchStep">
<batch:aggregator streaming="true">
<file:write path="reallyLarge.csv">
<file:content><![CDATA[%dw 2.0
...
}]]></file:content>
</file:write>
</batch:aggregator>
</batch:step>
</batch:process-records>
</batch:job>
Remember that since this batch aggregator is streaming, you can only access its content sequentially:
<batch:job jobName="batchJob">
<batch:process-records>
<batch:step name="batchStep">
<batch:aggregator doc:name="batchAggregator" streaming="true">
<foreach doc:name="For Each">
<script:execute engine="groovy">
<script:code>
vars['marco'] = 'polo'
vars['record'].payload = 'foo'
</script:code>
</script:execute>
</foreach>
</batch:aggregator>
</batch:step>
</batch:process-records>
</batch:job>
Due to memory restrictions, random access is not supported for streaming aggregators.
The record payloads for random access are exposed as an immutable List, and since a streaming aggregator implies access to the entire set of records, without a fixed aggregator size, the runtime can’t guarantee that all records fit in memory.
Tips
-
Streaming from SaaS providers: In general, you likely wouldn’t use batch streaming when sending data through an Anypoint Connector to a SaaS provider like Salesforce, because SaaS providers often have restrictions on accepting streaming input. Use streaming batch processing when writing to a file such as CSV, JSON, or XML.
-
Batch streaming and performance: Batch processing streaming data does affect the performance of your application, slowing the pace at which it processes transactions. Though performance slows, the trade-off to be able to batch process streaming data may warrant using it in your implementation.
-
Batch streaming and access to items: The biggest drawback to using batch streaming is that you have limited access to the items in the output. In other words, with a fixed-size aggregator, you get an unmodifiable list, which you can access and process iteratively; with a streaming aggregator, you get a one-read, forward-only iterator.
Batch Aggregator Characteristics
-
The batch aggregator scope can only exist in batch steps which, in turn, are only usable within the batch process phase of a batch job. You cannot use batch aggregators during the On Complete phase of a batch job.
-
An aggregator can only wrap the final element within the batch step in which it resides.
-
Several Anypoint Connectors can handle record-level errors without failing a whole batch aggregation (i.e., upsert).
At runtime, these connectors keep track of which records were successfully accepted by the target resource, and which failed to upsert. Thus, rather than failing a complete group of records, the connector upserts as many records as it can, and tracks any failures for notification. Some of these connectors are Salesforce and NetSuite.
To make sure that the connector you are using supports record-level errors, check the connector’s documentation.
-
The batch aggregator scope does not support job-instance-wide transactions. You can define a transaction inside a batch step that processes each record in a separate transaction. Think of it as a step within a step.
Such a transaction must start and end within the step’s boundaries.
-
You cannot share a transaction between a batch step and a batch aggregator that exists within the step. Any transaction that the batch step starts, ends before the batch aggregator begins processing. In other words, a transaction cannot cross the barrier between a batch step and the batch aggregator scope it contains.
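A sketch of this boundary, assuming a Try scope that begins the transaction and a hypothetical Database connector configuration named dbConfig (the leads table and its name column are also made up for illustration): the transaction opened in the step ends when the Try scope ends, so the aggregator that follows runs outside it.

```xml
<batch:step name="transactionalStep">
    <!-- The transaction starts and ends inside the step, once per record -->
    <try transactionalAction="ALWAYS_BEGIN">
        <db:insert config-ref="dbConfig">
            <db:sql>INSERT INTO leads (name) VALUES (:name)</db:sql>
            <db:input-parameters>#[{ name: payload.name }]</db:input-parameters>
        </db:insert>
    </try>
    <!-- The aggregator runs only after the transaction above has ended -->
    <batch:aggregator size="200">
        ...
    </batch:aggregator>
</batch:step>
```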
Preserving the MIME types of the Aggregated Records
Supported by Mule 4.3
Aggregated records are passed into the aggregator as an array containing each record’s payload. However, the MIME types associated with those payloads are not preserved by default. You can preserve each record’s MIME type by specifying the preserveMimeTypes attribute in a batch aggregator.
As an example, consider the following JSON document:
[
{
"name": "Tony Stark",
"alias": "Iron Man",
"status": "DEAD"
},
{
"name": "Steve Rodgers",
"alias": "Captain America",
"status": "RETIRED"
},
{
"name": "Peter Parker",
"alias": "SpiderMan",
"status": "FUGITIVE"
}
]
Suppose you fed this document into the following job:
<batch:job jobName="avengersLogger">
<batch:process-records>
<batch:step name="log">
<batch:aggregator size="10">
<foreach>
<logger message="Agent #[payload.alias] is #[payload.status]" />
</foreach>
</batch:aggregator>
</batch:step>
</batch:process-records>
</batch:job>
The batch engine splits the input JSON array into individual records, which means that the aggregator block receives an array with three elements. The first one of them is:
{
"name": "Tony Stark",
"alias": "Iron Man",
"status": "DEAD"
}
However, when the logger element attempts to evaluate the #[payload.alias] expression, it results in an error similar to the following:
********************************************************************************
Message : "You called the function 'Value Selector' with these arguments:
1: Binary ("ewogICJmaXJzdE5hbWUiOiAiUmFtIiwKICAibGFzdE5hbWUiOiAiUmFtMSIsCiAgImFkZHJlc3Mi...)
2: Name ("alias")
But it expects one of these combinations:
(Array, Name)
(Array, String)
(Date, Name)
(DateTime, Name)
(LocalDateTime, Name)
(LocalTime, Name)
(Object, Name)
(Object, String)
(Period, Name)
(Time, Name)
5| name: payload.alias,
^^^^^^^^^^^^^
The previous error occurs because MIME types are not preserved by default, and therefore Mule doesn’t know that this record is actually JSON. You can fix this by specifying the preserveMimeTypes attribute in the batch aggregator:
<batch:aggregator size="10" preserveMimeTypes="true">
<foreach>
<logger message="Agent #[payload.alias] is #[payload.status]" />
</foreach>
</batch:aggregator>
By setting this attribute, Mule automatically maintains each record’s media type and knows that the payload actually represents a JSON document.
Max Concurrency Limit
The Max Concurrency (maxConcurrency) property limits the number of blocks that the batch job can process concurrently. The records inside each block are processed sequentially.
You can configure the maxConcurrency property as in the following example:
<batch:job jobName="test-batch" maxConcurrency="${batch.max.concurrency}">
...
</batch:job>
By default, the Batch Job component limits the max concurrency to twice the number of available cores. The capacity of the system running the Mule instance also limits concurrency.