Aggregators Module リファレンス 1.1 - Mule 4

アグリゲータースコープ​は、データを受信し、そのデータを処理して値を抽出し、その値を集約された要素のリストに追加するコンポーネントです。このプロセスが完了すると、アグリゲータースコープの設定に応じて、フロー内の一連のコンポーネントで要素のリストが処理されます。

アグリゲーターはパススルールーターです。アグリゲータースコープで受信したものと同じデータが、アグリゲータースコープの後のコンポーネントで処理されます。 唯一の変更として、集約ルートの実行中に変数が設定された場合の変数の伝搬があります。

アグリゲータースコープで保存値が解放されると、ルートコンポーネント (​[Incremental aggregation (増分集約)]​ または ​[Aggregation complete (集約完了)]​) で、​[Aggregator listener (アグリゲーターリスナー)]​ ソースを介してアグリゲータースコープ自体または別の Mule フロー内で、集約された要素のリストが処理されます。このプロセスは、使用されているアグリゲータースコープの設定によって異なります。

設定

Aggregator Module ではスコープが使用されるため、Aggregator Module のグローバル設定はありません。アグリゲーターの各種別は、そのパラメーターを介して設定可能な異なるスコープです。各スコープで共通に使用されるパラメーターを次に示します。

  • Content (コンテンツ)

    何を集約するかを定義する式。評価の結果は、集約に保存された値です。アグリゲータースコープに関連するすべてのデータはオブジェクトストアに保存されるため、コンテンツの値はシリアル化可能である必要があります。そうでない場合、モジュールが適切に動作しない場合があります。

  • Object Store (オブジェクトストア)

    アグリゲーターに関連するすべての情報はオブジェクトストアに保存されます。オブジェクトストアの種別は、コンポーネントの想定される動作に関連します。

    • 永続的なオブジェクトストア (デフォルト)

      メモリ内オブジェクトストアより遅くなりますが、信頼性は高くなります。アプリケーションの再起動後のデータ復旧が可能です。

    • メモリ内オブジェクトストア

      永続的なメモリストアよりも速くなりますが、アプリケーションが再起動するとすべてのデータが失われます。

アグリゲータースコープ

サイズベース、時間ベース、およびグループベースのアグリゲーターがあります。

サイズベースのアグリゲーター

[Size based aggregator (サイズベースのアグリゲーター)]​ スコープを使用すると、定義済みの数の要素の集約が完了するまで要素を集約することができます。

<aggregators:size-based-aggregator  name="sizeBasedAggregator"
                                    maxSize="10"
                                    timeout="60"
                                    timeoutUnit="MINUTES"
                                    objectStore="exampleObjectStore">
    <aggregators:content>
        #[payload]
    </aggregators:content>
    <aggregators:incremental-aggregation>
        ...
    </aggregators:incremental-aggregation>
    <aggregators:aggregation-complete>
        ...
    </aggregators:aggregation-complete>
</aggregators:size-based-aggregator>

要素が、指定された ​[Max size (最大サイズ)]​ に達すると、2 つのことが実行されます。

  • ストレージ内の要素が削除され、次の要素は新しい集約に属します。

  • 集約された要素を使用して ​[Aggregation complete (集約完了)]​ ルートが実行されます。

また、​[Aggregator listener (アグリゲーターリスナー)]​ がアグリゲータースコープに登録されている場合、同じ要素セットを使用してリスナーのコールバックが実行されます。
[Incremental aggregation (増分集約)]​ ルートが null ではなく、​[Max size (最大サイズ)]​ に達していない場合、集約された最後の要素を含めすべての集約された要素を使用してルートコンポーネントチェーンが実行されます。

アグリゲータースコープの ​[Timeout (タイムアウト)]​ を定義することもできます。この場合、このタイムアウトを遅延として使用するスケジュールされたタスクが実行用に登録されます。この時間は最初の要素が到着した時間から計算されます。また、実行を待機している別のタスクがある場合、追加のタスクはスケジュールされません。タイムアウトが発生すると、参照先のリスナーは、タイムアウトによって呼び出されることがサポートされている場合のみ、実行されます。

