First version of survey pattern

Signed-off-by: Martin Sustrik <sustrik@250bpm.com>
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 530cead..55a5983 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -121,6 +121,15 @@
     patterns/reqrep/xreq.h
     patterns/reqrep/xreq.c
 
+    patterns/survey/respondent.h
+    patterns/survey/respondent.c
+    patterns/survey/surveyor.h
+    patterns/survey/surveyor.c
+    patterns/survey/xrespondent.h
+    patterns/survey/xrespondent.c
+    patterns/survey/xsurveyor.h
+    patterns/survey/xsurveyor.c
+
     transports/inproc/inproc.h
     transports/inproc/inproc_ctx.h
     transports/inproc/inproc_ctx.c
diff --git a/src/core/ctx.c b/src/core/ctx.c
index dd7a2bb..2fdc622 100644
--- a/src/core/ctx.c
+++ b/src/core/ctx.c
@@ -55,6 +55,10 @@
 #include "../patterns/fanout/pull.h"
 #include "../patterns/fanout/xpush.h"
 #include "../patterns/fanout/xpull.h"
+#include "../patterns/survey/respondent.h"
+#include "../patterns/survey/surveyor.h"
+#include "../patterns/survey/xrespondent.h"
+#include "../patterns/survey/xsurveyor.h"
 
 #include <stddef.h>
 
@@ -220,6 +224,10 @@
     sp_ctx_add_socktype (sp_push_socktype);
     sp_ctx_add_socktype (sp_pull_socktype);
     sp_ctx_add_socktype (sp_xpull_socktype);
+    sp_ctx_add_socktype (sp_respondent_socktype);
+    sp_ctx_add_socktype (sp_surveyor_socktype);
+//    sp_ctx_add_socktype (sp_xrespondent_socktype);
+    sp_ctx_add_socktype (sp_xsurveyor_socktype);
 
     sp_glock_unlock ();
 
