Flex Gateway新着情報
Governance新着情報
Monitoring API ManagerAggregators Module の [Time based aggregator (時間ベースのアグリゲーター)] スコープを使用すると、定義済みの時間制限内で要素を集約することができます。次の例は、5
秒間の HTTP 要求コールを集約するようにこのスコープを設定する方法を示しています。アプリケーションは DataWeave を使用して、ランダム ID 番号で各コールのペイロードを設定します。集約が 5
秒に達すると、[Aggregator listener (アグリゲーターリスナー)] ソースは要素のリストを取得し、アプリケーションはすべての集約されたランダム ID 番号を記録します。
独自の環境でこの例をテストするには、Mule アプリケーションを作成し、curl コマンドを使用してアプリケーションを実行する必要があります。
Anypoint Studio で Mule フローを作成する手順は、次のとおりです。
[Mule Palette (Mule パレット)] で、[HTTP Listener (HTTP リスナー)] を選択してキャンバスにドラッグします。
このソースは受信 HTTP メッセージ属性をリスンすることでフローを開始します。
[Connector configuration (コネクタ設定)] で [HTTP_Listener_config] を選択します。
[Path (パス)] を /test
に設定します。
[Transform message] コンポーネントを [Listener] の右にドラッグします。
[Transform message] コンポーネントの DataSense プレビューウィンドウで、次の DataWeave 式を追加します。これにより、各 HTTP 要求のペイロードに Source Name
が "time-based"
として設定され、Source ID
が random() as String
として設定されます。
%dw 2.0
output application/json
---
{
"Source Name": "time-based",
"Source ID": (random() as String)
}
[Time based aggregator (時間ベースのアグリゲーター)] スコープを [Transform message] の右にドラッグします。
[名前] を timeBasedAggregator
に設定します。
[Period (期間)] を 5
に設定します。
[Period unit (期間単位)] を SECONDS(Default)
に設定します。
[Content (コンテンツ)] を payload
に設定します。これは、何を集約するか (この場合は変換された HTTP 要求) を定義する式です。
[Logger] コンポーネントを [Incremental aggregation (増分集約)] ルートにドラッグします。
[Logger] 設定画面で、[Message (メッセージ)] を Doing incremental step.
に設定します。
[Aggregator listener (アグリゲーターリスナー)] ソースを最初のフローの下にドラッグします。
このソースは、前に定義された時間ベースのアグリゲーターをリスンします。このアグリゲーターがその要素を解放すると、リスナーが実行されます。
[Aggregator name (アグリゲーター名)] を timeBasedAggregator
に設定します。
[Logger] コンポーネントを [Aggregator listener (アグリゲーターリスナー)] の右にドラッグします
[Message (メッセージ)] を Aggregation complete
に設定します。
別の [Logger] コンポーネントを、前に追加した [Logger] コンポーネントの右にドラッグします。
[Message (メッセージ)] を次の式に設定します。この式では、すべての集約された要素が返されます。
output application/json
---
payload
プロジェクトを保存します。
Package Explorer で、[Run (実行)] > [Run As (別のユーザーとして実行)] > [Mule Application (Mule アプリケーション)] をクリックします。
5
秒以内に次の curl コマンドを 2 回送信して、アプリケーションをテストします。
curl -X POST http://localhost:8086/test
これにより、[Time based aggregator (時間ベースのアグリゲーター)] で処理する 2 個の HTTP 要求が生成されます。
[Console (コンソール)] ビューに移動して、ロガーメッセージを参照します。
INFO 2021-06-09 12:02:04,203 [[MuleRuntime].uber.04: [aggregator-time-demo].aggregator-time-demoFlow.CPU_INTENSIVE @59b15997] [processor: aggregator-time-demoFlow/processors/1/route/0/processors/0; event: a6b00280-c933-11eb-a825-f01898ad2638] org.mule.runtime.core.internal.processor.LoggerMessageProcessor: Doing incremental step.
INFO 2021-06-09 12:02:06,071 [[MuleRuntime].uber.04: [aggregator-time-demo].aggregator-time-demoFlow.CPU_INTENSIVE @59b15997] [processor: aggregator-time-demoFlow/processors/1/route/0/processors/0; event: a7cd0b40-c933-11eb-a825-f01898ad2638] org.mule.runtime.core.internal.processor.LoggerMessageProcessor: Doing incremental step.
INFO 2021-06-09 12:02:09,206 [[MuleRuntime].uber.04: [aggregator-time-demo].aggregator-time-demoOnCompleteFlow.CPU_LITE @2ab3c271] [processor: aggregator-time-demoOnCompleteFlow/processors/0; event: 827c9591-c933-11eb-a825-f01898ad2638] org.mule.runtime.core.internal.processor.LoggerMessageProcessor: Aggregation complete
INFO 2021-06-09 12:02:09,208 [[MuleRuntime].uber.04: [aggregator-time-demo].aggregator-time-demoOnCompleteFlow.CPU_LITE @2ab3c271] [processor: aggregator-time-demoOnCompleteFlow/processors/1; event: 827c9591-c933-11eb-a825-f01898ad2638] org.mule.runtime.core.internal.processor.LoggerMessageProcessor: [
{
"Source Name": "time-based",
"Source ID": "0.7095971146328517"
},
{
"Source Name": "time-based",
"Source ID": "0.6900116288202388"
}
]
このロガーメッセージの例は、各 HTTP 要求で、[Incremental aggregator (増分アグリゲーター)] ルートの [Logger] コンポーネントによってメッセージ Doing incremental step.
が記録されることを示しています。[Aggregator listener (アグリゲーターリスナー)] ソースで参照される timeBasedAggregator
値によって増分集約が 5
秒以内に完了すると、すべての要素のリストを使用してリスナーがトリガーされます。後続の [Logger] コンポーネントによってメッセージ Aggregation complete
が記録され、2 番目の [Logger] コンポーネントによって、集約されたペイロード要素 (ランダム ID 番号) が返されます。
{
"Source Name": "time-based",
"Source ID": "0.7095971146328517"
},
{
"Source Name": "time-based",
"Source ID": "0.6900116288202388"
}
この例のフローをすばやく Mule アプリケーションに読み込むには、次のコードを Studio XML エディターに貼り付けます。
<?xml version="1.0" encoding="UTF-8"?>
<mule xmlns:aggregators="http://www.mulesoft.org/schema/mule/aggregators" xmlns:ee="http://www.mulesoft.org/schema/mule/ee/core"
xmlns:http="http://www.mulesoft.org/schema/mule/http"
xmlns="http://www.mulesoft.org/schema/mule/core" xmlns:doc="http://www.mulesoft.org/schema/mule/documentation" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd
http://www.mulesoft.org/schema/mule/http http://www.mulesoft.org/schema/mule/http/current/mule-http.xsd
http://www.mulesoft.org/schema/mule/ee/core http://www.mulesoft.org/schema/mule/ee/core/current/mule-ee.xsd
http://www.mulesoft.org/schema/mule/aggregators http://www.mulesoft.org/schema/mule/aggregators/current/mule-aggregators.xsd">
<http:listener-config name="HTTP_Listener_config" >
<http:listener-connection host="0.0.0.0" port="8086" />
</http:listener-config>
<flow name="aggregator-time-demoFlow" >
<http:listener config-ref="HTTP_Listener_config" path="/test" allowedMethods="POST"/>
<ee:transform >
<ee:message >
<ee:set-payload ><![CDATA[%dw 2.0
output application/json
---
{
"Source Name": "time-based",
"Source ID": (random() as String)
}]]></ee:set-payload>
</ee:message>
</ee:transform>
<aggregators:time-based-aggregator period="5" name="timeBasedAggregator">
<aggregators:incremental-aggregation >
<logger level="INFO" message="Doing incremental step."/>
</aggregators:incremental-aggregation>
</aggregators:time-based-aggregator>
</flow>
<flow name="aggregator-time-demoOnCompleteFlow" >
<aggregators:aggregator-listener aggregatorName="timeBasedAggregator"/>
<logger level="INFO" message="Aggregation complete" />
<logger level="INFO" message="#[output application/json
---
payload]" />
</flow>
</mule>