WebSockets Connector の例 - Mule 4

次の Mule アプリケーションの例では、WebSockets 用 Anypoint Connector (WebSockets Connector) を使用して、株価情報を取得してブロードキャストするインテグレーションシステムを構築できます。

Quote Producer App

次の Mule アプリケーションは、継続的でランダムな株価情報を生成します。

Anypoint Studio キャンバスの Quote Producer App フロー
  1. 株価情報の例:

{
   "ticker": "CRM",
   "price": 157.6,
   "cur": "USD",
  "timestamp": 1563374475104
}
  • Quote​ 種別で前述の株価情報の例を識別します。

  • Mule アプリケーションは、少なくとも 5 つの異なるティッカー (CRM、MELI、GOOG、NFLX、AAPL) を使用するランダムな株価を使用して約 50 件の見積を生成します。

  • また、1 つの接続のみを受け入れる TLS セキュア ​wss://localhost:60000/feed​ エンドポイントを公開します。

  • 最初のクライアントはエンドポイントに正常に接続する必要があります。

  • エンドポイントに接続する以降のクライアントには、「​"Sorry, spot taken"​ (申し訳ありません。スポットは取得済みです)」というテキストが含まれる WSS メッセージが送信されます。

  • 生成された各見積もりが接続されたフィードクライアントに送信されます。

対応するキーストアを生成する手順は、次のとおりです。

  1. ターミナルを開きます。

  2. ターミナルで、Studio のプロジェクトアプリケーションの ​src/main/resources​ フォルダーに移動します。

  3. 次のコマンドを実行します。

    keytool -genkey -v -keystore producer-keystore.jks -alias producerkey -keyalg RSA -keysize 2048 -validity 10000

Quote Producer App の XML

この例のフローをすばやく Mule アプリケーションに読み込むには、次のコードを Studio XML エディターに貼り付けます。

<?xml version="1.0" encoding="UTF-8"?>

<mule xmlns:tls="http://www.mulesoft.org/schema/mule/tls"
	xmlns:http="http://www.mulesoft.org/schema/mule/http"
	xmlns:ee="http://www.mulesoft.org/schema/mule/ee/core"
	xmlns:websocket="http://www.mulesoft.org/schema/mule/websocket" xmlns="http://www.mulesoft.org/schema/mule/core"
	xmlns:doc="http://www.mulesoft.org/schema/mule/documentation"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xmlns:os="http://www.mulesoft.org/schema/mule/os" xsi:schemaLocation="
http://www.mulesoft.org/schema/mule/tls http://www.mulesoft.org/schema/mule/tls/current/mule-tls.xsd
http://www.mulesoft.org/schema/mule/http http://www.mulesoft.org/schema/mule/http/current/mule-http.xsd
http://www.mulesoft.org/schema/mule/ee/core http://www.mulesoft.org/schema/mule/ee/core/current/mule-ee.xsd http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd
http://www.mulesoft.org/schema/mule/websocket http://www.mulesoft.org/schema/mule/websocket/current/mule-websocket.xsd
http://www.mulesoft.org/schema/mule/os http://www.mulesoft.org/schema/mule/os/current/mule-os.xsd">


