SYNC protocol; initial version

Signed-off-by: Martin Sustrik <sustrik@250bpm.com>
diff --git a/.gitignore b/.gitignore
index c443293..f7639c7 100644
--- a/.gitignore
+++ b/.gitignore
@@ -80,3 +80,4 @@
 tests/trie
 tests/zerocopy
 tests/term
+tests/sync
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 10054c2..dc559f4 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -72,6 +72,7 @@
 add_libnanomsg_test (pipeline)
 add_libnanomsg_test (survey)
 add_libnanomsg_test (bus)
+add_libnanomsg_test (sync)
 
 #  Feature tests.
 add_libnanomsg_test (block)
diff --git a/Makefile.am b/Makefile.am
index a5aa529..f9148fe 100644
--- a/Makefile.am
+++ b/Makefile.am
@@ -40,7 +40,8 @@
     src/reqrep.h \
     src/pipeline.h \
     src/survey.h \
-    src/bus.h
+    src/bus.h \
+    src/sync.h
 
 lib_LTLIBRARIES = libnanomsg.la
 
@@ -206,6 +207,16 @@
     src/protocols/survey/xsurveyor.h \
     src/protocols/survey/xsurveyor.c
 
+PROTOCOLS_SYNC = \
+    src/protocols/sync/master.h \
+    src/protocols/sync/master.c \
+    src/protocols/sync/xmaster.h \
+    src/protocols/sync/xmaster.c \
+    src/protocols/sync/mirror.h \
+    src/protocols/sync/mirror.c \
+    src/protocols/sync/xmirror.h \
+    src/protocols/sync/xmirror.c
+
 NANOMSG_PROTOCOLS = \
     $(PROTOCOLS_BUS) \
     $(PROTOCOLS_PIPELINE) \
@@ -215,8 +226,7 @@
     $(PROTOCOLS_SURVEY) \
     $(PROTOCOLS_UTILS) \
     $(PROTOCOLS_INPROC) \
-    $(PROTOCOLS_IPC) \
-    $(PROTOCOLS_TCP)
+    $(PROTOCOLS_SYNC)
 
 
 TRANSPORTS_UTILS = \
@@ -309,6 +319,7 @@
     doc/nn_survey.txt \
     doc/nn_pipeline.txt \
     doc/nn_bus.txt \
+    doc/nn_sync.txt \
     doc/nn_inproc.txt \
     doc/nn_ipc.txt \
     doc/nn_tcp.txt
@@ -421,7 +432,8 @@
     tests/reqrep \
     tests/pipeline \
     tests/survey \
-    tests/bus
+    tests/bus \
+    tests/sync
 
 FEATURE_TESTS = \
     tests/block \
diff --git a/doc/nanomsg.txt b/doc/nanomsg.txt
index 0d94c65..72b4e39 100644
--- a/doc/nanomsg.txt
+++ b/doc/nanomsg.txt
@@ -94,6 +94,9 @@
 Message bus protocol::
     linknanomsg:nn_bus[7]
 
+State synchronisation protocol::
+    linknanomsg:nn_sync[7]
+
 Following transport mechanisms are provided by nanomsg:
 
 In-process transport::
diff --git a/doc/nn_bus.txt b/doc/nn_bus.txt
index 96a372a..bcc0bb2 100644
--- a/doc/nn_bus.txt
+++ b/doc/nn_bus.txt
@@ -45,10 +45,11 @@
 SEE ALSO
 --------
 linknanomsg:nn_pubsub[7]
-linknanomsg:nn_reqrep[7]
-linknanomsg:nn_pipeline[7]
-linknanomsg:nn_survey[7]
 linknanomsg:nn_pair[7]
+linknanomsg:nn_pipeline[7]
+linknanomsg:nn_reqrep[7]
+linknanomsg:nn_survey[7]
+linknanomsg:nn_sync[7]
 linknanomsg:nanomsg[7]
 
 AUTHORS
diff --git a/doc/nn_pair.txt b/doc/nn_pair.txt
index 06d732d..985697d 100644
--- a/doc/nn_pair.txt
+++ b/doc/nn_pair.txt
@@ -49,6 +49,7 @@
 linknanomsg:nn_reqrep[7]
 linknanomsg:nn_pipeline[7]
 linknanomsg:nn_survey[7]
+linknanomsg:nn_sync[7]
 linknanomsg:nanomsg[7]
 
 
diff --git a/doc/nn_pipeline.txt b/doc/nn_pipeline.txt
index e213987..fa709c7 100644
--- a/doc/nn_pipeline.txt
+++ b/doc/nn_pipeline.txt
@@ -39,6 +39,7 @@
 linknanomsg:nn_pubsub[7]
 linknanomsg:nn_reqrep[7]
 linknanomsg:nn_survey[7]
