VM Connector を使用したキュー内保留メッセージの集約の例

次の例は、仮想マシン用 Anypoint Connector (VM Connector) を使用して、キューで保留中のメッセージを集約する方法についての 2 つのシナリオを示しています。

VM はキューであり、キュー内で保留中のすべてのメッセージに対して for each 操作を実行することはできません。

キュー内メッセージ集計の最初の例

この例では、Aggregators Module を使用して、VM ​Listener​ 操作によって受信されたメッセージを集約し、集約の結果を反復処理します。

Anypoint Studio キャンバスのキュー内保留メッセージの集約の最初のフロー

最初のフローの作成

最初のフローでは、HTTP ​[Listener]​ ソースが受信 HTTP メッセージをリスンします。次に、VM ​Publish​ 操作が VM キュー内の特定のコンテンツをパブリッシュします。

  1. Studio で、新しい Mule プロジェクトを作成します。

  2. [Mule Palette (Mule パレット)]​ で、​[HTTP] > [Listener]​ ソースを選択してキャンバスにドラッグします。

  3. HTTP ​[Listener]​ 設定画面で、​[Path (パス)]​ 項目を ​/​ に設定します。

  4. [Connector configuration (コネクタ設定)]​ 項目の横にあるプラス記号 (+) をクリックして、アプリケーション内の HTTP ​Listener​ のすべてのインスタンスで使用できるグローバル要素を設定します。

  5. HTTP グローバル設定の必須項目を入力します。

  6. [OK]​ をクリックします。

  7. [For Each]​ コンポーネントを HTTP の ​[Listener]​ ソースの右にドラッグします。

  8. [Collection (コレクション)]​ を ​1 to 200​ に設定します。

  9. [VM] > [Publish]​ 操作を ​[For Each]​ コンポーネントにドラッグします。

  10. [Content (コンテンツ)]​ を ​"hello world"​ に設定します。

  11. [Connector configuration (コネクタ設定)]​ 項目の横にあるプラス記号 (+) をクリックして、アプリケーション内の VM ​Publish​ 操作のすべてのインスタンスで使用できるグローバル要素を設定します。

  12. [VM Config (VM 設定)]​ ウィンドウで、プラス記号 (+) をクリックして新しいキューを追加します。

  13. [Queue name (キュー名)]​ を ​demo​ に設定します。

  14. [Finish (完了)]​ をクリックします。

  15. [OK]​ をクリックします。

2 つ目のフローの作成

2 番目のフローでは VM ​[Listener]​ 操作が VM キューにパブリッシュされたメッセージをリスンします。​[Time based aggregator (時間ベースのアグリゲーター)]​ スコープを使用すると、時間条件を満たすまで受信したメッセージを集約することができます。

  1. Studio で ​[VM] > [Listener]​ 操作を最初のフローの下にドラッグします。

  2. [Connector configuration (コネクタ設定)]​ で、前に作成した ​VM_Config​ 設定を選択します。

  3. [Queue name (キュー名)]​ を ​demo​ に設定します。

  4. [Time based aggregator (時間ベースのアグリゲーター)]​ スコープを ​[VM Listener]​ の右にドラッグします。

  5. [Name (名前)]​ を ​demoaggregator​ に設定します。

  6. [Period (期間)]​ を ​5​ に設定して、5 秒間に達するまで受信メッセージを集約します。

3 つ目のフローを作成する

