WebSockets Connector Examples - Mule 4
The following Mule app examples use Anypoint Connector for WebSockets (WebSockets Connector) to build an integration system that obtains and broadcasts stock quotes.
Quote Producer App
The following Mule app generates continuous random stock quotes.
-
Stock quotes example:
{ "ticker": "CRM", "price": 157.6, "cur": "USD", "timestamp": 1563374475104 }
-
The
Quote
type identifies the previously mentioned stock quote sample. -
The Mule app generates approximately 50 quotes with random stock prices that use at least 5 different tickers, such as CRM, MELI, GOOG, NFLX, and AAPL.
-
The app also exposes the TLS secure
wss://localhost:60000/feed
endpoint that accepts only one connection. -
The first client must connect to the endpoint successfully.
-
Subsequent clients that want to connect to the endpoint receive a WSS message with the text
"Sorry, spot taken"
. -
Each produced quote is sent to the connected feed client.
To generate the corresponding keystores:
-
Open your terminal.
-
In the terminal, navigate to the
src/main/resources
folder of your project app in Studio. -
Run the following command:
keytool -genkey -v -keystore producer-keystore.jks -alias producerkey -keyalg RSA -keysize 2048 -validity 10000
XML for the Quote Producer App
Paste this code into your Studio XML editor to quickly load the flow for this example into your Mule app:
<?xml version="1.0" encoding="UTF-8"?>
<mule xmlns:tls="http://www.mulesoft.org/schema/mule/tls"
xmlns:http="http://www.mulesoft.org/schema/mule/http"
xmlns:ee="http://www.mulesoft.org/schema/mule/ee/core"
xmlns:websocket="http://www.mulesoft.org/schema/mule/websocket" xmlns="http://www.mulesoft.org/schema/mule/core"
xmlns:doc="http://www.mulesoft.org/schema/mule/documentation"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:os="http://www.mulesoft.org/schema/mule/os" xsi:schemaLocation="
http://www.mulesoft.org/schema/mule/tls http://www.mulesoft.org/schema/mule/tls/current/mule-tls.xsd
http://www.mulesoft.org/schema/mule/http http://www.mulesoft.org/schema/mule/http/current/mule-http.xsd
http://www.mulesoft.org/schema/mule/ee/core http://www.mulesoft.org/schema/mule/ee/core/current/mule-ee.xsd http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd
http://www.mulesoft.org/schema/mule/websocket http://www.mulesoft.org/schema/mule/websocket/current/mule-websocket.xsd
http://www.mulesoft.org/schema/mule/os http://www.mulesoft.org/schema/mule/os/current/mule-os.xsd">
<!-- TLS context for the secure listener; the key pair is read from producer-keystore.jks. -->
<tls:context name="TLS_Context">
    <tls:key-store path="producer-keystore.jks"
                   alias="producerkey"
                   password="mule123"
                   keyPassword="mule123"/>
</tls:context>

<!-- HTTPS listener on port 60000; the WebSockets server settings below are bound to it. -->
<http:listener-config name="HTTP_TLS_Listener_config">
    <http:listener-connection protocol="HTTPS"
                              host="0.0.0.0"
                              port="60000"
                              tlsContext="TLS_Context"/>
</http:listener-config>

<!-- Plain HTTP listener on port 60001. -->
<http:listener-config name="HTTP_Listener_config">
    <http:listener-connection port="60001" host="0.0.0.0"/>
</http:listener-config>

<!-- WebSockets server configuration served through the HTTPS listener. -->
<websocket:config name="WebSockets_Config">
    <websocket:connection>
        <websocket:server-settings listenerConfig="HTTP_TLS_Listener_config"/>
    </websocket:connection>
</websocket:config>

<!-- Non-persistent store that holds the socket ID of the connected feed client. -->
<os:object-store persistent="false" name="Object_store"/>
<!--
  Runs every 15 seconds: picks one of five base quotes at random, shifts its price by a
  random amount, stamps it with the current time, and passes the JSON quote to
  send-stock-quote-flow.