+linknanomsg:nn_sync[7]
 linknanomsg:nn_pair[7]
 linknanomsg:nanomsg[7]
 
diff --git a/doc/nn_pubsub.txt b/doc/nn_pubsub.txt
index 69ea985..ca16568 100644
--- a/doc/nn_pubsub.txt
+++ b/doc/nn_pubsub.txt
@@ -46,6 +46,7 @@
 linknanomsg:nn_reqrep[7]
 linknanomsg:nn_pipeline[7]
 linknanomsg:nn_survey[7]
+linknanomsg:nn_sync[7]
 linknanomsg:nn_pair[7]
 linknanomsg:nanomsg[7]
 
diff --git a/doc/nn_reqrep.txt b/doc/nn_reqrep.txt
index cca3c66..126883e 100644
--- a/doc/nn_reqrep.txt
+++ b/doc/nn_reqrep.txt
@@ -43,6 +43,7 @@
 linknanomsg:nn_pubsub[7]
 linknanomsg:nn_pipeline[7]
 linknanomsg:nn_survey[7]
+linknanomsg:nn_sync[7]
 linknanomsg:nn_pair[7]
 linknanomsg:nanomsg[7]
 
diff --git a/doc/nn_survey.txt b/doc/nn_survey.txt
index 36b30ee..5b59427 100644
--- a/doc/nn_survey.txt
+++ b/doc/nn_survey.txt
@@ -47,6 +47,7 @@
 linknanomsg:nn_pubsub[7]
 linknanomsg:nn_reqrep[7]
 linknanomsg:nn_pipeline[7]
+linknanomsg:nn_sync[7]
 linknanomsg:nn_pair[7]
 linknanomsg:nanomsg[7]
 
diff --git a/doc/nn_sync.txt b/doc/nn_sync.txt
new file mode 100644
index 0000000..ef86c19
--- /dev/null
+++ b/doc/nn_sync.txt
@@ -0,0 +1,51 @@
+nn_sync(7)
+==========
+
+NAME
+----
+nn_sync - scalability protocol for synchronising state between components
+
+
+SYNOPSIS
+--------
+*#include <nanomsg/nn.h>*
+
+*#include <nanomsg/sync.h>*
+
+
+DESCRIPTION
+-----------
+Synchronises a state (one message) on the master with the state on the mirrors.
+
+Socket Types
+~~~~~~~~~~~~
+
+NN_MASTER::
+    This socket is used to expose the state (one message) to the network. State
+    is set using send oprtation. Every such operation overwrites the old state
+    by the new state. Receive operation is not implemented on this socket type.
+NN_MIRROR::
+    This socket is used to mirror the state on the master. The changes of
+    the state are delivered to the component by receive operations. Send
+    operation is not implemented on this socket type.
+
+Socket Options
+~~~~~~~~~~~~~~
+
+No protocol-specific socket options are defined at the moment.
+
+SEE ALSO
+--------
+linknanomsg:nn_bus[7]
+linknanomsg:nn_pair[7]
+linknanomsg:nn_pipeline[7]
+linknanomsg:nn_pubsub[7]
+linknanomsg:nn_reqrep[7]
+linknanomsg:nn_survey[7]
+linknanomsg:nanomsg[7]
+
+
+AUTHORS
+-------
+Martin Sustrik <sustrik@250bpm.com>
+
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 36444c9..cc1f500 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -38,6 +38,7 @@
     pipeline.h
     survey.h
     bus.h
+    sync.h
 
     core/ep.h
     core/ep.c
@@ -188,6 +189,15 @@
     protocols/survey/xsurveyor.h
     protocols/survey/xsurveyor.c
 
+    protocols/sync/master.h
+    protocols/sync/master.c
+    protocols/sync/xmaster.h
+    protocols/sync/xmaster.c
+    protocols/sync/mirror.h
+    protocols/sync/mirror.c
+    protocols/sync/xmirror.h
+    protocols/sync/xmirror.c
+
     transports/utils/backoff.h
     transports/utils/backoff.c
     transports/utils/dns.h
diff --git a/src/core/global.c b/src/core/global.c
index 9ff7119..21d5c3a 100644
--- a/src/core/global.c
+++ b/src/core/global.c
@@ -63,6 +63,10 @@
 #include "../protocols/survey/xsurveyor.h"
 #include "../protocols/bus/bus.h"
 #include "../protocols/bus/xbus.h"
