"close" step added to the shutdown of individual socket types

Signed-off-by: Martin Sustrik <sustrik@250bpm.com>
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 1d11031..01bb9a9 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -1,5 +1,5 @@
 #
-#   Copyright (c) 2012 250bpm s.r.o.
+#   Copyright (c) 2012-2013 250bpm s.r.o.
 #
 #   Permission is hereby granted, free of charge, to any person obtaining a copy
 #   of this software and associated documentation files (the "Software"),
diff --git a/src/protocol.h b/src/protocol.h
index 382e701..83877aa 100644
--- a/src/protocol.h
+++ b/src/protocol.h
@@ -71,7 +71,7 @@
 
 struct nn_sockbase;
 
-/*  Event generated by the sockbase object. Not to be used directly. */
+/*  Event generated by the sockbase object. Do not use it directly! */
 #define NN_SOCKBASE_CLOSED 1
 
 /*  Any combination of these events can be returned from 'events' virtual
@@ -88,9 +88,9 @@
     /*  Deallocate the socket. */
     void (*destroy) (struct nn_sockbase *self);
 
-    /*  Management of pipes. 'add' registers a new pipe. The pipe cannot to
-        send to or received from at the moment. 'rm' unregisters the pipe.
-        The pipe should not be used after this call as it may already be
+    /*  Management of pipes. 'add' registers a new pipe. The pipe cannot be used
+        to send to or to be received from at the moment. 'rm' unregisters the
+        pipe. The pipe should not be used after this call as it may already be
         deallocated. 'in' informs the socket that pipe is readable. 'out'
         informs it that it is writeable. */
     int (*add) (struct nn_sockbase *self, struct nn_pipe *pipe);
@@ -99,7 +99,7 @@
     void (*out) (struct nn_sockbase *self, struct nn_pipe *pipe);
 
     /*  Return any combination of event flags defined above, thus specifying
-        whether the socket should be readable, writeable or none. */
+        whether the socket should be readable, writeable, both or none. */
     int (*events) (struct nn_sockbase *self);
 
     /*  Send a message to the socket. Returns -EAGAIN if it cannot be done at
@@ -125,11 +125,12 @@
     struct nn_fsm_event event_closed;
 };
 
-/*  Initialise the socket. */
+/*  Initialise the socket base class. 'hint' is the opaque value passed to the
+    nn_transport's 'create' function. */
 int nn_sockbase_init (struct nn_sockbase *self,
     const struct nn_sockbase_vfptr *vfptr, void *hint);
 
-/*  Uninitialise the socket. */
+/*  Terminate the socket base class. */
 void nn_sockbase_term (struct nn_sockbase *self);
 
 /*  Call this function when closing is done. */
diff --git a/src/protocols/fanin/xsink.c b/src/protocols/fanin/xsink.c
index ddb2a41..faecbf9 100644
--- a/src/protocols/fanin/xsink.c
+++ b/src/protocols/fanin/xsink.c
@@ -47,6 +47,7 @@
 static void nn_xsink_term (struct nn_xsink *self);
 
 /*  Implementation of nn_sockbase's virtual functions. */
+static void nn_xsink_close (struct nn_sockbase *self);
 static void nn_xsink_destroy (struct nn_sockbase *self);
 static int nn_xsink_add (struct nn_sockbase *self, struct nn_pipe *pipe);
 static void nn_xsink_rm (struct nn_sockbase *self, struct nn_pipe *pipe);
@@ -59,6 +60,7 @@
 static int nn_xsink_getopt (struct nn_sockbase *self, int level, int option,
     void *optval, size_t *optvallen);
 static const struct nn_sockbase_vfptr nn_xsink_sockbase_vfptr = {
+    nn_xsink_close,
     nn_xsink_destroy,
     nn_xsink_add,
     nn_xsink_rm,
@@ -91,6 +93,12 @@
     nn_sockbase_term (&self->sockbase);
 }
 
