blob: 994b9f2ff1f16f6d1124971d5aedb3dc8f810fc0 [file] [log] [blame]
/*
The Cluster Telemetry poller checks for new pending tasks by polling the Cluster Telemetry
frontend (CTFE). Pending tasks are picked up in the order they were added to CTFE. When a
task is picked up, the local checkout is updated and rebuilt and the task is then started in
the background, so multiple tasks may be running at the same time.
*/
package main
import (
"flag"
"fmt"
"io/ioutil"
"os"
"path/filepath"
"strconv"
"strings"
"sync"
"time"
"go.skia.org/infra/go/sklog"
"go.skia.org/infra/ct/go/ctfe/admin_tasks"
"go.skia.org/infra/ct/go/ctfe/capture_skps"
"go.skia.org/infra/ct/go/ctfe/chromium_analysis"
"go.skia.org/infra/ct/go/ctfe/chromium_builds"
"go.skia.org/infra/ct/go/ctfe/chromium_perf"
"go.skia.org/infra/ct/go/ctfe/lua_scripts"
"go.skia.org/infra/ct/go/ctfe/task_common"
"go.skia.org/infra/ct/go/frontend"
"go.skia.org/infra/ct/go/master_scripts/master_common"
ctutil "go.skia.org/infra/ct/go/util"
"go.skia.org/infra/go/common"
"go.skia.org/infra/go/exec"
"go.skia.org/infra/go/metrics2"
skutil "go.skia.org/infra/go/util"
)
// Command-line flags.
var (
	dryRun       = flag.Bool("dry_run", false, "If true, just log the commands that would be executed; don't actually execute the commands. Still polls CTFE for pending tasks, but does not post updates.")
	promPort     = flag.String("prom_port", ":20000", "Metrics service address (e.g., ':20000')")
	pollInterval = flag.Duration("poll_interval", 30*time.Second, "How often to poll CTFE for new pending tasks.")
)

// Process-wide state shared between the polling loop and the task goroutines.
var (
	// repoMtx serializes updating and building of the local checkout.
	repoMtx sync.Mutex
	// pickedUpTasks records picked-up tasks so that the same task is not picked up more
	// than once. Only key presence matters; the stored value is unused.
	pickedUpTasks = map[string]string{}
	// tasksMtx guards every access to pickedUpTasks.
	tasksMtx sync.Mutex
)
// TaskType enumerates the states that the poller could be in. It satisfies the
// fmt.Stringer interface.
type TaskType uint32

const (
	IDLE TaskType = iota
	UPDATE_AND_BUILD
	CHROMIUM_PERF
	CAPTURE_SKPS
	LUA_SCRIPT
	CHROMIUM_BUILD
	RECREATE_PAGE_SETS
	RECREATE_WEBPAGE_ARCHIVES
	CHROMIUM_ANALYSIS
	POLL
)

// taskTypeNames holds the String() form of every TaskType, indexed by the TaskType value.
var taskTypeNames = [...]string{
	IDLE:                      "IDLE",
	UPDATE_AND_BUILD:          "UPDATE_AND_BUILD",
	CHROMIUM_PERF:             "CHROMIUM_PERF",
	CAPTURE_SKPS:              "CAPTURE_SKPS",
	LUA_SCRIPT:                "LUA_SCRIPT",
	CHROMIUM_BUILD:            "CHROMIUM_BUILD",
	RECREATE_PAGE_SETS:        "RECREATE_PAGE_SETS",
	RECREATE_WEBPAGE_ARCHIVES: "RECREATE_WEBPAGE_ARCHIVES",
	CHROMIUM_ANALYSIS:         "CHROMIUM_ANALYSIS",
	POLL:                      "POLL",
}

