By dave | February 6, 2016

Using sockets for client / server development

Following on from “Using sockets for a character based stream”, we now introduce the idea of messaging.

When we need to send commands between two systems (often referred to as client and server) we normally use message based communication; this was briefly discussed in the data communications introduction. For this example we will use Java, as it’s freely available and there are many good IDEs. Note that this code is not ready for production use and is purely for learning purposes.

You may want to download the example source from GitHub. Otherwise, there’s quite a lot to type in!

messages on stream protocol

Above: diagram showing the basics of messages at the data level. At the very least the header would contain the length of the message and the message type. Without these two pieces of information, we could not process the rest of the message data. A CRC is optional, and is often skipped on sockets because TCP already delivers the bytes in order and checksums each segment.

In contrast to the last example, where we sent raw character data, we now see structure in the data stream. We need to make sure that we are at the right point in the stream before reading, or else our messages would not make sense. This is why it is normal to send the message length at the beginning: even if we can’t process a message, we have a chance of skipping it and moving on.
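To make the skipping idea concrete, here is a minimal sketch of a length-prefixed frame and a reader that steps over message types it does not know. The class and method names (FrameSkipSketch, putFrame, readKnownTypes) and the exact field layout are assumptions made for illustration, not the Message class presented later, and the reader assumes the buffer holds only complete frames.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical names throughout; this is not the article's Message class.
final class FrameSkipSketch {

    // Writes one frame: [int length][short typeLength][type bytes][payload bytes].
    // The length counts everything after the 4-byte length field itself.
    static void putFrame(ByteBuffer out, String type, byte[] payload) {
        byte[] typeBytes = type.getBytes(StandardCharsets.UTF_8);
        out.putInt(2 + typeBytes.length + payload.length);
        out.putShort((short) typeBytes.length);
        out.put(typeBytes);
        out.put(payload);
    }

    // Walks a buffer that is ready for reading and returns the types it recognised.
    // Assumes the buffer contains complete frames only.
    static List<String> readKnownTypes(ByteBuffer in, List<String> knownTypes) {
        List<String> seen = new ArrayList<>();
        while (in.remaining() >= 4) {
            int length = in.getInt();
            int frameEnd = in.position() + length;

            byte[] typeBytes = new byte[in.getShort()];
            in.get(typeBytes);
            String type = new String(typeBytes, StandardCharsets.UTF_8);

            if (knownTypes.contains(type)) {
                seen.add(type);
            }
            // Known or not, the length tells us where the next frame starts.
            in.position(frameEnd);
        }
        return seen;
    }

    public static void main(String[] args) {
        ByteBuffer buffer = ByteBuffer.allocate(256);
        putFrame(buffer, "ping", new byte[0]);
        putFrame(buffer, "mystery", new byte[] {1, 2, 3});
        putFrame(buffer, "pong", new byte[0]);
        buffer.flip();

        // prints [ping, pong]
        System.out.println(readKnownTypes(buffer, Arrays.asList("ping", "pong")));
    }
}
```

The "mystery" frame is never decoded; because its length was read first, the reader can jump straight to the frame that follows it.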

Once we’ve got the message infrastructure in place, the next job is to either design a protocol suitable for sending messages or use a ready built messaging system. For the purpose of this tutorial we will build a very simple protocol for requesting the time and responding to the request. For this we’ll need two messages: a time-request that the client sends, and a time-response that the server uses to reply with the time. To avoid issues with byte ordering between systems we will use the ByteBuffer class. If you’re not familiar with it, I’m currently working on a basic article about this.
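As a quick illustration of why ByteBuffer helps with byte ordering, the sketch below encodes the same int in both orders. A newly created ByteBuffer is always big-endian, whatever the native order of the machine, so two systems that both use the default will agree on the bytes. The class and helper names (ByteOrderSketch, encode, hex) are made up for this example.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

// Hypothetical helper class, for illustration only.
final class ByteOrderSketch {

    // Encodes an int using the given byte order and returns the four raw bytes.
    static byte[] encode(int value, ByteOrder order) {
        ByteBuffer buffer = ByteBuffer.allocate(4).order(order);
        buffer.putInt(value);
        return buffer.array();
    }

    // Formats bytes as space separated two digit hex values.
    static String hex(byte[] bytes) {
        StringBuilder sb = new StringBuilder();
        for (byte b : bytes) {
            sb.append(String.format("%02x ", b));
        }
        return sb.toString().trim();
    }

    public static void main(String[] args) {
        System.out.println(ByteBuffer.allocate(4).order());            // BIG_ENDIAN
        System.out.println(hex(encode(258, ByteOrder.BIG_ENDIAN)));    // 00 00 01 02
        System.out.println(hex(encode(258, ByteOrder.LITTLE_ENDIAN))); // 02 01 00 00
    }
}
```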

Below is the definition of the messages

Message type: time-request
Parameters: time zone as string

Message type: time-response
Parameters: current-time as string, formatted in the requested time zone.

At this point we’ll delve into the examples. I’m assuming you’ve downloaded the source using the link at the start of this article, although the classes are mainly repeated here, so you could copy / paste or even copy them out by hand instead.

First we need to define the Message class, which for this example also takes care of all activities to do with sending and receiving messages. It is able to send messages on the socket and decode a message that has been received. Of particular note is the ensureBytesAvailable method, which is called both before reading the length and before reading the message data. This method makes sure we have enough data in the buffer before starting to decode.
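The idea of reading until enough bytes are available can be tried without a real socket. The sketch below is a simplified variation rather than the article's method: it uses a made-up in-memory channel (trickle) that hands over only a few bytes per read, much as a TCP stream may split one message across several reads, and a hypothetical readAtLeast helper that expects the buffer in write mode and leaves it in read mode.

```java
import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;
import java.nio.charset.StandardCharsets;

// Hypothetical names; a simplified stand-in for reading from a SocketChannel.
final class ReadFullySketch {

    // A fake "socket" that returns at most chunkSize bytes per read call.
    static ReadableByteChannel trickle(byte[] data, int chunkSize) {
        ByteBuffer source = ByteBuffer.wrap(data);
        return new ReadableByteChannel() {
            private boolean open = true;
            @Override public int read(ByteBuffer dst) {
                if (!source.hasRemaining()) return -1;
                int n = Math.min(chunkSize, Math.min(source.remaining(), dst.remaining()));
                for (int i = 0; i < n; i++) dst.put(source.get());
                return n;
            }
            @Override public boolean isOpen() { return open; }
            @Override public void close() { open = false; }
        };
    }

    // Expects the buffer in write mode (position = bytes held so far) and leaves
    // it in read mode once at least `required` bytes are available.
    static void readAtLeast(ReadableByteChannel in, ByteBuffer buffer, int required) throws IOException {
        if (required > buffer.capacity()) {
            throw new IOException("Message of " + required + " bytes exceeds buffer capacity");
        }
        while (buffer.position() < required) {
            if (in.read(buffer) < 0) {
                throw new EOFException("Stream ended after " + buffer.position() + " bytes");
            }
        }
        buffer.flip();
    }

    public static void main(String[] args) throws IOException {
        // wire data: a 4 byte length (5) followed by the 5 byte body "hello"
        byte[] wire = ByteBuffer.allocate(9).putInt(5)
                .put("hello".getBytes(StandardCharsets.US_ASCII)).array();
        ReadableByteChannel channel = trickle(wire, 3);
        ByteBuffer buffer = ByteBuffer.allocate(64);

        readAtLeast(channel, buffer, 4);      // enough for the length field
        int length = buffer.getInt();
        buffer.compact();                     // keep unread bytes, back to write mode
        readAtLeast(channel, buffer, length); // enough for the body

        byte[] body = new byte[length];
        buffer.get(body);
        System.out.println(new String(body, StandardCharsets.US_ASCII)); // hello
    }
}
```

Here the caller compacts explicitly between the two reads, which makes the switch between read mode and write mode visible at the call site.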

Sending a message is also a bit of a chicken-and-egg problem, because we need to prepend the message length, which we do not know until the message has been placed into the byte buffer. We get around this by having two buffers, one for the length and one for the message.
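The two buffer approach is what the Message class in this article uses. For comparison, another common way around the same problem is to use a single buffer, leave a gap for the length, write the message, and then fill the gap in afterwards with an absolute putInt. The sketch below shows that alternative; the class and method names (LengthPatchSketch, frame) are hypothetical and the payload is just a string for brevity.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical alternative to the two buffer approach, for illustration only.
final class LengthPatchSketch {

    // Builds [int length][payload] in one buffer by leaving a 4-byte gap,
    // writing the payload, then filling in the gap with an absolute putInt.
    static ByteBuffer frame(String payload) {
        ByteBuffer buffer = ByteBuffer.allocate(2048);
        buffer.position(4);                                   // reserve room for the length
        buffer.put(payload.getBytes(StandardCharsets.UTF_8)); // body goes straight after it
        buffer.putInt(0, buffer.position() - 4);              // absolute put: position is unchanged
        buffer.flip();                                        // ready to hand to channel.write
        return buffer;
    }

    public static void main(String[] args) {
        ByteBuffer framed = frame("Europe/London");
        System.out.println("total bytes: " + framed.remaining()); // total bytes: 17
        System.out.println("length field: " + framed.getInt(0));  // length field: 13
    }
}
```

Either approach produces the same bytes on the wire; the single buffer version avoids a second allocation, while the two buffer version keeps the message encoding unaware of the length prefix.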

Message.java

package com.thecoderscorner.example.datacomms.messagestream;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.logging.Logger;

/**
 * Message is the base class of all messages to be sent or received over the socket.
 * In this class we present the full set of functions needed to send and receive data
 * over a socket. In a real world case, we'd probably want better separation of concerns
 * but this is more than satisfactory for an example.
 *
 * Message has two abstract methods that must be implemented, these two methods handle
 * conversion to and from a byte buffer.
 */
public abstract class Message {
    private final static Logger LOGGER = Logger.getLogger("MESSAGE");

    /**
     * Must be implemented by all sub classes to convert the bytes in the buffer
     * into the fields in this message object.
     * @param buffer the byte buffer containing the message
     */
    public abstract void fromBytes(ByteBuffer buffer);

    /**
     * Must be implemented by all sub classes to convert the message into
     * bytes in the buffer.
     * @param buffer the byte buffer to receive the message data.
     */
    public abstract void toBytes(ByteBuffer buffer);

    /**
     * We use the simple class name (without package) as the message type.
     * This is not entirely efficient, but will suffice for this example.
     * @return the message type.
     */
    private String messageType() {return getClass().getSimpleName();}

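    /**
     * Converts a string into a message field in the buffer passed in. The field
     * is written as a two byte length followed by the UTF-8 encoded text.
     * @param buffer the buffer that the message is being built in
     * @param str the string to be written
     * @throws IllegalArgumentException if the encoded string exceeds 65535 bytes
     */
    public static void stringToMsg(ByteBuffer buffer, String str) {
        // name the charset explicitly so both ends agree, whatever their platform default
        byte[] bytes = str.getBytes(java.nio.charset.StandardCharsets.UTF_8);
        int len = bytes.length;
        if(len > 0xFFFF) {
            throw new IllegalArgumentException("String field too long: " + len);
        }
        buffer.putShort((short) len);
        buffer.put(bytes);
    }

    /**
     * Converts a message field from the buffer into a string.
     * @param buffer the message as a buffer
     * @return the string field
     */
    public static String stringFromMsg(ByteBuffer buffer) {
        // the length is an unsigned 16 bit value, so mask off any sign extension
        int len = buffer.getShort() & 0xFFFF;
        byte[] bytes = new byte[len];
        buffer.get(bytes);
        return new String(bytes, java.nio.charset.StandardCharsets.UTF_8);
    }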

    /**
     * Reads a single message from the socket, returning it as a sub class of Message
     * @param socket socket to read from
     * @param dataBuffer the data buffer to use
     * @return a message if it could be parsed
     * @throws IOException if the message could not be converted.
     */
    public static Message nextMessageFromSocket(SocketChannel socket, ByteBuffer dataBuffer) throws IOException {

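        // read the first 4 bytes to get the message length.
        ensureBytesAvailable(socket, dataBuffer, 4);
        int length = dataBuffer.getInt();

        // a negative length, or one that cannot fit in the buffer, means the
        // stream is corrupt or the message is too large for us to handle.
        if(length < 0 || length > dataBuffer.capacity()) {
            throw new IOException("Invalid message length: " + length);
        }

        // read the rest of the message (as denoted by length)
        ensureBytesAvailable(socket, dataBuffer, length);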

        // we now get the message type from the payload and see what type of message to create.
        // In a real world example, we may have a message factory that did this for us.
        String type = stringFromMsg(dataBuffer);
        Message msg = null;
        if(type.equals(TimeRequestMsg.class.getSimpleName())) {
            msg = new TimeRequestMsg();
        }
        else if(type.equals(TimeResponseMsg.class.getSimpleName())) {
            msg = new TimeResponseMsg();
        }

        // if we couldn't convert the message, bail out here
        if(msg == null) {
            throw new IOException("Unknown message type: " + type);
        }

        // message's fromBytes is now used to recover the rest of the fields from the payload
        msg.fromBytes(dataBuffer);

        LOGGER.info("Message read from socket: " + msg);

        return msg;

    }

    /**
     * Send any message derived from Message base class on the socket.
     * @param channel the channel on which the message is sent
     * @param toSend the message to send.
     * @throws IOException if there is a problem during writing.
     */
    public static void sendMessage(SocketChannel channel, Message toSend) throws IOException {

        // we need to put the message type into the buffer first.
        ByteBuffer bbMsg = ByteBuffer.allocate(2048);
        stringToMsg(bbMsg, toSend.messageType());

        // and then any extra fields for this type of message
        toSend.toBytes(bbMsg);
        bbMsg.flip();

        // now we need to encode the length into a different buffer.
        ByteBuffer bbOverall = ByteBuffer.allocate(10);
        bbOverall.putInt(bbMsg.remaining());
        bbOverall.flip();

        // and lastly, we write the length, followed by the message.
        long written = channel.write(new ByteBuffer[]{bbOverall, bbMsg});

        LOGGER.info("Message written to socket: " + toSend + ", length was: " + written);
    }

    /**
     * When we are reading messages from the wire, we need to ensure there are
     * enough bytes in the buffer to fully decode the message. If not we keep
     * reading until we have enough.
     * @param socket the socket to read from
     * @param buffer the buffer to store the bytes
     * @param required the amount of data required.
     * @throws IOException if the socket closes or errors out.
     */
    private static void ensureBytesAvailable(SocketChannel socket, ByteBuffer buffer, int required) throws IOException {
        // a non-zero position means the buffer was left in read mode by an earlier
        // decode, so compact it: unread bytes move to the front, ready for writing again.
        if(buffer.position() != 0) {
            buffer.compact();
        }

        // we loop until we have enough data to decode the message
        while(buffer.position() < required) {

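            // try and read; -1 means the other end closed the connection, and 0
            // means no progress could be made (for example the buffer is full).
            int len = socket.read(buffer);
            if(!socket.isOpen() || len <= 0) {
                throw new IOException("Socket closed while reading");
            }

            // position() is the number of bytes held so far, remaining() would be the free space.
            LOGGER.info("Bytes now in buffer: " + buffer.position() + " read from socket: " + len);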
        }

        // and finally, prepare the buffer for reading.
        buffer.flip();

    }
}
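The comment in nextMessageFromSocket mentions that a real world system might use a message factory instead of the if / else chain. One way that could look is sketched below, using a map from type name to a constructor reference. Everything here (MessageFactorySketch, Msg, TimeRequest, TimeResponse, register, create) is a hypothetical stand-in so the sketch compiles on its own; it is not part of the example source.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical factory, for illustration only.
final class MessageFactorySketch {

    // Minimal stand-ins for the article's message classes.
    interface Msg {}
    static final class TimeRequest implements Msg {}
    static final class TimeResponse implements Msg {}

    private final Map<String, Supplier<? extends Msg>> creators = new HashMap<>();

    // Associates a wire type name with a way of creating an empty message of that type.
    void register(String type, Supplier<? extends Msg> creator) {
        creators.put(type, creator);
    }

    // Creates an empty message for the type name read from the wire.
    Msg create(String type) {
        Supplier<? extends Msg> creator = creators.get(type);
        if (creator == null) {
            throw new IllegalArgumentException("Unknown message type: " + type);
        }
        return creator.get();
    }

    public static void main(String[] args) {
        MessageFactorySketch factory = new MessageFactorySketch();
        factory.register("TimeRequest", TimeRequest::new);
        factory.register("TimeResponse", TimeResponse::new);

        System.out.println(factory.create("TimeResponse").getClass().getSimpleName()); // TimeResponse
    }
}
```

With this shape, adding a new message type means one extra register call rather than another branch in the decoding method.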

TimeRequestMsg.java

Alongside Message, we have the two sub-classes (the actual messages). The first is the request message that is sent from the client to the server.

package com.thecoderscorner.example.datacomms.messagestream;

import java.nio.ByteBuffer;
import java.util.TimeZone;

/**
 * Represents the TimeRequest message that can be sent to the server. This is the
 * java form of the message.
 */
public class TimeRequestMsg extends Message {
    private String timeZone;

    public String getTimeZone() {
        return timeZone;
    }

    public void setTimezone(TimeZone zone) {
        timeZone = zone.getID();
    }


    @Override
    public String toString() {
        return "TimeRequestMsg{timeZone='" + timeZone + "'}";
    }

    /** converts the raw message into this message object */
    public void fromBytes(ByteBuffer message) {
        timeZone = stringFromMsg(message);
    }

    /** converts the message into raw bytes. */
    public void toBytes(ByteBuffer buffer) {
        stringToMsg(buffer, timeZone);
    }

}

TimeResponseMsg.java

This is the response message that is sent from the server to the client. Notice that structurally there is no difference from the request; both messages are technically bidirectional, so either side could send either one.

package com.thecoderscorner.example.datacomms.messagestream;

import java.nio.ByteBuffer;

/**
 * Represents the TimeResponse message sent from the server to the client. This
 * is the java form of the message.
 */
public class TimeResponseMsg extends Message {

    private String currentTime;

    public TimeResponseMsg() {
    }

    public TimeResponseMsg(String currentTime) {
        this.currentTime = currentTime;
    }

    public String getCurrentTime() {
        return currentTime;
    }

    @Override
    public String toString() {
        return "TimeResponseMsg{currentTime='" + currentTime + "'}";
    }


    /** converts the raw message into this message object */
    public void fromBytes(ByteBuffer message) {
        currentTime = stringFromMsg(message);
    }

    /** converts the message into raw bytes. */
    public void toBytes(ByteBuffer buffer) {
        stringToMsg(buffer, currentTime);
    }

}

TimeRequestServer.java

Next is the server side of the communications. It awaits and accepts a connection; once one is established it immediately waits for a request to arrive, then sends back the time in response.

package com.thecoderscorner.example.datacomms.messagestream;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.text.DateFormat;
import java.util.Date;
import java.util.TimeZone;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * This is the time request server, it accepts connections on the known port and
 * awaits a TimeRequestMsg. Once this is received, it responds with a TimeResponseMsg.
 *
 * Designed for use with the data communications section at http://www.thecoderscorner.com/data-comms
 */
public class TimeRequestServer {
    // this is the port on which we will accept connections.
    public static final int SERVER_PORT = 5000;

    private final Logger logger = Logger.getLogger("SERVER");

    public static void main(String[] args) throws IOException {
        TimeRequestServer server = new TimeRequestServer();
        server.start();
    }

    private void start() throws IOException {
        // we firstly create a server socket on our known port, which allows
        // our client to connect. This time we use the channel api.
        ServerSocketChannel serverSocket = ServerSocketChannel.open();
        serverSocket.bind(new InetSocketAddress("localhost", SERVER_PORT));

        logger.info("Listening on " + SERVER_PORT);

        // while our process is running
        while (!Thread.currentThread().isInterrupted()) {

            try {
                // attempt to accept a connection on our server port.
                SocketChannel socket = serverSocket.accept();

                // we have a connection, let's service it.
                processSocket(socket);

            } catch (Exception e) {
                logger.log(Level.SEVERE, "socket error", e);
            }
        }
        serverSocket.close();
    }

    private void processSocket(SocketChannel socket) throws IOException {
        ByteBuffer dataBuffer = ByteBuffer.allocate(2048);

        // it's customary to log out who's just connected.
        logger.info("Socket opened to " + socket.getRemoteAddress());

        // we read a message from the socket, and ensure it is a time request
        Message msg = Message.nextMessageFromSocket(socket, dataBuffer);
        if(msg instanceof TimeRequestMsg) {

            // get the timezone from the message and prepare the response.
            logger.info("Received time request from client, servicing");
            String zone = ((TimeRequestMsg) msg).getTimeZone();
            TimeZone tz = TimeZone.getTimeZone(zone);
            DateFormat dateFormat = DateFormat.getDateTimeInstance();
            dateFormat.setTimeZone(tz);

            // and write out the message to the socket, job done,
            // another happy client!
            TimeResponseMsg response = new TimeResponseMsg(dateFormat.format(new Date()));
            Message.sendMessage(socket, response);
        }
        else {
            logger.severe("Unexpected message " + msg);
        }

        // as in the client example, this close should really be
        // wrapped in a finally, omitted for clarity.
        socket.close();
        logger.info("Socket closed");
    }


}
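The comments in both the server and the client note that the close call really belongs in a finally block. Since channels implement Closeable, a try-with-resources statement is one way to get that behaviour. The sketch below uses an in-memory channel as a stand-in for a SocketChannel so it can run without a network; the class and method names (CloseSketch, readOneInt) are assumptions for illustration only.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;

// Hypothetical example of closing a channel reliably.
final class CloseSketch {

    // Reads one int from the channel. The try-with-resources statement closes
    // the channel on the way out, whether we return normally or throw.
    static int readOneInt(ReadableByteChannel channel) throws IOException {
        try (ReadableByteChannel ch = channel) {
            ByteBuffer buffer = ByteBuffer.allocate(4);
            while (buffer.hasRemaining()) {
                if (ch.read(buffer) < 0) {
                    throw new IOException("Stream ended early");
                }
            }
            buffer.flip();
            return buffer.getInt();
        }
    }

    public static void main(String[] args) {
        // only two bytes are available, so reading a four byte int must fail
        ReadableByteChannel tooShort =
                Channels.newChannel(new ByteArrayInputStream(new byte[] {1, 2}));
        try {
            readOneInt(tooShort);
        } catch (IOException e) {
            System.out.println("failed: " + e.getMessage()); // failed: Stream ended early
        }
        System.out.println("still open? " + tooShort.isOpen()); // still open? false
    }
}
```

The same pattern would apply to processSocket: declaring the SocketChannel in the try header means it is closed even when decoding throws part way through.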

 

TimeRequestClient.java

Lastly, the client establishes a connection to the server and sends the request. It then immediately waits for a response and prints it out.

package com.thecoderscorner.example.datacomms.messagestream;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.TimeZone;
import java.util.logging.Logger;

/**
 * A time request client that makes the request to the server and waits for the result.
 * Once the result arrives the client exits.
 *
 * Designed for use with the data communications section at http://www.thecoderscorner.com/data-comms
 */
public class TimeRequestClient {
    Logger logger = Logger.getLogger("CLIENT");

    public static void main(String[] args) throws IOException {
        TimeRequestClient client = new TimeRequestClient();
        client.start();
    }

    private void start() throws IOException {

        // open the socket and connect to the server on the known port.
        SocketChannel channel = SocketChannel.open();
        channel.connect(new InetSocketAddress("localhost", TimeRequestServer.SERVER_PORT));

        logger.info("Connected to server: " + channel.getRemoteAddress());


        // if we have an open channel, then let's do the request.
        if(channel.isOpen()) {
            ByteBuffer byteBuffer = ByteBuffer.allocate(2048);

            // we formulate the request message and send it to the server.
            TimeRequestMsg msg = new TimeRequestMsg();
            msg.setTimezone(TimeZone.getDefault());
            Message.sendMessage(channel, msg);

            // we then await the server's response.
            Message response = Message.nextMessageFromSocket(channel, byteBuffer);
            if(! (response instanceof TimeResponseMsg)) {
                // didn't get what we expected!
                throw new IOException("Unexpected response from server:" + response);
            }
            logger.info("Response: " + response);
        }

        // don't forget to close the channel. In a real world example this
        // would be in a finally block, to ensure it's always called.
        channel.close();

        logger.info("channel closed");
    }
}

 
