Flex Gateway新着情報
Governance新着情報
Monitoring API ManagerAggregators Module の [Size based aggregator (サイズベースのアグリゲーター)] スコープのタイムアウト動作を理解するには、[Timeout (タイムアウト)] 項目と [Max size (最大サイズ)] 項目のしくみについて説明している次の図を参照してください。その後、Mule アプリケーションの例でこれらの項目を Anypoint Studio と XML エディターで設定する方法を確認してください。
[Max size (最大サイズ)] 項目は、集約が完了とみなされる前に集約される要素の合計数を示します。
[Timeout (タイムアウト)] 項目は集約が完了するのを待機する最大時間を示します。
最初のプロセスは、タイムアウトの発生前に集約された要素の合計数が必要な数に達したために集約が完了したことを示しています。
2 番目のプロセスは、タイムアウトの発生前に集約された要素の合計数が必要な数に達しなかったために集約が完了していないことを示しています。これにより、集約はリセットされます。
3 番目のプロセスは、[Aggregator listener (アグリゲーターリスナー)] ソースを使用して、タイムアウトの発生前に集約された要素を取得できることを示しています。このソースは、タイムアウトが発生するとトリガーされます。
[Aggregator listener (アグリゲーターリスナー)] ソースを使用して未完了の集約要素を取得する場合、[Include timed out group (タイムアウトグループを含める)] 項目を選択して、タイムアウトの発生時にソースがトリガーされることを示す必要があります。
次の例は、10
秒間に最大 3
件の HTTP 要求コールを集約するように [Size based Aggregator (サイズベースのアグリゲーター)] スコープを設定する方法を示しています。アプリケーションは DataWeave を使用して、ランダム ID 番号で各コールのペイロードを設定します。タイムアウトの発生前に集約の完了に必要な 3
件のコールに達すると、アプリケーションはすべての集約された要素を記録します。それ以外の場合、アプリケーションは、タイムアウトの発生時に [Aggregator listener (アグリゲーターリスナー)] ソースによって取得された集約済み要素のみを記録します。
独自の環境でこの例をテストするには、Mule アプリケーションを作成し、curl コマンドを使用してアプリケーションを実行する必要があります。
Anypoint Studio で Mule フローを作成する手順は、次のとおりです。
[Mule Palette (Mule パレット)] で、[HTTP Listener (HTTP リスナー)] を選択してキャンバスにドラッグします。
このソースは受信 HTTP メッセージ属性をリスンすることでフローを開始します。
[Connector configuration (コネクタ設定)] で [HTTP_Listener_config] を選択します。
[Path (パス)] を /test
に設定します。
[Transform message] コンポーネントを [Listener] の右にドラッグします。
[Transform message] コンポーネントの DataSense プレビューウィンドウで、次の DataWeave 式を追加します。これにより、各 HTTP 要求のペイロードに Source Name
が "size-based"
として設定され、Source ID
が random() as String
として設定されます。
%dw 2.0
output application/json
---
{
"Source Name": "size-based",
"Source ID": (random() as String)
}
[Size based aggregator (サイズベースのアグリゲーター)] スコープを [Transform message] の右にドラッグします。
[名前] を sizeBasedAggregator
に設定します。
[Max Size (最大サイズ)] を 3
に設定します。
[Timeout (タイムアウト)] を 10
に設定します。
[Content (コンテンツ)] を payload
に設定します。これは、何を集約するか (この場合は変換された HTTP 要求) を定義する式です。
[Logger] コンポーネントを [Incremental aggregation (増分集約)] ルートにドラッグします。
[Logger] 設定画面で、[Message (メッセージ)] を Doing incremental aggregation step.
に設定します。
[Logger] コンポーネントを [Aggregation complete (集約完了)] ルートにドラッグします。
[Message (メッセージ)] を Aggregation complete
に設定します。
別の [Logger] コンポーネントを [Aggregation complete (集約完了)] 内の最初の [Logger] コンポーネントの右にドラッグします。
[Message (メッセージ)] を次の式に設定します。この式では、すべての集約された要素が返されます。
output application/json
---
payload
[Aggregator listener (アグリゲーターリスナー)] ソースを最初のフローの下にドラッグします。
[Aggregator name (アグリゲーター名)] を sizeBasedAggregator
に設定します。これは、前のアグリゲーターをリスンします。
[SelectInclude Timed Out Groups (タイムアウトグループを含める)] を選択します。これは、タイムアウトの発生時にリスナーがトリガーされるかどうかを示します。
[Logger] コンポーネントを [Aggregation listener (集約リスナー)] ソースの右にドラッグします。
[Message (メッセージ)] を次の式に設定します。この式では、タイムアウトの発生前に集約された要素のみが返されます。
output application/json
---
payload
プロジェクトを保存します。
Package Explorer で、[Run (実行)] > [Run As (別のユーザーとして実行)] > [Mule Application (Mule アプリケーション)] をクリックします。
10 秒以内に次の curl コマンドを送信して、アプリケーションをテストします。
curl -X POST http://localhost:8086/test
[Console (コンソール)] ビューに移動して、ロガーメッセージを参照します。
INFO 2021-06-22 12:59:16,319 [[MuleRuntime].uber.12: [aggregator-size-demo].aggregator-size-demoFlow.CPU_INTENSIVE @7992f689] [processor: aggregator-size-demoFlow/processors/1/route/0/processors/0; event: cbc219c0-d372-11eb-b975-f01898ad2638] org.mule.runtime.core.internal.processor.LoggerMessageProcessor: Doing incremental aggregation step.
INFO 2021-06-22 12:59:26,328 [[MuleRuntime].uber.12: [aggregator-size-demo].aggregator-size-demoFlow1.CPU_LITE @684a1bef] [processor: aggregator-size-demoFlow1/processors/0; event: d6b48f00-d36f-11eb-b975-f01898ad2638] org.mule.runtime.core.internal.processor.LoggerMessageProcessor: [
{
"Source Name": "size-based",
"Source ID": "0.6568747259174192"
}
]
このロガーメッセージの例は、最初の HTTP 要求で、[Incremental aggregator (増分アグリゲーター)] ルートの [Logger] コンポーネントによってメッセージ Doing incremental aggregation step.
が記録されることを示しています。集約が 10
秒に達すると、タイムアウトが発生します。[Aggregation listener (集約リスナー)] ソースによって唯一の HTTP 要求コールが取得され、後続の [Logger] コンポーネントによって、集約されたペイロード要素 (ランダム ID 番号) が返されます。
この例のフローをすばやく Mule アプリケーションに読み込むには、次のコードを Studio XML エディターに貼り付けます。
<?xml version="1.0" encoding="UTF-8"?>
<mule xmlns:db="http://www.mulesoft.org/schema/mule/db" xmlns:aggregators="http://www.mulesoft.org/schema/mule/aggregators"
xmlns:ee="http://www.mulesoft.org/schema/mule/ee/core"
xmlns:http="http://www.mulesoft.org/schema/mule/http" xmlns="http://www.mulesoft.org/schema/mule/core" xmlns:doc="http://www.mulesoft.org/schema/mule/documentation" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd
http://www.mulesoft.org/schema/mule/http http://www.mulesoft.org/schema/mule/http/current/mule-http.xsd
http://www.mulesoft.org/schema/mule/ee/core http://www.mulesoft.org/schema/mule/ee/core/current/mule-ee.xsd
http://www.mulesoft.org/schema/mule/aggregators http://www.mulesoft.org/schema/mule/aggregators/current/mule-aggregators.xsd
http://www.mulesoft.org/schema/mule/db http://www.mulesoft.org/schema/mule/db/current/mule-db.xsd">
<http:listener-config name="HTTP_Listener_config">
<http:listener-connection host="0.0.0.0" port="8086" />
</http:listener-config>
<flow name="aggregator-size-demoFlow" >
<http:listener doc:name="Listener" config-ref="HTTP_Listener_config" path="/test" allowedMethods="POST"/>
<ee:transform doc:name="Transform Message" >
<ee:message >
<ee:set-payload ><![CDATA[%dw 2.0
output application/json
---
{
"Source Name": "size-based",
"Source ID": (random() as String)
}]]></ee:set-payload>
</ee:message>
</ee:transform>
<aggregators:size-based-aggregator doc:name="Size based aggregator" name="sizeBasedAggregator" maxSize="3" timeout="10">
<aggregators:incremental-aggregation>
<logger level="INFO" message="Doing incremental aggregation step." />
</aggregators:incremental-aggregation>
<aggregators:aggregation-complete>
<logger level="INFO" message="Aggregation complete" />
<logger level="INFO" message="#[output application/json
---
payload]" />
</aggregators:aggregation-complete>
</aggregators:size-based-aggregator>
</flow>
<flow name="aggregator-size-demoFlow1" >
<aggregators:aggregator-listener aggregatorName="sizeBasedAggregator" includeTimedOutGroups="true"/>
<logger level="INFO" message="#[output application/json --- payload]"/>
</flow>
</mule>