Initial ws handshake implemented

Signed-off-by: Martin Sustrik <sustrik@250bpm.com>
diff --git a/src/core/epbase.c b/src/core/epbase.c
index 28e0905..580a07a 100644
--- a/src/core/epbase.c
+++ b/src/core/epbase.c
@@ -76,3 +76,4 @@
 void nn_epbase_stat_increment(struct nn_epbase *self, int name, int increment) {
     nn_ep_stat_increment(self->ep, name, increment);
 }
+
diff --git a/src/transports/ws/aws.c b/src/transports/ws/aws.c
index 1caad73..5a3c8d6 100644
--- a/src/transports/ws/aws.c
+++ b/src/transports/ws/aws.c
@@ -26,20 +26,27 @@
 #include "../../utils/err.h"
 #include "../../utils/cont.h"
 #include "../../utils/attr.h"
+#include "../../utils/alloc.h"
+#include "../../utils/wire.h"
 
 #define NN_AWS_STATE_IDLE 1
 #define NN_AWS_STATE_ACCEPTING 2
-#define NN_AWS_STATE_ACTIVE 3
-#define NN_AWS_STATE_STOPPING_SWS 4
-#define NN_AWS_STATE_STOPPING_USOCK 5
-#define NN_AWS_STATE_DONE 6
-#define NN_AWS_STATE_STOPPING_SWS_FINAL 7
-#define NN_AWS_STATE_STOPPING 8
+#define NN_AWS_STATE_RECEIVING_WSHDR 3
+#define NN_AWS_STATE_SENDING_HDR 4
+#define NN_AWS_STATE_RECEIVING_SPHDR 5
+#define NN_AWS_STATE_ACTIVE 6
+#define NN_AWS_STATE_STOPPING_SWS 7
+#define NN_AWS_STATE_STOPPING_USOCK 8
+#define NN_AWS_STATE_DONE 9
+#define NN_AWS_STATE_STOPPING_SWS_FINAL 10
+#define NN_AWS_STATE_STOPPING 11
 
 #define NN_AWS_SRC_USOCK 1
 #define NN_AWS_SRC_SWS 2
 #define NN_AWS_SRC_LISTENER 3
 
+#define NN_AWS_BUF_SIZE 2048
+
 /*  Private functions. */
 static void nn_aws_handler (struct nn_fsm *self, int src, int type,
     void *srcptr);
@@ -58,6 +65,7 @@
     self->listener_owner.src = -1;
     self->listener_owner.fsm = NULL;
     nn_sws_init (&self->sws, NN_AWS_SRC_SWS, epbase, &self->fsm);
+    self->buf = NULL;
     nn_fsm_event_init (&self->accepted);
     nn_fsm_event_init (&self->done);
     nn_list_item_init (&self->item);
@@ -71,6 +79,8 @@
     nn_fsm_event_term (&self->done);
     nn_fsm_event_term (&self->accepted);
     nn_sws_term (&self->sws);
+    if (self->buf)
+        nn_free (self->buf);
     nn_usock_term (&self->usock);
     nn_fsm_term (&self->fsm);
 }
@@ -142,6 +152,7 @@
     NN_UNUSED void *srcptr)
 {
     struct nn_aws *aws;
+    struct nn_iovec iov;
     int val;
     size_t sz;
 
@@ -203,13 +214,17 @@
                 aws->listener_owner.fsm = NULL;
                 nn_fsm_raise (&aws->fsm, &aws->accepted, NN_AWS_ACCEPTED);
 
-                /*  Start the sws state machine. */
-                nn_usock_activate (&aws->usock);
-                nn_sws_start (&aws->sws, &aws->usock, NN_SWS_MODE_SERVER);
-                aws->state = NN_AWS_STATE_ACTIVE;
+                /*  Allocate the buffer to be used during the handshake. */
+                nn_assert (aws->buf == NULL);
+                aws->buf = nn_alloc (NN_AWS_BUF_SIZE, "aws handshake buffer");
+                alloc_assert (aws->buf);
 
-                nn_epbase_stat_increment (aws->epbase,
-                    NN_STAT_ACCEPTED_CONNECTIONS, 1);
+                /*  Start reading the request. It is at least 150 bytes long. */
+                nn_usock_activate (&aws->usock);
+                nn_assert (NN_AWS_BUF_SIZE >= 150);
+                nn_usock_recv (&aws->usock, aws->buf, 150, NULL);
+                aws->bufsz = 150;
+                aws->state = NN_AWS_STATE_RECEIVING_WSHDR;
 
                 return;
 
@@ -237,6 +252,138 @@
         }
 
 /******************************************************************************/
