// blob: b9562a1b44260eacccaea7e30658bac8d5b5f44c [file] [log] [blame]
package scheduling
import (
swarming_api ""
ftestutils ""
cache_mocks ""
tcc_mocks ""
tcc_testutils ""
swarming_task_execution ""
swarming_testutils ""
window_mocks ""
// scoreDelta is the absolute tolerance used with require.InDelta when
// comparing floating-point candidate scores and time-decay values.
const scoreDelta = 0.000001

// cdPoolName is passed to NewTaskScheduler by setup (presumably as the name of
// the CD Swarming pool — confirm against NewTaskScheduler).
const cdPoolName = "cd-pool"
var (
// Swarming task dimensions for an Android "grouper" device in the "Skia" pool.
androidTaskDims = map[string]string{
"pool": "Skia",
"os": "Android",
"device_type": "grouper",
// Swarming task dimensions for an Ubuntu machine in the "Skia" pool.
linuxTaskDims = map[string]string{
"os": "Ubuntu",
"pool": "Skia",
// The same dimensions as androidTaskDims, in multi-valued
// (map[string][]string) form; presumably as reported by a bot, per the name.
androidBotDims = map[string][]string{
"pool": {"Skia"},
"os": {"Android"},
"device_type": {"grouper"},
// The same dimensions as linuxTaskDims, in multi-valued form.
linuxBotDims = map[string][]string{
"os": {"Ubuntu"},
"pool": {"Skia"},
// The following are commits and RepoStates used in tests. The commit
// hashes were chosen to be equal to those generated by MemGit, but the
// contents are completely arbitrary.
//
// lc1 is the first commit (Index 0, no Parents set) on the main branch.
lc1 = &vcsinfo.LongCommit{
ShortCommit: &vcsinfo.ShortCommit{
Hash: mem_git.Commit0,
Author: "",
Subject: "First Commit",
Body: "My first commit",
Timestamp: time.Unix(1571926390, 0),
Index: 0,
Branches: map[string]bool{
git.MainBranch: true,
// lc2 is the second commit (Index 1), whose only parent is lc1; also on the
// main branch.
lc2 = &vcsinfo.LongCommit{
ShortCommit: &vcsinfo.ShortCommit{
Hash: mem_git.Commit1,
Author: "",
Subject: "Second Commit",
Body: "My second commit",
Parents: []string{lc1.Hash},
Timestamp: time.Unix(1571926450, 0),
Index: 1,
Branches: map[string]bool{
git.MainBranch: true,
// ic1 and ic2 are IndexCommit views (hash, index, timestamp) of lc1 and lc2.
ic1 = &vcsinfo.IndexCommit{
Hash: lc1.Hash,
Index: lc1.Index,
Timestamp: lc1.Timestamp,
ic2 = &vcsinfo.IndexCommit{
Hash: lc2.Hash,
Index: lc2.Index,
Timestamp: lc2.Timestamp,
// rs1 and rs2 are RepoStates at lc1 and lc2 in the same "fake.git" repo; both
// share one Repo, so candidates at either revision are grouped under it.
rs1 = types.RepoState{
Repo: "fake.git",
Revision: lc1.Hash,
rs2 = types.RepoState{
Repo: "fake.git",
Revision: lc2.Hash,
func makeTask(ctx context.Context, name, repo, revision string) *types.Task {
return &types.Task{
Commits: []string{revision},
Created: now.Now(ctx),
TaskKey: types.TaskKey{
RepoState: types.RepoState{
Repo: repo,
Revision: revision,
Name: name,
SwarmingTaskId: "swarmid",
// makeSwarmingRpcsTaskRequestMetadata builds Swarming API request metadata that
// mirrors task. The task's timestamps, bot ID, Swarming task ID and CAS output
// are copied into the TaskResult, and dims become the request's dimensions.
// Tags are derived from the task's ID, forced job ID, name, repo, revision and
// parent task IDs. An unrecognized task status fails the test.
func makeSwarmingRpcsTaskRequestMetadata(t *testing.T, task *types.Task, dims map[string]string) *swarming_api.SwarmingRpcsTaskRequestMetadata {
// tag formats a Swarming tag as "key:value".
tag := func(k, v string) string {
return fmt.Sprintf("%s:%s", k, v)
// ts formats a time with swarming.TIMESTAMP_FORMAT, or returns "" for a zero
// time.
ts := func(t time.Time) string {
if util.TimeIsZero(t) {
return ""
return t.Format(swarming.TIMESTAMP_FORMAT)
// Defaults describe a pending task; the switch below overrides them.
abandoned := ""
state := swarming.TASK_STATE_PENDING
failed := false
// NOTE(review): the case labels of this switch are missing from this copy of
// the file. The visible arms set, in order: BOT_DIED plus an abandoned
// timestamp; RUNNING; COMPLETED with failure; COMPLETED; a no-op; and a
// fallback that fails the test. Confirm which types.TaskStatus value maps to
// each arm.
switch task.Status {
state = swarming.TASK_STATE_BOT_DIED
abandoned = ts(task.Finished)
state = swarming.TASK_STATE_RUNNING
state = swarming.TASK_STATE_COMPLETED
failed = true
state = swarming.TASK_STATE_COMPLETED
// noop
require.FailNow(t, "Unknown task status: %s", task.Status)
// The same tag list is attached to both the request and the result.
tags := []string{
tag(types.SWARMING_TAG_ID, task.Id),
tag(types.SWARMING_TAG_FORCED_JOB_ID, task.ForcedJobId),
tag(types.SWARMING_TAG_NAME, task.Name),
tag(types.SWARMING_TAG_REPO, task.Repo),
tag(types.SWARMING_TAG_REVISION, task.Revision),
for _, p := range task.ParentTaskIds {
tags = append(tags, tag(types.SWARMING_TAG_PARENT_TASK_ID, p))
// dims is a map, so the order of the resulting dimension pairs is unspecified.
dimensions := make([]*swarming_api.SwarmingRpcsStringPair, 0, len(dims))
for k, v := range dims {
dimensions = append(dimensions, &swarming_api.SwarmingRpcsStringPair{
Key: k,
Value: v,
// Only tasks with IsolatedOutput get a CAS output reference, built against the
// "fake-cas-instance" CAS instance.
var casOutput *swarming_api.SwarmingRpcsCASReference
if task.IsolatedOutput != "" {
var err error
casOutput, err = swarming.MakeCASReference(task.IsolatedOutput, "fake-cas-instance")
require.NoError(t, err)
return &swarming_api.SwarmingRpcsTaskRequestMetadata{
Request: &swarming_api.SwarmingRpcsTaskRequest{
CreatedTs: ts(task.Created),
TaskSlices: []*swarming_api.SwarmingRpcsTaskSlice{
Properties: &swarming_api.SwarmingRpcsTaskProperties{
Dimensions: dimensions,
Tags: tags,
TaskId: task.SwarmingTaskId,
TaskResult: &swarming_api.SwarmingRpcsTaskResult{
AbandonedTs: abandoned,
BotId: task.SwarmingBotId,
CreatedTs: ts(task.Created),
CompletedTs: ts(task.Finished),
Failure: failed,
CasOutputRoot: casOutput,
StartedTs: ts(task.Started),
State: state,
Tags: tags,
TaskId: task.SwarmingTaskId,
// Insert the given data into the caches.
func fillCaches(t sktest.TestingT, ctx context.Context, taskCfgCache task_cfg_cache.TaskCfgCache, rs types.RepoState, cfg *specs.TasksCfg) {
require.NoError(t, taskCfgCache.Set(ctx, rs, cfg, nil))
// Add jobs for the given RepoStates. Assumes that the RepoStates have already
// been added to the caches.
func insertJobs(t sktest.TestingT, ctx context.Context, s *TaskScheduler, rss ...types.RepoState) {
jobs := []*types.Job{}
for _, rs := range rss {
cfg, cachedErr, err := s.taskCfgCache.Get(ctx, rs)
require.NoError(t, err)
require.NoError(t, cachedErr)
for name := range cfg.Jobs {
j, err := task_cfg_cache.MakeJob(ctx, s.taskCfgCache, rs, name)
require.NoError(t, err)
jobs = append(jobs, j)
require.NoError(t, s.putJobsInChunks(ctx, jobs))
// setup is the common setup for TaskScheduler tests. It returns, in order: a
// cancelable context, the MemGit used to build the test repo, the in-memory
// DB, the test Swarming client, the scheduler, the URL mock whose client the
// scheduler uses for HTTP, the mock CAS, and a cleanup func.
//
// The repo holds two commits matching lc1 and lc2. The TaskCfgCache holds
// tcc_testutils.TasksCfg1 at rs1 and TasksCfg2 at rs2. The jobs defined by
// those configs are inserted into the DB before returning.
func setup(t *testing.T) (context.Context, *mem_git.MemGit, *memory.InMemoryDB, *swarming_testutils.TestClient, *TaskScheduler, *mockhttpclient.URLMock, *mocks.CAS, func()) {
ctx, cancel := context.WithCancel(context.Background())
tmp, err := ioutil.TempDir("", "")
require.NoError(t, err)
d := memory.NewInMemoryDB()
swarmingClient := swarming_testutils.NewTestClient()
urlMock := mockhttpclient.NewURLMock()
mg, repo := newMemRepo(t)
hashes := mg.CommitN(2)
// Sanity check: the MemGit hashes must match the fixed lc1/lc2 fixtures.
// hashes[1] is lc1 and hashes[0] is lc2, so CommitN presumably returns the
// newest commit first — confirm.
require.Equal(t, lc1.Hash, hashes[1])
require.Equal(t, lc2.Hash, hashes[0])
repos := repograph.Map{
rs1.Repo: repo,
btProject, btInstance, btCleanup := tcc_testutils.SetupBigTable(t)
taskCfgCache, err := task_cfg_cache.NewTaskCfgCache(ctx, repos, btProject, btInstance, nil)
require.NoError(t, err)
// Cache the RepoStates. This is normally done by the JobCreator.
fillCaches(t, ctx, taskCfgCache, rs1, tcc_testutils.TasksCfg1)
fillCaches(t, ctx, taskCfgCache, rs2, tcc_testutils.TasksCfg2)
cas := &mocks.CAS{}
// Go ahead and mock the single-input merge calls: merging a single digest
// returns that same digest. Any other Merge call is not mocked here.
cas.On("Merge", testutils.AnyContext, []string{tcc_testutils.CompileCASDigest}).Return(tcc_testutils.CompileCASDigest, nil)
cas.On("Merge", testutils.AnyContext, []string{tcc_testutils.TestCASDigest}).Return(tcc_testutils.TestCASDigest, nil)
cas.On("Merge", testutils.AnyContext, []string{tcc_testutils.PerfCASDigest}).Return(tcc_testutils.PerfCASDigest, nil)
// One Swarming-backed executor serves both the explicit Swarming executor key
// and the "use default" key.
taskExec := swarming_task_execution.NewSwarmingTaskExecutor(swarmingClient, "fake-cas-instance", "")
taskExecs := map[string]types.TaskExecutor{
types.TaskExecutor_Swarming: taskExec,
types.TaskExecutor_UseDefault: taskExec,
// NOTE(review): the time.Duration(math.MaxInt64) argument presumably makes a
// time window/period effectively unbounded — confirm against NewTaskScheduler.
// Diagnostics are written to an in-memory GCS client named "diag_unit_tests".
s, err := NewTaskScheduler(ctx, d, nil, time.Duration(math.MaxInt64), 0, repos, cas, "fake-cas-instance", taskExecs, urlMock.Client(), 1.0, swarming.POOLS_PUBLIC, cdPoolName, "", taskCfgCache, nil, mem_gcsclient.New("diag_unit_tests"), btInstance, false)
require.NoError(t, err)
// Insert jobs. This is normally done by the JobCreator.
insertJobs(t, ctx, s, rs1, rs2)
// The cleanup func closes the scheduler and removes the temp dir.
// NOTE(review): cancel and btCleanup are declared above but not referenced in
// the visible lines. Go rejects unused locals, so the cleanup func presumably
// also calls them — confirm.
return ctx, mg, d, swarmingClient, s, urlMock, cas, func() {
testutils.AssertCloses(t, s)
testutils.RemoveAll(t, tmp)
// runMainLoop calls s.MainLoop, asserts there was no error, and waits for
// background goroutines to finish.
// NOTE(review): only the MainLoop call is visible in this copy of the file. The
// wait for background goroutines described above is not shown here — confirm
// it is still performed.
func runMainLoop(t *testing.T, s *TaskScheduler, ctx context.Context) {
require.NoError(t, s.MainLoop(ctx))
func lastDiagnostics(t *testing.T, s *TaskScheduler) taskSchedulerMainLoopDiagnostics {
ctx := context.Background()
lastname := ""
require.NoError(t, s.diagClient.AllFilesInDirectory(ctx, path.Join(s.diagInstance, GCS_MAIN_LOOP_DIAGNOSTICS_DIR), func(item *storage.ObjectAttrs) error {
if lastname == "" || item.Name > lastname {
lastname = item.Name
return nil
require.NotEqual(t, lastname, "")
reader, err := s.diagClient.FileReader(ctx, lastname)
require.NoError(t, err)
defer testutils.AssertCloses(t, reader)
rv := taskSchedulerMainLoopDiagnostics{}
require.NoError(t, json.NewDecoder(reader).Decode(&rv))
return rv
// TestFindTaskCandidatesForJobs verifies that findTaskCandidatesForJobs
// returns one TaskCandidate per (RepoState, task name) needed by the given
// jobs, including dependencies. Jobs needing the same task share a single
// candidate, and jobs at unknown revisions produce no candidates.
func TestFindTaskCandidatesForJobs(t *testing.T) {
ctx, _, _, _, s, _, _, cleanup := setup(t)
defer cleanup()
// test runs findTaskCandidatesForJobs on jobs and requires a deep-equal match
// with expect.
test := func(jobs []*types.Job, expect map[types.TaskKey]*TaskCandidate) {
actual, err := s.findTaskCandidatesForJobs(ctx, jobs)
require.NoError(t, err)
assertdeep.Equal(t, expect, actual)
// The expected TaskSpecs are copied from the configs cached by setup.
cfg1, cachedErr, err := s.taskCfgCache.Get(ctx, rs1)
require.NoError(t, err)
require.NoError(t, cachedErr)
cfg2, cachedErr, err := s.taskCfgCache.Get(ctx, rs2)
require.NoError(t, err)
require.NoError(t, cachedErr)
// Run on an empty job list, ensure empty list returned.
test([]*types.Job{}, map[types.TaskKey]*TaskCandidate{})
currentTime := now.Now(ctx).UTC()
// Run for one job, ensure that we get the right set of task specs
// returned (ie. all dependencies and their dependencies).
j1 := &types.Job{
Created: currentTime,
Id: "job1id",
Name: tcc_testutils.TestTaskName,
Dependencies: map[string][]string{tcc_testutils.TestTaskName: {tcc_testutils.BuildTaskName}, tcc_testutils.BuildTaskName: {}},
Priority: 0.5,
RepoState: rs1.Copy(),
// Expected for j1: the Build task at rs1 and the Test task that depends on it.
tc1 := &TaskCandidate{
CasDigests: []string{tcc_testutils.CompileCASDigest},
Jobs: []*types.Job{j1},
TaskKey: types.TaskKey{
RepoState: rs1.Copy(),
Name: tcc_testutils.BuildTaskName,
TaskSpec: cfg1.Tasks[tcc_testutils.BuildTaskName].Copy(),
tc2 := &TaskCandidate{
CasDigests: []string{tcc_testutils.TestCASDigest},
Jobs: []*types.Job{j1},
TaskKey: types.TaskKey{
RepoState: rs1.Copy(),
Name: tcc_testutils.TestTaskName,
TaskSpec: cfg1.Tasks[tcc_testutils.TestTaskName].Copy(),
test([]*types.Job{j1}, map[types.TaskKey]*TaskCandidate{
tc1.TaskKey: tc1,
tc2.TaskKey: tc2,
// Add a job, ensure that its dependencies are added and that the right
// dependencies are de-duplicated.
j2 := &types.Job{
Created: currentTime,
Id: "job2id",
Name: tcc_testutils.TestTaskName,
Dependencies: map[string][]string{tcc_testutils.TestTaskName: {tcc_testutils.BuildTaskName}, tcc_testutils.BuildTaskName: {}},
Priority: 0.6,
RepoState: rs2,
j3 := &types.Job{
Created: currentTime,
Id: "job3id",
Name: tcc_testutils.PerfTaskName,
Dependencies: map[string][]string{tcc_testutils.PerfTaskName: {tcc_testutils.BuildTaskName}, tcc_testutils.BuildTaskName: {}},
Priority: 0.6,
RepoState: rs2,
// j2 and j3 both need the Build task at rs2, so it is a single candidate
// carrying both jobs.
tc3 := &TaskCandidate{
CasDigests: []string{tcc_testutils.CompileCASDigest},
Jobs: []*types.Job{j2, j3},
TaskKey: types.TaskKey{
RepoState: rs2.Copy(),
Name: tcc_testutils.BuildTaskName,
TaskSpec: cfg2.Tasks[tcc_testutils.BuildTaskName].Copy(),
tc4 := &TaskCandidate{
CasDigests: []string{tcc_testutils.TestCASDigest},
Jobs: []*types.Job{j2},
TaskKey: types.TaskKey{
RepoState: rs2.Copy(),
Name: tcc_testutils.TestTaskName,
TaskSpec: cfg2.Tasks[tcc_testutils.TestTaskName].Copy(),
tc5 := &TaskCandidate{
CasDigests: []string{tcc_testutils.PerfCASDigest},
Jobs: []*types.Job{j3},
TaskKey: types.TaskKey{
RepoState: rs2.Copy(),
Name: tcc_testutils.PerfTaskName,
TaskSpec: cfg2.Tasks[tcc_testutils.PerfTaskName].Copy(),
allCandidates := map[types.TaskKey]*TaskCandidate{
tc1.TaskKey: tc1,
tc2.TaskKey: tc2,
tc3.TaskKey: tc3,
tc4.TaskKey: tc4,
tc5.TaskKey: tc5,
test([]*types.Job{j1, j2, j3}, allCandidates)
// Finish j3, ensure that its task specs no longer show up.
// j3.MakeTaskKey(PerfTaskName) is presumably equal to tc5.TaskKey, so this
// drops the Perf candidate.
delete(allCandidates, j3.MakeTaskKey(tcc_testutils.PerfTaskName))
// This is hacky, but findTaskCandidatesForJobs accepts an already-
// filtered list of jobs, so we have to pretend it never existed.
tc3.Jobs = tc3.Jobs[:1]
test([]*types.Job{j1, j2}, allCandidates)
// Ensure that we don't generate candidates for jobs at nonexistent commits.
j4 := &types.Job{
Created: currentTime,
Id: "job4id",
Name: tcc_testutils.PerfTaskName,
Dependencies: map[string][]string{tcc_testutils.PerfTaskName: {tcc_testutils.BuildTaskName}, tcc_testutils.BuildTaskName: {}},
Priority: 0.6,
RepoState: types.RepoState{
Repo: rs2.Repo,
Revision: "aaaaabbbbbcccccdddddeeeeefffff1111122222",
test([]*types.Job{j4}, map[types.TaskKey]*TaskCandidate{})
// TestFilterTaskCandidates verifies filterTaskCandidates and the filtering
// diagnostics it records on each candidate. The test checks that:
//   - candidates with unmet dependencies are removed;
//   - pending, running and successful tasks supersede candidates with the same
//     key;
//   - a failed task leaves its own candidate available for retry but keeps its
//     dependents blocked;
//   - a try-job candidate does not have its dependencies satisfied by non-try
//     tasks.
//
// Results are grouped by repo, then by task name.
func TestFilterTaskCandidates(t *testing.T) {
ctx, _, _, _, s, _, _, cleanup := setup(t)
defer cleanup()
c1 := rs1.Revision
c2 := rs2.Revision
// Fake out the initial candidates.
k1 := types.TaskKey{
RepoState: rs1,
Name: tcc_testutils.BuildTaskName,
k2 := types.TaskKey{
RepoState: rs1,
Name: tcc_testutils.TestTaskName,
k3 := types.TaskKey{
RepoState: rs2,
Name: tcc_testutils.BuildTaskName,
k4 := types.TaskKey{
RepoState: rs2,
Name: tcc_testutils.TestTaskName,
k5 := types.TaskKey{
RepoState: rs2,
Name: tcc_testutils.PerfTaskName,
// The Test and Perf candidates depend on the Build task; Build has no deps.
candidates := map[types.TaskKey]*TaskCandidate{
k1: {
TaskKey: k1,
TaskSpec: &specs.TaskSpec{},
k2: {
TaskKey: k2,
TaskSpec: &specs.TaskSpec{
Dependencies: []string{tcc_testutils.BuildTaskName},
k3: {
TaskKey: k3,
TaskSpec: &specs.TaskSpec{},
k4: {
TaskKey: k4,
TaskSpec: &specs.TaskSpec{
Dependencies: []string{tcc_testutils.BuildTaskName},
k5: {
TaskKey: k5,
TaskSpec: &specs.TaskSpec{
Dependencies: []string{tcc_testutils.BuildTaskName},
// clearDiagnostics resets the Diagnostics of every candidate.
// NOTE(review): it is not called in the visible lines. Go rejects unused
// locals, so calls were presumably lost from this copy — confirm.
clearDiagnostics := func(candidates map[types.TaskKey]*TaskCandidate) {
for _, c := range candidates {
c.Diagnostics = nil
// Check the initial set of task candidates. The two Build tasks
// should be the only ones available. rs1 and rs2 share a Repo, so every
// result is under c[rs1.Repo].
c, err := s.filterTaskCandidates(ctx, candidates)
require.NoError(t, err)
require.Len(t, c, 1)
require.Equal(t, 1, len(c[rs1.Repo]))
require.Equal(t, 2, len(c[rs1.Repo][tcc_testutils.BuildTaskName]))
for _, byRepo := range c {
for _, byName := range byRepo {
for _, candidate := range byName {
require.Equal(t, candidate.Name, tcc_testutils.BuildTaskName)
// Check filtering diagnostics. Non-Build tasks have unmet dependencies.
for _, candidate := range candidates {
if candidate.Name != tcc_testutils.BuildTaskName {
require.Equal(t, candidate.Diagnostics.Filtering.UnmetDependencies, []string{tcc_testutils.BuildTaskName})
// Insert the Build task at c1 (1 dependent) into the database, and
// transition it through various states.
var t1 *types.Task
for _, byRepo := range c { // Order not guaranteed, find the right candidate.
for _, byName := range byRepo {
for _, candidate := range byName {
if candidate.Revision == c1 {
t1 = makeTask(ctx, candidate.Name, candidate.Repo, candidate.Revision)
require.NotNil(t, t1)
// We shouldn't duplicate pending or running tasks.
for _, status := range []types.TaskStatus{types.TASK_STATUS_PENDING, types.TASK_STATUS_RUNNING} {
t1.Status = status
require.NoError(t, s.putTask(ctx, t1))
c, err = s.filterTaskCandidates(ctx, candidates)
require.NoError(t, err)
require.Len(t, c, 1)
for _, byRepo := range c {
for _, byName := range byRepo {
for _, candidate := range byName {
require.Equal(t, candidate.Name, tcc_testutils.BuildTaskName)
require.Equal(t, c2, candidate.Revision)
// Check filtering diagnostics.
for _, candidate := range candidates {
if candidate.Name != tcc_testutils.BuildTaskName {
// Non-Build tasks have unmet dependencies.
require.Equal(t, candidate.Diagnostics.Filtering.UnmetDependencies, []string{tcc_testutils.BuildTaskName})
} else if candidate.Revision == c1 {
// Blocked by t1
require.Equal(t, candidate.Diagnostics.Filtering.SupersededByTask, t1.Id)
// The task failed. Ensure that its dependents are not candidates, but
// the task itself is back in the list of candidates, in case we want
// to retry.
t1.Status = types.TASK_STATUS_FAILURE
require.NoError(t, s.putTask(ctx, t1))
c, err = s.filterTaskCandidates(ctx, candidates)
require.NoError(t, err)
require.Len(t, c, 1)
for _, byRepo := range c {
require.Len(t, byRepo, 1)
for _, byName := range byRepo {
require.Len(t, byName, 2)
for _, candidate := range byName {
require.Equal(t, candidate.Name, tcc_testutils.BuildTaskName)
// Check filtering diagnostics.
for _, candidate := range candidates {
if candidate.Name != tcc_testutils.BuildTaskName {
// Non-Build tasks have unmet dependencies.
require.Equal(t, candidate.Diagnostics.Filtering.UnmetDependencies, []string{tcc_testutils.BuildTaskName})
// The task succeeded. Ensure that its dependents are candidates and
// the task itself is not. Two task names remain: the Build task at c2 and
// the Test task at c1.
t1.Status = types.TASK_STATUS_SUCCESS
t1.IsolatedOutput = "fake isolated hash"
require.NoError(t, s.putTask(ctx, t1))
c, err = s.filterTaskCandidates(ctx, candidates)
require.NoError(t, err)
require.Len(t, c, 1)
for _, byRepo := range c {
require.Len(t, byRepo, 2)
for _, byName := range byRepo {
for _, candidate := range byName {
require.False(t, t1.Name == candidate.Name && t1.Revision == candidate.Revision)
// Candidate with k1 is blocked by t1.
require.Equal(t, candidates[k1].Diagnostics.Filtering.SupersededByTask, t1.Id)
// Create the other Build task.
var t2 *types.Task
for _, byRepo := range c {
for _, byName := range byRepo {
for _, candidate := range byName {
if candidate.Revision == c2 && strings.HasPrefix(candidate.Name, "Build-") {
t2 = makeTask(ctx, candidate.Name, candidate.Repo, candidate.Revision)
require.NotNil(t, t2)
t2.Status = types.TASK_STATUS_SUCCESS
t2.IsolatedOutput = "fake isolated hash"
require.NoError(t, s.putTask(ctx, t2))
// All test and perf tasks are now candidates, no build tasks.
c, err = s.filterTaskCandidates(ctx, candidates)
require.NoError(t, err)
require.Len(t, c, 1)
require.Equal(t, 2, len(c[rs1.Repo][tcc_testutils.TestTaskName]))
require.Equal(t, 1, len(c[rs1.Repo][tcc_testutils.PerfTaskName]))
for _, byRepo := range c {
for _, byName := range byRepo {
for _, candidate := range byName {
require.NotEqual(t, candidate.Name, tcc_testutils.BuildTaskName)
// Build candidates are blocked by completed tasks.
require.Equal(t, candidates[k1].Diagnostics.Filtering.SupersededByTask, t1.Id)
require.Equal(t, candidates[k3].Diagnostics.Filtering.SupersededByTask, t2.Id)
// Add a try job. Ensure that no deps have been incorrectly satisfied:
// the non-try Build tasks must not count as the try job's dependency.
tryKey := k4.Copy()
tryKey.Server = "dummy-server"
tryKey.Issue = "dummy-issue"
tryKey.Patchset = "dummy-patchset"
candidates[tryKey] = &TaskCandidate{
TaskKey: tryKey,
TaskSpec: &specs.TaskSpec{
Dependencies: []string{tcc_testutils.BuildTaskName},
c, err = s.filterTaskCandidates(ctx, candidates)
require.NoError(t, err)
require.Len(t, c, 1)
require.Equal(t, 2, len(c[rs1.Repo][tcc_testutils.TestTaskName]))
require.Equal(t, 1, len(c[rs1.Repo][tcc_testutils.PerfTaskName]))
for _, byRepo := range c {
for _, byName := range byRepo {
for _, candidate := range byName {
require.NotEqual(t, candidate.Name, tcc_testutils.BuildTaskName)
require.False(t, candidate.IsTryJob())
// Check diagnostics for tryKey
require.Equal(t, candidates[tryKey].Diagnostics.Filtering.UnmetDependencies, []string{tcc_testutils.BuildTaskName})
// processTaskCandidate is a helper function for processing a single task
// candidate.
func processTaskCandidate(ctx context.Context, s *TaskScheduler, c *TaskCandidate) error {
candidates := map[string]map[string][]*TaskCandidate{
c.Repo: {
c.Name: []*TaskCandidate{c},
_, err := s.processTaskCandidates(ctx, candidates)
return err
// TestProcessTaskCandidate checks the score, blamelist and scoring diagnostics
// that processTaskCandidate assigns to try-job, retry, forced and regular
// candidates. Time is pinned with a time-traveling context so job ages (and
// therefore scores) are deterministic.
func TestProcessTaskCandidate(t *testing.T) {
_, _, _, _, s, _, _, cleanup := setup(t)
defer cleanup()
currentTime := time.Unix(0, 1470674884000000)
ctx := now.TimeTravelingContext(currentTime)
// checkDiagTryForced checks the scoring diagnostics of a try/forced candidate.
// NOTE(review): it is not called in the visible lines. Go rejects unused
// locals, so calls were presumably lost from this copy — confirm.
checkDiagTryForced := func(c *TaskCandidate) {
require.NotNil(t, c.Diagnostics)
diag := c.Diagnostics.Scoring
require.NotNil(t, diag)
require.Equal(t, c.Jobs[0].Priority, diag.Priority)
require.Equal(t, currentTime.Sub(c.Jobs[0].Created).Hours(), diag.JobCreatedHours)
// The remaining fields should always be 0 for try/forced jobs.
require.Equal(t, 0, diag.StoleFromCommits)
require.Equal(t, 0.0, diag.TestednessIncrease)
require.Equal(t, 0.0, diag.TimeDecay)
// A RepoState with a patch, which makes candidates at it try jobs.
tryjobRs := types.RepoState{
Patch: types.Patch{
Server: "my-server",
Issue: "my-issue",
Patchset: "my-patchset",
Repo: rs1.Repo,
Revision: rs1.Revision,
tryjob := &types.Job{
Id: "tryjobId",
Created: currentTime.Add(-1 * time.Hour),
Name: "job",
Priority: 0.5,
RepoState: tryjobRs,
c := &TaskCandidate{
Jobs: []*types.Job{tryjob},
TaskKey: types.TaskKey{
Name: tcc_testutils.BuildTaskName,
RepoState: tryjobRs,
TaskSpec: tcc_testutils.TasksCfg1.Tasks[tcc_testutils.BuildTaskName],
require.NoError(t, processTaskCandidate(ctx, s, c))
// Try job candidates have a specific score and no blamelist. The job is one
// hour old and has priority 0.5.
require.InDelta(t, (CANDIDATE_SCORE_TRY_JOB+1.0)*0.5, c.Score, scoreDelta)
require.Nil(t, c.Commits)
// Retries are scored lower.
// NOTE(review): the visible lines only assert that the retry has no blamelist;
// no score assertion for the retry appears in this copy.
c = &TaskCandidate{
Attempt: 1,
Jobs: []*types.Job{tryjob},
TaskKey: types.TaskKey{
Name: tcc_testutils.BuildTaskName,
RepoState: tryjobRs,
TaskSpec: tcc_testutils.TasksCfg1.Tasks[tcc_testutils.BuildTaskName],
require.NoError(t, processTaskCandidate(ctx, s, c))
require.Nil(t, c.Commits)
forcedJob := &types.Job{
Id: "forcedJobId",
Created: currentTime.Add(-2 * time.Hour),
Name: tcc_testutils.BuildTaskName,
Priority: 0.5,
RepoState: rs2,
// Manually forced candidates have a blamelist and a specific score. The job is
// two hours old and has priority 0.5.
c = &TaskCandidate{
Jobs: []*types.Job{forcedJob},
TaskKey: types.TaskKey{
Name: tcc_testutils.BuildTaskName,
RepoState: rs2,
ForcedJobId: forcedJob.Id,
TaskSpec: tcc_testutils.TasksCfg2.Tasks[tcc_testutils.BuildTaskName],
require.NoError(t, processTaskCandidate(ctx, s, c))
require.InDelta(t, (CANDIDATE_SCORE_FORCE_RUN+2.0)*0.5, c.Score, scoreDelta)
require.Equal(t, 2, len(c.Commits))
// All other candidates have a blamelist and a time-decayed score.
regularJob := &types.Job{
Id: "regularJobId",
Created: currentTime.Add(-1 * time.Hour),
Name: tcc_testutils.BuildTaskName,
Priority: 0.5,
RepoState: rs2,
c = &TaskCandidate{
Jobs: []*types.Job{regularJob},
TaskKey: types.TaskKey{
Name: tcc_testutils.BuildTaskName,
RepoState: rs2,
TaskSpec: tcc_testutils.TasksCfg2.Tasks[tcc_testutils.BuildTaskName],
require.NoError(t, processTaskCandidate(ctx, s, c))
require.True(t, c.Score > 0)
require.Equal(t, 2, len(c.Commits))
diag := c.GetDiagnostics().Scoring
require.Equal(t, 0.5, diag.Priority)
require.Equal(t, 1.0, diag.JobCreatedHours)
require.Equal(t, 0, diag.StoleFromCommits)
// 3.5 matches testednessIncrease(2, 0) in TestTestednessIncrease: two new
// commits with no prior coverage.
require.Equal(t, 3.5, diag.TestednessIncrease)
require.InDelta(t, 1.0, diag.TimeDecay, scoreDelta)
// TestRegularJobRetryScoring checks scores and scoring diagnostics for regular
// (non-try, non-forced) Build candidates at rs1 and rs2. It scores them before
// and after the task at rs2 fails (and then is marked as a mishap). After the
// failure, both the retry at rs2 and the backfill at rs1 steal from the failed
// task's 2-commit blamelist.
func TestRegularJobRetryScoring(t *testing.T) {
_, _, _, _, s, _, _, cleanup := setup(t)
defer cleanup()
ctx := now.TimeTravelingContext(time.Date(2021, time.October, 15, 0, 0, 0, 0, time.UTC))
currentTime := now.Now(ctx)
// checkDiag checks the priority, job age and time decay diagnostics.
// NOTE(review): it is not called in the visible lines. Go rejects unused
// locals, so calls were presumably lost from this copy — confirm.
checkDiag := func(c *TaskCandidate) {
require.NotNil(t, c.Diagnostics)
diag := c.Diagnostics.Scoring
require.NotNil(t, diag)
// All candidates in this test have a single Job.
require.Equal(t, c.Jobs[0].Priority, diag.Priority)
require.InDelta(t, currentTime.Sub(c.Jobs[0].Created).Hours(), diag.JobCreatedHours, scoreDelta)
// The commits are added close enough to "now" that there is no time decay.
require.Equal(t, 1.0, diag.TimeDecay)
j1 := &types.Job{
Id: "regularJobId1",
Created: currentTime.Add(-1 * time.Hour),
Name: tcc_testutils.BuildTaskName,
Priority: 0.5,
RepoState: rs1,
j2 := &types.Job{
Id: "regularJobId2",
Created: currentTime.Add(-1 * time.Hour),
Name: tcc_testutils.BuildTaskName,
Priority: 0.5,
RepoState: rs2,
// Candidates at rs1 and rs2
c1 := &TaskCandidate{
Jobs: []*types.Job{j1},
TaskKey: types.TaskKey{
Name: tcc_testutils.BuildTaskName,
RepoState: rs1,
TaskSpec: tcc_testutils.TasksCfg1.Tasks[tcc_testutils.BuildTaskName],
c2 := &TaskCandidate{
Jobs: []*types.Job{j2},
TaskKey: types.TaskKey{
Name: tcc_testutils.BuildTaskName,
RepoState: rs2,
TaskSpec: tcc_testutils.TasksCfg2.Tasks[tcc_testutils.BuildTaskName],
// Regular task at HEAD with 2 commits has score 3.5 scaled by priority 0.5.
require.NoError(t, processTaskCandidate(ctx, s, c2))
require.InDelta(t, 3.5*0.5, c2.Score, scoreDelta)
require.Equal(t, 2, len(c2.Commits))
diag := c2.GetDiagnostics().Scoring
require.Equal(t, 0, diag.StoleFromCommits)
require.Equal(t, 3.5, diag.TestednessIncrease)
// Regular task at HEAD^ (no backfill) with 1 commit has score 2 scaled by
// priority 0.5.
require.NoError(t, processTaskCandidate(ctx, s, c1))
diag = c1.GetDiagnostics().Scoring
require.InDelta(t, 2*0.5, c1.Score, scoreDelta)
require.Equal(t, 1, len(c1.Commits))
require.Equal(t, 0, diag.StoleFromCommits)
require.Equal(t, 2.0, diag.TestednessIncrease)
// Add a task at rs2 that failed, covering the same commits as c2.
t2 := makeTask(ctx, c2.Name, c2.Repo, c2.Revision)
t2.Status = types.TASK_STATUS_FAILURE
t2.Commits = util.CopyStringSlice(c2.Commits)
require.NoError(t, s.putTask(ctx, t2))
// Update Attempt and RetryOf before calling processTaskCandidate.
c2.Attempt = 1
c2.RetryOf = t2.Id
// Retry task at rs2 with 2 commits for 2nd of 2 attempts has score 0.75
// scaled by priority 0.5.
require.NoError(t, processTaskCandidate(ctx, s, c2))
require.InDelta(t, 0.75*0.5, c2.Score, scoreDelta)
require.Equal(t, 2, len(c2.Commits))
diag = c2.GetDiagnostics().Scoring
require.Equal(t, 2, diag.StoleFromCommits)
require.Equal(t, 0.0, diag.TestednessIncrease)
// Regular task at rs1 (backfilling failed task) with 1 commit has score 1.25
// scaled by priority 0.5.
// NOTE(review): the next assignment is overwritten two lines below before
// diag is read, so it has no effect and can be removed.
diag = &taskCandidateScoringDiagnostics{}
require.NoError(t, processTaskCandidate(ctx, s, c1))
diag = c1.GetDiagnostics().Scoring
require.InDelta(t, 1.25*0.5, c1.Score, scoreDelta)
require.Equal(t, 1, len(c1.Commits))
require.Equal(t, 2, diag.StoleFromCommits)
// 0.5 matches testednessIncrease(1, 2) in TestTestednessIncrease.
require.Equal(t, 0.5, diag.TestednessIncrease)
// Actually, the task at rs2 had a mishap.
t2.Status = types.TASK_STATUS_MISHAP
require.NoError(t, s.putTask(ctx, t2))
// Scores should be same as for FAILURE.
require.NoError(t, processTaskCandidate(ctx, s, c2))
diag = c2.GetDiagnostics().Scoring
require.InDelta(t, 0.75*0.5, c2.Score, scoreDelta)
require.Equal(t, 2, len(c2.Commits))
require.Equal(t, 2, diag.StoleFromCommits)
require.Equal(t, 0.0, diag.TestednessIncrease)
require.NoError(t, processTaskCandidate(ctx, s, c1))
diag = c1.GetDiagnostics().Scoring
require.InDelta(t, 1.25*0.5, c1.Score, scoreDelta)
require.Equal(t, 1, len(c1.Commits))
require.Equal(t, 2, diag.StoleFromCommits)
require.Equal(t, 0.5, diag.TestednessIncrease)
// TestProcessTaskCandidates checks that processTaskCandidates processes every
// candidate in a repo -> task name -> candidates map (seven here, including a
// forced Build and a Perf try job). It also checks that each candidate gets
// the score and blamelist expected for its kind.
func TestProcessTaskCandidates(t *testing.T) {
ctx, _, _, _, s, _, _, cleanup := setup(t)
defer cleanup()
ts := now.Now(ctx)
// Processing of individual candidates is already tested; just verify
// that if we pass in a bunch of candidates they all get processed.
// The JobSpecs do not specify priority, so they use the default of 0.5.
// assertProcessed checks the score and blamelist of one processed candidate:
// try jobs have no blamelist; forced runs cover both commits; candidates at
// rs2 depend on the task name; and candidates at rs1 are scored as backfills.
assertProcessed := func(c *TaskCandidate) {
if c.IsTryJob() {
require.True(t, c.Score > CANDIDATE_SCORE_TRY_JOB*0.5)
require.Nil(t, c.Commits)
} else if c.IsForceRun() {
require.True(t, c.Score > CANDIDATE_SCORE_FORCE_RUN*0.5)
require.Equal(t, 2, len(c.Commits))
} else if c.Revision == rs2.Revision {
if c.Name == tcc_testutils.PerfTaskName {
require.InDelta(t, 2.0*0.5, c.Score, scoreDelta)
require.Equal(t, 1, len(c.Commits))
} else if c.Name == tcc_testutils.BuildTaskName {
// Already covered by the forced job, so zero score.
require.InDelta(t, 0, c.Score, scoreDelta)
// Scores below the BuildTask at rs1, so it has a blamelist of 1 commit.
require.Equal(t, 1, len(c.Commits))
} else {
require.InDelta(t, 3.5*0.5, c.Score, scoreDelta)
require.Equal(t, 2, len(c.Commits))
} else {
require.InDelta(t, 0.5*0.5, c.Score, scoreDelta) // These will be backfills.
require.Equal(t, 1, len(c.Commits))
testJob1 := &types.Job{
Id: "testJob1",
Created: ts,
Name: tcc_testutils.TestTaskName,
Priority: 0.5,
RepoState: rs1,
testJob2 := &types.Job{
Id: "testJob2",
Created: ts,
Name: tcc_testutils.TestTaskName,
Priority: 0.5,
RepoState: rs2,
perfJob2 := &types.Job{
Id: "perfJob2",
Created: ts,
Name: tcc_testutils.PerfTaskName,
Priority: 0.5,
RepoState: rs2,
forcedBuildJob2 := &types.Job{
Id: "forcedBuildJob2",
Created: ts,
Name: tcc_testutils.BuildTaskName,
Priority: 0.5,
RepoState: rs2,
// A patched RepoState at rs1's revision, used for the Perf try job.
tryjobRs := types.RepoState{
Patch: types.Patch{
Server: "my-server",
Issue: "my-issue",
Patchset: "my-patchset",
Repo: rs1.Repo,
Revision: rs1.Revision,
// NOTE(review): perfTryjob2 reuses the Id "perfJob2" of perfJob2 above;
// confirm whether a distinct Id was intended.
perfTryjob2 := &types.Job{
Id: "perfJob2",
Created: ts,
Name: tcc_testutils.PerfTaskName,
Priority: 0.5,
RepoState: tryjobRs,
// Seven candidates under one repo (rs1 and rs2 share a Repo): three Build
// (rs1, rs2, forced rs2), two Test (rs1, rs2), and two Perf (rs2, try job).
// NOTE(review): the opening brace of each TaskCandidate element is missing
// from this copy of the file.
candidates := map[string]map[string][]*TaskCandidate{
rs1.Repo: {
tcc_testutils.BuildTaskName: {
Jobs: []*types.Job{testJob1},
TaskKey: types.TaskKey{
RepoState: rs1,
Name: tcc_testutils.BuildTaskName,
TaskSpec: &specs.TaskSpec{},
Jobs: []*types.Job{testJob2, perfJob2},
TaskKey: types.TaskKey{
RepoState: rs2,
Name: tcc_testutils.BuildTaskName,
TaskSpec: &specs.TaskSpec{},
Jobs: []*types.Job{forcedBuildJob2},
TaskKey: types.TaskKey{
RepoState: rs2,
Name: tcc_testutils.BuildTaskName,
ForcedJobId: forcedBuildJob2.Id,
TaskSpec: &specs.TaskSpec{},
tcc_testutils.TestTaskName: {
Jobs: []*types.Job{testJob1},
TaskKey: types.TaskKey{
RepoState: rs1,
Name: tcc_testutils.TestTaskName,
TaskSpec: &specs.TaskSpec{},
Jobs: []*types.Job{testJob2},
TaskKey: types.TaskKey{
RepoState: rs2,
Name: tcc_testutils.TestTaskName,
TaskSpec: &specs.TaskSpec{},
tcc_testutils.PerfTaskName: {
Jobs: []*types.Job{perfJob2},
TaskKey: types.TaskKey{
RepoState: rs2,
Name: tcc_testutils.PerfTaskName,
TaskSpec: &specs.TaskSpec{},
Jobs: []*types.Job{perfTryjob2},
TaskKey: types.TaskKey{
RepoState: tryjobRs,
Name: tcc_testutils.PerfTaskName,
TaskSpec: &specs.TaskSpec{},
processed, err := s.processTaskCandidates(ctx, candidates)
require.NoError(t, err)
require.Len(t, processed, 7)
// NOTE(review): the loop body is missing from this copy of the file. Since
// assertProcessed must be used, the body presumably calls assertProcessed(c)
// — confirm.
for _, c := range processed {
func TestTestedness(t *testing.T) {
tc := []struct {
in int
out float64
in: -1,
out: -1.0,
in: 0,
out: 0.0,
in: 1,
out: 1.0,
in: 2,
out: 1.0 + 1.0/2.0,
in: 3,
out: 1.0 + float64(2.0)/float64(3.0),
in: 4,
out: 1.0 + 3.0/4.0,
in: 4096,
out: 1.0 + float64(4095)/float64(4096),
for i, c := range tc {
require.Equal(t, c.out, testedness(, fmt.Sprintf("test case #%d", i))
func TestTestednessIncrease(t *testing.T) {
tc := []struct {
a int
b int
out float64
// Invalid cases.
a: -1,
b: 10,
out: -1.0,
a: 10,
b: -1,
out: -1.0,
a: 0,
b: -1,
out: -1.0,
a: 0,
b: 0,
out: -1.0,
// Invalid because if we're re-running at already-tested commits
// then we should have a blamelist which is at most the size of
// the blamelist of the previous task. We naturally get negative
// testedness increase in these cases.
a: 2,
b: 1,
out: -0.5,
// Testing only new commits.
a: 1,
b: 0,
out: 1.0 + 1.0,
a: 2,
b: 0,
out: 2.0 + (1.0 + 1.0/2.0),
a: 3,
b: 0,
out: 3.0 + (1.0 + float64(2.0)/float64(3.0)),
a: 4096,
b: 0,
out: 4096.0 + (1.0 + float64(4095.0)/float64(4096.0)),
// Retries.
a: 1,
b: 1,
out: 0.0,
a: 2,
b: 2,
out: 0.0,
a: 3,
b: 3,
out: 0.0,
a: 4096,
b: 4096,
out: 0.0,
// Bisect/backfills.
a: 1,
b: 2,
out: 0.5, // (1 + 1) - (1 + 1/2)
a: 1,
b: 3,
out: float64(2.5) - (1.0 + float64(2.0)/float64(3.0)),
a: 5,
b: 10,
out: 2.0*(1.0+float64(4.0)/float64(5.0)) - (1.0 + float64(9.0)/float64(10.0)),
for i, c := range tc {
require.Equal(t, c.out, testednessIncrease(c.a, c.b), fmt.Sprintf("test case #%d", i))
func TestComputeBlamelist(t *testing.T) {
// Setup.
nowCtx := now.TimeTravelingContext(mem_git.BaseTime.Add(time.Hour))
ctx, cancel := context.WithCancel(nowCtx)
defer cancel()
mg, repo := newMemRepo(t)
repos := repograph.Map{
rs1.Repo: repo,
d := memory.NewInMemoryTaskDB()
w, err := window.New(ctx, 2*time.Hour, 0, nil)
cache, err := cache.NewTaskCache(ctx, d, w, nil)
require.NoError(t, err)
btProject, btInstance, btCleanup := tcc_testutils.SetupBigTable(t)
defer btCleanup()
tcc, err := task_cfg_cache.NewTaskCfgCache(ctx, repos, btProject, btInstance, nil)
require.NoError(t, err)
// The test repo is laid out like this:
// * T (HEAD, main, Case #12)
// * S (Time travel commit; before the start of the window)
// * R (Case #11)
// |\
// * | Q
// | * P
// |/
// * O (Case #9)
// * N
// * M (Case #10)
// * L
// * K (Case #6)
// * J (Case #5)
// |\
// | * I
// | * H (Case #4)
// * | G
// * | F (Case #3)
// * | E (Case #8, previously #7)
// |/
// * D (Case #2)
// * C (Case #1)
// ...
// * B (Case #0)
// * A
// * _ (No TasksCfg; blamelists shouldn't include this)
hashes := map[string]string{}
name := "Test-Ubuntu12-ShuttleA-GTX660-x86-Release"
taskCfg := &specs.TasksCfg{
Tasks: map[string]*specs.TaskSpec{
name: {},
commit := func(name string) {
hashes[name] = mg.Commit(name)
require.NoError(t, tcc.Set(ctx, types.RepoState{
Repo: rs1.Repo,
Revision: hashes[name],
}, taskCfg, nil))
// Initial commit.
hashes["_"] = mg.Commit("_")
// First commit containing TasksCfg.
type testCase struct {
Revision string
Expected []string
StoleFromIdx int
TaskName string
ids := []string{}
commitsBuf := make([]*repograph.Commit, 0, MAX_BLAMELIST_COMMITS)
test := func(tc *testCase) {
// Update the repo.
require.NoError(t, repo.Update(ctx))
// Self-check: make sure we don't pass in empty commit hashes.
for _, h := range tc.Expected {
require.NotEqual(t, h, "")
// Ensure that we get the expected blamelist.
revision := repo.Get(tc.Revision)
require.NotNil(t, revision)
taskName := tc.TaskName
if taskName == "" {
taskName = name
commits, stoleFrom, err := ComputeBlamelist(ctx, cache, repo, taskName, rs1.Repo, revision, commitsBuf, tcc, w)
if tc.Revision == "" {
require.Error(t, err)
} else {
require.NoError(t, err)
assertdeep.Equal(t, tc.Expected, commits)
if tc.StoleFromIdx >= 0 {
require.NotNil(t, stoleFrom)
require.Equal(t, ids[tc.StoleFromIdx], stoleFrom.Id)
} else {
require.Nil(t, stoleFrom)
// Insert the task into the DB.
c := &TaskCandidate{
TaskKey: types.TaskKey{
RepoState: types.RepoState{
Repo: rs1.Repo,
Revision: tc.Revision,
Name: taskName,
TaskSpec: &specs.TaskSpec{},
task := c.MakeTask()
task.Commits = commits
task.Created = now.Now(ctx)
if stoleFrom != nil {
// Re-insert the stoleFrom task without the commits
// which were stolen from it.
stoleFromCommits := make([]string, 0, len(stoleFrom.Commits)-len(commits))
for _, commit := range stoleFrom.Commits {
if !util.In(commit, task.Commits) {
stoleFromCommits = append(stoleFromCommits, commit)
stoleFrom.Commits = stoleFromCommits
require.NoError(t, d.PutTasks(ctx, []*types.Task{task, stoleFrom}))
cache.AddTasks([]*types.Task{task, stoleFrom})
} else {
require.NoError(t, d.PutTask(ctx, task))
ids = append(ids, task.Id)
require.NoError(t, cache.Update(ctx))
// Commit B.
// Test cases. Each test case builds on the previous cases.
// 0. The first task, at HEAD.
Revision: hashes["B"],
Expected: []string{hashes["B"], hashes["A"]},
StoleFromIdx: -1,
// Test the blamelist too long case by creating a bunch of commits.
for i := 0; i < MAX_BLAMELIST_COMMITS+1; i++ {
// 1. Blamelist too long, not a branch head.
Revision: hashes["C"],
Expected: []string{hashes["C"]},
StoleFromIdx: -1,
// 2. Blamelist too long, is a branch head.
Revision: hashes["D"],
Expected: []string{hashes["D"]},
StoleFromIdx: -1,
// Create the remaining commits.
mg.NewBranch("otherbranch", hashes["D"])
hashes["J"] = mg.Merge("otherbranch")
require.NoError(t, tcc.Set(ctx, types.RepoState{
Repo: rs1.Repo,
Revision: hashes["J"],
}, taskCfg, nil))
// 3. On a linear set of commits, with at least one previous task.
Revision: hashes["F"],
Expected: []string{hashes["E"], hashes["F"]},
StoleFromIdx: -1,
// 4. The first task on a new branch.
Revision: hashes["H"],
Expected: []string{hashes["H"]},
StoleFromIdx: -1,
// 5. After a merge.
Revision: hashes["J"],
Expected: []string{hashes["G"], hashes["I"], hashes["J"]},
StoleFromIdx: -1,
// 6. One last "normal" task.
Revision: hashes["K"],
Expected: []string{hashes["K"]},
StoleFromIdx: -1,
// 7. Steal commits from a previously-ingested task.
Revision: hashes["E"],
Expected: []string{hashes["E"]},
StoleFromIdx: 3,
// Ensure that task #8 really stole the commit from #3.
task, err := cache.GetTask(ids[3])
require.NoError(t, err)
require.False(t, util.In(hashes["E"], task.Commits), fmt.Sprintf("Expected not to find %s in %v", hashes["E"], task.Commits))
// 8. Retry #7.
Revision: hashes["E"],
Expected: []string{hashes["E"]},
StoleFromIdx: 7,
// Ensure that task #8 really stole the commit from #7.
task, err = cache.GetTask(ids[7])
require.NoError(t, err)
require.Equal(t, 0, len(task.Commits))
// Four more commits.
// 9. Not really a test case, but setting up for #10.
Revision: hashes["O"],
Expected: []string{hashes["L"], hashes["M"], hashes["N"], hashes["O"]},
StoleFromIdx: -1,
// 10. Steal *two* commits from #9.
Revision: hashes["M"],
Expected: []string{hashes["L"], hashes["M"]},
StoleFromIdx: 9,
// 11. Verify that we correctly track when task specs were added.
mg.NewBranch("otherbranch2", hashes["O"])
newTaskCfg := taskCfg.Copy()
newTaskCfg.Tasks["added-task"] = &specs.TaskSpec{}
require.NoError(t, tcc.Set(ctx, types.RepoState{
Repo: rs1.Repo,
Revision: hashes["Q"],
}, newTaskCfg, nil))
hashes["R"] = mg.Merge("otherbranch2")
require.NoError(t, tcc.Set(ctx, types.RepoState{
Repo: rs1.Repo,
Revision: hashes["R"],
}, newTaskCfg, nil))
// Existing task should get a normal blamelist of all three new commits.
Revision: hashes["R"],
Expected: []string{hashes["P"], hashes["Q"], hashes["R"]},
StoleFromIdx: -1,
// The added task's blamelist should only include commits at which the
// task was defined, ie. not P.
Revision: hashes["R"],
Expected: []string{hashes["Q"], hashes["R"]},
StoleFromIdx: -1,
TaskName: "added-task",
// 12. Stop computing blamelists when we reach a commit outside of the
// scheduling window.
hashes["S"] = mg.CommitAt("S", w.EarliestStart().Add(-time.Hour))
require.NoError(t, tcc.Set(ctx, types.RepoState{
Repo: rs1.Repo,
Revision: hashes["S"],
}, taskCfg, nil))
Revision: hashes["T"],
Expected: []string{hashes["T"]},
StoleFromIdx: -1,
func TestTimeDecay24Hr(t *testing.T) {
tc := []struct {
decayAmt24Hr float64
elapsed time.Duration
out float64
decayAmt24Hr: 1.0,
elapsed: 10 * time.Hour,
out: 1.0,
decayAmt24Hr: 0.5,
elapsed: 0 * time.Hour,
out: 1.0,
decayAmt24Hr: 0.5,
elapsed: 24 * time.Hour,
out: 0.5,
decayAmt24Hr: 0.5,
elapsed: 12 * time.Hour,
out: 0.75,
decayAmt24Hr: 0.5,
elapsed: 36 * time.Hour,
out: 0.25,
decayAmt24Hr: 0.5,
elapsed: 48 * time.Hour,
out: 0.0,
decayAmt24Hr: 0.5,
elapsed: 72 * time.Hour,
out: 0.0,
for i, c := range tc {
require.Equal(t, c.out, timeDecay24Hr(c.decayAmt24Hr, c.elapsed), fmt.Sprintf("test case #%d", i))
// TestRegenerateTaskQueue checks the contents, ordering and scores of the
// queue produced by regenerateTaskQueue as the Build, then Test tasks are
// completed one at a time, unblocking their dependents.
func TestRegenerateTaskQueue(t *testing.T) {
ctx, _, _, _, s, _, _, cleanup := setup(t)
defer cleanup()
// Ensure that the queue is initially empty.
require.Equal(t, 0, len(s.queue))
c1 := rs1.Revision
c2 := rs2.Revision
// Regenerate the task queue.
queue, _, err := s.regenerateTaskQueue(ctx)
require.NoError(t, err)
require.Len(t, queue, 2) // Two Build tasks.
// testSort asserts that the queue is in non-increasing score order.
testSort := func() {
// Ensure that we sorted correctly.
if len(queue) == 0 {
highScore := queue[0].Score
for _, c := range queue {
require.True(t, highScore >= c.Score)
highScore = c.Score
// Since we haven't run any task yet, we should have the two Build
// tasks.
// The one at HEAD should have a two-commit blamelist and a
// score of 3.5, scaled by a priority of 0.875 due to three jobs
// depending on it (1 - 0.5^3).
require.Equal(t, tcc_testutils.BuildTaskName, queue[0].Name)
require.Equal(t, []string{c2, c1}, queue[0].Commits)
require.Equal(t, 3, len(queue[0].Jobs))
require.InDelta(t, 3.5*0.875, queue[0].Score, scoreDelta)
// The other should have one commit in its blamelist and
// a score of 0.5, scaled by a priority of 0.75 due to two jobs.
require.Equal(t, tcc_testutils.BuildTaskName, queue[1].Name)
require.Equal(t, []string{c1}, queue[1].Commits)
require.InDelta(t, 0.5*0.75, queue[1].Score, scoreDelta)
// Insert the task at c1, even though it scored lower.
t1 := makeTask(ctx, queue[1].Name, queue[1].Repo, queue[1].Revision)
require.NotNil(t, t1)
t1.Status = types.TASK_STATUS_SUCCESS
t1.IsolatedOutput = "fake isolated hash"
require.NoError(t, s.putTask(ctx, t1))
// Regenerate the task queue.
queue, _, err = s.regenerateTaskQueue(ctx)
require.NoError(t, err)
// Now we expect the queue to contain the other Build task and the one
// Test task we unblocked by running the first Build task.
require.Len(t, queue, 2)
for _, c := range queue {
if c.Name == tcc_testutils.TestTaskName {
require.InDelta(t, 2.0*0.5, c.Score, scoreDelta)
require.Equal(t, 1, len(c.Commits))
} else {
require.Equal(t, c.Name, tcc_testutils.BuildTaskName)
require.InDelta(t, 2.0*0.875, c.Score, scoreDelta)
require.Equal(t, []string{c.Revision}, c.Commits)
// Locate the Build and Test candidates by name rather than assuming
// their positions in the queue.
buildIdx := 0
testIdx := 1
if queue[1].Name == tcc_testutils.BuildTaskName {
buildIdx = 1
testIdx = 0
require.Equal(t, tcc_testutils.BuildTaskName, queue[buildIdx].Name)
require.Equal(t, c2, queue[buildIdx].Revision)
require.Equal(t, tcc_testutils.TestTaskName, queue[testIdx].Name)
require.Equal(t, c1, queue[testIdx].Revision)
// Run the other Build task.
t2 := makeTask(ctx, queue[buildIdx].Name, queue[buildIdx].Repo, queue[buildIdx].Revision)
t2.Status = types.TASK_STATUS_SUCCESS
t2.IsolatedOutput = "fake isolated hash"
require.NoError(t, s.putTask(ctx, t2))
// Regenerate the task queue.
queue, _, err = s.regenerateTaskQueue(ctx)
require.NoError(t, err)
require.Len(t, queue, 3)
// perfIdx records where the Perf candidate is; it must be present at c2.
perfIdx := -1
for i, c := range queue {
if c.Name == tcc_testutils.PerfTaskName {
perfIdx = i
require.Equal(t, c2, c.Revision)
require.InDelta(t, 2.0*0.5, c.Score, scoreDelta)
require.Equal(t, []string{c.Revision}, c.Commits)
} else {
require.Equal(t, c.Name, tcc_testutils.TestTaskName)
if c.Revision == c2 {
require.InDelta(t, 3.5*0.5, c.Score, scoreDelta)
require.Equal(t, []string{c2, c1}, c.Commits)
} else {
require.InDelta(t, 0.5*0.5, c.Score, scoreDelta)
require.Equal(t, []string{c.Revision}, c.Commits)
require.True(t, perfIdx > -1)
// Run the Test task at tip of tree; its blamelist covers both commits.
t3 := makeTask(ctx, tcc_testutils.TestTaskName, rs1.Repo, c2)
t3.Commits = []string{c2, c1}
t3.Status = types.TASK_STATUS_SUCCESS
t3.IsolatedOutput = "ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff/256"
require.NoError(t, s.putTask(ctx, t3))
// Regenerate the task queue.
queue, _, err = s.regenerateTaskQueue(ctx)
require.NoError(t, err)
// Now we expect the queue to contain one Test and one Perf task. The
// Test task is a backfill, and should have a score of 0.5, scaled by
// the priority of 0.5.
require.Len(t, queue, 2)
// First candidate should be the perf task.
require.Equal(t, tcc_testutils.PerfTaskName, queue[0].Name)
require.InDelta(t, 2.0*0.5, queue[0].Score, scoreDelta)
// The test task is next, a backfill.
require.Equal(t, tcc_testutils.TestTaskName, queue[1].Name)
require.InDelta(t, 0.5*0.5, queue[1].Score, scoreDelta)
func makeTaskCandidate(name string, dims []string) *TaskCandidate {
return &TaskCandidate{
Score: 1.0,
TaskKey: types.TaskKey{
Name: name,
TaskSpec: &specs.TaskSpec{
Dimensions: dims,
func makeSwarmingBot(id string, dims []string) *types.Machine {
return &types.Machine{
ID: id,
Dimensions: dims,
// TestGetCandidatesToSchedule checks which candidates getCandidatesToSchedule
// selects for various combinations of bots and task candidates, and the
// Scheduling diagnostics it records on each candidate.
func TestGetCandidatesToSchedule(t *testing.T) {
ctx := context.Background()
// Empty lists.
rv := getCandidatesToSchedule(ctx, []*types.Machine{}, []*TaskCandidate{})
require.Empty(t, rv)
// checkDiags takes a list of bots with the same dimensions and a list of
// ordered candidates that match those bots and checks the Diagnostics for
// candidates.
checkDiags := func(bots []*types.Machine, candidates []*TaskCandidate) {
var expectedBots []string
if len(bots) > 0 {
expectedBots = make([]string, len(bots), len(bots))
for i, b := range bots {
expectedBots[i] = b.ID
for i, c := range candidates {
// These conditions are not tested.
require.False(t, c.Diagnostics.Scheduling.OverSchedulingLimitPerTaskSpec)
require.False(t, c.Diagnostics.Scheduling.ScoreBelowThreshold)
// NoBotsAvailable and MatchingBots will be the same for all candidates.
require.Equal(t, len(expectedBots) == 0, c.Diagnostics.Scheduling.NoBotsAvailable)
require.Equal(t, expectedBots, c.Diagnostics.Scheduling.MatchingBots)
require.Equal(t, i, c.Diagnostics.Scheduling.NumHigherScoreSimilarCandidates)
if i == 0 {
// First candidate.
require.Nil(t, c.Diagnostics.Scheduling.LastSimilarCandidate)
} else {
last := candidates[i-1]
require.Equal(t, &last.TaskKey, c.Diagnostics.Scheduling.LastSimilarCandidate)
// Only the first len(bots) candidates can be given a bot.
require.Equal(t, i < len(bots), c.Diagnostics.Scheduling.Selected)
// Clear diagnostics for next test.
for _, c := range candidates {
c.Diagnostics = nil
t1 := makeTaskCandidate("task1", []string{"k:v"})
rv = getCandidatesToSchedule(ctx, []*types.Machine{}, []*TaskCandidate{t1})
require.Empty(t, rv)
checkDiags([]*types.Machine{}, []*TaskCandidate{t1})
b1 := makeSwarmingBot("bot1", []string{"k:v"})
rv = getCandidatesToSchedule(ctx, []*types.Machine{b1}, []*TaskCandidate{})
require.Empty(t, rv)
// Single match.
rv = getCandidatesToSchedule(ctx, []*types.Machine{b1}, []*TaskCandidate{t1})
assertdeep.Equal(t, []*TaskCandidate{t1}, rv)
checkDiags([]*types.Machine{b1}, []*TaskCandidate{t1})
// No match.
t1.TaskSpec.Dimensions[0] = "k:v2"
rv = getCandidatesToSchedule(ctx, []*types.Machine{b1}, []*TaskCandidate{t1})
require.Empty(t, rv)
checkDiags([]*types.Machine{}, []*TaskCandidate{t1})
// Add a task candidate to match b1.
t1 = makeTaskCandidate("task1", []string{"k:v2"})
t2 := makeTaskCandidate("task2", []string{"k:v"})
rv = getCandidatesToSchedule(ctx, []*types.Machine{b1}, []*TaskCandidate{t1, t2})
assertdeep.Equal(t, []*TaskCandidate{t2}, rv)
checkDiags([]*types.Machine{}, []*TaskCandidate{t1})
checkDiags([]*types.Machine{b1}, []*TaskCandidate{t2})
// Switch the task order.
t1 = makeTaskCandidate("task1", []string{"k:v2"})
t2 = makeTaskCandidate("task2", []string{"k:v"})
rv = getCandidatesToSchedule(ctx, []*types.Machine{b1}, []*TaskCandidate{t2, t1})
assertdeep.Equal(t, []*TaskCandidate{t2}, rv)
checkDiags([]*types.Machine{}, []*TaskCandidate{t1})
checkDiags([]*types.Machine{b1}, []*TaskCandidate{t2})
// Make both tasks match the bot, ensure that we pick the first one.
t1 = makeTaskCandidate("task1", []string{"k:v"})
t2 = makeTaskCandidate("task2", []string{"k:v"})
rv = getCandidatesToSchedule(ctx, []*types.Machine{b1}, []*TaskCandidate{t1, t2})
assertdeep.Equal(t, []*TaskCandidate{t1}, rv)
checkDiags([]*types.Machine{b1}, []*TaskCandidate{t1, t2})
rv = getCandidatesToSchedule(ctx, []*types.Machine{b1}, []*TaskCandidate{t2, t1})
assertdeep.Equal(t, []*TaskCandidate{t2}, rv)
checkDiags([]*types.Machine{b1}, []*TaskCandidate{t2, t1})
// Multiple dimensions. Ensure that different permutations of the bots
// and tasks lists give us the expected results.
dims := []string{"k:v", "k2:v2", "k3:v3"}
b1 = makeSwarmingBot("bot1", dims)
// NOTE(review): b2 shares (aliases) the current t1's dimension slice,
// i.e. {"k:v"}. t1 is reassigned right below, so the alias is harmless
// here, but passing a fresh slice would be less fragile.
b2 := makeSwarmingBot("bot2", t1.TaskSpec.Dimensions)
t1 = makeTaskCandidate("task1", []string{"k:v"})
t2 = makeTaskCandidate("task2", dims)
// In the first two cases, the task with fewer dimensions has the
// higher priority. It gets the bot with more dimensions because it
// is first in sorted order. The second task does not get scheduled
// because there is no bot available which can run it.
// TODO(borenet): Use a more optimal solution to avoid this case.
rv = getCandidatesToSchedule(ctx, []*types.Machine{b1, b2}, []*TaskCandidate{t1, t2})
assertdeep.Equal(t, []*TaskCandidate{t1}, rv)
// Can't use checkDiags for these cases.
require.Equal(t, []string{b1.ID, b2.ID}, t1.Diagnostics.Scheduling.MatchingBots)
require.Equal(t, 0, t1.Diagnostics.Scheduling.NumHigherScoreSimilarCandidates)
require.Nil(t, t1.Diagnostics.Scheduling.LastSimilarCandidate)
require.True(t, t1.Diagnostics.Scheduling.Selected)
t1.Diagnostics = nil
require.Equal(t, []string{b1.ID}, t2.Diagnostics.Scheduling.MatchingBots)
require.Equal(t, 1, t2.Diagnostics.Scheduling.NumHigherScoreSimilarCandidates)
require.Equal(t, &t1.TaskKey, t2.Diagnostics.Scheduling.LastSimilarCandidate)
require.False(t, t2.Diagnostics.Scheduling.Selected)
t2.Diagnostics = nil
t1 = makeTaskCandidate("task1", []string{"k:v"})
t2 = makeTaskCandidate("task2", dims)
rv = getCandidatesToSchedule(ctx, []*types.Machine{b2, b1}, []*TaskCandidate{t1, t2})
assertdeep.Equal(t, []*TaskCandidate{t1}, rv)
require.Equal(t, []string{b1.ID, b2.ID}, t1.Diagnostics.Scheduling.MatchingBots)
require.Equal(t, 0, t1.Diagnostics.Scheduling.NumHigherScoreSimilarCandidates)
require.Nil(t, t1.Diagnostics.Scheduling.LastSimilarCandidate)
require.True(t, t1.Diagnostics.Scheduling.Selected)
t1.Diagnostics = nil
require.Equal(t, []string{b1.ID}, t2.Diagnostics.Scheduling.MatchingBots)
require.Equal(t, 1, t2.Diagnostics.Scheduling.NumHigherScoreSimilarCandidates)
require.Equal(t, &t1.TaskKey, t2.Diagnostics.Scheduling.LastSimilarCandidate)
require.False(t, t2.Diagnostics.Scheduling.Selected)
t2.Diagnostics = nil
// In these two cases, the task with more dimensions has the higher
// priority. Both tasks get scheduled.
t1 = makeTaskCandidate("task1", []string{"k:v"})
t2 = makeTaskCandidate("task2", dims)
rv = getCandidatesToSchedule(ctx, []*types.Machine{b1, b2}, []*TaskCandidate{t2, t1})
assertdeep.Equal(t, []*TaskCandidate{t2, t1}, rv)
require.Equal(t, []string{b1.ID, b2.ID}, t1.Diagnostics.Scheduling.MatchingBots)
require.Equal(t, 1, t1.Diagnostics.Scheduling.NumHigherScoreSimilarCandidates)
require.Equal(t, &t2.TaskKey, t1.Diagnostics.Scheduling.LastSimilarCandidate)
require.True(t, t1.Diagnostics.Scheduling.Selected)
t1.Diagnostics = nil
require.Equal(t, []string{b1.ID}, t2.Diagnostics.Scheduling.MatchingBots)
require.Equal(t, 0, t2.Diagnostics.Scheduling.NumHigherScoreSimilarCandidates)
require.Nil(t, t2.Diagnostics.Scheduling.LastSimilarCandidate)
require.True(t, t2.Diagnostics.Scheduling.Selected)
t2.Diagnostics = nil
t1 = makeTaskCandidate("task1", []string{"k:v"})
t2 = makeTaskCandidate("task2", dims)
rv = getCandidatesToSchedule(ctx, []*types.Machine{b2, b1}, []*TaskCandidate{t2, t1})
assertdeep.Equal(t, []*TaskCandidate{t2, t1}, rv)
require.Equal(t, []string{b1.ID, b2.ID}, t1.Diagnostics.Scheduling.MatchingBots)
require.Equal(t, 1, t1.Diagnostics.Scheduling.NumHigherScoreSimilarCandidates)
require.Equal(t, &t2.TaskKey, t1.Diagnostics.Scheduling.LastSimilarCandidate)
require.True(t, t1.Diagnostics.Scheduling.Selected)
t1.Diagnostics = nil
require.Equal(t, []string{b1.ID}, t2.Diagnostics.Scheduling.MatchingBots)
require.Equal(t, 0, t2.Diagnostics.Scheduling.NumHigherScoreSimilarCandidates)
require.Nil(t, t2.Diagnostics.Scheduling.LastSimilarCandidate)
require.True(t, t2.Diagnostics.Scheduling.Selected)
t2.Diagnostics = nil
// Matching dimensions. More bots than tasks.
// b2 is recreated with the full dimension set so all three bots match.
b2 = makeSwarmingBot("bot2", dims)
b3 := makeSwarmingBot("bot3", dims)
t1 = makeTaskCandidate("task1", dims)
t2 = makeTaskCandidate("task2", dims)
t3 := makeTaskCandidate("task3", dims)
rv = getCandidatesToSchedule(ctx, []*types.Machine{b1, b2, b3}, []*TaskCandidate{t1, t2})
assertdeep.Equal(t, []*TaskCandidate{t1, t2}, rv)
checkDiags([]*types.Machine{b1, b2, b3}, []*TaskCandidate{t1, t2})
// More tasks than bots.
t1 = makeTaskCandidate("task1", dims)
t2 = makeTaskCandidate("task2", dims)
t3 = makeTaskCandidate("task3", dims)
rv = getCandidatesToSchedule(ctx, []*types.Machine{b1, b2}, []*TaskCandidate{t1, t2, t3})
assertdeep.Equal(t, []*TaskCandidate{t1, t2}, rv)
checkDiags([]*types.Machine{b1, b2}, []*TaskCandidate{t1, t2, t3})
func makeBot(id string, dims map[string]string) *types.Machine {
dimensions := make([]string, 0, len(dims))
for k, v := range dims {
dimensions = append(dimensions, fmt.Sprintf("%s:%s", k, v))
return &types.Machine{
ID: id,
Dimensions: dimensions,
// mockBots builds a SwarmingRpcsBotInfo for each given bot, converting the
// bot's "key:value" dimension strings via swarming.ParseDimensions and
// failing the test if they cannot be parsed.
// NOTE(review): as written here, swarmBots is built but never handed to
// swarmingClient; confirm that the call registering the bots is present.
func mockBots(t sktest.TestingT, swarmingClient *swarming_testutils.TestClient, bots ...*types.Machine) {
swarmBots := make([]*swarming_api.SwarmingRpcsBotInfo, 0, len(bots))
for _, bot := range bots {
dims, err := swarming.ParseDimensions(bot.Dimensions)
require.NoError(t, err)
swarmBots = append(swarmBots, &swarming_api.SwarmingRpcsBotInfo{
BotId: bot.ID,
Dimensions: swarming.StringMapToBotDimensions(dims),
// TestSchedulingE2E drives runMainLoop through a sequence of bot availability
// and task completion states and checks the tasks recorded in the cache and
// the length of the queue after each step.
func TestSchedulingE2E(t *testing.T) {
ctx, _, _, swarmingClient, s, _, cas, cleanup := setup(t)
defer cleanup()
c1 := rs1.Revision
c2 := rs2.Revision
// Start testing. No free bots, so we get a full queue with nothing
// scheduled.
runMainLoop(t, s, ctx)
tasks, err := s.tCache.GetTasksForCommits(rs1.Repo, []string{c1, c2})
require.NoError(t, err)
expect := map[string]map[string]*types.Task{
c1: {},
c2: {},
assertdeep.Equal(t, expect, tasks)
require.Equal(t, 2, len(s.queue)) // Two compile tasks.
// A bot is free but doesn't have all of the right dimensions to run a task.
bot1 := makeBot("bot1", map[string]string{"pool": "Skia"})
mockBots(t, swarmingClient, bot1)
runMainLoop(t, s, ctx)
tasks, err = s.tCache.GetTasksForCommits(rs1.Repo, []string{c1, c2})
require.NoError(t, err)
expect = map[string]map[string]*types.Task{
c1: {},
c2: {},
assertdeep.Equal(t, expect, tasks)
require.Equal(t, 2, len(s.queue)) // Still two compile tasks.
// One bot free, schedule a task, ensure it's not in the queue.
bot1.Dimensions = append(bot1.Dimensions, "os:Ubuntu")
mockBots(t, swarmingClient, bot1)
runMainLoop(t, s, ctx)
require.NoError(t, s.tCache.Update(ctx))
tasks, err = s.tCache.GetTasksForCommits(rs1.Repo, []string{c1, c2})
require.NoError(t, err)
t1 := tasks[c2][tcc_testutils.BuildTaskName]
require.NotNil(t, t1)
require.Equal(t, c2, t1.Revision)
require.Equal(t, tcc_testutils.BuildTaskName, t1.Name)
require.Equal(t, []string{c2, c1}, t1.Commits)
require.Equal(t, 1, len(s.queue))
// The task is complete.
t1.Status = types.TASK_STATUS_SUCCESS
t1.Finished = now.Now(ctx)
t1.IsolatedOutput = "dddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddd/86"
require.NoError(t, s.putTask(ctx, t1))
// NOTE(review): the next line is a bare list element with no enclosing
// call or composite literal here; confirm the statement it belongs to.
makeSwarmingRpcsTaskRequestMetadata(t, t1, linuxTaskDims),
// The Test and Perf tasks depend on t1, so their CAS inputs are merged
// with t1's output.
cas.On("Merge", testutils.AnyContext, []string{tcc_testutils.TestCASDigest, t1.IsolatedOutput}).Return("bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbabc123/56", nil)
cas.On("Merge", testutils.AnyContext, []string{tcc_testutils.PerfCASDigest, t1.IsolatedOutput}).Return("ccccccccccccccccccccccccccccccccccccccccccccccccccccccccccabc123/56", nil)
// No bots free. Ensure that the queue is correct.
mockBots(t, swarmingClient)
runMainLoop(t, s, ctx)
require.NoError(t, s.tCache.Update(ctx))
tasks, err = s.tCache.GetTasksForCommits(rs1.Repo, []string{c1, c2})
require.NoError(t, err)
for _, c := range t1.Commits {
expect[c][t1.Name] = t1
assertdeep.Equal(t, expect, tasks)
expectLen := 3 // One remaining build task, plus one test task and one perf task.
require.Equal(t, expectLen, len(s.queue))
// More bots than tasks free, ensure the queue is correct.
bot2 := makeBot("bot2", androidTaskDims)
bot3 := makeBot("bot3", androidTaskDims)
bot4 := makeBot("bot4", linuxTaskDims)
mockBots(t, swarmingClient, bot1, bot2, bot3, bot4)
runMainLoop(t, s, ctx)
require.NoError(t, s.tCache.Update(ctx))
_, err = s.tCache.GetTasksForCommits(rs1.Repo, []string{c1, c2})
require.NoError(t, err)
require.Equal(t, 0, len(s.queue))
// The build, test, and perf tasks should have triggered.
var t2 *types.Task
var t3 *types.Task
var t4 *types.Task
tasks, err = s.tCache.GetTasksForCommits(rs1.Repo, []string{c1, c2})
require.NoError(t, err)
require.Len(t, tasks, 2)
for commit, v := range tasks {
if commit == c1 {
// Build task at c1 and test task at c2 whose blamelist also has c1.
require.Len(t, v, 2)
for _, task := range v {
if task.Revision != commit {
require.Equal(t, tcc_testutils.BuildTaskName, task.Name)
require.Nil(t, t4)
t4 = task
require.Equal(t, c1, task.Revision)
require.Equal(t, []string{c1}, task.Commits)
} else {
require.Len(t, v, 3)
for _, task := range v {
if task.Name == tcc_testutils.TestTaskName {
require.Nil(t, t2)
t2 = task
require.Equal(t, c2, task.Revision)
require.Equal(t, []string{c2, c1}, task.Commits)
} else if task.Name == tcc_testutils.PerfTaskName {
require.Nil(t, t3)
t3 = task
require.Equal(t, c2, task.Revision)
require.Equal(t, []string{c2}, task.Commits)
} else {
// This is the first task we triggered.
require.Equal(t, tcc_testutils.BuildTaskName, task.Name)
require.NotNil(t, t2)
require.NotNil(t, t3)
require.NotNil(t, t4)
t4.Status = types.TASK_STATUS_SUCCESS
t4.Finished = now.Now(ctx)
t4.IsolatedOutput = "eeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee/99"
require.NoError(t, s.putTask(ctx, t4))
// No new bots free; only the remaining test task should be in the queue.
mockBots(t, swarmingClient)
mockTasks := []*swarming_api.SwarmingRpcsTaskRequestMetadata{
makeSwarmingRpcsTaskRequestMetadata(t, t2, linuxTaskDims),
makeSwarmingRpcsTaskRequestMetadata(t, t3, linuxTaskDims),
makeSwarmingRpcsTaskRequestMetadata(t, t4, linuxTaskDims),
require.NoError(t, s.updateUnfinishedTasks(ctx))
runMainLoop(t, s, ctx)
require.NoError(t, s.tCache.Update(ctx))
expectLen = 1 // Test task from c1
require.Equal(t, expectLen, len(s.queue))
// Finish the other task.
t3, err = s.tCache.GetTask(t3.Id)
require.NoError(t, err)
t3.Status = types.TASK_STATUS_SUCCESS
t3.Finished = now.Now(ctx)
t3.IsolatedOutput = "ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff/256"
// Ensure that we finalize all of the tasks and insert into the DB.
mockBots(t, swarmingClient, bot1, bot2, bot3, bot4)
mockTasks = []*swarming_api.SwarmingRpcsTaskRequestMetadata{
makeSwarmingRpcsTaskRequestMetadata(t, t3, linuxTaskDims),
cas.On("Merge", testutils.AnyContext, []string{tcc_testutils.TestCASDigest, t4.IsolatedOutput}).Return("bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbabc123/56", nil)
runMainLoop(t, s, ctx)
require.NoError(t, s.tCache.Update(ctx))
tasks, err = s.tCache.GetTasksForCommits(rs1.Repo, []string{c1, c2})
require.NoError(t, err)
require.Equal(t, 2, len(tasks[c1]))
require.Equal(t, 3, len(tasks[c2]))
require.Equal(t, 0, len(s.queue))
// Mark everything as finished. Ensure that the queue still ends up empty.
tasksList := []*types.Task{}
for _, v := range tasks {
for _, task := range v {
if task.Status != types.TASK_STATUS_SUCCESS {
task.Status = types.TASK_STATUS_SUCCESS
task.Finished = now.Now(ctx)
task.IsolatedOutput = "abcd1234abcd1234abcd1234abcd1234abcd1234abcd1234abcd1234abcd1234/56"
tasksList = append(tasksList, task)
mockTasks = make([]*swarming_api.SwarmingRpcsTaskRequestMetadata, 0, len(tasksList))
for _, task := range tasksList {
mockTasks = append(mockTasks, makeSwarmingRpcsTaskRequestMetadata(t, task, linuxTaskDims))
mockBots(t, swarmingClient, bot1, bot2, bot3, bot4)
require.NoError(t, s.updateUnfinishedTasks(ctx))
runMainLoop(t, s, ctx)
require.Equal(t, 0, len(s.queue))
// TestSchedulerStealingFrom verifies that backfill tasks bisect existing
// blamelists. Each new task's commits must be removed from the task it stole
// from, and together the two blamelists must cover exactly the old one.
func TestSchedulerStealingFrom(t *testing.T) {
ctx, mg, _, swarmingClient, s, _, _, cleanup := setup(t)
defer cleanup()
c1 := rs1.Revision
c2 := rs2.Revision
// Run the available compile task at c2.
bot1 := makeBot("bot1", linuxTaskDims)
bot2 := makeBot("bot2", linuxTaskDims)
mockBots(t, swarmingClient, bot1, bot2)
runMainLoop(t, s, ctx)
require.NoError(t, s.tCache.Update(ctx))
tasks, err := s.tCache.GetTasksForCommits(rs1.Repo, []string{c1, c2})
require.NoError(t, err)
require.Equal(t, 1, len(tasks[c1]))
require.Equal(t, 1, len(tasks[c2]))
// Finish the one task.
tasksList := []*types.Task{}
t1 := tasks[c2][tcc_testutils.BuildTaskName]
t1.Status = types.TASK_STATUS_SUCCESS
t1.Finished = now.Now(ctx)
t1.IsolatedOutput = "abc123"
tasksList = append(tasksList, t1)
// Forcibly create and insert a second task at c1.
t2 := t1.Copy()
t2.Id = "t2id"
t2.Revision = c1
t2.Commits = []string{c1}
tasksList = append(tasksList, t2)
require.NoError(t, s.putTasks(ctx, tasksList))
// Add some commits.
commits := mg.CommitN(10)
require.NoError(t, s.repos[rs1.Repo].Update(ctx))
for _, h := range commits {
rs := types.RepoState{
Repo: rs1.Repo,
Revision: h,
fillCaches(t, ctx, s.taskCfgCache, rs, tcc_testutils.TasksCfg2)
insertJobs(t, ctx, s, rs)
// Run one task. Ensure that it's at tip-of-tree.
head := s.repos[rs1.Repo].Get(git.MainBranch).Hash
mockBots(t, swarmingClient, bot1)
runMainLoop(t, s, ctx)
require.NoError(t, s.tCache.Update(ctx))
tasks, err = s.tCache.GetTasksForCommits(rs1.Repo, commits)
require.NoError(t, err)
require.Len(t, tasks[head], 1)
task := tasks[head][tcc_testutils.BuildTaskName]
require.Equal(t, head, task.Revision)
// The head task's blamelist is expected to be all ten new commits.
// NOTE(review): commits[:] is the same slice as commits, not a copy.
expect := commits[:]
assertdeep.Equal(t, expect, task.Commits)
task.Status = types.TASK_STATUS_SUCCESS
task.Finished = now.Now(ctx)
task.IsolatedOutput = "abc123"
require.NoError(t, s.putTask(ctx, task))
oldTasksByCommit := tasks
// Run backfills, ensuring that each one steals the right set of commits
// from previous builds, until all of the build task candidates have run.
// With the head task covering all ten new commits, nine more runs are
// expected to exhaust the build candidates (checked after the loop).
for i := 0; i < 9; i++ {
// Now, run another task. The new task should bisect the old one.
mockBots(t, swarmingClient, bot1)
runMainLoop(t, s, ctx)
require.NoError(t, s.tCache.Update(ctx))
tasks, err = s.tCache.GetTasksForCommits(rs1.Repo, commits)
require.NoError(t, err)
var newTask *types.Task
for _, v := range tasks {
for _, task := range v {
if task.Status == types.TASK_STATUS_PENDING {
require.True(t, newTask == nil || task.Id == newTask.Id)
newTask = task
require.NotNil(t, newTask)
oldTask := oldTasksByCommit[newTask.Revision][newTask.Name]
require.NotNil(t, oldTask)
require.True(t, util.In(newTask.Revision, oldTask.Commits))
// Find the updated old task.
updatedOldTask, err := s.tCache.GetTask(oldTask.Id)
require.NoError(t, err)
require.NotNil(t, updatedOldTask)
// Ensure that the blamelists are correct.
old := util.NewStringSet(oldTask.Commits)
new := util.NewStringSet(newTask.Commits)
updatedOld := util.NewStringSet(updatedOldTask.Commits)
assertdeep.Equal(t, old, new.Union(updatedOld))
require.Equal(t, 0, len(new.Intersect(updatedOld)))
// Finish the new task.
newTask.Status = types.TASK_STATUS_SUCCESS
newTask.Finished = now.Now(ctx)
newTask.IsolatedOutput = "abc123"
require.NoError(t, s.putTask(ctx, newTask))
oldTasksByCommit = tasks
// Ensure that we're really done.
mockBots(t, swarmingClient, bot1)
runMainLoop(t, s, ctx)
require.NoError(t, s.tCache.Update(ctx))
tasks, err = s.tCache.GetTasksForCommits(rs1.Repo, commits)
require.NoError(t, err)
var newTask *types.Task
for _, v := range tasks {
for _, task := range v {
if task.Status == types.TASK_STATUS_PENDING {
require.True(t, newTask == nil || task.Id == newTask.Id)
newTask = task
require.Nil(t, newTask)
// spyDB calls onPutTasks before delegating PutTask(s) to DB.
type spyDB struct {
onPutTasks func([]*types.Task)
func (s *spyDB) PutTask(ctx context.Context, task *types.Task) error {
return s.DB.PutTask(ctx, task)
func (s *spyDB) PutTasks(ctx context.Context, tasks []*types.Task) error {
return s.DB.PutTasks(ctx, tasks)
// testMultipleCandidatesBackfillingEachOtherSetup builds a TaskScheduler over
// an in-memory repo whose TasksCfg has a single task ("faketask") in a single
// job ("j1"). It runs one scheduling cycle so that the task at the first
// commit is triggered, then adds eight more commits with jobs.
//
// It returns the context, the MemGit repo, the DB, the scheduler, the Swarming
// test client, the eight new commit hashes, a helper that marks a task as
// successful and appends Swarming metadata for it to mockTasks, the TasksCfg,
// and a cleanup func.
func testMultipleCandidatesBackfillingEachOtherSetup(t *testing.T) (context.Context, *mem_git.MemGit, db.DB, *TaskScheduler, *swarming_testutils.TestClient, []string, func(*types.Task), *specs.TasksCfg, func()) {
// NOTE(review): as written, neither cancel nor btCleanup (below) is called by
// the returned cleanup func; confirm they are released.
ctx, cancel := context.WithCancel(context.Background())
// NOTE(review): ioutil.TempDir is deprecated in favor of os.MkdirTemp.
workdir, err := ioutil.TempDir("", "")
require.NoError(t, err)
// Setup the scheduler.
d := memory.NewInMemoryDB()
swarmingClient := swarming_testutils.NewTestClient()
mg, repo := newMemRepo(t)
repos := repograph.Map{
rs1.Repo: repo,
btProject, btInstance, btCleanup := tcc_testutils.SetupBigTable(t)
taskCfgCache, err := task_cfg_cache.NewTaskCfgCache(ctx, repos, btProject, btInstance, nil)
require.NoError(t, err)
// Create a single task in the config.
taskName := "faketask"
cfg := &specs.TasksCfg{
CasSpecs: map[string]*specs.CasSpec{
"compile": {
Digest: tcc_testutils.CompileCASDigest,
Tasks: map[string]*specs.TaskSpec{
taskName: {
CasSpec: "compile",
CipdPackages: []*specs.CipdPackage{},
Dependencies: []string{},
Dimensions: []string{"pool:Skia"},
Priority: 1.0,
Jobs: map[string]*specs.JobSpec{
"j1": {
TaskSpecs: []string{taskName},
hashes := mg.CommitN(1)
// mockTasks accumulates Swarming metadata for tasks passed to mock.
mockTasks := []*swarming_api.SwarmingRpcsTaskRequestMetadata{}
mock := func(task *types.Task) {
task.Status = types.TASK_STATUS_SUCCESS
task.Finished = now.Now(ctx)
task.IsolatedOutput = tcc_testutils.CompileCASDigest
mockTasks = append(mockTasks, makeSwarmingRpcsTaskRequestMetadata(t, task, linuxTaskDims))
// Create the TaskScheduler.
cas := &mocks.CAS{}
// Go ahead and mock the single-input merge calls.
cas.On("Merge", testutils.AnyContext, []string{tcc_testutils.CompileCASDigest}).Return(tcc_testutils.CompileCASDigest, nil)
cas.On("Merge", testutils.AnyContext, []string{tcc_testutils.TestCASDigest}).Return(tcc_testutils.TestCASDigest, nil)
cas.On("Merge", testutils.AnyContext, []string{tcc_testutils.PerfCASDigest}).Return(tcc_testutils.PerfCASDigest, nil)
// The same Swarming executor serves both the explicit and default executor keys.
taskExec := swarming_task_execution.NewSwarmingTaskExecutor(swarmingClient, "fake-cas-instance", "")
taskExecs := map[string]types.TaskExecutor{
types.TaskExecutor_Swarming: taskExec,
types.TaskExecutor_UseDefault: taskExec,
// NOTE(review): time.Duration(math.MaxInt64) presumably makes this duration
// argument effectively unbounded; confirm against NewTaskScheduler's signature.
s, err := NewTaskScheduler(ctx, d, nil, time.Duration(math.MaxInt64), 0, repos, cas, "fake-cas-instance", taskExecs, mockhttpclient.NewURLMock().Client(), 1.0, swarming.POOLS_PUBLIC, cdPoolName, "", taskCfgCache, nil, mem_gcsclient.New("diag_unit_tests"), btInstance, BusyBotsDebugLoggingOff)
require.NoError(t, err)
for _, h := range hashes {
rs := types.RepoState{
Repo: rs1.Repo,
Revision: h,
fillCaches(t, ctx, taskCfgCache, rs, cfg)
insertJobs(t, ctx, s, rs)
// Cycle once.
bot1 := makeBot("bot1", map[string]string{"pool": "Skia"})
mockBots(t, swarmingClient, bot1)
runMainLoop(t, s, ctx)
require.NoError(t, s.tCache.Update(ctx))
require.Equal(t, 0, len(s.queue))
head := s.repos[rs1.Repo].Get(git.MainBranch).Hash
tasks, err := s.tCache.GetTasksForCommits(rs1.Repo, []string{head})
require.NoError(t, err)
require.Len(t, tasks[head], 1)
// Add some commits to the repo.
newHashes := mg.CommitN(8)
for _, h := range newHashes {
rs := types.RepoState{
Repo: rs1.Repo,
Revision: h,
fillCaches(t, ctx, taskCfgCache, rs, cfg)
insertJobs(t, ctx, s, rs)
require.NoError(t, s.repos[rs1.Repo].Update(ctx))
return ctx, mg, d, s, swarmingClient, newHashes, mock, cfg, func() {
testutils.AssertCloses(t, s)
testutils.RemoveAll(t, workdir)
func TestMultipleCandidatesBackfillingEachOther(t *testing.T) {
ctx, _, d, s, swarmingClient, commits, mock, _, cleanup := testMultipleCandidatesBackfillingEachOtherSetup(t)
defer cleanup()
// Trigger builds simultaneously.
bot1 := makeBot("bot1", map[string]string{"pool": "Skia"})
bot2 := makeBot("bot2", map[string]string{"pool": "Skia"})
bot3 := makeBot("bot3", map[string]string{"pool": "Skia"})
mockBots(t, swarmingClient, bot1, bot2, bot3)
runMainLoop(t, s, ctx)
require.NoError(t, s.tCache.Update(ctx))
require.Equal(t, 5, len(s.queue))
tasks, err := s.tCache.GetTasksForCommits(rs1.Repo, commits)
require.NoError(t, err)
// If we're queueing correctly, we should've triggered tasks at
// commits[0], commits[4], and either commits[2] or commits[6].
var t1, t2, t3 *types.Task
for _, byName := range tasks {
for _, task := range byName {
if task.Revision == commits[0] {
t1 = task
} else if task.Revision == commits[4] {
t2 = task
} else if task.Revision == commits[2] || task.Revision == commits[6] {
t3 = task
} else {
require.FailNow(t, fmt.Sprintf("Task has unknown revision: %+v", task))
require.NotNil(t, t1)
require.NotNil(t, t2)
require.NotNil(t, t3)
// Ensure that we got the blamelists right.
var expect1, expect2, expect3 []string
if t3.Revision == commits[2] {
expect1 = util.CopyStringSlice(commits[:2])
expect2 = util.CopyStringSlice(commits[4:])
expect3 = util.CopyStringSlice(commits[2:4])
} else {
expect1 = util.CopyStringSlice(commits[:4])
expect2 = util.CopyStringSlice(commits[4:6])
expect3 = util.CopyStringSlice(commits[6:])
assertdeep.Equal(t, expect1, t1.Commits)
assertdeep.Equal(t, expect2, t2.Commits)
assertdeep.Equal(t, expect3, t3.Commits)
// Just for good measure, check the task at the head of the queue.
expectIdx := 2
if t3.Revision == commits[expectIdx] {
expectIdx = 6
require.Equal(t, commits[expectIdx], s.queue[0].Revision)
retryCount := 0
causeConcurrentUpdate := func(tasks []*types.Task) {
// HACK(benjaminwagner): Filter out PutTask calls from
// updateUnfinishedTasks by looking for new tasks.
anyNew := false
for _, task := range tasks {
if util.TimeIsZero(task.DbModified) {
anyNew = true
if !anyNew {
if retryCount < 3 {
taskToUpdate := []*types.Task{t1, t2, t3}[retryCount]
taskInDb, err := d.GetTaskById(ctx, taskToUpdate.Id)
require.NoError(t, err)
taskInDb.Status = types.TASK_STATUS_SUCCESS
require.NoError(t, d.PutTask(ctx, taskInDb))
s.db = &spyDB{
DB: d,
onPutTasks: causeConcurrentUpdate,
// Run again with 5 bots to check the case where we bisect the same
// task twice.
bot4 := makeBot("bot4", map[string]string{"pool": "Skia"})
bot5 := makeBot("bot5", map[string]string{"pool": "Skia"})
mockBots(t, swarmingClient, bot1, bot2, bot3, bot4, bot5)
runMainLoop(t, s, ctx)
require.NoError(t, s.tCache.Update(ctx))
require.Equal(t, 0, len(s.queue))
require.Equal(t, 3, retryCount)
tasks, err = s.tCache.GetTasksForCommits(rs1.Repo, commits)
require.NoError(t, err)
for _, byName := range tasks {
for _, task := range byName {
require.Equal(t, 1, len(task.Commits))
require.Equal(t, task.Revision, task.Commits[0])
if util.In(task.Id, []string{t1.Id, t2.Id, t3.Id}) {
require.Equal(t, types.TASK_STATUS_SUCCESS, task.Status)
} else {
require.Equal(t, types.TASK_STATUS_PENDING, task.Status)
func TestSchedulingRetry(t *testing.T) {
ctx, _, _, swarmingClient, s, _, _, cleanup := setup(t)
defer cleanup()
c1 := rs1.Revision
// Run the available compile task at c2.
bot1 := makeBot("bot1", linuxTaskDims)
mockBots(t, swarmingClient, bot1)
runMainLoop(t, s, ctx)
require.NoError(t, s.tCache.Update(ctx))
tasks, err := s.tCache.UnfinishedTasks()
require.NoError(t, err)
require.Len(t, tasks, 1)
t1 := tasks[0]
require.NotNil(t, t1)
// Ensure c2, not c1.
require.NotEqual(t, c1, t1.Revision)
c2 := t1.Revision
// Forcibly add a second build task at c1.
t2 := t1.Copy()
t2.Id = "t2Id"
t2.Revision = c1
t2.Commits = []string{c1}
t1.Commits = []string{c2}
// One task successful, the other not.
t1.Status = types.TASK_STATUS_FAILURE
t1.Finished = now.Now(ctx)
t2.Status = types.TASK_STATUS_SUCCESS
t2.Finished = now.Now(ctx)
t2.IsolatedOutput = "abc123"
require.NoError(t, s.putTasks(ctx, []*types.Task{t1, t2}))
// Cycle. Ensure that we schedule a retry of t1.
prev := t1
i := 1
for {
runMainLoop(t, s, ctx)
require.NoError(t, s.tCache.Update(ctx))
tasks, err = s.tCache.UnfinishedTasks()
require.NoError(t, err)
if len(tasks) == 0 {
require.Len(t, tasks, 1)
retry := tasks[0]
require.NotNil(t, retry)
require.Equal(t, prev.Id, retry.RetryOf)
require.Equal(t, i, retry.Attempt)
require.Equal(t, c2, retry.Revision)
retry.Status = types.TASK_STATUS_FAILURE
retry.Finished = now.Now(ctx)
require.NoError(t, s.putTask(ctx, retry))
prev = retry
require.Equal(t, 5, i)
func TestParentTaskId(t *testing.T) {
ctx, _, _, swarmingClient, s, _, cas, cleanup := setup(t)
defer cleanup()
// Run the available compile task at c2.
bot1 := makeBot("bot1", linuxTaskDims)
mockBots(t, swarmingClient, bot1)
runMainLoop(t, s, ctx)
require.NoError(t, s.tCache.Update(ctx))
tasks, err := s.tCache.UnfinishedTasks()
require.NoError(t, err)
require.Len(t, tasks, 1)
t1 := tasks[0]
t1.Status = types.TASK_STATUS_SUCCESS
t1.Finished = now.Now(ctx)
t1.IsolatedOutput = "abc123/45"
require.Equal(t, 0, len(t1.ParentTaskIds))
require.NoError(t, s.putTasks(ctx, []*types.Task{t1}))
// Run the dependent tasks. Ensure that their parent IDs are correct.
bot3 := makeBot("bot3", androidTaskDims)
bot4 := makeBot("bot4", androidTaskDims)
mockBots(t, swarmingClient, bot3, bot4)
cas.On("Merge", testutils.AnyContext, []string{"bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb/42", "abc123/45"}).Return("bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbabc123/87", nil)
cas.On("Merge", testutils.AnyContext, []string{"cccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccc/42", "abc123/45"}).Return("ccccccccccccccccccccccccccccccccccccccccccccccccccccccccccabc123/87", nil)
runMainLoop(t, s, ctx)
require.NoError(t, s.tCache.Update(ctx))
tasks, err = s.tCache.UnfinishedTasks()
require.NoError(t, err)
require.Len(t, tasks, 2)
for _, task := range tasks {
require.Equal(t, 1, len(task.ParentTaskIds))
p := task.ParentTaskIds[0]
require.Equal(t, p, t1.Id)
updated, err := task.UpdateFromTaskResult(&types.TaskResult{
ID: task.SwarmingTaskId,
CasOutput: task.IsolatedOutput,
Created: task.Created,
Finished: task.Finished,
Started: task.Started,
Status: task.Status,
Tags: map[string][]string{
types.SWARMING_TAG_ID: {task.Id},
types.SWARMING_TAG_NAME: {task.Name},
types.SWARMING_TAG_REPO: {task.Repo},
types.SWARMING_TAG_REVISION: {task.Revision},
types.SWARMING_TAG_PARENT_TASK_ID: task.ParentTaskIds,
types.SWARMING_TAG_FORCED_JOB_ID: {task.ForcedJobId},
MachineID: task.SwarmingBotId,
require.NoError(t, err)
require.False(t, updated)
func TestSkipTasks(t *testing.T) {
// skip_tasks has its own tests, so this test just verifies that it's
// actually integrated into the scheduler.
ctx, _, _, swarmingClient, s, _, _, cleanup := setup(t)
defer cleanup()
c, cleanupfs := ftestutils.NewClientForTesting(context.Background(), t)
defer cleanupfs()
bl, err := skip_tasks.New(context.Background(), c)
require.NoError(t, err)
s.skipTasks = bl
c1 := rs1.Revision
// Mock some bots, add one of the build tasks to the skip_tasks list.
bot1 := makeBot("bot1", linuxTaskDims)
bot2 := makeBot("bot2", linuxTaskDims)
mockBots(t, swarmingClient, bot1, bot2)
require.NoError(t, s.GetSkipTasks().AddRule(ctx, &skip_tasks.Rule{
AddedBy: "Tests",
TaskSpecPatterns: []string{".*"},
Commits: []string{c1},
Description: "desc",
Name: "My-Rule",
}, s.repos))
runMainLoop(t, s, ctx)
require.NoError(t, s.tCache.Update(ctx))
tasks, err := s.tCache.UnfinishedTasks()
require.NoError(t, err)
// The skipped commit should not have been triggered.
require.Len(t, tasks, 1)
require.NotEqual(t, c1, tasks[0].Revision)
// Candidate diagnostics should indicate the skip rule.
diag := lastDiagnostics(t, s)
foundSkipped := 0
for _, c := range diag.Candidates {
if c.Revision == c1 {
require.Equal(t, "My-Rule", c.Diagnostics.Filtering.SkippedByRule)
} else if c.TaskKey == tasks[0].TaskKey {
require.Nil(t, c.Diagnostics.Filtering)
} else {
require.Equal(t, "", c.Diagnostics.Filtering.SkippedByRule)
require.True(t, len(c.Diagnostics.Filtering.UnmetDependencies) > 0)
// Should be one Build task and one Test task skipped.
require.Equal(t, 2, foundSkipped)
func TestGetTasksForJob(t *testing.T) {
ctx, _, _, swarmingClient, s, _, cas, cleanup := setup(t)
defer cleanup()
c1 := rs1.Revision
c2 := rs2.Revision
// Cycle once, check that we have empty sets for all Jobs.
runMainLoop(t, s, ctx)
jobs, err := s.jCache.UnfinishedJobs()
require.NoError(t, err)
require.Len(t, jobs, 5)
var j1, j2, j3, j4, j5 *types.Job
for _, j := range jobs {
if j.Revision == c1 {
if j.Name == tcc_testutils.BuildTaskName {
j1 = j
} else {
j2 = j
} else {
if j.Name == tcc_testutils.BuildTaskName {
j3 = j
} else if j.Name == tcc_testutils.TestTaskName {
j4 = j
} else {
j5 = j
tasksByName, err := s.getTasksForJob(j)
require.NoError(t, err)
for _, tasks := range tasksByName {
require.Empty(t, tasks)
require.NotNil(t, j1)
require.NotNil(t, j2)
require.NotNil(t, j3)
require.NotNil(t, j4)
require.NotNil(t, j5)
// Run the available compile task at c2.
bot1 := makeBot("bot1", linuxTaskDims)
mockBots(t, swarmingClient, bot1)
runMainLoop(t, s, ctx)
require.NoError(t, s.tCache.Update(ctx))
tasks, err := s.tCache.UnfinishedTasks()
require.NoError(t, err)
require.Len(t, tasks, 1)
t1 := tasks[0]
require.NotNil(t, t1)
require.Equal(t, t1.Revision, c2)
// Test that we get the new tasks where applicable.
expect := map[string]map[string][]*types.Task{
j1.Id: {
tcc_testutils.BuildTaskName: {},
j2.Id: {
tcc_testutils.BuildTaskName: {},
tcc_testutils.TestTaskName: {},
j3.Id: {
tcc_testutils.BuildTaskName: {t1},
j4.Id: {
tcc_testutils.BuildTaskName: {t1},
tcc_testutils.TestTaskName: {},
j5.Id: {
tcc_testutils.BuildTaskName: {t1},
tcc_testutils.PerfTaskName: {},
for _, j := range jobs {
tasksByName, err := s.getTasksForJob(j)
require.NoError(t, err)
assertdeep.Equal(t, expect[j.Id], tasksByName)
// Mark the task as failed.
t1.Status = types.TASK_STATUS_FAILURE
t1.Finished = now.Now(ctx)
require.NoError(t, s.putTasks(ctx, []*types.Task{t1}))
// Test that the results propagated through.
for _, j := range jobs {
tasksByName, err := s.getTasksForJob(j)
require.NoError(t, err)
assertdeep.Equal(t, expect[j.Id], tasksByName)
// Cycle. Ensure that we schedule a retry of t1.
// Need two bots, since the retry will score lower than the Build task at c1.
bot2 := makeBot("bot2", linuxTaskDims)
mockBots(t, swarmingClient, bot1, bot2)
runMainLoop(t, s, ctx)
require.NoError(t, s.tCache.Update(ctx))
tasks, err = s.tCache.UnfinishedTasks()
require.NoError(t, err)
require.Len(t, tasks, 2)
var t2, t3 *types.Task
for _, task := range tasks {
if task.TaskKey == t1.TaskKey {
t2 = task
} else {
t3 = task
require.NotNil(t, t2)
require.Equal(t, t1.Id, t2.RetryOf)
// Verify that both the original t1 and its retry show up.
t1, err = s.tCache.GetTask(t1.Id) // t1 was updated.
require.NoError(t, err)
expect[j1.Id][tcc_testutils.BuildTaskName] = []*types.Task{t3}
expect[j2.Id][tcc_testutils.BuildTaskName] = []*types.Task{t3}
expect[j3.Id][tcc_testutils.BuildTaskName] = []*types.Task{t1, t2}
expect[j4.Id][tcc_testutils.BuildTaskName] = []*types.Task{t1, t2}
expect[j5.Id][tcc_testutils.BuildTaskName] = []*types.Task{t1, t2}
for _, j := range jobs {
tasksByName, err := s.getTasksForJob(j)
require.NoError(t, err)
assertdeep.Equal(t, expect[j.Id], tasksByName)
// The retry succeeded.
t2.Status = types.TASK_STATUS_SUCCESS
t2.Finished = now.Now(ctx)
t2.IsolatedOutput = "abc123/45"
// The Build at c1 failed.
t3.Status = types.TASK_STATUS_FAILURE
t3.Finished = now.Now(ctx)
require.NoError(t, s.putTasks(ctx, []*types.Task{t2, t3}))
mockBots(t, swarmingClient)
runMainLoop(t, s, ctx)
require.NoError(t, s.tCache.Update(ctx))
tasks, err = s.tCache.UnfinishedTasks()
require.NoError(t, err)
require.Empty(t, tasks)
// Test that the results propagated through.
for _, j := range jobs {
tasksByName, err := s.getTasksForJob(j)
require.NoError(t, err)
assertdeep.Equal(t, expect[j.Id], tasksByName)
// Schedule the remaining tasks.
bot3 := makeBot("bot3", androidTaskDims)
bot4 := makeBot("bot4", androidTaskDims)
bot5 := makeBot("bot5", androidTaskDims)
mockBots(t, swarmingClient, bot3, bot4, bot5)
cas.On("Merge", testutils.AnyContext, []string{"bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb/42", "abc123/45"}).Return("bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbabc123/87", nil)
cas.On("Merge", testutils.AnyContext, []string{"cccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccc/42", "abc123/45"}).Return("ccccccccccccccccccccccccccccccccccccccccccccccccccccccccccabc123/87", nil)
runMainLoop(t, s, ctx)
require.NoError(t, s.tCache.Update(ctx))
// Verify that the new tasks show up.
tasks, err = s.tCache.UnfinishedTasks()
require.NoError(t, err)
require.Len(t, tasks, 2) // Test and perf at c2.
var t4, t5 *types.Task
for _, task := range tasks {
if task.Name == tcc_testutils.TestTaskName {
t4 = task
} else {
t5 = task
expect[j4.Id][tcc_testutils.TestTaskName] = []*types.Task{t4}
expect[j5.Id][tcc_testutils.PerfTaskName] = []*types.Task{t5}
for _, j := range jobs {
tasksByName, err := s.getTasksForJob(j)
require.NoError(t, err)
assertdeep.Equal(t, expect[j.Id], tasksByName)
func TestTaskTimeouts(t *testing.T) {
ctx, mg, _, swarmingClient, s, _, _, cleanup := setup(t)
defer cleanup()
// The test repo does not set any timeouts. Ensure that we get
// reasonable default values.
bot1 := makeBot("bot1", map[string]string{"pool": "Skia", "os": "Ubuntu", "gpu": "none"})
mockBots(t, swarmingClient, bot1)
runMainLoop(t, s, ctx)
require.NoError(t, s.tCache.Update(ctx))
unfinished, err := s.tCache.UnfinishedTasks()
require.NoError(t, err)
require.Len(t, unfinished, 1)
task := unfinished[0]
swarmingTask, err := swarmingClient.GetTaskMetadata(ctx, task.SwarmingTaskId)
require.NoError(t, err)
// These are the defaults in go/swarming/swarming.go.
require.Equal(t, 1, len(swarmingTask.Request.TaskSlices))
require.Equal(t, int64(60*60), swarmingTask.Request.TaskSlices[0].Properties.ExecutionTimeoutSecs)
require.Equal(t, int64(20*60), swarmingTask.Request.TaskSlices[0].Properties.IoTimeoutSecs)
require.Equal(t, int64(4*60*60), swarmingTask.Request.TaskSlices[0].ExpirationSecs)
// Fail the task to get it out of the unfinished list.
task.Status = types.TASK_STATUS_FAILURE
require.NoError(t, s.putTask(ctx, task))
// Rewrite tasks.json with some timeouts.
name := "Timeout-Task"
cfg := &specs.TasksCfg{
CasSpecs: map[string]*specs.CasSpec{
"compile": {
Digest: tcc_testutils.CompileCASDigest,
Jobs: map[string]*specs.JobSpec{
"Timeout-Job": {
Priority: 1.0,
TaskSpecs: []string{name},
Tasks: map[string]*specs.TaskSpec{
name: {
CasSpec: "compile",
CipdPackages: []*specs.CipdPackage{},
Dependencies: []string{},
Dimensions: []string{
ExecutionTimeout: 40 * time.Minute,
Expiration: 2 * time.Hour,
IoTimeout: 3 * time.Minute,
Priority: 1.0,
hashes := mg.CommitN(1)
require.NoError(t, s.repos.Update(ctx))
rs := types.RepoState{
Repo: "fake.git",
Revision: hashes[0],
fillCaches(t, ctx, s.taskCfgCache, rs, cfg)
insertJobs(t, ctx, s, rs)
// Cycle, ensure that we get the expected timeouts.
bot2 := makeBot("bot2", map[string]string{"pool": "Skia", "os": "Mac", "gpu": "my-gpu"})
mockBots(t, swarmingClient, bot2)
runMainLoop(t, s, ctx)
require.NoError(t, s.tCache.Update(ctx))
unfinished, err = s.tCache.UnfinishedTasks()
require.NoError(t, err)
require.Len(t, unfinished, 1)
task = unfinished[0]
require.Equal(t, name, task.Name)
swarmingTask, err = swarmingClient.GetTaskMetadata(ctx, task.SwarmingTaskId)
require.NoError(t, err)
require.Equal(t, 1, len(swarmingTask.Request.TaskSlices))
require.Equal(t, int64(40*60), swarmingTask.Request.TaskSlices[0].Properties.ExecutionTimeoutSecs)
require.Equal(t, int64(3*60), swarmingTask.Request.TaskSlices[0].Properties.IoTimeoutSecs)
require.Equal(t, int64(2*60*60), swarmingTask.Request.TaskSlices[0].ExpirationSecs)
func TestUpdateUnfinishedTasks(t *testing.T) {
ctx, _, _, swarmingClient, s, _, _, cleanup := setup(t)
defer cleanup()
// Create a few tasks.
now := time.Unix(1480683321, 0).UTC()
t1 := &types.Task{
Id: "t1",
Created: now.Add(-time.Minute),
SwarmingTaskId: "swarmt1",
t2 := &types.Task{
Id: "t2",
Created: now.Add(-10 * time.Minute),
SwarmingTaskId: "swarmt2",
t3 := &types.Task{
Id: "t3",
Created: now.Add(-5 * time.Hour), // Outside the 4-hour window.
SwarmingTaskId: "swarmt3",
// Include a fake task to ensure it's ignored.
t4 := &types.Task{
Id: "t4",
Created: now.Add(-time.Minute),
// Insert the tasks into the DB.
tasks := []*types.Task{t1, t2, t3, t4}
require.NoError(t, s.putTasks(ctx, tasks))
// Update the tasks, mock in Swarming.
t1.Status = types.TASK_STATUS_SUCCESS
t1.IsolatedOutput = "dddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddd/86"
t2.Status = types.TASK_STATUS_FAILURE
t3.Status = types.TASK_STATUS_SUCCESS
t3.IsolatedOutput = "ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff/256"
m1 := makeSwarmingRpcsTaskRequestMetadata(t, t1, linuxTaskDims)
m2 := makeSwarmingRpcsTaskRequestMetadata(t, t2, linuxTaskDims)
m3 := makeSwarmingRpcsTaskRequestMetadata(t, t3, linuxTaskDims)
swarmingClient.MockTasks([]*swarming_api.SwarmingRpcsTaskRequestMetadata{m1, m2, m3})
// Assert that the third task doesn't show up in the time range query.
got, err := swarmingClient.ListTasks(ctx, now.Add(-4*time.Hour), now, []string{"pool:Skia"}, "")
require.NoError(t, err)
assertdeep.Equal(t, []*swarming_api.SwarmingRpcsTaskRequestMetadata{m1, m2}, got)
// Ensure that we update the tasks as expected.
require.NoError(t, s.updateUnfinishedTasks(ctx))
for _, task := range tasks {
got, err := s.db.GetTaskById(ctx, task.Id)
require.NoError(t, err)
// Ignore DbModified when comparing.
task.DbModified = got.DbModified
assertdeep.Equal(t, task, got)
// setupAddTasksTest calls setup then adds 7 commits to the repo and returns
// their hashes.
func setupAddTasksTest(t *testing.T) (context.Context, *mem_git.MemGit, []string, *memory.InMemoryDB, *TaskScheduler, func()) {
ctx, mg, d, _, s, _, _, cleanup := setup(t)
// Add some commits to test blamelist calculation.
require.NoError(t, s.repos.Update(ctx))
hashes, err := s.repos[rs1.Repo].Get(git.MainBranch).AllCommits()
require.NoError(t, err)
for _, hash := range hashes {
require.NoError(t, s.taskCfgCache.Set(ctx, types.RepoState{
Repo: rs1.Repo,
Revision: hash,
}, &specs.TasksCfg{
Tasks: map[string]*specs.TaskSpec{
"duty": {},
"onus": {},
"toil": {},
"work": {},
}, nil))
return ctx, mg, hashes, d, s, func() {
// assertBlamelist asserts task.Commits contains exactly the hashes at the given
// indexes of hashes, in any order.
func assertBlamelist(t *testing.T, hashes []string, task *types.Task, indexes []int) {
expected := util.NewStringSet()
for _, idx := range indexes {
expected[hashes[idx]] = true
require.Equal(t, expected, util.NewStringSet(task.Commits))
// assertModifiedTasks asserts that the result of GetModifiedTasks is deep-equal
// to expected, in any order.
func assertModifiedTasks(t *testing.T, d db.TaskReader, mod <-chan []*types.Task, expected []*types.Task) {
tasksById := map[string]*types.Task{}
require.Eventually(t, func() bool {
// Use a select so that the test will fail after 10 seconds
// rather than time out after 10 minutes (or whatever the
// overall timeout is set to).
select {
case modTasks := <-mod:
for _, task := range modTasks {
tasksById[task.Id] = task
for _, expectedTask := range expected {
actualTask, ok := tasksById[expectedTask.Id]
if !ok {
return false
if !deepequal.DeepEqual(expectedTask, actualTask) {
return false
return true
// Nothing to do.
return false
}, 10*time.Second, 50*time.Millisecond)
// addTasksSingleTaskSpec should add tasks and compute simple blamelists.
func TestAddTasksSingleTaskSpecSimple(t *testing.T) {
ctx, _, hashes, d, s, cleanup := setupAddTasksTest(t)
defer cleanup()
t1 := makeTask(ctx, "toil", rs1.Repo, hashes[6])
require.NoError(t, s.putTask(ctx, t1))
mod := d.ModifiedTasksCh(ctx)
<-mod // The first batch is unused.
t2 := makeTask(ctx, "toil", rs1.Repo, hashes[5]) // Commits should be {5}
t3 := makeTask(ctx, "toil", rs1.Repo, hashes[3]) // Commits should be {3, 4}
t4 := makeTask(ctx, "toil", rs1.Repo, hashes[2]) // Commits should be {2}
t5 := makeTask(ctx, "toil", rs1.Repo, hashes[0]) // Commits should be {0, 1}
// Clear Commits on some tasks, set incorrect Commits on others to
// ensure it's ignored.
t3.Commits = nil
t4.Commits = []string{hashes[5], hashes[4], hashes[3], hashes[2]}
// Specify tasks in wrong order to ensure results are deterministic.
require.NoError(t, s.addTasksSingleTaskSpec(ctx, []*types.Task{t5, t2, t3, t4}))
assertBlamelist(t, hashes, t2, []int{5})
assertBlamelist(t, hashes, t3, []int{3, 4})
assertBlamelist(t, hashes, t4, []int{2})
assertBlamelist(t, hashes, t5, []int{0, 1})
// Check that the tasks were inserted into the DB.
assertModifiedTasks(t, d, mod, []*types.Task{t2, t3, t4, t5})
// addTasksSingleTaskSpec should compute blamelists when new tasks bisect each
// other.
func TestAddTasksSingleTaskSpecBisectNew(t *testing.T) {
ctx, _, hashes, d, s, cleanup := setupAddTasksTest(t)
defer cleanup()
t1 := makeTask(ctx, "toil", rs1.Repo, hashes[6])
require.NoError(t, s.putTask(ctx, t1))
mod := d.ModifiedTasksCh(ctx)
<-mod // The first batch is unused.
// t2.Commits = {1, 2, 3, 4, 5}
t2 := makeTask(ctx, "toil", rs1.Repo, hashes[1])
// t3.Commits = {3, 4, 5}
// t2.Commits = {1, 2}
t3 := makeTask(ctx, "toil", rs1.Repo, hashes[3])
// t4.Commits = {4, 5}
// t3.Commits = {3}
t4 := makeTask(ctx, "toil", rs1.Repo, hashes[4])
// t5.Commits = {0}
t5 := makeTask(ctx, "toil", rs1.Repo, hashes[0])
// t6.Commits = {2}
// t2.Commits = {1}
t6 := makeTask(ctx, "toil", rs1.Repo, hashes[2])
// t7.Commits = {1}
// t2.Commits = {}
t7 := makeTask(ctx, "toil", rs1.Repo, hashes[1])
// Specify tasks in wrong order to ensure results are deterministic.
tasks := []*types.Task{t5, t2, t7, t3, t6, t4}
// Assign Ids.
for _, task := range tasks {
require.NoError(t, d.AssignId(ctx, task))
require.NoError(t, s.addTasksSingleTaskSpec(ctx, tasks))
assertBlamelist(t, hashes, t2, []int{})
assertBlamelist(t, hashes, t3, []int{3})
assertBlamelist(t, hashes, t4, []int{4, 5})
assertBlamelist(t, hashes, t5, []int{0})
assertBlamelist(t, hashes, t6, []int{2})
assertBlamelist(t, hashes, t7, []int{1})
// Check that the tasks were inserted into the DB.
assertModifiedTasks(t, d, mod, tasks)
// addTasksSingleTaskSpec should compute blamelists when new tasks bisect old
// tasks.
func TestAddTasksSingleTaskSpecBisectOld(t *testing.T) {
ctx, _, hashes, d, s, cleanup := setupAddTasksTest(t)
defer cleanup()
t1 := makeTask(ctx, "toil", rs1.Repo, hashes[6])
t2 := makeTask(ctx, "toil", rs1.Repo, hashes[1])
t2.Commits = []string{hashes[1], hashes[2], hashes[3], hashes[4], hashes[5]}
require.NoError(t, s.putTasks(ctx, []*types.Task{t1, t2}))
mod := d.ModifiedTasksCh(ctx)
<-mod // The first batch is unused.
// t3.Commits = {3, 4, 5}
// t2.Commits = {1, 2}
t3 := makeTask(ctx, "toil", rs1.Repo, hashes[3])
// t4.Commits = {4, 5}
// t3.Commits = {3}
t4 := makeTask(ctx, "toil", rs1.Repo, hashes[4])
// t5.Commits = {2}
// t2.Commits = {1}
t5 := makeTask(ctx, "toil", rs1.Repo, hashes[2])
// t6.Commits = {4, 5}
// t4.Commits = {}
t6 := makeTask(ctx, "toil", rs1.Repo, hashes[4])
// Specify tasks in wrong order to ensure results are deterministic.
require.NoError(t, s.addTasksSingleTaskSpec(ctx, []*types.Task{t5, t3, t6, t4}))
t2Updated, err := d.GetTaskById(ctx, t2.Id)
require.NoError(t, err)
assertBlamelist(t, hashes, t2Updated, []int{1})
assertBlamelist(t, hashes, t3, []int{3})
assertBlamelist(t, hashes, t4, []int{})
assertBlamelist(t, hashes, t5, []int{2})
assertBlamelist(t, hashes, t6, []int{4, 5})
// Check that the tasks were inserted into the DB.
t2.Commits = t2Updated.Commits
t2.DbModified = t2Updated.DbModified
assertModifiedTasks(t, d, mod, []*types.Task{t2, t3, t4, t5, t6})
// addTasksSingleTaskSpec should update existing tasks, keeping the correct
// blamelist.
func TestAddTasksSingleTaskSpecUpdate(t *testing.T) {
ctx, _, hashes, d, s, cleanup := setupAddTasksTest(t)
defer cleanup()
t1 := makeTask(ctx, "toil", rs1.Repo, hashes[6])
t2 := makeTask(ctx, "toil", rs1.Repo, hashes[3])
t3 := makeTask(ctx, "toil", rs1.Repo, hashes[4])
t3.Commits = nil // Stolen by t5
t4 := makeTask(ctx, "toil", rs1.Repo, hashes[0])
t4.Commits = []string{hashes[0], hashes[1], hashes[2]}
t5 := makeTask(ctx, "toil", rs1.Repo, hashes[4])
t5.Commits = []string{hashes[4], hashes[5]}
tasks := []*types.Task{t1, t2, t3, t4, t5}
require.NoError(t, s.putTasks(ctx, tasks))
mod := d.ModifiedTasksCh(ctx)
<-mod // The first batch is unused.
// Make an update.
for _, task := range tasks {
task.Status = types.TASK_STATUS_MISHAP
// Specify tasks in wrong order to ensure results are deterministic.
require.NoError(t, s.addTasksSingleTaskSpec(ctx, []*types.Task{t5, t3, t1, t4, t2}))
// Check that blamelists did not change.
assertBlamelist(t, hashes, t1, []int{6})
assertBlamelist(t, hashes, t2, []int{3})
assertBlamelist(t, hashes, t3, []int{})
assertBlamelist(t, hashes, t4, []int{0, 1, 2})
assertBlamelist(t, hashes, t5, []int{4, 5})
// Check that the tasks were inserted into the DB.
assertModifiedTasks(t, d, mod, []*types.Task{t1, t2, t3, t4, t5})
// AddTasks should call addTasksSingleTaskSpec for each group of tasks.
func TestAddTasks(t *testing.T) {
ctx, _, hashes, d, s, cleanup := setupAddTasksTest(t)
defer cleanup()
toil1 := makeTask(ctx, "toil", rs1.Repo, hashes[6])
duty1 := makeTask(ctx, "duty", rs1.Repo, hashes[6])
work1 := makeTask(ctx, "work", rs1.Repo, hashes[6])
work2 := makeTask(ctx, "work", rs1.Repo, hashes[1])
work2.Commits = []string{hashes[1], hashes[2], hashes[3], hashes[4], hashes[5]}
onus1 := makeTask(ctx, "onus", rs1.Repo, hashes[6])
onus2 := makeTask(ctx, "onus", rs1.Repo, hashes[3])
onus2.Commits = []string{hashes[3], hashes[4], hashes[5]}
require.NoError(t, s.putTasks(ctx, []*types.Task{toil1, duty1, work1, work2, onus1, onus2}))
mod := d.ModifiedTasksCh(ctx)
<-mod // The first batch is unused.
// toil2.Commits = {5}
toil2 := makeTask(ctx, "toil", rs1.Repo, hashes[5])
// toil3.Commits = {3, 4}
toil3 := makeTask(ctx, "toil", rs1.Repo, hashes[3])
// duty2.Commits = {1, 2, 3, 4, 5}
duty2 := makeTask(ctx, "duty", rs1.Repo, hashes[1])
// duty3.Commits = {3, 4, 5}
// duty2.Commits = {1, 2}
duty3 := makeTask(ctx, "duty", rs1.Repo, hashes[3])
// work3.Commits = {3, 4, 5}
// work2.Commits = {1, 2}
work3 := makeTask(ctx, "work", rs1.Repo, hashes[3])
// work4.Commits = {2}
// work2.Commits = {1}
work4 := makeTask(ctx, "work", rs1.Repo, hashes[2])
onus2.Status = types.TASK_STATUS_MISHAP
// onus3 steals all commits from onus2
onus3 := makeTask(ctx, "onus", rs1.Repo, hashes[3])
// onus4 steals all commits from onus3
onus4 := makeTask(ctx, "onus", rs1.Repo, hashes[3])
tasks := map[string]map[string][]*types.Task{
rs1.Repo: {
"toil": {toil2, toil3},
"duty": {duty2, duty3},
"work": {work3, work4},
"onus": {onus2, onus3, onus4},
require.NoError(t, s.addTasks(ctx, tasks))
assertBlamelist(t, hashes, toil2, []int{5})
assertBlamelist(t, hashes, toil3, []int{3, 4})
assertBlamelist(t, hashes, duty2, []int{1, 2})
assertBlamelist(t, hashes, duty3, []int{3, 4, 5})
work2Updated, err := d.GetTaskById(ctx, work2.Id)
require.NoError(t, err)
assertBlamelist(t, hashes, work2Updated, []int{1})
assertBlamelist(t, hashes, work3, []int{3, 4, 5})
assertBlamelist(t, hashes, work4, []int{2})
assertBlamelist(t, hashes, onus2, []int{})
assertBlamelist(t, hashes, onus3, []int{})
assertBlamelist(t, hashes, onus4, []int{3, 4, 5})
// Check that the tasks were inserted into the DB.
work2.Commits = work2Updated.Commits
work2.DbModified = work2Updated.DbModified
assertModifiedTasks(t, d, mod, []*types.Task{toil2, toil3, duty2, duty3, work2, work3, work4, onus2, onus3, onus4})
// AddTasks should not leave DB in an inconsistent state if there is a partial error.
func TestAddTasksFailure(t *testing.T) {
ctx, _, hashes, d, s, cleanup := setupAddTasksTest(t)
defer cleanup()
toil1 := makeTask(ctx, "toil", rs1.Repo, hashes[6])
duty1 := makeTask(ctx, "duty", rs1.Repo, hashes[6])
duty2 := makeTask(ctx, "duty", rs1.Repo, hashes[5])
require.NoError(t, s.putTasks(ctx, []*types.Task{toil1, duty1, duty2}))
// Cause ErrConcurrentUpdate in AddTasks.
cachedDuty2 := duty2.Copy()
duty2.Status = types.TASK_STATUS_MISHAP
require.NoError(t, d.PutTask(ctx, duty2))
mod := d.ModifiedTasksCh(ctx)
<-mod // The first batch is unused.
// toil2.Commits = {3, 4, 5}
toil2 := makeTask(ctx, "toil", rs1.Repo, hashes[3])
cachedDuty2.Status = types.TASK_STATUS_FAILURE
// duty3.Commits = {3, 4}
duty3 := makeTask(ctx, "duty", rs1.Repo, hashes[3])
tasks := map[string]map[string][]*types.Task{
rs1.Repo: {
"toil": {toil2},
"duty": {cachedDuty2, duty3},
// Try multiple times to reduce chance of test passing flakily.
for i := 0; i < 3; i++ {
err := s.addTasks(ctx, tasks)
require.Error(t, err)
modTasks := <-mod
// "duty" tasks should never be updated.
for _, task := range modTasks {
require.Equal(t, "toil", task.Name)
assertdeep.Equal(t, toil2, task)
assertBlamelist(t, hashes, toil2, []int{3, 4, 5})
duty2.Status = types.TASK_STATUS_FAILURE
tasks[rs1.Repo]["duty"] = []*types.Task{duty2, duty3}
require.NoError(t, s.addTasks(ctx, tasks))
assertBlamelist(t, hashes, toil2, []int{3, 4, 5})
assertBlamelist(t, hashes, duty2, []int{5})
assertBlamelist(t, hashes, duty3, []int{3, 4})
// Check that the tasks were inserted into the DB.
assertModifiedTasks(t, d, mod, []*types.Task{toil2, duty2, duty3})
// AddTasks should retry on ErrConcurrentUpdate.
func TestAddTasksRetries(t *testing.T) {
ctx, _, hashes, d, s, cleanup := setupAddTasksTest(t)
defer cleanup()
toil1 := makeTask(ctx, "toil", rs1.Repo, hashes[6])
duty1 := makeTask(ctx, "duty", rs1.Repo, hashes[6])
work1 := makeTask(ctx, "work", rs1.Repo, hashes[6])
toil2 := makeTask(ctx, "toil", rs1.Repo, hashes[1])
toil2.Commits = []string{hashes[1], hashes[2], hashes[3], hashes[4], hashes[5]}
duty2 := makeTask(ctx, "duty", rs1.Repo, hashes[1])
duty2.Commits = util.CopyStringSlice(toil2.Commits)
work2 := makeTask(ctx, "work", rs1.Repo, hashes[1])
work2.Commits = util.CopyStringSlice(toil2.Commits)
require.NoError(t, s.putTasks(ctx, []*types.Task{toil1, toil2, duty1, duty2, work1, work2}))
mod := d.ModifiedTasksCh(ctx)
<-mod // The first batch is unused.
// *3.Commits = {3, 4, 5}
// *2.Commits = {1, 2}
toil3 := makeTask(ctx, "toil", rs1.Repo, hashes[3])
duty3 := makeTask(ctx, "duty", rs1.Repo, hashes[3])
work3 := makeTask(ctx, "work", rs1.Repo, hashes[3])
// *4.Commits = {2}
// *2.Commits = {1}
toil4 := makeTask(ctx, "toil", rs1.Repo, hashes[2])
duty4 := makeTask(ctx, "duty", rs1.Repo, hashes[2])
work4 := makeTask(ctx, "work", rs1.Repo, hashes[2])
tasks := map[string]map[string][]*types.Task{
rs1.Repo: {
"toil": {toil3.Copy(), toil4.Copy()},
"duty": {duty3.Copy(), duty4.Copy()},
"work": {work3.Copy(), work4.Copy()},
retryCountMtx := sync.Mutex{}
retryCount := map[string]int{}
causeConcurrentUpdate := func(tasks []*types.Task) {
defer retryCountMtx.Unlock()
if tasks[0].Name == "toil" && retryCount["toil"] < 2 {
toil2.Started = now.Now(ctx).UTC()
require.NoError(t, d.PutTasks(ctx, []*types.Task{toil2}))
if tasks[0].Name == "duty" && retryCount["duty"] < 3 {
duty2.Started = now.Now(ctx).UTC()
require.NoError(t, d.PutTasks(ctx, []*types.Task{duty2}))
if tasks[0].Name == "work" && retryCount["work"] < 4 {
work2.Started = now.Now(ctx).UTC()
require.NoError(t, d.PutTasks(ctx, []*types.Task{work2}))
s.db = &spyDB{
DB: d,
onPutTasks: causeConcurrentUpdate,
require.NoError(t, s.addTasks(ctx, tasks))
defer retryCountMtx.Unlock()
require.Equal(t, 2, retryCount["toil"])
require.Equal(t, 3, retryCount["duty"])
require.Equal(t, 4, retryCount["work"])
modified := []*types.Task{}
check := func(t2, t3, t4 *types.Task) {
t2InDB, err := d.GetTaskById(ctx, t2.Id)
require.NoError(t, err)
assertBlamelist(t, hashes, t2InDB, []int{1})
t3Arg := tasks[t3.Repo][t3.Name][0]
t3InDB, err := d.GetTaskById(ctx, t3Arg.Id)
require.NoError(t, err)
assertdeep.Equal(t, t3Arg, t3InDB)
assertBlamelist(t, hashes, t3InDB, []int{3, 4, 5})
t4Arg := tasks[t4.Repo][t4.Name][1]
t4InDB, err := d.GetTaskById(ctx, t4Arg.Id)
require.NoError(t, err)
assertdeep.Equal(t, t4Arg, t4InDB)
assertBlamelist(t, hashes, t4InDB, []int{2})
t2.Commits = t2InDB.Commits
t2.DbModified = t2InDB.DbModified
t3.Id = t3InDB.Id
t3.Commits = t3InDB.Commits
t3.DbModified = t3InDB.DbModified
t4.Id = t4InDB.Id
t4.Commits = t4InDB.Commits
t4.DbModified = t4InDB.DbModified
modified = append(modified, t2, t3, t4)
check(toil2, toil3, toil4)
check(duty2, duty3, duty4)
check(work2, work3, work4)
assertModifiedTasks(t, d, mod, modified)
// TestTriggerTaskFailed checks that when MainLoop fails to trigger the task
// at commits[4], the tasks at commits[0] and commits[2]/commits[6] are still
// inserted with correct blamelists, and the scheduler diagnostics record the
// trigger error only for the failed candidate.
//
// NOTE(review): in this view makeTags is never called (Go rejects unused
// locals) and failedTrigger is never incremented, yet the test asserts
// failedTrigger == 1 and expects a "Mocked trigger failure!" error. The
// statement that mocks the trigger failure for commits[4] (presumably via
// makeTags(commits[4])) and a `failedTrigger++` in the commits[4] branch
// appear to be missing — confirm against version control.
func TestTriggerTaskFailed(t *testing.T) {
// Verify that if one task out of a set fails to trigger, the others are
// still inserted into the DB and handled properly, e.g. w.r.t. blamelists.
ctx, _, _, s, swarmingClient, commits, _, _, cleanup := testMultipleCandidatesBackfillingEachOtherSetup(t)
defer cleanup()
// Trigger three tasks. We should attempt to trigger tasks at
// commits[0], commits[4], and either commits[2] or commits[6]. Mock
// failure to trigger the task at commits[4] and ensure that the other
// two tasks get inserted with the correct blamelists.
bot1 := makeBot("bot1", map[string]string{"pool": "Skia"})
bot2 := makeBot("bot2", map[string]string{"pool": "Skia"})
bot3 := makeBot("bot3", map[string]string{"pool": "Skia"})
// makeTags returns the source/sk repo and revision tags for a task at commit.
makeTags := func(commit string) []string {
return []string{
fmt.Sprintf("source_revision:%s", commit),
"source_repo:" + fmt.Sprintf(gitiles.CommitURL, rs1.Repo, "%s"),
fmt.Sprintf("sk_repo:%s", rs1.Repo),
fmt.Sprintf("sk_revision:%s", commit),
mockBots(t, swarmingClient, bot1, bot2, bot3)
// MainLoop is expected to surface the mocked trigger failure.
err := s.MainLoop(ctx)
require.NotNil(t, err)
require.True(t, strings.Contains(err.Error(), "Mocked trigger failure!"))
require.NoError(t, s.tCache.Update(ctx))
require.Equal(t, 6, len(s.queue))
tasks, err := s.tCache.GetTasksForCommits(rs1.Repo, commits)
require.NoError(t, err)
// Classify the cached tasks by revision; any other revision fails the test.
var t1, t2, t3 *types.Task
for _, byName := range tasks {
for _, task := range byName {
if task.Revision == commits[0] {
t1 = task
} else if task.Revision == commits[4] {
t2 = task
} else if task.Revision == commits[2] || task.Revision == commits[6] {
t3 = task
} else {
require.FailNow(t, fmt.Sprintf("Task has unknown revision: %v", task))
// The task at commits[4] must not have been inserted.
require.NotNil(t, t1)
require.Nil(t, t2)
require.NotNil(t, t3)
// Ensure that we got the blamelists right.
var expect1, expect3 []string
if t3.Revision == commits[2] {
expect1 = util.CopyStringSlice(commits[:2])
expect3 = util.CopyStringSlice(commits[2:])
} else {
expect1 = util.CopyStringSlice(commits[:6])
expect3 = util.CopyStringSlice(commits[6:])
assertdeep.Equal(t, expect1, t1.Commits)
assertdeep.Equal(t, expect3, t3.Commits)
// Check diagnostics.
diag := lastDiagnostics(t, s)
failedTrigger := 0
for _, c := range diag.Candidates {
if c.Revision == commits[4] {
require.True(t, strings.Contains(c.Diagnostics.Triggering.TriggerError, "Mocked trigger failure!"))
} else {
if c.TaskKey == t1.TaskKey {
require.Equal(t, "", c.Diagnostics.Triggering.TriggerError)
require.Equal(t, t1.Id, c.Diagnostics.Triggering.TaskId)
} else if c.TaskKey == t3.TaskKey {
require.Equal(t, "", c.Diagnostics.Triggering.TriggerError)
require.Equal(t, t3.Id, c.Diagnostics.Triggering.TaskId)
} else {
// Candidates which were not triggered carry no triggering diagnostics.
require.Nil(t, c.Diagnostics.Triggering)
// Should be one task that failed
require.Equal(t, 1, failedTrigger)
const badTaskName = "badtask"
type mockDB struct {
failAssignId bool
failPutTasks bool
func (d *mockDB) AssignId(ctx context.Context, t *types.Task) error {
if t.Name == badTaskName && d.failAssignId {
return errors.New(badTaskName)
return d.DB.AssignId(ctx, t)
func (d *mockDB) PutTasks(ctx context.Context, tasks []*types.Task) error {
for _, t := range tasks {
if t.Name == badTaskName && d.failPutTasks {
return errors.New(badTaskName)
return d.DB.PutTasks(ctx, tasks)
// TestContinueOnTriggerTaskFailure adds ten commits, defines an extra
// "badtask" (pool BadPool) only at the newest new commit, and checks that a
// failure in any stage of triggering that task does not prevent the other
// tasks from being triggered.
//
// NOTE(review): in this view the `test` closure is declared but never
// invoked (Go rejects unused locals), scenario 1 has no statement arming the
// failure, and the "Clean up for the next iteration." comment has no
// following statement (presumably finishAll()). Lines appear to be missing —
// confirm against version control.
func TestContinueOnTriggerTaskFailure(t *testing.T) {
// Verify that if one task out of a set fails any part of the triggering
// process, the others are still triggered, inserted into the DB, etc.
ctx, mg, _, s, swarmingClient, commits, _, cfg, cleanup := testMultipleCandidatesBackfillingEachOtherSetup(t)
defer cleanup()
// Add several more commits.
newCommits := mg.CommitN(10)
commits = append(newCommits, commits...)
badCommit := commits[0]
// NOTE(review): this shadows the package-level badTaskName const (same value).
badTaskName := "badtask"
badPool := "BadPool"
s.pools = []string{"Skia", badPool}
require.NoError(t, s.repos.Update(ctx))
// For each new commit, cache the config (plus the bad task/job at badCommit)
// and insert its jobs.
for _, hash := range newCommits {
rs := types.RepoState{
Repo: rs1.Repo,
Revision: hash,
c := cfg.Copy()
if hash == badCommit {
c.Tasks[badTaskName] = &specs.TaskSpec{
CasSpec: "compile",
CipdPackages: []*specs.CipdPackage{},
Dependencies: []string{},
Dimensions: []string{fmt.Sprintf("pool:%s", badPool)},
Priority: 1.0,
c.Jobs["badjob"] = &specs.JobSpec{
TaskSpecs: []string{badTaskName},
fillCaches(t, ctx, s.taskCfgCache, rs, c)
insertJobs(t, ctx, s, rs)
// finishAll marks every unfinished task in the cache as successfully
// finished and writes them back through s.db.
finishAll := func() {
tasks, err := s.tCache.UnfinishedTasks()
require.NoError(t, err)
for _, task := range tasks {
task.Finished = now.Now(ctx)
task.Status = types.TASK_STATUS_SUCCESS
require.NoError(t, s.db.PutTasks(ctx, tasks))
finishAll() // The setup func triggers a task.
// badTags are the tags of the bad task at badCommit.
badTags := []string{
fmt.Sprintf("sk_dim_pool:%s", badPool),
fmt.Sprintf("source_revision:%s", badCommit),
"source_repo:" + fmt.Sprintf(gitiles.CommitURL, rs1.Repo, "%s"),
fmt.Sprintf("sk_repo:%s", rs1.Repo),
fmt.Sprintf("sk_revision:%s", badCommit),
fmt.Sprintf("sk_name:%s", badTaskName),
// test runs one scheduling iteration with one bot in each pool and checks
// that exactly one task (not the bad task) is left unfinished.
test := func() {
// Pretend there are two bots available.
bots := []*types.Machine{
makeBot("bot0", map[string]string{"pool": "Skia"}),
makeBot("bot1", map[string]string{"pool": badPool}),
mockBots(t, swarmingClient, bots...)
// Run MainLoop.
err := s.MainLoop(ctx)
require.NotNil(t, err)
require.NoError(t, s.tCache.Update(ctx))
// We'll try to trigger all tasks but the one for the bad commit will
// fail. Ensure that we triggered all of the others.
tasks, err := s.tCache.UnfinishedTasks()
require.NoError(t, err)
require.Len(t, tasks, 1)
require.NotEqual(t, badTaskName, tasks[0].Name)
// Clean up for the next iteration.
// 1. Actual triggering failed.
// 2. DB.AssignId failed.
mdb := &mockDB{
DB: s.db,
failAssignId: true,
s.db = mdb
// 3. DB.PutTasks failed.
mdb.failAssignId = false
mdb.failPutTasks = true
// 4. Failure to load details of de-duplicated task after triggering.
mdb.failPutTasks = false
swarmingClient.MockTriggerTaskDeduped(badTags, "thistagwontparse")
// Restore the unwrapped DB.
s.db = mdb.DB
// 5. Failure to merge CAS entries.
// TODO(borenet)
// TestTriggerTaskDeduped checks that after MainLoop, the task at commits[4]
// is recorded with TASK_STATUS_SUCCESS (de-duplicated) while the tasks at
// commits[0] and commits[2]/commits[6] remain TASK_STATUS_PENDING.
//
// NOTE(review): makeTags is never called in this view (Go rejects unused
// locals); the statement that mocks de-duplication for commits[4]
// (presumably via makeTags(commits[4])) appears to be missing — confirm
// against version control.
func TestTriggerTaskDeduped(t *testing.T) {
// Verify that we properly handle de-duplicated tasks.
ctx, _, _, s, swarmingClient, commits, _, _, cleanup := testMultipleCandidatesBackfillingEachOtherSetup(t)
defer cleanup()
// Trigger three tasks. We should attempt to trigger tasks at
// commits[0], commits[4], and either commits[2] or commits[6]. Mock
// deduplication of the task at commits[4] and ensure that the other
// two tasks are not deduped.
bot1 := makeBot("bot1", map[string]string{"pool": "Skia"})
bot2 := makeBot("bot2", map[string]string{"pool": "Skia"})
bot3 := makeBot("bot3", map[string]string{"pool": "Skia"})
// makeTags returns the source/sk repo and revision tags for a task at commit.
makeTags := func(commit string) []string {
return []string{
fmt.Sprintf("source_revision:%s", commit),
"source_repo:" + fmt.Sprintf(gitiles.CommitURL, rs1.Repo, "%s"),
fmt.Sprintf("sk_repo:%s", rs1.Repo),
fmt.Sprintf("sk_revision:%s", commit),
mockBots(t, swarmingClient, bot1, bot2, bot3)
require.NoError(t, s.MainLoop(ctx))
require.NoError(t, s.tCache.Update(ctx))
require.Equal(t, 5, len(s.queue))
tasks, err := s.tCache.GetTasksForCommits(rs1.Repo, commits)
require.NoError(t, err)
// Classify the cached tasks by revision; any other revision fails the test.
var t1, t2, t3 *types.Task
for _, byName := range tasks {
for _, task := range byName {
if task.Revision == commits[0] {
t1 = task
} else if task.Revision == commits[4] {
t2 = task
} else if task.Revision == commits[2] || task.Revision == commits[6] {
t3 = task
} else {
require.FailNow(t, fmt.Sprintf("Task has unknown revision: %v", task))
require.NotNil(t, t1)
require.NotNil(t, t2)
require.NotNil(t, t3)
// Ensure that t2 was correctly deduped, and the others weren't.
require.Equal(t, types.TASK_STATUS_PENDING, t1.Status)
require.Equal(t, types.TASK_STATUS_SUCCESS, t2.Status)
require.Equal(t, types.TASK_STATUS_PENDING, t3.Status)
// TestTriggerTaskNoResource checks that when a triggered task is rejected
// for lack of a matching bot, MainLoop returns a "No bots available" error
// and no task is recorded at commits[0].
//
// NOTE(review): makeTags is never called in this view (Go rejects unused
// locals); the statement that mocks the no-resource rejection (presumably
// via makeTags(commits[0])) appears to be missing — confirm against version
// control.
func TestTriggerTaskNoResource(t *testing.T) {
// Verify that we properly handle tasks which are rejected due to lack
// of matching bots.
ctx, _, _, s, swarmingClient, commits, _, _, cleanup := testMultipleCandidatesBackfillingEachOtherSetup(t)
defer cleanup()
// Attempt to trigger a task. It gets rejected because the bot we
// thought was available to run it is now busy/quarantined/offline.
bot1 := makeBot("bot1", map[string]string{"pool": "Skia"})
// makeTags returns the source/sk repo and revision tags for a task at commit.
makeTags := func(commit string) []string {
return []string{
fmt.Sprintf("source_revision:%s", commit),
"source_repo:" + fmt.Sprintf(gitiles.CommitURL, rs1.Repo, "%s"),
fmt.Sprintf("sk_repo:%s", rs1.Repo),
fmt.Sprintf("sk_revision:%s", commit),
mockBots(t, swarmingClient, bot1)
err := s.MainLoop(ctx)
require.NotNil(t, err)
require.True(t, strings.Contains(err.Error(), "No bots available to run faketask with dimensions: pool:Skia"))
require.NoError(t, s.tCache.Update(ctx))
require.Equal(t, 8, len(s.queue))
// No task should have been recorded at commits[0].
tasks, err := s.tCache.GetTasksForCommits(rs1.Repo, []string{commits[0]})
require.NoError(t, err)
require.Equal(t, 0, len(tasks[commits[0]]))
func TestScoreCandidate_TryJob_PrioritizedHigherByWaitTime(t *testing.T) {
ctx := context.Background()
test := func(name, jobCreated, now string, expectedScore float64) {
t.Run(name, func(t *testing.T) {
s := TaskScheduler{}
job := types.Job{
Created: rfc3339(t, jobCreated),
tc := asTryJob(TaskCandidate{
Jobs: []*types.Job{&job},
s.scoreCandidate(ctx, &tc, rfc3339(t, now), timeDoesNotMatter, nil)
assert.InDelta(t, expectedScore, tc.Score, 0.0001)
test("no waiting", "2021-10-01T15:00:00Z", "2021-10-01T15:00:00Z", 5)
test("10 min waiting", "2021-10-01T15:00:00Z", "2021-10-01T15:10:00Z", 5.08333)
test("30 min waiting", "2021-10-01T08:00:00Z", "2021-10-01T08:30:00Z", 5.25)
test("2 hours waiting", "2021-10-01T13:00:00Z", "2021-10-01T15:00:00Z", 6)
func TestScoreCandidate_TryJob_PrioritizedLowerByNumRetries(t *testing.T) {
ctx := context.Background()
test := func(name string, numRetries int, expectedScore float64) {
t.Run(name, func(t *testing.T) {
s := TaskScheduler{}
ts := rfc3339(t, "2021-10-01T15:00:00Z") // fixed time indicating no waiting
job := types.Job{
Created: ts,
tc := asTryJob(TaskCandidate{
Jobs: []*types.Job{&job},
Attempt: numRetries,
s.scoreCandidate(ctx, &tc, ts, timeDoesNotMatter, nil)
assert.InDelta(t, expectedScore, tc.Score, 0.0001)
test("first try", 0, 5)
test("second try", 1, 3.75)
test("third try", 2, 2.8125)
test("tenth try", 9, 0.3754)
func TestScoreCandidate_TryJob_MultipleJobPrioritiesImpactScore(t *testing.T) {
ctx := context.Background()
test := func(name string, expectedScore float64, jobPriorities ...float64) {
t.Run(name, func(t *testing.T) {
s := TaskScheduler{}
ts := rfc3339(t, "2021-10-01T15:00:00Z") // fixed time indicating no waiting
tc := asTryJob(TaskCandidate{})
// For each job priority passed in, create a job with that priority that's a part
// of this task candidate.
for _, jp := range jobPriorities {
tc.Jobs = append(tc.Jobs, &types.Job{
Created: ts,
Priority: jp,
s.scoreCandidate(ctx, &tc, ts, timeDoesNotMatter, nil)
assert.InDelta(t, expectedScore, tc.Score, 0.0001)
test("one job, default priority", 5, specs.DEFAULT_JOB_SPEC_PRIORITY)
test("one job, higher priority", 7.5, 0.75)
test("one job, highest priority", 10, 1.0)
test("one job, lower priority", 2.5, 0.25)
test("one job, lowest priority", 0.001, 0.0001)
test("out of range priority results in default", 5, 1234567)
test("zero priority results in default", 5, 0)
// We score tasks higher if they have more jobs.
test("two jobs, medium priority", 7.5, 0.5, 0.5)
test("three jobs, medium priority", 8.75, 0.5, 0.5, 0.5)
test("four jobs, medium priority", 9.375, 0.5, 0.5, 0.5, 0.5)
test("four jobs, mixed priorities", 9.811, 0.1, 0.7, 0.3, 0.9)
// Double-check the math for the mixed priorities test
assert.InDelta(t, .9811, 1-(1-0.1)*(1-0.7)*(1-0.3)*(1-0.9), 0.0001)
func TestScoreCandidate_TryJob_OldestJobOnlyUsedForWaitTime(t *testing.T) {
ctx := context.Background()
cycleStart := rfc3339(t, "2021-10-01T15:00:00Z")
test := func(name string, expectedScore float64, jobCreated ...string) {
t.Run(name, func(t *testing.T) {
s := TaskScheduler{}
tc := asTryJob(TaskCandidate{})
for _, ts := range jobCreated {
tc.Jobs = append(tc.Jobs, &types.Job{
Created: rfc3339(t, ts),
s.scoreCandidate(ctx, &tc, cycleStart, timeDoesNotMatter, nil)
assert.InDelta(t, expectedScore, tc.Score, 0.0001)
// These should all be the same because the first job (which is presumed to be the oldest)
// is the time that is used.
test("two jobs, both waited 2 hours", 9, "2021-10-01T13:00:00Z", "2021-10-01T13:00:00Z")
test("two jobs, only one waited 2 hours", 9, "2021-10-01T13:00:00Z", "2021-10-01T14:55:00Z")
func TestScoreCandidate_ForcedJob_PrioritizedHigherByWaitTime(t *testing.T) {
ctx := context.Background()
test := func(name, jobCreated, now string, expectedScore float64) {
t.Run(name, func(t *testing.T) {
s := TaskScheduler{}
job := types.Job{
Created: rfc3339(t, jobCreated),
tc := asForcedJob(TaskCandidate{
Jobs: []*types.Job{&job},
s.scoreCandidate(ctx, &tc, rfc3339(t, now), timeDoesNotMatter, nil)
assert.InDelta(t, expectedScore, tc.Score, 0.0001)
test("no waiting", "2021-10-01T15:00:00Z", "2021-10-01T15:00:00Z", 50)
test("10 min waiting", "2021-10-01T15:00:00Z", "2021-10-01T15:10:00Z", 50.08333)
test("30 min waiting", "2021-10-01T08:00:00Z", "2021-10-01T08:30:00Z", 50.25)
test("2 hours waiting", "2021-10-01T13:00:00Z", "2021-10-01T15:00:00Z", 51)
func TestScoreCandidate_ForcedJob_MultipleJobPrioritiesImpactScore(t *testing.T) {
ctx := context.Background()
test := func(name string, expectedScore float64, jobPriorities ...float64) {
t.Run(name, func(t *testing.T) {
s := TaskScheduler{}
ts := rfc3339(t, "2021-10-01T15:00:00Z") // fixed time indicating no waiting
tc := asForcedJob(TaskCandidate{})
// For each job priority passed in, create a job with that priority that's a part
// of this task candidate.
for _, jp := range jobPriorities {
tc.Jobs = append(tc.Jobs, &types.Job{
Created: ts,
Priority: jp,
s.scoreCandidate(ctx, &tc, ts, timeDoesNotMatter, nil)
assert.InDelta(t, expectedScore, tc.Score, 0.0001)
test("one job, default priority", 50, specs.DEFAULT_JOB_SPEC_PRIORITY)
test("one job, higher priority", 75, 0.75)
test("one job, highest priority", 100, 1.0)
test("one job, lower priority", 25, 0.25)
test("one job, lowest priority", 0.01, 0.0001)
test("out of range priority results in default", 50, 1234567)
test("zero priority results in default", 50, 0)
// We score tasks higher if they have more jobs.
test("two jobs, medium priority", 75, 0.5, 0.5)
test("three jobs, medium priority", 87.5, 0.5, 0.5, 0.5)
test("four jobs, medium priority", 93.75, 0.5, 0.5, 0.5, 0.5)
test("four jobs, mixed priorities", 98.11, 0.1, 0.7, 0.3, 0.9)
// Double-check the math for the mixed priorities test
assert.InDelta(t, .9811, 1-(1-0.1)*(1-0.7)*(1-0.3)*(1-0.9), 0.0001)
func TestScoreCandidate_ForcedJob_OldestJobOnlyUsedForWaitTime(t *testing.T) {
ctx := context.Background()
cycleStart := rfc3339(t, "2021-10-01T15:00:00Z")
test := func(name string, expectedScore float64, jobCreated ...string) {
t.Run(name, func(t *testing.T) {
s := TaskScheduler{}
tc := asForcedJob(TaskCandidate{})
for _, ts := range jobCreated {
tc.Jobs = append(tc.Jobs, &types.Job{
Created: rfc3339(t, ts),
s.scoreCandidate(ctx, &tc, cycleStart, timeDoesNotMatter, nil)
assert.InDelta(t, expectedScore, tc.Score, 0.0001)
// These should all be the same because the first job (which is presumed to be the oldest)
// is the time that is used.
test("two jobs, both waited 2 hours", 76.5, "2021-10-01T13:00:00Z", "2021-10-01T13:00:00Z")
test("two jobs, only one waited 2 hours", 76.5, "2021-10-01T13:00:00Z", "2021-10-01T14:55:00Z")
func TestComputeBlamelist_NoExistingTests(t *testing.T) {
ctx := context.Background()
const repoName = "some_repo"
const taskName = "Test-Foo-Bar"
tcg := tcc_mocks.TasksAlwaysDefined(taskName)
mg, repo := newMemRepo(t)
firstCommit := mg.Commit("a")
secondCommit := mg.Commit("b", firstCommit)
thirdCommit := mg.Commit("c", secondCommit)
mtc := &cache_mocks.TaskCache{}
mtc.On("GetTaskForCommit", repoName, mock.Anything, taskName).Return(nil, nil)
blamelistNoTask := func(name string, revision string, expectedBlamelist []string) {
t.Run(name, func(t *testing.T) {
blamedCommits, stealFromTask, err := ComputeBlamelist(ctx, mtc, repo, taskName, repoName, repo.Get(revision), commitsBuffer, tcg, window_mocks.AllInclusiveWindow())
require.NoError(t, err)
assert.Equal(t, expectedBlamelist, blamedCommits)
assert.Nil(t, stealFromTask)
// We expect this commit and all following commits to be blamed
blamelistNoTask("first commit", firstCommit, []string{firstCommit})
blamelistNoTask("second commit", secondCommit, []string{secondCommit, firstCommit})
blamelistNoTask("third commit", thirdCommit, []string{thirdCommit, secondCommit, firstCommit})
func TestComputeBlamelist_FirstCommitTested(t *testing.T) {
ctx := context.Background()
const repoName = "some_repo"
const taskName = "Test-Foo-Bar"
tcg := tcc_mocks.TasksAlwaysDefined(taskName)
mg, repo := newMemRepo(t)
firstCommit := mg.Commit("a")
secondCommit := mg.Commit("b", firstCommit)
thirdCommit := mg.Commit("c", secondCommit)
firstCommitTask := newTask("task-1", firstCommit)
mtc := &cache_mocks.TaskCache{}
mtc.On("GetTaskForCommit", repoName, firstCommit, taskName).Return(firstCommitTask, nil)
mtc.On("GetTaskForCommit", repoName, mock.Anything, taskName).Return(nil, nil)
blamelistNoTask := func(name string, revision string, expectedBlamelist []string) {
t.Run(name, func(t *testing.T) {
blamedCommits, stealFromTask, err := ComputeBlamelist(ctx, mtc, repo, taskName, repoName, repo.Get(revision), commitsBuffer, tcg, window_mocks.AllInclusiveWindow())
require.NoError(t, err)
assert.Equal(t, expectedBlamelist, blamedCommits)
assert.Nil(t, stealFromTask)
// For commits after the task, the blamelist should extend back, but not include that commit.
blamelistNoTask("second commit", secondCommit, []string{secondCommit})
blamelistNoTask("third commit", thirdCommit, []string{thirdCommit, secondCommit})
// Calculating a blame for a commit which already ran a task is considered a retry. As such,
// the entire blamelist is returned.
t.Run("first commit (retry)", func(t *testing.T) {
blamedCommits, stealFromTask, err := ComputeBlamelist(ctx, mtc, repo, taskName, repoName, repo.Get(firstCommit), commitsBuffer, tcg, window_mocks.AllInclusiveWindow())
require.NoError(t, err)
assert.Equal(t, []string{firstCommit}, blamedCommits)
assert.Equal(t, firstCommitTask, stealFromTask)
func TestComputeBlamelist_LastCommitTested_FollowingCommitsBlamed(t *testing.T) {
ctx := context.Background()
const repoName = "some_repo"
const taskName = "Test-Foo-Bar"
tcg := tcc_mocks.TasksAlwaysDefined(taskName)
mg, repo := newMemRepo(t)
firstCommit := mg.Commit("a")
secondCommit := mg.Commit("b", firstCommit)
thirdCommit := mg.Commit("c", secondCommit)
thirdCommitTask := newTask("task-1", thirdCommit, secondCommit, firstCommit)
mtc := &cache_mocks.TaskCache{}
// By returning this task for all commits, we are saying that every commit is covered by
// the task that ran on the last commit.
mtc.On("GetTaskForCommit", repoName, mock.Anything, taskName).Return(thirdCommitTask, nil)
blamelistAndTask := func(name string, revision string, expectedBlamelist []string, expectedTask *types.Task) {
t.Run(name, func(t *testing.T) {
blamedCommits, stealFromTask, err := ComputeBlamelist(ctx, mtc, repo, taskName, repoName, repo.Get(revision), commitsBuffer, tcg, window_mocks.AllInclusiveWindow())
require.NoError(t, err)
assert.Equal(t, expectedBlamelist, blamedCommits)
assert.Equal(t, expectedTask, stealFromTask)
blamelistAndTask("first commit", firstCommit, []string{firstCommit}, thirdCommitTask)
blamelistAndTask("second commit", secondCommit, []string{secondCommit, firstCommit}, thirdCommitTask)
// Calculating a blame for a commit which already ran a task is considered a retry. As such,
// the entire blamelist is returned.
blamelistAndTask("third commit (retry)", thirdCommit, []string{thirdCommit, secondCommit, firstCommit}, thirdCommitTask)
// TestComputeBlamelist_BlamelistTooLong_UseProvidedCommit checks that when
// the blamelist for the head commit would exceed MAX_BLAMELIST_COMMITS and no
// task has run anywhere, ComputeBlamelist blames only the provided commit and
// returns no task to steal from.
//
// NOTE(review): in this view `h` (and `mg`) are never used inside the loop,
// which Go rejects. The statement creating each commit appears to be missing
// (presumably mg.Commit(h)), and possibly a "Commit_0" commit before the loop
// since it starts at i = 1. Confirm against version control.
func TestComputeBlamelist_BlamelistTooLong_UseProvidedCommit(t *testing.T) {
ctx := context.Background()
const repoName = "some_repo"
const taskName = "Test-Foo-Bar"
tcg := tcc_mocks.TasksAlwaysDefined(taskName)
// Build a long linear history whose commit subjects are "Commit_<i>", ending
// at "Commit_999", so that it is longer than MAX_BLAMELIST_COMMITS.
require.Greater(t, 1000, MAX_BLAMELIST_COMMITS)
mg, repo := newMemRepo(t)
for i := 1; i < 1000; i++ {
h := fmt.Sprintf("Commit_%d", i)
// The head of the first branch must be the last commit created.
lastCommit := repo.Get(repo.Branches()[0])
require.Equal(t, "Commit_999", lastCommit.Subject)
// The cache reports no task at any commit.
mtc := &cache_mocks.TaskCache{}
mtc.On("GetTaskForCommit", repoName, mock.Anything, taskName).Return(nil, nil)
blamedCommits, stealFromTask, err := ComputeBlamelist(ctx, mtc, repo, taskName, repoName, lastCommit, commitsBuffer, tcg, window_mocks.AllInclusiveWindow())
require.NoError(t, err)
assert.Equal(t, []string{lastCommit.Hash}, blamedCommits)
assert.Nil(t, stealFromTask)
func TestComputeBlamelist_BranchInHistory_NonOverlappingCoverage(t *testing.T) {
ctx := context.Background()
const repoName = "some_repo"
const taskName = "Test-Foo-Bar"
tcg := tcc_mocks.TasksAlwaysDefined(taskName)
// This represents a git history with a branch and merge like this.
// The asterisks represent at which commits a task has run. Only one of the branches
// has a test, so commit 2 is not "double covered".
// 0
// 1*
// 2
// 3a 3b
// 4a* 4b
// 5a |
// 6
mg, repo := newMemRepo(t)
zero := mg.Commit("0")
one := mg.Commit("1", zero)
two := mg.Commit("2", one)
threeA := mg.Commit("3a", two)
threeB := mg.Commit("3b", two)
fourA := mg.Commit("4a", threeA)
fourB := mg.Commit("4b", threeB)
fiveA := mg.Commit("5a", fourA)
six := mg.Commit("6", fourB, fiveA)
commitOneTask := newTask("task-1", one, zero)
commitFourATask := newTask("task-2", fourA, threeA, two)
mtc := &cache_mocks.TaskCache{}
mtc.On("GetTaskForCommit", repoName, zero, taskName).Return(commitOneTask, nil)
mtc.On("GetTaskForCommit", repoName, one, taskName).Return(commitOneTask, nil)
mtc.On("GetTaskForCommit", repoName, two, taskName).Return(commitFourATask, nil)
mtc.On("GetTaskForCommit", repoName, threeA, taskName).Return(commitFourATask, nil)
mtc.On("GetTaskForCommit", repoName, fourA, taskName).Return(commitFourATask, nil)
mtc.On("GetTaskForCommit", repoName, mock.Anything, taskName).Return(nil, nil)
blamelistAndTask := func(name string, revision string, expectedBlamelist []string, expectedTask *types.Task) {
t.Run(name, func(t *testing.T) {
blamedCommits, stealFromTask, err := ComputeBlamelist(ctx, mtc, repo, taskName, repoName, repo.Get(revision), commitsBuffer, tcg, window_mocks.AllInclusiveWindow())
require.NoError(t, err)
assert.Equal(t, expectedBlamelist, blamedCommits)
assert.Equal(t, expectedTask, stealFromTask)
blamelistAndTask("commit zero", zero, []string{zero}, commitOneTask)
blamelistAndTask("commit one", one, []string{one, zero}, commitOneTask)
blamelistAndTask("commit two", two, []string{two}, commitFourATask)
blamelistAndTask("commit threeA", threeA, []string{threeA, two}, commitFourATask)
blamelistAndTask("commit threeB", threeB, []string{threeB}, nil)
blamelistAndTask("commit fourA", fourA, []string{fourA, threeA, two}, commitFourATask)
blamelistAndTask("commit fourB", fourB, []string{fourB, threeB}, nil)
blamelistAndTask("commit fiveA", fiveA, []string{fiveA}, nil)
blamelistAndTask("commit six", six, []string{six, fourB, threeB, fiveA}, nil)
var (
// Use of timeDoesNotMatter indicates that the time does not affect the outputs.
timeDoesNotMatter = time.Time{}
commitsBuffer = make([]*repograph.Commit, 0, MAX_BLAMELIST_COMMITS)
func rfc3339(t *testing.T, n string) time.Time {
ts, err := time.Parse(time.RFC3339, n)
require.NoError(t, err)
return ts
func asTryJob(c TaskCandidate) TaskCandidate {
c.Issue = "nonempty"
c.Patchset = "nonempty"
c.Server = "nonempty"
if !c.IsTryJob() {
panic("Should be a tryjob task candidate now")
return c
func asForcedJob(c TaskCandidate) TaskCandidate {
c.ForcedJobId = "nonempty"
if !c.IsForceRun() {
panic("Should be a tryjob task candidate now")
return c
func newTask(id string, ranAt string, alsoCovered ...string) *types.Task {
nt := types.Task{Id: id}
nt.Revision = ranAt
nt.Commits = append(nt.Commits, ranAt)
for _, c := range alsoCovered {
nt.Commits = append(nt.Commits, c)
return &nt
func TestRegenerateTaskQueue_OnlyBestCandidateForCD(t *testing.T) {
ctx := context.Background()
// Create a TasksCfg.
tc := &specs.TasksCfg{
Jobs: map[string]*specs.JobSpec{
"cd-job": {
IsCD: true,
TaskSpecs: []string{
Tasks: map[string]*specs.TaskSpec{
"cd-task": {
CasSpec: "empty",
Dimensions: []string{fmt.Sprintf("pool:%s", cdPoolName)},
CasSpecs: map[string]*specs.CasSpec{
"empty": {
Digest: rbe.EmptyDigest,
tcc := &tcc_mocks.TaskCfgCache{}
tcc.On("Get", mock.Anything, mock.Anything).Return(tc, nil, nil)
// There are three commits in the repo.
mg, repo := newMemRepo(t)
hashes := mg.CommitN(3)
rs1 := types.RepoState{
Repo: "fake-repo",
Revision: hashes[2],
rs2 := types.RepoState{
Repo: "fake-repo",
Revision: hashes[1],
rs3 := types.RepoState{
Repo: "fake-repo",
Revision: hashes[0],
repos := repograph.Map{
rs1.Repo: repo,
// Add jobs to the cache.
jCache := &cache_mocks.JobCache{}
jobs := []*types.Job{
Name: "cd-job",
RepoState: rs1,
Created: rfc3339(t, "2021-10-01T15:00:00Z"),
Dependencies: map[string][]string{"cd-task": {}},
Name: "cd-job",
RepoState: rs2,
Created: rfc3339(t, "2021-10-01T15:00:00Z"),
Dependencies: map[string][]string{"cd-task": {}},
Name: "cd-job",
RepoState: rs3,
Created: rfc3339(t, "2021-10-01T15:00:00Z"),
Dependencies: map[string][]string{"cd-task": {}},
jCache.On("UnfinishedJobs").Return(jobs, nil)
// No tasks in the TaskCache.
tCache := &cache_mocks.TaskCache{}
tCache.On("GetTasksByKey", mock.Anything).Return([]*types.Task{}, nil)
tCache.On("GetTaskForCommit", mock.Anything, mock.Anything, mock.Anything).Return(nil, nil)
// Set up a time window to include everything.
w := window_mocks.AllInclusiveWindow()
// Create the scheduler.
s := &TaskScheduler{
candidateMetrics: map[string]metrics2.Int64Metric{},
jCache: jCache,
repos: repos,
taskCfgCache: tcc,
tCache: tCache,
window: w,
// Regenerate the task queue.
queue, _, err := s.regenerateTaskQueue(ctx)
require.NoError(t, err)
// There should only be one task in the queue, despite the fact that there
// are three candidates available, since we only consider running the newest
// candidate.
require.Len(t, queue, 1)
c := queue[0]
require.Len(t, c.Commits, 3)
require.True(t, c.IsCD)
require.Equal(t, rs3.Revision, c.Revision)
func TestRegenerateTaskQueue_NoBackfillForCD(t *testing.T) {
ctx := context.Background()
// Create a TasksCfg.
tc := &specs.TasksCfg{
Jobs: map[string]*specs.JobSpec{
"cd-job": {
IsCD: true,
TaskSpecs: []string{
Tasks: map[string]*specs.TaskSpec{
"cd-task": {
CasSpec: "empty",
Dimensions: []string{fmt.Sprintf("pool:%s", cdPoolName)},
CasSpecs: map[string]*specs.CasSpec{
"empty": {
Digest: rbe.EmptyDigest,
tcc := &tcc_mocks.TaskCfgCache{}
tcc.On("Get", mock.Anything, mock.Anything).Return(tc, nil, nil)
// There are three commits in the repo.
gb, repo := newMemRepo(t)
hashes := gb.CommitN(3)
rs1 := types.RepoState{
Repo: "fake-repo",
Revision: hashes[2],
rs2 := types.RepoState{
Repo: "fake-repo",
Revision: hashes[1],
rs3 := types.RepoState{
Repo: "fake-repo",
Revision: hashes[0],
repos := repograph.Map{
rs1.Repo: repo,
// Add jobs to the cache.
jCache := &cache_mocks.JobCache{}
jobs := []*types.Job{
Name: "cd-job",
RepoState: rs1,
Created: rfc3339(t, "2021-10-01T15:00:00Z"),
Dependencies: map[string][]string{"cd-task": {}},
Name: "cd-job",
RepoState: rs2,
Created: rfc3339(t, "2021-10-01T15:00:00Z"),
Dependencies: map[string][]string{"cd-task": {}},
Name: "cd-job",
RepoState: rs3,
Created: rfc3339(t, "2021-10-01T15:00:00Z"),
Dependencies: map[string][]string{"cd-task": {}},
jCache.On("UnfinishedJobs").Return(jobs, nil)
// One task already ran at the newest commit.
t3 := &types.Task{
Commits: []string{rs3.Revision, rs2.Revision, rs1.Revision},
Id: "fake-task",
TaskKey: types.TaskKey{
Name: "cd-task",
RepoState: rs3,
tCache := &cache_mocks.TaskCache{}
tCache.On("GetTasksByKey", types.TaskKey{
RepoState: rs3,
Name: "cd-task",
}).Return([]*types.Task{t3}, nil)
tCache.On("GetTasksByKey", mock.Anything).Return([]*types.Task{}, nil)
tCache.On("GetTaskForCommit", rs1.Repo, rs1.Revision, "cd-task").Return(t3, nil)
tCache.On("GetTaskForCommit", rs2.Repo, rs2.Revision, "cd-task").Return(t3, nil)
// Set up a time window to include everything.
w := window_mocks.AllInclusiveWindow()
// Create the scheduler.
s := &TaskScheduler{
candidateMetrics: map[string]metrics2.Int64Metric{},
jCache: jCache,
repos: repos,
taskCfgCache: tcc,
tCache: tCache,
window: w,
// Regenerate the task queue. We should end up with zero candidates, despite
// the fact that we have two unfinished jobs at the older commits.
queue, _, err := s.regenerateTaskQueue(ctx)
require.NoError(t, err)
require.Empty(t, queue)
func TestFilterTaskCandidates_NoCDTasksInRegularPools(t *testing.T) {
ctx := context.Background()
// Create a single candidate, which is a CD task but requests to be run in a
// non-CD pool.
k1 := types.TaskKey{
RepoState: rs1,
Name: tcc_testutils.BuildTaskName,
candidate := &TaskCandidate{
IsCD: true,
TaskKey: k1,
TaskSpec: &specs.TaskSpec{
Dimensions: []string{"pool:Skia"},
candidates := map[types.TaskKey]*TaskCandidate{
k1: candidate,
// Set up mocks.
tCache := &cache_mocks.TaskCache{}
tCache.On("GetTasksByKey", candidate.TaskKey).Return([]*types.Task{}, nil)
s := &TaskScheduler{
cdPool: cdPoolName,
tCache: tCache,
window: window_mocks.AllInclusiveWindow(),
// Ensure that the candidate gets filtered out, and that the diagnostics
// tell us why.
c, err := s.filterTaskCandidates(ctx, candidates)
require.NoError(t, err)
require.Empty(t, c)
require.Equal(t, "Skia", candidate.GetDiagnostics().Filtering.ForbiddenPool)
func TestFilterTaskCandidates_NoRegularTasksInCDPool(t *testing.T) {
ctx := context.Background()
// Create a single candidate, which is a non-CD task but requests to be run
// in the CD pool.
k1 := types.TaskKey{
RepoState: rs1,
Name: tcc_testutils.BuildTaskName,
candidate := &TaskCandidate{
TaskKey: k1,
TaskSpec: &specs.TaskSpec{
Dimensions: []string{fmt.Sprintf("pool:%s", cdPoolName)},
candidates := map[types.TaskKey]*TaskCandidate{
k1: candidate,
// Set up mocks.
tCache := &cache_mocks.TaskCache{}
tCache.On("GetTasksByKey", candidate.TaskKey).Return([]*types.Task{}, nil)
s := &TaskScheduler{
cdPool: cdPoolName,
tCache: tCache,
window: window_mocks.AllInclusiveWindow(),
// Ensure that the candidate gets filtered out, and that the diagnostics
// tell us why.
c, err := s.filterTaskCandidates(ctx, candidates)
require.NoError(t, err)
require.Empty(t, c)
require.Equal(t, cdPoolName, candidate.GetDiagnostics().Filtering.ForbiddenPool)
// newMemRepo is a convenience function which creates a MemGit backed by an
// in-memory GitStore and builds a repograph.Graph from it. MemGit automatically
// calls Graph.Update() whenever it is mutated.
func newMemRepo(t sktest.TestingT) (*mem_git.MemGit, *repograph.Graph) {
gs := mem_gitstore.New()
gb := mem_git.New(t, gs)
ctx := context.Background()
ri, err := gitstore.NewGitStoreRepoImpl(ctx, gs)
require.NoError(t, err)
repo, err := repograph.NewWithRepoImpl(ctx, ri)
require.NoError(t, err)
return gb, repo