ws_shutdown test added

Signed-off-by: Martin Sustrik <sustrik@250bpm.com>
diff --git a/.gitignore b/.gitignore
index bcf3f2f..826729b 100644
--- a/.gitignore
+++ b/.gitignore
@@ -85,6 +85,7 @@
 tests/term
 tests/cmsg
 tests/ws
+tests/ws_shutdown
 tests/tcpmux
 *.dir
 *.vcxproj
diff --git a/CMakeLists.txt b/CMakeLists.txt
index f46fc56..67bbb95 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -71,6 +71,7 @@
 add_libnanomsg_test (tcp)
 add_libnanomsg_test (tcp_shutdown)
 add_libnanomsg_test (ws)
+add_libnanomsg_test (ws_shutdown)
 add_libnanomsg_test (tcpmux)
 
 #  Protocol tests.
diff --git a/Makefile.am b/Makefile.am
index 320102d..956426f 100644
--- a/Makefile.am
+++ b/Makefile.am
@@ -473,6 +473,7 @@
     tests/tcp \
     tests/tcp_shutdown \
     tests/ws \
+    tests/ws_shutdown \
     tests/tcpmux
 
 PROTOCOL_TESTS = \
diff --git a/src/transports/ws/aws.c b/src/transports/ws/aws.c
index 48f4883..df48c5b 100644
--- a/src/transports/ws/aws.c
+++ b/src/transports/ws/aws.c
@@ -317,9 +317,11 @@
                 return;
 
             case NN_USOCK_ERROR:
+            case NN_USOCK_SHUTDOWN:
                 nn_usock_stop (&aws->usock);
                 aws->state = NN_AWS_STATE_STOPPING_USOCK;
                 return;
+
             default:
                 nn_fsm_bad_action (aws->state, src, type);
             }
@@ -347,6 +349,7 @@
                 return;
 
             case NN_USOCK_ERROR:
+            case NN_USOCK_SHUTDOWN:
                 nn_usock_stop (&aws->usock);
                 aws->state = NN_AWS_STATE_STOPPING_USOCK;
                 return;
@@ -380,6 +383,7 @@
                     NN_STAT_ACCEPTED_CONNECTIONS, 1);
                 return;
             case NN_USOCK_ERROR:
+            case NN_USOCK_SHUTDOWN:
                 nn_usock_stop (&aws->usock);
                 aws->state = NN_AWS_STATE_STOPPING_USOCK;
                 return;
diff --git a/src/transports/ws/cws.c b/src/transports/ws/cws.c
index 506aeac..e46444d 100644
--- a/src/transports/ws/cws.c
+++ b/src/transports/ws/cws.c
@@ -465,6 +465,7 @@
                 cws->state = NN_CWS_STATE_RECEIVING_WSHDR;
                 return;
 
+            case NN_USOCK_SHUTDOWN:
             case NN_USOCK_ERROR:
                 nn_epbase_set_error (&cws->epbase,
                     nn_usock_geterrno (&cws->usock));
@@ -514,6 +515,7 @@
                 cws->state = NN_CWS_STATE_RECEIVING_SPHDR;
                 return;
 
+            case NN_USOCK_SHUTDOWN:
             case NN_USOCK_ERROR:
                 nn_epbase_set_error (&cws->epbase,
                     nn_usock_geterrno (&cws->usock));
@@ -557,6 +559,7 @@
                 nn_epbase_clear_error (&cws->epbase);
                 return;
 
+            case NN_USOCK_SHUTDOWN:
             case NN_USOCK_ERROR:
                 nn_epbase_set_error (&cws->epbase,
                     nn_usock_geterrno (&cws->usock));
diff --git a/tests/tcp_shutdown.c b/tests/tcp_shutdown.c
index 9ca3940..314d543 100644
--- a/tests/tcp_shutdown.c
+++ b/tests/tcp_shutdown.c
@@ -116,3 +116,4 @@
 
     return 0;
 }
