オブジェクトストリーミング (ページネーション)

オブジェクトストリーミングは、未加工のバイトストリームではなく Java オブジェクトをストリーミングするという点を除いては​バイナリストリーミング​と同じです。

Devkit (Mule 3) に精通しているなら、自動ページネーション機能の進化形と考えればわかりやすいでしょう。Devkit との違いは、Mule SDK では反復可能アクセス、ランダムアクセス、同時アクセスをサポートしているという点です。

データベースコネクタの Select 操作を大幅に簡素化したオブジェクトストリーミングの使用例を示します。

public PagingProvider<DbConnection, Map<String, Object>> list(@Text String sql) { (1)
    return new PagingProvider<DbConnection, Map<String, Object>>() {

      private ResultSet resultSet;

      @Override
      public List<Map<String, Object>> getPage(DbConnection connection) { (2)
        if (resultSet == null) {
            resultSet = executeQuery(connection, sql); (3)
        }

        return extractPageFrom(resultSet);
      }

      @Override
      public java.util.Optional<Integer> getTotalResults(DbConnection connection) {
        return java.util.Optional.empty(); (4)
      }

      @Override
      public void close(DbConnection connection) throws MuleException {
        resultSet.close(); (5)
      }
    }
}
1 このメソッドの署名について 2 点注意すべきことがあります。
  • メソッドは ​PagingDelegate​ オブジェクトを返します。​PagingDelegate​ は 2 つの汎用型を必要とします。1 つ目はデリゲート関数が次ページ以降を取得するために使用する接続の種別で、2 つ目はページ内の要素の種別です。

  • List 操作はパラメーターを受け取ります。ただし、接続オブジェクトは受け取りません。ページングされた操作は接続を受け取ることができないためです。代わりに、Runtime によって ​PagingProvider​ に渡される接続を常に使用する必要があります。

    そのため、この署名は ​DbConnection​ オブジェクトを使用したページのクエリ結果を返し、ストリームの各項目は結果の各行を表す ​Map<String, Object>​ となります。

2 getPage()​ は次のページをフェッチして返します。このメソッドは、ページをフェッチするために使用する接続を受け取ります。ストリームが終わったら、このメソッドは次を行う必要があります。
3 操作のロジックによって、最初のページを取得するロジックか、次のページを取得するロジックが必要な場合があります。
4 オブジェクトストリーミングの別の機能として、ストリーム内の実際のオブジェクト数を通知することができます。これには ​getTotalResults()​ メソッドを実装する必要があります。ただし、すべての API がこのメソッドをサポートしているわけではありません。たとえば、​Salesforce​API はこのメソッドをサポートしていますが JDBC (データベースコネクタを駆動する API) はサポートしていません。そのため、このメソッドは Java ​Optional​ クラス (この場合は空のクラス) を返します。ページネーション操作のユーザーは、​#[payload.size]​ のような式によってこの値を取得できます。
5 最後に、ストリーム内のオブジェクトが無くなったら ​close()​ メソッドが呼び出されます。このメソッドは接続を終了するものでは​​ありません​。接続の終了は SDK が自動的に行います。このメソッドは、このページングデリゲート関数が持っている可能性のあるリソースを解放するものです。たとえば、このケースでは、ページングデリゲート関数は、各ページ値を抽出する元となる ​ResultSet​ を持っています。この ​ResultSet​ を解放する必要があります。

スティッキー接続

デフォルトでは、​getPage()​ が受け取る接続が呼び出しごとに同じであるという保証はありません。それで別に問題はありません。ほとんどの API (特に Rest API) では、​問題ではない​だけではなく、機能の 1 つでもあります。ただし、他のケース (データベースなど) では、​同じ​​接続を使用して、すべてのページを取得する​​必要があります​。これは Devkit では不可能でした。

Mule SDK ではスティッキー接続が追加されており、​getPage()​ メソッドが常に同じトランザクションで呼び出されるように Runtime で保証しなければならないことを指示します。次に例を示します。

public PagingProvider<DbConnection, Map<String, Object>> list(String sql) {
    return new PagingProvider<DbConnection, Map<String, Object>>() {

      private ResultSet resultSet;

      @Override
      public List<Map<String, Object>> getPage(DbConnection connection) {
        ...
      }

      @Override
      public java.util.Optional<Integer> getTotalResults(DbConnection connection) {
        ...
      }

      @Override
      public void close(DbConnection connection) throws MuleException {
        ...
      }

      @Override
      public boolean useStickyConnections() {
        return true;
      }
    }
}

ストリーミングの設定

バイナリストリーミングと同様に、自動ページング操作は ​<repeatable-in-memory-iterable />​、​<ee:repeatable-file-store-iterable />​、および ​<non-repeatable-iterable />​ を介してストリーミング設定を自動的に取得します。これらの要素の詳細については、​バイナリストリーミング​についての説明を参照してください。上記の Select 操作の例でカスタムファイルストアストリーミングを設定すると次のようになります。

<db:select sql="select * from REALLY_LARGE_TABLE">
    <ee:repeatable-file-store-iterable inMemoryObjects="100"/>
</db:select>