サイズベースのアグリゲーターのタイムアウト項目と最大サイズ項目の設定 - Mule4

Aggregators Module の ​[Size based aggregator (サイズベースのアグリゲーター)]​ スコープのタイムアウト動作を理解するには、​[Timeout (タイムアウト)]​ 項目と ​[Max size (最大サイズ)]​ 項目のしくみについて説明している次の図を参照してください。その後、Mule アプリケーションの例でこれらの項目を Anypoint Studio と XML エディターで設定する方法を確認してください。

aggregator size timeout diagram
Figure 1. サイズベースのアグリゲーターのタイムアウトと最大サイズの動作の図
  • [Max size (最大サイズ)]​ 項目は、集約が完了とみなされる前に集約される要素の合計数を示します。

  • [Timeout (タイムアウト)]​ 項目は集約が完了するのを待機する最大時間を示します。

  • 最初のプロセスは、タイムアウトの発生前に集約された要素の合計数が必要な数に達したために集約が完了したことを示しています。

  • 2 番目のプロセスは、タイムアウトの発生前に集約された要素の合計数が必要な数に達しなかったために集約が完了していないことを示しています。これにより、集約はリセットされます。

  • 3 番目のプロセスは、​[Aggregator listener (アグリゲーターリスナー)]​ ソースを使用して、タイムアウトの発生前に集約された要素を取得できることを示しています。このソースは、タイムアウトが発生するとトリガーされます。

[Aggregator listener (アグリゲーターリスナー)]​ ソースを使用して未完了の集約要素を取得する場合、​[Include timed out group (タイムアウトグループを含める)]​ 項目を選択して、タイムアウトの発生時にソースがトリガーされることを示す必要があります。

Mule アプリケーションの設定

次の例は、​10​ 秒間に最大 ​3​ 件の HTTP 要求コールを集約するように ​[Size based Aggregator (サイズベースのアグリゲーター)]​ スコープを設定する方法を示しています。アプリケーションは DataWeave を使用して、ランダム ID 番号で各コールのペイロードを設定します。タイムアウトの発生前に集約の完了に必要な ​3​ 件のコールに達すると、アプリケーションはすべての集約された要素を記録します。それ以外の場合、アプリケーションは、タイムアウトの発生時に ​[Aggregator listener (アグリゲーターリスナー)]​ ソースによって取得された集約済み要素のみを記録します。

Anypoint Studio キャンバスでのサイズベースのアグリゲーターのフローの例
Figure 2. サイズベースのアグリゲーターのフローの例

独自の環境でこの例をテストするには、Mule アプリケーションを作成し、curl コマンドを使用してアプリケーションを実行する必要があります。

Anypoint Studio で Mule フローを作成する手順は、次のとおりです。

  1. [Mule Palette (Mule パレット)]​ で、​[HTTP Listener (HTTP リスナー)]​ を選択してキャンバスにドラッグします。
    このソースは受信 HTTP メッセージ属性をリスンすることでフローを開始します。

  2. [Connector configuration (コネクタ設定)]​ で ​[HTTP_Listener_config]​ を選択します。

  3. [Path (パス)]​ を ​/test​ に設定します。

  4. [Transform message]​ コンポーネントを ​[Listener]​ の右にドラッグします。

  5. [Transform message]​ コンポーネントの DataSense プレビューウィンドウで、次の DataWeave 式を追加します。これにより、各 HTTP 要求のペイロードに ​Source Name​ が ​"size-based"​ として設定され、​Source ID​ が ​random() as String​ として設定されます。

    %dw 2.0
    output application/json
    ---
    {
        "Source Name": "size-based",
        "Source ID": (random() as String)
    }
  6. [Size based aggregator (サイズベースのアグリゲーター)]​ スコープを ​[Transform message]​ の右にドラッグします。

  7. [名前]​ を ​sizeBasedAggregator​ に設定します。

  8. [Max Size (最大サイズ)]​ を ​3​ に設定します。

  9. [Timeout (タイムアウト)]​ を ​10​ に設定します。

  10. [Content (コンテンツ)]​ を ​payload​ に設定します。これは、何を集約するか (この場合は変換された HTTP 要求) を定義する式です。

  11. [Logger]​ コンポーネントを ​[Incremental aggregation (増分集約)]​ ルートにドラッグします。

  12. [Logger]​ 設定画面で、​[Message (メッセージ)]​ を ​Doing incremental aggregation step.​ に設定します。

  13. [Logger]​ コンポーネントを ​[Aggregation complete (集約完了)]​ ルートにドラッグします。

  14. [Message (メッセージ)]​ を ​Aggregation complete​ に設定します。

  15. 別の ​[Logger]​ コンポーネントを ​[Aggregation complete (集約完了)]​ 内の最初の ​[Logger]​ コンポーネントの右にドラッグします。

  16. [Message (メッセージ)]​ を次の式に設定します。この式では、すべての集約された要素が返されます。

    output application/json
    ---
    payload
  17. [Aggregator listener (アグリゲーターリスナー)]​ ソースを最初のフローの下にドラッグします。

  18. [Aggregator name (アグリゲーター名)]​ を ​sizeBasedAggregator​ に設定します。これは、前のアグリゲーターをリスンします。

  19. [SelectInclude Timed Out Groups (タイムアウトグループを含める)]​ を選択します。これは、タイムアウトの発生時にリスナーがトリガーされるかどうかを示します。

  20. [Logger]​ コンポーネントを ​[Aggregation listener (集約リスナー)]​ ソースの右にドラッグします。

  21. [Message (メッセージ)]​ を次の式に設定します。この式では、タイムアウトの発生前に集約された要素のみが返されます。

    output application/json
    ---
    payload
  22. プロジェクトを保存します。

  23. Package Explorer​ で、​[Run (実行)] > [Run As (別のユーザーとして実行)] > [Mule Application (Mule アプリケーション)]​ をクリックします。

  24. 10 秒以内に次の curl コマンドを送信して、アプリケーションをテストします。
    curl -X POST http://localhost:8086/test

  25. [Console (コンソール)]​ ビューに移動して、ロガーメッセージを参照します。

