Simple callbacks turned into full-blown state machines

Signed-off-by: Martin Sustrik <sustrik@250bpm.com>
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 4d2031b..24a7afd 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -49,10 +49,10 @@
     core/device.c
     core/symbol.c
 
-    aio/callback.h
-    aio/callback.c
     aio/ctx.h
     aio/ctx.c
+    aio/fsm.h
+    aio/fsm.c
     aio/poller.h
     aio/poller.c
     aio/poller_epoll.inc
diff --git a/src/aio/callback.c b/src/aio/callback.c
deleted file mode 100644
index ccbe5f0..0000000
--- a/src/aio/callback.c
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
-    Copyright (c) 2013 250bpm s.r.o.
-
-    Permission is hereby granted, free of charge, to any person obtaining a copy
-    of this software and associated documentation files (the "Software"),
-    to deal in the Software without restriction, including without limitation
-    the rights to use, copy, modify, merge, publish, distribute, sublicense,
-    and/or sell copies of the Software, and to permit persons to whom
-    the Software is furnished to do so, subject to the following conditions:
-
-    The above copyright notice and this permission notice shall be included
-    in all copies or substantial portions of the Software.
-
-    THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
-    IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
-    FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
-    THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
-    LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
-    FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
-    IN THE SOFTWARE.
-*/
-
-#include "callback.h"
-
-void nn_callback_init (struct nn_callback *self, nn_callback_fn fn)
-{
-    self->fn = fn;
-}
-
-void nn_callback_term (struct nn_callback *self)
-{
-}
-
diff --git a/src/aio/callback.h b/src/aio/callback.h
deleted file mode 100644
index 4807be5..0000000
--- a/src/aio/callback.h
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
-    Copyright (c) 2013 250bpm s.r.o.
-
-    Permission is hereby granted, free of charge, to any person obtaining a copy
-    of this software and associated documentation files (the "Software"),
-    to deal in the Software without restriction, including without limitation
-    the rights to use, copy, modify, merge, publish, distribute, sublicense,
-    and/or sell copies of the Software, and to permit persons to whom
-    the Software is furnished to do so, subject to the following conditions:
-
-    The above copyright notice and this permission notice shall be included
-    in all copies or substantial portions of the Software.
-
-    THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
-    IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
-    FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
-    THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
-    LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
-    FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
-    IN THE SOFTWARE.
-*/
-
-#ifndef NN_CALLBACK_INCLUDED
-#define NN_CALLBACK_INCLUDED
-
-/*  Base class for objects accepting callbacks. */
-
-struct nn_callback;
-
-/*  Virtual function to be implemented by the derived class to handle the
-    callbacks. 'source' parameter is pointer to the object that generated
-    the callback. As it can be any kind of object it is of type void*.
-    The user should check whether the pointer points to any source of
-    callbacks it is aware of and cast the pointer accordingly. If a single
-    object can generate different kinds of callbacks, 'type' parameter
-    specifies the callback type. Possible values are defined by the object
-    that is the source of callbacks. */
-typedef void (*nn_callback_fn) (struct nn_callback *self, void *source,
-    int type);
-
-struct nn_callback {
-    nn_callback_fn fn;
-};
-
-void nn_callback_init (struct nn_callback *self, nn_callback_fn fn);
-void nn_callback_term (struct nn_callback *self);
-
-#endif
-
diff --git a/src/aio/ctx.c b/src/aio/ctx.c
index a6f5e41..a8fc698 100644
--- a/src/aio/ctx.c
+++ b/src/aio/ctx.c
@@ -25,30 +25,16 @@
 #include "../utils/err.h"
 #include "../utils/cont.h"
 
-void nn_ctx_task_init (struct nn_ctx_task *self, struct nn_callback *callback,
-    void *source, int type)
-{
-    self->callback = callback;
-    self->source = source;
-    self->type = type;
-    nn_queue_item_init (&self->item);
-}
-
-void nn_ctx_task_term (struct nn_ctx_task *self)
-{
-    nn_queue_item_term (&self->item);
-}
-
 void nn_ctx_init (struct nn_ctx *self, struct nn_pool *pool)
 {
     nn_mutex_init (&self->sync);
     self->pool = pool;
-    nn_queue_init (&self->tasks);
+    nn_queue_init (&self->events);
 }
 
 void nn_ctx_term (struct nn_ctx *self)
 {
-    nn_queue_term (&self->tasks);
+    nn_queue_term (&self->events);
     nn_mutex_term (&self->sync);
 }
 
@@ -59,14 +45,15 @@
 
 void nn_ctx_leave (struct nn_ctx *self)
 {
-    struct nn_ctx_task *task;
+    struct nn_fsm_event *event;
 
-    /*  Process any queued tasks before leaving the AIO context. */
+    /*  Process any queued events before leaving the AIO context. */
     while (1) {
-        task = nn_cont (nn_queue_pop (&self->tasks), struct nn_ctx_task, item);
-        if (!task)
+        event = nn_cont (nn_queue_pop (&self->events),
+            struct nn_fsm_event, item);
+        if (!event)
             break;
-        task->callback->fn (task->callback, task->source, task->type);
+        event->fsm->fn (event->fsm, event->source, event->type);
     }
 
     nn_mutex_unlock (&self->sync);
@@ -77,8 +64,8 @@
     return nn_pool_choose_worker (self->pool);
 }
 
