Migrating Watermarks
Watermarking is typically used to perform data synchronization, for example, when polling a legacy resource to retrieve new data and sync it with another destination endpoint. The watermarking technique stores and retrieves the point at which a periodic synchronization resumes the next time it executes. Watermarking is typically used alongside ObjectStore.
For the Mule 3 approach, see this MuleSoft blog: Data Synchronizing made easy with Watermarks.
Here’s the Mule 3 watermark example from that blog:
<flow name="syncWithWatermark" processingStrategy="synchronous">
    <poll>
        <fixed-frequency-scheduler frequency="1" timeUnit="HOURS" />
        <watermark variable="timestamp"
                   default-expression="#[server.dateTime.format(&quot;yyyy-MM-dd'T'HH:mm:ss.SSS'Z'&quot;)]"
                   selector="MAX"
                   selector-expression="#[payload.LastModifiedDate]" />
        <sfdc:query config-ref="Salesforce" query="select Id, LastModifiedDate from Contact where LastModifiedDate &gt; #[flowVars['timestamp']]" />
    </poll>
    <flow-ref name="doYourSyncMagic" />
</flow>
As the Anypoint Platform grew and our use cases became more complicated, limitations started to appear in the Mule 3 approach:
- Although the old <watermark> component did allow setting a custom ObjectStore, there was no easy way to create one. This problem was already addressed earlier in this document, and the new ObjectStore connector fixes it.
- The <watermark> element required the <poll> component (now replaced with <scheduler>) to return an iterable payload. Furthermore, you were required to fully iterate over that payload to obtain the new watermark value. Failure to do so resulted in the value being wrongly updated or not updated at all.
- There was no way to update the watermark with a contingency value if the flow threw an error.
- How and when the value got updated was not completely clear to non-expert users.
When we sat down to figure out how to improve this use case, we realized that the root of the problem was that <watermark> shouldn’t be necessary at all. Its existence was just a consequence of limitations in other components:
- The watermark’s artificial iteration was just a way to compensate for the fact that many data sets could only be iterated once, especially when dealing with binary streams or with connectors doing auto-paging. This problem is now fixed in Mule 4 thanks to the repeatable streaming feature.
- The component’s automatic handling of the ObjectStore was just a way to compensate for UX problems in the old ObjectStore support. If the new connector were to provide easier semantics, it wouldn’t be needed at all.
In Mule 4, you can perform the use case described in the Mule 3 blog post above like this:
<os:object-store name="watermarkStore" persistent="true"/>
<flow name="watermark">
    <os:retrieve key="watermark" objectStore="watermarkStore" target="watermark">
        <os:default-value>2017-09-11T00:00:00.000Z</os:default-value>
    </os:retrieve>
    <sfdc:query config-ref="config">
        <sfdc:salesforce-query>
            <![CDATA[
            #["Select Id, Name, BillingCity,Phone,Website,LastModifiedDate from account WHERE LastModifiedDate > " ++ vars.watermark]
            ]]>
        </sfdc:salesforce-query>
    </sfdc:query>
    <flow-ref name="doYourIntegrationLogic" />
    <os:store key="watermark" failIfPresent="false" failOnNullValue="false" objectStore="watermarkStore">
        <os:value>#[max(payload map $.LastModifiedDate)]</os:value>
    </os:store>
</flow>
Let’s break the example down step by step:
- First, this flow is generic. It doesn’t need to be triggered in any specific way: it could be invoked from another flow, or you could add a <scheduler> to it. You can now achieve watermark semantics without forcing any type of trigger.
- This configuration starts by defining a custom ObjectStore for the watermark. This is not required: you could skip the definition and use the implicit ObjectStore that every application has. For the sake of the example, we use our own here.
- The flow starts with a retrieve operation to get the last watermark value. Notice that:
  - The retrieve operation specifies a default value, so that when the flow runs for the first time and no watermark has been calculated yet, you still get a usable value. This avoids the need for a <choice> → <contains> kind of pattern.
  - The Target (target) parameter tells the connector not to overwrite the message payload with the watermark, but to put it in a variable instead. Because this has no side effect on the message, the flow is reusable and easier to maintain: you could add any kind of operation before the retrieve operation and nothing would break.
- Then we perform the query, using the watermark to filter the results.
- Then we do whatever processing is needed. This can be anything, such as transforming the data or sending it to another system.
- And finally, we update the watermark value. There’s a lot to be said about this step too:
  - The new watermark value should be the maximum updated timestamp. We use a DataWeave expression to obtain that value.
  - Notice that thanks to the repeatable streaming feature, we no longer care what the integration logic did with the payload. Even if the query operation is auto-paged (which it is!), this feature guarantees that we can still iterate the result set again.
  - The failIfPresent parameter is set to false. Otherwise, the operation would only work the first time and fail if the watermark were already in the object store. By setting it to false, the value is updated if present.
  - And finally, the failOnNullValue parameter is set to false. Why? Because if the query comes back empty, the DataWeave expression that looks for the max value returns null, because there’s no value. By setting this parameter to false, the connector automatically skips null values, saving the user from using a <choice> router to check the condition.
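As a rough sketch of how the points above can combine, the variant below adds a <scheduler> trigger, relies on the application’s implicit ObjectStore instead of a custom one, and writes a contingency value from an error handler. The flow name scheduledWatermark, the hourly interval, and the choice of re-storing the previously retrieved watermark as the contingency value are illustrative assumptions, not part of the original example. The query and the doYourIntegrationLogic flow are carried over unchanged.

```xml
<flow name="scheduledWatermark">
    <!-- Assumed trigger: run once per hour; any other trigger would work as well -->
    <scheduler>
        <scheduling-strategy>
            <fixed-frequency frequency="1" timeUnit="HOURS"/>
        </scheduling-strategy>
    </scheduler>

    <!-- No objectStore attribute, so the application's implicit ObjectStore is used -->
    <os:retrieve key="watermark" target="watermark">
        <os:default-value>2017-09-11T00:00:00.000Z</os:default-value>
    </os:retrieve>

    <sfdc:query config-ref="config">
        <sfdc:salesforce-query>
            <![CDATA[
            #["Select Id, Name, BillingCity,Phone,Website,LastModifiedDate from account WHERE LastModifiedDate > " ++ vars.watermark]
            ]]>
        </sfdc:salesforce-query>
    </sfdc:query>

    <flow-ref name="doYourIntegrationLogic"/>

    <!-- Normal path: advance the watermark to the newest timestamp seen -->
    <os:store key="watermark" failIfPresent="false" failOnNullValue="false">
        <os:value>#[max(payload map $.LastModifiedDate)]</os:value>
    </os:store>

    <error-handler>
        <on-error-propagate>
            <!-- Assumed contingency: persist the watermark this run started from.
                 If the error happened before the retrieve, vars.watermark is null
                 and failOnNullValue="false" makes the store a no-op. -->
            <os:store key="watermark" failIfPresent="false" failOnNullValue="false">
                <os:value>#[vars.watermark]</os:value>
            </os:store>
        </on-error-propagate>
    </error-handler>
</flow>
```

Because the error is propagated after the contingency value is stored, the failure stays visible to whatever monitors the flow. A different contingency expression can be substituted in the error handler if your use case calls for one.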
Automatic Watermark
The approach above is more powerful than what we had in Mule 3, but we wanted to make it even simpler. That’s why some connectors already include message sources that perform watermarking automatically, without requiring any of the steps above. At the time of writing, the File, FTP, SFTP, Database, and Salesforce connectors have this capability, and we will continue to add support for it in other connectors.
Let’s use the file connector to see an example of how this works:
<flow name="onNewInvoice">
    <file:listener config-ref="file" directory="invoices" autoDelete="false" watermarkMode="CREATED_TIMESTAMP">
        <scheduling-strategy>
            <fixed-frequency frequency="1000"/>
        </scheduling-strategy>
    </file:listener>
    <!-- Placeholder name for the flow that holds the invoice logic; a flow should not reference itself -->
    <flow-ref name="processInvoice"/>
</flow>
In this example, we get automatic watermarking using the file’s creation time as the filtering criterion. The watermarkMode parameter can also be set to MODIFIED_TIMESTAMP to listen for modified files instead, or to DISABLED to turn watermarking off entirely.
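The same idea carries over to the other connectors listed above. As a minimal sketch, assuming the Database connector’s listener source and a hypothetical invoices table with an id primary key and a last_modified timestamp column, automatic watermarking could look like the following. The flow names, the dbConfig configuration name, and the table and column names are all made up for illustration.

```xml
<flow name="onNewOrUpdatedInvoiceRow">
    <!-- watermarkColumn: only rows with a value greater than the last one seen are picked up.
         idColumn: identifies each row so that it is not processed twice. -->
    <db:listener config-ref="dbConfig" table="invoices" watermarkColumn="last_modified" idColumn="id">
        <scheduling-strategy>
            <fixed-frequency frequency="1000"/>
        </scheduling-strategy>
    </db:listener>
    <!-- Hypothetical flow that handles one row per message -->
    <flow-ref name="processInvoiceRow"/>
</flow>
```

As with the file listener, there is no explicit retrieve or store step here: the source keeps track of the watermark itself.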
We recommend taking "the long way" described in the first example only when:
- Your use case is highly customized.
- You are using a connector that doesn’t support automatic watermarking.
In any other case, we recommend automatic watermarking as the best way of dealing with these use cases.