Simple callbacks turned into full-blown state machines
Signed-off-by: Martin Sustrik <sustrik@250bpm.com>
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 4d2031b..24a7afd 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -49,10 +49,10 @@
core/device.c
core/symbol.c
- aio/callback.h
- aio/callback.c
aio/ctx.h
aio/ctx.c
+ aio/fsm.h
+ aio/fsm.c
aio/poller.h
aio/poller.c
aio/poller_epoll.inc
diff --git a/src/aio/callback.c b/src/aio/callback.c
deleted file mode 100644
index ccbe5f0..0000000
--- a/src/aio/callback.c
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- Copyright (c) 2013 250bpm s.r.o.
-
- Permission is hereby granted, free of charge, to any person obtaining a copy
- of this software and associated documentation files (the "Software"),
- to deal in the Software without restriction, including without limitation
- the rights to use, copy, modify, merge, publish, distribute, sublicense,
- and/or sell copies of the Software, and to permit persons to whom
- the Software is furnished to do so, subject to the following conditions:
-
- The above copyright notice and this permission notice shall be included
- in all copies or substantial portions of the Software.
-
- THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
- THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
- FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
- IN THE SOFTWARE.
-*/
-
-#include "callback.h"
-
-void nn_callback_init (struct nn_callback *self, nn_callback_fn fn)
-{
- self->fn = fn;
-}
-
-void nn_callback_term (struct nn_callback *self)
-{
-}
-
diff --git a/src/aio/callback.h b/src/aio/callback.h
deleted file mode 100644
index 4807be5..0000000
--- a/src/aio/callback.h
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- Copyright (c) 2013 250bpm s.r.o.
-
- Permission is hereby granted, free of charge, to any person obtaining a copy
- of this software and associated documentation files (the "Software"),
- to deal in the Software without restriction, including without limitation
- the rights to use, copy, modify, merge, publish, distribute, sublicense,
- and/or sell copies of the Software, and to permit persons to whom
- the Software is furnished to do so, subject to the following conditions:
-
- The above copyright notice and this permission notice shall be included
- in all copies or substantial portions of the Software.
-
- THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
- THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
- FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
- IN THE SOFTWARE.
-*/
-
-#ifndef NN_CALLBACK_INCLUDED
-#define NN_CALLBACK_INCLUDED
-
-/* Base class for objects accepting callbacks. */
-
-struct nn_callback;
-
-/* Virtual function to be implemented by the derived class to handle the
- callbacks. 'source' parameter is pointer to the object that generated
- the callback. As it can be any kind of object it is of type void*.
- The user should check whether the pointer points to any source of
- callbacks it is aware of and cast the pointer accordingly. If a single
- object can generate different kinds of callbacks, 'type' parameter
- specifies the callback type. Possible values are defined by the object
- that is the source of callbacks. */
-typedef void (*nn_callback_fn) (struct nn_callback *self, void *source,
- int type);
-
-struct nn_callback {
- nn_callback_fn fn;
-};
-
-void nn_callback_init (struct nn_callback *self, nn_callback_fn fn);
-void nn_callback_term (struct nn_callback *self);
-
-#endif
-
diff --git a/src/aio/ctx.c b/src/aio/ctx.c
index a6f5e41..a8fc698 100644
--- a/src/aio/ctx.c
+++ b/src/aio/ctx.c
@@ -25,30 +25,16 @@
#include "../utils/err.h"
#include "../utils/cont.h"
-void nn_ctx_task_init (struct nn_ctx_task *self, struct nn_callback *callback,
- void *source, int type)
-{
- self->callback = callback;
- self->source = source;
- self->type = type;
- nn_queue_item_init (&self->item);
-}
-
-void nn_ctx_task_term (struct nn_ctx_task *self)
-{
- nn_queue_item_term (&self->item);
-}
-
void nn_ctx_init (struct nn_ctx *self, struct nn_pool *pool)
{
nn_mutex_init (&self->sync);
self->pool = pool;
- nn_queue_init (&self->tasks);
+ nn_queue_init (&self->events);
}
void nn_ctx_term (struct nn_ctx *self)
{
- nn_queue_term (&self->tasks);
+ nn_queue_term (&self->events);
nn_mutex_term (&self->sync);
}
@@ -59,14 +45,15 @@
void nn_ctx_leave (struct nn_ctx *self)
{
- struct nn_ctx_task *task;
+ struct nn_fsm_event *event;
- /* Process any queued tasks before leaving the AIO context. */
+ /* Process any queued events before leaving the AIO context. */
while (1) {
- task = nn_cont (nn_queue_pop (&self->tasks), struct nn_ctx_task, item);
- if (!task)
+ event = nn_cont (nn_queue_pop (&self->events),
+ struct nn_fsm_event, item);
+ if (!event)
break;
- task->callback->fn (task->callback, task->source, task->type);
+ event->fsm->fn (event->fsm, event->source, event->type);
}
nn_mutex_unlock (&self->sync);
@@ -77,8 +64,8 @@
return nn_pool_choose_worker (self->pool);
}
-void nn_ctx_execute (struct nn_ctx *self, struct nn_ctx_task *task)
+void nn_ctx_raise (struct nn_ctx *self, struct nn_fsm_event *event)
{
- nn_queue_push (&self->tasks, &task->item);
+ nn_queue_push (&self->events, &event->item);
}
diff --git a/src/aio/ctx.h b/src/aio/ctx.h
index a9750ed..2a04c27 100644
--- a/src/aio/ctx.h
+++ b/src/aio/ctx.h
@@ -27,26 +27,15 @@
#include "../utils/queue.h"
#include "worker.h"
-#include "callback.h"
#include "pool.h"
+#include "fsm.h"
/* AIO context for objects using AIO subsystem. */
-struct nn_ctx_task {
- struct nn_callback *callback;
- void *source;
- int type;
- struct nn_queue_item item;
-};
-
-void nn_ctx_task_init (struct nn_ctx_task *self, struct nn_callback *callback,
- void *source, int type);
-void nn_ctx_task_term (struct nn_ctx_task *self);
-
struct nn_ctx {
struct nn_mutex sync;
struct nn_pool *pool;
- struct nn_queue tasks;
+ struct nn_queue events;
};
void nn_ctx_init (struct nn_ctx *self, struct nn_pool *pool);
@@ -57,7 +46,7 @@
struct nn_worker *nn_ctx_choose_worker (struct nn_ctx *self);
-void nn_ctx_execute (struct nn_ctx *self, struct nn_ctx_task *task);
+void nn_ctx_raise (struct nn_ctx *self, struct nn_fsm_event *event);
#endif
diff --git a/src/aio/fsm.c b/src/aio/fsm.c
new file mode 100644
index 0000000..dfedaad
--- /dev/null
+++ b/src/aio/fsm.c
@@ -0,0 +1,74 @@
+/*
+ Copyright (c) 2013 250bpm s.r.o.
+
+ Permission is hereby granted, free of charge, to any person obtaining a copy
+ of this software and associated documentation files (the "Software"),
+ to deal in the Software without restriction, including without limitation
+ the rights to use, copy, modify, merge, publish, distribute, sublicense,
+ and/or sell copies of the Software, and to permit persons to whom
+ the Software is furnished to do so, subject to the following conditions:
+
+ The above copyright notice and this permission notice shall be included
+ in all copies or substantial portions of the Software.
+
+ THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
+ THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
+ IN THE SOFTWARE.
+*/
+
+#include "fsm.h"
+#include "ctx.h"
+
+#include <stddef.h>
+
+void nn_fsm_event_init (struct nn_fsm_event *self, void *source, int type)
+{
+ self->fsm = NULL;
+ self->source = source;
+ self->type = type;
+ nn_queue_item_init (&self->item);
+}
+
+void nn_fsm_event_term (struct nn_fsm_event *self)
+{
+ nn_queue_item_term (&self->item);
+}
+
+void nn_fsm_init_root (struct nn_fsm *self, nn_fsm_fn fn, struct nn_ctx *ctx)
+{
+ self->fn = fn;
+ self->owner = NULL;
+ self->ctx = ctx;
+}
+
+void nn_fsm_init (struct nn_fsm *self, nn_fsm_fn fn, struct nn_fsm *owner)
+{
+ self->fn = fn;
+ self->owner = owner;
+ self->ctx = owner->ctx;
+}
+
+void nn_fsm_term (struct nn_fsm *self)
+{
+}
+
+struct nn_worker *nn_fsm_choose_worker (struct nn_fsm *self)
+{
+ return nn_ctx_choose_worker (self->ctx);
+}
+
+void nn_fsm_raise (struct nn_fsm *self, struct nn_fsm_event *event)
+{
+ event->fsm = self->owner;
+ nn_ctx_raise (self->ctx, event);
+}
+
+void nn_fsm_execute (struct nn_fsm *self, int type)
+{
+ self->fn (self, NULL, type);
+}
+
diff --git a/src/aio/fsm.h b/src/aio/fsm.h
new file mode 100644
index 0000000..1c7ab89
--- /dev/null
+++ b/src/aio/fsm.h
@@ -0,0 +1,72 @@
+/*
+ Copyright (c) 2013 250bpm s.r.o.
+
+ Permission is hereby granted, free of charge, to any person obtaining a copy
+ of this software and associated documentation files (the "Software"),
+ to deal in the Software without restriction, including without limitation
+ the rights to use, copy, modify, merge, publish, distribute, sublicense,
+ and/or sell copies of the Software, and to permit persons to whom
+ the Software is furnished to do so, subject to the following conditions:
+
+ The above copyright notice and this permission notice shall be included
+ in all copies or substantial portions of the Software.
+
+ THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
+ THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
+ IN THE SOFTWARE.
+*/
+
+#ifndef NN_FSM_INCLUDED
+#define NN_FSM_INCLUDED
+
+#include "../utils/queue.h"
+
+/* Base class for state machines. */
+
+struct nn_ctx;
+struct nn_fsm;
+struct nn_worker;
+
+struct nn_fsm_event {
+ struct nn_fsm *fsm;
+ void *source;
+ int type;
+ struct nn_queue_item item;
+};
+
+void nn_fsm_event_init (struct nn_fsm_event *self, void *source, int type);
+void nn_fsm_event_term (struct nn_fsm_event *self);
+
+/* Virtual function to be implemented by the derived class to handle the
+ incoming events. 'source' parameter is pointer to the object that generated
+ the event. As it can be any kind of object it is of type void*.
+ The user should check whether the pointer points to any source of
+ events it is aware of and cast the pointer accordingly. If a single
+ object can generate different kinds of events, 'type' parameter
+ specifies the event type. Possible values are defined by the object
+ that is the source of events. Source equal to NULL means the event is
+ comming from the state machine owner. */
+typedef void (*nn_fsm_fn) (struct nn_fsm *self, void *source, int type);
+
+struct nn_fsm {
+ nn_fsm_fn fn;
+ struct nn_fsm *owner;
+ struct nn_ctx *ctx;
+};
+
+void nn_fsm_init_root (struct nn_fsm *self, nn_fsm_fn fn, struct nn_ctx *ctx);
+void nn_fsm_init (struct nn_fsm *self, nn_fsm_fn fn, struct nn_fsm *owner);
+void nn_fsm_term (struct nn_fsm *self);
+
+struct nn_worker *nn_fsm_choose_worker (struct nn_fsm *self);
+
+void nn_fsm_raise (struct nn_fsm *self, struct nn_fsm_event *event);
+
+void nn_fsm_execute (struct nn_fsm *self, int type);
+
+#endif
+
diff --git a/src/aio/timer.c b/src/aio/timer.c
index 6ab448f..60b603b 100644
--- a/src/aio/timer.c
+++ b/src/aio/timer.c
@@ -26,30 +26,33 @@
#include "../utils/err.h"
static void nn_timer_term (struct nn_timer *self);
-static void nn_timer_callback (struct nn_callback *self, void *source,
+static void nn_timer_callback (struct nn_fsm *self, void *source,
int type);
-void nn_timer_init (struct nn_timer *self, struct nn_ctx *ctx,
- struct nn_callback *callback)
+void nn_timer_init (struct nn_timer *self, struct nn_fsm *owner)
{
- nn_callback_init (&self->in_callback, nn_timer_callback);
- self->out_callback = callback;
- nn_worker_task_init (&self->start_task, &self->in_callback);
- nn_worker_task_init (&self->stop_task, &self->in_callback);
- nn_worker_task_init (&self->close_task, &self->in_callback);
- nn_worker_timer_init (&self->wtimer, &self->in_callback);
- self->ctx = ctx;
- self->worker = nn_ctx_choose_worker (ctx);
+ nn_fsm_init (&self->fsm, nn_timer_callback, owner);
+ nn_worker_task_init (&self->start_task, &self->fsm);
+ nn_worker_task_init (&self->stop_task, &self->fsm);
+ nn_worker_task_init (&self->close_task, &self->fsm);
+ nn_worker_timer_init (&self->wtimer, &self->fsm);
+ nn_fsm_event_init (&self->timeout_event, self, NN_TIMER_TIMEOUT);
+ nn_fsm_event_init (&self->stopped_event, self, NN_TIMER_STOPPED);
+ nn_fsm_event_init (&self->closed_event, self, NN_TIMER_CLOSED);
+ self->worker = nn_fsm_choose_worker (&self->fsm);
self->timeout = -1;
}
static void nn_timer_term (struct nn_timer *self)
{
+ nn_fsm_event_term (&self->closed_event);
+ nn_fsm_event_term (&self->stopped_event);
+ nn_fsm_event_term (&self->timeout_event);
nn_worker_timer_term (&self->wtimer);
nn_worker_task_term (&self->close_task);
nn_worker_task_term (&self->stop_task);
nn_worker_task_term (&self->start_task);
- nn_callback_term (&self->in_callback);
+ nn_fsm_term (&self->fsm);
}
void nn_timer_close (struct nn_timer *self)
@@ -77,19 +80,16 @@
nn_worker_execute (self->worker, &self->start_task);
}
-static void nn_timer_callback (struct nn_callback *self, void *source, int type)
+static void nn_timer_callback (struct nn_fsm *self, void *source, int type)
{
struct nn_timer *timer;
- struct nn_callback *out_callback;
- timer = nn_cont (self, struct nn_timer, in_callback);
+ timer = nn_cont (self, struct nn_timer, fsm);
if (source == &timer->wtimer) {
nn_assert (timer->timeout > 0);
timer->timeout = -1;
- nn_ctx_enter (timer->ctx);
- timer->out_callback->fn (timer->out_callback, timer, NN_TIMER_TIMEOUT);
- nn_ctx_leave (timer->ctx);
+ nn_fsm_raise (&timer->fsm, &timer->timeout_event);
return;
}
if (source == &timer->start_task) {
@@ -101,19 +101,14 @@
nn_assert (timer->timeout > 0);
timer->timeout = -1;
nn_worker_rm_timer (timer->worker, &timer->wtimer);
- nn_ctx_enter (timer->ctx);
- timer->out_callback->fn (timer->out_callback, timer, NN_TIMER_STOPPED);
- nn_ctx_leave (timer->ctx);
+ nn_fsm_raise (&timer->fsm, &timer->stopped_event);
return;
}
if (source == &timer->close_task) {
if (timer->timeout > 0)
nn_worker_rm_timer (timer->worker, &timer->wtimer);
- out_callback = timer->out_callback;
+ nn_fsm_raise (&timer->fsm, &timer->closed_event);
nn_timer_term (timer);
- nn_ctx_enter (timer->ctx);
- out_callback->fn (out_callback, timer, NN_TIMER_CLOSED);
- nn_ctx_leave (timer->ctx);
return;
}
nn_assert (0);
diff --git a/src/aio/timer.h b/src/aio/timer.h
index af4138c..f8a5da2 100644
--- a/src/aio/timer.h
+++ b/src/aio/timer.h
@@ -23,27 +23,27 @@
#ifndef NN_TIMER_INCLUDED
#define NN_TIMER_INCLUDED
-#include "callback.h"
-#include "ctx.h"
+#include "fsm.h"
+#include "worker.h"
#define NN_TIMER_TIMEOUT 1
#define NN_TIMER_STOPPED 2
#define NN_TIMER_CLOSED 3
struct nn_timer {
- struct nn_callback in_callback;
- struct nn_callback *out_callback;
+ struct nn_fsm fsm;
struct nn_worker_task start_task;
struct nn_worker_task stop_task;
struct nn_worker_task close_task;
struct nn_worker_timer wtimer;
- struct nn_ctx *ctx;
+ struct nn_fsm_event timeout_event;
+ struct nn_fsm_event stopped_event;
+ struct nn_fsm_event closed_event;
struct nn_worker *worker;
int timeout;
};
-void nn_timer_init (struct nn_timer *self, struct nn_ctx *ctx,
- struct nn_callback *callback);
+void nn_timer_init (struct nn_timer *self, struct nn_fsm *owner);
void nn_timer_close (struct nn_timer *self);
void nn_timer_start (struct nn_timer *self, int timeout);
diff --git a/src/aio/usock.c b/src/aio/usock.c
index d0633cc..01453e5 100644
--- a/src/aio/usock.c
+++ b/src/aio/usock.c
@@ -40,22 +40,19 @@
static int nn_usock_send_raw (struct nn_usock *self, struct msghdr *hdr);
static int nn_usock_recv_raw (struct nn_usock *self, void *buf, size_t *len);
static int nn_usock_geterr (struct nn_usock *self);
-static void nn_usock_callback (struct nn_callback *self, void *source,
- int type);
+static void nn_usock_callback (struct nn_fsm *self, void *source, int type);
static int nn_usock_init_from_fd (struct nn_usock *self,
- int fd, struct nn_ctx *ctx, struct nn_callback *callback)
+ int fd, struct nn_fsm *owner)
{
int rc;
int opt;
- /* Set up the callback pointers. */
- nn_callback_init (&self->in_callback, nn_usock_callback);
- self->out_callback = callback;
+ /* Initalise the state machine. */
+ nn_fsm_init (&self->fsm, nn_usock_callback, owner);
- /* Store the reference to the worker the socket is associated with. */
- self->ctx = ctx;
- self->worker = nn_ctx_choose_worker (ctx);
+ /* Choose a worker thread to handle this socket. */
+ self->worker = nn_fsm_choose_worker (&self->fsm);
/* Store the file descriptor of the underlying socket. */
self->s = fd;
@@ -110,35 +107,31 @@
memset (&self->out.hdr, 0, sizeof (struct msghdr));
/* Initialise outgoing tasks. */
- nn_worker_fd_init (&self->wfd, &self->in_callback);
- nn_worker_task_init (&self->wtask_connect, &self->in_callback);
- nn_worker_task_init (&self->wtask_connected, &self->in_callback);
- nn_worker_task_init (&self->wtask_accept, &self->in_callback);
- nn_worker_task_init (&self->wtask_send, &self->in_callback);
- nn_worker_task_init (&self->wtask_recv, &self->in_callback);
- nn_worker_task_init (&self->wtask_close, &self->in_callback);
+ nn_worker_fd_init (&self->wfd, &self->fsm);
+ nn_worker_task_init (&self->task_connect, &self->fsm);
+ nn_worker_task_init (&self->task_connected, &self->fsm);
+ nn_worker_task_init (&self->task_accept, &self->fsm);
+ nn_worker_task_init (&self->task_send, &self->fsm);
+ nn_worker_task_init (&self->task_recv, &self->fsm);
+ nn_worker_task_init (&self->task_close, &self->fsm);
/* Intialise incoming tasks. */
- nn_ctx_task_init (&self->ctask_accepted, self->out_callback, self,
- NN_USOCK_ACCEPTED);
- nn_ctx_task_init (&self->ctask_connected, self->out_callback, self,
- NN_USOCK_CONNECTED);
- nn_ctx_task_init (&self->ctask_sent, self->out_callback, self,
- NN_USOCK_SENT);
- nn_ctx_task_init (&self->ctask_received, self->out_callback, self,
- NN_USOCK_RECEIVED);
- nn_ctx_task_init (&self->ctask_error, self->out_callback, self,
- NN_USOCK_ERROR);
+ nn_fsm_event_init (&self->event_accepted, self, NN_USOCK_ACCEPTED);
+ nn_fsm_event_init (&self->event_connected, self, NN_USOCK_CONNECTED);
+ nn_fsm_event_init (&self->event_sent, self, NN_USOCK_SENT);
+ nn_fsm_event_init (&self->event_received, self, NN_USOCK_RECEIVED);
+ nn_fsm_event_init (&self->event_error, self, NN_USOCK_ERROR);
+ nn_fsm_event_init (&self->event_closed, self, NN_USOCK_CLOSED);
/* We are not accepting a connection at the moment. */
self->newsock = NULL;
- self->newcallback = NULL;
+ self->newowner = NULL;
return 0;
}
int nn_usock_init (struct nn_usock *self, int domain, int type, int protocol,
- struct nn_ctx *ctx, struct nn_callback *callback)
+ struct nn_fsm *owner)
{
int s;
@@ -153,13 +146,13 @@
if (s < 0)
return -errno;
- return nn_usock_init_from_fd (self, s, ctx, callback);
+ return nn_usock_init_from_fd (self, s, owner);
}
void nn_usock_close (struct nn_usock *self)
{
/* Ask socket to close asynchronously. */
- nn_worker_execute (self->worker, &self->wtask_close);
+ nn_worker_execute (self->worker, &self->task_close);
}
static void nn_usock_term (struct nn_usock *self)
@@ -169,23 +162,24 @@
if (self->in.batch)
nn_free (self->in.batch);
- nn_ctx_task_term (&self->ctask_error);
- nn_ctx_task_term (&self->ctask_received);
- nn_ctx_task_term (&self->ctask_sent);
- nn_ctx_task_term (&self->ctask_connected);
- nn_ctx_task_term (&self->ctask_accepted);
- nn_worker_task_term (&self->wtask_close);
- nn_worker_task_term (&self->wtask_recv);
- nn_worker_task_term (&self->wtask_send);
- nn_worker_task_term (&self->wtask_accept);
- nn_worker_task_term (&self->wtask_connected);
- nn_worker_task_term (&self->wtask_connect);
+ nn_fsm_event_term (&self->event_closed);
+ nn_fsm_event_term (&self->event_error);
+ nn_fsm_event_term (&self->event_received);
+ nn_fsm_event_term (&self->event_sent);
+ nn_fsm_event_term (&self->event_connected);
+ nn_fsm_event_term (&self->event_accepted);
+ nn_worker_task_term (&self->task_close);
+ nn_worker_task_term (&self->task_recv);
+ nn_worker_task_term (&self->task_send);
+ nn_worker_task_term (&self->task_accept);
+ nn_worker_task_term (&self->task_connected);
+ nn_worker_task_term (&self->task_connect);
nn_worker_fd_term (&self->wfd);
rc = close (self->s);
errno_assert (rc == 0);
- nn_callback_term (&self->in_callback);
+ nn_fsm_term (&self->fsm);
}
int nn_usock_setsockopt (struct nn_usock *self, int level, int optname,
@@ -235,7 +229,7 @@
}
void nn_usock_accept (struct nn_usock *self, struct nn_usock *newsock,
- struct nn_callback *newcallback)
+ struct nn_fsm *newowner)
{
int s;
@@ -251,22 +245,22 @@
/* Immediate success. */
if (nn_fast (s >= 0)) {
- nn_usock_init_from_fd (newsock, s, self->ctx, newcallback);
- nn_ctx_execute (self->ctx, &self->ctask_accepted);
+ nn_usock_init_from_fd (newsock, s, self->newowner);
+ nn_fsm_raise (&self->fsm, &self->event_accepted);
return;
}
/* Unexpected failure. */
if (nn_slow (errno != EAGAIN && errno != EWOULDBLOCK &&
errno != ECONNABORTED)) {
- nn_ctx_execute (self->ctx, &self->ctask_error);
+ nn_fsm_raise (&self->fsm, &self->event_error);
return;
}
/* Ask the worker thread to wait for the new connection. */
self->newsock = newsock;
- self->newcallback = newcallback;
- nn_worker_execute (self->worker, &self->wtask_accept);
+ self->newowner = newowner;
+ nn_worker_execute (self->worker, &self->task_accept);
}
void nn_usock_connect (struct nn_usock *self, const struct sockaddr *addr,
@@ -281,21 +275,21 @@
if (nn_fast (rc == 0)) {
/* Ask worker thread to start polling on the socket. */
- nn_worker_execute (self->worker, &self->wtask_connected);
+ nn_worker_execute (self->worker, &self->task_connected);
/* Notify the user that the connection is established. */
- nn_ctx_execute (self->ctx, &self->ctask_connected);
+ nn_fsm_raise (&self->fsm, &self->event_connected);
return;
}
/* Return unexpected errors to the caller. Notify the user about it. */
if (nn_slow (errno != EINPROGRESS)) {
- nn_ctx_execute (self->ctx, &self->ctask_error);
+ nn_fsm_raise (&self->fsm, &self->event_error);
return;
}
/* Ask worker thread to start waiting for connection establishment. */
- nn_worker_execute (self->worker, &self->wtask_connect);
+ nn_worker_execute (self->worker, &self->task_connect);
}
void nn_usock_send (struct nn_usock *self, const struct nn_iovec *iov,
@@ -323,19 +317,19 @@
/* Success. */
if (nn_fast (rc == 0)) {
- nn_ctx_execute (self->ctx, &self->ctask_sent);
+ nn_fsm_raise (&self->fsm, &self->event_sent);
return;
}
/* Errors. */
if (nn_slow (rc != -EAGAIN)) {
errnum_assert (rc == -ECONNRESET, -rc);
- nn_ctx_execute (self->ctx, &self->ctask_error);
+ nn_fsm_raise (&self->fsm, &self->event_error);
return;
}
/* Ask the worker thread to send the remaining data. */
- nn_worker_execute (self->worker, &self->wtask_send);
+ nn_worker_execute (self->worker, &self->task_send);
}
void nn_usock_recv (struct nn_usock *self, void *buf, size_t len)
@@ -348,13 +342,13 @@
rc = nn_usock_recv_raw (self, buf, &nbytes);
if (nn_slow (rc < 0)) {
errnum_assert (rc == -ECONNRESET, -rc);
- nn_ctx_execute (self->ctx, &self->ctask_error);
+ nn_fsm_raise (&self->fsm, &self->event_error);
return;
}
/* Success. */
if (nn_fast (nbytes == len)) {
- nn_ctx_execute (self->ctx, &self->ctask_received);
+ nn_fsm_raise (&self->fsm, &self->event_received);
return;
}
@@ -363,28 +357,24 @@
self->in.len = len - nbytes;
/* Ask the worker thread to receive the remaining data. */
- nn_worker_execute (self->worker, &self->wtask_recv);
+ nn_worker_execute (self->worker, &self->task_recv);
}
-static void nn_usock_callback (struct nn_callback *self, void *source, int type)
+static void nn_usock_callback (struct nn_fsm *self, void *source, int type)
{
int rc;
struct nn_usock *usock;
int s;
size_t sz;
- struct nn_callback *out_callback;
- usock = nn_cont (self, struct nn_usock, in_callback);
+ usock = nn_cont (self, struct nn_usock, fsm);
/* Close event is processed in the same way not depending on the state
the usock is in. */
- if (source == &usock->wtask_close) {
+ if (source == &usock->task_close) {
nn_worker_rm_fd (usock->worker, &usock->wfd);
- out_callback = usock->out_callback;
+ nn_fsm_raise (&usock->fsm, &usock->event_closed);
nn_usock_term (usock);
- nn_ctx_enter (usock->ctx);
- out_callback->fn (usock->out_callback, usock, NN_USOCK_CLOSED);
- nn_ctx_leave (usock->ctx);
return;
}
@@ -394,20 +384,20 @@
/* STARTING */
/******************************************************************************/
case NN_USOCK_STATE_STARTING:
- if (source == &usock->wtask_connected) {
+ if (source == &usock->task_connected) {
nn_assert (type == NN_WORKER_TASK_EXECUTE);
nn_worker_add_fd (usock->worker, usock->s, &usock->wfd);
usock->state = NN_USOCK_STATE_CONNECTED;
return;
}
- if (source == &usock->wtask_connect) {
+ if (source == &usock->task_connect) {
nn_assert (type == NN_WORKER_TASK_EXECUTE);
nn_worker_add_fd (usock->worker, usock->s, &usock->wfd);
nn_worker_set_out (usock->worker, &usock->wfd);
usock->state = NN_USOCK_STATE_CONNECTING;
return;
}
- if (source == &usock->wtask_accept) {
+ if (source == &usock->task_accept) {
nn_assert (type == NN_WORKER_TASK_EXECUTE);
nn_worker_add_fd (usock->worker, usock->s, &usock->wfd);
nn_worker_set_in (usock->worker, &usock->wfd);
@@ -425,10 +415,7 @@
case NN_WORKER_FD_OUT:
nn_worker_reset_out (usock->worker, &usock->wfd);
usock->state = NN_USOCK_STATE_CONNECTED;
- nn_ctx_enter (usock->ctx);
- usock->out_callback->fn (usock->out_callback, usock,
- NN_USOCK_CONNECTED);
- nn_ctx_leave (usock->ctx);
+ nn_fsm_raise (&usock->fsm, &usock->event_connected);
return;
case NN_WORKER_FD_ERR:
nn_assert (0);
@@ -459,16 +446,12 @@
errno_assert (0);
}
- nn_usock_init_from_fd (usock->newsock, s, usock->ctx,
- usock->newcallback);
+ nn_usock_init_from_fd (usock->newsock, s, usock->newowner);
nn_worker_add_fd (usock->newsock->worker, usock->newsock->s,
&usock->newsock->wfd);
- nn_ctx_enter (usock->ctx);
- usock->out_callback->fn (usock->out_callback, usock,
- NN_USOCK_ACCEPTED);
- nn_ctx_leave (usock->ctx);
+ nn_fsm_raise (&usock->fsm, &usock->event_accepted);
usock->newsock = NULL;
- usock->newcallback = NULL;
+ usock->newowner = NULL;
return;
default:
nn_assert (0);
@@ -480,12 +463,12 @@
/* CONNECTED */
/******************************************************************************/
case NN_USOCK_STATE_CONNECTED:
- if (source == &usock->wtask_send) {
+ if (source == &usock->task_send) {
nn_assert (type == NN_WORKER_TASK_EXECUTE);
nn_worker_set_out (usock->worker, &usock->wfd);
return;
}
- if (source == &usock->wtask_recv) {
+ if (source == &usock->task_recv) {
nn_assert (type == NN_WORKER_TASK_EXECUTE);
nn_worker_set_in (usock->worker, &usock->wfd);
return;
@@ -499,36 +482,24 @@
usock->in.len -= sz;
if (!usock->in.len) {
nn_worker_reset_in (usock->worker, &usock->wfd);
- nn_ctx_enter (usock->ctx);
- usock->out_callback->fn (usock->out_callback, usock,
- NN_USOCK_RECEIVED);
- nn_ctx_leave (usock->ctx);
+ nn_fsm_raise (&usock->fsm, &usock->event_received);
}
return;
}
errnum_assert (rc == -ECONNRESET, -rc);
- nn_ctx_enter (usock->ctx);
- usock->out_callback->fn (usock->out_callback, usock,
- NN_USOCK_ERROR);
- nn_ctx_leave (usock->ctx);
+ nn_fsm_raise (&usock->fsm, &usock->event_error);
return;
case NN_WORKER_FD_OUT:
rc = nn_usock_send_raw (usock, &usock->out.hdr);
if (nn_fast (rc == 0)) {
nn_worker_reset_out (usock->worker, &usock->wfd);
- nn_ctx_enter (usock->ctx);
- usock->out_callback->fn (usock->out_callback, usock,
- NN_USOCK_SENT);
- nn_ctx_leave (usock->ctx);
+ nn_fsm_raise (&usock->fsm, &usock->event_sent);
return;
}
if (nn_fast (rc == -EAGAIN))
return;
errnum_assert (rc == -ECONNRESET, -rc);
- nn_ctx_enter (usock->ctx);
- usock->out_callback->fn (usock->out_callback, usock,
- NN_USOCK_ERROR);
- nn_ctx_leave (usock->ctx);
+ nn_fsm_raise (&usock->fsm, &usock->event_error);
return;
case NN_WORKER_FD_ERR:
nn_assert (0);
diff --git a/src/aio/usock.h b/src/aio/usock.h
index dd24959..ad362f5 100644
--- a/src/aio/usock.h
+++ b/src/aio/usock.h
@@ -26,9 +26,8 @@
/* Import the definition of nn_iovec. */
#include "../nn.h"
-#include "callback.h"
+#include "fsm.h"
#include "worker.h"
-#include "ctx.h"
#define _GNU_SOURCE
#include <sys/types.h>
@@ -54,14 +53,8 @@
struct nn_usock {
- /* This class is sink of events. */
- struct nn_callback in_callback;
-
- /* This class is source of events. */
- struct nn_callback *out_callback;
-
- /* AIO context the socket belongs to. */
- struct nn_ctx *ctx;
+ /* State machine base class. */
+ struct nn_fsm fsm;
/* The worker thread the usock is associated with. */
struct nn_worker *worker;
@@ -104,28 +97,29 @@
} out;
/* Asynchronous tasks for the worker. */
- struct nn_worker_task wtask_connect;
- struct nn_worker_task wtask_connected;
- struct nn_worker_task wtask_accept;
- struct nn_worker_task wtask_send;
- struct nn_worker_task wtask_recv;
- struct nn_worker_task wtask_close;
+ struct nn_worker_task task_connect;
+ struct nn_worker_task task_connected;
+ struct nn_worker_task task_accept;
+ struct nn_worker_task task_send;
+ struct nn_worker_task task_recv;
+ struct nn_worker_task task_close;
- /* Asynchronous callback tasks. */
- struct nn_ctx_task ctask_accepted;
- struct nn_ctx_task ctask_connected;
- struct nn_ctx_task ctask_sent;
- struct nn_ctx_task ctask_received;
- struct nn_ctx_task ctask_error;
+ /* Events raised by the usock. */
+ struct nn_fsm_event event_accepted;
+ struct nn_fsm_event event_connected;
+ struct nn_fsm_event event_sent;
+ struct nn_fsm_event event_received;
+ struct nn_fsm_event event_error;
+ struct nn_fsm_event event_closed;
/* When accepting a new connection, the pointer to the object to associate
the new connection with is stored here. */
struct nn_usock *newsock;
- struct nn_callback *newcallback;
+ struct nn_fsm *newowner;
};
int nn_usock_init (struct nn_usock *self, int domain, int type, int protocol,
- struct nn_ctx *ctx, struct nn_callback *callback);
+ struct nn_fsm *owner);
void nn_usock_close (struct nn_usock *self);
int nn_usock_setsockopt (struct nn_usock *self, int level, int optname,
@@ -135,7 +129,7 @@
size_t addrlen);
int nn_usock_listen (struct nn_usock *self, int backlog);
void nn_usock_accept (struct nn_usock *self, struct nn_usock *newsock,
- struct nn_callback *newcallback);
+ struct nn_fsm *newowner);
void nn_usock_connect (struct nn_usock *self, const struct sockaddr *addr,
size_t addrlen);
diff --git a/src/aio/worker.c b/src/aio/worker.c
index d0960d3..480728d 100644
--- a/src/aio/worker.c
+++ b/src/aio/worker.c
@@ -31,20 +31,18 @@
/* Private functions. */
static void nn_worker_routine (void *arg);
-void nn_worker_fd_init (struct nn_worker_fd *self,
- struct nn_callback *callback)
+void nn_worker_fd_init (struct nn_worker_fd *self, struct nn_fsm *owner)
{
- self->callback = callback;
+ self->owner = owner;
}
void nn_worker_fd_term (struct nn_worker_fd *self)
{
}
-void nn_worker_timer_init (struct nn_worker_timer *self,
- struct nn_callback *callback)
+void nn_worker_timer_init (struct nn_worker_timer *self, struct nn_fsm *owner)
{
- self->callback = callback;
+ self->owner = owner;
nn_timerset_hndl_init (&self->hndl);
}
@@ -95,10 +93,9 @@
nn_timerset_rm (&((struct nn_worker*) self)->timerset, &timer->hndl);
}
-void nn_worker_task_init (struct nn_worker_task *self,
- struct nn_callback *callback)
+void nn_worker_task_init (struct nn_worker_task *self, struct nn_fsm *owner)
{
- self->callback = callback;
+ self->owner = owner;
nn_queue_item_init (&self->item);
}
@@ -187,8 +184,7 @@
break;
errnum_assert (rc == 0, -rc);
timer = nn_cont (thndl, struct nn_worker_timer, hndl);
- timer->callback->fn (timer->callback, timer,
- NN_WORKER_TIMER_TIMEOUT);
+ nn_fsm_execute (timer->owner, NN_WORKER_TIMER_TIMEOUT);
}
/* Process all events from the poller. */
@@ -228,8 +224,7 @@
/* It's a user-defined task. Notify the user that it has
arrived in the worker thread. */
task = nn_cont (item, struct nn_worker_task, item);
- task->callback->fn (task->callback,
- task, NN_WORKER_TASK_EXECUTE);
+ nn_fsm_execute (task->owner, NN_WORKER_TASK_EXECUTE);
}
nn_queue_term (&tasks);
continue;
@@ -237,7 +232,7 @@
/* It's a true I/O event. Invoke the handler. */
fd = nn_cont (phndl, struct nn_worker_fd, hndl);
- fd->callback->fn (fd->callback, fd, pevent);
+ nn_fsm_execute (fd->owner, pevent);
}
}
}
diff --git a/src/aio/worker.h b/src/aio/worker.h
index a1cc9ff..0423ddf 100644
--- a/src/aio/worker.h
+++ b/src/aio/worker.h
@@ -30,7 +30,7 @@
#include "../utils/thread.h"
#include "../utils/efd.h"
-#include "callback.h"
+#include "fsm.h"
#include "poller.h"
#include "timerset.h"
@@ -39,23 +39,21 @@
#define NN_WORKER_FD_ERR NN_POLLER_ERR
struct nn_worker_fd {
- struct nn_callback *callback;
+ struct nn_fsm *owner;
struct nn_poller_hndl hndl;
};
-void nn_worker_fd_init (struct nn_worker_fd *self,
- struct nn_callback *callback);
+void nn_worker_fd_init (struct nn_worker_fd *self, struct nn_fsm *owner);
void nn_worker_fd_term (struct nn_worker_fd *self);
#define NN_WORKER_TIMER_TIMEOUT 1
struct nn_worker_timer {
- struct nn_callback *callback;
+ struct nn_fsm *owner;
struct nn_timerset_hndl hndl;
};
-void nn_worker_timer_init (struct nn_worker_timer *self,
- struct nn_callback *callback);
+void nn_worker_timer_init (struct nn_worker_timer *self, struct nn_fsm *owner);
void nn_worker_timer_term (struct nn_worker_timer *self);
@@ -64,12 +62,11 @@
#define NN_WORKER_TASK_EXECUTE 1
struct nn_worker_task {
- struct nn_callback *callback;
+ struct nn_fsm *owner;
struct nn_queue_item item;
};
-void nn_worker_task_init (struct nn_worker_task *self,
- struct nn_callback *callback);
+void nn_worker_task_init (struct nn_worker_task *self, struct nn_fsm *owner);
void nn_worker_task_term (struct nn_worker_task *self);
struct nn_worker {