AMQP Connector への移行

他のコネクタと同様、AMQP Connector も一新され、Mule 3 トランスポートモデルから、簡便な UX や DataSense に完全対応するメッセージ情報を特長とする操作ベースのコネクタに進化しています。 つまり、AMQP を使用すると、他のコネクタと同じように接続や設定を行えるだけでなく、メッセージのパブリッシュやコンシュームも構造されたメッセージ情報に基づくため簡便になります。

コネクタの設定

3.x のトランスポート設定から Mule 4 の AMQP Connector 設定に移行するということは、基本的にはほぼ同じパラメーターを使用しながら、より一貫性のある方法で宣言することになります。Mule 4 では AMQP Connector の一定のパラメーターが変更されています。 たとえば、メッセージのコンシュームに使用するパラメーターは 'consumer-config' グループで宣言されますが、一般的な認証パラメーターは一般グループで設定されます。また、コネクタの動作に影響するパラメーター (config に存在) が、接続の確立方法にのみ影響するパラメーター (接続レベルで存在) と区別されています。

Mule 3 の例: Connector の設定
<amqp:connector name="AMQP_Config"
     fallbackAddresses="192.168.0.1,192.168.0.2"
     virtualHost="/"
     username="guest"
     password="guest"
	 deliveryMode="TRANSIENT"
	 priority="1"
	 ackMode="AUTO"
	 activeDeclarationsOnly="false"
	 mandatory="false"
	 immediate="false"
	 prefetchSize="16"
	 prefetchCount="16"
	 noLocal="false"
	 exclusiveConsumers="false"
	 requestBrokerConfirms="false"
	 numberOfChannels="10"
	  />

Mule 4 でも、実際に適用するコンテキストに同じパラメーターを設定すれば、これと同じ設定にすることができます。

Mule 4 の例: Connector の設定
<amqp:config name="AMQP_Config">
	<amqp:connection host="localhost" port="5672"
		virtualHost="/" username="guest" password="guest" />
	<amqp:consumer-config numberOfConsumers="10" exclusiveConsumers="false" noLocal="false" ackMode="AUTO"/>
	<amqp:publisher-config requestBrokerConfirms="false" mandatory="false" immediate="false" priority="1" deliveryMode="TRANSIENT"/>
	<amqp:quality-of-service prefetchSize="16" prefetchCount="16" />
</amqp:config>

fallbackAddresses​ はサポートされなくなりました。

メッセージの送信

Mule 4 の新しいアプローチでは、AMQP の 'publish' 操作に入力パラメーターのみを利用して、パブリッシュする AMQP メッセージのビルドを完成させます。

たとえば、本文にペイロードの一部のみが記載された、優先度の高い AMQP メッセージを送信し、このメッセージをグループに関連付ける場合、次の手順を実行する必要があります。 <1>) ​transform​ を使用して、メッセージ本文となるものをペイロードに設定します。 <2>) 生成されたストリームを文字列に変換して、テキストメッセージとして送信します。 <3>) AMQP プロパティに ​priority​ を、AMQP メッセージの優先度を設定するキーとして設定します。

Mule 3 の例: 優先度が設定されたメッセージをグループの一部として送信
<flow name="AmqpTransportOutbound">
    <http:listener config-ref="HTTP_Listener_Configuration" path="/orders"/>
    <dw:transform-message> (1)
        <dw:set-payload><![CDATA[%dw 1.0
%output application/json
---
{
order_id: payload.id,
supplier: payload.warehouse
}]]></dw:set-payload>
    </dw:transform-message>
    <object-to-string-transformer/> (2)
    <amqp:outbound-endpoint exchangeName="testExchange" connector-ref="AMQP_Connector" >
      <message-properties-transformer scope="outbound">
          <add-message-property key="priority" value="9"/> (3)
      </message-properties-transformer>
    </amqp:outbound-endpoint>
</flow>

Mule 4 でも、AMQP Connector を次のとおり設定すれば、同じ結果を得ることができます。

