blob: 75b2b260cbd654e0ff64a29c0107e8b15eedf27e [file] [log] [blame]
// Utility that contains methods for both CT master and worker scripts.
package util
import (
"bufio"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"os"
"path"
"path/filepath"
"runtime"
"strconv"
"sync"
"time"
"go.skia.org/infra/go/exec"
"go.skia.org/infra/go/swarming"
"go.skia.org/infra/go/util"
"github.com/skia-dev/glog"
)
const (
	// MAX_SYNC_TRIES is the maximum number of attempts SyncDir makes
	// ("git pull" + "gclient sync") before giving up.
	MAX_SYNC_TRIES = 3
	// TS_FORMAT is the reference-time layout (yyyyMMddHHmmss) used by
	// GetTimeFromTs and GetCurrentTs.
	TS_FORMAT = "20060102150405"
	// REMOVE_INVALID_SKPS_WORKER_POOL is the number of goroutines
	// ValidateSKPs uses to run remove_invalid_skp.py in parallel.
	REMOVE_INVALID_SKPS_WORKER_POOL = 20
)
// GetCTWorkersProd returns an array of all CT workers in the Cluster Telemetry
// Golo. Worker hostnames are produced from WORKER_NAME_TEMPLATE, numbered
// 1 through NUM_WORKERS_PROD.
func GetCTWorkersProd() []string {
	workers := make([]string, 0, NUM_WORKERS_PROD)
	for num := 1; num <= NUM_WORKERS_PROD; num++ {
		workers = append(workers, fmt.Sprintf(WORKER_NAME_TEMPLATE, num))
	}
	return workers
}
// CreateTimestampFile creates a TIMESTAMP file in the specified dir. The dir must
// exist else an error is returned. The file contains the current time as
// milliseconds since the Unix epoch, in decimal.
func CreateTimestampFile(dir string) error {
	timestampFilePath := filepath.Join(dir, TIMESTAMP_FILE_NAME)
	out, err := os.Create(timestampFilePath)
	if err != nil {
		return fmt.Errorf("Could not create %s: %s", timestampFilePath, err)
	}
	defer util.Close(out)
	// Current time in milliseconds since the Unix epoch.
	timestamp := time.Now().UnixNano() / int64(time.Millisecond)
	w := bufio.NewWriter(out)
	if _, err := w.WriteString(strconv.FormatInt(timestamp, 10)); err != nil {
		return fmt.Errorf("Could not write to %s: %s", timestampFilePath, err)
	}
	// Propagate flush failures: an unflushed buffer means the timestamp was
	// never written to disk. Previously this error was only logged and the
	// function still returned nil.
	if err := w.Flush(); err != nil {
		return fmt.Errorf("Could not flush %s: %s", timestampFilePath, err)
	}
	return nil
}
// CreateTaskFile creates a taskName file in the TaskFileDir dir. It signifies
// that the worker is currently busy doing a particular task.
func CreateTaskFile(taskName string) error {
	// Create TaskFileDir if it does not exist.
	if _, err := os.Stat(TaskFileDir); err != nil {
		if os.IsNotExist(err) {
			// Dir does not exist, create it.
			if err := os.MkdirAll(TaskFileDir, 0700); err != nil {
				return fmt.Errorf("Could not create %s: %s", TaskFileDir, err)
			}
		} else {
			// There was some other error.
			return err
		}
	}
	// Create the task file in TaskFileDir.
	taskFilePath := filepath.Join(TaskFileDir, taskName)
	f, err := os.Create(taskFilePath)
	if err != nil {
		return fmt.Errorf("Could not create %s: %s", taskFilePath, err)
	}
	// Close the handle; the original leaked the *os.File returned by Create.
	util.Close(f)
	return nil
}
// DeleteTaskFile deletes a taskName file in the TaskFileDir dir. It should be
// called when the worker is done executing a particular task. Failures are
// logged, not returned.
func DeleteTaskFile(taskName string) {
	p := filepath.Join(TaskFileDir, taskName)
	err := os.Remove(p)
	if err != nil {
		glog.Errorf("Could not delete %s: %s", p, err)
	}
}
// TimeTrack logs how long has elapsed since start, labeled with name.
// Intended for use as `defer TimeTrack(time.Now(), "thing")`.
func TimeTrack(start time.Time, name string) {
	glog.Infof("===== %s took %s =====", name, time.Since(start))
}
// ExecuteCmd executes the specified binary with the specified args and env. Stdout and Stderr are
// written to stdout and stderr respectively if specified. If not specified then Stdout and Stderr
// will be outputted only to glog.
func ExecuteCmd(binary string, args, env []string, timeout time.Duration, stdout, stderr io.Writer) error {
	cmd := &exec.Command{
		Name:        binary,
		Args:        args,
		Env:         env,
		InheritPath: true,
		Timeout:     timeout,
		LogStdout:   true,
		Stdout:      stdout,
		LogStderr:   true,
		Stderr:      stderr,
	}
	return exec.Run(cmd)
}
// SyncDir runs "git pull" and "gclient sync" on the specified directory,
// retrying up to MAX_SYNC_TRIES times. The last error (if any) is returned.
func SyncDir(dir string) error {
	if err := os.Chdir(dir); err != nil {
		return fmt.Errorf("Could not chdir to %s: %s", dir, err)
	}
	var err error
	for attempt := 0; attempt < MAX_SYNC_TRIES; attempt++ {
		if attempt > 0 {
			glog.Warningf("%d. retry for syncing %s", attempt, dir)
		}
		if err = syncDirStep(); err == nil {
			break
		}
		glog.Errorf("Error syncing %s", dir)
	}
	if err != nil {
		glog.Errorf("Failed to sync %s after %d attempts", dir, MAX_SYNC_TRIES)
	}
	return err
}
// syncDirStep performs one sync attempt for SyncDir: "git pull" followed by
// "gclient sync" in the current working directory.
func syncDirStep() error {
	if err := ExecuteCmd(BINARY_GIT, []string{"pull"}, []string{}, GIT_PULL_TIMEOUT, nil, nil); err != nil {
		return fmt.Errorf("Error running git pull: %s", err)
	}
	if err := ExecuteCmd(BINARY_GCLIENT, []string{"sync"}, []string{}, GCLIENT_SYNC_TIMEOUT, nil, nil); err != nil {
		return fmt.Errorf("Error running gclient sync: %s", err)
	}
	return nil
}
// BuildSkiaTools builds "tools" in the Skia trunk directory by running
// "make clean" (best-effort) followed by "make tools BUILDTYPE=Release".
func BuildSkiaTools() error {
	if err := os.Chdir(SkiaTreeDir); err != nil {
		return fmt.Errorf("Could not chdir to %s: %s", SkiaTreeDir, err)
	}
	// "make clean" failures are logged but do not abort the build.
	cleanErr := ExecuteCmd(BINARY_MAKE, []string{"clean"}, []string{}, MAKE_CLEAN_TIMEOUT, nil, nil)
	util.LogErr(cleanErr)
	// Build tools with Skia warnings-as-errors disabled.
	buildArgs := []string{"tools", "BUILDTYPE=Release"}
	buildEnv := []string{"GYP_DEFINES=\"skia_warnings_as_errors=0\""}
	return ExecuteCmd(BINARY_MAKE, buildArgs, buildEnv, MAKE_TOOLS_TIMEOUT, nil, nil)
}
// BuildPDFium builds "pdfium_test" in the PDFium repo directory: it runs the
// build_gyp/gyp_pdfium generator (with Skia enabled) and then ninja.
func BuildPDFium() error {
	if err := os.Chdir(PDFiumTreeDir); err != nil {
		// Bug fix: the error previously reported SkiaTreeDir instead of the
		// directory actually being chdir'd to.
		return fmt.Errorf("Could not chdir to %s: %s", PDFiumTreeDir, err)
	}
	// Run "build_gyp/gyp_pdfium" with pdf_use_skia=1 and warnings not treated
	// as errors.
	if err := ExecuteCmd(path.Join("build_gyp", "gyp_pdfium"), []string{},
		[]string{"GYP_DEFINES=\"pdf_use_skia=1\"", "CPPFLAGS=\"-Wno-error\""}, GYP_PDFIUM_TIMEOUT, nil, nil); err != nil {
		return err
	}
	// Build pdfium_test.
	return ExecuteCmd(BINARY_NINJA, []string{"-C", "out/Debug", BINARY_PDFIUM_TEST},
		[]string{}, NINJA_TIMEOUT, nil, nil)
}
// ResetCheckout resets the specified Git checkout: it discards local
// modifications ("git reset --hard HEAD") and removes untracked files and
// directories ("git clean -f -d"). Git failures are logged, not returned.
func ResetCheckout(dir string) error {
	if err := os.Chdir(dir); err != nil {
		return fmt.Errorf("Could not chdir to %s: %s", dir, err)
	}
	util.LogErr(ExecuteCmd(BINARY_GIT, []string{"reset", "--hard", "HEAD"}, []string{}, GIT_RESET_TIMEOUT, nil, nil))
	util.LogErr(ExecuteCmd(BINARY_GIT, []string{"clean", "-f", "-d"}, []string{}, GIT_CLEAN_TIMEOUT, nil, nil))
	return nil
}
// ApplyPatch applies a patch file to the Git checkout in dir using
// "git apply --index -p1 --verbose --ignore-whitespace --ignore-space-change".
func ApplyPatch(patch, dir string) error {
	if err := os.Chdir(dir); err != nil {
		return fmt.Errorf("Could not chdir to %s: %s", dir, err)
	}
	gitArgs := []string{
		"apply", "--index", "-p1", "--verbose", "--ignore-whitespace",
		"--ignore-space-change", patch,
	}
	return ExecuteCmd(BINARY_GIT, gitArgs, []string{}, GIT_APPLY_TIMEOUT, nil, nil)
}
// CleanTmpDir deletes all tmp files from the caller because telemetry tends to
// generate a lot of temporary artifacts there and they take up root disk space.
func CleanTmpDir() {
files, _ := ioutil.ReadDir(os.TempDir())
for _, f := range files {
util.RemoveAll(filepath.Join(os.TempDir(), f.Name()))
}
}
// GetMasterLogLink returns a direct link to the log of this task on the
// master's logserver, derived from the running program's name and the runID.
func GetMasterLogLink(runID string) string {
	program := filepath.Base(os.Args[0])
	return fmt.Sprintf("%s/%s.%s.%s.log.INFO.%s", MASTER_LOGSERVER_LINK, program, MASTER_NAME, CtUser, runID)
}
// GetTimeFromTs parses formattedTime using TS_FORMAT. On parse failure the
// zero time.Time is returned (as before), but the error is now logged instead
// of being silently discarded.
func GetTimeFromTs(formattedTime string) time.Time {
	t, err := time.Parse(TS_FORMAT, formattedTime)
	if err != nil {
		glog.Errorf("Could not parse timestamp %s: %s", formattedTime, err)
	}
	return t
}
// GetCurrentTs returns the current UTC time formatted with TS_FORMAT.
func GetCurrentTs() string {
	now := time.Now().UTC()
	return now.Format(TS_FORMAT)
}
// GetClosedChannelOfPagesets returns a closed, buffered channel containing all
// pageset file names, excluding the TIMESTAMP file and compiled python (.pyc)
// artifacts.
func GetClosedChannelOfPagesets(fileInfos []os.FileInfo) chan string {
	ch := make(chan string, len(fileInfos))
	for _, fi := range fileInfos {
		name := fi.Name()
		base := filepath.Base(name)
		// Ignore timestamp files and .pyc files.
		if base == TIMESTAMP_FILE_NAME || filepath.Ext(base) == ".pyc" {
			continue
		}
		ch <- name
	}
	close(ch)
	return ch
}
// ChromeProcessesCleaner periodically kills zombie chrome processes. Running
// benchmarks in parallel leads to multiple chrome instances coming up at the
// same time; when there are crashes, chrome processes stick around, which can
// severely impact the machine's performance.
//
// NOTE: this loop never returns, so the goroutine running it (and the ticker)
// live for the lifetime of the process.
func ChromeProcessesCleaner(locker sync.Locker, chromeCleanerTimer time.Duration) {
	// Idiom fix: "for range" replaces the non-idiomatic "for _ = range".
	for range time.Tick(chromeCleanerTimer) {
		glog.Info("The chromeProcessesCleaner goroutine has started")
		glog.Info("Waiting for all existing tasks to complete before killing zombie chrome processes")
		// Hold the lock so no task runs while chrome is being killed.
		locker.Lock()
		util.LogErr(ExecuteCmd("pkill", []string{"-9", "chrome"}, []string{}, PKILL_TIMEOUT, nil, nil))
		locker.Unlock()
	}
}
// Contains the data included in CT pagesets.
type PagesetVars struct {
// A comma separated list of URLs.
UrlsList string `json:"urls_list"`
// Will be either "mobile" or "desktop".
UserAgent string `json:"user_agent"`
// The location of the web page's WPR data file.
ArchiveDataFile string `json:"archive_data_file"`
}
func ReadPageset(pagesetPath string) (PagesetVars, error) {
decodedPageset := PagesetVars{}
pagesetContent, err := os.Open(pagesetPath)
if err != nil {
return decodedPageset, fmt.Errorf("Could not read %s: %s", pagesetPath, err)
}
if err := json.NewDecoder(pagesetContent).Decode(&decodedPageset); err != nil {
return decodedPageset, fmt.Errorf("Could not JSON decode %s: %s", pagesetPath, err)
}
return decodedPageset, nil
}
// ValidateSKPs moves all root_dir/dir_name/*.skp into the root_dir and validates them.
// SKPs that fail validation are logged and deleted.
//
// Phase 1: for each subdirectory of pathToSkps, the largest file ("layer") is
// promoted to pathToSkps/<dirname>.skp (if > 6000 bytes) and the subdirectory
// is deleted. Phase 2: remove_invalid_skp.py is run over every resulting file
// by a pool of REMOVE_INVALID_SKPS_WORKER_POOL goroutines.
func ValidateSKPs(pathToSkps string) error {
	// List all directories in pathToSkps and copy out the skps.
	skpFileInfos, err := ioutil.ReadDir(pathToSkps)
	if err != nil {
		return fmt.Errorf("Unable to read %s: %s", pathToSkps, err)
	}
	for _, fileInfo := range skpFileInfos {
		if !fileInfo.IsDir() {
			// We are only interested in directories.
			continue
		}
		skpName := fileInfo.Name()
		// Find the largest layer in this directory.
		layerInfos, err := ioutil.ReadDir(filepath.Join(pathToSkps, skpName))
		if err != nil {
			// Not fatal: on read failure layerInfos is empty, so the size
			// check below is skipped and the directory is still deleted.
			glog.Errorf("Unable to read %s: %s", filepath.Join(pathToSkps, skpName), err)
		}
		if len(layerInfos) > 0 {
			// Linear scan for the largest file in the directory.
			largestLayerInfo := layerInfos[0]
			for _, layerInfo := range layerInfos {
				if layerInfo.Size() > largestLayerInfo.Size() {
					largestLayerInfo = layerInfo
				}
			}
			// Only save SKPs greater than 6000 bytes. Less than that are probably
			// malformed.
			if largestLayerInfo.Size() > 6000 {
				layerPath := filepath.Join(pathToSkps, skpName, largestLayerInfo.Name())
				destSKP := filepath.Join(pathToSkps, skpName+".skp")
				util.Rename(layerPath, destSKP)
			} else {
				glog.Warningf("Skipping %s because size was less than 6000 bytes", skpName)
			}
		}
		// We extracted what we needed from the directory, now delete it.
		util.RemoveAll(filepath.Join(pathToSkps, skpName))
	}
	// Create channel that contains all SKP file paths. This channel will
	// be consumed by the worker pool below to run remove_invalid_skp.py in
	// parallel.
	skps, err := ioutil.ReadDir(pathToSkps)
	if err != nil {
		return fmt.Errorf("Unable to read %s: %s", pathToSkps, err)
	}
	// Buffered to hold every path; closed below so worker goroutines can
	// range over it and exit when it drains.
	skpsChannel := make(chan string, len(skps))
	for _, skp := range skps {
		skpsChannel <- filepath.Join(pathToSkps, skp.Name())
	}
	close(skpsChannel)
	glog.Info("Calling remove_invalid_skp.py")
	// Sync Skia tree. Sync/build failures are logged but not fatal here.
	util.LogErr(SyncDir(SkiaTreeDir))
	// Build tools (provides the skpinfo binary used by the script below).
	util.LogErr(BuildSkiaTools())
	// Run remove_invalid_skp.py in parallel goroutines.
	// Construct path to the python script, relative to this source file
	// (two directories up, then "py").
	_, currentFile, _, _ := runtime.Caller(0)
	pathToPyFiles := filepath.Join(
		filepath.Dir((filepath.Dir(filepath.Dir(currentFile)))),
		"py")
	pathToRemoveSKPs := filepath.Join(pathToPyFiles, "remove_invalid_skp.py")
	pathToSKPInfo := filepath.Join(SkiaTreeDir, "out", "Release", "skpinfo")
	var wg sync.WaitGroup
	// Loop through workers in the worker pool.
	for i := 0; i < REMOVE_INVALID_SKPS_WORKER_POOL; i++ {
		// Increment the WaitGroup counter.
		wg.Add(1)
		// Create and run a goroutine closure that captures SKPs.
		go func(i int) {
			// Decrement the WaitGroup counter when the goroutine completes.
			defer wg.Done()
			for skpPath := range skpsChannel {
				args := []string{
					pathToRemoveSKPs,
					"--path_to_skp=" + skpPath,
					"--path_to_skpinfo=" + pathToSKPInfo,
				}
				glog.Infof("Executing remove_invalid_skp.py with goroutine#%d", i+1)
				// Execute the command with stdout not logged. It otherwise outputs
				// tons of log msgs.
				util.LogErr(exec.Run(&exec.Command{
					Name:        "python",
					Args:        args,
					Env:         []string{},
					InheritPath: true,
					Timeout:     REMOVE_INVALID_SKPS_TIMEOUT,
					LogStdout:   false,
					Stdout:      nil,
					LogStderr:   true,
					Stderr:      nil,
				}))
			}
		}(i)
	}
	// Wait for all spawned goroutines to complete.
	wg.Wait()
	return nil
}
// GetStartRange returns the 1-based index at which the given worker should
// start processing, based on its number and how many artifacts each worker is
// allowed to process.
func GetStartRange(workerNum, artifactsPerWorker int) int {
	offset := (workerNum - 1) * artifactsPerWorker
	return offset + 1
}
// TriggerSwarmingTask isolates, batch-archives, triggers, and collects a set
// of swarming tasks for the given pageset type. One task is created per chunk
// of maxPagesPerBot pages; isolateExtraArgs are merged into each task's
// isolate variables. Individual task failures during collection are logged,
// not returned.
//
// NOTE(review): the task count is NumPages/maxPagesPerBot using integer
// division, so a remainder of pages smaller than maxPagesPerBot gets no task —
// presumably NumPages is always a multiple of maxPagesPerBot; verify with
// callers.
func TriggerSwarmingTask(pagesetType, taskPrefix, isolateName string, hardTimeout, ioTimeout time.Duration, maxPagesPerBot int, isolateExtraArgs map[string]string) error {
	// Instantiate the swarming client.
	workDir, err := ioutil.TempDir("", "swarming_work_")
	if err != nil {
		return fmt.Errorf("Could not get temp dir: %s", err)
	}
	s, err := swarming.NewSwarmingClient(workDir)
	if err != nil {
		return fmt.Errorf("Could not instantiate swarming client: %s", err)
	}
	defer s.Cleanup()
	// Create isolated.gen.json files from tasks.
	genJSONs := []string{}
	// Get path to isolate files, relative to this source file (two
	// directories up, then "isolates").
	_, currentFile, _, _ := runtime.Caller(0)
	pathToIsolates := filepath.Join(filepath.Dir(filepath.Dir(filepath.Dir(currentFile))), "isolates")
	for i := 1; i <= PagesetTypeToInfo[pagesetType].NumPages/maxPagesPerBot; i++ {
		// Per-task isolate variables: which page range this bot processes.
		isolateArgs := map[string]string{
			"START_RANGE":  strconv.Itoa(GetStartRange(i, maxPagesPerBot)),
			"NUM":          strconv.Itoa(maxPagesPerBot),
			"PAGESET_TYPE": pagesetType,
		}
		// Add isolateExtraArgs (if specified) into the isolateArgs.
		for k, v := range isolateExtraArgs {
			isolateArgs[k] = v
		}
		taskName := fmt.Sprintf("%s_%d", taskPrefix, i)
		genJSON, err := s.CreateIsolatedGenJSON(path.Join(pathToIsolates, isolateName), s.WorkDir, "linux", taskName, isolateArgs, []string{})
		if err != nil {
			return fmt.Errorf("Could not create isolated.gen.json for task %s: %s", taskName, err)
		}
		genJSONs = append(genJSONs, genJSON)
	}
	// Batcharchive the tasks.
	tasksToHashes, err := s.BatchArchiveTargets(genJSONs, BATCHARCHIVE_TIMEOUT)
	if err != nil {
		return fmt.Errorf("Could not batch archive targets: %s", err)
	}
	// Sanity check: every gen.json must have produced a hash.
	if len(genJSONs) != len(tasksToHashes) {
		return fmt.Errorf("len(genJSONs) was %d and len(tasksToHashes) was %d", len(genJSONs), len(tasksToHashes))
	}
	// Trigger swarming using the isolate hashes.
	dimensions := map[string]string{"pool": SWARMING_POOL}
	tasks, err := s.TriggerSwarmingTasks(tasksToHashes, dimensions, swarming.RECOMMENDED_PRIORITY, swarming.RECOMMENDED_EXPIRATION, hardTimeout, ioTimeout, false)
	if err != nil {
		return fmt.Errorf("Could not trigger swarming task: %s", err)
	}
	// Collect all tasks and log the ones that fail.
	for _, task := range tasks {
		if _, _, err := task.Collect(s); err != nil {
			glog.Errorf("task %s failed: %s", task.Title, err)
			continue
		}
	}
	return nil
}
// GetPathToPyFiles returns the location of CT's python scripts. When running
// on swarming the path is derived from the executable's location; otherwise it
// is derived from this source file's location.
func GetPathToPyFiles(runOnSwarming bool) string {
	if runOnSwarming {
		exeDir := filepath.Dir(os.Args[0])
		return filepath.Join(filepath.Dir(exeDir), "src", "go.skia.org", "infra", "ct", "py")
	}
	_, currentFile, _, _ := runtime.Caller(0)
	ctDir := filepath.Dir(filepath.Dir(filepath.Dir(currentFile)))
	return filepath.Join(ctDir, "py")
}