More work on TCP re-connect

Signed-off-by: Martin Sustrik <sustrik@250bpm.com>
diff --git a/src/transports/tcp/tcpb.c b/src/transports/tcp/tcpb.c
index 0d1dfd7..635e415 100644
--- a/src/transports/tcp/tcpb.c
+++ b/src/transports/tcp/tcpb.c
@@ -130,7 +130,8 @@
 static void sp_tcpb_accepted (const struct sp_sink **self,
     struct sp_usock *usock, int s)
 {
-    printf ("accepted %d\n", s);
+    printf ("accepted\n");
+    sp_assert (0);
 }
 
 
diff --git a/src/transports/tcp/tcpc.c b/src/transports/tcp/tcpc.c
index b657b1e..c698e91 100644
--- a/src/transports/tcp/tcpc.c
+++ b/src/transports/tcp/tcpc.c
@@ -33,6 +33,17 @@
 static const struct sp_epbase_vfptr sp_tcpc_epbase_vfptr =
     {sp_tcpc_close};
 
+/*  WAITING state. */
+static void sp_tcpc_timeout (const struct sp_sink **self,
+    struct sp_timer *timer);
+static const struct sp_sink sp_tcpc_state_waiting = {
+    NULL,
+    NULL,
+    NULL,
+    NULL,
+    NULL,
+    sp_tcpc_timeout
+};
 
 /*  CONNECTING state. */
 static void sp_tcpc_connected (const struct sp_sink **self,
@@ -48,69 +59,30 @@
     NULL
 };
 
-/*  WAITING state. */
-static void sp_tcpc_timeout (const struct sp_sink **self,
-    struct sp_timer *timer);
-static const struct sp_sink sp_tcpc_state_waiting = {
-    NULL,
-    NULL,
-    NULL,
-    NULL,
-    NULL,
-    sp_tcpc_timeout
-};
-
 int sp_tcpc_init (struct sp_tcpc *self, const char *addr, void *hint)
 {
     int rc;
-    int port;
-    const char *colon;
-    struct sockaddr_storage ss;
-    socklen_t sslen;
 
-    /*  Make sure we're working from a clean slate. Required on Mac OS X. */
-    memset (&ss, 0, sizeof (ss));
-
-    /*  Parse the port. */
-    rc = sp_addr_parse_port (addr, &colon);
-    if (rc < 0)
-        return rc;
-    port = rc;
-
-    /*  TODO: Parse the local address, if any. */
-
-    /*  Parse the address. */
-    /*  TODO:  Get the actual value of the IPV4ONLY socket option. */
-    rc = sp_addr_parse_remote (addr, colon - addr, SP_ADDR_IPV4ONLY,
-        &ss, &sslen);
-    if (rc < 0)
-        return rc;
-
-    /*  Combine the port and the address. */
-    if (ss.ss_family == AF_INET)
-        ((struct sockaddr_in*) &ss)->sin_port = htons (port);
-    else if (ss.ss_family == AF_INET6)
-        ((struct sockaddr_in6*) &ss)->sin6_port = htons (port);
-    else
-        sp_assert (0);
+    /*  TODO: Check the syntax of the address and return error if it is
+        not a valid address string. Don't do any blocking DNS operations
+        though! */
 
     /*  Initialise the base class. */
     sp_epbase_init (&self->epbase, &sp_tcpc_epbase_vfptr, addr, hint);
 
+    /*  Open a socket. */
+    rc = sp_usock_init (&self->usock, &self->sink,
+        AF_INET, SOCK_STREAM, IPPROTO_TCP, sp_epbase_getcp (&self->epbase));
+    errnum_assert (rc == 0, -rc);
+
     /*  Initialise the retry timer. */
     sp_timer_init (&self->retry_timer, &self->sink,
         sp_epbase_getcp (&self->epbase));
 
-    /*  Open the socket and start connecting. */
-    self->sink = &sp_tcpc_state_connecting;
-    rc = sp_usock_init (&self->usock, &self->sink,
-        AF_INET, SOCK_STREAM, IPPROTO_TCP, sp_epbase_getcp (&self->epbase));
-    errnum_assert (rc == 0, -rc);
-    rc = sp_usock_connect (&self->usock, (struct sockaddr*) &ss, sslen);
-    if (rc == 0)
-        sp_tcpc_connected (&self->sink, &self->usock);
-    else
-        errnum_assert (rc == -EINPROGRESS, -rc);
+    /*  Pretend we were waiting for the re-connect timer and that the timer
+        have expired. */
+    self->sink = &sp_tcpc_state_waiting;
+    sp_tcpc_timeout (&self->sink, &self->retry_timer);
 
     return 0;
 }
