blob: 8c0bf7202ce7affea4e44f59d1783d31d0679990 [file] [log] [blame]
package scheduling
import (
"context"
"errors"
"fmt"
"math"
"net/http"
"path"
"reflect"
"sort"
"strings"
"sync"
"time"
swarming_api "go.chromium.org/luci/common/api/swarming/swarming/v1"
"go.skia.org/infra/go/common"
"go.skia.org/infra/go/gerrit"
"go.skia.org/infra/go/git"
"go.skia.org/infra/go/git/repograph"
"go.skia.org/infra/go/isolate"
"go.skia.org/infra/go/metrics2"
"go.skia.org/infra/go/sklog"
"go.skia.org/infra/go/swarming"
"go.skia.org/infra/go/timeout"
"go.skia.org/infra/go/util"
"go.skia.org/infra/task_scheduler/go/blacklist"
"go.skia.org/infra/task_scheduler/go/db"
"go.skia.org/infra/task_scheduler/go/db/cache"
"go.skia.org/infra/task_scheduler/go/specs"
"go.skia.org/infra/task_scheduler/go/tryjobs"
"go.skia.org/infra/task_scheduler/go/types"
"go.skia.org/infra/task_scheduler/go/window"
"golang.org/x/oauth2"
)
const (
// Manually-forced jobs have high priority.
CANDIDATE_SCORE_FORCE_RUN = 100.0
// Try jobs have high priority, equal to building at HEAD when we're
// 5 commits behind.
CANDIDATE_SCORE_TRY_JOB = 10.0
// When retrying a try job task that has failed, prioritize the retry
// lower than tryjob tasks that haven't run yet.
CANDIDATE_SCORE_TRY_JOB_RETRY_MULTIPLIER = 0.75
// MAX_BLAMELIST_COMMITS is the maximum number of commits which are
// allowed in a task blamelist before we stop tracing commit history.
MAX_BLAMELIST_COMMITS = 500
// Measurement name for task candidate counts by dimension set.
MEASUREMENT_TASK_CANDIDATE_COUNT = "task_candidate_count"
NUM_TOP_CANDIDATES = 50
)
var (
// Don't schedule on these branches.
// WARNING: Any commit reachable from any of these branches will be
// skipped. So, for example, if you fork a branch from head of master
// and immediately blacklist it, no tasks will be scheduled for any
// commits on master up to the branch point.
// TODO(borenet): An alternative would be to only follow the first
// parent for merge commits. That way, we could remove the checks which
// cause this issue but still blacklist the branch as expected. The
// downside is that we'll miss commits in the case where we fork a
// branch, merge it back, and delete the new branch head.
BRANCH_BLACKLIST = map[string][]string{
common.REPO_SKIA_INTERNAL: {
"skia-master",
},
}
ERR_BLAMELIST_DONE = errors.New("ERR_BLAMELIST_DONE")
)
// TaskScheduler is a struct used for scheduling tasks on bots.
type TaskScheduler struct {
bl *blacklist.Blacklist
busyBots *busyBots
candidateMetrics map[string]metrics2.Int64Metric
candidateMetricsMtx sync.Mutex
db db.DB
depotToolsDir string
isolate *isolate.Client
jCache cache.JobCache
lastScheduled time.Time // protected by queueMtx.
// TODO(benjaminwagner): newTasks probably belongs in the TaskCfgCache.
newTasks map[types.RepoState]util.StringSet
newTasksMtx sync.RWMutex
pendingInsert map[string]bool
pendingInsertMtx sync.RWMutex
pools []string
pubsubTopic string
queue []*taskCandidate // protected by queueMtx.
queueMtx sync.RWMutex
repos repograph.Map
swarming swarming.ApiClient
taskCfgCache *specs.TaskCfgCache
tCache cache.TaskCache
timeDecayAmt24Hr float64
tryjobs *tryjobs.TryJobIntegrator
window *window.Window
workdir string
}
func NewTaskScheduler(ctx context.Context, d db.DB, bl *blacklist.Blacklist, period time.Duration, numCommits int, workdir, host string, repos repograph.Map, isolateClient *isolate.Client, swarmingClient swarming.ApiClient, c *http.Client, timeDecayAmt24Hr float64, buildbucketApiUrl, trybotBucket string, projectRepoMapping map[string]string, pools []string, pubsubTopic, depotTools string, gerrit gerrit.GerritInterface, btProject, btInstance string, ts oauth2.TokenSource) (*TaskScheduler, error) {
// Repos must be updated before window is initialized; otherwise the repos may be uninitialized,
// resulting in the window being too short, causing the caches to be loaded with incomplete data.
for _, r := range repos {
if err := r.Update(ctx); err != nil {
return nil, fmt.Errorf("Failed initial repo sync: %s", err)
}
}
w, err := window.New(period, numCommits, repos)
if err != nil {
return nil, fmt.Errorf("Failed to create window: %s", err)
}
// Create caches.
tCache, err := cache.NewTaskCache(d, w)
if err != nil {
return nil, fmt.Errorf("Failed to create TaskCache: %s", err)
}
jCache, err := cache.NewJobCache(d, w, cache.GitRepoGetRevisionTimestamp(repos))
if err != nil {
return nil, fmt.Errorf("Failed to create JobCache: %s", err)
}
taskCfgCache, err := specs.NewTaskCfgCache(ctx, repos, depotTools, path.Join(workdir, "taskCfgCache"), specs.DEFAULT_NUM_WORKERS, btProject, btInstance, ts)
if err != nil {
return nil, fmt.Errorf("Failed to create TaskCfgCache: %s", err)
}
tryjobs, err := tryjobs.NewTryJobIntegrator(buildbucketApiUrl, trybotBucket, host, c, d, w, projectRepoMapping, repos, taskCfgCache, gerrit)
if err != nil {
return nil, fmt.Errorf("Failed to create TryJobIntegrator: %s", err)
}
s := &TaskScheduler{
bl: bl,
busyBots: newBusyBots(),
candidateMetrics: map[string]metrics2.Int64Metric{},
db: d,
depotToolsDir: depotTools,
isolate: isolateClient,
jCache: jCache,
newTasks: map[types.RepoState]util.StringSet{},
newTasksMtx: sync.RWMutex{},
pendingInsert: map[string]bool{},
pools: pools,
pubsubTopic: pubsubTopic,
queue: []*taskCandidate{},
queueMtx: sync.RWMutex{},
repos: repos,
swarming: swarmingClient,
taskCfgCache: taskCfgCache,
tCache: tCache,
timeDecayAmt24Hr: timeDecayAmt24Hr,
tryjobs: tryjobs,
window: w,
workdir: workdir,
}
return s, nil
}
// Start initiates the TaskScheduler's goroutines for scheduling tasks. beforeMainLoop
// will be run before each scheduling iteration.
func (s *TaskScheduler) Start(ctx context.Context, enableTryjobs bool, beforeMainLoop func()) {
if enableTryjobs {
s.tryjobs.Start(ctx)
}
lvScheduling := metrics2.NewLiveness("last_successful_task_scheduling")
go util.RepeatCtx(5*time.Second, ctx, func() {
beforeMainLoop()
if err := s.MainLoop(ctx); err != nil {
sklog.Errorf("Failed to run the task scheduler: %s", err)
} else {
lvScheduling.Reset()
}
})
lvUpdate := metrics2.NewLiveness("last_successful_tasks_update")
go util.RepeatCtx(5*time.Minute, ctx, func() {
if err := s.updateUnfinishedTasks(); err != nil {
sklog.Errorf("Failed to run periodic tasks update: %s", err)
} else {
lvUpdate.Reset()
}
})
}
// TaskSchedulerStatus is a struct which provides status information about the
// TaskScheduler.
type TaskSchedulerStatus struct {
LastScheduled time.Time `json:"last_scheduled"`
TopCandidates []*taskCandidate `json:"top_candidates"`
}
// Status returns the current status of the TaskScheduler.
func (s *TaskScheduler) Status() *TaskSchedulerStatus {
s.queueMtx.RLock()
defer s.queueMtx.RUnlock()
candidates := make([]*taskCandidate, 0, NUM_TOP_CANDIDATES)
n := NUM_TOP_CANDIDATES
if len(s.queue) < n {
n = len(s.queue)
}
for _, c := range s.queue[:n] {
candidates = append(candidates, c.Copy())
}
return &TaskSchedulerStatus{
LastScheduled: s.lastScheduled,
TopCandidates: candidates,
}
}
// TaskCandidateSearchTerms includes fields used for searching task candidates.
type TaskCandidateSearchTerms struct {
types.TaskKey
Dimensions []string `json:"dimensions"`
}
// SearchQueue returns all task candidates in the queue which match the given
// TaskKey. Any blank fields are considered to be wildcards.
func (s *TaskScheduler) SearchQueue(q *TaskCandidateSearchTerms) []*taskCandidate {
s.queueMtx.RLock()
defer s.queueMtx.RUnlock()
rv := []*taskCandidate{}
for _, c := range s.queue {
// TODO(borenet): I wish there was a better way to do this.
if q.ForcedJobId != "" && c.ForcedJobId != q.ForcedJobId {
continue
}
if q.Name != "" && c.Name != q.Name {
continue
}
if q.Repo != "" && c.Repo != q.Repo {
continue
}
if q.Revision != "" && c.Revision != q.Revision {
continue
}
if q.Issue != "" && c.Issue != q.Issue {
continue
}
if q.PatchRepo != "" && c.PatchRepo != q.PatchRepo {
continue
}
if q.Patchset != "" && c.Patchset != q.Patchset {
continue
}
if q.Server != "" && c.Server != q.Server {
continue
}
if len(q.Dimensions) > 0 {
ok := true
for _, d := range q.Dimensions {
if !util.In(d, c.TaskSpec.Dimensions) {
ok = false
break
}
}
if !ok {
continue
}
}
rv = append(rv, c.Copy())
}
return rv
}
// RecentSpecsAndCommits returns the lists of recent JobSpec names, TaskSpec
// names and commit hashes.
func (s *TaskScheduler) RecentSpecsAndCommits() ([]string, []string, []string) {
return s.taskCfgCache.RecentSpecsAndCommits()
}
// MaybeTriggerPeriodicJobs triggers all periodic jobs with the given trigger
// name, if those jobs haven't already been triggered.
func (s *TaskScheduler) MaybeTriggerPeriodicJobs(ctx context.Context, triggerName string) error {
// We'll search the jobs we've already triggered to ensure that we don't
// trigger the same jobs multiple times in a day/week/whatever. Search a
// window that is not quite the size of the trigger interval, to allow
// for lag time.
end := time.Now()
var start time.Time
if triggerName == specs.TRIGGER_NIGHTLY {
start = end.Add(-23 * time.Hour)
} else if triggerName == specs.TRIGGER_WEEKLY {
// Note that if the cache window is less than a week, this start
// time isn't going to work as expected. However, we only really
// expect to need to debounce periodic triggers for a short
// window, so anything longer than a few minutes would probably
// be enough, and the ~4 days we normally keep in the cache
// should be more than sufficient.
start = end.Add(-6 * 24 * time.Hour)
} else {
sklog.Warningf("Ignoring unknown periodic trigger %q", triggerName)
return nil
}
// Find the job specs matching the trigger and create Job instances.
jobs := []*types.Job{}
for repoUrl, repo := range s.repos {
master := repo.Get("master")
if master == nil {
return fmt.Errorf("Failed to retrieve branch 'master' for %s", repoUrl)
}
rs := types.RepoState{
Repo: repoUrl,
Revision: master.Hash,
}
cfg, err := s.taskCfgCache.ReadTasksCfg(ctx, rs)
if err != nil {
return fmt.Errorf("Failed to retrieve TaskCfg from %s: %s", repoUrl, err)
}
for name, js := range cfg.Jobs {
if js.Trigger == triggerName {
job, err := s.taskCfgCache.MakeJob(ctx, rs, name)
if err != nil {
return fmt.Errorf("Failed to create job: %s", err)
}
jobs = append(jobs, job)
}
}
}
if len(jobs) == 0 {
return nil
}
// Filter out any jobs which we've already triggered. Generally, we'd
// expect to have triggered all of the jobs or none of them, but there
// might be circumstances which caused us to trigger a partial set.
names := make([]string, 0, len(jobs))
for _, job := range jobs {
names = append(names, job.Name)
}
existing, err := s.jCache.GetMatchingJobsFromDateRange(names, start, end)
if err != nil {
return err
}
jobsToInsert := make([]*types.Job, 0, len(jobs))
for _, job := range jobs {
var prev *types.Job = nil
for _, existingJob := range existing[job.Name] {
if !existingJob.IsTryJob() && !existingJob.IsForce {
// Pick an arbitrary pre-existing job for logging.
prev = existingJob
break
}
}
if prev == nil {
jobsToInsert = append(jobsToInsert, job)
} else {
sklog.Warningf("Already triggered a job for %s (eg. id %s at %s); not triggering again.", job.Name, prev.Id, prev.Created)
}
}
if len(jobsToInsert) == 0 {
return nil
}
// Insert the new jobs into the DB.
if err := s.db.PutJobs(jobsToInsert); err != nil {
return fmt.Errorf("Failed to add periodic jobs: %s", err)
}
sklog.Infof("Created %d periodic jobs for trigger %q", len(jobs), triggerName)
return nil
}
// TriggerJob adds the given Job to the database and returns its ID.
func (s *TaskScheduler) TriggerJob(ctx context.Context, repo, commit, jobName string) (string, error) {
j, err := s.taskCfgCache.MakeJob(ctx, types.RepoState{
Repo: repo,
Revision: commit,
}, jobName)
if err != nil {
return "", err
}
j.IsForce = true
if err := s.db.PutJob(j); err != nil {
return "", err
}
sklog.Infof("Created manually-triggered Job %q", j.Id)
return j.Id, nil
}
// CancelJob cancels the given Job if it is not already finished.
func (s *TaskScheduler) CancelJob(id string) (*types.Job, error) {
// TODO(borenet): Prevent concurrent update of the Job.
j, err := s.jCache.GetJobMaybeExpired(id)
if err != nil {
return nil, err
}
if j.Done() {
return nil, fmt.Errorf("Job %s is already finished with status %s", id, j.Status)
}
j.Status = types.JOB_STATUS_CANCELED
if err := s.jobFinished(j); err != nil {
return nil, err
}
if err := s.db.PutJob(j); err != nil {
return nil, err
}
return j, s.jCache.Update()
}
// ComputeBlamelist computes the blamelist for a new task, specified by name,
// repo, and revision. Returns the list of commits covered by the task, and any
// previous task which part or all of the blamelist was "stolen" from (see
// below). There are three cases:
//
// 1. The new task tests commits which have not yet been tested. Trace commit
// history, accumulating commits until we find commits which have been tested
// by previous tasks.
//
// 2. The new task runs at the same commit as a previous task. This is a retry,
// so the entire blamelist of the previous task is "stolen".
//
// 3. The new task runs at a commit which is in a previous task's blamelist, but
// no task has run at the same commit. This is a bisect. Trace commit
// history, "stealing" commits from the previous task until we find a commit
// which was covered by a *different* previous task.
//
// Args:
// - cache: TaskCache instance.
// - repo: repograph.Graph instance corresponding to the repository of the task.
// - taskName: Name of the task.
// - repoName: Name of the repository for the task.
// - revision: Revision at which the task would run.
// - commitsBuf: Buffer for use as scratch space.
func ComputeBlamelist(ctx context.Context, cache cache.TaskCache, repo *repograph.Graph, taskName, repoName string, revision *repograph.Commit, commitsBuf []*repograph.Commit, newTasks map[types.RepoState]util.StringSet) ([]string, *types.Task, error) {
commitsBuf = commitsBuf[:0]
var stealFrom *types.Task
// Run the helper function to recurse on commit history.
if err := revision.Recurse(func(commit *repograph.Commit) (bool, error) {
// Determine whether any task already includes this commit.
prev, err := cache.GetTaskForCommit(repoName, commit.Hash, taskName)
if err != nil {
return false, err
}
// If the blamelist is too large, just use a single commit.
if len(commitsBuf) > MAX_BLAMELIST_COMMITS {
commitsBuf = append(commitsBuf[:0], revision)
//sklog.Warningf("Found too many commits for %s @ %s; using single-commit blamelist.", taskName, revision.Hash)
return false, ERR_BLAMELIST_DONE
}
// If we're stealing commits from a previous task but the current
// commit is not in any task's blamelist, we must have scrolled past
// the beginning of the tasks. Just return.
if prev == nil && stealFrom != nil {
return false, nil
}
// If a previous task already included this commit, we have to make a decision.
if prev != nil {
// If this Task's Revision is already included in a different
// Task, then we're either bisecting or retrying a task. We'll
// "steal" commits from the previous Task's blamelist.
if len(commitsBuf) == 0 {
stealFrom = prev
// Another shortcut: If our Revision is the same as the
// Revision of the Task we're stealing commits from,
// ie. both tasks ran at the same commit, then this is a
// retry. Just steal all of the commits without doing
// any more work.
if stealFrom.Revision == revision.Hash {
commitsBuf = commitsBuf[:0]
for _, c := range stealFrom.Commits {
ptr := repo.Get(c)
if ptr == nil {
return false, fmt.Errorf("No such commit: %q", c)
}
commitsBuf = append(commitsBuf, ptr)
}
return false, ERR_BLAMELIST_DONE
}
}
if stealFrom == nil || prev.Id != stealFrom.Id {
// If we've hit a commit belonging to a different task,
// we're done.
return false, nil
}
}
// Add the commit.
commitsBuf = append(commitsBuf, commit)
// If the task is new at this commit, stop now.
rs := types.RepoState{
Repo: repoName,
Revision: commit.Hash,
}
if newTasks[rs][taskName] {
sklog.Infof("Task Spec %s was added in %s; stopping blamelist calculation.", taskName, commit.Hash)
return false, nil
}
// Recurse on the commit's parents.
return true, nil
}); err != nil && err != ERR_BLAMELIST_DONE {
return nil, nil, err
}
rv := make([]string, 0, len(commitsBuf))
for _, c := range commitsBuf {
rv = append(rv, c.Hash)
}
return rv, stealFrom, nil
}
// findTaskCandidatesForJobs returns the set of all taskCandidates needed by all
// currently-unfinished jobs.
func (s *TaskScheduler) findTaskCandidatesForJobs(ctx context.Context, unfinishedJobs []*types.Job) (map[types.TaskKey]*taskCandidate, error) {
defer metrics2.FuncTimer().Stop()
// Get the repo+commit+taskspecs for each job.
candidates := map[types.TaskKey]*taskCandidate{}
for _, j := range unfinishedJobs {
if !s.window.TestTime(j.Repo, j.Created) {
continue
}
for tsName := range j.Dependencies {
key := j.MakeTaskKey(tsName)
c, ok := candidates[key]
if !ok {
spec, err := s.taskCfgCache.GetTaskSpec(ctx, j.RepoState, tsName)
if err != nil {
return nil, err
}
c = &taskCandidate{
// NB: Because multiple Jobs may share a Task, the BuildbucketBuildId
// could be inherited from any matching Job. Therefore, this should be
// used for non-critical, informational purposes only.
BuildbucketBuildId: j.BuildbucketBuildId,
Jobs: jobSet(),
TaskKey: key,
TaskSpec: spec,
}
candidates[key] = c
}
c.Jobs[j] = struct{}{}
}
}
sklog.Infof("Found %d task candidates for %d unfinished jobs.", len(candidates), len(unfinishedJobs))
return candidates, nil
}
// filterTaskCandidates reduces the set of taskCandidates to the ones we might
// actually want to run and organizes them by repo and TaskSpec name.
func (s *TaskScheduler) filterTaskCandidates(preFilterCandidates map[types.TaskKey]*taskCandidate) (map[string]map[string][]*taskCandidate, error) {
defer metrics2.FuncTimer().Stop()
candidatesBySpec := map[string]map[string][]*taskCandidate{}
total := 0
for _, c := range preFilterCandidates {
// Reject blacklisted tasks.
if rule := s.bl.MatchRule(c.Name, c.Revision); rule != "" {
sklog.Warningf("Skipping blacklisted task candidate: %s @ %s due to rule %q", c.Name, c.Revision, rule)
continue
}
// Reject tasks for too-old commits, as long as they aren't try jobs.
if !c.IsTryJob() {
if in, err := s.window.TestCommitHash(c.Repo, c.Revision); err != nil {
return nil, err
} else if !in {
continue
}
}
// We shouldn't duplicate pending, in-progress,
// or successfully completed tasks.
prevTasks, err := s.tCache.GetTasksByKey(&c.TaskKey)
if err != nil {
return nil, err
}
var previous *types.Task
if len(prevTasks) > 0 {
// Just choose the last (most recently created) previous
// Task.
previous = prevTasks[len(prevTasks)-1]
}
if previous != nil {
if previous.Status == types.TASK_STATUS_PENDING || previous.Status == types.TASK_STATUS_RUNNING {
continue
}
if previous.Success() {
continue
}
// The attempt counts are only valid if the previous
// attempt we're looking at is the last attempt for this
// TaskSpec. Fortunately, TaskCache.GetTasksByKey sorts
// by creation time, and we've selected the last of the
// results.
maxAttempts := c.TaskSpec.MaxAttempts
if maxAttempts == 0 {
maxAttempts = specs.DEFAULT_TASK_SPEC_MAX_ATTEMPTS
}
// Special case for tasks created before arbitrary
// numbers of attempts were possible.
previousAttempt := previous.Attempt
if previousAttempt == 0 && previous.RetryOf != "" {
previousAttempt = 1
}
if previousAttempt >= maxAttempts-1 {
continue
}
c.Attempt = previousAttempt + 1
c.RetryOf = previous.Id
}
// Don't consider candidates whose dependencies are not met.
depsMet, idsToHashes, err := c.allDepsMet(s.tCache)
if err != nil {
return nil, err
}
if !depsMet {
continue
}
hashes := make([]string, 0, len(idsToHashes))
parentTaskIds := make([]string, 0, len(idsToHashes))
for id, hash := range idsToHashes {
hashes = append(hashes, hash)
parentTaskIds = append(parentTaskIds, id)
}
c.IsolatedHashes = hashes
sort.Strings(parentTaskIds)
c.ParentTaskIds = parentTaskIds
candidates, ok := candidatesBySpec[c.Repo]
if !ok {
candidates = map[string][]*taskCandidate{}
candidatesBySpec[c.Repo] = candidates
}
candidates[c.Name] = append(candidates[c.Name], c)
total++
}
sklog.Infof("Filtered to %d candidates in %d spec categories.", total, len(candidatesBySpec))
return candidatesBySpec, nil
}
// processTaskCandidate computes the remaining information about the task
// candidate, eg. blamelists and scoring.
func (s *TaskScheduler) processTaskCandidate(ctx context.Context, c *taskCandidate, now time.Time, cache *cacheWrapper, commitsBuf []*repograph.Commit) error {
if len(c.Jobs) == 0 {
// Log an error and return to allow scheduling other tasks.
sklog.Errorf("taskCandidate has no Jobs: %#v", c)
c.Score = 0
return nil
}
// Formula for priority is 1 - (1-<job1 priority>)(1-<job2 priority>)...(1-<jobN priority>).
inversePriorityProduct := 1.0
for j := range c.Jobs {
jobPriority := specs.DEFAULT_JOB_SPEC_PRIORITY
if j.Priority <= 1 && j.Priority > 0 {
jobPriority = j.Priority
}
inversePriorityProduct *= 1 - jobPriority
}
priority := 1 - inversePriorityProduct
// Use the earliest Job's Created time, which will maximize priority for older forced/try jobs.
var earliestJob *types.Job
for j := range c.Jobs {
if earliestJob == nil || earliestJob.Created.After(j.Created) {
earliestJob = j
}
}
if c.IsTryJob() {
c.Score = CANDIDATE_SCORE_TRY_JOB + now.Sub(earliestJob.Created).Hours()
// Proritize each subsequent attempt lower than the previous attempt.
for i := 0; i < c.Attempt; i++ {
c.Score *= CANDIDATE_SCORE_TRY_JOB_RETRY_MULTIPLIER
}
c.Score *= priority
return nil
}
// Compute blamelist.
repo, ok := s.repos[c.Repo]
if !ok {
return fmt.Errorf("No such repo: %s", c.Repo)
}
revision := repo.Get(c.Revision)
if revision == nil {
return fmt.Errorf("No such commit %s in %s.", c.Revision, c.Repo)
}
var stealingFrom *types.Task
var commits []string
if !s.window.TestTime(c.Repo, revision.Timestamp) {
// If the commit has scrolled out of our window, don't bother computing
// a blamelist.
commits = []string{}
} else {
var err error
commits, stealingFrom, err = ComputeBlamelist(ctx, cache, repo, c.Name, c.Repo, revision, commitsBuf, s.newTasks)
if err != nil {
return err
}
}
c.Commits = commits
if stealingFrom != nil {
c.StealingFromId = stealingFrom.Id
}
if len(c.Commits) > 0 && !util.In(c.Revision, c.Commits) {
sklog.Errorf("task candidate %s @ %s doesn't have its own revision in its blamelist: %v", c.Name, c.Revision, c.Commits)
}
if c.IsForceRun() {
c.Score = CANDIDATE_SCORE_FORCE_RUN + now.Sub(earliestJob.Created).Hours()
c.Score *= priority
return nil
}
// Score the candidate.
// The score for a candidate is based on the "testedness" increase
// provided by running the task.
stoleFromCommits := 0
if stealingFrom != nil {
// Treat retries as if they're new; don't use stealingFrom.Commits.
if c.RetryOf != "" {
if stealingFrom.Id != c.RetryOf && stealingFrom.ForcedJobId == "" {
sklog.Errorf("Candidate %v is a retry of %s but is stealing commits from %s!", c.TaskKey, c.RetryOf, stealingFrom.Id)
}
} else if stealingFrom.ForcedJobId == c.ForcedJobId {
stoleFromCommits = len(stealingFrom.Commits)
}
}
score := testednessIncrease(len(c.Commits), stoleFromCommits)
// Scale the score by other factors, eg. time decay.
decay, err := s.timeDecayForCommit(now, revision)
if err != nil {
return err
}
score *= decay
score *= priority
c.Score = score
return nil
}
// Process the task candidates.
func (s *TaskScheduler) processTaskCandidates(ctx context.Context, candidates map[string]map[string][]*taskCandidate, now time.Time) ([]*taskCandidate, error) {
defer metrics2.FuncTimer().Stop()
// Get newly-added task specs by repo state.
if err := s.updateAddedTaskSpecs(ctx); err != nil {
return nil, err
}
s.newTasksMtx.RLock()
defer s.newTasksMtx.RUnlock()
processed := make(chan *taskCandidate)
errs := make(chan error)
wg := sync.WaitGroup{}
for _, cs := range candidates {
for _, c := range cs {
wg.Add(1)
go func(candidates []*taskCandidate) {
defer wg.Done()
cache := newCacheWrapper(s.tCache)
commitsBuf := make([]*repograph.Commit, 0, MAX_BLAMELIST_COMMITS)
for {
// Find the best candidate.
idx := -1
var best *taskCandidate
for i, candidate := range candidates {
c := candidate.Copy()
if err := s.processTaskCandidate(ctx, c, now, cache, commitsBuf); err != nil {
errs <- err
return
}
if best == nil || c.Score > best.Score {
best = c
idx = i
}
}
if best == nil {
return
}
processed <- best
t := best.MakeTask()
t.Id = best.MakeId()
cache.insert(t)
if best.StealingFromId != "" {
stoleFrom, err := cache.GetTask(best.StealingFromId)
if err != nil {
errs <- err
return
}
stole := util.NewStringSet(best.Commits)
oldC := util.NewStringSet(stoleFrom.Commits)
newC := oldC.Complement(stole)
commits := make([]string, 0, len(newC))
for c := range newC {
commits = append(commits, c)
}
stoleFrom.Commits = commits
cache.insert(stoleFrom)
}
candidates = append(candidates[:idx], candidates[idx+1:]...)
}
}(c)
}
}
go func() {
wg.Wait()
close(processed)
close(errs)
}()
rvCandidates := []*taskCandidate{}
rvErrs := []error{}
for {
select {
case c, ok := <-processed:
if ok {
rvCandidates = append(rvCandidates, c)
} else {
processed = nil
}
case err, ok := <-errs:
if ok {
rvErrs = append(rvErrs, err)
} else {
errs = nil
}
}
if processed == nil && errs == nil {
break
}
}
if len(rvErrs) != 0 {
return nil, rvErrs[0]
}
sort.Sort(taskCandidateSlice(rvCandidates))
return rvCandidates, nil
}
// flatten all the dimensions in 'dims' into a single valued map.
func flatten(dims map[string]string) map[string]string {
keys := make([]string, 0, len(dims))
for key := range dims {
keys = append(keys, key)
}
sort.Strings(keys)
ret := make([]string, 0, 2*len(dims))
for _, key := range keys {
ret = append(ret, key, dims[key])
}
return map[string]string{"dimensions": strings.Join(ret, " ")}
}
// recordCandidateMetrics generates metrics for candidates by dimension sets.
func (s *TaskScheduler) recordCandidateMetrics(candidates map[string]map[string][]*taskCandidate) {
defer metrics2.FuncTimer().Stop()
// Generate counts. These maps are keyed by the MD5 hash of the
// candidate's TaskSpec's dimensions.
counts := map[string]int64{}
dimensions := map[string]map[string]string{}
for _, byRepo := range candidates {
for _, bySpec := range byRepo {
for _, c := range bySpec {
parseDims, err := swarming.ParseDimensions(c.TaskSpec.Dimensions)
if err != nil {
sklog.Errorf("Failed to parse dimensions: %s", err)
continue
}
dims := make(map[string]string, len(parseDims))
for k, v := range parseDims {
// Just take the first value for each dimension.
dims[k] = v[0]
}
k, err := util.MD5Params(dims)
if err != nil {
sklog.Errorf("Failed to create metrics key: %s", err)
continue
}
dimensions[k] = dims
counts[k]++
}
}
}
// Report the data.
s.candidateMetricsMtx.Lock()
defer s.candidateMetricsMtx.Unlock()
for k, count := range counts {
metric, ok := s.candidateMetrics[k]
if !ok {
metric = metrics2.GetInt64Metric(MEASUREMENT_TASK_CANDIDATE_COUNT, flatten(dimensions[k]))
s.candidateMetrics[k] = metric
}
metric.Update(count)
}
for k, metric := range s.candidateMetrics {
_, ok := counts[k]
if !ok {
metric.Update(0)
delete(s.candidateMetrics, k)
}
}
}
// regenerateTaskQueue obtains the set of all eligible task candidates, scores
// them, and prepares them to be triggered.
func (s *TaskScheduler) regenerateTaskQueue(ctx context.Context, now time.Time) ([]*taskCandidate, error) {
defer metrics2.FuncTimer().Stop()
// Find the unfinished Jobs.
unfinishedJobs, err := s.jCache.UnfinishedJobs()
if err != nil {
return nil, err
}
// Find TaskSpecs for all unfinished Jobs.
preFilterCandidates, err := s.findTaskCandidatesForJobs(ctx, unfinishedJobs)
if err != nil {
return nil, err
}
// Filter task candidates.
candidates, err := s.filterTaskCandidates(preFilterCandidates)
if err != nil {
return nil, err
}
// Record the number of task candidates per dimension set.
s.recordCandidateMetrics(candidates)
// Process the remaining task candidates.
queue, err := s.processTaskCandidates(ctx, candidates, now)
if err != nil {
return nil, err
}
return queue, nil
}
// getCandidatesToSchedule matches the list of free Swarming bots to task
// candidates in the queue and returns the candidates which should be run.
// Assumes that the tasks are sorted in decreasing order by score.
func getCandidatesToSchedule(bots []*swarming_api.SwarmingRpcsBotInfo, tasks []*taskCandidate) []*taskCandidate {
defer metrics2.FuncTimer().Stop()
// Create a bots-by-swarming-dimension mapping.
botsByDim := map[string]util.StringSet{}
for _, b := range bots {
for _, dim := range b.Dimensions {
for _, val := range dim.Value {
d := fmt.Sprintf("%s:%s", dim.Key, val)
if _, ok := botsByDim[d]; !ok {
botsByDim[d] = util.StringSet{}
}
botsByDim[d][b.BotId] = true
}
}
}
// Match bots to tasks.
// TODO(borenet): Some tasks require a more specialized bot. We should
// match so that less-specialized tasks don't "steal" more-specialized
// bots which they don't actually need.
rv := make([]*taskCandidate, 0, len(bots))
for _, c := range tasks {
// TODO(borenet): Make this threshold configurable.
if c.Score <= 0.0 {
sklog.Warningf("candidate %s @ %s has a score of %2f; skipping (%d commits).", c.Name, c.Revision, c.Score, len(c.Commits))
continue
}
// For each dimension of the task, find the set of bots which matches.
matches := util.StringSet{}
for i, d := range c.TaskSpec.Dimensions {
if i == 0 {
matches = matches.Union(botsByDim[d])
} else {
matches = matches.Intersect(botsByDim[d])
}
}
if len(matches) > 0 {
// We're going to run this task. Choose a bot. Sort the
// bots by ID so that the choice is deterministic.
choices := make([]string, 0, len(matches))
for botId := range matches {
choices = append(choices, botId)
}
sort.Strings(choices)
bot := choices[0]
// Remove the bot from consideration.
for dim, subset := range botsByDim {
delete(subset, bot)
if len(subset) == 0 {
delete(botsByDim, dim)
}
}
// Add the task to the scheduling list.
rv = append(rv, c)
// If we've exhausted the bot list, stop here.
if len(botsByDim) == 0 {
break
}
}
}
sort.Sort(taskCandidateSlice(rv))
return rv
}
// isolateTasks sets up the given RepoState and isolates the given
// taskCandidates.
func (s *TaskScheduler) isolateTasks(ctx context.Context, rs types.RepoState, candidates []*taskCandidate) error {
defer metrics2.FuncTimer().Stop()
// Create and check out a temporary repo.
return s.taskCfgCache.TempGitRepo(ctx, rs, true, func(c *git.TempCheckout) error {
// Isolate the tasks.
infraBotsDir := path.Join(c.Dir(), "infra", "bots")
baseDir := path.Dir(c.Dir())
tasks := make([]*isolate.Task, 0, len(candidates))
for _, c := range candidates {
tasks = append(tasks, c.MakeIsolateTask(infraBotsDir, baseDir))
}
hashes, err := s.isolate.IsolateTasks(ctx, tasks)
if err != nil {
return err
}
if len(hashes) != len(candidates) {
return fmt.Errorf("IsolateTasks returned incorrect number of hashes.")
}
for i, c := range candidates {
c.IsolatedInput = hashes[i]
}
return nil
})
}
// isolateCandidates uploads inputs for the taskCandidates to the Isolate
// server. Returns a channel of the successfully-isolated candidates which is
// closed after all candidates have been isolated or failed. Each failure is
// sent to errCh.
func (s *TaskScheduler) isolateCandidates(ctx context.Context, candidates []*taskCandidate, errCh chan<- error) <-chan *taskCandidate {
defer metrics2.FuncTimer().Stop()
// First, group by RepoState since we have to isolate the code at
// that state for each task.
byRepoState := map[types.RepoState][]*taskCandidate{}
for _, c := range candidates {
byRepoState[c.RepoState] = append(byRepoState[c.RepoState], c)
}
// Isolate the tasks by commit.
isolated := make(chan *taskCandidate)
var wg sync.WaitGroup
for rs, candidates := range byRepoState {
wg.Add(1)
go func(rs types.RepoState, candidates []*taskCandidate) {
defer wg.Done()
if err := s.isolateTasks(ctx, rs, candidates); err != nil {
names := make([]string, 0, len(candidates))
for _, c := range candidates {
names = append(names, fmt.Sprintf("%s@%s", c.Name, c.Revision))
}
errCh <- fmt.Errorf("Failed on %s: %s", strings.Join(names, ", "), err)
return
}
for _, c := range candidates {
isolated <- c
}
}(rs, candidates)
}
go func() {
wg.Wait()
close(isolated)
}()
return isolated
}
// triggerTasks triggers the given slice of tasks to run on Swarming and returns
// a channel of the successfully-triggered tasks which is closed after all tasks
// have been triggered or failed. Each failure is sent to errCh.
func (s *TaskScheduler) triggerTasks(isolated <-chan *taskCandidate, errCh chan<- error) <-chan *types.Task {
defer metrics2.FuncTimer().Stop()
triggered := make(chan *types.Task)
var wg sync.WaitGroup
for candidate := range isolated {
wg.Add(1)
go func(candidate *taskCandidate) {
defer wg.Done()
t := candidate.MakeTask()
if err := s.db.AssignId(t); err != nil {
errCh <- fmt.Errorf("Failed to trigger task: %s", err)
return
}
req, err := candidate.MakeTaskRequest(t.Id, s.isolate.ServerURL(), s.pubsubTopic)
if err != nil {
errCh <- fmt.Errorf("Failed to trigger task: %s", err)
return
}
s.pendingInsertMtx.Lock()
s.pendingInsert[t.Id] = true
s.pendingInsertMtx.Unlock()
var resp *swarming_api.SwarmingRpcsTaskRequestMetadata
if err := timeout.Run(func() error {
var err error
resp, err = s.swarming.TriggerTask(req)
return err
}, time.Minute); err != nil {
s.pendingInsertMtx.Lock()
delete(s.pendingInsert, t.Id)
s.pendingInsertMtx.Unlock()
errCh <- fmt.Errorf("Failed to trigger task: %s", err)
return
}
created, err := swarming.ParseTimestamp(resp.Request.CreatedTs)
if err != nil {
errCh <- fmt.Errorf("Failed to trigger task: %s", err)
return
}
t.Created = created
t.SwarmingTaskId = resp.TaskId
triggered <- t
}(candidate)
}
go func() {
wg.Wait()
close(triggered)
}()
return triggered
}
// scheduleTasks queries for free Swarming bots and triggers tasks according
// to relative priorities in the queue.
func (s *TaskScheduler) scheduleTasks(ctx context.Context, bots []*swarming_api.SwarmingRpcsBotInfo, queue []*taskCandidate) error {
defer metrics2.FuncTimer().Stop()
// Match free bots with tasks.
schedule := getCandidatesToSchedule(bots, queue)
// Setup the error channel.
errs := []error{}
errCh := make(chan error)
var errWg sync.WaitGroup
errWg.Add(1)
go func() {
defer errWg.Done()
for err := range errCh {
errs = append(errs, err)
}
}()
// Isolate the tasks by RepoState.
isolated := s.isolateCandidates(ctx, schedule, errCh)
// Trigger Swarming tasks.
triggered := s.triggerTasks(isolated, errCh)
// Collect the tasks we triggered.
numTriggered := 0
insert := map[string]map[string][]*types.Task{}
for t := range triggered {
byRepo, ok := insert[t.Repo]
if !ok {
byRepo = map[string][]*types.Task{}
insert[t.Repo] = byRepo
}
byRepo[t.Name] = append(byRepo[t.Name], t)
numTriggered++
}
close(errCh)
errWg.Wait()
if len(insert) > 0 {
// Insert the newly-triggered tasks into the DB.
if err := s.AddTasks(ctx, insert); err != nil {
errs = append(errs, fmt.Errorf("Triggered tasks but failed to insert into DB: %s", err))
} else {
// Remove the tasks from the pending map.
s.pendingInsertMtx.Lock()
for _, byRepo := range insert {
for _, byName := range byRepo {
for _, t := range byName {
delete(s.pendingInsert, t.Id)
}
}
}
s.pendingInsertMtx.Unlock()
// Organize the triggered task by TaskKey.
remove := make(map[types.TaskKey]*types.Task, numTriggered)
for _, byRepo := range insert {
for _, byName := range byRepo {
for _, t := range byName {
remove[t.TaskKey] = t
}
}
}
if len(remove) != numTriggered {
return fmt.Errorf("WHAAT")
}
// Remove the tasks from the queue.
newQueue := make([]*taskCandidate, 0, len(queue)-numTriggered)
for _, c := range queue {
if _, ok := remove[c.TaskKey]; !ok {
newQueue = append(newQueue, c)
}
}
// Note; if regenerateQueue and scheduleTasks are ever decoupled so that
// the queue is reused by multiple runs of scheduleTasks, we'll need to
// address the fact that some candidates may still have their
// StoleFromId pointing to candidates which have been triggered and
// removed from the queue. In that case, we should just need to write a
// loop which updates those candidates to use the IDs of the newly-
// inserted Tasks in the database rather than the candidate ID.
sklog.Infof("Triggered %d tasks on %d bots (%d in queue).", numTriggered, len(bots), len(queue))
queue = newQueue
}
} else {
sklog.Infof("Triggered no tasks (%d in queue, %d bots available)", len(queue), len(bots))
}
s.queueMtx.Lock()
defer s.queueMtx.Unlock()
s.queue = queue
s.lastScheduled = time.Now()
if len(errs) > 0 {
rvErr := "Got failures: "
for _, e := range errs {
rvErr += fmt.Sprintf("\n%s\n", e)
}
return fmt.Errorf(rvErr)
}
return nil
}
// recurseAllBranches runs the given func on every commit on all branches, with
// some Task Scheduler-specific exceptions.
func (s *TaskScheduler) RecurseAllBranches(ctx context.Context, fn func(string, *repograph.Graph, *repograph.Commit) (bool, error)) error {
for repoUrl, r := range s.repos {
blacklistBranches := BRANCH_BLACKLIST[repoUrl]
blacklistCommits := make(map[*repograph.Commit]string, len(blacklistBranches))
for _, b := range blacklistBranches {
c := r.Get(b)
if c != nil {
blacklistCommits[c] = b
}
}
if err := r.RecurseAllBranches(func(c *repograph.Commit) (bool, error) {
if blacklistBranch, ok := blacklistCommits[c]; ok {
sklog.Infof("Skipping blacklisted branch %q", blacklistBranch)
return false, nil
}
for head, blacklistBranch := range blacklistCommits {
isAncestor, err := r.Repo().IsAncestor(ctx, c.Hash, head.Hash)
if err != nil {
return false, err
} else if isAncestor {
sklog.Infof("Skipping blacklisted branch %q (--is-ancestor)", blacklistBranch)
return false, nil
}
}
if !s.window.TestCommit(repoUrl, c) {
return false, nil
}
return fn(repoUrl, r, c)
}); err != nil {
return err
}
}
return nil
}
// gatherNewJobs finds and inserts Jobs for all new commits.
func (s *TaskScheduler) gatherNewJobs(ctx context.Context) error {
defer metrics2.FuncTimer().Stop()
// Find all new Jobs for all new commits.
newJobs := []*types.Job{}
if err := s.RecurseAllBranches(ctx, func(repoUrl string, r *repograph.Graph, c *repograph.Commit) (bool, error) {
scheduled, err := s.jCache.ScheduledJobsForCommit(repoUrl, c.Hash)
if err != nil {
return false, err
}
if scheduled {
return false, nil
}
rs := types.RepoState{
Repo: repoUrl,
Revision: c.Hash,
}
cfg, err := s.taskCfgCache.ReadTasksCfg(ctx, rs)
if err != nil {
return false, err
}
for name, spec := range cfg.Jobs {
shouldRun := false
if !util.In(spec.Trigger, specs.PERIODIC_TRIGGERS) {
if spec.Trigger == specs.TRIGGER_ANY_BRANCH {
shouldRun = true
} else if spec.Trigger == specs.TRIGGER_MASTER_ONLY {
isAncestor, err := r.Repo().IsAncestor(ctx, c.Hash, "master")
if err != nil {
return false, err
} else if isAncestor {
shouldRun = true
}
}
}
if shouldRun {
j, err := s.taskCfgCache.MakeJob(ctx, rs, name)
if err != nil {
return false, err
}
newJobs = append(newJobs, j)
}
}
if c.Hash == "50537e46e4f0999df0a4707b227000cfa8c800ff" {
// Stop recursing here, since Jobs were added
// in this commit and previous commits won't be
// valid.
return false, nil
}
return true, nil
}); err != nil {
return err
}
if err := s.db.PutJobs(newJobs); err != nil {
return err
}
return s.jCache.Update()
}
// updateAddedTaskSpecs updates the mapping of RepoStates to the new task specs
// they added.
func (s *TaskScheduler) updateAddedTaskSpecs(ctx context.Context) error {
repoStates := []types.RepoState{}
if err := s.RecurseAllBranches(ctx, func(repoUrl string, r *repograph.Graph, c *repograph.Commit) (bool, error) {
repoStates = append(repoStates, types.RepoState{
Repo: repoUrl,
Revision: c.Hash,
})
return true, nil
}); err != nil {
return err
}
newTasks, err := s.taskCfgCache.GetAddedTaskSpecsForRepoStates(ctx, repoStates)
if err != nil {
return err
}
s.newTasksMtx.Lock()
defer s.newTasksMtx.Unlock()
s.newTasks = newTasks
return nil
}
// MainLoop runs a single end-to-end task scheduling loop.
func (s *TaskScheduler) MainLoop(ctx context.Context) error {
defer metrics2.FuncTimer().Stop()
sklog.Infof("Task Scheduler updating...")
var e1, e2 error
var wg1, wg2 sync.WaitGroup
var bots []*swarming_api.SwarmingRpcsBotInfo
wg1.Add(1)
go func() {
defer wg1.Done()
var err error
bots, err = getFreeSwarmingBots(s.swarming, s.busyBots, s.pools)
if err != nil {
e1 = err
return
}
}()
wg2.Add(1)
go func() {
defer wg2.Done()
if err := s.updateRepos(ctx); err != nil {
e2 = err
return
}
}()
now := time.Now()
// TODO(borenet): This is only needed for the perftest because it no
// longer has access to the TaskCache used by TaskScheduler. Since it
// pushes tasks into the DB between executions of MainLoop, we need to
// update the cache here so that we see those changes.
if err := s.tCache.Update(); err != nil {
return err
}
if err := s.jCache.Update(); err != nil {
return err
}
if err := s.updateUnfinishedJobs(); err != nil {
return err
}
if err := s.bl.Update(); err != nil {
return err
}
wg2.Wait()
if e2 != nil {
return e2
}
// Add Jobs for new commits.
if err := s.gatherNewJobs(ctx); err != nil {
return err
}
// Regenerate the queue.
sklog.Infof("Task Scheduler regenerating the queue...")
queue, err := s.regenerateTaskQueue(ctx, now)
if err != nil {
return err
}
wg1.Wait()
if e1 != nil {
return e1
}
sklog.Infof("Task Scheduler scheduling tasks...")
if err := s.scheduleTasks(ctx, bots, queue); err != nil {
return err
}
if err := s.taskCfgCache.Cleanup(time.Now().Sub(s.window.EarliestStart())); err != nil {
return fmt.Errorf("Failed to Cleanup TaskCfgCache: %s", err)
}
return nil
}
// updateRepos syncs the scheduler's repos.
func (s *TaskScheduler) updateRepos(ctx context.Context) error {
defer metrics2.FuncTimer().Stop()
for _, r := range s.repos {
if err := r.Update(ctx); err != nil {
return err
}
}
if err := s.window.Update(); err != nil {
return err
}
return nil
}
// QueueLen returns the length of the queue.
func (s *TaskScheduler) QueueLen() int {
s.queueMtx.RLock()
defer s.queueMtx.RUnlock()
return len(s.queue)
}
// timeDecay24Hr computes a linear time decay amount for the given duration,
// given the requested decay amount at 24 hours.
func timeDecay24Hr(decayAmt24Hr float64, elapsed time.Duration) float64 {
return math.Max(1.0-(1.0-decayAmt24Hr)*(float64(elapsed)/float64(24*time.Hour)), 0.0)
}
// timeDecayForCommit computes a multiplier for a task candidate score based
// on how long ago the given commit landed. This allows us to prioritize more
// recent commits.
func (s *TaskScheduler) timeDecayForCommit(now time.Time, commit *repograph.Commit) (float64, error) {
if s.timeDecayAmt24Hr == 1.0 {
// Shortcut for special case.
return 1.0, nil
}
rv := timeDecay24Hr(s.timeDecayAmt24Hr, now.Sub(commit.Timestamp))
// TODO(benjaminwagner): Change to an exponential decay to prevent
// zero/negative scores.
//if rv == 0.0 {
// sklog.Warningf("timeDecayForCommit is zero. Now: %s, Commit: %s ts %s, TimeDecay: %2f\nDetails: %v", now, commit.Hash, commit.Timestamp, s.timeDecayAmt24Hr, commit)
//}
return rv, nil
}
func (ts *TaskScheduler) GetBlacklist() *blacklist.Blacklist {
return ts.bl
}
// testedness computes the total "testedness" of a set of commits covered by a
// task whose blamelist included N commits. The "testedness" of a task spec at a
// given commit is defined as follows:
//
// -1.0 if no task has ever included this commit for this task spec.
// 1.0 if a task was run for this task spec AT this commit.
// 1.0 / N if a task for this task spec has included this commit, where N is
// the number of commits included in the task.
//
// This function gives the sum of the testedness for a blamelist of N commits.
func testedness(n int) float64 {
if n < 0 {
// This should never happen.
sklog.Errorf("Task score function got a blamelist with %d commits", n)
return -1.0
} else if n == 0 {
// Zero commits have zero testedness.
return 0.0
} else if n == 1 {
return 1.0
} else {
return 1.0 + float64(n-1)/float64(n)
}
}
// testednessIncrease computes the increase in "testedness" obtained by running
// a task with the given blamelist length which may have "stolen" commits from
// a previous task with a different blamelist length. To do so, we compute the
// "testedness" for every commit affected by the task, before and after the
// task would run. We subtract the "before" score from the "after" score to
// obtain the "testedness" increase at each commit, then sum them to find the
// total increase in "testedness" obtained by running the task.
func testednessIncrease(blamelistLength, stoleFromBlamelistLength int) float64 {
// Invalid inputs.
if blamelistLength <= 0 || stoleFromBlamelistLength < 0 {
return -1.0
}
if stoleFromBlamelistLength == 0 {
// This task covers previously-untested commits. Previous testedness
// is -1.0 for each commit in the blamelist.
beforeTestedness := float64(-blamelistLength)
afterTestedness := testedness(blamelistLength)
return afterTestedness - beforeTestedness
} else if blamelistLength == stoleFromBlamelistLength {
// This is a retry. It provides no testedness increase, so shortcut here
// rather than perform the math to obtain the same answer.
return 0.0
} else {
// This is a bisect/backfill.
beforeTestedness := testedness(stoleFromBlamelistLength)
afterTestedness := testedness(blamelistLength) + testedness(stoleFromBlamelistLength-blamelistLength)
return afterTestedness - beforeTestedness
}
}
// getFreeSwarmingBots returns a slice of free swarming bots.
func getFreeSwarmingBots(s swarming.ApiClient, busy *busyBots, pools []string) ([]*swarming_api.SwarmingRpcsBotInfo, error) {
defer metrics2.FuncTimer().Stop()
// Query for free Swarming bots and pending Swarming tasks in all pools.
var wg sync.WaitGroup
bots := []*swarming_api.SwarmingRpcsBotInfo{}
pending := []*swarming_api.SwarmingRpcsTaskResult{}
errs := []error{}
var mtx sync.Mutex
t := time.Time{}
for _, pool := range pools {
// Free bots.
wg.Add(1)
go func(pool string) {
defer wg.Done()
b, err := s.ListFreeBots(pool)
mtx.Lock()
defer mtx.Unlock()
if err != nil {
errs = append(errs, err)
} else {
bots = append(bots, b...)
}
}(pool)
// Pending tasks.
wg.Add(1)
go func(pool string) {
defer wg.Done()
t, err := s.ListTaskResults(t, t, []string{fmt.Sprintf("pool:%s", pool)}, "PENDING", false)
mtx.Lock()
defer mtx.Unlock()
if err != nil {
errs = append(errs, err)
} else {
pending = append(pending, t...)
}
}(pool)
}
wg.Wait()
if len(errs) > 0 {
return nil, fmt.Errorf("Got errors loading bots and tasks from Swarming: %v", errs)
}
rv := make([]*swarming_api.SwarmingRpcsBotInfo, 0, len(bots))
for _, bot := range bots {
if bot.IsDead {
continue
}
if bot.Quarantined {
continue
}
if bot.TaskId != "" {
continue
}
rv = append(rv, bot)
}
busy.RefreshTasks(pending)
return busy.Filter(rv), nil
}
// updateUnfinishedTasks queries Swarming for all unfinished tasks and updates
// their status in the DB.
func (s *TaskScheduler) updateUnfinishedTasks() error {
defer metrics2.FuncTimer().Stop()
// Update the TaskCache.
if err := s.tCache.Update(); err != nil {
return err
}
tasks, err := s.tCache.UnfinishedTasks()
if err != nil {
return err
}
sort.Sort(types.TaskSlice(tasks))
// Query Swarming for all unfinished tasks.
// TODO(borenet): This would be faster if Swarming had a
// get-multiple-tasks-by-ID endpoint.
sklog.Infof("Querying Swarming for %d unfinished tasks.", len(tasks))
var wg sync.WaitGroup
errs := make([]error, len(tasks))
for i, t := range tasks {
wg.Add(1)
go func(idx int, t *types.Task) {
defer wg.Done()
swarmTask, err := s.swarming.GetTask(t.SwarmingTaskId, false)
if err != nil {
errs[idx] = fmt.Errorf("Failed to update unfinished task; failed to get updated task from swarming: %s", err)
return
}
if err := db.UpdateDBFromSwarmingTask(s.db, swarmTask); err != nil {
errs[idx] = fmt.Errorf("Failed to update unfinished task: %s", err)
return
}
}(i, t)
}
wg.Wait()
for _, err := range errs {
if err != nil {
return err
}
}
return s.tCache.Update()
}
// jobFinished marks the Job as finished.
func (s *TaskScheduler) jobFinished(j *types.Job) error {
if !j.Done() {
return fmt.Errorf("jobFinished called on Job with status %q", j.Status)
}
j.Finished = time.Now()
return nil
}
// updateUnfinishedJobs updates all not-yet-finished Jobs to determine if their
// state has changed.
func (s *TaskScheduler) updateUnfinishedJobs() error {
defer metrics2.FuncTimer().Stop()
jobs, err := s.jCache.UnfinishedJobs()
if err != nil {
return err
}
modifiedJobs := make([]*types.Job, 0, len(jobs))
modifiedTasks := make(map[string]*types.Task, len(jobs))
for _, j := range jobs {
tasks, err := s.getTasksForJob(j)
if err != nil {
return err
}
summaries := make(map[string][]*types.TaskSummary, len(tasks))
for k, v := range tasks {
cpy := make([]*types.TaskSummary, 0, len(v))
for _, t := range v {
if existing := modifiedTasks[t.Id]; existing != nil {
t = existing
}
cpy = append(cpy, t.MakeTaskSummary())
// The Jobs list is always sorted.
oldLen := len(t.Jobs)
t.Jobs = util.InsertStringSorted(t.Jobs, j.Id)
if len(t.Jobs) > oldLen {
modifiedTasks[t.Id] = t
}
}
summaries[k] = cpy
}
if !reflect.DeepEqual(summaries, j.Tasks) {
j.Tasks = summaries
j.Status = j.DeriveStatus()
if j.Done() {
if err := s.jobFinished(j); err != nil {
return err
}
}
modifiedJobs = append(modifiedJobs, j)
}
}
if len(modifiedTasks) > 0 {
tasks := make([]*types.Task, 0, len(modifiedTasks))
for _, t := range modifiedTasks {
tasks = append(tasks, t)
}
if err := s.db.PutTasks(tasks); err != nil {
return err
} else if err := s.tCache.Update(); err != nil {
return err
}
}
if len(modifiedJobs) > 0 {
if err := s.db.PutJobs(modifiedJobs); err != nil {
return err
} else if err := s.jCache.Update(); err != nil {
return err
}
}
return nil
}
// getTasksForJob finds all Tasks for the given Job. It returns the Tasks
// in a map keyed by name.
func (s *TaskScheduler) getTasksForJob(j *types.Job) (map[string][]*types.Task, error) {
tasks := map[string][]*types.Task{}
for d := range j.Dependencies {
key := j.MakeTaskKey(d)
gotTasks, err := s.tCache.GetTasksByKey(&key)
if err != nil {
return nil, err
}
tasks[d] = gotTasks
}
return tasks, nil
}
// GetJob returns the given Job.
func (s *TaskScheduler) GetJob(id string) (*types.Job, error) {
return s.jCache.GetJobMaybeExpired(id)
}
// GetTask returns the given Task.
func (s *TaskScheduler) GetTask(id string) (*types.Task, error) {
return s.tCache.GetTaskMaybeExpired(id)
}
// addTasksSingleTaskSpec computes the blamelist for each task in tasks, all of
// which must have the same Repo and Name fields, and inserts/updates them in
// the TaskDB. Also adjusts blamelists of existing tasks.
func (s *TaskScheduler) addTasksSingleTaskSpec(ctx context.Context, tasks []*types.Task) error {
sort.Sort(types.TaskSlice(tasks))
cache := newCacheWrapper(s.tCache)
repoName := tasks[0].Repo
taskName := tasks[0].Name
repo, ok := s.repos[repoName]
if !ok {
return fmt.Errorf("No such repo: %s", repoName)
}
commitsBuf := make([]*repograph.Commit, 0, MAX_BLAMELIST_COMMITS)
updatedTasks := map[string]*types.Task{}
for _, task := range tasks {
if task.Repo != repoName || task.Name != taskName {
return fmt.Errorf("Mismatched Repo or Name: %v", tasks)
}
if task.Id == "" {
if err := s.db.AssignId(task); err != nil {
return err
}
}
if task.IsTryJob() {
updatedTasks[task.Id] = task
continue
}
// Compute blamelist.
revision := repo.Get(task.Revision)
if revision == nil {
return fmt.Errorf("No such commit %s in %s.", task.Revision, task.Repo)
}
if !s.window.TestTime(task.Repo, revision.Timestamp) {
return fmt.Errorf("Can not add task %s with revision %s (at %s) before window start.", task.Id, task.Revision, revision.Timestamp)
}
commits, stealingFrom, err := ComputeBlamelist(ctx, cache, repo, task.Name, task.Repo, revision, commitsBuf, s.newTasks)
if err != nil {
return err
}
task.Commits = commits
if len(task.Commits) > 0 && !util.In(task.Revision, task.Commits) {
sklog.Errorf("task %s (%s @ %s) doesn't have its own revision in its blamelist: %v", task.Id, task.Name, task.Revision, task.Commits)
}
updatedTasks[task.Id] = task
cache.insert(task)
if stealingFrom != nil && stealingFrom.Id != task.Id {
stole := util.NewStringSet(commits)
oldC := util.NewStringSet(stealingFrom.Commits)
newC := oldC.Complement(stole)
if len(newC) == 0 {
stealingFrom.Commits = nil
} else {
newCommits := make([]string, 0, len(newC))
for c := range newC {
newCommits = append(newCommits, c)
}
stealingFrom.Commits = newCommits
}
updatedTasks[stealingFrom.Id] = stealingFrom
cache.insert(stealingFrom)
}
}
putTasks := make([]*types.Task, 0, len(updatedTasks))
for _, task := range updatedTasks {
putTasks = append(putTasks, task)
}
return s.db.PutTasks(putTasks)
}
// AddTasks inserts the given tasks into the TaskDB, updating blamelists. The
// provided Tasks should have all fields initialized except for Commits, which
// will be overwritten, and optionally Id, which will be assigned if necessary.
// AddTasks updates existing Tasks' blamelists, if needed. The provided map
// groups Tasks by repo and TaskSpec name. May return error on partial success.
// May modify Commits and Id of argument tasks on error.
func (s *TaskScheduler) AddTasks(ctx context.Context, taskMap map[string]map[string][]*types.Task) error {
type queueItem struct {
Repo string
Name string
}
queue := map[queueItem]bool{}
for repo, byName := range taskMap {
for name, tasks := range byName {
if len(tasks) == 0 {
continue
}
queue[queueItem{
Repo: repo,
Name: name,
}] = true
}
}
s.newTasksMtx.RLock()
defer s.newTasksMtx.RUnlock()
for i := 0; i < db.NUM_RETRIES; i++ {
if len(queue) == 0 {
return nil
}
if err := s.tCache.Update(); err != nil {
return err
}
done := make(chan queueItem)
errs := make(chan error, len(queue))
wg := sync.WaitGroup{}
for item := range queue {
wg.Add(1)
go func(item queueItem, tasks []*types.Task) {
defer wg.Done()
if err := s.addTasksSingleTaskSpec(ctx, tasks); err != nil {
errs <- err
} else {
done <- item
}
}(item, taskMap[item.Repo][item.Name])
}
go func() {
wg.Wait()
close(done)
close(errs)
}()
for item := range done {
delete(queue, item)
}
rvErrs := []error{}
for err := range errs {
if !db.IsConcurrentUpdate(err) {
sklog.Error(err)
rvErrs = append(rvErrs, err)
}
}
if len(rvErrs) != 0 {
return rvErrs[0]
}
}
if len(queue) > 0 {
return fmt.Errorf("AddTasks: %d consecutive ErrConcurrentUpdate", db.NUM_RETRIES)
}
return nil
}
// ValidateAndAddTask inserts the given task into the TaskDB, updating
// blamelists. Checks that the task has a valid repo, revision, name, etc. The
// task should have all fields initialized except for Commits and Id, which must
// be empty. Updates existing Tasks' blamelists, if needed. May modify Commits
// and Id on error.
func (s *TaskScheduler) ValidateAndAddTask(ctx context.Context, task *types.Task) error {
if task.Id != "" {
return fmt.Errorf("Can not specify Id when adding task. Got: %q", task.Id)
}
if err := task.Validate(); err != nil {
return err
}
if !task.Fake() {
return fmt.Errorf("Only fake tasks supported currently.")
}
// Check RepoState and TaskSpec.
taskCfg, err := s.taskCfgCache.ReadTasksCfg(ctx, task.RepoState)
if err != nil {
return err
}
_, taskSpecExists := taskCfg.Tasks[task.Name]
if taskSpecExists {
return fmt.Errorf("Can not add a fake task for a real task spec.")
}
if util.TimeIsZero(task.Created) {
task.Created = time.Now().UTC()
}
if len(task.Commits) > 0 {
sklog.Warningf("Ignoring Commits in ValidateAndAddTask. %v", task)
}
task.Commits = nil
return s.AddTasks(ctx, map[string]map[string][]*types.Task{
task.Repo: {
task.Name: {task},
},
})
}
// ValidateAndUpdateTask modifies the given task in the TaskDB. Ensures the
// task's blamelist, repo, revision, etc. do not change. The task should have
// all fields initialized.
func (s *TaskScheduler) ValidateAndUpdateTask(task *types.Task) error {
return validateAndUpdateTask(s.db, task)
}
// validateAndUpdateTask implements ValidateAndUpdateTask. Function instead of
// method for easier testing.
func validateAndUpdateTask(d db.TaskDB, task *types.Task) error {
if task.Id == "" {
return fmt.Errorf("Must specify Id when updating task.")
}
if err := task.Validate(); err != nil {
return err
}
if !task.Fake() {
return fmt.Errorf("Only fake tasks supported currently.")
}
old, err := d.GetTaskById(task.Id)
if err != nil {
return err
} else if old == nil {
return fmt.Errorf("No such task %q.", task.Id)
}
if !old.Fake() {
return fmt.Errorf("Can not overwrite real task with fake task.")
}
if !old.DbModified.Equal(task.DbModified) {
return db.ErrConcurrentUpdate
}
if !old.Created.Equal(task.Created) {
return fmt.Errorf("Illegal update: Created time changed.")
}
if old.TaskKey != task.TaskKey {
return fmt.Errorf("Illegal update: TaskKey changed.")
}
if !util.SSliceEqual(old.Commits, util.CopyStringSlice(task.Commits)) {
return fmt.Errorf("Illegal update: Commits changed.")
}
return d.PutTask(task)
}
// HandleSwarmingPubSub loads the given Swarming task ID from Swarming and
// updates the associated types.Task in the database. Returns a bool indicating
// whether the pubsub message should be acknowledged.
func (s *TaskScheduler) HandleSwarmingPubSub(msg *swarming.PubSubTaskMessage) bool {
// First, make sure we have the task in our DB.
if msg.UserData != "" {
// We use ID of the task in our DB for the UserData field.
t, err := s.db.GetTaskById(msg.UserData)
if err != nil {
sklog.Errorf("Swarming Pub/Sub: Failed to retrieve task %q by ID: %s", msg.SwarmingTaskId, msg.UserData)
return true
} else if t == nil {
isPending := false
func() {
s.pendingInsertMtx.RLock()
defer s.pendingInsertMtx.RUnlock()
if s.pendingInsert[msg.UserData] {
isPending = true
}
}()
if isPending {
sklog.Debugf("Received pub/sub message for task which hasn't yet been inserted into the db: %s (%s); not ack'ing message; will try again later.", msg.SwarmingTaskId, msg.UserData)
return false
} else {
sklog.Errorf("Failed to update task %q from pub/sub: no such task ID: %q", msg.SwarmingTaskId, msg.UserData)
return true
}
}
}
// Obtain the Swarming task data.
res, err := s.swarming.GetTask(msg.SwarmingTaskId, false)
if err != nil {
sklog.Errorf("pubsub: Failed to retrieve task from Swarming: %s", err)
return true
}
// Skip unfinished tasks.
if res.CompletedTs == "" {
return true
}
// Update the task in the DB.
if err := db.UpdateDBFromSwarmingTask(s.db, res); err != nil {
// TODO(borenet): Some of these cases should never be hit, after all tasks
// start supplying the ID in msg.UserData. We should be able to remove the logic.
if err == db.ErrNotFound {
id, err := swarming.GetTagValue(res, types.SWARMING_TAG_ID)
if err != nil {
id = "<MISSING ID TAG>"
}
created, err := swarming.ParseTimestamp(res.CreatedTs)
if err != nil {
sklog.Errorf("Failed to parse timestamp: %s; %s", res.CreatedTs, err)
return true
}
if time.Now().Sub(created) < 2*time.Minute {
sklog.Infof("Failed to update task %q: No such task ID: %q. Less than two minutes old; try again later.", msg.SwarmingTaskId, id)
return false
}
sklog.Errorf("Failed to update task %q: No such task ID: %q", msg.SwarmingTaskId, id)
return true
} else if err == db.ErrUnknownId {
expectedSwarmingTaskId := "<unknown>"
id, err := swarming.GetTagValue(res, types.SWARMING_TAG_ID)
if err != nil {
id = "<MISSING ID TAG>"
} else {
t, err := s.db.GetTaskById(id)
if err != nil {
sklog.Errorf("Failed to update task %q; mismatched ID and failed to retrieve task from DB: %s", msg.SwarmingTaskId, err)
return true
} else {
expectedSwarmingTaskId = t.SwarmingTaskId
}
}
sklog.Errorf("Failed to update task %q: Task %s has a different Swarming task ID associated with it: %s", msg.SwarmingTaskId, id, expectedSwarmingTaskId)
return true
} else {
sklog.Errorf("Failed to update task %q: %s", msg.SwarmingTaskId, err)
return true
}
}
return true
}