AMQP コネクタへの移行

他のコネクタと同様、AMQP コネクタも一新され、Mule 3 トランスポートモデルから、簡便な UX や DataSense に完全対応するメッセージ情報を特長とする操作ベースのコネクタに進化しています。 つまり、AMQP を使用すると、他のコネクタと同じように接続や設定を行えるだけでなく、メッセージのパブリッシュやコンシュームも構造されたメッセージ情報に基づくため簡便になります。

コネクタの設定

3.x のトランスポート設定から Mule 4 の AMQP コネクタ設定に移行するということは、基本的にはほぼ同じパラメータを使用しながら、より一貫性のある方法で宣言することになります。Mule 4 では AMQP コネクタの一定のパラメータが変更されています。 たとえば、メッセージのコンシュームに使用するパラメータは 'consumer-config' グループで宣言されますが、一般的な認証パラメータは一般グループで設定されます。また、コネクタの動作に影響するパラメータ (config に存在) が、接続の確立方法にのみ影響するパラメータ (接続レベルで存在) と区別されています。

Mule 3 の例: コネクタの設定
<amqp:connector name="AMQP_Config"
     fallbackAddresses="192.168.0.1,192.168.0.2"
     virtualHost="/"
     username="guest"
     password="guest"
	 deliveryMode="TRANSIENT"
	 priority="1"
	 ackMode="AUTO"
	 activeDeclarationsOnly="false"
	 mandatory="false"
	 immediate="false"
	 prefetchSize="16"
	 prefetchCount="16"
	 noLocal="false"
	 exclusiveConsumers="false"
	 requestBrokerConfirms="false"
	 numberOfChannels="10"
	  />

Mule 4 でも、実際に適用するコンテキストに同じパラメータを設定すれば、これと同じ設定にすることができます。

Mule 4 の例: コネクタの設定
<amqp:config name="AMQP_Config">
	<amqp:connection host="localhost" port="5672"
		virtualHost="/" username="guest" password="guest" />
	<amqp:consumer-config numberOfConsumers="10" exclusiveConsumers="false" noLocal="false" ackMode="AUTO"/>
	<amqp:publisher-config requestBrokerConfirms="false" mandatory="false" immediate="false" priority="1" deliveryMode="TRANSIENT"/>
	<amqp:quality-of-service prefetchSize="16" prefetchCount="16" />
</amqp:config>

fallbackAddresses はサポートされなくなりました。

メッセージの送信

Mule 4 の新しいアプローチでは、AMQP の 'publish' 操作に入力パラメータのみを利用して、パブリッシュする AMQP メッセージを完成させます。

たとえば、本文にペイロードの一部のみが記載された、優先度の高い AMQP メッセージを送信し、このメッセージをグループに関連付ける場合、次の手順を実行する必要があります。 <1>) transform を使用して、メッセージ本文となるものをペイロードに設定します。 <2>) 生成されたストリームを文字列に変換して、テキストメッセージとして送信します。 <3>) AMQP プロパティに priority を、AMQP メッセージの優先度を設定するキーとして設定します。

Mule 3 の例: 優先度が設定されたメッセージをグループの一部として送信
<flow name="AmqpTransportOutbound">
    <http:listener config-ref="HTTP_Listener_Configuration" path="/orders"/>
    <dw:transform-message> (1)
        <dw:set-payload><![CDATA[%dw 1.0
%output application/json
---
{
order_id: payload.id,
supplier: payload.warehouse
}]]></dw:set-payload>
    </dw:transform-message>
    <object-to-string-transformer/> (2)
    <amqp:outbound-endpoint exchangeName="testExchange" connector-ref="AMQP_Conector" >
      <message-properties-transformer scope="outbound">
          <add-message-property key="priority" value="9"/> (3)
      </message-properties-transformer>
    </amqp:outbound-endpoint>
</flow>

Mule 4 でも、AMQP コネクタを次のとおり設定すれば、同じ結果を得ることができます。

