Nav
You are viewing an older version of this section. Click here to navigate to the latest version.

Poll Reference

While some connectors, such as [HTTP] and [FTP], utilize a polling process to actively retrieve messages from an external resource, most message processors in Mule are triggered when called by a previous element in a flow. If you want to arrange for a message processor to actively call a resource at regular intervals, use a Poll scope.

Assumption

This document assumes that you are familiar with the [Visual Editor in Anypoint Studio] and that you can use to create applications in Studio. Further, this document references [Global Elements] and the category of message processors known as [scopes]. Review the [Anypoint Studio Essentials] to learn more about developing with Mule ESB’s graphical user interface.

Configuring a Poll Wrapper

  1. Drag a Poll scope onto the canvas.

    poll_scope

  2. Drag an element inside the poll scope, for example a Twitter connector. Mule uses this message processor to regularly poll the Twitter API for new data to process.

    poll_connector

  3. Configure the Twitter Connector

    twitter

    Attribute Value Description

    Operation

    Get user timeline by screen name

    Captures recent tweets by a particular Twitter user

    Screen Name

    MuleSoft

    Return tweets by the official MuleSoft account. You can use any Twitter account you want.

    config reference

    Twitter

    You must create a Twitter connector as a [global element] to reference here. For your Twitter connector, you need a Twitter developer account.

  4. Add more message processors to your flow to perform specific business logic after polling for data. The example below uses a logger component. This [logger] uses the [MEL] expression #[payload] to log the message payload collected by the Twitter connector every 1000 milliseconds.

    poll_flow

  1. Add a Poll element to your flow.

    Element Description Sample XML

    poll

    Polls an element at regular intervals

    <poll frequency="1000"doc:name="Poll">

    Attribute Name Value Description Sample XML

    frequency

    integer

    The interval, in milliseconds, between calls

    frequency="1000"

    doc:name

    string

    (Studio Only) Display a unique name for the poll scope in your application.

    doc:name="Poll"

  2. Add Twith connector as child element of the poll element.

    Child Element Description Sample XML

    twitter:<operation>

    Connects to Twitter. The "operation" is the activity Twitter performs. In the example at right, show recent tweets by a particular Twitter user.

    <twitter:get-user-timeline-by-screen-name config-ref= "Twitter" screenName= "mulesoft" doc:name= "Twitter" />

    Attribute Name Value Description XML

    screenName

    string

    Twitter account to return tweets from. In this example, the official MuleSoft account; you can use any valid Twitter account.

    screenName="MuleSoft"

    config-ref

    string

    You must create a Twitter connector as a [global element] to reference here. For your Twitter connector, you need a Twitter developer account.

    config-ref="Twitter"

    doc:name

    string

    (Studio only) Name of the poll scope as it will be displayed in your application.

    doc:name"Twitter"

  3. Add more message processor to your flow to perform specific business logic after polling for data. The example below uses a [logger] component. This logger uses the MEL expression #[payload] to log the message payload collect by the Twitter connector every 1000 milliseconds.

    Element Description Sample XML

    logger

    Logs the message payload

    <logger message="#[payload]"level="INFO"doc:name="Logger"/>

    Attribute Name Value Description

    message

    string or [Mule expression]

    The content that will be logged to console. In this case, the Mule Expression #[payload] will output the message payload.

    level

    string, on of ERROR, WARN, INFO, DEBUG, or TRACE

    The priority level of the notification, in this case INFO.

    doc:name

    string

    (Studio only) Name of the logger component as it will appear in your application.

Complete Code Example

