blob: 3daee1276d336a4973d5283df937af5e65ac3bc9 [file] [log] [blame]
package td
import (
"context"
"errors"
"fmt"
"io"
"os"
"strings"
"sync/atomic"
"time"
"github.com/google/uuid"
"go.skia.org/infra/go/auth"
"go.skia.org/infra/go/common"
"go.skia.org/infra/go/exec"
"go.skia.org/infra/go/luciauth"
"go.skia.org/infra/go/sklog"
"go.skia.org/infra/go/sklog/cloud_logging"
"go.skia.org/infra/go/sklog/sklog_impl"
"go.skia.org/infra/go/util"
"golang.org/x/oauth2"
compute "google.golang.org/api/compute/v1"
)
// PubSub topics and Cloud Logging identifiers shared by all Task Drivers.
const (
	// PUBSUB_TOPIC is the PubSub topic name for task driver metadata.
	PUBSUB_TOPIC = "task-driver"
	// PUBSUB_TOPIC_LOGS is the PubSub topic name for task driver logs.
	PUBSUB_TOPIC_LOGS = "task-driver-logs"
	// LOG_ID is the log ID used by all Task Drivers. Logs are also labeled
	// with task ID and step ID, and those labels should be used for
	// filtering in most cases.
	LOG_ID = "task-driver"
)

// STEP_ID_ROOT is the special ID of the root step.
const STEP_ID_ROOT = "root"

// Names of environment variables provided to all Swarming tasks.
const (
	ENVVAR_SWARMING_BOT    = "SWARMING_BOT_ID"
	ENVVAR_SWARMING_SERVER = "SWARMING_SERVER"
	ENVVAR_SWARMING_TASK   = "SWARMING_TASK_ID"
)

// PATH_VAR is the name of the PATH environment variable.
const PATH_VAR = "PATH"
// BASE_ENV lists environment variable assignments which newRun merges into
// the process environment for the root step of every run.
var BASE_ENV = []string{
	"CHROME_HEADLESS=1",
	"GIT_USER_AGENT=git/1.9.1", // I don't think this version matters.
}

