Initial implementation of polling infrastrucutre

Signed-off-by: Martin Sustrik <sustrik@250bpm.com>
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 983b613..42f9769 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -117,6 +117,11 @@
     utils/wire.h
     utils/wire.c
 
+    protocols/bus/bus.h
+    protocols/bus/bus.c
+    protocols/bus/xbus.h
+    protocols/bus/xbus.c
+
     protocols/fanin/sink.h
     protocols/fanin/sink.c
     protocols/fanin/source.h
@@ -165,11 +170,6 @@
     protocols/survey/xsurveyor.h
     protocols/survey/xsurveyor.c
 
-    protocols/bus/bus.h
-    protocols/bus/bus.c
-    protocols/bus/xbus.h
-    protocols/bus/xbus.c
-
     transports/inproc/inproc.h
     transports/inproc/inproc_ctx.h
     transports/inproc/inproc_ctx.c
diff --git a/src/core/sock.c b/src/core/sock.c
index c318a4f..32ae71f 100644
--- a/src/core/sock.c
+++ b/src/core/sock.c
@@ -43,6 +43,17 @@
 #define NN_SOCK_FLAG_RCVFD 4
 #define NN_SOCK_FLAG_ERRFD 8
 
+/*  These bits specify whether individual efds are signalled or not at
+    the moment. Storing this information allows us to avoid redundant signalling
+    and unsignalling of the efd objects. It is also used to store events
+    while efds are not yet created (see above). */
+#define NN_SOCK_FLAG_IN 16
+#define NN_SOCK_FLAG_OUT 32
+#define NN_SOCK_FLAG_ERR 64
+
+/*  Private functions. */
+void nn_sockbase_adjust_events (struct nn_sockbase *self);
+
 void nn_sockbase_init (struct nn_sockbase *self,
     const struct nn_sockbase_vfptr *vfptr, int fd)
 {
@@ -137,14 +148,9 @@
     nn_cp_term (&self->cp);
 }
 
-void nn_sockbase_unblock_recv (struct nn_sockbase *self)
+void nn_sockbase_changed (struct nn_sockbase *self)
 {
-    nn_cond_post (&self->cond);
-}
-
-void nn_sockbase_unblock_send (struct nn_sockbase *self)
-{
-    nn_cond_post (&self->cond);
+    nn_sockbase_adjust_events (self);
 }
 
 struct nn_cp *nn_sockbase_getcp (struct nn_sockbase *self)
@@ -179,6 +185,7 @@
     if (level > NN_SOL_SOCKET) {
         rc = sockbase->vfptr->setopt (sockbase, level, option,
             optval, optvallen);
+        nn_sockbase_adjust_events (sockbase);
         nn_cp_unlock (&sockbase->cp);
         return rc;
     }
