Reassemby of ws messages from incoming message fragments

Signed-off-by: Martin Sustrik <sustrik@250bpm.com>
diff --git a/src/transports/ws/sws.c b/src/transports/ws/sws.c
index 5d6004e..a6c101c 100644
--- a/src/transports/ws/sws.c
+++ b/src/transports/ws/sws.c
@@ -29,6 +29,7 @@
 #include "../../utils/wire.h"
 #include "../../utils/int.h"
 #include "../../utils/attr.h"
+#include "../../utils/alloc.h"
 #include "../../utils/random.h"
 
 /*  States of the object as a whole. */
@@ -64,6 +65,13 @@
 #define NN_SWS_SIZE_16 0x7e
 #define NN_SWS_SIZE_64 0x7f
 
+/*  Array element type for incoming message fragments.
+    Fragmented messages are reassembled prior to notifying the user. */
+struct nn_sws_fragment {
+    struct nn_list_item item;
+    struct nn_chunkref data;
+};
+
 /*  Stream is a special type of pipe. Implementation of the virtual pipe API. */
 static int nn_sws_send (struct nn_pipebase *self, struct nn_msg *msg);
 static int nn_sws_recv (struct nn_pipebase *self, struct nn_msg *msg);
@@ -89,7 +97,8 @@
     self->usock_owner.fsm = NULL;
     nn_pipebase_init (&self->pipebase, &nn_sws_pipebase_vfptr, epbase);
     self->instate = -1;
-    nn_msg_init (&self->inmsg, 0);
+    nn_list_init (&self->inmsg);
+    self->inmsglen = 0;
     self->outstate = -1;
     nn_msg_init (&self->outmsg, 0);
     nn_fsm_event_init (&self->done);
@@ -97,11 +106,22 @@
 
 void nn_sws_term (struct nn_sws *self)
 {
+    struct nn_sws_fragment *fragment;
+
     nn_assert_state (self, NN_SWS_STATE_IDLE);
 
+    while (!nn_list_empty (&self->inmsg)) {
+        fragment = nn_cont (nn_list_begin (&self->inmsg),
+            struct nn_sws_fragment, item);
+        nn_list_erase (&self->inmsg, &fragment->item);
+        nn_list_item_term (&fragment->item);
+        nn_chunkref_term (&fragment->data);
+        nn_free (fragment);
+    }
+
     nn_fsm_event_term (&self->done);
     nn_msg_term (&self->outmsg);
-    nn_msg_term (&self->inmsg);
+    nn_list_term (&self->inmsg);
     nn_pipebase_term (&self->pipebase);
     nn_fsm_term (&self->fsm);
 }
@@ -206,15 +226,40 @@
 static int nn_sws_recv (struct nn_pipebase *self, struct nn_msg *msg)
 {
     struct nn_sws *sws;
+    struct nn_list_item *it;
+    struct nn_sws_fragment *fragment;
+    size_t pos;
+    size_t sz;
 
     sws = nn_cont (self, struct nn_sws, pipebase);
 
     nn_assert_state (sws, NN_SWS_STATE_ACTIVE);
     nn_assert (sws->instate == NN_SWS_INSTATE_HASMSG);
 
-    /*  Move received message to the user. */
-    nn_msg_mv (msg, &sws->inmsg);
-    nn_msg_init (&sws->inmsg, 0);
+    /*  Reassemble the list of fragments into a single message and
+        pass it to the user. */
+    /*  TODO: Optimise for the case of a single-fragment message. */
+    nn_msg_init (msg, sws->inmsglen);
+    nn_assert (!nn_list_empty (&sws->inmsg));
+    pos = 0;
+    while (1) {
+        if (nn_list_empty (&sws->inmsg))
+            break;
+        fragment = nn_cont (
+            nn_list_prev (&sws->inmsg, nn_list_end (&sws->inmsg)),
+            struct nn_sws_fragment, item);
+        sz = nn_chunkref_size (&fragment->data);
+        nn_assert (pos + sz <= sws->inmsglen);
+        memcpy (((char*) nn_chunkref_data (&msg->body)) + pos,
+            nn_chunkref_data (&fragment->data), sz);
+        pos += sz;
+        nn_list_erase (&sws->inmsg, &fragment->item);
+        nn_list_item_term (&fragment->item);
+        nn_chunkref_term (&fragment->data);
+        nn_free (fragment);
+    }
+    nn_assert (pos == sws->inmsglen);
+    sws->inmsglen = 0;
 
     /*  Start receiving new message by reading 2 bytes. That's a minimal
         message header and by looking at it we'll find out whether any
@@ -252,6 +297,7 @@
     int rc;
     struct nn_sws *sws;
     uint64_t size;
+    struct nn_sws_fragment *fragment;
 
     sws = nn_cont (self, struct nn_sws, fsm);
 
@@ -316,9 +362,6 @@
                 switch (sws->instate) {
                 case NN_SWS_INSTATE_HDR:
 
-                    /* TODO */
-                    nn_assert ((sws->inhdr [0] & NN_SWS_FIN) != 0);
-
                     /*  Reserved bits should not be set. */
                     nn_assert ((sws->inhdr [0] & NN_SWS_RSVS) == 0);
 
@@ -374,22 +417,27 @@
                             nn_masker_init (&sws->masker, &sws->inhdr [2]);
                     }
  
