fixes #978 nanomsg IPC on Windows is very fragile

This is critical for hardening use cases with other IPC clients
such as nng or mangos.  The old code made some very incorrect
assumptions about the atomicity of named pipes and ReadFile.

We've changed the code so that if ReadFile (or WSARecv incidentally)
ever returns a partial read, we keep going.

This solves a critial assertion error, and greatly improves
the crash resistance of nanomsg when using IPC on Windows.
diff --git a/src/aio/usock_win.inc b/src/aio/usock_win.inc
index ccbd607..5087f47 100644
--- a/src/aio/usock_win.inc
+++ b/src/aio/usock_win.inc
@@ -1,6 +1,8 @@
 /*
     Copyright (c) 2013 Martin Sustrik  All rights reserved.
     Copyright (c) 2013 GoPivotal, Inc.  All rights reserved.
+    Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+    Copyright 2018 Capitar IT Group BV <info@capitar.com>
 
     Permission is hereby granted, free of charge, to any person obtaining a copy
     of this software and associated documentation files (the "Software"),
@@ -311,7 +313,7 @@
     listener->asock = self;
 
     /*  Asynchronous accept. */
-    nn_worker_op_start (&listener->in, 0);
+    nn_worker_op_start (&listener->in);
 }
 
 void nn_usock_activate (struct nn_usock *self)
@@ -371,7 +373,7 @@
     }
 
     /*  Asynchronous connect. */
-    nn_worker_op_start (&self->out, 0);
+    nn_worker_op_start (&self->out);
 }
 
 void nn_usock_send (struct nn_usock *self, const struct nn_iovec *iov,
@@ -418,7 +420,7 @@
         }
         brc = WriteFile (self->p, self->pipesendbuf, (DWORD) len, NULL, &self->out.olpd);
         if (nn_fast (brc || GetLastError() == ERROR_IO_PENDING)) {
-            nn_worker_op_start (&self->out, 0);
+            nn_worker_op_start (&self->out);
             return;
         }
         error = GetLastError();
@@ -430,12 +432,12 @@
 
     rc = WSASend (self->s, wbuf, iovcnt, NULL, 0, &self->out.olpd, NULL);
     if (nn_fast (rc == 0)) {
-        nn_worker_op_start (&self->out, 0);
+        nn_worker_op_start (&self->out);
         return;
     }
     error = WSAGetLastError();
     if (nn_fast (error == WSA_IO_PENDING)) {
-        nn_worker_op_start (&self->out, 0);
+        nn_worker_op_start (&self->out);
         return;
     }
     wsa_assert (error == WSAECONNABORTED || error == WSAECONNRESET ||
@@ -445,14 +447,67 @@
     nn_fsm_action (&self->fsm, NN_USOCK_ACTION_ERROR);
 }
 