Mule 4 の例: 優先度が設定されたメッセージをグループの一部として送信
<flow name="AMQPConnectorPublish">
		<http:listener config-ref="HTTP_Listener_config" path="/orders"/>
		(2)
		<amqp:publish config-ref="AMQP_Config" exchangeName="targetExchange">
			<amqp:message> (1)
				<amqp:body>#[output application/json ---
        {
          order_id: payload.id,
          supplier: payload.warehouse
        }]</amqp:body>
        			<amqp:properties priority="3"/> (3)
			</amqp:message>
		</amqp:publish>
	</flow>

次の違いに注意します。

1) メッセージの ​body​ はインラインで作成され、ペイロードには変更がないため、​transform​ コンポーネントは必要ありません。 2) Connector が変換出力を自動的に処理できるため、​object-to-string​ トランスフォーマーも排除されています。 3) 優先度は AmqpMessage 操作のプロパティとして設定され、ユーザーが正確なキーを把握している必要はありません。

上記をまとめると、3.x で AMQP トランスポートを使用してメッセージをパブリッシュするときは、AmqpMessage ペイロードとアウトバウンドプロパティを利用して AMQP メッセージの作成を設定するため、トランスポートがどのように機能するのかの詳しい知識が必要でした。4.x では AMQP Connector が、設定可能な各要素をその要素が属するスコープのパラメーターとして公開するため、AMQP のすべての機能がより明確な形で公開されます。

新規メッセージのリスン

AMQP トランスポートの ​inbound-endpoint​ では、指定したキューの新しい AMQP メッセージを待機できます。このリスナーの出力では、ペイロードにメッセージの本文、​AMQP Attributes​ に AMQP のすべてのヘッダーとプロパティが記載されます。

Mule 3 の例: メッセージのリスン
<flow name="AMQPTransportInbound">
  <amqp:inbound-endpoint connector-ref="AMQP_Connector" queueName="in" />
  <dw:transform-message> (2)
      <dw:set-payload><![CDATA[%dw 1.0
        %output application/json
        ---
        {
        items: payload,
        costumer: message.inboundProperties.'costumer_id'
        }]]></dw:set-payload>
  </dw:transform-message>
  <object-to-string-transformer/>  (3)
  <amqp:outbound-endpoint exchangeName="v2/prime/orders" connector-ref="AMQP_Connector"/>  (4)
</flow>

この場合は、メッセージをリスンして、新たに定められた形式に適合させます。

1) inboundProperties に含まれているメタデータを使用して MuleMessage を変換し、新たな API で求められる新しい JSON 形式にペイロードを適合させます。 2) 変換されたペイロードを JSON 文字列に変換します。 3) ペイロードを定義済みのエクスチェンジにパブリッシュします。

Mule 4 に同じコードを実装すると次のようになります。

Mule 4 の例: メッセージのリスン
<flow name="AMQPConnectorPublish">
  <amqp:listener config-ref="AMQP_Config" queueName="in" /> (1)
  <amqp:publish config-ref="AMQP_Config" exchangeName="ordersExchange"> (2)
    <amqp:message>
      <amqp:body>#[output application/json ---
      {
        items: payload,
        costumer: attributes.properties.userProperties.costumer_id, (3)
        type: attributes.headers.type
      }]</amqp:body>
    </amqp:message>
  </amqp:publish>
</flow>

この場合、フローのコンポーネント数が減り、メッセージペイロードを異なる形式でパブリッシュするために変更する必要がありません。

1 新しいメッセージの定義はインラインで行われるため、新しいメッセージ本文の JSON のみが作成されます。
2 ここではメッセージの 'inboundProperties' ではなく、'attributes’ POJO を使用しています。このため、AMQP メッセージの 'headers' が 'properties' と区別されます。

メッセージのコンシューム

Mule 3 の AMQP トランスポートでは、特定の宛先からのメッセージをフローの途中でコンシュームすることがサポートされていなかったため、この対処法として「Mule Requester Module」をアプリケーションに追加し、このモジュールがフローの途中のメッセージのコンシュームを処理していました。

たとえば、AMQP キューを公開する場合、アプリケーションは次のようになりました。

Mule 3 の例: フローの途中のメッセージのコンシューム
<flow name="ordersFromAMQP">
  <http:inbound-endpoint exchange-pattern="request-response" path="orders" host="localhost" port="8081"/>
  <scripting:transformer doc:name="AMQP Message Listening">
    <scripting:script engine="Groovy"><![CDATA[
org.mule.api.MuleMessage message = new org.mule.module.client.MuleClient(muleContext).request('amqp://recordsyntactic_exchange/amqp-queue?connector=AMQP_0_9_Connector&exchangeType=direct&queueDurable=true&exchangeDurable=true&queueAutoDelete=true', 10000L);
]]></scripting:script>
</flow>

