AIO subsystem removed to be refactorised from scratch
Signed-off-by: Martin Sustrik <sustrik@250bpm.com>
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index c546466..fe5ea76 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -37,8 +37,6 @@
core/sock.c
utils/addr.h
utils/addr.c
- utils/aio.h
- utils/aio.c
utils/alloc.h
utils/alloc.c
utils/clock.h
diff --git a/src/core/ep.c b/src/core/ep.c
index 6afc2d3..8270cd5 100644
--- a/src/core/ep.c
+++ b/src/core/ep.c
@@ -55,8 +55,3 @@
return epbase->vfptr->close (epbase, linger);
}
-struct sp_cp *sp_epbase_getcp (struct sp_epbase *self)
-{
- return sp_sock_getcp (self->sock);
-}
-
diff --git a/src/core/sock.c b/src/core/sock.c
index ec4b36e..7b1c88d 100644
--- a/src/core/sock.c
+++ b/src/core/sock.c
@@ -28,27 +28,13 @@
#include "../utils/err.h"
#include "../utils/cont.h"
-#define SP_SOCK_OP_STOP 1
-#define SP_SOCK_OP_TIMERS 2
-#define SP_SOCK_OP_IN 3
-#define SP_SOCK_OP_OUT 4
-
-/* Private functions. */
-void sp_sockbase_timer_routine (struct sp_timer *self);
-static void sp_sock_worker_routine (void *arg);
-
void sp_sockbase_init (struct sp_sockbase *self,
const struct sp_sockbase_vfptr *vfptr, int fd)
{
self->vfptr = vfptr;
sp_mutex_init (&self->sync, 0);
sp_cond_init (&self->cond);
- self->fd = fd;
- sp_clock_init (&self->clock);
- sp_list_init (&self->timers);
- sp_cp_init (&self->cp);
- sp_thread_init (&self->worker, sp_sock_worker_routine, self);
-}
+ self->fd = fd;}
void sp_sock_term (struct sp_sock *self)
{
@@ -59,13 +45,6 @@
/* Terminate the derived class. */
sockbase->vfptr->term (sockbase);
- /* Ask the worker thread to terminate and wait till it does. */
- sp_cp_post (&sockbase->cp, SP_SOCK_OP_STOP, NULL);
- sp_thread_term (&sockbase->worker);
- sp_cp_term (&sockbase->cp);
- sp_list_term (&sockbase->timers);
- sp_clock_term (&sockbase->clock);
-
/* Terminate the sp_sockbase itself. */
sp_cond_term (&sockbase->cond);
sp_mutex_term (&sockbase->sync);
@@ -239,144 +218,11 @@
void sp_sock_in (struct sp_sock *self, struct sp_pipe *pipe)
{
- sp_cp_post (&((struct sp_sockbase*) self)->cp, SP_SOCK_OP_IN, pipe);
+ sp_assert (0);
}
void sp_sock_out (struct sp_sock *self, struct sp_pipe *pipe)
{
- sp_cp_post (&((struct sp_sockbase*) self)->cp, SP_SOCK_OP_OUT, pipe);
-}
-
-void sp_timer_start (struct sp_timer *self, struct sp_sockbase *sockbase,
- int timeout, void (*fn) (struct sp_timer *self))
-{
- struct sp_list_item *it;
- struct sp_timer *itt;
-
- /* No need to lock the socket mutex here as the function is always
- called from within the socket. I.e. the mutex is already locked. */
- self->timeout = sp_clock_now (&sockbase->clock) + timeout;
- self->fn = fn;
- for (it = sp_list_begin (&sockbase->timers);
- it != sp_list_end (&sockbase->timers);
- it = sp_list_next (&sockbase->timers, it)) {
- itt = sp_cont (it, struct sp_timer, list);
- if (self->timeout < itt->timeout)
- break;
- }
- sp_list_insert (&sockbase->timers, &self->list, it);
- if (&self->list == sp_list_begin (&sockbase->timers))
- sp_cp_post (&sockbase->cp, SP_SOCK_OP_TIMERS, NULL);
-}
-
-void sp_timer_cancel (struct sp_timer *self, struct sp_sockbase *sockbase)
-{
- int signal;
-
- /* No need to lock the socket mutex here as the function is always
- called from within the socket. I.e. the mutex is already locked. */
- signal = (&self->list == sp_list_begin (&sockbase->timers)) ? 1 : 0;
- sp_list_erase (&sockbase->timers, &self->list);
- if (signal)
- sp_cp_post (&sockbase->cp, SP_SOCK_OP_TIMERS, NULL);
-}
-
-static void sp_sock_worker_routine (void *arg)
-{
- int rc;
- int timeout;
- uint64_t now;
- struct sp_sockbase *sockbase;
- struct sp_timer *timer;
- int op;
- struct sp_usock *usock;
- void *oparg;
-
- sockbase = (struct sp_sockbase*) arg;
-
- /* Get the current time. */
- now = sp_clock_now (&sockbase->clock);
-
- sp_mutex_lock (&sockbase->sync);
-
- while (1) {
-
- /* Compute the waiting period till first timer expires. */
- if (sp_list_empty (&sockbase->timers))
- timeout = -1;
- else {
- timer = sp_cont (sp_list_begin (&sockbase->timers),
- struct sp_timer, list);
- timeout = (int) (timer->timeout - now);
- if (timeout < 0)
- timeout = 0;
- }
-
- /* Wait for a completion of an operation or a timer expiration. */
- sp_mutex_unlock (&sockbase->sync);
- rc = sp_cp_wait (&sockbase->cp, timeout, &op, &usock, &oparg);
- errnum_assert (rc == 0 || rc == -ETIMEDOUT, -rc);
- sp_mutex_lock (&sockbase->sync);
-
- /* If there's a task completion event available. */
- if (rc == 0) {
-
- /* Handle inbound pipes. */
- if (sp_fast (op == SP_SOCK_OP_IN)) {
- rc = sockbase->vfptr->in (sockbase, (struct sp_pipe*) oparg);
- errnum_assert (rc >= 0, -rc);
- if (rc > 0)
- sp_cond_post (&sockbase->cond);
- }
-
- /* Handle outbound pipes. */
- else if (sp_fast (op == SP_SOCK_OP_OUT)) {
- rc = sockbase->vfptr->out (sockbase, (struct sp_pipe*) oparg);
- errnum_assert (rc >= 0, -rc);
- if (rc > 0)
- sp_cond_post (&sockbase->cond);
- }
-
- /* If timers were modified do nothing and move straight to
- execution and re-computation of the timers. */
- else if (sp_fast (op == SP_SOCK_OP_TIMERS)) {
- }
-
- /* If the socket is terminating, exit the worker thread. */
- else if (sp_fast (op == SP_SOCK_OP_STOP))
- break;
-
- /* Unknown operation. */
- else
- sp_assert (0);
- }
-
- /* Adjust the current time. */
- now = sp_clock_now (&sockbase->clock);
-
- /* Execute the timers. */
- while (1) {
- if (sp_list_empty (&sockbase->timers))
- break;
- timer = sp_cont (sp_list_begin (&sockbase->timers),
- struct sp_timer, list);
- if (timer->timeout > now)
- break;
- sp_list_erase (&sockbase->timers, &timer->list);
-
- /* Invoke the timer callback. The important point here is that
- the timer structure is not referenced anymore and none of its
- members will be used again. Thus, callback is free to re-use
- it to launch a new timer. */
- timer->fn (timer);
- }
- }
-
- sp_mutex_unlock (&sockbase->sync);
-}
-
-struct sp_cp *sp_sock_getcp (struct sp_sock *self)
-{
- return &((struct sp_sockbase*) self)->cp;
+ sp_assert (0);
}
diff --git a/src/core/sock.h b/src/core/sock.h
index 8e9672d..a904418 100644
--- a/src/core/sock.h
+++ b/src/core/sock.h
@@ -45,7 +45,4 @@
/* Receive a message from the socket. */
int sp_sock_recv (struct sp_sock *self, void *buf, size_t *len, int flags);
-/* Get access to the completion port associated with the socket. */
-struct sp_cp *sp_sock_getcp (struct sp_sock *self);
-
#endif
diff --git a/src/pattern.h b/src/pattern.h
index 2ee00fd..1a4a898 100644
--- a/src/pattern.h
+++ b/src/pattern.h
@@ -24,11 +24,8 @@
#define SP_PATTERN_INCLUDED
#include "utils/mutex.h"
-#include "utils/thread.h"
-#include "utils/clock.h"
#include "utils/cond.h"
#include "utils/list.h"
-#include "utils/aio.h"
#include <stddef.h>
#include <stdint.h>
@@ -47,25 +44,6 @@
int sp_pipe_recv (struct sp_pipe *self, void *buf, size_t *len);
/******************************************************************************/
-/* Timers. */
-/******************************************************************************/
-
-struct sp_sockbase;
-
-struct sp_timer {
- struct sp_list_item list;
- uint64_t timeout;
- void (*fn) (struct sp_timer *self);
-};
-
-/* Start the timer. */
-void sp_timer_start (struct sp_timer *self, struct sp_sockbase *sockbase,
- int timeout, void (*fn) (struct sp_timer *self));
-
-/* Cancel a running timer. */
-void sp_timer_cancel (struct sp_timer *self, struct sp_sockbase *sockbase);
-
-/******************************************************************************/
/* Base class for all socket types. */
/******************************************************************************/
@@ -91,7 +69,7 @@
/* Table of virtual functions supplied by the socket type. */
const struct sp_sockbase_vfptr *vfptr;
- /* Synchronises inbound-related state of the socket. */
+ /* Synchronises state of the socket. */
struct sp_mutex sync;
/* Condition variable to implement sleeping in blocking socket
@@ -100,20 +78,6 @@
/* File descriptor for this socket. */
int fd;
-
- /* Worker thread's instance of the clock. */
- struct sp_clock clock;
-
- /* List of active timers. */
- struct sp_list timers;
-
- /* Completion port processed the worker thread. */
- struct sp_cp cp;
-
- /* Worker thread associated with the socket. */
- /* At the moment there's one worker thread per socket. Later on we can
- move to thread pool model if needed. */
- struct sp_thread worker;
};
/* Initialise the socket. */
diff --git a/src/patterns/reqrep/req.c b/src/patterns/reqrep/req.c
index af565d7..ef7bcb4 100644
--- a/src/patterns/reqrep/req.c
+++ b/src/patterns/reqrep/req.c
@@ -44,7 +44,6 @@
size_t requestlen;
void *request;
int resend_ivl;
- struct sp_timer resend_timer;
};
/* Implementation of sp_sockbase's virtual functions. */
@@ -56,9 +55,6 @@
static int sp_req_getopt (struct sp_sockbase *self, int option,
void *optval, size_t *optvallen);
-/* Private functions. */
-void sp_req_resend_routine (struct sp_timer *self);
-
static const struct sp_sockbase_vfptr sp_req_sockbase_vfptr = {
sp_req_term,
sp_xreq_add,
@@ -95,7 +91,7 @@
req = sp_cont (self, struct sp_req, xreq.sockbase);
if (req->flags & SP_REQ_INPROGRESS) {
- sp_timer_cancel (&req->resend_timer, self);
+ /* TODO: Cancel the timer here. */
sp_free (req->request);
}
@@ -115,7 +111,7 @@
req->requestlen = 0;
req->request = NULL;
req->flags &= ~SP_REQ_INPROGRESS;
- sp_timer_cancel (&req->resend_timer, self);
+ /* TODO: Cancel the resend timer here. */
}
/* Generate new request ID for the new request. */
@@ -142,9 +138,7 @@
at the moment. */
req->flags |= SP_REQ_INPROGRESS;
- /* Set up the re-send timer. */
- sp_timer_start (&req->resend_timer, self, req->resend_ivl,
- sp_req_resend_routine);
+ /* TODO: Set up the re-send timer. */
return 0;
}
@@ -201,7 +195,7 @@
*len = replylen - sizeof (uint32_t);
/* Clean-up. */
- sp_timer_cancel (&req->resend_timer, self);
+ /* TODO: Cancel the resend timer here. */
sp_free (reply);
sp_free (req->request);
req->requestlen = 0;
@@ -211,23 +205,6 @@
return 0;
}
-void sp_req_resend_routine (struct sp_timer *self)
-{
- int rc;
- struct sp_req *req;
-
- req = sp_cont (self, struct sp_req, resend_timer);
- sp_assert (req->flags & SP_REQ_INPROGRESS);
-
- /* Re-send the request. */
- rc = sp_xreq_send (&req->xreq.sockbase, req->request, req->requestlen);
- errnum_assert (rc == 0 || rc == -EAGAIN, -rc);
-
- /* Set up the next re-send timer. */
- sp_timer_start (&req->resend_timer, &req->xreq.sockbase,
- req->resend_ivl, sp_req_resend_routine);
-}
-
static int sp_req_setopt (struct sp_sockbase *self, int option,
const void *optval, size_t optvallen)
{
diff --git a/src/transport.h b/src/transport.h
index f7ddc33..2ccfefd 100644
--- a/src/transport.h
+++ b/src/transport.h
@@ -62,9 +62,6 @@
/* Destroys the endpoint. */
void sp_epbase_term (struct sp_epbase *self);
-/* Get access to the completion port associated with the socket. */
-struct sp_cp *sp_epbase_getcp (struct sp_epbase *self);
-
/******************************************************************************/
/* The base class for pipes. */
/******************************************************************************/
diff --git a/src/transports/tcp/tcpb.c b/src/transports/tcp/tcpb.c
index e1ea43f..36c98af 100644
--- a/src/transports/tcp/tcpb.c
+++ b/src/transports/tcp/tcpb.c
@@ -24,6 +24,7 @@
#include "../../utils/err.h"
#include "../../utils/cont.h"
+#include "../../utils/addr.h"
/* Implementation of sp_epbase interface. */
static int sp_tcpb_close (struct sp_epbase *self, int linger);
@@ -62,24 +63,7 @@
/* Initialise the base class. */
sp_epbase_init (&self->epbase, &sp_tcpb_epbase_vfptr, hint);
- /* Open the listening socket. */
- rc = sp_usock_init (&self->usock, AF_INET, SOCK_STREAM, IPPROTO_TCP,
- sp_epbase_getcp (&self->epbase));
- errnum_assert (rc == 0, -rc);
- rc = sp_usock_bind (&self->usock, (struct sockaddr*) &ss, sslen);
- errnum_assert (rc == 0, -rc);
- /* TODO: Get the backlog value from a socket option! */
- rc = sp_usock_listen (&self->usock, 100);
- errnum_assert (rc == 0, -rc);
-
- /* Start accepting new connections. */
- while (1) {
- rc = sp_usock_accept (&self->usock, &self->newsock);
- if (rc == -EINPROGRESS)
- break;
- errnum_assert (rc == 0, -rc);
- /* TODO */
- }
+ /* TODO: Open the listing socket here and start accepting connections. */
return 0;
}
@@ -90,9 +74,6 @@
tcpb = sp_cont (self, struct sp_tcpb, epbase);
- /* Close the listening socket. */
- sp_usock_term (&tcpb->usock);
-
sp_assert (0);
return 0;
}
diff --git a/src/transports/tcp/tcpb.h b/src/transports/tcp/tcpb.h
index b251e70..f118a37 100644
--- a/src/transports/tcp/tcpb.h
+++ b/src/transports/tcp/tcpb.h
@@ -25,18 +25,10 @@
#include "../../transport.h"
-#include "../../utils/aio.h"
-
struct sp_tcpb {
/* This object is an endpoint. */
struct sp_epbase epbase;
-
- /* The listening TCP socket. */
- struct sp_usock usock;
-
- /* The freshly accepted connection. */
- struct sp_usock newsock;
};
int sp_tcpb_init (struct sp_tcpb *self, const char *addr, void *hint);
diff --git a/src/transports/tcp/tcpc.c b/src/transports/tcp/tcpc.c
index 7251a82..f4250a9 100644
--- a/src/transports/tcp/tcpc.c
+++ b/src/transports/tcp/tcpc.c
@@ -24,6 +24,7 @@
#include "../../utils/err.h"
#include "../../utils/cont.h"
+#include "../../utils/addr.h"
/* Implementation of sp_epbase interface. */
static int sp_tcpc_close (struct sp_epbase *self, int linger);
@@ -64,16 +65,7 @@
/* Initialise the base class. */
sp_epbase_init (&self->epbase, &sp_tcpc_epbase_vfptr, hint);
- /* Open the underlying socket. */
- rc = sp_usock_init (&self->usock, AF_INET, SOCK_STREAM, IPPROTO_TCP,
- sp_epbase_getcp (&self->epbase));
- errnum_assert (rc == 0, -rc);
-
- /* Start connecting. */
- rc = sp_usock_connect (&self->usock, (struct sockaddr*) &ss, sslen);
- if (rc == -EINPROGRESS)
- return 0;
- errnum_assert (rc == 0, -rc);
+ /* TODO: Open the socket and start connecting. */
return 0;
}
diff --git a/src/transports/tcp/tcpc.h b/src/transports/tcp/tcpc.h
index c22f381..7ba18a6 100644
--- a/src/transports/tcp/tcpc.h
+++ b/src/transports/tcp/tcpc.h
@@ -25,17 +25,13 @@
#include "../../transport.h"
-#include "../../utils/aio.h"
-
struct sp_tcpc {
/* This object is an endpoint. */
struct sp_epbase epbase;
-
- /* The underlying TCP socket. */
- struct sp_usock usock;
};
int sp_tcpc_init (struct sp_tcpc *self, const char *addr, void *hint);
#endif
+
diff --git a/src/utils/aio.c b/src/utils/aio.c
deleted file mode 100644
index b4fb7d9..0000000
--- a/src/utils/aio.c
+++ /dev/null
@@ -1,776 +0,0 @@
-/*
- Copyright (c) 2012 250bpm s.r.o.
-
- Permission is hereby granted, free of charge, to any person obtaining a copy
- of this software and associated documentation files (the "Software"),
- to deal in the Software without restriction, including without limitation
- the rights to use, copy, modify, merge, publish, distribute, sublicense,
- and/or sell copies of the Software, and to permit persons to whom
- the Software is furnished to do so, subject to the following conditions:
-
- The above copyright notice and this permission notice shall be included
- in all copies or substantial portions of the Software.
-
- THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
- THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
- FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
- IN THE SOFTWARE.
-*/
-
-#include "aio.h"
-#include "err.h"
-#include "fast.h"
-#include "cont.h"
-
-#if !defined SP_HAVE_WINDOWS
-#if defined SP_HAVE_ACCEPT4
-#define _GNU_SOURCE
-#endif
-#include <sys/types.h>
-#include <sys/socket.h>
-#include <netinet/tcp.h>
-#include <netinet/in.h>
-#include <fcntl.h>
-#endif
-
-/* Private functions. */
-void sp_usock_tune (struct sp_usock *self);
-
-int sp_usock_init (struct sp_usock *self, int domain, int type, int protocol,
- struct sp_cp *cp)
-{
-#if !defined SOCK_CLOEXEC && defined FD_CLOEXEC
- int rc;
-#endif
-#if defined SP_HAVE_WINDOWS
- HANDLE wcp;
-#endif
-
- /* If the operating system allows to directly open the socket with CLOEXEC
- flag, do so. That way there are no race conditions. */
-#ifdef SOCK_CLOEXEC
- type |= SOCK_CLOEXEC;
-#endif
-
- /* Open the underlying socket. */
- self->s = socket (domain, type, protocol);
-#if defined SP_HAVE_WINDOWS
- if (self->s == INVALID_SOCKET)
- return -sp_err_wsa_to_posix (WSAGetLastError ());
-#else
- if (self->s < 0)
- return -errno;
-#endif
- self->domain = domain;
- self->type = type;
- self->protocol = protocol;
- self->cp = cp;
-#if !defined SP_HAVE_WINDOWS
- self->in.op = SP_USOCK_INOP_NONE;
- self->out.op = SP_USOCK_OUTOP_NONE;
-#endif
-
- /* Setting FD_CLOEXEC option immediately after socket creation is the
- second best option. There is a race condition (if process is forked
- between socket creation and setting the option) but the problem is
- pretty unlikely to happen. */
-#if !defined SOCK_CLOEXEC && defined FD_CLOEXEC
- rc = fcntl (self->s, F_SETFD, FD_CLOEXEC);
- errno_assert (rc != -1);
-#endif
-
- sp_usock_tune (self);
-
-#if defined SP_HAVE_WINDOWS
- wcp = CreateIoCompletionPort ((HANDLE) self->s, cp->hndl,
- (ULONG_PTR) NULL, 0);
- sp_assert (wcp);
-#endif
-
- return 0;
-}
-
-void sp_usock_tune (struct sp_usock *self)
-{
- int rc;
- int opt;
-#if defined SP_HAVE_WINDOWS
- u_long flags;
- BOOL brc;
- DWORD only;
-#else
- int flags;
- int only;
-#endif
-
- /* If applicable, prevent SIGPIPE signal when writing to the connection
- already closed by the peer. */
-#ifdef SO_NOSIGPIPE
- opt = 1;
- rc = setsockopt (self->s, SOL_SOCKET, SO_NOSIGPIPE, &opt, sizeof (opt));
- errno_assert (rc == 0);
-#endif
-
- /* Switch the socket to the non-blocking mode. All underlying sockets
- are always used in the asynchronous mode. */
-#if defined SP_HAVE_WINDOWS
- flags = 1;
- rc = ioctlsocket (self->s, FIONBIO, &flags);
- wsa_assert (rc != SOCKET_ERROR);
-#else
- flags = fcntl (self->s, F_GETFL, 0);
- if (flags == -1)
- flags = 0;
- rc = fcntl (self->s, F_SETFL, flags | O_NONBLOCK);
- errno_assert (rc != -1);
-#endif
-
- /* On TCP sockets switch off the Nagle's algorithm to get
- the best possible latency. */
- if ((self->domain == AF_INET || self->domain == AF_INET6) &&
- self->type == SOCK_STREAM) {
- opt = 1;
- rc = setsockopt (self->s, IPPROTO_TCP, TCP_NODELAY,
- (const char*) &opt, sizeof (opt));
-#if defined SP_HAVE_WINDOWS
- wsa_assert (rc != SOCKET_ERROR);
-#else
- errno_assert (rc == 0);
-#endif
- }
-
- /* If applicable, disable delayed acknowledgements to improve latency. */
-#if defined TCP_NODELACK
- opt = 1;
- rc = setsockopt (self->s, IPPROTO_TCP, TCP_NODELACK, &opt, sizeof (opt));
- errno_assert (rc == 0);
-#endif
-
- /* On some operating systems IPv4 mapping for IPv6 sockets is disabled
- by default. In such case, switch it on. */
-#if defined IPV6_V6ONLY
- if (self->domain == AF_INET6) {
- only = 0;
- rc = setsockopt (self->s, IPPROTO_IPV6, IPV6_V6ONLY,
- (const char*) &only, sizeof (only));
-#ifdef SP_HAVE_WINDOWS
- wsa_assert (rc != SOCKET_ERROR);
-#else
- errno_assert (rc == 0);
-#endif
- }
-#endif
-
-/* On Windows, disable inheriting the socket to the child processes. */
-#if defined SP_HAVE_WINDOWS && defined HANDLE_FLAG_INHERIT
- brc = SetHandleInformation ((HANDLE) self->s, HANDLE_FLAG_INHERIT, 0);
- win_assert (brc);
-#endif
-}
-
-#if defined SP_HAVE_WINDOWS
-
-#include <string.h>
-
-void sp_cp_init (struct sp_cp *self)
-{
- self->hndl = CreateIoCompletionPort (INVALID_HANDLE_VALUE, NULL, 0, 0);
- win_assert (self->hndl);
-}
-
-void sp_cp_term (struct sp_cp *self)
-{
- BOOL brc;
-
- brc = CloseHandle (self->hndl);
- win_assert (brc);
-}
-
-void sp_cp_post (struct sp_cp *self, int op, void *arg)
-{
- BOOL brc;
-
- brc = PostQueuedCompletionStatus (self->hndl, (DWORD) op,
- (ULONG_PTR) arg, NULL);
- win_assert (brc);
-}
-
-int sp_cp_wait (struct sp_cp *self, int timeout, int *op,
- struct sp_usock **usock, void **arg)
-{
- BOOL brc;
- DWORD nbytes;
- ULONG_PTR key;
- LPOVERLAPPED olpd;
-
- brc = GetQueuedCompletionStatus (self->hndl, &nbytes, &key,
- &olpd, timeout < 0 ? INFINITE : timeout);
- if (sp_slow (!brc && !olpd))
- return -ETIMEDOUT;
- win_assert (brc);
- *op = (int) nbytes;
- *arg = (void*) key;
-
- return 0;
-}
-
-void sp_usock_term (struct sp_usock *self)
-{
- int rc;
-
- rc = closesocket (self->s);
- wsa_assert (rc != SOCKET_ERROR);
-}
-
-int sp_usock_bind (struct sp_usock *self, const struct sockaddr *addr,
- sp_socklen addrlen)
-{
- int rc;
-
- rc = bind (self->s, addr, addrlen);
- if (sp_slow (rc == SOCKET_ERROR))
- return -sp_err_wsa_to_posix (WSAGetLastError ());
-
- return 0;
-}
-
-int sp_usock_connect (struct sp_usock *self, const struct sockaddr *addr,
- sp_socklen addrlen)
-{
- int rc;
- BOOL brc;
- const GUID fid = WSAID_CONNECTEX;
- LPFN_CONNECTEX pconnectex;
- DWORD nbytes;
-
- rc = WSAIoctl (self->s, SIO_GET_EXTENSION_FUNCTION_POINTER,
- (void*) &fid, sizeof (fid), (void*) &pconnectex, sizeof (pconnectex),
- &nbytes, NULL, NULL);
- wsa_assert (rc == 0);
- sp_assert (nbytes == sizeof (pconnectex));
- memset (&self->conn, 0, sizeof (self->conn));
- brc = pconnectex (self->s, (struct sockaddr*) &addr, addrlen,
- NULL, 0, NULL, (OVERLAPPED*) &self->conn);
- if (sp_fast (brc == TRUE))
- return 0;
- wsa_assert (WSAGetLastError () == WSA_IO_PENDING);
- return -EINPROGRESS;
-}
-
-int sp_usock_listen (struct sp_usock *self, int backlog)
-{
- int rc;
- int opt;
-
- /* On Windows, the bound port can be hijacked if SO_EXCLUSIVEADDRUSE
- is not set. */
- opt = 1;
- rc = setsockopt (self->s, SOL_SOCKET, SO_EXCLUSIVEADDRUSE,
- (const char*) &opt, sizeof (opt));
- wsa_assert (rc != SOCKET_ERROR);
-
- rc = listen (self->s, backlog);
- if (sp_slow (rc == SOCKET_ERROR))
- return -sp_err_wsa_to_posix (WSAGetLastError ());
-
- return 0;
-}
-
-int sp_usock_accept (struct sp_usock *self, struct sp_usock *usock)
-{
- BOOL brc;
- char info [64];
- DWORD nbytes;
-
- sp_usock_init (usock, self->domain, self->type, self->protocol, self->cp);
-
- memset (&usock->conn, 0, sizeof (usock->conn));
- brc = AcceptEx (self->s, usock->s, info, 0, 256, 256, &nbytes,
- &usock->conn);
- if (sp_fast (brc == TRUE))
- return 0;
- wsa_assert (WSAGetLastError () == WSA_IO_PENDING);
- return -EINPROGRESS;
-}
-
-int sp_usock_send (struct sp_usock *self, const void *buf, size_t *len,
- int flags)
-{
- int rc;
- WSABUF wbuf;
- DWORD nbytes;
-
- /* TODO: Support partial send. */
-
- wbuf.len = (u_long) *len;
- wbuf.buf = (char FAR*) buf;
- memset (&self->out, 0, sizeof (self->out));
- rc = WSASend (self->s, &wbuf, 1, &nbytes, 0, &self->out, NULL);
- if (sp_fast (rc == 0)) {
- *len = nbytes;
- return 0;
- }
- wsa_assert (WSAGetLastError () == WSA_IO_PENDING);
- return -EINPROGRESS;
-}
-
-int sp_usock_recv (struct sp_usock *self, void *buf, size_t *len, int flags)
-{
- int rc;
- WSABUF wbuf;
- DWORD wflags;
- DWORD nbytes;
-
- /* TODO: Support partial recv. */
-
- wbuf.len = (u_long) *len;
- wbuf.buf = (char FAR*) buf;
- wflags = MSG_WAITALL;
- memset (&self->in, 0, sizeof (self->in));
- rc = WSARecv (self->s, &wbuf, 1, &nbytes, &wflags, &self->in, NULL);
- if (sp_fast (rc == 0)) {
- *len = nbytes;
- return 0;
- }
- wsa_assert (WSAGetLastError () == WSA_IO_PENDING);
- return -EINPROGRESS;
-}
-
-#else
-
-#include "alloc.h"
-
-#define SP_CP_INITIAL_CAPACITY 64
-
-void sp_cp_init (struct sp_cp *self)
-{
- sp_mutex_init (&self->sync, 0);
- sp_poller_init (&self->poller);
- sp_eventfd_init (&self->eventfd);
- sp_poller_add (&self->poller, sp_eventfd_getfd (&self->eventfd),
- &self->evhndl);
- sp_poller_set_in (&self->poller, &self->evhndl);
- self->capacity = SP_CP_INITIAL_CAPACITY;
- self->head = 0;
- self->tail = 0;
- self->items = sp_alloc (self->capacity * sizeof (struct sp_cp_item));
- alloc_assert (self->items);
-}
-
-void sp_cp_term (struct sp_cp *self)
-{
- sp_free (self->items);
- sp_poller_rm (&self->poller, &self->evhndl);
- sp_eventfd_term (&self->eventfd);
- sp_poller_term (&self->poller);
- sp_mutex_term (&self->sync);
-}
-
-void sp_cp_post (struct sp_cp *self, int op, void *arg)
-{
- int empty;
-
- sp_mutex_lock (&self->sync);
-
- /* Fill in new item in the circular buffer. */
- self->items [self->tail].op = op;
- self->items [self->tail].arg = arg;
-
- /* Move tail by 1 position. */
- empty = self->tail == self->head ? 1 : 0;
- self->tail = (self->tail + 1) % self->capacity;
-
- /* If the capacity of the circular buffer is exhausted, allocate some
- more memory. */
- if (sp_slow (self->head == self->tail)) {
- self->items = sp_realloc (self->items,
- self->capacity * 2 * sizeof (struct sp_cp_item));
- alloc_assert (self->items);
- memcpy (self->items + self->capacity, self->items,
- self->tail * sizeof (struct sp_cp_item));
- self->tail += self->capacity;
- self->capacity *= 2;
- }
-
- if (empty)
- sp_eventfd_signal (&self->eventfd);
-
- sp_mutex_unlock (&self->sync);
-}
-
-int sp_cp_wait (struct sp_cp *self, int timeout, int *op,
- struct sp_usock **usock, void **arg)
-{
- int rc;
- int event;
- struct sp_poller_hndl *hndl;
- int newsock;
-
- rc = sp_poller_event (&self->poller, &event, &hndl);
- if (rc == -EAGAIN) {
- rc = sp_poller_wait (&self->poller, timeout);
- if (sp_slow (rc == -EINTR))
- return -EINTR;
- errnum_assert (rc == 0, -rc);
- return -ETIMEDOUT;
- }
- else
- errnum_assert (rc == 0, -rc);
-
- /* If there are any queued operations, process them. */
- if (hndl == &self->evhndl) {
- sp_mutex_lock (&self->sync);
- while (self->head != self->tail) {
-
- /* Retrieve one operation from the queue. */
- *op = self->items [self->head].op;
- *arg = self->items [self->head].arg;
- self->head = (self->head + 1) % self->capacity;
- if (self->head == self->tail)
- sp_eventfd_unsignal (&self->eventfd);
-
- /* Custom operations are returned to the caller straight away. */
- if (*op >= 0) {
- *usock = NULL;
- sp_mutex_unlock (&self->sync);
- return 0;
- }
-
- *usock = (struct sp_usock*) *arg;
- *arg = NULL;
-
- if (*op == SP_USOCK_RECV) {
- sp_poller_set_in (&self->poller, &(*usock)->hndl);
- continue;
- }
-
- if (*op == SP_USOCK_SEND) {
- sp_poller_set_out (&self->poller, &(*usock)->hndl);
- continue;
- }
-
- if (*op == SP_USOCK_ACCEPT) {
- sp_poller_set_in (&self->poller, &(*usock)->hndl);
- continue;
- }
-
- if (*op == SP_USOCK_CONNECT) {
- sp_poller_set_out (&self->poller, &(*usock)->hndl);
- continue;
- }
-
- if (*op == SP_USOCK_REGISTER) {
- sp_poller_add (&self->poller, (*usock)->s, &(*usock)->hndl);
- continue;
- }
-
- if (*op == SP_USOCK_UNREGISTER) {
- sp_poller_rm (&self->poller, &(*usock)->hndl);
- continue;
- }
-
- /* Invalid operation. */
- sp_assert (0);
- }
- sp_mutex_unlock (&self->sync);
-
- /* All queued operations were processed but there's no event to
- return to the called. Do spurious wake-up. */
- return -ETIMEDOUT;
- }
-
- *usock = sp_cont (hndl, struct sp_usock, hndl);
-
- if (event == SP_POLLER_IN) {
-
- if ((*usock)->in.op == SP_USOCK_INOP_ACCEPT) {
- newsock = accept ((*usock)->parent->s, NULL, NULL);
- if (newsock == -1) {
-
- /* Make sure this is a network error,
- not a programming error. */
- errno_assert (errno == ECONNABORTED ||
- errno == EPROTO || errno == ENOBUFS || errno == EMFILE ||
- errno == ENFILE || errno == ENOMEM);
-
- /* Do nothing, wait for new connections. */
- return -ETIMEDOUT;
- }
-
- /* TODO: Intialise the usock. */
- sp_assert (0);
- }
-
- if ((*usock)->in.op == SP_USOCK_INOP_RECV ||
- (*usock)->in.op == SP_USOCK_INOP_PARTIAL_RECV) {
- sp_assert (0);
- }
-
- /* Invalid operation. */
- sp_assert (0);
- }
-
- if (event == SP_POLLER_OUT) {
-
- if ((*usock)->out.op == SP_USOCK_OUTOP_CONNECT) {
- sp_assert (0);
- }
-
- if ((*usock)->out.op == SP_USOCK_OUTOP_SEND ||
- (*usock)->out.op == SP_USOCK_OUTOP_PARTIAL_SEND) {
- sp_assert (0);
- }
-
- /* Invalid operation. */
- sp_assert (0);
- }
-
- if (event == SP_POLLER_ERR) {
- sp_assert (0);
- }
-
- /* Invalid event. */
- sp_assert (0);
-}
-
-void sp_usock_term (struct sp_usock *self)
-{
- int rc;
-
- rc = close (self->s);
- errno_assert (rc == 0);
-}
-
-int sp_usock_bind (struct sp_usock *self, const struct sockaddr *addr,
- sp_socklen addrlen)
-{
- int rc;
-
- rc = bind (self->s, addr, addrlen);
- if (sp_slow (rc < 0))
- return -errno;
-
- return 0;
-}
-
-int sp_usock_connect (struct sp_usock *self, const struct sockaddr *addr,
- sp_socklen addrlen)
-{
- int rc;
-
- /* If there's out operation already in progress, fail. */
- sp_assert (self->out.op == SP_USOCK_OUTOP_NONE);
- self->out.op = SP_USOCK_OUTOP_CONNECT;
-
- rc = connect (self->s, addr, addrlen) ;
-
- /* Move the operation to the worker thread. */
- if (rc == 0 || errno == EINPROGRESS || errno == EAGAIN ||
- errno == EWOULDBLOCK) {
- sp_cp_post (self->cp, SP_USOCK_REGISTER, (void*) self);
- sp_cp_post (self->cp, SP_USOCK_CONNECT, (void*) self);
- return -EINPROGRESS;
- }
-
- /* TODO: Handle EINTR, ECONREFUSED, ENETUNREACH. */
-
- /* Anything else is considered to be a fatal error. */
- errno_assert (0);
- return 0;
-
-}
-
-int sp_usock_listen (struct sp_usock *self, int backlog)
-{
- int rc;
- int opt;
-
- /* To allow for rapid restart of SP services, allow new bind to succeed
- immediately after previous instance of the process failed, skipping the
- grace period. */
- opt = 1;
- rc = setsockopt (self->s, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof (opt));
- errno_assert (rc == 0);
-
- rc = listen (self->s, backlog);
- if (sp_slow (rc < 0))
- return -errno;
-
- /* Register the socket with the poller. */
- sp_cp_post (self->cp, SP_USOCK_REGISTER, (void*) self);
-
- return 0;
-}
-
-int sp_usock_accept (struct sp_usock *self, struct sp_usock *usock)
-{
-#if !defined SP_HAVE_ACCEPT4 && defined FD_CLOEXEC
- int rc;
-#endif
-
- usock->domain = self->domain;
- usock->type = self->type;
- usock->protocol = self->protocol;
- usock->cp = self->cp;
-#if defined SP_HAVE_ACCEPT4
- usock->s = accept4 (self->s, NULL, NULL, SOCK_CLOEXEC);
-#else
- usock->s = accept (self->s, NULL, NULL);
-#endif
- if (sp_fast (usock->s >= 0)) {
-#if !defined SP_HAVE_ACCEPT4 && defined FD_CLOEXEC
- rc = fcntl (self->s, F_SETFD, FD_CLOEXEC);
- errno_assert (rc != -1);
-#endif
- sp_usock_tune (usock);
- return 0;
- }
-
- /* If there's an in operaton in progress, fail. */
- sp_assert (self->out.op == SP_USOCK_OUTOP_NONE);
-
- /* Move the operation to the worker thread. */
- if (errno == EAGAIN || errno == EWOULDBLOCK) {
- usock->parent = self;
- self->in.op = SP_USOCK_INOP_ACCEPT;
- sp_cp_post (self->cp, SP_USOCK_ACCEPT, (void*) self);
- return -EINPROGRESS;
- }
-
- /* TODO: Handle EINTR, ECONNABORTED, EPROTO, ENOBUFS, ENOMEM, EMFILE
- and ENFILE. */
-
- /* Anything else is considered to be a fatal error. */
- errno_assert (0);
- return 0;
-}
-
-int sp_usock_send (struct sp_usock *self, const void *buf, size_t *len,
- int flags)
-{
- ssize_t nbytes;
-#if defined MSG_NOSIGNAL
- const int sflags = MSG_NOSIGNAL;
-#else
- const int sflags = 0;
-#endif
-
- /* If there's nothing to send, return straight away. */
- if (*len == 0)
- return 0;
-
- /* Try to send as much data as possible in synchronous manner. */
- nbytes = send (self->s, buf, *len, sflags);
- if (sp_fast (nbytes == *len))
- return 0;
-
- /* Handle errors. */
- if (nbytes < 0) {
-
- /* If no bytes were transferred. */
- if (sp_fast (errno != EAGAIN && errno != EWOULDBLOCK)) {
- nbytes = 0;
- goto async;
- }
-
- /* In theory, this should never happen as all the sockets are
- non-blocking. However, test the condition just in case. */
- if (errno == EINTR)
- return -EINTR;
-
- /* In the case of connection failure. */
- if (errno == ECONNRESET || errno == EPIPE)
- return -ECONNRESET;
-
- /* Other errors are not expected to happen. */
- errno_assert (0);
- }
-
-async:
-
- /* If there's out operation already in progress, fail. */
- sp_assert (self->out.op == SP_USOCK_OUTOP_NONE);
-
- /* Store the info about the asynchronous operation requested. */
- self->out.op = flags & SP_USOCK_PARTIAL ?
- SP_USOCK_OUTOP_PARTIAL_SEND : SP_USOCK_OUTOP_SEND;
- self->out.buf = buf;
- self->out.len = *len;
- self->out.olen = *len;
-
- /* Move the operation to the worker thread. */
- sp_cp_post (self->cp, SP_USOCK_SEND, (void*) self);
-
- return -EINPROGRESS;
-}
-
-int sp_usock_recv (struct sp_usock *self, void *buf, size_t *len,
- int flags)
-{
- ssize_t nbytes;
-
- /* If there's nothing to receive, return straight away. */
- if (*len == 0)
- return 0;
-
- /* Try to receive as much data as possible in synchronous manner. */
- nbytes = recv (self->s, buf, *len, 0);
-
- /* Success. */
- if (sp_fast (nbytes == *len))
- return 0;
- if (sp_fast (nbytes > 0 && flags & SP_USOCK_PARTIAL)) {
- *len = nbytes;
- return 0;
- }
-
- /* Connection terminated. */
- if (sp_slow (nbytes == 0))
- return -ECONNRESET;
-
- /* Handle errors. */
- if (nbytes < 0) {
-
- /* If no bytes were received. */
- if (sp_fast (errno == EAGAIN || errno == EWOULDBLOCK)) {
- nbytes = 0;
- goto async;
- }
-
- /* In theory, this should never happen as all the sockets are
- non-blocking. However, test the condition just in case. */
- if (errno == EINTR)
- return -EINTR;
-
- /* In the case of connection failure. */
- if (errno == ECONNRESET || errno == ECONNREFUSED ||
- errno == ETIMEDOUT || errno == EHOSTUNREACH || errno == ENOTCONN)
- return -ECONNRESET;
-
- /* Other errors are not expected to happen. */
- errno_assert (0);
- }
-
-async:
-
- /* If there's in operation already in progress, fail. */
- sp_assert (self->in.op == SP_USOCK_INOP_NONE);
-
- /* Store the info about the asynchronous operation requested. */
- self->in.op = flags & SP_USOCK_PARTIAL ?
- SP_USOCK_INOP_PARTIAL_RECV : SP_USOCK_INOP_RECV;
- self->in.buf = buf;
- self->in.len = *len;
- self->in.olen = *len;
-
- /* Move the operation to the worker thread. */
- sp_cp_post (self->cp, SP_USOCK_RECV, (void*) self);
-
- return -EINPROGRESS;
-}
-
-#endif
-
diff --git a/src/utils/aio.h b/src/utils/aio.h
deleted file mode 100644
index 8cb8078..0000000
--- a/src/utils/aio.h
+++ /dev/null
@@ -1,146 +0,0 @@
-/*
- Copyright (c) 2012 250bpm s.r.o.
-
- Permission is hereby granted, free of charge, to any person obtaining a copy
- of this software and associated documentation files (the "Software"),
- to deal in the Software without restriction, including without limitation
- the rights to use, copy, modify, merge, publish, distribute, sublicense,
- and/or sell copies of the Software, and to permit persons to whom
- the Software is furnished to do so, subject to the following conditions:
-
- The above copyright notice and this permission notice shall be included
- in all copies or substantial portions of the Software.
-
- THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
- THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
- FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
- IN THE SOFTWARE.
-*/
-
-#ifndef SP_AIO_INCLUDED
-#define SP_AIO_INCLUDED
-
-#include "mutex.h"
-#include "poller.h"
-#include "eventfd.h"
-#include "addr.h"
-
-#if defined SP_HAVE_WINDOWS
-#include "win.h"
-#endif
-
-#include <stddef.h>
-
-/* Implementation of platform-neutral asynchronous I/O subsystem. */
-
-/* If this flag is set, recv doesn't have to wait for all bytes to be received
- before it completes. */
-#define SP_USOCK_PARTIAL 1
-
-/* These standard operations have negative indices so that positive namespace
- if free for custom events. */
-#define SP_USOCK_REGISTER -1
-#define SP_USOCK_UNREGISTER -2
-#define SP_USOCK_ACCEPT -3
-#define SP_USOCK_CONNECT -4
-#define SP_USOCK_SEND -5
-#define SP_USOCK_RECV -6
-
-struct sp_cp;
-struct sp_usock;
-
-int sp_usock_init (struct sp_usock *self, int domain, int type, int protocol,
- struct sp_cp *cp);
-void sp_usock_term (struct sp_usock *self);
-
-int sp_usock_bind (struct sp_usock *self, const struct sockaddr *addr,
- sp_socklen addrlen);
-int sp_usock_connect (struct sp_usock *self, const struct sockaddr *addr,
- sp_socklen addrlen);
-int sp_usock_listen (struct sp_usock *self, int backlog);
-int sp_usock_accept (struct sp_usock *self, struct sp_usock *usock);
-
-int sp_usock_send (struct sp_usock *self, const void *buf, size_t *len,
- int flags);
-int sp_usock_recv (struct sp_usock *self, void *buf, size_t *len, int flags);
-
-void sp_cp_init (struct sp_cp *self);
-void sp_cp_term (struct sp_cp *self);
-void sp_cp_post (struct sp_cp *self, int op, void *arg);
-
-/* The function is suspectible to spurious ETIMEDOUT wake-ups. */
-int sp_cp_wait (struct sp_cp *self, int timeout, int *op,
- struct sp_usock **usock, void **arg);
-
-#if defined SP_HAVE_WINDOWS
-
-struct sp_usock {
- SOCKET s;
- int domain;
- int type;
- int protocol;
- struct sp_cp *cp;
- OVERLAPPED conn;
- OVERLAPPED in;
- OVERLAPPED out;
-};
-
-struct sp_cp {
- HANDLE hndl;
-};
-
-#else
-
-#define SP_USOCK_INOP_NONE 0
-#define SP_USOCK_INOP_ACCEPT 1
-#define SP_USOCK_INOP_RECV 2
-#define SP_USOCK_INOP_PARTIAL_RECV 3
-
-#define SP_USOCK_OUTOP_NONE 0
-#define SP_USOCK_OUTOP_CONNECT 1
-#define SP_USOCK_OUTOP_SEND 2
-#define SP_USOCK_OUTOP_PARTIAL_SEND 3
-
-struct sp_usock {
- int s;
- int domain;
- int type;
- int protocol;
- struct sp_cp *cp;
- struct sp_poller_hndl hndl;
- struct sp_usock *parent;
- struct {
- int op;
- void *buf;
- size_t len;
- size_t olen;
- } in;
- struct {
- int op;
- const void *buf;
- size_t len;
- size_t olen;
- } out;
-};
-
-struct sp_cp {
- struct sp_mutex sync;
- struct sp_poller poller;
- struct sp_eventfd eventfd;
- struct sp_poller_hndl evhndl;
- size_t capacity;
- size_t head;
- size_t tail;
- struct sp_cp_item {
- int op;
- void *arg;
- } *items;
-};
-
-#endif
-
-#endif
-