メッセージのパブリッシュ - Mule4

publish​ 操作では、新しい AMQP メッセージを作成して、指定したエクスチェンジに送信できます。その後、メッセージのコンテンツだけでなく、AMQP メッセージのすべてのヘッダー、プロパティ、エンベロープも設定できます。

エクスチェンジへのメッセージの送信

デフォルトの形式で使用された場合、コネクタによってメッセージペイロード内のすべての内容がパブリッシュされます。

<amqp:publish config-ref="AMQP_Config" exchangeName="targetExchange"/>
xml

しかし、ペイロードが正しい形式でなく、実際に変換を行う必要がある場合にはどうでしょうか? この場合は、​publish​ 操作の前に DataWeave トランスフォーマーを配置できますが、メッセージのペイロードが変更されるため、​publish​ に続く操作に影響が出ることがあります。

この不要な影響を回避するには、​publish​ 操作の内部にトランスフォーマーを配置します。

<amqp:publish config-ref="Amqp_Config" exchangeName="targetExchange">
	<amqp:message>
		<amqp:body>
		<![CDATA[#[%dw 2.0
			output application/json
			---
			payload.payments
		]]]></amqp:body>
	</amqp:message>
</amqp:publish>
xml

これで、トランスフォーマーによってパブリッシュされるコンテンツが生成される一方で、送信中のメッセージに対する副作用は発生しません。

メッセージのルーティングキーの上書きまたは設定

プロパティのルーティングキーを設定または上書きすることが必要になる場合があります。ルーティングキーは、AMQP メッセージのプロパティで表されます。

<amqp:publish config-ref="Amqp_Config" exchangeName="targetExchange">
	<amqp:message>
		<amqp:body>
		<![CDATA[#[%dw 2.0
			output application/json
			---
			payload.payments
		]]]></amqp:body>
	</amqp:message>
	<amqp:routing-keys >
		<amqp:routing-key value="routingKey1" />
		<amqp:routing-key value="routingKey*" />
	</amqp:routing-keys>
</amqp:publish>
xml

応答宛先の宣言

送信するメッセージに対する非同期の応答が必要な場合は、AMQP の ​publish​ 操作で任意の ​reply-to​ 宛先を宣言できます。この宛先は、AMQP メッセージプロパティとしてメッセージのコンシューマーに渡されます。返信が送信されると予想される宛先を表します。

reply-to​ 宛先を宣言するには、次のようにメッセージのプロパティに追加します。

<amqp:publish config-ref="AMQP_config" exchangeName="#[vars.targetExchange]">
	<amqp:message>
		<amqp:properties replyTo="replyToQueue" />
	</amqp:message>
</amqp:publish>
xml

ブローカーから確認の要求

ブローカーが publish 操作を確認する必要がある場合、​requestBrokerConfirms​ パラメーターを設定できます。このパラメーターを設定すると、publish 操作のチャネルが確認モードに設定され、コネクタはブローカーからの ​basic.ack​ を想定します。パラメーターは、グローバルパブリッシャー設定または操作レベルで設定できます。

<amqp:publish
	config-ref="AMQP_config"
	exchangeName="#[vars.targetExchange]"
	requestBrokerConfirms="true">
	<amqp:message>
		<amqp:properties replyTo="replyToQueue" />
	</amqp:message>
</amqp:publish>
xml

確認が届かない場合、ブローカーが確認メッセージに同意しなかったことを示す ​AMQP:PUBLISHING​ エラーが発生します。

返されるメッセージの処理

コネクタでは、必須および即時のパブリッシュフラグがサポートされています。

このコネクタで送信されるメッセージを配信できない場合、AMQP ブローカーは非同期でそのメッセージを返します。

AMQP Connector では、返されるこれらのメッセージを交換にディスパッチしてカスタム処理を実行できる可能性があります。

コネクタレベルで返されるメッセージを処理するエンドポイントを定義できます。例を挙げます。

<amqp:publish config-ref="AMQP_config" exchangeName="#[vars.targetExchange]" returnedMessageExchange="exchange" mandatory="true" immediate="true" />
xml

Publish 操作でのエクスチェンジの宣言

デフォルトでは、定義された交換が存在しないと ​AMQP:EXCHANGE_NOT_FOUND​ エラーで ​publish​ 操作に失敗します。

交換を宣言する必要がある場合、交換を宣言するためにエンティティの定義を参照するか、インラインで定義する必要があります。

<amqp:publish
	config-ref="Amqp_Config"
	exchangeName="targetExchange">
	<amqp:fallback-exchange-definition
		removalStrategy="SHUTDOWN"
		type="DIRECT"/>
</amqp:publish>
xml

交換は、高レベル要素として定義することもできます。

<amqp:exchange-defintiion
	name="targetExchangeDefinition"
	removalStrategy="SHUTDOWN"
	type="DIRECT" />

<amqp:publish
	config-ref="Amqp_Config"
	exchangeName="targetExchange"
	fallbackExchangeDefinition="targetExchangeDefinition" />
xml

AMQP トポグラフィの変更を回避

createFallbackExchange​ グローバル設定を指定して、代替エクスチェンジの定義による AMQP トポグラフィの変更を回避できます。​「AMQP トポグラフィの変更を回避」​を参照してください。

相関 ID の伝播

publish​ 操作では、送信メッセージの ​correlationId​ を設定できます。

最初に、​sendCorrelationId​ パラメーターを使用してメッセージをパブリッシュするときに ​correlationId​ を送信するかどうかを設定する必要があります。このパラメーターは、​ALWAYS​ (常にヘッダーを送信する)、​NEVER​ (ヘッダーを送信しない)、または ​AUTO​ (デフォルト、アプリケーション設定を使用する) に設定できます。 次に、メッセージを送信するイベントの ​correlationId​ を使用するか、またはメッセージビルダーでカスタム ​correlationId​ を設定できます。

<amqp:publish
	config-ref="AMQP_config"
	sendCorrelationId="ALWAYS"
	exchangeName="#[vars.targetExchange]">
	<amqp:properties
		correlationId="#[attributes.properties.correlationId]" />
</amqp:publish>
xml

関連情報