Nav

Example: Setting Up Watermarks with an Object Store

Watermarking is a technique for storing and retrieving the point at which a periodic synchronization should resume the next time it’s executed. Watermarking is a common use of the object store.

Background for Mule 3 Users

For an explanation of what watermarks are and how MuleSoft approached that problem in Mule 3, see this blog post.

Limitations of the Mule 3 approach:

  • Although the Mule 3 <watermark/> component allowed for setting a custom ObjectStore, there was no easy way to create it. Mule 4 fixes this issue.

  • The <watermark/> element required the <poll/> component (replaced by <scheduler/> in Mule 4) to return an iterable payload, and you had to fully iterate over that payload to obtain the new watermark value. Failure to do so resulted in the value being wrongfully updated or not updated at all.

  • There was no way to update the watermark with a contingency value if the flow threw an error. How and when the value got updated was not completely clear.

When we sat down to figure out how to improve this use case, we realized that the real root of the problem was that in reality the <watermark> shouldn’t be necessary at all. It’s existence was just a consequence of limitations in other components:

  • The watermark’s artificial iteration was just a way to compensate for the fact that many data sets could only be iterated once, especially when dealing with binary streams or with connectors doing auto paging. This problem is now fixed in Mule 4 thanks to the repeatable streaming feature

  • The component’s automatic handling of the ObjectStore was just to compensate for UX problems on the old ObjectStore support. If the new connector were to provide easier semantics, it wouldn’t be needed at all.

Mule 4 Example

This example shows how to use Mule 4 to perform the use case described in the Mule 3 blog.


         
      
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
<os:object-store name="watermarkStore" persistent="true"/>

<flow name="watermark">
    <os:retrieve key="watermark" objectStore="watermarkStore" target="watermark">
        <os:default-value>2017-09-11T00:00:00.000Z</os:default-value>
    </os:retrieve>
    <sfdc:query config-ref="config">
        <sfdc:salesforce-query>
            <![CDATA[
                #["Select Id, Name, BillingCity,Phone,Website,LastModifiedDate from account WHERE LastModifiedDate > " ++ vars.watermark]
           ]]>
       </sfdc:salesforce-query>
    </sfdc:query>
    <flow-ref name="doYourIntegrationLogic" />
    <os:store key="watermark" failIfPresent="false"
     failOnNullValue="false" objectStore="watermarkStore">
        <os:value>#[max(payload map $.LastModifiedDate)]</os:value>
    </os:store>
</flow>

The example is a generic flow that does not require a specific type of trigger. Though you might invoke it from another flow, or you might use a <scheduler/> to trigger the flow, you can produce watermark semantics without forcing any type of trigger.

Then the example defines a custom ObjectStore for the watermark, but it is also possible to use the ObjectStore that is available implicitly to every app. So you do not need to define a custom ObjectStore.

The flow starts by retrieving the last watermark value. The Retrieve operation (<os:retrieve/>) specifies a default value (<os:default-value/>) that is available the first time the flow runs, before the first watermark is calculated. This configuration avoids the need for a <choice> → <contains> type of pattern.

Note that the Retrieve operation stores the watermark value in a target value (target="watermark") to prevent the connector from overriding the message payload. This makes the flow reusable and easier to maintain. You can safely add any kind of operation before the Retrieve operation.

The query (<sfdc:query/>) uses the watermark (vars.watermark) to filter the obtained results.

The <flow-ref /> points to another flow that does whatever processing is needed, for example, sending data to another system, performing a transformation, or something else.

The last step (in <os:store/>) is to update the watermark value:

  • The new watermark value should be the maximum updated timestamp. The example uses <os:value/> to provide a DataWeave expression that obtains that value.

    Note that the repeatable streaming feature prevents the need for you to know the integration logic. Even though the query operation auto-paged, this feature guarantees that you can still iterate the result set again.

  • failIfPresent is set to false so that the value will be updated if it is present. Otherwise, the operation would only work the first time and fail if the watermark were already in the object store.

  • failOnNullValue is set to false, so if the query comes back empty, the DataWeave expression (which looks for the max value) will return null. Setting it to true makes the connector skip null values, which prevents the need to use a <choice/> router to check the condition.