このフローでは、​[Aggregator listener]​ 操作は ​[Time based aggregator (時間ベースのアグリゲーター)]​ スコープによって集約されたメッセージをリスンします。​[For Each]​ コンポーネントはメッセージを反復処理し、​[Logger]​ コンポーネントは各メッセージのコンテンツを記録します。

  1. Studio で、​[Aggregator listener]​ ソースを 2 番目のフローの上にドラッグします。

  2. [Aggregator name (アグリゲーター名)]​ を ​demoaggregator​ に設定します。

  3. [For Each]​ コンポーネントを ​[Aggregator listener]​ ソースの右にドラッグします。

  4. [Collection (コレクション)]​ を ​[#payload]​ に設定します。

  5. [Logger]​ コンポーネントを ​[For Each]​ コンポーネントにドラッグします。

  6. [Message (メッセージ)]​ を ​[#payload]​ に設定します。

  7. Mule アプリケーションを保存して実行します。

  8. ターミナルで、curl コマンド ​curl http://localhost:8081/​ を実行します。

キュー内メッセージ集約の最初の例の XML

この例のフローをすばやく Mule アプリケーションに読み込むには、次のコードを Studio XML エディターに貼り付けます。

<?xml version="1.0" encoding="UTF-8"?>

<mule xmlns:aggregators="http://www.mulesoft.org/schema/mule/aggregators" xmlns:http="http://www.mulesoft.org/schema/mule/http"
	xmlns:vm="http://www.mulesoft.org/schema/mule/vm"
	xmlns="http://www.mulesoft.org/schema/mule/core" xmlns:doc="http://www.mulesoft.org/schema/mule/documentation" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd
http://www.mulesoft.org/schema/mule/vm http://www.mulesoft.org/schema/mule/vm/current/mule-vm.xsd
http://www.mulesoft.org/schema/mule/http http://www.mulesoft.org/schema/mule/http/current/mule-http.xsd
http://www.mulesoft.org/schema/mule/aggregators http://www.mulesoft.org/schema/mule/aggregators/current/mule-aggregators.xsd">
	<http:listener-config name="HTTP_Listener_config" basePath="/" >
		<http:listener-connection host="0.0.0.0" port="8081" />
	</http:listener-config>
	<vm:config name="VM_Config">
		<vm:connection />
		<vm:queues >
			<vm:queue queueName="demo" />
		</vm:queues>
	</vm:config>
	<flow name="demo-vm2Flow" >
		<http:listener config-ref="HTTP_Listener_config" path="/"/>
		<foreach collection="1 to 200">
			<vm:publish config-ref="VM_Config" queueName="demo" >
				<vm:content ><![CDATA[#["hello world"]]]></vm:content>
			</vm:publish>
		</foreach>
	</flow>
	<flow name="demo-vm2Flow2" >
		<aggregators:aggregator-listener aggregatorName="demoaggregator" includeTimedOutGroups="true"/>
		<foreach collection="#[payload]">
			<logger level="INFO" message="#[payload]"/>
		</foreach>
	</flow>
	<flow name="demo-vm2Flow1" >
		<vm:listener queueName="demo" config-ref="VM_Config"/>
		<aggregators:time-based-aggregator  name="demoaggregator" period="5">
		</aggregators:time-based-aggregator>
	</flow>
</mule>

キュー内メッセージ集計の 2 番目の例

この例では、​[For Each]​ コンポーネントは VM ​[Consume]​ 操作によってコンシュームされたメッセージを反復処理します。Mule アプリケーションは受信したすべてのメッセージを変数に追加し、各メッセージのペイロードコンテンツを記録します。この例のシナリオではキューにメッセージがなくなるまでメッセージを取得します。キューにメッセージがなくなると、VM ​[Consume]​ 操作はタイムアウトメッセージをスローし、メッセージは ​[On Error Continue]​ コンポーネントで処理され、結果が使用可能になります。

Anypoint Studio キャンバスのキュー内保留メッセージの集約の 2 番目のフロー

最初のフローの作成

最初のフローでは ​[HTTP Listener]​ ソースが受信 HTTP メッセージをリスンし、VM ​[Publish]​ 操作が VM キューに特定のコンテンツをパブリッシュします。

  1. Studio で、新しい Mule プロジェクトを作成します。

  2. [Mule Palette (Mule パレット)]​ で、​[HTTP] > [Listener]​ ソースを選択してキャンバスにドラッグします。

  3. HTTP ​[Listener]​ 設定画面で、​[Path (パス)]​ 項目を ​/​ に設定します。

  4. [Connector configuration (コネクタ設定)]​ 項目の横にあるプラス記号 (+) をクリックして、アプリケーション内の HTTP ​Listener​ のすべてのインスタンスで使用できるグローバル要素を設定します。

  5. HTTP グローバル設定の必須項目を入力します。

  6. [OK]​ をクリックします。

  7. [For Each]​ コンポーネントを HTTP の ​[Listener]​ ソースの右にドラッグします。

  8. [Collection (コレクション)]​ を ​1 to 55​ に設定します。

  9. [VM] > [Publish]​ 操作を ​[For Each]​ コンポーネントにドラッグします。

  10. [Content (コンテンツ)]​ を ​"test world"​ に設定します。

  11. [Connector configuration (コネクタ設定)]​ 項目の横にあるプラス記号 (+) をクリックして、アプリケーション内の VM ​Publish​ 操作のすべてのインスタンスで使用できるグローバル要素を設定します。

  12. [VM Config (VM 設定)]​ ウィンドウで、プラス記号 (+) をクリックして新しいキューを追加します。

  13. [Queue name (キュー名)]​ を ​demo​ に設定します。

  14. [Finish (完了)]​ をクリックします。

  15. [OK]​ をクリックします。

2 つ目のフローの作成

2 番目のフローでは VM ​[Consume]​ 操作がキューからパブリッシュされたメッセージを取得します。​[Transform Message]​ コンポーネントは、ペイロードコンテンツ変換し、変数に保存します。Mule アプリケーションは各変数ペイロードを記録します。

  1. Studio で ​[Scheduler]​ ソースを最初のフローの下にドラッグします。

  2. [Frequency (頻度)]​ を ​5000​ に設定します。

  3. [Set Variable]​ コンポーネントを ​[Scheduler]​ コンポーネントの右にドラッグします。

  4. [Name (名前)]​ を ​result​ に設定します。

  5. [Value (値)]​ を ​[]​ に設定します。

  6. [Try]​ スコープコンポーネントを ​[Set Variable]​ の右にドラッグします。

  7. [For Each]​ コンポーネントを ​[Try]​ スコープにドラッグします。

  8. [Collection (コレクション)]​ を ​1 to 10000​ に設定します。

  9. VM ​[Consume]​ 操作を ​[For Each]​ コンポーネントにドラッグします。

  10. [Connector configuration (コネクタ設定)]​ で、前に作成した ​VM_Config​ 設定を選択します。

  11. [Queue name (キュー名)]​ を ​demo​ に設定します。

  12. [Transform Message]​ コンポーネントを ​[Consume]​ の右にドラッグします。

  13. DataWeave スクリプトを次のように設定します。

%dw 2.0
output application/java
---
vars.result << payload
  1. [On Error Continue]​ コンポーネントを ​[Try]​ スコープコンポーネントの ​[Error handling (エラー処理)]​ セクションにドラッグします。

  2. [Type (種別)]​ を ​VM:EMPTY_QUEUE​ に設定します。

  3. [Set Payload]​ コンポーネントを ​[Try]​ スコープコンポーネントの右にドラッグします。

  4. [Value (値)]​ を ​vars.result​ に設定します。

  5. [For Each]​ コンポーネントを ​[Set Payload]​ コンポーネントの右にドラッグします。

  6. [Collection (コレクション)]​ を ​payload​ に設定します。

  7. [Logger]​ コンポーネントを ​[For Each]​ スコープコンポーネントにドラッグします。

  8. [Message (メッセージ)]​ を ​payload​ に設定します。

  9. Mule アプリケーションを保存して実行します。

  10. ターミナルで、curl コマンド ​curl http://localhost:8081/​ を実行します。

キュー内メッセージ集約の 2 番目の例の XML

この例のフローをすばやく Mule アプリケーションに読み込むには、次のコードを Studio XML エディターに貼り付けます。

<?xml version="1.0" encoding="UTF-8"?>

<mule xmlns:ee="http://www.mulesoft.org/schema/mule/ee/core" xmlns:http="http://www.mulesoft.org/schema/mule/http"
	xmlns:vm="http://www.mulesoft.org/schema/mule/vm"
	xmlns="http://www.mulesoft.org/schema/mule/core" xmlns:doc="http://www.mulesoft.org/schema/mule/documentation" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd
http://www.mulesoft.org/schema/mule/vm http://www.mulesoft.org/schema/mule/vm/current/mule-vm.xsd
http://www.mulesoft.org/schema/mule/http http://www.mulesoft.org/schema/mule/http/current/mule-http.xsd
http://www.mulesoft.org/schema/mule/ee/core http://www.mulesoft.org/schema/mule/ee/core/current/mule-ee.xsd">
	<http:listener-config name="HTTP_Listener_config" basePath="/" >
		<http:listener-connection host="0.0.0.0" port="8081" />
	</http:listener-config>
	<vm:config name="VM_Config" >
		<vm:connection />
		<vm:queues >
			<vm:queue queueName="demo" />
		</vm:queues>
	</vm:config>
	<flow name="demo-vmFlow1" >
		<http:listener config-ref="HTTP_Listener_config" path="/"/>
		<foreach collection="1 to 55">
			<vm:publish config-ref="VM_Config" queueName="demo" >
				<vm:content ><![CDATA[#["test message"]]]></vm:content>
			</vm:publish>
		</foreach>
	</flow>
	<flow name="demo-vmFlow">
		<scheduler>
			<scheduling-strategy>
				<fixed-frequency frequency="5000" />
			</scheduling-strategy>
		</scheduler>
		<set-variable value="#[[]]" variableName="result" />
		<try>
			<foreach collection="#[1 to 10000]">
				<vm:consume config-ref="VM_Config" queueName="demo" />
				<ee:transform>
					<ee:message>
						<ee:set-payload><![CDATA[%dw 2.0
output application/java
---
vars.result << payload
]]></ee:set-payload>
					</ee:message>
					<ee:variables>
						<ee:set-variable variableName="result"><![CDATA[%dw 2.0
output application/java
---
vars.result<<payload]]></ee:set-variable>
					</ee:variables>
				</ee:transform>
			</foreach>
			<error-handler>
				<on-error-continue enableNotifications="true" logException="false" type="VM:EMPTY_QUEUE" />
			</error-handler>
		</try>
		<set-payload value="#[vars.result]" />
		<foreach collection="#[payload]" counterVariableName="message">
			<logger level="INFO" message="#[payload]" />
		</foreach>
	</flow>
</mule>