Anypoint MQ Connector 3.x Migration Guide - Mule 4
Anypoint MQ Connector version 3.0 includes new features, such as simplified configuration, standardized endpoints, and non-blocking functionality.
Global Configuration
In version 2.x of this connector, default-subscriber-config includes these parameters:
<anypoint-mq:default-subscriber-config
name="Anypoint_MQ_Default_subscriber"
acknowledgementMode="IMMEDIATE"
acknowledgementTimeout="10000" <!-- (1) -->
maxRedelivery="2" <!-- (2) -->
pollingTime="2000"
fetchSize="5" <!-- (3) -->
fetchTimeout="5000" <!-- (3) -->
frequency="2000"> <!-- (3) -->
<anypoint-mq:connection <!-- (4) -->
clientId="${client.id}"
clientSecret="${client.secret}"/>
<anypoint-mq:circuit-breaker <!-- (5) -->
circuitName="InvoiceSettings"
onErrorTypes="JMS:TIMEOUT"
errorsThreshold="10" tripTimeout="1"
tripTimeoutUnit="MINUTES" />
</anypoint-mq:default-subscriber-config>
In version 3.x of this connector, the config element contains only the connection settings:
<anypoint-mq:config name="Anypoint_MQ_Config">
<anypoint-mq:connection
clientId="${client.id}"
clientSecret="${client.secret}" />
</anypoint-mq:config>
This table lists the 2.x config elements and their 3.x equivalents.
Element | Version 2.x Behavior | Version 3.x Equivalent |
---|---|---|
(1) - Acknowledgment | Global acknowledgment settings for the Anypoint MQ Subscriber sources. | Configure for each Subscriber instance. |
(2) - Max Redelivery | Redelivery filter for Anypoint MQ messages located in the | Deprecated and no longer available as a standalone filter. Use |
(3) - Message Fetch | All parameters related to how messages are retrieved from the Anypoint MQ service, including | Configure all the fetching strategy parameters on each Subscriber under the |
(4) - Connection | Connection parameters. | Same configuration for all the connection settings. |
(5) - Circuit Breaker | Circuit Breaker configuration for the Anypoint MQ Subscriber source. | Handle each Circuit Breaker instance either as a standalone element referenced by each Anypoint MQ Subscriber or as a private Circuit Breaker for the Subscriber source that declares it. |
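As a rough illustration of how the annotated 2.x settings could map onto 3.x, the following sketch keeps the connection (4) in the config element and moves the acknowledgment (1), fetch (3), and circuit breaker (5) settings onto a Subscriber source. The flow name, the logger, the destination, and the maxLocalMessages value are illustrative assumptions, and the 10-second timeout assumes that the 2.x value of 10000 is expressed in milliseconds. The enclosing mule element and namespace declarations are omitted, as in the other snippets in this guide.

```xml
<!-- (4) Only the connection settings stay in the global config -->
<anypoint-mq:config name="Anypoint_MQ_Config">
  <anypoint-mq:connection
    clientId="${client.id}"
    clientSecret="${client.secret}"/>
</anypoint-mq:config>

<!-- (5) Circuit breaker declared as a standalone global element -->
<anypoint-mq:circuit-breaker
  name="InvoiceSettings"
  onErrorTypes="JMS:TIMEOUT"
  errorsThreshold="10"
  tripTimeout="1"
  tripTimeoutUnit="MINUTES"/>

<!-- Hypothetical flow: names and values below are for illustration only -->
<flow name="invoiceSubscriberFlow">
  <anypoint-mq:subscriber
    config-ref="Anypoint_MQ_Config"
    destination="queue"
    acknowledgementMode="IMMEDIATE"
    acknowledgementTimeout="10"
    acknowledgementTimeoutUnit="SECONDS"
    circuitBreaker="InvoiceSettings">
    <!-- (3) Fetch settings now live under the subscriber type -->
    <anypoint-mq:subscriber-type>
      <anypoint-mq:prefetch maxLocalMessages="5"/>
    </anypoint-mq:subscriber-type>
  </anypoint-mq:subscriber>
  <logger level="INFO" message="#[payload]"/>
</flow>
```

The sections that follow cover each of these settings in more detail.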
Subscriber Listener
In version 3.x of this connector, the Subscriber message source configuration for message listening has changed.
For information about the new subscriber, see Anypoint MQ Subscriber Source.
Message Acknowledgment
In version 2.x of this connector, the acknowledgment mode of the Subscriber source is defined at the configuration level:
<anypoint-mq:default-subscriber-config name="Anypoint_MQ_Default_subscriber"
acknowledgementMode="MANUAL" acknowledgementTimeout="120000">
<anypoint-mq:connection clientId="${client.id}" clientSecret="${client.secret}"/>
</anypoint-mq:default-subscriber-config>
In version 3.x, these parameters are part of the Subscriber source itself and affect only that single Subscriber instance. The semantics of the acknowledgment parameters have not changed. Here’s the equivalent configuration of the Subscriber source:
<anypoint-mq:subscriber config-ref="Anypoint_MQ_Config"
destination="queue"
acknowledgementMode="MANUAL"
acknowledgementTimeout="2"
acknowledgementTimeoutUnit="MINUTES"/>
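Because the acknowledgment settings are now scoped to each source, two Subscribers can share one connection configuration and still acknowledge messages differently. The following is a minimal sketch of that idea; the flow names, queue names, and logger processors are hypothetical.

```xml
<flow name="ordersFlow">
  <!-- MANUAL: the flow is responsible for acknowledging each message explicitly -->
  <anypoint-mq:subscriber config-ref="Anypoint_MQ_Config"
    destination="ordersQueue"
    acknowledgementMode="MANUAL"
    acknowledgementTimeout="2"
    acknowledgementTimeoutUnit="MINUTES"/>
  <logger level="INFO" message="#[payload]"/>
</flow>

<flow name="auditFlow">
  <!-- IMMEDIATE: this setting applies to this Subscriber only -->
  <anypoint-mq:subscriber config-ref="Anypoint_MQ_Config"
    destination="auditQueue"
    acknowledgementMode="IMMEDIATE"/>
  <logger level="INFO" message="#[payload]"/>
</flow>
```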
Prefetch Mode
In version 2.x of this connector, prefetch mode was enabled by default, and you specified all related parameters at the configuration level:
<anypoint-mq:default-subscriber-config
name="Anypoint_MQ_Default_subscriber"
pollingTime="2000" fetchSize="5"
fetchTimeout="5000" frequency="2000">
<anypoint-mq:connection clientId="${client.id}"
clientSecret="${client.secret}"/>
</anypoint-mq:default-subscriber-config>
Version 3.x includes a new concept called Subscriber Type, in which you control the prefetch configuration:
<anypoint-mq:subscriber config-ref="Anypoint_MQ_Config"
destination="queue">
<anypoint-mq:subscriber-type>
<anypoint-mq:prefetch maxLocalMessages="20"/>
</anypoint-mq:subscriber-type>
</anypoint-mq:subscriber>
This table lists the 2.x prefetch parameters and their 3.x equivalents.
Version 2.x Parameter | Description | Version 3.x Equivalent | Description |
---|---|---|---|
| | Declared at the | | Declared at the When using |
| | Controls the time waiting for a response from the service. | Deprecated | This parameter is no longer available. The connector always uses the maximum long-polling value. |
| | Controls when to recheck for messages after the queue is empty. | Deprecated | This parameter is no longer available. The connector handles the frequency internally, fixed at 1 second. |
| | Used only when | Deprecated | This parameter is replaced by the more powerful |
Polling Mode
Version 2.x of the Anypoint MQ Connector allowed you to disable the prefetch mode by setting the fetchSize parameter to 0 and then using pollingTime as a fixed-frequency polling scheduler.
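For reference, a 2.x configuration that used this approach might have looked like the following sketch. It reuses only attributes shown earlier in this guide; the pollingTime value is illustrative.

```xml
<!-- 2.x only: fetchSize="0" disables prefetch, so pollingTime acts as a fixed-frequency poll -->
<anypoint-mq:default-subscriber-config
  name="Anypoint_MQ_Default_subscriber"
  fetchSize="0"
  pollingTime="2000">
  <anypoint-mq:connection
    clientId="${client.id}"
    clientSecret="${client.secret}"/>
</anypoint-mq:default-subscriber-config>
```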
In version 3.x of this connector, the polling mode of the Subscriber source is simplified and normalized. You can use the schedulers provided by Mule runtime engine out of the box, either as fixed-frequency or cron:
- fixed-frequency
<anypoint-mq:subscriber config-ref="Anypoint_MQ_Config" destination="queue">
  <anypoint-mq:subscriber-type>
    <anypoint-mq:polling fetchSize="9">
      <scheduling-strategy>
        <fixed-frequency frequency="1" timeUnit="SECONDS"/>
      </scheduling-strategy>
    </anypoint-mq:polling>
  </anypoint-mq:subscriber-type>
</anypoint-mq:subscriber>
- cron
<anypoint-mq:subscriber destination="queue" config-ref="Anypoint_MQ_Config">
  <anypoint-mq:subscriber-type>
    <anypoint-mq:polling fetchSize="9">
      <scheduling-strategy>
        <cron expression="0 * 14 * * ?" timeZone="America/Los_Angeles"/>
      </scheduling-strategy>
    </anypoint-mq:polling>
  </anypoint-mq:subscriber-type>
</anypoint-mq:subscriber>
This table lists the 2.x polling parameters and their 3.x equivalents.
Version 2.x Parameter | Description | Version 3.x Equivalent | Description |
---|---|---|---|
| | Declared at the | | Declared at the |
| | Limits the wait time for a response from the service. | Deprecated | This parameter is no longer available. The connector always uses the maximum long-polling value. |
| | Controls when to recheck for messages after the queue is empty. | Deprecated | This parameter is no longer available. The connector uses a fixed frequency of 1 second. |
| | Used only when | Deprecated | This parameter is replaced by the more powerful |
Circuit Breaker
The circuit breaker declaration is no longer part of default-subscriber-config. In version 3.x, it is either a global element or an inline declaration.
In version 2.x of this connector, you declared the circuit breaker as part of the Subscriber configuration and then referenced it across multiple Subscriber sources.
<anypoint-mq:default-subscriber-config name="ConfigWithCircuit" >
<anypoint-mq:connection url="${providerUrl}"
clientId="${clientId}"
clientSecret="${clientSecret}"/>
<anypoint-mq:circuit-breaker
circuitName="InvoiceProcess"
onErrorTypes="FTP:RETRY_EXHAUSTED, HTTP:SERVICE_UNAVAILABLE"
errorsThreshold="10"
tripTimeout="5"
tripTimeoutUnit="MINUTES"/>
</anypoint-mq:default-subscriber-config>
<flow name="subscribe">
<anypoint-mq:subscriber
config-ref="ConfigWithCircuit"
destination="${reservationsQueue}"/>
<flow-ref name="invoiceProcess"/>
</flow>
<flow name="otherSubscribe">
<anypoint-mq:subscriber
config-ref="ConfigWithCircuit"
destination="${paymentsQueue}"/>
<flow-ref name="invoiceProcess"/>
</flow>
<sub-flow name="invoiceProcess">
<ftp:write path="${auditFolder}" config-ref="ftp-config"/>
<http:request config-ref="requestConfig" path="/external"/>
</sub-flow>
In version 3.x of this connector, you declare the circuit breaker as a standalone global element and reference it from each Subscriber source, so the connection configuration is no longer bound to the circuit that protects the app logic:
<anypoint-mq:config name="Anypoint_MQ_Config">
<anypoint-mq:connection url="${providerUrl}"
clientId="${clientId}"
clientSecret="${clientSecret}"/>
</anypoint-mq:config>
<anypoint-mq:circuit-breaker
name="InvoiceProcess"
onErrorTypes="FTP:RETRY_EXHAUSTED, HTTP:SERVICE_UNAVAILABLE"
errorsThreshold="10"
tripTimeout="5"
tripTimeoutUnit="MINUTES"/>
<flow name="subscribe">
<anypoint-mq:subscriber
destination="${reservationsQueue}"
config-ref="Anypoint_MQ_Config"
circuitBreaker="InvoiceProcess"/>
<flow-ref name="invoiceProcess"/>
</flow>
<flow name="otherSubscribe">
<anypoint-mq:subscriber
destination="${paymentsQueue}"
config-ref="Anypoint_MQ_Config"
circuitBreaker="InvoiceProcess"/>
<flow-ref name="invoiceProcess"/>
</flow>
<sub-flow name="invoiceProcess">
<ftp:write path="${auditFolder}" config-ref="ftp-config"/>
<http:request config-ref="requestConfig" path="/external"/>
</sub-flow>
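The inline alternative is not shown above. As a sketch, and assuming that the circuit breaker can be nested directly inside the Subscriber source with the same attributes as the global element, a private circuit breaker might be declared as follows. The flow name is hypothetical, and the exact nesting should be checked against the Subscriber source reference.

```xml
<flow name="privateCircuitSubscribe">
  <anypoint-mq:subscriber
    destination="${reservationsQueue}"
    config-ref="Anypoint_MQ_Config">
    <!-- Assumed inline form: applies only to this Subscriber, so no name or reference is used -->
    <anypoint-mq:circuit-breaker
      onErrorTypes="FTP:RETRY_EXHAUSTED, HTTP:SERVICE_UNAVAILABLE"
      errorsThreshold="10"
      tripTimeout="5"
      tripTimeoutUnit="MINUTES"/>
  </anypoint-mq:subscriber>
  <flow-ref name="invoiceProcess"/>
</flow>
```

A private declaration of this kind fits a Subscriber whose error profile is not shared with any other source; a global element remains the better choice when several Subscribers should trip together.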
Publish Operation
In version 2.x of this connector, Publish operation properties are declared like this:
<anypoint-mq:publish config-ref="Anypoint_MQ_Default_subscriber"
destination="queue"
messageId="#[vars.messageId]" sendContentType="false">
<anypoint-mq:body >#[vars.messageBody]</anypoint-mq:body>
<anypoint-mq:properties >
<anypoint-mq:property key="MSG_TYPE" value="My Value"/>
</anypoint-mq:properties>
</anypoint-mq:publish>
In version 3.x, you declare the same properties as a dynamic map built by an expression, so the keys and their number can vary for each message instead of being fixed:
<anypoint-mq:publish config-ref="Anypoint_MQ_Config"
destination="queue"
messageId="#[vars.currentId]" sendContentType="false">
<anypoint-mq:body >#[vars.messageBody]</anypoint-mq:body>
<anypoint-mq:properties ><![CDATA[#[output application/java
---
{
"MSG_TYPE" : vars.msgType
}]]]></anypoint-mq:properties>
</anypoint-mq:publish>
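To illustrate the dynamic number of keys, the following sketch merges a map held in a variable with one fixed entry. The variable vars.extraProperties is hypothetical and is assumed to hold an object of string keys and values; the ++ operator and the default keyword are standard DataWeave.

```xml
<anypoint-mq:publish config-ref="Anypoint_MQ_Config" destination="queue">
  <anypoint-mq:body>#[vars.messageBody]</anypoint-mq:body>
  <!-- Keys from vars.extraProperties (hypothetical) are sent alongside MSG_TYPE -->
  <anypoint-mq:properties><![CDATA[#[output application/java
---
(vars.extraProperties default {}) ++ {
  "MSG_TYPE" : vars.msgType
}]]]></anypoint-mq:properties>
</anypoint-mq:publish>
```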
This table lists the differences between the 2.x and 3.x Publish operations.
Changes | Version 2.x | Version 3.x |
---|---|---|
Execution Type | Blocking | Non-Blocking |
Properties Parameter | Fixed key-value map | Dynamic "content" parameter map |
For more information, see Anypoint MQ Publish Operation.
Consume Operation
This table lists the differences between the 2.x and 3.x Consume operations.
Changes | Version 2.x | Version 3.x |
---|---|---|
Execution Type | Blocking | Non-Blocking |
Default Acknowledgment Mode | MANUAL | IMMEDIATE |
For more information, see Anypoint MQ Consume Operation.
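Because the default acknowledgment mode changed, a flow that relied on the 2.x default may need to set the mode explicitly. The following is a minimal sketch, assuming the Consume operation accepts the same acknowledgementMode attribute as the Subscriber source; the flow name and the logger are hypothetical.

```xml
<flow name="consumeWithManualAck">
  <!-- Set MANUAL explicitly to keep the 2.x default behavior -->
  <anypoint-mq:consume
    config-ref="Anypoint_MQ_Config"
    destination="queue"
    acknowledgementMode="MANUAL"/>
  <logger level="INFO" message="#[payload]"/>
</flow>
```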
ACK and NACK Operations
The ACK and NACK operations use the ackToken string.
You can obtain ackToken from the attributes element. The ackToken value is available only for messages that have a MANUAL acknowledgment mode.
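As a sketch of how the token might be passed along, the following flow stores attributes.ackToken in a variable, acknowledges the message after processing, and returns it to the queue if processing fails. The ackToken attribute name on the ack and nack elements, the flow name, and the processInvoice flow reference are assumptions for illustration.

```xml
<flow name="manualAckFlow">
  <anypoint-mq:subscriber config-ref="Anypoint_MQ_Config"
    destination="queue"
    acknowledgementMode="MANUAL"/>
  <!-- Save the token early: later processors may replace the message attributes -->
  <set-variable variableName="ackToken" value="#[attributes.ackToken]"/>
  <try>
    <!-- processInvoice is a hypothetical flow that does the actual work -->
    <flow-ref name="processInvoice"/>
    <anypoint-mq:ack config-ref="Anypoint_MQ_Config" ackToken="#[vars.ackToken]"/>
    <error-handler>
      <on-error-continue>
        <!-- On any error, hand the message back to the queue -->
        <anypoint-mq:nack config-ref="Anypoint_MQ_Config" ackToken="#[vars.ackToken]"/>
      </on-error-continue>
    </error-handler>
  </try>
</flow>
```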
This table lists the 2.x ACK and NACK operations and their 3.x equivalents.
Version 2.x | Version 3.x |
---|---|
Operations received a | The |
For more information, see Anypoint MQ ACK and NACK Operations.