<tls:context name="TLS_Context">
        <tls:key-store
		path="producer-keystore.jks"
		alias="producerkey"
		keyPassword="mule123"
		password="mule123" />
    </tls:context>

    <http:listener-config name="HTTP_TLS_Listener_config">
        <http:listener-connection
		host="0.0.0.0"
		port="60000"
		tlsContext="TLS_Context"
		protocol="HTTPS"/>
    </http:listener-config>

    <http:listener-config name="HTTP_Listener_config">
        <http:listener-connection host="0.0.0.0" port="60001" />
    </http:listener-config>

    <websocket:config name="WebSockets_Config">
        <websocket:connection >
            <websocket:server-settings listenerConfig="HTTP_TLS_Listener_config" />
        </websocket:connection>
    </websocket:config>

    <os:object-store name="Object_store" persistent="false" />

 	<flow name="stock-quotes-producer-flow">
        <scheduler>
            <scheduling-strategy >
                <fixed-frequency frequency="15" timeUnit="SECONDS"/>
            </scheduling-strategy>
        </scheduler>

        <ee:transform doc:name="Transform Message">
            <ee:message >
                <ee:set-payload >
                    <![CDATA[%dw 2.0
                    var randomStockSelector = randomInt(5)
                    var randomPriceVariation = (randomInt(401) / 100) - 2 as String { format: "#.00"} as Number
                    var baseStockQuotes = [
                        {
                            "ticker": "CRM",
                            "price": 157.6,
                            "cur": "USD"
                        },
                        {
                            "ticker": "MELI",
                            "price": 646.24,
                            "cur": "USD"
                        },
                        {
                            "ticker": "GOOG",
                            "price": 1134.14,
                            "cur": "USD"
                        },
                        {
                            "ticker": "NFLX",
                            "price": 316.53,
                            "cur": "USD"
                        },
                        {
                            "ticker": "AAPL",
                            "price": 208.19,
                            "cur": "USD"
                        }
                    ]
                    var selectedStock = baseStockQuotes[randomStockSelector]
                    output application/json
                    ---
                    {
                        ticker : selectedStock.ticker,
                        price : selectedStock.price + randomPriceVariation,
                        cur : selectedStock.cur,
                        timestamp : now() as Number
                    }]]>
                </ee:set-payload>
            </ee:message>
        </ee:transform>
        <flow-ref name="send-stock-quote-flow"/>
    </flow>

    <flow name="send-stock-quote-flow">
		<os:contains
		objectStore="Object_store"
		key="webSocketConnectedId"
		target="webSocketConnected"/>
        <choice>
            <when expression="#[vars.webSocketConnected]">
                <logger level="INFO" message="Sending" />
				<os:retrieve
			objectStore="Object_store"
			target="webSocketConnectedId"
			key="webSocketConnectedId" />
				<websocket:send
			socketId="#[vars.webSocketConnectedId]"
			config-ref="WebSockets_Config"/>
            </when>
        </choice>
    </flow>

	<flow name="on-new-inbound-connection-flow">
		<websocket:on-inbound-connection
			doc:name="On New Inbound Connection"
			config-ref="WebSockets_Config"
			path="/feed" />
		<os:contains
			key="webSocketConnectedId"
			target="webSocketConnected"
			objectStore="Object_store"/>
		<choice>
			<when expression="#[vars.webSocketConnected]">
				<os:retrieve
					key="webSocketConnectedId"
					objectStore="Object_store"
					target="webSocketConnectedId" />
				<websocket:close-socket
					socketId="#[attributes.socketId]"
					reason="Sorry, spot taken"
					config-ref="WebSockets_Config"/>
			</when>
			<otherwise>
				<os:store key="webSocketConnectedId" objectStore="Object_store">
					<os:value ><![CDATA[#[attributes.socketId]]]></os:value>
				</os:store>
			</otherwise>
		</choice>
	</flow>

	<flow name="on-new-inbound-message-flow">
		<websocket:inbound-listener config-ref="WebSockets_Config" path="/feed"/>
		<logger
			level="INFO"
			doc:name="Logger"
			message="#[output application/json
		---
		{
			info: 'New message received from [' ++ attributes.socketId ++ ']',
			payload : payload
		}]"/>
	</flow>

	<flow name="on-socket-closed-flow">
		<websocket:on-socket-closed config-ref="WebSockets_Config" path="/feed"/>
		<os:retrieve
			key="webSocketConnectedId"
			objectStore="Object_store"
			target="webSocketConnectedId" />
		<choice>
			<when expression="#[attributes.socketId == vars.webSocketConnectedId]">
				<os:remove key="webSocketConnectedId" objectStore="Object_store"/>
			</when>
		</choice>
	</flow>

	<flow name="close-websocket-flow">
		<os:contains
			key="webSocketConnectedId"
			target="webSocketConnected"
			objectStore="Object_store"/>
		<choice>
			<when expression="#[vars.webSocketConnected]">
				<os:retrieve
					doc:name="Retrieve"
					key="webSocketConnectedId"
					objectStore="Object_store"
					target="webSocketConnectedId" />
				<websocket:close-socket
					config-ref="WebSockets_Config"
					socketId="#[vars.webSocketConnectedId]"
					reason="Producer app wanted to close the websocket"/>
			</when>
		</choice>
	</flow>

</mule>

Quote Aggregator App

次の Mule アプリケーションは、前の Quote Producer App のフィードエンドポイントに接続し、すべての見積を受信します。次の、Quote Producer App はティッカー別に見積を分割し、5 秒ごとに実行される時間ベースのアグリゲーターに配置します。

Anypoint Studio キャンバスの Quote Aggregator App フロー

集計が実行されると、アグリゲーターは縮小される ​Array<Quote>​ を単一の ​QuoteSnapshot​ 種別に出力します。

QuoteSnapshot 種別の例
{
   "ticker": "CRM"
   "price": "157.54 USD"
}
  • 縮小は、タイムスタンプが最大の ​Quote​ を選択して変換することで実行されます。

  • 生成されたスナップショットはサブスクライバーの動的リストにブロードキャストされます。

  • サブスクリプションは、Quote Aggregator App が公開する ​wss://localhost:8082/quotes​ エンドポイントを通じて実行されます。

  • クライアントはどの株価情報に従うかを示すクエリパラメーターを使用して前のエンドポイントに接続できます (例: wss://localhost:8082/quotes?ticker=CRM&ticker=MELI​)。

対応するキーストアとトラストストアを生成する手順は、次のとおりです。

  1. ターミナルを開きます。

  2. ターミナルで、Studio のプロジェクトアプリケーションの ​src/main/resources​ フォルダーに移動します。

  3. キーストアの次のコマンドを実行します。

    keytool -genkey -v -keystore broadcast-keystore.jks -alias broadcast -keyalg RSA -keysize 2048 -validity 10000

  4. 次のコマンドを実行してトラストストアを生成します。

    keytool -genkey -v -keystore aggregator-truststore.jks -alias broadcast -keyalg RSA -keysize 2048 -validity 10000

要求によって、​CRM​ および ​MELI​ ティッカーのスナップショットを取得する WebSocket が確立されます。これは、生成されたソケットを適切なソケットグループにサブスクライブすることで実行されます。

Quote Aggregator App を Quote Producer App に接続するには、次のコマンドを使用して、アウトバウンドソケットを開くフローをトリガーします。

Quote Aggregator App の XML

この例のフローをすばやく Mule アプリケーションに読み込むには、次のコードを Studio XML エディターに貼り付けます。

<?xml version="1.0" encoding="UTF-8"?>

<mule xmlns:tls="http://www.mulesoft.org/schema/mule/tls"
	xmlns:http="http://www.mulesoft.org/schema/mule/http"
	xmlns="http://www.mulesoft.org/schema/mule/core" xmlns:doc="http://www.mulesoft.org/schema/mule/documentation"
	xmlns:websocket="http://www.mulesoft.org/schema/mule/websocket"
	xmlns:os="http://www.mulesoft.org/schema/mule/os"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="
http://www.mulesoft.org/schema/mule/tls http://www.mulesoft.org/schema/mule/tls/current/mule-tls.xsd
http://www.mulesoft.org/schema/mule/http http://www.mulesoft.org/schema/mule/http/current/mule-http.xsd http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd
http://www.mulesoft.org/schema/mule/os http://www.mulesoft.org/schema/mule/os/current/mule-os.xsd
http://www.mulesoft.org/schema/mule/websocket http://www.mulesoft.org/schema/mule/websocket/current/mule-websocket.xsd">


	<tls:context name="TLS_Context">
        <tls:trust-store path="aggregator-truststore.jks" password="mule123" insecure="true"/>
    </tls:context>

    <http:listener-config name="HTTP_API_Listener_config">
        <http:listener-connection host="0.0.0.0" port="61000" />
    </http:listener-config>

    <websocket:config name="WebSockets_Client_Config">
        <websocket:connection >
            <websocket:client-settings
	    	host="0.0.0.0"
		port="60000"
		protocol="WSS"
		tlsContext="TLS_Context"/>
        </websocket:connection>
    </websocket:config>

    <os:object-store name="Object_store" persistent="false" />

	<tls:context name="TLS_Context_Broadcast">
        <tls:key-store
		path="broadcast-keystore.jks"
		alias="broadcast"
		keyPassword="mule123"
		password="mule123" />
    </tls:context>

    <http:listener-config name="HTTP_WebSockets_Listener_Config">
        <http:listener-connection
		host="0.0.0.0"
		port="61001"
		tlsContext="TLS_Context_Broadcast"
		protocol="HTTPS"/>
    </http:listener-config>

    <websocket:config name="WebSockets_Server_Config">
        <websocket:connection >
            <websocket:server-settings listenerConfig="HTTP_WebSockets_Listener_Config" />
        </websocket:connection>
    </websocket:config>


    <http:listener-config name="HTTP_Listener_config"  >
		<http:listener-connection host="0.0.0.0" port="8081" />
	</http:listener-config>

	<flow name="broadcasting-stock-quote-snapshot">
		<logger level="INFO" doc:name="Logger" message="#[output application/json
            ---
            {
                info: 'Broadcasting stock quote reduction',
                payload: payload
            }]"/>
    </flow>
	<flow name="producer-app-websocket-open-flow">
        <http:listener  config-ref="HTTP_Listener_config" path="/connect"/>
		<websocket:open-outbound-socket config-ref="WebSockets_Client_Config" path="/feed"/>
        <logger
		level="INFO"
		message="#['New connection established with [' ++ attributes.socketId ++ ']']" />
        <os:contains key="socketId" objectStore="Object_store" target="webSocketConnected" />
        <choice>
            <when expression="#[vars.webSocketConnected]">
                <logger
			level="INFO"
			message="#['The websocket [' ++ attributes.socketId ++ '] is not saved as there is already one connected']"/>
            </when>
            <otherwise >
                <logger
			level="INFO"
			message="#['Websocket with ID [' ++ attributes.socketId ++ '] is saved for later reference']"/>
                <os:store key="socketId" objectStore="Object_store">
                    <os:value><![CDATA[#[attributes.socketId]]]></os:value>
                </os:store>
            </otherwise>
        </choice>
    </flow>

    <flow name="webs-quote-aggFlow"  >
		<websocket:on-inbound-connection  config-ref="WebSockets_Server_Config" path="/quotes"/>
		<websocket:subscribe-groups config-ref="WebSockets_Server_Config" socketId="#[attributes.socketId]" groups='#[[attributes.headers.groups]]' />
	</flow>
	<flow name="client-app-websocket-inbound-listener-flow">
        <websocket:inbound-listener doc:name="On New Inbound Message" config-ref="WebSockets_Server_Config" path="/quotes" />
        <logger level="INFO" doc:name="Logger" message="#[output application/json
            ---
            {
                info: 'New message received from [' ++ attributes.socketId ++ ']',
                payload: payload
            }]" />
    </flow>
	<flow name="producer-app-websocket-outbound-listener-flow">
        <websocket:outbound-listener
		config-ref="WebSockets_Client_Config"
		path="/feed"
		outputMimeType="application/json"/>
        <logger level="INFO" message="#[output application/json
            ---
            {
                info: 'New message received from [' ++ attributes.socketId ++ ']',
                payload: payload
            }]" />
		<logger level="INFO" message="#[[payload.ticker]]"/>
		<websocket:broadcast config-ref="WebSockets_Server_Config" path="/quotes" groups="#[[payload.ticker]]"/>
    </flow>

    <flow name="producer-app-websocket-close-flow">
        <os:contains key="socketId" objectStore="Object_store" target="webSocketConnected"/>
        <choice>
            <when expression="#[vars.webSocketConnected]">
                <os:retrieve key="socketId" objectStore="Object_store" target="socketId"/>
                <logger
			level="INFO"
			message="#['Trying to close websocket [' ++ vars.socketId as String ++ ']']"/>
                <websocket:close-socket
			config-ref="WebSockets_Client_Config"
			socketId="#[vars.socketId]"
			reason="Client wants to close the websocket"/>
            </when>
            <otherwise >
                <logger level="INFO" message="There is no WebSocket to close"/>
            </otherwise>
        </choice>
    </flow>

    <flow name="producer-app-websocket-on-socket-closed-flow">
        <websocket:on-socket-closed config-ref="WebSockets_Client_Config" path="/feed"/>
        <logger
		level="INFO"
		message="#['Websocket [' ++ attributes.socketId ++ '] was closed']"/>
        <os:retrieve key="socketId" objectStore="Object_store" target="socketId" />
        <choice>
            <when expression="#[attributes.socketId == vars.socketId]">
                <logger
			level="INFO"
			message="#['Removing the websocket ID stored for referencing it']" />
                <os:remove key="socketId" objectStore="Object_store" />
            </when>
            <otherwise >
                <logger
			level="INFO"
			message="#['Disconnected WebSocket is not the main one [' ++ vars.socketId as String ++ ']']" />
            </otherwise>
        </choice>
    </flow>

    </mule>

Quote Client App

次の Mule アプリケーションによって少なくとも 3 つの異なる WebSockets が Quote Aggregator App の見積エンドポイントに開かれます。こうした各ソケットは異なるセットのティッカーをリスンします。

Anypoint Studio キャンバスの Quote Client App フロー
Figure 1. Quote Client App フロー

受信された ​QuoteSnapshots​ は CSV 形式に変換され、ファイルに付加されます。

対応するキーストアを生成する手順は、次のとおりです。

  1. ターミナルを開きます。

  2. ターミナルで、Studio のプロジェクトアプリケーションの ​src/main/resources​ フォルダーに移動します。

  3. 次のコマンドを実行します。

    keytool -genkey -v -keystore client-truststore.jks -alias client -keyalg RSA -keysize 2048 -validity 10000

Quote Client App を Quote Aggregator App に接続するには、次のコマンドを使用して、アウトバウンドソケットを開くフローをトリガーします。

Quote Client App の XML

この例のフローをすばやく Mule アプリケーションに読み込むには、次のコードを Studio XML エディターに貼り付けます。

<?xml version="1.0" encoding="UTF-8"?>

<mule xmlns:tls="http://www.mulesoft.org/schema/mule/tls"
xmlns:http="http://www.mulesoft.org/schema/mule/http"
xmlns="http://www.mulesoft.org/schema/mule/core" xmlns:doc="http://www.mulesoft.org/schema/mule/documentation"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:websocket="http://www.mulesoft.org/schema/mule/websocket"
xsi:schemaLocation="
http://www.mulesoft.org/schema/mule/tls http://www.mulesoft.org/schema/mule/tls/current/mule-tls.xsd
http://www.mulesoft.org/schema/mule/http http://www.mulesoft.org/schema/mule/http/current/mule-http.xsd http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd
http://www.mulesoft.org/schema/mule/websocket http://www.mulesoft.org/schema/mule/websocket/current/mule-websocket.xsd">

    <http:listener-config name="HTTP_Listener_config">
      <http:listener-connection host="0.0.0.0" port="62000" />
  </http:listener-config>

  <websocket:config name="WebSockets_Client_App_Config">
      <websocket:connection>
          <websocket:client-settings host="0.0.0.0" port="61001" protocol="WSS">
              <tls:context >
                  <tls:trust-store path="client-truststore.jks" password="mule123" insecure="true"/>
              </tls:context>
          </websocket:client-settings>
      </websocket:connection>
  </websocket:config>

  <flow name="aggregator-app-websocket-open-flow">
      <http:listener doc:name="Listener"
  config-ref="HTTP_Listener_config"
  path="open-aggregator-ws"/>
      <set-variable value="#[attributes.queryParams.*ticker]" variableName="groups"/>
      <logger level="INFO" message="#[vars.groups]"/>
  <websocket:open-outbound-socket config-ref="WebSockets_Client_App_Config" path="/quotes" socketId="#[attributes.socketId]" defaultGroups="#[vars.groups]">
    <websocket:headers ><![CDATA[#[output application/java
---
{
groups : vars.groups[0]
}]]]></websocket:headers>
  </websocket:open-outbound-socket>
  </flow>

  <flow name="stock-quotes-clientFlow">
      <websocket:outbound-listener
  config-ref="WebSockets_Client_App_Config"
  path="/quotes"
  outputMimeType="application/json"/>
      <logger level="INFO" doc:name="Logger" message="#[output application/json
          ---
          {
              info: 'New message received on [' ++ attributes.socketId ++ ']',
              payload: payload
          }]"/>
  </flow>
<flow name="webs-clientFlow" >
  <websocket:subscribe-groups config-ref="WebSockets_Client_App_Config" socketId="#[attributes.socketId]" groups="#[vars.groups]" />
</flow>

</mule>