// Copyright 2020 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

package main
import (
	"bufio"
	"bytes"
	"flag"
	"fmt"
	"math/rand"
	"os"
	"runtime"
	"strings"
	"sync"
	"sync/atomic"

	"go.skia.org/infra/go/exec"
	"go.skia.org/infra/go/util"
	"go.skia.org/infra/task_driver/go/td"
)
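
// work is a single batch for FM to run: the sources (GMs or tests) to draw,
// plus any extra flags to pass through on the command line.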
type work struct {
	Sources []string
	Flags   []string
}
func main() {
	var (
		projectId = flag.String("project_id", "", "ID of the Google Cloud project.")
		taskId    = flag.String("task_id", "", "ID of this task.")
		taskName  = flag.String("task_name", "", "Name of the task.")
		local     = flag.Bool("local", true, "True if running locally (as opposed to on the bots)")
		output    = flag.String("o", "", "If provided, dump a JSON blob of step data to the given file. Prints to stdout if '-' is given.")
		resources = flag.String("resources", "resources", "Passed to fm -i.")
	)
	ctx := td.StartRun(projectId, taskId, taskName, output, local)
	defer td.EndRun(ctx)
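
	// Hold on to the real stderr so we can still print failure summaries
	// even if os.Stderr is redirected to /dev/null below.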
	actualStderr := os.Stderr
	if *local {
		// Task Driver echoes every exec.Run() stdout and stderr to the console,
		// which makes it hard to find failures (especially stdout). Send them to /dev/null.
		f, err := os.OpenFile(os.DevNull, os.O_WRONLY, 0)
		if err != nil {
			td.Fatal(ctx, err)
		}
		os.Stdout = f
		os.Stderr = f
	}

	if flag.NArg() < 1 {
		td.Fatalf(ctx, "Please pass an fm binary.")
	}
	fm := flag.Arg(0)

	// Run fm --flag to find the names of all linked GMs or tests.
	query := func(flag string) []string {
		stdout := &bytes.Buffer{}
		cmd := &exec.Command{Name: fm, Stdout: stdout}
		cmd.Args = append(cmd.Args, "-i", *resources)
		cmd.Args = append(cmd.Args, flag)
		if err := exec.Run(ctx, cmd); err != nil {
			td.Fatal(ctx, err)
		}

		lines := []string{}
		scanner := bufio.NewScanner(stdout)
		for scanner.Scan() {
			lines = append(lines, scanner.Text())
		}
		if err := scanner.Err(); err != nil {
			td.Fatal(ctx, err)
		}
		return lines
	}

	gms := query("--listGMs")
	tests := query("--listTests")
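
	// parse turns one whitespace-separated job line into a work batch:
	// "gm"/"gms" and "test"/"tests" expand to every known GM or test,
	// key=value tokens become FM flags, and anything else is a source name.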
	parse := func(job []string) *work {
		w := &work{}
		for _, token := range job {
			// Everything after # is a comment.
			if strings.HasPrefix(token, "#") {
				break
			}

			// Treat "gm" or "gms" as a shortcut for all known GMs.
			if token == "gm" || token == "gms" {
				w.Sources = append(w.Sources, gms...)
				continue
			}
			// Same for tests.
			if token == "test" || token == "tests" {
				w.Sources = append(w.Sources, tests...)
				continue
			}

			// Is this a flag to pass through to FM?
			if parts := strings.Split(token, "="); len(parts) == 2 {
				f := "-"
				if len(parts[0]) > 1 {
					f += "-"
				}
				f += parts[0]
				w.Flags = append(w.Flags, f, parts[1])
				continue
			}

			// Anything else must be the name of a source for FM to run.
			w.Sources = append(w.Sources, token)
		}
		return w
	}

	// TODO: this doesn't have to be hard coded, of course.
	// TODO: add some .skps or images to demo that.
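	// Each non-comment line of this script is one job: flag=value settings
	// plus the sources (or shortcuts) to run with them.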
	script := `
		b=cpu tests
		b=cpu gms
		b=cpu gms skvm=true
		#b=cpu gms skvm=true gamut=p3
		#b=cpu gms skvm=true ct=565
	`

	jobs := [][]string{}
	scanner := bufio.NewScanner(strings.NewReader(script))
	for scanner.Scan() {
		jobs = append(jobs, strings.Fields(scanner.Text()))
	}
	if err := scanner.Err(); err != nil {
		td.Fatal(ctx, err)
	}
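
	// failures counts sources that still failed after being retried on their own.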
	var failures int32 = 0
	wg := &sync.WaitGroup{}
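
	// worker runs batches of work from the queue. When a multi-source batch
	// fails, it is split into single-source batches and re-queued so we can
	// pin down exactly which sources failed.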
	worker := func(queue chan work) {
		for w := range queue {
			stdout := &bytes.Buffer{}
			stderr := &bytes.Buffer{}
			cmd := &exec.Command{Name: fm, Stdout: stdout, Stderr: stderr}
			cmd.Args = append(cmd.Args, "-i", *resources)
			cmd.Args = append(cmd.Args, w.Flags...)
			cmd.Args = append(cmd.Args, "-s")
			cmd.Args = append(cmd.Args, w.Sources...)

			if err := exec.Run(ctx, cmd); err != nil {
				if len(w.Sources) == 1 {
					// If a source ran alone and failed, that's just a failure.
					atomic.AddInt32(&failures, 1)
					td.FailStep(ctx, err)

					if *local {
						lines := []string{}
						scanner := bufio.NewScanner(stderr)
						for scanner.Scan() {
							lines = append(lines, scanner.Text())
						}
						if err := scanner.Err(); err != nil {
							td.Fatal(ctx, err)
						}

						fmt.Fprintf(actualStderr, "%v %v #failed:\n\t%v\n",
							cmd.Name,
							strings.Join(cmd.Args, " "),
							strings.Join(lines, "\n\t"))
					}
				} else {
					// If a batch of sources ran and failed, split them up and try again.
					for _, source := range w.Sources {
						wg.Add(1)
						queue <- work{[]string{source}, w.Flags}
					}
				}
			}
			wg.Done()
		}
	}
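
	// Start one worker per CPU. The queue is buffered generously so workers
	// can re-queue split-up batches without blocking on each other.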
	workers := runtime.NumCPU()
	queue := make(chan work, 1<<20)
	for i := 0; i < workers; i++ {
		go worker(queue)
	}

	for _, job := range jobs {
		w := parse(job)
		if len(w.Sources) == 0 {
			continue
		}

		// Shuffle the sources randomly as a cheap way to approximate evenly expensive batches.
		rand.Shuffle(len(w.Sources), func(i, j int) {
			w.Sources[i], w.Sources[j] = w.Sources[j], w.Sources[i]
		})

		// Round up so there's at least one source per batch.
		batch := (len(w.Sources) + workers - 1) / workers
		util.ChunkIter(len(w.Sources), batch, func(start, end int) error {
			wg.Add(1)
			queue <- work{w.Sources[start:end], w.Flags}
			return nil
		})
	}
	wg.Wait()

	if failures > 0 {
		if *local {
			// td.Fatalf() would work fine, but barfs up a panic that we don't need to see.
			fmt.Fprintf(actualStderr, "%v runs of %v failed after retries.\n", failures, fm)
			os.Exit(1)
		} else {
			td.Fatalf(ctx, "%v runs of %v failed after retries.", failures, fm)
		}
	}
}