blob: dc8b5725ac9842123714a6f74a77628c3a98c4a9 [file] [log] [blame]
package db
import (
"context"
"errors"
"io"
"sort"
"time"
"go.opencensus.io/trace"
"go.skia.org/infra/go/metrics2"
"go.skia.org/infra/go/now"
"go.skia.org/infra/go/skerr"
"go.skia.org/infra/go/sklog"
"go.skia.org/infra/task_scheduler/go/types"
"go.skia.org/infra/task_scheduler/go/window"
)
const (
	// NUM_RETRIES is the number of attempts made by UpdateTasksWithRetries
	// (and therefore UpdateTaskWithRetries) before giving up and returning
	// ErrConcurrentUpdate.
	NUM_RETRIES = 5
	// SearchResultLimit is the maximum number of results returned by SearchJobs
	// or SearchTasks.
	SearchResultLimit = 500
)
var (
	// ErrAlreadyExists is returned when inserting an object which already
	// exists and may not be modified.
	ErrAlreadyExists = errors.New("Object already exists and modification not allowed.")
	// ErrConcurrentUpdate is returned when a write fails because the object
	// was modified since it was read; see UpdateTasksWithRetries.
	ErrConcurrentUpdate = errors.New("Concurrent update")
	// ErrNotFound is returned when a Task or Job with the given ID does not
	// exist in the DB.
	ErrNotFound = errors.New("Task/Job with given ID does not exist")
	// ErrTooManyUsers indicates that too many users are accessing the DB
	// (the specific limit is implementation-defined).
	ErrTooManyUsers = errors.New("Too many users")
	// ErrUnknownId is an alias for types.ErrUnknownId.
	ErrUnknownId = types.ErrUnknownId
	// ErrDoneSearching is a sentinel used to stop a search early.
	// NOTE(review): no uses are visible in this file; confirm semantics with
	// the implementations of SearchTasks/SearchJobs.
	ErrDoneSearching = errors.New("Done searching")
)
// IsAlreadyExists reports whether the given error carries the same message as
// ErrAlreadyExists. The comparison is by message text rather than identity.
func IsAlreadyExists(e error) bool {
	if e == nil {
		return false
	}
	return e.Error() == ErrAlreadyExists.Error()
}
// IsConcurrentUpdate reports whether the given error carries the same message
// as ErrConcurrentUpdate. The comparison is by message text rather than
// identity.
func IsConcurrentUpdate(e error) bool {
	if e == nil {
		return false
	}
	return e.Error() == ErrConcurrentUpdate.Error()
}
// IsNotFound reports whether the given error carries the same message as
// ErrNotFound. The comparison is by message text rather than identity.
func IsNotFound(e error) bool {
	if e == nil {
		return false
	}
	return e.Error() == ErrNotFound.Error()
}
// IsTooManyUsers reports whether the given error carries the same message as
// ErrTooManyUsers. The comparison is by message text rather than identity.
func IsTooManyUsers(e error) bool {
	if e == nil {
		return false
	}
	return e.Error() == ErrTooManyUsers.Error()
}
// IsUnknownId reports whether the given error carries the same message as
// ErrUnknownId (an alias for types.ErrUnknownId). The comparison is by
// message text rather than identity.
func IsUnknownId(e error) bool {
	if e == nil {
		return false
	}
	return e.Error() == ErrUnknownId.Error()
}
// TaskReader is a read-only view of a TaskDB.
type TaskReader interface {
	// GetTaskById returns the task with the given Id field. Returns nil, nil
	// if the task is not found.
	GetTaskById(context.Context, string) (*types.Task, error)
	// GetTasksFromDateRange retrieves all tasks with Created in the given
	// range. The returned tasks are sorted by Created timestamp. The string
	// field is an optional repository; if provided, only return tasks
	// associated with that repo.
	GetTasksFromDateRange(context.Context, time.Time, time.Time, string) ([]*types.Task, error)
	// ModifiedTasksCh returns a channel which produces Tasks as they are
	// modified in the DB. The channel is closed when the given Context is
	// canceled. The channel will immediately produce a slice of Tasks which
	// may or may not be empty.
	ModifiedTasksCh(context.Context) <-chan []*types.Task
	// SearchTasks retrieves all matching Tasks from the DB.
	SearchTasks(context.Context, *TaskSearchParams) ([]*types.Task, error)
}
// TaskDB is used by the task scheduler to store Tasks.
type TaskDB interface {
	TaskReader
	// AssignId sets the given task's Id field. Does not insert the task into
	// the database.
	AssignId(context.Context, *types.Task) error
	// PutTask inserts or updates the Task in the database. Task's Id field
	// must be empty or set with AssignId. PutTask will set Task.DbModified.
	PutTask(context.Context, *types.Task) error
	// PutTasks inserts or updates the Tasks in the database. Each Task's Id
	// field must be empty or set with AssignId. Each Task's DbModified field
	// will be set. All modifications are performed in a single transaction;
	// the caller must determine what to do when there are too many Tasks to
	// be inserted at once for a given TaskDB implementation. Use only when
	// consistency is important; otherwise, callers should use
	// PutTasksInChunks.
	PutTasks(context.Context, []*types.Task) error
	// PutTasksInChunks is like PutTasks but inserts Tasks in multiple
	// transactions. Not appropriate for updates in which consistency is
	// important.
	PutTasksInChunks(context.Context, []*types.Task) error
}
// UpdateTasksWithRetries wraps a call to db.PutTasks with retries. It calls
// db.PutTasks(f()) repeatedly until one of the following happens:
//   - f or db.PutTasks returns an error, which is then returned from
//     UpdateTasksWithRetries;
//   - PutTasks succeeds, in which case UpdateTasksWithRetries returns the
//     updated Tasks returned by f;
//   - retries are exhausted (NUM_RETRIES attempts), in which case
//     UpdateTasksWithRetries returns ErrConcurrentUpdate.
//
// Within f, tasks should be refreshed from the DB, e.g. with
// db.GetModifiedTasks or db.GetTaskById.
func UpdateTasksWithRetries(ctx context.Context, db TaskDB, f func() ([]*types.Task, error)) ([]*types.Task, error) {
	ctx, span := trace.StartSpan(ctx, "db_UpdateTasksWithRetries", trace.WithSampler(trace.ProbabilitySampler(0.01)))
	defer span.End()
	var lastErr error
	for attempt := 0; attempt < NUM_RETRIES; attempt++ {
		tasks, err := f()
		if err != nil {
			// f failed; give up immediately.
			return nil, err
		}
		lastErr = db.PutTasks(ctx, tasks)
		if lastErr == nil {
			return tasks, nil
		}
		if !IsConcurrentUpdate(lastErr) {
			// Any error other than a concurrent update is fatal.
			return nil, lastErr
		}
	}
	sklog.Warningf("UpdateWithRetries: %d consecutive ErrConcurrentUpdate.", NUM_RETRIES)
	return nil, lastErr
}
// UpdateTaskWithRetries reads, updates, and writes a single Task in the DB. It:
//  1. reads the task with the given id,
//  2. calls f on that task, and
//  3. calls db.PutTask() on the updated task
//  4. repeats from step 1 as long as PutTasks returns ErrConcurrentUpdate and
//     retries have not been exhausted.
//
// Returns the updated task if it was successfully updated in the DB.
// Immediately returns ErrNotFound if db.GetTaskById(id) returns nil.
// Immediately returns any error returned from f or from PutTasks (except
// ErrConcurrentUpdate). Returns ErrConcurrentUpdate if retries are exhausted.
func UpdateTaskWithRetries(ctx context.Context, db TaskDB, id string, f func(*types.Task) error) (*types.Task, error) {
	ctx, span := trace.StartSpan(ctx, "db_UpdateTaskWithRetries", trace.WithSampler(trace.ProbabilitySampler(0.01)))
	span.AddAttributes(trace.StringAttribute("task_id", id))
	defer span.End()
	tasks, err := UpdateTasksWithRetries(ctx, db, func() ([]*types.Task, error) {
		// Re-read the task on every attempt so that f always sees the
		// latest version.
		t, err := db.GetTaskById(ctx, id)
		if err != nil {
			return nil, err
		}
		if t == nil {
			return nil, ErrNotFound
		}
		if err := f(t); err != nil {
			return nil, err
		}
		return []*types.Task{t}, nil
	})
	if err != nil {
		return nil, err
	}
	// The inner func always returns exactly one task on success.
	return tasks[0], nil
}
// JobReader is a read-only view of a JobDB.
type JobReader interface {
	// GetJobById returns the job with the given Id field. Returns nil, nil if
	// the job is not found.
	GetJobById(context.Context, string) (*types.Job, error)
	// GetJobsFromDateRange retrieves all jobs with Created in the given
	// range. The returned jobs are sorted by Created timestamp. The string
	// field is an optional repository; if provided, only return jobs
	// associated with that repo.
	GetJobsFromDateRange(context.Context, time.Time, time.Time, string) ([]*types.Job, error)
	// ModifiedJobsCh returns a channel which produces Jobs as they are
	// modified in the DB. The channel is closed when the given Context is
	// canceled. The channel will immediately produce a slice of Jobs which
	// may or may not be empty.
	ModifiedJobsCh(context.Context) <-chan []*types.Job
	// SearchJobs retrieves all matching Jobs from the DB.
	SearchJobs(context.Context, *JobSearchParams) ([]*types.Job, error)
}
// JobDB is used by the task scheduler to store Jobs.
type JobDB interface {
	JobReader
	// PutJob inserts or updates the Job in the database. Job's Id field
	// must be empty if it is a new Job. PutJob will set Job.DbModified.
	PutJob(context.Context, *types.Job) error
	// PutJobs inserts or updates the Jobs in the database. Each Job's Id
	// field must be empty if it is a new Job. Each Job's DbModified field
	// will be set. All modifications are performed in a single transaction;
	// the caller must determine what to do when there are too many Jobs to
	// be inserted at once for a given JobDB implementation. Use only when
	// consistency is important; otherwise, callers should use
	// PutJobsInChunks.
	PutJobs(context.Context, []*types.Job) error
	// PutJobsInChunks is like PutJobs but inserts Jobs in multiple
	// transactions. Not appropriate for updates in which consistency is
	// important.
	PutJobsInChunks(context.Context, []*types.Job) error
}
// JobSearchParams are parameters on which Jobs may be searched. All fields
// are optional; if a field is not provided (nil), the search will return Jobs
// with any value for that field. If TimeEnd is not provided the search
// terminates at the current time. If TimeStart is not provided the search
// begins 24 hours before TimeEnd.
type JobSearchParams struct {
	// BuildbucketBuildID is serialized as a string in JSON to avoid
	// precision loss in JavaScript clients.
	BuildbucketBuildID *int64            `json:"buildbucket_build_id,string,omitempty"`
	IsForce            *bool             `json:"is_force,omitempty"`
	Issue              *string           `json:"issue,omitempty"`
	Name               *string           `json:"name"`
	Patchset           *string           `json:"patchset,omitempty"`
	Repo               *string           `json:"repo,omitempty"`
	Revision           *string           `json:"revision,omitempty"`
	Status             *types.JobStatus  `json:"status"`
	TimeStart          *time.Time        `json:"time_start"`
	TimeEnd            *time.Time        `json:"time_end"`
}
// SearchBoolEqual reports whether search is either nil (no constraint) or
// points to a value equal to test.
func SearchBoolEqual(search *bool, test bool) bool {
	return search == nil || *search == test
}
// SearchInt64Equal reports whether search is either nil (no constraint) or
// points to a value equal to test.
func SearchInt64Equal(search *int64, test int64) bool {
	return search == nil || *search == test
}
// SearchStringEqual reports whether search is either nil (no constraint) or
// points to a string equal to test.
func SearchStringEqual(search *string, test string) bool {
	return search == nil || *search == test
}
// SearchStatusEqual reports whether search is either nil (no constraint) or
// points to a status string equal to test. Callers pass status enums
// converted to *string / string.
func SearchStatusEqual(search *string, test string) bool {
	return search == nil || *search == test
}
// MatchJob returns true if the given Job matches the given search parameters.
// Nil parameter fields impose no constraint. The time range is inclusive of
// TimeStart and exclusive of TimeEnd.
func MatchJob(j *types.Job, p *JobSearchParams) bool {
	// Reject on the time range first, then on each provided attribute.
	if p.TimeStart != nil && p.TimeStart.After(j.Created) {
		return false
	}
	if p.TimeEnd != nil && !j.Created.Before(*p.TimeEnd) {
		return false
	}
	return SearchStringEqual(p.Issue, j.Issue) &&
		SearchStringEqual(p.Name, j.Name) &&
		SearchStringEqual(p.Patchset, j.Patchset) &&
		SearchStringEqual(p.Repo, j.Repo) &&
		SearchStringEqual(p.Revision, j.Revision) &&
		SearchStatusEqual((*string)(p.Status), string(j.Status)) &&
		SearchBoolEqual(p.IsForce, j.IsForce) &&
		SearchInt64Equal(p.BuildbucketBuildID, j.BuildbucketBuildId)
}
// FilterJobs returns the subset of jobs which match the given search
// parameters. The result is always non-nil. Provided for use by
// implementations of JobReader.SearchJobs.
func FilterJobs(jobs []*types.Job, p *JobSearchParams) []*types.Job {
	matched := make([]*types.Job, 0, len(jobs))
	for _, job := range jobs {
		if MatchJob(job, p) {
			matched = append(matched, job)
		}
	}
	return matched
}
// MatchTask returns true if the given Task matches the given search
// parameters. Nil parameter fields impose no constraint. The time range is
// inclusive of TimeStart and exclusive of TimeEnd.
func MatchTask(t *types.Task, p *TaskSearchParams) bool {
	// Reject on the time range first, then on each provided attribute.
	if p.TimeStart != nil && p.TimeStart.After(t.Created) {
		return false
	}
	if p.TimeEnd != nil && !t.Created.Before(*p.TimeEnd) {
		return false
	}
	return SearchInt64Equal(p.Attempt, int64(t.Attempt)) &&
		SearchStringEqual(p.Issue, t.Issue) &&
		SearchStringEqual(p.Patchset, t.Patchset) &&
		SearchStringEqual(p.Repo, t.Repo) &&
		SearchStringEqual(p.Revision, t.Revision) &&
		SearchStringEqual(p.Name, t.Name) &&
		SearchStatusEqual((*string)(p.Status), string(t.Status)) &&
		SearchStringEqual(p.ForcedJobId, t.ForcedJobId)
}
// FilterTasks returns the subset of tasks which match the given search
// parameters. The result is always non-nil. Provided for use by
// implementations of TaskReader.SearchTasks.
func FilterTasks(tasks []*types.Task, p *TaskSearchParams) []*types.Task {
	matched := make([]*types.Task, 0, len(tasks))
	for _, task := range tasks {
		if MatchTask(task, p) {
			matched = append(matched, task)
		}
	}
	return matched
}
// TaskSearchParams are parameters on which Tasks may be searched. All fields
// are optional; if a field is not provided (nil), the search will return
// Tasks with any value for that field. If TimeEnd is not provided the search
// terminates at the current time. If TimeStart is not provided the search
// begins 24 hours before TimeEnd.
type TaskSearchParams struct {
	// Attempt is serialized as a string in JSON to avoid precision loss in
	// JavaScript clients.
	Attempt     *int64            `json:"attempt,string,omitempty"`
	Status      *types.TaskStatus `json:"status"`
	ForcedJobId *string           `json:"forcedJobId,omitempty"`
	Issue       *string           `json:"issue,omitempty"`
	Name        *string           `json:"name"`
	Patchset    *string           `json:"patchset,omitempty"`
	Repo        *string           `json:"repo,omitempty"`
	Revision    *string           `json:"revision,omitempty"`
	TimeStart   *time.Time        `json:"time_start"`
	TimeEnd     *time.Time        `json:"time_end"`
}
// RemoteDB allows read-only retrieval of tasks and jobs and full access to
// comments.
type RemoteDB interface {
	TaskReader
	JobReader
	CommentDB
}
// DB combines full read-write access to tasks, jobs, and comments by
// embedding TaskDB, JobDB, and CommentDB.
type DB interface {
	TaskDB
	JobDB
	CommentDB
}
// DBCloser is a DB that must be closed when no longer in use.
type DBCloser interface {
	io.Closer
	DB
}
// federatedDB joins an independent TaskDB, JobDB, and CommentDB into a DB by
// embedding all three; each method is delegated to the corresponding
// embedded implementation.
type federatedDB struct {
	TaskDB
	JobDB
	CommentDB
}
// NewDB returns a DB which delegates task, job, and comment operations to
// the given independent TaskDB, JobDB, and CommentDB implementations.
func NewDB(tdb TaskDB, jdb JobDB, cdb CommentDB) DB {
	fdb := &federatedDB{}
	fdb.TaskDB = tdb
	fdb.JobDB = jdb
	fdb.CommentDB = cdb
	return fdb
}
// GetTasksFromWindow returns all tasks matching the given Window from the
// TaskReader. Tasks are read per-repo from the window's start time for that
// repo up to the current time, then sorted by Created timestamp.
func GetTasksFromWindow(ctx context.Context, db TaskReader, w window.Window) ([]*types.Task, error) {
	ctx, span := trace.StartSpan(ctx, "db_GetTasksFromWindow")
	defer span.End()
	defer metrics2.FuncTimer().Stop()

	startTimesByRepo := w.StartTimesByRepo()
	if len(startTimesByRepo) == 0 {
		// If the timeWindow has no associated repos, default to loading
		// tasks for all repos from the beginning of the timeWindow. Build a
		// fresh map rather than writing into the returned one, which may be
		// nil.
		startTimesByRepo = map[string]time.Time{"": w.EarliestStart()}
	}
	currentTime := now.Now(ctx)
	tasks := make([]*types.Task, 0, 1024)
	for repo, start := range startTimesByRepo {
		sklog.Infof("Reading Tasks in %s from %s to %s.", repo, start, currentTime)
		t0 := time.Now()
		t, err := db.GetTasksFromDateRange(ctx, start, currentTime, repo)
		if err != nil {
			return nil, err
		}
		sklog.Infof("Read %d tasks from %s in %s", len(t), repo, time.Since(t0))
		tasks = append(tasks, t...)
	}
	sort.Sort(types.TaskSlice(tasks))
	return tasks, nil
}
// GetJobsFromWindow returns all jobs matching the given Window from the
// JobReader. Jobs are read per-repo from the window's start time for that
// repo up to the current time, then sorted by Created timestamp.
func GetJobsFromWindow(ctx context.Context, db JobReader, w window.Window) ([]*types.Job, error) {
	ctx, span := trace.StartSpan(ctx, "db_GetJobsFromWindow")
	defer span.End()
	defer metrics2.FuncTimer().Stop()

	startTimesByRepo := w.StartTimesByRepo()
	if len(startTimesByRepo) == 0 {
		// If the timeWindow has no associated repos, default to loading
		// jobs for all repos from the beginning of the timeWindow. Build a
		// fresh map rather than writing into the returned one, which may be
		// nil.
		startTimesByRepo = map[string]time.Time{"": w.EarliestStart()}
	}
	currentTime := now.Now(ctx)
	jobs := make([]*types.Job, 0, 1024)
	for repo, start := range startTimesByRepo {
		sklog.Infof("Reading Jobs in %s from %s to %s.", repo, start, currentTime)
		t0 := time.Now()
		j, err := db.GetJobsFromDateRange(ctx, start, currentTime, repo)
		if err != nil {
			return nil, err
		}
		sklog.Infof("Read %d jobs from %s in %s", len(j), repo, time.Since(t0))
		jobs = append(jobs, j...)
	}
	sort.Sort(types.JobSlice(jobs))
	return jobs, nil
}
// errNotModified is a sentinel used by UpdateDBFromTaskResult to abort the
// update retry loop when UpdateFromTaskResult reports no changes. It is
// never returned to callers.
var errNotModified = errors.New("Task not modified")
// UpdateDBFromTaskResult updates the task identified by res's SWARMING_TAG_ID
// tag in db from the data in res. It returns true if the task was actually
// modified and written, false if the result contained no changes. An error is
// returned if the ID tag is missing, if reading/updating the task fails, or
// if retries are exhausted by a concurrent writer.
func UpdateDBFromTaskResult(ctx context.Context, db TaskDB, res *types.TaskResult) (bool, error) {
	ctx, span := trace.StartSpan(ctx, "db_UpdateDBFromTaskResult", trace.WithSampler(trace.ProbabilitySampler(0.01)))
	defer span.End()
	id, ok := res.Tags[types.SWARMING_TAG_ID]
	if !ok || len(id) == 0 {
		return false, skerr.Fmt("missing %s tag", types.SWARMING_TAG_ID)
	}
	_, err := UpdateTaskWithRetries(ctx, db, id[0], func(task *types.Task) error {
		modified, err := task.UpdateFromTaskResult(res)
		if err != nil {
			return err
		}
		if !modified {
			// Abort the retry loop; there is nothing to write.
			return errNotModified
		}
		return nil
	})
	// Use errors.Is rather than == so the sentinel still matches if it is
	// ever wrapped.
	if errors.Is(err, errNotModified) {
		return false, nil
	}
	if err != nil {
		return false, err
	}
	return true, nil
}