留意点:

  • AMQP メッセージに関するすべてのメタデータは完全に失われるため、CorrelationId をログに記録するためには、ヘッダーを取得する構文を把握している必要があります。

  • この要求には AMQP とキューの設定の両方が必要です。

Mule 4 には、'consume' 操作を使用して、フローの途中のメッセージをコンシュームする機能が標準装備されています。この操作は前述のリスナーとよく似ていますが、相違点はフローのどの時点でも使用できることです。

Mule 4 の例: フローの途中のメッセージのコンシューム
<flow name="ordersFromAMQP">
  <http:listener config-ref="HTTP_Listener_config" path="/orders"/>
  <amqp:consume config-ref="config"  queueName="Orders" />
  <logger level="INFO" message="#['CorrelationId: ' ++ attributes.properties.correlationId]"/>
</flow>

要求-応答の実行

AMQP では ​reply_to​ プロパティを使用して、RPC パターンを実装できます。この場合は、クライアントがその場で作成した一時的な専用応答キューを使用するか、既存のキューを使用します。

一時的な自動削除非公開応答キューを使用した要求-応答

Mule 3 の場合、メッセージの到着後に破棄される一時的な専用キューを応答キューとする最初のケースでは、アウトバウンドエンドポイントで "request-response" exchange-pattern を使用します。

Mule 3 の例: 一時的な応答先を使用した要求-応答の実行
<flow name="amqpRequestReplyTemporaryDestination">
  <http:inbound-endpoint exchange-pattern="request-response" host="localhost" port="8080" path="invoices"/>
  <dw:transform-message>
      <dw:set-payload><![CDATA[%dw 1.0
        %output application/xml
        ---
        {
        data: payload,
        costumer: message.inboundProperties."http.query.params".costumer_id
        }]]></dw:set-payload>
  </dw:transform-message>
  <object-to-string-transformer/>
  <amqp:outbound-endpoint exchange-pattern="request-response" queueName="invoiceProcessor" connector-ref="AMQP_Connector"/>
  <logger level="INFO" message="Status: #[payload]">
</flow>

他方 Mule 4 には、この特定のユースケースの解決を目的とする、​publish-consume​ というまったく新しい操作が用意されています。

Mule 4 の例: 一時的な応答先を使用した要求-応答の実行
<flow name="amqpRequestReplyTemporaryDestination">
  <http:listener config-ref="HTTP_Listener_config" path="/invoices"/>
  <amqp:publish-consume config-ref="AMQP_Config" exchangeName="invoiceProcessor">
    <amqp:message>
      <amqp:body>#[output application/xml ---
      {
        data: payload,
        costumer: attributes.queryParams.costumer_id
      }]</amqp:body>
    </amqp:message>
  </amqp:publish-consume>
  <logger level="INFO" message="#['Status: ' ++ payload]">
</flow>

この場合も、メッセージの作成が ​message​ 要素の操作のインラインで実行され、送信メッセージに影響する変換や設定はすべてこの要素の一部として行われます。

明示的な応答先キューを使用した要求-応答

Mule 4 では、replyTo プロパティを指定した明示的な ​reply-to​ キューを使用して、要求応答を実行できます。

Mule 4 の例: 明示的な応答先を使用した要求-応答の実行
<flow name="amqpRequestReplyTemporaryDestination">
  <http:listener config-ref="HTTP_Listener_config" path="/invoices"/>
  <amqp:publish-consume config-ref="AMQP_Config" exchangeName="targetExchange">
    <amqp:message>
      <amqp:body>#[output application/xml ---
      {
        data: payload,
        costumer: attributes.queryParams.costumer_id
      }]</amqp:body>
      <amqp:properties replyTo="replyToQueue" />
    </amqp:message>
  </amqp:publish-consume>
  <logger level="INFO" message="#['Status: ' ++ payload]">
</flow>

トランザクションの使用

