Rather than polling a resource for all its data with every call, you may want to acquire only the data that has been newly created or updated since the last call. To acquire only new or updated data, you need to keep a persistent record of either the item that was last processed, or the time at which your flow last polled the resource. In the context of Mule flows, this persistent record is called a watermark.
Typically, Mule sets a watermark to a default value the first time the flow runs, then uses it as necessary when running a query or making an outbound request (that is, calling a resource). Depending upon how the flow processes the results of the call, Mule either updates the watermark or keeps its original value. Because the value must persist across flow executions, Mule stores it in an object store. The object store is built into the poll scope, so it requires no custom logic. You can configure watermarks by setting a couple of attributes.
Consider the following generic Mule flow.
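This flow regularly polls a resource, then performs a series of operations on the resulting payload. With every poll, the application acquires only the data that is newly created or updated since the last call to the resource. In this example, Mule stores the watermark in two variables that share the same name: an object store variable, which persists the value between polls, and a flow variable (flowVar), which exposes the value to the flow.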
If you're already familiar with Mule components in general, you may find this blog post helpful: it explains the watermark by replicating its behavior with a series of other Mule components.
The diagram below illustrates the same flow with numbered steps. The step-by-step explanation below describes the activities Mule performs in the background with these two variables.
-
Mule looks for a variable in the object store with a name that matches the value of the Poll attribute "Variable Name". In this case, the chosen name is lastModifiedID.
-
If Mule finds a variable by this name, Mule exposes it by creating a flow variable (flowVar) with the same name.
The first time the poll runs, no object store variable with this name exists. Mule still creates the flow variable and loads it with the value you provide in the Default Expression attribute; here, the initial value is 0.
-
Mule polls the resource. Connectors inside the poll should include a filter attribute that reads the flow variable, as in the following code:
sinceId="#[flowVars['lastModifiedID']]"
-
Mule executes the rest of the flow.
-
When the flow has completed execution, Mule updates the value of the flow variable according to either the Update Expression or a combination of the Selector Expression and the chosen Selector. In this case, the Selector Expression is #[payload.id] and the Selector is LAST, so Mule inspects the ID attribute of each returned object and picks the last of these as the new value of the lastModifiedID flow variable.
-
Mule saves the flow variable back into the object store. If no variable existed in the object store in step 1, Mule creates a new one.
If you define a value in the optional "Object Store" poll attribute, Mule uses the object store that your value references instead of the default user object store.
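The steps above can be sketched as the following poll configuration. This is a minimal sketch assuming Mule 3.x poll syntax: the flow name pollWithWatermark and the 1000 ms frequency are hypothetical, and because the generic flow does not name a resource, the connector call is left as a comment that you replace with a real message processor.

```xml
<flow name="pollWithWatermark" processingStrategy="synchronous">
    <poll frequency="1000">
        <!-- Look up lastModifiedID in the object store, fall back to 0 on the
             first run, and after the flow completes store the id of the LAST
             object the poll returned -->
        <watermark variable="lastModifiedID"
                   default-expression="0"
                   selector="LAST"
                   selector-expression="#[payload.id]"/>
        <!-- Placeholder (hypothetical): replace with your connector call and pass
             the watermark to its filter attribute, for example
             sinceId="#[flowVars['lastModifiedID']]" -->
    </poll>
    <!-- The rest of the flow processes only the new or updated records -->
    <logger message="#[payload]" level="INFO"/>
</flow>
```

LAST assumes the resource returns records in ascending ID order; if the order is not guaranteed, MAX is the safer selector for an increasing ID.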
List of Watermark Attributes
Attribute | XML Element | Required? | Default | Description
Variable Name | variable | Yes | None | Identifies both the object store key that Mule uses to store the watermark and the name of the flow variable in which Mule exposes the watermark value to the user.
Default Expression | default-expression | Yes | None | If Mule cannot locate the object store key, it uses the default expression to generate a value. This is useful for the first run of the flow.
Update Expression | update-expression | No | Value of the variable attribute | Mule uses the result of this expression to update the watermark once flow execution is complete. Use this expression instead of a selector when you need more complex logic.
Selector | selector | No | None | The criterion Mule uses to pick the next value for the flow variable. There are four available selectors: MIN, MAX, FIRST, and LAST. If you use this attribute, you must also provide a value for Selector Expression.
Selector Expression | selector-expression | No | None | Mule evaluates this expression on every object returned by the poll. The Selector then collects the returned values and picks one according to the chosen criterion. If you use this attribute, you must also provide a value for Selector.
Object Store | object-store-ref | No | Default user object store | A reference to the object store in which you wish to store the watermarks.
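To illustrate the Object Store attribute, the following sketch points the watermark at a custom object store instead of the default user object store. It assumes a Mule 3.x configuration whose root element declares the spring namespace. The bean id watermarkStore and the flow name are hypothetical, and org.mule.util.store.SimpleMemoryObjectStore is only an example class (verify it against your Mule version): it keeps values in memory, so the watermark would not survive an application restart.

```xml
<spring:beans>
    <!-- Hypothetical custom store; in-memory only, so values are lost on restart -->
    <spring:bean id="watermarkStore" class="org.mule.util.store.SimpleMemoryObjectStore"/>
</spring:beans>

<flow name="pollWithCustomStore" processingStrategy="synchronous">
    <poll frequency="1000">
        <!-- object-store-ref replaces the default user object store;
             MAX keeps the largest id regardless of the order of the results -->
        <watermark variable="lastModifiedID"
                   default-expression="0"
                   selector="MAX"
                   selector-expression="#[payload.id]"
                   object-store-ref="watermarkStore"/>
        <!-- Placeholder (hypothetical): your connector call, filtered with
             #[flowVars['lastModifiedID']] -->
    </poll>
    <logger message="#[payload]" level="INFO"/>
</flow>
```

For production use, substitute a persistent ObjectStore implementation so that the watermark outlives restarts.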
Configuring Polling with Watermarks
Studio Visual Editor
-
Follow the steps above to create a flow that polls Twitter for data every 1000 milliseconds, then logs the message payload.
-
Click the flow name bar to select the flow, and in the properties editor, set the Processing Strategy to synchronous.
All flows use an asynchronous processing strategy by default. If you do not set the processing strategy to synchronous, polling with watermarks does not work.
-
Click the Twitter connector and set Since Id to: #[flowVars['lastID']]
This value instructs the connector to return only those tweets that have an ID greater than the value of the lastID variable. lastID is a flow variable that Mule creates, then updates every time the poll runs.
-
Select the poll scope, and edit its properties according to the table below.
Attribute | Description
Fixed Frequency Scheduler
Frequency | Run the poll every 1000 milliseconds.
Start delay | Delay polling by 0 milliseconds.
Time unit | Use milliseconds as the unit for the frequency and delay settings.
Enable watermark | Enable the watermark.
Variable Name | lastID. Mule creates two variables named lastID: an object store variable that persists the watermark between polls, and a flow variable that exposes its value to the flow.
Default Expression | -1. The value that lastID uses the first time Mule executes the poll, or whenever the watermark can’t be found.
Selector | FIRST. Pick the first value returned by the Selector Expression to update the lastID variable each time the flow execution completes. In this case, it takes the ID of the first tweet in the generated output (that is, the most recent one).
Selector Expression | #[payload.id]. Return the ID of each object in the generated output; the Selector receives these values. XML: selector-expression="#[payload.id]"
Update Expression | Not needed, because Selector and Selector Expression are used.
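In XML, the scheduler and watermark settings from this table correspond roughly to the following poll element. This is a sketch assuming Mule 3.5 or later, where poll accepts a fixed-frequency-scheduler child element; the XML Editor example that follows sets the frequency attribute directly on poll instead.

```xml
<poll doc:name="Poll">
    <!-- Frequency, Start delay, and Time unit settings from the table -->
    <fixed-frequency-scheduler frequency="1000" startDelay="0" timeUnit="MILLISECONDS"/>
    <!-- Watermark settings from the table -->
    <watermark variable="lastID" default-expression="-1" selector="FIRST" selector-expression="#[payload.id]"/>
    <!-- The connector reads the watermark through the lastID flow variable -->
    <twitter:get-user-timeline-by-screen-name config-ref="Twitter__Configuration" screenName="MuleSoft" sinceId="#[flowVars['lastID']]" doc:name="Twitter"/>
</poll>
```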
XML Editor or Standalone
-
Follow the steps above to create a flow that polls Twitter for data every 1000 milliseconds, then logs the message payload.
-
In the flow, set the value of the processingStrategy attribute to synchronous.
All flows use an asynchronous processing strategy by default. If you do not set the processing strategy to synchronous, polling with watermarks does not work.
<flow name="test1" doc:name="test1" processingStrategy="synchronous">
-
Within the poll scope, add a watermark child element. The watermark keeps a persistent record of the last element that was processed, or the last time a sync was performed.
-
Add attributes to the watermark child element:
<watermark variable="lastID" default-expression="-1" selector="FIRST" selector-expression="#[payload.id]"/>
-
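variable="lastID" - String - Mule creates two variables named lastID: an object store variable that persists the watermark between polls, and a flow variable that exposes its value to the flow.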
-
default-expression="-1" - Integer - The value that lastID uses the first time Mule executes the poll, or whenever the watermark can’t be found.
-
selector="FIRST" - Pick the FIRST value returned by the Selector Expression to update the lastID variable each time the flow execution completes. In this case, it’s the ID of the first tweet in the generated output (that is, the most recent one).
-
selector-expression="#[payload.id]" - Return the ID of each object in the generated output; the Selector receives these values.
-
Add sinceId="#[flowVars['lastID']]" to the Twitter connector statement:
<twitter:get-user-timeline-by-screen-name config-ref="Twitter__Configuration" screenName="MuleSoft" sinceId="#[flowVars['lastID']]" doc:name="Twitter"/>
The sinceId value is a string or Mule expression. It instructs the connector to return only those tweets with an ID greater than the value of the lastID variable, a flow variable that Mule creates, then updates every time the poll runs.
The code for this section is:
<flow name="test1" doc:name="test1" processingStrategy="synchronous">
    <poll frequency="1000" doc:name="Poll">
        <watermark variable="lastID" default-expression="-1" selector="FIRST" selector-expression="#[payload.id]"/>
        <twitter:get-user-timeline-by-screen-name config-ref="Twitter__Configuration" doc:name="Twitter" screenName="MuleSoft" sinceId="#[flowVars['lastID']]"/>
    </poll>
    <logger message="#[payload]" level="INFO" doc:name="Logger"/>
</flow>
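If you need logic that a selector cannot express, the Update Expression attribute can replace the Selector and Selector Expression pair. The following variant is a sketch, assuming the connector returns a List of tweets ordered newest first, as the FIRST selector in the flow above implies. The MEL ternary keeps the current lastID when a poll returns no tweets; treat the expression as illustrative rather than as a tested configuration.

```xml
<flow name="test1" doc:name="test1" processingStrategy="synchronous">
    <poll frequency="1000" doc:name="Poll">
        <!-- update-expression replaces selector and selector-expression:
             keep the old watermark on an empty result, otherwise take the
             id of the first (most recent) tweet -->
        <watermark variable="lastID" default-expression="-1"
                   update-expression="#[payload.isEmpty() ? flowVars['lastID'] : payload[0].id]"/>
        <twitter:get-user-timeline-by-screen-name config-ref="Twitter__Configuration" doc:name="Twitter" screenName="MuleSoft" sinceId="#[flowVars['lastID']]"/>
    </poll>
    <!-- The logger leaves the payload unchanged, so the update expression
         still sees the tweet list when the flow completes -->
    <logger message="#[payload]" level="INFO" doc:name="Logger"/>
</flow>
```

Because Mule evaluates the update expression once flow execution is complete, this approach works only while the payload at the end of the flow is still the list of tweets; a flow that transforms the payload would need a different expression.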