blob: 7f6b84dc207c764339a4a2a73eb9f0375604f3d1 [file] [log] [blame]
package memory
import (
"fmt"
"sort"
"sync"
"time"
"github.com/google/uuid"
"go.skia.org/infra/go/sklog"
"go.skia.org/infra/go/util"
"go.skia.org/infra/task_scheduler/go/db"
"go.skia.org/infra/task_scheduler/go/db/modified"
"go.skia.org/infra/task_scheduler/go/types"
)
type inMemoryTaskDB struct {
tasks map[string]*types.Task
tasksMtx sync.RWMutex
db.ModifiedTasks
}
// See docs for TaskDB interface. Does not take any locks.
func (d *inMemoryTaskDB) AssignId(t *types.Task) error {
if t.Id != "" {
return fmt.Errorf("Task Id already assigned: %v", t.Id)
}
t.Id = uuid.New().String()
return nil
}
// See docs for TaskDB interface.
func (d *inMemoryTaskDB) GetTaskById(id string) (*types.Task, error) {
d.tasksMtx.RLock()
defer d.tasksMtx.RUnlock()
if task := d.tasks[id]; task != nil {
return task.Copy(), nil
}
return nil, nil
}
// See docs for TaskDB interface.
func (d *inMemoryTaskDB) GetTasksFromDateRange(start, end time.Time, repo string) ([]*types.Task, error) {
d.tasksMtx.RLock()
defer d.tasksMtx.RUnlock()
rv := []*types.Task{}
// TODO(borenet): Binary search.
for _, b := range d.tasks {
if (b.Created.Equal(start) || b.Created.After(start)) && b.Created.Before(end) {
if repo == "" || b.Repo == repo {
rv = append(rv, b.Copy())
}
}
}
sort.Sort(types.TaskSlice(rv))
return rv, nil
}
// See docs for TaskDB interface.
func (d *inMemoryTaskDB) PutTask(task *types.Task) error {
return d.PutTasks([]*types.Task{task})
}
// See docs for TaskDB interface.
func (d *inMemoryTaskDB) PutTasks(tasks []*types.Task) error {
d.tasksMtx.Lock()
defer d.tasksMtx.Unlock()
// Validate.
for _, task := range tasks {
if util.TimeIsZero(task.Created) {
return fmt.Errorf("Created not set. Task %s created time is %s. %v", task.Id, task.Created, task)
}
if existing := d.tasks[task.Id]; existing != nil {
if !existing.DbModified.Equal(task.DbModified) {
sklog.Warningf("Cached Task has been modified in the DB. Current:\n%v\nCached:\n%v", existing, task)
return db.ErrConcurrentUpdate
}
}
}
// Insert.
for _, task := range tasks {
if task.Id == "" {
if err := d.AssignId(task); err != nil {
// Should never happen.
return err
}
}
task.DbModified = time.Now()
// TODO(borenet): Keep tasks in a sorted slice.
d.tasks[task.Id] = task.Copy()
d.TrackModifiedTask(task)
}
return nil
}
// NewInMemoryTaskDB returns an extremely simple, inefficient, in-memory TaskDB
// implementation.
func NewInMemoryTaskDB(modTasks db.ModifiedTasks) db.TaskDB {
if modTasks == nil {
modTasks = &modified.ModifiedTasksImpl{}
}
db := &inMemoryTaskDB{
tasks: map[string]*types.Task{},
ModifiedTasks: modTasks,
}
return db
}
type inMemoryJobDB struct {
jobs map[string]*types.Job
jobsMtx sync.RWMutex
db.ModifiedJobs
}
func (d *inMemoryJobDB) assignId(j *types.Job) error {
if j.Id != "" {
return fmt.Errorf("Job Id already assigned: %v", j.Id)
}
j.Id = uuid.New().String()
return nil
}
// See docs for JobDB interface.
func (d *inMemoryJobDB) GetJobById(id string) (*types.Job, error) {
d.jobsMtx.RLock()
defer d.jobsMtx.RUnlock()
if job := d.jobs[id]; job != nil {
return job.Copy(), nil
}
return nil, nil
}
// See docs for JobDB interface.
func (d *inMemoryJobDB) GetJobsFromDateRange(start, end time.Time) ([]*types.Job, error) {
d.jobsMtx.RLock()
defer d.jobsMtx.RUnlock()
rv := []*types.Job{}
// TODO(borenet): Binary search.
for _, b := range d.jobs {
if (b.Created.Equal(start) || b.Created.After(start)) && b.Created.Before(end) {
rv = append(rv, b.Copy())
}
}
sort.Sort(types.JobSlice(rv))
return rv, nil
}
// See docs for JobDB interface.
func (d *inMemoryJobDB) PutJob(job *types.Job) error {
return d.PutJobs([]*types.Job{job})
}
// See docs for JobDB interface.
func (d *inMemoryJobDB) PutJobs(jobs []*types.Job) error {
d.jobsMtx.Lock()
defer d.jobsMtx.Unlock()
// Validate.
for _, job := range jobs {
if util.TimeIsZero(job.Created) {
return fmt.Errorf("Created not set. Job %s created time is %s. %v", job.Id, job.Created, job)
}
if existing := d.jobs[job.Id]; existing != nil {
if !existing.DbModified.Equal(job.DbModified) {
sklog.Warningf("Cached Job has been modified in the DB. Current:\n%v\nCached:\n%v", existing, job)
return db.ErrConcurrentUpdate
}
}
}
// Insert.
for _, job := range jobs {
if job.Id == "" {
if err := d.assignId(job); err != nil {
// Should never happen.
return err
}
}
job.DbModified = time.Now()
// TODO(borenet): Keep jobs in a sorted slice.
d.jobs[job.Id] = job.Copy()
d.TrackModifiedJob(job)
}
return nil
}
// NewInMemoryJobDB returns an extremely simple, inefficient, in-memory JobDB
// implementation.
func NewInMemoryJobDB(modJobs db.ModifiedJobs) db.JobDB {
if modJobs == nil {
modJobs = &modified.ModifiedJobsImpl{}
}
db := &inMemoryJobDB{
jobs: map[string]*types.Job{},
ModifiedJobs: modJobs,
}
return db
}
// NewInMemoryDB returns an extremely simple, inefficient, in-memory DB
// implementation.
func NewInMemoryDB(mod db.ModifiedData) db.DB {
if mod == nil {
mod = modified.NewModifiedData()
}
return db.NewDB(NewInMemoryTaskDB(mod), NewInMemoryJobDB(mod), &CommentBox{ModifiedComments: mod})
}