// Package util contains utility functions shared by the CT master and worker
// scripts.
package util
import (
"bytes"
"context"
"crypto/sha1"
"encoding/csv"
"encoding/hex"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"math"
"os"
"path"
"path/filepath"
"regexp"
"runtime"
"strconv"
"strings"
"sync"
"time"
"go.skia.org/infra/go/common"
"go.skia.org/infra/go/exec"
"go.skia.org/infra/go/fileutil"
"go.skia.org/infra/go/gitiles"
"go.skia.org/infra/go/isolate"
"go.skia.org/infra/go/sklog"
"go.skia.org/infra/go/swarming"
"go.skia.org/infra/go/util"
)
const (
MAX_SYNC_TRIES = 3
TS_FORMAT = "20060102150405"
REMOVE_INVALID_SKPS_WORKER_POOL = 20
MAX_SIMULTANEOUS_SWARMING_TASKS_PER_RUN = 10000
PATCH_LIMIT = 1 << 26
)
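// TimeTrack logs how long the named operation took. It is meant to be called
// via defer with the time captured at function entry, e.g. (illustrative):
//
//	defer TimeTrack(time.Now(), "CreatePagesets")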
func TimeTrack(start time.Time, name string) {
elapsed := time.Since(start)
sklog.Infof("===== %s took %s =====", name, elapsed)
}
// ExecuteCmd calls ExecuteCmdWithConfigurableLogging with logStdout and logStderr set to true.
func ExecuteCmd(ctx context.Context, binary string, args, env []string, timeout time.Duration, stdout, stderr io.Writer) error {
return ExecuteCmdWithConfigurableLogging(ctx, binary, args, env, timeout, stdout, stderr, true, true)
}
// ExecuteCmdWithConfigurableLogging executes the specified binary with the specified args and env.
// Stdout and Stderr are written to stdout and stderr respectively if specified. If not specified
// then Stdout and Stderr are output only to sklog.
func ExecuteCmdWithConfigurableLogging(ctx context.Context, binary string, args, env []string, timeout time.Duration, stdout, stderr io.Writer, logStdout, logStderr bool) error {
return exec.Run(ctx, &exec.Command{
Name: binary,
Args: args,
Env: env,
InheritPath: true,
Timeout: timeout,
LogStdout: logStdout,
Stdout: stdout,
LogStderr: logStderr,
Stderr: stderr,
})
}
// SyncDir runs "git pull" and "gclient sync" on the specified directory.
// The revisions map pins each solution to a specific revision/hash; entries
// are passed to gclient in the format branch@rev.
func SyncDir(ctx context.Context, dir string, revisions map[string]string, additionalArgs []string) error {
err := os.Chdir(dir)
if err != nil {
return fmt.Errorf("Could not chdir to %s: %s", dir, err)
}
for i := 0; i < MAX_SYNC_TRIES; i++ {
if i > 0 {
sklog.Warningf("%d. retry for syncing %s", i, dir)
}
err = syncDirStep(ctx, revisions, additionalArgs)
if err == nil {
break
}
sklog.Errorf("Error syncing %s: %s", dir, err)
}
if err != nil {
sklog.Errorf("Failed to sync %s after %d attempts", dir, MAX_SYNC_TRIES)
}
return err
}
func syncDirStep(ctx context.Context, revisions map[string]string, additionalArgs []string) error {
err := ExecuteCmd(ctx, BINARY_GIT, []string{"pull"}, []string{}, GIT_PULL_TIMEOUT, nil, nil)
if err != nil {
return fmt.Errorf("Error running git pull: %s", err)
}
syncCmd := []string{"sync", "--force"}
syncCmd = append(syncCmd, additionalArgs...)
for branch, rev := range revisions {
syncCmd = append(syncCmd, "--revision")
syncCmd = append(syncCmd, fmt.Sprintf("%s@%s", branch, rev))
}
err = ExecuteCmd(ctx, BINARY_GCLIENT, syncCmd, []string{}, GCLIENT_SYNC_TIMEOUT, nil, nil)
if err != nil {
return fmt.Errorf("Error running gclient sync: %s", err)
}
return nil
}
func runSkiaGnGen(ctx context.Context, clangLocation, gnExtraArgs string) error {
// Run "bin/fetch-gn".
util.LogErr(ExecuteCmd(ctx, "bin/fetch-gn", []string{}, []string{}, FETCH_GN_TIMEOUT, nil,
nil))
// gn gen out/Release '--args=cc="/home/chrome-bot/test/clang_linux/bin/clang" cxx="/home/chrome-bot/test/clang_linux/bin/clang++" extra_cflags=["-B/home/chrome-bot/test/clang_linux/bin"] extra_ldflags=["-B/home/chrome-bot/test/clang_linux/bin", "-fuse-ld=lld"] is_debug=false target_cpu="x86_64"'
gnArgs := fmt.Sprintf("--args=cc=\"%s/bin/clang\" cxx=\"%s/bin/clang++\" extra_cflags=[\"-B%s/bin\"] extra_ldflags=[\"-B%s/bin\", \"-fuse-ld=lld\"] is_debug=false target_cpu=\"x86_64\"", clangLocation, clangLocation, clangLocation, clangLocation)
if gnExtraArgs != "" {
gnArgs += " " + gnExtraArgs
}
if err := ExecuteCmd(ctx, "buildtools/linux64/gn", []string{"gen", "out/Release", gnArgs}, os.Environ(), GN_GEN_TIMEOUT, nil, nil); err != nil {
return fmt.Errorf("Error while running gn: %s", err)
}
return nil
}
// BuildSkiaSKPInfo builds "skpinfo" in the Skia trunk directory.
// Returns the directory that contains the binary.
func BuildSkiaSKPInfo(ctx context.Context, runningOnSwarming bool) (string, error) {
if err := os.Chdir(SkiaTreeDir); err != nil {
return "", fmt.Errorf("Could not chdir to %s: %s", SkiaTreeDir, err)
}
// Run gn gen
clangDir := filepath.Join(filepath.Dir(filepath.Dir(os.Args[0])), "clang_linux")
if err := runSkiaGnGen(ctx, clangDir, "" /* gnExtraArgs */); err != nil {
return "", fmt.Errorf("Error while running gn: %s", err)
}
// Run "ninja -C out/Release -j100 skpinfo".
// Use the full system env when building.
outPath := filepath.Join(SkiaTreeDir, "out", "Release")
args := []string{"-C", outPath, "-j100", BINARY_SKPINFO}
return outPath, ExecuteCmd(ctx, filepath.Join(DepotToolsDir, "ninja"), args, os.Environ(), NINJA_TIMEOUT, nil, nil)
}
// BuildSkiaLuaPictures builds "lua_pictures" in the Skia trunk directory.
// Returns the directory that contains the binary.
func BuildSkiaLuaPictures(ctx context.Context, runningOnSwarming bool) (string, error) {
if err := os.Chdir(SkiaTreeDir); err != nil {
return "", fmt.Errorf("Could not chdir to %s: %s", SkiaTreeDir, err)
}
// Run gn gen
clangDir := filepath.Join(filepath.Dir(filepath.Dir(os.Args[0])), "clang_linux")
if err := runSkiaGnGen(ctx, clangDir, "skia_use_lua=true"); err != nil {
return "", fmt.Errorf("Error while running gn: %s", err)
}
// Run "ninja -C out/Release -j100 lua_pictures".
// Use the full system env when building.
outPath := filepath.Join(SkiaTreeDir, "out", "Release")
args := []string{"-C", outPath, "-j100", BINARY_LUA_PICTURES}
return outPath, ExecuteCmd(ctx, filepath.Join(DepotToolsDir, "ninja"), args, os.Environ(), NINJA_TIMEOUT, nil, nil)
}
// GetCipdPackageFromAsset returns a string of the format "path:package_name:version".
// It returns the latest version of the asset via gitiles.
func GetCipdPackageFromAsset(assetName string) (string, error) {
// Find the latest version of the asset from gitiles.
assetVersionFilePath := path.Join("infra", "bots", "assets", assetName, "VERSION")
var buf bytes.Buffer
if err := gitiles.NewRepo(common.REPO_SKIA, "", nil).ReadFile(assetVersionFilePath, &buf); err != nil {
return "", err
}
return fmt.Sprintf("%s:skia/bots/%s:version:%s", assetName, assetName, strings.TrimSpace(buf.String())), nil
}
// ResetCheckout resets the specified Git checkout.
func ResetCheckout(ctx context.Context, dir, resetTo string) error {
if err := os.Chdir(dir); err != nil {
return fmt.Errorf("Could not chdir to %s: %s", dir, err)
}
// Clear out remnants of incomplete rebases from .git/rebase-apply.
rebaseArgs := []string{"rebase", "--abort"}
util.LogErr(ExecuteCmd(ctx, BINARY_GIT, rebaseArgs, []string{}, GIT_REBASE_TIMEOUT, nil, nil))
// Make sure we are on master branch and not stuck in a rebase branch for whatever reason.
branchArgs := []string{"checkout", "master"}
util.LogErr(ExecuteCmd(ctx, BINARY_GIT, branchArgs, []string{}, GIT_BRANCH_TIMEOUT, nil, nil))
// Run "git reset --hard HEAD"
resetArgs := []string{"reset", "--hard", resetTo}
util.LogErr(ExecuteCmd(ctx, BINARY_GIT, resetArgs, []string{}, GIT_RESET_TIMEOUT, nil, nil))
// Run "git clean -f -d"
cleanArgs := []string{"clean", "-f", "-d"}
util.LogErr(ExecuteCmd(ctx, BINARY_GIT, cleanArgs, []string{}, GIT_CLEAN_TIMEOUT, nil, nil))
return nil
}
// ApplyPatch applies a patch to a Git checkout.
func ApplyPatch(ctx context.Context, patch, dir string) error {
if err := os.Chdir(dir); err != nil {
return fmt.Errorf("Could not chdir to %s: %s", dir, err)
}
// Run "git apply --index -p1 --verbose --ignore-whitespace
// --ignore-space-change ${PATCH_FILE}"
args := []string{"apply", "--index", "-p1", "--verbose", "--ignore-whitespace", "--ignore-space-change", patch}
return ExecuteCmd(ctx, BINARY_GIT, args, []string{}, GIT_APPLY_TIMEOUT, nil, nil)
}
// CleanTmpDir deletes all files in the OS temp dir because telemetry tends to
// generate a lot of temporary artifacts there and they take up root disk space.
func CleanTmpDir() {
	files, err := ioutil.ReadDir(os.TempDir())
	if err != nil {
		sklog.Errorf("Could not read %s: %s", os.TempDir(), err)
		return
	}
for _, f := range files {
util.RemoveAll(filepath.Join(os.TempDir(), f.Name()))
}
}
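// GetTimeFromTs parses a timestamp string in TS_FORMAT. Parse errors are
// ignored and yield the zero time.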
func GetTimeFromTs(formattedTime string) time.Time {
t, _ := time.Parse(TS_FORMAT, formattedTime)
return t
}
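// GetCurrentTs returns the current UTC time formatted with TS_FORMAT, e.g.
// "20171123143059" for 2017-11-23 14:30:59 UTC (illustrative value).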
func GetCurrentTs() string {
return time.Now().UTC().Format(TS_FORMAT)
}
// GetClosedChannelOfPagesets returns a closed channel that contains all
// pageset file names, excluding the timestamp file and .pyc files.
func GetClosedChannelOfPagesets(fileInfos []os.FileInfo) chan string {
pagesetsChannel := make(chan string, len(fileInfos))
for _, fileInfo := range fileInfos {
pagesetName := fileInfo.Name()
pagesetBaseName := filepath.Base(pagesetName)
if filepath.Ext(pagesetBaseName) == ".pyc" {
// Ignore .pyc files.
continue
}
pagesetsChannel <- pagesetName
}
close(pagesetsChannel)
return pagesetsChannel
}
// ChromeProcessesCleaner periodically kills zombie chrome processes. Running
// benchmarks in parallel brings up multiple chrome instances at the same time;
// when there are crashes, chrome processes stick around and can severely
// impact the machine's performance.
func ChromeProcessesCleaner(ctx context.Context, locker sync.Locker, chromeCleanerTimer time.Duration) {
for range time.Tick(chromeCleanerTimer) {
sklog.Info("The chromeProcessesCleaner goroutine has started")
sklog.Info("Waiting for all existing tasks to complete before killing zombie chrome processes")
locker.Lock()
util.LogErr(ExecuteCmd(ctx, "pkill", []string{"-9", "chrome"}, []string{}, PKILL_TIMEOUT, nil, nil))
locker.Unlock()
}
}
// Contains the data included in CT pagesets.
type PagesetVars struct {
// A comma separated list of URLs.
UrlsList string `json:"urls_list"`
// Will be either "mobile" or "desktop".
UserAgent string `json:"user_agent"`
// The location of the web page's WPR data file.
ArchiveDataFile string `json:"archive_data_file"`
}
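// ReadPageset JSON-decodes the pageset file at pagesetPath into PagesetVars.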
func ReadPageset(pagesetPath string) (PagesetVars, error) {
decodedPageset := PagesetVars{}
	pagesetContent, err := os.Open(pagesetPath)
	if err != nil {
		return decodedPageset, fmt.Errorf("Could not read %s: %s", pagesetPath, err)
	}
	defer util.Close(pagesetContent)
if err := json.NewDecoder(pagesetContent).Decode(&decodedPageset); err != nil {
return decodedPageset, fmt.Errorf("Could not JSON decode %s: %s", pagesetPath, err)
}
return decodedPageset, nil
}
// ValidateSKPs moves all root_dir/index/dir_name/*.skp into the root_dir/index
// and validates them. SKPs that fail validation are logged and deleted.
func ValidateSKPs(ctx context.Context, pathToSkps, pathToPyFiles, pathToSkpinfo string) error {
// This slice will be used to run remove_invalid_skp.py.
skps := []string{}
// List all directories in pathToSkps and copy out the skps.
indexDirs, err := filepath.Glob(path.Join(pathToSkps, "*"))
if err != nil {
return fmt.Errorf("Unable to read %s: %s", pathToSkps, err)
}
for _, indexDir := range indexDirs {
index := path.Base(indexDir)
skpFileInfos, err := ioutil.ReadDir(indexDir)
if err != nil {
return fmt.Errorf("Unable to read %s: %s", indexDir, err)
}
for _, fileInfo := range skpFileInfos {
if !fileInfo.IsDir() {
// We are only interested in directories.
continue
}
skpName := fileInfo.Name()
// Find the largest layer in this directory.
layerInfos, err := ioutil.ReadDir(filepath.Join(pathToSkps, index, skpName))
if err != nil {
sklog.Errorf("Unable to read %s: %s", filepath.Join(pathToSkps, index, skpName), err)
}
if len(layerInfos) > 0 {
largestLayerInfo := layerInfos[0]
for _, layerInfo := range layerInfos {
if layerInfo.Size() > largestLayerInfo.Size() {
largestLayerInfo = layerInfo
}
}
				// Only save SKPs greater than 6000 bytes. Anything smaller is
				// probably malformed.
if largestLayerInfo.Size() > 6000 {
layerPath := filepath.Join(pathToSkps, index, skpName, largestLayerInfo.Name())
destSKP := filepath.Join(pathToSkps, index, skpName+".skp")
util.Rename(layerPath, destSKP)
skps = append(skps, destSKP)
} else {
sklog.Warningf("Skipping %s because size was less than 6000 bytes", skpName)
}
}
// We extracted what we needed from the directory, now delete it.
util.RemoveAll(filepath.Join(pathToSkps, index, skpName))
}
}
// Create channel that contains all SKP file paths. This channel will
// be consumed by the worker pool below to run remove_invalid_skp.py in
// parallel.
skpsChannel := make(chan string, len(skps))
for _, skp := range skps {
skpsChannel <- skp
}
close(skpsChannel)
sklog.Info("Calling remove_invalid_skp.py")
pathToRemoveSKPs := filepath.Join(pathToPyFiles, "remove_invalid_skp.py")
var wg sync.WaitGroup
// Loop through workers in the worker pool.
for i := 0; i < REMOVE_INVALID_SKPS_WORKER_POOL; i++ {
// Increment the WaitGroup counter.
wg.Add(1)
// Create and run a goroutine closure that captures SKPs.
go func(i int) {
// Decrement the WaitGroup counter when the goroutine completes.
defer wg.Done()
for skpPath := range skpsChannel {
args := []string{
pathToRemoveSKPs,
"--path_to_skp=" + skpPath,
"--path_to_skpinfo=" + pathToSkpinfo,
}
sklog.Infof("Executing remove_invalid_skp.py with goroutine#%d", i+1)
// Execute the command with stdout not logged. It otherwise outputs
// tons of log msgs.
util.LogErr(exec.Run(ctx, &exec.Command{
Name: "python",
Args: args,
Env: []string{},
InheritPath: true,
Timeout: REMOVE_INVALID_SKPS_TIMEOUT,
LogStdout: false,
Stdout: nil,
LogStderr: true,
Stderr: nil,
}))
}
}(i)
}
// Wait for all spawned goroutines to complete.
wg.Wait()
return nil
}
// GetStartRange returns the 1-based page index at which the specified worker
// should start processing, based on its worker number and how many pages each
// worker is allowed to process.
func GetStartRange(workerNum, numPagesPerBot int) int {
return ((workerNum - 1) * numPagesPerBot) + 1
}
// GetNumPagesPerBot returns the number of web pages each worker should process.
func GetNumPagesPerBot(repeatValue, maxPagesPerBot int) int {
return int(math.Ceil(float64(maxPagesPerBot) / float64(repeatValue)))
}
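// Illustrative example (values hypothetical): with repeatValue=1 and
// maxPagesPerBot=100, GetNumPagesPerBot returns 100 and the third worker
// starts at GetStartRange(3, 100) == 201.
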
// TriggerSwarmingTask returns the number of triggered tasks and an error (if any).
func TriggerSwarmingTask(ctx context.Context, pagesetType, taskPrefix, isolateName, runID, serviceAccountJSON string, hardTimeout, ioTimeout time.Duration, priority, maxPagesPerBot, numPages int, isolateExtraArgs map[string]string, runOnGCE, local bool, repeatValue int, isolateDeps []string) (int, error) {
// Instantiate the swarming client.
workDir, err := ioutil.TempDir(StorageDir, "swarming_work_")
if err != nil {
return 0, fmt.Errorf("Could not get temp dir: %s", err)
}
s, err := swarming.NewSwarmingClient(ctx, workDir, swarming.SWARMING_SERVER_PRIVATE, isolate.ISOLATE_SERVER_URL_PRIVATE, serviceAccountJSON)
if err != nil {
// Cleanup workdir.
if err := os.RemoveAll(workDir); err != nil {
sklog.Errorf("Could not cleanup swarming work dir: %s", err)
}
return 0, fmt.Errorf("Could not instantiate swarming client: %s", err)
}
defer s.Cleanup()
isolateTasks := []*isolate.Task{}
// Get path to isolate files.
pathToIsolates := GetPathToIsolates(local)
numPagesPerBot := GetNumPagesPerBot(repeatValue, maxPagesPerBot)
numTasks := int(math.Ceil(float64(numPages) / float64(numPagesPerBot)))
for i := 1; i <= numTasks; i++ {
isolateArgs := map[string]string{
"START_RANGE": strconv.Itoa(GetStartRange(i, numPagesPerBot)),
"NUM": strconv.Itoa(numPagesPerBot),
}
if pagesetType != "" {
isolateArgs["PAGESET_TYPE"] = pagesetType
}
// Add isolateExtraArgs (if specified) into the isolateArgs.
for k, v := range isolateExtraArgs {
isolateArgs[k] = v
}
isolateTask := &isolate.Task{
BaseDir: pathToIsolates,
Blacklist: []string{},
IsolateFile: path.Join(pathToIsolates, isolateName),
Deps: isolateDeps,
ExtraVars: isolateArgs,
OsType: "linux",
}
isolateTasks = append(isolateTasks, isolateTask)
}
// Isolate the tasks. Do not isolate more than 1000 at a time.
tasksToHashes := map[string]string{}
countOfTasks := 0
for i := 0; i < len(isolateTasks); i += 1000 {
startRange := i
endRange := util.MinInt(len(isolateTasks), i+1000)
hashes, err := s.GetIsolateClient().IsolateTasks(ctx, isolateTasks[startRange:endRange])
if err != nil {
return numTasks, fmt.Errorf("Could not isolate targets: %s", err)
}
// Add the above hashes to tasksToHashes.
for _, h := range hashes {
countOfTasks++
taskName := fmt.Sprintf("%s_%d", taskPrefix, countOfTasks)
tasksToHashes[taskName] = h
}
// Sleep for a sec to give the swarming server some time to recuperate.
time.Sleep(time.Second)
}
if len(isolateTasks) != len(tasksToHashes) {
return numTasks, fmt.Errorf("len(isolateTasks) was %d and len(tasksToHashes) was %d", len(isolateTasks), len(tasksToHashes))
}
var dimensions map[string]string
if runOnGCE {
dimensions = GCE_WORKER_DIMENSIONS
} else {
dimensions = GOLO_WORKER_DIMENSIONS
}
	// The channel to which batches of tasks to be triggered and collected are sent.
chTasks := make(chan map[string]string)
// Kick off one goroutine to populate the above channel.
go func() {
defer close(chTasks)
tmpMap := map[string]string{}
for task, hash := range tasksToHashes {
if len(tmpMap) >= MAX_SIMULTANEOUS_SWARMING_TASKS_PER_RUN {
// Add the map to the channel.
chTasks <- tmpMap
// Reinitialize the temporary map.
tmpMap = map[string]string{}
}
tmpMap[task] = hash
}
chTasks <- tmpMap
}()
// Trigger and collect swarming tasks.
for taskMap := range chTasks {
// Trigger swarming using the isolate hashes.
tasks, err := s.TriggerSwarmingTasks(ctx, taskMap, dimensions, map[string]string{"runid": runID}, []string{}, priority, 7*24*time.Hour, hardTimeout, ioTimeout, false, true, getServiceAccount(dimensions))
if err != nil {
return numTasks, fmt.Errorf("Could not trigger swarming tasks: %s", err)
}
// Collect all tasks and retrigger the ones that fail. Do this in a goroutine for
// each task so that it is done in parallel and retries are immediately triggered
// instead of at the end (see skbug.com/8191).
var wg sync.WaitGroup
for _, task := range tasks {
wg.Add(1)
task := task // https://golang.org/doc/faq#closures_and_goroutines
go func() {
defer wg.Done()
if _, _, err := task.Collect(ctx, s); err != nil {
sklog.Errorf("task %s failed: %s", task.Title, err)
sklog.Infof("Retrying task %s", task.Title)
retryTask, err := s.TriggerSwarmingTasks(ctx, map[string]string{task.Title: tasksToHashes[task.Title]}, dimensions, map[string]string{"runid": runID}, []string{}, priority, 7*24*time.Hour, hardTimeout, ioTimeout, false, true, getServiceAccount(dimensions))
if err != nil {
sklog.Errorf("Could not trigger retry of task %s: %s", task.Title, err)
return
}
// Collect the retried task.
if _, _, err := retryTask[0].Collect(ctx, s); err != nil {
sklog.Errorf("task %s failed inspite of a retry: %s", retryTask[0].Title, err)
return
}
}
}()
}
wg.Wait()
}
return numTasks, nil
}
// getServiceAccount returns the service account that should be used when triggering swarming tasks.
func getServiceAccount(dimensions map[string]string) string {
serviceAccount := ""
if util.MapsEqual(dimensions, GCE_WORKER_DIMENSIONS) || util.MapsEqual(dimensions, GCE_LINUX_BUILDER_DIMENSIONS) || util.MapsEqual(dimensions, GCE_ANDROID_BUILDER_DIMENSIONS) {
// GCE bots need to use "bot". See skbug.com/6611.
serviceAccount = "bot"
}
return serviceAccount
}
// GetPathToIsolates returns the location of CT's isolates.
func GetPathToIsolates(local bool) string {
if local {
_, currentFile, _, _ := runtime.Caller(0)
return filepath.Join(filepath.Dir(filepath.Dir(filepath.Dir(currentFile))), "isolates")
} else {
return filepath.Join("/", "usr", "local", "share", "ct-master", "isolates")
}
}
// GetPathToPyFiles returns the location of CT's python scripts.
// local should be set to true if we need the location of py files when debugging locally.
// runOnMaster should be set if we need the location of py files on the ct-master.
// If both are false then it is assumed that we are running on a swarming bot.
func GetPathToPyFiles(local, runOnMaster bool) string {
if local {
_, currentFile, _, _ := runtime.Caller(0)
return filepath.Join(filepath.Dir(filepath.Dir(filepath.Dir(currentFile))), "py")
} else if runOnMaster {
return filepath.Join("/", "usr", "local", "share", "ct-master", "py")
} else {
return filepath.Join(filepath.Dir(filepath.Dir(os.Args[0])), "share", "ct-master", "py")
}
}
// GetPathToTelemetryBinaries returns the location of Telemetry binaries.
func GetPathToTelemetryBinaries(local bool) string {
if local {
return TelemetryBinariesDir
} else {
return filepath.Join(filepath.Dir(filepath.Dir(os.Args[0])), "tools", "perf")
}
}
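// MergeUploadCSVFiles downloads the CSV outputs of all workers from Google
// Storage, merges them into a single CSV using csv_merger.py, and uploads the
// merged file. It returns the path to the merged CSV and the list of workers
// that produced no output.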
func MergeUploadCSVFiles(ctx context.Context, runID, pathToPyFiles string, gs *GcsUtil, totalPages, maxPagesPerBot int, handleStrings bool, repeatValue int) (string, []string, error) {
localOutputDir := filepath.Join(StorageDir, BenchmarkRunsDir, runID)
util.MkdirAll(localOutputDir, 0700)
noOutputSlaves := []string{}
// Copy outputs from all slaves locally.
numPagesPerBot := GetNumPagesPerBot(repeatValue, maxPagesPerBot)
numTasks := int(math.Ceil(float64(totalPages) / float64(numPagesPerBot)))
for i := 1; i <= numTasks; i++ {
startRange := GetStartRange(i, numPagesPerBot)
workerLocalOutputPath := filepath.Join(localOutputDir, strconv.Itoa(startRange)+".csv")
workerRemoteOutputPath := filepath.Join(BenchmarkRunsDir, runID, strconv.Itoa(startRange), "outputs", runID+".output")
respBody, err := gs.GetRemoteFileContents(workerRemoteOutputPath)
if err != nil {
sklog.Errorf("Could not fetch %s: %s", workerRemoteOutputPath, err)
noOutputSlaves = append(noOutputSlaves, strconv.Itoa(i))
continue
}
defer util.Close(respBody)
out, err := os.Create(workerLocalOutputPath)
if err != nil {
return "", noOutputSlaves, fmt.Errorf("Unable to create file %s: %s", workerLocalOutputPath, err)
}
defer util.Close(out)
defer util.Remove(workerLocalOutputPath)
if _, err = io.Copy(out, respBody); err != nil {
return "", noOutputSlaves, fmt.Errorf("Unable to copy to file %s: %s", workerLocalOutputPath, err)
}
// If an output is less than 20 bytes that means something went wrong on the slave.
outputInfo, err := out.Stat()
if err != nil {
return "", noOutputSlaves, fmt.Errorf("Unable to stat file %s: %s", workerLocalOutputPath, err)
}
if outputInfo.Size() <= 20 {
sklog.Errorf("Output file was less than 20 bytes %s: %s", workerLocalOutputPath, err)
noOutputSlaves = append(noOutputSlaves, strconv.Itoa(i))
continue
}
}
// Call csv_merger.py to merge all results into a single results CSV.
pathToCsvMerger := filepath.Join(pathToPyFiles, "csv_merger.py")
outputFileName := runID + ".output"
outputFilePath := filepath.Join(localOutputDir, outputFileName)
args := []string{
pathToCsvMerger,
"--csv_dir=" + localOutputDir,
"--output_csv_name=" + outputFilePath,
}
if handleStrings {
args = append(args, "--handle_strings")
}
err := ExecuteCmd(ctx, "python", args, []string{}, CSV_MERGER_TIMEOUT, nil, nil)
if err != nil {
return outputFilePath, noOutputSlaves, fmt.Errorf("Error running csv_merger.py: %s", err)
}
// Copy the output file to Google Storage.
remoteOutputDir := filepath.Join(BenchmarkRunsDir, runID, "consolidated_outputs")
if err := gs.UploadFile(outputFileName, localOutputDir, remoteOutputDir); err != nil {
return outputFilePath, noOutputSlaves, fmt.Errorf("Unable to upload %s to %s: %s", outputFileName, remoteOutputDir, err)
}
return outputFilePath, noOutputSlaves, nil
}
// GetRepeatValue returns the defaultValue if "--pageset-repeat" is not specified in benchmarkArgs.
func GetRepeatValue(benchmarkArgs string, defaultValue int) int {
return GetIntFlagValue(benchmarkArgs, PAGESET_REPEAT_FLAG, defaultValue)
}
// GetRunBenchmarkTimeoutValue returns the defaultValue if "--run_benchmark_timeout" is not specified in benchmarkArgs.
func GetRunBenchmarkTimeoutValue(benchmarkArgs string, defaultValue int) int {
return GetIntFlagValue(benchmarkArgs, RUN_BENCHMARK_TIMEOUT_FLAG, defaultValue)
}
// GetMaxPagesPerBotValue returns the defaultValue if "--max-pages-per-bot" is not specified in benchmarkArgs.
func GetMaxPagesPerBotValue(benchmarkArgs string, defaultValue int) int {
return GetIntFlagValue(benchmarkArgs, MAX_PAGES_PER_BOT, defaultValue)
}
// GetNumAnalysisRetriesValue returns the defaultValue if "--num-analysis-retries" is not specified in benchmarkArgs.
func GetNumAnalysisRetriesValue(benchmarkArgs string, defaultValue int) int {
return GetIntFlagValue(benchmarkArgs, NUM_ANALYSIS_RETRIES, defaultValue)
}
// GetIntFlagValue returns the defaultValue if the specified flag name is not in benchmarkArgs.
func GetIntFlagValue(benchmarkArgs, flagName string, defaultValue int) int {
if strings.Contains(benchmarkArgs, flagName) {
r := regexp.MustCompile(flagName + `[= ](\d+)`)
m := r.FindStringSubmatch(benchmarkArgs)
if len(m) != 0 {
ret, err := strconv.Atoi(m[1])
if err != nil {
return defaultValue
}
return ret
}
}
	// If we reached here then return the default value.
return defaultValue
}
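// Illustrative example (args string hypothetical):
//
//	GetIntFlagValue("--pageset-repeat=3 --use-live-sites", PAGESET_REPEAT_FLAG, 1)
//
// returns 3.

// RemoveFlagsFromArgs removes the specified flags (and any "=value" suffix)
// from benchmarkArgs and collapses the remaining whitespace.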
func RemoveFlagsFromArgs(benchmarkArgs string, flags ...string) string {
for _, f := range flags {
re, err := regexp.Compile(fmt.Sprintf(`\s*%s(=[[:alnum:]]*)?\s*`, f))
if err != nil {
sklog.Warningf("Could not compile flag regex with %s: %s", f, err)
continue
}
benchmarkArgs = re.ReplaceAllString(benchmarkArgs, " ")
}
// Remove extra whitespace.
return strings.Join(strings.Fields(benchmarkArgs), " ")
}
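// Illustrative example (flag value hypothetical):
//
//	RemoveFlagsFromArgs("--run_benchmark_timeout=300 --foo", RUN_BENCHMARK_TIMEOUT_FLAG)
//
// returns "--foo".
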
// RunBenchmark runs the specified benchmark with the specified arguments. It prints the output of
// the run_benchmark command and also returns the output in case the caller needs to do any
// post-processing on it. In case of any errors the output will be empty.
func RunBenchmark(ctx context.Context, fileInfoName, pathToPagesets, pathToPyFiles, localOutputDir, chromiumBuildName, chromiumBinary, runID, browserExtraArgs, benchmarkName, targetPlatform, benchmarkExtraArgs, pagesetType string, defaultRepeatValue int, runOnSwarming bool) (string, error) {
pagesetBaseName := filepath.Base(fileInfoName)
if filepath.Ext(pagesetBaseName) == ".pyc" {
// Ignore .pyc files.
return "", nil
}
// Read the pageset.
pagesetName := strings.TrimSuffix(pagesetBaseName, filepath.Ext(pagesetBaseName))
pagesetPath := filepath.Join(pathToPagesets, fileInfoName)
decodedPageset, err := ReadPageset(pagesetPath)
if err != nil {
return "", fmt.Errorf("Could not read %s: %s", pagesetPath, err)
}
sklog.Infof("===== Processing %s for %s =====", pagesetPath, runID)
args := []string{
filepath.Join(GetPathToTelemetryBinaries(!runOnSwarming), BINARY_RUN_BENCHMARK),
benchmarkName,
"--also-run-disabled-tests",
"--user-agent=" + decodedPageset.UserAgent,
"--urls-list=" + decodedPageset.UrlsList,
"--archive-data-file=" + decodedPageset.ArchiveDataFile,
}
// Need to capture output for all benchmarks.
outputDirArgValue := filepath.Join(localOutputDir, pagesetName)
args = append(args, "--output-dir="+outputDirArgValue)
// Figure out which browser and device should be used.
if targetPlatform == PLATFORM_ANDROID {
if err := InstallChromeAPK(ctx, chromiumBuildName); err != nil {
return "", fmt.Errorf("Error while installing APK: %s", err)
}
args = append(args, "--browser=android-chromium")
} else {
args = append(args, "--browser=exact", "--browser-executable="+chromiumBinary)
args = append(args, "--device=desktop")
}
// Calculate the timeout.
timeoutSecs := GetRunBenchmarkTimeoutValue(benchmarkExtraArgs, PagesetTypeToInfo[pagesetType].RunChromiumPerfTimeoutSecs)
repeatBenchmark := GetRepeatValue(benchmarkExtraArgs, defaultRepeatValue)
if repeatBenchmark > 0 {
args = append(args, fmt.Sprintf("%s=%d", PAGESET_REPEAT_FLAG, repeatBenchmark))
// Increase the timeoutSecs if repeats are used.
timeoutSecs = timeoutSecs * repeatBenchmark
}
sklog.Infof("Using %d seconds for timeout", timeoutSecs)
// Remove from benchmarkExtraArgs "special" flags that are recognized by CT but not
// by the run_benchmark script.
benchmarkExtraArgs = RemoveFlagsFromArgs(benchmarkExtraArgs, RUN_BENCHMARK_TIMEOUT_FLAG, MAX_PAGES_PER_BOT, NUM_ANALYSIS_RETRIES)
// Split benchmark args if not empty and append to args.
if benchmarkExtraArgs != "" {
args = append(args, strings.Fields(benchmarkExtraArgs)...)
}
// Add browserArgs if not empty to args.
if browserExtraArgs != "" {
args = append(args, "--extra-browser-args="+browserExtraArgs)
}
// Set the DISPLAY.
env := []string{
"DISPLAY=:0",
}
	// Append the original environment as well.
	env = append(env, os.Environ()...)
if targetPlatform == PLATFORM_ANDROID {
// Reset android logcat prior to the run so that we can examine the logs later.
util.LogErr(ExecuteCmd(ctx, BINARY_ADB, []string{"logcat", "-c"}, []string{}, ADB_ROOT_TIMEOUT, nil, nil))
}
// Create buffer for capturing the stdout and stderr of the benchmark run.
var b bytes.Buffer
if _, err := b.WriteString(fmt.Sprintf("========== Stdout and stderr for %s ==========\n", pagesetPath)); err != nil {
return "", fmt.Errorf("Error writing to output buffer: %s", err)
}
if err := ExecuteCmdWithConfigurableLogging(ctx, "python", args, env, time.Duration(timeoutSecs)*time.Second, &b, &b, false, false); err != nil {
if targetPlatform == PLATFORM_ANDROID {
// Kill the port-forwarder to start from a clean slate.
util.LogErr(ExecuteCmdWithConfigurableLogging(ctx, "pkill", []string{"-f", "forwarder_host"}, []string{}, PKILL_TIMEOUT, &b, &b, false, false))
}
output, getErr := GetRunBenchmarkOutput(b)
util.LogErr(getErr)
fmt.Println(output)
return "", fmt.Errorf("Run benchmark command failed with: %s", err)
}
// Append logcat output if we ran on Android.
if targetPlatform == PLATFORM_ANDROID {
if err := ExecuteCmdWithConfigurableLogging(ctx, BINARY_ADB, []string{"logcat", "-d"}, env, ADB_ROOT_TIMEOUT, &b, &b, false, false); err != nil {
return "", fmt.Errorf("Error running logcat -d: %s", err)
}
}
output, err := GetRunBenchmarkOutput(b)
if err != nil {
return "", fmt.Errorf("Could not get run benchmark output: %s", err)
}
// Print the output and return.
fmt.Println(output)
return output, nil
}
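// GetRunBenchmarkOutput appends a closing separator to the captured benchmark
// output and returns the buffer contents as a string.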
func GetRunBenchmarkOutput(b bytes.Buffer) (string, error) {
if _, err := b.WriteString("===================="); err != nil {
return "", fmt.Errorf("Error writing to output buffer: %s", err)
}
return b.String(), nil
}
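// MergeUploadCSVFilesOnWorkers merges the per-page results.csv files in
// localOutputDir into a single CSV using csv_pivot_table_merger.py, optionally
// adding page ranks and additional fields, and uploads the merged CSV to
// Google Storage.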
func MergeUploadCSVFilesOnWorkers(ctx context.Context, localOutputDir, pathToPyFiles, runID, remoteDir, valueColumnName string, gs *GcsUtil, startRange int, handleStrings, addRanks bool, pageRankToAdditionalFields map[string]map[string]string) error {
// Move all results into a single directory.
fileInfos, err := ioutil.ReadDir(localOutputDir)
if err != nil {
return fmt.Errorf("Unable to read %s: %s", localOutputDir, err)
}
for _, fileInfo := range fileInfos {
if !fileInfo.IsDir() {
continue
}
outputFile := filepath.Join(localOutputDir, fileInfo.Name(), "results.csv")
newFile := filepath.Join(localOutputDir, fmt.Sprintf("%s.csv", fileInfo.Name()))
if err := os.Rename(outputFile, newFile); err != nil {
sklog.Errorf("Could not rename %s to %s: %s", outputFile, newFile, err)
continue
}
if addRanks || len(pageRankToAdditionalFields) != 0 {
headers, values, err := GetRowsFromCSV(newFile)
if err != nil {
sklog.Errorf("Could not read %s: %s", newFile, err)
continue
}
// Add the rank of the page to the CSV file.
pageRank := fileInfo.Name()
pageNameWithRank := ""
for i := range headers {
for j := range values {
if headers[i] == "stories" && addRanks {
pageNameWithRank = fmt.Sprintf("%s (#%s)", values[j][i], pageRank)
values[j][i] = pageNameWithRank
}
}
}
// Add additionalFields (if any) to the output CSV.
if additionalFields, ok := pageRankToAdditionalFields[fileInfo.Name()]; ok {
for h, v := range additionalFields {
valueLine := make([]string, len(headers))
for i := range headers {
if headers[i] == "name" {
valueLine[i] = h
} else if headers[i] == valueColumnName {
valueLine[i] = v
} else if headers[i] == "stories" && addRanks {
valueLine[i] = pageNameWithRank
} else {
valueLine[i] = ""
}
}
values = append(values, valueLine)
}
}
if err := writeRowsToCSV(newFile, headers, values); err != nil {
sklog.Errorf("Could not write to %s: %s", newFile, err)
continue
}
}
}
// Call csv_pivot_table_merger.py to merge all results into a single results CSV.
pathToCsvMerger := filepath.Join(pathToPyFiles, "csv_pivot_table_merger.py")
outputFileName := runID + ".output"
args := []string{
pathToCsvMerger,
"--csv_dir=" + localOutputDir,
"--output_csv_name=" + filepath.Join(localOutputDir, outputFileName),
"--value_column_name=" + valueColumnName,
}
if handleStrings {
args = append(args, "--handle_strings")
}
err = ExecuteCmd(ctx, "python", args, []string{}, CSV_PIVOT_TABLE_MERGER_TIMEOUT, nil, nil)
if err != nil {
return fmt.Errorf("Error running csv_pivot_table_merger.py: %s", err)
}
// Check to see if the output CSV has more than just the header line.
// TODO(rmistry): Inefficient to count all the lines when we really only want to know if
// it's > or <= 1 line.
lines, err := fileutil.CountLines(filepath.Join(localOutputDir, outputFileName))
if err != nil {
return fmt.Errorf("Could not count lines from %s: %s", filepath.Join(localOutputDir, outputFileName), err)
}
if lines <= 1 {
return fmt.Errorf("%s has %d lines. More than 1 line is expected.", filepath.Join(localOutputDir, outputFileName), lines)
}
// Copy the output file to Google Storage.
remoteOutputDir := filepath.Join(remoteDir, strconv.Itoa(startRange), "outputs")
if err := gs.UploadFile(outputFileName, localOutputDir, remoteOutputDir); err != nil {
return fmt.Errorf("Unable to upload %s to %s: %s", outputFileName, remoteOutputDir, err)
}
return nil
}
// GetRowsFromCSV reads the provided CSV and returns its headers (first row)
// and values (all other rows).
func GetRowsFromCSV(csvPath string) ([]string, [][]string, error) {
	csvFile, err := os.Open(csvPath)
	if err != nil {
		return nil, nil, fmt.Errorf("Could not open %s: %s", csvPath, err)
	}
	defer util.Close(csvFile)
reader := csv.NewReader(csvFile)
reader.FieldsPerRecord = -1
rawCSVdata, err := reader.ReadAll()
if err != nil {
return nil, nil, fmt.Errorf("Could not read %s: %s", csvPath, err)
}
if len(rawCSVdata) < 2 {
return nil, nil, fmt.Errorf("No data in %s", csvPath)
}
return rawCSVdata[0], rawCSVdata[1:], nil
}
func writeRowsToCSV(csvPath string, headers []string, values [][]string) error {
	// Truncate any existing contents so stale rows do not survive the rewrite.
	csvFile, err := os.OpenFile(csvPath, os.O_WRONLY|os.O_TRUNC, 0666)
	if err != nil {
		return fmt.Errorf("Could not open %s: %s", csvPath, err)
	}
	defer util.Close(csvFile)
writer := csv.NewWriter(csvFile)
defer writer.Flush()
// Write the headers.
if err := writer.Write(headers); err != nil {
return fmt.Errorf("Could not write to %s: %s", csvPath, err)
}
// Write all values.
for _, row := range values {
if err := writer.Write(row); err != nil {
return fmt.Errorf("Could not write to %s: %s", csvPath, err)
}
}
return nil
}
// TriggerIsolateTelemetrySwarmingTask creates an isolated.gen.json file using ISOLATE_TELEMETRY_ISOLATE,
// archives it, and triggers its swarming task. The swarming task will run the isolate_telemetry
// worker script which will return the isolate hash.
func TriggerIsolateTelemetrySwarmingTask(ctx context.Context, taskName, runID, chromiumHash, serviceAccountJSON string, patches []string, hardTimeout, ioTimeout time.Duration, local bool) (string, error) {
// Instantiate the swarming client.
workDir, err := ioutil.TempDir(StorageDir, "swarming_work_")
if err != nil {
return "", fmt.Errorf("Could not get temp dir: %s", err)
}
s, err := swarming.NewSwarmingClient(ctx, workDir, swarming.SWARMING_SERVER_PRIVATE, isolate.ISOLATE_SERVER_URL_PRIVATE, serviceAccountJSON)
if err != nil {
// Cleanup workdir.
if err := os.RemoveAll(workDir); err != nil {
sklog.Errorf("Could not cleanup swarming work dir: %s", err)
}
return "", fmt.Errorf("Could not instantiate swarming client: %s", err)
}
defer s.Cleanup()
// Create isolated.gen.json.
// Get path to isolate files.
pathToIsolates := GetPathToIsolates(local)
isolateArgs := map[string]string{
"RUN_ID": runID,
"CHROMIUM_HASH": chromiumHash,
"PATCHES": strings.Join(patches, ","),
}
genJSON, err := s.CreateIsolatedGenJSON(path.Join(pathToIsolates, ISOLATE_TELEMETRY_ISOLATE), s.WorkDir, "linux", taskName, isolateArgs, []string{})
if err != nil {
return "", fmt.Errorf("Could not create isolated.gen.json for task %s: %s", taskName, err)
}
// Batcharchive the task.
tasksToHashes, err := s.BatchArchiveTargets(ctx, []string{genJSON}, BATCHARCHIVE_TIMEOUT)
if err != nil {
return "", fmt.Errorf("Could not batch archive target: %s", err)
}
// Trigger swarming using the isolate hash.
tasks, err := s.TriggerSwarmingTasks(ctx, tasksToHashes, GCE_LINUX_BUILDER_DIMENSIONS, map[string]string{"runid": runID}, []string{}, swarming.RECOMMENDED_PRIORITY, 2*24*time.Hour, hardTimeout, ioTimeout, false, true, getServiceAccount(GCE_LINUX_BUILDER_DIMENSIONS))
if err != nil {
return "", fmt.Errorf("Could not trigger swarming task: %s", err)
}
if len(tasks) != 1 {
return "", fmt.Errorf("Expected a single task instead got: %v", tasks)
}
// Collect all tasks and log the ones that fail.
task := tasks[0]
_, outputDir, err := task.Collect(ctx, s)
if err != nil {
return "", fmt.Errorf("task %s failed: %s", task.Title, err)
}
outputFile := filepath.Join(outputDir, ISOLATE_TELEMETRY_FILENAME)
contents, err := ioutil.ReadFile(outputFile)
if err != nil {
return "", fmt.Errorf("Could not read outputfile %s: %s", outputFile, err)
}
return strings.Trim(string(contents), "\n"), nil
}
// TriggerBuildRepoSwarmingTask creates an isolated.gen.json file using BUILD_REPO_ISOLATE,
// archives it, and triggers its swarming task. The swarming task will run the build_repo
// worker script which will return a list of remote build directories.
func TriggerBuildRepoSwarmingTask(ctx context.Context, taskName, runID, repoAndTarget, targetPlatform, serviceAccountJSON string, hashes, patches, cipdPackages []string, singleBuild, local bool, hardTimeout, ioTimeout time.Duration) ([]string, error) {
// Instantiate the swarming client.
workDir, err := ioutil.TempDir(StorageDir, "swarming_work_")
if err != nil {
return nil, fmt.Errorf("Could not get temp dir: %s", err)
}
s, err := swarming.NewSwarmingClient(ctx, workDir, swarming.SWARMING_SERVER_PRIVATE, isolate.ISOLATE_SERVER_URL_PRIVATE, serviceAccountJSON)
if err != nil {
// Cleanup workdir.
if err := os.RemoveAll(workDir); err != nil {
sklog.Errorf("Could not cleanup swarming work dir: %s", err)
}
return nil, fmt.Errorf("Could not instantiate swarming client: %s", err)
}
defer s.Cleanup()
// Create isolated.gen.json.
// Get path to isolate files.
pathToIsolates := GetPathToIsolates(local)
isolateArgs := map[string]string{
"RUN_ID": runID,
"REPO_AND_TARGET": repoAndTarget,
"HASHES": strings.Join(hashes, ","),
"PATCHES": strings.Join(patches, ","),
"SINGLE_BUILD": strconv.FormatBool(singleBuild),
"TARGET_PLATFORM": targetPlatform,
}
genJSON, err := s.CreateIsolatedGenJSON(path.Join(pathToIsolates, BUILD_REPO_ISOLATE), s.WorkDir, "linux", taskName, isolateArgs, []string{})
if err != nil {
return nil, fmt.Errorf("Could not create isolated.gen.json for task %s: %s", taskName, err)
}
// Batcharchive the task.
tasksToHashes, err := s.BatchArchiveTargets(ctx, []string{genJSON}, BATCHARCHIVE_TIMEOUT)
if err != nil {
return nil, fmt.Errorf("Could not batch archive target: %s", err)
}
// Trigger swarming using the isolate hash.
var dimensions map[string]string
if targetPlatform == "Android" {
dimensions = GCE_ANDROID_BUILDER_DIMENSIONS
} else {
dimensions = GCE_LINUX_BUILDER_DIMENSIONS
}
tasks, err := s.TriggerSwarmingTasks(ctx, tasksToHashes, dimensions, map[string]string{"runid": runID}, cipdPackages, swarming.RECOMMENDED_PRIORITY, 2*24*time.Hour, hardTimeout, ioTimeout, false, true, getServiceAccount(dimensions))
if err != nil {
return nil, fmt.Errorf("Could not trigger swarming task: %s", err)
}
if len(tasks) != 1 {
return nil, fmt.Errorf("Expected a single task instead got: %v", tasks)
}
// Collect all tasks and log the ones that fail.
task := tasks[0]
_, outputDir, err := task.Collect(ctx, s)
if err != nil {
return nil, fmt.Errorf("task %s failed: %s", task.Title, err)
}
outputFile := filepath.Join(outputDir, BUILD_OUTPUT_FILENAME)
contents, err := ioutil.ReadFile(outputFile)
if err != nil {
return nil, fmt.Errorf("Could not read outputfile %s: %s", outputFile, err)
}
return strings.Split(string(contents), ","), nil
}
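// DownloadPatch downloads the patch at remotePath from Google Storage to
// localPath and returns the number of bytes written.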
func DownloadPatch(localPath, remotePath string, gs *GcsUtil) (int64, error) {
respBody, err := gs.GetRemoteFileContents(remotePath)
if err != nil {
return -1, fmt.Errorf("Could not fetch %s: %s", remotePath, err)
}
defer util.Close(respBody)
f, err := os.Create(localPath)
if err != nil {
return -1, fmt.Errorf("Could not create %s: %s", localPath, err)
}
defer util.Close(f)
written, err := io.Copy(f, respBody)
if err != nil {
return -1, fmt.Errorf("Could not write to %s: %s", localPath, err)
}
return written, nil
}
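// DownloadAndApplyPatch downloads patchName from remotePatchesDir and applies
// it to the specified local checkout. Patches of 10 bytes or fewer are treated
// as empty and are not applied.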
func DownloadAndApplyPatch(ctx context.Context, patchName, localDir, remotePatchesDir, checkout string, gs *GcsUtil) error {
patchLocalPath := filepath.Join(localDir, patchName)
patchRemotePath := filepath.Join(remotePatchesDir, patchName)
written, err := DownloadPatch(patchLocalPath, patchRemotePath, gs)
if err != nil {
return fmt.Errorf("Could not download %s: %s", patchRemotePath, err)
}
// Apply patch to the local checkout.
if written > 10 {
if err := ApplyPatch(ctx, patchLocalPath, checkout); err != nil {
return fmt.Errorf("Could not apply patch in %s: %s", checkout, err)
}
}
return nil
}
// GetArchivesNum returns the number of archives for the specified pagesetType.
// -1 is returned if USE_LIVE_SITES_FLAGS is specified or if there is an error.
func GetArchivesNum(gs *GcsUtil, benchmarkArgs, pagesetType string) (int, error) {
if strings.Contains(benchmarkArgs, USE_LIVE_SITES_FLAGS) {
return -1, nil
}
// Calculate the number of archives the workers worked with.
archivesRemoteDir := filepath.Join(SWARMING_DIR_NAME, WEB_ARCHIVES_DIR_NAME, pagesetType)
totalArchiveArtifacts, err := gs.GetRemoteDirCount(archivesRemoteDir)
if err != nil {
return -1, fmt.Errorf("Could not find archives in %s: %s", archivesRemoteDir, err)
}
// Each archive has a JSON file, a WPR file and a WPR.sha1 file.
return totalArchiveArtifacts / 3, nil
}
// GetHashesFromBuild returns the Chromium and Skia hashes from a CT build string.
// Example build string: try-27af50f-d5dcd58-rmistry-20151026102511-nopatch.
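// For that example the returned hashes are ("27af50f", "d5dcd58").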
func GetHashesFromBuild(chromiumBuild string) (string, string) {
tokens := strings.Split(chromiumBuild, "-")
return tokens[1], tokens[2]
}
// GetNumPages returns the number of specified custom webpages. If custom
// webpages are not specified then the number of pages associated with the
// pageset type is returned.
func GetNumPages(pagesetType, customWebPagesFilePath string) (int, error) {
customPages, err := GetCustomPages(customWebPagesFilePath)
if err != nil {
return PagesetTypeToInfo[pagesetType].NumPages, err
}
if len(customPages) == 0 {
return PagesetTypeToInfo[pagesetType].NumPages, nil
}
return len(customPages), nil
}
// GetCustomPages returns the specified custom webpages. If custom
// webpages are not specified then it returns an empty slice.
func GetCustomPages(customWebPagesFilePath string) ([]string, error) {
csvFile, err := os.Open(customWebPagesFilePath)
if err != nil {
return nil, err
}
defer util.Close(csvFile)
reader := csv.NewReader(csvFile)
customPages := []string{}
for {
records, err := reader.Read()
if err == io.EOF {
break
}
if err != nil {
return nil, err
}
for _, record := range records {
if strings.TrimSpace(record) == "" {
continue
}
customPages = append(customPages, record)
}
}
return customPages, nil
}
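// GetCustomPagesWithinRange returns the slice of custom webpages assigned to a
// single worker. Illustrative example: startRange=11, num=10 selects the pages
// with 1-based ranks 11 through 20 (or fewer if the list is shorter).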
func GetCustomPagesWithinRange(startRange, num int, customWebpages []string) []string {
startIndex := startRange - 1
endIndex := util.MinInt(startIndex+num, len(customWebpages))
return customWebpages[startIndex:endIndex]
}
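// CreateCustomPagesets writes one pageset file per webpage (named by 1-based
// rank, e.g. "1.py") into pagesetsDir, choosing a mobile or desktop user agent
// based on targetPlatform.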
func CreateCustomPagesets(webpages []string, pagesetsDir, targetPlatform string) error {
// Empty the local dir.
util.RemoveAll(pagesetsDir)
// Create the local dir.
util.MkdirAll(pagesetsDir, 0700)
// Figure out which user agent to use.
var userAgent string
if targetPlatform == PLATFORM_ANDROID {
userAgent = "mobile"
} else {
userAgent = "desktop"
}
for i, w := range webpages {
pagesetPath := filepath.Join(pagesetsDir, fmt.Sprintf("%d.py", i+1))
if err := WritePageset(pagesetPath, userAgent, DEFAULT_CUSTOM_PAGE_ARCHIVEPATH, w); err != nil {
return err
}
}
return nil
}
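// GetBasePixelDiffRemoteDir returns the remote directory for a PixelDiff run,
// derived from its runID. Illustrative example (runID hypothetical): a runID of
// "rmistry-20171123143000" yields
// filepath.Join(PixelDiffRunsDir, "2017", "11", "23", "14", runID).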
func GetBasePixelDiffRemoteDir(runID string) (string, error) {
// Parse the CT runID to extract YYYY/MM/DD/HH
regex := regexp.MustCompile(`[a-z]+-([0-9]{4})([0-9]{2})([0-9]{2})([0-9]{2})[0-9]{4}`)
matches := regex.FindStringSubmatch(runID)
if len(matches) != 5 {
return "", fmt.Errorf("Could not parse runID %s with the regex %q", runID, regex.String())
}
return filepath.Join(PixelDiffRunsDir, matches[1], matches[2], matches[3], matches[4], runID), nil
}
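// SavePatchToStorage uploads the patch to Google Storage under a name derived
// from its SHA-1 hash and returns the GCS path. The upload is skipped if an
// object with the same hash and size already exists.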
func SavePatchToStorage(patch string) (string, error) {
if len(patch) > PATCH_LIMIT {
return "", fmt.Errorf("Patch is too long with %d bytes; limit %d bytes", len(patch), PATCH_LIMIT)
}
// If sha1 below ever changes, then isEmptyPatch in ctfe.js will also need to
// be updated.
patchHash := sha1.Sum([]byte(patch))
patchHashHex := hex.EncodeToString(patchHash[:])
gs, err := NewGcsUtil(nil)
if err != nil {
return "", err
}
gsDir := "patches"
patchFileName := fmt.Sprintf("%s.patch", patchHashHex)
gsPath := path.Join(gsDir, patchFileName)
res, err := gs.service.Objects.Get(GCSBucketName, gsPath).Do()
if err != nil {
sklog.Infof("This is expected for patches we have not seen before:\nCould not retrieve object metadata for %s: %s", gsPath, err)
}
if res == nil || res.Size != uint64(len(patch)) {
// Patch does not exist in Google Storage yet so upload it.
patchPath := filepath.Join(os.TempDir(), patchFileName)
if err := ioutil.WriteFile(patchPath, []byte(patch), 0666); err != nil {
return "", err
}
defer util.Remove(patchPath)
if err := gs.UploadFile(patchFileName, os.TempDir(), gsDir); err != nil {
return "", err
}
}
return gsPath, nil
}
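// GetPatchFromStorage returns the contents of the patch stored at patchId in
// Google Storage.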
func GetPatchFromStorage(patchId string) (string, error) {
	gs, err := NewGcsUtil(nil)
	if err != nil {
		return "", fmt.Errorf("Could not create GCS util: %s", err)
	}
	respBody, err := gs.GetRemoteFileContents(patchId)
if err != nil {
return "", fmt.Errorf("Could not fetch %s: %s", patchId, err)
}
defer util.Close(respBody)
patch, err := ioutil.ReadAll(respBody)
if err != nil {
return "", fmt.Errorf("Could not read from %s: %s", patchId, err)
}
return string(patch), nil
}
func GetRankFromPageset(pagesetFileName string) (int, error) {
// All CT pagesets are of the form [rank].py so just stripping out the
// extension should give us the rank of the pageset.
	extension := filepath.Ext(pagesetFileName)
rank := pagesetFileName[0 : len(pagesetFileName)-len(extension)]
return strconv.Atoi(rank)
}
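// E.g. GetRankFromPageset("57.py") returns 57.

// Pageset is the JSON representation of a single pageset file.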
type Pageset struct {
UserAgent string `json:"user_agent"`
ArchiveDataFile string `json:"archive_data_file"`
UrlsList string `json:"urls_list"`
}
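// WritePageset JSON-encodes a pageset with the given values and writes it to
// filePath.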
func WritePageset(filePath, userAgent, archiveFilePath, url string) error {
pageSet := Pageset{
UserAgent: userAgent,
ArchiveDataFile: archiveFilePath,
UrlsList: url,
}
b, err := json.Marshal(pageSet)
if err != nil {
return err
}
if err := ioutil.WriteFile(filePath, b, 0644); err != nil {
return err
}
return nil
}
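// TimeoutTracker is a goroutine-safe counter for tracking timeouts. A typical
// (hypothetical) usage is to Increment on each timed-out page, Reset on
// success, and abort the run when Read exceeds a threshold.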
type TimeoutTracker struct {
timeoutCounter int
timeoutCounterMutex sync.Mutex
}
func (t *TimeoutTracker) Increment() {
t.timeoutCounterMutex.Lock()
defer t.timeoutCounterMutex.Unlock()
t.timeoutCounter++
}
func (t *TimeoutTracker) Reset() {
t.timeoutCounterMutex.Lock()
defer t.timeoutCounterMutex.Unlock()
t.timeoutCounter = 0
}
func (t *TimeoutTracker) Read() int {
t.timeoutCounterMutex.Lock()
defer t.timeoutCounterMutex.Unlock()
return t.timeoutCounter
}