VM Connector を使用したキュー内保留メッセージの集約の例

次の例は、仮想マシン用 Anypoint Connector (VM Connector) を使用して、キューで保留中のメッセージを集約する方法についての 2 つのシナリオを示しています。

VM はキューであり、キュー内で保留中のすべてのメッセージに対して for each 操作を実行することはできません。

キュー内メッセージ集計の最初の例

この例では、Aggregators Module を使用して、VM ​Listener​ 操作によって受信されたメッセージを集約し、集約の結果を反復処理します。

Anypoint Studio キャンバスのキュー内保留メッセージの集約の最初のフロー

最初のフローの作成

最初のフローでは、HTTP ​[Listener]​ ソースが受信 HTTP メッセージをリスンします。次に、VM ​Publish​ 操作が VM キュー内の特定のコンテンツをパブリッシュします。

  1. Studio で、新しい Mule プロジェクトを作成します。

  2. [Mule Palette (Mule パレット)]​ で、​[HTTP] > [Listener]​ ソースを選択してキャンバスにドラッグします。

  3. HTTP ​[Listener]​ 設定画面で、​[Path (パス)]​ 項目を ​/​ に設定します。

  4. [Connector configuration (コネクタ設定)]​ 項目の横にあるプラス記号 (+) をクリックして、アプリケーション内の HTTP ​Listener​ のすべてのインスタンスで使用できるグローバル要素を設定します。

  5. HTTP グローバル設定の必須項目を入力します。

  6. [OK]​ をクリックします。

  7. [For Each]​ コンポーネントを HTTP の ​[Listener]​ ソースの右にドラッグします。

  8. [Collection (コレクション)]​ を ​1 to 200​ に設定します。

  9. [VM] > [Publish]​ 操作を ​[For Each]​ コンポーネントにドラッグします。

  10. [Content (コンテンツ)]​ を ​"hello world"​ に設定します。

  11. [Connector configuration (コネクタ設定)]​ 項目の横にあるプラス記号 (+) をクリックして、アプリケーション内の VM ​Publish​ 操作のすべてのインスタンスで使用できるグローバル要素を設定します。

  12. [VM Config (VM 設定)]​ ウィンドウで、プラス記号 (+) をクリックして新しいキューを追加します。

  13. [Queue name (キュー名)]​ を ​demo​ に設定します。

  14. [Finish (完了)]​ をクリックします。

  15. [OK]​ をクリックします。

2 つ目のフローの作成

2 番目のフローでは VM ​[Listener]​ 操作が VM キューにパブリッシュされたメッセージをリスンします。​[Time based aggregator (時間ベースのアグリゲーター)]​ スコープを使用すると、時間条件を満たすまで受信したメッセージを集約することができます。

  1. Studio で ​[VM] > [Listener]​ 操作を最初のフローの下にドラッグします。

  2. [Connector configuration (コネクタ設定)]​ で、前に作成した ​VM_Config​ 設定を選択します。

  3. [Queue name (キュー名)]​ を ​demo​ に設定します。

  4. [Time based aggregator (時間ベースのアグリゲーター)]​ スコープを ​[VM Listener]​ の右にドラッグします。

  5. [Name (名前)]​ を ​demoaggregator​ に設定します。

  6. [Period (期間)]​ を ​5​ に設定して、5 秒間に達するまで受信メッセージを集約します。

3 つ目のフローを作成する

