Contact Us 1-800-596-4880

WebSockets Connector Examples - Mule 4

The following Mule app examples use Anypoint Connector for WebSockets (WebSockets Connector) to build an integration system that obtains and broadcasts stock quotes.

Quote Producer App

The following Mule app generates continuous random stock quotes.

Quote Producer App flow in Anypoint Studio Canvas
  1. Stock quotes example:

{
   "ticker": "CRM",
   "price": 157.6,
   "cur": "USD",
  "timestamp": 1563374475104
}
  • The Quote type identifies the previously mentioned stock quote sample.

  • The Mule app generates approximately 50 quotes with random stock prices that use at least 5 different tickers, such as CRM, MELI, GOOG, NFLX, and AAPL.

  • The app also exposes the TLS secure wss://localhost:60000/feed endpoint that accepts only one connection.

  • The first client must connect to the endpoint successfully.

  • Subsequent clients that want to connect to the endpoint receive a WSS message with the text "Sorry, spot taken".

  • Each produced quote is sent to the connected feed client.

To generate the corresponding keystores:

  1. Open your terminal.

  2. In the terminal, navigate to the src/main/resources folder of your project app in Studio.

  3. Run the following command:

    keytool -genkey -v -keystore producer-keystore.jks -alias producerkey -keyalg RSA -keysize 2048 -validity 10000

XML for the Quote Producer App

Paste this code into your Studio XML editor to quickly load the flow for this example into your Mule app:

<?xml version="1.0" encoding="UTF-8"?>

<mule xmlns:tls="http://www.mulesoft.org/schema/mule/tls"
	xmlns:http="http://www.mulesoft.org/schema/mule/http"
	xmlns:ee="http://www.mulesoft.org/schema/mule/ee/core"
	xmlns:websocket="http://www.mulesoft.org/schema/mule/websocket" xmlns="http://www.mulesoft.org/schema/mule/core"
	xmlns:doc="http://www.mulesoft.org/schema/mule/documentation"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xmlns:os="http://www.mulesoft.org/schema/mule/os" xsi:schemaLocation="
http://www.mulesoft.org/schema/mule/tls http://www.mulesoft.org/schema/mule/tls/current/mule-tls.xsd
http://www.mulesoft.org/schema/mule/http http://www.mulesoft.org/schema/mule/http/current/mule-http.xsd
http://www.mulesoft.org/schema/mule/ee/core http://www.mulesoft.org/schema/mule/ee/core/current/mule-ee.xsd http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd
http://www.mulesoft.org/schema/mule/websocket http://www.mulesoft.org/schema/mule/websocket/current/mule-websocket.xsd
http://www.mulesoft.org/schema/mule/os http://www.mulesoft.org/schema/mule/os/current/mule-os.xsd">