diff --git a/src/patterns/survey/respondent.c b/src/patterns/survey/respondent.c
new file mode 100644
index 0000000..80bbc85
--- /dev/null
+++ b/src/patterns/survey/respondent.c
@@ -0,0 +1,113 @@
+/*
+    Copyright (c) 2012 250bpm s.r.o.
+
+    Permission is hereby granted, free of charge, to any person obtaining a copy
+    of this software and associated documentation files (the "Software"),
+    to deal in the Software without restriction, including without limitation
+    the rights to use, copy, modify, merge, publish, distribute, sublicense,
+    and/or sell copies of the Software, and to permit persons to whom
+    the Software is furnished to do so, subject to the following conditions:
+
+    The above copyright notice and this permission notice shall be included
+    in all copies or substantial portions of the Software.
+
+    THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+    IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+    FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
+    THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+    LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+    FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
+    IN THE SOFTWARE.
+*/
+
+#include "respondent.h"
+#include "xrespondent.h"
+
+#include "../../sp.h"
+
+#include "../../utils/err.h"
+#include "../../utils/cont.h"
+#include "../../utils/fast.h"
+#include "../../utils/alloc.h"
+
+#include <stdint.h>
+
+#define SP_RESPONDENT_INPROGRESS 1
+
+struct sp_respondent {
+    struct sp_xrespondent xrespondent;
+    uint32_t surveyid;
+    uint32_t flags;
+};
+
+/*  Implementation of sp_sockbase's virtual functions. */
+static void sp_respondent_term (struct sp_sockbase *self);
+static int sp_respondent_send (struct sp_sockbase *self, const void *buf,
+    size_t len);
+static int sp_respondent_recv (struct sp_sockbase *self, void *buf,
+    size_t *len);
+static const struct sp_sockbase_vfptr sp_respondent_sockbase_vfptr = {
+    sp_respondent_term,
+    sp_xrespondent_add,
+    sp_xrespondent_rm,
+    sp_xrespondent_in,
+    sp_xrespondent_out,
+    sp_respondent_send,
+    sp_respondent_recv,
+    sp_xrespondent_setopt,
+    sp_xrespondent_getopt
+};
+
+static void sp_respondent_init (struct sp_respondent *self,
+    const struct sp_sockbase_vfptr *vfptr, int fd)
+{
+    sp_xrespondent_init (&self->xrespondent, vfptr, fd);
+    self->flags = 0;
+}
+
+void sp_respondent_term (struct sp_sockbase *self)
+{
+    struct sp_respondent *respondent;
+
+    respondent = sp_cont (self, struct sp_respondent, xrespondent.sockbase);
+
+    sp_xrespondent_term (self);
+}
+
+static int sp_respondent_send (struct sp_sockbase *self, const void *buf,
+    size_t len)
+{
+    struct sp_respondent *respondent;
+
+    respondent = sp_cont (self, struct sp_respondent, xrespondent.sockbase);
+
+    sp_assert (0);
+}
+
+static int sp_respondent_recv (struct sp_sockbase *self, void *buf, size_t *len)
+{
+    struct sp_respondent *respondent;
+
+    respondent = sp_cont (self, struct sp_respondent, xrespondent.sockbase);
+
+    sp_assert (0);
+}
+
+static struct sp_sockbase *sp_respondent_create (int fd)
+{
+    struct sp_respondent *self;
+
+    self = sp_alloc (sizeof (struct sp_respondent));
+    alloc_assert (self);
+    sp_respondent_init (self, &sp_respondent_sockbase_vfptr, fd);
+    return &self->xrespondent.sockbase;
+}
+
+static struct sp_socktype sp_respondent_socktype_struct = {
+    AF_SP,
+    SP_RESPONDENT,
+    sp_respondent_create
+};
+
+struct sp_socktype *sp_respondent_socktype = &sp_respondent_socktype_struct;
+
diff --git a/src/patterns/survey/respondent.h b/src/patterns/survey/respondent.h
new file mode 100644
index 0000000..b2a0000
--- /dev/null
+++ b/src/patterns/survey/respondent.h
@@ -0,0 +1,30 @@
+/*
+    Copyright (c) 2012 250bpm s.r.o.
+
+    Permission is hereby granted, free of charge, to any person obtaining a copy
+    of this software and associated documentation files (the "Software"),
+    to deal in the Software without restriction, including without limitation
+    the rights to use, copy, modify, merge, publish, distribute, sublicense,
+    and/or sell copies of the Software, and to permit persons to whom
+    the Software is furnished to do so, subject to the following conditions:
+
+    The above copyright notice and this permission notice shall be included
+    in all copies or substantial portions of the Software.
+
+    THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+    IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+    FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
+    THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+    LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+    FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
+    IN THE SOFTWARE.
+*/
+
+#ifndef SP_RESPONDENT_INCLUDED
+#define SP_RESPONDENT_INCLUDED
+
+#include "../../pattern.h"
+
+extern struct sp_socktype *sp_respondent_socktype;
+
+#endif
diff --git a/src/patterns/survey/surveyor.c b/src/patterns/survey/surveyor.c
new file mode 100644
index 0000000..97d265d
--- /dev/null
+++ b/src/patterns/survey/surveyor.c
@@ -0,0 +1,180 @@
+/*
+    Copyright (c) 2012 250bpm s.r.o.
+
+    Permission is hereby granted, free of charge, to any person obtaining a copy
+    of this software and associated documentation files (the "Software"),
+    to deal in the Software without restriction, including without limitation
+    the rights to use, copy, modify, merge, publish, distribute, sublicense,
+    and/or sell copies of the Software, and to permit persons to whom
+    the Software is furnished to do so, subject to the following conditions:
+
+    The above copyright notice and this permission notice shall be included
+    in all copies or substantial portions of the Software.
+
+    THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+    IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+    FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
+    THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+    LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+    FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
+    IN THE SOFTWARE.
+*/
+
+#include "surveyor.h"
+#include "xsurveyor.h"
+
+#include "../../utils/err.h"
+#include "../../utils/cont.h"
+#include "../../utils/fast.h"
+#include "../../utils/alloc.h"
+
+#define SP_SURVEYOR_DEFAULT_DEADLINE_IVL 1000
+
+#define SP_SURVEYOR_INPROGRESS 1
+
+struct sp_surveyor {
+    struct sp_xsurveyor xsurveyor;
+    const struct sp_cp_sink *sink;
+    uint32_t flags;
+    uint32_t surveyid;
+    int deadline_ivl;
+    struct sp_timer deadline_timer;
+};
+
+/*  Implementation of sp_sockbase's virtual functions. */
+static void sp_surveyor_term (struct sp_sockbase *self);
+static int sp_surveyor_send (struct sp_sockbase *self, const void *buf,
+    size_t len);
+static int sp_surveyor_recv (struct sp_sockbase *self, void *buf, size_t *len);
+static const struct sp_sockbase_vfptr sp_surveyor_sockbase_vfptr = {
+    sp_surveyor_term,
+    sp_xsurveyor_add,
+    sp_xsurveyor_rm,
+    sp_xsurveyor_in,
+    sp_xsurveyor_out,
+    sp_surveyor_send,
+    sp_surveyor_recv,
+    sp_xsurveyor_setopt,
+    sp_xsurveyor_getopt
+};
+
+/*  Event sink. */
+static void sp_surveyor_timeout (const struct sp_cp_sink **self,
+    struct sp_timer *timer);
+static const struct sp_cp_sink sp_surveyor_sink = {
+    NULL,
+    NULL,
+    NULL,
+    NULL,
+    NULL,
+    sp_surveyor_timeout
+};
+
+static void sp_surveyor_init (struct sp_surveyor *self,
+    const struct sp_sockbase_vfptr *vfptr, int fd)
+{
+    sp_xsurveyor_init (&self->xsurveyor, vfptr, fd);
+    self->sink = &sp_surveyor_sink;
+    self->flags = 0;
+
+    /*  Start assigning survey IDs beginning with a random number. This way
+        there should be no key clashes even if the executable is re-started. */
+    sp_random_generate (&self->surveyid, sizeof (self->surveyid));
+
+    self->deadline_ivl = SP_SURVEYOR_DEFAULT_DEADLINE_IVL;
+    sp_timer_init (&self->deadline_timer, &self->sink,
+        sp_sockbase_getcp (&self->xsurveyor.sockbase));
+}
+
+void sp_surveyor_term (struct sp_sockbase *self)
+{
+    struct sp_surveyor *surveyor;
+
+    surveyor = sp_cont (self, struct sp_surveyor, xsurveyor.sockbase);
+
+    sp_timer_term (&surveyor->deadline_timer);
+    sp_xsurveyor_term (&surveyor->xsurveyor.sockbase);
+}
+
+static int sp_surveyor_send (struct sp_sockbase *self, const void *buf,
+    size_t len)
+{
+    int rc;
+    struct sp_surveyor *surveyor;
+    size_t surveylen;
+    void *survey;
+
+    surveyor = sp_cont (self, struct sp_surveyor, xsurveyor.sockbase);
+
+    /*  Cancel any ongoing survey. */
+    if (sp_slow (surveyor->flags & SP_SURVEYOR_INPROGRESS)) {
+        surveyor->flags &= ~SP_SURVEYOR_INPROGRESS;
+        sp_timer_stop (&surveyor->deadline_timer);
+    }
+
+    /*  Generate new survey ID. */
+    ++surveyor->surveyid;
+
+    /*  Tag the survey body with survey ID. */
+    /*  TODO: Do this using iovecs. */
+    surveylen = sizeof (uint32_t) + len;
+    survey = sp_alloc (surveylen);
+    alloc_assert (survey);
+    sp_putl (survey, surveyor->surveyid);
+    memcpy (((uint32_t*) survey) + 1, buf, len);
+
+    /*  Send the survey. */
+    rc = sp_xsurveyor_send (&surveyor->xsurveyor.sockbase, survey, surveylen);
+    sp_free (survey);
+    if (sp_slow (rc < 0))
+        return rc;
+
+    surveyor->flags |= SP_SURVEYOR_INPROGRESS;
+
+    /*  Set up the re-send timer. */
+    sp_timer_start (&surveyor->deadline_timer, surveyor->deadline_ivl);
+
+    return 0;
+}
+
+static int sp_surveyor_recv (struct sp_sockbase *self, void *buf, size_t *len)
+{
+    struct sp_surveyor *surveyor;
+
+    surveyor = sp_cont (self, struct sp_surveyor, xsurveyor.sockbase);
+
+    /*  If no survey is going on return EFSM error. */
+    if (sp_slow (!(surveyor->flags & SP_SURVEYOR_INPROGRESS)))
+       return -EFSM;
+
+    sp_assert (0);
+}
+
+static void sp_surveyor_timeout (const struct sp_cp_sink **self,
+    struct sp_timer *timer)
+{
+    struct sp_surveyor *surveyor;
+
+    surveyor = sp_cont (self, struct sp_surveyor, sink);
+
+    sp_assert (0);
+}
+
+static struct sp_sockbase *sp_surveyor_create (int fd)
+{
+    struct sp_surveyor *self;
+
+    self = sp_alloc (sizeof (struct sp_surveyor));
+    alloc_assert (self);
+    sp_surveyor_init (self, &sp_surveyor_sockbase_vfptr, fd);
+    return &self->xsurveyor.sockbase;
+}
+
+static struct sp_socktype sp_surveyor_socktype_struct = {
+    AF_SP,
+    SP_SURVEYOR,
+    sp_surveyor_create
+};
+
+struct sp_socktype *sp_surveyor_socktype = &sp_surveyor_socktype_struct;
+
diff --git a/src/patterns/survey/surveyor.h b/src/patterns/survey/surveyor.h
new file mode 100644
index 0000000..88f9d03
--- /dev/null
+++ b/src/patterns/survey/surveyor.h
@@ -0,0 +1,31 @@
+/*
+    Copyright (c) 2012 250bpm s.r.o.
+
+    Permission is hereby granted, free of charge, to any person obtaining a copy
+    of this software and associated documentation files (the "Software"),
+    to deal in the Software without restriction, including without limitation
+    the rights to use, copy, modify, merge, publish, distribute, sublicense,
+    and/or sell copies of the Software, and to permit persons to whom
+    the Software is furnished to do so, subject to the following conditions:
+
+    The above copyright notice and this permission notice shall be included
+    in all copies or substantial portions of the Software.
+
+    THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+    IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+    FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
+    THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+    LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+    FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
+    IN THE SOFTWARE.
+*/
+
+#ifndef SP_SURVEYOR_INCLUDED
+#define SP_SURVEYOR_INCLUDED
+
+#include "../../pattern.h"
+
+extern struct sp_socktype *sp_surveyor_socktype;
+
+#endif
+
diff --git a/src/patterns/survey/xrespondent.c b/src/patterns/survey/xrespondent.c
new file mode 100644
index 0000000..8b3c37d
--- /dev/null
+++ b/src/patterns/survey/xrespondent.c
@@ -0,0 +1,131 @@
+/*
+    Copyright (c) 2012 250bpm s.r.o.
+
+    Permission is hereby granted, free of charge, to any person obtaining a copy
+    of this software and associated documentation files (the "Software"),
+    to deal in the Software without restriction, including without limitation
+    the rights to use, copy, modify, merge, publish, distribute, sublicense,
+    and/or sell copies of the Software, and to permit persons to whom
+    the Software is furnished to do so, subject to the following conditions:
+
+    The above copyright notice and this permission notice shall be included
+    in all copies or substantial portions of the Software.
+
+    THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+    IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+    FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
+    THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+    LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+    FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
+    IN THE SOFTWARE.
+*/
+
+#include "xrespondent.h"
+
+#include "../../sp.h"
+
+#include "../../utils/err.h"
+#include "../../utils/cont.h"
+#include "../../utils/fast.h"
+#include "../../utils/alloc.h"
+
+/*  Implementation of sp_sockbase's virtual functions. */
+static const struct sp_sockbase_vfptr sp_xrespondent_sockbase_vfptr = {
+    sp_xrespondent_term,
+    sp_xrespondent_add,
+    sp_xrespondent_rm,
+    sp_xrespondent_in,
+    sp_xrespondent_out,
+    sp_xrespondent_send,
+    sp_xrespondent_recv,
+    sp_xrespondent_setopt,
+    sp_xrespondent_getopt
+};
+
+void sp_xrespondent_init (struct sp_xrespondent *self,
+    const struct sp_sockbase_vfptr *vfptr, int fd)
+{
+    sp_sockbase_init (&self->sockbase, vfptr, fd);
+    sp_excl_init (&self->excl);
+}
+
+void sp_xrespondent_term (struct sp_sockbase *self)
+{
+    struct sp_xrespondent *xrespondent;
+
+    xrespondent = sp_cont (self, struct sp_xrespondent, sockbase);
+
+    sp_excl_term (&xrespondent->excl);
+}
+
+int sp_xrespondent_add (struct sp_sockbase *self, struct sp_pipe *pipe)
+{
+    return sp_excl_add (&sp_cont (self, struct sp_xrespondent, sockbase)->excl,
+        pipe);
+}
+
+void sp_xrespondent_rm (struct sp_sockbase *self, struct sp_pipe *pipe)
+{
+    sp_excl_rm (&sp_cont (self, struct sp_xrespondent, sockbase)->excl, pipe);
+}
+
+int sp_xrespondent_in (struct sp_sockbase *self, struct sp_pipe *pipe)
+{
+    return sp_excl_in (&sp_cont (self, struct sp_xrespondent, sockbase)->excl,
+        pipe);
+}
+
+int sp_xrespondent_out (struct sp_sockbase *self, struct sp_pipe *pipe)
+{
+    return sp_excl_out (&sp_cont (self, struct sp_xrespondent, sockbase)->excl,
+        pipe);
+}
+
+int sp_xrespondent_send (struct sp_sockbase *self, const void *buf,
+    size_t len)
+{
+    return sp_excl_send (&sp_cont (self, struct sp_xrespondent, sockbase)->excl,
+        buf, len);
+}
+
+int sp_xrespondent_recv (struct sp_sockbase *self, void *buf,
+    size_t *len)
+{
+    int rc;
+    struct sp_xrespondent *xrespondent;
+
+    xrespondent = sp_cont (self, struct sp_xrespondent, sockbase);
+
+    return sp_excl_recv (&xrespondent->excl, buf, len);
+}
+
+int sp_xrespondent_setopt (struct sp_sockbase *self, int option,
+        const void *optval, size_t optvallen)
+{
+    return -ENOPROTOOPT;
+}
+
+int sp_xrespondent_getopt (struct sp_sockbase *self, int option,
+        void *optval, size_t *optvallen)
+{
+    return -ENOPROTOOPT;
+}
+
+static struct sp_sockbase *sp_xrespondent_create (int fd)
+{
+    struct sp_xrespondent *self;
+
+    self = sp_alloc (sizeof (struct sp_xrespondent));
+    alloc_assert (self);
+    sp_xrespondent_init (self, &sp_xrespondent_sockbase_vfptr, fd);
+    return &self->sockbase;
+}
+
+static struct sp_socktype sp_xrespondent_socktype_struct = {
+    AF_SP_RAW,
+    SP_RESPONDENT,
+    sp_xrespondent_create
+};
+
+struct sp_socktype *sp_xrespondent_socktype = &sp_xrespondent_socktype_struct;
+
diff --git a/src/patterns/survey/xrespondent.h b/src/patterns/survey/xrespondent.h
new file mode 100644
index 0000000..326cc88
--- /dev/null
+++ b/src/patterns/survey/xrespondent.h
@@ -0,0 +1,52 @@
+/*
+    Copyright (c) 2012 250bpm s.r.o.
+
+    Permission is hereby granted, free of charge, to any person obtaining a copy
+    of this software and associated documentation files (the "Software"),
+    to deal in the Software without restriction, including without limitation
+    the rights to use, copy, modify, merge, publish, distribute, sublicense,
+    and/or sell copies of the Software, and to permit persons to whom
+    the Software is furnished to do so, subject to the following conditions:
+
+    The above copyright notice and this permission notice shall be included
+    in all copies or substantial portions of the Software.
+
+    THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+    IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+    FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
+    THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+    LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+    FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
+    IN THE SOFTWARE.
+*/
+
+#ifndef SP_XRESPONDENT_INCLUDED
+#define SP_XRESPONDENT_INCLUDED
+
+#include "../../pattern.h"
+
+#include "../../utils/excl.h"
+
+extern struct sp_socktype *sp_xrespondent_socktype;
+
+struct sp_xrespondent {
+    struct sp_sockbase sockbase;
+    struct sp_excl excl;
+};
+
+void sp_xrespondent_init (struct sp_xrespondent *self,
+    const struct sp_sockbase_vfptr *vfptr, int fd);
+
+void sp_xrespondent_term (struct sp_sockbase *self);
+int sp_xrespondent_add (struct sp_sockbase *self, struct sp_pipe *pipe);
+void sp_xrespondent_rm (struct sp_sockbase *self, struct sp_pipe *pipe);
+int sp_xrespondent_in (struct sp_sockbase *self, struct sp_pipe *pipe);
+int sp_xrespondent_out (struct sp_sockbase *self, struct sp_pipe *pipe);
+int sp_xrespondent_send (struct sp_sockbase *self, const void *buf, size_t len);
+int sp_xrespondent_recv (struct sp_sockbase *self, void *buf, size_t *len);
+int sp_xrespondent_setopt (struct sp_sockbase *self, int option,
+    const void *optval, size_t optvallen);
+int sp_xrespondent_getopt (struct sp_sockbase *self, int option,
+    void *optval, size_t *optvallen);
+
+#endif
diff --git a/src/patterns/survey/xsurveyor.c b/src/patterns/survey/xsurveyor.c
new file mode 100644
index 0000000..cab4fe1
--- /dev/null
+++ b/src/patterns/survey/xsurveyor.c
@@ -0,0 +1,204 @@
+/*
+    Copyright (c) 2012 250bpm s.r.o.
+
+    Permission is hereby granted, free of charge, to any person obtaining a copy
+    of this software and associated documentation files (the "Software"),
+    to deal in the Software without restriction, including without limitation
+    the rights to use, copy, modify, merge, publish, distribute, sublicense,
+    and/or sell copies of the Software, and to permit persons to whom
+    the Software is furnished to do so, subject to the following conditions:
+
+    The above copyright notice and this permission notice shall be included
+    in all copies or substantial portions of the Software.
+
+    THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+    IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+    FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
+    THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+    LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+    FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
+    IN THE SOFTWARE.
+*/
+
+#include "xsurveyor.h"
+#include "../../utils/err.h"
+#include "../../utils/cont.h"
+#include "../../utils/fast.h"
+#include "../../utils/list.h"
+#include "../../utils/alloc.h"
+
+#include <stddef.h>
+
+/*  Implementation of sp_sockbase's virtual functions. */
+static const struct sp_sockbase_vfptr sp_xsurveyor_sockbase_vfptr = {
+    sp_xsurveyor_term,
+    sp_xsurveyor_add,
+    sp_xsurveyor_rm,
+    sp_xsurveyor_in,
+    sp_xsurveyor_out,
+    sp_xsurveyor_send,
+    sp_xsurveyor_recv,
+    sp_xsurveyor_setopt,
+    sp_xsurveyor_getopt
+};
+
+void sp_xsurveyor_init (struct sp_xsurveyor *self,
+    const struct sp_sockbase_vfptr *vfptr, int fd)
+{
+    sp_sockbase_init (&self->sockbase, vfptr, fd);
+    sp_list_init (&self->outpipes);
+    sp_list_init (&self->inpipes);
+    self->current = NULL;
+}
+
+void sp_xsurveyor_term (struct sp_sockbase *self)
+{
+    struct sp_xsurveyor *xsurveyor;
+
+    xsurveyor = sp_cont (self, struct sp_xsurveyor, sockbase);
+
+    sp_list_term (&xsurveyor->inpipes);
+    sp_list_term (&xsurveyor->outpipes);
+}
+
+int sp_xsurveyor_add (struct sp_sockbase *self, struct sp_pipe *pipe)
+{
+    struct sp_xsurveyor_data *data;
+
+    data = sp_alloc (sizeof (struct sp_xsurveyor_data));
+    alloc_assert (data);
+    data->pipe = pipe;
+    sp_pipe_setdata (pipe, data);
+
+    return 0;
+}
+
+void sp_xsurveyor_rm (struct sp_sockbase *self, struct sp_pipe *pipe)
+{
+    struct sp_xsurveyor_data *data;
+
+    data = sp_pipe_getdata (pipe);
+
+    /*  TODO: If pipe is in the pipe lists, remove it. */
+
+    sp_free (data);
+}
+
+int sp_xsurveyor_in (struct sp_sockbase *self, struct sp_pipe *pipe)
+{
+    struct sp_xsurveyor *xsurveyor;
+    struct sp_xsurveyor_data *data;
+    int result;
+
+    xsurveyor = sp_cont (self, struct sp_xsurveyor, sockbase);
+    data = sp_pipe_getdata (pipe);
+    result = sp_list_empty (&xsurveyor->inpipes) ? 1 : 0;
+    if (result)
+        xsurveyor->current = data;
+    sp_list_insert (&xsurveyor->inpipes, &data->initem,
+        sp_list_end (&xsurveyor->inpipes));
+    return result;
+}
+
+int sp_xsurveyor_out (struct sp_sockbase *self, struct sp_pipe *pipe)
+{
+    struct sp_xsurveyor *xsurveyor;
+    struct sp_xsurveyor_data *data;
+    int result;
+
+    xsurveyor = sp_cont (self, struct sp_xsurveyor, sockbase);
+    data = sp_pipe_getdata (pipe);
+
+    result = sp_list_empty (&xsurveyor->outpipes) ? 1 : 0;
+    sp_list_insert (&xsurveyor->outpipes, &data->outitem,
+        sp_list_end (&xsurveyor->outpipes));
+    return result;
+}
+
+int sp_xsurveyor_send (struct sp_sockbase *self, const void *buf,
+    size_t len)
+{
+    int rc;
+    struct sp_list_item *it;
+    struct sp_xsurveyor_data *data;
+    struct sp_xsurveyor *xsurveyor;
+
+    xsurveyor = sp_cont (self, struct sp_xsurveyor, sockbase);
+
+    /*  Send the survey to all the respondents. */
+    it = sp_list_begin (&xsurveyor->outpipes);
+    while (it != sp_list_end (&xsurveyor->outpipes)) {
+       data = sp_cont (it, struct sp_xsurveyor_data, outitem);
+       rc = sp_pipe_send (data->pipe, buf, len);
+       errnum_assert (rc >= 0, -rc);
+       if (rc & SP_PIPE_RELEASE) {
+           it = sp_list_erase (&xsurveyor->outpipes, it);
+           continue;
+       }
+       it = sp_list_next (&xsurveyor->outpipes, it);
+    }
+
+    return 0;
+}
+
+int sp_xsurveyor_recv (struct sp_sockbase *self, void *buf, size_t *len)
+{
+    int rc;
+    struct sp_xsurveyor *xsurveyor;
+    struct sp_list_item *it;
+
+    xsurveyor = sp_cont (self, struct sp_xsurveyor, sockbase);
+
+    /*  Current is NULL only when there are no avialable inbound pipes. */
+    if (sp_slow (!xsurveyor->current))
+        return -EAGAIN;
+
+    /*  Get the messsage. */
+    rc = sp_pipe_recv (xsurveyor->current->pipe, buf, len);
+    errnum_assert (rc >= 0, -rc);
+
+    /*  Move the current pointer to next pipe. */
+    if (rc & SP_PIPE_RELEASE)
+        it = sp_list_erase (&xsurveyor->inpipes, &xsurveyor->current->initem);
+    else
+        it = sp_list_next (&xsurveyor->inpipes, &xsurveyor->current->initem);
+    if (!it)
+        it = sp_list_begin (&xsurveyor->inpipes);
+    if (!it)
+        xsurveyor->current = NULL;
+    else
+        xsurveyor->current = sp_cont (it, struct sp_xsurveyor_data, initem);
+
+    return 0;
+}
+
+int sp_xsurveyor_setopt (struct sp_sockbase *self, int option,
+        const void *optval, size_t optvallen)
+{
+    return -ENOPROTOOPT;
+}
+
+int sp_xsurveyor_getopt (struct sp_sockbase *self, int option,
+        void *optval, size_t *optvallen)
+{
+    return -ENOPROTOOPT;
+}
+
+static struct sp_sockbase *sp_xsurveyor_create (int fd)
+{
+    struct sp_xsurveyor *self;
+
+    self = sp_alloc (sizeof (struct sp_xsurveyor));
+    alloc_assert (self);
+    sp_xsurveyor_init (self, &sp_xsurveyor_sockbase_vfptr, fd);
+    return &self->sockbase;
+}
+
+static struct sp_socktype sp_xsurveyor_socktype_struct = {
+    AF_SP_RAW,
+    SP_SURVEYOR,
+    sp_xsurveyor_create
+};
+
+struct sp_socktype *sp_xsurveyor_socktype = &sp_xsurveyor_socktype_struct;
+
diff --git a/src/patterns/survey/xsurveyor.h b/src/patterns/survey/xsurveyor.h
new file mode 100644
index 0000000..a31c1f9
--- /dev/null
+++ b/src/patterns/survey/xsurveyor.h
@@ -0,0 +1,68 @@
+/*
+    Copyright (c) 2012 250bpm s.r.o.
+
+    Permission is hereby granted, free of charge, to any person obtaining a copy
+    of this software and associated documentation files (the "Software"),
+    to deal in the Software without restriction, including without limitation
+    the rights to use, copy, modify, merge, publish, distribute, sublicense,
+    and/or sell copies of the Software, and to permit persons to whom
+    the Software is furnished to do so, subject to the following conditions:
+
+    The above copyright notice and this permission notice shall be included
+    in all copies or substantial portions of the Software.
+
+    THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+    IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+    FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
+    THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+    LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+    FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
+    IN THE SOFTWARE.
+*/
+
+#ifndef SP_XSURVEYOR_INCLUDED
+#define SP_XSURVEYOR_INCLUDED
+
+#include "../../pattern.h"
+
+extern struct sp_socktype *sp_xsurveyor_socktype;
+
+struct sp_xsurveyor_data {
+    struct sp_pipe *pipe;
+    struct sp_list_item outitem;
+    struct sp_list_item initem;
+};
+
+struct sp_xsurveyor {
+
+    /*  The generic socket base class. */
+    struct sp_sockbase sockbase;
+
+    /*  List of pipes to send messages to. */
+    struct sp_list outpipes;
+
+    /*  List of pipes that we can get messages from. */
+    struct sp_list inpipes;
+
+    /*  Next pipe to receive from. */
+    struct sp_xsurveyor_data *current;
+
+};
+
+void sp_xsurveyor_init (struct sp_xsurveyor *self,
+    const struct sp_sockbase_vfptr *vfptr, int fd);
+
+void sp_xsurveyor_term (struct sp_sockbase *self);
+int sp_xsurveyor_add (struct sp_sockbase *self, struct sp_pipe *pipe);
+void sp_xsurveyor_rm (struct sp_sockbase *self, struct sp_pipe *pipe);
+int sp_xsurveyor_in (struct sp_sockbase *self, struct sp_pipe *pipe);
+int sp_xsurveyor_out (struct sp_sockbase *self, struct sp_pipe *pipe);
+int sp_xsurveyor_send (struct sp_sockbase *self, const void *buf, size_t len);
+int sp_xsurveyor_recv (struct sp_sockbase *self, void *buf, size_t *len);
+int sp_xsurveyor_setopt (struct sp_sockbase *self, int option,
+    const void *optval, size_t optvallen);
+int sp_xsurveyor_getopt (struct sp_sockbase *self, int option,
+    void *optval, size_t *optvallen);
+
+#endif
+
diff --git a/src/sp.h b/src/sp.h
index a1b438e..87978d3 100644
--- a/src/sp.h
+++ b/src/sp.h
@@ -155,6 +155,8 @@
 #define SP_SOURCE 7
 #define SP_PUSH 8
 #define SP_PULL 9
