blob: 9e9a1694ce3e1c4f63abc72a60647a8caea7f529 [file] [log] [blame]
package main
import (
"bytes"
"context"
"encoding/gob"
"testing"
"time"
"github.com/stretchr/testify/require"
"go.skia.org/infra/go/deepequal/assertdeep"
"go.skia.org/infra/go/metrics2/events"
"go.skia.org/infra/go/sklog"
"go.skia.org/infra/go/testutils/unittest"
"go.skia.org/infra/task_scheduler/go/db"
"go.skia.org/infra/task_scheduler/go/db/cache"
"go.skia.org/infra/task_scheduler/go/db/memory"
"go.skia.org/infra/task_scheduler/go/types"
"go.skia.org/infra/task_scheduler/go/window"
)
// Create a db.TaskDB and taskEventDB, a channel which should be read from
// immediately after every Put into the TaskDB, and a cleanup function which
// should be deferred.
func setupTasks(t *testing.T, now time.Time) (*taskEventDB, db.TaskDB, <-chan struct{}, func()) {
ctx, cancel := context.WithCancel(context.Background())
tdb := memory.NewInMemoryTaskDB()
period := TIME_PERIODS[len(TIME_PERIODS)-1]
w, err := window.New(period, 0, nil)
if err != nil {
sklog.Fatalf("Failed to create time window: %s", err)
}
wait := make(chan struct{})
tCache, err := cache.NewTaskCache(ctx, tdb, w, func() {
wait <- struct{}{}
})
if err != nil {
sklog.Fatalf("Failed to create task cache: %s", err)
}
<-wait
edb := &taskEventDB{
cached: []*events.Event{},
tCache: tCache,
}
return edb, tdb, wait, cancel
}
// makeTask returns a fake task with only the fields relevant to this test set.
func makeTask(created time.Time, name string, status types.TaskStatus) *types.Task {
task := &types.Task{
Created: created,
TaskKey: types.TaskKey{
Name: name,
},
Status: status,
}
return task
}
// assertTaskEvent checks that ev.Data contains task.
func assertTaskEvent(t *testing.T, ev *events.Event, task *types.Task) {
require.Equal(t, TASK_STREAM, ev.Stream)
var other types.Task
require.NoError(t, gob.NewDecoder(bytes.NewReader(ev.Data)).Decode(&other))
assertdeep.Equal(t, task, &other)
require.True(t, task.Created.Equal(ev.Timestamp))
}
// TestTaskUpdate checks that taskEventDB.update creates the correct Events from Tasks in the DB.
func TestTaskUpdate(t *testing.T) {
unittest.SmallTest(t)
now := time.Now()
edb, tdb, wait, cancel := setupTasks(t, now)
defer cancel()
start := now.Add(-TIME_PERIODS[len(TIME_PERIODS)-1])
tasks := []*types.Task{
// 0: Filtered out -- too early.
makeTask(start.Add(-time.Minute), "A", types.TASK_STATUS_SUCCESS),
makeTask(start.Add(time.Minute), "A", types.TASK_STATUS_SUCCESS),
makeTask(start.Add(2*time.Minute), "A", types.TASK_STATUS_FAILURE),
// 3: Filtered out -- not Done.
makeTask(start.Add(3*time.Minute), "A", types.TASK_STATUS_RUNNING),
makeTask(start.Add(4*time.Minute), "A", types.TASK_STATUS_MISHAP),
makeTask(start.Add(5*time.Minute), "A", types.TASK_STATUS_FAILURE),
makeTask(start.Add(6*time.Minute), "B", types.TASK_STATUS_SUCCESS),
makeTask(start.Add(7*time.Minute), "A", types.TASK_STATUS_SUCCESS),
}
require.NoError(t, tdb.PutTasks(tasks))
<-wait
require.NoError(t, edb.update())
evs, err := edb.Range(TASK_STREAM, start.Add(-time.Hour), start.Add(time.Hour))
require.NoError(t, err)
expected := append(tasks[1:3], tasks[4:8]...)
require.Len(t, evs, len(expected))
for i, ev := range evs {
assertTaskEvent(t, ev, expected[i])
}
}
// TestTaskRange checks that taskEventDB.Range returns Events within the given range.
func TestTaskRange(t *testing.T) {
unittest.SmallTest(t)
now := time.Now()
edb, tdb, wait, cancel := setupTasks(t, now)
defer cancel()
base := now.Add(-time.Hour)
tasks := []*types.Task{
makeTask(base.Add(-time.Nanosecond), "A", types.TASK_STATUS_SUCCESS),
makeTask(base, "A", types.TASK_STATUS_SUCCESS),
makeTask(base.Add(time.Nanosecond), "A", types.TASK_STATUS_SUCCESS),
makeTask(base.Add(time.Minute), "A", types.TASK_STATUS_SUCCESS),
}
require.NoError(t, tdb.PutTasks(tasks))
<-wait
require.NoError(t, edb.update())
test := func(start, end time.Time, startIdx, count int) {
evs, err := edb.Range(TASK_STREAM, start, end)
require.NoError(t, err)
require.Len(t, evs, count)
for i, ev := range evs {
assertTaskEvent(t, ev, tasks[startIdx+i])
}
}
before := base.Add(-time.Hour)
after := base.Add(time.Hour)
test(before, before, -1, 0)
test(before, tasks[0].Created, -1, 0)
test(before, tasks[1].Created, 0, 1)
test(before, tasks[2].Created, 0, 2)
test(before, tasks[3].Created, 0, 3)
test(before, after, 0, 4)
test(tasks[0].Created, before, -1, 0)
test(tasks[0].Created, tasks[0].Created, -1, 0)
test(tasks[0].Created, tasks[1].Created, 0, 1)
test(tasks[0].Created, tasks[2].Created, 0, 2)
test(tasks[0].Created, tasks[3].Created, 0, 3)
test(tasks[0].Created, after, 0, 4)
test(tasks[1].Created, tasks[0].Created, -1, 0)
test(tasks[1].Created, tasks[1].Created, -1, 0)
test(tasks[1].Created, tasks[2].Created, 1, 1)
test(tasks[1].Created, tasks[3].Created, 1, 2)
test(tasks[1].Created, after, 1, 3)
test(tasks[2].Created, tasks[2].Created, -1, 0)
test(tasks[2].Created, tasks[3].Created, 2, 1)
test(tasks[2].Created, after, 2, 2)
test(tasks[3].Created, tasks[3].Created, -1, 0)
test(tasks[3].Created, after, 3, 1)
test(after, after, -1, 0)
}
func TestComputeTaskFlakeRate(t *testing.T) {
unittest.SmallTest(t)
now := time.Now()
edb, tdb, wait, cancel := setupTasks(t, now)
defer cancel()
created := now.Add(-time.Hour)
tester := newDynamicAggregateFnTester(t, computeTaskFlakeRate)
expect := func(taskName string, metric string, numer, denom int) {
tester.AddAssert(map[string]string{
"task_name": taskName,
"metric": metric,
}, float64(numer)/float64(denom))
}
taskCount := 0
addTask := func(name, commit string, status types.TaskStatus) {
taskCount++
task := makeTask(created, name, status)
task.Revision = commit
require.NoError(t, tdb.PutTask(task))
<-wait
}
{
name := "NoFlakes"
addTask(name, "a", types.TASK_STATUS_SUCCESS)
addTask(name, "b", types.TASK_STATUS_SUCCESS)
addTask(name, "c", types.TASK_STATUS_SUCCESS)
addTask(name, "d", types.TASK_STATUS_FAILURE)
addTask(name, "d", types.TASK_STATUS_FAILURE)
expect(name, "flake-rate", 0, 5)
}
{
name := "Mishaps"
addTask(name, "a", types.TASK_STATUS_FAILURE)
addTask(name, "b", types.TASK_STATUS_FAILURE)
addTask(name, "c", types.TASK_STATUS_FAILURE)
addTask(name, "c", types.TASK_STATUS_MISHAP)
expect(name, "flake-rate", 1, 4)
}
{
name := "RetrySucceeded"
addTask(name, "a", types.TASK_STATUS_SUCCESS)
addTask(name, "b", types.TASK_STATUS_FAILURE)
addTask(name, "b", types.TASK_STATUS_SUCCESS)
expect(name, "flake-rate", 1, 3)
}
{
name := "RetryFailed"
addTask(name, "a", types.TASK_STATUS_FAILURE)
addTask(name, "a", types.TASK_STATUS_FAILURE)
expect(name, "flake-rate", 0, 2)
}
{
name := "Mix"
addTask(name, "a", types.TASK_STATUS_SUCCESS)
addTask(name, "b", types.TASK_STATUS_FAILURE)
addTask(name, "c", types.TASK_STATUS_FAILURE)
addTask(name, "b", types.TASK_STATUS_FAILURE)
addTask(name, "c", types.TASK_STATUS_SUCCESS)
addTask(name, "d", types.TASK_STATUS_MISHAP)
addTask(name, "d", types.TASK_STATUS_SUCCESS)
expect(name, "flake-rate", 2, 7)
}
{
name := "LongRetryChain"
addTask(name, "a", types.TASK_STATUS_FAILURE)
addTask(name, "a", types.TASK_STATUS_FAILURE)
addTask(name, "a", types.TASK_STATUS_FAILURE)
addTask(name, "a", types.TASK_STATUS_FAILURE)
addTask(name, "a", types.TASK_STATUS_FAILURE)
addTask(name, "a", types.TASK_STATUS_FAILURE)
addTask(name, "a", types.TASK_STATUS_SUCCESS)
expect(name, "flake-rate", 6, 7)
}
require.NoError(t, edb.update())
evs, err := edb.Range(TASK_STREAM, created.Add(-time.Hour), created.Add(time.Hour))
require.NoError(t, err)
require.Len(t, evs, taskCount)
tester.Run(evs)
}