Mule 4 の例: 優先度が設定されたメッセージをグループの一部として送信
<flow name="AMQPConnectorPublish">
		<http:listener config-ref="HTTP_Listener_config" path="/orders"/>
		(2)
		<amqp:publish config-ref="AMQP_Config" exchangeName="targetExchange">
			<amqp:message> (1)
				<amqp:body>#[output application/json ---
        {
          order_id: payload.id,
          supplier: payload.warehouse
        }]</amqp:body>
        			<amqp:properties priority="3"/> (3)
			</amqp:message>
		</amqp:publish>
	</flow>

次の違いに注意します。

1) メッセージの body はインラインで作成され、ペイロードには変更がないため、transform コンポーネントは必要ありません。 2) コネクタが変換出力を自動的に処理できるため、object-to-string トランスフォーマも排除されています。 3) 優先度は AmqpMessage 操作のプロパティとして設定され、ユーザが正確なキーを把握している必要はありません。

上記をまとめると、3.x で AMQP トランスポートを使用してメッセージをパブリッシュするときは、AmqpMessage ペイロードとアウトバウンドプロパティを利用して AMQP メッセージの作成を設定するため、トランスポートがどのように機能するのかの詳しい知識が必要でした。4.x では AMQP コネクタが、設定可能な各要素をその要素が属するスコープのパラメータとして公開するため、AMQP のすべての機能がより明確な形で公開されます。

新規メッセージのリスン

AMQP トランスポートの inbound-endpoint では、指定したキューの新しい AMQP メッセージを待機できます。このリスナの出力では、ペイロードにメッセージの本文、AMQP Attributes に AMQP のすべてのヘッダーとプロパティが記載されます。

Mule 3 の例: メッセージのリスン
<flow name="AMQPTransportInbound">
  <amqp:inbound-endpoint connector-ref="AMQP_Connector" queueName="in" />
  <dw:transform-message> (2)
      <dw:set-payload><![CDATA[%dw 1.0
        %output application/json
        ---
        {
        items: payload,
        costumer: message.inboundProperties.'costumer_id'
        }]]></dw:set-payload>
  </dw:transform-message>
  <object-to-string-transformer/>  (3)
  <amqp:outbound-endpoint exchangeName="v2/prime/orders" connector-ref="AMQP_Connector"/>  (4)
</flow>

この場合は、メッセージをリスンして、新たに定められた形式に適合させます。

1) inboundProperties に含まれているメタデータを使用して MuleMessage を変換し、新たな API で求められる新しい JSON 形式にペイロードを適合させます。 2) 変換されたペイロードを JSON 文字列に変換します。 3) ペイロードを定義済みの交換にパブリッシュします。

Mule 4 に同じコードを実装すると次のようになります。

Mule 4 の例: メッセージのリスン
<flow name="AMQPConnectorPublish">
  <amqp:listener config-ref="AMQP_Config" queueName="in" /> (1)
  <amqp:publish config-ref="AMQP_Config" exchangeName="ordersExcahnge"> (2)
    <amqp:message>
      <amqp:body>#[output application/json ---
      {
        items: payload,
        costumer: attributes.properties.userProperties.costumer_id, (3)
        type: attributes.headers.type
      }]</amqp:body>
    </amqp:message>
  </amqp:publish>
</flow>

この場合、フローのコンポーネント数が減り、メッセージペイロードを異なる形式でパブリッシュするために変更する必要がありません。

1 新しいメッセージの定義はインラインで行われるため、新しいメッセージ本文の JSON のみが作成されます。
2 ここではメッセージの 'inboundProperties' ではなく、'attributes’ POJO を使用しています。このため、AMQP メッセージの 'headers' が 'properties' と区別されます。

メッセージのコンシューム

Mule 3 の AMQP トランスポートでは、特定の宛先からのメッセージをフローの途中でコンシュームすることがサポートされていなかったため、この対処法として「Mule リクエスタモジュール」をアプリケーションに追加し、このモジュールがフローの途中のメッセージのコンシュームを処理していました。

たとえば、AMQP キューを公開する場合、アプリケーションは次のようになりました。

