ウォーターマークの移行

ウォーターマークは通常、たとえばレガシーリソースをポーリングして新しいデータを取得し、別の宛先エンドポイントと同期するときなど、データ同期を実行するために使用されます。ウォーターマークの技法により、定期的な同期が次に実行されるときに再開する時点を保存して取得します。ウォーターマークは通常、ObjectStore と一緒に使用されます。

Mule 3 アプローチについては、次の MuleSoft ブログを参照してください: 「Data Synchronizing made easy with Watermarks (ウォーターマークでデータ同期が簡単になる)」​。

ここで、そのブログから引用した Mule 3 のウォーターマークの例を紹介します。

Mule 3 の例: ウォーターマークを使用した Salesforce 取引先責任者のポーリング
<flow name="syncWithWatermark" processingStrategy="synchronous">
  <poll>
    <fixed-frequency-scheduler frequency="1" timeUnit="HOURS" />
      <watermark variable="timestamp"
       default-expression="#[server.dateTime.format(&quot;yyyy-MM-dd'T'HH:mm:ss.SSS'Z'&quot;)]"
       selector="MAX"
			 selector-expression="#[payload.LastModifiedDate]" />
    <sfdc:query config-ref="Salesforce" query="select Id, LastModifiedDate from Contact where LastModifiedDate &amp;gt; #[flowVars['timestamp']]" />
  </poll>
  <flow-ref name="doYourSyncMagic" />
</flow>

Anypoint Platform が拡張され、ユースケースが複雑になるに従って、Mule 3 アプローチでの制限が明らかになり始めました。

  • 以前の ​<watermark>​ コンポーネントではカスタム ObjectStore の設定が可能でしたが、作成するのは簡単ではありませんでした。この問題についてはこのドキュメントですでに説明済みであり、新しい ObjectStore Connector では修正されます。

  • <watermark>​ 要素では、​<poll>​ コンポーネント (現在は ​<scheduler>​ に置換) が反復可能なペイロードを返す必要がありました。さらに、新しいウォーターマーク値を取得するには、そのペイロードを最後まで反復処理する必要がありました。その処理に失敗すると、値が不正確に更新されるか、一切更新されませんでした。

  • フローでエラーがスローされた場合、コンテンジェンシー値でウォーターマークを更新する方法はありませんでした。

  • いつどのように値が更新されるか、エキスパート以外のユーザーにはよくわかりませんでした。

このユースケースをどうすれば改善できるか検討しているとき、問題の本当の根本は、実際には <watermark> はまったく不要であることだと気付きました。これは単に、他のコンポーネントの制限事項の影響で存在していたのです。

  • ウォーターマークの人為的な反復は、多くのデータセットが 1 回しか反復できないという事実を補う方法でしかありませんでした。特に、バイナリストリームを処理する場合や、コネクタで自動ページ設定を行う場合がこれに当てはまります。Mule 4 では、この問題は反復可能なストリーミング機能によって修正されました。

  • コンポーネントによる ObjectStore の自動処理は、古い ObjectStore サポートの UX の問題を埋め合わせるためだけのものでした。新しいコネクタでより簡単なセマンティクスを提供できれば、まったく必要がなくなります。

Mule 4 では、次のように上記の Mule 3 ブログ記事で説明しているユースケースを実行できます。

<os:object-store name="watermarkStore" persistent="true"/>

<flow name="watermark">
  <os:retrieve key="watermark" objectStore="watermarkStore" target="watermark">
    <os:default-value>2017-09-11T00:00:00.000Z</os:default-value>
  </os:retrieve>
  <sfdc:query config-ref="config">
    <sfdc:salesforce-query>
      <![CDATA[
        #["Select Id, Name, BillingCity,Phone,Website,LastModifiedDate from account WHERE LastModifiedDate > " ++ vars.watermark]
      ]]>
    </sfdc:salesforce-query>
  </sfdc:query>
  <flow-ref name="doYourIntegrationLogic" />
  <os:store key="watermark" failIfPresent="false" failOnNullValue="false" objectStore="watermarkStore">
    <os:value>#[max(payload map $.LastModifiedDate)]</os:value>
  </os:store>
</flow>

