// metrics_analysis_on_workers is an application that runs the
// analysis_metrics_ct benchmark on all CT workers and uploads results to Google
// Storage. The requester is emailed when the task is started and also after
// completion.
//
// Can be tested locally with:
// $ go run go/master_scripts/metrics_analysis_on_workers/main.go --run_id=rmistry-test1 --benchmark_extra_args="--output-format=csv" --description=testing --metric_name=loadingMetric --analysis_output_link="https://ct.skia.org/results/cluster-telemetry/tasks/benchmark_runs/rmistry-20180502115012/consolidated_outputs/rmistry-20180502115012.output"
package main

import (
"context"
"errors"
"flag"
"fmt"
"os"
"path/filepath"
"strings"
"time"
"go.skia.org/infra/ct/go/master_scripts/master_common"
"go.skia.org/infra/ct/go/util"
"go.skia.org/infra/go/fileutil"
"go.skia.org/infra/go/git"
"go.skia.org/infra/go/skerr"
"go.skia.org/infra/go/sklog"
skutil "go.skia.org/infra/go/util"
)

const (
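// MAX_PAGES_PER_SWARMING_BOT is the default cap on how many traces a single
// swarming bot analyzes; util.GetMaxPagesPerBotValue below may override it
// from the benchmark extra args.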
MAX_PAGES_PER_SWARMING_BOT = 50
)

var (
benchmarkExtraArgs = flag.String("benchmark_extra_args", "", "The extra arguments that are passed to the analysis_metrics_ct benchmark.")
runID = flag.String("run_id", "", "The unique run id (typically requester + timestamp).")
metricName = flag.String("metric_name", "", "The metric to parse the traces with. Eg: loadingMetric")
analysisOutputLink = flag.String("analysis_output_link", "", "Cloud trace links will be gathered from the output of the CT analysis run at this link. If not specified, trace links will be read from ${TMPDIR}/<run_id>.traces.csv.")
valueColumnName = flag.String("value_column_name", "", "Which column's entries to use as field values when combining CSVs.")
taskPriority = flag.Int("task_priority", util.TASKS_PRIORITY_MEDIUM, "The priority swarming tasks should run at.")
chromiumPatchGSPath = flag.String("chromium_patch_gs_path", "", "The location of the Chromium patch in Google Storage.")
catapultPatchGSPath = flag.String("catapult_patch_gs_path", "", "The location of the Catapult patch in Google Storage.")
customTracesCsvGSPath = flag.String("custom_traces_csv_gs_path", "", "The location of the custom traces CSV in Google Storage.")
)

func metricsAnalysisOnWorkers() error {
swarmingClient, casClient, err := master_common.Init("run_metrics_analysis")
if err != nil {
return fmt.Errorf("Could not init: %s", err)
}
ctx := context.Background()
// Instantiate GcsUtil object.
gs, err := util.NewGcsUtil(nil)
if err != nil {
return fmt.Errorf("Could not instantiate gsutil object: %s", err)
}
// Cleanup dirs after run completes.
defer skutil.RemoveAll(filepath.Join(util.StorageDir, util.BenchmarkRunsDir, *runID))
// Finish with a log flush and a record of how long the task took.
defer util.TimeTrack(time.Now(), "Running metrics analysis task on workers")
defer sklog.Flush()
if *runID == "" {
return errors.New("Must specify --run_id")
}
if *metricName == "" {
return errors.New("Must specify --metric_name")
}
// Use defaults.
if *valueColumnName == "" {
*valueColumnName = util.DEFAULT_VALUE_COLUMN_NAME
}
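// Download the Chromium/Catapult patches and the custom traces CSV (if
// specified) from Google Storage and write them to ${TMPDIR}/<run_id><suffix>
// so they can be uploaded alongside this run's artifacts below.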
for fileSuffix, patchGSPath := range map[string]string{
".chromium.patch": *chromiumPatchGSPath,
".catapult.patch": *catapultPatchGSPath,
".traces.csv": *customTracesCsvGSPath,
} {
patch, err := util.GetPatchFromStorage(patchGSPath)
if err != nil {
return fmt.Errorf("Could not download patch %s from Google storage: %s", patchGSPath, err)
}
// Add an extra newline at the end because git sometimes rejects patches due to
// missing newlines.
patch = patch + "\n"
patchPath := filepath.Join(os.TempDir(), *runID+fileSuffix)
if err := os.WriteFile(patchPath, []byte(patch), 0666); err != nil {
return fmt.Errorf("Could not write patch %s to %s: %s", patch, patchPath, err)
}
defer skutil.Remove(patchPath)
}
tracesFileName := *runID + ".traces.csv"
tracesFilePath := filepath.Join(os.TempDir(), tracesFileName)
if *analysisOutputLink != "" {
if err := extractTracesFromAnalysisRun(tracesFilePath, gs); err != nil {
return fmt.Errorf("Error when extracting traces from run %s to %s: %s", *analysisOutputLink, tracesFilePath, err)
}
}
// Figure out how many traces we are dealing with.
traces, err := util.GetCustomPages(tracesFilePath)
if err != nil {
return fmt.Errorf("Could not read %s: %s", tracesFilePath, err)
}
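// len(traces) determines how many swarming shards are triggered below.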
// Copy the patches and traces to Google Storage.
remoteOutputDir := filepath.Join(util.BenchmarkRunsDir, *runID)
chromiumPatchName := *runID + ".chromium.patch"
catapultPatchName := *runID + ".catapult.patch"
for _, patchName := range []string{chromiumPatchName, catapultPatchName, tracesFileName} {
if err := gs.UploadFile(patchName, os.TempDir(), remoteOutputDir); err != nil {
return fmt.Errorf("Could not upload %s to %s: %s", patchName, remoteOutputDir, err)
}
}
// Find which chromium hash the workers should use.
gitExec, err := git.Executable(ctx)
if err != nil {
return skerr.Wrap(err)
}
chromiumHash, err := util.GetChromiumHash(ctx, gitExec)
if err != nil {
return fmt.Errorf("Could not find the latest chromium hash: %s", err)
}
// Trigger task to return hash of telemetry isolates.
telemetryIsolatePatches := []string{filepath.Join(remoteOutputDir, chromiumPatchName), filepath.Join(remoteOutputDir, catapultPatchName)}
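// The returned hash is expected to be the CAS digest of the telemetry build
// (with the patches above applied); it is merged into each worker's inputs below.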
telemetryHash, err := util.TriggerIsolateTelemetrySwarmingTask(ctx, "isolate_telemetry", *runID, chromiumHash, "", util.PLATFORM_LINUX, telemetryIsolatePatches, 1*time.Hour, 1*time.Hour, *master_common.Local, swarmingClient, casClient)
if err != nil {
return fmt.Errorf("Error encountered when swarming isolate telemetry task: %s", err)
}
if telemetryHash == "" {
return errors.New("Found empty telemetry hash!")
}
// Calculate the max pages to run per bot.
maxPagesPerBot := util.GetMaxPagesPerBotValue(*benchmarkExtraArgs, MAX_PAGES_PER_SWARMING_BOT)
// Archive, trigger and collect swarming tasks.
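// Workers run the metrics_analysis binary under luci-auth so that it
// executes with the swarming task's credentials.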
baseCmd := []string{
"luci-auth",
"context",
"--",
"bin/metrics_analysis",
"-logtostderr",
"--run_id=" + *runID,
"--benchmark_extra_args=" + *benchmarkExtraArgs,
"--metric_name=" + *metricName,
"--value_column_name=" + *valueColumnName,
}
casSpec := util.CasMetricsAnalysis()
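// Merge the telemetry isolate built above into the CAS inputs of every worker task.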
casSpec.IncludeDigests = append(casSpec.IncludeDigests, telemetryHash)
numWorkers, err := util.TriggerSwarmingTask(ctx, "" /* pagesetType */, "metrics_analysis", *runID, util.PLATFORM_LINUX, casSpec, 12*time.Hour, 3*time.Hour, *taskPriority, maxPagesPerBot, len(traces), true /* runOnGCE */, *master_common.Local, util.GetRepeatValue(*benchmarkExtraArgs, 1), baseCmd, swarmingClient, casClient)
if err != nil {
return fmt.Errorf("Error encountered when swarming tasks: %s", err)
}
// If "--output-format=csv" is specified then merge all CSV files and upload.
noOutputWorkers := []string{}
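// Workers that produced no CSV output; populated by MergeUploadCSVFiles below.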
pathToPyFiles, err := util.GetPathToPyFiles(*master_common.Local)
if err != nil {
return fmt.Errorf("Could not get path to py files: %s", err)
}
if strings.Contains(*benchmarkExtraArgs, "--output-format=csv") {
if _, noOutputWorkers, err = util.MergeUploadCSVFiles(ctx, *runID, pathToPyFiles, gs, len(traces), maxPagesPerBot, true /* handleStrings */, util.GetRepeatValue(*benchmarkExtraArgs, 1)); err != nil {
sklog.Errorf("Unable to merge and upload CSV files for %s: %s", *runID, err)
}
}
// If every triggered worker produced no output then consider the run failed.
if len(noOutputWorkers) == numWorkers {
return fmt.Errorf("All %d workers produced no output", numWorkers)
}
// Print direct links to the workers that produced no output.
for _, noOutputWorker := range noOutputWorkers {
directLink := fmt.Sprintf(util.SWARMING_RUN_ID_TASK_LINK_PREFIX_TEMPLATE, *runID, "metrics_analysis_"+noOutputWorker)
fmt.Printf("Missing output from %s\n", directLink)
}
return nil
}

// extractTracesFromAnalysisRun gathers all traceURLs from the specified analysis
// run and writes them to the specified outputPath.
func extractTracesFromAnalysisRun(outputPath string, gs *util.GcsUtil) error {
// Construct path to the google storage locations and download it locally.
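// The output link is assumed to embed util.GCSBucketName; everything after
// the bucket name is the object path within the bucket.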
remoteCsvPath := strings.Split(*analysisOutputLink, util.GCSBucketName+"/")[1]
localDownloadPath := filepath.Join(util.StorageDir, util.BenchmarkRunsDir, *runID, "downloads")
localCsvPath := filepath.Join(localDownloadPath, *runID+".csv")
if err := fileutil.EnsureDirPathExists(localCsvPath); err != nil {
return fmt.Errorf("Could not create %s: %s", localDownloadPath, err)
}
defer skutil.RemoveAll(localDownloadPath)
if err := gs.DownloadRemoteFile(remoteCsvPath, localCsvPath); err != nil {
return fmt.Errorf("Error downloading %s to %s: %s", remoteCsvPath, localCsvPath, err)
}
headers, values, err := util.GetRowsFromCSV(localCsvPath)
if err != nil {
return fmt.Errorf("Could not read %s: %s. Analysis output link: %s", localCsvPath, err, *analysisOutputLink)
}
// Gather trace URLs from the CSV.
traceURLs := []string{}
for i := range headers {
if headers[i] == "traceUrls" {
for j := range values {
if values[j][i] != "" {
traceURLs = append(traceURLs, values[j][i])
}
}
}
}
if len(traceURLs) == 0 {
return fmt.Errorf("There were no traceURLs found for the analysis output link: %s", *analysisOutputLink)
}
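// Write the URLs as a single comma-separated line, the format that
// util.GetCustomPages parses above.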
if err := os.WriteFile(outputPath, []byte(strings.Join(traceURLs, ",")), 0644); err != nil {
return fmt.Errorf("Could not write traceURLs to %s: %s", outputPath, err)
}
return nil
}

func main() {
retCode := 0
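// Exit non-zero on failure so the calling infrastructure marks the task failed.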
if err := metricsAnalysisOnWorkers(); err != nil {
sklog.Errorf("Error while running metrics analysis on workers: %s", err)
retCode = 255
}
os.Exit(retCode)
}