フロー内でのメッセージのパブリッシュおよびコンシュームの例 - Mule 4

次の例は、2 つのフロー内でメッセージをパブリッシュおよびコンシュームするように JMS 用 Anypoint Connector (JMS Connector) の ​Publish consume​ 操作を設定し、相関 ID を使用して受信メッセージが特定の送信メッセージへの応答であることを確認する方法を示しています。

Anypoint Studio のキャンバスビューでの両方の Publish consume フロー
Figure 1. Studio のフロー内での Publish Consume
  • 最初のフローの ​HTTP Listener​ ソースで、HTTP 経由で投稿されるランダムなコンテンツを読み取ってフローを開始します。

  • Publish consume​ 操作では、HTTP Listener ソースから受信したコンテンツペイロードが含まれるメッセージを宛先キュー (​Q1​) にパブリッシュします。

  • 2 番目のフローの ​On New Message​ ソースで、最初のフローで設定したキュー ​Q1​ に送信されたメッセージをリスンします。

  • 引き続き 2 番目のフローの ​Publish​ 操作で、2 つのフロー内の要求メッセージと応答メッセージを相関する相関 ID 属性を使用して、メッセージを新しい宛先キュー (​Q2​) にパブリッシュします。

  • 最初のフローに戻り、​Publish consume​ 操作で、相関 ID に一致する、​Q2​ のキューに送信されたメッセージをコンシュームします。

  • 最後に最初のフローの ​Set Payload​ コンポーネントで値 ​It Works!​ を返します。

この例をテストするには、次が必要です。

  1. ActiveMQ ブローカーの Docker インスタンス。

  2. ActiveMQ ブローカーを使用して 2 つのキュー (​Q1​ と ​Q2​) を作成する。

  3. 2 つのフローを含む Mule アプリケーションを作成する。

  4. curl​ コマンドを使用してアプリケーションを実行およびテストする。

ActiveMQ ブローカーの Docker インスタンスをインストールする方法とキューを作成する方法は、各ガイドラインドキュメントを参照してください。

最初のフローの作成

メッセージをパブリッシュおよびコンシュームする Mule アプリケーションの作成を開始するには、最初のフローを作成します。

  1. Studio の ​[Mule Palette (Mule パレット)]​ ビューで、​[HTTP] > [Listener]​ を選択します。

  2. HTTP ​Listener​ ソースを Studio キャンバスにドラッグします。

  3. [HTTP Listener (HTTP リスナー)]​ 設定画面で、必要に応じて ​[Display Name (表示名)]​ 項目の値を変更します。

  4. [Path (パス)]​ 項目を ​/test​ に設定します。

  5. [Connector configuration (コネクタ設定)]​ 項目の横にあるプラス記号 (​+​) をクリックして、アプリケーション内の HTTP Listener のすべてのインスタンスで使用できるグローバル要素を設定します。

  6. [General (一般)]​ タブで、次のパラメーターを設定します。

    • Host (ホスト)​: All Interfaces [0.0.0.0] (default)

    • Port (ポート)​: 8081

  7. [OK]​ をクリックします。

  8. [Publish consume]​ 操作を ​[HTTP Listener]​ の右にドラッグします。

  9. [Connector configuration (コネクタ設定)]​ 項目の横にあるプラス記号 (​+​) をクリックして、アプリケーション内の ​[Publish consume]​ のすべてのインスタンスで使用できるグローバル要素を設定します。

  10. [Connection (接続)]​ 項目を ​[ActiveMQ Connection (ActiveMQ 接続)]​ に設定します。

  11. [Required Libraries (必須のライブラリ)]​ セクションで、​[Configure…​ (設定…​)]​ をクリックして連動関係をインストールします。

  12. [Specification (仕様)]​ 項目と ​[Caching strategy (キャッシュ戦略)]​ 項目をデフォルト値に設定します。

  13. [Username (ユーザー名)]​ 項目と ​[Password (パスワード)]​ 項目の両方を ​admin​ に設定します。

  14. [Factory configuration (ファクトリー設定)]​ を ​[Edit inline (インライン編集)]​ に設定します。

  15. [Broker url (ブローカー URL)]​ 項目を ​tcp://localhost:61616​ に設定します。

  16. [OK]​ をクリックします。

  17. [Publish consume] > [General (一般)]​ セクションで、​[Destination (宛先)]​ 項目を ​Q1​ に設定して 2 つのキューのうちの最初のキューを作成します。このキューでは HTTP Listener のペイロードを含むメッセージをパブリッシュします。

  18. この例では、​[Message (メッセージ)]​ セクションで ​[Correlation ID (相関 ID)]​ を ​0303456​ に設定します。

  19. [Reply To (応答先)]​ 項目を ​[Edit inline (インライン編集)]​ に設定します。

  20. [Destination Name (宛先名)]​ 項目を ​Q2​ に設定して 2 番目の宛先キューを作成します。このキューでは 2 番目のフローの ​Publish​ 操作で送信されたメッセージをコンシュームします。

  21. [Consume Configuration (コンシューム設定)]​ セクションで ​[Ack mode (肯定応答モード)]​ 項目を ​[IMMEDIATE (即時)]​ に設定します。

