アグリゲータモジュールリファレンス

アグリゲータは、データを受信し、そのデータを処理して値を抽出し、その値を集約された要素のリストに追加するコンポーネントです。その後設定に応じて、要素のリストがコンポーネントのセットに送信されます。

アグリゲータはパススルールータです。つまり、アグリゲータが受信したデータと同じデータが後続のコンポーネントによって処理されます。唯一の変更として、集約ルートの実行時に変数が伝達されます (変数が設定されている場合)。

アグリゲータが保存値を解放すると、アグリゲータリスナを介して、アグリゲータ自体のルートまたは別の Mule フロー内で、集約された要素のリストが処理されます。そのすべては、使用されているアグリゲータの種別と設定によって異なります。

設定

このモジュールは操作のみに基づくため、アグリゲータの設定はありません。異なる種別の各アグリゲータは、そのパラメータを介して設定可能な異なる操作です。

いくつかの一般的なパラメータは重要であるため、事前に説明します。

  • Content (コンテンツ):

    何を集約するかを定義する式。評価の結果は、集約に保存された値です。 アグリゲータに関連するすべてのデータはオブジェクトストアに保存されるため、コンテンツの値は Serializable であることが非常に重要です。 そうでない場合、モジュールが適切に動作するという保証はありません

  • Object Store (オブジェクトストア):

    アグリゲータに関するすべての情報はオブジェクトストアに保存されます。オブジェクトストアの種別は、コンポーネントの予測される動作に関連する必要があります。

    一般的に次の特徴があります。

    • 永続オブジェクトストア (デフォルト): より信頼性が高く、再起動後のデータ復旧が可能。低速。

    • メモリ内オブジェクトストア: 高速。アプリケーションを再起動するとすべてのデータが失われる。

アグリゲータ種別

サイズベース、時間ベース、およびグループベースのアグリゲータがあります。

サイズベースのアグリゲータ

<aggregators:size-based-aggregator  name="sizeBasedAggregator"
                                    maxSize="10"
                                    timeout="60"
                                    timeoutUnit="MINUTES"
                                    objectStore="exampleObjectStore">
    <aggregators:content>
        #[payload]
    </aggregators:content>
    <aggregators:incremental-aggregation>
        ...
    </aggregators:incremental-aggregation>
    <aggregators:aggregation-complete>
        ...
    </aggregators:aggregation-complete>
</aggregators:size-based-aggregator>

この例では、定義済みのサイズに達するまで要素を集約し、ルートとリスナを実行します。

パラメータ

名前 説明 デフォルト値 必須

Name (名前)

String (文字列)

アグリゲータの名前。後でアグリゲータリスナで参照できます。

x

Content (コンテンツ)

Expression (式)

集約する対象を定義する式。評価の結果は、集約に保存された値です。

#[payload]

Max Size (最大サイズ)

Number (数値)

集約が完了とみなされる前に集約される要素の合計数。

x

Timeout (タイムアウト)

Number (数値)

集約が完了するのを待機する最大時間。要素の合計数が最大サイズに達する前にタイムアウトすると、集約は完了とみなされます。グループが頻繁にタイムアウトすることを避けるため、0 の値はサポートされていません。

-1 (制限なし)

Timeout unit (タイムアウト単位)

Time Unit (時間単位)

タイムアウトを測定する時間単位。

SECONDS (秒)

Object Store (オブジェクトストア)

Object Store (オブジェクトストア)

グローバルオブジェクトストアを参照する名前、または集約された要素が保存される非公開オブジェクトストアの定義。

デフォルトのオブジェクトストアパーティション。

Aggregation Complete Route (集約完了ルート)

Route (ルート)

集約の完了時に実行するコンポーネントチェーン。

x

Incremental Aggregation Route (増分集約ルート)

Route (ルート)

集約されたすべての新規要素で実行するコンポーネントチェーン。ペイロードは、(この集約内の) 最初に集約された要素から現在集約中の要素までのすべての要素のリストです。

発生

  • AGGREGATORS:AGGREGATOR_CONFIG

    maxSize または timeout の値が無効な場合 (maxSize < 0 など)。

  • AGGREGATORS:OBJECT_STORE_ACCESS

    集約された値の保存に使用される ObjectStore へのアクセスに問題がある場合。


サイズベースのアグリゲータ
  1. 入力データ

  2. 集約された要素 (未完了)

  3. 集約された要素 (完了)

  4. 入力データ (1 と同じ) + 新しい変数

  5. 集約された要素 (次のいずれか):

    1. maxSize に到達 (3 と同じ)

    2. timeout (タイムアウトグループが受け入れられた場合のみ)


