Migrating to the JMS Connector

The JMS transport was completely reshaped: it moved away from the Mule 3 transport model and became an operation-based connector with a simplified UX and fully DataSense-enabled Message information. This means not only that JMS now offers the same connection and configuration experience as any other connector, but also that producing and consuming messages is simpler thanks to the structured Message information.

Also, the new version of the JMS Connector supports the much-desired JMS 2.0 spec, with new capabilities such as shared subscriptions.

Configuring The Connector

Moving from a 3.x transport configuration to a JMS Connector configuration in Mule 4 basically means using the same parameters, but declaring them in a more cohesive way. For example, parameters used for consuming Messages are declared in a 'consumer-config' group, while general authentication parameters are configured in the General group. Also, the parameters that affect the behavior of the connector (present in the config) are now separated from those that affect only how the connection is established (present at connection level):

Mule 3 Example: Configuring The Connector
<jms:connector name="JMS_Config"
     acknowledgementMode="DUPS_OK_ACKNOWLEDGE"
     clientId="myClient"
     durable="true"
     noLocal="true"
     persistentDelivery="true"
     maxRedelivery="5"
     cacheJmsSessions="true"
     eagerConsumer="false"
     specification="1.1"
     numberOfConsumers="7"
     username="myuser"
     password="mypass" />

This same configuration is achieved in Mule 4 by setting the same parameters in the context where they actually apply; for example, 'noLocal' and 'durable' affect only the 'topic-consumer':

Mule 4 Example: Configuring The Connector
<jms:config name="JMS_Config">
  <jms:active-mq-connection specification="JMS_1_1" clientId="myClient" username="myuser" password="mypass">
      <jms:factory-configuration maxRedelivery="5"/>
  </jms:active-mq-connection>
  <jms:consumer-config ackMode="DUPS_OK">
      <jms:consumer-type>
          <jms:topic-consumer noLocal="true" durable="true"/>
      </jms:consumer-type>
  </jms:consumer-config>
  <jms:producer-config persistentDelivery="true"/>
</jms:config>
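
The Mule 3 example also set 'cacheJmsSessions' and 'numberOfConsumers', which do not appear in the Mule 4 configuration above. As a rough sketch, and assuming the connector's 'caching-strategy' connection element and the 'numberOfConsumers' parameter of the listener (neither is shown in this guide, so check them against the connector reference for your version), they could be declared as follows. The flow name and the 'in' destination are hypothetical:

Mule 4 Example: Caching And Number Of Consumers (illustrative sketch)
<jms:config name="JMS_Config">
  <jms:active-mq-connection specification="JMS_1_1" clientId="myClient" username="myuser" password="mypass">
      <!-- Assumed equivalent of cacheJmsSessions="true" -->
      <jms:caching-strategy>
          <jms:default-caching/>
      </jms:caching-strategy>
      <jms:factory-configuration maxRedelivery="5"/>
  </jms:active-mq-connection>
</jms:config>

<flow name="consumersSketchFlow">
  <!-- Assumed equivalent of numberOfConsumers="7", now set per listener -->
  <jms:listener config-ref="JMS_Config" destination="in" numberOfConsumers="7"/>
  <logger level="INFO" message="#[payload]"/>
</flow>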

Connecting To A Broker

In the JMS Connector for Mule 4, the distinction between a generic JMS connection and the simplified ActiveMQ connection still exists, but with a much more focused set of parameters that are associated only with the connection. We’ll see first how to configure this simplified connection and then move on to the generic approach.

Connecting To ActiveMQ

Using the transport, the only way to configure ActiveMQ-specific parameters was to define the ConnectionFactory as a Spring bean and then pass each property:

Mule 3 Example: Connecting To ActiveMQ
<spring:bean name="connectionFactory"
             class="org.apache.activemq.ActiveMQConnectionFactory">
    <spring:property name="brokerURL"
                     value="tcp://activemqserver:61616"/>

    <spring:property name="redeliveryPolicy">
        <spring:bean class="org.apache.activemq.RedeliveryPolicy">
            <spring:property name="initialRedeliveryDelay"
                             value="20000"/>
            <spring:property name="redeliveryDelay"
                             value="20000"/>
            <spring:property name="maximumRedeliveries"
                             value="10"/>
        </spring:bean>
    </spring:property>
</spring:bean>


<jms:activemq-connector name="jmsConnector" connectionFactory-ref="connectionFactory"/>

With the new JMS Connector, these properties can be configured directly in the 'active-mq-connection':

Mule 4 Example: Connecting To ActiveMQ
<jms:config name="JMS_Config">
  <jms:active-mq-connection>
      <jms:factory-configuration brokerUrl="tcp://activemqserver:61616"
                                 initialRedeliveryDelay="20000" redeliveryDelay="20000"  maxRedelivery="10"/>
  </jms:active-mq-connection>
</jms:config>

If you need to migrate a transport configuration that uses a Connection Factory not yet supported out of the box in the JMS Connector, you can still use the 'connectionFactory' attribute in the 'active-mq-connection' to pass a reference to any Connection Factory bean.
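
As a minimal sketch of that fallback, assuming a Connection Factory bean registered under the hypothetical name 'customFactory' (how the bean is registered in Mule 4 is outside the scope of this sketch), the reference could look like this:

Mule 4 Example: Referencing A Connection Factory Bean From The ActiveMQ Connection (illustrative sketch)
<jms:config name="JMS_Config">
  <!-- 'customFactory' is a hypothetical bean name; the attribute is the one named above -->
  <jms:active-mq-connection connectionFactory="customFactory"/>
</jms:config>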

Using A Different Broker

Both Mule 3 JMS transport and Mule 4 JMS Connector allow you to define a generic connection to any Connection Factory that you need.

There are two ways of doing this. The first one is creating a Connection Factory using Spring beans and then referencing it as the Connection Factory:

Mule 3 Example: Generic Connection Using Spring Beans
<spring:bean name="connectionFactory" class="com.foo.CustomConnectionFactory"/>

<jms:connector name="jmsConnector" connectionFactory-ref="connectionFactory" />

Mule 4’s version is almost identical:

Mule 4 Example: Generic Connection Using Spring Beans
<spring:bean name="connectionFactory" class="com.foo.CustomConnectionFactory"/>

<jms:config name="JMS_Config">
  <jms:generic-connection connectionFactory="connectionFactory"/>
</jms:config>

Another way of creating a generic connection is using a Connection Factory that is discovered using JNDI. In this case, the functionality remains the same, but the syntax changes from transport to connector:

Mule 3 Example: Generic Connection Using JNDI
<beans>
    <util:properties id="providerProperties">
        <prop key="queue.jndi-queue-in">in</prop>
        <prop key="topic.jndi-topic-in">in</prop>
    </util:properties>
</beans>


<jms:connector name="jmsConnector"
    jndiInitialFactory="com.sun.jndi.ldap.LdapCtxFactory"
    jndiProviderUrl="ldap://localhost:10389/"
    jndiProviderProperties-ref="providerProperties"
    connectionFactoryJndiName="cn=ConnectionFactory,dc=example,dc=com"
    jndiDestinations="true"
    forceJndiDestinations="false"/>

In Mule 4’s version, you can do this by configuring the JNDI inline:

Mule 4 Example: Generic Connection Using JNDI
<jms:config name="JMS_Config">
    <jms:generic-connection>
        <jms:connection-factory>
            <jms:jndi-connection-factory connectionFactoryJndiName="cn=ConnectionFactory,dc=example,dc=com"
                                         lookupDestination="TRY_ALWAYS">
                <jms:name-resolver-builder
                        jndiInitialContextFactory="com.sun.jndi.ldap.LdapCtxFactory"
                        jndiProviderUrl="ldap://localhost:10389/">
                  <jms:provider-properties>
                      <jms:provider-property key="queue.jndi-queue-in" value="in"/>
                      <jms:provider-property key="topic.jndi-topic-in" value="in"/>
                  </jms:provider-properties>
                </jms:name-resolver-builder>
            </jms:jndi-connection-factory>
        </jms:connection-factory>
    </jms:generic-connection>
</jms:config>

Three main differences arise from this example:

  • Properties are now declared inline, so there is no need for Spring bean utils.

  • Enforcing the lookup of destinations using JNDI is now configured as a single parameter named 'lookupDestination', which unifies the previous two parameters 'jndiDestinations' and 'forceJndiDestinations'.

  • Parameters are now present in the context for which they are relevant, like the 'jndiProviderUrl' being part of the 'name-resolver-builder'.
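
To make the second point concrete, the sketch below forces every destination to be resolved through JNDI, which appears to correspond to the old combination of jndiDestinations="true" with forceJndiDestinations="true". It assumes that 'lookupDestination' accepts the values NEVER, ALWAYS and TRY_ALWAYS; only TRY_ALWAYS appears in this guide, so verify the other two in the connector reference:

Mule 4 Example: Forcing JNDI Destination Lookup (illustrative sketch)
<jms:config name="JMS_Config">
    <jms:generic-connection>
        <jms:connection-factory>
            <!-- ALWAYS is an assumed value: fail if a destination cannot be found through JNDI -->
            <jms:jndi-connection-factory connectionFactoryJndiName="cn=ConnectionFactory,dc=example,dc=com"
                                         lookupDestination="ALWAYS">
                <jms:name-resolver-builder
                        jndiInitialContextFactory="com.sun.jndi.ldap.LdapCtxFactory"
                        jndiProviderUrl="ldap://localhost:10389/"/>
            </jms:jndi-connection-factory>
        </jms:connection-factory>
    </jms:generic-connection>
</jms:config>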

Sending Messages

The JMS transport relied on the payload to contain the body of a JMS Message, and used Mule’s outbound properties to customize the JMS Properties and Headers. With the new Mule 4 approach, the JMS 'publish' operation relies only on its input parameters to completely build the JMS Message to be published.

For example, if we wanted to send a high priority JMS Message with only a part of the payload in the body, and associate that Message to a group, we’ll need to:

1 Use transform to set the payload to what the Message body is expected to be.
2 Convert the resulting stream into a String to send it as a text message.
3 Set an outbound message property with priority as key to set the JMSPriority.
4 Set an outbound message property with JMSXGroupID as key to set the JMSXGroupID.

Mule 3 Example: Sending a prioritized Message as part of a Group
<flow name="JmsTransportOutbound">
    <http:listener config-ref="HTTP_Listener_Configuration" path="/orders"/>
    <dw:transform-message> (1)
        <dw:set-payload><![CDATA[%dw 1.0
%output application/json
---
{
order_id: payload.id,
supplier: payload.warehouse
}]]></dw:set-payload>
    </dw:transform-message>
    <object-to-string-transformer/> (2)
    <jms:outbound-endpoint queue="storage" connector-ref="Active_MQ">
      <message-properties-transformer scope="outbound">
          <add-message-property key="JMSXGroupID" value="#[message.inboundProperties.'http.query.params'.packageGroup]"/> (4)
          <add-message-property key="priority" value="9"/> (3)
      </message-properties-transformer>
    </jms:outbound-endpoint>
</flow>

The same results can be achieved in Mule 4 using the JMS Connector with the following configuration:

Mule 4 Example: Sending a prioritized Message as part of a Group
<flow name="JMSConnectorPublish">
    <http:listener config-ref="HTTP_Listener_config" path="/orders"/>
    (2)
    <jms:publish config-ref="JMS_Config" destination="storage" priority="9"> (3)
        <jms:message> (1)
            <jms:body>#[output application/json ---
            {
              order_id: payload.id,
              supplier: payload.warehouse
            }]</jms:body>
            <jms:jmsx-properties jmsxGroupID="#[attributes.queryParams.packageGroup]"/> (4)
        </jms:message>
    </jms:publish>
</flow>

Differences to be noted:

1 There’s no need for the transform component, since the body of the Message is created inline, so the payload remains unmodified.
2 The object-to-string transformer was also removed, since the Connector handles the transformation output automatically.
3 Priority is set as a parameter of the publish operation and doesn’t rely on the user knowing the exact key.
4 Group is set as part of the Message JMSX properties and doesn’t rely on the user knowing the exact header name.

In summary, when publishing a Message in 3.x with the JMS transport, we relied on the MuleMessage payload and outbound properties to configure the creation of the JMS Message, which required a deeper knowledge of how the transport worked. In 4.x, the JMS Connector exposes every configurable element as a parameter in the scope where it belongs, thus exposing all the JMS functionality in a clearer way.
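
Custom (user) properties follow the same idea. In 3.x they were just more outbound properties. The sketch below assumes the message builder also accepts a 'properties' element next to 'body' and 'jmsx-properties', which is not shown in this guide. The flow name and the 'region' property are hypothetical:

Mule 4 Example: Publishing With User Properties (illustrative sketch)
<flow name="publishWithUserPropertiesSketch">
    <http:listener config-ref="HTTP_Listener_config" path="/orders"/>
    <jms:publish config-ref="JMS_Config" destination="storage">
        <jms:message>
            <jms:body>#[output application/json --- payload]</jms:body>
            <!-- Assumed element: each key of the object becomes a JMS user property -->
            <jms:properties>#[{region: attributes.queryParams.region}]</jms:properties>
        </jms:message>
    </jms:publish>
</flow>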

Listening For New Messages

The JMS transport inbound-endpoint allows you to wait for new Messages on a given topic or queue. The output of this listener will contain the body of the message in the payload, and all the JMS headers and properties as inboundProperties.

Mule 3 Example: Listening For Messages With Filtering by Selector
<flow name="JmsTransportInbound">
  <jms:inbound-endpoint connector-ref="Active_MQ" queue="in">
    <jms:selector expression="JMSPriority=9"/>   (1)
  </jms:inbound-endpoint>
  <dw:transform-message> (2)
      <dw:set-payload><![CDATA[%dw 1.0
        %output application/json
        ---
        {
        items: payload,
        costumer: message.inboundProperties.'costumer_id',
        type: message.inboundProperties.'JMSType'
        }]]></dw:set-payload>
  </dw:transform-message>
  <object-to-string-transformer/>  (3)
  <jms:outbound-endpoint queue="v2/prime/orders" connector-ref="Active_MQ"/>  (4)
</flow>

In this case, we are listening for high priority Messages and then adapting them to the new format required by version 2 of priority orders:

1 Filter incoming messages by priority.
2 Transform the MuleMessage using the metadata contained in the inboundProperties so the payload matches the new JSON format we need for the new API.
3 Convert the transformed payload to a JSON String.
4 Publish the payload to the proxied queue.

Implementing the same in Mule 4 looks like this:

Mule 4 Example: Listening For Messages With Filtering by Selector
<flow name="JMSConnectorPublish">
  <jms:listener config-ref="JMS_Config" destination="in" selector="JMSPriority=9"/> (1)
  <jms:publish config-ref="JMS_Config" destination="v2/prime/orders"> (2)
    <jms:message>
      <jms:body>#[output application/json ---
      {
        items: payload,
        costumer: attributes.properties.userProperties.costumer_id, (3)
        type: attributes.headers.type
      }]</jms:body>
    </jms:message>
  </jms:publish>
</flow>

Now, the flow has fewer components and is not required to modify the Message payload to publish with a different format:

1 Listening with a filter is done by configuring the 'selector' in the listener.
2 Definition of the new message is done inline, so it only creates the JSON for the new Message body.
3 We use the message 'attributes' POJO instead of the 'inboundProperties', which now differentiates the 'headers' of the JMS Message from the 'properties'.
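
The split between headers and properties can be sketched with a listener that only logs metadata. 'attributes.headers.type' and 'attributes.properties.userProperties' are the paths used in the example above. The flow name is hypothetical, and the exact set of fields under 'headers' is not listed in this guide:

Mule 4 Example: Inspecting Headers And Properties (illustrative sketch)
<flow name="logJmsMetadataSketch">
  <jms:listener config-ref="JMS_Config" destination="in" selector="JMSPriority=9"/>
  <!-- Headers: standard JMS metadata such as the message type -->
  <logger level="INFO" message="#['Type: ' ++ (attributes.headers.type default 'none')]"/>
  <!-- User properties: the application-defined key/value pairs -->
  <logger level="INFO" message="#[attributes.properties.userProperties]"/>
</flow>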

Consuming Messages

Consuming Messages mid-flow from a given destination was not supported by Mule 3’s JMS transport; the way to go was to add the 'Mule Requester Module' to your application, which would then handle the mid-flow message consumption.

So, for example, if you wanted to expose your JMS Queue behind a new REST API, your application would be similar to this:

Mule 3 Example: Consuming Messages Mid-Flow
<mulerequester:config name="Mule_Requester"/>
<jms:activemq-connector name="Active_MQ" brokerURL="tcp://localhost:61616" specification="1.1"/>

<flow name="ordersFromJMS">
  <http:inbound-endpoint exchange-pattern="request-response" path="orders" host="localhost" port="8081"/>
  <mulerequester:request config-ref="Mule_Requester"
                         resource="jms://Orders?selector=shipped%3D'#[message.inboundProperties.'shipped']'"/>
  <logger level="INFO" message="CorrelationId: #[message.inboundProperties.'JMSCorrelationID']"/>
</flow>

Some things to notice here are:

  • All metadata regarding JMS Message is completely lost, so logging the CorrelationId relies on you knowing the syntax for obtaining the Header.

  • Dynamic filtering by 'selector' has to be done in the 'resource' url of the requester, so multiple arguments end up with an error prone configuration.

  • We need both the JMS and Mule Requester configurations.

Mule 4 comes out of the box with the capability of consuming messages mid-flow by using the 'consume' operation. This operation is very similar to the Listener we saw before, with the difference that it can be used anywhere in the flow:

Mule 4 Example: Consuming Messages Mid-Flow
<flow name="ordersFromJMS">
  <http:listener config-ref="HTTP_Listener_config" path="/orders"/>
  <jms:consume config-ref="JMS_Config" destination="Orders" selector="#['shipped=' ++ attributes.queryParams.shipped]"/>
  <logger level="INFO" message="#['CorrelationId: ' ++ attributes.headers.correlationId]"/>
</flow>

Now we only needed the JMS Connector: we configured the 'consume' operation with the 'selector' parameter using the metadata from the listener, and were also able to log the correlationId with metadata support in the Message attributes.
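
Because 'consume' waits for a Message to arrive, it is usually worth bounding that wait. The sketch below assumes the operation accepts 'maximumWait' and 'maximumWaitUnit' parameters and raises a 'JMS:TIMEOUT' error when nothing arrives in time. None of these names appear in this guide, so treat them as assumptions to check against the connector reference. The flow name is hypothetical:

Mule 4 Example: Consuming Messages Mid-Flow With A Timeout (illustrative sketch)
<flow name="ordersFromJMSWithTimeoutSketch">
  <http:listener config-ref="HTTP_Listener_config" path="/orders"/>
  <!-- Assumed parameters: wait at most 5 seconds for a matching Message -->
  <jms:consume config-ref="JMS_Config" destination="Orders"
               selector="#['shipped=' ++ attributes.queryParams.shipped]"
               maximumWait="5" maximumWaitUnit="SECONDS"/>
  <error-handler>
    <!-- Assumed error type: answer with an empty list instead of failing the request -->
    <on-error-continue type="JMS:TIMEOUT">
      <set-payload value="#[[]]"/>
    </on-error-continue>
  </error-handler>
</flow>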

Handling Topic Subscriptions

Topics used as inbound endpoints in 3.x allowed the user to configure if the subscription to the Topic had to be done as a durable subscription or not. There were different ways of doing so, and it had the issue of exposing the durable configuration for queues too, which made no sense.

A Topic subscription in 3.x would look like this:

Mule 3 Example: Topic Subscriptions
<jms:inbound-endpoint connector-ref="Active_MQ" topic="trackedEvents" durable="true" durableName="inboundEvents_1"/>

For Mule 4, the subscription mechanism was reviewed, leaving the option of subscriptions scoped down to Topics only, and adding more functionality thanks to the support of JMS 2.0.

The same example as before, in 4.x, would be:

Mule 4 Example: Topic Subscriptions
<jms:listener config-ref="JMS_Config" destination="trackedEvents">
    <jms:consumer-type>
        <jms:topic-consumer durable="true" subscriptionName="inboundEvents_1"/>
    </jms:consumer-type>
</jms:listener>

But in this case, the topic-consumer configuration also allows us to set a shared subscription (only if using a JMS 2.0 Connection), which allows messages from a topic subscription to be processed by multiple threads, connections or JVMs:

Mule 4 Example: Shared Topic Subscriptions
<jms:listener config-ref="JMS_Config" destination="trackedEvents">
    <jms:consumer-type>
        <jms:topic-consumer durable="true" shared="true" subscriptionName="inboundEvents_1"/>
    </jms:consumer-type>
</jms:listener>
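
Since shared subscriptions depend on a JMS 2.0 connection, the listener above needs a config whose connection declares that specification. A minimal sketch, assuming 'specification' accepts the value 'JMS_2_0' (by analogy with the 'JMS_1_1' value used earlier) and that 'jms20ConnectionFactory' is a hypothetical bean for a broker client that implements JMS 2.0:

Mule 4 Example: Declaring A JMS 2.0 Connection (illustrative sketch)
<jms:config name="JMS_Config">
  <!-- JMS_2_0 is an assumed value; 'jms20ConnectionFactory' is a hypothetical bean name -->
  <jms:generic-connection specification="JMS_2_0" connectionFactory="jms20ConnectionFactory"/>
</jms:config>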

Responding To Incoming Messages

When the listener for new JMS Messages receives a Message with the 'JMSReplyTo' header configured, then it is expected that a response is emitted to the reply destination once the processing of the Message is completed.

For Mule 3, this means configuring the transport with exchange-pattern="request-response", where the result of the flow will automatically become the payload of the response. Headers of the response Message were configured using the outbound properties, while the body of the Message was taken from the payload at the end of the Flow.

Mule 3 Example: Responding To Incoming Messages
<flow name="jmsBridge">
  <jms:inbound-endpoint queue="storage" exchange-pattern="request-response" connector-ref="PublicAMQ">
    <message-properties-transformer scope="outbound">
      <add-message-property key="timeToLive" value="2000"/>
    </message-properties-transformer>
  </jms:inbound-endpoint>
  <http:request config-ref="HTTP_Request_Configuration" path="/storage" method="POST"/>
  <set-payload value="BRIDGED"/>
</flow>

Mule 4 instead allows you to configure all the parameters associated to the response, directly inline as a part of the listener component, leaving behind the need of a transformation when reaching the end of the flow.

Mule 4 Example: Responding To Incoming Messages
<flow name="jmsBridge">
  <jms:listener config-ref="config" destination="storage">
    <jms:response timeToLive="2" timeToLiveUnit="SECONDS">
        <jms:body>#['BRIDGED']</jms:body>
    </jms:response>
  </jms:listener>
  <http:request config-ref="HTTP_Request_Configuration" path="/storage" method="POST"/>
</flow>
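
The response body does not have to be a constant. As a sketch, assuming the expression in the response 'body' is evaluated against the Message at the end of the flow (so that 'payload' is the result of the HTTP request), the listener could reply with whatever the bridged service returned:

Mule 4 Example: Responding With The Result Of The Flow (illustrative sketch)
<flow name="jmsBridge">
  <jms:listener config-ref="config" destination="storage">
    <jms:response timeToLive="2" timeToLiveUnit="SECONDS">
        <!-- Assumption: evaluated once the flow completes, so this is the HTTP response -->
        <jms:body>#[payload]</jms:body>
    </jms:response>
  </jms:listener>
  <http:request config-ref="HTTP_Request_Configuration" path="/storage" method="POST"/>
</flow>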

Doing Request-Reply

JMS allows you to use the JMSReplyTo header to perform a synchronous communication. This can be done either with a temporary destination that is created on the fly by the client, or using an already existing destination.

Request Reply With Temporary Destinations

In Mule 3, for the first case where the reply destination is a temporary queue that will be discarded once the message arrives, we have the "request-response" exchange-pattern in the outbound endpoint:

Mule 3 Example: Doing Request-Reply With Temporary Reply Destination
<flow name="jmsRequestReplyTemporaryDestination">
  <http:inbound-endpoint exchange-pattern="request-response" host="localhost" port="8080" path="invoices"/>
  <dw:transform-message>
      <dw:set-payload><![CDATA[%dw 1.0
        %output application/xml
        ---
        {
        data: payload,
        costumer: message.inboundProperties."http.query.params".costumer_id
        }]]></dw:set-payload>
  </dw:transform-message>
  <object-to-string-transformer/>
  <jms:outbound-endpoint exchange-pattern="request-response" queue="invoiceProcessor" connector-ref="Active_MQ"/>
  <logger level="INFO" message="Status: #[payload]"/>
</flow>

Instead, in Mule 4 you have a brand new operation called publish-consume which aims to solve this specific use case:

Mule 4 Example: Doing Request-Reply With Temporary Reply Destination
<flow name="jmsRequestReplyTemporaryDestination">
  <http:listener config-ref="HTTP_Listener_config" path="/invoices"/>
  <jms:publish-consume config-ref="JMS_Config" destination="invoiceProcessor">
    <jms:message>
      <jms:body>#[output application/xml ---
      {
        data: payload,
        costumer: attributes.queryParams.costumer_id
      }]</jms:body>
    </jms:message>
  </jms:publish-consume>
  <logger level="INFO" message="#['Status: ' ++ payload]"/>
</flow>

As you can see, the Message is again built inline in the operation, in the message element, and any transformation or configuration that affects the outgoing Message is done as part of that element.
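
The waiting side of the operation can be tuned as well. The sketch below assumes a 'consume-configuration' child element with 'maximumWait' and 'maximumWaitUnit' parameters, none of which are shown in this guide, to cap how long the flow waits for the reply. The flow name is hypothetical:

Mule 4 Example: Doing Request-Reply With A Bounded Wait (illustrative sketch)
<flow name="jmsRequestReplyWithTimeoutSketch">
  <http:listener config-ref="HTTP_Listener_config" path="/invoices"/>
  <jms:publish-consume config-ref="JMS_Config" destination="invoiceProcessor">
    <jms:message>
      <jms:body>#[output application/xml --- {data: payload}]</jms:body>
    </jms:message>
    <!-- Assumed element and parameters: give up if no reply arrives within 10 seconds -->
    <jms:consume-configuration maximumWait="10" maximumWaitUnit="SECONDS"/>
  </jms:publish-consume>
  <logger level="INFO" message="#['Status: ' ++ payload]"/>
</flow>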

Request Reply With Explicit Destinations

Doing a request-reply with an explicit reply-to destination was a little bit more tricky in 3.x, since a new component was required, the request-reply Scope:

Mule 3 Example: Doing Request-Reply With Explicit Reply Destination
<flow name="JMS-request-reply">
  <jms:inbound-endpoint queue="invoices" exchange-pattern="request-response" connector-ref="Active_MQ"/>
  <dw:transform-message>
      <dw:set-payload><![CDATA[%dw 1.0
        %output application/xml
        ---
        {
        data: payload,
        costumer: message.inboundProperties."http.query.params".costumer_id
        }]]></dw:set-payload>
  </dw:transform-message>
  <object-to-string-transformer/>
  <request-reply> (1)
    <jms:outbound-endpoint connector-ref="Active_MQ" exchange-pattern="one-way" queue="invoiceProcessor"/>
    <jms:inbound-endpoint connector-ref="Active_MQ" exchange-pattern="one-way" topic="processedInvoiceEvents"/>
  </request-reply>
  <logger level="INFO" message="Status: #[payload]"/>
</flow>

This scope (1) allowed you to set an inbound and an outbound transport to implement the request-reply pattern. It would inject the JMSReplyTo header automatically into the outgoing Message and then start listening on the inbound endpoint.

With Mule 4’s JMS Connector and the new publish-consume operation, almost no changes to the flow are required. If you want the reply to be sent to a specific destination, just configure the reply-to header in the Message builder directly, as you would in any other case of either a publish or a response:

Mule 4 Example: Doing Request-Reply With Explicit Reply Destination
<flow name="jmsRequestReplyExplicitDestination">
  <http:listener config-ref="HTTP_Listener_config" path="/invoices"/>
  <jms:publish-consume config-ref="JMS_Config" destination="invoiceProcessor">
    <jms:message>
      <jms:body>#[output application/xml ---
      {
        data: payload,
        costumer: attributes.queryParams.costumer_id
      }]</jms:body>
      <jms:reply-to destination="processedInvoiceEvents" destinationType="TOPIC"/> (1)
    </jms:message>
  </jms:publish-consume>
  <logger level="INFO" message="#['Status: ' ++ payload]"/>
</flow>

In this example we set the reply destination header (1) to a well-known Topic, to illustrate that a known destination may be used by others to do things like event tracking or post-processing triggers.

Using Transactions

Transaction support is configured in much the same way in 4.x as in 3.x. The expected change is that it moves from the inbound-endpoint and outbound-endpoint to the normalized Mule 4 approach for operation transactions:

Mule 3 Example: Using Transactions
<flow name="transactedJmsFlow">
    <jms:inbound-endpoint queue="${in}">
        <jms:transaction action="ALWAYS_BEGIN" /> (1)
    </jms:inbound-endpoint>
    <set-variable variableName="originalPayload" value="#[payload]"/> (2)
    <dw:transform-message> (3)
        <dw:set-payload><![CDATA[%dw 1.0
          %output application/xml
          ---
          payload
          ]]></dw:set-payload>
    </dw:transform-message>
    <object-to-string-transformer/>
    <jms:outbound-endpoint queue="${out}"> (4)
        <jms:transaction action="ALWAYS_JOIN"/>
    </jms:outbound-endpoint>
    <default-exception-strategy>
        <commit-transaction exception-pattern="*"/> (5)
        <set-payload value="#[flowVars.originalPayload]"/> (6)
        <jms:outbound-endpoint queue="dead.letter"> (7)
            <jms:transaction action="JOIN_IF_POSSIBLE"/>
        </jms:outbound-endpoint>
    </default-exception-strategy>
</flow>

Things to note are:

1 Transaction is initiated by the inbound endpoint with ALWAYS_BEGIN
2 We make sure not to lose the original payload
3 Payload is transformed so it can be sent through the outbound endpoint
4 The outbound endpoint is configured to ALWAYS_JOIN
5 We set up the exception strategy to catch all exceptions
6 Original payload is restored so the original message is published to the dead.letter
7 Finally we send the original message to the dead.letter attempting to join to the current transaction.

Same scenario can be implemented in Mule 4 with the following approach:

Mule 4 Example: Using Transactions
<flow name="transactedJmsFlow">
    <jms:listener config-ref="JMS_Config" destination="${in}" transactionalAction="ALWAYS_BEGIN"/> (1)
    <jms:publish config-ref="JMS_Config" destination="${out}" transactionalAction="ALWAYS_JOIN"> (2)
        <jms:message>
            <jms:body>#[output application/xml --- payload]</jms:body>
        </jms:message>
    </jms:publish>
    <error-handler>
        <on-error-continue type="ANY"> (3)
          <jms:publish config-ref="JMS_Config" destination="dead.letter" transactionalAction="JOIN_IF_POSSIBLE"/> (4)
        </on-error-continue>
    </error-handler>
</flow>

1 Transaction is initiated by the listener with ALWAYS_BEGIN
2 Publishing of the payload in XML format is done by the publish operation without modifying the current payload, also joining the transaction with ALWAYS_JOIN
3 An error handler that catches any error that occurs is used to make sure the Message is not lost
4 Since the current payload is still the original Message received, we just publish it to the dead.letter using the JOIN_IF_POSSIBLE transactional action
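
The choice of 'on-error-continue' matters here: in Mule 4 it lets the transaction commit once the handler finishes, which is what the dead.letter pattern needs. If you would rather roll the transaction back and leave redelivery to the broker, a sketch using 'on-error-propagate' could look like this (the flow name is hypothetical):

Mule 4 Example: Rolling Back The Transaction On Error (illustrative sketch)
<flow name="transactedJmsFlowWithRollbackSketch">
    <jms:listener config-ref="JMS_Config" destination="${in}" transactionalAction="ALWAYS_BEGIN"/>
    <jms:publish config-ref="JMS_Config" destination="${out}" transactionalAction="ALWAYS_JOIN">
        <jms:message>
            <jms:body>#[output application/xml --- payload]</jms:body>
        </jms:message>
    </jms:publish>
    <error-handler>
        <!-- Propagating the error rolls back the transaction started by the listener -->
        <on-error-propagate type="ANY">
            <logger level="ERROR" message="#['Rolling back: ' ++ (error.description default '')]"/>
        </on-error-propagate>
    </error-handler>
</flow>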