
Batch Streaming and Job Execution

Enterprise, CloudHub

Mule supports streaming data through batch processing and lets you configure the order in which batch job instances execute.

== Streaming Batch Commits

Mule offers the ability to process fixed-size groups of records, such as upserting 100 records at a time from a CSV file to Salesforce. In the Early Access release, Mule also supports streaming commits, which enable you to batch process all the records in a job instance, no matter how many there are or how large they are. For example, if you need to write millions of records from Salesforce to a CSV file, you can process them as a streaming batch commit.
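
In the XML, the difference between the two modes is a single attribute on the Batch Commit element (a sketch; the size attribute configures the fixed-size variant):

[source,xml]
----
<!-- fixed-size: accumulates and commits records in groups of 100 -->
<batch:commit size="100" doc:name="Batch Commit"/>

<!-- streaming: commits all the records in the job instance, regardless of count -->
<batch:commit streaming="true" doc:name="Batch Commit"/>
----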

Instead of the list of elements you would receive with a fixed-size batch commit, the streaming functionality provides an iterator, which ensures that you receive all the records in a batch job without running out of memory. By combining streaming batch commit with DataMapper streaming in a flow, you can transform large datasets in a single operation and a single write to disk. The example below illustrates the actions Mule takes to batch process streaming data.

example_actions

To enable your application to batch process streaming data, configure both the batch commit and the DataMapper to enable streaming.
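
Combined, the two settings nest as follows (a minimal sketch; the config-ref and the file endpoint are placeholders drawn from the complete example at the end of this page):

[source,xml]
----
<batch:commit streaming="true" doc:name="Batch Commit">
    <!-- DataMapper also set to stream, so the whole job instance transforms in one pass -->
    <data-mapper:transform config-ref="listcontact_to_csv" stream="true" doc:name="List&lt;Contact&gt; To CSV"/>
    <!-- a single write to disk -->
    <file:outbound-endpoint outputPattern="contacts.csv" path="${output.path}" doc:name="File"/>
</batch:commit>
----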

In general, you likely wouldn’t use batch streaming when sending data through an Anypoint Connector to a SaaS provider, like Salesforce, because SaaS providers often have restrictions on accepting streaming input. Rather, use streaming batch processing when writing to a file such as CSV, JSON, or XML.

The following sections show how to configure each element in the Studio Visual Editor and in the XML editor.

=== Batch Commit

In the Properties Editor of the Batch Commit element, click the checkbox to enable Streaming.

batch_commit_stream


              
           
[source,xml]
----
<batch:commit streaming="true" doc:name="Batch Commit">
----

=== DataMapper

  1. In the Mapping Editor of the DataMapper element, click the mapping icon, then select Configuration.

    configuration

  2. In the Configuration Properties Editor, click the checkbox to enable Streaming.

    streaming_DM


              
           
[source,xml]
----
<data-mapper:transform config-ref="listcontact_to_csv" doc:name="List<Contact> To CSV" stream="true"/>
----

Batch-processing streaming data affects the performance of your application, slowing the pace at which it processes transactions. Though performance slows, the trade-off of being able to batch-process streaming data may warrant using it in your implementation.

== Ordering Batch Job Execution

To account for situations in which different batch jobs may be competing for resources, you can adjust the configuration of the Batch’s Scheduling Strategy. For example, if one batch job instance is dependent upon another’s completion, you can set the scheduling strategy to sequentially process the batch job instances according to the order they were created. Where non-sequential processing of batch jobs could cause problems in data consistency, be sure to set the scheduling strategy to process them sequentially.

The table below describes how the value of the scheduling-strategy attribute instructs Mule to submit batch job instances to a target resource.

ORDERED_SEQUENTIAL (Default)

Mule executes batch job instances sequentially, according to the order in which Mule created them.

If Mule created a job instance at 12:00:00 and then created another at 12:00:01, Mule does not execute the second instance until the first one leaves the executable state.

Note that as this value is the default, the XML config does not explicitly display the configuration.

ordered_seq


              
           
[source,xml]
----
<batch:job name="testBatch1">
----

ROUND_ROBIN

Mule executes batch job instances according to a round-robin pattern, sharing available resources among all instances of the job.

round_robin


              
           
[source,xml]
----
<batch:job name="testBatch1" scheduling-strategy="ROUND_ROBIN">
----

If your application uses more than one batch job, you must define the scheduling strategy of each one individually. Mule configures the scheduling strategy at the batch job level, meaning that a ROUND_ROBIN or ORDERED_SEQUENTIAL setting applies only to instances of the same batch job.
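
For example, an application with two batch jobs declares a strategy on each job element individually (a sketch; the job names are hypothetical):

[source,xml]
----
<!-- Instances of this job share resources in round-robin fashion -->
<batch:job name="accountsJob" scheduling-strategy="ROUND_ROBIN">
    <!-- input, process-records, on-complete phases -->
</batch:job>

<!-- Instances of this job execute in creation order; ORDERED_SEQUENTIAL is the default -->
<batch:job name="contactsJob">
    <!-- input, process-records, on-complete phases -->
</batch:job>
----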

== Tips


  • Batch streaming and access to items: The biggest drawback to using batch streaming is that you have limited access to the items in the output. In other words, with a fixed-size commit, you get an unmodifiable list, thus allowing you to access and iteratively process its items; with streaming commit, you get a one-read, forward-only iterator. 

  • Setting multiple scheduling strategies: Setting all your application’s batch jobs’ scheduling strategies to ORDERED_SEQUENTIAL does not ensure that job instances created in one batch job respect the order in which job instances were created in a separate batch job. Setting the scheduling strategy only enforces the order in which Mule processes instances of the same job.

== Complete Code Example

sfdc_to_scv_streaming


    
                 
              
[source,xml]
----
<?xml version="1.0" encoding="UTF-8"?>
<mule xmlns:context="http://www.springframework.org/schema/context" xmlns:file="http://www.mulesoft.org/schema/mule/file"
    xmlns:batch="http://www.mulesoft.org/schema/mule/batch" xmlns:data-mapper="http://www.mulesoft.org/schema/mule/ee/data-mapper" xmlns:sfdc="http://www.mulesoft.org/schema/mule/sfdc" xmlns="http://www.mulesoft.org/schema/mule/core" xmlns:doc="http://www.mulesoft.org/schema/mule/documentation"
    xmlns:spring="http://www.springframework.org/schema/beans" version="EE-3.5.0"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://www.mulesoft.org/schema/mule/file http://www.mulesoft.org/schema/mule/file/current/mule-file.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-current.xsd
http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd
http://www.mulesoft.org/schema/mule/batch http://www.mulesoft.org/schema/mule/batch/current/mule-batch.xsd
http://www.mulesoft.org/schema/mule/ee/data-mapper http://www.mulesoft.org/schema/mule/ee/data-mapper/current/mule-data-mapper.xsd
http://www.mulesoft.org/schema/mule/sfdc http://www.mulesoft.org/schema/mule/sfdc/current/mule-sfdc.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-current.xsd">

    <sfdc:config name="Salesforce56" username="${sfdc.username}" password="${sfdc.password}" securityToken="${sfdc.securityToken}" url="${sfdc.url}" doc:name="Salesforce">
        <sfdc:connection-pooling-profile initialisationPolicy="INITIALISE_ONE" exhaustedAction="WHEN_EXHAUSTED_GROW"/>
    </sfdc:config>
    <data-mapper:config name="listcontact_to_csv" transformationGraphPath="list&lt;contact&gt;_to_csv.grf" doc:name="listcontact_to_csv"/>
    <context:property-placeholder location="mule-app.properties"/>

    <batch:job name="sf-to-csv-sync" max-failed-records="-1">
        <batch:threading-profile poolExhaustedAction="WAIT"/>

        <batch:input>
            <poll doc:name="Poll">
                <fixed-frequency-scheduler frequency="10" startDelay="20" timeUnit="MINUTES"/>
                <watermark variable="nextSync" default-expression="2014-01-01T00:00:00.000Z"
                           doc:name="Get Next Sync Time" selector="MAX" selector-expression="#[payload.LastModifiedDate]"/>
                <sfdc:query config-ref="Salesforce56" query="dsql:SELECT Email,FirstName,Id,LastModifiedDate,LastName FROM Contact WHERE CreatedDate &gt;= #[flowVars['nextSync']] ORDER BY LastModifiedDate ASC" doc:name="Get Updated Contacts"/>
            </poll>
        </batch:input>

        <batch:process-records>
            <batch:step name="toCSV">
                <batch:commit streaming="true" doc:name="Batch Commit">
                    <data-mapper:transform config-ref="listcontact_to_csv" stream="true" doc:name="List&lt;Contact&gt; To CSV"/>
                    <file:outbound-endpoint outputPattern="contacts.csv" path="/Users/marianogonzalez/Desktop" responseTimeout="10000" doc:name="File"/>
                </batch:commit>
            </batch:step>
        </batch:process-records>
        <batch:on-complete>
            <logger level="WARN" message="Total Records Loaded: #[message.payload.getLoadedRecords()], Failed Records: #[message.payload.getFailedRecords()], Processing time: #[message.payload.getElapsedTimeInMillis()]" doc:name="Logger"/>
        </batch:on-complete>
    </batch:job>
</mule>
----

== See Also