External events implemented

Preparation for inproc transport. External events can cross
the boundary between different SP sockets.

Signed-off-by: Martin Sustrik <sustrik@250bpm.com>
diff --git a/src/aio/ctx.c b/src/aio/ctx.c
index 53cfa9b..a4f82be 100644
--- a/src/aio/ctx.c
+++ b/src/aio/ctx.c
@@ -32,11 +32,13 @@
     nn_mutex_init (&self->sync);
     self->pool = pool;
     nn_queue_init (&self->events);
+    nn_queue_init (&self->eventsto);
     self->onleave = onleave;
 }
 
 void nn_ctx_term (struct nn_ctx *self)
 {
+    nn_queue_term (&self->eventsto);
     nn_queue_term (&self->events);
     nn_mutex_term (&self->sync);
 }
@@ -50,8 +52,9 @@
 {
     struct nn_queue_item *item;
     struct nn_fsm_event *event;
+    struct nn_queue eventsto;
 
-    /*  Process any queued events before leaving the AIO context. */
+    /*  Process any queued events before leaving the context. */
     while (1) {
         item = nn_queue_pop (&self->events);
         event = nn_cont (item, struct nn_fsm_event, item);
@@ -64,7 +67,32 @@
     if (nn_fast (self->onleave != NULL))
         self->onleave (self);
 
+    /*  Shortcut in the case there are no external events. */
+    if (nn_queue_empty (&self->eventsto)) {
+        nn_mutex_unlock (&self->sync);
+        return;
+    }
+
+    /*  Make a copy of the queue of the external events so that it does not
+        get corrupted once we unlock the context. */
+    eventsto = self->eventsto;
+    nn_queue_init (&self->eventsto);
+
     nn_mutex_unlock (&self->sync);
+
+    /*  Process any queued external events. Before processing each event
+        lock the context it belongs to. */
+    while (1) {
+        item = nn_queue_pop (&eventsto);
+        event = nn_cont (item, struct nn_fsm_event, item);
+        if (!event)
+            break;
+        nn_ctx_enter (event->fsm->ctx);
+        nn_fsm_event_process (event);
+        nn_ctx_leave (event->fsm->ctx);
+    }
+
+    nn_queue_term (&eventsto);
 }
 
 struct nn_worker *nn_ctx_choose_worker (struct nn_ctx *self)
@@ -77,3 +105,8 @@
     nn_queue_push (&self->events, &event->item);
 }
 
+void nn_ctx_raiseto (struct nn_ctx *self, struct nn_fsm_event *event)
+{
+    nn_queue_push (&self->eventsto, &event->item);
+}
+
diff --git a/src/aio/ctx.h b/src/aio/ctx.h
index 8d76d23..f7cfec5 100644
--- a/src/aio/ctx.h
+++ b/src/aio/ctx.h
@@ -38,6 +38,7 @@
     struct nn_mutex sync;
     struct nn_pool *pool;
     struct nn_queue events;
+    struct nn_queue eventsto;
     nn_ctx_onleave onleave;
 };
 
@@ -51,6 +52,7 @@
 struct nn_worker *nn_ctx_choose_worker (struct nn_ctx *self);
 
 void nn_ctx_raise (struct nn_ctx *self, struct nn_fsm_event *event);
+void nn_ctx_raiseto (struct nn_ctx *self, struct nn_fsm_event *event);
 
 #endif
 
diff --git a/src/aio/fsm.c b/src/aio/fsm.c
index f49d22a..7e3508e 100644
--- a/src/aio/fsm.c
+++ b/src/aio/fsm.c
@@ -142,3 +142,12 @@
     nn_ctx_raise (self->ctx, event);
 }
 
+void nn_fsm_raiseto (struct nn_fsm *self, struct nn_fsm *dst,
+    struct nn_fsm_event *event, void *source, int type)
+{
+    event->fsm = dst;
+    event->source = source;
+    event->type = type;
+    nn_ctx_raiseto (self->ctx, event);
+}
+
diff --git a/src/aio/fsm.h b/src/aio/fsm.h
index affae78..e3239e1 100644
--- a/src/aio/fsm.h
+++ b/src/aio/fsm.h
@@ -84,5 +84,13 @@
 void nn_fsm_raise (struct nn_fsm *self, struct nn_fsm_event *event,
     void *source, int type);
 
+/*  Send event to the specified state machine. It's caller's responsibility
+    to ensure that the destination state machine will still exist when the
+    event is delivered. NOTE: This function is a hack to make inproc transport
+    work in the most efficient manner. Do not use it outside of inproc
+    transport! */
+void nn_fsm_raiseto (struct nn_fsm *self, struct nn_fsm *dst,
+    struct nn_fsm_event *event, void *source, int type);
+
 #endif