ポーリングソース

バージョン 1.1 以降で使用可能

ポーリングソースは、外部アクションによってトリガされるのではなく、指定された scheduling strategy に従って定期的に項目をポーリングして、羃等性と透かしを自動的に処理します。毎回のポーリングアクションでは、多くの項目を、それぞれを単一のメッセージとしてフローにディスパッチできます。

ポーリングソースの作成

ポーリングソースを作成するには、通常のソースと同じように、Extension クラスで Source クラスを @Sources アノテーションに付加します。クラス Source<T,A> を拡張するのではなく、ソースをポーリングソースとして動作させるためには、クラスを PollingSource<T, A> から拡張する必要があります。

PollingSource<T, A> から拡張する場合は、poll メソッドと onRejectedItem メソッドを実装する必要があります。

  • poll はディスパッチするアイテムを取得し、accept メソッドを使用してそれらのアイテムを pollContext で渡します。

  • ディスパッチした項目のいずれかが (透かし、羃等性、あるいはサーバの過負荷の問題などによって) 拒否された場合は、onRejectedItem がコールされます。このメソッドは、PollItem の結果セットに関連付けられているすべてのリソースを解放します。

FTP コネクタのディレクトリリスナ実装でこれらのメソッドを宣言した例を示します。

public class FtpDirectoryListener extends PollingSource<InputStream, FtpFileAttributes> {
// ...
  @Override
  public void poll(PollContext<InputStream, FtpFileAttributes> pollContext) {
    // ...
  }

  @Override
  public void onRejectedItem(Result<InputStream, FtpFileAttributes> result, SourceCallbackContext callbackContext) {
    // ...
  }
// ...
}

poll メソッドの実装

このメソッドは、ディスパッチする項目をすべて収集して、1 つずつ処理します。

PollContext からの accept メソッドは、ポーリングされた項目をフローにディスパッチします。このメソッドは、Consumer<PollItem> をパラメータとして受け取ります。

次の例では、フローにディスパッチするメッセージを設定しています。

public class FtpDirectoryListener extends PollingSource<InputStream, FtpFileAttributes> {
// ...
  @Override
  public void poll(PollContext<InputStream, FtpFileAttributes> pollContext) {
    if(pollContext.isSourceStopping()){ (1)
      return;
    }
    List<FtpFileAttributes> attributesList = listFilesAttributes();

    for (FtpFileAttributes attributes : attributesList) {
      if(pollContext.isSourceStopping()){ (1)
        break;
      }
      pollContext.accept(item -> { (2)
        Result<InputStream, FtpFileAttributes> result = read(attributes.getPath());
        item.setResult(result); (3)
      });
    }
  }
// ...
}
1 pollContext は、ソースが停止中であるかどうかの情報を提供します。これは、時間を要するタスクの前には必ず考慮しなければなりません。
2 accept はフローに項目をディスパッチします。
3 setResult は、ディスパッチしたメッセージを設定します。

羃等性

羃等性により、項目が 2 回ポーリングされることを防止できます。SDK は、他のスレッドやクラスタノードが同じ ID の項目を処理していないことを保証することで、処理の羃等性を実現します。同じ ID を持つ 2 つの項目が同時に処理されることはありません。処理中の項目と同じ ID を持つ項目をノードがポーリングすると、その項目は削除され、onRejectedItem メソッドがコールされます。

public class FtpDirectoryListener extends PollingSource<InputStream, FtpFileAttributes> {
// ...
  @Override
  public void poll(PollContext<InputStream, FtpFileAttributes> pollContext) {
    if(pollContext.isSourceStopping()){
      return;
    }
    List<FtpFileAttributes> attributesList = listFilesAttributes();

    for (FtpFileAttributes attributes : attributesList) {
      if(pollContext.isSourceStopping()){
        break;
      }
      pollContext.accept(item -> {
        Result<InputStream, FtpFileAttributes> result = read(attributes.getPath());
        item.setResult(result);
        item.setId(attributes.getPath());  (1)
      });
    }
  }
// ...
}
1 setId は、羃等性に使用する ID を設定します。

透かし

コールのたびに新しいデータを古いデータから手動で分割することなく、新しいデータのみを入手したい場合には、データの増分ソースをポーリングする際に透かしを使用します。

項目の透かしに増分値を設定することで、新しい項目の透かし値が現在の透かし値よりも小さい場合は、ほかのすべてのポーリングでその項目が削除されて onRejectedItem がコールされます。上記の例では、ファイルのタイムスタンプを透かし値として使用しています。

SDK はどの条件で透かしを比較するのでしょうか?

使用する透かしが Comparable を実装するのであれば、値の自然順序が使用されます。この場合は、条件を設定する必要はありません。

public class FtpDirectoryListener extends PollingSource<InputStream, FtpFileAttributes> {
// ...
  @Parameter
  @Optional(defaultValue = "false")
  private boolean watermarkEnabled = false; (1)
// ...
  @Override
  public void poll(PollContext<InputStream, FtpFileAttributes> pollContext) {
    if(pollContext.isSourceStopping()){
      return;
    }
    List<FtpFileAttributes> attributesList = listFilesAttributes();

    for (FtpFileAttributes attributes : attributesList) {
      if(pollContext.isSourceStopping()){
        break;
      }
      pollContext.accept(item -> {
        Result<InputStream, FtpFileAttributes> result = read(attributes.getPath());
        item.setResult(result);
        item.setId(attributes.getPath());
        if (watermarkEnabled) { (1)
          item.setWatermark(attributes.getTimestamp());  (2)
        }
      });
    }
  }
// ...
}
1 通常は、透かしを使用するかどうかをユーザに決めさせるのがベストプラクティスです。
2 setWatermark は、ファイルのタイムスタンプを透かしとして設定します。

