| /* |
| Pulls data from multiple sources and funnels into InfluxDB. |
| */ |
| |
| package main |
| |
| import ( |
| "flag" |
| "os" |
| "path" |
| "path/filepath" |
| "regexp" |
| "time" |
| |
| "cloud.google.com/go/storage" |
| "go.skia.org/infra/datahopper/go/accum" |
| "go.skia.org/infra/datahopper/go/bot_metrics" |
| "go.skia.org/infra/go/auth" |
| "go.skia.org/infra/go/buildbot" |
| "go.skia.org/infra/go/common" |
| "go.skia.org/infra/go/gcs" |
| "go.skia.org/infra/go/git/repograph" |
| "go.skia.org/infra/go/metrics2" |
| "go.skia.org/infra/go/sklog" |
| "go.skia.org/infra/go/swarming" |
| "golang.org/x/net/context" |
| "google.golang.org/api/option" |
| ) |
| |
// Names of the metrics reported by the Swarming goroutines started in main.
const (
	// Per-bot metrics.

	// Time elapsed since the bot was last seen.
	MEASUREMENT_SWARM_BOTS_LAST_SEEN = "swarming.bots.last-seen"
	// 1 if the bot is quarantined, 0 otherwise.
	MEASUREMENT_SWARM_BOTS_QUARANTINED = "swarming.bots.quarantined"

	// Per-task metrics.

	// Number of tasks in each Swarming state.
	MEASUREMENT_SWARM_TASKS_STATE = "swarming.tasks.state"
	// Task run time.
	MEASUREMENT_SWARM_TASKS_DURATION = "swarming.tasks.duration"
	// Overhead attributed to the bot, to isolated download and to isolated
	// upload, respectively.
	MEASUREMENT_SWARM_TASKS_OVERHEAD_BOT      = "swarming.tasks.overhead.bot"
	MEASUREMENT_SWARM_TASKS_OVERHEAD_DOWNLOAD = "swarming.tasks.overhead.download"
	MEASUREMENT_SWARM_TASKS_OVERHEAD_UPLOAD   = "swarming.tasks.overhead.upload"
	// Time between task creation and task start.
	MEASUREMENT_SWARM_TASKS_PENDING_TIME = "swarming.tasks.pending-time"
)
| |
// Command-line flags.
var (
	// Network addresses.
	grpcPort = flag.String("grpc_port", ":8000", "Port on which to run the buildbot data gRPC server.")
	httpPort = flag.String("http_port", ":8001", "Port on which to run the HTTP server.")
	promPort = flag.String("prom_port", ":20000", "Metrics service address (e.g., ':10110')")

	// Environment and storage.
	local              = flag.Bool("local", false, "Running locally if true. As opposed to in production.")
	taskSchedulerDbUrl = flag.String("task_db_url", "http://skia-task-scheduler:8008/db/", "Where the Skia task scheduler database is hosted.")
	workdir            = flag.String("workdir", ".", "Working directory used by data processors.")
)
| |
// re matches runs of one or more non-alphanumeric ASCII characters.
var re = regexp.MustCompile("[^A-Za-z0-9]+")

