[datahopper] Add metrics for flaky tasks
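
Add a per-task "flake-rate" metric, computed the same way as the existing
per-job metrics: a new taskEventDB mirrors jobEventDB, reading completed
tasks from the Task Scheduler DB every 5 minutes, and a new flakes package
identifies flaky tasks (all mishaps, plus failures whose TaskKey also has a
success). Existing job-metrics identifiers are renamed (e.g. STREAM ->
JOB_STREAM, addAggregates -> addJobAggregates) to avoid collisions with the
task equivalents.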

Bug: skia:
Change-Id: I2cbb2b15156705daad7da7717ebd876987e04b5d
Reviewed-on: https://skia-review.googlesource.com/c/171481
Commit-Queue: Eric Boren <borenet@google.com>
Reviewed-by: Ben Wagner <benjaminwagner@google.com>
diff --git a/datahopper/go/datahopper/jobs.go b/datahopper/go/datahopper/jobs.go
index 61c50a8..065618d 100644
--- a/datahopper/go/datahopper/jobs.go
+++ b/datahopper/go/datahopper/jobs.go
@@ -29,11 +29,11 @@
 )
 
 const (
-	STREAM = "job_metrics"
+	JOB_STREAM = "job_metrics"
 )
 
-// jobTypeString is an enum of the types of Jobs that computeAvgDuration will aggregate separately.
-// The string value is the same as the job_type tag value returned by computeAvgDuration.
+// jobTypeString is an enum of the types of Jobs that computeAvgJobDuration will aggregate separately.
+// The string value is the same as the job_type tag value returned by computeAvgJobDuration.
 type jobTypeString string
 
 const (
@@ -54,7 +54,7 @@
 	// 3. Thread 1: EventManager.updateMetrics calls jobEventDB.Range, which waits
 	//    to lock jobEventDB.mtx
 	// 4. Thread 2: jobEventDB.update calls EventMetrics.AggregateMetric (by way
-	//    of addAggregates and EventStream.AggregateMetric), which waits to lock
+	//    of addJobAggregates and EventStream.AggregateMetric), which waits to lock
 	//    EventMetrics.mtx
 	mtx sync.Mutex
 }
@@ -118,7 +118,7 @@
 			return fmt.Errorf("Failed to encode %#v to GOB: %s", job, err)
 		}
 		ev := &events.Event{
-			Stream:    STREAM,
+			Stream:    JOB_STREAM,
 			Timestamp: job.Created,
 			Data:      buf.Bytes(),
 		}
@@ -130,12 +130,12 @@
 	return nil
 }
 
-// computeAvgDuration is an events.DynamicAggregateFn that returns metrics for average Job duration
+// computeAvgJobDuration is an events.DynamicAggregateFn that returns metrics for average Job duration
 // for Jobs with status SUCCESS or FAILURE, given a slice of Events created by jobEventDB.update.
 // The first return value will contain the tags "job_name" (db.Job.Name) and "job_type" (one of
 // "normal", "tryjob", "forced"), and the second return value will be the corresponding average Job
 // duration. Returns an error if Event.Data can't be GOB-decoded as a db.Job.
-func computeAvgDuration(ev []*events.Event) ([]map[string]string, []float64, error) {
+func computeAvgJobDuration(ev []*events.Event) ([]map[string]string, []float64, error) {
 	if len(ev) > 0 {
 		// ev should be ordered by timestamp
 		sklog.Debugf("Calculating avg-duration for %d jobs since %s.", len(ev), ev[0].Timestamp)
@@ -196,12 +196,12 @@
 	return rvTags, rvVals, nil
 }
 
-// computeAvgDuration is an events.DynamicAggregateFn that returns metrics for Job failure rate and
+// computeJobFailureMishapRate is an events.DynamicAggregateFn that returns metrics for Job failure rate and
 // mishap rate, given a slice of Events created by jobEventDB.update. The first return value will
 // contain the tags "job_name" (db.Job.Name) and "metric" (one of "failure-rate", "mishap-rate"),
 // and the second return value will be the corresponding ratio of failed/mishap Jobs to all
 // completed Jobs. Returns an error if Event.Data can't be GOB-decoded as a db.Job.
-func computeFailureMishapRate(ev []*events.Event) ([]map[string]string, []float64, error) {
+func computeJobFailureMishapRate(ev []*events.Event) ([]map[string]string, []float64, error) {
 	if len(ev) > 0 {
 		// ev should be ordered by timestamp
 		sklog.Debugf("Calculating failure-rate for %d jobs since %s.", len(ev), ev[0].Timestamp)
@@ -250,15 +250,15 @@
 	return rvTags, rvVals, nil
 }
 
-// addAggregates adds aggregation functions for job events to the EventStream.
-func addAggregates(s *events.EventStream) error {
+// addJobAggregates adds aggregation functions for job events to the EventStream.
+func addJobAggregates(s *events.EventStream) error {
 	for _, period := range TIME_PERIODS {
-		if err := s.DynamicMetric(map[string]string{"metric": "avg-duration"}, period, computeAvgDuration); err != nil {
+		if err := s.DynamicMetric(map[string]string{"metric": "avg-duration"}, period, computeAvgJobDuration); err != nil {
 			return err
 		}
 
 		// Job failure/mishap rate.
-		if err := s.DynamicMetric(nil, period, computeFailureMishapRate); err != nil {
+		if err := s.DynamicMetric(nil, period, computeJobFailureMishapRate); err != nil {
 			return err
 		}
 	}
@@ -281,8 +281,8 @@
 	}
 	edb.em = em
 
-	s := em.GetEventStream(STREAM)
-	if err := addAggregates(s); err != nil {
+	s := em.GetEventStream(JOB_STREAM)
+	if err := addJobAggregates(s); err != nil {
 		return err
 	}
 
diff --git a/datahopper/go/datahopper/jobs_test.go b/datahopper/go/datahopper/jobs_test.go
index 7c4818a..be342f4 100644
--- a/datahopper/go/datahopper/jobs_test.go
+++ b/datahopper/go/datahopper/jobs_test.go
@@ -15,7 +15,7 @@
 )
 
 // Create a db.JobDB and jobEventDB.
-func setup(t *testing.T, now time.Time) (*jobEventDB, db.JobDB) {
+func setupJobs(t *testing.T, now time.Time) (*jobEventDB, db.JobDB) {
 	jdb := db.NewInMemoryJobDB()
 	edb := &jobEventDB{
 		cached: []*events.Event{},
@@ -45,20 +45,20 @@
 	return j
 }
 
-// assertEvent checks that ev.Data contains j.
-func assertEvent(t *testing.T, ev *events.Event, j *db.Job) {
-	assert.Equal(t, STREAM, ev.Stream)
+// assertJobEvent checks that ev.Data contains j.
+func assertJobEvent(t *testing.T, ev *events.Event, j *db.Job) {
+	assert.Equal(t, JOB_STREAM, ev.Stream)
 	var job db.Job
 	assert.NoError(t, gob.NewDecoder(bytes.NewReader(ev.Data)).Decode(&job))
 	deepequal.AssertDeepEqual(t, j, &job)
 	assert.True(t, j.Created.Equal(ev.Timestamp))
 }
 
-// TestUpdate checks that jobEventDB.update creates the correct Events from Jobs in the DB.
-func TestUpdate(t *testing.T) {
+// TestJobUpdate checks that jobEventDB.update creates the correct Events from Jobs in the DB.
+func TestJobUpdate(t *testing.T) {
 	testutils.SmallTest(t)
 	now := time.Now()
-	edb, jdb := setup(t, now)
+	edb, jdb := setupJobs(t, now)
 	start := now.Add(-TIME_PERIODS[len(TIME_PERIODS)-1])
 	jobs := []*db.Job{
 		// 0: Filtered out -- too early.
@@ -74,21 +74,21 @@
 	}
 	assert.NoError(t, jdb.PutJobs(jobs))
 	assert.NoError(t, edb.update())
-	evs, err := edb.Range(STREAM, start.Add(-time.Hour), start.Add(time.Hour))
+	evs, err := edb.Range(JOB_STREAM, start.Add(-time.Hour), start.Add(time.Hour))
 	assert.NoError(t, err)
 
 	expected := append(jobs[1:3], jobs[4:8]...)
 	assert.Len(t, evs, len(expected))
 	for i, ev := range evs {
-		assertEvent(t, ev, expected[i])
+		assertJobEvent(t, ev, expected[i])
 	}
 }
 
-// TestRange checks that jobEventDB.Range returns Events within the given range.
-func TestRange(t *testing.T) {
+// TestJobRange checks that jobEventDB.Range returns Events within the given range.
+func TestJobRange(t *testing.T) {
 	testutils.SmallTest(t)
 	now := time.Now()
-	edb, jdb := setup(t, now)
+	edb, jdb := setupJobs(t, now)
 	base := now.Add(-time.Hour)
 	jobs := []*db.Job{
 		makeJob(base.Add(-time.Nanosecond), "A", db.JOB_STATUS_SUCCESS, NORMAL, time.Minute),
@@ -100,11 +100,11 @@
 	assert.NoError(t, edb.update())
 
 	test := func(start, end time.Time, startIdx, count int) {
-		evs, err := edb.Range(STREAM, start, end)
+		evs, err := edb.Range(JOB_STREAM, start, end)
 		assert.NoError(t, err)
 		assert.Len(t, evs, count)
 		for i, ev := range evs {
-			assertEvent(t, ev, jobs[startIdx+i])
+			assertJobEvent(t, ev, jobs[startIdx+i])
 		}
 	}
 	before := base.Add(-time.Hour)
@@ -177,13 +177,13 @@
 	}
 }
 
-func TestComputeAvgDuration(t *testing.T) {
+func TestComputeAvgJobDuration(t *testing.T) {
 	testutils.SmallTest(t)
 	now := time.Now()
-	edb, jdb := setup(t, now)
+	edb, jdb := setupJobs(t, now)
 	created := now.Add(-time.Hour)
 
-	tester := newDynamicAggregateFnTester(t, computeAvgDuration)
+	tester := newDynamicAggregateFnTester(t, computeAvgJobDuration)
 	expect := func(jobName string, jobType jobTypeString, jobs []*db.Job) {
 		var totalDur float64 = 0
 		for _, j := range jobs {
@@ -230,20 +230,20 @@
 	expect("AllTypes", FORCED, jobsType[8:9])
 
 	assert.NoError(t, edb.update())
-	evs, err := edb.Range(STREAM, created.Add(-time.Hour), created.Add(time.Hour))
+	evs, err := edb.Range(JOB_STREAM, created.Add(-time.Hour), created.Add(time.Hour))
 	assert.NoError(t, err)
 	assert.Len(t, evs, len(jobsStatus)+len(jobsType))
 
 	tester.Run(evs)
 }
 
-func TestComputeFailureMishapRate(t *testing.T) {
+func TestComputeJobFailureMishapRate(t *testing.T) {
 	testutils.SmallTest(t)
 	now := time.Now()
-	edb, jdb := setup(t, now)
+	edb, jdb := setupJobs(t, now)
 	created := now.Add(-time.Hour)
 
-	tester := newDynamicAggregateFnTester(t, computeFailureMishapRate)
+	tester := newDynamicAggregateFnTester(t, computeJobFailureMishapRate)
 	expect := func(jobName string, metric string, numer, denom int) {
 		tester.AddAssert(map[string]string{
 			"job_name": jobName,
@@ -322,7 +322,7 @@
 	}
 
 	assert.NoError(t, edb.update())
-	evs, err := edb.Range(STREAM, created.Add(-time.Hour), created.Add(time.Hour))
+	evs, err := edb.Range(JOB_STREAM, created.Add(-time.Hour), created.Add(time.Hour))
 	assert.NoError(t, err)
 	assert.Len(t, evs, jobCount)
 
diff --git a/datahopper/go/datahopper/tasks.go b/datahopper/go/datahopper/tasks.go
new file mode 100644
index 0000000..65e79cb
--- /dev/null
+++ b/datahopper/go/datahopper/tasks.go
@@ -0,0 +1,207 @@
+package main
+
+import (
+	"bytes"
+	"context"
+	"encoding/gob"
+	"fmt"
+	"sort"
+	"sync"
+	"time"
+
+	"go.skia.org/infra/go/httputils"
+	"go.skia.org/infra/go/metrics2"
+	"go.skia.org/infra/go/metrics2/events"
+	"go.skia.org/infra/go/sklog"
+	"go.skia.org/infra/go/util"
+	"go.skia.org/infra/task_scheduler/go/db"
+	"go.skia.org/infra/task_scheduler/go/db/remote_db"
+	"go.skia.org/infra/task_scheduler/go/flakes"
+)
+
+const (
+	TASK_STREAM = "task_metrics"
+)
+
+// taskEventDB implements the events.EventDB interface.
+type taskEventDB struct {
+	cached []*events.Event
+	db     db.TaskReader
+	em     *events.EventMetrics
+	// Do not lock mtx when calling methods on em. Otherwise deadlock can occur.
+	// E.g. (now fixed):
+	// 1. Thread 1: EventManager.updateMetrics locks EventMetrics.mtx
+	// 2. Thread 2: taskEventDB.update locks taskEventDB.mtx
+	// 3. Thread 1: EventManager.updateMetrics calls taskEventDB.Range, which waits
+	//    to lock taskEventDB.mtx
+	// 4. Thread 2: taskEventDB.update calls EventMetrics.AggregateMetric (by way
+	//    of addTaskAggregates and EventStream.AggregateMetric), which waits to lock
+	//    EventMetrics.mtx
+	mtx sync.Mutex
+}
+
+// See docs for events.EventDB interface.
+func (t *taskEventDB) Append(string, []byte) error {
+	return fmt.Errorf("taskEventDB is read-only!")
+}
+
+// See docs for events.EventDB interface.
+func (t *taskEventDB) Close() error {
+	return nil
+}
+
+// See docs for events.EventDB interface.
+func (t *taskEventDB) Insert(*events.Event) error {
+	return fmt.Errorf("taskEventDB is read-only!")
+}
+
+// See docs for events.EventDB interface.
+func (t *taskEventDB) Range(stream string, start, end time.Time) ([]*events.Event, error) {
+	t.mtx.Lock()
+	defer t.mtx.Unlock()
+
+	n := len(t.cached)
+	if n == 0 {
+		return []*events.Event{}, nil
+	}
+
+	first := sort.Search(n, func(i int) bool {
+		return !t.cached[i].Timestamp.Before(start)
+	})
+
+	last := first + sort.Search(n-first, func(i int) bool {
+		return !t.cached[i+first].Timestamp.Before(end)
+	})
+
+	rv := make([]*events.Event, last-first)
+	copy(rv, t.cached[first:last])
+	return rv, nil
+}
+
+// update updates the cached tasks in the taskEventDB. Only a single thread may
+// call this method, but it can be called concurrently with other methods.
+func (t *taskEventDB) update() error {
+	defer metrics2.FuncTimer().Stop()
+	now := time.Now()
+	longestPeriod := TIME_PERIODS[len(TIME_PERIODS)-1]
+	tasks, err := t.db.GetTasksFromDateRange(now.Add(-longestPeriod), now, "")
+	if err != nil {
+		return fmt.Errorf("Failed to load tasks from %s to %s: %s", now.Add(-longestPeriod), now, err)
+	}
+	sklog.Debugf("taskEventDB.update: Processing %d tasks for time range %s to %s.", len(tasks), now.Add(-longestPeriod), now)
+	cached := make([]*events.Event, 0, len(tasks))
+	for _, task := range tasks {
+		if !task.Done() {
+			continue
+		}
+		var buf bytes.Buffer
+		if err := gob.NewEncoder(&buf).Encode(task); err != nil {
+			return fmt.Errorf("Failed to encode %#v to GOB: %s", task, err)
+		}
+		ev := &events.Event{
+			Stream:    TASK_STREAM,
+			Timestamp: task.Created,
+			Data:      buf.Bytes(),
+		}
+		cached = append(cached, ev)
+	}
+	t.mtx.Lock()
+	defer t.mtx.Unlock()
+	t.cached = cached
+	return nil
+}
+
+// computeTaskFlakeRate is an events.DynamicAggregateFn that returns metrics for Task flake rate,
+// given a slice of Events created by taskEventDB.update. The first return value will contain the
+// tags "task_name" (db.Task.Name) and "metric" ("flake-rate"), and the second return value will
+// be the corresponding ratio of flaky Tasks to all completed Tasks with that name. Returns an
+// error if Event.Data can't be GOB-decoded as a db.Task.
+func computeTaskFlakeRate(ev []*events.Event) ([]map[string]string, []float64, error) {
+	if len(ev) > 0 {
+		// ev should be ordered by timestamp
+		sklog.Debugf("Calculating flake-rate for %d tasks since %s.", len(ev), ev[0].Timestamp)
+	}
+	type taskSum struct {
+		flakes int
+		count  int
+	}
+	byTask := map[string]*taskSum{}
+	tasks := make([]*db.Task, 0, len(ev))
+	for _, e := range ev {
+		var task db.Task
+		if err := gob.NewDecoder(bytes.NewReader(e.Data)).Decode(&task); err != nil {
+			return nil, nil, err
+		}
+		tasks = append(tasks, &task)
+		entry, ok := byTask[task.Name]
+		if !ok {
+			entry = &taskSum{}
+			byTask[task.Name] = entry
+		}
+		entry.count++
+	}
+	flaky := flakes.FindFlakes(tasks)
+	for _, task := range flaky {
+		byTask[task.Name].flakes++
+	}
+	rvTags := make([]map[string]string, 0, len(byTask))
+	rvVals := make([]float64, 0, len(byTask))
+	add := func(taskName, metric string, value float64) {
+		rvTags = append(rvTags, map[string]string{
+			"task_name": taskName,
+			"metric":    metric,
+		})
+		rvVals = append(rvVals, value)
+	}
+	for taskName, taskSum := range byTask {
+		if taskSum.count == 0 {
+			continue
+		}
+		add(taskName, "flake-rate", float64(taskSum.flakes)/float64(taskSum.count))
+	}
+	return rvTags, rvVals, nil
+}
+
+// addTaskAggregates adds aggregation functions for task events to the EventStream.
+func addTaskAggregates(s *events.EventStream) error {
+	for _, period := range TIME_PERIODS {
+		// Flake rate.
+		if err := s.DynamicMetric(nil, period, computeTaskFlakeRate); err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+// StartTaskMetrics starts a goroutine which ingests metrics data based on Tasks.
+func StartTaskMetrics(taskSchedulerDbUrl string, ctx context.Context) error {
+	db, err := remote_db.NewClient(taskSchedulerDbUrl, httputils.NewTimeoutClient())
+	if err != nil {
+		return err
+	}
+	edb := &taskEventDB{
+		cached: []*events.Event{},
+		db:     db,
+	}
+	em, err := events.NewEventMetrics(edb, "task_metrics")
+	if err != nil {
+		return err
+	}
+	edb.em = em
+
+	s := em.GetEventStream(TASK_STREAM)
+	if err := addTaskAggregates(s); err != nil {
+		return err
+	}
+
+	lv := metrics2.NewLiveness("last_successful_task_metrics_update")
+	go util.RepeatCtx(5*time.Minute, ctx, func() {
+		if err := edb.update(); err != nil {
+			sklog.Errorf("Failed to update task data: %s", err)
+		} else {
+			lv.Reset()
+		}
+	})
+	em.Start(ctx)
+	return nil
+}
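
StartTaskMetrics is expected to be wired into datahopper's main alongside the
existing job-metrics startup; a minimal sketch of the call site (the flag name
below is an assumption, not part of this change):

    // Hypothetical wiring in datahopper's main.go; the flag name is assumed.
    taskDbUrl := flag.String("task_scheduler_db_url", "", "URL of the task scheduler DB server.")
    flag.Parse()
    ctx := context.Background()
    if err := StartTaskMetrics(*taskDbUrl, ctx); err != nil {
    	sklog.Fatal(err)
    }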
diff --git a/datahopper/go/datahopper/tasks_test.go b/datahopper/go/datahopper/tasks_test.go
new file mode 100644
index 0000000..cabd391
--- /dev/null
+++ b/datahopper/go/datahopper/tasks_test.go
@@ -0,0 +1,207 @@
+package main
+
+import (
+	"bytes"
+	"encoding/gob"
+	"testing"
+	"time"
+
+	assert "github.com/stretchr/testify/require"
+	"go.skia.org/infra/go/deepequal"
+	"go.skia.org/infra/go/metrics2/events"
+	"go.skia.org/infra/go/testutils"
+	"go.skia.org/infra/task_scheduler/go/db"
+)
+
+// Create a db.TaskDB and taskEventDB.
+func setupTasks(t *testing.T, now time.Time) (*taskEventDB, db.TaskDB) {
+	tdb := db.NewInMemoryTaskDB()
+	edb := &taskEventDB{
+		cached: []*events.Event{},
+		db:     tdb,
+	}
+	return edb, tdb
+}
+
+// makeTask returns a fake task with only the fields relevant to this test set.
+func makeTask(created time.Time, name string, status db.TaskStatus) *db.Task {
+	task := &db.Task{
+		Created: created,
+		TaskKey: db.TaskKey{
+			Name: name,
+		},
+		Status: status,
+	}
+	return task
+}
+
+// assertTaskEvent checks that ev.Data contains task.
+func assertTaskEvent(t *testing.T, ev *events.Event, task *db.Task) {
+	assert.Equal(t, TASK_STREAM, ev.Stream)
+	var other db.Task
+	assert.NoError(t, gob.NewDecoder(bytes.NewReader(ev.Data)).Decode(&other))
+	deepequal.AssertDeepEqual(t, task, &other)
+	assert.True(t, task.Created.Equal(ev.Timestamp))
+}
+
+// TestTaskUpdate checks that taskEventDB.update creates the correct Events from Tasks in the DB.
+func TestTaskUpdate(t *testing.T) {
+	testutils.SmallTest(t)
+	now := time.Now()
+	edb, tdb := setupTasks(t, now)
+	start := now.Add(-TIME_PERIODS[len(TIME_PERIODS)-1])
+	tasks := []*db.Task{
+		// 0: Filtered out -- too early.
+		makeTask(start.Add(-time.Minute), "A", db.TASK_STATUS_SUCCESS),
+		makeTask(start.Add(time.Minute), "A", db.TASK_STATUS_SUCCESS),
+		makeTask(start.Add(2*time.Minute), "A", db.TASK_STATUS_FAILURE),
+		// 3: Filtered out -- not Done.
+		makeTask(start.Add(3*time.Minute), "A", db.TASK_STATUS_RUNNING),
+		makeTask(start.Add(4*time.Minute), "A", db.TASK_STATUS_MISHAP),
+		makeTask(start.Add(5*time.Minute), "A", db.TASK_STATUS_FAILURE),
+		makeTask(start.Add(6*time.Minute), "B", db.TASK_STATUS_SUCCESS),
+		makeTask(start.Add(7*time.Minute), "A", db.TASK_STATUS_SUCCESS),
+	}
+	assert.NoError(t, tdb.PutTasks(tasks))
+	assert.NoError(t, edb.update())
+	evs, err := edb.Range(TASK_STREAM, start.Add(-time.Hour), start.Add(time.Hour))
+	assert.NoError(t, err)
+
+	expected := append(tasks[1:3], tasks[4:8]...)
+	assert.Len(t, evs, len(expected))
+	for i, ev := range evs {
+		assertTaskEvent(t, ev, expected[i])
+	}
+}
+
+// TestTaskRange checks that taskEventDB.Range returns Events within the given range.
+func TestTaskRange(t *testing.T) {
+	testutils.SmallTest(t)
+	now := time.Now()
+	edb, tdb := setupTasks(t, now)
+	base := now.Add(-time.Hour)
+	tasks := []*db.Task{
+		makeTask(base.Add(-time.Nanosecond), "A", db.TASK_STATUS_SUCCESS),
+		makeTask(base, "A", db.TASK_STATUS_SUCCESS),
+		makeTask(base.Add(time.Nanosecond), "A", db.TASK_STATUS_SUCCESS),
+		makeTask(base.Add(time.Minute), "A", db.TASK_STATUS_SUCCESS),
+	}
+	assert.NoError(t, tdb.PutTasks(tasks))
+	assert.NoError(t, edb.update())
+
+	test := func(start, end time.Time, startIdx, count int) {
+		evs, err := edb.Range(TASK_STREAM, start, end)
+		assert.NoError(t, err)
+		assert.Len(t, evs, count)
+		for i, ev := range evs {
+			assertTaskEvent(t, ev, tasks[startIdx+i])
+		}
+	}
+	before := base.Add(-time.Hour)
+	after := base.Add(time.Hour)
+	test(before, before, -1, 0)
+	test(before, tasks[0].Created, -1, 0)
+	test(before, tasks[1].Created, 0, 1)
+	test(before, tasks[2].Created, 0, 2)
+	test(before, tasks[3].Created, 0, 3)
+	test(before, after, 0, 4)
+	test(tasks[0].Created, before, -1, 0)
+	test(tasks[0].Created, tasks[0].Created, -1, 0)
+	test(tasks[0].Created, tasks[1].Created, 0, 1)
+	test(tasks[0].Created, tasks[2].Created, 0, 2)
+	test(tasks[0].Created, tasks[3].Created, 0, 3)
+	test(tasks[0].Created, after, 0, 4)
+	test(tasks[1].Created, tasks[0].Created, -1, 0)
+	test(tasks[1].Created, tasks[1].Created, -1, 0)
+	test(tasks[1].Created, tasks[2].Created, 1, 1)
+	test(tasks[1].Created, tasks[3].Created, 1, 2)
+	test(tasks[1].Created, after, 1, 3)
+	test(tasks[2].Created, tasks[2].Created, -1, 0)
+	test(tasks[2].Created, tasks[3].Created, 2, 1)
+	test(tasks[2].Created, after, 2, 2)
+	test(tasks[3].Created, tasks[3].Created, -1, 0)
+	test(tasks[3].Created, after, 3, 1)
+	test(after, after, -1, 0)
+}
+
+func TestComputeTaskFlakeRate(t *testing.T) {
+	testutils.SmallTest(t)
+	now := time.Now()
+	edb, tdb := setupTasks(t, now)
+	created := now.Add(-time.Hour)
+
+	tester := newDynamicAggregateFnTester(t, computeTaskFlakeRate)
+	expect := func(taskName string, metric string, numer, denom int) {
+		tester.AddAssert(map[string]string{
+			"task_name": taskName,
+			"metric":    metric,
+		}, float64(numer)/float64(denom))
+	}
+
+	taskCount := 0
+	addTask := func(name, commit string, status db.TaskStatus) {
+		taskCount++
+		task := makeTask(created, name, status)
+		task.Revision = commit
+		assert.NoError(t, tdb.PutTask(task))
+	}
+	{
+		name := "NoFlakes"
+		addTask(name, "a", db.TASK_STATUS_SUCCESS)
+		addTask(name, "b", db.TASK_STATUS_SUCCESS)
+		addTask(name, "c", db.TASK_STATUS_SUCCESS)
+		addTask(name, "d", db.TASK_STATUS_FAILURE)
+		addTask(name, "d", db.TASK_STATUS_FAILURE)
+		expect(name, "flake-rate", 0, 5)
+	}
+	{
+		name := "Mishaps"
+		addTask(name, "a", db.TASK_STATUS_FAILURE)
+		addTask(name, "b", db.TASK_STATUS_FAILURE)
+		addTask(name, "c", db.TASK_STATUS_FAILURE)
+		addTask(name, "c", db.TASK_STATUS_MISHAP)
+		expect(name, "flake-rate", 1, 4)
+	}
+	{
+		name := "RetrySucceeded"
+		addTask(name, "a", db.TASK_STATUS_SUCCESS)
+		addTask(name, "b", db.TASK_STATUS_FAILURE)
+		addTask(name, "b", db.TASK_STATUS_SUCCESS)
+		expect(name, "flake-rate", 1, 3)
+	}
+	{
+		name := "RetryFailed"
+		addTask(name, "a", db.TASK_STATUS_FAILURE)
+		addTask(name, "a", db.TASK_STATUS_FAILURE)
+		expect(name, "flake-rate", 0, 2)
+	}
+	{
+		name := "Mix"
+		addTask(name, "a", db.TASK_STATUS_SUCCESS)
+		addTask(name, "b", db.TASK_STATUS_FAILURE)
+		addTask(name, "c", db.TASK_STATUS_FAILURE)
+		addTask(name, "b", db.TASK_STATUS_FAILURE)
+		addTask(name, "c", db.TASK_STATUS_SUCCESS)
+		addTask(name, "d", db.TASK_STATUS_MISHAP)
+		addTask(name, "d", db.TASK_STATUS_SUCCESS)
+		expect(name, "flake-rate", 2, 7)
+	}
+	{
+		name := "LongRetryChain"
+		addTask(name, "a", db.TASK_STATUS_FAILURE)
+		addTask(name, "a", db.TASK_STATUS_FAILURE)
+		addTask(name, "a", db.TASK_STATUS_FAILURE)
+		addTask(name, "a", db.TASK_STATUS_FAILURE)
+		addTask(name, "a", db.TASK_STATUS_FAILURE)
+		addTask(name, "a", db.TASK_STATUS_FAILURE)
+		addTask(name, "a", db.TASK_STATUS_SUCCESS)
+		expect(name, "flake-rate", 6, 7)
+	}
+
+	assert.NoError(t, edb.update())
+	evs, err := edb.Range(TASK_STREAM, created.Add(-time.Hour), created.Add(time.Hour))
+	assert.NoError(t, err)
+	assert.Len(t, evs, taskCount)
+
+	tester.Run(evs)
+}
diff --git a/task_scheduler/go/flakes/flakes.go b/task_scheduler/go/flakes/flakes.go
new file mode 100644
index 0000000..f447d61
--- /dev/null
+++ b/task_scheduler/go/flakes/flakes.go
@@ -0,0 +1,44 @@
+package flakes
+
+/*
+   Package flakes finds flakily-failed tasks among a set of tasks.
+*/
+
+import (
+	"go.skia.org/infra/task_scheduler/go/db"
+)
+
+// FindFlakes returns the flakily-failed tasks in the given slice: all mishaps, plus any failure whose TaskKey also has a success.
+func FindFlakes(tasks []*db.Task) []*db.Task {
+	tasksMap := map[db.TaskKey][]*db.Task{}
+	for _, task := range tasks {
+		if task.Done() {
+			tasksMap[task.TaskKey] = append(tasksMap[task.TaskKey], task)
+		}
+	}
+	flaky := []*db.Task{}
+	for _, tasks := range tasksMap {
+		// If one or more tasks succeeded and failed, then all failures
+		// are flakes.
+		success := 0
+		failure := 0
+		for _, task := range tasks {
+			if task.Status == db.TASK_STATUS_SUCCESS {
+				success++
+			} else if task.Status == db.TASK_STATUS_FAILURE {
+				failure++
+			} else if task.Status == db.TASK_STATUS_MISHAP {
+				// Mishaps are flakes by definition.
+				flaky = append(flaky, task)
+			}
+		}
+		if success > 0 && failure > 0 {
+			for _, task := range tasks {
+				if task.Status == db.TASK_STATUS_FAILURE {
+					flaky = append(flaky, task)
+				}
+			}
+		}
+	}
+	return flaky
+}
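
For illustration, FindFlakes applied to a hypothetical retry chain (the task
names, revision, and mk helper below are made up for the example):

    // Two failures retried into a success at one TaskKey, plus one mishap.
    mk := func(name, revision string, status db.TaskStatus) *db.Task {
    	t := &db.Task{TaskKey: db.TaskKey{Name: name}, Status: status}
    	t.Revision = revision // promoted field of the embedded RepoState
    	return t
    }
    tasks := []*db.Task{
    	mk("Test-Foo", "abc", db.TASK_STATUS_FAILURE),
    	mk("Test-Foo", "abc", db.TASK_STATUS_FAILURE),
    	mk("Test-Foo", "abc", db.TASK_STATUS_SUCCESS),
    	mk("Build-Bar", "abc", db.TASK_STATUS_MISHAP),
    }
    flaky := flakes.FindFlakes(tasks)
    // flaky holds both Test-Foo failures (the success at the same TaskKey
    // marks them as flakes) and the Build-Bar mishap: 3 of the 4 tasks.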