-void nn_ctx_execute (struct nn_ctx *self, struct nn_ctx_task *task)
+void nn_ctx_raise (struct nn_ctx *self, struct nn_fsm_event *event)
 {
-    nn_queue_push (&self->tasks, &task->item);
+    nn_queue_push (&self->events, &event->item);
 }
 
diff --git a/src/aio/ctx.h b/src/aio/ctx.h
index a9750ed..2a04c27 100644
--- a/src/aio/ctx.h
+++ b/src/aio/ctx.h
@@ -27,26 +27,15 @@
 #include "../utils/queue.h"
 
 #include "worker.h"
-#include "callback.h"
 #include "pool.h"
+#include "fsm.h"
 
 /*  AIO context for objects using AIO subsystem. */
 
-struct nn_ctx_task {
-    struct nn_callback *callback;
-    void *source;
-    int type;
-    struct nn_queue_item item;
-};
-
-void nn_ctx_task_init (struct nn_ctx_task *self, struct nn_callback *callback,
-    void *source, int type);
-void nn_ctx_task_term (struct nn_ctx_task *self);
-
 struct nn_ctx {
     struct nn_mutex sync;
     struct nn_pool *pool;
-    struct nn_queue tasks;
+    struct nn_queue events;
 };
 
 void nn_ctx_init (struct nn_ctx *self, struct nn_pool *pool);
@@ -57,7 +46,7 @@
 
 struct nn_worker *nn_ctx_choose_worker (struct nn_ctx *self);
 
-void nn_ctx_execute (struct nn_ctx *self, struct nn_ctx_task *task);
+void nn_ctx_raise (struct nn_ctx *self, struct nn_fsm_event *event);
 
 #endif
 
diff --git a/src/aio/fsm.c b/src/aio/fsm.c
new file mode 100644
index 0000000..dfedaad
--- /dev/null
+++ b/src/aio/fsm.c
@@ -0,0 +1,74 @@
+/*
+    Copyright (c) 2013 250bpm s.r.o.
+
+    Permission is hereby granted, free of charge, to any person obtaining a copy
+    of this software and associated documentation files (the "Software"),
+    to deal in the Software without restriction, including without limitation
+    the rights to use, copy, modify, merge, publish, distribute, sublicense,
+    and/or sell copies of the Software, and to permit persons to whom
+    the Software is furnished to do so, subject to the following conditions:
+
+    The above copyright notice and this permission notice shall be included
+    in all copies or substantial portions of the Software.
+
+    THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+    IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+    FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
+    THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+    LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+    FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
+    IN THE SOFTWARE.
+*/
+
+#include "fsm.h"
+#include "ctx.h"
+
+#include <stddef.h>
+
+void nn_fsm_event_init (struct nn_fsm_event *self, void *source, int type)
+{
+    self->fsm = NULL;
+    self->source = source;
+    self->type = type;
+    nn_queue_item_init (&self->item);
+}
+
+void nn_fsm_event_term (struct nn_fsm_event *self)
+{
+    nn_queue_item_term (&self->item);
+}
+
+void nn_fsm_init_root (struct nn_fsm *self, nn_fsm_fn fn, struct nn_ctx *ctx)
+{
+    self->fn = fn;
+    self->owner = NULL;
+    self->ctx = ctx;
+}
+
+void nn_fsm_init (struct nn_fsm *self, nn_fsm_fn fn, struct nn_fsm *owner)
+{
+    self->fn = fn;
+    self->owner = owner;
+    self->ctx = owner->ctx;
+}
+
+void nn_fsm_term (struct nn_fsm *self)
+{
+}
+
+struct nn_worker *nn_fsm_choose_worker (struct nn_fsm *self)
+{
+    return nn_ctx_choose_worker (self->ctx);
+}
+
+void nn_fsm_raise (struct nn_fsm *self, struct nn_fsm_event *event)
+{
+    event->fsm = self->owner;
+    nn_ctx_raise (self->ctx, event);
+}
+
+void nn_fsm_execute (struct nn_fsm *self, int type)
+{
+    self->fn (self, NULL, type);
+}
+
diff --git a/src/aio/fsm.h b/src/aio/fsm.h
new file mode 100644
index 0000000..1c7ab89
--- /dev/null
+++ b/src/aio/fsm.h
@@ -0,0 +1,72 @@
+/*
+    Copyright (c) 2013 250bpm s.r.o.
+
+    Permission is hereby granted, free of charge, to any person obtaining a copy
+    of this software and associated documentation files (the "Software"),
+    to deal in the Software without restriction, including without limitation
+    the rights to use, copy, modify, merge, publish, distribute, sublicense,
+    and/or sell copies of the Software, and to permit persons to whom
+    the Software is furnished to do so, subject to the following conditions:
+
+    The above copyright notice and this permission notice shall be included
+    in all copies or substantial portions of the Software.
+
+    THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+    IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+    FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
+    THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+    LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+    FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
+    IN THE SOFTWARE.
+*/
+
+#ifndef NN_FSM_INCLUDED
+#define NN_FSM_INCLUDED
+
+#include "../utils/queue.h"
+
+/*  Base class for state machines. */
+
+struct nn_ctx;
+struct nn_fsm;
+struct nn_worker;
+
+struct nn_fsm_event {
+    struct nn_fsm *fsm;
+    void *source;
+    int type;
+    struct nn_queue_item item;
+};
+
+void nn_fsm_event_init (struct nn_fsm_event *self, void *source, int type);
+void nn_fsm_event_term (struct nn_fsm_event *self);
+
+/*  Virtual function to be implemented by the derived class to handle the
+    incoming events. 'source' parameter is pointer to the object that generated
+    the event. As it can be any kind of object it is of type void*.
+    The user should check whether the pointer points to any source of
+    events it is aware of and cast the pointer accordingly. If a single
+    object can generate different kinds of events, 'type' parameter
+    specifies the event type. Possible values are defined by the object
+    that is the source of events. Source equal to NULL means the event is
+    comming from the state machine owner. */
+typedef void (*nn_fsm_fn) (struct nn_fsm *self, void *source, int type);
+
+struct nn_fsm {
+    nn_fsm_fn fn;
+    struct nn_fsm *owner;
+    struct nn_ctx *ctx;
+};
+
+void nn_fsm_init_root (struct nn_fsm *self, nn_fsm_fn fn, struct nn_ctx *ctx);
+void nn_fsm_init (struct nn_fsm *self, nn_fsm_fn fn, struct nn_fsm *owner);
+void nn_fsm_term (struct nn_fsm *self);
+
+struct nn_worker *nn_fsm_choose_worker (struct nn_fsm *self);
+
+void nn_fsm_raise (struct nn_fsm *self, struct nn_fsm_event *event);
+
+void nn_fsm_execute (struct nn_fsm *self, int type);
+
+#endif
+
diff --git a/src/aio/timer.c b/src/aio/timer.c
index 6ab448f..60b603b 100644
--- a/src/aio/timer.c
+++ b/src/aio/timer.c
@@ -26,30 +26,33 @@
 #include "../utils/err.h"
 
 static void nn_timer_term (struct nn_timer *self);
