Flex Gateway新着情報
Governance新着情報
Monitoring API Manager次の Mule アプリケーションの例では、WebSockets 用 Anypoint Connector (WebSockets Connector) を使用して、株価情報を取得してブロードキャストするインテグレーションシステムを構築できます。
次の Mule アプリケーションは、継続的でランダムな株価情報を生成します。
株価情報の例:
{ "ticker": "CRM", "price": 157.6, "cur": "USD", "timestamp": 1563374475104 }
Quote
種別で前述の株価情報の例を識別します。
Mule アプリケーションは、少なくとも 5 つの異なるティッカー (CRM、MELI、GOOG、NFLX、AAPL) を使用するランダムな株価を使用して約 50 件の見積を生成します。
また、1 つの接続のみを受け入れる TLS セキュア wss://localhost:60000/feed
エンドポイントを公開します。
最初のクライアントはエンドポイントに正常に接続する必要があります。
エンドポイントに接続する以降のクライアントには、「"Sorry, spot taken"
(申し訳ありません。スポットは取得済みです)」というテキストが含まれる WSS メッセージが送信されます。
生成された各見積もりが接続されたフィードクライアントに送信されます。
対応するキーストアを生成する手順は、次のとおりです。
ターミナルを開きます。
ターミナルで、Studio のプロジェクトアプリケーションの src/main/resources
フォルダーに移動します。
次のコマンドを実行します。
keytool -genkey -v -keystore producer-keystore.jks -alias producerkey -keyalg RSA -keysize 2048 -validity 10000
この例のフローをすばやく Mule アプリケーションに読み込むには、次のコードを Studio XML エディターに貼り付けます。
<?xml version="1.0" encoding="UTF-8"?>
<mule xmlns:tls="http://www.mulesoft.org/schema/mule/tls"
xmlns:http="http://www.mulesoft.org/schema/mule/http"
xmlns:ee="http://www.mulesoft.org/schema/mule/ee/core"
xmlns:websocket="http://www.mulesoft.org/schema/mule/websocket" xmlns="http://www.mulesoft.org/schema/mule/core"
xmlns:doc="http://www.mulesoft.org/schema/mule/documentation"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:os="http://www.mulesoft.org/schema/mule/os" xsi:schemaLocation="
http://www.mulesoft.org/schema/mule/tls http://www.mulesoft.org/schema/mule/tls/current/mule-tls.xsd
http://www.mulesoft.org/schema/mule/http http://www.mulesoft.org/schema/mule/http/current/mule-http.xsd
http://www.mulesoft.org/schema/mule/ee/core http://www.mulesoft.org/schema/mule/ee/core/current/mule-ee.xsd http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd
http://www.mulesoft.org/schema/mule/websocket http://www.mulesoft.org/schema/mule/websocket/current/mule-websocket.xsd
http://www.mulesoft.org/schema/mule/os http://www.mulesoft.org/schema/mule/os/current/mule-os.xsd">
<tls:context name="TLS_Context">
<tls:key-store
path="producer-keystore.jks"
alias="producerkey"
keyPassword="mule123"
password="mule123" />
</tls:context>
<http:listener-config name="HTTP_TLS_Listener_config">
<http:listener-connection
host="0.0.0.0"
port="60000"
tlsContext="TLS_Context"
protocol="HTTPS"/>
</http:listener-config>
<http:listener-config name="HTTP_Listener_config">
<http:listener-connection host="0.0.0.0" port="60001" />
</http:listener-config>
<websocket:config name="WebSockets_Config">
<websocket:connection >
<websocket:server-settings listenerConfig="HTTP_TLS_Listener_config" />
</websocket:connection>
</websocket:config>
<os:object-store name="Object_store" persistent="false" />
<flow name="stock-quotes-producer-flow">
<scheduler>
<scheduling-strategy >
<fixed-frequency frequency="15" timeUnit="SECONDS"/>
</scheduling-strategy>
</scheduler>
<ee:transform doc:name="Transform Message">
<ee:message >
<ee:set-payload >
<![CDATA[%dw 2.0
var randomStockSelector = randomInt(5)
var randomPriceVariation = (randomInt(401) / 100) - 2 as String { format: "#.00"} as Number
var baseStockQuotes = [
{
"ticker": "CRM",
"price": 157.6,
"cur": "USD"
},
{
"ticker": "MELI",
"price": 646.24,
"cur": "USD"
},
{
"ticker": "GOOG",
"price": 1134.14,
"cur": "USD"
},
{
"ticker": "NFLX",
"price": 316.53,
"cur": "USD"
},
{
"ticker": "AAPL",
"price": 208.19,
"cur": "USD"
}
]
var selectedStock = baseStockQuotes[randomStockSelector]
output application/json
---
{
ticker : selectedStock.ticker,
price : selectedStock.price + randomPriceVariation,
cur : selectedStock.cur,
timestamp : now() as Number
}]]>
</ee:set-payload>
</ee:message>
</ee:transform>
<flow-ref name="send-stock-quote-flow"/>
</flow>
<flow name="send-stock-quote-flow">
<os:contains
objectStore="Object_store"
key="webSocketConnectedId"
target="webSocketConnected"/>
<choice>
<when expression="#[vars.webSocketConnected]">
<logger level="INFO" message="Sending" />
<os:retrieve
objectStore="Object_store"
target="webSocketConnectedId"
key="webSocketConnectedId" />
<websocket:send
socketId="#[vars.webSocketConnectedId]"
config-ref="WebSockets_Config"/>
</when>
</choice>
</flow>
<flow name="on-new-inbound-connection-flow">
<websocket:on-inbound-connection
doc:name="On New Inbound Connection"
config-ref="WebSockets_Config"
path="/feed" />
<os:contains
key="webSocketConnectedId"
target="webSocketConnected"
objectStore="Object_store"/>
<choice>
<when expression="#[vars.webSocketConnected]">
<os:retrieve
key="webSocketConnectedId"
objectStore="Object_store"
target="webSocketConnectedId" />
<websocket:close-socket
socketId="#[attributes.socketId]"
reason="Sorry, spot taken"
config-ref="WebSockets_Config"/>
</when>
<otherwise>
<os:store key="webSocketConnectedId" objectStore="Object_store">
<os:value ><![CDATA[#[attributes.socketId]]]></os:value>
</os:store>
</otherwise>
</choice>
</flow>
<flow name="on-new-inbound-message-flow">
<websocket:inbound-listener config-ref="WebSockets_Config" path="/feed"/>
<logger
level="INFO"
doc:name="Logger"
message="#[output application/json
---
{
info: 'New message received from [' ++ attributes.socketId ++ ']',
payload : payload
}]"/>
</flow>
<flow name="on-socket-closed-flow">
<websocket:on-socket-closed config-ref="WebSockets_Config" path="/feed"/>
<os:retrieve
key="webSocketConnectedId"
objectStore="Object_store"
target="webSocketConnectedId" />
<choice>
<when expression="#[attributes.socketId == vars.webSocketConnectedId]">
<os:remove key="webSocketConnectedId" objectStore="Object_store"/>
</when>
</choice>
</flow>
<flow name="close-websocket-flow">
<os:contains
key="webSocketConnectedId"
target="webSocketConnected"
objectStore="Object_store"/>
<choice>
<when expression="#[vars.webSocketConnected]">
<os:retrieve
doc:name="Retrieve"
key="webSocketConnectedId"
objectStore="Object_store"
target="webSocketConnectedId" />
<websocket:close-socket
config-ref="WebSockets_Config"
socketId="#[vars.webSocketConnectedId]"
reason="Producer app wanted to close the websocket"/>
</when>
</choice>
</flow>
</mule>
xml
次の Mule アプリケーションは、前の Quote Producer App のフィードエンドポイントに接続し、すべての見積を受信します。次の、Quote Producer App はティッカー別に見積を分割し、5 秒ごとに実行される時間ベースのアグリゲーターに配置します。
集計が実行されると、アグリゲーターは縮小される Array<Quote>
を単一の QuoteSnapshot
種別に出力します。
{
"ticker": "CRM"
"price": "157.54 USD"
}
json
縮小は、タイムスタンプが最大の Quote
を選択して変換することで実行されます。
生成されたスナップショットはサブスクライバーの動的リストにブロードキャストされます。
サブスクリプションは、Quote Aggregator App が公開する wss://localhost:8082/quotes
エンドポイントを通じて実行されます。
クライアントはどの株価情報に従うかを示すクエリパラメーターを使用して前のエンドポイントに接続できます (例: wss://localhost:8082/quotes?ticker=CRM&ticker=MELI
)。
対応するキーストアとトラストストアを生成する手順は、次のとおりです。
ターミナルを開きます。
ターミナルで、Studio のプロジェクトアプリケーションの src/main/resources
フォルダーに移動します。
キーストアの次のコマンドを実行します。
keytool -genkey -v -keystore broadcast-keystore.jks -alias broadcast -keyalg RSA -keysize 2048 -validity 10000
次のコマンドを実行してトラストストアを生成します。
keytool -genkey -v -keystore aggregator-truststore.jks -alias broadcast -keyalg RSA -keysize 2048 -validity 10000
要求によって、CRM
および MELI
ティッカーのスナップショットを取得する WebSocket が確立されます。これは、生成されたソケットを適切なソケットグループにサブスクライブすることで実行されます。
Quote Aggregator App を Quote Producer App に接続するには、次のコマンドを使用して、アウトバウンドソケットを開くフローをトリガーします。
+
curl -k http://localhost:8081/connect
この例のフローをすばやく Mule アプリケーションに読み込むには、次のコードを Studio XML エディターに貼り付けます。
<?xml version="1.0" encoding="UTF-8"?>
<mule xmlns:tls="http://www.mulesoft.org/schema/mule/tls"
xmlns:http="http://www.mulesoft.org/schema/mule/http"
xmlns="http://www.mulesoft.org/schema/mule/core" xmlns:doc="http://www.mulesoft.org/schema/mule/documentation"
xmlns:websocket="http://www.mulesoft.org/schema/mule/websocket"
xmlns:os="http://www.mulesoft.org/schema/mule/os"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="
http://www.mulesoft.org/schema/mule/tls http://www.mulesoft.org/schema/mule/tls/current/mule-tls.xsd
http://www.mulesoft.org/schema/mule/http http://www.mulesoft.org/schema/mule/http/current/mule-http.xsd http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd
http://www.mulesoft.org/schema/mule/os http://www.mulesoft.org/schema/mule/os/current/mule-os.xsd
http://www.mulesoft.org/schema/mule/websocket http://www.mulesoft.org/schema/mule/websocket/current/mule-websocket.xsd">
<tls:context name="TLS_Context">
<tls:trust-store path="aggregator-truststore.jks" password="mule123" insecure="true"/>
</tls:context>
<http:listener-config name="HTTP_API_Listener_config">
<http:listener-connection host="0.0.0.0" port="61000" />
</http:listener-config>
<websocket:config name="WebSockets_Client_Config">
<websocket:connection >
<websocket:client-settings
host="0.0.0.0"
port="60000"
protocol="WSS"
tlsContext="TLS_Context"/>
</websocket:connection>
</websocket:config>
<os:object-store name="Object_store" persistent="false" />
<tls:context name="TLS_Context_Broadcast">
<tls:key-store
path="broadcast-keystore.jks"
alias="broadcast"
keyPassword="mule123"
password="mule123" />
</tls:context>
<http:listener-config name="HTTP_WebSockets_Listener_Config">
<http:listener-connection
host="0.0.0.0"
port="61001"
tlsContext="TLS_Context_Broadcast"
protocol="HTTPS"/>
</http:listener-config>
<websocket:config name="WebSockets_Server_Config">
<websocket:connection >
<websocket:server-settings listenerConfig="HTTP_WebSockets_Listener_Config" />
</websocket:connection>
</websocket:config>
<http:listener-config name="HTTP_Listener_config" >
<http:listener-connection host="0.0.0.0" port="8081" />
</http:listener-config>
<flow name="broadcasting-stock-quote-snapshot">
<logger level="INFO" doc:name="Logger" message="#[output application/json
---
{
info: 'Broadcasting stock quote reduction',
payload: payload
}]"/>
</flow>
<flow name="producer-app-websocket-open-flow">
<http:listener config-ref="HTTP_Listener_config" path="/connect"/>
<websocket:open-outbound-socket config-ref="WebSockets_Client_Config" path="/feed"/>
<logger
level="INFO"
message="#['New connection established with [' ++ attributes.socketId ++ ']']" />
<os:contains key="socketId" objectStore="Object_store" target="webSocketConnected" />
<choice>
<when expression="#[vars.webSocketConnected]">
<logger
level="INFO"
message="#['The websocket [' ++ attributes.socketId ++ '] is not saved as there is already one connected']"/>
</when>
<otherwise >
<logger
level="INFO"
message="#['Websocket with ID [' ++ attributes.socketId ++ '] is saved for later reference']"/>
<os:store key="socketId" objectStore="Object_store">
<os:value><![CDATA[#[attributes.socketId]]]></os:value>
</os:store>
</otherwise>
</choice>
</flow>
<flow name="webs-quote-aggFlow" >
<websocket:on-inbound-connection config-ref="WebSockets_Server_Config" path="/quotes"/>
<websocket:subscribe-groups config-ref="WebSockets_Server_Config" socketId="#[attributes.socketId]" groups='#[[attributes.headers.groups]]' />
</flow>
<flow name="client-app-websocket-inbound-listener-flow">
<websocket:inbound-listener doc:name="On New Inbound Message" config-ref="WebSockets_Server_Config" path="/quotes" />
<logger level="INFO" doc:name="Logger" message="#[output application/json
---
{
info: 'New message received from [' ++ attributes.socketId ++ ']',
payload: payload
}]" />
</flow>
<flow name="producer-app-websocket-outbound-listener-flow">
<websocket:outbound-listener
config-ref="WebSockets_Client_Config"
path="/feed"
outputMimeType="application/json"/>
<logger level="INFO" message="#[output application/json
---
{
info: 'New message received from [' ++ attributes.socketId ++ ']',
payload: payload
}]" />
<logger level="INFO" message="#[[payload.ticker]]"/>
<websocket:broadcast config-ref="WebSockets_Server_Config" path="/quotes" groups="#[[payload.ticker]]"/>
</flow>
<flow name="producer-app-websocket-close-flow">
<os:contains key="socketId" objectStore="Object_store" target="webSocketConnected"/>
<choice>
<when expression="#[vars.webSocketConnected]">
<os:retrieve key="socketId" objectStore="Object_store" target="socketId"/>
<logger
level="INFO"
message="#['Trying to close websocket [' ++ vars.socketId as String ++ ']']"/>
<websocket:close-socket
config-ref="WebSockets_Client_Config"
socketId="#[vars.socketId]"
reason="Client wants to close the websocket"/>
</when>
<otherwise >
<logger level="INFO" message="There is no WebSocket to close"/>
</otherwise>
</choice>
</flow>
<flow name="producer-app-websocket-on-socket-closed-flow">
<websocket:on-socket-closed config-ref="WebSockets_Client_Config" path="/feed"/>
<logger
level="INFO"
message="#['Websocket [' ++ attributes.socketId ++ '] was closed']"/>
<os:retrieve key="socketId" objectStore="Object_store" target="socketId" />
<choice>
<when expression="#[attributes.socketId == vars.socketId]">
<logger
level="INFO"
message="#['Removing the websocket ID stored for referencing it']" />
<os:remove key="socketId" objectStore="Object_store" />
</when>
<otherwise >
<logger
level="INFO"
message="#['Disconnected WebSocket is not the main one [' ++ vars.socketId as String ++ ']']" />
</otherwise>
</choice>
</flow>
</mule>
xml
次の Mule アプリケーションによって少なくとも 3 つの異なる WebSockets が Quote Aggregator App の見積エンドポイントに開かれます。こうした各ソケットは異なるセットのティッカーをリスンします。
受信された QuoteSnapshots
は CSV 形式に変換され、ファイルに付加されます。
対応するキーストアを生成する手順は、次のとおりです。
ターミナルを開きます。
ターミナルで、Studio のプロジェクトアプリケーションの src/main/resources
フォルダーに移動します。
次のコマンドを実行します。
keytool -genkey -v -keystore client-truststore.jks -alias client -keyalg RSA -keysize 2048 -validity 10000
Quote Client App を Quote Aggregator App に接続するには、次のコマンドを使用して、アウトバウンドソケットを開くフローをトリガーします。
この例のフローをすばやく Mule アプリケーションに読み込むには、次のコードを Studio XML エディターに貼り付けます。
<?xml version="1.0" encoding="UTF-8"?>
<mule xmlns:tls="http://www.mulesoft.org/schema/mule/tls"
xmlns:http="http://www.mulesoft.org/schema/mule/http"
xmlns="http://www.mulesoft.org/schema/mule/core" xmlns:doc="http://www.mulesoft.org/schema/mule/documentation"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:websocket="http://www.mulesoft.org/schema/mule/websocket"
xsi:schemaLocation="
http://www.mulesoft.org/schema/mule/tls http://www.mulesoft.org/schema/mule/tls/current/mule-tls.xsd
http://www.mulesoft.org/schema/mule/http http://www.mulesoft.org/schema/mule/http/current/mule-http.xsd http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd
http://www.mulesoft.org/schema/mule/websocket http://www.mulesoft.org/schema/mule/websocket/current/mule-websocket.xsd">
<http:listener-config name="HTTP_Listener_config">
<http:listener-connection host="0.0.0.0" port="62000" />
</http:listener-config>
<websocket:config name="WebSockets_Client_App_Config">
<websocket:connection>
<websocket:client-settings host="0.0.0.0" port="61001" protocol="WSS">
<tls:context >
<tls:trust-store path="client-truststore.jks" password="mule123" insecure="true"/>
</tls:context>
</websocket:client-settings>
</websocket:connection>
</websocket:config>
<flow name="aggregator-app-websocket-open-flow">
<http:listener doc:name="Listener"
config-ref="HTTP_Listener_config"
path="open-aggregator-ws"/>
<set-variable value="#[attributes.queryParams.*ticker]" variableName="groups"/>
<logger level="INFO" message="#[vars.groups]"/>
<websocket:open-outbound-socket config-ref="WebSockets_Client_App_Config" path="/quotes" socketId="#[attributes.socketId]" defaultGroups="#[vars.groups]">
<websocket:headers ><![CDATA[#[output application/java
---
{
groups : vars.groups[0]
}]]]></websocket:headers>
</websocket:open-outbound-socket>
</flow>
<flow name="stock-quotes-clientFlow">
<websocket:outbound-listener
config-ref="WebSockets_Client_App_Config"
path="/quotes"
outputMimeType="application/json"/>
<logger level="INFO" doc:name="Logger" message="#[output application/json
---
{
info: 'New message received on [' ++ attributes.socketId ++ ']',
payload: payload
}]"/>
</flow>
<flow name="webs-clientFlow" >
<websocket:subscribe-groups config-ref="WebSockets_Client_App_Config" socketId="#[attributes.socketId]" groups="#[vars.groups]" />
</flow>
</mule>
xml