パラメーター

名前 説明 デフォルト値 必須

Name (名前)

String (文字列)

アグリゲーターの名前。[Aggregator listener (アグリゲーターリスナー)] は後で名前でスコープを参照できます。

x

Content (コンテンツ)

Expression (式)

何を集約するかを定義する式。評価の結果は、集約に保存された値です。

#[payload]

Max Size (最大サイズ)

Number (数値)

集約が完了とみなされる前に集約される要素の合計数。

x

Timeout (タイムアウト)

Number (数値)

集約が完了するのを待機する最大時間。要素の合計数が [Max Size (最大サイズ)] に達する前にタイムアウトすると、集約は完了とみなされません。グループが頻繁にタイムアウトすることを避けるため、​0​ の値はサポートされていません。

-1(UNLIMITED)

Timeout unit (タイムアウト単位)

Time Unit (時間単位)

タイムアウトを測定する時間単位。

SECONDS (秒)

Object Store (オブジェクトストア)

Object Store (オブジェクトストア)

集約された要素が保存されるグローバルオブジェクトストアを参照する名前または非公開オブジェクトストアの定義。

デフォルトのオブジェクトストアパーティション。

Aggregation Complete Route (集約完了ルート)

Route (ルート)

集約の完了時に実行するコンポーネントチェーン。タイムアウトした場合、集約完了ルートは実行されません。

x

Incremental Aggregation Route (増分集約ルート)

Route (ルート)

集約されたすべての新規要素で実行するコンポーネントチェーン。ペイロードは、この集約で集約されたすべての要素 (最初の要素から現在集約中の要素まで) のリストです。

発生

  • AGGREGATORS:AGGREGATOR_CONFIG

[Max size (最大サイズ)] または [Timeout (タイムアウト)] 項目の値が無効です (​maxSize < 0​ など)。

  • AGGREGATORS:OBJECT_STORE_ACCESS

集約された値の保存に使用されるオブジェクトストアへのアクセス中にエラーが発生しました。

時間ベースのアグリゲーター

[Time based aggregator (時間ベースのアグリゲーター)]​ スコープを使用すると、指定した時間制限に達するまで要素を集約することができます。

<aggregators:time-based-aggregator  name="timeBasedAggregator"
                                    period="60"
                                    periodUnit="MINUTES"
                                    maxSize="10"
                                    objectStore="exampleObjectStore">
    <aggregators:content>
        #[payload]
    </aggregators:content>
    <aggregators:incremental-aggregation>
        ...
    </aggregators:incremental-aggregation>
</aggregators:time-based-aggregator>

考慮される期間は、最初の要素が到着した時間から計算されます。集約が解放されても、次の要素が到着するまでタイマーは開始されません。

アグリゲーターでは、​[Max size (最大サイズ)]​ が設定されていない限り、新しい要素が到着するたびに ​[Incremental aggregation (増分集約)]​ ルートを実行することもできます。 この場合、​[Incremental aggregation (増分集約)]​ ルートは、集約された要素のサイズが ​[Max size (最大サイズ)]​ に達した場合を除き、毎回実行されます。この時点で ​[Aggregator listener (アグリゲーターリスナー)]​ が存在する場合、リスナーのコールバックも実行されます。

パラメーター

名前 説明 デフォルト値 必須

Name (名前)

String (文字列)

アグリゲーターの名前。[Aggregator listener (アグリゲーターリスナー)] は後で名前でスコープを参照できます。

x

Content (コンテンツ)

Expression (式)

何を集約するかを定義する式。評価の結果は、集約に保存された値です。

#[payload]

Period (期間)

Number (数値)

集約が完了とみなされるまで待機する期間。

x

Period unit (期間単位)

