blob: 1d59d9fdd64341453903240a9ac656a40590addc [file] [log] [blame]
// Utility that contains methods for both CT master and worker scripts.
package util
import (
"bytes"
"context"
"crypto/sha1"
"encoding/csv"
"encoding/hex"
"encoding/json"
"fmt"
"io"
"io/fs"
"math"
"os"
"path"
"path/filepath"
"regexp"
"runtime"
"strconv"
"strings"
"sync"
"time"
apipb "go.chromium.org/luci/swarming/proto/api_v2"
"go.skia.org/infra/go/cas"
"go.skia.org/infra/go/cas/rbe"
"go.skia.org/infra/go/cipd"
"go.skia.org/infra/go/common"
"go.skia.org/infra/go/exec"
"go.skia.org/infra/go/fileutil"
"go.skia.org/infra/go/gitiles"
"go.skia.org/infra/go/skerr"
"go.skia.org/infra/go/sklog"
"go.skia.org/infra/go/swarming"
swarmingv2 "go.skia.org/infra/go/swarming/v2"
"go.skia.org/infra/go/util"
"go.skia.org/infra/task_scheduler/go/specs"
)
const (
// MAX_SYNC_TRIES is the number of attempts SyncDir makes before giving up.
MAX_SYNC_TRIES = 3
// TS_FORMAT is the time layout (yyyymmddhhmmss) used by GetCurrentTs and
// GetTimeFromTs.
TS_FORMAT = "20060102150405"
// MAX_SIMULTANEOUS_SWARMING_TASKS_PER_RUN is the maximum number of tasks
// TriggerSwarmingTask places in a single batch; a batch is triggered and
// waited on before the next one starts.
MAX_SIMULTANEOUS_SWARMING_TASKS_PER_RUN = 10000
// PATCH_LIMIT equals 1<<26 (64 MiB). NOTE(review): presumably the maximum
// size in bytes of a patch; it is not referenced in this part of the file,
// so confirm against its users.
PATCH_LIMIT = 1 << 26
)
var (
// CIPD_PATHS lists the directories that TriggerSwarmingTask passes under
// the "PATH" key when building swarming task requests.
CIPD_PATHS = []string{
"cipd_bin_packages",
"cipd_bin_packages/bin",
"cipd_bin_packages/cpython",
"cipd_bin_packages/cpython/bin",
"cipd_bin_packages/cpython3",
"cipd_bin_packages/cpython3/bin",
}
// VPYTHON_VIRTUALENV_ROOT is a "vpython" directory under the OS temp dir.
// It is exported to child processes through the VPYTHON_VIRTUALENV_ROOT
// environment variable when Python scripts are run.
VPYTHON_VIRTUALENV_ROOT = filepath.Join(os.TempDir(), "vpython")
)
// CasSpecs for master scripts.

// CasCreatePagesetsMaster returns a CasSpec containing
// bin/create_pagesets_on_workers plus the specs returned by CasPython and
// CasCreatePagesets.
func CasCreatePagesetsMaster() *CasSpec {
	spec := &CasSpec{}
	spec.Paths = []string{"bin/create_pagesets_on_workers"}
	spec.IncludeCasSpecs = []*CasSpec{CasPython(), CasCreatePagesets()}
	return spec
}
// CasCaptureArchivesMaster returns a CasSpec containing
// bin/capture_archives_on_workers plus the specs returned by CasPython,
// CasIsolateTelemetryLinux and CasCaptureArchives.
func CasCaptureArchivesMaster() *CasSpec {
	spec := &CasSpec{}
	spec.Paths = []string{"bin/capture_archives_on_workers"}
	spec.IncludeCasSpecs = []*CasSpec{
		CasPython(),
		CasIsolateTelemetryLinux(),
		CasCaptureArchives(),
	}
	return spec
}
// CasChromiumAnalysisMaster returns a CasSpec containing
// bin/run_chromium_analysis_on_workers plus the specs returned by CasPython,
// CasBuildRepoLinux, CasIsolateTelemetryLinux and CasChromiumAnalysisLinux.
func CasChromiumAnalysisMaster() *CasSpec {
	spec := &CasSpec{}
	spec.Paths = []string{"bin/run_chromium_analysis_on_workers"}
	spec.IncludeCasSpecs = []*CasSpec{
		CasPython(),
		CasBuildRepoLinux(),
		CasIsolateTelemetryLinux(),
		CasChromiumAnalysisLinux(),
	}
	return spec
}
// CasChromiumPerfMaster returns a CasSpec containing
// bin/run_chromium_perf_on_workers plus the specs returned by CasPython,
// CasBuildRepoLinux, CasIsolateTelemetryLinux and CasChromiumPerfLinux.
func CasChromiumPerfMaster() *CasSpec {
	spec := &CasSpec{}
	spec.Paths = []string{"bin/run_chromium_perf_on_workers"}
	spec.IncludeCasSpecs = []*CasSpec{
		CasPython(),
		CasBuildRepoLinux(),
		CasIsolateTelemetryLinux(),
		CasChromiumPerfLinux(),
	}
	return spec
}
// CasMetricsAnalysisMaster returns a CasSpec containing
// bin/metrics_analysis_on_workers plus the specs returned by CasPython,
// CasIsolateTelemetryLinux and CasMetricsAnalysis.
func CasMetricsAnalysisMaster() *CasSpec {
	spec := &CasSpec{}
	spec.Paths = []string{"bin/metrics_analysis_on_workers"}
	spec.IncludeCasSpecs = []*CasSpec{
		CasPython(),
		CasIsolateTelemetryLinux(),
		CasMetricsAnalysis(),
	}
	return spec
}
// CasSpecs for worker scripts.

// CasCreatePagesets returns a CasSpec containing bin/create_pagesets plus the
// spec returned by CasPython.
func CasCreatePagesets() *CasSpec {
	spec := &CasSpec{}
	spec.Paths = []string{"bin/create_pagesets"}
	spec.IncludeCasSpecs = []*CasSpec{CasPython()}
	return spec
}
// CasCaptureArchives returns a CasSpec containing only bin/capture_archives;
// it includes no other specs.
func CasCaptureArchives() *CasSpec {
	spec := &CasSpec{}
	spec.Paths = []string{"bin/capture_archives"}
	return spec
}
// CasChromiumAnalysisLinux returns a CasSpec containing
// bin/run_chromium_analysis plus the spec returned by CasPython.
func CasChromiumAnalysisLinux() *CasSpec {
	spec := &CasSpec{}
	spec.Paths = []string{"bin/run_chromium_analysis"}
	spec.IncludeCasSpecs = []*CasSpec{CasPython()}
	return spec
}
// CasChromiumPerfLinux returns a CasSpec containing bin/run_chromium_perf
// plus the spec returned by CasPython.
func CasChromiumPerfLinux() *CasSpec {
	spec := &CasSpec{}
	spec.Paths = []string{"bin/run_chromium_perf"}
	spec.IncludeCasSpecs = []*CasSpec{CasPython()}
	return spec
}
// CasMetricsAnalysis returns a CasSpec containing bin/metrics_analysis plus
// the spec returned by CasPython.
func CasMetricsAnalysis() *CasSpec {
	spec := &CasSpec{}
	spec.Paths = []string{"bin/metrics_analysis"}
	spec.IncludeCasSpecs = []*CasSpec{CasPython()}
	return spec
}
// CasSpecs for build scripts.