Keep in mind that for this example to work, you must manually configure the following values of the global Twitter connector (twitter:config element):

  • accessKey

  • accessSecret

  • consumerKey

  • consumerSecret


          
       
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
<mule xmlns:tracking="http://www.mulesoft.org/schema/mule/ee/tracking" xmlns:twitter="http://www.mulesoft.org/schema/mule/twitter" xmlns:http="http://www.mulesoft.org/schema/mule/http"
xmlns:scripting="http://www.mulesoft.org/schema/mule/scripting"
xmlns="http://www.mulesoft.org/schema/mule/core" xmlns:doc="http://www.mulesoft.org/schema/mule/documentation" xmlns:spring="http://www.springframework.org/schema/beans" version="EE-3.5.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-current.xsd http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd http://www.mulesoft.org/schema/mule/twitter http://www.mulesoft.org/schema/mule/twitter/2.4/mule-twitter.xsd http://www.mulesoft.org/schema/mule/http http://www.mulesoft.org/schema/mule/http/current/mule-http.xsd http://www.mulesoft.org/schema/mule/ee/tracking http://www.mulesoft.org/schema/mule/ee/tracking/current/mule-tracking-ee.xsd">
   
  <twitter:config name="Twitter" accessKey="" accessSecret="" consumerKey="" consumerSecret="" doc:name="Twitter">
        <twitter:connection-pooling-profile initialisationPolicy="INITIALISE_ONE" exhaustedAction="WHEN_EXHAUSTED_GROW"/>
    </twitter:config>
   
    <flow name="test1" doc:name="test1" processingStrategy="synchronous">
        <poll frequency="1000" doc:name="Poll">
            <twitter:get-user-timeline-by-screen-name config-ref="Twitter"      doc:name="Twitter" screenName="MuleSoft" sinceId="#[flowVars['lastID']]"/>
        </poll>
        <logger message="#[payload]" level="INFO" doc:name="Logger"/>
        
    </flow>
</mule>

Pausing Polling During Debugging

While trying to debug an application the utilizes polling functionality, it is challenging to constantly trigger flow processing so as to monitor Mule activity. To trigger polling during testing, use the Poll Resume and Poll Pause mechanisms available with Studio’s Visual Debugger.

  1. Run your project in Debug Mode. (Refer to the [Visual Debugger] document for greater detail.)

  2. In debug mode, notice the set of buttons in the title bar of the Poll scope (see image below). Use the green Poll Start-Pause button to trigger the poll and initiate the flow; click again to halt polling.

    poll_debug

  3. While running your project in debug mode, Studio opens a new Mule Debugger View tab the lower section of the screen. In the top-right of this tab, Studio displays two icons which allow you to stop or start all polls in your project at the same time.

    poll_startStop

Polling for Updates using Watermarks

Rather than polling a resource for all its data with every call, you may want to acquire only the data that has been newly created or updated since the last call. To acquire only new or updated data, you need to keep a persistent record of either the item that was last processed, or the time at which your flow last polled the resource. In the context of Mule flows, this persistent record is called a watermark.

Typically, Mule sets a watermark to a default value the first time the flow runs, then uses it as necessary when running a query or making an outbound request (i.e. calling a resource). Depending upon how the flow processes the results of the call, Mule may update the original value of the watermark or maintain the original value. As the value must persist across flows, Mule uses an object store for persistent storage. Built into the poll scope, object stores require no custom logic. You can configure watermarks by setting a couple of attributes.

Consider the following generic Mule flow.

watermark-expbasic

This flow regularly polls a resource, then performs a series of operations on the resulting payload. With every poll, the application acquires only the data that is newly created or updated since the last call to the resource. In this example, Mule stores watermarks in two variables:

  • a persistent object store variable

  • an exposed flow variable

If you’re already comfortably familiar with Mule components in general, you might find [this blog post] to be a clear explanation, as it explains the watermark by replicating its behavior with a series of other Mule components.

The diagram below illustrates same flow including numbered steps. The step-by-step explanation below describes the activities Mule performs in the background with these two variables.