Time Unit (時間単位)

期間を測定する時間単位。

SECONDS (秒)

Max Size (最大サイズ)

Number (数値)

集約が完了とみなされる前に集約される要素の合計数。

-1(UNLIMITED)

Object Store (オブジェクトストア)

Object Store (オブジェクトストア)

集約された要素が保存されるグローバルオブジェクトストアを参照する名前または非公開オブジェクトストアの定義。

デフォルトのオブジェクトストアパーティション。

Incremental Aggregation Route (増分集約ルート)

Route (ルート)

集約されたすべての新規要素で実行するコンポーネントチェーン。ペイロードは、この集約で集約されたすべての要素 (最初の要素から現在集約中の要素まで) のリストです。

発生

  • AGGREGATORS:AGGREGATOR_CONFIG

[Period (期間)] または [Max size (最大サイズ)] 項目の値が無効です (期間が 0 など)。

  • AGGREGATORS:OBJECT_STORE_ACCESS

集約された値の保存に使用されるオブジェクトストアへのアクセス中にエラーが発生しました。

グループベースのアグリゲーター

[Group based aggregator (グループベースのアグリゲーター)]​ スコープを使用すると、要素をグループ ID でグループに集約できます。

<aggregators:group-based-aggregator name="groupBasedAggregator"
                                    groupId="#[correlationId]"
                                    groupSize="#[itemSequenceInfo.sequenceSize]"
                                    evictionTime="180"
                                    evictionTimeUnit="SECONDS"
                                    timeout="60"
                                    timeoutUnit="MINUTES"
                                    objectStore="exampleObjectStore">
    <aggregators:content>
        #[payload]
    </aggregators:content>
    <aggregators:incremental-aggregation>
        ...
    </aggregators:incremental-aggregation>
    <aggregators:aggregation-complete>
        ...
    </aggregators:aggregation-complete>
</aggregators:group-based-aggregator>

要素が、グループに指定された ​[Max size (最大サイズ)]​ に達すると、2 つのことが実行されます。

  • このグループの要素がストレージから削除されます。グループは完了としてマークされ、そのグループに到着するすべての新しい要素で例外が発生します。

  • この特定のグループの集約された要素を使用して ​[Aggregation complete (集約完了)]​ ルートが実行されます。

新しい要素がアグリゲーターに到達するたびに、ID が解決されます。この ID のグループがすでにアグリゲーターに存在する場合、値はそのグループに追加されます。それ以外の場合、この ID の新しいグループが作成され、受信した要素がそのグループの集約の最初の要素になります。

また、​[Aggregator listener (アグリゲーターリスナー)]​ がアグリゲータースコープに登録されている場合、同じ要素セットを使用してリスナーのコールバックが実行されます。
[Incremental aggregation (増分集約)]​ ルートが null ではなく、​[Max size (最大サイズ)]​ に達していない場合、集約された最後の要素を含めすべての集約された要素を使用してルートコンポーネントチェーンが実行されます。

[Group based aggregator (グループベースのアグリゲーター)]​ スコープではいくつかの重要な概念が導入されます。

  • グループのタイムアウト
    グループの必要なすべての要素が想定時間内に到着しなかったためにグループを解放する必要がある場合に指定します。グループがタイムアウトし、まだ除去されていない場合、新しい値を追加しようとする試みは拒否されます。

  • グループの除去
    完了したかタイムアウトしたかに関わらずグループをアグリゲーターから削除する場合に指定します。このグループの ID を持つ新しい要素をアグリゲーターが受信すると、そのグループが再度作成されます。

最後に、グループベースのアグリゲーターに到達する要素が、分割されたシーケンスから渡された場合 (​ForEach​ コンポーネントによってなど)、各要素には異なる ​sequenceNumber​ が割り当てられます。この場合、要素は集約解放の前に昇順に並び替えられます。

パラメーター

名前 説明 デフォルト値 必須

