fixes #509 Device is not working properly with REQ/REP
diff --git a/CMakeLists.txt b/CMakeLists.txt
index a575468..fc6d4db 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -91,6 +91,7 @@
 add_libnanomsg_test (prio)
 add_libnanomsg_test (poll)
 add_libnanomsg_test (device)
+add_libnanomsg_test (device4)
 add_libnanomsg_test (emfile)
 add_libnanomsg_test (domain)
 add_libnanomsg_test (trie)
diff --git a/Makefile.am b/Makefile.am
index 41cf5b3..f97f3ad 100644
--- a/Makefile.am
+++ b/Makefile.am
@@ -493,6 +493,7 @@
     tests/prio \
     tests/poll \
     tests/device \
+    tests/device4 \
     tests/emfile \
     tests/domain \
     tests/trie \
diff --git a/src/devices/device.c b/src/devices/device.c
index 8bbdd47..42de60d 100644
--- a/src/devices/device.c
+++ b/src/devices/device.c
@@ -318,7 +318,7 @@
 
         /*  If possible, pass the message from s1 to s2. */
         if (pfd [0].events == 0 && pfd [3].events == 0) {
-            rc = nn_device_mvmsg (device,s1, s2, NN_DONTWAIT);
+            rc = nn_device_mvmsg (device, s1, s2, NN_DONTWAIT);
             if (nn_slow (rc < 0))
                 return -1;
             pfd [0].events = POLLIN;
@@ -327,7 +327,7 @@
 
         /*  If possible, pass the message from s2 to s1. */
         if (pfd [2].events == 0 && pfd [1].events == 0) {
-            rc = nn_device_mvmsg (device,s2, s1, NN_DONTWAIT);
+            rc = nn_device_mvmsg (device, s2, s1, NN_DONTWAIT);
             if (nn_slow (rc < 0))
                 return -1;
             pfd [2].events = POLLIN;
diff --git a/src/protocols/reqrep/xreq.c b/src/protocols/reqrep/xreq.c
index 392382f..6a2da3a 100644
--- a/src/protocols/reqrep/xreq.c
+++ b/src/protocols/reqrep/xreq.c
@@ -31,6 +31,7 @@
 #include "../../utils/alloc.h"
 #include "../../utils/list.h"
 #include "../../utils/attr.h"
+#include "../../utils/wire.h"
 
 struct nn_xreq_data {
     struct nn_lb_data lb;
@@ -178,6 +179,9 @@
 int nn_xreq_recv (struct nn_sockbase *self, struct nn_msg *msg)
 {
     int rc;
+    void *data;
+    int i;
+    size_t sz;
 
     rc = nn_fq_recv (&nn_cont (self, struct nn_xreq, sockbase)->fq, msg, NULL);
     if (rc == -EAGAIN)
@@ -186,21 +190,28 @@
 
     if (!(rc & NN_PIPE_PARSED)) {
 
-        /*  Ignore malformed replies. */
-        if (nn_slow (nn_chunkref_size (&msg->body) < sizeof (uint32_t))) {
-            nn_msg_term (msg);
-            return -EAGAIN;
-        }
+	data = nn_chunkref_data (&msg->body);
+	sz = nn_chunkref_size (&msg->body);
+        i = 0;
 
-        /*  Split the message into the header and the body. */
+        for (;;) {
+	    if (nn_slow ((i + 1) * sizeof (uint32_t)) > sz) {
+                nn_msg_term (msg);
+                return -EAGAIN;
+	    }
+	    if (nn_getl ((uint8_t*)(((uint32_t *) data) + i)) & 0x80000000)
+		break;
+
+	    ++i;
+	}
+	++i;
+        /*  Split the header and the body. */
         nn_assert (nn_chunkref_size (&msg->sphdr) == 0);
         nn_chunkref_term (&msg->sphdr);
-        nn_chunkref_init (&msg->sphdr, sizeof (uint32_t));
-        memcpy (nn_chunkref_data (&msg->sphdr), nn_chunkref_data (&msg->body),
-            sizeof (uint32_t));
-        nn_chunkref_trim (&msg->body, sizeof (uint32_t));
+        nn_chunkref_init (&msg->sphdr, i * sizeof (uint32_t));
+        memcpy (nn_chunkref_data (&msg->sphdr), data, i * sizeof (uint32_t));
+        nn_chunkref_trim (&msg->body, i * sizeof (uint32_t));
     }
-
     return 0;
 }
 
diff --git a/tests/device4.c b/tests/device4.c
new file mode 100644
index 0000000..1abcd07
--- /dev/null
+++ b/tests/device4.c
@@ -0,0 +1,96 @@
+/*
+    Copyright (c) 2012 Martin Sustrik  All rights reserved.
+    Copyright (c) 2013 GoPivotal, Inc.  All rights reserved.
+    Copyright 2015 Garrett D'Amore <garrett@damore.org>
+
+    Permission is hereby granted, free of charge, to any person obtaining a copy
+    of this software and associated documentation files (the "Software"),
+    to deal in the Software without restriction, including without limitation
+    the rights to use, copy, modify, merge, publish, distribute, sublicense,
+    and/or sell copies of the Software, and to permit persons to whom
+    the Software is furnished to do so, subject to the following conditions:
+
+    The above copyright notice and this permission notice shall be included
+    in all copies or substantial portions of the Software.
+
+    THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+    IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+    FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
+    THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+    LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+    FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
+    IN THE SOFTWARE.
+*/
+
+#include "../src/nn.h"
+#include "../src/reqrep.h"
+#include "../src/tcp.h"
+
+#include "testutil.h"
+#include "../src/utils/attr.h"
+#include "../src/utils/thread.c"
+
+#define SOCKET_ADDRESS_F "tcp://127.0.0.1:5565"
+#define SOCKET_ADDRESS_G "tcp://127.0.0.1:5566"
+
+void device4 (NN_UNUSED void *arg)
+{
+    int rc;
+    int devf;
+    int devg;
+
+    /*  Intialise the device sockets. */
+    devf = test_socket (AF_SP_RAW, NN_REP);
+    test_bind (devf, SOCKET_ADDRESS_F);
+    devg = test_socket (AF_SP_RAW, NN_REQ);
+    test_bind (devg, SOCKET_ADDRESS_G);
+
+    /*  Run the device. */
+    rc = nn_device (devf, devg);
+    nn_assert (rc < 0 && nn_errno () == ETERM);
+
+    /*  Clean up. */
+    test_close (devg);
+    test_close (devf);
+}
+
+int main ()
+{
+    int endf;
+    int endg;
+    struct nn_thread thread4;
+    int timeo;
+
+    /*  Test the bi-directional device with REQ/REP (headers). */
+
+    /*  Start the device. */
+    nn_thread_init (&thread4, device4, NULL);
+
+    /*  Create two sockets to connect to the device. */
+    endf = test_socket (AF_SP, NN_REQ);
+    test_connect (endf, SOCKET_ADDRESS_F);
+    endg = test_socket (AF_SP, NN_REP);
+    test_connect (endg, SOCKET_ADDRESS_G);
+
+    /*  Wait for TCP to establish. */
+    nn_sleep (100);
+
+    /*  Pass a message between endpoints. */
+    test_send (endf, "XYZ");
+    test_recv (endg, "XYZ");
+
+    /*  Now send a reply. */
+    test_send (endg, "REPLYXYZ");
+    test_recv (endf, "REPLYXYZ");
+
+    /*  Clean up. */
+    test_close (endg);
+    test_close (endf);
+
+    /*  Shut down the devices. */
+    nn_term ();
+    nn_thread_term (&thread4);
+
+    return 0;
+}
+