public class FtpDirectoryListener extends PollingSource<InputStream, FtpFileAttributes> {
    // ...
    @Override
    public void poll(PollContext<InputStream, FtpFileAttributes> pollContext) {
        // ...
    }
    @Override
    public void onRejectedItem(Result<InputStream, FtpFileAttributes> result, SourceCallbackContext callbackContext) {
        // ...
    }
    // ...
}
Polling Sources
Available since version 1.1
Unlike sources that are triggered by an external action, Polling Sources poll for items periodically, according to a configured scheduling strategy, and automatically handle idempotency and watermarking. Each poll can dispatch many items to the flow, each as a single Message.
Creating a Polling Source
To create a Polling Source, add the Source class to the @Sources annotation on the extension class, just as you would for a regular Source. For a Source to behave as a PollingSource, the class must extend PollingSource<T, A> instead of Source<T, A>.
When extending PollingSource<T, A>, you need to implement the methods poll and onRejectedItem:
- poll obtains the items to be dispatched and passes them to the flow through the pollContext, using its accept method.
- onRejectedItem is called when one of the dispatched items is rejected (for example, by watermarking, idempotency, or server overload). Use this method to release any resources associated with the result set for the PollItem.
The listing at the top of this page shows the declaration of these methods on the directory listener implementation for the FTP connector.
Implementing the Poll Method
This method gathers all the items to be dispatched and processes them one by one.
The accept method of the PollContext dispatches the polled items into the flow. It receives a Consumer<PollItem> as a parameter.
This example shows how to set the Message to dispatch to the flow:
public class FtpDirectoryListener extends PollingSource<InputStream, FtpFileAttributes> {
    // ...
    @Override
    public void poll(PollContext<InputStream, FtpFileAttributes> pollContext) {
        if (pollContext.isSourceStopping()) { // (1)
            return;
        }
        List<FtpFileAttributes> attributesList = listFilesAttributes();
        for (FtpFileAttributes attributes : attributesList) {
            if (pollContext.isSourceStopping()) { // (1)
                break;
            }
            pollContext.accept(item -> { // (2)
                Result<InputStream, FtpFileAttributes> result = read(attributes.getPath());
                item.setResult(result); // (3)
            });
        }
    }
    // ...
}
1 | Note that the pollContext provides information on whether the Source is stopping. This must be taken into account before any time-consuming task. |
2 | accept dispatches an item to the flow. |
3 | setResult is used to set the Message that is dispatched. |
Idempotency
Idempotency prevents the same item from being processed twice at the same time. The SDK ensures that no other thread or cluster node is processing an item with the same ID, so no two items with the same ID are processed concurrently. If a node polls an item whose ID matches an item that is still being processed, the new item is dropped and the onRejectedItem method is called.
public class FtpDirectoryListener extends PollingSource<InputStream, FtpFileAttributes> {
    // ...
    @Override
    public void poll(PollContext<InputStream, FtpFileAttributes> pollContext) {
        if (pollContext.isSourceStopping()) {
            return;
        }
        List<FtpFileAttributes> attributesList = listFilesAttributes();
        for (FtpFileAttributes attributes : attributesList) {
            if (pollContext.isSourceStopping()) {
                break;
            }
            pollContext.accept(item -> {
                Result<InputStream, FtpFileAttributes> result = read(attributes.getPath());
                item.setResult(result);
                item.setId(attributes.getPath()); // (1)
            });
        }
    }
    // ...
}
1 | setId is used to set the ID used for idempotency. |
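The bookkeeping behind this kind of idempotency can be pictured with a small plain-Java stand-in. This is only a sketch under stated assumptions: `InFlightIds`, `tryStart`, and `finish` are hypothetical names, not SDK classes, and the sketch covers a single JVM only, whereas the SDK also coordinates across cluster nodes.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the ID tracking that the SDK performs for you.
class InFlightIds {
    // Thread-safe set of the IDs that are currently being processed.
    private final Set<String> inFlight = ConcurrentHashMap.newKeySet();

    // Returns true if the caller may process the item,
    // false if an item with the same ID is already being processed.
    boolean tryStart(String id) {
        return inFlight.add(id);
    }

    // Called when processing ends, so the same ID can be accepted again later.
    void finish(String id) {
        inFlight.remove(id);
    }
}
```

In this model, a second `tryStart` with the same path returns false until `finish` is called for that path, which corresponds to the point where the real source would drop the item and call onRejectedItem.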
Watermarking
Use watermarking when you poll an incremental source of data and want only the new data, without manually separating new data from old data on each call.
When you set an increasing value as each item's watermark, subsequent polls drop any item whose watermark is lower than the current watermark value and call onRejectedItem for it. In the example below, the chosen watermark is the timestamp of the file.
What criteria does the SDK use to compare watermarks?
If the watermarks that you use implement Comparable, their natural order is used. There is no need to set a criterion.
public class FtpDirectoryListener extends PollingSource<InputStream, FtpFileAttributes> {
    // ...
    @Parameter
    @Optional(defaultValue = "false")
    private boolean watermarkEnabled = false; // (1)
    // ...
    @Override
    public void poll(PollContext<InputStream, FtpFileAttributes> pollContext) {
        if (pollContext.isSourceStopping()) {
            return;
        }
        List<FtpFileAttributes> attributesList = listFilesAttributes();
        for (FtpFileAttributes attributes : attributesList) {
            if (pollContext.isSourceStopping()) {
                break;
            }
            pollContext.accept(item -> {
                Result<InputStream, FtpFileAttributes> result = read(attributes.getPath());
                item.setResult(result);
                item.setId(attributes.getPath());
                if (watermarkEnabled) { // (1)
                    item.setWatermark(attributes.getTimestamp()); // (2)
                }
            });
        }
    }
    // ...
}
1 | A typical best practice is to let the user decide whether to use watermarking or not. |
2 | setWatermark sets the file timestamp as watermark. |
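To make the filtering rule concrete, here is a minimal plain-Java model of watermark comparison by natural order. It is a sketch, not the SDK implementation: `WatermarkFilter` and its methods are hypothetical names, it follows the text literally (only values strictly lower than the current watermark are rejected), and it assumes the watermark advances once, after a poll has finished.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of watermark filtering based on Comparable's natural order.
class WatermarkFilter<W extends Comparable<W>> {
    // Watermark carried over from earlier polls; null before the first poll.
    private W current;

    // Runs one poll: keeps every candidate that is not lower than the current
    // watermark, then advances the watermark to the highest value seen.
    List<W> poll(List<W> candidates) {
        List<W> accepted = new ArrayList<>();
        W highest = current;
        for (W candidate : candidates) {
            if (current != null && candidate.compareTo(current) < 0) {
                continue; // rejected: lower than the current watermark
            }
            accepted.add(candidate);
            if (highest == null || candidate.compareTo(highest) > 0) {
                highest = candidate;
            }
        }
        current = highest; // updated once, after the whole poll
        return accepted;
    }

    W current() {
        return current;
    }
}
```

With timestamps 10, 30, and 20 in a first poll, all three pass and the watermark becomes 30. A second poll that sees 20, 30, and 40 keeps only 30 and 40 in this model.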
What if the watermark values do not implement Comparable, or what if I want to use a criterion other than the natural order?
You can set a Comparator on the PollContext by calling its setWatermarkComparator method.
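As an illustration of when the natural order is not the right criterion, consider watermarks that are "major.minor" version strings: String implements Comparable, but its lexicographic order places "1.10" before "1.9". The comparator below is a sketch in plain Java; `VersionWatermarks` and `BY_VERSION` are hypothetical names, and the version-string watermark is an assumption made for the example, not something the FTP connector uses.

```java
import java.util.Comparator;

// Hypothetical holder for a custom watermark ordering.
class VersionWatermarks {
    // Orders "major.minor" strings numerically: first by major, then by minor.
    static final Comparator<String> BY_VERSION = Comparator
            .comparingInt((String v) -> Integer.parseInt(v.split("\\.")[0]))
            .thenComparingInt(v -> Integer.parseInt(v.split("\\.")[1]));
}
```

A comparator of this kind is what you would hand to setWatermarkComparator, so that "1.10" counts as newer than "1.9".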
If the watermark updates are not set before polling is complete, you can add a latch to avoid updating the watermark in two iterations.
PollItemStatus
When you call the accept method, it returns a PollItemStatus. This is an enum that can take the following values:
- ACCEPTED: The item was accepted and has been scheduled for execution.
- FILTERED_BY_WATERMARK: The item was rejected because watermarking was enabled and the item was filtered out by its watermark value.
- ALREADY_IN_PROCESS: The item was rejected because idempotency was enabled and another thread or node is already processing this item.
- SOURCE_STOPPING: The item was rejected because the source has received the stop signal.
You could use this to log the status of each item polled:
public class FtpDirectoryListener extends PollingSource<InputStream, FtpFileAttributes> {
    // ...
    @Parameter
    @Optional(defaultValue = "false")
    private boolean watermarkEnabled = false;
    // ...
    @Override
    public void poll(PollContext<InputStream, FtpFileAttributes> pollContext) {
        if (pollContext.isSourceStopping()) {
            return;
        }
        List<FtpFileAttributes> attributesList = listFilesAttributes();
        for (FtpFileAttributes attributes : attributesList) {
            if (pollContext.isSourceStopping()) {
                break;
            }
            PollItemStatus status = pollContext.accept(item -> { // (1)
                Result<InputStream, FtpFileAttributes> result = read(attributes.getPath());
                item.setResult(result);
                item.setId(attributes.getPath());
                if (watermarkEnabled) {
                    item.setWatermark(attributes.getTimestamp());
                }
            });
            if (!status.equals(PollItemStatus.ACCEPTED)) {
                LOGGER.debug("Item rejected with code: " + status.name());
            }
        }
    }
    // ...
}
1 | Get the PollItemStatus that is logged later. |
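Beyond logging each rejection, the returned statuses could also be summarized per poll. The following is a sketch in plain Java: `ItemStatus` is a local stand-in that mirrors the four values listed above so that the example compiles without the SDK, and `PollSummary.tally` is a hypothetical helper.

```java
import java.util.EnumMap;
import java.util.List;
import java.util.Map;

// Local stand-in mirroring the four PollItemStatus values described above.
enum ItemStatus { ACCEPTED, FILTERED_BY_WATERMARK, ALREADY_IN_PROCESS, SOURCE_STOPPING }

// Hypothetical helper that counts how many items ended in each status.
class PollSummary {
    static Map<ItemStatus, Integer> tally(List<ItemStatus> statuses) {
        Map<ItemStatus, Integer> counts = new EnumMap<>(ItemStatus.class);
        for (ItemStatus status : statuses) {
            counts.merge(status, 1, Integer::sum); // start at 1, then add 1 per occurrence
        }
        return counts;
    }
}
```

A single summary line per poll (for example, two accepted and one filtered by watermark) can be easier to read than one debug line per rejected item when a directory holds many files.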
Getting the Source Callback Context
If you need to save data for use in the callback methods, the item provides the SourceCallbackContext for that purpose.
This example shows how to get the context and then save data in it:
public class FtpDirectoryListener extends PollingSource<InputStream, FtpFileAttributes> {
    // ...
    @Parameter
    @Optional(defaultValue = "false")
    private boolean watermarkEnabled = false;
    // ...
    @Override
    public void poll(PollContext<InputStream, FtpFileAttributes> pollContext) {
        if (pollContext.isSourceStopping()) {
            return;
        }
        List<FtpFileAttributes> attributesList = listFilesAttributes();
        for (FtpFileAttributes attributes : attributesList) {
            if (pollContext.isSourceStopping()) {
                break;
            }
            PollItemStatus status = pollContext.accept(item -> {
                SourceCallbackContext context = item.getSourceCallbackContext(); // (1)
                context.addVariable("attributes", attributes); // (2)
                Result<InputStream, FtpFileAttributes> result = read(attributes.getPath());
                item.setResult(result);
                item.setId(attributes.getPath());
                if (watermarkEnabled) {
                    item.setWatermark(attributes.getTimestamp());
                }
            });
            if (!status.equals(PollItemStatus.ACCEPTED)) {
                LOGGER.debug("Item rejected with code: " + status.name());
            }
        }
    }
    // ...
}
1 | Gets the context from item. |
2 | Saves data on the SourceCallbackContext. |
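The save-then-read pattern can be sketched with a plain-Java stand-in for the context. `CallbackVariables` is a hypothetical class written for this illustration; only its `addVariable` method name is taken from the example above, and the read-side `getVariable` is an assumption that should be checked against the SDK Javadoc before relying on it.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical stand-in for a per-item callback context.
class CallbackVariables {
    private final Map<String, Object> variables = new HashMap<>();

    // Stores a value under a name while the item is being prepared.
    void addVariable(String name, Object value) {
        variables.put(name, value);
    }

    // Reads a value back in a later callback; empty if nothing was stored.
    @SuppressWarnings("unchecked")
    <T> Optional<T> getVariable(String name) {
        return Optional.ofNullable((T) variables.get(name));
    }
}
```

Returning an Optional keeps callback code explicit about the case where a variable was never stored, for instance when an item is rejected before the value is added.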
Implementing the OnRejectedItem Method
This method is called when one of the dispatched items is rejected. It should release any resources associated with the result set for the PollItem.
This example shows how to implement it:
public class FtpDirectoryListener extends PollingSource<InputStream, FtpFileAttributes> {
    // ...
    @Override
    public void onRejectedItem(Result<InputStream, FtpFileAttributes> result, SourceCallbackContext callbackContext) {
        closeFileStream(result);
    }
    // ...
}
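The body of closeFileStream is not shown in this document. As a rough idea of what such a release step might involve, the sketch below closes an InputStream without letting a failure escape. `StreamReleaser` and `closeQuietly` are hypothetical names, and the sketch assumes the stream has already been extracted from the Result.

```java
import java.io.IOException;
import java.io.InputStream;

// Hypothetical helper for releasing the stream of a rejected item.
class StreamReleaser {
    // Closes the stream and reports whether it closed cleanly; never throws.
    static boolean closeQuietly(InputStream stream) {
        if (stream == null) {
            return true; // nothing to release
        }
        try {
            stream.close();
            return true;
        } catch (IOException e) {
            return false; // real code would log the failure here
        }
    }
}
```

Swallowing the exception is a deliberate choice in this sketch: a failure to close one rejected item should not interrupt the handling of the remaining items.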
How to Use Polling Sources in a Flow
The SDK automatically adds a scheduling strategy parameter to the source, which lets the user provide any strategy (such as fixed-frequency) to control the polling executions. The runtime uses that strategy to automatically schedule executions of the poll method.
Here is an example of the FTP directory listener that uses the polling source:
<ftp:listener config-ref="config" directory="path/" watermarkEnabled="true">
    <scheduling-strategy>
        <fixed-frequency startDelay="40000" frequency="1000" timeUnit="MILLISECONDS"/>
    </scheduling-strategy>
</ftp:listener>
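To read the numbers in this configuration: with a start delay of 40000 ms and a frequency of 1000 ms, the first poll is nominally scheduled 40 seconds after the source starts, and the next ones follow one second apart. The helper below only does that arithmetic. `FixedFrequencyTimeline` and `firstPolls` are hypothetical names, and the result ignores scheduling jitter and polls that run longer than the interval.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper that computes nominal poll times for a fixed frequency.
class FixedFrequencyTimeline {
    // Returns the offsets, in milliseconds since the source started,
    // at which the first `count` polls are nominally scheduled.
    static List<Long> firstPolls(long startDelayMs, long frequencyMs, int count) {
        List<Long> offsets = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            offsets.add(startDelayMs + i * frequencyMs);
        }
        return offsets;
    }
}
```

For the values in the XML above, `firstPolls(40000, 1000, 3)` yields 40000, 41000, and 42000.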