
AMQP Connector Reference - Mule 4

AMQP Connector v1.6

Anypoint Connector for AMQP is an AMQP 0.9.1-compliant MuleSoft extension used to consume and produce AMQP messages. The extension supports AMQP functionality including exchanges and queues, consumers, acknowledgment modes, and local transactions.

Configurations


Config

These are the base configuration parameters for AMQP Connector.

Parameters

Name Type Description Default Value Required

Name

String

The name for this configuration. Connectors reference the configuration with this name.

x

Encoding

String

The default encoding of the Message body to use if the message doesn’t communicate it.

Content Type

String

The default content type of the Message body to use if the message doesn’t communicate it.

*/*

Create Fallback Queue

Boolean

Whether to create non-existing queues according to the fallback definition if they are defined.

true

Create Fallback Exchange

Boolean

Whether to create non-existing exchanges according to the fallback definition if they are defined.

true

Configuration for Connection.

Parameters

Name Type Description Default Value Required

Host

String

Host of the broker to connect to.

x

Port

Number

Port of the AMQP broker to connect to.

  • 5672 for a non-secure connection

  • 5671 for a secure connection

Username

String

Username to use when providing credentials for authentication.

x

Password

String

Password to use when providing credentials for authentication.

x

useTls

Boolean

Whether TLS is needed. If it is not provided, the default for the AMQP connection is used.

false

heartbeatTimeout

Integer

The heartbeat timeout. Heartbeat frames are sent at about half the timeout interval. (v1.2.0 and later)

60

handshakeTimeoutTimeUnit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Time unit for the handshake timeout AMQP connection socket configuration. (v1.5.0 and later)

MILLISECONDS

handshakeTimeout

Integer

AMQP 0.9.1 handshake timeout to set on the underlying AMQP connection. (v1.5.0 and later)

Default by AMQP client (10000 ms)

fallbackAddresses

List of fallback addresses (host, port)

The addresses of the broker nodes to which to attempt to connect in case the connection to the main broker fails. (v1.3.0 and later)

No fallback addresses
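The base and connection parameters above could be combined in a global configuration element along these lines. This is a minimal sketch, not a verified configuration: the `amqp:config` and `amqp:connection` element names and the camelCase attribute names are assumed from the parameter names in the tables, and the configuration name, host, and credentials are placeholders. Check them against the connector schema for your version.

```xml
<?xml version="1.0" encoding="UTF-8"?>
<mule xmlns="http://www.mulesoft.org/schema/mule/core"
      xmlns:amqp="http://www.mulesoft.org/schema/mule/amqp"
      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xsi:schemaLocation="
        http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd
        http://www.mulesoft.org/schema/mule/amqp http://www.mulesoft.org/schema/mule/amqp/current/mule-amqp.xsd">

    <!-- Hypothetical configuration name; operations refer to it by this name. -->
    <amqp:config name="AMQP_Config"
                 encoding="UTF-8"
                 contentType="application/json"
                 createFallbackQueue="true"
                 createFallbackExchange="true">
        <!-- 5672 is the non-secure port listed above; 5671 is the secure one. -->
        <amqp:connection host="localhost"
                         port="5672"
                         username="guest"
                         password="guest" />
    </amqp:config>
</mule>
```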

Configuration for Consumers.

Parameters

Name Type Description Default Value Required

Ack Mode

Enumeration, one of:

  • IMMEDIATE

  • AUTO

  • MANUAL

The ConsumerAckMode to use when consuming a message. Can be overridden at the message source level.

IMMEDIATE

No Local

Boolean

If set to true, the server does not send messages to the connection that published them.

false

Exclusive Consumers

Boolean

Set to true if the connector should create only exclusive consumers, which means that only the created consumer can access the queue.

false

Number of Consumers

Integer

The number of consumers spawned by the message source to receive AMQP messages. Each consumer creates a channel.

4
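As a sketch of how the consumer defaults might be set at the configuration level, assuming a child element named `amqp:consumer-config` with camelCase attributes derived from the parameter names above (both are assumptions to verify against the schema). The fragment belongs inside a `<mule>` root that declares the `amqp` namespace; the connection values are placeholders.

```xml
<amqp:config name="AMQP_Config">
    <amqp:connection host="localhost" username="guest" password="guest" />
    <!-- MANUAL leaves acknowledgment to the flow; two consumers means two channels. -->
    <amqp:consumer-config ackMode="MANUAL"
                          numberOfConsumers="2"
                          exclusiveConsumers="false"
                          noLocal="false" />
</amqp:config>
```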

Configuration for Publishers.

Parameters

Name Type Description Default Value Required

Delivery Mode

Enumeration, one of:

  • PERSISTENT

  • TRANSIENT

The delivery mode to use when publishing to the AMQP broker.

PERSISTENT

Priority

Integer

The priority to use when publishing to the AMQP broker.

0

Request Broker Confirms

Boolean

Whether the publish operation must fail if the broker provides no confirmation.

false

Mandatory

Boolean

Whether the operation must fail if the message cannot be routed to a queue.

false

Immediate

Boolean

Whether the operation must fail if the message cannot be routed to a queue consumer immediately.

false

Returned Message Exchange

String

The exchange to which returned messages are published.
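The publisher defaults might be declared in the same way. In this sketch the `amqp:publisher-config` element name, its attribute names, and the `returned.exchange` exchange are assumptions for illustration only; the fragment belongs inside a `<mule>` root that declares the `amqp` namespace.

```xml
<amqp:config name="AMQP_Config">
    <amqp:connection host="localhost" username="guest" password="guest" />
    <!-- Persistent delivery, fail on missing broker confirmation or unroutable message. -->
    <amqp:publisher-config deliveryMode="PERSISTENT"
                           priority="0"
                           requestBrokerConfirms="true"
                           mandatory="true"
                           returnedMessageExchange="returned.exchange" />
</amqp:config>
```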

Configuration for Quality of Service.

Parameters

Name Type Description Default Value Required

Prefetch Size

Integer

This field defines a prefetch size window. The broker sends as many messages as possible without exceeding the prefetchSize window in octets (bytes). Use 0 for no specific limit.

0

Prefetch Count

Integer

Specifies a global prefetch window in terms of whole messages. Use this field in combination with the prefetch-size field. A message is sent in advance only if both prefetch windows allow it. Use 0 for no specific limit.

0
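Because a message is sent in advance only when both windows allow it, leaving one window at 0 lets the other one govern. The sketch below limits prefetching to ten whole messages with no byte limit. The `amqp:quality-of-service-config` element name and its attributes are assumed from the parameter names above, and the fragment belongs inside a `<mule>` root that declares the `amqp` namespace.

```xml
<amqp:config name="AMQP_Config">
    <amqp:connection host="localhost" username="guest" password="guest" />
    <!-- prefetchSize="0": no octet limit, so prefetchCount="10" is the effective window. -->
    <amqp:quality-of-service-config prefetchCount="10" prefetchSize="0" />
</amqp:config>
```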

Socket configuration for AMQP frame handler.

Parameters

Name Type Description Default Value Required

Keep Alive

Boolean

Whether to enable keep-alive on the underlying AMQP connection socket. (v1.5.0 and later)

false

soTimeoutTimeUnit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Time unit for the SO_TIMEOUT AMQP connection socket configuration. (v1.5.0 and later)

MILLISECONDS

soTimeout

Integer

SO_TIMEOUT to set on the underlying AMQP connection socket. (v1.5.0 and later)

Default in user environment

receiveBufferSize

Integer

Receive buffer size to set on the underlying AMQP connection socket. (v1.5.0 and later)

Default in user environment

sendBufferSize

Integer

Send buffer size to set on the underlying AMQP connection socket. (v1.5.0 and later)

Default in user environment

Operations

Consume

<amqp:consume>

Operation that allows the user to consume a single AMQP message from a given queue.

Parameters

Name Type Description Default Value Required

Configuration

String

The name of the configuration to use.

x

Queue name

String

The name of the queue from where to consume the message.

x

Content Type

String

The message’s content type.

Encoding

String

The message’s content encoding.

Fallback Queue Definition

Definition of a Queue

The queue definition to use for queue declaration in case there is no queue with the queueName.

Ack Mode

Enumeration, one of:

  • IMMEDIATE

  • MANUAL

The ConsumerAckMode to configure for the message and session.

Maximum Wait

Number

Maximum time to wait for a message before timing out.

10000

Maximum Wait Unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Time unit for the Maximum Wait value.

MILLISECONDS

Transactional Action

Enumeration, one of:

  • ALWAYS_JOIN

  • JOIN_IF_POSSIBLE

  • NOT_SUPPORTED

The type of joining action that operations can take regarding transactions.

JOIN_IF_POSSIBLE

Reconnection Strategy

A retry strategy in case of connectivity errors.

Output

Type

Any

Attributes Type

For Configurations

Throws

  • AMQP:CONNECTIVITY

  • AMQP:CONSUMING

  • AMQP:CREATION_NOT_ALLOWED

  • AMQP:QUEUE_NOT_FOUND

  • AMQP:RETRY_EXHAUSTED

  • AMQP:TIMEOUT
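A minimal sketch of a one-off consume that handles the AMQP:TIMEOUT error listed above. The attribute names are assumed camelCase forms of the parameters in the table, and `AMQP_Config` and the `orders` queue are hypothetical. The fragment belongs inside a `<mule>` root that declares the `amqp` namespace.

```xml
<flow name="consume-once-flow">
    <try>
        <!-- Wait up to 5 seconds for a single message from the queue. -->
        <amqp:consume config-ref="AMQP_Config"
                      queueName="orders"
                      ackMode="IMMEDIATE"
                      maximumWait="5"
                      maximumWaitUnit="SECONDS" />
        <logger level="INFO" message="#[payload]" />
        <error-handler>
            <!-- Treat an empty queue as a normal outcome rather than a failure. -->
            <on-error-continue type="AMQP:TIMEOUT">
                <logger level="WARN" message="No message arrived within 5 seconds" />
            </on-error-continue>
        </error-handler>
    </try>
</flow>
```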

Publish

<amqp:publish>

Operation that allows the user to publish a single AMQP message to a given exchange.

Parameters

Name Type Description Default Value Required

Configuration

String

The name of the configuration to use.

x

Exchange Name

String

The name of the exchange to publish the message to.

x

Fallback Exchange Definition

Definition of an Exchange

The exchange to use for exchange declaration in case there is no exchange with the exchangeName.

Routing Keys

LIST

List of routing keys

Delivery Mode

Enumeration, one of:

  • PERSISTENT

  • TRANSIENT

The delivery mode to use when publishing to the AMQP broker.

PERSISTENT

Correlation Id

String

The AMQPCorrelationID header of the Message.

ContentType

String

The content type of the body.

Encoding

String

The outboundEncoding of the message’s body.

Reply To

String

The AMQP replyTo property, which names the queue to which replies to this message are sent.

User Properties

Object

The custom user properties to set for this message. Each property is merged with other default AMQP user properties. All AMQP user properties are set at once in a single object. You can write this object as a DataWeave object, such as #[output application/json --- { userName: vars.user, appName: 'myApp'}]. Each key/value from the user properties object is then set as a separate AMQP user property.

Reconnection Strategy

A retry strategy in case of connectivity errors.

For Configurations

Throws

  • AMQP:CREATION_NOT_ALLOWED

  • AMQP:ILLEGAL_BODY

  • AMQP:PUBLISHING

  • AMQP:RETRY_EXHAUSTED

  • AMQP:UNROUTABLE_MESSAGE
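As an illustration, a publish to a named exchange with one routing key might look like the following. The nested `amqp:routing-keys` and `amqp:message` elements and the attribute names are assumptions based on the parameter names above; `AMQP_Config`, `orders.exchange`, and `orders.created` are placeholders. The fragment belongs inside a `<mule>` root that declares the `amqp` namespace.

```xml
<flow name="publish-flow">
    <amqp:publish config-ref="AMQP_Config"
                  exchangeName="orders.exchange"
                  deliveryMode="PERSISTENT">
        <!-- Routing Keys parameter: a list, here with a single key. -->
        <amqp:routing-keys>
            <amqp:routing-key value="orders.created" />
        </amqp:routing-keys>
        <!-- The current payload becomes the message body. -->
        <amqp:message>
            <amqp:body>#[payload]</amqp:body>
        </amqp:message>
    </amqp:publish>
</flow>
```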

Publish Consume

<amqp:publish-consume>

Send a message to an AMQP Exchange and wait for a response either to the provided replyTo destination or to a temporary destination created dynamically.

Parameters

Name Type Description Default Value Required

Configuration

String

The name of the configuration to use.

x

Exchange Name

String

The name of the exchange to publish the message to.

x

Correlation Id

String

The AMQPCorrelationID header of the message.

ContentType

String

The content type of the body.

*/*

Encoding

String

The outboundEncoding of the message’s body.

User Properties

Object

The custom user properties that should be set for this message. Each property is merged with other default AMQP user properties. All AMQP user properties are set at once in a single object. You can write this object as a DataWeave object, such as #[output application/json --- { userName: vars.user, appName: 'myApp'}]. Each key/value from the user properties object is then set as a separate AMQP user property.

Maximum Wait

Number

Maximum time to wait for a message before timing out.

10000

Maximum Wait Unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Time unit for the Maximum Wait value.

MILLISECONDS

Reconnection Strategy

A retry strategy in case of connectivity errors.

Output

Type

Any

Attributes Type

For Configurations

Throws

  • AMQP:CONNECTIVITY

  • AMQP:CONSUMING

  • AMQP:CREATION_NOT_ALLOWED

  • AMQP:ILLEGAL_BODY

  • AMQP:PUBLISHING

  • AMQP:PUBLISHING_CONSUMING

  • AMQP:QUEUE_NOT_FOUND

  • AMQP:RETRY_EXHAUSTED

  • AMQP:TIMEOUT
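A request-reply call could be sketched as follows, with the reply becoming the payload for the next processor. Only `config-ref` and `exchangeName` are set, so the defaults listed above apply to everything else. The nested `amqp:message` element is an assumption, and `AMQP_Config` and `rpc.exchange` are placeholders. The fragment belongs inside a `<mule>` root that declares the `amqp` namespace.

```xml
<flow name="request-reply-flow">
    <!-- Publishes the payload, then blocks until a reply arrives or the wait expires. -->
    <amqp:publish-consume config-ref="AMQP_Config" exchangeName="rpc.exchange">
        <amqp:message>
            <amqp:body>#[payload]</amqp:body>
        </amqp:message>
    </amqp:publish-consume>
    <!-- The payload is now the reply message body. -->
    <logger level="INFO" message="#[payload]" />
</flow>
```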

Ack

<amqp:ack>

Operation that allows the user to ACK a delivered AMQP message.

Parameters

Name Type Description Default Value Required

Ack Id

String

The AckId of the message to ACK.

x

Reject

<amqp:reject>

Operation that allows the user to reject a delivered AMQP message.

Parameters

Name Type Description Default Value Required

Ack Id

String

The AckId of the message to reject.

x

Requeue

Boolean

Indicates whether the rejected message has to be requeued.

false
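The Ack and Reject operations are typically paired with manual acknowledgment. The sketch below acknowledges a message after successful processing and rejects it with requeue on any error. It assumes the message attributes expose the ack ID as `attributes.ackId` and that the listener accepts an `ackMode` attribute; `AMQP_Config`, the `orders` queue, and the `process-order` flow are hypothetical. The fragment belongs inside a `<mule>` root that declares the `amqp` namespace.

```xml
<flow name="manual-ack-flow">
    <amqp:listener config-ref="AMQP_Config" queueName="orders" ackMode="MANUAL" />
    <!-- Save the ack ID before later processors replace the attributes. -->
    <set-variable variableName="ackId" value="#[attributes.ackId]" />
    <try>
        <!-- Hypothetical flow holding the business logic. -->
        <flow-ref name="process-order" />
        <amqp:ack ackId="#[vars.ackId]" />
        <error-handler>
            <on-error-continue>
                <!-- requeue="true" returns the message to the queue for redelivery. -->
                <amqp:reject ackId="#[vars.ackId]" requeue="true" />
            </on-error-continue>
        </error-handler>
    </try>
</flow>
```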

Sources

Listener

<amqp:listener>

AMQP Listener for queues to listen for incoming messages.

Parameters

Name Type Description Default Value Required

Configuration

String

The name of the configuration to use.

x

Queue Name

String

Name of the queue to consume from.

x

Number Of consumers

Number

The number of concurrent consumers to use to receive AMQP messages.

4

Consumer Tag

String

A client-generated consumer tag to establish context.

4

Recovery Strategy

Enumeration, one of:

  • NONE

  • NO_REQUEUE

  • REQUEUE

Strategy to use when a channel-recover or a rollback is performed.

REQUEUE

Inbound content type

String

The content type of the message body.

Inbound encoding

String

The inboundEncoding of the message body.
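A minimal listener flow, shown as a sketch: the attribute names are assumed camelCase forms of the parameters above, and `AMQP_Config` and the `orders` queue are placeholders. The fragment belongs inside a `<mule>` root that declares the `amqp` namespace.

```xml
<flow name="listener-flow">
    <!-- Two concurrent consumers read from the queue and trigger this flow per message. -->
    <amqp:listener config-ref="AMQP_Config"
                   queueName="orders"
                   numberOfConsumers="2" />
    <logger level="INFO" message="#[payload]" />
</flow>
```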

Types

Redelivery Policy

Field Type Description Default Value Required

Max Redelivery Count

Number

The maximum number of times a message can be redelivered and processed unsuccessfully before triggering a process-failed-message.

Use Secure Hash

Boolean

Whether to use a secure hash algorithm to identify a redelivered message.

Message Digest Algorithm

String

The secure hashing algorithm to use. If not set, the default is SHA-256.

SHA-256

Id Expression

String

Defines one or more expressions to use to determine when a message has been redelivered. This property can be used only if useSecureHash is false.

Object Store

ObjectStore

The object store where the redelivery counter for each message is stored.

Reconnect

Field Type Description Default Value Required

Frequency

Number

How often in milliseconds to reconnect.

Count

Number

How many reconnection attempts to make.

blocking

Boolean

If false, the reconnection strategy runs in a separate, non-blocking thread.

true

Reconnect Forever

Field Type Description Default Value Required

Frequency

Number

How often in milliseconds to reconnect.

blocking

Boolean

If false, the reconnection strategy runs in a separate, non-blocking thread.

true
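The two reconnection types above correspond to the standard Mule 4 `reconnect` and `reconnect-forever` elements. In this sketch they are nested under an `amqp:connection` element, whose name is an assumption, and the configuration names and values are placeholders. The fragment belongs inside a `<mule>` root that declares the `amqp` namespace.

```xml
<!-- Bounded strategy: retry every 3000 ms, give up after 5 attempts. -->
<amqp:config name="AMQP_Config_Bounded">
    <amqp:connection host="localhost" username="guest" password="guest">
        <reconnection>
            <reconnect frequency="3000" count="5" />
        </reconnection>
    </amqp:connection>
</amqp:config>

<!-- Unbounded strategy: retry every 3000 ms until the broker is reachable. -->
<amqp:config name="AMQP_Config_Forever">
    <amqp:connection host="localhost" username="guest" password="guest">
        <reconnection>
            <reconnect-forever frequency="3000" />
        </reconnection>
    </amqp:connection>
</amqp:config>
```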

Queue Definition

Parameters

Name Type Description Default Value Required

Removal Strategy

Enumeration, one of:

  • EXPLICIT

  • SHUTDOWN

  • UNUSED

Defines when the declared queue must be removed from the broker.

EXPLICIT

Exchange to Bind

String

Defines the exchange to bind the queue to.

Binding Routing Key

String

Defines the routing key to use in the binding to the exchange. (v1.4.0 and later)
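A fallback queue definition might be attached to a consume operation as sketched below, so that a missing queue is declared, bound, and later removed on shutdown. The `amqp:fallback-queue-definition` element name and its attributes are assumed from the parameter names above; `AMQP_Config`, `orders`, `orders.exchange`, and `orders.created` are placeholders. The fragment belongs inside a `<mule>` root that declares the `amqp` namespace.

```xml
<flow name="consume-with-fallback-flow">
    <amqp:consume config-ref="AMQP_Config" queueName="orders">
        <!-- Used only if no queue named "orders" exists and fallback creation is enabled. -->
        <amqp:fallback-queue-definition removalStrategy="SHUTDOWN"
                                        exchangeToBind="orders.exchange"
                                        bindingRoutingKey="orders.created" />
    </amqp:consume>
</flow>
```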

Exchange Definition

Parameters

Name Type Description Default Value Required

Removal Strategy

Enumeration, one of:

  • EXPLICIT

  • SHUTDOWN

  • UNUSED

Defines when the declared exchange must be removed from the broker.

EXPLICIT

Exchange Type

Enumeration, one of:

  • DIRECT

  • TOPIC

  • FANOUT

  • HEADERS

The type of the exchange to declare.

FANOUT
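Similarly, a fallback exchange definition could accompany a publish operation. In this sketch the `amqp:fallback-exchange-definition` element name, its attributes, and the nested `amqp:message` element are assumptions; `AMQP_Config` and `orders.exchange` are placeholders. The fragment belongs inside a `<mule>` root that declares the `amqp` namespace.

```xml
<flow name="publish-with-fallback-flow">
    <amqp:publish config-ref="AMQP_Config" exchangeName="orders.exchange">
        <!-- Declares a DIRECT exchange if "orders.exchange" does not exist yet. -->
        <amqp:fallback-exchange-definition removalStrategy="EXPLICIT"
                                           exchangeType="DIRECT" />
        <amqp:message>
            <amqp:body>#[payload]</amqp:body>
        </amqp:message>
    </amqp:publish>
</flow>
```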

AMQP Attributes

Parameters

Name Type Description Default Value Required

Envelope

ENVELOPE

Encapsulates a group of parameters used for AMQP’s basic methods.

Properties

PROPERTIES

AMQP Message Properties.

Headers

MAP

AMQP Message headers.

Envelope

Parameters

Name Type Description Default Value Required

Delivery Tag

Number

The delivery tag.

Redeliver

Boolean

True if this is a redelivery following a failed ACK.

Exchange

String

The exchange used for the current operation.

routingKey

String

The associated routing key.

Properties

Parameters

Name Type Description Default Value Required

Content Type

String

The content type of the message.

Content Encoding

String

Content encoding of the message.

Priority

Number

The priority to use when publishing to the AMQP broker.

Correlation Id

String

Used when implementing the RPC pattern to correlate messages in a request-reply exchange.

Message Id

String

The message ID of the message.

Reply To

String

The reply destination, set when using the RPC pattern.

Expiration

String

Specify the expiration time for the message.

Expiration time unit

Specify the unit of time for the message expiration time.

User Id

String

User ID of the message.

App Id

String

App ID of the message.

Cluster Id

String

Cluster ID of the message.

Timestamp

TIMESTAMP

Timestamp of the consumed message.

Type
