[task scheduler] Move overdue job metrics into Datahopper

Less stress on the scheduler, and datahopper already uses JobCache and
TaskCfgCache.

Bug: skia:
Change-Id: Icf15aada9d1c6919d18970cf36fa58ad3345f482
Reviewed-on: https://skia-review.googlesource.com/c/184060
Commit-Queue: Eric Boren <borenet@google.com>
Reviewed-by: Ben Wagner <benjaminwagner@google.com>
diff --git a/datahopper/go/bot_metrics/bot_metrics.go b/datahopper/go/bot_metrics/bot_metrics.go
index 691d8a6..bd02d8f 100644
--- a/datahopper/go/bot_metrics/bot_metrics.go
+++ b/datahopper/go/bot_metrics/bot_metrics.go
@@ -19,8 +19,6 @@
 	"strings"
 	"time"
 
-	"go.skia.org/infra/go/common"
-	"go.skia.org/infra/go/depot_tools"
 	"go.skia.org/infra/go/git/repograph"
 	"go.skia.org/infra/go/metrics2"
 	"go.skia.org/infra/go/metrics2/events"
@@ -29,7 +27,6 @@
 	"go.skia.org/infra/task_scheduler/go/db"
 	"go.skia.org/infra/task_scheduler/go/specs"
 	"go.skia.org/infra/task_scheduler/go/types"
