| package scheduling |
| |
| import ( |
| "context" |
| "encoding/json" |
| "errors" |
| "fmt" |
| "io" |
| "math" |
| "net/http" |
| "path" |
| "reflect" |
| "sort" |
| "strings" |
| "sync" |
| "time" |
| |
| "go.opencensus.io/trace" |
| "golang.org/x/oauth2" |
| "golang.org/x/sync/errgroup" |
| |
| "go.skia.org/infra/go/cas" |
| "go.skia.org/infra/go/cleanup" |
| "go.skia.org/infra/go/firestore" |
| "go.skia.org/infra/go/gcs" |
| "go.skia.org/infra/go/git/repograph" |
| "go.skia.org/infra/go/metrics2" |
| "go.skia.org/infra/go/now" |
| "go.skia.org/infra/go/skerr" |
| "go.skia.org/infra/go/sklog" |
| "go.skia.org/infra/go/swarming" |
| "go.skia.org/infra/go/util" |
| "go.skia.org/infra/task_scheduler/go/db" |
| "go.skia.org/infra/task_scheduler/go/db/cache" |
| "go.skia.org/infra/task_scheduler/go/skip_tasks" |
| "go.skia.org/infra/task_scheduler/go/specs" |
| "go.skia.org/infra/task_scheduler/go/task_cfg_cache" |
| "go.skia.org/infra/task_scheduler/go/types" |
| "go.skia.org/infra/task_scheduler/go/window" |
| ) |
| |
| const ( |
| // Manually-forced jobs have high priority. |
| CANDIDATE_SCORE_FORCE_RUN = 100.0 |
| |
| // Try jobs have high priority, equal to building at HEAD when we're |
| // 5 commits behind. |
| CANDIDATE_SCORE_TRY_JOB = 10.0 |
| |
| // When retrying a try job task that has failed, prioritize the retry |
| // lower than try job tasks that haven't run yet. |
| CANDIDATE_SCORE_TRY_JOB_RETRY_MULTIPLIER = 0.75 |
| |
| // When bisecting or retrying a task that failed or had a mishap, add a bonus |
| // to the raw score. |
| // |
| // A value of 0.75 means that a retry scores higher than bisecting a |
| // successful task with a blamelist of 2 commits, but lower than testing new |
| // commits or bisecting successful tasks with blamelists of 3 or more |
| // commits. Bisecting a failure with a blamelist of 2 commits scores the same |
| // as bisecting a successful task with a blamelist of 4 commits. |
| CANDIDATE_SCORE_FAILURE_OR_MISHAP_BONUS = 0.75 |
| |
| // MAX_BLAMELIST_COMMITS is the maximum number of commits which are |
| // allowed in a task blamelist before we stop tracing commit history. |
| MAX_BLAMELIST_COMMITS = 500 |
| |
| // Measurement name for task candidate counts by dimension set. |
| MEASUREMENT_TASK_CANDIDATE_COUNT = "task_candidate_count" |
| |
| NUM_TOP_CANDIDATES = 50 |
| |
| // To avoid errors resulting from DB transaction size limits, we |
| // restrict the number of tasks triggered per TaskSpec (we insert tasks |
| // into the DB in chunks by TaskSpec) to half of the DB transaction size |
| // limit (since we may need to update an existing task whose blamelist |
| // was split by the new task). |
| SCHEDULING_LIMIT_PER_TASK_SPEC = firestore.MAX_TRANSACTION_DOCS / 2 |
| |
| GCS_MAIN_LOOP_DIAGNOSTICS_DIR = "MainLoop" |
| GCS_DIAGNOSTICS_WRITE_TIMEOUT = 60 * time.Second |
| ) |
| |
| var ( |
| ERR_BLAMELIST_DONE = errors.New("ERR_BLAMELIST_DONE") |
| ) |
| |
| // TaskScheduler is a struct used for scheduling tasks on bots. |
| type TaskScheduler struct { |
| busyBots *busyBots |
| candidateMetrics map[string]metrics2.Int64Metric |
| candidateMetricsMtx sync.Mutex |
| db db.DB |
| diagClient gcs.GCSClient |
| diagInstance string |
| rbeCas cas.CAS |
| rbeCasInstance string |
| jCache cache.JobCache |
| lastScheduled time.Time // protected by queueMtx. |
| |
| pendingInsert map[string]bool |
| pendingInsertMtx sync.RWMutex |
| |
| pools []string |
| pubsubCount metrics2.Counter |
| pubsubTopic string |
| queue []*TaskCandidate // protected by queueMtx. |
| queueMtx sync.RWMutex |
| repos repograph.Map |
| skipTasks *skip_tasks.DB |
| taskExecutors map[string]types.TaskExecutor |
| taskCfgCache task_cfg_cache.TaskCfgCache |
| tCache cache.TaskCache |
| // testWaitGroup keeps track of any goroutines the TaskScheduler methods |
| // create so that tests can ensure all goroutines finish before asserting. |
| testWaitGroup sync.WaitGroup |
| timeDecayAmt24Hr float64 |
| triggeredCount metrics2.Counter |
| updateUnfinishedCount metrics2.Counter |
| window window.Window |
| } |
| |
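| // NewTaskScheduler returns a TaskScheduler instance. It performs an initial |
| // sync of the given repos and initializes the task and job caches; the |
| // caller should call Close when the TaskScheduler is no longer needed. |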
| func NewTaskScheduler(ctx context.Context, d db.DB, bl *skip_tasks.DB, period time.Duration, numCommits int, repos repograph.Map, rbeCas cas.CAS, rbeCasInstance string, taskExecutors map[string]types.TaskExecutor, c *http.Client, timeDecayAmt24Hr float64, pools []string, pubsubTopic string, taskCfgCache task_cfg_cache.TaskCfgCache, ts oauth2.TokenSource, diagClient gcs.GCSClient, diagInstance string, debugBusyBots BusyBotsDebugLog) (*TaskScheduler, error) { |
| // Repos must be updated before window is initialized; otherwise the repos may be uninitialized, |
| // resulting in the window being too short, causing the caches to be loaded with incomplete data. |
| for _, r := range repos { |
| if err := r.Update(ctx); err != nil { |
| return nil, skerr.Wrapf(err, "Failed initial repo sync") |
| } |
| } |
| w, err := window.New(ctx, period, numCommits, repos) |
| if err != nil { |
| return nil, skerr.Wrapf(err, "Failed to create window") |
| } |
| |
| // Create caches. |
| tCache, err := cache.NewTaskCache(ctx, d, w, nil) |
| if err != nil { |
| return nil, skerr.Wrapf(err, "Failed to create TaskCache") |
| } |
| |
| jCache, err := cache.NewJobCache(ctx, d, w, nil) |
| if err != nil { |
| return nil, skerr.Wrapf(err, "Failed to create JobCache") |
| } |
| |
| s := &TaskScheduler{ |
| skipTasks: bl, |
| busyBots: newBusyBots(debugBusyBots), |
| candidateMetrics: map[string]metrics2.Int64Metric{}, |
| db: d, |
| diagClient: diagClient, |
| diagInstance: diagInstance, |
| jCache: jCache, |
| pendingInsert: map[string]bool{}, |
| pools: pools, |
| pubsubCount: metrics2.GetCounter("task_scheduler_pubsub_handler"), |
| pubsubTopic: pubsubTopic, |
| queue: []*TaskCandidate{}, |
| queueMtx: sync.RWMutex{}, |
| rbeCas: rbeCas, |
| rbeCasInstance: rbeCasInstance, |
| repos: repos, |
| taskExecutors: taskExecutors, |
| taskCfgCache: taskCfgCache, |
| tCache: tCache, |
| timeDecayAmt24Hr: timeDecayAmt24Hr, |
| triggeredCount: metrics2.GetCounter("task_scheduler_triggered_count"), |
| updateUnfinishedCount: metrics2.GetCounter("task_scheduler_update_unfinished_tasks_count"), |
| window: w, |
| } |
| return s, nil |
| } |
| |
| // Close cleans up resources used by the TaskScheduler. |
| func (s *TaskScheduler) Close() error { |
| err1 := s.taskCfgCache.Close() |
| err2 := s.rbeCas.Close() |
| if err1 != nil { |
| return skerr.Wrap(err1) |
| } |
| if err2 != nil { |
| return skerr.Wrap(err2) |
| } |
| return nil |
| } |
| |
| // Start initiates the TaskScheduler's goroutines for scheduling tasks. |
| func (s *TaskScheduler) Start(ctx context.Context) { |
| lvScheduling := metrics2.NewLiveness("last_successful_task_scheduling") |
| cleanup.Repeat(5*time.Second, func(_ context.Context) { |
| // Explicitly ignore the passed-in context; this allows us to |
| // finish the current scheduling cycle even if the context is |
| // canceled, which helps prevent "orphaned" tasks which were |
| // triggered on Swarming but were not inserted into the DB. |
| ctx, span := trace.StartSpan(context.Background(), "taskscheduler_Start_MainLoop", trace.WithSampler(trace.AlwaysSample())) |
| defer span.End() |
| if err := s.MainLoop(ctx); err != nil { |
| sklog.Errorf("Failed to run the task scheduler: %s", err) |
| } else { |
| lvScheduling.Reset() |
| } |
| }, nil) |
| lvUpdateUnfinishedTasks := metrics2.NewLiveness("last_successful_tasks_update") |
| go util.RepeatCtx(ctx, 5*time.Minute, func(ctx context.Context) { |
| ctx, span := trace.StartSpan(ctx, "taskscheduler_Start_UpdateUnfinishedTasks", trace.WithSampler(trace.AlwaysSample())) |
| defer span.End() |
| if err := s.updateUnfinishedTasks(ctx); err != nil { |
| sklog.Errorf("Failed to run periodic tasks update: %s", err) |
| } else { |
| lvUpdateUnfinishedTasks.Reset() |
| } |
| }) |
| } |
| |
| // putTask is a wrapper around DB.PutTask which adds the task to the cache. |
| func (s *TaskScheduler) putTask(ctx context.Context, t *types.Task) error { |
| if err := s.db.PutTask(ctx, t); err != nil { |
| return err |
| } |
| s.tCache.AddTasks([]*types.Task{t}) |
| return nil |
| } |
| |
| // putTasks is a wrapper around DB.PutTasks which adds the tasks to the cache. |
| func (s *TaskScheduler) putTasks(ctx context.Context, t []*types.Task) error { |
| if err := s.db.PutTasks(ctx, t); err != nil { |
| return err |
| } |
| s.tCache.AddTasks(t) |
| return nil |
| } |
| |
| // putTasksInChunks is a wrapper around DB.PutTasksInChunks which adds the tasks |
| // to the cache. |
| func (s *TaskScheduler) putTasksInChunks(ctx context.Context, t []*types.Task) error { |
| if err := s.db.PutTasksInChunks(ctx, t); err != nil { |
| return err |
| } |
| s.tCache.AddTasks(t) |
| return nil |
| } |
| |
| // putJob is a wrapper around DB.PutJob which adds the job to the cache. |
| func (s *TaskScheduler) putJob(ctx context.Context, j *types.Job) error { |
| if err := s.db.PutJob(ctx, j); err != nil { |
| return err |
| } |
| s.jCache.AddJobs([]*types.Job{j}) |
| return nil |
| } |
| |
| // putJobsInChunks is a wrapper around DB.PutJobsInChunks which adds the jobs |
| // to the cache. |
| func (s *TaskScheduler) putJobsInChunks(ctx context.Context, j []*types.Job) error { |
| if err := s.db.PutJobsInChunks(ctx, j); err != nil { |
| return err |
| } |
| s.jCache.AddJobs(j) |
| return nil |
| } |
| |
| // TaskSchedulerStatus is a struct which provides status information about the |
| // TaskScheduler. |
| type TaskSchedulerStatus struct { |
| LastScheduled time.Time `json:"last_scheduled"` |
| TopCandidates []*TaskCandidate `json:"top_candidates"` |
| } |
| |
| // Status returns the current status of the TaskScheduler. |
| func (s *TaskScheduler) Status() *TaskSchedulerStatus { |
| defer metrics2.FuncTimer().Stop() |
| s.queueMtx.RLock() |
| defer s.queueMtx.RUnlock() |
| n := NUM_TOP_CANDIDATES |
| if len(s.queue) < n { |
| n = len(s.queue) |
| } |
| return &TaskSchedulerStatus{ |
| LastScheduled: s.lastScheduled, |
| TopCandidates: s.queue[:n], |
| } |
| } |
| |
| // TaskCandidateSearchTerms includes fields used for searching task candidates. |
| type TaskCandidateSearchTerms struct { |
| types.TaskKey |
| Dimensions []string `json:"dimensions"` |
| } |
| |
| // SearchQueue returns all task candidates in the queue which match the given |
| // TaskKey. Any blank fields are considered to be wildcards. |
| func (s *TaskScheduler) SearchQueue(q *TaskCandidateSearchTerms) []*TaskCandidate { |
| s.queueMtx.RLock() |
| defer s.queueMtx.RUnlock() |
| rv := []*TaskCandidate{} |
| for _, c := range s.queue { |
| // TODO(borenet): I wish there was a better way to do this. |
| if q.ForcedJobId != "" && c.ForcedJobId != q.ForcedJobId { |
| continue |
| } |
| if q.Name != "" && c.Name != q.Name { |
| continue |
| } |
| if q.Repo != "" && c.Repo != q.Repo { |
| continue |
| } |
| if q.Revision != "" && c.Revision != q.Revision { |
| continue |
| } |
| if q.Issue != "" && c.Issue != q.Issue { |
| continue |
| } |
| if q.PatchRepo != "" && c.PatchRepo != q.PatchRepo { |
| continue |
| } |
| if q.Patchset != "" && c.Patchset != q.Patchset { |
| continue |
| } |
| if q.Server != "" && c.Server != q.Server { |
| continue |
| } |
| if len(q.Dimensions) > 0 { |
| ok := true |
| for _, d := range q.Dimensions { |
| if !util.In(d, c.TaskSpec.Dimensions) { |
| ok = false |
| break |
| } |
| } |
| if !ok { |
| continue |
| } |
| } |
| rv = append(rv, c) |
| } |
| return rv |
| } |
| |
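| // commitTester is an interface used to check whether a given commit is |
| // within the scheduling window (implemented by window.Window). |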
| type commitTester interface { |
| TestCommit(repo string, c *repograph.Commit) bool |
| } |
| |
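| // tasksCfgGetter is an interface for retrieving the TasksCfg at a given |
| // RepoState (implemented by task_cfg_cache.TaskCfgCache). |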
| type tasksCfgGetter interface { |
| Get(ctx context.Context, rs types.RepoState) (*specs.TasksCfg, error, error) |
| } |
| |
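| // commitGetter is an interface for resolving a commit hash or ref to a |
| // commit (implemented by repograph.Graph). |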
| type commitGetter interface { |
| Get(ref string) *repograph.Commit |
| } |
| |
| // ComputeBlamelist computes the blamelist for a new task, specified by name, |
| // repo, and revision. Returns the list of commits covered by the task, and any |
| // previous task from which part or all of the blamelist was "stolen" (see |
| // below). There are three cases: |
| // |
| // 1. The new task tests commits which have not yet been tested. Trace commit |
| // history, accumulating commits until we find commits which have been tested |
| // by previous tasks. |
| // |
| // 2. The new task runs at the same commit as a previous task. This is a retry, |
| // so the entire blamelist of the previous task is "stolen". |
| // |
| // 3. The new task runs at a commit which is in a previous task's blamelist, but |
| // no task has run at the same commit. This is a bisect. Trace commit |
| // history, "stealing" commits from the previous task until we find a commit |
| // which was covered by a *different* previous task. |
| // |
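| // For example, given a hypothetical linear history A <- B <- C <- D (oldest |
| // first): if a previous task ran at B with blamelist {A, B} and a new task |
| // runs at D, case 1 applies and the new task's blamelist is {C, D}. If the |
| // new task instead runs at B, case 2 applies and it steals {A, B}. If it |
| // runs at A, case 3 applies and it steals {A}, leaving the task at B with |
| // blamelist {B}. |
| // |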
| // Args: |
| // - cache: TaskCache instance. |
| // - repo: commitGetter (eg. a repograph.Graph) for the task's repository. |
| // - taskName: Name of the task. |
| // - repoName: Name of the repository for the task. |
| // - revision: Revision at which the task would run. |
| // - commitsBuf: Buffer for use as scratch space. |
| // - tcg: tasksCfgGetter used to look up the TasksCfg at each commit. |
| // - ct: commitTester used to limit recursion to the scheduling window. |
| func ComputeBlamelist(ctx context.Context, cache cache.TaskCache, repo commitGetter, taskName, repoName string, revision *repograph.Commit, commitsBuf []*repograph.Commit, tcg tasksCfgGetter, ct commitTester) ([]string, *types.Task, error) { |
| ctx, span := trace.StartSpan(ctx, "taskscheduler_ComputeBlamelist", trace.WithSampler(trace.ProbabilitySampler(0.01))) |
| defer span.End() |
| commitsBuf = commitsBuf[:0] |
| var stealFrom *types.Task |
| |
| // Run the helper function to recurse on commit history. |
| if err := revision.Recurse(func(commit *repograph.Commit) error { |
| // If this commit is outside the scheduling window, we won't |
| // have tasks in the cache for it, and thus we won't be able |
| // to compute the correct blamelist. Stop here. |
| if !ct.TestCommit(repoName, commit) { |
| return repograph.ErrStopRecursing |
| } |
| |
| // If the task spec is not defined at this commit, it can't be |
| // part of the blamelist. |
| rs := types.RepoState{ |
| Repo: repoName, |
| Revision: commit.Hash, |
| } |
| cfg, cachedErr, err := tcg.Get(ctx, rs) |
| if cachedErr != nil { |
| sklog.Warningf("Stopping blamelist recursion at %s; TaskCfgCache has error: %s", commit.Hash, err) |
| return repograph.ErrStopRecursing |
| } |
| if err != nil { |
| if err == task_cfg_cache.ErrNoSuchEntry { |
| sklog.Warningf("Computing blamelist for %s in %s @ %s, no cached TasksCfg at %s; stopping blamelist calculation.", taskName, repoName, revision.Hash, commit.Hash) |
| return repograph.ErrStopRecursing |
| } |
| return skerr.Wrap(err) |
| } |
| if _, ok := cfg.Tasks[taskName]; !ok { |
| sklog.Infof("Computing blamelist for %s in %s @ %s, Task Spec not defined in %s (have %d tasks); stopping blamelist calculation.", taskName, repoName, revision.Hash, commit.Hash, len(cfg.Tasks)) |
| return repograph.ErrStopRecursing |
| } |
| |
| // Determine whether any task already includes this commit. |
| prev, err := cache.GetTaskForCommit(repoName, commit.Hash, taskName) |
| if err != nil { |
| return err |
| } |
| |
| // If the blamelist is too large, just use a single commit. |
| if len(commitsBuf) > MAX_BLAMELIST_COMMITS { |
| commitsBuf = append(commitsBuf[:0], revision) |
| //sklog.Warningf("Found too many commits for %s @ %s; using single-commit blamelist.", taskName, revision.Hash) |
| return ERR_BLAMELIST_DONE |
| } |
| |
| // If we're stealing commits from a previous task but the current |
| // commit is not in any task's blamelist, we must have scrolled past |
| // the beginning of the tasks. Just return. |
| if prev == nil && stealFrom != nil { |
| return repograph.ErrStopRecursing |
| } |
| |
| // If a previous task already included this commit, we have to make a decision. |
| if prev != nil { |
| // If this Task's Revision is already included in a different |
| // Task, then we're either bisecting or retrying a task. We'll |
| // "steal" commits from the previous Task's blamelist. |
| if len(commitsBuf) == 0 { |
| stealFrom = prev |
| |
| // Another shortcut: If our Revision is the same as the |
| // Revision of the Task we're stealing commits from, |
| // ie. both tasks ran at the same commit, then this is a |
| // retry. Just steal all of the commits without doing |
| // any more work. |
| if stealFrom.Revision == revision.Hash { |
| commitsBuf = commitsBuf[:0] |
| for _, c := range stealFrom.Commits { |
| ptr := repo.Get(c) |
| if ptr == nil { |
| return skerr.Fmt("No such commit: %q", c) |
| } |
| commitsBuf = append(commitsBuf, ptr) |
| } |
| return ERR_BLAMELIST_DONE |
| } |
| } |
| if stealFrom == nil || prev.Id != stealFrom.Id { |
| // If we've hit a commit belonging to a different task, |
| // we're done. |
| return repograph.ErrStopRecursing |
| } |
| } |
| |
| // Add the commit. |
| commitsBuf = append(commitsBuf, commit) |
| |
| // Recurse on the commit's parents. |
| return nil |
| |
| }); err != nil && err != ERR_BLAMELIST_DONE { |
| return nil, nil, err |
| } |
| |
| rv := make([]string, 0, len(commitsBuf)) |
| for _, c := range commitsBuf { |
| rv = append(rv, c.Hash) |
| } |
| return rv, stealFrom, nil |
| } |
| |
| type taskSchedulerMainLoopDiagnostics struct { |
| StartTime time.Time `json:"startTime"` |
| EndTime time.Time `json:"endTime"` |
| Error string `json:"error,omitempty"` |
| Candidates []*TaskCandidate `json:"candidates"` |
| FreeBots []*types.Machine `json:"freeBots"` |
| } |
| |
| // writeMainLoopDiagnosticsToGCS writes JSON containing allCandidates and |
| // freeBots to GCS. If called in a goroutine, the arguments must not be |
| // modified while it runs. |
| func writeMainLoopDiagnosticsToGCS(ctx context.Context, start time.Time, end time.Time, diagClient gcs.GCSClient, diagInstance string, allCandidates map[types.TaskKey]*TaskCandidate, freeBots []*types.Machine, scheduleErr error) error { |
| ctx, span := trace.StartSpan(ctx, "writeMainLoopDiagnosticsToGCS") |
| defer span.End() |
| |
| candidateSlice := make([]*TaskCandidate, 0, len(allCandidates)) |
| for _, c := range allCandidates { |
| candidateSlice = append(candidateSlice, c) |
| } |
| sort.Sort(taskCandidateSlice(candidateSlice)) |
| content := taskSchedulerMainLoopDiagnostics{ |
| StartTime: start.UTC(), |
| EndTime: end.UTC(), |
| Candidates: candidateSlice, |
| FreeBots: freeBots, |
| } |
| if scheduleErr != nil { |
| content.Error = scheduleErr.Error() |
| } |
| filenameBase := start.UTC().Format("20060102T150405.000000000Z") |
| gcsPath := path.Join(diagInstance, GCS_MAIN_LOOP_DIAGNOSTICS_DIR, filenameBase+".json") |
| ctx, cancel := context.WithTimeout(ctx, GCS_DIAGNOSTICS_WRITE_TIMEOUT) |
| defer cancel() |
| return gcs.WithWriteFileGzip(diagClient, ctx, gcsPath, func(w io.Writer) error { |
| enc := json.NewEncoder(w) |
| enc.SetIndent("", " ") |
| return enc.Encode(&content) |
| }) |
| } |
| |
| // findTaskCandidatesForJobs returns the set of all task candidates needed by |
| // all currently-unfinished jobs. |
| func (s *TaskScheduler) findTaskCandidatesForJobs(ctx context.Context, unfinishedJobs []*types.Job) (map[types.TaskKey]*TaskCandidate, error) { |
| ctx, span := trace.StartSpan(ctx, "findTaskCandidatesForJobs") |
| defer span.End() |
| |
| // Get the repo+commit+taskspecs for each job. |
| candidates := map[types.TaskKey]*TaskCandidate{} |
| for _, j := range unfinishedJobs { |
| if !s.window.TestTime(j.Repo, j.Created) { |
| continue |
| } |
| |
| // If git history was changed, we should avoid running jobs at |
| // orphaned commits. |
| if s.repos[j.Repo].Get(j.Revision) == nil { |
| // TODO(borenet): Cancel the job. |
| continue |
| } |
| |
| // Add task candidates for this job. |
| for tsName := range j.Dependencies { |
| key := j.MakeTaskKey(tsName) |
| c, ok := candidates[key] |
| if !ok { |
| taskCfg, cachedErr, err := s.taskCfgCache.Get(ctx, j.RepoState) |
| if cachedErr != nil { |
| continue |
| } |
| if err != nil { |
| return nil, skerr.Wrap(err) |
| } |
| spec, ok := taskCfg.Tasks[tsName] |
| if !ok { |
| // TODO(borenet): This should have already been caught when |
| // we validated the TasksCfg before inserting the job. |
| sklog.Errorf("Job %s wants task %s which is not defined in %+v", j.Name, tsName, j.RepoState) |
| continue |
| } |
| casSpec, ok := taskCfg.CasSpecs[spec.CasSpec] |
| if !ok { |
| sklog.Errorf("Task %s at %+v depends on non-existent CasSpec %q; wanted by job %s", tsName, j.RepoState, spec.CasSpec, j.Id) |
| continue |
| } |
| c = &TaskCandidate{ |
| // NB: Because multiple Jobs may share a Task, the BuildbucketBuildId |
| // could be inherited from any matching Job. Therefore, this should be |
| // used for non-critical, informational purposes only. |
| BuildbucketBuildId: j.BuildbucketBuildId, |
| CasDigests: []string{casSpec.Digest}, |
| Jobs: nil, |
| TaskKey: key, |
| TaskSpec: spec, |
| } |
| candidates[key] = c |
| } |
| c.AddJob(j) |
| } |
| } |
| sklog.Infof("Found %d task candidates for %d unfinished jobs.", len(candidates), len(unfinishedJobs)) |
| return candidates, nil |
| } |
| |
| // filterTaskCandidates reduces the set of task candidates to the ones we |
| // might actually want to run and organizes them by repo and TaskSpec name. |
| func (s *TaskScheduler) filterTaskCandidates(ctx context.Context, preFilterCandidates map[types.TaskKey]*TaskCandidate) (map[string]map[string][]*TaskCandidate, error) { |
| ctx, span := trace.StartSpan(ctx, "filterTaskCandidates") |
| defer span.End() |
| |
| candidatesBySpec := map[string]map[string][]*TaskCandidate{} |
| total := 0 |
| skipped := map[string]int{} |
| for _, c := range preFilterCandidates { |
| // Reject skipped tasks. |
| if rule := s.skipTasks.MatchRule(c.Name, c.Revision); rule != "" { |
| skipped[rule]++ |
| c.GetDiagnostics().Filtering = &taskCandidateFilteringDiagnostics{SkippedByRule: rule} |
| continue |
| } |
| |
| // Reject tasks for too-old commits, as long as they aren't try jobs. |
| if !c.IsTryJob() { |
| if in, err := s.window.TestCommitHash(c.Repo, c.Revision); err != nil { |
| return nil, skerr.Wrap(err) |
| } else if !in { |
| c.GetDiagnostics().Filtering = &taskCandidateFilteringDiagnostics{RevisionTooOld: true} |
| continue |
| } |
| } |
| // We shouldn't duplicate pending, in-progress, |
| // or successfully completed tasks. |
| prevTasks, err := s.tCache.GetTasksByKey(c.TaskKey) |
| if err != nil { |
| return nil, err |
| } |
| var previous *types.Task |
| if len(prevTasks) > 0 { |
| // Just choose the last (most recently created) previous |
| // Task. |
| previous = prevTasks[len(prevTasks)-1] |
| } |
| if previous != nil { |
| if previous.Status == types.TASK_STATUS_PENDING || previous.Status == types.TASK_STATUS_RUNNING { |
| c.GetDiagnostics().Filtering = &taskCandidateFilteringDiagnostics{SupersededByTask: previous.Id} |
| continue |
| } |
| if previous.Success() { |
| c.GetDiagnostics().Filtering = &taskCandidateFilteringDiagnostics{SupersededByTask: previous.Id} |
| continue |
| } |
| // The attempt counts are only valid if the previous |
| // attempt we're looking at is the last attempt for this |
| // TaskSpec. Fortunately, TaskCache.GetTasksByKey sorts |
| // by creation time, and we've selected the last of the |
| // results. |
| maxAttempts := c.TaskSpec.MaxAttempts |
| if maxAttempts == 0 { |
| maxAttempts = specs.DEFAULT_TASK_SPEC_MAX_ATTEMPTS |
| } |
| // Special case for tasks created before arbitrary |
| // numbers of attempts were possible. |
| previousAttempt := previous.Attempt |
| if previousAttempt == 0 && previous.RetryOf != "" { |
| previousAttempt = 1 |
| } |
| if previousAttempt >= maxAttempts-1 { |
| previousIds := make([]string, 0, len(prevTasks)) |
| for _, t := range prevTasks { |
| previousIds = append(previousIds, t.Id) |
| } |
| c.GetDiagnostics().Filtering = &taskCandidateFilteringDiagnostics{PreviousAttempts: previousIds} |
| continue |
| } |
| c.Attempt = previousAttempt + 1 |
| c.RetryOf = previous.Id |
| } |
| |
| // Don't consider candidates whose dependencies are not met. |
| depsMet, idsToHashes, err := c.allDepsMet(s.tCache) |
| if err != nil { |
| return nil, err |
| } |
| if !depsMet { |
| // c.Filtering set in allDepsMet. |
| continue |
| } |
| |
| // Add the CasDigests and ParentTaskIds which feed into this candidate. |
| hashes := make([]string, 0, len(idsToHashes)) |
| parentTaskIds := make([]string, 0, len(idsToHashes)) |
| for id, hash := range idsToHashes { |
| hashes = append(hashes, hash) |
| parentTaskIds = append(parentTaskIds, id) |
| } |
| c.CasDigests = append(c.CasDigests, hashes...) |
| sort.Strings(parentTaskIds) |
| c.ParentTaskIds = parentTaskIds |
| |
| candidates, ok := candidatesBySpec[c.Repo] |
| if !ok { |
| candidates = map[string][]*TaskCandidate{} |
| candidatesBySpec[c.Repo] = candidates |
| } |
| candidates[c.Name] = append(candidates[c.Name], c) |
| total++ |
| } |
| for rule, numSkipped := range skipped { |
| diagLink := fmt.Sprintf("https://console.cloud.google.com/storage/browser/skia-task-scheduler-diagnostics/%s?project=google.com:skia-corp", path.Join(s.diagInstance, GCS_MAIN_LOOP_DIAGNOSTICS_DIR)) |
| sklog.Infof("Skipped %d candidates due to skip_tasks rule %q. See details in diagnostics at %s.", numSkipped, rule, diagLink) |
| } |
| sklog.Infof("Filtered to %d candidates in %d spec categories.", total, len(candidatesBySpec)) |
| return candidatesBySpec, nil |
| } |
| |
| // scoreCandidate sets the Score field on the given TaskCandidate and records |
| // diagnostic information on TaskCandidate.Diagnostics.Scoring. |
| func (s *TaskScheduler) scoreCandidate(ctx context.Context, c *TaskCandidate, cycleStart, commitTime time.Time, stealingFrom *types.Task) { |
| ctx, span := trace.StartSpan(ctx, "scoreTaskCandidate", trace.WithSampler(trace.ProbabilitySampler(0.01))) |
| defer span.End() |
| if len(c.Jobs) == 0 { |
| // Log an error and return to allow scheduling other tasks. |
| sklog.Errorf("taskCandidate has no Jobs: %#v", c) |
| c.Score = 0 |
| return |
| } |
| |
| // Record diagnostic information; this will be uploaded to GCS for forensics |
| // in case we need to determine why a candidate was or was not triggered. |
| diag := &taskCandidateScoringDiagnostics{} |
| c.GetDiagnostics().Scoring = diag |
| |
| // Formula for priority is 1 - (1-<job1 priority>)(1-<job2 priority>)...(1-<jobN priority>). |
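| // For example (illustrative values): two jobs with priority 0.5 each give |
| // 1 - (0.5 * 0.5) = 0.75; adding jobs only increases the combined priority, |
| // and it never exceeds 1. |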
| inversePriorityProduct := 1.0 |
| for _, j := range c.Jobs { |
| jobPriority := specs.DEFAULT_JOB_SPEC_PRIORITY |
| if j.Priority <= 1 && j.Priority > 0 { |
| jobPriority = j.Priority |
| } |
| inversePriorityProduct *= 1 - jobPriority |
| } |
| priority := 1 - inversePriorityProduct |
| diag.Priority = priority |
| |
| // Use the earliest Job's Created time, which will maximize priority for older forced/try jobs. |
| earliestJob := c.Jobs[0] |
| diag.JobCreatedHours = cycleStart.Sub(earliestJob.Created).Hours() |
| |
| if c.IsTryJob() { |
| c.Score = CANDIDATE_SCORE_TRY_JOB + cycleStart.Sub(earliestJob.Created).Hours() |
| // Prioritize each subsequent attempt lower than the previous attempt. |
| for i := 0; i < c.Attempt; i++ { |
| c.Score *= CANDIDATE_SCORE_TRY_JOB_RETRY_MULTIPLIER |
| } |
| c.Score *= priority |
| return |
| } |
| |
| if c.IsForceRun() { |
| c.Score = CANDIDATE_SCORE_FORCE_RUN + cycleStart.Sub(earliestJob.Created).Hours() |
| c.Score *= priority |
| return |
| } |
| |
| // Score the candidate. |
| // The score for a candidate is based on the "testedness" increase |
| // provided by running the task. |
| stoleFromCommits := 0 |
| stoleFromStatus := types.TASK_STATUS_SUCCESS |
| if stealingFrom != nil { |
| stoleFromCommits = len(stealingFrom.Commits) |
| stoleFromStatus = stealingFrom.Status |
| } |
| diag.StoleFromCommits = stoleFromCommits |
| score := testednessIncrease(len(c.Commits), stoleFromCommits) |
| diag.TestednessIncrease = score |
| |
| // Add a bonus when retrying or backfilling failures and mishaps. |
| if stoleFromStatus == types.TASK_STATUS_FAILURE || stoleFromStatus == types.TASK_STATUS_MISHAP { |
| score += CANDIDATE_SCORE_FAILURE_OR_MISHAP_BONUS |
| } |
| |
| // Scale the score by other factors, eg. time decay. |
| decay := s.timeDecayForCommit(cycleStart, commitTime) |
| diag.TimeDecay = decay |
| score *= decay |
| score *= priority |
| |
| c.Score = score |
| } |
| |
| // processTaskCandidatesSingleTaskSpec processes the task candidates for a |
| // single TaskSpec, computing blamelists and scores and ranking the candidates. |
| func (s *TaskScheduler) processTaskCandidatesSingleTaskSpec(ctx context.Context, currentTime time.Time, repoUrl, name string, candidatesWithTryJobs []*TaskCandidate) ([]*TaskCandidate, error) { |
| ctx, span := trace.StartSpan(ctx, "processTaskCandidatesSingleTaskSpec", trace.WithSampler(trace.ProbabilitySampler(0.01))) |
| defer span.End() |
| |
| // commitsBuf is used in blamelist computation to prevent needing repeated |
| // allocation of large blocks of commits. |
| commitsBuf := make([]*repograph.Commit, 0, MAX_BLAMELIST_COMMITS) |
| repo := s.repos[repoUrl] |
| finished := make([]*TaskCandidate, 0, len(candidatesWithTryJobs)) |
| |
| // 1. Handle try jobs. Tasks for try jobs don't have blamelists, |
| // so we can go ahead and score them and remove them from |
| // consideration below. |
| candidates := make([]*TaskCandidate, 0, len(candidatesWithTryJobs)) |
| for _, candidate := range candidatesWithTryJobs { |
| if candidate.IsTryJob() { |
| s.scoreCandidate(ctx, candidate, currentTime, repo.Get(candidate.Revision).Timestamp, nil) |
| finished = append(finished, candidate) |
| } else { |
| candidates = append(candidates, candidate) |
| } |
| } |
| |
| // 2. Compute blamelists and scores for all other candidates. |
| // The scores are just the initial scores, in the absence of |
| // all the other candidates. In reality, the candidates are |
| // not independent, since their blamelists will interact, so |
| // as we repeatedly choose the most important candidate, we |
| // need to update the others' blamelists and scores. |
| stealingFromTasks := map[string]*types.Task{} |
| for _, candidate := range candidates { |
| revision := repo.Get(candidate.Revision) |
| commits, stealingFrom, err := ComputeBlamelist(ctx, s.tCache, repo, name, repoUrl, revision, commitsBuf, s.taskCfgCache, s.window) |
| if err != nil { |
| return nil, skerr.Wrap(err) |
| } |
| candidate.Commits = commits |
| if stealingFrom != nil { |
| candidate.StealingFromId = stealingFrom.Id |
| stealingFromTasks[stealingFrom.Id] = stealingFrom |
| } |
| s.scoreCandidate(ctx, candidate, currentTime, revision.Timestamp, stealingFrom) |
| } |
| |
| // 3. Repeat until all candidates have been ranked: |
| for len(candidates) > 0 { |
| // a. Choose the highest-scoring candidate and add to the queue. |
| sort.Sort(taskCandidateSlice(candidates)) |
| bestOrig := candidates[0] |
| finished = append(finished, bestOrig) |
| candidates = candidates[1:] |
| // Copy, since we might mangle the blamelist later. |
| best := bestOrig.CopyNoDiagnostics() |
| bestId := best.MakeId() |
| bestFakeTask := best.MakeTask() |
| stealingFromTasks[bestId] = bestFakeTask |
| // Update the blamelist of the task we stole commits from if applicable. |
| if best.StealingFromId != "" { |
| stealingFrom := stealingFromTasks[best.StealingFromId] |
| stealingFrom.Commits = util.NewStringSet(stealingFrom.Commits).Complement(util.NewStringSet(best.Commits)).Keys() |
| } |
| |
| // b. Update candidates which were affected by the choice of the best |
| // candidate; their blamelist, StealingFrom field, and score may need |
| // to be updated. |
| for _, candidate := range candidates { |
| // TODO(borenet): This is still O(n^2), we should be able to get |
| // down to O(n lg n) with a blamelist map. |
| updated := false |
| // b1. This candidate has the best candidate's revision in its |
| // blamelist. |
| if util.In(best.Revision, candidate.Commits) { |
| // Only transfer commits if the new candidate runs at a |
| // different revision from the best candidate. If they run at |
| // the same revision, this new candidate is a retry of the best |
| // candidate and therefore has the same blamelist. |
| if best.Revision != candidate.Revision { |
| // candidate.StealingFrom doesn't change, but the best |
| // candidate effectively steals commits from this candidate. |
| candidate.Commits = util.NewStringSet(candidate.Commits).Complement(util.NewStringSet(best.Commits)).Keys() |
| updated = true |
| } |
| } |
| // b2. The best candidate has this candidate's revision in its |
| // blamelist. |
| if util.In(candidate.Revision, best.Commits) { |
| // This candidate is now stealing commits from the best |
| // candidate, but its blamelist doesn't change. |
| candidate.StealingFromId = bestId |
| updated = true |
| } |
| if updated { |
| s.scoreCandidate(ctx, candidate, currentTime, repo.Get(candidate.Revision).Timestamp, stealingFromTasks[candidate.StealingFromId]) |
| } |
| } |
| } |
| return finished, nil |
| } |
| |
| // processTaskCandidates processes the task candidates for all repos and |
| // TaskSpecs in parallel and returns them sorted by score. |
| func (s *TaskScheduler) processTaskCandidates(ctx context.Context, candidates map[string]map[string][]*TaskCandidate) ([]*TaskCandidate, error) { |
| ctx, span := trace.StartSpan(ctx, "processTaskCandidates") |
| defer span.End() |
| |
| currentTime := now.Now(ctx) |
| processed := make(chan *TaskCandidate) |
| errs := make(chan error) |
| wg := sync.WaitGroup{} |
| for repo, cs := range candidates { |
| for name, c := range cs { |
| wg.Add(1) |
| go func(repoUrl, name string, candidatesWithTryJobs []*TaskCandidate) { |
| defer wg.Done() |
| c, err := s.processTaskCandidatesSingleTaskSpec(ctx, currentTime, repoUrl, name, candidatesWithTryJobs) |
| if err != nil { |
| errs <- err |
| } else { |
| for _, candidate := range c { |
| processed <- candidate |
| } |
| } |
| }(repo, name, c) |
| } |
| } |
| go func() { |
| wg.Wait() |
| close(processed) |
| close(errs) |
| }() |
| rvCandidates := []*TaskCandidate{} |
| rvErrs := []error{} |
| for { |
| select { |
| case c, ok := <-processed: |
| if ok { |
| rvCandidates = append(rvCandidates, c) |
| } else { |
| processed = nil |
| } |
| case err, ok := <-errs: |
| if ok { |
| rvErrs = append(rvErrs, err) |
| } else { |
| errs = nil |
| } |
| } |
| if processed == nil && errs == nil { |
| break |
| } |
| } |
| |
| if len(rvErrs) != 0 { |
| return nil, rvErrs[0] |
| } |
| sort.Sort(taskCandidateSlice(rvCandidates)) |
| return rvCandidates, nil |
| } |
| |
| // flatten flattens all of the dimensions in 'dims' into a single-entry map, |
| // suitable for use as metrics tags. |
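| // |
| // For example (illustrative values): |
| // |
| // flatten(map[string]string{"os": "Linux", "pool": "Skia"}) |
| // returns map[string]string{"dimensions": "os Linux pool Skia"}. |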
| func flatten(dims map[string]string) map[string]string { |
| keys := make([]string, 0, len(dims)) |
| for key := range dims { |
| keys = append(keys, key) |
| } |
| sort.Strings(keys) |
| ret := make([]string, 0, 2*len(dims)) |
| for _, key := range keys { |
| ret = append(ret, key, dims[key]) |
| } |
| return map[string]string{"dimensions": strings.Join(ret, " ")} |
| } |
| |
| // recordCandidateMetrics generates metrics for candidates by dimension sets. |
| func (s *TaskScheduler) recordCandidateMetrics(ctx context.Context, candidates map[string]map[string][]*TaskCandidate) { |
| ctx, span := trace.StartSpan(ctx, "recordCandidateMetrics", trace.WithSampler(trace.ProbabilitySampler(0.01))) |
| defer span.End() |
| |
| // Generate counts. These maps are keyed by the MD5 hash of the |
| // candidate's TaskSpec's dimensions. |
| counts := map[string]int64{} |
| dimensions := map[string]map[string]string{} |
| for _, byRepo := range candidates { |
| for _, bySpec := range byRepo { |
| for _, c := range bySpec { |
| parseDims, err := swarming.ParseDimensions(c.TaskSpec.Dimensions) |
| if err != nil { |
| sklog.Errorf("Failed to parse dimensions: %s", err) |
| continue |
| } |
| dims := make(map[string]string, len(parseDims)) |
| for k, v := range parseDims { |
| // Just take the first value for each dimension. |
| dims[k] = v[0] |
| } |
| k, err := util.MD5Sum(dims) |
| if err != nil { |
| sklog.Errorf("Failed to create metrics key: %s", err) |
| continue |
| } |
| dimensions[k] = dims |
| counts[k]++ |
| } |
| } |
| } |
| // Report the data. |
| s.candidateMetricsMtx.Lock() |
| defer s.candidateMetricsMtx.Unlock() |
| for k, count := range counts { |
| metric, ok := s.candidateMetrics[k] |
| if !ok { |
| metric = metrics2.GetInt64Metric(MEASUREMENT_TASK_CANDIDATE_COUNT, flatten(dimensions[k])) |
| s.candidateMetrics[k] = metric |
| } |
| metric.Update(count) |
| } |
| for k, metric := range s.candidateMetrics { |
| _, ok := counts[k] |
| if !ok { |
| metric.Update(0) |
| delete(s.candidateMetrics, k) |
| } |
| } |
| } |
| |
| // regenerateTaskQueue obtains the set of all eligible task candidates, scores |
| // them, and prepares them to be triggered. The second return value contains |
| // all candidates. |
| func (s *TaskScheduler) regenerateTaskQueue(ctx context.Context) ([]*TaskCandidate, map[types.TaskKey]*TaskCandidate, error) { |
| ctx, span := trace.StartSpan(ctx, "regenerateTaskQueue") |
| defer span.End() |
| |
| // Find the unfinished Jobs. |
| unfinishedJobs, err := s.jCache.InProgressJobs() |
| if err != nil { |
| return nil, nil, err |
| } |
| |
| // Find TaskSpecs for all unfinished Jobs. |
| preFilterCandidates, err := s.findTaskCandidatesForJobs(ctx, unfinishedJobs) |
| if err != nil { |
| return nil, nil, err |
| } |
| |
| // Filter task candidates. |
| candidates, err := s.filterTaskCandidates(ctx, preFilterCandidates) |
| if err != nil { |
| return nil, nil, err |
| } |
| |
| // Record the number of task candidates per dimension set. |
| s.recordCandidateMetrics(ctx, candidates) |
| |
| // Process the remaining task candidates. |
| queue, err := s.processTaskCandidates(ctx, candidates) |
| if err != nil { |
| return nil, nil, err |
| } |
| |
| return queue, preFilterCandidates, nil |
| } |
| |
| // getCandidatesToSchedule matches the list of free Swarming bots to task |
| // candidates in the queue and returns the candidates which should be run. |
| // Assumes that the tasks are sorted in decreasing order by score. |
| func getCandidatesToSchedule(ctx context.Context, bots []*types.Machine, tasks []*TaskCandidate) []*TaskCandidate { |
| ctx, span := trace.StartSpan(ctx, "getCandidatesToSchedule") |
| defer span.End() |
| |
| // Create a bots-by-swarming-dimension mapping. |
| botsByDim := map[string]util.StringSet{} |
| for _, b := range bots { |
| for _, dim := range b.Dimensions { |
| if _, ok := botsByDim[dim]; !ok { |
| botsByDim[dim] = util.StringSet{} |
| } |
| botsByDim[dim][b.ID] = true |
| } |
| } |
| // BotIds that have been used by previous candidates. |
| usedBots := util.StringSet{} |
| // Map BotId to the candidates that could have used that bot. In the |
| // case that no bots are available for a candidate, map concatenated |
| // dimensions to candidates. |
| botToCandidates := map[string][]*TaskCandidate{} |
| |
| // Match bots to tasks. |
| // TODO(borenet): Some tasks require a more specialized bot. We should |
| // match so that less-specialized tasks don't "steal" more-specialized |
| // bots which they don't actually need. |
| rv := make([]*TaskCandidate, 0, len(bots)) |
| countByTaskSpec := make(map[string]int, len(bots)) |
| for _, c := range tasks { |
| diag := &taskCandidateSchedulingDiagnostics{} |
| c.GetDiagnostics().Scheduling = diag |
| // Don't exceed SCHEDULING_LIMIT_PER_TASK_SPEC. |
| if countByTaskSpec[c.Name] == SCHEDULING_LIMIT_PER_TASK_SPEC { |
| sklog.Warningf("Too many tasks to schedule for %s; not scheduling more than %d", c.Name, SCHEDULING_LIMIT_PER_TASK_SPEC) |
| diag.OverSchedulingLimitPerTaskSpec = true |
| continue |
| } |
| // TODO(borenet): Make this threshold configurable. |
| if c.Score <= 0.0 { |
| // This normally shouldn't happen, but it can happen if there is both a |
| // forced task and an unused retry for the same repo state. |
| sklog.Debugf("candidate %s @ %s has a score of %2f; skipping (%d commits). %+v", c.Name, c.Revision, c.Score, len(c.Commits), c.Diagnostics.Scoring) |
| diag.ScoreBelowThreshold = true |
| continue |
| } |
| |
| // For each dimension of the task, find the set of bots which matches. |
| matches := util.StringSet{} |
| for i, d := range c.TaskSpec.Dimensions { |
| if i == 0 { |
| matches = matches.Union(botsByDim[d]) |
| } else { |
| matches = matches.Intersect(botsByDim[d]) |
| } |
| } |
| |
| // Set of candidates that could have used the same bots. |
| similarCandidates := map[*TaskCandidate]struct{}{} |
| var lowestScoreSimilarCandidate *TaskCandidate |
| addCandidates := func(key string) { |
| candidates := botToCandidates[key] |
| for _, candidate := range candidates { |
| similarCandidates[candidate] = struct{}{} |
| } |
| if len(candidates) > 0 { |
| lastCandidate := candidates[len(candidates)-1] |
| if lowestScoreSimilarCandidate == nil || lowestScoreSimilarCandidate.Score > lastCandidate.Score { |
| lowestScoreSimilarCandidate = lastCandidate |
| } |
| } |
| botToCandidates[key] = append(candidates, c) |
| } |
| |
| // Choose a particular bot to mark as used; pick the lowest unused bot ID so that the choice is deterministic. |
| var chosenBot string |
| if len(matches) > 0 { |
| diag.MatchingBots = matches.Keys() |
| sort.Strings(diag.MatchingBots) |
| for botId := range matches { |
| if (chosenBot == "" || botId < chosenBot) && !usedBots[botId] { |
| chosenBot = botId |
| } |
| addCandidates(botId) |
| } |
| } else { |
| diag.NoBotsAvailable = true |
| diag.MatchingBots = nil |
| // Use sorted concatenated dimensions instead of botId as the key. |
| dims := util.CopyStringSlice(c.TaskSpec.Dimensions) |
| sort.Strings(dims) |
| addCandidates(strings.Join(dims, ",")) |
| } |
| diag.NumHigherScoreSimilarCandidates = len(similarCandidates) |
| if lowestScoreSimilarCandidate != nil { |
| diag.LastSimilarCandidate = &lowestScoreSimilarCandidate.TaskKey |
| } |
| |
| if chosenBot != "" { |
| // We're going to run this task. |
| diag.Selected = true |
| usedBots[chosenBot] = true |
| |
| // Add the task to the scheduling list. |
| rv = append(rv, c) |
| countByTaskSpec[c.Name]++ |
| } |
| } |
| sort.Sort(taskCandidateSlice(rv)) |
| return rv |
| } |
| |
| // mergeCASInputs uploads inputs for the task candidates to content-addressed |
| // storage. Returns the list of candidates which were successfully merged, with |
| // their CasInput set, and any error which occurred. Note that the successful |
| // candidates AND an error may both be returned if some were successfully merged |
| // but others failed. |
| func (s *TaskScheduler) mergeCASInputs(ctx context.Context, candidates []*TaskCandidate) ([]*TaskCandidate, error) { |
| ctx, span := trace.StartSpan(ctx, "mergeCASInputs", trace.WithSampler(trace.ProbabilitySampler(0.01))) |
| defer span.End() |
| start := now.Now(ctx) |
| |
| mergedCandidates := make([]*TaskCandidate, len(candidates)) |
| eg, eCtx := errgroup.WithContext(ctx) |
| |
| for i, c := range candidates { |
| i, c := i, c // https://golang.org/doc/faq#closures_and_goroutines |
| eg.Go(func() error { |
| if eCtx.Err() != nil { |
| return nil |
| } |
| digest, err := s.rbeCas.Merge(eCtx, c.CasDigests) |
| if err != nil { |
| errStr := err.Error() |
| c.GetDiagnostics().Triggering = &taskCandidateTriggeringDiagnostics{IsolateError: errStr} |
| return skerr.Wrapf(err, "failed to merge CAS inputs for %s @ %s", c.Name, c.RepoState.String()) |
| } |
| c.CasInput = digest |
| mergedCandidates[i] = c |
| return nil |
| }) |
| } |
| // Wait for all of the merges to finish before measuring the elapsed time; |
| // otherwise we would only measure how long it took to spawn the goroutines. |
| mergeErr := eg.Wait() |
| end := now.Now(ctx) |
| if dur := end.Sub(start); dur > time.Minute { |
| sklog.Warningf("mergeCASInputs took longer than usual: %s", dur) |
| } |
| |
| if mergeErr != nil { |
| partialSuccesses := make([]*TaskCandidate, 0, len(candidates)) |
| // The non-nil task candidates were successes, so find those and put them in a new slice. |
| for _, c := range mergedCandidates { |
| if c != nil { |
| partialSuccesses = append(partialSuccesses, c) |
| } |
| } |
| return partialSuccesses, skerr.Wrap(mergeErr) |
| } |
| |
| return mergedCandidates, nil |
| } |
| |
| // triggerTasks triggers the given slice of tasks to run on Swarming and returns |
| // a channel of the successfully-triggered tasks which is closed after all tasks |
| // have been triggered or failed. Each failure is sent to errCh. |
| func (s *TaskScheduler) triggerTasks(ctx context.Context, candidates []*TaskCandidate, errCh chan<- error) <-chan *types.Task { |
| ctx, span := trace.StartSpan(ctx, "triggerTasks") |
| defer span.End() |
| |
| triggered := make(chan *types.Task) |
| var wg sync.WaitGroup |
| for _, candidate := range candidates { |
| wg.Add(1) |
| go func(candidate *TaskCandidate) { |
| defer wg.Done() |
| t := candidate.MakeTask() |
| diag := &taskCandidateTriggeringDiagnostics{} |
| candidate.GetDiagnostics().Triggering = diag |
| recordErr := func(msg string, err error) { |
| err = skerr.Wrapf(err, "%s", msg) |
| diag.TriggerError = err.Error() |
| errCh <- err |
| } |
| if err := s.db.AssignId(ctx, t); err != nil { |
| recordErr("Failed to assign id", err) |
| return |
| } |
| diag.TaskId = t.Id |
| req, err := candidate.MakeTaskRequest(t.Id, s.rbeCasInstance, s.pubsubTopic) |
| if err != nil { |
| recordErr("Failed to create task request", err) |
| return |
| } |
| s.pendingInsertMtx.Lock() |
| s.pendingInsert[t.Id] = true |
| s.pendingInsertMtx.Unlock() |
| taskExecutor, ok := s.taskExecutors[t.TaskExecutor] |
| if !ok { |
| recordErr("Failed to trigger task", skerr.Fmt("Unknown task executor %q wanted by %s", candidate.TaskSpec.TaskExecutor, candidate.Name)) |
| return |
| } |
| resp, err := taskExecutor.TriggerTask(ctx, req) |
| if err != nil { |
| s.pendingInsertMtx.Lock() |
| delete(s.pendingInsert, t.Id) |
| s.pendingInsertMtx.Unlock() |
| jobIds := make([]string, 0, len(candidate.Jobs)) |
| for _, job := range candidate.Jobs { |
| jobIds = append(jobIds, job.Id) |
| } |
| recordErr("Failed to trigger task", skerr.Wrapf(err, "%q needed for jobs: %+v", candidate.Name, jobIds)) |
| return |
| } |
| t.Created = resp.Created |
| t.Started = resp.Started |
| t.Finished = resp.Finished |
| t.SwarmingTaskId = resp.ID |
| // The task may have been de-duplicated. |
| if resp.Status == types.TASK_STATUS_SUCCESS { |
| if _, err := t.UpdateFromTaskResult(resp); err != nil { |
| recordErr("Failed to update de-duplicated task", err) |
| return |
| } |
| } |
| triggered <- t |
| }(candidate) |
| } |
| go func() { |
| wg.Wait() |
| close(triggered) |
| }() |
| return triggered |
| } |
| |
| // scheduleTasks matches the given free machines against the task queue and |
| // triggers tasks according to relative priorities in the queue. |
| func (s *TaskScheduler) scheduleTasks(ctx context.Context, bots []*types.Machine, queue []*TaskCandidate) error { |
| ctx, span := trace.StartSpan(ctx, "scheduleTasks") |
| defer span.End() |
| |
| // Match free bots with tasks. |
| candidates := getCandidatesToSchedule(ctx, bots, queue) |
| |
| // Merge CAS inputs for the tasks. |
| merged, mergeErr := s.mergeCASInputs(ctx, candidates) |
| if mergeErr != nil && len(merged) == 0 { |
| return skerr.Wrapf(mergeErr, "failed to merge CAS inputs with no partial successes") |
| } |
| |
| // Setup the error channel. |
| errs := []error{} |
| if mergeErr != nil { |
| errs = append(errs, mergeErr) |
| } |
| errCh := make(chan error) |
| var errWg sync.WaitGroup |
| errWg.Add(1) |
| go func() { |
| defer errWg.Done() |
| for err := range errCh { |
| errs = append(errs, err) |
| } |
| }() |
| |
| // Trigger Swarming tasks. |
| triggered := s.triggerTasks(ctx, merged, errCh) |
| |
| // Collect the tasks we triggered. |
| numTriggered := 0 |
| insert := map[string]map[string][]*types.Task{} |
| for t := range triggered { |
| byRepo, ok := insert[t.Repo] |
| if !ok { |
| byRepo = map[string][]*types.Task{} |
| insert[t.Repo] = byRepo |
| } |
| byRepo[t.Name] = append(byRepo[t.Name], t) |
| numTriggered++ |
| } |
| close(errCh) |
| errWg.Wait() |
| |
| if len(insert) > 0 { |
| // Insert the newly-triggered tasks into the DB. |
| err := s.addTasks(ctx, insert) |
| |
| // Remove the tasks from the pending map, regardless of whether |
| // we successfully inserted into the DB. |
| s.pendingInsertMtx.Lock() |
| for _, byRepo := range insert { |
| for _, byName := range byRepo { |
| for _, t := range byName { |
| delete(s.pendingInsert, t.Id) |
| } |
| } |
| } |
| s.pendingInsertMtx.Unlock() |
| |
| // Handle failure/success. |
| if err != nil { |
| errs = append(errs, skerr.Wrapf(err, "Triggered tasks but failed to insert into DB")) |
| } else { |
| // Organize the triggered task by TaskKey. |
| remove := make(map[types.TaskKey]*types.Task, numTriggered) |
| for _, byRepo := range insert { |
| for _, byName := range byRepo { |
| for _, t := range byName { |
| remove[t.TaskKey] = t |
| } |
| } |
| } |
| if len(remove) != numTriggered { |
| return skerr.Fmt("Number of tasks to remove from the queue (%d) differs from the number of tasks triggered (%d)", len(remove), numTriggered) |
| } |
| |
| // Remove the tasks from the queue. |
| newQueue := make([]*TaskCandidate, 0, len(queue)-numTriggered) |
| for _, c := range queue { |
| if _, ok := remove[c.TaskKey]; !ok { |
| newQueue = append(newQueue, c) |
| } |
| } |
| |
| // Note: if regenerateTaskQueue and scheduleTasks are ever decoupled so |
| // that the queue is reused by multiple runs of scheduleTasks, we'll need |
| // to address the fact that some candidates may still have their |
| // StealingFromId pointing to candidates which have been triggered and |
| // removed from the queue. In that case, we should just need to write a |
| // loop which updates those candidates to use the IDs of the newly- |
| // inserted Tasks in the database rather than the candidate ID. |
| |
| sklog.Infof("Triggered %d tasks on %d bots (%d in queue).", numTriggered, len(bots), len(queue)) |
| queue = newQueue |
| } |
| } else { |
| sklog.Infof("Triggered no tasks (%d in queue, %d bots available)", len(queue), len(bots)) |
| } |
| s.triggeredCount.Inc(int64(numTriggered)) |
| s.queueMtx.Lock() |
| defer s.queueMtx.Unlock() |
| s.queue = queue |
| s.lastScheduled = now.Now(ctx) |
| |
| if len(errs) == 1 { |
| return skerr.Wrapf(errs[0], "Cannot schedule all tasks") |
| } else if len(errs) > 1 { |
| rvErr := "Got failures: " |
| for _, e := range errs { |
| rvErr += fmt.Sprintf("\n%s\n", e) |
| } |
| return skerr.Fmt(rvErr) |
| } |
| return nil |
| } |
| |
| // MainLoop runs a single end-to-end task scheduling loop. |
| func (s *TaskScheduler) MainLoop(ctx context.Context) error { |
| ctx, span := trace.StartSpan(ctx, "taskscheduler_MainLoop") |
| defer span.End() |
| |
| sklog.Infof("Task Scheduler MainLoop starting...") |
| diagStart := now.Now(ctx) |
| |
| // Load the free machines from all task executors. |
| var freeMachines []*types.Machine |
| var freeMachinesMtx sync.Mutex |
| getFreeMachinesGroup := errgroup.Group{} |
| for taskExecName, taskExec := range s.taskExecutors { |
| if taskExecName == types.TaskExecutor_UseDefault { |
| // This one will be handled by the explicitly-named entry. |
| continue |
| } |
| taskExec := taskExec |
| getFreeMachinesGroup.Go(func() error { |
| m, err := getFreeMachines(ctx, taskExec, s.busyBots, s.pools) |
| if err != nil { |
| return err |
| } |
| freeMachinesMtx.Lock() |
| defer freeMachinesMtx.Unlock() |
| freeMachines = append(freeMachines, m...) |
| return nil |
| }) |
| } |
| |
| if err := s.tCache.Update(ctx); err != nil { |
| return skerr.Wrapf(err, "Failed to update task cache") |
| } |
| |
| if err := s.jCache.Update(ctx); err != nil { |
| return skerr.Wrapf(err, "Failed to update job cache") |
| } |
| |
| if err := s.updateUnfinishedJobs(ctx); err != nil { |
| return skerr.Wrapf(err, "Failed to update unfinished jobs") |
| } |
| |
| if err := s.skipTasks.Update(ctx); err != nil { |
| return skerr.Wrapf(err, "Failed to update skip_tasks") |
| } |
| |
| // Regenerate the queue. |
| sklog.Infof("Task Scheduler regenerating the queue...") |
| queue, allCandidates, err := s.regenerateTaskQueue(ctx) |
| if err != nil { |
| return skerr.Wrapf(err, "Failed to regenerate task queue") |
| } |
| |
| if getFreeMachinesErr := getFreeMachinesGroup.Wait(); getFreeMachinesErr != nil { |
| return skerr.Wrapf(getFreeMachinesErr, "Failed to retrieve free machines") |
| } |
| |
| sklog.Infof("Task Scheduler scheduling tasks...") |
| err = s.scheduleTasks(ctx, freeMachines, queue) |
| |
| // An error from scheduleTasks can indicate a partial error; write diagnostics |
| // in either case. |
| if s.diagClient != nil { |
| diagEnd := now.Now(ctx) |
| s.testWaitGroup.Add(1) |
| go func() { |
| defer s.testWaitGroup.Done() |
| util.LogErr(writeMainLoopDiagnosticsToGCS(ctx, diagStart, diagEnd, s.diagClient, s.diagInstance, allCandidates, freeMachines, err)) |
| }() |
| } |
| |
| if err != nil { |
| return skerr.Wrapf(err, "Failed to schedule tasks") |
| } |
| |
| // Clean up the taskCfgCache. |
| if err := s.taskCfgCache.Cleanup(ctx, now.Now(ctx).Sub(s.window.EarliestStart())); err != nil { |
| return skerr.Wrapf(err, "failed to cleanup task cfg cache") |
| } |
| |
| sklog.Infof("Task Scheduler MainLoop finished.") |
| return nil |
| } |
| |
| // QueueLen returns the length of the queue. |
| func (s *TaskScheduler) QueueLen() int { |
| s.queueMtx.RLock() |
| defer s.queueMtx.RUnlock() |
| return len(s.queue) |
| } |
| |
| // CloneQueue returns a full copy of the queue. |
| func (s *TaskScheduler) CloneQueue() []*TaskCandidate { |
| s.queueMtx.RLock() |
| defer s.queueMtx.RUnlock() |
| rv := make([]*TaskCandidate, 0, len(s.queue)) |
| for _, c := range s.queue { |
| rv = append(rv, c.CopyNoDiagnostics()) |
| } |
| return rv |
| } |
| |
| // timeDecay24Hr computes a linear time decay amount for the given duration, |
| // given the requested decay amount at 24 hours. |
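| // |
| // For example (illustrative values), with decayAmt24Hr = 0.9: |
| // |
| // timeDecay24Hr(0.9, 12*time.Hour) == 0.95 |
| // timeDecay24Hr(0.9, 48*time.Hour) == 0.80 |
| // timeDecay24Hr(0.9, 480*time.Hour) == 0.0 (clamped at zero) |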
| func timeDecay24Hr(decayAmt24Hr float64, elapsed time.Duration) float64 { |
| return math.Max(1.0-(1.0-decayAmt24Hr)*(float64(elapsed)/float64(24*time.Hour)), 0.0) |
| } |
| |
| // timeDecayForCommit computes a multiplier for a task candidate score based |
| // on how long ago the given commit landed. This allows us to prioritize more |
| // recent commits. |
| func (s *TaskScheduler) timeDecayForCommit(currentTime, commitTime time.Time) float64 { |
| if s.timeDecayAmt24Hr == 1.0 { |
| // Shortcut for special case. |
| return 1.0 |
| } |
| rv := timeDecay24Hr(s.timeDecayAmt24Hr, currentTime.Sub(commitTime)) |
| // TODO(benjaminwagner): Change to an exponential decay to prevent |
| // zero/negative scores. |
| //if rv == 0.0 { |
| // sklog.Warningf("timeDecayForCommit is zero. Now: %s, Commit: %s ts %s, TimeDecay: %2f\nDetails: %v", now, commit.Hash, commit.Timestamp, s.timeDecayAmt24Hr, commit) |
| //} |
| return rv |
| } |
| |
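| // GetSkipTasks returns the skip_tasks.DB used by the TaskScheduler. |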
| func (s *TaskScheduler) GetSkipTasks() *skip_tasks.DB { |
| return s.skipTasks |
| } |
| |
| // testedness computes the total "testedness" of a set of commits covered by a |
| // task whose blamelist included N commits. The "testedness" of a task spec at |
| // a given commit is defined as follows: |
| // |
| // -1.0 if no task has ever included this commit for this task spec. |
| // 1.0 if a task was run for this task spec AT this commit. |
| // 1.0/N if a task for this task spec has included this commit, where N is |
| // the number of commits included in the task. |
| // |
| // This function gives the sum of the testedness for a blamelist of N commits. |
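| // |
| // For example (illustrative values): |
| // |
| // testedness(1) == 1.0 |
| // testedness(2) == 1.0 + 1/2 = 1.5 |
| // testedness(4) == 1.0 + 3/4 = 1.75 |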
| func testedness(n int) float64 { |
| if n < 0 { |
| // This should never happen. |
| sklog.Errorf("Task score function got a blamelist with %d commits", n) |
| return -1.0 |
| } else if n == 0 { |
| // Zero commits have zero testedness. |
| return 0.0 |
| } else if n == 1 { |
| return 1.0 |
| } else { |
| return 1.0 + float64(n-1)/float64(n) |
| } |
| } |
| |
| // testednessIncrease computes the increase in "testedness" obtained by running |
| // a task with the given blamelist length which may have "stolen" commits from |
| // a previous task with a different blamelist length. To do so, we compute the |
| // "testedness" for every commit affected by the task, before and after the |
| // task would run. We subtract the "before" score from the "after" score to |
| // obtain the "testedness" increase at each commit, then sum them to find the |
| // total increase in "testedness" obtained by running the task. |
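| // |
| // For example (illustrative values): |
| // |
| // testednessIncrease(2, 0) == 1.5 - (-2.0) = 3.5 (newly-tested commits) |
| // testednessIncrease(2, 2) == 0.0 (retry) |
| // testednessIncrease(2, 4) == (1.5 + 1.5) - 1.75 = 1.25 (bisect) |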
| func testednessIncrease(blamelistLength, stoleFromBlamelistLength int) float64 { |
| // Invalid inputs. |
| if blamelistLength <= 0 || stoleFromBlamelistLength < 0 { |
| return -1.0 |
| } |
| |
| if stoleFromBlamelistLength == 0 { |
| // This task covers previously-untested commits. Previous testedness |
| // is -1.0 for each commit in the blamelist. |
| beforeTestedness := float64(-blamelistLength) |
| afterTestedness := testedness(blamelistLength) |
| return afterTestedness - beforeTestedness |
| } else if blamelistLength == stoleFromBlamelistLength { |
| // This is a retry. It provides no testedness increase, so shortcut here |
| // rather than perform the math to obtain the same answer. |
| return 0.0 |
| } else { |
| // This is a bisect/backfill. |
| beforeTestedness := testedness(stoleFromBlamelistLength) |
| afterTestedness := testedness(blamelistLength) + testedness(stoleFromBlamelistLength-blamelistLength) |
| return afterTestedness - beforeTestedness |
| } |
| } |
| |
| // getFreeMachines returns a slice of machines in the given pools which are |
| // alive, not quarantined, and not already occupied by a running or pending |
| // task. |
| func getFreeMachines(ctx context.Context, taskExec types.TaskExecutor, busy *busyBots, pools []string) ([]*types.Machine, error) { |
| ctx, span := trace.StartSpan(ctx, "getFreeMachines") |
| defer span.End() |
| |
| // Query for free machines and pending tasks in all pools. |
| var wg sync.WaitGroup |
| machines := []*types.Machine{} |
| pending := []*types.TaskResult{} |
| errs := []error{} |
| var mtx sync.Mutex |
| for _, pool := range pools { |
| // Free bots. |
| wg.Add(1) |
| go func(pool string) { |
| defer wg.Done() |
| b, err := taskExec.GetFreeMachines(ctx, pool) |
| mtx.Lock() |
| defer mtx.Unlock() |
| if err != nil { |
| errs = append(errs, err) |
| } else { |
| machines = append(machines, b...) |
| } |
| }(pool) |
| |
| // Pending tasks. |
| wg.Add(1) |
| go func(pool string) { |
| defer wg.Done() |
| pendingTasks, err := taskExec.GetPendingTasks(ctx, pool) |
| mtx.Lock() |
| defer mtx.Unlock() |
| if err != nil { |
| errs = append(errs, err) |
| } else { |
| pending = append(pending, pendingTasks...) |
| } |
| }(pool) |
| } |
| |
| wg.Wait() |
| if len(errs) > 0 { |
| return nil, skerr.Fmt("Got errors loading machines and pending tasks from the task executor: %v", errs) |
| } |
| |
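| // Filter out machines which are dead, quarantined, or already running a task. |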
| rv := make([]*types.Machine, 0, len(machines)) |
| for _, machine := range machines { |
| if machine.IsDead { |
| continue |
| } |
| if machine.IsQuarantined { |
| continue |
| } |
| if machine.CurrentTaskID != "" { |
| continue |
| } |
| rv = append(rv, machine) |
| } |
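| // Update the busyBots tracker with the currently-pending tasks and filter |
| // out any machines which those tasks are expected to occupy. |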
| busy.RefreshTasks(pending) |
| return busy.Filter(rv), nil |
| } |
| |
| // updateUnfinishedTasks queries the task executors for all unfinished tasks |
| // and updates their status in the DB. |
| func (s *TaskScheduler) updateUnfinishedTasks(ctx context.Context) error { |
| ctx, span := trace.StartSpan(ctx, "updateUnfinishedTasks") |
| defer span.End() |
| |
| // Update the TaskCache. |
| if err := s.tCache.Update(ctx); err != nil { |
| return err |
| } |
| |
| tasks, err := s.tCache.UnfinishedTasks() |
| if err != nil { |
| return err |
| } |
| sort.Sort(types.TaskSlice(tasks)) |
| |
| // Query for all unfinished tasks. |
| sklog.Infof("Querying states of %d unfinished tasks.", len(tasks)) |
| unfinishedTasksByExecutor := map[string][]*types.Task{} |
| for _, t := range tasks { |
| executor := t.TaskExecutor |
| if executor == types.TaskExecutor_UseDefault { |
| executor = types.DefaultTaskExecutor |
| } |
| unfinishedTasksByExecutor[executor] = append(unfinishedTasksByExecutor[executor], t) |
| } |
| for executorName, executorTasks := range unfinishedTasksByExecutor { |
| ids := make([]string, 0, len(executorTasks)) |
| for _, t := range executorTasks { |
| ids = append(ids, t.SwarmingTaskId) |
| } |
| taskExecutor, ok := s.taskExecutors[executorName] |
| if !ok { |
| return skerr.Fmt("Tasks use unknown task executor %q: %v", executorName, ids) |
| } |
| finishedStates, err := taskExecutor.GetTaskCompletionStatuses(ctx, ids) |
| if err != nil { |
| return err |
| } |
| // finishedStates is indexed in parallel with ids (and therefore with |
| // executorTasks), so collect the newly-finished tasks by index. |
| finished := make([]*types.Task, 0, len(finishedStates)) |
| for idx, task := range executorTasks { |
| if finishedStates[idx] { |
| finished = append(finished, task) |
| } |
| } |
| |
| // Update any newly-finished tasks. |
| if len(finished) > 0 { |
| sklog.Infof("Updating %d newly-finished tasks.", len(finished)) |
| var wg sync.WaitGroup |
| errs := make([]error, len(finished)) |
| for i, t := range finished { |
| wg.Add(1) |
| go func(idx int, t *types.Task) { |
| defer wg.Done() |
| taskResult, err := taskExecutor.GetTaskResult(ctx, t.SwarmingTaskId) |
| if err != nil { |
| errs[idx] = skerr.Wrapf(err, "Failed to update unfinished task %s; failed to get updated task from swarming", t.SwarmingTaskId) |
| return |
| } |
| modified, err := db.UpdateDBFromTaskResult(ctx, s.db, taskResult) |
| if err != nil { |
| errs[idx] = skerr.Wrapf(err, "Failed to update unfinished task %s", t.SwarmingTaskId) |
| return |
| } |
| if modified { |
| s.updateUnfinishedCount.Inc(1) |
| } |
| }(i, t) |
| } |
| wg.Wait() |
| for _, err := range errs { |
| if err != nil { |
| return err |
| } |
| } |
| } |
| } |
| |
| return s.tCache.Update(ctx) |
| } |
| |
| // updateUnfinishedJobs checks all not-yet-finished Jobs to determine whether |
| // their state has changed, and updates them in the DB if so. |
| func (s *TaskScheduler) updateUnfinishedJobs(ctx context.Context) error { |
| ctx, span := trace.StartSpan(ctx, "updateUnfinishedJobs") |
| defer span.End() |
| |
| jobs, err := s.jCache.InProgressJobs() |
| if err != nil { |
| return err |
| } |
| |
| modifiedJobs := make([]*types.Job, 0, len(jobs)) |
| modifiedTasks := make(map[string]*types.Task, len(jobs)) |
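| // For each in-progress Job, rebuild its TaskSummaries and ensure that each |
| // of its Tasks lists the Job in its Jobs field. |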
| for _, j := range jobs { |
| tasks, err := s.getTasksForJob(j) |
| if err != nil { |
| return err |
| } |
| summaries := make(map[string][]*types.TaskSummary, len(tasks)) |
| for k, v := range tasks { |
| cpy := make([]*types.TaskSummary, 0, len(v)) |
| for _, t := range v { |
| if existing := modifiedTasks[t.Id]; existing != nil { |
| t = existing |
| } |
| cpy = append(cpy, t.MakeTaskSummary()) |
| // The Jobs list is always sorted. |
| oldLen := len(t.Jobs) |
| t.Jobs = util.InsertStringSorted(t.Jobs, j.Id) |
| if len(t.Jobs) > oldLen { |
| modifiedTasks[t.Id] = t |
| } |
| } |
| summaries[k] = cpy |
| } |
| if !reflect.DeepEqual(summaries, j.Tasks) { |
| j.Tasks = summaries |
| j.Status = j.DeriveStatus() |
| if j.Done() { |
| j.Finished = now.Now(ctx) |
| } |
| modifiedJobs = append(modifiedJobs, j) |
| } |
| } |
| if len(modifiedTasks) > 0 { |
| tasks := make([]*types.Task, 0, len(modifiedTasks)) |
| for _, t := range modifiedTasks { |
| tasks = append(tasks, t) |
| } |
| if err := s.putTasksInChunks(ctx, tasks); err != nil { |
| return err |
| } |
| } |
| if len(modifiedJobs) > 0 { |
| if err := s.putJobsInChunks(ctx, modifiedJobs); err != nil { |
| return err |
| } |
| } |
| return nil |
| } |
| |
| // getTasksForJob finds all Tasks for the given Job. It returns the Tasks |
| // in a map keyed by name. |
| func (s *TaskScheduler) getTasksForJob(j *types.Job) (map[string][]*types.Task, error) { |
| tasks := map[string][]*types.Task{} |
| for d := range j.Dependencies { |
| key := j.MakeTaskKey(d) |
| gotTasks, err := s.tCache.GetTasksByKey(key) |
| if err != nil { |
| return nil, err |
| } |
| tasks[d] = gotTasks |
| } |
| return tasks, nil |
| } |
| |
| // addTasksSingleTaskSpec computes the blamelist for each task in tasks, all of |
| // which must have the same Repo and Name fields, and inserts/updates them in |
| // the TaskDB. Also adjusts blamelists of existing tasks. |
| func (s *TaskScheduler) addTasksSingleTaskSpec(ctx context.Context, tasks []*types.Task) error { |
| sort.Sort(types.TaskSlice(tasks)) |
| // TODO(borenet): This is the only user of cacheWrapper; can we remove it? |
| cache := newCacheWrapper(s.tCache) |
| repoName := tasks[0].Repo |
| taskName := tasks[0].Name |
| repo, ok := s.repos[repoName] |
| if !ok { |
| return skerr.Fmt("No such repo: %s", repoName) |
| } |
| |
| commitsBuf := make([]*repograph.Commit, 0, MAX_BLAMELIST_COMMITS) |
| updatedTasks := map[string]*types.Task{} |
| for _, task := range tasks { |
| if task.Repo != repoName || task.Name != taskName { |
| return skerr.Fmt("Mismatched Repo or Name: %v", tasks) |
| } |
| if task.Id == "" { |
| if err := s.db.AssignId(ctx, task); err != nil { |
| return err |
| } |
| } |
| if task.IsTryJob() { |
| updatedTasks[task.Id] = task |
| continue |
| } |
| // Compute blamelist. |
| revision := repo.Get(task.Revision) |
| if revision == nil { |
| return skerr.Fmt("No such commit %s in %s.", task.Revision, task.Repo) |
| } |
| if !s.window.TestTime(task.Repo, revision.Timestamp) { |
| return skerr.Fmt("Can not add task %s with revision %s (at %s) before window start.", task.Id, task.Revision, revision.Timestamp) |
| } |
| commits, stealingFrom, err := ComputeBlamelist(ctx, cache, repo, task.Name, task.Repo, revision, commitsBuf, s.taskCfgCache, s.window) |
| if err != nil { |
| return err |
| } |
| task.Commits = commits |
| if len(task.Commits) > 0 && !util.In(task.Revision, task.Commits) { |
| sklog.Errorf("task %s (%s @ %s) doesn't have its own revision in its blamelist: %v", task.Id, task.Name, task.Revision, task.Commits) |
| } |
| updatedTasks[task.Id] = task |
| cache.insert(task) |
| if stealingFrom != nil && stealingFrom.Id != task.Id { |
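| // The new task has "stolen" commits from an existing task's blamelist; |
| // remove those commits from the old task. |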
| stole := util.NewStringSet(commits) |
| oldC := util.NewStringSet(stealingFrom.Commits) |
| newC := oldC.Complement(stole) |
| if len(newC) == 0 { |
| stealingFrom.Commits = nil |
| } else { |
| newCommits := make([]string, 0, len(newC)) |
| for c := range newC { |
| newCommits = append(newCommits, c) |
| } |
| stealingFrom.Commits = newCommits |
| } |
| updatedTasks[stealingFrom.Id] = stealingFrom |
| cache.insert(stealingFrom) |
| } |
| } |
| putTasks := make([]*types.Task, 0, len(updatedTasks)) |
| for _, task := range updatedTasks { |
| putTasks = append(putTasks, task) |
| } |
| if err := s.putTasks(ctx, putTasks); err != nil { |
| return err |
| } |
| return nil |
| } |
| |
| // addTasks inserts the given tasks into the TaskDB, updating blamelists. The |
| // provided Tasks should have all fields initialized except for Commits, which |
| // will be overwritten, and optionally Id, which will be assigned if necessary. |
| // addTasks updates existing Tasks' blamelists, if needed. The provided map |
| // groups Tasks by repo and TaskSpec name. May return an error on partial |
| // success, and may modify the Commits and Id fields of the argument tasks on |
| // error. |
| func (s *TaskScheduler) addTasks(ctx context.Context, taskMap map[string]map[string][]*types.Task) error { |
| ctx, span := trace.StartSpan(ctx, "addTasks", trace.WithSampler(trace.ProbabilitySampler(0.01))) |
| defer span.End() |
| type queueItem struct { |
| Repo string |
| Name string |
| } |
| queue := map[queueItem]bool{} |
| for repo, byName := range taskMap { |
| for name, tasks := range byName { |
| if len(tasks) == 0 { |
| continue |
| } |
| queue[queueItem{ |
| Repo: repo, |
| Name: name, |
| }] = true |
| } |
| } |
| |
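| // Insert the tasks for each TaskSpec, retrying on ErrConcurrentUpdate up to |
| // db.NUM_RETRIES times. TaskSpecs which succeed are removed from the queue; |
| // any other error aborts immediately. |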
| for i := 0; i < db.NUM_RETRIES; i++ { |
| if len(queue) == 0 { |
| return nil |
| } |
| if err := s.tCache.Update(ctx); err != nil { |
| return err |
| } |
| |
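| // Process each TaskSpec in parallel. Successful items are reported on the |
| // done channel; failures other than ErrConcurrentUpdate are reported on |
| // errs, while concurrent-update failures stay in the queue to be retried. |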
| done := make(chan queueItem) |
| errs := make(chan error, len(queue)) |
| wg := sync.WaitGroup{} |
| for item := range queue { |
| wg.Add(1) |
| go func(item queueItem, tasks []*types.Task) { |
| defer wg.Done() |
| if err := s.addTasksSingleTaskSpec(ctx, tasks); err != nil { |
| if !db.IsConcurrentUpdate(err) { |
| errs <- skerr.Wrapf(err, "Error adding tasks for %s (in repo %s)", item.Name, item.Repo) |
| } |
| } else { |
| done <- item |
| } |
| }(item, taskMap[item.Repo][item.Name]) |
| } |
| go func() { |
| wg.Wait() |
| close(done) |
| close(errs) |
| }() |
| for item := range done { |
| delete(queue, item) |
| } |
| rvErrs := []error{} |
| for err := range errs { |
| sklog.Error(err) |
| rvErrs = append(rvErrs, err) |
| } |
| if len(rvErrs) != 0 { |
| return rvErrs[0] |
| } |
| } |
| |
| if len(queue) > 0 { |
| return skerr.Fmt("addTasks: %d consecutive ErrConcurrentUpdate; %d of %d TaskSpecs failed. %#v", db.NUM_RETRIES, len(queue), len(taskMap), queue) |
| } |
| return nil |
| } |
| |
| // HandleSwarmingPubSub loads the given Swarming task ID from Swarming and |
| // updates the associated types.Task in the database. Returns a bool indicating |
| // whether the pubsub message should be acknowledged. |
| func (s *TaskScheduler) HandleSwarmingPubSub(msg *swarming.PubSubTaskMessage) bool { |
| ctx, span := trace.StartSpan(context.Background(), "taskscheduler_HandleSwarmingPubSub", trace.WithSampler(trace.ProbabilitySampler(0.01))) |
| defer span.End() |
| s.pubsubCount.Inc(1) |
| if msg.UserData == "" { |
| // This message is invalid. ACK it to make it go away. |
| return true |
| } |
| |
| // If the task has been triggered but not yet inserted into the DB, NACK |
| // the message so that we'll receive it later. |
| s.pendingInsertMtx.RLock() |
| isPending := s.pendingInsert[msg.UserData] |
| s.pendingInsertMtx.RUnlock() |
| if isPending { |
| sklog.Debugf("Received pub/sub message for task which hasn't yet been inserted into the db: %s (%s); not ack'ing message; will try again later.", msg.SwarmingTaskId, msg.UserData) |
| return false |
| } |
| |
| // Obtain the Swarming task data. |
| res, err := s.taskExecutors[types.TaskExecutor_Swarming].GetTaskResult(ctx, msg.SwarmingTaskId) |
| if err != nil { |
| sklog.Errorf("pubsub: Failed to retrieve task from Swarming: %s", err) |
| return true |
| } |
| // Skip unfinished tasks. |
| if util.TimeIsZero(res.Finished) { |
| return true |
| } |
| // Update the task in the DB. |
| if _, err := db.UpdateDBFromTaskResult(ctx, s.db, res); err != nil { |
| // TODO(borenet): Some of these cases should never be hit once all tasks |
| // supply the ID in msg.UserData; at that point we can remove this logic. |
| id := "<MISSING ID TAG>" |
| if err == db.ErrNotFound { |
| ids, ok := res.Tags[types.SWARMING_TAG_ID] |
| if ok { |
| id = ids[0] |
| } |
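| // The task was created very recently and may not yet be visible in the |
| // DB; NACK the message and try again later. |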
| if now.Now(ctx).Sub(res.Created) < 2*time.Minute { |
| sklog.Infof("Failed to update task %q: No such task ID: %q. Less than two minutes old; try again later.", msg.SwarmingTaskId, id) |
| return false |
| } |
| sklog.Errorf("Failed to update task %q: No such task ID: %q", msg.SwarmingTaskId, id) |
| return true |
| } else if err == db.ErrUnknownId { |
| expectedSwarmingTaskId := "<unknown>" |
| ids, ok := res.Tags[types.SWARMING_TAG_ID] |
| if ok { |
| id = ids[0] |
| t, err := s.db.GetTaskById(ctx, id) |
| if err != nil { |
| sklog.Errorf("Failed to update task %q; mismatched ID and failed to retrieve task from DB: %s", msg.SwarmingTaskId, err) |
| return true |
| } |
| expectedSwarmingTaskId = t.SwarmingTaskId |
| } |
| sklog.Errorf("Failed to update task %q: Task %s has a different Swarming task ID associated with it: %s", msg.SwarmingTaskId, id, expectedSwarmingTaskId) |
| return true |
| } else { |
| sklog.Errorf("Failed to update task %q: %s", msg.SwarmingTaskId, err) |
| return true |
| } |
| } |
| return true |
| } |