public PagingProvider<DbConnection, Map<String, Object>> list(@Text String sql) { (1)
return new PagingProvider<DbConnection, Map<String, Object>>() {
private ResultSet resultSet;
@Override
public List<Map<String, Object>> getPage(DbConnection connection) { (2)
if (resultSet == null) {
resultSet = executeQuery(connection, sql); (3)
}
return extractPageFrom(resultSet);
}
@Override
public java.util.Optional<Integer> getTotalResults(DbConnection connection) {
return java.util.Optional.empty(); (4)
}
@Override
public void close(DbConnection connection) throws MuleException {
resultSet.close(); (5)
}
}
}
Object Streaming (Pagination)
Object streaming is similar to binary streaming, except that it streams Java objects, not a raw byte stream.
Note that if you are familiar with the Devkit (for Mule 3), you can think of this feature as an evolution of the Auto Pagination feature. The main difference between this and Devkit is that the Mule SDK supports repeatable, random, and concurrent access.
Here is an example that use object streaming through an oversimplified Select operation from the Database connector:
1 | Two things to notice about the signature of this method:
|
2 | getPage() fetches the next page and returns it. It receives the connection to use to fetch that page. When the stream is over, this method needs to return an empty List. Implementations should ensure that no empty pages are returned before the stream is finished. |
3 | Depending on the operation’s logic, you might need logic to obtain the first page versus logic to obtain the next pages. |
4 | Another feature of object streaming is the ability to tell how many objects are actually in the streams. To determine that, you need to implement the getTotalResults() method. The problem is that not all APIs support that. For example, the Salesforce API supports that, but JDBC (the API powering the Database connector) does not. That is why this method returns a Java Optional , in
this case, an empty one. Users of your paginated operation can obtain this value through an expression similar to #[payload.size] . |
5 | Finally, the close() method will be invoked when no more objects are in the stream. This method IS NOT for closing the connection. The SDK will do that automatically. The method is for releasing any resources that this paging delegate might be holding. For example, in this case, the paging delegate has a ResultSet from which each of the page values are extracted. That ResultSet needs to be released. |
Sticky Connections
By default, there is no guarantee that the connection that getPage()
receives will be the same one on each invocation. And that is fine. For most APIs (especially Rest APIs), this is not only not a problem, but in some cases, it is a feature. However, there are other cases (like Database) in which all pages MUST be obtained using the SAME connection. This was impossible to do with Devkit.
Mule SDK adds sticky connections, which signal that the runtime must guarantee that the getPage()
method is always invoked with the same transaction, for example:
public PagingProvider<DbConnection, Map<String, Object>> list(String sql) {
return new PagingProvider<DbConnection, Map<String, Object>>() {
private ResultSet resultSet;
@Override
public List<Map<String, Object>> getPage(DbConnection connection) {
...
}
@Override
public java.util.Optional<Integer> getTotalResults(DbConnection connection) {
...
}
@Override
public void close(DbConnection connection) throws MuleException {
...
}
@Override
public boolean useStickyConnections() {
return true;
}
}
}
Configuring Streaming
Just like with binary streaming, auto-paged operations automatically get the streaming configuration through <repeatable-in-memory-iterable />
, <ee:repeatable-file-store-iterable />
, and <non-repeatable-iterable />
. For details on these elements, see binary streaming. Here is an example that configures custom file store streaming for the select example above:
<db:select sql="select * from REALLY_LARGE_TABLE">
<ee:repeatable-file-store-iterable inMemoryObjects="100"/>
</db:select>