watermark-w-selectors

  1. Mule looks for a variable in the object store with a name that matches the value of the Poll attribute "Variable Name". In this case the chosen name is lastModifiedID.

  2. If Mule finds a variable by this name, Mule exposes it by creating a flow variable (flowVar) with the same name.

    The first time the poll runs, no object store variable exists by this name. In this case, Mule creates a flow variable anyway, and loads it with the value you provide in the Default Expression attribute. In this case, the initial value is 0.
  3. Mule polls the resource. Connectors inside the poll should include filters that accept the flowVars as an attribute, as per the code below.

    
                
             
    1
    
    sinceId="#[flowVars['lastModifiedID']]"
  4. Mule executes the rest of the flow.

  5. When the flow has completed execution, Mule updates the value of the flowVars according to either the Update Expression or a combination of the Selector Expression and the chosen Selector. In this case, the Selector Expression is #[payload.id], and the Selector is LAST, so Mule will inspect the id attribute of each of the returned objects and pick the last of these as the new value for the lastmodifiedID flowVars.

  6. Mule saves the flowVars back into the object store. If no variable existed in the object store in step 1, Mule creates a new variable in the object store.

    If you define a value in the optional "Object Store" poll attribute, Mule searches for an object store by your value instead of the default user object store.

List of Watermark Attributes

Attributes XML Element Required Default Description

Variable Name

variable

X

-

Identifies both the object store key that Mule uses to store the watermark, and the name of the flowVars where Mule exposes the watermark value to the user.

Defualt Expression

default-expression

X

-

If Mule cannot locate the object store key if uses the default expression to generate a value. This is useful for the first run of the flow.

Update Expression

update-expression

Value of the variable attribute

Mule uses the result of this expression to update the watermark once flow execution is complete. Use this expression as an alternative to a selector in case you need to follow a more complex logic.

Selector

selector

-

The criteria Mule will use to pick the next value for the flowVars. There are four available selectors: MIN, MAX, FIRST and LAST. If you use this attribute, you must also provide a value for Selector Expression.

Selector Expression

selector-expression

-

Mule executes this expression on every object returned by the Poll. The Selector then collects the returned values and picks one according to the chosen criteria. If you use this attribute, you must also provide a value for the Selector.

Object Store

object-store-ref

The default user object store.

A reference to the object store in which you wish to store the watermarks.

