String
AMQP Connector Reference - Mule 4
AMQP Connector v1.8
Release Notes: AMQP Connector Release Notes
Anypoint Connector for AMQP is an AMQP 0.9.1 compliant MuleSoft extension, and is used to consume and produce AMQP messages. The extension supports AMQP functionality including exchanges and queues, consumers, acknowledgment modes, and local transactions.
Configurations
Config
These are the base configuration parameters for AMQP Connector.
Parameters
Name | Type | Description | Default Value | Required |
---|---|---|---|---|
Name |
Name for this configuration. Connectors reference the configuration with this name. |
x |
||
Connection |
The connection types to provide to this configuration. |
x |
||
Encoding |
String |
The default encoding of the Message body to use if the message doesn’t communicate it. |
||
Content Type |
String |
The default contentType of the Message body to use if the message doesn’t communicate it |
/ |
|
Create Fallback Queue |
Boolean |
Whether to create non-existing queues according to the fallback definition if they are defined. |
true |
|
Create Fallback Exchange |
Boolean |
Whether to create non-existing exchanges according to the fallback definition if they are defined. |
true |
|
Send Correlation Id |
Enumeration, one of:
|
Whether to specify a correlationId when publishing messages. This applies both for custom correlation IDs specifies at the operation level and for default correlation IDs taken from the current event |
AUTO |
|
Expiration Policy |
Configures the minimum amount of time that a dynamic configuration instance can remain idle before Mule considers it eligible for expiration. |
|||
Ack Mode |
Enumeration, one of:
|
The acknowledgment mode to use when consuming from the AMQP broker. If a transaction is opened in the channel, message acknowledgment is handled automatically by commit, and recovery is handled automatically by rollback. |
IMMEDIATE |
|
No Local |
Boolean |
If set to true, the server does not send messages to the connection that published them. |
false |
|
Exclusive Consumers |
Boolean |
Set to true if the connector should only create exclusive consumers. Only the consumer created can access the queue. |
false |
|
Number Of Consumers |
Number |
It is the number of consumers spawned by message source to receive AMQP messages. Each consumer will create a channel. |
4 |
|
Delivery Mode |
Enumeration, one of:
|
The delivery mode to use when publishing to the AMQP broker. |
PERSISTENT |
|
Priority |
Number |
The priority to use when publishing to the AMQP broker. The priority has possible values from 0 to 9 - default is 0. |
0 |
|
Request Broker Confirms |
Boolean |
Whether it must fail in case no confirmation is provided. |
false |
|
Mandatory |
Boolean |
Whether the operation must fail if it cannot be routed to a queue. |
false |
|
Immediate |
Boolean |
Whether the operation must fail if it cannot be routed to a queue consumer immediately. |
false |
|
Returned Message Exchange |
String |
The exchange to publish returned messages. |
||
Prefetch Size |
Number |
This field defines a prefetch size window. The broker will send as much messages as possible without exceeding the prefetchSize window in octets (bytes). 0 is used for no specific limit. |
0 |
|
Prefetch Count |
Number |
Specifies a global prefetch window in terms of whole messages. This field may be used in combination with the prefetch-size field; a message will only be sent in advance if both prefetch windows allow it. 0 is used for no specific limit. |
0 |
Connection Types
Connection
Generic implementation of a AMQP ConnectionProvider.
Parameters
Name | Type | Description | Default Value | Required |
---|---|---|---|---|
Socket Configuration |
||||
TLS Configuration |
Reference to a TLS config element. This will enable secure connection to the AMQP broker. |
|||
Host |
String |
Host of the broker to connect to. |
x |
|
Port |
Number |
Port of the AMQP broker to connect to. |
||
Virtual Host |
String |
Virtual host to use in the AMQP broker. |
/ |
|
Username |
String |
Username to use when providing credentials for authentication. |
||
Password |
String |
Password to use when providing credentials for authentication. |
||
Use Tls |
Boolean |
Whether TLS is needed to use. In case it is not provided, the default for AMQP connection will be used. |
false |
|
Use Sni |
Boolean |
Whether Server Name Indication is needed to use in the ClientHello Message |
false |
|
Use Sasl |
Boolean |
Whether SASL (EXTERNAL) Auth mechanism is used (No username or password required). |
false |
|
Heartbeat Timeout |
Number |
The heartbeat timeout. Heartbeat frames is sent at about 1/2 the timeout interval. |
60 |
|
Handshake Timeout |
Number |
AMQP 0.9.1 timeout to set to the underlying AMQP connector. |
||
Handshake Timeout Time Unit |
Enumeration, one of:
|
Timeunit for the handshake timeout AMQP connection socket configuration. |
MILLISECONDS |
|
Fallback Addresses |
Array of Fallback Address |
The addresses of the broker nodes to attempt connection to, should the connection to main broker fail. |
||
Reconnection |
When the application is deployed, a connectivity test is performed on all connectors. If set to true, deployment fails if the test doesn’t pass after exhausting the associated reconnection strategy |
Operations
Consume
<amqp:consume>
Enables you to consume a single AMQP message from a given queue.
Parameters
Name | Type | Description | Default Value | Required |
---|---|---|---|---|
Configuration |
String |
Name of the configuration to use. |
x |
|
Queue Name |
String |
The name of the queue from where the Message should be consumed |
x |
|
Content Type |
String |
The content type of the message body |
||
Encoding |
String |
The encoding of the message body |
||
Fallback Queue Definition |
The queue definition to use for queue declaration in case there is no queue with the queueName |
|||
Ack Mode |
Enumeration, one of:
|
The ACK mode to use when consuming a message |
||
Consumer Tag |
String |
The consumer tag to use for the consumer involved in the operation |
||
Maximum Wait |
Number |
Maximum time to wait for a message to arrive before timeout |
10000 |
|
Maximum Wait Unit |
Enumeration, one of:
|
Time unit to use in the maximumWaitTime configuration |
MILLISECONDS |
|
Create Fallback Queue |
Boolean |
|||
Transactional Action |
Enumeration, one of:
|
information of the current transaction in case it exists |
JOIN_IF_POSSIBLE |
|
Streaming Strategy |
|
Configures how Mule processes streams. Repeatable streams are the default behavior. |
||
Target Variable |
String |
Name of the variable that storesoperation’s output will be placed |
||
Target Value |
String |
An expression to evaluate against the operation’s output and store the expression outcome in the target variable |
|
|
Reconnection Strategy |
Retry strategy in case of connectivity errors. |
Publish
<amqp:publish>
Enables you to publish a single AMQP message to a given exchange.
Parameters
Name | Type | Description | Default Value | Required |
---|---|---|---|---|
Configuration |
String |
Name of the configuration to use. |
x |
|
Exchange Name |
String |
The name of the exchange to publish the message to |
||
Routing Keys |
Array of String |
List of routing keys |
||
Delivery Mode |
Enumeration, one of:
|
The delivery mode to use when publishing to the AMQP broker |
||
Transactional Action |
Enumeration, one of:
|
information of the current transaction in case it exists |
JOIN_IF_POSSIBLE |
|
Send Correlation Id |
Enumeration, one of:
|
options on whether to include an outbound correlation ID or not |
||
Create Fallback Exchange |
Boolean |
|||
Skip Exchange Validation |
Enumeration, one of:
|
Skips the exchange validation. |
||
Fallback Exchange Definition |
The exchange to use for exchange declaration in case there is no exchange with the exchangeName |
|||
Body |
Any |
The body of the Message |
|
|
AMQP Properties |
Properties of the message. |
|||
Headers |
Object |
The custom user properties that should be set to this AmqpMessage |
||
Request Broker Confirms |
Boolean |
Expects a confirmation from a message published. An exception is raised in case no confirmation is provided in case this attribute is set to true. |
false |
|
Mandatory |
Boolean |
Tells the server how to react if the message cannot be routed to a queue. If set to true, the server throws an exception (UNROUTABLE_MESSAGE, see below in the operation spec) for any message that cannot be routed. If set to false, the server silently drops the message. |
false |
|
Immediate |
Boolean |
Tells the server how to react if the message cannot be routed to a queue consumer immediately. If set to true, the server throws an exception (UNROUTABLE_MESSAGE, see below in the operation spec) for any undeliverable message. If set to false, the server queues the message, but with no guarantee that the message will ever be consumed. |
false |
|
Returned Message Exchange |
String |
The exchange to publish returned messages. |
||
Reconnection Strategy |
Retry strategy in case of connectivity errors. |
Publish Consume
<amqp:publish-consume>
Enables you to send a message to a AMQP exchange and waits for a response either to the provided replyTo destination or to a temporary destination created dynamically.
Parameters
Name | Type | Description | Default Value | Required |
---|---|---|---|---|
Configuration |
String |
Name of the configuration to use. |
x |
|
Exchange Name |
String |
The name of the exchange to publish the message to |
x |
|
Content Type |
String |
The content type of the message body |
||
Encoding |
String |
The encoding of the message body |
||
Routing Key |
String |
The routing key to publish to |
||
Delivery Mode |
Enumeration, one of:
|
The delivery mode to use when publishing to the AMQP broker |
||
Maximum Wait |
Number |
The maximum time to wait for a message to arrive before timeout |
10000 |
|
Maximum Wait Unit |
Enumeration, one of:
|
The time unit to use in the maximumWaitTime configuration |
MILLISECONDS |
|
Transactional Action |
Enumeration, one of:
|
information of the current transaction in case it exists |
JOIN_IF_POSSIBLE |
|
Send Correlation Id |
Enumeration, one of:
|
options on whether to include an outbound correlation ID or not |
||
Create Fallback Exchange |
Boolean |
|||
Streaming Strategy |
|
Configures how Mule processes streams. Repeatable streams are the default behavior. |
||
Skip Exchange Validation |
Enumeration, one of:
|
This field is intended to skip exchange validation by default the operation will do the validation if the parameter is not configured. If you decide to skip the exchange validation it will improve the performance but if the exchange does not exist you can incur in messaging losing depending on the broker configuration. |
||
Fallback Exchange Definition |
The exchange to use for exchange declaration in case there is no exchange with the exchangeName |
|||
Body |
Any |
The body of the Message |
|
|
AMQP Properties |
Properties of the message. |
|||
Headers |
Object |
The custom user properties that should be set to this AmqpMessage |
||
Request Broker Confirms |
Boolean |
Expects a confirmation from a message published. An exception is raised in case no confirmation is provided in case this attribute is set to true. |
false |
|
Mandatory |
Boolean |
Tells the server how to react if the message cannot be routed to a queue. If set to true, the server throws an exception (UNROUTABLE_MESSAGE, see below in the operation spec) for any message that cannot be routed. If set to false, the server silently drops the message. |
false |
|
Immediate |
Boolean |
Tells the server how to react if the message cannot be routed to a queue consumer immediately. If set to true, the server throws an exception (UNROUTABLE_MESSAGE, see below in the operation spec) for any undeliverable message. If set to false, the server queues the message, but with no guarantee that the message will ever be consumed. |
false |
|
Returned Message Exchange |
String |
The exchange to publish returned messages. |
||
Target Variable |
String |
Name of the variable that storesoperation’s output will be placed |
||
Target Value |
String |
An expression to evaluate against the operation’s output and store the expression outcome in the target variable |
|
|
Reconnection Strategy |
Retry strategy in case of connectivity errors. |
Ack
<amqp:ack>
Enables to ack a message with the delivery tag.
Sources
Listener
<amqp:listener>
AMQP listener for queues, enables you to listen for incoming messages.
Parameters
Name | Type | Description | Default Value | Required |
---|---|---|---|---|
Configuration |
String |
Name of the configuration to use. |
x |
|
Queue Name |
String |
Name of the queue to consume from. |
x |
|
Fallback Queue Definition |
Declaration of a queue definition to use in case no queue with the queueName provided exists in the broker. In case the queue with provided queueName exists, it will be used as is, ignoring the fallback. In case the queue does not exist, a new queue shall be created according to the referenced definition. |
|||
Ack Mode |
Enumeration, one of:
|
The acknowledgment mode to use when consuming from the AMQP broker. |
||
Number Of Consumers |
Number |
The number of channels that are spawned per inbound endpoint to receive AMQP messages. |
||
Consumer Tag |
String |
A client-generated consumer tag to establish context. |
||
Recover Strategy |
Enumeration, one of:
|
Valid values for the recoverStrategy option are: NONE, NO_REQUEUE and REQUEUE. |
REQUEUE |
|
Inbound Encoding |
String |
The default encoding of the message body to use if the message doesn’t communicate it |
||
Inbound Content Type |
String |
The default contentType of the Message body to use if the message doesn’t communicate it |
||
Create Fallback Queue |
Boolean |
Whether non existing queues will be created according to the fallback definition or an error is raised if they do not exist. This can be disabled in the mule app is not intended to change the AMQP topography. |
||
Transactional Action |
Enumeration, one of:
|
The type of beginning action that sources can take regarding transactions. |
NONE |
|
Transaction Type |
Enumeration, one of:
|
The type of transaction to create. Availability depends on Mule version. |
LOCAL |
|
Primary Node Only |
Boolean |
Determines whether to execute this source on only the primary node when running Mule instances in a cluster. |
||
Streaming Strategy |
|
Configures how Mule processes streams. Repeatable streams are the default behavior. |
||
Redelivery Policy |
Defines a policy for processing the redelivery of the same message. |
|||
Prefetch Size |
Number |
This field defines a prefetch size window. The broker will send as much messages as possible without exceeding the prefetchSize window in octets (bytes). 0 is used for no specific limit. |
||
Prefetch Count |
Number |
Specifies a global prefetch window in terms of whole messages. This field may be used in combination with the prefetch-size field; a message will only be sent in advance if both prefetch windows allow it. 0 is used for no specific limit. |
||
Reconnection Strategy |
Retry strategy in case of connectivity errors. |
|||
Body |
Any |
The body of the Message |
|
|
AMQP Properties |
Properties of the message. |
|||
Headers |
Object |
The custom user properties that should be set to this AmqpMessage |
||
Delivery Mode |
Enumeration, one of:
|
The delivery mode to use when publishing to the AMQP broker. |
PERSISTENT |
|
Priority |
Number |
The priority to use when publishing to the AMQP broker. The priority has possible values from 0 to 9 - default is 0. |
0 |
|
Request Broker Confirms |
Boolean |
Whether it must fail in case no confirmation is provided in case this attribute is set to true." |
false |
|
Mandatory |
Boolean |
Whether the operation must fail if it cannot be routed to a queue. |
false |
|
Immediate |
Boolean |
Whether the operation must fail if it cannot be routed to a queue consumer immediately. |
false |
|
Returned Message Exchange |
String |
The exchange to publish returned messages. |
Types
Socket Configuration
Field | Type | Description | Default Value | Required |
---|---|---|---|---|
Keep Alive |
Boolean |
Keep alive to set to the underlying AMQP connector |
false |
|
So Timeout Time Unit |
Enumeration, one of:
|
Timeunit for the SO_TIMEOUT AMQP connection socket configuration. |
MILLISECONDS |
|
So Timeout |
Number |
SO_TIMEOUT to set to the underlying AMQP connector. |
||
Receive Buffer Size |
Number |
Receive buffer size to set to the underlying AMQP connector |
||
Send Buffer Size |
Number |
Send buffer size to set to the underlying AMQP connector |
TLS
Configures TLS to provide secure communications for the Mule app.
Field | Type | Description | Default Value | Required |
---|---|---|---|---|
Enabled Protocols |
String |
Comma-separated list of protocols enabled for this context. |
||
Enabled Cipher Suites |
String |
Comma-separated list of cipher suites enabled for this context. |
||
Trust Store |
Configures the TLS truststore. |
|||
Key Store |
Configures the TLS keystore. |
|||
Revocation Check |
Truststore
Configures the truststore for TLS.
Field | Type | Description | Default Value | Required |
---|---|---|---|---|
Path |
String |
Path to the truststore. Mule resolves the path relative to the current classpath and file system. |
||
Password |
String |
Password used to protect the truststore. |
||
Type |
String |
Type of store. |
||
Algorithm |
String |
Encryption algorithm that the truststore uses. |
||
Insecure |
Boolean |
If |
Keystore
Configures the keystore for the TLS protocol. The keystore you generate contains a private key and a public certificate.
Field | Type | Description | Default Value | Required |
---|---|---|---|---|
Path |
String |
Path to the keystore. Mule resolves the path relative to the current classpath and file system. |
||
Type |
String |
Type of store. |
||
Alias |
String |
Alias of the key to use when the keystore contains multiple private keys. By default, Mule uses the first key in the file. |
||
Key Password |
String |
Password used to protect the private key. |
||
Password |
String |
Password used to protect the keystore. |
||
Algorithm |
String |
Encryption algorithm that the keystore uses. |
Standard Revocation Check
Configures standard revocation checks for TLS certificates.
Field | Type | Description | Default Value | Required |
---|---|---|---|---|
Only End Entities |
Boolean |
Which elements to verify in the certificate chain:
Verify only the last element in the certificate chain.
Verify all elements in the certificate chain. |
||
Prefer Crls |
Boolean |
How to check certificate validity:
Check the Certification Revocation List (CRL) for certificate validity.
Use the Online Certificate Status Protocol (OCSP) to check certificate validity. |
||
No Fallback |
Boolean |
Whether to use the secondary method to check certificate validity:
Use the method that wasn’t specified in the Prefer Crls field (the secondary method) to check certificate validity.
Do not use the secondary method to check certificate validity. |
||
Soft Fail |
Boolean |
What to do if the revocation server can’t be reached or is busy:
Avoid verification failure.
Allow the verification to fail. |
Custom OCSP Responder
Configures a custom OCSP responder for certification revocation checks.
Field | Type | Description | Default Value | Required |
---|---|---|---|---|
Url |
String |
URL of the OCSP responder. |
||
Cert Alias |
String |
Alias of the signing certificate for the OCSP response. If specified, the alias must be in the truststore. |
CRL File
Specifies the location of the certification revocation list (CRL) file.
Field | Type | Description | Default Value | Required |
---|---|---|---|---|
Path |
String |
Path to the CRL file. |
Fallback Address
Field | Type | Description | Default Value | Required |
---|---|---|---|---|
Host |
String |
Host of the broker to connect to. |
x |
|
Port |
String |
Port of the AMQP broker to connect to. |
Reconnection
Configures a reconnection strategy for an operation.
Field | Type | Description | Default Value | Required |
---|---|---|---|---|
Fails Deployment |
Boolean |
When the application is deployed, a connectivity test is performed on all connectors. If set to true, deployment fails if the test doesn’t pass after exhausting the associated reconnection strategy. |
||
Reconnection Strategy |
Reconnection strategy to use. |
Reconnect
Configures a standard reconnection strategy, which specifies how often to reconnect and how many reconnection attempts the connector source or operation can make.
Field | Type | Description | Default Value | Required |
---|---|---|---|---|
Frequency |
Number |
How often to attempt to reconnect, in milliseconds. |
||
Count |
Number |
How many reconnection attempts the Mule app can make. |
Reconnect Forever
Configures a forever reconnection strategy by which the connector source or operation attempts to reconnect at a specified frequency for as long as the Mule app runs.
Field | Type | Description | Default Value | Required |
---|---|---|---|---|
Frequency |
Number |
How often to attempt to reconnect, in milliseconds. |
Expiration Policy
Configures an expiration policy strategy.
Field | Type | Description | Default Value | Required |
---|---|---|---|---|
Max Idle Time |
Number |
Configures the maximum amount of time that a dynamic configuration instance can remain idle before Mule considers it eligible for expiration. |
||
Time Unit |
Enumeration, one of:
|
Time unit for the Max Idle Time field. |
AMQP Attributes
Field | Type | Description | Default Value | Required |
---|---|---|---|---|
Envelope |
Encapsulates a group of parameters used for AMQP’s Basic methods |
x |
||
Properties |
AMQP Message Properties |
x |
||
Headers |
Object |
AMQP Message parameters |
x |
|
Ack Id |
String |
The channel ACK ID required to ACK a the current Message if one is available, or null otherwise. |
Envelope
Field | Type | Description | Default Value | Required |
---|---|---|---|---|
Delivery Tag |
Number |
|||
Redeliver |
Boolean |
false |
||
Exchange |
String |
|||
Routing Key |
String |
Properties
Field | Type | Description | Default Value | Required |
---|---|---|---|---|
Content Type |
String |
|||
Content Encoding |
String |
|||
Delivery Mode |
Enumeration, one of:
|
PERSISTENT |
||
Priority |
Number |
|||
Correlation Id |
String |
|||
Reply To |
String |
|||
Expiration |
String |
|||
Message Id |
String |
|||
Timestamp |
Date |
|||
Type |
String |
|||
User Id |
String |
|||
App Id |
String |
|||
Cluster Id |
String |
Queue Definition
Field | Type | Description | Default Value | Required |
---|---|---|---|---|
Removal Strategy |
Enumeration, one of:
|
Defines when the declared queue must be removed from the broker. |
EXPLICIT |
|
Exchange To Bind |
String |
Defines the exchange to bind the queue to. |
||
Binding Routing Key |
String |
Defines the routing key to use in the binding of the exchange. |
Repeatable In Memory Stream
Configures the in-memory streaming strategy by which the request fails if the data exceeds the MAX buffer size. Always run performance tests to find the optimal buffer size for your specific use case.
Field | Type | Description | Default Value | Required |
---|---|---|---|---|
Initial Buffer Size |
Number |
Initial amount of memory to allocate to the data stream. If the streamed data exceeds this value, the buffer expands by Buffer Size Increment, with an upper limit of Max In Memory Size value. |
||
Buffer Size Increment |
Number |
This is by how much the buffer size expands if it exceeds its initial size. Setting a value of zero or lower means that the buffer should not expand, meaning that a STREAM_MAXIMUM_SIZE_EXCEEDED error is raised when the buffer gets full. |
||
Max Buffer Size |
Number |
Maximum size of the buffer. If the buffer size exceeds this value, Mule raises a |
||
Buffer Unit |
Enumeration, one of:
|
Unit for the Initial Buffer Size, Buffer Size Increment, and Buffer Unit fields. |
Repeatable File Store Stream
Configures the repeatable file-store streaming strategy by which Mule keeps a portion of the stream content in memory. If the stream content is larger than the configured buffer size, Mule backs up the buffer’s content to disk and then clears the memory.
Field | Type | Description | Default Value | Required |
---|---|---|---|---|
In Memory Size |
Number |
Maximum amount of memory that the stream can use for data. If the amount of memory exceeds this value, Mule buffers the content to disk. To optimize performance:
|
||
Buffer Unit |
Enumeration, one of:
|
Unit for the In Memory Size field. |
Redelivery Policy
Configures the redelivery policy for executing requests that generate errors. You can add a redelivery policy to any source in a flow.
Field | Type | Description | Default Value | Required |
---|---|---|---|---|
Max Redelivery Count |
Number |
Maximum number of times that a redelivered request can be processed unsuccessfully before returning a REDELIVERY_EXHAUSTED error. |
||
Use Secure Hash |
Boolean |
If |
||
Message Digest Algorithm |
String |
Secure hashing algorithm to use if the Use Secure Hash field is |
||
Id Expression |
String |
One or more expressions that determine when a message was redelivered. You can set this property only if the Use Secure Hash field is |
||
Object Store |
Object Store |
Configures the object store that stores the redelivery counter for each message. |
AMQP Properties
Field | Type | Description | Default Value | Required |
---|---|---|---|---|
Content Type |
String |
The content type of the body |
/ |
|
Content Encoding |
String |
The Content Encoding of the published Message |
||
Priority |
Number |
The priority to use when publishing to the AMQP broker. The priority has possible values from 0 to 9 - default is 0. |
0 |
|
Correlation Id |
String |
Used in case of implementation of RPC pattern to distinguish among messages in a request-reply. |
||
Message Id |
String |
|||
Reply To |
String |
Destination set in case of RPC. |
||
Expiration |
Number |
Expiration in miliseconds for the message. |
||
Expiration Time Unit |
Enumeration, one of:
|
MILLISECONDS |
||
User Id |
String |
|||
App Id |
String |
|||
Cluster Id |
String |
|||
Timestamp |
Date |
|||
Type |
String |
Type of the consumed message |
Exchange Defintiion
Field | Type | Description | Default Value | Required |
---|---|---|---|---|
Removal Strategy |
Enumeration, one of:
|
Defines when the declared exchange must be removed from the broker. |
EXPLICIT |
|
Type |
Enumeration, one of:
|
The type of exchange to be declared. |
FANOUT |