SYNC protocol; initial version
Signed-off-by: Martin Sustrik <sustrik@250bpm.com>
diff --git a/.gitignore b/.gitignore
index c443293..f7639c7 100644
--- a/.gitignore
+++ b/.gitignore
@@ -80,3 +80,4 @@
tests/trie
tests/zerocopy
tests/term
+tests/sync
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 10054c2..dc559f4 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -72,6 +72,7 @@
add_libnanomsg_test (pipeline)
add_libnanomsg_test (survey)
add_libnanomsg_test (bus)
+add_libnanomsg_test (sync)
# Feature tests.
add_libnanomsg_test (block)
diff --git a/Makefile.am b/Makefile.am
index a5aa529..f9148fe 100644
--- a/Makefile.am
+++ b/Makefile.am
@@ -40,7 +40,8 @@
src/reqrep.h \
src/pipeline.h \
src/survey.h \
- src/bus.h
+ src/bus.h \
+ src/sync.h
lib_LTLIBRARIES = libnanomsg.la
@@ -206,6 +207,16 @@
src/protocols/survey/xsurveyor.h \
src/protocols/survey/xsurveyor.c
+PROTOCOLS_SYNC = \
+ src/protocols/sync/master.h \
+ src/protocols/sync/master.c \
+ src/protocols/sync/xmaster.h \
+ src/protocols/sync/xmaster.c \
+ src/protocols/sync/mirror.h \
+ src/protocols/sync/mirror.c \
+ src/protocols/sync/xmirror.h \
+ src/protocols/sync/xmirror.c
+
NANOMSG_PROTOCOLS = \
$(PROTOCOLS_BUS) \
$(PROTOCOLS_PIPELINE) \
@@ -215,8 +226,9 @@
$(PROTOCOLS_SURVEY) \
$(PROTOCOLS_UTILS) \
$(PROTOCOLS_INPROC) \
$(PROTOCOLS_IPC) \
- $(PROTOCOLS_TCP)
+ $(PROTOCOLS_TCP) \
+ $(PROTOCOLS_SYNC)
TRANSPORTS_UTILS = \
@@ -309,6 +319,7 @@
doc/nn_survey.txt \
doc/nn_pipeline.txt \
doc/nn_bus.txt \
+ doc/nn_sync.txt \
doc/nn_inproc.txt \
doc/nn_ipc.txt \
doc/nn_tcp.txt
@@ -421,7 +432,8 @@
tests/reqrep \
tests/pipeline \
tests/survey \
- tests/bus
+ tests/bus \
+ tests/sync
FEATURE_TESTS = \
tests/block \
diff --git a/doc/nanomsg.txt b/doc/nanomsg.txt
index 0d94c65..72b4e39 100644
--- a/doc/nanomsg.txt
+++ b/doc/nanomsg.txt
@@ -94,6 +94,9 @@
Message bus protocol::
linknanomsg:nn_bus[7]
+State synchronisation protocol::
+ linknanomsg:nn_sync[7]
+
Following transport mechanisms are provided by nanomsg:
In-process transport::
diff --git a/doc/nn_bus.txt b/doc/nn_bus.txt
index 96a372a..bcc0bb2 100644
--- a/doc/nn_bus.txt
+++ b/doc/nn_bus.txt
@@ -45,10 +45,11 @@
SEE ALSO
--------
linknanomsg:nn_pubsub[7]
-linknanomsg:nn_reqrep[7]
-linknanomsg:nn_pipeline[7]
-linknanomsg:nn_survey[7]
linknanomsg:nn_pair[7]
+linknanomsg:nn_pipeline[7]
+linknanomsg:nn_reqrep[7]
+linknanomsg:nn_survey[7]
+linknanomsg:nn_sync[7]
linknanomsg:nanomsg[7]
AUTHORS
diff --git a/doc/nn_pair.txt b/doc/nn_pair.txt
index 06d732d..985697d 100644
--- a/doc/nn_pair.txt
+++ b/doc/nn_pair.txt
@@ -49,6 +49,7 @@
linknanomsg:nn_reqrep[7]
linknanomsg:nn_pipeline[7]
linknanomsg:nn_survey[7]
+linknanomsg:nn_sync[7]
linknanomsg:nanomsg[7]
diff --git a/doc/nn_pipeline.txt b/doc/nn_pipeline.txt
index e213987..fa709c7 100644
--- a/doc/nn_pipeline.txt
+++ b/doc/nn_pipeline.txt
@@ -39,6 +39,7 @@
linknanomsg:nn_pubsub[7]
linknanomsg:nn_reqrep[7]
linknanomsg:nn_survey[7]
+linknanomsg:nn_sync[7]
linknanomsg:nn_pair[7]
linknanomsg:nanomsg[7]
diff --git a/doc/nn_pubsub.txt b/doc/nn_pubsub.txt
index 69ea985..ca16568 100644
--- a/doc/nn_pubsub.txt
+++ b/doc/nn_pubsub.txt
@@ -46,6 +46,7 @@
linknanomsg:nn_reqrep[7]
linknanomsg:nn_pipeline[7]
linknanomsg:nn_survey[7]
+linknanomsg:nn_sync[7]
linknanomsg:nn_pair[7]
linknanomsg:nanomsg[7]
diff --git a/doc/nn_reqrep.txt b/doc/nn_reqrep.txt
index cca3c66..126883e 100644
--- a/doc/nn_reqrep.txt
+++ b/doc/nn_reqrep.txt
@@ -43,6 +43,7 @@
linknanomsg:nn_pubsub[7]
linknanomsg:nn_pipeline[7]
linknanomsg:nn_survey[7]
+linknanomsg:nn_sync[7]
linknanomsg:nn_pair[7]
linknanomsg:nanomsg[7]
diff --git a/doc/nn_survey.txt b/doc/nn_survey.txt
index 36b30ee..5b59427 100644
--- a/doc/nn_survey.txt
+++ b/doc/nn_survey.txt
@@ -47,6 +47,7 @@
linknanomsg:nn_pubsub[7]
linknanomsg:nn_reqrep[7]
linknanomsg:nn_pipeline[7]
+linknanomsg:nn_sync[7]
linknanomsg:nn_pair[7]
linknanomsg:nanomsg[7]
diff --git a/doc/nn_sync.txt b/doc/nn_sync.txt
new file mode 100644
index 0000000..ef86c19
--- /dev/null
+++ b/doc/nn_sync.txt
@@ -0,0 +1,51 @@
+nn_sync(7)
+==========
+
+NAME
+----
+nn_sync - scalability protocol for synchronising state between components
+
+
+SYNOPSIS
+--------
+*#include <nanomsg/nn.h>*
+
+*#include <nanomsg/sync.h>*
+
+
+DESCRIPTION
+-----------
+Synchronises a state (one message) on the master with the state on the mirrors.
+
+Socket Types
+~~~~~~~~~~~~
+
+NN_MASTER::
+ This socket is used to expose the state (one message) to the network. State
+ is set using send operation. Every such operation overwrites the old state
+ by the new state. Receive operation is not implemented on this socket type.
+NN_MIRROR::
+ This socket is used to mirror the state on the master. The changes of
+ the state are delivered to the component by receive operations. Send
+ operation is not implemented on this socket type.
+
+Socket Options
+~~~~~~~~~~~~~~
+
+No protocol-specific socket options are defined at the moment.
+
+SEE ALSO
+--------
+linknanomsg:nn_bus[7]
+linknanomsg:nn_pair[7]
+linknanomsg:nn_pipeline[7]
+linknanomsg:nn_pubsub[7]
+linknanomsg:nn_reqrep[7]
+linknanomsg:nn_survey[7]
+linknanomsg:nanomsg[7]
+
+
+AUTHORS
+-------
+Martin Sustrik <sustrik@250bpm.com>
+
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 36444c9..cc1f500 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -38,6 +38,7 @@
pipeline.h
survey.h
bus.h
+ sync.h
core/ep.h
core/ep.c
@@ -188,6 +189,15 @@
protocols/survey/xsurveyor.h
protocols/survey/xsurveyor.c
+ protocols/sync/master.h
+ protocols/sync/master.c
+ protocols/sync/xmaster.h
+ protocols/sync/xmaster.c
+ protocols/sync/mirror.h
+ protocols/sync/mirror.c
+ protocols/sync/xmirror.h
+ protocols/sync/xmirror.c
+
transports/utils/backoff.h
transports/utils/backoff.c
transports/utils/dns.h
diff --git a/src/core/global.c b/src/core/global.c
index 9ff7119..21d5c3a 100644
--- a/src/core/global.c
+++ b/src/core/global.c
@@ -63,6 +63,10 @@
#include "../protocols/survey/xsurveyor.h"
#include "../protocols/bus/bus.h"
#include "../protocols/bus/xbus.h"
+#include "../protocols/sync/master.h"
+#include "../protocols/sync/xmaster.h"
+#include "../protocols/sync/mirror.h"
+#include "../protocols/sync/xmirror.h"
#include <stddef.h>
#include <string.h>
@@ -212,6 +216,10 @@
nn_global_add_socktype (nn_xsurveyor_socktype);
nn_global_add_socktype (nn_bus_socktype);
nn_global_add_socktype (nn_xbus_socktype);
+ nn_global_add_socktype (nn_master_socktype);
+ nn_global_add_socktype (nn_xmaster_socktype);
+ nn_global_add_socktype (nn_mirror_socktype);
+ nn_global_add_socktype (nn_xmirror_socktype);
/* Start the worker threads. */
nn_pool_init (&self.pool);
diff --git a/src/protocols/sync/master.c b/src/protocols/sync/master.c
new file mode 100644
index 0000000..73f1a53
--- /dev/null
+++ b/src/protocols/sync/master.c
@@ -0,0 +1,40 @@
+/*
+ Copyright (c) 2013 GoPivotal, Inc. All rights reserved.
+
+ Permission is hereby granted, free of charge, to any person obtaining a copy
+ of this software and associated documentation files (the "Software"),
+ to deal in the Software without restriction, including without limitation
+ the rights to use, copy, modify, merge, publish, distribute, sublicense,
+ and/or sell copies of the Software, and to permit persons to whom
+ the Software is furnished to do so, subject to the following conditions:
+
+ The above copyright notice and this permission notice shall be included
+ in all copies or substantial portions of the Software.
+
+ THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
+ THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
+ IN THE SOFTWARE.
+*/
+
+#include "master.h"
+#include "xmaster.h"
+
+#include "../../nn.h"
+#include "../../sync.h"
+#include "../../utils/list.h"
+
+/* Socket type descriptor for NN_MASTER sockets opened with AF_SP.
+ Socket creation and the peer check are delegated to the xmaster
+ implementation. NN_SOCKTYPE_FLAG_NORECV matches the protocol
+ documentation: receive is not implemented on this socket type. */
+static struct nn_socktype nn_master_socktype_struct = {
+ AF_SP,
+ NN_MASTER,
+ NN_SOCKTYPE_FLAG_NORECV,
+ nn_xmaster_create,
+ nn_xmaster_ispeer,
+ NN_LIST_ITEM_INITIALIZER
+};
+
+/* Exported pointer to the descriptor; declared in master.h. */
+struct nn_socktype *nn_master_socktype = &nn_master_socktype_struct;
+
diff --git a/src/protocols/sync/master.h b/src/protocols/sync/master.h
new file mode 100644
index 0000000..1be7fa7
--- /dev/null
+++ b/src/protocols/sync/master.h
@@ -0,0 +1,31 @@
+/*
+ Copyright (c) 2013 GoPivotal, Inc. All rights reserved.
+
+ Permission is hereby granted, free of charge, to any person obtaining a copy
+ of this software and associated documentation files (the "Software"),
+ to deal in the Software without restriction, including without limitation
+ the rights to use, copy, modify, merge, publish, distribute, sublicense,
+ and/or sell copies of the Software, and to permit persons to whom
+ the Software is furnished to do so, subject to the following conditions:
+
+ The above copyright notice and this permission notice shall be included
+ in all copies or substantial portions of the Software.
+
+ THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
+ THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
+ IN THE SOFTWARE.
+*/
+
+#ifndef NN_MASTER_INCLUDED
+#define NN_MASTER_INCLUDED
+
+#include "../../protocol.h"
+
+extern struct nn_socktype *nn_master_socktype;
+
+#endif
+
diff --git a/src/protocols/sync/mirror.c b/src/protocols/sync/mirror.c
new file mode 100644
index 0000000..aad585d
--- /dev/null
+++ b/src/protocols/sync/mirror.c
@@ -0,0 +1,40 @@
+/*
+ Copyright (c) 2013 GoPivotal, Inc. All rights reserved.
+
+ Permission is hereby granted, free of charge, to any person obtaining a copy
+ of this software and associated documentation files (the "Software"),
+ to deal in the Software without restriction, including without limitation
+ the rights to use, copy, modify, merge, publish, distribute, sublicense,
+ and/or sell copies of the Software, and to permit persons to whom
+ the Software is furnished to do so, subject to the following conditions:
+
+ The above copyright notice and this permission notice shall be included
+ in all copies or substantial portions of the Software.
+
+ THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
+ THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
+ IN THE SOFTWARE.
+*/
+
+#include "mirror.h"
+#include "xmirror.h"
+
+#include "../../nn.h"
+#include "../../sync.h"
+#include "../../utils/list.h"
+
+/* Socket type descriptor for NN_MIRROR sockets opened with AF_SP.
+ Socket creation and the peer check are delegated to the xmirror
+ implementation. NN_SOCKTYPE_FLAG_NOSEND matches the protocol
+ documentation: send is not implemented on this socket type. */
+static struct nn_socktype nn_mirror_socktype_struct = {
+ AF_SP,
+ NN_MIRROR,
+ NN_SOCKTYPE_FLAG_NOSEND,
+ nn_xmirror_create,
+ nn_xmirror_ispeer,
+ NN_LIST_ITEM_INITIALIZER
+};
+
+/* Exported pointer to the descriptor; declared in mirror.h. */
+struct nn_socktype *nn_mirror_socktype = &nn_mirror_socktype_struct;
+
diff --git a/src/protocols/sync/mirror.h b/src/protocols/sync/mirror.h
new file mode 100644
index 0000000..e17eda7
--- /dev/null
+++ b/src/protocols/sync/mirror.h
@@ -0,0 +1,30 @@
+/*
+ Copyright (c) 2013 GoPivotal, Inc. All rights reserved.
+
+ Permission is hereby granted, free of charge, to any person obtaining a copy
+ of this software and associated documentation files (the "Software"),
+ to deal in the Software without restriction, including without limitation
+ the rights to use, copy, modify, merge, publish, distribute, sublicense,
+ and/or sell copies of the Software, and to permit persons to whom
+ the Software is furnished to do so, subject to the following conditions:
+
+ The above copyright notice and this permission notice shall be included
+ in all copies or substantial portions of the Software.
+
+ THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
+ THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
+ IN THE SOFTWARE.
+*/
+
+#ifndef NN_MIRROR_INCLUDED
+#define NN_MIRROR_INCLUDED
+
+#include "../../protocol.h"
+
+extern struct nn_socktype *nn_mirror_socktype;
+
+#endif
diff --git a/src/protocols/sync/xmaster.c b/src/protocols/sync/xmaster.c
new file mode 100644
index 0000000..e4879fd
--- /dev/null
+++ b/src/protocols/sync/xmaster.c
@@ -0,0 +1,231 @@
+/*
+ Copyright (c) 2013 GoPivotal, Inc. All rights reserved.
+
+ Permission is hereby granted, free of charge, to any person obtaining a copy
+ of this software and associated documentation files (the "Software"),
+ to deal in the Software without restriction, including without limitation
+ the rights to use, copy, modify, merge, publish, distribute, sublicense,
+ and/or sell copies of the Software, and to permit persons to whom
+ the Software is furnished to do so, subject to the following conditions:
+
+ The above copyright notice and this permission notice shall be included
+ in all copies or substantial portions of the Software.
+
+ THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
+ THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
+ IN THE SOFTWARE.
+*/
+
+#include "xmaster.h"
+
+#include "../../nn.h"
+#include "../../sync.h"
+
+#include "../utils/dist.h"
+
+#include "../../utils/err.h"
+#include "../../utils/cont.h"
+#include "../../utils/fast.h"
+#include "../../utils/alloc.h"
+#include "../../utils/list.h"
+#include "../../utils/msg.h"
+
+#include <string.h>
+
+/* Per-pipe data. Allocated in nn_xmaster_add, attached to the pipe via
+ nn_pipe_setdata and freed in nn_xmaster_rm. */
+struct nn_xmaster_data {
+ struct nn_dist_data dist;
+
+ /* If set to 1 it represents a new connection where the state hasn't
+ been sent yet. 0 otherwise. */
+ int fresh;
+};
+
+/* The master socket object. */
+struct nn_xmaster {
+ /* Base class; nn_cont is used to get from it back to nn_xmaster. */
+ struct nn_sockbase sockbase;
+ /* Distributor the pipes are registered with; new states are sent
+ through it. */
+ struct nn_dist dist;
+ /* The current state, i.e. the message most recently passed to send. */
+ struct nn_msg cache;
+};
+
+/* Private functions. */
+static void nn_xmaster_init (struct nn_xmaster *self,
+ const struct nn_sockbase_vfptr *vfptr, void *hint);
+static void nn_xmaster_term (struct nn_xmaster *self);
+
+/* Implementation of nn_sockbase's virtual functions. */
+static void nn_xmaster_destroy (struct nn_sockbase *self);
+static int nn_xmaster_add (struct nn_sockbase *self, struct nn_pipe *pipe);
+static void nn_xmaster_rm (struct nn_sockbase *self, struct nn_pipe *pipe);
+static void nn_xmaster_in (struct nn_sockbase *self, struct nn_pipe *pipe);
+static void nn_xmaster_out (struct nn_sockbase *self, struct nn_pipe *pipe);
+static int nn_xmaster_events (struct nn_sockbase *self);
+static int nn_xmaster_send (struct nn_sockbase *self, struct nn_msg *msg);
+static int nn_xmaster_setopt (struct nn_sockbase *self, int level, int option,
+ const void *optval, size_t optvallen);
+static int nn_xmaster_getopt (struct nn_sockbase *self, int level, int option,
+ void *optval, size_t *optvallen);
+/* Virtual function table plugging the functions declared above into
+ nn_sockbase. Entries are positional. The NULL right after nn_xmaster_send
+ sits in the position where the mirror's table (xmirror.c) holds
+ nn_xmirror_recv: the master provides no receive function.
+ NOTE(review): the meaning of the leading NULL entry is not visible in this
+ file - confirm against the declaration of nn_sockbase_vfptr. */
+static const struct nn_sockbase_vfptr nn_xmaster_sockbase_vfptr = {
+ NULL,
+ nn_xmaster_destroy,
+ nn_xmaster_add,
+ nn_xmaster_rm,
+ nn_xmaster_in,
+ nn_xmaster_out,
+ nn_xmaster_events,
+ nn_xmaster_send,
+ NULL,
+ nn_xmaster_setopt,
+ nn_xmaster_getopt
+};
+
+/* Initialises the master socket: the sockbase (with the supplied vfptr and
+ hint), the distributor and the cached state. The cache is initialised
+ with nn_msg_init (..., 0). */
+static void nn_xmaster_init (struct nn_xmaster *self,
+ const struct nn_sockbase_vfptr *vfptr, void *hint)
+{
+ nn_sockbase_init (&self->sockbase, vfptr, hint);
+ nn_dist_init (&self->dist);
+ nn_msg_init (&self->cache, 0);
+}
+
+/* Terminates the members of the master socket in the reverse order of
+ their initialisation in nn_xmaster_init. */
+static void nn_xmaster_term (struct nn_xmaster *self)
+{
+ nn_msg_term (&self->cache);
+ nn_dist_term (&self->dist);
+ nn_sockbase_term (&self->sockbase);
+}
+
+/* Implementation of the destroy virtual function: terminates the master
+   socket and releases its memory. Defined static to match its forward
+   declaration; within this file it is referenced only from the vfptr
+   table. */
+static void nn_xmaster_destroy (struct nn_sockbase *self)
+{
+    struct nn_xmaster *xmaster;
+
+    xmaster = nn_cont (self, struct nn_xmaster, sockbase);
+
+    nn_xmaster_term (xmaster);
+    nn_free (xmaster);
+}
+
+/* Registers a new pipe with the master socket. Per-pipe data is allocated,
+   marked as fresh (the state has not been sent to this pipe yet), attached
+   to the pipe and the pipe is added to the distributor. Always returns 0. */
+static int nn_xmaster_add (struct nn_sockbase *self, struct nn_pipe *pipe)
+{
+    struct nn_xmaster *xmaster;
+    struct nn_xmaster_data *data;
+
+    xmaster = nn_cont (self, struct nn_xmaster, sockbase);
+
+    data = nn_alloc (sizeof (struct nn_xmaster_data), "pipe data (master)");
+    alloc_assert (data);
+
+    /* The state is sent to the pipe from nn_xmaster_out, which also resets
+       the flag. */
+    data->fresh = 1;
+    nn_pipe_setdata (pipe, data);
+    nn_dist_add (&xmaster->dist, pipe, &data->dist);
+
+    return 0;
+}
+
+/* Unregisters the pipe from the distributor and frees the per-pipe data
+   that nn_xmaster_add attached to it. */
+static void nn_xmaster_rm (struct nn_sockbase *self, struct nn_pipe *pipe)
+{
+    struct nn_xmaster *master = nn_cont (self, struct nn_xmaster, sockbase);
+    struct nn_xmaster_data *pipedata = nn_pipe_getdata (pipe);
+
+    nn_dist_rm (&master->dist, pipe, &pipedata->dist);
+    nn_free (pipedata);
+}
+
+/* Implementation of the "in" virtual function. Intentionally empty. */
+static void nn_xmaster_in (struct nn_sockbase *self, struct nn_pipe *pipe)
+{
+ /* We are not going to receive any messages, so there's no need to store
+ the list of inbound pipes. */
+}
+
+/* Implementation of the "out" virtual function. A fresh pipe (one the state
+   has not been sent to yet) is first sent a copy of the cached state. The
+   pipe is then handed to the distributor, unless sending the state returned
+   the NN_PIPE_RELEASE flag. */
+static void nn_xmaster_out (struct nn_sockbase *self, struct nn_pipe *pipe)
+{
+    int rc;
+    struct nn_xmaster *xmaster;
+    struct nn_xmaster_data *data;
+    struct nn_msg msg;
+
+    xmaster = nn_cont (self, struct nn_xmaster, sockbase);
+    data = nn_pipe_getdata (pipe);
+
+    /* Send the state to newly established pipe. The pipe gets a copy: the
+       cache itself must stay intact, as it is still compared against in
+       nn_xmaster_send, terminated in nn_xmaster_term and sent to any pipe
+       that is added later. */
+    if (data->fresh) {
+        nn_msg_cp (&msg, &xmaster->cache);
+        rc = nn_pipe_send (pipe, &msg);
+        errnum_assert (rc >= 0, -rc);
+        data->fresh = 0;
+
+        /* NOTE(review): NN_PIPE_RELEASE presumably means the pipe cannot
+           take further messages for now - confirm against nn_pipe_send. */
+        if (rc & NN_PIPE_RELEASE)
+            return;
+    }
+
+    nn_dist_out (&xmaster->dist, pipe, &data->dist);
+}
+
+/* Implementation of the "events" virtual function. Unconditionally reports
+ NN_SOCKBASE_EVENT_OUT. */
+static int nn_xmaster_events (struct nn_sockbase *self)
+{
+ return NN_SOCKBASE_EVENT_OUT;
+}
+
+/* Implementation of the "send" virtual function: sets a new state. If the
+   body of 'msg' is byte-identical to the body of the cached state nothing
+   is distributed. Otherwise the cache is replaced by a copy of 'msg' and
+   'msg' is passed to the distributor. Returns 0 when the state is
+   unchanged, the result of nn_dist_send otherwise. */
+static int nn_xmaster_send (struct nn_sockbase *self, struct nn_msg *msg)
+{
+    struct nn_xmaster *xmaster;
+    size_t sz;
+
+    xmaster = nn_cont (self, struct nn_xmaster, sockbase);
+
+    /* Check whether new state is the same as the old one.
+       If so, do nothing. Success is reported to the caller, but unlike on
+       the path below nothing takes the message over here, so it has to be
+       released explicitly to avoid leaking it. */
+    sz = nn_chunkref_size (&msg->body);
+    if (nn_chunkref_size (&xmaster->cache.body) == sz
+          && memcmp (nn_chunkref_data (&xmaster->cache.body),
+          nn_chunkref_data (&msg->body), sz) == 0) {
+        nn_msg_term (msg);
+        return 0;
+    }
+
+    /* Overwrite the cache by the new state. The copy has to be made before
+       the message is passed on to the distributor. */
+    nn_msg_term (&xmaster->cache);
+    nn_msg_cp (&xmaster->cache, msg);
+
+    /* Distribute the new state to all the mirrors. */
+    return nn_dist_send (&xmaster->dist, msg, NULL);
+}
+
+/* Implementation of the "setopt" virtual function. Every request is
+ rejected with -ENOPROTOOPT; none of the arguments is examined. */
+static int nn_xmaster_setopt (struct nn_sockbase *self, int level,
+ int option, const void *optval, size_t optvallen)
+{
+ return -ENOPROTOOPT;
+}
+
+/* Implementation of the "getopt" virtual function. Every request is
+ rejected with -ENOPROTOOPT; none of the arguments is examined. */
+static int nn_xmaster_getopt (struct nn_sockbase *self, int level,
+ int option, void *optval, size_t *optvallen)
+{
+ return -ENOPROTOOPT;
+}
+
+/* Allocates a master socket, initialises it with the xmaster vfptr table
+ and the supplied hint, and stores a pointer to its sockbase into
+ '*sockbase'. The allocation result is checked with alloc_assert.
+ Always returns 0. */
+int nn_xmaster_create (void *hint, struct nn_sockbase **sockbase)
+{
+ struct nn_xmaster *self;
+
+ self = nn_alloc (sizeof (struct nn_xmaster), "socket (master)");
+ alloc_assert (self);
+ nn_xmaster_init (self, &nn_xmaster_sockbase_vfptr, hint);
+ *sockbase = &self->sockbase;
+
+ return 0;
+}
+
+/* Returns 1 if 'socktype' is NN_MIRROR, 0 for any other socket type. */
+int nn_xmaster_ispeer (int socktype)
+{
+    if (socktype == NN_MIRROR)
+        return 1;
+    return 0;
+}
+
+/* Socket type descriptor for NN_MASTER sockets opened with AF_SP_RAW.
+ Uses the same create and ispeer functions and the same
+ NN_SOCKTYPE_FLAG_NORECV flag as the AF_SP descriptor in master.c. */
+static struct nn_socktype nn_xmaster_socktype_struct = {
+ AF_SP_RAW,
+ NN_MASTER,
+ NN_SOCKTYPE_FLAG_NORECV,
+ nn_xmaster_create,
+ nn_xmaster_ispeer,
+ NN_LIST_ITEM_INITIALIZER
+};
+
+/* Exported pointer to the descriptor; declared in xmaster.h. */
+struct nn_socktype *nn_xmaster_socktype = &nn_xmaster_socktype_struct;
+
diff --git a/src/protocols/sync/xmaster.h b/src/protocols/sync/xmaster.h
new file mode 100644
index 0000000..6fd15c3
--- /dev/null
+++ b/src/protocols/sync/xmaster.h
@@ -0,0 +1,34 @@
+/*
+ Copyright (c) 2013 GoPivotal, Inc. All rights reserved.
+
+ Permission is hereby granted, free of charge, to any person obtaining a copy
+ of this software and associated documentation files (the "Software"),
+ to deal in the Software without restriction, including without limitation
+ the rights to use, copy, modify, merge, publish, distribute, sublicense,
+ and/or sell copies of the Software, and to permit persons to whom
+ the Software is furnished to do so, subject to the following conditions:
+
+ The above copyright notice and this permission notice shall be included
+ in all copies or substantial portions of the Software.
+
+ THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
+ THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
+ IN THE SOFTWARE.
+*/
+
+#ifndef NN_XMASTER_INCLUDED
+#define NN_XMASTER_INCLUDED
+
+#include "../../protocol.h"
+
+extern struct nn_socktype *nn_xmaster_socktype;
+
+int nn_xmaster_create (void *hint, struct nn_sockbase **sockbase);
+int nn_xmaster_ispeer (int socktype);
+
+#endif
+
diff --git a/src/protocols/sync/xmirror.c b/src/protocols/sync/xmirror.c
new file mode 100644
index 0000000..e1f6caa
--- /dev/null
+++ b/src/protocols/sync/xmirror.c
@@ -0,0 +1,186 @@
+/*
+ Copyright (c) 2013 GoPivotal, Inc. All rights reserved.
+
+ Permission is hereby granted, free of charge, to any person obtaining a copy
+ of this software and associated documentation files (the "Software"),
+ to deal in the Software without restriction, including without limitation
+ the rights to use, copy, modify, merge, publish, distribute, sublicense,
+ and/or sell copies of the Software, and to permit persons to whom
+ the Software is furnished to do so, subject to the following conditions:
+
+ The above copyright notice and this permission notice shall be included
+ in all copies or substantial portions of the Software.
+
+ THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
+ THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
+ IN THE SOFTWARE.
+*/
+
+#include "xmirror.h"
+
+#include "../../nn.h"
+#include "../../sync.h"
+
+#include "../utils/excl.h"
+
+#include "../../utils/err.h"
+#include "../../utils/cont.h"
+#include "../../utils/fast.h"
+#include "../../utils/alloc.h"
+#include "../../utils/list.h"
+
+/* The mirror socket object. */
+struct nn_xmirror {
+ /* Base class; nn_cont is used to get from it back to nn_xmirror. */
+ struct nn_sockbase sockbase;
+ /* Helper object (../utils/excl.h) that the add, rm, in, events and recv
+ functions below delegate to. */
+ struct nn_excl excl;
+};
+
+/* Private functions. */
+static void nn_xmirror_init (struct nn_xmirror *self,
+ const struct nn_sockbase_vfptr *vfptr, void *hint);
+static void nn_xmirror_term (struct nn_xmirror *self);
+
+/* Implementation of nn_sockbase's virtual functions. */
+static void nn_xmirror_destroy (struct nn_sockbase *self);
+static int nn_xmirror_add (struct nn_sockbase *self, struct nn_pipe *pipe);
+static void nn_xmirror_rm (struct nn_sockbase *self, struct nn_pipe *pipe);
+static void nn_xmirror_in (struct nn_sockbase *self, struct nn_pipe *pipe);
+static void nn_xmirror_out (struct nn_sockbase *self, struct nn_pipe *pipe);
+static int nn_xmirror_events (struct nn_sockbase *self);
+static int nn_xmirror_recv (struct nn_sockbase *self, struct nn_msg *msg);
+static int nn_xmirror_setopt (struct nn_sockbase *self, int level,
+ int option, const void *optval, size_t optvallen);
+static int nn_xmirror_getopt (struct nn_sockbase *self, int level,
+ int option, void *optval, size_t *optvallen);
+/* Virtual function table plugging the functions declared above into
+ nn_sockbase. Entries are positional. The NULL right before nn_xmirror_recv
+ sits in the position where the master's table (xmaster.c) holds
+ nn_xmaster_send: the mirror provides no send function.
+ NOTE(review): the meaning of the leading NULL entry is not visible in this
+ file - confirm against the declaration of nn_sockbase_vfptr. */
+static const struct nn_sockbase_vfptr nn_xmirror_sockbase_vfptr = {
+ NULL,
+ nn_xmirror_destroy,
+ nn_xmirror_add,
+ nn_xmirror_rm,
+ nn_xmirror_in,
+ nn_xmirror_out,
+ nn_xmirror_events,
+ NULL,
+ nn_xmirror_recv,
+ nn_xmirror_setopt,
+ nn_xmirror_getopt
+};
+
+/* Initialises the mirror socket: the sockbase (with the supplied vfptr and
+ hint) and the excl helper. */
+static void nn_xmirror_init (struct nn_xmirror *self,
+ const struct nn_sockbase_vfptr *vfptr, void *hint)
+{
+ nn_sockbase_init (&self->sockbase, vfptr, hint);
+ nn_excl_init (&self->excl);
+}
+
+/* Terminates the members of the mirror socket in the reverse order of
+ their initialisation in nn_xmirror_init. */
+static void nn_xmirror_term (struct nn_xmirror *self)
+{
+ nn_excl_term (&self->excl);
+ nn_sockbase_term (&self->sockbase);
+}
+
+/* Implementation of the destroy virtual function: terminates the mirror
+   socket and releases its memory. Defined static to match its forward
+   declaration; within this file it is referenced only from the vfptr
+   table. */
+static void nn_xmirror_destroy (struct nn_sockbase *self)
+{
+    struct nn_xmirror *xmirror;
+
+    xmirror = nn_cont (self, struct nn_xmirror, sockbase);
+
+    nn_xmirror_term (xmirror);
+    nn_free (xmirror);
+}
+
+/* Implementation of the "add" virtual function: passes the new pipe to the
+   excl helper. Always returns 0. */
+static int nn_xmirror_add (struct nn_sockbase *self, struct nn_pipe *pipe)
+{
+    struct nn_xmirror *mirror = nn_cont (self, struct nn_xmirror, sockbase);
+
+    nn_excl_add (&mirror->excl, pipe);
+    return 0;
+}
+
+/* Implementation of the "rm" virtual function: removes the pipe from the
+   excl helper. */
+static void nn_xmirror_rm (struct nn_sockbase *self, struct nn_pipe *pipe)
+{
+    struct nn_xmirror *mirror = nn_cont (self, struct nn_xmirror, sockbase);
+
+    nn_excl_rm (&mirror->excl, pipe);
+}
+
+/* Implementation of the "in" virtual function: forwards the notification
+   about the pipe to the excl helper. */
+static void nn_xmirror_in (struct nn_sockbase *self, struct nn_pipe *pipe)
+{
+    struct nn_xmirror *mirror = nn_cont (self, struct nn_xmirror, sockbase);
+
+    nn_excl_in (&mirror->excl, pipe);
+}
+
+/* Implementation of the "out" virtual function. Intentionally empty. */
+static void nn_xmirror_out (struct nn_sockbase *self, struct nn_pipe *pipe)
+{
+ /* We are not going to send any messages, so there's no point in
+ maintaining a list of pipes ready for sending. */
+}
+
+/* Implementation of the "events" virtual function. Reports
+   NN_SOCKBASE_EVENT_IN when the excl helper says a message can be
+   received, no events otherwise. */
+static int nn_xmirror_events (struct nn_sockbase *self)
+{
+    struct nn_xmirror *mirror;
+
+    mirror = nn_cont (self, struct nn_xmirror, sockbase);
+    if (nn_excl_can_recv (&mirror->excl))
+        return NN_SOCKBASE_EVENT_IN;
+    return 0;
+}
+
+/* Implementation of the "recv" virtual function: receives a message via
+   the excl helper. Negative results (errors) are passed through to the
+   caller; any non-negative result is reported as 0. */
+static int nn_xmirror_recv (struct nn_sockbase *self, struct nn_msg *msg)
+{
+    struct nn_xmirror *mirror;
+    int result;
+
+    mirror = nn_cont (self, struct nn_xmirror, sockbase);
+    result = nn_excl_recv (&mirror->excl, msg);
+    if (result < 0)
+        return result;
+
+    /* Flags returned on success (NN_PIPEBASE_PARSED) are discarded. */
+    return 0;
+}
+
+/* Implementation of the "setopt" virtual function. Every request is
+ rejected with -ENOPROTOOPT; none of the arguments is examined. */
+static int nn_xmirror_setopt (struct nn_sockbase *self, int level,
+ int option, const void *optval, size_t optvallen)
+{
+ return -ENOPROTOOPT;
+}
+
+/* Implementation of the "getopt" virtual function. Every request is
+ rejected with -ENOPROTOOPT; none of the arguments is examined. */
+static int nn_xmirror_getopt (struct nn_sockbase *self, int level,
+ int option, void *optval, size_t *optvallen)
+{
+ return -ENOPROTOOPT;
+}
+
+/* Allocates a mirror socket, initialises it with the xmirror vfptr table
+ and the supplied hint, and stores a pointer to its sockbase into
+ '*sockbase'. The allocation result is checked with alloc_assert.
+ Always returns 0. */
+int nn_xmirror_create (void *hint, struct nn_sockbase **sockbase)
+{
+ struct nn_xmirror *self;
+
+ self = nn_alloc (sizeof (struct nn_xmirror), "socket (mirror)");
+ alloc_assert (self);
+ nn_xmirror_init (self, &nn_xmirror_sockbase_vfptr, hint);
+ *sockbase = &self->sockbase;
+
+ return 0;
+}
+
+/* Returns 1 if 'socktype' is NN_MASTER, 0 for any other socket type. */
+int nn_xmirror_ispeer (int socktype)
+{
+    if (socktype == NN_MASTER)
+        return 1;
+    return 0;
+}
+
+/* Socket type descriptor for NN_MIRROR sockets opened with AF_SP_RAW.
+ Uses the same create and ispeer functions and the same
+ NN_SOCKTYPE_FLAG_NOSEND flag as the AF_SP descriptor in mirror.c. */
+static struct nn_socktype nn_xmirror_socktype_struct = {
+ AF_SP_RAW,
+ NN_MIRROR,
+ NN_SOCKTYPE_FLAG_NOSEND,
+ nn_xmirror_create,
+ nn_xmirror_ispeer,
+ NN_LIST_ITEM_INITIALIZER
+};
+
+/* Exported pointer to the descriptor; declared in xmirror.h. */
+struct nn_socktype *nn_xmirror_socktype = &nn_xmirror_socktype_struct;
+
diff --git a/src/protocols/sync/xmirror.h b/src/protocols/sync/xmirror.h
new file mode 100644
index 0000000..10b9613
--- /dev/null
+++ b/src/protocols/sync/xmirror.h
@@ -0,0 +1,33 @@
+/*
+ Copyright (c) 2013 GoPivotal, Inc. All rights reserved.
+
+ Permission is hereby granted, free of charge, to any person obtaining a copy
+ of this software and associated documentation files (the "Software"),
+ to deal in the Software without restriction, including without limitation
+ the rights to use, copy, modify, merge, publish, distribute, sublicense,
+ and/or sell copies of the Software, and to permit persons to whom
+ the Software is furnished to do so, subject to the following conditions:
+
+ The above copyright notice and this permission notice shall be included
+ in all copies or substantial portions of the Software.
+
+ THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
+ THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
+ IN THE SOFTWARE.
+*/
+
+#ifndef NN_XMIRROR_INCLUDED
+#define NN_XMIRROR_INCLUDED
+
+#include "../../protocol.h"
+
+extern struct nn_socktype *nn_xmirror_socktype;
+
+int nn_xmirror_create (void *hint, struct nn_sockbase **sockbase);
+int nn_xmirror_ispeer (int socktype);
+
+#endif
diff --git a/src/sync.h b/src/sync.h
new file mode 100644
index 0000000..e83f4d3
--- /dev/null
+++ b/src/sync.h
@@ -0,0 +1,40 @@
+/*
+ Copyright (c) 2013 GoPivotal, Inc. All rights reserved.
+
+ Permission is hereby granted, free of charge, to any person obtaining a copy
+ of this software and associated documentation files (the "Software"),
+ to deal in the Software without restriction, including without limitation
+ the rights to use, copy, modify, merge, publish, distribute, sublicense,
+ and/or sell copies of the Software, and to permit persons to whom
+ the Software is furnished to do so, subject to the following conditions:
+
+ The above copyright notice and this permission notice shall be included
+ in all copies or substantial portions of the Software.
+
+ THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
+ THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
+ IN THE SOFTWARE.
+*/
+
+#ifndef SYNC_H_INCLUDED
+#define SYNC_H_INCLUDED
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+/* Numeric identifier of the SYNC protocol. */
+#define NN_PROTO_SYNC 8
+
+/* Socket types of the SYNC protocol, numbered as protocol ID * 16 + index:
+ NN_MASTER exposes the state, NN_MIRROR mirrors it. */
+#define NN_MASTER (NN_PROTO_SYNC * 16 + 0)
+#define NN_MIRROR (NN_PROTO_SYNC * 16 + 1)
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif
+
diff --git a/tests/sync.c b/tests/sync.c
new file mode 100644
index 0000000..ac46408
--- /dev/null
+++ b/tests/sync.c
@@ -0,0 +1,55 @@
+/*
+ Copyright (c) 2013 GoPivotal, Inc. All rights reserved.
+
+ Permission is hereby granted, free of charge, to any person obtaining a copy
+ of this software and associated documentation files (the "Software"),
+ to deal in the Software without restriction, including without limitation
+ the rights to use, copy, modify, merge, publish, distribute, sublicense,
+ and/or sell copies of the Software, and to permit persons to whom
+ the Software is furnished to do so, subject to the following conditions:
+
+ The above copyright notice and this permission notice shall be included
+ in all copies or substantial portions of the Software.
+
+ THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
+ THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
+ IN THE SOFTWARE.
+*/
+
+#include "../src/nn.h"
+#include "../src/sync.h"
+
+#include "testutil.h"
+
+#define SOCKET_ADDRESS "inproc://a"
+
+/* Exercises the SYNC protocol over the inproc transport: a master with a
+   single mirror. Returns 0; failures are expected to be reported by the
+   test_* helpers from testutil.h. */
+int main ()
+{
+    int master;
+    int mirror;
+
+    /* Check whether a state set before the mirror exists is sent to the
+       mirror when it connects. */
+    master = test_socket (AF_SP, NN_MASTER);
+    test_send (master, "A");
+    test_bind (master, SOCKET_ADDRESS);
+    mirror = test_socket (AF_SP, NN_MIRROR);
+    test_connect (mirror, SOCKET_ADDRESS);
+    test_recv (mirror, "A");
+
+    /* Test whether changing of the state results in updating the mirror. */
+    test_send (master, "B");
+    test_recv (mirror, "B");
+
+    test_close (mirror);
+    test_close (master);
+
+    return 0;
+}
+