Contact Free trial Login

To Publish Messages

This operation allows you to create a new AMQP Message and send it to the specified exchange. You can then configure not only the content of the message, but also all the headers, the properties and the envelope in the AMQP Message.

Sending a Message to an Exchange

When used in its default form, the connector will publish whatever is in the message payload:

<amqp:publish config-ref="AMQP_Config" exchangeName="targetExchange"/>

But what happens if the payload is not in the correct format, and you actually need to make a transformation? You could place a DataWeave transformation before the publish operation, but that would cause the message payload to change and impact the operation placed after the publish operation.

To avoid this undesired impact, you can now place the transformation inside the publish operation:

<amqp:publish config-ref="Amqp_Config" exchangeName="targetExchange">
	<amqp:message>
		<amqp:body>
		<![CDATA[#[%dw 2.0
			output application/json
			---
			payload.payments
		]]]></amqp:body>
	</amqp:message>
</amqp:publish>

Now, the transformation can be used for generating the content that will be published, without producing a side effect on the message in transit.

Overriding or Setting the Routing Keys of the Message

You may need to set or override the routing key of the property. The routing key is represented by a property of an AMQP message:

<amqp:publish config-ref="Amqp_Config" exchangeName="targetExchange">
	<amqp:message>
		<amqp:body>
		<![CDATA[#[%dw 2.0
			output application/json
			---
			payload.payments
		]]]></amqp:body>
	</amqp:message>
	<amqp:routing-keys >
		<amqp:routing-key value="routingKey1" />
		<amqp:routing-key value="routingKey*" />
	</amqp:routing-keys>
</amqp:publish>

Declaring a Reply Destination

For the cases where you need an asynchronous reply to the Message being sent, the AMQP publish operation allows us to declare any reply-to destination. This destination will be communicated as an AMQP Message Property to the consumer of the Message. It represents the destination where we should expect a reply to be sent.

To declare the reply-to destination, we add it to the Message properties:

<amqp:publish config-ref="AMQP_config" exchangeName="#[vars.targetExchange]">
	<amqp:message>
		<amqp:properties replyTo="replyToQueue" />
	</amqp:message>
</amqp:publish>

Request Confirms from the Broker

If you need that the the broker confirms a publish operation, you can set the requestBrokerConfirms parameter. Setting this parameter, the channel in the publish operation is set in confirm mode and the connector will expect a basic.ack from the broker. The parameter can be set in the global publisher configuration or at the operation level.

<amqp:publish config-ref="AMQP_config" exchangeName="#[vars.targetExchange]" requestBrokerConfirms="true">
	<amqp:message>
		<amqp:properties replyTo="replyToQueue" />
	</amqp:message>
</amqp:publish>

In case no confirmation arrives, an AMQP:PUBLISHING error will be raised indicating that the broker failed to agree on confirming messages.

Returned Messages Handling

The connector supports the mandatory and immediate publication flags, as described below.

If a message sent with this connector cannot be delivered, the AMQP broker returns it asynchronously.

The AMQP connector offers the possibility of dispatching these returned messages to an exchange for custom processing.

You can define the endpoint in charge of handling returned messages at the connector level. Here is an example:

<amqp:publish config-ref="AMQP_config" exchangeName="#[vars.targetExchange]" returnedMessageExchange="exchange" mandatory="true" immediate="true" />

Declaring an Exchange in the Publish Operation

By default, the publish operation will fail in case the defined exchange does not exist with an AMQP:EXCHANGE_NOT_FOUND error.

For cases where the exchange has to be declared, a definition for the entity should be referenced or inline defined so that the exchange is declared.

<amqp:publish config-ref="Amqp_Config" exchangeName="targetExchange">
	<amqp:fallback-exchange-definition removalStrategy="SHUTDOWN" type="DIRECT"/>
</amqp:publish>

The exchange can also be defined as a high level element:

<amqp:exchange-defintiion name="targetExchangeDefinition" removalStrategy="SHUTDOWN" type="DIRECT" />

<amqp:publish config-ref="Amqp_Config" exchangeName="targetExchange" fallbackExchangeDefinition="targetExchangeDefinition" />

How to Avoid Changing the AMQP Topography

You can set the createFallbackExchange global config to prevent changes to the AMQP topography resulting from the definition of fallback exchanges, see How to Avoid Changing the AMQP Topography.

Propagating the Correlation ID

The publish operation allows you to configure the correlationId for the outgoing Message.

First you need to configure whether or not you want to send the correlationId when publishing the Message using the sendCorrelationId parameter. This parameter can be set to ALWAYS (always send the header), NEVER (never send the header) or AUTO (the default, use the application configuration). Then, you can either use the correlationId of the Event that is sending the Message, or you can configure your own custom correlationId in the message builder:

<amqp:publish config-ref="AMQP_config" sendCorrelationId="ALWAYS"  exchangeName="#[vars.targetExchange]">
	<amqp:properties correlationId="#[attributes.properties.correlationId]" />
</amqp:publish>

Was this article helpful?

💙 Thanks for your feedback!

Edit on GitHub