<!-- Global configuration shared by the Quote Producer flows. -->

    <!-- TLS context that exposes the key stored under alias producerkey in producer-keystore.jks. -->
    <tls:context name="TLS_Context">
        <tls:key-store path="producer-keystore.jks" alias="producerkey" password="mule123" keyPassword="mule123"/>
    </tls:context>

    <!-- HTTPS listener on port 60000, secured with TLS_Context; WebSockets_Config is served from it. -->
    <http:listener-config name="HTTP_TLS_Listener_config">
        <http:listener-connection protocol="HTTPS" host="0.0.0.0" port="60000" tlsContext="TLS_Context"/>
    </http:listener-config>

    <!-- Plain HTTP listener on port 60001. NOTE(review): no flow in this file references it. -->
    <http:listener-config name="HTTP_Listener_config">
        <http:listener-connection port="60001" host="0.0.0.0"/>
    </http:listener-config>

    <!-- Server-side WebSockets configuration bound to the HTTPS listener above. -->
    <websocket:config name="WebSockets_Config">
        <websocket:connection>
            <websocket:server-settings listenerConfig="HTTP_TLS_Listener_config"/>
        </websocket:connection>
    </websocket:config>

    <!-- Non-persistent store; the flows keep the connected socket ID in it under webSocketConnectedId. -->
    <os:object-store persistent="false" name="Object_store"/>

 	<!--
        Builds one random stock quote each time the scheduler fires (every 15 seconds)
        and hands it to send-stock-quote-flow for delivery.
    -->
    <flow name="stock-quotes-producer-flow">
        <scheduler>
            <scheduling-strategy>
                <fixed-frequency frequency="15" timeUnit="SECONDS"/>
            </scheduling-strategy>
        </scheduler>

        <ee:transform doc:name="Transform Message">
            <ee:message>
                <ee:set-payload>
                    <![CDATA[%dw 2.0
                    output application/json
                    // Index into baseStockQuotes; randomInt(5) covers the five entries below.
                    var randomStockSelector = randomInt(5)
                    // Price delta between -2.00 and +2.00 in steps of 0.01.
                    // The former "as String {format} as Number" coercion only applied to the
                    // literal 2 (coercion binds tighter than subtraction), so it was a no-op.
                    var randomPriceVariation = (randomInt(401) / 100) - 2
                    var baseStockQuotes = [
                        { "ticker": "CRM",  "price": 157.6,   "cur": "USD" },
                        { "ticker": "MELI", "price": 646.24,  "cur": "USD" },
                        { "ticker": "GOOG", "price": 1134.14, "cur": "USD" },
                        { "ticker": "NFLX", "price": 316.53,  "cur": "USD" },
                        { "ticker": "AAPL", "price": 208.19,  "cur": "USD" }
                    ]
                    var selectedStock = baseStockQuotes[randomStockSelector]
                    ---
                    {
                        ticker : selectedStock.ticker,
                        price : selectedStock.price + randomPriceVariation,
                        cur : selectedStock.cur,
                        // Epoch milliseconds, matching the documented Quote sample
                        // (a plain "as Number" yields epoch seconds).
                        timestamp : now() as Number {unit: "milliseconds"}
                    }]]>
                </ee:set-payload>
            </ee:message>
        </ee:transform>
        <flow-ref name="send-stock-quote-flow"/>
    </flow>

    <!--
        Delivers the quote to the feed client whose socket ID is stored in Object_store.
        Nothing is sent when no ID is stored.
    -->
    <flow name="send-stock-quote-flow">
        <!-- Sets vars.webSocketConnected according to whether a socket ID is currently stored. -->
        <os:contains target="webSocketConnected" key="webSocketConnectedId" objectStore="Object_store"/>
        <choice>
            <when expression="#[vars.webSocketConnected]">
                <logger message="Sending" level="INFO"/>
                <os:retrieve key="webSocketConnectedId" target="webSocketConnectedId" objectStore="Object_store"/>
                <!-- No explicit content is set; presumably the current payload is what gets sent. -->
                <websocket:send config-ref="WebSockets_Config" socketId="#[vars.webSocketConnectedId]"/>
            </when>
        </choice>
    </flow>

	<!--
		Runs when a client opens a socket on /feed. Only one feed client is kept:
		the first socket ID is stored under webSocketConnectedId, and any socket that
		arrives while an ID is stored is closed with the reason "Sorry, spot taken".
	-->
	<flow name="on-new-inbound-connection-flow">
		<websocket:on-inbound-connection
			doc:name="On New Inbound Connection"
			config-ref="WebSockets_Config"
			path="/feed" />
		<os:contains
			key="webSocketConnectedId"
			target="webSocketConnected"
			objectStore="Object_store"/>
		<choice>
			<when expression="#[vars.webSocketConnected]">
				<!-- Reject the newcomer. The stored ID is not read in this branch, so it is no longer retrieved. -->
				<websocket:close-socket
					socketId="#[attributes.socketId]"
					reason="Sorry, spot taken"
					config-ref="WebSockets_Config"/>
			</when>
			<otherwise>
				<!-- First client: remember its socket ID so quotes can be sent to it. -->
				<os:store key="webSocketConnectedId" objectStore="Object_store">
					<os:value ><![CDATA[#[attributes.socketId]]]></os:value>
				</os:store>
			</otherwise>
		</choice>
	</flow>

	<!-- Logs, as JSON, every message a client sends on /feed together with the sending socket ID. -->
	<flow name="on-new-inbound-message-flow">
		<websocket:inbound-listener path="/feed" config-ref="WebSockets_Config"/>
		<logger
			doc:name="Logger"
			level="INFO"
			message="#[output application/json
		---
		{
			info: 'New message received from [' ++ attributes.socketId ++ ']',
			payload : payload
		}]"/>
	</flow>

	<!--
		Runs when a socket on /feed is closed. If the closed socket is the one whose ID is
		stored under webSocketConnectedId, the entry is removed so that another client can
		take the spot.
	-->
	<flow name="on-socket-closed-flow">
		<websocket:on-socket-closed config-ref="WebSockets_Config" path="/feed"/>
		<!-- Check for the key first, as the other flows do: retrieving a missing key raises an error. -->
		<os:contains
			key="webSocketConnectedId"
			target="webSocketConnected"
			objectStore="Object_store"/>
		<choice>
			<when expression="#[vars.webSocketConnected]">
				<os:retrieve
					key="webSocketConnectedId"
					objectStore="Object_store"
					target="webSocketConnectedId" />
				<choice>
					<when expression="#[attributes.socketId == vars.webSocketConnectedId]">
						<os:remove key="webSocketConnectedId" objectStore="Object_store"/>
					</when>
				</choice>
			</when>
		</choice>
	</flow>

	<!--
		Closes the stored feed socket, if any, with the reason
		"Producer app wanted to close the websocket".
		NOTE(review): this flow has no event source and nothing in this file calls it.
	-->
	<flow name="close-websocket-flow">
		<os:contains objectStore="Object_store" key="webSocketConnectedId" target="webSocketConnected"/>
		<choice>
			<when expression="#[vars.webSocketConnected]">
				<os:retrieve
					doc:name="Retrieve"
					objectStore="Object_store"
					key="webSocketConnectedId"
					target="webSocketConnectedId"/>
				<websocket:close-socket
					socketId="#[vars.webSocketConnectedId]"
					reason="Producer app wanted to close the websocket"
					config-ref="WebSockets_Config"/>
			</when>
		</choice>
	</flow>

</mule>

Quote Aggregator App

The following Mule app connects to the feed endpoint of the previous Quote Producer app and receives all of the quotes. Then, the Quote Aggregator app splits the quotes by ticker and places them in time-based aggregators that occur every 5 seconds.

Quote Aggregator App flow in Anypoint Studio Canvas

When the aggregations occur, the aggregators reduce each Array<Quote> to a single QuoteSnapshot type.

QuoteSnapshot Type example:
{
   "ticker": "CRM",
   "price": "157.54 USD"
}
  • The reduction occurs by picking the Quote with the greatest timestamp and transforming it.

  • The produced snapshots broadcast to a dynamic list of subscribers.

  • Subscriptions are done through the wss://localhost:61001/quotes endpoint that the Quote Aggregator App exposes.

  • Clients can connect to the previous endpoint by using a query parameter to indicate what stock quotes to follow, for example: wss://localhost:61001/quotes?ticker=CRM&ticker=MELI

To generate the corresponding keystores and truststores:

  1. Open your terminal.

  2. In the terminal, navigate to the src/main/resources folder of your project app in Studio.

  3. Run the following command for keystores:

    keytool -genkey -v -keystore broadcast-keystore.jks -alias broadcast -keyalg RSA -keysize 2048 -validity 10000

  4. Run the following command to generate the truststores:

    keytool -genkey -v -keystore aggregator-truststore.jks -alias broadcast -keyalg RSA -keysize 2048 -validity 10000

The request establishes a WebSocket that gets the snapshots for the CRM and MELI tickers. This is done by subscribing the resulting sockets to the proper socket groups.

To connect the Quote Aggregator app with the Quote Producer app, trigger the flow to open the outbound socket with the following command:

    curl http://localhost:8081/connect

XML for the Quote Aggregator App

Paste this code into your Studio XML editor to quickly load the flow for this example into your Mule app:

<?xml version="1.0" encoding="UTF-8"?>

<mule xmlns:tls="http://www.mulesoft.org/schema/mule/tls"
	xmlns:http="http://www.mulesoft.org/schema/mule/http"
	xmlns="http://www.mulesoft.org/schema/mule/core" xmlns:doc="http://www.mulesoft.org/schema/mule/documentation"
	xmlns:websocket="http://www.mulesoft.org/schema/mule/websocket"
	xmlns:os="http://www.mulesoft.org/schema/mule/os"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="
http://www.mulesoft.org/schema/mule/tls http://www.mulesoft.org/schema/mule/tls/current/mule-tls.xsd
http://www.mulesoft.org/schema/mule/http http://www.mulesoft.org/schema/mule/http/current/mule-http.xsd http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd
http://www.mulesoft.org/schema/mule/os http://www.mulesoft.org/schema/mule/os/current/mule-os.xsd
http://www.mulesoft.org/schema/mule/websocket http://www.mulesoft.org/schema/mule/websocket/current/mule-websocket.xsd">


	<!-- Client-side configuration: how the Quote Aggregator app reaches the Quote Producer feed. -->

	<!--
		Trust store for the outbound WSS connection.
		NOTE(review): per the Mule TLS documentation, insecure="true" turns off certificate
		validation. That suits a local demo with self-signed certificates; confirm before reusing
		this configuration elsewhere.
	-->
	<tls:context name="TLS_Context">
		<tls:trust-store path="aggregator-truststore.jks" password="mule123" insecure="true"/>
	</tls:context>

	<!-- Plain HTTP listener on port 61000. NOTE(review): no flow in this file references it. -->
	<http:listener-config name="HTTP_API_Listener_config">
		<http:listener-connection host="0.0.0.0" port="61000" />
	</http:listener-config>

	<!--
		Outbound WebSocket client for the producer feed on port 60000.
		The host is "localhost" rather than "0.0.0.0": 0.0.0.0 is a bind wildcard for listeners,
		not a destination address, and the producer endpoint is documented as
		wss://localhost:60000/feed.
	-->
	<websocket:config name="WebSockets_Client_Config">
		<websocket:connection >
			<websocket:client-settings
				host="localhost"
				port="60000"
				protocol="WSS"
				tlsContext="TLS_Context"/>
		</websocket:connection>
	</websocket:config>

	<!-- Non-persistent store; the flows keep the outbound socket ID in it under the key socketId. -->
	<os:object-store name="Object_store" persistent="false" />

	<!-- Server-side configuration: how subscribers reach the Quote Aggregator app. -->

	<!-- TLS context that exposes the key stored under alias broadcast in broadcast-keystore.jks. -->
	<tls:context name="TLS_Context_Broadcast">
		<tls:key-store path="broadcast-keystore.jks" alias="broadcast" password="mule123" keyPassword="mule123"/>
	</tls:context>

	<!-- HTTPS listener on port 61001, secured with TLS_Context_Broadcast. -->
	<http:listener-config name="HTTP_WebSockets_Listener_Config">
		<http:listener-connection protocol="HTTPS" host="0.0.0.0" port="61001" tlsContext="TLS_Context_Broadcast"/>
	</http:listener-config>

	<!-- WebSocket server settings bound to the HTTPS listener above; the /quotes flows use this config. -->
	<websocket:config name="WebSockets_Server_Config">
		<websocket:connection>
			<websocket:server-settings listenerConfig="HTTP_WebSockets_Listener_Config"/>
		</websocket:connection>
	</websocket:config>

	<!-- Plain HTTP listener on port 8081; producer-app-websocket-open-flow listens on it at /connect. -->
	<http:listener-config name="HTTP_Listener_config">
		<http:listener-connection port="8081" host="0.0.0.0"/>
	</http:listener-config>

	<!--
		Logs the payload it receives, as JSON, under the label 'Broadcasting stock quote reduction'.
		NOTE(review): this flow has no event source and nothing in this file calls it.
	-->
	<flow name="broadcasting-stock-quote-snapshot">
		<logger
			doc:name="Logger"
			level="INFO"
			message="#[output application/json
            ---
            {
                info: 'Broadcasting stock quote reduction',
                payload: payload
            }]"/>
	</flow>
	<!--
		Triggered by an HTTP request to /connect: opens an outbound socket to the producer's
		/feed path and stores the new socket ID under the key socketId, unless one is already stored.
	-->
	<flow name="producer-app-websocket-open-flow">
		<http:listener path="/connect" config-ref="HTTP_Listener_config"/>
		<websocket:open-outbound-socket path="/feed" config-ref="WebSockets_Client_Config"/>
		<logger
			message="#['New connection established with [' ++ attributes.socketId ++ ']']"
			level="INFO"/>
		<os:contains objectStore="Object_store" key="socketId" target="webSocketConnected"/>
		<choice>
			<when expression="#[vars.webSocketConnected]">
				<!-- An ID is already stored, so this one is only logged. -->
				<logger
					message="#['The websocket [' ++ attributes.socketId ++ '] is not saved as there is already one connected']"
					level="INFO"/>
			</when>
			<otherwise>
				<logger
					message="#['Websocket with ID [' ++ attributes.socketId ++ '] is saved for later reference']"
					level="INFO"/>
				<os:store objectStore="Object_store" key="socketId">
					<os:value><![CDATA[#[attributes.socketId]]]></os:value>
				</os:store>
			</otherwise>
		</choice>
	</flow>

    <!--
        Runs when a subscriber opens a socket on /quotes and subscribes that socket to the
        single group named by the "groups" header of the connection request.
    -->
    <flow name="webs-quote-aggFlow">
        <websocket:on-inbound-connection path="/quotes" config-ref="WebSockets_Server_Config"/>
        <websocket:subscribe-groups
            config-ref="WebSockets_Server_Config"
            socketId="#[attributes.socketId]"
            groups="#[[attributes.headers.groups]]"/>
    </flow>
	<!-- Logs, as JSON, every message a subscriber sends on /quotes together with the sending socket ID. -->
	<flow name="client-app-websocket-inbound-listener-flow">
		<websocket:inbound-listener
			doc:name="On New Inbound Message"
			path="/quotes"
			config-ref="WebSockets_Server_Config"/>
		<logger
			doc:name="Logger"
			level="INFO"
			message="#[output application/json
            ---
            {
                info: 'New message received from [' ++ attributes.socketId ++ ']',
                payload: payload
            }]" />
	</flow>
	<!--
		Receives each message arriving from the producer on the outbound /feed socket, logs it,
		and broadcasts it on /quotes to the group named after the quote's ticker.
	-->
	<flow name="producer-app-websocket-outbound-listener-flow">
		<websocket:outbound-listener
			outputMimeType="application/json"
			path="/feed"
			config-ref="WebSockets_Client_Config"/>
		<logger
			level="INFO"
			message="#[output application/json
            ---
            {
                info: 'New message received from [' ++ attributes.socketId ++ ']',
                payload: payload
            }]" />
		<!-- Logs the one-element group list that the broadcast below targets. -->
		<logger message="#[[payload.ticker]]" level="INFO"/>
		<websocket:broadcast
			groups="#[[payload.ticker]]"
			path="/quotes"
			config-ref="WebSockets_Server_Config"/>
	</flow>

    <!--
        Closes the outbound producer socket whose ID is stored under the key socketId,
        or logs that there is nothing to close.
        NOTE(review): this flow has no event source and nothing in this file calls it.
    -->
    <flow name="producer-app-websocket-close-flow">
        <os:contains objectStore="Object_store" key="socketId" target="webSocketConnected"/>
        <choice>
            <when expression="#[vars.webSocketConnected]">
                <os:retrieve objectStore="Object_store" key="socketId" target="socketId"/>
                <logger
                    message="#['Trying to close websocket [' ++ vars.socketId as String ++ ']']"
                    level="INFO"/>
                <websocket:close-socket
                    socketId="#[vars.socketId]"
                    reason="Client wants to close the websocket"
                    config-ref="WebSockets_Client_Config"/>
            </when>
            <otherwise>
                <logger message="There is no WebSocket to close" level="INFO"/>
            </otherwise>
        </choice>
    </flow>

    <!--
        Runs when an outbound socket on /feed is closed. If the closed socket is the one
        whose ID is stored under the key socketId, the entry is removed.
    -->
    <flow name="producer-app-websocket-on-socket-closed-flow">
        <websocket:on-socket-closed config-ref="WebSockets_Client_Config" path="/feed"/>
        <logger
            level="INFO"
            message="#['Websocket [' ++ attributes.socketId ++ '] was closed']"/>
        <!--
            Check for the key first, as producer-app-websocket-close-flow does: retrieving a
            missing key raises an error, and the remote side may close a socket before its ID
            has been stored.
        -->
        <os:contains key="socketId" objectStore="Object_store" target="webSocketConnected"/>
        <choice>
            <when expression="#[vars.webSocketConnected]">
                <os:retrieve key="socketId" objectStore="Object_store" target="socketId" />
                <choice>
                    <when expression="#[attributes.socketId == vars.socketId]">
                        <logger
                            level="INFO"
                            message="#['Removing the websocket ID stored for referencing it']" />
                        <os:remove key="socketId" objectStore="Object_store" />
                    </when>
                    <otherwise >
                        <logger
                            level="INFO"
                            message="#['Disconnected WebSocket is not the main one [' ++ vars.socketId as String ++ ']']" />
                    </otherwise>
                </choice>
            </when>
            <otherwise >
                <logger level="INFO" message="There is no stored WebSocket ID to remove"/>
            </otherwise>
        </choice>
    </flow>

    </mule>

