[datahopper] Add latest_job_age_s metric

Bug: skia:
Change-Id: Ic3c2025cf410d8da3fde2f62e862eed3bda34678
Reviewed-on: https://skia-review.googlesource.com/c/184069
Commit-Queue: Eric Boren <borenet@google.com>
Reviewed-by: Ben Wagner <benjaminwagner@google.com>
diff --git a/datahopper/go/datahopper/jobs.go b/datahopper/go/datahopper/jobs.go
index c06fd8e..0282c66 100644
--- a/datahopper/go/datahopper/jobs.go
+++ b/datahopper/go/datahopper/jobs.go
@@ -33,6 +33,10 @@
 	// Measurement name for overdue job specs. Records the age of the oldest commit for which the
 	// job has not completed (nor for any later commit), for each job spec and for each repo.
 	MEASUREMENT_OVERDUE_JOB_SPECS = "overdue_job_specs_s"
+
+	// Measurement indicating the age of the most-recently created job, by
+	// name, in seconds.
+	MEASUREMENT_LATEST_JOB_AGE = "latest_job_age_s"
 )
 
 const (
@@ -311,9 +315,9 @@
 	return nil
 }
 
-// overdueJobSpecMetricKey is a map key for overdueJobMetrics.overdueMetrics. The tags added to the
+// jobSpecMetricKey is a map key for overdueJobMetrics.overdueMetrics. The tags added to the
 // metric are reflected here so that we delete/recreate the metric when the tags change.
-type overdueJobSpecMetricKey struct {
+type jobSpecMetricKey struct {
 	// Repo URL.
 	Repo string
 	// Job name.
@@ -323,8 +327,11 @@
 }
 
 type overdueJobMetrics struct {
-	// Metric for age of commit with no completed job, in ns.
-	overdueMetrics map[overdueJobSpecMetricKey]metrics2.Int64Metric
+	// Metric for age of commit with no completed job, in seconds.
+	overdueMetrics map[jobSpecMetricKey]metrics2.Int64Metric
+
+	// Metric for age of last-created job by name, in seconds.
+	prevLatestJobAge map[jobSpecMetricKey]metrics2.Int64Metric
 
 	jCache       cache.JobCache
 	repos        repograph.Map
@@ -344,7 +351,7 @@
 		return nil, err
 	}
 	return &overdueJobMetrics{
-		overdueMetrics: map[overdueJobSpecMetricKey]metrics2.Int64Metric{},
+		overdueMetrics: map[jobSpecMetricKey]metrics2.Int64Metric{},
 		jCache:         jCache,
 		repos:          repos,
 		taskCfgCache:   tcc,
@@ -451,14 +458,18 @@
 				continue
 			}
 			if jobCfg, ok := headTaskCfg.Jobs[key.Job]; !ok || jobCfg.Trigger != key.Trigger {
-				// Set to 0 before deleting so that alerts ignore it.
-				metric.Update(0)
-				delete(m.overdueMetrics, key)
+				if err := metric.Delete(); err != nil {
+					sklog.Errorf("Failed to delete metric: %s", err)
+					// Set to 0; we'll attempt to remove on the next cycle.
+					metric.Update(0)
+				} else {
+					delete(m.overdueMetrics, key)
+				}
 			}
 		}
 		// Update metrics or add metrics for any new jobs (or other tag changes).
 		for name, ts := range times {
-			key := overdueJobSpecMetricKey{
+			key := jobSpecMetricKey{
 				Repo:    repoUrl,
 				Job:     name,
 				Trigger: headTaskCfg.Jobs[name].Trigger,
@@ -474,6 +485,52 @@
 			}
 			metric.Update(int64(now.Sub(ts).Seconds()))
 		}
+
+		// Record the age of the most-recently created job for each
+		// JobSpec.
+		names := []string{}
+		for name, jobSpec := range headTaskCfg.Jobs {
+			// We're only interested in periodic jobs for this metric.
+			if util.In(jobSpec.Trigger, specs.PERIODIC_TRIGGERS) {
+				names = append(names, name)
+			}
+		}
+		jobsByName, err := m.jCache.GetMatchingJobsFromDateRange(names, m.window.Start(repoUrl), now)
+		if err != nil {
+			return err
+		}
+		latestJobAge := make(map[jobSpecMetricKey]metrics2.Int64Metric, len(names))
+		for name, jobs := range jobsByName {
+			key := jobSpecMetricKey{
+				Repo:    repoUrl,
+				Job:     name,
+				Trigger: headTaskCfg.Jobs[name].Trigger,
+			}
+			latest := time.Time{}
+			for _, job := range jobs {
+				if job.Created.After(latest) {
+					latest = job.Created
+				}
+			}
+			metric := metrics2.GetInt64Metric(MEASUREMENT_LATEST_JOB_AGE, map[string]string{
+				"repo":        key.Repo,
+				"job_name":    key.Job,
+				"job_trigger": key.Trigger,
+			})
+			metric.Update(int64(now.Sub(latest).Seconds()))
+			latestJobAge[key] = metric
+		}
+		for key, metric := range m.prevLatestJobAge {
+			if _, ok := latestJobAge[key]; !ok {
+				if err := metric.Delete(); err != nil {
+					sklog.Errorf("Failed to delete metric: %s", err)
+					// Set to 0 and attempt to remove on the next cycle.
+					metric.Update(0)
+					latestJobAge[key] = metric
+				}
+			}
+		}
+		m.prevLatestJobAge = latestJobAge
 	}
 	return nil
 }
diff --git a/datahopper/go/datahopper/jobs_test.go b/datahopper/go/datahopper/jobs_test.go
index 5aa928e..c2ec49f 100644
--- a/datahopper/go/datahopper/jobs_test.go
+++ b/datahopper/go/datahopper/jobs_test.go
@@ -3,6 +3,7 @@
 import (
 	"bytes"
 	"encoding/gob"
+	"fmt"
 	"io/ioutil"
 	"path"
 	"testing"
@@ -457,7 +458,7 @@
 	assert.NoError(t, repos.Update(ctx))
 	c3time := repo.Get(c3).Timestamp
 
-	// Update to c3. Perf job should be reset to zero. Build job age is now at c3.
+	// Update to c3. Build job age is now at c3. Perf job should be missing.
 	j6 := &types.Job{
 		Name: specs_testutils.BuildTask,
 		RepoState: types.RepoState{
@@ -468,5 +469,5 @@
 	}
 	assert.NoError(t, d.PutJob(j6))
 	assert.NoError(t, om.updateOverdueJobSpecMetrics(ctx, now))
-	check(c3age, c1age, "0.0")
+	check(c3age, c1age, fmt.Sprintf("Could not find anything for overdue_job_specs_s{job_name=\"%s\",job_trigger=\"\",repo=\"%s\"}", specs_testutils.PerfTask, gb.RepoUrl()))
 }