MQTT Connector を使用したメッセージのリスンとパブリッシュ

次の例では、メッセージをリスンして特定のトピックにパブリッシュするように MQTT 用 Anypoint Connector (MQTT Connector) の ​On New Message​ ソースと ​Publish​ 操作を設定します。

メッセージのリスンとパブリッシュを行う Mule アプリケーションの作成

次の例では、メッセージをリスンして特定のトピックにパブリッシュするように ​On New Message​ ソースと ​Publish​ 操作を設定します。

Studio キャンバスでの MQTT のリスンとパブリッシュの例

リスナーフローの作成

例の最初の部分では、検索条件 ​topic/quotes/#​ に一致するトピックの受信メッセージに対してイベントをトリガーする ​On New Message​ ソースを含むフローを作成し、受信したメッセージを記録する ​Logger​ コンポーネントを追加します。

  1. Studio > ​[Mule Palette (Mule パレット)]​ ビューで、​[MQTT3] > [On New Message]​ を選択します。

  2. [On New Messagee]​ ソースを Studio キャンバスにドラッグします。

  3. [Connector configuration (コネクタ設定)]​ 項目の横にあるプラス記号 (​+​) をクリックして、アプリケーション内のすべてのソースのインスタンスで使用できるグローバル要素を設定します。

  4. [MQTT3 Config (MQTT3 設定)]​ > ​[Connection (接続)]​ で ​[MQTT3 URL Connection (MQTT3 URL 接続)]​ を選択します。

  5. [Client id generator (クライアント ID ジェネレーター)]​ で ​[Client id custom expression generator (クライアント ID カスタム式ジェネレーター)]​ を選択します。

  6. 次の項目を設定します。

    • Client ID (クライアント ID)​: crowley123

    • Username (ユーザー名)​: mosquitto

    • Password (パスワード)​: mosquitto

    • URL​: tcp://0.0.0.0:1883

  7. [LWT]​ をクリックします。

  8. 次の項目を設定します。

    • Topic (トピック)​: topic/lwt

    • Body (本文)​: This is the LWT message

    • Is retained (保持)​: True

  9. [OK]​ をクリックします。

  10. [On New Message]​ 設定画面で、​[Topics (トピック)]​ を ​[Edit inline (インライン編集)]​ に設定します。

  11. プラス記号 (​+​) をクリックして、新しいトピックを追加します。

  12. [Topic filter (トピック検索条件)]​ で ​topic/quotes/#​ を設定します。

  13. [OK]​ をクリックします。

  14. [Logger]​ コンポーネントを ​[On New Message]​ ソースの右にドラッグします。

  15. [Message (メッセージ)]​ を ​Received message '#[payload]' with at topic #[attributes.topic] with qos #[attributes.qos]​ に設定します。

パブリッシャーフローの作成

例の 2 番目の部分では、メッセージをトピック ​topic/quotes/shakespeare​ にパブリッシュし、​listener-flow​ でイベントをトリガーする ​Publish​ 操作を含むフローを作成します。

  1. Studio で HTTP の ​[Listener]​ ソースを 1 番目の ​listener-flow​ の下にドラッグします。

  2. [Path (パス)]​ を ​publish/message​ に設定します。

  3. [Connector configuration (コネクタ設定)]​ 項目の横にあるプラス記号 (​+​) をクリックして、アプリケーション内のすべてのソースのインスタンスで使用できるグローバル要素を設定します。

  4. [Host (ホスト)]​ を ​0.0.0.0​ に、​[Port (ポート)]​ を ​8081​ に設定します。

  5. [OK]​ をクリックします。

  6. MQTT3 の ​[Publisher]​ 操作を HTTP の ​[Listener]​ ソースの横にドラッグします。

  7. [Connector configuration (コネクタ設定)]​ で、​listener-flow​ ファイルで作成された ​MQTT3_Config_1​ 設定を選択します。

  8. 次の項目を設定します。

    • Topic (トピック)​: topic/quotes/shakespeare

    • Message (メッセージ)​: "Uneasy lies the head that wears a crown"

  9. [Set Payload]​ コンポーネントを ​[Publish]​ 操作の右にドラッグします。

  10. [Value (値)]​ を ​SUCCESSFULLY PUBLISHED MESSAGE!​ に設定します。

  11. Mule アプリケーションを保存して実行します。

  12. メッセージをパブリッシュするには、次の curl コマンドを使用して HTTP エンドポイントに要求を送信します。​curl -v --request GET localhost:8081/publish/message

MQTT Connector を使用したメッセージのリスンとパブリッシュの XML

この例のフローをすばやく Mule アプリケーションに読み込むには、次のコードを Studio XML エディターに貼り付けます。

<?xml version="1.0" encoding="UTF-8"?>

<mule xmlns:http="http://www.mulesoft.org/schema/mule/http" xmlns:mqtt3="http://www.mulesoft.org/schema/mule/mqtt3"
	xmlns="http://www.mulesoft.org/schema/mule/core"
	xmlns:doc="http://www.mulesoft.org/schema/mule/documentation" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd
