blob: 4614b153fb7b24e8296fbcf8a31e070f13a56e0f [file] [log] [blame]
package tryjobs
import (
"sort"
"sync"
"time"
"go.skia.org/infra/go/sklog"
"go.skia.org/infra/task_scheduler/go/db"
"go.skia.org/infra/task_scheduler/go/types"
"go.skia.org/infra/task_scheduler/go/window"
)
// tryJobCache is a struct which provides more useful views of Jobs than the
// database itself can.
type tryJobCache struct {
activeTryJobs map[string]*types.Job
db db.JobDB
mtx sync.RWMutex
queryId string
timeWindow *window.Window
}
// GetActiveTryJobs returns all active try Jobs. A try Job is
// considered to be active if it has a non-zero Buildbucket lease key.
func (c *tryJobCache) GetActiveTryJobs() ([]*types.Job, error) {
c.mtx.RLock()
defer c.mtx.RUnlock()
rv := make([]*types.Job, 0, len(c.activeTryJobs))
for _, j := range c.activeTryJobs {
rv = append(rv, j.Copy())
}
// Sort to maintain deterministic testing.
sort.Sort(types.JobSlice(rv))
return rv, nil
}
// expireJobs removes data from c where getJobTimestamp or getRevisionTimestamp
// is outside of the Window. Assumes the caller holds a lock. This is a helper
// for expireAndUpdate.
func (c *tryJobCache) expireJobs() {
expiredUnfinishedCount := 0
for _, job := range c.activeTryJobs {
if !c.timeWindow.TestTime(job.Repo, job.Created) {
delete(c.activeTryJobs, job.Id)
if !job.Done() {
expiredUnfinishedCount++
}
}
}
if expiredUnfinishedCount > 0 {
sklog.Infof("Expired %d unfinished jobs created before the window.", expiredUnfinishedCount)
}
}
// insertOrUpdateJob inserts the new/updated job into the cache. Assumes the
// caller holds a lock. This is a helper for expireAndUpdate.
func (c *tryJobCache) insertOrUpdateJob(job *types.Job) {
// Active try jobs.
if job.BuildbucketLeaseKey == 0 {
delete(c.activeTryJobs, job.Id)
} else {
c.activeTryJobs[job.Id] = job
}
}
// expireAndUpdate removes Jobs before the Window and inserts the
// new/updated jobs into the cache. Assumes the caller holds a lock.
func (c *tryJobCache) expireAndUpdate(jobs []*types.Job) {
c.expireJobs()
for _, job := range jobs {
if !c.timeWindow.TestTime(job.Repo, job.Created) {
//sklog.Warningf("Updated job %s after expired. getJobTimestamp returned %s. %#v", job.Id, job.Created, job)
} else {
c.insertOrUpdateJob(job.Copy())
}
}
}
// reset re-initializes c. Assumes the caller holds a lock.
func (c *tryJobCache) reset() error {
if c.queryId != "" {
c.db.StopTrackingModifiedJobs(c.queryId)
}
queryId, err := c.db.StartTrackingModifiedJobs()
if err != nil {
return err
}
now := time.Now()
start := c.timeWindow.EarliestStart()
sklog.Infof("Reading Jobs from %s to %s.", start, now)
jobs, err := c.db.GetJobsFromDateRange(start, now)
if err != nil {
c.db.StopTrackingModifiedJobs(queryId)
return err
}
c.activeTryJobs = map[string]*types.Job{}
c.queryId = queryId
c.expireAndUpdate(jobs)
return nil
}
// See documentation for JobCache interface.
func (c *tryJobCache) Update() error {
newJobs, err := c.db.GetModifiedJobs(c.queryId)
c.mtx.Lock()
defer c.mtx.Unlock()
if err != nil {
sklog.Warningf("Connection to db lost; re-initializing cache from scratch.")
if err := c.reset(); err != nil {
return err
}
return nil
}
c.expireAndUpdate(newJobs)
return nil
}
// newTryJobCache returns a local cache which provides more convenient views of
// job data than the database can provide.
func newTryJobCache(db db.JobDB, timeWindow *window.Window) (*tryJobCache, error) {
tc := &tryJobCache{
db: db,
timeWindow: timeWindow,
}
if err := tc.reset(); err != nil {
return nil, err
}
return tc, nil
}