blob: bc24f7b756264b061d7a29b18f59d066658eb8a6 [file] [log] [blame]
package cache
import (
"context"
"fmt"
"sort"
"sync"
"time"
"go.opencensus.io/trace"
"go.skia.org/infra/go/sklog"
"go.skia.org/infra/task_scheduler/go/db"
"go.skia.org/infra/task_scheduler/go/types"
"go.skia.org/infra/task_scheduler/go/window"
)
const (
	// Caches should initially allocate enough space for this many tasks.
	// Used as the initial capacity of taskCache.tasksByTime.
	tasksInitCapacity = 60000
	// Caches should initially allocate enough space for this many jobs.
	// Used as the initial capacity of jobCache.jobsByTime.
	jobsInitCapacity = 10000
)
// TaskCache is an in-memory view of Tasks within a time window, providing
// lookups by ID, TaskKey, commit, and creation time. It is periodically
// refreshed from the database via Update.
type TaskCache interface {
	// GetTask returns the task with the given ID, or an error if no such task exists.
	GetTask(id string) (*types.Task, error)
	// GetTaskMaybeExpired does the same as GetTask but tries to dig into
	// the DB in case the Task is old enough to have scrolled out of the
	// cache window.
	GetTaskMaybeExpired(context.Context, string) (*types.Task, error)
	// GetTaskForCommit retrieves the task with the given name whose blamelist
	// includes the given commit, or nil if no such task exists.
	GetTaskForCommit(repo string, revision string, name string) (*types.Task, error)
	// GetTasksByKey returns the tasks with the given TaskKey, sorted
	// by creation time.
	GetTasksByKey(key types.TaskKey) ([]*types.Task, error)
	// GetTasksForCommits retrieves all tasks which included[1] each of the
	// given commits. Returns a map whose keys are commit hashes and values are
	// sub-maps whose keys are task spec names and values are tasks.
	//
	// 1) Blamelist calculation is outside the scope of the taskCache, but the
	// implied assumption here is that there is at most one task for each
	// task spec which has a given commit in its blamelist. The user is
	// responsible for inserting tasks into the database so that this invariant
	// is maintained. Generally, a more recent task will "steal" commits from an
	// earlier task's blamelist, if the blamelists overlap. There are three
	// cases to consider:
	// 1. The newer task ran at a newer commit than the older task. Its
	// blamelist consists of all commits not covered by the previous task,
	// and therefore does not overlap with the older task's blamelist.
	// 2. The newer task ran at the same commit as the older task. Its
	// blamelist is the same as the previous task's blamelist, and
	// therefore it "steals" all commits from the previous task, whose
	// blamelist becomes empty.
	// 3. The newer task ran at a commit which was in the previous task's
	// blamelist. Its blamelist consists of the commits in the previous
	// task's blamelist which it also covered. Those commits move out of
	// the previous task's blamelist and into the newer task's blamelist.
	GetTasksForCommits(repo string, commits []string) (map[string]map[string]*types.Task, error)
	// GetTasksFromDateRange retrieves all tasks which were created in the given
	// date range.
	GetTasksFromDateRange(from time.Time, to time.Time) ([]*types.Task, error)
	// KnownTaskName returns true iff the given task name has been seen
	// before for a non-forced, non-tryjob run.
	KnownTaskName(repo string, name string) bool
	// UnfinishedTasks returns a list of tasks which were not finished at
	// the time of the last cache update. Fake tasks are not included.
	UnfinishedTasks() ([]*types.Task, error)
	// Update loads new tasks from the database.
	Update(ctx context.Context) error
	// AddTasks adds tasks directly to the TaskCache.
	AddTasks(tasks []*types.Task)
}
// taskCache implements TaskCache. All fields except modified are guarded by
// mtx; modified is guarded by modMtx so that the DB-subscription goroutine
// can stash updates without contending with readers.
type taskCache struct {
	// db is used to look up tasks which have expired out of the cache
	// (see GetTaskMaybeExpired).
	db db.TaskReader
	// map[repo_name][task_spec_name]Task.Created for most recent Task.
	knownTaskNames map[string]map[string]time.Time
	mtx            sync.RWMutex
	// tasks maps task ID to the most recent version of the Task.
	tasks map[string]*types.Task
	// map[repo_name][commit_hash][task_spec_name]*Task
	tasksByCommit map[string]map[string]map[string]*types.Task
	// map[TaskKey]map[task_id]*Task
	tasksByKey map[types.TaskKey]map[string]*types.Task
	// tasksByTime is sorted by Task.Created.
	tasksByTime []*types.Task
	timeWindow  window.Window
	// unfinished maps task ID to tasks which were not Done at last update.
	unfinished map[string]*types.Task
	// Stash modified tasks until Update() is called.
	modified map[string]*types.Task
	modMtx   sync.Mutex
	// onModFn, if non-nil, is called after modified tasks are received from
	// the DB; used for testing.
	onModFn func()
}
// See documentation for TaskCache interface.
func (c *taskCache) GetTask(id string) (*types.Task, error) {
	c.mtx.RLock()
	defer c.mtx.RUnlock()
	task, ok := c.tasks[id]
	if !ok {
		return nil, db.ErrNotFound
	}
	// Return a copy so that the caller cannot mutate cached state.
	return task.Copy(), nil
}
// See documentation for TaskCache interface.
func (c *taskCache) GetTaskMaybeExpired(ctx context.Context, id string) (*types.Task, error) {
	// Check the cache first.
	t, err := c.GetTask(id)
	switch err {
	case nil:
		return t, nil
	case db.ErrNotFound:
		// Not cached; fall back to searching the DB below.
	default:
		return nil, err
	}
	found, err := c.db.GetTaskById(ctx, id)
	if err != nil {
		return nil, err
	}
	if found == nil {
		return nil, db.ErrNotFound
	}
	return found, nil
}
// See documentation for TaskCache interface.
func (c *taskCache) GetTasksByKey(k types.TaskKey) ([]*types.Task, error) {
	if !k.Valid() {
		return nil, fmt.Errorf("TaskKey is invalid: %v", k)
	}
	c.mtx.RLock()
	defer c.mtx.RUnlock()
	byId := c.tasksByKey[k]
	rv := make([]*types.Task, 0, len(byId))
	for _, task := range byId {
		// Copies prevent callers from mutating cached state.
		rv = append(rv, task.Copy())
	}
	// Per the interface contract, results are sorted by creation time.
	sort.Sort(types.TaskSlice(rv))
	return rv, nil
}
// See documentation for TaskCache interface.
func (c *taskCache) GetTasksForCommits(repo string, commits []string) (map[string]map[string]*types.Task, error) {
	c.mtx.RLock()
	defer c.mtx.RUnlock()
	byCommit := c.tasksByCommit[repo]
	rv := make(map[string]map[string]*types.Task, len(commits))
	for _, hash := range commits {
		// Every requested commit gets an entry, even if no tasks are known.
		tasks := byCommit[hash]
		sub := make(map[string]*types.Task, len(tasks))
		for name, t := range tasks {
			sub[name] = t.Copy()
		}
		rv[hash] = sub
	}
	return rv, nil
}
// searchTaskSlice returns the index in tasks of the first Task whose Created
// time is >= ts. If all tasks were created before ts, returns len(tasks).
func searchTaskSlice(tasks []*types.Task, ts time.Time) int {
	return sort.Search(len(tasks), func(i int) bool {
		created := tasks[i].Created
		return created.Equal(ts) || created.After(ts)
	})
}
// See documentation for TaskCache interface.
func (c *taskCache) GetTasksFromDateRange(from time.Time, to time.Time) ([]*types.Task, error) {
	c.mtx.RLock()
	defer c.mtx.RUnlock()
	// tasksByTime is sorted by Created, so binary-search both endpoints.
	start := searchTaskSlice(c.tasksByTime, from)
	end := searchTaskSlice(c.tasksByTime, to)
	rv := make([]*types.Task, end-start)
	for i, task := range c.tasksByTime[start:end] {
		rv[i] = task.Copy()
	}
	return rv, nil
}
// See documentation for TaskCache interface.
func (c *taskCache) KnownTaskName(repo, name string) bool {
	c.mtx.RLock()
	defer c.mtx.RUnlock()
	// Indexing a nil inner map is safe and simply reports "not found".
	if _, ok := c.knownTaskNames[repo][name]; ok {
		return true
	}
	return false
}
// See documentation for TaskCache interface.
func (c *taskCache) GetTaskForCommit(repo, commit, name string) (*types.Task, error) {
	c.mtx.RLock()
	defer c.mtx.RUnlock()
	// Chained indexing into nested maps is safe even when intermediate maps
	// are missing; a failed lookup just yields ok == false.
	t, ok := c.tasksByCommit[repo][commit][name]
	if !ok {
		return nil, nil
	}
	return t.Copy(), nil
}
// See documentation for TaskCache interface.
func (c *taskCache) UnfinishedTasks() ([]*types.Task, error) {
	c.mtx.RLock()
	defer c.mtx.RUnlock()
	rv := make([]*types.Task, 0, len(c.unfinished))
	for _, task := range c.unfinished {
		// Copies prevent callers from mutating cached state.
		rv = append(rv, task.Copy())
	}
	return rv, nil
}
// removeFromTasksByCommit removes task (which must be a previously-inserted
// Task, not a new Task) from c.tasksByCommit for all of task.Commits,
// pruning any commit entries left empty. Assumes the caller holds a lock.
func (c *taskCache) removeFromTasksByCommit(task *types.Task) {
	commitMap, ok := c.tasksByCommit[task.Repo]
	if !ok {
		return
	}
	for _, commit := range task.Commits {
		byName := commitMap[commit]
		// Shouldn't be necessary to check other.Id == task.Id, but being paranoid.
		other, ok := byName[task.Name]
		if !ok || other.Id != task.Id {
			continue
		}
		delete(byName, task.Name)
		if len(byName) == 0 {
			delete(commitMap, commit)
		}
	}
}
// expireTasks removes data from c whose Created time is before the beginning
// of the Window. Assumes the caller holds a lock. This is a helper for
// Update.
func (c *taskCache) expireTasks() {
	// Drop known task names whose most-recent Created time fell out of the
	// window. (Deleting from a map during range iteration is safe in Go.)
	for repoUrl, nameMap := range c.knownTaskNames {
		for name, ts := range nameMap {
			if !c.timeWindow.TestTime(repoUrl, ts) {
				delete(nameMap, name)
			}
		}
	}
	// tasksByTime is sorted by Created, so we can stop at the first task
	// which is still inside the window; everything before it is expired and
	// must be removed from all of the secondary indexes.
	for i, task := range c.tasksByTime {
		if c.timeWindow.TestTime(task.Repo, task.Created) {
			// Trim off the expired prefix and keep the remainder.
			c.tasksByTime = c.tasksByTime[i:]
			return
		}
		// Tasks by ID.
		delete(c.tasks, task.Id)
		// Tasks by commit.
		c.removeFromTasksByCommit(task)
		// Tasks by key.
		byKey, ok := c.tasksByKey[task.TaskKey]
		if ok {
			delete(byKey, task.Id)
			if len(byKey) == 0 {
				delete(c.tasksByKey, task.TaskKey)
			}
		}
		// Tasks by time.
		c.tasksByTime[i] = nil // Allow GC.
		// Unfinished tasks.
		if _, ok := c.unfinished[task.Id]; ok {
			sklog.Warningf("Found unfinished task that is so old it is being expired. %#v", task)
			delete(c.unfinished, task.Id)
		}
	}
	// Reaching here means the loop never found an in-window task, ie. every
	// cached task expired.
	if len(c.tasksByTime) > 0 {
		sklog.Warningf("All tasks expired because they are outside the window.")
		c.tasksByTime = nil
	}
}
// insertOrUpdateTask inserts task into the cache if it is a new task, or
// updates the existing entries if not. Assumes the caller holds a lock. This is
// a helper for Update.
func (c *taskCache) insertOrUpdateTask(task *types.Task) {
	old, isUpdate := c.tasks[task.Id]
	// Ignore stale updates: only apply versions strictly newer than the
	// cached one.
	if isUpdate && !task.DbModified.After(old.DbModified) {
		return
	}
	// Insert the new task into the main map.
	c.tasks[task.Id] = task
	// Insert into tasksByKey.
	byKey, ok := c.tasksByKey[task.TaskKey]
	if !ok {
		byKey = map[string]*types.Task{}
		c.tasksByKey[task.TaskKey] = byKey
	}
	byKey[task.Id] = task
	if isUpdate {
		// If we already know about this task, the blamelist might have changed, so
		// we need to remove it from tasksByCommit and re-insert where needed.
		c.removeFromTasksByCommit(old)
	}
	// Insert the task into tasksByCommits.
	commitMap, ok := c.tasksByCommit[task.Repo]
	if !ok {
		commitMap = map[string]map[string]*types.Task{}
		c.tasksByCommit[task.Repo] = commitMap
	}
	for _, commit := range task.Commits {
		if _, ok := commitMap[commit]; !ok {
			commitMap[commit] = map[string]*types.Task{}
		}
		commitMap[commit][task.Name] = task
	}
	if isUpdate {
		// Replace the old version in tasksByTime: binary-search to the first
		// entry with this Created time, then scan forward.
		// Loop in case there are multiple tasks with the same Created time.
		for i := searchTaskSlice(c.tasksByTime, task.Created); i < len(c.tasksByTime); i++ {
			other := c.tasksByTime[i]
			if other.Id == task.Id {
				c.tasksByTime[i] = task
				break
			}
			// Walked past all entries with the same Created time without
			// finding the task: the two indexes disagree, which should be
			// impossible.
			if !other.Created.Equal(task.Created) {
				panic(fmt.Sprintf("taskCache inconsistent; c.tasks contains task not in c.tasksByTime. old: %v, task: %v", old, task))
			}
		}
	} else {
		// If profiling indicates this code is slow or GCs too much, see
		// https://skia.googlesource.com/buildbot/+show/0cf94832dd57f0e7b5b9f1b28546181d15dbbbc6
		// for a different implementation.
		// Most common case is that the new task should be inserted at the end.
		if len(c.tasksByTime) == 0 {
			c.tasksByTime = append(make([]*types.Task, 0, tasksInitCapacity), task)
		} else if lastTask := c.tasksByTime[len(c.tasksByTime)-1]; !task.Created.Before(lastTask.Created) {
			c.tasksByTime = append(c.tasksByTime, task)
		} else {
			insertIdx := searchTaskSlice(c.tasksByTime, task.Created)
			// Extend size by one:
			c.tasksByTime = append(c.tasksByTime, nil)
			// Move later elements out of the way:
			copy(c.tasksByTime[insertIdx+1:], c.tasksByTime[insertIdx:])
			// Assign at the correct index:
			c.tasksByTime[insertIdx] = task
		}
	}
	// Unfinished tasks.
	if !task.Done() && !task.Fake() {
		c.unfinished[task.Id] = task
	} else if isUpdate {
		delete(c.unfinished, task.Id)
	}
	// Known task names: track the most recent Created time per (repo, name),
	// excluding forced and tryjob runs.
	if !task.IsForceRun() && !task.IsTryJob() {
		if nameMap, ok := c.knownTaskNames[task.Repo]; ok {
			if ts, ok := nameMap[task.Name]; !ok || ts.Before(task.Created) {
				nameMap[task.Name] = task.Created
			}
		} else {
			c.knownTaskNames[task.Repo] = map[string]time.Time{task.Name: task.Created}
		}
	}
}
// See documentation for TaskCache interface.
func (c *taskCache) Update(ctx context.Context) error {
	_, span := trace.StartSpan(ctx, "taskcache_Update")
	defer span.End()
	c.mtx.Lock()
	defer c.mtx.Unlock()
	c.modMtx.Lock()
	defer c.modMtx.Unlock()
	// Drop expired data, then apply the stashed modifications which are
	// still inside the time window.
	c.expireTasks()
	for _, task := range c.modified {
		if !c.timeWindow.TestTime(task.Repo, task.Created) {
			continue
		}
		c.insertOrUpdateTask(task)
	}
	c.modified = map[string]*types.Task{}
	return nil
}
// See documentation for TaskCache interface.
func (c *taskCache) AddTasks(tasks []*types.Task) {
	c.mtx.Lock()
	defer c.mtx.Unlock()
	for _, task := range tasks {
		if !c.timeWindow.TestTime(task.Repo, task.Created) {
			continue
		}
		// Copy so that the caller's Task cannot alias cache internals.
		c.insertOrUpdateTask(task.Copy())
	}
}
// NewTaskCache returns a local cache which provides more convenient views of
// task data than the database can provide. The last parameter is a callback
// function which is called when modified tasks are received from the DB. This
// is used for testing.
func NewTaskCache(ctx context.Context, d db.TaskReader, timeWindow window.Window, onModifiedTasks func()) (TaskCache, error) {
	// Subscribe to modified tasks before the initial load so that changes
	// made during the load are not missed.
	mod := d.ModifiedTasksCh(ctx)
	tasks, err := db.GetTasksFromWindow(ctx, d, timeWindow)
	if err != nil {
		return nil, err
	}
	c := &taskCache{
		db:             d,
		knownTaskNames: map[string]map[string]time.Time{},
		tasks:          map[string]*types.Task{},
		tasksByCommit:  map[string]map[string]map[string]*types.Task{},
		tasksByKey:     map[types.TaskKey]map[string]*types.Task{},
		unfinished:     map[string]*types.Task{},
		modified:       map[string]*types.Task{},
		timeWindow:     timeWindow,
		onModFn:        onModifiedTasks,
	}
	// Populate the cache from the initial load. No lock needed yet; the
	// goroutine below has not started.
	for _, t := range tasks {
		if c.timeWindow.TestTime(t.Repo, t.Created) {
			c.insertOrUpdateTask(t)
		}
	}
	// Stash incoming modified tasks until the next Update() call. This
	// goroutine exits when the channel is closed (presumably when ctx is
	// canceled — confirm with the db.TaskReader implementation).
	go func() {
		for tasks := range mod {
			c.modMtx.Lock()
			for _, task := range tasks {
				// Keep only the newest version of each task.
				if old, ok := c.modified[task.Id]; !ok || task.DbModified.After(old.DbModified) {
					c.modified[task.Id] = task
				}
			}
			c.modMtx.Unlock()
			if c.onModFn != nil {
				c.onModFn()
			}
		}
		sklog.Warning("Modified tasks channel closed; is this expected?")
	}()
	return c, nil
}
// JobCache is an in-memory view of Jobs within a time window, providing
// lookups by ID, name, RepoState, and creation time. It is periodically
// refreshed from the database via Update.
type JobCache interface {
	// GetAllCachedJobs returns every job in the cache.
	GetAllCachedJobs() []*types.Job
	// GetJob returns the job with the given ID, or an error if no such job exists.
	GetJob(string) (*types.Job, error)
	// GetJobMaybeExpired does the same as GetJob but tries to dig into the
	// DB in case the Job is old enough to have scrolled out of the cache
	// window.
	GetJobMaybeExpired(context.Context, string) (*types.Job, error)
	// GetJobsByRepoState retrieves all known jobs with the given name at
	// the given RepoState. Does not search the underlying DB.
	GetJobsByRepoState(string, types.RepoState) ([]*types.Job, error)
	// GetJobsFromDateRange retrieves all jobs which were created in the
	// given date range.
	GetJobsFromDateRange(time.Time, time.Time) ([]*types.Job, error)
	// GetMatchingJobsFromDateRange retrieves all jobs which were created
	// in the given date range and match one of the given job names.
	GetMatchingJobsFromDateRange(names []string, from time.Time, to time.Time) (map[string][]*types.Job, error)
	// RequestedJobs returns a list of jobs which were not yet started at
	// the time of the last cache update.
	RequestedJobs() ([]*types.Job, error)
	// InProgressJobs returns a list of jobs which were started but not finished
	// at the time of the last cache update.
	InProgressJobs() ([]*types.Job, error)
	// Update loads new jobs from the database.
	Update(ctx context.Context) error
	// AddJobs adds jobs directly to the JobCache.
	AddJobs([]*types.Job)
	// LastUpdated is the timestamp of the last call to Update.
	LastUpdated() time.Time
}
// jobCache implements JobCache. All fields except modified are guarded by
// mtx; modified is guarded by modMtx so that the DB-subscription goroutine
// can stash updates without contending with readers.
type jobCache struct {
	// db is used to look up jobs which have expired out of the cache
	// (see GetJobMaybeExpired).
	db  db.JobReader
	mtx sync.RWMutex
	// jobs maps job ID to the most recent version of the Job.
	jobs map[string]*types.Job
	// map[RepoState][job_name][job_id]*Job
	jobsByNameAndState map[types.RepoState]map[string]map[string]*types.Job
	// jobsByTime is sorted by Job.Created.
	jobsByTime []*types.Job
	timeWindow window.Window
	// requested maps job ID to jobs in JOB_STATUS_REQUESTED.
	requested map[string]*types.Job
	// inProgress maps job ID to jobs in JOB_STATUS_IN_PROGRESS.
	inProgress map[string]*types.Job
	// Stash modified jobs until Update() is called.
	modified map[string]*types.Job
	modMtx   sync.Mutex
	// onModFn, if non-nil, is called after modified jobs are received from
	// the DB; used for testing.
	onModFn func()
	// lastUpdated is the time of the most recent Update() call.
	lastUpdated time.Time
}
// See documentation for JobCache interface.
func (c *jobCache) GetAllCachedJobs() []*types.Job {
	c.mtx.RLock()
	defer c.mtx.RUnlock()
	rv := make([]*types.Job, 0, len(c.jobs))
	for _, job := range c.jobs {
		// Copies prevent callers from mutating cached state.
		rv = append(rv, job.Copy())
	}
	return rv
}
// See documentation for JobCache interface.
func (c *jobCache) GetJob(id string) (*types.Job, error) {
	c.mtx.RLock()
	defer c.mtx.RUnlock()
	job, ok := c.jobs[id]
	if !ok {
		return nil, db.ErrNotFound
	}
	// Return a copy so that the caller cannot mutate cached state.
	return job.Copy(), nil
}
// See documentation for JobCache interface.
func (c *jobCache) GetJobMaybeExpired(ctx context.Context, id string) (*types.Job, error) {
	j, err := c.GetJob(id)
	switch err {
	case nil:
		return j, nil
	case db.ErrNotFound:
		// Not cached; the job may have scrolled out of the window, so fall
		// back to the DB.
		return c.db.GetJobById(ctx, id)
	default:
		return nil, err
	}
}
// See documentation for JobCache interface.
//
// Returned Jobs are copies; callers may freely modify them without affecting
// the cache, consistent with the other JobCache getters.
func (c *jobCache) GetJobsByRepoState(name string, rs types.RepoState) ([]*types.Job, error) {
	c.mtx.RLock()
	defer c.mtx.RUnlock()
	jobs := c.jobsByNameAndState[rs][name]
	rv := make([]*types.Job, 0, len(jobs))
	for _, j := range jobs {
		// Copy each Job. Previously the cached pointers were returned
		// directly, which let callers mutate shared cache state outside the
		// lock; every other getter (GetJob, GetAllCachedJobs, etc.) copies.
		rv = append(rv, j.Copy())
	}
	sort.Sort(types.JobSlice(rv))
	return rv, nil
}
// searchJobSlice returns the index in jobs of the first Job whose Created
// time is >= ts. If all jobs were created before ts, returns len(jobs).
func searchJobSlice(jobs []*types.Job, ts time.Time) int {
	return sort.Search(len(jobs), func(i int) bool {
		created := jobs[i].Created
		return created.Equal(ts) || created.After(ts)
	})
}
// jobSliceIsSorted returns true if jobs is sorted by Created time.
func jobSliceIsSorted(jobs []*types.Job) bool {
	for i := 1; i < len(jobs); i++ {
		if jobs[i].Created.Before(jobs[i-1].Created) {
			return false
		}
	}
	return true
}
// See documentation for JobCache interface.
func (c *jobCache) GetJobsFromDateRange(from time.Time, to time.Time) ([]*types.Job, error) {
	c.mtx.RLock()
	defer c.mtx.RUnlock()
	// jobsByTime is sorted by Created, so binary-search both endpoints.
	start := searchJobSlice(c.jobsByTime, from)
	end := searchJobSlice(c.jobsByTime, to)
	rv := make([]*types.Job, end-start)
	for i, job := range c.jobsByTime[start:end] {
		rv[i] = job.Copy()
	}
	return rv, nil
}
// See documentation for JobCache interface.
func (c *jobCache) GetMatchingJobsFromDateRange(names []string, from time.Time, to time.Time) (map[string][]*types.Job, error) {
	jobs, err := c.GetJobsFromDateRange(from, to)
	if err != nil {
		return nil, err
	}
	// Build a set of the requested names for O(1) membership checks.
	want := make(map[string]bool, len(names))
	for _, name := range names {
		want[name] = true
	}
	rv := make(map[string][]*types.Job, len(names))
	for _, job := range jobs {
		if want[job.Name] {
			rv[job.Name] = append(rv[job.Name], job)
		}
	}
	return rv, nil
}
// See documentation for JobCache interface.
func (c *jobCache) RequestedJobs() ([]*types.Job, error) {
	c.mtx.RLock()
	defer c.mtx.RUnlock()
	rv := make([]*types.Job, 0, len(c.requested))
	for _, job := range c.requested {
		// Copies prevent callers from mutating cached state.
		rv = append(rv, job.Copy())
	}
	return rv, nil
}
// See documentation for JobCache interface.
func (c *jobCache) InProgressJobs() ([]*types.Job, error) {
	c.mtx.RLock()
	defer c.mtx.RUnlock()
	rv := make([]*types.Job, 0, len(c.inProgress))
	for _, job := range c.inProgress {
		// Copies prevent callers from mutating cached state.
		rv = append(rv, job.Copy())
	}
	return rv, nil
}
// expireJobs removes data from c where Job.Created is before start. Assumes
// the caller holds a lock. This is a helper for Update.
func (c *jobCache) expireJobs() {
	expiredUnfinishedCount := 0
	// Log a summary of expired-but-unfinished jobs, if any.
	defer func() {
		if expiredUnfinishedCount > 0 {
			sklog.Infof("Expired %d unfinished jobs created before window.", expiredUnfinishedCount)
		}
	}()
	defer func() {
		// Debugging for https://bugs.chromium.org/p/skia/issues/detail?id=9444
		// Sanity check: no in-progress job should predate the oldest cached job.
		if len(c.jobsByTime) > 0 {
			firstTs := c.jobsByTime[0].Created
			for _, job := range c.inProgress {
				if job.Created.Before(firstTs) {
					sklog.Warningf("Found job %q in unfinished, Created %s, before first jobsByTime, Created %s. %+v", job.Id, job.Created.Format(time.RFC3339Nano), firstTs.Format(time.RFC3339Nano), job)
				}
			}
		} else {
			if len(c.inProgress) > 0 {
				sklog.Warningf("Found %d jobs in unfinished when jobsByTime is empty", len(c.inProgress))
			}
		}
	}()
	// jobsByTime is sorted by Created, so we can stop at the first job which
	// is still inside the window; everything before it is expired and must
	// be removed from all of the secondary indexes.
	for i, job := range c.jobsByTime {
		if c.timeWindow.TestTime(job.Repo, job.Created) {
			c.jobsByTime = c.jobsByTime[i:]
			return
		}
		// Allow GC.
		c.jobsByTime[i] = nil
		delete(c.jobs, job.Id)
		delete(c.inProgress, job.Id)
		// Remove from requested as well; previously jobs which expired while
		// still in JOB_STATUS_REQUESTED were leaked in this map.
		delete(c.requested, job.Id)
		// Remove from the nested name-and-state map, pruning empty sub-maps.
		byName := c.jobsByNameAndState[job.RepoState]
		byId := byName[job.Name]
		delete(byId, job.Id)
		if len(byId) == 0 {
			delete(byName, job.Name)
		}
		if len(byName) == 0 {
			delete(c.jobsByNameAndState, job.RepoState)
		}
		if !job.Done() {
			expiredUnfinishedCount++
		}
	}
	// Reaching here means the loop never found an in-window job, ie. every
	// cached job expired.
	if len(c.jobsByTime) > 0 {
		sklog.Warningf("All jobs expired because they are outside the window.")
		c.jobsByTime = nil
	}
}
// insertOrUpdateJob inserts the new/updated job into the cache. Assumes the
// caller holds a lock. This is a helper for Update.
func (c *jobCache) insertOrUpdateJob(job *types.Job) {
	old, isUpdate := c.jobs[job.Id]
	// Ignore stale updates: only apply versions strictly newer than the
	// cached one.
	if isUpdate && !job.DbModified.After(old.DbModified) {
		return
	}
	// Insert the new job into the main map.
	c.jobs[job.Id] = job
	// Map by RepoState, Name, and ID.
	byName, ok := c.jobsByNameAndState[job.RepoState]
	if !ok {
		byName = map[string]map[string]*types.Job{}
		c.jobsByNameAndState[job.RepoState] = byName
	}
	byId, ok := byName[job.Name]
	if !ok {
		byId = map[string]*types.Job{}
		byName[job.Name] = byId
	}
	byId[job.Id] = job
	// In-progress jobs.
	if job.Status == types.JOB_STATUS_IN_PROGRESS {
		c.inProgress[job.Id] = job
	} else {
		delete(c.inProgress, job.Id)
	}
	// Requested but not started jobs.
	if job.Status == types.JOB_STATUS_REQUESTED {
		c.requested[job.Id] = job
	} else {
		delete(c.requested, job.Id)
	}
	if isUpdate {
		// Replace the old version in jobsByTime: binary-search to the first
		// entry with this Created time, then scan forward.
		// Loop in case there are multiple jobs with the same Created time.
		for i := searchJobSlice(c.jobsByTime, job.Created); i < len(c.jobsByTime); i++ {
			other := c.jobsByTime[i]
			if other.Id == job.Id {
				c.jobsByTime[i] = job
				break
			}
			// Walked past all entries with the same Created time without
			// finding the job: the two indexes disagree, which should be
			// impossible.
			if !other.Created.Equal(job.Created) {
				panic(fmt.Sprintf("jobCache inconsistent; c.jobs contains job not in c.jobsByTime. old: %v, task: %v", old, job))
			}
		}
	} else {
		// If profiling indicates this code is slow or GCs too much, see
		// https://skia.googlesource.com/buildbot/+show/0cf94832dd57f0e7b5b9f1b28546181d15dbbbc6
		// for a different implementation.
		// Most common case is that the new job should be inserted at the end.
		if len(c.jobsByTime) == 0 {
			c.jobsByTime = append(make([]*types.Job, 0, jobsInitCapacity), job)
		} else if lastJob := c.jobsByTime[len(c.jobsByTime)-1]; !job.Created.Before(lastJob.Created) {
			c.jobsByTime = append(c.jobsByTime, job)
		} else {
			insertIdx := searchJobSlice(c.jobsByTime, job.Created)
			// Extend size by one:
			c.jobsByTime = append(c.jobsByTime, nil)
			// Move later elements out of the way:
			copy(c.jobsByTime[insertIdx+1:], c.jobsByTime[insertIdx:])
			// Assign at the correct index:
			c.jobsByTime[insertIdx] = job
		}
	}
}
// See documentation for JobCache interface.
func (c *jobCache) Update(ctx context.Context) error {
	_, span := trace.StartSpan(ctx, "jobcache_Update")
	defer span.End()
	c.mtx.Lock()
	defer c.mtx.Unlock()
	c.modMtx.Lock()
	defer c.modMtx.Unlock()
	c.lastUpdated = time.Now()
	// Drop expired data, then apply the stashed modifications which are
	// still inside the time window.
	c.expireJobs()
	for _, job := range c.modified {
		if !c.timeWindow.TestTime(job.Repo, job.Created) {
			continue
		}
		c.insertOrUpdateJob(job)
	}
	// Debugging for https://bugs.chromium.org/p/skia/issues/detail?id=9444
	if !jobSliceIsSorted(c.jobsByTime) {
		sklog.Errorf("jobsByTime is not sorted after Update of %v", c.modified)
	}
	c.modified = map[string]*types.Job{}
	return nil
}
// See documentation for JobCache interface.
func (c *jobCache) AddJobs(jobs []*types.Job) {
	c.mtx.Lock()
	defer c.mtx.Unlock()
	for _, job := range jobs {
		if !c.timeWindow.TestTime(job.Repo, job.Created) {
			continue
		}
		// Copy so that the caller's Job cannot alias cache internals.
		c.insertOrUpdateJob(job.Copy())
	}
	// Debugging for https://bugs.chromium.org/p/skia/issues/detail?id=9444
	if !jobSliceIsSorted(c.jobsByTime) {
		sklog.Errorf("jobsByTime is not sorted after AddJobs of %v", jobs)
	}
}
// See documentation for JobCache interface.
func (c *jobCache) LastUpdated() time.Time {
	c.mtx.RLock()
	ts := c.lastUpdated
	c.mtx.RUnlock()
	return ts
}
// NewJobCache returns a local cache which provides more convenient views of
// job data than the database can provide. The last parameter is a callback
// function which is called when modified jobs are received from the DB. This
// is used for testing.
func NewJobCache(ctx context.Context, d db.JobReader, timeWindow window.Window, onModifiedJobs func()) (JobCache, error) {
	// Subscribe to modified jobs before the initial load so that changes
	// made during the load are not missed.
	mod := d.ModifiedJobsCh(ctx)
	jobs, err := db.GetJobsFromWindow(ctx, d, timeWindow)
	if err != nil {
		return nil, err
	}
	c := &jobCache{
		db:                 d,
		jobs:               map[string]*types.Job{},
		jobsByNameAndState: map[types.RepoState]map[string]map[string]*types.Job{},
		requested:          map[string]*types.Job{},
		inProgress:         map[string]*types.Job{},
		modified:           map[string]*types.Job{},
		timeWindow:         timeWindow,
		onModFn:            onModifiedJobs,
	}
	// Populate the cache from the initial load. No lock needed yet; the
	// goroutine below has not started.
	for _, job := range jobs {
		if c.timeWindow.TestTime(job.Repo, job.Created) {
			c.insertOrUpdateJob(job)
		}
	}
	// Debugging for https://bugs.chromium.org/p/skia/issues/detail?id=9444
	if !jobSliceIsSorted(c.jobsByTime) {
		return nil, fmt.Errorf("jobsByTime is not sorted after initial load")
	}
	// Stash incoming modified jobs until the next Update() call. This
	// goroutine exits when the channel is closed (presumably when ctx is
	// canceled — confirm with the db.JobReader implementation).
	go func() {
		for jobs := range mod {
			now := time.Now()
			c.modMtx.Lock()
			for _, job := range jobs {
				// Keep only the newest version of each job.
				if old, ok := c.modified[job.Id]; !ok || job.DbModified.After(old.DbModified) {
					c.modified[job.Id] = job
				}
				// Log a warning if the modified-jobs channel is lagging.
				latency := now.Sub(job.DbModified)
				if latency > 5*time.Minute {
					sklog.Warningf("modified-job latency for job %s (build %s) is %s", job.Id, job.BuildbucketBuildId, latency)
				} else if latency > 2*time.Minute {
					sklog.Debugf("modified-job latency for job %s (build %s) is %s", job.Id, job.BuildbucketBuildId, latency)
				}
			}
			c.modMtx.Unlock()
			if c.onModFn != nil {
				c.onModFn()
			}
		}
		sklog.Warning("Modified jobs channel closed; is this expected?")
	}()
	return c, nil
}