<amqp:publish config-ref="AMQP_Config" exchangeName="targetExchange"/>
Publish Messages - Mule 4
The publish
operation enables you to create a new AMQP message and send it to the specified exchange. You can then configure not only the content of the message, but also all the headers, the properties, and the envelope in the AMQP message.
Send a Message to an Exchange
When used in its default form, the connector will publish whatever is in the message payload:
But what happens if the payload is not in the correct format, and you actually need to make a transformation? You could place a DataWeave transformation before the publish
operation, but that would cause the message payload to change and impact the operation placed after the publish
operation.
To avoid this undesired impact, you can now place the transformation inside the publish
operation:
<amqp:publish config-ref="Amqp_Config" exchangeName="targetExchange">
<amqp:message>
<amqp:body>
<![CDATA[#[%dw 2.0
output application/json
---
payload.payments
]]]></amqp:body>
</amqp:message>
</amqp:publish>
Now, the transformation can be used for generating the content that will be published, without producing a side effect on the message in transit.
Override or Set the Routing Keys of the Message
You may need to set or override the routing key of the property. The routing key is represented by a property of an AMQP message:
<amqp:publish config-ref="Amqp_Config" exchangeName="targetExchange">
<amqp:message>
<amqp:body>
<![CDATA[#[%dw 2.0
output application/json
---
payload.payments
]]]></amqp:body>
</amqp:message>
<amqp:routing-keys >
<amqp:routing-key value="routingKey1" />
<amqp:routing-key value="routingKey*" />
</amqp:routing-keys>
</amqp:publish>
Declare a Reply Destination
For the cases where you need an asynchronous reply to the message being sent, the AMQP publish
operation allows us to declare any reply-to
destination. This destination will be communicated as an AMQP message Property to the consumer of the Message. It represents the destination where we should expect a reply to be sent.
To declare the reply-to
destination, we add it to the message properties:
<amqp:publish config-ref="AMQP_config" exchangeName="#[vars.targetExchange]">
<amqp:message>
<amqp:properties replyTo="replyToQueue" />
</amqp:message>
</amqp:publish>
Request Confirms from the Broker
If you need that the broker confirms a publish operation, you can set the requestBrokerConfirms
parameter. Setting this parameter, the channel in the publish operation is set in confirm mode and the connector will expect a basic.ack
from the broker. The parameter can be set in the global publisher configuration or at the operation level.
<amqp:publish
config-ref="AMQP_config"
exchangeName="#[vars.targetExchange]"
requestBrokerConfirms="true">
<amqp:message>
<amqp:properties replyTo="replyToQueue" />
</amqp:message>
</amqp:publish>
In case no confirmation arrives, an AMQP:PUBLISHING
error will be raised indicating that the broker failed to agree on confirming messages.
Returned Messages Handling
The connector supports the mandatory and immediate publication flags.
If a message sent with this connector cannot be delivered, the AMQP broker returns it asynchronously.
The AMQP connector offers the possibility of dispatching these returned messages to an exchange for custom processing.
You can define the endpoint in charge of handling returned messages at the connector level. Here is an example:
<amqp:publish config-ref="AMQP_config" exchangeName="#[vars.targetExchange]" returnedMessageExchange="exchange" mandatory="true" immediate="true" />
Declare an Exchange in the Publish Operation
By default, the publish
operation will fail in case the defined exchange does not exist with an AMQP:EXCHANGE_NOT_FOUND
error.
For cases where the exchange has to be declared, a definition for the entity should be referenced or inline defined so that the exchange is declared.
<amqp:publish
config-ref="Amqp_Config"
exchangeName="targetExchange">
<amqp:fallback-exchange-definition
removalStrategy="SHUTDOWN"
type="DIRECT"/>
</amqp:publish>
The exchange can also be defined as a high level element:
<amqp:exchange-defintiion
name="targetExchangeDefinition"
removalStrategy="SHUTDOWN"
type="DIRECT" />
<amqp:publish
config-ref="Amqp_Config"
exchangeName="targetExchange"
fallbackExchangeDefinition="targetExchangeDefinition" />
Avoid Changing the AMQP Topography
You can set the createFallbackExchange
global config to prevent changes to the AMQP topography resulting from the definition of fallback exchanges, see Avoid Changing the AMQP Topography.
Propagate the Correlation ID
The publish
operation allows you to configure the correlationId
for the outgoing Message.
First you need to configure whether or not you want to send the correlationId
when publishing the Message using the sendCorrelationId
parameter. This parameter can be set to ALWAYS
(always send the header), NEVER
(never send the header) or AUTO
(the default, use the application configuration).
Then, you can either use the correlationId
of the Event that is sending the Message, or you can configure your own custom correlationId
in the message builder:
<amqp:publish
config-ref="AMQP_config"
sendCorrelationId="ALWAYS"
exchangeName="#[vars.targetExchange]">
<amqp:properties
correlationId="#[attributes.properties.correlationId]" />
</amqp:publish>