Anypoint MQ Connector Reference — Mule 4
Support Version: Select
Anypoint MQ Connector Version 2.x
Anypoint MQ is a multi-tenant, cloud messaging service that enables customers to perform advanced asynchronous messaging among their applications.
Configurations
Parameters
Name | Type | Description | Default Value | Required |
---|---|---|---|---|
Name |
String |
Specifies the name for this configuration. Connectors refer to the configuration by using this name. |
x |
|
Connection |
Specifies the connection types that can be provided to this configuration. |
x |
||
Expiration Policy |
Configures the minimum amount of time that a dynamic configuration instance can remain idle before the runtime considers it eligible for purging. The platform purges expired instances as needed to free resources. |
|||
Acknowledgement Mode |
Enumeration, SubscriberAckMode:
|
The acknowledgment mode to use for the messages retrieved by the subscribers. This mode affects only the Subscriber source and is unrelated to the Consume operation. Refer to SubscriberAckMode for more details about each strategy. |
AUTO |
|
Acknowledgement Timeout |
Number |
Specifies the duration that a message is held by a broker waiting for an Acknowledgment or Not Acknowledgment. If neither an Acknowledgment or a Not Acknowledgment occurs before the timeout is exceeded, then the message is automatically returned to the destination by the broker, making it again available to any subscriber for redelivery. By default, this timeout is decided by the subscriber, depending on which strategy is used. During polling, the ACK timeout is the one configured in the queue. When using the Prefetch strategy, messages are kept in a local buffer until they are dispatched to the flow. You can update the time to live (TTL) to avoid having messages be evicted. By default, Anypoint MQ fetches messages with a TTL of at least 2 minutes. The maximum value of this parameter is 12 hours. |
0 |
|
Polling Time |
Number |
When prefetch is disabled, the polling time defines how often the subscriber should check for new messages. By default, this checks for messages every one second (only if the prefetch is disabled). The value of this parameter is defined in milliseconds. |
1000 |
|
Max Redelivery |
Number |
Number of redeliveries to accept for any given message. When the message redelivery count is higher than this threshold, then the message is ignored by the subscriber and not dispatched to the flow. For the message to be removed from the queue, configure a dead letter queue from the admin console. |
-1 |
|
Fetch Size |
Number |
Maximum number of messages to receive on each request. The response can contain fewer messages than this number, depending on the load of the target queue. This number also affects the local buffer of the prefetched messages. Setting this fetchSize to zero disables message prefetch. The prefetch buffer has a target size of three times the fetchSize, meaning that requests to retrieve messages continue as long as the local buffer is not full. Only in the case of a slow consumer, where the Mule runtime engine has no more threads available to process the request, is the buffer limit going to be reached. Enable prefetch for maximum throughput, in cases where high performance on message delivery is required. In cases where controlled message retrieval is required, you can disable prefetch and use the fixed polling configuration. By default this tries to fetch the max number of 10 messages per request. |
10 |
|
Fetch Timeout |
Number |
Maximum duration in milliseconds to wait for the required amount of messages. If this duration elapses, the response is sent with as many messages as received during the period. |
1000 |
|
Frequency |
Number |
Fixed frequency in milliseconds at which time the retrieval of messages is attempted when there are no more messages to process locally. If a queue is empty and all the messages retrieved are already processed, this frequency defines how long to wait before attempting to retrieve more messages. By default, this frequency is 5 seconds (5000 milliseconds). |
5000 |
Connection Types
Connection
Parameters
Name | Type | Description | Default Value | Required |
---|---|---|---|---|
URL |
String |
The region URL where the queue resides. This URL can be obtained and configured from the Anypoint Platform > MQ console. Copy and paste the region URL into this field. |
|
|
Client App ID |
String |
In Anypoint Platform > MQ > Client Apps, click an app name (or create a new app), and click Copy for the Client App ID field. Paste this value in the Anypoint Studio Client App ID field. |
x |
|
Client Secret |
String |
In Anypoint Platform > MQ > Client Apps, click an app name (or create a new app), and click Copy for the Client Secret field. Paste this value in the Studio Client Secret field. |
x |
|
Proxy Config |
||||
TCP Client Socket Properties |
||||
TLS Context |
||||
Reconnection |
When the application is deployed, a connectivity test is performed on all connectors. If set to true, deployment fails if the test doesn’t pass after exhausting the associated reconnection strategy. |
Ack
<anypoint-mq:ack>
Execute an Acknowledgment (ack) over a given Anypoint MQ Message Context object indicating that the message has been consumed correctly, and then delete the message from the in transit status.
Parameters
Name | Type | Description | Default Value | Required |
---|---|---|---|---|
Configuration |
String |
Name of the configuration to use. |
x |
|
Message Context |
AnypointMQMessageContext object that represents the received message. |
x |
||
Reconnection Strategy |
A retry strategy in case of connectivity errors. Anypoint MQ Connector doesn’t support this parameter. For more information, see Can I use a retry strategy with the Anypoint MQ connector? |
Consume
<anypoint-mq:consume>
Consume a message from an Anypoint MQ queue.
Parameters
Name | Type | Description | Default Value | Required |
---|---|---|---|---|
Configuration |
String |
Name of the configuration to use. |
x |
|
Destination |
String |
Name of the queue or message exchange from which to fetch a message. |
x |
|
Acknowledgement Mode |
Enumeration, one of:
|
Acknowledgment mode to use for the messages retrieved from this subscriber. Can only be 'MANUAL' or 'NONE'. |
MANUAL |
|
Polling Time |
Number |
How much time in milliseconds to wait if the requested messages are not ready to be consumed. |
10000 |
|
Acknowledgement Timeout |
Number |
Duration that a message is held by a broker waiting for an Acknowledgment (ack) or Not Acknowledgment (nack). After that duration expires, the message is again available to any subscriber. |
0 |
|
Output Mime Type |
String |
MIME type of this operation’s output. |
||
Output Encoding |
String |
The encoding of this operation’s output. |
||
Streaming Strategy |
|
Specifies to use repeatable streams. |
||
Target Variable |
String |
Name of a variable in which to place the operation’s output. |
||
Target Value |
String |
An expression to evaluate against the operation’s output and to store the outcome of that expression in the target variable. |
#[payload] |
Nack
<anypoint-mq:nack>
Execute a Not Acknowledgement (NACK) for an Anypoint MQ Message Context object and change the status of the message from In Flight to In Queue to be consumed again for a subscriber.
Parameters
Name | Type | Description | Default Value | Required |
---|---|---|---|---|
Configuration |
String |
Name of the configuration to use. |
x |
|
Message Context |
AnypointMQMessageContext object that represents the received message. |
x |
||
Reconnection Strategy |
A retry strategy in case of connectivity errors. Anypoint MQ Connector doesn’t support this parameter. For more information, see Can I use a retry strategy with the Anypoint MQ connector? |
Publish
<anypoint-mq:publish>
Publish a message to an Anypoint MQ queue or message exchange.
Parameters
Name | Type | Description | Default Value | Required |
---|---|---|---|---|
Configuration |
String |
Name of the configuration to use. |
x |
|
Destination |
String |
Name of the queue or message exchange from which to fetch a message. |
x |
|
Body |
Binary |
Body of the message. |
#[payload] |
|
Message Id |
String |
ID of the message to publish. |
||
Send Content Type |
Boolean |
Indicates whether the content type of the Mule message should be attached or not. |
true |
|
Properties |
Object |
Additional properties to be sent within the message. |
||
Output Mime Type |
String |
MIME type of this operation’s output. |
||
Output Encoding |
String |
The encoding of this operation’s output. |
||
Streaming Strategy |
|
Specifies whether to use repeatable streams and their behavior. |
||
Target Variable |
String |
Name of a variable in which to place the operation’s output. |
||
Target Value |
String |
An expression to evaluate against the operation’s output and to store the outcome of that expression in the target variable. |
#[payload] |
|
Reconnection Strategy |
A retry strategy in case of connectivity errors. Anypoint MQ Connector doesn’t support this parameter. For more information, see Can I use a retry strategy with the Anypoint MQ connector? |
Subscriber
<anypoint-mq:subscriber>
Retrieves messages from the specified destination.
Parameters
Name | Type | Description | Default Value | Required |
---|---|---|---|---|
Configuration |
String |
Name of the configuration to use. |
x |
|
Destination |
String |
Name of the queue from which messages are retrieved. |
x |
|
Output Mime Type |
String |
MIME type of this operation’s output. |
||
Output Encoding |
String |
The encoding of this operation’s output. |
||
Redelivery Policy |
Defines a policy for processing redelivery of the message. |
|||
Reconnection Strategy |
A retry strategy in case of connectivity errors. Anypoint MQ Connector doesn’t support this parameter. For more information, see Can I use a retry strategy with the Anypoint MQ connector? |
Types
SubscriberAckMode
Field | Type | Description | Default Value | Required |
---|---|---|---|---|
IMMEDIATE |
String |
The message is acknowledged as soon as it is dispatched to the flow. If the processing of the message fails during the execution of the flow, it has to be handled by the application. |
||
AUTO |
String |
The message is acknowledged only if the flow that processes it finishes successfully. If an error is propagated during the execution of the flow, finalizing the execution with an error, then the subscriber listener NACKs the message, effectively returning it to the destination for redelivery. If the flow processing does not finish before the acknowledgment TTL is evicted, then the message is returned automatically to the destination by the broker for its redelivery. |
||
MANUAL |
String |
The subscriber does not handle any aspect of the acknowledgment of the message. Once the message is dispatched to the flow for processing, the application is in charge of performing either an Acknowledgment or a Not Acknowledgment. If none is performed before the acknowledgment TTL is evicted, then the message is returned automatically to the destination by the broker for its redelivery. |
Anypoint MQ Message Context
Field | Type | Description | Default Value | Required |
---|---|---|---|---|
destination |
String |
Name of the destination from which the message was consumed. |
||
message |
Metadata for the message consumed. |
Anypoint MQ Message
Field | Type | Description | Default Value | Required |
---|---|---|---|---|
id |
String |
The ID of the message. |
x |
|
lockId |
Number |
The ID used to update the lock on the message while it is In Flight status. |
x |
|
deliveryCount |
Number |
How many times was this message delivered. |
x |
|
properties |
Map |
Custom properties of the message, used to port extra information related to the message body. |
||
headers |
Map |
The headers of the message, containing information provided by the broker, such as creation time or delivery count. |
Proxy Parameter Group
Field | Type | Description | Default Value | Required |
---|---|---|---|---|
Host |
String |
|||
Port |
Number |
|||
Username |
String |
|||
Password |
String |
TCP Client Socket Parameter Group
Field | Type | Description | Default Value | Required |
---|---|---|---|---|
Send Buffer Size |
Number |
|||
Receive Buffer Size |
Number |
|||
Client Timeout |
Number |
|||
Send Tcp No Delay |
Boolean |
true |
||
Linger |
Number |
|||
Keep Alive |
Boolean |
false |
||
Connection Timeout |
Number |
30000 |
TLS
Field | Type | Description | Default Value | Required |
---|---|---|---|---|
Enabled Protocols |
String |
A comma-separated list of protocols enabled for this context. |
||
Enabled Cipher Suites |
String |
A comma-separated list of cipher suites enabled for this context. |
||
Trust Store |
||||
Key Store |
Trust Store
Field | Type | Description | Default Value | Required |
---|---|---|---|---|
Path |
String |
The location of the trust store, which is resolved relative to the current classpath and file system, if possible. |
||
Password |
String |
The password used to protect the trust store. |
||
Type |
String |
The type of store used. |
||
Algorithm |
String |
The algorithm used by the trust store. |
||
Insecure |
Boolean |
If true, no certificate validations are performed, rendering connections vulnerable to attacks. Use at your own risk. |
Key Store
Field | Type | Description | Default Value | Required |
---|---|---|---|---|
Path |
String |
The location of the key store, which resolves relative to the current classpath and file system, if possible. |
||
Type |
String |
The type of store used. |
||
Alias |
String |
When the key store contains many private keys, this attribute indicates the alias of the key that should be used. If not defined, the first key in the file is used by default. |
||
Key Password |
String |
The password used to protect the private key. |
||
Password |
String |
The password used to protect the key store. |
||
Algorithm |
String |
The algorithm used by the key store. |
Reconnection
Field | Type | Description | Default Value | Required |
---|---|---|---|---|
Fails Deployment |
Boolean |
When the application is deployed, a connectivity test is performed on all connectors. If set to true, deployment fails if the test doesn’t pass after exhausting the associated reconnection strategy. |
||
Reconnection Strategy |
The reconnection strategy to use. Anypoint MQ Connector doesn’t support this parameter. For more information, see Can I use a retry strategy with the Anypoint MQ connector? |
Reconnect
Anypoint MQ Connector doesn’t support this parameter. For more information, see Can I use a retry strategy with the Anypoint MQ connector? |
Field | Type | Description | Default Value | Required |
---|---|---|---|---|
Frequency |
Number |
How often in milliseconds to reconnect. |
||
Count |
Number |
How many reconnection attempts to make. |
||
blocking |
Boolean |
If false, the reconnection strategy runs in a separate, non-blocking thread. |
true |
Reconnect Forever
Anypoint MQ Connector doesn’t support this parameter. For more information, see Can I use a retry strategy with the Anypoint MQ connector? |
Field | Type | Description | Default Value | Required |
---|---|---|---|---|
Frequency |
Number |
How often in milliseconds to reconnect. |
||
blocking |
Boolean |
If false, the reconnection strategy runs in a separate, non-blocking thread. |
true |
Expiration Policy
Field | Type | Description | Default Value | Required |
---|---|---|---|---|
Max Idle Time |
Number |
A scalar time value for the maximum amount of time a dynamic configuration instance should be allowed to be idle before it’s considered eligible for expiration. |
||
Time Unit |
Enumeration, one of:
|
A time unit that qualifies the maxIdleTime attribute. |
Redelivery Policy
Field | Type | Description | Default Value | Required |
---|---|---|---|---|
Max Redelivery Count |
Number |
The maximum number of times a message can be redelivered and processed unsuccessfully before triggering a process-failed-message. |
||
Use Secure Hash |
Boolean |
Whether to use a secure hash algorithm to identify a redelivered message. |
||
Message Digest Algorithm |
String |
The secure hashing algorithm to use. If not set, the default is SHA-256. |
||
Id Expression |
String |
Defines one or more expressions to use to determine when a message has been redelivered. This property may only be set if useSecureHash is false. |
||
Object Store |
ObjectStore |
The object store where the redelivery counter for each message is going to be stored. |
Repeatable In Memory Stream
Field | Type | Description | Default Value | Required |
---|---|---|---|---|
Initial Buffer Size |
Number |
The amount of memory to allocate to consume the stream and provide random access to it. If the stream contains more data than can fit into this buffer, the buffer expands according to the bufferSizeIncrement attribute, with an upper limit of maxInMemorySize. |
||
Buffer Size Increment |
Number |
By how much the buffer size expands if the buffer exceeds its initial size. Setting a value of zero or lower means that the buffer should not expand, meaning that a STREAM_MAXIMUM_SIZE_EXCEEDED error is raised when the buffer gets full. |
||
Max Buffer Size |
Number |
This is the maximum amount of memory to use. If more than that is used, then a STREAM_MAXIMUM_SIZE_EXCEEDED error is raised. A value lower or equal to zero means no limit. |
||
Buffer Unit |
Enumeration, one of:
|
The unit in which these attributes are expressed. |
Repeatable File Store Stream
Field | Type | Description | Default Value | Required |
---|---|---|---|---|
Max In Memory Size |
Number |
Defines the maximum memory that the stream should use to keep data in memory. If more than that is consumed, then the connector starts to buffer the content on disk. |
||
Buffer Unit |
Enumeration, one of:
|
The unit in which maxInMemorySize is expressed. |