Configuring Polling with Watermarks

  1. Follow the steps above to create a flow that polls Twitter for data every 1000 milliseconds, then logs the message payload.

    poll_flow

  2. Click to flow name bar to select the flow, then, in the properties editor, set the Processing Strategy to synchronous.

    All flows use an asynchronous processing strategy by default. If you do not set the processing strategy to synchronous, polling with watermarks will not work!

    synchronous

  3. Configure the Since Id attribute of the Twitter connector according to the table below.

    watermark

    Attribute Value Description

    Since Id

    #[flowVars["lastID"]]

    Instructs the connector to return only those tweets with an ID greater than the value of the lastID variable. lastID is a flow variable that Mule creates, then updates every time the poll runs.

  4. Select the poll scope, then edit its properties according to the table below.

    watermark_enable

    Attribute Value Description XML

    Fixed Frequency Scheduler

    1000

    Run the Poll every 1000 milliseconds

    Start Delay

    0

    Delays polling by 0 milliseconds

    Time Unit

    MILLISECONDS

    Use milliseconds as unit for the frequency and delay settings

    Enable Watermark

    true

    Enable using the Watermark

    Variable Name

    lastID

    Mule creates two variables

    • a persistent object store variable with the provided name

    • a flow variable that the Twitter Connector references in its sinceID filter

    variable="lastID"

    Default Expression

    -1

    the value that lastID uses the first time Mule executes the poll, or whenever the watermark can’t be found.

    default-expression="-1"

    Selector

    FIRST

    Pick the FIRST value returned by the Selector Expression to update the lastID variable each time the flow execution completes. In this case, it takes the id of the first tweet in the generated output (i.e. the most recent one).

    selector="FIRST"

    Selector expression

    #[payload.id]

    Return the id of each object in the generated output, this value is passed on the Selector

    selector-expression="#[payload.id]"

    Update Expression

    -

    Not needed. Selector and Selector Expression are being used

  1. Follow the steps above to create a flow that polls Twitter for data every 1000 milliseconds, then logs the message payload.

  2. In the flow, set the value of the processingStrategy attribute to synchronous.

    All flows use an asynchronous processing strategy by default. If you do not set the processing strategy to synchronous, polling with watermarks will not work!
    
           
                   
                
    1
    
    &lt;flow name="test1" doc:name="test1" processingStrategy="synchronous"&gt;
  3. Within the poll scope, add a watermark child element according to the table below.

    Element Description Sample XML

    watermark

    Keeps a persistent record of the last element that was processed, or the last time a sync was performed

    <watermark variable="lastID" default-expression="-1" selector="FIRST" selector-expression="#[payload.id]"/>

  4. Add attributes to the watermark child element according to the table below.

    
           
                   
                
    1
    
    &lt;watermark variable="lastID" default-expression="-1" selector="FIRST" selector-expression="#[payload.id]"/&gt;
    Attribute Name Value Description Sample XML

    variable

    string

    Mule creates two variables:

    • a persistent object store variable with the provided name

    • a flow variable that the Twitter Connector references in its sinceID filter.

    variable= "lastID"

    default-expression

    integer

    The value that lastID uses the first time Mule executes the poll, or whenever the watermark can’t be found.

    default -expression= "-1"

    Selector

    FIRST

    Pick the FIRST value returned by the Selector Expression to update the lastID variable each time the flow execution completes. In this case, it’s the id of the first tweet in the generated output (i.e. the most recent one).

    selector="FIRST"

    Selector expression

    #[payload.id]

    Return the id of each object in the generated output, this value is passed on to the Selector.

    selector-expression="#[payload.id]"

  5. Configure the Since Id attribute of the Twitter connector according to the table below:

    Attribute Value Description Sample XML

    sinceId

    string or Mule expression

    Instructs the connector to return only those tweets with an ID greater than the value of the lastID variable. lastID is a flow variable that Mule creates, then updates every time the poll runs.

    sinceId="#[flowVars['lastID']]"

    
           
                   
                
    1
    2
    3
    4
    5
    6
    7
    
    &lt;flow name="test1" doc:name="test1" processingStrategy="synchronous"&gt;
        &lt;poll frequency="1000" doc:name="Poll"&gt;
            &lt;watermark variable="lastID" default-expression="-1" selector="FIRST" selector-expression="#[payload.id]"/&gt;
                &lt;twitter:get-user-timeline-by-screen-name config-ref="Twitter"      doc:name="Twitter" screenName="MuleSoft" sinceId="#[flowVars['lastID']]"/&gt;
        &lt;/poll&gt;
        &lt;logger message="#[payload]" level="INFO" doc:name="Logger"/&gt;
    &lt;/flow&gt;

Example Code