// CasBuildRepoLinux returns a CasSpec containing bin/build_repo plus the spec
// returned by CasPython.
func CasBuildRepoLinux() *CasSpec {
	spec := &CasSpec{}
	spec.Paths = []string{"bin/build_repo"}
	spec.IncludeCasSpecs = []*CasSpec{CasPython()}
	return spec
}
// CasIsolateTelemetryLinux returns a CasSpec containing bin/isolate_telemetry
// plus the spec returned by CasPython.
func CasIsolateTelemetryLinux() *CasSpec {
	spec := &CasSpec{}
	spec.Paths = []string{"bin/isolate_telemetry"}
	spec.IncludeCasSpecs = []*CasSpec{CasPython()}
	return spec
}
// CasPython returns a CasSpec containing only the py/ directory.
func CasPython() *CasSpec {
	spec := &CasSpec{}
	spec.Paths = []string{"py/"}
	return spec
}
// CasSpec describes a set of files to upload to CAS.
type CasSpec struct {
// Paths are the paths to upload. UploadToCAS passes them to the CAS client
// together with the root returned by GetCASRoot.
Paths []string
// IncludeCasSpecs are nested specs; UploadToCAS gathers their Paths and
// IncludeDigests recursively.
IncludeCasSpecs []*CasSpec
// IncludeDigests are existing CAS digests that UploadToCAS merges with the
// digest of the freshly uploaded Paths.
IncludeDigests []string
}
// TimeTrack logs how much time has passed since start, labelled with name.
func TimeTrack(start time.Time, name string) {
	sklog.Infof("===== %s took %s =====", name, time.Since(start))
}
// ExecuteCmd runs binary through ExecuteCmdWithConfigurableLogging with both
// stdout and stderr logging enabled.
func ExecuteCmd(ctx context.Context, binary string, args, env []string, timeout time.Duration, stdout, stderr io.Writer) error {
	const logOutput = true
	return ExecuteCmdWithConfigurableLogging(ctx, binary, args, env, timeout, stdout, stderr, logOutput, logOutput)
}
// ExecuteCmdWithConfigurableLogging runs binary with the given args and env,
// inheriting PATH and applying timeout. stdout and stderr, when non-nil,
// receive the command's output streams; logStdout and logStderr are passed to
// exec as its LogStdout and LogStderr options.
func ExecuteCmdWithConfigurableLogging(ctx context.Context, binary string, args, env []string, timeout time.Duration, stdout, stderr io.Writer, logStdout, logStderr bool) error {
	cmd := exec.Command{
		Name:        binary,
		Args:        args,
		Env:         env,
		InheritPath: true,
		Timeout:     timeout,
		Stdout:      stdout,
		Stderr:      stderr,
		LogStdout:   logStdout,
		LogStderr:   logStderr,
	}
	return exec.Run(ctx, &cmd)
}
// SyncDir changes the working directory to dir and then runs "git pull" and
// "gclient sync" there (see syncDirStep), trying up to MAX_SYNC_TRIES times.
// The revisions map enforces revision/hash for the solutions with the format
// branch@rev. The error of the last attempt is returned if every attempt
// fails.
func SyncDir(ctx context.Context, dir string, revisions map[string]string, additionalArgs []string, gitExec string) error {
	if chdirErr := os.Chdir(dir); chdirErr != nil {
		return fmt.Errorf("Could not chdir to %s: %s", dir, chdirErr)
	}

	var syncErr error
	for attempt := 0; attempt < MAX_SYNC_TRIES; attempt++ {
		if attempt > 0 {
			sklog.Warningf("%d. retry for syncing %s", attempt, dir)
		}
		syncErr = syncDirStep(ctx, revisions, additionalArgs, gitExec)
		if syncErr == nil {
			return nil
		}
		sklog.Errorf("Error syncing %s: %s", dir, syncErr)
	}
	if syncErr != nil {
		sklog.Errorf("Failed to sync %s after %d attempts", dir, MAX_SYNC_TRIES)
	}
	return syncErr
}
// syncDirStep performs one sync attempt in the current working directory:
// "git pull" followed by "gclient sync --force" with additionalArgs and one
// "--revision branch@rev" pair per entry of revisions.
func syncDirStep(ctx context.Context, revisions map[string]string, additionalArgs []string, gitExec string) error {
	if pullErr := ExecuteCmd(ctx, gitExec, []string{"pull"}, []string{}, GIT_PULL_TIMEOUT, nil, nil); pullErr != nil {
		return fmt.Errorf("Error running git pull: %s", pullErr)
	}

	gclientArgs := make([]string, 0, 2+len(additionalArgs)+2*len(revisions))
	gclientArgs = append(gclientArgs, "sync", "--force")
	gclientArgs = append(gclientArgs, additionalArgs...)
	for branch, rev := range revisions {
		gclientArgs = append(gclientArgs, "--revision", branch+"@"+rev)
	}

	if syncErr := ExecuteCmd(ctx, BINARY_GCLIENT, gclientArgs, []string{}, GCLIENT_SYNC_TIMEOUT, nil, nil); syncErr != nil {
		return fmt.Errorf("Error running gclient sync: %s", syncErr)
	}
	return nil
}
func runSkiaGnGen(ctx context.Context, clangLocation, gnExtraArgs string) error {
// Run "bin/fetch-gn".
util.LogErr(ExecuteCmd(ctx, "bin/fetch-gn", []string{}, []string{}, FETCH_GN_TIMEOUT, nil,
nil))
// gn gen out/Release '--args=cc="/home/chrome-bot/test/clang_linux/bin/clang" cxx="/home/chrome-bot/test/clang_linux/bin/clang++" extra_cflags=["-B/home/chrome-bot/test/clang_linux/bin"] extra_ldflags=["-B/home/chrome-bot/test/clang_linux/bin", "-fuse-ld=lld"] is_debug=false target_cpu="x86_64"'
gnArgs := fmt.Sprintf("--args=cc=\"%s/bin/clang\" cxx=\"%s/bin/clang++\" extra_cflags=[\"-B%s/bin\"] extra_ldflags=[\"-B%s/bin\", \"-fuse-ld=lld\"] is_debug=false target_cpu=\"x86_64\"", clangLocation, clangLocation, clangLocation, clangLocation)
if gnExtraArgs != "" {
gnArgs += " " + gnExtraArgs
}
if err := ExecuteCmd(ctx, "buildtools/linux64/gn", []string{"gen", "out/Release", gnArgs}, os.Environ(), GN_GEN_TIMEOUT, nil, nil); err != nil {
return fmt.Errorf("Error while running gn: %s", err)
}
return nil
}
// GetCipdPackageFromAsset returns a string of the format
// "path:package_name:version" for the named asset. The version is read via
// gitiles from infra/bots/assets/<assetName>/VERSION in the Skia repo, with
// surrounding whitespace trimmed.
func GetCipdPackageFromAsset(assetName string) (string, error) {
	versionFile := path.Join("infra", "bots", "assets", assetName, "VERSION")
	repo := gitiles.NewRepo(common.REPO_SKIA, nil)
	raw, err := repo.ReadFile(context.Background(), versionFile)
	if err != nil {
		return "", err
	}
	version := strings.TrimSpace(string(raw))
	return fmt.Sprintf("%s:skia/bots/%s:version:%s", assetName, assetName, version), nil
}
// ResetCheckout resets the specified Git checkout.
func ResetCheckout(ctx context.Context, dir, resetTo, checkoutArg, gitExec string) error {
if err := os.Chdir(dir); err != nil {
return fmt.Errorf("Could not chdir to %s: %s", dir, err)
}
// Clear out remnants of incomplete rebases from .git/rebase-apply.
rebaseArgs := []string{"rebase", "--abort"}
util.LogErr(ExecuteCmd(ctx, gitExec, rebaseArgs, []string{}, GIT_REBASE_TIMEOUT, nil, nil))
// Checkout the specified branch or argument (eg: --detach).
checkoutArgs := []string{"checkout", checkoutArg}
util.LogErr(ExecuteCmd(ctx, gitExec, checkoutArgs, []string{}, GIT_CHECKOUT_TIMEOUT, nil, nil))
// Run "git reset --hard HEAD"
resetArgs := []string{"reset", "--hard", resetTo}
util.LogErr(ExecuteCmd(ctx, gitExec, resetArgs, []string{}, GIT_RESET_TIMEOUT, nil, nil))
// Run "git clean -f"
// Not doing "-d" here because it can delete directories like "/android_build_tools/aapt2/lib64/"
// even if "/android_build_tools/aapt2/lib64/*.so" is in .gitignore.
cleanArgs := []string{"clean", "-f"}
util.LogErr(ExecuteCmd(ctx, gitExec, cleanArgs, []string{}, GIT_CLEAN_TIMEOUT, nil, nil))
return nil
}
// ApplyPatch changes into dir and applies the patch file to the Git checkout
// there by running
// "git apply --index -p1 --verbose --ignore-whitespace --ignore-space-change ${PATCH_FILE}".
func ApplyPatch(ctx context.Context, patch, dir, gitExec string) error {
	chdirErr := os.Chdir(dir)
	if chdirErr != nil {
		return fmt.Errorf("Could not chdir to %s: %s", dir, chdirErr)
	}
	applyArgs := []string{
		"apply",
		"--index",
		"-p1",
		"--verbose",
		"--ignore-whitespace",
		"--ignore-space-change",
		patch,
	}
	return ExecuteCmd(ctx, gitExec, applyArgs, []string{}, GIT_APPLY_TIMEOUT, nil, nil)
}
// CleanTmpDir removes every entry found directly under os.TempDir(), because
// telemetry tends to generate a lot of temporary artifacts there and they
// take up root disk space. An error listing the directory is ignored.
func CleanTmpDir() {
	tmpDir := os.TempDir()
	entries, _ := os.ReadDir(tmpDir)
	for i := range entries {
		util.RemoveAll(filepath.Join(tmpDir, entries[i].Name()))
	}
}
// GetTimeFromTs parses formattedTime using TS_FORMAT. The parse error is
// discarded, so the caller receives whatever time.Parse returned alongside it.
func GetTimeFromTs(formattedTime string) time.Time {
	parsed, _ := time.Parse(TS_FORMAT, formattedTime)
	return parsed
}
// GetCurrentTs returns the current UTC time formatted with TS_FORMAT.
func GetCurrentTs() string {
	nowUTC := time.Now().UTC()
	return nowUTC.Format(TS_FORMAT)
}
// GetCurrentTsInt64 returns the value of GetCurrentTs parsed as a base-10
// int64. A parse failure is reported through sklog.Fatalf.
func GetCurrentTsInt64() int64 {
	formatted := GetCurrentTs()
	ts, parseErr := strconv.ParseInt(formatted, 10, 64)
	if parseErr != nil {
		sklog.Fatalf("Could not parse timestamp: %s", parseErr)
	}
	return ts
}
// GetClosedChannelOfPagesets returns a closed, buffered channel holding the
// names of all entries in fileInfos except those with a ".pyc" extension.
// The channel's capacity equals len(fileInfos).
func GetClosedChannelOfPagesets(fileInfos []fs.DirEntry) chan string {
	names := make(chan string, len(fileInfos))
	defer close(names)
	for _, entry := range fileInfos {
		name := entry.Name()
		// Skip compiled Python files.
		if filepath.Ext(filepath.Base(name)) != ".pyc" {
			names <- name
		}
	}
	return names
}
// ChromeProcessesCleaner periodically kills all chrome processes.
//
// Running benchmarks in parallel leads to multiple chrome instances coming up
// at the same time, when there are crashes chrome processes stick around which
// can severely impact the machine's performance. To stop this from
// happening chrome zombie processes are periodically killed.
//
// Every chromeCleanerTimer the function acquires locker (so that it waits for
// whoever currently holds it), runs "pkill -9 chrome" and releases locker.
// It blocks until ctx is done, at which point the ticker is stopped and the
// function returns; it is intended to be run in its own goroutine.
func ChromeProcessesCleaner(ctx context.Context, locker sync.Locker, chromeCleanerTimer time.Duration) {
	if chromeCleanerTimer <= 0 {
		// time.NewTicker panics on non-positive durations. There is nothing to
		// schedule in that case, so just wait for cancellation.
		<-ctx.Done()
		return
	}
	ticker := time.NewTicker(chromeCleanerTimer)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
		}
		sklog.Info("The chromeProcessesCleaner goroutine has started")
		sklog.Info("Waiting for all existing tasks to complete before killing zombie chrome processes")
		locker.Lock()
		util.LogErr(ExecuteCmd(ctx, "pkill", []string{"-9", "chrome"}, []string{}, PKILL_TIMEOUT, nil, nil))
		locker.Unlock()
	}
}
// PagesetVars contains the data included in CT pagesets. It is decoded from
// a pageset's JSON file by ReadPageset.
type PagesetVars struct {
// A comma separated list of URLs.
UrlsList string `json:"urls_list"`
// Will be either "mobile" or "desktop".
UserAgent string `json:"user_agent"`
// The location of the web page's WPR data file.
ArchiveDataFile string `json:"archive_data_file"`
}
// ReadPageset opens the file at pagesetPath and JSON-decodes it into a
// PagesetVars. An error is returned if the file cannot be opened or decoded.
func ReadPageset(pagesetPath string) (PagesetVars, error) {
	decodedPageset := PagesetVars{}
	pagesetContent, err := os.Open(pagesetPath)
	if err != nil {
		return decodedPageset, fmt.Errorf("Could not read %s: %s", pagesetPath, err)
	}
	// Register the close only after a successful open; on failure the file
	// handle is nil and must not be closed.
	defer util.Close(pagesetContent)
	if err := json.NewDecoder(pagesetContent).Decode(&decodedPageset); err != nil {
		return decodedPageset, fmt.Errorf("Could not JSON decode %s: %s", pagesetPath, err)
	}
	return decodedPageset, nil
}
// GetStartRange returns the 1-based page number at which the worker with the
// given 1-based workerNum starts, when every worker handles numPagesPerBot
// pages.
func GetStartRange(workerNum, numPagesPerBot int) int {
	pagesBefore := (workerNum - 1) * numPagesPerBot
	return pagesBefore + 1
}
// GetNumPagesPerBot returns the number of web pages each worker should
// process: maxPagesPerBot divided by repeatValue, rounded up.
func GetNumPagesPerBot(repeatValue, maxPagesPerBot int) int {
	pages := float64(maxPagesPerBot)
	repeats := float64(repeatValue)
	return int(math.Ceil(pages / repeats))
}
// UploadToCAS uploads the given CasSpec and returns the resulting digest.
// The Paths and IncludeDigests of casSpec and of every spec reachable through
// IncludeCasSpecs are collected; the paths are uploaded relative to the root
// returned by GetCASRoot(local, runOnMaster), and the uploaded digest is then
// merged with the collected digests.
func UploadToCAS(ctx context.Context, casClient cas.CAS, casSpec *CasSpec, local, runOnMaster bool) (string, error) {
	casRoot, err := GetCASRoot(local, runOnMaster)
	if err != nil {
		return "", skerr.Wrapf(err, "failed to get CAS root")
	}

	// Walk the spec tree with an explicit work list, collecting paths and
	// digests into sets.
	paths := util.NewStringSet()
	digests := util.NewStringSet()
	pending := []*CasSpec{casSpec}
	for len(pending) > 0 {
		last := len(pending) - 1
		spec := pending[last]
		pending = pending[:last]
		paths.AddLists(spec.Paths)
		digests.AddLists(spec.IncludeDigests)
		pending = append(pending, spec.IncludeCasSpecs...)
	}

	// Upload the files, then merge the new digest with the included ones.
	rootDigest, err := casClient.Upload(ctx, casRoot, paths.Keys(), nil)
	if err != nil {
		return "", skerr.Wrapf(err, "failed to upload to CAS")
	}
	digests[rootDigest] = true

	merged, err := casClient.Merge(ctx, digests.Keys())
	if err != nil {
		return "", skerr.Wrapf(err, "failed to merge CAS digests")
	}
	return merged, nil
}
// TriggerSwarmingTask uploads casSpec to CAS, triggers one swarming task per
// page range and waits for all of them to finish. It returns the number of
// triggered tasks and an error (if any).
//
// The number of tasks is ceil(numPages / GetNumPagesPerBot(repeatValue,
// maxPagesPerBot)). Each task runs baseCmd with "--start_range" and "--num"
// appended, plus "--pageset_type" when pagesetType is non-empty, and is named
// "<taskPrefix>_<i>". Dimensions and CIPD packages are chosen from runOnGCE
// and targetPlatform. Tasks are processed in batches of at most
// MAX_SIMULTANEOUS_SWARMING_TASKS_PER_RUN.
//
// A task that fails is retried once with TASKS_PRIORITY_HIGH unless its state
// is KILLED. Failures of individual tasks are only logged; the returned error
// is non-nil only when the CAS upload fails. hardTimeout is not referenced by
// this function.
func TriggerSwarmingTask(ctx context.Context, pagesetType, taskPrefix, runID, targetPlatform string, casSpec *CasSpec, hardTimeout, ioTimeout time.Duration, priority, maxPagesPerBot, numPages int, runOnGCE, local bool, repeatValue int, baseCmd []string, swarmingClient swarmingv2.SwarmingV2Client, casClient cas.CAS) (int, error) {
	// Upload the task inputs to CAS.
	casDigest, err := UploadToCAS(ctx, casClient, casSpec, local, false)
	if err != nil {
		return 0, skerr.Wrapf(err, "failed to upload CAS inputs for task")
	}

	// Create swarming commands for all tasks.
	numPagesPerBot := GetNumPagesPerBot(repeatValue, maxPagesPerBot)
	numTasks := int(math.Ceil(float64(numPages) / float64(numPagesPerBot)))
	tasksToCmds := map[string][]string{}
	for i := 1; i <= numTasks; i++ {
		// Build every command in its own backing array. Appending directly to
		// baseCmd could let all tasks share (and overwrite) the same array
		// when baseCmd has spare capacity.
		taskCmd := make([]string, 0, len(baseCmd)+3)
		taskCmd = append(taskCmd, baseCmd...)
		taskCmd = append(
			taskCmd,
			"--start_range="+strconv.Itoa(GetStartRange(i, numPagesPerBot)),
			"--num="+strconv.Itoa(numPagesPerBot),
		)
		if pagesetType != "" {
			taskCmd = append(taskCmd, "--pageset_type="+pagesetType)
		}
		taskName := fmt.Sprintf("%s_%d", taskPrefix, i)
		tasksToCmds[taskName] = taskCmd
	}

	// Find swarming dimensions to use.
	var dimensions map[string]string
	if runOnGCE {
		if targetPlatform == PLATFORM_WINDOWS {
			dimensions = GCE_WINDOWS_WORKER_DIMENSIONS
		} else {
			dimensions = GCE_LINUX_WORKER_DIMENSIONS
		}
	} else {
		if targetPlatform == PLATFORM_ANDROID {
			dimensions = GOLO_ANDROID_WORKER_DIMENSIONS
		} else {
			dimensions = GOLO_LINUX_WORKER_DIMENSIONS
		}
	}

	// The channel where batches of tasks to be triggered and collected will be sent to.
	chTasks := make(chan map[string][]string)
	// Kick off one goroutine to populate the above channel.
	go func() {
		defer close(chTasks)
		tmpMap := map[string][]string{}
		for task, cmds := range tasksToCmds {
			if len(tmpMap) >= MAX_SIMULTANEOUS_SWARMING_TASKS_PER_RUN {
				// Add the map to the channel.
				chTasks <- tmpMap
				// Reinitialize the temporary map.
				tmpMap = map[string][]string{}
			}
			tmpMap[task] = cmds
		}
		chTasks <- tmpMap
	}()

	cipdPkgs := []string{}
	if targetPlatform == PLATFORM_WINDOWS {
		cipdPkgs = append(cipdPkgs, LUCI_AUTH_CIPD_PACKAGE_WIN)
	} else {
		cipdPkgs = append(cipdPkgs, LUCI_AUTH_CIPD_PACKAGE_LINUX)
		cipdPkgs = append(cipdPkgs, cipd.GetStrCIPDPkgs(cipd.PkgsPython[cipd.PlatformLinuxAmd64])...)
	}
	if targetPlatform == PLATFORM_ANDROID {
		// Add adb CIPD package for Android runs.
		cipdPkgs = append(cipdPkgs, ADB_CIPD_PACKAGE)
	}

	// Trigger and collect swarming tasks.
	for tasksMap := range chTasks {
		// Collect all tasks and retrigger the ones that fail. Do this in a goroutine for
		// each task so that it is done in parallel and retries are immediately triggered
		// instead of at the end (see skbug.com/8191).
		var wg sync.WaitGroup
		for taskName, cmd := range tasksMap {
			wg.Add(1)
			// https://golang.org/doc/faq#closures_and_goroutines
			taskName := taskName
			cmd := cmd
			go func() {
				defer wg.Done()
				req, err := MakeSwarmingTaskRequest(ctx, taskName, casDigest, cipdPkgs, cmd, []string{"name:" + taskName, "runid:" + runID}, dimensions, map[string][]string{"PATH": CIPD_PATHS}, int32(priority), ioTimeout, casClient)
				if err != nil {
					sklog.Errorf("Failed to create Swarming task request for task %q: %s", taskName, err)
					// Without a request there is nothing to trigger or retry.
					return
				}
				resp, err := swarmingClient.NewTask(ctx, req)
				if err != nil {
					sklog.Errorf("Could not trigger swarming task %s: %s", taskName, err)
					return
				}
				_, state, err := pollSwarmingTaskToCompletion(ctx, resp.TaskId, swarmingClient)
				if err != nil {
					sklog.Errorf("task %s failed: %s", taskName, err)
					if state == apipb.TaskState_KILLED {
						sklog.Infof("task %s was killed (either manually or via CT's delete button). Not going to retry it.", taskName)
						return
					}
					sklog.Infof("Retrying task %s with high priority %d", taskName, TASKS_PRIORITY_HIGH)
					req.Priority = TASKS_PRIORITY_HIGH
					retryResp, err := swarmingClient.NewTask(ctx, req)
					if err != nil {
						sklog.Errorf("Could not trigger swarming retry task %s: %s", taskName, err)
						return
					}
					if _, _, err := pollSwarmingTaskToCompletion(ctx, retryResp.TaskId, swarmingClient); err != nil {
						sklog.Errorf("task %s failed inspite of a retry: %s", taskName, err)
						return
					}
				}
			}()
		}
		wg.Wait()
	}
	return numTasks, nil
}
// GetCASRoot returns the location of CT's CAS inputs. local should be set to
// true when debugging locally; the root is then three directory levels above
// this source file. runOnMaster should be set on ctfe and yields
// /usr/local/share/ctfe. If both are false it is assumed that we are running
// on a swarming bot, and the root is the absolute path of the parent of the
// directory containing the running binary (os.Args[0]).
func GetCASRoot(local, runOnMaster bool) (string, error) {
	switch {
	case local:
		_, thisFile, _, _ := runtime.Caller(0)
		root := filepath.Dir(filepath.Dir(filepath.Dir(thisFile)))
		return filepath.Join(root), nil
	case runOnMaster:
		return filepath.Join("/", "usr", "local", "share", "ctfe"), nil
	default:
		binDir := filepath.Dir(os.Args[0])
		return filepath.Abs(filepath.Join(filepath.Dir(binDir)))
	}
}
// GetPathToPyFiles returns the location of CT's python scripts.
// local should be set to true if we need the location of py files when
// debugging locally: the result is then the "py" directory three levels above
// this source file. Otherwise it is the absolute path of "py" under the parent
// of the directory containing the running binary (os.Args[0]).
func GetPathToPyFiles(local bool) (string, error) {
	if !local {
		binDir := filepath.Dir(os.Args[0])
		return filepath.Abs(filepath.Join(filepath.Dir(binDir), "py"))
	}
	_, thisFile, _, _ := runtime.Caller(0)
	root := filepath.Dir(filepath.Dir(filepath.Dir(thisFile)))
	return filepath.Join(root, "py"), nil
}
// GetPathToTelemetryBinaries returns the location of Telemetry binaries:
// TelemetryBinariesDir when local is true, otherwise "tools/perf" under the
// parent of the directory containing the running binary (os.Args[0]).
func GetPathToTelemetryBinaries(local bool) string {
	if local {
		return TelemetryBinariesDir
	}
	binDir := filepath.Dir(os.Args[0])
	return filepath.Join(filepath.Dir(binDir), "tools", "perf")
}
// GetPathToTelemetryCTBinaries returns the location of CT binaries in
// Telemetry: "contrib/cluster_telemetry" under GetPathToTelemetryBinaries(local).
func GetPathToTelemetryCTBinaries(local bool) string {
	telemetryDir := GetPathToTelemetryBinaries(local)
	return filepath.Join(telemetryDir, "contrib", "cluster_telemetry")
}
// MergeUploadCSVFiles downloads the output of every worker of run runID from
// Google Storage into a local directory, merges the downloaded files with
// csv_merger.py (located under pathToPyFiles) and uploads the merged file.
//
// The number of workers is ceil(totalPages / GetNumPagesPerBot(repeatValue,
// maxPagesPerBot)). handleStrings adds "--handle_strings" to the merger
// invocation.
//
// It returns the local path of the merged output, the numbers (as strings) of
// the workers whose output could not be fetched or was 20 bytes or smaller,
// and an error. When downloading fails with an error the returned path is
// empty. The per-worker local files are removed when the function returns.
func MergeUploadCSVFiles(ctx context.Context, runID, pathToPyFiles string, gs *GcsUtil, totalPages, maxPagesPerBot int, handleStrings bool, repeatValue int) (string, []string, error) {
	localOutputDir := filepath.Join(StorageDir, BenchmarkRunsDir, runID)
	MkdirAll(localOutputDir, 0700)
	noOutputWorkers := []string{}

	// Copy outputs from all workers locally.
	numPagesPerBot := GetNumPagesPerBot(repeatValue, maxPagesPerBot)
	numTasks := int(math.Ceil(float64(totalPages) / float64(numPagesPerBot)))
	for i := 1; i <= numTasks; i++ {
		startRange := GetStartRange(i, numPagesPerBot)
		workerLocalOutputPath := filepath.Join(localOutputDir, strconv.Itoa(startRange)+".csv")
		workerRemoteOutputPath := filepath.Join(BenchmarkRunsDir, runID, strconv.Itoa(startRange), "outputs", runID+".output")
		respBody, err := gs.GetRemoteFileContents(workerRemoteOutputPath)
		if err != nil {
			sklog.Errorf("Could not fetch %s: %s", workerRemoteOutputPath, err)
			noOutputWorkers = append(noOutputWorkers, strconv.Itoa(i))
			continue
		}
		out, err := os.Create(workerLocalOutputPath)
		if err != nil {
			util.Close(respBody)
			return "", noOutputWorkers, fmt.Errorf("Unable to create file %s: %s", workerLocalOutputPath, err)
		}
		// The local copy has to survive until csv_merger.py has run, so only
		// its removal is deferred to function exit.
		defer util.Remove(workerLocalOutputPath)

		_, copyErr := io.Copy(out, respBody)
		outputInfo, statErr := out.Stat()
		// Close both handles right away instead of deferring: with many
		// workers, deferred closes would keep every descriptor open until the
		// function returns.
		util.Close(respBody)
		util.Close(out)
		if copyErr != nil {
			return "", noOutputWorkers, fmt.Errorf("Unable to copy to file %s: %s", workerLocalOutputPath, copyErr)
		}
		if statErr != nil {
			return "", noOutputWorkers, fmt.Errorf("Unable to stat file %s: %s", workerLocalOutputPath, statErr)
		}
		// If an output is 20 bytes or less that means something went wrong on the worker.
		if outputInfo.Size() <= 20 {
			sklog.Errorf("Output file %s is only %d bytes (20 or fewer)", workerLocalOutputPath, outputInfo.Size())
			noOutputWorkers = append(noOutputWorkers, strconv.Itoa(i))
			continue
		}
	}

	// Call csv_merger.py to merge all results into a single results CSV.
	pathToCsvMerger := filepath.Join(pathToPyFiles, "csv_merger.py")
	outputFileName := runID + ".output"
	outputFilePath := filepath.Join(localOutputDir, outputFileName)
	args := []string{
		pathToCsvMerger,
		"--csv_dir=" + localOutputDir,
		"--output_csv_name=" + outputFilePath,
	}
	if handleStrings {
		args = append(args, "--handle_strings")
	}
	err := ExecuteCmd(ctx, BINARY_PYTHON, args, []string{fmt.Sprintf("VPYTHON_VIRTUALENV_ROOT=%s", VPYTHON_VIRTUALENV_ROOT)}, CSV_MERGER_TIMEOUT, nil, nil)
	if err != nil {
		return outputFilePath, noOutputWorkers, fmt.Errorf("Error running csv_merger.py: %s", err)
	}

	// Copy the output file to Google Storage.
	remoteOutputDir := path.Join(BenchmarkRunsStorageDir, runID, "consolidated_outputs")
	if err := gs.UploadFile(outputFileName, localOutputDir, remoteOutputDir); err != nil {
		return outputFilePath, noOutputWorkers, fmt.Errorf("Unable to upload %s to %s: %s", outputFileName, remoteOutputDir, err)
	}
	return outputFilePath, noOutputWorkers, nil
}
// GetStrFlagValue returns the value given for flagName in benchmarkArgs, or
// defaultValue if the flag is absent or has no value.
//
// The flag must be followed by "=" or a single space and then by one or more
// word characters ([0-9A-Za-z_]); only that run of word characters is
// returned. flagName is matched literally, so regexp metacharacters in it
// carry no special meaning.
func GetStrFlagValue(benchmarkArgs, flagName, defaultValue string) string {
	r := regexp.MustCompile(regexp.QuoteMeta(flagName) + `[= ](\w+)`)
	m := r.FindStringSubmatch(benchmarkArgs)
	if len(m) >= 2 {
		return m[1]
	}
	// If we reached here then return the default Value.
	return defaultValue
}
// GetUserAgentValue looks up USER_AGENT_FLAG in benchmarkArgs via
// GetStrFlagValue and returns defaultValue if it is not specified.
func GetUserAgentValue(benchmarkArgs, defaultValue string) string {
	userAgent := GetStrFlagValue(benchmarkArgs, USER_AGENT_FLAG, defaultValue)
	return userAgent
}
// GetRepeatValue looks up PAGESET_REPEAT_FLAG in benchmarkArgs via
// GetIntFlagValue and returns defaultValue if it is not specified.
func GetRepeatValue(benchmarkArgs string, defaultValue int) int {
	repeat := GetIntFlagValue(benchmarkArgs, PAGESET_REPEAT_FLAG, defaultValue)
	return repeat
}
// GetRunBenchmarkTimeoutValue looks up RUN_BENCHMARK_TIMEOUT_FLAG in
// benchmarkArgs via GetIntFlagValue and returns defaultValue if it is not
// specified.
func GetRunBenchmarkTimeoutValue(benchmarkArgs string, defaultValue int) int {
	timeout := GetIntFlagValue(benchmarkArgs, RUN_BENCHMARK_TIMEOUT_FLAG, defaultValue)
	return timeout
}
// GetMaxPagesPerBotValue looks up MAX_PAGES_PER_BOT in benchmarkArgs via
// GetIntFlagValue and returns defaultValue if it is not specified.
func GetMaxPagesPerBotValue(benchmarkArgs string, defaultValue int) int {
	maxPages := GetIntFlagValue(benchmarkArgs, MAX_PAGES_PER_BOT, defaultValue)
	return maxPages
}
// GetNumAnalysisRetriesValue looks up NUM_ANALYSIS_RETRIES in benchmarkArgs
// via GetIntFlagValue and returns defaultValue if it is not specified.
func GetNumAnalysisRetriesValue(benchmarkArgs string, defaultValue int) int {
	retries := GetIntFlagValue(benchmarkArgs, NUM_ANALYSIS_RETRIES, defaultValue)
	return retries
}
// GetIntFlagValue returns the integer given for flagName in benchmarkArgs, or
// defaultValue if the flag is absent, is not followed by digits, or its value
// does not fit in an int.
//
// The flag must be followed by "=" or a single space and then by one or more
// decimal digits. flagName is matched literally, so regexp metacharacters in
// it carry no special meaning.
func GetIntFlagValue(benchmarkArgs, flagName string, defaultValue int) int {
	r := regexp.MustCompile(regexp.QuoteMeta(flagName) + `[= ](\d+)`)
	m := r.FindStringSubmatch(benchmarkArgs)
	if len(m) < 2 {
		// If we reached here then return the default Value.
		return defaultValue
	}
	ret, err := strconv.Atoi(m[1])
	if err != nil {
		return defaultValue
	}
	return ret
}
// RemoveFlagsFromArgs returns benchmarkArgs with every occurrence of the
// given flags removed, together with an optional "=<alphanumeric value>"
// suffix. Whitespace in the result is normalized: fields are separated by
// single spaces and leading/trailing whitespace is dropped.
//
// Flags are matched literally, so regexp metacharacters in a flag name carry
// no special meaning.
func RemoveFlagsFromArgs(benchmarkArgs string, flags ...string) string {
	for _, f := range flags {
		// QuoteMeta guarantees the pattern is valid, so MustCompile cannot panic.
		re := regexp.MustCompile(`\s*` + regexp.QuoteMeta(f) + `(=[[:alnum:]]*)?\s*`)
		benchmarkArgs = re.ReplaceAllString(benchmarkArgs, " ")
	}
	// Remove extra whitespace.
	return strings.Join(strings.Fields(benchmarkArgs), " ")
}
// RunBenchmark runs the specified benchmark with the specified arguments. It prints the output of
// the run_benchmark command and also returns the output in case the caller needs to do any
// post-processing on it. In case of any errors the output will be empty.
//
// fileInfoName is the pageset file name inside pathToPagesets; files with a ".pyc" extension are
// skipped and yield ("", nil). The run_benchmark script is looked up under
// GetPathToTelemetryBinaries(!runOnSwarming). Results are written to a directory named after the
// pageset (file name without extension) under localOutputDir.
//
// For PLATFORM_ANDROID the APK at chromiumBinary is installed first; for all other platforms
// chromiumBinary is passed as the exact browser executable. benchmarkExtraArgs is split on
// whitespace and appended to the command after the CT-only flags have been removed;
// browserExtraArgs is passed through "--extra-browser-args".
//
// The command timeout is taken from the run-benchmark-timeout flag in benchmarkExtraArgs, falling
// back to the timeout configured for pagesetType, and is multiplied by the repeat count when that
// is greater than zero. pathToPyFiles is not referenced by this function, and runID is only used
// for logging.
func RunBenchmark(ctx context.Context, fileInfoName, pathToPagesets, pathToPyFiles, localOutputDir, chromiumBinary, runID, browserExtraArgs, benchmarkName, targetPlatform, benchmarkExtraArgs, pagesetType string, defaultRepeatValue int, runOnSwarming bool) (string, error) {
pagesetBaseName := filepath.Base(fileInfoName)
if filepath.Ext(pagesetBaseName) == ".pyc" {
// Ignore .pyc files.
return "", nil
}
// Read the pageset.
pagesetName := strings.TrimSuffix(pagesetBaseName, filepath.Ext(pagesetBaseName))
pagesetPath := filepath.Join(pathToPagesets, fileInfoName)
decodedPageset, err := ReadPageset(pagesetPath)
if err != nil {
return "", fmt.Errorf("Could not read %s: %s", pagesetPath, err)
}
sklog.Infof("===== Processing %s for %s =====", pagesetPath, runID)
// The first element is the run_benchmark script; it is executed by the Python interpreter
// selected further below.
args := []string{
filepath.Join(GetPathToTelemetryBinaries(!runOnSwarming), BINARY_RUN_BENCHMARK),
benchmarkName,
"--also-run-disabled-tests",
"--urls-list=" + decodedPageset.UrlsList,
"--archive-data-file=" + decodedPageset.ArchiveDataFile,
}
if GetUserAgentValue(benchmarkExtraArgs, "") == "" {
// Add --user-agent only if the flag is not already specified. See skbug.com/11283 for context.
args = append(args, "--user-agent="+decodedPageset.UserAgent)
}
// Need to capture output for all benchmarks.
outputDirArgValue := filepath.Join(localOutputDir, pagesetName)
args = append(args, "--output-dir="+outputDirArgValue)
// Figure out which browser and device should be used.
if targetPlatform == PLATFORM_ANDROID {
if err := InstallChromeAPK(ctx, chromiumBinary); err != nil {
return "", fmt.Errorf("Error while installing APK: %s", err)
}
if strings.Contains(chromiumBinary, CUSTOM_APK_DIR_NAME) {
// TODO(rmistry): Not sure if the custom APK will always be called android-chrome. Might have to
// make this configurable if it can vary or unzip and use sed to get the app name from the APK.
args = append(args, "--browser=android-chrome")
} else {
args = append(args, "--browser=android-chromium")
}
} else {
args = append(args, "--browser=exact", "--browser-executable="+chromiumBinary)
args = append(args, "--device=desktop")
}
// Calculate the timeout.
timeoutSecs := GetRunBenchmarkTimeoutValue(benchmarkExtraArgs, PagesetTypeToInfo[pagesetType].RunChromiumPerfTimeoutSecs)
repeatBenchmark := GetRepeatValue(benchmarkExtraArgs, defaultRepeatValue)
if repeatBenchmark > 0 {
args = append(args, fmt.Sprintf("%s=%d", PAGESET_REPEAT_FLAG, repeatBenchmark))
// Increase the timeoutSecs if repeats are used.
timeoutSecs = timeoutSecs * repeatBenchmark
}
sklog.Infof("Using %d seconds for timeout", timeoutSecs)
// Remove from benchmarkExtraArgs "special" flags that are recognized by CT but not
// by the run_benchmark script.
benchmarkExtraArgs = RemoveFlagsFromArgs(benchmarkExtraArgs, RUN_BENCHMARK_TIMEOUT_FLAG, MAX_PAGES_PER_BOT, NUM_ANALYSIS_RETRIES)
// Split benchmark args if not empty and append to args.
if benchmarkExtraArgs != "" {
args = append(args, strings.Fields(benchmarkExtraArgs)...)
}
// Add browserArgs if not empty to args.
if browserExtraArgs != "" {
args = append(args, "--extra-browser-args="+browserExtraArgs)
}
env := []string{}
if targetPlatform != PLATFORM_WINDOWS {
// Set the DISPLAY.
env = append(env, "DISPLAY=:0")
}
pythonExec := BINARY_VPYTHON3
// Set VPYTHON_VIRTUALENV_ROOT for vpython
env = append(env, fmt.Sprintf("VPYTHON_VIRTUALENV_ROOT=%s", VPYTHON_VIRTUALENV_ROOT))
// Append the original environment as well.
for _, e := range os.Environ() {
env = append(env, e)
}
if targetPlatform == PLATFORM_WINDOWS {
// Could not figure out how to make vpython work on windows so use python instead.
// The downside of this is that we might have to keep installing packages on win GCE
// instances.
pythonExec = BINARY_PYTHON
} else if targetPlatform == PLATFORM_ANDROID {
// Reset android logcat prior to the run so that we can examine the logs later.
util.LogErr(ExecuteCmd(ctx, BINARY_ADB, []string{"logcat", "-c"}, []string{}, ADB_ROOT_TIMEOUT, nil, nil))
}
// Create buffer for capturing the stdout and stderr of the benchmark run.
var b bytes.Buffer
if _, err := b.WriteString(fmt.Sprintf("========== Stdout and stderr for %s ==========\n", pagesetPath)); err != nil {
return "", fmt.Errorf("Error writing to output buffer: %s", err)
}
// Both streams go into the same buffer; logging of stdout/stderr by the exec layer is turned
// off because the buffer is printed below.
if err := ExecuteCmdWithConfigurableLogging(ctx, pythonExec, args, env, time.Duration(timeoutSecs)*time.Second, &b, &b, false, false); err != nil {
if targetPlatform == PLATFORM_ANDROID {
// Kill the port-forwarder to start from a clean slate.
util.LogErr(ExecuteCmdWithConfigurableLogging(ctx, "pkill", []string{"-f", "forwarder_host"}, []string{}, PKILL_TIMEOUT, &b, &b, false, false))
}
// On failure the captured output is still printed, but an empty string is returned.
output, getErr := GetRunBenchmarkOutput(b)
util.LogErr(getErr)
fmt.Println(output)
return "", fmt.Errorf("Run benchmark command failed with: %s", err)
}
// Append logcat output if we ran on Android.
if targetPlatform == PLATFORM_ANDROID {
if err := ExecuteCmdWithConfigurableLogging(ctx, BINARY_ADB, []string{"logcat", "-d"}, env, ADB_ROOT_TIMEOUT, &b, &b, false, false); err != nil {
return "", fmt.Errorf("Error running logcat -d: %s", err)
}
}
output, err := GetRunBenchmarkOutput(b)
if err != nil {
return "", fmt.Errorf("Could not get run benchmark output: %s", err)
}
// Print the output and return.
fmt.Println(output)
return output, nil
}
// GetRunBenchmarkOutput returns the contents of b followed by a line of 20
// "=" characters used as a footer. b is received by value, so the footer is
// written to the function's own copy of the buffer struct.
func GetRunBenchmarkOutput(b bytes.Buffer) (string, error) {
	_, writeErr := b.WriteString("====================")
	if writeErr != nil {
		return "", fmt.Errorf("Error writing to output buffer: %s", writeErr)
	}
	contents := b.String()
	return contents, nil
}
// MergeUploadCSVFilesOnWorkers consolidates the per-page results.csv files found
// in the sub-directories of localOutputDir into a single CSV and uploads it.
//
// The steps are:
//  1. Every <localOutputDir>/<name>/results.csv is moved to
//     <localOutputDir>/<name>.csv. Entries whose results cannot be moved, read
//     or rewritten are logged and skipped.
//  2. If addRanks is set, every value in the "stories" column is rewritten as
//     "<value> (#<name>)". If pageRankToAdditionalFields has an entry for
//     <name>, one row is appended per key/value pair with the key in the "name"
//     column and the value in the valueColumnName column.
//  3. csv_pivot_table_merger.py (looked up in pathToPyFiles) merges all CSVs
//     into <localOutputDir>/<runID>.output. handleStrings adds the
//     --handle_strings flag.
//  4. The merged file is uploaded via gs to <remoteDir>/<startRange>/outputs.
//
// An error is returned if localOutputDir cannot be read, the merger script
// fails, the lines of the merged file cannot be counted, or the upload fails.
func MergeUploadCSVFilesOnWorkers(ctx context.Context, localOutputDir, pathToPyFiles, runID, remoteDir, valueColumnName string, gs *GcsUtil, startRange int, handleStrings, addRanks bool, pageRankToAdditionalFields map[string]map[string]string) error {
	// Move all results into a single directory.
	fileInfos, err := os.ReadDir(localOutputDir)
	if err != nil {
		return fmt.Errorf("Unable to read %s: %s", localOutputDir, err)
	}
	for _, fileInfo := range fileInfos {
		if !fileInfo.IsDir() {
			continue
		}
		outputFile := filepath.Join(localOutputDir, fileInfo.Name(), "results.csv")
		newFile := filepath.Join(localOutputDir, fmt.Sprintf("%s.csv", fileInfo.Name()))
		if err := os.Rename(outputFile, newFile); err != nil {
			sklog.Errorf("Could not rename %s to %s: %s", outputFile, newFile, err)
			continue
		}
		if !addRanks && len(pageRankToAdditionalFields) == 0 {
			// Nothing to add to this CSV.
			continue
		}
		headers, values, err := GetRowsFromCSV(newFile)
		if err != nil {
			sklog.Errorf("Could not read %s: %s", newFile, err)
			continue
		}
		// Add the rank of the page to the CSV file. The directory name is used
		// as the rank.
		pageRank := fileInfo.Name()
		pageNameWithRank := ""
		if addRanks {
			for i, header := range headers {
				if header != "stories" {
					continue
				}
				for j := range values {
					// GetRowsFromCSV accepts rows with differing numbers of
					// fields, so a row may not reach this column.
					if i >= len(values[j]) {
						continue
					}
					pageNameWithRank = fmt.Sprintf("%s (#%s)", values[j][i], pageRank)
					values[j][i] = pageNameWithRank
				}
			}
		}
		// Add additionalFields (if any) to the output CSV.
		if additionalFields, ok := pageRankToAdditionalFields[pageRank]; ok {
			for h, v := range additionalFields {
				// Columns that match none of the cases stay empty.
				valueLine := make([]string, len(headers))
				for i := range headers {
					switch {
					case headers[i] == "name":
						valueLine[i] = h
					case headers[i] == valueColumnName:
						valueLine[i] = v
					case headers[i] == "stories" && addRanks:
						valueLine[i] = pageNameWithRank
					}
				}
				values = append(values, valueLine)
			}
		}
		if err := writeRowsToCSV(newFile, headers, values); err != nil {
			sklog.Errorf("Could not write to %s: %s", newFile, err)
			continue
		}
	}
	// Call csv_pivot_table_merger.py to merge all results into a single results CSV.
	pathToCsvMerger := filepath.Join(pathToPyFiles, "csv_pivot_table_merger.py")
	outputFileName := runID + ".output"
	outputFilePath := filepath.Join(localOutputDir, outputFileName)
	args := []string{
		pathToCsvMerger,
		"--csv_dir=" + localOutputDir,
		"--output_csv_name=" + outputFilePath,
		"--value_column_name=" + valueColumnName,
	}
	if handleStrings {
		args = append(args, "--handle_strings")
	}
	err = ExecuteCmd(ctx, BINARY_PYTHON, args, []string{fmt.Sprintf("VPYTHON_VIRTUALENV_ROOT=%s", VPYTHON_VIRTUALENV_ROOT)}, CSV_PIVOT_TABLE_MERGER_TIMEOUT, nil, nil)
	if err != nil {
		return fmt.Errorf("Error running csv_pivot_table_merger.py: %s", err)
	}
	// Check to see if the output CSV has more than just the header line.
	// TODO(rmistry): Inefficient to count all the lines when we really only want to know if
	// it's > or <= 1 line.
	lines, err := fileutil.CountLines(outputFilePath)
	if err != nil {
		return fmt.Errorf("Could not count lines from %s: %s", outputFilePath, err)
	}
	if lines <= 1 {
		// Only warn: the (possibly header-only) file is still uploaded below.
		sklog.Warningf("%s has %d lines. More than 1 line is expected.", outputFilePath, lines)
	}
	// Copy the output file to Google Storage.
	remoteOutputDir := path.Join(remoteDir, strconv.Itoa(startRange), "outputs")
	if err := gs.UploadFile(outputFileName, localOutputDir, remoteOutputDir); err != nil {
		return fmt.Errorf("Unable to upload %s to %s: %s", outputFileName, remoteOutputDir, err)
	}
	return nil
}
// GetRowsFromCSV reads the provided CSV and returns its headers (first row)
// and values (all other rows). Rows are allowed to have differing numbers of
// fields. An error is returned if the file cannot be opened or parsed, or if
// it has fewer than two rows (i.e. there is no data besides the header).
func GetRowsFromCSV(csvPath string) ([]string, [][]string, error) {
	csvFile, err := os.Open(csvPath)
	if err != nil {
		return nil, nil, fmt.Errorf("Could not open %s: %s", csvPath, err)
	}
	// Register the close only after a successful open so that a failed open
	// does not attempt to close a nil file.
	defer util.Close(csvFile)
	reader := csv.NewReader(csvFile)
	// A negative value disables the per-record field count check.
	reader.FieldsPerRecord = -1
	rawCSVdata, err := reader.ReadAll()
	if err != nil {
		return nil, nil, fmt.Errorf("Could not read %s: %s", csvPath, err)
	}
	if len(rawCSVdata) < 2 {
		return nil, nil, fmt.Errorf("No data in %s", csvPath)
	}
	return rawCSVdata[0], rawCSVdata[1:], nil
}
// writeRowsToCSV overwrites the existing file at csvPath with the given
// headers row followed by all rows in values, encoded as CSV. The file must
// already exist; it is truncated before writing so that no bytes of the
// previous content survive when the new content is shorter. An error is
// returned if the file cannot be opened or if any write (including the final
// flush) fails.
func writeRowsToCSV(csvPath string, headers []string, values [][]string) error {
	csvFile, err := os.OpenFile(csvPath, os.O_WRONLY|os.O_TRUNC, 0666)
	if err != nil {
		return fmt.Errorf("Could not open %s: %s", csvPath, err)
	}
	// Register the close only after a successful open.
	defer util.Close(csvFile)
	writer := csv.NewWriter(csvFile)
	// Write the headers.
	if err := writer.Write(headers); err != nil {
		return fmt.Errorf("Could not write to %s: %s", csvPath, err)
	}
	// Write all values.
	for _, row := range values {
		if err := writer.Write(row); err != nil {
			return fmt.Errorf("Could not write to %s: %s", csvPath, err)
		}
	}
	// csv.Writer buffers its output; Flush does not return an error, so any
	// failure has to be collected through Error afterwards.
	writer.Flush()
	if err := writer.Error(); err != nil {
		return fmt.Errorf("Could not write to %s: %s", csvPath, err)
	}
	return nil
}
// pollSwarmingTaskToCompletion polls the specified swarming task every two
// minutes until it reaches a terminal state or ctx is done. The first poll
// happens after the first two-minute interval has elapsed.
//
// It returns the resulting CAS digest (empty if the task produced no CAS
// output) and the state of the swarming task if there is no error. An error is
// returned if the task result cannot be fetched, the task ends in a
// non-COMPLETED terminal state, the task completes with a failure, or ctx is
// cancelled while waiting.
// TODO(rmistry): Use pubsub instead.
func pollSwarmingTaskToCompletion(ctx context.Context, taskId string, swarmingClient swarmingv2.SwarmingV2Client) (string, apipb.TaskState, error) {
	// Use an explicit ticker so that it is released when polling ends.
	ticker := time.NewTicker(2 * time.Minute)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return "", apipb.TaskState_INVALID, fmt.Errorf("Stopped polling task %s: %w", taskId, ctx.Err())
		case <-ticker.C:
		}
		swarmingTask, err := swarmingClient.GetResult(ctx, &apipb.TaskIdWithPerfRequest{
			TaskId:                  taskId,
			IncludePerformanceStats: false,
		})
		if err != nil {
			return "", apipb.TaskState_INVALID, fmt.Errorf("Could not get task %s: %s", taskId, err)
		}
		switch swarmingTask.State {
		case apipb.TaskState_BOT_DIED, apipb.TaskState_CANCELED, apipb.TaskState_CLIENT_ERROR, apipb.TaskState_EXPIRED, apipb.TaskState_NO_RESOURCE, apipb.TaskState_TIMED_OUT, apipb.TaskState_KILLED:
			return "", swarmingTask.State, fmt.Errorf("The task %s exited early with state %v", taskId, swarmingTask.State)
		case apipb.TaskState_PENDING:
			// The task is in pending state. Keep polling.
		case apipb.TaskState_RUNNING:
			// The task is in running state. Keep polling.
		case apipb.TaskState_COMPLETED:
			if swarmingTask.Failure {
				return "", swarmingTask.State, fmt.Errorf("The task %s failed", taskId)
			}
			sklog.Infof("The task %s successfully completed", taskId)
			// Without a CAS output root (or its digest) there is nothing to
			// return besides the state.
			if swarmingTask.CasOutputRoot == nil || swarmingTask.CasOutputRoot.Digest == nil {
				return "", swarmingTask.State, nil
			}
			digest := rbe.DigestToString(swarmingTask.CasOutputRoot.Digest.Hash, swarmingTask.CasOutputRoot.Digest.SizeBytes)
			return digest, swarmingTask.State, nil
		default:
			// Unknown states are logged and polling continues.
			sklog.Errorf("Unknown swarming state %v in %v", swarmingTask.State, swarmingTask)
		}
	}
}
// TriggerIsolateTelemetrySwarmingTask triggers a swarming task which runs the
// isolate_telemetry worker script, waits for the task to complete, downloads
// its CAS output and returns the contents of ISOLATE_TELEMETRY_FILENAME with
// leading and trailing newlines removed.
//
// Windows runs use the Windows builder dimensions and CIPD packages and do not
// set a CasSpec; every other platform uses the Linux equivalents together with
// CasIsolateTelemetryLinux. serviceAccountJSON and hardTimeout are not used by
// this function.
func TriggerIsolateTelemetrySwarmingTask(ctx context.Context, taskName, runID, chromiumHash, serviceAccountJSON, targetPlatform string, patches []string, hardTimeout, ioTimeout time.Duration, local bool, swarmingClient swarmingv2.SwarmingV2Client, casClient cas.CAS) (string, error) {
	// Pick the dimensions, CAS spec and CIPD packages for the target platform.
	cipdPkgs := append([]string{}, cipd.GetStrCIPDPkgs(specs.CIPD_PKGS_ISOLATE)...)
	dimensions := GCE_LINUX_BUILDER_DIMENSIONS
	var casSpec *CasSpec
	switch targetPlatform {
	case PLATFORM_WINDOWS:
		dimensions = GCE_WINDOWS_BUILDER_DIMENSIONS
		cipdPkgs = append(cipdPkgs, cipd.GetStrCIPDPkgs(cipd.PkgsGit[cipd.PlatformWindowsAmd64])...)
		cipdPkgs = append(cipdPkgs, LUCI_AUTH_CIPD_PACKAGE_WIN)
	default:
		casSpec = CasIsolateTelemetryLinux()
		cipdPkgs = append(cipdPkgs, cipd.GetStrCIPDPkgs(cipd.PkgsGit[cipd.PlatformLinuxAmd64])...)
		cipdPkgs = append(cipdPkgs, LUCI_AUTH_CIPD_PACKAGE_LINUX)
	}

	// The task inputs have to be in CAS before the task can be requested.
	casDigest, err := UploadToCAS(ctx, casClient, casSpec, local, false)
	if err != nil {
		return "", skerr.Wrapf(err, "failed to upload task inputs to CAS")
	}

	// Build the request, trigger the task and block until it is done.
	tags := []string{"name:" + taskName, "runid:" + runID}
	envPrefixes := map[string][]string{"PATH": CIPD_PATHS}
	cmd := []string{
		"luci-auth",
		"context",
		"--",
		"bin/isolate_telemetry",
		"-logtostderr",
		"--run_id=" + runID,
		"--chromium_hash=" + chromiumHash,
		"--patches=" + strings.Join(patches, ","),
		"--target_platform=" + targetPlatform,
		"--out=${ISOLATED_OUTDIR}",
	}
	req, err := MakeSwarmingTaskRequest(ctx, taskName, casDigest, cipdPkgs, cmd, tags, dimensions, envPrefixes, swarming.RECOMMENDED_PRIORITY, ioTimeout, casClient)
	if err != nil {
		return "", skerr.Wrapf(err, "failed to create Swarming task request")
	}
	resp, err := swarmingClient.NewTask(ctx, req)
	if err != nil {
		return "", fmt.Errorf("Could not trigger swarming task %s: %s", taskName, err)
	}
	taskID := resp.TaskId
	outputDigest, _, err := pollSwarmingTaskToCompletion(ctx, taskID, swarmingClient)
	if err != nil {
		return "", fmt.Errorf("Could not collect task ID %s: %s", taskID, err)
	}

	// Fetch the task's CAS output into a scratch directory that is removed on return.
	outputDir, err := os.MkdirTemp("", fmt.Sprintf("download_%s", taskID))
	if err != nil {
		return "", fmt.Errorf("Failed to create temporary dir: %s", err)
	}
	defer util.RemoveAll(outputDir)
	if err := casClient.Download(ctx, outputDir, outputDigest); err != nil {
		return "", fmt.Errorf("Could not download %s: %s", outputDigest, err)
	}

	// The result is the content of the single output file.
	outputFile := filepath.Join(outputDir, ISOLATE_TELEMETRY_FILENAME)
	contents, err := os.ReadFile(outputFile)
	if err != nil {
		return "", fmt.Errorf("Could not read outputfile %s: %s", outputFile, err)
	}
	return strings.Trim(string(contents), "\n"), nil
}
// MakeSwarmingTaskRequest builds a single-slice Swarming NewTaskRequest.
//
// Each entry of cipdPkgs must have the form "<path>:<package name>:<version>";
// an error is returned for entries that do not. dims and envPrefixes are
// converted to the Swarming pair types. casDigest is combined with the CAS
// instance of casClient to form the task's CAS input root. ioTimeoutSecs is a
// time.Duration which is converted to whole seconds. The execution timeout is
// fixed at 36 hours and the expiration is derived from it (see below). The
// task runs as, and is attributed to, CT_SERVICE_ACCOUNT. ctx is not used.
func MakeSwarmingTaskRequest(ctx context.Context, taskName, casDigest string, cipdPkgs, cmd, tags []string, dims map[string]string, envPrefixes map[string][]string, priority int32, ioTimeoutSecs time.Duration, casClient cas.CAS) (*apipb.NewTaskRequest, error) {
	var cipdInput *apipb.CipdInput
	if len(cipdPkgs) > 0 {
		cipdInput = &apipb.CipdInput{
			Packages: make([]*apipb.CipdPackage, 0, len(cipdPkgs)),
		}
		for _, p := range cipdPkgs {
			// Split into at most three parts so that any further ':' stays in
			// the version.
			tokens := strings.SplitN(p, ":", 3)
			if len(tokens) != 3 {
				return nil, fmt.Errorf("CIPD package %q is not of the form <path>:<name>:<version>", p)
			}
			cipdInput.Packages = append(cipdInput.Packages, &apipb.CipdPackage{
				Path:        tokens[0],
				PackageName: tokens[1],
				Version:     tokens[2],
			})
		}
	}
	swarmingDims := make([]*apipb.StringPair, 0, len(dims))
	for k, v := range dims {
		swarmingDims = append(swarmingDims, &apipb.StringPair{
			Key:   k,
			Value: v,
		})
	}
	var swarmingEnvPrefixes []*apipb.StringListPair
	if len(envPrefixes) > 0 {
		swarmingEnvPrefixes = make([]*apipb.StringListPair, 0, len(envPrefixes))
		for k, v := range envPrefixes {
			swarmingEnvPrefixes = append(swarmingEnvPrefixes, &apipb.StringListPair{
				Key:   k,
				Value: v,
			})
		}
	}
	casInstance, err := rbe.GetCASInstance(casClient)
	if err != nil {
		return nil, skerr.Wrap(err)
	}
	casInput, err := swarmingv2.MakeCASReference(casDigest, casInstance)
	if err != nil {
		return nil, skerr.Wrap(err)
	}
	// CT runs use task authentication in swarming (see https://chrome-internal-review.googlesource.com/c/infradata/config/+/2878799/2#message-e3328dd455c1110cd2286a0c343b932594296ea3).
	// This does not allow more than 48hours validity duration (expiration time + hard timeout).
	executionTimeoutSecs := 36 * time.Hour
	expirationTimeoutSecs := 2*24*time.Hour - executionTimeoutSecs - time.Hour // Remove one hour to be safe.
	return &apipb.NewTaskRequest{
		Name:           taskName,
		Priority:       priority,
		ServiceAccount: CT_SERVICE_ACCOUNT,
		Tags:           tags,
		TaskSlices: []*apipb.TaskSlice{
			{
				ExpirationSecs: int32(expirationTimeoutSecs.Seconds()),
				Properties: &apipb.TaskProperties{
					CasInputRoot:         casInput,
					CipdInput:            cipdInput,
					Command:              cmd,
					Dimensions:           swarmingDims,
					EnvPrefixes:          swarmingEnvPrefixes,
					ExecutionTimeoutSecs: int32(executionTimeoutSecs.Seconds()),
					IoTimeoutSecs:        int32(ioTimeoutSecs.Seconds()),
				},
				WaitForCapacity: false,
			},
		},
		User: CT_SERVICE_ACCOUNT,
	}, nil
}
// TriggerMasterScriptSwarmingTask uploads the inputs described by casSpec to
// CAS and triggers a swarming task that runs cmd on a bot matching
// GCE_LINUX_MASTER_DIMENSIONS. It returns the ID of the triggered task without
// waiting for the task to complete.
func TriggerMasterScriptSwarmingTask(ctx context.Context, runID, taskName string, local bool, cmd []string, casSpec *CasSpec, swarmingClient swarmingv2.SwarmingV2Client, casClient cas.CAS) (string, error) {
	// Master scripts only need the linux versions of their cipd packages.
	cipdPkgs := append([]string{}, cipd.GetStrCIPDPkgs(cipd.PkgsGit[cipd.PlatformLinuxAmd64])...)
	cipdPkgs = append(cipdPkgs, LUCI_AUTH_CIPD_PACKAGE_LINUX)

	// The task inputs have to be in CAS before the task can be requested.
	casDigest, err := UploadToCAS(ctx, casClient, casSpec, local, true)
	if err != nil {
		return "", skerr.Wrapf(err, "failed to upload task inputs to CAS")
	}

	// Build the request and hand it to swarming.
	tags := []string{"name:" + taskName, "runid:" + runID}
	envPrefixes := map[string][]string{"PATH": CIPD_PATHS}
	ioTimeout := 3 * 24 * time.Hour
	req, err := MakeSwarmingTaskRequest(ctx, taskName, casDigest, cipdPkgs, cmd, tags, GCE_LINUX_MASTER_DIMENSIONS, envPrefixes, swarming.RECOMMENDED_PRIORITY, ioTimeout, casClient)
	if err != nil {
		return "", skerr.Wrapf(err, "failed to create Swarming task request")
	}
	resp, err := swarmingClient.NewTask(ctx, req)
	if err != nil {
		return "", fmt.Errorf("Could not trigger swarming task %s: %s", taskName, err)
	}
	return resp.TaskId, nil
}
// TriggerBuildRepoSwarmingTask triggers a swarming task which runs the
// build_repo worker script, waits for the task to complete, downloads its CAS
// output and returns the comma-separated entries of BUILD_OUTPUT_FILENAME
// (the list of remote build directories reported by the script).
//
// cipdPkgs lists extra CIPD packages for the task; the platform's git and
// luci-auth packages are added to a private copy, so the caller's slice is
// never modified. Windows runs do not set a CasSpec; all other platforms use
// CasBuildRepoLinux. serviceAccountJSON and hardTimeout are not used by this
// function.
func TriggerBuildRepoSwarmingTask(ctx context.Context, taskName, runID, repoAndTarget, targetPlatform, serviceAccountJSON, gnArgs string, hashes, patches, cipdPkgs []string, singleBuild, local bool, hardTimeout, ioTimeout time.Duration, swarmingClient swarmingv2.SwarmingV2Client, casClient cas.CAS) ([]string, error) {
	// Appending directly to cipdPkgs could write into spare capacity of the
	// caller's backing array, so work on a copy instead.
	pkgs := make([]string, 0, len(cipdPkgs)+1)
	pkgs = append(pkgs, cipdPkgs...)
	// Find which os and CIPD pkgs to use.
	var casSpec *CasSpec
	if targetPlatform == PLATFORM_WINDOWS {
		pkgs = append(pkgs, cipd.GetStrCIPDPkgs(cipd.PkgsGit[cipd.PlatformWindowsAmd64])...)
		pkgs = append(pkgs, LUCI_AUTH_CIPD_PACKAGE_WIN)
	} else {
		casSpec = CasBuildRepoLinux()
		pkgs = append(pkgs, cipd.GetStrCIPDPkgs(cipd.PkgsGit[cipd.PlatformLinuxAmd64])...)
		pkgs = append(pkgs, LUCI_AUTH_CIPD_PACKAGE_LINUX)
	}
	// Upload task inputs to CAS.
	casDigest, err := UploadToCAS(ctx, casClient, casSpec, local, false)
	if err != nil {
		return nil, skerr.Wrapf(err, "failed to upload task inputs to CAS")
	}
	// Trigger swarming task and wait for it to complete.
	cmd := []string{
		"luci-auth",
		"context",
		"--",
		"bin/build_repo",
		"-logtostderr",
		"--run_id=" + runID,
		"--repo_and_target=" + repoAndTarget,
		"--gn_args=" + gnArgs,
		"--hashes=" + strings.Join(hashes, ","),
		"--patches=" + strings.Join(patches, ","),
		"--single_build=" + strconv.FormatBool(singleBuild),
		"--target_platform=" + targetPlatform,
		"--out=${ISOLATED_OUTDIR}",
	}
	// Android builds get their own builder dimensions even though they share
	// the Linux CAS spec and CIPD packages chosen above.
	var dimensions map[string]string
	switch targetPlatform {
	case PLATFORM_WINDOWS:
		dimensions = GCE_WINDOWS_BUILDER_DIMENSIONS
	case PLATFORM_ANDROID:
		dimensions = GCE_ANDROID_BUILDER_DIMENSIONS
	default:
		dimensions = GCE_LINUX_BUILDER_DIMENSIONS
	}
	req, err := MakeSwarmingTaskRequest(ctx, taskName, casDigest, pkgs, cmd, []string{"name:" + taskName, "runid:" + runID}, dimensions, map[string][]string{"PATH": CIPD_PATHS}, swarming.RECOMMENDED_PRIORITY, ioTimeout, casClient)
	if err != nil {
		return nil, skerr.Wrapf(err, "failed to create Swarming task request")
	}
	resp, err := swarmingClient.NewTask(ctx, req)
	if err != nil {
		return nil, fmt.Errorf("Could not trigger swarming task %s: %s", taskName, err)
	}
	outputDigest, _, err := pollSwarmingTaskToCompletion(ctx, resp.TaskId, swarmingClient)
	if err != nil {
		return nil, fmt.Errorf("Could not collect task ID %s: %s", resp.TaskId, err)
	}
	// Download output of the task into a scratch directory that is removed on
	// return.
	outputDir, err := os.MkdirTemp("", fmt.Sprintf("download_%s", resp.TaskId))
	if err != nil {
		return nil, fmt.Errorf("Failed to create temporary dir: %s", err)
	}
	defer util.RemoveAll(outputDir)
	if err := casClient.Download(ctx, outputDir, outputDigest); err != nil {
		return nil, fmt.Errorf("Could not download %s: %s", outputDigest, err)
	}
	outputFile := filepath.Join(outputDir, BUILD_OUTPUT_FILENAME)
	contents, err := os.ReadFile(outputFile)
	if err != nil {
		return nil, fmt.Errorf("Could not read outputfile %s: %s", outputFile, err)
	}
	return strings.Split(string(contents), ","), nil
}
// DownloadPatch fetches the remote file at remotePath via gs and streams it
// into the file at localPath, which is created or truncated. It returns the
// number of bytes written, or -1 and an error if fetching, creating or
// writing fails.
func DownloadPatch(localPath, remotePath string, gs *GcsUtil) (int64, error) {
	remote, err := gs.GetRemoteFileContents(remotePath)
	if err != nil {
		return -1, fmt.Errorf("Could not fetch %s: %s", remotePath, err)
	}
	defer util.Close(remote)

	local, err := os.Create(localPath)
	if err != nil {
		return -1, fmt.Errorf("Could not create %s: %s", localPath, err)
	}
	defer util.Close(local)

	numBytes, copyErr := io.Copy(local, remote)
	if copyErr != nil {
		return -1, fmt.Errorf("Could not write to %s: %s", localPath, copyErr)
	}
	return numBytes, nil
}
// DownloadAndApplyPatch downloads <remotePatchesDir>/<patchName> via gs into
// <localDir>/<patchName> and applies it to checkout using gitExec. Patches of
// 10 bytes or fewer are downloaded but not applied (presumably these are
// treated as empty patches; the threshold's origin is not visible here).
// An error is returned if the download or the patch application fails.
func DownloadAndApplyPatch(ctx context.Context, patchName, localDir, remotePatchesDir, checkout, gitExec string, gs *GcsUtil) error {
	patchLocalPath := filepath.Join(localDir, patchName)
	// Remote paths are always slash-separated, so they are joined with the
	// path package rather than the OS-specific filepath package.
	patchRemotePath := path.Join(remotePatchesDir, patchName)
	written, err := DownloadPatch(patchLocalPath, patchRemotePath, gs)
	if err != nil {
		return fmt.Errorf("Could not download %s: %s", patchRemotePath, err)
	}
	// Apply patch to the local checkout.
	if written > 10 {
		if err := ApplyPatch(ctx, patchLocalPath, checkout, gitExec); err != nil {
			return fmt.Errorf("Could not apply patch in %s: %s", checkout, err)
		}
	}
	return nil
}
// GetArchivesNum returns the number of archives for the specified pagesetType.
// The count is the number of remote artifacts found for the pageset type
// divided by three. -1 is returned if benchmarkArgs contains
// USE_LIVE_SITES_FLAGS (with a nil error) or if there is an error.
func GetArchivesNum(gs *GcsUtil, benchmarkArgs, pagesetType string) (int, error) {
	if strings.Contains(benchmarkArgs, USE_LIVE_SITES_FLAGS) {
		return -1, nil
	}
	// Calculate the number of archives the workers worked with. Remote paths
	// are always slash-separated, so they are joined with the path package
	// rather than the OS-specific filepath package.
	archivesRemoteDir := path.Join(SWARMING_DIR_NAME, WEB_ARCHIVES_DIR_NAME, pagesetType)
	totalArchiveArtifacts, err := gs.GetRemoteDirCount(archivesRemoteDir)
	if err != nil {
		return -1, fmt.Errorf("Could not find archives in %s: %s", archivesRemoteDir, err)
	}
	// Each archive has a JSON file, a WPR file and a WPR.sha1 file.
	return totalArchiveArtifacts / 3, nil
}
// GetHashesFromBuild returns the Chromium and Skia hashes from a CT build string.
// The hashes are the second and third '-' separated tokens of the string.
// Example build string: try-27af50f-d5dcd58-rmistry-20151026102511-nopatch.
// Two empty strings are returned if chromiumBuild has fewer than three tokens.
func GetHashesFromBuild(chromiumBuild string) (string, string) {
	tokens := strings.Split(chromiumBuild, "-")
	// Guard against malformed build strings instead of panicking on the
	// index expressions below.
	if len(tokens) < 3 {
		return "", ""
	}
	return tokens[1], tokens[2]
}
// GetNumPages returns the number of custom webpages listed in
// customWebPagesFilePath. If reading the custom webpages fails, or the file
// lists no pages, the number of pages registered for pagesetType in
// PagesetTypeToInfo is returned instead (together with the read error, if
// there was one).
func GetNumPages(pagesetType, customWebPagesFilePath string) (int, error) {
	customPages, err := GetCustomPages(customWebPagesFilePath)
	switch {
	case err != nil:
		return PagesetTypeToInfo[pagesetType].NumPages, err
	case len(customPages) == 0:
		return PagesetTypeToInfo[pagesetType].NumPages, nil
	default:
		return len(customPages), nil
	}
}
// GetCustomPages parses the CSV file at customWebPagesFilePath and returns
// every field that is not blank (fields consisting only of whitespace are
// dropped; kept fields are returned untrimmed). A file without any such field
// yields an empty slice. An error is returned if the file cannot be opened or
// parsed.
func GetCustomPages(customWebPagesFilePath string) ([]string, error) {
	f, err := os.Open(customWebPagesFilePath)
	if err != nil {
		return nil, err
	}
	defer util.Close(f)

	pages := []string{}
	r := csv.NewReader(f)
	for {
		fields, readErr := r.Read()
		if readErr == io.EOF {
			// All records have been consumed.
			return pages, nil
		}
		if readErr != nil {
			return nil, readErr
		}
		for _, field := range fields {
			if strings.TrimSpace(field) != "" {
				pages = append(pages, field)
			}
		}
	}
}
// GetCustomPagesWithinRange returns up to num entries of customWebpages
// starting at the 1-based position startRange. The returned slice shares its
// backing array with customWebpages.
//
// Out-of-range arguments never panic: a startRange below 1 is treated as 1,
// and an empty slice is returned when num is not positive or startRange lies
// beyond the last entry.
func GetCustomPagesWithinRange(startRange, num int, customWebpages []string) []string {
	startIndex := startRange - 1
	if startIndex < 0 {
		startIndex = 0
	}
	if num <= 0 || startIndex >= len(customWebpages) {
		return []string{}
	}
	// Compare against the remaining length rather than computing
	// startIndex+num up front, which could overflow for very large num.
	endIndex := len(customWebpages)
	if num < endIndex-startIndex {
		endIndex = startIndex + num
	}
	return customWebpages[startIndex:endIndex]
}
// CreateCustomPagesets recreates pagesetsDir and writes one pageset file per
// entry of webpages into it. The file for the i-th webpage (0-based) is named
// "<i+startRange>.py". The user agent is "mobile" when targetPlatform is
// PLATFORM_ANDROID and "desktop" otherwise. The first error returned by
// WritePageset aborts the loop and is returned.
func CreateCustomPagesets(webpages []string, pagesetsDir, targetPlatform string, startRange int) error {
	// Start from an empty local directory.
	util.RemoveAll(pagesetsDir)
	MkdirAll(pagesetsDir, 0700)

	// Figure out which user agent to use.
	userAgent := "desktop"
	if targetPlatform == PLATFORM_ANDROID {
		userAgent = "mobile"
	}

	for offset, webpage := range webpages {
		fileName := fmt.Sprintf("%d.py", offset+startRange)
		pagesetPath := filepath.Join(pagesetsDir, fileName)
		err := WritePageset(pagesetPath, userAgent, DEFAULT_CUSTOM_PAGE_ARCHIVEPATH, webpage)
		if err != nil {
			return err
		}
	}
	return nil
}
func GetAnalysisOutputLink(runID string) string {
return GCS_HTTP_LINK + path.Join(GCSBucketName, BenchmarkRunsDir, runID, "consolidated_outputs", runID+".output")
}
// GetPerfRemoteDir returns the directory of the specified run below
// ChromiumPerfRunsStorageDir.
func GetPerfRemoteDir(runID string) string {
	remoteDir := path.Join(ChromiumPerfRunsStorageDir, runID)
	return remoteDir
}
// GetPerfRemoteHTMLDir returns the "html" sub-directory of the directory
// returned by GetPerfRemoteDir for the specified run.
func GetPerfRemoteHTMLDir(runID string) string {
	perfDir := GetPerfRemoteDir(runID)
	return path.Join(perfDir, "html")
}
func GetPerfOutputLinkBase(runID string) string {
return GCS_HTTP_LINK + path.Join(GCSBucketName, GetPerfRemoteHTMLDir(runID)) + "/"
}
// GetPerfOutputLink returns the link to index.html below the link returned by
// GetPerfOutputLinkBase for the specified run.
func GetPerfOutputLink(runID string) string {
	base := GetPerfOutputLinkBase(runID)
	return base + "index.html"
}
func GetPerfNoPatchOutputLink(runID string) string {
runIDNoPatch := fmt.Sprintf("%s-nopatch", runID)
return GCS_HTTP_LINK + path.Join(GCSBucketName, BenchmarkRunsDir, runIDNoPatch, "consolidated_outputs", runIDNoPatch+".output")
}
func GetPerfWithPatchOutputLink(runID string) string {
runIDWithPatch := fmt.Sprintf("%s-withpatch", runID)
return GCS_HTTP_LINK + path.Join(GCSBucketName, BenchmarkRunsDir, runIDWithPatch, "consolidated_outputs", runIDWithPatch+".output")
}
func GetMetricsAnalysisOutputLink(runID string) string {
return GCS_HTTP_LINK + path.Join(GCSBucketName, BenchmarkRunsDir, runID, "consolidated_outputs", runID+".output")
}
// SavePatchToStorage stores patch under "patches/<sha1 of patch>.patch" in
// GCSBucketName and returns that path. The upload is skipped when an object
// with the same path and the same size already exists. An error is returned
// if patch is longer than PATCH_LIMIT bytes, if the storage client cannot be
// created, or if writing the temporary file or uploading it fails.
func SavePatchToStorage(patch string) (string, error) {
	patchSize := len(patch)
	if patchSize > PATCH_LIMIT {
		return "", fmt.Errorf("Patch is too long with %d bytes; limit %d bytes", patchSize, PATCH_LIMIT)
	}

	// If sha1 below ever changes, then isEmptyPatch in ctfe.js will also need to
	// be updated.
	digest := sha1.Sum([]byte(patch))
	patchFileName := fmt.Sprintf("%s.patch", hex.EncodeToString(digest[:]))

	gs, err := NewGcsUtil(nil)
	if err != nil {
		return "", err
	}

	const gsDir = "patches"
	gsPath := path.Join(gsDir, patchFileName)
	res, err := gs.service.Objects.Get(GCSBucketName, gsPath).Do()
	if err != nil {
		sklog.Infof("This is expected for patches we have not seen before:\nCould not retrieve object metadata for %s: %s", gsPath, err)
	}
	alreadyStored := res != nil && res.Size == uint64(patchSize)
	if alreadyStored {
		return gsPath, nil
	}

	// Patch does not exist in Google Storage yet so upload it from a
	// temporary file that is removed again on return.
	tmpDir := os.TempDir()
	patchPath := filepath.Join(tmpDir, patchFileName)
	if err := os.WriteFile(patchPath, []byte(patch), 0666); err != nil {
		return "", err
	}
	defer util.Remove(patchPath)
	if err := gs.UploadFile(patchFileName, tmpDir, gsDir); err != nil {
		return "", err
	}
	return gsPath, nil
}
// GetPatchFromStorage fetches the remote file identified by patchId and
// returns its contents as a string. An error is returned if the storage
// client cannot be created or if the file cannot be fetched or read.
func GetPatchFromStorage(patchId string) (string, error) {
	gs, err := NewGcsUtil(nil)
	if err != nil {
		// Without this check the error would be overwritten below and gs
		// would be used even though its creation failed.
		return "", fmt.Errorf("Could not create GCS util: %s", err)
	}
	respBody, err := gs.GetRemoteFileContents(patchId)
	if err != nil {
		return "", fmt.Errorf("Could not fetch %s: %s", patchId, err)
	}
	defer util.Close(respBody)
	patch, err := io.ReadAll(respBody)
	if err != nil {
		return "", fmt.Errorf("Could not read from %s: %s", patchId, err)
	}
	return string(patch), nil
}
// GetRankFromPageset returns the rank encoded in the name of a pageset file.
// CT pagesets are named [rank].py, so the rank is whatever remains once the
// file extension has been removed, parsed as a decimal integer. An error is
// returned if the remainder is not a valid integer.
func GetRankFromPageset(pagesetFileName string) (int, error) {
	extLen := len(filepath.Ext(pagesetFileName))
	baseLen := len(pagesetFileName) - extLen
	return strconv.Atoi(pagesetFileName[:baseLen])
}
// Pageset is the JSON document that WritePageset writes for a single page.
type Pageset struct {
	// UserAgent is serialized as "user_agent".
	UserAgent string `json:"user_agent"`
	// ArchiveDataFile is serialized as "archive_data_file".
	ArchiveDataFile string `json:"archive_data_file"`
	// UrlsList is serialized as "urls_list".
	UrlsList string `json:"urls_list"`
}
// WritePageset JSON-encodes a Pageset built from userAgent, archiveFilePath
// and url and writes it to filePath with permissions 0644. Any encoding or
// write error is returned unchanged.
func WritePageset(filePath, userAgent, archiveFilePath, url string) error {
	encoded, err := json.Marshal(Pageset{
		UserAgent:       userAgent,
		ArchiveDataFile: archiveFilePath,
		UrlsList:        url,
	})
	if err != nil {
		return err
	}
	return os.WriteFile(filePath, encoded, 0644)
}
// TimeoutTracker is a counter whose accesses are guarded by a mutex. The zero
// value starts at a count of 0.
type TimeoutTracker struct {
	timeoutCounter      int
	timeoutCounterMutex sync.Mutex
}

