| /* |
| Copyright (c) 2013 Martin Sustrik All rights reserved. |
| Copyright (c) 2013 GoPivotal, Inc. All rights reserved. |
| |
| Permission is hereby granted, free of charge, to any person obtaining a copy |
| of this software and associated documentation files (the "Software"), |
| to deal in the Software without restriction, including without limitation |
| the rights to use, copy, modify, merge, publish, distribute, sublicense, |
| and/or sell copies of the Software, and to permit persons to whom |
| the Software is furnished to do so, subject to the following conditions: |
| |
| The above copyright notice and this permission notice shall be included |
| in all copies or substantial portions of the Software. |
| |
| THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR |
| IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, |
| FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL |
| THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER |
| LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING |
| FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS |
| IN THE SOFTWARE. |
| */ |
| |
| #include "worker.h" |
| |
| #include "../utils/err.h" |
| #include "../utils/cont.h" |
| #include "../utils/alloc.h" |
| |
| #include <stddef.h> |
| #include <string.h> |
| #include <limits.h> |
| |
| #define NN_USOCK_STATE_IDLE 1 /* Initial state (set by nn_usock_init); restored when shutdown completes. */ |
| #define NN_USOCK_STATE_STARTING 2 /* Started; bind, listen, connect or being accepted may follow. */ |
| #define NN_USOCK_STATE_BEING_ACCEPTED 3 /* New socket whose accept is still in progress. */ |
| #define NN_USOCK_STATE_ACCEPTED 4 /* Accept finished; waiting for nn_usock_activate. */ |
| #define NN_USOCK_STATE_CONNECTING 5 /* Connect in progress. */ |
| #define NN_USOCK_STATE_ACTIVE 6 /* Established; nn_usock_send and nn_usock_recv require this state. */ |
| #define NN_USOCK_STATE_CANCELLING_IO 7 /* Failed while ACTIVE; waiting for outstanding I/O to go idle. */ |
| #define NN_USOCK_STATE_DONE 8 /* Underlying handle closed after an error. */ |
| #define NN_USOCK_STATE_LISTENING 9 /* Listener with no accept in progress. */ |
| #define NN_USOCK_STATE_ACCEPTING 10 /* Listener with an accept in progress. */ |
| #define NN_USOCK_STATE_CANCELLING 11 /* Listener cancelling an accept that is in progress. */ |
| #define NN_USOCK_STATE_STOPPING 12 /* Shutting down; waiting for outstanding I/O to go idle. */ |
| #define NN_USOCK_STATE_STOPPING_ACCEPT 13 /* Being-accepted socket waiting for its listener to cancel. */ |
| /* Actions passed to the state machine through nn_fsm_action. */ |
| #define NN_USOCK_ACTION_ACCEPT 1 |
| #define NN_USOCK_ACTION_BEING_ACCEPTED 2 |
| #define NN_USOCK_ACTION_CANCEL 3 |
| #define NN_USOCK_ACTION_LISTEN 4 |
| #define NN_USOCK_ACTION_CONNECT 5 |
| #define NN_USOCK_ACTION_ACTIVATE 6 |
| #define NN_USOCK_ACTION_DONE 7 |
| #define NN_USOCK_ACTION_ERROR 8 |
| /* Event source ids given to the inbound and outbound worker operations. */ |
| #define NN_USOCK_SRC_IN 1 |
| #define NN_USOCK_SRC_OUT 2 |
| |
| /* Private functions. nn_usock_handler and nn_usock_shutdown are the two |
| callbacks registered with nn_fsm_init; the others are helpers defined |
| later in this file. NOTE(review): nn_usock_open_pipe and |
| nn_usock_accept_pipe are declared without 'static' although they appear |
| under this heading - confirm nothing outside this file uses them. */ |
| static void nn_usock_handler (struct nn_fsm *self, int src, int type, |
| void *srcptr); |
| static void nn_usock_shutdown (struct nn_fsm *self, int src, int type, |
| void *srcptr); |
| static int nn_usock_cancel_io (struct nn_usock *self); |
| static void nn_usock_create_io_completion (struct nn_usock *self); |
| DWORD nn_usock_open_pipe (struct nn_usock *self, const char *name); |
| void nn_usock_accept_pipe (struct nn_usock *self, struct nn_usock *listener); |
| |
| void nn_usock_init (struct nn_usock *self, int src, struct nn_fsm *owner) |
| { |
| nn_fsm_init (&self->fsm, nn_usock_handler, nn_usock_shutdown, |
| src, self, owner); |
| self->state = NN_USOCK_STATE_IDLE; |
| self->s = INVALID_SOCKET; |
| self->isaccepted = 0; |
| nn_worker_op_init (&self->in, NN_USOCK_SRC_IN, &self->fsm); |
| nn_worker_op_init (&self->out, NN_USOCK_SRC_OUT, &self->fsm); |
| self->domain = -1; |
| self->type = -1; |
| self->protocol = -1; |
| |
| /* Intialise events raised by usock. */ |
| nn_fsm_event_init (&self->event_established); |
| nn_fsm_event_init (&self->event_sent); |
| nn_fsm_event_init (&self->event_received); |
| nn_fsm_event_init (&self->event_error); |
| |
| /* No accepting is going on at the moment. */ |
| self->asock = NULL; |
| self->ainfo = NULL; |
| |
| /* NamedPipe-related stuff. */ |
| memset (&self->pipename, 0, sizeof (self->pipename)); |
| self->pipesendbuf = NULL; |
| self->sec_attr = NULL; |
| |
| /* default size for both in and out buffers is 4096 */ |
| self->outbuffersz = 4096; |
| self->inbuffersz = 4096; |
| } |
| |
| void nn_usock_term (struct nn_usock *self) |
| { |
| nn_assert_state (self, NN_USOCK_STATE_IDLE); |
| |
| if (self->ainfo) |
| nn_free (self->ainfo); |
| if (self->pipesendbuf) |
| nn_free (self->pipesendbuf); |
| nn_fsm_event_term (&self->event_error); |
| nn_fsm_event_term (&self->event_received); |
| nn_fsm_event_term (&self->event_sent); |
| nn_fsm_event_term (&self->event_established); |
| nn_worker_op_term (&self->out); |
| nn_worker_op_term (&self->in); |
| nn_fsm_term (&self->fsm); |
| } |
| |
| int nn_usock_isidle (struct nn_usock *self) |
| { |
| return nn_fsm_isidle (&self->fsm); |
| } |
| |
| int nn_usock_start (struct nn_usock *self, int domain, int type, int protocol) |
| { |
| int rc; |
| #if defined IPV6_V6ONLY |
| DWORD only; |
| #endif |
| #if defined HANDLE_FLAG_INHERIT |
| BOOL brc; |
| #endif |
| |
| /* NamedPipes aren't sockets. They don't need all the socket |
| initialisation stuff. */ |
| if (domain != AF_UNIX) { |
| |
| /* Open the underlying socket. */ |
| self->s = socket (domain, type, protocol); |
| if (self->s == INVALID_SOCKET) |
| return -nn_err_wsa_to_posix (WSAGetLastError ()); |
| |
| /* Disable inheriting the socket to the child processes. */ |
| #if defined HANDLE_FLAG_INHERIT |
| brc = SetHandleInformation (self->p, HANDLE_FLAG_INHERIT, 0); |
| win_assert (brc); |
| #endif |
| |
| /* IPv4 mapping for IPv6 sockets is disabled by default. Switch it on. */ |
| #if defined IPV6_V6ONLY |
| if (domain == AF_INET6) { |
| only = 0; |
| rc = setsockopt (self->s, IPPROTO_IPV6, IPV6_V6ONLY, |
| (const char*) &only, sizeof (only)); |
| wsa_assert (rc != SOCKET_ERROR); |
| } |
| #endif |
| |
| /* Associate the socket with a worker thread/completion port. */ |
| nn_usock_create_io_completion (self); |
| } |
| |
| /* Remember the type of the socket. */ |
| self->domain = domain; |
| self->type = type; |
| self->protocol = protocol; |
| |
| /* Start the state machine. */ |
| nn_fsm_start (&self->fsm); |
| |
| return 0; |
| } |
| |
/*  Not implemented in this file: the call always trips nn_assert (0). */
void nn_usock_start_fd (struct nn_usock *self, int fd)
{
    (void) self;
    (void) fd;

    nn_assert (0);
}
| |
| void nn_usock_stop (struct nn_usock *self) |
| { |
| nn_fsm_stop (&self->fsm); |
| } |
| |
| void nn_usock_swap_owner (struct nn_usock *self, struct nn_fsm_owner *owner) |
| { |
| nn_fsm_swap_owner (&self->fsm, owner); |
| } |
| |
| int nn_usock_setsockopt (struct nn_usock *self, int level, int optname, |
| const void *optval, size_t optlen) |
| { |
| int rc; |
| |
| /* NamedPipes aren't sockets. We can't set socket options on them. |
| For now we'll ignore the options. */ |
| if (self->domain == AF_UNIX) |
| return 0; |
| |
| /* The socket can be modified only before it's active. */ |
| nn_assert (self->state == NN_USOCK_STATE_STARTING || |
| self->state == NN_USOCK_STATE_ACCEPTED); |
| |
| nn_assert (optlen < INT_MAX); |
| |
| rc = setsockopt (self->s, level, optname, (char*) optval, (int) optlen); |
| if (nn_slow (rc == SOCKET_ERROR)) |
| return -nn_err_wsa_to_posix (WSAGetLastError ()); |
| |
| return 0; |
| } |
| |
| int nn_usock_bind (struct nn_usock *self, const struct sockaddr *addr, |
| size_t addrlen) |
| { |
| int rc; |
| ULONG opt; |
| |
| /* In the case of named pipes, let's save the address |
| for the later use. */ |
| if (self->domain == AF_UNIX) { |
| if (addrlen > sizeof (struct sockaddr_un)) |
| return -EINVAL; |
| memcpy (&self->pipename, addr, addrlen); |
| return 0; |
| } |
| |
| /* You can set socket options only before the socket is connected. */ |
| nn_assert_state (self, NN_USOCK_STATE_STARTING); |
| |
| /* On Windows, the bound port can be hijacked |
| if SO_EXCLUSIVEADDRUSE is not set. */ |
| opt = 1; |
| rc = setsockopt (self->s, SOL_SOCKET, SO_EXCLUSIVEADDRUSE, |
| (const char*) &opt, sizeof (opt)); |
| wsa_assert (rc != SOCKET_ERROR); |
| |
| nn_assert (addrlen < INT_MAX); |
| rc = bind (self->s, addr, (int) addrlen); |
| if (nn_slow (rc == SOCKET_ERROR)) |
| return -nn_err_wsa_to_posix (WSAGetLastError ()); |
| |
| return 0; |
| } |
| |
| int nn_usock_listen (struct nn_usock *self, int backlog) |
| { |
| int rc; |
| |
| /* You can start listening only before the socket is connected. */ |
| nn_assert_state (self, NN_USOCK_STATE_STARTING); |
| |
| /* Start listening for incoming connections. NamedPipes are already |
| created in the listening state, so no need to do anything here. */ |
| if (self->domain != AF_UNIX) { |
| rc = listen (self->s, backlog); |
| if (nn_slow (rc == SOCKET_ERROR)) |
| return -nn_err_wsa_to_posix (WSAGetLastError ()); |
| } |
| |
| /* Notify the state machine. */ |
| nn_fsm_action (&self->fsm, NN_USOCK_ACTION_LISTEN); |
| |
| return 0; |
| } |
| |
| void nn_usock_accept (struct nn_usock *self, struct nn_usock *listener) |
| { |
| int rc; |
| BOOL brc; |
| DWORD nbytes; |
| |
| /* NamedPipes have their own accepting mechanism. */ |
| if (listener->domain == AF_UNIX) { |
| nn_usock_accept_pipe (self, listener); |
| return; |
| } |
| |
| rc = nn_usock_start (self, listener->domain, listener->type, |
| listener->protocol); |
| /* TODO: EMFILE can be returned here. */ |
| errnum_assert (rc == 0, -rc); |
| nn_fsm_action (&listener->fsm, NN_USOCK_ACTION_ACCEPT); |
| nn_fsm_action (&self->fsm, NN_USOCK_ACTION_BEING_ACCEPTED); |
| |
| /* If the memory for accept information is not yet allocated, do so. */ |
| if (!listener->ainfo) { |
| listener->ainfo = nn_alloc (512, "accept info"); |
| alloc_assert (listener->ainfo); |
| } |
| |
| /* Wait for the incoming connection. */ |
| memset (&listener->in.olpd, 0, sizeof (listener->in.olpd)); |
| brc = AcceptEx (listener->s, self->s, listener->ainfo, 0, 256, 256, &nbytes, |
| &listener->in.olpd); |
| |
| /* Immediate success. */ |
| if (nn_fast (brc == TRUE)) { |
| nn_fsm_action (&listener->fsm, NN_USOCK_ACTION_DONE); |
| nn_fsm_action (&self->fsm, NN_USOCK_ACTION_DONE); |
| return; |
| } |
| |
| /* We don't expect a synchronous failure at this point. */ |
| wsa_assert (nn_slow (WSAGetLastError () == WSA_IO_PENDING)); |
| |
| /* Pair the two sockets. */ |
| nn_assert (!self->asock); |
| self->asock = listener; |
| nn_assert (!listener->asock); |
| listener->asock = self; |
| |
| /* Asynchronous accept. */ |
| nn_worker_op_start (&listener->in, 0); |
| } |
| |
| void nn_usock_activate (struct nn_usock *self) |
| { |
| nn_fsm_action (&self->fsm, NN_USOCK_ACTION_ACTIVATE); |
| } |
| |
| void nn_usock_connect (struct nn_usock *self, const struct sockaddr *addr, |
| size_t addrlen) |
| { |
| BOOL brc; |
| GUID fid = WSAID_CONNECTEX; |
| LPFN_CONNECTEX pconnectex; |
| DWORD nbytes; |
| DWORD winerror; |
| |
| /* Fail if the socket is already connected, closed or such. */ |
| nn_assert_state (self, NN_USOCK_STATE_STARTING); |
| |
| /* Notify the state machine that we've started connecting. */ |
| nn_fsm_action (&self->fsm, NN_USOCK_ACTION_CONNECT); |
| |
| memset (&self->out.olpd, 0, sizeof (self->out.olpd)); |
| |
| if (self->domain == AF_UNIX) { |
| winerror = nn_usock_open_pipe (self, ((struct sockaddr_un*) addr)->sun_path); |
| } |
| else |
| { |
| /* Get the pointer to connect function. */ |
| brc = WSAIoctl (self->s, SIO_GET_EXTENSION_FUNCTION_POINTER, |
| &fid, sizeof (fid), &pconnectex, sizeof (pconnectex), |
| &nbytes, NULL, NULL) == 0; |
| wsa_assert (brc == TRUE); |
| nn_assert (nbytes == sizeof (pconnectex)); |
| |
| /* Ensure it is safe to cast this value to what might be a smaller |
| integer type to conform to the pconnectex function signature. */ |
| nn_assert (addrlen < INT_MAX); |
| |
| /* Connect itself. */ |
| brc = pconnectex (self->s, addr, (int) addrlen, NULL, 0, NULL, |
| &self->out.olpd); |
| winerror = brc ? ERROR_SUCCESS : WSAGetLastError (); |
| } |
| |
| /* Immediate success. */ |
| if (nn_fast (winerror == ERROR_SUCCESS)) { |
| nn_fsm_action (&self->fsm, NN_USOCK_ACTION_DONE); |
| return; |
| } |
| |
| /* Immediate error. */ |
| if (nn_slow (winerror != WSA_IO_PENDING)) { |
| nn_fsm_action (&self->fsm, NN_USOCK_ACTION_ERROR); |
| return; |
| } |
| |
| /* Asynchronous connect. */ |
| nn_worker_op_start (&self->out, 0); |
| } |
| |
| void nn_usock_send (struct nn_usock *self, const struct nn_iovec *iov, |
| int iovcnt) |
| { |
| int rc; |
| BOOL brc; |
| WSABUF wbuf [NN_USOCK_MAX_IOVCNT]; |
| int i; |
| size_t len; |
| size_t idx; |
| DWORD error; |
| |
| /* Make sure that the socket is actually alive. */ |
| nn_assert_state (self, NN_USOCK_STATE_ACTIVE); |
| |
| /* Create a WinAPI-style iovec. */ |
| len = 0; |
| nn_assert (iovcnt <= NN_USOCK_MAX_IOVCNT); |
| for (i = 0; i != iovcnt; ++i) { |
| wbuf [i].buf = (char FAR*) iov [i].iov_base; |
| wbuf [i].len = (ULONG) iov [i].iov_len; |
| len += iov [i].iov_len; |
| } |
| |
| /* Start the send operation. */ |
| memset (&self->out.olpd, 0, sizeof (self->out.olpd)); |
| if (self->domain == AF_UNIX) |
| { |
| /* TODO: Do not copy the buffer, find an efficent way to Write |
| multiple buffers that doesn't affect the state machine. */ |
| |
| /* Ensure the total buffer size does not exceed size limitation |
| of WriteFile. */ |
| nn_assert (len <= MAXDWORD); |
| |
| nn_assert (!self->pipesendbuf); |
| self->pipesendbuf = nn_alloc (len, "named pipe sendbuf"); |
| |
| idx = 0; |
| for (i = 0; i != iovcnt; ++i) { |
| memcpy ((char*)(self->pipesendbuf) + idx, iov [i].iov_base, iov [i].iov_len); |
| idx += iov [i].iov_len; |
| } |
| brc = WriteFile (self->p, self->pipesendbuf, (DWORD) len, NULL, &self->out.olpd); |
| if (nn_fast (brc || GetLastError() == ERROR_IO_PENDING)) { |
| nn_worker_op_start (&self->out, 0); |
| return; |
| } |
| error = GetLastError(); |
| win_assert (error == ERROR_NO_DATA); |
| self->errnum = EINVAL; |
| nn_fsm_action (&self->fsm, NN_USOCK_ACTION_ERROR); |
| return; |
| } |
| |
| rc = WSASend (self->s, wbuf, iovcnt, NULL, 0, &self->out.olpd, NULL); |
| if (nn_fast (rc == 0)) { |
| nn_worker_op_start (&self->out, 0); |
| return; |
| } |
| error = WSAGetLastError(); |
| if (nn_fast (error == WSA_IO_PENDING)) { |
| nn_worker_op_start (&self->out, 0); |
| return; |
| } |
| wsa_assert (error == WSAECONNABORTED || error == WSAECONNRESET || |
| error == WSAENETDOWN || error == WSAENETRESET || |
| error == WSAENOBUFS || error == WSAEWOULDBLOCK); |
| self->errnum = nn_err_wsa_to_posix (error); |
| nn_fsm_action (&self->fsm, NN_USOCK_ACTION_ERROR); |
| } |
| |
| void nn_usock_recv (struct nn_usock *self, void *buf, size_t len, int *fd) |
| { |
| int rc; |
| BOOL brc; |
| WSABUF wbuf; |
| DWORD wflags; |
| DWORD error; |
| |
| /* Passing file descriptors is not implemented on Windows platform. */ |
| if (fd) |
| *fd = -1; |
| |
| /* Make sure that the socket is actually alive. */ |
| nn_assert_state (self, NN_USOCK_STATE_ACTIVE); |
| |
| /* Start the receive operation. */ |
| wbuf.len = (ULONG) len; |
| wbuf.buf = (char FAR*) buf; |
| wflags = MSG_WAITALL; |
| memset (&self->in.olpd, 0, sizeof (self->in.olpd)); |
| |
| if (self->domain == AF_UNIX) { |
| |
| /* Ensure the total buffer size does not exceed size limitation |
| of WriteFile. */ |
| nn_assert (len <= MAXDWORD); |
| brc = ReadFile (self->p, buf, (DWORD) len, NULL, &self->in.olpd); |
| error = brc ? ERROR_SUCCESS : GetLastError (); |
| } |
| else { |
| rc = WSARecv (self->s, &wbuf, 1, NULL, &wflags, &self->in.olpd, NULL); |
| error = (rc == 0) ? ERROR_SUCCESS : WSAGetLastError (); |
| } |
| |
| if (nn_fast (error == ERROR_SUCCESS)) { |
| nn_worker_op_start (&self->in, 1); |
| return; |
| } |
| |
| if (nn_fast (error == WSA_IO_PENDING)) { |
| nn_worker_op_start (&self->in, 1); |
| return; |
| } |
| |
| if (error == WSAECONNABORTED || error == WSAECONNRESET || |
| error == WSAENETDOWN || error == WSAENETRESET || |
| error == WSAETIMEDOUT || error == WSAEWOULDBLOCK || |
| error == ERROR_PIPE_NOT_CONNECTED || error == ERROR_BROKEN_PIPE) { |
| nn_fsm_action (&self->fsm, NN_USOCK_ACTION_ERROR); |
| return; |
| } |
| |
| wsa_assert (0); |
| } |
| |
| static void nn_usock_create_io_completion (struct nn_usock *self) |
| { |
| struct nn_worker *worker; |
| HANDLE cp; |
| |
| /* Associate the socket with a worker thread/completion port. */ |
| worker = nn_fsm_choose_worker (&self->fsm); |
| cp = CreateIoCompletionPort ( |
| self->p, |
| nn_worker_getcp(worker), |
| (ULONG_PTR) NULL, |
| 0); |
| nn_assert(cp); |
| } |
| |
| static void nn_usock_create_pipe (struct nn_usock *self, const char *name) |
| { |
| char fullname [256]; |
| /* First, create a fully qualified name for the named pipe. */ |
| _snprintf(fullname, sizeof (fullname), "\\\\.\\pipe\\%s", name); |
| |
| self->p = CreateNamedPipeA ( |
| (char*) fullname, |
| PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED, |
| PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | |
| PIPE_WAIT | PIPE_REJECT_REMOTE_CLIENTS, |
| PIPE_UNLIMITED_INSTANCES, |
| self->outbuffersz, |
| self->inbuffersz, |
| 0, |
| self->sec_attr); |
| |
| /* TODO: How to properly handle self->p == INVALID_HANDLE_VALUE? */ |
| win_assert (self->p != INVALID_HANDLE_VALUE); |
| |
| self->isaccepted = 1; |
| nn_usock_create_io_completion (self); |
| } |
| |
| DWORD nn_usock_open_pipe (struct nn_usock *self, const char *name) |
| { |
| char fullname [256]; |
| DWORD winerror; |
| DWORD mode; |
| BOOL brc; |
| |
| /* First, create a fully qualified name for the named pipe. */ |
| _snprintf(fullname, sizeof (fullname), "\\\\.\\pipe\\%s", name); |
| |
| self->p = CreateFileA ( |
| fullname, |
| GENERIC_READ | GENERIC_WRITE, |
| 0, |
| self->sec_attr, |
| OPEN_ALWAYS, |
| FILE_FLAG_OVERLAPPED, |
| NULL); |
| |
| if (self->p == INVALID_HANDLE_VALUE) |
| return GetLastError (); |
| |
| mode = PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT; |
| brc = SetNamedPipeHandleState ( |
| self->p, |
| &mode, |
| NULL, |
| NULL); |
| if (!brc) { |
| CloseHandle (self->p); |
| self->p = INVALID_HANDLE_VALUE; |
| return GetLastError (); |
| } |
| self->isaccepted = 0; |
| nn_usock_create_io_completion (self); |
| |
| winerror = GetLastError (); |
| if (winerror != ERROR_SUCCESS && winerror != ERROR_ALREADY_EXISTS) |
| return winerror; |
| |
| return ERROR_SUCCESS; |
| } |
| |
| void nn_usock_accept_pipe (struct nn_usock *self, struct nn_usock *listener) |
| { |
| int rc; |
| BOOL brc; |
| DWORD winerror; |
| |
| /* TODO: EMFILE can be returned here. */ |
| rc = nn_usock_start (self, listener->domain, listener->type, |
| listener->protocol); |
| errnum_assert(rc == 0, -rc); |
| |
| nn_fsm_action(&listener->fsm, NN_USOCK_ACTION_ACCEPT); |
| nn_fsm_action(&self->fsm, NN_USOCK_ACTION_BEING_ACCEPTED); |
| |
| /* If the memory for accept information is not yet allocated, do so now. */ |
| if (!listener->ainfo) { |
| listener->ainfo = nn_alloc (512, "accept info"); |
| alloc_assert (listener->ainfo); |
| } |
| |
| /* Wait for the incoming connection. */ |
| memset (&listener->in.olpd, 0, sizeof(listener->in.olpd)); |
| nn_usock_create_pipe (self, listener->pipename.sun_path); |
| brc = ConnectNamedPipe (self->p, (LPOVERLAPPED) &listener->in.olpd); |
| |
| /* TODO: Can this function possibly succeed? */ |
| nn_assert (brc == 0); |
| winerror = GetLastError(); |
| |
| /* Immediate success. */ |
| if (nn_fast (winerror == ERROR_PIPE_CONNECTED)) { |
| nn_fsm_action (&listener->fsm, NN_USOCK_ACTION_DONE); |
| nn_fsm_action (&self->fsm, NN_USOCK_ACTION_DONE); |
| return; |
| } |
| |
| /* We don't expect a synchronous failure at this point. */ |
| wsa_assert (nn_slow (winerror == WSA_IO_PENDING)); |
| |
| /* Pair the two sockets. */ |
| nn_assert (!self->asock); |
| self->asock = listener; |
| nn_assert (!listener->asock); |
| listener->asock = self; |
| |
| /* Asynchronous accept. */ |
| nn_worker_op_start (&listener->in, 0); |
| } |
| |
| static void nn_usock_close (struct nn_usock *self) |
| { |
| int rc; |
| BOOL brc; |
| |
| if (self->domain == AF_UNIX) { |
| if (self->p == INVALID_HANDLE_VALUE) |
| return; |
| if (self->isaccepted) |
| DisconnectNamedPipe(self->p); |
| brc = CloseHandle (self->p); |
| self->p = INVALID_HANDLE_VALUE; |
| win_assert (brc); |
| } |
| else |
| { |
| rc = closesocket (self->s); |
| self->s = INVALID_SOCKET; |
| wsa_assert (rc == 0); |
| } |
| } |
| |
| static void nn_usock_shutdown (struct nn_fsm *self, int src, int type, |
| void *srcptr) |
| { |
| struct nn_usock *usock; |
| |
| usock = nn_cont (self, struct nn_usock, fsm); |
| |
| if (nn_slow (src == NN_FSM_ACTION && type == NN_FSM_STOP)) { |
| |
| /* Socket in ACCEPTING state cannot be closed. |
| Stop the socket being accepted first. */ |
| nn_assert (usock->state != NN_USOCK_STATE_ACCEPTING); |
| |
| /* Synchronous stop. */ |
| if (usock->state == NN_USOCK_STATE_IDLE) |
| goto finish3; |
| if (usock->state == NN_USOCK_STATE_DONE) |
| goto finish2; |
| if (usock->state == NN_USOCK_STATE_STARTING || |
| usock->state == NN_USOCK_STATE_ACCEPTED || |
| usock->state == NN_USOCK_STATE_LISTENING) |
| goto finish1; |
| |
| /* When socket that's being accepted is asked to stop, we have to |
| ask the listener socket to stop accepting first. */ |
| if (usock->state == NN_USOCK_STATE_BEING_ACCEPTED) { |
| nn_fsm_action (&usock->asock->fsm, NN_USOCK_ACTION_CANCEL); |
| usock->state = NN_USOCK_STATE_STOPPING_ACCEPT; |
| return; |
| } |
| |
| /* If we were already in the process of cancelling overlapped |
| operations, we don't have to do anything. Continue waiting |
| till cancelling is finished. */ |
| if (usock->state == NN_USOCK_STATE_CANCELLING_IO) { |
| usock->state = NN_USOCK_STATE_STOPPING; |
| return; |
| } |
| |
| /* Notify our parent that pipe socket is shutting down */ |
| nn_fsm_raise (&usock->fsm, &usock->event_error, NN_USOCK_SHUTDOWN); |
| |
| /* In all remaining states we'll simply cancel all overlapped |
| operations. */ |
| if (nn_usock_cancel_io (usock) == 0) |
| goto finish1; |
| usock->state = NN_USOCK_STATE_STOPPING; |
| return; |
| } |
| if (nn_slow (usock->state == NN_USOCK_STATE_STOPPING_ACCEPT)) { |
| nn_assert (src == NN_FSM_ACTION && type == NN_USOCK_ACTION_DONE); |
| goto finish1; |
| } |
| if (nn_slow (usock->state == NN_USOCK_STATE_STOPPING)) { |
| if (!nn_worker_op_isidle (&usock->in) || |
| !nn_worker_op_isidle (&usock->out)) |
| return; |
| finish1: |
| nn_usock_close(usock); |
| finish2: |
| usock->state = NN_USOCK_STATE_IDLE; |
| nn_fsm_stopped (&usock->fsm, NN_USOCK_STOPPED); |
| finish3: |
| return; |
| } |
| |
| nn_fsm_bad_state(usock->state, src, type); |
| } |
| |
| static void nn_usock_handler (struct nn_fsm *self, int src, int type, |
| void *srcptr) |
| { |
| struct nn_usock *usock; |
| /* Event handler of the usock state machine (registered in nn_usock_init). |
| It dispatches first on the current state, then on the event source |
| ('src') and finally on the event type ('type'). Unexpected sources and |
| types are passed to nn_fsm_bad_source / nn_fsm_bad_action; the inner |
| switches have no 'break' after those calls, so the code relies on them |
| not returning (presumably they abort - TODO confirm). */ |
| usock = nn_cont (self, struct nn_usock, fsm); |
| |
| switch (usock->state) { |
| |
| /*****************************************************************************/ |
| /* IDLE state. |
| Only NN_FSM_START is accepted; it moves the socket to STARTING. */ |
| /*****************************************************************************/ |
| case NN_USOCK_STATE_IDLE: |
| switch (src) { |
| case NN_FSM_ACTION: |
| switch (type) { |
| case NN_FSM_START: |
| usock->state = NN_USOCK_STATE_STARTING; |
| return; |
| default: |
| nn_fsm_bad_action (usock->state, src, type); |
| } |
| default: |
| nn_fsm_bad_source (usock->state, src, type); |
| } |
| |
| /*****************************************************************************/ |
| /* STARTING state. |
| The socket becomes a listener, starts connecting, or starts being |
| accepted, depending on the action received. */ |
| /*****************************************************************************/ |
| case NN_USOCK_STATE_STARTING: |
| switch (src) { |
| case NN_FSM_ACTION: |
| switch (type) { |
| case NN_USOCK_ACTION_LISTEN: |
| usock->state = NN_USOCK_STATE_LISTENING; |
| return; |
| case NN_USOCK_ACTION_CONNECT: |
| usock->state = NN_USOCK_STATE_CONNECTING; |
| return; |
| case NN_USOCK_ACTION_BEING_ACCEPTED: |
| usock->state = NN_USOCK_STATE_BEING_ACCEPTED; |
| return; |
| default: |
| nn_fsm_bad_action (usock->state, src, type); |
| } |
| default: |
| nn_fsm_bad_source (usock->state, src, type); |
| } |
| |
| /*****************************************************************************/ |
| /* BEING_ACCEPTED state. |
| NN_USOCK_ACTION_DONE (sent when the accept completes immediately) |
| moves the socket to ACCEPTED and raises NN_USOCK_ACCEPTED. */ |
| /*****************************************************************************/ |
| case NN_USOCK_STATE_BEING_ACCEPTED: |
| switch (src) { |
| case NN_FSM_ACTION: |
| switch (type) { |
| case NN_USOCK_ACTION_DONE: |
| usock->state = NN_USOCK_STATE_ACCEPTED; |
| nn_fsm_raise (&usock->fsm, &usock->event_established, |
| NN_USOCK_ACCEPTED); |
| return; |
| default: |
| nn_fsm_bad_action (usock->state, src, type); |
| } |
| default: |
| nn_fsm_bad_source (usock->state, src, type); |
| } |
| |
| /*****************************************************************************/ |
| /* ACCEPTED state. |
| Waits for NN_USOCK_ACTION_ACTIVATE (see nn_usock_activate). */ |
| /*****************************************************************************/ |
| case NN_USOCK_STATE_ACCEPTED: |
| switch (src) { |
| case NN_FSM_ACTION: |
| switch (type) { |
| case NN_USOCK_ACTION_ACTIVATE: |
| usock->state = NN_USOCK_STATE_ACTIVE; |
| return; |
| default: |
| nn_fsm_bad_action (usock->state, src, type); |
| } |
| default: |
| nn_fsm_bad_source (usock->state, src, type); |
| } |
| |
| /*****************************************************************************/ |
| /* CONNECTING state. |
| The result arrives either as an action (nn_usock_connect finished |
| synchronously) or from the outbound worker operation (asynchronous |
| completion). Success raises NN_USOCK_CONNECTED and activates the socket; |
| failure closes the handle, moves to DONE and raises NN_USOCK_ERROR. */ |
| /*****************************************************************************/ |
| case NN_USOCK_STATE_CONNECTING: |
| switch (src) { |
| case NN_FSM_ACTION: |
| switch (type) { |
| case NN_USOCK_ACTION_DONE: |
| usock->state = NN_USOCK_STATE_ACTIVE; |
| nn_fsm_raise (&usock->fsm, &usock->event_established, |
| NN_USOCK_CONNECTED); |
| return; |
| case NN_USOCK_ACTION_ERROR: |
| nn_usock_close(usock); |
| usock->state = NN_USOCK_STATE_DONE; |
| nn_fsm_raise (&usock->fsm, &usock->event_error, NN_USOCK_ERROR); |
| return; |
| default: |
| nn_fsm_bad_action (usock->state, src, type); |
| } |
| case NN_USOCK_SRC_OUT: |
| switch (type) { |
| case NN_WORKER_OP_DONE: |
| usock->state = NN_USOCK_STATE_ACTIVE; |
| nn_fsm_raise (&usock->fsm, &usock->event_established, |
| NN_USOCK_CONNECTED); |
| return; |
| case NN_WORKER_OP_ERROR: |
| nn_usock_close(usock); |
| usock->state = NN_USOCK_STATE_DONE; |
| nn_fsm_raise (&usock->fsm, &usock->event_error, NN_USOCK_ERROR); |
| return; |
| default: |
| nn_fsm_bad_action (usock->state, src, type); |
| } |
| default: |
| nn_fsm_bad_source (usock->state, src, type); |
| } |
| |
| /*****************************************************************************/ |
| /* ACTIVE state. |
| Completed receives and sends are reported via event_received and |
| event_sent. On any failure the outstanding I/O is cancelled: if nothing |
| was pending, the error is raised and the handle closed right away |
| (state DONE); otherwise the socket waits in CANCELLING_IO. */ |
| /*****************************************************************************/ |
| case NN_USOCK_STATE_ACTIVE: |
| switch (src) { |
| case NN_USOCK_SRC_IN: |
| switch (type) { |
| case NN_WORKER_OP_DONE: |
| nn_fsm_raise (&usock->fsm, &usock->event_received, |
| NN_USOCK_RECEIVED); |
| return; |
| case NN_WORKER_OP_ERROR: |
| if (nn_usock_cancel_io (usock) == 0) { |
| nn_fsm_raise(&usock->fsm, &usock->event_error, |
| NN_USOCK_ERROR); |
| nn_usock_close (usock); |
| usock->state = NN_USOCK_STATE_DONE; |
| return; |
| } |
| usock->state = NN_USOCK_STATE_CANCELLING_IO; |
| return; |
| default: |
| nn_fsm_bad_action (usock->state, src, type); |
| } |
| case NN_USOCK_SRC_OUT: |
| switch (type) { |
| case NN_WORKER_OP_DONE: |
| if (usock->pipesendbuf) { /* Staging copy made by nn_usock_send for named pipes. */ |
| nn_free(usock->pipesendbuf); |
| usock->pipesendbuf = NULL; |
| } |
| nn_fsm_raise (&usock->fsm, &usock->event_sent, NN_USOCK_SENT); |
| return; |
| case NN_WORKER_OP_ERROR: |
| if (nn_usock_cancel_io (usock) == 0) { |
| nn_fsm_raise(&usock->fsm, &usock->event_error, |
| NN_USOCK_ERROR); |
| nn_usock_close(usock); |
| usock->state = NN_USOCK_STATE_DONE; |
| return; |
| } |
| usock->state = NN_USOCK_STATE_CANCELLING_IO; |
| return; |
| default: |
| nn_fsm_bad_action (usock->state, src, type); |
| } |
| case NN_FSM_ACTION: |
| switch (type) { |
| case NN_USOCK_ACTION_ERROR: /* Synchronous failure reported by nn_usock_send or nn_usock_recv. */ |
| if (nn_usock_cancel_io (usock) == 0) { |
| nn_fsm_raise(&usock->fsm, &usock->event_error, |
| NN_USOCK_SHUTDOWN); |
| nn_usock_close(usock); |
| usock->state = NN_USOCK_STATE_DONE; |
| return; |
| } |
| usock->state = NN_USOCK_STATE_CANCELLING_IO; |
| return; |
| default: |
| nn_fsm_bad_action (usock->state, src, type); |
| } |
| default: |
| nn_fsm_bad_source (usock->state, src, type); |
| } |
| |
| /*****************************************************************************/ |
| /* CANCELLING_IO state. |
| Events from either worker operation are ignored until both operations |
| are idle; then NN_USOCK_SHUTDOWN is raised on event_error, the handle is |
| closed and the socket moves to DONE. */ |
| /*****************************************************************************/ |
| case NN_USOCK_STATE_CANCELLING_IO: |
| switch (src) { |
| case NN_USOCK_SRC_IN: |
| case NN_USOCK_SRC_OUT: |
| if (!nn_worker_op_isidle (&usock->in) || |
| !nn_worker_op_isidle (&usock->out)) |
| return; |
| nn_fsm_raise(&usock->fsm, &usock->event_error, NN_USOCK_SHUTDOWN); |
| nn_usock_close(usock); |
| usock->state = NN_USOCK_STATE_DONE; |
| return; |
| default: |
| nn_fsm_bad_source (usock->state, src, type); |
| } |
| |
| /*****************************************************************************/ |
| /* DONE state. |
| No event is expected here; everything is reported as a bad source. */ |
| /*****************************************************************************/ |
| case NN_USOCK_STATE_DONE: |
| nn_fsm_bad_source (usock->state, src, type); |
| |
| /*****************************************************************************/ |
| /* LISTENING state. |
| NN_USOCK_ACTION_ACCEPT (sent when an accept is started on this |
| listener) moves it to ACCEPTING. */ |
| /*****************************************************************************/ |
| case NN_USOCK_STATE_LISTENING: |
| switch (src) { |
| case NN_FSM_ACTION: |
| switch (type) { |
| case NN_USOCK_ACTION_ACCEPT: |
| usock->state = NN_USOCK_STATE_ACCEPTING; |
| return; |
| default: |
| nn_fsm_bad_action (usock->state, src, type); |
| } |
| default: |
| nn_fsm_bad_source (usock->state, src, type); |
| } |
| |
| /*****************************************************************************/ |
| /* ACCEPTING state. |
| The listener has an accept in progress. It ends with an immediate |
| completion (action DONE), an asynchronous completion (inbound worker |
| operation) or a cancellation request from the socket being accepted. */ |
| /*****************************************************************************/ |
| case NN_USOCK_STATE_ACCEPTING: |
| switch (src) { |
| case NN_FSM_ACTION: |
| switch (type) { |
| case NN_USOCK_ACTION_DONE: |
| usock->state = NN_USOCK_STATE_LISTENING; |
| return; |
| case NN_USOCK_ACTION_CANCEL: /* For a named-pipe listener without a handle of its own, the handle of the socket being accepted is borrowed for the duration of the cancel call. */ |
| if (usock->p == INVALID_HANDLE_VALUE && usock->asock != NULL && usock->domain == AF_UNIX) { |
| usock->p = usock->asock->p; |
| nn_usock_cancel_io (usock); |
| usock->p = INVALID_HANDLE_VALUE; |
| } |
| else |
| { |
| nn_usock_cancel_io(usock); |
| } |
| usock->state = NN_USOCK_STATE_CANCELLING; |
| return; |
| default: |
| nn_fsm_bad_action (usock->state, src, type); |
| } |
| case NN_USOCK_SRC_IN: |
| switch (type) { |
| case NN_WORKER_OP_DONE: |
| |
| /* Adjust the new usock object. Its state is set directly rather than |
| through an action sent to its own state machine. */ |
| usock->asock->state = NN_USOCK_STATE_ACCEPTED; |
| |
| /* Notify the user that connection was accepted. */ |
| nn_fsm_raise (&usock->asock->fsm, |
| &usock->asock->event_established, NN_USOCK_ACCEPTED); |
| |
| /* Disassociate the listener socket from the accepted |
| socket. */ |
| usock->asock->asock = NULL; |
| usock->asock = NULL; |
| |
| /* Wait till the user starts accepting once again. */ |
| usock->state = NN_USOCK_STATE_LISTENING; |
| |
| return; |
| |
| default: |
| nn_fsm_bad_action (usock->state, src, type); |
| } |
| default: |
| nn_fsm_bad_source (usock->state, src, type); |
| } |
| |
| /*****************************************************************************/ |
| /* CANCELLING state. |
| The listener waits for the cancelled accept to finish, whether it |
| completes or fails, then returns to LISTENING and tells the socket that |
| was being accepted that it may finish stopping. */ |
| /*****************************************************************************/ |
| case NN_USOCK_STATE_CANCELLING: |
| switch (src) { |
| case NN_USOCK_SRC_IN: |
| switch (type) { |
| case NN_WORKER_OP_DONE: |
| case NN_WORKER_OP_ERROR: |
| |
| /* TODO: The socket being accepted should be closed here. */ |
| |
| usock->state = NN_USOCK_STATE_LISTENING; |
| |
| /* Notify the accepted socket that it was stopped. |
| NOTE(review): the asock links of both sockets are left set here, |
| unlike in the ACCEPTING completion path - confirm this is intended. */ |
| nn_fsm_action (&usock->asock->fsm, NN_USOCK_ACTION_DONE); |
| |
| return; |
| |
| default: |
| nn_fsm_bad_action (usock->state, src, type); |
| } |
| default: |
| nn_fsm_bad_source (usock->state, src, type); |
| } |
| |
| /*****************************************************************************/ |
| /* Invalid state. */ |
| /*****************************************************************************/ |
| default: |
| nn_fsm_bad_state (usock->state, src, type); |
| } |
| } |
| |
| /*****************************************************************************/ |
| /* State machine actions. */ |
| /*****************************************************************************/ |
| |
| /* Returns 0 if there's nothing to cancel or 1 otherwise. */ |
| static int nn_usock_cancel_io (struct nn_usock *self) |
| { |
| int rc; |
| BOOL brc; |
| |
| /* For some reason simple CancelIo doesn't seem to work here. |
| We have to use CancelIoEx instead. */ |
| rc = 0; |
| if (!nn_worker_op_isidle (&self->in)) { |
| brc = CancelIoEx (self->p, &self->in.olpd); |
| win_assert (brc || GetLastError () == ERROR_NOT_FOUND); |
| rc = 1; |
| } |
| if (!nn_worker_op_isidle (&self->out)) { |
| brc = CancelIoEx (self->p, &self->out.olpd); |
| win_assert (brc || GetLastError () == ERROR_NOT_FOUND); |
| rc = 1; |
| } |
| |
| return rc; |
| } |