usock state machine improved
Signed-off-by: Martin Sustrik <sustrik@250bpm.com>
diff --git a/src/aio/usock.c b/src/aio/usock.c
index cb4ae0b..3bf6202 100644
--- a/src/aio/usock.c
+++ b/src/aio/usock.c
@@ -33,9 +33,19 @@
#define NN_USOCK_STATE_STARTING 1
#define NN_USOCK_STATE_CONNECTING 2
#define NN_USOCK_STATE_CONNECTED 3
-#define NN_USOCK_STATE_ACCEPTING 4
-#define NN_USOCK_STATE_CLOSING 5
-#define NN_USOCK_STATE_CLOSED 6
+#define NN_USOCK_STATE_CONNECT_ERROR 4
+#define NN_USOCK_STATE_LISTENING 5
+#define NN_USOCK_STATE_ACCEPTING 6
+#define NN_USOCK_STATE_ERROR 7
+#define NN_USOCK_STATE_CLOSING 8
+#define NN_USOCK_STATE_CLOSED 9
+
+#define NN_USOCK_EVENT_CLOSE 1
+#define NN_USOCK_EVENT_ACCEPT 2
+#define NN_USOCK_EVENT_LISTEN 3
+#define NN_USOCK_EVENT_CONNECTED 4
+#define NN_USOCK_EVENT_CONNECT_ERROR 5
+#define NN_USOCK_EVENT_CONNECTING 6
/* Private functions. */
static int nn_usock_init_from_fd (struct nn_usock *self, int fd,
@@ -129,7 +139,7 @@
/* Initialise outgoing tasks. */
nn_worker_fd_init (&self->wfd, &self->fsm);
- nn_worker_task_init (&self->task_connect, &self->fsm);
+ nn_worker_task_init (&self->task_connecting, &self->fsm);
nn_worker_task_init (&self->task_connected, &self->fsm);
nn_worker_task_init (&self->task_accept, &self->fsm);
nn_worker_task_init (&self->task_send, &self->fsm);
@@ -144,23 +154,19 @@
nn_fsm_event_init (&self->event_error, self, NN_USOCK_ERROR);
nn_fsm_event_init (&self->event_closed, self, NN_USOCK_CLOSED);
- /* We are not accepting a connection at the moment. */
+ /* We are not listening at the moment. */
self->newsock = NULL;
self->newowner = NULL;
return 0;
}
-void nn_usock_close (struct nn_usock *self)
-{
- /* Ask socket to close asynchronously. */
- nn_worker_execute (self->worker, &self->task_close);
-}
-
void nn_usock_term (struct nn_usock *self)
{
int rc;
+ nn_assert (self->state == NN_USOCK_STATE_CLOSED);
+
if (self->in.batch)
nn_free (self->in.batch);
@@ -175,7 +181,7 @@
nn_worker_task_term (&self->task_send);
nn_worker_task_term (&self->task_accept);
nn_worker_task_term (&self->task_connected);
- nn_worker_task_term (&self->task_connect);
+ nn_worker_task_term (&self->task_connecting);
nn_worker_fd_term (&self->wfd);
rc = close (self->s);
@@ -190,11 +196,21 @@
return nn_fsm_swap_owner (&self->fsm, newowner);
}
+void nn_usock_close (struct nn_usock *self)
+{
+ /* Ask socket to close asynchronously. */
+ nn_usock_callback (&self->fsm, NULL, NN_USOCK_EVENT_CLOSE);
+}
+
int nn_usock_setsockopt (struct nn_usock *self, int level, int optname,
const void *optval, size_t optlen)
{
int rc;
+ /* The socket can be modified only before it's connected. */
+ if (nn_slow (self->state != NN_USOCK_STATE_STARTING))
+ return -EFSM;
+
/* EINVAL errors are ignored on OSX platform. The reason for that is buggy
OSX behaviour where setsockopt returns EINVAL if the peer have already
disconnected. Thus, nn_usock_setsockopt() can succeed on OSX even though
@@ -217,6 +233,10 @@
{
int rc;
+ /* The socket can be bound only before it's connected. */
+ if (nn_slow (self->state != NN_USOCK_STATE_STARTING))
+ return -EFSM;
+
rc = bind (self->s, addr, (socklen_t) addrlen);
if (nn_slow (rc != 0))
return -errno;
@@ -228,47 +248,27 @@
{
int rc;
+ /* You can start listening only before the socket is connected. */
+ if (nn_slow (self->state != NN_USOCK_STATE_STARTING))
+ return -EFSM;
+
/* Start listening for incoming connections. */
rc = listen (self->s, backlog);
if (nn_slow (rc != 0))
return -errno;
+ /* Notify the state machine. */
+ nn_usock_callback (&self->fsm, NULL, NN_USOCK_EVENT_LISTEN);
+
return 0;
}
void nn_usock_accept (struct nn_usock *self, struct nn_usock *newsock,
struct nn_fsm *newowner)
{
- int s;
-
- /* If newsock is not NULL, other accept is in progress. That should never
- happen. */
- nn_assert (!self->newsock);
-
-#if NN_HAVE_ACCEPT4
- s = accept4 (self->s, NULL, NULL, SOCK_CLOEXEC);
-#else
- s = accept (self->s, NULL, NULL);
-#endif
-
- /* Immediate success. */
- if (nn_fast (s >= 0)) {
- nn_usock_init_from_fd (newsock, s, self->newowner);
- nn_fsm_raise (&self->fsm, &self->event_accepted);
- return;
- }
-
- /* Unexpected failure. */
- if (nn_slow (errno != EAGAIN && errno != EWOULDBLOCK &&
- errno != ECONNABORTED)) {
- nn_fsm_raise (&self->fsm, &self->event_error);
- return;
- }
-
- /* Ask the worker thread to wait for the new connection. */
self->newsock = newsock;
self->newowner = newowner;
- nn_worker_execute (self->worker, &self->task_accept);
+ nn_usock_callback (&self->fsm, NULL, NN_USOCK_EVENT_ACCEPT);
}
void nn_usock_connect (struct nn_usock *self, const struct sockaddr *addr,
@@ -276,28 +276,26 @@
{
int rc;
+ /* Fail if the socket is already connected, closed or such. */
+ nn_assert (self->state == NN_USOCK_STATE_STARTING);
+
/* Do the connect itself. */
rc = connect (self->s, addr, (socklen_t) addrlen);
/* Immediate success. */
if (nn_fast (rc == 0)) {
-
- /* Ask worker thread to start polling on the socket. */
- nn_worker_execute (self->worker, &self->task_connected);
-
- /* Notify the user that the connection is established. */
- nn_fsm_raise (&self->fsm, &self->event_connected);
+ nn_usock_callback (&self->fsm, NULL, NN_USOCK_EVENT_CONNECTED);
return;
}
- /* Return unexpected errors to the caller. Notify the user about it. */
+ /* Error. */
if (nn_slow (errno != EINPROGRESS)) {
- nn_fsm_raise (&self->fsm, &self->event_error);
+ nn_usock_callback (&self->fsm, NULL, NN_USOCK_EVENT_CONNECT_ERROR);
return;
}
- /* Ask worker thread to start waiting for connection establishment. */
- nn_worker_execute (self->worker, &self->task_connect);
+ /* Async connect. */
+ nn_usock_callback (&self->fsm, NULL, NN_USOCK_EVENT_CONNECTING);
}
void nn_usock_send (struct nn_usock *self, const struct nn_iovec *iov,
@@ -307,6 +305,9 @@
int i;
int out;
+ /* Make sure that the socket is actually alive. */
+ nn_assert (self->state == NN_USOCK_STATE_CONNECTED);
+
/* Copy the iovecs to the socket. */
nn_assert (iovcnt <= NN_USOCK_MAX_IOVCNT);
self->out.hdr.msg_iov = self->out.iov;
@@ -332,6 +333,7 @@
/* Errors. */
if (nn_slow (rc != -EAGAIN)) {
errnum_assert (rc == -ECONNRESET, -rc);
+ self->state = NN_USOCK_STATE_ERROR;
nn_fsm_raise (&self->fsm, &self->event_error);
return;
}
@@ -345,11 +347,15 @@
int rc;
size_t nbytes;
+ /* Make sure that the socket is actually alive. */
+ nn_assert (self->state == NN_USOCK_STATE_CONNECTED);
+
/* Try to receive the data immediately. */
nbytes = len;
rc = nn_usock_recv_raw (self, buf, &nbytes);
if (nn_slow (rc < 0)) {
errnum_assert (rc == -ECONNRESET, -rc);
+ self->state = NN_USOCK_STATE_ERROR;
nn_fsm_raise (&self->fsm, &self->event_error);
return;
}
@@ -377,45 +383,84 @@
usock = nn_cont (self, struct nn_usock, fsm);
- /* Close event is processed in the same way not depending on the state
- the usock is in. */
- if (source == &usock->task_close) {
- nn_worker_rm_fd (usock->worker, &usock->wfd);
- nn_fsm_raise (&usock->fsm, &usock->event_closed);
- nn_usock_term (usock);
+ /* Internal tasks sent from the user thread to the worker thread. */
+ if (source == &usock->task_send) {
+ nn_assert (type == NN_WORKER_TASK_EXECUTE);
+ nn_assert (usock->state == NN_USOCK_STATE_CONNECTED ||
+ usock->state == NN_USOCK_STATE_CLOSING);
+ nn_worker_set_out (usock->worker, &usock->wfd);
+ return;
+ }
+ if (source == &usock->task_recv) {
+ nn_assert (type == NN_WORKER_TASK_EXECUTE);
+ nn_assert (usock->state == NN_USOCK_STATE_CONNECTED ||
+ usock->state == NN_USOCK_STATE_CLOSING);
+ nn_worker_set_in (usock->worker, &usock->wfd);
+ return;
+ }
+ if (source == &usock->task_connected) {
+ nn_assert (type == NN_WORKER_TASK_EXECUTE);
+ nn_assert (usock->state == NN_USOCK_STATE_CONNECTED ||
+ usock->state == NN_USOCK_STATE_CLOSING);
+ nn_worker_add_fd (usock->worker, usock->s, &usock->wfd);
+ return;
+ }
+ if (source == &usock->task_connecting) {
+ nn_assert (type == NN_WORKER_TASK_EXECUTE);
+ nn_assert (usock->state == NN_USOCK_STATE_CONNECTING ||
+ usock->state == NN_USOCK_STATE_CLOSING);
+ nn_worker_add_fd (usock->worker, usock->s, &usock->wfd);
+ nn_worker_set_out (usock->worker, &usock->wfd);
+ return;
+ }
+ if (source == &usock->task_accept) {
+ nn_assert (type == NN_WORKER_TASK_EXECUTE);
+ nn_assert (usock->state == NN_USOCK_STATE_ACCEPTING ||
+ usock->state == NN_USOCK_STATE_CLOSING);
+ nn_worker_add_fd (usock->worker, usock->s, &usock->wfd);
+ nn_worker_set_in (usock->worker, &usock->wfd);
return;
}
+ /* The state machine itself. */
switch (usock->state) {
/******************************************************************************/
-/* STARTING */
+/* STARTING state */
/******************************************************************************/
case NN_USOCK_STATE_STARTING:
- if (source == &usock->task_connected) {
- nn_assert (type == NN_WORKER_TASK_EXECUTE);
- nn_worker_add_fd (usock->worker, usock->s, &usock->wfd);
- usock->state = NN_USOCK_STATE_CONNECTED;
- return;
- }
- if (source == &usock->task_connect) {
- nn_assert (type == NN_WORKER_TASK_EXECUTE);
- nn_worker_add_fd (usock->worker, usock->s, &usock->wfd);
- nn_worker_set_out (usock->worker, &usock->wfd);
- usock->state = NN_USOCK_STATE_CONNECTING;
- return;
- }
- if (source == &usock->task_accept) {
- nn_assert (type == NN_WORKER_TASK_EXECUTE);
- nn_worker_add_fd (usock->worker, usock->s, &usock->wfd);
- nn_worker_set_in (usock->worker, &usock->wfd);
- usock->state = NN_USOCK_STATE_ACCEPTING;
- return;
+
+ /* Events from the owner of the usock. */
+ if (source == NULL) {
+ switch (type) {
+ case NN_USOCK_EVENT_LISTEN:
+ usock->state = NN_USOCK_STATE_LISTENING;
+ return;
+ case NN_USOCK_EVENT_CONNECTED:
+ usock->state = NN_USOCK_STATE_CONNECTED;
+ nn_worker_execute (usock->worker, &usock->task_connected);
+ nn_fsm_raise (&usock->fsm, &usock->event_connected);
+ return;
+ case NN_USOCK_EVENT_CONNECT_ERROR:
+ usock->state = NN_USOCK_STATE_CONNECT_ERROR;
+ nn_fsm_raise (&usock->fsm, &usock->event_error);
+ return;
+ case NN_USOCK_EVENT_CONNECTING:
+ usock->state = NN_USOCK_STATE_CONNECTING;
+ nn_worker_execute (usock->worker, &usock->task_connecting);
+ return;
+ case NN_USOCK_EVENT_CLOSE:
+ usock->state = NN_USOCK_STATE_CLOSED;
+ nn_fsm_raise (&usock->fsm, &usock->event_closed);
+ return;
+ default:
+ nn_assert (0);
+ }
}
nn_assert (0);
/******************************************************************************/
-/* CONNECTING */
+/* CONNECTING state */
/******************************************************************************/
case NN_USOCK_STATE_CONNECTING:
if (source == &usock->wfd) {
@@ -431,17 +476,77 @@
nn_assert (0);
}
}
+ if (source == NULL) {
+ nn_assert (type == NN_USOCK_EVENT_CLOSE);
+ nn_assert (0);
+ }
nn_assert (0);
/******************************************************************************/
-/* ACCEPTING */
+/* CONNECT_ERROR state */
+/* This state means that the connect have failed synchronously and thus, */
+/* the socket is not registered with the worker thread. The only thing that */
+/* can be done in this state is closing the socket. */
+/******************************************************************************/
+ case NN_USOCK_STATE_CONNECT_ERROR:
+ if (source == NULL) {
+ nn_assert (type == NN_USOCK_EVENT_CLOSE);
+ usock->state = NN_USOCK_STATE_CLOSED;
+ nn_fsm_raise (&usock->fsm, &usock->event_closed);
+ return;
+ }
+ nn_assert (0);
+
+/******************************************************************************/
+/* LISTENING state */
+/******************************************************************************/
+ case NN_USOCK_STATE_LISTENING:
+
+ /* Events from the owner of the usock. */
+ if (source == NULL) {
+ switch (type) {
+ case NN_USOCK_EVENT_ACCEPT:
+
+ /* Try to accept new connection in synchronous manner. */
+#if NN_HAVE_ACCEPT4
+ s = accept4 (usock->s, NULL, NULL, SOCK_CLOEXEC);
+#else
+ s = accept (usock->s, NULL, NULL);
+#endif
+ /* Immediate success. */
+ if (nn_fast (s >= 0)) {
+ nn_usock_init_from_fd (usock->newsock, s, usock->newowner);
+ nn_fsm_raise (&usock->fsm, &usock->event_accepted);
+ return;
+ }
+
+ /* Detect unexpected failure. */
+ errno_assert (errno == EAGAIN && errno == EWOULDBLOCK &&
+ errno == ECONNABORTED);
+
+ /* Ask the worker thread to wait for the new connection. */
+ nn_worker_execute (usock->worker, &usock->task_accept);
+ usock->state = NN_USOCK_STATE_ACCEPTING;
+
+ return;
+
+ case NN_USOCK_EVENT_CLOSE:
+ nn_assert (0);
+ default:
+ nn_assert (0);
+ }
+ }
+ nn_assert (0);
+
+/******************************************************************************/
+/* ACCEPTING state */
/******************************************************************************/
case NN_USOCK_STATE_ACCEPTING:
if (source == &usock->wfd) {
switch (type) {
case NN_WORKER_FD_IN:
nn_assert (usock->newsock);
-#if NN_HAVE_ACCEPT4
+#if NN_HAVE_ACCEPT4c
s = accept4 (usock->s, NULL, NULL, SOCK_CLOEXEC);
#else
s = accept (usock->s, NULL, NULL);
@@ -465,22 +570,16 @@
nn_assert (0);
}
}
+ if (source == NULL) {
+ nn_assert (type == NN_USOCK_EVENT_CLOSE);
+ nn_assert (0);
+ }
nn_assert (0);
/******************************************************************************/
-/* CONNECTED */
+/* CONNECTED state */
/******************************************************************************/
case NN_USOCK_STATE_CONNECTED:
- if (source == &usock->task_send) {
- nn_assert (type == NN_WORKER_TASK_EXECUTE);
- nn_worker_set_out (usock->worker, &usock->wfd);
- return;
- }
- if (source == &usock->task_recv) {
- nn_assert (type == NN_WORKER_TASK_EXECUTE);
- nn_worker_set_in (usock->worker, &usock->wfd);
- return;
- }
if (source == &usock->wfd) {
switch (type) {
case NN_WORKER_FD_IN:
@@ -495,6 +594,7 @@
return;
}
errnum_assert (rc == -ECONNRESET, -rc);
+ usock->state = NN_USOCK_STATE_ERROR;
nn_fsm_raise (&usock->fsm, &usock->event_error);
return;
case NN_WORKER_FD_OUT:
@@ -507,6 +607,7 @@
if (nn_fast (rc == -EAGAIN))
return;
errnum_assert (rc == -ECONNRESET, -rc);
+ usock->state = NN_USOCK_STATE_ERROR;
nn_fsm_raise (&usock->fsm, &usock->event_error);
return;
case NN_WORKER_FD_ERR:
@@ -515,6 +616,53 @@
nn_assert (0);
}
}
+ if (source == NULL) {
+ nn_assert (type == NN_USOCK_EVENT_CLOSE);
+ nn_assert (0);
+ }
+ nn_assert (0);
+
+/******************************************************************************/
+/* ERROR state */
+/******************************************************************************/
+ case NN_USOCK_STATE_ERROR:
+ if (source == NULL) {
+ nn_assert (type == NN_USOCK_EVENT_CLOSE);
+ usock->state = NN_USOCK_STATE_CLOSING;
+ nn_worker_execute (usock->worker, &usock->task_close);
+ return;
+ }
+ nn_assert (0);
+
+/******************************************************************************/
+/* CLOSING state */
+/******************************************************************************/
+ case NN_USOCK_STATE_CLOSING:
+
+ /* The close request was delivered to the worker thread. We can now
+ remove the fd from the poller and notify user that the socket is
+ actually closed. */
+ if (source == &usock->task_close) {
+ nn_assert (usock->state == NN_USOCK_STATE_CLOSING);
+ nn_worker_rm_fd (usock->worker, &usock->wfd);
+ usock->state = NN_USOCK_STATE_CLOSED;
+ nn_fsm_raise (&usock->fsm, &usock->event_closed);
+ return;
+ }
+
+ /* While closing the socket we may get some delayed events from
+ the worker thread. We can simply ignore those. */
+ if (source == &usock->wfd)
+ return;
+
+ nn_assert (0);
+
+/******************************************************************************/
+/* CLOSED state */
+/******************************************************************************/
+ case NN_USOCK_STATE_CLOSED:
+
+ /* Nothing should happen in the CLOSED state. */
nn_assert (0);
/******************************************************************************/
@@ -671,3 +819,4 @@
nn_assert (optsz == sizeof (opt));
return opt;
}
+
diff --git a/src/aio/usock.h b/src/aio/usock.h
index f0e1a4a..7845b50 100644
--- a/src/aio/usock.h
+++ b/src/aio/usock.h
@@ -94,7 +94,7 @@
} out;
/* Asynchronous tasks for the worker. */
- struct nn_worker_task task_connect;
+ struct nn_worker_task task_connecting;
struct nn_worker_task task_connected;
struct nn_worker_task task_accept;
struct nn_worker_task task_send;
diff --git a/src/transports/utils/cstream.c b/src/transports/utils/cstream.c
index 14c8150..9d3b386 100644
--- a/src/transports/utils/cstream.c
+++ b/src/transports/utils/cstream.c
@@ -232,7 +232,8 @@
switch (type) {
case NN_USOCK_CLOSED:
cstream->state = NN_CSTREAM_STATE_CLOSED;
- /* TODO: Notify the owner. */
+
+ nn_assert (0);
return;
default:
nn_assert (0);