fixes #554 weird hang/crash & race against nn_term
diff --git a/src/core/global.c b/src/core/global.c
index 7f5bfae..b50d7bd 100644
--- a/src/core/global.c
+++ b/src/core/global.c
@@ -154,10 +154,14 @@
static int nn_global_create_socket (int domain, int protocol);
/* Socket holds. */
-static int nn_global_hold_socket(struct nn_sock **sockp, int s);
-static int nn_global_hold_socket_locked(struct nn_sock **sockp, int s);
+static int nn_global_hold_socket (struct nn_sock **sockp, int s);
+static int nn_global_hold_socket_locked (struct nn_sock **sockp, int s,
+ int is_term);
static void nn_global_rele_socket(struct nn_sock *);
+/* Close helper. */
+static int nn_close_impl (int s, int is_term);
+
int nn_errno (void)
{
return nn_err_errno ();
@@ -303,6 +307,7 @@
void nn_term (void)
{
int i;
+ struct nn_sock *sock;
nn_glock_lock ();
@@ -310,13 +315,20 @@
self.flags |= NN_CTX_FLAG_ZOMBIE;
/* Mark all open sockets as terminating. */
- if (self.socks && self.nsocks) {
+ if (self.nsocks != 0) {
for (i = 0; i != NN_MAX_SOCKETS; ++i)
- if (self.socks [i])
+ if (self.socks [i] != NULL) {
nn_sock_zombify (self.socks [i]);
+ }
}
nn_glock_unlock ();
+
+ /* Make sure we really close resources, this will cause global
+ resources to be freed too when the last socket is closed. */
+ for (i = 0; i < NN_MAX_SOCKETS; i++) {
+ (void) nn_close_impl (i, 1);
+ }
}
void *nn_allocmsg (size_t size, int type)
@@ -469,13 +481,13 @@
return rc;
}
-int nn_close (int s)
+int nn_close_impl (int s, int is_term)
{
int rc;
struct nn_sock *sock;
nn_glock_lock ();
- rc = nn_global_hold_socket_locked (&sock, s);
+ rc = nn_global_hold_socket_locked (&sock, s, is_term);
if (nn_slow (rc < 0)) {
nn_glock_unlock ();
errno = -rc;
@@ -520,6 +532,11 @@
return 0;
}
+int nn_close (int s)
+{
+ return (nn_close_impl (s, 0));
+}
+
int nn_setsockopt (int s, int level, int option, const void *optval,
size_t optvallen)
{
@@ -813,6 +830,10 @@
return (int) sz;
fail:
+ /* Socket may have closed asynchronously; check for nn_term. */
+ if (self.flags & NN_CTX_FLAG_ZOMBIE) {
+ rc = -ETERM;
+ }
nn_global_rele_socket (sock);
errno = -rc;
@@ -946,6 +967,10 @@
return (int) sz;
fail:
+ /* Socket may have closed asynchronously; check for nn_term. */
+ if (self.flags & NN_CTX_FLAG_ZOMBIE) {
+ rc = -ETERM;
+ }
nn_global_rele_socket (sock);
errno = -rc;
@@ -1114,7 +1139,7 @@
/* Get the socket structure for a socket id. This must be called under
the global lock (nn_glock_lock.) The socket itself will not be freed
while the hold is active. */
-int nn_global_hold_socket_locked(struct nn_sock **sockp, int s)
+int nn_global_hold_socket_locked(struct nn_sock **sockp, int s, int is_term)
{
struct nn_sock *sock;
@@ -1122,7 +1147,7 @@
*sockp = NULL;
return -ETERM;
}
- if (nn_slow ((self.flags & NN_CTX_FLAG_ZOMBIE) != 0)) {
+ if (nn_slow ((!is_term) && ((self.flags & NN_CTX_FLAG_ZOMBIE) != 0))) {
*sockp = NULL;
return -ETERM;
}
@@ -1131,8 +1156,9 @@
return -EBADF;
sock = self.socks[s];
- if (nn_slow (sock == NULL))
+ if (nn_slow (sock == NULL)) {
return -EBADF;
+ }
if (nn_slow (nn_sock_hold (sock) != 0)) {
return -EBADF;
@@ -1145,7 +1171,7 @@
{
int rc;
nn_glock_lock();
- rc = nn_global_hold_socket_locked(sockp, s);
+ rc = nn_global_hold_socket_locked(sockp, s, 0);
nn_glock_unlock();
return rc;
}
diff --git a/src/devices/device.c b/src/devices/device.c
index c296c6d..752cb40 100644
--- a/src/devices/device.c
+++ b/src/devices/device.c
@@ -27,20 +27,13 @@
#include "../utils/fast.h"
#include "../utils/fd.h"
#include "../utils/attr.h"
+#include "../utils/thread.h"
#include "device.h"
#include <string.h>
-#if defined NN_HAVE_WINDOWS
-#include "../utils/win.h"
-#elif defined NN_HAVE_POLL
-#include <poll.h>
-#else
-#error
-#endif
-
int nn_custom_device(struct nn_device_recipe *device, int s1, int s2,
- int flags)
+ int flags)
{
return nn_device_entry (device, s1, s2, flags);
}
@@ -51,7 +44,7 @@
}
int nn_device_entry (struct nn_device_recipe *device, int s1, int s2,
- NN_UNUSED int flags)
+ NN_UNUSED int flags)
{
int rc;
int op1;
@@ -182,18 +175,17 @@
/* Two-directional device. */
if (device->required_checks & NN_CHECK_ALLOW_BIDIRECTIONAL) {
if (s1rcv != -1 && s1snd != -1 && s2rcv != -1 && s2snd != -1)
- return nn_device_twoway (device, s1, s1rcv, s1snd,
- s2, s2rcv, s2snd);
+ return nn_device_twoway (device, s1, s2);
}
if (device->required_checks & NN_CHECK_ALLOW_UNIDIRECTIONAL) {
/* Single-directional device passing messages from s1 to s2. */
if (s1rcv != -1 && s1snd == -1 && s2rcv == -1 && s2snd != -1)
- return nn_device_oneway (device,s1, s1rcv, s2, s2snd);
+ return nn_device_oneway (device, s1, s2);
/* Single-directional device passing messages from s2 to s1. */
if (s1rcv == -1 && s1snd != -1 && s2rcv != -1 && s2snd == -1)
- return nn_device_oneway (device,s2, s2rcv, s1, s1snd);
+ return nn_device_oneway (device, s2, s1);
}
/* This should never happen. */
@@ -224,141 +216,56 @@
}
}
-#if defined NN_HAVE_WINDOWS
-
-int nn_device_twoway (struct nn_device_recipe *device,
- int s1, nn_fd s1rcv, nn_fd s1snd,
- int s2, nn_fd s2rcv, nn_fd s2snd)
-{
+struct nn_device_forwarder_args {
+ struct nn_device_recipe *device;
+ int s1;
+ int s2;
int rc;
- fd_set fds;
- int s1rcv_isready = 0;
- int s1snd_isready = 0;
- int s2rcv_isready = 0;
- int s2snd_isready = 0;
+ int err;
+};
- /* Initialise the pollset. */
- FD_ZERO (&fds);
-
+static void nn_device_forwarder (void *a)
+{
+ struct nn_device_forwarder_args *args = a;
for (;;) {
-
- /* Wait for network events. Adjust the 'ready' events based
- on the result. */
- if (s1rcv_isready)
- FD_CLR (s1rcv, &fds);
- else
- FD_SET (s1rcv, &fds);
- if (s1snd_isready)
- FD_CLR (s1snd, &fds);
- else
- FD_SET (s1snd, &fds);
- if (s2rcv_isready)
- FD_CLR (s2rcv, &fds);
- else
- FD_SET (s2rcv, &fds);
- if (s2snd_isready)
- FD_CLR (s2snd, &fds);
- else
- FD_SET (s2snd, &fds);
- rc = select (0, &fds, NULL, NULL, NULL);
- if (nn_slow (rc == SOCKET_ERROR))
- return -1;
- if (FD_ISSET (s1rcv, &fds))
- s1rcv_isready = 1;
- if (FD_ISSET (s1snd, &fds))
- s1snd_isready = 1;
- if (FD_ISSET (s2rcv, &fds))
- s2rcv_isready = 1;
- if (FD_ISSET (s2snd, &fds))
- s2snd_isready = 1;
-
- /* If possible, pass the message from s1 to s2. */
- if (s1rcv_isready && s2snd_isready) {
- rc = nn_device_mvmsg (device,s1, s2, NN_DONTWAIT);
- if (nn_slow (rc < 0))
- return -1;
- s1rcv_isready = 0;
- s2snd_isready = 0;
- }
-
- /* If possible, pass the message from s2 to s1. */
- if (s2rcv_isready && s1snd_isready) {
- rc = nn_device_mvmsg (device,s2, s1, NN_DONTWAIT);
- if (nn_slow (rc < 0))
- return -1;
- s2rcv_isready = 0;
- s1snd_isready = 0;
+ args->rc = nn_device_mvmsg (args->device, args->s1, args->s2, 0);
+ if (nn_slow (args->rc < 0)) {
+ args->err = nn_errno ();
+ return;
}
}
}
-#elif defined NN_HAVE_POLL
-
-int nn_device_twoway (struct nn_device_recipe *device,
- int s1, nn_fd s1rcv, nn_fd s1snd,
- int s2, nn_fd s2rcv, nn_fd s2snd)
+int nn_device_twoway (struct nn_device_recipe *device, int s1, int s2)
{
- int rc;
- struct pollfd pfd [4];
+ struct nn_thread t1;
+ struct nn_thread t2;
+ struct nn_device_forwarder_args a1;
+ struct nn_device_forwarder_args a2;
- /* Initialise the pollset. */
- pfd [0].fd = s1rcv;
- pfd [0].events = POLLIN;
- pfd [1].fd = s1snd;
- pfd [1].events = POLLIN;
- pfd [2].fd = s2rcv;
- pfd [2].events = POLLIN;
- pfd [3].fd = s2snd;
- pfd [3].events = POLLIN;
+ a1.device = device;
+ a1.s1 = s1;
+ a1.s2 = s2;
- for (;;) {
+ a2.device = device;
+ a2.s1 = s2;
+ a2.s2 = s1;
- /* Wait for network events. */
- rc = poll (pfd, 4, -1);
- if (nn_slow (rc < 0))
- return -1;
+ nn_thread_init (&t1, nn_device_forwarder, &a1);
+ nn_thread_init (&t2, nn_device_forwarder, &a2);
- /* We had an infinite timeout, and have already checked for errors. */
- nn_assert (rc != 0);
+ nn_thread_term (&t1);
+ nn_thread_term (&t2);
- /* Process the events. When the event is received, we cease polling
- for it. */
- if (pfd [0].revents & POLLIN)
- pfd [0].events = 0;
- if (pfd [1].revents & POLLIN)
- pfd [1].events = 0;
- if (pfd [2].revents & POLLIN)
- pfd [2].events = 0;
- if (pfd [3].revents & POLLIN)
- pfd [3].events = 0;
-
- /* If possible, pass the message from s1 to s2. */
- if (pfd [0].events == 0 && pfd [3].events == 0) {
- rc = nn_device_mvmsg (device, s1, s2, NN_DONTWAIT);
- if (nn_slow (rc < 0))
- return -1;
- pfd [0].events = POLLIN;
- pfd [3].events = POLLIN;
- }
-
- /* If possible, pass the message from s2 to s1. */
- if (pfd [2].events == 0 && pfd [1].events == 0) {
- rc = nn_device_mvmsg (device, s2, s1, NN_DONTWAIT);
- if (nn_slow (rc < 0))
- return -1;
- pfd [2].events = POLLIN;
- pfd [1].events = POLLIN;
- }
+ if (a1.rc != 0) {
+ errno = a1.err;
+ return (a1.rc);
}
+ errno = a2.err;
+ return a2.rc;
}
-#else
-#error
-#endif
-
-int nn_device_oneway (struct nn_device_recipe *device,
- int s1, NN_UNUSED nn_fd s1rcv,
- int s2, NN_UNUSED nn_fd s2snd)
+int nn_device_oneway (struct nn_device_recipe *device, int s1, int s2)
{
int rc;
@@ -386,10 +293,11 @@
hdr.msg_control = &control;
hdr.msg_controllen = NN_MSG;
rc = nn_recvmsg (from, &hdr, flags);
- if (nn_slow (rc < 0 && (nn_errno () == ETERM || nn_errno () == EBADF)))
+ if (nn_slow (rc < 0 && (nn_errno () == ETERM || nn_errno () == EBADF))) {
return -1;
+ }
errno_assert (rc >= 0);
-
+
rc = device->nn_device_rewritemsg (device, from, to, flags, &hdr, rc);
if (nn_slow (rc == -1))
return -1;
@@ -398,16 +306,16 @@
nn_assert(rc == 1);
rc = nn_sendmsg (to, &hdr, flags);
- if (nn_slow (rc < 0 && nn_errno () == ETERM))
+ if (nn_slow (rc < 0 && (nn_errno () == ETERM || nn_errno () == EBADF))) {
return -1;
+ }
errno_assert (rc >= 0);
return 0;
}
int nn_device_rewritemsg (NN_UNUSED struct nn_device_recipe *device,
NN_UNUSED int from, NN_UNUSED int to, NN_UNUSED int flags,
- NN_UNUSED struct nn_msghdr *msghdr, NN_UNUSED int bytes)
+ NN_UNUSED struct nn_msghdr *msghdr, NN_UNUSED int bytes)
{
return 1; /* always forward */
}
-
diff --git a/src/devices/device.h b/src/devices/device.h
index 4540a62..37fb2cb 100644
--- a/src/devices/device.h
+++ b/src/devices/device.h
@@ -34,12 +34,10 @@
int s1, int s2, int flags);
/* The two-way poll function. */
- int (*nn_device_twoway) (struct nn_device_recipe *device,
- int s1, nn_fd s1rcv, nn_fd s1snd, int s2, nn_fd s2rcv, nn_fd s2snd);
+ int (*nn_device_twoway) (struct nn_device_recipe *device, int s1, int s2);
/* The one-way poll function. */
- int (*nn_device_oneway) (struct nn_device_recipe *device,
- int s1, nn_fd s1rcv, int s2, nn_fd s2snd);
+ int (*nn_device_oneway) (struct nn_device_recipe *device, int s1, int s2);
int (*nn_device_loopback) (struct nn_device_recipe *device, int s);
@@ -72,10 +70,8 @@
/* Default implementations of the functions. */
int nn_device_loopback (struct nn_device_recipe *device, int s);
-int nn_device_twoway (struct nn_device_recipe *device,
- int s1, nn_fd s1rcv, nn_fd s1snd, int s2, nn_fd s2rcv, nn_fd s2snd);
-int nn_device_oneway (struct nn_device_recipe *device,
- int s1, nn_fd s1rcv, int s2, nn_fd s2snd);
+int nn_device_twoway (struct nn_device_recipe *device, int s1, int s2);
+int nn_device_oneway (struct nn_device_recipe *device, int s1, int s2);
int nn_device_mvmsg (struct nn_device_recipe *device,
int from, int to, int flags);
int nn_device_entry(struct nn_device_recipe *device,
diff --git a/tests/device.c b/tests/device.c
index 97983c0..7a639bb 100644
--- a/tests/device.c
+++ b/tests/device.c
@@ -51,7 +51,7 @@
/* Run the device. */
rc = nn_device (deva, devb);
- nn_assert (rc < 0 && (nn_errno () == ETERM || nn_errno () == EBADF));
+ nn_assert (rc < 0 && (nn_errno () == ETERM));
/* Clean up. */
test_close (devb);