nn_poll implemented

Signed-off-by: Martin Sustrik <sustrik@250bpm.com>
diff --git a/Makefile.am b/Makefile.am
index ce79601..6ad9ba2 100644
--- a/Makefile.am
+++ b/Makefile.am
@@ -50,6 +50,7 @@
     src/core/global.h \
     src/core/global.c \
     src/core/pipe.c \
+    src/core/poll.c \
     src/core/sock.h \
     src/core/sock.c \
     src/core/sockbase.c \
@@ -336,7 +337,8 @@
     doc/nn_sendmsg.txt \
     doc/nn_recvmsg.txt \
     doc/nn_device.txt \
-    doc/nn_cmsg.txt
+    doc/nn_cmsg.txt \
+    doc/nn_poll.txt
 
 MAN1 = \
     doc/nanocat.txt
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index ad0d77c..b0b937e 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -43,6 +43,7 @@
     core/global.h
     core/global.c
     core/pipe.c
+    core/poll.c
     core/sock.h
     core/sock.c
     core/sockbase.c
diff --git a/src/core/poll.c b/src/core/poll.c
new file mode 100644
index 0000000..63b67c3
--- /dev/null
+++ b/src/core/poll.c
@@ -0,0 +1,105 @@
+/*
+    Copyright (c) 2013 250bpm s.r.o.  All rights reserved.
+
+    Permission is hereby granted, free of charge, to any person obtaining a copy
+    of this software and associated documentation files (the "Software"),
+    to deal in the Software without restriction, including without limitation
+    the rights to use, copy, modify, merge, publish, distribute, sublicense,
+    and/or sell copies of the Software, and to permit persons to whom
+    the Software is furnished to do so, subject to the following conditions:
+
+    The above copyright notice and this permission notice shall be included
+    in all copies or substantial portions of the Software.
+
+    THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+    IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+    FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
+    THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+    LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+    FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
+    IN THE SOFTWARE.
+*/
+
+#include "../nn.h"
+
+#include "../utils/alloc.h"
+#include "../utils/fast.h"
+#include "../utils/err.h"
+
+#include <poll.h>
+#include <stddef.h>
+
+int nn_poll (struct nn_pollfd *fds, int nfds, int timeout)
+{
+    int rc;
+    int i;
+    int pos;
+    int fd;
+    int res;
+    size_t sz;
+    struct pollfd *pfd;
+
+    /*  Construct a pollset to be used with OS-level 'poll' function. */
+    pfd = nn_alloc (sizeof (struct pollfd) * nfds * 2, "pollset");
+    alloc_assert (pfd);
+    pos = 0;
+    for (i = 0; i != nfds; ++i) {
+        if (fds [i].events & NN_POLLIN) {
+            sz = sizeof (fd);
+            rc = nn_getsockopt (fds [i].fd, NN_SOL_SOCKET, NN_RCVFD, &fd, &sz);
+            if (nn_slow (rc < 0)) {
+                nn_free (pfd);
+                errno = -rc;
+                return -1;
+            }
+            nn_assert (sz == sizeof (fd));
+            pfd [pos].fd = fd;
+            pfd [pos].events = POLLIN;
+            ++pos;
+        }
+        if (fds [i].events & NN_POLLOUT) {
+            sz = sizeof (fd);
+            rc = nn_getsockopt (fds [i].fd, NN_SOL_SOCKET, NN_SNDFD, &fd, &sz);
+            if (nn_slow (rc < 0)) {
+                nn_free (pfd);
+                errno = -rc;
+                return -1;
+            }
+            nn_assert (sz == sizeof (fd));
+            pfd [pos].fd = fd;
+            pfd [pos].events = POLLIN;
+            ++pos;
+        }
+    }    
+
+    /*  Do the polling itself. */
+    rc = poll (pfd, pos, timeout);
+    if (nn_slow (rc <= 0)) {
+        nn_free (pfd);
+        errno = -rc;
+        return -1;
+    }
+
+    /*  Move the results from OS-level poll to nn_poll's pollset. */
+    res = 0;
+    pos = 0;
+    for (i = 0; i != nfds; ++i) {
+        fds [i].revents = 0;
+        if (fds [i].events & NN_POLLIN) {
+            if (pfd [pos].revents & POLLIN)
+                fds [i].revents |= NN_POLLIN;
+            ++pos;
+        }
+        if (fds [i].events & NN_POLLOUT) {
+            if (pfd [pos].revents & POLLIN)
+                fds [i].revents |= NN_POLLOUT;
+            ++pos;
+        }
+        if (fds [i].revents)
+            ++res;
+    }
+
+    nn_free (pfd);
+    return res;
+}
+
diff --git a/src/nn.h b/src/nn.h
index 17b8002..24119dc 100644
--- a/src/nn.h
+++ b/src/nn.h
@@ -307,6 +307,21 @@
 NN_EXPORT int nn_recvmsg (int s, struct nn_msghdr *msghdr, int flags);
 
 /******************************************************************************/
+/*  Socket mutliplexing support.                                              */
+/******************************************************************************/
+
+#define NN_POLLIN 1
+#define NN_POLLOUT 2
+
+struct nn_pollfd {
+    int fd;
+    short events;
+    short revents;
+};
+
+NN_EXPORT int nn_poll (struct nn_pollfd *fds, int nfds, int timeout);
+
+/******************************************************************************/
 /*  Built-in support for devices.                                             */
 /******************************************************************************/
 
diff --git a/tests/poll.c b/tests/poll.c
index dfdda4c..33a62b9 100644
--- a/tests/poll.c
+++ b/tests/poll.c
@@ -125,6 +125,26 @@
     int sb;
     char buf [3];
     struct nn_thread thread;
+    struct nn_pollfd pfd [2];
+
+    /* Test nn_poll() function. */
+    sb = test_socket (AF_SP, NN_PAIR);
+    test_bind (sb, SOCKET_ADDRESS);
+    sc = test_socket (AF_SP, NN_PAIR);
+    test_connect (sc, SOCKET_ADDRESS);
+    test_send (sc, "ABC");
+    nn_sleep (100);
+    pfd [0].fd = sb;
+    pfd [0].events = NN_POLLIN | NN_POLLOUT;
+    pfd [1].fd = sc;
+    pfd [1].events = NN_POLLIN | NN_POLLOUT;
+    rc = nn_poll (pfd, 2, -1);
+    errno_assert (rc >= 0);
+    nn_assert (rc == 2);
+    nn_assert (pfd [0].revents == NN_POLLIN | NN_POLLOUT);
+    nn_assert (pfd [1].revents == NN_POLLOUT);
+    test_close (sc);
+    test_close (sb);
 
     /*  Create a simple topology. */
     sb = test_socket (AF_SP, NN_PAIR);