// String returns the name of the constant t, or "(unknown)" if t is not one of the
// constants above.
func (t TaskType) String() string {
	if t < TaskType(len(taskTypeNames)) {
		return taskTypeNames[t]
	}
	return "(unknown)"
}
// updateAndBuild brings the local checkout in ctutil.CtTreeDir up to date and rebuilds it
// by running "git pull" followed by "make all". repoMtx is held throughout, so at most one
// update/build runs at a time. The error of the first failing command is returned, and any
// remaining command is skipped.
func updateAndBuild() error {
	repoMtx.Lock()
	defer repoMtx.Unlock()
	treeDir := ctutil.CtTreeDir
	// TODO(benjaminwagner): Should this also do 'go get -u ...' and/or 'gclient sync'?
	steps := []*exec.Command{
		{
			Name:      "git",
			Args:      []string{"pull"},
			Dir:       treeDir,
			Timeout:   ctutil.GIT_PULL_TIMEOUT,
			LogStdout: true,
			LogStderr: true,
		},
		{
			Name:      "make",
			Args:      []string{"all"},
			Dir:       treeDir,
			Timeout:   ctutil.MAKE_ALL_TIMEOUT,
			LogStdout: true,
			LogStderr: true,
		},
	}
	for _, step := range steps {
		if err := exec.Run(step); err != nil {
			return err
		}
	}
	return nil
}
// Task is the set of methods the poller needs from every kind of CTFE task it can run.
type Task interface {
	// Execute writes any files required by the task and then runs the task's command
	// via exec.Run.
	Execute() error
	// GetTaskName returns the task type's name. The poller combines it with the task ID
	// to recognize tasks it has already picked up.
	GetTaskName() string
	// GetCommonCols returns the columns shared by all task types (for example the task
	// ID and the submitting user's username).
	GetCommonCols() *task_common.CommonCols
	// GetUpdateTaskVars returns a new, unpopulated UpdateTaskVars of the type matching
	// this Task.
	GetUpdateTaskVars() task_common.UpdateTaskVars
}
// runId generates a hopefully-unique ID for this execution of task. The ID is the part of
// the submitter's username before the first "@" (or the whole username if there is no
// "@"), then "-", then ctutil.GetCurrentTs().
func runId(task Task) string {
	// TODO(benjaminwagner): May need a way to ensure that run IDs are unique. It is currently
	// possible, though very unlikely, for GetOldestPendingTaskV2 to take
	// ~(pollInterval - 1 second) and for the returned task to fail very quickly, in which case
	// the next task could start within the same second as the first task.
	user := task.GetCommonCols().Username
	if at := strings.Index(user, "@"); at >= 0 {
		user = user[:at]
	}
	return user + "-" + ctutil.GetCurrentTs()
}
// ChromiumAnalysisTask embeds chromium_analysis.DBTask so that poller methods such as
// Execute can be defined on it.
type ChromiumAnalysisTask struct {
	chromium_analysis.DBTask
}

// Execute writes the task's patches and custom webpages to files in os.TempDir() named
// "<run ID><suffix>", then runs run_chromium_analysis_on_workers with the task's parameters.
// Every file written is removed before Execute returns, including on an early write error.
// NOTE(review): the workers script presumably locates these files via --run_id; confirm.
func (task *ChromiumAnalysisTask) Execute() error {
	id := runId(task)
	inputs := map[string]string{
		".chromium.patch":      task.ChromiumPatch,
		".catapult.patch":      task.CatapultPatch,
		".benchmark.patch":     task.BenchmarkPatch,
		".custom_webpages.csv": task.CustomWebpages,
	}
	for suffix, contents := range inputs {
		path := filepath.Join(os.TempDir(), id+suffix)
		// git sometimes rejects patches that lack a trailing newline, so always add one.
		if err := ioutil.WriteFile(path, []byte(contents+"\n"), 0666); err != nil {
			return err
		}
		// Deferred on purpose: the files must still exist while the command below runs.
		defer skutil.Remove(path)
	}
	args := []string{
		"--emails=" + task.Username,
		"--description=" + task.Description,
		"--gae_task_id=" + strconv.FormatInt(task.Id, 10),
		"--pageset_type=" + task.PageSets,
		"--benchmark_name=" + task.Benchmark,
		"--benchmark_extra_args=" + task.BenchmarkArgs,
		"--browser_extra_args=" + task.BrowserArgs,
		"--run_in_parallel=" + strconv.FormatBool(task.RunInParallel),
		"--target_platform=" + task.Platform,
		"--run_on_gce=" + strconv.FormatBool(task.RunOnGCE),
		"--run_id=" + id,
		"--logtostderr",
		"--log_id=" + id,
		"--local=" + strconv.FormatBool(*master_common.Local),
	}
	return exec.Run(&exec.Command{Name: "run_chromium_analysis_on_workers", Args: args})
}
// ChromiumPerfTask embeds chromium_perf.DBTask so that poller methods such as Execute can
// be defined on it.
type ChromiumPerfTask struct {
	chromium_perf.DBTask
}

