メッセージのパブリッシュ - Mule4

publish​ 操作では、新しい AMQP メッセージを作成して、指定したエクスチェンジに送信できます。その後、メッセージのコンテンツだけでなく、AMQP メッセージのすべてのヘッダー、プロパティ、エンベロープも設定できます。

エクスチェンジへのメッセージの送信

デフォルトの形式で使用された場合、コネクタによってメッセージペイロード内のすべての内容がパブリッシュされます。

<amqp:publish config-ref="AMQP_Config" exchangeName="targetExchange"/>

しかし、ペイロードが正しい形式でなく、実際に変換を行う必要がある場合にはどうでしょうか? この場合は、publish​ 操作の前に DataWeave トランスフォーマを配置できますが、メッセージのペイロードが変更されるため、publish​ に続く操作に影響が出ることがあります。

この不要な影響を回避するには、publish​ 操作の内部にトランスフォーマを配置します。

<amqp:publish config-ref="Amqp_Config" exchangeName="targetExchange">
	<amqp:message>
		<amqp:body>
		<![CDATA[#[%dw 2.0
			output application/json
			---
			payload.payments
		]]]></amqp:body>
	</amqp:message>
</amqp:publish>

これで、トランスフォーマによってパブリッシュされるコンテンツが生成される一方で、送信中のメッセージに対する副作用は発生しません。

メッセージのルーティングキーの上書きまたは設定

プロパティのルーティングキーを設定または上書きすることが必要になる場合があります。ルーティングキーは、AMQP メッセージのプロパティで表されます。

<amqp:publish config-ref="Amqp_Config" exchangeName="targetExchange">
	<amqp:message>
		<amqp:body>
		<![CDATA[#[%dw 2.0
			output application/json
			---
			payload.payments
		]]]></amqp:body>
	</amqp:message>
	<amqp:routing-keys >
		<amqp:routing-key value="routingKey1" />
		<amqp:routing-key value="routingKey*" />
	</amqp:routing-keys>
</amqp:publish>

応答宛先の宣言

送信するメッセージに対する非同期の応答が必要な場合は、AMQP の publish​ 操作で任意の reply-to​ 宛先を宣言できます。この宛先は、AMQP メッセージプロパティでメッセージのコンシューマに渡され、返信が送信される宛先となります。

reply-to​ 宛先を宣言するには、次のようにメッセージのプロパティに追加します。

<amqp:publish config-ref="AMQP_config" exchangeName="#[vars.targetExchange]">
	<amqp:message>
		<amqp:properties replyTo="replyToQueue" />
	</amqp:message>
</amqp:publish>

ブローカーから確認の要求

ブローカーが publish 操作を確認する必要がある場合、requestBrokerConfirms​ パラメータを設定できます。このパラメータを設定すると、publish 操作のチャネルが確認モードに設定され、コネクタはブローカーからの basic.ack​ を想定します。パラメータは、グローバルパブリッシャー設定または操作レベルで設定できます。

<amqp:publish
	config-ref="AMQP_config"
	exchangeName="#[vars.targetExchange]"
	requestBrokerConfirms="true">
	<amqp:message>
		<amqp:properties replyTo="replyToQueue" />
	</amqp:message>
</amqp:publish>

確認が届かない場合、ブローカーが確認メッセージに同意しなかったことを示す AMQP:PUBLISHING​ エラーが発生します。

返されるメッセージの処理

コネクタでは、必須および即時のパブリッシュフラグがサポートされています。

このコネクタで送信されるメッセージを配信できない場合、AMQP ブローカーは非同期でそのメッセージを返します。

AMQP コネクタでは、返されるこれらのメッセージを交換にディスパッチしてカスタム処理を実行できる可能性があります。

コネクタレベルで返されるメッセージを処理するエンドポイントを定義できます。例を挙げます。

<amqp:publish config-ref="AMQP_config" exchangeName="#[vars.targetExchange]" returnedMessageExchange="exchange" mandatory="true" immediate="true" />

Publish 操作でのエクスチェンジの宣言

デフォルトでは、定義された交換が存在しないと AMQP:EXCHANGE_NOT_FOUND​ エラーで publish​ 操作に失敗します。

交換を宣言する必要がある場合、交換を宣言するためにエンティティの定義を参照するか、インラインで定義する必要があります。

<amqp:publish
	config-ref="Amqp_Config"
	exchangeName="targetExchange">
	<amqp:fallback-exchange-definition
		removalStrategy="SHUTDOWN"
		type="DIRECT"/>
</amqp:publish>

交換は、高レベル要素として定義することもできます。

<amqp:exchange-defintiion
	name="targetExchangeDefinition"
	removalStrategy="SHUTDOWN"
	type="DIRECT" />

<amqp:publish
	config-ref="Amqp_Config"
	exchangeName="targetExchange"
	fallbackExchangeDefinition="targetExchangeDefinition" />

AMQP トポグラフィーの変更を回避

createFallbackExchange​ グローバル設定を指定して、代替エクスチェンジの定義による AMQP トポグラフィーの変更を回避できます。「AMQP トポグラフィーの変更を回避」​を参照してください。

相関 ID の伝播

publish​ 操作では、送信メッセージの correlationId​ を設定できます。

最初に、sendCorrelationId​ パラメータを使用してメッセージをパブリッシュするときに correlationId​ を送信するかどうかを設定する必要があります。このパラメータは、ALWAYS​ (常にヘッダーを送信する)、NEVER​ (ヘッダーを送信しない)、または AUTO​ (デフォルト、アプリケーション設定を使用する) に設定できます。 次に、メッセージを送信するイベントの correlationId​ を使用するか、またはメッセージビルダーでカスタム correlationId​ を設定できます。

<amqp:publish
	config-ref="AMQP_config"
	sendCorrelationId="ALWAYS"
	exchangeName="#[vars.targetExchange]">
	<amqp:properties
		correlationId="#[attributes.properties.correlationId]" />
</amqp:publish>

関連情報

Was this article helpful?

💙 Thanks for your feedback!