| package db |
| |
| import ( |
| "context" |
| "errors" |
| "io" |
| "sort" |
| "time" |
| |
| "go.opencensus.io/trace" |
| "go.skia.org/infra/go/metrics2" |
| "go.skia.org/infra/go/now" |
| "go.skia.org/infra/go/skerr" |
| "go.skia.org/infra/go/sklog" |
| "go.skia.org/infra/task_scheduler/go/types" |
| "go.skia.org/infra/task_scheduler/go/window" |
| ) |
| |
| const ( |
| // Retries attempted by UpdateTasksWithRetries. |
| NUM_RETRIES = 5 |
| |
| // SearchResultLimit is the maximum number of results returned by SearchJobs |
| // or SearchTasks. |
| SearchResultLimit = 500 |
| ) |
| |
| var ( |
| ErrAlreadyExists = errors.New("Object already exists and modification not allowed.") |
| ErrConcurrentUpdate = errors.New("Concurrent update") |
| ErrNotFound = errors.New("Task/Job with given ID does not exist") |
| ErrTooManyUsers = errors.New("Too many users") |
| ErrUnknownId = types.ErrUnknownId |
| ErrDoneSearching = errors.New("Done searching") |
| ) |
| |
| func IsAlreadyExists(e error) bool { |
| return e != nil && e.Error() == ErrAlreadyExists.Error() |
| } |
| |
| func IsConcurrentUpdate(e error) bool { |
| return e != nil && e.Error() == ErrConcurrentUpdate.Error() |
| } |
| |
| func IsNotFound(e error) bool { |
| return e != nil && e.Error() == ErrNotFound.Error() |
| } |
| |
| func IsTooManyUsers(e error) bool { |
| return e != nil && e.Error() == ErrTooManyUsers.Error() |
| } |
| |
| func IsUnknownId(e error) bool { |
| return e != nil && e.Error() == ErrUnknownId.Error() |
| } |
| |
| // TaskReader is a read-only view of a TaskDB. |
| type TaskReader interface { |
| // GetTaskById returns the task with the given Id field. Returns nil, nil if |
| // task is not found. |
| GetTaskById(context.Context, string) (*types.Task, error) |
| |
| // GetTasksFromDateRange retrieves all tasks with Created in the given range. |
| // The returned tasks are sorted by Created timestamp. The string field is |
| // an optional repository; if provided, only return tasks associated with |
| // that repo. |
| GetTasksFromDateRange(context.Context, time.Time, time.Time, string) ([]*types.Task, error) |
| |
| // ModifiedTasksCh returns a channel which produces Tasks as they are |
| // modified in the DB. The channel is closed when the given Context is |
| // canceled. The channel will immediately produce a slice of Tasks which |
| // may or may not be empty. |
| ModifiedTasksCh(context.Context) <-chan []*types.Task |
| |
| // SearchTasks retrieves all matching Tasks from the DB. |
| SearchTasks(context.Context, *TaskSearchParams) ([]*types.Task, error) |
| } |
| |
| // TaskDB is used by the task scheduler to store Tasks. |
| type TaskDB interface { |
| TaskReader |
| |
| // AssignId sets the given task's Id field. Does not insert the task into the |
| // database. |
| AssignId(context.Context, *types.Task) error |
| |
| // PutTask inserts or updates the Task in the database. Task's Id field must |
| // be empty or set with AssignId. PutTask will set Task.DbModified. |
| PutTask(context.Context, *types.Task) error |
| |
| // PutTasks inserts or updates the Tasks in the database. Each Task's Id field |
| // must be empty or set with AssignId. Each Task's DbModified field will be |
| // set. All modifications are performed in a single transaction; the |
| // caller must determine what to do when there are too many Tasks to be |
| // inserted at once for a given TaskDB implementation. Use only when |
| // consistency is important; otherwise, callers should use |
| // PutTasksInChunks. |
| PutTasks(context.Context, []*types.Task) error |
| |
| // PutTasksInChunks is like PutTasks but inserts Tasks in multiple |
| // transactions. Not appropriate for updates in which consistency is |
| // important. |
| PutTasksInChunks(context.Context, []*types.Task) error |
| } |
| |
| // UpdateTasksWithRetries wraps a call to db.PutTasks with retries. It calls |
| // db.PutTasks(f()) repeatedly until one of the following happen: |
| // - f or db.PutTasks returns an error, which is then returned from |
| // UpdateTasksWithRetries; |
| // - PutTasks succeeds, in which case UpdateTasksWithRetries returns the updated |
| // Tasks returned by f; |
| // - retries are exhausted, in which case UpdateTasksWithRetries returns |
| // ErrConcurrentUpdate. |
| // |
| // Within f, tasks should be refreshed from the DB, e.g. with |
| // db.GetModifiedTasks or db.GetTaskById. |
| func UpdateTasksWithRetries(ctx context.Context, db TaskDB, f func() ([]*types.Task, error)) ([]*types.Task, error) { |
| ctx, span := trace.StartSpan(ctx, "db_UpdateTasksWithRetries", trace.WithSampler(trace.ProbabilitySampler(0.01))) |
| defer span.End() |
| var lastErr error |
| for i := 0; i < NUM_RETRIES; i++ { |
| t, err := f() |
| if err != nil { |
| return nil, err |
| } |
| lastErr = db.PutTasks(ctx, t) |
| if lastErr == nil { |
| return t, nil |
| } else if !IsConcurrentUpdate(lastErr) { |
| return nil, lastErr |
| } |
| } |
| sklog.Warningf("UpdateWithRetries: %d consecutive ErrConcurrentUpdate.", NUM_RETRIES) |
| return nil, lastErr |
| } |
| |
| // UpdateTaskWithRetries reads, updates, and writes a single Task in the DB. It: |
| // 1. reads the task with the given id, |
| // 2. calls f on that task, and |
| // 3. calls db.PutTask() on the updated task |
| // 4. repeats from step 1 as long as PutTasks returns ErrConcurrentUpdate and |
| // retries have not been exhausted. |
| // |
| // Returns the updated task if it was successfully updated in the DB. |
| // Immediately returns ErrNotFound if db.GetTaskById(id) returns nil. |
| // Immediately returns any error returned from f or from PutTasks (except |
| // ErrConcurrentUpdate). Returns ErrConcurrentUpdate if retries are exhausted. |
| func UpdateTaskWithRetries(ctx context.Context, db TaskDB, id string, f func(*types.Task) error) (*types.Task, error) { |
| ctx, span := trace.StartSpan(ctx, "db_UpdateTaskWithRetries", trace.WithSampler(trace.ProbabilitySampler(0.01))) |
| span.AddAttributes(trace.StringAttribute("task_id", id)) |
| defer span.End() |
| tasks, err := UpdateTasksWithRetries(ctx, db, func() ([]*types.Task, error) { |
| t, err := db.GetTaskById(ctx, id) |
| if err != nil { |
| return nil, err |
| } |
| if t == nil { |
| return nil, ErrNotFound |
| } |
| err = f(t) |
| if err != nil { |
| return nil, err |
| } |
| return []*types.Task{t}, nil |
| }) |
| if err != nil { |
| return nil, err |
| } else { |
| return tasks[0], nil |
| } |
| } |
| |
| // JobReader is a read-only view of a JobDB. |
| type JobReader interface { |
| // GetJobById returns the job with the given Id field. Returns nil, nil if |
| // job is not found. |
| GetJobById(context.Context, string) (*types.Job, error) |
| |
| // GetJobsFromDateRange retrieves all jobs with Created in the given |
| // range. The returned jobs are sorted by Created timestamp. The string |
| // field is an optional repository; if provided, only return tasks |
| // associated with that repo. |
| GetJobsFromDateRange(context.Context, time.Time, time.Time, string) ([]*types.Job, error) |
| |
| // ModifiedJobsCh returns a channel which produces Jobs as they are |
| // modified in the DB. The channel is closed when the given Context is |
| // canceled. The channel will immediately produce a slice of Jobs which |
| // may or may not be empty. |
| ModifiedJobsCh(context.Context) <-chan []*types.Job |
| |
| // SearchJobs retrieves all matching Jobs from the DB. |
| SearchJobs(context.Context, *JobSearchParams) ([]*types.Job, error) |
| } |
| |
| // JobDB is used by the task scheduler to store Jobs. |
| type JobDB interface { |
| JobReader |
| |
| // PutJob inserts or updates the Job in the database. Job's Id field |
| // must be empty if it is a new Job. PutJob will set Job.DbModified. |
| PutJob(context.Context, *types.Job) error |
| |
| // PutJobs inserts or updates the Jobs in the database. Each Jobs' Id |
| // field must be empty if it is a new Job. Each Jobs' DbModified field |
| // will be set. All modifications are performed in a single transaction; |
| // the caller must determine what to do when there are too many Jobs to |
| // be inserted at once for a given JobDB implementation. Use only when |
| // consistency is important; otherwise, callers should use |
| // PutJobsInChunks. |
| PutJobs(context.Context, []*types.Job) error |
| |
| // PutJobsInChunks is like PutJobs but inserts Jobs in multiple |
| // transactions. Not appropriate for updates in which consistency is |
| // important. |
| PutJobsInChunks(context.Context, []*types.Job) error |
| } |
| |
| // JobSearchParams are parameters on which Jobs may be searched. All fields |
| // are optional; if a field is not provided, the search will return Jobs with |
| // any value for that field. If TimeEnd is not provided the search terminates at |
| // the current time. If TimeStart is not provided the search begins 24 hours |
| // before TimeEnd. |
| type JobSearchParams struct { |
| BuildbucketBuildID *int64 `json:"buildbucket_build_id,string,omitempty"` |
| IsForce *bool `json:"is_force,omitempty"` |
| Issue *string `json:"issue,omitempty"` |
| Name *string `json:"name"` |
| Patchset *string `json:"patchset,omitempty"` |
| Repo *string `json:"repo,omitempty"` |
| Revision *string `json:"revision,omitempty"` |
| Status *types.JobStatus `json:"status"` |
| TimeStart *time.Time `json:"time_start"` |
| TimeEnd *time.Time `json:"time_end"` |
| } |
| |
| // SearchBoolEqual compares the two bools and returns true if the first is |
| // nil or equal to the second. |
| func SearchBoolEqual(search *bool, test bool) bool { |
| if search == nil { |
| return true |
| } |
| return *search == test |
| } |
| |
| // SearchInt64Equal compares the two int64s and returns true if the first is |
| // either nil or equal to the second. |
| func SearchInt64Equal(search *int64, test int64) bool { |
| if search == nil { |
| return true |
| } |
| return *search == test |
| } |
| |
| // SearchStringEqual compares the two strings and returns true if the first is |
| // either not provided or equal to the second. |
| func SearchStringEqual(search *string, test string) bool { |
| if search == nil { |
| return true |
| } |
| return *search == test |
| } |
| |
| // SearchStatusEqual compares the two strings and returns true if the first is |
| // either not provided or equal to the second. |
| func SearchStatusEqual(search *string, test string) bool { |
| if search == nil { |
| return true |
| } |
| return *search == test |
| } |
| |
| // MatchJob returns true if the given Job matches the given search parameters. |
| func MatchJob(j *types.Job, p *JobSearchParams) bool { |
| // Compare all attributes which are provided. |
| return true && |
| (p.TimeStart == nil || !(*p.TimeStart).After(j.Created)) && |
| (p.TimeEnd == nil || j.Created.Before(*p.TimeEnd)) && |
| SearchStringEqual(p.Issue, j.Issue) && |
| SearchStringEqual(p.Name, j.Name) && |
| SearchStringEqual(p.Patchset, j.Patchset) && |
| SearchStringEqual(p.Repo, j.Repo) && |
| SearchStringEqual(p.Revision, j.Revision) && |
| SearchStatusEqual((*string)(p.Status), string(j.Status)) && |
| SearchBoolEqual(p.IsForce, j.IsForce) && |
| SearchInt64Equal(p.BuildbucketBuildID, j.BuildbucketBuildId) |
| } |
| |
| // FilterJobs filters the given slice of Jobs to those which match the given |
| // search parameters. Provided for use by implementations of |
| // JobReader.SearchJobs. |
| func FilterJobs(jobs []*types.Job, p *JobSearchParams) []*types.Job { |
| rv := []*types.Job{} |
| for _, j := range jobs { |
| if MatchJob(j, p) { |
| rv = append(rv, j) |
| } |
| } |
| return rv |
| } |
| |
| // MatchTask returns true if the given Task matches the given search parameters. |
| func MatchTask(t *types.Task, p *TaskSearchParams) bool { |
| // Compare all attributes which are provided. |
| return true && |
| (p.TimeStart == nil || !(*p.TimeStart).After(t.Created)) && |
| (p.TimeEnd == nil || t.Created.Before(*p.TimeEnd)) && |
| SearchInt64Equal(p.Attempt, int64(t.Attempt)) && |
| SearchStringEqual(p.Issue, t.Issue) && |
| SearchStringEqual(p.Patchset, t.Patchset) && |
| SearchStringEqual(p.Repo, t.Repo) && |
| SearchStringEqual(p.Revision, t.Revision) && |
| SearchStringEqual(p.Name, t.Name) && |
| SearchStatusEqual((*string)(p.Status), string(t.Status)) && |
| SearchStringEqual(p.ForcedJobId, t.ForcedJobId) |
| } |
| |
| // FilterTasks filters the given slice of Tasks to those which match the given |
| // search parameters. Provided for use by implementations of |
| // TaskReader.SearchTasks. |
| func FilterTasks(tasks []*types.Task, p *TaskSearchParams) []*types.Task { |
| rv := []*types.Task{} |
| for _, t := range tasks { |
| if MatchTask(t, p) { |
| rv = append(rv, t) |
| } |
| } |
| return rv |
| } |
| |
| // TaskSearchParams are parameters on which Tasks may be searched. All fields |
| // are optional; if a field is not provided, the search will return Tasks with |
| // any value for that field. If TimeEnd is not provided the search terminates at |
| // the current time. If TimeStart is not provided the search begins 24 hours |
| // before TimeEnd. |
| type TaskSearchParams struct { |
| Attempt *int64 `json:"attempt,string,omitempty"` |
| Status *types.TaskStatus `json:"status"` |
| ForcedJobId *string `json:"forcedJobId,omitempty"` |
| Issue *string `json:"issue,omitempty"` |
| Name *string `json:"name"` |
| Patchset *string `json:"patchset,omitempty"` |
| Repo *string `json:"repo,omitempty"` |
| Revision *string `json:"revision,omitempty"` |
| TimeStart *time.Time `json:"time_start"` |
| TimeEnd *time.Time `json:"time_end"` |
| } |
| |
| // RemoteDB allows retrieving tasks and jobs and full access to comments. |
| type RemoteDB interface { |
| TaskReader |
| JobReader |
| CommentDB |
| } |
| |
| // DB implements TaskDB, JobDB, and CommentDB. |
| type DB interface { |
| TaskDB |
| JobDB |
| CommentDB |
| } |
| |
| // DBCloser is a DB that must be closed when no longer in use. |
| type DBCloser interface { |
| io.Closer |
| DB |
| } |
| |
| // federatedDB joins an independent TaskDB, JobDB, and CommentDB into a DB. |
| type federatedDB struct { |
| TaskDB |
| JobDB |
| CommentDB |
| } |
| |
| // NewDB returns a DB that delegates to independent TaskDB, JobDB, and |
| // CommentDB. |
| func NewDB(tdb TaskDB, jdb JobDB, cdb CommentDB) DB { |
| return &federatedDB{ |
| TaskDB: tdb, |
| JobDB: jdb, |
| CommentDB: cdb, |
| } |
| } |
| |
| // GetTasksFromWindow returns all tasks matching the given Window from the |
| // TaskReader. |
| func GetTasksFromWindow(ctx context.Context, db TaskReader, w window.Window) ([]*types.Task, error) { |
| ctx, span := trace.StartSpan(ctx, "db_GetTasksFromWindow") |
| defer span.End() |
| defer metrics2.FuncTimer().Stop() |
| |
| startTimesByRepo := w.StartTimesByRepo() |
| if len(startTimesByRepo) == 0 { |
| // If the timeWindow has no associated repos, default to loading |
| // tasks for all repos from the beginning of the timeWindow. |
| startTimesByRepo[""] = w.EarliestStart() |
| } |
| currentTime := now.Now(ctx) |
| tasks := make([]*types.Task, 0, 1024) |
| for repo, start := range startTimesByRepo { |
| sklog.Infof("Reading Tasks in %s from %s to %s.", repo, start, currentTime) |
| t0 := time.Now() |
| t, err := db.GetTasksFromDateRange(ctx, start, currentTime, repo) |
| if err != nil { |
| return nil, err |
| } |
| sklog.Infof("Read %d tasks from %s in %s", len(t), repo, time.Now().Sub(t0)) |
| tasks = append(tasks, t...) |
| } |
| sort.Sort(types.TaskSlice(tasks)) |
| return tasks, nil |
| } |
| |
| // GetJobsFromWindow returns all jobs matching the given Window from the |
| // JobReader. |
| func GetJobsFromWindow(ctx context.Context, db JobReader, w window.Window) ([]*types.Job, error) { |
| ctx, span := trace.StartSpan(ctx, "db_GetJobsFromWindow") |
| defer span.End() |
| defer metrics2.FuncTimer().Stop() |
| |
| startTimesByRepo := w.StartTimesByRepo() |
| if len(startTimesByRepo) == 0 { |
| // If the timeWindow has no associated repos, default to loading |
| // tasks for all repos from the beginning of the timeWindow. |
| startTimesByRepo[""] = w.EarliestStart() |
| } |
| currentTime := now.Now(ctx) |
| jobs := make([]*types.Job, 0, 1024) |
| for repo, start := range startTimesByRepo { |
| sklog.Infof("Reading Jobs in %s from %s to %s.", repo, start, currentTime) |
| t0 := time.Now() |
| j, err := db.GetJobsFromDateRange(ctx, start, currentTime, repo) |
| if err != nil { |
| return nil, err |
| } |
| sklog.Infof("Read %d jobs from %s in %s", len(j), repo, time.Now().Sub(t0)) |
| jobs = append(jobs, j...) |
| } |
| sort.Sort(types.JobSlice(jobs)) |
| return jobs, nil |
| } |
| |
| var errNotModified = errors.New("Task not modified") |
| |
| // UpdateDBFromTaskResult updates a task in db from data in s. |
| func UpdateDBFromTaskResult(ctx context.Context, db TaskDB, res *types.TaskResult) (bool, error) { |
| ctx, span := trace.StartSpan(ctx, "db_UpdateDBFromTaskResult", trace.WithSampler(trace.ProbabilitySampler(0.01))) |
| defer span.End() |
| id, ok := res.Tags[types.SWARMING_TAG_ID] |
| if !ok || len(id) == 0 { |
| return false, skerr.Fmt("missing %s tag", types.SWARMING_TAG_ID) |
| } |
| _, err := UpdateTaskWithRetries(ctx, db, id[0], func(task *types.Task) error { |
| modified, err := task.UpdateFromTaskResult(res) |
| if err != nil { |
| return err |
| } |
| if !modified { |
| return errNotModified |
| } |
| return nil |
| }) |
| if err == errNotModified { |
| return false, nil |
| } else if err != nil { |
| return false, err |
| } else { |
| return true, nil |
| } |
| } |