Flex Gateway新着情報
Governance新着情報
Monitoring API Managerソケット用 Anypoint Connector (Sockets Connector) を使用すると、UDP ソケットと TCP ソケットの両方を介してトランスポートレイヤーパッケージを送受信できます。次の例では、アプリケーションで実装するトランスポートレイヤープロトコルを考慮した、利用できる操作の使用方法と、カスタム TCP プロトコルの作成方法を示しています。
次の例では、ソケットリスナーをセットアップしてある状態でアプリケーションを作成します。このアプリケーションは、外部クライアントから特定のデータを受信すると、データベースクエリを実行し、取得した結果をメッセージとして UDP サーバーソケットに送信します。この例では UDP を選択しましたが、Send 操作では TCP ソケットと UDP ソケットの両方にメッセージを送信することができます。
<sockets:listener-config name="Sockets_Listener_config" doc:name="Sockets Listener config">
<sockets:tcp-listener-connection host="localhost" port="8082" >
<sockets:protocol >
<sockets:direct-protocol />
</sockets:protocol>
</sockets:tcp-listener-connection>
</sockets:listener-config>
<db:config name="Database_Config" doc:name="Database Config">
<db:oracle-connection host="localhost" user="sys as sysdba" password="Oradoc_db1" serviceName="orclpdb1.localdomain" />
</db:config>
<sockets:request-config name="Sockets_Request_config" doc:name="Sockets Request config" >
<sockets:udp-requester-connection host="localhost" port="8085" />
</sockets:request-config>
<flow name="socketSearchDatabaseAndReturnResult">
<sockets:listener doc:name="Listener" config-ref="Sockets_Listener_config" outputMimeType="application/json"/>
<db:select doc:name="Select" config-ref="Database_Config">
<db:sql>#["SELECT first_name, last_name from SYSTEM.MUSICIANS where id = :id"]</db:sql>
<db:input-parameters><![CDATA[#[{ 'id': payload.id }]]]></db:input-parameters>
</db:select>
<logger level="INFO" doc:name="Logger" message="#[payload]" />
<sockets:send doc:name="Send" config-ref="Sockets_Request_config">
<sockets:content ><![CDATA[#[output application/json --- 'name': payload[0].FIRST_NAME as String ++ ' ' ++ payload[0].LAST_NAME as String]]]></sockets:content>
</sockets:send>
</flow>
xml
この例をテストする方法:
次のスクリプトで Oracle データベースを初期化します。
CREATE TABLE SYSTEM.MUSICIANS( id INTEGER GENERATED BY DEFAULT AS IDENTITY, age INTEGER, first_name VARCHAR2(100), last_name VARCHAR2(100), PRIMARY KEY(id) ); INSERT INTO SYSTEM.MUSICIANS(age, first_name, last_name) VALUES(37, 'Farrokh', 'Bulsara'); INSERT INTO SYSTEM.MUSICIANS(age, first_name, last_name) VALUES(36, 'Brian Harold', 'May'); INSERT INTO SYSTEM.MUSICIANS(age, first_name, last_name) VALUES(35, 'Roger', 'Meddows Taylor'); INSERT INTO SYSTEM.MUSICIANS(age, first_name, last_name) VALUES(32, 'John', 'Deacon');
xml
netcat コマンド nc -ul 8085
を実行して、ポート 8085 で結果の送信先となる UDP サーバーを起動します。
Mule アプリケーションを実行し、netcat コマンド nc localhost 8082
を実行してデータ {"id": 2}
を送信することで、TCP クライアントソケットを起動して想定されるクエリパラメーターを送信します。
ソケットは全二重であるため、両方向通信が可能であり、メッセージの送受信を同時に行うことができます。次の例の操作は、メッセージを送信するだけの例とは少し異なり、クライアントにデータを送信するだけではなく、応答も待機します。
この操作は、TCP ソケットと UDP ソケットの両方で使用できます。
操作は以下を行います。
RequesterConnection に関連付けられているクライアントを使用してデータを送信します。
データを受信するかタイムアウトになるまでブロックします (タイムアウトになった場合は NULL ペイロードが返されます)。
この例は、データ送信のみの例に新しいフローの socketSendQueryAndReceiveResult
を追加して socketSearchDatabaseAndReturnResult
からのリスナーに接続し、クエリ文字列を送信して、応答を待ちます。応答はファイルに書き込まれます。
<sockets:listener-config name="Sockets_Listener_config" doc:name="Sockets Listener config" >
<sockets:tcp-listener-connection host="localhost" port="8082" >
<sockets:protocol >
<sockets:direct-protocol />
</sockets:protocol>
</sockets:tcp-listener-connection>
</sockets:listener-config>
<sockets:listener-config name="Sockets_Listener_config1" doc:name="Sockets Listener config" >
<sockets:tcp-listener-connection host="localhost" port="8085" >
<sockets:protocol >
<sockets:direct-protocol />
</sockets:protocol>
</sockets:tcp-listener-connection>
</sockets:listener-config>
<sockets:request-config name="Sockets_Request_config" doc:name="Sockets Request config" >
<sockets:tcp-requester-connection host="localhost" port="8082" >
<sockets:protocol >
<sockets:direct-protocol />
</sockets:protocol>
</sockets:tcp-requester-connection>
</sockets:request-config>
<db:config name="Database_Config" doc:name="Database Config" >
<db:oracle-connection host="localhost" user="sys as sysdba" password="Oradoc_db1" serviceName="orclpdb1.localdomain" />
</db:config>
<file:config name="File_Config" doc:name="File Config" >
<file:connection />
</file:config>
<flow name="socket-testFlow">
<sockets:listener doc:name="Listener" config-ref="Sockets_Listener_config" outputMimeType="application/json" />
<db:select doc:name="Select" config-ref="Database_Config">
<db:sql >#["SELECT first_name, last_name from SYSTEM.MUSICIANS where id = :id"]</db:sql>
<db:input-parameters><![CDATA[#[{ 'id': payload.id }]]]></db:input-parameters>
</db:select>
<ee:transform doc:name="Transform Message">
<ee:message >
<ee:set-payload><![CDATA[%dw 2.0
output application/json --- 'name': payload[0].FIRST_NAME as String ++ ' ' ++ payload[0].LAST_NAME as String]]>
</ee:set-payload>
</ee:message>
</ee:transform>
</flow>
<flow name="socketSendAndReceive" >
<sockets:listener doc:name="Listener" config-ref="Sockets_Listener_config1" outputMimeType="application/json"/>
<sockets:send-and-receive doc:name="Send and receive" config-ref="Sockets_Request_config" outputMimeType="application/json">
</sockets:send-and-receive>
<logger level="INFO" doc:name="Logger" message='#[payload]'/>
<file:write path="musicians.json" config-ref="File_Config" mode="CREATE_NEW" />
</flow>
xml
この例をテストするには、データ送信の例と同じデータベース設定を使用して、Mule アプリケーションを実行し、netcat コマンド nc localhost 8085
を使用してエンドポイントにアクセスして、データ {"id":2}
を送信します。
次の例では、文字列ヘッダーを送信されるすべてのメッセージに追加し、すべての追加文字を切り捨てて入力メッセージサイズを固定長に制限する TCP ベースのカスタムプロトコルを作成します。
TcpProtocol
インターフェースを実装するクラスを作成します。
package org.mule.extension.socket.protocol;
import static java.lang.System.arraycopy;
import static org.slf4j.LoggerFactory.getLogger;
import org.apache.commons.io.output.ByteArrayOutputStream;
import org.mule.extension.socket.api.socket.tcp.TcpProtocol;
import org.slf4j.Logger;
import java.io.IOException;
import java.io.OutputStream;
import java.io.InputStream;
import java.io.ByteArrayInputStream;
import java.io.BufferedOutputStream;
public class CustomTestTcpProtocol implements TcpProtocol {
protected static final int READ_ATTEMPTS = 50;
protected static final int CUSTOM_BUFFER_SIZE = 30;
private static final String HEADER = "This is my custom protocol.";
private static final Logger LOGGER = getLogger(CustomTestTcpProtocol.class);
protected int bufferSize;
public CustomTestTcpProtocol() {
this(CUSTOM_BUFFER_SIZE);
}
public CustomTestTcpProtocol(int bufferSize) {
this.bufferSize = bufferSize;
}
@Override
public InputStream read(InputStream socketIs) throws IOException {
byte[] buffer = new byte[HEADER.length() + bufferSize];
int bytesRead, attempts = 0;
while ((bytesRead = socketIs.read(buffer)) <= 0 && (attempts < READ_ATTEMPTS)) {
attempts++;
}
if (bytesRead <= 0) {
throw new IOException("Number of read attempts exceeded! Failed to read any data from socket!");
}
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(bytesRead);
byteArrayOutputStream.write(buffer, 0, bytesRead);
return new ByteArrayInputStream(byteArrayOutputStream.toByteArray());
}
@Override
public void write(OutputStream os, InputStream data) throws IOException {
byte[] messageToSend = new byte[HEADER.length() + bufferSize];
byte[] inputPayload = new byte[bufferSize];
int dataLength = data.read(inputPayload);
if (dataLength == bufferSize) {
LOGGER.warn("Data length exceeds buffer size so data will be chunked.");
}
arraycopy(HEADER.getBytes(), 0, messageToSend, 0, HEADER.length());
if (dataLength >= 0) {
arraycopy(inputPayload, 0, messageToSend, HEADER.length(), dataLength);
}
BufferedOutputStream writer = new BufferedOutputStream(os);
writer.write(messageToSend, 0, HEADER.length() + dataLength);
writer.flush();
}
}
}
java
ソケットリスナーソースの [Connector configuration (コネクタ設定)] ウィンドウと、Sockets Send 操作または Send and receive 操作でクラス名を追加します。
ソケットリスナーソース
Studio でソケットリスナーソースの [Global Element Properties (グローバル要素のプロパティ)] 設定を開きます。
[General (一般)] タブで、[Protocol (プロトコル)] を Custom protocol
に設定します。
[Protocol Class Name (プロトコルクラス名)] 項目でクラス名を追加します。
Sockets Send 操作または Send and receive 操作
Studio で Sockets Send 操作の Send and receive 操作の [Global Element Properties (グローバル要素のプロパティ)] 設定を開きます。
[Connection (接続)] タブで、[Protocol (プロトコル)] を Custom protocol
に設定します。
[Protocol Class Name (プロトコルクラス名)] 項目でクラス名を追加します。
[Configuration XML (設定 XML)] タブで、class
パラメーターを使用して接続操作の sockets:custom-protocol
セクションにクラスを追加します。
<sockets:request-config name="SocketsRequestConfigCustomTcp" >
<sockets:tcp-requester-connection host="127.0.0.1" port="${tcp.port}" failOnUnresolvedHost="true" sendTcpNoDelay="true">
<sockets:protocol>
<sockets:custom-protocol class="org.mule.extension.socket.protocol.CustomTestTcpProtocol"/>
</sockets:protocol>
</sockets:tcp-requester-connection>
</sockets:request-config>
<sockets:listener-config name="SocketsListenerConfigCustomTcp">
<sockets:tcp-listener-connection host="127.0.0.1" port="${tcp.port}">
<sockets:protocol>
<sockets:custom-protocol class="org.mule.extension.socket.protocol.CustomTestTcpProtocol"/>
</sockets:protocol>
</sockets:tcp-listener-connection>
</sockets:listener-config>
xml