オブジェクトストアでのウォーターマークのセットアップの例 - Mule 4

ウォーターマークとは、定期的な同期が次に実行されるときに再開する時点を保存および取得する技法です。ウォーターマークは、オブジェクトストアの一般的な使用方法です。

Mule 3 ユーザーのための背景知識

ウォーターマークとは何か、MuleSoft が Mule 3 でその問題にどう対処しているかについての説明は、 こちらのブログ記事​を参照してください。

Mule 3 アプローチには、次の制限事項があります。

  • Mule 3 ​<watermark/>​ コンポーネントではカスタム ​ObjectStore​ の設定が可能ですが、それを作成するのは簡単ではありませんでした。Mule 4 ではこの問題が修正されています。

  • <watermark/>​ 要素では、​<poll/>​ コンポーネント (Mule 4 で ​<scheduler/>​ に置き換え) が反復可能なペイロードを返す必要がありました。また、新しいウォーターマーク値を取得するには、そのペイロードを最後まで反復処理する必要がありました。その処理に失敗すると、値が不正確に更新されるか、一切更新されませんでした。

  • フローでエラーが発生した場合、コンテンジェンシー値でウォーターマークを更新する方法はありませんでした。いつどのように値が更新されるかが、完全に明確化されてはいませんでした。

このユースケースをどうすれば改善できるか検討しているとき、問題の本当の根本は、実際には <watermark> はまったく不要であることだと気付きました。これは単に、他のコンポーネントの制限事項の影響で存在していたのです。

  • ウォーターマークの人為的な反復は、多くのデータセットが 1 回しか反復できないという事実を補う方法でしかありませんでした。特に、バイナリストリームを処理する場合や、コネクタで自動ページ設定を行う場合がこれに当てはまります。Mule 4 では、この問題は反復可能なストリーミング機能によって修正されました。

  • コンポーネントによる ​ObjectStore​ の自動処理は、古い ​ObjectStore​ サポートの UX の問題を埋め合わせるためだけのものでした。新しいコネクタでより簡単なセマンティクスを提供できれば、まったく必要がなくなります。

Mule 4 の例

この例は、Mule 4 を使用して Mule 3 ブログで説明されているユースケースを実行する方法を示しています。

<os:object-store name="watermarkStore" persistent="true"/>

<flow name="watermark">
    <os:retrieve key="watermark" objectStore="watermarkStore" target="watermark">
        <os:default-value>2017-09-11T00:00:00.000Z</os:default-value>
    </os:retrieve>
    <sfdc:query config-ref="config">
        <sfdc:salesforce-query>
            <![CDATA[
                #["Select Id, Name, BillingCity,Phone,Website,LastModifiedDate from account WHERE LastModifiedDate > " ++ vars.watermark]
           ]]>
       </sfdc:salesforce-query>
    </sfdc:query>
    <flow-ref name="doYourIntegrationLogic" />
    <os:store key="watermark" failIfPresent="false"
     failOnNullValue="false" objectStore="watermarkStore">
        <os:value>#[max(payload map $.LastModifiedDate)]</os:value>
    </os:store>
</flow>

これは、特定の種類のトリガーを必要としない汎用フローの例です。別のフローからこのフローを呼び出したり、​<scheduler/>​ を使用してフローをトリガーしたりする場合がありますが、どの種類のトリガーも強制せずに、ウォーターマークのセマンティクスを作成できます。

次にこの例では、ウォーターマークのカスタム ​ObjectStore​ を定義していますが、暗黙的にどのアプリケーションでも使用可能な ​ObjectStore​ を使用することもできます。つまり、カスタム ​ObjectStore​ を定義する必要がありません。

フローは、最後のウォーターマーク値を取得することで開始します。Retrieve 操作 (​<os:retrieve/>​) では、最初にフローを実行するとき、最初のウォーターマークが計算される前に、使用可能なデフォルト値 (​<os:default-value/>​) を指定します。この設定では、​<choice> → <contains>​ タイプのパターンが必要にならないようにしています。

Retrieve 操作は、ウォーターマーク値を対象値に保存して (​target="watermark"​)、コネクタがメッセージペイロードを上書きしないようにしています。これにより、フローが再利用可能でメンテナンスしやすくなります。Retrieve 操作の前に、任意の種類の操作を安全に追加できます。

クエリ (​<sfdc:query/>​) はウォーターマーク (​vars.watermark​) を使用して取得した結果を絞り込みます。

<flow-ref />​ は、別のシステムへのデータの送信、トランスフォーメーションの実行など、必要な処理を実行する別のフローを参照します。

最後のステップ (​<os:store/>​ 内) で、ウォーターマーク値を次のように更新します。

  • 新しいウォーターマーク値は、最大の更新タイムスタンプにする必要があります。この例では ​<os:value/>​ を使用してその値を取得する DataWeave 式を指定しています。

    反復可能なストリーミング機能によって、ユーザーが反復ロジックを意識する必要がなくなっています。クエリ操作で自動ページ設定されても、この機能でユーザーが結果セットを再度反復できることが保証されます。

  • failIfPresent​ は ​false​ に設定されているため、値が存在すれば更新されます。そうしないと、操作は初回のみ機能し、ウォーターマークがすでにオブジェクトストアにある場合は失敗します。

  • failOnNullValue​ は ​false​ に設定されるため、クエリの結果が空の場合、DataWeave 式 (最大値を見つける) は null を返します。​false​ に設定すると、コネクタは null 値をスキップするので、<choice/> ルーターを使用して状況を確認する必要がなくなります。デフォルト値の ​true​ を使用すると、コネクタは ​NULL_VALUE​ エラーをスローします。