3.x から 4.x に移行する場合、トランザクションのサポートの設定はよく似ていますが、相違点はご想像のとおり、​inbound-endpoint​ や ​outbound-endpoint​ で行われていた設定が、操作や接続元のトランザクションに関する標準化された Mule 4 アプローチに変更されたことです。

Mule 3 の例: トランザクションの使用
<flow name="transactedAmqpFlow">
    <amqp:inbound-endpoint queue=Name"${in}">
        <amqp:transaction action="ALWAYS_BEGIN" /> (1)
    </amqp:inbound-endpoint>
    <set-variable variableName="originalPayload" value="#[payload]"/> (2)
    <dw:transform-message> (3)
        <dw:set-payload><![CDATA[%dw 1.0
          %output application/xml
          ---
          payload
          ]]></dw:set-payload>
    </dw:transform-message>
    <object-to-string-transformer/>
    <amqp:outbound-endpoint exchangeName="${out}"> (4)
        <amqp:transaction action="ALWAYS_JOIN"/>
    </amqp:outbound-endpoint>
    <default-exception-strategy>
        <commit-transaction exception-pattern="*"/> (5)
        <set-payload value="#[flowVars.originalPayload]"/> (6)
        <amqp:outbound-endpoint queue="dead.letter"> (7)
            <amqp:transaction action="JOIN_IF_POSSIBLE"/>
        </amqp:outbound-endpoint>
    </default-exception-strategy>
</flow>

注意点:

1 ALWAYS_BEGIN​ を指定したインバウンドエンドポイントからトランザクションが開始されます。
2 元のペイロードが失われていないことを確認します。
3 ペイロードが変換され、アウトバウンドエンドポイントを介して送信可能になります。
4 アウトバウンドエンドポイントが ​ALWAYS_JOIN​ に設定されます。
5 あらゆる例外をキャッチする例外戦略をセットアップします。
6 元のペイロードが復元されるため、元のメッセージが dead.letter にパブリッシュされます。
7 最後に、元のメッセージを dead.letter に送信して、現在のトランザクションへの結合を試行します。

Mule 4 でも、次のアプローチで同じシナリオを実装できます。

Mule 4 の例: トランザクションの使用
<flow name="transactedAmqpFlow">
    <amqp:listener config-ref="AMQP_Config" queueName="${in}" transactionalAction="ALWAYS_BEGIN"/> (1)
    <amqp:publish config-ref="AMQP_Config" destination="${out}" transactionalAction="ALWAYS_JOIN"> (2)
        <amqp:message>
            <amqp:body>#[output application/xml --- payload</amqp:body>
        </amqp:message>
    </amqp:publish>
    <error-handler>
        <on-error-continue type="ANY"> (3)
          <amqp:publish config-ref="AMQP_Config" exchangeName="dead.letter" transactionalAction="JOIN_IF_POSSIBLE"> (4)
          	<amqp:routing-keys>
				<amqp:routing-key value="dead.letter" />
			</amqp:routing-keys>
          </amqp:publish>
        </on-error-continue>
    </error-handler>
</flow>
1 ALWAYS_BEGIN​ を指定した ​listener​ からトランザクションが開始されます。
2 publish​ 操作によってペイロードが XML 形式でパブリッシュされますが、現在のペイロードを変更したり、​ALWAYS_JOIN​ を指定したトランザクションに結合することはありません。
3 生じたエラーをキャッチするエラーハンドラーを使用して、メッセージが失われていないことを確認します。
4 現在のペイロードは受信した元のメッセージのままのため、​JOIN_IF_POSSIBLE​ トランザクションアクションを使用してそのまま dead.letter にパブリッシュします。

キューやエクスチェンジの存在の強制

Mule 3 では、​activeDeclarationsOnly​ を false に設定している場合に、宣言されたキューまたはエクスチェンジが存在しなければ、AMQP Connector が 404 エラーを記載した ​ShutdownSignalException​ をスローします。Mule 4 では、​createFallbackQueue​ と ​createFallbackExchange​ パラメーターを使用して、キューやエクスチェンジを強制的に存在させることができます。

<amqp:config name="Amqp_Config" createFallbackQueue="false" createFallbackQueue="false">
    <amqp:connection host="localhost" username="guest" password="guest" />
</amqp:config>