Contact Free trial Login

AMQP Connector Reference - Mule 4

Support Category: Select

AMQP Connector v1.4

AMQP Connector is an AMQP 0.9.1 compliant MuleSoft Extension, and is used to consume and produce AMQP messages. The extension supports AMQP functionality including exchanges and queues, consumers, acknowledgement modes, and local transactions.

Configurations


Config

Base configuration for the AMQP connector.

Parameters

Name Type Description Default Value Required

Name

String

The name for this configuration. Connectors reference the configuration with this name.

x 

Encoding

String

The default encoding of the Message body to be used if the message doesn't communicate it.

 

Content Type

String

The default contentType of the Message body to be used if the message doesn't communicate it.

/

 

Create Fallback Queue

String

Whether to create non-existing queues according to the fallback definition in case they are defined.

true

 

Create Fallback Exchange

String

Whether to create non-existing exchanges according to the fallback definition in case they are defined.

true

 

Configuration for Connection.

Parameters

Name Type Description Default Value Required

Host

String

Host of the broker to connect to.

x 

Port

Number

Port of the AMQP broker to connect to.

5672 for Non-Secure Connection, 5671 for Secure Connection

 

Username

String

Username to be used when providing credentials for authentication.

 

x 

Password

String

Password to be used when providing credentials for authentication.

 

x 

useTls

Boolean

Whether TLS is needed to be used. In case it is not provided, the default for AMQP connection is used.

false

 

heartbeatTimeout

Boolean

The heartbeat timeout. Heartbeat frames are sent at about 1/2 the timeout interval. (From 1.2.0 onwards.)

60

 

fallbackAddresses

List of Fallback Addresses (host, port)

The addresses of the broker nodes to attempt connection to, should the connection to main broker fail. (From v1.3.0 onwards.)

No fallback addresses

 

Configuration for Consumers.

Parameters

Name Type Description Default Value Required

Ack Mode

Enumeration, one of:

  • IMMEDIATE

  • AUTO

  • MANUAL

The ConsumerAckMode to use when consuming a message. Can be overridden at the message source level.

IMMEDIATE

 

No Local

Boolean

If set to true, the server does not send messages to the connection that published them.

false

 

Exclusive Consumers

Boolean

Set to true if the connector should only create exclusive consumers, that is, only the created consumer can access the queue.

false

 

Number of Consumers

Integer

It is the number of consumers spawned by message source to receive AMQP messages. Each consumer creates a channel.

4

 

Configuration for Publishers.

Parameters

Name Type Description Default Value Required

Delivery Mode Mode

Enumeration, one of:

  • PERSISTENT

  • TRANSIENT

the delivery mode use when publishing to the AMQP broker.

PERSISTENT

 

Priority

Integer

The priority to use when publishing to the AMQP broker.

0

 

Request Broker Confirms

Boolean

Whether it must fail in case no confirmation is provided.

false

 

Mandatory

Boolean

Whether the operation must fail if the message cannot be routed to a queue.

false

 

Immediate

Boolean

Whether the operation must fail if the message cannot be routed to a queue consumer immediately.

false

 

Returned Message Exchange

String

The exchange to publish returned messages.

 

 

Configuration for Quality of Service.

Parameters

Name Type Description Default Value Required

Prefetch Size

Integer

This field defines a prefetch size window. The broker sends as many messages as possible without exceeding the prefetchSize window in octets (bytes). 0 is used for no specific limit.

0

 

Prefetch Count

Integer

Specifies a global prefetch window in terms of whole messages. This field may be used in combination with the prefetch-size field. A message will only be sent in advance if both prefetch windows allow it. 0 is used for no specific limit.

0

 

Associated Operations

Associated Sources

Operations

Consume

<amqp:consume>

Operation that allows the user to consume a single AmqpMessage from a given queue.

Parameters

Name Type Description Default Value Required

Configuration

String

The name of the configuration to use.

x 

Queue name

String

The name of the queue from where the message should be consumed.

x 

Content Type

String

The message's content content type.

 

Encoding

String

The message's content encoding.

 

Fallback Queue Definition

Definition of a Queue

The queue definition to use for queue declaration in case there is no queue with the queueName.

 

Ack Mode

Enumeration, one of:

  • IMMEDIATE

  • MANUAL

The ConsumerAckMode to configure for the message and session.

 

