グループベースのアグリゲーターの例 - Mule 4

Aggregators Module の ​[Group based aggregator (グループベースのアグリゲーター)]​ スコープを使用すると、要素をグループ ID でグループに集約できます。次の例は、要求で送信されたグループ ID に基づいて、グループごとに最大 ​3​ 件の HTTP 要求コールを集約するようにこのスコープを設定する方法を示しています。アプリケーションは DataWeave を使用して、ランダム ID 番号で各コールのペイロードを設定します。各グループで ​3​ 個の要素の集約が完了すると、アプリケーションはグループのすべての集約されたランダム ID 番号要素を記録します。

Anypoint Studio キャンバスでのグループベースのアグリゲーターのフローの例
Figure 1. グループベースのアグリゲーターのフローの例

独自の環境でこの例をテストするには、Mule アプリケーションを作成し、curl コマンドを使用してアプリケーションを実行する必要があります。

Mule アプリケーションの作成

Anypoint Studio で Mule フローを作成する手順は、次のとおりです。

  1. [Mule Palette (Mule パレット)]​ で、​[HTTP Listener (HTTP リスナー)]​ を選択してキャンバスにドラッグします。
    このソースは受信 HTTP メッセージ属性をリスンすることでフローを開始します。

  2. [Connector configuration (コネクタ設定)]​ で ​[HTTP_Listener_config]​ を選択します。

  3. [Path (パス)]​ を ​/test​ に設定します。

  4. [Transform message]​ コンポーネントを ​[Listener]​ の右にドラッグします。

  5. [Transform message]​ コンポーネントの DataSense プレビューウィンドウで、次の DataWeave 式を追加します。これにより、各 HTTP 要求のペイロードに ​Source Name​ が ​"group-based"​ として設定され、​Source ID​ が ​random() as String​ として設定されます。また、​message.attributes['queryParams'].groupId​ に基づいて要求の ​GroupId​ 値も取得します。

    %dw 2.0
    output application/json
    ---
    {
        "Source Name": "group-based",
        "GroupId": message.attributes['queryParams'].groupId,
        "Source ID": (random() as String)
    }
  6. [Group based aggregator (グループベースのアグリゲーター)]​ スコープを ​[Transform message]​ の右にドラッグします。

  7. [名前]​ を ​groupBasedAggregator​ に設定します。

  8. [Content (コンテンツ)]​ を ​payload​ に設定します。これは、何を集約するか (この場合は変換された HTTP 要求) を定義する式です。

  9. [Group id (グループ ID)]​ を ​payload.GroupId​ に設定します。これは、受信した各新規メッセージで評価される式であり、要素を集約するグループ ID を取得します。

  10. [Group size (グループサイズ)]​ を ​3​ に設定します。これは、解決したグループ ID を使用してグループに集約する要素の最大サイズです。

  11. [Logger]​ コンポーネントを ​[Incremental aggregation (増分集約)]​ ルートにドラッグします。

  12. [Logger]​ 設定画面で、​[Message (メッセージ)]​ を ​Doing incremental aggregation.​ に設定します。

  13. [Logger]​ コンポーネントを ​[Aggregation complete (集約完了)]​ ルートにドラッグします。

  14. [Message (メッセージ)]​ を ​Aggregation complete​ に設定します。

  15. 別の ​[Logger]​ コンポーネントを ​[Aggregation complete (集約完了)]​ 内の最初の ​[Logger]​ コンポーネントの右にドラッグします。

  16. [Message (メッセージ)]​ を次の式に設定します。この式では、すべての集約された要素が返されます。

    output application/json
    ---
    payload
  17. プロジェクトを保存します。

  18. Package Explorer​ で、​[Run (実行)] > [Run As (別のユーザーとして実行)] > [Mule Application (Mule アプリケーション)]​ をクリックします。

  19. 次の curl コマンドを 2 回送信して、アプリケーションをテストします: + ​curl -X POST http://localhost:8081/test?groupId=1​。
    これにより、2 つの HTTP 要求が生成され、グループ ID ​1​ に集約されます。

  20. コマンド ​curl -X POST http://localhost:8081/test?groupId=2​ を送信して、1 つの要求をグループ ID ​2​ に集約します。

  21. 別のコマンド ​curl -X POST http://localhost:8081/test?groupId=1​ を送信して、要求をグループ ID ​1​ に集約します。

  22. [Console (コンソール)]​ ビューに移動して、ロガーメッセージを参照します。