+/*  RECEIVING_WSHDR state.                                                    */
+/*  WebSocket connection request from the client is being read.               */
+/******************************************************************************/
+    case NN_AWS_STATE_RECEIVING_WSHDR:
+        switch (src) {
+
+        case NN_AWS_SRC_USOCK:
+            switch (type) {
+            case NN_USOCK_RECEIVED:
+
+                /*  Check whether WebSocket connection reply was fully read.
+                    If not so, read one more byte and repeat. */
+                nn_assert (aws->bufsz >= 4);
+                if (memcmp (&aws->buf [aws->bufsz - 4], "\r\n\r\n", 4) != 0) {
+                    nn_assert (NN_AWS_BUF_SIZE >= aws->bufsz + 1);
+                    nn_usock_recv (&aws->usock,
+                        &aws->buf [aws->bufsz], 1, NULL);
+                    ++aws->bufsz;
+                    return;
+                }
+
+                /*  TODO: Validate the request. */
+
+                /*  Send the WebSocket connection reply. */
+                iov.iov_base = aws->buf;
+                iov.iov_len = snprintf (aws->buf, NN_AWS_BUF_SIZE,
+                        "HTTP/1.1 101 Switching Protocols\r\n"
+                        "Upgrade: websocket\r\n"
+                        "Connection: Upgrade\r\n"
+                        "Sec-WebSocket-Accept: %s\r\n"
+                        "Sec-WebSocket-Protocol: sp\r\n\r\n",
+                    "accept_key" /* TODO */);
+                nn_assert (iov.iov_len + 10 <= NN_AWS_BUF_SIZE);
+
+                /*  Bundle SP header with the request. */
+                sz = sizeof (val);
+                nn_epbase_getopt (aws->epbase, NN_SOL_SOCKET, NN_PROTOCOL,
+                    &val, &sz);
+                nn_assert (sz == sizeof (val));
+                aws->buf [iov.iov_len] = 0x82;
+                aws->buf [iov.iov_len + 1] = 0x08;
+                aws->buf [iov.iov_len + 2] = 0x00;
+                aws->buf [iov.iov_len + 3] = 'S';
+                aws->buf [iov.iov_len + 4] = 'P';
+                aws->buf [iov.iov_len + 5] = 0x00;
+                nn_puts (&aws->buf [iov.iov_len + 6], val);
+                aws->buf [iov.iov_len + 8] = 0x00;
+                aws->buf [iov.iov_len + 9] = 0x00;
+                iov.iov_len += 10;
+
+                /*  Send it to the peer. */
+                nn_usock_send (&aws->usock, &iov, 1);
+                aws->state = NN_AWS_STATE_SENDING_HDR;
+
+                return;
+
+            case NN_USOCK_ERROR:
+                nn_usock_stop (&aws->usock);
+                aws->state = NN_AWS_STATE_STOPPING_USOCK;
+                return;
+            default:
+                nn_fsm_bad_action (aws->state, src, type);
+            }
+
+        default:
+            nn_fsm_bad_source (aws->state, src, type);
+        }
+
+/******************************************************************************/
+/*  SENDING_HDR state.                                                        */
+/*  WebSocket connection reply is being sent along with SP protocol header.   */
+/******************************************************************************/
+    case NN_AWS_STATE_SENDING_HDR:
+        switch (src) {
+
+        case NN_AWS_SRC_USOCK:
+            switch (type) {
+            case NN_USOCK_SENT:
+
+                /*  Reply is sent. Now read the SP protocol header. */
+                nn_assert (NN_AWS_BUF_SIZE >= 14);
+                nn_usock_recv (&aws->usock, aws->buf, 14, NULL);
+                aws->bufsz = 14;
+                aws->state = NN_AWS_STATE_RECEIVING_SPHDR;
+                return;
+
+            case NN_USOCK_ERROR:
+                nn_usock_stop (&aws->usock);
+                aws->state = NN_AWS_STATE_STOPPING_USOCK;
+                return;
+            default:
+                nn_fsm_bad_action (aws->state, src, type);
+            }
+
+        default:
+            nn_fsm_bad_source (aws->state, src, type);
+        }
+
+/******************************************************************************/
+/*  RECEIVING_SPHDR state.                                                    */
+/*  SP protocol header is being read.                                         */
+/******************************************************************************/
+    case NN_AWS_STATE_RECEIVING_SPHDR:
+        switch (src) {
+
+        case NN_AWS_SRC_USOCK:
+            switch (type) {
+            case NN_USOCK_RECEIVED:
+
+                /*  TODO: Validate the SP header. */
+                
+                /*  Start the sws state machine. */
+                nn_free (aws->buf);
+                aws->buf = NULL;
+                nn_sws_start (&aws->sws, &aws->usock, NN_SWS_MODE_SERVER);
+                aws->state = NN_AWS_STATE_ACTIVE;                
+                nn_epbase_stat_increment (aws->epbase,
+                    NN_STAT_ACCEPTED_CONNECTIONS, 1);
+                return;
+            case NN_USOCK_ERROR:
+                nn_usock_stop (&aws->usock);
+                aws->state = NN_AWS_STATE_STOPPING_USOCK;
+                return;
+            default:
+                nn_fsm_bad_action (aws->state, src, type);
+            }
+
+        default:
+            nn_fsm_bad_source (aws->state, src, type);
+        }
+
+/******************************************************************************/
 /*  ACTIVE state.                                                             */
 /******************************************************************************/
     case NN_AWS_STATE_ACTIVE:
diff --git a/src/transports/ws/aws.h b/src/transports/ws/aws.h
index 45772f9..e787d15 100644
--- a/src/transports/ws/aws.h
+++ b/src/transports/ws/aws.h
@@ -61,6 +61,15 @@
     /*  State machine that takes care of the connection in the active state. */
     struct nn_sws sws;
 
+    /*  This buffer is used to store both incoming WebSocket connection request
+        and outgoing reply. It's a pointer rather than static buffer so that
+        it can be deallocated after the handshake. */
+    uint8_t *buf;
+
+    /*  When reading to buf, this number indicates how many bytes are already
+        read. */
+    size_t bufsz;
+
     /*  Events generated by aws state machine. */
     struct nn_fsm_event accepted;
     struct nn_fsm_event done;
diff --git a/src/transports/ws/cws.c b/src/transports/ws/cws.c
index b99255c..bc47a30 100644
--- a/src/transports/ws/cws.c
+++ b/src/transports/ws/cws.c
@@ -23,6 +23,7 @@
 
 #include "cws.h"
 #include "sws.h"
+#include "masker.h"
 
 #include "../../ws.h"
 
@@ -41,6 +42,8 @@
 #include "../../utils/fast.h"
 #include "../../utils/int.h"
 #include "../../utils/attr.h"
+#include "../../utils/random.h"
+#include "../../utils/wire.h"
 
 #include <string.h>
 
@@ -56,19 +59,24 @@
 #define NN_CWS_STATE_RESOLVING 2
 #define NN_CWS_STATE_STOPPING_DNS 3
 #define NN_CWS_STATE_CONNECTING 4
-#define NN_CWS_STATE_ACTIVE 5
-#define NN_CWS_STATE_STOPPING_SWS 6
-#define NN_CWS_STATE_STOPPING_USOCK 7
-#define NN_CWS_STATE_WAITING 8
-#define NN_CWS_STATE_STOPPING_BACKOFF 9
-#define NN_CWS_STATE_STOPPING_SWS_FINAL 10
-#define NN_CWS_STATE_STOPPING 11
+#define NN_CWS_STATE_SENDING_HDR 5
+#define NN_CWS_STATE_RECEIVING_WSHDR 6
+#define NN_CWS_STATE_RECEIVING_SPHDR 7
+#define NN_CWS_STATE_ACTIVE 8
+#define NN_CWS_STATE_STOPPING_SWS 9
+#define NN_CWS_STATE_STOPPING_USOCK 10
+#define NN_CWS_STATE_WAITING 11
+#define NN_CWS_STATE_STOPPING_BACKOFF 12
+#define NN_CWS_STATE_STOPPING_SWS_FINAL 13
+#define NN_CWS_STATE_STOPPING 14
 
 #define NN_CWS_SRC_USOCK 1
 #define NN_CWS_SRC_RECONNECT_TIMER 2
 #define NN_CWS_SRC_DNS 3
 #define NN_CWS_SRC_SWS 4
 
