package td

import (
	"context"
	"errors"
	"fmt"
	"io"
	"os"
	"strings"
	"sync/atomic"
	"time"

	"github.com/google/uuid"
	"go.skia.org/infra/go/auth"
	"go.skia.org/infra/go/common"
	"go.skia.org/infra/go/exec"
	"go.skia.org/infra/go/sklog"
	"go.skia.org/infra/go/sklog/cloud_logging"
	"go.skia.org/infra/go/sklog/sklog_impl"
	"go.skia.org/infra/go/util"
	"golang.org/x/oauth2"
	compute "google.golang.org/api/compute/v1"
)

const (
	// PubSub topic name for task driver metadata.
	PUBSUB_TOPIC = "task-driver"
	// PubSub topic name for task driver logs.
	PUBSUB_TOPIC_LOGS = "task-driver-logs"

	// Log ID for all Task Drivers. Logs are labeled with task ID and step
	// ID as well, and those labels should be used for filtering in most
	// cases.
	LOG_ID = "task-driver"

	// Special ID of the root step.
	STEP_ID_ROOT = "root"

	// Environment variables provided to all Swarming tasks.
	ENVVAR_SWARMING_BOT    = "SWARMING_BOT_ID"
	ENVVAR_SWARMING_SERVER = "SWARMING_SERVER"
	ENVVAR_SWARMING_TASK   = "SWARMING_TASK_ID"

	// PATH_VAR represents the PATH environment variable.
	PATH_VAR = "PATH"
)

var (
	BASE_ENV = []string{
		"CHROME_HEADLESS=1",
		"GIT_USER_AGENT=git/1.9.1", // I don't think this version matters.
	}

	// Auth scopes required for all task_drivers.
	SCOPES = []string{compute.CloudPlatformScope}
)

// RunProperties are properties for a single run of a Task Driver.
type RunProperties struct {
	Local          bool   `json:"local"`
	SwarmingBot    string `json:"swarmingBot,omitempty"`
	SwarmingServer string `json:"swarmingServer,omitempty"`
	SwarmingTask   string `json:"swarmingTask,omitempty"`
}

// Return an error if the RunProperties are not valid.
func (p *RunProperties) Validate() error {
	if p.Local {
		if p.SwarmingBot != "" {
			return errors.New("SwarmingBot must be empty for local runs!")
		}
		if p.SwarmingServer != "" {
			return errors.New("SwarmingServer must be empty for local runs!")
		}
		if p.SwarmingTask != "" {
			return errors.New("SwarmingTask must be empty for local runs!")
		}
	} else {
		if p.SwarmingBot == "" {
			return errors.New("SwarmingBot is required for non-local runs!")
		}
		if p.SwarmingServer == "" {
			return errors.New("SwarmingServer is required for non-local runs!")
		}
		if p.SwarmingTask == "" {
			return errors.New("SwarmingTask is required for non-local runs!")
		}
	}
	return nil
}

// Return a copy of the RunProperties.
func (p *RunProperties) Copy() *RunProperties {
	if p == nil {
		return nil
	}
	return &RunProperties{
		Local:          p.Local,
		SwarmingBot:    p.SwarmingBot,
		SwarmingServer: p.SwarmingServer,
		SwarmingTask:   p.SwarmingTask,
	}
}

