package log_parser

import (
	"bufio"
	"context"
	"errors"
	"io"
	"os/exec"
	"regexp"
	"strings"
	"sync"

	"go.skia.org/infra/go/ring"
	"go.skia.org/infra/go/skerr"
	"go.skia.org/infra/go/sklog"
	"go.skia.org/infra/task_driver/go/td"
)

const (
	// Maximum number of output lines stored in memory to pass along as
	// error messages for failed tests.
	OUTPUT_LINES = 20

	// logNameStdout is the log name used for the stdout stream of each step.
	logNameStdout = "stdout"
	// logNameStderr is the log name used for the stderr stream of each step.
	logNameStderr = "stderr"
)

// Step represents a step in a tree of steps generated during Run.
type Step struct {
	// Task Driver uses contexts to represent steps, which fits the typical
	// case where the step hierarchy maps to function scopes. However,
	// log_parser is dealing with a tree containing an arbitrary number of
	// concurrent steps which are generated on the fly, which makes it
	// necessary to break the best practice of never storing contexts.
	ctx    context.Context
	stdout io.Writer
	stderr io.Writer
	logBuf *ring.StringRing
	name   string
	isRoot bool
	parent *Step
	// The empty struct has a size of zero, so it's more memory efficient
	// than bool or other options.
	children    map[*Step]struct{}
	childrenMtx sync.Mutex
}

// newStep returns a Step instance.
func newStep(ctx context.Context, name string, parent *Step) *Step {
	logBuf := ring.NewStringRing(OUTPUT_LINES)
	stdout := io.MultiWriter(logBuf, td.NewLogStream(ctx, logNameStdout, td.Info))
	stderr := io.MultiWriter(logBuf, td.NewLogStream(ctx, logNameStderr, td.Error))
	if parent != nil && !parent.isRoot {
		stdout = io.MultiWriter(stdout, parent.stdout)
		stderr = io.MultiWriter(stderr, parent.stderr)
	}
	return &Step{
		ctx:      ctx,
		stdout:   stdout,
		stderr:   stderr,
		logBuf:   logBuf,
		name:     name,
		isRoot:   false,
		parent:   parent,
		children: map[*Step]struct{}{},
	}
}

// StartChild creates a new step as a direct child of this step.
func (s *Step) StartChild(props *td.StepProperties) *Step {
	ctx := td.StartStep(s.ctx, props)
	child := newStep(ctx, props.Name, s)
	s.childrenMtx.Lock()
	defer s.childrenMtx.Unlock()
	s.children[child] = struct{}{}
	return child
}

// FindChild finds the descendant of this step with the given name. Returns nil
// if no active descendant step exists. If it is possible that the log output of
// the command being run could generate more than one step with the same name,
// then it probably isn't a good idea to use FindChild; if more than one
// matching descendant exists, one is arbitrarily chosen.
func (s *Step) FindChild(name string) *Step {
	var found *Step
	s.Recurse(func(s *Step) {
		if s.name == name {
			found = s
		}
	})
	return found
}

// Fail this step.
func (s *Step) Fail() {
	msg := strings.Join(s.logBuf.GetAll(), "")
	if msg == "" {
		msg = "Step failed with no output"
	}
	_ = td.FailStep(s.ctx, errors.New(msg))
}

// removeChild removes the given child step.
func (s *Step) removeChild(child *Step) {
	s.childrenMtx.Lock()
	defer s.childrenMtx.Unlock()
	delete(s.children, child)
}

// End this step and any of its active descendants.
func (s *Step) End() {
	s.Recurse(func(s *Step) {
		// Mark this step as finished.
		td.EndStep(s.ctx)
		// Remove this step's children, which have already been marked
		// as finished because Recurse() runs bottom-up. It is safe to
		// access s.children because Recurse() locks s.childrenMtx.
		s.children = map[*Step]struct{}{}
	})
	// Remove this step from the parent's children map.
	if s.parent != nil {
		s.parent.removeChild(s)
	}
}

// stringToByteLine converts the given string to a slice of bytes and appends
// a newline.
func stringToByteLine(s string) []byte {
	b := make([]byte, len(s)+1)
	copy(b, s)
	b[len(b)-1] = '\n'
	return b
}

// Stdout writes the given string to the stdout streams for this step and all of
// its ancestors except for the root step, which automatically receives the raw
// output of the command. Note that no newline is appended, so if you are using
// bufio.ScanLines to tokenize log output and then calling Step.Stdout to attach
// logs to each step, the newlines which were originally present in the log
// stream will be lost.
func (s *Step) Stdout(msg string) {
	if _, err := s.stdout.Write([]byte(msg)); err != nil {
		sklog.Errorf("Failed to write log output: %s", err)
	}
}

// StdoutLn writes the given string, along with a trailing newline, to the
// stdout streams for this step and all of its ancestors except for the root
// step, which automatically receives the raw output of the command.
func (s *Step) StdoutLn(msg string) {
	if _, err := s.stdout.Write(stringToByteLine(msg)); err != nil {
		sklog.Errorf("Failed to write log output: %s", err)
	}
}

