<mule xmlns:batch="http://www.mulesoft.org/schema/mule/batch" xmlns:data-mapper="http://www.mulesoft.org/schema/mule/ee/data-mapper" xmlns:sfdc="http://www.mulesoft.org/schema/mule/sfdc" xmlns:file="http://www.mulesoft.org/schema/mule/file" xmlns="http://www.mulesoft.org/schema/mule/core" xmlns:doc="http://www.mulesoft.org/schema/mule/documentation" xmlns:spring="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-current.xsd
http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd
http://www.mulesoft.org/schema/mule/file http://www.mulesoft.org/schema/mule/file/current/mule-file.xsd
http://www.mulesoft.org/schema/mule/batch http://www.mulesoft.org/schema/mule/batch/current/mule-batch.xsd
http://www.mulesoft.org/schema/mule/ee/data-mapper http://www.mulesoft.org/schema/mule/ee/data-mapper/current/mule-data-mapper.xsd
http://www.mulesoft.org/schema/mule/sfdc http://www.mulesoft.org/schema/mule/sfdc/current/mule-sfdc.xsd">
    <!-- Global Salesforce connector: replace username, password, and securityToken with your own values -->
    <sfdc:config name="Salesforce" username="username" password="password" securityToken="SpBdsf98af9tTR3m3YVcm4Y5q0y0R" doc:name="Salesforce">
        <sfdc:connection-pooling-profile initialisationPolicy="INITIALISE_ONE" exhaustedAction="WHEN_EXHAUSTED_GROW"/>
    </sfdc:config>
    <data-mapper:config name="new_mapping_grf" transformationGraphPath="new_mapping.grf" doc:name="DataMapper"/>
    <data-mapper:config name="new_mapping_1_grf" transformationGraphPath="new_mapping_1.grf" doc:name="DataMapper"/>
    <data-mapper:config name="leads_grf" transformationGraphPath="leads.grf" doc:name="DataMapper"/>
    <data-mapper:config name="csv_to_lead_grf" transformationGraphPath="csv-to-lead.grf" doc:name="DataMapper"/>
    <batch:job max-failed-records="1000" name="Create Leads" doc:name="Create Leads">
        <batch:threading-profile poolExhaustedAction="WAIT"/>
        <!-- Input phase: read a CSV file and map its rows to Lead records -->
        <batch:input>
            <file:inbound-endpoint path="src/test/resources/input" moveToDirectory="src/test/resources/output" responseTimeout="10000" doc:name="File"/>
            <data-mapper:transform config-ref="csv_to_lead_grf" doc:name="CSV to Lead"/>
        </batch:input>
        <batch:process-records>
            <!-- Queries Salesforce by email and stores the result in the 'exists' record variable -->
            <batch:step name="lead-check" doc:name="Lead Check">
                <enricher source="#[payload.size() > 0]" target="#[recordVars['exists']]" doc:name="Message Enricher">
                    <sfdc:query config-ref="Salesforce" query="dsql:SELECT Id FROM Lead WHERE Email = '#[payload[&quot;Email&quot;]]'" doc:name="Find Lead"/>
                </enricher>
            </batch:step>
            <!-- Runs only for records whose 'exists' record variable evaluates to true -->
            <batch:step name="insert-lead" doc:name="Insert Lead" accept-expression="#[recordVars['exists']]">
                <logger message="Got Record #[payload], it exists #[recordVars['exists']]" level="INFO" doc:name="Logger"/>
                <!-- Accumulates records into chunks of 200 before calling Salesforce -->
                <batch:commit size="200" doc:name="Batch Commit">
                    <sfdc:create config-ref="Salesforce" type="Lead" doc:name="Insert Lead">
                        <sfdc:objects ref="#[payload]"/>
                    </sfdc:create>
                </batch:commit>
            </batch:step>
            <!-- Runs only for records that failed in an earlier step -->
            <batch:step name="log-failures" accept-policy="ONLY_FAILURES" doc:name="Log Failures">
                <logger message="Got Failure #[payload]" level="INFO" doc:name="Log Failure"/>
            </batch:step>
        </batch:process-records>
        <!-- On Complete phase: the payload is the batch job result -->
        <batch:on-complete>
            <logger message="#[payload.loadedRecords] Loaded Records #[payload.failedRecords] Failed Records" level="INFO" doc:name="Log Results"/>
        </batch:on-complete>
    </batch:job>
</mule>
Batch Processing Reference
Enterprise, CloudHub
Terminology
Term | Description | Element |
---|---|---|
Batch | A group of records which Mule processes individually, by record. | n/a |
Batch Commit | A scope which accumulates records into chunks to prepare bulk upserts to an external source or service. | |
Batch Job | The top-level element in an application in which Mule processes a message payload as a batch of records. The term batch job is inclusive of all four phases of processing: Input, Load and Dispatch, Process, and On Complete. | |
Batch Job Instance | An occurrence in a Mule application resulting from the execution of a batch job in a Mule flow. You can let Mule create the batch job instance in the Load and Dispatch phase for you, or you can specify a distinct job instance ID through a Mule expression. This instance persists indefinitely. | |
Batch Job Result | A POJO which contains information about the processing results of a batch job instance. You can use it to get information about execution progress. | |
Batch Message Processor | An element in a Mule flow which triggers execution of a batch job. | |
Batch Phase | Sequentially ordered stages through which batches pass as Mule processes them. | |
Batch Step | A child element of the batch job within which multiple message processors act upon records in a batch. | |
Record | A small part of a large message’s payload; a single instance of the result of Mule’s action to split a serialized message payload (i.e. a collection or array) into pieces for processing. | n/a |
Block size | Batch records are queued and scheduled in blocks. This element determines the size of the block that will be used for the job’s instances. | |
Batch Elements
Studio Palette | XML Editor | Use |
---|---|---|
Batch | | Defines a batch "flow". |
Batch Commit | | Accumulates records into chunks to prepare bulk upserts to an external source or service. |
Batch Reference | | Set within a Mule flow; triggers the start of a batch job. |
Batch Threading Profile | | Configures details regarding the threads upon which Mule processes batch jobs. |
Record Variable | | Sets or removes a record-level variable. |
Elements and Attributes
Element | Attribute | Value | Attribute Req’d | Attribute Description |
---|---|---|---|---|
| | | unique name for message processor | x | Defines unique identifier for batch commit wrapper. |
| | | integer | x | Defines the number of records to collect before initiating the upsert of a chunk of records to an external source or service. |
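As a rough illustration of the two attributes described above, the fragment below is adapted from the example configuration in this reference. It assumes the global `Salesforce` connector from that example; the `size` value of 200 is only an example and should be tuned to what the target service accepts per call.

```xml
<batch:step name="insert-lead" doc:name="Insert Lead">
    <!-- size: number of records to collect before sending them as one chunk -->
    <batch:commit size="200" doc:name="Batch Commit">
        <!-- Inside the commit scope the payload is the collected chunk of records -->
        <sfdc:create config-ref="Salesforce" type="Lead" doc:name="Insert Lead">
            <sfdc:objects ref="#[payload]"/>
        </sfdc:create>
    </batch:commit>
</batch:step>
```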
Element | Attribute | Value | Attribute Req’d | Attribute Description |
---|---|---|---|---|
| | | batch job name | x | Identifies the batch job to execute. |
| | | unique name for message processor | x | Defines unique identifier for batch reference message processor; can be an expression. |
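A minimal sketch of a batch reference, assuming the element is `batch:execute` and that its `name` attribute carries the batch job name (the element and attribute names did not survive in the table above, so treat them as assumptions to verify against your runtime's schema). The flow name and the file path are hypothetical.

```xml
<flow name="trigger-create-leads" doc:name="trigger-create-leads">
    <!-- Hypothetical trigger: any inbound endpoint or upstream processor could go here -->
    <file:inbound-endpoint path="src/test/resources/input" doc:name="File"/>
    <!-- Assumed syntax: name identifies the batch job to execute -->
    <batch:execute name="Create Leads" doc:name="Run Create Leads"/>
</flow>
```

A job triggered this way would typically receive its records from the flow's message payload rather than from its own input phase.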
Element | Attribute | Value | Attribute Req’d | Attribute Description |
---|---|---|---|---|
| | | n/a | n/a | |
Element | Attribute | Value | Attribute Req’d | Attribute Description |
---|---|---|---|---|
| | | unique name for job | x | Defines unique identifier for job. |
| | | | | |
Element | Attribute | Value | Attribute Req’d | Attribute Description |
---|---|---|---|---|
| | | n/a | n/a | |
Element | Attribute | Value | Attribute Req’d | Attribute Description |
---|---|---|---|---|
| | | n/a | n/a | |
Element | Attribute | Value | Attribute Req’d | Attribute Description |
---|---|---|---|---|
| | | unique name for message processor | x | Defines unique identifier for batch reference message processor. |
| | | name for record-level variable | x | Identifies record-level variable for removal. |
Element | Attribute | Value | Attribute Req’d | Attribute Description |
---|---|---|---|---|
| | | unique name for message processor | x | Defines unique identifier for batch reference message processor. |
| | | MEL expression | x | Defines value of named variable. |
| | | name for record-level variable | x | Defines unique name for record-level variable. |
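The two record-variable tables above might translate into configuration along these lines. This is a sketch only: the element names `batch:set-record-variable` and `batch:remove-record-variable` and the attributes `variableName` and `value` are assumptions (the tables lost their element and attribute names), and the step names and the `hasEmail` variable are hypothetical.

```xml
<batch:step name="flag-lead" doc:name="Flag Lead">
    <!-- Assumed syntax: stores a record-level variable whose value is a MEL expression -->
    <batch:set-record-variable variableName="hasEmail" value="#[payload['Email'] != null]" doc:name="Set Record Variable"/>
</batch:step>
<batch:step name="clear-flag" doc:name="Clear Flag">
    <!-- Assumed syntax: removes the record-level variable by name -->
    <batch:remove-record-variable variableName="hasEmail" doc:name="Remove Record Variable"/>
</batch:step>
```

Later steps can read such a variable through `recordVars`, as the example configuration in this reference does with `#[recordVars['exists']]`.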
Element | Attribute | Value | Attribute Req’d | Attribute Description |
---|---|---|---|---|
| | | unique name for step | x | Defines unique identifier for step inside the batch job. |
| | | ALL | | ALL = step processes all records, failed and successful. |
| | | MEL expression | | Step processes only those records for which the expression evaluates to true; records for which it evaluates to false are skipped. |
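The two step filters described above can be seen side by side in this fragment, which follows the example configuration in this reference. The attribute names `accept-expression` and `accept-policy` and the value `ONLY_FAILURES` are taken from that example; the loggers are placeholders for real processing.

```xml
<batch:process-records>
    <!-- Expression filter: the step handles only records for which the MEL expression is true -->
    <batch:step name="insert-lead" doc:name="Insert Lead" accept-expression="#[recordVars['exists']]">
        <logger message="Got Record #[payload]" level="INFO" doc:name="Logger"/>
    </batch:step>
    <!-- Policy filter: the step handles only records that failed in an earlier step -->
    <batch:step name="log-failures" doc:name="Log Failures" accept-policy="ONLY_FAILURES">
        <logger message="Got Failure #[payload]" level="INFO" doc:name="Log Failure"/>
    </batch:step>
</batch:process-records>
```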
Element | Attribute | Value | Attribute Req’d | Attribute Description |
---|---|---|---|---|
| | | WAIT | | Defines what a batch job should do if all threads are active. |
| | | integer | | Defines the maximum number of active threads upon which Mule processes batch jobs. |
| | | integer | | Defines the minimum number of active threads upon which Mule processes batch jobs. |
| | | integer | | Defines, in milliseconds, the time a thread should live and remain idle before becoming inactive. |
| | | integer | | Defines how long a batch job should wait for a thread to become available before timing out. |
| | | integer | | Defines the size of the "overflow" memory which holds batch jobs while waiting for a thread to become available. |
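A threading profile using several of the settings above might look like the sketch below. Only `poolExhaustedAction="WAIT"` appears in this reference's example configuration; the attribute names `maxThreadsActive`, `threadTTL`, `threadWaitTimeout`, and `maxBufferSize` are assumed from Mule's general threading-profile conventions, and all numeric values are arbitrary placeholders.

```xml
<batch:job name="Create Leads" doc:name="Create Leads">
    <!-- Assumed attribute names; values are illustrative, not recommendations -->
    <batch:threading-profile poolExhaustedAction="WAIT"
                             maxThreadsActive="8"
                             threadTTL="60000"
                             threadWaitTimeout="30000"
                             maxBufferSize="100"/>
    <batch:process-records>
        <batch:step name="log-record" doc:name="Log Record">
            <logger message="Got Record #[payload]" level="INFO" doc:name="Logger"/>
        </batch:step>
    </batch:process-records>
</batch:job>
```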
Batch Commit Connectors
Several Anypoint Connectors have the ability to handle record-level errors without failing a whole batch commit (i.e. upsert). At runtime, these connectors keep track of which records were successfully accepted by the target resource, and which failed to upsert. Thus, rather than failing a complete group of records during a commit activity, the connector simply upserts as many records as it can, and tracks any failures for notification. The short – but soon to grow – list of such connectors follows:
- Salesforce
- NetSuite
BatchJobResult Processing Statistics
Statistic | Description |
---|---|
| | A String indicating the id of the executed job instance. |
| | A long indicating the number of milliseconds the batch job spent in executing state. |
| | A boolean indicating whether an exception was found in the on complete phase. |
| | A boolean indicating whether an exception was found in the input phase. |
| | A boolean indicating whether an exception was found in the loading phase. |
| | A long indicating the number of records that failed processing. |
| | If an exception was found in the input phase, then that Exception is returned; otherwise |
| | A long indicating the number of records loaded so far. Once the loading phase is completed, it should be equal to totalRecords. |
| | If an exception was found in the loading phase, then that Exception is returned; otherwise |
| | If an exception was found in the on complete phase, then that Exception is returned; otherwise |
| | A long indicating the number of records processed so far. It equals successfulRecords + failedRecords, but it could be lower than totalRecords if the job is not finished. |
| | A long indicating the number of records processed so far. |
| | Total number of records in the batch. |
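As a small sketch of how these statistics can be read, the On Complete phase below logs four of them from the payload, in the same way the example configuration in this reference logs `loadedRecords` and `failedRecords`. It uses only property names that appear elsewhere in this reference; the message wording is illustrative.

```xml
<batch:on-complete>
    <!-- In the On Complete phase the payload is the batch job result -->
    <logger level="INFO" doc:name="Log Results"
            message="#[payload.successfulRecords] of #[payload.totalRecords] records succeeded, #[payload.failedRecords] failed, #[payload.loadedRecords] loaded"/>
</batch:on-complete>
```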
Example
For a full description of the example and the steps the batch job takes in each phase of processing, see Batch Processing.
XML Editor
If you copy-paste the code into your instance of Studio, be sure to enter your own values for the global Salesforce connector (username, password, and securityToken).
See Also
- Learn more about filters in batch processing.
- Learn more about batch commit.
- Learn more about setting and removing record-level variables.
- Learn MEL expressions you can use in Batch jobs to simplify error handling.
- Review the basic anatomy of batch processing in Mule.