+#define NN_CWS_BUF_SIZE 2048
+
 struct nn_cws {
 
     /*  The state machine. */
@@ -85,6 +93,15 @@
     /*  Used to wait before retrying to connect. */
     struct nn_backoff retry;
 
+    /*  This buffer is used to store both outgoing WebSocket connection request
+        and incoming reply. It's a pointer rather than static buffer so that
+        it can be deallocated after the handshake. */
+    uint8_t *buf;
+
+    /*  When reading to buf, this number indicates how many bytes are already
+        read. */
+    size_t bufsz;
+
     /*  State machine that handles the active part of the connection
         lifetime. */
     struct nn_sws sws;
@@ -197,6 +214,7 @@
         reconnect_ivl_max = reconnect_ivl;
     nn_backoff_init (&self->retry, NN_CWS_SRC_RECONNECT_TIMER,
         reconnect_ivl, reconnect_ivl_max, &self->fsm);
+    self->buf = NULL;
     nn_sws_init (&self->sws, NN_CWS_SRC_SWS, &self->epbase, &self->fsm);
     nn_dns_init (&self->dns, NN_CWS_SRC_DNS, &self->fsm);
 
@@ -230,7 +248,8 @@
     nn_usock_term (&cws->usock);
     nn_fsm_term (&cws->fsm);
     nn_epbase_term (&cws->epbase);
-
+    if (cws->buf)
+        nn_free (cws->buf);
     nn_free (cws);
 }
 