-static void nn_timer_callback (struct nn_callback *self, void *source,
+static void nn_timer_callback (struct nn_fsm *self, void *source,
     int type);
 
-void nn_timer_init (struct nn_timer *self, struct nn_ctx *ctx,
-    struct nn_callback *callback)
+void nn_timer_init (struct nn_timer *self, struct nn_fsm *owner)
 {
-    nn_callback_init (&self->in_callback, nn_timer_callback);
-    self->out_callback = callback;
-    nn_worker_task_init (&self->start_task, &self->in_callback);
-    nn_worker_task_init (&self->stop_task, &self->in_callback);
-    nn_worker_task_init (&self->close_task, &self->in_callback);
-    nn_worker_timer_init (&self->wtimer, &self->in_callback);
-    self->ctx = ctx;
-    self->worker = nn_ctx_choose_worker (ctx);
+    nn_fsm_init (&self->fsm, nn_timer_callback, owner);
+    nn_worker_task_init (&self->start_task, &self->fsm);
+    nn_worker_task_init (&self->stop_task, &self->fsm);
+    nn_worker_task_init (&self->close_task, &self->fsm);
+    nn_worker_timer_init (&self->wtimer, &self->fsm);
+    nn_fsm_event_init (&self->timeout_event, self, NN_TIMER_TIMEOUT);
+    nn_fsm_event_init (&self->stopped_event, self, NN_TIMER_STOPPED);
+    nn_fsm_event_init (&self->closed_event, self, NN_TIMER_CLOSED);
+    self->worker = nn_fsm_choose_worker (&self->fsm);
     self->timeout = -1;
 }
 
 static void nn_timer_term (struct nn_timer *self)
 {
+    nn_fsm_event_term (&self->closed_event);
+    nn_fsm_event_term (&self->stopped_event);
+    nn_fsm_event_term (&self->timeout_event);
     nn_worker_timer_term (&self->wtimer);
     nn_worker_task_term (&self->close_task);
     nn_worker_task_term (&self->stop_task);
     nn_worker_task_term (&self->start_task);
-    nn_callback_term (&self->in_callback);
+    nn_fsm_term (&self->fsm);
 }
 
 void nn_timer_close (struct nn_timer *self)
@@ -77,19 +80,16 @@
     nn_worker_execute (self->worker, &self->start_task);
 }
 