+#define SP_SURVEYOR 10
+#define SP_RESPONDENT 11
 
 /*  Socket option levels.                                                     */
 #define SP_SOL_SOCKET 1
diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt
index 61fa442..a1dd00e 100644
--- a/tests/CMakeLists.txt
+++ b/tests/CMakeLists.txt
@@ -38,6 +38,7 @@
 add_libnanomsg_test (reqrep)
 add_libnanomsg_test (fanin)
 add_libnanomsg_test (fanout)
+add_libnanomsg_test (survey)
 
 #  Feature tests.
 add_libnanomsg_test (block)
diff --git a/tests/survey.c b/tests/survey.c
new file mode 100644
index 0000000..5d54ef2
--- /dev/null
+++ b/tests/survey.c
@@ -0,0 +1,87 @@
+/*
+    Copyright (c) 2012 250bpm s.r.o.
+
+    Permission is hereby granted, free of charge, to any person obtaining a copy
+    of this software and associated documentation files (the "Software"),
+    to deal in the Software without restriction, including without limitation
+    the rights to use, copy, modify, merge, publish, distribute, sublicense,
+    and/or sell copies of the Software, and to permit persons to whom
+    the Software is furnished to do so, subject to the following conditions:
+
+    The above copyright notice and this permission notice shall be included
+    in all copies or substantial portions of the Software.
+
+    THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+    IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+    FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
+    THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+    LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+    FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
+    IN THE SOFTWARE.
+*/
+
+#include "../src/sp.h"
+
+#include "../src/utils/err.c"
+
+int main ()
+{
+    int rc;
+    int surveyor;
+    int respondent1;
+    int respondent2;
+    int resend_ivl;
+    char buf [7];
+
+    rc = sp_init ();
+    errno_assert (rc == 0);
+    surveyor = sp_socket (AF_SP, SP_SURVEYOR);
+    errno_assert (surveyor != -1);
+    rc = sp_bind (surveyor, "inproc://a");
+    errno_assert (rc >= 0);
+    respondent1 = sp_socket (AF_SP, SP_RESPONDENT);
+    errno_assert (respondent1 != -1);
+    rc = sp_connect (respondent1, "inproc://a");
+    errno_assert (rc >= 0);
+    respondent2 = sp_socket (AF_SP, SP_RESPONDENT);
+    errno_assert (respondent2 != -1);
+    rc = sp_connect (respondent2, "inproc://a");
+    errno_assert (rc >= 0);
+
+    rc = sp_send (surveyor, "ABC", 3, 0);
+    errno_assert (rc >= 0);
+    sp_assert (rc == 3);
+
+    rc = sp_recv (respondent1, buf, sizeof (buf), 0);
+    errno_assert (rc >= 0);
+    sp_assert (rc == 3);
+    rc = sp_send (respondent1, "DEF", 3, 0);
+    errno_assert (rc >= 0);
+    sp_assert (rc == 3);
+
+    rc = sp_recv (respondent2, buf, sizeof (buf), 0);
+    errno_assert (rc >= 0);
+    sp_assert (rc == 3);
+    rc = sp_send (respondent2, "DEF", 3, 0);
+    errno_assert (rc >= 0);
+    sp_assert (rc == 3);
+
+    rc = sp_recv (surveyor, buf, sizeof (buf), 0);
+    errno_assert (rc >= 0);
+    sp_assert (rc == 3);
+    rc = sp_recv (surveyor, buf, sizeof (buf), 0);
+    errno_assert (rc >= 0);
+    sp_assert (rc == 3);
+
+    rc = sp_close (surveyor);
+    errno_assert (rc == 0);
+    rc = sp_close (respondent1);
+    errno_assert (rc == 0);    
+    rc = sp_close (respondent2);
+    errno_assert (rc == 0);
+    rc = sp_term ();
+    errno_assert (rc == 0);
+
+    return 0;
+}
+