Reassemby of ws messages from incoming message fragments
Signed-off-by: Martin Sustrik <sustrik@250bpm.com>
diff --git a/src/transports/ws/sws.c b/src/transports/ws/sws.c
index 5d6004e..a6c101c 100644
--- a/src/transports/ws/sws.c
+++ b/src/transports/ws/sws.c
@@ -29,6 +29,7 @@
#include "../../utils/wire.h"
#include "../../utils/int.h"
#include "../../utils/attr.h"
+#include "../../utils/alloc.h"
#include "../../utils/random.h"
/* States of the object as a whole. */
@@ -64,6 +65,13 @@
#define NN_SWS_SIZE_16 0x7e
#define NN_SWS_SIZE_64 0x7f
+/* Array element type for incoming message fragments.
+ Fragmented messages are reassembled prior to notifying the user. */
+struct nn_sws_fragment {
+ struct nn_list_item item;
+ struct nn_chunkref data;
+};
+
/* Stream is a special type of pipe. Implementation of the virtual pipe API. */
static int nn_sws_send (struct nn_pipebase *self, struct nn_msg *msg);
static int nn_sws_recv (struct nn_pipebase *self, struct nn_msg *msg);
@@ -89,7 +97,8 @@
self->usock_owner.fsm = NULL;
nn_pipebase_init (&self->pipebase, &nn_sws_pipebase_vfptr, epbase);
self->instate = -1;
- nn_msg_init (&self->inmsg, 0);
+ nn_list_init (&self->inmsg);
+ self->inmsglen = 0;
self->outstate = -1;
nn_msg_init (&self->outmsg, 0);
nn_fsm_event_init (&self->done);
@@ -97,11 +106,22 @@
void nn_sws_term (struct nn_sws *self)
{
+ struct nn_sws_fragment *fragment;
+
nn_assert_state (self, NN_SWS_STATE_IDLE);
+ while (!nn_list_empty (&self->inmsg)) {
+ fragment = nn_cont (nn_list_begin (&self->inmsg),
+ struct nn_sws_fragment, item);
+ nn_list_erase (&self->inmsg, &fragment->item);
+ nn_list_item_term (&fragment->item);
+ nn_chunkref_term (&fragment->data);
+ nn_free (fragment);
+ }
+
nn_fsm_event_term (&self->done);
nn_msg_term (&self->outmsg);
- nn_msg_term (&self->inmsg);
+ nn_list_term (&self->inmsg);
nn_pipebase_term (&self->pipebase);
nn_fsm_term (&self->fsm);
}
@@ -206,15 +226,40 @@
static int nn_sws_recv (struct nn_pipebase *self, struct nn_msg *msg)
{
struct nn_sws *sws;
+ struct nn_list_item *it;
+ struct nn_sws_fragment *fragment;
+ size_t pos;
+ size_t sz;
sws = nn_cont (self, struct nn_sws, pipebase);
nn_assert_state (sws, NN_SWS_STATE_ACTIVE);
nn_assert (sws->instate == NN_SWS_INSTATE_HASMSG);
- /* Move received message to the user. */
- nn_msg_mv (msg, &sws->inmsg);
- nn_msg_init (&sws->inmsg, 0);
+ /* Reassemble the list of fragments into a single message and
+ pass it to the user. */
+ /* TODO: Optimise for the case of a single-fragment message. */
+ nn_msg_init (msg, sws->inmsglen);
+ nn_assert (!nn_list_empty (&sws->inmsg));
+ pos = 0;
+ while (1) {
+ if (nn_list_empty (&sws->inmsg))
+ break;
+ fragment = nn_cont (
+ nn_list_prev (&sws->inmsg, nn_list_end (&sws->inmsg)),
+ struct nn_sws_fragment, item);
+ sz = nn_chunkref_size (&fragment->data);
+ nn_assert (pos + sz <= sws->inmsglen);
+ memcpy (((char*) nn_chunkref_data (&msg->body)) + pos,
+ nn_chunkref_data (&fragment->data), sz);
+ pos += sz;
+ nn_list_erase (&sws->inmsg, &fragment->item);
+ nn_list_item_term (&fragment->item);
+ nn_chunkref_term (&fragment->data);
+ nn_free (fragment);
+ }
+ nn_assert (pos == sws->inmsglen);
+ sws->inmsglen = 0;
/* Start receiving new message by reading 2 bytes. That's a minimal
message header and by looking at it we'll find out whether any
@@ -252,6 +297,7 @@
int rc;
struct nn_sws *sws;
uint64_t size;
+ struct nn_sws_fragment *fragment;
sws = nn_cont (self, struct nn_sws, fsm);
@@ -316,9 +362,6 @@
switch (sws->instate) {
case NN_SWS_INSTATE_HDR:
- /* TODO */
- nn_assert ((sws->inhdr [0] & NN_SWS_FIN) != 0);
-
/* Reserved bits should not be set. */
nn_assert ((sws->inhdr [0] & NN_SWS_RSVS) == 0);
@@ -374,22 +417,27 @@
nn_masker_init (&sws->masker, &sws->inhdr [2]);
}
- /* Allocate memory for the message. */
- nn_msg_term (&sws->inmsg);
- nn_msg_init (&sws->inmsg, (size_t) size);
+ /* Allocate memory for the message. */
+ /* TODO: Get rid of this allocation for messages
+ with a single fragment. */
+ fragment = nn_alloc (sizeof (struct nn_sws_fragment),
+ "ws incoming fragment");
+ alloc_assert (fragment);
+ nn_list_item_init (&fragment->item);
+ nn_chunkref_init (&fragment->data, (size_t) size);
+ nn_list_insert (&sws->inmsg, &fragment->item,
+ nn_list_begin (&sws->inmsg));
+ sws->inmsglen += size;
/* Special case when size of the message body is 0. */
- if (!size) {
- sws->instate = NN_SWS_INSTATE_HASMSG;
- nn_pipebase_received (&sws->pipebase);
- return;
- }
+ if (!size)
+ goto fragment_received;
/* Start receiving the message body. */
sws->instate = NN_SWS_INSTATE_BODY;
nn_usock_recv (sws->usock,
- nn_chunkref_data (&sws->inmsg.body),
- (size_t) size, NULL);
+ nn_chunkref_data (&fragment->data),
+ (size_t) size, NULL);
return;
@@ -397,9 +445,19 @@
/* Unmask the message body, if needed. */
if (sws->inhdr [1] & NN_SWS_MASK) {
+ fragment = nn_cont (nn_list_begin (&sws->inmsg),
+ struct nn_sws_fragment, item);
nn_masker_mask (&sws->masker,
- nn_chunkref_data (&sws->inmsg.body),
- nn_chunkref_size (&sws->inmsg.body));
+ nn_chunkref_data (&fragment->data),
+ nn_chunkref_size (&fragment->data));
+ }
+
+fragment_received:
+ /* If this is not the last fragment, read the next one. */
+ if (!(sws->inhdr [0] & NN_SWS_FIN)) {
+ sws->instate = NN_SWS_INSTATE_HDR;
+ nn_usock_recv (sws->usock, sws->inhdr, 2, NULL);
+ return;
}
/* Message body is now fully received.
diff --git a/src/transports/ws/sws.h b/src/transports/ws/sws.h
index b4b5cad..7816399 100644
--- a/src/transports/ws/sws.h
+++ b/src/transports/ws/sws.h
@@ -30,6 +30,7 @@
#include "../../aio/usock.h"
#include "../../utils/msg.h"
+#include "../../utils/list.h"
#include "masker.h"
@@ -67,8 +68,11 @@
possible header is 14 bytes long, see RFC 6455, section 5.2. */
uint8_t inhdr [14];
- /* Message being received at the moment. */
- struct nn_msg inmsg;
+ /* Message being received at the moment, if form of list of fragments. */
+ struct nn_list inmsg;
+
+ /* Total size of all the fragments inside inmsg list. */
+ size_t inmsglen;
/* State of the outbound state machine. */
int outstate;