Name (名前)

String (文字列)

アグリゲーターの名前。[Aggregator listener (アグリゲーターリスナー)] は後で名前でスコープを参照できます。

x

Content (コンテンツ)

Expression (式)

何を集約するかを定義する式。評価の結果は、集約に保存された値です。

#[payload]

Group Id (グループ ID)

メッセージを集約する必要があるグループの ID を取得するため、受信した各新規メッセージで評価される式。

#[correlationId]

Group Size (グループサイズ)

Number (数値)

解決されたグループ ID のグループに割り当てる最大サイズ。グループ ID が同じすべてのメッセージは、同じグループサイズにする必要があります。そうしないと、最初に解決されたグループサイズのみが正しいとみなされます。一致しないすべてのメッセージに対して警告が記録されます。

#[itemSequenceInfo.sequenceSize]

Eviction Time (除去時間)

Number (数値)

完了またはタイムアウトしたグループ ID を記憶する時間 (0: 記憶しない、-1: 永久に記憶)。

180

Eviction Time Unit (除去時間単位)

Time Unit (時間単位)

除去時間の時間単位。

SECONDS (秒)

Timeout (タイムアウト)

Number (数値)

グループの集約が完了するのを待機する最大時間。そのグループ内の要素の合計数がグループのサイズと同じになる前にタイムアウトすると、集約は完了とみなされます。グループが頻繁にタイムアウトすることを避けるため、​0​ の値はサポートされていません。

-1(UNLIMITED)

Timeout unit (タイムアウト単位)

Time Unit (時間単位)

タイムアウトを測定する時間単位。

SECONDS (秒)

Object Store (オブジェクトストア)

Object Store (オブジェクトストア)

集約された要素が保存されるグローバルオブジェクトストアを参照する名前または非公開オブジェクトストアの定義。

デフォルトのオブジェクトストアパーティション。

Aggregation Complete Route (集約完了ルート)

Route (ルート)

集約の完了時に実行するコンポーネントチェーン。

x

Incremental Aggregation Route (増分集約ルート)

Route (ルート)

集約されたすべての新規要素で実行するコンポーネントチェーン。ペイロードは、この集約で集約されたすべての要素 (最初の要素から現在集約中の要素まで) のリストです。

発生

  • AGGREGATORS:GROUP_COMPLETED

まだ除去されていない完了済みのグループへの新しい要素の追加中にエラーが発生しました。

  • AGGREGATORS:GROUP_TIMED_OUT

タイムアウトしてまだ除去されていないグループへの新しい要素の追加中にエラーが発生しました。

  • AGGREGATORS:NO_GROUP_ID

グループ ID に解決される式が null を返しました。

  • AGGREGATORS:NO_GROUP_SIZE

グループサイズに解決される式が null を返しました。

  • AGGREGATORS:AGGREGATOR_CONFIG

[Group size (グループサイズ)] または [Timeout (タイムアウト)] 項目の値が無効です (グループサイズが 0 以下など)。

  • AGGREGATORS:OBJECT_STORE_ACCESS

集約された値の保存に使用されるオブジェクトストアへのアクセス中にエラーが発生しました。

ソース

アグリゲーターリスナー

[Aggregator listener (アグリゲーターリスナー)]​ は、アグリゲータースコープによりトリガーされる要素をリスンするソースです。

<aggregators:aggregator-listener aggregatorName="exampleAggregator" includeTimedOutGroups="false">

[Aggregator listener (アグリゲーターリスナー)] は、フロー内にあるアグリゲータースコープのみを参照します。サブフローで宣言されたアグリゲータースコープを [Aggregator listener (アグリゲーターリスナー)] は参照できません。

リスナーで参照されるアグリゲータースコープが集約を完了すると、すべての要素のリストを使用してリスナーがトリガーされます。
[Aggregator listener (アグリゲーターリスナー)]​ はソースであるため、アグリゲーターとは異なるフローに配置されます。リスナーはアグリゲーターのフローのコンテキストにアクセスできないため、そのフローの変数にアクセスできません。

