External events implemented
Preparation for inproc transport. External events can cross
the boundary between different SP sockets.
Signed-off-by: Martin Sustrik <sustrik@250bpm.com>
diff --git a/src/aio/ctx.c b/src/aio/ctx.c
index 53cfa9b..a4f82be 100644
--- a/src/aio/ctx.c
+++ b/src/aio/ctx.c
@@ -32,11 +32,13 @@
nn_mutex_init (&self->sync);
self->pool = pool;
nn_queue_init (&self->events);
+ nn_queue_init (&self->eventsto);
self->onleave = onleave;
}
void nn_ctx_term (struct nn_ctx *self)
{
+ nn_queue_term (&self->eventsto);
nn_queue_term (&self->events);
nn_mutex_term (&self->sync);
}
@@ -50,8 +52,9 @@
{
struct nn_queue_item *item;
struct nn_fsm_event *event;
+ struct nn_queue eventsto;
- /* Process any queued events before leaving the AIO context. */
+ /* Process any queued events before leaving the context. */
while (1) {
item = nn_queue_pop (&self->events);
event = nn_cont (item, struct nn_fsm_event, item);
@@ -64,7 +67,32 @@
if (nn_fast (self->onleave != NULL))
self->onleave (self);
+ /* Shortcut in the case there are no external events. */
+ if (nn_queue_empty (&self->eventsto)) {
+ nn_mutex_unlock (&self->sync);
+ return;
+ }
+
+ /* Make a copy of the queue of the external events so that it does not
+ get corrupted once we unlock the context. */
+ eventsto = self->eventsto;
+ nn_queue_init (&self->eventsto);
+
nn_mutex_unlock (&self->sync);
+
+ /* Process any queued external events. Before processing each event
+ lock the context it belongs to. */
+ while (1) {
+ item = nn_queue_pop (&eventsto);
+ event = nn_cont (item, struct nn_fsm_event, item);
+ if (!event)
+ break;
+ nn_ctx_enter (event->fsm->ctx);
+ nn_fsm_event_process (event);
+ nn_ctx_leave (event->fsm->ctx);
+ }
+
+ nn_queue_term (&eventsto);
}
struct nn_worker *nn_ctx_choose_worker (struct nn_ctx *self)
@@ -77,3 +105,8 @@
nn_queue_push (&self->events, &event->item);
}
+void nn_ctx_raiseto (struct nn_ctx *self, struct nn_fsm_event *event)
+{
+ nn_queue_push (&self->eventsto, &event->item);
+}
+
diff --git a/src/aio/ctx.h b/src/aio/ctx.h
index 8d76d23..f7cfec5 100644
--- a/src/aio/ctx.h
+++ b/src/aio/ctx.h
@@ -38,6 +38,7 @@
struct nn_mutex sync;
struct nn_pool *pool;
struct nn_queue events;
+ struct nn_queue eventsto;
nn_ctx_onleave onleave;
};
@@ -51,6 +52,7 @@
struct nn_worker *nn_ctx_choose_worker (struct nn_ctx *self);
void nn_ctx_raise (struct nn_ctx *self, struct nn_fsm_event *event);
+void nn_ctx_raiseto (struct nn_ctx *self, struct nn_fsm_event *event);
#endif
diff --git a/src/aio/fsm.c b/src/aio/fsm.c
index f49d22a..7e3508e 100644
--- a/src/aio/fsm.c
+++ b/src/aio/fsm.c
@@ -142,3 +142,12 @@
nn_ctx_raise (self->ctx, event);
}
+void nn_fsm_raiseto (struct nn_fsm *self, struct nn_fsm *dst,
+ struct nn_fsm_event *event, void *source, int type)
+{
+ event->fsm = dst;
+ event->source = source;
+ event->type = type;
+ nn_ctx_raiseto (self->ctx, event);
+}
+
diff --git a/src/aio/fsm.h b/src/aio/fsm.h
index affae78..e3239e1 100644
--- a/src/aio/fsm.h
+++ b/src/aio/fsm.h
@@ -84,5 +84,13 @@
void nn_fsm_raise (struct nn_fsm *self, struct nn_fsm_event *event,
void *source, int type);
+/* Send event to the specified state machine. It's caller's responsibility
+ to ensure that the destination state machine will still exist when the
+ event is delivered. NOTE: This function is a hack to make inproc transport
+ work in the most efficient manner. Do not use it outside of inproc
+ transport! */
+void nn_fsm_raiseto (struct nn_fsm *self, struct nn_fsm *dst,
+ struct nn_fsm_event *event, void *source, int type);
+
#endif