// Stderr writes the given string to the stderr streams for this step and all of
// its ancestors except for the root step, which automatically receives the raw
// output of the command. Note that no newline is appended, so if you are using
// bufio.ScanLines to tokenize log output and then calling Step.Stdout to attach
// logs to each step, the newlines which were originally present in the log
// stream will be lost.
func (s *Step) Stderr(msg string) {
	if _, err := s.stderr.Write([]byte(msg)); err != nil {
		sklog.Errorf("Failed to write log output: %s", err)
	}
}

// StderrLn writes the given string, along with a trailing newline, to the
// stderr streams for this step and all of its ancestors except for the root
// step, which automatically receives the raw output of the command.
func (s *Step) StderrLn(msg string) {
	if _, err := s.stderr.Write(stringToByteLine(msg)); err != nil {
		sklog.Errorf("Failed to write log output: %s", err)
	}
}

// Recurse runs the given function on this Step and each of its descendants,
// in bottom-up order, ie. the func runs for the children before the parent.
// The provided function may not call Recurse; this will result in a deadlock.
func (s *Step) Recurse(fn func(s *Step)) {
	s.childrenMtx.Lock()
	defer s.childrenMtx.Unlock()
	for child := range s.children {
		child.Recurse(fn)
	}
	fn(s)
}

// StepManager emits steps during a Run. It is thread-safe.
type StepManager struct {
	mtx  sync.Mutex
	root *Step
}

// newStepManager returns a StepManager instance which creates child steps of
// the given parent step.
func newStepManager(ctx context.Context) *StepManager {
	root := newStep(ctx, "", nil)
	root.isRoot = true
	return &StepManager{
		root: root,
	}
}

// Leaves returns all active non-root Steps with no children.
func (sm *StepManager) Leaves() []*Step {
	var rv []*Step
	sm.root.Recurse(func(s *Step) {
		// It is safe to access s.children because Recurse locks
		// s.childrenMtx.
		if s != sm.root && len(s.children) == 0 {
			rv = append(rv, s)
		}
	})
	return rv
}

// CurrentStep returns the current step or nil if there is no active step. If
// there is more than one active step, one is arbitrarily chosen.
func (s *StepManager) CurrentStep() *Step {
	s.mtx.Lock()
	defer s.mtx.Unlock()
	for _, step := range s.Leaves() {
		return step
	}
	return nil
}

// StartStep starts a step as a child of the root step.
func (s *StepManager) StartStep(props *td.StepProperties) *Step {
	s.mtx.Lock()
	defer s.mtx.Unlock()
	return s.root.StartChild(props)
}

// FindStep finds the descendant of this step with the given name. Returns nil
// if no active descendant step exists. If it is possible that the log output of
// the command being run could generate more than one step with the same name,
// then it probably isn't a good idea to use FindStep; if more than one
// matching descendant exists, one is arbitrarily chosen.
func (s *StepManager) FindStep(name string) *Step {
	return s.root.FindChild(name)
}

// TokenHandler is a function which is called for every token in the log stream
// during a given execution of Run(). It generates steps using the provided
// StepManager.
type TokenHandler func(*StepManager, string) error

