fixes #509 Device is not working properly with REQ/REP
diff --git a/CMakeLists.txt b/CMakeLists.txt
index a575468..fc6d4db 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -91,6 +91,7 @@
add_libnanomsg_test (prio)
add_libnanomsg_test (poll)
add_libnanomsg_test (device)
+add_libnanomsg_test (device4)
add_libnanomsg_test (emfile)
add_libnanomsg_test (domain)
add_libnanomsg_test (trie)
diff --git a/Makefile.am b/Makefile.am
index 41cf5b3..f97f3ad 100644
--- a/Makefile.am
+++ b/Makefile.am
@@ -493,6 +493,7 @@
tests/prio \
tests/poll \
tests/device \
+ tests/device4 \
tests/emfile \
tests/domain \
tests/trie \
diff --git a/src/devices/device.c b/src/devices/device.c
index 8bbdd47..42de60d 100644
--- a/src/devices/device.c
+++ b/src/devices/device.c
@@ -318,7 +318,7 @@
/* If possible, pass the message from s1 to s2. */
if (pfd [0].events == 0 && pfd [3].events == 0) {
- rc = nn_device_mvmsg (device,s1, s2, NN_DONTWAIT);
+ rc = nn_device_mvmsg (device, s1, s2, NN_DONTWAIT);
if (nn_slow (rc < 0))
return -1;
pfd [0].events = POLLIN;
@@ -327,7 +327,7 @@
/* If possible, pass the message from s2 to s1. */
if (pfd [2].events == 0 && pfd [1].events == 0) {
- rc = nn_device_mvmsg (device,s2, s1, NN_DONTWAIT);
+ rc = nn_device_mvmsg (device, s2, s1, NN_DONTWAIT);
if (nn_slow (rc < 0))
return -1;
pfd [2].events = POLLIN;
diff --git a/src/protocols/reqrep/xreq.c b/src/protocols/reqrep/xreq.c
index 392382f..6a2da3a 100644
--- a/src/protocols/reqrep/xreq.c
+++ b/src/protocols/reqrep/xreq.c
@@ -31,6 +31,7 @@
#include "../../utils/alloc.h"
#include "../../utils/list.h"
#include "../../utils/attr.h"
+#include "../../utils/wire.h"
struct nn_xreq_data {
struct nn_lb_data lb;
@@ -178,6 +179,9 @@
int nn_xreq_recv (struct nn_sockbase *self, struct nn_msg *msg)
{
int rc;
+ void *data;
+ int i;
+ size_t sz;
rc = nn_fq_recv (&nn_cont (self, struct nn_xreq, sockbase)->fq, msg, NULL);
if (rc == -EAGAIN)
@@ -186,21 +190,28 @@
if (!(rc & NN_PIPE_PARSED)) {
- /* Ignore malformed replies. */
- if (nn_slow (nn_chunkref_size (&msg->body) < sizeof (uint32_t))) {
- nn_msg_term (msg);
- return -EAGAIN;
- }
+ data = nn_chunkref_data (&msg->body);
+ sz = nn_chunkref_size (&msg->body);
+ i = 0;
- /* Split the message into the header and the body. */
+ for (;;) {
+ if (nn_slow ((i + 1) * sizeof (uint32_t)) > sz) {
+ nn_msg_term (msg);
+ return -EAGAIN;
+ }
+ if (nn_getl ((uint8_t*)(((uint32_t *) data) + i)) & 0x80000000)
+ break;
+
+ ++i;
+ }
+ ++i;
+ /* Split the header and the body. */
nn_assert (nn_chunkref_size (&msg->sphdr) == 0);
nn_chunkref_term (&msg->sphdr);
- nn_chunkref_init (&msg->sphdr, sizeof (uint32_t));
- memcpy (nn_chunkref_data (&msg->sphdr), nn_chunkref_data (&msg->body),
- sizeof (uint32_t));
- nn_chunkref_trim (&msg->body, sizeof (uint32_t));
+ nn_chunkref_init (&msg->sphdr, i * sizeof (uint32_t));
+ memcpy (nn_chunkref_data (&msg->sphdr), data, i * sizeof (uint32_t));
+ nn_chunkref_trim (&msg->body, i * sizeof (uint32_t));
}
-
return 0;
}
diff --git a/tests/device4.c b/tests/device4.c
new file mode 100644
index 0000000..1abcd07
--- /dev/null
+++ b/tests/device4.c
@@ -0,0 +1,96 @@
+/*
+ Copyright (c) 2012 Martin Sustrik All rights reserved.
+ Copyright (c) 2013 GoPivotal, Inc. All rights reserved.
+ Copyright 2015 Garrett D'Amore <garrett@damore.org>
+
+ Permission is hereby granted, free of charge, to any person obtaining a copy
+ of this software and associated documentation files (the "Software"),
+ to deal in the Software without restriction, including without limitation
+ the rights to use, copy, modify, merge, publish, distribute, sublicense,
+ and/or sell copies of the Software, and to permit persons to whom
+ the Software is furnished to do so, subject to the following conditions:
+
+ The above copyright notice and this permission notice shall be included
+ in all copies or substantial portions of the Software.
+
+ THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
+ THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
+ IN THE SOFTWARE.
+*/
+
+#include "../src/nn.h"
+#include "../src/reqrep.h"
+#include "../src/tcp.h"
+
+#include "testutil.h"
+#include "../src/utils/attr.h"
+#include "../src/utils/thread.c"
+
+#define SOCKET_ADDRESS_F "tcp://127.0.0.1:5565"
+#define SOCKET_ADDRESS_G "tcp://127.0.0.1:5566"
+
+void device4 (NN_UNUSED void *arg)
+{
+ int rc;
+ int devf;
+ int devg;
+
+ /* Intialise the device sockets. */
+ devf = test_socket (AF_SP_RAW, NN_REP);
+ test_bind (devf, SOCKET_ADDRESS_F);
+ devg = test_socket (AF_SP_RAW, NN_REQ);
+ test_bind (devg, SOCKET_ADDRESS_G);
+
+ /* Run the device. */
+ rc = nn_device (devf, devg);
+ nn_assert (rc < 0 && nn_errno () == ETERM);
+
+ /* Clean up. */
+ test_close (devg);
+ test_close (devf);
+}
+
+int main ()
+{
+ int endf;
+ int endg;
+ struct nn_thread thread4;
+ int timeo;
+
+ /* Test the bi-directional device with REQ/REP (headers). */
+
+ /* Start the device. */
+ nn_thread_init (&thread4, device4, NULL);
+
+ /* Create two sockets to connect to the device. */
+ endf = test_socket (AF_SP, NN_REQ);
+ test_connect (endf, SOCKET_ADDRESS_F);
+ endg = test_socket (AF_SP, NN_REP);
+ test_connect (endg, SOCKET_ADDRESS_G);
+
+ /* Wait for TCP to establish. */
+ nn_sleep (100);
+
+ /* Pass a message between endpoints. */
+ test_send (endf, "XYZ");
+ test_recv (endg, "XYZ");
+
+ /* Now send a reply. */
+ test_send (endg, "REPLYXYZ");
+ test_recv (endf, "REPLYXYZ");
+
+ /* Clean up. */
+ test_close (endg);
+ test_close (endf);
+
+ /* Shut down the devices. */
+ nn_term ();
+ nn_thread_term (&thread4);
+
+ return 0;
+}
+