集約リスナーはあらゆるアグリゲーターで使用できますが、特に時間駆動型の非同期集約で重要です。非同期集約では、アグリゲータールートが実行されず、[Aggregator listener (アグリゲーターリスナー)] をソースとするフロー内のコンポーネントにのみアクセスできます。

パラメーター

名前 説明 デフォルト値 必須

Aggregator Name (アグリゲーター名)

String (文字列)

リスンするアグリゲーターの名前。このアグリゲーターがその要素を解放すると、リスナーが実行されます。各リスナーが参照できるアグリゲーターは 1 つのみで、各アグリゲーターは最大 1 つのリスナーで参照可能です。

x

Include Timed Out Groups (タイムアウトグループを含める)

Boolean (ブール)

タイムアウトによりグループが解放されたときに、リスナーがトリガーされるかどうかを示します。

false

集約属性

メッセージの集約が行われるたびに、その集約に関する情報を含む属性がメッセージにいくつか追加されます。

名前 説明

Aggregation ID (集約 ID)

String (文字列)

要素が集約されたグループの ID。集約戦略でグループ別に集約しない場合、この項目は集約が解放されるまで保持される自動生成値になります (グループベースおよび時間ベースのアグリゲーターと同様)。

First Item Arrival Time (最初の項目の到着時間)

Date (日付)

最初の値が集約された時間。

Last Item Arrival Time (最後の項目の到着時間)

Date (日付)

最後の値が集約された時間。

Is Aggregation Complete (集約完了)

Boolean (ブール)

集約が完了した場合は True、そうでない場合は False。

Redelivery Policy (再配信ポリシー)

エラーを生成する要求を実行するための再配信ポリシーを設定します。再配信ポリシーは、フロー内のあらゆる提供元に追加できます。

項目 説明 デフォルト値 必須

Max Redelivery Count (最大再配信数)

Number (数値)

​REDELIVERY_EXHAUSTED​ エラーが返されるまでに、再配信された要求が処理に失敗できる最大回数。

Use Secure Hash (セキュアハッシュを使用)

Boolean (ブール)

true​ の場合、再配信されたメッセージの識別にセキュアハッシュアルゴリズムが使用されます。

Message Digest Algorithm (メッセージダイジェストアルゴリズム)

String (文字列)

<b>[Use Secure Hash (セキュアハッシュを使用)]<b/> 項目が ​true​ の場合に使用するセキュアハッシュアルゴリズム。メッセージのペイロードが Java オブジェクトの場合、Mule ではこの値が無視され、ペイロードの ​hashCode()​ が返した値が返されます。

Id Expression (ID の式)

String (文字列)

メッセージがいつ再配信されたのかを判断する 1 つ以上の式。このプロパティは、<b>[Use Secure Hash (セキュアハッシュを使用)]</b> 項目が ​false​ の場合にのみ設定できます。

Object Store (オブジェクトストア)

Object Store (オブジェクトストア)

各メッセージの再配信カウンターが保存されるオブジェクトストアを設定します。

システムプロパティ

プロパティ 説明 デフォルト値

mule.aggregator.executor.failOnStartIfNoQuorum

現在、クォーラム制約が満たされていない場合、Mule アプリケーションのデプロイメントは失敗します。このシステムプロパティは、Mule アプリケーションをデプロイするためにバックグラウンドでクォーラムポーリングを送信します。

True

-M-Dmule.aggregator.executor.failOnStartIfNoQuorum=false

mule.aggregator.executor.delayForQuorum

クォーラムポーリング間の時間 (ミリ秒) を設定します。このプロパティは、​mule.aggregator.executor.failOnStartIfNoQuorum​ プロパティの値が ​false​ の場合に有効になります。

100

-M-Dmule.aggregator.executor.delayForQuorum=200