Flex Gateway新着情報
Governance新着情報
Monitoring API Manager他のコネクタと同様、AMQP Connector も一新され、Mule 3 トランスポートモデルから、簡便な UX や DataSense に完全対応するメッセージ情報を特長とする操作ベースのコネクタに進化しています。 つまり、AMQP を使用すると、他のコネクタと同じように接続や設定を行えるだけでなく、メッセージのパブリッシュやコンシュームも構造されたメッセージ情報に基づくため簡便になります。
3.x のトランスポート設定から Mule 4 の AMQP Connector 設定に移行するということは、基本的にはほぼ同じパラメーターを使用しながら、より一貫性のある方法で宣言することになります。Mule 4 では AMQP Connector の一定のパラメーターが変更されています。 たとえば、メッセージのコンシュームに使用するパラメーターは 'consumer-config' グループで宣言されますが、一般的な認証パラメーターは一般グループで設定されます。また、コネクタの動作に影響するパラメーター (config に存在) が、接続の確立方法にのみ影響するパラメーター (接続レベルで存在) と区別されています。
<amqp:connector name="AMQP_Config"
fallbackAddresses="192.168.0.1,192.168.0.2"
virtualHost="/"
username="guest"
password="guest"
deliveryMode="TRANSIENT"
priority="1"
ackMode="AUTO"
activeDeclarationsOnly="false"
mandatory="false"
immediate="false"
prefetchSize="16"
prefetchCount="16"
noLocal="false"
exclusiveConsumers="false"
requestBrokerConfirms="false"
numberOfChannels="10"
/>
Mule 4 でも、実際に適用するコンテキストに同じパラメーターを設定すれば、これと同じ設定にすることができます。
<amqp:config name="AMQP_Config">
<amqp:connection host="localhost" port="5672"
virtualHost="/" username="guest" password="guest" />
<amqp:consumer-config numberOfConsumers="10" exclusiveConsumers="false" noLocal="false" ackMode="AUTO"/>
<amqp:publisher-config requestBrokerConfirms="false" mandatory="false" immediate="false" priority="1" deliveryMode="TRANSIENT"/>
<amqp:quality-of-service prefetchSize="16" prefetchCount="16" />
</amqp:config>
fallbackAddresses
はサポートされなくなりました。
Mule 4 の新しいアプローチでは、AMQP の 'publish' 操作に入力パラメーターのみを利用して、パブリッシュする AMQP メッセージのビルドを完成させます。
たとえば、本文にペイロードの一部のみが記載された、優先度の高い AMQP メッセージを送信し、このメッセージをグループに関連付ける場合、次の手順を実行する必要があります。
<1>) transform
を使用して、メッセージ本文となるものをペイロードに設定します。
<2>) 生成されたストリームを文字列に変換して、テキストメッセージとして送信します。
<3>) AMQP プロパティに priority
を、AMQP メッセージの優先度を設定するキーとして設定します。
<flow name="AmqpTransportOutbound">
<http:listener config-ref="HTTP_Listener_Configuration" path="/orders"/>
<dw:transform-message> (1)
<dw:set-payload><![CDATA[%dw 1.0
%output application/json
---
{
order_id: payload.id,
supplier: payload.warehouse
}]]></dw:set-payload>
</dw:transform-message>
<object-to-string-transformer/> (2)
<amqp:outbound-endpoint exchangeName="testExchange" connector-ref="AMQP_Connector" >
<message-properties-transformer scope="outbound">
<add-message-property key="priority" value="9"/> (3)
</message-properties-transformer>
</amqp:outbound-endpoint>
</flow>
Mule 4 でも、AMQP Connector を次のとおり設定すれば、同じ結果を得ることができます。
<flow name="AMQPConnectorPublish">
<http:listener config-ref="HTTP_Listener_config" path="/orders"/>
(2)
<amqp:publish config-ref="AMQP_Config" exchangeName="targetExchange">
<amqp:message> (1)
<amqp:body>#[output application/json ---
{
order_id: payload.id,
supplier: payload.warehouse
}]</amqp:body>
<amqp:properties priority="3"/> (3)
</amqp:message>
</amqp:publish>
</flow>
次の違いに注意します。
1) メッセージの body
はインラインで作成され、ペイロードには変更がないため、transform
コンポーネントは必要ありません。
2) Connector が変換出力を自動的に処理できるため、object-to-string
トランスフォーマーも排除されています。
3) 優先度は AmqpMessage 操作のプロパティとして設定され、ユーザーが正確なキーを把握している必要はありません。
上記をまとめると、3.x で AMQP トランスポートを使用してメッセージをパブリッシュするときは、AmqpMessage ペイロードとアウトバウンドプロパティを利用して AMQP メッセージの作成を設定するため、トランスポートがどのように機能するのかの詳しい知識が必要でした。4.x では AMQP Connector が、設定可能な各要素をその要素が属するスコープのパラメーターとして公開するため、AMQP のすべての機能がより明確な形で公開されます。
AMQP トランスポートの inbound-endpoint
では、指定したキューの新しい AMQP メッセージを待機できます。このリスナーの出力では、ペイロードにメッセージの本文、AMQP Attributes
に AMQP のすべてのヘッダーとプロパティが記載されます。
<flow name="AMQPTransportInbound">
<amqp:inbound-endpoint connector-ref="AMQP_Connector" queueName="in" />
<dw:transform-message> (2)
<dw:set-payload><![CDATA[%dw 1.0
%output application/json
---
{
items: payload,
costumer: message.inboundProperties.'costumer_id'
}]]></dw:set-payload>
</dw:transform-message>
<object-to-string-transformer/> (3)
<amqp:outbound-endpoint exchangeName="v2/prime/orders" connector-ref="AMQP_Connector"/> (4)
</flow>
この場合は、メッセージをリスンして、新たに定められた形式に適合させます。
1) inboundProperties に含まれているメタデータを使用して MuleMessage を変換し、新たな API で求められる新しい JSON 形式にペイロードを適合させます。 2) 変換されたペイロードを JSON 文字列に変換します。 3) ペイロードを定義済みのエクスチェンジにパブリッシュします。
Mule 4 に同じコードを実装すると次のようになります。
<flow name="AMQPConnectorPublish">
<amqp:listener config-ref="AMQP_Config" queueName="in" /> (1)
<amqp:publish config-ref="AMQP_Config" exchangeName="ordersExchange"> (2)
<amqp:message>
<amqp:body>#[output application/json ---
{
items: payload,
costumer: attributes.properties.userProperties.costumer_id, (3)
type: attributes.headers.type
}]</amqp:body>
</amqp:message>
</amqp:publish>
</flow>
この場合、フローのコンポーネント数が減り、メッセージペイロードを異なる形式でパブリッシュするために変更する必要がありません。
1 | 新しいメッセージの定義はインラインで行われるため、新しいメッセージ本文の JSON のみが作成されます。 |
2 | ここではメッセージの 'inboundProperties' ではなく、'attributes’ POJO を使用しています。このため、AMQP メッセージの 'headers' が 'properties' と区別されます。 |
Mule 3 の AMQP トランスポートでは、特定の宛先からのメッセージをフローの途中でコンシュームすることがサポートされていなかったため、この対処法として「Mule Requester Module」をアプリケーションに追加し、このモジュールがフローの途中のメッセージのコンシュームを処理していました。
たとえば、AMQP キューを公開する場合、アプリケーションは次のようになりました。
<flow name="ordersFromAMQP">
<http:inbound-endpoint exchange-pattern="request-response" path="orders" host="localhost" port="8081"/>
<scripting:transformer doc:name="AMQP Message Listening">
<scripting:script engine="Groovy"><![CDATA[
org.mule.api.MuleMessage message = new org.mule.module.client.MuleClient(muleContext).request('amqp://recordsyntactic_exchange/amqp-queue?connector=AMQP_0_9_Connector&exchangeType=direct&queueDurable=true&exchangeDurable=true&queueAutoDelete=true', 10000L);
]]></scripting:script>
</flow>
留意点:
AMQP メッセージに関するすべてのメタデータは完全に失われるため、CorrelationId をログに記録するためには、ヘッダーを取得する構文を把握している必要があります。
この要求には AMQP とキューの設定の両方が必要です。
Mule 4 には、'consume' 操作を使用して、フローの途中のメッセージをコンシュームする機能が標準装備されています。この操作は前述のリスナーとよく似ていますが、相違点はフローのどの時点でも使用できることです。
<flow name="ordersFromAMQP">
<http:listener config-ref="HTTP_Listener_config" path="/orders"/>
<amqp:consume config-ref="config" queueName="Orders" />
<logger level="INFO" message="#['CorrelationId: ' ++ attributes.properties.correlationId]"/>
</flow>
AMQP では reply_to
プロパティを使用して、RPC パターンを実装できます。この場合は、クライアントがその場で作成した一時的な専用応答キューを使用するか、既存のキューを使用します。
Mule 3 の場合、メッセージの到着後に破棄される一時的な専用キューを応答キューとする最初のケースでは、アウトバウンドエンドポイントで "request-response" exchange-pattern を使用します。
<flow name="amqpRequestReplyTemporaryDestination">
<http:inbound-endpoint exchange-pattern="request-response" host="localhost" port="8080" path="invoices"/>
<dw:transform-message>
<dw:set-payload><![CDATA[%dw 1.0
%output application/xml
---
{
data: payload,
costumer: message.inboundProperties."http.query.params".costumer_id
}]]></dw:set-payload>
</dw:transform-message>
<object-to-string-transformer/>
<amqp:outbound-endpoint exchange-pattern="request-response" queueName="invoiceProcessor" connector-ref="AMQP_Connector"/>
<logger level="INFO" message="Status: #[payload]">
</flow>
他方 Mule 4 には、この特定のユースケースの解決を目的とする、publish-consume
というまったく新しい操作が用意されています。
<flow name="amqpRequestReplyTemporaryDestination">
<http:listener config-ref="HTTP_Listener_config" path="/invoices"/>
<amqp:publish-consume config-ref="AMQP_Config" exchangeName="invoiceProcessor">
<amqp:message>
<amqp:body>#[output application/xml ---
{
data: payload,
costumer: attributes.queryParams.costumer_id
}]</amqp:body>
</amqp:message>
</amqp:publish-consume>
<logger level="INFO" message="#['Status: ' ++ payload]">
</flow>
この場合も、メッセージの作成が message
要素の操作のインラインで実行され、送信メッセージに影響する変換や設定はすべてこの要素の一部として行われます。
Mule 4 では、replyTo プロパティを指定した明示的な reply-to
キューを使用して、要求応答を実行できます。
<flow name="amqpRequestReplyTemporaryDestination">
<http:listener config-ref="HTTP_Listener_config" path="/invoices"/>
<amqp:publish-consume config-ref="AMQP_Config" exchangeName="targetExchange">
<amqp:message>
<amqp:body>#[output application/xml ---
{
data: payload,
costumer: attributes.queryParams.costumer_id
}]</amqp:body>
<amqp:properties replyTo="replyToQueue" />
</amqp:message>
</amqp:publish-consume>
<logger level="INFO" message="#['Status: ' ++ payload]">
</flow>
3.x から 4.x に移行する場合、トランザクションのサポートの設定はよく似ていますが、相違点はご想像のとおり、inbound-endpoint
や outbound-endpoint
で行われていた設定が、操作や接続元のトランザクションに関する標準化された Mule 4 アプローチに変更されたことです。
<flow name="transactedAmqpFlow">
<amqp:inbound-endpoint queue=Name"${in}">
<amqp:transaction action="ALWAYS_BEGIN" /> (1)
</amqp:inbound-endpoint>
<set-variable variableName="originalPayload" value="#[payload]"/> (2)
<dw:transform-message> (3)
<dw:set-payload><![CDATA[%dw 1.0
%output application/xml
---
payload
]]></dw:set-payload>
</dw:transform-message>
<object-to-string-transformer/>
<amqp:outbound-endpoint exchangeName="${out}"> (4)
<amqp:transaction action="ALWAYS_JOIN"/>
</amqp:outbound-endpoint>
<default-exception-strategy>
<commit-transaction exception-pattern="*"/> (5)
<set-payload value="#[flowVars.originalPayload]"/> (6)
<amqp:outbound-endpoint queue="dead.letter"> (7)
<amqp:transaction action="JOIN_IF_POSSIBLE"/>
</amqp:outbound-endpoint>
</default-exception-strategy>
</flow>
注意点:
1 | ALWAYS_BEGIN を指定したインバウンドエンドポイントからトランザクションが開始されます。 |
2 | 元のペイロードが失われていないことを確認します。 |
3 | ペイロードが変換され、アウトバウンドエンドポイントを介して送信可能になります。 |
4 | アウトバウンドエンドポイントが ALWAYS_JOIN に設定されます。 |
5 | あらゆる例外をキャッチする例外戦略をセットアップします。 |
6 | 元のペイロードが復元されるため、元のメッセージが dead.letter にパブリッシュされます。 |
7 | 最後に、元のメッセージを dead.letter に送信して、現在のトランザクションへの結合を試行します。 |
Mule 4 でも、次のアプローチで同じシナリオを実装できます。
<flow name="transactedAmqpFlow">
<amqp:listener config-ref="AMQP_Config" queueName="${in}" transactionalAction="ALWAYS_BEGIN"/> (1)
<amqp:publish config-ref="AMQP_Config" destination="${out}" transactionalAction="ALWAYS_JOIN"> (2)
<amqp:message>
<amqp:body>#[output application/xml --- payload</amqp:body>
</amqp:message>
</amqp:publish>
<error-handler>
<on-error-continue type="ANY"> (3)
<amqp:publish config-ref="AMQP_Config" exchangeName="dead.letter" transactionalAction="JOIN_IF_POSSIBLE"> (4)
<amqp:routing-keys>
<amqp:routing-key value="dead.letter" />
</amqp:routing-keys>
</amqp:publish>
</on-error-continue>
</error-handler>
</flow>
1 | ALWAYS_BEGIN を指定した listener からトランザクションが開始されます。 |
2 | publish 操作によってペイロードが XML 形式でパブリッシュされますが、現在のペイロードを変更したり、ALWAYS_JOIN を指定したトランザクションに結合することはありません。 |
3 | 生じたエラーをキャッチするエラーハンドラーを使用して、メッセージが失われていないことを確認します。 |
4 | 現在のペイロードは受信した元のメッセージのままのため、JOIN_IF_POSSIBLE トランザクションアクションを使用してそのまま dead.letter にパブリッシュします。 |
Mule 3 では、activeDeclarationsOnly
を false に設定している場合に、宣言されたキューまたはエクスチェンジが存在しなければ、AMQP Connector が 404 エラーを記載した ShutdownSignalException
をスローします。Mule 4 では、createFallbackQueue
と createFallbackExchange
パラメーターを使用して、キューやエクスチェンジを強制的に存在させることができます。
<amqp:config name="Amqp_Config" createFallbackQueue="false" createFallbackQueue="false">
<amqp:connection host="localhost" username="guest" password="guest" />
</amqp:config>