Mule 3 の例: フローの途中のメッセージのコンシューム
<flow name="ordersFromAMQP">
  <http:inbound-endpoint exchange-pattern="request-response" path="orders" host="localhost" port="8081"/>
  <scripting:transformer doc:name="AMQP Message Listening">
    <scripting:script engine="Groovy"><![CDATA[
org.mule.api.MuleMessage message = new org.mule.module.client.MuleClient(muleContext).request('amqp://recordsyntactic_exchange/amqp-queue?connector=AMQP_0_9_Connector&exchangeType=direct&queueDurable=true&exchangeDurable=true&queueAutoDelete=true', 10000L);
]]></scripting:script>
</flow>

留意点:

  • AMQP メッセージに関するすべてのメタデータは完全に失われるため、CorrelationId をログに記録するためには、ヘッダーを取得する構文を把握している必要があります。

  • この要求には AMQP とキューの設定の両方が必要です。

Mule 4 には、'consume' 操作を使用して、フローの途中のメッセージをコンシュームする機能が標準装備されています。この操作は前述のリスナとよく似ていますが、相違点はフローのどの時点でも使用できることです。

Mule 4 の例: フローの途中のメッセージのコンシューム
<flow name="ordersFromAMQP">
  <http:listener config-ref="HTTP_Listener_config" path="/orders"/>
  <amqp:consume config-ref="config"  queueName="Orders" />
  <logger level="INFO" message="#['CorrelationId: ' ++ attributes.properties.correlationId]"/>
</flow>

要求-応答の実行

AMQP では reply_to プロパティを使用して、RPC パターンを実装できます。この場合は、クライアントがその場で作成した一時的な専用応答キューを使用するか、既存のキューを使用します。

一時的な自動削除非公開応答キューを使用した要求-応答

Mule 3 の場合、メッセージの到着後に破棄される一時的な専用キューを応答キューとする最初のケースでは、アウトバウンドエンドポイントで "request-response" exchange-pattern を使用します。

Mule 3 の例: 一時的な応答先を使用した要求-応答の実行
<flow name="amqpRequestReplyTemporaryDestination">
  <http:inbound-endpoint exchange-pattern="request-response" host="localhost" port="8080" path="invoices"/>
  <dw:transform-message>
      <dw:set-payload><![CDATA[%dw 1.0
        %output application/xml
        ---
        {
        data: payload,
        costumer: message.inboundProperties."http.query.params".costumer_id
        }]]></dw:set-payload>
  </dw:transform-message>
  <object-to-string-transformer/>
  <amqp:outbound-endpoint exchange-pattern="request-response" queueName="invoiceProcessor" connector-ref="AMQP_Connector"/>
  <logger level="INFO" message="Status: #[payload]">
</flow>

他方 Mule 4 には、この特定のユースケースの解決を目的とする、publish-consume というまったく新しい操作が用意されています。

Mule 4 の例: 一時的な応答先を使用した要求-応答の実行
<flow name="amqpRequestReplyTemporaryDestination">
  <http:listener config-ref="HTTP_Listener_config" path="/invoices"/>
  <amqp:publish-consume config-ref="AMQP_Config" exchangeName="invoiceProcessor">
    <amqp:message>
      <amqp:body>#[output application/xml ---
      {
        data: payload,
        costumer: attributes.queryParams.costumer_id
      }]</amqp:body>
    </amqp:message>
  </amqp:publish-consume>
  <logger level="INFO" message="#['Status: ' ++ payload]">
</flow>

この場合も、メッセージの作成が message 要素の操作のインラインで実行され、送信メッセージに影響する変換や設定はすべてこの要素の一部として行われます。

明示的な応答先キューを使用した要求-応答

Mule 4 では、replyTo プロパティを指定した明示的な reply-to キューを使用して、要求応答を実行できます。

Mule 4 の例: 明示的な応答先を使用した要求-応答の実行
<flow name="amqpRequestReplyTemporaryDestination">
  <http:listener config-ref="HTTP_Listener_config" path="/invoices"/>
  <amqp:publish-consume config-ref="AMQP_Config" exchangeName="targetExchange">
    <amqp:message>
      <amqp:body>#[output application/xml ---
      {
        data: payload,
        costumer: attributes.queryParams.costumer_id
      }]</amqp:body>
      <amqp:properties replyTo="replyToQueue" />
    </amqp:message>
  </amqp:publish-consume>
  <logger level="INFO" message="#['Status: ' ++ payload]">
</flow>

トランザクションの使用

