More work on TCP re-connect
Signed-off-by: Martin Sustrik <sustrik@250bpm.com>
diff --git a/src/transports/tcp/tcpb.c b/src/transports/tcp/tcpb.c
index 0d1dfd7..635e415 100644
--- a/src/transports/tcp/tcpb.c
+++ b/src/transports/tcp/tcpb.c
@@ -130,7 +130,8 @@
static void sp_tcpb_accepted (const struct sp_sink **self,
struct sp_usock *usock, int s)
{
- printf ("accepted %d\n", s);
+ printf ("accepted\n");
+ sp_assert (0);
}
diff --git a/src/transports/tcp/tcpc.c b/src/transports/tcp/tcpc.c
index b657b1e..c698e91 100644
--- a/src/transports/tcp/tcpc.c
+++ b/src/transports/tcp/tcpc.c
@@ -33,6 +33,17 @@
static const struct sp_epbase_vfptr sp_tcpc_epbase_vfptr =
{sp_tcpc_close};
+/* WAITING state. */
+static void sp_tcpc_timeout (const struct sp_sink **self,
+ struct sp_timer *timer);
+static const struct sp_sink sp_tcpc_state_waiting = {
+ NULL,
+ NULL,
+ NULL,
+ NULL,
+ NULL,
+ sp_tcpc_timeout
+};
/* CONNECTING state. */
static void sp_tcpc_connected (const struct sp_sink **self,
@@ -48,69 +59,30 @@
NULL
};
-/* WAITING state. */
-static void sp_tcpc_timeout (const struct sp_sink **self,
- struct sp_timer *timer);
-static const struct sp_sink sp_tcpc_state_waiting = {
- NULL,
- NULL,
- NULL,
- NULL,
- NULL,
- sp_tcpc_timeout
-};
-
int sp_tcpc_init (struct sp_tcpc *self, const char *addr, void *hint)
{
int rc;
- int port;
- const char *colon;
- struct sockaddr_storage ss;
- socklen_t sslen;
- /* Make sure we're working from a clean slate. Required on Mac OS X. */
- memset (&ss, 0, sizeof (ss));
-
- /* Parse the port. */
- rc = sp_addr_parse_port (addr, &colon);
- if (rc < 0)
- return rc;
- port = rc;
-
- /* TODO: Parse the local address, if any. */
-
- /* Parse the address. */
- /* TODO: Get the actual value of the IPV4ONLY socket option. */
- rc = sp_addr_parse_remote (addr, colon - addr, SP_ADDR_IPV4ONLY,
- &ss, &sslen);
- if (rc < 0)
- return rc;
-
- /* Combine the port and the address. */
- if (ss.ss_family == AF_INET)
- ((struct sockaddr_in*) &ss)->sin_port = htons (port);
- else if (ss.ss_family == AF_INET6)
- ((struct sockaddr_in6*) &ss)->sin6_port = htons (port);
- else
- sp_assert (0);
+ /* TODO: Check the syntax of the address and return error if it is
+ not a valid address string. Don't do any blocking DNS operations
+ though! */
/* Initialise the base class. */
sp_epbase_init (&self->epbase, &sp_tcpc_epbase_vfptr, addr, hint);
+ /* Open a socket. */
+ rc = sp_usock_init (&self->usock, &self->sink,
+ AF_INET, SOCK_STREAM, IPPROTO_TCP, sp_epbase_getcp (&self->epbase));
+ errnum_assert (rc == 0, -rc);
+
/* Initialise the retry timer. */
sp_timer_init (&self->retry_timer, &self->sink,
sp_epbase_getcp (&self->epbase));
- /* Open the socket and start connecting. */
- self->sink = &sp_tcpc_state_connecting;
- rc = sp_usock_init (&self->usock, &self->sink,
- AF_INET, SOCK_STREAM, IPPROTO_TCP, sp_epbase_getcp (&self->epbase));
- errnum_assert (rc == 0, -rc);
- rc = sp_usock_connect (&self->usock, (struct sockaddr*) &ss, sslen);
- if (rc == 0)
- sp_tcpc_connected (&self->sink, &self->usock);
- else
- errnum_assert (rc == -EINPROGRESS, -rc);
+ /* Pretend we were waiting for the re-connect timer and that the timer
+ have expired. */
+ self->sink = &sp_tcpc_state_waiting;
+ sp_tcpc_timeout (&self->sink, &self->retry_timer);
return 0;
}
@@ -137,7 +109,6 @@
int rc;
struct sp_tcpc *tcpc;
-printf ("connect failed!\n");
tcpc = sp_cont (self, struct sp_tcpc, sink);
/* Connect failed. Close the old socket and create a new one. */
@@ -155,11 +126,57 @@
static void sp_tcpc_timeout (const struct sp_sink **self,
struct sp_timer *timer)
{
+ int rc;
struct sp_tcpc *tcpc;
+ const char *addr;
+ int port;
+ const char *colon;
+ struct sockaddr_storage ss;
+ socklen_t sslen;
tcpc = sp_cont (self, struct sp_tcpc, sink);
- /* Retry timer expired. */
- sp_assert (0);
+ /* Retry timer expired. Now we'll try to resolve the address. */
+ addr = sp_epbase_getaddr (&tcpc->epbase);
+
+ /* Make sure we're working from a clean slate. Required on Mac OS X. */
+ memset (&ss, 0, sizeof (ss));
+
+ /* Parse the port. */
+ port = sp_addr_parse_port (addr, &colon);
+
+ /* TODO: Parse the local address, if any. */
+
+ /* Parse the address. */
+ /* TODO: Get the actual value of the IPV4ONLY socket option. */
+ rc = sp_addr_parse_remote (addr, colon - addr, SP_ADDR_IPV4ONLY,
+ &ss, &sslen);
+
+ /* If the address resolution have failed, wait and re-try. */
+ if (rc < 0) {
+ tcpc->sink = &sp_tcpc_state_waiting;
+ /* TODO: Get the retry interval from the socket option. */
+ sp_timer_start (&tcpc->retry_timer, 100);
+ return;
+ }
+
+ /* Combine the port and the address. */
+ if (ss.ss_family == AF_INET)
+ ((struct sockaddr_in*) &ss)->sin_port = htons (port);
+ else if (ss.ss_family == AF_INET6)
+ ((struct sockaddr_in6*) &ss)->sin6_port = htons (port);
+ else
+ sp_assert (0);
+
+ /* TODO: New RESOLVING state should be added here to deal with
+ asynchronous DNS queries. */
+
+ /* Open the socket and start connecting. */
+ tcpc->sink = &sp_tcpc_state_connecting;
+ rc = sp_usock_connect (&tcpc->usock, (struct sockaddr*) &ss, sslen);
+ if (rc == 0)
+ sp_tcpc_connected (&tcpc->sink, &tcpc->usock);
+ else
+ errnum_assert (rc == -EINPROGRESS, -rc);
}
diff --git a/src/utils/aio.c b/src/utils/aio.c
index b096ff4..35197d3 100644
--- a/src/utils/aio.c
+++ b/src/utils/aio.c
@@ -359,16 +359,17 @@
if (errno != EINPROGRESS)
return -errno;
- /* If we are in the worker thread we can simply start polling for out. */
+ /* If we are in the worker thread we can simply start polling for out.
+ Otherwise, ask worker thread to start polling for out. */
if (sp_thread_current (&self->cp->worker)) {
sp_poller_add (&self->cp->poller, self->s, &self->hndl);
sp_poller_set_out (&self->cp->poller, &self->hndl);
}
-
- /* Otherwise, ask worker thread to start polling for out. */
- sp_queue_push (&self->cp->opqueue, &self->add_hndl.item);
- sp_queue_push (&self->cp->opqueue, &self->out.hndl.item);
- sp_efd_signal (&self->cp->efd);
+ else {
+ sp_queue_push (&self->cp->opqueue, &self->add_hndl.item);
+ sp_queue_push (&self->cp->opqueue, &self->out.hndl.item);
+ sp_efd_signal (&self->cp->efd);
+ }
return -EINPROGRESS;
}
@@ -563,6 +564,7 @@
waiting. */
if (phndl == &self->efd_hndl) {
sp_assert (event == SP_POLLER_IN);
+ sp_efd_unsignal (&self->efd);
continue;
}