blob: 558f616b9bcf635989423d2cb3d0efc286702d62 [file] [log] [blame]
// run_chromium_perf is an application that runs the specified benchmark over CT's
// webpage archives.
package main
import (
"context"
"errors"
"flag"
"fmt"
"os"
"path"
"path/filepath"
"strings"
"sync"
"time"
"go.skia.org/infra/ct/go/adb"
"go.skia.org/infra/ct/go/util"
"go.skia.org/infra/ct/go/worker_scripts/worker_common"
"go.skia.org/infra/go/exec"
"go.skia.org/infra/go/skerr"
"go.skia.org/infra/go/sklog"
skutil "go.skia.org/infra/go/util"
)
// WORKER_POOL_SIZE is how many goroutines run benchmarks concurrently when
// the task is allowed to run in parallel.
const WORKER_POOL_SIZE = 5

// MAX_ALLOWED_SEQUENTIAL_TIMEOUTS is how many benchmark timeouts in a row are
// tolerated before the worker script gives up and fails.
const MAX_ALLOWED_SEQUENTIAL_TIMEOUTS = 20
// Flags that identify the run and the slice of pagesets this worker handles.
var (
	runID       = flag.String("run_id", "", "The unique run id (typically requester + timestamp).")
	pagesetType = flag.String("pageset_type", util.PAGESET_TYPE_MOBILE_10k, "The type of pagesets to create from the Alexa CSV list. Eg: 10k, Mobile10k, All.")
	startRange  = flag.Int("start_range", 1, "The number this worker will run benchmarks from.")
	num         = flag.Int("num", 100, "The total number of benchmarks to run starting from the start_range.")
)

// Flags that select the chromium builds and the browser arguments used for
// the nopatch and withpatch runs.
var (
	chromiumBuildNoPatch      = flag.String("chromium_build_nopatch", "", "The chromium build to use for the nopatch run.")
	chromiumBuildWithPatch    = flag.String("chromium_build_withpatch", "", "The chromium build to use for the withpatch run.")
	browserExtraArgsNoPatch   = flag.String("browser_extra_args_nopatch", "", "The extra arguments that are passed to the browser while running the benchmark during the nopatch run.")
	browserExtraArgsWithPatch = flag.String("browser_extra_args_withpatch", "", "The extra arguments that are passed to the browser while running the benchmark during the withpatch run.")
)

// Flags that describe the benchmark itself and how it is executed.
var (
	benchmarkName      = flag.String("benchmark_name", "", "The telemetry benchmark to run on this worker.")
	benchmarkExtraArgs = flag.String("benchmark_extra_args", "", "The extra arguments that are passed to the specified benchmark.")
	repeatBenchmark    = flag.Int("repeat_benchmark", 3, "The number of times the benchmark should be repeated.")
	runInParallel      = flag.Bool("run_in_parallel", false, "Run the benchmark by bringing up multiple chrome instances in parallel.")
	targetPlatform     = flag.String("target_platform", util.PLATFORM_ANDROID, "The platform the benchmark will run on (Android / Linux).")
)

