Flex Gateway新着情報
Governance新着情報
Monitoring API Managerバージョン 1.1 以降で使用可能
ポーリング入力元は、外部アクションによってトリガーされるのではなく、指定された scheduling strategy
に従って定期的に項目をポーリングして、羃等性とウォーターマークを自動的に処理します。毎回のポーリングアクションでは、多くの項目を、それぞれを単一のメッセージとしてフローにディスパッチできます。
ポーリング入力元を作成するには、通常の入力元と同じように、Extension クラスで Source
クラスを @Sources
アノテーションに付加します。クラス Source<T,A>
を拡張するのではなく、入力元をポーリング入力元として動作させるためには、クラスを PollingSource<T, A>
から拡張する必要があります。
PollingSource<T, A>
から拡張する場合は、poll
メソッドと onRejectedItem
メソッドを実装する必要があります。
poll
はディスパッチするアイテムを取得し、accept
メソッドを使用してそれらのアイテムを pollContext
で渡します。
ディスパッチした項目のいずれかが (ウォーターマーク、羃等性、あるいはサーバーの過負荷の問題などによって) 拒否された場合は、onRejectedItem
がコールされます。このメソッドは、PollItem
の結果セットに関連付けられているすべてのリソースを解放します。
FTP Connector のディレクトリリスナー実装でこれらのメソッドを宣言した例を示します。
public class FtpDirectoryListener extends PollingSource<InputStream, FtpFileAttributes> {
// ...
@Override
public void poll(PollContext<InputStream, FtpFileAttributes> pollContext) {
// ...
}
@Override
public void onRejectedItem(Result<InputStream, FtpFileAttributes> result, SourceCallbackContext callbackContext) {
// ...
}
// ...
}
このメソッドは、ディスパッチする項目をすべて収集して、1 つずつ処理します。
PollContext
からの accept
メソッドは、ポーリングされた項目をフローにディスパッチします。このメソッドは、Consumer<PollItem>
をパラメーターとして受け取ります。
次の例では、フローにディスパッチするメッセージを設定しています。
public class FtpDirectoryListener extends PollingSource<InputStream, FtpFileAttributes> {
// ...
@Override
public void poll(PollContext<InputStream, FtpFileAttributes> pollContext) {
if(pollContext.isSourceStopping()){ (1)
return;
}
List<FtpFileAttributes> attributesList = listFilesAttributes();
for (FtpFileAttributes attributes : attributesList) {
if(pollContext.isSourceStopping()){ (1)
break;
}
pollContext.accept(item -> { (2)
Result<InputStream, FtpFileAttributes> result = read(attributes.getPath());
item.setResult(result); (3)
});
}
}
// ...
}
1 | pollContext は、入力元が停止中であるかどうかの情報を提供します。これは、時間を要するタスクの前には必ず考慮しなければなりません。 |
2 | accept はフローに項目をディスパッチします。 |
3 | setResult は、ディスパッチしたメッセージを設定します。 |
羃等性により、項目が 2 回ポーリングされることを防止できます。SDK は、他のスレッドやクラスターノードが同じ ID の項目を処理していないことを保証することで、処理の羃等性を実現します。同じ ID を持つ 2 つの項目が同時に処理されることはありません。処理中の項目と同じ ID を持つ項目をノードがポーリングすると、その項目は削除され、onRejectedItem
メソッドがコールされます。
public class FtpDirectoryListener extends PollingSource<InputStream, FtpFileAttributes> {
// ...
@Override
public void poll(PollContext<InputStream, FtpFileAttributes> pollContext) {
if(pollContext.isSourceStopping()){
return;
}
List<FtpFileAttributes> attributesList = listFilesAttributes();
for (FtpFileAttributes attributes : attributesList) {
if(pollContext.isSourceStopping()){
break;
}
pollContext.accept(item -> {
Result<InputStream, FtpFileAttributes> result = read(attributes.getPath());
item.setResult(result);
item.setId(attributes.getPath()); (1)
});
}
}
// ...
}
1 | setId は、羃等性に使用する ID を設定します。 |
コールのたびに新しいデータを古いデータから手動で分割することなく、新しいデータのみを入手したい場合には、データの増分入力元をポーリングする際にウォーターマークを使用します。
項目のウォーターマークに増分値を設定することで、新しい項目のウォーターマーク値が現在のウォーターマーク値よりも小さい場合は、ほかのすべてのポーリングでその項目が削除されて onRejectedItem
がコールされます。上記の例では、ファイルのタイムスタンプをウォーターマーク値として使用しています。
SDK はどの条件でウォーターマークを比較するのでしょうか?
使用するウォーターマークが Comparable
を実装するのであれば、値の自然順序が使用されます。この場合は、条件を設定する必要はありません。
public class FtpDirectoryListener extends PollingSource<InputStream, FtpFileAttributes> {
// ...
@Parameter
@Optional(defaultValue = "false")
private boolean watermarkEnabled = false; (1)
// ...
@Override
public void poll(PollContext<InputStream, FtpFileAttributes> pollContext) {
if(pollContext.isSourceStopping()){
return;
}
List<FtpFileAttributes> attributesList = listFilesAttributes();
for (FtpFileAttributes attributes : attributesList) {
if(pollContext.isSourceStopping()){
break;
}
pollContext.accept(item -> {
Result<InputStream, FtpFileAttributes> result = read(attributes.getPath());
item.setResult(result);
item.setId(attributes.getPath());
if (watermarkEnabled) { (1)
item.setWatermark(attributes.getTimestamp()); (2)
}
});
}
}
// ...
}
1 | 通常は、ウォーターマークを使用するかどうかをユーザーに決めさせるのがベストプラクティスです。 |
2 | setWatermark は、ファイルのタイムスタンプをウォーターマークとして設定します。 |
ウォーターマーク値が Comparable
を実装しない場合や、自然順序以外の条件を使用する場合はどうしたらよいでしょうか?
この場合は、setWatermarkComparator
メソッドをコールして、Comparator
を PollContext
に設定します。
ポーリングが完了する前にウォーターマークの更新が設定されていない場合、ラッチを追加すれば、2 回の反復でウォーターマークが更新されることがありません。 |
accept
メソッドをコールすると PollItemStatus
が返されます。これは、列挙値であり、以下の値を取ることができます。
ACCEPTED: 項目が受け入れられ、実行用にスケジュールされました。
FILTERED_BY_WATERMARK: ウォーターマークが有効化されており、それによって除外されたため、項目が拒否されました。
ALREADY_IN_PROCESS: 羃等性が有効化されており、別のスレッドまたはノードですでに同じ項目が処理されているため、項目が拒否されました。
SOURCE_STOPPING: 入力元が停止信号を受信したため、項目が拒否されました。
これを使用して、ポーリングで返された各項目の状況をログに記録できます。
public class FtpDirectoryListener extends PollingSource<InputStream, FtpFileAttributes> {
// ...
@Parameter
@Optional(defaultValue = "false")
private boolean watermarkEnabled = false;
// ...
@Override
public void poll(PollContext<InputStream, FtpFileAttributes> pollContext) {
if(pollContext.isSourceStopping()){
return;
}
List<FtpFileAttributes> attributesList = listFilesAttributes();
for (FtpFileAttributes attributes : attributesList) {
if(pollContext.isSourceStopping()){
break;
}
PollItemStatus status = pollContext.accept(item -> { (1)
Result<InputStream, FtpFileAttributes> result = read(attributes.getPath());
item.setResult(result);
item.setId(attributes.getPath());
if (watermarkEnabled) {
item.setWatermark(attributes.getTimestamp());
}
});
if(!status.equals(PollItemStatus.ACCEPTED)){
LOGGER.debug("Item rejected with code: " + status.name());
}
}
}
// ...
}
1 | ログに記録されている PollItemStatus を後で取得します。 |
コールバックメソッドで使用するデータを保存する必要がある場合は、item
によって SourceCallbackContext
を取得できます。
次の例では、コンテキストを取得してそのデータを保存しています。
public class FtpDirectoryListener extends PollingSource<InputStream, FtpFileAttributes> {
// ...
@Parameter
@Optional(defaultValue = "false")
private boolean watermarkEnabled = false;
// ...
@Override
public void poll(PollContext<InputStream, FtpFileAttributes> pollContext) {
if(pollContext.isSourceStopping()){
return;
}
List<FtpFileAttributes> attributesList = listFilesAttributes();
for (FtpFileAttributes attributes : attributesList) {
if(pollContext.isSourceStopping()){
break;
}
PollItemStatus status = pollContext.accept(item -> {
SourceCallbackContext context = item.getSourceCallbackContext(); (1)
context.addVariable("attributes", attributes); (2)
Result<InputStream, FtpFileAttributes> result = read(attributes.getPath());
item.setResult(result);
item.setId(attributes.getPath());
if (watermarkEnabled) {
item.setWatermark(attributes.getTimestamp());
}
});
if(!status.equals(PollItemStatus.ACCEPTED)){
LOGGER.debug("Item rejected with code: " + status.name());
}
}
}
// ...
}
1 | item からコンテキストを取得します。 |
2 | データを SourceCallbackContext に保存します。 |
このメソッドは、ディスパッチされた項目のいずれかが拒否されるとコールされます。このメソッドは、PollItem
の結果セットに関連付けられているすべてのリソースを解放します。
次の例は、このメソッドの実装方法を示しています。
public class FtpDirectoryListener extends PollingSource<InputStream, FtpFileAttributes> {
// ...
@Override
public void onRejectedItem(Result<InputStream, FtpFileAttributes> result, SourceCallbackContext callbackContext) {
closeFileStream(result);
}
// ...
}
SDK は、scheduling strategy
パラメーターを自動的に入力元に追加し、ポーリングの実行を処理する戦略 (fixed-frequency
など) をユーザーが指定できるようにします。Runtime は、指定された戦略に従って poll
メソッドの実行を自動的にスケジュールします。
ポーリング入力元を使用する FTP ディレクトリリスナーの例を示します。
<ftp:listener config-ref="config" directory="path/" watermarkEnabled="true">
<scheduling-strategy>
<fixed-frequency startDelay="40000" frequency="1000" timeUnit="MILLISECONDS"/>
</scheduling-strategy>
</ftp:listener>