時間ベースのアグリゲータ

<aggregators:time-based-aggregator  name="timeBasedAggregator"
                                    period="60"
                                    periodUnit="MINUTES"
                                    maxSize="10"
                                    objectStore="exampleObjectStore">
    <aggregators:content>
        #[payload]
    </aggregators:content>
    <aggregators:incremental-aggregation>
        ...
    </aggregators:incremental-aggregation>
</aggregators:time-based-aggregator>

期間が完了するまで要素を集約し、ルートとリスナを実行します。

パラメータ

名前 説明 デフォルト値 必須

Name (名前)

String (文字列)

アグリゲータの名前。後でアグリゲータリスナで参照できます。

x

Content (コンテンツ)

Expression (式)

集約する対象を定義する式。評価の結果は、集約に保存された値です。

#[payload]

Period (期間)

Number (数値)

集約が完了とみなされるまで待機する期間。

x

Period unit (期間単位)

Time Unit (時間単位)

期間を測定する時間単位。

SECONDS (秒)

Max Size (最大サイズ)

Number (数値)

集約が完了とみなされる前に集約される要素の合計数。

-1 (制限なし)

Object Store (オブジェクトストア)

Object Store (オブジェクトストア)

グローバルオブジェクトストアを参照する名前、または集約された要素が保存される非公開オブジェクトストアの定義。

デフォルトのオブジェクトストアパーティション。

Incremental Aggregation Route (増分集約ルート)

Route (ルート)

集約されたすべての新規要素で実行するコンポーネントチェーン。ペイロードは、(この集約内の) 最初に集約された要素から現在集約中の要素までのすべての要素のリストです。

発生

  • AGGREGATORS:AGGREGATOR_CONFIG

    period または maxSize の値が無効な場合 (Period = 0 など)。

  • AGGREGATORS:OBJECT_STORE_ACCESS

    集約された値の保存に使用される ObjectStore へのアクセスに問題がある場合。


時間ベースのアグリゲータ
  1. 入力データ

  2. 集約された要素 (未完了)

  3. 集約された要素 (次のいずれか):

    1. period が完了 (タイムアウトグループが受け入れられた場合のみ)

    2. maxSize に到達

  4. 入力データ (1 と同じ) + 新しい変数


グループベースのアグリゲータ

<aggregators:group-based-aggregator name="groupBasedAggregator"
                                    groupId="#[correlationId]"
                                    groupSize="#[itemSequenceInfo.sequenceSize]"
                                    evictionTime="180"
                                    evictionTimeUnit="SECONDS"
                                    timeout="60"
                                    timeoutUnit="MINUTES"
                                    objectStore="exampleObjectStore">
    <aggregators:content>
        #[payload]
    </aggregators:content>
    <aggregators:incremental-aggregation>
        ...
    </aggregators:incremental-aggregation>
    <aggregators:aggregation-complete>
        ...
    </aggregators:aggregation-complete>
</aggregators:group-based-aggregator>

グループ ID に従って異なるグループで要素を集約します。

新しい要素がアグリゲータに到達するたびに、ID が解決されます。その ID のグループがすでにアグリゲータに存在する場合、値はそのグループに追加されます。それ以外の場合、その ID の新しいグループが作成され、受信した要素がそのグループの集約の最初の要素になります。

グループベースのアグリゲータには、いくつかの重要な概念があります。

  • グループのタイムアウト: グループに必要なすべての要素が指定時間内に到着しなかったため、グループを解放する必要がある場合。グループがタイムアウトし、まだ除去されていない場合、そのグループに新しい値を追加しようとする試みは拒否されます。

  • グループの除去: グループが完了したかタイムアウトしたかに関わらず、グループがアグリゲータから削除された場合。そのグループの ID を持つ新しい要素をアグリゲータが受信すると、そのグループが再度作成されます。

最後に、グループベースのアグリゲータに到達する要素が分割されたシーケンスから渡された場合 (ForEach コンポーネントによってなど)、それぞれに異なる sequenceNumber が割り当てられます。その場合、それらは集約リリースの前に昇順に並び替えられます。

パラメータ

名前 説明 デフォルト値 必須

Name (名前)

String (文字列)

アグリゲータの名前。後でアグリゲータリスナで参照するために使用されます。

x

Content (コンテンツ)

Expression (式)

集約する対象を定義する式。評価の結果は、集約に保存された値です。

#[payload]

Group Id (グループ ID)

Expression (式)

集約する必要があるグループの ID を取得するため、受信した各新規メッセージで評価される式。

#[correlationId]

Group Size (グループサイズ)

Number (数値)