@@ -275,6 +294,10 @@
     NN_UNUSED void *srcptr)
 {
     struct nn_cws *cws;
+    struct nn_iovec iov;
+    size_t sz;
+    int protocol;
+    struct nn_masker masker;
 
     cws = nn_cont (self, struct nn_cws, fsm);
 
@@ -357,6 +380,169 @@
         case NN_CWS_SRC_USOCK:
             switch (type) {
             case NN_USOCK_CONNECTED:
+
+                /*  Allocate the buffer to be used during the handshake. */
+                nn_assert (cws->buf == NULL);
+                cws->buf = nn_alloc (NN_CWS_BUF_SIZE, "cws handshake buffer");
+                alloc_assert (cws->buf);
+
+                /*  Create WebSocket connection request. */
+                iov.iov_base = cws->buf;
+                iov.iov_len = snprintf (cws->buf, NN_CWS_BUF_SIZE,
+                        "GET / HTTP/1.1\r\n"
+                        "Host: %s\r\n"
+                        "Upgrade: websocket\r\n"
+                        "Connection: Upgrade\r\n"
+                        "Sec-WebSocket-Key: %s\r\n"
+                        "Sec-WebSocket-Version: 13\r\n"
+                        "Sec-WebSocket-Protocol: sp\r\n\r\n",
+                    "TODO", /*  TODO: The host part of the connection string. */
+                    "encoded_key" /*  TODO */);
+                nn_assert (iov.iov_len + 14 <= NN_CWS_BUF_SIZE);
+
+                /*  Bundle SP header with the request. */
+                sz = sizeof (protocol);
+                nn_epbase_getopt (&cws->epbase, NN_SOL_SOCKET, NN_PROTOCOL,
+                    &protocol, &sz);
+                nn_assert (sz == sizeof (protocol));
+                cws->buf [iov.iov_len] = 0x82;
+                cws->buf [iov.iov_len + 1] = 0x88;
+                nn_random_generate (&cws->buf [iov.iov_len + 2], 4);
+                cws->buf [iov.iov_len + 6] = 0x00;
+                cws->buf [iov.iov_len + 7] = 'S';
+                cws->buf [iov.iov_len + 8] = 'P';
+                cws->buf [iov.iov_len + 9] = 0x00;
+                nn_puts (&cws->buf [iov.iov_len + 10], protocol);
+                cws->buf [iov.iov_len + 12] = 0x00;
+                cws->buf [iov.iov_len + 13] = 0x00;
+                nn_masker_init (&masker, &cws->buf [iov.iov_len + 2]);
+                nn_masker_mask (&masker, &cws->buf [iov.iov_len + 6], 8);
+                iov.iov_len += 14;
+
+                /*  Send it to the peer. */
+                nn_usock_send (&cws->usock, &iov, 1);
+                cws->state = NN_CWS_STATE_SENDING_HDR;
+                return;
+
+            case NN_USOCK_ERROR:
+                nn_epbase_set_error (&cws->epbase,
+                    nn_usock_geterrno (&cws->usock));
+                nn_usock_stop (&cws->usock);
+                cws->state = NN_CWS_STATE_STOPPING_USOCK;
+                nn_epbase_stat_increment (&cws->epbase,
+                    NN_STAT_INPROGRESS_CONNECTIONS, -1);
+                nn_epbase_stat_increment (&cws->epbase,
+                    NN_STAT_CONNECT_ERRORS, 1);
+                return;
+            default:
+                nn_fsm_bad_action (cws->state, src, type);
+            }
+
+        default:
+            nn_fsm_bad_source (cws->state, src, type);
+        }
+
+/******************************************************************************/
+/*  SENDING_HDR state.                                                        */
+/*  WebSocket request (with bundled SP header) is being sent.                 */
+/******************************************************************************/
+    case NN_CWS_STATE_SENDING_HDR:
+        switch (src) {
+
+        case NN_CWS_SRC_USOCK:
+            switch (type) {
+            case NN_USOCK_SENT:
+
+                /*  Start reading the reply. It is at least 15 bytes long. */
+                nn_assert (NN_CWS_BUF_SIZE >= 15);
+                nn_usock_recv (&cws->usock, cws->buf, 15, NULL);
+                cws->bufsz = 15;
+                cws->state = NN_CWS_STATE_RECEIVING_WSHDR;
+                return;
+
+            case NN_USOCK_ERROR:
+                nn_epbase_set_error (&cws->epbase,
+                    nn_usock_geterrno (&cws->usock));
+                nn_usock_stop (&cws->usock);
+                cws->state = NN_CWS_STATE_STOPPING_USOCK;
+                nn_epbase_stat_increment (&cws->epbase,
+                    NN_STAT_INPROGRESS_CONNECTIONS, -1);
+                nn_epbase_stat_increment (&cws->epbase,
+                    NN_STAT_CONNECT_ERRORS, 1);
+                return;
+            default:
+                nn_fsm_bad_action (cws->state, src, type);
+            }
+
+        default:
+            nn_fsm_bad_source (cws->state, src, type);
+        }
+
+/******************************************************************************/
+/*  RECEIVING_WSHDR state.                                                    */
+/*  WebSocket reply is being received.                                        */
+/******************************************************************************/
+    case NN_CWS_STATE_RECEIVING_WSHDR:
+        switch (src) {
+
+        case NN_CWS_SRC_USOCK:
+            switch (type) {
+            case NN_USOCK_RECEIVED:
+
+                /*  Check whether WebSocket connection request was fully read.
+                    If not so, read one more byte and repeat. */
+                nn_assert (cws->bufsz >= 4);
+                if (memcmp (&cws->buf [cws->bufsz - 4], "\r\n\r\n", 4) != 0) {
+                    nn_assert (NN_CWS_BUF_SIZE >= cws->bufsz + 1);
+                    nn_usock_recv (&cws->usock,
+                        &cws->buf [cws->bufsz], 1, NULL);
+                    ++cws->bufsz;
+                    return;
+                }
+
+                /*  TODO: Do reply validation here. */
+
+                /*  When WebSocket response was received, read SP header. */
+                nn_assert (NN_CWS_BUF_SIZE >= 10);
+                nn_usock_recv (&cws->usock, cws->buf, 10, NULL);
+                cws->bufsz = 10;
+                cws->state = NN_CWS_STATE_RECEIVING_SPHDR;
+                return;
+
+            case NN_USOCK_ERROR:
+                nn_epbase_set_error (&cws->epbase,
+                    nn_usock_geterrno (&cws->usock));
+                nn_usock_stop (&cws->usock);
+                cws->state = NN_CWS_STATE_STOPPING_USOCK;
+                nn_epbase_stat_increment (&cws->epbase,
+                    NN_STAT_INPROGRESS_CONNECTIONS, -1);
+                nn_epbase_stat_increment (&cws->epbase,
+                    NN_STAT_CONNECT_ERRORS, 1);
+                return;
+            default:
+                nn_fsm_bad_action (cws->state, src, type);
+            }
+
+        default:
+            nn_fsm_bad_source (cws->state, src, type);
+        }
+
+/******************************************************************************/
+/*  RECEIVING_SPHDR state.                                                    */
+/*  SP protocol header is being received.                                     */
+/******************************************************************************/
+    case NN_CWS_STATE_RECEIVING_SPHDR:
+        switch (src) {
+
+        case NN_CWS_SRC_USOCK:
+            switch (type) {
+            case NN_USOCK_RECEIVED:
+
+                /* TODO: Do SP header validation here. */
+
+                /*  Start normal communication. */
+                nn_free (cws->buf);
+                cws->buf = NULL;
                 nn_sws_start (&cws->sws, &cws->usock, NN_SWS_MODE_CLIENT);
                 cws->state = NN_CWS_STATE_ACTIVE;
                 nn_epbase_stat_increment (&cws->epbase,
@@ -365,6 +551,7 @@
                     NN_STAT_ESTABLISHED_CONNECTIONS, 1);
                 nn_epbase_clear_error (&cws->epbase);
                 return;
+
             case NN_USOCK_ERROR:
                 nn_epbase_set_error (&cws->epbase,
                     nn_usock_geterrno (&cws->usock));