// Execute writes the task's patches and custom webpages to files in os.TempDir() named
// "<run ID><suffix>", then runs run_chromium_perf_on_workers with the task's parameters.
// Every file written is removed before Execute returns, including on an early write error.
func (task *ChromiumPerfTask) Execute() error {
	id := runId(task)
	// TODO(benjaminwagner): Since run_chromium_perf_on_workers only reads these in order to
	// upload to Google Storage, eventually we should move the upload step here to avoid writing
	// to disk.
	inputs := map[string]string{
		".chromium.patch":      task.ChromiumPatch,
		".skia.patch":          task.SkiaPatch,
		".catapult.patch":      task.CatapultPatch,
		".benchmark.patch":     task.BenchmarkPatch,
		".custom_webpages.csv": task.CustomWebpages,
	}
	for suffix, contents := range inputs {
		path := filepath.Join(os.TempDir(), id+suffix)
		// git sometimes rejects patches that lack a trailing newline, so always add one.
		if err := ioutil.WriteFile(path, []byte(contents+"\n"), 0666); err != nil {
			return err
		}
		// Deferred on purpose: the files must still exist while the command below runs.
		defer skutil.Remove(path)
	}
	args := []string{
		"--emails=" + task.Username,
		"--description=" + task.Description,
		"--gae_task_id=" + strconv.FormatInt(task.Id, 10),
		"--pageset_type=" + task.PageSets,
		"--benchmark_name=" + task.Benchmark,
		"--benchmark_extra_args=" + task.BenchmarkArgs,
		"--browser_extra_args_nopatch=" + task.BrowserArgsNoPatch,
		"--browser_extra_args_withpatch=" + task.BrowserArgsWithPatch,
		"--repeat_benchmark=" + strconv.FormatInt(task.RepeatRuns, 10),
		"--run_in_parallel=" + strconv.FormatBool(task.RunInParallel),
		"--target_platform=" + task.Platform,
		"--run_id=" + id,
		"--logtostderr",
		"--log_id=" + id,
		"--local=" + strconv.FormatBool(*master_common.Local),
	}
	return exec.Run(&exec.Command{Name: "run_chromium_perf_on_workers", Args: args})
}
// CaptureSkpsTask embeds capture_skps.DBTask so that poller methods such as Execute can be
// defined on it.
type CaptureSkpsTask struct {
	capture_skps.DBTask
}

// Execute runs capture_skps_on_workers on Linux against the Chromium build identified by
// the task's Chromium and Skia revisions.
func (task *CaptureSkpsTask) Execute() error {
	id := runId(task)
	args := []string{
		"--emails=" + task.Username,
		"--description=" + task.Description,
		"--gae_task_id=" + strconv.FormatInt(task.Id, 10),
		"--pageset_type=" + task.PageSets,
		"--chromium_build=" + ctutil.ChromiumBuildDir(task.ChromiumRev, task.SkiaRev, ""),
		"--target_platform=Linux",
		"--run_id=" + id,
		"--logtostderr",
		"--log_id=" + id,
		"--local=" + strconv.FormatBool(*master_common.Local),
	}
	return exec.Run(&exec.Command{Name: "capture_skps_on_workers", Args: args})
}
// LuaScriptTask embeds lua_scripts.DBTask so that poller methods such as Execute can be
// defined on it.
type LuaScriptTask struct {
	lua_scripts.DBTask
}

