Contact Us 1-800-596-4880

Polling Sources

Available since version 1.1

Instead of getting triggered by an external action, Polling Sources are sources that poll items periodically depending on a given scheduling strategy and automatically handle idempotency and watermarking. On every poll action, many items can be dispatched to the flow, each as a single Message.

Creating a Polling Source

To create a Polling Source, you need to add the Source class to the @Sources annotation on the extension class, just like regular Sources. For a Source to behave as a PollingSource, instead of extending the class Source<T,A>, the class needs to extend from PollingSource<T, A>.

When extending from PollingSource<T, A>, you need to implement the methods poll and onRejectedItem.

  • poll is responsible for obtaining the items to be dispatched and communicating with them through the pollContext using the accept method.

  • onRejectedItem is called when one of the dispatch items was rejected (for example, by watermarking idempotency or server overload). This method is called to release any resources associated with the result set for the PollItem.

Here is an example of the declaration of these methods on the directory listener implementation for the FTP connector:

public class FtpDirectoryListener extends PollingSource<InputStream, FtpFileAttributes> {
// ...
  @Override
  public void poll(PollContext<InputStream, FtpFileAttributes> pollContext) {
    // ...
  }

  @Override
  public void onRejectedItem(Result<InputStream, FtpFileAttributes> result, SourceCallbackContext callbackContext) {
    // ...
  }
// ...
}

Implementing the Poll Method

This method gathers all the items to be dispatched and processes them one by one.

The accept method from the PollContext is in charge of dispatching the polled items into the flow. It receives Consumer<PollItem> as a parameter.

This example shows how to set the Message to dispatch to the flow:

public class FtpDirectoryListener extends PollingSource<InputStream, FtpFileAttributes> {
// ...
  @Override
  public void poll(PollContext<InputStream, FtpFileAttributes> pollContext) {
    if(pollContext.isSourceStopping()){ (1)
      return;
    }
    List<FtpFileAttributes> attributesList = listFilesAttributes();

    for (FtpFileAttributes attributes : attributesList) {
      if(pollContext.isSourceStopping()){ (1)
        break;
      }
      pollContext.accept(item -> { (2)
        Result<InputStream, FtpFileAttributes> result = read(attributes.getPath());
        item.setResult(result); (3)
      });
    }
  }
// ...
}
1 Note that the pollContext provides information on whether the Source is stopping. This must be taken into account before any time-consuming task.
2 accept dispatches an item to the flow.
3 setResult is used to set the Message that is dispatched.

Idempotency

Idempotency prevents items from getting polled twice. The SDK ensures that no other thread or cluster node is processing an item of the same ID, which provides processing idempotency. No two items with the same ID will be processed at the same time. If a node polls an item with the ID of an item that is being processed, the item will be dropped, and the onRejectedItem method will be called.

public class FtpDirectoryListener extends PollingSource<InputStream, FtpFileAttributes> {
// ...
  @Override
  public void poll(PollContext<InputStream, FtpFileAttributes> pollContext) {
    if(pollContext.isSourceStopping()){
      return;
    }
    List<FtpFileAttributes> attributesList = listFilesAttributes();

    for (FtpFileAttributes attributes : attributesList) {
      if(pollContext.isSourceStopping()){
        break;
      }
      pollContext.accept(item -> {
        Result<InputStream, FtpFileAttributes> result = read(attributes.getPath());
        item.setResult(result);
        item.setId(attributes.getPath());  (1)
      });
    }
  }
// ...
}
1 setId is used to set the ID used for idempotency.

Watermarking

You can use watermarking when you are polling an incremental source of data and you only want the new data instead of manually dividing the new data from the old data on each call.

By setting an incrementing value to the item’s watermark, every other poll will drop and call onRejectedItem on any of the item’s watermark values that are lower than the current watermark value. In the example above, the chosen watermark is the timestamp of the file.

What criteria does the SDK use to compare watermarks?

If the watermarks that you use implement Comparable, the natural order of those values will be used. There is no need to set a criterion.

public class FtpDirectoryListener extends PollingSource<InputStream, FtpFileAttributes> {
// ...
  @Parameter
  @Optional(defaultValue = "false")
  private boolean watermarkEnabled = false; (1)
// ...
  @Override
  public void poll(PollContext<InputStream, FtpFileAttributes> pollContext) {
    if(pollContext.isSourceStopping()){
      return;
    }
    List<FtpFileAttributes> attributesList = listFilesAttributes();

    for (FtpFileAttributes attributes : attributesList) {
      if(pollContext.isSourceStopping()){
        break;
      }
      pollContext.accept(item -> {
        Result<InputStream, FtpFileAttributes> result = read(attributes.getPath());
        item.setResult(result);
        item.setId(attributes.getPath());
        if (watermarkEnabled) { (1)
          item.setWatermark(attributes.getTimestamp());  (2)
        }
      });
    }
  }
// ...
}
1 A typical best practice is to let the user decide whether to use watermarking or not.
2 setWatermark sets the file timestamp as watermark.

