bind and connect functions moved to nn_ins
Signed-off-by: Martin Sustrik <sustrik@250bpm.com>
diff --git a/src/transports/inproc/binproc.c b/src/transports/inproc/binproc.c
index 2b4c97b..3fe6e70 100644
--- a/src/transports/inproc/binproc.c
+++ b/src/transports/inproc/binproc.c
@@ -23,7 +23,7 @@
#include "binproc.h"
#include "sinproc.h"
#include "cinproc.h"
-#include "inproc.h"
+#include "ins.h"
#include "../../utils/err.h"
#include "../../utils/cont.h"
@@ -127,7 +127,7 @@
/* First, unregister the endpoint from the global repository of inproc
endpoints. This way, new connections cannot be created anymore. */
- nn_inproc_unbind (binproc);
+ nn_ins_unbind (&binproc->item);
/* Stop the existing connections. */
for (it = nn_list_begin (&binproc->sinprocs);
diff --git a/src/transports/inproc/cinproc.c b/src/transports/inproc/cinproc.c
index bd5da13..c112b1b 100644
--- a/src/transports/inproc/cinproc.c
+++ b/src/transports/inproc/cinproc.c
@@ -22,7 +22,7 @@
#include "cinproc.h"
#include "binproc.h"
-#include "inproc.h"
+#include "ins.h"
#include "../../utils/err.h"
#include "../../utils/cont.h"
@@ -121,7 +121,7 @@
/* First, unregister the endpoint from the global repository of inproc
endpoints. This way, new connections cannot be created anymore. */
- nn_inproc_disconnect (cinproc);
+ nn_ins_disconnect (&cinproc->item);
/* Stop the existing connection. */
nn_sinproc_stop (&cinproc->sinproc);
diff --git a/src/transports/inproc/inproc.c b/src/transports/inproc/inproc.c
index 727c340..86104dd 100644
--- a/src/transports/inproc/inproc.c
+++ b/src/transports/inproc/inproc.c
@@ -21,18 +21,12 @@
*/
#include "inproc.h"
+#include "ins.h"
#include "binproc.h"
#include "cinproc.h"
#include "../../inproc.h"
-#include "../../utils/mutex.h"
-#include "../../utils/alloc.h"
-#include "../../utils/list.h"
-#include "../../utils/cont.h"
-#include "../../utils/fast.h"
-#include "../../utils/err.h"
-
#include <string.h>
/* nn_transport interface. */
@@ -56,141 +50,25 @@
struct nn_transport *nn_inproc = &nn_inproc_vfptr;
-struct nn_inproc {
-
- /* Synchronises access to this object. Note that 'connected' members in
- sinproc object are synchronised by this mutex as well. */
- struct nn_mutex sync;
-
- /* List of all bound inproc endpoints. */
- /* TODO: O(n) lookup, shouldn't we do better? Hash? */
- struct nn_list bound;
-
- /* List of all connected inproc endpoints. */
- /* TODO: O(n) lookup, shouldn't we do better? Hash? */
- struct nn_list connected;
-};
-
-/* Global instance of the nn_inproc object. It contains the lists of all
- inproc endpoints in the current process. */
-static struct nn_inproc self;
-
static void nn_inproc_init (void)
{
- nn_mutex_init (&self.sync);
- nn_list_init (&self.bound);
- nn_list_init (&self.connected);
+ nn_ins_init ();
}
+
static void nn_inproc_term (void)
{
- nn_list_term (&self.connected);
- nn_list_term (&self.bound);
- nn_mutex_term (&self.sync);
+ nn_ins_term ();
}
static int nn_inproc_bind (const char *addr, void *hint,
struct nn_epbase **epbase)
{
- struct nn_list_item *it;
- struct nn_binproc *binproc;
- struct nn_cinproc *cinproc;
-
- nn_mutex_lock (&self.sync);
-
- /* Check whether the endpoint isn't already bound. */
- /* TODO: This is an O(n) algorithm! */
- for (it = nn_list_begin (&self.bound); it != nn_list_end (&self.bound);
- it = nn_list_next (&self.bound, it)) {
- binproc = nn_cont (it, struct nn_binproc, item.item);
- if (strncmp (addr, nn_binproc_getaddr (binproc),
- NN_SOCKADDR_MAX) == 0) {
- nn_mutex_unlock (&self.sync);
- return -EADDRINUSE;
- }
- }
-
- /* Insert the entry into the endpoint repository. */
- binproc = nn_binproc_create (hint);
- nn_list_insert (&self.bound, &binproc->item.item,
- nn_list_end (&self.bound));
-
- /* During this process new pipes may be created. */
- for (it = nn_list_begin (&self.connected);
- it != nn_list_end (&self.connected);
- it = nn_list_next (&self.connected, it)) {
- cinproc = nn_cont (it, struct nn_cinproc, item.item);
- if (strncmp (addr, nn_cinproc_getaddr (cinproc),
- NN_SOCKADDR_MAX) == 0) {
-
- /* Check whether the two sockets are compatible. */
- if (!nn_epbase_ispeer (&binproc->item.epbase,
- cinproc->item.protocol))
- continue;
-
- nn_assert (cinproc->item.connects == 0);
- cinproc->item.connects = 1;
- nn_binproc_connect (binproc, cinproc);
- }
- }
-
- nn_assert (epbase);
- *epbase = &binproc->item.epbase;
- nn_mutex_unlock (&self.sync);
-
- return 0;
+ return nn_ins_bind (addr, hint, epbase);
}
static int nn_inproc_connect (const char *addr, void *hint,
struct nn_epbase **epbase)
{
- struct nn_list_item *it;
- struct nn_cinproc *cinproc;
- struct nn_binproc *binproc;
-
- nn_mutex_lock (&self.sync);
-
- /* Insert the entry into the endpoint repository. */
- cinproc = nn_cinproc_create (hint);
- nn_list_insert (&self.connected, &cinproc->item.item,
- nn_list_end (&self.connected));
-
- /* During this process a pipe may be created. */
- for (it = nn_list_begin (&self.bound);
- it != nn_list_end (&self.bound);
- it = nn_list_next (&self.bound, it)) {
- binproc = nn_cont (it, struct nn_binproc, item.item);
- if (strncmp (addr, nn_binproc_getaddr (binproc),
- NN_SOCKADDR_MAX) == 0) {
-
- /* Check whether the two sockets are compatible. */
- if (!nn_epbase_ispeer (&cinproc->item.epbase,
- binproc->item.protocol))
- break;
-
- ++binproc->item.connects;
- nn_cinproc_connect (cinproc, binproc);
- break;
- }
- }
-
- nn_assert (epbase);
- *epbase = &cinproc->item.epbase;
- nn_mutex_unlock (&self.sync);
-
- return 0;
-}
-
-void nn_inproc_disconnect (struct nn_cinproc *cinproc)
-{
- nn_mutex_lock (&self.sync);
- nn_list_erase (&self.connected, &cinproc->item.item);
- nn_mutex_unlock (&self.sync);
-}
-
-void nn_inproc_unbind (struct nn_binproc *binproc)
-{
- nn_mutex_lock (&self.sync);
- nn_list_erase (&self.bound, &binproc->item.item);
- nn_mutex_unlock (&self.sync);
+ return nn_ins_connect (addr, hint, epbase);
}
diff --git a/src/transports/inproc/inproc.h b/src/transports/inproc/inproc.h
index 7197c40..c43b691 100644
--- a/src/transports/inproc/inproc.h
+++ b/src/transports/inproc/inproc.h
@@ -25,12 +25,6 @@
#include "../../transport.h"
-struct nn_binproc;
-struct nn_cinproc;
-
extern struct nn_transport *nn_inproc;
-void nn_inproc_disconnect (struct nn_cinproc *cinproc);
-void nn_inproc_unbind (struct nn_binproc *binproc);
-
#endif
diff --git a/src/transports/inproc/ins.c b/src/transports/inproc/ins.c
index d0d4f9b..bf3f511 100644
--- a/src/transports/inproc/ins.c
+++ b/src/transports/inproc/ins.c
@@ -22,8 +22,34 @@
#include "ins.h"
+#include "binproc.h"
+#include "cinproc.h"
+
+#include "../../utils/mutex.h"
+#include "../../utils/alloc.h"
+#include "../../utils/list.h"
+#include "../../utils/cont.h"
+#include "../../utils/fast.h"
#include "../../utils/err.h"
+struct nn_ins {
+
+ /* Synchronises access to this object. */
+ struct nn_mutex sync;
+
+ /* List of all bound inproc endpoints. */
+ /* TODO: O(n) lookup, shouldn't we do better? Hash? */
+ struct nn_list bound;
+
+ /* List of all connected inproc endpoints. */
+ /* TODO: O(n) lookup, shouldn't we do better? Hash? */
+ struct nn_list connected;
+};
+
+/* Global instance of the nn_ins object. It contains the lists of all
+ inproc endpoints in the current process. */
+static struct nn_ins self;
+
void nn_ins_item_init (struct nn_ins_item *self,
const struct nn_epbase_vfptr *vfptr, void *hint)
{
@@ -45,3 +71,120 @@
nn_epbase_term (&self->epbase);
}
+void nn_ins_init (void)
+{
+ nn_mutex_init (&self.sync);
+ nn_list_init (&self.bound);
+ nn_list_init (&self.connected);
+}
+void nn_ins_term (void)
+{
+ nn_list_term (&self.connected);
+ nn_list_term (&self.bound);
+ nn_mutex_term (&self.sync);
+}
+
+int nn_ins_bind (const char *addr, void *hint, struct nn_epbase **epbase)
+{
+ struct nn_list_item *it;
+ struct nn_binproc *binproc;
+ struct nn_cinproc *cinproc;
+
+ nn_mutex_lock (&self.sync);
+
+ /* Check whether the endpoint isn't already bound. */
+ /* TODO: This is an O(n) algorithm! */
+ for (it = nn_list_begin (&self.bound); it != nn_list_end (&self.bound);
+ it = nn_list_next (&self.bound, it)) {
+ binproc = nn_cont (it, struct nn_binproc, item.item);
+ if (strncmp (addr, nn_binproc_getaddr (binproc),
+ NN_SOCKADDR_MAX) == 0) {
+ nn_mutex_unlock (&self.sync);
+ return -EADDRINUSE;
+ }
+ }
+
+ /* Insert the entry into the endpoint repository. */
+ binproc = nn_binproc_create (hint);
+ nn_list_insert (&self.bound, &binproc->item.item,
+ nn_list_end (&self.bound));
+
+ /* During this process new pipes may be created. */
+ for (it = nn_list_begin (&self.connected);
+ it != nn_list_end (&self.connected);
+ it = nn_list_next (&self.connected, it)) {
+ cinproc = nn_cont (it, struct nn_cinproc, item.item);
+ if (strncmp (addr, nn_cinproc_getaddr (cinproc),
+ NN_SOCKADDR_MAX) == 0) {
+
+ /* Check whether the two sockets are compatible. */
+ if (!nn_epbase_ispeer (&binproc->item.epbase,
+ cinproc->item.protocol))
+ continue;
+
+ nn_assert (cinproc->item.connects == 0);
+ cinproc->item.connects = 1;
+ nn_binproc_connect (binproc, cinproc);
+ }
+ }
+
+ nn_assert (epbase);
+ *epbase = &binproc->item.epbase;
+ nn_mutex_unlock (&self.sync);
+
+ return 0;
+}
+
+int nn_ins_connect (const char *addr, void *hint, struct nn_epbase **epbase)
+{
+ struct nn_list_item *it;
+ struct nn_cinproc *cinproc;
+ struct nn_binproc *binproc;
+
+ nn_mutex_lock (&self.sync);
+
+ /* Insert the entry into the endpoint repository. */
+ cinproc = nn_cinproc_create (hint);
+ nn_list_insert (&self.connected, &cinproc->item.item,
+ nn_list_end (&self.connected));
+
+ /* During this process a pipe may be created. */
+ for (it = nn_list_begin (&self.bound);
+ it != nn_list_end (&self.bound);
+ it = nn_list_next (&self.bound, it)) {
+ binproc = nn_cont (it, struct nn_binproc, item.item);
+ if (strncmp (addr, nn_binproc_getaddr (binproc),
+ NN_SOCKADDR_MAX) == 0) {
+
+ /* Check whether the two sockets are compatible. */
+ if (!nn_epbase_ispeer (&cinproc->item.epbase,
+ binproc->item.protocol))
+ break;
+
+ ++binproc->item.connects;
+ nn_cinproc_connect (cinproc, binproc);
+ break;
+ }
+ }
+
+ nn_assert (epbase);
+ *epbase = &cinproc->item.epbase;
+ nn_mutex_unlock (&self.sync);
+
+ return 0;
+}
+
+void nn_ins_disconnect (struct nn_ins_item *item)
+{
+ nn_mutex_lock (&self.sync);
+ nn_list_erase (&self.connected, &item->item);
+ nn_mutex_unlock (&self.sync);
+}
+
+void nn_ins_unbind (struct nn_ins_item *item)
+{
+ nn_mutex_lock (&self.sync);
+ nn_list_erase (&self.bound, &item->item);
+ nn_mutex_unlock (&self.sync);
+}
+
diff --git a/src/transports/inproc/ins.h b/src/transports/inproc/ins.h
index d0421f2..306b786 100644
--- a/src/transports/inproc/ins.h
+++ b/src/transports/inproc/ins.h
@@ -27,6 +27,8 @@
#include "../../utils/list.h"
+/* Inproc naming system. A global repository of inproc endpoints. */
+
struct nn_ins_item {
/* Every ins_item is an endpoint. */
@@ -48,5 +50,13 @@
const struct nn_epbase_vfptr *vfptr, void *hint);
void nn_ins_item_term (struct nn_ins_item *self);
+void nn_ins_init (void);
+void nn_ins_term (void);
+
+int nn_ins_bind (const char *addr, void *hint, struct nn_epbase **epbase);
+int nn_ins_connect (const char *addr, void *hint, struct nn_epbase **epbase);
+void nn_ins_disconnect (struct nn_ins_item *item);
+void nn_ins_unbind (struct nn_ins_item *item);
+
#endif