// Execute writes the Lua script, and the aggregator script if the task has one, to
// "<run ID>.lua" and "<run ID>.aggregator" in os.TempDir(). It then runs run_lua_on_workers
// against the Chromium build identified by the task's Chromium and Skia revisions. The
// files are removed before Execute returns.
func (task *LuaScriptTask) Execute() error {
	id := runId(task)
	buildDir := ctutil.ChromiumBuildDir(task.ChromiumRev, task.SkiaRev, "")
	// TODO(benjaminwagner): Since run_lua_on_workers only reads the lua script in order to
	// upload to Google Storage, eventually we should move the upload step here to avoid writing
	// to disk. Not sure if we can/should do the same for the aggregator script.
	type scriptFile struct {
		suffix, contents string
	}
	scripts := []scriptFile{{".lua", task.LuaScript}}
	if task.LuaAggregatorScript != "" {
		scripts = append(scripts, scriptFile{".aggregator", task.LuaAggregatorScript})
	}
	for _, s := range scripts {
		path := filepath.Join(os.TempDir(), id+s.suffix)
		if err := ioutil.WriteFile(path, []byte(s.contents), 0666); err != nil {
			return err
		}
		// Deferred on purpose: the files must still exist while the command below runs.
		defer skutil.Remove(path)
	}
	args := []string{
		"--emails=" + task.Username,
		"--description=" + task.Description,
		"--gae_task_id=" + strconv.FormatInt(task.Id, 10),
		"--pageset_type=" + task.PageSets,
		"--chromium_build=" + buildDir,
		"--run_id=" + id,
		"--logtostderr",
		"--log_id=" + id,
		"--local=" + strconv.FormatBool(*master_common.Local),
	}
	return exec.Run(&exec.Command{Name: "run_lua_on_workers", Args: args})
}
// ChromiumBuildTask embeds chromium_builds.DBTask so that poller methods such as Execute
// can be defined on it.
type ChromiumBuildTask struct {
	chromium_builds.DBTask
}

// Execute runs build_chromium for Linux at the task's Chromium and Skia revisions.
func (task *ChromiumBuildTask) Execute() error {
	id := runId(task)
	cmd := &exec.Command{Name: "build_chromium"}
	cmd.Args = []string{
		"--emails=" + task.Username,
		"--gae_task_id=" + strconv.FormatInt(task.Id, 10),
		"--run_id=" + id,
		"--target_platform=Linux",
		"--chromium_hash=" + task.ChromiumRev,
		"--skia_hash=" + task.SkiaRev,
		"--logtostderr",
		"--log_id=" + id,
		"--local=" + strconv.FormatBool(*master_common.Local),
	}
	return exec.Run(cmd)
}
// RecreatePageSetsTask embeds admin_tasks.RecreatePageSetsDBTask so that poller methods
// such as Execute can be defined on it.
type RecreatePageSetsTask struct {
	admin_tasks.RecreatePageSetsDBTask
}

// Execute runs create_pagesets_on_workers for the task's page set type.
func (task *RecreatePageSetsTask) Execute() error {
	id := runId(task)
	cmd := &exec.Command{Name: "create_pagesets_on_workers"}
	cmd.Args = []string{
		"--emails=" + task.Username,
		"--gae_task_id=" + strconv.FormatInt(task.Id, 10),
		"--run_id=" + id,
		"--pageset_type=" + task.PageSets,
		"--logtostderr",
		"--log_id=" + id,
		"--local=" + strconv.FormatBool(*master_common.Local),
	}
	return exec.Run(cmd)
}
// RecreateWebpageArchivesTask embeds admin_tasks.RecreateWebpageArchivesDBTask so that
// poller methods such as Execute can be defined on it.
type RecreateWebpageArchivesTask struct {
	admin_tasks.RecreateWebpageArchivesDBTask
}

