
Batch Component Reference

Mule batch components manage bulk processing of records with minimal adjustments to default component settings. They are highly configurable to meet specific processing and performance requirements.

A Batch Job component must contain at least one Batch Step component. Optionally, each Batch Step component can contain a single Batch Aggregator component. The Mule flow in the following figure shows batch components.

A flow showing batch processing components in Anypoint Studio

The Batch Job component splits the payload of the Mule message into records. Within the Batch Step and Batch Aggregator components, each record is available through the payload keyword, and Mule variables remain accessible through the vars keyword. However, Mule message attributes are not accessible from within these components; attributes resolves to null in both.
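For example, a minimal sketch of a Batch Step that logs each record it processes (the step name is illustrative):

<batch:step name="Inspect_Record">
  <!-- Inside the step, payload is the current record and vars holds flow variables;
       attributes resolves to null within batch components. -->
  <logger level="INFO" message="#[payload]" />
</batch:step>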

This reference assumes that you are familiar with the batch processing phases of a batch job instance.

Batch Job Component (<batch:job />)

This component prepares its input for processing and outputs a report with results of that processing. Preparation takes place during the Load and Dispatch phase of a batch job instance, and the report is available in the On Complete phase.

In Studio, you can set these properties in the component’s General and History tabs.

Batch Job Properties

The Batch Job component provides a number of configurable properties that determine how batch processing takes place. It also provides a way of storing the input payload in a target variable for use by subsequent components in the flow, which can be useful because the Batch Job component consumes the message payload.

Anypoint Studio provides the following fields in the General tab:

Batch job fields in Anypoint Studio

The following table describes each field and includes the XML attribute for the field.

Field Name XML Description

Name

name

Configurable name for a Batch Job component. The default name appends Batch_Job to the name of the application.

Max Failed Records

maxFailedRecords

Maximum number of records that can fail within a batch job instance before the execution of that instance stops. The default is 0. Use -1 for no limit. This setting takes precedence over any accept policy (acceptPolicy) or expression (acceptExpression) filter on a Batch Step component within the Batch Job component. When a batch job instance exceeds the maxFailedRecords value, the Batch Job component stops processing of the batch job instance in which the failure occurs, assigns the instance the status FAILED_PROCESS_RECORDS, and reports the number of failed records in the instance. When a batch job instance reaches this threshold, the console prints a message such as this one (edited for readability):

INFO  ... DefaultBatchEngine:
 instance '6d85c380-f332-11ec-bb5f-147ddaaf4f97' of job 'Batch_Job_in_Mule_Flow'
 has reached the max allowed number of failed records. Record will be added to
 failed list and the instance will be removed from execution pool.
...
INFO  ... DefaultBatchEngine:
 instance 6d85c380-f332-11ec-bb5f-147ddaaf4f97 of job Batch_Job_in_Mule_Flow
 has been stopped. Instance status is FAILED_PROCESS_RECORDS

Note that parallel processing within a batch job instance makes it possible for the number of errors to exceed the configured maximum because multiple records can fail at the same time.
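For example, a minimal sketch of a Batch Job component that stops a batch job instance after more than ten failed records (the value is illustrative; -1 removes the limit):

<batch:job ... maxFailedRecords="10">
  <batch:process-records>
    <batch:step />
  </batch:process-records>
</batch:job>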

Scheduling Strategy

schedulingStrategy

When a Batch Job component triggers repeatedly, more than one batch job instance can be ready to execute at the same time. The instances can run sequentially based on their creation timestamp (the default) or according to a round-robin algorithm that attempts to maximize the use of available resources. The round-robin strategy is appropriate only for jobs that have no side effects or dependencies on other records because the order of processing is not guaranteed.

  • ORDERED_SEQUENTIAL (default): With this setting, batch job instances execute one at a time in the order established by their creation timestamp. An instance created earlier executes before an instance created later.

  • ROUND_ROBIN: With this setting, all batch job instances execute based on a round-robin algorithm that assigns available resources. Order of execution is not predetermined. Batch instances can execute in any order. This option is viable only if you are certain that no execution of an instance is capable of producing a side effect on an execution of another instance. Best practices:

    • To avoid potential updates to the same record in concurrently running batch job instances, do not use this strategy for data synchronization jobs. Using this strategy when synchronizing data makes it possible to return a result with an earlier, incorrect version of the data.

    • Use this strategy to parallelize execution of batch job instances if you are certain that no record depends on the processing of another record within the Batch Job component, such as a case in which your batch job retrieves only new records from a database.
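A minimal sketch of a Batch Job component configured with the round-robin strategy (other attributes elided):

<batch:job ... schedulingStrategy="ROUND_ROBIN">
  <batch:process-records>
    <batch:step />
  </batch:process-records>
</batch:job>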

Job Instance ID

jobInstanceId

Optional expression that provides a recognizable, unique name for each batch job instance. To make the name unique, use a DataWeave expression in this field, for example, jobInstanceId="#['Job From ' ++ now() as String {format: 'dd/MM/yy hh:mm:ss'}]". If the expression returns the same value more than once, Mule throws an error with a message such as, Batch Job 'my-batch-job-example' already has an instance with id 'my-repeated-id'. Mule evaluates the expression each time the flow reaches the Batch Job component to create an identifier for the new batch job instance. If you do not provide an ID, Mule automatically creates a UUID that serves as the identifier for each batch job instance.
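For example, a minimal sketch that names each batch job instance with its creation timestamp (the expression is illustrative):

<batch:job ... jobInstanceId="#['Job From ' ++ now() as String {format: 'dd/MM/yy hh:mm:ss'}]">
  <batch:process-records>
    <batch:step />
  </batch:process-records>
</batch:job>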

Batch Block Size

blockSize

Number of records to process per record block. The default is 100 records per block. If fewer records are available for processing than the configured size, Mule sends a block of a smaller size to the record processors. For example, with 1010 records and a block size of 100, the first ten blocks contain 100 records each, and the final block contains only the remaining 10 records. The block size setting provides a way to tune performance, which is affected by the size of the records in each block and the maxConcurrency setting.

Max Concurrency

maxConcurrency

Property that sets the maximum level of parallelism to allow when processing record blocks within the component. The default is twice the number of cores available in the CPUs. The capacity of the system running the Mule instance also limits concurrency. Like the blockSize setting, this setting provides a way to tune performance.
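For example, a minimal tuning sketch (the values are illustrative, not recommendations):

<!-- Records queue in blocks of 50, and at most 4 blocks are processed in parallel. -->
<batch:job ... blockSize="50" maxConcurrency="4">
  <batch:process-records>
    <batch:step />
  </batch:process-records>
</batch:job>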

Target

target

To use the payload received by a batch job instance in a component that is located after the Batch Job component, provide a variable name as the target and reference that variable (through vars) in the downstream component to access the payload. Without a target value, the input payload to the Batch Job component is not available in downstream components. For more information about target variables, see Enrich Data with Target Variables. There is no default value.
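A minimal sketch, assuming an illustrative flow name and a target variable named inputPayload:

<flow name="batchTargetExampleFlow">
  <batch:job ... target="inputPayload">
    <batch:process-records>
      <batch:step />
    </batch:process-records>
  </batch:job>
  <!-- Downstream of the Batch Job component, the original input payload
       remains available through the target variable. -->
  <logger level="INFO" message="#[vars.inputPayload]" />
</flow>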

Batch Job History (<batch:history />)

A batch job history configuration enables you to address a No space left on device error by adjusting the amount of time that historical data for batch job instances persist in a temporary Mule directory. This error can occur if the frequency or number of records to process is too high for the available disk space or for a CloudHub worker of a given size to handle.

By default, the history of batch job instances persists for 7 days. A monitoring process automatically removes that history when the period expires.

Batch job history fields in Anypoint Studio

The following table describes each field and includes the XML attribute for the field.

Name XML Description

Max Age

maxAge

Maximum age of job instances before they expire, for example, <batch:expiration maxAge="10" />. Default: maxAge=7.

Time Unit

ageUnit

The unit of time to apply to the maxAge. Valid values: DAYS, HOURS, MILLISECONDS, MINUTES, and SECONDS. Default: ageUnit=DAYS.

Note that these fields are attributes of the <batch:expiration/> element, which is embedded within <batch:history/>:

<batch:job ...>
  <batch:history >
    <batch:expiration maxAge="10" ageUnit="MINUTES" />
  </batch:history>
  <batch:process-records>
    <batch:step />
  </batch:process-records>
</batch:job>

The XML example is edited to focus on settings in the batch history fields.

Batch Step Component (<batch:step/>)

The Batch Step component runs during the Process phase of a batch job instance, not during the Load and Dispatch or On Complete phases.

A minimum of one Batch Step component is required within a Batch Job component. In addition to changing the name of a Batch Step component, you can configure filters that determine which records the step accepts for processing. Any processors that you add to the component act on the records that the Batch Step component accepts.

Batch job general fields in Anypoint Studio

The following table describes each field and includes the XML attribute for the field.

Field Name XML Description

Name

name

Configurable name for a Batch Step component. The default is Batch_Step for the first Batch Step component within a Batch Job component. Subsequent steps append a unique number to the name, following the sequence, Batch_Step1, Batch_Step2, and so on.

Accept Expression

acceptExpression

Optional DataWeave expression for a filter that determines whether to process a record in the component. If both the acceptExpression and the acceptPolicy values for a record evaluate to true, then the component accepts the record for processing. If not, the component skips the record, and the record becomes available to any downstream Batch Step components. Examples:

  • acceptExpression="#[payload.age > 21]" for a payload with an age field

  • acceptExpression="#[not isBlank(payload.name)]" for a payload with a name field.

Accept Policy

acceptPolicy

Accepts a record for processing only if the policy evaluates to true for that record. You can base acceptance on the success or failure of a record to process successfully in preceding (upstream) Batch Step components within the Batch Job component. You can configure the component to process only records that processed successfully, only records that failed to process successfully, or all records, regardless of whether they processed successfully. The default is NO_FAILURES. Note that the acceptPolicy evaluates before any acceptExpression on the component, and the maxFailedRecords setting on the Batch Job component takes precedence over acceptPolicy settings.

  • NO_FAILURES: Default setting. Accepts a record for processing if the record never failed to process successfully within a preceding Batch Step component or if there is no upstream Batch Step component.

  • ONLY_FAILURES: Accepts a record for processing if the record failed to process successfully within a preceding Batch Step component.

  • ALL: Accepts a record for processing regardless of whether the record processed successfully or failed within a preceding Batch Step component.
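For example, a minimal sketch of a follow-up step that acts only on records that failed in an earlier step (the step names are illustrative, and the Batch Job component must allow failures, for example, maxFailedRecords="-1"):

<batch:process-records>
  <batch:step name="Process_Records">
    <!-- Main record processing goes here. -->
  </batch:step>
  <batch:step name="Handle_Failures" acceptPolicy="ONLY_FAILURES">
    <!-- Only records that failed in an upstream step reach this step. -->
    <logger level="WARN" message="#[payload]" />
  </batch:step>
</batch:process-records>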

Batch Aggregator Component (<batch:aggregator />)

The Batch Aggregator component is an optional component that acts on an array of records. Aggregation takes place during the Process phase of a batch job instance.

There is no default configuration for the Batch Aggregator component. If you use this component, you must indicate whether to stream records (streaming) or pull records from the queue into separate arrays of a fixed size (size), but not both. The settings are mutually exclusive. Random access to streamed records is not possible.

The Batch Aggregator component accepts one or more processors for the message payload. However, it is common to use a scope within the aggregator, such as For Each scope, to iterate over each record so that the child processors within the scope can act on each record individually.
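For example, a minimal sketch of a fixed-size aggregator that uses a For Each scope to act on each record (the size is illustrative):

<batch:aggregator size="50">
  <foreach>
    <!-- Within For Each, payload is a single record from the aggregated array. -->
    <logger level="INFO" message="#[payload]" />
  </foreach>
</batch:aggregator>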

The Batch Aggregator component also provides a way to preserve the MIME types of records (preserveMimeTypes), which is needed when accessing parts of a record that you are aggregating with a fixed size (size). See Preserving the MIME types of the Aggregated Records. To learn about record processing within this component, see batch aggregation information in Process Phase.

Note that some connectors, such as Anypoint Connector for Salesforce (Salesforce Connector) and Anypoint Connector for NetSuite (NetSuite Connector), provide operations that can handle record-level errors without causing the batch aggregation process to fail.

When using Batch Aggregators, consider the following limitations:

  • When using a Batch Aggregator component, do not attempt to process Guava data types, such as ImmutableMap. Instead, use a Java Map to avoid serialization issues.

  • The Batch Aggregator component does not support job-instance-wide transactions. Within a Batch Step component, you can define a transaction that processes each record in a separate transaction. However, any transaction started by a Batch Step component ends before the Batch Aggregator component starts processing. Components cannot share a transaction because a transaction cannot cross the boundary between the components.

Batch Aggregator component fields in Anypoint Studio

The following table describes each field and includes the XML attribute for the field.

Field Name XML Description

Display Name

doc:name

Configurable name for the component. Example: doc:name="My Aggregator". The default name is Batch Aggregator.

Aggregator Size

size

Size of each array of records to process. There is no default size. The processor within the component must accept an array as input. The component continues pulling records from the batch stepping queue into arrays of the configured size until none are left. If the queue contains fewer records than the configured size, the component pulls those records into a smaller array for processing. You must set the Size (size) or Streaming (streaming) property but not both.

When using this configuration, the component does not wait for the Batch Step component to finish processing all records in an instance or batch. Once the set number of records is available in the stepping queue, the Batch Aggregator component pulls those records into an array and processes them.
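A minimal sketch (the size and the placeholder comments are illustrative):

<batch:step ...>
  <!-- Record-level processors run here first. -->
  <batch:aggregator size="100">
    <!-- payload here is an array of up to 100 records; an operation that accepts
         an array, such as a bulk insert, typically goes here. -->
    <logger level="INFO" message="#['Aggregated records: ' ++ (sizeOf(payload) as String)]" />
  </batch:aggregator>
</batch:step>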

Streaming

streaming

When streaming=true, the component acts on all records in a batch job instance and returns the results in an array with a size equal to the number of records in the instance. The block size (blockSize) setting on the Batch Job component does not limit the number of records to process or return in the array. For example, assume that the aggregator returns the numeric IDs of a small batch job instance that processes only ten records of the form {"id": 1}, {"id": 2}, and so on. In this case, the output array for the instance contains ten IDs, such as [1, 6, 2, 7, 3, 8, 4, 9, 5, 10], even if the blockSize on the Batch Job component is set to a number smaller than the number of records in the instance, such as 5.

Processors can gain sequential access to the array of streamed records. Random access to the streamed records is not allowed because the aggregator does not preserve the processed records.

There is no default. You must set the Size (size) or Streaming (streaming) property but not both.
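A minimal sketch of a streaming aggregator (the Logger is an illustrative placeholder):

<batch:aggregator streaming="true">
  <!-- payload is a forward-only sequence of all records in the batch job instance;
       a component that writes the records out, such as a file write, typically goes here. -->
  <logger level="INFO" message="Streaming aggregated records" />
</batch:aggregator>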

Preserve Mime Types

preserveMimeTypes

Defaults to false. To preserve the MIME type of records processed within this component, set this attribute to true. Without preserved MIME types, processors that cannot determine the MIME type of the data can return output such as null or unreadable references to the payload, such as [[B@e596d5c, [B@1c4c3b32], …]. Setting preserveMimeTypes=true is useful when this component is set to process a fixed number (size) of records.
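For example, a minimal sketch, assuming JSON records that contain a name field:

<batch:aggregator size="10" preserveMimeTypes="true">
  <foreach>
    <!-- Because MIME types are preserved, payload.name resolves against the original
         JSON record instead of returning an unreadable byte-array reference. -->
    <logger level="INFO" message="#[payload.name]" />
  </foreach>
</batch:aggregator>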

When deciding whether to stream aggregated records, consider the following:

  • SaaS providers often have restrictions on accepting streaming input.

  • Batch streaming is often useful in batch processing when writing to a file such as CSV, JSON, or XML.

  • Batch streaming affects the performance of your application because the array of records is stored in RAM, which can slow the pace of transaction processing. Though performance slows, the trade-off of streaming data can warrant its use.

  • Streaming prevents random access to records because a streaming commit is a one-read, forward-only iterator. However, you can perform random access operations when streaming if you use a component like For Each. For Each provides a separate array of records, which is accessible through its records variable.