-void nn_usock_recv (struct nn_usock *self, void *buf, size_t len, int *fd)
+void nn_usock_recv_start_wsock (void *arg)
 {
-    int rc;
-    BOOL brc;
+    struct nn_usock *self = arg;
     WSABUF wbuf;
-    DWORD wflags;
+    DWORD flags;
     DWORD error;
 
+    /*  Start the receive operation. */
+    wbuf.len = (ULONG) self->in.resid;
+    wbuf.buf = (char FAR*) self->in.buf;
+    flags = MSG_WAITALL;
+    memset (&self->in.olpd, 0, sizeof (self->in.olpd));
+
+    if (WSARecv (self->s, &wbuf, 1, NULL, &flags, &self->in.olpd, NULL) == 0) {
+        error = ERROR_SUCCESS;
+    } else {
+        error = WSAGetLastError ();
+    }
+
+    switch (error) {
+    case ERROR_SUCCESS:
+    case WSA_IO_PENDING:
+        nn_worker_op_start (&self->in);
+        return;
+
+    default:
+        nn_fsm_action (&self->fsm, NN_USOCK_ACTION_ERROR);
+        return;
+    }
+}
+
+void nn_usock_recv_start_pipe (void *arg)
+{
+    struct nn_usock *self = arg;
+    void *buf = self->in.buf;
+    DWORD len = (DWORD) self->in.resid;
+    DWORD error;
+
+    /*  Start the receive operation. */
+    memset (&self->in.olpd, 0, sizeof (self->in.olpd));
+
+    if (ReadFile (self->p, buf, len, NULL, &self->in.olpd)) {
+        error = ERROR_SUCCESS;
+    } else {
+        error = GetLastError ();
+    }
+
+    switch (error) {
+    case ERROR_SUCCESS:
+    case ERROR_IO_PENDING:
+        nn_worker_op_start (&self->in);
+        return;
+
+    default:
+        nn_fsm_action (&self->fsm, NN_USOCK_ACTION_ERROR);
+        return;
+   }
+}
+
+void nn_usock_recv (struct nn_usock *self, void *buf, size_t len, int *fd)
+{
     /*  Passing file descriptors is not implemented on Windows platform. */
     if (fd)
         *fd = -1;
@@ -460,44 +515,18 @@
     /*  Make sure that the socket is actually alive. */
     nn_assert_state (self, NN_USOCK_STATE_ACTIVE);
 
-    /*  Start the receive operation. */
-    wbuf.len = (ULONG) len;
-    wbuf.buf = (char FAR*) buf;
-    wflags = MSG_WAITALL;
-    memset (&self->in.olpd, 0, sizeof (self->in.olpd));
-
+    self->in.resid = len;
+    self->in.buf = buf;
+    self->in.arg = self;
+    self->in.zero_is_error = 1;
     if (self->domain == AF_UNIX) {
-
-        /*  Ensure the total buffer size does not exceed size limitation
-            of WriteFile. */
-        nn_assert (len <= MAXDWORD);
-        brc = ReadFile (self->p, buf, (DWORD) len, NULL, &self->in.olpd);
-        error = brc ? ERROR_SUCCESS : GetLastError ();
+        self->in.start = nn_usock_recv_start_pipe;
     }
     else {
-        rc = WSARecv (self->s, &wbuf, 1, NULL, &wflags, &self->in.olpd, NULL);
-        error = (rc == 0) ? ERROR_SUCCESS : WSAGetLastError ();
+        self->in.start = nn_usock_recv_start_wsock;
     }
 
-    if (nn_fast (error == ERROR_SUCCESS)) {
-        nn_worker_op_start (&self->in, 1);
-        return;
-    }
-
-    if (nn_fast (error == WSA_IO_PENDING)) {
-        nn_worker_op_start (&self->in, 1);
-        return;
-    }
-
-    if (error == WSAECONNABORTED || error == WSAECONNRESET ||
-          error == WSAENETDOWN || error == WSAENETRESET ||
-          error == WSAETIMEDOUT || error == WSAEWOULDBLOCK ||
-          error == ERROR_PIPE_NOT_CONNECTED || error == ERROR_BROKEN_PIPE) {
-        nn_fsm_action (&self->fsm, NN_USOCK_ACTION_ERROR);
-        return;
-   }
-
-    wsa_assert (0);
+    self->in.start (self->in.arg);
 }
 
 static void nn_usock_create_io_completion (struct nn_usock *self)
@@ -628,7 +657,7 @@
     listener->asock = self;
 
     /*  Asynchronous accept. */
-    nn_worker_op_start (&listener->in, 0);
+    nn_worker_op_start (&listener->in);
 }
 
 static void nn_usock_close (struct nn_usock *self)
diff --git a/src/aio/worker_win.h b/src/aio/worker_win.h
index 1b1dac0..200f4dd 100644
--- a/src/aio/worker_win.h
+++ b/src/aio/worker_win.h
@@ -1,5 +1,7 @@
 /*
     Copyright (c) 2013 Martin Sustrik  All rights reserved.
+    Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+    Copyright 2018 Capitar IT Group BV <info@capitar.com>
 
     Permission is hereby granted, free of charge, to any person obtaining a copy
     of this software and associated documentation files (the "Software"),
@@ -42,16 +44,21 @@
     /*  This structure is to be used by the user, not nn_worker_op itself.
         Actual usage is specific to the asynchronous operation in question. */
     OVERLAPPED olpd;
+
+    /*  We might have transferred less than requested.  This keeps track. */
+    size_t resid;
+    char *buf;
+    void *arg;
+    void (*start)(struct nn_usock *);
+    int zero_is_error;
 };
 
 void nn_worker_op_init (struct nn_worker_op *self, int src,
     struct nn_fsm *owner);
 void nn_worker_op_term (struct nn_worker_op *self);
 