// Execute runs capture_archives_on_workers for the task's page set type.
func (task *RecreateWebpageArchivesTask) Execute() error {
	id := runId(task)
	cmd := &exec.Command{Name: "capture_archives_on_workers"}
	cmd.Args = []string{
		"--emails=" + task.Username,
		"--gae_task_id=" + strconv.FormatInt(task.Id, 10),
		"--run_id=" + id,
		"--pageset_type=" + task.PageSets,
		"--logtostderr",
		"--log_id=" + id,
		"--local=" + strconv.FormatBool(*master_common.Local),
	}
	return exec.Run(cmd)
}
// asPollerTask wraps a task_common.Task in the matching poller Task type. It returns nil
// when otherTask is nil, and also (after logging an error) when otherTask's concrete type
// has no poller counterpart.
func asPollerTask(otherTask task_common.Task) Task {
	var wrapped Task
	switch dbTask := otherTask.(type) {
	case nil:
		// No task supplied; leave wrapped nil.
	case *chromium_perf.DBTask:
		wrapped = &ChromiumPerfTask{DBTask: *dbTask}
	case *capture_skps.DBTask:
		wrapped = &CaptureSkpsTask{DBTask: *dbTask}
	case *lua_scripts.DBTask:
		wrapped = &LuaScriptTask{DBTask: *dbTask}
	case *chromium_builds.DBTask:
		wrapped = &ChromiumBuildTask{DBTask: *dbTask}
	case *admin_tasks.RecreatePageSetsDBTask:
		wrapped = &RecreatePageSetsTask{RecreatePageSetsDBTask: *dbTask}
	case *admin_tasks.RecreateWebpageArchivesDBTask:
		wrapped = &RecreateWebpageArchivesTask{RecreateWebpageArchivesDBTask: *dbTask}
	case *chromium_analysis.DBTask:
		wrapped = &ChromiumAnalysisTask{DBTask: *dbTask}
	default:
		sklog.Errorf("Missing case for %T in asPollerTask", otherTask)
	}
	return wrapped
}
// updateWebappTaskSetFailed tells the frontend that task failed. It sends a fresh
// UpdateTaskVars for the task's type, carrying the task's ID and marked as completed
// unsuccessfully. The error from frontend.UpdateWebappTaskV2 is returned.
func updateWebappTaskSetFailed(task Task) error {
	vars := task.GetUpdateTaskVars()
	commonVars := vars.GetUpdateTaskCommonVars()
	commonVars.Id = task.GetCommonCols().Id
	commonVars.SetCompleted(false)
	return frontend.UpdateWebappTaskV2(vars)
}
// pollAndExecOnce looks for the oldest pending task in CTFE. If there is one that this
// poller has not already picked up, the local checkout is synced and built, and the task
// is then started in a goroutine. The function returns without waiting for the task to
// finish. It returns the WaitGroup tracking that goroutine, which the caller may Wait on
// to block until the task completes. If no task was started, the returned WaitGroup has
// nothing to wait for.
//
// A task stays in pickedUpTasks only while this poller is working on it. If syncing or
// building the checkout fails, the task is released again so that a later poll retries it
// instead of skipping it forever.
func pollAndExecOnce() *sync.WaitGroup {
	wg := &sync.WaitGroup{}
	pending, err := frontend.GetOldestPendingTaskV2()
	if err != nil {
		sklog.Error(err)
		return wg
	}
	task := asPollerTask(pending)
	if task == nil {
		return wg
	}
	taskName, id := task.GetTaskName(), task.GetCommonCols().Id
	key := fmt.Sprintf("%s.%d", taskName, id)

	// Check and claim the task inside one critical section so that the same task can
	// never be claimed twice.
	tasksMtx.Lock()
	_, alreadyPickedUp := pickedUpTasks[key]
	if !alreadyPickedUp {
		pickedUpTasks[key] = "1"
	}
	tasksMtx.Unlock()
	if alreadyPickedUp {
		return wg
	}
	release := func() {
		tasksMtx.Lock()
		delete(pickedUpTasks, key)
		tasksMtx.Unlock()
	}

	sklog.Infof("Preparing to execute task %s %d", taskName, id)
	if err := updateAndBuild(); err != nil {
		sklog.Error(err)
		// Without this the task would remain claimed but never run, so every later poll
		// would skip it.
		release()
		return wg
	}

	sklog.Infof("Executing task %s %d", taskName, id)
	wg.Add(1)
	go func() {
		// Deferred calls run in reverse order: the task is released before wg.Done, so a
		// caller returning from wg.Wait() sees it removed from pickedUpTasks.
		defer wg.Done()
		defer release()
		// Use a goroutine-local err rather than writing the enclosing function's variable.
		if err := task.Execute(); err != nil {
			sklog.Errorf("Task %s %d failed: %v", taskName, id, err)
			if !*dryRun {
				if err := updateWebappTaskSetFailed(task); err != nil {
					sklog.Error(err)
				}
			}
			return
		}
		sklog.Infof("Completed task %s %d", taskName, id)
	}()
	return wg
}
// main initializes logging and metrics, installs a logging-only command runner when
// --dry_run is set, and then polls CTFE once immediately and again every --poll_interval,
// updating the "healthy" gauge on each tick.
func main() {
	defer common.LogPanic()
	master_common.InitWithMetrics2("ct-poller", promPort)

	if *dryRun {
		// Replace real command execution with logging of the command line.
		logOnly := func(command *exec.Command) error {
			sklog.Infof("dry_run: %s", exec.DebugString(command))
			return nil
		}
		exec.SetRunForTesting(logOnly)
	}

	healthy := metrics2.GetInt64Metric("healthy")
	// The ticker first fires only after a full pollInterval, so poll once right away.
	pollAndExecOnce()
	tick := time.Tick(*pollInterval)
	for {
		<-tick
		healthy.Update(1)
		pollAndExecOnce()
	}
}