Contact Us 1-800-596-4880

Object Streaming (Pagination)

Object streaming is similar to binary streaming, except that it streams Java objects, not a raw byte stream.

Note that if you are familiar with the Devkit (for Mule 3), you can think of this feature as an evolution of the Auto Pagination feature. The main difference between this and Devkit is that the Mule SDK supports repeatable, random, and concurrent access.

Here is an example that use object streaming through an oversimplified Select operation from the Database connector:

public PagingProvider<DbConnection, Map<String, Object>> list(@Text String sql) { (1)
    return new PagingProvider<DbConnection, Map<String, Object>>() {

      private ResultSet resultSet;

      @Override
      public List<Map<String, Object>> getPage(DbConnection connection) { (2)
        if (resultSet == null) {
            resultSet = executeQuery(connection, sql); (3)
        }

        return extractPageFrom(resultSet);
      }

      @Override
      public java.util.Optional<Integer> getTotalResults(DbConnection connection) {
        return java.util.Optional.empty(); (4)
      }

      @Override
      public void close(DbConnection connection) throws MuleException {
        resultSet.close(); (5)
      }
    }
}
1 Two things to notice about the signature of this method:
  • The method returns a PagingDelegate object. PagingDelegate requires two generic types. The first one is the type of connection the delegate will use to get the next pages, and the second is the type of the elements in the page.

  • The List operation receives the parameters. However, notice that is does not receive a connection object. Paged operations cannot receive connections. Instead, they must always use the connections that the runtime passes to the PagingProvider.

    So, this signature means that it will return the results of the query in pages, using a DbConnection object, and each item in the stream will be a Map<String, Object> that represents each row in the result.

2 getPage() fetches the next page and returns it. It receives the connection to use to fetch that page. When the stream is over, this method needs to return an empty List. Implementations should ensure that no empty pages are returned before the stream is finished.
3 Depending on the operation’s logic, you might need logic to obtain the first page versus logic to obtain the next pages.
4 Another feature of object streaming is the ability to tell how many objects are actually in the streams. To determine that, you need to implement the getTotalResults() method. The problem is that not all APIs support that. For example, the Salesforce API supports that, but JDBC (the API powering the Database connector) does not. That is why this method returns a Java Optional, in this case, an empty one. Users of your paginated operation can obtain this value through an expression similar to #[payload.size].
5 Finally, the close() method will be invoked when no more objects are in the stream. This method IS NOT for closing the connection. The SDK will do that automatically. The method is for releasing any resources that this paging delegate might be holding. For example, in this case, the paging delegate has a ResultSet from which each of the page values are extracted. That ResultSet needs to be released.

Sticky Connections

By default, there is no guarantee that the connection that getPage() receives will be the same one on each invocation. And that is fine. For most APIs (especially Rest APIs), this is not only not a problem, but in some cases, it is a feature. However, there are other cases (like Database) in which all pages MUST be obtained using the SAME connection. This was impossible to do with Devkit.

Mule SDK adds sticky connections, which signal that the runtime must guarantee that the getPage() method is always invoked with the same transaction, for example:

public PagingProvider<DbConnection, Map<String, Object>> list(String sql) {
    return new PagingProvider<DbConnection, Map<String, Object>>() {

      private ResultSet resultSet;

      @Override
      public List<Map<String, Object>> getPage(DbConnection connection) {
        ...
      }

      @Override
      public java.util.Optional<Integer> getTotalResults(DbConnection connection) {
        ...
      }

      @Override
      public void close(DbConnection connection) throws MuleException {
        ...
      }

      @Override
      public boolean useStickyConnections() {
        return true;
      }
    }
}

Configuring Streaming

Just like with binary streaming, auto-paged operations automatically get the streaming configuration through <repeatable-in-memory-iterable />, <ee:repeatable-file-store-iterable />, and <non-repeatable-iterable />. For details on these elements, see binary streaming. Here is an example that configures custom file store streaming for the select example above:

<db:select sql="select * from REALLY_LARGE_TABLE">
    <ee:repeatable-file-store-iterable inMemoryObjects="100"/>
</db:select>