Maximum Wait

Number

Maximum time to wait for a message before timing out.

10000

 

Maximum Wait Unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Time unit to be used in the maximumWaitTime configurations.

MILLISECONDS

 

Transactional Action

Enumeration, one of:

  • ALWAYS_JOIN

  • JOIN_IF_POSSIBLE

  • NOT_SUPPORTED

The type of joining action that operations can take regarding transactions.

JOIN_IF_POSSIBLE

 

Reconnection Strategy

A retry strategy in case of connectivity errors.

 

Output

Type

Any

Attributes Type

For Configurations

Throws

  • AMQP:TIMEOUT  

  • AMQP:CONNECTIVITY  

  • AMQP:CONSUMING  

  • AMQP:RETRY_EXHAUSTED  

  • AMQP:QUEUE_NOT_FOUND  

  • AMQP:CREATION_NOT_ALLOWED  

Publish

<amqp:publish>

Operation that allows the user to publish a single AmqpMessage to a given exchange.

Parameters

Name Type Description Default Value Required

Configuration

String

The name of the configuration to use.

x 

Exchange Name

String

The name of the exchange to publish the message to.

x 

Fallback Exchange Definition

Definition of an Exchange

The exchange to use for exchange declaration in case there is no exchange with the exchangeName.

 

Routing Keys

LIST

List of routing keys.

 

Delivery Mode

Enumeration, one of:

  • PERSISTENT

  • TRANSIENT

the delivery mode use when publishing to the AMQP broker.

PERSISTENT

 

Correlation Id

String

The AMQPCorrelationID header of the Message.

 

ContentType

String

The content type of the body.

 

Encoding

String

The outboundEncoding of the message's body.

 

Reply To

String

The AMQP replyTo property information of the queue where this message should be replied to.

 

User Properties

Object

The custom user properties that should be set for this message. Each property is merged with other default AMQP user properties. All AMQP user properties are set at once in a single object. You can write this object as a DataWeave object, such as #[output application/json --- { userName: vars.user, appName: 'myApp'}]. Each key/value from the user properties object is then set as a separate AMQP user property.

 

Reconnection Strategy

A retry strategy in case of connectivity errors.

 

For Configurations

Throws

  • AMQP:PUBLISHING  

  • AMQP:UNROUTABLE_MESSAGE  

  • AMQP:CREATION_NOT_ALLOWED  

  • AMQP:ILLEGAL_BODY  

  • AMQP:RETRY_EXHAUSTED  

Publish Consume

<amqp:publish-consume>

Send a message to an AMQP Exchange and wait for a response either to the provided replyTo destination or to a temporary destination created dynamically.

Parameters

Name Type Description Default Value Required

Configuration

String

The name of the configuration to use.

 

x 

Exchange Name

String

The name of the exchange to publish the message to.

 

x 

Correlation Id

String

The AMQPCorrelationID header of the message.

 

 

ContentType

String

The content type of the body.

/

 

Encoding

String

The outboundEncoding of the message's body.

 

 

User Properties

Object

The custom user properties that should be set for this message. Each property is merged with other default AMQP user properties. All AMQP user properties are set at once in a single object. You can write this object as a DataWeave object, such as #[output application/json --- { userName: vars.user, appName: 'myApp'}]. Each key/value from the user properties object is then set as a separate AMQP user property.

 

Maximum Wait

Number

Maximum time to wait for a message before timing out.

10000

 

Maximum Wait Unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Time unit to be used in the maximumWaitTime configurations.

MILLISECONDS

 

Reconnection Strategy

A retry strategy in case of connectivity errors.

 

Output

Type

Any

Attributes Type

For Configurations

Throws

  • AMQP:PUBLISHING_CONSUMING  

  • AMQP:PUBLISHING  

  • AMQP:TIMEOUT  

  • AMQP:CONNECTIVITY  

  • AMQP:CONSUMING  

  • AMQP:ILLEGAL_BODY  

  • AMQP:RETRY_EXHAUSTED  

  • AMQP:QUEUE_NOT_FOUND  

  • AMQP:CREATION_NOT_ALLOWED  

Ack

<amqp:ack>

Operation that allows the user to ACK a delivered AmqpMessage.

Parameters

Name Type Description Default Value Required

Ack Id

String

The AckId of the message to ACK.

x 

Reject

<amqp:reject>