-->
<flow name="stock-quotes-producer-flow">
    <scheduler>
        <scheduling-strategy>
            <fixed-frequency frequency="15" timeUnit="SECONDS"/>
        </scheduling-strategy>
    </scheduler>
    <ee:transform doc:name="Transform Message">
        <ee:message>
            <ee:set-payload><![CDATA[%dw 2.0
// Index into baseStockQuotes; randomInt(5) yields 0..4.
var randomStockSelector = randomInt(5)
// Price shift: randomInt(401) / 100 spans 0..4, so the difference spans -2..+2.
// NOTE(review): `as` appears to bind tighter than `-`, so the format coercion likely
// applies to the literal 2 only; confirm whether the whole difference was meant to be formatted.
var randomPriceVariation = (randomInt(401) / 100) - 2 as String { format: "#.00"} as Number
var baseStockQuotes = [
    {
        "ticker": "CRM",
        "price": 157.6,
        "cur": "USD"
    },
    {
        "ticker": "MELI",
        "price": 646.24,
        "cur": "USD"
    },
    {
        "ticker": "GOOG",
        "price": 1134.14,
        "cur": "USD"
    },
    {
        "ticker": "NFLX",
        "price": 316.53,
        "cur": "USD"
    },
    {
        "ticker": "AAPL",
        "price": 208.19,
        "cur": "USD"
    }
]
var selectedStock = baseStockQuotes[randomStockSelector]
output application/json
---
{
    ticker : selectedStock.ticker,
    price : selectedStock.price + randomPriceVariation,
    cur : selectedStock.cur,
    // Epoch milliseconds, matching the documented sample quote (1563374475104);
    // a bare `as Number` would produce epoch seconds instead.
    timestamp : now() as Number { unit: "milliseconds" }
}]]></ee:set-payload>
        </ee:message>
    </ee:transform>
    <flow-ref name="send-stock-quote-flow"/>
</flow>
<!--
  Sends a message to the registered feed client when one is stored; does nothing otherwise.
  No explicit content is set on the send operation, so it presumably transmits the current payload.
-->
<flow name="send-stock-quote-flow">
    <!-- vars.webSocketConnected records whether a socket ID is currently stored. -->
    <os:contains key="webSocketConnectedId"
                 target="webSocketConnected"
                 objectStore="Object_store"/>
    <choice>
        <when expression="#[vars.webSocketConnected]">
            <logger message="Sending" level="INFO"/>
            <!-- Load the stored socket ID into vars.webSocketConnectedId. -->
            <os:retrieve key="webSocketConnectedId"
                         objectStore="Object_store"
                         target="webSocketConnectedId"/>
            <websocket:send config-ref="WebSockets_Config"
                            socketId="#[vars.webSocketConnectedId]"/>
        </when>
    </choice>
</flow>
<!--
  Admits a single client on /feed. The first socket's ID is stored under
  webSocketConnectedId; while that entry exists, every later socket is closed
  with the reason "Sorry, spot taken".
