blob: 82e375f45d339d9292bfefbe89dbb121ccf90d97 [file] [log] [blame]
package supported_branches
import (
"context"
"net/http"
"strings"
"time"
apipb "go.chromium.org/luci/swarming/proto/api_v2"
"go.skia.org/infra/go/cq"
"go.skia.org/infra/go/gitiles"
"go.skia.org/infra/go/metrics2"
"go.skia.org/infra/go/skerr"
"go.skia.org/infra/go/sklog"
"go.skia.org/infra/go/supported_branches"
"go.skia.org/infra/go/swarming"
swarmingv2 "go.skia.org/infra/go/swarming/v2"
"go.skia.org/infra/go/util"
"go.skia.org/infra/task_scheduler/go/specs"
)
const (
	// metricBranchExists is reported per (repo, branch). It is 1 when the
	// branch's commit queue config could be read and 0 otherwise.
	metricBranchExists = "cq_cfg_branch_exists"

	// metricTryJobExists is reported per (repo, branch, tryjob). It is 1 when
	// the CQ tryjob is defined in that branch's tasks.json and 0 otherwise.
	metricTryJobExists = "cq_cfg_tryjob_exists"

	// metricBotExists is reported per (repo, branch, tryjob). It is 1 when
	// bots exist which can run the CQ tryjob on that branch and 0 otherwise.
	metricBotExists = "cq_cfg_bot_exists_for_tryjob"
)
// botCanRunTask reports whether a bot whose dimensions are botDims satisfies
// every dimension requested in taskDims. For each key in taskDims, every value
// requested for that key must appear among the bot's values for the same key.
// A task that requests no dimensions can therefore run on any bot.
func botCanRunTask(botDims, taskDims map[string][]string) bool {
	for key, wanted := range taskDims {
		have := botDims[key]
		for _, want := range wanted {
			found := false
			for _, h := range have {
				if h == want {
					found = true
					break
				}
			}
			if !found {
				return false
			}
		}
	}
	return true
}
// metricsForRepo collects supported branch metrics for a single repo.
//
// For every branch listed in the repo's supported branches config it records:
//   - metricBranchExists, set to 1 once the branch's CQ config has been read;
//   - metricTryJobExists for each CQ tryjob, 1 iff the job is defined in the
//     branch's tasks cfg;
//   - metricBotExists for each CQ tryjob defined in the tasks cfg, 1 iff every
//     task the job transitively depends on can run on at least one bot whose
//     dimensions appear in botDimsList.
//
// Every metric it updates is added to newMetrics so that the caller can tell
// which previously reported metrics are stale. A repo with no supported
// branches file is skipped and is not treated as an error. Any other failure
// aborts processing of the repo and is returned.
func metricsForRepo(repo *gitiles.Repo, newMetrics map[metrics2.Int64Metric]struct{}, botDimsList []map[string][]string) error {
	sbc, err := supported_branches.ReadConfigFromRepo(repo)
	if err != nil {
		// NOTE(review): a missing file is detected by matching error text,
		// which depends on the error format of the gitiles client.
		if strings.Contains(err.Error(), "Not Found") || strings.Contains(err.Error(), "404") {
			sklog.Infof("Skipping repo %s; no supported branches file found.", repo.URL())
			return nil
		}
		return skerr.Wrapf(err, "failed to get supported branches for %s", repo.URL())
	}
	for _, branch := range sbc.Branches {
		// Find the CQ trybots for this branch.
		cqJobsToCfg, err := cq.GetCQJobsToConfig(repo, branch.Ref)
		if err != nil {
			return skerr.Wrapf(err, "Failed to get CQ config for %s and %s", repo.URL(), branch.Ref)
		}
		branchExistsMetric := metrics2.GetInt64Metric(metricBranchExists, map[string]string{
			"repo":   repo.URL(),
			"branch": branch.Ref,
		})
		branchExistsMetric.Update(1)
		newMetrics[branchExistsMetric] = struct{}{}

		// Obtain the tasks cfg for this branch.
		tasksContents, err := repo.ReadFileAtRef(context.Background(), specs.TASKS_CFG_FILE, branch.Ref)
		if err != nil {
			return skerr.Wrapf(err, "failed to read %s on %s of %s", specs.TASKS_CFG_FILE, branch.Ref, repo.URL())
		}
		tasksCfg, err := specs.ParseTasksCfg(string(tasksContents))
		if err != nil {
			return skerr.Wrapf(err, "failed to parse %s on %s of %s", specs.TASKS_CFG_FILE, branch.Ref, repo.URL())
		}

		// Determine whether each tryjob exists in the tasks cfg.
		for job := range cqJobsToCfg {
			jobSpec, ok := tasksCfg.Jobs[job]
			jobExists := int64(0)
			if ok {
				jobExists = 1
			}
			jobExistsMetric := metrics2.GetInt64Metric(metricTryJobExists, map[string]string{
				"repo":   repo.URL(),
				"branch": branch.Ref,
				"tryjob": job,
			})
			jobExistsMetric.Update(jobExists)
			newMetrics[jobExistsMetric] = struct{}{}
			if !ok {
				// No job spec, so there are no tasks to check bots for.
				continue
			}

			// Determine whether bots exist for this tryjob: every task the
			// job needs must be runnable by at least one bot.
			tasks, err := resolveTaskSpecs(tasksCfg.Tasks, jobSpec.TaskSpecs)
			if err != nil {
				return skerr.Wrapf(err, "failed to resolve tasks for %s on %s of %s", job, branch.Ref, repo.URL())
			}
			botExists := int64(1)
			for taskName, taskSpec := range tasks {
				taskDims, err := swarming.ParseDimensions(taskSpec.Dimensions)
				if err != nil {
					return skerr.Wrapf(err, "failed to parse dimensions for %s on %s; dims: %+v", taskName, branch.Ref, taskSpec.Dimensions)
				}
				if !anyBotCanRunTask(botDimsList, taskDims) {
					botExists = 0
					sklog.Warningf("No bot can run %s on %s in %s", taskName, branch.Ref, repo.URL())
					break
				}
			}
			botExistsMetric := metrics2.GetInt64Metric(metricBotExists, map[string]string{
				"repo":   repo.URL(),
				"branch": branch.Ref,
				"tryjob": job,
			})
			botExistsMetric.Update(botExists)
			newMetrics[botExistsMetric] = struct{}{}
		}
	}
	return nil
}

