[task driver] Speculative fix for log parser flakes

Bug: skia:9827
Change-Id: I8cceb64f42ddc9cbfa134ab7298a441c8c36e89c
Reviewed-on: https://skia-review.googlesource.com/c/buildbot/+/279376
Reviewed-by: Kevin Lubick <kjlubick@google.com>
Commit-Queue: Eric Boren <borenet@google.com>
diff --git a/task_driver/go/lib/log_parser/log_parser.go b/task_driver/go/lib/log_parser/log_parser.go
index 9f8416b..6be04de 100644
--- a/task_driver/go/lib/log_parser/log_parser.go
+++ b/task_driver/go/lib/log_parser/log_parser.go
@@ -276,13 +276,17 @@
 	cmd.Dir = cwd
 	cmd.Env = td.GetEnv(ctx)
 
-	// Helper function for streaming output.
+	// stream is a helper function for io streams. Keep track of the streams
+	// so that they can be closed and allow the streaming goroutines to exit
+	// in the case of a context cancellation.
+	closers := []io.Closer{}
 	var wg sync.WaitGroup
-	stream := func(r io.Reader, w io.Writer, split bufio.SplitFunc, handle func(string)) {
+	stream := func(readFrom io.ReadCloser, teeTo io.Writer, split bufio.SplitFunc, handle func(string)) {
 		wg.Add(1)
+		closers = append(closers, readFrom)
 		go func() {
 			defer wg.Done()
-			scanner := bufio.NewScanner(io.TeeReader(r, w))
+			scanner := bufio.NewScanner(io.TeeReader(readFrom, teeTo))
 			scanner.Split(split)
 			for scanner.Scan() {
 				handle(scanner.Text())
@@ -290,6 +294,46 @@
 		}()
 	}
 
+	// logStderr attaches the given message to the stderr stream for all
+	// active steps.
+	logStderr := func(msg string) {
+		for _, step := range sm.Leaves() {
+			step.StderrLn(msg)
+		}
+	}
+
+	// Spin up a goroutine which watches for context cancellation and closes
+	// the io streams to allow the streaming goroutines to exit in case of a
+	// context cancellation. Note that this wouldn't be necessary if
+	// bufio.Scanner used a channel; we could use select{} inside stream()
+	// in that case.
+	stop := make(chan struct{})
+	go func() {
+		select {
+		case <-stop:
+			// The command exited normally; nothing to do.
+			return
+		case <-ctx.Done():
+			// The context was canceled or its deadline expired.
+			// Log errors to the active steps.
+			msg := ctx.Err().Error()
+			logStderr(msg)
+			sm.root.StderrLn(msg)
+			// Close the streams, to allow the streaming goroutines
+			// to exit.
+			for _, closer := range closers {
+				_ = closer.Close()
+			}
+			// Read from the stop channel, to allow the deferred
+			// write to finish.
+			<-stop
+		}
+	}()
+	defer func() {
+		// Allow the above goroutine to exit.
+		stop <- struct{}{}
+	}()
+
 	// Handle stdout.
 	stdout, err := cmd.StdoutPipe()
 	if err != nil {
@@ -308,20 +352,27 @@
 	if err != nil {
 		return td.FailStep(ctx, err)
 	}
-	stream(stderr, sm.root.stderr, bufio.ScanLines, func(tok string) {
-		for _, step := range sm.Leaves() {
-			step.StderrLn(tok)
-		}
-	})
+	stream(stderr, sm.root.stderr, bufio.ScanLines, logStderr)
 
 	// Start the command.
 	if err := cmd.Start(); err != nil {
 		return td.FailStep(ctx, skerr.Wrapf(err, "Failed to start command"))
 	}
 
-	// Wait for the command to finish.
-	err = cmd.Wait()
+	// Wait for the command to finish. Per documentation of
+	// exec.Cmd.StdoutPipe(), "it is incorrect to call Wait before all reads
+	// from the pipe have completed." In other words, wg.Wait(), which waits
+	// for the log-streaming goroutines started above to complete, should be
+	// called before cmd.Wait(). Note, however, that exec.CommandContext()
+	// does not always respect the context cancellation if StdoutPipe() or
+	// StderrPipe() are still open and the subprocess has passed those file
+	// descriptors on to its own subprocess. Therefore, we need the
+	// additional goroutine above to watch for context cancellation and
+	// close the pipes, otherwise the subprocess will run to completion
+	// despite the timeout.
+	// See https://github.com/golang/go/issues/21922 for more detail.
 	wg.Wait()
+	err = cmd.Wait()
 
 	// If any steps are still active, mark them finished.
 	sm.root.Recurse(func(s *Step) {
diff --git a/task_driver/go/lib/log_parser/log_parser_test.go b/task_driver/go/lib/log_parser/log_parser_test.go
index 07d6cba..0d2c202 100644
--- a/task_driver/go/lib/log_parser/log_parser_test.go
+++ b/task_driver/go/lib/log_parser/log_parser_test.go
@@ -118,6 +118,10 @@
 	require.Equal(t, 2, len(res.Steps[0].Steps))
 	require.Equal(t, td.STEP_RESULT_SUCCESS, res.Steps[0].Steps[0].Result)
 	require.Equal(t, td.STEP_RESULT_FAILURE, res.Steps[0].Steps[1].Result)
+
+	// The active steps should have received errors in their logs.
+	assertLogMatchesContent(t, res.Steps[0], logNameStderr, context.DeadlineExceeded.Error()+"\n")
+	assertLogMatchesContent(t, res.Steps[0].Steps[1], logNameStderr, context.DeadlineExceeded.Error()+"\n")
 }
 
 // numberedStepsRe matches lines like "Step 1: Hello World" to produce steps.