usock state machine improved

Signed-off-by: Martin Sustrik <sustrik@250bpm.com>
diff --git a/src/aio/usock.c b/src/aio/usock.c
index cb4ae0b..3bf6202 100644
--- a/src/aio/usock.c
+++ b/src/aio/usock.c
@@ -33,9 +33,19 @@
 #define NN_USOCK_STATE_STARTING 1
 #define NN_USOCK_STATE_CONNECTING 2
 #define NN_USOCK_STATE_CONNECTED 3
-#define NN_USOCK_STATE_ACCEPTING 4
-#define NN_USOCK_STATE_CLOSING 5
-#define NN_USOCK_STATE_CLOSED 6
+#define NN_USOCK_STATE_CONNECT_ERROR 4
+#define NN_USOCK_STATE_LISTENING 5
+#define NN_USOCK_STATE_ACCEPTING 6
+#define NN_USOCK_STATE_ERROR 7
+#define NN_USOCK_STATE_CLOSING 8
+#define NN_USOCK_STATE_CLOSED 9
+
+#define NN_USOCK_EVENT_CLOSE 1
+#define NN_USOCK_EVENT_ACCEPT 2
+#define NN_USOCK_EVENT_LISTEN 3
+#define NN_USOCK_EVENT_CONNECTED 4
+#define NN_USOCK_EVENT_CONNECT_ERROR 5
+#define NN_USOCK_EVENT_CONNECTING 6
 
 /*  Private functions. */
 static int nn_usock_init_from_fd (struct nn_usock *self, int fd,
@@ -129,7 +139,7 @@
 
     /*  Initialise outgoing tasks. */
     nn_worker_fd_init (&self->wfd, &self->fsm);
-    nn_worker_task_init (&self->task_connect, &self->fsm);
+    nn_worker_task_init (&self->task_connecting, &self->fsm);
     nn_worker_task_init (&self->task_connected, &self->fsm);
     nn_worker_task_init (&self->task_accept, &self->fsm);
     nn_worker_task_init (&self->task_send, &self->fsm);
@@ -144,23 +154,19 @@
     nn_fsm_event_init (&self->event_error, self, NN_USOCK_ERROR);
     nn_fsm_event_init (&self->event_closed, self, NN_USOCK_CLOSED);
 
-    /*  We are not accepting a connection at the moment. */
+    /*  We are not listening at the moment. */
     self->newsock = NULL;
     self->newowner = NULL;
 
     return 0;
 }
 
-void nn_usock_close (struct nn_usock *self)
-{
-    /*  Ask socket to close asynchronously. */
-    nn_worker_execute (self->worker, &self->task_close);
-}
-
 void nn_usock_term (struct nn_usock *self)
 {
     int rc;
 
+    nn_assert (self->state == NN_USOCK_STATE_CLOSED);
+
     if (self->in.batch)
         nn_free (self->in.batch);
 
@@ -175,7 +181,7 @@
     nn_worker_task_term (&self->task_send);
     nn_worker_task_term (&self->task_accept);
     nn_worker_task_term (&self->task_connected);
-    nn_worker_task_term (&self->task_connect);
+    nn_worker_task_term (&self->task_connecting);
     nn_worker_fd_term (&self->wfd);
 
     rc = close (self->s);
@@ -190,11 +196,21 @@
     return nn_fsm_swap_owner (&self->fsm, newowner);
 }
 
