blob: 4d5778ccebfd74a2935e9334e41cb2ef2a781d52 [file] [log] [blame]
// run_lua_on_workers is an application that runs the specified lua script on all
// CT workers and uploads the results to Google Storage. The requester is emailed
// when the task is done.
package main
import (
"context"
"flag"
"fmt"
"io"
"io/ioutil"
"math"
"os"
"path"
"path/filepath"
"strconv"
"time"
"go.skia.org/infra/ct/go/ctfe/lua_scripts"
"go.skia.org/infra/ct/go/frontend"
"go.skia.org/infra/ct/go/master_scripts/master_common"
"go.skia.org/infra/ct/go/util"
"go.skia.org/infra/go/sklog"
skutil "go.skia.org/infra/go/util"
)
const (
// MAX_PAGES_PER_SWARMING_BOT is the per-task page count handed to
// util.TriggerSwarmingTask. main also divides the pageset's NumPages by it
// (rounding up) to work out how many worker output files to collect.
MAX_PAGES_PER_SWARMING_BOT = 10000
)
var (
// Command-line flags. main rejects the run if pageset_type, chromium_build
// or run_id is left empty.
emails = flag.String("emails", "", "The comma separated email addresses to notify when the task is picked up and completes.")
description = flag.String("description", "", "The description of the run as entered by the requester.")
taskID = flag.Int64("task_id", -1, "The key of the CT task in CTFE. The task will be updated when it is started and also when it completes.")
pagesetType = flag.String("pageset_type", "", "The type of pagesets to use. Eg: 10k, Mobile10k, All.")
chromiumBuild = flag.String("chromium_build", "", "The chromium build to use for this capture_archives run.")
runOnGCE = flag.Bool("run_on_gce", true, "Run on Linux GCE instances.")
runID = flag.String("run_id", "", "The unique run id (typically requester + timestamp).")
// Task state shared between main and the deferred sendEmail and
// updateWebappTask calls.
// taskCompletedSuccessfully is flipped to true only as the last statement of
// main, so any early return reports a failure.
taskCompletedSuccessfully = false
// luaOutputRemoteLink is the link to the consolidated script output. It stays
// empty until main assigns it; an empty value is omitted from the email and
// the webapp update.
luaOutputRemoteLink = ""
// luaAggregatorOutputRemoteLink is the link to the aggregator's output. It is
// only assigned when main runs an aggregator script; an empty value is
// omitted from the email and the webapp update.
luaAggregatorOutputRemoteLink = ""
)
// sendEmail mails the completion notice for this task to recipients.
//
// The subject carries the task ID and gains a " with failures" suffix when
// taskCompletedSuccessfully is false; in that case the body also embeds the
// failure HTML from util.GetFailureEmailHtml. Links to the script output and
// the aggregated output are included only when the corresponding package-level
// link is non-empty. A send failure is logged, not returned.
func sendEmail(recipients []string) {
	// Body layout; the seven verbs are, in order: pageset type, swarming logs
	// link, run description, failure section, script output section,
	// aggregated output section, and the link for scheduling more runs.
	const bodyFormat = `
The Cluster telemetry queued task to run lua script on %s pageset has completed. %s.<br/>
Run description: %s<br/>
%s
%s
%s
You can schedule more runs <a href="%s">here</a>.<br/><br/>
Thanks!
`

	subject := fmt.Sprintf("Run lua script Cluster telemetry task has completed (#%d)", *taskID)
	var failureSection string
	if !taskCompletedSuccessfully {
		subject += " with failures"
		failureSection = util.GetFailureEmailHtml(*runID)
	}

	var outputSection string
	if link := luaOutputRemoteLink; link != "" {
		outputSection = fmt.Sprintf("The output of your script is available <a href='%s'>here</a>.<br/>\n", link)
	}

	var aggregatedSection string
	if link := luaAggregatorOutputRemoteLink; link != "" {
		aggregatedSection = fmt.Sprintf("The aggregated output of your script is available <a href='%s'>here</a>.<br/>\n", link)
	}

	body := fmt.Sprintf(
		bodyFormat,
		*pagesetType,
		util.GetSwarmingLogsLink(*runID),
		*description,
		failureSection,
		outputSection,
		aggregatedSection,
		frontend.LuaTasksWebapp,
	)

	err := util.SendEmail(recipients, subject, body)
	if err == nil {
		return
	}
	sklog.Errorf("Error while sending email: %s", err)
}
// updateWebappTask reports the final state of this task to the CTFE webapp:
// the task ID, whether it completed successfully, and whichever output links
// were produced. Any error from the update call is logged.
func updateWebappTask() {
	var update lua_scripts.UpdateVars
	update.Id = *taskID
	update.SetCompleted(taskCompletedSuccessfully)

	// Only set the link fields that were actually produced by this run.
	if link := luaOutputRemoteLink; link != "" {
		update.ScriptOutput = link
	}
	if link := luaAggregatorOutputRemoteLink; link != "" {
		update.AggregatedOutput = link
	}

	err := frontend.UpdateWebappTaskV2(&update)
	skutil.LogErr(err)
}
// main runs a "Lua script" Cluster Telemetry task end to end:
//
//  1. marks the task as started in the webapp and sends the start email;
//  2. uploads <run_id>.lua from the temp dir to Google Storage;
//  3. triggers the build_lua_pictures swarming task, then the run_lua
//     swarming tasks;
//  4. downloads each worker's output and appends it to one consolidated
//     file, which is uploaded to Google Storage;
//  5. if <run_id>.aggregator exists in the temp dir and is larger than 10
//     bytes, uploads it, runs it, and uploads what it wrote to stdout.
//
// Every failure is logged and followed by a plain return. The deferred
// updateWebappTask and sendEmail calls then report the task as failed because
// taskCompletedSuccessfully is only set on the very last line.
func main() {
	master_common.Init("run_lua")

	ctx := context.Background()

	// Send start email.
	emailsArr := util.ParseEmails(*emails)
	emailsArr = append(emailsArr, util.CtAdmins...)
	if len(emailsArr) == 0 {
		sklog.Error("At least one email address must be specified")
		return
	}
	skutil.LogErr(frontend.UpdateWebappTaskSetStarted(&lua_scripts.UpdateVars{}, *taskID, *runID))
	skutil.LogErr(util.SendTaskStartEmail(*taskID, emailsArr, "Lua script", *runID, *description))
	// Ensure webapp is updated and email is sent even if task fails.
	defer updateWebappTask()
	defer sendEmail(emailsArr)
	// Finish with glog flush and how long the task took.
	defer util.TimeTrack(time.Now(), "Running Lua script on workers")
	defer sklog.Flush()

	if *pagesetType == "" {
		sklog.Error("Must specify --pageset_type")
		return
	}
	if *chromiumBuild == "" {
		sklog.Error("Must specify --chromium_build")
		return
	}
	if *runID == "" {
		sklog.Error("Must specify --run_id")
		return
	}

	// Instantiate GcsUtil object.
	gs, err := util.NewGcsUtil(nil)
	if err != nil {
		sklog.Error(err)
		return
	}

	// Upload the lua script for this run to Google storage.
	luaScriptName := *runID + ".lua"
	defer skutil.Remove(filepath.Join(os.TempDir(), luaScriptName))
	luaScriptRemoteDir := filepath.Join(util.LuaRunsDir, *runID, "scripts")
	if err := gs.UploadFile(luaScriptName, os.TempDir(), luaScriptRemoteDir); err != nil {
		sklog.Errorf("Could not upload %s to %s: %s", luaScriptName, luaScriptRemoteDir, err)
		return
	}

	// Build lua_pictures.
	cipdPackage, err := util.GetCipdPackageFromAsset("clang_linux")
	if err != nil {
		sklog.Errorf("Could not get cipd package for clang_linux: %s", err)
		return
	}
	remoteDirNames, err := util.TriggerBuildRepoSwarmingTask(
		ctx, "build_lua_pictures", *runID, "skiaLuaPictures", util.PLATFORM_LINUX, *master_common.ServiceAccountFile, []string{}, []string{}, []string{cipdPackage}, true, *master_common.Local, 3*time.Hour, 1*time.Hour)
	if err != nil {
		sklog.Errorf("Error encountered when swarming build lua_pictures task: %s", err)
		return
	}
	luaPicturesRemoteDirName := remoteDirNames[0]
	luaPicturesRemotePath := path.Join(util.BINARIES_DIR_NAME, luaPicturesRemoteDirName, util.BINARY_LUA_PICTURES)

	// Empty the remote dir before the workers upload to it.
	gsBaseDir := filepath.Join(util.SWARMING_DIR_NAME, filepath.Join(util.LuaRunsDir, *runID), *pagesetType)
	skutil.LogErr(gs.DeleteRemoteDir(gsBaseDir))

	// Archive, trigger and collect swarming tasks.
	isolateExtraArgs := map[string]string{
		"CHROMIUM_BUILD":           *chromiumBuild,
		"RUN_ID":                   *runID,
		"LUA_PICTURES_REMOTE_PATH": luaPicturesRemotePath,
	}
	if _, err := util.TriggerSwarmingTask(ctx, *pagesetType, "run_lua", util.RUN_LUA_ISOLATE, *runID, *master_common.ServiceAccountFile, 3*time.Hour, 1*time.Hour, util.TASKS_PRIORITY_MEDIUM, MAX_PAGES_PER_SWARMING_BOT, util.PagesetTypeToInfo[*pagesetType].NumPages, isolateExtraArgs, *runOnGCE, *master_common.Local, 1, []string{} /* isolateDeps */); err != nil {
		sklog.Errorf("Error encountered when swarming tasks: %s", err)
		return
	}

	// Copy outputs from all slaves locally and combine it into one file.
	consolidatedFileName := "lua-output"
	consolidatedLuaOutput := filepath.Join(os.TempDir(), consolidatedFileName)
	// If the file already exists it could be that there is another lua task running on this machine.
	// Wait for the file to be deleted within a deadline.
	if err := waitForOutputFile(consolidatedLuaOutput); err != nil {
		sklog.Error(err)
		return
	}
	defer skutil.Remove(consolidatedLuaOutput)
	if err := ioutil.WriteFile(consolidatedLuaOutput, []byte{}, 0660); err != nil {
		sklog.Errorf("Could not create %s: %s", consolidatedLuaOutput, err)
		return
	}
	numTasks := int(math.Ceil(float64(util.PagesetTypeToInfo[*pagesetType].NumPages) / float64(MAX_PAGES_PER_SWARMING_BOT)))
	for i := 1; i <= numTasks; i++ {
		startRange := strconv.Itoa(util.GetStartRange(i, MAX_PAGES_PER_SWARMING_BOT))
		workerRemoteOutputPath := filepath.Join(util.LuaRunsDir, *runID, startRange, "outputs", *runID+".output")
		respBody, err := gs.GetRemoteFileContents(workerRemoteOutputPath)
		if err != nil {
			// A missing worker output is tolerated; the remaining outputs
			// are still consolidated.
			sklog.Errorf("Could not fetch %s: %s", workerRemoteOutputPath, err)
			continue
		}
		err = appendToFile(consolidatedLuaOutput, respBody)
		// Close the body now instead of deferring it: a defer inside this
		// loop would keep every worker's response open until main returns.
		skutil.Close(respBody)
		if err != nil {
			sklog.Errorf("Unable to write out %s to %s: %s", workerRemoteOutputPath, consolidatedLuaOutput, err)
			return
		}
	}

	// Copy the consolidated file into Google Storage.
	consolidatedOutputRemoteDir := filepath.Join(util.LuaRunsDir, *runID, "consolidated_outputs")
	if err := gs.UploadFile(consolidatedFileName, os.TempDir(), consolidatedOutputRemoteDir); err != nil {
		sklog.Errorf("Unable to upload %s to %s: %s", consolidatedLuaOutput, consolidatedOutputRemoteDir, err)
		return
	}
	// Publish the link only once the upload has succeeded, so the email and
	// the webapp never point at a file that was not uploaded.
	luaOutputRemoteLink = util.GCS_HTTP_LINK + filepath.Join(util.GCSBucketName, consolidatedOutputRemoteDir, consolidatedFileName)

	// Upload the lua aggregator (if specified) for this run to Google storage.
	luaAggregatorName := *runID + ".aggregator"
	luaAggregatorPath := filepath.Join(os.TempDir(), luaAggregatorName)
	defer skutil.Remove(luaAggregatorPath)
	luaAggregatorFileInfo, err := os.Stat(luaAggregatorPath)
	if err != nil && !os.IsNotExist(err) {
		// The FileInfo is nil for any Stat error, not just "not exist".
		// Fail the task rather than dereference it.
		sklog.Errorf("Could not stat %s: %s", luaAggregatorPath, err)
		return
	}
	// Aggregator files of 10 bytes or fewer are treated as "not specified".
	if err == nil && luaAggregatorFileInfo.Size() > 10 {
		if err := gs.UploadFile(luaAggregatorName, os.TempDir(), luaScriptRemoteDir); err != nil {
			sklog.Errorf("Could not upload %s to %s: %s", luaAggregatorName, luaScriptRemoteDir, err)
			return
		}
		// Run the aggregator and save stdout.
		luaAggregatorOutputFileName := *runID + ".agg.output"
		luaAggregatorOutputFilePath := filepath.Join(os.TempDir(), luaAggregatorOutputFileName)
		luaAggregatorOutputFile, err := os.Create(luaAggregatorOutputFilePath)
		if err != nil {
			sklog.Errorf("Could not create %s: %s", luaAggregatorOutputFilePath, err)
			return
		}
		// Registered only after a successful create. Defers run last-in
		// first-out, so the file is closed before it is removed.
		defer skutil.Remove(luaAggregatorOutputFilePath)
		defer skutil.Close(luaAggregatorOutputFile)
		err = util.ExecuteCmd(ctx, util.BINARY_LUA, []string{luaAggregatorPath}, []string{},
			util.LUA_AGGREGATOR_TIMEOUT, luaAggregatorOutputFile, nil)
		if err != nil {
			sklog.Errorf("Could not execute the lua aggregator %s: %s", luaAggregatorPath, err)
			return
		}
		// Copy the aggregator output into Google Storage.
		if err := gs.UploadFile(luaAggregatorOutputFileName, os.TempDir(), consolidatedOutputRemoteDir); err != nil {
			sklog.Errorf("Unable to upload %s to %s: %s", luaAggregatorOutputFileName, consolidatedOutputRemoteDir, err)
			return
		}
		// As above: only advertise the link after a successful upload.
		luaAggregatorOutputRemoteLink = util.GCS_HTTP_LINK + filepath.Join(util.GCSBucketName, consolidatedOutputRemoteDir, luaAggregatorOutputFileName)
	} else {
		sklog.Info("A lua aggregator has not been specified.")
	}

	taskCompletedSuccessfully = true
}