INFO  2021-06-22 12:59:16,319 [[MuleRuntime].uber.12: [aggregator-size-demo].aggregator-size-demoFlow.CPU_INTENSIVE @7992f689] [processor: aggregator-size-demoFlow/processors/1/route/0/processors/0; event: cbc219c0-d372-11eb-b975-f01898ad2638] org.mule.runtime.core.internal.processor.LoggerMessageProcessor: Doing incremental aggregation step.
INFO  2021-06-22 12:59:26,328 [[MuleRuntime].uber.12: [aggregator-size-demo].aggregator-size-demoFlow1.CPU_LITE @684a1bef] [processor: aggregator-size-demoFlow1/processors/0; event: d6b48f00-d36f-11eb-b975-f01898ad2638] org.mule.runtime.core.internal.processor.LoggerMessageProcessor: [
  {
    "Source Name": "size-based",
    "Source ID": "0.6568747259174192"
  }
]

このロガーメッセージの例は、最初の HTTP 要求で、​[Incremental aggregator (増分アグリゲーター)]​ ルートの ​[Logger]​ コンポーネントによってメッセージ ​Doing incremental aggregation step.​ が記録されることを示しています。集約が ​10​ 秒に達すると、タイムアウトが発生します。​[Aggregation listener (集約リスナー)]​ ソースによって唯一の HTTP 要求コールが取得され、後続の ​[Logger]​ コンポーネントによって、集約されたペイロード要素 (ランダム ID 番号) が返されます。

サイズベースのアグリゲーターのタイムアウトと最大サイズの例の XML

この例のフローをすばやく Mule アプリケーションに読み込むには、次のコードを Studio XML エディターに貼り付けます。

<?xml version="1.0" encoding="UTF-8"?>

<mule xmlns:db="http://www.mulesoft.org/schema/mule/db" xmlns:aggregators="http://www.mulesoft.org/schema/mule/aggregators"
	xmlns:ee="http://www.mulesoft.org/schema/mule/ee/core"
	xmlns:http="http://www.mulesoft.org/schema/mule/http" xmlns="http://www.mulesoft.org/schema/mule/core" xmlns:doc="http://www.mulesoft.org/schema/mule/documentation" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd
http://www.mulesoft.org/schema/mule/http http://www.mulesoft.org/schema/mule/http/current/mule-http.xsd
http://www.mulesoft.org/schema/mule/ee/core http://www.mulesoft.org/schema/mule/ee/core/current/mule-ee.xsd
http://www.mulesoft.org/schema/mule/aggregators http://www.mulesoft.org/schema/mule/aggregators/current/mule-aggregators.xsd
http://www.mulesoft.org/schema/mule/db http://www.mulesoft.org/schema/mule/db/current/mule-db.xsd">
	<http:listener-config name="HTTP_Listener_config">
		<http:listener-connection host="0.0.0.0" port="8086" />
	</http:listener-config>
	<flow name="aggregator-size-demoFlow"  >
		<http:listener doc:name="Listener"  config-ref="HTTP_Listener_config" path="/test" allowedMethods="POST"/>
		<ee:transform doc:name="Transform Message" >
			<ee:message >
				<ee:set-payload ><![CDATA[%dw 2.0
output application/json
---
{
    "Source Name": "size-based",
    "Source ID": (random() as String)
}]]></ee:set-payload>
			</ee:message>
		</ee:transform>
		<aggregators:size-based-aggregator doc:name="Size based aggregator"  name="sizeBasedAggregator" maxSize="3" timeout="10">
			<aggregators:incremental-aggregation>
				<logger level="INFO"  message="Doing incremental aggregation step." />
			</aggregators:incremental-aggregation>
			<aggregators:aggregation-complete>
				<logger level="INFO"  message="Aggregation complete" />
				<logger level="INFO" message="#[output application/json
---
payload]" />
			</aggregators:aggregation-complete>
		</aggregators:size-based-aggregator>
	</flow>
	<flow name="aggregator-size-demoFlow1"  >
		<aggregators:aggregator-listener  aggregatorName="sizeBasedAggregator" includeTimedOutGroups="true"/>
		<logger level="INFO" message="#[output application/json&#10;---&#10;payload]"/>
	</flow>
</mule>