+void nn_xsink_close (struct nn_sockbase *self)
+{
+    /*  Nothing special to do done. The object is closed straight away. */
+    nn_sockbase_closed (self); 
+}
+
 void nn_xsink_destroy (struct nn_sockbase *self)
 {
     struct nn_xsink *xsink;
diff --git a/src/protocols/fanin/xsource.c b/src/protocols/fanin/xsource.c
index 0127762..5b852bc 100644
--- a/src/protocols/fanin/xsource.c
+++ b/src/protocols/fanin/xsource.c
@@ -43,6 +43,7 @@
 static void nn_xsource_term (struct nn_xsource *self);
 
 /*  Implementation of nn_sockbase's virtual functions. */
+static void nn_xsource_close (struct nn_sockbase *self);
 static void nn_xsource_destroy (struct nn_sockbase *self);
 static int nn_xsource_add (struct nn_sockbase *self, struct nn_pipe *pipe);
 static void nn_xsource_rm (struct nn_sockbase *self, struct nn_pipe *pipe);
@@ -55,6 +56,7 @@
 static int nn_xsource_getopt (struct nn_sockbase *self, int level, int option,
     void *optval, size_t *optvallen);
 static const struct nn_sockbase_vfptr nn_xsource_sockbase_vfptr = {
+    nn_xsource_close,
     nn_xsource_destroy,
     nn_xsource_add,
     nn_xsource_rm,
@@ -87,6 +89,12 @@
     nn_sockbase_term (&self->sockbase);
 }
 
+void nn_xsource_close (struct nn_sockbase *self)
+{
+    /*  Nothing special to do done. The object is closed straight away. */
+    nn_sockbase_closed (self); 
+}
+
 void nn_xsource_destroy (struct nn_sockbase *self)
 {
     struct nn_xsource *xsource;
diff --git a/src/protocols/fanout/xpull.c b/src/protocols/fanout/xpull.c
index 3ebacc4..b421a1e 100644
--- a/src/protocols/fanout/xpull.c
+++ b/src/protocols/fanout/xpull.c
@@ -43,6 +43,7 @@
 static void nn_xpull_term (struct nn_xpull *self);
 
 /*  Implementation of nn_sockbase's virtual functions. */
+static void nn_xpull_close (struct nn_sockbase *self);
 static void nn_xpull_destroy (struct nn_sockbase *self);
 static int nn_xpull_add (struct nn_sockbase *self, struct nn_pipe *pipe);
 static void nn_xpull_rm (struct nn_sockbase *self, struct nn_pipe *pipe);
@@ -55,6 +56,7 @@
 static int nn_xpull_getopt (struct nn_sockbase *self, int level, int option,
     void *optval, size_t *optvallen);
 static const struct nn_sockbase_vfptr nn_xpull_sockbase_vfptr = {
+    nn_xpull_close,
     nn_xpull_destroy,
     nn_xpull_add,
     nn_xpull_rm,
@@ -87,6 +89,12 @@
     nn_sockbase_term (&self->sockbase);
 }
 
+void nn_xpull_close (struct nn_sockbase *self)
+{
+    /*  Nothing special to do done. The object is closed straight away. */
+    nn_sockbase_closed (self);
+}
+
 void nn_xpull_destroy (struct nn_sockbase *self)
 {
     struct nn_xpull *xpull;
diff --git a/src/protocols/fanout/xpush.c b/src/protocols/fanout/xpush.c
index a05d05a..79d5f4a 100644
--- a/src/protocols/fanout/xpush.c
+++ b/src/protocols/fanout/xpush.c
@@ -47,6 +47,7 @@
 static void nn_xpush_term (struct nn_xpush *self);
 
 /*  Implementation of nn_sockbase's virtual functions. */
+static void nn_xpush_close (struct nn_sockbase *self);
 static void nn_xpush_destroy (struct nn_sockbase *self);
 static int nn_xpush_add (struct nn_sockbase *self, struct nn_pipe *pipe);
 static void nn_xpush_rm (struct nn_sockbase *self, struct nn_pipe *pipe);
@@ -59,6 +60,7 @@
 static int nn_xpush_getopt (struct nn_sockbase *self, int level, int option,
     void *optval, size_t *optvallen);
 static const struct nn_sockbase_vfptr nn_xpush_sockbase_vfptr = {
+    nn_xpush_close,
     nn_xpush_destroy,
     nn_xpush_add,
     nn_xpush_rm,
@@ -91,6 +93,12 @@
     nn_sockbase_term (&self->sockbase);
 }
 
+void nn_xpush_close (struct nn_sockbase *self)
+{
+    /*  Nothing special to do done. The object is closed straight away. */
+    nn_sockbase_closed (self);
+}
+
 void nn_xpush_destroy (struct nn_sockbase *self)
 {
     struct nn_xpush *xpush;
diff --git a/src/protocols/pubsub/pub.c b/src/protocols/pubsub/pub.c
index 4c56bd1..42d7650 100644
--- a/src/protocols/pubsub/pub.c
+++ b/src/protocols/pubsub/pub.c
@@ -53,6 +53,7 @@
 static void nn_pub_term (struct nn_pub *self);
 
 /*  Implementation of nn_sockbase's virtual functions. */
+static void nn_pub_close (struct nn_sockbase *self);
 static void nn_pub_destroy (struct nn_sockbase *self);
 static int nn_pub_add (struct nn_sockbase *self, struct nn_pipe *pipe);
 static void nn_pub_rm (struct nn_sockbase *self, struct nn_pipe *pipe);
@@ -65,6 +66,7 @@
 static int nn_pub_getopt (struct nn_sockbase *self, int level, int option,
     void *optval, size_t *optvallen);
 static const struct nn_sockbase_vfptr nn_pub_sockbase_vfptr = {
+    nn_pub_close,
     nn_pub_destroy,
     nn_pub_add,
     nn_pub_rm,
@@ -97,6 +99,12 @@
     nn_sockbase_term (&self->sockbase);
 }
 
+void nn_pub_close (struct nn_sockbase *self)
+{
+    /*  Nothing special to do done. The object is closed straight away. */
+    nn_sockbase_closed (self);
+}
+
 void nn_pub_destroy (struct nn_sockbase *self)
 {
     struct nn_pub *pub;
diff --git a/src/protocols/pubsub/sub.c b/src/protocols/pubsub/sub.c
index 09db755..cd2edbe 100644
--- a/src/protocols/pubsub/sub.c
+++ b/src/protocols/pubsub/sub.c
@@ -45,6 +45,7 @@
 static void nn_sub_term (struct nn_sub *self);
 
 /*  Implementation of nn_sockbase's virtual functions. */
+static void nn_sub_close (struct nn_sockbase *self);
 static void nn_sub_destroy (struct nn_sockbase *self);
 static int nn_sub_add (struct nn_sockbase *self, struct nn_pipe *pipe);
 static void nn_sub_rm (struct nn_sockbase *self, struct nn_pipe *pipe);
@@ -57,6 +58,7 @@
 static int nn_sub_getopt (struct nn_sockbase *self, int level, int option,
     void *optval, size_t *optvallen);
 static const struct nn_sockbase_vfptr nn_sub_sockbase_vfptr = {
+    nn_sub_close,
     nn_sub_destroy,
     nn_sub_add,
     nn_sub_rm,
@@ -91,6 +93,12 @@
     nn_sockbase_term (&self->sockbase);
 }
 
+void nn_sub_close (struct nn_sockbase *self)
+{
+    /*  Nothing special to do done. The object is closed straight away. */
+    nn_sockbase_closed (self);
+}
+
 void nn_sub_destroy (struct nn_sockbase *self)
 {
     struct nn_sub *sub;
diff --git a/src/protocols/reqrep/rep.c b/src/protocols/reqrep/rep.c
index d4d1883..6318754 100644
--- a/src/protocols/reqrep/rep.c
+++ b/src/protocols/reqrep/rep.c
@@ -51,12 +51,14 @@
 static void nn_rep_term (struct nn_rep *self);
 
 /*  Implementation of nn_sockbase's virtual functions. */
+static void nn_rep_close (struct nn_sockbase *self);
 static void nn_rep_destroy (struct nn_sockbase *self);
 static int nn_rep_events (struct nn_sockbase *self);
 static int nn_rep_send (struct nn_sockbase *self, struct nn_msg *msg);
 static int nn_rep_recv (struct nn_sockbase *self, struct nn_msg *msg);
 
 static const struct nn_sockbase_vfptr nn_rep_sockbase_vfptr = {
+    nn_rep_close,
     nn_rep_destroy,
     nn_xrep_add,
     nn_xrep_rm,
@@ -89,6 +91,12 @@
     nn_xrep_term (&self->xrep);
 }
 
+static void nn_rep_close (struct nn_sockbase *self)
+{
+    /*  Nothing special to do done. The object is closed straight away. */
+    nn_sockbase_closed (self);
+}
+
 static void nn_rep_destroy (struct nn_sockbase *self)
 {
     struct nn_rep *rep;
diff --git a/src/protocols/reqrep/req.c b/src/protocols/reqrep/req.c
index 062d24e..b75612d 100644
--- a/src/protocols/reqrep/req.c
+++ b/src/protocols/reqrep/req.c
@@ -88,6 +88,7 @@
 static void nn_req_term (struct nn_req *self);
 
 /*  Implementation of nn_sockbase's virtual functions. */
+static void nn_req_close (struct nn_sockbase *self);
 static void nn_req_destroy (struct nn_sockbase *self);
 static void nn_req_in (struct nn_sockbase *self, struct nn_pipe *pipe);
 static void nn_req_out (struct nn_sockbase *self, struct nn_pipe *pipe);
@@ -99,6 +100,7 @@
 static int nn_req_getopt (struct nn_sockbase *self, int level, int option,
     void *optval, size_t *optvallen);
 static const struct nn_sockbase_vfptr nn_req_sockbase_vfptr = {
+    nn_req_close,
     nn_req_destroy,
     nn_xreq_add,
     nn_xreq_rm,
@@ -142,6 +144,11 @@
     nn_xreq_term (&self->xreq);
 }
 
+static void nn_req_close (struct nn_sockbase *self)
+{
+    nn_assert (0);
+}
+
 static void nn_req_destroy (struct nn_sockbase *self)
 {
     struct nn_req *req;
diff --git a/src/protocols/reqrep/xrep.c b/src/protocols/reqrep/xrep.c
index 111060c..eda85ce 100644
--- a/src/protocols/reqrep/xrep.c
+++ b/src/protocols/reqrep/xrep.c
@@ -36,9 +36,11 @@
 #include <string.h>
 
 /*  Private functions. */
+static void nn_xrep_close (struct nn_sockbase *self);
 static void nn_xrep_destroy (struct nn_sockbase *self);
 
 static const struct nn_sockbase_vfptr nn_xrep_sockbase_vfptr = {
+    nn_xrep_close,
     nn_xrep_destroy,
     nn_xrep_add,
     nn_xrep_rm,
@@ -77,6 +79,12 @@
     nn_sockbase_term (&self->sockbase);
 }
 
+static void nn_xrep_close (struct nn_sockbase *self)
+{
+    /*  Nothing special to do done. The object is closed straight away. */
+    nn_sockbase_closed (self);
+}
+
 static void nn_xrep_destroy (struct nn_sockbase *self)
 {
     struct nn_xrep *xrep;
diff --git a/src/protocols/reqrep/xreq.c b/src/protocols/reqrep/xreq.c
index fe2f17a..384b522 100644
--- a/src/protocols/reqrep/xreq.c
+++ b/src/protocols/reqrep/xreq.c
@@ -37,9 +37,11 @@
 };
 
 /*  Private functions. */
+static void nn_xreq_close (struct nn_sockbase *self);
 static void nn_xreq_destroy (struct nn_sockbase *self);
 
 static const struct nn_sockbase_vfptr nn_xreq_sockbase_vfptr = {
+    nn_xreq_close,
     nn_xreq_destroy,
     nn_xreq_add,
     nn_xreq_rm,
@@ -73,6 +75,12 @@
     nn_sockbase_term (&self->sockbase);
 }
 
+static void nn_xreq_close (struct nn_sockbase *self)
+{
+    /*  Nothing special to do done. The object is closed straight away. */
+    nn_sockbase_closed (self);
+}
+
 static void nn_xreq_destroy (struct nn_sockbase *self)
 {
     struct nn_xreq *xreq;
diff --git a/src/protocols/survey/respondent.c b/src/protocols/survey/respondent.c
index ab0197d..b6ab8d1 100644
--- a/src/protocols/survey/respondent.c
+++ b/src/protocols/survey/respondent.c
@@ -50,11 +50,13 @@
 static void nn_respondent_term (struct nn_respondent *self);
 
 /*  Implementation of nn_sockbase's virtual functions. */
+static void nn_respondent_close (struct nn_sockbase *self);
 static void nn_respondent_destroy (struct nn_sockbase *self);
 static int nn_respondent_events (struct nn_sockbase *self);
 static int nn_respondent_send (struct nn_sockbase *self, struct nn_msg *msg);
 static int nn_respondent_recv (struct nn_sockbase *self, struct nn_msg *msg);
 static const struct nn_sockbase_vfptr nn_respondent_sockbase_vfptr = {
+    nn_respondent_close,
     nn_respondent_destroy,
     nn_xrespondent_add,
     nn_xrespondent_rm,
@@ -85,6 +87,12 @@
     nn_xrespondent_term (&self->xrespondent);
 }
 
+void nn_respondent_close (struct nn_sockbase *self)
+{
+    /*  Nothing special to do done. The object is closed straight away. */
+    nn_sockbase_closed (self);
+}
+
 void nn_respondent_destroy (struct nn_sockbase *self)
 {
     struct nn_respondent *respondent;
diff --git a/src/protocols/survey/surveyor.c b/src/protocols/survey/surveyor.c
index 72d4040..460c3b0 100644
--- a/src/protocols/survey/surveyor.c
+++ b/src/protocols/survey/surveyor.c
@@ -56,6 +56,7 @@
 static void nn_surveyor_term (struct nn_surveyor *self);
 
 /*  Implementation of nn_sockbase's virtual functions. */
+static void nn_surveyor_close (struct nn_sockbase *self);
 static void nn_surveyor_destroy (struct nn_sockbase *self);
 static int nn_surveyor_events (struct nn_sockbase *self);
 static int nn_surveyor_send (struct nn_sockbase *self, struct nn_msg *msg);
@@ -65,8 +66,7 @@
 static int nn_surveyor_getopt (struct nn_sockbase *self, int level, int option,
     void *optval, size_t *optvallen);
 static const struct nn_sockbase_vfptr nn_surveyor_sockbase_vfptr = {
-    0,
-    nn_xsurveyor_ispeer,
+    nn_surveyor_close,
     nn_surveyor_destroy,
     nn_xsurveyor_add,
     nn_xsurveyor_rm,
@@ -123,6 +123,11 @@
 
 void nn_surveyor_destroy (struct nn_sockbase *self)
 {
+    nn_assert (0);
+}
+
+void nn_surveyor_destroy (struct nn_sockbase *self)
+{
     struct nn_surveyor *surveyor;
 
     surveyor = nn_cont (self, struct nn_surveyor, xsurveyor.sockbase);
diff --git a/src/protocols/survey/xrespondent.c b/src/protocols/survey/xrespondent.c
index 6fc3ebd..d1568b1 100644
--- a/src/protocols/survey/xrespondent.c
+++ b/src/protocols/survey/xrespondent.c
@@ -32,10 +32,12 @@
 #include "../../utils/list.h"
 
 /*  Private functions. */
+static void nn_xrespondent_close (struct nn_sockbase *self);
 static void nn_xrespondent_destroy (struct nn_sockbase *self);
 
 /*  Implementation of nn_sockbase's virtual functions. */
 static const struct nn_sockbase_vfptr nn_xrespondent_sockbase_vfptr = {
+    nn_xrespondent_close,
     nn_xrespondent_destroy,
     nn_xrespondent_add,
     nn_xrespondent_rm,
@@ -68,6 +70,12 @@
     nn_sockbase_term (&self->sockbase);
 }
 
+static void nn_xrespondent_close (struct nn_sockbase *self)
+{
+    /*  Nothing special to do done. The object is closed straight away. */
+    nn_sockbase_closed (self);
+}
+
 static void nn_xrespondent_destroy (struct nn_sockbase *self)
 {
     struct nn_xrespondent *xrespondent;
diff --git a/src/protocols/survey/xsurveyor.c b/src/protocols/survey/xsurveyor.c
index 4999a9a..e333c71 100644
--- a/src/protocols/survey/xsurveyor.c
+++ b/src/protocols/survey/xsurveyor.c
@@ -35,10 +35,12 @@
 #include <stddef.h>
 
 /*  Private functions. */
+static void nn_xsurveyor_close (struct nn_sockbase *self);
 static void nn_xsurveyor_destroy (struct nn_sockbase *self);
 
 /*  Implementation of nn_sockbase's virtual functions. */
 static const struct nn_sockbase_vfptr nn_xsurveyor_sockbase_vfptr = {
+    nn_xsurveyor_close,
     nn_xsurveyor_destroy,
     nn_xsurveyor_add,
     nn_xsurveyor_rm,
@@ -73,6 +75,12 @@
     nn_sockbase_term (&self->sockbase);
 }
 
+static void nn_xsurveyor_close (struct nn_sockbase *self)
+{
+    /*  Nothing special to do done. The object is closed straight away. */
+    nn_sockbase_closed (self);
+}
+
 static void nn_xsurveyor_destroy (struct nn_sockbase *self)
 {
     struct nn_xsurveyor *xsurveyor;
diff --git a/src/transports/inproc/inprocc.c b/src/transports/inproc/inprocc.c
index 2acb71c..c6b4ad7 100644
--- a/src/transports/inproc/inprocc.c
+++ b/src/transports/inproc/inprocc.c
@@ -65,9 +65,9 @@
     return socktype;
 }
 
-int nn_inprocc_ispeer (struct nn_inprocc *self, int socktype)
+int nn_inprocc_ispeer (struct nn_inprocc *self, int protocol)
 {
-    return nn_epbase_ispeer (&self->epbase, socktype);
+    return nn_epbase_ispeer (&self->epbase, protocol);
 }
 
 void nn_inprocc_add_pipe (struct nn_inprocc *self, struct nn_msgpipe *pipe)
diff --git a/src/transports/utils/cstream.c b/src/transports/utils/cstream.c
index 7ce18ba..6c04c5c 100644
--- a/src/transports/utils/cstream.c
+++ b/src/transports/utils/cstream.c
@@ -94,8 +94,9 @@
 {
     nn_assert (self->state == NN_CSTREAM_STATE_CLOSED);
 
-    /*  At this point we assume that stream, usock and timer are already
-        closed. */
+    nn_timer_term (&self->retry_timer);
+    nn_stream_term (&self->stream);
+    nn_usock_term (&self->usock);
     nn_epbase_term (&self->epbase);
 }
 
@@ -286,16 +287,9 @@
         if (source == &cstream->stream) {
             switch (type) {
             case NN_STREAM_CLOSED:
-
-                /*  Stream state machine is closed. We can deallocate it now. */
-                nn_stream_term (&cstream->stream);
-
-                /*  Start closing the underlying socket itself. */
                 nn_usock_close (&cstream->usock);
                 cstream->state = NN_CSTREAM_STATE_CLOSING_USOCK;
-
                 return;
-
             default:
                 nn_assert (0);
             }
@@ -309,7 +303,6 @@
         if (source == &cstream->usock) {
             switch (type) {
             case NN_USOCK_CLOSED:
-                nn_usock_term (&cstream->usock);
                 cstream->state = NN_CSTREAM_STATE_CLOSED;
                 nn_epbase_closed (&cstream->epbase);
                 return;
diff --git a/src/transports/utils/stream.c b/src/transports/utils/stream.c
index dc4ecdb..0ecb70c 100644
--- a/src/transports/utils/stream.c
+++ b/src/transports/utils/stream.c
@@ -31,11 +31,11 @@
 #include <stdint.h>
 
 /*  Possible states of object. */
-#define NN_STREAM_STATE_INIT 0
-#define NN_STREAM_STATE_SENDING_PROTOHDR 1
-#define NN_STREAM_STATE_RECEIVING_PROTOHDR 2
-#define NN_STREAM_STATE_DISABLING_TIMER 3
-#define NN_STREAM_STATE_ACTIVE 4
+#define NN_STREAM_STATE_INIT 1
+#define NN_STREAM_STATE_SENDING_PROTOHDR 2
+#define NN_STREAM_STATE_RECEIVING_PROTOHDR 3
+#define NN_STREAM_STATE_DISABLING_TIMER 4
+#define NN_STREAM_STATE_ACTIVE 5
 #define NN_STREAM_STATE_CLOSING_TIMER 6
 #define NN_STREAM_STATE_CLOSED 7