Initial implementation of polling infrastrucutre
Signed-off-by: Martin Sustrik <sustrik@250bpm.com>
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 983b613..42f9769 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -117,6 +117,11 @@
utils/wire.h
utils/wire.c
+ protocols/bus/bus.h
+ protocols/bus/bus.c
+ protocols/bus/xbus.h
+ protocols/bus/xbus.c
+
protocols/fanin/sink.h
protocols/fanin/sink.c
protocols/fanin/source.h
@@ -165,11 +170,6 @@
protocols/survey/xsurveyor.h
protocols/survey/xsurveyor.c
- protocols/bus/bus.h
- protocols/bus/bus.c
- protocols/bus/xbus.h
- protocols/bus/xbus.c
-
transports/inproc/inproc.h
transports/inproc/inproc_ctx.h
transports/inproc/inproc_ctx.c
diff --git a/src/core/sock.c b/src/core/sock.c
index c318a4f..32ae71f 100644
--- a/src/core/sock.c
+++ b/src/core/sock.c
@@ -43,6 +43,17 @@
#define NN_SOCK_FLAG_RCVFD 4
#define NN_SOCK_FLAG_ERRFD 8
+/* These bits specify whether individual efds are signalled or not at
+ the moment. Storing this information allows us to avoid redundant signalling
+ and unsignalling of the efd objects. It is also used to store events
+ while efds are not yet created (see above). */
+#define NN_SOCK_FLAG_IN 16
+#define NN_SOCK_FLAG_OUT 32
+#define NN_SOCK_FLAG_ERR 64
+
+/* Private functions. */
+void nn_sockbase_adjust_events (struct nn_sockbase *self);
+
void nn_sockbase_init (struct nn_sockbase *self,
const struct nn_sockbase_vfptr *vfptr, int fd)
{
@@ -137,14 +148,9 @@
nn_cp_term (&self->cp);
}
-void nn_sockbase_unblock_recv (struct nn_sockbase *self)
+void nn_sockbase_changed (struct nn_sockbase *self)
{
- nn_cond_post (&self->cond);
-}
-
-void nn_sockbase_unblock_send (struct nn_sockbase *self)
-{
- nn_cond_post (&self->cond);
+ nn_sockbase_adjust_events (self);
}
struct nn_cp *nn_sockbase_getcp (struct nn_sockbase *self)
@@ -179,6 +185,7 @@
if (level > NN_SOL_SOCKET) {
rc = sockbase->vfptr->setopt (sockbase, level, option,
optval, optvallen);
+ nn_sockbase_adjust_events (sockbase);
nn_cp_unlock (&sockbase->cp);
return rc;
}
@@ -354,6 +361,7 @@
if (level > NN_SOL_SOCKET) {
rc = sockbase->vfptr->getopt (sockbase, level, option,
optval, optvallen);
+ nn_sockbase_adjust_events (sockbase);
if (!internal)
nn_cp_unlock (&sockbase->cp);
return rc;
@@ -473,6 +481,7 @@
/* Try to send the message in a non-blocking way. */
rc = sockbase->vfptr->send (sockbase, msg);
+ nn_sockbase_adjust_events (sockbase);
if (nn_fast (rc == 0)) {
nn_cp_unlock (&sockbase->cp);
return 0;
@@ -526,6 +535,7 @@
/* Try to receive the message in a non-blocking way. */
rc = sockbase->vfptr->recv (sockbase, msg);
+ nn_sockbase_adjust_events (sockbase);
if (nn_fast (rc == 0)) {
nn_cp_unlock (&sockbase->cp);
return 0;
@@ -581,49 +591,83 @@
int nn_sock_add (struct nn_sock *self, struct nn_pipe *pipe)
{
+ int rc;
struct nn_sockbase *sockbase;
- /* Forward the call to the specific socket type. */
sockbase = (struct nn_sockbase*) self;
- return sockbase->vfptr->add (sockbase, pipe);
+
+ rc = sockbase->vfptr->add (sockbase, pipe);
+ nn_sockbase_adjust_events (sockbase);
+ return rc;
}
void nn_sock_rm (struct nn_sock *self, struct nn_pipe *pipe)
{
struct nn_sockbase *sockbase;
- /* Forward the call to the specific socket type. */
sockbase = (struct nn_sockbase*) self;
+
sockbase->vfptr->rm (sockbase, pipe);
+ nn_sockbase_adjust_events (sockbase);
}
void nn_sock_in (struct nn_sock *self, struct nn_pipe *pipe)
{
- int rc;
struct nn_sockbase *sockbase;
- /* Forward the call to the specific socket type. */
sockbase = (struct nn_sockbase*) self;
- rc = sockbase->vfptr->in (sockbase, pipe);
- errnum_assert (rc >= 0, -rc);
- if (rc == 1) {
-#if defined NN_LATENCY_MONITOR
- nn_latmon_measure (NN_LATMON_COND_POST);
-#endif
- nn_cond_post (&sockbase->cond);
- }
+
+ sockbase->vfptr->in (sockbase, pipe);
+ nn_sockbase_adjust_events (sockbase);
}
void nn_sock_out (struct nn_sock *self, struct nn_pipe *pipe)
{
- int rc;
struct nn_sockbase *sockbase;
- /* Forward the call to the specific socket type. */
sockbase = (struct nn_sockbase*) self;
- rc = sockbase->vfptr->out (sockbase, pipe);
- errnum_assert (rc >= 0, -rc);
- if (rc == 1)
- nn_cond_post (&sockbase->cond);
+
+ sockbase->vfptr->out (sockbase, pipe);
+ nn_sockbase_adjust_events (sockbase);
+}
+
+void nn_sockbase_adjust_events (struct nn_sockbase *self)
+{
+ int events;
+
+ /* Check whether socket is readable and/or writeable at the moment. */
+ events = self->vfptr->events (self);
+ errnum_assert (events >= 0, -events);
+
+ /* Socket becomes readable. */
+ if (!(self->flags & NN_SOCK_FLAG_IN) && events & NN_SOCKBASE_EVENT_IN) {
+ self->flags |= NN_SOCK_FLAG_IN;
+ if (self->flags & NN_SOCK_FLAG_RCVFD)
+ nn_efd_signal (&self->rcvfd);
+ nn_cond_post (&self->cond);
+ }
+
+ /* Socket becomes writeable. */
+ if (!(self->flags & NN_SOCK_FLAG_OUT) && events & NN_SOCKBASE_EVENT_OUT) {
+ self->flags |= NN_SOCK_FLAG_OUT;
+ if (self->flags & NN_SOCK_FLAG_SNDFD)
+ nn_efd_signal (&self->sndfd);
+ nn_cond_post (&self->cond);
+ }
+
+ /* Socket ceases to be readable. */
+ if (self->flags & NN_SOCK_FLAG_IN && !(events & NN_SOCKBASE_EVENT_IN)) {
+ self->flags &= ~NN_SOCK_FLAG_IN;
+ if (self->flags & NN_SOCK_FLAG_RCVFD)
+ nn_efd_unsignal (&self->rcvfd);
+ }
+
+ /* Socket ceases to be writeable. */
+ if (self->flags & NN_SOCK_FLAG_OUT &&
+ !(events & NN_SOCKBASE_EVENT_OUT)) {
+ self->flags &= ~NN_SOCK_FLAG_OUT;
+ if (self->flags & NN_SOCK_FLAG_SNDFD)
+ nn_efd_unsignal (&self->sndfd);
+ }
}
diff --git a/src/protocol.h b/src/protocol.h
index 1add55e..ef8dd1a 100644
--- a/src/protocol.h
+++ b/src/protocol.h
@@ -73,6 +73,9 @@
struct nn_sockbase;
+#define NN_SOCKBASE_EVENT_IN 1
+#define NN_SOCKBASE_EVENT_OUT 2
+
/* To be implemented by individual socket types. */
struct nn_sockbase_vfptr {
@@ -86,8 +89,12 @@
informs it that it is writeable. */
int (*add) (struct nn_sockbase *self, struct nn_pipe *pipe);
void (*rm) (struct nn_sockbase *self, struct nn_pipe *pipe);
- int (*in) (struct nn_sockbase *self, struct nn_pipe *pipe);
- int (*out) (struct nn_sockbase *self, struct nn_pipe *pipe);
+ void (*in) (struct nn_sockbase *self, struct nn_pipe *pipe);
+ void (*out) (struct nn_sockbase *self, struct nn_pipe *pipe);
+
+ /* Return any combination of event flags defined above, thus specifying
+ whether the socket should be readable, writeable or none. */
+ int (*events) (struct nn_sockbase *self);
/* Send a message to the socket. Returns -EAGAIN if it cannot be done at
the moment or zero in case of success. */
@@ -144,11 +151,13 @@
/* Uninitialise the socket. */
void nn_sockbase_term (struct nn_sockbase *self);
-/* If recv is blocking at the moment, this function will unblock it. */
-void nn_sockbase_unblock_recv (struct nn_sockbase *self);
-
-/* If send is blocking at the moment, this function will unblock it. */
-void nn_sockbase_unblock_send (struct nn_sockbase *self);
+/* Call this function when the state of the socket changed asynchronously,
+ i.e. if the socket became (un)readable/(un)writeable outside of one of the
+ nn_sockbase_vfptr functions, for example, in a timer handler or in a worker
+ thread created by the protocol implementation. Calling it allows nanomsg
+ core to check the new state and unblock any threads blocked in
+ send/recv/poll functions as needed. */
+void nn_sockbase_changed (struct nn_sockbase *self);
/* Returns the completion port associated with the socket. */
struct nn_cp *nn_sockbase_getcp (struct nn_sockbase *self);
diff --git a/src/protocols/bus/bus.c b/src/protocols/bus/bus.c
index aa0c339..cba2708 100644
--- a/src/protocols/bus/bus.c
+++ b/src/protocols/bus/bus.c
@@ -51,6 +51,7 @@
nn_xbus_rm,
nn_xbus_in,
nn_xbus_out,
+ nn_xbus_events,
nn_bus_send,
nn_bus_recv,
nn_xbus_setopt,
diff --git a/src/protocols/bus/xbus.c b/src/protocols/bus/xbus.c
index 6d18ad2..16901bd 100644
--- a/src/protocols/bus/xbus.c
+++ b/src/protocols/bus/xbus.c
@@ -47,6 +47,7 @@
nn_xbus_rm,
nn_xbus_in,
nn_xbus_out,
+ nn_xbus_events,
nn_xbus_send,
nn_xbus_recv,
nn_xbus_setopt,
@@ -111,7 +112,7 @@
nn_free (data);
}
-int nn_xbus_in (struct nn_sockbase *self, struct nn_pipe *pipe)
+void nn_xbus_in (struct nn_sockbase *self, struct nn_pipe *pipe)
{
struct nn_xbus *xbus;
struct nn_xbus_data *data;
@@ -119,10 +120,10 @@
xbus = nn_cont (self, struct nn_xbus, sockbase);
data = nn_pipe_getdata (pipe);
- return nn_fq_in (&xbus->inpipes, pipe, &data->initem);
+ nn_fq_in (&xbus->inpipes, pipe, &data->initem);
}
-int nn_xbus_out (struct nn_sockbase *self, struct nn_pipe *pipe)
+void nn_xbus_out (struct nn_sockbase *self, struct nn_pipe *pipe)
{
struct nn_xbus *xbus;
struct nn_xbus_data *data;
@@ -130,7 +131,13 @@
xbus = nn_cont (self, struct nn_xbus, sockbase);
data = nn_pipe_getdata (pipe);
- return nn_dist_out (&xbus->outpipes, pipe, &data->outitem);
+ nn_dist_out (&xbus->outpipes, pipe, &data->outitem);
+}
+
+int nn_xbus_events (struct nn_sockbase *self)
+{
+ return (nn_fq_can_recv (&nn_cont (self, struct nn_xbus,
+ sockbase)->inpipes) ? NN_SOCKBASE_EVENT_IN : 0) | NN_SOCKBASE_EVENT_OUT;
}
int nn_xbus_send (struct nn_sockbase *self, struct nn_msg *msg)
diff --git a/src/protocols/bus/xbus.h b/src/protocols/bus/xbus.h
index f5a2e7e..fed737c 100644
--- a/src/protocols/bus/xbus.h
+++ b/src/protocols/bus/xbus.h
@@ -49,8 +49,9 @@
int nn_xbus_add (struct nn_sockbase *self, struct nn_pipe *pipe);
void nn_xbus_rm (struct nn_sockbase *self, struct nn_pipe *pipe);
-int nn_xbus_in (struct nn_sockbase *self, struct nn_pipe *pipe);
-int nn_xbus_out (struct nn_sockbase *self, struct nn_pipe *pipe);
+void nn_xbus_in (struct nn_sockbase *self, struct nn_pipe *pipe);
+void nn_xbus_out (struct nn_sockbase *self, struct nn_pipe *pipe);
+int nn_xbus_events (struct nn_sockbase *self);
int nn_xbus_send (struct nn_sockbase *self, struct nn_msg *msg);
int nn_xbus_recv (struct nn_sockbase *self, struct nn_msg *msg);
int nn_xbus_setopt (struct nn_sockbase *self, int level, int option,
diff --git a/src/protocols/fanin/xsink.c b/src/protocols/fanin/xsink.c
index 29b1eb6..af6cc32 100644
--- a/src/protocols/fanin/xsink.c
+++ b/src/protocols/fanin/xsink.c
@@ -49,8 +49,9 @@
static void nn_xsink_destroy (struct nn_sockbase *self);
static int nn_xsink_add (struct nn_sockbase *self, struct nn_pipe *pipe);
static void nn_xsink_rm (struct nn_sockbase *self, struct nn_pipe *pipe);
-static int nn_xsink_in (struct nn_sockbase *self, struct nn_pipe *pipe);
-static int nn_xsink_out (struct nn_sockbase *self, struct nn_pipe *pipe);
+static void nn_xsink_in (struct nn_sockbase *self, struct nn_pipe *pipe);
+static void nn_xsink_out (struct nn_sockbase *self, struct nn_pipe *pipe);
+static int nn_xsink_events (struct nn_sockbase *self);
static int nn_xsink_send (struct nn_sockbase *self, struct nn_msg *msg);
static int nn_xsink_recv (struct nn_sockbase *self, struct nn_msg *msg);
static int nn_xsink_setopt (struct nn_sockbase *self, int level, int option,
@@ -66,6 +67,7 @@
nn_xsink_rm,
nn_xsink_in,
nn_xsink_out,
+ nn_xsink_events,
nn_xsink_send,
nn_xsink_recv,
nn_xsink_setopt,
@@ -122,21 +124,26 @@
nn_free (data);
}
-static int nn_xsink_in (struct nn_sockbase *self, struct nn_pipe *pipe)
+static void nn_xsink_in (struct nn_sockbase *self, struct nn_pipe *pipe)
{
struct nn_xsink *xsink;
struct nn_xsink_data *data;
xsink = nn_cont (self, struct nn_xsink, sockbase);
data = nn_pipe_getdata (pipe);
- return nn_fq_in (&xsink->fq, pipe, &data->fq);
+ nn_fq_in (&xsink->fq, pipe, &data->fq);
}
-static int nn_xsink_out (struct nn_sockbase *self, struct nn_pipe *pipe)
+static void nn_xsink_out (struct nn_sockbase *self, struct nn_pipe *pipe)
{
/* We are not going to send any messages, so there's no need to store
the list of outbound pipes. */
- return 0;
+}
+
+static int nn_xsink_events (struct nn_sockbase *self)
+{
+ return nn_fq_can_recv (&nn_cont (self, struct nn_xsink, sockbase)->fq) ?
+ NN_SOCKBASE_EVENT_IN : 0;
}
static int nn_xsink_send (struct nn_sockbase *self, struct nn_msg *msg)
diff --git a/src/protocols/fanin/xsource.c b/src/protocols/fanin/xsource.c
index 4d72cfb..31e5e80 100644
--- a/src/protocols/fanin/xsource.c
+++ b/src/protocols/fanin/xsource.c
@@ -45,8 +45,9 @@
static void nn_xsource_destroy (struct nn_sockbase *self);
static int nn_xsource_add (struct nn_sockbase *self, struct nn_pipe *pipe);
static void nn_xsource_rm (struct nn_sockbase *self, struct nn_pipe *pipe);
-static int nn_xsource_in (struct nn_sockbase *self, struct nn_pipe *pipe);
-static int nn_xsource_out (struct nn_sockbase *self, struct nn_pipe *pipe);
+static void nn_xsource_in (struct nn_sockbase *self, struct nn_pipe *pipe);
+static void nn_xsource_out (struct nn_sockbase *self, struct nn_pipe *pipe);
+static int nn_xsource_events (struct nn_sockbase *self);
static int nn_xsource_send (struct nn_sockbase *self, struct nn_msg *msg);
static int nn_xsource_recv (struct nn_sockbase *self, struct nn_msg *msg);
static int nn_xsource_setopt (struct nn_sockbase *self, int level, int option,
@@ -62,6 +63,7 @@
nn_xsource_rm,
nn_xsource_in,
nn_xsource_out,
+ nn_xsource_events,
nn_xsource_send,
nn_xsource_recv,
nn_xsource_setopt,
@@ -104,16 +106,29 @@
nn_excl_rm (&nn_cont (self, struct nn_xsource, sockbase)->excl, pipe);
}
-static int nn_xsource_in (struct nn_sockbase *self, struct nn_pipe *pipe)
+static void nn_xsource_in (struct nn_sockbase *self, struct nn_pipe *pipe)
{
- return nn_excl_in (&nn_cont (self, struct nn_xsource, sockbase)->excl,
- pipe);
+ nn_excl_in (&nn_cont (self, struct nn_xsource, sockbase)->excl, pipe);
}
-static int nn_xsource_out (struct nn_sockbase *self, struct nn_pipe *pipe)
+static void nn_xsource_out (struct nn_sockbase *self, struct nn_pipe *pipe)
{
- return nn_excl_out (&nn_cont (self, struct nn_xsource, sockbase)->excl,
- pipe);
+ nn_excl_out (&nn_cont (self, struct nn_xsource, sockbase)->excl, pipe);
+}
+
+static int nn_xsource_events (struct nn_sockbase *self)
+{
+ struct nn_xsource *xsource;
+ int events;
+
+ xsource = nn_cont (self, struct nn_xsource, sockbase);
+
+ events = 0;
+ if (nn_excl_can_recv (&xsource->excl))
+ events |= NN_SOCKBASE_EVENT_IN;
+ if (nn_excl_can_send (&xsource->excl))
+ events |= NN_SOCKBASE_EVENT_OUT;
+ return events;
}
static int nn_xsource_send (struct nn_sockbase *self, struct nn_msg *msg)
diff --git a/src/protocols/fanout/xpull.c b/src/protocols/fanout/xpull.c
index 05f4e9b..32de5b3 100644
--- a/src/protocols/fanout/xpull.c
+++ b/src/protocols/fanout/xpull.c
@@ -45,8 +45,9 @@
static void nn_xpull_destroy (struct nn_sockbase *self);
static int nn_xpull_add (struct nn_sockbase *self, struct nn_pipe *pipe);
static void nn_xpull_rm (struct nn_sockbase *self, struct nn_pipe *pipe);
-static int nn_xpull_in (struct nn_sockbase *self, struct nn_pipe *pipe);
-static int nn_xpull_out (struct nn_sockbase *self, struct nn_pipe *pipe);
+static void nn_xpull_in (struct nn_sockbase *self, struct nn_pipe *pipe);
+static void nn_xpull_out (struct nn_sockbase *self, struct nn_pipe *pipe);
+static int nn_xpull_events (struct nn_sockbase *self);
static int nn_xpull_send (struct nn_sockbase *self, struct nn_msg *msg);
static int nn_xpull_recv (struct nn_sockbase *self, struct nn_msg *msg);
static int nn_xpull_setopt (struct nn_sockbase *self, int level, int option,
@@ -62,6 +63,7 @@
nn_xpull_rm,
nn_xpull_in,
nn_xpull_out,
+ nn_xpull_events,
nn_xpull_send,
nn_xpull_recv,
nn_xpull_setopt,
@@ -104,14 +106,29 @@
nn_excl_rm (&nn_cont (self, struct nn_xpull, sockbase)->excl, pipe);
}
-static int nn_xpull_in (struct nn_sockbase *self, struct nn_pipe *pipe)
+static void nn_xpull_in (struct nn_sockbase *self, struct nn_pipe *pipe)
{
- return nn_excl_in (&nn_cont (self, struct nn_xpull, sockbase)->excl, pipe);
+ nn_excl_in (&nn_cont (self, struct nn_xpull, sockbase)->excl, pipe);
}
-static int nn_xpull_out (struct nn_sockbase *self, struct nn_pipe *pipe)
+static void nn_xpull_out (struct nn_sockbase *self, struct nn_pipe *pipe)
{
- return nn_excl_out (&nn_cont (self, struct nn_xpull, sockbase)->excl, pipe);
+ nn_excl_out (&nn_cont (self, struct nn_xpull, sockbase)->excl, pipe);
+}
+
+static int nn_xpull_events (struct nn_sockbase *self)
+{
+ struct nn_xpull *xpull;
+ int events;
+
+ xpull = nn_cont (self, struct nn_xpull, sockbase);
+
+ events = 0;
+ if (nn_excl_can_recv (&xpull->excl))
+ events |= NN_SOCKBASE_EVENT_IN;
+ if (nn_excl_can_send (&xpull->excl))
+ events |= NN_SOCKBASE_EVENT_OUT;
+ return events;
}
static int nn_xpull_send (struct nn_sockbase *self, struct nn_msg *msg)
diff --git a/src/protocols/fanout/xpush.c b/src/protocols/fanout/xpush.c
index 44326a4..1033d4e 100644
--- a/src/protocols/fanout/xpush.c
+++ b/src/protocols/fanout/xpush.c
@@ -49,8 +49,9 @@
static void nn_xpush_destroy (struct nn_sockbase *self);
static int nn_xpush_add (struct nn_sockbase *self, struct nn_pipe *pipe);
static void nn_xpush_rm (struct nn_sockbase *self, struct nn_pipe *pipe);
-static int nn_xpush_in (struct nn_sockbase *self, struct nn_pipe *pipe);
-static int nn_xpush_out (struct nn_sockbase *self, struct nn_pipe *pipe);
+static void nn_xpush_in (struct nn_sockbase *self, struct nn_pipe *pipe);
+static void nn_xpush_out (struct nn_sockbase *self, struct nn_pipe *pipe);
+static int nn_xpush_events (struct nn_sockbase *self);
static int nn_xpush_send (struct nn_sockbase *self, struct nn_msg *msg);
static int nn_xpush_recv (struct nn_sockbase *self, struct nn_msg *msg);
static int nn_xpush_setopt (struct nn_sockbase *self, int level, int option,
@@ -66,6 +67,7 @@
nn_xpush_rm,
nn_xpush_in,
nn_xpush_out,
+ nn_xpush_events,
nn_xpush_send,
nn_xpush_recv,
nn_xpush_setopt,
@@ -122,21 +124,26 @@
nn_free (data);
}
-static int nn_xpush_in (struct nn_sockbase *self, struct nn_pipe *pipe)
+static void nn_xpush_in (struct nn_sockbase *self, struct nn_pipe *pipe)
{
/* We are not going to receive any messages, so there's no need to store
the list of inbound pipes. */
- return 0;
}
-static int nn_xpush_out (struct nn_sockbase *self, struct nn_pipe *pipe)
+static void nn_xpush_out (struct nn_sockbase *self, struct nn_pipe *pipe)
{
struct nn_xpush *xpush;
struct nn_xpush_data *data;
xpush = nn_cont (self, struct nn_xpush, sockbase);
data = nn_pipe_getdata (pipe);
- return nn_lb_out (&xpush->lb, pipe, &data->lb);
+ nn_lb_out (&xpush->lb, pipe, &data->lb);
+}
+
+static int nn_xpush_events (struct nn_sockbase *self)
+{
+ return nn_lb_can_send (&nn_cont (self, struct nn_xpush, sockbase)->lb) ?
+ NN_SOCKBASE_EVENT_OUT : 0;
}
static int nn_xpush_send (struct nn_sockbase *self, struct nn_msg *msg)
diff --git a/src/protocols/pair/xpair.c b/src/protocols/pair/xpair.c
index b5ffe57..1c128ad 100644
--- a/src/protocols/pair/xpair.c
+++ b/src/protocols/pair/xpair.c
@@ -45,8 +45,9 @@
static void nn_xpair_destroy (struct nn_sockbase *self);
static int nn_xpair_add (struct nn_sockbase *self, struct nn_pipe *pipe);
static void nn_xpair_rm (struct nn_sockbase *self, struct nn_pipe *pipe);
-static int nn_xpair_in (struct nn_sockbase *self, struct nn_pipe *pipe);
-static int nn_xpair_out (struct nn_sockbase *self, struct nn_pipe *pipe);
+static void nn_xpair_in (struct nn_sockbase *self, struct nn_pipe *pipe);
+static void nn_xpair_out (struct nn_sockbase *self, struct nn_pipe *pipe);
+static int nn_xpair_events (struct nn_sockbase *self);
static int nn_xpair_send (struct nn_sockbase *self, struct nn_msg *msg);
static int nn_xpair_recv (struct nn_sockbase *self, struct nn_msg *msg);
static int nn_xpair_setopt (struct nn_sockbase *self, int level, int option,
@@ -62,6 +63,7 @@
nn_xpair_rm,
nn_xpair_in,
nn_xpair_out,
+ nn_xpair_events,
nn_xpair_send,
nn_xpair_recv,
nn_xpair_setopt,
@@ -103,14 +105,29 @@
nn_excl_rm (&nn_cont (self, struct nn_xpair, sockbase)->excl, pipe);
}
-static int nn_xpair_in (struct nn_sockbase *self, struct nn_pipe *pipe)
+static void nn_xpair_in (struct nn_sockbase *self, struct nn_pipe *pipe)
{
- return nn_excl_in (&nn_cont (self, struct nn_xpair, sockbase)->excl, pipe);
+ nn_excl_in (&nn_cont (self, struct nn_xpair, sockbase)->excl, pipe);
}
-static int nn_xpair_out (struct nn_sockbase *self, struct nn_pipe *pipe)
+static void nn_xpair_out (struct nn_sockbase *self, struct nn_pipe *pipe)
{
- return nn_excl_out (&nn_cont (self, struct nn_xpair, sockbase)->excl, pipe);
+ nn_excl_out (&nn_cont (self, struct nn_xpair, sockbase)->excl, pipe);
+}
+
+static int nn_xpair_events (struct nn_sockbase *self)
+{
+ struct nn_xpair *xpair;
+ int events;
+
+ xpair = nn_cont (self, struct nn_xpair, sockbase);
+
+ events = 0;
+ if (nn_excl_can_recv (&xpair->excl))
+ events |= NN_SOCKBASE_EVENT_IN;
+ if (nn_excl_can_send (&xpair->excl))
+ events |= NN_SOCKBASE_EVENT_OUT;
+ return events;
}
static int nn_xpair_send (struct nn_sockbase *self, struct nn_msg *msg)
diff --git a/src/protocols/pubsub/pub.c b/src/protocols/pubsub/pub.c
index d624ea1..a709efc 100644
--- a/src/protocols/pubsub/pub.c
+++ b/src/protocols/pubsub/pub.c
@@ -55,8 +55,9 @@
static void nn_pub_destroy (struct nn_sockbase *self);
static int nn_pub_add (struct nn_sockbase *self, struct nn_pipe *pipe);
static void nn_pub_rm (struct nn_sockbase *self, struct nn_pipe *pipe);
-static int nn_pub_in (struct nn_sockbase *self, struct nn_pipe *pipe);
-static int nn_pub_out (struct nn_sockbase *self, struct nn_pipe *pipe);
+static void nn_pub_in (struct nn_sockbase *self, struct nn_pipe *pipe);
+static void nn_pub_out (struct nn_sockbase *self, struct nn_pipe *pipe);
+static int nn_pub_events (struct nn_sockbase *self);
static int nn_pub_send (struct nn_sockbase *self, struct nn_msg *msg);
static int nn_pub_recv (struct nn_sockbase *self, struct nn_msg *msg);
static int nn_pub_setopt (struct nn_sockbase *self, int level, int option,
@@ -72,6 +73,7 @@
nn_pub_rm,
nn_pub_in,
nn_pub_out,
+ nn_pub_events,
nn_pub_send,
nn_pub_recv,
nn_pub_setopt,
@@ -131,13 +133,13 @@
nn_free (data);
}
-static int nn_pub_in (struct nn_sockbase *self, struct nn_pipe *pipe)
+static void nn_pub_in (struct nn_sockbase *self, struct nn_pipe *pipe)
{
/* We shouldn't get any messages from subscribers. */
nn_assert (0);
}
-static int nn_pub_out (struct nn_sockbase *self, struct nn_pipe *pipe)
+static void nn_pub_out (struct nn_sockbase *self, struct nn_pipe *pipe)
{
struct nn_pub *pub;
struct nn_pub_data *data;
@@ -145,7 +147,12 @@
pub = nn_cont (self, struct nn_pub, sockbase);
data = nn_pipe_getdata (pipe);
- return nn_dist_out (&pub->outpipes, pipe, &data->item);
+ nn_dist_out (&pub->outpipes, pipe, &data->item);
+}
+
+static int nn_pub_events (struct nn_sockbase *self)
+{
+ return NN_SOCKBASE_EVENT_OUT;
}
static int nn_pub_send (struct nn_sockbase *self, struct nn_msg *msg)
diff --git a/src/protocols/pubsub/sub.c b/src/protocols/pubsub/sub.c
index 7f2edd8..0ad7d93 100644
--- a/src/protocols/pubsub/sub.c
+++ b/src/protocols/pubsub/sub.c
@@ -47,8 +47,9 @@
static void nn_sub_destroy (struct nn_sockbase *self);
static int nn_sub_add (struct nn_sockbase *self, struct nn_pipe *pipe);
static void nn_sub_rm (struct nn_sockbase *self, struct nn_pipe *pipe);
-static int nn_sub_in (struct nn_sockbase *self, struct nn_pipe *pipe);
-static int nn_sub_out (struct nn_sockbase *self, struct nn_pipe *pipe);
+static void nn_sub_in (struct nn_sockbase *self, struct nn_pipe *pipe);
+static void nn_sub_out (struct nn_sockbase *self, struct nn_pipe *pipe);
+static int nn_sub_events (struct nn_sockbase *self);
static int nn_sub_send (struct nn_sockbase *self, struct nn_msg *msg);
static int nn_sub_recv (struct nn_sockbase *self, struct nn_msg *msg);
static int nn_sub_setopt (struct nn_sockbase *self, int level, int option,
@@ -64,6 +65,7 @@
nn_sub_rm,
nn_sub_in,
nn_sub_out,
+ nn_sub_events,
nn_sub_send,
nn_sub_recv,
nn_sub_setopt,
@@ -107,14 +109,29 @@
nn_excl_rm (&nn_cont (self, struct nn_sub, sockbase)->excl, pipe);
}
-static int nn_sub_in (struct nn_sockbase *self, struct nn_pipe *pipe)
+static void nn_sub_in (struct nn_sockbase *self, struct nn_pipe *pipe)
{
- return nn_excl_in (&nn_cont (self, struct nn_sub, sockbase)->excl, pipe);
+ nn_excl_in (&nn_cont (self, struct nn_sub, sockbase)->excl, pipe);
}
-static int nn_sub_out (struct nn_sockbase *self, struct nn_pipe *pipe)
+static void nn_sub_out (struct nn_sockbase *self, struct nn_pipe *pipe)
{
- return nn_excl_out (&nn_cont (self, struct nn_sub, sockbase)->excl, pipe);
+ nn_excl_out (&nn_cont (self, struct nn_sub, sockbase)->excl, pipe);
+}
+
+static int nn_sub_events (struct nn_sockbase *self)
+{
+ struct nn_sub *sub;
+ int events;
+
+ sub = nn_cont (self, struct nn_sub, sockbase);
+
+ events = 0;
+ if (nn_excl_can_recv (&sub->excl))
+ events |= NN_SOCKBASE_EVENT_IN;
+ if (nn_excl_can_send (&sub->excl))
+ events |= NN_SOCKBASE_EVENT_OUT;
+ return events;
}
static int nn_sub_send (struct nn_sockbase *self, struct nn_msg *msg)
diff --git a/src/protocols/reqrep/rep.c b/src/protocols/reqrep/rep.c
index d9f9702..71c3046 100644
--- a/src/protocols/reqrep/rep.c
+++ b/src/protocols/reqrep/rep.c
@@ -51,6 +51,7 @@
/* Implementation of nn_sockbase's virtual functions. */
static void nn_rep_destroy (struct nn_sockbase *self);
+static int nn_rep_events (struct nn_sockbase *self);
static int nn_rep_send (struct nn_sockbase *self, struct nn_msg *msg);
static int nn_rep_recv (struct nn_sockbase *self, struct nn_msg *msg);
static int nn_rep_sethdr (struct nn_msg *msg, const void *hdr,
@@ -63,6 +64,7 @@
nn_xrep_rm,
nn_xrep_in,
nn_xrep_out,
+ nn_rep_events,
nn_rep_send,
nn_rep_recv,
nn_xrep_setopt,
@@ -95,6 +97,17 @@
nn_free (rep);
}
+static int nn_rep_events (struct nn_sockbase *self)
+{
+ struct nn_rep *rep;
+ int events;
+
+ rep = nn_cont (self, struct nn_rep, xrep.sockbase);
+ if (!(rep->flags & NN_REP_INPROGRESS))
+ events &= ~NN_SOCKBASE_EVENT_OUT;
+ events = nn_xrep_events (&rep->xrep.sockbase);
+}
+
static int nn_rep_send (struct nn_sockbase *self, struct nn_msg *msg)
{
int rc;
diff --git a/src/protocols/reqrep/req.c b/src/protocols/reqrep/req.c
index b9f2dfc..8c1c3d1 100644
--- a/src/protocols/reqrep/req.c
+++ b/src/protocols/reqrep/req.c
@@ -57,6 +57,7 @@
/* Implementation of nn_sockbase's virtual functions. */
static void nn_req_destroy (struct nn_sockbase *self);
+static int nn_req_events (struct nn_sockbase *self);
static int nn_req_send (struct nn_sockbase *self, struct nn_msg *msg);
static int nn_req_recv (struct nn_sockbase *self, struct nn_msg *msg);
static int nn_req_setopt (struct nn_sockbase *self, int level, int option,
@@ -72,6 +73,7 @@
nn_xreq_rm,
nn_xreq_in,
nn_xreq_out,
+ nn_req_events,
nn_req_send,
nn_req_recv,
nn_req_setopt,
@@ -128,6 +130,19 @@
nn_free (req);
}
+static int nn_req_events (struct nn_sockbase *self)
+{
+ struct nn_req *req;
+ int events;
+
+ req = nn_cont (self, struct nn_req, xreq.sockbase);
+
+ events = nn_xreq_events (&req->xreq.sockbase);
+ if (!(req->flags & NN_REQ_INPROGRESS))
+ events &= ~NN_SOCKBASE_EVENT_IN;
+ return events;
+}
+
static int nn_req_send (struct nn_sockbase *self, struct nn_msg *msg)
{
int rc;
diff --git a/src/protocols/reqrep/xrep.c b/src/protocols/reqrep/xrep.c
index 8297bbe..d3c3bf7 100644
--- a/src/protocols/reqrep/xrep.c
+++ b/src/protocols/reqrep/xrep.c
@@ -43,6 +43,7 @@
nn_xrep_rm,
nn_xrep_in,
nn_xrep_out,
+ nn_xrep_events,
nn_xrep_send,
nn_xrep_recv,
nn_xrep_setopt,
@@ -116,7 +117,7 @@
nn_free (data);
}
-int nn_xrep_in (struct nn_sockbase *self, struct nn_pipe *pipe)
+void nn_xrep_in (struct nn_sockbase *self, struct nn_pipe *pipe)
{
struct nn_xrep *xrep;
struct nn_xrep_data *data;
@@ -124,18 +125,21 @@
xrep = nn_cont (self, struct nn_xrep, sockbase);
data = nn_pipe_getdata (pipe);
- return nn_fq_in (&xrep->inpipes, pipe, &data->initem);
+ nn_fq_in (&xrep->inpipes, pipe, &data->initem);
}
-int nn_xrep_out (struct nn_sockbase *self, struct nn_pipe *pipe)
+void nn_xrep_out (struct nn_sockbase *self, struct nn_pipe *pipe)
{
struct nn_xrep_data *data;
data = nn_pipe_getdata (pipe);
data->flags |= NN_XREP_OUT;
+}
- /* XREP socket never blocks on send, so there's no point in unblocking. */
- return 0;
+int nn_xrep_events (struct nn_sockbase *self)
+{
+ return (nn_fq_can_recv (&nn_cont (self, struct nn_xrep,
+ sockbase)->inpipes) ? NN_SOCKBASE_EVENT_IN : 0) | NN_SOCKBASE_EVENT_OUT;
}
int nn_xrep_send (struct nn_sockbase *self, struct nn_msg *msg)
diff --git a/src/protocols/reqrep/xrep.h b/src/protocols/reqrep/xrep.h
index d9abcde..2a0d0fd 100644
--- a/src/protocols/reqrep/xrep.h
+++ b/src/protocols/reqrep/xrep.h
@@ -60,8 +60,9 @@
int nn_xrep_add (struct nn_sockbase *self, struct nn_pipe *pipe);
void nn_xrep_rm (struct nn_sockbase *self, struct nn_pipe *pipe);
-int nn_xrep_in (struct nn_sockbase *self, struct nn_pipe *pipe);
-int nn_xrep_out (struct nn_sockbase *self, struct nn_pipe *pipe);
+void nn_xrep_in (struct nn_sockbase *self, struct nn_pipe *pipe);
+void nn_xrep_out (struct nn_sockbase *self, struct nn_pipe *pipe);
+int nn_xrep_events (struct nn_sockbase *self);
int nn_xrep_send (struct nn_sockbase *self, struct nn_msg *msg);
int nn_xrep_recv (struct nn_sockbase *self, struct nn_msg *msg);
int nn_xrep_setopt (struct nn_sockbase *self, int level, int option,
diff --git a/src/protocols/reqrep/xreq.c b/src/protocols/reqrep/xreq.c
index b801b2d..ea4011e 100644
--- a/src/protocols/reqrep/xreq.c
+++ b/src/protocols/reqrep/xreq.c
@@ -44,6 +44,7 @@
nn_xreq_rm,
nn_xreq_in,
nn_xreq_out,
+ nn_xreq_events,
nn_xreq_send,
nn_xreq_recv,
nn_xreq_setopt,
@@ -103,24 +104,34 @@
nn_free (data);
}
-int nn_xreq_in (struct nn_sockbase *self, struct nn_pipe *pipe)
+void nn_xreq_in (struct nn_sockbase *self, struct nn_pipe *pipe)
{
struct nn_xreq *xreq;
struct nn_xreq_data *data;
xreq = nn_cont (self, struct nn_xreq, sockbase);
data = nn_pipe_getdata (pipe);
- return nn_fq_in (&xreq->fq, pipe, &data->fq);
+ nn_fq_in (&xreq->fq, pipe, &data->fq);
}
-int nn_xreq_out (struct nn_sockbase *self, struct nn_pipe *pipe)
+void nn_xreq_out (struct nn_sockbase *self, struct nn_pipe *pipe)
{
struct nn_xreq *xreq;
struct nn_xreq_data *data;
xreq = nn_cont (self, struct nn_xreq, sockbase);
data = nn_pipe_getdata (pipe);
- return nn_lb_out (&xreq->lb, pipe, &data->lb);
+ nn_lb_out (&xreq->lb, pipe, &data->lb);
+}
+
+int nn_xreq_events (struct nn_sockbase *self)
+{
+ struct nn_xreq *xreq;
+
+ xreq = nn_cont (self, struct nn_xreq, sockbase);
+
+ return (nn_fq_can_recv (&xreq->fq) ? NN_SOCKBASE_EVENT_IN : 0) |
+ (nn_lb_can_send (&xreq->lb) ? NN_SOCKBASE_EVENT_OUT : 0);
}
int nn_xreq_send (struct nn_sockbase *self, struct nn_msg *msg)
diff --git a/src/protocols/reqrep/xreq.h b/src/protocols/reqrep/xreq.h
index 4cdce58..3328fa5 100644
--- a/src/protocols/reqrep/xreq.h
+++ b/src/protocols/reqrep/xreq.h
@@ -40,8 +40,9 @@
int nn_xreq_add (struct nn_sockbase *self, struct nn_pipe *pipe);
void nn_xreq_rm (struct nn_sockbase *self, struct nn_pipe *pipe);
-int nn_xreq_in (struct nn_sockbase *self, struct nn_pipe *pipe);
-int nn_xreq_out (struct nn_sockbase *self, struct nn_pipe *pipe);
+void nn_xreq_in (struct nn_sockbase *self, struct nn_pipe *pipe);
+void nn_xreq_out (struct nn_sockbase *self, struct nn_pipe *pipe);
+int nn_xreq_events (struct nn_sockbase *self);
int nn_xreq_send (struct nn_sockbase *self, struct nn_msg *msg);
int nn_xreq_recv (struct nn_sockbase *self, struct nn_msg *msg);
int nn_xreq_setopt (struct nn_sockbase *self, int level, int option,
diff --git a/src/protocols/survey/respondent.c b/src/protocols/survey/respondent.c
index fee77a2..116e72e 100644
--- a/src/protocols/survey/respondent.c
+++ b/src/protocols/survey/respondent.c
@@ -50,6 +50,7 @@
/* Implementation of nn_sockbase's virtual functions. */
static void nn_respondent_destroy (struct nn_sockbase *self);
+static int nn_respondent_events (struct nn_sockbase *self);
static int nn_respondent_send (struct nn_sockbase *self, struct nn_msg *msg);
static int nn_respondent_recv (struct nn_sockbase *self, struct nn_msg *msg);
static int nn_respondent_sethdr (struct nn_msg *msg, const void *hdr,
@@ -61,6 +62,7 @@
nn_xrespondent_rm,
nn_xrespondent_in,
nn_xrespondent_out,
+ nn_respondent_events,
nn_respondent_send,
nn_respondent_recv,
nn_xrespondent_setopt,
@@ -91,6 +93,19 @@
nn_free (respondent);
}
+static int nn_respondent_events (struct nn_sockbase *self)
+{
+ int events;
+ struct nn_respondent *respondent;
+
+ respondent = nn_cont (self, struct nn_respondent, xrespondent.sockbase);
+
+ events = nn_xrespondent_events (&respondent->xrespondent.sockbase);
+ if (!(respondent->flags & NN_RESPONDENT_INPROGRESS))
+ events &= ~NN_SOCKBASE_EVENT_OUT;
+ return events;
+}
+
static int nn_respondent_send (struct nn_sockbase *self, struct nn_msg *msg)
{
int rc;
diff --git a/src/protocols/survey/surveyor.c b/src/protocols/survey/surveyor.c
index a08d8bf..a228edc 100644
--- a/src/protocols/survey/surveyor.c
+++ b/src/protocols/survey/surveyor.c
@@ -56,6 +56,7 @@
/* Implementation of nn_sockbase's virtual functions. */
static void nn_surveyor_destroy (struct nn_sockbase *self);
+static int nn_surveyor_events (struct nn_sockbase *self);
static int nn_surveyor_send (struct nn_sockbase *self, struct nn_msg *msg);
static int nn_surveyor_recv (struct nn_sockbase *self, struct nn_msg *msg);
static int nn_surveyor_setopt (struct nn_sockbase *self, int level, int option,
@@ -71,6 +72,7 @@
nn_xsurveyor_rm,
nn_xsurveyor_in,
nn_xsurveyor_out,
+ nn_surveyor_events,
nn_surveyor_send,
nn_surveyor_recv,
nn_surveyor_setopt,
@@ -124,6 +126,17 @@
nn_free (surveyor);
}
+static int nn_surveyor_events (struct nn_sockbase *self)
+{
+ struct nn_surveyor *surveyor;
+
+ surveyor = nn_cont (self, struct nn_surveyor, xsurveyor.sockbase);
+
+ if (!(surveyor->flags & NN_SURVEYOR_INPROGRESS))
+ return NN_SOCKBASE_EVENT_IN | NN_SOCKBASE_EVENT_OUT;
+ return nn_xsurveyor_events (&surveyor->xsurveyor.sockbase);
+}
+
static int nn_surveyor_send (struct nn_sockbase *self, struct nn_msg *msg)
{
int rc;
@@ -208,8 +221,8 @@
/* Cancel the survey. */
surveyor->flags &= ~NN_SURVEYOR_INPROGRESS;
- /* If there's a blocked recv() operation, unblock it. */
- nn_sockbase_unblock_recv (&surveyor->xsurveyor.sockbase);
+ /* If there's a blocked recv/poll operation, unblock it. */
+ nn_sockbase_changed (&surveyor->xsurveyor.sockbase);
}
static int nn_surveyor_setopt (struct nn_sockbase *self, int level, int option,
diff --git a/src/protocols/survey/xrespondent.c b/src/protocols/survey/xrespondent.c
index cef8021..a07dd8b 100644
--- a/src/protocols/survey/xrespondent.c
+++ b/src/protocols/survey/xrespondent.c
@@ -40,6 +40,7 @@
nn_xrespondent_rm,
nn_xrespondent_in,
nn_xrespondent_out,
+ nn_xrespondent_events,
nn_xrespondent_send,
nn_xrespondent_recv,
nn_xrespondent_setopt,
@@ -80,16 +81,29 @@
nn_excl_rm (&nn_cont (self, struct nn_xrespondent, sockbase)->excl, pipe);
}
-int nn_xrespondent_in (struct nn_sockbase *self, struct nn_pipe *pipe)
+void nn_xrespondent_in (struct nn_sockbase *self, struct nn_pipe *pipe)
{
- return nn_excl_in (&nn_cont (self, struct nn_xrespondent, sockbase)->excl,
- pipe);
+ nn_excl_in (&nn_cont (self, struct nn_xrespondent, sockbase)->excl, pipe);
}
-int nn_xrespondent_out (struct nn_sockbase *self, struct nn_pipe *pipe)
+void nn_xrespondent_out (struct nn_sockbase *self, struct nn_pipe *pipe)
{
- return nn_excl_out (&nn_cont (self, struct nn_xrespondent, sockbase)->excl,
- pipe);
+ nn_excl_out (&nn_cont (self, struct nn_xrespondent, sockbase)->excl, pipe);
+}
+
+int nn_xrespondent_events (struct nn_sockbase *self)
+{
+ struct nn_xrespondent *xrespondent;
+ int events;
+
+ xrespondent = nn_cont (self, struct nn_xrespondent, sockbase);
+
+ events = 0;
+ if (nn_excl_can_recv (&xrespondent->excl))
+ events |= NN_SOCKBASE_EVENT_IN;
+ if (nn_excl_can_send (&xrespondent->excl))
+ events |= NN_SOCKBASE_EVENT_OUT;
+ return events;
}
int nn_xrespondent_send (struct nn_sockbase *self, struct nn_msg *msg)
diff --git a/src/protocols/survey/xrespondent.h b/src/protocols/survey/xrespondent.h
index fe9facf..d69cc49 100644
--- a/src/protocols/survey/xrespondent.h
+++ b/src/protocols/survey/xrespondent.h
@@ -40,8 +40,9 @@
int nn_xrespondent_add (struct nn_sockbase *self, struct nn_pipe *pipe);
void nn_xrespondent_rm (struct nn_sockbase *self, struct nn_pipe *pipe);
-int nn_xrespondent_in (struct nn_sockbase *self, struct nn_pipe *pipe);
-int nn_xrespondent_out (struct nn_sockbase *self, struct nn_pipe *pipe);
+void nn_xrespondent_in (struct nn_sockbase *self, struct nn_pipe *pipe);
+void nn_xrespondent_out (struct nn_sockbase *self, struct nn_pipe *pipe);
+int nn_xrespondent_events (struct nn_sockbase *self);
int nn_xrespondent_send (struct nn_sockbase *self, struct nn_msg *msg);
int nn_xrespondent_recv (struct nn_sockbase *self, struct nn_msg *msg);
int nn_xrespondent_setopt (struct nn_sockbase *self, int level, int option,
diff --git a/src/protocols/survey/xsurveyor.c b/src/protocols/survey/xsurveyor.c
index 87eb09c..7749803 100644
--- a/src/protocols/survey/xsurveyor.c
+++ b/src/protocols/survey/xsurveyor.c
@@ -43,6 +43,7 @@
nn_xsurveyor_rm,
nn_xsurveyor_in,
nn_xsurveyor_out,
+ nn_xsurveyor_events,
nn_xsurveyor_send,
nn_xsurveyor_recv,
nn_xsurveyor_setopt,
@@ -106,7 +107,7 @@
nn_free (data);
}
-int nn_xsurveyor_in (struct nn_sockbase *self, struct nn_pipe *pipe)
+void nn_xsurveyor_in (struct nn_sockbase *self, struct nn_pipe *pipe)
{
struct nn_xsurveyor *xsurveyor;
struct nn_xsurveyor_data *data;
@@ -114,10 +115,10 @@
xsurveyor = nn_cont (self, struct nn_xsurveyor, sockbase);
data = nn_pipe_getdata (pipe);
- return nn_fq_in (&xsurveyor->inpipes, pipe, &data->initem);
+ nn_fq_in (&xsurveyor->inpipes, pipe, &data->initem);
}
-int nn_xsurveyor_out (struct nn_sockbase *self, struct nn_pipe *pipe)
+void nn_xsurveyor_out (struct nn_sockbase *self, struct nn_pipe *pipe)
{
struct nn_xsurveyor *xsurveyor;
struct nn_xsurveyor_data *data;
@@ -125,7 +126,20 @@
xsurveyor = nn_cont (self, struct nn_xsurveyor, sockbase);
data = nn_pipe_getdata (pipe);
- return nn_dist_out (&xsurveyor->outpipes, pipe, &data->outitem);
+ nn_dist_out (&xsurveyor->outpipes, pipe, &data->outitem);
+}
+
+int nn_xsurveyor_events (struct nn_sockbase *self)
+{
+ struct nn_xsurveyor *xsurveyor;
+ int events;
+
+ xsurveyor = nn_cont (self, struct nn_xsurveyor, sockbase);
+
+ events = NN_SOCKBASE_EVENT_OUT;
+ if (nn_fq_can_recv (&xsurveyor->inpipes))
+ events |= NN_SOCKBASE_EVENT_IN;
+ return events;
}
int nn_xsurveyor_send (struct nn_sockbase *self, struct nn_msg *msg)
diff --git a/src/protocols/survey/xsurveyor.h b/src/protocols/survey/xsurveyor.h
index 0cb0268..ba1864b 100644
--- a/src/protocols/survey/xsurveyor.h
+++ b/src/protocols/survey/xsurveyor.h
@@ -54,8 +54,9 @@
int nn_xsurveyor_add (struct nn_sockbase *self, struct nn_pipe *pipe);
void nn_xsurveyor_rm (struct nn_sockbase *self, struct nn_pipe *pipe);
-int nn_xsurveyor_in (struct nn_sockbase *self, struct nn_pipe *pipe);
-int nn_xsurveyor_out (struct nn_sockbase *self, struct nn_pipe *pipe);
+void nn_xsurveyor_in (struct nn_sockbase *self, struct nn_pipe *pipe);
+void nn_xsurveyor_out (struct nn_sockbase *self, struct nn_pipe *pipe);
+int nn_xsurveyor_events (struct nn_sockbase *self);
int nn_xsurveyor_send (struct nn_sockbase *self, struct nn_msg *msg);
int nn_xsurveyor_recv (struct nn_sockbase *self, struct nn_msg *msg);
int nn_xsurveyor_setopt (struct nn_sockbase *self, int level, int option,
diff --git a/src/utils/dist.c b/src/utils/dist.c
index b162994..0fb2daa 100644
--- a/src/utils/dist.c
+++ b/src/utils/dist.c
@@ -51,15 +51,10 @@
nn_list_erase (&self->pipes, &data->item);
}
-int nn_dist_out (struct nn_dist *self, struct nn_pipe *pipe,
+void nn_dist_out (struct nn_dist *self, struct nn_pipe *pipe,
struct nn_dist_data *data)
{
- int result;
-
- result = nn_list_empty (&self->pipes) ? 1 : 0;
nn_list_insert (&self->pipes, &data->item, nn_list_end (&self->pipes));
-
- return result;
}
int nn_dist_send (struct nn_dist *self, struct nn_msg *msg,
diff --git a/src/utils/dist.h b/src/utils/dist.h
index fc6904c..7fc5f90 100644
--- a/src/utils/dist.h
+++ b/src/utils/dist.h
@@ -44,7 +44,7 @@
struct nn_dist_data *data);
void nn_dist_rm (struct nn_dist *self, struct nn_pipe *pipe,
struct nn_dist_data *data);
-int nn_dist_out (struct nn_dist *self, struct nn_pipe *pipe,
+void nn_dist_out (struct nn_dist *self, struct nn_pipe *pipe,
struct nn_dist_data *data);
/* Sends the message to all the attached pipes except the one specified
diff --git a/src/utils/excl.c b/src/utils/excl.c
index 8db4acd..d3cdfe3 100644
--- a/src/utils/excl.c
+++ b/src/utils/excl.c
@@ -59,20 +59,18 @@
self->outpipe = NULL;
}
-int nn_excl_in (struct nn_excl *self, struct nn_pipe *pipe)
+void nn_excl_in (struct nn_excl *self, struct nn_pipe *pipe)
{
nn_assert (!self->inpipe);
nn_assert (pipe == self->pipe);
self->inpipe = pipe;
- return 1;
}
-int nn_excl_out (struct nn_excl *self, struct nn_pipe *pipe)
+void nn_excl_out (struct nn_excl *self, struct nn_pipe *pipe)
{
nn_assert (!self->outpipe);
nn_assert (pipe == self->pipe);
self->outpipe = pipe;
- return 1;
}
int nn_excl_send (struct nn_excl *self, struct nn_msg *msg)
@@ -107,3 +105,13 @@
return rc & ~NN_PIPE_RELEASE;
}
+int nn_excl_can_send (struct nn_excl *self)
+{
+ return self->outpipe ? 1 : 0;
+}
+
+int nn_excl_can_recv (struct nn_excl *self)
+{
+ return self->inpipe ? 1 : 0;
+}
+
diff --git a/src/utils/excl.h b/src/utils/excl.h
index dfcba77..fd26468 100644
--- a/src/utils/excl.h
+++ b/src/utils/excl.h
@@ -48,9 +48,11 @@
void nn_excl_term (struct nn_excl *self);
int nn_excl_add (struct nn_excl *self, struct nn_pipe *pipe);
void nn_excl_rm (struct nn_excl *self, struct nn_pipe *pipe);
-int nn_excl_in (struct nn_excl *self, struct nn_pipe *pipe);
-int nn_excl_out (struct nn_excl *self, struct nn_pipe *pipe);
+void nn_excl_in (struct nn_excl *self, struct nn_pipe *pipe);
+void nn_excl_out (struct nn_excl *self, struct nn_pipe *pipe);
int nn_excl_send (struct nn_excl *self, struct nn_msg *msg);
int nn_excl_recv (struct nn_excl *self, struct nn_msg *msg);
+int nn_excl_can_send (struct nn_excl *self);
+int nn_excl_can_recv (struct nn_excl *self);
#endif
diff --git a/src/utils/fq.c b/src/utils/fq.c
index 7c1779b..d37f487 100644
--- a/src/utils/fq.c
+++ b/src/utils/fq.c
@@ -48,10 +48,15 @@
nn_priolist_rm (&self->priolist, pipe, &data->priolist);
}
-int nn_fq_in (struct nn_fq *self, struct nn_pipe *pipe,
+void nn_fq_in (struct nn_fq *self, struct nn_pipe *pipe,
struct nn_fq_data *data)
{
- return nn_priolist_activate (&self->priolist, pipe, &data->priolist);
+ nn_priolist_activate (&self->priolist, pipe, &data->priolist);
+}
+
+int nn_fq_can_recv (struct nn_fq *self)
+{
+ return nn_priolist_is_active (&self->priolist);
}
int nn_fq_recv (struct nn_fq *self, struct nn_msg *msg, struct nn_pipe **pipe)
diff --git a/src/utils/fq.h b/src/utils/fq.h
index 4798a8b..7d34fef 100644
--- a/src/utils/fq.h
+++ b/src/utils/fq.h
@@ -44,8 +44,9 @@
struct nn_fq_data *data, int priority);
void nn_fq_rm (struct nn_fq *self, struct nn_pipe *pipe,
struct nn_fq_data *data);
-int nn_fq_in (struct nn_fq *self, struct nn_pipe *pipe,
+void nn_fq_in (struct nn_fq *self, struct nn_pipe *pipe,
struct nn_fq_data *data);
+int nn_fq_can_recv (struct nn_fq *self);
int nn_fq_recv (struct nn_fq *self, struct nn_msg *msg, struct nn_pipe **pipe);
#endif
diff --git a/src/utils/lb.c b/src/utils/lb.c
index 5cf2250..78add2c 100644
--- a/src/utils/lb.c
+++ b/src/utils/lb.c
@@ -48,10 +48,15 @@
nn_priolist_rm (&self->priolist, pipe, &data->priolist);
}
-int nn_lb_out (struct nn_lb *self, struct nn_pipe *pipe,
+void nn_lb_out (struct nn_lb *self, struct nn_pipe *pipe,
struct nn_lb_data *data)
{
- return nn_priolist_activate (&self->priolist, pipe, &data->priolist);
+ nn_priolist_activate (&self->priolist, pipe, &data->priolist);
+}
+
+int nn_lb_can_send (struct nn_lb *self)
+{
+ return nn_priolist_is_active (&self->priolist);
}
int nn_lb_send (struct nn_lb *self, struct nn_msg *msg)
diff --git a/src/utils/lb.h b/src/utils/lb.h
index 6f4dbd0..1b6ebb0 100644
--- a/src/utils/lb.h
+++ b/src/utils/lb.h
@@ -43,8 +43,9 @@
struct nn_lb_data *data, int priority);
void nn_lb_rm (struct nn_lb *self, struct nn_pipe *pipe,
struct nn_lb_data *data);
-int nn_lb_out (struct nn_lb *self, struct nn_pipe *pipe,
+void nn_lb_out (struct nn_lb *self, struct nn_pipe *pipe,
struct nn_lb_data *data);
+int nn_lb_can_send (struct nn_lb *self);
int nn_lb_send (struct nn_lb *self, struct nn_msg *msg);
#endif
diff --git a/src/utils/priolist.c b/src/utils/priolist.c
index 457fe79..87ed754 100644
--- a/src/utils/priolist.c
+++ b/src/utils/priolist.c
@@ -61,7 +61,7 @@
nn_list_erase (&self->slots [data->priority - 1].pipes, &data->item);
}
-int nn_priolist_activate (struct nn_priolist *self, struct nn_pipe *pipe,
+void nn_priolist_activate (struct nn_priolist *self, struct nn_pipe *pipe,
struct nn_priolist_data *data)
{
struct nn_priolist_slot *slot;
@@ -72,7 +72,7 @@
going to change. */
if (!nn_list_empty (&slot->pipes)) {
nn_list_insert (&slot->pipes, &data->item, nn_list_end (&slot->pipes));
- return 0;
+ return;
}
/* Add first pipe into the slot. If there are no pipes in priolist at all
@@ -81,18 +81,22 @@
slot->current = data;
if (self->current == -1) {
self->current = data->priority;
- return 1;
+ return;
}
/* If the current priority is lower than the one of the newly activated
pipe, this slot becomes current. */
if (self->current > data->priority) {
self->current = data->priority;
- return 0;
+ return;
}
/* Current doesn't change otherwise. */
- return 0;
+}
+
+int nn_priolist_is_active (struct nn_priolist *self)
+{
+ return self->current == -1 ? 0 : 1;
}
struct nn_pipe *nn_priolist_getpipe (struct nn_priolist *self)
diff --git a/src/utils/priolist.h b/src/utils/priolist.h
index cbdd01e..ae04815 100644
--- a/src/utils/priolist.h
+++ b/src/utils/priolist.h
@@ -53,8 +53,9 @@
struct nn_priolist_data *data, int priority);
void nn_priolist_rm (struct nn_priolist *self, struct nn_pipe *pipe,
struct nn_priolist_data *data);
-int nn_priolist_activate (struct nn_priolist *self, struct nn_pipe *pipe,
+void nn_priolist_activate (struct nn_priolist *self, struct nn_pipe *pipe,
struct nn_priolist_data *data);
+int nn_priolist_is_active (struct nn_priolist *self);
struct nn_pipe *nn_priolist_getpipe (struct nn_priolist *self);
void nn_priolist_advance (struct nn_priolist *self, int release);