解決されたグループ ID のグループに割り当てる最大サイズ。グループ ID が同じすべてのメッセージは、同じグループサイズにする必要があります。そうしないと、最初に解決されたグループサイズのみが正しいとみなされ、そのサイズに一致しないすべてのメッセージに警告が記録されます。

#[itemSequenceInfo.sequenceSize]

Eviction Time (除去時間)

Number (数値)

完了またはタイムアウトしたグループ ID を記憶する時間 (0: 記憶しない、-1: 永久に記憶)。

180

Eviction Time Unit (除去時間単位)

Time Unit (時間単位)

除去時間の時間単位。

SECONDS (秒)

Timeout (タイムアウト)

Number (数値)

グループの集約が完了するのを待機する最大時間。そのグループ内の要素の合計数がグループのサイズと同じになる前にタイムアウトすると、集約は完了とみなされます。グループが頻繁にタイムアウトすることを避けるため、0 の値はサポートされていません。

-1 (制限なし)

Timeout unit (タイムアウト単位)

Time Unit (時間単位)

タイムアウトを測定する時間単位。

SECONDS (秒)

Object Store (オブジェクトストア)

Object Store (オブジェクトストア)

グローバルオブジェクトストアを参照する名前、または集約された要素が保存される非公開オブジェクトストアの定義。

デフォルトのオブジェクトストアパーティション

Aggregation Complete Route (集約完了ルート)

Route (ルート)

集約の完了時に実行するコンポーネントチェーン。

x

Incremental Aggregation Route (増分集約ルート)

Route (ルート)

集約されたすべての新規要素で実行するコンポーネントチェーン。ペイロードは、(この集約内の) 最初に集約された要素から現在集約中の要素までのすべての要素のリストです。

発生

  • AGGREGATORS:GROUP_COMPLETED

    新しい要素をすでに完了したグループに追加する必要がある場合 (およびそのグループがまだ除去されてない場合)。

  • AGGREGATORS:GROUP_TIMED_OUT

    新しい要素をタイムアウトしたグループに追加する必要がある場合 (およびそのグループがまだ除去されていない場合)。

  • AGGREGATORS:NO_GROUP_ID

    グループ ID に解決される式が null を返した場合。

  • AGGREGATORS:NO_GROUP_SIZE

    グループサイズに解決される式が null を返した場合。

  • AGGREGATORS:AGGREGATOR_CONFIG

    グループサイズまたはタイムアウトの値が無効な場合 (groupSize < 0 など)。

  • AGGREGATORS:OBJECT_STORE_ACCESS

    集約された値の保存に使用される ObjectStore へのアクセスに問題がある場合。


グループベースのアグリゲータ
  1. 入力データ

  2. 集約された要素 (未完了)

  3. 集約された要素 (完了)

  4. 入力データ (1 と同じ) + 新しい変数

  5. 集約された要素 (次のいずれか):

    1. groupSize に到達 (3 と同じ)

    2. timeout (タイムアウトグループが受け入れられた場合のみ)

Sources (ソース)

アグリゲータリスナ

<aggregators:aggregator-listener aggregatorName="exampleAggregator" includeTimedOutGroups="false">

リスナは、フロー内にあるアグリゲータのみを参照できます。アグリゲータがサブフローで宣言されている場合、リスナからは参照できません。

リスナで参照されるアグリゲータが集約を完了すると、すべての要素のリストを使用してリスナがトリガされます。集約リスナはあらゆるアグリゲータで使用できますが、特に時間駆動型 (非同期) 集約で重要です。そのような集約は非同期的にトリガされるため、アグリゲータルートを実行せず、アグリゲータリスナをソースとして使用するフロー内のコンポーネントのみに到達できます。

パラメータ

名前 説明 デフォルト値 必須

Aggregator Name (アグリゲータ名)

String (文字列)

リスンするアグリゲータの名前。そのアグリゲータがその要素を解放すると、リスナが実行されます。各リスナが参照できるアグリゲータは 1 つのみで、各アグリゲータは最大 1 つのリスナでのみ参照可能です。

x

Include Timed Out Groups (タイムアウトグループを含める)

Boolean (ブール)

タイムアウトによりグループが解放されたときに、リスナがトリガされるかどうかを示します。

false

集約属性

メッセージの集約が行われるたびに、その集約に関する情報を含む属性がいくつか追加されます。

名前 説明

Aggregation ID (集約 ID)

String (文字列)

要素が集約されたグループの ID。集約戦略でグループ別に集約しない場合、この項目は集約が解放されるまで保持される自動生成値になります (グループベースおよび時間ベースのアグリゲータと同様)。

First Item Arrival Time (最初の項目の到着時間)

Date (日付)