+void nn_usock_close (struct nn_usock *self)
+{
+    /*  Ask socket to close asynchronously. */
+    nn_usock_callback (&self->fsm, NULL, NN_USOCK_EVENT_CLOSE);
+}
+
 int nn_usock_setsockopt (struct nn_usock *self, int level, int optname,
     const void *optval, size_t optlen)
 {
     int rc;
 
+    /*  The socket can be modified only before it's connected. */
+    if (nn_slow (self->state != NN_USOCK_STATE_STARTING))
+        return -EFSM;
+
     /*  EINVAL errors are ignored on OSX platform. The reason for that is buggy
         OSX behaviour where setsockopt returns EINVAL if the peer have already
         disconnected. Thus, nn_usock_setsockopt() can succeed on OSX even though
@@ -217,6 +233,10 @@
 {
     int rc;
 
+    /*  The socket can be bound only before it's connected. */
+    if (nn_slow (self->state != NN_USOCK_STATE_STARTING))
+        return -EFSM;
+
     rc = bind (self->s, addr, (socklen_t) addrlen);
     if (nn_slow (rc != 0))
         return -errno;
@@ -228,47 +248,27 @@
 {
     int rc;
 
+    /*  You can start listening only before the socket is connected. */
+    if (nn_slow (self->state != NN_USOCK_STATE_STARTING))
+        return -EFSM;
+
     /*  Start listening for incoming connections. */
     rc = listen (self->s, backlog);
     if (nn_slow (rc != 0))
         return -errno;
 
+    /*  Notify the state machine. */
+    nn_usock_callback (&self->fsm, NULL, NN_USOCK_EVENT_LISTEN);
+
     return 0;
 }
 
 void nn_usock_accept (struct nn_usock *self, struct nn_usock *newsock,
     struct nn_fsm *newowner)
 {
-    int s;
-
-    /*  If newsock is not NULL, other accept is in progress. That should never
-        happen. */
-    nn_assert (!self->newsock);
-
-#if NN_HAVE_ACCEPT4
-    s = accept4 (self->s, NULL, NULL, SOCK_CLOEXEC);
-#else
-    s = accept (self->s, NULL, NULL);
-#endif
-
-    /*  Immediate success. */
-    if (nn_fast (s >= 0)) {
-        nn_usock_init_from_fd (newsock, s, self->newowner);
-        nn_fsm_raise (&self->fsm, &self->event_accepted);
-        return;
-    }
-
-    /*  Unexpected failure. */
-    if (nn_slow (errno != EAGAIN && errno != EWOULDBLOCK &&
-          errno != ECONNABORTED)) {
-        nn_fsm_raise (&self->fsm, &self->event_error);
-        return;
-    }
-
-    /*  Ask the worker thread to wait for the new connection. */
     self->newsock = newsock;
     self->newowner = newowner;
-    nn_worker_execute (self->worker, &self->task_accept);
+    nn_usock_callback (&self->fsm, NULL, NN_USOCK_EVENT_ACCEPT);
 }
 
 void nn_usock_connect (struct nn_usock *self, const struct sockaddr *addr,
@@ -276,28 +276,26 @@
 {
     int rc;
 
+    /*  Fail if the socket is already connected, closed or such. */
+    nn_assert (self->state == NN_USOCK_STATE_STARTING);
+
     /* Do the connect itself. */
     rc = connect (self->s, addr, (socklen_t) addrlen);
 
     /* Immediate success. */
     if (nn_fast (rc == 0)) {
-
-        /*  Ask worker thread to start polling on the socket. */
-        nn_worker_execute (self->worker, &self->task_connected);
-
-        /*  Notify the user that the connection is established. */
-        nn_fsm_raise (&self->fsm, &self->event_connected);
+        nn_usock_callback (&self->fsm, NULL, NN_USOCK_EVENT_CONNECTED);
         return;
     }
 
-    /* Return unexpected errors to the caller. Notify the user about it. */
+    /*  Error. */
     if (nn_slow (errno != EINPROGRESS)) {
-        nn_fsm_raise (&self->fsm, &self->event_error);
+        nn_usock_callback (&self->fsm, NULL, NN_USOCK_EVENT_CONNECT_ERROR);
         return;
     }
 
-    /*  Ask worker thread to start waiting for connection establishment. */
-    nn_worker_execute (self->worker, &self->task_connect);
+    /*  Async connect. */
+    nn_usock_callback (&self->fsm, NULL, NN_USOCK_EVENT_CONNECTING);
 }
 
 void nn_usock_send (struct nn_usock *self, const struct nn_iovec *iov,
@@ -307,6 +305,9 @@
     int i;
     int out;
 
+    /*  Make sure that the socket is actually alive. */
+    nn_assert (self->state == NN_USOCK_STATE_CONNECTED);
+
     /*  Copy the iovecs to the socket. */
     nn_assert (iovcnt <= NN_USOCK_MAX_IOVCNT);
     self->out.hdr.msg_iov = self->out.iov;
@@ -332,6 +333,7 @@
     /*  Errors. */
     if (nn_slow (rc != -EAGAIN)) {
         errnum_assert (rc == -ECONNRESET, -rc);
+        self->state = NN_USOCK_STATE_ERROR;
         nn_fsm_raise (&self->fsm, &self->event_error);
         return;
     }
@@ -345,11 +347,15 @@
     int rc;
     size_t nbytes;
 
+    /*  Make sure that the socket is actually alive. */
+    nn_assert (self->state == NN_USOCK_STATE_CONNECTED);
+
     /*  Try to receive the data immediately. */
     nbytes = len;
     rc = nn_usock_recv_raw (self, buf, &nbytes);
     if (nn_slow (rc < 0)) {
         errnum_assert (rc == -ECONNRESET, -rc);
+        self->state = NN_USOCK_STATE_ERROR;
         nn_fsm_raise (&self->fsm, &self->event_error);
         return;
     }
@@ -377,45 +383,84 @@
 
     usock = nn_cont (self, struct nn_usock, fsm);
 
-    /*  Close event is processed in the same way not depending on the state
-        the usock is in. */
-    if (source == &usock->task_close) {
-        nn_worker_rm_fd (usock->worker, &usock->wfd);
-        nn_fsm_raise (&usock->fsm, &usock->event_closed);
-        nn_usock_term (usock);
+    /*  Internal tasks sent from the user thread to the worker thread. */
+    if (source == &usock->task_send) {
+        nn_assert (type == NN_WORKER_TASK_EXECUTE);
+        nn_assert (usock->state == NN_USOCK_STATE_CONNECTED ||
+            usock->state == NN_USOCK_STATE_CLOSING);
+        nn_worker_set_out (usock->worker, &usock->wfd);
+        return;
+    }
+    if (source == &usock->task_recv) {
+        nn_assert (type == NN_WORKER_TASK_EXECUTE);
+        nn_assert (usock->state == NN_USOCK_STATE_CONNECTED ||
+            usock->state == NN_USOCK_STATE_CLOSING);
+        nn_worker_set_in (usock->worker, &usock->wfd);
+        return;
+    }
+    if (source == &usock->task_connected) {
+        nn_assert (type == NN_WORKER_TASK_EXECUTE);
+        nn_assert (usock->state == NN_USOCK_STATE_CONNECTED ||
+            usock->state == NN_USOCK_STATE_CLOSING);
+        nn_worker_add_fd (usock->worker, usock->s, &usock->wfd);
+        return;
+    }
+    if (source == &usock->task_connecting) {
+        nn_assert (type == NN_WORKER_TASK_EXECUTE);
+        nn_assert (usock->state == NN_USOCK_STATE_CONNECTING ||
+            usock->state == NN_USOCK_STATE_CLOSING);
+        nn_worker_add_fd (usock->worker, usock->s, &usock->wfd);
+        nn_worker_set_out (usock->worker, &usock->wfd);
+        return;
+    }
+    if (source == &usock->task_accept) {
+        nn_assert (type == NN_WORKER_TASK_EXECUTE);
+        nn_assert (usock->state == NN_USOCK_STATE_ACCEPTING ||
+            usock->state == NN_USOCK_STATE_CLOSING);
+        nn_worker_add_fd (usock->worker, usock->s, &usock->wfd);
+        nn_worker_set_in (usock->worker, &usock->wfd);
         return;
     }
 
+    /*  The state machine itself. */
     switch (usock->state) {
 
 /******************************************************************************/
-/*  STARTING                                                                  */
+/*  STARTING state                                                            */
 /******************************************************************************/
     case NN_USOCK_STATE_STARTING:
-        if (source == &usock->task_connected) {
-            nn_assert (type == NN_WORKER_TASK_EXECUTE);
-            nn_worker_add_fd (usock->worker, usock->s, &usock->wfd);
-            usock->state = NN_USOCK_STATE_CONNECTED;
-            return;
-        }
-        if (source == &usock->task_connect) {
-            nn_assert (type == NN_WORKER_TASK_EXECUTE);
-            nn_worker_add_fd (usock->worker, usock->s, &usock->wfd);
-            nn_worker_set_out (usock->worker, &usock->wfd);
-            usock->state = NN_USOCK_STATE_CONNECTING;
-            return;
-        }
-        if (source == &usock->task_accept) {
-            nn_assert (type == NN_WORKER_TASK_EXECUTE);
-            nn_worker_add_fd (usock->worker, usock->s, &usock->wfd);
-            nn_worker_set_in (usock->worker, &usock->wfd);
-            usock->state = NN_USOCK_STATE_ACCEPTING;
-            return;
+
+        /*  Events from the owner of the usock. */
+        if (source == NULL) {
+            switch (type) {
+            case NN_USOCK_EVENT_LISTEN:
+                usock->state = NN_USOCK_STATE_LISTENING;
+                return;
+            case NN_USOCK_EVENT_CONNECTED:
+                usock->state = NN_USOCK_STATE_CONNECTED;
+                nn_worker_execute (usock->worker, &usock->task_connected);
+                nn_fsm_raise (&usock->fsm, &usock->event_connected);
+                return;
+            case NN_USOCK_EVENT_CONNECT_ERROR:
+                usock->state = NN_USOCK_STATE_CONNECT_ERROR;
+                nn_fsm_raise (&usock->fsm, &usock->event_error);
+                return;
+            case NN_USOCK_EVENT_CONNECTING:
+                usock->state = NN_USOCK_STATE_CONNECTING;
+                nn_worker_execute (usock->worker, &usock->task_connecting);
+                return;
+            case NN_USOCK_EVENT_CLOSE:
+                usock->state = NN_USOCK_STATE_CLOSED;
+                nn_fsm_raise (&usock->fsm, &usock->event_closed);
+                return;
+            default:
+                nn_assert (0);
+            }
         }
         nn_assert (0);
 
 /******************************************************************************/
-/*  CONNECTING                                                                */
+/*  CONNECTING state                                                          */
 /******************************************************************************/ 
     case NN_USOCK_STATE_CONNECTING:
         if (source == &usock->wfd) {
@@ -431,17 +476,77 @@
                 nn_assert (0);
             }
         }
+        if (source == NULL) {
+            nn_assert (type == NN_USOCK_EVENT_CLOSE);
+            nn_assert (0);
+        }
         nn_assert (0);
 
 /******************************************************************************/
-/*  ACCEPTING                                                                 */
+/*  CONNECT_ERROR state                                                       */
+/*  This state means that the connect have failed synchronously and thus,     */
+/*  the socket is not registered with the worker thread. The only thing that  */
+/*  can be done in this state is closing the socket.                          */
+/******************************************************************************/ 
+    case NN_USOCK_STATE_CONNECT_ERROR:
+        if (source == NULL) {
+            nn_assert (type == NN_USOCK_EVENT_CLOSE);
+            usock->state = NN_USOCK_STATE_CLOSED;
+            nn_fsm_raise (&usock->fsm, &usock->event_closed);
+            return;
+        }
+        nn_assert (0);
+
+/******************************************************************************/
+/*  LISTENING state                                                           */
+/******************************************************************************/ 
+    case NN_USOCK_STATE_LISTENING:
+
+        /*  Events from the owner of the usock. */
+        if (source == NULL) {
+            switch (type) {
+            case NN_USOCK_EVENT_ACCEPT:
+
+                /*  Try to accept new connection in synchronous manner. */
+#if NN_HAVE_ACCEPT4
+                s = accept4 (usock->s, NULL, NULL, SOCK_CLOEXEC);
+#else
+                s = accept (usock->s, NULL, NULL);
+#endif
+                /*  Immediate success. */
+                if (nn_fast (s >= 0)) {
+                    nn_usock_init_from_fd (usock->newsock, s, usock->newowner);
+                    nn_fsm_raise (&usock->fsm, &usock->event_accepted);
+                    return;
+                }
+
+                /*  Detect unexpected failure. */
+                errno_assert (errno == EAGAIN && errno == EWOULDBLOCK &&
+                      errno == ECONNABORTED);
+
+                /*  Ask the worker thread to wait for the new connection. */
+                nn_worker_execute (usock->worker, &usock->task_accept);
+                usock->state = NN_USOCK_STATE_ACCEPTING;
+
+                return;
+
+            case NN_USOCK_EVENT_CLOSE:
+                nn_assert (0);
+            default:
+                nn_assert (0);
+            }
+        }
+        nn_assert (0);
+
+/******************************************************************************/
+/*  ACCEPTING state                                                           */
 /******************************************************************************/ 
     case NN_USOCK_STATE_ACCEPTING:
         if (source == &usock->wfd) {
             switch (type) {
             case NN_WORKER_FD_IN:
                 nn_assert (usock->newsock);
-#if NN_HAVE_ACCEPT4
+#if NN_HAVE_ACCEPT4c
                 s = accept4 (usock->s, NULL, NULL, SOCK_CLOEXEC);
 #else
                 s = accept (usock->s, NULL, NULL);
@@ -465,22 +570,16 @@
                 nn_assert (0);
             }
         }
+        if (source == NULL) {
+            nn_assert (type == NN_USOCK_EVENT_CLOSE);
+            nn_assert (0);
+        }
         nn_assert (0);
 
 /******************************************************************************/
-/*  CONNECTED                                                                 */
+/*  CONNECTED state                                                           */
 /******************************************************************************/ 
     case NN_USOCK_STATE_CONNECTED:
-        if (source == &usock->task_send) {
-            nn_assert (type == NN_WORKER_TASK_EXECUTE);
-            nn_worker_set_out (usock->worker, &usock->wfd);
-            return;
-        }
-        if (source == &usock->task_recv) {
-            nn_assert (type == NN_WORKER_TASK_EXECUTE);
-            nn_worker_set_in (usock->worker, &usock->wfd);
-            return;
-        }
         if (source == &usock->wfd) {
             switch (type) {
             case NN_WORKER_FD_IN:
@@ -495,6 +594,7 @@
                     return;
                 }
                 errnum_assert (rc == -ECONNRESET, -rc);
+                usock->state = NN_USOCK_STATE_ERROR;
                 nn_fsm_raise (&usock->fsm, &usock->event_error);
                 return;
             case NN_WORKER_FD_OUT:
@@ -507,6 +607,7 @@
                 if (nn_fast (rc == -EAGAIN))
                     return;
                 errnum_assert (rc == -ECONNRESET, -rc);
+                usock->state = NN_USOCK_STATE_ERROR;
                 nn_fsm_raise (&usock->fsm, &usock->event_error);
                 return;
             case NN_WORKER_FD_ERR:
@@ -515,6 +616,53 @@
                 nn_assert (0);
             }
         }
+        if (source == NULL) {
+            nn_assert (type == NN_USOCK_EVENT_CLOSE);
+            nn_assert (0);
+        }
+        nn_assert (0);
+
+/******************************************************************************/
+/*  ERROR state                                                               */
+/******************************************************************************/ 
+    case NN_USOCK_STATE_ERROR:
+        if (source == NULL) {
+            nn_assert (type == NN_USOCK_EVENT_CLOSE);
+            usock->state = NN_USOCK_STATE_CLOSING;
+            nn_worker_execute (usock->worker, &usock->task_close);
+            return;
+        }
+        nn_assert (0);
+
+/******************************************************************************/
+/*  CLOSING state                                                             */
+/******************************************************************************/ 
+    case NN_USOCK_STATE_CLOSING:
+
+        /*  The close request was delivered to the worker thread. We can now
+            remove the fd from the poller and notify user that the socket is
+            actually closed. */
+        if (source == &usock->task_close) {
+            nn_assert (usock->state == NN_USOCK_STATE_CLOSING);
+            nn_worker_rm_fd (usock->worker, &usock->wfd);
+            usock->state = NN_USOCK_STATE_CLOSED;
+            nn_fsm_raise (&usock->fsm, &usock->event_closed);
+            return;
+        }
+
+        /*  While closing the socket we may get some delayed events from
+            the worker thread. We can simply ignore those. */
+        if (source == &usock->wfd)
+            return;
+
+        nn_assert (0);
+
+/******************************************************************************/
+/*  CLOSED state                                                               */
+/******************************************************************************/ 
+    case NN_USOCK_STATE_CLOSED:
+
+        /*  Nothing should happen in the CLOSED state. */
         nn_assert (0);
 
 /******************************************************************************/
@@ -671,3 +819,4 @@
     nn_assert (optsz == sizeof (opt));
     return opt;
 }
+
diff --git a/src/aio/usock.h b/src/aio/usock.h
index f0e1a4a..7845b50 100644
--- a/src/aio/usock.h
+++ b/src/aio/usock.h
@@ -94,7 +94,7 @@
     } out;
 
     /*  Asynchronous tasks for the worker. */
-    struct nn_worker_task task_connect;
+    struct nn_worker_task task_connecting;
     struct nn_worker_task task_connected;
     struct nn_worker_task task_accept;
     struct nn_worker_task task_send;
diff --git a/src/transports/utils/cstream.c b/src/transports/utils/cstream.c
index 14c8150..9d3b386 100644
--- a/src/transports/utils/cstream.c
+++ b/src/transports/utils/cstream.c
@@ -232,7 +232,8 @@
             switch (type) {
             case NN_USOCK_CLOSED:
                 cstream->state = NN_CSTREAM_STATE_CLOSED;
-                /*  TODO: Notify the owner. */
+        
+                nn_assert (0);
                 return;
             default:
                 nn_assert (0);