透かしの移行

透かしは通常、たとえばレガシーリソースをポーリングして新しいデータを取得し、別の宛先エンドポイントと同期するときなど、データ同期を実行するために使用されます。透かしの技法により、定期的な同期が次に実行されるときに再開する時点を保存して取得します。透かしは通常、ObjectStore と一緒に使用されます。

Mule 3 アプローチについては、次の MuleSoft ブログを参照してください: 「Data Synchronizing made easy with Watermarks (透かしでデータ同期が簡単になる)」

ここで、そのブログから引用した Mule 3 の透かしの例を紹介します。

Mule 3 の例: 透かしを使用した Salesforce 取引先責任者のポーリング
<flow name="syncWithWatermark" processingStrategy="synchronous">
  <poll>
    <fixed-frequency-scheduler frequency="1" timeUnit="HOURS" />
      <watermark variable="timestamp"
       default-expression="#[server.dateTime.format(&quot;yyyy-MM-dd'T'HH:mm:ss.SSS'Z'&quot;)]"
       selector="MAX"
			 selector-expression="#[payload.LastModifiedDate]" />
    <sfdc:query config-ref="Salesforce" query="select Id, LastModifiedDate from Contact where LastModifiedDate &amp;gt; #[flowVars['timestamp']]" />
  </poll>
  <flow-ref name="doYourSyncMagic" />
</flow>

Anypoint Platform が拡張され、ユースケースが複雑になるに従って、Mule 3 アプローチでの制限が明らかになり始めました。

  • 以前の <watermark> コンポーネントではカスタム ObjectStore の設定が可能でしたが、作成するのは簡単ではありませんでした。この問題についてはこのドキュメントですでに説明済みであり、新しい ObjectStore コネクタでは修正されます。

  • <watermark> 要素では、<poll> コンポーネント (現在は <scheduler> に置換) が反復可能なペイロードを返す必要がありました。さらに、新しい透かし値を取得するには、そのペイロードを最後まで反復処理する必要がありました。その処理に失敗すると、値が不正確に更新されるか、一切更新されませんでした。

  • フローでエラーがスローされた場合、コンティンジェンシー値で透かしを更新する方法はありませんでした。

  • いつどのように値が更新されるか、エキスパート以外のユーザにはよくわかりませんでした。

このユースケースをどうすれば改善できるか検討しているとき、問題の本当の根本は、実際には <watermark> はまったく不要であることだと気付きました。これは単に、他のコンポーネントの制限事項の影響で存在していたのです。

  • 透かしの人為的な反復は、多くのデータセットが 1 回しか反復できないという事実を補う方法でしかありませんでした。特に、バイナリストリームを処理する場合や、コネクタで自動ページ設定を行う場合がこれに当てはまります。Mule 4 では、この問題は反復可能なストリーミング機能によって修正されました。

  • コンポーネントによる ObjectStore の自動処理は、古い ObjectStore サポートの UX の問題を埋め合わせるためだけのものでした。新しいコネクタでより簡単なセマンティクスを提供できれば、まったく必要がなくなります。

Mule 4 では、次のように上記の Mule 3 ブログ記事で説明しているユースケースを実行できます。

<os:object-store name="watermarkStore" persistent="true"/>

<flow name="watermark">
  <os:retrieve key="watermark" objectStore="watermarkStore" target="watermark">
    <os:default-value>2017-09-11T00:00:00.000Z</os:default-value>
  </os:retrieve>
  <sfdc:query config-ref="config">
    <sfdc:salesforce-query>
      <![CDATA[
        #["Select Id, Name, BillingCity,Phone,Website,LastModifiedDate from account WHERE LastModifiedDate > " ++ vars.watermark]
      ]]>
    </sfdc:salesforce-query>
  </sfdc:query>
  <flow-ref name="doYourIntegrationLogic" />
  <os:store key="watermark" failIfPresent="false" failOnNullValue="false" objectStore="watermarkStore">
    <os:value>#[max(payload map $.LastModifiedDate)]</os:value>
  </os:store>
</flow>