-static void nn_timer_callback (struct nn_callback *self, void *source, int type)
+static void nn_timer_callback (struct nn_fsm *self, void *source, int type)
 {
     struct nn_timer *timer;
-    struct nn_callback *out_callback;
 
-    timer = nn_cont (self, struct nn_timer, in_callback);
+    timer = nn_cont (self, struct nn_timer, fsm);
 
     if (source == &timer->wtimer) {
         nn_assert (timer->timeout > 0);
         timer->timeout = -1;
-        nn_ctx_enter (timer->ctx);
-        timer->out_callback->fn (timer->out_callback, timer, NN_TIMER_TIMEOUT);
-        nn_ctx_leave (timer->ctx);
+        nn_fsm_raise (&timer->fsm, &timer->timeout_event);
         return;
     }
     if (source == &timer->start_task) {
@@ -101,19 +101,14 @@
         nn_assert (timer->timeout > 0);
         timer->timeout = -1;
         nn_worker_rm_timer (timer->worker, &timer->wtimer);
-        nn_ctx_enter (timer->ctx);
-        timer->out_callback->fn (timer->out_callback, timer, NN_TIMER_STOPPED);
-        nn_ctx_leave (timer->ctx);
+        nn_fsm_raise (&timer->fsm, &timer->stopped_event);
         return;
     }
     if (source == &timer->close_task) {
         if (timer->timeout > 0)
             nn_worker_rm_timer (timer->worker, &timer->wtimer);
-        out_callback = timer->out_callback;
+        nn_fsm_raise (&timer->fsm, &timer->closed_event);
         nn_timer_term (timer);
-        nn_ctx_enter (timer->ctx);
-        out_callback->fn (out_callback, timer, NN_TIMER_CLOSED);
-        nn_ctx_leave (timer->ctx);
         return;
     }
     nn_assert (0);
diff --git a/src/aio/timer.h b/src/aio/timer.h
index af4138c..f8a5da2 100644
--- a/src/aio/timer.h
+++ b/src/aio/timer.h
@@ -23,27 +23,27 @@
 #ifndef NN_TIMER_INCLUDED
 #define NN_TIMER_INCLUDED
 
-#include "callback.h"
-#include "ctx.h"
+#include "fsm.h"
+#include "worker.h"
 
 #define NN_TIMER_TIMEOUT 1
 #define NN_TIMER_STOPPED 2
 #define NN_TIMER_CLOSED 3
 
 struct nn_timer {
-    struct nn_callback in_callback;
-    struct nn_callback *out_callback;
+    struct nn_fsm fsm;
     struct nn_worker_task start_task;
     struct nn_worker_task stop_task;
     struct nn_worker_task close_task;
     struct nn_worker_timer wtimer;
-    struct nn_ctx *ctx;
+    struct nn_fsm_event timeout_event;
+    struct nn_fsm_event stopped_event;
+    struct nn_fsm_event closed_event;
     struct nn_worker *worker;
     int timeout;
 };
 
-void nn_timer_init (struct nn_timer *self, struct nn_ctx *ctx,
-    struct nn_callback *callback);
+void nn_timer_init (struct nn_timer *self, struct nn_fsm *owner);
 void nn_timer_close (struct nn_timer *self);
 
 void nn_timer_start (struct nn_timer *self, int timeout);
diff --git a/src/aio/usock.c b/src/aio/usock.c
index d0633cc..01453e5 100644
--- a/src/aio/usock.c
+++ b/src/aio/usock.c
@@ -40,22 +40,19 @@
 static int nn_usock_send_raw (struct nn_usock *self, struct msghdr *hdr);
 static int nn_usock_recv_raw (struct nn_usock *self, void *buf, size_t *len);
 static int nn_usock_geterr (struct nn_usock *self);
-static void nn_usock_callback (struct nn_callback *self, void *source,
-    int type);
+static void nn_usock_callback (struct nn_fsm *self, void *source, int type);
 
 static int nn_usock_init_from_fd (struct nn_usock *self,
-    int fd, struct nn_ctx *ctx, struct nn_callback *callback)
+    int fd, struct nn_fsm *owner)
 {
     int rc;
     int opt;
 
-    /*  Set up the callback pointers. */
-    nn_callback_init (&self->in_callback, nn_usock_callback);
-    self->out_callback = callback;
+    /*  Initalise the state machine. */
+    nn_fsm_init (&self->fsm, nn_usock_callback, owner);
 
-    /*  Store the reference to the worker the socket is associated with. */
-    self->ctx = ctx;
-    self->worker = nn_ctx_choose_worker (ctx);
+    /*  Choose a worker thread to handle this socket. */
+    self->worker = nn_fsm_choose_worker (&self->fsm);
 
     /*  Store the file descriptor of the underlying socket. */
     self->s = fd;
@@ -110,35 +107,31 @@
     memset (&self->out.hdr, 0, sizeof (struct msghdr));
 
     /*  Initialise outgoing tasks. */
-    nn_worker_fd_init (&self->wfd, &self->in_callback);
-    nn_worker_task_init (&self->wtask_connect, &self->in_callback);
-    nn_worker_task_init (&self->wtask_connected, &self->in_callback);
-    nn_worker_task_init (&self->wtask_accept, &self->in_callback);
-    nn_worker_task_init (&self->wtask_send, &self->in_callback);
-    nn_worker_task_init (&self->wtask_recv, &self->in_callback);
-    nn_worker_task_init (&self->wtask_close, &self->in_callback);
+    nn_worker_fd_init (&self->wfd, &self->fsm);
+    nn_worker_task_init (&self->task_connect, &self->fsm);
+    nn_worker_task_init (&self->task_connected, &self->fsm);
+    nn_worker_task_init (&self->task_accept, &self->fsm);
+    nn_worker_task_init (&self->task_send, &self->fsm);
+    nn_worker_task_init (&self->task_recv, &self->fsm);
+    nn_worker_task_init (&self->task_close, &self->fsm);
 
     /*  Intialise incoming tasks. */
-    nn_ctx_task_init (&self->ctask_accepted, self->out_callback, self,
-        NN_USOCK_ACCEPTED);
-    nn_ctx_task_init (&self->ctask_connected, self->out_callback, self,
-        NN_USOCK_CONNECTED);
-    nn_ctx_task_init (&self->ctask_sent, self->out_callback, self,
-        NN_USOCK_SENT);
-    nn_ctx_task_init (&self->ctask_received, self->out_callback, self,
-        NN_USOCK_RECEIVED);
-    nn_ctx_task_init (&self->ctask_error, self->out_callback, self,
-        NN_USOCK_ERROR);
+    nn_fsm_event_init (&self->event_accepted, self, NN_USOCK_ACCEPTED);
+    nn_fsm_event_init (&self->event_connected, self, NN_USOCK_CONNECTED);
+    nn_fsm_event_init (&self->event_sent, self, NN_USOCK_SENT);
+    nn_fsm_event_init (&self->event_received, self, NN_USOCK_RECEIVED);
+    nn_fsm_event_init (&self->event_error, self, NN_USOCK_ERROR);
+    nn_fsm_event_init (&self->event_closed, self, NN_USOCK_CLOSED);
 
     /*  We are not accepting a connection at the moment. */
     self->newsock = NULL;
-    self->newcallback = NULL;
+    self->newowner = NULL;
 
     return 0;
 }
 
 int nn_usock_init (struct nn_usock *self, int domain, int type, int protocol,
-    struct nn_ctx *ctx, struct nn_callback *callback)
+    struct nn_fsm *owner)
 {
     int s;
 
@@ -153,13 +146,13 @@
     if (s < 0)
        return -errno;
 
-    return nn_usock_init_from_fd (self, s, ctx, callback);
+    return nn_usock_init_from_fd (self, s, owner);
 }
 
 void nn_usock_close (struct nn_usock *self)
 {
     /*  Ask socket to close asynchronously. */
-    nn_worker_execute (self->worker, &self->wtask_close);
+    nn_worker_execute (self->worker, &self->task_close);
 }
 
 static void nn_usock_term (struct nn_usock *self)
