blob: 044d24d3f437ea557b2bf9bea2b65feaf7100898 [file] [log] [blame]
// create_pagesets_on_workers is an application that creates pagesets on all CT
// workers and uploads them to Google Storage. The requester is emailed when the
// task is done.
package main
import (
"context"
"flag"
"fmt"
"path/filepath"
"time"
"go.skia.org/infra/ct/go/ctfe/admin_tasks"
"go.skia.org/infra/ct/go/frontend"
"go.skia.org/infra/ct/go/master_scripts/master_common"
"go.skia.org/infra/ct/go/util"
"go.skia.org/infra/go/sklog"
skutil "go.skia.org/infra/go/util"
)
// MAX_PAGES_PER_SWARMING_BOT is the per-bot page limit that main hands to
// util.TriggerSwarmingTask.
//
// TODO(rmistry): Change back to 1000 once swarming can handle >10k pending tasks.
const MAX_PAGES_PER_SWARMING_BOT = 50000
// Command-line flags accepted by this binary.
var (
	// emails holds the comma separated notification addresses.
	emails = flag.String("emails", "", "The comma separated email addresses to notify when the task is picked up and completes.")
	// taskID identifies the CTFE task that is updated on start and completion.
	taskID = flag.Int64("task_id", -1, "The key of the CT task in CTFE. The task will be updated when it is started and also when it completes.")
	// pagesetType selects which pagesets are created.
	pagesetType = flag.String("pageset_type", "", "The type of pagesets to create from the Alexa CSV list. Eg: 10k, Mobile10k, All.")
	// runOnGCE is forwarded to util.TriggerSwarmingTask.
	runOnGCE = flag.Bool("run_on_gce", true, "Run on Linux GCE instances.")
	// runID is the unique identifier of this run.
	runID = flag.String("run_id", "", "The unique run id (typically requester + timestamp).")
)

// taskCompletedSuccessfully starts out false and is flipped to true by main
// only once the swarming task has been triggered without error. sendEmail and
// updateWebappTask read it to decide how to report the outcome.
var taskCompletedSuccessfully = new(bool)
// sendEmail sends the completion email for this run to recipients.
//
// While taskCompletedSuccessfully is still false the subject gains a
// " with failures" suffix and the body includes the HTML returned by
// util.GetFailureEmailHtml. A send failure is logged and otherwise ignored.
func sendEmail(recipients []string) {
	// Placeholders, in order: pageset type, swarming logs link, failure HTML
	// (empty on success), admin tasks webapp URL.
	const bodyFormat = `
The Cluster telemetry queued task to create %s pagesets has completed. %s.<br/>
%s
You can schedule more runs <a href="%s">here</a>.<br/><br/>
Thanks!
`

	subject := "Create pagesets Cluster telemetry task has completed"
	var failureSection string
	if succeeded := *taskCompletedSuccessfully; !succeeded {
		subject = subject + " with failures"
		failureSection = util.GetFailureEmailHtml(*runID)
	}

	logsLink := util.GetSwarmingLogsLink(*runID)
	body := fmt.Sprintf(bodyFormat, *pagesetType, logsLink, failureSection, frontend.AdminTasksWebapp)

	err := util.SendEmail(recipients, subject, body)
	if err != nil {
		sklog.Errorf("Error while sending email: %s", err)
	}
}
// updateWebappTask reports the outcome of this run to the webapp: it fills a
// RecreatePageSetsUpdateVars with the task id and the current value of
// taskCompletedSuccessfully and submits it via frontend.UpdateWebappTaskV2.
// A failed update is only logged.
func updateWebappTask() {
	var update admin_tasks.RecreatePageSetsUpdateVars
	update.Id = *taskID
	update.SetCompleted(*taskCompletedSuccessfully)

	err := frontend.UpdateWebappTaskV2(&update)
	skutil.LogErr(err)
}
// main runs the create-pagesets task end to end:
//
//  1. Initializes the master script and resolves the notification recipients.
//  2. Marks the task as started in the webapp and sends the start email.
//  3. Validates --pageset_type, empties the remote pagesets directory and
//     triggers the swarming task that creates the pagesets.
//
// taskCompletedSuccessfully is set to true only when every step succeeded.
// The deferred calls report the outcome on every exit path after step 2.
func main() {
	master_common.Init("create_pagesets")
	ctx := context.Background()

	// Send start email.
	emailsArr := util.ParseEmails(*emails)
	emailsArr = append(emailsArr, util.CtAdmins...)
	if len(emailsArr) == 0 {
		sklog.Error("At least one email address must be specified")
		return
	}
	skutil.LogErr(frontend.UpdateWebappTaskSetStarted(&admin_tasks.RecreatePageSetsUpdateVars{}, *taskID, *runID))
	skutil.LogErr(util.SendTaskStartEmail(*taskID, emailsArr, "Creating pagesets", *runID, ""))

	// Ensure webapp is updated and completion email is sent even if task fails.
	// Deferred calls run last-in-first-out, so on exit the order is: log flush,
	// time tracking, completion email, webapp update.
	defer updateWebappTask()
	defer sendEmail(emailsArr)
	// The start time is captured here, when the defer statement is evaluated.
	defer util.TimeTrack(time.Now(), "Creating Pagesets on Workers")
	defer sklog.Flush()

	if *pagesetType == "" {
		sklog.Error("Must specify --pageset_type")
		return
	}
	// Reject unknown pageset types before touching remote state. Without this
	// check the lookup silently yields the map's zero value, and the remote
	// directory would already have been deleted by the time that surfaced.
	pagesetInfo, known := util.PagesetTypeToInfo[*pagesetType]
	if !known {
		sklog.Errorf("Unknown --pageset_type %q", *pagesetType)
		return
	}

	// Empty the remote dir before the workers upload to it.
	gs, err := util.NewGcsUtil(nil)
	if err != nil {
		sklog.Error(err)
		return
	}
	gsBaseDir := filepath.Join(util.SWARMING_DIR_NAME, util.PAGESETS_DIR_NAME, *pagesetType)
	// A failed delete is logged but does not abort the run.
	skutil.LogErr(gs.DeleteRemoteDir(gsBaseDir))

	// Archive, trigger and collect swarming tasks.
	if _, err := util.TriggerSwarmingTask(ctx, *pagesetType, "create_pagesets", util.CREATE_PAGESETS_ISOLATE, *runID, *master_common.ServiceAccountFile, 5*time.Hour, 1*time.Hour, util.TASKS_PRIORITY_LOW, MAX_PAGES_PER_SWARMING_BOT, pagesetInfo.NumPages, map[string]string{}, *runOnGCE, *master_common.Local, 1, []string{} /* isolateDeps */); err != nil {
		sklog.Errorf("Error encountered when swarming tasks: %s", err)
		return
	}

	*taskCompletedSuccessfully = true
}