blob: f6fa4070248c9bb37ff1aea70969cb558e797025 [file] [log] [blame]
/*
Copyright (c) 2012-2013 250bpm s.r.o.
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"),
to deal in the Software without restriction, including without limitation
the rights to use, copy, modify, merge, publish, distribute, sublicense,
and/or sell copies of the Software, and to permit persons to whom
the Software is furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included
in all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
IN THE SOFTWARE.
*/
#include "stream.h"
#include "../../utils/err.h"
#include "../../utils/cont.h"
#include "../../utils/wire.h"
#include "../../utils/fast.h"
#include <string.h>
#include <stdint.h>
/* Possible states of object. */
#define NN_STREAM_STATE_INIT 0
#define NN_STREAM_STATE_SENDING_PROTOHDR 1
#define NN_STREAM_STATE_RECEIVING_PROTOHDR 2
#define NN_STREAM_STATE_CLOSING_TIMER 3
#define NN_STREAM_STATE_CLOSING_TIMER_ERROR 4
#define NN_STREAM_STATE_RECEIVING_MSGHDR 5
#define NN_STREAM_STATE_RECEIVING_MSGBODY 6
#define NN_STREAM_STATE_CLOSED 7
/* Inbound events. */
#define NN_STREAM_EVENT_START 1
#define NN_STREAM_EVENT_SEND 2
#define NN_STREAM_EVENT_RECV 3
#define NN_STREAM_EVENT_CLOSE 4
/* Private functions. */
static void nn_stream_callback (struct nn_fsm *self, void *source, int type);
/* Stream is a special type of pipe. Here it implements the pipe interface. */
static int nn_stream_send (struct nn_pipebase *self, struct nn_msg *msg);
static int nn_stream_recv (struct nn_pipebase *self, struct nn_msg *msg);
const struct nn_pipebase_vfptr nn_stream_pipebase_vfptr = {
nn_stream_send,
nn_stream_recv
};
void nn_stream_init (struct nn_stream *self, struct nn_epbase *epbase,
struct nn_usock *usock, struct nn_fsm *owner)
{
int rc;
int protocol;
size_t sz;
/* Initialise the state machine. */
nn_fsm_init (&self->fsm, nn_stream_callback, owner);
self->state = NN_STREAM_STATE_INIT;
/* Redirect the underlying socket's events to this state machine. */
self->usock = usock;
self->usock_owner = nn_usock_swap_owner (self->usock, &self->fsm);
/* Initialise the pipe to communicate with the user. */
rc = nn_pipebase_init (&self->pipebase, &nn_stream_pipebase_vfptr, epbase);
nn_assert (rc == 0);
nn_msg_init (&self->inmsg, 0);
nn_msg_init (&self->outmsg, 0);
nn_timer_init (&self->hdr_timeout, &self->fsm);
/* Prepare the outgoing protocol header. */
sz = sizeof (protocol);
nn_epbase_getopt (epbase, NN_SOL_SOCKET, NN_PROTOCOL, &protocol, &sz);
errnum_assert (rc == 0, -rc);
nn_assert (sz == sizeof (protocol));
memcpy (self->protohdr, "\0\0SP\0\0\0\0", 8);
nn_puts (self->protohdr + 4, (uint16_t) protocol);
/* Pass the event to the state machine. */
nn_stream_callback (&self->fsm, NULL, NN_STREAM_EVENT_START);
}
void nn_stream_close (struct nn_stream *self)
{
/* Pass the appropriate event to the state machine. */
nn_stream_callback (&self->fsm, NULL, NN_STREAM_EVENT_CLOSE);
}
void nn_stream_term (struct nn_stream *self)
{
/* Sanity check. */
nn_assert (self->state == NN_STREAM_STATE_CLOSED);
nn_msg_term (&self->inmsg);
nn_msg_term (&self->outmsg);
nn_pipebase_term (&self->pipebase);
/* Return control of the underlying socket to the parent state machine. */
nn_usock_swap_owner (self->usock, self->usock_owner);
}
#if 0
static void nn_stream_hdr_sent (const struct nn_cp_sink **self,
struct nn_aio_usock *usock)
{
struct nn_stream *stream;
stream = nn_cont (self, struct nn_stream, sink);
stream->sink = &nn_stream_state_sent;
/* Receive the protocol header from the peer. */
nn_aio_usock_recv (usock, stream->protohdr, 8);
}
static void nn_stream_hdr_received (const struct nn_cp_sink **self,
struct nn_aio_usock *usock)
{
struct nn_stream *stream;
int protocol;
stream = nn_cont (self, struct nn_stream, sink);
stream->sink = &nn_stream_state_active;
nn_aio_timer_stop (&stream->hdr_timeout);
/* TODO: If it does not conform, drop the connection. */
protocol = nn_gets (stream->protohdr + 4);
if (!nn_pipebase_ispeer (&stream->pipebase, protocol))
nn_assert (0);
/* Connection is ready for sending. Make outpipe available
to the SP socket. */
nn_pipebase_activate (&stream->pipebase);
/* Start waiting for incoming messages. First, read the 8-byte size. */
stream->instate = NN_STREAM_INSTATE_HDR;
nn_aio_usock_recv (stream->usock, stream->inhdr, 8);
}
static void nn_stream_hdr_timeout (const struct nn_cp_sink **self,
struct nn_aio_timer *timer)
{
struct nn_stream *stream;
const struct nn_cp_sink **original_sink;
/* The initial protocol header exchange have timed out. */
stream = nn_cont (self, struct nn_stream, sink);
original_sink = stream->original_sink;
/* Terminate the session object. */
nn_stream_term (stream);
/* Notify the parent state machine about the failure. */
nn_assert ((*original_sink)->err);
(*original_sink)->err (original_sink, stream->usock, ETIMEDOUT);
}
static void nn_stream_received (const struct nn_cp_sink **self,
struct nn_aio_usock *usock)
{
struct nn_stream *stream;
uint64_t size;
stream = nn_cont (self, struct nn_stream, sink);
switch (stream->instate) {
case NN_STREAM_INSTATE_HDR:
size = nn_getll (stream->inhdr);
nn_msg_term (&stream->inmsg);
nn_msg_init (&stream->inmsg, (size_t) size);
if (!size) {
nn_pipebase_received (&stream->pipebase);
break;
}
stream->instate = NN_STREAM_INSTATE_BODY;
nn_aio_usock_recv (stream->usock,
nn_chunkref_data (&stream->inmsg.body), (size_t) size);
break;
case NN_STREAM_INSTATE_BODY:
nn_pipebase_received (&stream->pipebase);
break;
default:
nn_assert (0);
}
}
static void nn_stream_sent (const struct nn_cp_sink **self,
struct nn_aio_usock *usock)
{
struct nn_stream *stream;
stream = nn_cont (self, struct nn_stream, sink);
nn_pipebase_sent (&stream->pipebase);
nn_msg_term (&stream->outmsg);
nn_msg_init (&stream->outmsg, 0);
}
static void nn_stream_err (const struct nn_cp_sink **self,
struct nn_aio_usock *usock, int errnum)
{
struct nn_stream *stream;
const struct nn_cp_sink **original_sink;
stream = nn_cont (self, struct nn_stream, sink);
original_sink = stream->original_sink;
/* Terminate the session object. */
nn_stream_term (stream);
/* Notify the parent state machine about the failure. */
nn_assert ((*original_sink)->err);
(*original_sink)->err (original_sink, usock, errnum);
}
#endif
static int nn_stream_send (struct nn_pipebase *self, struct nn_msg *msg)
{
struct nn_stream *stream;
stream = nn_cont (self, struct nn_stream, pipebase);
/* Move the message to the local storage. */
nn_msg_term (&stream->outmsg);
nn_msg_mv (&stream->outmsg, msg);
/* Pass the event to the state machine. */
nn_stream_callback (&stream->fsm, NULL, NN_STREAM_EVENT_SEND);
#if 0
struct nn_iobuf iov [3];
/* Serialise the message header. */
nn_putll (stream->outhdr, nn_chunkref_size (&stream->outmsg.hdr) +
nn_chunkref_size (&stream->outmsg.body));
/* Start async sending. */
iov [0].iov_base = stream->outhdr;
iov [0].iov_len = sizeof (stream->outhdr);
iov [1].iov_base = nn_chunkref_data (&stream->outmsg.hdr);
iov [1].iov_len = nn_chunkref_size (&stream->outmsg.hdr);
iov [2].iov_base = nn_chunkref_data (&stream->outmsg.body);
iov [2].iov_len = nn_chunkref_size (&stream->outmsg.body);;
nn_aio_usock_send (stream->usock, iov, 3);
#endif
return 0;
}
static int nn_stream_recv (struct nn_pipebase *self, struct nn_msg *msg)
{
struct nn_stream *stream;
stream = nn_cont (self, struct nn_stream, pipebase);
/* Move received message to the user. */
nn_msg_mv (msg, &stream->inmsg);
nn_msg_init (&stream->inmsg, 0);
/* We can start receiving a new message now. Pass the event to the
state machine. */
nn_stream_callback (&stream->fsm, NULL, NN_STREAM_EVENT_RECV);
#if 0
/* Start receiving new message. */
stream->instate = NN_STREAM_INSTATE_HDR;
nn_aio_usock_recv (stream->usock, stream->inhdr, 8);
#endif
return 0;
}
static void nn_stream_callback (struct nn_fsm *self, void *source, int type)
{
struct nn_stream *stream;
struct nn_iovec iovec;
stream = nn_cont (self, struct nn_stream, fsm);
switch (stream->state) {
case NN_STREAM_STATE_INIT:
if (source == NULL) {
switch (type) {
case NN_STREAM_EVENT_START:
/* Start the header timeout timer. */
nn_timer_start (&stream->hdr_timeout, 1000);
/* Send the protocol header. We don't event try to do sending
and receiving the header in parallel. The rationale is that
the outgoing header will fill into TCP tx buffer and thus
will be sent asynchronously anyway. */
iovec.iov_base = stream->protohdr;
iovec.iov_len = 8;
nn_usock_send (stream->usock, &iovec, 1);
stream->state = NN_STREAM_STATE_SENDING_PROTOHDR;
return;
default:
nn_assert (0);
}
}
nn_assert (0);
/******************************************************************************/
/* SENDING_PROTOHDR state. */
/******************************************************************************/
case NN_STREAM_STATE_SENDING_PROTOHDR:
if (source == stream->usock) {
switch (type) {
case NN_USOCK_SENT:
nn_assert (0);
case NN_USOCK_ERROR:
nn_assert (0);
default:
nn_assert (0);
}
}
if (source == &stream->hdr_timeout) {
switch (type) {
case NN_TIMER_TIMEOUT:
nn_assert (0);
default:
nn_assert (0);
}
}
nn_assert (0);
/******************************************************************************/
/* RECEIVING_PROTOHDR state. */
/******************************************************************************/
case NN_STREAM_STATE_RECEIVING_PROTOHDR:
if (source == stream->usock) {
switch (type) {
case NN_USOCK_RECEIVED:
nn_assert (0);
case NN_USOCK_ERROR:
nn_assert (0);
default:
nn_assert (0);
}
}
if (source == &stream->hdr_timeout) {
switch (type) {
case NN_TIMER_TIMEOUT:
nn_assert (0);
default:
nn_assert (0);
}
}
nn_assert (0);
/******************************************************************************/
/* CLOSING__TIMER state. */
/******************************************************************************/
case NN_STREAM_STATE_CLOSING_TIMER:
nn_assert (0);
/******************************************************************************/
/* CLOSING_TIMER_ERROR state. */
/******************************************************************************/
case NN_STREAM_STATE_CLOSING_TIMER_ERROR:
nn_assert (0);
/******************************************************************************/
/* ACTIVE state (actually, it's a combination of two sub-states) */
/******************************************************************************/
case NN_STREAM_STATE_RECEIVING_MSGHDR:
case NN_STREAM_STATE_RECEIVING_MSGBODY:
nn_assert (0);
/******************************************************************************/
/* CLOSED state. */
/******************************************************************************/
case NN_STREAM_STATE_CLOSED:
nn_assert (0);
default:
nn_assert (0);
}
}