@@ -169,23 +162,24 @@
     if (self->in.batch)
         nn_free (self->in.batch);
 
-    nn_ctx_task_term (&self->ctask_error);
-    nn_ctx_task_term (&self->ctask_received);
-    nn_ctx_task_term (&self->ctask_sent);
-    nn_ctx_task_term (&self->ctask_connected);
-    nn_ctx_task_term (&self->ctask_accepted);
-    nn_worker_task_term (&self->wtask_close);
-    nn_worker_task_term (&self->wtask_recv);
-    nn_worker_task_term (&self->wtask_send);
-    nn_worker_task_term (&self->wtask_accept);
-    nn_worker_task_term (&self->wtask_connected);
-    nn_worker_task_term (&self->wtask_connect);
+    nn_fsm_event_term (&self->event_closed);
+    nn_fsm_event_term (&self->event_error);
+    nn_fsm_event_term (&self->event_received);
+    nn_fsm_event_term (&self->event_sent);
+    nn_fsm_event_term (&self->event_connected);
+    nn_fsm_event_term (&self->event_accepted);
+    nn_worker_task_term (&self->task_close);
+    nn_worker_task_term (&self->task_recv);
+    nn_worker_task_term (&self->task_send);
+    nn_worker_task_term (&self->task_accept);
+    nn_worker_task_term (&self->task_connected);
+    nn_worker_task_term (&self->task_connect);
     nn_worker_fd_term (&self->wfd);
 
     rc = close (self->s);
     errno_assert (rc == 0);
 
-    nn_callback_term (&self->in_callback);
+    nn_fsm_term (&self->fsm);
 }
 
 int nn_usock_setsockopt (struct nn_usock *self, int level, int optname,
@@ -235,7 +229,7 @@
 }
 
 void nn_usock_accept (struct nn_usock *self, struct nn_usock *newsock,
-    struct nn_callback *newcallback)
+    struct nn_fsm *newowner)
 {
     int s;
 
@@ -251,22 +245,22 @@
 
     /*  Immediate success. */
     if (nn_fast (s >= 0)) {
-        nn_usock_init_from_fd (newsock, s, self->ctx, newcallback);
-        nn_ctx_execute (self->ctx, &self->ctask_accepted);
+        nn_usock_init_from_fd (newsock, s, self->newowner);
+        nn_fsm_raise (&self->fsm, &self->event_accepted);
         return;
     }
 
     /*  Unexpected failure. */
     if (nn_slow (errno != EAGAIN && errno != EWOULDBLOCK &&
           errno != ECONNABORTED)) {
-        nn_ctx_execute (self->ctx, &self->ctask_error);
+        nn_fsm_raise (&self->fsm, &self->event_error);
         return;
     }
 
     /*  Ask the worker thread to wait for the new connection. */
     self->newsock = newsock;
-    self->newcallback = newcallback;
-    nn_worker_execute (self->worker, &self->wtask_accept);
+    self->newowner = newowner;
+    nn_worker_execute (self->worker, &self->task_accept);
 }
 
 void nn_usock_connect (struct nn_usock *self, const struct sockaddr *addr,
@@ -281,21 +275,21 @@
     if (nn_fast (rc == 0)) {
 
         /*  Ask worker thread to start polling on the socket. */
-        nn_worker_execute (self->worker, &self->wtask_connected);
+        nn_worker_execute (self->worker, &self->task_connected);
 
         /*  Notify the user that the connection is established. */
-        nn_ctx_execute (self->ctx, &self->ctask_connected);
+        nn_fsm_raise (&self->fsm, &self->event_connected);
         return;
     }
 
     /* Return unexpected errors to the caller. Notify the user about it. */
     if (nn_slow (errno != EINPROGRESS)) {
-        nn_ctx_execute (self->ctx, &self->ctask_error);
+        nn_fsm_raise (&self->fsm, &self->event_error);
         return;
     }
 
     /*  Ask worker thread to start waiting for connection establishment. */
-    nn_worker_execute (self->worker, &self->wtask_connect);
+    nn_worker_execute (self->worker, &self->task_connect);
 }
 
 void nn_usock_send (struct nn_usock *self, const struct nn_iovec *iov,
@@ -323,19 +317,19 @@
 
     /*  Success. */
     if (nn_fast (rc == 0)) {
-        nn_ctx_execute (self->ctx, &self->ctask_sent);
+        nn_fsm_raise (&self->fsm, &self->event_sent);
         return;
     }
 
     /*  Errors. */
     if (nn_slow (rc != -EAGAIN)) {
         errnum_assert (rc == -ECONNRESET, -rc);
-        nn_ctx_execute (self->ctx, &self->ctask_error);
+        nn_fsm_raise (&self->fsm, &self->event_error);
         return;
     }
 
     /*  Ask the worker thread to send the remaining data. */
-    nn_worker_execute (self->worker, &self->wtask_send);
+    nn_worker_execute (self->worker, &self->task_send);
 }
 
 void nn_usock_recv (struct nn_usock *self, void *buf, size_t len)
