fixes #554 weird hang/crash & race against nn_term
diff --git a/src/core/global.c b/src/core/global.c
index 7f5bfae..b50d7bd 100644
--- a/src/core/global.c
+++ b/src/core/global.c
@@ -154,10 +154,14 @@
 static int nn_global_create_socket (int domain, int protocol);
 
 /*  Socket holds. */
-static int nn_global_hold_socket(struct nn_sock **sockp, int s);
-static int nn_global_hold_socket_locked(struct nn_sock **sockp, int s);
+static int nn_global_hold_socket (struct nn_sock **sockp, int s);
+static int nn_global_hold_socket_locked (struct nn_sock **sockp, int s,
+    int is_term);
 static void nn_global_rele_socket(struct nn_sock *);
 
+/*  Close helper. */
+static int nn_close_impl (int s, int is_term);
+
 int nn_errno (void)
 {
     return nn_err_errno ();
@@ -303,6 +307,7 @@
 void nn_term (void)
 {
     int i;
+    struct nn_sock *sock;
 
     nn_glock_lock ();
 
@@ -310,13 +315,20 @@
     self.flags |= NN_CTX_FLAG_ZOMBIE;
 
     /*  Mark all open sockets as terminating. */
-    if (self.socks && self.nsocks) {
+    if (self.nsocks != 0) {
         for (i = 0; i != NN_MAX_SOCKETS; ++i)
-            if (self.socks [i])
+            if (self.socks [i] != NULL) {
                 nn_sock_zombify (self.socks [i]);
+            }
     }
 
     nn_glock_unlock ();
+
+    /* Make sure we really close resources, this will cause global
+       resources to be freed too when the last socket is closed. */
+    for (i = 0; i < NN_MAX_SOCKETS; i++) {
+        (void) nn_close_impl (i, 1);
+    }
 }
 
 void *nn_allocmsg (size_t size, int type)
@@ -469,13 +481,13 @@
     return rc;
 }
 
-int nn_close (int s)
+int nn_close_impl (int s, int is_term)
 {
     int rc;
     struct nn_sock *sock;
 
     nn_glock_lock ();
-    rc = nn_global_hold_socket_locked (&sock, s);
+    rc = nn_global_hold_socket_locked (&sock, s, is_term);
     if (nn_slow (rc < 0)) {
         nn_glock_unlock ();
         errno = -rc;
@@ -520,6 +532,11 @@
     return 0;
 }
 
+int nn_close (int s)
+{
+    return (nn_close_impl (s, 0));
+}
+
 int nn_setsockopt (int s, int level, int option, const void *optval,
     size_t optvallen)
 {
@@ -813,6 +830,10 @@
     return (int) sz;
 
 fail:
+    /*  Socket may have closed asynchronously; check for nn_term. */
+    if (self.flags & NN_CTX_FLAG_ZOMBIE) {
+        rc = -ETERM;
+    }
     nn_global_rele_socket (sock);
 
     errno = -rc;
@@ -946,6 +967,10 @@
     return (int) sz;
 
 fail:
+    /*  Socket may have closed asynchronously; check for nn_term. */
+    if (self.flags & NN_CTX_FLAG_ZOMBIE) {
+        rc = -ETERM;
+    }
     nn_global_rele_socket (sock);
 
     errno = -rc;
@@ -1114,7 +1139,7 @@
 /*  Get the socket structure for a socket id.  This must be called under
     the global lock (nn_glock_lock.)  The socket itself will not be freed
     while the hold is active. */
-int nn_global_hold_socket_locked(struct nn_sock **sockp, int s)
+int nn_global_hold_socket_locked(struct nn_sock **sockp, int s, int is_term)
 {
     struct nn_sock *sock;
 
@@ -1122,7 +1147,7 @@
         *sockp = NULL;
         return -ETERM;
     }
-    if (nn_slow ((self.flags & NN_CTX_FLAG_ZOMBIE) != 0)) {
+    if (nn_slow ((!is_term) && ((self.flags & NN_CTX_FLAG_ZOMBIE) != 0))) {
         *sockp = NULL;
         return -ETERM;
     }
@@ -1131,8 +1156,9 @@
         return -EBADF;
 
     sock = self.socks[s];
-    if (nn_slow (sock == NULL))
+    if (nn_slow (sock == NULL)) {
         return -EBADF;
+    }
 
     if (nn_slow (nn_sock_hold (sock) != 0)) {
         return -EBADF;
@@ -1145,7 +1171,7 @@
 {
     int rc;
     nn_glock_lock();
-    rc = nn_global_hold_socket_locked(sockp, s);
+    rc = nn_global_hold_socket_locked(sockp, s, 0);
     nn_glock_unlock();
     return rc;
 }
diff --git a/src/devices/device.c b/src/devices/device.c
index c296c6d..752cb40 100644
--- a/src/devices/device.c
+++ b/src/devices/device.c
@@ -27,20 +27,13 @@
 #include "../utils/fast.h"
 #include "../utils/fd.h"
 #include "../utils/attr.h"
+#include "../utils/thread.h"
 #include "device.h"
 
 #include <string.h>
 
-#if defined NN_HAVE_WINDOWS
-#include "../utils/win.h"
-#elif defined NN_HAVE_POLL
-#include <poll.h>
-#else
-#error
-#endif
-
 int nn_custom_device(struct nn_device_recipe *device, int s1, int s2,
-    int flags) 
+    int flags)
 {
     return nn_device_entry (device, s1, s2, flags);
 }
@@ -51,7 +44,7 @@
 }
 
 int nn_device_entry (struct nn_device_recipe *device, int s1, int s2,
-    NN_UNUSED int flags) 
+    NN_UNUSED int flags)
 {
     int rc;
     int op1;
@@ -182,18 +175,17 @@
     /*  Two-directional device. */
     if (device->required_checks & NN_CHECK_ALLOW_BIDIRECTIONAL) {
         if (s1rcv != -1 && s1snd != -1 && s2rcv != -1 && s2snd != -1)
-            return nn_device_twoway (device, s1, s1rcv, s1snd,
-                s2, s2rcv, s2snd);
+            return nn_device_twoway (device, s1, s2);
     }
 
     if (device->required_checks & NN_CHECK_ALLOW_UNIDIRECTIONAL) {
         /*  Single-directional device passing messages from s1 to s2. */
         if (s1rcv != -1 && s1snd == -1 && s2rcv == -1 && s2snd != -1)
-            return nn_device_oneway (device,s1, s1rcv, s2, s2snd);
+            return nn_device_oneway (device, s1, s2);
 
         /*  Single-directional device passing messages from s2 to s1. */
         if (s1rcv == -1 && s1snd != -1 && s2rcv != -1 && s2snd == -1)
-            return nn_device_oneway (device,s2, s2rcv, s1, s1snd);
+            return nn_device_oneway (device, s2, s1);
     }
 
     /*  This should never happen. */
@@ -224,141 +216,56 @@
     }
 }
 
-#if defined NN_HAVE_WINDOWS
-
-int nn_device_twoway (struct nn_device_recipe *device,
-    int s1, nn_fd s1rcv, nn_fd s1snd,
-    int s2, nn_fd s2rcv, nn_fd s2snd)
-{
+struct nn_device_forwarder_args {
+    struct nn_device_recipe *device;
+    int s1;
+    int s2;
     int rc;
-    fd_set fds;
-    int s1rcv_isready = 0;
-    int s1snd_isready = 0;
-    int s2rcv_isready = 0;
-    int s2snd_isready = 0;
+    int err;
+};
 
-    /*  Initialise the pollset. */
-    FD_ZERO (&fds);
-
+static void nn_device_forwarder (void *a)
+{
+    struct nn_device_forwarder_args *args = a;
     for (;;) {
-
-        /*  Wait for network events. Adjust the 'ready' events based
-            on the result. */
-        if (s1rcv_isready)
-            FD_CLR (s1rcv, &fds);
-        else
-            FD_SET (s1rcv, &fds);
-        if (s1snd_isready)
-            FD_CLR (s1snd, &fds);
-        else
-            FD_SET (s1snd, &fds);
-        if (s2rcv_isready)
-            FD_CLR (s2rcv, &fds);
-        else
-            FD_SET (s2rcv, &fds);
-        if (s2snd_isready)
-            FD_CLR (s2snd, &fds);
-        else
-            FD_SET (s2snd, &fds);
-        rc = select (0, &fds, NULL, NULL, NULL);
-        if (nn_slow (rc == SOCKET_ERROR))
-            return -1;
-        if (FD_ISSET (s1rcv, &fds))
-            s1rcv_isready = 1;
-        if (FD_ISSET (s1snd, &fds))
-            s1snd_isready = 1;
-        if (FD_ISSET (s2rcv, &fds))
-            s2rcv_isready = 1;
-        if (FD_ISSET (s2snd, &fds))
-            s2snd_isready = 1;
-
-        /*  If possible, pass the message from s1 to s2. */
-        if (s1rcv_isready && s2snd_isready) {
-            rc = nn_device_mvmsg (device,s1, s2, NN_DONTWAIT);
-            if (nn_slow (rc < 0))
-                return -1;
-            s1rcv_isready = 0;
-            s2snd_isready = 0;
-        }
-
-        /*  If possible, pass the message from s2 to s1. */
-        if (s2rcv_isready && s1snd_isready) {
-            rc = nn_device_mvmsg (device,s2, s1, NN_DONTWAIT);
-            if (nn_slow (rc < 0))
-                return -1;
-            s2rcv_isready = 0;
-            s1snd_isready = 0;
+        args->rc = nn_device_mvmsg (args->device, args->s1, args->s2, 0);
+        if (nn_slow (args->rc < 0)) {
+            args->err = nn_errno ();
+            return;
         }
     }
 }
 
-#elif defined NN_HAVE_POLL
-
-int nn_device_twoway (struct nn_device_recipe *device,
-    int s1, nn_fd s1rcv, nn_fd s1snd,
-    int s2, nn_fd s2rcv, nn_fd s2snd)
+int nn_device_twoway (struct nn_device_recipe *device, int s1, int s2)
 {
-    int rc;
-    struct pollfd pfd [4];
+    struct nn_thread t1;
+    struct nn_thread t2;
+    struct nn_device_forwarder_args a1;
+    struct nn_device_forwarder_args a2;
 
-    /*  Initialise the pollset. */
-    pfd [0].fd = s1rcv;
-    pfd [0].events = POLLIN;
-    pfd [1].fd = s1snd;
-    pfd [1].events = POLLIN;
-    pfd [2].fd = s2rcv;
-    pfd [2].events = POLLIN;
-    pfd [3].fd = s2snd;
-    pfd [3].events = POLLIN;
+    a1.device = device;
+    a1.s1 = s1;
+    a1.s2 = s2;
 
-    for (;;) {
+    a2.device = device;
+    a2.s1 = s2;
+    a2.s2 = s1;
 
-        /*  Wait for network events. */
-        rc = poll (pfd, 4, -1);
-        if (nn_slow (rc < 0))
-            return -1;
+    nn_thread_init (&t1, nn_device_forwarder, &a1);
+    nn_thread_init (&t2, nn_device_forwarder, &a2);
 
-        /*  We had an infinite timeout, and have already checked for errors. */
-        nn_assert (rc != 0);
+    nn_thread_term (&t1);
+    nn_thread_term (&t2);
 
-        /*  Process the events. When the event is received, we cease polling
-            for it. */
-        if (pfd [0].revents & POLLIN)
-            pfd [0].events = 0;
-        if (pfd [1].revents & POLLIN)
-            pfd [1].events = 0;
-        if (pfd [2].revents & POLLIN)
-            pfd [2].events = 0;
-        if (pfd [3].revents & POLLIN)
-            pfd [3].events = 0;
-
-        /*  If possible, pass the message from s1 to s2. */
-        if (pfd [0].events == 0 && pfd [3].events == 0) {
-            rc = nn_device_mvmsg (device, s1, s2, NN_DONTWAIT);
-            if (nn_slow (rc < 0))
-                return -1;
-            pfd [0].events = POLLIN;
-            pfd [3].events = POLLIN;
-        }
-
-        /*  If possible, pass the message from s2 to s1. */
-        if (pfd [2].events == 0 && pfd [1].events == 0) {
-            rc = nn_device_mvmsg (device, s2, s1, NN_DONTWAIT);
-            if (nn_slow (rc < 0))
-                return -1;
-            pfd [2].events = POLLIN;
-            pfd [1].events = POLLIN;
-        }
+    if (a1.rc != 0) {
+        errno = a1.err;
+        return (a1.rc);
     }
+    errno = a2.err;
+    return a2.rc;
 }
 
-#else
-#error
-#endif
-
-int nn_device_oneway (struct nn_device_recipe *device,
-    int s1, NN_UNUSED nn_fd s1rcv,
-    int s2, NN_UNUSED nn_fd s2snd)
+int nn_device_oneway (struct nn_device_recipe *device, int s1, int s2)
 {
     int rc;
 
@@ -386,10 +293,11 @@
     hdr.msg_control = &control;
     hdr.msg_controllen = NN_MSG;
     rc = nn_recvmsg (from, &hdr, flags);
-    if (nn_slow (rc < 0 && (nn_errno () == ETERM || nn_errno () == EBADF)))
+    if (nn_slow (rc < 0 && (nn_errno () == ETERM || nn_errno () == EBADF))) {
         return -1;
+    }
     errno_assert (rc >= 0);
-    
+
     rc = device->nn_device_rewritemsg (device, from, to, flags, &hdr, rc);
     if (nn_slow (rc == -1))
         return -1;
@@ -398,16 +306,16 @@
     nn_assert(rc == 1);
 
     rc = nn_sendmsg (to, &hdr, flags);
-    if (nn_slow (rc < 0 && nn_errno () == ETERM))
+    if (nn_slow (rc < 0 && (nn_errno () == ETERM || nn_errno () == EBADF))) {
         return -1;
+    }
     errno_assert (rc >= 0);
     return 0;
 }
 
 int nn_device_rewritemsg (NN_UNUSED struct nn_device_recipe *device,
     NN_UNUSED int from, NN_UNUSED int to, NN_UNUSED int flags,
-    NN_UNUSED struct nn_msghdr *msghdr, NN_UNUSED int bytes) 
+    NN_UNUSED struct nn_msghdr *msghdr, NN_UNUSED int bytes)
 {
     return 1; /* always forward */
 }
-
diff --git a/src/devices/device.h b/src/devices/device.h
index 4540a62..37fb2cb 100644
--- a/src/devices/device.h
+++ b/src/devices/device.h
@@ -34,12 +34,10 @@
         int s1, int s2, int flags);
 
     /*  The two-way poll function. */
-    int (*nn_device_twoway) (struct nn_device_recipe *device,
-        int s1, nn_fd s1rcv, nn_fd s1snd, int s2, nn_fd s2rcv, nn_fd s2snd);
+    int (*nn_device_twoway) (struct nn_device_recipe *device, int s1, int s2);
 
     /*  The one-way poll function. */
-    int (*nn_device_oneway) (struct nn_device_recipe *device,
-        int s1, nn_fd s1rcv, int s2, nn_fd s2snd);
+    int (*nn_device_oneway) (struct nn_device_recipe *device, int s1, int s2);
 
     int (*nn_device_loopback) (struct nn_device_recipe *device, int s);
 
@@ -72,10 +70,8 @@
 
 /*  Default implementations of the functions. */
 int nn_device_loopback (struct nn_device_recipe *device, int s);
-int nn_device_twoway (struct nn_device_recipe *device,
-    int s1, nn_fd s1rcv, nn_fd s1snd, int s2, nn_fd s2rcv, nn_fd s2snd);
-int nn_device_oneway (struct nn_device_recipe *device,
-    int s1, nn_fd s1rcv, int s2, nn_fd s2snd);
+int nn_device_twoway (struct nn_device_recipe *device, int s1, int s2);
+int nn_device_oneway (struct nn_device_recipe *device, int s1, int s2);
 int nn_device_mvmsg (struct nn_device_recipe *device,
     int from, int to, int flags);
 int nn_device_entry(struct nn_device_recipe *device,
diff --git a/tests/device.c b/tests/device.c
index 97983c0..7a639bb 100644
--- a/tests/device.c
+++ b/tests/device.c
@@ -51,7 +51,7 @@
 
     /*  Run the device. */
     rc = nn_device (deva, devb);
-    nn_assert (rc < 0 && (nn_errno () == ETERM || nn_errno () == EBADF));
+    nn_assert (rc < 0 && (nn_errno () == ETERM));
 
     /*  Clean up. */
     test_close (devb);