http://www.mulesoft.org/schema/mule/mqtt3 http://www.mulesoft.org/schema/mule/mqtt3/current/mule-mqtt3.xsd
http://www.mulesoft.org/schema/mule/http http://www.mulesoft.org/schema/mule/http/current/mule-http.xsd">

	<mqtt3:config name="MQTT3_Config1" >
		<mqtt3:connection username="mosquitto" password="mosquitto" url="tcp://0.0.0.0:1883" >
			<mqtt3:client-id-generator >
	            <mqtt3:client-id-custom-expression-generator clientId="crowley123" />
	        </mqtt3:client-id-generator>
			<mqtt3:last-will-and-testament topic="topic/lwt" body="This is the LWT message" isRetained="true" />
		</mqtt3:connection>
	</mqtt3:config>
	<http:listener-config name="HTTP_Listener_config" >
		<http:listener-connection host="0.0.0.0" port="8081" />
	</http:listener-config>
	<flow name="listener-flow" >
		<mqtt3:listener doc:name="On New Message" config-ref="MQTT3_Config1">
			<mqtt3:topics >
				<mqtt3:topic topicFilter="topic/quotes/#" />
			</mqtt3:topics>
		</mqtt3:listener>
		<logger level="INFO" message="Received message '#[payload]' with at topic #[attributes.topic] with qos #[attributes.qos]" />
	</flow>
	<flow name="publisher-flow" >
		<http:listener config-ref="HTTP_Listener_config" path="publish/message"/>
		<mqtt3:publish config-ref="MQTT3_Config1" topic="topic/quotes/shakespeare">
			<mqtt3:message ><![CDATA[#["Uneasy lies the head that wears a crown"]]]></mqtt3:message>
		</mqtt3:publish>
		<set-payload value="SUCCESSFULLY PUBLISHED MESSAGE!" />
	</flow>
</mule>

Mule アプリケーションの作成と LWT メッセージトピックのサブスクライブ

次の例では、前述のリスナーフローを使用し、1 番目のフローと同じ LWT メッセージトピックをサブスクライブする 2 番目のリスナーフローを作成します。Mule アプリケーションがクラッシュし、1 番目のリスナーフローが切断されると、MQTT ブローカーは設定済みのトピックに LWT メッセージを送信し、メッセージを保持します。Mule アプリケーションが再起動すると、2 番目のリスナーフローは、保持された LWT メッセージをリスンして記録します。

Studio キャンバスでの MQTT リスナーフローの例

1 番目のリスナーフローの作成

例の最初の部分では、検索条件 ​topic/quotes/#​ に一致するトピックの受信メッセージに対してイベントをトリガーする ​On New Message​ ソースを含むフローを作成し、受信したメッセージを記録する ​Logger​ コンポーネントを追加します。

  1. Studio の ​[Mule Palette (Mule パレット)]​ ビューで、​[MQTT3] > [On New Message]​ を選択します。

  2. [On New Messagee]​ ソースを Studio キャンバスにドラッグします。

  3. [Connector configuration (コネクタ設定)]​ 項目の横にあるプラス記号 (​+​) をクリックして、アプリケーション内のすべてのソースのインスタンスで使用できるグローバル要素を設定します。

  4. [MQTT3 Config (MQTT3 設定)]​ ウィンドウの ​[Connection (接続)]​ で ​[MQTT3 URL Connection (MQTT3 URL 接続)]​ を選択します。

  5. [Client id generator (クライアント ID ジェネレーター)]​ で ​[Client id custom expression generator (クライアント ID カスタム式ジェネレーター)]​ を選択します。

  6. 次の項目を設定します。

    • Client ID (クライアント ID)​: crowley123

    • Username (ユーザー名)​: mosquitto

    • Password (パスワード)​: mosquitto

    • URL​: tcp://0.0.0.0:1883

  7. [LWT]​ タブをクリックします。

  8. 次の項目を設定します。

    • Topic (トピック)​: topic/lwt

    • Body (本文)​: This is the LWT message

    • Is retained (保持)​: True

  9. [OK]​ をクリックします。

  10. [On New Message]​ 設定画面で、​[Topics (トピック)]​ を ​[Edit inline (インライン編集)]​ に設定します。

  11. プラス記号 (​+​) をクリックして、新しいトピックを追加します。

  12. [Topic filter (トピック検索条件)]​ で ​topic/quotes/#​ を設定します。

  13. [OK]​ をクリックします。

  14. [Logger]​ コンポーネントを ​[On New Message]​ ソースの右にドラッグします。

  15. [Message (メッセージ)]​ を ​Received message '#[payload]' with at topic #[attributes.topic] with qos #[attributes.qos]​ に設定します。

2 番目のリスナーフローの作成

例の 2 番目の部分では、トピック ​topic/lwt​ をサブスクライブする別の ​On New Message​ ソースを追加することで 2 番目のリスナーフローを作成し、受信したメッセージを記録する ​Logger​ コンポーネントを追加します。

  1. Studio で、別の ​[On New Message]​ ソースを 1 番目の ​listener-flow​ の下にドラッグします。

  2. [Connector configuration (コネクタ設定)]​ 項目の横にあるプラス記号 (​+​) をクリックして、アプリケーション内のすべてのソースのインスタンスで使用できるグローバル要素を設定します。

  3. [MQTT3 Config (MQTT3 設定)]​ ウィンドウの ​[Connection (接続)]​ で ​[MQTT3 URL Connection (MQTT3 URL 接続)]​ を選択します。

  4. [Client id generator (クライアント ID ジェネレーター)]​ で ​[Client id random suffix generator (クライアント ID ランダムサフィックスジェネレーター)]​ を選択します。

  5. 次の項目を設定します。

    • Client ID (クライアント ID)​: azfell123

    • Username (ユーザー名)​: mosquitto

    • Password (パスワード)​: mosquitto

    • URL​: tcp://0.0.0.0:1884

  6. [OK]​ をクリックします。

  7. [On New Message]​ 設定画面で、​[Topics (トピック)]​ を ​[Edit inline (インライン編集)]​ に設定します。

  8. プラス記号 (​+​) をクリックして、新しいトピックを追加します。

  9. [Topic filter (トピック検索条件)]​ で ​topic/lwt​ を設定します。

  10. [OK]​ をクリックします。

  11. [Logger]​ コンポーネントを ​[On New Message]​ ソースの右にドラッグします。

  12. [Message (メッセージ)]​ を ​Received message '#[payload]' with at topic #[attributes.topic] with qos #[attributes.qos]​ に設定します。

  13. 保存して Mule アプリケーションを実行します。

  14. Mule アプリケーションのプロセス ID を探し、​kill -9 <process-id>​ のようなコマンドを使用してターミナルでアプリケーションを強制終了することで、アプリケーションのクラッシュをシミュレーションします。
    Mule アプリケーションがクラッシュするため、1 番目のリスナーフローは切断されます。

  15. Mule アプリケーションを再起動します。
    MQTT ブローカーは LWT メッセージを保持し、2 番目のリスナーフローは、1 番目のリスナーフローで設定された LWT メッセージを記録します。

LWT メッセージトピックをサブスクライブするメッセージをリスンするための XML

この例のフローをすばやく Mule アプリケーションに読み込むには、次のコードを Studio XML エディターに貼り付けます。

<?xml version="1.0" encoding="UTF-8"?>

<mule xmlns:http="http://www.mulesoft.org/schema/mule/http" xmlns:mqtt3="http://www.mulesoft.org/schema/mule/mqtt3"
	xmlns="http://www.mulesoft.org/schema/mule/core"
	xmlns:doc="http://www.mulesoft.org/schema/mule/documentation" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd
http://www.mulesoft.org/schema/mule/mqtt3 http://www.mulesoft.org/schema/mule/mqtt3/current/mule-mqtt3.xsd
http://www.mulesoft.org/schema/mule/http http://www.mulesoft.org/schema/mule/http/current/mule-http.xsd">
	<mqtt3:config name="MQTT3_Config1" >
		<mqtt3:connection username="mosquitto" password="mosquitto" url="tcp://0.0.0.0:1883" >
			<mqtt3:client-id-generator>
	            <mqtt3:client-id-custom-expression-generator clientId="crowley123"/>
	        </mqtt3:client-id-generator>
			<mqtt3:last-will-and-testament topic="topic/lwt" body="This is the LWT message" isRetained="true" />
		</mqtt3:connection>
	</mqtt3:config>
	<http:listener-config >
		<http:listener-connection host="0.0.0.0" port="8081" />
	</http:listener-config>
	<mqtt3:config name="MQTT3_Config2" >
		<mqtt3:connection username="mosquitto" password="mosquitto" url="tcp://0.0.0.0:1884" />
		<mqtt3:client-id-generator>
					<mqtt3:client-id-random-suffix-generator clientId="azfell123" />
			</mqtt3:client-id-generator>
	</mqtt3:config>
	<flow name="listener-flow" >
		<mqtt3:listener config-ref="MQTT3_Config1">
			<mqtt3:topics >
				<mqtt3:topic topicFilter="topic/quotes/#" />
			</mqtt3:topics>
		</mqtt3:listener>
		<logger level="INFO" message="Received message '#[payload]' with at topic #[attributes.topic] with qos #[attributes.qos]"/>
	</flow>
	<flow name="listener-flow2" >
		<mqtt3:listener config-ref="MQTT3_Config2">
			<mqtt3:topics >
				<mqtt3:topic topicFilter="topic/lwt" />
			</mqtt3:topics>
		</mqtt3:listener>
		<logger level="INFO" message="Received message '#[payload]' with at topic #[attributes.topic] with qos #[attributes.qos]"/>
	</flow>
</mule>