First version of survey pattern
Signed-off-by: Martin Sustrik <sustrik@250bpm.com>
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 530cead..55a5983 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -121,6 +121,15 @@
patterns/reqrep/xreq.h
patterns/reqrep/xreq.c
+ patterns/survey/respondent.h
+ patterns/survey/respondent.c
+ patterns/survey/surveyor.h
+ patterns/survey/surveyor.c
+ patterns/survey/xrespondent.h
+ patterns/survey/xrespondent.c
+ patterns/survey/xsurveyor.h
+ patterns/survey/xsurveyor.c
+
transports/inproc/inproc.h
transports/inproc/inproc_ctx.h
transports/inproc/inproc_ctx.c
diff --git a/src/core/ctx.c b/src/core/ctx.c
index dd7a2bb..2fdc622 100644
--- a/src/core/ctx.c
+++ b/src/core/ctx.c
@@ -55,6 +55,10 @@
#include "../patterns/fanout/pull.h"
#include "../patterns/fanout/xpush.h"
#include "../patterns/fanout/xpull.h"
+#include "../patterns/survey/respondent.h"
+#include "../patterns/survey/surveyor.h"
+#include "../patterns/survey/xrespondent.h"
+#include "../patterns/survey/xsurveyor.h"
#include <stddef.h>
@@ -220,6 +224,10 @@
sp_ctx_add_socktype (sp_push_socktype);
sp_ctx_add_socktype (sp_pull_socktype);
sp_ctx_add_socktype (sp_xpull_socktype);
+ sp_ctx_add_socktype (sp_respondent_socktype);
+ sp_ctx_add_socktype (sp_surveyor_socktype);
+// sp_ctx_add_socktype (sp_xrespondent_socktype);
+ sp_ctx_add_socktype (sp_xsurveyor_socktype);
sp_glock_unlock ();
diff --git a/src/patterns/survey/respondent.c b/src/patterns/survey/respondent.c
new file mode 100644
index 0000000..80bbc85
--- /dev/null
+++ b/src/patterns/survey/respondent.c
@@ -0,0 +1,113 @@
+/*
+ Copyright (c) 2012 250bpm s.r.o.
+
+ Permission is hereby granted, free of charge, to any person obtaining a copy
+ of this software and associated documentation files (the "Software"),
+ to deal in the Software without restriction, including without limitation
+ the rights to use, copy, modify, merge, publish, distribute, sublicense,
+ and/or sell copies of the Software, and to permit persons to whom
+ the Software is furnished to do so, subject to the following conditions:
+
+ The above copyright notice and this permission notice shall be included
+ in all copies or substantial portions of the Software.
+
+ THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
+ THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
+ IN THE SOFTWARE.
+*/
+
+#include "respondent.h"
+#include "xrespondent.h"
+
+#include "../../sp.h"
+
+#include "../../utils/err.h"
+#include "../../utils/cont.h"
+#include "../../utils/fast.h"
+#include "../../utils/alloc.h"
+
+#include <stdint.h>
+
+#define SP_RESPONDENT_INPROGRESS 1
+
+struct sp_respondent {
+ struct sp_xrespondent xrespondent;
+ uint32_t surveyid;
+ uint32_t flags;
+};
+
+/* Implementation of sp_sockbase's virtual functions. */
+static void sp_respondent_term (struct sp_sockbase *self);
+static int sp_respondent_send (struct sp_sockbase *self, const void *buf,
+ size_t len);
+static int sp_respondent_recv (struct sp_sockbase *self, void *buf,
+ size_t *len);
+static const struct sp_sockbase_vfptr sp_respondent_sockbase_vfptr = {
+ sp_respondent_term,
+ sp_xrespondent_add,
+ sp_xrespondent_rm,
+ sp_xrespondent_in,
+ sp_xrespondent_out,
+ sp_respondent_send,
+ sp_respondent_recv,
+ sp_xrespondent_setopt,
+ sp_xrespondent_getopt
+};
+
+static void sp_respondent_init (struct sp_respondent *self,
+ const struct sp_sockbase_vfptr *vfptr, int fd)
+{
+ sp_xrespondent_init (&self->xrespondent, vfptr, fd);
+ self->flags = 0;
+}
+
+void sp_respondent_term (struct sp_sockbase *self)
+{
+ struct sp_respondent *respondent;
+
+ respondent = sp_cont (self, struct sp_respondent, xrespondent.sockbase);
+
+ sp_xrespondent_term (self);
+}
+
+static int sp_respondent_send (struct sp_sockbase *self, const void *buf,
+ size_t len)
+{
+ struct sp_respondent *respondent;
+
+ respondent = sp_cont (self, struct sp_respondent, xrespondent.sockbase);
+
+ sp_assert (0);
+}
+
+static int sp_respondent_recv (struct sp_sockbase *self, void *buf, size_t *len)
+{
+ struct sp_respondent *respondent;
+
+ respondent = sp_cont (self, struct sp_respondent, xrespondent.sockbase);
+
+ sp_assert (0);
+}
+
+static struct sp_sockbase *sp_respondent_create (int fd)
+{
+ struct sp_respondent *self;
+
+ self = sp_alloc (sizeof (struct sp_respondent));
+ alloc_assert (self);
+ sp_respondent_init (self, &sp_respondent_sockbase_vfptr, fd);
+ return &self->xrespondent.sockbase;
+}
+
+static struct sp_socktype sp_respondent_socktype_struct = {
+ AF_SP,
+ SP_RESPONDENT,
+ sp_respondent_create
+};
+
+struct sp_socktype *sp_respondent_socktype = &sp_respondent_socktype_struct;
+
diff --git a/src/patterns/survey/respondent.h b/src/patterns/survey/respondent.h
new file mode 100644
index 0000000..b2a0000
--- /dev/null
+++ b/src/patterns/survey/respondent.h
@@ -0,0 +1,30 @@
+/*
+ Copyright (c) 2012 250bpm s.r.o.
+
+ Permission is hereby granted, free of charge, to any person obtaining a copy
+ of this software and associated documentation files (the "Software"),
+ to deal in the Software without restriction, including without limitation
+ the rights to use, copy, modify, merge, publish, distribute, sublicense,
+ and/or sell copies of the Software, and to permit persons to whom
+ the Software is furnished to do so, subject to the following conditions:
+
+ The above copyright notice and this permission notice shall be included
+ in all copies or substantial portions of the Software.
+
+ THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
+ THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
+ IN THE SOFTWARE.
+*/
+
+#ifndef SP_RESPONDENT_INCLUDED
+#define SP_RESPONDENT_INCLUDED
+
+#include "../../pattern.h"
+
+extern struct sp_socktype *sp_respondent_socktype;
+
+#endif
diff --git a/src/patterns/survey/surveyor.c b/src/patterns/survey/surveyor.c
new file mode 100644
index 0000000..97d265d
--- /dev/null
+++ b/src/patterns/survey/surveyor.c
@@ -0,0 +1,180 @@
+/*
+ Copyright (c) 2012 250bpm s.r.o.
+
+ Permission is hereby granted, free of charge, to any person obtaining a copy
+ of this software and associated documentation files (the "Software"),
+ to deal in the Software without restriction, including without limitation
+ the rights to use, copy, modify, merge, publish, distribute, sublicense,
+ and/or sell copies of the Software, and to permit persons to whom
+ the Software is furnished to do so, subject to the following conditions:
+
+ The above copyright notice and this permission notice shall be included
+ in all copies or substantial portions of the Software.
+
+ THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
+ THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
+ IN THE SOFTWARE.
+*/
+
+#include "surveyor.h"
+#include "xsurveyor.h"
+
+#include "../../utils/err.h"
+#include "../../utils/cont.h"
+#include "../../utils/fast.h"
+#include "../../utils/alloc.h"
+
+#define SP_SURVEYOR_DEFAULT_DEADLINE_IVL 1000
+
+#define SP_SURVEYOR_INPROGRESS 1
+
+struct sp_surveyor {
+ struct sp_xsurveyor xsurveyor;
+ const struct sp_cp_sink *sink;
+ uint32_t flags;
+ uint32_t surveyid;
+ int deadline_ivl;
+ struct sp_timer deadline_timer;
+};
+
+/* Implementation of sp_sockbase's virtual functions. */
+static void sp_surveyor_term (struct sp_sockbase *self);
+static int sp_surveyor_send (struct sp_sockbase *self, const void *buf,
+ size_t len);
+static int sp_surveyor_recv (struct sp_sockbase *self, void *buf, size_t *len);
+static const struct sp_sockbase_vfptr sp_surveyor_sockbase_vfptr = {
+ sp_surveyor_term,
+ sp_xsurveyor_add,
+ sp_xsurveyor_rm,
+ sp_xsurveyor_in,
+ sp_xsurveyor_out,
+ sp_surveyor_send,
+ sp_surveyor_recv,
+ sp_xsurveyor_setopt,
+ sp_xsurveyor_getopt
+};
+
+/* Event sink. */
+static void sp_surveyor_timeout (const struct sp_cp_sink **self,
+ struct sp_timer *timer);
+static const struct sp_cp_sink sp_surveyor_sink = {
+ NULL,
+ NULL,
+ NULL,
+ NULL,
+ NULL,
+ sp_surveyor_timeout
+};
+
+static void sp_surveyor_init (struct sp_surveyor *self,
+ const struct sp_sockbase_vfptr *vfptr, int fd)
+{
+ sp_xsurveyor_init (&self->xsurveyor, vfptr, fd);
+ self->sink = &sp_surveyor_sink;
+ self->flags = 0;
+
+ /* Start assigning survey IDs beginning with a random number. This way
+ there should be no key clashes even if the executable is re-started. */
+ sp_random_generate (&self->surveyid, sizeof (self->surveyid));
+
+ self->deadline_ivl = SP_SURVEYOR_DEFAULT_DEADLINE_IVL;
+ sp_timer_init (&self->deadline_timer, &self->sink,
+ sp_sockbase_getcp (&self->xsurveyor.sockbase));
+}
+
+void sp_surveyor_term (struct sp_sockbase *self)
+{
+ struct sp_surveyor *surveyor;
+
+ surveyor = sp_cont (self, struct sp_surveyor, xsurveyor.sockbase);
+
+ sp_timer_term (&surveyor->deadline_timer);
+ sp_xsurveyor_term (&surveyor->xsurveyor.sockbase);
+}
+
+static int sp_surveyor_send (struct sp_sockbase *self, const void *buf,
+ size_t len)
+{
+ int rc;
+ struct sp_surveyor *surveyor;
+ size_t surveylen;
+ void *survey;
+
+ surveyor = sp_cont (self, struct sp_surveyor, xsurveyor.sockbase);
+
+ /* Cancel any ongoing survey. */
+ if (sp_slow (surveyor->flags & SP_SURVEYOR_INPROGRESS)) {
+ surveyor->flags &= ~SP_SURVEYOR_INPROGRESS;
+ sp_timer_stop (&surveyor->deadline_timer);
+ }
+
+ /* Generate new survey ID. */
+ ++surveyor->surveyid;
+
+ /* Tag the survey body with survey ID. */
+ /* TODO: Do this using iovecs. */
+ surveylen = sizeof (uint32_t) + len;
+ survey = sp_alloc (surveylen);
+ alloc_assert (survey);
+ sp_putl (survey, surveyor->surveyid);
+ memcpy (((uint32_t*) survey) + 1, buf, len);
+
+ /* Send the survey. */
+ rc = sp_xsurveyor_send (&surveyor->xsurveyor.sockbase, survey, surveylen);
+ sp_free (survey);
+ if (sp_slow (rc < 0))
+ return rc;
+
+ surveyor->flags |= SP_SURVEYOR_INPROGRESS;
+
+ /* Set up the re-send timer. */
+ sp_timer_start (&surveyor->deadline_timer, surveyor->deadline_ivl);
+
+ return 0;
+}
+
+static int sp_surveyor_recv (struct sp_sockbase *self, void *buf, size_t *len)
+{
+ struct sp_surveyor *surveyor;
+
+ surveyor = sp_cont (self, struct sp_surveyor, xsurveyor.sockbase);
+
+ /* If no survey is going on return EFSM error. */
+ if (sp_slow (!(surveyor->flags & SP_SURVEYOR_INPROGRESS)))
+ return -EFSM;
+
+ sp_assert (0);
+}
+
+static void sp_surveyor_timeout (const struct sp_cp_sink **self,
+ struct sp_timer *timer)
+{
+ struct sp_surveyor *surveyor;
+
+ surveyor = sp_cont (self, struct sp_surveyor, sink);
+
+ sp_assert (0);
+}
+
+static struct sp_sockbase *sp_surveyor_create (int fd)
+{
+ struct sp_surveyor *self;
+
+ self = sp_alloc (sizeof (struct sp_surveyor));
+ alloc_assert (self);
+ sp_surveyor_init (self, &sp_surveyor_sockbase_vfptr, fd);
+ return &self->xsurveyor.sockbase;
+}
+
+static struct sp_socktype sp_surveyor_socktype_struct = {
+ AF_SP,
+ SP_SURVEYOR,
+ sp_surveyor_create
+};
+
+struct sp_socktype *sp_surveyor_socktype = &sp_surveyor_socktype_struct;
+
diff --git a/src/patterns/survey/surveyor.h b/src/patterns/survey/surveyor.h
new file mode 100644
index 0000000..88f9d03
--- /dev/null
+++ b/src/patterns/survey/surveyor.h
@@ -0,0 +1,31 @@
+/*
+ Copyright (c) 2012 250bpm s.r.o.
+
+ Permission is hereby granted, free of charge, to any person obtaining a copy
+ of this software and associated documentation files (the "Software"),
+ to deal in the Software without restriction, including without limitation
+ the rights to use, copy, modify, merge, publish, distribute, sublicense,
+ and/or sell copies of the Software, and to permit persons to whom
+ the Software is furnished to do so, subject to the following conditions:
+
+ The above copyright notice and this permission notice shall be included
+ in all copies or substantial portions of the Software.
+
+ THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
+ THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
+ IN THE SOFTWARE.
+*/
+
+#ifndef SP_SURVEYOR_INCLUDED
+#define SP_SURVEYOR_INCLUDED
+
+#include "../../pattern.h"
+
+extern struct sp_socktype *sp_surveyor_socktype;
+
+#endif
+
diff --git a/src/patterns/survey/xrespondent.c b/src/patterns/survey/xrespondent.c
new file mode 100644
index 0000000..8b3c37d
--- /dev/null
+++ b/src/patterns/survey/xrespondent.c
@@ -0,0 +1,131 @@
+/*
+ Copyright (c) 2012 250bpm s.r.o.
+
+ Permission is hereby granted, free of charge, to any person obtaining a copy
+ of this software and associated documentation files (the "Software"),
+ to deal in the Software without restriction, including without limitation
+ the rights to use, copy, modify, merge, publish, distribute, sublicense,
+ and/or sell copies of the Software, and to permit persons to whom
+ the Software is furnished to do so, subject to the following conditions:
+
+ The above copyright notice and this permission notice shall be included
+ in all copies or substantial portions of the Software.
+
+ THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
+ THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
+ IN THE SOFTWARE.
+*/
+
+#include "xrespondent.h"
+
+#include "../../sp.h"
+
+#include "../../utils/err.h"
+#include "../../utils/cont.h"
+#include "../../utils/fast.h"
+#include "../../utils/alloc.h"
+
+/* Implementation of sp_sockbase's virtual functions. */
+static const struct sp_sockbase_vfptr sp_xrespondent_sockbase_vfptr = {
+ sp_xrespondent_term,
+ sp_xrespondent_add,
+ sp_xrespondent_rm,
+ sp_xrespondent_in,
+ sp_xrespondent_out,
+ sp_xrespondent_send,
+ sp_xrespondent_recv,
+ sp_xrespondent_setopt,
+ sp_xrespondent_getopt
+};
+
+void sp_xrespondent_init (struct sp_xrespondent *self,
+ const struct sp_sockbase_vfptr *vfptr, int fd)
+{
+ sp_sockbase_init (&self->sockbase, vfptr, fd);
+ sp_excl_init (&self->excl);
+}
+
+void sp_xrespondent_term (struct sp_sockbase *self)
+{
+ struct sp_xrespondent *xrespondent;
+
+ xrespondent = sp_cont (self, struct sp_xrespondent, sockbase);
+
+ sp_excl_term (&xrespondent->excl);
+}
+
+int sp_xrespondent_add (struct sp_sockbase *self, struct sp_pipe *pipe)
+{
+ return sp_excl_add (&sp_cont (self, struct sp_xrespondent, sockbase)->excl,
+ pipe);
+}
+
+void sp_xrespondent_rm (struct sp_sockbase *self, struct sp_pipe *pipe)
+{
+ sp_excl_rm (&sp_cont (self, struct sp_xrespondent, sockbase)->excl, pipe);
+}
+
+int sp_xrespondent_in (struct sp_sockbase *self, struct sp_pipe *pipe)
+{
+ return sp_excl_in (&sp_cont (self, struct sp_xrespondent, sockbase)->excl,
+ pipe);
+}
+
+int sp_xrespondent_out (struct sp_sockbase *self, struct sp_pipe *pipe)
+{
+ return sp_excl_out (&sp_cont (self, struct sp_xrespondent, sockbase)->excl,
+ pipe);
+}
+
+int sp_xrespondent_send (struct sp_sockbase *self, const void *buf,
+ size_t len)
+{
+ return sp_excl_send (&sp_cont (self, struct sp_xrespondent, sockbase)->excl,
+ buf, len);
+}
+
+int sp_xrespondent_recv (struct sp_sockbase *self, void *buf,
+ size_t *len)
+{
+ int rc;
+ struct sp_xrespondent *xrespondent;
+
+ xrespondent = sp_cont (self, struct sp_xrespondent, sockbase);
+
+ return sp_excl_recv (&xrespondent->excl, buf, len);
+}
+
+int sp_xrespondent_setopt (struct sp_sockbase *self, int option,
+ const void *optval, size_t optvallen)
+{
+ return -ENOPROTOOPT;
+}
+
+int sp_xrespondent_getopt (struct sp_sockbase *self, int option,
+ void *optval, size_t *optvallen)
+{
+ return -ENOPROTOOPT;
+}
+
+static struct sp_sockbase *sp_xrespondent_create (int fd)
+{
+ struct sp_xrespondent *self;
+
+ self = sp_alloc (sizeof (struct sp_xrespondent));
+ alloc_assert (self);
+ sp_xrespondent_init (self, &sp_xrespondent_sockbase_vfptr, fd);
+ return &self->sockbase;
+}
+
+static struct sp_socktype sp_xrespondent_socktype_struct = {
+ AF_SP_RAW,
+ SP_RESPONDENT,
+ sp_xrespondent_create
+};
+
+struct sp_socktype *sp_xrespondent_socktype = &sp_xrespondent_socktype_struct;
+
diff --git a/src/patterns/survey/xrespondent.h b/src/patterns/survey/xrespondent.h
new file mode 100644
index 0000000..326cc88
--- /dev/null
+++ b/src/patterns/survey/xrespondent.h
@@ -0,0 +1,52 @@
+/*
+ Copyright (c) 2012 250bpm s.r.o.
+
+ Permission is hereby granted, free of charge, to any person obtaining a copy
+ of this software and associated documentation files (the "Software"),
+ to deal in the Software without restriction, including without limitation
+ the rights to use, copy, modify, merge, publish, distribute, sublicense,
+ and/or sell copies of the Software, and to permit persons to whom
+ the Software is furnished to do so, subject to the following conditions:
+
+ The above copyright notice and this permission notice shall be included
+ in all copies or substantial portions of the Software.
+
+ THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
+ THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
+ IN THE SOFTWARE.
+*/
+
+#ifndef SP_XRESPONDENT_INCLUDED
+#define SP_XRESPONDENT_INCLUDED
+
+#include "../../pattern.h"
+
+#include "../../utils/excl.h"
+
+extern struct sp_socktype *sp_xrespondent_socktype;
+
+struct sp_xrespondent {
+ struct sp_sockbase sockbase;
+ struct sp_excl excl;
+};
+
+void sp_xrespondent_init (struct sp_xrespondent *self,
+ const struct sp_sockbase_vfptr *vfptr, int fd);
+
+void sp_xrespondent_term (struct sp_sockbase *self);
+int sp_xrespondent_add (struct sp_sockbase *self, struct sp_pipe *pipe);
+void sp_xrespondent_rm (struct sp_sockbase *self, struct sp_pipe *pipe);
+int sp_xrespondent_in (struct sp_sockbase *self, struct sp_pipe *pipe);
+int sp_xrespondent_out (struct sp_sockbase *self, struct sp_pipe *pipe);
+int sp_xrespondent_send (struct sp_sockbase *self, const void *buf, size_t len);
+int sp_xrespondent_recv (struct sp_sockbase *self, void *buf, size_t *len);
+int sp_xrespondent_setopt (struct sp_sockbase *self, int option,
+ const void *optval, size_t optvallen);
+int sp_xrespondent_getopt (struct sp_sockbase *self, int option,
+ void *optval, size_t *optvallen);
+
+#endif
diff --git a/src/patterns/survey/xsurveyor.c b/src/patterns/survey/xsurveyor.c
new file mode 100644
index 0000000..cab4fe1
--- /dev/null
+++ b/src/patterns/survey/xsurveyor.c
@@ -0,0 +1,204 @@
+/*
+ Copyright (c) 2012 250bpm s.r.o.
+
+ Permission is hereby granted, free of charge, to any person obtaining a copy
+ of this software and associated documentation files (the "Software"),
+ to deal in the Software without restriction, including without limitation
+ the rights to use, copy, modify, merge, publish, distribute, sublicense,
+ and/or sell copies of the Software, and to permit persons to whom
+ the Software is furnished to do so, subject to the following conditions:
+
+ The above copyright notice and this permission notice shall be included
+ in all copies or substantial portions of the Software.
+
+ THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
+ THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
+ IN THE SOFTWARE.
+*/
+
+#include "xsurveyor.h"
+#include "../../utils/err.h"
+#include "../../utils/cont.h"
+#include "../../utils/fast.h"
+#include "../../utils/list.h"
+#include "../../utils/alloc.h"
+
+#include <stddef.h>
+
+/* Implementation of sp_sockbase's virtual functions. */
+static const struct sp_sockbase_vfptr sp_xsurveyor_sockbase_vfptr = {
+ sp_xsurveyor_term,
+ sp_xsurveyor_add,
+ sp_xsurveyor_rm,
+ sp_xsurveyor_in,
+ sp_xsurveyor_out,
+ sp_xsurveyor_send,
+ sp_xsurveyor_recv,
+ sp_xsurveyor_setopt,
+ sp_xsurveyor_getopt
+};
+
+void sp_xsurveyor_init (struct sp_xsurveyor *self,
+ const struct sp_sockbase_vfptr *vfptr, int fd)
+{
+ sp_sockbase_init (&self->sockbase, vfptr, fd);
+ sp_list_init (&self->outpipes);
+ sp_list_init (&self->inpipes);
+ self->current = NULL;
+}
+
+void sp_xsurveyor_term (struct sp_sockbase *self)
+{
+ struct sp_xsurveyor *xsurveyor;
+
+ xsurveyor = sp_cont (self, struct sp_xsurveyor, sockbase);
+
+ sp_list_term (&xsurveyor->inpipes);
+ sp_list_term (&xsurveyor->outpipes);
+}
+
+int sp_xsurveyor_add (struct sp_sockbase *self, struct sp_pipe *pipe)
+{
+ struct sp_xsurveyor_data *data;
+
+ data = sp_alloc (sizeof (struct sp_xsurveyor_data));
+ alloc_assert (data);
+ data->pipe = pipe;
+ sp_pipe_setdata (pipe, data);
+
+ return 0;
+}
+
+void sp_xsurveyor_rm (struct sp_sockbase *self, struct sp_pipe *pipe)
+{
+ struct sp_xsurveyor_data *data;
+
+ data = sp_pipe_getdata (pipe);
+
+ /* TODO: If pipe is in the pipe lists, remove it. */
+
+ sp_free (data);
+}
+
+int sp_xsurveyor_in (struct sp_sockbase *self, struct sp_pipe *pipe)
+{
+ struct sp_xsurveyor *xsurveyor;
+ struct sp_xsurveyor_data *data;
+ int result;
+
+ xsurveyor = sp_cont (self, struct sp_xsurveyor, sockbase);
+ data = sp_pipe_getdata (pipe);
+ result = sp_list_empty (&xsurveyor->inpipes) ? 1 : 0;
+ if (result)
+ xsurveyor->current = data;
+ sp_list_insert (&xsurveyor->inpipes, &data->initem,
+ sp_list_end (&xsurveyor->inpipes));
+ return result;
+}
+
+int sp_xsurveyor_out (struct sp_sockbase *self, struct sp_pipe *pipe)
+{
+ struct sp_xsurveyor *xsurveyor;
+ struct sp_xsurveyor_data *data;
+ int result;
+
+ xsurveyor = sp_cont (self, struct sp_xsurveyor, sockbase);
+ data = sp_pipe_getdata (pipe);
+
+ result = sp_list_empty (&xsurveyor->outpipes) ? 1 : 0;
+ sp_list_insert (&xsurveyor->outpipes, &data->outitem,
+ sp_list_end (&xsurveyor->outpipes));
+ return result;
+}
+
+int sp_xsurveyor_send (struct sp_sockbase *self, const void *buf,
+ size_t len)
+{
+ int rc;
+ struct sp_list_item *it;
+ struct sp_xsurveyor_data *data;
+ struct sp_xsurveyor *xsurveyor;
+
+ xsurveyor = sp_cont (self, struct sp_xsurveyor, sockbase);
+
+ /* Send the survey to all the respondents. */
+ it = sp_list_begin (&xsurveyor->outpipes);
+ while (it != sp_list_end (&xsurveyor->outpipes)) {
+ data = sp_cont (it, struct sp_xsurveyor_data, outitem);
+ rc = sp_pipe_send (data->pipe, buf, len);
+ errnum_assert (rc >= 0, -rc);
+ if (rc & SP_PIPE_RELEASE) {
+ it = sp_list_erase (&xsurveyor->outpipes, it);
+ continue;
+ }
+ it = sp_list_next (&xsurveyor->outpipes, it);
+ }
+
+ return 0;
+}
+
+int sp_xsurveyor_recv (struct sp_sockbase *self, void *buf, size_t *len)
+{
+ int rc;
+ struct sp_xsurveyor *xsurveyor;
+ struct sp_list_item *it;
+
+ xsurveyor = sp_cont (self, struct sp_xsurveyor, sockbase);
+
+ /* Current is NULL only when there are no avialable inbound pipes. */
+ if (sp_slow (!xsurveyor->current))
+ return -EAGAIN;
+
+ /* Get the messsage. */
+ rc = sp_pipe_recv (xsurveyor->current->pipe, buf, len);
+ errnum_assert (rc >= 0, -rc);
+
+ /* Move the current pointer to next pipe. */
+ if (rc & SP_PIPE_RELEASE)
+ it = sp_list_erase (&xsurveyor->inpipes, &xsurveyor->current->initem);
+ else
+ it = sp_list_next (&xsurveyor->inpipes, &xsurveyor->current->initem);
+ if (!it)
+ it = sp_list_begin (&xsurveyor->inpipes);
+ if (!it)
+ xsurveyor->current = NULL;
+ else
+ xsurveyor->current = sp_cont (it, struct sp_xsurveyor_data, initem);
+
+ return 0;
+}
+
+int sp_xsurveyor_setopt (struct sp_sockbase *self, int option,
+ const void *optval, size_t optvallen)
+{
+ return -ENOPROTOOPT;
+}
+
+int sp_xsurveyor_getopt (struct sp_sockbase *self, int option,
+ void *optval, size_t *optvallen)
+{
+ return -ENOPROTOOPT;
+}
+
+static struct sp_sockbase *sp_xsurveyor_create (int fd)
+{
+ struct sp_xsurveyor *self;
+
+ self = sp_alloc (sizeof (struct sp_xsurveyor));
+ alloc_assert (self);
+ sp_xsurveyor_init (self, &sp_xsurveyor_sockbase_vfptr, fd);
+ return &self->sockbase;
+}
+
+static struct sp_socktype sp_xsurveyor_socktype_struct = {
+ AF_SP_RAW,
+ SP_SURVEYOR,
+ sp_xsurveyor_create
+};
+
+struct sp_socktype *sp_xsurveyor_socktype = &sp_xsurveyor_socktype_struct;
+
diff --git a/src/patterns/survey/xsurveyor.h b/src/patterns/survey/xsurveyor.h
new file mode 100644
index 0000000..a31c1f9
--- /dev/null
+++ b/src/patterns/survey/xsurveyor.h
@@ -0,0 +1,68 @@
+/*
+ Copyright (c) 2012 250bpm s.r.o.
+
+ Permission is hereby granted, free of charge, to any person obtaining a copy
+ of this software and associated documentation files (the "Software"),
+ to deal in the Software without restriction, including without limitation
+ the rights to use, copy, modify, merge, publish, distribute, sublicense,
+ and/or sell copies of the Software, and to permit persons to whom
+ the Software is furnished to do so, subject to the following conditions:
+
+ The above copyright notice and this permission notice shall be included
+ in all copies or substantial portions of the Software.
+
+ THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
+ THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
+ IN THE SOFTWARE.
+*/
+
+#ifndef SP_XSURVEYOR_INCLUDED
+#define SP_XSURVEYOR_INCLUDED
+
+#include "../../pattern.h"
+
+extern struct sp_socktype *sp_xsurveyor_socktype;
+
+struct sp_xsurveyor_data {
+ struct sp_pipe *pipe;
+ struct sp_list_item outitem;
+ struct sp_list_item initem;
+};
+
+struct sp_xsurveyor {
+
+ /* The generic socket base class. */
+ struct sp_sockbase sockbase;
+
+ /* List of pipes to send messages to. */
+ struct sp_list outpipes;
+
+ /* List of pipes that we can get messages from. */
+ struct sp_list inpipes;
+
+ /* Next pipe to receive from. */
+ struct sp_xsurveyor_data *current;
+
+};
+
+void sp_xsurveyor_init (struct sp_xsurveyor *self,
+ const struct sp_sockbase_vfptr *vfptr, int fd);
+
+void sp_xsurveyor_term (struct sp_sockbase *self);
+int sp_xsurveyor_add (struct sp_sockbase *self, struct sp_pipe *pipe);
+void sp_xsurveyor_rm (struct sp_sockbase *self, struct sp_pipe *pipe);
+int sp_xsurveyor_in (struct sp_sockbase *self, struct sp_pipe *pipe);
+int sp_xsurveyor_out (struct sp_sockbase *self, struct sp_pipe *pipe);
+int sp_xsurveyor_send (struct sp_sockbase *self, const void *buf, size_t len);
+int sp_xsurveyor_recv (struct sp_sockbase *self, void *buf, size_t *len);
+int sp_xsurveyor_setopt (struct sp_sockbase *self, int option,
+ const void *optval, size_t optvallen);
+int sp_xsurveyor_getopt (struct sp_sockbase *self, int option,
+ void *optval, size_t *optvallen);
+
+#endif
+
diff --git a/src/sp.h b/src/sp.h
index a1b438e..87978d3 100644
--- a/src/sp.h
+++ b/src/sp.h
@@ -155,6 +155,8 @@
#define SP_SOURCE 7
#define SP_PUSH 8
#define SP_PULL 9
+#define SP_SURVEYOR 10
+#define SP_RESPONDENT 11
/* Socket option levels. */
#define SP_SOL_SOCKET 1
diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt
index 61fa442..a1dd00e 100644
--- a/tests/CMakeLists.txt
+++ b/tests/CMakeLists.txt
@@ -38,6 +38,7 @@
add_libnanomsg_test (reqrep)
add_libnanomsg_test (fanin)
add_libnanomsg_test (fanout)
+add_libnanomsg_test (survey)
# Feature tests.
add_libnanomsg_test (block)
diff --git a/tests/survey.c b/tests/survey.c
new file mode 100644
index 0000000..5d54ef2
--- /dev/null
+++ b/tests/survey.c
@@ -0,0 +1,87 @@
+/*
+ Copyright (c) 2012 250bpm s.r.o.
+
+ Permission is hereby granted, free of charge, to any person obtaining a copy
+ of this software and associated documentation files (the "Software"),
+ to deal in the Software without restriction, including without limitation
+ the rights to use, copy, modify, merge, publish, distribute, sublicense,
+ and/or sell copies of the Software, and to permit persons to whom
+ the Software is furnished to do so, subject to the following conditions:
+
+ The above copyright notice and this permission notice shall be included
+ in all copies or substantial portions of the Software.
+
+ THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
+ THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
+ IN THE SOFTWARE.
+*/
+
+#include "../src/sp.h"
+
+#include "../src/utils/err.c"
+
+int main ()
+{
+ int rc;
+ int surveyor;
+ int respondent1;
+ int respondent2;
+ int resend_ivl;
+ char buf [7];
+
+ rc = sp_init ();
+ errno_assert (rc == 0);
+ surveyor = sp_socket (AF_SP, SP_SURVEYOR);
+ errno_assert (surveyor != -1);
+ rc = sp_bind (surveyor, "inproc://a");
+ errno_assert (rc >= 0);
+ respondent1 = sp_socket (AF_SP, SP_RESPONDENT);
+ errno_assert (respondent1 != -1);
+ rc = sp_connect (respondent1, "inproc://a");
+ errno_assert (rc >= 0);
+ respondent2 = sp_socket (AF_SP, SP_RESPONDENT);
+ errno_assert (respondent2 != -1);
+ rc = sp_connect (respondent2, "inproc://a");
+ errno_assert (rc >= 0);
+
+ rc = sp_send (surveyor, "ABC", 3, 0);
+ errno_assert (rc >= 0);
+ sp_assert (rc == 3);
+
+ rc = sp_recv (respondent1, buf, sizeof (buf), 0);
+ errno_assert (rc >= 0);
+ sp_assert (rc == 3);
+ rc = sp_send (respondent1, "DEF", 3, 0);
+ errno_assert (rc >= 0);
+ sp_assert (rc == 3);
+
+ rc = sp_recv (respondent2, buf, sizeof (buf), 0);
+ errno_assert (rc >= 0);
+ sp_assert (rc == 3);
+ rc = sp_send (respondent2, "DEF", 3, 0);
+ errno_assert (rc >= 0);
+ sp_assert (rc == 3);
+
+ rc = sp_recv (surveyor, buf, sizeof (buf), 0);
+ errno_assert (rc >= 0);
+ sp_assert (rc == 3);
+ rc = sp_recv (surveyor, buf, sizeof (buf), 0);
+ errno_assert (rc >= 0);
+ sp_assert (rc == 3);
+
+ rc = sp_close (surveyor);
+ errno_assert (rc == 0);
+ rc = sp_close (respondent1);
+ errno_assert (rc == 0);
+ rc = sp_close (respondent2);
+ errno_assert (rc == 0);
+ rc = sp_term ();
+ errno_assert (rc == 0);
+
+ return 0;
+}
+