最初の値が集約された時間。

Last Item Arrival Time (最後の項目到着時間)

Date (日付)

最後の値が集約された時間。

Is Aggregation Complete (集約完了)

Boolean (ブール)

集約が完了した場合は True、そうでない場合は False。

非同期集約と同期集約の比較

集約完了のトリガには、同期と非同期の 2 つがあります。
設定にあるように、集約はリストに追加される新しい要素に基づいて (最大サイズが指定されている場合)、またはタイムアウトや期間の完了に基づいて完了とみなされます。要素のリストを使用して実行するコンポーネントのチェーンは集約の種別によって決まるため、これは重要です。

アグリゲータに関連付けられた各時間カウンタは、グループの最初のメッセージが到着した瞬間からカウントを開始します。集約が完了すると、カウンタはリセットされ、次の要素の到着を待ちます。
単一グループのアグリゲータ (時間ベースおよびサイズベースのアグリゲータ) の場合、時間カウンタは 1 つのみですが、グループベースのアグリゲータの場合、グループごとに 1 つのカウンタがあります。

新しい要素が到着したことで完了した集約の場合 (同期)、次のうち少なくとも 1 つの動作が発生します。

  • アグリゲータが aggregation-complete ルートで設定されている場合、そのルート内のコンポーネントがペイロード (集約された要素のリスト) を使用して実行される。

  • アグリゲータにリスナがフックされている場合、そのリスナが属するフローがペイロード (集約された要素のリスト) を使用して実行される。

期間またはタイムアウトに達したことで集約が完了した場合 (非同期)、発生する可能性がある動作は次の 1 つのみです。

  • アグリゲータにリスナがフックされており、リスナがタイムアウトした集約を受け入れる場合、そのリスナが属するフローがペイロード (集約された要素のリスト) を使用して実行される。

これは非常に重要であり、アグリゲータを含むアプリケーションを作成する場合は常に必要です。

次の原則が適用されます。

  • 集約が時間に依存する場合、その処理用のすべてのロジックは、集約リスナをソースとして使用する異なるフロー内にある必要がある。

  • 到達するサイズに依存する場合、ロジックは aggregation-complete ルート内で宣言可能。

  • どちらにもなり得る場合、次の適切なアプローチを使用する。

    • ソースのないサブフローにメインロジックを追加する。isAggregationComplete 属性で、集約が解放された方法を確認できます。

    • aggregation-complete ルート内のメインロジックフローにフロー参照を追加する。このフロー参照でサブフローが実行された場合、isAggregationComplete は true になります。

    • アグリゲータをリスンし、タイムアウトグループを受け入れる集約リスナを使用する別のフローを追加する。リスナの後には、メインロジックサブフローをコールするフロー参照コンポーネントが続く必要があります。この場合、isAggregationComplete は false になります。

クラスタ内のアグリゲータ

このモジュールは、標準のクラスタ内で動作するように開発されています。ただし、予期しない動作を防ぐため、次の設定の詳細を考慮する必要があります。

非同期集約が定義されている場合、最初の要素が到着したときに、その要素はクラスタのプライマリノードでスケジュールされます。新しい値はクラスタのどのノードにも到着する可能性があるため、通知してプライマリノードでその集約をスケジュールさせる方法が必要です。 そのために、プライマリノードの別のタスクは、新しい集約をスケジュールする必要があるかどうかを定期的に判断します。 これは、新しい集約スケジュールをチェックする間隔が集約の実際のタイムアウトよりもはるかに長い場合に問題になる可能性があります。この理由は、集約がスケジュールされる前に終了したり、時間の計算でエラーが発生する可能性があるためです。

この問題を回避するため、スケジュールされる新しい集約をプライマリノードがチェックする頻度を設定できます。 次のいずれかを使用してこの値を定義できます。

  • グローバル設定プロパティ (ミリ秒) aggregatorsSchedulingPeriod

  • システムプロパティ -M-Dmule.aggregatorsSchedulingPeriod

オブジェクトストア設定

どのアグリゲータでも、グローバルオブジェクトストアを参照するか非公開オブジェクトストアを作成することで、オブジェクトストアを設定できます。

グローバル
<aggregators:size-based-aggregator name="globalOSAggregator"
                                   maxSize="10"
                                   objectStore="aGlobalObjectStore">
非公開
<aggregators:size-based-aggregator  name="privateOSAggregator" maxSize="10">
    ...
    <aggregators:object-store>
        <os:private-object-store alias="privateObjectStore" persistent="false"/>
    </aggregators:object-store>
</aggregators:group-based-aggregator>

Was this article helpful?

💙 Thanks for your feedback!

Edit on GitHub