Keep in mind that for this example to work, you must manually configure the following values of the global Twitter connector (twitter:config element):

  • accessKey

  • accessSecret

  • consumerKey

  • consumerSecret


          
       
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
<mule xmlns:tracking="http://www.mulesoft.org/schema/mule/ee/tracking" xmlns:twitter="http://www.mulesoft.org/schema/mule/twitter" xmlns:http="http://www.mulesoft.org/schema/mule/http"
xmlns:scripting="http://www.mulesoft.org/schema/mule/scripting"
xmlns="http://www.mulesoft.org/schema/mule/core" xmlns:doc="http://www.mulesoft.org/schema/mule/documentation" xmlns:spring="http://www.springframework.org/schema/beans" version="EE-3.5.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-current.xsd http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd http://www.mulesoft.org/schema/mule/twitter http://www.mulesoft.org/schema/mule/twitter/2.4/mule-twitter.xsd http://www.mulesoft.org/schema/mule/http http://www.mulesoft.org/schema/mule/http/current/mule-http.xsd http://www.mulesoft.org/schema/mule/ee/tracking http://www.mulesoft.org/schema/mule/ee/tracking/current/mule-tracking-ee.xsd">
  
  <twitter:config name="Twitter" accessKey="xyz" accessSecret="xys" consumerKey="xyz" consumerSecret="xyz" doc:name="Twitter">
        <twitter:connection-pooling-profile initialisationPolicy="INITIALISE_ONE" exhaustedAction="WHEN_EXHAUSTED_GROW"/>
    </twitter:config>
  
    <flow name="test1" doc:name="test1" processingStrategy="synchronous">
        <poll frequency="1000" doc:name="Poll">
            <watermark variable="lastID" default-expression="-1" selector="FIRST" selector-expression="#[payload.id]"/>          
            <twitter:get-user-timeline-by-screen-name config-ref="Twitter"      doc:name="Twitter" screenName="MuleSoft" sinceId="#[flowVars['lastID']]"/>
        </poll>
        <logger message="#[payload]" level="INFO" doc:name="Logger"/>
       
    </flow>
</mule>

Variation for Updating the Flow Variable

As described above, the watermark element includes two ways to update the flow variable (flowVars) every time flow execution completes:

  • set an expression in the attribute update-expression

  • set an expression in the attribute selector-expression, and a criteria in selector

However, neither of these options support exception handling strategies; you may wish to add more complex logic rules to the process of updating the flow variable. To do so, you can use other message processors in your flow to set the flow variable using custom logic.

Add code, such as the example below, into a Java class, wrapping your extra custom logic around it.


          
       
1
#[flowVars['lastModifiedID']] = #[payload.id]
If you are using custom logic to update the flowVars, ensure that the radio button for update-expression is selected, but that the field is left empty. If the update-expression attribute has a value, Mule stores new watermark information on the flowVars according to that attribute, overwriting any custom logic you may have defined for updating the variable.

The image below displays a sample flow which updates the flowVars using custom logic; note the empty update-expression attribute.

watermark-expcomplex

Using Watermarks with Auto-Paging

Any connector which is enabled for [auto-paging] allows you to process large data sets in separate batches. This capability mitigates for memory overload, but also imposes certain conditions when used in conjunction with watermarks. The following sections illustrate two recommended methods for using watermarks when polling a connector that auto-pages its response.