Anypoint Studio での Publish Consume 操作の設定
Figure 2. Publish Consume 操作の設定
  1. [Set Payload]​ コンポーネントを ​[Publish consume]​ の右にドラッグします。

  2. [Value (値)]​ 項目を ​It Works!​ に設定します。

2 つ目のフローの作成

引き続き 2 番目のフローを追加してアプリケーションを作成します。

  1. JMS の ​[On New Message]​ ソースを最初のフローの下にドラッグします。

  2. 最初のフローを作成するステップ 9 ~ 16 を繰り返して、ソースのグローバル要素を作成します。

  3. [On New Message]​ 設定画面で、​[Destination (宛先)]​ 項目を ​Q1​ に設定し、最初のフローの ​Publish consume​ 操作で設定した宛先をソースでリスンすることを示します。

  4. JMS の ​[Publish]​ 操作をソースの右にドラッグします。

  5. [Connector configuration (コネクタ設定)]​ を ​On New Message​ ソースと同じ設定にします。

  6. [Destination (宛先)]​ 項目を ​Q2​ に設定し、メッセージを最初のフローの ​Publish consume​ 操作の宛先キューにパブリッシュすることを示します。

  7. [Message (メッセージ)]​ セクションで ​[Correlation ID (相関 ID)]​ 項目を ​attributes.headers.correlationId​ に設定し、要求メッセージと応答メッセージを相関します。

Mule アプリケーションの実行およびテスト

Mule アプリケーションを作成したら、それを実行してテストします。

  1. Studio で、Mule アプリケーションを保存します。

  2. Package Explorer​ でプロジェクト名をクリックし、​[Run (実行)]​ > ​[Run As (別のユーザーとして実行)]​ > ​[Mule Application (Mule アプリケーション)]​ をクリックします。

  3. ブラウザーを開き、​http://0.0.0.0:8081/test​ と入力します。

フロー内でのパブリッシュおよびコンシュームの XML

この例のフローをすばやく Mule アプリケーションに読み込むには、次のコードを Studio XML エディターに貼り付けます。

<?xml version="1.0" encoding="UTF-8"?>

<mule xmlns:jms="http://www.mulesoft.org/schema/mule/jms" xmlns:http="http://www.mulesoft.org/schema/mule/http"
	xmlns="http://www.mulesoft.org/schema/mule/core"
	xmlns:doc="http://www.mulesoft.org/schema/mule/documentation" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd
http://www.mulesoft.org/schema/mule/http http://www.mulesoft.org/schema/mule/http/current/mule-http.xsd
http://www.mulesoft.org/schema/mule/jms http://www.mulesoft.org/schema/mule/jms/current/mule-jms.xsd">
	<http:listener-config name="HTTP_Listener_config" >
		<http:listener-connection host="0.0.0.0" port="8081" />
	</http:listener-config>
	<jms:config name="JMS_Config_ActiveMQ" >
		<jms:active-mq-connection username="admin" password="admin">
			<jms:factory-configuration brokerUrl="tcp://localhost:61616"/>
		</jms:active-mq-connection>
	</jms:config>
	<jms:config name="JMS_Config_ActiveMQ_2" >
		<jms:active-mq-connection username="admin" password="ßadmin">
			<jms:factory-configuration brokerUrl="tcp://localhost:61616"/>
		</jms:active-mq-connection>
	</jms:config>

	<flow name="demo-jms-attributesFlow" >
		<http:listener config-ref="HTTP_Listener_config" path="/test"/>
		<jms:publish-consume destination="Q1" config-ref="JMS_Config_ActiveMQ">
			<jms:message correlationId="0303456" >
				<jms:reply-to destination="Q2" />
			</jms:message>
			<jms:consume-configuration ackMode="IMMEDIATE" />
		</jms:publish-consume>
		<set-payload value="It Works!" doc:name="Set Payload" />
	</flow>
	<flow name="demo-jms-attributesFlow1" >
		<jms:listener doc:name="On New Message" config-ref="JMS_Config_ActiveMQ_2" destination="Q1"/>
		<jms:publish doc:name="Publish" config-ref="JMS_Config_ActiveMQ_2" destination="Q2">
			<jms:message correlationId="#[attributes.headers.correlationId]" />
		</jms:publish>
	</flow>
</mule>