[task driver] Speculative fix for log parser flakes
Bug: skia:9827
Change-Id: I8cceb64f42ddc9cbfa134ab7298a441c8c36e89c
Reviewed-on: https://skia-review.googlesource.com/c/buildbot/+/279376
Reviewed-by: Kevin Lubick <kjlubick@google.com>
Commit-Queue: Eric Boren <borenet@google.com>
diff --git a/task_driver/go/lib/log_parser/log_parser.go b/task_driver/go/lib/log_parser/log_parser.go
index 9f8416b..6be04de 100644
--- a/task_driver/go/lib/log_parser/log_parser.go
+++ b/task_driver/go/lib/log_parser/log_parser.go
@@ -276,13 +276,17 @@
cmd.Dir = cwd
cmd.Env = td.GetEnv(ctx)
- // Helper function for streaming output.
+ // stream is a helper function for io streams. Keep track of the streams
+ // so that they can be closed and allow the streaming goroutines to exit
+ // in the case of a context cancellation.
+ closers := []io.Closer{}
var wg sync.WaitGroup
- stream := func(r io.Reader, w io.Writer, split bufio.SplitFunc, handle func(string)) {
+ stream := func(readFrom io.ReadCloser, teeTo io.Writer, split bufio.SplitFunc, handle func(string)) {
wg.Add(1)
+ closers = append(closers, readFrom)
go func() {
defer wg.Done()
- scanner := bufio.NewScanner(io.TeeReader(r, w))
+ scanner := bufio.NewScanner(io.TeeReader(readFrom, teeTo))
scanner.Split(split)
for scanner.Scan() {
handle(scanner.Text())
@@ -290,6 +294,46 @@
}()
}
+ // logStderr attaches the given message to the stderr stream for all
+ // active steps.
+ logStderr := func(msg string) {
+ for _, step := range sm.Leaves() {
+ step.StderrLn(msg)
+ }
+ }
+
+ // Spin up a goroutine which watches for context cancellation and closes
+ // the io streams to allow the streaming goroutines to exit in case of a
+ // context cancellation. Note that this wouldn't be necessary if
+ // bufio.Scanner used a channel; we could use select{} inside stream()
+ // in that case.
+ stop := make(chan struct{})
+ go func() {
+ select {
+ case <-stop:
+ // The command exited normally; nothing to do.
+ return
+ case <-ctx.Done():
+ // This is a timeout.
+ // Log errors to the active steps.
+ msg := ctx.Err().Error()
+ logStderr(msg)
+ sm.root.StderrLn(msg)
+ // Close the streams, to allow the streaming goroutines
+ // to exit.
+ for _, closer := range closers {
+ _ = closer.Close()
+ }
+ // Read from the stop channel, to allow the deferred
+ // write to finish.
+ <-stop
+ }
+ }()
+ defer func() {
+ // Allow the above goroutine to exit.
+ stop <- struct{}{}
+ }()
+
// Handle stdout.
stdout, err := cmd.StdoutPipe()
if err != nil {
@@ -308,20 +352,27 @@
if err != nil {
return td.FailStep(ctx, err)
}
- stream(stderr, sm.root.stderr, bufio.ScanLines, func(tok string) {
- for _, step := range sm.Leaves() {
- step.StderrLn(tok)
- }
- })
+ stream(stderr, sm.root.stderr, bufio.ScanLines, logStderr)
// Start the command.
if err := cmd.Start(); err != nil {
return td.FailStep(ctx, skerr.Wrapf(err, "Failed to start command"))
}
- // Wait for the command to finish.
- err = cmd.Wait()
+ // Wait for the command to finish. Per documentation of
+ // exec.StdoutPipe(), "it is incorrect to call Wait before all reads
+ // from the pipe have completed." In other words, wg.Wait(), which waits
+ // for the log-streaming goroutines started above to complete, should be
+ // called before cmd.Wait(). Note, however, that exec.CommandContext()
+ // does not always respect the context cancellation if StdoutPipe() or
+ // StderrPipe() are still open and the subprocess has passed those file
+ // descriptors on to its own subprocess. Therefore, we need the
+ // additional goroutines above to watch for context cancellation and
+ // close the pipes, otherwise the subprocess will run to completion
+ // despite the timeout.
+ // See https://github.com/golang/go/issues/21922 for more detail.
wg.Wait()
+ err = cmd.Wait()
// If any steps are still active, mark them finished.
sm.root.Recurse(func(s *Step) {
diff --git a/task_driver/go/lib/log_parser/log_parser_test.go b/task_driver/go/lib/log_parser/log_parser_test.go
index 07d6cba..0d2c202 100644
--- a/task_driver/go/lib/log_parser/log_parser_test.go
+++ b/task_driver/go/lib/log_parser/log_parser_test.go
@@ -118,6 +118,10 @@
require.Equal(t, 2, len(res.Steps[0].Steps))
require.Equal(t, td.STEP_RESULT_SUCCESS, res.Steps[0].Steps[0].Result)
require.Equal(t, td.STEP_RESULT_FAILURE, res.Steps[0].Steps[1].Result)
+
+ // The active steps should have received errors in their logs.
+ assertLogMatchesContent(t, res.Steps[0], logNameStderr, context.DeadlineExceeded.Error()+"\n")
+ assertLogMatchesContent(t, res.Steps[0].Steps[1], logNameStderr, context.DeadlineExceeded.Error()+"\n")
}
// numberedStepsRe matches lines like "Step 1: Hello World" to produce steps.