Example 1 - Setting a Variable Inside a Foreach Scope

  1. Place an auto-paging-enabled-connector inside a poll scope as in previous examples.

    ex11

  2. Configure the connector according to the following screenshot. Note that the query orders the output in ascending order of LastModifiedDate so that the last item in the list is the newest. The details is crucial.

    salesforce

    Be sure to configure the order of the output so that the LAST element in the collection is the most recent one!
  3. Configure the poll scope according to the table below. The watermark will be a variable named lastUpdated. When the flow has finished processing, Mule updates the value of the variable to the value of the flow variable by the same name, lastUpdated. Its default value is the result of evaluating the following expression: #['YESTERDAY'].

    yesterday

    Attribute Name Description Sample XML

    Variable Name

    The watermark will be a variable named lastUpdated

    variable= "lastUpdated"

    default-expression

    The default value of lastUpdated will be the result of evaluating #['YESTERDAY']

    #['YESTERDAY']

    *update-expression

    Mule updates lastUpdated to the value of the flow variable by the same name lastUpdated

    #[flowVars['lastUpdated']]

  4. Next, you need to process the output of the connector with an element that can handle collections, such as a [Foreach] scope. The message processors set within the Foreach scope process each item in a collection individually, one at a time.

    ex12

  5. Inside the Foreach scope, place a Variable transformer. This message processor sets the value of a variable on each iteration of the Foreach scope, overwriting the variable’s value each time. Since you ordered your collection so that the last element is the newest, the value that "sticks" at the end of the Foreach’s iteration is what Mule needs for the watermark.

    ex13

  6. Configure the Variable to change lastUpdated to the following expression: #[payload['lastUpdated]]

    setvariable

    Attribute Value Explanation

    Name

    lastUpdated

    The name of the variable to update.

    Value

    #[payload['lastUpdated']]

    The source from which to obtain the value.

  7. Add a Logger at the end of your flow, outside the foreach scope, to log the value of lastUpdated. The logger also allows you to verify that the watermark works.

    fullex1

  1. Add a poll element to your flow, then add a watermark variable as a child element. The watermark will be a variable named lastUpdated. When the flow has finished processing, Mule updates the value of the variable to the value of the flow variable by the same name, lastUpdated. Its default value is the result of evaluating the following expression: #['YESTERDAY'].

    
           
                    
                 
    1
    2
    3
    
    &lt;poll frequency="100000" doc:name="Poll"&gt;
                &lt;watermark variable="nextSync" default-expression="#['YESTERDAY']" update-expression="#[flowVars['lastUpdated']]"/&gt;
    &lt;/poll&gt;
    Attribute Description Sample XML

    variable

    The watermark will be a variable named lastUpdated

    variable= "lastUpdated"

    default-expression

    The default value of lastUpdated will be the result of evaluating #['YESTERDAY'].

    #['YESTERDAY']

    update-expression

    lastUpdated will be updated to the value of the flow variable by the same name, lastUpdated

    #[flowVars['lastUpdated']]

  2. Add an auto-paging-enabled connector as a child element of the poll element. Note that the query orders the output in ascending order of LastModifiedDate so that the last item in the list is the newest. This detail is critical.

    
           
                    
                 
    1
    2
    3
    4
    
    &lt;poll frequency="100000" doc:name="Poll"&gt;
                &lt;watermark variable="nextSync" default-expression="#['YESTERDAY']" update-expression="#[flowVars['lastUpdated']]"/&gt;
                &lt;sfdc:query config-ref="" query="dsql:SELECT Email,FirstName,LastModifiedDate,LastName FROM Contact WHERE LastModifiedDate &amp;gt; #[flowVars['nextSync']] ORDER BY LastModifiedDate ASC LIMIT 100" doc:name="Salesforce"/&gt;
    &lt;/poll&gt;
    Be sure to configure the order of the output so that the LAST element in the collection is the most recent one!
  3. Next, you need to process the output of the connector with an element that can handle collections, such as a Foreach scope. The message processors set within the [Foreach] scope process each item in a collection individually, one at a time.

  4. Inside the Foreach scope, place a Variable transformer. This message processor sets the value of a variable on each iteration of the Foreach scope, overwriting the variable’s value each time. Since you ordered your collection so that the last element is the newest, the value that "sticks" at the end of the Foreach’s iteration is what Mule needs for the watermark. Configure the variable to change the value of lastUpdated to the value resulting from evaluating the following expression: #[payload['lastUpdated]]

    
           
                    
                 
    1
    2
    3
    
    &lt;foreach doc:name="For Each"&gt;
        &lt;set-variable variableName="lastUpdated" value="#[payload['lastUpdated']]" doc:name="Variable"/&gt;
    &lt;/foreach&gt;
    Attribute Value Explanation

    variableName

    lastUpdated

    The name of the variable to update.

    value

    #[payload['lastUpdated']]

    The source from which to obtain a value.

  5. Add a Logger at the end of your flow, outside the foreach scope, to log the value of lastUpdated. The logger also allows you to verify that the watermark works.

    
           
                    
                 
    1
    
    &lt;logger message="#[flowVars['lastUpdated']]" level="INFO" doc:name="Logger"/&gt;

          
       
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
<?xml version="1.0" encoding="UTF-8"?>
 