INFO  2021-06-14 16:34:33,921 [[MuleRuntime].uber.12: [aggregator-group-demo].aggregator-group-demoFlow.CPU_INTENSIVE @2c3478b5] [processor: aggregator-group-demoFlow/processors/1/route/0/processors/0; event: 8beae010-cd47-11eb-8b82-f01898ad2638] org.mule.runtime.core.internal.processor.LoggerMessageProcessor: Doing incremental aggregation
INFO  2021-06-14 16:34:43,618 [[MuleRuntime].uber.12: [aggregator-group-demo].aggregator-group-demoFlow.CPU_INTENSIVE @2c3478b5] [processor: aggregator-group-demoFlow/processors/1/route/0/processors/0; event: 91b93be0-cd47-11eb-8b82-f01898ad2638] org.mule.runtime.core.internal.processor.LoggerMessageProcessor: Doing incremental aggregation
INFO  2021-06-14 16:34:47,203 [[MuleRuntime].uber.12: [aggregator-group-demo].aggregator-group-demoFlow.CPU_INTENSIVE @2c3478b5] [processor: aggregator-group-demoFlow/processors/1/route/0/processors/0; event: 93d8e790-cd47-11eb-8b82-f01898ad2638] org.mule.runtime.core.internal.processor.LoggerMessageProcessor: Doing incremental aggregation
INFO  2021-06-14 16:34:51,380 [[MuleRuntime].uber.12: [aggregator-group-demo].aggregator-group-demoFlow.CPU_INTENSIVE @2c3478b5] [processor: aggregator-group-demoFlow/processors/1/route/1/processors/0; event: 96566ab0-cd47-11eb-8b82-f01898ad2638] org.mule.runtime.core.internal.processor.LoggerMessageProcessor: Aggregation complete
INFO  2021-06-14 16:34:51,409 [[MuleRuntime].uber.12: [aggregator-group-demo].aggregator-group-demoFlow.CPU_INTENSIVE @2c3478b5] [processor: aggregator-group-demoFlow/processors/1/route/1/processors/1; event: 96566ab0-cd47-11eb-8b82-f01898ad2638] org.mule.runtime.core.internal.processor.LoggerMessageProcessor: [
  {
    "Source Name": "group-based",
    "GroupId": "1",
    "Source ID": "0.5127046416902801"
  },
  {
    "Source Name": "group-based",
    "GroupId": "1",
    "Source ID": "0.20239307122656847"
  },
  {
    "Source Name": "group-based",
    "GroupId": "1",
    "Source ID": "0.7505233647798346"
  }
]

このロガーメッセージの例は、グループ ID ​1​ の最初の 2 つの HTTP 要求とグループ ID ​2​ の 3 番目の要求で、​[Incremental aggregator (増分アグリゲーター)]​ ルートの ​[Logger]​ コンポーネントによってメッセージ ​Doing incremental aggregation.​ が記録されることを示しています。
4 番目の HTTP 要求 (グループ ID ​1​ の 3 番目の要求) を送信すると、グループサイズがグループあたり ​3​ 件のコールになっているため、集約グループ ​1​ は完了します。​[Aggregation complete (集約完了)]​ ルートの 1 番目の ​[Logger]​ コンポーネントによってメッセージ ​Aggregation complete​ が記録され、2 番目の ​[Logger]​ コンポーネントによってグループの集約されたペイロード要素 (ランダム ID 番号) が返されます。

コールをグループ ​2​ に集約する別の 2 つのコマンド ​curl -X POST http://localhost:8081/test?groupId=2​ を送信すると、​[Console (コンソール)]​ ビューに次のロガーメッセージが返されます。

