blob: 6be04deaccf74c1ebdd5e9efb580bd7951255e03 [file] [log] [blame]
package log_parser
import (
"bufio"
"context"
"errors"
"io"
"os/exec"
"regexp"
"strings"
"sync"
"go.skia.org/infra/go/ring"
"go.skia.org/infra/go/skerr"
"go.skia.org/infra/go/sklog"
"go.skia.org/infra/task_driver/go/td"
)
// OUTPUT_LINES is the maximum number of output lines kept in memory for each
// step; they are passed along as the error message when a step fails.
const OUTPUT_LINES = 20

// Names of the two log streams attached to every step.
const (
	// logNameStdout names the log that receives a step's stdout.
	logNameStdout = "stdout"
	// logNameStderr names the log that receives a step's stderr.
	logNameStderr = "stderr"
)
// Step represents a step in a tree of steps generated during Run.
type Step struct {
	// Identity and position of this step within the tree.
	name   string
	isRoot bool
	parent *Step

	// Task Driver represents steps as contexts, which suits the usual case
	// where the step hierarchy mirrors function scopes. log_parser instead
	// manages a tree holding an arbitrary number of concurrent steps that
	// are created on the fly, so it has to break the usual rule of never
	// storing a context.
	ctx context.Context

	// Output destinations for this step. logBuf retains the most recent
	// output so that Fail can report it.
	stdout io.Writer
	stderr io.Writer
	logBuf *ring.StringRing

	// children is the set of active child steps, guarded by childrenMtx.
	// The empty-struct value occupies no memory, which makes it cheaper
	// than bool or other value types for a set.
	children    map[*Step]struct{}
	childrenMtx sync.Mutex
}
// newStep creates a Step called name whose stdout/stderr logs are attached to
// ctx. When parent is a non-nil, non-root step, everything written to the new
// step is also forwarded to the parent's streams.
func newStep(ctx context.Context, name string, parent *Step) *Step {
	recent := ring.NewStringRing(OUTPUT_LINES)
	outW := io.MultiWriter(recent, td.NewLogStream(ctx, logNameStdout, td.Info))
	errW := io.MultiWriter(recent, td.NewLogStream(ctx, logNameStderr, td.Error))

	// The root step is fed the command's raw output directly, so only
	// non-root ancestors need forwarded copies.
	forwardToParent := parent != nil && !parent.isRoot
	if forwardToParent {
		outW = io.MultiWriter(outW, parent.stdout)
		errW = io.MultiWriter(errW, parent.stderr)
	}

	step := &Step{
		name:     name,
		isRoot:   false,
		parent:   parent,
		ctx:      ctx,
		stdout:   outW,
		stderr:   errW,
		logBuf:   recent,
		children: map[*Step]struct{}{},
	}
	return step
}
// StartChild starts a new Task Driver step beneath this one and registers it
// as a direct child of this step.
func (s *Step) StartChild(props *td.StepProperties) *Step {
	child := newStep(td.StartStep(s.ctx, props), props.Name, s)

	s.childrenMtx.Lock()
	defer s.childrenMtx.Unlock()
	s.children[child] = struct{}{}

	return child
}
// FindChild searches this step and its active descendants for a step with the
// given name; note that the receiver itself is also a candidate. It returns
// nil if nothing matches. If the command's log output could produce several
// steps with the same name, FindChild is probably not a good fit: when more
// than one step matches, an arbitrary one is returned.
func (s *Step) FindChild(name string) *Step {
	var match *Step
	s.Recurse(func(candidate *Step) {
		if candidate.name != name {
			return
		}
		match = candidate
	})
	return match
}
// Fail marks this step as failed. The error message is the recent output
// retained for the step, or a placeholder if the step produced no output.
// Fail does not end the step.
func (s *Step) Fail() {
	failure := errors.New("Step failed with no output")
	if recent := strings.Join(s.logBuf.GetAll(), ""); recent != "" {
		failure = errors.New(recent)
	}
	// td.FailStep hands back the error it was given; there is nothing
	// further to do with it here.
	_ = td.FailStep(s.ctx, failure)
}
// removeChild drops child from this step's set of active children. It is a
// no-op if child is not present.
func (s *Step) removeChild(child *Step) {
	mtx := &s.childrenMtx
	mtx.Lock()
	defer mtx.Unlock()
	delete(s.children, child)
}
// End marks this step and all of its active descendants as finished, then
// detaches this step from its parent.
//
// The root step is never ended here. Its context is the step created by Run,
// which ends that step itself (via a deferred td.EndStep) and may still report
// a failure on it afterwards. Ending it here as well would mark the same
// Task Driver step finished twice.
func (s *Step) End() {
	s.Recurse(func(step *Step) {
		// Mark this step as finished, unless it is the root (see above).
		if !step.isRoot {
			td.EndStep(step.ctx)
		}
		// Drop this step's children, which have already been finished
		// because Recurse() runs bottom-up. Accessing step.children is
		// safe because Recurse() holds step.childrenMtx while calling us.
		step.children = map[*Step]struct{}{}
	})
	// Remove this step from the parent's set of children. This happens
	// after Recurse has released our lock; locks are always taken
	// parent-before-child, so there is no deadlock.
	if s.parent != nil {
		s.parent.removeChild(s)
	}
}
// stringToByteLine returns a freshly allocated copy of s as bytes, followed by
// a single '\n'.
func stringToByteLine(s string) []byte {
	return append([]byte(s), '\n')
}
// Stdout writes msg verbatim to the stdout streams of this step and of every
// ancestor except the root step, which already receives the command's raw
// output. No newline is appended. If you tokenize log output with
// bufio.ScanLines and then attach each token via Step.Stdout, the newlines
// originally present in the stream will be lost; use StdoutLn instead.
// Write failures are logged, not returned.
func (s *Step) Stdout(msg string) {
	_, err := s.stdout.Write([]byte(msg))
	if err == nil {
		return
	}
	sklog.Errorf("Failed to write log output: %s", err)
}
// StdoutLn writes msg plus a trailing newline to the stdout streams of this
// step and of every ancestor except the root step, which already receives the
// command's raw output. Write failures are logged, not returned.
func (s *Step) StdoutLn(msg string) {
	line := stringToByteLine(msg)
	_, err := s.stdout.Write(line)
	if err == nil {
		return
	}
	sklog.Errorf("Failed to write log output: %s", err)
}
// Stderr writes the given string to the stderr streams for this step and all of
// its ancestors except for the root step, which automatically receives the raw
// output of the command. Note that no newline is appended, so if you are using
// bufio.ScanLines to tokenize log output and then calling Step.Stderr to attach
// logs to each step, the newlines which were originally present in the log
// stream will be lost; use StderrLn in that case. Write failures are logged,
// not returned.
func (s *Step) Stderr(msg string) {
	if _, err := s.stderr.Write([]byte(msg)); err != nil {
		sklog.Errorf("Failed to write log output: %s", err)
	}
}
// StderrLn writes msg plus a trailing newline to the stderr streams of this
// step and of every ancestor except the root step, which already receives the
// command's raw output. Write failures are logged, not returned.
func (s *Step) StderrLn(msg string) {
	line := stringToByteLine(msg)
	_, err := s.stderr.Write(line)
	if err == nil {
		return
	}
	sklog.Errorf("Failed to write log output: %s", err)
}
// Recurse applies fn to every descendant of this Step and then to the Step
// itself, i.e. in bottom-up order, with children visited before their parent.
// fn runs while holding the childrenMtx of the visited step and of each of its
// ancestors up to this one. Because of this, fn must not call Recurse,
// StartChild, End, or anything else that locks a step in this subtree; doing
// so deadlocks.
func (s *Step) Recurse(fn func(s *Step)) {
	s.childrenMtx.Lock()
	defer s.childrenMtx.Unlock()

	// Descend first so that fn sees every child before its parent.
	for descendant := range s.children {
		descendant.Recurse(fn)
	}
	fn(s)
}
// StepManager emits steps during a Run. It is thread-safe.
type StepManager struct {
	// mtx serializes CurrentStep and StartStep.
	mtx sync.Mutex

	// root is the hidden top of the step tree; every step started through
	// the manager is created beneath it.
	root *Step
}
// newStepManager returns a StepManager whose root step is bound to ctx. Steps
// started through the manager become children of that root.
func newStepManager(ctx context.Context) *StepManager {
	mgr := &StepManager{root: newStep(ctx, "", nil)}
	mgr.root.isRoot = true
	return mgr
}
// Leaves returns every active step, other than the root, that currently has no
// children. The order of the result is unspecified.
func (s *StepManager) Leaves() []*Step {
	var leaves []*Step
	s.root.Recurse(func(step *Step) {
		// Reading step.children is safe because Recurse holds
		// step.childrenMtx while calling us.
		if step == s.root || len(step.children) > 0 {
			return
		}
		leaves = append(leaves, step)
	})
	return leaves
}
// CurrentStep returns an active leaf step, or nil if there is no active step.
// If more than one leaf step is active, one is arbitrarily chosen.
func (s *StepManager) CurrentStep() *Step {
	s.mtx.Lock()
	defer s.mtx.Unlock()
	// Take the first leaf explicitly rather than using a loop that always
	// returns on its first iteration (flagged by staticcheck SA4004).
	if leaves := s.Leaves(); len(leaves) > 0 {
		return leaves[0]
	}
	return nil
}
// StartStep starts a new step as a direct child of the manager's root step and
// returns it.
func (s *StepManager) StartStep(props *td.StepProperties) *Step {
	s.mtx.Lock()
	defer s.mtx.Unlock()
	root := s.root
	return root.StartChild(props)
}
// FindStep finds an active step with the given name among the steps started
// through this StepManager and their descendants. Returns nil if no such step
// exists. The manager's internal root step is never returned, even when name is
// empty. If the log output of the command being run could generate more than
// one step with the same name, FindStep is probably not a good fit: when more
// than one step matches, one is arbitrarily chosen.
func (s *StepManager) FindStep(name string) *Step {
	var found *Step
	s.root.Recurse(func(step *Step) {
		// The root step has an empty name and is not a real step, so
		// exclude it from the search.
		if step != s.root && step.name == name {
			found = step
		}
	})
	return found
}
// TokenHandler is called once for each token of the command's stdout during a
// single execution of Run. It may create, end, or log to steps through sm.
// A non-nil error causes Run to fail, although the remaining tokens are still
// processed.
type TokenHandler func(sm *StepManager, token string) error
// Run runs the given command in the given working directory. A root-level step
// is automatically created and inherits the raw output (stdout and stderr,
// separately) and result of the command. Run calls the provided function for
// every token in the stdout stream of the command, as defined by the given
// SplitFunc. This function receives a StepManager which may be used to generate
// sub-steps based on the tokens. Stderr is sent to both the root step and to
// each active sub-step at the time that output is received; note that, since
// stdout and stderr are independent streams, there is no guarantee that stderr
// related to a given sub-step will actually appear in the stderr stream for
// that sub-step. The degree of consistency will depend on the operating system
// and the sub-process itself. Therefore, Run will be most useful and consistent
// for applications whose output is highly structured, with any errors sent to
// stdout as part of this structure. See `go test --json` for a good example.
// Note that this inconsistency applies to the raw stderr stream but not to
// calls to Step.Stdout(), Step.Stderr(), etc.
//
// Run fails the root step and returns an error in the following cases:
//   - cmdLine is empty;
//   - the command cannot be started or exits unsuccessfully;
//   - handleToken returns an error;
//   - stdout cannot be tokenized (for example, a single token exceeds the
//     bufio.Scanner buffer). In that case the remaining output is still copied,
//     unparsed, to the root step's log.
func Run(ctx context.Context, cwd string, cmdLine []string, split bufio.SplitFunc, handleToken TokenHandler) error {
	ctx = td.StartStep(ctx, td.Props(strings.Join(cmdLine, " ")))
	defer td.EndStep(ctx)

	// Indexing cmdLine[0] below would panic on an empty command line.
	if len(cmdLine) == 0 {
		return td.FailStep(ctx, errors.New("No command provided"))
	}

	// Create the StepManager.
	sm := newStepManager(ctx)

	// Set up the command.
	cmd := exec.CommandContext(ctx, cmdLine[0], cmdLine[1:]...)
	cmd.Dir = cwd
	cmd.Env = td.GetEnv(ctx)

	// Obtain both pipes before starting any streaming goroutine. Otherwise a
	// failure from the second call could leave a goroutine blocked forever
	// on a pipe that is never written to or closed.
	stdout, err := cmd.StdoutPipe()
	if err != nil {
		return td.FailStep(ctx, err)
	}
	stderr, err := cmd.StderrPipe()
	if err != nil {
		return td.FailStep(ctx, err)
	}

	// stream tokenizes readFrom in a new goroutine. It copies all raw data
	// to teeTo, passes each token to handle, and reports any scanning error
	// to handleErr. Streams are recorded in closers so that they can be
	// closed, allowing the streaming goroutines to exit, in the case of a
	// context cancellation.
	var closers []io.Closer
	var wg sync.WaitGroup
	stream := func(readFrom io.ReadCloser, teeTo io.Writer, splitFn bufio.SplitFunc, handle func(string), handleErr func(error)) {
		wg.Add(1)
		closers = append(closers, readFrom)
		go func() {
			defer wg.Done()
			scanner := bufio.NewScanner(io.TeeReader(readFrom, teeTo))
			scanner.Split(splitFn)
			for scanner.Scan() {
				handle(scanner.Text())
			}
			if err := scanner.Err(); err != nil {
				handleErr(err)
			}
			// The scanner gives up at its first error (eg.
			// bufio.ErrTooLong). Keep forwarding whatever remains so
			// that the raw log stays complete and the subprocess never
			// blocks writing to a full pipe. This returns immediately
			// at EOF or if the pipe has already been closed.
			_, _ = io.Copy(teeTo, readFrom)
		}()
	}

	// logStderr attaches the given message to the stderr stream for all
	// active steps.
	logStderr := func(msg string) {
		for _, step := range sm.Leaves() {
			step.StderrLn(msg)
		}
	}

	// Handle stdout. parseErr is written only by the stdout goroutine and is
	// read only after wg.Wait().
	var parseErr error
	stream(stdout, sm.root.stdout, split, func(tok string) {
		if err := handleToken(sm, tok); err != nil {
			parseErr = skerr.Wrapf(err, "Failed handling token %q", tok)
			sklog.Error(parseErr.Error())
		}
	}, func(err error) {
		parseErr = skerr.Wrapf(err, "Failed to read command output")
		sklog.Error(parseErr.Error())
	})

	// Handle stderr. Attempt to direct it to the appropriate sub-step.
	stream(stderr, sm.root.stderr, bufio.ScanLines, logStderr, func(err error) {
		sklog.Errorf("Failed to read command stderr: %s", err)
	})

	// Spin up a goroutine which watches for context cancellation and closes
	// the io streams to allow the streaming goroutines to exit in case of a
	// context cancellation. Note that this wouldn't be necessary if
	// bufio.Scanner used a channel; we could use select{} inside stream()
	// in that case. The watcher starts only after every stream has been
	// registered, so closers is never appended to while the watcher might
	// be reading it.
	stop := make(chan struct{})
	go func() {
		select {
		case <-stop:
			// Run is returning normally; nothing to do.
			return
		case <-ctx.Done():
			// This is a timeout or cancellation.
			// Log errors to the active steps.
			msg := ctx.Err().Error()
			logStderr(msg)
			sm.root.StderrLn(msg)
			// Close the streams, to allow the streaming goroutines
			// to exit.
			for _, closer := range closers {
				_ = closer.Close()
			}
			// Read from the stop channel, to allow the deferred
			// write to finish.
			<-stop
		}
	}()
	defer func() {
		// Allow the above goroutine to exit. Because the send is
		// unbuffered, this also waits for any in-progress cancellation
		// handling to complete before Run returns.
		stop <- struct{}{}
	}()

	// Start the command.
	if err := cmd.Start(); err != nil {
		return td.FailStep(ctx, skerr.Wrapf(err, "Failed to start command"))
	}

	// Wait for the command to finish. Per documentation of
	// exec.StdoutPipe(), "it is incorrect to call Wait before all reads
	// from the pipe have completed." In other words, wg.Wait(), which waits
	// for the log-streaming goroutines started above to complete, should be
	// called before cmd.Wait(). Note, however, that exec.CommandContext()
	// does not always respect the context cancellation if StdoutPipe() or
	// StderrPipe() are still open and the subprocess has passed those file
	// descriptors on to its own subprocess. Therefore, we need the
	// additional goroutine above to watch for context cancellation and
	// close the pipes, otherwise the subprocess will run to completion
	// despite the timeout.
	// See https://github.com/golang/go/issues/21922 for more detail.
	wg.Wait()
	err = cmd.Wait()

	// If the command failed, we can't know for sure which step was the
	// cause, so we fail any active steps.
	if err != nil {
		sm.root.Recurse(func(s *Step) {
			s.Fail()
		})
	}
	// Mark any still-active steps as finished.
	sm.root.End()

	if err != nil {
		return td.FailStep(ctx, err)
	}
	if parseErr != nil {
		return td.FailStep(ctx, parseErr)
	}
	return nil
}
// RegexpTokenHandler returns a TokenHandler that starts a new step each time a
// token matches re. If re contains at least one capture group, the text of the
// first group is used as the step name; this may be empty if that group did not
// take part in the match. Otherwise the whole match is used.
//
// At most one step is active at a time: starting a new step first ends the
// currently active one.
//
// The handler never decides on its own that a step has failed. It relies on
// Run marking any still-active steps as failed when the command itself fails.
//
// Every token is written, followed by a newline, to the stdout stream of the
// step that is active once the token has been processed.
func RegexpTokenHandler(re *regexp.Regexp) TokenHandler {
	return func(sm *StepManager, line string) error {
		// log_parser supports several concurrently active steps (as does
		// Task Driver), so CurrentStep() is ambiguous in general. Here,
		// however, the previous step is always ended before a new one is
		// started, so at most one step is ever active.
		active := sm.CurrentStep()

		if match := re.FindStringSubmatch(line); len(match) != 0 {
			if active != nil {
				active.End()
			}
			stepName := match[0]
			if len(match) > 1 {
				stepName = match[1]
			}
			active = sm.StartStep(td.Props(stepName))
		}

		if active == nil {
			return nil
		}
		active.StdoutLn(line)
		return nil
	}
}
// RunRegexp is a helper function for Run which uses the given regexp to emit
// steps based on lines of output. See documentation for Run and
// RegexpTokenHandler for more detail.
func RunRegexp(ctx context.Context, re *regexp.Regexp, cwd string, cmdLine []string) error {
return Run(ctx, cwd, cmdLine, bufio.ScanLines, RegexpTokenHandler(re))
}