// SCOPES are the auth scopes required for all task_drivers.
var SCOPES = []string{compute.CloudPlatformScope}
// RunProperties are properties for a single run of a Task Driver.
//
// Either Local is true and all Swarming fields are empty, or Local is false and
// all Swarming fields are set; see Validate.
type RunProperties struct {
	// Local is true for runs outside of Swarming, e.g. when --local is given or
	// the task is being reproduced via "swarming.py reproduce".
	Local bool `json:"local"`
	// SwarmingBot is the bot ID, taken from the SWARMING_BOT_ID environment
	// variable by StartRunWithErr.
	SwarmingBot string `json:"swarmingBot,omitempty"`
	// SwarmingServer is taken from the SWARMING_SERVER environment variable by
	// StartRunWithErr.
	SwarmingServer string `json:"swarmingServer,omitempty"`
	// SwarmingTask is the task ID, taken from the SWARMING_TASK_ID environment
	// variable by StartRunWithErr.
	SwarmingTask string `json:"swarmingTask,omitempty"`
}
// Validate returns an error if the RunProperties are inconsistent, and nil
// otherwise.
//
// Local runs must leave SwarmingBot, SwarmingServer and SwarmingTask empty.
// Non-local runs must set all three. Fields are checked in the order Bot,
// Server, Task, and only the first violation is reported.
func (p *RunProperties) Validate() error {
	if !p.Local {
		switch {
		case p.SwarmingBot == "":
			return errors.New("SwarmingBot is required for non-local runs!")
		case p.SwarmingServer == "":
			return errors.New("SwarmingServer is required for non-local runs!")
		case p.SwarmingTask == "":
			return errors.New("SwarmingTask is required for non-local runs!")
		}
		return nil
	}

	switch {
	case p.SwarmingBot != "":
		return errors.New("SwarmingBot must be empty for local runs!")
	case p.SwarmingServer != "":
		return errors.New("SwarmingServer must be empty for local runs!")
	case p.SwarmingTask != "":
		return errors.New("SwarmingTask must be empty for local runs!")
	}
	return nil
}
// Copy returns a new RunProperties holding the same values as p, or nil if p
// is nil. All fields are plain values, so a shallow struct copy is a complete
// copy.
func (p *RunProperties) Copy() *RunProperties {
	if p == nil {
		return nil
	}
	dup := *p
	return &dup
}
// StartRunWithErr begins a new Task Driver run. It returns a context that
// represents the run's root step, or an error if setup fails.
//
// The pointer parameters are expected to point at command-line flag values.
// Two of them may be written through:
//   - *local is forced to true when SWARMING_TASK_ID or SWARMING_BOT_ID is
//     "reproduce", which means the command was copied by "swarming.py reproduce".
//   - For local runs, *taskId is replaced by "<hostname>_<uuid>" so that
//     local runs never clobber real task data.
//
// Non-local runs require the SWARMING_BOT_ID, SWARMING_SERVER and
// SWARMING_TASK_ID environment variables, plus a non-empty projectId, taskId
// and taskName. Local runs must not have SWARMING_BOT_ID or SWARMING_TASK_ID
// set. SWARMING_SERVER is tolerated for local runs but is not recorded in the
// RunProperties. Cloud Logging is configured only when projectId, taskId and
// taskName are all non-empty. *output is passed to newReportReceiver.
func StartRunWithErr(projectId, taskId, taskName, output *string, local *bool) (context.Context, error) {
	common.Init()
	// TODO(borenet): Catch SIGINT, SIGKILL and report.

	// Gather RunProperties.
	swarmingBot := os.Getenv(ENVVAR_SWARMING_BOT)
	swarmingServer := os.Getenv(ENVVAR_SWARMING_SERVER)
	swarmingTask := os.Getenv(ENVVAR_SWARMING_TASK)

	// "reproduce" is supplied by "swarming.py reproduce" and indicates that
	// this is actually a local run, but --local won't have been provided
	// because the command was copied directly from the Swarming task.
	if swarmingTask == "reproduce" || swarmingBot == "reproduce" {
		*local = true
		swarmingBot = ""
		swarmingServer = ""
		swarmingTask = ""
	}

	if *local {
		// Check to make sure we're not actually running in production.
		// Note that the presence of SWARMING_SERVER does not indicate
		// that we're running in production, because it can be used with
		// swarming.py as an alternative to --swarming.
		errTmpl := "--local was supplied but %s environment variable was found. Was --local used by accident?"
		switch {
		case swarmingBot != "":
			return nil, fmt.Errorf(errTmpl, ENVVAR_SWARMING_BOT)
		case swarmingTask != "":
			return nil, fmt.Errorf(errTmpl, ENVVAR_SWARMING_TASK)
		}
		// SWARMING_SERVER is tolerated for local runs (see above), but it
		// must not be recorded. RunProperties.Validate rejects local runs
		// with a non-empty SwarmingServer, so keeping it here would make
		// every --local run with SWARMING_SERVER set fail.
		swarmingServer = ""

		// Prevent clobbering real task data for local tasks.
		hostname, err := os.Hostname()
		if err != nil {
			return nil, fmt.Errorf("Failed to determine hostname for local task ID: %w", err)
		}
		*taskId = fmt.Sprintf("%s_%s", hostname, uuid.New())
	} else {
		// Check to make sure that we're not running locally and the
		// user forgot to use --local.
		errTmpl := "--local was not supplied but environment variable %s was not found. Did you forget to use --local?"
		switch {
		case swarmingBot == "":
			return nil, fmt.Errorf(errTmpl, ENVVAR_SWARMING_BOT)
		case swarmingServer == "":
			return nil, fmt.Errorf(errTmpl, ENVVAR_SWARMING_SERVER)
		case swarmingTask == "":
			return nil, fmt.Errorf(errTmpl, ENVVAR_SWARMING_TASK)
		}
	}

	// Validate properties and flags.
	props := &RunProperties{
		Local:          *local,
		SwarmingBot:    swarmingBot,
		SwarmingServer: swarmingServer,
		SwarmingTask:   swarmingTask,
	}
	if err := props.Validate(); err != nil {
		return nil, err
	}
	if !*local {
		switch {
		case *projectId == "":
			return nil, errors.New("Project ID is required.")
		case *taskId == "":
			return nil, errors.New("Task ID is required.")
		case *taskName == "":
			return nil, errors.New("Task name is required.")
		}
	}

	// Create the token source.
	var ts oauth2.TokenSource
	if *local {
		var err error
		ts, err = auth.NewDefaultTokenSource(*local, SCOPES...)
		if err != nil {
			return nil, err
		}
	} else {
		var err error
		ts, err = luciauth.NewLUCIContextTokenSource(SCOPES...)
		if err != nil {
			// %w keeps the underlying error inspectable via errors.Is/As.
			return nil, fmt.Errorf("Failed to obtain LUCI TokenSource: %w", err)
		}
	}

	// Connect receivers.
	receiver := MultiReceiver([]Receiver{
		&DebugReceiver{},
		newReportReceiver(*output),
	})

	// Initialize Cloud Logging.
	ctx := context.Background()
	if *projectId != "" && *taskId != "" && *taskName != "" {
		labels := map[string]string{
			"taskId":   *taskId,
			"taskName": *taskName,
		}
		logger, err := cloud_logging.New(ctx, *projectId, LOG_ID, ts, labels)
		if err != nil {
			return nil, err
		}
		sklog_impl.SetLogger(logger)
		cloudLogging, err := NewCloudLoggingReceiver(logger.Logger())
		if err != nil {
			return nil, err
		}
		receiver = append(receiver, cloudLogging)
	}

	// Dump environment variables.
	// NOTE(review): this logs every environment variable verbatim, including
	// any credentials that happen to be present. Consider filtering.
	sklog.Infof("Environment:\n%s", strings.Join(os.Environ(), "\n"))

	// Set up and return the root-level Step.
	ctx = newRun(ctx, receiver, *taskId, *taskName, props)
	return ctx, nil
}
// StartRun behaves like StartRunWithErr, except that a setup failure is not
// returned. The error is passed to sklog.Fatalf instead, which is expected to
// abort the program.
func StartRun(projectId, taskId, taskName, output *string, local *bool) context.Context {
	ctx, err := StartRunWithErr(projectId, taskId, taskName, output, local)
	if err == nil {
		return ctx
	}
	sklog.Fatalf("Failed task_driver.StartRun(): %s", err)
	return ctx
}
// EndRun performs any cleanup work for the run. It should be deferred in
// main(), e.g.:
//
//	ctx := td.StartRun(...)
//	defer td.EndRun(ctx)
//
// EndRun must itself be the deferred call. recover() only intercepts a panic
// when called directly by a deferred function. It is called here so that any
// panic in progress (or nil if there is none) is handed to finishStep for the
// root step. NOTE(review): whether the panic is re-raised after reporting
// depends on finishStep, which is not visible here.
func EndRun(ctx context.Context) {
	// The run is looked up now (defer evaluates its arguments immediately)
	// and closed after finishStep returns.
	defer util.Close(getCtx(ctx).run)
	// Mark the root step as finished.
	finishStep(ctx, recover())
}
// run represents a full test automation run.
type run struct {
	// receiver handles every Message and log stream produced by the run.
	receiver Receiver
	// taskId is stamped onto every outgoing Message by send.
	taskId string
	// msgIndex is the index of the most recently sent Message. It is only
	// updated through atomic.AddInt32, so each Message gets a unique index,
	// starting at 1.
	msgIndex int32
}
// newRun creates the run state for a Task Driver and returns a context that
// carries it. The returned context represents the root step.
//
// It sends MSG_TYPE_RUN_STARTED with props to rec, then stores the run (and
// exec.DefaultRun) in the context. Finally it starts the root step
// (STEP_ID_ROOT) named taskName, whose environment is the process environment
// merged with BASE_ENV.
func newRun(ctx context.Context, rec Receiver, taskId, taskName string, props *RunProperties) context.Context {
	r := &run{receiver: rec, taskId: taskId}
	r.send(&Message{Type: MSG_TYPE_RUN_STARTED, Run: props})

	runCtx := context.WithValue(ctx, contextKey, &Context{run: r, execRun: exec.DefaultRun})
	rootProps := Props(taskName).Env(MergeEnv(os.Environ(), BASE_ENV))
	return newStep(runCtx, STEP_ID_ROOT, nil, rootProps)
}
// send stamps msg with the next message index, the run's task ID and the
// current UTC time, then passes it to the receiver.
//
// Errors are never returned. A validation failure is logged and the message
// is still delivered, and a delivery failure is only logged.
func (r *run) send(msg *Message) {
	next := atomic.AddInt32(&r.msgIndex, 1)
	msg.Index = int(next)
	msg.TaskId = r.taskId
	msg.Timestamp = time.Now().UTC()

	if vErr := msg.Validate(); vErr != nil {
		sklog.Error(vErr)
	}

	// TODO(borenet): How do we handle this? For now the error is only
	// logged and never returned.
	if hErr := r.receiver.HandleMessage(msg); hErr != nil {
		sklog.Error(hErr)
	}
}
// Start sends a MSG_TYPE_STEP_STARTED message for the step described by props.
func (r *run) Start(props *StepProperties) {
	r.send(&Message{
		Type:   MSG_TYPE_STEP_STARTED,
		StepId: props.Id,
		Step:   props,
	})
}
// AddStepData sends a MSG_TYPE_STEP_DATA message attaching d, tagged with
// typ, to the step with the given id.
func (r *run) AddStepData(id string, typ DataType, d interface{}) {
	r.send(&Message{
		Type:     MSG_TYPE_STEP_DATA,
		StepId:   id,
		Data:     d,
		DataType: typ,
	})
}
// Failed reports that the step with the given id failed with err.
//
// The message type is MSG_TYPE_STEP_EXCEPTION when IsInfraError(err) is true,
// and MSG_TYPE_STEP_FAILED otherwise. err must be non-nil, because its
// Error() text is included in the message.
func (r *run) Failed(id string, err error) {
	msg := &Message{
		Type:   MSG_TYPE_STEP_FAILED,
		StepId: id,
		Error:  err.Error(),
	}
	if IsInfraError(err) {
		msg.Type = MSG_TYPE_STEP_EXCEPTION
	}
	r.send(msg)
}
// Finish sends a MSG_TYPE_STEP_FINISHED message for the step with the given id.
func (r *run) Finish(id string) {
	r.send(&Message{Type: MSG_TYPE_STEP_FINISHED, StepId: id})
}
// cbWriter is a wrapper around an io.Writer which runs a callback on the first
// call to Write().
//
// NOTE(review): Write reads and clears cb without synchronization, so a
// cbWriter should not be written to from multiple goroutines concurrently.
type cbWriter struct {
	// w receives every byte written to the cbWriter.
	w io.Writer
	// cb runs before the first Write and is then set to nil. A nil cb means
	// it has already run (or was never set).
	cb func()
}
// Write implements io.Writer. On the first call it invokes the pending
// callback and then clears it. Every call, including the first, forwards b to
// the wrapped writer and returns its result.
func (w *cbWriter) Write(b []byte) (int, error) {
	if first := w.cb; first != nil {
		first()
		w.cb = nil
	}
	return w.w.Write(b)
}
// LogStream opens a log stream for the given step, with the given name and
// severity, and returns a writer for it.
//
// The stream ID is a freshly generated UUID. The DATA_TYPE_LOG step data that
// advertises the stream is emitted only on the first Write, so log streams
// that are never written to are never announced.
//
// NOTE(review): this panics if the receiver fails to open the stream, because
// the signature gives no way to return the error.
func (r *run) LogStream(stepId, logName string, severity Severity) io.Writer {
	logId := uuid.New().String() // TODO(borenet): Come up with a better ID.
	dst, err := r.receiver.LogStream(stepId, logId, severity)
	if err != nil {
		panic(err)
	}
	announce := func() {
		r.AddStepData(stepId, DATA_TYPE_LOG, &LogData{
			Name:     logName,
			Id:       logId,
			Severity: severity.String(),
		})
	}
	return &cbWriter{w: dst, cb: announce}
}
// Close ends the run by closing its receiver and returns the receiver's error,
// if any.
func (r *run) Close() error {
	rec := r.receiver
	return rec.Close()
}