blob: 72de000383e61b071850980b0411d519fb4bf7cf [file] [log] [blame]
// The CQ Watcher monitors the Skia CQ for Gerrit CLs. It pumps the results of
// the monitoring into InfluxDB.
package main
import (
"context"
"flag"
"fmt"
"time"
"go.skia.org/infra/go/auth"
"go.skia.org/infra/go/common"
"go.skia.org/infra/go/cq"
"go.skia.org/infra/go/gerrit"
"go.skia.org/infra/go/httputils"
"go.skia.org/infra/go/metrics2"
"go.skia.org/infra/go/sklog"
"go.skia.org/infra/go/util"
"golang.org/x/oauth2/google"
)
var (
testing = flag.Bool("testing", false, "Set to true for local testing.")
promPort = flag.String("prom_port", ":20000", "Metrics service address (e.g., ':20000')")
)
const (
// How often the two pollers should poll Gerrit.
IN_FLIGHT_POLL_TIME = 30 * time.Second
AFTER_COMMIT_POLL_TIME = 5 * time.Minute
// How often to refresh the trybots received from cq.cfg.
REFRESH_CQ_TRYBOTS_TIME = time.Hour
MAX_CLS_PER_POLL = 100
METRIC_NAME = "cq_watcher"
)
// monitorStatsForInFlightCLs queries Gerrit for all CLs currently in dry run
// or waiting for commit. The following metrics are then reported for the
// matching CLs:
// * Number of CLs waiting for the CQ (both for dry runs and commits).
// * How long CQ trybots have been running for.
// * How many CQ trybots have been triggered.
func monitorStatsForInFlightCLs(ctx context.Context, cqClient *cq.Client, gerritClient *gerrit.Gerrit) {
liveness := metrics2.NewLiveness(fmt.Sprintf("%s_%s", METRIC_NAME, cq.INFLIGHT_METRIC_NAME))
cqMetric := metrics2.GetInt64Metric(fmt.Sprintf("%s_%s_%s", METRIC_NAME, cq.INFLIGHT_METRIC_NAME, cq.INFLIGHT_WAITING_IN_CQ))
oldMetrics := map[metrics2.Int64Metric]struct{}{}
util.RepeatCtx(ctx, time.Duration(IN_FLIGHT_POLL_TIME), func(ctx context.Context) {
dryRunChanges, err := gerritClient.Search(ctx, MAX_CLS_PER_POLL, true, gerrit.SearchStatus("open"), gerrit.SearchProject("skia"), gerrit.SearchLabel(gerrit.LabelCommitQueue, "1"))
if err != nil {
sklog.Errorf("Error searching for open changes with dry run in Gerrit: %s", err)
return
}
cqChanges, err := gerritClient.Search(ctx, MAX_CLS_PER_POLL, true, gerrit.SearchStatus("open"), gerrit.SearchProject("skia"), gerrit.SearchLabel(gerrit.LabelCommitQueue, "2"))
if err != nil {
sklog.Errorf("Error searching for open changes waiting for CQ in Gerrit: %s", err)
return
}
// Combine dryrun and CQ changes into one slice.
changes := dryRunChanges
changes = append(changes, cqChanges...)
cqMetric.Update(int64(len(changes)))
newMetrics := map[metrics2.Int64Metric]struct{}{}
for _, change := range changes {
if err := cqClient.ReportCQStats(ctx, change.Issue, newMetrics); err != nil {
sklog.Errorf("Could not get CQ stats for %d: %s", change.Issue, err)
continue
}
}
// If no CLs are currently in the CQ then report dummy data to prevent
// Absent alerts in the alert manager.
if len(changes) == 0 {
inflightTrybotDurationMetric := metrics2.GetInt64Metric(fmt.Sprintf("%s_%s_%s", METRIC_NAME, cq.INFLIGHT_METRIC_NAME, cq.INFLIGHT_TRYBOT_DURATION), map[string]string{"trybot": "DummyTrybot", "gerritURL": "DummyGerritURL"})
inflightTrybotDurationMetric.Update(0)
trybotNumDurationMetric := metrics2.GetInt64Metric(fmt.Sprintf("%s_%s_%s", METRIC_NAME, cq.INFLIGHT_METRIC_NAME, cq.INFLIGHT_TRYBOT_NUM), map[string]string{"gerritURL": "DummyGerritURL"})
trybotNumDurationMetric.Update(0)
}
liveness.Reset()
// Delete unused old metrics and use new metrics as old ones for next iteration.
deleteUnusedOldMetrics(newMetrics, oldMetrics)
oldMetrics = newMetrics
})
}
// monitorStatsForLandedCLs queries Gerrit for all CLs that have been merged in
// the last AFTER_COMMIT_POLL_TIME seconds. The following metrics are then
// reported for the matching CLs:
// * The total time the CL spent waiting for CQ trybots to complete.
// * The time each CQ trybot took to complete.
func monitorStatsForLandedCLs(ctx context.Context, cqClient *cq.Client, gerritClient *gerrit.Gerrit) {
liveness := metrics2.NewLiveness(fmt.Sprintf("%s_%s", METRIC_NAME, cq.LANDED_METRIC_NAME))
previousPollChanges := []*gerrit.ChangeInfo{}
oldMetrics := map[metrics2.Int64Metric]struct{}{}
util.RepeatCtx(ctx, time.Duration(AFTER_COMMIT_POLL_TIME), func(ctx context.Context) {
// Add a short (2 min) buffer to overlap with the last poll to make sure
// we do not lose any edge cases.
t_delta := time.Now().Add(-AFTER_COMMIT_POLL_TIME).Add(-2 * time.Minute)
changes, err := gerritClient.Search(ctx, MAX_CLS_PER_POLL, true, gerrit.SearchModifiedAfter(t_delta), gerrit.SearchStatus("merged"), gerrit.SearchProject("skia"))
if err != nil {
sklog.Errorf("Error searching for merged changes in Gerrit: %s", err)
return
}
newMetrics := map[metrics2.Int64Metric]struct{}{}
for _, change := range changes {
if gerrit.ContainsAny(change.Issue, previousPollChanges) {
continue
}
if err := cqClient.ReportCQStats(ctx, change.Issue, newMetrics); err != nil {
sklog.Errorf("Could not get CQ stats for %d: %s", change.Issue, err)
continue
}
}
previousPollChanges = changes
liveness.Reset()
// Delete unused old metrics and use new metrics as old ones for next iteration.
deleteUnusedOldMetrics(newMetrics, oldMetrics)
oldMetrics = newMetrics
})
}
func deleteUnusedOldMetrics(newMetrics, oldMetrics map[metrics2.Int64Metric]struct{}) {
for m := range oldMetrics {
if _, ok := newMetrics[m]; !ok {
if err := m.Delete(); err != nil {
sklog.Errorf("Failed to delete metric: %s", err)
// Add the metric to newMetrics so that we'll
// have the chance to delete it again on the
// next cycle.
newMetrics[m] = struct{}{}
}
}
}
}
func refreshCQTryBots(cqClient *cq.Client) {
for range time.Tick(time.Duration(REFRESH_CQ_TRYBOTS_TIME)) {
if err := cqClient.RefreshCQTryBots(); err != nil {
sklog.Errorf("Error refreshing CQ trybots: %s", err)
}
}
}
func main() {
common.InitWithMust(
METRIC_NAME,
common.PrometheusOpt(promPort),
)
ctx := context.Background()
ts, err := google.DefaultTokenSource(ctx, auth.ScopeUserinfoEmail, auth.ScopeGerrit)
if err != nil {
sklog.Fatal(err)
}
httpClient := httputils.DefaultClientConfig().WithTokenSource(ts).With2xxOnly().Client()
gerritClient, err := gerrit.NewGerrit(gerrit.GerritSkiaURL, httpClient)
if err != nil {
sklog.Fatalf("Failed to create Gerrit client: %s", err)
}
cqClient, err := cq.NewClient(gerritClient, cq.GetSkiaCQTryBots, METRIC_NAME)
if err != nil {
sklog.Fatalf("Failed to create CQ client: %s", err)
}
// Periodically refresh slice of trybots to make sure any changes to cq.cfg
// are picked up without needing to restart the app.
go refreshCQTryBots(cqClient)
// Monitor in-flight CLs.
go monitorStatsForInFlightCLs(ctx, cqClient, gerritClient)
// Monitor landed CLs.
monitorStatsForLandedCLs(ctx, cqClient, gerritClient)
}