Contact Free trial Login

WebSockets Examples - Mule 4

You can use the WebSockets connector to build an integration system that obtains and broadcasts stock quotes.

This example consists of three apps:

App Implementation

Quote Producer App

This app generaates continuous random stock quotes like the following:

{
   "ticker": "CRM",
   "price": 157.6,
   "cur": "USD",
  "timestamp": 1563374475104
}

In this example, this is called by the Quote type.

This app generates approximately 50 quotes, using at least 5 different tickers (CRM, MELI, GOOG, NFLX, AAPL). The prices are random, so there’s no need to get the real ones for this example.

This app also exposes the wss://localhost:8080/feed endpoint. This TLS secure endpoint accepts one and only one connection. The first client to connect needs to do so successfully. Subsequent clients need to be sent a WSS message with the text "Sorry, spot taken" after which, the connection is dropped.

Each produced quote is sent to the connected feed client.

Quote Aggregator App

This app connects to the feed endpoint in the prior app and receives all of the quotes. It splits them by ticker and places them in time-based aggregators that flush every 5 seconds.

When the aggregators flush, they output an Array<Quote> that needs to be reduced to a single QuoteSnapshot type. QuoteSnapshot is as follows:

{
   "ticker": "CRM"
   "price": "157.54 USD"
}

Such reduction is done by picking the Quote with the greatest timestamp and transforming it per the example.

The produced snapshots broadcast to a dynamic list of subscribers. Subscriptions are done through the wss://localhost:8082/quotes endpoint that this application exposes. Clients can connect to this endpoint, using a query parameter to indicate what stock quotes to follow. For example: wss://localhost:8082/quotes?ticker=CRM&ticker=MELI

The request should establish a WebSocket that gets the snapshots for the CRM and MELI tickers. This should be done by subscribing the resulting sockets to the proper socket groups.

Quote Client App

Finally, the client app should open at least three different WebSockets to the quotes endpoint in the aggregator app. Each of those sockets should be listening for a different set of tickers.

The received QuoteSnapshots should be transformed to CSV format and appended to a file.

