[task scheduler] Move overdue job metrics into Datahopper
This reduces load on the scheduler, and Datahopper already uses JobCache and
TaskCfgCache.
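Callers now construct the repograph.Map and TaskCfgCache once, keep them
updated, and pass them in. A minimal sketch of the new wiring in datahopper
(errors abbreviated; see the main.go changes below):

    repos, _ := repograph.NewMap(ctx, common.PUBLIC_REPOS, workdir)
    tcc, _ := specs.NewTaskCfgCache(ctx, repos, depotTools,
        path.Join(workdir, "taskCfgCache"), 1, btProject, btInstance, ts)
    // Keep the shared repos up to date for all consumers.
    go util.RepeatCtx(time.Minute, ctx, func() {
        if err := repos.Update(ctx); err != nil {
            sklog.Errorf("Failed to update repos: %s", err)
        }
    })
    if err := StartJobMetrics(ctx, d, repos, tcc); err != nil {
        sklog.Fatal(err)
    }
    if err := bot_metrics.Start(ctx, d, repos, tcc, workdir); err != nil {
        sklog.Fatal(err)
    }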
Bug: skia:
Change-Id: Icf15aada9d1c6919d18970cf36fa58ad3345f482
Reviewed-on: https://skia-review.googlesource.com/c/184060
Commit-Queue: Eric Boren <borenet@google.com>
Reviewed-by: Ben Wagner <benjaminwagner@google.com>
diff --git a/datahopper/go/bot_metrics/bot_metrics.go b/datahopper/go/bot_metrics/bot_metrics.go
index 691d8a6..bd02d8f 100644
--- a/datahopper/go/bot_metrics/bot_metrics.go
+++ b/datahopper/go/bot_metrics/bot_metrics.go
@@ -19,8 +19,6 @@
"strings"
"time"
- "go.skia.org/infra/go/common"
- "go.skia.org/infra/go/depot_tools"
"go.skia.org/infra/go/git/repograph"
"go.skia.org/infra/go/metrics2"
"go.skia.org/infra/go/metrics2/events"
@@ -29,7 +27,6 @@
"go.skia.org/infra/task_scheduler/go/db"
"go.skia.org/infra/task_scheduler/go/specs"
"go.skia.org/infra/task_scheduler/go/types"
- "golang.org/x/oauth2"
)
const (
@@ -195,10 +192,6 @@
// before any other task, and inserts event data based on the lag time between
// a commit landing and each task finishing for that commit.
func cycle(ctx context.Context, taskDb db.TaskReader, repos repograph.Map, tcc *specs.TaskCfgCache, edb events.EventDB, em *events.EventMetrics, lastFinished, now time.Time, workdir string) error {
- if err := repos.Update(ctx); err != nil {
- return err
- }
-
totalCommits := 0
for _, r := range repos {
totalCommits += r.Len()
@@ -371,7 +364,7 @@
if err := writeTs(workdir, now); err != nil {
return fmt.Errorf("Failed to write timestamp file: %s", err)
}
- return tcc.Cleanup(-COMMIT_TASK_WINDOW)
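+ // TaskCfgCache cleanup is now handled by the caller (see datahopper's main.go).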
+ return nil
}
// readTs returns the last successful run timestamp which was cached in a file.
@@ -400,28 +393,12 @@
}
// Start initiates "average time to X% bot coverage" metrics data generation.
-func Start(ctx context.Context, taskDb db.TaskReader, workdir, recipesCfgFile, btProject, btInstance string, ts oauth2.TokenSource) error {
+// The caller is responsible for updating the passed-in repos and TaskCfgCache.
+func Start(ctx context.Context, taskDb db.TaskReader, repos repograph.Map, tcc *specs.TaskCfgCache, workdir string) error {
// Setup.
if err := os.MkdirAll(workdir, os.ModePerm); err != nil {
return err
}
- repos, err := repograph.NewMap(ctx, common.PUBLIC_REPOS, workdir)
- if err != nil {
- return fmt.Errorf("Failed to sync repograph: %s", err)
- }
- if err := repos.Update(ctx); err != nil {
- return err
- }
-
- depotTools, err := depot_tools.Sync(ctx, workdir, recipesCfgFile)
- if err != nil {
- return fmt.Errorf("Failed to sync depot_tools: %s", err)
- }
-
- tcc, err := specs.NewTaskCfgCache(ctx, repos, depotTools, path.Join(workdir, "taskCfgCache"), 1, btProject, btInstance, ts)
- if err != nil {
- return fmt.Errorf("Failed to create TaskCfgCache: %s", err)
- }
// Set up event metrics.
edb, err := events.NewEventDB(path.Join(workdir, "percent-metrics.bdb"))
diff --git a/datahopper/go/datahopper/jobs.go b/datahopper/go/datahopper/jobs.go
index 1c72e56..c06fd8e 100644
--- a/datahopper/go/datahopper/jobs.go
+++ b/datahopper/go/datahopper/jobs.go
@@ -13,22 +13,33 @@
"sync"
"time"
+ "go.skia.org/infra/go/git/repograph"
"go.skia.org/infra/go/metrics2"
"go.skia.org/infra/go/metrics2/events"
"go.skia.org/infra/go/sklog"
"go.skia.org/infra/go/util"
"go.skia.org/infra/task_scheduler/go/db"
+ "go.skia.org/infra/task_scheduler/go/db/cache"
+ "go.skia.org/infra/task_scheduler/go/specs"
"go.skia.org/infra/task_scheduler/go/types"
+ "go.skia.org/infra/task_scheduler/go/window"
)
var (
// Time periods over which to compute metrics. Assumed to be sorted in
// increasing order.
TIME_PERIODS = []time.Duration{24 * time.Hour, 7 * 24 * time.Hour}
+
+ // Measurement name for overdue job specs. Records the age of the oldest commit for which the
+ // job has not completed (nor for any later commit), for each job spec and for each repo.
+ MEASUREMENT_OVERDUE_JOB_SPECS = "overdue_job_specs_s"
)
const (
JOB_STREAM = "job_metrics"
+
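+ // The window used for overdue job metrics covers at least this time period
+ // and at least this many recent commits.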
+ OVERDUE_JOB_METRICS_PERIOD = 8 * 24 * time.Hour
+ OVERDUE_JOB_METRICS_NUM_COMMITS = 5
)
// jobTypeString is an enum of the types of Jobs that computeAvgJobDuration will aggregate separately.
@@ -265,7 +276,8 @@
}
// StartJobMetrics starts a goroutine which ingests metrics data based on Jobs.
-func StartJobMetrics(ctx context.Context, jobDb db.JobReader) error {
+// The caller is responsible for updating the passed-in repos and TaskCfgCache.
+func StartJobMetrics(ctx context.Context, jobDb db.JobReader, repos repograph.Map, tcc *specs.TaskCfgCache) error {
edb := &jobEventDB{
cached: []*events.Event{},
db: jobDb,
@@ -281,6 +293,12 @@
return err
}
+ om, err := newOverdueJobMetrics(jobDb, repos, tcc)
+ if err != nil {
+ return err
+ }
+ om.start(ctx)
+
lv := metrics2.NewLiveness("last_successful_job_metrics_update")
go util.RepeatCtx(5*time.Minute, ctx, func() {
if err := edb.update(); err != nil {
@@ -292,3 +310,170 @@
em.Start(ctx)
return nil
}
+
+// overdueJobSpecMetricKey is a map key for overdueJobMetrics.overdueMetrics. The tags added to the
+// metric are reflected here so that we delete/recreate the metric when the tags change.
+type overdueJobSpecMetricKey struct {
+ // Repo URL.
+ Repo string
+ // Job name.
+ Job string
+ // Job trigger.
+ Trigger string
+}
+
+type overdueJobMetrics struct {
+ // Metric for age of commit with no completed job, in ns.
+ overdueMetrics map[overdueJobSpecMetricKey]metrics2.Int64Metric
+
+ jCache cache.JobCache
+ repos repograph.Map
+ taskCfgCache *specs.TaskCfgCache
+ window *window.Window
+}
+
+// newOverdueJobMetrics returns an overdueJobMetrics instance. The caller is
+// responsible for updating the passed-in repos and TaskCfgCache.
+func newOverdueJobMetrics(jobDb db.JobReader, repos repograph.Map, tcc *specs.TaskCfgCache) (*overdueJobMetrics, error) {
+ w, err := window.New(OVERDUE_JOB_METRICS_PERIOD, OVERDUE_JOB_METRICS_NUM_COMMITS, repos)
+ if err != nil {
+ return nil, err
+ }
+ jCache, err := cache.NewJobCache(jobDb, w, cache.GitRepoGetRevisionTimestamp(repos))
+ if err != nil {
+ return nil, err
+ }
+ return &overdueJobMetrics{
+ overdueMetrics: map[overdueJobSpecMetricKey]metrics2.Int64Metric{},
+ jCache: jCache,
+ repos: repos,
+ taskCfgCache: tcc,
+ window: w,
+ }, nil
+}
+
+func (m *overdueJobMetrics) start(ctx context.Context) {
+ lvOverdueMetrics := metrics2.NewLiveness("last_successful_overdue_metrics_update")
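+ // Update every 5 seconds, matching the cadence of the scheduler loop that
+ // previously drove these metrics.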
+ go util.RepeatCtx(5*time.Second, ctx, func() {
+ if err := m.updateOverdueJobSpecMetrics(ctx, time.Now()); err != nil {
+ sklog.Errorf("Failed to update metrics for overdue job specs: %s", err)
+ } else {
+ lvOverdueMetrics.Reset()
+ }
+ })
+}
+
+// updateOverdueJobSpecMetrics updates metrics for MEASUREMENT_OVERDUE_JOB_SPECS.
+func (m *overdueJobMetrics) updateOverdueJobSpecMetrics(ctx context.Context, now time.Time) error {
+ defer metrics2.FuncTimer().Stop()
+
+ // Update the window and cache.
+ if err := m.window.Update(); err != nil {
+ return err
+ }
+ if err := m.jCache.Update(); err != nil {
+ return err
+ }
+
+ // Process each repo individually.
+ for repoUrl, repo := range m.repos {
+ // Include only the jobs at current master. We don't report on JobSpecs that have been removed.
+ head := repo.Get("master")
+ if head == nil {
+ return sklog.FmtErrorf("Can't resolve %q in %q.", "master", repoUrl)
+ }
+ headTaskCfg, err := m.taskCfgCache.ReadTasksCfg(ctx, types.RepoState{
+ Repo: repoUrl,
+ Revision: head.Hash,
+ })
+ if err != nil {
+ return sklog.FmtErrorf("Error reading TaskCfg for %q at %q: %s", repoUrl, head.Hash, err)
+ }
+ // Set of JobSpec names left to process.
+ todo := util.StringSet{}
+ // Maps JobSpec name to time of oldest untested commit; initialized to 'now' and updated each
+ // time we see an untested commit.
+ times := map[string]time.Time{}
+ for name := range headTaskCfg.Jobs {
+ todo[name] = true
+ times[name] = now
+ }
+
+ // Iterate backwards to find the most-recently tested commit. We're not going to worry about
+ // merges -- if a job was run on both branches, we'll use the first commit we come across.
+ if err := head.Recurse(func(c *repograph.Commit) (bool, error) {
+ // Stop if this commit is outside the scheduling window.
+ if in, err := m.window.TestCommitHash(repoUrl, c.Hash); err != nil {
+ return false, sklog.FmtErrorf("TestCommitHash: %s", err)
+ } else if !in {
+ return false, nil
+ }
+ rs := types.RepoState{
+ Repo: repoUrl,
+ Revision: c.Hash,
+ }
+ // Look in the cache for each remaining JobSpec at this commit.
+ for name := range todo {
+ jobs, err := m.jCache.GetJobsByRepoState(name, rs)
+ if err != nil {
+ return false, sklog.FmtErrorf("GetJobsByRepoState: %s", err)
+ }
+ for _, j := range jobs {
+ if j.Done() {
+ delete(todo, name)
+ break
+ }
+ }
+ }
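+ // All jobs have a completed run at this or a later commit; stop recursing.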
+ if len(todo) == 0 {
+ return false, nil
+ }
+ // Check that the remaining JobSpecs are still valid at this commit.
+ taskCfg, err := m.taskCfgCache.ReadTasksCfg(ctx, rs)
+ if err != nil {
+ return false, sklog.FmtErrorf("Error reading TaskCfg for %q at %q: %s", repoUrl, c.Hash, err)
+ }
+ for name := range todo {
+ if _, ok := taskCfg.Jobs[name]; !ok {
+ delete(todo, name)
+ } else {
+ // Set times, since this job still exists and we haven't seen a completed job.
+ times[name] = c.Timestamp
+ }
+ }
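+ // Continue to older commits only while some jobs still lack a completed run.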
+ return len(todo) > 0, nil
+ }); err != nil {
+ return err
+ }
+ // Delete metrics for jobs that have been removed or whose tags have changed.
+ for key, metric := range m.overdueMetrics {
+ if key.Repo != repoUrl {
+ continue
+ }
+ if jobCfg, ok := headTaskCfg.Jobs[key.Job]; !ok || jobCfg.Trigger != key.Trigger {
+ // Set to 0 before deleting so that alerts ignore it.
+ metric.Update(0)
+ delete(m.overdueMetrics, key)
+ }
+ }
+ // Update existing metrics, and add metrics for any new jobs (or other tag changes).
+ for name, ts := range times {
+ key := overdueJobSpecMetricKey{
+ Repo: repoUrl,
+ Job: name,
+ Trigger: headTaskCfg.Jobs[name].Trigger,
+ }
+ metric, ok := m.overdueMetrics[key]
+ if !ok {
+ metric = metrics2.GetInt64Metric(MEASUREMENT_OVERDUE_JOB_SPECS, map[string]string{
+ "repo": key.Repo,
+ "job_name": key.Job,
+ "job_trigger": key.Trigger,
+ })
+ m.overdueMetrics[key] = metric
+ }
+ metric.Update(int64(now.Sub(ts).Seconds()))
+ }
+ }
+ return nil
+}
diff --git a/datahopper/go/datahopper/jobs_test.go b/datahopper/go/datahopper/jobs_test.go
index 27bfb45..5aa928e 100644
--- a/datahopper/go/datahopper/jobs_test.go
+++ b/datahopper/go/datahopper/jobs_test.go
@@ -3,16 +3,24 @@
import (
"bytes"
"encoding/gob"
+ "io/ioutil"
+ "path"
"testing"
"time"
assert "github.com/stretchr/testify/require"
"go.skia.org/infra/go/deepequal"
+ depot_tools_testutils "go.skia.org/infra/go/depot_tools/testutils"
+ "go.skia.org/infra/go/git"
+ "go.skia.org/infra/go/git/repograph"
"go.skia.org/infra/go/metrics2/events"
+ metrics2_testutils "go.skia.org/infra/go/metrics2/testutils"
"go.skia.org/infra/go/testutils"
"go.skia.org/infra/go/util"
"go.skia.org/infra/task_scheduler/go/db"
"go.skia.org/infra/task_scheduler/go/db/memory"
+ "go.skia.org/infra/task_scheduler/go/specs"
+ specs_testutils "go.skia.org/infra/task_scheduler/go/specs/testutils"
"go.skia.org/infra/task_scheduler/go/types"
)
@@ -330,3 +338,135 @@
tester.Run(evs)
}
+
+func TestOverdueJobSpecMetrics(t *testing.T) {
+ testutils.LargeTest(t)
+
+ wd, err := ioutil.TempDir("", "")
+ assert.NoError(t, err)
+ defer testutils.RemoveAll(t, wd)
+
+ d := memory.NewInMemoryDB(nil)
+ ctx, gb, _, _ := specs_testutils.SetupTestRepo(t)
+ repos, err := repograph.NewMap(ctx, []string{gb.RepoUrl()}, wd)
+ assert.NoError(t, err)
+ assert.NoError(t, repos.Update(ctx))
+ repo := repos[gb.RepoUrl()]
+
+ depotTools := depot_tools_testutils.GetDepotTools(t, ctx)
+ btProject, btInstance, btCleanup := specs_testutils.SetupBigTable(t)
+ defer btCleanup()
+ tcc, err := specs.NewTaskCfgCache(ctx, repos, depotTools, path.Join(wd, "taskCfgCache"), 1, btProject, btInstance, nil)
+ assert.NoError(t, err)
+
+ c1, err := git.GitDir(gb.Dir()).RevParse(ctx, "HEAD^")
+ assert.NoError(t, err)
+ c1time := repo.Get(c1).Timestamp
+ c2, err := git.GitDir(gb.Dir()).RevParse(ctx, "HEAD")
+ assert.NoError(t, err)
+ // c2 is 5 seconds after c1
+ c2time := repo.Get(c2).Timestamp
+
+ // At 'now', c1 is 60 seconds old, c2 is 55 seconds old, and c3 (below) is 50 seconds old.
+ now := c1time.Add(time.Minute)
+ c1age := "60.0"
+ c2age := "55.0"
+ c3age := "50.0"
+
+ check := func(buildAge, testAge, perfAge string) {
+ tags := map[string]string{
+ "repo": gb.RepoUrl(),
+ "job_name": specs_testutils.BuildTask,
+ "job_trigger": "",
+ }
+ assert.Equal(t, buildAge, metrics2_testutils.GetRecordedMetric(t, MEASUREMENT_OVERDUE_JOB_SPECS, tags))
+
+ tags["job_name"] = specs_testutils.TestTask
+ assert.Equal(t, testAge, metrics2_testutils.GetRecordedMetric(t, MEASUREMENT_OVERDUE_JOB_SPECS, tags))
+
+ tags["job_name"] = specs_testutils.PerfTask
+ assert.Equal(t, perfAge, metrics2_testutils.GetRecordedMetric(t, MEASUREMENT_OVERDUE_JOB_SPECS, tags))
+ }
+
+ om, err := newOverdueJobMetrics(d, repos, tcc)
+ assert.NoError(t, err)
+
+ // No jobs have finished yet.
+ assert.NoError(t, om.updateOverdueJobSpecMetrics(ctx, now))
+ check(c1age, c1age, c2age)
+
+ // Insert jobs.
+ j1 := &types.Job{
+ Name: specs_testutils.BuildTask,
+ RepoState: types.RepoState{
+ Repo: gb.RepoUrl(),
+ Revision: c1,
+ },
+ Created: c1time,
+ }
+ j2 := &types.Job{
+ Name: specs_testutils.BuildTask,
+ RepoState: types.RepoState{
+ Repo: gb.RepoUrl(),
+ Revision: c2,
+ },
+ Created: c2time,
+ }
+ j3 := &types.Job{
+ Name: specs_testutils.TestTask,
+ RepoState: types.RepoState{
+ Repo: gb.RepoUrl(),
+ Revision: c1,
+ },
+ Created: c1time,
+ }
+ j4 := &types.Job{
+ Name: specs_testutils.TestTask,
+ RepoState: types.RepoState{
+ Repo: gb.RepoUrl(),
+ Revision: c2,
+ },
+ Created: c2time,
+ }
+ j5 := &types.Job{
+ Name: specs_testutils.PerfTask,
+ RepoState: types.RepoState{
+ Repo: gb.RepoUrl(),
+ Revision: c2,
+ },
+ Created: c2time,
+ }
+ assert.NoError(t, d.PutJobs([]*types.Job{j1, j2, j3, j4, j5}))
+ // Jobs have not completed, so same as above.
+ assert.NoError(t, om.updateOverdueJobSpecMetrics(ctx, now))
+ check(c1age, c1age, c2age)
+
+ // One job is complete.
+ j2.Status = types.JOB_STATUS_SUCCESS
+ j2.Finished = time.Now()
+ assert.NoError(t, d.PutJob(j2))
+ // Expect Build to be up-to-date.
+ assert.NoError(t, om.updateOverdueJobSpecMetrics(ctx, now))
+ check("0.0", c1age, c2age)
+
+ // Revert back to c1 (no Perf task) and check that Perf job disappears.
+ content, err := repo.Repo().GetFile(ctx, "infra/bots/tasks.json", c1)
+ assert.NoError(t, err)
+ gb.Add(ctx, "infra/bots/tasks.json", content)
+ c3 := gb.CommitMsgAt(ctx, "c3", c1time.Add(10*time.Second)) // 5 seconds after c2
+ assert.NoError(t, repos.Update(ctx))
+ c3time := repo.Get(c3).Timestamp
+
+ // Update to c3. Perf job should be reset to zero. Build job age is now at c3.
+ j6 := &types.Job{
+ Name: specs_testutils.BuildTask,
+ RepoState: types.RepoState{
+ Repo: gb.RepoUrl(),
+ Revision: c3,
+ },
+ Created: c3time,
+ }
+ assert.NoError(t, d.PutJob(j6))
+ assert.NoError(t, om.updateOverdueJobSpecMetrics(ctx, now))
+ check(c3age, c1age, "0.0")
+}
diff --git a/datahopper/go/datahopper/main.go b/datahopper/go/datahopper/main.go
index d743feb..0b4ba0b 100644
--- a/datahopper/go/datahopper/main.go
+++ b/datahopper/go/datahopper/main.go
@@ -20,6 +20,7 @@
"go.skia.org/infra/datahopper/go/swarming_metrics"
"go.skia.org/infra/go/auth"
"go.skia.org/infra/go/common"
+ "go.skia.org/infra/go/depot_tools"
"go.skia.org/infra/go/gcs"
"go.skia.org/infra/go/git/repograph"
"go.skia.org/infra/go/httputils"
@@ -27,11 +28,13 @@
"go.skia.org/infra/go/sklog"
"go.skia.org/infra/go/swarming"
"go.skia.org/infra/go/taskname"
+ "go.skia.org/infra/go/util"
"go.skia.org/infra/perf/go/perfclient"
"go.skia.org/infra/task_scheduler/go/db"
"go.skia.org/infra/task_scheduler/go/db/firestore"
"go.skia.org/infra/task_scheduler/go/db/pubsub"
"go.skia.org/infra/task_scheduler/go/db/remote_db"
+ "go.skia.org/infra/task_scheduler/go/specs"
"google.golang.org/api/option"
)
@@ -124,6 +127,36 @@
if err := repos.Update(ctx); err != nil {
sklog.Fatal(err)
}
+ lvRepos := metrics2.NewLiveness("datahopper_repo_update")
+ go util.RepeatCtx(time.Minute, ctx, func() {
+ if err := repos.Update(ctx); err != nil {
+ sklog.Errorf("Failed to update repos: %s", err)
+ } else {
+ lvRepos.Reset()
+ }
+ })
+
+ // TaskCfgCache.
+ if *recipesCfgFile == "" {
+ *recipesCfgFile = path.Join(*workdir, "recipes.cfg")
+ }
+ depotTools, err := depot_tools.Sync(ctx, w, *recipesCfgFile)
+ if err != nil {
+ sklog.Fatalf("Failed to sync depot_tools: %s", err)
+ }
+ newTs, err := auth.NewDefaultTokenSource(*local, auth.SCOPE_USERINFO_EMAIL, pubsub.AUTH_SCOPE, bigtable.Scope)
+ if err != nil {
+ sklog.Fatal(err)
+ }
+ tcc, err := specs.NewTaskCfgCache(ctx, repos, depotTools, path.Join(w, "taskCfgCache"), 1, *btProject, *btInstance, newTs)
+ if err != nil {
+ sklog.Fatalf("Failed to create TaskCfgCache: %s", err)
+ }
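+ // Periodically clean up old TaskCfgCache entries; this replaces the cleanup
+ // formerly done in bot_metrics.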
+ go util.RepeatCtx(30*time.Minute, ctx, func() {
+ if err := tcc.Cleanup(OVERDUE_JOB_METRICS_PERIOD); err != nil {
+ sklog.Errorf("Failed to cleanup TaskCfgCache: %s", err)
+ }
+ })
// Data generation goroutines.
@@ -164,10 +197,6 @@
}()
// Tasks metrics.
- newTs, err := auth.NewDefaultTokenSource(*local, auth.SCOPE_USERINFO_EMAIL, pubsub.AUTH_SCOPE, bigtable.Scope)
- if err != nil {
- sklog.Fatal(err)
- }
var d db.RemoteDB
if *firestoreInstance != "" {
label := "datahopper"
@@ -190,15 +219,12 @@
}
// Jobs metrics.
- if err := StartJobMetrics(ctx, d); err != nil {
+ if err := StartJobMetrics(ctx, d, repos, tcc); err != nil {
sklog.Fatal(err)
}
// Generate "time to X% bot coverage" metrics.
- if *recipesCfgFile == "" {
- *recipesCfgFile = path.Join(*workdir, "recipes.cfg")
- }
- if err := bot_metrics.Start(ctx, d, *workdir, *recipesCfgFile, *btProject, *btInstance, newTs); err != nil {
+ if err := bot_metrics.Start(ctx, d, repos, tcc, *workdir); err != nil {
sklog.Fatal(err)
}
diff --git a/task_scheduler/go/db/cache/cache.go b/task_scheduler/go/db/cache/cache.go
index 5f79ce2..5cc4661 100644
--- a/task_scheduler/go/db/cache/cache.go
+++ b/task_scheduler/go/db/cache/cache.go
@@ -464,7 +464,7 @@
}
type jobCache struct {
- db db.JobDB
+ db db.JobReader
getRevisionTimestamp db.GetRevisionTimestamp
mtx sync.RWMutex
queryId string
@@ -688,7 +688,7 @@
// NewJobCache returns a local cache which provides more convenient views of
// job data than the database can provide.
-func NewJobCache(d db.JobDB, timeWindow *window.Window, getRevisionTimestamp db.GetRevisionTimestamp) (JobCache, error) {
+func NewJobCache(d db.JobReader, timeWindow *window.Window, getRevisionTimestamp db.GetRevisionTimestamp) (JobCache, error) {
tc := &jobCache{
db: d,
getRevisionTimestamp: getRevisionTimestamp,
diff --git a/task_scheduler/go/scheduling/task_scheduler.go b/task_scheduler/go/scheduling/task_scheduler.go
index 411e2d1..d870212 100644
--- a/task_scheduler/go/scheduling/task_scheduler.go
+++ b/task_scheduler/go/scheduling/task_scheduler.go
@@ -53,10 +53,6 @@
// Measurement name for task candidate counts by dimension set.
MEASUREMENT_TASK_CANDIDATE_COUNT = "task_candidate_count"
- // Measurement name for overdue job specs. Records the age of the oldest commit for which the
- // job has not completed (nor for any later commit), for each job spec and for each repo.
- MEASUREMENT_OVERDUE_JOB_SPECS = "overdue_job_specs_s"
-
NUM_TOP_CANDIDATES = 50
)
@@ -80,17 +76,6 @@
ERR_BLAMELIST_DONE = errors.New("ERR_BLAMELIST_DONE")
)
-// overdueJobSpecMetricKey is a map key for TaskScheduler.overdueMetrics. The tags added to the
-// metric are reflected here so that we delete/recreate the metric when the tags change.
-type overdueJobSpecMetricKey struct {
- // Repo URL.
- Repo string
- // Job name.
- Job string
- // Job trigger.
- Trigger string
-}
-
// TaskScheduler is a struct used for scheduling tasks on bots.
type TaskScheduler struct {
bl *blacklist.Blacklist
@@ -120,10 +105,8 @@
tCache cache.TaskCache
timeDecayAmt24Hr float64
tryjobs *tryjobs.TryJobIntegrator
- // Metric for age of commit with no completed job, in ns.
- overdueMetrics map[overdueJobSpecMetricKey]metrics2.Int64Metric
- window *window.Window
- workdir string
+ window *window.Window
+ workdir string
}
func NewTaskScheduler(ctx context.Context, d db.DB, bl *blacklist.Blacklist, period time.Duration, numCommits int, workdir, host string, repos repograph.Map, isolateClient *isolate.Client, swarmingClient swarming.ApiClient, c *http.Client, timeDecayAmt24Hr float64, buildbucketApiUrl, trybotBucket string, projectRepoMapping map[string]string, pools []string, pubsubTopic, depotTools string, gerrit gerrit.GerritInterface, btProject, btInstance string, ts oauth2.TokenSource) (*TaskScheduler, error) {
@@ -179,7 +162,6 @@
tCache: tCache,
timeDecayAmt24Hr: timeDecayAmt24Hr,
tryjobs: tryjobs,
- overdueMetrics: map[overdueJobSpecMetricKey]metrics2.Int64Metric{},
window: w,
workdir: workdir,
}
@@ -193,22 +175,12 @@
s.tryjobs.Start(ctx)
}
lvScheduling := metrics2.NewLiveness("last_successful_task_scheduling")
- lvOverdueMetrics := metrics2.NewLiveness("last_successful_overdue_metrics_update")
go util.RepeatCtx(5*time.Second, ctx, func() {
beforeMainLoop()
if err := s.MainLoop(ctx); err != nil {
sklog.Errorf("Failed to run the task scheduler: %s", err)
} else {
lvScheduling.Reset()
- // Do this after MainLoop so that the repos and Jobs have been updated.
- go func() {
- if err := s.updateOverdueJobSpecMetrics(ctx, time.Now()); err != nil {
- sklog.Errorf("Failed to update metrics for overdue job specs: %s", err)
- } else {
- lvOverdueMetrics.Reset()
- }
- }()
-
}
})
lvUpdate := metrics2.NewLiveness("last_successful_tasks_update")
@@ -2102,110 +2074,3 @@
}
return true
}
-
-// updateOverdueJobSpecMetrics updates metrics for MEASUREMENT_OVERDUE_JOB_SPECS.
-func (s *TaskScheduler) updateOverdueJobSpecMetrics(ctx context.Context, now time.Time) error {
- defer metrics2.FuncTimer().Stop()
-
- // Process each repo individually.
- for repoUrl, repo := range s.repos {
- // Include only the jobs at current master. We don't report on JobSpecs that have been removed.
- head := repo.Get("master")
- if head == nil {
- return sklog.FmtErrorf("Can't resolve %q in %q.", "master", repoUrl)
- }
- headTaskCfg, err := s.taskCfgCache.ReadTasksCfg(ctx, types.RepoState{
- Repo: repoUrl,
- Revision: head.Hash,
- })
- if err != nil {
- return sklog.FmtErrorf("Error reading TaskCfg for %q at %q: %s", repoUrl, head.Hash, err)
- }
- // Set of JobSpec names left to process.
- todo := util.StringSet{}
- // Maps JobSpec name to time of oldest untested commit; initialized to 'now' and updated each
- // time we see an untested commit.
- times := map[string]time.Time{}
- for name := range headTaskCfg.Jobs {
- todo[name] = true
- times[name] = now
- }
-
- // Iterate backwards to find the most-recently tested commit. We're not going to worry about
- // merges -- if a job was run on both branches, we'll use the first commit we come across.
- if err := head.Recurse(func(c *repograph.Commit) (bool, error) {
- // Stop if this commit is outside the scheduling window.
- if in, err := s.window.TestCommitHash(repoUrl, c.Hash); err != nil {
- return false, sklog.FmtErrorf("TestCommitHash: %s", err)
- } else if !in {
- return false, nil
- }
- rs := types.RepoState{
- Repo: repoUrl,
- Revision: c.Hash,
- }
- // Look in the cache for each remaining JobSpec at this commit.
- for name := range todo {
- jobs, err := s.jCache.GetJobsByRepoState(name, rs)
- if err != nil {
- return false, sklog.FmtErrorf("GetJobsByRepoState: %s", err)
- }
- for _, j := range jobs {
- if j.Done() {
- delete(todo, name)
- break
- }
- }
- }
- if len(todo) == 0 {
- return false, nil
- }
- // Check that the remaining JobSpecs are still valid at this commit.
- taskCfg, err := s.taskCfgCache.ReadTasksCfg(ctx, rs)
- if err != nil {
- return false, sklog.FmtErrorf("Error reading TaskCfg for %q at %q: %s", repoUrl, c.Hash, err)
- }
- for name := range todo {
- if _, ok := taskCfg.Jobs[name]; !ok {
- delete(todo, name)
- } else {
- // Set times, since this job still exists and we haven't seen a completed job.
- times[name] = c.Timestamp
- }
- }
- return len(todo) > 0, nil
- }); err != nil {
- return err
- }
- // Delete metrics for jobs that have been removed or whose tags have changed.
- for key, m := range s.overdueMetrics {
- if key.Repo != repoUrl {
- continue
- }
- if jobCfg, ok := headTaskCfg.Jobs[key.Job]; !ok || jobCfg.Trigger != key.Trigger {
- // Set to 0 before deleting so that alerts ignore it.
- m.Update(0)
- delete(s.overdueMetrics, key)
- }
- }
- // Update metrics or add metrics for any new jobs (or other tag changes).
- for name, ts := range times {
- key := overdueJobSpecMetricKey{
- Repo: repoUrl,
- Job: name,
- Trigger: headTaskCfg.Jobs[name].Trigger,
- }
- m, ok := s.overdueMetrics[key]
- if !ok {
- m = metrics2.GetInt64Metric(MEASUREMENT_OVERDUE_JOB_SPECS, map[string]string{
- "repo": key.Repo,
- "job_name": key.Job,
- "job_trigger": key.Trigger,
- })
- s.overdueMetrics[key] = m
- }
- m.Update(int64(now.Sub(ts).Seconds()))
- }
- }
- return nil
-}
diff --git a/task_scheduler/go/scheduling/task_scheduler_test.go b/task_scheduler/go/scheduling/task_scheduler_test.go
index ad14c96..0c6c4db 100644
--- a/task_scheduler/go/scheduling/task_scheduler_test.go
+++ b/task_scheduler/go/scheduling/task_scheduler_test.go
@@ -24,7 +24,6 @@
"go.skia.org/infra/go/git/repograph"
git_testutils "go.skia.org/infra/go/git/testutils"
"go.skia.org/infra/go/isolate"
- metrics2_testutils "go.skia.org/infra/go/metrics2/testutils"
"go.skia.org/infra/go/mockhttpclient"
"go.skia.org/infra/go/swarming"
"go.skia.org/infra/go/testutils"
@@ -3683,89 +3682,3 @@
sort.Strings(t1.Commits)
deepequal.AssertDeepEqual(t, expect1, t1.Commits)
}
-
-func TestOverdueJobSpecMetrics(t *testing.T) {
- ctx, gb, d, swarmingClient, s, _, cleanup := setup(t)
- defer cleanup()
-
- repo := s.repos[gb.RepoUrl()]
- c1 := getRS1(t, ctx, gb).Revision
- c1time := repo.Get(c1).Timestamp
- c2 := getRS2(t, ctx, gb).Revision
- // c2 is 5 seconds after c1
-
- // At 'now', c1 is 60 seconds old, c2 is 55 seconds old, and c3 (below) is 50 seconds old.
- now := c1time.Add(time.Minute)
- c1age := "60.0"
- c2age := "55.0"
- c3age := "50.0"
-
- check := func(buildAge, testAge, perfAge string) {
- tags := map[string]string{
- "repo": gb.RepoUrl(),
- "job_name": specs_testutils.BuildTask,
- "job_trigger": "",
- }
- assert.Equal(t, buildAge, metrics2_testutils.GetRecordedMetric(t, MEASUREMENT_OVERDUE_JOB_SPECS, tags))
-
- tags["job_name"] = specs_testutils.TestTask
- assert.Equal(t, testAge, metrics2_testutils.GetRecordedMetric(t, MEASUREMENT_OVERDUE_JOB_SPECS, tags))
-
- tags["job_name"] = specs_testutils.PerfTask
- assert.Equal(t, perfAge, metrics2_testutils.GetRecordedMetric(t, MEASUREMENT_OVERDUE_JOB_SPECS, tags))
- }
-
- // Expect no errors before MainLoop.
- assert.NoError(t, s.updateOverdueJobSpecMetrics(ctx, now))
- // Build and Test were added in c1; Perf was added in c2.
- check(c1age, c1age, c2age)
-
- // Run MainLoop. No free bots, so no tasks to complete.
- assert.NoError(t, s.MainLoop(ctx))
- assert.NoError(t, s.updateOverdueJobSpecMetrics(ctx, now))
- check(c1age, c1age, c2age)
-
- // One bot free, schedule a task.
- bot1 := makeBot("bot1", map[string]string{
- "pool": "Skia",
- "os": "Ubuntu",
- })
- swarmingClient.MockBots([]*swarming_api.SwarmingRpcsBotInfo{bot1})
- assert.NoError(t, s.MainLoop(ctx))
- assert.NoError(t, s.tCache.Update())
- tasks, err := s.tCache.GetTasksForCommits(gb.RepoUrl(), []string{c1, c2})
- assert.NoError(t, err)
- t1 := tasks[c2][specs_testutils.BuildTask]
- assert.NotNil(t, t1)
- assert.Equal(t, c2, t1.Revision)
- // Task has not completed, so same as above.
- assert.NoError(t, s.updateOverdueJobSpecMetrics(ctx, now))
- check(c1age, c1age, c2age)
-
- // The task is complete.
- t1.Status = types.TASK_STATUS_SUCCESS
- t1.Finished = time.Now()
- t1.IsolatedOutput = "abc123"
- assert.NoError(t, d.PutTask(t1))
- swarmingClient.MockTasks([]*swarming_api.SwarmingRpcsTaskRequestMetadata{
- makeSwarmingRpcsTaskRequestMetadata(t, t1, linuxTaskDims),
- })
-
- // Run MainLoop to update Jobs.
- swarmingClient.MockBots([]*swarming_api.SwarmingRpcsBotInfo{})
- assert.NoError(t, s.MainLoop(ctx))
- // Expect Build to be up-to-date.
- assert.NoError(t, s.updateOverdueJobSpecMetrics(ctx, now))
- check("0.0", c1age, c2age)
-
- // Revert back to c1 (no Perf task) and check that Perf job disappears.
- content, err := repo.Repo().GetFile(ctx, "infra/bots/tasks.json", c1)
- assert.NoError(t, err)
- gb.Add(ctx, "infra/bots/tasks.json", content)
- _ = gb.CommitMsgAt(ctx, "c3", c1time.Add(10*time.Second)) // 5 seconds after c2
-
- // Run MainLoop to update to c3. Perf job should be reset to zero. Build job age is now at c3.
- assert.NoError(t, s.MainLoop(ctx))
- assert.NoError(t, s.updateOverdueJobSpecMetrics(ctx, now))
- check(c3age, c1age, "0.0")
-}