Listen for New Messages with MQTT Connector

In these examples, you configure the Anypoint Connector for MQTT (MQTT Connector) On New Message source, which listens for new incoming messages on one or more topic filters. Each topic filter has its own quality of service (QoS) level.

In the following example, you configure the On New Message source:

  1. In the Mule Palette view of Studio, select MQTT3 > On New Message.

  2. Drag On New Message to the Studio canvas.

  3. In the On New Message configuration screen, optionally change the value of the Display Name field.

  4. Click the plus sign (+) next to the Connector configuration field to configure a global element that can be used by all instances of the source in the app.

  5. In the MQTT3 Config window, for Connection, select one of the connection types to provide to this configuration:

    • MQTT3 URL Connection

    • MQTT3 Fail-Over Connection

    • MQTT3 Form Connection

  6. Specify a unique and intuitive Client ID to identify an MQTT connection to a broker.

  7. On the General tab, also specify connection information such as required libraries for the broker and advanced connection options.

  8. On the LWT tab, optionally specify a Last Will and Testament (LWT) message.

  9. On the TLS/SSL tab, optionally specify a TLS configuration.

  10. On the Advanced tab, optionally specify a reconnection strategy.

  11. Click OK to close the window.

  12. In the On New Message configuration screen, in Topics, select Edit inline to list the topics that the listener subscribes to.

  13. Click the plus sign (+) to configure a topic.

  14. In the Topic configuration screen, set Topic filter to a value that represents a single-level or multi-level subscription to a topic.

  15. Set QoS to one of the following quality of service levels:

    • AT_MOST_ONCE

    • AT_LEAST_ONCE

    • EXACTLY_ONCE

  16. Click Finish.

MQTT On New Message source configuration in Studio with Topic filters configured

In the Configuration XML editor, the <mqtt3:listener>, <mqtt3:topic>, and topicFilter configurations look like this:

    <flow name="listenerAuthorQuotes">
        <mqtt3:listener config-ref="MQTT_Config">
            <mqtt3:topics>
                <mqtt3:topic topicFilter="quotes/terryPratchett" qos="EXACTLY_ONCE"/>
                <mqtt3:topic topicFilter="quotes/neilGaiman" qos="AT_LEAST_ONCE" />
                <mqtt3:topic topicFilter="quotes/ianMcEwan" qos="AT_MOST_ONCE" />
            </mqtt3:topics>
        </mqtt3:listener>
    </flow>

Configure Single-Level Wildcards in Topic Filters

Configure single-level wildcards to listen to multiple topics. Single-level wildcards (+) enable the subscriber to receive messages published to all topics that match a specific structure.

In the following example, you configure the topicFilter parameter with the structure quotes/+/authors:

    <flow name="quotes">
        <mqtt3:listener config-ref="${config}">
            <mqtt3:topics>
                <mqtt3:topic topicFilter="quotes/+/authors"/>
            </mqtt3:topics>
        </mqtt3:listener>

        <logger level="DEBUG" message="A quote has been published to #[attributes.topic]: #[payload]"/>
    </flow>

Messages published to the following topics trigger the listener:

  • quotes/british/authors

  • quotes/american/authors

However, messages published to the following topics do not trigger the listener:

  • names/british/authors

  • quotes/american/writers

Configure Multi-Level Wildcards in Topic Filters

Configure multi-level wildcards to enable the listener to subscribe to all topics that share the same root, that is, everything that comes before the hash (#) symbol.

In the following example, you configure the topicFilter parameter with the structure quotes/england/#:

    <flow name="listenerArgentinaTemperature">
        <mqtt3:listener config-ref="${config}">
            <mqtt3:topics>
                <mqtt3:topic topicFilter="quotes/england/#"/>
            </mqtt3:topics>
        </mqtt3:listener>

        <logger level="DEBUG" message="A quote has been published to #[attributes.topic]: #[payload]"/>
    </flow>

Messages published to the following topics trigger the listener:

  • quotes/england/authors/terryPratchett

  • quotes/england/authors/neilGaiman

  • quotes/england/actors

However, messages published to the following topics do not trigger the listener:

  • quotes/american/actors

  • phrases/england/authors/neilGaiman
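The two wildcard types can also be combined in a single topic filter. The following is a minimal sketch rather than an example from the connector itself: the flow name listenerAllAuthorQuotes and the filter quotes/+/authors/# are hypothetical, and the configuration reference reuses the ${config} placeholder from the examples above. It assumes standard MQTT matching rules, under which the # wildcard must be the last character of the filter and also matches the parent level itself.

```xml
<!-- Hypothetical flow name and topic filter, for illustration only -->
<flow name="listenerAllAuthorQuotes">
    <mqtt3:listener config-ref="${config}">
        <mqtt3:topics>
            <!-- "+" matches exactly one level; "#" matches the parent level and everything below it -->
            <mqtt3:topic topicFilter="quotes/+/authors/#"/>
        </mqtt3:topics>
    </mqtt3:listener>

    <logger level="DEBUG" message="A quote has been published to #[attributes.topic]: #[payload]"/>
</flow>
```

Under those assumptions, messages published to quotes/england/authors, quotes/england/authors/terryPratchett, and quotes/american/authors/neilGaiman would trigger the listener, while messages published to quotes/england/actors would not.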

Listeners Sharing a Configuration

Listeners that share a configuration element also share a connection and client ID. This is important to remember, especially when one of the listeners is stopped but the other isn’t.

In the following example, the listeners of the flows listenerReaderA and listenerReaderB share the same MQTT_Config configuration and, therefore, the same connection and client ID. From the broker’s perspective, the two listeners are indistinguishable:

    <mqtt3:config name="MQTT_Config">
        <mqtt3:connection url="tcp://127.0.0.1:1883">
            <mqtt3:client-id-generator>
                <mqtt3:client-id-random-suffix-generator clientId="smart-bentley-123"/>
            </mqtt3:client-id-generator>
        </mqtt3:connection>
    </mqtt3:config>

    <flow name="listenerReaderA">
        <mqtt3:listener config-ref="MQTT_Config">
            <mqtt3:topics>
                <mqtt3:topic topicFilter="shakespeare"/>
                <mqtt3:topic topicFilter="terryPratchett"/>
            </mqtt3:topics>
        </mqtt3:listener>
        <logger level="INFO" message="Received message '#[payload]' at topic #[attributes.topic] with qos #[attributes.qos]"/>
    </flow>
    <flow name="listenerReaderB">
        <mqtt3:listener config-ref="MQTT_Config">
            <mqtt3:topics>
                <mqtt3:topic topicFilter="neilGaiman"/>
                <mqtt3:topic topicFilter="terryPratchett"/>
            </mqtt3:topics>
        </mqtt3:listener>
        <logger level="INFO" message="Received message '#[payload]' at topic #[attributes.topic] with qos #[attributes.qos]"/>
    </flow>

As the previous XML example shows, both listeners also subscribe to the topic terryPratchett. Whichever listener subscribes to the topic first sets the subscription's quality of service (QoS), because only one subscription with one QoS level can exist for that topic.

If the listenerReaderB flow stops, the listenerReaderA flow still receives and processes the messages for the terryPratchett topic. Even if the configuration element specifies cleanSession=false, the messages processed while listenerReaderB was stopped are not resent to listenerReaderB.
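If each listener needs its own subscription and QoS for a shared topic, one possible approach is to give each listener a separate configuration element. The following is a sketch under that assumption, not a documented recommendation: the configuration names MQTT_Config_A and MQTT_Config_B and the client IDs reader-a and reader-b are hypothetical, and the example reuses only the elements and attributes shown earlier in this section.

```xml
<!-- Hypothetical configuration names and client IDs, for illustration only -->
<mqtt3:config name="MQTT_Config_A">
    <mqtt3:connection url="tcp://127.0.0.1:1883">
        <mqtt3:client-id-generator>
            <mqtt3:client-id-random-suffix-generator clientId="reader-a"/>
        </mqtt3:client-id-generator>
    </mqtt3:connection>
</mqtt3:config>

<mqtt3:config name="MQTT_Config_B">
    <mqtt3:connection url="tcp://127.0.0.1:1883">
        <mqtt3:client-id-generator>
            <mqtt3:client-id-random-suffix-generator clientId="reader-b"/>
        </mqtt3:client-id-generator>
    </mqtt3:connection>
</mqtt3:config>

<!-- Each listener references its own configuration, so each one holds its own subscription -->
<flow name="listenerReaderA">
    <mqtt3:listener config-ref="MQTT_Config_A">
        <mqtt3:topics>
            <mqtt3:topic topicFilter="terryPratchett" qos="EXACTLY_ONCE"/>
        </mqtt3:topics>
    </mqtt3:listener>
    <logger level="INFO" message="Reader A received '#[payload]' at topic #[attributes.topic]"/>
</flow>

<flow name="listenerReaderB">
    <mqtt3:listener config-ref="MQTT_Config_B">
        <mqtt3:topics>
            <mqtt3:topic topicFilter="terryPratchett" qos="AT_LEAST_ONCE"/>
        </mqtt3:topics>
    </mqtt3:listener>
    <logger level="INFO" message="Reader B received '#[payload]' at topic #[attributes.topic]"/>
</flow>
```

With separate configuration elements, the broker would see two clients with different client IDs, so each listener's subscription to terryPratchett and its QoS level would be independent of the other's. The trade-off is a second connection to the broker.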