@@ -348,13 +342,13 @@
     rc = nn_usock_recv_raw (self, buf, &nbytes);
     if (nn_slow (rc < 0)) {
         errnum_assert (rc == -ECONNRESET, -rc);
-        nn_ctx_execute (self->ctx, &self->ctask_error);
+        nn_fsm_raise (&self->fsm, &self->event_error);
         return;
     }
 
     /*  Success. */
     if (nn_fast (nbytes == len)) {
-        nn_ctx_execute (self->ctx, &self->ctask_received);
+        nn_fsm_raise (&self->fsm, &self->event_received);
         return;
     }
 
@@ -363,28 +357,24 @@
     self->in.len = len - nbytes;
 
     /*  Ask the worker thread to receive the remaining data. */
-    nn_worker_execute (self->worker, &self->wtask_recv);
+    nn_worker_execute (self->worker, &self->task_recv);
 }
 
-static void nn_usock_callback (struct nn_callback *self, void *source, int type)
+static void nn_usock_callback (struct nn_fsm *self, void *source, int type)
 {
     int rc;
     struct nn_usock *usock;
     int s;
     size_t sz;
-    struct nn_callback *out_callback;
 
-    usock = nn_cont (self, struct nn_usock, in_callback);
+    usock = nn_cont (self, struct nn_usock, fsm);
 
     /*  Close event is processed in the same way not depending on the state
         the usock is in. */
-    if (source == &usock->wtask_close) {
+    if (source == &usock->task_close) {
         nn_worker_rm_fd (usock->worker, &usock->wfd);
-        out_callback = usock->out_callback;
+        nn_fsm_raise (&usock->fsm, &usock->event_closed);
         nn_usock_term (usock);
-        nn_ctx_enter (usock->ctx);
-        out_callback->fn (usock->out_callback, usock, NN_USOCK_CLOSED);
-        nn_ctx_leave (usock->ctx);
         return;
     }
 
@@ -394,20 +384,20 @@
 /*  STARTING                                                                  */
 /******************************************************************************/
     case NN_USOCK_STATE_STARTING:
-        if (source == &usock->wtask_connected) {
+        if (source == &usock->task_connected) {
             nn_assert (type == NN_WORKER_TASK_EXECUTE);
             nn_worker_add_fd (usock->worker, usock->s, &usock->wfd);
             usock->state = NN_USOCK_STATE_CONNECTED;
             return;
         }
-        if (source == &usock->wtask_connect) {
+        if (source == &usock->task_connect) {
             nn_assert (type == NN_WORKER_TASK_EXECUTE);
             nn_worker_add_fd (usock->worker, usock->s, &usock->wfd);
             nn_worker_set_out (usock->worker, &usock->wfd);
             usock->state = NN_USOCK_STATE_CONNECTING;
             return;
         }
-        if (source == &usock->wtask_accept) {
+        if (source == &usock->task_accept) {
             nn_assert (type == NN_WORKER_TASK_EXECUTE);
             nn_worker_add_fd (usock->worker, usock->s, &usock->wfd);
             nn_worker_set_in (usock->worker, &usock->wfd);
@@ -425,10 +415,7 @@
             case NN_WORKER_FD_OUT:
                 nn_worker_reset_out (usock->worker, &usock->wfd);
                 usock->state = NN_USOCK_STATE_CONNECTED;
-                nn_ctx_enter (usock->ctx);
-                usock->out_callback->fn (usock->out_callback, usock,
-                    NN_USOCK_CONNECTED);
-                nn_ctx_leave (usock->ctx);
+                nn_fsm_raise (&usock->fsm, &usock->event_connected);
                 return;
             case NN_WORKER_FD_ERR:
                 nn_assert (0);
@@ -459,16 +446,12 @@
                     errno_assert (0);
                 }
 
-                nn_usock_init_from_fd (usock->newsock, s, usock->ctx,
-                    usock->newcallback);
+                nn_usock_init_from_fd (usock->newsock, s, usock->newowner);
                 nn_worker_add_fd (usock->newsock->worker, usock->newsock->s,
                     &usock->newsock->wfd);
-                nn_ctx_enter (usock->ctx);
-                usock->out_callback->fn (usock->out_callback, usock,
-                    NN_USOCK_ACCEPTED);
-                nn_ctx_leave (usock->ctx);
+                nn_fsm_raise (&usock->fsm, &usock->event_accepted);
                 usock->newsock = NULL;
-                usock->newcallback = NULL;
+                usock->newowner = NULL;
                 return;
             default:
                 nn_assert (0);
@@ -480,12 +463,12 @@
 /*  CONNECTED                                                                 */
 /******************************************************************************/ 
     case NN_USOCK_STATE_CONNECTED:
-        if (source == &usock->wtask_send) {
+        if (source == &usock->task_send) {
             nn_assert (type == NN_WORKER_TASK_EXECUTE);
             nn_worker_set_out (usock->worker, &usock->wfd);
             return;
         }
-        if (source == &usock->wtask_recv) {
+        if (source == &usock->task_recv) {
             nn_assert (type == NN_WORKER_TASK_EXECUTE);
             nn_worker_set_in (usock->worker, &usock->wfd);
             return;
@@ -499,36 +482,24 @@
                     usock->in.len -= sz;
                     if (!usock->in.len) {
                         nn_worker_reset_in (usock->worker, &usock->wfd);
-                        nn_ctx_enter (usock->ctx);
-                        usock->out_callback->fn (usock->out_callback, usock,
-                            NN_USOCK_RECEIVED);
-                        nn_ctx_leave (usock->ctx);
+                        nn_fsm_raise (&usock->fsm, &usock->event_received);
                     }
                     return;
                 }
                 errnum_assert (rc == -ECONNRESET, -rc);
-                nn_ctx_enter (usock->ctx);
-                usock->out_callback->fn (usock->out_callback, usock,
-                    NN_USOCK_ERROR);
-                nn_ctx_leave (usock->ctx);
+                nn_fsm_raise (&usock->fsm, &usock->event_error);
                 return;
             case NN_WORKER_FD_OUT:
                 rc = nn_usock_send_raw (usock, &usock->out.hdr);
                 if (nn_fast (rc == 0)) {
                     nn_worker_reset_out (usock->worker, &usock->wfd);
-                    nn_ctx_enter (usock->ctx);
-                    usock->out_callback->fn (usock->out_callback, usock,
-                        NN_USOCK_SENT);
-                    nn_ctx_leave (usock->ctx);
+                    nn_fsm_raise (&usock->fsm, &usock->event_sent);
                     return;
                 }
                 if (nn_fast (rc == -EAGAIN))
                     return;
                 errnum_assert (rc == -ECONNRESET, -rc);
-                nn_ctx_enter (usock->ctx);
-                usock->out_callback->fn (usock->out_callback, usock,
-                    NN_USOCK_ERROR);
-                nn_ctx_leave (usock->ctx);
+                nn_fsm_raise (&usock->fsm, &usock->event_error);
                 return;
             case NN_WORKER_FD_ERR:
                 nn_assert (0);
diff --git a/src/aio/usock.h b/src/aio/usock.h
index dd24959..ad362f5 100644
--- a/src/aio/usock.h
+++ b/src/aio/usock.h
@@ -26,9 +26,8 @@
 /*  Import the definition of nn_iovec. */
 #include "../nn.h"
 
-#include "callback.h"
+#include "fsm.h"
 #include "worker.h"
-#include "ctx.h"
 
 #define _GNU_SOURCE
 #include <sys/types.h>
@@ -54,14 +53,8 @@
 
 struct nn_usock {
 
-    /*  This class is sink of events. */
-    struct nn_callback in_callback;
-
-    /*  This class is source of events. */
-    struct nn_callback *out_callback;
-
-    /*  AIO context the socket belongs to. */
-    struct nn_ctx *ctx;
+    /*  State machine base class. */
+    struct nn_fsm fsm;
 
     /*  The worker thread the usock is associated with. */
     struct nn_worker *worker;
@@ -104,28 +97,29 @@
     } out;
 
     /*  Asynchronous tasks for the worker. */
-    struct nn_worker_task wtask_connect;
-    struct nn_worker_task wtask_connected;
-    struct nn_worker_task wtask_accept;
-    struct nn_worker_task wtask_send;
-    struct nn_worker_task wtask_recv;
-    struct nn_worker_task wtask_close;
+    struct nn_worker_task task_connect;
+    struct nn_worker_task task_connected;
+    struct nn_worker_task task_accept;
+    struct nn_worker_task task_send;
+    struct nn_worker_task task_recv;
+    struct nn_worker_task task_close;
 
-    /*  Asynchronous callback tasks. */ 
-    struct nn_ctx_task ctask_accepted;
-    struct nn_ctx_task ctask_connected;
-    struct nn_ctx_task ctask_sent;
-    struct nn_ctx_task ctask_received;
-    struct nn_ctx_task ctask_error;
+    /*  Events raised by the usock. */ 
+    struct nn_fsm_event event_accepted;
+    struct nn_fsm_event event_connected;
+    struct nn_fsm_event event_sent;
+    struct nn_fsm_event event_received;
+    struct nn_fsm_event event_error;
+    struct nn_fsm_event event_closed;
 
     /*  When accepting a new connection, the pointer to the object to associate
         the new connection with is stored here. */
     struct nn_usock *newsock;
-    struct nn_callback *newcallback;
+    struct nn_fsm *newowner;
 };
 
 int nn_usock_init (struct nn_usock *self, int domain, int type, int protocol,
-    struct nn_ctx *ctx, struct nn_callback *callback);
+    struct nn_fsm *owner);
 void nn_usock_close (struct nn_usock *self);
 
 int nn_usock_setsockopt (struct nn_usock *self, int level, int optname,
@@ -135,7 +129,7 @@
     size_t addrlen);
 int nn_usock_listen (struct nn_usock *self, int backlog);
 void nn_usock_accept (struct nn_usock *self, struct nn_usock *newsock,
-    struct nn_callback *newcallback);
+    struct nn_fsm *newowner);
 void nn_usock_connect (struct nn_usock *self, const struct sockaddr *addr,
     size_t addrlen);
 
diff --git a/src/aio/worker.c b/src/aio/worker.c
index d0960d3..480728d 100644
--- a/src/aio/worker.c
+++ b/src/aio/worker.c
@@ -31,20 +31,18 @@
 /*  Private functions. */
 static void nn_worker_routine (void *arg);
 
-void nn_worker_fd_init (struct nn_worker_fd *self,
-    struct nn_callback *callback)
+void nn_worker_fd_init (struct nn_worker_fd *self, struct nn_fsm *owner)
 {
-    self->callback = callback;
+    self->owner = owner;
 }
 
 void nn_worker_fd_term (struct nn_worker_fd *self)
 {
 }
 
-void nn_worker_timer_init (struct nn_worker_timer *self,
-    struct nn_callback *callback)
+void nn_worker_timer_init (struct nn_worker_timer *self, struct nn_fsm *owner)
 {
-    self->callback = callback;
+    self->owner = owner;
     nn_timerset_hndl_init (&self->hndl);
 }
 
@@ -95,10 +93,9 @@
     nn_timerset_rm (&((struct nn_worker*) self)->timerset, &timer->hndl);
 }
 
-void nn_worker_task_init (struct nn_worker_task *self,
-    struct nn_callback *callback)
+void nn_worker_task_init (struct nn_worker_task *self, struct nn_fsm *owner)
 {
-    self->callback = callback;
+    self->owner = owner;
     nn_queue_item_init (&self->item);
 }
 
@@ -187,8 +184,7 @@
                 break;
             errnum_assert (rc == 0, -rc);
             timer = nn_cont (thndl, struct nn_worker_timer, hndl);
-            timer->callback->fn (timer->callback, timer,
-                NN_WORKER_TIMER_TIMEOUT);
+            nn_fsm_execute (timer->owner, NN_WORKER_TIMER_TIMEOUT);
         }
 
         /*  Process all events from the poller. */
@@ -228,8 +224,7 @@
                     /*  It's a user-defined task. Notify the user that it has
                         arrived in the worker thread. */
                     task = nn_cont (item, struct nn_worker_task, item);
-                    task->callback->fn (task->callback,
-                        task, NN_WORKER_TASK_EXECUTE);
+                    nn_fsm_execute (task->owner, NN_WORKER_TASK_EXECUTE);
                 }
                 nn_queue_term (&tasks);
                 continue;
@@ -237,7 +232,7 @@
 
             /*  It's a true I/O event. Invoke the handler. */
             fd = nn_cont (phndl, struct nn_worker_fd, hndl);
-            fd->callback->fn (fd->callback, fd, pevent);
+            nn_fsm_execute (fd->owner, pevent);
         }
     }
 }