// appendToFile appends everything read from src to the existing file at
// dstPath. The file is opened and closed within the call so that no handle
// outlives it. It returns the first error from opening, copying or closing.
func appendToFile(dstPath string, src io.Reader) (err error) {
	out, err := os.OpenFile(dstPath, os.O_RDWR|os.O_APPEND, 0660)
	if err != nil {
		return err
	}
	defer func() {
		// Surface a close error only if the copy itself succeeded.
		if cerr := out.Close(); err == nil {
			err = cerr
		}
	}()
	_, err = io.Copy(out, src)
	return err
}
// waitForOutputFile blocks until the file at luaOutput no longer exists.
//
// It checks once immediately and returns nil right away if the file is
// already absent. Otherwise it re-checks every 10 seconds and gives up after
// 10 minutes, returning an error that names the file. Any os.Stat result
// other than "does not exist" is treated as the file still being present.
func waitForOutputFile(luaOutput string) error {
	const (
		pollInterval = 10 * time.Second
		deadline     = 10 * time.Minute
	)

	// Fast path: do not make every run wait a full poll interval when there
	// is nothing to wait for.
	if _, err := os.Stat(luaOutput); os.IsNotExist(err) {
		return nil
	}
	sklog.Infof("%s still exists. Waiting for the other lua task to complete.", luaOutput)

	ticker := time.NewTicker(pollInterval)
	defer ticker.Stop()
	// The deadline fires at most once, so a Timer is used rather than a Ticker.
	deadlineTimer := time.NewTimer(deadline)
	defer deadlineTimer.Stop()

	for {
		select {
		case <-ticker.C:
			if _, err := os.Stat(luaOutput); os.IsNotExist(err) {
				return nil
			}
			sklog.Infof("%s still exists. Waiting for the other lua task to complete.", luaOutput)
		case <-deadlineTimer.C:
			return fmt.Errorf("%s still existed after %v secs", luaOutput, deadline.Seconds())
		}
	}
}