透かし値が Comparable を実装しない場合や、自然順序以外の条件を使用する場合はどうしたらよいでしょうか?

この場合は、setWatermarkComparator メソッドをコールして、ComparatorPollContext に設定します。

PollItemStatus

accept メソッドをコールすると PollItemStatus が返されます。これは、列挙値であり、以下の値を取ることができます。

  • ACCEPTED: 項目が受け入れられ、実行用にスケジュールされました。

  • FILTERED_BY_WATERMARK: 透かしが有効化されており、それによって除外されたため、項目が拒否されました。

  • ALREADY_IN_PROCESS: 羃等性が有効化されており、別のスレッドまたはノードですでに同じ項目が処理されているため、項目が拒否されました。

  • SOURCE_STOPPING: ソースが停止信号を受信したため、項目が拒否されました。

これを使用して、ポーリングで返された各項目の状況をログに記録できます。

public class FtpDirectoryListener extends PollingSource<InputStream, FtpFileAttributes> {
// ...
  @Parameter
  @Optional(defaultValue = "false")
  private boolean watermarkEnabled = false;
// ...
  @Override
  public void poll(PollContext<InputStream, FtpFileAttributes> pollContext) {
    if(pollContext.isSourceStopping()){
      return;
    }
    List<FtpFileAttributes> attributesList = listFilesAttributes();

    for (FtpFileAttributes attributes : attributesList) {
      if(pollContext.isSourceStopping()){
        break;
      }
      PollItemStatus status = pollContext.accept(item -> { (1)
        Result<InputStream, FtpFileAttributes> result = read(attributes.getPath());
        item.setResult(result);
        item.setId(attributes.getPath());
        if (watermarkEnabled) {
          item.setWatermark(attributes.getTimestamp());
        }
      });

      if(!status.equals(PollItemStatus.ACCEPTED)){
        LOGGER.debug("Item rejected with code:  " + status.name());
      }
    }
  }
// ...
}
1 ログに記録されている PollItemStatus を後で取得します。

ソースコールバックコンテキストの取得

コールバックメソッドで使用するデータを保存する必要がある場合は、item によって SourceCallbackContext を取得できます。

次の例では、コンテキストを取得してそのデータを保存しています。

public class FtpDirectoryListener extends PollingSource<InputStream, FtpFileAttributes> {
// ...
  @Parameter
  @Optional(defaultValue = "false")
  private boolean watermarkEnabled = false;
// ...
  @Override
  public void poll(PollContext<InputStream, FtpFileAttributes> pollContext) {
    if(pollContext.isSourceStopping()){
      return;
    }
    List<FtpFileAttributes> attributesList = listFilesAttributes();

    for (FtpFileAttributes attributes : attributesList) {
      if(pollContext.isSourceStopping()){
        break;
      }
      PollItemStatus status = pollContext.accept(item -> {
        SourceCallbackContext context = item.getSourceCallbackContext();   (1)
        context.addVariable("attributes", attributes);  (2)
        Result<InputStream, FtpFileAttributes> result = read(attributes.getPath());
        item.setResult(result);
        item.setId(attributes.getPath());
        if (watermarkEnabled) {
          item.setWatermark(attributes.getTimestamp());
        }
      });

      if(!status.equals(PollItemStatus.ACCEPTED)){
        LOGGER.debug("Item rejected with code:  " + status.name());
      }
    }
  }
// ...
}
1 item からコンテキストを取得します。
2 データを `SourceCallbackContext`に保存します。

OnRejectedItem メソッドの実装

このメソッドは、ディスパッチされた項目のいずれかが拒否されるとコールされます。このメソッドは、PollItem の結果セットに関連付けられているすべてのリソースを解放します。

次の例は、このメソッドの実装方法を示しています。

public class FtpDirectoryListener extends PollingSource<InputStream, FtpFileAttributes> {
// ...
  @Override
    public void onRejectedItem(Result<InputStream, FtpFileAttributes> result, SourceCallbackContext callbackContext) {
      closeFileStream(result);
    }
// ...
}

フローでのポーリングソースの使用方法

SDK は、scheduling strategy パラメータを自動的にソースに追加し、ポーリングの実行を処理する戦略 (fixed-frequency など) をユーザが指定できるようにします。Runtime は、指定された戦略に従って poll メソッドの実行を自動的にスケジュールします。

ポーリングソースを使用する FTP ディレクトリリスナの例を示します。

<ftp:listener config-ref="config" directory="path/" watermarkEnabled="true">
  <scheduling-strategy>
    <fixed-frequency startDelay="40000" frequency="1000" timeUnit="MILLISECONDS"/>
  </scheduling-strategy>
</ftp:listener>

Was this article helpful?

💙 Thanks for your feedback!

Edit on GitHub