"close" step added to the shutdown of individual socket types
Signed-off-by: Martin Sustrik <sustrik@250bpm.com>
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 1d11031..01bb9a9 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -1,5 +1,5 @@
#
-# Copyright (c) 2012 250bpm s.r.o.
+# Copyright (c) 2012-2013 250bpm s.r.o.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"),
diff --git a/src/protocol.h b/src/protocol.h
index 382e701..83877aa 100644
--- a/src/protocol.h
+++ b/src/protocol.h
@@ -71,7 +71,7 @@
struct nn_sockbase;
-/* Event generated by the sockbase object. Not to be used directly. */
+/* Event generated by the sockbase object. Do not use it directly! */
#define NN_SOCKBASE_CLOSED 1
/* Any combination of these events can be returned from 'events' virtual
@@ -88,9 +88,9 @@
/* Deallocate the socket. */
void (*destroy) (struct nn_sockbase *self);
- /* Management of pipes. 'add' registers a new pipe. The pipe cannot to
- send to or received from at the moment. 'rm' unregisters the pipe.
- The pipe should not be used after this call as it may already be
+ /* Management of pipes. 'add' registers a new pipe. The pipe cannot be used
+ to send to or to be received from at the moment. 'rm' unregisters the
+ pipe. The pipe should not be used after this call as it may already be
deallocated. 'in' informs the socket that pipe is readable. 'out'
informs it that it is writeable. */
int (*add) (struct nn_sockbase *self, struct nn_pipe *pipe);
@@ -99,7 +99,7 @@
void (*out) (struct nn_sockbase *self, struct nn_pipe *pipe);
/* Return any combination of event flags defined above, thus specifying
- whether the socket should be readable, writeable or none. */
+ whether the socket should be readable, writeable, both or none. */
int (*events) (struct nn_sockbase *self);
/* Send a message to the socket. Returns -EAGAIN if it cannot be done at
@@ -125,11 +125,12 @@
struct nn_fsm_event event_closed;
};
-/* Initialise the socket. */
+/* Initialise the socket base class. 'hint' is the opaque value passed to the
+ nn_transport's 'create' function. */
int nn_sockbase_init (struct nn_sockbase *self,
const struct nn_sockbase_vfptr *vfptr, void *hint);
-/* Uninitialise the socket. */
+/* Terminate the socket base class. */
void nn_sockbase_term (struct nn_sockbase *self);
/* Call this function when closing is done. */
diff --git a/src/protocols/fanin/xsink.c b/src/protocols/fanin/xsink.c
index ddb2a41..faecbf9 100644
--- a/src/protocols/fanin/xsink.c
+++ b/src/protocols/fanin/xsink.c
@@ -47,6 +47,7 @@
static void nn_xsink_term (struct nn_xsink *self);
/* Implementation of nn_sockbase's virtual functions. */
+static void nn_xsink_close (struct nn_sockbase *self);
static void nn_xsink_destroy (struct nn_sockbase *self);
static int nn_xsink_add (struct nn_sockbase *self, struct nn_pipe *pipe);
static void nn_xsink_rm (struct nn_sockbase *self, struct nn_pipe *pipe);
@@ -59,6 +60,7 @@
static int nn_xsink_getopt (struct nn_sockbase *self, int level, int option,
void *optval, size_t *optvallen);
static const struct nn_sockbase_vfptr nn_xsink_sockbase_vfptr = {
+ nn_xsink_close,
nn_xsink_destroy,
nn_xsink_add,
nn_xsink_rm,
@@ -91,6 +93,12 @@
nn_sockbase_term (&self->sockbase);
}
+void nn_xsink_close (struct nn_sockbase *self)
+{
+ /* Nothing special to do done. The object is closed straight away. */
+ nn_sockbase_closed (self);
+}
+
void nn_xsink_destroy (struct nn_sockbase *self)
{
struct nn_xsink *xsink;
diff --git a/src/protocols/fanin/xsource.c b/src/protocols/fanin/xsource.c
index 0127762..5b852bc 100644
--- a/src/protocols/fanin/xsource.c
+++ b/src/protocols/fanin/xsource.c
@@ -43,6 +43,7 @@
static void nn_xsource_term (struct nn_xsource *self);
/* Implementation of nn_sockbase's virtual functions. */
+static void nn_xsource_close (struct nn_sockbase *self);
static void nn_xsource_destroy (struct nn_sockbase *self);
static int nn_xsource_add (struct nn_sockbase *self, struct nn_pipe *pipe);
static void nn_xsource_rm (struct nn_sockbase *self, struct nn_pipe *pipe);
@@ -55,6 +56,7 @@
static int nn_xsource_getopt (struct nn_sockbase *self, int level, int option,
void *optval, size_t *optvallen);
static const struct nn_sockbase_vfptr nn_xsource_sockbase_vfptr = {
+ nn_xsource_close,
nn_xsource_destroy,
nn_xsource_add,
nn_xsource_rm,
@@ -87,6 +89,12 @@
nn_sockbase_term (&self->sockbase);
}
+void nn_xsource_close (struct nn_sockbase *self)
+{
+ /* Nothing special to do done. The object is closed straight away. */
+ nn_sockbase_closed (self);
+}
+
void nn_xsource_destroy (struct nn_sockbase *self)
{
struct nn_xsource *xsource;
diff --git a/src/protocols/fanout/xpull.c b/src/protocols/fanout/xpull.c
index 3ebacc4..b421a1e 100644
--- a/src/protocols/fanout/xpull.c
+++ b/src/protocols/fanout/xpull.c
@@ -43,6 +43,7 @@
static void nn_xpull_term (struct nn_xpull *self);
/* Implementation of nn_sockbase's virtual functions. */
+static void nn_xpull_close (struct nn_sockbase *self);
static void nn_xpull_destroy (struct nn_sockbase *self);
static int nn_xpull_add (struct nn_sockbase *self, struct nn_pipe *pipe);
static void nn_xpull_rm (struct nn_sockbase *self, struct nn_pipe *pipe);
@@ -55,6 +56,7 @@
static int nn_xpull_getopt (struct nn_sockbase *self, int level, int option,
void *optval, size_t *optvallen);
static const struct nn_sockbase_vfptr nn_xpull_sockbase_vfptr = {
+ nn_xpull_close,
nn_xpull_destroy,
nn_xpull_add,
nn_xpull_rm,
@@ -87,6 +89,12 @@
nn_sockbase_term (&self->sockbase);
}
+void nn_xpull_close (struct nn_sockbase *self)
+{
+ /* Nothing special to do done. The object is closed straight away. */
+ nn_sockbase_closed (self);
+}
+
void nn_xpull_destroy (struct nn_sockbase *self)
{
struct nn_xpull *xpull;
diff --git a/src/protocols/fanout/xpush.c b/src/protocols/fanout/xpush.c
index a05d05a..79d5f4a 100644
--- a/src/protocols/fanout/xpush.c
+++ b/src/protocols/fanout/xpush.c
@@ -47,6 +47,7 @@
static void nn_xpush_term (struct nn_xpush *self);
/* Implementation of nn_sockbase's virtual functions. */
+static void nn_xpush_close (struct nn_sockbase *self);
static void nn_xpush_destroy (struct nn_sockbase *self);
static int nn_xpush_add (struct nn_sockbase *self, struct nn_pipe *pipe);
static void nn_xpush_rm (struct nn_sockbase *self, struct nn_pipe *pipe);
@@ -59,6 +60,7 @@
static int nn_xpush_getopt (struct nn_sockbase *self, int level, int option,
void *optval, size_t *optvallen);
static const struct nn_sockbase_vfptr nn_xpush_sockbase_vfptr = {
+ nn_xpush_close,
nn_xpush_destroy,
nn_xpush_add,
nn_xpush_rm,
@@ -91,6 +93,12 @@
nn_sockbase_term (&self->sockbase);
}
+void nn_xpush_close (struct nn_sockbase *self)
+{
+ /* Nothing special to do done. The object is closed straight away. */
+ nn_sockbase_closed (self);
+}
+
void nn_xpush_destroy (struct nn_sockbase *self)
{
struct nn_xpush *xpush;
diff --git a/src/protocols/pubsub/pub.c b/src/protocols/pubsub/pub.c
index 4c56bd1..42d7650 100644
--- a/src/protocols/pubsub/pub.c
+++ b/src/protocols/pubsub/pub.c
@@ -53,6 +53,7 @@
static void nn_pub_term (struct nn_pub *self);
/* Implementation of nn_sockbase's virtual functions. */
+static void nn_pub_close (struct nn_sockbase *self);
static void nn_pub_destroy (struct nn_sockbase *self);
static int nn_pub_add (struct nn_sockbase *self, struct nn_pipe *pipe);
static void nn_pub_rm (struct nn_sockbase *self, struct nn_pipe *pipe);
@@ -65,6 +66,7 @@
static int nn_pub_getopt (struct nn_sockbase *self, int level, int option,
void *optval, size_t *optvallen);
static const struct nn_sockbase_vfptr nn_pub_sockbase_vfptr = {
+ nn_pub_close,
nn_pub_destroy,
nn_pub_add,
nn_pub_rm,
@@ -97,6 +99,12 @@
nn_sockbase_term (&self->sockbase);
}
+void nn_pub_close (struct nn_sockbase *self)
+{
+ /* Nothing special to do done. The object is closed straight away. */
+ nn_sockbase_closed (self);
+}
+
void nn_pub_destroy (struct nn_sockbase *self)
{
struct nn_pub *pub;
diff --git a/src/protocols/pubsub/sub.c b/src/protocols/pubsub/sub.c
index 09db755..cd2edbe 100644
--- a/src/protocols/pubsub/sub.c
+++ b/src/protocols/pubsub/sub.c
@@ -45,6 +45,7 @@
static void nn_sub_term (struct nn_sub *self);
/* Implementation of nn_sockbase's virtual functions. */
+static void nn_sub_close (struct nn_sockbase *self);
static void nn_sub_destroy (struct nn_sockbase *self);
static int nn_sub_add (struct nn_sockbase *self, struct nn_pipe *pipe);
static void nn_sub_rm (struct nn_sockbase *self, struct nn_pipe *pipe);
@@ -57,6 +58,7 @@
static int nn_sub_getopt (struct nn_sockbase *self, int level, int option,
void *optval, size_t *optvallen);
static const struct nn_sockbase_vfptr nn_sub_sockbase_vfptr = {
+ nn_sub_close,
nn_sub_destroy,
nn_sub_add,
nn_sub_rm,
@@ -91,6 +93,12 @@
nn_sockbase_term (&self->sockbase);
}
+void nn_sub_close (struct nn_sockbase *self)
+{
+ /* Nothing special to do done. The object is closed straight away. */
+ nn_sockbase_closed (self);
+}
+
void nn_sub_destroy (struct nn_sockbase *self)
{
struct nn_sub *sub;
diff --git a/src/protocols/reqrep/rep.c b/src/protocols/reqrep/rep.c
index d4d1883..6318754 100644
--- a/src/protocols/reqrep/rep.c
+++ b/src/protocols/reqrep/rep.c
@@ -51,12 +51,14 @@
static void nn_rep_term (struct nn_rep *self);
/* Implementation of nn_sockbase's virtual functions. */
+static void nn_rep_close (struct nn_sockbase *self);
static void nn_rep_destroy (struct nn_sockbase *self);
static int nn_rep_events (struct nn_sockbase *self);
static int nn_rep_send (struct nn_sockbase *self, struct nn_msg *msg);
static int nn_rep_recv (struct nn_sockbase *self, struct nn_msg *msg);
static const struct nn_sockbase_vfptr nn_rep_sockbase_vfptr = {
+ nn_rep_close,
nn_rep_destroy,
nn_xrep_add,
nn_xrep_rm,
@@ -89,6 +91,12 @@
nn_xrep_term (&self->xrep);
}
+static void nn_rep_close (struct nn_sockbase *self)
+{
+ /* Nothing special to do done. The object is closed straight away. */
+ nn_sockbase_closed (self);
+}
+
static void nn_rep_destroy (struct nn_sockbase *self)
{
struct nn_rep *rep;
diff --git a/src/protocols/reqrep/req.c b/src/protocols/reqrep/req.c
index 062d24e..b75612d 100644
--- a/src/protocols/reqrep/req.c
+++ b/src/protocols/reqrep/req.c
@@ -88,6 +88,7 @@
static void nn_req_term (struct nn_req *self);
/* Implementation of nn_sockbase's virtual functions. */
+static void nn_req_close (struct nn_sockbase *self);
static void nn_req_destroy (struct nn_sockbase *self);
static void nn_req_in (struct nn_sockbase *self, struct nn_pipe *pipe);
static void nn_req_out (struct nn_sockbase *self, struct nn_pipe *pipe);
@@ -99,6 +100,7 @@
static int nn_req_getopt (struct nn_sockbase *self, int level, int option,
void *optval, size_t *optvallen);
static const struct nn_sockbase_vfptr nn_req_sockbase_vfptr = {
+ nn_req_close,
nn_req_destroy,
nn_xreq_add,
nn_xreq_rm,
@@ -142,6 +144,11 @@
nn_xreq_term (&self->xreq);
}
+static void nn_req_close (struct nn_sockbase *self)
+{
+ nn_assert (0);
+}
+
static void nn_req_destroy (struct nn_sockbase *self)
{
struct nn_req *req;
diff --git a/src/protocols/reqrep/xrep.c b/src/protocols/reqrep/xrep.c
index 111060c..eda85ce 100644
--- a/src/protocols/reqrep/xrep.c
+++ b/src/protocols/reqrep/xrep.c
@@ -36,9 +36,11 @@
#include <string.h>
/* Private functions. */
+static void nn_xrep_close (struct nn_sockbase *self);
static void nn_xrep_destroy (struct nn_sockbase *self);
static const struct nn_sockbase_vfptr nn_xrep_sockbase_vfptr = {
+ nn_xrep_close,
nn_xrep_destroy,
nn_xrep_add,
nn_xrep_rm,
@@ -77,6 +79,12 @@
nn_sockbase_term (&self->sockbase);
}
+static void nn_xrep_close (struct nn_sockbase *self)
+{
+ /* Nothing special to do done. The object is closed straight away. */
+ nn_sockbase_closed (self);
+}
+
static void nn_xrep_destroy (struct nn_sockbase *self)
{
struct nn_xrep *xrep;
diff --git a/src/protocols/reqrep/xreq.c b/src/protocols/reqrep/xreq.c
index fe2f17a..384b522 100644
--- a/src/protocols/reqrep/xreq.c
+++ b/src/protocols/reqrep/xreq.c
@@ -37,9 +37,11 @@
};
/* Private functions. */
+static void nn_xreq_close (struct nn_sockbase *self);
static void nn_xreq_destroy (struct nn_sockbase *self);
static const struct nn_sockbase_vfptr nn_xreq_sockbase_vfptr = {
+ nn_xreq_close,
nn_xreq_destroy,
nn_xreq_add,
nn_xreq_rm,
@@ -73,6 +75,12 @@
nn_sockbase_term (&self->sockbase);
}
+static void nn_xreq_close (struct nn_sockbase *self)
+{
+ /* Nothing special to do done. The object is closed straight away. */
+ nn_sockbase_closed (self);
+}
+
static void nn_xreq_destroy (struct nn_sockbase *self)
{
struct nn_xreq *xreq;
diff --git a/src/protocols/survey/respondent.c b/src/protocols/survey/respondent.c
index ab0197d..b6ab8d1 100644
--- a/src/protocols/survey/respondent.c
+++ b/src/protocols/survey/respondent.c
@@ -50,11 +50,13 @@
static void nn_respondent_term (struct nn_respondent *self);
/* Implementation of nn_sockbase's virtual functions. */
+static void nn_respondent_close (struct nn_sockbase *self);
static void nn_respondent_destroy (struct nn_sockbase *self);
static int nn_respondent_events (struct nn_sockbase *self);
static int nn_respondent_send (struct nn_sockbase *self, struct nn_msg *msg);
static int nn_respondent_recv (struct nn_sockbase *self, struct nn_msg *msg);
static const struct nn_sockbase_vfptr nn_respondent_sockbase_vfptr = {
+ nn_respondent_close,
nn_respondent_destroy,
nn_xrespondent_add,
nn_xrespondent_rm,
@@ -85,6 +87,12 @@
nn_xrespondent_term (&self->xrespondent);
}
+void nn_respondent_close (struct nn_sockbase *self)
+{
+ /* Nothing special to do done. The object is closed straight away. */
+ nn_sockbase_closed (self);
+}
+
void nn_respondent_destroy (struct nn_sockbase *self)
{
struct nn_respondent *respondent;
diff --git a/src/protocols/survey/surveyor.c b/src/protocols/survey/surveyor.c
index 72d4040..460c3b0 100644
--- a/src/protocols/survey/surveyor.c
+++ b/src/protocols/survey/surveyor.c
@@ -56,6 +56,7 @@
static void nn_surveyor_term (struct nn_surveyor *self);
/* Implementation of nn_sockbase's virtual functions. */
+static void nn_surveyor_close (struct nn_sockbase *self);
static void nn_surveyor_destroy (struct nn_sockbase *self);
static int nn_surveyor_events (struct nn_sockbase *self);
static int nn_surveyor_send (struct nn_sockbase *self, struct nn_msg *msg);
@@ -65,8 +66,7 @@
static int nn_surveyor_getopt (struct nn_sockbase *self, int level, int option,
void *optval, size_t *optvallen);
static const struct nn_sockbase_vfptr nn_surveyor_sockbase_vfptr = {
- 0,
- nn_xsurveyor_ispeer,
+ nn_surveyor_close,
nn_surveyor_destroy,
nn_xsurveyor_add,
nn_xsurveyor_rm,
@@ -123,6 +123,11 @@
void nn_surveyor_destroy (struct nn_sockbase *self)
{
+ nn_assert (0);
+}
+
+void nn_surveyor_destroy (struct nn_sockbase *self)
+{
struct nn_surveyor *surveyor;
surveyor = nn_cont (self, struct nn_surveyor, xsurveyor.sockbase);
diff --git a/src/protocols/survey/xrespondent.c b/src/protocols/survey/xrespondent.c
index 6fc3ebd..d1568b1 100644
--- a/src/protocols/survey/xrespondent.c
+++ b/src/protocols/survey/xrespondent.c
@@ -32,10 +32,12 @@
#include "../../utils/list.h"
/* Private functions. */
+static void nn_xrespondent_close (struct nn_sockbase *self);
static void nn_xrespondent_destroy (struct nn_sockbase *self);
/* Implementation of nn_sockbase's virtual functions. */
static const struct nn_sockbase_vfptr nn_xrespondent_sockbase_vfptr = {
+ nn_xrespondent_close,
nn_xrespondent_destroy,
nn_xrespondent_add,
nn_xrespondent_rm,
@@ -68,6 +70,12 @@
nn_sockbase_term (&self->sockbase);
}
+static void nn_xrespondent_close (struct nn_sockbase *self)
+{
+ /* Nothing special to do done. The object is closed straight away. */
+ nn_sockbase_closed (self);
+}
+
static void nn_xrespondent_destroy (struct nn_sockbase *self)
{
struct nn_xrespondent *xrespondent;
diff --git a/src/protocols/survey/xsurveyor.c b/src/protocols/survey/xsurveyor.c
index 4999a9a..e333c71 100644
--- a/src/protocols/survey/xsurveyor.c
+++ b/src/protocols/survey/xsurveyor.c
@@ -35,10 +35,12 @@
#include <stddef.h>
/* Private functions. */
+static void nn_xsurveyor_close (struct nn_sockbase *self);
static void nn_xsurveyor_destroy (struct nn_sockbase *self);
/* Implementation of nn_sockbase's virtual functions. */
static const struct nn_sockbase_vfptr nn_xsurveyor_sockbase_vfptr = {
+ nn_xsurveyor_close,
nn_xsurveyor_destroy,
nn_xsurveyor_add,
nn_xsurveyor_rm,
@@ -73,6 +75,12 @@
nn_sockbase_term (&self->sockbase);
}
+static void nn_xsurveyor_close (struct nn_sockbase *self)
+{
+ /* Nothing special to do done. The object is closed straight away. */
+ nn_sockbase_closed (self);
+}
+
static void nn_xsurveyor_destroy (struct nn_sockbase *self)
{
struct nn_xsurveyor *xsurveyor;
diff --git a/src/transports/inproc/inprocc.c b/src/transports/inproc/inprocc.c
index 2acb71c..c6b4ad7 100644
--- a/src/transports/inproc/inprocc.c
+++ b/src/transports/inproc/inprocc.c
@@ -65,9 +65,9 @@
return socktype;
}
-int nn_inprocc_ispeer (struct nn_inprocc *self, int socktype)
+int nn_inprocc_ispeer (struct nn_inprocc *self, int protocol)
{
- return nn_epbase_ispeer (&self->epbase, socktype);
+ return nn_epbase_ispeer (&self->epbase, protocol);
}
void nn_inprocc_add_pipe (struct nn_inprocc *self, struct nn_msgpipe *pipe)
diff --git a/src/transports/utils/cstream.c b/src/transports/utils/cstream.c
index 7ce18ba..6c04c5c 100644
--- a/src/transports/utils/cstream.c
+++ b/src/transports/utils/cstream.c
@@ -94,8 +94,9 @@
{
nn_assert (self->state == NN_CSTREAM_STATE_CLOSED);
- /* At this point we assume that stream, usock and timer are already
- closed. */
+ nn_timer_term (&self->retry_timer);
+ nn_stream_term (&self->stream);
+ nn_usock_term (&self->usock);
nn_epbase_term (&self->epbase);
}
@@ -286,16 +287,9 @@
if (source == &cstream->stream) {
switch (type) {
case NN_STREAM_CLOSED:
-
- /* Stream state machine is closed. We can deallocate it now. */
- nn_stream_term (&cstream->stream);
-
- /* Start closing the underlying socket itself. */
nn_usock_close (&cstream->usock);
cstream->state = NN_CSTREAM_STATE_CLOSING_USOCK;
-
return;
-
default:
nn_assert (0);
}
@@ -309,7 +303,6 @@
if (source == &cstream->usock) {
switch (type) {
case NN_USOCK_CLOSED:
- nn_usock_term (&cstream->usock);
cstream->state = NN_CSTREAM_STATE_CLOSED;
nn_epbase_closed (&cstream->epbase);
return;
diff --git a/src/transports/utils/stream.c b/src/transports/utils/stream.c
index dc4ecdb..0ecb70c 100644
--- a/src/transports/utils/stream.c
+++ b/src/transports/utils/stream.c
@@ -31,11 +31,11 @@
#include <stdint.h>
/* Possible states of object. */
-#define NN_STREAM_STATE_INIT 0
-#define NN_STREAM_STATE_SENDING_PROTOHDR 1
-#define NN_STREAM_STATE_RECEIVING_PROTOHDR 2
-#define NN_STREAM_STATE_DISABLING_TIMER 3
-#define NN_STREAM_STATE_ACTIVE 4
+#define NN_STREAM_STATE_INIT 1
+#define NN_STREAM_STATE_SENDING_PROTOHDR 2
+#define NN_STREAM_STATE_RECEIVING_PROTOHDR 3
+#define NN_STREAM_STATE_DISABLING_TIMER 4
+#define NN_STREAM_STATE_ACTIVE 5
#define NN_STREAM_STATE_CLOSING_TIMER 6
#define NN_STREAM_STATE_CLOSED 7