@@ -137,7 +109,6 @@
     int rc;
     struct sp_tcpc *tcpc;
 
-printf ("connect failed!\n");
     tcpc = sp_cont (self, struct sp_tcpc, sink);
 
     /*  Connect failed. Close the old socket and create a new one. */
@@ -155,11 +126,57 @@
 static void sp_tcpc_timeout (const struct sp_sink **self,
     struct sp_timer *timer)
 {
+    int rc;
     struct sp_tcpc *tcpc;
+    const char *addr;
+    int port;
+    const char *colon;
+    struct sockaddr_storage ss;
+    socklen_t sslen;
 
     tcpc = sp_cont (self, struct sp_tcpc, sink);
 
-    /*  Retry timer expired. */
-    sp_assert (0);
+    /*  Retry timer expired. Now we'll try to resolve the address. */
+    addr = sp_epbase_getaddr (&tcpc->epbase);
+
+    /*  Make sure we're working from a clean slate. Required on Mac OS X. */
+    memset (&ss, 0, sizeof (ss));
+
+    /*  Parse the port. */
+    port = sp_addr_parse_port (addr, &colon);
+
+    /*  TODO: Parse the local address, if any. */
+
+    /*  Parse the address. */
+    /*  TODO:  Get the actual value of the IPV4ONLY socket option. */
+    rc = sp_addr_parse_remote (addr, colon - addr, SP_ADDR_IPV4ONLY,
+        &ss, &sslen);
+
+    /*  If the address resolution have failed, wait and re-try. */
+    if (rc < 0) {
+        tcpc->sink = &sp_tcpc_state_waiting;
+        /*  TODO: Get the retry interval from the socket option. */
+        sp_timer_start (&tcpc->retry_timer, 100);
+        return;
+    }
+
+    /*  Combine the port and the address. */
+    if (ss.ss_family == AF_INET)
+        ((struct sockaddr_in*) &ss)->sin_port = htons (port);
+    else if (ss.ss_family == AF_INET6)
+        ((struct sockaddr_in6*) &ss)->sin6_port = htons (port);
+    else
+        sp_assert (0);
+
+    /*  TODO: New RESOLVING state should be added here to deal with
+        asynchronous DNS queries. */
+
+    /*  Open the socket and start connecting. */
+    tcpc->sink = &sp_tcpc_state_connecting;
+    rc = sp_usock_connect (&tcpc->usock, (struct sockaddr*) &ss, sslen);
+    if (rc == 0)
+        sp_tcpc_connected (&tcpc->sink, &tcpc->usock);
+    else
+        errnum_assert (rc == -EINPROGRESS, -rc);
 }
 
diff --git a/src/utils/aio.c b/src/utils/aio.c
index b096ff4..35197d3 100644
--- a/src/utils/aio.c
+++ b/src/utils/aio.c
@@ -359,16 +359,17 @@
     if (errno != EINPROGRESS)
         return -errno;
 
-    /*  If we are in the worker thread we can simply start polling for out. */
+    /*  If we are in the worker thread we can simply start polling for out.
+        Otherwise, ask worker thread to start polling for out. */
     if (sp_thread_current (&self->cp->worker)) {
         sp_poller_add (&self->cp->poller, self->s, &self->hndl);
         sp_poller_set_out (&self->cp->poller, &self->hndl);
     }
-
-    /*  Otherwise, ask worker thread to start polling for out. */
-    sp_queue_push (&self->cp->opqueue, &self->add_hndl.item);
-    sp_queue_push (&self->cp->opqueue, &self->out.hndl.item);
-    sp_efd_signal (&self->cp->efd);
+    else {
+        sp_queue_push (&self->cp->opqueue, &self->add_hndl.item);
+        sp_queue_push (&self->cp->opqueue, &self->out.hndl.item);
+        sp_efd_signal (&self->cp->efd);
+    }
 
     return -EINPROGRESS;
 }
@@ -563,6 +564,7 @@
                 waiting. */
             if (phndl == &self->efd_hndl) {
                 sp_assert (event == SP_POLLER_IN);
+                sp_efd_unsignal (&self->efd);
                 continue;
             }