+
diff --git a/tests/ws_shutdown.c b/tests/ws_shutdown.c
new file mode 100644
index 0000000..b683f0c
--- /dev/null
+++ b/tests/ws_shutdown.c
@@ -0,0 +1,119 @@
+/*
+    Copyright (c) 2012 Martin Sustrik  All rights reserved.
+
+    Permission is hereby granted, free of charge, to any person obtaining a copy
+    of this software and associated documentation files (the "Software"),
+    to deal in the Software without restriction, including without limitation
+    the rights to use, copy, modify, merge, publish, distribute, sublicense,
+    and/or sell copies of the Software, and to permit persons to whom
+    the Software is furnished to do so, subject to the following conditions:
+
+    The above copyright notice and this permission notice shall be included
+    in all copies or substantial portions of the Software.
+
+    THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+    IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+    FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
+    THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+    LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+    FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
+    IN THE SOFTWARE.
+*/
+
+#include "../src/nn.h"
+#include "../src/pair.h"
+#include "../src/pubsub.h"
+#include "../src/pipeline.h"
+#include "../src/ws.h"
+
+#include "testutil.h"
+#include "../src/utils/attr.h"
+#include "../src/utils/thread.c"
+#include "../src/utils/atomic.c"
+
+/*  Stress test the WS transport. */
+
+#define THREAD_COUNT 100
+#define TEST2_THREAD_COUNT 10
+#define MESSAGES_PER_THREAD 10
+#define TEST_LOOPS 10
+#define SOCKET_ADDRESS "ws://127.0.0.1:5557"
+
+struct nn_atomic active;
+
+static void routine (NN_UNUSED void *arg)
+{
+    int s;
+
+    s = nn_socket (AF_SP, NN_SUB);
+    if (s < 0 && nn_errno () == EMFILE)
+        return;
+    errno_assert (s >= 0);
+    test_connect (s, SOCKET_ADDRESS);
+    test_close (s);
+}
+
+static void routine2 (NN_UNUSED void *arg)
+{
+    int s;
+    int i;
+
+    s = test_socket (AF_SP, NN_PULL);
+
+    for (i = 0; i < 10; ++i) {
+        test_connect (s, SOCKET_ADDRESS);
+    }
+
+    for (i = 0; i < MESSAGES_PER_THREAD; ++i) {
+        test_recv (s, "hello");
+    }
+
+    test_close (s);
+    nn_atomic_dec(&active, 1);
+}
+
+int main ()
+{
+    int sb;
+    int i;
+    int j;
+    struct nn_thread threads [THREAD_COUNT];
+
+    /*  Stress the shutdown algorithm. */
+
+    sb = test_socket (AF_SP, NN_PUB);
+    test_bind (sb, SOCKET_ADDRESS);
+
+    for (j = 0; j != TEST_LOOPS; ++j) {
+        for (i = 0; i != THREAD_COUNT; ++i)
+            nn_thread_init (&threads [i], routine, NULL);
+        for (i = 0; i != THREAD_COUNT; ++i)
+            nn_thread_term (&threads [i]);
+    }
+
+    test_close (sb);
+
+    /*  Test race condition of sending message while socket shutting down  */
+
+    sb = test_socket (AF_SP, NN_PUSH);
+    test_bind (sb, SOCKET_ADDRESS);
+
+    for (j = 0; j != TEST_LOOPS; ++j) {
+        for (i = 0; i != TEST2_THREAD_COUNT; ++i)
+            nn_thread_init (&threads [i], routine2, NULL);
+        nn_atomic_init(&active, TEST2_THREAD_COUNT);
+
+        while (active.n) {
+            (void) nn_send (sb, "hello", 5, NN_DONTWAIT);
+        }
+
+        for (i = 0; i != TEST2_THREAD_COUNT; ++i)
+            nn_thread_term (&threads [i]);
+        nn_atomic_term(&active);
+    }
+
+    test_close (sb);
+
+    return 0;
+}
+