// BUILDSLAVE_OFFLINE_BLACKLIST is a fixed list of buildslave host names.
// NOTE(review): nothing in this file reads it; judging by the name these are
// presumably hosts excluded from offline reporting - confirm against callers.
var BUILDSLAVE_OFFLINE_BLACKLIST = []string{
	"build3-a3",
	"build4-a3",
	"vm255-m3",
}
| |
| func main() { |
| defer common.LogPanic() |
| common.InitWithMust( |
| "datahopper", |
| common.PrometheusOpt(promPort), |
| common.CloudLoggingOpt(), |
| ) |
| |
| // Absolutify the workdir. |
| w, err := filepath.Abs(*workdir) |
| if err != nil { |
| sklog.Fatal(w) |
| } |
| sklog.Infof("Workdir is %s", w) |
| |
| // Authenticated HTTP client. |
| oauthCacheFile := path.Join(w, "google_storage_token.data") |
| httpClient, err := auth.NewClient(*local, oauthCacheFile, swarming.AUTH_SCOPE) |
| if err != nil { |
| sklog.Fatal(err) |
| } |
| |
| // Swarming API client. |
| swarm, err := swarming.NewApiClient(httpClient, swarming.SWARMING_SERVER) |
| if err != nil { |
| sklog.Fatal(err) |
| } |
| |
| // Shared repo objects. |
| reposDir := path.Join(w, "repos") |
| if err := os.MkdirAll(reposDir, os.ModePerm); err != nil { |
| sklog.Fatal(err) |
| } |
| repos, err := repograph.NewMap([]string{common.REPO_SKIA, common.REPO_SKIA_INFRA}, reposDir) |
| if err != nil { |
| sklog.Fatal(err) |
| } |
| if err := repos.Update(); err != nil { |
| sklog.Fatal(err) |
| } |
| |
| // Data generation goroutines. |
| db, err := buildbot.NewLocalDB(path.Join(w, "buildbot.db")) |
| if err != nil { |
| sklog.Fatal(err) |
| } |
| |
| // Run a server for the buildbot data. |
| if _, err := buildbot.RunBuildServer(*grpcPort, db); err != nil { |
| sklog.Fatal(err) |
| } |
| |
| // Swarming bots. |
| go func() { |
| oldMetrics := []metrics2.Int64Metric{} |
| for _ = range time.Tick(2 * time.Minute) { |
| sklog.Info("Loading Skia Swarming bot data.") |
| skiaBots, err := swarm.ListSkiaBots() |
| if err != nil { |
| sklog.Error(err) |
| continue |
| } |
| sklog.Info("Loading CT Swarming bot data.") |
| ctBots, err := swarm.ListCTBots() |
| if err != nil { |
| sklog.Error(err) |
| continue |
| } |
| bots := append(skiaBots, ctBots...) |
| |
| // Delete old metrics, replace with new ones. This fixes the case where |
| // bots are removed but their metrics hang around, or where dimensions |
| // change resulting in duplicate metrics with the same bot ID. |
| failedDelete := []metrics2.Int64Metric{} |
| for _, m := range oldMetrics { |
| if err := m.Delete(); err != nil { |
| sklog.Warningf("Failed to delete metric: %s", err) |
| failedDelete = append(failedDelete, m) |
| } |
| } |
| oldMetrics = append([]metrics2.Int64Metric{}, failedDelete...) |
| |
| now := time.Now() |
| for _, bot := range bots { |
| last, err := time.Parse("2006-01-02T15:04:05", bot.LastSeenTs) |
| if err != nil { |
| sklog.Error(err) |
| continue |
| } |
| |
| tags := map[string]string{ |
| "bot": bot.BotId, |
| "pool": "Skia", |
| } |
| |
| // Bot last seen <duration> ago. |
| m1 := metrics2.GetInt64Metric(MEASUREMENT_SWARM_BOTS_LAST_SEEN, tags) |
| m1.Update(int64(now.Sub(last))) |
| oldMetrics = append(oldMetrics, m1) |
| |
| // Bot quarantined status. |
| quarantined := int64(0) |
| if bot.Quarantined { |
| quarantined = int64(1) |
| } |
| m2 := metrics2.GetInt64Metric(MEASUREMENT_SWARM_BOTS_QUARANTINED, tags) |
| m2.Update(quarantined) |
| oldMetrics = append(oldMetrics, m2) |
| } |
| } |
| }() |
| |
| // Swarming tasks. |
| go func() { |
| taskAccum := accum.New(accum.DefaultReporter) |
| // Initial query: load data from the past 2 minutes. |
| lastLoad := time.Now().Add(-2 * time.Minute) |
| |
| revisitTasks := map[string]bool{} |
| |
| for _ = range time.Tick(2 * time.Minute) { |
| now := time.Now() |
| tasks, err := swarm.ListSkiaTasks(lastLoad, now) |
| if err != nil { |
| sklog.Error(err) |
| continue |
| } |
| |
| // Count the number of tasks in each state. |
| counts := map[string]int64{} |
| for _, t := range tasks { |
| counts[t.TaskResult.State] += 1 |
| } |
| |
| // Report the number of running tasks as a metric. |
| for state, count := range counts { |
| skiaGauge := metrics2.GetInt64Metric(MEASUREMENT_SWARM_TASKS_STATE, map[string]string{"pool": "Skia", "state": state}) |
| skiaGauge.Update(count) |
| } |
| |
| for id, _ := range revisitTasks { |
| task, err := swarm.GetTaskMetadata(id) |
| if err != nil { |
| sklog.Error(err) |
| continue |
| } |
| tasks = append(tasks, task) |
| } |
| revisitTasks = map[string]bool{} |
| lastLoad = now |
| for _, task := range tasks { |
| if task.TaskResult.State == "COMPLETED" { |
| if task.TaskResult.DedupedFrom != "" { |
| continue |
| } |
| |
| // Get the created time for the task. We'll use that as the |
| // timestamp for all data points related to it. |
| createdTime, err := swarming.Created(task) |
| if err != nil { |
| sklog.Errorf("Failed to parse Swarming task created timestamp: %s", err) |
| continue |
| } |
| |
| // Find the tags for the task, including ID, name, dimensions, |
| // and components of the builder name. |
| var builderName string |
| var name string |
| user, err := swarming.GetTagValue(task.TaskResult, "user") |
| if err != nil || user == "" { |
| // This is an old-style task. |
| name, err = swarming.GetTagValue(task.TaskResult, "name") |
| if err != nil || name == "" { |
| sklog.Errorf("Failed to find name for Swarming task: %v", task) |
| continue |
| } |
| builderName, err = swarming.GetTagValue(task.TaskResult, "buildername") |
| if err != nil || builderName == "" { |
| sklog.Errorf("Failed to find buildername for Swarming task: %v", task) |
| continue |
| } |
| } else if user == "skia-task-scheduler" { |
| // This is a new-style task. |
| builderName, err = swarming.GetTagValue(task.TaskResult, "sk_name") |
| if err != nil || builderName == "" { |
| sklog.Errorf("Failed to find sk_name for Swarming task: %v", task) |
| continue |
| } |
| name = builderName |
| } |
| |
| // Leave 'task-id' in 'tags' so that it gets reported to logs, |
| // knowing that Accum will remove it from the metrics. |
| tags := map[string]string{ |
| "bot-id": task.TaskResult.BotId, |
| "task-id": task.TaskId, |
| "task-name": name, |
| "pool": "Skia", |
| } |
| |
| // Task duration in milliseconds. |
| taskAccum.Add(MEASUREMENT_SWARM_TASKS_DURATION, tags, int64(task.TaskResult.Duration*float64(1000.0))) |
| |
| if task.TaskResult.PerformanceStats != nil { |
| // Overhead stats, in milliseconds. |
| taskAccum.Add(MEASUREMENT_SWARM_TASKS_OVERHEAD_BOT, tags, int64(task.TaskResult.PerformanceStats.BotOverhead*float64(1000.0))) |
| if task.TaskResult.PerformanceStats.IsolatedDownload != nil { |
| taskAccum.Add(MEASUREMENT_SWARM_TASKS_OVERHEAD_DOWNLOAD, tags, int64(task.TaskResult.PerformanceStats.IsolatedDownload.Duration*float64(1000.0))) |
| } else { |
| sklog.Errorf("Swarming task is missing its IsolatedDownload section: %v", task.TaskResult) |
| } |
| if task.TaskResult.PerformanceStats.IsolatedUpload != nil { |
| taskAccum.Add(MEASUREMENT_SWARM_TASKS_OVERHEAD_UPLOAD, tags, int64(task.TaskResult.PerformanceStats.IsolatedUpload.Duration*float64(1000.0))) |
| } else { |
| sklog.Errorf("Swarming task is missing its IsolatedUpload section: %v", task.TaskResult) |
| } |
| } |
| |
| // Pending time in milliseconds. |
| startTime, err := swarming.Started(task) |
| if err != nil { |
| sklog.Errorf("Failed to parse Swarming task started timestamp: %s", err) |
| continue |
| } |
| pendingMs := int64(startTime.Sub(createdTime).Seconds() * float64(1000.0)) |
| taskAccum.Add(MEASUREMENT_SWARM_TASKS_PENDING_TIME, tags, pendingMs) |
| } else { |
| revisitTasks[task.TaskId] = true |
| } |
| } |
| taskAccum.Report() |
| } |
| }() |
| |
| // Number of commits in the repo. |
| go func() { |
| skiaGauge := metrics2.GetInt64Metric("repo.commits", map[string]string{"repo": "skia"}) |
| infraGauge := metrics2.GetInt64Metric("repo.commits", map[string]string{"repo": "infra"}) |
| for _ = range time.Tick(5 * time.Minute) { |
| nSkia, err := repos[common.REPO_SKIA].Repo().NumCommits() |
| if err != nil { |
| sklog.Errorf("Failed to get number of commits for Skia: %s", err) |
| } else { |
| skiaGauge.Update(nSkia) |
| } |
| nInfra, err := repos[common.REPO_SKIA_INFRA].Repo().NumCommits() |
| if err != nil { |
| sklog.Errorf("Failed to get number of commits for Infra: %s", err) |
| } else { |
| infraGauge.Update(nInfra) |
| } |
| } |
| }() |
| |
| // Time since last successful backup. |
| go func() { |
| lv := metrics2.NewLiveness("last-buildbot-db-backup", nil) |
| authClient, err := auth.NewDefaultJWTServiceAccountClient(auth.SCOPE_READ_ONLY) |
| if err != nil { |
| sklog.Fatal(err) |
| } |
| gsClient, err := storage.NewClient(context.Background(), option.WithHTTPClient(authClient)) |
| if err != nil { |
| sklog.Fatal(err) |
| } |
| setLastBackupTime := func() error { |
| last := time.Time{} |
| if err := gcs.AllFilesInDir(gsClient, "skia-buildbots", "db_backup", func(item *storage.ObjectAttrs) { |
| if item.Updated.After(last) { |
| last = item.Updated |
| } |
| }); err != nil { |
| return err |
| } |
| lv.ManualReset(last) |
| sklog.Infof("Last DB backup was %s.", last) |
| return nil |
| } |
| if err := setLastBackupTime(); err != nil { |
| sklog.Fatal(err) |
| } |
| for _ = range time.Tick(10 * time.Minute) { |
| if err := setLastBackupTime(); err != nil { |
| sklog.Errorf("Failed to get last DB backup time: %s", err) |
| } |
| } |
| }() |
| |
| // Jobs metrics. |
| if err := StartJobMetrics(*taskSchedulerDbUrl, context.Background()); err != nil { |
| sklog.Fatal(err) |
| } |
| |
| // Generate "time to X% bot coverage" metrics. |
| if err := bot_metrics.Start(*taskSchedulerDbUrl, *workdir, db, context.Background()); err != nil { |
| sklog.Fatal(err) |
| } |
| |
| // Run a backup server. |
| go func() { |
| sklog.Fatal(buildbot.RunBackupServer(db, *httpPort)) |
| }() |
| |
| // Wait while the above goroutines generate data. |
| select {} |
| } |