// Increment adds one to the counter.
func (t *TimeoutTracker) Increment() {
	t.timeoutCounterMutex.Lock()
	t.timeoutCounter++
	t.timeoutCounterMutex.Unlock()
}

// Reset sets the counter back to zero.
func (t *TimeoutTracker) Reset() {
	t.timeoutCounterMutex.Lock()
	t.timeoutCounter = 0
	t.timeoutCounterMutex.Unlock()
}

// Read returns the current value of the counter.
func (t *TimeoutTracker) Read() int {
	t.timeoutCounterMutex.Lock()
	current := t.timeoutCounter
	t.timeoutCounterMutex.Unlock()
	return current
}
// MkdirAll creates the directory name (and any missing parents) with the given
// permissions. A failure is not returned to the caller; it is only logged.
func MkdirAll(name string, perm os.FileMode) {
	err := os.MkdirAll(name, perm)
	if err == nil {
		return
	}
	sklog.ErrorfWithDepth(1, "Failed to MkdirAll(%s, %v): %v", name, perm, err)
}
// Rename moves oldpath to newpath. A failure is not returned to the caller; it
// is only logged.
func Rename(oldpath, newpath string) {
	err := os.Rename(oldpath, newpath)
	if err == nil {
		return
	}
	sklog.ErrorfWithDepth(1, "Failed to Rename(%s, %s): %v", oldpath, newpath, err)
}