blob: ae9bba893e3e4500b72fd6bd1cecfbb733b04884 [file] [log] [blame]
package cache
import (
"context"
"sort"
"testing"
"time"
"github.com/stretchr/testify/require"
"go.skia.org/infra/go/deepequal/assertdeep"
"go.skia.org/infra/go/testutils/unittest"
"go.skia.org/infra/go/util"
"go.skia.org/infra/task_scheduler/go/db"
"go.skia.org/infra/task_scheduler/go/db/memory"
"go.skia.org/infra/task_scheduler/go/types"
"go.skia.org/infra/task_scheduler/go/window"
)
func testGetTasksForCommits(t *testing.T, c TaskCache, b *types.Task) {
for _, commit := range b.Commits {
found, err := c.GetTaskForCommit(types.DEFAULT_TEST_REPO, commit, b.Name)
require.NoError(t, err)
assertdeep.Equal(t, b, found)
tasks, err := c.GetTasksForCommits(types.DEFAULT_TEST_REPO, []string{commit})
require.NoError(t, err)
assertdeep.Equal(t, b, tasks[commit][b.Name])
}
}
func TestTaskCache(t *testing.T) {
unittest.SmallTest(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
d := memory.NewInMemoryTaskDB()
// Pre-load a task into the DB.
startTime := time.Now().Add(-30 * time.Minute) // Arbitrary starting point.
t1 := types.MakeTestTask(startTime, []string{"a", "b", "c", "d"})
require.NoError(t, d.PutTask(t1))
d.Wait()
// Create the cache. Ensure that the existing task is present.
w, err := window.New(time.Hour, 0, nil)
require.NoError(t, err)
wait := make(chan struct{})
c, err := NewTaskCache(ctx, d, w, func() {
wait <- struct{}{}
})
require.NoError(t, err)
<-wait
testGetTasksForCommits(t, c, t1)
// Bisect the first task.
t2 := types.MakeTestTask(startTime.Add(time.Minute), []string{"c", "d"})
t1.Commits = []string{"a", "b"}
require.NoError(t, d.PutTasks([]*types.Task{t2, t1}))
d.Wait()
<-wait
require.NoError(t, c.Update())
// Ensure that t2 (and not t1) shows up for commits "c" and "d".
testGetTasksForCommits(t, c, t1)
testGetTasksForCommits(t, c, t2)
// Insert a task on a second bot.
t3 := types.MakeTestTask(startTime.Add(2*time.Minute), []string{"a", "b"})
t3.Name = "Another-Task"
require.NoError(t, d.PutTask(t3))
d.Wait()
<-wait
require.NoError(t, c.Update())
tasks, err := c.GetTasksForCommits(types.DEFAULT_TEST_REPO, []string{"b"})
require.NoError(t, err)
assertdeep.Equal(t, map[string]map[string]*types.Task{
"b": {
t1.Name: t1,
t3.Name: t3,
},
}, tasks)
// Ensure that we don't insert outdated entries.
old := t1.Copy()
require.False(t, util.TimeIsZero(old.DbModified))
old.Name = "outdated"
old.DbModified = old.DbModified.Add(-time.Hour)
c.(*taskCache).modified[old.Id] = old
require.NoError(t, c.Update())
got, err := c.GetTask(old.Id)
require.NoError(t, err)
assertdeep.Equal(t, got, t1)
}
func TestTaskCacheKnownTaskName(t *testing.T) {
unittest.SmallTest(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
d := memory.NewInMemoryTaskDB()
w, err := window.New(time.Hour, 0, nil)
require.NoError(t, err)
wait := make(chan struct{})
c, err := NewTaskCache(ctx, d, w, func() {
wait <- struct{}{}
})
require.NoError(t, err)
<-wait
// Try jobs don't count toward KnownTaskName.
startTime := time.Now().Add(-30 * time.Minute) // Arbitrary starting point.
t1 := types.MakeTestTask(startTime, []string{"a", "b", "c", "d"})
t1.Server = "fake-server"
t1.Issue = "fake-issue"
t1.Patchset = "fake-patchset"
require.NoError(t, d.PutTask(t1))
d.Wait()
<-wait
require.NoError(t, c.Update())
require.False(t, c.KnownTaskName(t1.Repo, t1.Name))
// Forced jobs don't count toward KnownTaskName.
t2 := types.MakeTestTask(startTime, []string{"a", "b", "c", "d"})
t2.ForcedJobId = "job-id"
require.NoError(t, d.PutTask(t2))
d.Wait()
<-wait
require.NoError(t, c.Update())
require.False(t, c.KnownTaskName(t2.Repo, t2.Name))
// Normal task.
t3 := types.MakeTestTask(startTime, []string{"a", "b", "c", "d"})
require.NoError(t, d.PutTask(t3))
d.Wait()
<-wait
require.NoError(t, c.Update())
require.True(t, c.KnownTaskName(t3.Repo, t3.Name))
}
func TestTaskCacheGetTasksFromDateRange(t *testing.T) {
unittest.SmallTest(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
d := memory.NewInMemoryTaskDB()
// Pre-load a task into the DB.
timeStart := time.Now().Add(-30 * time.Minute) // Arbitrary starting point.
t1 := types.MakeTestTask(timeStart.Add(time.Nanosecond), []string{"a", "b", "c", "d"})
require.NoError(t, d.PutTask(t1))
d.Wait()
// Create the cache.
w, err := window.New(time.Hour, 0, nil)
require.NoError(t, err)
wait := make(chan struct{})
c, err := NewTaskCache(ctx, d, w, func() {
wait <- struct{}{}
})
require.NoError(t, err)
<-wait
// Insert two more tasks. Ensure at least 1 nanosecond between task Created
// times so that t1After != t2Before and t2After != t3Before.
t2 := types.MakeTestTask(timeStart.Add(2*time.Nanosecond), []string{"e", "f"})
t3 := types.MakeTestTask(timeStart.Add(3*time.Nanosecond), []string{"g", "h"})
require.NoError(t, d.PutTasks([]*types.Task{t2, t3}))
d.Wait()
<-wait
require.NoError(t, c.Update())
// Ensure that all tasks show up in the correct time ranges, in sorted order.
t1Before := t1.Created
t1After := t1Before.Add(1 * time.Nanosecond)
t2Before := t2.Created
t2After := t2Before.Add(1 * time.Nanosecond)
t3Before := t3.Created
t3After := t3Before.Add(1 * time.Nanosecond)
timeEnd := timeStart.Add(4 * time.Nanosecond)
tasks, err := c.GetTasksFromDateRange(timeStart, t1Before)
require.NoError(t, err)
require.Equal(t, 0, len(tasks))
tasks, err = c.GetTasksFromDateRange(timeStart, t1After)
require.NoError(t, err)
assertdeep.Equal(t, []*types.Task{t1}, tasks)
tasks, err = c.GetTasksFromDateRange(timeStart, t2Before)
require.NoError(t, err)
assertdeep.Equal(t, []*types.Task{t1}, tasks)
tasks, err = c.GetTasksFromDateRange(timeStart, t2After)
require.NoError(t, err)
assertdeep.Equal(t, []*types.Task{t1, t2}, tasks)
tasks, err = c.GetTasksFromDateRange(timeStart, t3Before)
require.NoError(t, err)
assertdeep.Equal(t, []*types.Task{t1, t2}, tasks)
tasks, err = c.GetTasksFromDateRange(timeStart, t3After)
require.NoError(t, err)
assertdeep.Equal(t, []*types.Task{t1, t2, t3}, tasks)
tasks, err = c.GetTasksFromDateRange(timeStart, timeEnd)
require.NoError(t, err)
assertdeep.Equal(t, []*types.Task{t1, t2, t3}, tasks)
tasks, err = c.GetTasksFromDateRange(t1Before, timeEnd)
require.NoError(t, err)
assertdeep.Equal(t, []*types.Task{t1, t2, t3}, tasks)
tasks, err = c.GetTasksFromDateRange(t1After, timeEnd)
require.NoError(t, err)
assertdeep.Equal(t, []*types.Task{t2, t3}, tasks)
tasks, err = c.GetTasksFromDateRange(t2Before, timeEnd)
require.NoError(t, err)
assertdeep.Equal(t, []*types.Task{t2, t3}, tasks)
tasks, err = c.GetTasksFromDateRange(t2After, timeEnd)
require.NoError(t, err)
assertdeep.Equal(t, []*types.Task{t3}, tasks)
tasks, err = c.GetTasksFromDateRange(t3Before, timeEnd)
require.NoError(t, err)
assertdeep.Equal(t, []*types.Task{t3}, tasks)
tasks, err = c.GetTasksFromDateRange(t3After, timeEnd)
require.NoError(t, err)
assertdeep.Equal(t, []*types.Task{}, tasks)
}
func TestTaskCacheMultiRepo(t *testing.T) {
unittest.SmallTest(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
d := memory.NewInMemoryTaskDB()
// Insert several tasks with different repos.
startTime := time.Now().Add(-30 * time.Minute) // Arbitrary starting point.
t1 := types.MakeTestTask(startTime, []string{"a", "b"}) // Default Repo.
t2 := types.MakeTestTask(startTime, []string{"a", "b"})
t2.Repo = "thats-what-you.git"
t3 := types.MakeTestTask(startTime, []string{"b", "c"})
t3.Repo = "never-for.git"
require.NoError(t, d.PutTasks([]*types.Task{t1, t2, t3}))
d.Wait()
// Create the cache.
w, err := window.New(time.Hour, 0, nil)
require.NoError(t, err)
c, err := NewTaskCache(ctx, d, w, nil)
require.NoError(t, err)
// Check that there's no conflict among the tasks in different repos.
{
tasks, err := c.GetTasksForCommits(t1.Repo, []string{"a", "b", "c"})
require.NoError(t, err)
assertdeep.Equal(t, map[string]map[string]*types.Task{
"a": {
t1.Name: t1,
},
"b": {
t1.Name: t1,
},
"c": {},
}, tasks)
}
{
tasks, err := c.GetTasksForCommits(t2.Repo, []string{"a", "b", "c"})
require.NoError(t, err)
assertdeep.Equal(t, map[string]map[string]*types.Task{
"a": {
t1.Name: t2,
},
"b": {
t1.Name: t2,
},
"c": {},
}, tasks)
}
{
tasks, err := c.GetTasksForCommits(t3.Repo, []string{"a", "b", "c"})
require.NoError(t, err)
assertdeep.Equal(t, map[string]map[string]*types.Task{
"a": {},
"b": {
t1.Name: t3,
},
"c": {
t1.Name: t3,
},
}, tasks)
}
}
func TestTaskCacheUnfinished(t *testing.T) {
unittest.SmallTest(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
d := memory.NewInMemoryTaskDB()
// Insert a task.
startTime := time.Now().Add(-30 * time.Minute)
t1 := types.MakeTestTask(startTime, []string{"a"})
require.False(t, t1.Done())
require.NoError(t, d.PutTask(t1))
d.Wait()
// Add a pending task with no swarming ID to test that it won't appear
// in UnfinishedTasks.
fakeTask := types.MakeTestTask(startTime, []string{"b"})
fakeTask.SwarmingTaskId = ""
require.NoError(t, d.PutTask(fakeTask))
d.Wait()
// Create the cache. Ensure that the existing task is present.
w, err := window.New(time.Hour, 0, nil)
require.NoError(t, err)
wait := make(chan struct{})
c, err := NewTaskCache(ctx, d, w, func() {
wait <- struct{}{}
})
require.NoError(t, err)
<-wait
tasks, err := c.UnfinishedTasks()
require.NoError(t, err)
assertdeep.Equal(t, []*types.Task{t1}, tasks)
// Finish the task. Insert it, ensure that it's not unfinished.
t1.Status = types.TASK_STATUS_SUCCESS
require.True(t, t1.Done())
require.NoError(t, d.PutTask(t1))
d.Wait()
<-wait
require.NoError(t, c.Update())
tasks, err = c.UnfinishedTasks()
require.NoError(t, err)
assertdeep.Equal(t, []*types.Task{}, tasks)
// Already-finished task.
t2 := types.MakeTestTask(time.Now(), []string{"a"})
t2.Status = types.TASK_STATUS_MISHAP
require.True(t, t2.Done())
require.NoError(t, d.PutTask(t2))
d.Wait()
<-wait
require.NoError(t, c.Update())
tasks, err = c.UnfinishedTasks()
require.NoError(t, err)
assertdeep.Equal(t, []*types.Task{}, tasks)
// An unfinished task, created after the cache was created.
t3 := types.MakeTestTask(time.Now(), []string{"b"})
require.False(t, t3.Done())
require.NoError(t, d.PutTask(t3))
d.Wait()
<-wait
require.NoError(t, c.Update())
tasks, err = c.UnfinishedTasks()
require.NoError(t, err)
assertdeep.Equal(t, []*types.Task{t3}, tasks)
// Update the task.
t3.Commits = []string{"c", "d", "f"}
require.False(t, t3.Done())
require.NoError(t, d.PutTask(t3))
d.Wait()
<-wait
require.NoError(t, c.Update())
tasks, err = c.UnfinishedTasks()
require.NoError(t, err)
assertdeep.Equal(t, []*types.Task{t3}, tasks)
}
// assertTaskInSlice fails the test if task is not deep-equal to an element of
// slice.
func assertTaskInSlice(t *testing.T, task *types.Task, slice []*types.Task) {
for _, other := range slice {
if task.Id == other.Id {
assertdeep.Equal(t, task, other)
return
}
}
t.Fatalf("Did not find task %v in %v.", task, slice)
}
// assertTasksNotCached checks that none of tasks are retrievable from c.
func assertTasksNotCached(t *testing.T, c TaskCache, tasks []*types.Task) {
byTimeTasks, err := c.GetTasksFromDateRange(time.Date(1900, time.January, 1, 0, 0, 0, 0, time.UTC), time.Date(9999, time.January, 1, 0, 0, 0, 0, time.UTC))
require.NoError(t, err)
unfinishedTasks, err := c.UnfinishedTasks()
require.NoError(t, err)
for _, task := range tasks {
_, err := c.GetTask(task.Id)
require.Error(t, err)
require.True(t, db.IsNotFound(err))
for _, commit := range task.Commits {
found, err := c.GetTaskForCommit(types.DEFAULT_TEST_REPO, commit, task.Name)
require.NoError(t, err)
require.Nil(t, found)
tasks, err := c.GetTasksForCommits(types.DEFAULT_TEST_REPO, []string{commit})
require.NoError(t, err)
_, ok := tasks[commit][task.Name]
require.False(t, ok)
}
for _, other := range byTimeTasks {
if task.Id == other.Id {
t.Errorf("Found unexpected task %v in GetTasksFromDateRange.", task)
}
}
for _, other := range unfinishedTasks {
if task.Id == other.Id {
t.Errorf("Found unexpected task %v in UnfinishedTasks.", task)
}
}
}
}
func TestTaskCacheExpiration(t *testing.T) {
unittest.SmallTest(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
d := memory.NewInMemoryTaskDB()
period := 10 * time.Minute
w, err := window.New(period, 0, nil)
require.NoError(t, err)
timeStart := w.EarliestStart()
// Make a bunch of tasks with various timestamps.
mk := func(mins int, name string, blame []string) *types.Task {
t := types.MakeTestTask(timeStart.Add(time.Duration(mins)*time.Minute), blame)
t.Name = name
return t
}
tasks := []*types.Task{
mk(1, "Old", []string{"a"}), // 0
mk(1, "Build1", []string{"a", "b"}), // 1
mk(2, "Build1", []string{"c"}), // 2
mk(2, "Build2", []string{"c"}), // 3
mk(3, "Build1", []string{"d"}), // 4
mk(4, "Build2", []string{"d"}), // 5
mk(5, "Build3", []string{"a", "b", "c", "d"}), // 6
}
require.NoError(t, d.PutTasks(tasks))
d.Wait()
// Create the cache.
wait := make(chan struct{})
c, err := NewTaskCache(ctx, d, w, func() {
wait <- struct{}{}
})
require.NoError(t, err)
<-wait
{
// Check that tasks[0] and tasks[1] are in the cache.
firstTasks, err := c.GetTasksFromDateRange(timeStart, timeStart.Add(2*time.Minute))
require.NoError(t, err)
require.Equal(t, 2, len(firstTasks))
unfinishedTasks, err := c.UnfinishedTasks()
require.NoError(t, err)
for _, task := range []*types.Task{tasks[0], tasks[1]} {
cachedTask, err := c.GetTask(task.Id)
require.NoError(t, err)
assertdeep.Equal(t, task, cachedTask)
testGetTasksForCommits(t, c, task)
assertTaskInSlice(t, task, firstTasks)
assertTaskInSlice(t, task, unfinishedTasks)
require.True(t, c.KnownTaskName(task.Repo, task.Name))
}
}
// Add and update tasks.
tasks[1].Status = types.TASK_STATUS_SUCCESS
tasks[6].Commits = []string{"c", "d"}
tasks = append(tasks,
mk(7, "Build3", []string{"a", "b"}), // 7
mk(4, "Build4", []string{"a", "b", "c", "d"})) // 8
// Out of order to test TaskDB.GetModifiedTasks.
require.NoError(t, d.PutTasks([]*types.Task{tasks[6], tasks[8], tasks[1], tasks[7]}))
d.Wait()
<-wait
// update, expiring tasks[0] and tasks[1].
require.NoError(t, w.UpdateWithTime(tasks[0].Created.Add(period).Add(time.Nanosecond)))
require.NoError(t, c.Update())
{
// Check that tasks[0] and tasks[1] are no longer in the cache.
firstTasks, err := c.GetTasksFromDateRange(timeStart, timeStart.Add(2*time.Minute))
require.NoError(t, err)
require.Equal(t, 0, len(firstTasks))
assertTasksNotCached(t, c, []*types.Task{tasks[0], tasks[1]})
// tasks[0].Name is no longer known, but there are later Tasks for tasks[1].Name.
require.False(t, c.KnownTaskName(tasks[0].Repo, tasks[0].Name))
require.True(t, c.KnownTaskName(tasks[1].Repo, tasks[1].Name))
}
{
// Check that other tasks are still cached correctly.
allCachedTasks, err := c.GetTasksFromDateRange(timeStart, timeStart.Add(20*time.Minute))
require.NoError(t, err)
orderedTasks := []*types.Task{tasks[2], tasks[3], tasks[4], tasks[5], tasks[8], tasks[6], tasks[7]}
// Tasks with same timestamp can be in either order.
if allCachedTasks[0].Id != orderedTasks[0].Id {
allCachedTasks[0], allCachedTasks[1] = allCachedTasks[1], allCachedTasks[0]
}
if allCachedTasks[3].Id != orderedTasks[3].Id {
allCachedTasks[3], allCachedTasks[4] = allCachedTasks[4], allCachedTasks[3]
}
assertdeep.Equal(t, orderedTasks, allCachedTasks)
unfinishedTasks, err := c.UnfinishedTasks()
require.NoError(t, err)
for _, task := range orderedTasks {
cachedTask, err := c.GetTask(task.Id)
require.NoError(t, err)
assertdeep.Equal(t, task, cachedTask)
testGetTasksForCommits(t, c, task)
assertTaskInSlice(t, task, unfinishedTasks)
require.True(t, c.KnownTaskName(task.Repo, task.Name))
}
}
// Test entire cache expiration.
newTasks := []*types.Task{
mk(11, "Build3", []string{"e"}),
}
require.NoError(t, d.PutTasks(newTasks))
d.Wait()
<-wait
require.NoError(t, w.UpdateWithTime(newTasks[0].Created.Add(period)))
require.NoError(t, c.Update())
{
// Check that only new task is in the cache.
assertTasksNotCached(t, c, tasks)
for _, task := range newTasks {
cachedTask, err := c.GetTask(task.Id)
require.NoError(t, err)
assertdeep.Equal(t, task, cachedTask)
testGetTasksForCommits(t, c, task)
}
allCachedTasks, err := c.GetTasksFromDateRange(timeStart, timeStart.Add(20*time.Minute))
require.NoError(t, err)
assertdeep.Equal(t, newTasks, allCachedTasks)
// Only new task is known.
require.True(t, c.KnownTaskName(newTasks[0].Repo, newTasks[0].Name))
for _, name := range []string{"Old", "Build1", "Build2"} {
require.False(t, c.KnownTaskName(types.DEFAULT_TEST_REPO, name))
}
unfinishedTasks, err := c.UnfinishedTasks()
require.NoError(t, err)
assertdeep.Equal(t, newTasks, unfinishedTasks)
}
}
func TestJobCache(t *testing.T) {
unittest.SmallTest(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
d := memory.NewInMemoryJobDB()
// Pre-load a job into the DB.
startTime := time.Now().Add(-30 * time.Minute) // Arbitrary starting point.
j1 := types.MakeTestJob(startTime)
require.NoError(t, d.PutJob(j1))
d.Wait()
// Create the cache. Ensure that the existing job is present.
w, err := window.New(time.Hour, 0, nil)
require.NoError(t, err)
wait := make(chan struct{})
c, err := NewJobCache(ctx, d, w, func() {
wait <- struct{}{}
})
require.NoError(t, err)
<-wait
test, err := c.GetJob(j1.Id)
require.NoError(t, err)
assertdeep.Equal(t, j1, test)
jobs, err := c.GetJobsByRepoState(j1.Name, j1.RepoState)
require.NoError(t, err)
require.Equal(t, 1, len(jobs))
assertdeep.Equal(t, jobs[0], test)
// Create another job. Ensure that it gets picked up.
j2 := types.MakeTestJob(startTime.Add(time.Nanosecond))
require.NoError(t, d.PutJob(j2))
d.Wait()
<-wait
test, err = c.GetJob(j2.Id)
require.Error(t, err)
require.NoError(t, c.Update())
test, err = c.GetJob(j2.Id)
require.NoError(t, err)
assertdeep.Equal(t, j2, test)
jobs, err = c.GetJobsByRepoState(j2.Name, j2.RepoState)
require.NoError(t, err)
require.Equal(t, 2, len(jobs))
assertdeep.Equal(t, jobs[1], test)
// Ensure that we don't insert outdated entries.
old := j1.Copy()
require.False(t, util.TimeIsZero(old.DbModified))
old.Name = "outdated"
old.DbModified = old.DbModified.Add(-time.Hour)
c.(*jobCache).modified[old.Id] = old
require.NoError(t, c.Update())
got, err := c.GetJob(old.Id)
require.NoError(t, err)
assertdeep.Equal(t, got, j1)
}
func testGetUnfinished(t *testing.T, expect []*types.Job, cache JobCache) {
jobs, err := cache.UnfinishedJobs()
require.NoError(t, err)
sort.Sort(types.JobSlice(jobs))
sort.Sort(types.JobSlice(expect))
assertdeep.Equal(t, expect, jobs)
}
func TestJobCacheUnfinished(t *testing.T) {
unittest.SmallTest(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
d := memory.NewInMemoryJobDB()
// Insert a job.
startTime := time.Now().Add(-30 * time.Minute)
j1 := types.MakeTestJob(startTime)
require.False(t, j1.Done())
require.NoError(t, d.PutJob(j1))
d.Wait()
// Create the cache. Ensure that the existing job is present.
w, err := window.New(time.Hour, 0, nil)
require.NoError(t, err)
wait := make(chan struct{})
c, err := NewJobCache(ctx, d, w, func() {
wait <- struct{}{}
})
require.NoError(t, err)
<-wait
testGetUnfinished(t, []*types.Job{j1}, c)
// Finish the job. Insert it, ensure that it's not unfinished.
j1.Status = types.JOB_STATUS_SUCCESS
require.True(t, j1.Done())
require.NoError(t, d.PutJob(j1))
d.Wait()
<-wait
require.NoError(t, c.Update())
testGetUnfinished(t, []*types.Job{}, c)
// Already-finished job.
j2 := types.MakeTestJob(time.Now())
j2.Status = types.JOB_STATUS_MISHAP
require.True(t, j2.Done())
require.NoError(t, d.PutJob(j2))
d.Wait()
<-wait
require.NoError(t, c.Update())
testGetUnfinished(t, []*types.Job{}, c)
// An unfinished job, created after the cache was created.
j3 := types.MakeTestJob(time.Now())
require.False(t, j3.Done())
require.NoError(t, d.PutJob(j3))
d.Wait()
<-wait
require.NoError(t, c.Update())
testGetUnfinished(t, []*types.Job{j3}, c)
// Update the job.
j3.Dependencies = map[string][]string{"a": {}, "b": {}, "c": {}}
require.False(t, j3.Done())
require.NoError(t, d.PutJob(j3))
d.Wait()
<-wait
require.NoError(t, c.Update())
testGetUnfinished(t, []*types.Job{j3}, c)
}
// assertJobInSlice fails the test if job is not deep-equal to an element of
// slice.
func assertJobInSlice(t *testing.T, job *types.Job, slice []*types.Job) {
for _, other := range slice {
if job.Id == other.Id {
assertdeep.Equal(t, job, other)
return
}
}
t.Fatalf("Did not find job %v in %v.", job, slice)
}
// assertJobsCached checks that all of jobs are retrievable from c.
func assertJobsCached(t *testing.T, c JobCache, jobs []*types.Job) {
unfinishedJobs, err := c.UnfinishedJobs()
require.NoError(t, err)
for _, job := range jobs {
cachedJob, err := c.GetJob(job.Id)
require.NoError(t, err)
assertdeep.Equal(t, job, cachedJob)
if !job.Done() {
assertJobInSlice(t, job, unfinishedJobs)
}
cachedJobs, err := c.GetJobsByRepoState(job.Name, job.RepoState)
require.NoError(t, err)
found := false
for _, otherJob := range cachedJobs {
if job.Id == otherJob.Id {
assertdeep.Equal(t, job, otherJob)
found = true
}
}
require.True(t, found)
found = false
jobsByName, err := c.GetMatchingJobsFromDateRange([]string{job.Name}, job.Created, job.Created.Add(time.Nanosecond))
require.NoError(t, err)
for _, jobsForName := range jobsByName {
for _, otherJob := range jobsForName {
if job.Id == otherJob.Id {
assertdeep.Equal(t, job, otherJob)
found = true
}
}
}
require.True(t, found)
}
}
// assertJobsNotCached checks that none of jobs are retrievable from c.
func assertJobsNotCached(t *testing.T, c JobCache, jobs []*types.Job) {
unfinishedJobs, err := c.UnfinishedJobs()
require.NoError(t, err)
for _, job := range jobs {
_, err := c.GetJob(job.Id)
require.Error(t, err)
require.True(t, db.IsNotFound(err))
for _, other := range unfinishedJobs {
if job.Id == other.Id {
t.Errorf("Found unexpected job %v in UnfinishedJobs.", job)
}
}
cachedJobs, err := c.GetJobsByRepoState(job.Name, job.RepoState)
require.NoError(t, err)
for _, otherJob := range cachedJobs {
if job.Id == otherJob.Id {
t.Fatalf("Found unexpected job %v in GetJobsByRepoState", job)
}
}
found := false
jobsByName, err := c.GetMatchingJobsFromDateRange([]string{job.Name}, time.Time{}, time.Now().Add(10*24*time.Hour))
require.NoError(t, err)
for _, jobsForName := range jobsByName {
for _, otherJob := range jobsForName {
if job.Id == otherJob.Id {
found = true
}
}
}
require.False(t, found)
}
}
func TestJobCacheExpiration(t *testing.T) {
unittest.SmallTest(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
d := memory.NewInMemoryJobDB()
period := 10 * time.Minute
w, err := window.New(period, 0, nil)
require.NoError(t, err)
timeStart := w.EarliestStart()
// Make a bunch of jobs with various revisions.
mk := func(ts time.Time, isForce bool) *types.Job {
job := types.MakeTestJob(ts)
job.IsForce = isForce
return job
}
jobs := []*types.Job{
mk(timeStart.Add(1*time.Minute), false), // 0
mk(timeStart.Add(1*time.Minute), false), // 1
mk(timeStart.Add(2*time.Minute), false), // 2
mk(timeStart.Add(2*time.Minute), true), // 3
mk(timeStart.Add(2*time.Minute), false), // 4
mk(timeStart.Add(4*time.Minute), true), // 5
mk(timeStart.Add(4*time.Minute), true), // 6
mk(timeStart.Add(3*time.Minute), false), // 7
}
require.NoError(t, d.PutJobs(jobs))
d.Wait()
// Create the cache.
wait := make(chan struct{})
jobCacheI, err := NewJobCache(ctx, d, w, func() {
wait <- struct{}{}
})
require.NoError(t, err)
<-wait
c := jobCacheI.(*jobCache) // To access update method.
// Check that jobs[0] and jobs[1] are in the cache.
assertJobsCached(t, c, []*types.Job{jobs[0], jobs[1]})
// Add and update jobs.
jobs[1].Status = types.JOB_STATUS_SUCCESS
jobs = append(jobs, mk(timeStart.Add(3*time.Minute), false)) // 8
require.NoError(t, d.PutJobs([]*types.Job{jobs[1], jobs[8]}))
d.Wait()
<-wait
// update, expiring jobs[0] and jobs[1].
require.NoError(t, w.UpdateWithTime(timeStart.Add(time.Minute).Add(period).Add(time.Nanosecond)))
require.NoError(t, c.Update())
// Check that jobs[0] and jobs[1] are no longer in the cache.
assertJobsNotCached(t, c, jobs[:2])
// Check that other jobs are still cached correctly.
assertJobsCached(t, c, jobs[2:])
// Test entire cache expiration.
newJobs := []*types.Job{
mk(timeStart.Add(5*time.Minute), false),
}
require.NoError(t, d.PutJobs(newJobs))
d.Wait()
<-wait
require.NoError(t, w.UpdateWithTime(timeStart.Add(5*time.Minute).Add(period).Add(-time.Nanosecond)))
require.NoError(t, c.Update())
// Check that only new job is in the cache.
assertJobsNotCached(t, c, jobs)
assertJobsCached(t, c, newJobs)
}
func TestJobCacheGetMatchingJobsFromDateRange(t *testing.T) {
unittest.SmallTest(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
d := memory.NewInMemoryJobDB()
// Pre-load a job into the DB.
startTime := time.Now().Add(-30 * time.Minute) // Arbitrary starting point.
j1 := types.MakeTestJob(startTime)
j2 := types.MakeTestJob(startTime)
j2.Name = "job2"
require.NoError(t, d.PutJobs([]*types.Job{j1, j2}))
d.Wait()
// Create the cache. Ensure that the existing job is present.
w, err := window.New(time.Hour, 0, nil)
require.NoError(t, err)
c, err := NewJobCache(ctx, d, w, nil)
require.NoError(t, err)
test := func(names []string, start, end time.Time, expect ...*types.Job) {
expectByName := make(map[string][]*types.Job, len(expect))
for _, job := range expect {
expectByName[job.Name] = append(expectByName[job.Name], job)
}
jobs, err := c.GetMatchingJobsFromDateRange(names, start, end)
require.NoError(t, err)
assertdeep.Equal(t, expectByName, jobs)
}
test([]string{j1.Name, j2.Name}, time.Time{}, time.Now().Add(24*time.Hour), j1, j2)
test([]string{j1.Name, j2.Name}, j1.Created, j1.Created.Add(time.Nanosecond), j1, j2)
test([]string{j1.Name, j2.Name}, time.Time{}, j1.Created)
test([]string{j1.Name, j2.Name}, j1.Created.Add(time.Nanosecond), time.Now().Add(24*time.Hour))
test([]string{j1.Name}, j1.Created, j1.Created.Add(time.Nanosecond), j1)
}