+#include "../protocols/sync/master.h"
+#include "../protocols/sync/xmaster.h"
+#include "../protocols/sync/mirror.h"
+#include "../protocols/sync/xmirror.h"
 
 #include <stddef.h>
 #include <string.h>
@@ -212,6 +216,10 @@
     nn_global_add_socktype (nn_xsurveyor_socktype);
     nn_global_add_socktype (nn_bus_socktype);
     nn_global_add_socktype (nn_xbus_socktype);
+    nn_global_add_socktype (nn_master_socktype);
+    nn_global_add_socktype (nn_xmaster_socktype);
+    nn_global_add_socktype (nn_mirror_socktype);
+    nn_global_add_socktype (nn_xmirror_socktype);
 
     /*  Start the worker threads. */
     nn_pool_init (&self.pool);
diff --git a/src/protocols/sync/master.c b/src/protocols/sync/master.c
new file mode 100644
index 0000000..73f1a53
--- /dev/null
+++ b/src/protocols/sync/master.c
@@ -0,0 +1,40 @@
+/*
+    Copyright (c) 2013 GoPivotal, Inc.  All rights reserved.
+
+    Permission is hereby granted, free of charge, to any person obtaining a copy
+    of this software and associated documentation files (the "Software"),
+    to deal in the Software without restriction, including without limitation
+    the rights to use, copy, modify, merge, publish, distribute, sublicense,
+    and/or sell copies of the Software, and to permit persons to whom
+    the Software is furnished to do so, subject to the following conditions:
+
+    The above copyright notice and this permission notice shall be included
+    in all copies or substantial portions of the Software.
+
+    THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+    IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+    FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
+    THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+    LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+    FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
+    IN THE SOFTWARE.
+*/
+
+#include "master.h"
+#include "xmaster.h"
+
+#include "../../nn.h"
+#include "../../sync.h"
+#include "../../utils/list.h"
+
+static struct nn_socktype nn_master_socktype_struct = {
+    AF_SP,
+    NN_MASTER,
+    NN_SOCKTYPE_FLAG_NORECV,
+    nn_xmaster_create,
+    nn_xmaster_ispeer,
+    NN_LIST_ITEM_INITIALIZER
+};
+
+struct nn_socktype *nn_master_socktype = &nn_master_socktype_struct;
+
diff --git a/src/protocols/sync/master.h b/src/protocols/sync/master.h
new file mode 100644
index 0000000..1be7fa7
--- /dev/null
+++ b/src/protocols/sync/master.h
@@ -0,0 +1,31 @@
+/*
+    Copyright (c) 2013 GoPivotal, Inc.  All rights reserved.
+
+    Permission is hereby granted, free of charge, to any person obtaining a copy
+    of this software and associated documentation files (the "Software"),
+    to deal in the Software without restriction, including without limitation
+    the rights to use, copy, modify, merge, publish, distribute, sublicense,
+    and/or sell copies of the Software, and to permit persons to whom
+    the Software is furnished to do so, subject to the following conditions:
+
+    The above copyright notice and this permission notice shall be included
+    in all copies or substantial portions of the Software.
+
+    THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+    IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+    FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
+    THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+    LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+    FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
+    IN THE SOFTWARE.
+*/
+
+#ifndef NN_MASTER_INCLUDED
+#define NN_MASTER_INCLUDED
+
+#include "../../protocol.h"
+
+extern struct nn_socktype *nn_master_socktype;
+
+#endif
+
diff --git a/src/protocols/sync/mirror.c b/src/protocols/sync/mirror.c
new file mode 100644
index 0000000..aad585d
--- /dev/null
+++ b/src/protocols/sync/mirror.c
@@ -0,0 +1,40 @@
+/*
+    Copyright (c) 2013 GoPivotal, Inc.  All rights reserved.
+
+    Permission is hereby granted, free of charge, to any person obtaining a copy
+    of this software and associated documentation files (the "Software"),
+    to deal in the Software without restriction, including without limitation
+    the rights to use, copy, modify, merge, publish, distribute, sublicense,
+    and/or sell copies of the Software, and to permit persons to whom
+    the Software is furnished to do so, subject to the following conditions:
+
+    The above copyright notice and this permission notice shall be included
+    in all copies or substantial portions of the Software.
+
+    THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+    IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+    FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
+    THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+    LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+    FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
+    IN THE SOFTWARE.
+*/
+
+#include "mirror.h"
+#include "xmirror.h"
+
+#include "../../nn.h"
+#include "../../sync.h"
+#include "../../utils/list.h"
+
+static struct nn_socktype nn_mirror_socktype_struct = {
+    AF_SP,
+    NN_MIRROR,
+    NN_SOCKTYPE_FLAG_NOSEND,
+    nn_xmirror_create,
+    nn_xmirror_ispeer,
+    NN_LIST_ITEM_INITIALIZER
+};
+
+struct nn_socktype *nn_mirror_socktype = &nn_mirror_socktype_struct;
+
diff --git a/src/protocols/sync/mirror.h b/src/protocols/sync/mirror.h
new file mode 100644
index 0000000..e17eda7
--- /dev/null
+++ b/src/protocols/sync/mirror.h
@@ -0,0 +1,30 @@
+/*
+    Copyright (c) 2013 GoPivotal, Inc.  All rights reserved.
+
+    Permission is hereby granted, free of charge, to any person obtaining a copy
+    of this software and associated documentation files (the "Software"),
+    to deal in the Software without restriction, including without limitation
+    the rights to use, copy, modify, merge, publish, distribute, sublicense,
+    and/or sell copies of the Software, and to permit persons to whom
+    the Software is furnished to do so, subject to the following conditions:
+
+    The above copyright notice and this permission notice shall be included
+    in all copies or substantial portions of the Software.
+
+    THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+    IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+    FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
+    THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+    LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+    FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
+    IN THE SOFTWARE.
+*/
+
+#ifndef NN_MIRROR_INCLUDED
+#define NN_MIRROR_INCLUDED
+
+#include "../../protocol.h"
+
+extern struct nn_socktype *nn_mirror_socktype;
+
+#endif
diff --git a/src/protocols/sync/xmaster.c b/src/protocols/sync/xmaster.c
new file mode 100644
index 0000000..e4879fd
--- /dev/null
+++ b/src/protocols/sync/xmaster.c
@@ -0,0 +1,231 @@
+/*
+    Copyright (c) 2013 GoPivotal, Inc.  All rights reserved.
+
+    Permission is hereby granted, free of charge, to any person obtaining a copy
+    of this software and associated documentation files (the "Software"),
+    to deal in the Software without restriction, including without limitation
+    the rights to use, copy, modify, merge, publish, distribute, sublicense,
+    and/or sell copies of the Software, and to permit persons to whom
+    the Software is furnished to do so, subject to the following conditions:
+
+    The above copyright notice and this permission notice shall be included
+    in all copies or substantial portions of the Software.
+
+    THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+    IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+    FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
+    THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+    LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+    FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
+    IN THE SOFTWARE.
+*/
+
+#include "xmaster.h"
+
+#include "../../nn.h"
+#include "../../sync.h"
+
+#include "../utils/dist.h"
+
+#include "../../utils/err.h"
+#include "../../utils/cont.h"
+#include "../../utils/fast.h"
+#include "../../utils/alloc.h"
+#include "../../utils/list.h"
+#include "../../utils/msg.h"
+
+#include <string.h>
+
+struct nn_xmaster_data {
+    struct nn_dist_data dist;
+
+    /*  If set to 1 it represents a new connection where the state haven't
+        been sent yet. 0 otherwise. */
+    int fresh;
+};
+
+struct nn_xmaster {
+    struct nn_sockbase sockbase;
+    struct nn_dist dist;
+    struct nn_msg cache;
+};
+
+/*  Private functions. */
+static void nn_xmaster_init (struct nn_xmaster *self,
+    const struct nn_sockbase_vfptr *vfptr, void *hint);
+static void nn_xmaster_term (struct nn_xmaster *self);
+
+/*  Implementation of nn_sockbase's virtual functions. */
+static void nn_xmaster_destroy (struct nn_sockbase *self);
+static int nn_xmaster_add (struct nn_sockbase *self, struct nn_pipe *pipe);
+static void nn_xmaster_rm (struct nn_sockbase *self, struct nn_pipe *pipe);
+static void nn_xmaster_in (struct nn_sockbase *self, struct nn_pipe *pipe);
+static void nn_xmaster_out (struct nn_sockbase *self, struct nn_pipe *pipe);
+static int nn_xmaster_events (struct nn_sockbase *self);
+static int nn_xmaster_send (struct nn_sockbase *self, struct nn_msg *msg);
+static int nn_xmaster_setopt (struct nn_sockbase *self, int level, int option,
+    const void *optval, size_t optvallen);
+static int nn_xmaster_getopt (struct nn_sockbase *self, int level, int option,
+    void *optval, size_t *optvallen);
+static const struct nn_sockbase_vfptr nn_xmaster_sockbase_vfptr = {
+    NULL,
+    nn_xmaster_destroy,
+    nn_xmaster_add,
+    nn_xmaster_rm,
+    nn_xmaster_in,
+    nn_xmaster_out,
+    nn_xmaster_events,
+    nn_xmaster_send,
+    NULL,
+    nn_xmaster_setopt,
+    nn_xmaster_getopt
+};
+
+static void nn_xmaster_init (struct nn_xmaster *self,
+    const struct nn_sockbase_vfptr *vfptr, void *hint)
+{
+    nn_sockbase_init (&self->sockbase, vfptr, hint);
+    nn_dist_init (&self->dist);
+    nn_msg_init (&self->cache, 0);
+}
+
+static void nn_xmaster_term (struct nn_xmaster *self)
+{
+    nn_msg_term (&self->cache);
+    nn_dist_term (&self->dist);
+    nn_sockbase_term (&self->sockbase);
+}
+
+void nn_xmaster_destroy (struct nn_sockbase *self)
+{
+    struct nn_xmaster *xmaster;
+
+    xmaster = nn_cont (self, struct nn_xmaster, sockbase);
+
+    nn_xmaster_term (xmaster);
+    nn_free (xmaster);
+}
+
+static int nn_xmaster_add (struct nn_sockbase *self, struct nn_pipe *pipe)
+{
+    int rc;
+    struct nn_xmaster *xmaster;
+    struct nn_xmaster_data *data;
+    int sndprio;
+    size_t sz;
+
+    xmaster = nn_cont (self, struct nn_xmaster, sockbase);
+
+    data = nn_alloc (sizeof (struct nn_xmaster_data), "pipe data (master)");
+    alloc_assert (data);
+    data->fresh = 1;
+    nn_pipe_setdata (pipe, data);
+    nn_dist_add (&xmaster->dist, pipe, &data->dist);
+
+    return 0;
+}
+
+static void nn_xmaster_rm (struct nn_sockbase *self, struct nn_pipe *pipe)
+{
+    struct nn_xmaster *xmaster;
+    struct nn_xmaster_data *data;
+
+    xmaster = nn_cont (self, struct nn_xmaster, sockbase);
+    data = nn_pipe_getdata (pipe);
+    nn_dist_rm (&xmaster->dist, pipe, &data->dist);
+    nn_free (data);
+}
+
+static void nn_xmaster_in (struct nn_sockbase *self, struct nn_pipe *pipe)
+{
+    /*  We are not going to receive any messages, so there's no need to store
+        the list of inbound pipes. */
+}
+
+static void nn_xmaster_out (struct nn_sockbase *self, struct nn_pipe *pipe)
+{
+    int rc;
+    struct nn_xmaster *xmaster;
+    struct nn_xmaster_data *data;
+
+    xmaster = nn_cont (self, struct nn_xmaster, sockbase);
+    data = nn_pipe_getdata (pipe);
+
+    /*  Send the state to newly established pipe. */
+    if (data->fresh) {
+        rc = nn_pipe_send (pipe, &xmaster->cache);
+        errnum_assert (rc >= 0, -rc);
+        data->fresh = 0;
+        if (rc & NN_PIPE_RELEASE)
+            return;
+    }
+
+    nn_dist_out (&xmaster->dist, pipe, &data->dist);
+}
+
+static int nn_xmaster_events (struct nn_sockbase *self)
+{
+    return NN_SOCKBASE_EVENT_OUT;
+}
+
+static int nn_xmaster_send (struct nn_sockbase *self, struct nn_msg *msg)
+{
+    struct nn_xmaster *xmaster;
+
+    xmaster = nn_cont (self, struct nn_xmaster, sockbase);
+
+    /*  Check whether new state is the same as the old one.
+        If so, do nothing. */
+    if (nn_chunkref_size (&xmaster->cache.body) == nn_chunkref_size (&msg->body)
+          && memcmp (nn_chunkref_data (&xmaster->cache.body),
+          nn_chunkref_data (&msg->body), nn_chunkref_size (&msg->body)) == 0)
+        return 0;
+
+    /*  Overwrite the cache by the new state. */
+    nn_msg_term (&xmaster->cache);
+    nn_msg_cp (&xmaster->cache, msg);
+
+    /*  Distribute the new state to all the mirrors. */
+    return nn_dist_send (&xmaster->dist, msg, NULL);
+}
+
+static int nn_xmaster_setopt (struct nn_sockbase *self, int level,
+    int option, const void *optval, size_t optvallen)
+{
+    return -ENOPROTOOPT;
+}
+
+static int nn_xmaster_getopt (struct nn_sockbase *self, int level,
+    int option, void *optval, size_t *optvallen)
+{
+    return -ENOPROTOOPT;
+}
+
+int nn_xmaster_create (void *hint, struct nn_sockbase **sockbase)
+{
+    struct nn_xmaster *self;
+
+    self = nn_alloc (sizeof (struct nn_xmaster), "socket (master)");
+    alloc_assert (self);
+    nn_xmaster_init (self, &nn_xmaster_sockbase_vfptr, hint);
+    *sockbase = &self->sockbase;
+
+    return 0;
+}
+
+int nn_xmaster_ispeer (int socktype)
+{
+    return socktype == NN_MIRROR ? 1 : 0;
+}
+
+static struct nn_socktype nn_xmaster_socktype_struct = {
+    AF_SP_RAW,
+    NN_MASTER,
+    NN_SOCKTYPE_FLAG_NORECV,
+    nn_xmaster_create,
+    nn_xmaster_ispeer,
+    NN_LIST_ITEM_INITIALIZER
+};
+
+struct nn_socktype *nn_xmaster_socktype = &nn_xmaster_socktype_struct;
+
diff --git a/src/protocols/sync/xmaster.h b/src/protocols/sync/xmaster.h
new file mode 100644
index 0000000..6fd15c3
--- /dev/null
+++ b/src/protocols/sync/xmaster.h
@@ -0,0 +1,34 @@
+/*
+    Copyright (c) 2013 GoPivotal, Inc.  All rights reserved.
+
+    Permission is hereby granted, free of charge, to any person obtaining a copy
+    of this software and associated documentation files (the "Software"),
+    to deal in the Software without restriction, including without limitation
+    the rights to use, copy, modify, merge, publish, distribute, sublicense,
+    and/or sell copies of the Software, and to permit persons to whom
+    the Software is furnished to do so, subject to the following conditions:
+
+    The above copyright notice and this permission notice shall be included
+    in all copies or substantial portions of the Software.
+
+    THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+    IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+    FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
+    THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+    LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+    FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
+    IN THE SOFTWARE.
+*/
+
+#ifndef NN_XMASTER_INCLUDED
+#define NN_XMASTER_INCLUDED
+
+#include "../../protocol.h"
+
+extern struct nn_socktype *nn_xmaster_socktype;
+
+int nn_xmaster_create (void *hint, struct nn_sockbase **sockbase);
+int nn_xmaster_ispeer (int socktype);
+
+#endif
+
diff --git a/src/protocols/sync/xmirror.c b/src/protocols/sync/xmirror.c
new file mode 100644
index 0000000..e1f6caa
--- /dev/null
+++ b/src/protocols/sync/xmirror.c
@@ -0,0 +1,186 @@
+/*
+    Copyright (c) 2013 GoPivotal, Inc.  All rights reserved.
+
+    Permission is hereby granted, free of charge, to any person obtaining a copy
+    of this software and associated documentation files (the "Software"),
+    to deal in the Software without restriction, including without limitation
+    the rights to use, copy, modify, merge, publish, distribute, sublicense,
+    and/or sell copies of the Software, and to permit persons to whom
+    the Software is furnished to do so, subject to the following conditions:
+
+    The above copyright notice and this permission notice shall be included
+    in all copies or substantial portions of the Software.
+
+    THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+    IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+    FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
+    THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+    LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+    FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
+    IN THE SOFTWARE.
+*/
+
+#include "xmirror.h"
+
+#include "../../nn.h"
+#include "../../sync.h"
+
+#include "../utils/excl.h"
+
+#include "../../utils/err.h"
+#include "../../utils/cont.h"
+#include "../../utils/fast.h"
+#include "../../utils/alloc.h"
+#include "../../utils/list.h"
+
+struct nn_xmirror {
+    struct nn_sockbase sockbase;
+    struct nn_excl excl;
+};
+
+/*  Private functions. */
+static void nn_xmirror_init (struct nn_xmirror *self,
+    const struct nn_sockbase_vfptr *vfptr, void *hint);
+static void nn_xmirror_term (struct nn_xmirror *self);
+
+/*  Implementation of nn_sockbase's virtual functions. */
+static void nn_xmirror_destroy (struct nn_sockbase *self);
+static int nn_xmirror_add (struct nn_sockbase *self, struct nn_pipe *pipe);
+static void nn_xmirror_rm (struct nn_sockbase *self, struct nn_pipe *pipe);
+static void nn_xmirror_in (struct nn_sockbase *self, struct nn_pipe *pipe);
+static void nn_xmirror_out (struct nn_sockbase *self, struct nn_pipe *pipe);
+static int nn_xmirror_events (struct nn_sockbase *self);
+static int nn_xmirror_recv (struct nn_sockbase *self, struct nn_msg *msg);
+static int nn_xmirror_setopt (struct nn_sockbase *self, int level,
+    int option, const void *optval, size_t optvallen);
+static int nn_xmirror_getopt (struct nn_sockbase *self, int level,
+    int option, void *optval, size_t *optvallen);
+static const struct nn_sockbase_vfptr nn_xmirror_sockbase_vfptr = {
+    NULL,
+    nn_xmirror_destroy,
+    nn_xmirror_add,
+    nn_xmirror_rm,
+    nn_xmirror_in,
+    nn_xmirror_out,
+    nn_xmirror_events,
+    NULL,
+    nn_xmirror_recv,
+    nn_xmirror_setopt,
+    nn_xmirror_getopt
+};
+
+static void nn_xmirror_init (struct nn_xmirror *self,
+    const struct nn_sockbase_vfptr *vfptr, void *hint)
+{
+    nn_sockbase_init (&self->sockbase, vfptr, hint);
+    nn_excl_init (&self->excl);
+}
+
+static void nn_xmirror_term (struct nn_xmirror *self)
+{
+    nn_excl_term (&self->excl);
+    nn_sockbase_term (&self->sockbase);
+}
+
+void nn_xmirror_destroy (struct nn_sockbase *self)
+{
+    struct nn_xmirror *xmirror;
+
+    xmirror = nn_cont (self, struct nn_xmirror, sockbase);
+
+    nn_xmirror_term (xmirror);
+    nn_free (xmirror);
+}
+
+static int nn_xmirror_add (struct nn_sockbase *self, struct nn_pipe *pipe)
+{
+    struct nn_xmirror *xmirror;
+
+    xmirror = nn_cont (self, struct nn_xmirror, sockbase);
+
+    nn_excl_add (&xmirror->excl, pipe);
+
+    return 0;
+}
+
+static void nn_xmirror_rm (struct nn_sockbase *self, struct nn_pipe *pipe)
+{
+    struct nn_xmirror *xmirror;
+
+    xmirror = nn_cont (self, struct nn_xmirror, sockbase);
+
+    nn_excl_rm (&xmirror->excl, pipe);
+}
+
+static void nn_xmirror_in (struct nn_sockbase *self, struct nn_pipe *pipe)
+{
+    struct nn_xmirror *xmirror;
+
+    xmirror = nn_cont (self, struct nn_xmirror, sockbase);
+
+    nn_excl_in (&xmirror->excl, pipe);
+}
+
+static void nn_xmirror_out (struct nn_sockbase *self, struct nn_pipe *pipe)
+{
+    /*  We are not going to send any messages, so there's no point is
+        maintaining a list of pipes ready for sending. */
+}
+
+static int nn_xmirror_events (struct nn_sockbase *self)
+{
+    return nn_excl_can_recv (&nn_cont (self, struct nn_xmirror,
+        sockbase)->excl) ? NN_SOCKBASE_EVENT_IN : 0;
+}
+
+static int nn_xmirror_recv (struct nn_sockbase *self, struct nn_msg *msg)
+{
+    int rc;
+
+    rc = nn_excl_recv (&nn_cont (self, struct nn_xmirror, sockbase)->excl,
+         msg);
+
+    /*  Discard NN_PIPEBASE_PARSED flag. */
+    return rc < 0 ? rc : 0;
+}
+
+static int nn_xmirror_setopt (struct nn_sockbase *self, int level,
+    int option, const void *optval, size_t optvallen)
+{
+    return -ENOPROTOOPT;
+}
+
+static int nn_xmirror_getopt (struct nn_sockbase *self, int level,
+    int option, void *optval, size_t *optvallen)
+{
+    return -ENOPROTOOPT;
+}
+
+int nn_xmirror_create (void *hint, struct nn_sockbase **sockbase)
+{
+    struct nn_xmirror *self;
+
+    self = nn_alloc (sizeof (struct nn_xmirror), "socket (mirror)");
+    alloc_assert (self);
+    nn_xmirror_init (self, &nn_xmirror_sockbase_vfptr, hint);
+    *sockbase = &self->sockbase;
+
+    return 0;
+}
+
+int nn_xmirror_ispeer (int socktype)
+{
+    return socktype == NN_MASTER ? 1 : 0;
+}
+
+static struct nn_socktype nn_xmirror_socktype_struct = {
+    AF_SP_RAW,
+    NN_MIRROR,
+    NN_SOCKTYPE_FLAG_NOSEND,
+    nn_xmirror_create,
+    nn_xmirror_ispeer,
+    NN_LIST_ITEM_INITIALIZER
+};
+
+struct nn_socktype *nn_xmirror_socktype = &nn_xmirror_socktype_struct;
+
diff --git a/src/protocols/sync/xmirror.h b/src/protocols/sync/xmirror.h
new file mode 100644
index 0000000..10b9613
--- /dev/null
+++ b/src/protocols/sync/xmirror.h
@@ -0,0 +1,33 @@
+/*
+    Copyright (c) 2013 GoPivotal, Inc.  All rights reserved.
+
+    Permission is hereby granted, free of charge, to any person obtaining a copy
+    of this software and associated documentation files (the "Software"),
+    to deal in the Software without restriction, including without limitation
+    the rights to use, copy, modify, merge, publish, distribute, sublicense,
+    and/or sell copies of the Software, and to permit persons to whom
+    the Software is furnished to do so, subject to the following conditions:
+
+    The above copyright notice and this permission notice shall be included
+    in all copies or substantial portions of the Software.
+
+    THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+    IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+    FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
+    THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+    LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+    FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
+    IN THE SOFTWARE.
+*/
+
+#ifndef NN_XMIRROR_INCLUDED
+#define NN_XMIRROR_INCLUDED
+
+#include "../../protocol.h"
+
+extern struct nn_socktype *nn_xmirror_socktype;
+
+int nn_xmirror_create (void *hint, struct nn_sockbase **sockbase);
+int nn_xmirror_ispeer (int socktype);
+
+#endif
diff --git a/src/sync.h b/src/sync.h
new file mode 100644
index 0000000..e83f4d3
--- /dev/null
+++ b/src/sync.h
@@ -0,0 +1,40 @@
+/*
+    Copyright (c) 2013 GoPivotal, Inc.  All rights reserved.
+
+    Permission is hereby granted, free of charge, to any person obtaining a copy
+    of this software and associated documentation files (the "Software"),
+    to deal in the Software without restriction, including without limitation
+    the rights to use, copy, modify, merge, publish, distribute, sublicense,
+    and/or sell copies of the Software, and to permit persons to whom
+    the Software is furnished to do so, subject to the following conditions:
+
+    The above copyright notice and this permission notice shall be included
+    in all copies or substantial portions of the Software.
+
+    THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+    IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+    FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
+    THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+    LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+    FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
+    IN THE SOFTWARE.
+*/
+
+#ifndef SYNC_H_INCLUDED
+#define SYNC_H_INCLUDED
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#define NN_PROTO_SYNC 8
+
+#define NN_MASTER (NN_PROTO_SYNC * 16 + 0)
+#define NN_MIRROR (NN_PROTO_SYNC * 16 + 1)
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif
+
diff --git a/tests/sync.c b/tests/sync.c
new file mode 100644
index 0000000..ac46408
--- /dev/null
+++ b/tests/sync.c
@@ -0,0 +1,55 @@
+/*
+    Copyright (c) 2013 GoPivotal, Inc.  All rights reserved.
+
+    Permission is hereby granted, free of charge, to any person obtaining a copy
+    of this software and associated documentation files (the "Software"),
+    to deal in the Software without restriction, including without limitation
+    the rights to use, copy, modify, merge, publish, distribute, sublicense,
+    and/or sell copies of the Software, and to permit persons to whom
+    the Software is furnished to do so, subject to the following conditions:
+
+    The above copyright notice and this permission notice shall be included
+    in all copies or substantial portions of the Software.
+
+    THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+    IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+    FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
+    THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+    LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+    FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
+    IN THE SOFTWARE.
+*/
+
+#include "../src/nn.h"
+#include "../src/sync.h"
+
+#include "testutil.h"
+
+#define SOCKET_ADDRESS "inproc://a"
+
+int main ()
+{
+    int rc;
+    int master;
+    int mirror;
+    char buf [3];
+
+    /*  Check whether unitialised state is sent to the mirror when it
+        connects. */
+    master = test_socket (AF_SP, NN_MASTER);
+    test_send (master, "A");
+    test_bind (master, SOCKET_ADDRESS);
+    mirror = test_socket (AF_SP, NN_MIRROR);
+    test_connect (mirror, SOCKET_ADDRESS);
+    test_recv (mirror, "A");
+
+    /*  Test whether changing of the state results in updating the mirror. */
+    test_send (master, "B");
+    test_recv (mirror, "B");
+
+    test_close (mirror);
+    test_close (master);
+
+    return 0;
+}
+