// Run runs the given command in the given working directory. A root-level step
// is automatically created and inherits the raw output (stdout and stderr,
// separately) and result of the command. Run calls the provided function for
// every token in the stdout stream of the command, as defined by the given
// SplitFunc. This function receives a StepManager which may be used to generate
// sub-steps based on the tokens. Stderr is sent to both the root step and to
// each active sub-step at the time that output is received; note that, since
// stdout and stderr are independent streams, there is no guarantee that stderr
// related to a given sub-step will actually appear in the stderr stream for
// that sub-step. The degree of consistency will depend on the operating system
// and the sub-process itself. Therefore, Run will be most useful and consistent
// for applications whose output is highly structured, with any errors sent to
// stdout as part of this structure. See `go test --json` for a good example.
// Note that this inconsistency applies to the raw stderr stream but not to
// calls to Step.Stdout(), Step.Stderr(), etc.
func Run(ctx context.Context, cwd string, cmdLine []string, split bufio.SplitFunc, handleToken TokenHandler) error {
	ctx = td.StartStep(ctx, td.Props(strings.Join(cmdLine, " ")))
	defer td.EndStep(ctx)

	// Create the StepManager.
	sm := newStepManager(ctx)

	// Set up the command.
	cmd := exec.CommandContext(ctx, cmdLine[0], cmdLine[1:]...)
	cmd.Dir = cwd
	cmd.Env = td.GetEnv(ctx)

	// stream is a helper function for io streams. Keep track of the streams
	// so that they can be closed and allow the streaming goroutines to exit
	// in the case of a context cancellation.
	closers := []io.Closer{}
	var wg sync.WaitGroup
	stream := func(readFrom io.ReadCloser, teeTo io.Writer, split bufio.SplitFunc, handle func(string)) {
		wg.Add(1)
		closers = append(closers, readFrom)
		go func() {
			defer wg.Done()
			scanner := bufio.NewScanner(io.TeeReader(readFrom, teeTo))
			scanner.Split(split)
			for scanner.Scan() {
				handle(scanner.Text())
			}
		}()
	}

	// logStderr attaches the given message to the stderr stream for all
	// active steps.
	logStderr := func(msg string) {
		for _, step := range sm.Leaves() {
			step.StderrLn(msg)
		}
	}

	// Spin up a goroutine which watches for context cancellation and closes
	// the io streams to allow the streaming goroutines to exit in case of a
	// context cancellation. Note that this wouldn't be necessary if
	// bufio.Scanner used a channel; we could use select{} inside stream()
	// in that case.
	stop := make(chan struct{})
	go func() {
		select {
		case <-stop:
			// The command exited normally; nothing to do.
			return
		case <-ctx.Done():
			// This is a timeout.
			// Log errors to the active steps.
			msg := ctx.Err().Error()
			logStderr(msg)
			sm.root.StderrLn(msg)
			// Close the streams, to allow the streaming goroutines
			// to exit.
			for _, closer := range closers {
				_ = closer.Close()
			}
			// Read from the stop channel, to allow the deferred
			// write to finish.
			<-stop
		}
	}()
	defer func() {
		// Allow the above goroutine to exit.
		stop <- struct{}{}
	}()

	// Handle stdout.
	stdout, err := cmd.StdoutPipe()
	if err != nil {
		return td.FailStep(ctx, err)
	}
	var parseErr error
	stream(stdout, sm.root.stdout, split, func(tok string) {
		if err := handleToken(sm, tok); err != nil {
			parseErr = skerr.Wrapf(err, "Failed handling token %q", tok)
			sklog.Error(parseErr.Error())
		}
	})

	// Handle stderr. Attempt to direct it to the appropriate sub-step.
	stderr, err := cmd.StderrPipe()
	if err != nil {
		return td.FailStep(ctx, err)
	}
	stream(stderr, sm.root.stderr, bufio.ScanLines, logStderr)

	// Start the command.
	if err := cmd.Start(); err != nil {
		return td.FailStep(ctx, skerr.Wrapf(err, "Failed to start command"))
	}

	// Wait for the command to finish. Per documentation of
	// exec.StdoutPipe(), "it is incorrect to call Wait before all reads
	// from the pipe have completed." In other words, wg.Wait(), which waits
	// for the log-streaming goroutines started above to complete, should be
	// called before cmd.Wait(). Note, however, that exec.CommandContext()
	// does not always respect the context cancellation if StdoutPipe() or
	// StderrPipe() are still open and the subprocess has passed those file
	// descriptors on to its own subprocess. Therefore, we need the
	// additional goroutines above to watch for context cancellation and
	// close the pipes, otherwise the subprocess will run to completion
	// despite the timeout.
	// See https://github.com/golang/go/issues/21922 for more detail.
	wg.Wait()
	err = cmd.Wait()

	// If any steps are still active, mark them finished.
	sm.root.Recurse(func(s *Step) {
		// If the command failed, we can't know for sure which step was
		// the cause, so we fail any active steps.
		if err != nil {
			s.Fail()
		}
	})
	sm.root.End()
	if err != nil {
		return td.FailStep(ctx, err)
	}
	if parseErr != nil {
		return td.FailStep(ctx, parseErr)
	}
	return nil
}

// RegexpTokenHandler returns a TokenHandler which emits a step whenever it
// encounters a token matching the given regexp. If the regexp matches at
// least one capture group, the first group is used as the name of the step,
// otherwise the entire line is used.
//
// There is at most one active step at a given time; whenever a new step begins,
// any active step is marked finished.
//
// RegexpTokenHandler does not attempt to determine whether steps have
// failed; it relies on Run's behavior of marking any active steps as failed if
// the command itself fails.
//
// All log tokens are emitted as individual lines to the stdout stream of the
// active step.
func RegexpTokenHandler(re *regexp.Regexp) TokenHandler {
	return func(sm *StepManager, line string) error {
		// Find the currently-active step. Note that log_parser supports
		// multiple active steps (because Task Driver does as well), and
		// CurrentStep() is therefore ambiguous in general, but because
		// RegexpTokenHandler always ends the current step before
		// starting a new one, it is not ambiguous in this specific
		// case.
		s := sm.CurrentStep()

		// Does this line indicate the start of a new step?
		m := re.FindStringSubmatch(line)
		if len(m) > 0 {
			// If we're starting a new step and one is already active, mark
			// it as finished.
			if s != nil {
				s.End()
			}
			// Start the new step.
			name := m[0]
			if len(m) > 1 {
				name = m[1]
			}
			s = sm.StartStep(td.Props(name))
		}
		// Log the current line to stdout for the current step, if any.
		if s != nil {
			s.StdoutLn(line)
		}
		return nil
	}
}

// RunRegexp is a helper function for Run which uses the given regexp to emit
// steps based on lines of output. See documentation for Run and
// RegexpTokenHandler for more detail.
func RunRegexp(ctx context.Context, re *regexp.Regexp, cwd string, cmdLine []string) error {
	return Run(ctx, cwd, cmdLine, bufio.ScanLines, RegexpTokenHandler(re))
}