この例をステップごとに説明していきましょう。

  • まず、このフローは汎用です。特定の方法でトリガする必要はありません。別のフローから呼び出しても、<scheduler> を追加しても構いません。何らかの種別のトリガを強制しなくても、透かしのセマンティクスを実現できるようになりました。

  • この設定では、最初に透かしのカスタム ObjectStore を定義しています。これは必須ではありません。ObjectStore を定義できず、単にすべてのアプリケーションで暗黙的な ObjectStore を使用する場合もあります。しかし、ここでは例として、独自の ObjectStore を使用します。

  • このフローは、最後の透かし値を取得するための retrieve 操作で開始します。以下の点に注意してください。

    • retrieve 操作ではデフォルト値が指定されます。これは、フローが最初に実行されるときに透かしがまだ計算されていなくても、使用可能な値を取得できるようにするためです。これにより、<choice><contains> のようなパターンが必要にならないようにしています。

    • 対象パラメータは、コネクタにメッセージペイロードを透かしで上書きするのではなく、代わりに変数に配置するように指示するために使用されます。これはメッセージに悪影響を及ぼさないため、フローは再利用可能になり、保守するのが容易になります。これは、retrieve 操作の前に任意の種類の操作を追加でき、中断が起こらないためです。

  • 次に、クエリを実行し、そのクエリで透かしを使用して取得した結果を絞り込みます。

  • 続いて、必要な処理をすべて実行します。こうした処理として、別のシステムに送信しても、変換しても構いません。

  • 最後に、透かし値を更新します。このステップに関しても重要な点がいくつもあります。

    • 新しい透かし値は、最大の更新タイムスタンプにする必要があります。その値を取得するために DataWeave 式を使用します。

    • 反復可能なストリーミング機能によって、インテグレーションロジックがどのようなものだったかを意識する必要がなくなりました。クエリ操作で自動ページ設定されるとしても (実際に自動ページ設定されます!)、この機能で結果セットを再度反復できることが保証されます。

    • failIfPresent パラメータが false に設定されます。そうしないと、操作は初回のみ機能し、透かしがすでにオブジェクトストアにある場合は失敗します。false に設定することで、値が存在すれば更新されるようになります。

    • そして最後に、failOnNullValue パラメータが false に設定されました。なぜでしょうか? これは、クエリの結果が空の場合、値がないため、最大値を見つける DW 式が null を返すからです。これを true に設定すると、コネクタは自動的に null 値をスキップするため、ユーザは <choice> ルータを使用して状況を確認する必要がなくなります。

自動透かし

上記の機能は優れており、Mule 3 にあった機能よりも強力ですが、さらにシンプルな機能にしたいと考えました。そのため一部のコネクタには、上記のステップを一切実行しなくても、自動的に透かしを行うことができるメッセージソースがすでに含まれています。このドキュメントを執筆している時点で、ファイル、Ftp、Sftp、データベース、Salesforce の各コネクタにはこの機能がありますが、引き続き他のコネクタにもこの機能のサポートを追加していく予定です。

File Connector を使用して、この機能の仕組みの例を確認してみましょう。

Mule 4 の例: File Connector の自動透かし
<flow name="onNewInvoice">
  <file:listener config-ref="file" directory="invoices" autoDelete="false" watermarkMode="CREATED_TIMESTAMP">
      <scheduling-strategy>
          <fixed-frequency frequency="1000"/>
      </scheduling-strategy>
  </file:listener>
  <flow-ref name="onNewInvoice"/>
</flow>

この例では、作成時刻を検索条件として使用し、自動透かしを実現しています。watermarkMode パラメータは、代わりに変更されたファイルをリスンする場合には MODIFIED_TIMESTAMP に設定でき、透かしがまったく必要ない場合は DISABLED に設定することもできます。

最初の例で説明している「手間のかかる方法」を利用することをお勧めするのは、以下の場合のみです。

  • ユースケースが極めてカスタムな場合。

  • 自動透かしをサポートしていないコネクタを使用している場合。

それ以外の場合は、こうしたユースケースに対処する最適な方法として自動透かしをお勧めします。

Was this article helpful?

💙 Thanks for your feedback!

Edit on GitHub