Quote Client App

The following Mule app opens at least three different WebSockets to the quotes endpoint in the Quote Aggregator app. Each of those sockets listens to a different set of tickers.

Quote Client App flow in Anypoint Studio Canvas
Figure 1. Quote Client App flow

The received QuoteSnapshots are transformed to CSV format and appended to a file.

To generate the corresponding keystores:

  1. Open your terminal.

  2. In the terminal, navigate to the src/main/resources folder of your project app in Studio.

  3. Run the following command:

    keytool -genkey -v -keystore client-truststore.jks -alias client -keyalg RSA -keysize 2048 -validity 10000

To connect the Quote Client app with the Quote Aggregator app, trigger the flow to open the outbound socket with the following command:

    curl "http://localhost:62000/open-aggregator-ws?ticker=CRM&ticker=MELI"

XML for the Quote Client App

Paste this code into your Studio XML editor to quickly load the flow for this example into your Mule app:

<?xml version="1.0" encoding="UTF-8"?>

<mule xmlns:tls="http://www.mulesoft.org/schema/mule/tls"
xmlns:http="http://www.mulesoft.org/schema/mule/http"
xmlns="http://www.mulesoft.org/schema/mule/core" xmlns:doc="http://www.mulesoft.org/schema/mule/documentation"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:websocket="http://www.mulesoft.org/schema/mule/websocket"
xsi:schemaLocation="
http://www.mulesoft.org/schema/mule/tls http://www.mulesoft.org/schema/mule/tls/current/mule-tls.xsd
http://www.mulesoft.org/schema/mule/http http://www.mulesoft.org/schema/mule/http/current/mule-http.xsd http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd
http://www.mulesoft.org/schema/mule/websocket http://www.mulesoft.org/schema/mule/websocket/current/mule-websocket.xsd">

    <!-- Plain HTTP listener on port 62000; aggregator-app-websocket-open-flow is triggered through it. -->
  <http:listener-config name="HTTP_Listener_config">
    <http:listener-connection host="0.0.0.0" port="62000" />
  </http:listener-config>

  <!--
      Outbound WebSocket client for the Quote Aggregator endpoint on port 61001.
      The host is "localhost" rather than "0.0.0.0": 0.0.0.0 is a bind wildcard for listeners,
      not a destination address.
      NOTE(review): per the Mule TLS documentation, insecure="true" turns off certificate
      validation. That suits a local demo with self-signed certificates; confirm before reusing
      this configuration elsewhere.
  -->
  <websocket:config name="WebSockets_Client_App_Config">
    <websocket:connection>
      <websocket:client-settings host="localhost" port="61001" protocol="WSS">
        <tls:context >
          <tls:trust-store path="client-truststore.jks" password="mule123" insecure="true"/>
        </tls:context>
      </websocket:client-settings>
    </websocket:connection>
  </websocket:config>

  <!--
      Triggered by an HTTP request to /open-aggregator-ws: collects every "ticker" query
      parameter into vars.groups and opens an outbound socket to the aggregator's /quotes path.
  -->
  <flow name="aggregator-app-websocket-open-flow">
    <!-- The path now starts with "/", like the other listener paths in these examples. -->
    <http:listener doc:name="Listener"
        config-ref="HTTP_Listener_config"
        path="/open-aggregator-ws"/>
    <set-variable value="#[attributes.queryParams.*ticker]" variableName="groups"/>
    <logger level="INFO" message="#[vars.groups]"/>
    <!--
        NOTE(review): at this point attributes still come from the HTTP listener; confirm
        they expose a socketId, otherwise the socketId expression below resolves to null.
        NOTE(review): only the first ticker (vars.groups[0]) is sent in the "groups" header,
        while defaultGroups receives the whole list. Confirm this is intended.
    -->
    <websocket:open-outbound-socket config-ref="WebSockets_Client_App_Config" path="/quotes" socketId="#[attributes.socketId]" defaultGroups="#[vars.groups]">
    <websocket:headers ><![CDATA[#[output application/java
---
{
groups : vars.groups[0]
}]]]></websocket:headers>
    </websocket:open-outbound-socket>
  </flow>

  <!-- Logs, as JSON, every message received on the outbound /quotes socket together with its socket ID. -->
  <flow name="stock-quotes-clientFlow">
    <websocket:outbound-listener
        outputMimeType="application/json"
        path="/quotes"
        config-ref="WebSockets_Client_App_Config"/>
    <logger
        doc:name="Logger"
        level="INFO"
        message="#[output application/json
          ---
          {
              info: 'New message received on [' ++ attributes.socketId ++ ']',
              payload: payload
          }]"/>
  </flow>
<!--
    Subscribes the socket identified by attributes.socketId to the groups held in vars.groups.
    NOTE(review): this flow has no event source and nothing in this file calls it.
-->
<flow name="webs-clientFlow">
  <websocket:subscribe-groups
      groups="#[vars.groups]"
      socketId="#[attributes.socketId]"
      config-ref="WebSockets_Client_App_Config"/>
</flow>

</mule>
View on GitHub