fixes #581 WebSocket transport lagging features of TCP Transport
diff --git a/src/transports/ws/aws.c b/src/transports/ws/aws.c
index 2a20649..bb025e4 100644
--- a/src/transports/ws/aws.c
+++ b/src/transports/ws/aws.c
@@ -1,6 +1,6 @@
/*
Copyright (c) 2012-2013 250bpm s.r.o. All rights reserved.
- Copyright (c) 2014 Wirebird Labs LLC. All rights reserved.
+ Copyright (c) 2014-2016 Jack R. Dunaway. All rights reserved.
Copyright 2015 Garrett D'Amore <garrett@damore.org>
Permission is hereby granted, free of charge, to any person obtaining a copy
@@ -313,7 +313,8 @@
case NN_USOCK_SHUTDOWN:
return;
case NN_USOCK_STOPPED:
- nn_aws_stop (aws);
+ nn_fsm_raise (&aws->fsm, &aws->done, NN_AWS_ERROR);
+ aws->state = NN_AWS_STATE_DONE;
return;
default:
nn_fsm_bad_action (aws->state, src, type);
@@ -330,4 +331,3 @@
nn_fsm_bad_state (aws->state, src, type);
}
}
-
diff --git a/src/transports/ws/bws.c b/src/transports/ws/bws.c
index 1468473..cc7321c 100644
--- a/src/transports/ws/bws.c
+++ b/src/transports/ws/bws.c
@@ -1,6 +1,6 @@
/*
Copyright (c) 2012-2013 250bpm s.r.o. All rights reserved.
- Copyright (c) 2014 Wirebird Labs LLC. All rights reserved.
+ Copyright (c) 2014-2016 Jack R. Dunaway. All rights reserved.
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"),
@@ -30,6 +30,8 @@
#include "../../aio/fsm.h"
#include "../../aio/usock.h"
+#include "../utils/backoff.h"
+
#include "../../utils/err.h"
#include "../../utils/cont.h"
#include "../../utils/alloc.h"
@@ -54,9 +56,14 @@
#define NN_BWS_STATE_STOPPING_AWS 3
#define NN_BWS_STATE_STOPPING_USOCK 4
#define NN_BWS_STATE_STOPPING_AWSS 5
+#define NN_BWS_STATE_LISTENING 6
+#define NN_BWS_STATE_WAITING 7
+#define NN_BWS_STATE_CLOSING 8
+#define NN_BWS_STATE_STOPPING_BACKOFF 9
#define NN_BWS_SRC_USOCK 1
#define NN_BWS_SRC_AWS 2
+#define NN_BWS_SRC_RECONNECT_TIMER 3
struct nn_bws {
@@ -76,6 +83,9 @@
/* List of accepted connections. */
struct nn_list awss;
+
+ /* Timer used to throttle reconnection attempts. */
+ struct nn_backoff retry;
};
/* nn_epbase virtual interface implementation. */
@@ -105,6 +115,9 @@
size_t sslen;
int ipv4only;
size_t ipv4onlylen;
+ int reconnect_ivl;
+ int reconnect_ivl_max;
+ size_t sz;
/* Allocate the new endpoint object. */
self = nn_alloc (sizeof (struct nn_bws), "bws");
@@ -145,6 +158,18 @@
nn_fsm_init_root (&self->fsm, nn_bws_handler, nn_bws_shutdown,
nn_epbase_getctx (&self->epbase));
self->state = NN_BWS_STATE_IDLE;
+ sz = sizeof (reconnect_ivl);
+ nn_epbase_getopt (&self->epbase, NN_SOL_SOCKET, NN_RECONNECT_IVL,
+ &reconnect_ivl, &sz);
+ nn_assert (sz == sizeof (reconnect_ivl));
+ sz = sizeof (reconnect_ivl_max);
+ nn_epbase_getopt (&self->epbase, NN_SOL_SOCKET, NN_RECONNECT_IVL_MAX,
+ &reconnect_ivl_max, &sz);
+ nn_assert (sz == sizeof (reconnect_ivl_max));
+ if (reconnect_ivl_max == 0)
+ reconnect_ivl_max = reconnect_ivl;
+ nn_backoff_init (&self->retry, NN_BWS_SRC_RECONNECT_TIMER,
+ reconnect_ivl, reconnect_ivl_max, &self->fsm);
nn_usock_init (&self->usock, NN_BWS_SRC_USOCK, &self->fsm);
self->aws = NULL;
nn_list_init (&self->awss);
@@ -177,6 +202,7 @@
nn_list_term (&bws->awss);
nn_assert (bws->aws == NULL);
nn_usock_term (&bws->usock);
+ nn_backoff_term (&bws->retry);
nn_epbase_term (&bws->epbase);
nn_fsm_term (&bws->fsm);
@@ -193,8 +219,14 @@
bws = nn_cont (self, struct nn_bws, fsm);
if (nn_slow (src == NN_FSM_ACTION && type == NN_FSM_STOP)) {
- nn_aws_stop (bws->aws);
- bws->state = NN_BWS_STATE_STOPPING_AWS;
+ nn_backoff_stop (&bws->retry);
+ if (bws->aws) {
+ nn_aws_stop (bws->aws);
+ bws->state = NN_BWS_STATE_STOPPING_AWS;
+ }
+ else {
+ bws->state = NN_BWS_STATE_STOPPING_USOCK;
+ }
}
if (nn_slow (bws->state == NN_BWS_STATE_STOPPING_AWS)) {
if (!nn_aws_isidle (bws->aws))
@@ -206,7 +238,7 @@
bws->state = NN_BWS_STATE_STOPPING_USOCK;
}
if (nn_slow (bws->state == NN_BWS_STATE_STOPPING_USOCK)) {
- if (!nn_usock_isidle (&bws->usock))
+ if (!nn_usock_isidle (&bws->usock) || !nn_backoff_isidle (&bws->retry))
return;
for (it = nn_list_begin (&bws->awss);
it != nn_list_end (&bws->awss);
@@ -260,8 +292,6 @@
switch (type) {
case NN_FSM_START:
nn_bws_start_listening (bws);
- nn_bws_start_accepting (bws);
- bws->state = NN_BWS_STATE_ACTIVE;
return;
default:
nn_fsm_bad_action (bws->state, src, type);
@@ -314,6 +344,71 @@
}
/******************************************************************************/
+/* CLOSING_USOCK state. */
+/* usock object was asked to stop but it hasn't stopped yet. */
+/******************************************************************************/
+ case NN_BWS_STATE_CLOSING:
+ switch (src) {
+
+ case NN_BWS_SRC_USOCK:
+ switch (type) {
+ case NN_USOCK_SHUTDOWN:
+ return;
+ case NN_USOCK_STOPPED:
+ nn_backoff_start (&bws->retry);
+ bws->state = NN_BWS_STATE_WAITING;
+ return;
+ default:
+ nn_fsm_bad_action (bws->state, src, type);
+ }
+
+ default:
+ nn_fsm_bad_source (bws->state, src, type);
+ }
+
+/******************************************************************************/
+/* WAITING state. */
+/* Waiting before re-bind is attempted. This way we won't overload */
+/* the system by continuous re-bind attempts. */
+/******************************************************************************/
+ case NN_BWS_STATE_WAITING:
+ switch (src) {
+
+ case NN_BWS_SRC_RECONNECT_TIMER:
+ switch (type) {
+ case NN_BACKOFF_TIMEOUT:
+ nn_backoff_stop (&bws->retry);
+ bws->state = NN_BWS_STATE_STOPPING_BACKOFF;
+ return;
+ default:
+ nn_fsm_bad_action (bws->state, src, type);
+ }
+
+ default:
+ nn_fsm_bad_source (bws->state, src, type);
+ }
+
+/******************************************************************************/
+/* STOPPING_BACKOFF state. */
+/* backoff object was asked to stop, but it haven't stopped yet. */
+/******************************************************************************/
+ case NN_BWS_STATE_STOPPING_BACKOFF:
+ switch (src) {
+
+ case NN_BWS_SRC_RECONNECT_TIMER:
+ switch (type) {
+ case NN_BACKOFF_STOPPED:
+ nn_bws_start_listening (bws);
+ return;
+ default:
+ nn_fsm_bad_action (bws->state, src, type);
+ }
+
+ default:
+ nn_fsm_bad_source (bws->state, src, type);
+ }
+
+/******************************************************************************/
/* Invalid state. */
/******************************************************************************/
default:
@@ -372,12 +467,27 @@
/* Start listening for incoming connections. */
rc = nn_usock_start (&self->usock, ss.ss_family, SOCK_STREAM, 0);
- /* TODO: EMFILE error can happen here. We can wait a bit and re-try. */
- errnum_assert (rc == 0, -rc);
+ if (nn_slow (rc < 0)) {
+ nn_backoff_start (&self->retry);
+ self->state = NN_BWS_STATE_WAITING;
+ return;
+ }
+
rc = nn_usock_bind (&self->usock, (struct sockaddr*) &ss, (size_t) sslen);
- errnum_assert (rc == 0, -rc);
+ if (nn_slow (rc < 0)) {
+ nn_usock_stop (&self->usock);
+ self->state = NN_BWS_STATE_CLOSING;
+ return;
+ }
+
rc = nn_usock_listen (&self->usock, NN_BWS_BACKLOG);
- errnum_assert (rc == 0, -rc);
+ if (nn_slow (rc < 0)) {
+ nn_usock_stop (&self->usock);
+ self->state = NN_BWS_STATE_CLOSING;
+ return;
+ }
+ nn_bws_start_accepting(self);
+ self->state = NN_BWS_STATE_ACTIVE;
}
static void nn_bws_start_accepting (struct nn_bws *self)
@@ -392,4 +502,3 @@
/* Start waiting for a new incoming connection. */
nn_aws_start (self->aws, &self->usock);
}
-
diff --git a/src/transports/ws/cws.c b/src/transports/ws/cws.c
index 2e24693..627d1ec 100644
--- a/src/transports/ws/cws.c
+++ b/src/transports/ws/cws.c
@@ -1,6 +1,6 @@
/*
Copyright (c) 2012-2013 250bpm s.r.o. All rights reserved.
- Copyright (c) 2014 Wirebird Labs LLC. All rights reserved.
+ Copyright (c) 2014-2016 Jack R. Dunaway. All rights reserved.
Copyright 2015 Garrett D'Amore <garrett@damore.org>
Permission is hereby granted, free of charge, to any person obtaining a copy
@@ -170,7 +170,7 @@
slash = colon ? strchr (colon, '/') : strchr (addr, '/');
resource = slash ? slash : addr + addrlen;
self->remote_hostname_len = colon ? colon - hostname : resource - hostname;
-
+
/* Host contains both hostname and port. */
hostlen = resource - hostname;
@@ -238,7 +238,7 @@
nn_epbase_getctx (&self->epbase));
self->state = NN_CWS_STATE_IDLE;
nn_usock_init (&self->usock, NN_CWS_SRC_USOCK, &self->fsm);
-
+
sz = sizeof (msg_type);
nn_epbase_getopt (&self->epbase, NN_WS, NN_WS_MSG_TYPE,
&msg_type, &sz);
@@ -518,11 +518,7 @@
/* If the peer has confirmed itself gone with a Closing
Handshake, or if the local endpoint failed the remote,
don't try to reconnect. */
- if (cws->peer_gone) {
- /* It is expected that the application detects this and
- prunes the connection with nn_shutdown. */
- }
- else {
+ if (!cws->peer_gone) {
nn_backoff_start (&cws->retry);
cws->state = NN_CWS_STATE_WAITING;
}
@@ -673,7 +669,11 @@
/* Bind the socket to the local network interface. */
rc = nn_usock_bind (&self->usock, (struct sockaddr*) &local, locallen);
- errnum_assert (rc == 0, -rc);
+ if (nn_slow (rc != 0)) {
+ nn_backoff_start (&self->retry);
+ self->state = NN_CWS_STATE_WAITING;
+ return;
+ }
/* Start connecting. */
nn_usock_connect (&self->usock, (struct sockaddr*) &remote, remotelen);
diff --git a/src/transports/ws/sws.c b/src/transports/ws/sws.c
index 4a4fecb..cbb2a77 100644
--- a/src/transports/ws/sws.c
+++ b/src/transports/ws/sws.c
@@ -1,6 +1,6 @@
/*
Copyright (c) 2013 250bpm s.r.o. All rights reserved.
- Copyright (c) 2014 Wirebird Labs LLC. All rights reserved.
+ Copyright (c) 2014-2016 Jack R. Dunaway. All rights reserved.
Copyright 2015 Garrett D'Amore <garrett@damore.org>
Permission is hereby granted, free of charge, to any person obtaining a copy
@@ -115,7 +115,7 @@
/* Ceases further I/O on the underlying socket and prepares to send a
close handshake on the next receive. */
-static int nn_sws_fail_conn (struct nn_sws *self, int code, char *reason);
+static void nn_sws_fail_conn (struct nn_sws *self, int code, char *reason);
/* Start receiving new message chunk. */
static int nn_sws_recv_hdr (struct nn_sws *self);
@@ -127,6 +127,10 @@
/* Validates incoming text chunks for UTF-8 compliance as per RFC 3629. */
static void nn_sws_validate_utf8_chunk (struct nn_sws *self);
+/* Ensures that Close frames received from peer conform to
+ RFC 6455 section 7. */
+static void nn_sws_acknowledge_close_handshake (struct nn_sws *self);
+
void nn_sws_init (struct nn_sws *self, int src,
struct nn_epbase *epbase, struct nn_fsm *owner)
{
@@ -237,12 +241,14 @@
nn_list_term (msg_array);
}
+/* Given a buffer location, this function determines whether the leading
+ octets form a valid UTF-8 code point. */
static int nn_utf8_code_point (const uint8_t *buffer, size_t len)
{
/* The lack of information is considered neither valid nor invalid. */
if (!buffer || !len)
return NN_SWS_UTF8_FRAGMENT;
-
+
/* RFC 3629 section 4 UTF8-1. */
if (buffer [0] <= 0x7F)
return 1;
@@ -433,7 +439,7 @@
/* Generate 32-bit mask as per RFC 6455 5.3. */
nn_random_generate (rand_mask, NN_SWS_FRAME_SIZE_MASK);
-
+
memcpy (&sws->outhdr [hdr_len], rand_mask, NN_SWS_FRAME_SIZE_MASK);
hdr_len += NN_SWS_FRAME_SIZE_MASK;
@@ -468,13 +474,6 @@
sws->outstate = NN_SWS_OUTSTATE_SENDING;
- /* If a Close handshake was just sent, it's time to shut down. */
- if ((sws->outhdr [0] & NN_SWS_FRAME_BITMASK_OPCODE) ==
- NN_WS_OPCODE_CLOSE) {
- nn_pipebase_stop (&sws->pipebase);
- sws->state = NN_SWS_STATE_CLOSING_CONNECTION;
- }
-
return 0;
}
@@ -485,6 +484,7 @@
struct msg_chunk *ch;
struct nn_cmsghdr *cmsg;
uint8_t opcode_hdr;
+ uint8_t opcode;
size_t cmsgsz;
size_t pos;
@@ -495,15 +495,23 @@
switch (sws->instate) {
case NN_SWS_INSTATE_RECVD_CHUNKED:
- /* This library should not deliver fragmented messages to the application,
- so it's expected that this is the final frame. */
- nn_assert (sws->is_final_frame);
-
- nn_msg_init (msg, sws->inmsg_total_size);
-
/* Relay opcode to the user in order to interpret payload. */
opcode_hdr = sws->inmsg_hdr;
+ /* This library should not deliver fragmented messages to the
+ application, so it's expected that this is the final frame. */
+ nn_assert (sws->is_final_frame);
+ nn_assert (opcode_hdr & NN_SWS_FRAME_BITMASK_FIN);
+ opcode_hdr &= ~NN_SWS_FRAME_BITMASK_FIN;
+
+ /* The library is expected to have failed any connections with other
+ opcodes; these are the only two opcodes that can be chunked. */
+ opcode = opcode_hdr & NN_SWS_FRAME_BITMASK_OPCODE;
+ nn_assert (opcode == NN_WS_OPCODE_BINARY ||
+ opcode == NN_WS_OPCODE_TEXT);
+
+ nn_msg_init (msg, sws->inmsg_total_size);
+
pos = 0;
/* Reassemble incoming message scatter array. */
@@ -529,26 +537,27 @@
case NN_SWS_INSTATE_RECVD_CONTROL:
- /* This library should not deliver fragmented messages to the user, so
- it's expected that this is the final frame. */
- nn_assert (sws->is_final_frame);
-
- nn_msg_init (msg, sws->inmsg_current_chunk_len);
-
/* Relay opcode to the user in order to interpret payload. */
opcode_hdr = sws->inhdr [0];
+ /* This library should not deliver fragmented messages to the
+ application, so it's expected that this is the final frame. */
+ nn_assert (sws->is_final_frame);
+ nn_assert (opcode_hdr & NN_SWS_FRAME_BITMASK_FIN);
+ opcode_hdr &= ~NN_SWS_FRAME_BITMASK_FIN;
+
+ /* The library is expected to have failed any connections with other
+ opcodes; these are the only two control opcodes delivered. */
+ opcode = opcode_hdr & NN_SWS_FRAME_BITMASK_OPCODE;
+ nn_assert (opcode == NN_WS_OPCODE_PING ||
+ opcode == NN_WS_OPCODE_PONG);
+
+ nn_msg_init (msg, sws->inmsg_current_chunk_len);
+
memcpy (((uint8_t*) nn_chunkref_data (&msg->body)),
sws->inmsg_control, sws->inmsg_current_chunk_len);
- /* If a closing handshake was just transferred to the application,
- discontinue continual, async receives. */
- if (sws->opcode == NN_WS_OPCODE_CLOSE) {
- sws->instate = NN_SWS_INSTATE_CLOSED;
- }
- else {
- nn_sws_recv_hdr (sws);
- }
+ nn_sws_recv_hdr (sws);
break;
@@ -597,7 +606,7 @@
code_point_len = nn_utf8_code_point (self->utf8_code_pt_fragment,
self->utf8_code_pt_fragment_len);
-
+
if (code_point_len > 0) {
/* Valid code point found; continue validating. */
break;
@@ -626,12 +635,11 @@
nn_assert (0);
while (len > 0) {
-
code_point_len = nn_utf8_code_point (pos, len);
if (code_point_len > 0) {
/* Valid code point found; continue validating. */
- nn_assert (len >= code_point_len);
+ nn_assert (len >= (size_t) code_point_len);
len -= code_point_len;
pos += code_point_len;
continue;
@@ -678,7 +686,76 @@
return;
}
-static int nn_sws_fail_conn (struct nn_sws *self, int code, char *reason)
+static void nn_sws_acknowledge_close_handshake (struct nn_sws *self)
+{
+ uint8_t *pos;
+ uint16_t close_code;
+ int code_point_len;
+ size_t len;
+
+ len = self->inmsg_current_chunk_len;
+ pos = self->inmsg_current_chunk_buf;
+
+ /* Peer did not provide a Close Code, so choose our own here. */
+ if (len == 0) {
+ nn_sws_fail_conn (self, NN_SWS_CLOSE_NORMAL, "");
+ return;
+ }
+
+ /* If the payload is not even long enough for the required 2-octet
+ Close Code, the connection should have already been failed. */
+ nn_assert (len >= NN_SWS_CLOSE_CODE_LEN);
+ len -= NN_SWS_CLOSE_CODE_LEN;
+ pos += NN_SWS_CLOSE_CODE_LEN;
+
+ /* As per RFC 6455 7.1.6, the Close Reason following the Close Code
+ must be well-formed UTF-8. */
+ while (len > 0) {
+ code_point_len = nn_utf8_code_point (pos, len);
+
+ if (code_point_len > 0) {
+ /* Valid code point found; continue validating. */
+ nn_assert (len >= (size_t) code_point_len);
+ len -= code_point_len;
+ pos += code_point_len;
+ continue;
+ }
+ else {
+ /* RFC 6455 7.1.6 */
+ nn_sws_fail_conn (self, NN_SWS_CLOSE_ERR_PROTO,
+ "Invalid UTF-8 sent as Close Reason.");
+ return;
+ }
+ }
+
+ /* Entire Close Reason is well-formed UTF-8 (or empty) */
+ nn_assert (len == 0);
+
+ close_code = nn_gets (self->inmsg_current_chunk_buf);
+
+ if (close_code == NN_SWS_CLOSE_NORMAL ||
+ close_code == NN_SWS_CLOSE_GOING_AWAY ||
+ close_code == NN_SWS_CLOSE_ERR_PROTO ||
+ close_code == NN_SWS_CLOSE_ERR_WUT ||
+ close_code == NN_SWS_CLOSE_ERR_INVALID_FRAME ||
+ close_code == NN_SWS_CLOSE_ERR_POLICY ||
+ close_code == NN_SWS_CLOSE_ERR_TOOBIG ||
+ close_code == NN_SWS_CLOSE_ERR_EXTENSION ||
+ close_code == NN_SWS_CLOSE_ERR_SERVER ||
+ (close_code >= 3000 && close_code <= 3999) ||
+ (close_code >= 4000 && close_code <= 4999)) {
+ /* Repeat close code, per RFC 6455 7.4.1 and 7.4.2 */
+ nn_sws_fail_conn (self, (int) close_code, "");
+ }
+ else {
+ nn_sws_fail_conn (self, NN_SWS_CLOSE_ERR_PROTO,
+ "Unrecognized close code.");
+ }
+
+ return;
+}
+
+static void nn_sws_fail_conn (struct nn_sws *self, int code, char *reason)
{
size_t reason_len;
size_t payload_len;
@@ -688,6 +765,10 @@
nn_assert_state (self, NN_SWS_STATE_ACTIVE);
+ /* Stop user send/recv actions. */
+ self->instate = NN_SWS_INSTATE_CLOSED;
+ nn_pipebase_stop (&self->pipebase);
+
/* Destroy any remnant incoming message fragments. */
nn_msg_array_term (&self->inmsg_array);
@@ -727,7 +808,7 @@
}
payload_pos = (uint8_t*) (&self->fail_msg [self->fail_msg_len]);
-
+
/* Copy Status Code in network order (big-endian). */
nn_puts (payload_pos, (uint16_t) code);
self->fail_msg_len += NN_SWS_CLOSE_CODE_LEN;
@@ -743,11 +824,6 @@
self->fail_msg_len += payload_len;
- self->instate = NN_SWS_INSTATE_CLOSED;
-
- /* Stop user send/recv actions. */
- nn_pipebase_stop (&self->pipebase);
-
if (self->outstate == NN_SWS_OUTSTATE_IDLE) {
iov.iov_base = self->fail_msg;
iov.iov_len = self->fail_msg_len;
@@ -759,7 +835,7 @@
nn_fsm_raise (&self->fsm, &self->done, NN_SWS_RETURN_CLOSE_HANDSHAKE);
}
- return 0;
+ return;
}
static void nn_sws_shutdown (struct nn_fsm *self, int src, int type,
@@ -796,6 +872,8 @@
{
struct nn_sws *sws;
int rc;
+ int opt;
+ size_t opt_sz = sizeof (opt);
sws = nn_cont (self, struct nn_sws, fsm);
@@ -1093,7 +1171,7 @@
}
/* Continue to receive extended header+payload. */
break;
-
+
case NN_WS_OPCODE_PONG:
sws->is_control_frame = 1;
sws->pongs_received++;
@@ -1124,7 +1202,7 @@
}
/* Continue to receive extended header+payload. */
break;
-
+
case NN_WS_OPCODE_CLOSE:
/* RFC 6455 section 5.5.1. */
sws->is_control_frame = 1;
@@ -1159,13 +1237,12 @@
/* Special case when there is no payload,
mask, or additional frames. */
sws->inmsg_current_chunk_len = 0;
- sws->instate = NN_SWS_INSTATE_RECVD_CONTROL;
- nn_pipebase_received (&sws->pipebase);
+ nn_sws_acknowledge_close_handshake (sws);
return;
}
/* Continue to receive extended header+payload. */
break;
-
+
default:
/* Client sent an invalid opcode; as per RFC 6455
section 10.7, close connection with code. */
@@ -1181,17 +1258,12 @@
nn_assert (sws->mode == NN_WS_CLIENT);
/* In the case of no additional header, the payload
- is known to not exceed this threshold. */
- nn_assert (sws->payload_ctl <= NN_SWS_PAYLOAD_MAX_LENGTH);
+ is known to be within these bounds. */
+ nn_assert (0 < sws->payload_ctl &&
+ sws->payload_ctl <= NN_SWS_PAYLOAD_MAX_LENGTH);
- /* In the case of no additional header, the payload
- is known to not exceed this threshold. */
- nn_assert (sws->payload_ctl > 0);
-
- sws->instate = NN_SWS_INSTATE_RECV_PAYLOAD;
sws->inmsg_current_chunk_len = sws->payload_ctl;
-
/* Use scatter/gather array for application messages,
and a fixed-width buffer for control messages. This
is convenient since control messages can be
@@ -1200,13 +1272,25 @@
sws->inmsg_current_chunk_buf = sws->inmsg_control;
}
else {
- sws->inmsg_chunks++;
sws->inmsg_total_size += sws->inmsg_current_chunk_len;
+ /* Protect non-control messages against the
+ NN_RCVMAXSIZE threshold; control messages already
+ have a small pre-allocated buffer, and therefore
+ are not subject to this limit. */
+ nn_pipebase_getopt (&sws->pipebase, NN_SOL_SOCKET,
+ NN_RCVMAXSIZE, &opt, &opt_sz);
+ if (opt >= 0 && sws->inmsg_total_size > (size_t) opt) {
+ nn_sws_fail_conn (sws, NN_SWS_CLOSE_ERR_TOOBIG,
+ "Message larger than application allows.");
+ return;
+ }
+ sws->inmsg_chunks++;
sws->inmsg_current_chunk_buf =
nn_msg_chunk_new (sws->inmsg_current_chunk_len,
&sws->inmsg_array);
}
+ sws->instate = NN_SWS_INSTATE_RECV_PAYLOAD;
nn_usock_recv (sws->usock, sws->inmsg_current_chunk_buf,
sws->inmsg_current_chunk_len, NULL);
return;
@@ -1267,25 +1351,22 @@
}
/* Handle zero-length message bodies. */
- if (sws->inmsg_current_chunk_len == 0)
- {
+ if (sws->inmsg_current_chunk_len == 0) {
if (sws->is_final_frame) {
- if (sws->opcode == NN_WS_OPCODE_CLOSE) {
- nn_pipebase_stop (&sws->pipebase);
- sws->state = NN_SWS_STATE_CLOSING_CONNECTION;
+ if (sws->opcode == NN_WS_OPCODE_CLOSE) {
+ nn_sws_acknowledge_close_handshake (sws);
}
- else
- {
- sws->instate = (sws->is_control_frame ?
- NN_SWS_INSTATE_RECVD_CONTROL :
- NN_SWS_INSTATE_RECVD_CHUNKED);
- nn_pipebase_received (&sws->pipebase);
+ else {
+ sws->instate = (sws->is_control_frame ?
+ NN_SWS_INSTATE_RECVD_CONTROL :
+ NN_SWS_INSTATE_RECVD_CHUNKED);
+ nn_pipebase_received (&sws->pipebase);
}
}
else {
nn_sws_recv_hdr (sws);
}
- return;
+ return;
}
nn_assert (sws->inmsg_current_chunk_len > 0);
@@ -1298,8 +1379,19 @@
sws->inmsg_current_chunk_buf = sws->inmsg_control;
}
else {
- sws->inmsg_chunks++;
sws->inmsg_total_size += sws->inmsg_current_chunk_len;
+ /* Protect non-control messages against the
+ NN_RCVMAXSIZE threshold; control messages already
+ have a small pre-allocated buffer, and therefore
+ are not subject to this limit. */
+ nn_pipebase_getopt (&sws->pipebase, NN_SOL_SOCKET,
+ NN_RCVMAXSIZE, &opt, &opt_sz);
+ if (opt >= 0 && sws->inmsg_total_size > (size_t) opt) {
+ nn_sws_fail_conn (sws, NN_SWS_CLOSE_ERR_TOOBIG,
+ "Message size exceeds limit.");
+ return;
+ }
+ sws->inmsg_chunks++;
sws->inmsg_current_chunk_buf =
nn_msg_chunk_new (sws->inmsg_current_chunk_len,
&sws->inmsg_array);
@@ -1362,21 +1454,14 @@
return;
case NN_WS_OPCODE_CLOSE:
- /* If the payload is not even long enough for the
- required 2-octet Close Code, the connection
- should have been failed upstream. */
- nn_assert (sws->inmsg_current_chunk_len >=
- NN_SWS_CLOSE_CODE_LEN);
-
- nn_pipebase_stop (&sws->pipebase);
- sws->state = NN_SWS_STATE_CLOSING_CONNECTION;
+ nn_sws_acknowledge_close_handshake (sws);
return;
default:
/* This should have been prevented upstream. */
nn_assert (0);
return;
- }
+ }
default:
nn_fsm_error ("Unexpected socket instate",
diff --git a/src/transports/ws/ws_handshake.c b/src/transports/ws/ws_handshake.c
index d5ef7ef..45b62b1 100644
--- a/src/transports/ws/ws_handshake.c
+++ b/src/transports/ws/ws_handshake.c
@@ -1,6 +1,6 @@
/*
Copyright (c) 2013 250bpm s.r.o. All rights reserved.
- Copyright (c) 2014 Wirebird Labs LLC. All rights reserved.
+ Copyright (c) 2014-2016 Jack R. Dunaway. All rights reserved.
Copyright 2015 Garrett D'Amore <garrett@damore.org>
Permission is hereby granted, free of charge, to any person obtaining a copy
@@ -1215,6 +1215,7 @@
rc = nn_base64_encode (rand_key, sizeof (rand_key),
encoded_key, sizeof (encoded_key));
+ nn_assert (rc >=0);
encoded_key_len = strlen (encoded_key);
@@ -1273,6 +1274,7 @@
rc = nn_ws_handshake_hash_key (self->key, self->key_len,
accept_key, sizeof (accept_key));
+ nn_assert (rc >= 0);
nn_assert (strlen (accept_key) == NN_WS_HANDSHAKE_ACCEPT_KEY_LEN);
diff --git a/tests/ws.c b/tests/ws.c
index 8e9df46..e855786 100644
--- a/tests/ws.c
+++ b/tests/ws.c
@@ -1,6 +1,6 @@
/*
Copyright (c) 2012 250bpm s.r.o. All rights reserved.
- Copyright (c) 2014 Wirebird Labs LLC. All rights reserved.
+ Copyright (c) 2014-2016 Jack R. Dunaway. All rights reserved.
Copyright 2015 Garrett D'Amore <garrett@damore.org>
Copyright 2016 Franklin "Snaipe" Mathieu <franklinmathieu@gmail.com>
@@ -35,8 +35,8 @@
/* test_text() verifies that we drop messages properly when sending invalid
UTF-8, but not when we send valid data. */
-void test_text() {
-
+void test_text ()
+{
int sb;
int sc;
int opt;
@@ -46,15 +46,12 @@
sb = test_socket (AF_SP, NN_PAIR);
sc = test_socket (AF_SP, NN_PAIR);
- /* Wait for connects to establish. */
- nn_sleep (200);
-
opt = NN_WS_MSG_TYPE_TEXT;
- test_setsockopt(sb, NN_WS, NN_WS_MSG_TYPE, &opt, sizeof (opt));
+ test_setsockopt (sb, NN_WS, NN_WS_MSG_TYPE, &opt, sizeof (opt));
opt = NN_WS_MSG_TYPE_TEXT;
- test_setsockopt(sc, NN_WS, NN_WS_MSG_TYPE, &opt, sizeof (opt));
+ test_setsockopt (sc, NN_WS, NN_WS_MSG_TYPE, &opt, sizeof (opt));
opt = 500;
- test_setsockopt(sb, NN_SOL_SOCKET, NN_RCVTIMEO, &opt, sizeof (opt));
+ test_setsockopt (sb, NN_SOL_SOCKET, NN_RCVTIMEO, &opt, sizeof (opt));
test_bind (sb, socket_address);
test_connect (sc, socket_address);
@@ -63,12 +60,17 @@
test_recv (sb, "GOOD");
/* and the bad ... */
- strcpy((char *)bad, "BAD.");
+ strcpy ((char *)bad, "BAD.");
bad[2] = (char)0xDD;
test_send (sc, (char *)bad);
/* Make sure we dropped the frame. */
test_drop (sb, ETIMEDOUT);
+
+ test_close (sb);
+ test_close (sc);
+
+ return;
}
int main (int argc, const char *argv[])
@@ -76,16 +78,17 @@
int rc;
int sb;
int sc;
+ int sb2;
int opt;
size_t sz;
int i;
char any_address[128];
- test_addr_from(socket_address, "ws", "127.0.0.1",
- get_test_port(argc, argv));
+ test_addr_from (socket_address, "ws", "127.0.0.1",
+ get_test_port (argc, argv));
- test_addr_from(any_address, "ws", "*",
- get_test_port(argc, argv));
+ test_addr_from (any_address, "ws", "*",
+ get_test_port (argc, argv));
/* Try closing bound but unconnected socket. */
sb = test_socket (AF_SP, NN_PAIR);
@@ -159,16 +162,11 @@
test_close (sc);
- nn_sleep (200);
-
sb = test_socket (AF_SP, NN_PAIR);
test_bind (sb, socket_address);
sc = test_socket (AF_SP, NN_PAIR);
test_connect (sc, socket_address);
- /* Leave enough time for connection establishment. */
- nn_sleep (200);
-
/* Ping-pong test. */
for (i = 0; i != 100; ++i) {
@@ -190,7 +188,64 @@
test_close (sc);
test_close (sb);
+ /* Test that NN_RCVMAXSIZE can be -1, but not lower */
+ sb = test_socket (AF_SP, NN_PAIR);
+ opt = -1;
+ rc = nn_setsockopt (sb, NN_SOL_SOCKET, NN_RCVMAXSIZE, &opt, sizeof (opt));
+ nn_assert (rc >= 0);
+ opt = -2;
+ rc = nn_setsockopt (sb, NN_SOL_SOCKET, NN_RCVMAXSIZE, &opt, sizeof (opt));
+ nn_assert (rc < 0);
+ errno_assert (nn_errno () == EINVAL);
+ test_close (sb);
+
+ /* Test NN_RCVMAXSIZE limit */
+ sb = test_socket (AF_SP, NN_PAIR);
+ test_bind (sb, socket_address);
+ sc = test_socket (AF_SP, NN_PAIR);
+ test_connect (sc, socket_address);
+ opt = 1000;
+ test_setsockopt (sc, NN_SOL_SOCKET, NN_SNDTIMEO, &opt, sizeof (opt));
+ nn_assert (opt == 1000);
+ opt = 1000;
+ test_setsockopt (sb, NN_SOL_SOCKET, NN_RCVTIMEO, &opt, sizeof (opt));
+ nn_assert (opt == 1000);
+ opt = 4;
+ test_setsockopt (sb, NN_SOL_SOCKET, NN_RCVMAXSIZE, &opt, sizeof (opt));
+ test_send (sc, "ABC");
+ test_recv (sb, "ABC");
+ test_send (sc, "ABCD");
+ test_recv (sb, "ABCD");
+ test_send (sc, "ABCDE");
+ test_drop (sb, ETIMEDOUT);
+
+ /* Increase the size limit, reconnect, then try sending again. The reason a
+ reconnect is necessary is because after a protocol violation, the
+ connecting socket will not continue automatic reconnection attempts. */
+ opt = 5;
+ test_setsockopt (sb, NN_SOL_SOCKET, NN_RCVMAXSIZE, &opt, sizeof (opt));
+ test_connect (sc, socket_address);
+ test_send (sc, "ABCDE");
+ test_recv (sb, "ABCDE");
+ test_close (sb);
+ test_close (sc);
+
test_text ();
+ /* Test closing a socket that is waiting to bind. */
+ sb = test_socket (AF_SP, NN_PAIR);
+ test_bind (sb, socket_address);
+ sb2 = test_socket (AF_SP, NN_PAIR);
+ test_bind (sb2, socket_address);
+ sc = test_socket (AF_SP, NN_PAIR);
+ test_connect (sc, socket_address);
+ test_send (sb, "ABC");
+ test_recv (sc, "ABC");
+ test_close (sb2);
+ test_send (sb, "ABC");
+ test_recv (sc, "ABC");
+ test_close (sb);
+ test_close (sc);
+
return 0;
}