// StartRunWithErr begins a new test automation run, returning any error which
// occurs.
func StartRunWithErr(projectId, taskId, taskName, output *string, local *bool) (context.Context, error) {
	common.Init()

	// TODO(borenet): Catch SIGINT, SIGKILL and report.

	// Gather RunProperties.
	swarmingBot := os.Getenv(ENVVAR_SWARMING_BOT)
	swarmingServer := os.Getenv(ENVVAR_SWARMING_SERVER)
	swarmingTask := os.Getenv(ENVVAR_SWARMING_TASK)

	// "reproduce" is supplied by "swarming.py reproduce" and indicates that
	// this is actually a local run, but --local won't have been provided
	// because the command was copied directly from the Swarming task.
	if swarmingTask == "reproduce" || swarmingBot == "reproduce" {
		*local = true
		swarmingBot = ""
		swarmingServer = ""
		swarmingTask = ""
	}
	if *local {
		// Check to make sure we're not actually running in production.
		// Note that the presence of SWARMING_SERVER does not indicate
		// that we're running in production, because it can be used with
		// swarming.py as an alternative to --swarming.
		errTmpl := "--local was supplied but %s environment variable was found. Was --local used by accident?"
		if swarmingBot != "" {
			return nil, fmt.Errorf(errTmpl, ENVVAR_SWARMING_BOT)
		} else if swarmingTask != "" {
			return nil, fmt.Errorf(errTmpl, ENVVAR_SWARMING_TASK)
		}

		// Prevent clobbering real task data for local tasks.
		hostname, err := os.Hostname()
		if err != nil {
			return nil, err
		}
		*taskId = fmt.Sprintf("%s_%s", hostname, uuid.New())
	} else {
		// Check to make sure that we're not running locally and the
		// user forgot to use --local.
		errTmpl := "--local was not supplied but environment variable %s was not found. Did you forget to use --local?"
		if swarmingBot == "" {
			return nil, fmt.Errorf(errTmpl, ENVVAR_SWARMING_BOT)
		} else if swarmingServer == "" {
			return nil, fmt.Errorf(errTmpl, ENVVAR_SWARMING_SERVER)
		} else if swarmingTask == "" {
			return nil, fmt.Errorf(errTmpl, ENVVAR_SWARMING_TASK)
		}
	}

	// Validate properties and flags.
	props := &RunProperties{
		Local:          *local,
		SwarmingBot:    swarmingBot,
		SwarmingServer: swarmingServer,
		SwarmingTask:   swarmingTask,
	}
	if err := props.Validate(); err != nil {
		return nil, err
	}
	if !*local {
		if *projectId == "" {
			return nil, fmt.Errorf("Project ID is required.")
		}
		if *taskId == "" {
			return nil, fmt.Errorf("Task ID is required.")
		}
		if *taskName == "" {
			return nil, fmt.Errorf("Task name is required.")
		}
	}

	// Create the token source.
	var ts oauth2.TokenSource
	if *local {
		var err error
		ts, err = auth.NewDefaultTokenSource(*local, SCOPES...)
		if err != nil {
			return nil, err
		}
	} else {
		var err error
		ts, err = auth.NewLUCIContextTokenSource(SCOPES...)
		if err != nil {
			return nil, fmt.Errorf("Failed to obtain LUCI TokenSource: %s", err)
		}
	}

	// Connect receivers.
	receiver := MultiReceiver([]Receiver{
		&DebugReceiver{},
		newReportReceiver(*output),
	})

	// Initialize Cloud Logging.
	ctx := context.Background()
	if *projectId != "" && *taskId != "" && *taskName != "" {
		labels := map[string]string{
			"taskId":   *taskId,
			"taskName": *taskName,
		}
		logger, err := cloud_logging.New(ctx, *projectId, LOG_ID, ts, labels)
		if err != nil {
			return nil, err
		}
		sklog_impl.SetLogger(logger)
		cloudLogging, err := NewCloudLoggingReceiver(logger.Logger())
		if err != nil {
			return nil, err
		}
		receiver = append(receiver, cloudLogging)
	}

	// Dump environment variables.
	sklog.Infof("Environment:\n%s", strings.Join(os.Environ(), "\n"))

	// Set up and return the root-level Step.
	ctx = newRun(ctx, receiver, *taskId, *taskName, props)
	return ctx, nil
}

// StartRun begins a new test automation run, panicking if any setup fails.
func StartRun(projectId, taskId, taskName, output *string, local *bool) context.Context {
	ctx, err := StartRunWithErr(projectId, taskId, taskName, output, local)
	if err != nil {
		sklog.Fatalf("Failed task_driver.StartRun(): %s", err)
	}
	return ctx
}

