blob: 37f19425b1a4200859b40173407414b87b566920 [file] [log] [blame]
package scheduling
import (
multierror ""
const (
// Manually-forced jobs have high priority.
// Try jobs have high priority, equal to building at HEAD when we're
// 5 commits behind.
// When retrying a try job task that has failed, prioritize the retry
// lower than tryjob tasks that haven't run yet.
// When bisecting or retrying a task that failed or had a mishap, add a bonus
// to the raw score.
// A value of 0.75 means that a retry scores higher than a bisecting a
// successful task with a blamelist of 2 commits, but lower than testing new
// commits or bisecting successful tasks with blamelist of 3 or
// more. Bisecting a failure with a blamelist of 2 commits scores the same as
// bisecting a successful task with a blamelist of 4 commits.
// MAX_BLAMELIST_COMMITS is the maximum number of commits which are
// allowed in a task blamelist before we stop tracing commit history.
// Measurement name for task candidate counts by dimension set.
// To avoid errors resulting from DB transaction size limits, we
// restrict the number of tasks triggered per TaskSpec (we insert tasks
// into the DB in chunks by TaskSpec) to half of the DB transaction size
// limit (since we may need to update an existing whose blamelist was
// split by the new task).
var (
// TaskScheduler is a struct used for scheduling tasks on bots.
type TaskScheduler struct {
busyBots *busyBots
candidateMetrics map[string]metrics2.Int64Metric
candidateMetricsMtx sync.Mutex
db db.DB
diagClient gcs.GCSClient
diagInstance string
rbeCas cas.CAS
rbeCasInstance string
jCache cache.JobCache
lastScheduled time.Time // protected by queueMtx.
pendingInsert map[string]bool
pendingInsertMtx sync.RWMutex
pools []string
pubsubCount metrics2.Counter
pubsubTopic string
queue []*taskCandidate // protected by queueMtx.
queueMtx sync.RWMutex
repos repograph.Map
skipTasks *skip_tasks.DB
taskExecutor types.TaskExecutor
taskCfgCache *task_cfg_cache.TaskCfgCache
tCache cache.TaskCache
// testWaitGroup keeps track of any goroutines the TaskScheduler methods
// create so that tests can ensure all goroutines finish before asserting.
testWaitGroup sync.WaitGroup
timeDecayAmt24Hr float64
triggeredCount metrics2.Counter
updateUnfinishedCount metrics2.Counter
window *window.Window
func NewTaskScheduler(ctx context.Context, d db.DB, bl *skip_tasks.DB, period time.Duration, numCommits int, repos repograph.Map, rbeCas cas.CAS, rbeCasInstance string, taskExecutor types.TaskExecutor, c *http.Client, timeDecayAmt24Hr float64, pools []string, pubsubTopic string, taskCfgCache *task_cfg_cache.TaskCfgCache, ts oauth2.TokenSource, diagClient gcs.GCSClient, diagInstance string) (*TaskScheduler, error) {
// Repos must be updated before window is initialized; otherwise the repos may be uninitialized,
// resulting in the window being too short, causing the caches to be loaded with incomplete data.
for _, r := range repos {
if err := r.Update(ctx); err != nil {
return nil, fmt.Errorf("Failed initial repo sync: %s", err)
w, err := window.New(period, numCommits, repos)
if err != nil {
return nil, fmt.Errorf("Failed to create window: %s", err)
// Create caches.
tCache, err := cache.NewTaskCache(ctx, d, w, nil)
if err != nil {
return nil, fmt.Errorf("Failed to create TaskCache: %s", err)
jCache, err := cache.NewJobCache(ctx, d, w, nil)
if err != nil {
return nil, fmt.Errorf("Failed to create JobCache: %s", err)
s := &TaskScheduler{
skipTasks: bl,
busyBots: newBusyBots(),
candidateMetrics: map[string]metrics2.Int64Metric{},
db: d,
diagClient: diagClient,
diagInstance: diagInstance,
jCache: jCache,
pendingInsert: map[string]bool{},
pools: pools,
pubsubCount: metrics2.GetCounter("task_scheduler_pubsub_handler"),
pubsubTopic: pubsubTopic,
queue: []*taskCandidate{},
queueMtx: sync.RWMutex{},
rbeCas: rbeCas,
rbeCasInstance: rbeCasInstance,
repos: repos,
taskExecutor: taskExecutor,
taskCfgCache: taskCfgCache,
tCache: tCache,
timeDecayAmt24Hr: timeDecayAmt24Hr,
triggeredCount: metrics2.GetCounter("task_scheduler_triggered_count"),
updateUnfinishedCount: metrics2.GetCounter("task_scheduler_update_unfinished_tasks_count"),
window: w,
return s, nil
// Close cleans up resources used by the TaskScheduler.
func (s *TaskScheduler) Close() error {
err1 := s.taskCfgCache.Close()
err2 := s.rbeCas.Close()
if err1 != nil {
return skerr.Wrap(err1)
if err2 != nil {
return skerr.Wrap(err2)
return nil
// Start initiates the TaskScheduler's goroutines for scheduling tasks. beforeMainLoop
// will be run before each scheduling iteration.
func (s *TaskScheduler) Start(ctx context.Context, beforeMainLoop func()) {
lvScheduling := metrics2.NewLiveness("last_successful_task_scheduling")
cleanup.Repeat(5*time.Second, func(_ context.Context) {
// Explicitly ignore the passed-in context; this allows us to
// finish the current scheduling cycle even if the context is
// canceled, which helps prevent "orphaned" tasks which were
// triggered on Swarming but were not inserted into the DB.
ctx := context.Background()
sklog.Infof("Running beforeMainLoop()")
sklog.Infof("beforeMainLoop() finished.")
if err := s.MainLoop(ctx); err != nil {
sklog.Errorf("Failed to run the task scheduler: %s", err)
} else {
}, nil)
lvUpdateUnfinishedTasks := metrics2.NewLiveness("last_successful_tasks_update")
go util.RepeatCtx(ctx, 5*time.Minute, func(ctx context.Context) {
if err := s.updateUnfinishedTasks(); err != nil {
sklog.Errorf("Failed to run periodic tasks update: %s", err)
} else {
// putTask is a wrapper around DB.PutTask which adds the task to the cache.
func (s *TaskScheduler) putTask(t *types.Task) error {
if err := s.db.PutTask(t); err != nil {
return err
return nil
// putTasks is a wrapper around DB.PutTasks which adds the tasks to the cache.
func (s *TaskScheduler) putTasks(t []*types.Task) error {
if err := s.db.PutTasks(t); err != nil {
return err
return nil
// putTasksInChunks is a wrapper around DB.PutTasksInChunks which adds the tasks
// to the cache.
func (s *TaskScheduler) putTasksInChunks(t []*types.Task) error {
if err := s.db.PutTasksInChunks(t); err != nil {
return err
return nil
// putJob is a wrapper around DB.PutJob which adds the job to the cache.
func (s *TaskScheduler) putJob(j *types.Job) error {
if err := s.db.PutJob(j); err != nil {
return err
return nil
// putJobsInChunks is a wrapper around DB.PutJobsInChunks which adds the jobs
// to the cache.
func (s *TaskScheduler) putJobsInChunks(j []*types.Job) error {
if err := s.db.PutJobsInChunks(j); err != nil {
return err
return nil
// TaskSchedulerStatus is a struct which provides status information about the
// TaskScheduler.
type TaskSchedulerStatus struct {
LastScheduled time.Time `json:"last_scheduled"`
TopCandidates []*taskCandidate `json:"top_candidates"`
// Status returns the current status of the TaskScheduler.
func (s *TaskScheduler) Status() *TaskSchedulerStatus {
defer metrics2.FuncTimer().Stop()
defer s.queueMtx.RUnlock()
if len(s.queue) < n {
n = len(s.queue)
return &TaskSchedulerStatus{
LastScheduled: s.lastScheduled,
TopCandidates: s.queue[:n],
// TaskCandidateSearchTerms includes fields used for searching task candidates.
type TaskCandidateSearchTerms struct {
Dimensions []string `json:"dimensions"`
// SearchQueue returns all task candidates in the queue which match the given
// TaskKey. Any blank fields are considered to be wildcards.
func (s *TaskScheduler) SearchQueue(q *TaskCandidateSearchTerms) []*taskCandidate {
defer s.queueMtx.RUnlock()
rv := []*taskCandidate{}
for _, c := range s.queue {
// TODO(borenet): I wish there was a better way to do this.
if q.ForcedJobId != "" && c.ForcedJobId != q.ForcedJobId {
if q.Name != "" && c.Name != q.Name {
if q.Repo != "" && c.Repo != q.Repo {
if q.Revision != "" && c.Revision != q.Revision {
if q.Issue != "" && c.Issue != q.Issue {
if q.PatchRepo != "" && c.PatchRepo != q.PatchRepo {
if q.Patchset != "" && c.Patchset != q.Patchset {
if q.Server != "" && c.Server != q.Server {
if len(q.Dimensions) > 0 {
ok := true
for _, d := range q.Dimensions {
if !util.In(d, c.TaskSpec.Dimensions) {
ok = false
if !ok {
rv = append(rv, c)
return rv
// ComputeBlamelist computes the blamelist for a new task, specified by name,
// repo, and revision. Returns the list of commits covered by the task, and any
// previous task which part or all of the blamelist was "stolen" from (see
// below). There are three cases:
// 1. The new task tests commits which have not yet been tested. Trace commit
// history, accumulating commits until we find commits which have been tested
// by previous tasks.
// 2. The new task runs at the same commit as a previous task. This is a retry,
// so the entire blamelist of the previous task is "stolen".
// 3. The new task runs at a commit which is in a previous task's blamelist, but
// no task has run at the same commit. This is a bisect. Trace commit
// history, "stealing" commits from the previous task until we find a commit
// which was covered by a *different* previous task.
// Args:
// - cache: TaskCache instance.
// - repo: repograph.Graph instance corresponding to the repository of the task.
// - taskName: Name of the task.
// - repoName: Name of the repository for the task.
// - revision: Revision at which the task would run.
// - commitsBuf: Buffer for use as scratch space.
func ComputeBlamelist(ctx context.Context, cache cache.TaskCache, repo *repograph.Graph, taskName, repoName string, revision *repograph.Commit, commitsBuf []*repograph.Commit, tcc *task_cfg_cache.TaskCfgCache, w *window.Window) ([]string, *types.Task, error) {
commitsBuf = commitsBuf[:0]
var stealFrom *types.Task
// Run the helper function to recurse on commit history.
if err := revision.Recurse(func(commit *repograph.Commit) error {
// If this commit is outside the scheduling window, we won't
// have tasks in the cache for it, and thus we won't be able
// to compute the correct blamelist. Stop here.
if !w.TestCommit(repoName, commit) {
return repograph.ErrStopRecursing
// If the task spec is not defined at this commit, it can't be
// part of the blamelist.
rs := types.RepoState{
Repo: repoName,
Revision: commit.Hash,
cfg, cachedErr, err := tcc.Get(ctx, rs)
if cachedErr != nil {
sklog.Warningf("Stopping blamelist recursion at %s; TaskCfgCache has error: %s", commit.Hash, err)
return repograph.ErrStopRecursing
if err != nil {
if err == task_cfg_cache.ErrNoSuchEntry {
sklog.Warningf("Computing blamelist for %s in %s @ %s, no cached TasksCfg at %s; stopping blamelist calculation.", taskName, repoName, revision.Hash, commit.Hash)
return repograph.ErrStopRecursing
return skerr.Wrap(err)
if _, ok := cfg.Tasks[taskName]; !ok {
sklog.Infof("Computing blamelist for %s in %s @ %s, Task Spec not defined in %s (have %d tasks); stopping blamelist calculation.", taskName, repoName, revision.Hash, commit.Hash, len(cfg.Tasks))
return repograph.ErrStopRecursing
// Determine whether any task already includes this commit.
prev, err := cache.GetTaskForCommit(repoName, commit.Hash, taskName)
if err != nil {
return err
// If the blamelist is too large, just use a single commit.
if len(commitsBuf) > MAX_BLAMELIST_COMMITS {
commitsBuf = append(commitsBuf[:0], revision)
//sklog.Warningf("Found too many commits for %s @ %s; using single-commit blamelist.", taskName, revision.Hash)
// If we're stealing commits from a previous task but the current
// commit is not in any task's blamelist, we must have scrolled past
// the beginning of the tasks. Just return.
if prev == nil && stealFrom != nil {
return repograph.ErrStopRecursing
// If a previous task already included this commit, we have to make a decision.
if prev != nil {
// If this Task's Revision is already included in a different
// Task, then we're either bisecting or retrying a task. We'll
// "steal" commits from the previous Task's blamelist.
if len(commitsBuf) == 0 {
stealFrom = prev
// Another shortcut: If our Revision is the same as the
// Revision of the Task we're stealing commits from,
// ie. both tasks ran at the same commit, then this is a
// retry. Just steal all of the commits without doing
// any more work.
if stealFrom.Revision == revision.Hash {
commitsBuf = commitsBuf[:0]
for _, c := range stealFrom.Commits {
ptr := repo.Get(c)
if ptr == nil {
return fmt.Errorf("No such commit: %q", c)
commitsBuf = append(commitsBuf, ptr)
if stealFrom == nil || prev.Id != stealFrom.Id {
// If we've hit a commit belonging to a different task,
// we're done.
return repograph.ErrStopRecursing
// Add the commit.
commitsBuf = append(commitsBuf, commit)
// Recurse on the commit's parents.
return nil
}); err != nil && err != ERR_BLAMELIST_DONE {
return nil, nil, err
rv := make([]string, 0, len(commitsBuf))
for _, c := range commitsBuf {
rv = append(rv, c.Hash)
return rv, stealFrom, nil
type taskSchedulerMainLoopDiagnostics struct {
StartTime time.Time `json:"startTime"`
EndTime time.Time `json:"endTime"`
Error string `json:"error,omitEmpty"`
Candidates []*taskCandidate `json:"candidates"`
FreeBots []*types.Machine `json:"freeBots"`
// writeMainLoopDiagnosticsToGCS writes JSON containing allCandidates and
// freeBots to GCS. If called in a goroutine, the arguments may not be modified.
func writeMainLoopDiagnosticsToGCS(ctx context.Context, start time.Time, end time.Time, diagClient gcs.GCSClient, diagInstance string, allCandidates map[types.TaskKey]*taskCandidate, freeBots []*types.Machine, scheduleErr error) error {
defer metrics2.FuncTimer().Stop()
candidateSlice := make([]*taskCandidate, 0, len(allCandidates))
for _, c := range allCandidates {
candidateSlice = append(candidateSlice, c)
content := taskSchedulerMainLoopDiagnostics{
StartTime: start.UTC(),
EndTime: end.UTC(),
Candidates: candidateSlice,
FreeBots: freeBots,
if scheduleErr != nil {
content.Error = scheduleErr.Error()
filenameBase := start.UTC().Format("20060102T150405.000000000Z")
path := path.Join(diagInstance, GCS_MAIN_LOOP_DIAGNOSTICS_DIR, filenameBase+".json")
ctx, cancel := context.WithTimeout(ctx, GCS_DIAGNOSTICS_WRITE_TIMEOUT)
defer cancel()
return gcs.WithWriteFileGzip(diagClient, ctx, path, func(w io.Writer) error {
enc := json.NewEncoder(w)
enc.SetIndent("", " ")
return enc.Encode(&content)
// findTaskCandidatesForJobs returns the set of all taskCandidates needed by all
// currently-unfinished jobs.
func (s *TaskScheduler) findTaskCandidatesForJobs(ctx context.Context, unfinishedJobs []*types.Job) (map[types.TaskKey]*taskCandidate, error) {
defer metrics2.FuncTimer().Stop()
// Get the repo+commit+taskspecs for each job.
candidates := map[types.TaskKey]*taskCandidate{}
for _, j := range unfinishedJobs {
if !s.window.TestTime(j.Repo, j.Created) {
// If git history was changed, we should avoid running jobs at
// orphaned commits.
if s.repos[j.Repo].Get(j.Revision) == nil {
// TODO(borenet): Cancel the job.
// Add task candidates for this job.
for tsName := range j.Dependencies {
key := j.MakeTaskKey(tsName)
c, ok := candidates[key]
if !ok {
taskCfg, cachedErr, err := s.taskCfgCache.Get(ctx, j.RepoState)
if cachedErr != nil {
if err != nil {
return nil, skerr.Wrap(err)
spec, ok := taskCfg.Tasks[tsName]
if !ok {
// TODO(borenet): This should have already been caught when
// we validated the TasksCfg before inserting the job.
sklog.Errorf("Job %s wants task %s which is not defined in %+v", j.Name, tsName, j.RepoState)
casSpec, ok := taskCfg.CasSpecs[spec.CasSpec]
if !ok {
sklog.Errorf("Task %s depends on non-existent CasSpec %s", tsName, spec.CasSpec)
c = &taskCandidate{
// NB: Because multiple Jobs may share a Task, the BuildbucketBuildId
// could be inherited from any matching Job. Therefore, this should be
// used for non-critical, informational purposes only.
BuildbucketBuildId: j.BuildbucketBuildId,
CasDigests: []string{casSpec.Digest},
Jobs: nil,
TaskKey: key,
TaskSpec: spec,
candidates[key] = c
sklog.Infof("Found %d task candidates for %d unfinished jobs.", len(candidates), len(unfinishedJobs))
return candidates, nil
// filterTaskCandidates reduces the set of taskCandidates to the ones we might
// actually want to run and organizes them by repo and TaskSpec name.
func (s *TaskScheduler) filterTaskCandidates(preFilterCandidates map[types.TaskKey]*taskCandidate) (map[string]map[string][]*taskCandidate, error) {
defer metrics2.FuncTimer().Stop()
candidatesBySpec := map[string]map[string][]*taskCandidate{}
total := 0
skipped := map[string]int{}
for _, c := range preFilterCandidates {
// Reject skipped tasks.
if rule := s.skipTasks.MatchRule(c.Name, c.Revision); rule != "" {
c.GetDiagnostics().Filtering = &taskCandidateFilteringDiagnostics{SkippedByRule: rule}
// Reject tasks for too-old commits, as long as they aren't try jobs.
if !c.IsTryJob() {
if in, err := s.window.TestCommitHash(c.Repo, c.Revision); err != nil {
return nil, err
} else if !in {
c.GetDiagnostics().Filtering = &taskCandidateFilteringDiagnostics{RevisionTooOld: true}
// We shouldn't duplicate pending, in-progress,
// or successfully completed tasks.
prevTasks, err := s.tCache.GetTasksByKey(&c.TaskKey)
if err != nil {
return nil, err
var previous *types.Task
if len(prevTasks) > 0 {
// Just choose the last (most recently created) previous
// Task.
previous = prevTasks[len(prevTasks)-1]
if previous != nil {
if previous.Status == types.TASK_STATUS_PENDING || previous.Status == types.TASK_STATUS_RUNNING {
c.GetDiagnostics().Filtering = &taskCandidateFilteringDiagnostics{SupersededByTask: previous.Id}
if previous.Success() {
c.GetDiagnostics().Filtering = &taskCandidateFilteringDiagnostics{SupersededByTask: previous.Id}
// The attempt counts are only valid if the previous
// attempt we're looking at is the last attempt for this
// TaskSpec. Fortunately, TaskCache.GetTasksByKey sorts
// by creation time, and we've selected the last of the
// results.
maxAttempts := c.TaskSpec.MaxAttempts
if maxAttempts == 0 {
// Special case for tasks created before arbitrary
// numbers of attempts were possible.
previousAttempt := previous.Attempt
if previousAttempt == 0 && previous.RetryOf != "" {
previousAttempt = 1
if previousAttempt >= maxAttempts-1 {
previousIds := make([]string, 0, len(prevTasks))
for _, t := range prevTasks {
previousIds = append(previousIds, t.Id)
c.GetDiagnostics().Filtering = &taskCandidateFilteringDiagnostics{PreviousAttempts: previousIds}
c.Attempt = previousAttempt + 1
c.RetryOf = previous.Id
// Don't consider candidates whose dependencies are not met.
depsMet, idsToHashes, err := c.allDepsMet(s.tCache)
if err != nil {
return nil, err
if !depsMet {
// c.Filtering set in allDepsMet.
hashes := make([]string, 0, len(idsToHashes))
parentTaskIds := make([]string, 0, len(idsToHashes))
for id, hash := range idsToHashes {
hashes = append(hashes, hash)
parentTaskIds = append(parentTaskIds, id)
c.CasDigests = append(c.CasDigests, hashes...)
c.ParentTaskIds = parentTaskIds
candidates, ok := candidatesBySpec[c.Repo]
if !ok {
candidates = map[string][]*taskCandidate{}
candidatesBySpec[c.Repo] = candidates
candidates[c.Name] = append(candidates[c.Name], c)
for rule, numSkipped := range skipped {
diagLink := fmt.Sprintf("", path.Join(s.diagInstance, GCS_MAIN_LOOP_DIAGNOSTICS_DIR))
sklog.Infof("Skipped %d candidates due to skip_tasks rule %q. See details in diagnostics at %s.", numSkipped, rule, diagLink)
sklog.Infof("Filtered to %d candidates in %d spec categories.", total, len(candidatesBySpec))
return candidatesBySpec, nil
// processTaskCandidate computes the remaining information about the task
// candidate, eg. blamelists and scoring.
func (s *TaskScheduler) processTaskCandidate(ctx context.Context, c *taskCandidate, now time.Time, cache *cacheWrapper, commitsBuf []*repograph.Commit, diag *taskCandidateScoringDiagnostics) error {
if len(c.Jobs) == 0 {
// Log an error and return to allow scheduling other tasks.
sklog.Errorf("taskCandidate has no Jobs: %#v", c)
c.Score = 0
return nil
// Formula for priority is 1 - (1-<job1 priority>)(1-<job2 priority>)...(1-<jobN priority>).
inversePriorityProduct := 1.0
for _, j := range c.Jobs {
jobPriority := specs.DEFAULT_JOB_SPEC_PRIORITY
if j.Priority <= 1 && j.Priority > 0 {
jobPriority = j.Priority
inversePriorityProduct *= 1 - jobPriority
priority := 1 - inversePriorityProduct
diag.Priority = priority
// Use the earliest Job's Created time, which will maximize priority for older forced/try jobs.
earliestJob := c.Jobs[0]
diag.JobCreatedHours = now.Sub(earliestJob.Created).Hours()
if c.IsTryJob() {
c.Score = CANDIDATE_SCORE_TRY_JOB + now.Sub(earliestJob.Created).Hours()
// Proritize each subsequent attempt lower than the previous attempt.
for i := 0; i < c.Attempt; i++ {
c.Score *= priority
return nil
// Compute blamelist.
repo, ok := s.repos[c.Repo]
if !ok {
return fmt.Errorf("No such repo: %s", c.Repo)
revision := repo.Get(c.Revision)
if revision == nil {
return fmt.Errorf("No such commit %s in %s.", c.Revision, c.Repo)
var stealingFrom *types.Task
var commits []string
if !s.window.TestTime(c.Repo, revision.Timestamp) {
// If the commit has scrolled out of our window, don't bother computing
// a blamelist.
commits = []string{}
} else {
var err error
commits, stealingFrom, err = ComputeBlamelist(ctx, cache, repo, c.Name, c.Repo, revision, commitsBuf, s.taskCfgCache, s.window)
if err != nil {
return err
c.Commits = commits
if stealingFrom != nil {
c.StealingFromId = stealingFrom.Id
if len(c.Commits) > 0 && !util.In(c.Revision, c.Commits) {
sklog.Errorf("task candidate %s @ %s doesn't have its own revision in its blamelist: %v", c.Name, c.Revision, c.Commits)
if c.IsForceRun() {
c.Score = CANDIDATE_SCORE_FORCE_RUN + now.Sub(earliestJob.Created).Hours()
c.Score *= priority
return nil
// Score the candidate.
// The score for a candidate is based on the "testedness" increase
// provided by running the task.
stoleFromCommits := 0
stoleFromStatus := types.TASK_STATUS_SUCCESS
if stealingFrom != nil {
stoleFromCommits = len(stealingFrom.Commits)
stoleFromStatus = stealingFrom.Status
diag.StoleFromCommits = stoleFromCommits
score := testednessIncrease(len(c.Commits), stoleFromCommits)
diag.TestednessIncrease = score
// Add a bonus when retrying or backfilling failures and mishaps.
if stoleFromStatus == types.TASK_STATUS_FAILURE || stoleFromStatus == types.TASK_STATUS_MISHAP {
// Scale the score by other factors, eg. time decay.
decay, err := s.timeDecayForCommit(now, revision)
if err != nil {
return err
diag.TimeDecay = decay
score *= decay
score *= priority
c.Score = score
return nil
// Process the task candidates.
func (s *TaskScheduler) processTaskCandidates(ctx context.Context, candidates map[string]map[string][]*taskCandidate, now time.Time) ([]*taskCandidate, error) {
defer metrics2.FuncTimer().Stop()
processed := make(chan *taskCandidate)
errs := make(chan error)
wg := sync.WaitGroup{}
for _, cs := range candidates {
for _, c := range cs {
go func(candidates []*taskCandidate) {
defer wg.Done()
cache := newCacheWrapper(s.tCache)
commitsBuf := make([]*repograph.Commit, 0, MAX_BLAMELIST_COMMITS)
for {
// Find the best candidate.
idx := -1
var orig *taskCandidate
var best *taskCandidate
var bestDiag *taskCandidateScoringDiagnostics
for i, candidate := range candidates {
c := candidate.CopyNoDiagnostics()
diag := &taskCandidateScoringDiagnostics{}
if err := s.processTaskCandidate(ctx, c, now, cache, commitsBuf, diag); err != nil {
errs <- err
if best == nil || c.Score > best.Score {
orig = candidate
best = c
bestDiag = diag
idx = i
if best == nil {
// Use the original candidate since we're holding on to that pointer for diagnostics.
*orig = *best
best = orig
best.GetDiagnostics().Scoring = bestDiag
processed <- best
t := best.MakeTask()
t.Id = best.MakeId()
if best.StealingFromId != "" {
stoleFrom, err := cache.GetTask(best.StealingFromId)
if err != nil {
errs <- err
stole := util.NewStringSet(best.Commits)
oldC := util.NewStringSet(stoleFrom.Commits)
newC := oldC.Complement(stole)
commits := make([]string, 0, len(newC))
for c := range newC {
commits = append(commits, c)
stoleFrom.Commits = commits
candidates = append(candidates[:idx], candidates[idx+1:]...)
go func() {
rvCandidates := []*taskCandidate{}
rvErrs := []error{}
for {
select {
case c, ok := <-processed:
if ok {
rvCandidates = append(rvCandidates, c)
} else {
processed = nil
case err, ok := <-errs:
if ok {
rvErrs = append(rvErrs, err)
} else {
errs = nil
if processed == nil && errs == nil {
if len(rvErrs) != 0 {
return nil, rvErrs[0]
return rvCandidates, nil
// flatten all the dimensions in 'dims' into a single valued map.
func flatten(dims map[string]string) map[string]string {
keys := make([]string, 0, len(dims))
for key := range dims {
keys = append(keys, key)
ret := make([]string, 0, 2*len(dims))
for _, key := range keys {
ret = append(ret, key, dims[key])
return map[string]string{"dimensions": strings.Join(ret, " ")}
// recordCandidateMetrics generates metrics for candidates by dimension sets.
func (s *TaskScheduler) recordCandidateMetrics(candidates map[string]map[string][]*taskCandidate) {
defer metrics2.FuncTimer().Stop()
// Generate counts. These maps are keyed by the MD5 hash of the
// candidate's TaskSpec's dimensions.
counts := map[string]int64{}
dimensions := map[string]map[string]string{}
for _, byRepo := range candidates {
for _, bySpec := range byRepo {
for _, c := range bySpec {
parseDims, err := swarming.ParseDimensions(c.TaskSpec.Dimensions)
if err != nil {
sklog.Errorf("Failed to parse dimensions: %s", err)
dims := make(map[string]string, len(parseDims))
for k, v := range parseDims {
// Just take the first value for each dimension.
dims[k] = v[0]
k, err := util.MD5Sum(dims)
if err != nil {
sklog.Errorf("Failed to create metrics key: %s", err)
dimensions[k] = dims
// Report the data.
defer s.candidateMetricsMtx.Unlock()
for k, count := range counts {
metric, ok := s.candidateMetrics[k]
if !ok {
metric = metrics2.GetInt64Metric(MEASUREMENT_TASK_CANDIDATE_COUNT, flatten(dimensions[k]))
s.candidateMetrics[k] = metric
for k, metric := range s.candidateMetrics {
_, ok := counts[k]
if !ok {
delete(s.candidateMetrics, k)
// regenerateTaskQueue obtains the set of all eligible task candidates, scores
// them, and prepares them to be triggered. The second return value contains
// all candidates.
func (s *TaskScheduler) regenerateTaskQueue(ctx context.Context, now time.Time) ([]*taskCandidate, map[types.TaskKey]*taskCandidate, error) {
defer metrics2.FuncTimer().Stop()
// Find the unfinished Jobs.
unfinishedJobs, err := s.jCache.UnfinishedJobs()
if err != nil {
return nil, nil, err
// Find TaskSpecs for all unfinished Jobs.
preFilterCandidates, err := s.findTaskCandidatesForJobs(ctx, unfinishedJobs)
if err != nil {
return nil, nil, err
// Filter task candidates.
candidates, err := s.filterTaskCandidates(preFilterCandidates)
if err != nil {
return nil, nil, err
// Record the number of task candidates per dimension set.
// Process the remaining task candidates.
queue, err := s.processTaskCandidates(ctx, candidates, now)
if err != nil {
return nil, nil, err
return queue, preFilterCandidates, nil
// getCandidatesToSchedule matches the list of free Swarming bots to task
// candidates in the queue and returns the candidates which should be run.
// Assumes that the tasks are sorted in decreasing order by score.
func getCandidatesToSchedule(bots []*types.Machine, tasks []*taskCandidate) []*taskCandidate {
defer metrics2.FuncTimer().Stop()
// Create a bots-by-swarming-dimension mapping.
botsByDim := map[string]util.StringSet{}
for _, b := range bots {
for _, dim := range b.Dimensions {
if _, ok := botsByDim[dim]; !ok {
botsByDim[dim] = util.StringSet{}
botsByDim[dim][b.ID] = true
// BotIds that have been used by previous candidates.
usedBots := util.StringSet{}
// Map BotId to the candidates that could have used that bot. In the
// case that no bots are available for a candidate, map concatenated
// dimensions to candidates.
botToCandidates := map[string][]*taskCandidate{}
// Match bots to tasks.
// TODO(borenet): Some tasks require a more specialized bot. We should
// match so that less-specialized tasks don't "steal" more-specialized
// bots which they don't actually need.
rv := make([]*taskCandidate, 0, len(bots))
countByTaskSpec := make(map[string]int, len(bots))
for _, c := range tasks {
diag := &taskCandidateSchedulingDiagnostics{}
c.GetDiagnostics().Scheduling = diag
if countByTaskSpec[c.Name] == SCHEDULING_LIMIT_PER_TASK_SPEC {
sklog.Warningf("Too many tasks to schedule for %s; not scheduling more than %d", c.Name, SCHEDULING_LIMIT_PER_TASK_SPEC)
diag.OverSchedulingLimitPerTaskSpec = true
// TODO(borenet): Make this threshold configurable.
if c.Score <= 0.0 {
// This normally shouldn't happen, but it can happen if there is both a
// forced task and an unused retry for the same repo state.
sklog.Debugf("candidate %s @ %s has a score of %2f; skipping (%d commits).", c.Name, c.Revision, c.Score, len(c.Commits))
diag.ScoreBelowThreshold = true
// For each dimension of the task, find the set of bots which matches.
matches := util.StringSet{}
for i, d := range c.TaskSpec.Dimensions {
if i == 0 {
matches = matches.Union(botsByDim[d])
} else {
matches = matches.Intersect(botsByDim[d])
// Set of candidates that could have used the same bots.
similarCandidates := map[*taskCandidate]struct{}{}
var lowestScoreSimilarCandidate *taskCandidate
addCandidates := func(key string) {
candidates := botToCandidates[key]
for _, candidate := range candidates {
similarCandidates[candidate] = struct{}{}
if len(candidates) > 0 {
lastCandidate := candidates[len(candidates)-1]
if lowestScoreSimilarCandidate == nil || lowestScoreSimilarCandidate.Score > lastCandidate.Score {
lowestScoreSimilarCandidate = lastCandidate
botToCandidates[key] = append(candidates, c)
// Choose a particular bot to mark as used. Sort by ID so that the choice is deterministic.
var chosenBot string
if len(matches) > 0 {
diag.MatchingBots = matches.Keys()
for botId := range matches {
if (chosenBot == "" || botId < chosenBot) && !usedBots[botId] {
chosenBot = botId
} else {
diag.NoBotsAvailable = true
diag.MatchingBots = nil
// Use sorted concatenated dimensions instead of botId as the key.
dims := util.CopyStringSlice(c.TaskSpec.Dimensions)
addCandidates(strings.Join(dims, ","))
diag.NumHigherScoreSimilarCandidates = len(similarCandidates)
if lowestScoreSimilarCandidate != nil {
diag.LastSimilarCandidate = &lowestScoreSimilarCandidate.TaskKey
if chosenBot != "" {
// We're going to run this task.
diag.Selected = true
usedBots[chosenBot] = true
// Add the task to the scheduling list.
rv = append(rv, c)
return rv
// mergeCASInputs uploads inputs for the taskCandidates to content-addressed
// storage. Returns the list of candidates which were successfully merged, with
// their CasInput set, and any error which occurred. Note that the successful
// candidates AND an error may both be returned if some were successfully merged
// but others failed.
func (s *TaskScheduler) mergeCASInputs(ctx context.Context, candidates []*taskCandidate) ([]*taskCandidate, error) {
defer metrics2.FuncTimer().Stop()
mergedCandidates := make([]*taskCandidate, 0, len(candidates))
var errs *multierror.Error
for _, c := range candidates {
digest, err := s.rbeCas.Merge(ctx, c.CasDigests)
if err != nil {
errStr := err.Error()
c.GetDiagnostics().Triggering = &taskCandidateTriggeringDiagnostics{IsolateError: errStr}
errs = multierror.Append(errs, fmt.Errorf("Failed to merge CAS inputs: %s", errStr))
c.CasInput = digest
mergedCandidates = append(mergedCandidates, c)
return mergedCandidates, errs.ErrorOrNil()
// triggerTasks triggers the given slice of tasks to run on Swarming and returns
// a channel of the successfully-triggered tasks which is closed after all tasks
// have been triggered or failed. Each failure is sent to errCh.
func (s *TaskScheduler) triggerTasks(candidates []*taskCandidate, errCh chan<- error) <-chan *types.Task {
defer metrics2.FuncTimer().Stop()
triggered := make(chan *types.Task)
var wg sync.WaitGroup
for _, candidate := range candidates {
go func(candidate *taskCandidate) {
defer wg.Done()
t := candidate.MakeTask()
diag := &taskCandidateTriggeringDiagnostics{}
candidate.GetDiagnostics().Triggering = diag
recordErr := func(context string, err error) {
err = fmt.Errorf("%s: %s", context, err)
diag.TriggerError = err.Error()
errCh <- err
if err := s.db.AssignId(t); err != nil {
recordErr("Failed to assign id", err)
diag.TaskId = t.Id
req, err := candidate.MakeTaskRequest(t.Id, s.rbeCasInstance, s.pubsubTopic)
if err != nil {
recordErr("Failed to create task request", err)
s.pendingInsert[t.Id] = true
resp, err := s.taskExecutor.TriggerTask(context.TODO(), req)
if err != nil {
delete(s.pendingInsert, t.Id)
jobIds := make([]string, 0, len(candidate.Jobs))
for _, job := range candidate.Jobs {
jobIds = append(jobIds, job.Id)
recordErr("Failed to trigger task", skerr.Wrapf(err, "%q needed for jobs: %+v", candidate.Name, jobIds))
t.Created = resp.Created
t.Started = resp.Started
t.Finished = resp.Finished
t.SwarmingTaskId = resp.ID
// The task may have been de-duplicated.
if resp.Status == types.TASK_STATUS_SUCCESS {
if _, err := t.UpdateFromTaskResult(resp); err != nil {
recordErr("Failed to update de-duplicated task", err)
triggered <- t
go func() {
return triggered
// scheduleTasks queries for free Swarming bots and triggers tasks according
// to relative priorities in the queue.
func (s *TaskScheduler) scheduleTasks(ctx context.Context, bots []*types.Machine, queue []*taskCandidate) error {
defer metrics2.FuncTimer().Stop()
// Match free bots with tasks.
candidates := getCandidatesToSchedule(bots, queue)
// Merge CAS inputs for the tasks.
merged, mergeErr := s.mergeCASInputs(ctx, candidates)
if mergeErr != nil && len(merged) == 0 {
return mergeErr
// Setup the error channel.
errs := []error{}
if mergeErr != nil {
errs = append(errs, mergeErr)
errCh := make(chan error)
var errWg sync.WaitGroup
go func() {
defer errWg.Done()
for err := range errCh {
errs = append(errs, err)
// Trigger Swarming tasks.
triggered := s.triggerTasks(merged, errCh)
// Collect the tasks we triggered.
numTriggered := 0
insert := map[string]map[string][]*types.Task{}
for t := range triggered {
byRepo, ok := insert[t.Repo]
if !ok {
byRepo = map[string][]*types.Task{}
insert[t.Repo] = byRepo
byRepo[t.Name] = append(byRepo[t.Name], t)
if len(insert) > 0 {
// Insert the newly-triggered tasks into the DB.
err := s.addTasks(ctx, insert)
// Remove the tasks from the pending map, regardless of whether
// we successfully inserted into the DB.
for _, byRepo := range insert {
for _, byName := range byRepo {
for _, t := range byName {
delete(s.pendingInsert, t.Id)
// Handle failure/success.
if err != nil {
errs = append(errs, fmt.Errorf("Triggered tasks but failed to insert into DB: %s", err))
} else {
// Organize the triggered task by TaskKey.
remove := make(map[types.TaskKey]*types.Task, numTriggered)
for _, byRepo := range insert {
for _, byName := range byRepo {
for _, t := range byName {
remove[t.TaskKey] = t
if len(remove) != numTriggered {
return fmt.Errorf("Number of tasks to remove from the queue (%d) differs from the number of tasks triggered (%d)", len(remove), numTriggered)
// Remove the tasks from the queue.
newQueue := make([]*taskCandidate, 0, len(queue)-numTriggered)
for _, c := range queue {
if _, ok := remove[c.TaskKey]; !ok {
newQueue = append(newQueue, c)
// Note; if regenerateQueue and scheduleTasks are ever decoupled so that
// the queue is reused by multiple runs of scheduleTasks, we'll need to
// address the fact that some candidates may still have their
// StoleFromId pointing to candidates which have been triggered and
// removed from the queue. In that case, we should just need to write a
// loop which updates those candidates to use the IDs of the newly-
// inserted Tasks in the database rather than the candidate ID.
sklog.Infof("Triggered %d tasks on %d bots (%d in queue).", numTriggered, len(bots), len(queue))
queue = newQueue
} else {
sklog.Infof("Triggered no tasks (%d in queue, %d bots available)", len(queue), len(bots))
defer s.queueMtx.Unlock()
s.queue = queue
s.lastScheduled = time.Now()
if len(errs) > 0 {
rvErr := "Got failures: "
for _, e := range errs {
rvErr += fmt.Sprintf("\n%s\n", e)
return fmt.Errorf(rvErr)
return nil
// MainLoop runs a single end-to-end task scheduling loop.
func (s *TaskScheduler) MainLoop(ctx context.Context) error {
defer metrics2.FuncTimer().Stop()
sklog.Infof("Task Scheduler MainLoop starting...")
start := time.Now()
var getSwarmingBotsErr error
var wg sync.WaitGroup
var bots []*types.Machine
go func() {
defer wg.Done()
var err error
bots, err = getFreeMachines(s.taskExecutor, s.busyBots, s.pools)
if err != nil {
getSwarmingBotsErr = err
if err := s.tCache.Update(); err != nil {
return fmt.Errorf("Failed to update task cache: %s", err)
if err := s.jCache.Update(); err != nil {
return fmt.Errorf("Failed to update job cache: %s", err)
if err := s.updateUnfinishedJobs(); err != nil {
return fmt.Errorf("Failed to update unfinished jobs: %s", err)
if err := s.skipTasks.Update(); err != nil {
return fmt.Errorf("Failed to update skip_tasks: %s", err)
// Regenerate the queue.
sklog.Infof("Task Scheduler regenerating the queue...")
queue, allCandidates, err := s.regenerateTaskQueue(ctx, start)
if err != nil {
return fmt.Errorf("Failed to regenerate task queue: %s", err)
if getSwarmingBotsErr != nil {
return fmt.Errorf("Failed to retrieve free Swarming bots: %s", getSwarmingBotsErr)
sklog.Infof("Task Scheduler scheduling tasks...")
err = s.scheduleTasks(ctx, bots, queue)
// An error from scheduleTasks can indicate a partial error; write diagnostics
// in either case.
if s.diagClient != nil {
end := time.Now()
go func() {
defer s.testWaitGroup.Done()
util.LogErr(writeMainLoopDiagnosticsToGCS(ctx, start, end, s.diagClient, s.diagInstance, allCandidates, bots, err))
if err != nil {
return fmt.Errorf("Failed to schedule tasks: %s", err)
sklog.Infof("Task Scheduler MainLoop finished.")
return nil
// QueueLen returns the length of the queue.
func (s *TaskScheduler) QueueLen() int {
defer s.queueMtx.RUnlock()
return len(s.queue)
// timeDecay24Hr computes a linear time decay amount for the given duration,
// given the requested decay amount at 24 hours.
func timeDecay24Hr(decayAmt24Hr float64, elapsed time.Duration) float64 {
return math.Max(1.0-(1.0-decayAmt24Hr)*(float64(elapsed)/float64(24*time.Hour)), 0.0)
// timeDecayForCommit computes a multiplier for a task candidate score based
// on how long ago the given commit landed. This allows us to prioritize more
// recent commits.
func (s *TaskScheduler) timeDecayForCommit(now time.Time, commit *repograph.Commit) (float64, error) {
if s.timeDecayAmt24Hr == 1.0 {
// Shortcut for special case.
return 1.0, nil
rv := timeDecay24Hr(s.timeDecayAmt24Hr, now.Sub(commit.Timestamp))
// TODO(benjaminwagner): Change to an exponential decay to prevent
// zero/negative scores.
//if rv == 0.0 {
// sklog.Warningf("timeDecayForCommit is zero. Now: %s, Commit: %s ts %s, TimeDecay: %2f\nDetails: %v", now, commit.Hash, commit.Timestamp, s.timeDecayAmt24Hr, commit)
return rv, nil
func (ts *TaskScheduler) GetSkipTasks() *skip_tasks.DB {
return ts.skipTasks
// testedness computes the total "testedness" of a set of commits covered by a
// task whose blamelist included N commits. The "testedness" of a task spec at a
// given commit is defined as follows:
// -1.0 if no task has ever included this commit for this task spec.
// 1.0 if a task was run for this task spec AT this commit.
// 1.0 / N if a task for this task spec has included this commit, where N is
// the number of commits included in the task.
// This function gives the sum of the testedness for a blamelist of N commits.
func testedness(n int) float64 {
if n < 0 {
// This should never happen.
sklog.Errorf("Task score function got a blamelist with %d commits", n)
return -1.0
} else if n == 0 {
// Zero commits have zero testedness.
return 0.0
} else if n == 1 {
return 1.0
} else {
return 1.0 + float64(n-1)/float64(n)
// testednessIncrease computes the increase in "testedness" obtained by running
// a task with the given blamelist length which may have "stolen" commits from
// a previous task with a different blamelist length. To do so, we compute the
// "testedness" for every commit affected by the task, before and after the
// task would run. We subtract the "before" score from the "after" score to
// obtain the "testedness" increase at each commit, then sum them to find the
// total increase in "testedness" obtained by running the task.
func testednessIncrease(blamelistLength, stoleFromBlamelistLength int) float64 {
// Invalid inputs.
if blamelistLength <= 0 || stoleFromBlamelistLength < 0 {
return -1.0
if stoleFromBlamelistLength == 0 {
// This task covers previously-untested commits. Previous testedness
// is -1.0 for each commit in the blamelist.
beforeTestedness := float64(-blamelistLength)
afterTestedness := testedness(blamelistLength)
return afterTestedness - beforeTestedness
} else if blamelistLength == stoleFromBlamelistLength {
// This is a retry. It provides no testedness increase, so shortcut here
// rather than perform the math to obtain the same answer.
return 0.0
} else {
// This is a bisect/backfill.
beforeTestedness := testedness(stoleFromBlamelistLength)
afterTestedness := testedness(blamelistLength) + testedness(stoleFromBlamelistLength-blamelistLength)
return afterTestedness - beforeTestedness
// getFreeMachines returns a slice of free machines.
func getFreeMachines(taskExec types.TaskExecutor, busy *busyBots, pools []string) ([]*types.Machine, error) {
defer metrics2.FuncTimer().Stop()
// Query for free machines and pending tasks in all pools.
var wg sync.WaitGroup
machines := []*types.Machine{}
pending := []*types.TaskResult{}
errs := []error{}
var mtx sync.Mutex
for _, pool := range pools {
// Free bots.
go func(pool string) {
defer wg.Done()
b, err := taskExec.GetFreeMachines(context.TODO(), pool)
defer mtx.Unlock()
if err != nil {
errs = append(errs, err)
} else {
machines = append(machines, b...)
// Pending tasks.
go func(pool string) {
defer wg.Done()
pendingTasks, err := taskExec.GetPendingTasks(context.TODO(), pool)
defer mtx.Unlock()
if err != nil {
errs = append(errs, err)
} else {
pending = append(pending, pendingTasks...)
if len(errs) > 0 {
return nil, fmt.Errorf("Got errors loading bots and tasks from Swarming: %v", errs)
rv := make([]*types.Machine, 0, len(machines))
for _, machine := range machines {
if machine.IsDead {
if machine.IsQuarantined {
if machine.CurrentTaskID != "" {
rv = append(rv, machine)
return busy.Filter(rv), nil
// updateUnfinishedTasks queries Swarming for all unfinished tasks and updates
// their status in the DB.
func (s *TaskScheduler) updateUnfinishedTasks() error {
defer metrics2.FuncTimer().Stop()
// Update the TaskCache.
if err := s.tCache.Update(); err != nil {
return err
tasks, err := s.tCache.UnfinishedTasks()
if err != nil {
return err
// Query Swarming for all unfinished tasks.
sklog.Infof("Querying states of %d unfinished tasks.", len(tasks))
ids := make([]string, 0, len(tasks))
for _, t := range tasks {
ids = append(ids, t.SwarmingTaskId)
finishedStates, err := s.taskExecutor.GetTaskCompletionStatuses(context.TODO(), ids)
if err != nil {
return err
finished := make([]*types.Task, 0, len(finishedStates))
for idx, task := range tasks {
if finishedStates[idx] {
finished = append(finished, task)
// Update any newly-finished tasks.
if len(finished) > 0 {
sklog.Infof("Updating %d newly-finished tasks.", len(finished))
var wg sync.WaitGroup
errs := make([]error, len(tasks))
for i, t := range finished {
go func(idx int, t *types.Task) {
defer wg.Done()
taskResult, err := s.taskExecutor.GetTaskResult(context.TODO(), t.SwarmingTaskId)
if err != nil {
errs[idx] = fmt.Errorf("Failed to update unfinished task; failed to get updated task from swarming: %s", err)
modified, err := db.UpdateDBFromTaskResult(s.db, taskResult)
if err != nil {
errs[idx] = fmt.Errorf("Failed to update unfinished task: %s", err)
} else if modified {
}(i, t)
for _, err := range errs {
if err != nil {
return err
return s.tCache.Update()
// updateUnfinishedJobs updates all not-yet-finished Jobs to determine if their
// state has changed.
func (s *TaskScheduler) updateUnfinishedJobs() error {
defer metrics2.FuncTimer().Stop()
jobs, err := s.jCache.UnfinishedJobs()
if err != nil {
return err
modifiedJobs := make([]*types.Job, 0, len(jobs))
modifiedTasks := make(map[string]*types.Task, len(jobs))
for _, j := range jobs {
tasks, err := s.getTasksForJob(j)
if err != nil {
return err
summaries := make(map[string][]*types.TaskSummary, len(tasks))
for k, v := range tasks {
cpy := make([]*types.TaskSummary, 0, len(v))
for _, t := range v {
if existing := modifiedTasks[t.Id]; existing != nil {
t = existing
cpy = append(cpy, t.MakeTaskSummary())
// The Jobs list is always sorted.
oldLen := len(t.Jobs)
t.Jobs = util.InsertStringSorted(t.Jobs, j.Id)
if len(t.Jobs) > oldLen {
modifiedTasks[t.Id] = t
summaries[k] = cpy
if !reflect.DeepEqual(summaries, j.Tasks) {
j.Tasks = summaries
j.Status = j.DeriveStatus()
if j.Done() {
j.Finished = time.Now()
modifiedJobs = append(modifiedJobs, j)
if len(modifiedTasks) > 0 {
tasks := make([]*types.Task, 0, len(modifiedTasks))
for _, t := range modifiedTasks {
tasks = append(tasks, t)
if err := s.putTasksInChunks(tasks); err != nil {
return err
if len(modifiedJobs) > 0 {
if err := s.putJobsInChunks(modifiedJobs); err != nil {
return err
return nil
// getTasksForJob finds all Tasks for the given Job. It returns the Tasks
// in a map keyed by name.
func (s *TaskScheduler) getTasksForJob(j *types.Job) (map[string][]*types.Task, error) {
tasks := map[string][]*types.Task{}
for d := range j.Dependencies {
key := j.MakeTaskKey(d)
gotTasks, err := s.tCache.GetTasksByKey(&key)
if err != nil {
return nil, err
tasks[d] = gotTasks
return tasks, nil
// addTasksSingleTaskSpec computes the blamelist for each task in tasks, all of
// which must have the same Repo and Name fields, and inserts/updates them in
// the TaskDB. Also adjusts blamelists of existing tasks.
func (s *TaskScheduler) addTasksSingleTaskSpec(ctx context.Context, tasks []*types.Task) error {
cache := newCacheWrapper(s.tCache)
repoName := tasks[0].Repo
taskName := tasks[0].Name
repo, ok := s.repos[repoName]
if !ok {
return fmt.Errorf("No such repo: %s", repoName)
commitsBuf := make([]*repograph.Commit, 0, MAX_BLAMELIST_COMMITS)
updatedTasks := map[string]*types.Task{}
for _, task := range tasks {
if task.Repo != repoName || task.Name != taskName {
return fmt.Errorf("Mismatched Repo or Name: %v", tasks)
if task.Id == "" {
if err := s.db.AssignId(task); err != nil {
return err
if task.IsTryJob() {
updatedTasks[task.Id] = task
// Compute blamelist.
revision := repo.Get(task.Revision)
if revision == nil {
return fmt.Errorf("No such commit %s in %s.", task.Revision, task.Repo)
if !s.window.TestTime(task.Repo, revision.Timestamp) {
return fmt.Errorf("Can not add task %s with revision %s (at %s) before window start.", task.Id, task.Revision, revision.Timestamp)
commits, stealingFrom, err := ComputeBlamelist(ctx, cache, repo, task.Name, task.Repo, revision, commitsBuf, s.taskCfgCache, s.window)
if err != nil {
return err
task.Commits = commits
if len(task.Commits) > 0 && !util.In(task.Revision, task.Commits) {
sklog.Errorf("task %s (%s @ %s) doesn't have its own revision in its blamelist: %v", task.Id, task.Name, task.Revision, task.Commits)
updatedTasks[task.Id] = task
if stealingFrom != nil && stealingFrom.Id != task.Id {
stole := util.NewStringSet(commits)
oldC := util.NewStringSet(stealingFrom.Commits)
newC := oldC.Complement(stole)
if len(newC) == 0 {
stealingFrom.Commits = nil
} else {
newCommits := make([]string, 0, len(newC))
for c := range newC {
newCommits = append(newCommits, c)
stealingFrom.Commits = newCommits
updatedTasks[stealingFrom.Id] = stealingFrom
putTasks := make([]*types.Task, 0, len(updatedTasks))
for _, task := range updatedTasks {
putTasks = append(putTasks, task)
if err := s.putTasks(putTasks); err != nil {
return err
return nil
// addTasks inserts the given tasks into the TaskDB, updating blamelists. The
// provided Tasks should have all fields initialized except for Commits, which
// will be overwritten, and optionally Id, which will be assigned if necessary.
// addTasks updates existing Tasks' blamelists, if needed. The provided map
// groups Tasks by repo and TaskSpec name. May return error on partial success.
// May modify Commits and Id of argument tasks on error.
func (s *TaskScheduler) addTasks(ctx context.Context, taskMap map[string]map[string][]*types.Task) error {
type queueItem struct {
Repo string
Name string
queue := map[queueItem]bool{}
for repo, byName := range taskMap {
for name, tasks := range byName {
if len(tasks) == 0 {
Repo: repo,
Name: name,
}] = true
for i := 0; i < db.NUM_RETRIES; i++ {
if len(queue) == 0 {
return nil
if err := s.tCache.Update(); err != nil {
return err
done := make(chan queueItem)
errs := make(chan error, len(queue))
wg := sync.WaitGroup{}
for item := range queue {
go func(item queueItem, tasks []*types.Task) {
defer wg.Done()
if err := s.addTasksSingleTaskSpec(ctx, tasks); err != nil {
if !db.IsConcurrentUpdate(err) {
errs <- fmt.Errorf("Error adding tasks for %s (in repo %s): %s", item.Name, item.Repo, err)
} else {
done <- item
}(item, taskMap[item.Repo][item.Name])
go func() {
for item := range done {
delete(queue, item)
rvErrs := []error{}
for err := range errs {
rvErrs = append(rvErrs, err)
if len(rvErrs) != 0 {
return rvErrs[0]
if len(queue) > 0 {
return fmt.Errorf("addTasks: %d consecutive ErrConcurrentUpdate; %d of %d TaskSpecs failed. %#v", db.NUM_RETRIES, len(queue), len(taskMap), queue)
return nil
// HandleSwarmingPubSub loads the given Swarming task ID from Swarming and
// updates the associated types.Task in the database. Returns a bool indicating
// whether the pubsub message should be acknowledged.
func (s *TaskScheduler) HandleSwarmingPubSub(msg *swarming.PubSubTaskMessage) bool {
if msg.UserData == "" {
// This message is invalid. ACK it to make it go away.
return true
// If the task has been triggered but not yet inserted into the DB, NACK
// the message so that we'll receive it later.
isPending := s.pendingInsert[msg.UserData]
if isPending {
sklog.Debugf("Received pub/sub message for task which hasn't yet been inserted into the db: %s (%s); not ack'ing message; will try again later.", msg.SwarmingTaskId, msg.UserData)
return false
// Obtain the Swarming task data.
res, err := s.taskExecutor.GetTaskResult(context.TODO(), msg.SwarmingTaskId)
if err != nil {
sklog.Errorf("pubsub: Failed to retrieve task from Swarming: %s", err)
return true
// Skip unfinished tasks.
if util.TimeIsZero(res.Finished) {
return true
// Update the task in the DB.
if _, err := db.UpdateDBFromTaskResult(s.db, res); err != nil {
// TODO(borenet): Some of these cases should never be hit, after all tasks
// start supplying the ID in msg.UserData. We should be able to remove the logic.
id := "<MISSING ID TAG>"
if err == db.ErrNotFound {
ids, ok := res.Tags[types.SWARMING_TAG_ID]
if ok {
id = ids[0]
if time.Now().Sub(res.Created) < 2*time.Minute {
sklog.Infof("Failed to update task %q: No such task ID: %q. Less than two minutes old; try again later.", msg.SwarmingTaskId, id)
return false
sklog.Errorf("Failed to update task %q: No such task ID: %q", msg.SwarmingTaskId, id)
return true
} else if err == db.ErrUnknownId {
expectedSwarmingTaskId := "<unknown>"
ids, ok := res.Tags[types.SWARMING_TAG_ID]
if ok {
id = ids[0]
t, err := s.db.GetTaskById(id)
if err != nil {
sklog.Errorf("Failed to update task %q; mismatched ID and failed to retrieve task from DB: %s", msg.SwarmingTaskId, err)
return true
} else {
expectedSwarmingTaskId = t.SwarmingTaskId
sklog.Errorf("Failed to update task %q: Task %s has a different Swarming task ID associated with it: %s", msg.SwarmingTaskId, id, expectedSwarmingTaskId)
return true
} else {
sklog.Errorf("Failed to update task %q: %s", msg.SwarmingTaskId, err)
return true
return true