Anypoint MQ Connector 3.x Migration Guide - Mule 4

Anypoint MQ Connector version 3.0 includes new features, such as simplified configuration, standardized endpoints, and non-blocking functionality.

Global Configuration

In version 2.x of this connector, default-subscriber-config includes these parameters:

<anypoint-mq:default-subscriber-config
    name="Anypoint_MQ_Default_subscriber"
    acknowledgementMode="IMMEDIATE"
    acknowledgementTimeout="10000" <!-- (1) -->
    maxRedelivery="2"              <!-- (2) -->
    pollingTime="2000"
    fetchSize="5"                  <!-- (3) -->
    fetchTimeout="5000"            <!-- (3) -->
    frequency="2000">              <!-- (3) -->
  <anypoint-mq:connection          <!-- (4) -->
    clientId="${client.id}"
    clientSecret="${client.secret}"/>
  <anypoint-mq:circuit-breaker     <!-- (5) -->
    circuitName="InvoiceSettings"
    onErrorTypes="JMS:TIMEOUT"
    errorsThreshold="10" tripTimeout="1"
    tripTimeoutUnit="MINUTES" />
</anypoint-mq:default-subscriber-config>

In version 3.x of this connector, the config element contains only the connection settings:

<anypoint-mq:config name="Anypoint_MQ_Config">
    <anypoint-mq:connection
        clientId="${client.id}"
        clientSecret="${client.secret}" />
</anypoint-mq:config>

This table lists the 2.x config elements and their 3.x equivalents.

| Element | Version 2.x Behavior | Version 3.x Equivalent |
|---|---|---|
| (1) - Acknowledgment | Global acknowledgment settings for the Anypoint MQ Subscriber sources. | Configure for each Subscriber instance. |
| (2) - Max Redelivery | Redelivery filter for Anypoint MQ messages, based on the redeliveryCount header. | Deprecated and no longer available as a standalone filter. Use the "DLQ + Idempotent Filter" pattern instead. |
| (3) - Message Fetch | All parameters related to how messages are retrieved from the Anypoint MQ service, including prefetch configuration. | Configure all the fetching strategy parameters on each Subscriber under the Subscriber Type parameter. |
| (4) - Connection | Connection parameters. | Unchanged. Configure the connection settings the same way as in version 2.x. |
| (5) - Circuit Breaker | Circuit Breaker configuration for the Anypoint MQ Subscriber source. | Handle each Circuit Breaker instance either as a standalone element referenced by each Anypoint MQ Subscriber or as a private Circuit Breaker for the Subscriber source that declares it. |
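
As a rough illustration of how these mappings combine, the 2.x default-subscriber-config shown at the start of this section might be migrated along the following lines. This is a sketch, not an authoritative mapping. It reuses only elements and attributes that appear elsewhere in this guide. The flow name and the destination name queue are placeholders, the 10-second timeout restates the 2.x value of 10000 milliseconds, and maxLocalMessages="15" assumes you want to keep the 2.x buffer size of three times fetchSize="5". Max Redelivery (2) has no direct attribute equivalent, so it is omitted.

```xml
<!-- (4) Connection: the config element now holds only the connection settings -->
<anypoint-mq:config name="Anypoint_MQ_Config">
    <anypoint-mq:connection
        clientId="${client.id}"
        clientSecret="${client.secret}"/>
</anypoint-mq:config>

<!-- (5) Circuit breaker: standalone global element, referenced by name -->
<anypoint-mq:circuit-breaker
    name="InvoiceSettings"
    onErrorTypes="JMS:TIMEOUT"
    errorsThreshold="10"
    tripTimeout="1"
    tripTimeoutUnit="MINUTES"/>

<!-- Hypothetical flow name; "queue" is a placeholder destination -->
<flow name="migratedSubscriberFlow">
    <!-- (1) Acknowledgment: now set on each Subscriber instance -->
    <anypoint-mq:subscriber config-ref="Anypoint_MQ_Config"
        destination="queue"
        acknowledgementMode="IMMEDIATE"
        acknowledgementTimeout="10"
        acknowledgementTimeoutUnit="SECONDS"
        circuitBreaker="InvoiceSettings">
        <!-- (3) Message fetch: configured under the Subscriber Type -->
        <anypoint-mq:subscriber-type>
            <!-- Assumed value: 3 x the 2.x fetchSize of 5 -->
            <anypoint-mq:prefetch maxLocalMessages="15"/>
        </anypoint-mq:subscriber-type>
    </anypoint-mq:subscriber>
    <logger level="INFO" message="#[payload]"/>
</flow>
```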

Subscriber Listener

In version 3.x of this connector, the Subscriber message source configuration for message listening has changed.

For information about the new subscriber, see Anypoint MQ Subscriber Source.

Message Acknowledgment

In version 2.x of this connector, the acknowledgment mode of the Subscriber source is defined at the configuration level:

<anypoint-mq:default-subscriber-config name="Anypoint_MQ_Default_subscriber"
       acknowledgementMode="MANUAL" acknowledgementTimeout="120000">
    <anypoint-mq:connection clientId="${client.id}" clientSecret="${client.secret}"/>
</anypoint-mq:default-subscriber-config>

In version 3.x, these parameters are part of the Subscriber source itself and affect only that single Subscriber instance. The semantics of the acknowledgment parameters have not changed. Here is the equivalent configuration of the Subscriber source, with the 2.x timeout of 120000 milliseconds expressed as 2 minutes:

<anypoint-mq:subscriber config-ref="Anypoint_MQ_Config"
    destination="queue"
    acknowledgementMode="MANUAL"
    acknowledgementTimeout="2"
    acknowledgementTimeoutUnit="MINUTES"/>

Prefetch Mode

In version 2.x of this connector, prefetch mode is enabled by default, and you specify all related parameters at the configuration level:

<anypoint-mq:default-subscriber-config
      name="Anypoint_MQ_Default_subscriber"
      pollingTime="2000" fetchSize="5"
      fetchTimeout="5000" frequency="2000">
    <anypoint-mq:connection clientId="${client.id}"
      clientSecret="${client.secret}"/>
</anypoint-mq:default-subscriber-config>

Version 3.x includes a new concept called Subscriber Type, in which you control the prefetch configuration.

<anypoint-mq:subscriber config-ref="Anypoint_MQ_Config"
    destination="queue">
   <anypoint-mq:subscriber-type>
     <anypoint-mq:prefetch maxLocalMessages="20"/>
   </anypoint-mq:subscriber-type>
</anypoint-mq:subscriber>

This table lists the 2.x prefetch parameters and their 3.x equivalents.

| Version 2.x Parameter | Description | Version 3.x Equivalent | Description |
|---|---|---|---|
| fetchSize | Declared at the config level, controls the size of the local message buffer, which is calculated as three times the fetchSize value. | maxLocalMessages | Declared at the Subscriber level, explicitly controls the maximum size that the local message buffer can have. This parameter serves as a target number, attempting to fetch as many messages as needed to fill the buffer completely. |
| fetchTimeout | Controls the time waiting for a response from the service. | Deprecated | This parameter is no longer available. The connector always uses the maximum long-polling value. |
| frequency | Controls when to recheck for messages after the queue is empty. | Deprecated | This parameter is no longer available. The connector handles the frequency internally, fixed at 1 second. |
| pollingTime | Used only when fetchSize is 0, effectively disabling the prefetch mode and enabling polling mode. | Deprecated | This parameter is replaced by the more powerful polling subscriber type. |

Polling Mode

Version 2.x of the Anypoint MQ Connector allowed you to disable the prefetch mode by setting the fetchSize parameter to 0 and then using pollingTime as a fixed-frequency polling scheduler.

In version 3.x of this connector, the polling mode of the Subscriber source is simplified and normalized. You can use the schedulers provided by Mule runtime engine out of the box, either as fixed-frequency or cron:

  • fixed-frequency

    <anypoint-mq:subscriber config-ref="Anypoint_MQ_Config"
      destination="queue">
       <anypoint-mq:subscriber-type >
          <anypoint-mq:polling fetchSize="9">
             <scheduling-strategy >
                <fixed-frequency frequency="1" timeUnit="SECONDS" />
             </scheduling-strategy>
          </anypoint-mq:polling>
       </anypoint-mq:subscriber-type>
    </anypoint-mq:subscriber>
  • cron

    <anypoint-mq:subscriber destination="queue"
        config-ref="Anypoint_MQ_Config">
        <anypoint-mq:subscriber-type>
            <anypoint-mq:polling fetchSize="9">
                <scheduling-strategy>
                    <cron expression="0 * 14 * * ?"
                    timeZone="America/Los_Angeles"/>
                </scheduling-strategy>
            </anypoint-mq:polling>
        </anypoint-mq:subscriber-type>
    </anypoint-mq:subscriber>

This table lists the 2.x polling parameters and their 3.x equivalents.

| Version 2.x Parameter | Description | Version 3.x Equivalent | Description |
|---|---|---|---|
| fetchSize | Declared at the config level, controls the size of the local message buffer, which is calculated as three times the fetchSize value. | fetchSize | Declared at the Subscriber level, sets the maximum number (1-10) of messages to fetch on each polling execution. The default is 10. |
| fetchTimeout | Limits the wait time for a response from the service. | Deprecated | This parameter is no longer available. The connector always uses the maximum long-polling value. |
| frequency | Controls when to recheck for messages after the queue is empty. | Deprecated | This parameter is no longer available. The connector uses a fixed frequency of 1 second. |
| pollingTime | Used only when fetchSize is 0, effectively disabling the prefetch mode and enabling polling mode. | Deprecated | This parameter is replaced by the more powerful polling subscriber type. |

Circuit Breaker

The circuit breaker declaration is no longer in default-subscriber-config and is either a global element or an inline declaration.

In version 2.x of this connector, you declared the circuit breaker as part of the Subscriber configuration and then referenced it across multiple Subscriber sources.

<anypoint-mq:default-subscriber-config name="ConfigWithCircuit" >
     <anypoint-mq:connection url="${providerUrl}"
       clientId="${clientId}"
      clientSecret="${clientSecret}"/>
   <anypoint-mq:circuit-breaker
       circuitName="InvoiceProcess"
       onErrorTypes="FTP:RETRY_EXHAUSTED, HTTP:SERVICE_UNAVAILABLE"
       errorsThreshold="10"
       tripTimeout="5"
       tripTimeoutUnit="MINUTES"/>
</anypoint-mq:default-subscriber-config>

<flow name="subscribe">
    <anypoint-mq:subscriber
        config-ref="ConfigWithCircuit"
        destination="${reservationsQueue}"/>
    <flow-ref name="invoiceProcess"/>
</flow>

<flow name="otherSubscribe">
    <anypoint-mq:subscriber
        config-ref="ConfigWithCircuit"
        destination="${paymentsQueue}"/>
    <flow-ref name="invoiceProcess"/>
</flow>

<sub-flow name="invoiceProcess">
  <ftp:write path="${auditFolder}" config-ref="ftp-config"/>
  <http:request config-ref="requestConfig" path="/external"/>
</sub-flow>

In version 3.x of this connector, you declare the circuit breaker as a standalone global element and reference it by name from each Subscriber source. The connection configuration is no longer bound to the circuit breaker that protects the app logic:

<anypoint-mq:config name="Anypoint_MQ_Config">
    <anypoint-mq:connection url="${providerUrl}"
       clientId="${clientId}"
       clientSecret="${clientSecret}"/>
</anypoint-mq:config>

<anypoint-mq:circuit-breaker
    name="InvoiceProcess"
    onErrorTypes="FTP:RETRY_EXHAUSTED, HTTP:SERVICE_UNAVAILABLE"
    errorsThreshold="10"
    tripTimeout="5"
    tripTimeoutUnit="MINUTES"/>

<flow name="subscribe">
    <anypoint-mq:subscriber
        destination="${reservationsQueue}"
        config-ref="Anypoint_MQ_Config"
        circuitBreaker="InvoiceProcess"/>
    <flow-ref name="invoiceProcess"/>
</flow>

<flow name="otherSubscribe">
    <anypoint-mq:subscriber
        destination="${paymentsQueue}"
        config-ref="Anypoint_MQ_Config"
        circuitBreaker="InvoiceProcess"/>
    <flow-ref name="invoiceProcess"/>
</flow>

<sub-flow name="invoiceProcess">
  <ftp:write path="${auditFolder}" config-ref="ftp-config"/>
  <http:request config-ref="requestConfig" path="/external"/>
</sub-flow>
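
The inline alternative mentioned at the start of this section can be sketched as follows. The sketch assumes that the circuit-breaker element takes the same attributes when nested inside the Subscriber source as it does as a global element. The name attribute is omitted because nothing references the breaker, and the flow name is hypothetical. Check the nesting against the Anypoint MQ Subscriber Source reference before relying on it.

```xml
<!-- Hypothetical flow name -->
<flow name="subscribePrivateCircuit">
    <anypoint-mq:subscriber
        destination="${reservationsQueue}"
        config-ref="Anypoint_MQ_Config">
        <!-- Private circuit breaker: applies only to this Subscriber source -->
        <anypoint-mq:circuit-breaker
            onErrorTypes="FTP:RETRY_EXHAUSTED, HTTP:SERVICE_UNAVAILABLE"
            errorsThreshold="10"
            tripTimeout="5"
            tripTimeoutUnit="MINUTES"/>
    </anypoint-mq:subscriber>
    <flow-ref name="invoiceProcess"/>
</flow>
```

A private breaker of this kind suits a Subscriber whose error conditions are not shared with other flows. When several Subscribers should trip together, as in the example with two flows above, the global element is the better fit.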

Publish Operation

In version 2.x of this connector, Publish operation properties are declared like this:

<anypoint-mq:publish config-ref="Anypoint_MQ_Default_subscriber"
       destination="queue"
       messageId="#[vars.messageId]" sendContentType="false">
   <anypoint-mq:body >#[vars.messageBody]</anypoint-mq:body>
   <anypoint-mq:properties >
      <anypoint-mq:property key="MSG_TYPE" value="My Value"/>
   </anypoint-mq:properties>
</anypoint-mq:publish>

In version 3.x, you declare the same properties as a single dynamic map. Because the map is an expression, it can be the result of a transformation and can carry a different number of keys for each message, instead of a fixed set of keys:

<anypoint-mq:publish config-ref="Anypoint_MQ_Config"
     destination="queue"
     messageId="#[vars.currentId]" sendContentType="false">
   <anypoint-mq:body >#[vars.messageBody]</anypoint-mq:body>
   <anypoint-mq:properties ><![CDATA[#[output application/java
---
{
   "MSG_TYPE" : vars.msgType
}]]]></anypoint-mq:properties>
</anypoint-mq:publish>
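
To illustrate what a dynamic number of keys can mean in practice, the following sketch merges an optional map of extra properties with a fixed key. The variable vars.extraProperties is hypothetical and stands for any object built earlier in the flow. The DataWeave `default` and `++` operators are standard, but treat the overall shape as an assumption and not as the connector's prescribed usage.

```xml
<anypoint-mq:publish config-ref="Anypoint_MQ_Config"
     destination="queue">
   <anypoint-mq:body>#[vars.messageBody]</anypoint-mq:body>
   <!-- vars.extraProperties is a hypothetical object; when it is null,
        only MSG_TYPE is sent -->
   <anypoint-mq:properties><![CDATA[#[output application/java
---
(vars.extraProperties default {}) ++ {
   "MSG_TYPE" : vars.msgType
}]]]></anypoint-mq:properties>
</anypoint-mq:publish>
```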

This table lists the changes to the Publish operation between versions 2.x and 3.x.

| Changes | Version 2.x | Version 3.x |
|---|---|---|
| Execution Type | Blocking | Non-Blocking |
| Properties Parameter | Fixed key-value map | Dynamic "content" parameter map |

For more information, see Anypoint MQ Publish Operation.

Consume Operation

This table lists the changes to the Consume operation between versions 2.x and 3.x.

| Changes | Version 2.x | Version 3.x |
|---|---|---|
| Execution Type | Blocking | Non-Blocking |
| Default Acknowledgment Mode | MANUAL | IMMEDIATE |

For more information, see Anypoint MQ Consume Operation.
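
Because the default acknowledgment mode changed from MANUAL to IMMEDIATE, a flow that relied on the 2.x default might need to set the mode explicitly after migration. The following is a minimal sketch, assuming that the Consume operation accepts the same acknowledgementMode attribute as the Subscriber source. The destination name queue is a placeholder.

```xml
<!-- Preserve 2.x behavior by setting the mode explicitly.
     Assumption: the attribute name matches the one used on the Subscriber source. -->
<anypoint-mq:consume config-ref="Anypoint_MQ_Config"
    destination="queue"
    acknowledgementMode="MANUAL"/>
```

Without the explicit attribute, a migrated flow would acknowledge each message as soon as it is consumed, and a later ACK or NACK step would have no ackToken to work with.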

ACK and NACK Operations

The ACK and NACK operations use the ackToken string.

You can obtain ackToken from the attributes element. The ackToken value is available only for messages that have a MANUAL acknowledgment mode.

This table lists the 2.x ACK and NACK operations and their 3.x equivalents.

| Version 2.x | Version 3.x |
|---|---|
| Operations received a messageContext parameter, whose value had to be the attributes element from a given message. | The ackToken string of the message for performing ACK or NACK of a message. |

For more information, see Anypoint MQ ACK and NACK Operations.
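
The following sketch shows one way the ackToken might be used in a flow with MANUAL acknowledgment. It assumes the operations are exposed as anypoint-mq:ack and anypoint-mq:nack with an ackToken attribute. The flow name and the processMessage sub-flow are hypothetical. The token is copied into a variable first, because later processors can replace the message attributes.

```xml
<!-- Hypothetical flow and sub-flow names; element and attribute names
     for ack and nack are assumptions to verify against the reference -->
<flow name="manualAckFlow">
    <anypoint-mq:subscriber config-ref="Anypoint_MQ_Config"
        destination="queue"
        acknowledgementMode="MANUAL"/>
    <!-- Keep the token before other processors overwrite the attributes -->
    <set-variable variableName="ackToken" value="#[attributes.ackToken]"/>
    <try>
        <flow-ref name="processMessage"/>
        <!-- Processing succeeded: acknowledge the message -->
        <anypoint-mq:ack config-ref="Anypoint_MQ_Config"
            ackToken="#[vars.ackToken]"/>
        <error-handler>
            <on-error-continue>
                <!-- Processing failed: return the message to the queue -->
                <anypoint-mq:nack config-ref="Anypoint_MQ_Config"
                    ackToken="#[vars.ackToken]"/>
            </on-error-continue>
        </error-handler>
    </try>
</flow>

<sub-flow name="processMessage">
    <logger level="INFO" message="#[payload]"/>
</sub-flow>
```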

Platform Compatibility

| Software | Version |
|---|---|
| Mule Runtime Engine | 4.1.1 and later |
| Anypoint Studio | v7 and later |
