blob: 2d25d1e9484bfdd529830d2af88ac6cd48de8ffb [file] [log] [blame]
package td
import (
"context"
"errors"
"fmt"
"io"
"os"
"strings"
"time"
"github.com/google/uuid"
"go.skia.org/infra/go/common"
"go.skia.org/infra/go/exec"
"go.skia.org/infra/go/luciauth"
"go.skia.org/infra/go/sklog"
"go.skia.org/infra/go/sklog/cloudlogging"
"go.skia.org/infra/go/util"
"golang.org/x/oauth2"
"golang.org/x/oauth2/google"
compute "google.golang.org/api/compute/v1"
)
// PubsubTopicLogs is the PubSub topic name for task driver logs.
const PubsubTopicLogs = "task-driver-logs"

// Identifiers used when reporting steps and logs.
const (
	// StepIDRoot is the fixed ID of the root step.
	StepIDRoot = "root"

	// logID is the StackDriver log ID shared by all Task Drivers. Log
	// entries carry task and step labels as well, and those labels are
	// what most filtering should be based on.
	logID = "task-driver"
)

// Names of environment variables consulted by the Task Driver framework.
const (
	// Provided by Swarming to every task it runs.
	envVarSwarmingBot    = "SWARMING_BOT_ID"
	envVarSwarmingServer = "SWARMING_SERVER"
	envVarSwarmingTask   = "SWARMING_TASK_ID"

	// envVarPath names the PATH environment variable.
	envVarPath = "PATH"

	// EnvVarWrappedStepID marks a Task Driver as nested inside another one;
	// its value is the ID of the parent step to nest under.
	EnvVarWrappedStepID = "TASK_DRIVER_WRAPPED_STEP_ID"
)
// BaseEnv is the basic set of environment variables provided to all steps.
var BaseEnv = []string{
	"CHROME_HEADLESS=1",
	// The exact git version below is not believed to matter.
	"GIT_USER_AGENT=git/1.9.1",
}

