ソケットの例

ソケット用 Anypoint Connector (Sockets Connector) を使用すると、UDP ソケットと TCP ソケットの両方を介してトランスポートレイヤーパッケージを送受信できます。次の例では、アプリケーションで実装するトランスポートレイヤープロトコルを考慮した、利用できる操作の使用方法と、カスタム TCP プロトコルの作成方法を示しています。

メッセージの送信

次の例では、ソケットリスナーをセットアップしてある状態でアプリケーションを作成します。このアプリケーションは、外部クライアントから特定のデータを受信すると、データベースクエリを実行し、取得した結果をメッセージとして UDP サーバーソケットに送信します。この例では UDP を選択しましたが、Send 操作では TCP ソケットと UDP ソケットの両方にメッセージを送信することができます。

	<sockets:listener-config name="Sockets_Listener_config" doc:name="Sockets Listener config">
		<sockets:tcp-listener-connection host="localhost" port="8082" >
			<sockets:protocol >
				<sockets:direct-protocol />
			</sockets:protocol>
		</sockets:tcp-listener-connection>
	</sockets:listener-config>

	<db:config name="Database_Config" doc:name="Database Config">
		<db:oracle-connection host="localhost" user="sys as sysdba" password="Oradoc_db1" serviceName="orclpdb1.localdomain" />
	</db:config>

	<sockets:request-config name="Sockets_Request_config" doc:name="Sockets Request config" >
		<sockets:udp-requester-connection host="localhost" port="8085" />
	</sockets:request-config>

	<flow name="socketSearchDatabaseAndReturnResult">
		<sockets:listener doc:name="Listener" config-ref="Sockets_Listener_config" outputMimeType="application/json"/>
		<db:select doc:name="Select" config-ref="Database_Config">
			<db:sql>#["SELECT first_name, last_name from SYSTEM.MUSICIANS where id = :id"]</db:sql>
			<db:input-parameters><![CDATA[#[{ 'id': payload.id }]]]></db:input-parameters>
		</db:select>
		<logger level="INFO" doc:name="Logger" message="#[payload]" />
		<sockets:send doc:name="Send" config-ref="Sockets_Request_config">
			<sockets:content ><![CDATA[#[output application/json --- 'name': payload[0].FIRST_NAME as String ++ ' ' ++ payload[0].LAST_NAME as String]]]></sockets:content>
		</sockets:send>
	</flow>

この例をテストする方法:

  1. 次のスクリプトで Oracle データベースを初期化します。

    CREATE TABLE SYSTEM.MUSICIANS(
        id INTEGER GENERATED BY DEFAULT AS IDENTITY,
        age INTEGER,
        first_name VARCHAR2(100),
        last_name VARCHAR2(100),
        PRIMARY KEY(id)
    );

    INSERT INTO SYSTEM.MUSICIANS(age, first_name, last_name) VALUES(37, 'Farrokh', 'Bulsara');
    INSERT INTO SYSTEM.MUSICIANS(age, first_name, last_name) VALUES(36, 'Brian Harold', 'May');
    INSERT INTO SYSTEM.MUSICIANS(age, first_name, last_name) VALUES(35, 'Roger', 'Meddows Taylor');
    INSERT INTO SYSTEM.MUSICIANS(age, first_name, last_name) VALUES(32, 'John', 'Deacon');
  1. netcat コマンド ​nc -ul 8085​ を実行して、ポート 8085 で結果の送信先となる UDP サーバーを起動します。

  1. Mule アプリケーションを実行し、netcat コマンド ​nc localhost 8082​ を実行してデータ ​{"id": 2}​ を送信することで、TCP クライアントソケットを起動して想定されるクエリパラメーターを送信します。

メッセージの送受信

ソケットは全二重であるため、両方向通信が可能であり、メッセージの送受信を同時に行うことができます。次の例の操作は、メッセージを送信するだけの例とは少し異なり、クライアントにデータを送信するだけではなく、応答も待機します。
この操作は、TCP ソケットと UDP ソケットの両方で使用できます。

操作は以下を行います。

  1. RequesterConnection に関連付けられているクライアントを使用してデータを送信します。

  2. データを受信するかタイムアウトになるまでブロックします (タイムアウトになった場合は NULL ペイロードが返されます)。

この例は、データ送信のみの例に新しいフローの ​socketSendQueryAndReceiveResult​ を追加して ​socketSearchDatabaseAndReturnResult​ からのリスナーに接続し、クエリ文字列を送信して、応答を待ちます。応答はファイルに書き込まれます。

	<sockets:listener-config name="Sockets_Listener_config" doc:name="Sockets Listener config" >
		<sockets:tcp-listener-connection host="localhost" port="8082" >
			<sockets:protocol >
				<sockets:direct-protocol />
			</sockets:protocol>
		</sockets:tcp-listener-connection>
	</sockets:listener-config>

	<sockets:listener-config name="Sockets_Listener_config1" doc:name="Sockets Listener config" >
		<sockets:tcp-listener-connection host="localhost" port="8085" >
			<sockets:protocol >
				<sockets:direct-protocol />
			</sockets:protocol>
		</sockets:tcp-listener-connection>
	</sockets:listener-config>

	<sockets:request-config name="Sockets_Request_config" doc:name="Sockets Request config" >
		<sockets:tcp-requester-connection host="localhost" port="8082" >
			<sockets:protocol >
				<sockets:direct-protocol />
			</sockets:protocol>
		</sockets:tcp-requester-connection>
	</sockets:request-config>

	<db:config name="Database_Config" doc:name="Database Config" >
		<db:oracle-connection host="localhost" user="sys as sysdba" password="Oradoc_db1" serviceName="orclpdb1.localdomain" />
	</db:config>

	<file:config name="File_Config" doc:name="File Config" >
		<file:connection />
	</file:config>

	<flow name="socket-testFlow">
		<sockets:listener doc:name="Listener" config-ref="Sockets_Listener_config" outputMimeType="application/json" />
		<db:select doc:name="Select" config-ref="Database_Config">
			<db:sql >#["SELECT first_name, last_name from SYSTEM.MUSICIANS where id = :id"]</db:sql>
			<db:input-parameters><![CDATA[#[{ 'id': payload.id }]]]></db:input-parameters>
		</db:select>
		<ee:transform doc:name="Transform Message">
			<ee:message >
				<ee:set-payload><![CDATA[%dw 2.0
					output application/json --- 'name': payload[0].FIRST_NAME as String ++ ' ' ++ payload[0].LAST_NAME as String]]>
				</ee:set-payload>
			</ee:message>
		</ee:transform>
	</flow>

	<flow name="socketSendAndReceive" >
		<sockets:listener doc:name="Listener" config-ref="Sockets_Listener_config1" outputMimeType="application/json"/>
		<sockets:send-and-receive doc:name="Send and receive" config-ref="Sockets_Request_config" outputMimeType="application/json">
		</sockets:send-and-receive>
		<logger level="INFO" doc:name="Logger" message='#[payload]'/>
		<file:write path="musicians.json" config-ref="File_Config" mode="CREATE_NEW" />
	</flow>

この例をテストするには、データ送信の例と同じデータベース設定を使用して、Mule アプリケーションを実行し、netcat コマンド ​nc localhost 8085​ を使用してエンドポイントにアクセスして、データ ​{"id":2}​ を送信します。

カスタム TCP プロトコルを作成する

次の例では、文字列ヘッダーを送信されるすべてのメッセージに追加し、すべての追加文字を切り捨てて入力メッセージサイズを固定長に制限する TCP ベースのカスタムプロトコルを作成します。

  1. TcpProtocol​ インターフェースを実装するクラスを作成します。

package org.mule.extension.socket.protocol;

import static java.lang.System.arraycopy;
import static org.slf4j.LoggerFactory.getLogger;

import org.apache.commons.io.output.ByteArrayOutputStream;
import org.mule.extension.socket.api.socket.tcp.TcpProtocol;
import org.slf4j.Logger;

import java.io.IOException;
import java.io.OutputStream;
import java.io.InputStream;
import java.io.ByteArrayInputStream;
import java.io.BufferedOutputStream;

public class CustomTestTcpProtocol implements TcpProtocol {

  protected static final int READ_ATTEMPTS = 50;
  protected static final int CUSTOM_BUFFER_SIZE = 30;
  private static final String HEADER = "This is my custom protocol.";
  private static final Logger LOGGER = getLogger(CustomTestTcpProtocol.class);

  protected int bufferSize;

  public CustomTestTcpProtocol() {
    this(CUSTOM_BUFFER_SIZE);
  }

  public CustomTestTcpProtocol(int bufferSize) {
    this.bufferSize = bufferSize;
  }

  @Override
  public InputStream read(InputStream socketIs) throws IOException {
    byte[] buffer = new byte[HEADER.length() + bufferSize];

    int bytesRead, attempts = 0;
    while ((bytesRead = socketIs.read(buffer)) <= 0 && (attempts < READ_ATTEMPTS)) {
      attempts++;
    }

    if (bytesRead <= 0) {
      throw new IOException("Number of read attempts exceeded! Failed to read any data from socket!");
    }

    ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(bytesRead);
    byteArrayOutputStream.write(buffer, 0, bytesRead);
    return new ByteArrayInputStream(byteArrayOutputStream.toByteArray());
  }

  @Override
  public void write(OutputStream os, InputStream data) throws IOException {
    byte[] messageToSend = new byte[HEADER.length() + bufferSize];
    byte[] inputPayload = new byte[bufferSize];

    int dataLength = data.read(inputPayload);
    if (dataLength == bufferSize) {
      LOGGER.warn("Data length exceeds buffer size so data will be chunked.");
    }

    arraycopy(HEADER.getBytes(), 0, messageToSend, 0, HEADER.length());

    if (dataLength >= 0) {
      arraycopy(inputPayload, 0, messageToSend, HEADER.length(), dataLength);
    }

    BufferedOutputStream writer = new BufferedOutputStream(os);
		writer.write(messageToSend, 0, HEADER.length() + dataLength);
		writer.flush();
    }
  }
}
  1. ソケットリスナーソースの ​[Connector configuration (コネクタ設定)]​ ウィンドウと、Sockets Send 操作または Send and receive 操作でクラス名を追加します。

    • ソケットリスナーソース

      1. Studio でソケットリスナーソースの ​[Global Element Properties (グローバル要素のプロパティ)]​ 設定を開きます。

      2. [General (一般)]​ タブで、​[Protocol (プロトコル)]​ を ​Custom protocol​ に設定します。

      3. [Protocol Class Name (プロトコルクラス名)]​ 項目でクラス名を追加します。

ソケットのカスタムプロトコル設定
Figure 1. ソケットリスナーのカスタムプロトコル設定
  • Sockets Send 操作または Send and receive 操作

    1. Studio で Sockets Send 操作の Send and receive 操作の ​[Global Element Properties (グローバル要素のプロパティ)]​ 設定を開きます。

    2. [Connection (接続)]​ タブで、​[Protocol (プロトコル)]​ を ​Custom protocol​ に設定します。

    3. [Protocol Class Name (プロトコルクラス名)]​ 項目でクラス名を追加します。

ソケットの送受信のカスタムプロトコル設定
Figure 2. ソケットの送受信のカスタムプロトコル設定

[Configuration XML (設定 XML)]​ タブで、​class​ パラメーターを使用して接続操作の ​sockets:custom-protocol​ セクションにクラスを追加します。

<sockets:request-config name="SocketsRequestConfigCustomTcp" >
    <sockets:tcp-requester-connection host="127.0.0.1" port="${tcp.port}" failOnUnresolvedHost="true" sendTcpNoDelay="true">
        <sockets:protocol>
            <sockets:custom-protocol class="org.mule.extension.socket.protocol.CustomTestTcpProtocol"/>
        </sockets:protocol>
    </sockets:tcp-requester-connection>
</sockets:request-config>

<sockets:listener-config name="SocketsListenerConfigCustomTcp">
    <sockets:tcp-listener-connection host="127.0.0.1" port="${tcp.port}">
        <sockets:protocol>
            <sockets:custom-protocol class="org.mule.extension.socket.protocol.CustomTestTcpProtocol"/>
        </sockets:protocol>
    </sockets:tcp-listener-connection>
</sockets:listener-config>