ポーリング入力元

バージョン 1.1 以降で使用可能

ポーリング入力元は、外部アクションによってトリガーされるのではなく、指定された ​scheduling strategy​ に従って定期的に項目をポーリングして、羃等性とウォーターマークを自動的に処理します。毎回のポーリングアクションでは、多くの項目を、それぞれを単一のメッセージとしてフローにディスパッチできます。

ポーリング入力元の作成

ポーリング入力元を作成するには、通常の入力元と同じように、Extension クラスで ​Source​ クラスを ​@Sources​ アノテーションに付加します。クラス ​Source<T,A>​ を拡張するのではなく、入力元をポーリング入力元として動作させるためには、クラスを ​PollingSource<T, A>​ から拡張する必要があります。

PollingSource<T, A>​ から拡張する場合は、​poll​ メソッドと ​onRejectedItem​ メソッドを実装する必要があります。

  • poll​ はディスパッチするアイテムを取得し、​accept​ メソッドを使用してそれらのアイテムを ​pollContext​ で渡します。

  • ディスパッチした項目のいずれかが (ウォーターマーク、羃等性、あるいはサーバーの過負荷の問題などによって) 拒否された場合は、​onRejectedItem​ がコールされます。このメソッドは、​PollItem​ の結果セットに関連付けられているすべてのリソースを解放します。

FTP Connector のディレクトリリスナー実装でこれらのメソッドを宣言した例を示します。

public class FtpDirectoryListener extends PollingSource<InputStream, FtpFileAttributes> {
// ...
  @Override
  public void poll(PollContext<InputStream, FtpFileAttributes> pollContext) {
    // ...
  }

  @Override
  public void onRejectedItem(Result<InputStream, FtpFileAttributes> result, SourceCallbackContext callbackContext) {
    // ...
  }
// ...
}

poll メソッドの実装

このメソッドは、ディスパッチする項目をすべて収集して、1 つずつ処理します。

PollContext​ からの ​accept​ メソッドは、ポーリングされた項目をフローにディスパッチします。このメソッドは、​Consumer<PollItem>​ をパラメーターとして受け取ります。

次の例では、フローにディスパッチするメッセージを設定しています。

public class FtpDirectoryListener extends PollingSource<InputStream, FtpFileAttributes> {
// ...
  @Override
  public void poll(PollContext<InputStream, FtpFileAttributes> pollContext) {
    if(pollContext.isSourceStopping()){ (1)
      return;
    }
    List<FtpFileAttributes> attributesList = listFilesAttributes();

    for (FtpFileAttributes attributes : attributesList) {
      if(pollContext.isSourceStopping()){ (1)
        break;
      }
      pollContext.accept(item -> { (2)
        Result<InputStream, FtpFileAttributes> result = read(attributes.getPath());
        item.setResult(result); (3)
      });
    }
  }
// ...
}
1 pollContext​ は、入力元が停止中であるかどうかの情報を提供します。これは、時間を要するタスクの前には必ず考慮しなければなりません。
2 accept​ はフローに項目をディスパッチします。
3 setResult​ は、ディスパッチしたメッセージを設定します。

羃等性

羃等性により、項目が 2 回ポーリングされることを防止できます。SDK は、他のスレッドやクラスターノードが同じ ID の項目を処理していないことを保証することで、処理の羃等性を実現します。同じ ID を持つ 2 つの項目が同時に処理されることはありません。処理中の項目と同じ ID を持つ項目をノードがポーリングすると、その項目は削除され、​onRejectedItem​ メソッドがコールされます。

public class FtpDirectoryListener extends PollingSource<InputStream, FtpFileAttributes> {
// ...
  @Override
  public void poll(PollContext<InputStream, FtpFileAttributes> pollContext) {
    if(pollContext.isSourceStopping()){
      return;
    }
    List<FtpFileAttributes> attributesList = listFilesAttributes();

    for (FtpFileAttributes attributes : attributesList) {
      if(pollContext.isSourceStopping()){
        break;
      }
      pollContext.accept(item -> {
        Result<InputStream, FtpFileAttributes> result = read(attributes.getPath());
        item.setResult(result);
        item.setId(attributes.getPath());  (1)
      });
    }
  }
// ...
}
1 setId​ は、羃等性に使用する ID を設定します。

ウォーターマーク

コールのたびに新しいデータを古いデータから手動で分割することなく、新しいデータのみを入手したい場合には、データの増分入力元をポーリングする際にウォーターマークを使用します。

項目のウォーターマークに増分値を設定することで、新しい項目のウォーターマーク値が現在のウォーターマーク値よりも小さい場合は、ほかのすべてのポーリングでその項目が削除されて ​onRejectedItem​ がコールされます。上記の例では、ファイルのタイムスタンプをウォーターマーク値として使用しています。

SDK はどの条件でウォーターマークを比較するのでしょうか?

使用するウォーターマークが ​Comparable​ を実装するのであれば、値の自然順序が使用されます。この場合は、条件を設定する必要はありません。