このフローでは、​[Aggregator listener]​ 操作は ​[Time based aggregator (時間ベースのアグリゲーター)]​ スコープによって集約されたメッセージをリスンします。​[For Each]​ コンポーネントはメッセージを反復処理し、​[Logger]​ コンポーネントは各メッセージのコンテンツを記録します。

  1. Studio で、​[Aggregator listener]​ ソースを 2 番目のフローの上にドラッグします。

  2. [Aggregator name (アグリゲーター名)]​ を ​demoaggregator​ に設定します。

  3. [For Each]​ コンポーネントを ​[Aggregator listener]​ ソースの右にドラッグします。

  4. [Collection (コレクション)]​ を ​[#payload]​ に設定します。

  5. [Logger]​ コンポーネントを ​[For Each]​ コンポーネントにドラッグします。

  6. [Message (メッセージ)]​ を ​[#payload]​ に設定します。

  7. Mule アプリケーションを保存して実行します。

  8. ターミナルで、curl コマンド ​curl http://localhost:8081/Leaving the Site​ を実行します。

キュー内メッセージ集約の最初の例の XML

この例のフローをすばやく Mule アプリケーションに読み込むには、次のコードを Studio XML エディターに貼り付けます。

キュー内メッセージ集計の 2 番目の例

この例では、​[For Each]​ コンポーネントは VM ​[Consume]​ 操作によってコンシュームされたメッセージを反復処理します。Mule アプリケーションは受信したすべてのメッセージを変数に追加し、各メッセージのペイロードコンテンツを記録します。この例のシナリオではキューにメッセージがなくなるまでメッセージを取得します。キューにメッセージがなくなると、VM ​[Consume]​ 操作はタイムアウトメッセージをスローし、メッセージは ​[On Error Continue]​ コンポーネントで処理され、結果が使用可能になります。

Anypoint Studio キャンバスのキュー内保留メッセージの集約の 2 番目のフロー

最初のフローの作成

最初のフローでは ​[HTTP Listener]​ ソースが受信 HTTP メッセージをリスンし、VM ​[Publish]​ 操作が VM キューに特定のコンテンツをパブリッシュします。

  1. Studio で、新しい Mule プロジェクトを作成します。

  2. [Mule Palette (Mule パレット)]​ で、​[HTTP] > [Listener]​ ソースを選択してキャンバスにドラッグします。

  3. HTTP ​[Listener]​ 設定画面で、​[Path (パス)]​ 項目を ​/​ に設定します。

  4. [Connector configuration (コネクタ設定)]​ 項目の横にあるプラス記号 (+) をクリックして、アプリケーション内の HTTP ​Listener​ のすべてのインスタンスで使用できるグローバル要素を設定します。

  5. HTTP グローバル設定の必須項目を入力します。

  6. [OK]​ をクリックします。

  7. [For Each]​ コンポーネントを HTTP の ​[Listener]​ ソースの右にドラッグします。

  8. [Collection (コレクション)]​ を ​1 to 55​ に設定します。

  9. [VM] > [Publish]​ 操作を ​[For Each]​ コンポーネントにドラッグします。

  10. [Content (コンテンツ)]​ を ​"test world"​ に設定します。

  11. [Connector configuration (コネクタ設定)]​ 項目の横にあるプラス記号 (+) をクリックして、アプリケーション内の VM ​Publish​ 操作のすべてのインスタンスで使用できるグローバル要素を設定します。

  12. [VM Config (VM 設定)]​ ウィンドウで、プラス記号 (+) をクリックして新しいキューを追加します。

  13. [Queue name (キュー名)]​ を ​demo​ に設定します。

  14. [Finish (完了)]​ をクリックします。

  15. [OK]​ をクリックします。

2 つ目のフローの作成

2 番目のフローでは VM ​[Consume]​ 操作がキューからパブリッシュされたメッセージを取得します。​[Transform Message]​ コンポーネントは、ペイロードコンテンツ変換し、変数に保存します。Mule アプリケーションは各変数ペイロードを記録します。

  1. Studio で ​[Scheduler]​ ソースを最初のフローの下にドラッグします。

  2. [Frequency (頻度)]​ を ​5000​ に設定します。

  3. [Set Variable]​ コンポーネントを ​[Scheduler]​ コンポーネントの右にドラッグします。

  4. [Name (名前)]​ を ​result​ に設定します。

  5. [Value (値)]​ を ​[]​ に設定します。

  6. [Try]​ スコープコンポーネントを ​[Set Variable]​ の右にドラッグします。

  7. [For Each]​ コンポーネントを ​[Try]​ スコープにドラッグします。

  8. [Collection (コレクション)]​ を ​1 to 10000​ に設定します。

  9. VM ​[Consume]​ 操作を ​[For Each]​ コンポーネントにドラッグします。

  10. [Connector configuration (コネクタ設定)]​ で、前に作成した ​VM_Config​ 設定を選択します。

  11. [Queue name (キュー名)]​ を ​demo​ に設定します。

  12. [Transform Message]​ コンポーネントを ​[Consume]​ の右にドラッグします。

  13. DataWeave スクリプトを次のように設定します。

%dw 2.0
output application/java
---
vars.result << payload
  1. [On Error Continue]​ コンポーネントを ​[Try]​ スコープコンポーネントの ​[Error handling (エラー処理)]​ セクションにドラッグします。

  2. [Type (種別)]​ を ​VM:EMPTY_QUEUE​ に設定します。

  3. [Set Payload]​ コンポーネントを ​[Try]​ スコープコンポーネントの右にドラッグします。

  4. [Value (値)]​ を ​vars.result​ に設定します。

  5. [For Each]​ コンポーネントを ​[Set Payload]​ コンポーネントの右にドラッグします。

  6. [Collection (コレクション)]​ を ​payload​ に設定します。

  7. [Logger]​ コンポーネントを ​[For Each]​ スコープコンポーネントにドラッグします。

  8. [Message (メッセージ)]​ を ​payload​ に設定します。

  9. Mule アプリケーションを保存して実行します。

  10. ターミナルで、curl コマンド ​curl http://localhost:8081/Leaving the Site​ を実行します。

キュー内メッセージ集約の 2 番目の例の XML

この例のフローをすばやく Mule アプリケーションに読み込むには、次のコードを Studio XML エディターに貼り付けます。