// authScopes are the OAuth2 scopes requested for the token source of every
// Task Driver run.
var authScopes = []string{compute.CloudPlatformScope}
// RunProperties are properties for a single run of a Task Driver.
//
// For local runs every Swarming field must be empty; for non-local runs every
// Swarming field is required (see Validate).
type RunProperties struct {
	// Local is true when the run is not happening inside a Swarming task
	// (i.e. --local was supplied or a "reproduce" run was detected).
	Local bool `json:"local"`
	// SwarmingBot is the bot ID, read from SWARMING_BOT_ID by StartRunWithErr.
	SwarmingBot string `json:"swarmingBot,omitempty"`
	// SwarmingServer is read from SWARMING_SERVER by StartRunWithErr.
	SwarmingServer string `json:"swarmingServer,omitempty"`
	// SwarmingTask is the task ID, read from SWARMING_TASK_ID by StartRunWithErr.
	SwarmingTask string `json:"swarmingTask,omitempty"`
}
// Validate implements util.Validator.
//
// Non-local runs must have all of SwarmingBot, SwarmingServer and SwarmingTask
// set. Local runs must have all three empty. The first violated rule, checked
// in field order, is reported.
func (p *RunProperties) Validate() error {
	if !p.Local {
		switch {
		case p.SwarmingBot == "":
			return errors.New("SwarmingBot is required for non-local runs!")
		case p.SwarmingServer == "":
			return errors.New("SwarmingServer is required for non-local runs!")
		case p.SwarmingTask == "":
			return errors.New("SwarmingTask is required for non-local runs!")
		}
		return nil
	}
	switch {
	case p.SwarmingBot != "":
		return errors.New("SwarmingBot must be empty for local runs!")
	case p.SwarmingServer != "":
		return errors.New("SwarmingServer must be empty for local runs!")
	case p.SwarmingTask != "":
		return errors.New("SwarmingTask must be empty for local runs!")
	}
	return nil
}
// Copy returns an independent copy of the RunProperties, or nil if p is nil.
func (p *RunProperties) Copy() *RunProperties {
	if p == nil {
		return nil
	}
	// All fields are plain values (a bool and strings), so copying the struct
	// value yields a fully independent copy.
	dup := *p
	return &dup
}
// StartRunWithErr begins a new test automation run, returning any error which
// occurs during setup.
//
// The pointer arguments are typically flag values, and some of them may be
// modified:
//   - *local is set to true when SWARMING_TASK_ID or SWARMING_BOT_ID is
//     "reproduce" (a "swarming.py reproduce" run).
//   - *taskId is replaced with "<hostname>_<uuid>" for local runs so that local
//     results never clobber real task data.
//
// Non-local runs require projectId, taskId and taskName, and require the
// SWARMING_BOT_ID, SWARMING_SERVER and SWARMING_TASK_ID environment variables.
// Cloud Logging is only enabled when projectId, taskId and taskName are all
// non-empty. Note that the full process environment is written to the log.
func StartRunWithErr(projectId, taskId, taskName, output *string, local *bool) (context.Context, error) {
	common.Init()
	// TODO(borenet): Catch SIGINT, SIGKILL and report.

	ctx := context.Background()

	// Gather RunProperties.
	swarmingBot := os.Getenv(envVarSwarmingBot)
	swarmingServer := os.Getenv(envVarSwarmingServer)
	swarmingTask := os.Getenv(envVarSwarmingTask)

	// "reproduce" is supplied by "swarming.py reproduce" and indicates that
	// this is actually a local run, but --local won't have been provided
	// because the command was copied directly from the Swarming task.
	if swarmingTask == "reproduce" || swarmingBot == "reproduce" {
		*local = true
		swarmingBot = ""
		swarmingServer = ""
		swarmingTask = ""
	}

	if *local {
		// Check to make sure we're not actually running in production.
		// Note that the presence of SWARMING_SERVER does not indicate
		// that we're running in production, because it can be used with
		// swarming.py as an alternative to --swarming.
		errTmpl := "--local was supplied but %s environment variable was found. Was --local used by accident?"
		switch {
		case swarmingBot != "":
			return nil, fmt.Errorf(errTmpl, envVarSwarmingBot)
		case swarmingTask != "":
			return nil, fmt.Errorf(errTmpl, envVarSwarmingTask)
		}
		// SWARMING_SERVER is tolerated above, but RunProperties.Validate
		// rejects a non-empty SwarmingServer for local runs. Drop it (as the
		// "reproduce" case does) so such local runs don't fail validation.
		swarmingServer = ""

		// Prevent clobbering real task data for local tasks.
		hostname, err := os.Hostname()
		if err != nil {
			return nil, err
		}
		*taskId = fmt.Sprintf("%s_%s", hostname, uuid.New())
	} else {
		// Check to make sure that we're not running locally and the
		// user forgot to use --local.
		errTmpl := "--local was not supplied but environment variable %s was not found. Did you forget to use --local?"
		switch {
		case swarmingBot == "":
			return nil, fmt.Errorf(errTmpl, envVarSwarmingBot)
		case swarmingServer == "":
			return nil, fmt.Errorf(errTmpl, envVarSwarmingServer)
		case swarmingTask == "":
			return nil, fmt.Errorf(errTmpl, envVarSwarmingTask)
		}
	}

	// Validate properties and flags.
	props := &RunProperties{
		Local:          *local,
		SwarmingBot:    swarmingBot,
		SwarmingServer: swarmingServer,
		SwarmingTask:   swarmingTask,
	}
	if err := props.Validate(); err != nil {
		return nil, err
	}
	if !*local {
		switch {
		case *projectId == "":
			return nil, errors.New("Project ID is required.")
		case *taskId == "":
			return nil, errors.New("Task ID is required.")
		case *taskName == "":
			return nil, errors.New("Task name is required.")
		}
	}

	// Create the token source: application default credentials for local
	// runs, the LUCI context for runs inside Swarming.
	var ts oauth2.TokenSource
	if *local {
		var err error
		ts, err = google.DefaultTokenSource(ctx, authScopes...)
		if err != nil {
			return nil, fmt.Errorf("Failed to obtain default TokenSource: %w", err)
		}
	} else {
		var err error
		ts, err = luciauth.NewLUCIContextTokenSource(authScopes...)
		if err != nil {
			return nil, fmt.Errorf("Failed to obtain LUCI TokenSource: %w", err)
		}
	}

	// Connect receivers.
	receiver := MultiReceiver([]Receiver{
		&DebugReceiver{},
		newReportReceiver(*output),
	})

	// Initialize Cloud Logging.
	if *projectId != "" && *taskId != "" && *taskName != "" {
		labels := map[string]string{
			"taskId":   *taskId,
			"taskName": *taskName,
		}
		logger, err := cloudlogging.New(ctx, *projectId, logID, ts, labels)
		if err != nil {
			return nil, err
		}
		cloudLogging, err := NewCloudLoggingReceiver(logger.Logger())
		if err != nil {
			return nil, err
		}
		receiver = append(receiver, cloudLogging)
	}

	// Dump environment variables.
	sklog.Infof("Environment:\n%s", strings.Join(os.Environ(), "\n"))

	// Set up and return the root-level Step.
	ctx = newRun(ctx, receiver, *taskId, *taskName, props)
	return ctx, nil
}
// StartRun begins a new test automation run. It behaves like StartRunWithErr,
// except that a setup failure is reported through sklog.Fatalf rather than
// returned to the caller.
// NOTE(review): sklog.Fatalf is presumably expected to terminate the process.
func StartRun(projectId, taskId, taskName, output *string, local *bool) context.Context {
	ctx, err := StartRunWithErr(projectId, taskId, taskName, output, local)
	if err == nil {
		return ctx
	}
	sklog.Fatalf("Failed task_driver.StartRun(): %s", err)
	return ctx
}
// EndRun performs any cleanup work for the run. Should be deferred in main().
//
// EndRun must itself be the deferred call (e.g. "defer td.EndRun(ctx)"),
// because it calls recover() directly. recover only stops a panic when it is
// called directly by a deferred function. Any recovered panic value is passed
// to finishStep for the root step.
func EndRun(ctx context.Context) {
	// The run is looked up now, but util.Close runs at return, i.e. only
	// after the root step below has been marked finished.
	defer util.Close(getCtx(ctx).run)
	// Mark the root step as finished. recover() yields nil when no panic is
	// in progress.
	finishStep(ctx, recover())
}
// run represents a full test automation run.
type run struct {
	// receiver gets every Message passed to send, and opens log streams
	// for LogStream. Close closes it.
	receiver Receiver
	// taskId is stamped onto every Message passed to send.
	taskId string
}
// newRun returns a context.Context representing a Task Driver run, including
// creation of a root step.
//
// The returned context carries a *Context holding the new run and the exec
// run function obtained from ctx. The run delivers messages to rec and stamps
// them with taskId.
//
// If EnvVarWrappedStepID is present in the environment, even with an empty
// value, this Task Driver is treated as nested inside another one:
//   - no RUN_STARTED message is sent, so props is not reported;
//   - the root step takes the wrapped step ID.
//
// Otherwise a RUN_STARTED message carrying props is sent and a new root step
// with ID StepIDRoot is started.
func newRun(ctx context.Context, rec Receiver, taskId, taskName string, props *RunProperties) context.Context {
	r := &run{
		receiver: rec,
		taskId:   taskId,
	}
	// Carry the exec run function from the incoming context into the Task
	// Driver Context. NOTE(review): presumably this preserves any exec
	// override installed on ctx (e.g. by tests) — confirm.
	execRun := exec.GetRunFn(ctx)
	ctx = context.WithValue(ctx, contextKey, &Context{
		run:     r,
		execRun: execRun,
	})
	// The root step's environment: the process environment combined with
	// BaseEnv via MergeEnv.
	env := MergeEnv(os.Environ(), BaseEnv)
	if wrappedStepID, ok := os.LookupEnv(EnvVarWrappedStepID); ok {
		// We don't send a RUN_STARTED message, or a STEP_STARTED message for
		// the root step, because we want all of the steps for this task driver
		// to nest under the step given by the environment variable. Instead,
		// set up a context which uses the wrapped step ID.
		rootProps := &StepProperties{
			Id:      wrappedStepID,
			Name:    taskName,
			Environ: env,
		}
		ctx = withChildCtx(ctx, &Context{
			step: rootProps,
		})
		// ReportReceiver needs a root step for its tree structure. Send a
		// message to any ReportReceivers (but not to any others) to create the
		// root step.
		// This message bypasses run.send, so it is not stamped with an ID,
		// TaskId or Timestamp. Its StepId is StepIDRoot, while the step it
		// describes has Id wrappedStepID.
		msg := &Message{
			Type:   MsgType_StepStarted,
			StepId: StepIDRoot,
			Step:   rootProps,
		}
		// sendMsg walks rec, descending recursively into MultiReceivers, and
		// delivers msg only to *ReportReceiver values; any other receiver
		// type is skipped.
		var sendMsg func(Receiver)
		sendMsg = func(rec Receiver) {
			if multi, ok := rec.(MultiReceiver); ok {
				for _, elem := range multi {
					sendMsg(elem)
				}
			} else if report, ok := rec.(*ReportReceiver); ok {
				// Best effort: an error from the ReportReceiver is discarded.
				_ = report.HandleMessage(msg)
			}
		}
		sendMsg(rec)
	} else {
		// Normal (non-wrapped) run: announce the run to all receivers, then
		// start the root step with the fixed ID StepIDRoot.
		r.send(&Message{
			Type: MsgType_RunStarted,
			Run:  props,
		})
		ctx = newStep(ctx, StepIDRoot, nil, Props(taskName).Env(env))
	}
	return ctx
}
// send stamps msg with a fresh unique ID, the run's task ID and the current
// UTC time, then hands it to the receiver. Nothing is returned: a validation
// failure is logged but the message is still delivered, and a delivery
// failure is only logged.
func (r *run) send(msg *Message) {
	msg.ID, msg.TaskId, msg.Timestamp = uuid.New().String(), r.taskId, time.Now().UTC()

	if invalid := msg.Validate(); invalid != nil {
		sklog.Error(invalid)
	}

	// Delivery is best effort; report failures in the log only.
	// TODO(borenet): How do we handle this?
	if deliveryErr := r.receiver.HandleMessage(msg); deliveryErr != nil {
		sklog.Error(deliveryErr)
	}
}
// Start sends a STEP_STARTED Message for the step described by props, keyed by
// the step's own ID.
func (r *run) Start(props *StepProperties) {
	r.send(&Message{
		Type:   MsgType_StepStarted,
		StepId: props.Id,
		Step:   props,
	})
}
// AddStepData sends a STEP_DATA Message attaching d, tagged with data type
// typ, to the step with the given ID.
func (r *run) AddStepData(id string, typ DataType, d interface{}) {
	r.send(&Message{
		Type:     MsgType_StepData,
		StepId:   id,
		DataType: typ,
		Data:     d,
	})
}
// Failed reports that the step with the given ID ended with err. Infra errors
// (per IsInfraError) are sent as STEP_EXCEPTION and all others as STEP_FAILED.
// err must be non-nil.
func (r *run) Failed(id string, err error) {
	msg := &Message{
		StepId: id,
		Error:  err.Error(),
		Type:   MsgType_StepFailed,
	}
	if IsInfraError(err) {
		msg.Type = MsgType_StepException
	}
	r.send(msg)
}
// Finish sends a STEP_FINISHED Message for the step with the given ID.
func (r *run) Finish(id string) {
	r.send(&Message{
		Type:   MsgType_StepFinished,
		StepId: id,
	})
}
// cbWriter wraps an io.Writer and runs cb on the first call to Write, just
// before that first write is forwarded. Afterwards it is a plain pass-through.
// A nil cb is allowed.
type cbWriter struct {
	w  io.Writer
	cb func()
}

// Write implements io.Writer. If a callback is still pending it is run and
// then cleared; the bytes are always forwarded to the wrapped writer.
func (w *cbWriter) Write(b []byte) (int, error) {
	if pending := w.cb; pending != nil {
		pending()
		w.cb = nil
	}
	return w.w.Write(b)
}
// LogStream opens a log stream with the given severity for the step stepId
// and returns a writer for it. It panics if the receiver cannot open the
// stream. The stream is announced as DataType_Log step data, under logName,
// only when something is first written to it, so unused streams are never
// reported.
func (r *run) LogStream(stepId, logName string, severity Severity) io.Writer {
	// TODO(borenet): Come up with a better ID.
	streamID := uuid.New().String()
	dst, err := r.receiver.LogStream(stepId, streamID, severity)
	if err != nil {
		panic(err)
	}
	announce := func() {
		r.AddStepData(stepId, DataType_Log, &LogData{
			Name:     logName,
			Id:       streamID,
			Severity: severity.String(),
		})
	}
	return &cbWriter{w: dst, cb: announce}
}
// Close closes the run's receiver and returns whatever error it reports.
func (r *run) Close() error {
	rec := r.receiver
	return rec.Close()
}