Flex Gateway新着情報
Governance新着情報
Monitoring API ManagerJMS トランスポートは、Mule 3 トランスポートモデルを進化させ、UX の簡素化と DataSense 完全対応のメッセージ情報を使用する、操作ベースのコネクタへと刷新されました。 つまり JMS を使用すると、接続と設定に関して他のコネクタと同じ環境が提供されるだけではなく、構造化されたメッセージ情報に基づき、メッセージの生成とコンシュームも簡略化されます。
また、JMS Connector の新しいバージョンでは待望の JMS 2.0 仕様がサポートされ、共有サブスクリプションなどの新機能がすべて含まれています。
3.x のトランスポート設定から Mule 4 の JMS Connector 設定への移行は、基本的にはまったく同じパラメーターを使用することを暗黙に示していますが、パラメーターはよりまとまった形で宣言されます。 たとえば、メッセージのコンシュームに使用するパラメーターは「consumer-config」グループで宣言されますが、一般的な認証パラメーターは一般グループで設定されます。また、コネクタの動作に影響するパラメーター (config に存在) が、接続の確立方法にのみ影響するパラメーター (接続レベルで存在) と区別されています。
<jms:connector name="JMS_Config"
acknowledgementMode="DUPS_OK_ACKNOWLEDGE"
clientId="myClient"
durable="true"
noLocal="true"
persistentDelivery="true"
maxRedelivery="5"
cacheJmsSessions="true"
eagerConsumer="false"
specification="1.1"
numberOfConsumers="7"
username="myuser"
password="mypass" />
この同じ設定を Mule 4 で行うには、「topic-consumer」のみに影響する「noLocal」または「durable」のように、実際に適用するコンテキストに同じパラメーターを設定します。
<jms:config name="JMS_Config">
<jms:active-mq-connection specification="JMS_1_1" clientId="myClient" username="myuser" password="mypass">
<jms:factory-configuration maxRedelivery="5"/>
</jms:active-mq-connection>
<jms:consumer-config ackMode="DUPS_OK">
<jms:consumer-type>
<jms:topic-consumer noLocal="true" durable="true"/>
</jms:consumer-type>
</jms:consumer-config>
<jms:producer-config persistentDelivery="true"/>
</jms:config>
Mule 4 の JMS Connector では、汎用的な JMS 接続と簡易的な ActiveMQ 接続が引き続き区別されますが、接続のみに関連付けられる、かなり絞り込まれたパラメーターセットを使用します。まずこの簡易接続の設定方法を確認し、その後に汎用アプローチについて説明します。
トランスポートを使用して ActiveMQ 固有のパラメーターを設定する唯一の方法は、ConfigurationFactory を Spring bean として定義してから、各プロパティに渡すという方法です。 .Mule 3 の例: ActiveMQ への接続
<spring:bean name="connectionFactory"
class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL"
value="tcp://activemqserver:61616"/>
<property name="redeliveryPolicy">
<spring:bean class="org.apache.activemq.RedeliveryPolicy">
<property name="initialRedeliveryDelay"
value="20000"/>
<property name="redeliveryDelay"
value="20000"/>
<property name="maximumRedeliveries"
value="10"/>
</spring:bean>
</property>
</spring:bean>
<jms:activemq-connector name="jmsConnector" connectionFactory-ref="connectionFactory"/>
新しい JMS Connector では、このプロパティを activemq connection で直接設定することができます。
<jms:config name="JMS_Config">
<jms:active-mq-connection>
<jms:factory-configuration brokerUrl="tcp://activemqserver:61616"
initialRedeliveryDelay="20000" redeliveryDelay="20000" maxRedelivery="10"/>
</jms:active-mq-connection>
</jms:config>
JMS Connector でまだ標準でサポートされていない Connection Factory を使用するトランスポート設定を移行する必要がある場合は、「activemq-connection」で「connectionFactory」属性を引き続き使用して、Connection Factory の bean に参照を渡すことができます。
Connection Factory への必要な汎用接続は、Mule 3 の JMS トランスポートと Mule 4 の JMS Connector のどちらでも定義することができます。
これには 2 通りの方法があります。1 つ目の方法は、Spring bean を使用して Connection Factory を作成した後、それを Connection Factory として参照することです。
<spring:bean name="connectionFactory" class="com.foo.CustomConnectionFactory"/>
<jms:connector name="jmsConnector" connectionFactory-ref="connectionFactory" />
Mule 4 の場合も、ほとんど同じです。
<spring:bean name="connectionFactory" class="com.foo.CustomConnectionFactory"/>
<jms:config name="JMS_Config">
<jms:generic-connection connectionFactory="customConnectionFactory"/>
</jms:config>
汎用接続を作成するもう 1 つの方法は、JNDI を使用して検出される Connection Factory を使用することです。この場合、機能は同じですが、構文がトランスポートからコネクタに変わります。
<beans>
<util:properties id="providerProperties">
<prop key="queue.jndi-queue-in">in</prop>
<prop key="topic.jndi-topic-in">in</prop>
</util:properties>
</beans>
<jms:connector name="jmsConnector"
jndiInitialFactory="com.sun.jndi.ldap.LdapCtxFactory"
jndiProviderUrl="ldap://localhost:10389/"
jndiProviderProperties-ref="providerProperties"
connectionFactoryJndiName="cn=ConnectionFactory,dc=example,dc=com"
jndiDestinations="true"
forceJndiDestinations="false"/>
Mule 4 の場合は、JNDI をインラインで設定して行うことができます。
<jms:config name="JMS_Config">
<jms:generic-connection>
<jms:connection-factory>
<jms:jndi-connection-factory connectionFactoryJndiName="cn=ConnectionFactory,dc=example,dc=com"
lookupDestination="TRY_ALWAYS">
<jms:name-resolver-builder
jndiInitialContextFactory="com.sun.jndi.ldap.LdapCtxFactory"
jndiProviderUrl="ldap://localhost:10389/">
<jms:provider-properties>
<jms:provider-property key="queue.jndi-queue-in" value="in"/>
<jms:provider-property key="topic.jndi-topic-in" value="in"/>
</jms:provider-properties>
</jms:name-resolver-builder>
</jms:jndi-connection-factory>
</jms:connection-factory>
</jms:generic-connection>
</jms:config>
この例には、主に 3 つの違いがあります。
プロパティはインラインで宣言されるので、Spring bean の util を使用する必要がなくなりました。
JNDI を使用した宛先のルックアップの実行は、以前の「jndiDestinations」と「forceJndiDestinations」の 2 つのパラメーターが統合された、「lookupDestination」という 1 つのパラメーターで設定できるようになりました。
「jndiProviderUrl」が「name-resolver」の一部になったように、パラメーターは関連するコンテキスト内に含まれるようになりました。
JMS トランスポートでは、JMS メッセージの本文を含めるためにペイロードを必要とし、Mule のアウトバウンドプロパティを使用して JMS プロパティとヘッダーをカスタマイズしていました。新しい Mule 4 アプローチでは、JMS の「publish」操作で、入力パラメーターのみを使用してパブリッシュする JMS メッセージを完全にビルドします。
たとえば、本文にペイロードの一部のみを含めて優先度の高い JMS メッセージを送信し、そのメッセージをグループに関連付ける場合、次の手順を実行する必要があります。
<1>) transform
を使用して、メッセージ本文となるものをペイロードに設定します。
<2>) 生成されたストリームを文字列に変換して、テキストメッセージとして送信します。
<3>) アウトバウンドメッセージプロパティにキーとして priority
を設定し、JMSPriority を設定します。
<4>) アウトバウンドメッセージプロパティにキーとして JMSXGroupID
を設定し、JMSXGroupID を設定します。
<flow name="JmsTransportOutbound">
<http:listener config-ref="HTTP_Listener_Configuration" path="/orders"/>
<dw:transform-message> (1)
<dw:set-payload><![CDATA[%dw 1.0
%output application/json
---
{
order_id: payload.id,
supplier: payload.warehouse
}]]></dw:set-payload>
</dw:transform-message>
<object-to-string-transformer/> (2)
<jms:outbound-endpoint queue="storage" connector-ref="Active_MQ">
<message-properties-transformer scope="outbound">
<add-message-property key="JMSXGroupID" value="#[message.inboundProperties."http.query.params".packageGroup]"/> (3)
<add-message-property key="priority" value="9"/> (4)
</message-properties-transformer>
</jms:outbound-endpoint>
</flow>
Mule 4 では JMS Connector と次の設定を使用して、同じ結果を得ることができます。
<flow name="JMSConnectorPublish">
<http:listener config-ref="HTTP_Listener_config" path="/orders"/>
(2)
<jms:publish config-ref="JMS_Config" destination="storage" priority="9"> (3)
<jms:message> (1)
<jms:body>#[output application/json ---
{
order_id: payload.id,
supplier: payload.warehouse
}]</jms:body>
<jms:jmsx-properties jmsxGroupID="#[attributes.queryParams.packageGroup]"/> (4)
</jms:message>
</jms:publish>
</flow>
次の違いに注意します。
1) メッセージの body
はインラインで作成され、ペイロードには変更がないため、transform
コンポーネントは必要ありません。
2) コネクタでは変換出力を自動的に処理できるので、object-to-string
トランスフォーマーも削除されました。
3) 優先度は publish
操作のパラメーターとして設定され、ユーザーが正確なキーを知っている必要はありません。
4) グループはメッセージ JMSX プロパティの一部として設定され、ユーザーが正確なヘッダー名を知っている必要はありません。
要約すると、JMS トランスポートを使用する 3.x でメッセージをパブリッシュする場合、JMS メッセージの作成を設定するには、MuleMessage ペイロードとアウトバウンドプロパティを使用します。それにはトランスポートの機能についてより深い知識が必要でした。4.x の JMS Connector では、設定可能なすべての要素は、属するスコープのパラメーターとして公開されるため、すべての JMS 機能がより明確に公開されます。
JMS トランスポートの inbound-endpoint
では、特定のトピックまたはキューで新規メッセージを待機することができます。このリスナーの出力では、メッセージの本文はペイロードに、すべての JMS ヘッダーとプロパティは inboundProperties
として含まれます。
<flow name="JmsTransportInbound">
<jms:inbound-endpoint connector-ref="Active_MQ" queue="in">
<jms:selector expression="JMSPriority=9"/> (1)
</jms:inbound-endpoint>
<dw:transform-message> (2)
<dw:set-payload><![CDATA[%dw 1.0
%output application/json
---
{
items: payload,
costumer: message.inboundProperties.'costumer_id',
type: message.inboundProperties.'JMSType'
}]]></dw:set-payload>
</dw:transform-message>
<object-to-string-transformer/> (3)
<jms:outbound-endpoint queue="v2/prime/orders" connector-ref="Active_MQ"/> (4)
</flow>
この場合、優先度の高いメッセージをリスンし、優先順序のバージョン 2 で要求される新しい形式に適合させます。
1) 受信メッセージを優先度で絞り込みます。 2) inboundProperties に含まれているメタデータを使用して MuleMessage を変換し、新たな API で求められる新しい JSON 形式にペイロードを適合させます。 3) 変換されたペイロードを JSON 文字列に変換します。 4) ペイロードをプロキシキューにパブリッシュします。
Mule 4 に同じコードを実装すると次のようになります。
<flow name="JMSConnectorPublish">
<jms:listener config-ref="JMS_Config" destination="in" selector="JMSPriority=9"/> (1)
<jms:publish config-ref="JMS_Config" destination="v2/prime/orders"> (2)
<jms:message>
<jms:body>#[output application/json ---
{
items: payload,
costumer: attributes.properties.userProperties.costumer_id, (3)
type: attributes.headers.type
}]</jms:body>
</jms:message>
</jms:publish>
</flow>
この場合、フローのコンポーネント数が減り、メッセージペイロードを異なる形式でパブリッシュするために変更する必要がありません。
1 | 検索条件を使用したリスンは、リスナーで「selector」を設定して行います。 |
2 | 新しいメッセージの定義はインラインで行われるため、新しいメッセージ本文の JSON のみが作成されます。 |
3 | 「inboundProperties」の代わりに、メッセージの「attributes」 POJO を使用します。これで、JMS メッセージの「headers」と「properties」が区別されるようになります。 |
特定の宛先からのメッセージをフロー途中でコンシュームすることは、Mule 3 の JMS トランスポートではサポートされておらず、その対応策としてアプリケーションに Mule Requester Module を追加して、フロー途中のメッセージのコンシュームを処理していました。
たとえば、新しい REST API の背後にある JMS キューを公開する場合、アプリケーションは次のようになります。
<mulerequester:config name="Mule_Requester"/>
<jms:activemq-connector name="Active_MQ" brokerURL="tcp://localhost:61616" specification="1.1"/>
<flow name="ordersFromJMS">
<http:inbound-endpoint exchange-pattern="request-response" path="orders" host="localhost" port="8081"/>
<mulerequester:request config-ref="Mule_Requester"
resource="jms://Orders?selector=shipped%3D'#[message.inboundProperties.'shipped']'"/>
<logger level="INFO" message="CorrelationId: #[message.inboundProperties.'JMSCorrelationId']"/>
</flow>
留意点:
JMS メッセージに関するすべてのメタデータは完全に失われるため、CorrelationId のログを作成するには、ヘッダーを取得する構文を知っている必要があります。
「selector」による動的な絞り込みは、リクエスターの「resource」URL で行われる必要があるため、複数の引数の構成でエラーが発生しやすくなります。
JMS Ruquester と Mule Requester の両方の設定が必要です。
Mule 4 には、「consume」操作を使用する、フロー途中のメッセージのコンシューム機能が標準で備わっています。この操作は前述のリスナーとよく似ていますが、相違点はフローのどの時点でも使用できることです。
<flow name="ordersFromJMS">
<http:listener config-ref="HTTP_Listener_config" path="/orders"/>
<jms:consume destination="Orders" selector=#['shipped=' ++ attributes.queryParams.shipped]/>
<logger level="INFO" message="#['CorrelationId: ' ++ attributes.headers.correlationId]"/>
</flow>
ここで必要なのは JMS Connector のみで、リスナーからのメタデータを使用して「consume」操作に「selector」パラメーターを設定し、またメッセージ属性のメタデータのサポートにより correlationId のログを作成することができました。
3.x でインバウンドエンドポイントとして使用されるトピックでは、トピックサブスクリプションを durable
サブスクリプションとして実行する必要があるかどうかをユーザーが設定することができました。これには異なる設定方法があり、queues
の durable
設定も公開されるという、意味をなさない問題がありました。
3.x でのトピックサブスクリプションは、次のようになります。
<jms:inbound-endpoint connector-ref="Active_MQ" topic="trackedEvents" durable="true" durableName="inboundEvents_1"/>
Mule 4 では、サブスクリプションのメカニズムの見直しを行い、サブスクリプションをトピックのみに絞り込むオプションを残し、JMS 2.0 のサポートのおかげで機能がさらに追加されました。
前述の例は 4.x では次のようになります。
<jms:listener config-ref="JMS_Config" destination="trackedEvents">
<jms:consumer-type>
<jms:topic-consumer durable="true" subscriptionName="inboundEvents_1"/>
</jms:consumer-type>
</jms:listener>
ただし、このケースでは、topic-consumer
設定で shared
サブスクリプションも設定することができ (JMS 2.0 接続を使用している場合のみ)、複数のスレッド、接続、または JVM によるトピックサブスクリプションのメッセージの処理が可能です。
<jms:listener config-ref="JMS_Config" destination="trackedEvents">
<jms:consumer-type>
<jms:topic-consumer durable="true" shared="true" subscriptionName="inboundEvents_1"/>
</jms:consumer-type>
</jms:listener>
新しい JMS メッセージのリスナーが「JMSReplyTo」ヘッダーが設定されたメッセージを受信すると、メッセージ処理の完了後に応答が返信宛先に対して生成されます。
Mule 3 の場合、これはトランスポートに exchange-pattern="request-response"
を設定することを意味し、フローの結果は自動的に応答のペイロードになります。応答メッセージのヘッダーが outbound-properties
を使用して設定されたのに対し、メッセージの本文はフローの最後の payload
から取得されました。
<flow name="jmsBridge">
<jms:inbound-endpoint queue="storage" exchange-pattern="request-response" connector-ref="PublicAMQ">
<message-properties-transformer scope="outbound">
<add-message-property key="timeToLive" value="2000"/>
<add-message-property key="timeToLive" value="2000"/>
</message-properties-transformer>
</jms:inbound-endpoint>
<http:request config-ref="HTTP_Request_Configuration" path="/storage" method="POST"/>
<set-payload value="BRIDGED">
</flow>
Mule 4 ではその代わりに、応答に関連付けられたすべてのパラメーターを、listener
コンポーネントの一部としてインラインで直接設定でき、フローの最後に到達したときに変換を行う必要がなくなりました。
<flow name="jmsBridge">
<jms:listener config-ref="config" destination="storage">
<jms:response timeToLive="2" timeToLiveUnit="SECONDS">
<jms:body>#['BRIDGED']</jms:body>
</jms:response>
</jms:listener>
<http:request config-ref="HTTP_Request_Configuration" path="/storage" method="POST">
</flow>
JMS では、JMSReplyTo
ヘッダーを使用して非同期通信を実行できます。これは、クライアントによってその場で作成される一時的な宛先か、既存の宛先のどちらかを使用して行うことができます。
Mule 3 では、返信宛先がメッセージが到着したら破棄される一時キューである最初のケースの場合、アウトバウンドエンドポイントで「request-response」 exchange-pattern を使用します。
<flow name="jmsRequestReplyTemporaryDestination">
<http:inbound-endpoint exchange-pattern="request-response" host="localhost" port="8080" path="invoices"/>
<dw:transform-message>
<dw:set-payload><![CDATA[%dw 1.0
%output application/xml
---
{
data: payload,
costumer: message.inboundProperties."http.query.params".costumer_id
}]]></dw:set-payload>
</dw:transform-message>
<object-to-string-transformer/>
<jms:outbound-endpoint exchange-pattern="request-response" queue="invoiceProcessor" connector-ref="Active_MQ"/>
<logger level="INFO" message="Status: #[payload]">
</flow>
他方 Mule 4 には、この特定のユースケースの解決を目的とする、publish-consume
というまったく新しい操作が用意されています。
<flow name="jmsRequestReplyTemporaryDestination">
<http:listener config-ref="HTTP_Listener_config" path="/invoices"/>
<jms:publish-consume config-ref="JMS_Config" destination="invoiceProcessor">
<jms:message>
<jms:body>#[output application/xml ---
{
data: payload,
costumer: attributes.queryParams.costumer_id
}]</jms:body>
</jms:message>
</jms:publish-consume>
<logger level="INFO" message="#['Status: ' ++ payload]">
</flow>
この場合も、メッセージの構築が message
要素内で操作のインラインで行われることがあり、送信メッセージに影響する変換または設定はその要素の一部として行われます。
明示的な reply-to
宛先を使用した request-reply の実行は、request-reply
スコープという新しいコンポーネントが必要だったため、3.x では少し複雑でした。
<flow name="JMS-request-reply">
<jms:inbound-endpoint queue="invoices" exchange-pattern="request-response" connector-ref="Active_MQ"/>
<dw:transform-message>
<dw:set-payload><![CDATA[%dw 1.0
%output application/xml
---
{
data: payload,
costumer: message.inboundProperties."http.query.params".costumer_id
}]]></dw:set-payload>
</dw:transform-message>
<object-to-string-transformer/>
<request-reply> (1)
<jms:outbound-endpoint connector-ref="Active_MQ" exchange-pattern="one-way" queue="invoiceProcessor"/>
<jms:inbound-endpoint connector-ref="Active_MQ" exchange-pattern="one-way" topic="processedInvoiceEvents"/>
</request-reply>
<logger level="INFO" message="#['Status: ' ++ payload]">
</flow>
このスコープ (1) では、request-reply パターンを実行する送受信トランスポートを設定できました。そうすると、送信メッセージに JMSReplyTo
ヘッダーが自動的に挿入され、インバウンドエンドポイントでリスンが開始されました。
新しい publish-consume
操作を使用する Mule 4 の JMS Connector では、フローに変更を加える必要がほとんどありません。返信を特定の宛先に送信する必要がある場合は、パブリッシュまたは応答の他のケースと同様に、メッセージビルダーで直接 reply-to
ヘッダーを設定します。
<flow name="jmsRequestReplyTemporaryDestination">
<http:listener config-ref="HTTP_Listener_config" path="/invoices"/>
<jms:publish-consume config-ref="JMS_Config" destination="invoiceProcessor">
<jms:message>
<jms:body>#[output application/xml ---
{
data: payload,
costumer: attributes.queryParams.costumer_id
}]</jms:body>
</jms:message>
<jms:reply-to destination="processedInvoiceEvents" destinationType="TOPIC"/> (1)
</jms:publish-consume>
<logger level="INFO" message="#['Status: ' ++ payload]">
</flow>
この例では、既知の宛先が他のユーザーによってイベント追跡や処理後トリガーなどに使用される場合があることを示すために、返信宛先ヘッダー (<1>) をよく知られたトピックに設定します。
3.x から 4.x に移行する場合、トランザクションのサポートは設定がよく似ています。予想されるのは、inbound-endpoint
および outbound-endpoint
で行われていた設定から、操作トランザクションについて標準化された Mule 4 アプローチへの変更です。
<flow name="transactedJmsFlow">
<jms:inbound-endpoint queue="${in}">
<jms:transaction action="ALWAYS_BEGIN" /> (1)
</jms:inbound-endpoint>
<set-variable variableName="originalPayload" value="#[payload]"/> (2)
<dw:transform-message> (3)
<dw:set-payload><![CDATA[%dw 1.0
%output application/xml
---
payload
]]></dw:set-payload>
</dw:transform-message>
<object-to-string-transformer/>
<jms:outbound-endpoint queue="${out}"> (4)
<jms:transaction action="ALWAYS_JOIN"/>
</jms:outbound-endpoint>
<default-exception-strategy>
<commit-transaction exception-pattern="*"/> (5)
<set-payload value="#[flowVars.originalPayload]"/> (6)
<jms:outbound-endpoint queue="dead.letter"> (7)
<jms:transaction action="JOIN_IF_POSSIBLE"/>
</jms:outbound-endpoint>
</default-exception-strategy>
</flow>
注意点:
1 | ALWAYS_BEGIN を指定したインバウンドエンドポイントからトランザクションが開始されます。 |
2 | 元のペイロードが失われていないことを確認します。 |
3 | ペイロードが変換され、アウトバウンドエンドポイントを介して送信可能になります。 |
4 | アウトバウンドエンドポイントが ALWAYS_JOIN に設定されます。 |
5 | あらゆる例外をキャッチする例外戦略をセットアップします。 |
6 | 元のペイロードが復元されるため、元のメッセージが dead.letter にパブリッシュされます。 |
7 | 最後に、元のメッセージを dead.letter に送信して、現在のトランザクションへの結合を試行します。 |
Mule 4 でも、次のアプローチで同じシナリオを実装できます。
<flow name="transactedJmsFlow">
<jms:listener config-ref="JMS_Config" destination="${in}" transactionalAction="ALWAYS_BEGIN"/> (1)
<jms:publish config-ref="JMS_Config" destination="${out}" transactionalAction="ALWAYS_JOIN"> (2)
<jms:message>
<jms:body>#[output application/xml --- payload</jms:body>
</jms:message>
</jms:publish>
<error-handler>
<on-error-continue type="ANY"> (3)
<jms:publish config-ref="JMS_Config" destination="dead.letter" transactionalAction="JOIN_IF_POSSIBLE"/> (4)
</on-error-continue>
</error-handler>
</flow>
1 | ALWAYS_BEGIN を指定した listener からトランザクションが開始されます。 |
2 | publish 操作によってペイロードが XML 形式でパブリッシュされますが、現在のペイロードを変更したり、ALWAYS_JOIN を指定したトランザクションに結合することはありません。 |
3 | 生じたエラーをキャッチするエラーハンドラーを使用して、メッセージが失われていないことを確認します。 |
4 | 現在のペイロードは受信した元のメッセージのままのため、JOIN_IF_POSSIBLE トランザクションアクションを使用してそのまま dead.letter にパブリッシュします。 |