3.x から 4.x に移行する場合、トランザクションのサポートの設定はよく似ていますが、相違点はご想像のとおり、inbound-endpointoutbound-endpoint で行われていた設定が、操作やソースのトランザクションに関する標準化された Mule 4 アプローチに変更されたことです。

Mule 3 の例: トランザクションの使用
<flow name="transactedAmqpFlow">
    <amqp:inbound-endpoint queue=Name"${in}">
        <amqp:transaction action="ALWAYS_BEGIN" /> (1)
    </amqp:inbound-endpoint>
    <set-variable variableName="originalPayload" value="#[payload]"/> (2)
    <dw:transform-message> (3)
        <dw:set-payload><![CDATA[%dw 1.0
          %output application/xml
          ---
          payload
          ]]></dw:set-payload>
    </dw:transform-message>
    <object-to-string-transformer/>
    <amqp:outbound-endpoint exchangeName="${out}"> (4)
        <amqp:transaction action="ALWAYS_JOIN"/>
    </amqp:outbound-endpoint>
    <default-exception-strategy>
        <commit-transaction exception-pattern="*"/> (5)
        <set-payload value="#[flowVars.originalPayload]"/> (6)
        <amqp:outbound-endpoint queue="dead.letter"> (7)
            <amqp:transaction action="JOIN_IF_POSSIBLE"/>
        </amqp:outbound-endpoint>
    </default-exception-strategy>
</flow>

注意点:

1 ALWAYS_BEGIN を指定したインバウンドエンドポイントからトランザクションが開始されます。
2 元のペイロードが失われていないことを確認します。
3 ペイロードが変換され、アウトバウンドエンドポイントを介して送信可能になります。
4 アウトバウンドエンドポイントが ALWAYS_JOIN に設定されます。
5 あらゆる例外をキャッチする例外戦略をセットアップします。
6 元のペイロードが復元されるため、元のメッセージが dead.letter にパブリッシュされます。
7 最後に、元のメッセージを dead.letter に送信して、現在のトランザクションへの結合を試行します。

Mule 4 でも、次のアプローチで同じシナリオを実装できます。

Mule 4 の例: トランザクションの使用
<flow name="transactedAmqpFlow">
    <amqp:listener config-ref="AMQP_Config" queueName="${in}" transactionalAction="ALWAYS_BEGIN"/> (1)
    <amqp:publish config-ref="AMQP_Config" destination="${out}" transactionalAction="ALWAYS_JOIN"> (2)
        <amqp:message>
            <amqp:body>#[output application/xml --- payload</amqp:body>
        </amqp:message>
    </amqp:publish>
    <error-handler>
        <on-error-continue type="ANY"> (3)
          <amqp:publish config-ref="AMQP_Config" exchangeName="dead.letter" transactionalAction="JOIN_IF_POSSIBLE"> (4)
          	<amqp:routing-keys>
				<amqp:routing-key value="dead.letter" />
			</amqp:routing-keys>
          </amqp:publish>
        </on-error-continue>
    </error-handler>
</flow>
1 ALWAYS_BEGIN を指定した listener からトランザクションが開始されます。
2 publish 操作によってペイロードが XML 形式でパブリッシュされますが、現在のペイロードを変更したり、ALWAYS_JOIN を指定したトランザクションに結合することはありません。
3 生じたエラーをキャッチするエラーハンドラを使用して、メッセージが失われていないことを確認します。
4 現在のペイロードは受信した元のメッセージのままのため、JOIN_IF_POSSIBLE トランザクションアクションを使用してそのまま dead.letter にパブリッシュします。

キューや交換の存在の強制

Mule 3 では、activeDeclarationsOnly を false に設定している場合に、宣言されたキューまたは交換が存在しなければ、AMQP コネクタが 404 エラーを記載した ShutdownSignalException をスローします。Mule 4 では、createFallbackQueuecreateFallbackExchange パラメータを使用して、キューや交換を強制的に存在させることができます。

<amqp:config name="Amqp_Config" createFallbackQueue="false" createFallbackQueue="false">
    <amqp:connection host="localhost" username="guest" password="guest" />
</amqp:config>

Was this article helpful?

💙 Thanks for your feedback!

Edit on GitHub