時間ベースのアグリゲーターの例 - Mule 4

Aggregators Module の ​[Time based aggregator (時間ベースのアグリゲーター)]​ スコープを使用すると、定義済みの時間制限内で要素を集約することができます。次の例は、​5​ 秒間の HTTP 要求コールを集約するようにこのスコープを設定する方法を示しています。アプリケーションは DataWeave を使用して、ランダム ID 番号で各コールのペイロードを設定します。集約が ​5​ 秒に達すると、​[Aggregator listener (アグリゲーターリスナー)]​ ソースは要素のリストを取得し、アプリケーションはすべての集約されたランダム ID 番号を記録します。

Anypoint Studio キャンバスでの時間ベースのアグリゲーターのフローの例
Figure 1. 時間ベースのアグリゲーターのフローの例

独自の環境でこの例をテストするには、Mule アプリケーションを作成し、curl コマンドを使用してアプリケーションを実行する必要があります。

Mule アプリケーションの作成

Anypoint Studio で Mule フローを作成する手順は、次のとおりです。

  1. [Mule Palette (Mule パレット)]​ で、​[HTTP Listener (HTTP リスナー)]​ を選択してキャンバスにドラッグします。
    このソースは受信 HTTP メッセージ属性をリスンすることでフローを開始します。

  2. [Connector configuration (コネクタ設定)]​ で ​[HTTP_Listener_config]​ を選択します。

  3. [Path (パス)]​ を ​/test​ に設定します。

  4. [Transform message]​ コンポーネントを ​[Listener]​ の右にドラッグします。

  5. [Transform message]​ コンポーネントの DataSense プレビューウィンドウで、次の DataWeave 式を追加します。これにより、各 HTTP 要求のペイロードに ​Source Name​ が ​"time-based"​ として設定され、​Source ID​ が ​random() as String​ として設定されます。

    %dw 2.0
    output application/json
    ---
    {
        "Source Name": "time-based",
        "Source ID": (random() as String)
    }
  6. [Time based aggregator (時間ベースのアグリゲーター)]​ スコープを ​[Transform message]​ の右にドラッグします。

  7. [名前]​ を ​timeBasedAggregator​ に設定します。

  8. [Period (期間)]​ を ​5​ に設定します。

  9. [Period unit (期間単位)]​ を ​SECONDS(Default)​ に設定します。

  10. [Content (コンテンツ)]​ を ​payload​ に設定します。これは、何を集約するか (この場合は変換された HTTP 要求) を定義する式です。

  11. [Logger]​ コンポーネントを ​[Incremental aggregation (増分集約)]​ ルートにドラッグします。

  12. [Logger]​ 設定画面で、​[Message (メッセージ)]​ を ​Doing incremental step.​ に設定します。

  13. [Aggregator listener (アグリゲーターリスナー)]​ ソースを最初のフローの下にドラッグします。
    このソースは、前に定義された時間ベースのアグリゲーターをリスンします。このアグリゲーターがその要素を解放すると、リスナーが実行されます。

  14. [Aggregator name (アグリゲーター名)]​ を ​timeBasedAggregator​ に設定します。

  15. [Logger]​ コンポーネントを ​[Aggregator listener (アグリゲーターリスナー)]​ の右にドラッグします

  16. [Message (メッセージ)]​ を ​Aggregation complete​ に設定します。

  17. 別の ​[Logger]​ コンポーネントを、前に追加した ​[Logger]​ コンポーネントの右にドラッグします。

  18. [Message (メッセージ)]​ を次の式に設定します。この式では、すべての集約された要素が返されます。

    output application/json
    ---
    payload
  19. プロジェクトを保存します。

  20. Package Explorer​ で、​[Run (実行)] > [Run As (別のユーザーとして実行)] > [Mule Application (Mule アプリケーション)]​ をクリックします。

  21. 5​ 秒以内に次の curl コマンドを 2 回送信して、アプリケーションをテストします。
    curl -X POST http://localhost:8086/test
    これにより、​[Time based aggregator (時間ベースのアグリゲーター)]​ で処理する 2 個の HTTP 要求が生成されます。

  22. [Console (コンソール)]​ ビューに移動して、ロガーメッセージを参照します。