@@ -354,6 +361,7 @@
     if (level > NN_SOL_SOCKET) {
         rc = sockbase->vfptr->getopt (sockbase, level, option,
             optval, optvallen);
+        nn_sockbase_adjust_events (sockbase);
         if (!internal)
             nn_cp_unlock (&sockbase->cp);
         return rc;
@@ -473,6 +481,7 @@
 
         /*  Try to send the message in a non-blocking way. */
         rc = sockbase->vfptr->send (sockbase, msg);
+        nn_sockbase_adjust_events (sockbase);
         if (nn_fast (rc == 0)) {
             nn_cp_unlock (&sockbase->cp);
             return 0;
@@ -526,6 +535,7 @@
 
         /*  Try to receive the message in a non-blocking way. */
         rc = sockbase->vfptr->recv (sockbase, msg);
+        nn_sockbase_adjust_events (sockbase);
         if (nn_fast (rc == 0)) {
             nn_cp_unlock (&sockbase->cp);
             return 0;
@@ -581,49 +591,83 @@
 
 int nn_sock_add (struct nn_sock *self, struct nn_pipe *pipe)
 {
+    int rc;
     struct nn_sockbase *sockbase;
 
-    /*  Forward the call to the specific socket type. */
     sockbase = (struct nn_sockbase*) self;
-    return sockbase->vfptr->add (sockbase, pipe);
+
+    rc = sockbase->vfptr->add (sockbase, pipe);
+    nn_sockbase_adjust_events (sockbase);
+    return rc;
 }
 
 void nn_sock_rm (struct nn_sock *self, struct nn_pipe *pipe)
 {
     struct nn_sockbase *sockbase;
 
-    /*  Forward the call to the specific socket type. */
     sockbase = (struct nn_sockbase*) self;
+
     sockbase->vfptr->rm (sockbase, pipe);
+    nn_sockbase_adjust_events (sockbase);
 }
 
 void nn_sock_in (struct nn_sock *self, struct nn_pipe *pipe)
 {
-    int rc;
     struct nn_sockbase *sockbase;
 
-    /*  Forward the call to the specific socket type. */
     sockbase = (struct nn_sockbase*) self;
-    rc = sockbase->vfptr->in (sockbase, pipe);
-    errnum_assert (rc >= 0, -rc);
-    if (rc == 1) {
-#if defined NN_LATENCY_MONITOR
-        nn_latmon_measure (NN_LATMON_COND_POST);
-#endif
-        nn_cond_post (&sockbase->cond);
-    }
+
+    sockbase->vfptr->in (sockbase, pipe);
+    nn_sockbase_adjust_events (sockbase);
 }
 
 void nn_sock_out (struct nn_sock *self, struct nn_pipe *pipe)
 {
-    int rc;
     struct nn_sockbase *sockbase;
 
-    /*  Forward the call to the specific socket type. */
     sockbase = (struct nn_sockbase*) self;
-    rc = sockbase->vfptr->out (sockbase, pipe);
-    errnum_assert (rc >= 0, -rc);
-    if (rc == 1)
-        nn_cond_post (&sockbase->cond);
+
+    sockbase->vfptr->out (sockbase, pipe);
+    nn_sockbase_adjust_events (sockbase);
+}
+
+void nn_sockbase_adjust_events (struct nn_sockbase *self)
+{
+    int events;
+
+    /*  Check whether socket is readable and/or writeable at the moment. */
+    events = self->vfptr->events (self);
+    errnum_assert (events >= 0, -events);
+
+    /*  Socket becomes readable. */
+    if (!(self->flags & NN_SOCK_FLAG_IN) && events & NN_SOCKBASE_EVENT_IN) {
+        self->flags |= NN_SOCK_FLAG_IN;
+        if (self->flags & NN_SOCK_FLAG_RCVFD)
+            nn_efd_signal (&self->rcvfd);
+        nn_cond_post (&self->cond);
+    }
+
+    /*  Socket becomes writeable. */
+    if (!(self->flags & NN_SOCK_FLAG_OUT) && events & NN_SOCKBASE_EVENT_OUT) {
+        self->flags |= NN_SOCK_FLAG_OUT;
+        if (self->flags & NN_SOCK_FLAG_SNDFD)
+            nn_efd_signal (&self->sndfd);
+        nn_cond_post (&self->cond);
+    }
+    
+    /*  Socket ceases to be readable. */
+    if (self->flags & NN_SOCK_FLAG_IN && !(events & NN_SOCKBASE_EVENT_IN)) {
+        self->flags &= ~NN_SOCK_FLAG_IN;
+        if (self->flags & NN_SOCK_FLAG_RCVFD)
+            nn_efd_unsignal (&self->rcvfd);
+    }
+
+    /*  Socket ceases to be writeable. */
+    if (self->flags & NN_SOCK_FLAG_OUT &&
+          !(events & NN_SOCKBASE_EVENT_OUT)) {
+        self->flags &= ~NN_SOCK_FLAG_OUT;
+        if (self->flags & NN_SOCK_FLAG_SNDFD)
+            nn_efd_unsignal (&self->sndfd);
+    }
 }
 
diff --git a/src/protocol.h b/src/protocol.h
index 1add55e..ef8dd1a 100644
--- a/src/protocol.h
+++ b/src/protocol.h
@@ -73,6 +73,9 @@
 
 struct nn_sockbase;
 
+#define NN_SOCKBASE_EVENT_IN 1
+#define NN_SOCKBASE_EVENT_OUT 2
+
 /*  To be implemented by individual socket types. */
 struct nn_sockbase_vfptr {
 
@@ -86,8 +89,12 @@
         informs it that it is writeable. */
     int (*add) (struct nn_sockbase *self, struct nn_pipe *pipe);
     void (*rm) (struct nn_sockbase *self, struct nn_pipe *pipe);
-    int (*in) (struct nn_sockbase *self, struct nn_pipe *pipe);
-    int (*out) (struct nn_sockbase *self, struct nn_pipe *pipe);
+    void (*in) (struct nn_sockbase *self, struct nn_pipe *pipe);
+    void (*out) (struct nn_sockbase *self, struct nn_pipe *pipe);
+
+    /*  Return any combination of event flags defined above, thus specifying
+        whether the socket should be readable, writeable or none. */
+    int (*events) (struct nn_sockbase *self);
 
     /*  Send a message to the socket. Returns -EAGAIN if it cannot be done at
         the moment or zero in case of success. */
@@ -144,11 +151,13 @@
 /*  Uninitialise the socket. */
 void nn_sockbase_term (struct nn_sockbase *self);
 
-/*  If recv is blocking at the moment, this function will unblock it. */
-void nn_sockbase_unblock_recv (struct nn_sockbase *self);
-
-/*  If send is blocking at the moment, this function will unblock it. */
-void nn_sockbase_unblock_send (struct nn_sockbase *self);
+/*  Call this function when the state of the socket changed asynchronously,
+    i.e. if the socket became (un)readable/(un)writeable outside of one of the
+    nn_sockbase_vfptr functions, for example, in a timer handler or in a worker
+    thread created by the protocol implementation. Calling it allows nanomsg
+    core to check the new state and unblock any threads blocked in
+    send/recv/poll functions as needed. */
+void nn_sockbase_changed (struct nn_sockbase *self);
 
 /*  Returns the completion port associated with the socket. */
 struct nn_cp *nn_sockbase_getcp (struct nn_sockbase *self);
diff --git a/src/protocols/bus/bus.c b/src/protocols/bus/bus.c
index aa0c339..cba2708 100644
--- a/src/protocols/bus/bus.c
+++ b/src/protocols/bus/bus.c
@@ -51,6 +51,7 @@
     nn_xbus_rm,
     nn_xbus_in,
     nn_xbus_out,
+    nn_xbus_events,
     nn_bus_send,
     nn_bus_recv,
     nn_xbus_setopt,
diff --git a/src/protocols/bus/xbus.c b/src/protocols/bus/xbus.c
index 6d18ad2..16901bd 100644
--- a/src/protocols/bus/xbus.c
+++ b/src/protocols/bus/xbus.c
@@ -47,6 +47,7 @@
     nn_xbus_rm,
     nn_xbus_in,
     nn_xbus_out,
+    nn_xbus_events,
     nn_xbus_send,
     nn_xbus_recv,
     nn_xbus_setopt,
@@ -111,7 +112,7 @@
     nn_free (data);
 }
 
-int nn_xbus_in (struct nn_sockbase *self, struct nn_pipe *pipe)
+void nn_xbus_in (struct nn_sockbase *self, struct nn_pipe *pipe)
 {
     struct nn_xbus *xbus;
     struct nn_xbus_data *data;
@@ -119,10 +120,10 @@
     xbus = nn_cont (self, struct nn_xbus, sockbase);
     data = nn_pipe_getdata (pipe);
 
-    return nn_fq_in (&xbus->inpipes, pipe, &data->initem);
+    nn_fq_in (&xbus->inpipes, pipe, &data->initem);
 }
 
-int nn_xbus_out (struct nn_sockbase *self, struct nn_pipe *pipe)
+void nn_xbus_out (struct nn_sockbase *self, struct nn_pipe *pipe)
 {
     struct nn_xbus *xbus;
     struct nn_xbus_data *data;
@@ -130,7 +131,13 @@
     xbus = nn_cont (self, struct nn_xbus, sockbase);
     data = nn_pipe_getdata (pipe);
 
-    return nn_dist_out (&xbus->outpipes, pipe, &data->outitem);
+    nn_dist_out (&xbus->outpipes, pipe, &data->outitem);
+}
+
+int nn_xbus_events (struct nn_sockbase *self)
+{
+    return (nn_fq_can_recv (&nn_cont (self, struct nn_xbus,
+        sockbase)->inpipes) ? NN_SOCKBASE_EVENT_IN : 0) | NN_SOCKBASE_EVENT_OUT;
 }
 
 int nn_xbus_send (struct nn_sockbase *self, struct nn_msg *msg)
diff --git a/src/protocols/bus/xbus.h b/src/protocols/bus/xbus.h
index f5a2e7e..fed737c 100644
--- a/src/protocols/bus/xbus.h
+++ b/src/protocols/bus/xbus.h
@@ -49,8 +49,9 @@
 
 int nn_xbus_add (struct nn_sockbase *self, struct nn_pipe *pipe);
 void nn_xbus_rm (struct nn_sockbase *self, struct nn_pipe *pipe);
-int nn_xbus_in (struct nn_sockbase *self, struct nn_pipe *pipe);
-int nn_xbus_out (struct nn_sockbase *self, struct nn_pipe *pipe);
+void nn_xbus_in (struct nn_sockbase *self, struct nn_pipe *pipe);
+void nn_xbus_out (struct nn_sockbase *self, struct nn_pipe *pipe);
+int nn_xbus_events (struct nn_sockbase *self);
 int nn_xbus_send (struct nn_sockbase *self, struct nn_msg *msg);
 int nn_xbus_recv (struct nn_sockbase *self, struct nn_msg *msg);
 int nn_xbus_setopt (struct nn_sockbase *self, int level, int option,
diff --git a/src/protocols/fanin/xsink.c b/src/protocols/fanin/xsink.c
index 29b1eb6..af6cc32 100644
--- a/src/protocols/fanin/xsink.c
+++ b/src/protocols/fanin/xsink.c
@@ -49,8 +49,9 @@
 static void nn_xsink_destroy (struct nn_sockbase *self);
 static int nn_xsink_add (struct nn_sockbase *self, struct nn_pipe *pipe);
 static void nn_xsink_rm (struct nn_sockbase *self, struct nn_pipe *pipe);
-static int nn_xsink_in (struct nn_sockbase *self, struct nn_pipe *pipe);
-static int nn_xsink_out (struct nn_sockbase *self, struct nn_pipe *pipe);
+static void nn_xsink_in (struct nn_sockbase *self, struct nn_pipe *pipe);
+static void nn_xsink_out (struct nn_sockbase *self, struct nn_pipe *pipe);
+static int nn_xsink_events (struct nn_sockbase *self);
 static int nn_xsink_send (struct nn_sockbase *self, struct nn_msg *msg);
 static int nn_xsink_recv (struct nn_sockbase *self, struct nn_msg *msg);
 static int nn_xsink_setopt (struct nn_sockbase *self, int level, int option,
@@ -66,6 +67,7 @@
     nn_xsink_rm,
     nn_xsink_in,
     nn_xsink_out,
+    nn_xsink_events,
     nn_xsink_send,
     nn_xsink_recv,
     nn_xsink_setopt,
@@ -122,21 +124,26 @@
     nn_free (data);
 }
 
-static int nn_xsink_in (struct nn_sockbase *self, struct nn_pipe *pipe)
+static void nn_xsink_in (struct nn_sockbase *self, struct nn_pipe *pipe)
 {
     struct nn_xsink *xsink;
     struct nn_xsink_data *data;
 
     xsink = nn_cont (self, struct nn_xsink, sockbase);
     data = nn_pipe_getdata (pipe);
-    return nn_fq_in (&xsink->fq, pipe, &data->fq);
+    nn_fq_in (&xsink->fq, pipe, &data->fq);
 }
 
-static int nn_xsink_out (struct nn_sockbase *self, struct nn_pipe *pipe)
+static void nn_xsink_out (struct nn_sockbase *self, struct nn_pipe *pipe)
 {
     /*  We are not going to send any messages, so there's no need to store
         the list of outbound pipes. */
-    return 0;
+}
+
+static int nn_xsink_events (struct nn_sockbase *self)
+{
+    return nn_fq_can_recv (&nn_cont (self, struct nn_xsink, sockbase)->fq) ?
+        NN_SOCKBASE_EVENT_IN : 0;
 }
 
 static int nn_xsink_send (struct nn_sockbase *self, struct nn_msg *msg)
diff --git a/src/protocols/fanin/xsource.c b/src/protocols/fanin/xsource.c
index 4d72cfb..31e5e80 100644
--- a/src/protocols/fanin/xsource.c
+++ b/src/protocols/fanin/xsource.c
@@ -45,8 +45,9 @@
 static void nn_xsource_destroy (struct nn_sockbase *self);
 static int nn_xsource_add (struct nn_sockbase *self, struct nn_pipe *pipe);
 static void nn_xsource_rm (struct nn_sockbase *self, struct nn_pipe *pipe);
-static int nn_xsource_in (struct nn_sockbase *self, struct nn_pipe *pipe);
-static int nn_xsource_out (struct nn_sockbase *self, struct nn_pipe *pipe);
+static void nn_xsource_in (struct nn_sockbase *self, struct nn_pipe *pipe);
+static void nn_xsource_out (struct nn_sockbase *self, struct nn_pipe *pipe);
+static int nn_xsource_events (struct nn_sockbase *self);
 static int nn_xsource_send (struct nn_sockbase *self, struct nn_msg *msg);
 static int nn_xsource_recv (struct nn_sockbase *self, struct nn_msg *msg);
 static int nn_xsource_setopt (struct nn_sockbase *self, int level, int option,
@@ -62,6 +63,7 @@
     nn_xsource_rm,
     nn_xsource_in,
     nn_xsource_out,
+    nn_xsource_events,
     nn_xsource_send,
     nn_xsource_recv,
     nn_xsource_setopt,
@@ -104,16 +106,29 @@
     nn_excl_rm (&nn_cont (self, struct nn_xsource, sockbase)->excl, pipe);
 }
 
-static int nn_xsource_in (struct nn_sockbase *self, struct nn_pipe *pipe)
+static void nn_xsource_in (struct nn_sockbase *self, struct nn_pipe *pipe)
 {
-    return nn_excl_in (&nn_cont (self, struct nn_xsource, sockbase)->excl,
-        pipe);
+    nn_excl_in (&nn_cont (self, struct nn_xsource, sockbase)->excl, pipe);
 }
 
-static int nn_xsource_out (struct nn_sockbase *self, struct nn_pipe *pipe)
+static void nn_xsource_out (struct nn_sockbase *self, struct nn_pipe *pipe)
 {
-    return nn_excl_out (&nn_cont (self, struct nn_xsource, sockbase)->excl,
-        pipe);
+    nn_excl_out (&nn_cont (self, struct nn_xsource, sockbase)->excl, pipe);
+}
+
+static int nn_xsource_events (struct nn_sockbase *self)
+{
+    struct nn_xsource *xsource;
+    int events;
+
+    xsource = nn_cont (self, struct nn_xsource, sockbase);
+
+    events = 0;
+    if (nn_excl_can_recv (&xsource->excl))
+        events |= NN_SOCKBASE_EVENT_IN;
+    if (nn_excl_can_send (&xsource->excl))
+        events |= NN_SOCKBASE_EVENT_OUT;
+    return events;
 }
 
 static int nn_xsource_send (struct nn_sockbase *self, struct nn_msg *msg)
diff --git a/src/protocols/fanout/xpull.c b/src/protocols/fanout/xpull.c
index 05f4e9b..32de5b3 100644
--- a/src/protocols/fanout/xpull.c
+++ b/src/protocols/fanout/xpull.c
@@ -45,8 +45,9 @@
 static void nn_xpull_destroy (struct nn_sockbase *self);
 static int nn_xpull_add (struct nn_sockbase *self, struct nn_pipe *pipe);
 static void nn_xpull_rm (struct nn_sockbase *self, struct nn_pipe *pipe);
-static int nn_xpull_in (struct nn_sockbase *self, struct nn_pipe *pipe);
-static int nn_xpull_out (struct nn_sockbase *self, struct nn_pipe *pipe);
+static void nn_xpull_in (struct nn_sockbase *self, struct nn_pipe *pipe);
+static void nn_xpull_out (struct nn_sockbase *self, struct nn_pipe *pipe);
+static int nn_xpull_events (struct nn_sockbase *self);
 static int nn_xpull_send (struct nn_sockbase *self, struct nn_msg *msg);
 static int nn_xpull_recv (struct nn_sockbase *self, struct nn_msg *msg);
 static int nn_xpull_setopt (struct nn_sockbase *self, int level, int option,
@@ -62,6 +63,7 @@
     nn_xpull_rm,
     nn_xpull_in,
     nn_xpull_out,
+    nn_xpull_events,
     nn_xpull_send,
     nn_xpull_recv,
     nn_xpull_setopt,
@@ -104,14 +106,29 @@
     nn_excl_rm (&nn_cont (self, struct nn_xpull, sockbase)->excl, pipe);
 }
 
-static int nn_xpull_in (struct nn_sockbase *self, struct nn_pipe *pipe)
+static void nn_xpull_in (struct nn_sockbase *self, struct nn_pipe *pipe)
 {
-    return nn_excl_in (&nn_cont (self, struct nn_xpull, sockbase)->excl, pipe);
+    nn_excl_in (&nn_cont (self, struct nn_xpull, sockbase)->excl, pipe);
 }
 
-static int nn_xpull_out (struct nn_sockbase *self, struct nn_pipe *pipe)
+static void nn_xpull_out (struct nn_sockbase *self, struct nn_pipe *pipe)
 {
-    return nn_excl_out (&nn_cont (self, struct nn_xpull, sockbase)->excl, pipe);
+    nn_excl_out (&nn_cont (self, struct nn_xpull, sockbase)->excl, pipe);
+}
+
+static int nn_xpull_events (struct nn_sockbase *self)
+{
+    struct nn_xpull *xpull;
+    int events;
+
+    xpull = nn_cont (self, struct nn_xpull, sockbase);
+
+    events = 0;
+    if (nn_excl_can_recv (&xpull->excl))
+        events |= NN_SOCKBASE_EVENT_IN;
+    if (nn_excl_can_send (&xpull->excl))
+        events |= NN_SOCKBASE_EVENT_OUT;
+    return events;
 }
 
 static int nn_xpull_send (struct nn_sockbase *self, struct nn_msg *msg)
diff --git a/src/protocols/fanout/xpush.c b/src/protocols/fanout/xpush.c
index 44326a4..1033d4e 100644
--- a/src/protocols/fanout/xpush.c
+++ b/src/protocols/fanout/xpush.c
@@ -49,8 +49,9 @@
 static void nn_xpush_destroy (struct nn_sockbase *self);
 static int nn_xpush_add (struct nn_sockbase *self, struct nn_pipe *pipe);
 static void nn_xpush_rm (struct nn_sockbase *self, struct nn_pipe *pipe);
-static int nn_xpush_in (struct nn_sockbase *self, struct nn_pipe *pipe);
-static int nn_xpush_out (struct nn_sockbase *self, struct nn_pipe *pipe);
+static void nn_xpush_in (struct nn_sockbase *self, struct nn_pipe *pipe);
+static void nn_xpush_out (struct nn_sockbase *self, struct nn_pipe *pipe);
+static int nn_xpush_events (struct nn_sockbase *self);
 static int nn_xpush_send (struct nn_sockbase *self, struct nn_msg *msg);
 static int nn_xpush_recv (struct nn_sockbase *self, struct nn_msg *msg);
 static int nn_xpush_setopt (struct nn_sockbase *self, int level, int option,
@@ -66,6 +67,7 @@
     nn_xpush_rm,
     nn_xpush_in,
     nn_xpush_out,
+    nn_xpush_events,
     nn_xpush_send,
     nn_xpush_recv,
     nn_xpush_setopt,
@@ -122,21 +124,26 @@
     nn_free (data);
 }
 
-static int nn_xpush_in (struct nn_sockbase *self, struct nn_pipe *pipe)
+static void nn_xpush_in (struct nn_sockbase *self, struct nn_pipe *pipe)
 {
     /*  We are not going to receive any messages, so there's no need to store
         the list of inbound pipes. */
-    return 0;
 }
 
-static int nn_xpush_out (struct nn_sockbase *self, struct nn_pipe *pipe)
+static void nn_xpush_out (struct nn_sockbase *self, struct nn_pipe *pipe)
 {
     struct nn_xpush *xpush;
     struct nn_xpush_data *data;
 
     xpush = nn_cont (self, struct nn_xpush, sockbase);
     data = nn_pipe_getdata (pipe);
-    return nn_lb_out (&xpush->lb, pipe, &data->lb);
+    nn_lb_out (&xpush->lb, pipe, &data->lb);
+}
+
+static int nn_xpush_events (struct nn_sockbase *self)
+{
+    return nn_lb_can_send (&nn_cont (self, struct nn_xpush, sockbase)->lb) ?
+        NN_SOCKBASE_EVENT_OUT : 0;
 }
 
 static int nn_xpush_send (struct nn_sockbase *self, struct nn_msg *msg)
diff --git a/src/protocols/pair/xpair.c b/src/protocols/pair/xpair.c
index b5ffe57..1c128ad 100644
--- a/src/protocols/pair/xpair.c
+++ b/src/protocols/pair/xpair.c
@@ -45,8 +45,9 @@
 static void nn_xpair_destroy (struct nn_sockbase *self);
 static int nn_xpair_add (struct nn_sockbase *self, struct nn_pipe *pipe);
 static void nn_xpair_rm (struct nn_sockbase *self, struct nn_pipe *pipe);
-static int nn_xpair_in (struct nn_sockbase *self, struct nn_pipe *pipe);
-static int nn_xpair_out (struct nn_sockbase *self, struct nn_pipe *pipe);
+static void nn_xpair_in (struct nn_sockbase *self, struct nn_pipe *pipe);
+static void nn_xpair_out (struct nn_sockbase *self, struct nn_pipe *pipe);
+static int nn_xpair_events (struct nn_sockbase *self);
 static int nn_xpair_send (struct nn_sockbase *self, struct nn_msg *msg);
 static int nn_xpair_recv (struct nn_sockbase *self, struct nn_msg *msg);
 static int nn_xpair_setopt (struct nn_sockbase *self, int level, int option,
@@ -62,6 +63,7 @@
     nn_xpair_rm,
     nn_xpair_in,
     nn_xpair_out,
+    nn_xpair_events,
     nn_xpair_send,
     nn_xpair_recv,
     nn_xpair_setopt,
@@ -103,14 +105,29 @@
     nn_excl_rm (&nn_cont (self, struct nn_xpair, sockbase)->excl, pipe);
 }
 
-static int nn_xpair_in (struct nn_sockbase *self, struct nn_pipe *pipe)
+static void nn_xpair_in (struct nn_sockbase *self, struct nn_pipe *pipe)
 {
-    return nn_excl_in (&nn_cont (self, struct nn_xpair, sockbase)->excl, pipe);
+    nn_excl_in (&nn_cont (self, struct nn_xpair, sockbase)->excl, pipe);
 }
 
-static int nn_xpair_out (struct nn_sockbase *self, struct nn_pipe *pipe)
+static void nn_xpair_out (struct nn_sockbase *self, struct nn_pipe *pipe)
 {
-    return nn_excl_out (&nn_cont (self, struct nn_xpair, sockbase)->excl, pipe);
+    nn_excl_out (&nn_cont (self, struct nn_xpair, sockbase)->excl, pipe);
+}
+
+static int nn_xpair_events (struct nn_sockbase *self)
+{
+    struct nn_xpair *xpair;
+    int events;
+
+    xpair = nn_cont (self, struct nn_xpair, sockbase);
+
+    events = 0;
+    if (nn_excl_can_recv (&xpair->excl))
+        events |= NN_SOCKBASE_EVENT_IN;
+    if (nn_excl_can_send (&xpair->excl))
+        events |= NN_SOCKBASE_EVENT_OUT;
+    return events;
 }
 
 static int nn_xpair_send (struct nn_sockbase *self, struct nn_msg *msg)
diff --git a/src/protocols/pubsub/pub.c b/src/protocols/pubsub/pub.c
index d624ea1..a709efc 100644
--- a/src/protocols/pubsub/pub.c
+++ b/src/protocols/pubsub/pub.c
@@ -55,8 +55,9 @@
 static void nn_pub_destroy (struct nn_sockbase *self);
 static int nn_pub_add (struct nn_sockbase *self, struct nn_pipe *pipe);
 static void nn_pub_rm (struct nn_sockbase *self, struct nn_pipe *pipe);
-static int nn_pub_in (struct nn_sockbase *self, struct nn_pipe *pipe);
-static int nn_pub_out (struct nn_sockbase *self, struct nn_pipe *pipe);
+static void nn_pub_in (struct nn_sockbase *self, struct nn_pipe *pipe);
+static void nn_pub_out (struct nn_sockbase *self, struct nn_pipe *pipe);
+static int nn_pub_events (struct nn_sockbase *self);
 static int nn_pub_send (struct nn_sockbase *self, struct nn_msg *msg);
 static int nn_pub_recv (struct nn_sockbase *self, struct nn_msg *msg);
 static int nn_pub_setopt (struct nn_sockbase *self, int level, int option,
@@ -72,6 +73,7 @@
     nn_pub_rm,
     nn_pub_in,
     nn_pub_out,
+    nn_pub_events,
     nn_pub_send,
     nn_pub_recv,
     nn_pub_setopt,
@@ -131,13 +133,13 @@
     nn_free (data);
 }
 
-static int nn_pub_in (struct nn_sockbase *self, struct nn_pipe *pipe)
+static void nn_pub_in (struct nn_sockbase *self, struct nn_pipe *pipe)
 {
     /*  We shouldn't get any messages from subscribers. */
     nn_assert (0);
 }
 
-static int nn_pub_out (struct nn_sockbase *self, struct nn_pipe *pipe)
+static void nn_pub_out (struct nn_sockbase *self, struct nn_pipe *pipe)
 {
     struct nn_pub *pub;
     struct nn_pub_data *data;
@@ -145,7 +147,12 @@
     pub = nn_cont (self, struct nn_pub, sockbase);
     data = nn_pipe_getdata (pipe);
 
-    return nn_dist_out (&pub->outpipes, pipe, &data->item);
+    nn_dist_out (&pub->outpipes, pipe, &data->item);
+}
+
+static int nn_pub_events (struct nn_sockbase *self)
+{
+    return NN_SOCKBASE_EVENT_OUT;
 }
 
 static int nn_pub_send (struct nn_sockbase *self, struct nn_msg *msg)
diff --git a/src/protocols/pubsub/sub.c b/src/protocols/pubsub/sub.c
index 7f2edd8..0ad7d93 100644
--- a/src/protocols/pubsub/sub.c
+++ b/src/protocols/pubsub/sub.c
@@ -47,8 +47,9 @@
 static void nn_sub_destroy (struct nn_sockbase *self);
 static int nn_sub_add (struct nn_sockbase *self, struct nn_pipe *pipe);
 static void nn_sub_rm (struct nn_sockbase *self, struct nn_pipe *pipe);
-static int nn_sub_in (struct nn_sockbase *self, struct nn_pipe *pipe);
-static int nn_sub_out (struct nn_sockbase *self, struct nn_pipe *pipe);
+static void nn_sub_in (struct nn_sockbase *self, struct nn_pipe *pipe);
+static void nn_sub_out (struct nn_sockbase *self, struct nn_pipe *pipe);
+static int nn_sub_events (struct nn_sockbase *self);
 static int nn_sub_send (struct nn_sockbase *self, struct nn_msg *msg);
 static int nn_sub_recv (struct nn_sockbase *self, struct nn_msg *msg);
 static int nn_sub_setopt (struct nn_sockbase *self, int level, int option,
@@ -64,6 +65,7 @@
     nn_sub_rm,
     nn_sub_in,
     nn_sub_out,
+    nn_sub_events,
     nn_sub_send,
     nn_sub_recv,
     nn_sub_setopt,
@@ -107,14 +109,29 @@
     nn_excl_rm (&nn_cont (self, struct nn_sub, sockbase)->excl, pipe);
 }
 
-static int nn_sub_in (struct nn_sockbase *self, struct nn_pipe *pipe)
+static void nn_sub_in (struct nn_sockbase *self, struct nn_pipe *pipe)
 {
-    return nn_excl_in (&nn_cont (self, struct nn_sub, sockbase)->excl, pipe);
+    nn_excl_in (&nn_cont (self, struct nn_sub, sockbase)->excl, pipe);
 }
 
-static int nn_sub_out (struct nn_sockbase *self, struct nn_pipe *pipe)
+static void nn_sub_out (struct nn_sockbase *self, struct nn_pipe *pipe)
 {
-    return nn_excl_out (&nn_cont (self, struct nn_sub, sockbase)->excl, pipe);
+    nn_excl_out (&nn_cont (self, struct nn_sub, sockbase)->excl, pipe);
+}
+
+static int nn_sub_events (struct nn_sockbase *self)
+{
+    struct nn_sub *sub;
+    int events;
+
+    sub = nn_cont (self, struct nn_sub, sockbase);
+
+    events = 0;
+    if (nn_excl_can_recv (&sub->excl))
+        events |= NN_SOCKBASE_EVENT_IN;
+    if (nn_excl_can_send (&sub->excl))
+        events |= NN_SOCKBASE_EVENT_OUT;
+    return events;
 }
 
 static int nn_sub_send (struct nn_sockbase *self, struct nn_msg *msg)
diff --git a/src/protocols/reqrep/rep.c b/src/protocols/reqrep/rep.c
index d9f9702..71c3046 100644
--- a/src/protocols/reqrep/rep.c
+++ b/src/protocols/reqrep/rep.c
@@ -51,6 +51,7 @@
 
 /*  Implementation of nn_sockbase's virtual functions. */
 static void nn_rep_destroy (struct nn_sockbase *self);
+static int nn_rep_events (struct nn_sockbase *self);
 static int nn_rep_send (struct nn_sockbase *self, struct nn_msg *msg);
 static int nn_rep_recv (struct nn_sockbase *self, struct nn_msg *msg);
 static int nn_rep_sethdr (struct nn_msg *msg, const void *hdr,
@@ -63,6 +64,7 @@
     nn_xrep_rm,
     nn_xrep_in,
     nn_xrep_out,
+    nn_rep_events,
     nn_rep_send,
     nn_rep_recv,
     nn_xrep_setopt,
@@ -95,6 +97,17 @@
     nn_free (rep);
 }
 
+static int nn_rep_events (struct nn_sockbase *self)
+{
+    struct nn_rep *rep;
+    int events;
+
+    rep = nn_cont (self, struct nn_rep, xrep.sockbase);
+    if (!(rep->flags & NN_REP_INPROGRESS))
+        events &= ~NN_SOCKBASE_EVENT_OUT;
+    events = nn_xrep_events (&rep->xrep.sockbase);
+}
+
 static int nn_rep_send (struct nn_sockbase *self, struct nn_msg *msg)
 {
     int rc;
diff --git a/src/protocols/reqrep/req.c b/src/protocols/reqrep/req.c
index b9f2dfc..8c1c3d1 100644
--- a/src/protocols/reqrep/req.c
+++ b/src/protocols/reqrep/req.c
@@ -57,6 +57,7 @@
 
 /*  Implementation of nn_sockbase's virtual functions. */
 static void nn_req_destroy (struct nn_sockbase *self);
+static int nn_req_events (struct nn_sockbase *self);
 static int nn_req_send (struct nn_sockbase *self, struct nn_msg *msg);
 static int nn_req_recv (struct nn_sockbase *self, struct nn_msg *msg);
 static int nn_req_setopt (struct nn_sockbase *self, int level, int option,
@@ -72,6 +73,7 @@
     nn_xreq_rm,
     nn_xreq_in,
     nn_xreq_out,
+    nn_req_events,
     nn_req_send,
     nn_req_recv,
     nn_req_setopt,
@@ -128,6 +130,19 @@
     nn_free (req);
 }
 
+static int nn_req_events (struct nn_sockbase *self)
+{
+    struct nn_req *req;
+    int events;
+
+    req = nn_cont (self, struct nn_req, xreq.sockbase);
+
+    events = nn_xreq_events (&req->xreq.sockbase);
+    if (!(req->flags & NN_REQ_INPROGRESS))
+        events &= ~NN_SOCKBASE_EVENT_IN;
+    return events;
+}
+
 static int nn_req_send (struct nn_sockbase *self, struct nn_msg *msg)
 {
     int rc;
diff --git a/src/protocols/reqrep/xrep.c b/src/protocols/reqrep/xrep.c
index 8297bbe..d3c3bf7 100644
--- a/src/protocols/reqrep/xrep.c
+++ b/src/protocols/reqrep/xrep.c
@@ -43,6 +43,7 @@
     nn_xrep_rm,
     nn_xrep_in,
     nn_xrep_out,
+    nn_xrep_events,
     nn_xrep_send,
     nn_xrep_recv,
     nn_xrep_setopt,
@@ -116,7 +117,7 @@
     nn_free (data);
 }
 
-int nn_xrep_in (struct nn_sockbase *self, struct nn_pipe *pipe)
+void nn_xrep_in (struct nn_sockbase *self, struct nn_pipe *pipe)
 {
     struct nn_xrep *xrep;
     struct nn_xrep_data *data;
@@ -124,18 +125,21 @@
     xrep = nn_cont (self, struct nn_xrep, sockbase);
     data = nn_pipe_getdata (pipe);
     
-    return nn_fq_in (&xrep->inpipes, pipe, &data->initem);
+    nn_fq_in (&xrep->inpipes, pipe, &data->initem);
 }
 
-int nn_xrep_out (struct nn_sockbase *self, struct nn_pipe *pipe)
+void nn_xrep_out (struct nn_sockbase *self, struct nn_pipe *pipe)
 {
     struct nn_xrep_data *data;
 
     data = nn_pipe_getdata (pipe);
     data->flags |= NN_XREP_OUT;
+}
 
-    /*  XREP socket never blocks on send, so there's no point in unblocking. */
-    return 0;
+int nn_xrep_events (struct nn_sockbase *self)
+{
+    return (nn_fq_can_recv (&nn_cont (self, struct nn_xrep,
+        sockbase)->inpipes) ? NN_SOCKBASE_EVENT_IN : 0) | NN_SOCKBASE_EVENT_OUT;
 }
 
 int nn_xrep_send (struct nn_sockbase *self, struct nn_msg *msg)
diff --git a/src/protocols/reqrep/xrep.h b/src/protocols/reqrep/xrep.h
index d9abcde..2a0d0fd 100644
--- a/src/protocols/reqrep/xrep.h
+++ b/src/protocols/reqrep/xrep.h
@@ -60,8 +60,9 @@
 
 int nn_xrep_add (struct nn_sockbase *self, struct nn_pipe *pipe);
 void nn_xrep_rm (struct nn_sockbase *self, struct nn_pipe *pipe);
-int nn_xrep_in (struct nn_sockbase *self, struct nn_pipe *pipe);
-int nn_xrep_out (struct nn_sockbase *self, struct nn_pipe *pipe);
+void nn_xrep_in (struct nn_sockbase *self, struct nn_pipe *pipe);
+void nn_xrep_out (struct nn_sockbase *self, struct nn_pipe *pipe);
+int nn_xrep_events (struct nn_sockbase *self);
 int nn_xrep_send (struct nn_sockbase *self, struct nn_msg *msg);
 int nn_xrep_recv (struct nn_sockbase *self, struct nn_msg *msg);
 int nn_xrep_setopt (struct nn_sockbase *self, int level, int option,
diff --git a/src/protocols/reqrep/xreq.c b/src/protocols/reqrep/xreq.c
index b801b2d..ea4011e 100644
--- a/src/protocols/reqrep/xreq.c
+++ b/src/protocols/reqrep/xreq.c
@@ -44,6 +44,7 @@
     nn_xreq_rm,
     nn_xreq_in,
     nn_xreq_out,
+    nn_xreq_events,
     nn_xreq_send,
     nn_xreq_recv,
     nn_xreq_setopt,
@@ -103,24 +104,34 @@
     nn_free (data);
 }
 
-int nn_xreq_in (struct nn_sockbase *self, struct nn_pipe *pipe)
+void nn_xreq_in (struct nn_sockbase *self, struct nn_pipe *pipe)
 {
     struct nn_xreq *xreq;
     struct nn_xreq_data *data;
 
     xreq = nn_cont (self, struct nn_xreq, sockbase);
     data = nn_pipe_getdata (pipe);
-    return nn_fq_in (&xreq->fq, pipe, &data->fq);
+    nn_fq_in (&xreq->fq, pipe, &data->fq);
 }
 
-int nn_xreq_out (struct nn_sockbase *self, struct nn_pipe *pipe)
+void nn_xreq_out (struct nn_sockbase *self, struct nn_pipe *pipe)
 {
     struct nn_xreq *xreq;
     struct nn_xreq_data *data;
 
     xreq = nn_cont (self, struct nn_xreq, sockbase);
     data = nn_pipe_getdata (pipe);
-    return nn_lb_out (&xreq->lb, pipe, &data->lb);
+    nn_lb_out (&xreq->lb, pipe, &data->lb);
+}
+
+int nn_xreq_events (struct nn_sockbase *self)
+{
+    struct nn_xreq *xreq;
+
+    xreq = nn_cont (self, struct nn_xreq, sockbase);
+
+    return (nn_fq_can_recv (&xreq->fq) ? NN_SOCKBASE_EVENT_IN : 0) |
+        (nn_lb_can_send (&xreq->lb) ? NN_SOCKBASE_EVENT_OUT : 0);
 }
 
 int nn_xreq_send (struct nn_sockbase *self, struct nn_msg *msg)
diff --git a/src/protocols/reqrep/xreq.h b/src/protocols/reqrep/xreq.h
index 4cdce58..3328fa5 100644
--- a/src/protocols/reqrep/xreq.h
+++ b/src/protocols/reqrep/xreq.h
@@ -40,8 +40,9 @@
 
 int nn_xreq_add (struct nn_sockbase *self, struct nn_pipe *pipe);
 void nn_xreq_rm (struct nn_sockbase *self, struct nn_pipe *pipe);
-int nn_xreq_in (struct nn_sockbase *self, struct nn_pipe *pipe);
-int nn_xreq_out (struct nn_sockbase *self, struct nn_pipe *pipe);
+void nn_xreq_in (struct nn_sockbase *self, struct nn_pipe *pipe);
+void nn_xreq_out (struct nn_sockbase *self, struct nn_pipe *pipe);
+int nn_xreq_events (struct nn_sockbase *self);
 int nn_xreq_send (struct nn_sockbase *self, struct nn_msg *msg);
 int nn_xreq_recv (struct nn_sockbase *self, struct nn_msg *msg);
 int nn_xreq_setopt (struct nn_sockbase *self, int level, int option,
diff --git a/src/protocols/survey/respondent.c b/src/protocols/survey/respondent.c
index fee77a2..116e72e 100644
--- a/src/protocols/survey/respondent.c
+++ b/src/protocols/survey/respondent.c
@@ -50,6 +50,7 @@
 
 /*  Implementation of nn_sockbase's virtual functions. */
 static void nn_respondent_destroy (struct nn_sockbase *self);
+static int nn_respondent_events (struct nn_sockbase *self);
 static int nn_respondent_send (struct nn_sockbase *self, struct nn_msg *msg);
 static int nn_respondent_recv (struct nn_sockbase *self, struct nn_msg *msg);
 static int nn_respondent_sethdr (struct nn_msg *msg, const void *hdr,
@@ -61,6 +62,7 @@
     nn_xrespondent_rm,
     nn_xrespondent_in,
     nn_xrespondent_out,
+    nn_respondent_events,
     nn_respondent_send,
     nn_respondent_recv,
     nn_xrespondent_setopt,
@@ -91,6 +93,19 @@
     nn_free (respondent);
 }
 
+static int nn_respondent_events (struct nn_sockbase *self)
+{
+    int events;
+    struct nn_respondent *respondent;
+
+    respondent = nn_cont (self, struct nn_respondent, xrespondent.sockbase);
+
+    events = nn_xrespondent_events (&respondent->xrespondent.sockbase);
+    if (!(respondent->flags & NN_RESPONDENT_INPROGRESS))
+        events &= ~NN_SOCKBASE_EVENT_OUT;
+    return events;
+}
+
 static int nn_respondent_send (struct nn_sockbase *self, struct nn_msg *msg)
 {
     int rc;
diff --git a/src/protocols/survey/surveyor.c b/src/protocols/survey/surveyor.c
index a08d8bf..a228edc 100644
--- a/src/protocols/survey/surveyor.c
+++ b/src/protocols/survey/surveyor.c
@@ -56,6 +56,7 @@
 
 /*  Implementation of nn_sockbase's virtual functions. */
 static void nn_surveyor_destroy (struct nn_sockbase *self);
+static int nn_surveyor_events (struct nn_sockbase *self);
 static int nn_surveyor_send (struct nn_sockbase *self, struct nn_msg *msg);
 static int nn_surveyor_recv (struct nn_sockbase *self, struct nn_msg *msg);
 static int nn_surveyor_setopt (struct nn_sockbase *self, int level, int option,
@@ -71,6 +72,7 @@
     nn_xsurveyor_rm,
     nn_xsurveyor_in,
     nn_xsurveyor_out,
+    nn_surveyor_events,
     nn_surveyor_send,
     nn_surveyor_recv,
     nn_surveyor_setopt,
@@ -124,6 +126,17 @@
     nn_free (surveyor);
 }
 
+static int nn_surveyor_events (struct nn_sockbase *self)
+{
+    struct nn_surveyor *surveyor;
+
+    surveyor = nn_cont (self, struct nn_surveyor, xsurveyor.sockbase);
+
+    if (!(surveyor->flags & NN_SURVEYOR_INPROGRESS))
+        return NN_SOCKBASE_EVENT_IN | NN_SOCKBASE_EVENT_OUT;
+    return nn_xsurveyor_events (&surveyor->xsurveyor.sockbase);
+}
+
 static int nn_surveyor_send (struct nn_sockbase *self, struct nn_msg *msg)
 {
     int rc;
@@ -208,8 +221,8 @@
     /*  Cancel the survey. */
     surveyor->flags &= ~NN_SURVEYOR_INPROGRESS;
 
-    /*  If there's a blocked recv() operation, unblock it. */
-    nn_sockbase_unblock_recv (&surveyor->xsurveyor.sockbase);
+    /*  If there's a blocked recv/poll operation, unblock it. */
+    nn_sockbase_changed (&surveyor->xsurveyor.sockbase);
 }
 
 static int nn_surveyor_setopt (struct nn_sockbase *self, int level, int option,
diff --git a/src/protocols/survey/xrespondent.c b/src/protocols/survey/xrespondent.c
index cef8021..a07dd8b 100644
--- a/src/protocols/survey/xrespondent.c
+++ b/src/protocols/survey/xrespondent.c
@@ -40,6 +40,7 @@
     nn_xrespondent_rm,
     nn_xrespondent_in,
     nn_xrespondent_out,
+    nn_xrespondent_events,
     nn_xrespondent_send,
     nn_xrespondent_recv,
     nn_xrespondent_setopt,
@@ -80,16 +81,29 @@
     nn_excl_rm (&nn_cont (self, struct nn_xrespondent, sockbase)->excl, pipe);
 }
 
-int nn_xrespondent_in (struct nn_sockbase *self, struct nn_pipe *pipe)
+void nn_xrespondent_in (struct nn_sockbase *self, struct nn_pipe *pipe)
 {
-    return nn_excl_in (&nn_cont (self, struct nn_xrespondent, sockbase)->excl,
-        pipe);
+    nn_excl_in (&nn_cont (self, struct nn_xrespondent, sockbase)->excl, pipe);
 }
 
-int nn_xrespondent_out (struct nn_sockbase *self, struct nn_pipe *pipe)
+void nn_xrespondent_out (struct nn_sockbase *self, struct nn_pipe *pipe)
 {
-    return nn_excl_out (&nn_cont (self, struct nn_xrespondent, sockbase)->excl,
-        pipe);
+    nn_excl_out (&nn_cont (self, struct nn_xrespondent, sockbase)->excl, pipe);
+}
+
+int nn_xrespondent_events (struct nn_sockbase *self)
+{
+    struct nn_xrespondent *xrespondent;
+    int events;
+
+    xrespondent = nn_cont (self, struct nn_xrespondent, sockbase);
+
+    events = 0;
+    if (nn_excl_can_recv (&xrespondent->excl))
+        events |= NN_SOCKBASE_EVENT_IN;
+    if (nn_excl_can_send (&xrespondent->excl))
+        events |= NN_SOCKBASE_EVENT_OUT;
+    return events;
 }
 
 int nn_xrespondent_send (struct nn_sockbase *self, struct nn_msg *msg)
diff --git a/src/protocols/survey/xrespondent.h b/src/protocols/survey/xrespondent.h
index fe9facf..d69cc49 100644
--- a/src/protocols/survey/xrespondent.h
+++ b/src/protocols/survey/xrespondent.h
@@ -40,8 +40,9 @@
 
 int nn_xrespondent_add (struct nn_sockbase *self, struct nn_pipe *pipe);
 void nn_xrespondent_rm (struct nn_sockbase *self, struct nn_pipe *pipe);
-int nn_xrespondent_in (struct nn_sockbase *self, struct nn_pipe *pipe);
-int nn_xrespondent_out (struct nn_sockbase *self, struct nn_pipe *pipe);
+void nn_xrespondent_in (struct nn_sockbase *self, struct nn_pipe *pipe);
+void nn_xrespondent_out (struct nn_sockbase *self, struct nn_pipe *pipe);
+int nn_xrespondent_events (struct nn_sockbase *self);
 int nn_xrespondent_send (struct nn_sockbase *self, struct nn_msg *msg);
 int nn_xrespondent_recv (struct nn_sockbase *self, struct nn_msg *msg);
 int nn_xrespondent_setopt (struct nn_sockbase *self, int level, int option,
diff --git a/src/protocols/survey/xsurveyor.c b/src/protocols/survey/xsurveyor.c
index 87eb09c..7749803 100644
--- a/src/protocols/survey/xsurveyor.c
+++ b/src/protocols/survey/xsurveyor.c
@@ -43,6 +43,7 @@
     nn_xsurveyor_rm,
     nn_xsurveyor_in,
     nn_xsurveyor_out,
+    nn_xsurveyor_events,
     nn_xsurveyor_send,
     nn_xsurveyor_recv,
     nn_xsurveyor_setopt,
@@ -106,7 +107,7 @@
     nn_free (data);
 }
 
-int nn_xsurveyor_in (struct nn_sockbase *self, struct nn_pipe *pipe)
+void nn_xsurveyor_in (struct nn_sockbase *self, struct nn_pipe *pipe)
 {
     struct nn_xsurveyor *xsurveyor;
     struct nn_xsurveyor_data *data;
@@ -114,10 +115,10 @@
     xsurveyor = nn_cont (self, struct nn_xsurveyor, sockbase);
     data = nn_pipe_getdata (pipe);
 
-    return nn_fq_in (&xsurveyor->inpipes, pipe, &data->initem);
+    nn_fq_in (&xsurveyor->inpipes, pipe, &data->initem);
 }
 
-int nn_xsurveyor_out (struct nn_sockbase *self, struct nn_pipe *pipe)
+void nn_xsurveyor_out (struct nn_sockbase *self, struct nn_pipe *pipe)
 {
     struct nn_xsurveyor *xsurveyor;
     struct nn_xsurveyor_data *data;
@@ -125,7 +126,20 @@
     xsurveyor = nn_cont (self, struct nn_xsurveyor, sockbase);
     data = nn_pipe_getdata (pipe);
 
-    return nn_dist_out (&xsurveyor->outpipes, pipe, &data->outitem);
+    nn_dist_out (&xsurveyor->outpipes, pipe, &data->outitem);
+}
+
+int nn_xsurveyor_events (struct nn_sockbase *self)
+{
+    struct nn_xsurveyor *xsurveyor;
+    int events;
+
+    xsurveyor = nn_cont (self, struct nn_xsurveyor, sockbase);
+
+    events = NN_SOCKBASE_EVENT_OUT;
+    if (nn_fq_can_recv (&xsurveyor->inpipes))
+        events |= NN_SOCKBASE_EVENT_IN;
+    return events;
 }
 
 int nn_xsurveyor_send (struct nn_sockbase *self, struct nn_msg *msg)
diff --git a/src/protocols/survey/xsurveyor.h b/src/protocols/survey/xsurveyor.h
index 0cb0268..ba1864b 100644
--- a/src/protocols/survey/xsurveyor.h
+++ b/src/protocols/survey/xsurveyor.h
@@ -54,8 +54,9 @@
 
 int nn_xsurveyor_add (struct nn_sockbase *self, struct nn_pipe *pipe);
 void nn_xsurveyor_rm (struct nn_sockbase *self, struct nn_pipe *pipe);
-int nn_xsurveyor_in (struct nn_sockbase *self, struct nn_pipe *pipe);
-int nn_xsurveyor_out (struct nn_sockbase *self, struct nn_pipe *pipe);
+void nn_xsurveyor_in (struct nn_sockbase *self, struct nn_pipe *pipe);
+void nn_xsurveyor_out (struct nn_sockbase *self, struct nn_pipe *pipe);
+int nn_xsurveyor_events (struct nn_sockbase *self);
 int nn_xsurveyor_send (struct nn_sockbase *self, struct nn_msg *msg);
 int nn_xsurveyor_recv (struct nn_sockbase *self, struct nn_msg *msg);
 int nn_xsurveyor_setopt (struct nn_sockbase *self, int level, int option,
diff --git a/src/utils/dist.c b/src/utils/dist.c
index b162994..0fb2daa 100644
--- a/src/utils/dist.c
+++ b/src/utils/dist.c
@@ -51,15 +51,10 @@
         nn_list_erase (&self->pipes, &data->item);
 }
 
-int nn_dist_out (struct nn_dist *self, struct nn_pipe *pipe,
+void nn_dist_out (struct nn_dist *self, struct nn_pipe *pipe,
     struct nn_dist_data *data)
 {
-    int result;
-
-    result = nn_list_empty (&self->pipes) ? 1 : 0;
     nn_list_insert (&self->pipes, &data->item, nn_list_end (&self->pipes));
-
-    return result;
 }
 
 int nn_dist_send (struct nn_dist *self, struct nn_msg *msg,
diff --git a/src/utils/dist.h b/src/utils/dist.h
index fc6904c..7fc5f90 100644
--- a/src/utils/dist.h
+++ b/src/utils/dist.h
@@ -44,7 +44,7 @@
     struct nn_dist_data *data);
 void nn_dist_rm (struct nn_dist *self, struct nn_pipe *pipe,
     struct nn_dist_data *data);
-int nn_dist_out (struct nn_dist *self, struct nn_pipe *pipe,
+void nn_dist_out (struct nn_dist *self, struct nn_pipe *pipe,
     struct nn_dist_data *data);
 
 /*  Sends the message to all the attached pipes except the one specified
diff --git a/src/utils/excl.c b/src/utils/excl.c
index 8db4acd..d3cdfe3 100644
--- a/src/utils/excl.c
+++ b/src/utils/excl.c
@@ -59,20 +59,18 @@
    self->outpipe = NULL;
 }
 
-int nn_excl_in (struct nn_excl *self, struct nn_pipe *pipe)
+void nn_excl_in (struct nn_excl *self, struct nn_pipe *pipe)
 {
     nn_assert (!self->inpipe);
     nn_assert (pipe == self->pipe);
     self->inpipe = pipe;
-    return 1;
 }
 
-int nn_excl_out (struct nn_excl *self, struct nn_pipe *pipe)
+void nn_excl_out (struct nn_excl *self, struct nn_pipe *pipe)
 {
     nn_assert (!self->outpipe);
     nn_assert (pipe == self->pipe);
     self->outpipe = pipe;
-    return 1;
 }
 
 int nn_excl_send (struct nn_excl *self, struct nn_msg *msg)
@@ -107,3 +105,13 @@
     return rc & ~NN_PIPE_RELEASE;
 }
 
+int nn_excl_can_send (struct nn_excl *self)
+{
+    return self->outpipe ? 1 : 0;
+}
+
+int nn_excl_can_recv (struct nn_excl *self)
+{
+    return self->inpipe ? 1 : 0;
+}
+
diff --git a/src/utils/excl.h b/src/utils/excl.h
index dfcba77..fd26468 100644
--- a/src/utils/excl.h
+++ b/src/utils/excl.h
@@ -48,9 +48,11 @@
 void nn_excl_term (struct nn_excl *self);
 int nn_excl_add (struct nn_excl *self, struct nn_pipe *pipe);
 void nn_excl_rm (struct nn_excl *self, struct nn_pipe *pipe);
-int nn_excl_in (struct nn_excl *self, struct nn_pipe *pipe);
-int nn_excl_out (struct nn_excl *self, struct nn_pipe *pipe);
+void nn_excl_in (struct nn_excl *self, struct nn_pipe *pipe);
+void nn_excl_out (struct nn_excl *self, struct nn_pipe *pipe);
 int nn_excl_send (struct nn_excl *self, struct nn_msg *msg);
 int nn_excl_recv (struct nn_excl *self, struct nn_msg *msg);
+int nn_excl_can_send (struct nn_excl *self);
+int nn_excl_can_recv (struct nn_excl *self);
 
 #endif
diff --git a/src/utils/fq.c b/src/utils/fq.c
index 7c1779b..d37f487 100644
--- a/src/utils/fq.c
+++ b/src/utils/fq.c
@@ -48,10 +48,15 @@
     nn_priolist_rm (&self->priolist, pipe, &data->priolist);
 }
 
-int nn_fq_in (struct nn_fq *self, struct nn_pipe *pipe,
+void nn_fq_in (struct nn_fq *self, struct nn_pipe *pipe,
     struct nn_fq_data *data)
 {
-    return nn_priolist_activate (&self->priolist, pipe, &data->priolist);
+    nn_priolist_activate (&self->priolist, pipe, &data->priolist);
+}
+
+int nn_fq_can_recv (struct nn_fq *self)
+{
+    return nn_priolist_is_active (&self->priolist);
 }
 
 int nn_fq_recv (struct nn_fq *self, struct nn_msg *msg, struct nn_pipe **pipe)
diff --git a/src/utils/fq.h b/src/utils/fq.h
index 4798a8b..7d34fef 100644
--- a/src/utils/fq.h
+++ b/src/utils/fq.h
@@ -44,8 +44,9 @@
     struct nn_fq_data *data, int priority);
 void nn_fq_rm (struct nn_fq *self, struct nn_pipe *pipe,
     struct nn_fq_data *data);
-int nn_fq_in (struct nn_fq *self, struct nn_pipe *pipe,
+void nn_fq_in (struct nn_fq *self, struct nn_pipe *pipe,
     struct nn_fq_data *data);
+int nn_fq_can_recv (struct nn_fq *self);
 int nn_fq_recv (struct nn_fq *self, struct nn_msg *msg, struct nn_pipe **pipe);
 
 #endif
diff --git a/src/utils/lb.c b/src/utils/lb.c
index 5cf2250..78add2c 100644
--- a/src/utils/lb.c
+++ b/src/utils/lb.c
@@ -48,10 +48,15 @@
     nn_priolist_rm (&self->priolist, pipe, &data->priolist);
 }
 
-int nn_lb_out (struct nn_lb *self, struct nn_pipe *pipe,
+void nn_lb_out (struct nn_lb *self, struct nn_pipe *pipe,
     struct nn_lb_data *data)
 {
-    return nn_priolist_activate (&self->priolist, pipe, &data->priolist);
+    nn_priolist_activate (&self->priolist, pipe, &data->priolist);
+}
+
+int nn_lb_can_send (struct nn_lb *self)
+{
+    return nn_priolist_is_active (&self->priolist);
 }
 
 int nn_lb_send (struct nn_lb *self, struct nn_msg *msg)
diff --git a/src/utils/lb.h b/src/utils/lb.h
index 6f4dbd0..1b6ebb0 100644
--- a/src/utils/lb.h
+++ b/src/utils/lb.h
@@ -43,8 +43,9 @@
     struct nn_lb_data *data, int priority);
 void nn_lb_rm (struct nn_lb *self, struct nn_pipe *pipe,
     struct nn_lb_data *data);
-int nn_lb_out (struct nn_lb *self, struct nn_pipe *pipe,
+void nn_lb_out (struct nn_lb *self, struct nn_pipe *pipe,
     struct nn_lb_data *data);
+int nn_lb_can_send (struct nn_lb *self);
 int nn_lb_send (struct nn_lb *self, struct nn_msg *msg);
 
 #endif
diff --git a/src/utils/priolist.c b/src/utils/priolist.c
index 457fe79..87ed754 100644
--- a/src/utils/priolist.c
+++ b/src/utils/priolist.c
@@ -61,7 +61,7 @@
         nn_list_erase (&self->slots [data->priority - 1].pipes, &data->item);
 }
 
-int nn_priolist_activate (struct nn_priolist *self, struct nn_pipe *pipe,
+void nn_priolist_activate (struct nn_priolist *self, struct nn_pipe *pipe,
     struct nn_priolist_data *data)
 {
     struct nn_priolist_slot *slot;
@@ -72,7 +72,7 @@
         going to change. */
     if (!nn_list_empty (&slot->pipes)) {
         nn_list_insert (&slot->pipes, &data->item, nn_list_end (&slot->pipes));
-        return 0;
+        return;
     }
 
     /*  Add first pipe into the slot. If there are no pipes in priolist at all
@@ -81,18 +81,22 @@
     slot->current = data;
     if (self->current == -1) {
         self->current = data->priority;
-        return 1;
+        return;
     }
 
     /*  If the current priority is lower than the one of the newly activated
         pipe, this slot becomes current. */
     if (self->current > data->priority) {
         self->current = data->priority;
-        return 0;
+        return;
     }
 
     /*  Current doesn't change otherwise. */
-    return 0;
+}
+
+int nn_priolist_is_active (struct nn_priolist *self)
+{
+    return self->current == -1 ? 0 : 1;
 }
 
 struct nn_pipe *nn_priolist_getpipe (struct nn_priolist *self)
diff --git a/src/utils/priolist.h b/src/utils/priolist.h
index cbdd01e..ae04815 100644
--- a/src/utils/priolist.h
+++ b/src/utils/priolist.h
@@ -53,8 +53,9 @@
     struct nn_priolist_data *data, int priority);
 void nn_priolist_rm (struct nn_priolist *self, struct nn_pipe *pipe,
     struct nn_priolist_data *data);
-int nn_priolist_activate (struct nn_priolist *self, struct nn_pipe *pipe,
+void nn_priolist_activate (struct nn_priolist *self, struct nn_pipe *pipe,
     struct nn_priolist_data *data);
+int nn_priolist_is_active (struct nn_priolist *self);
 struct nn_pipe *nn_priolist_getpipe (struct nn_priolist *self);
 void nn_priolist_advance (struct nn_priolist *self, int release);