public class FtpDirectoryListener extends PollingSource<InputStream, FtpFileAttributes> {
// ...
  @Parameter
  @Optional(defaultValue = "false")
  private boolean watermarkEnabled = false; (1)
// ...
  @Override
  public void poll(PollContext<InputStream, FtpFileAttributes> pollContext) {
    if(pollContext.isSourceStopping()){
      return;
    }
    List<FtpFileAttributes> attributesList = listFilesAttributes();

    for (FtpFileAttributes attributes : attributesList) {
      if(pollContext.isSourceStopping()){
        break;
      }
      pollContext.accept(item -> {
        Result<InputStream, FtpFileAttributes> result = read(attributes.getPath());
        item.setResult(result);
        item.setId(attributes.getPath());
        if (watermarkEnabled) { (1)
          item.setWatermark(attributes.getTimestamp());  (2)
        }
      });
    }
  }
// ...
}
1 通常は、ウォーターマークを使用するかどうかをユーザーに決めさせるのがベストプラクティスです。
2 setWatermark​ は、ファイルのタイムスタンプをウォーターマークとして設定します。

ウォーターマーク値が ​Comparable​ を実装しない場合や、自然順序以外の条件を使用する場合はどうしたらよいでしょうか?

この場合は、​setWatermarkComparator​ メソッドをコールして、​Comparator​ を ​PollContext​ に設定します。

ポーリングが完了する前にウォーターマークの更新が設定されていない場合、ラッチを追加すれば、2 回の反復でウォーターマークが更新されることがありません。

PollItemStatus

accept​ メソッドをコールすると ​PollItemStatus​ が返されます。これは、列挙値であり、以下の値を取ることができます。

  • ACCEPTED: 項目が受け入れられ、実行用にスケジュールされました。

  • FILTERED_BY_WATERMARK​: ウォーターマークが有効化されており、それによって除外されたため、項目が拒否されました。

  • ALREADY_IN_PROCESS​: 羃等性が有効化されており、別のスレッドまたはノードですでに同じ項目が処理されているため、項目が拒否されました。

  • SOURCE_STOPPING​: 入力元が停止信号を受信したため、項目が拒否されました。

これを使用して、ポーリングで返された各項目の状況をログに記録できます。

public class FtpDirectoryListener extends PollingSource<InputStream, FtpFileAttributes> {
// ...
  @Parameter
  @Optional(defaultValue = "false")
  private boolean watermarkEnabled = false;
// ...
  @Override
  public void poll(PollContext<InputStream, FtpFileAttributes> pollContext) {
    if(pollContext.isSourceStopping()){
      return;
    }
    List<FtpFileAttributes> attributesList = listFilesAttributes();

    for (FtpFileAttributes attributes : attributesList) {
      if(pollContext.isSourceStopping()){
        break;
      }
      PollItemStatus status = pollContext.accept(item -> { (1)
        Result<InputStream, FtpFileAttributes> result = read(attributes.getPath());
        item.setResult(result);
        item.setId(attributes.getPath());
        if (watermarkEnabled) {
          item.setWatermark(attributes.getTimestamp());
        }
      });

      if(!status.equals(PollItemStatus.ACCEPTED)){
        LOGGER.debug("Item rejected with code:  " + status.name());
      }
    }
  }
// ...
}
1 ログに記録されている ​PollItemStatus​ を後で取得します。

入力元のコールバックコンテキストの取得

コールバックメソッドで使用するデータを保存する必要がある場合は、​item​ によって ​SourceCallbackContext​ を取得できます。

次の例では、コンテキストを取得してそのデータを保存しています。

public class FtpDirectoryListener extends PollingSource<InputStream, FtpFileAttributes> {
// ...
  @Parameter
  @Optional(defaultValue = "false")
  private boolean watermarkEnabled = false;
// ...
  @Override
  public void poll(PollContext<InputStream, FtpFileAttributes> pollContext) {
    if(pollContext.isSourceStopping()){
      return;
    }
    List<FtpFileAttributes> attributesList = listFilesAttributes();

    for (FtpFileAttributes attributes : attributesList) {
      if(pollContext.isSourceStopping()){
        break;
      }
      PollItemStatus status = pollContext.accept(item -> {
        SourceCallbackContext context = item.getSourceCallbackContext();   (1)
        context.addVariable("attributes", attributes);  (2)
        Result<InputStream, FtpFileAttributes> result = read(attributes.getPath());
        item.setResult(result);
        item.setId(attributes.getPath());
        if (watermarkEnabled) {
          item.setWatermark(attributes.getTimestamp());
        }
      });

      if(!status.equals(PollItemStatus.ACCEPTED)){
        LOGGER.debug("Item rejected with code:  " + status.name());
      }
    }
  }
// ...
}
1 item​ からコンテキストを取得します。
2 データを ​SourceCallbackContext​に保存します。

OnRejectedItem メソッドの実装

このメソッドは、ディスパッチされた項目のいずれかが拒否されるとコールされます。このメソッドは、​PollItem​ の結果セットに関連付けられているすべてのリソースを解放します。

次の例は、このメソッドの実装方法を示しています。

public class FtpDirectoryListener extends PollingSource<InputStream, FtpFileAttributes> {
// ...
  @Override
    public void onRejectedItem(Result<InputStream, FtpFileAttributes> result, SourceCallbackContext callbackContext) {
      closeFileStream(result);
    }
// ...
}

フローでのポーリング入力元の使用方法

SDK は、​scheduling strategy​ パラメーターを自動的に入力元に追加し、ポーリングの実行を処理する戦略 (​fixed-frequency​ など) をユーザーが指定できるようにします。Runtime は、指定された戦略に従って ​poll​ メソッドの実行を自動的にスケジュールします。

ポーリング入力元を使用する FTP ディレクトリリスナーの例を示します。

<ftp:listener config-ref="config" directory="path/" watermarkEnabled="true">
  <scheduling-strategy>
    <fixed-frequency startDelay="40000" frequency="1000" timeUnit="MILLISECONDS"/>
  </scheduling-strategy>
</ftp:listener>