-/*  Call this function when asynchronous operation is started.
-    If 'zeroiserror' is set to 1, zero bytes transferred will be treated
-    as an error. */
-void nn_worker_op_start (struct nn_worker_op *self, int zeroiserror);
+/*  Call this function when asynchronous operation is started. */
+void nn_worker_op_start (struct nn_worker_op *self);
 
 int nn_worker_op_isidle (struct nn_worker_op *self);
 
diff --git a/src/aio/worker_win.inc b/src/aio/worker_win.inc
index d62028a..4e4b95e 100644
--- a/src/aio/worker_win.inc
+++ b/src/aio/worker_win.inc
@@ -1,6 +1,8 @@
 /*
     Copyright (c) 2013 Martin Sustrik  All rights reserved.
     Copyright (c) 2013 GoPivotal, Inc.  All rights reserved.
+    Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+    Copyright 2018 Capitar IT Group BV <info@capitar.com>
 
     Permission is hereby granted, free of charge, to any person obtaining a copy
     of this software and associated documentation files (the "Software"),
@@ -32,7 +34,6 @@
 
 #define NN_WORKER_OP_STATE_IDLE 1
 #define NN_WORKER_OP_STATE_ACTIVE 2
-#define NN_WORKER_OP_STATE_ACTIVE_ZEROISERROR 3
 
 /*  The value of this variable is irrelevant. It's used only as a placeholder
     for the address that is used as the 'stop' event ID. */
@@ -58,6 +59,10 @@
     self->src = src;
     self->owner = owner;
     self->state = NN_WORKER_OP_STATE_IDLE;
+    self->start = NULL;
+    self->buf = NULL;
+    self->resid = 0;
+    self->zero_is_error = 0;
 }
 
 void nn_worker_op_term (struct nn_worker_op *self)
@@ -65,11 +70,9 @@
     nn_assert_state (self, NN_WORKER_OP_STATE_IDLE);
 }
 
-void nn_worker_op_start (struct nn_worker_op *self, int zeroiserror)
+void nn_worker_op_start (struct nn_worker_op *self)
 {
-    nn_assert_state (self, NN_WORKER_OP_STATE_IDLE);
-    self->state = zeroiserror ? NN_WORKER_OP_STATE_ACTIVE_ZEROISERROR :
-        NN_WORKER_OP_STATE_ACTIVE;
+    self->state = NN_WORKER_OP_STATE_ACTIVE;
 }
 
 int nn_worker_op_isidle (struct nn_worker_op *self)
@@ -174,6 +177,7 @@
 
             /*  Process I/O completion events. */
             if (nn_fast (entries [i].lpOverlapped != NULL)) {
+                DWORD nxfer;
                 op = nn_cont (entries [i].lpOverlapped,
                     struct nn_worker_op, olpd);
 
@@ -183,9 +187,31 @@
                 rc = entries [i].Internal & 0xc0000000;
                 switch (rc) {
                 case 0x00000000:
+                     nxfer = entries[i].dwNumberOfBytesTransferred;
+
+                     if ((nxfer == 0) && (op->zero_is_error != 0)) {
+                         rc = NN_WORKER_OP_ERROR;
+                         break;
+                     }
+                     if (op->start != NULL) {
+                         if (nxfer > op->resid) {
+                             rc = NN_WORKER_OP_ERROR;
+                             break;
+                         }
+                         op->resid -= nxfer;
+                         op->buf += nxfer;
+
+                         /*  If we still have more to transfer, keep going. */
+                         if (op->resid != 0) {
+                             op->start (op->arg);
+                             continue;
+                         }
+                     }
                      rc = NN_WORKER_OP_DONE;
                      break;
+
                 case 0xc0000000:
+                     nxfer = 0;
                      rc = NN_WORKER_OP_ERROR;
                      break;
                 default:
@@ -195,11 +221,9 @@
                 /*  Raise the completion event. */
                 nn_ctx_enter (op->owner->ctx);
                 nn_assert (op->state != NN_WORKER_OP_STATE_IDLE);
-                if (rc != NN_WORKER_OP_ERROR &&
-                      op->state == NN_WORKER_OP_STATE_ACTIVE_ZEROISERROR &&
-                      entries [i].dwNumberOfBytesTransferred == 0)
-                    rc = NN_WORKER_OP_ERROR;
+
                 op->state = NN_WORKER_OP_STATE_IDLE;
+
                 nn_fsm_feed (op->owner, op->src, rc, op);
                 nn_ctx_leave (op->owner->ctx);