| /* | 
 |     Copyright (c) 2013 Insollo Entertainment, LLC.  All rights reserved. | 
 |  | 
 |     Permission is hereby granted, free of charge, to any person obtaining a copy | 
 |     of this software and associated documentation files (the "Software"), | 
 |     to deal in the Software without restriction, including without limitation | 
 |     the rights to use, copy, modify, merge, publish, distribute, sublicense, | 
 |     and/or sell copies of the Software, and to permit persons to whom | 
 |     the Software is furnished to do so, subject to the following conditions: | 
 |  | 
 |     The above copyright notice and this permission notice shall be included | 
 |     in all copies or substantial portions of the Software. | 
 |  | 
 |     THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR | 
 |     IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, | 
 |     FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL | 
 |     THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER | 
 |     LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING | 
 |     FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS | 
 |     IN THE SOFTWARE. | 
 | */ | 
 |  | 
 | #include "../src/nn.h" | 
 | #include "../src/pubsub.h" | 
 | #include "../src/pipeline.h" | 
 | #include "../src/bus.h" | 
 | #include "../src/pair.h" | 
 | #include "../src/survey.h" | 
 | #include "../src/reqrep.h" | 
 |  | 
 | #include "options.h" | 
 | #include "../src/utils/sleep.c" | 
 | #include "../src/utils/clock.c" | 
 |  | 
 | #include <stdio.h> | 
 | #include <string.h> | 
 | #include <errno.h> | 
 | #include <stdlib.h> | 
 | #include <time.h> | 
 | #include <ctype.h> | 
 | #if !defined NN_HAVE_WINDOWS | 
 | #include <unistd.h> | 
 | #endif | 
 |  | 
 | enum echo_format { | 
 |     NN_NO_ECHO, | 
 |     NN_ECHO_RAW, | 
 |     NN_ECHO_ASCII, | 
 |     NN_ECHO_QUOTED, | 
 |     NN_ECHO_MSGPACK | 
 | }; | 
 |  | 
 | typedef struct nn_options { | 
 |     /* Global options */ | 
 |     int verbose; | 
 |  | 
 |     /* Socket options */ | 
 |     int socket_type; | 
 |     struct nn_string_list bind_addresses; | 
 |     struct nn_string_list connect_addresses; | 
 |     float send_timeout; | 
 |     float recv_timeout; | 
 |     struct nn_string_list subscriptions; | 
 |     char *socket_name; | 
 |  | 
 |     /* Output options */ | 
 |     float send_delay; | 
 |     float send_interval; | 
 |     struct nn_blob data_to_send; | 
 |  | 
 |     /* Input options */ | 
 |     enum echo_format echo_format; | 
 | } nn_options_t; | 
 |  | 
 | /*  Constants to get address of in option declaration  */ | 
 | static const int nn_push = NN_PUSH; | 
 | static const int nn_pull = NN_PULL; | 
 | static const int nn_pub = NN_PUB; | 
 | static const int nn_sub = NN_SUB; | 
 | static const int nn_req = NN_REQ; | 
 | static const int nn_rep = NN_REP; | 
 | static const int nn_bus = NN_BUS; | 
 | static const int nn_pair = NN_PAIR; | 
 | static const int nn_surveyor = NN_SURVEYOR; | 
 | static const int nn_respondent = NN_RESPONDENT; | 
 |  | 
 |  | 
 | struct nn_enum_item socket_types[] = { | 
 |     {"PUSH", NN_PUSH}, | 
 |     {"PULL", NN_PULL}, | 
 |     {"PUB", NN_PUB}, | 
 |     {"SUB", NN_SUB}, | 
 |     {"REQ", NN_REQ}, | 
 |     {"REP", NN_REP}, | 
 |     {"BUS", NN_BUS}, | 
 |     {"PAIR", NN_PAIR}, | 
 |     {"SURVEYOR", NN_SURVEYOR}, | 
 |     {"RESPONDENT", NN_RESPONDENT}, | 
 |     {NULL, 0}, | 
 | }; | 
 |  | 
 |  | 
 | /*  Constants to get address of in option declaration  */ | 
 | static const int nn_echo_raw = NN_ECHO_RAW; | 
 | static const int nn_echo_ascii = NN_ECHO_ASCII; | 
 | static const int nn_echo_quoted = NN_ECHO_QUOTED; | 
 | static const int nn_echo_msgpack = NN_ECHO_MSGPACK; | 
 |  | 
 | struct nn_enum_item echo_formats[] = { | 
 |     {"no", NN_NO_ECHO}, | 
 |     {"raw", NN_ECHO_RAW}, | 
 |     {"ascii", NN_ECHO_ASCII}, | 
 |     {"quoted", NN_ECHO_QUOTED}, | 
 |     {"msgpack", NN_ECHO_MSGPACK}, | 
 |     {NULL, 0}, | 
 | }; | 
 |  | 
 | /*  Constants for conflict masks  */ | 
 | #define NN_MASK_SOCK 1 | 
 | #define NN_MASK_WRITEABLE 2 | 
 | #define NN_MASK_READABLE 4 | 
 | #define NN_MASK_SOCK_SUB 8 | 
 | #define NN_MASK_DATA 16 | 
 | #define NN_MASK_ENDPOINT 32 | 
 | #define NN_NO_PROVIDES 0 | 
 | #define NN_NO_CONFLICTS 0 | 
 | #define NN_NO_REQUIRES 0 | 
 | #define NN_MASK_SOCK_WRITEABLE (NN_MASK_SOCK | NN_MASK_WRITEABLE) | 
 | #define NN_MASK_SOCK_READABLE (NN_MASK_SOCK | NN_MASK_READABLE) | 
 | #define NN_MASK_SOCK_READWRITE  (NN_MASK_SOCK_WRITEABLE|NN_MASK_SOCK_READABLE) | 
 |  | 
 | struct nn_option nn_options[] = { | 
 |     /* Generic options */ | 
 |     {"verbose", 'v', NULL, | 
 |      NN_OPT_INCREMENT, offsetof (nn_options_t, verbose), NULL, | 
 |      NN_NO_PROVIDES, NN_NO_CONFLICTS, NN_NO_REQUIRES, | 
 |      "Generic", NULL, "Increase verbosity of the nanocat"}, | 
 |     {"silent", 'q', NULL, | 
 |      NN_OPT_DECREMENT, offsetof (nn_options_t, verbose), NULL, | 
 |      NN_NO_PROVIDES, NN_NO_CONFLICTS, NN_NO_REQUIRES, | 
 |      "Generic", NULL, "Decrease verbosity of the nanocat"}, | 
 |     {"help", 'h', NULL, | 
 |      NN_OPT_HELP, 0, NULL, | 
 |      NN_NO_PROVIDES, NN_NO_CONFLICTS, NN_NO_REQUIRES, | 
 |      "Generic", NULL, "This help text"}, | 
 |  | 
 |     /* Socket types */ | 
 |     {"push", 0, "nn_push", | 
 |      NN_OPT_SET_ENUM, offsetof (nn_options_t, socket_type), &nn_push, | 
 |      NN_MASK_SOCK_WRITEABLE, NN_MASK_SOCK, NN_MASK_DATA, | 
 |      "Socket Types", NULL, "Use NN_PUSH socket type"}, | 
 |     {"pull", 0, "nn_pull", | 
 |      NN_OPT_SET_ENUM, offsetof (nn_options_t, socket_type), &nn_pull, | 
 |      NN_MASK_SOCK_READABLE, NN_MASK_SOCK, NN_NO_REQUIRES, | 
 |      "Socket Types", NULL, "Use NN_PULL socket type"}, | 
 |     {"pub", 0, "nn_pub", | 
 |      NN_OPT_SET_ENUM, offsetof (nn_options_t, socket_type), &nn_pub, | 
 |      NN_MASK_SOCK_WRITEABLE, NN_MASK_SOCK, NN_MASK_DATA, | 
 |      "Socket Types", NULL, "Use NN_PUB socket type"}, | 
 |     {"sub", 0, "nn_sub", | 
 |      NN_OPT_SET_ENUM, offsetof (nn_options_t, socket_type), &nn_sub, | 
 |      NN_MASK_SOCK_READABLE|NN_MASK_SOCK_SUB, NN_MASK_SOCK, NN_NO_REQUIRES, | 
 |      "Socket Types", NULL, "Use NN_SUB socket type"}, | 
 |     {"req", 0, "nn_req", | 
 |      NN_OPT_SET_ENUM, offsetof (nn_options_t, socket_type), &nn_req, | 
 |      NN_MASK_SOCK_READWRITE, NN_MASK_SOCK, NN_MASK_DATA, | 
 |      "Socket Types", NULL, "Use NN_REQ socket type"}, | 
 |     {"rep", 0, "nn_rep", | 
 |      NN_OPT_SET_ENUM, offsetof (nn_options_t, socket_type), &nn_rep, | 
 |      NN_MASK_SOCK_READWRITE, NN_MASK_SOCK, NN_NO_REQUIRES, | 
 |      "Socket Types", NULL, "Use NN_REP socket type"}, | 
 |     {"surveyor", 0, "nn_surveyor", | 
 |      NN_OPT_SET_ENUM, offsetof (nn_options_t, socket_type), &nn_surveyor, | 
 |      NN_MASK_SOCK_READWRITE, NN_MASK_SOCK, NN_MASK_DATA, | 
 |      "Socket Types", NULL, "Use NN_SURVEYOR socket type"}, | 
 |     {"respondent", 0, "nn_respondent", | 
 |      NN_OPT_SET_ENUM, offsetof (nn_options_t, socket_type), &nn_respondent, | 
 |      NN_MASK_SOCK_READWRITE, NN_MASK_SOCK, NN_NO_REQUIRES, | 
 |      "Socket Types", NULL, "Use NN_RESPONDENT socket type"}, | 
 |     {"bus", 0, "nn_bus", | 
 |      NN_OPT_SET_ENUM, offsetof (nn_options_t, socket_type), &nn_bus, | 
 |      NN_MASK_SOCK_READWRITE, NN_MASK_SOCK, NN_NO_REQUIRES, | 
 |      "Socket Types", NULL, "Use NN_BUS socket type"}, | 
 |     {"pair", 0, "nn_pair", | 
 |      NN_OPT_SET_ENUM, offsetof (nn_options_t, socket_type), &nn_pair, | 
 |      NN_MASK_SOCK_READWRITE, NN_MASK_SOCK, NN_NO_REQUIRES, | 
 |      "Socket Types", NULL, "Use NN_PAIR socket type"}, | 
 |  | 
 |     /* Socket Options */ | 
 |     {"bind", 0, NULL, | 
 |      NN_OPT_LIST_APPEND, offsetof (nn_options_t, bind_addresses), NULL, | 
 |      NN_MASK_ENDPOINT, NN_NO_CONFLICTS, NN_NO_REQUIRES, | 
 |      "Socket Options", "ADDR", "Bind socket to the address ADDR"}, | 
 |     {"connect", 0, NULL, | 
 |      NN_OPT_LIST_APPEND, offsetof (nn_options_t, connect_addresses), NULL, | 
 |      NN_MASK_ENDPOINT, NN_NO_CONFLICTS, NN_NO_REQUIRES, | 
 |      "Socket Options", "ADDR", "Connect socket to the address ADDR"}, | 
 |     {"bind-ipc", 'X' , NULL, NN_OPT_LIST_APPEND_FMT, | 
 |      offsetof (nn_options_t, bind_addresses), "ipc://%s", | 
 |      NN_MASK_ENDPOINT, NN_NO_CONFLICTS, NN_NO_REQUIRES, | 
 |      "Socket Options", "PATH", "Bind socket to the ipc address " | 
 |                                "\"ipc://PATH\"."}, | 
 |     {"connect-ipc", 'x' , NULL, NN_OPT_LIST_APPEND_FMT, | 
 |      offsetof (nn_options_t, connect_addresses), "ipc://%s", | 
 |      NN_MASK_ENDPOINT, NN_NO_CONFLICTS, NN_NO_REQUIRES, | 
 |      "Socket Options", "PATH", "Connect socket to the ipc address " | 
 |                                "\"ipc://PATH\"."}, | 
 |     {"bind-local", 'L' , NULL, NN_OPT_LIST_APPEND_FMT, | 
 |      offsetof (nn_options_t, bind_addresses), "tcp://127.0.0.1:%s", | 
 |      NN_MASK_ENDPOINT, NN_NO_CONFLICTS, NN_NO_REQUIRES, | 
 |      "Socket Options", "PORT", "Bind socket to the tcp address " | 
 |                                "\"tcp://127.0.0.1:PORT\"."}, | 
 |     {"connect-local", 'l' , NULL, NN_OPT_LIST_APPEND_FMT, | 
 |      offsetof (nn_options_t, connect_addresses), "tcp://127.0.0.1:%s", | 
 |      NN_MASK_ENDPOINT, NN_NO_CONFLICTS, NN_NO_REQUIRES, | 
 |      "Socket Options", "PORT", "Connect socket to the tcp address " | 
 |                                "\"tcp://127.0.0.1:PORT\"."}, | 
 |     {"recv-timeout", 0, NULL, | 
 |      NN_OPT_FLOAT, offsetof (nn_options_t, recv_timeout), NULL, | 
 |      NN_NO_PROVIDES, NN_NO_CONFLICTS, NN_MASK_READABLE, | 
 |      "Socket Options", "SEC", "Set timeout for receiving a message"}, | 
 |     {"send-timeout", 0, NULL, | 
 |      NN_OPT_FLOAT, offsetof (nn_options_t, send_timeout), NULL, | 
 |      NN_NO_PROVIDES, NN_NO_CONFLICTS, NN_MASK_WRITEABLE, | 
 |      "Socket Options", "SEC", "Set timeout for sending a message"}, | 
 |     {"socket-name", 0, NULL, | 
 |      NN_OPT_STRING, offsetof (nn_options_t, socket_name), NULL, | 
 |      NN_NO_PROVIDES, NN_NO_CONFLICTS, NN_NO_REQUIRES, | 
 |      "Socket Options", "NAME", "Name of the socket for statistics"}, | 
 |  | 
 |     /* Pattern-specific options */ | 
 |     {"subscribe", 0, NULL, | 
 |      NN_OPT_LIST_APPEND, offsetof (nn_options_t, subscriptions), NULL, | 
 |      NN_NO_PROVIDES, NN_NO_CONFLICTS, NN_MASK_SOCK_SUB, | 
 |      "SUB Socket Options", "PREFIX", "Subscribe to the prefix PREFIX. " | 
 |         "Note: socket will be subscribed to everything (empty prefix) if " | 
 |         "no prefixes are specified on the command-line."}, | 
 |  | 
 |     /* Input Options */ | 
 |     {"format", 0, NULL, | 
 |      NN_OPT_ENUM, offsetof (nn_options_t, echo_format), &echo_formats, | 
 |      NN_NO_PROVIDES, NN_NO_CONFLICTS, NN_MASK_READABLE, | 
 |      "Input Options", "FORMAT", "Use echo format FORMAT " | 
 |                                "(same as the options below)"}, | 
 |     {"raw", 0, NULL, | 
 |      NN_OPT_SET_ENUM, offsetof (nn_options_t, echo_format), &nn_echo_raw, | 
 |      NN_NO_PROVIDES, NN_NO_CONFLICTS, NN_MASK_READABLE, | 
 |      "Input Options", NULL, "Dump message as is " | 
 |                            "(Note: no delimiters are printed)"}, | 
 |     {"ascii", 'A', NULL, | 
 |      NN_OPT_SET_ENUM, offsetof (nn_options_t, echo_format), &nn_echo_ascii, | 
 |      NN_NO_PROVIDES, NN_NO_CONFLICTS, NN_MASK_READABLE, | 
 |      "Input Options", NULL, "Print ASCII part of message delimited by newline. " | 
 |                            "All non-ascii characters replaced by dot."}, | 
 |     {"quoted", 'Q', NULL, | 
 |      NN_OPT_SET_ENUM, offsetof (nn_options_t, echo_format), &nn_echo_quoted, | 
 |      NN_NO_PROVIDES, NN_NO_CONFLICTS, NN_MASK_READABLE, | 
 |      "Input Options", NULL, "Print each message on separate line in double " | 
 |                            "quotes with C-like character escaping"}, | 
 |     {"msgpack", 0, NULL, | 
 |      NN_OPT_SET_ENUM, offsetof (nn_options_t, echo_format), &nn_echo_msgpack, | 
 |      NN_NO_PROVIDES, NN_NO_CONFLICTS, NN_MASK_READABLE, | 
 |      "Input Options", NULL, "Print each message as msgpacked string (raw type)." | 
 |                            " This is useful for programmatic parsing."}, | 
 |  | 
 |     /* Output Options */ | 
 |     {"interval", 'i', NULL, | 
 |      NN_OPT_FLOAT, offsetof (nn_options_t, send_interval), NULL, | 
 |      NN_NO_PROVIDES, NN_NO_CONFLICTS, NN_MASK_WRITEABLE, | 
 |      "Output Options", "SEC", "Send message (or request) every SEC seconds"}, | 
 |     {"delay", 'd', NULL, | 
 |      NN_OPT_FLOAT, offsetof (nn_options_t, send_delay), NULL, | 
 |      NN_NO_PROVIDES, NN_NO_CONFLICTS, NN_NO_REQUIRES, | 
 |      "Output Options", "SEC", "Wait for SEC seconds before sending message" | 
 |                               " (useful for one-shot PUB sockets)"}, | 
 |     {"data", 'D', NULL, | 
 |      NN_OPT_BLOB, offsetof (nn_options_t, data_to_send), &echo_formats, | 
 |      NN_MASK_DATA, NN_MASK_DATA, NN_MASK_WRITEABLE, | 
 |      "Output Options", "DATA", "Send DATA to the socket and quit for " | 
 |      "PUB, PUSH, PAIR, BUS socket. Use DATA to reply for REP or " | 
 |      " RESPONDENT socket. Send DATA as request for REQ or SURVEYOR socket."}, | 
 |     {"file", 'F', NULL, | 
 |      NN_OPT_READ_FILE, offsetof (nn_options_t, data_to_send), &echo_formats, | 
 |      NN_MASK_DATA, NN_MASK_DATA, NN_MASK_WRITEABLE, | 
 |      "Output Options", "PATH", "Same as --data but get data from file PATH"}, | 
 |  | 
 |     /* Sentinel */ | 
 |     {NULL, 0, NULL, | 
 |      0, 0, NULL, | 
 |      0, 0, 0, | 
 |      NULL, NULL, NULL}, | 
 |     }; | 
 |  | 
 |  | 
 | struct nn_commandline nn_cli = { | 
 |     "A command-line interface to nanomsg", | 
 |     "", | 
 |     nn_options, | 
 |     NN_MASK_SOCK | NN_MASK_ENDPOINT, | 
 | }; | 
 |  | 
 |  | 
 | void nn_assert_errno (int flag, char *description) | 
 | { | 
 |     int err; | 
 |  | 
 |     if (!flag) { | 
 |         err = errno; | 
 |         fprintf (stderr, "%s: %s\n", description, nn_strerror (err)); | 
 |         exit (3); | 
 |     } | 
 | } | 
 |  | 
 | void nn_sub_init (nn_options_t *options, int sock) | 
 | { | 
 |     int i; | 
 |     int rc; | 
 |  | 
 |     if (options->subscriptions.num) { | 
 |         for (i = 0; i < options->subscriptions.num; ++i) { | 
 |             rc = nn_setsockopt (sock, NN_SUB, NN_SUB_SUBSCRIBE, | 
 |                 options->subscriptions.items[i], | 
 |                 strlen (options->subscriptions.items[i])); | 
 |             nn_assert_errno (rc == 0, "Can't subscribe"); | 
 |         } | 
 |     } else { | 
 |         rc = nn_setsockopt (sock, NN_SUB, NN_SUB_SUBSCRIBE, "", 0); | 
 |         nn_assert_errno (rc == 0, "Can't subscribe"); | 
 |     } | 
 | } | 
 |  | 
 | void nn_set_recv_timeout (int sock, int millis) | 
 | { | 
 |     int rc; | 
 |     rc = nn_setsockopt (sock, NN_SOL_SOCKET, NN_RCVTIMEO, | 
 |                        &millis, sizeof (millis)); | 
 |     nn_assert_errno (rc == 0, "Can't set recv timeout"); | 
 | } | 
 |  | 
 | int nn_create_socket (nn_options_t *options) | 
 | { | 
 |     int sock; | 
 |     int rc; | 
 |     int millis; | 
 |  | 
 |     sock = nn_socket (AF_SP, options->socket_type); | 
 |     nn_assert_errno (sock >= 0, "Can't create socket"); | 
 |  | 
 |     /* Generic initialization */ | 
 |     if (options->send_timeout >= 0) { | 
 |         millis = (int)(options->send_timeout * 1000); | 
 |         rc = nn_setsockopt (sock, NN_SOL_SOCKET, NN_SNDTIMEO, | 
 |                            &millis, sizeof (millis)); | 
 |         nn_assert_errno (rc == 0, "Can't set send timeout"); | 
 |     } | 
 |     if (options->recv_timeout >= 0) { | 
 |         nn_set_recv_timeout (sock, (int) options->recv_timeout); | 
 |     } | 
 |     if (options->socket_name) { | 
 |         rc = nn_setsockopt (sock, NN_SOL_SOCKET, NN_SOCKET_NAME, | 
 |                            options->socket_name, strlen(options->socket_name)); | 
 |         nn_assert_errno (rc == 0, "Can't set socket name"); | 
 |     } | 
 |  | 
 |     /* Specific intitalization */ | 
 |     switch (options->socket_type) { | 
 |     case NN_SUB: | 
 |         nn_sub_init (options, sock); | 
 |         break; | 
 |     } | 
 |  | 
 |     return sock; | 
 | } | 
 |  | 
 | void nn_print_message (nn_options_t *options, char *buf, int buflen) | 
 | { | 
 |     switch (options->echo_format) { | 
 |     case NN_NO_ECHO: | 
 |         return; | 
 |     case NN_ECHO_RAW: | 
 |         fwrite (buf, 1, buflen, stdout); | 
 |         break; | 
 |     case NN_ECHO_ASCII: | 
 |         for (; buflen > 0; --buflen, ++buf) { | 
 |             if (isprint (*buf)) { | 
 |                 fputc (*buf, stdout); | 
 |             } else { | 
 |                 fputc ('.', stdout); | 
 |             } | 
 |         } | 
 |         fputc ('\n', stdout); | 
 |         break; | 
 |     case NN_ECHO_QUOTED: | 
 |         fputc ('"', stdout); | 
 |         for (; buflen > 0; --buflen, ++buf) { | 
 |             switch (*buf) { | 
 |             case '\n': | 
 |                 fprintf (stdout, "\\n"); | 
 |                 break; | 
 |             case '\r': | 
 |                 fprintf (stdout, "\\r"); | 
 |                 break; | 
 |             case '\\': | 
 |             case '\"': | 
 |                 fprintf (stdout, "\\%c", *buf); | 
 |                 break; | 
 |             default: | 
 |                 if (isprint (*buf)) { | 
 |                     fputc (*buf, stdout); | 
 |                 } else { | 
 |                     fprintf (stdout, "\\x%02x", (unsigned char)*buf); | 
 |                 } | 
 |             } | 
 |         } | 
 |         fprintf (stdout, "\"\n"); | 
 |         break; | 
 |     case NN_ECHO_MSGPACK: | 
 |         if (buflen < 256) { | 
 |             fputc ('\xc4', stdout); | 
 |             fputc (buflen, stdout); | 
 |             fwrite (buf, 1, buflen, stdout); | 
 |         } else if (buflen < 65536) { | 
 |             fputc ('\xc5', stdout); | 
 |             fputc (buflen >> 8, stdout); | 
 |             fputc (buflen & 0xff, stdout); | 
 |             fwrite (buf, 1, buflen, stdout); | 
 |         } else { | 
 |             fputc ('\xc6', stdout); | 
 |             fputc (buflen >> 24, stdout); | 
 |             fputc ((buflen >> 16) & 0xff, stdout); | 
 |             fputc ((buflen >> 8) & 0xff, stdout); | 
 |             fputc (buflen & 0xff, stdout); | 
 |             fwrite (buf, 1, buflen, stdout); | 
 |         } | 
 |         break; | 
 |     } | 
 |     fflush (stdout); | 
 | } | 
 |  | 
 | void nn_connect_socket (nn_options_t *options, int sock) | 
 | { | 
 |     int i; | 
 |     int rc; | 
 |  | 
 |     for (i = 0; i < options->bind_addresses.num; ++i) { | 
 |         rc = nn_bind (sock, options->bind_addresses.items[i]); | 
 |         nn_assert_errno (rc >= 0, "Can't bind"); | 
 |     } | 
 |     for (i = 0; i < options->connect_addresses.num; ++i) { | 
 |         rc = nn_connect (sock, options->connect_addresses.items[i]); | 
 |         nn_assert_errno (rc >= 0, "Can't connect"); | 
 |     } | 
 | } | 
 |  | 
 | void nn_send_loop (nn_options_t *options, int sock) | 
 | { | 
 |     int rc; | 
 |     uint64_t start_time; | 
 |     int64_t time_to_sleep, interval; | 
 |     struct nn_clock clock; | 
 |  | 
 |     interval = (int)(options->send_interval*1000); | 
 |     nn_clock_init (&clock); | 
 |  | 
 |     for (;;) { | 
 |         start_time = nn_clock_now (&clock); | 
 |         rc = nn_send (sock, | 
 |             options->data_to_send.data, options->data_to_send.length, | 
 |             0); | 
 |         if (rc < 0 && errno == EAGAIN) { | 
 |             fprintf (stderr, "Message not sent (EAGAIN)\n"); | 
 |         } else { | 
 |             nn_assert_errno (rc >= 0, "Can't send"); | 
 |         } | 
 |         if (interval >= 0) { | 
 |             time_to_sleep = (start_time + interval) - nn_clock_now (&clock); | 
 |             if (time_to_sleep > 0) { | 
 |                 nn_sleep ((int) time_to_sleep); | 
 |             } | 
 |         } else { | 
 |             break; | 
 |         } | 
 |     } | 
 |  | 
 |     nn_clock_term(&clock); | 
 | } | 
 |  | 
 | void nn_recv_loop (nn_options_t *options, int sock) | 
 | { | 
 |     int rc; | 
 |     void *buf; | 
 |  | 
 |     for (;;) { | 
 |         rc = nn_recv (sock, &buf, NN_MSG, 0); | 
 |         if (rc < 0 && errno == EAGAIN) { | 
 |             continue; | 
 |         } else if (errno == ETIMEDOUT || errno == EFSM) { | 
 |             return;  /*  No more messages possible  */ | 
 |         } else { | 
 |             nn_assert_errno (rc >= 0, "Can't recv"); | 
 |         } | 
 |         nn_print_message (options, buf, rc); | 
 |         nn_freemsg (buf); | 
 |     } | 
 | } | 
 |  | 
 | void nn_rw_loop (nn_options_t *options, int sock) | 
 | { | 
 |     int rc; | 
 |     void *buf; | 
 |     uint64_t start_time; | 
 |     int64_t time_to_sleep, interval, recv_timeout; | 
 |     struct nn_clock clock; | 
 |  | 
 |     interval = (int)(options->send_interval*1000); | 
 |     recv_timeout = (int)(options->recv_timeout*1000); | 
 |     nn_clock_init (&clock); | 
 |  | 
 |     for (;;) { | 
 |         start_time = nn_clock_now (&clock); | 
 |         rc = nn_send (sock, | 
 |             options->data_to_send.data, options->data_to_send.length, | 
 |             0); | 
 |         if (rc < 0 && errno == EAGAIN) { | 
 |             fprintf (stderr, "Message not sent (EAGAIN)\n"); | 
 |         } else { | 
 |             nn_assert_errno (rc >= 0, "Can't send"); | 
 |         } | 
 |         if (options->send_interval < 0) {  /*  Never send any more  */ | 
 |             nn_recv_loop (options, sock); | 
 |             return; | 
 |         } | 
 |  | 
 |         for (;;) { | 
 |             time_to_sleep = (start_time + interval) - nn_clock_now (&clock); | 
 |             if (time_to_sleep <= 0) { | 
 |                 break; | 
 |             } | 
 |             if (recv_timeout >= 0 && time_to_sleep > recv_timeout) | 
 |             { | 
 |                 time_to_sleep = recv_timeout; | 
 |             } | 
 |             nn_set_recv_timeout (sock, (int) time_to_sleep); | 
 |             rc = nn_recv (sock, &buf, NN_MSG, 0); | 
 |             if (rc < 0) { | 
 |                 if (errno == EAGAIN) { | 
 |                     continue; | 
 |                 } else if (errno == ETIMEDOUT || errno == EFSM) { | 
 |                     time_to_sleep = (start_time + interval) | 
 |                         - nn_clock_now (&clock); | 
 |                     if (time_to_sleep > 0) | 
 |                         nn_sleep ((int) time_to_sleep); | 
 |                     continue; | 
 |                 } | 
 |             } | 
 |             nn_assert_errno (rc >= 0, "Can't recv"); | 
 |             nn_print_message (options, buf, rc); | 
 |             nn_freemsg (buf); | 
 |         } | 
 |     } | 
 |  | 
 |     nn_clock_term(&clock); | 
 | } | 
 |  | 
 | void nn_resp_loop (nn_options_t *options, int sock) | 
 | { | 
 |     int rc; | 
 |     void *buf; | 
 |  | 
 |     for (;;) { | 
 |         rc = nn_recv (sock, &buf, NN_MSG, 0); | 
 |         if (rc < 0 && errno == EAGAIN) { | 
 |                 continue; | 
 |         } else { | 
 |             nn_assert_errno (rc >= 0, "Can't recv"); | 
 |         } | 
 |         nn_print_message (options, buf, rc); | 
 |         nn_freemsg (buf); | 
 |         rc = nn_send (sock, | 
 |             options->data_to_send.data, options->data_to_send.length, | 
 |             0); | 
 |         if (rc < 0 && errno == EAGAIN) { | 
 |             fprintf (stderr, "Message not sent (EAGAIN)\n"); | 
 |         } else { | 
 |             nn_assert_errno (rc >= 0, "Can't send"); | 
 |         } | 
 |     } | 
 | } | 
 |  | 
 | int main (int argc, char **argv) | 
 | { | 
 |     int sock; | 
 |     nn_options_t options = { | 
 |         /* verbose           */ 0, | 
 |         /* socket_type       */ 0, | 
 |         /* bind_addresses    */ {NULL, NULL, 0, 0}, | 
 |         /* connect_addresses */ {NULL, NULL, 0, 0}, | 
 |         /* send_timeout      */ -1.f, | 
 |         /* recv_timeout      */ -1.f, | 
 |         /* subscriptions     */ {NULL, NULL, 0, 0}, | 
 |         /* socket_name       */ NULL, | 
 |         /* send_delay        */ 0.f, | 
 |         /* send_interval     */ -1.f, | 
 |         /* data_to_send      */ {NULL, 0, 0}, | 
 |         /* echo_format       */ NN_NO_ECHO | 
 |     }; | 
 |  | 
 |     nn_parse_options (&nn_cli, &options, argc, argv); | 
 |     sock = nn_create_socket (&options); | 
 |     nn_connect_socket (&options, sock); | 
 |     nn_sleep((int)(options.send_delay*1000)); | 
 |     switch (options.socket_type) { | 
 |     case NN_PUB: | 
 |     case NN_PUSH: | 
 |         nn_send_loop (&options, sock); | 
 |         break; | 
 |     case NN_SUB: | 
 |     case NN_PULL: | 
 |         nn_recv_loop (&options, sock); | 
 |         break; | 
 |     case NN_BUS: | 
 |     case NN_PAIR: | 
 |         if (options.data_to_send.data) { | 
 |             nn_rw_loop (&options, sock); | 
 |         } else { | 
 |             nn_recv_loop (&options, sock); | 
 |         } | 
 |         break; | 
 |     case NN_SURVEYOR: | 
 |     case NN_REQ: | 
 |         nn_rw_loop (&options, sock); | 
 |         break; | 
 |     case NN_REP: | 
 |     case NN_RESPONDENT: | 
 |         if (options.data_to_send.data) { | 
 |             nn_resp_loop (&options, sock); | 
 |         } else { | 
 |             nn_recv_loop (&options, sock); | 
 |         } | 
 |         break; | 
 |     } | 
 |  | 
 |     nn_close (sock); | 
 |     nn_free_options(&nn_cli, &options); | 
 |     return 0; | 
 | } |