MQTT Connector を使用した新規メッセージのリスン

次の例では、1 つ以上のトピック検索条件を使用して新しい受信メッセージをリスンできる MQTT 用 Anypoint Connector (MQTT Connector) の ​On New Message​ ソースを設定します。各トピック検索条件には特定のサービス品質 (QoS) が設定されています。

次の例では、​On New Message​ ソースを設定します。

  1. Studio > ​[Mule Palette (Mule パレット)]​ で、​[MQTT3] > [On New Message]​ を選択します。

  2. [On New Message]​ を Studio キャンバスにドラッグします。

  3. [On New Message]​ 設定画面で、必要に応じて ​[Display Name (表示名)]​ 項目の値を変更します。

  4. [Connector configuration (コネクタ設定)]​ 項目の横にあるプラス記号 (​+​) をクリックして、アプリケーション内のすべてのソースのインスタンスで使用できるグローバル要素を設定します。

  5. [MQTT3 Config (MQTT3 設定)]​ > ​[Connection (接続)]​ で、この設定に指定する接続種別のいずれかを選択します。

    • MQTT3 URL Connection (MQTT3 URL 接続)

    • MQTT3 Fail-Over Connection (MQTT3 フェールオーバー接続)

    • MQTT3 Form Connection (MQTT3 フォーム接続)

  1. ブローカーへの MQTT 接続を識別する一意の直観的な ​[Client ID (クライアント ID)]​ を指定します。

  2. [General (一般)]​ タブで接続情報 (ブローカーで必要なライブラリや接続の詳細オプションなど) も指定します。

  3. [LWT]​ タブで、必要に応じて LWT (Last Will and Testament) メッセージを指定します。

  4. [TLS/SSL]​ タブで、必要に応じて TLS 設定を指定します。

  5. [Advanced (詳細)]​ タブで、必要に応じて再接続戦略を指定します。

  6. [OK]​ をクリックして、ウィンドウを閉じます。

  7. [On New Message]​ 設定画面の ​[Topics (トピック)]​ で ​[Edit inline (インライン編集)]​ を選択し、リスナーでサブスクライブするトピックをリストします。

  8. プラス記号 (​+​) をクリックして、トピックを設定します。

  9. [Topic (トピック)]​ 設定画面で、トピックの単一レベルまたはマルチレベルのサブスクリプションを表す ​[Topic filter (トピック検索条件)]​ を設定します。

  10. [QoS]​ を次のいずれかのサービス品質レベルに設定します。

    • AT_MOST_ONCE

    • AT_LEAST_ONCE

    • EXACTLY_ONCE

  11. [Finish (完了)]​ をクリックします。

[Topic (トピック)] 検索条件が設定されている Studio での MQTT の [On New Message] ソースの設定

[Configuration XML (設定 XML)]​ エディターで、​<mqtt3:listener>​、​<mqtt3:topic>​、および ​topicFilter​ の設定は次のように記述されます。

    <flow name="listenerAuthorQuotes">
        <mqtt3:listener config-ref="MQTT_Config">
            <mqtt3:topics>
                <mqtt3:topic topicFilter="quotes/terryPratchett" qos="EXACTLY_ONCE"/>
                <mqtt3:topic topicFilter="quotes/neilGaiman" qos="AT_LEAST_ONCE" />
                <mqtt3:topic topicFilter="quotes/ianMcEwan" qos="AT_MOST_ONCE" />
            </mqtt3:topics>
        </mqtt3:listener>
    </flow>

トピック検索条件での単一レベルのワイルドカードの設定

複数のトピックをリスンするには、単一レベルのワイルドカードを設定します。単一レベルのワイルドカード (​+​) を使用すると、サブスクライバーは、特定の構造に一致するすべてのトピックにパブリッシュされたメッセージを受信できます。

