AMQP Connector Reference - Mule 4
AMQP Connector v1.7
Release Notes: AMQP Connector Release Notes
Anypoint Connector for AMQP is an AMQP 0.9.1-compliant MuleSoft extension used to consume and produce AMQP messages. The extension supports AMQP functionality including exchanges and queues, consumers, acknowledgment modes, and local transactions.
Configurations
Config
These are the base configuration parameters for AMQP Connector.
Parameters
Name | Type | Description | Default Value | Required |
---|---|---|---|---|
Name | String | The name for this configuration. Connectors reference the configuration with this name. | | x |
Encoding | String | The default encoding of the message body to use if the message doesn’t communicate it. | | |
Content Type | String | The default content type of the message body to use if the message doesn’t communicate it. | | |
Create Fallback Queue | String | Whether to create non-existing queues according to the fallback definition if they are defined. | | |
Create Fallback Exchange | String | Whether to create non-existing exchanges according to the fallback definition if they are defined. | | |
Configuration for Connection.
Parameters
Name | Type | Description | Default Value | Required |
---|---|---|---|---|
Host | String | Host of the broker to connect to. | | x |
Port | Number | Port of the AMQP broker to connect to. | | |
Username | String | Username to use when providing credentials for authentication. | | |
Password | String | Password to use when providing credentials for authentication. | | |
useTls | Boolean | Whether TLS is needed. If it is not provided, the default for the AMQP connection is used. | | |
useSNI | Boolean | Whether Server Name Indication (SNI) must be used in the ClientHello message. | | x |
useSasl | Boolean | Whether the SASL (EXTERNAL) authentication mechanism is used (no username or password required). | | x |
heartbeatTimeout | Boolean | The heartbeat timeout. Heartbeat frames are sent at about half the timeout interval. (v1.2.0 and later) | | |
handshakeTimeoutTimeUnit | Enumeration | Time unit for the handshake timeout AMQP connection socket configuration. (v1.5.0 and later) | MILLISECONDS | |
handshakeTimeout | Integer | AMQP 0.9.1 timeout to set to the underlying AMQP connector. (v1.5.0 and later) | Default by AMQP client (10000 ms) | |
fallbackAddresses | List of fallback addresses (host, port) | The addresses of the broker nodes to which to attempt to connect in case the connection to the main broker fails. (v1.3.0 and later) | No fallback addresses | |
Username and Password are optional; their presence is validated only when useSasl is disabled. useTls is required to enable useSasl.
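The connection parameters above might be combined as in the following minimal sketch. The element and attribute names are assumed from Mule 4 naming conventions, the configuration name `AMQP_Config` and all values are hypothetical, and the namespace declarations on the enclosing `<mule>` element are omitted.

```xml
<!-- Sketch only: element and attribute names are assumptions, values are illustrative. -->
<amqp:config name="AMQP_Config">
    <!-- host is the only parameter marked as required in the table above.
         username and password are supplied because SASL (EXTERNAL) is not used here. -->
    <amqp:connection host="localhost"
                     port="5672"
                     username="guest"
                     password="guest" />
</amqp:config>
```

Operations and sources would then reference this configuration by its name.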
Configuration for Consumers.
Parameters
Name | Type | Description | Default Value | Required |
---|---|---|---|---|
Ack Mode | Enumeration | The ConsumerAckMode to use when consuming a message. Can be overridden at the message source level. | IMMEDIATE | |
No Local | Boolean | If set to | | |
Exclusive Consumers | Boolean | Set to | | |
Number of Consumers | Integer | The number of consumers spawned by the message source to receive AMQP messages. Each consumer creates a channel. | | |
Configuration for Publishers.
Parameters
Name | Type | Description | Default Value | Required |
---|---|---|---|---|
Delivery Mode | Enumeration | The delivery mode to use when publishing to the AMQP broker. | | |
Priority | Integer | The priority to use when publishing to the AMQP broker. | | |
Request Broker Confirms | Boolean | Whether the operation must fail if no confirmation is provided. | | |
Mandatory | Boolean | Whether the operation must fail if the message cannot be routed to a queue. | | |
Immediate | Boolean | Whether the operation must fail if the message cannot be routed to a queue consumer immediately. | | |
Returned Message Exchange | String | The exchange to which returned messages are published. | | |
Configuration for Quality of Service.
Parameters
Name | Type | Description | Default Value | Required |
---|---|---|---|---|
Prefetch Size | Integer | This field defines a prefetch size window. The broker sends as many messages as possible without exceeding the prefetchSize window in octets (bytes). Use | | |
Prefetch Count | Integer | Specifies a global prefetch window in terms of whole messages. Use this field in combination with the prefetch-size field. A message is sent in advance only if both prefetch windows allow it. Use | | |
Socket configuration for AMQP frame handler.
Parameters
Name | Type | Description | Default Value | Required |
---|---|---|---|---|
Keep Alive | Boolean | Keep alive to set to the underlying AMQP connector. (v1.5.0 and later) | | |
soTimeoutTimeUnit | Enumeration | Time unit for the SO_TIMEOUT AMQP connection socket configuration. (v1.5.0 and later) | MILLISECONDS | |
soTimeout | Integer | SO_TIMEOUT to set to the underlying AMQP connector. (v1.5.0 and later) | Default in user environment | |
receiveBufferSize | Integer | Receive buffer size to set to the underlying AMQP connector. (v1.5.0 and later) | Default in user environment | |
sendBufferSize | Integer | Send buffer size to set to the underlying AMQP connector. (v1.5.0 and later) | Default in user environment | |
Operations
Consume
<amqp:consume>
Operation that allows the user to consume a single AMQP message from a given queue.
Parameters
Name | Type | Description | Default Value | Required |
---|---|---|---|---|
Configuration | String | The name of the configuration to use. | | x |
Queue name | String | The name of the queue from which to consume the message. | | x |
Content Type | String | The message’s content type. | | |
Encoding | String | The message’s content encoding. | | |
Fallback Queue Definition | Definition of a Queue | The queue definition to use for queue declaration in case there is no queue with the queueName. | | |
Ack Mode | Enumeration | The ConsumerAckMode to configure for the message and session. | | |
Maximum Wait | Number | Maximum time to wait for a message before timing out. | | |
Maximum Wait Unit | Enumeration | Time unit to use in the Maximum Wait configuration. | | |
Transactional Action | Enumeration | The type of joining action that operations can take regarding transactions. | JOIN_IF_POSSIBLE | |
Reconnection Strategy | | A retry strategy in case of connectivity errors. | | |
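As a rough illustration of the Consume parameters, the sketch below reads a single message from a queue and stops waiting after ten seconds. The attribute names (`config-ref`, `queueName`, `maximumWait`, `maximumWaitUnit`) are assumed to be the camel-cased forms of the parameters listed above, and the flow name, queue name, and `AMQP_Config` are hypothetical.

```xml
<!-- Sketch only: attribute names are assumptions, names and values are illustrative. -->
<flow name="consume-once-flow">
    <!-- Consumes one message from ordersQueue, waiting at most 10 seconds. -->
    <amqp:consume config-ref="AMQP_Config"
                  queueName="ordersQueue"
                  maximumWait="10"
                  maximumWaitUnit="SECONDS" />
</flow>
```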
Publish
<amqp:publish>
Operation that allows the user to publish a single AMQP message to a given exchange.
Parameters
Name | Type | Description | Default Value | Required |
---|---|---|---|---|
Configuration | String | The name of the configuration to use. | | x |
Exchange Name | String | The name of the exchange to publish the message to. | | x |
Fallback Exchange Definition | Definition of an Exchange | The exchange definition to use for exchange declaration in case there is no exchange with the exchangeName. | | |
Routing Keys | LIST | List of routing keys. | | |
Delivery Mode | Enumeration | The delivery mode to use when publishing to the AMQP broker. | | |
Correlation Id | String | The AMQPCorrelationID header of the message. | | |
ContentType | String | The content type of the body. | | |
Encoding | String | The outboundEncoding of the message’s body. | | |
Reply To | String | The AMQP | | |
User Properties | Object | The custom user properties to set for this message. Each property is merged with other default AMQP user properties. All AMQP user properties are set at once in a single object. You can write this object as a DataWeave object, such as | | |
Reconnection Strategy | | A retry strategy in case of connectivity errors. | | |
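A Publish operation using the required parameters plus a routing key might look like the following sketch. The child elements (`amqp:routing-keys`, `amqp:routing-key`, `amqp:message`, `amqp:body`) and attribute names are assumptions about the connector's XML schema, and the exchange name, routing key, and `AMQP_Config` are hypothetical.

```xml
<!-- Sketch only: element and attribute names are assumptions, values are illustrative. -->
<amqp:publish config-ref="AMQP_Config" exchangeName="ordersExchange">
    <!-- Routing Keys parameter: one entry per key. -->
    <amqp:routing-keys>
        <amqp:routing-key value="orders.created" />
    </amqp:routing-keys>
    <!-- The message body is taken from the current payload. -->
    <amqp:message>
        <amqp:body>#[payload]</amqp:body>
    </amqp:message>
</amqp:publish>
```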
Publish Consume
<amqp:publish-consume>
Send a message to an AMQP Exchange and wait for a response either to the provided replyTo destination or to a temporary destination created dynamically.
Parameters
Name | Type | Description | Default Value | Required |
---|---|---|---|---|
Configuration | String | The name of the configuration to use. | | x |
Exchange Name | String | The name of the exchange to publish the message to. | | x |
Correlation Id | String | The AMQPCorrelationID header of the message. | | |
ContentType | String | The content type of the body. | | |
Encoding | String | The outboundEncoding of the message’s body. | | |
User Properties | Object | The custom user properties to set for this message. Each property is merged with other default AMQP user properties. All AMQP user properties are set at once in a single object. You can write this object as a DataWeave object, such as | | |
Maximum Wait | Number | Maximum time to wait for a message before timing out. | | |
Maximum Wait Unit | Enumeration | Time unit to use in the Maximum Wait configuration. | MILLISECONDS | |
Reconnection Strategy | | A retry strategy in case of connectivity errors. | | |
Sources
Listener
<amqp:listener>
AMQP listener that listens for incoming messages on a queue.
Parameters
Name | Type | Description | Default Value | Required |
---|---|---|---|---|
Configuration | String | The name of the configuration to use. | | x |
Queue Name | String | Name of the queue to consume from. | | x |
Ack Mode | Enumeration | The Ack mode to use when consuming a message. Overridden if Transactional action is set to | EMPTY | |
Number Of consumers | Number | The number of concurrent consumers to use to receive AMQP messages. | 4 | |
Consumer Tag | String | A client-generated consumer tag to establish context. | | |
Recovery Strategy | Enumeration | Strategy to use when a channel-recover or a rollback is performed. | | |
Inbound content type | String | The content type of the message body. | | |
Inbound encoding | String | The inboundEncoding of the message body. | | |
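The Listener source could be used as in this minimal sketch, which starts a flow for each message that arrives on a queue. The attribute names (`config-ref`, `queueName`, `numberOfConsumers`) are assumed from the parameter names above, and the flow name, queue name, and `AMQP_Config` are hypothetical.

```xml
<!-- Sketch only: attribute names are assumptions, names and values are illustrative. -->
<flow name="orders-listener-flow">
    <!-- Four concurrent consumers, matching the default listed in the table. -->
    <amqp:listener config-ref="AMQP_Config"
                   queueName="ordersQueue"
                   numberOfConsumers="4" />
    <!-- Logs each received message body. -->
    <logger level="INFO" message="#[payload]" />
</flow>
```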
Types
Redelivery Policy
Field | Type | Description | Default Value | Required |
---|---|---|---|---|
Max Redelivery Count | Number | The maximum number of times a message can be redelivered and processed unsuccessfully before triggering a process-failed-message. | | |
Use Secure Hash | Boolean | Whether to use a secure hash algorithm to identify a redelivered message. | | |
Message Digest Algorithm | String | The secure hashing algorithm to use. If not set, the default is SHA-256. | SHA-256 | |
Id Expression | String | Defines one or more expressions to use to determine when a message has been redelivered. This property can be used only if useSecureHash is | | |
Object Store | ObjectStore | The object store where the redelivery counter for each message is stored. | | |
Queue Definition
Parameters
Name | Type | Description | Default Value | Required |
---|---|---|---|---|
Removal Strategy | Enumeration | Defines when the declared queue must be removed from the broker. | | |
Exchange to Bind | String | Defines the exchange to bind the queue to. | | |
Binding Routing Key | String | Defines the routing key to use in the binding to the exchange. (v1.4.0 and later) | | |
Exchange Definition
Parameters
Name | Type | Description | Default Value | Required |
---|---|---|---|---|
Removal Strategy | Enumeration | Defines when the declared exchange must be removed from the broker. | | |
Exchange Type | Enumeration | The type of the exchange to be declared. | | |
Properties
Parameters
Name | Type | Description | Default Value | Required |
---|---|---|---|---|
Content Type | String | The content type of the message. | | |
Content Encoding | String | Content encoding of the message. | | |
Priority | Number | The priority to use when publishing to the AMQP broker. | | |
Correlation Id | String | Used in case of implementation of RPC pattern to distinguish among messages in a request-reply. | | |
Message Id | String | The message ID of the message. | | |
Reply To | String | Destination set in case of RPC. | | |
Expiration | String | Specify the expiration time for the message. | | |
Expiration time unit | | Specify the unit of time for the message expiration time. | | |
User Id | String | User ID of the message. | | |
App Id | String | App ID of the message. | | |
Cluster Id | String | Cluster ID of the message. | | |
Timestamp | TIMESTAMP | Timestamp of the consumed message. | | |
Type |