Quote Producer Implementation

    <tls:context name="TLS_Context">
        <tls:key-store
		path="producer-keystore.jks"
		alias="producerkey"
		keyPassword="mule123"
		password="mule123" />
    </tls:context>

    <http:listener-config name="HTTP_TLS_Listener_config">
        <http:listener-connection
		host="0.0.0.0"
		port="60000"
		tlsContext="TLS_Context"
		protocol="HTTPS"/>
    </http:listener-config>

    <http:listener-config name="HTTP_Listener_config">
        <http:listener-connection host="0.0.0.0" port="60001" />
    </http:listener-config>

    <websocket:config name="WebSockets_Config">
        <websocket:connection >
            <websocket:server-settings listenerConfig="HTTP_TLS_Listener_config" />
        </websocket:connection>
    </websocket:config>

    <os:object-store name="Object_store" persistent="false" />

 	<flow name="stock-quotes-producer-flow">
        <scheduler>
            <scheduling-strategy >
                <fixed-frequency frequency="15" timeUnit="SECONDS"/>
            </scheduling-strategy>
        </scheduler>

        <ee:transform doc:name="Transform Message">
            <ee:message >
                <ee:set-payload >
                    <![CDATA[%dw 2.0
                    var randomStockSelector = randomInt(5)
                    var randomPriceVariation = (randomInt(401) / 100) - 2 as String { format: "#.00"} as Number
                    var baseStockQuotes = [
                        {
                            "ticker": "CRM",
                            "price": 157.6,
                            "cur": "USD"
                        },
                        {
                            "ticker": "MELI",
                            "price": 646.24,
                            "cur": "USD"
                        },
                        {
                            "ticker": "GOOG",
                            "price": 1134.14,
                            "cur": "USD"
                        },
                        {
                            "ticker": "NFLX",
                            "price": 316.53,
                            "cur": "USD"
                        },
                        {
                            "ticker": "AAPL",
                            "price": 208.19,
                            "cur": "USD"
                        }
                    ]
                    var selectedStock = baseStockQuotes[randomStockSelector]
                    output application/json
                    ---
                    {
                        ticker : selectedStock.ticker,
                        price : selectedStock.price + randomPriceVariation,
                        cur : selectedStock.cur,
                        timestamp : now() as Number
                    }]]>
                </ee:set-payload>
            </ee:message>
        </ee:transform>
        <flow-ref name="send-stock-quote-flow"/>
    </flow>

    <flow name="send-stock-quote-flow">
        <os:contains
		objectStore="Object_store"
		key="webSocketConnectedId"
		target="webSocketConnected"/>
        <choice>
            <when expression="#[vars.webSocketConnected]">
                <os:retrieve
			objectStore="Object_store"
			target="webSocketConnectedId"
			key="webSocketConnectedId" />
                <websocket:send
			socketId="#[vars.webSocketConnectedId]"
			config-ref="WebSockets_Config"/>
            </when>
        </choice>
    </flow>

	<flow name="on-new-inbound-connection-flow">
		<websocket:on-inbound-connection
			doc:name="On New Inbound Connection"
			config-ref="WebSockets_Config"
			path="/feed" />
		<os:contains
			key="webSocketConnectedId"
			target="webSocketConnected"
			objectStore="Object_store"/>
		<choice>
			<when expression="#[vars.webSocketConnected]">
				<os:retrieve
					key="webSocketConnectedId"
					objectStore="Object_store"
					target="webSocketConnectedId" />
				<websocket:close-socket
					socketId="#[attributes.socketId]"
					reason="Sorry, spot taken"
					config-ref="WebSockets_Config"/>
			</when>
			<otherwise>
				<os:store key="webSocketConnectedId" objectStore="Object_store">
					<os:value ><![CDATA[#[attributes.socketId]]]></os:value>
				</os:store>
			</otherwise>
		</choice>
	</flow>

	<flow name="on-new-inbound-message-flow">
		<websocket:inbound-listener config-ref="WebSockets_Config" path="/feed"/>
		<logger
			level="INFO"
			doc:name="Logger"
			message="#[output application/json
		---
		{
			info: 'New message received from [' ++ attributes.socketId ++ ']',
			payload : payload
		}]"/>
	</flow>

	<flow name="on-socket-closed-flow">
		<websocket:on-socket-closed config-ref="WebSockets_Config" path="/feed"/>
		<os:retrieve
			key="webSocketConnectedId"
			objectStore="Object_store"
			target="webSocketConnectedId" />
		<choice>
			<when expression="#[attributes.socketId == vars.webSocketConnectedId]">
				<os:remove key="webSocketConnectedId" objectStore="Object_store"/>
			</when>
		</choice>
	</flow>

	<flow name="close-websocket-flow">
		<os:contains
			key="webSocketConnectedId"
			target="webSocketConnected"
			objectStore="Object_store"/>
		<choice>
			<when expression="#[vars.webSocketConnected]">
				<os:retrieve
					doc:name="Retrieve"
					key="webSocketConnectedId"
					objectStore="Object_store"
					target="webSocketConnectedId" />
				<websocket:close-socket
					config-ref="WebSockets_Config"
					socketId="#[vars.webSocketConnectedId]"
					reason="Producer app wanted to close the websocket"/>
			</when>
		</choice>
	</flow>

Quote Aggregator Implementation

	<tls:context name="TLS_Context">
        <tls:trust-store path="aggregator-truststore.jks" password="mule123" />
    </tls:context>

    <http:listener-config name="HTTP_API_Listener_config">
        <http:listener-connection host="0.0.0.0" port="61000" />
    </http:listener-config>

    <websocket:config name="WebSockets_Client_Config">
        <websocket:connection >
            <websocket:client-settings
	    	host="0.0.0.0"
		port="60000"
		protocol="WSS"
		tlsContext="TLS_Context"/>
        </websocket:connection>
    </websocket:config>

    <os:object-store name="Object_store" persistent="false" />

    <http:listener-config name="HTTP_WebSockets_Listener_Config">
        <http:listener-connection host="0.0.0.0" port="61001" protocol="HTTPS">
            <tls:context >
                <tls:key-store
			path="broadcast-keystore.jks"
			alias="broadcast"
			keyPassword="mule123"
			password="mule123" />
            </tls:context>
        </http:listener-connection>
    </http:listener-config>

    <websocket:config name="WebSockets_Server_Config">
        <websocket:connection >
            <websocket:server-settings
	    	listenerConfig="HTTP_WebSockets_Listener_Config"
		idleSocketTimeout="30"/>
        </websocket:connection>
    </websocket:config>

    <flow name="client-app-websocket-inbound-listener-flow">
        <websocket:inbound-listener
		doc:name="On New Inbound Message"
		config-ref="WebSockets_Server_Config"
		path="/quotes"/>
        <logger level="INFO" doc:name="Logger" message="#[output application/json
            ---
            {
                info: 'New message received from [' ++ attributes.socketId ++ ']',
                payload: payload
            }]" />
    </flow>

	<flow name="broadcasting-stock-quote-snapshot">
        <logger level="INFO" doc:name="Logger" message="#[output application/json
            ---
            {
                info: 'Broadcasting stock quote reduction',
                payload: payload
            }]"/>

        <websocket:broadcast
		config-ref="WebSockets_Server_Config"
		path="/quotes"
		socketType="INBOUND"
		groups="#[[payload.ticker]]" />
    </flow>

	<flow name="producer-app-websocket-open-flow">
        <websocket:open-outbound-socket config-ref="WebSockets_Client_Config" path="/feed"/>
        <logger
		level="INFO"
		message="#['New connection established with [' ++ attributes.socketId ++ ']']" />
        <os:contains key="socketId" objectStore="Object_store" target="webSocketConnected" />
        <choice>
            <when expression="#[vars.webSocketConnected]">
                <logger
			level="INFO"
			message="#['The websocket [' ++ attributes.socketId ++ '] is not saved as there is already one connected']"/>
            </when>
            <otherwise >
                <logger
			level="INFO"
			message="#['Websocket with ID [' ++ attributes.socketId ++ '] is saved for later reference']"/>
                <os:store key="socketId" objectStore="Object_store">
                    <os:value><![CDATA[#[attributes.socketId]]]></os:value>
                </os:store>
            </otherwise>
        </choice>
    </flow>

    <flow name="producer-app-websocket-outbound-listener-flow">
        <websocket:outbound-listener
		config-ref="WebSockets_Client_Config"
		path="/feed"
		outputMimeType="application/json"/>
        <logger level="INFO" message="#[output application/json
            ---
            {
                info: 'New message received from [' ++ attributes.socketId ++ ']',
                payload: payload
            }]" />
    </flow>

    <flow name="producer-app-websocket-close-flow">
        <os:contains key="socketId" objectStore="Object_store" target="webSocketConnected"/>
        <choice>
            <when expression="#[vars.webSocketConnected]">
                <os:retrieve key="socketId" objectStore="Object_store" target="socketId"/>
                <logger
			level="INFO"
			message="#['Trying to close websocket [' ++ vars.socketId as String ++ ']']"/>
                <websocket:close-socket
			config-ref="WebSockets_Client_Config"
			socketId="#[vars.socketId]"
			reason="Client wants to close the websocket"/>
            </when>
            <otherwise >
                <logger level="INFO" message="There is no WebSocket to close"/>
            </otherwise>
        </choice>
    </flow>

    <flow name="producer-app-websocket-on-socket-closed-flow">
        <websocket:on-socket-closed config-ref="WebSockets_Client_Config" path="/feed"/>
        <logger
		level="INFO"
		message="#['Websocket [' ++ attributes.socketId ++ '] was closed']"/>
        <os:retrieve key="socketId" objectStore="Object_store" target="socketId" />
        <choice>
            <when expression="#[attributes.socketId == vars.socketId]">
                <logger
			level="INFO"
			message="#['Removing the websocket ID stored for referencing it']" />
                <os:remove key="socketId" objectStore="Object_store" />
            </when>
            <otherwise >
                <logger
			level="INFO"
			message="#['Disconnected WebSocket is not the main one [' ++ vars.socketId as String ++ ']']" />
            </otherwise>
        </choice>
    </flow>

Quote Client Implementation

    <http:listener-config name="HTTP_Listener_config">
        <http:listener-connection host="0.0.0.0" port="62000" />
    </http:listener-config>

    <websocket:config name="WebSockets_Client_App_Config">
        <websocket:connection>
            <websocket:client-settings host="0.0.0.0" port="61001" protocol="WSS">
                <tls:context >
                    <tls:trust-store path="client-truststore.jks" password="mule123" />
                </tls:context>
            </websocket:client-settings>
        </websocket:connection>
    </websocket:config>

    <flow name="aggregator-app-websocket-open-flow">
        <http:listener doc:name="Listener"
		config-ref="HTTP_Listener_config"
		path="open-aggregator-ws"/>
        <set-variable value="#[attributes.queryParams.*ticker]" variableName="groups"/>
        <websocket:open-outbound-socket
		config-ref="WebSockets_Client_App_Config"
		path="/quotes"
		defaultGroups="#[vars.groups]" />
    </flow>

    <flow name="stock-quotes-clientFlow">
        <websocket:outbound-listener
		config-ref="WebSockets_Client_App_Config"
		path="/quotes"
		outputMimeType="application/json"/>
        <logger level="INFO" doc:name="Logger" message="#[output application/json
            ---
            {
                info: 'New message received on [' ++ attributes.socketId ++ ']',
                payload: payload
            }]"/>
    </flow>