この例をステップごとに説明していきましょう。

  • まず、このフローは汎用です。特定の方法でトリガーする必要はありません。別のフローから呼び出しても、​<scheduler>​ を追加しても構いません。何らかの種別のトリガーを強制しなくても、ウォーターマークのセマンティクスを実現できるようになりました。

  • この設定では、最初にウォーターマークのカスタム ObjectStore を定義しています。これは必須ではありません。ObjectStore を定義できず、単にすべてのアプリケーションで暗黙的な ObjectStore を使用する場合もあります。しかし、ここでは例として、独自の ObjectStore を使用します。

  • このフローは、最後のウォーターマーク値を取得するための retrieve 操作で開始します。以下の点に注意してください。

    • retrieve 操作ではデフォルト値が指定されます。これは、フローが最初に実行されるときにウォーターマークがまだ計算されていなくても、使用可能な値を取得できるようにするためです。これにより、​<choice>​ → ​<contains>​ のようなパターンが必要にならないようにしています。

    • Target (対象) (​target​) パラメーターは、コネクタにメッセージペイロードをウォーターマークで上書きするのではなく、代わりに変数に配置するように指示するために使用されます。これはメッセージに悪影響を及ぼさないため、フローは再利用可能になり、保守するのが容易になります。これは、retrieve 操作の前に任意の種類の操作を追加でき、中断が起こらないためです。

  • 次に、クエリを実行し、そのクエリでウォーターマークを使用して取得した結果を絞り込みます。

  • 続いて、必要な処理をすべて実行します。こうした処理として、別のシステムに送信しても、変換しても構いません。

  • 最後に、ウォーターマーク値を更新します。このステップに関しても重要な点がいくつもあります。

    • 新しいウォーターマーク値は、最大の更新タイムスタンプにする必要があります。その値を取得するために DataWeave 式を使用します。

    • 反復可能なストリーミング機能によって、インテグレーションロジックがどのようなものだったかを意識する必要がなくなりました。クエリ操作で自動ページ設定されるとしても (実際に自動ページ設定されます!)、この機能で結果セットを再度反復できることが保証されます。

    • failIfPresent​ パラメーターが false に設定されます。そうしないと、操作は初回のみ機能し、ウォーターマークがすでにオブジェクトストアにある場合は失敗します。false に設定することで、値が存在すれば更新されるようになります。

    • そして最後に、​failOnNullValue​ パラメーターが false に設定されました。なぜでしょうか? これは、クエリの結果が空の場合、値がないため、最大値を見つける DW 式が null を返すからです。これを true に設定すると、コネクタは自動的に null 値をスキップするため、ユーザーは ​<choice>​ ルーターを使用して状況を確認する必要がなくなります。

自動ウォーターマーク

上記の機能は優れており、Mule 3 にあった機能よりも強力ですが、さらにシンプルな機能にしたいと考えました。そのため一部のコネクタには、上記のステップを一切実行しなくても、自動的にウォーターマークを行うことができるメッセージソースがすでに含まれています。このドキュメントを執筆している時点で、ファイル、Ftp、Sftp、データベース、Salesforce の各コネクタにはこの機能がありますが、引き続き他のコネクタにもこの機能のサポートを追加していく予定です。

File Connector を使用して、この機能の仕組みの例を確認してみましょう。

Mule 4 の例: File Connector の自動ウォーターマーク
<flow name="onNewInvoice">
  <file:listener config-ref="file" directory="invoices" autoDelete="false" watermarkMode="CREATED_TIMESTAMP">
      <scheduling-strategy>
          <fixed-frequency frequency="1000"/>
      </scheduling-strategy>
  </file:listener>
  <flow-ref name="onNewInvoice"/>
</flow>

この例では、作成時刻を検索条件として使用し、自動ウォーターマークを実現しています。​watermarkMode​ パラメーターは、代わりに変更されたファイルをリスンする場合には ​MODIFIED_TIMESTAMP​ に設定でき、ウォーターマークがまったく必要ない場合は ​DISABLED​ に設定することもできます。

最初の例で説明している「手間のかかる方法」を利用することをお勧めするのは、以下の場合のみです。

  • ユースケースが極めてカスタムな場合。

  • 自動ウォーターマークをサポートしていないコネクタを使用している場合。

それ以外の場合は、こうしたユースケースに対処する最適な方法として自動ウォーターマークをお勧めします。