Flex Gateway新着情報
Governance新着情報
Monitoring API ManagerAggregators Module の [Group based aggregator (グループベースのアグリゲーター)] スコープを使用すると、要素をグループ ID でグループに集約できます。次の例は、要求で送信されたグループ ID に基づいて、グループごとに最大 3
件の HTTP 要求コールを集約するようにこのスコープを設定する方法を示しています。アプリケーションは DataWeave を使用して、ランダム ID 番号で各コールのペイロードを設定します。各グループで 3
個の要素の集約が完了すると、アプリケーションはグループのすべての集約されたランダム ID 番号要素を記録します。
独自の環境でこの例をテストするには、Mule アプリケーションを作成し、curl コマンドを使用してアプリケーションを実行する必要があります。
Anypoint Studio で Mule フローを作成する手順は、次のとおりです。
[Mule Palette (Mule パレット)] で、[HTTP Listener (HTTP リスナー)] を選択してキャンバスにドラッグします。
このソースは受信 HTTP メッセージ属性をリスンすることでフローを開始します。
[Connector configuration (コネクタ設定)] で [HTTP_Listener_config] を選択します。
[Path (パス)] を /test
に設定します。
[Transform message] コンポーネントを [Listener] の右にドラッグします。
[Transform message] コンポーネントの DataSense プレビューウィンドウで、次の DataWeave 式を追加します。これにより、各 HTTP 要求のペイロードに Source Name
が "group-based"
として設定され、Source ID
が random() as String
として設定されます。また、message.attributes['queryParams'].groupId
に基づいて要求の GroupId
値も取得します。
%dw 2.0
output application/json
---
{
"Source Name": "group-based",
"GroupId": message.attributes['queryParams'].groupId,
"Source ID": (random() as String)
}
[Group based aggregator (グループベースのアグリゲーター)] スコープを [Transform message] の右にドラッグします。
[名前] を groupBasedAggregator
に設定します。
[Content (コンテンツ)] を payload
に設定します。これは、何を集約するか (この場合は変換された HTTP 要求) を定義する式です。
[Group id (グループ ID)] を payload.GroupId
に設定します。これは、受信した各新規メッセージで評価される式であり、要素を集約するグループ ID を取得します。
[Group size (グループサイズ)] を 3
に設定します。これは、解決したグループ ID を使用してグループに集約する要素の最大サイズです。
[Logger] コンポーネントを [Incremental aggregation (増分集約)] ルートにドラッグします。
[Logger] 設定画面で、[Message (メッセージ)] を Doing incremental aggregation.
に設定します。
[Logger] コンポーネントを [Aggregation complete (集約完了)] ルートにドラッグします。
[Message (メッセージ)] を Aggregation complete
に設定します。
別の [Logger] コンポーネントを [Aggregation complete (集約完了)] 内の最初の [Logger] コンポーネントの右にドラッグします。
[Message (メッセージ)] を次の式に設定します。この式では、すべての集約された要素が返されます。
output application/json
---
payload
プロジェクトを保存します。
Package Explorer で、[Run (実行)] > [Run As (別のユーザーとして実行)] > [Mule Application (Mule アプリケーション)] をクリックします。
次の curl コマンドを 2 回送信して、アプリケーションをテストします: + curl -X POST http://localhost:8081/test?groupId=1
。
これにより、2 つの HTTP 要求が生成され、グループ ID 1
に集約されます。
コマンド curl -X POST http://localhost:8081/test?groupId=2
を送信して、1 つの要求をグループ ID 2
に集約します。
別のコマンド curl -X POST http://localhost:8081/test?groupId=1
を送信して、要求をグループ ID 1
に集約します。
[Console (コンソール)] ビューに移動して、ロガーメッセージを参照します。
INFO 2021-06-14 16:34:33,921 [[MuleRuntime].uber.12: [aggregator-group-demo].aggregator-group-demoFlow.CPU_INTENSIVE @2c3478b5] [processor: aggregator-group-demoFlow/processors/1/route/0/processors/0; event: 8beae010-cd47-11eb-8b82-f01898ad2638] org.mule.runtime.core.internal.processor.LoggerMessageProcessor: Doing incremental aggregation
INFO 2021-06-14 16:34:43,618 [[MuleRuntime].uber.12: [aggregator-group-demo].aggregator-group-demoFlow.CPU_INTENSIVE @2c3478b5] [processor: aggregator-group-demoFlow/processors/1/route/0/processors/0; event: 91b93be0-cd47-11eb-8b82-f01898ad2638] org.mule.runtime.core.internal.processor.LoggerMessageProcessor: Doing incremental aggregation
INFO 2021-06-14 16:34:47,203 [[MuleRuntime].uber.12: [aggregator-group-demo].aggregator-group-demoFlow.CPU_INTENSIVE @2c3478b5] [processor: aggregator-group-demoFlow/processors/1/route/0/processors/0; event: 93d8e790-cd47-11eb-8b82-f01898ad2638] org.mule.runtime.core.internal.processor.LoggerMessageProcessor: Doing incremental aggregation
INFO 2021-06-14 16:34:51,380 [[MuleRuntime].uber.12: [aggregator-group-demo].aggregator-group-demoFlow.CPU_INTENSIVE @2c3478b5] [processor: aggregator-group-demoFlow/processors/1/route/1/processors/0; event: 96566ab0-cd47-11eb-8b82-f01898ad2638] org.mule.runtime.core.internal.processor.LoggerMessageProcessor: Aggregation complete
INFO 2021-06-14 16:34:51,409 [[MuleRuntime].uber.12: [aggregator-group-demo].aggregator-group-demoFlow.CPU_INTENSIVE @2c3478b5] [processor: aggregator-group-demoFlow/processors/1/route/1/processors/1; event: 96566ab0-cd47-11eb-8b82-f01898ad2638] org.mule.runtime.core.internal.processor.LoggerMessageProcessor: [
{
"Source Name": "group-based",
"GroupId": "1",
"Source ID": "0.5127046416902801"
},
{
"Source Name": "group-based",
"GroupId": "1",
"Source ID": "0.20239307122656847"
},
{
"Source Name": "group-based",
"GroupId": "1",
"Source ID": "0.7505233647798346"
}
]
このロガーメッセージの例は、グループ ID 1
の最初の 2 つの HTTP 要求とグループ ID 2
の 3 番目の要求で、[Incremental aggregator (増分アグリゲーター)] ルートの [Logger] コンポーネントによってメッセージ Doing incremental aggregation.
が記録されることを示しています。
4 番目の HTTP 要求 (グループ ID 1
の 3 番目の要求) を送信すると、グループサイズがグループあたり 3
件のコールになっているため、集約グループ 1
は完了します。[Aggregation complete (集約完了)] ルートの 1 番目の [Logger] コンポーネントによってメッセージ Aggregation complete
が記録され、2 番目の [Logger] コンポーネントによってグループの集約されたペイロード要素 (ランダム ID 番号) が返されます。
コールをグループ 2
に集約する別の 2 つのコマンド curl -X POST http://localhost:8081/test?groupId=2
を送信すると、[Console (コンソール)] ビューに次のロガーメッセージが返されます。
INFO 2021-06-14 16:53:56,262 [[MuleRuntime].uber.12: [aggregator-group-demo].aggregator-group-demoFlow.CPU_INTENSIVE @2c3478b5] [processor: aggregator-group-demoFlow/processors/1/route/0/processors/0; event: 40c07200-cd4a-11eb-8b82-f01898ad2638] org.mule.runtime.core.internal.processor.LoggerMessageProcessor: Doing incremental aggregation
INFO 2021-06-14 16:53:58,686 [[MuleRuntime].uber.01: [aggregator-group-demo].aggregator-group-demoFlow.CPU_INTENSIVE @2c3478b5] [processor: aggregator-group-demoFlow/processors/1/route/1/processors/0; event: 42329fa0-cd4a-11eb-8b82-f01898ad2638] org.mule.runtime.core.internal.processor.LoggerMessageProcessor: Aggregation complete
INFO 2021-06-14 16:53:58,689 [[MuleRuntime].uber.01: [aggregator-group-demo].aggregator-group-demoFlow.CPU_INTENSIVE @2c3478b5] [processor: aggregator-group-demoFlow/processors/1/route/1/processors/1; event: 42329fa0-cd4a-11eb-8b82-f01898ad2638] org.mule.runtime.core.internal.processor.LoggerMessageProcessor: [
{
"Source Name": "group-based",
"GroupId": "2",
"Source ID": "0.8390326267445475"
},
{
"Source Name": "group-based",
"GroupId": "2",
"Source ID": "0.8931962767228987"
},
{
"Source Name": "group-based",
"GroupId": "2",
"Source ID": "0.6514448303865373"
}
]
この例のフローをすばやく Mule アプリケーションに読み込むには、次のコードを Studio XML エディターに貼り付けます。
<?xml version="1.0" encoding="UTF-8"?>
<mule xmlns:aggregators="http://www.mulesoft.org/schema/mule/aggregators" xmlns:ee="http://www.mulesoft.org/schema/mule/ee/core"
xmlns:http="http://www.mulesoft.org/schema/mule/http"
xmlns="http://www.mulesoft.org/schema/mule/core" xmlns:doc="http://www.mulesoft.org/schema/mule/documentation" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd
http://www.mulesoft.org/schema/mule/http http://www.mulesoft.org/schema/mule/http/current/mule-http.xsd
http://www.mulesoft.org/schema/mule/ee/core http://www.mulesoft.org/schema/mule/ee/core/current/mule-ee.xsd
http://www.mulesoft.org/schema/mule/aggregators http://www.mulesoft.org/schema/mule/aggregators/current/mule-aggregators.xsd">
<http:listener-config name="HTTP_Listener_config" doc:name="HTTP Listener config" doc:id="c4a97ca0-c01f-4ce9-9922-380eb0defda2" >
<http:listener-connection host="0.0.0.0" port="8081" />
</http:listener-config>
<flow >
<http:listener config-ref="HTTP_Listener_config" path="/test" allowedMethods="POST"/>
<ee:transform >
<ee:message >
<ee:set-payload ><![CDATA[%dw 2.0
output application/json
---
{
"Source Name": "group-based",
"GroupId": message.attributes['queryParams'].groupId,
"Source ID": (random() as String)
}]]></ee:set-payload>
</ee:message>
</ee:transform>
<aggregators:group-based-aggregator name="groupBasedAggregator" evictionTime="0" groupSize="3" groupId="#[payload.GroupId]">
<aggregators:incremental-aggregation >
<logger level="INFO" message="Doing incremental aggregation"/>
</aggregators:incremental-aggregation>
<aggregators:aggregation-complete >
<logger level="INFO" message="Aggregation complete"/>
<logger level="INFO" message="#[output application/json
---
payload]"/>
</aggregators:aggregation-complete>
</aggregators:group-based-aggregator>
</flow>
</mule>