blob: e73ed69712a8da83dd63958ea0a847d49cfa865e [file] [log] [blame]
/*
Copyright (c) 2012 250bpm s.r.o.
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"),
to deal in the Software without restriction, including without limitation
the rights to use, copy, modify, merge, publish, distribute, sublicense,
and/or sell copies of the Software, and to permit persons to whom
the Software is furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included
in all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
IN THE SOFTWARE.
*/
#include "../sp.h"
#include "../transport.h"
#include "../pattern.h"
#include "ctx.h"
#include "sock.h"
#include "ep.h"
#include "../utils/err.h"
#include "../utils/alloc.h"
#include "../utils/mutex.h"
#include "../utils/list.h"
#include "../utils/cont.h"
#include "../utils/random.h"
#include "../utils/glock.h"
#include "../transports/inproc/inproc.h"
#include "../transports/tcp/tcp.h"
#include "../patterns/pair/pair.h"
#include "../patterns/pubsub/pub.h"
#include "../patterns/pubsub/sub.h"
#include "../patterns/reqrep/rep.h"
#include "../patterns/reqrep/req.h"
#include "../patterns/reqrep/xrep.h"
#include "../patterns/reqrep/xreq.h"
#include "../patterns/fanin/sink.h"
#include "../patterns/fanin/source.h"
#include "../patterns/fanin/xsink.h"
#include "../patterns/fanin/xsource.h"
#include "../patterns/fanout/push.h"
#include "../patterns/fanout/pull.h"
#include "../patterns/fanout/xpush.h"
#include "../patterns/fanout/xpull.h"
#include <stddef.h>
#if defined SP_HAVE_WINDOWS
#include "../utils/win.h"
#endif
/* Max number of concurrent SP sockets. */
#define SP_MAX_SOCKETS 512
/* Max number of concurrent SP endpoints. */
#define SP_MAX_EPS 512
/* This check is performed at the beginning of each socket operation to make
sure that the library was initialised and the socket actually exists. */
#define SP_BASIC_CHECKS \
if (sp_slow (!self.socks)) {\
errno = EFAULT;\
return -1;\
}\
if (sp_slow (!self.socks [s])) {\
errno = EBADF;\
return -1;\
}
struct sp_ctx {
/* Synchronisation of socket-related global state of the library.
(following members). */
struct sp_mutex ssync;
/* The global table of existing sockets. The descriptor representing
the socket is the index to this table. */
struct sp_sock **socks;
size_t max_socks;
/* Synchronisation of endpoint-related global state of the library.
(following members, the endpoints themselves, any global state in
transports). */
struct sp_mutex esync;
/* List of all existing endpoints including orphan endpoints. */
struct sp_ep **eps;
size_t max_eps;
/* List of all available transports. The access to this list is not
synchronised. We assume that it never changes after the library was
initialised. */
struct sp_list transports;
/* List of all available socket types. */
struct sp_list socktypes;
/* Next endpoint ID to use. */
int next_eid;
};
/* Number of times sp_init() was called without corresponding sp_term().
This variable is synchronised using the global lock (sp_glock). */
int sp_ctx_refcount = 0;
/* Singleton object containing the global state of the library. */
static struct sp_ctx self = {0};
/* Transport-related private functions. */
static void sp_ctx_add_transport (struct sp_transport *transport);
static void sp_ctx_add_socktype (struct sp_socktype *socktype);
static struct sp_transport *sp_ctx_find_transport (const char *name,
size_t len);
/* Private function that unifies sp_bind and sp_connect functionality.
It returns the ID of the newly created endpoint. */
static int sp_ctx_create_endpoint (int fd, const char *addr, int bind);
void sp_version (int *major, int *minor, int *patch)
{
if (major)
*major = SP_VERSION_MAJOR;
if (minor)
*minor = SP_VERSION_MINOR;
if (patch)
*patch = SP_VERSION_PATCH;
}
int sp_errno (void)
{
return sp_err_errno ();
}
const char *sp_strerror (int errnum)
{
return sp_err_strerror (errnum);
}
int sp_init (void)
{
int i;
#if defined SP_HAVE_WINDOWS
WSADATA data;
int rc;
#endif
sp_glock_lock ();
/* If the library is already initialised, do nothing, just increment
the reference count. */
++sp_ctx_refcount;
if (sp_ctx_refcount > 1) {
sp_glock_unlock ();
return 0;
}
/* On Windows, initialise the socket library. */
#if defined SP_HAVE_WINDOWS
rc = WSAStartup (MAKEWORD (2, 2), &data);
sp_assert (rc == 0);
sp_assert (LOBYTE (data.wVersion) == 2 &&
HIBYTE (data.wVersion) == 2);
#endif
/* Seed the pseudo-random number generator. */
sp_random_seed ();
/* Allocate the global table of SP sockets. */
self.max_socks = SP_MAX_SOCKETS;
self.socks = sp_alloc (sizeof (struct sp_sock*) * self.max_socks);
alloc_assert (self.socks);
for (i = 0; i != self.max_socks; ++i)
self.socks [i] = NULL;
/* Allocate the global table of SP endpoints. */
self.max_eps = SP_MAX_EPS;
self.eps = sp_alloc (sizeof (struct sp_ep*) * self.max_eps);
alloc_assert (self.eps);
for (i = 0; i != self.max_eps; ++i)
self.eps [i] = NULL;
/* Initialise other parts of the global state. */
sp_mutex_init (&self.ssync, 0);
sp_mutex_init (&self.esync, 0);
sp_list_init (&self.transports);
sp_list_init (&self.socktypes);
self.next_eid = 1;
/* Plug in individual transports. */
sp_ctx_add_transport (sp_inproc);
sp_ctx_add_transport (sp_tcp);
/* Plug in individual socktypes. */
sp_ctx_add_socktype (sp_pair_socktype);
sp_ctx_add_socktype (sp_pub_socktype);
sp_ctx_add_socktype (sp_sub_socktype);
sp_ctx_add_socktype (sp_rep_socktype);
sp_ctx_add_socktype (sp_req_socktype);
sp_ctx_add_socktype (sp_xrep_socktype);
sp_ctx_add_socktype (sp_xreq_socktype);
sp_ctx_add_socktype (sp_sink_socktype);
sp_ctx_add_socktype (sp_source_socktype);
sp_ctx_add_socktype (sp_xsink_socktype);
sp_ctx_add_socktype (sp_xsource_socktype);
sp_ctx_add_socktype (sp_push_socktype);
sp_ctx_add_socktype (sp_pull_socktype);
sp_ctx_add_socktype (sp_xpull_socktype);
sp_glock_unlock ();
return 0;
}
int sp_term (void)
{
#if defined SP_HAVE_WINDOWS
int rc;
#endif
sp_glock_lock ();
/* If there are still references to the library, do nothing, just
decrement the reference count. */
--sp_ctx_refcount;
if (sp_ctx_refcount) {
sp_glock_unlock ();
return 0;
}
/* TODO: Wait for all sockets to be closed. */
/* TODO: Wait for all endpoints to be closed. */
/* Final deallocation of the global resources. */
sp_list_term (&self.socktypes);
sp_list_term (&self.transports);
sp_mutex_term (&self.esync);
sp_mutex_term (&self.ssync);
sp_free (self.eps);
self.eps = NULL;
sp_free (self.socks);
self.socks = NULL;
/* On Windows, uninitialise the socket library. */
#if defined SP_HAVE_WINDOWS
rc = WSACleanup ();
sp_assert (rc == 0);
#endif
sp_glock_unlock ();
return 0;
}
int sp_socket (int domain, int protocol)
{
int s;
struct sp_list_item *it;
struct sp_socktype *socktype;
/* Check whether library was initialised. */
if (sp_slow (!self.socks)) {
errno = EFAULT;
return -1;
}
/* Only AF_SP and AF_SP_RAW domains are supported. */
if (sp_slow (domain != AF_SP && domain != AF_SP_RAW)) {
errno = -EAFNOSUPPORT;
return -1;
}
sp_mutex_lock (&self.ssync);
/* Find an empty socket slot. */
/* TODO: This is O(n) operation! Linked list of empty slots should be
implemented. */
for (s = 0; s != self.max_socks; ++s)
if (!self.socks [s])
break;
/* TODO: Auto-resize the array here! */
if (sp_slow (s == self.max_socks)) {
sp_mutex_unlock (&self.ssync);
errno = EMFILE;
return -1;
}
for (it = sp_list_begin (&self.socktypes);
it != sp_list_end (&self.socktypes);
it = sp_list_next (&self.socktypes, it)) {
socktype = sp_cont (it, struct sp_socktype, list);
if (socktype->domain == domain && socktype->protocol == protocol) {
self.socks [s] = (struct sp_sock*) socktype->create (s);
sp_mutex_unlock (&self.ssync);
return s;
}
}
/* Specified socket type wasn't found. */
sp_mutex_unlock (&self.ssync);
errno = EINVAL;
return -1;
}
int sp_close (int s)
{
int rc;
int i;
struct sp_ep *ep;
SP_BASIC_CHECKS;
sp_mutex_lock (&self.esync);
/* Ask all the endpoints associated with the socket to shut down. */
/* TODO: This is O(n) algorithm. */
for (i = 0; i != self.max_eps; ++i) {
ep = self.eps [i];
if (ep && sp_ep_fd (ep) == s) {
rc = sp_ep_close (ep, 0); /* TODO: linger */
/* TODO: The case where endpoint was already asked to shut down
should be distinguished from the case where the shut down is
processed asynchronously. */
if (rc == -EINPROGRESS)
continue;
errnum_assert (rc == 0, -rc);
self.eps [i] = NULL;
}
}
sp_mutex_unlock (&self.esync);
/* Deallocate the socket object itself. */
sp_mutex_lock (&self.ssync);
sp_sock_term (self.socks [s]);
sp_free (self.socks [s]);
self.socks [s] = NULL;
sp_mutex_unlock (&self.ssync);
return 0;
}
int sp_setsockopt (int s, int level, int option, const void *optval,
size_t optvallen)
{
int rc;
SP_BASIC_CHECKS;
if (sp_slow (!optval && optvallen)) {
errno = EFAULT;
return -1;
}
rc = sp_sock_setopt (self.socks [s], level, option, optval, optvallen);
if (sp_slow (rc < 0)) {
errno = -rc;
return -1;
}
errnum_assert (rc == 0, -rc);
return 0;
}
int sp_getsockopt (int s, int level, int option, void *optval,
size_t *optvallen)
{
int rc;
SP_BASIC_CHECKS;
if (sp_slow (!optval && optvallen)) {
errno = EFAULT;
return -1;
}
rc = sp_sock_getopt (self.socks [s], level, option, optval, optvallen);
if (sp_slow (rc < 0)) {
errno = -rc;
return -1;
}
errnum_assert (rc == 0, -rc);
return 0;
}
int sp_bind (int s, const char *addr)
{
int rc;
SP_BASIC_CHECKS;
rc = sp_ctx_create_endpoint (s, addr, 1);
if (rc < 0) {
errno = -rc;
return -1;
}
return rc;
}
int sp_connect (int s, const char *addr)
{
int rc;
SP_BASIC_CHECKS;
rc = sp_ctx_create_endpoint (s, addr, 0);
if (rc < 0) {
errno = -rc;
return -1;
}
return rc;
}
int sp_shutdown (int s, int how)
{
int rc;
struct sp_ep *ep;
SP_BASIC_CHECKS;
sp_mutex_lock (&self.esync);
/* Check whether the endpoint exists. */
ep = self.eps [how];
if (sp_slow (!ep)) {
sp_mutex_unlock (&self.esync);
errno = -EINVAL;
return -1;
}
/* Check whether the endpoint is associated with the socket in question. */
if (sp_slow (sp_ep_fd (ep) != s)) {
sp_mutex_unlock (&self.esync);
errno = -EINVAL;
return -1;
}
/* Ask all the endpoint to shut down. */
rc = sp_ep_close (ep, 0); /* TODO: linger */
/* If the endpoint haven't deallocated itself immediately, do nothing. */
/* TODO: The case where endpoint was already asked to shut down should
be distinguished from the case where the shut down is processed
asynchronously. */
if (rc == -EINPROGRESS) {
sp_mutex_unlock (&self.esync);
return 0;
}
errnum_assert (rc == 0, -rc);
self.eps [how] = NULL;
sp_mutex_unlock (&self.esync);
return 0;
}
int sp_send (int s, const void *buf, size_t len, int flags)
{
int rc;
SP_BASIC_CHECKS;
if (sp_slow (!buf && len)) {
errno = EFAULT;
return -1;
}
rc = sp_sock_send (self.socks [s], buf, len, flags);
if (sp_slow (rc < 0)) {
errno = -rc;
return -1;
}
sp_assert (rc == 0);
return (int) len;
}
int sp_recv (int s, void *buf, size_t len, int flags)
{
int rc;
SP_BASIC_CHECKS;
if (sp_slow (!buf && len)) {
errno = EFAULT;
return -1;
}
rc = sp_sock_recv (self.socks [s], buf, &len, flags);
if (sp_slow (rc < 0)) {
errno = -rc;
return -1;
}
sp_assert (rc == 0);
return (int) len;
}
static void sp_ctx_add_transport (struct sp_transport *transport)
{
transport->init ();
sp_list_insert (&self.transports, &transport->list,
sp_list_end (&self.transports));
}
static struct sp_transport *sp_ctx_find_transport (const char *name, size_t len)
{
struct sp_list_item *it;
struct sp_transport *tp;
for (it = sp_list_begin (&self.transports);
it != sp_list_end (&self.transports);
it = sp_list_next (&self.transports, it)) {
tp = sp_cont (it, struct sp_transport, list);
if (strlen (tp->name ()) == len &&
memcmp (tp->name (), name, len) == 0)
return tp;
}
return NULL;
}
static void sp_ctx_add_socktype (struct sp_socktype *socktype)
{
sp_list_insert (&self.socktypes, &socktype->list,
sp_list_end (&self.socktypes));
}
static int sp_ctx_create_endpoint (int fd, const char *addr, int bind)
{
int rc;
int eid;
const char *proto;
const char *delim;
size_t protosz;
struct sp_transport *tp;
struct sp_ep *ep;
/* Check whether address is valid. */
if (!addr)
return -EFAULT;
if (strlen (addr) >= SP_SOCKADDR_MAX)
return -EINVAL;
/* Separate the protocol and the actual address. */
proto = addr;
delim = strchr (addr, ':');
if (!delim)
return -EINVAL;
if (delim [1] != '/' || delim [2] != '/')
return -EINVAL;
protosz = delim - addr;
addr += protosz + 3;
tp = sp_ctx_find_transport (proto, protosz);
/* The protocol specified doesn't match any known protocol. */
if (!tp)
return -EPROTONOSUPPORT;
sp_mutex_lock (&self.esync);
/* Ask transport to create appropriate endpoint object. */
if (bind)
rc = tp->bind (addr, (void*) self.socks [fd],
(struct sp_epbase**) &ep);
else
rc = tp->connect (addr, (void*) self.socks [fd],
(struct sp_epbase**) &ep);
if (sp_slow (rc != 0)) {
sp_mutex_unlock (&self.esync);
return rc;
}
sp_assert (ep);
/* Find an unused endpoint ID. */
/* TODO: This is O(n) operation! Linked list of empty endpoint IDs
should be implemented. */
for (eid = 0; eid != self.max_eps; ++eid)
if (!self.eps [eid])
break;
/* TODO: Auto-resize the array here! */
if (sp_slow (eid == self.max_eps)) {
sp_mutex_unlock (&self.esync);
return -EMFILE;
}
/* Store the reference to the endpoint. */
self.eps [eid] = ep;
sp_mutex_unlock (&self.esync);
return eid;
}