Initial ws handshake implemented
Signed-off-by: Martin Sustrik <sustrik@250bpm.com>
diff --git a/src/core/epbase.c b/src/core/epbase.c
index 28e0905..580a07a 100644
--- a/src/core/epbase.c
+++ b/src/core/epbase.c
@@ -76,3 +76,4 @@
void nn_epbase_stat_increment(struct nn_epbase *self, int name, int increment) {
nn_ep_stat_increment(self->ep, name, increment);
}
+
diff --git a/src/transports/ws/aws.c b/src/transports/ws/aws.c
index 1caad73..5a3c8d6 100644
--- a/src/transports/ws/aws.c
+++ b/src/transports/ws/aws.c
@@ -26,20 +26,27 @@
#include "../../utils/err.h"
#include "../../utils/cont.h"
#include "../../utils/attr.h"
+#include "../../utils/alloc.h"
+#include "../../utils/wire.h"
#define NN_AWS_STATE_IDLE 1
#define NN_AWS_STATE_ACCEPTING 2
-#define NN_AWS_STATE_ACTIVE 3
-#define NN_AWS_STATE_STOPPING_SWS 4
-#define NN_AWS_STATE_STOPPING_USOCK 5
-#define NN_AWS_STATE_DONE 6
-#define NN_AWS_STATE_STOPPING_SWS_FINAL 7
-#define NN_AWS_STATE_STOPPING 8
+#define NN_AWS_STATE_RECEIVING_WSHDR 3
+#define NN_AWS_STATE_SENDING_HDR 4
+#define NN_AWS_STATE_RECEIVING_SPHDR 5
+#define NN_AWS_STATE_ACTIVE 6
+#define NN_AWS_STATE_STOPPING_SWS 7
+#define NN_AWS_STATE_STOPPING_USOCK 8
+#define NN_AWS_STATE_DONE 9
+#define NN_AWS_STATE_STOPPING_SWS_FINAL 10
+#define NN_AWS_STATE_STOPPING 11
#define NN_AWS_SRC_USOCK 1
#define NN_AWS_SRC_SWS 2
#define NN_AWS_SRC_LISTENER 3
+#define NN_AWS_BUF_SIZE 2048
+
/* Private functions. */
static void nn_aws_handler (struct nn_fsm *self, int src, int type,
void *srcptr);
@@ -58,6 +65,7 @@
self->listener_owner.src = -1;
self->listener_owner.fsm = NULL;
nn_sws_init (&self->sws, NN_AWS_SRC_SWS, epbase, &self->fsm);
+ self->buf = NULL;
nn_fsm_event_init (&self->accepted);
nn_fsm_event_init (&self->done);
nn_list_item_init (&self->item);
@@ -71,6 +79,8 @@
nn_fsm_event_term (&self->done);
nn_fsm_event_term (&self->accepted);
nn_sws_term (&self->sws);
+ if (self->buf)
+ nn_free (self->buf);
nn_usock_term (&self->usock);
nn_fsm_term (&self->fsm);
}
@@ -142,6 +152,7 @@
NN_UNUSED void *srcptr)
{
struct nn_aws *aws;
+ struct nn_iovec iov;
int val;
size_t sz;
@@ -203,13 +214,17 @@
aws->listener_owner.fsm = NULL;
nn_fsm_raise (&aws->fsm, &aws->accepted, NN_AWS_ACCEPTED);
- /* Start the sws state machine. */
- nn_usock_activate (&aws->usock);
- nn_sws_start (&aws->sws, &aws->usock, NN_SWS_MODE_SERVER);
- aws->state = NN_AWS_STATE_ACTIVE;
+ /* Allocate the buffer to be used during the handshake. */
+ nn_assert (aws->buf == NULL);
+ aws->buf = nn_alloc (NN_AWS_BUF_SIZE, "aws handshake buffer");
+ alloc_assert (aws->buf);
- nn_epbase_stat_increment (aws->epbase,
- NN_STAT_ACCEPTED_CONNECTIONS, 1);
+ /* Start reading the request. It is at least 150 bytes long. */
+ nn_usock_activate (&aws->usock);
+ nn_assert (NN_AWS_BUF_SIZE >= 150);
+ nn_usock_recv (&aws->usock, aws->buf, 150, NULL);
+ aws->bufsz = 150;
+ aws->state = NN_AWS_STATE_RECEIVING_WSHDR;
return;
@@ -237,6 +252,138 @@
}
/******************************************************************************/
+/* RECEIVING_WSHDR state. */
+/* WebSocket connection request from the client is being read. */
+/******************************************************************************/
+ case NN_AWS_STATE_RECEIVING_WSHDR:
+ switch (src) {
+
+ case NN_AWS_SRC_USOCK:
+ switch (type) {
+ case NN_USOCK_RECEIVED:
+
+ /* Check whether WebSocket connection reply was fully read.
+ If not so, read one more byte and repeat. */
+ nn_assert (aws->bufsz >= 4);
+ if (memcmp (&aws->buf [aws->bufsz - 4], "\r\n\r\n", 4) != 0) {
+ nn_assert (NN_AWS_BUF_SIZE >= aws->bufsz + 1);
+ nn_usock_recv (&aws->usock,
+ &aws->buf [aws->bufsz], 1, NULL);
+ ++aws->bufsz;
+ return;
+ }
+
+ /* TODO: Validate the request. */
+
+ /* Send the WebSocket connection reply. */
+ iov.iov_base = aws->buf;
+ iov.iov_len = snprintf (aws->buf, NN_AWS_BUF_SIZE,
+ "HTTP/1.1 101 Switching Protocols\r\n"
+ "Upgrade: websocket\r\n"
+ "Connection: Upgrade\r\n"
+ "Sec-WebSocket-Accept: %s\r\n"
+ "Sec-WebSocket-Protocol: sp\r\n\r\n",
+ "accept_key" /* TODO */);
+ nn_assert (iov.iov_len + 10 <= NN_AWS_BUF_SIZE);
+
+ /* Bundle SP header with the request. */
+ sz = sizeof (val);
+ nn_epbase_getopt (aws->epbase, NN_SOL_SOCKET, NN_PROTOCOL,
+ &val, &sz);
+ nn_assert (sz == sizeof (val));
+ aws->buf [iov.iov_len] = 0x82;
+ aws->buf [iov.iov_len + 1] = 0x08;
+ aws->buf [iov.iov_len + 2] = 0x00;
+ aws->buf [iov.iov_len + 3] = 'S';
+ aws->buf [iov.iov_len + 4] = 'P';
+ aws->buf [iov.iov_len + 5] = 0x00;
+ nn_puts (&aws->buf [iov.iov_len + 6], val);
+ aws->buf [iov.iov_len + 8] = 0x00;
+ aws->buf [iov.iov_len + 9] = 0x00;
+ iov.iov_len += 10;
+
+ /* Send it to the peer. */
+ nn_usock_send (&aws->usock, &iov, 1);
+ aws->state = NN_AWS_STATE_SENDING_HDR;
+
+ return;
+
+ case NN_USOCK_ERROR:
+ nn_usock_stop (&aws->usock);
+ aws->state = NN_AWS_STATE_STOPPING_USOCK;
+ return;
+ default:
+ nn_fsm_bad_action (aws->state, src, type);
+ }
+
+ default:
+ nn_fsm_bad_source (aws->state, src, type);
+ }
+
+/******************************************************************************/
+/* SENDING_HDR state. */
+/* WebSocket connection reply is being sent along with SP protocol header. */
+/******************************************************************************/
+ case NN_AWS_STATE_SENDING_HDR:
+ switch (src) {
+
+ case NN_AWS_SRC_USOCK:
+ switch (type) {
+ case NN_USOCK_SENT:
+
+ /* Reply is sent. Now read the SP protocol header. */
+ nn_assert (NN_AWS_BUF_SIZE >= 14);
+ nn_usock_recv (&aws->usock, aws->buf, 14, NULL);
+ aws->bufsz = 14;
+ aws->state = NN_AWS_STATE_RECEIVING_SPHDR;
+ return;
+
+ case NN_USOCK_ERROR:
+ nn_usock_stop (&aws->usock);
+ aws->state = NN_AWS_STATE_STOPPING_USOCK;
+ return;
+ default:
+ nn_fsm_bad_action (aws->state, src, type);
+ }
+
+ default:
+ nn_fsm_bad_source (aws->state, src, type);
+ }
+
+/******************************************************************************/
+/* RECEIVING_SPHDR state. */
+/* SP protocol header is being read. */
+/******************************************************************************/
+ case NN_AWS_STATE_RECEIVING_SPHDR:
+ switch (src) {
+
+ case NN_AWS_SRC_USOCK:
+ switch (type) {
+ case NN_USOCK_RECEIVED:
+
+ /* TODO: Validate the SP header. */
+
+ /* Start the sws state machine. */
+ nn_free (aws->buf);
+ aws->buf = NULL;
+ nn_sws_start (&aws->sws, &aws->usock, NN_SWS_MODE_SERVER);
+ aws->state = NN_AWS_STATE_ACTIVE;
+ nn_epbase_stat_increment (aws->epbase,
+ NN_STAT_ACCEPTED_CONNECTIONS, 1);
+ return;
+ case NN_USOCK_ERROR:
+ nn_usock_stop (&aws->usock);
+ aws->state = NN_AWS_STATE_STOPPING_USOCK;
+ return;
+ default:
+ nn_fsm_bad_action (aws->state, src, type);
+ }
+
+ default:
+ nn_fsm_bad_source (aws->state, src, type);
+ }
+
+/******************************************************************************/
/* ACTIVE state. */
/******************************************************************************/
case NN_AWS_STATE_ACTIVE:
diff --git a/src/transports/ws/aws.h b/src/transports/ws/aws.h
index 45772f9..e787d15 100644
--- a/src/transports/ws/aws.h
+++ b/src/transports/ws/aws.h
@@ -61,6 +61,15 @@
/* State machine that takes care of the connection in the active state. */
struct nn_sws sws;
+ /* This buffer is used to store both incoming WebSocket connection request
+ and outgoing reply. It's a pointer rather than static buffer so that
+ it can be deallocated after the handshake. */
+ uint8_t *buf;
+
+ /* When reading to buf, this number indicates how many bytes are already
+ read. */
+ size_t bufsz;
+
/* Events generated by aws state machine. */
struct nn_fsm_event accepted;
struct nn_fsm_event done;
diff --git a/src/transports/ws/cws.c b/src/transports/ws/cws.c
index b99255c..bc47a30 100644
--- a/src/transports/ws/cws.c
+++ b/src/transports/ws/cws.c
@@ -23,6 +23,7 @@
#include "cws.h"
#include "sws.h"
+#include "masker.h"
#include "../../ws.h"
@@ -41,6 +42,8 @@
#include "../../utils/fast.h"
#include "../../utils/int.h"
#include "../../utils/attr.h"
+#include "../../utils/random.h"
+#include "../../utils/wire.h"
#include <string.h>
@@ -56,19 +59,24 @@
#define NN_CWS_STATE_RESOLVING 2
#define NN_CWS_STATE_STOPPING_DNS 3
#define NN_CWS_STATE_CONNECTING 4
-#define NN_CWS_STATE_ACTIVE 5
-#define NN_CWS_STATE_STOPPING_SWS 6
-#define NN_CWS_STATE_STOPPING_USOCK 7
-#define NN_CWS_STATE_WAITING 8
-#define NN_CWS_STATE_STOPPING_BACKOFF 9
-#define NN_CWS_STATE_STOPPING_SWS_FINAL 10
-#define NN_CWS_STATE_STOPPING 11
+#define NN_CWS_STATE_SENDING_HDR 5
+#define NN_CWS_STATE_RECEIVING_WSHDR 6
+#define NN_CWS_STATE_RECEIVING_SPHDR 7
+#define NN_CWS_STATE_ACTIVE 8
+#define NN_CWS_STATE_STOPPING_SWS 9
+#define NN_CWS_STATE_STOPPING_USOCK 10
+#define NN_CWS_STATE_WAITING 11
+#define NN_CWS_STATE_STOPPING_BACKOFF 12
+#define NN_CWS_STATE_STOPPING_SWS_FINAL 13
+#define NN_CWS_STATE_STOPPING 14
#define NN_CWS_SRC_USOCK 1
#define NN_CWS_SRC_RECONNECT_TIMER 2
#define NN_CWS_SRC_DNS 3
#define NN_CWS_SRC_SWS 4
+#define NN_CWS_BUF_SIZE 2048
+
struct nn_cws {
/* The state machine. */
@@ -85,6 +93,15 @@
/* Used to wait before retrying to connect. */
struct nn_backoff retry;
+ /* This buffer is used to store both outgoing WebSocket connection request
+ and incoming reply. It's a pointer rather than static buffer so that
+ it can be deallocated after the handshake. */
+ uint8_t *buf;
+
+ /* When reading to buf, this number indicates how many bytes are already
+ read. */
+ size_t bufsz;
+
/* State machine that handles the active part of the connection
lifetime. */
struct nn_sws sws;
@@ -197,6 +214,7 @@
reconnect_ivl_max = reconnect_ivl;
nn_backoff_init (&self->retry, NN_CWS_SRC_RECONNECT_TIMER,
reconnect_ivl, reconnect_ivl_max, &self->fsm);
+ self->buf = NULL;
nn_sws_init (&self->sws, NN_CWS_SRC_SWS, &self->epbase, &self->fsm);
nn_dns_init (&self->dns, NN_CWS_SRC_DNS, &self->fsm);
@@ -230,7 +248,8 @@
nn_usock_term (&cws->usock);
nn_fsm_term (&cws->fsm);
nn_epbase_term (&cws->epbase);
-
+ if (cws->buf)
+ nn_free (cws->buf);
nn_free (cws);
}
@@ -275,6 +294,10 @@
NN_UNUSED void *srcptr)
{
struct nn_cws *cws;
+ struct nn_iovec iov;
+ size_t sz;
+ int protocol;
+ struct nn_masker masker;
cws = nn_cont (self, struct nn_cws, fsm);
@@ -357,6 +380,169 @@
case NN_CWS_SRC_USOCK:
switch (type) {
case NN_USOCK_CONNECTED:
+
+ /* Allocate the buffer to be used during the handshake. */
+ nn_assert (cws->buf == NULL);
+ cws->buf = nn_alloc (NN_CWS_BUF_SIZE, "cws handshake buffer");
+ alloc_assert (cws->buf);
+
+ /* Create WebSocket connection request. */
+ iov.iov_base = cws->buf;
+ iov.iov_len = snprintf (cws->buf, NN_CWS_BUF_SIZE,
+ "GET / HTTP/1.1\r\n"
+ "Host: %s\r\n"
+ "Upgrade: websocket\r\n"
+ "Connection: Upgrade\r\n"
+ "Sec-WebSocket-Key: %s\r\n"
+ "Sec-WebSocket-Version: 13\r\n"
+ "Sec-WebSocket-Protocol: sp\r\n\r\n",
+ "TODO", /* TODO: The host part of the connection string. */
+ "encoded_key" /* TODO */);
+ nn_assert (iov.iov_len + 14 <= NN_CWS_BUF_SIZE);
+
+ /* Bundle SP header with the request. */
+ sz = sizeof (protocol);
+ nn_epbase_getopt (&cws->epbase, NN_SOL_SOCKET, NN_PROTOCOL,
+ &protocol, &sz);
+ nn_assert (sz == sizeof (protocol));
+ cws->buf [iov.iov_len] = 0x82;
+ cws->buf [iov.iov_len + 1] = 0x88;
+ nn_random_generate (&cws->buf [iov.iov_len + 2], 4);
+ cws->buf [iov.iov_len + 6] = 0x00;
+ cws->buf [iov.iov_len + 7] = 'S';
+ cws->buf [iov.iov_len + 8] = 'P';
+ cws->buf [iov.iov_len + 9] = 0x00;
+ nn_puts (&cws->buf [iov.iov_len + 10], protocol);
+ cws->buf [iov.iov_len + 12] = 0x00;
+ cws->buf [iov.iov_len + 13] = 0x00;
+ nn_masker_init (&masker, &cws->buf [iov.iov_len + 2]);
+ nn_masker_mask (&masker, &cws->buf [iov.iov_len + 6], 8);
+ iov.iov_len += 14;
+
+ /* Send it to the peer. */
+ nn_usock_send (&cws->usock, &iov, 1);
+ cws->state = NN_CWS_STATE_SENDING_HDR;
+ return;
+
+ case NN_USOCK_ERROR:
+ nn_epbase_set_error (&cws->epbase,
+ nn_usock_geterrno (&cws->usock));
+ nn_usock_stop (&cws->usock);
+ cws->state = NN_CWS_STATE_STOPPING_USOCK;
+ nn_epbase_stat_increment (&cws->epbase,
+ NN_STAT_INPROGRESS_CONNECTIONS, -1);
+ nn_epbase_stat_increment (&cws->epbase,
+ NN_STAT_CONNECT_ERRORS, 1);
+ return;
+ default:
+ nn_fsm_bad_action (cws->state, src, type);
+ }
+
+ default:
+ nn_fsm_bad_source (cws->state, src, type);
+ }
+
+/******************************************************************************/
+/* SENDING_HDR state. */
+/* WebSocket request (with bundled SP header) is being sent. */
+/******************************************************************************/
+ case NN_CWS_STATE_SENDING_HDR:
+ switch (src) {
+
+ case NN_CWS_SRC_USOCK:
+ switch (type) {
+ case NN_USOCK_SENT:
+
+ /* Start reading the reply. It is at least 15 bytes long. */
+ nn_assert (NN_CWS_BUF_SIZE >= 15);
+ nn_usock_recv (&cws->usock, cws->buf, 15, NULL);
+ cws->bufsz = 15;
+ cws->state = NN_CWS_STATE_RECEIVING_WSHDR;
+ return;
+
+ case NN_USOCK_ERROR:
+ nn_epbase_set_error (&cws->epbase,
+ nn_usock_geterrno (&cws->usock));
+ nn_usock_stop (&cws->usock);
+ cws->state = NN_CWS_STATE_STOPPING_USOCK;
+ nn_epbase_stat_increment (&cws->epbase,
+ NN_STAT_INPROGRESS_CONNECTIONS, -1);
+ nn_epbase_stat_increment (&cws->epbase,
+ NN_STAT_CONNECT_ERRORS, 1);
+ return;
+ default:
+ nn_fsm_bad_action (cws->state, src, type);
+ }
+
+ default:
+ nn_fsm_bad_source (cws->state, src, type);
+ }
+
+/******************************************************************************/
+/* RECEIVING_WSHDR state. */
+/* WebSocket reply is being received. */
+/******************************************************************************/
+ case NN_CWS_STATE_RECEIVING_WSHDR:
+ switch (src) {
+
+ case NN_CWS_SRC_USOCK:
+ switch (type) {
+ case NN_USOCK_RECEIVED:
+
+ /* Check whether WebSocket connection request was fully read.
+ If not so, read one more byte and repeat. */
+ nn_assert (cws->bufsz >= 4);
+ if (memcmp (&cws->buf [cws->bufsz - 4], "\r\n\r\n", 4) != 0) {
+ nn_assert (NN_CWS_BUF_SIZE >= cws->bufsz + 1);
+ nn_usock_recv (&cws->usock,
+ &cws->buf [cws->bufsz], 1, NULL);
+ ++cws->bufsz;
+ return;
+ }
+
+ /* TODO: Do reply validation here. */
+
+ /* When WebSocket response was received, read SP header. */
+ nn_assert (NN_CWS_BUF_SIZE >= 10);
+ nn_usock_recv (&cws->usock, cws->buf, 10, NULL);
+ cws->bufsz = 10;
+ cws->state = NN_CWS_STATE_RECEIVING_SPHDR;
+ return;
+
+ case NN_USOCK_ERROR:
+ nn_epbase_set_error (&cws->epbase,
+ nn_usock_geterrno (&cws->usock));
+ nn_usock_stop (&cws->usock);
+ cws->state = NN_CWS_STATE_STOPPING_USOCK;
+ nn_epbase_stat_increment (&cws->epbase,
+ NN_STAT_INPROGRESS_CONNECTIONS, -1);
+ nn_epbase_stat_increment (&cws->epbase,
+ NN_STAT_CONNECT_ERRORS, 1);
+ return;
+ default:
+ nn_fsm_bad_action (cws->state, src, type);
+ }
+
+ default:
+ nn_fsm_bad_source (cws->state, src, type);
+ }
+
+/******************************************************************************/
+/* RECEIVING_SPHDR state. */
+/* SP protocol header is being received. */
+/******************************************************************************/
+ case NN_CWS_STATE_RECEIVING_SPHDR:
+ switch (src) {
+
+ case NN_CWS_SRC_USOCK:
+ switch (type) {
+ case NN_USOCK_RECEIVED:
+
+ /* TODO: Do SP header validation here. */
+
+ /* Start normal communication. */
+ nn_free (cws->buf);
+ cws->buf = NULL;
nn_sws_start (&cws->sws, &cws->usock, NN_SWS_MODE_CLIENT);
cws->state = NN_CWS_STATE_ACTIVE;
nn_epbase_stat_increment (&cws->epbase,
@@ -365,6 +551,7 @@
NN_STAT_ESTABLISHED_CONNECTIONS, 1);
nn_epbase_clear_error (&cws->epbase);
return;
+
case NN_USOCK_ERROR:
nn_epbase_set_error (&cws->epbase,
nn_usock_geterrno (&cws->usock));