// Flags for worker housekeeping and for post-processing of the results.
var (
	chromeCleanerTimer = flag.Duration("cleaner_timer", 15*time.Minute, "How often all chrome processes will be killed on this worker.")
	valueColumnName    = flag.String("value_column_name", "", "Which column's entries to use as field values when combining CSVs.")
)
// runChromiumPerf runs the benchmark named by --benchmark_name twice for every
// pageset in this worker's range: once against the nopatch chromium build and
// once against the withpatch build. It downloads the builds, pagesets and
// (unless live sites are requested) web archives, runs the benchmarks through
// a pool of worker goroutines and, when CSV output was requested, merges and
// uploads the resulting CSV files for both runs.
//
// It returns an error if a required flag is missing, if any setup step or
// download fails, if more than MAX_ALLOWED_SEQUENTIAL_TIMEOUTS benchmark runs
// time out in a row, or if merging/uploading the CSV files fails.
func runChromiumPerf() error {
	ctx := context.Background()
	httpClient, err := worker_common.Init(ctx, false /* useDepotTools */)
	if err != nil {
		return skerr.Wrap(err)
	}
	if !*worker_common.Local {
		defer util.CleanTmpDir()
	}
	defer util.TimeTrack(time.Now(), "Running Chromium Perf")
	defer sklog.Flush()

	// Validate required arguments.
	if *chromiumBuildNoPatch == "" {
		return errors.New("Must specify --chromium_build_nopatch")
	}
	if *chromiumBuildWithPatch == "" {
		return errors.New("Must specify --chromium_build_withpatch")
	}
	if *runID == "" {
		return errors.New("Must specify --run_id")
	}
	if *benchmarkName == "" {
		return errors.New("Must specify --benchmark_name")
	}

	// Use defaults.
	if *valueColumnName == "" {
		*valueColumnName = util.DEFAULT_VALUE_COLUMN_NAME
	}

	// Instantiate GcsUtil object.
	gs, err := util.NewGcsUtil(httpClient)
	if err != nil {
		return err
	}

	// Scratch directory for the files downloaded for this run. The error must
	// be checked here: a failed MkdirTemp would otherwise leave tmpDir empty
	// and the download below would write relative to the working directory.
	tmpDir, err := os.MkdirTemp("", "patches")
	if err != nil {
		return fmt.Errorf("Could not create tmpdir: %w", err)
	}
	defer skutil.RemoveAll(tmpDir)
	remotePatchesDir := path.Join(util.ChromiumPerfRunsStorageDir, *runID)

	// Download the custom webpages for this run from Google storage.
	customWebpagesName := *runID + ".custom_webpages.csv"
	localCustomWebpages := filepath.Join(tmpDir, customWebpagesName)
	if _, err := util.DownloadPatch(localCustomWebpages, path.Join(remotePatchesDir, customWebpagesName), gs); err != nil {
		return fmt.Errorf("Could not download %s: %w", customWebpagesName, err)
	}
	customWebpages, err := util.GetCustomPages(localCustomWebpages)
	if err != nil {
		return fmt.Errorf("Could not read custom webpages file %s: %w", customWebpagesName, err)
	}
	if len(customWebpages) > 0 {
		customWebpages = util.GetCustomPagesWithinRange(*startRange, *num, customWebpages)
	}

	chromiumBuilds := []string{*chromiumBuildNoPatch}
	// No point downloading the same build twice. Download only if builds are different.
	if *chromiumBuildNoPatch != *chromiumBuildWithPatch {
		chromiumBuilds = append(chromiumBuilds, *chromiumBuildWithPatch)
	}
	chromiumBinaryName := util.BINARY_CHROME
	switch *targetPlatform {
	case util.PLATFORM_WINDOWS:
		chromiumBinaryName = util.BINARY_CHROME_WINDOWS
	case util.PLATFORM_ANDROID:
		chromiumBinaryName = util.ApkName
	}

	// Download the specified chromium builds.
	for _, chromiumBuild := range chromiumBuilds {
		if _, err := gs.DownloadChromiumBuild(chromiumBuild, chromiumBinaryName); err != nil {
			return err
		}
		// Delete the chromium build to save space when we are done. The defer
		// is intentionally inside the loop: the builds must stay on disk until
		// runChromiumPerf returns.
		defer skutil.RemoveAll(filepath.Join(util.ChromiumBuildsDir, chromiumBuild))
	}
	chromiumBinaryNoPatch := filepath.Join(util.ChromiumBuildsDir, *chromiumBuildNoPatch, chromiumBinaryName)
	chromiumBinaryWithPatch := filepath.Join(util.ChromiumBuildsDir, *chromiumBuildWithPatch, chromiumBinaryName)

	var pathToPagesets string
	if len(customWebpages) > 0 {
		pathToPagesets = filepath.Join(util.PagesetsDir, "custom")
		if err := util.CreateCustomPagesets(customWebpages, pathToPagesets, *targetPlatform, *startRange); err != nil {
			return err
		}
	} else {
		// Download pagesets if they do not exist locally.
		pathToPagesets = filepath.Join(util.PagesetsDir, *pagesetType)
		if _, err := gs.DownloadSwarmingArtifacts(pathToPagesets, util.PAGESETS_DIR_NAME, *pagesetType, *startRange, *num); err != nil {
			return err
		}
	}
	defer skutil.RemoveAll(pathToPagesets)

	if !strings.Contains(*benchmarkExtraArgs, util.USE_LIVE_SITES_FLAGS) {
		// Download archives if they do not exist locally.
		pathToArchives := filepath.Join(util.WebArchivesDir, *pagesetType)
		if _, err := gs.DownloadSwarmingArtifacts(pathToArchives, util.WEB_ARCHIVES_DIR_NAME, *pagesetType, *startRange, *num); err != nil {
			return err
		}
		defer skutil.RemoveAll(pathToArchives)
	}

	// Establish nopatch output paths. The local directory is wiped first so
	// that it starts out empty.
	runIDNoPatch := fmt.Sprintf("%s-nopatch", *runID)
	localOutputDirNoPatch := filepath.Join(util.StorageDir, util.BenchmarkRunsDir, runIDNoPatch)
	skutil.RemoveAll(localOutputDirNoPatch)
	util.MkdirAll(localOutputDirNoPatch, 0700)
	defer skutil.RemoveAll(localOutputDirNoPatch)
	remoteDirNoPatch := path.Join(util.BenchmarkRunsStorageDir, runIDNoPatch)

	// Establish withpatch output paths.
	runIDWithPatch := fmt.Sprintf("%s-withpatch", *runID)
	localOutputDirWithPatch := filepath.Join(util.StorageDir, util.BenchmarkRunsDir, runIDWithPatch)
	skutil.RemoveAll(localOutputDirWithPatch)
	util.MkdirAll(localOutputDirWithPatch, 0700)
	defer skutil.RemoveAll(localOutputDirWithPatch)
	remoteDirWithPatch := path.Join(util.BenchmarkRunsStorageDir, runIDWithPatch)

	// Construct path to the ct_run_benchmark python script.
	pathToPyFiles, err := util.GetPathToPyFiles(*worker_common.Local)
	if err != nil {
		return fmt.Errorf("Could not get path to py files: %w", err)
	}
	fileInfos, err := os.ReadDir(pathToPagesets)
	if err != nil {
		return fmt.Errorf("Unable to read the pagesets dir %s: %w", pathToPagesets, err)
	}

	numWorkers := WORKER_POOL_SIZE
	if *targetPlatform == util.PLATFORM_ANDROID || !*runInParallel {
		// Do not run page sets in parallel if the target platform is Android.
		// This is because the nopatch/withpatch APK needs to be installed prior to
		// each run and this will interfere with the parallel runs. Instead of trying
		// to find a complicated solution to this, it makes sense for Android to
		// continue to be serial because it will help guard against
		// crashes/flakiness/inconsistencies which are more prevalent in mobile runs.
		numWorkers = 1
		sklog.Info("===== Going to run the task serially =====")
	} else {
		sklog.Info("===== Going to run the task with parallel chrome processes =====")
	}

	// Create channel that contains all pageset file names. This channel will
	// be consumed by the worker pool.
	pagesetRequests := util.GetClosedChannelOfPagesets(fileInfos)

	var wg sync.WaitGroup
	// Use a RWMutex for the chromeProcessesCleaner goroutine to communicate to
	// the workers (acting as "readers") when it wants to be the "writer" and
	// kill all zombie chrome processes.
	var mutex sync.RWMutex
	timeoutTracker := util.TimeoutTracker{}

	// benchmarkRun holds the settings that differ between the nopatch and the
	// withpatch invocation of a benchmark.
	type benchmarkRun struct {
		outputDir        string
		chromiumBinary   string
		id               string
		browserExtraArgs string
	}
	// Every pageset is run against nopatch first and then against withpatch.
	benchmarkRuns := []benchmarkRun{
		{localOutputDirNoPatch, chromiumBinaryNoPatch, runIDNoPatch, *browserExtraArgsNoPatch},
		{localOutputDirWithPatch, chromiumBinaryWithPatch, runIDWithPatch, *browserExtraArgsWithPatch},
	}

	// Loop through workers in the worker pool.
	for i := 0; i < numWorkers; i++ {
		// Increment the WaitGroup counter.
		wg.Add(1)
		// Create and run a goroutine closure that runs the benchmark.
		go func() {
			// Decrement the WaitGroup counter when the goroutine completes.
			defer wg.Done()
			for pagesetName := range pagesetRequests {
				// Run both benchmarks for this pageset under the read lock. The
				// closure guarantees the lock is released on every path, including
				// the early exit when the Android device is gone; returning with
				// the read lock held would block the cleaner's write lock forever.
				deviceOK := func() bool {
					mutex.RLock()
					defer mutex.RUnlock()
					for _, run := range benchmarkRuns {
						_, runErr := util.RunBenchmark(ctx, pagesetName, pathToPagesets, pathToPyFiles, run.outputDir, run.chromiumBinary, run.id, run.browserExtraArgs, *benchmarkName, *targetPlatform, *benchmarkExtraArgs, *pagesetType, *repeatBenchmark, !*worker_common.Local)
						if runErr == nil {
							timeoutTracker.Reset()
							continue
						}
						if exec.IsTimeout(runErr) {
							timeoutTracker.Increment()
						}
						// For Android runs make sure that the device is online. If not then stop early.
						if *targetPlatform == util.PLATFORM_ANDROID {
							if devErr := adb.VerifyLocalDevice(ctx); devErr != nil {
								sklog.Errorf("Could not find Android device: %s", devErr)
								return false
							}
						}
					}
					return true
				}()
				if !deviceOK {
					// NOTE(review): stopping here only ends this worker; it does not
					// by itself make runChromiumPerf return an error.
					return
				}
				if timeoutTracker.Read() > MAX_ALLOWED_SEQUENTIAL_TIMEOUTS {
					sklog.Errorf("Ran into %d sequential timeouts. Something is wrong. Killing the goroutine.", MAX_ALLOWED_SEQUENTIAL_TIMEOUTS)
					return
				}
			}
		}()
	}

	if !*worker_common.Local {
		// Start the cleaner.
		go util.ChromeProcessesCleaner(ctx, &mutex, *chromeCleanerTimer)
	}

	// Wait for all spawned goroutines to complete.
	wg.Wait()
	if timeoutTracker.Read() > MAX_ALLOWED_SEQUENTIAL_TIMEOUTS {
		return fmt.Errorf("There were %d sequential timeouts.", MAX_ALLOWED_SEQUENTIAL_TIMEOUTS)
	}

	// If "--output-format=csv" is specified then merge all CSV files and upload.
	if strings.Contains(*benchmarkExtraArgs, "--output-format=csv") {
		if err := util.MergeUploadCSVFilesOnWorkers(ctx, localOutputDirNoPatch, pathToPyFiles, runIDNoPatch, remoteDirNoPatch, *valueColumnName, gs, *startRange, true /* handleStrings */, true /* addRanks */, map[string]map[string]string{} /* pageRankToAdditionalFields */); err != nil {
			return fmt.Errorf("Error while processing nopatch CSV files: %w", err)
		}
		if err := util.MergeUploadCSVFilesOnWorkers(ctx, localOutputDirWithPatch, pathToPyFiles, runIDWithPatch, remoteDirWithPatch, *valueColumnName, gs, *startRange, true /* handleStrings */, true /* addRanks */, map[string]map[string]string{} /* pageRankToAdditionalFields */); err != nil {
			return fmt.Errorf("Error while processing withpatch CSV files: %w", err)
		}
	}
	return nil
}
// main runs the chromium perf task and turns its outcome into the process
// exit status: 255 when the task reports an error (which is logged first),
// 0 otherwise.
func main() {
	if err := runChromiumPerf(); err != nil {
		sklog.Errorf("Error while running chromium perf: %s", err)
		os.Exit(255)
	}
	os.Exit(0)
}