INFO  2021-06-09 12:02:04,203 [[MuleRuntime].uber.04: [aggregator-time-demo].aggregator-time-demoFlow.CPU_INTENSIVE @59b15997] [processor: aggregator-time-demoFlow/processors/1/route/0/processors/0; event: a6b00280-c933-11eb-a825-f01898ad2638] org.mule.runtime.core.internal.processor.LoggerMessageProcessor: Doing incremental step.
INFO  2021-06-09 12:02:06,071 [[MuleRuntime].uber.04: [aggregator-time-demo].aggregator-time-demoFlow.CPU_INTENSIVE @59b15997] [processor: aggregator-time-demoFlow/processors/1/route/0/processors/0; event: a7cd0b40-c933-11eb-a825-f01898ad2638] org.mule.runtime.core.internal.processor.LoggerMessageProcessor: Doing incremental step.
INFO  2021-06-09 12:02:09,206 [[MuleRuntime].uber.04: [aggregator-time-demo].aggregator-time-demoOnCompleteFlow.CPU_LITE @2ab3c271] [processor: aggregator-time-demoOnCompleteFlow/processors/0; event: 827c9591-c933-11eb-a825-f01898ad2638] org.mule.runtime.core.internal.processor.LoggerMessageProcessor: Aggregation complete
INFO  2021-06-09 12:02:09,208 [[MuleRuntime].uber.04: [aggregator-time-demo].aggregator-time-demoOnCompleteFlow.CPU_LITE @2ab3c271] [processor: aggregator-time-demoOnCompleteFlow/processors/1; event: 827c9591-c933-11eb-a825-f01898ad2638] org.mule.runtime.core.internal.processor.LoggerMessageProcessor: [
  {
    "Source Name": "time-based",
    "Source ID": "0.7095971146328517"
  },
  {
    "Source Name": "time-based",
    "Source ID": "0.6900116288202388"
  }
]

このロガーメッセージの例は、各 HTTP 要求で、​[Incremental aggregator (増分アグリゲーター)]​ ルートの ​[Logger]​ コンポーネントによってメッセージ ​Doing incremental step.​ が記録されることを示しています。​[Aggregator listener (アグリゲーターリスナー)]​ ソースで参照される ​timeBasedAggregator​ 値によって増分集約が ​5​ 秒以内に完了すると、すべての要素のリストを使用してリスナーがトリガーされます。後続の ​[Logger]​ コンポーネントによってメッセージ ​Aggregation complete​ が記録され、2 番目の ​[Logger]​ コンポーネントによって、集約されたペイロード要素 (ランダム ID 番号) が返されます。

{
  "Source Name": "time-based",
  "Source ID": "0.7095971146328517"
},
{
  "Source Name": "time-based",
  "Source ID": "0.6900116288202388"
}

時間ベースのアグリゲーターの例の XML

この例のフローをすばやく Mule アプリケーションに読み込むには、次のコードを Studio XML エディターに貼り付けます。

<?xml version="1.0" encoding="UTF-8"?>

<mule xmlns:aggregators="http://www.mulesoft.org/schema/mule/aggregators" xmlns:ee="http://www.mulesoft.org/schema/mule/ee/core"
	xmlns:http="http://www.mulesoft.org/schema/mule/http"
	xmlns="http://www.mulesoft.org/schema/mule/core" xmlns:doc="http://www.mulesoft.org/schema/mule/documentation" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd
http://www.mulesoft.org/schema/mule/http http://www.mulesoft.org/schema/mule/http/current/mule-http.xsd
http://www.mulesoft.org/schema/mule/ee/core http://www.mulesoft.org/schema/mule/ee/core/current/mule-ee.xsd
http://www.mulesoft.org/schema/mule/aggregators http://www.mulesoft.org/schema/mule/aggregators/current/mule-aggregators.xsd">
	<http:listener-config name="HTTP_Listener_config" >
		<http:listener-connection host="0.0.0.0" port="8086" />
	</http:listener-config>
	<flow name="aggregator-time-demoFlow" >
		<http:listener config-ref="HTTP_Listener_config" path="/test" allowedMethods="POST"/>
		<ee:transform >
			<ee:message >
				<ee:set-payload ><![CDATA[%dw 2.0
output application/json
---
{
    "Source Name": "time-based",
    "Source ID": (random() as String)
}]]></ee:set-payload>
			</ee:message>
		</ee:transform>
		<aggregators:time-based-aggregator period="5" name="timeBasedAggregator">
			<aggregators:incremental-aggregation >
				<logger level="INFO" message="Doing incremental step."/>
			</aggregators:incremental-aggregation>
		</aggregators:time-based-aggregator>
	</flow>
	<flow name="aggregator-time-demoOnCompleteFlow" >
		<aggregators:aggregator-listener  aggregatorName="timeBasedAggregator"/>
		<logger level="INFO" message="Aggregation complete" />
		<logger level="INFO" message="#[output application/json
---
payload]" />
	</flow>
</mule>