// resolveTaskSpecs returns every task spec in all that is reachable from roots
// by following Dependencies transitively, keyed by task name. Each task is
// visited at most once, so shared dependencies are not re-walked and
// dependency cycles terminate. It returns an error if a referenced task is
// absent from all (or maps to a nil spec) instead of dereferencing nil.
func resolveTaskSpecs(all map[string]*specs.TaskSpec, roots []string) (map[string]*specs.TaskSpec, error) {
	resolved := make(map[string]*specs.TaskSpec, len(roots))
	pending := make([]string, 0, len(roots))
	pending = append(pending, roots...)
	for len(pending) > 0 {
		name := pending[len(pending)-1]
		pending = pending[:len(pending)-1]
		if _, seen := resolved[name]; seen {
			continue
		}
		spec, ok := all[name]
		if !ok || spec == nil {
			return nil, skerr.Fmt("task %q not found in %s", name, specs.TASKS_CFG_FILE)
		}
		resolved[name] = spec
		for _, dep := range spec.Dependencies {
			pending = append(pending, dep)
		}
	}
	return resolved, nil
}

// anyBotCanRunTask reports whether at least one bot in botDimsList satisfies
// taskDims according to botCanRunTask.
func anyBotCanRunTask(botDimsList []map[string][]string, taskDims map[string][]string) bool {
	for _, botDims := range botDimsList {
		if botCanRunTask(botDims, taskDims) {
			return true
		}
	}
	return false
}
// cycle performs one iteration of supported branch metrics collection.
//
// It lists the Swarming bots in each of pools, excluding dead and quarantined
// bots, and computes the metrics for every repo in repos against those bots'
// dimensions. It then deletes any metric in oldMetrics that this iteration did
// not report. The returned set holds the metrics reported by this iteration;
// the caller should pass it back as oldMetrics on the next call. Metrics that
// fail to be deleted are carried over into the returned set so that deletion
// is retried next time. On error, no metrics are deleted and a nil set is
// returned.
func cycle(ctx context.Context, repos []*gitiles.Repo, oldMetrics map[metrics2.Int64Metric]struct{}, swarm swarmingv2.SwarmingV2Client, pools []string) (map[metrics2.Int64Metric]struct{}, error) {
	// Get all of the Swarming bots.
	bots := []*apipb.BotInfo{}
	for _, pool := range pools {
		// We don't want to count dead or quarantined bots, but it
		// doesn't matter if they are busy or free at the moment.
		b, err := swarmingv2.ListBotsHelper(ctx, swarm, &apipb.BotsRequest{
			Dimensions: []*apipb.StringPair{
				{Key: swarming.DIMENSION_POOL_KEY, Value: pool},
			},
			IsDead:      apipb.NullableBool_FALSE,
			Quarantined: apipb.NullableBool_FALSE,
		})
		if err != nil {
			return nil, skerr.Wrapf(err, "failed to list bots in pool %s", pool)
		}
		bots = append(bots, b...)
	}

	// Collect all dimensions for all bots.
	// TODO(borenet): Can we exclude duplicates?
	botDimsList := make([]map[string][]string, 0, len(bots))
	for _, bot := range bots {
		botDimsList = append(botDimsList, swarmingv2.BotDimensionsToStringMap(bot.Dimensions))
	}

	// Calculate metrics for each repo.
	newMetrics := map[metrics2.Int64Metric]struct{}{}
	for _, repo := range repos {
		// metricsForRepo does not take a context, so check for cancellation
		// between repos rather than finishing a full pass after shutdown.
		if err := ctx.Err(); err != nil {
			return nil, skerr.Wrap(err)
		}
		if err := metricsForRepo(repo, newMetrics, botDimsList); err != nil {
			return nil, err
		}
	}

	// Delete unused old metrics.
	for m := range oldMetrics {
		if _, ok := newMetrics[m]; ok {
			continue
		}
		if err := m.Delete(); err != nil {
			sklog.Errorf("Failed to delete metric: %s", err)
			// Add the metric to newMetrics so that we'll have the
			// chance to delete it again on the next cycle.
			newMetrics[m] = struct{}{}
		}
	}
	return newMetrics, nil
}
// Start launches a background goroutine that recomputes the supported branch
// metrics for each repo in repoUrls every five minutes via util.RepeatCtx
// (presumably until ctx is canceled; see util.RepeatCtx). Gitiles requests use
// client, and bots are looked up in the given Swarming pools through swarm.
// The "last_successful_supported_branches_update" liveness is reset after each
// successful iteration; failures are logged and the previous metric set is kept.
func Start(ctx context.Context, repoUrls []string, client *http.Client, swarm swarmingv2.SwarmingV2Client, pools []string) {
	repos := make([]*gitiles.Repo, len(repoUrls))
	for i, u := range repoUrls {
		repos[i] = gitiles.NewRepo(u, client)
	}
	liveness := metrics2.NewLiveness("last_successful_supported_branches_update")

	// Metrics reported by the most recent successful cycle. After Start
	// returns, only the goroutine below reads or replaces this.
	reported := map[metrics2.Int64Metric]struct{}{}
	go util.RepeatCtx(ctx, 5*time.Minute, func(ctx context.Context) {
		latest, err := cycle(ctx, repos, reported, swarm, pools)
		if err != nil {
			sklog.Errorf("Failed to update supported branches metrics: %s", err)
			return
		}
		liveness.Reset()
		reported = latest
	})
}