メッセージのパブリッシュ方法

この操作では、新しい AMQP メッセージを作成して、指定した交換に送信できます。その後、メッセージのコンテンツだけでなく、AMQP メッセージのすべてのヘッダー、プロパティ、エンベロープも設定できます。

交換へのメッセージの送信

デフォルトの形式で使用された場合、コネクタによってメッセージペイロード内のすべての内容がパブリッシュされます。

<amqp:publish config-ref="AMQP_Config" exchangeName="targetExchange"/>

しかし、ペイロードが正しい形式でなく、実際に変換を行う必要がある場合にはどうでしょうか? この場合は、publish 操作の前に DataWeave トランスフォーマを配置できますが、メッセージのペイロードが変更されるため、publish に続く操作に影響が出ることがあります。

この不要な影響を回避するには、publish 操作の内部にトランスフォーマを配置します。

<amqp:publish config-ref="Amqp_Config" exchangeName="targetExchange">
	<amqp:message>
		<amqp:body>
		<![CDATA[#[%dw 2.0
			output application/json
			---
			payload.payments
		]]]></amqp:body>
	</amqp:message>
</amqp:publish>

これで、トランスフォーマによってパブリッシュされるコンテンツが生成される一方で、送信中のメッセージに対する副作用は発生しません。

メッセージのルーティングキーの上書きまたは設定

プロパティのルーティングキーを設定または上書きすることが必要になる場合があります。ルーティングキーは、AMQP メッセージのプロパティで表されます。

<amqp:publish config-ref="Amqp_Config" exchangeName="targetExchange">
	<amqp:message>
		<amqp:body>
		<![CDATA[#[%dw 2.0
			output application/json
			---
			payload.payments
		]]]></amqp:body>
	</amqp:message>
	<amqp:routing-keys >
		<amqp:routing-key value="routingKey1" />
		<amqp:routing-key value="routingKey*" />
	</amqp:routing-keys>
</amqp:publish>

応答宛先の宣言

送信するメッセージに対する非同期の応答が必要な場合は、AMQP の publish 操作で任意の reply-to 宛先を宣言できます。この宛先は、AMQP メッセージプロパティでメッセージのコンシューマに渡され、応答の宛先となります。

reply-to 宛先を宣言するには、次のようにメッセージのプロパティに追加します。

<amqp:publish config-ref="AMQP_config" exchangeName="#[vars.targetExchange]">
	<amqp:message>
		<amqp:properties replyTo="replyToQueue" />
	</amqp:message>
</amqp:publish>

ブローカーから確認の要求

ブローカーが publish 操作を確認する必要がある場合、requestBrokerConfirms パラメータを設定できます。このパラメータを設定すると、publish 操作のチャネルが確認モードに設定され、コネクタはブローカーからの basic.ack を想定します。パラメータは、グローバルパブリッシャー設定または操作レベルで設定できます。

<amqp:publish config-ref="AMQP_config" exchangeName="#[vars.targetExchange]" requestBrokerConfirms="true">
	<amqp:message>
		<amqp:properties replyTo="replyToQueue" />
	</amqp:message>
</amqp:publish>

確認が届かない場合、ブローカーが確認メッセージに同意しなかったことを示す AMQP:PUBLISHING エラーが発生します。

返されるメッセージの処理

コネクタでは、後述するように必須および即時のパブリッシュフラグがサポートされています。

このコネクタで送信されるメッセージを配信できない場合、AMQP ブローカーは非同期でそのメッセージを返します。

AMQP コネクタでは、返されるこれらのメッセージを交換にディスパッチしてカスタム処理を実行できる可能性があります。

コネクタレベルで返されるメッセージを処理するエンドポイントを定義できます。例を挙げます。

<amqp:publish config-ref="AMQP_config" exchangeName="#[vars.targetExchange]" returnedMessageExchange="exchange" mandatory="true" immediate="true" />

Publish 操作での交換の宣言

デフォルトでは、定義された交換が存在しないと AMQP:EXCHANGE_NOT_FOUND エラーで publish 操作に失敗します。

交換を宣言する必要がある場合、交換を宣言するためにエンティティの定義を参照するか、インラインで定義する必要があります。

<amqp:publish config-ref="Amqp_Config" exchangeName="targetExchange">
	<amqp:fallback-exchange-definition removalStrategy="SHUTDOWN" type="DIRECT"/>
</amqp:publish>

交換は、高レベル要素として定義することもできます。

<amqp:exchange-defintiion name="targetExchangeDefinition" removalStrategy="SHUTDOWN" type="DIRECT" />

<amqp:publish config-ref="Amqp_Config" exchangeName="targetExchange" fallbackExchangeDefinition="targetExchangeDefinition" />

AMQP トポグラフィーの変更を回避する方法

createFallbackExchange グローバル設定を指定して、代替交換の定義による AMQP トポグラフィーの変更を回避できます。「 AMQP トポグラフィーの変更を回避する方法」を参照してください。

相関 ID の伝播

publish 操作では、送信メッセージの correlationId を設定できます。

最初に、sendCorrelationId パラメータを使用してメッセージをパブリッシュするときに correlationId を送信するかどうかを設定する必要があります。このパラメータは、ALWAYS (常にヘッダーを送信する)、NEVER (ヘッダーを送信しない)、または AUTO (デフォルト、アプリケーション設定を使用する) に設定できます。 次に、メッセージを送信するイベントの correlationId を使用するか、またはメッセージビルダーでカスタム correlationId を設定できます。

<amqp:publish config-ref="AMQP_config" sendCorrelationId="ALWAYS"  exchangeName="#[vars.targetExchange]">
	<amqp:properties correlationId="#[attributes.properties.correlationId]" />
</amqp:publish>

Was this article helpful?

💙 Thanks for your feedback!

Edit on GitHub