INFO  2021-06-14 16:53:56,262 [[MuleRuntime].uber.12: [aggregator-group-demo].aggregator-group-demoFlow.CPU_INTENSIVE @2c3478b5] [processor: aggregator-group-demoFlow/processors/1/route/0/processors/0; event: 40c07200-cd4a-11eb-8b82-f01898ad2638] org.mule.runtime.core.internal.processor.LoggerMessageProcessor: Doing incremental aggregation
INFO  2021-06-14 16:53:58,686 [[MuleRuntime].uber.01: [aggregator-group-demo].aggregator-group-demoFlow.CPU_INTENSIVE @2c3478b5] [processor: aggregator-group-demoFlow/processors/1/route/1/processors/0; event: 42329fa0-cd4a-11eb-8b82-f01898ad2638] org.mule.runtime.core.internal.processor.LoggerMessageProcessor: Aggregation complete
INFO  2021-06-14 16:53:58,689 [[MuleRuntime].uber.01: [aggregator-group-demo].aggregator-group-demoFlow.CPU_INTENSIVE @2c3478b5] [processor: aggregator-group-demoFlow/processors/1/route/1/processors/1; event: 42329fa0-cd4a-11eb-8b82-f01898ad2638] org.mule.runtime.core.internal.processor.LoggerMessageProcessor: [
  {
    "Source Name": "group-based",
    "GroupId": "2",
    "Source ID": "0.8390326267445475"
  },
  {
    "Source Name": "group-based",
    "GroupId": "2",
    "Source ID": "0.8931962767228987"
  },
  {
    "Source Name": "group-based",
    "GroupId": "2",
    "Source ID": "0.6514448303865373"
  }
]

グループベースのアグリゲーターの例の XML

この例のフローをすばやく Mule アプリケーションに読み込むには、次のコードを Studio XML エディターに貼り付けます。

<?xml version="1.0" encoding="UTF-8"?>

<mule xmlns:aggregators="http://www.mulesoft.org/schema/mule/aggregators" xmlns:ee="http://www.mulesoft.org/schema/mule/ee/core"
	xmlns:http="http://www.mulesoft.org/schema/mule/http"
	xmlns="http://www.mulesoft.org/schema/mule/core" xmlns:doc="http://www.mulesoft.org/schema/mule/documentation" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd
http://www.mulesoft.org/schema/mule/http http://www.mulesoft.org/schema/mule/http/current/mule-http.xsd
http://www.mulesoft.org/schema/mule/ee/core http://www.mulesoft.org/schema/mule/ee/core/current/mule-ee.xsd
http://www.mulesoft.org/schema/mule/aggregators http://www.mulesoft.org/schema/mule/aggregators/current/mule-aggregators.xsd">
	<http:listener-config name="HTTP_Listener_config" doc:name="HTTP Listener config" doc:id="c4a97ca0-c01f-4ce9-9922-380eb0defda2" >
		<http:listener-connection host="0.0.0.0" port="8081" />
	</http:listener-config>
	<flow >
		<http:listener config-ref="HTTP_Listener_config" path="/test" allowedMethods="POST"/>
		<ee:transform  >
			<ee:message >
				<ee:set-payload ><![CDATA[%dw 2.0
output application/json
---
{
    "Source Name": "group-based",
    "GroupId": message.attributes['queryParams'].groupId,
    "Source ID": (random() as String)
}]]></ee:set-payload>
			</ee:message>
		</ee:transform>
		<aggregators:group-based-aggregator name="groupBasedAggregator" evictionTime="0" groupSize="3" groupId="#[payload.GroupId]">
			<aggregators:incremental-aggregation >
				<logger level="INFO" message="Doing incremental aggregation"/>
			</aggregators:incremental-aggregation>
			<aggregators:aggregation-complete >
				<logger level="INFO" message="Aggregation complete"/>
				<logger level="INFO" message="#[output application/json
---
payload]"/>
			</aggregators:aggregation-complete>
		</aggregators:group-based-aggregator>
	</flow>
</mule>