// Perform any cleanup work for the run. Should be deferred in main().
func EndRun(ctx context.Context) {
	defer util.Close(getCtx(ctx).run)

	// Mark the root step as finished.
	finishStep(ctx, recover())
}

// run represents a full test automation run.
type run struct {
	receiver Receiver
	taskId   string
	msgIndex int32
}

// newRun returns a context.Context representing a Task Driver run, including
// creation of a root step.
func newRun(ctx context.Context, rec Receiver, taskId, taskName string, props *RunProperties) context.Context {
	r := &run{
		receiver: rec,
		taskId:   taskId,
	}
	r.send(&Message{
		Type: MSG_TYPE_RUN_STARTED,
		Run:  props,
	})
	ctx = context.WithValue(ctx, contextKey, &Context{
		run:     r,
		execRun: exec.DefaultRun,
	})
	env := MergeEnv(os.Environ(), BASE_ENV)
	ctx = newStep(ctx, STEP_ID_ROOT, nil, Props(taskName).Env(env))
	return ctx
}

// Send the given message to the receiver. Does not return an error, even if
// sending fails.
func (r *run) send(msg *Message) {
	msg.Index = int(atomic.AddInt32(&r.msgIndex, 1))
	msg.TaskId = r.taskId
	msg.Timestamp = time.Now().UTC()
	if err := msg.Validate(); err != nil {
		sklog.Error(err)
	}
	if err := r.receiver.HandleMessage(msg); err != nil {
		// Just log the error but don't return it.
		// TODO(borenet): How do we handle this?
		sklog.Error(err)
	}
}

// Send a Message indicating that a new step has started.
func (r *run) Start(props *StepProperties) {
	msg := &Message{
		Type:   MSG_TYPE_STEP_STARTED,
		StepId: props.Id,
		Step:   props,
	}
	r.send(msg)
}

// Send a Message with additional data for the current step.
func (r *run) AddStepData(id string, typ DataType, d interface{}) {
	msg := &Message{
		Type:     MSG_TYPE_STEP_DATA,
		StepId:   id,
		Data:     d,
		DataType: typ,
	}
	r.send(msg)
}

// Send a Message indicating that the current step has failed with the given
// error.
func (r *run) Failed(id string, err error) {
	msg := &Message{
		StepId: id,
		Error:  err.Error(),
	}
	if IsInfraError(err) {
		msg.Type = MSG_TYPE_STEP_EXCEPTION
	} else {
		msg.Type = MSG_TYPE_STEP_FAILED
	}
	r.send(msg)
}

// Send a Message indicating that the current step has finished.
func (r *run) Finish(id string) {
	msg := &Message{
		Type:   MSG_TYPE_STEP_FINISHED,
		StepId: id,
	}
	r.send(msg)
}

// cbWriter is a wrapper around an io.Writer which runs a callback on the first
// call to Write().
type cbWriter struct {
	w  io.Writer
	cb func()
}

// See documentation for io.Writer interface.
func (w *cbWriter) Write(b []byte) (int, error) {
	if w.cb != nil {
		w.cb()
		w.cb = nil
	}
	return w.w.Write(b)
}

// Open a log stream.
func (r *run) LogStream(stepId, logName string, severity Severity) io.Writer {
	logId := uuid.New().String() // TODO(borenet): Come up with a better ID.
	w, err := r.receiver.LogStream(stepId, logId, severity)
	if err != nil {
		panic(err)
	}

	// Wrap the io.Writer with a cbWrite so that we only send step data for
	// log streams which are actually used.
	return &cbWriter{
		w: w,
		cb: func() {
			// Emit step data for the log stream.
			r.AddStepData(stepId, DATA_TYPE_LOG, &LogData{
				Name:     logName,
				Id:       logId,
				Severity: severity.String(),
			})
		},
	}
}

// Close the run.
func (r *run) Close() error {
	return r.receiver.Close()
}
