例: オブジェクトストアを使用した透かしの設定

透かしとは、定期的な同期が次に実行されるときに再開する時点を保存および取得する技法です。透かしは、オブジェクトストアの一般的な使用方法です。

Mule 3 ユーザのための背景知識

透かしとは何か、MuleSoft が Mule 3 でその問題にどう対処しているかについての説明は、 こちらのブログ記事 を参照してください。

Mule 3 アプローチには、次の制限事項があります。

  • Mule 3 <watermark/> コンポーネントではカスタム ObjectStore の設定が可能ですが、それを作成するのは簡単ではありませんでした。Mule 4 ではこの問題が修正されています。

  • <watermark/> 要素では、<poll/> コンポーネント (Mule 4 で <scheduler/> に置き換え) が反復可能なペイロードを返す必要がありました。また、新しい透かし値を取得するには、そのペイロードを最後まで反復処理する必要がありました。その処理に失敗すると、値が不正確に更新されるか、一切更新されませんでした。

  • フローでエラーが発生した場合、コンティンジェンシー値で透かしを更新する方法はありませんでした。いつどのように値が更新されるかが、完全に明確化されてはいませんでした。

このユースケースをどうすれば改善できるか検討しているとき、問題の本当の根本は、実際には <watermark> はまったく不要であることだと気付きました。これは単に、他のコンポーネントの制限事項の影響で存在していたのです。

  • 透かしの人為的な反復は、多くのデータセットが 1 回しか反復できないという事実を補う方法でしかありませんでした。特に、バイナリストリームを処理する場合や、コネクタで自動ページ設定を行う場合がこれに当てはまります。Mule 4 では、この問題は反復可能なストリーミング機能によって修正されました。

  • コンポーネントによる ObjectStore の自動処理は、古い ObjectStore サポートの UX の問題を埋め合わせるためだけのものでした。新しいコネクタでより簡単なセマンティクスを提供できれば、まったく必要がなくなります。

Mule 4 の例

この例は、Mule 4 を使用して Mule 3 ブログで説明されているユースケースを実行する方法を示しています。

<os:object-store name="watermarkStore" persistent="true"/>

<flow name="watermark">
    <os:retrieve key="watermark" objectStore="watermarkStore" target="watermark">
        <os:default-value>2017-09-11T00:00:00.000Z</os:default-value>
    </os:retrieve>
    <sfdc:query config-ref="config">
        <sfdc:salesforce-query>
            <![CDATA[
                #["Select Id, Name, BillingCity,Phone,Website,LastModifiedDate from account WHERE LastModifiedDate > " ++ vars.watermark]
           ]]>
       </sfdc:salesforce-query>
    </sfdc:query>
    <flow-ref name="doYourIntegrationLogic" />
    <os:store key="watermark" failIfPresent="false"
     failOnNullValue="false" objectStore="watermarkStore">
        <os:value>#[max(payload map $.LastModifiedDate)]</os:value>
    </os:store>
</flow>

これは、特定の種類のトリガを必要としない汎用フローの例です。別のフローからこのフローを呼び出したり、<scheduler/> を使用してフローをトリガしたりする場合がありますが、どの種類のトリガも強制せずに、透かしのセマンティクスを作成できます。

次にこの例では、透かしのカスタム ObjectStore を定義していますが、暗黙的にどのアプリケーションでも使用可能な ObjectStore を使用することもできます。つまり、カスタム ObjectStore を定義する必要がありません。

フローは、最後の透かし値を取得することで開始します。Retrieve 操作 (<os:retrieve/>) では、最初にフローを実行するとき、最初の透かしが計算される前に、使用可能なデフォルト値 (<os:default-value/>) を指定します。この設定では、<choice> → <contains> タイプのパターンが必要にならないようにしています。

Retrieve 操作は、透かし値を対象値に保存して (target="watermark")、コネクタがメッセージペイロードを上書きしないようにしています。これにより、フローが再利用可能でメンテナンスしやすくなります。Retrieve 操作の前に、任意の種類の操作を安全に追加できます。

クエリ (<sfdc:query/>) は透かし (vars.watermark) を使用して取得した結果を絞り込みます。

<flow-ref /> は、別のシステムへのデータの送信、トランスフォーメーションの実行など、必要な処理を実行する別のフローを参照します。

最後のステップ (<os:store/> 内) で、透かし値を次のように更新します。

  • 新しい透かし値は、最大の更新タイムスタンプにする必要があります。この例では <os:value/> を使用してその値を取得する DataWeave 式を指定しています。

    反復可能なストリーミング機能によって、ユーザが反復ロジックを意識する必要がなくなっています。クエリ操作で自動ページ設定されても、この機能でユーザが結果セットを再度反復できることが保証されます。

  • failIfPresentfalse に設定されているため、値が存在すれば更新されます。そうしないと、操作は初回のみ機能し、透かしがすでにオブジェクトストアにある場合は失敗します。

  • failOnNullValuefalse に設定されるため、クエリの結果が空の場合、DataWeave 式 (最大値を見つける) は null を返します。true に設定すると、コネクタは null 値をスキップするので、<choice/> ルータを使用して状況を確認する必要がなくなります。

Was this article helpful?

💙 Thanks for your feedback!

Edit on GitHub