<mule xmlns:netsuite="http://www.mulesoft.org/schema/mule/netsuite" xmlns:tracking="http://www.mulesoft.org/schema/mule/ee/tracking" xmlns="http://www.mulesoft.org/schema/mule/core" xmlns:data-mapper="http://www.mulesoft.org/schema/mule/ee/data-mapper" xmlns:sfdc="http://www.mulesoft.org/schema/mule/sfdc" xmlns:doc="http://www.mulesoft.org/schema/mule/documentation" xmlns:spring="http://www.springframework.org/schema/beans" xmlns:core="http://www.mulesoft.org/schema/mule/core" version="EE-3.5.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.mulesoft.org/schema/mule/ee/data-mapper http://www.mulesoft.org/schema/mule/ee/data-mapper/current/mule-data-mapper.xsd
http://www.mulesoft.org/schema/mule/sfdc http://www.mulesoft.org/schema/mule/sfdc/current/mule-sfdc.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-current.xsd
http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd
http://www.mulesoft.org/schema/mule/ee/tracking http://www.mulesoft.org/schema/mule/ee/tracking/current/mule-tracking-ee.xsd
http://www.mulesoft.org/schema/mule/netsuite http://www.mulesoft.org/schema/mule/netsuite/3.0/mule-netsuite.xsd">
    <sfdc:config name="Salesforce" username="example@mulesoft.com.sap" password="password" securityToken="fKESXfSAj4398t3uhh8xotw9Uc" doc:name="Salesforce">
        <sfdc:connection-pooling-profile initialisationPolicy="INITIALISE_ONE" exhaustedAction="WHEN_EXHAUSTED_GROW"/>
    </sfdc:config>
    <data-mapper:config name="sfdc_csv_grf" transformationGraphPath="sfdc-csv.grf" doc:name="DataMapper"/>
    <data-mapper:config name="new_mapping_grf" transformationGraphPath="new_mapping.grf" doc:name="DataMapper"/>
    <flow name="example1" doc:name="example1">
        <poll frequency="100000" doc:name="Poll">
            <watermark variable="nextSync" default-expression="#['YESTERDAY']" update-expression="#[flowVars['lastUpdated']]"/>
            <sfdc:query config-ref="Salesforce" query="dsql:SELECT Email,FirstName,LastModifiedDate,LastName FROM Contact WHERE LastModifiedDate &gt; #[flowVars['nextSync']] ORDER BY LastModifiedDate ASC LIMIT 100" doc:name="Salesforce"/>
        </poll>
        <foreach doc:name="For Each">
            <set-variable variableName="lastUpdated" value="#[payload['lastUpdated']]" doc:name="Variable"/>
        </foreach>
        <logger message="#[flowVars['lastUpdated']]" level="INFO" doc:name="Logger"/>
    </flow>
</mule>

Example 2 - Setting a Variable DataMapper

  1. Place an auto-paging-enabled connector inside a poll scope as in the previous examples.

    ex11

  2. Configure the connector according to the following screenshot. Note that the query orders the output in ascending order of LastModifiedDate so that the last item in the list is the newest. This detail is critical.

    dsql_sfdc

    Be sure to configure the order of the output so that the LAST element in the collection is the most recent one! ORDER BY LastModifiedDate
  3. Configure the poll scope according to the table below. The watermark will be a variable named lastSync. When the flow has finished processing, Mule updates the value of the variable to the value of the flow variable by named lastUpdated. Its default value is the result of evaluating the following expression: #['YESTERDAY'].

    poll_watermark_variable

    Attribute Description Sample XML

    Variable Name

    The watermark will be a variable named lastSync

    variable= "lastSync"

    default-expression

    The default value of lastSync will be the result of #['YESTERDAY'].

    #['YESTERDAY']

    update-expression

    lastSync updates to the value of the flow variable lastUpdated

    #[flowVars['lastUpdated']]

  4. Next, you need to process the output of the connector with an element that can handle collections, such as a [DataMapper] transformer. DataMapper can process each item in a collection individually, one at a time.

    ex22

  5. Configure the [DataMapper] so that each variable has its pairing, then add one extra output argument named lastModified. This output argument sets the value of a variable on each iteration of the DataMapper, overwriting it with a new value each time. Because you ordered your collection so that the last element is the newest, the value that "sticks" at the end of the DataMapper’s iteration is what Mule needs for the watermark.

