
Custom TCP Protocol

When using TCP to communicate with an external program, it may be necessary to write a custom Mule protocol. The first step is to get a complete description of how the external program delimits messages within the TCP stream. The next is to implement the protocol as a Java class.

  • All protocols must implement the interface org.mule.transport.tcp.TcpProtocol, which contains three methods:

    • Object read(InputStream is) reads a message from the TCP socket.

    • void write(OutputStream os, Object data) writes a message to the TCP socket.

    • ResponseOutputStream createResponse(Socket socket) creates a stream to which a response can be written.

  • Protocols that process byte streams rather than serialized Mule messages can inherit much of the infrastructure they need by subclassing org.mule.transport.tcp.protocols.AbstractByteProtocol. This class:

    • implements createResponse

    • handles converting messages to byte arrays, allowing subclasses to implement only the simpler method writeByteArray(OutputStream os, byte[] data)

    • provides methods safeRead(InputStream is, byte[] buffer) and safeRead(InputStream is, byte[] buffer, int size) that handle the situation where data is not currently available when doing non-blocking reads from the TCP socket
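The retry behavior described for safeRead can be pictured with plain java.io classes. The following is a minimal sketch of the idea, not the Mule source: the class RetryingReader, the method readWhenAvailable, and the pause length are all hypothetical names and values chosen for illustration. It assumes that a non-blocking stream reports "nothing available yet" by returning 0 from read.

```java
import java.io.IOException;
import java.io.InputStream;

/** Hypothetical helper for illustration; not part of Mule. */
final class RetryingReader
{
    // Assumed pause between polls, in milliseconds.
    private static final long PAUSE_MILLIS = 10;

    /**
     * Read up to size bytes into buffer. While the stream returns 0
     * (no data available yet), pause and try again.
     * Returns the number of bytes read, or -1 at end of stream.
     */
    static int readWhenAvailable(InputStream is, byte[] buffer, int size) throws IOException
    {
        if (size == 0)
        {
            return 0; // nothing requested, so do not poll forever
        }
        int len;
        do
        {
            len = is.read(buffer, 0, size);
            if (len == 0)
            {
                try
                {
                    Thread.sleep(PAUSE_MILLIS);
                }
                catch (InterruptedException e)
                {
                    // Restore the interrupt flag and give up on the read.
                    Thread.currentThread().interrupt();
                    throw new IOException("Interrupted while waiting for data", e);
                }
            }
        }
        while (len == 0);
        return len;
    }
}
```

A caller only has to distinguish two outcomes: a positive count (data arrived) or -1 (the peer closed the connection). That is the contract a read method built on safeRead can rely on.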

Suppose we want to communicate with a server that has a simple protocol: all messages are terminated by >>>. The protocol class would look like this:


       
    
package org.mule.transport.tcp.integration;

import org.mule.transport.tcp.protocols.AbstractByteProtocol;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

public class CustomByteProtocol extends AbstractByteProtocol
{

    /**
     * Create a CustomByteProtocol object.
     */
    public CustomByteProtocol()
    {
        super(false); // This protocol does not support streaming.
    }

    /**
     * Write the message's bytes to the socket,
     * then terminate each message with '>>>'.
     */
    @Override
    protected void writeByteArray(OutputStream os, byte[] data) throws IOException
    {
        super.writeByteArray(os, data);
        os.write('>');
        os.write('>');
        os.write('>');
    }

    /**
     * Read bytes until we see '>>>', which ends the message.
     * Returns the message body without the terminator,
     * or null at end of stream.
     */
    @Override
    public Object read(InputStream is) throws IOException
    {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        int count = 0;
        byte read[] = new byte[1];

        while (true)
        {
            // If no bytes are currently available, safeRead()
            // will wait until some arrive.
            if (safeRead(is, read) < 0)
            {
                // We've reached EOF.  Return null, so that our
                // caller will know there are no
                // remaining messages
                return null;
            }
            byte b = read[0];
            if (b == '>')
            {
                count++;
                if (count == 3)
                {
                    return baos.toByteArray();
                }
            }
            else
            {
                for (int i = 0; i < count; i++)
                {
                    baos.write('>');
                }
                count = 0;
                baos.write(b);
            }
        }
    }
}
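
To see how this framing splits a byte stream into messages without starting Mule, the same logic can be exercised against in-memory streams. The sketch below is a stand-alone illustration, not part of the Mule distribution: the class DelimiterFramingDemo and its methods writeMessage and readMessage are hypothetical, and it uses a plain blocking InputStream.read() in place of safeRead.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

/** Hypothetical stand-alone demo; not part of Mule. */
final class DelimiterFramingDemo
{
    /** Write the payload followed by the '>>>' terminator. */
    static void writeMessage(OutputStream os, byte[] data) throws IOException
    {
        os.write(data);
        os.write(new byte[] {'>', '>', '>'});
    }

    /** Read one message; returns null once the stream is exhausted. */
    static byte[] readMessage(InputStream is) throws IOException
    {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        int count = 0; // consecutive '>' bytes seen so far
        int b;
        while ((b = is.read()) >= 0)
        {
            if (b == '>')
            {
                count++;
                if (count == 3)
                {
                    return baos.toByteArray();
                }
            }
            else
            {
                // A run of one or two '>' was payload, so put it back.
                for (int i = 0; i < count; i++)
                {
                    baos.write('>');
                }
                count = 0;
                baos.write(b);
            }
        }
        return null; // EOF before a complete terminator
    }

    public static void main(String[] args) throws IOException
    {
        ByteArrayOutputStream wire = new ByteArrayOutputStream();
        writeMessage(wire, "hello".getBytes(StandardCharsets.US_ASCII));
        writeMessage(wire, "a>b>>c".getBytes(StandardCharsets.US_ASCII));

        InputStream in = new ByteArrayInputStream(wire.toByteArray());
        byte[] msg;
        while ((msg = readMessage(in)) != null)
        {
            System.out.println(new String(msg, StandardCharsets.US_ASCII));
        }
    }
}
```

Running main prints hello and then a>b>>c, which shows that one or two '>' characters inside a payload survive the round trip. A payload that ends in '>' is ambiguous under this framing, because the reader cannot tell where the payload stops and the terminator begins; a protocol that must carry such payloads needs escaping or a length prefix.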