What if the watermark values do not implement Comparable, or what if I want to use another criteria that is not the natural order?

You can simply set a Comparator to the PollContext by calling its method setWatermarkComparator.

If the watermark updates are not set before polling is complete, you can add a latch to avoid updating the watermark in two iterations.

PollItemStatus

If you call the accept method, you get a PollItemStatus in return. This is an Enum that can take the following values:

  • ACCEPTED: The item was accepted and has been scheduled for execution.

  • FILTERED_BY_WATERMARK: The item was rejected because watermarking was enabled, so it was filtered.

  • ALREADY_IN_PROCESS: The item was rejected because idempotency was enabled and another thread or node is already processing this item.

  • SOURCE_STOPPING: The item was rejected because the source has received the stop signal.

You could use this to log the status of each item polled:

public class FtpDirectoryListener extends PollingSource<InputStream, FtpFileAttributes> {
// ...
  @Parameter
  @Optional(defaultValue = "false")
  private boolean watermarkEnabled = false;
// ...
  @Override
  public void poll(PollContext<InputStream, FtpFileAttributes> pollContext) {
    if(pollContext.isSourceStopping()){
      return;
    }
    List<FtpFileAttributes> attributesList = listFilesAttributes();

    for (FtpFileAttributes attributes : attributesList) {
      if(pollContext.isSourceStopping()){
        break;
      }
      PollItemStatus status = pollContext.accept(item -> { (1)
        Result<InputStream, FtpFileAttributes> result = read(attributes.getPath());
        item.setResult(result);
        item.setId(attributes.getPath());
        if (watermarkEnabled) {
          item.setWatermark(attributes.getTimestamp());
        }
      });

      if(!status.equals(PollItemStatus.ACCEPTED)){
        LOGGER.debug("Item rejected with code:  " + status.name());
      }
    }
  }
// ...
}
1 Get the PollItemStatus that is logged later.

Getting the Source Callback Context

If you need to save data to be used on the callback methods, item provides the SourceCallbackContext for it.

This example shows how to get the context and then save data in it:

public class FtpDirectoryListener extends PollingSource<InputStream, FtpFileAttributes> {
// ...
  @Parameter
  @Optional(defaultValue = "false")
  private boolean watermarkEnabled = false;
// ...
  @Override
  public void poll(PollContext<InputStream, FtpFileAttributes> pollContext) {
    if(pollContext.isSourceStopping()){
      return;
    }
    List<FtpFileAttributes> attributesList = listFilesAttributes();

    for (FtpFileAttributes attributes : attributesList) {
      if(pollContext.isSourceStopping()){
        break;
      }
      PollItemStatus status = pollContext.accept(item -> {
        SourceCallbackContext context = item.getSourceCallbackContext();   (1)
        context.addVariable("attributes", attributes);  (2)
        Result<InputStream, FtpFileAttributes> result = read(attributes.getPath());
        item.setResult(result);
        item.setId(attributes.getPath());
        if (watermarkEnabled) {
          item.setWatermark(attributes.getTimestamp());
        }
      });

      if(!status.equals(PollItemStatus.ACCEPTED)){
        LOGGER.debug("Item rejected with code:  " + status.name());
      }
    }
  }
// ...
}
1 Gets the context from item.
2 Saves data on the SourceCallbackContext.

Implementing the OnRejectedItem Method

This method is called when one of the dispatched items was rejected. It should release any resource associated with the result set for the PollItem.

This example shows how to implement it:

public class FtpDirectoryListener extends PollingSource<InputStream, FtpFileAttributes> {
// ...
  @Override
    public void onRejectedItem(Result<InputStream, FtpFileAttributes> result, SourceCallbackContext callbackContext) {
      closeFileStream(result);
    }
// ...
}

How to Use Polling Sources in a Flow

The SDK automatically adds a scheduling strategy parameter to the source, enabling the user to provide any strategy (like fixed-frequency) to handle the polling executions. The runtime will use that strategy to automatically schedule executions of the poll method.

Here is an example of the FTP directory listener that uses the polling source:

<ftp:listener config-ref="config" directory="path/" watermarkEnabled="true">
  <scheduling-strategy>
    <fixed-frequency startDelay="40000" frequency="1000" timeUnit="MILLISECONDS"/>
  </scheduling-strategy>
</ftp:listener>