In other words, the value of the LastModifiedDate of the last item in the collection becomes the new value of the lastUpdated flow variable, which, in turn, Mule will use to update the value of the variable lastSync the next time the poll executes.

+ datamapper+clean

  1. Add a Logger at the end of your flow, outside the foreach scope, to log the value of lastUpdated. The logger also allows you to verify that the watermark works.

    fullex2


          
       
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
<?xml version="1.0" encoding="UTF-8"?>
 
<mule xmlns:netsuite="http://www.mulesoft.org/schema/mule/netsuite" xmlns:tracking="http://www.mulesoft.org/schema/mule/ee/tracking" xmlns="http://www.mulesoft.org/schema/mule/core" xmlns:data-mapper="http://www.mulesoft.org/schema/mule/ee/data-mapper" xmlns:sfdc="http://www.mulesoft.org/schema/mule/sfdc" xmlns:doc="http://www.mulesoft.org/schema/mule/documentation" xmlns:spring="http://www.springframework.org/schema/beans" xmlns:core="http://www.mulesoft.org/schema/mule/core" version="EE-3.5.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.mulesoft.org/schema/mule/ee/data-mapper http://www.mulesoft.org/schema/mule/ee/data-mapper/current/mule-data-mapper.xsd
http://www.mulesoft.org/schema/mule/sfdc http://www.mulesoft.org/schema/mule/sfdc/current/mule-sfdc.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-current.xsd
http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd
http://www.mulesoft.org/schema/mule/ee/tracking http://www.mulesoft.org/schema/mule/ee/tracking/current/mule-tracking-ee.xsd
http://www.mulesoft.org/schema/mule/netsuite http://www.mulesoft.org/schema/mule/netsuite/3.0/mule-netsuite.xsd">
    <sfdc:config name="Salesforce" username="example@mulesoft.com.sap" password="password" securityToken="fKESXfSAj43p8tjrnweo8otw9Uc" doc:name="Salesforce">
        <sfdc:connection-pooling-profile initialisationPolicy="INITIALISE_ONE" exhaustedAction="WHEN_EXHAUSTED_GROW"/>
    </sfdc:config>
    <data-mapper:config name="sfdc_csv_grf" transformationGraphPath="sfdc-csv.grf" doc:name="DataMapper"/>
    <data-mapper:config name="new_mapping_grf" transformationGraphPath="new_mapping.grf" doc:name="DataMapper"/>
    <flow name="example2" doc:name="example2" >
        <poll frequency="100000" doc:name="Poll">
            <watermark variable="lastSync" default-expression="#['YESTERDAY']"  update-expression="#[flowVars['lastUpdated']]"/>
            <sfdc:query config-ref="Salesforce" query="dsql:SELECT Email,FirstName,LastModifiedDate,LastName FROM Contact WHERE LastModifiedDate &gt; #[flowVars['lastSync']] ORDER BY LastModifiedDate ASC LIMIT 100" doc:name="Salesforce"/>
        </poll>
        <data-mapper:transform config-ref="new_mapping_grf" returnClass="java.lang.String" doc:name="DataMapper"/>
        <logger message="#[flowVars['lastUpdated']]" level="INFO" doc:name="Logger"/>
    </flow>
</mule>

See Also

  • Learn how to configure a [polling schedule].

  • Read an article in the [MuleSoft Blog] about using watermarks to synchronize two systems

  • Learn more about [Logger].

  • Learn more about [Anypoint™ Connectors] and [auto-paging].

  • Learn more about [flow processing strategies].