次の例では、構造 ​quotes/+/authors​ を含む ​topicFilter​ パラメーターを設定します。

    <flow name="quotes">
        <mqtt3:listener config-ref="${config}">
            <mqtt3:topics>
                <mqtt3:topic topicFilter="quotes/+/authors"/>
            </mqtt3:topics>
        </mqtt3:listener>

        <logger level="DEBUG" message="A quote has been published to #[attributes.topic]: #[payload]"/>
    </flow>

次のトピックにパブリッシュされたメッセージではリスナーがトリガーされます。

  • quotes/british/authors

  • quotes/american/authors

ただし、次のトピックにパブリッシュされたメッセージではリスナーはトリガーされません。

  • names/british/authors

  • quotes/american/writers

トピック検索条件でのマルチレベルのワイルドカードの設定

同じルート (ハッシュ ​#​ 記号の前にあるすべて) を共有するすべてのトピックをリスナーでサブスクライブできるようにするには、マルチレベルのワイルドカードを設定します。

次の例では、構造 ​quotes/england/#​ を含む ​topicFilter​ パラメーターを設定します。

    <flow name="listenerArgentinaTemperature">
        <mqtt3:listener config-ref="${config}">
            <mqtt3:topics>
                <mqtt3:topic topicFilter="quotes/england/#"/>
            </mqtt3:topics>
        </mqtt3:listener>

        <logger level="DEBUG" message="A quote has been published to #[attributes.topic]: #[payload]"/>
    </flow>

次のトピックにパブリッシュされたメッセージではリスナーがトリガーされます。

  • quotes/england/authors/terryPratchett

  • quotes/england/authors/neilGaiman

  • quotes/england/actors

ただし、次のトピックにパブリッシュされたメッセージではリスナーはトリガーされません。

  • quotes/american/actors

  • phrases/england/authors/neilGaiman

設定を共有するリスナー

設定要素を共有するリスナーは、接続とクライアント ID も共有します。特に一方のリスナーが停止し、もう一方のリスナーが停止していない場合は、この点を覚えておくことが重要です。

次の例では、フロー ​listenerReaderA​ と ​listenerReaderB​ のリスナーが同じ接続 ​MQTT_Config​ 設定とクライアント ID を共有します。ブローカーの観点では、どちらのリスナーも識別不可能です。

<mqtt3:config name="MQTT_Config">
    <mqtt3:connection url="tcp://127.0.0.1:1883" >
        <mqtt3:client-id-generator>
            <mqtt3:client-id-random-suffix-generator clientId="smart-bentley-123" />
        </mqtt3:client-id-generator>
    </mqtt3:connection>
</mqtt3:config>

<flow name="listenerReaderA">
   <mqtt3:listener config-ref="MQTT_Config">
            <mqtt3:topics>
                <mqtt3:topic topicFilter="shakespeare"/>
                <mqtt3:topic topicFilter="terryPratchett"/>
            </mqtt3:topics>
   </mqtt3:listener>
   <logger level="INFO"  message="Received message '#[payload]' with at topic #[attributes.topic] with qos #[attributes.qos]">
</flow>
<flow name="listenerReaderB">
    <mqtt3:listener config-ref="MQTT_Config">
            <mqtt3:topics>
                <mqtt3:topic topicFilter="neilGaiman"/>
                <mqtt3:topic topicFilter="terryPratchett"/>
            </mqtt3:topics>
   </mqtt3:listener>
   <logger level="INFO"  message="Received message '#[payload]' with at topic #[attributes.topic] with qos #[attributes.qos]">
</flow>

この例でわかるように、リスナーはトピック ​terryPratchett​ のサブスクリプションも共有します。トピックを最初にサブスクライブしたリスナーがサブスクリプションのサービス品質 (QoS) を設定できます。該当のトピックに対して、1 つのサービス品質レベルを持つ 1 つのサブスクリプションのみが存在できます。

listenerReaderB​ フローが停止しても、​listenerReaderA​ フローが ​terryPratchett​ トピックのメッセージを引き続き受信して処理します。設定要素で ​cleanSession=false​ が指定されていても、​listenerReaderB​ がオフラインの間に処理されたメッセージは ​listenerReaderB​ に再送信されません。