-                    /* Allocate memory for the message. */
-                    nn_msg_term (&sws->inmsg);
-                    nn_msg_init (&sws->inmsg, (size_t) size);
+                    /*  Allocate memory for the message. */
+                    /*  TODO: Get rid of this allocation for messages
+                               with a single fragment. */
+                    fragment = nn_alloc (sizeof (struct nn_sws_fragment),
+                        "ws incoming fragment");
+                    alloc_assert (fragment);
+                    nn_list_item_init (&fragment->item);
+                    nn_chunkref_init (&fragment->data, (size_t) size);
+                    nn_list_insert (&sws->inmsg, &fragment->item,
+                        nn_list_begin (&sws->inmsg));
+                    sws->inmsglen += size;
 
                     /*  Special case when size of the message body is 0. */
-                    if (!size) {
-                        sws->instate = NN_SWS_INSTATE_HASMSG;
-                        nn_pipebase_received (&sws->pipebase);
-                        return;
-                    }
+                    if (!size)
+                        goto fragment_received;
 
                     /*  Start receiving the message body. */
                     sws->instate = NN_SWS_INSTATE_BODY;
                     nn_usock_recv (sws->usock,
-                        nn_chunkref_data (&sws->inmsg.body),
-                       (size_t) size, NULL);
+                        nn_chunkref_data (&fragment->data),
+                        (size_t) size, NULL);
 
                     return;
 
@@ -397,9 +445,19 @@
 
                     /*  Unmask the message body, if needed. */
                     if (sws->inhdr [1] & NN_SWS_MASK) {
+                        fragment = nn_cont (nn_list_begin (&sws->inmsg),
+                            struct nn_sws_fragment, item);
                         nn_masker_mask (&sws->masker,
-                            nn_chunkref_data (&sws->inmsg.body),
-                            nn_chunkref_size (&sws->inmsg.body));
+                            nn_chunkref_data (&fragment->data),
+                            nn_chunkref_size (&fragment->data));
+                    }
+
+fragment_received:
+                    /*  If this is not the last fragment, read the next one. */
+                    if (!(sws->inhdr [0] & NN_SWS_FIN)) {
+                        sws->instate = NN_SWS_INSTATE_HDR;
+                        nn_usock_recv (sws->usock, sws->inhdr, 2, NULL);
+                        return;
                     }
 
                     /*  Message body is now fully received.
diff --git a/src/transports/ws/sws.h b/src/transports/ws/sws.h
index b4b5cad..7816399 100644
--- a/src/transports/ws/sws.h
+++ b/src/transports/ws/sws.h
@@ -30,6 +30,7 @@
 #include "../../aio/usock.h"
 
 #include "../../utils/msg.h"
+#include "../../utils/list.h"
 
 #include "masker.h"
 
@@ -67,8 +68,11 @@
         possible header is 14 bytes long, see RFC 6455, section 5.2. */
     uint8_t inhdr [14];
 
-    /*  Message being received at the moment. */
-    struct nn_msg inmsg;
+    /*  Message being received at the moment, if form of list of fragments. */
+    struct nn_list inmsg;
+
+    /*  Total size of all the fragments inside inmsg list. */
+    size_t inmsglen;
 
     /*  State of the outbound state machine. */
     int outstate;