Operation that allows the user to reject a delivered AmqpMessage.

Parameters

Name Type Description Default Value Required

Ack Id

String

The AckId of the message to ACK.

x 

Requeue

Boolean

Indicates whether the rejected message has to be requeued.

false

 

Sources

Listener

<amqp:listener>

AMQP Listener for Queues, allows to listen for incoming messages.

Parameters

Name Type Description Default Value Required

Configuration

String

The name of the configuration to use.

x 

Queue Name

String

Name of the queue to consume from.

x 

Number Of consumers

Number

The number of concurrent consumers to use to receive for AMQP messages.

4

 

Consumer Tag

String

A client-generated consumer tag to establish context.

4

 

Recovery Strategy

Enumeration, one of:

  • NONE

  • NO_REQUEUE

  • REQUEUE

Strategy to use when a channel recover or a rollback is performed.

REQUEUE

 

Inbound content type

String

The content type of the message body.

 

Inbound encoding

String

The inboundEncoding of the message body.

 

Types

Redelivery Policy

Field Type Description Default Value Required

Max Redelivery Count

Number

The maximum number of times a message can be redelivered and processed unsuccessfully before triggering process-failed-message.

Use Secure Hash

Boolean

Whether to use a secure hash algorithm to identify a redelivered message.

Message Digest Algorithm

String

The secure hashing algorithm to use. If not set, the default is SHA-256.

Id Expression

String

Defines one or more expressions to use to determine when a message has been redelivered. This property may only be set if useSecureHash is false.

Object Store

ObjectStore

The object store where the redelivery counter for each message is going to be stored.

Reconnect

Field Type Description Default Value Required

Frequency

Number

How often in milliseconds to reconnect.

Count

Number

How many reconnection attempts to make.

blocking

Boolean

If false, the reconnection strategy runs in a separate, non-blocking thread.

true

Reconnect Forever

Field Type Description Default Value Required

Frequency

Number

How often in milliseconds to reconnect.

blocking

Boolean

If false, the reconnection strategy runs in a separate, non-blocking thread.

true

Queue Definition

Parameters

Name Type Description Default Value Required

Removal Strategy

Enumeration, one of:

  • EXPLICIT

  • SHUTDOWN

  • UNUSED

Defines when the declared queue must be removed from the broker.

EXPLICIT

 

Exchange to Bind

String

Defines the exchange to bind the queue to.

 

 

Binding Routing Key

String

Defines the routing key to use in the binding to the exchange. (Since v1.4.0.)

 

 

Exchange Definition

Parameters

Name Type Description Default Value Required

Removal Strategy

Enumeration, one of:

  • EXPLICIT

  • SHUTDOWN

  • UNUSED

Defines when the declared exchange must be removed from the broker.

EXPLICIT

 

Exchange Type

Enumeration, one of:

  • DIRECT

  • TOPIC

  • FANOUT

  • HEADERS

The type of the exchange to be declared

FANOUT

 

AMQP Attributes

Parameters

Name Type Description Default Value Required

Envelope

ENVELOPE

Encapsulates a group of parameters used for AMQP’s basic methods.

Properties

PROPERTIES

AMQP Message Properties.

Headers

MAP

AMQP Message headers.

Envelope

Parameters

Name Type Description Default Value Required

Delivery Tag

Number

The delivery Tag

Redeliver

Boolean

True if this is a redelivery following a failed ACK.

Exchange

String

The exchange used for the current operation.

routingKey

String

The associated routing key.

Properties

Parameters

Name Type Description Default Value Required

Content Type

String

The content type of the message.

Content Encoding

String

Content encoding of the message.

Delivery Mode

DELIVERY MODE

The delivery mode to use when publishing to the AMQP broker.

Priority

Number

The priority to use when publishing to the AMQP broker.

Correlation Id

String

Used in case of implementation of RPC pattern to distinguish among messages in a request-reply.

replyTo

String

Destination set in case of RPC.

expiration

String

Expiration in milliseconds for the message.

messageId

String

The messageId of the Message.

timestamp

TIMESTAMP

Timestamp of the consumed message.

type

String

Type of the consumed message.

userId

String

User ID of the message.

appId

String

App ID of the message.

clusterId

String

Cluster ID of the message.

Was this article helpful?

💙 Thanks for your feedback!

Edit on GitHub