[datahopper] Add metrics for flaky tasks
Bug: skia:
Change-Id: I2cbb2b15156705daad7da7717ebd876987e04b5d
Reviewed-on: https://skia-review.googlesource.com/c/171481
Commit-Queue: Eric Boren <borenet@google.com>
Reviewed-by: Ben Wagner <benjaminwagner@google.com>
diff --git a/datahopper/go/datahopper/jobs.go b/datahopper/go/datahopper/jobs.go
index 61c50a8..065618d 100644
--- a/datahopper/go/datahopper/jobs.go
+++ b/datahopper/go/datahopper/jobs.go
@@ -29,11 +29,11 @@
)
const (
- STREAM = "job_metrics"
+ JOB_STREAM = "job_metrics"
)
-// jobTypeString is an enum of the types of Jobs that computeAvgDuration will aggregate separately.
-// The string value is the same as the job_type tag value returned by computeAvgDuration.
+// jobTypeString is an enum of the types of Jobs that computeAvgJobDuration will aggregate separately.
+// The string value is the same as the job_type tag value returned by computeAvgJobDuration.
type jobTypeString string
const (
@@ -54,7 +54,7 @@
// 3. Thread 1: EventManager.updateMetrics calls jobEventDB.Range, which waits
// to lock jobEventDB.mtx
// 4. Thread 2: jobEventDB.update calls EventMetrics.AggregateMetric (by way
- // of addAggregates and EventStream.AggregateMetric), which waits to lock
+ // of addJobAggregates and EventStream.AggregateMetric), which waits to lock
// EventMetrics.mtx
mtx sync.Mutex
}
@@ -118,7 +118,7 @@
return fmt.Errorf("Failed to encode %#v to GOB: %s", job, err)
}
ev := &events.Event{
- Stream: STREAM,
+ Stream: JOB_STREAM,
Timestamp: job.Created,
Data: buf.Bytes(),
}
@@ -130,12 +130,12 @@
return nil
}
-// computeAvgDuration is an events.DynamicAggregateFn that returns metrics for average Job duration
+// computeAvgJobDuration is an events.DynamicAggregateFn that returns metrics for average Job duration
// for Jobs with status SUCCESS or FAILURE, given a slice of Events created by jobEventDB.update.
// The first return value will contain the tags "job_name" (db.Job.Name) and "job_type" (one of
// "normal", "tryjob", "forced"), and the second return value will be the corresponding average Job
// duration. Returns an error if Event.Data can't be GOB-decoded as a db.Job.
-func computeAvgDuration(ev []*events.Event) ([]map[string]string, []float64, error) {
+func computeAvgJobDuration(ev []*events.Event) ([]map[string]string, []float64, error) {
if len(ev) > 0 {
// ev should be ordered by timestamp
sklog.Debugf("Calculating avg-duration for %d jobs since %s.", len(ev), ev[0].Timestamp)
@@ -196,12 +196,12 @@
return rvTags, rvVals, nil
}
-// computeAvgDuration is an events.DynamicAggregateFn that returns metrics for Job failure rate and
+// computeJobFailureMishapRate is an events.DynamicAggregateFn that returns metrics for Job failure rate and
// mishap rate, given a slice of Events created by jobEventDB.update. The first return value will
// contain the tags "job_name" (db.Job.Name) and "metric" (one of "failure-rate", "mishap-rate"),
// and the second return value will be the corresponding ratio of failed/mishap Jobs to all
// completed Jobs. Returns an error if Event.Data can't be GOB-decoded as a db.Job.
-func computeFailureMishapRate(ev []*events.Event) ([]map[string]string, []float64, error) {
+func computeJobFailureMishapRate(ev []*events.Event) ([]map[string]string, []float64, error) {
if len(ev) > 0 {
// ev should be ordered by timestamp
sklog.Debugf("Calculating failure-rate for %d jobs since %s.", len(ev), ev[0].Timestamp)
@@ -250,15 +250,15 @@
return rvTags, rvVals, nil
}
-// addAggregates adds aggregation functions for job events to the EventStream.
-func addAggregates(s *events.EventStream) error {
+// addJobAggregates adds aggregation functions for job events to the EventStream.
+func addJobAggregates(s *events.EventStream) error {
for _, period := range TIME_PERIODS {
- if err := s.DynamicMetric(map[string]string{"metric": "avg-duration"}, period, computeAvgDuration); err != nil {
+ if err := s.DynamicMetric(map[string]string{"metric": "avg-duration"}, period, computeAvgJobDuration); err != nil {
return err
}
// Job failure/mishap rate.
- if err := s.DynamicMetric(nil, period, computeFailureMishapRate); err != nil {
+ if err := s.DynamicMetric(nil, period, computeJobFailureMishapRate); err != nil {
return err
}
}
@@ -281,8 +281,8 @@
}
edb.em = em
- s := em.GetEventStream(STREAM)
- if err := addAggregates(s); err != nil {
+ s := em.GetEventStream(JOB_STREAM)
+ if err := addJobAggregates(s); err != nil {
return err
}
diff --git a/datahopper/go/datahopper/jobs_test.go b/datahopper/go/datahopper/jobs_test.go
index 7c4818a..be342f4 100644
--- a/datahopper/go/datahopper/jobs_test.go
+++ b/datahopper/go/datahopper/jobs_test.go
@@ -15,7 +15,7 @@
)
// Create a db.JobDB and jobEventDB.
-func setup(t *testing.T, now time.Time) (*jobEventDB, db.JobDB) {
+func setupJobs(t *testing.T, now time.Time) (*jobEventDB, db.JobDB) {
jdb := db.NewInMemoryJobDB()
edb := &jobEventDB{
cached: []*events.Event{},
@@ -45,20 +45,20 @@
return j
}
-// assertEvent checks that ev.Data contains j.
-func assertEvent(t *testing.T, ev *events.Event, j *db.Job) {
- assert.Equal(t, STREAM, ev.Stream)
+// assertJobEvent checks that ev.Data contains j.
+func assertJobEvent(t *testing.T, ev *events.Event, j *db.Job) {
+ assert.Equal(t, JOB_STREAM, ev.Stream)
var job db.Job
assert.NoError(t, gob.NewDecoder(bytes.NewReader(ev.Data)).Decode(&job))
deepequal.AssertDeepEqual(t, j, &job)
assert.True(t, j.Created.Equal(ev.Timestamp))
}
-// TestUpdate checks that jobEventDB.update creates the correct Events from Jobs in the DB.
-func TestUpdate(t *testing.T) {
+// TestJobUpdate checks that jobEventDB.update creates the correct Events from Jobs in the DB.
+func TestJobUpdate(t *testing.T) {
testutils.SmallTest(t)
now := time.Now()
- edb, jdb := setup(t, now)
+ edb, jdb := setupJobs(t, now)
start := now.Add(-TIME_PERIODS[len(TIME_PERIODS)-1])
jobs := []*db.Job{
// 0: Filtered out -- too early.
@@ -74,21 +74,21 @@
}
assert.NoError(t, jdb.PutJobs(jobs))
assert.NoError(t, edb.update())
- evs, err := edb.Range(STREAM, start.Add(-time.Hour), start.Add(time.Hour))
+ evs, err := edb.Range(JOB_STREAM, start.Add(-time.Hour), start.Add(time.Hour))
assert.NoError(t, err)
expected := append(jobs[1:3], jobs[4:8]...)
assert.Len(t, evs, len(expected))
for i, ev := range evs {
- assertEvent(t, ev, expected[i])
+ assertJobEvent(t, ev, expected[i])
}
}
-// TestRange checks that jobEventDB.Range returns Events within the given range.
-func TestRange(t *testing.T) {
+// TestJobRange checks that jobEventDB.Range returns Events within the given range.
+func TestJobRange(t *testing.T) {
testutils.SmallTest(t)
now := time.Now()
- edb, jdb := setup(t, now)
+ edb, jdb := setupJobs(t, now)
base := now.Add(-time.Hour)
jobs := []*db.Job{
makeJob(base.Add(-time.Nanosecond), "A", db.JOB_STATUS_SUCCESS, NORMAL, time.Minute),
@@ -100,11 +100,11 @@
assert.NoError(t, edb.update())
test := func(start, end time.Time, startIdx, count int) {
- evs, err := edb.Range(STREAM, start, end)
+ evs, err := edb.Range(JOB_STREAM, start, end)
assert.NoError(t, err)
assert.Len(t, evs, count)
for i, ev := range evs {
- assertEvent(t, ev, jobs[startIdx+i])
+ assertJobEvent(t, ev, jobs[startIdx+i])
}
}
before := base.Add(-time.Hour)
@@ -177,13 +177,13 @@
}
}
-func TestComputeAvgDuration(t *testing.T) {
+func TestComputeAvgJobDuration(t *testing.T) {
testutils.SmallTest(t)
now := time.Now()
- edb, jdb := setup(t, now)
+ edb, jdb := setupJobs(t, now)
created := now.Add(-time.Hour)
- tester := newDynamicAggregateFnTester(t, computeAvgDuration)
+ tester := newDynamicAggregateFnTester(t, computeAvgJobDuration)
expect := func(jobName string, jobType jobTypeString, jobs []*db.Job) {
var totalDur float64 = 0
for _, j := range jobs {
@@ -230,20 +230,20 @@
expect("AllTypes", FORCED, jobsType[8:9])
assert.NoError(t, edb.update())
- evs, err := edb.Range(STREAM, created.Add(-time.Hour), created.Add(time.Hour))
+ evs, err := edb.Range(JOB_STREAM, created.Add(-time.Hour), created.Add(time.Hour))
assert.NoError(t, err)
assert.Len(t, evs, len(jobsStatus)+len(jobsType))
tester.Run(evs)
}
-func TestComputeFailureMishapRate(t *testing.T) {
+func TestComputeJobFailureMishapRate(t *testing.T) {
testutils.SmallTest(t)
now := time.Now()
- edb, jdb := setup(t, now)
+ edb, jdb := setupJobs(t, now)
created := now.Add(-time.Hour)
- tester := newDynamicAggregateFnTester(t, computeFailureMishapRate)
+ tester := newDynamicAggregateFnTester(t, computeJobFailureMishapRate)
expect := func(jobName string, metric string, numer, denom int) {
tester.AddAssert(map[string]string{
"job_name": jobName,
@@ -322,7 +322,7 @@
}
assert.NoError(t, edb.update())
- evs, err := edb.Range(STREAM, created.Add(-time.Hour), created.Add(time.Hour))
+ evs, err := edb.Range(JOB_STREAM, created.Add(-time.Hour), created.Add(time.Hour))
assert.NoError(t, err)
assert.Len(t, evs, jobCount)
diff --git a/datahopper/go/datahopper/tasks.go b/datahopper/go/datahopper/tasks.go
new file mode 100644
index 0000000..65e79cb
--- /dev/null
+++ b/datahopper/go/datahopper/tasks.go
@@ -0,0 +1,207 @@
+package main
+
+import (
+ "bytes"
+ "context"
+ "encoding/gob"
+ "fmt"
+ "sort"
+ "sync"
+ "time"
+
+ "go.skia.org/infra/go/httputils"
+ "go.skia.org/infra/go/metrics2"
+ "go.skia.org/infra/go/metrics2/events"
+ "go.skia.org/infra/go/sklog"
+ "go.skia.org/infra/go/util"
+ "go.skia.org/infra/task_scheduler/go/db"
+ "go.skia.org/infra/task_scheduler/go/db/remote_db"
+ "go.skia.org/infra/task_scheduler/go/flakes"
+)
+
+const (
+ TASK_STREAM = "task_metrics"
+)
+
+// taskEventDB implements the events.EventDB interface.
+type taskEventDB struct {
+ cached []*events.Event
+ db db.TaskReader
+ em *events.EventMetrics
+ // Do not lock mtx when calling methods on em. Otherwise deadlock can occur.
+ // E.g. (now fixed):
+ // 1. Thread 1: EventManager.updateMetrics locks EventMetrics.mtx
+ // 2. Thread 2: taskEventDB.update locks taskEventDB.mtx
+ // 3. Thread 1: EventManager.updateMetrics calls taskEventDB.Range, which waits
+ // to lock taskEventDB.mtx
+ // 4. Thread 2: taskEventDB.update calls EventMetrics.AggregateMetric (by way
+ // of addTaskAggregates and EventStream.AggregateMetric), which waits to lock
+ // EventMetrics.mtx
+ mtx sync.Mutex
+}
+
+// See docs for events.EventDB interface.
+func (t *taskEventDB) Append(string, []byte) error {
+ return fmt.Errorf("taskEventDB is read-only!")
+}
+
+// See docs for events.EventDB interface.
+func (t *taskEventDB) Close() error {
+ return nil
+}
+
+// See docs for events.EventDB interface.
+func (t *taskEventDB) Insert(*events.Event) error {
+ return fmt.Errorf("taskEventDB is read-only!")
+}
+
+// See docs for events.EventDB interface.
+func (t *taskEventDB) Range(stream string, start, end time.Time) ([]*events.Event, error) {
+ t.mtx.Lock()
+ defer t.mtx.Unlock()
+
+ n := len(t.cached)
+ if n == 0 {
+ return []*events.Event{}, nil
+ }
+
+ first := sort.Search(n, func(i int) bool {
+ return !t.cached[i].Timestamp.Before(start)
+ })
+
+ last := first + sort.Search(n-first, func(i int) bool {
+ return !t.cached[i+first].Timestamp.Before(end)
+ })
+
+ rv := make([]*events.Event, last-first, last-first)
+ copy(rv[:], t.cached[first:last])
+ return rv, nil
+}
+
+// update updates the cached tasks in the taskEventDB. Only a single thread may
+// call this method, but it can be called concurrently with other methods.
+func (t *taskEventDB) update() error {
+ defer metrics2.FuncTimer().Stop()
+ now := time.Now()
+ longestPeriod := TIME_PERIODS[len(TIME_PERIODS)-1]
+ tasks, err := t.db.GetTasksFromDateRange(now.Add(-longestPeriod), now, "")
+ if err != nil {
+ return fmt.Errorf("Failed to load tasks from %s to %s: %s", now.Add(-longestPeriod), now, err)
+ }
+ sklog.Debugf("taskEventDB.update: Processing %d tasks for time range %s to %s.", len(tasks), now.Add(-longestPeriod), now)
+ cached := make([]*events.Event, 0, len(tasks))
+ for _, task := range tasks {
+ if !task.Done() {
+ continue
+ }
+ var buf bytes.Buffer
+ if err := gob.NewEncoder(&buf).Encode(task); err != nil {
+ return fmt.Errorf("Failed to encode %#v to GOB: %s", task, err)
+ }
+ ev := &events.Event{
+ Stream: TASK_STREAM,
+ Timestamp: task.Created,
+ Data: buf.Bytes(),
+ }
+ cached = append(cached, ev)
+ }
+ t.mtx.Lock()
+ defer t.mtx.Unlock()
+ t.cached = cached
+ return nil
+}
+
+// computeTaskFlakeRate is an events.DynamicAggregateFn that returns metrics for Task flake rate, given
+// a slice of Events created by taskEventDB.update. The first return value will contain the tags
+// "task_name" (db.Task.Name) and "metric" (one of "failure-rate", "mishap-rate"),
+// and the second return value will be the corresponding ratio of failed/mishap Task to all
+// completed Tasks. Returns an error if Event.Data can't be GOB-decoded as a db.Task.
+func computeTaskFlakeRate(ev []*events.Event) ([]map[string]string, []float64, error) {
+ if len(ev) > 0 {
+ // ev should be ordered by timestamp
+ sklog.Debugf("Calculating flake-rate for %d tasks since %s.", len(ev), ev[0].Timestamp)
+ }
+ type taskSum struct {
+ flakes int
+ count int
+ }
+ byTask := map[string]*taskSum{}
+ tasks := make([]*db.Task, 0, len(ev))
+ for _, e := range ev {
+ var task db.Task
+ if err := gob.NewDecoder(bytes.NewReader(e.Data)).Decode(&task); err != nil {
+ return nil, nil, err
+ }
+ tasks = append(tasks, &task)
+ entry, ok := byTask[task.Name]
+ if !ok {
+ entry = &taskSum{}
+ byTask[task.Name] = entry
+ }
+ entry.count++
+ }
+ flaky := flakes.FindFlakes(tasks)
+ for _, task := range flaky {
+ byTask[task.Name].flakes++
+ }
+ rvTags := make([]map[string]string, 0, len(byTask)*2)
+ rvVals := make([]float64, 0, len(byTask)*2)
+ add := func(taskName, metric string, value float64) {
+ rvTags = append(rvTags, map[string]string{
+ "task_name": taskName,
+ "metric": metric,
+ })
+ rvVals = append(rvVals, value)
+ }
+ for taskName, taskSum := range byTask {
+ if taskSum.count == 0 {
+ continue
+ }
+ add(taskName, "flake-rate", float64(taskSum.flakes)/float64(taskSum.count))
+ }
+ return rvTags, rvVals, nil
+}
+
+// addTaskAggregates adds aggregation functions for task events to the EventStream.
+func addTaskAggregates(s *events.EventStream) error {
+ for _, period := range TIME_PERIODS {
+ // Flake rate.
+ if err := s.DynamicMetric(nil, period, computeTaskFlakeRate); err != nil {
+ return err
+ }
+ }
+ return nil
+}
+
+// StartTaskMetrics starts a goroutine which ingests metrics data based on Tasks.
+func StartTaskMetrics(taskSchedulerDbUrl string, ctx context.Context) error {
+ db, err := remote_db.NewClient(taskSchedulerDbUrl, httputils.NewTimeoutClient())
+ if err != nil {
+ return err
+ }
+ edb := &taskEventDB{
+ cached: []*events.Event{},
+ db: db,
+ }
+ em, err := events.NewEventMetrics(edb, "task_metrics")
+ if err != nil {
+ return err
+ }
+ edb.em = em
+
+ s := em.GetEventStream(TASK_STREAM)
+ if err := addTaskAggregates(s); err != nil {
+ return err
+ }
+
+ lv := metrics2.NewLiveness("last_successful_task_metrics_update")
+ go util.RepeatCtx(5*time.Minute, ctx, func() {
+ if err := edb.update(); err != nil {
+ sklog.Errorf("Failed to update task data: %s", err)
+ } else {
+ lv.Reset()
+ }
+ })
+ em.Start(ctx)
+ return nil
+}
diff --git a/datahopper/go/datahopper/tasks_test.go b/datahopper/go/datahopper/tasks_test.go
new file mode 100644
index 0000000..cabd391
--- /dev/null
+++ b/datahopper/go/datahopper/tasks_test.go
@@ -0,0 +1,207 @@
+package main
+
+import (
+ "bytes"
+ "encoding/gob"
+ "testing"
+ "time"
+
+ assert "github.com/stretchr/testify/require"
+ "go.skia.org/infra/go/deepequal"
+ "go.skia.org/infra/go/metrics2/events"
+ "go.skia.org/infra/go/testutils"
+ "go.skia.org/infra/task_scheduler/go/db"
+)
+
+// Create a db.TaskDB and taskEventDB.
+func setupTasks(t *testing.T, now time.Time) (*taskEventDB, db.TaskDB) {
+ tdb := db.NewInMemoryTaskDB()
+ edb := &taskEventDB{
+ cached: []*events.Event{},
+ db: tdb,
+ }
+ return edb, tdb
+}
+
+// makeTask returns a fake task with only the fields relevant to this test set.
+func makeTask(created time.Time, name string, status db.TaskStatus) *db.Task {
+ task := &db.Task{
+ Created: created,
+ TaskKey: db.TaskKey{
+ Name: name,
+ },
+ Status: status,
+ }
+ return task
+}
+
+// assertTaskEvent checks that ev.Data contains task.
+func assertTaskEvent(t *testing.T, ev *events.Event, task *db.Task) {
+ assert.Equal(t, TASK_STREAM, ev.Stream)
+ var other db.Task
+ assert.NoError(t, gob.NewDecoder(bytes.NewReader(ev.Data)).Decode(&other))
+ deepequal.AssertDeepEqual(t, task, &other)
+ assert.True(t, task.Created.Equal(ev.Timestamp))
+}
+
+// TestTaskUpdate checks that taskEventDB.update creates the correct Events from Tasks in the DB.
+func TestTaskUpdate(t *testing.T) {
+ testutils.SmallTest(t)
+ now := time.Now()
+ edb, tdb := setupTasks(t, now)
+ start := now.Add(-TIME_PERIODS[len(TIME_PERIODS)-1])
+ tasks := []*db.Task{
+ // 0: Filtered out -- too early.
+ makeTask(start.Add(-time.Minute), "A", db.TASK_STATUS_SUCCESS),
+ makeTask(start.Add(time.Minute), "A", db.TASK_STATUS_SUCCESS),
+ makeTask(start.Add(2*time.Minute), "A", db.TASK_STATUS_FAILURE),
+ // 3: Filtered out -- not Done.
+ makeTask(start.Add(3*time.Minute), "A", db.TASK_STATUS_RUNNING),
+ makeTask(start.Add(4*time.Minute), "A", db.TASK_STATUS_MISHAP),
+ makeTask(start.Add(5*time.Minute), "A", db.TASK_STATUS_FAILURE),
+ makeTask(start.Add(6*time.Minute), "B", db.TASK_STATUS_SUCCESS),
+ makeTask(start.Add(7*time.Minute), "A", db.TASK_STATUS_SUCCESS),
+ }
+ assert.NoError(t, tdb.PutTasks(tasks))
+ assert.NoError(t, edb.update())
+ evs, err := edb.Range(TASK_STREAM, start.Add(-time.Hour), start.Add(time.Hour))
+ assert.NoError(t, err)
+
+ expected := append(tasks[1:3], tasks[4:8]...)
+ assert.Len(t, evs, len(expected))
+ for i, ev := range evs {
+ assertTaskEvent(t, ev, expected[i])
+ }
+}
+
+// TestTaskRange checks that taskEventDB.Range returns Events within the given range.
+func TestTaskRange(t *testing.T) {
+ testutils.SmallTest(t)
+ now := time.Now()
+ edb, tdb := setupTasks(t, now)
+ base := now.Add(-time.Hour)
+ tasks := []*db.Task{
+ makeTask(base.Add(-time.Nanosecond), "A", db.TASK_STATUS_SUCCESS),
+ makeTask(base, "A", db.TASK_STATUS_SUCCESS),
+ makeTask(base.Add(time.Nanosecond), "A", db.TASK_STATUS_SUCCESS),
+ makeTask(base.Add(time.Minute), "A", db.TASK_STATUS_SUCCESS),
+ }
+ assert.NoError(t, tdb.PutTasks(tasks))
+ assert.NoError(t, edb.update())
+
+ test := func(start, end time.Time, startIdx, count int) {
+ evs, err := edb.Range(TASK_STREAM, start, end)
+ assert.NoError(t, err)
+ assert.Len(t, evs, count)
+ for i, ev := range evs {
+ assertTaskEvent(t, ev, tasks[startIdx+i])
+ }
+ }
+ before := base.Add(-time.Hour)
+ after := base.Add(time.Hour)
+ test(before, before, -1, 0)
+ test(before, tasks[0].Created, -1, 0)
+ test(before, tasks[1].Created, 0, 1)
+ test(before, tasks[2].Created, 0, 2)
+ test(before, tasks[3].Created, 0, 3)
+ test(before, after, 0, 4)
+ test(tasks[0].Created, before, -1, 0)
+ test(tasks[0].Created, tasks[0].Created, -1, 0)
+ test(tasks[0].Created, tasks[1].Created, 0, 1)
+ test(tasks[0].Created, tasks[2].Created, 0, 2)
+ test(tasks[0].Created, tasks[3].Created, 0, 3)
+ test(tasks[0].Created, after, 0, 4)
+ test(tasks[1].Created, tasks[0].Created, -1, 0)
+ test(tasks[1].Created, tasks[1].Created, -1, 0)
+ test(tasks[1].Created, tasks[2].Created, 1, 1)
+ test(tasks[1].Created, tasks[3].Created, 1, 2)
+ test(tasks[1].Created, after, 1, 3)
+ test(tasks[2].Created, tasks[2].Created, -1, 0)
+ test(tasks[2].Created, tasks[3].Created, 2, 1)
+ test(tasks[2].Created, after, 2, 2)
+ test(tasks[3].Created, tasks[3].Created, -1, 0)
+ test(tasks[3].Created, after, 3, 1)
+ test(after, after, -1, 0)
+}
+
+func TestComputeTaskFlakeRate(t *testing.T) {
+ testutils.SmallTest(t)
+ now := time.Now()
+ edb, tdb := setupTasks(t, now)
+ created := now.Add(-time.Hour)
+
+ tester := newDynamicAggregateFnTester(t, computeTaskFlakeRate)
+ expect := func(taskName string, metric string, numer, denom int) {
+ tester.AddAssert(map[string]string{
+ "task_name": taskName,
+ "metric": metric,
+ }, float64(numer)/float64(denom))
+ }
+
+ taskCount := 0
+ addTask := func(name, commit string, status db.TaskStatus) {
+ taskCount++
+ task := makeTask(created, name, status)
+ task.Revision = commit
+ assert.NoError(t, tdb.PutTask(task))
+ }
+ {
+ name := "NoFlakes"
+ addTask(name, "a", db.TASK_STATUS_SUCCESS)
+ addTask(name, "b", db.TASK_STATUS_SUCCESS)
+ addTask(name, "c", db.TASK_STATUS_SUCCESS)
+ addTask(name, "d", db.TASK_STATUS_FAILURE)
+ addTask(name, "d", db.TASK_STATUS_FAILURE)
+ expect(name, "flake-rate", 0, 5)
+ }
+ {
+ name := "Mishaps"
+ addTask(name, "a", db.TASK_STATUS_FAILURE)
+ addTask(name, "b", db.TASK_STATUS_FAILURE)
+ addTask(name, "c", db.TASK_STATUS_FAILURE)
+ addTask(name, "c", db.TASK_STATUS_MISHAP)
+ expect(name, "flake-rate", 1, 4)
+ }
+ {
+ name := "RetrySucceeded"
+ addTask(name, "a", db.TASK_STATUS_SUCCESS)
+ addTask(name, "b", db.TASK_STATUS_FAILURE)
+ addTask(name, "b", db.TASK_STATUS_SUCCESS)
+ expect(name, "flake-rate", 1, 3)
+ }
+ {
+ name := "RetryFailed"
+ addTask(name, "a", db.TASK_STATUS_FAILURE)
+ addTask(name, "a", db.TASK_STATUS_FAILURE)
+ expect(name, "flake-rate", 0, 2)
+ }
+ {
+ name := "Mix"
+ addTask(name, "a", db.TASK_STATUS_SUCCESS)
+ addTask(name, "b", db.TASK_STATUS_FAILURE)
+ addTask(name, "c", db.TASK_STATUS_FAILURE)
+ addTask(name, "b", db.TASK_STATUS_FAILURE)
+ addTask(name, "c", db.TASK_STATUS_SUCCESS)
+ addTask(name, "d", db.TASK_STATUS_MISHAP)
+ addTask(name, "d", db.TASK_STATUS_SUCCESS)
+ expect(name, "flake-rate", 2, 7)
+ }
+ {
+ name := "LongRetryChain"
+ addTask(name, "a", db.TASK_STATUS_FAILURE)
+ addTask(name, "a", db.TASK_STATUS_FAILURE)
+ addTask(name, "a", db.TASK_STATUS_FAILURE)
+ addTask(name, "a", db.TASK_STATUS_FAILURE)
+ addTask(name, "a", db.TASK_STATUS_FAILURE)
+ addTask(name, "a", db.TASK_STATUS_FAILURE)
+ addTask(name, "a", db.TASK_STATUS_SUCCESS)
+ expect(name, "flake-rate", 6, 7)
+ }
+
+ assert.NoError(t, edb.update())
+ evs, err := edb.Range(TASK_STREAM, created.Add(-time.Hour), created.Add(time.Hour))
+ assert.NoError(t, err)
+ assert.Len(t, evs, taskCount)
+
+ tester.Run(evs)
+}
diff --git a/task_scheduler/go/flakes/flakes.go b/task_scheduler/go/flakes/flakes.go
new file mode 100644
index 0000000..f447d61
--- /dev/null
+++ b/task_scheduler/go/flakes/flakes.go
@@ -0,0 +1,44 @@
+package flakes
+
+/*
+ Find flakily-failed tasks in a time window.
+*/
+
+import (
+ "go.skia.org/infra/task_scheduler/go/db"
+)
+
+// FindFlakes returns the flakily-failed tasks in the given slice of tasks.
+func FindFlakes(tasks []*db.Task) []*db.Task {
+ tasksMap := map[db.TaskKey][]*db.Task{}
+ for _, task := range tasks {
+ if task.Done() {
+ tasksMap[task.TaskKey] = append(tasksMap[task.TaskKey], task)
+ }
+ }
+ flaky := []*db.Task{}
+ for _, tasks := range tasksMap {
+ // If one or more tasks succeeded and failed, then all failures
+ // are flakes.
+ success := 0
+ failure := 0
+ for _, task := range tasks {
+ if task.Status == db.TASK_STATUS_SUCCESS {
+ success++
+ } else if task.Status == db.TASK_STATUS_FAILURE {
+ failure++
+ } else if task.Status == db.TASK_STATUS_MISHAP {
+ // Mishaps are flakes by definition.
+ flaky = append(flaky, task)
+ }
+ }
+ if success > 0 && failure > 0 {
+ for _, task := range tasks {
+ if task.Status == db.TASK_STATUS_FAILURE {
+ flaky = append(flaky, task)
+ }
+ }
+ }
+ }
+ return flaky
+}