-->
<flow name="on-new-inbound-connection-flow">
    <websocket:on-inbound-connection
        doc:name="On New Inbound Connection"
        config-ref="WebSockets_Config"
        path="/feed"/>
    <os:contains
        key="webSocketConnectedId"
        target="webSocketConnected"
        objectStore="Object_store"/>
    <choice>
        <when expression="#[vars.webSocketConnected]">
            <!--
              The spot is taken: reject the socket that just connected. The stored ID is
              not needed here, so it is no longer retrieved.
            -->
            <websocket:close-socket
                socketId="#[attributes.socketId]"
                reason="Sorry, spot taken"
                config-ref="WebSockets_Config"/>
        </when>
        <otherwise>
            <!-- First client: remember its socket ID so quotes can be sent to it. -->
            <os:store key="webSocketConnectedId" objectStore="Object_store">
                <os:value><![CDATA[#[attributes.socketId]]]></os:value>
            </os:store>
        </otherwise>
    </choice>
</flow>
<!-- Logs, as JSON, each message received on /feed together with the sending socket's ID. -->
<flow name="on-new-inbound-message-flow">
    <websocket:inbound-listener path="/feed" config-ref="WebSockets_Config"/>
    <logger doc:name="Logger"
            level="INFO"
            message="#[output application/json
            ---
            {
                info: 'New message received from [' ++ attributes.socketId ++ ']',
                payload : payload
            }]"/>
</flow>
<!--
  When a /feed socket closes, removes the stored socket ID if it belongs to the
  socket that just closed, freeing the spot for the next client.
-->
<flow name="on-socket-closed-flow">
    <websocket:on-socket-closed config-ref="WebSockets_Config" path="/feed"/>
    <!--
      A default value keeps the retrieval from failing on a missing key, for example when a
      rejected socket closes after the registered client has already been removed. The empty
      string never equals a real socket ID, so the branch below is skipped in that case.
    -->
    <os:retrieve
        key="webSocketConnectedId"
        objectStore="Object_store"
        target="webSocketConnectedId">
        <os:default-value><![CDATA[#['']]]></os:default-value>
    </os:retrieve>
    <choice>
        <when expression="#[attributes.socketId == vars.webSocketConnectedId]">
            <os:remove key="webSocketConnectedId" objectStore="Object_store"/>
        </when>
    </choice>
</flow>
<!--
  Closes the registered feed socket, if any, with the reason
  "Producer app wanted to close the websocket". The flow has no event source of its own.
-->
<flow name="close-websocket-flow">
    <os:contains objectStore="Object_store"
                 key="webSocketConnectedId"
                 target="webSocketConnected"/>
    <choice>
        <when expression="#[vars.webSocketConnected]">
            <!-- Load the stored socket ID into vars.webSocketConnectedId before closing it. -->
            <os:retrieve doc:name="Retrieve"
                         objectStore="Object_store"
                         key="webSocketConnectedId"
                         target="webSocketConnectedId"/>
            <websocket:close-socket socketId="#[vars.webSocketConnectedId]"
                                    reason="Producer app wanted to close the websocket"
                                    config-ref="WebSockets_Config"/>
        </when>
    </choice>
</flow>
</mule>
Quote Aggregator App
The following Mule app connects to the feed endpoint of the previous Quote Producer app and receives all of the quotes. Then, the Quote Aggregator app splits the quotes by ticker and places them in time-based aggregators that occur every 5 seconds.
When the aggregations occur, the aggregators output a reduced Array<Quote>
to a single QuoteSnapshot
type.
{
"ticker": "CRM",
"price": "157.54 USD"
}
-
The reduction occurs by picking the
Quote
with the greatest timestamp and transforming it. -
The produced snapshots broadcast to a dynamic list of subscribers.
-
Subscriptions are done through the
wss://localhost:61001/quotes
endpoint that the Quote Aggregator App exposes. -
Clients can connect to the previous endpoint by using a query parameter to indicate what stock quotes to follow, for example:
wss://localhost:61001/quotes?ticker=CRM&ticker=MELI
To generate the corresponding keystores and truststores:
-
Open your terminal.
-
In the terminal, navigate to the
src/main/resources
folder of your project app in Studio. -
Run the following command for keystores:
keytool -genkey -v -keystore broadcast-keystore.jks -alias broadcast -keyalg RSA -keysize 2048 -validity 10000
-
Run the following command to generate the truststores:
keytool -genkey -v -keystore aggregator-truststore.jks -alias broadcast -keyalg RSA -keysize 2048 -validity 10000
The request establishes a WebSocket that gets the snapshots for the CRM
and MELI
tickers. This is done by subscribing the resulting sockets to the proper socket groups.
To connect the Quote Aggregator app with the Quote Producer app, trigger the flow to open the outbound socket with the following command:
curl -k http://localhost:8081/connect
XML for the Quote Aggregator App
Paste this code into your Studio XML editor to quickly load the flow for this example into your Mule app:
<?xml version="1.0" encoding="UTF-8"?>
<mule xmlns:tls="http://www.mulesoft.org/schema/mule/tls"
xmlns:http="http://www.mulesoft.org/schema/mule/http"
xmlns="http://www.mulesoft.org/schema/mule/core" xmlns:doc="http://www.mulesoft.org/schema/mule/documentation"
xmlns:websocket="http://www.mulesoft.org/schema/mule/websocket"
xmlns:os="http://www.mulesoft.org/schema/mule/os"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="
http://www.mulesoft.org/schema/mule/tls http://www.mulesoft.org/schema/mule/tls/current/mule-tls.xsd
http://www.mulesoft.org/schema/mule/http http://www.mulesoft.org/schema/mule/http/current/mule-http.xsd http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd
http://www.mulesoft.org/schema/mule/os http://www.mulesoft.org/schema/mule/os/current/mule-os.xsd
http://www.mulesoft.org/schema/mule/websocket http://www.mulesoft.org/schema/mule/websocket/current/mule-websocket.xsd">
<!--
  Client-side TLS context for the connection to the Quote Producer app.
  NOTE(review): insecure="true" is understood to skip certificate validation, which suits
  only this local demo with self-signed certificates; confirm before reusing elsewhere.
-->
<tls:context name="TLS_Context">
    <tls:trust-store path="aggregator-truststore.jks" password="mule123" insecure="true"/>
</tls:context>

<!-- Plain HTTP listener on port 61000. -->
<http:listener-config name="HTTP_API_Listener_config">
    <http:listener-connection host="0.0.0.0" port="61000"/>
</http:listener-config>

<!--
  WebSockets client that connects to the producer's secure endpoint on port 60000.
  The host is "localhost": 0.0.0.0 is a bind wildcard and is not a portable destination address.
-->
<websocket:config name="WebSockets_Client_Config">
    <websocket:connection>
        <websocket:client-settings
            host="localhost"
            port="60000"
            protocol="WSS"
            tlsContext="TLS_Context"/>
    </websocket:connection>
</websocket:config>

<!-- Non-persistent store that holds the ID of the outbound socket to the producer. -->
<os:object-store name="Object_store" persistent="false"/>

<!-- Server-side TLS context for the /quotes endpoint exposed to subscribers. -->
<tls:context name="TLS_Context_Broadcast">
    <tls:key-store
        path="broadcast-keystore.jks"
        alias="broadcast"
        keyPassword="mule123"
        password="mule123"/>
</tls:context>

<!-- HTTPS listener on port 61001 that backs the WebSockets server configuration. -->
<http:listener-config name="HTTP_WebSockets_Listener_Config">
    <http:listener-connection
        host="0.0.0.0"
        port="61001"
        tlsContext="TLS_Context_Broadcast"
        protocol="HTTPS"/>
</http:listener-config>

<!-- WebSockets server configuration used for subscriber sockets. -->
<websocket:config name="WebSockets_Server_Config">
    <websocket:connection>
        <websocket:server-settings listenerConfig="HTTP_WebSockets_Listener_Config"/>
    </websocket:connection>
</websocket:config>

<!-- Plain HTTP listener on port 8081; serves the /connect trigger. -->
<http:listener-config name="HTTP_Listener_config">
    <http:listener-connection host="0.0.0.0" port="8081"/>
</http:listener-config>
<!-- Logs the current payload as JSON under the label 'Broadcasting stock quote reduction'. -->
<flow name="broadcasting-stock-quote-snapshot">
    <logger doc:name="Logger"
            level="INFO"
            message="#[output application/json
            ---
            {
                info: 'Broadcasting stock quote reduction',
                payload: payload
            }]"/>
</flow>
<!--
  Triggered by a request to /connect: opens an outbound socket to the producer's /feed
  path and stores the new socket ID under the key socketId unless an ID is already stored.
-->
<flow name="producer-app-websocket-open-flow">
    <http:listener path="/connect" config-ref="HTTP_Listener_config"/>
    <websocket:open-outbound-socket path="/feed" config-ref="WebSockets_Client_Config"/>
    <logger message="#['New connection established with [' ++ attributes.socketId ++ ']']"
            level="INFO"/>
    <!-- vars.webSocketConnected records whether a socket ID is already stored. -->
    <os:contains objectStore="Object_store" key="socketId" target="webSocketConnected"/>
    <choice>
        <when expression="#[vars.webSocketConnected]">
            <logger message="#['The websocket [' ++ attributes.socketId ++ '] is not saved as there is already one connected']"
                    level="INFO"/>
        </when>
        <otherwise>
            <logger message="#['Websocket with ID [' ++ attributes.socketId ++ '] is saved for later reference']"
                    level="INFO"/>
            <os:store objectStore="Object_store" key="socketId">
                <os:value><![CDATA[#[attributes.socketId]]]></os:value>
            </os:store>
        </otherwise>
    </choice>
</flow>
<!--
  On each new /quotes connection, subscribes the socket to the group named by the
  "groups" header of the connection request.
-->
<flow name="webs-quote-aggFlow">
    <websocket:on-inbound-connection path="/quotes" config-ref="WebSockets_Server_Config"/>
    <websocket:subscribe-groups socketId="#[attributes.socketId]"
                                groups="#[[attributes.headers.groups]]"
                                config-ref="WebSockets_Server_Config"/>
</flow>
<!-- Logs, as JSON, each message that a subscriber sends on /quotes along with its socket ID. -->
<flow name="client-app-websocket-inbound-listener-flow">
    <websocket:inbound-listener doc:name="On New Inbound Message"
                                path="/quotes"
                                config-ref="WebSockets_Server_Config"/>
    <logger doc:name="Logger"
            level="INFO"
            message="#[output application/json
            ---
            {
                info: 'New message received from [' ++ attributes.socketId ++ ']',
                payload: payload
            }]"/>
</flow>
<!--
  Receives each JSON quote from the producer's /feed socket, logs it, and broadcasts it
  on /quotes to the socket group named after the quote's ticker.
-->
<flow name="producer-app-websocket-outbound-listener-flow">
    <websocket:outbound-listener outputMimeType="application/json"
                                 path="/feed"
                                 config-ref="WebSockets_Client_Config"/>
    <logger level="INFO"
            message="#[output application/json
            ---
            {
                info: 'New message received from [' ++ attributes.socketId ++ ']',
                payload: payload
            }]"/>
    <!-- Logs the single-element group list that the broadcast below targets. -->
    <logger message="#[[payload.ticker]]" level="INFO"/>
    <websocket:broadcast groups="#[[payload.ticker]]"
                         path="/quotes"
                         config-ref="WebSockets_Server_Config"/>
</flow>
<!--
  Closes the stored outbound socket to the producer, if one is stored; otherwise logs
  that there is nothing to close. The flow has no event source of its own.
-->
<flow name="producer-app-websocket-close-flow">
    <os:contains objectStore="Object_store" key="socketId" target="webSocketConnected"/>
    <choice>
        <when expression="#[vars.webSocketConnected]">
            <!-- Load the stored socket ID into vars.socketId before closing it. -->
            <os:retrieve objectStore="Object_store" key="socketId" target="socketId"/>
            <logger message="#['Trying to close websocket [' ++ vars.socketId as String ++ ']']"
                    level="INFO"/>
            <websocket:close-socket socketId="#[vars.socketId]"
                                    reason="Client wants to close the websocket"
                                    config-ref="WebSockets_Client_Config"/>
        </when>
        <otherwise>
            <logger message="There is no WebSocket to close" level="INFO"/>
        </otherwise>
    </choice>
</flow>
<!--
  When an outbound /feed socket closes, logs the event and removes the stored socket ID
  if it belongs to the socket that just closed.
-->
<flow name="producer-app-websocket-on-socket-closed-flow">
    <websocket:on-socket-closed config-ref="WebSockets_Client_Config" path="/feed"/>
    <logger
        level="INFO"
        message="#['Websocket [' ++ attributes.socketId ++ '] was closed']"/>
    <!--
      A default value keeps the retrieval from failing on a missing key, for example when a
      socket that was never saved closes after the stored ID has been removed. The empty
      string never equals a real socket ID, so the otherwise branch runs in that case.
    -->
    <os:retrieve key="socketId" objectStore="Object_store" target="socketId">
        <os:default-value><![CDATA[#['']]]></os:default-value>
    </os:retrieve>
    <choice>
        <when expression="#[attributes.socketId == vars.socketId]">
            <logger
                level="INFO"
                message="#['Removing the websocket ID stored for referencing it']"/>
            <os:remove key="socketId" objectStore="Object_store"/>
        </when>
        <otherwise>
            <!-- The bracketed value is the stored ID, not the ID of the socket that closed. -->
            <logger
                level="INFO"
                message="#['Disconnected WebSocket is not the main one [' ++ vars.socketId as String ++ ']']"/>
        </otherwise>
    </choice>
</flow>
</mule>
Quote Client App
The following Mule app opens at least three different WebSockets to the quotes endpoint in the Quote Aggregator app. Each of those sockets listens to a different set of tickers.
The received QuoteSnapshots
are transformed to CSV format and appended to a file.
To generate the corresponding keystores:
-
Open your terminal.
-
In the terminal, navigate to the
src/main/resources
folder of your project app in Studio. -
Run the following command:
keytool -genkey -v -keystore client-truststore.jks -alias client -keyalg RSA -keysize 2048 -validity 10000
To connect the Quote Client app with the Quote Aggregator app, trigger the flow to open the outbound socket with the following command:
curl "http://localhost:62000/open-aggregator-ws?ticker=CRM"
XML for the Quote Client App
Paste this code into your Studio XML editor to quickly load the flow for this example into your Mule app:
<?xml version="1.0" encoding="UTF-8"?>
<mule xmlns:tls="http://www.mulesoft.org/schema/mule/tls"
xmlns:http="http://www.mulesoft.org/schema/mule/http"
xmlns="http://www.mulesoft.org/schema/mule/core" xmlns:doc="http://www.mulesoft.org/schema/mule/documentation"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:websocket="http://www.mulesoft.org/schema/mule/websocket"
xsi:schemaLocation="
http://www.mulesoft.org/schema/mule/tls http://www.mulesoft.org/schema/mule/tls/current/mule-tls.xsd
http://www.mulesoft.org/schema/mule/http http://www.mulesoft.org/schema/mule/http/current/mule-http.xsd http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd
http://www.mulesoft.org/schema/mule/websocket http://www.mulesoft.org/schema/mule/websocket/current/mule-websocket.xsd">
<!-- Plain HTTP listener on port 62000; serves the trigger that opens the outbound socket. -->
<http:listener-config name="HTTP_Listener_config">
    <http:listener-connection host="0.0.0.0" port="62000"/>
</http:listener-config>

<!--
  WebSockets client that connects to the aggregator's secure endpoint on port 61001.
  The host is "localhost": 0.0.0.0 is a bind wildcard and is not a portable destination address.
  NOTE(review): insecure="true" is understood to skip certificate validation, which suits
  only this local demo with self-signed certificates; confirm before reusing elsewhere.
-->
<websocket:config name="WebSockets_Client_App_Config">
    <websocket:connection>
        <websocket:client-settings host="localhost" port="61001" protocol="WSS">
            <tls:context>
                <tls:trust-store path="client-truststore.jks" password="mule123" insecure="true"/>
            </tls:context>
        </websocket:client-settings>
    </websocket:connection>
</websocket:config>
<!--
  Triggered by a request to /open-aggregator-ws: collects every "ticker" query parameter
  into vars.groups and opens an outbound socket to the aggregator's /quotes path, using
  those tickers as the socket's default groups and sending the first one in a "groups" header.
-->
<flow name="aggregator-app-websocket-open-flow">
    <!-- The leading slash matches the style of the other listener paths in these examples. -->
    <http:listener doc:name="Listener"
                   config-ref="HTTP_Listener_config"
                   path="/open-aggregator-ws"/>
    <set-variable value="#[attributes.queryParams.*ticker]" variableName="groups"/>
    <logger level="INFO" message="#[vars.groups]"/>
    <!--
      No socketId is passed: at this point the attributes come from the HTTP listener and
      carry no socketId, so the previous expression could not supply a meaningful ID.
    -->
    <websocket:open-outbound-socket config-ref="WebSockets_Client_App_Config"
                                    path="/quotes"
                                    defaultGroups="#[vars.groups]">
        <websocket:headers><![CDATA[#[output application/java
---
{
    groups : vars.groups[0]
}]]]></websocket:headers>
    </websocket:open-outbound-socket>
</flow>
<!-- Logs, as JSON, each message received from the aggregator's /quotes socket. -->
<flow name="stock-quotes-clientFlow">
    <websocket:outbound-listener outputMimeType="application/json"
                                 path="/quotes"
                                 config-ref="WebSockets_Client_App_Config"/>
    <logger doc:name="Logger"
            level="INFO"
            message="#[output application/json
            ---
            {
                info: 'New message received on [' ++ attributes.socketId ++ ']',
                payload: payload
            }]"/>
</flow>
<!--
  Subscribes the socket identified by attributes.socketId to the groups in vars.groups.
  The flow has no event source of its own.
-->
<flow name="webs-clientFlow">
    <websocket:subscribe-groups socketId="#[attributes.socketId]"
                                groups="#[vars.groups]"
                                config-ref="WebSockets_Client_App_Config"/>
</flow>
</mule>