/*
    Copyright (c) 2012-2013 250bpm s.r.o.

    Permission is hereby granted, free of charge, to any person obtaining a copy
    of this software and associated documentation files (the "Software"),
    to deal in the Software without restriction, including without limitation
    the rights to use, copy, modify, merge, publish, distribute, sublicense,
    and/or sell copies of the Software, and to permit persons to whom
    the Software is furnished to do so, subject to the following conditions:

    The above copyright notice and this permission notice shall be included
    in all copies or substantial portions of the Software.

    THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
    IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
    FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
    THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
    LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
    FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
    IN THE SOFTWARE.
*/
#include "../protocol.h"
#include "../transport.h"
#include "sock.h"
#include "global.h"
#include "ep.h"
#include "../utils/err.h"
#include "../utils/cont.h"
#include "../utils/fast.h"
#include "../utils/msg.h"
/* This flag is set if the nn_term() function was already called. All socket
   functions, except for nn_close(), should return the ETERM error in that
   case. */
#define NN_SOCK_FLAG_ZOMBIE 1
/* These bits specify whether individual efds are signalled or not at
the moment. Storing this information allows us to avoid redundant signalling
and unsignalling of the efd objects. */
#define NN_SOCK_FLAG_IN 2
#define NN_SOCK_FLAG_OUT 4
/* Set if nn_close() is already in progress. */
#define NN_SOCK_FLAG_CLOSING 8
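/* The flags above are independent bits; for instance, a socket that is being
   closed while still readable carries NN_SOCK_FLAG_CLOSING | NN_SOCK_FLAG_IN,
   i.e. 8 | 2 == 10. Individual bits are tested with expressions such as
   (sockbase->flags & NN_SOCK_FLAG_IN). */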
/* Private functions. */
void nn_sockbase_adjust_events (struct nn_sockbase *self);
struct nn_optset *nn_sockbase_optset (struct nn_sockbase *self, int id);
int nn_sockbase_init (struct nn_sockbase *self,
const struct nn_sockbase_vfptr *vfptr)
{
int rc;
int i;
/* Make sure that at least one message direction is supported. */
nn_assert (!(vfptr->flags & NN_SOCKBASE_FLAG_NOSEND) ||
!(vfptr->flags & NN_SOCKBASE_FLAG_NORECV));
/* Create the AIO context for the SP socket. */
nn_ctx_init (&self->ctx, nn_global_getpool ());
/* Open the NN_SNDFD and NN_RCVFD efds, but only if the socket type
   supports send/recv, as appropriate. */
if (vfptr->flags & NN_SOCKBASE_FLAG_NOSEND)
memset (&self->sndfd, 0xcd, sizeof (self->sndfd));
else {
rc = nn_efd_init (&self->sndfd);
if (nn_slow (rc < 0))
return rc;
}
if (vfptr->flags & NN_SOCKBASE_FLAG_NORECV)
memset (&self->rcvfd, 0xcd, sizeof (self->rcvfd));
else {
rc = nn_efd_init (&self->rcvfd);
if (nn_slow (rc < 0)) {
if (!(vfptr->flags & NN_SOCKBASE_FLAG_NOSEND))
nn_efd_term (&self->sndfd);
return rc;
}
}
memset (&self->termsem, 0xcd, sizeof (self->termsem));
rc = nn_cp_init (&self->cp);
if (nn_slow (rc < 0)) {
if (!(vfptr->flags & NN_SOCKBASE_FLAG_NORECV))
nn_efd_term (&self->rcvfd);
if (!(vfptr->flags & NN_SOCKBASE_FLAG_NOSEND))
nn_efd_term (&self->sndfd);
return rc;
}
self->vfptr = vfptr;
self->flags = 0;
nn_clock_init (&self->clock);
nn_list_init (&self->eps);
self->eid = 1;
/* Default values for NN_SOL_SOCKET options. */
self->domain = -1;
self->protocol = -1;
self->linger = 1000;
self->sndbuf = 128 * 1024;
self->rcvbuf = 128 * 1024;
self->sndtimeo = -1;
self->rcvtimeo = -1;
self->reconnect_ivl = 100;
self->reconnect_ivl_max = 0;
self->sndprio = 8;
self->rcvprio = 8;
/* The transport-specific options are not initialised immediately; rather,
   they are allocated later on, when needed. */
for (i = 0; i != NN_MAX_TRANSPORT; ++i)
self->optsets [i] = NULL;
return 0;
}
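/* Usage sketch (illustrative, not part of this file): the efds initialised
   above back the NN_SNDFD/NN_RCVFD socket options, which let callers plug an
   SP socket into an ordinary poll/select loop. On POSIX systems:

       int rcvfd;
       size_t sz = sizeof (rcvfd);
       struct pollfd pfd;
       nn_getsockopt (s, NN_SOL_SOCKET, NN_RCVFD, &rcvfd, &sz);
       pfd.fd = rcvfd;
       pfd.events = POLLIN;
       poll (&pfd, 1, -1);

   When the efd is signalled the descriptor polls readable, meaning that
   a subsequent nn_recv() will not block. */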
void nn_sock_zombify (struct nn_sock *self)
{
struct nn_sockbase *sockbase;
sockbase = (struct nn_sockbase*) self;
nn_cp_lock (&sockbase->cp);
sockbase->flags |= NN_SOCK_FLAG_ZOMBIE;
/* Signal the IN and OUT events to unblock any polling function. */
if (!(sockbase->flags & NN_SOCK_FLAG_CLOSING)) {
if (!(sockbase->flags & NN_SOCK_FLAG_IN)) {
sockbase->flags |= NN_SOCK_FLAG_IN;
if (!(sockbase->vfptr->flags & NN_SOCKBASE_FLAG_NORECV))
nn_efd_signal (&sockbase->rcvfd);
}
if (!(sockbase->flags & NN_SOCK_FLAG_OUT)) {
sockbase->flags |= NN_SOCK_FLAG_OUT;
if (!(sockbase->vfptr->flags & NN_SOCKBASE_FLAG_NOSEND))
nn_efd_signal (&sockbase->sndfd);
}
}
nn_cp_unlock (&sockbase->cp);
}
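/* Illustrative sketch (public API): nn_sock_zombify() is invoked for every
   open socket by nn_term(). Once that happens, blocked as well as subsequent
   calls fail with ETERM, which is the documented way to shut an application
   down from another thread or a signal handler:

       void *buf;
       while (nn_recv (s, &buf, NN_MSG, 0) >= 0)
           nn_freemsg (buf);
       if (errno == ETERM)
           nn_close (s);
*/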
int nn_sock_destroy (struct nn_sock *self)
{
int rc;
struct nn_sockbase *sockbase;
struct nn_list_item *it;
struct nn_epbase *ep;
sockbase = (struct nn_sockbase*) self;
nn_cp_lock (&sockbase->cp);
/* The call may have been interrupted by a signal and restarted afterwards.
   In such a case, don't do the following steps again. */
if (!(sockbase->flags & NN_SOCK_FLAG_CLOSING)) {
/* Mark the socket as being in process of shutting down. */
sockbase->flags |= NN_SOCK_FLAG_CLOSING;
/* Close sndfd and rcvfd. This should make any select/poll that is
   currently using SNDFD and/or RCVFD exit. */
if (!(sockbase->vfptr->flags & NN_SOCKBASE_FLAG_NORECV)) {
nn_efd_term (&sockbase->rcvfd);
memset (&sockbase->rcvfd, 0xcd, sizeof (sockbase->rcvfd));
}
if (!(sockbase->vfptr->flags & NN_SOCKBASE_FLAG_NOSEND)) {
nn_efd_term (&sockbase->sndfd);
memset (&sockbase->sndfd, 0xcd, sizeof (sockbase->sndfd));
}
/* Create a semaphore to wait on for all endpoints to terminate. */
nn_sem_init (&sockbase->termsem);
/* Ask all the associated endpoints to terminate. The call to nn_ep_close()
   can actually deallocate the endpoint, so take care to get a pointer to
   the next endpoint before making the call. */
it = nn_list_begin (&sockbase->eps);
while (it != nn_list_end (&sockbase->eps)) {
ep = nn_cont (it, struct nn_epbase, item);
it = nn_list_next (&sockbase->eps, it);
rc = nn_ep_close ((void*) ep);
errnum_assert (rc == 0 || rc == -EINPROGRESS, -rc);
}
}
/* The shutdown process has already started, but some endpoints are still
   alive. Wait here until they are all closed. */
if (!nn_list_empty (&sockbase->eps)) {
nn_cp_unlock (&sockbase->cp);
rc = nn_sem_wait (&sockbase->termsem);
if (nn_slow (rc == -EINTR))
return -EINTR;
errnum_assert (rc == 0, -rc);
nn_cp_lock (&sockbase->cp);
nn_assert (nn_list_empty (&sockbase->eps));
}
/* Deallocation of the socket is done by asking the derived class to
   deallocate itself. The derived class, in turn, will terminate the
   sockbase class. */
nn_sem_term (&sockbase->termsem);
sockbase->vfptr->destroy (sockbase);
/* At this point the socket is already deallocated; make sure that it is
   not used here any more. */
return 0;
}
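/* Note for callers (sketch): because the semaphore wait above can be
   interrupted by a signal, nn_close() may fail with EINTR. The socket is
   not closed in that case and the call should be retried:

       int rc;
       do {
           rc = nn_close (s);
       } while (rc < 0 && errno == EINTR);
*/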
void nn_sockbase_term (struct nn_sockbase *self)
{
int i;
nn_assert (self->flags & NN_SOCK_FLAG_CLOSING);
/* The lock was acquired in the nn_sock_destroy() function. */
nn_cp_unlock (&self->cp);
/* Destroy any optsets associated with the socket. */
for (i = 0; i != NN_MAX_TRANSPORT; ++i)
if (self->optsets [i])
self->optsets [i]->vfptr->destroy (self->optsets [i]);
nn_list_term (&self->eps);
nn_clock_term (&self->clock);
nn_ctx_term (&self->ctx);
nn_cp_term (&self->cp);
}
void nn_sock_postinit (struct nn_sock *self, int domain, int protocol)
{
struct nn_sockbase *sockbase;
sockbase = (struct nn_sockbase*) self;
nn_assert (sockbase->domain == -1 && sockbase->protocol == -1);
sockbase->domain = domain;
sockbase->protocol = protocol;
nn_sockbase_adjust_events (sockbase);
}
void nn_sockbase_changed (struct nn_sockbase *self)
{
nn_sockbase_adjust_events (self);
}
struct nn_cp *nn_sockbase_getcp (struct nn_sockbase *self)
{
return &self->cp;
}
struct nn_ctx *nn_sockbase_getctx (struct nn_sockbase *self)
{
return &self->ctx;
}
struct nn_cp *nn_sock_getcp (struct nn_sock *self)
{
return &((struct nn_sockbase*) self)->cp;
}
struct nn_ctx *nn_sock_getctx (struct nn_sock *self)
{
return &((struct nn_sockbase*) self)->ctx;
}
int nn_sock_ispeer (struct nn_sock *self, int socktype)
{
struct nn_sockbase *sockbase;
sockbase = (struct nn_sockbase*) self;
/* If the peer implements a different SP protocol,
it is not a valid peer. */
if ((sockbase->protocol & 0xfff0) != (socktype & 0xfff0))
return 0;
/* As long as the peer speaks the same protocol, the socket type itself
   decides which peer socket types are to be accepted. */
return sockbase->vfptr->ispeer (socktype);
}
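/* Worked example (values from the public headers): a protocol ID encodes the
   protocol family in the upper bits and the role within the family in the
   lowest 4 bits. For the pipeline protocol NN_PROTO_PIPELINE is 5, so
   NN_PUSH == 5 * 16 + 0 == 80 and NN_PULL == 5 * 16 + 1 == 81. Both mask to
   80 under 0xfff0, i.e. they speak the same protocol, and it is then up to
   vfptr->ispeer() to accept PULL as a peer of PUSH while rejecting, say,
   PUSH as a peer of PUSH. */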
int nn_sock_setopt (struct nn_sock *self, int level, int option,
const void *optval, size_t optvallen)
{
int rc;
struct nn_sockbase *sockbase;
struct nn_optset *optset;
int val;
int *dst;
sockbase = (struct nn_sockbase*) self;
nn_cp_lock (&sockbase->cp);
/* If nn_term() was already called, return ETERM. */
if (nn_slow (sockbase->flags &
(NN_SOCK_FLAG_ZOMBIE | NN_SOCK_FLAG_CLOSING))) {
nn_cp_unlock (&sockbase->cp);
return -ETERM;
}
/* Protocol-specific socket options. */
if (level > NN_SOL_SOCKET) {
rc = sockbase->vfptr->setopt (sockbase, level, option,
optval, optvallen);
nn_sockbase_adjust_events (sockbase);
nn_cp_unlock (&sockbase->cp);
return rc;
}
/* Transport-specific options. */
if (level < NN_SOL_SOCKET) {
optset = nn_sockbase_optset (sockbase, level);
if (!optset) {
nn_cp_unlock (&sockbase->cp);
return -ENOPROTOOPT;
}
rc = optset->vfptr->setopt (optset, option, optval, optvallen);
nn_cp_unlock (&sockbase->cp);
return rc;
}
/* At this point we assume that all options are of type int. */
if (optvallen != sizeof (int)) {
nn_cp_unlock (&sockbase->cp);
return -EINVAL;
}
val = *(int*) optval;
/* Generic socket-level options. */
if (level == NN_SOL_SOCKET) {
switch (option) {
case NN_LINGER:
dst = &sockbase->linger;
break;
case NN_SNDBUF:
if (nn_slow (val <= 0)) {
nn_cp_unlock (&sockbase->cp);
return -EINVAL;
}
dst = &sockbase->sndbuf;
break;
case NN_RCVBUF:
if (nn_slow (val <= 0)) {
nn_cp_unlock (&sockbase->cp);
return -EINVAL;
}
dst = &sockbase->rcvbuf;
break;
case NN_SNDTIMEO:
dst = &sockbase->sndtimeo;
break;
case NN_RCVTIMEO:
dst = &sockbase->rcvtimeo;
break;
case NN_RECONNECT_IVL:
if (nn_slow (val < 0)) {
nn_cp_unlock (&sockbase->cp);
return -EINVAL;
}
dst = &sockbase->reconnect_ivl;
break;
case NN_RECONNECT_IVL_MAX:
if (nn_slow (val < 0)) {
nn_cp_unlock (&sockbase->cp);
return -EINVAL;
}
dst = &sockbase->reconnect_ivl_max;
break;
case NN_SNDPRIO:
if (nn_slow (val < 1 || val > 16)) {
nn_cp_unlock (&sockbase->cp);
return -EINVAL;
}
dst = &sockbase->sndprio;
break;
default:
nn_cp_unlock (&sockbase->cp);
return -ENOPROTOOPT;
}
*dst = val;
nn_cp_unlock (&sockbase->cp);
return 0;
}
nn_assert (0);
}
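/* Usage sketch (public API): nn_setsockopt() maps onto this function; for
   example, limiting blocking sends to 100 milliseconds:

       int timeout = 100;
       nn_setsockopt (s, NN_SOL_SOCKET, NN_SNDTIMEO,
           &timeout, sizeof (timeout));

   Passing a size other than sizeof (int) for a generic socket-level option
   fails with EINVAL, as enforced above. */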
int nn_sock_getopt (struct nn_sock *self, int level, int option,
void *optval, size_t *optvallen, int internal)
{
int rc;
struct nn_sockbase *sockbase;
struct nn_optset *optset;
int intval;
nn_fd fd;
sockbase = (struct nn_sockbase*) self;
if (!internal)
nn_cp_lock (&sockbase->cp);
/* If nn_term() was already called, return ETERM. */
if (!internal && nn_slow (sockbase->flags &
(NN_SOCK_FLAG_ZOMBIE | NN_SOCK_FLAG_CLOSING))) {
nn_cp_unlock (&sockbase->cp);
return -ETERM;
}
/* Generic socket-level options. */
if (level == NN_SOL_SOCKET) {
switch (option) {
case NN_DOMAIN:
intval = sockbase->domain;
break;
case NN_PROTOCOL:
intval = sockbase->protocol;
break;
case NN_LINGER:
intval = sockbase->linger;
break;
case NN_SNDBUF:
intval = sockbase->sndbuf;
break;
case NN_RCVBUF:
intval = sockbase->rcvbuf;
break;
case NN_SNDTIMEO:
intval = sockbase->sndtimeo;
break;
case NN_RCVTIMEO:
intval = sockbase->rcvtimeo;
break;
case NN_RECONNECT_IVL:
intval = sockbase->reconnect_ivl;
break;
case NN_RECONNECT_IVL_MAX:
intval = sockbase->reconnect_ivl_max;
break;
case NN_SNDPRIO:
intval = sockbase->sndprio;
break;
case NN_SNDFD:
if (sockbase->vfptr->flags & NN_SOCKBASE_FLAG_NOSEND) {
if (!internal)
nn_cp_unlock (&sockbase->cp);
return -ENOPROTOOPT;
}
fd = nn_efd_getfd (&sockbase->sndfd);
memcpy (optval, &fd,
*optvallen < sizeof (nn_fd) ? *optvallen : sizeof (nn_fd));
*optvallen = sizeof (nn_fd);
if (!internal)
nn_cp_unlock (&sockbase->cp);
return 0;
case NN_RCVFD:
if (sockbase->vfptr->flags & NN_SOCKBASE_FLAG_NORECV) {
if (!internal)
nn_cp_unlock (&sockbase->cp);
return -ENOPROTOOPT;
}
fd = nn_efd_getfd (&sockbase->rcvfd);
memcpy (optval, &fd,
*optvallen < sizeof (nn_fd) ? *optvallen : sizeof (nn_fd));
*optvallen = sizeof (nn_fd);
if (!internal)
nn_cp_unlock (&sockbase->cp);
return 0;
default:
if (!internal)
nn_cp_unlock (&sockbase->cp);
return -ENOPROTOOPT;
}
memcpy (optval, &intval,
*optvallen < sizeof (int) ? *optvallen : sizeof (int));
*optvallen = sizeof (int);
if (!internal)
nn_cp_unlock (&sockbase->cp);
return 0;
}
/* Protocol-specific socket options. */
if (level > NN_SOL_SOCKET) {
rc = sockbase->vfptr->getopt (sockbase, level, option,
optval, optvallen);
nn_sockbase_adjust_events (sockbase);
if (!internal)
nn_cp_unlock (&sockbase->cp);
return rc;
}
/* Transport-specific options. */
if (level < NN_SOL_SOCKET) {
optset = nn_sockbase_optset (sockbase, level);
if (!optset) {
if (!internal)
nn_cp_unlock (&sockbase->cp);
return -ENOPROTOOPT;
}
rc = optset->vfptr->getopt (optset, option, optval, optvallen);
if (!internal)
nn_cp_unlock (&sockbase->cp);
return rc;
}
nn_assert (0);
}
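/* Usage sketch (public API): nn_getsockopt() maps onto this function. Note
   that the value is truncated if the supplied buffer is smaller than
   sizeof (int), and the full size is always returned in *optvallen:

       int protocol;
       size_t sz = sizeof (protocol);
       nn_getsockopt (s, NN_SOL_SOCKET, NN_PROTOCOL, &protocol, &sz);
*/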
int nn_sock_add_ep (struct nn_sock *self, const char *addr,
int (*factory) (const char *addr, void *hint, struct nn_epbase **ep))
{
int rc;
struct nn_sockbase *sockbase;
struct nn_epbase *ep;
int eid;
sockbase = (struct nn_sockbase*) self;
nn_cp_lock (&sockbase->cp);
/* Create the transport-specific endpoint. */
rc = factory (addr, (void*) self, &ep);
if (nn_slow (rc < 0)) {
nn_cp_unlock (&sockbase->cp);
return rc;
}
/* Provide it with a unique endpoint ID. */
eid = ep->eid = sockbase->eid;
++sockbase->eid;
/* Add it to the list of active endpoints. */
nn_list_insert (&sockbase->eps, &ep->item, nn_list_end (&sockbase->eps));
nn_cp_unlock (&sockbase->cp);
return eid;
}
int nn_sock_rm_ep (struct nn_sock *self, int eid)
{
int rc;
struct nn_sockbase *sockbase;
struct nn_list_item *it;
struct nn_epbase *ep;
sockbase = (struct nn_sockbase*) self;
nn_cp_lock (&sockbase->cp);
/* Find the specified endpoint. */
ep = NULL;
for (it = nn_list_begin (&sockbase->eps);
it != nn_list_end (&sockbase->eps);
it = nn_list_next (&sockbase->eps, it)) {
ep = nn_cont (it, struct nn_epbase, item);
if (ep->eid == eid)
break;
ep = NULL;
}
/* The endpoint doesn't exist. */
if (!ep) {
nn_cp_unlock (&sockbase->cp);
return -EINVAL;
}
/* Ask the endpoint to shut down. Actual termination may be delayed
   by the transport. */
rc = nn_ep_close ((void*) ep);
errnum_assert (rc == 0 || rc == -EINPROGRESS, -rc);
nn_cp_unlock (&sockbase->cp);
return 0;
}
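/* Sketch of the endpoint ID lifecycle at the public API level: nn_bind()
   and nn_connect() end up in nn_sock_add_ep() and return the eid assigned
   there; nn_shutdown() hands that eid back to nn_sock_rm_ep():

       int eid = nn_connect (s, "tcp://127.0.0.1:5555");
       ...
       nn_shutdown (s, eid);

   As implemented above, an unknown eid makes nn_shutdown() fail with
   EINVAL. */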
void nn_sock_ep_closed (struct nn_sock *self, struct nn_epbase *ep)
{
struct nn_sockbase *sockbase;
sockbase = (struct nn_sockbase*) self;
/* Remove the endpoint from the list of active endpoints. */
nn_list_erase (&sockbase->eps, &ep->item);
/* nn_close() may be waiting for termination of this endpoint.
Send it a signal. */
if (sockbase->flags & NN_SOCK_FLAG_CLOSING &&
nn_list_empty (&sockbase->eps))
nn_sem_post (&sockbase->termsem);
}
int nn_sock_send (struct nn_sock *self, struct nn_msg *msg, int flags)
{
int rc;
struct nn_sockbase *sockbase;
uint64_t deadline;
uint64_t now;
int timeout;
sockbase = (struct nn_sockbase*) self;
/* Some socket types cannot be used for sending messages. */
if (nn_slow (sockbase->vfptr->flags & NN_SOCKBASE_FLAG_NOSEND))
return -ENOTSUP;
nn_cp_lock (&sockbase->cp);
/* Compute the deadline for the SNDTIMEO timer. */
if (sockbase->sndtimeo < 0)
timeout = -1;
else {
deadline = nn_clock_now (&sockbase->clock) + sockbase->sndtimeo;
timeout = sockbase->sndtimeo;
}
while (1) {
/* If nn_term() was already called, return ETERM. */
if (nn_slow (sockbase->flags &
(NN_SOCK_FLAG_ZOMBIE | NN_SOCK_FLAG_CLOSING))) {
nn_cp_unlock (&sockbase->cp);
return -ETERM;
}
/* Try to send the message in a non-blocking way. */
rc = sockbase->vfptr->send (sockbase, msg);
nn_sockbase_adjust_events (sockbase);
if (nn_fast (rc == 0)) {
nn_cp_unlock (&sockbase->cp);
return 0;
}
nn_assert (rc < 0);
/* Any unexpected error is forwarded to the caller. */
if (nn_slow (rc != -EAGAIN)) {
nn_cp_unlock (&sockbase->cp);
return rc;
}
/* If the message cannot be sent at the moment and the send call
is non-blocking, return immediately. */
if (nn_fast (flags & NN_DONTWAIT)) {
nn_cp_unlock (&sockbase->cp);
return -EAGAIN;
}
/* With a blocking send, wait until new pipes become available
   for sending. */
nn_cp_unlock (&sockbase->cp);
rc = nn_efd_wait (&sockbase->sndfd, timeout);
if (nn_slow (rc == -ETIMEDOUT))
return -EAGAIN;
if (nn_slow (rc == -EINTR))
return -EINTR;
errnum_assert (rc == 0, rc);
nn_cp_lock (&sockbase->cp);
/* If needed, re-compute the timeout to reflect the time that has
   already elapsed. */
if (sockbase->sndtimeo >= 0) {
now = nn_clock_now (&sockbase->clock);
timeout = (int) (now > deadline ? 0 : deadline - now);
}
}
}
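/* Worked example of the timeout arithmetic above: with NN_SNDTIMEO set to
   1000ms, if nn_efd_wait() wakes the sender after 300ms, the timeout is
   recomputed as deadline - now == 700ms; should the retried send still
   return -EAGAIN, the next wait uses those remaining 700ms, so the call as
   a whole gives up (with -EAGAIN) 1000ms after it started. */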
int nn_sock_recv (struct nn_sock *self, struct nn_msg *msg, int flags)
{
int rc;
struct nn_sockbase *sockbase;
uint64_t deadline;
uint64_t now;
int timeout;
sockbase = (struct nn_sockbase*) self;
/* Some socket types cannot be used for receiving messages. */
if (nn_slow (sockbase->vfptr->flags & NN_SOCKBASE_FLAG_NORECV))
return -ENOTSUP;
nn_cp_lock (&sockbase->cp);
/* Compute the deadline for the RCVTIMEO timer. */
if (sockbase->rcvtimeo < 0)
timeout = -1;
else {
deadline = nn_clock_now (&sockbase->clock) + sockbase->rcvtimeo;
timeout = sockbase->rcvtimeo;
}
while (1) {
/* If nn_term() was already called, return ETERM. */
if (nn_slow (sockbase->flags &
(NN_SOCK_FLAG_ZOMBIE | NN_SOCK_FLAG_CLOSING))) {
nn_cp_unlock (&sockbase->cp);
return -ETERM;
}
/* Try to receive the message in a non-blocking way. */
rc = sockbase->vfptr->recv (sockbase, msg);
nn_sockbase_adjust_events (sockbase);
if (nn_fast (rc == 0)) {
nn_cp_unlock (&sockbase->cp);
return 0;
}
nn_assert (rc < 0);
/* Any unexpected error is forwarded to the caller. */
if (nn_slow (rc != -EAGAIN)) {
nn_cp_unlock (&sockbase->cp);
return rc;
}
/* If the message cannot be received at the moment and the recv call
is non-blocking, return immediately. */
if (nn_fast (flags & NN_DONTWAIT)) {
nn_cp_unlock (&sockbase->cp);
return -EAGAIN;
}
/* With a blocking recv, wait until new pipes become available
   for receiving. */
nn_cp_unlock (&sockbase->cp);
rc = nn_efd_wait (&sockbase->rcvfd, timeout);
if (nn_slow (rc == -ETIMEDOUT))
return -EAGAIN;
if (nn_slow (rc == -EINTR))
return -EINTR;
errnum_assert (rc == 0, rc);
nn_cp_lock (&sockbase->cp);
/* If needed, re-compute the timeout to reflect the time that has
   already elapsed. */
if (sockbase->rcvtimeo >= 0) {
now = nn_clock_now (&sockbase->clock);
timeout = (int) (now > deadline ? 0 : deadline - now);
}
}
}
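/* Usage sketch (public API): a non-blocking receive. With NN_DONTWAIT, or
   when NN_RCVTIMEO expires, this version surfaces the failure as EAGAIN
   (see the -ETIMEDOUT translation above):

       void *buf;
       int rc = nn_recv (s, &buf, NN_MSG, NN_DONTWAIT);
       if (rc >= 0)
           nn_freemsg (buf);

   A negative rc with errno set to EAGAIN means no message was available. */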
int nn_sock_add (struct nn_sock *self, struct nn_pipe *pipe)
{
int rc;
struct nn_sockbase *sockbase;
sockbase = (struct nn_sockbase*) self;
rc = sockbase->vfptr->add (sockbase, pipe);
nn_sockbase_adjust_events (sockbase);
return rc;
}
void nn_sock_rm (struct nn_sock *self, struct nn_pipe *pipe)
{
struct nn_sockbase *sockbase;
sockbase = (struct nn_sockbase*) self;
sockbase->vfptr->rm (sockbase, pipe);
nn_sockbase_adjust_events (sockbase);
}
void nn_sock_in (struct nn_sock *self, struct nn_pipe *pipe)
{
struct nn_sockbase *sockbase;
sockbase = (struct nn_sockbase*) self;
sockbase->vfptr->in (sockbase, pipe);
nn_sockbase_adjust_events (sockbase);
}
void nn_sock_out (struct nn_sock *self, struct nn_pipe *pipe)
{
struct nn_sockbase *sockbase;
sockbase = (struct nn_sockbase*) self;
sockbase->vfptr->out (sockbase, pipe);
nn_sockbase_adjust_events (sockbase);
}
void nn_sockbase_adjust_events (struct nn_sockbase *self)
{
int events;
/* If nn_close() was already called, there's no point in adjusting the
   snd/rcv file descriptors. */
if (self->flags & NN_SOCK_FLAG_CLOSING)
return;
/* Check whether the socket is readable and/or writable at the moment. */
events = self->vfptr->events (self);
errnum_assert (events >= 0, -events);
/* Signal/unsignal IN as needed. */
if (!(self->vfptr->flags & NN_SOCKBASE_FLAG_NORECV)) {
if (events & NN_SOCKBASE_EVENT_IN) {
nn_assert (!(self->vfptr->flags & NN_SOCKBASE_FLAG_NORECV));
if (!(self->flags & NN_SOCK_FLAG_IN)) {
self->flags |= NN_SOCK_FLAG_IN;
nn_efd_signal (&self->rcvfd);
}
}
else {
if (self->flags & NN_SOCK_FLAG_IN) {
self->flags &= ~NN_SOCK_FLAG_IN;
nn_efd_unsignal (&self->rcvfd);
}
}
}
/* Signal/unsignal OUT as needed. */
if (!(self->vfptr->flags & NN_SOCKBASE_FLAG_NOSEND)) {
if (events & NN_SOCKBASE_EVENT_OUT) {
nn_assert (!(self->vfptr->flags & NN_SOCKBASE_FLAG_NOSEND));
if (!(self->flags & NN_SOCK_FLAG_OUT)) {
self->flags |= NN_SOCK_FLAG_OUT;
nn_efd_signal (&self->sndfd);
}
}
else {
if (self->flags & NN_SOCK_FLAG_OUT) {
self->flags &= ~NN_SOCK_FLAG_OUT;
nn_efd_unsignal (&self->sndfd);
}
}
}
}
struct nn_optset *nn_sockbase_optset (struct nn_sockbase *self, int id)
{
int index;
struct nn_transport *tp;
/* Transport IDs are negative and start from -1. */
index = (-id) - 1;
/* Check for invalid indices. */
if (nn_slow (index < 0 || index >= NN_MAX_TRANSPORT))
return NULL;
/* If the option set already exists, return it. */
if (nn_fast (self->optsets [index] != NULL))
return self->optsets [index];
/* If the option set doesn't exist yet, create it. */
tp = nn_global_transport (id);
if (nn_slow (!tp))
return NULL;
if (nn_slow (!tp->optset))
return NULL;
self->optsets [index] = tp->optset ();
return self->optsets [index];
}
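/* Worked example: transport option levels are negative constants from the
   public headers (NN_INPROC == -1, NN_IPC == -2, NN_TCP == -3), so e.g. a
   TCP-level option lands at index (-(-3)) - 1 == 2 of the optsets array.
   The optset itself is created lazily by the first nn_setsockopt() or
   nn_getsockopt() call at that level. */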