diff --git a/src/aio/worker.h b/src/aio/worker.h
index a1cc9ff..0423ddf 100644
--- a/src/aio/worker.h
+++ b/src/aio/worker.h
@@ -30,7 +30,7 @@
 #include "../utils/thread.h"
 #include "../utils/efd.h"
 
-#include "callback.h"
+#include "fsm.h"
 #include "poller.h"
 #include "timerset.h"
 
@@ -39,23 +39,21 @@
 #define NN_WORKER_FD_ERR NN_POLLER_ERR
 
 struct nn_worker_fd {
-    struct nn_callback *callback;
+    struct nn_fsm *owner;
     struct nn_poller_hndl hndl;
 };
 
-void nn_worker_fd_init (struct nn_worker_fd *self,
-    struct nn_callback *callback);
+void nn_worker_fd_init (struct nn_worker_fd *self, struct nn_fsm *owner);
 void nn_worker_fd_term (struct nn_worker_fd *self);
 
 #define NN_WORKER_TIMER_TIMEOUT 1
 
 struct nn_worker_timer {
-    struct nn_callback *callback;
+    struct nn_fsm *owner;
     struct nn_timerset_hndl hndl;
 };
 
-void nn_worker_timer_init (struct nn_worker_timer *self,
-    struct nn_callback *callback);
+void nn_worker_timer_init (struct nn_worker_timer *self, struct nn_fsm *owner);
 void nn_worker_timer_term (struct nn_worker_timer *self);
 
 
@@ -64,12 +62,11 @@
 #define NN_WORKER_TASK_EXECUTE 1
 
 struct nn_worker_task {
-    struct nn_callback *callback;
+    struct nn_fsm *owner;
     struct nn_queue_item item;
 };
 
-void nn_worker_task_init (struct nn_worker_task *self,
-    struct nn_callback *callback);
+void nn_worker_task_init (struct nn_worker_task *self, struct nn_fsm *owner);
 void nn_worker_task_term (struct nn_worker_task *self);
 
 struct nn_worker {