-	"golang.org/x/oauth2"
 )
 
 const (
@@ -195,10 +192,6 @@
 // before any other task, and inserts event data based on the lag time between
 // a commit landing and each task finishing for that commit.
 func cycle(ctx context.Context, taskDb db.TaskReader, repos repograph.Map, tcc *specs.TaskCfgCache, edb events.EventDB, em *events.EventMetrics, lastFinished, now time.Time, workdir string) error {
-	if err := repos.Update(ctx); err != nil {
-		return err
-	}
-
 	totalCommits := 0
 	for _, r := range repos {
 		totalCommits += r.Len()
@@ -371,7 +364,7 @@
 	if err := writeTs(workdir, now); err != nil {
 		return fmt.Errorf("Failed to write timestamp file: %s", err)
 	}
-	return tcc.Cleanup(-COMMIT_TASK_WINDOW)
+	return nil
 }
 
 // readTs returns the last successful run timestamp which was cached in a file.
@@ -400,28 +393,12 @@
 }
 
 // Start initiates "average time to X% bot coverage" metrics data generation.
-func Start(ctx context.Context, taskDb db.TaskReader, workdir, recipesCfgFile, btProject, btInstance string, ts oauth2.TokenSource) error {
+// The caller is responsible for updating the passed-in repos and TaskCfgCache.
+func Start(ctx context.Context, taskDb db.TaskReader, repos repograph.Map, tcc *specs.TaskCfgCache, workdir string) error {
 	// Setup.
 	if err := os.MkdirAll(workdir, os.ModePerm); err != nil {
 		return err
 	}
-	repos, err := repograph.NewMap(ctx, common.PUBLIC_REPOS, workdir)
-	if err != nil {
-		return fmt.Errorf("Failed to sync repograph: %s", err)
-	}
-	if err := repos.Update(ctx); err != nil {
-		return err
-	}
-
-	depotTools, err := depot_tools.Sync(ctx, workdir, recipesCfgFile)
-	if err != nil {
-		return fmt.Errorf("Failed to sync depot_tools: %s", err)
-	}
-
-	tcc, err := specs.NewTaskCfgCache(ctx, repos, depotTools, path.Join(workdir, "taskCfgCache"), 1, btProject, btInstance, ts)
-	if err != nil {
-		return fmt.Errorf("Failed to create TaskCfgCache: %s", err)
-	}
 
 	// Set up event metrics.
 	edb, err := events.NewEventDB(path.Join(workdir, "percent-metrics.bdb"))
diff --git a/datahopper/go/datahopper/jobs.go b/datahopper/go/datahopper/jobs.go
index 1c72e56..c06fd8e 100644
--- a/datahopper/go/datahopper/jobs.go
+++ b/datahopper/go/datahopper/jobs.go
@@ -13,22 +13,33 @@
 	"sync"
 	"time"
 
+	"go.skia.org/infra/go/git/repograph"
 	"go.skia.org/infra/go/metrics2"
 	"go.skia.org/infra/go/metrics2/events"
 	"go.skia.org/infra/go/sklog"
 	"go.skia.org/infra/go/util"
 	"go.skia.org/infra/task_scheduler/go/db"
+	"go.skia.org/infra/task_scheduler/go/db/cache"
+	"go.skia.org/infra/task_scheduler/go/specs"
 	"go.skia.org/infra/task_scheduler/go/types"
+	"go.skia.org/infra/task_scheduler/go/window"
 )
 
 var (
 	// Time periods over which to compute metrics. Assumed to be sorted in
 	// increasing order.
 	TIME_PERIODS = []time.Duration{24 * time.Hour, 7 * 24 * time.Hour}
+
+	// Measurement name for overdue job specs. Records the age of the oldest commit for which the
+	// job has not completed (nor for any later commit), for each job spec and for each repo.
+	MEASUREMENT_OVERDUE_JOB_SPECS = "overdue_job_specs_s"
 )
 
 const (
 	JOB_STREAM = "job_metrics"
+
+	OVERDUE_JOB_METRICS_PERIOD      = 8 * 24 * time.Hour
+	OVERDUE_JOB_METRICS_NUM_COMMITS = 5
 )
 
 // jobTypeString is an enum of the types of Jobs that computeAvgJobDuration will aggregate separately.
@@ -265,7 +276,8 @@
 }
 
 // StartJobMetrics starts a goroutine which ingests metrics data based on Jobs.
-func StartJobMetrics(ctx context.Context, jobDb db.JobReader) error {
+// The caller is responsible for updating the passed-in repos and TaskCfgCache.
+func StartJobMetrics(ctx context.Context, jobDb db.JobReader, repos repograph.Map, tcc *specs.TaskCfgCache) error {
 	edb := &jobEventDB{
 		cached: []*events.Event{},
 		db:     jobDb,
@@ -281,6 +293,12 @@
 		return err
 	}
 
+	om, err := newOverdueJobMetrics(jobDb, repos, tcc)
+	if err != nil {
+		return err
+	}
+	om.start(ctx)
+
 	lv := metrics2.NewLiveness("last_successful_job_metrics_update")
 	go util.RepeatCtx(5*time.Minute, ctx, func() {
 		if err := edb.update(); err != nil {
@@ -292,3 +310,170 @@
 	em.Start(ctx)
 	return nil
 }
+
+// overdueJobSpecMetricKey is a map key for overdueJobMetrics.overdueMetrics. The tags added to the
+// metric are reflected here so that we delete/recreate the metric when the tags change.
+type overdueJobSpecMetricKey struct {
+	// Repo URL.
+	Repo string
+	// Job name.
+	Job string
+	// Job trigger.
+	Trigger string
+}
+
+type overdueJobMetrics struct {
+	// Metric for age of commit with no completed job, in ns.
+	overdueMetrics map[overdueJobSpecMetricKey]metrics2.Int64Metric
+
+	jCache       cache.JobCache
+	repos        repograph.Map
+	taskCfgCache *specs.TaskCfgCache
+	window       *window.Window
+}
+
+// Return an overdueJobMetrics instance. The caller is responsible for updating
+// the passed-in repos and TaskCfgCache.
+func newOverdueJobMetrics(jobDb db.JobReader, repos repograph.Map, tcc *specs.TaskCfgCache) (*overdueJobMetrics, error) {
+	w, err := window.New(OVERDUE_JOB_METRICS_PERIOD, OVERDUE_JOB_METRICS_NUM_COMMITS, repos)
+	if err != nil {
+		return nil, err
+	}
+	jCache, err := cache.NewJobCache(jobDb, w, cache.GitRepoGetRevisionTimestamp(repos))
+	if err != nil {
+		return nil, err
+	}
+	return &overdueJobMetrics{
+		overdueMetrics: map[overdueJobSpecMetricKey]metrics2.Int64Metric{},
+		jCache:         jCache,
+		repos:          repos,
+		taskCfgCache:   tcc,
+		window:         w,
+	}, nil
+}
+
+func (m *overdueJobMetrics) start(ctx context.Context) {
+	lvOverdueMetrics := metrics2.NewLiveness("last_successful_overdue_metrics_update")
+	go util.RepeatCtx(5*time.Second, ctx, func() {
+		if err := m.updateOverdueJobSpecMetrics(ctx, time.Now()); err != nil {
+			sklog.Errorf("Failed to update metrics for overdue job specs: %s", err)
+		} else {
+			lvOverdueMetrics.Reset()
+		}
+	})
+}
+
+// updateOverdueJobSpecMetrics updates metrics for MEASUREMENT_OVERDUE_JOB_SPECS.
+func (m *overdueJobMetrics) updateOverdueJobSpecMetrics(ctx context.Context, now time.Time) error {
+	defer metrics2.FuncTimer().Stop()
+
+	// Update the window and cache.
+	if err := m.window.Update(); err != nil {
+		return err
+	}
+	if err := m.jCache.Update(); err != nil {
+		return err
+	}
+
+	// Process each repo individually.
+	for repoUrl, repo := range m.repos {
+		// Include only the jobs at current master. We don't report on JobSpecs that have been removed.
+		head := repo.Get("master")
+		if head == nil {
+			return sklog.FmtErrorf("Can't resolve %q in %q.", "master", repoUrl)
+		}
+		headTaskCfg, err := m.taskCfgCache.ReadTasksCfg(ctx, types.RepoState{
+			Repo:     repoUrl,
+			Revision: head.Hash,
+		})
+		if err != nil {
+			return sklog.FmtErrorf("Error reading TaskCfg for %q at %q: %s", repoUrl, head.Hash, err)
+		}
+		// Set of JobSpec names left to process.
+		todo := util.StringSet{}
+		// Maps JobSpec name to time of oldest untested commit; initialized to 'now' and updated each
+		// time we see an untested commit.
+		times := map[string]time.Time{}
+		for name := range headTaskCfg.Jobs {
+			todo[name] = true
+			times[name] = now
+		}
+
+		// Iterate backwards to find the most-recently tested commit. We're not going to worry about
+		// merges -- if a job was run on both branches, we'll use the first commit we come across.
+		if err := head.Recurse(func(c *repograph.Commit) (bool, error) {
+			// Stop if this commit is outside the scheduling window.
+			if in, err := m.window.TestCommitHash(repoUrl, c.Hash); err != nil {
+				return false, sklog.FmtErrorf("TestCommitHash: %s", err)
+			} else if !in {
+				return false, nil
+			}
+			rs := types.RepoState{
+				Repo:     repoUrl,
+				Revision: c.Hash,
+			}
+			// Look in the cache for each remaining JobSpec at this commit.
+			for name := range todo {
+				jobs, err := m.jCache.GetJobsByRepoState(name, rs)
+				if err != nil {
+					return false, sklog.FmtErrorf("GetJobsByRepoState: %s", err)
+				}
+				for _, j := range jobs {
+					if j.Done() {
+						delete(todo, name)
+						break
+					}
+				}
+			}
+			if len(todo) == 0 {
+				return false, nil
+			}
+			// Check that the remaining JobSpecs are still valid at this commit.
+			taskCfg, err := m.taskCfgCache.ReadTasksCfg(ctx, rs)
+			if err != nil {
+				return false, sklog.FmtErrorf("Error reading TaskCfg for %q at %q: %s", repoUrl, c.Hash, err)
+			}
+			for name := range todo {
+				if _, ok := taskCfg.Jobs[name]; !ok {
+					delete(todo, name)
+				} else {
+					// Set times, since this job still exists and we haven't seen a completed job.
+					times[name] = c.Timestamp
+				}
+			}
+			return len(todo) > 0, nil
+		}); err != nil {
+			return err
+		}
+		// Delete metrics for jobs that have been removed or whose tags have changed.
+		for key, metric := range m.overdueMetrics {
+			if key.Repo != repoUrl {
+				continue
+			}
+			if jobCfg, ok := headTaskCfg.Jobs[key.Job]; !ok || jobCfg.Trigger != key.Trigger {
+				// Set to 0 before deleting so that alerts ignore it.
+				metric.Update(0)
+				delete(m.overdueMetrics, key)
+			}
+		}
+		// Update metrics or add metrics for any new jobs (or other tag changes).
+		for name, ts := range times {
+			key := overdueJobSpecMetricKey{
+				Repo:    repoUrl,
+				Job:     name,
+				Trigger: headTaskCfg.Jobs[name].Trigger,
+			}
+			metric, ok := m.overdueMetrics[key]
+			if !ok {
+				metric = metrics2.GetInt64Metric(MEASUREMENT_OVERDUE_JOB_SPECS, map[string]string{
+					"repo":        key.Repo,
+					"job_name":    key.Job,
+					"job_trigger": key.Trigger,
+				})
+				m.overdueMetrics[key] = metric
+			}
+			metric.Update(int64(now.Sub(ts).Seconds()))
+		}
+	}
+	return nil
+}
diff --git a/datahopper/go/datahopper/jobs_test.go b/datahopper/go/datahopper/jobs_test.go
index 27bfb45..5aa928e 100644
--- a/datahopper/go/datahopper/jobs_test.go
+++ b/datahopper/go/datahopper/jobs_test.go
@@ -3,16 +3,24 @@
 import (
 	"bytes"
 	"encoding/gob"
+	"io/ioutil"
+	"path"
 	"testing"
 	"time"
 
 	assert "github.com/stretchr/testify/require"
 	"go.skia.org/infra/go/deepequal"
+	depot_tools_testutils "go.skia.org/infra/go/depot_tools/testutils"
+	"go.skia.org/infra/go/git"
+	"go.skia.org/infra/go/git/repograph"
 	"go.skia.org/infra/go/metrics2/events"
+	metrics2_testutils "go.skia.org/infra/go/metrics2/testutils"
 	"go.skia.org/infra/go/testutils"
 	"go.skia.org/infra/go/util"
 	"go.skia.org/infra/task_scheduler/go/db"
 	"go.skia.org/infra/task_scheduler/go/db/memory"
+	"go.skia.org/infra/task_scheduler/go/specs"
+	specs_testutils "go.skia.org/infra/task_scheduler/go/specs/testutils"
 	"go.skia.org/infra/task_scheduler/go/types"
 )
 
@@ -330,3 +338,135 @@
 
 	tester.Run(evs)
 }
+
+func TestOverdueJobSpecMetrics(t *testing.T) {
+	testutils.LargeTest(t)
+
+	wd, err := ioutil.TempDir("", "")
+	assert.NoError(t, err)
+	defer testutils.RemoveAll(t, wd)
+
+	d := memory.NewInMemoryDB(nil)
+	ctx, gb, _, _ := specs_testutils.SetupTestRepo(t)
+	repos, err := repograph.NewMap(ctx, []string{gb.RepoUrl()}, wd)
+	assert.NoError(t, err)
+	assert.NoError(t, repos.Update(ctx))
+	repo := repos[gb.RepoUrl()]
+
+	depotTools := depot_tools_testutils.GetDepotTools(t, ctx)
+	btProject, btInstance, btCleanup := specs_testutils.SetupBigTable(t)
+	defer btCleanup()
+	tcc, err := specs.NewTaskCfgCache(ctx, repos, depotTools, path.Join(wd, "taskCfgCache"), 1, btProject, btInstance, nil)
+	assert.NoError(t, err)
+
+	c1, err := git.GitDir(gb.Dir()).RevParse(ctx, "HEAD^")
+	assert.NoError(t, err)
+	c1time := repo.Get(c1).Timestamp
+	c2, err := git.GitDir(gb.Dir()).RevParse(ctx, "HEAD")
+	assert.NoError(t, err)
+	// c2 is 5 seconds after c1
+	c2time := repo.Get(c2).Timestamp
+
+	// At 'now', c1 is 60 seconds old, c2 is 55 seconds old, and c3 (below) is 50 seconds old.
+	now := c1time.Add(time.Minute)
+	c1age := "60.0"
+	c2age := "55.0"
+	c3age := "50.0"
+
+	check := func(buildAge, testAge, perfAge string) {
+		tags := map[string]string{
+			"repo":        gb.RepoUrl(),
+			"job_name":    specs_testutils.BuildTask,
+			"job_trigger": "",
+		}
+		assert.Equal(t, buildAge, metrics2_testutils.GetRecordedMetric(t, MEASUREMENT_OVERDUE_JOB_SPECS, tags))
+
+		tags["job_name"] = specs_testutils.TestTask
+		assert.Equal(t, testAge, metrics2_testutils.GetRecordedMetric(t, MEASUREMENT_OVERDUE_JOB_SPECS, tags))
+
+		tags["job_name"] = specs_testutils.PerfTask
+		assert.Equal(t, perfAge, metrics2_testutils.GetRecordedMetric(t, MEASUREMENT_OVERDUE_JOB_SPECS, tags))
+	}
+
+	om, err := newOverdueJobMetrics(d, repos, tcc)
+	assert.NoError(t, err)
+
+	// No jobs have finished yet.
+	assert.NoError(t, om.updateOverdueJobSpecMetrics(ctx, now))
+	check(c1age, c1age, c2age)
+
+	// Insert jobs.
+	j1 := &types.Job{
+		Name: specs_testutils.BuildTask,
+		RepoState: types.RepoState{
+			Repo:     gb.RepoUrl(),
+			Revision: c1,
+		},
+		Created: c1time,
+	}
+	j2 := &types.Job{
+		Name: specs_testutils.BuildTask,
+		RepoState: types.RepoState{
+			Repo:     gb.RepoUrl(),
+			Revision: c2,
+		},
+		Created: c2time,
+	}
+	j3 := &types.Job{
+		Name: specs_testutils.TestTask,
+		RepoState: types.RepoState{
+			Repo:     gb.RepoUrl(),
+			Revision: c1,
+		},
+		Created: c1time,
+	}
+	j4 := &types.Job{
+		Name: specs_testutils.TestTask,
+		RepoState: types.RepoState{
+			Repo:     gb.RepoUrl(),
+			Revision: c2,
+		},
+		Created: c2time,
+	}
+	j5 := &types.Job{
+		Name: specs_testutils.PerfTask,
+		RepoState: types.RepoState{
+			Repo:     gb.RepoUrl(),
+			Revision: c2,
+		},
+		Created: c2time,
+	}
+	assert.NoError(t, d.PutJobs([]*types.Job{j1, j2, j3, j4, j5}))
+	// Jobs have not completed, so same as above.
+	assert.NoError(t, om.updateOverdueJobSpecMetrics(ctx, now))
+	check(c1age, c1age, c2age)
+
+	// One job is complete.
+	j2.Status = types.JOB_STATUS_SUCCESS
+	j2.Finished = time.Now()
+	assert.NoError(t, d.PutJob(j2))
+	// Expect Build to be up-to-date.
+	assert.NoError(t, om.updateOverdueJobSpecMetrics(ctx, now))
+	check("0.0", c1age, c2age)
+
+	// Revert back to c1 (no Perf task) and check that Perf job disappears.
+	content, err := repo.Repo().GetFile(ctx, "infra/bots/tasks.json", c1)
+	assert.NoError(t, err)
+	gb.Add(ctx, "infra/bots/tasks.json", content)
+	c3 := gb.CommitMsgAt(ctx, "c3", c1time.Add(10*time.Second)) // 5 seconds after c2
+	assert.NoError(t, repos.Update(ctx))
+	c3time := repo.Get(c3).Timestamp
+
+	// Update to c3. Perf job should be reset to zero. Build job age is now at c3.
+	j6 := &types.Job{
+		Name: specs_testutils.BuildTask,
+		RepoState: types.RepoState{
+			Repo:     gb.RepoUrl(),
+			Revision: c3,
+		},
+		Created: c3time,
+	}
+	assert.NoError(t, d.PutJob(j6))
+	assert.NoError(t, om.updateOverdueJobSpecMetrics(ctx, now))
+	check(c3age, c1age, "0.0")
+}
diff --git a/datahopper/go/datahopper/main.go b/datahopper/go/datahopper/main.go
index d743feb..0b4ba0b 100644
--- a/datahopper/go/datahopper/main.go
+++ b/datahopper/go/datahopper/main.go
@@ -20,6 +20,7 @@
 	"go.skia.org/infra/datahopper/go/swarming_metrics"
 	"go.skia.org/infra/go/auth"
 	"go.skia.org/infra/go/common"
+	"go.skia.org/infra/go/depot_tools"
 	"go.skia.org/infra/go/gcs"
 	"go.skia.org/infra/go/git/repograph"
 	"go.skia.org/infra/go/httputils"
@@ -27,11 +28,13 @@
 	"go.skia.org/infra/go/sklog"
 	"go.skia.org/infra/go/swarming"
 	"go.skia.org/infra/go/taskname"
+	"go.skia.org/infra/go/util"
 	"go.skia.org/infra/perf/go/perfclient"
 	"go.skia.org/infra/task_scheduler/go/db"
 	"go.skia.org/infra/task_scheduler/go/db/firestore"
 	"go.skia.org/infra/task_scheduler/go/db/pubsub"
 	"go.skia.org/infra/task_scheduler/go/db/remote_db"
+	"go.skia.org/infra/task_scheduler/go/specs"
 	"google.golang.org/api/option"
 )
 
@@ -124,6 +127,36 @@
 	if err := repos.Update(ctx); err != nil {
 		sklog.Fatal(err)
 	}
+	go util.RepeatCtx(time.Minute, ctx, func() {
+		lvRepos := metrics2.NewLiveness("datahopper_repo_update")
+		if err := repos.Update(ctx); err != nil {
+			sklog.Errorf("Failed to update repos: %s", err)
+		} else {
+			lvRepos.Reset()
+		}
+	})
+
+	// TaskCfgCache.
+	if *recipesCfgFile == "" {
+		*recipesCfgFile = path.Join(*workdir, "recipes.cfg")
+	}
+	depotTools, err := depot_tools.Sync(ctx, w, *recipesCfgFile)
+	if err != nil {
+		sklog.Fatalf("Failed to sync depot_tools: %s", err)
+	}
+	newTs, err := auth.NewDefaultTokenSource(*local, auth.SCOPE_USERINFO_EMAIL, pubsub.AUTH_SCOPE, bigtable.Scope)
+	if err != nil {
+		sklog.Fatal(err)
+	}
+	tcc, err := specs.NewTaskCfgCache(ctx, repos, depotTools, path.Join(w, "taskCfgCache"), 1, *btProject, *btInstance, newTs)
+	if err != nil {
+		sklog.Fatalf("Failed to create TaskCfgCache: %s", err)
+	}
+	go util.RepeatCtx(30*time.Minute, ctx, func() {
+		if err := tcc.Cleanup(OVERDUE_JOB_METRICS_PERIOD); err != nil {
+			sklog.Errorf("Failed to cleanup TaskCfgCache: %s", err)
+		}
+	})
 
 	// Data generation goroutines.
 
@@ -164,10 +197,6 @@
 	}()
 
 	// Tasks metrics.
-	newTs, err := auth.NewDefaultTokenSource(*local, auth.SCOPE_USERINFO_EMAIL, pubsub.AUTH_SCOPE, bigtable.Scope)
-	if err != nil {
-		sklog.Fatal(err)
-	}
 	var d db.RemoteDB
 	if *firestoreInstance != "" {
 		label := "datahopper"
@@ -190,15 +219,12 @@
 	}
 
 	// Jobs metrics.
-	if err := StartJobMetrics(ctx, d); err != nil {
+	if err := StartJobMetrics(ctx, d, repos, tcc); err != nil {
 		sklog.Fatal(err)
 	}
 
 	// Generate "time to X% bot coverage" metrics.
-	if *recipesCfgFile == "" {
-		*recipesCfgFile = path.Join(*workdir, "recipes.cfg")
-	}
-	if err := bot_metrics.Start(ctx, d, *workdir, *recipesCfgFile, *btProject, *btInstance, newTs); err != nil {
+	if err := bot_metrics.Start(ctx, d, repos, tcc, *workdir); err != nil {
 		sklog.Fatal(err)
 	}
 
diff --git a/task_scheduler/go/db/cache/cache.go b/task_scheduler/go/db/cache/cache.go
index 5f79ce2..5cc4661 100644
--- a/task_scheduler/go/db/cache/cache.go
+++ b/task_scheduler/go/db/cache/cache.go
@@ -464,7 +464,7 @@
 }
 
 type jobCache struct {
-	db                   db.JobDB
+	db                   db.JobReader
 	getRevisionTimestamp db.GetRevisionTimestamp
 	mtx                  sync.RWMutex
 	queryId              string
@@ -688,7 +688,7 @@
 
 // NewJobCache returns a local cache which provides more convenient views of
 // job data than the database can provide.
-func NewJobCache(d db.JobDB, timeWindow *window.Window, getRevisionTimestamp db.GetRevisionTimestamp) (JobCache, error) {
+func NewJobCache(d db.JobReader, timeWindow *window.Window, getRevisionTimestamp db.GetRevisionTimestamp) (JobCache, error) {
 	tc := &jobCache{
 		db:                   d,
 		getRevisionTimestamp: getRevisionTimestamp,
diff --git a/task_scheduler/go/scheduling/task_scheduler.go b/task_scheduler/go/scheduling/task_scheduler.go
index 411e2d1..d870212 100644
--- a/task_scheduler/go/scheduling/task_scheduler.go
+++ b/task_scheduler/go/scheduling/task_scheduler.go
@@ -53,10 +53,6 @@
 	// Measurement name for task candidate counts by dimension set.
 	MEASUREMENT_TASK_CANDIDATE_COUNT = "task_candidate_count"
 
-	// Measurement name for overdue job specs. Records the age of the oldest commit for which the
-	// job has not completed (nor for any later commit), for each job spec and for each repo.
-	MEASUREMENT_OVERDUE_JOB_SPECS = "overdue_job_specs_s"
-
 	NUM_TOP_CANDIDATES = 50
 )
 
@@ -80,17 +76,6 @@
 	ERR_BLAMELIST_DONE = errors.New("ERR_BLAMELIST_DONE")
 )
 
-// overdueJobSpecMetricKey is a map key for TaskScheduler.overdueMetrics. The tags added to the
-// metric are reflected here so that we delete/recreate the metric when the tags change.
-type overdueJobSpecMetricKey struct {
-	// Repo URL.
-	Repo string
-	// Job name.
-	Job string
-	// Job trigger.
-	Trigger string
-}
-
 // TaskScheduler is a struct used for scheduling tasks on bots.
 type TaskScheduler struct {
 	bl                  *blacklist.Blacklist
@@ -120,10 +105,8 @@
 	tCache           cache.TaskCache
 	timeDecayAmt24Hr float64
 	tryjobs          *tryjobs.TryJobIntegrator
-	// Metric for age of commit with no completed job, in ns.
-	overdueMetrics map[overdueJobSpecMetricKey]metrics2.Int64Metric
-	window         *window.Window
-	workdir        string
+	window           *window.Window
+	workdir          string
 }
 
 func NewTaskScheduler(ctx context.Context, d db.DB, bl *blacklist.Blacklist, period time.Duration, numCommits int, workdir, host string, repos repograph.Map, isolateClient *isolate.Client, swarmingClient swarming.ApiClient, c *http.Client, timeDecayAmt24Hr float64, buildbucketApiUrl, trybotBucket string, projectRepoMapping map[string]string, pools []string, pubsubTopic, depotTools string, gerrit gerrit.GerritInterface, btProject, btInstance string, ts oauth2.TokenSource) (*TaskScheduler, error) {
@@ -179,7 +162,6 @@
 		tCache:           tCache,
 		timeDecayAmt24Hr: timeDecayAmt24Hr,
 		tryjobs:          tryjobs,
-		overdueMetrics:   map[overdueJobSpecMetricKey]metrics2.Int64Metric{},
 		window:           w,
 		workdir:          workdir,
 	}
@@ -193,22 +175,12 @@
 		s.tryjobs.Start(ctx)
 	}
 	lvScheduling := metrics2.NewLiveness("last_successful_task_scheduling")
-	lvOverdueMetrics := metrics2.NewLiveness("last_successful_overdue_metrics_update")
 	go util.RepeatCtx(5*time.Second, ctx, func() {
 		beforeMainLoop()
 		if err := s.MainLoop(ctx); err != nil {
 			sklog.Errorf("Failed to run the task scheduler: %s", err)
 		} else {
 			lvScheduling.Reset()
-			// Do this after MainLoop so that the repos and Jobs have been updated.
-			go func() {
-				if err := s.updateOverdueJobSpecMetrics(ctx, time.Now()); err != nil {
-					sklog.Errorf("Failed to update metrics for overdue job specs: %s", err)
-				} else {
-					lvOverdueMetrics.Reset()
-				}
-			}()
-
 		}
 	})
 	lvUpdate := metrics2.NewLiveness("last_successful_tasks_update")
@@ -2102,110 +2074,3 @@
 	}
 	return true
 }
-
-// updateOverdueJobSpecMetrics updates metrics for MEASUREMENT_OVERDUE_JOB_SPECS.
-func (s *TaskScheduler) updateOverdueJobSpecMetrics(ctx context.Context, now time.Time) error {
-	defer metrics2.FuncTimer().Stop()
-
-	// Process each repo individually.
-	for repoUrl, repo := range s.repos {
-		// Include only the jobs at current master. We don't report on JobSpecs that have been removed.
-		head := repo.Get("master")
-		if head == nil {
-			return sklog.FmtErrorf("Can't resolve %q in %q.", "master", repoUrl)
-		}
-		headTaskCfg, err := s.taskCfgCache.ReadTasksCfg(ctx, types.RepoState{
-			Repo:     repoUrl,
-			Revision: head.Hash,
-		})
-		if err != nil {
-			return sklog.FmtErrorf("Error reading TaskCfg for %q at %q: %s", repoUrl, head.Hash, err)
-		}
-		// Set of JobSpec names left to process.
-		todo := util.StringSet{}
-		// Maps JobSpec name to time of oldest untested commit; initialized to 'now' and updated each
-		// time we see an untested commit.
-		times := map[string]time.Time{}
-		for name := range headTaskCfg.Jobs {
-			todo[name] = true
-			times[name] = now
-		}
-
-		// Iterate backwards to find the most-recently tested commit. We're not going to worry about
-		// merges -- if a job was run on both branches, we'll use the first commit we come across.
-		if err := head.Recurse(func(c *repograph.Commit) (bool, error) {
-			// Stop if this commit is outside the scheduling window.
-			if in, err := s.window.TestCommitHash(repoUrl, c.Hash); err != nil {
-				return false, sklog.FmtErrorf("TestCommitHash: %s", err)
-			} else if !in {
-				return false, nil
-			}
-			rs := types.RepoState{
-				Repo:     repoUrl,
-				Revision: c.Hash,
-			}
-			// Look in the cache for each remaining JobSpec at this commit.
-			for name := range todo {
-				jobs, err := s.jCache.GetJobsByRepoState(name, rs)
-				if err != nil {
-					return false, sklog.FmtErrorf("GetJobsByRepoState: %s", err)
-				}
-				for _, j := range jobs {
-					if j.Done() {
-						delete(todo, name)
-						break
-					}
-				}
-			}
-			if len(todo) == 0 {
-				return false, nil
-			}
-			// Check that the remaining JobSpecs are still valid at this commit.
-			taskCfg, err := s.taskCfgCache.ReadTasksCfg(ctx, rs)
-			if err != nil {
-				return false, sklog.FmtErrorf("Error reading TaskCfg for %q at %q: %s", repoUrl, c.Hash, err)
-			}
-			for name := range todo {
-				if _, ok := taskCfg.Jobs[name]; !ok {
-					delete(todo, name)
-				} else {
-					// Set times, since this job still exists and we haven't seen a completed job.
-					times[name] = c.Timestamp
-				}
-			}
-			return len(todo) > 0, nil
-		}); err != nil {
-			return err
-		}
-		// Delete metrics for jobs that have been removed or whose tags have changed.
-		for key, m := range s.overdueMetrics {
-			if key.Repo != repoUrl {
-				continue
-			}
-			if jobCfg, ok := headTaskCfg.Jobs[key.Job]; !ok || jobCfg.Trigger != key.Trigger {
-				// Set to 0 before deleting so that alerts ignore it.
-				m.Update(0)
-				delete(s.overdueMetrics, key)
-			}
-		}
-		// Update metrics or add metrics for any new jobs (or other tag changes).
-		for name, ts := range times {
-			key := overdueJobSpecMetricKey{
-				Repo:    repoUrl,
-				Job:     name,
-				Trigger: headTaskCfg.Jobs[name].Trigger,
-			}
-			m, ok := s.overdueMetrics[key]
-			if !ok {
-				m = metrics2.GetInt64Metric(MEASUREMENT_OVERDUE_JOB_SPECS, map[string]string{
-					"repo":        key.Repo,
-					"job_name":    key.Job,
-					"job_trigger": key.Trigger,
-				})
-				s.overdueMetrics[key] = m
-			}
-			m.Update(int64(now.Sub(ts).Seconds()))
-		}
-	}
-	return nil
-}
diff --git a/task_scheduler/go/scheduling/task_scheduler_test.go b/task_scheduler/go/scheduling/task_scheduler_test.go
index ad14c96..0c6c4db 100644
--- a/task_scheduler/go/scheduling/task_scheduler_test.go
+++ b/task_scheduler/go/scheduling/task_scheduler_test.go
@@ -24,7 +24,6 @@
 	"go.skia.org/infra/go/git/repograph"
 	git_testutils "go.skia.org/infra/go/git/testutils"
 	"go.skia.org/infra/go/isolate"
-	metrics2_testutils "go.skia.org/infra/go/metrics2/testutils"
 	"go.skia.org/infra/go/mockhttpclient"
 	"go.skia.org/infra/go/swarming"
 	"go.skia.org/infra/go/testutils"
@@ -3683,89 +3682,3 @@
 	sort.Strings(t1.Commits)
 	deepequal.AssertDeepEqual(t, expect1, t1.Commits)
 }
-
-func TestOverdueJobSpecMetrics(t *testing.T) {
-	ctx, gb, d, swarmingClient, s, _, cleanup := setup(t)
-	defer cleanup()
-
-	repo := s.repos[gb.RepoUrl()]
-	c1 := getRS1(t, ctx, gb).Revision
-	c1time := repo.Get(c1).Timestamp
-	c2 := getRS2(t, ctx, gb).Revision
-	// c2 is 5 seconds after c1
-
-	// At 'now', c1 is 60 seconds old, c2 is 55 seconds old, and c3 (below) is 50 seconds old.
-	now := c1time.Add(time.Minute)
-	c1age := "60.0"
-	c2age := "55.0"
-	c3age := "50.0"
-
-	check := func(buildAge, testAge, perfAge string) {
-		tags := map[string]string{
-			"repo":        gb.RepoUrl(),
-			"job_name":    specs_testutils.BuildTask,
-			"job_trigger": "",
-		}
-		assert.Equal(t, buildAge, metrics2_testutils.GetRecordedMetric(t, MEASUREMENT_OVERDUE_JOB_SPECS, tags))
-
-		tags["job_name"] = specs_testutils.TestTask
-		assert.Equal(t, testAge, metrics2_testutils.GetRecordedMetric(t, MEASUREMENT_OVERDUE_JOB_SPECS, tags))
-
-		tags["job_name"] = specs_testutils.PerfTask
-		assert.Equal(t, perfAge, metrics2_testutils.GetRecordedMetric(t, MEASUREMENT_OVERDUE_JOB_SPECS, tags))
-	}
-
-	// Expect no errors before MainLoop.
-	assert.NoError(t, s.updateOverdueJobSpecMetrics(ctx, now))
-	// Build and Test were added in c1; Perf was added in c2.
-	check(c1age, c1age, c2age)
-
-	// Run MainLoop. No free bots, so no tasks to complete.
-	assert.NoError(t, s.MainLoop(ctx))
-	assert.NoError(t, s.updateOverdueJobSpecMetrics(ctx, now))
-	check(c1age, c1age, c2age)
-
-	// One bot free, schedule a task.
-	bot1 := makeBot("bot1", map[string]string{
-		"pool": "Skia",
-		"os":   "Ubuntu",
-	})
-	swarmingClient.MockBots([]*swarming_api.SwarmingRpcsBotInfo{bot1})
-	assert.NoError(t, s.MainLoop(ctx))
-	assert.NoError(t, s.tCache.Update())
-	tasks, err := s.tCache.GetTasksForCommits(gb.RepoUrl(), []string{c1, c2})
-	assert.NoError(t, err)
-	t1 := tasks[c2][specs_testutils.BuildTask]
-	assert.NotNil(t, t1)
-	assert.Equal(t, c2, t1.Revision)
-	// Task has not completed, so same as above.
-	assert.NoError(t, s.updateOverdueJobSpecMetrics(ctx, now))
-	check(c1age, c1age, c2age)
-
-	// The task is complete.
-	t1.Status = types.TASK_STATUS_SUCCESS
-	t1.Finished = time.Now()
-	t1.IsolatedOutput = "abc123"
-	assert.NoError(t, d.PutTask(t1))
-	swarmingClient.MockTasks([]*swarming_api.SwarmingRpcsTaskRequestMetadata{
-		makeSwarmingRpcsTaskRequestMetadata(t, t1, linuxTaskDims),
-	})
-
-	// Run MainLoop to update Jobs.
-	swarmingClient.MockBots([]*swarming_api.SwarmingRpcsBotInfo{})
-	assert.NoError(t, s.MainLoop(ctx))
-	// Expect Build to be up-to-date.
-	assert.NoError(t, s.updateOverdueJobSpecMetrics(ctx, now))
-	check("0.0", c1age, c2age)
-
-	// Revert back to c1 (no Perf task) and check that Perf job disappears.
-	content, err := repo.Repo().GetFile(ctx, "infra/bots/tasks.json", c1)
-	assert.NoError(t, err)
-	gb.Add(ctx, "infra/bots/tasks.json", content)
-	_ = gb.CommitMsgAt(ctx, "c3", c1time.Add(10*time.Second)) // 5 seconds after c2
-
-	// Run MainLoop to update to c3. Perf job should be reset to zero. Build job age is now at c3.
-	assert.NoError(t, s.MainLoop(ctx))
-	assert.NoError(t, s.updateOverdueJobSpecMetrics(ctx, now))
-	check(c3age, c1age, "0.0")
-}