|  | package scheduling | 
|  |  | 
|  | import ( | 
|  | "context" | 
|  | "encoding/json" | 
|  | "errors" | 
|  | "fmt" | 
|  | "math" | 
|  | "os" | 
|  | "path" | 
|  | "sort" | 
|  | "strings" | 
|  | "sync" | 
|  | "testing" | 
|  | "time" | 
|  |  | 
|  | "cloud.google.com/go/storage" | 
|  | "github.com/stretchr/testify/assert" | 
|  | "github.com/stretchr/testify/mock" | 
|  | "github.com/stretchr/testify/require" | 
|  | swarming_api "go.chromium.org/luci/common/api/swarming/swarming/v1" | 
|  |  | 
|  | "go.skia.org/infra/go/cas/mocks" | 
|  | "go.skia.org/infra/go/cas/rbe" | 
|  | "go.skia.org/infra/go/deepequal" | 
|  | "go.skia.org/infra/go/deepequal/assertdeep" | 
|  | ftestutils "go.skia.org/infra/go/firestore/testutils" | 
|  | "go.skia.org/infra/go/gcs/mem_gcsclient" | 
|  | "go.skia.org/infra/go/git" | 
|  | "go.skia.org/infra/go/git/repograph" | 
|  | "go.skia.org/infra/go/git/testutils/mem_git" | 
|  | "go.skia.org/infra/go/gitiles" | 
|  | "go.skia.org/infra/go/gitstore" | 
|  | "go.skia.org/infra/go/gitstore/mem_gitstore" | 
|  | "go.skia.org/infra/go/metrics2" | 
|  | "go.skia.org/infra/go/mockhttpclient" | 
|  | "go.skia.org/infra/go/now" | 
|  | "go.skia.org/infra/go/sktest" | 
|  | "go.skia.org/infra/go/swarming" | 
|  | "go.skia.org/infra/go/testutils" | 
|  | "go.skia.org/infra/go/util" | 
|  | "go.skia.org/infra/go/vcsinfo" | 
|  | "go.skia.org/infra/task_scheduler/go/db" | 
|  | "go.skia.org/infra/task_scheduler/go/db/cache" | 
|  | cache_mocks "go.skia.org/infra/task_scheduler/go/db/cache/mocks" | 
|  | "go.skia.org/infra/task_scheduler/go/db/memory" | 
|  | "go.skia.org/infra/task_scheduler/go/skip_tasks" | 
|  | "go.skia.org/infra/task_scheduler/go/specs" | 
|  | "go.skia.org/infra/task_scheduler/go/task_cfg_cache" | 
|  | tcc_mocks "go.skia.org/infra/task_scheduler/go/task_cfg_cache/mocks" | 
|  | tcc_testutils "go.skia.org/infra/task_scheduler/go/task_cfg_cache/testutils" | 
|  | swarming_task_execution "go.skia.org/infra/task_scheduler/go/task_execution/swarming" | 
|  | swarming_testutils "go.skia.org/infra/task_scheduler/go/testutils" | 
|  | "go.skia.org/infra/task_scheduler/go/types" | 
|  | "go.skia.org/infra/task_scheduler/go/window" | 
|  | window_mocks "go.skia.org/infra/task_scheduler/go/window/mocks" | 
|  | ) | 
|  |  | 
const (
	// cdPoolName is the pool name passed to NewTaskScheduler by setup.
	cdPoolName = "cd-pool"

	// scoreDelta is the tolerance used with require.InDelta when comparing
	// computed candidate scores against their expected values.
	scoreDelta = 1e-6
)
|  |  | 
|  | var ( | 
|  | androidTaskDims = map[string]string{ | 
|  | "pool":        "Skia", | 
|  | "os":          "Android", | 
|  | "device_type": "grouper", | 
|  | } | 
|  |  | 
|  | linuxTaskDims = map[string]string{ | 
|  | "os":   "Ubuntu", | 
|  | "pool": "Skia", | 
|  | } | 
|  |  | 
|  | androidBotDims = map[string][]string{ | 
|  | "pool":        {"Skia"}, | 
|  | "os":          {"Android"}, | 
|  | "device_type": {"grouper"}, | 
|  | } | 
|  |  | 
|  | linuxBotDims = map[string][]string{ | 
|  | "os":   {"Ubuntu"}, | 
|  | "pool": {"Skia"}, | 
|  | } | 
|  |  | 
|  | // The following are commits and RepoStates used in tests. The commit | 
|  | // hashes were chosen to be equal to those generated by MemGit, but the | 
|  | // contents are completely arbitrary. | 
|  | lc1 = &vcsinfo.LongCommit{ | 
|  | ShortCommit: &vcsinfo.ShortCommit{ | 
|  | Hash:    mem_git.Commit0, | 
|  | Author:  "me@google.com", | 
|  | Subject: "First Commit", | 
|  | }, | 
|  | Body:      "My first commit", | 
|  | Timestamp: time.Unix(1571926390, 0), | 
|  | Index:     0, | 
|  | Branches: map[string]bool{ | 
|  | git.MainBranch: true, | 
|  | }, | 
|  | } | 
|  | lc2 = &vcsinfo.LongCommit{ | 
|  | ShortCommit: &vcsinfo.ShortCommit{ | 
|  | Hash:    mem_git.Commit1, | 
|  | Author:  "you@google.com", | 
|  | Subject: "Second Commit", | 
|  | }, | 
|  | Body:      "My second commit", | 
|  | Parents:   []string{lc1.Hash}, | 
|  | Timestamp: time.Unix(1571926450, 0), | 
|  | Index:     1, | 
|  | Branches: map[string]bool{ | 
|  | git.MainBranch: true, | 
|  | }, | 
|  | } | 
|  |  | 
|  | ic1 = &vcsinfo.IndexCommit{ | 
|  | Hash:      lc1.Hash, | 
|  | Index:     lc1.Index, | 
|  | Timestamp: lc1.Timestamp, | 
|  | } | 
|  | ic2 = &vcsinfo.IndexCommit{ | 
|  | Hash:      lc2.Hash, | 
|  | Index:     lc2.Index, | 
|  | Timestamp: lc2.Timestamp, | 
|  | } | 
|  |  | 
|  | rs1 = types.RepoState{ | 
|  | Repo:     "fake.git", | 
|  | Revision: lc1.Hash, | 
|  | } | 
|  | rs2 = types.RepoState{ | 
|  | Repo:     "fake.git", | 
|  | Revision: lc2.Hash, | 
|  | } | 
|  | ) | 
|  |  | 
|  | func makeTask(ctx context.Context, name, repo, revision string) *types.Task { | 
|  | return &types.Task{ | 
|  | Commits: []string{revision}, | 
|  | Created: now.Now(ctx), | 
|  | TaskKey: types.TaskKey{ | 
|  | RepoState: types.RepoState{ | 
|  | Repo:     repo, | 
|  | Revision: revision, | 
|  | }, | 
|  | Name: name, | 
|  | }, | 
|  | MaxAttempts:    types.DEFAULT_MAX_TASK_ATTEMPTS, | 
|  | SwarmingTaskId: "swarmid", | 
|  | } | 
|  | } | 
|  |  | 
|  | func makeSwarmingRpcsTaskRequestMetadata(t *testing.T, task *types.Task, dims map[string]string) *swarming_api.SwarmingRpcsTaskRequestMetadata { | 
|  | tag := func(k, v string) string { | 
|  | return fmt.Sprintf("%s:%s", k, v) | 
|  | } | 
|  | ts := func(t time.Time) string { | 
|  | if util.TimeIsZero(t) { | 
|  | return "" | 
|  | } | 
|  | return t.Format(swarming.TIMESTAMP_FORMAT) | 
|  | } | 
|  | abandoned := "" | 
|  | state := swarming.TASK_STATE_PENDING | 
|  | failed := false | 
|  | switch task.Status { | 
|  | case types.TASK_STATUS_MISHAP: | 
|  | state = swarming.TASK_STATE_BOT_DIED | 
|  | abandoned = ts(task.Finished) | 
|  | case types.TASK_STATUS_RUNNING: | 
|  | state = swarming.TASK_STATE_RUNNING | 
|  | case types.TASK_STATUS_FAILURE: | 
|  | state = swarming.TASK_STATE_COMPLETED | 
|  | failed = true | 
|  | case types.TASK_STATUS_SUCCESS: | 
|  | state = swarming.TASK_STATE_COMPLETED | 
|  | case types.TASK_STATUS_PENDING: | 
|  | // noop | 
|  | default: | 
|  | require.FailNow(t, "Unknown task status: %s", task.Status) | 
|  | } | 
|  | tags := []string{ | 
|  | tag(types.SWARMING_TAG_ID, task.Id), | 
|  | tag(types.SWARMING_TAG_FORCED_JOB_ID, task.ForcedJobId), | 
|  | tag(types.SWARMING_TAG_NAME, task.Name), | 
|  | tag(swarming.DIMENSION_POOL_KEY, swarming.DIMENSION_POOL_VALUE_SKIA), | 
|  | tag(types.SWARMING_TAG_REPO, task.Repo), | 
|  | tag(types.SWARMING_TAG_REVISION, task.Revision), | 
|  | } | 
|  | for _, p := range task.ParentTaskIds { | 
|  | tags = append(tags, tag(types.SWARMING_TAG_PARENT_TASK_ID, p)) | 
|  | } | 
|  |  | 
|  | dimensions := make([]*swarming_api.SwarmingRpcsStringPair, 0, len(dims)) | 
|  | for k, v := range dims { | 
|  | dimensions = append(dimensions, &swarming_api.SwarmingRpcsStringPair{ | 
|  | Key:   k, | 
|  | Value: v, | 
|  | }) | 
|  | } | 
|  |  | 
|  | var casOutput *swarming_api.SwarmingRpcsCASReference | 
|  | if task.IsolatedOutput != "" { | 
|  | var err error | 
|  | casOutput, err = swarming.MakeCASReference(task.IsolatedOutput, "fake-cas-instance") | 
|  | require.NoError(t, err) | 
|  | } | 
|  |  | 
|  | return &swarming_api.SwarmingRpcsTaskRequestMetadata{ | 
|  | Request: &swarming_api.SwarmingRpcsTaskRequest{ | 
|  | CreatedTs: ts(task.Created), | 
|  | TaskSlices: []*swarming_api.SwarmingRpcsTaskSlice{ | 
|  | { | 
|  | Properties: &swarming_api.SwarmingRpcsTaskProperties{ | 
|  | Dimensions: dimensions, | 
|  | }, | 
|  | }, | 
|  | }, | 
|  | Tags: tags, | 
|  | }, | 
|  | TaskId: task.SwarmingTaskId, | 
|  | TaskResult: &swarming_api.SwarmingRpcsTaskResult{ | 
|  | AbandonedTs:   abandoned, | 
|  | BotId:         task.SwarmingBotId, | 
|  | CreatedTs:     ts(task.Created), | 
|  | CompletedTs:   ts(task.Finished), | 
|  | Failure:       failed, | 
|  | CasOutputRoot: casOutput, | 
|  | StartedTs:     ts(task.Started), | 
|  | State:         state, | 
|  | Tags:          tags, | 
|  | TaskId:        task.SwarmingTaskId, | 
|  | }, | 
|  | } | 
|  | } | 
|  |  | 
|  | // Insert the given data into the caches. | 
|  | func fillCaches(t sktest.TestingT, ctx context.Context, taskCfgCache task_cfg_cache.TaskCfgCache, rs types.RepoState, cfg *specs.TasksCfg) { | 
|  | require.NoError(t, taskCfgCache.Set(ctx, rs, cfg, nil)) | 
|  | } | 
|  |  | 
|  | // Add jobs for the given RepoStates. Assumes that the RepoStates have already | 
|  | // been added to the caches. | 
|  | func insertJobs(t sktest.TestingT, ctx context.Context, s *TaskScheduler, rss ...types.RepoState) { | 
|  | jobs := []*types.Job{} | 
|  | for _, rs := range rss { | 
|  | cfg, cachedErr, err := s.taskCfgCache.Get(ctx, rs) | 
|  | require.NoError(t, err) | 
|  | require.NoError(t, cachedErr) | 
|  | for name := range cfg.Jobs { | 
|  | j, err := task_cfg_cache.MakeJob(ctx, s.taskCfgCache, rs, name) | 
|  | require.NoError(t, err) | 
|  | jobs = append(jobs, j) | 
|  | } | 
|  | } | 
|  | require.NoError(t, s.putJobsInChunks(ctx, jobs)) | 
|  | } | 
|  |  | 
|  | // Common setup for TaskScheduler tests. | 
|  | func setup(t *testing.T) (context.Context, *mem_git.MemGit, *memory.InMemoryDB, *swarming_testutils.TestClient, *TaskScheduler, *mockhttpclient.URLMock, *mocks.CAS, func()) { | 
|  |  | 
|  | ctx, cancel := context.WithCancel(context.Background()) | 
|  |  | 
|  | tmp, err := os.MkdirTemp("", "") | 
|  | require.NoError(t, err) | 
|  |  | 
|  | d := memory.NewInMemoryDB() | 
|  | swarmingClient := swarming_testutils.NewTestClient() | 
|  | urlMock := mockhttpclient.NewURLMock() | 
|  | mg, repo := newMemRepo(t) | 
|  | hashes := mg.CommitN(2) | 
|  | // Sanity check. | 
|  | require.Equal(t, lc1.Hash, hashes[1]) | 
|  | require.Equal(t, lc2.Hash, hashes[0]) | 
|  | repos := repograph.Map{ | 
|  | rs1.Repo: repo, | 
|  | } | 
|  | btProject, btInstance, btCleanup := tcc_testutils.SetupBigTable(t) | 
|  | taskCfgCache, err := task_cfg_cache.NewTaskCfgCache(ctx, repos, btProject, btInstance, nil) | 
|  | require.NoError(t, err) | 
|  |  | 
|  | // Cache the RepoStates. This is normally done by the JobCreator. | 
|  | fillCaches(t, ctx, taskCfgCache, rs1, tcc_testutils.TasksCfg1) | 
|  | fillCaches(t, ctx, taskCfgCache, rs2, tcc_testutils.TasksCfg2) | 
|  |  | 
|  | cas := &mocks.CAS{} | 
|  | cas.On("Close").Return(nil) | 
|  | // Go ahead and mock the single-input merge calls. | 
|  | cas.On("Merge", testutils.AnyContext, []string{tcc_testutils.CompileCASDigest}).Return(tcc_testutils.CompileCASDigest, nil) | 
|  | cas.On("Merge", testutils.AnyContext, []string{tcc_testutils.TestCASDigest}).Return(tcc_testutils.TestCASDigest, nil) | 
|  | cas.On("Merge", testutils.AnyContext, []string{tcc_testutils.PerfCASDigest}).Return(tcc_testutils.PerfCASDigest, nil) | 
|  |  | 
|  | taskExec := swarming_task_execution.NewSwarmingTaskExecutor(swarmingClient, "fake-cas-instance", "") | 
|  | taskExecs := map[string]types.TaskExecutor{ | 
|  | types.TaskExecutor_Swarming:   taskExec, | 
|  | types.TaskExecutor_UseDefault: taskExec, | 
|  | } | 
|  | s, err := NewTaskScheduler(ctx, d, nil, time.Duration(math.MaxInt64), 0, repos, cas, "fake-cas-instance", taskExecs, urlMock.Client(), 1.0, swarming.POOLS_PUBLIC, cdPoolName, "", taskCfgCache, nil, mem_gcsclient.New("diag_unit_tests"), btInstance, false) | 
|  | require.NoError(t, err) | 
|  |  | 
|  | // Insert jobs. This is normally done by the JobCreator. | 
|  | insertJobs(t, ctx, s, rs1, rs2) | 
|  |  | 
|  | return ctx, mg, d, swarmingClient, s, urlMock, cas, func() { | 
|  | testutils.AssertCloses(t, s) | 
|  | testutils.RemoveAll(t, tmp) | 
|  | btCleanup() | 
|  | cancel() | 
|  | } | 
|  | } | 
|  |  | 
|  | // runMainLoop calls s.MainLoop, asserts there was no error, and waits for | 
|  | // background goroutines to finish. | 
|  | func runMainLoop(t *testing.T, s *TaskScheduler, ctx context.Context) { | 
|  | require.NoError(t, s.MainLoop(ctx)) | 
|  | s.testWaitGroup.Wait() | 
|  | } | 
|  |  | 
|  | func lastDiagnostics(t *testing.T, s *TaskScheduler) taskSchedulerMainLoopDiagnostics { | 
|  | ctx := context.Background() | 
|  | lastname := "" | 
|  | require.NoError(t, s.diagClient.AllFilesInDirectory(ctx, path.Join(s.diagInstance, GCS_MAIN_LOOP_DIAGNOSTICS_DIR), func(item *storage.ObjectAttrs) error { | 
|  | if lastname == "" || item.Name > lastname { | 
|  | lastname = item.Name | 
|  | } | 
|  | return nil | 
|  | })) | 
|  | require.NotEqual(t, lastname, "") | 
|  | reader, err := s.diagClient.FileReader(ctx, lastname) | 
|  | require.NoError(t, err) | 
|  | defer testutils.AssertCloses(t, reader) | 
|  | rv := taskSchedulerMainLoopDiagnostics{} | 
|  | require.NoError(t, json.NewDecoder(reader).Decode(&rv)) | 
|  | return rv | 
|  | } | 
|  |  | 
|  | func TestFindTaskCandidatesForJobs(t *testing.T) { | 
|  | ctx, _, _, _, s, _, _, cleanup := setup(t) | 
|  | defer cleanup() | 
|  |  | 
|  | test := func(jobs []*types.Job, expect map[types.TaskKey]*TaskCandidate) { | 
|  | actual, err := s.findTaskCandidatesForJobs(ctx, jobs) | 
|  | require.NoError(t, err) | 
|  | assertdeep.Equal(t, expect, actual) | 
|  | } | 
|  |  | 
|  | cfg1, cachedErr, err := s.taskCfgCache.Get(ctx, rs1) | 
|  | require.NoError(t, err) | 
|  | require.NoError(t, cachedErr) | 
|  | cfg2, cachedErr, err := s.taskCfgCache.Get(ctx, rs2) | 
|  | require.NoError(t, err) | 
|  | require.NoError(t, cachedErr) | 
|  |  | 
|  | // Run on an empty job list, ensure empty list returned. | 
|  | test([]*types.Job{}, map[types.TaskKey]*TaskCandidate{}) | 
|  |  | 
|  | currentTime := now.Now(ctx).UTC() | 
|  |  | 
|  | // Run for one job, ensure that we get the right set of task specs | 
|  | // returned (ie. all dependencies and their dependencies). | 
|  | j1 := &types.Job{ | 
|  | Created:      currentTime, | 
|  | Id:           "job1id", | 
|  | Name:         tcc_testutils.TestTaskName, | 
|  | Dependencies: map[string][]string{tcc_testutils.TestTaskName: {tcc_testutils.BuildTaskName}, tcc_testutils.BuildTaskName: {}}, | 
|  | Priority:     0.5, | 
|  | RepoState:    rs1.Copy(), | 
|  | } | 
|  | tc1 := &TaskCandidate{ | 
|  | CasDigests: []string{tcc_testutils.CompileCASDigest}, | 
|  | Jobs:       []*types.Job{j1}, | 
|  | TaskKey: types.TaskKey{ | 
|  | RepoState: rs1.Copy(), | 
|  | Name:      tcc_testutils.BuildTaskName, | 
|  | }, | 
|  | TaskSpec: cfg1.Tasks[tcc_testutils.BuildTaskName].Copy(), | 
|  | } | 
|  | tc2 := &TaskCandidate{ | 
|  | CasDigests: []string{tcc_testutils.TestCASDigest}, | 
|  | Jobs:       []*types.Job{j1}, | 
|  | TaskKey: types.TaskKey{ | 
|  | RepoState: rs1.Copy(), | 
|  | Name:      tcc_testutils.TestTaskName, | 
|  | }, | 
|  | TaskSpec: cfg1.Tasks[tcc_testutils.TestTaskName].Copy(), | 
|  | } | 
|  |  | 
|  | test([]*types.Job{j1}, map[types.TaskKey]*TaskCandidate{ | 
|  | tc1.TaskKey: tc1, | 
|  | tc2.TaskKey: tc2, | 
|  | }) | 
|  |  | 
|  | // Add a job, ensure that its dependencies are added and that the right | 
|  | // dependencies are de-duplicated. | 
|  | j2 := &types.Job{ | 
|  | Created:      currentTime, | 
|  | Id:           "job2id", | 
|  | Name:         tcc_testutils.TestTaskName, | 
|  | Dependencies: map[string][]string{tcc_testutils.TestTaskName: {tcc_testutils.BuildTaskName}, tcc_testutils.BuildTaskName: {}}, | 
|  | Priority:     0.6, | 
|  | RepoState:    rs2, | 
|  | } | 
|  | j3 := &types.Job{ | 
|  | Created:      currentTime, | 
|  | Id:           "job3id", | 
|  | Name:         tcc_testutils.PerfTaskName, | 
|  | Dependencies: map[string][]string{tcc_testutils.PerfTaskName: {tcc_testutils.BuildTaskName}, tcc_testutils.BuildTaskName: {}}, | 
|  | Priority:     0.6, | 
|  | RepoState:    rs2, | 
|  | } | 
|  | tc3 := &TaskCandidate{ | 
|  | CasDigests: []string{tcc_testutils.CompileCASDigest}, | 
|  | Jobs:       []*types.Job{j2, j3}, | 
|  | TaskKey: types.TaskKey{ | 
|  | RepoState: rs2.Copy(), | 
|  | Name:      tcc_testutils.BuildTaskName, | 
|  | }, | 
|  | TaskSpec: cfg2.Tasks[tcc_testutils.BuildTaskName].Copy(), | 
|  | } | 
|  | tc4 := &TaskCandidate{ | 
|  | CasDigests: []string{tcc_testutils.TestCASDigest}, | 
|  | Jobs:       []*types.Job{j2}, | 
|  | TaskKey: types.TaskKey{ | 
|  | RepoState: rs2.Copy(), | 
|  | Name:      tcc_testutils.TestTaskName, | 
|  | }, | 
|  | TaskSpec: cfg2.Tasks[tcc_testutils.TestTaskName].Copy(), | 
|  | } | 
|  | tc5 := &TaskCandidate{ | 
|  | CasDigests: []string{tcc_testutils.PerfCASDigest}, | 
|  | Jobs:       []*types.Job{j3}, | 
|  | TaskKey: types.TaskKey{ | 
|  | RepoState: rs2.Copy(), | 
|  | Name:      tcc_testutils.PerfTaskName, | 
|  | }, | 
|  | TaskSpec: cfg2.Tasks[tcc_testutils.PerfTaskName].Copy(), | 
|  | } | 
|  | allCandidates := map[types.TaskKey]*TaskCandidate{ | 
|  | tc1.TaskKey: tc1, | 
|  | tc2.TaskKey: tc2, | 
|  | tc3.TaskKey: tc3, | 
|  | tc4.TaskKey: tc4, | 
|  | tc5.TaskKey: tc5, | 
|  | } | 
|  | test([]*types.Job{j1, j2, j3}, allCandidates) | 
|  |  | 
|  | // Finish j3, ensure that its task specs no longer show up. | 
|  | delete(allCandidates, j3.MakeTaskKey(tcc_testutils.PerfTaskName)) | 
|  | // This is hacky, but findTaskCandidatesForJobs accepts an already- | 
|  | // filtered list of jobs, so we have to pretend it never existed. | 
|  | tc3.Jobs = tc3.Jobs[:1] | 
|  | test([]*types.Job{j1, j2}, allCandidates) | 
|  |  | 
|  | // Ensure that we don't generate candidates for jobs at nonexistent commits. | 
|  | j4 := &types.Job{ | 
|  | Created:      currentTime, | 
|  | Id:           "job4id", | 
|  | Name:         tcc_testutils.PerfTaskName, | 
|  | Dependencies: map[string][]string{tcc_testutils.PerfTaskName: {tcc_testutils.BuildTaskName}, tcc_testutils.BuildTaskName: {}}, | 
|  | Priority:     0.6, | 
|  | RepoState: types.RepoState{ | 
|  | Repo:     rs2.Repo, | 
|  | Revision: "aaaaabbbbbcccccdddddeeeeefffff1111122222", | 
|  | }, | 
|  | } | 
|  | test([]*types.Job{j4}, map[types.TaskKey]*TaskCandidate{}) | 
|  | } | 
|  |  | 
|  | func TestFilterTaskCandidates(t *testing.T) { | 
|  | ctx, _, _, _, s, _, _, cleanup := setup(t) | 
|  | defer cleanup() | 
|  |  | 
|  | c1 := rs1.Revision | 
|  | c2 := rs2.Revision | 
|  |  | 
|  | // Fake out the initial candidates. | 
|  | k1 := types.TaskKey{ | 
|  | RepoState: rs1, | 
|  | Name:      tcc_testutils.BuildTaskName, | 
|  | } | 
|  | k2 := types.TaskKey{ | 
|  | RepoState: rs1, | 
|  | Name:      tcc_testutils.TestTaskName, | 
|  | } | 
|  | k3 := types.TaskKey{ | 
|  | RepoState: rs2, | 
|  | Name:      tcc_testutils.BuildTaskName, | 
|  | } | 
|  | k4 := types.TaskKey{ | 
|  | RepoState: rs2, | 
|  | Name:      tcc_testutils.TestTaskName, | 
|  | } | 
|  | k5 := types.TaskKey{ | 
|  | RepoState: rs2, | 
|  | Name:      tcc_testutils.PerfTaskName, | 
|  | } | 
|  | candidates := map[types.TaskKey]*TaskCandidate{ | 
|  | k1: { | 
|  | TaskKey:  k1, | 
|  | TaskSpec: &specs.TaskSpec{}, | 
|  | }, | 
|  | k2: { | 
|  | TaskKey: k2, | 
|  | TaskSpec: &specs.TaskSpec{ | 
|  | Dependencies: []string{tcc_testutils.BuildTaskName}, | 
|  | }, | 
|  | }, | 
|  | k3: { | 
|  | TaskKey:  k3, | 
|  | TaskSpec: &specs.TaskSpec{}, | 
|  | }, | 
|  | k4: { | 
|  | TaskKey: k4, | 
|  | TaskSpec: &specs.TaskSpec{ | 
|  | Dependencies: []string{tcc_testutils.BuildTaskName}, | 
|  | }, | 
|  | }, | 
|  | k5: { | 
|  | TaskKey: k5, | 
|  | TaskSpec: &specs.TaskSpec{ | 
|  | Dependencies: []string{tcc_testutils.BuildTaskName}, | 
|  | }, | 
|  | }, | 
|  | } | 
|  |  | 
|  | clearDiagnostics := func(candidates map[types.TaskKey]*TaskCandidate) { | 
|  | for _, c := range candidates { | 
|  | c.Diagnostics = nil | 
|  | } | 
|  | } | 
|  |  | 
|  | // Check the initial set of task candidates. The two Build tasks | 
|  | // should be the only ones available. | 
|  | c, err := s.filterTaskCandidates(ctx, candidates) | 
|  | require.NoError(t, err) | 
|  | require.Len(t, c, 1) | 
|  | require.Equal(t, 1, len(c[rs1.Repo])) | 
|  | require.Equal(t, 2, len(c[rs1.Repo][tcc_testutils.BuildTaskName])) | 
|  | for _, byRepo := range c { | 
|  | for _, byName := range byRepo { | 
|  | for _, candidate := range byName { | 
|  | require.Equal(t, candidate.Name, tcc_testutils.BuildTaskName) | 
|  | } | 
|  | } | 
|  | } | 
|  | // Check filtering diagnostics. Non-Build tasks have unmet dependencies. | 
|  | for _, candidate := range candidates { | 
|  | if candidate.Name != tcc_testutils.BuildTaskName { | 
|  | require.Equal(t, candidate.Diagnostics.Filtering.UnmetDependencies, []string{tcc_testutils.BuildTaskName}) | 
|  | } | 
|  | } | 
|  |  | 
|  | // Insert a the Build task at c1 (1 dependent) into the database, | 
|  | // transition through various states. | 
|  | var t1 *types.Task | 
|  | for _, byRepo := range c { // Order not guaranteed, find the right candidate. | 
|  | for _, byName := range byRepo { | 
|  | for _, candidate := range byName { | 
|  | if candidate.Revision == c1 { | 
|  | t1 = makeTask(ctx, candidate.Name, candidate.Repo, candidate.Revision) | 
|  | break | 
|  | } | 
|  | } | 
|  | } | 
|  | } | 
|  | require.NotNil(t, t1) | 
|  |  | 
|  | // We shouldn't duplicate pending or running tasks. | 
|  | for _, status := range []types.TaskStatus{types.TASK_STATUS_PENDING, types.TASK_STATUS_RUNNING} { | 
|  | clearDiagnostics(candidates) | 
|  |  | 
|  | t1.Status = status | 
|  | require.NoError(t, s.putTask(ctx, t1)) | 
|  |  | 
|  | c, err = s.filterTaskCandidates(ctx, candidates) | 
|  | require.NoError(t, err) | 
|  | require.Len(t, c, 1) | 
|  | for _, byRepo := range c { | 
|  | for _, byName := range byRepo { | 
|  | for _, candidate := range byName { | 
|  | require.Equal(t, candidate.Name, tcc_testutils.BuildTaskName) | 
|  | require.Equal(t, c2, candidate.Revision) | 
|  | } | 
|  | } | 
|  | } | 
|  | // Check filtering diagnostics. | 
|  | for _, candidate := range candidates { | 
|  | if candidate.Name != tcc_testutils.BuildTaskName { | 
|  | // Non-Build tasks have unmet dependencies. | 
|  | require.Equal(t, candidate.Diagnostics.Filtering.UnmetDependencies, []string{tcc_testutils.BuildTaskName}) | 
|  | } else if candidate.Revision == c1 { | 
|  | // Blocked by t1 | 
|  | require.Equal(t, candidate.Diagnostics.Filtering.SupersededByTask, t1.Id) | 
|  | } | 
|  | } | 
|  | } | 
|  |  | 
|  | clearDiagnostics(candidates) | 
|  |  | 
|  | // The task failed. Ensure that its dependents are not candidates, but | 
|  | // the task itself is back in the list of candidates, in case we want | 
|  | // to retry. | 
|  | t1.Status = types.TASK_STATUS_FAILURE | 
|  | require.NoError(t, s.putTask(ctx, t1)) | 
|  |  | 
|  | c, err = s.filterTaskCandidates(ctx, candidates) | 
|  | require.NoError(t, err) | 
|  | require.Len(t, c, 1) | 
|  | for _, byRepo := range c { | 
|  | require.Len(t, byRepo, 1) | 
|  | for _, byName := range byRepo { | 
|  | require.Len(t, byName, 2) | 
|  | for _, candidate := range byName { | 
|  | require.Equal(t, candidate.Name, tcc_testutils.BuildTaskName) | 
|  | } | 
|  | } | 
|  | } | 
|  | // Check filtering diagnostics. | 
|  | for _, candidate := range candidates { | 
|  | if candidate.Name != tcc_testutils.BuildTaskName { | 
|  | // Non-Build tasks have unmet dependencies. | 
|  | require.Equal(t, candidate.Diagnostics.Filtering.UnmetDependencies, []string{tcc_testutils.BuildTaskName}) | 
|  | } | 
|  | } | 
|  |  | 
|  | clearDiagnostics(candidates) | 
|  |  | 
|  | // The task succeeded. Ensure that its dependents are candidates and | 
|  | // the task itself is not. | 
|  | t1.Status = types.TASK_STATUS_SUCCESS | 
|  | t1.IsolatedOutput = "fake isolated hash" | 
|  | require.NoError(t, s.putTask(ctx, t1)) | 
|  |  | 
|  | c, err = s.filterTaskCandidates(ctx, candidates) | 
|  | require.NoError(t, err) | 
|  | require.Len(t, c, 1) | 
|  | for _, byRepo := range c { | 
|  | require.Len(t, byRepo, 2) | 
|  | for _, byName := range byRepo { | 
|  | for _, candidate := range byName { | 
|  | require.False(t, t1.Name == candidate.Name && t1.Revision == candidate.Revision) | 
|  | } | 
|  | } | 
|  | } | 
|  | // Candidate with k1 is blocked by t1. | 
|  | require.Equal(t, candidates[k1].Diagnostics.Filtering.SupersededByTask, t1.Id) | 
|  |  | 
|  | clearDiagnostics(candidates) | 
|  |  | 
|  | // Create the other Build task. | 
|  | var t2 *types.Task | 
|  | for _, byRepo := range c { | 
|  | for _, byName := range byRepo { | 
|  | for _, candidate := range byName { | 
|  | if candidate.Revision == c2 && strings.HasPrefix(candidate.Name, "Build-") { | 
|  | t2 = makeTask(ctx, candidate.Name, candidate.Repo, candidate.Revision) | 
|  | break | 
|  | } | 
|  | } | 
|  | } | 
|  | } | 
|  | require.NotNil(t, t2) | 
|  | t2.Status = types.TASK_STATUS_SUCCESS | 
|  | t2.IsolatedOutput = "fake isolated hash" | 
|  | require.NoError(t, s.putTask(ctx, t2)) | 
|  |  | 
|  | // All test and perf tasks are now candidates, no build tasks. | 
|  | c, err = s.filterTaskCandidates(ctx, candidates) | 
|  | require.NoError(t, err) | 
|  | require.Len(t, c, 1) | 
|  | require.Equal(t, 2, len(c[rs1.Repo][tcc_testutils.TestTaskName])) | 
|  | require.Equal(t, 1, len(c[rs1.Repo][tcc_testutils.PerfTaskName])) | 
|  | for _, byRepo := range c { | 
|  | for _, byName := range byRepo { | 
|  | for _, candidate := range byName { | 
|  | require.NotEqual(t, candidate.Name, tcc_testutils.BuildTaskName) | 
|  | } | 
|  | } | 
|  | } | 
|  | // Build candidates are blocked by completed tasks. | 
|  | require.Equal(t, candidates[k1].Diagnostics.Filtering.SupersededByTask, t1.Id) | 
|  | require.Equal(t, candidates[k3].Diagnostics.Filtering.SupersededByTask, t2.Id) | 
|  |  | 
|  | clearDiagnostics(candidates) | 
|  |  | 
|  | // Add a try job. Ensure that no deps have been incorrectly satisfied. | 
|  | tryKey := k4.Copy() | 
|  | tryKey.Server = "dummy-server" | 
|  | tryKey.Issue = "dummy-issue" | 
|  | tryKey.Patchset = "dummy-patchset" | 
|  | candidates[tryKey] = &TaskCandidate{ | 
|  | TaskKey: tryKey, | 
|  | TaskSpec: &specs.TaskSpec{ | 
|  | Dependencies: []string{tcc_testutils.BuildTaskName}, | 
|  | }, | 
|  | } | 
|  | c, err = s.filterTaskCandidates(ctx, candidates) | 
|  | require.NoError(t, err) | 
|  | require.Len(t, c, 1) | 
|  | require.Equal(t, 2, len(c[rs1.Repo][tcc_testutils.TestTaskName])) | 
|  | require.Equal(t, 1, len(c[rs1.Repo][tcc_testutils.PerfTaskName])) | 
|  | for _, byRepo := range c { | 
|  | for _, byName := range byRepo { | 
|  | for _, candidate := range byName { | 
|  | require.NotEqual(t, candidate.Name, tcc_testutils.BuildTaskName) | 
|  | require.False(t, candidate.IsTryJob()) | 
|  | } | 
|  | } | 
|  | } | 
|  | // Check diagnostics for tryKey | 
|  | require.Equal(t, candidates[tryKey].Diagnostics.Filtering.UnmetDependencies, []string{tcc_testutils.BuildTaskName}) | 
|  | } | 
|  |  | 
|  | // processTaskCandidate is a helper function for processing a single task | 
|  | // candidate. | 
|  | func processTaskCandidate(ctx context.Context, s *TaskScheduler, c *TaskCandidate) error { | 
|  | candidates := map[string]map[string][]*TaskCandidate{ | 
|  | c.Repo: { | 
|  | c.Name: []*TaskCandidate{c}, | 
|  | }, | 
|  | } | 
|  | _, err := s.processTaskCandidates(ctx, candidates) | 
|  | return err | 
|  | } | 
|  |  | 
|  | func TestProcessTaskCandidate(t *testing.T) { | 
|  | _, _, _, _, s, _, _, cleanup := setup(t) | 
|  | defer cleanup() | 
|  |  | 
|  | currentTime := time.Unix(0, 1470674884000000) | 
|  | ctx := now.TimeTravelingContext(currentTime) | 
|  |  | 
|  | checkDiagTryForced := func(c *TaskCandidate) { | 
|  | require.NotNil(t, c.Diagnostics) | 
|  | diag := c.Diagnostics.Scoring | 
|  | require.NotNil(t, diag) | 
|  | require.Equal(t, c.Jobs[0].Priority, diag.Priority) | 
|  | require.Equal(t, currentTime.Sub(c.Jobs[0].Created).Hours(), diag.JobCreatedHours) | 
|  | // The remaining fields should always be 0 for try/forced jobs. | 
|  | require.Equal(t, 0, diag.StoleFromCommits) | 
|  | require.Equal(t, 0.0, diag.TestednessIncrease) | 
|  | require.Equal(t, 0.0, diag.TimeDecay) | 
|  | } | 
|  |  | 
|  | tryjobRs := types.RepoState{ | 
|  | Patch: types.Patch{ | 
|  | Server:   "my-server", | 
|  | Issue:    "my-issue", | 
|  | Patchset: "my-patchset", | 
|  | }, | 
|  | Repo:     rs1.Repo, | 
|  | Revision: rs1.Revision, | 
|  | } | 
|  | tryjob := &types.Job{ | 
|  | Id:        "tryjobId", | 
|  | Created:   currentTime.Add(-1 * time.Hour), | 
|  | Name:      "job", | 
|  | Priority:  0.5, | 
|  | RepoState: tryjobRs, | 
|  | } | 
|  | c := &TaskCandidate{ | 
|  | Jobs: []*types.Job{tryjob}, | 
|  | TaskKey: types.TaskKey{ | 
|  | Name:      tcc_testutils.BuildTaskName, | 
|  | RepoState: tryjobRs, | 
|  | }, | 
|  | TaskSpec: tcc_testutils.TasksCfg1.Tasks[tcc_testutils.BuildTaskName], | 
|  | } | 
|  | require.NoError(t, processTaskCandidate(ctx, s, c)) | 
|  | // Try job candidates have a specific score and no blamelist. | 
|  | require.InDelta(t, (CANDIDATE_SCORE_TRY_JOB+1.0)*0.5, c.Score, scoreDelta) | 
|  | require.Nil(t, c.Commits) | 
|  | checkDiagTryForced(c) | 
|  |  | 
|  | // Retries are scored lower. | 
|  | c = &TaskCandidate{ | 
|  | Attempt: 1, | 
|  | Jobs:    []*types.Job{tryjob}, | 
|  | TaskKey: types.TaskKey{ | 
|  | Name:      tcc_testutils.BuildTaskName, | 
|  | RepoState: tryjobRs, | 
|  | }, | 
|  | TaskSpec: tcc_testutils.TasksCfg1.Tasks[tcc_testutils.BuildTaskName], | 
|  | } | 
|  | require.NoError(t, processTaskCandidate(ctx, s, c)) | 
|  | require.InDelta(t, (CANDIDATE_SCORE_TRY_JOB+1.0)*0.5*CANDIDATE_SCORE_TRY_JOB_RETRY_MULTIPLIER, c.Score, scoreDelta) | 
|  | require.Nil(t, c.Commits) | 
|  | checkDiagTryForced(c) | 
|  |  | 
|  | forcedJob := &types.Job{ | 
|  | Id:        "forcedJobId", | 
|  | Created:   currentTime.Add(-2 * time.Hour), | 
|  | Name:      tcc_testutils.BuildTaskName, | 
|  | Priority:  0.5, | 
|  | RepoState: rs2, | 
|  | } | 
|  | // Manually forced candidates have a blamelist and a specific score. | 
|  | c = &TaskCandidate{ | 
|  | Jobs: []*types.Job{forcedJob}, | 
|  | TaskKey: types.TaskKey{ | 
|  | Name:        tcc_testutils.BuildTaskName, | 
|  | RepoState:   rs2, | 
|  | ForcedJobId: forcedJob.Id, | 
|  | }, | 
|  | TaskSpec: tcc_testutils.TasksCfg2.Tasks[tcc_testutils.BuildTaskName], | 
|  | } | 
|  | require.NoError(t, processTaskCandidate(ctx, s, c)) | 
|  | require.InDelta(t, (CANDIDATE_SCORE_FORCE_RUN+2.0)*0.5, c.Score, scoreDelta) | 
|  | require.Equal(t, 2, len(c.Commits)) | 
|  | checkDiagTryForced(c) | 
|  |  | 
|  | // All other candidates have a blamelist and a time-decayed score. | 
|  | regularJob := &types.Job{ | 
|  | Id:        "regularJobId", | 
|  | Created:   currentTime.Add(-1 * time.Hour), | 
|  | Name:      tcc_testutils.BuildTaskName, | 
|  | Priority:  0.5, | 
|  | RepoState: rs2, | 
|  | } | 
|  | c = &TaskCandidate{ | 
|  | Jobs: []*types.Job{regularJob}, | 
|  | TaskKey: types.TaskKey{ | 
|  | Name:      tcc_testutils.BuildTaskName, | 
|  | RepoState: rs2, | 
|  | }, | 
|  | TaskSpec: tcc_testutils.TasksCfg2.Tasks[tcc_testutils.BuildTaskName], | 
|  | } | 
|  | require.NoError(t, processTaskCandidate(ctx, s, c)) | 
|  | require.True(t, c.Score > 0) | 
|  | require.Equal(t, 2, len(c.Commits)) | 
|  | diag := c.GetDiagnostics().Scoring | 
|  | require.Equal(t, 0.5, diag.Priority) | 
|  | require.Equal(t, 1.0, diag.JobCreatedHours) | 
|  | require.Equal(t, 0, diag.StoleFromCommits) | 
|  | require.Equal(t, 3.5, diag.TestednessIncrease) | 
|  | require.InDelta(t, 1.0, diag.TimeDecay, scoreDelta) | 
|  | } | 
|  |  | 
|  | func TestRegularJobRetryScoring(t *testing.T) { | 
|  | _, _, _, _, s, _, _, cleanup := setup(t) | 
|  | defer cleanup() | 
|  |  | 
|  | ctx := now.TimeTravelingContext(time.Date(2021, time.October, 15, 0, 0, 0, 0, time.UTC)) | 
|  | currentTime := now.Now(ctx) | 
|  |  | 
|  | checkDiag := func(c *TaskCandidate) { | 
|  | require.NotNil(t, c.Diagnostics) | 
|  | diag := c.Diagnostics.Scoring | 
|  | require.NotNil(t, diag) | 
|  | // All candidates in this test have a single Job. | 
|  | require.Equal(t, c.Jobs[0].Priority, diag.Priority) | 
|  | require.InDelta(t, currentTime.Sub(c.Jobs[0].Created).Hours(), diag.JobCreatedHours, scoreDelta) | 
|  | // The commits are added close enough to "now" that there is no time decay. | 
|  | require.Equal(t, 1.0, diag.TimeDecay) | 
|  | } | 
|  |  | 
|  | j1 := &types.Job{ | 
|  | Id:        "regularJobId1", | 
|  | Created:   currentTime.Add(-1 * time.Hour), | 
|  | Name:      tcc_testutils.BuildTaskName, | 
|  | Priority:  0.5, | 
|  | RepoState: rs1, | 
|  | } | 
|  | j2 := &types.Job{ | 
|  | Id:        "regularJobId2", | 
|  | Created:   currentTime.Add(-1 * time.Hour), | 
|  | Name:      tcc_testutils.BuildTaskName, | 
|  | Priority:  0.5, | 
|  | RepoState: rs2, | 
|  | } | 
|  | // Candidates at rs1 and rs2 | 
|  | c1 := &TaskCandidate{ | 
|  | Jobs: []*types.Job{j1}, | 
|  | TaskKey: types.TaskKey{ | 
|  | Name:      tcc_testutils.BuildTaskName, | 
|  | RepoState: rs1, | 
|  | }, | 
|  | TaskSpec: tcc_testutils.TasksCfg1.Tasks[tcc_testutils.BuildTaskName], | 
|  | } | 
|  | c2 := &TaskCandidate{ | 
|  | Jobs: []*types.Job{j2}, | 
|  | TaskKey: types.TaskKey{ | 
|  | Name:      tcc_testutils.BuildTaskName, | 
|  | RepoState: rs2, | 
|  | }, | 
|  | TaskSpec: tcc_testutils.TasksCfg2.Tasks[tcc_testutils.BuildTaskName], | 
|  | } | 
|  | // Regular task at HEAD with 2 commits has score 3.5 scaled by priority 0.5. | 
|  | require.NoError(t, processTaskCandidate(ctx, s, c2)) | 
|  | require.InDelta(t, 3.5*0.5, c2.Score, scoreDelta) | 
|  | require.Equal(t, 2, len(c2.Commits)) | 
|  | diag := c2.GetDiagnostics().Scoring | 
|  | require.Equal(t, 0, diag.StoleFromCommits) | 
|  | require.Equal(t, 3.5, diag.TestednessIncrease) | 
|  | checkDiag(c2) | 
|  | // Regular task at HEAD^ (no backfill) with 1 commit has score 2 scaled by | 
|  | // priority 0.5. | 
|  | require.NoError(t, processTaskCandidate(ctx, s, c1)) | 
|  | diag = c1.GetDiagnostics().Scoring | 
|  | require.InDelta(t, 2*0.5, c1.Score, scoreDelta) | 
|  | require.Equal(t, 1, len(c1.Commits)) | 
|  | require.Equal(t, 0, diag.StoleFromCommits) | 
|  | require.Equal(t, 2.0, diag.TestednessIncrease) | 
|  | checkDiag(c1) | 
|  |  | 
|  | // Add a task at rs2 that failed. | 
|  | t2 := makeTask(ctx, c2.Name, c2.Repo, c2.Revision) | 
|  | t2.Status = types.TASK_STATUS_FAILURE | 
|  | t2.Commits = util.CopyStringSlice(c2.Commits) | 
|  | require.NoError(t, s.putTask(ctx, t2)) | 
|  |  | 
|  | // Update Attempt and RetryOf before calling processTaskCandidate. | 
|  | c2.Attempt = 1 | 
|  | c2.RetryOf = t2.Id | 
|  |  | 
|  | // Retry task at rs2 with 2 commits for 2nd of 2 attempts has score 0.75 | 
|  | // scaled by priority 0.5. | 
|  | require.NoError(t, processTaskCandidate(ctx, s, c2)) | 
|  | require.InDelta(t, 0.75*0.5, c2.Score, scoreDelta) | 
|  | require.Equal(t, 2, len(c2.Commits)) | 
|  | diag = c2.GetDiagnostics().Scoring | 
|  | require.Equal(t, 2, diag.StoleFromCommits) | 
|  | require.Equal(t, 0.0, diag.TestednessIncrease) | 
|  | checkDiag(c2) | 
|  | // Regular task at rs1 (backfilling failed task) with 1 commit has score 1.25 | 
|  | // scaled by priority 0.5. | 
|  | diag = &taskCandidateScoringDiagnostics{} | 
|  | require.NoError(t, processTaskCandidate(ctx, s, c1)) | 
|  | diag = c1.GetDiagnostics().Scoring | 
|  | require.InDelta(t, 1.25*0.5, c1.Score, scoreDelta) | 
|  | require.Equal(t, 1, len(c1.Commits)) | 
|  | require.Equal(t, 2, diag.StoleFromCommits) | 
|  | require.Equal(t, 0.5, diag.TestednessIncrease) | 
|  | checkDiag(c1) | 
|  |  | 
|  | // Actually, the task at rs2 had a mishap. | 
|  | t2.Status = types.TASK_STATUS_MISHAP | 
|  | require.NoError(t, s.putTask(ctx, t2)) | 
|  |  | 
|  | // Scores should be same as for FAILURE. | 
|  | require.NoError(t, processTaskCandidate(ctx, s, c2)) | 
|  | diag = c2.GetDiagnostics().Scoring | 
|  | require.InDelta(t, 0.75*0.5, c2.Score, scoreDelta) | 
|  | require.Equal(t, 2, len(c2.Commits)) | 
|  | require.Equal(t, 2, diag.StoleFromCommits) | 
|  | require.Equal(t, 0.0, diag.TestednessIncrease) | 
|  | checkDiag(c2) | 
|  | require.NoError(t, processTaskCandidate(ctx, s, c1)) | 
|  | diag = c1.GetDiagnostics().Scoring | 
|  | require.InDelta(t, 1.25*0.5, c1.Score, scoreDelta) | 
|  | require.Equal(t, 1, len(c1.Commits)) | 
|  | require.Equal(t, 2, diag.StoleFromCommits) | 
|  | require.Equal(t, 0.5, diag.TestednessIncrease) | 
|  | checkDiag(c1) | 
|  | } | 
|  |  | 
|  | func TestProcessTaskCandidates(t *testing.T) { | 
|  | ctx, _, _, _, s, _, _, cleanup := setup(t) | 
|  | defer cleanup() | 
|  |  | 
|  | ts := now.Now(ctx) | 
|  |  | 
|  | // Processing of individual candidates is already tested; just verify | 
|  | // that if we pass in a bunch of candidates they all get processed. | 
|  | // The JobSpecs do not specify priority, so they use the default of 0.5. | 
|  | assertProcessed := func(c *TaskCandidate) { | 
|  | if c.IsTryJob() { | 
|  | require.True(t, c.Score > CANDIDATE_SCORE_TRY_JOB*0.5) | 
|  | require.Nil(t, c.Commits) | 
|  | } else if c.IsForceRun() { | 
|  | require.True(t, c.Score > CANDIDATE_SCORE_FORCE_RUN*0.5) | 
|  | require.Equal(t, 2, len(c.Commits)) | 
|  | } else if c.Revision == rs2.Revision { | 
|  | if c.Name == tcc_testutils.PerfTaskName { | 
|  | require.InDelta(t, 2.0*0.5, c.Score, scoreDelta) | 
|  | require.Equal(t, 1, len(c.Commits)) | 
|  | } else if c.Name == tcc_testutils.BuildTaskName { | 
|  | // Already covered by the forced job, so zero score. | 
|  | require.InDelta(t, 0, c.Score, scoreDelta) | 
|  | // Scores below the BuildTask at rs1, so it has a blamelist of 1 commit. | 
|  | require.Equal(t, 1, len(c.Commits)) | 
|  | } else { | 
|  | require.InDelta(t, 3.5*0.5, c.Score, scoreDelta) | 
|  | require.Equal(t, 2, len(c.Commits)) | 
|  | } | 
|  | } else { | 
|  | require.InDelta(t, 0.5*0.5, c.Score, scoreDelta) // These will be backfills. | 
|  | require.Equal(t, 1, len(c.Commits)) | 
|  | } | 
|  | } | 
|  |  | 
|  | testJob1 := &types.Job{ | 
|  | Id:        "testJob1", | 
|  | Created:   ts, | 
|  | Name:      tcc_testutils.TestTaskName, | 
|  | Priority:  0.5, | 
|  | RepoState: rs1, | 
|  | } | 
|  | testJob2 := &types.Job{ | 
|  | Id:        "testJob2", | 
|  | Created:   ts, | 
|  | Name:      tcc_testutils.TestTaskName, | 
|  | Priority:  0.5, | 
|  | RepoState: rs2, | 
|  | } | 
|  | perfJob2 := &types.Job{ | 
|  | Id:        "perfJob2", | 
|  | Created:   ts, | 
|  | Name:      tcc_testutils.PerfTaskName, | 
|  | Priority:  0.5, | 
|  | RepoState: rs2, | 
|  | } | 
|  | forcedBuildJob2 := &types.Job{ | 
|  | Id:        "forcedBuildJob2", | 
|  | Created:   ts, | 
|  | Name:      tcc_testutils.BuildTaskName, | 
|  | Priority:  0.5, | 
|  | RepoState: rs2, | 
|  | } | 
|  | tryjobRs := types.RepoState{ | 
|  | Patch: types.Patch{ | 
|  | Server:   "my-server", | 
|  | Issue:    "my-issue", | 
|  | Patchset: "my-patchset", | 
|  | }, | 
|  | Repo:     rs1.Repo, | 
|  | Revision: rs1.Revision, | 
|  | } | 
|  | perfTryjob2 := &types.Job{ | 
|  | Id:        "perfJob2", | 
|  | Created:   ts, | 
|  | Name:      tcc_testutils.PerfTaskName, | 
|  | Priority:  0.5, | 
|  | RepoState: tryjobRs, | 
|  | } | 
|  |  | 
|  | candidates := map[string]map[string][]*TaskCandidate{ | 
|  | rs1.Repo: { | 
|  | tcc_testutils.BuildTaskName: { | 
|  | { | 
|  | Jobs: []*types.Job{testJob1}, | 
|  | TaskKey: types.TaskKey{ | 
|  | RepoState: rs1, | 
|  | Name:      tcc_testutils.BuildTaskName, | 
|  | }, | 
|  | TaskSpec: &specs.TaskSpec{}, | 
|  | }, | 
|  | { | 
|  | Jobs: []*types.Job{testJob2, perfJob2}, | 
|  | TaskKey: types.TaskKey{ | 
|  | RepoState: rs2, | 
|  | Name:      tcc_testutils.BuildTaskName, | 
|  | }, | 
|  | TaskSpec: &specs.TaskSpec{}, | 
|  | }, | 
|  | { | 
|  | Jobs: []*types.Job{forcedBuildJob2}, | 
|  | TaskKey: types.TaskKey{ | 
|  | RepoState:   rs2, | 
|  | Name:        tcc_testutils.BuildTaskName, | 
|  | ForcedJobId: forcedBuildJob2.Id, | 
|  | }, | 
|  | TaskSpec: &specs.TaskSpec{}, | 
|  | }, | 
|  | }, | 
|  | tcc_testutils.TestTaskName: { | 
|  | { | 
|  | Jobs: []*types.Job{testJob1}, | 
|  | TaskKey: types.TaskKey{ | 
|  | RepoState: rs1, | 
|  | Name:      tcc_testutils.TestTaskName, | 
|  | }, | 
|  | TaskSpec: &specs.TaskSpec{}, | 
|  | }, | 
|  | { | 
|  | Jobs: []*types.Job{testJob2}, | 
|  | TaskKey: types.TaskKey{ | 
|  | RepoState: rs2, | 
|  | Name:      tcc_testutils.TestTaskName, | 
|  | }, | 
|  | TaskSpec: &specs.TaskSpec{}, | 
|  | }, | 
|  | }, | 
|  | tcc_testutils.PerfTaskName: { | 
|  | { | 
|  | Jobs: []*types.Job{perfJob2}, | 
|  | TaskKey: types.TaskKey{ | 
|  | RepoState: rs2, | 
|  | Name:      tcc_testutils.PerfTaskName, | 
|  | }, | 
|  | TaskSpec: &specs.TaskSpec{}, | 
|  | }, | 
|  | { | 
|  | Jobs: []*types.Job{perfTryjob2}, | 
|  | TaskKey: types.TaskKey{ | 
|  | RepoState: tryjobRs, | 
|  | Name:      tcc_testutils.PerfTaskName, | 
|  | }, | 
|  | TaskSpec: &specs.TaskSpec{}, | 
|  | }, | 
|  | }, | 
|  | }, | 
|  | } | 
|  |  | 
|  | processed, err := s.processTaskCandidates(ctx, candidates) | 
|  | require.NoError(t, err) | 
|  | require.Len(t, processed, 7) | 
|  | for _, c := range processed { | 
|  | assertProcessed(c) | 
|  | } | 
|  | } | 
|  |  | 
|  | func TestTestedness(t *testing.T) { | 
|  | tc := []struct { | 
|  | in  int | 
|  | out float64 | 
|  | }{ | 
|  | { | 
|  | in:  -1, | 
|  | out: -1.0, | 
|  | }, | 
|  | { | 
|  | in:  0, | 
|  | out: 0.0, | 
|  | }, | 
|  | { | 
|  | in:  1, | 
|  | out: 1.0, | 
|  | }, | 
|  | { | 
|  | in:  2, | 
|  | out: 1.0 + 1.0/2.0, | 
|  | }, | 
|  | { | 
|  | in:  3, | 
|  | out: 1.0 + float64(2.0)/float64(3.0), | 
|  | }, | 
|  | { | 
|  | in:  4, | 
|  | out: 1.0 + 3.0/4.0, | 
|  | }, | 
|  | { | 
|  | in:  4096, | 
|  | out: 1.0 + float64(4095)/float64(4096), | 
|  | }, | 
|  | } | 
|  | for i, c := range tc { | 
|  | require.Equal(t, c.out, testedness(c.in), fmt.Sprintf("test case #%d", i)) | 
|  | } | 
|  | } | 
|  |  | 
|  | func TestTestednessIncrease(t *testing.T) { | 
|  | tc := []struct { | 
|  | a   int | 
|  | b   int | 
|  | out float64 | 
|  | }{ | 
|  | // Invalid cases. | 
|  | { | 
|  | a:   -1, | 
|  | b:   10, | 
|  | out: -1.0, | 
|  | }, | 
|  | { | 
|  | a:   10, | 
|  | b:   -1, | 
|  | out: -1.0, | 
|  | }, | 
|  | { | 
|  | a:   0, | 
|  | b:   -1, | 
|  | out: -1.0, | 
|  | }, | 
|  | { | 
|  | a:   0, | 
|  | b:   0, | 
|  | out: -1.0, | 
|  | }, | 
|  | // Invalid because if we're re-running at already-tested commits | 
|  | // then we should have a blamelist which is at most the size of | 
|  | // the blamelist of the previous task. We naturally get negative | 
|  | // testedness increase in these cases. | 
|  | { | 
|  | a:   2, | 
|  | b:   1, | 
|  | out: -0.5, | 
|  | }, | 
|  | // Testing only new commits. | 
|  | { | 
|  | a:   1, | 
|  | b:   0, | 
|  | out: 1.0 + 1.0, | 
|  | }, | 
|  | { | 
|  | a:   2, | 
|  | b:   0, | 
|  | out: 2.0 + (1.0 + 1.0/2.0), | 
|  | }, | 
|  | { | 
|  | a:   3, | 
|  | b:   0, | 
|  | out: 3.0 + (1.0 + float64(2.0)/float64(3.0)), | 
|  | }, | 
|  | { | 
|  | a:   4096, | 
|  | b:   0, | 
|  | out: 4096.0 + (1.0 + float64(4095.0)/float64(4096.0)), | 
|  | }, | 
|  | // Retries. | 
|  | { | 
|  | a:   1, | 
|  | b:   1, | 
|  | out: 0.0, | 
|  | }, | 
|  | { | 
|  | a:   2, | 
|  | b:   2, | 
|  | out: 0.0, | 
|  | }, | 
|  | { | 
|  | a:   3, | 
|  | b:   3, | 
|  | out: 0.0, | 
|  | }, | 
|  | { | 
|  | a:   4096, | 
|  | b:   4096, | 
|  | out: 0.0, | 
|  | }, | 
|  | // Bisect/backfills. | 
|  | { | 
|  | a:   1, | 
|  | b:   2, | 
|  | out: 0.5, // (1 + 1) - (1 + 1/2) | 
|  | }, | 
|  | { | 
|  | a:   1, | 
|  | b:   3, | 
|  | out: float64(2.5) - (1.0 + float64(2.0)/float64(3.0)), | 
|  | }, | 
|  | { | 
|  | a:   5, | 
|  | b:   10, | 
|  | out: 2.0*(1.0+float64(4.0)/float64(5.0)) - (1.0 + float64(9.0)/float64(10.0)), | 
|  | }, | 
|  | } | 
|  | for i, c := range tc { | 
|  | require.Equal(t, c.out, testednessIncrease(c.a, c.b), fmt.Sprintf("test case #%d", i)) | 
|  | } | 
|  | } | 
|  |  | 
|  | func TestComputeBlamelist(t *testing.T) { | 
|  |  | 
|  | // Setup. | 
|  | nowCtx := now.TimeTravelingContext(mem_git.BaseTime.Add(time.Hour)) | 
|  | ctx, cancel := context.WithCancel(nowCtx) | 
|  | defer cancel() | 
|  |  | 
|  | mg, repo := newMemRepo(t) | 
|  | repos := repograph.Map{ | 
|  | rs1.Repo: repo, | 
|  | } | 
|  |  | 
|  | d := memory.NewInMemoryTaskDB() | 
|  | w, err := window.New(ctx, 2*time.Hour, 0, nil) | 
|  | cache, err := cache.NewTaskCache(ctx, d, w, nil) | 
|  | require.NoError(t, err) | 
|  |  | 
|  | btProject, btInstance, btCleanup := tcc_testutils.SetupBigTable(t) | 
|  | defer btCleanup() | 
|  | tcc, err := task_cfg_cache.NewTaskCfgCache(ctx, repos, btProject, btInstance, nil) | 
|  | require.NoError(t, err) | 
|  |  | 
|  | // The test repo is laid out like this: | 
|  | // *   T (HEAD, main, Case #12) | 
|  | // *   S (Time travel commit; before the start of the window) | 
|  | // *   R (Case #11) | 
|  | // |\ | 
|  | // * | Q | 
|  | // | * P | 
|  | // |/ | 
|  | // *   O (Case #9) | 
|  | // *   N | 
|  | // *   M (Case #10) | 
|  | // *   L | 
|  | // *   K (Case #6) | 
|  | // *   J (Case #5) | 
|  | // |\ | 
|  | // | * I | 
|  | // | * H (Case #4) | 
|  | // * | G | 
|  | // * | F (Case #3) | 
|  | // * | E (Case #8, previously #7) | 
|  | // |/ | 
|  | // *   D (Case #2) | 
|  | // *   C (Case #1) | 
|  | // ... | 
|  | // *   B (Case #0) | 
|  | // *   A | 
|  | // *   _ (No TasksCfg; blamelists shouldn't include this) | 
|  | // | 
|  | hashes := map[string]string{} | 
|  | name := "Test-Ubuntu12-ShuttleA-GTX660-x86-Release" | 
|  | taskCfg := &specs.TasksCfg{ | 
|  | Tasks: map[string]*specs.TaskSpec{ | 
|  | name: {}, | 
|  | }, | 
|  | } | 
|  | commit := func(name string) { | 
|  | hashes[name] = mg.Commit(name) | 
|  | require.NoError(t, tcc.Set(ctx, types.RepoState{ | 
|  | Repo:     rs1.Repo, | 
|  | Revision: hashes[name], | 
|  | }, taskCfg, nil)) | 
|  | } | 
|  |  | 
|  | // Initial commit. | 
|  | hashes["_"] = mg.Commit("_") | 
|  |  | 
|  | // First commit containing TasksCfg. | 
|  | commit("A") | 
|  |  | 
|  | type testCase struct { | 
|  | Revision     string | 
|  | Expected     []string | 
|  | StoleFromIdx int | 
|  | TaskName     string | 
|  | } | 
|  |  | 
|  | ids := []string{} | 
|  | commitsBuf := make([]*repograph.Commit, 0, MAX_BLAMELIST_COMMITS) | 
|  | test := func(tc *testCase) { | 
|  | // Update the repo. | 
|  | require.NoError(t, repo.Update(ctx)) | 
|  | // Self-check: make sure we don't pass in empty commit hashes. | 
|  | for _, h := range tc.Expected { | 
|  | require.NotEqual(t, h, "") | 
|  | } | 
|  |  | 
|  | // Ensure that we get the expected blamelist. | 
|  | revision := repo.Get(tc.Revision) | 
|  | require.NotNil(t, revision) | 
|  | taskName := tc.TaskName | 
|  | if taskName == "" { | 
|  | taskName = name | 
|  | } | 
|  | commits, stoleFrom, err := ComputeBlamelist(ctx, cache, repo, taskName, rs1.Repo, revision, commitsBuf, tcc, w) | 
|  | if tc.Revision == "" { | 
|  | require.Error(t, err) | 
|  | return | 
|  | } else { | 
|  | require.NoError(t, err) | 
|  | } | 
|  | sort.Strings(commits) | 
|  | sort.Strings(tc.Expected) | 
|  | assertdeep.Equal(t, tc.Expected, commits) | 
|  | if tc.StoleFromIdx >= 0 { | 
|  | require.NotNil(t, stoleFrom) | 
|  | require.Equal(t, ids[tc.StoleFromIdx], stoleFrom.Id) | 
|  | } else { | 
|  | require.Nil(t, stoleFrom) | 
|  | } | 
|  |  | 
|  | // Insert the task into the DB. | 
|  | c := &TaskCandidate{ | 
|  | TaskKey: types.TaskKey{ | 
|  | RepoState: types.RepoState{ | 
|  | Repo:     rs1.Repo, | 
|  | Revision: tc.Revision, | 
|  | }, | 
|  | Name: taskName, | 
|  | }, | 
|  | TaskSpec: &specs.TaskSpec{}, | 
|  | } | 
|  | task := c.MakeTask() | 
|  | task.Commits = commits | 
|  | task.Created = now.Now(ctx) | 
|  | if stoleFrom != nil { | 
|  | // Re-insert the stoleFrom task without the commits | 
|  | // which were stolen from it. | 
|  | stoleFromCommits := make([]string, 0, len(stoleFrom.Commits)-len(commits)) | 
|  | for _, commit := range stoleFrom.Commits { | 
|  | if !util.In(commit, task.Commits) { | 
|  | stoleFromCommits = append(stoleFromCommits, commit) | 
|  | } | 
|  | } | 
|  | stoleFrom.Commits = stoleFromCommits | 
|  | require.NoError(t, d.PutTasks(ctx, []*types.Task{task, stoleFrom})) | 
|  | cache.AddTasks([]*types.Task{task, stoleFrom}) | 
|  | } else { | 
|  | require.NoError(t, d.PutTask(ctx, task)) | 
|  | cache.AddTasks([]*types.Task{task}) | 
|  | } | 
|  | ids = append(ids, task.Id) | 
|  | require.NoError(t, cache.Update(ctx)) | 
|  | } | 
|  |  | 
|  | // Commit B. | 
|  | commit("B") | 
|  |  | 
|  | // Test cases. Each test case builds on the previous cases. | 
|  |  | 
|  | // 0. The first task, at HEAD. | 
|  | test(&testCase{ | 
|  | Revision:     hashes["B"], | 
|  | Expected:     []string{hashes["B"], hashes["A"]}, | 
|  | StoleFromIdx: -1, | 
|  | }) | 
|  |  | 
|  | // Test the blamelist too long case by creating a bunch of commits. | 
|  | for i := 0; i < MAX_BLAMELIST_COMMITS+1; i++ { | 
|  | commit("C") | 
|  | } | 
|  | commit("D") | 
|  |  | 
|  | // 1. Blamelist too long, not a branch head. | 
|  | test(&testCase{ | 
|  | Revision:     hashes["C"], | 
|  | Expected:     []string{hashes["C"]}, | 
|  | StoleFromIdx: -1, | 
|  | }) | 
|  |  | 
|  | // 2. Blamelist too long, is a branch head. | 
|  | test(&testCase{ | 
|  | Revision:     hashes["D"], | 
|  | Expected:     []string{hashes["D"]}, | 
|  | StoleFromIdx: -1, | 
|  | }) | 
|  |  | 
|  | // Create the remaining commits. | 
|  | commit("E") | 
|  | commit("F") | 
|  | commit("G") | 
|  | mg.NewBranch("otherbranch", hashes["D"]) | 
|  | mg.CheckoutBranch("otherbranch") | 
|  | commit("H") | 
|  | commit("I") | 
|  | mg.CheckoutBranch(git.MainBranch) | 
|  | hashes["J"] = mg.Merge("otherbranch") | 
|  | require.NoError(t, tcc.Set(ctx, types.RepoState{ | 
|  | Repo:     rs1.Repo, | 
|  | Revision: hashes["J"], | 
|  | }, taskCfg, nil)) | 
|  |  | 
|  | commit("K") | 
|  |  | 
|  | // 3. On a linear set of commits, with at least one previous task. | 
|  | test(&testCase{ | 
|  | Revision:     hashes["F"], | 
|  | Expected:     []string{hashes["E"], hashes["F"]}, | 
|  | StoleFromIdx: -1, | 
|  | }) | 
|  | // 4. The first task on a new branch. | 
|  | test(&testCase{ | 
|  | Revision:     hashes["H"], | 
|  | Expected:     []string{hashes["H"]}, | 
|  | StoleFromIdx: -1, | 
|  | }) | 
|  | // 5. After a merge. | 
|  | test(&testCase{ | 
|  | Revision:     hashes["J"], | 
|  | Expected:     []string{hashes["G"], hashes["I"], hashes["J"]}, | 
|  | StoleFromIdx: -1, | 
|  | }) | 
|  | // 6. One last "normal" task. | 
|  | test(&testCase{ | 
|  | Revision:     hashes["K"], | 
|  | Expected:     []string{hashes["K"]}, | 
|  | StoleFromIdx: -1, | 
|  | }) | 
|  | // 7. Steal commits from a previously-ingested task. | 
|  | test(&testCase{ | 
|  | Revision:     hashes["E"], | 
|  | Expected:     []string{hashes["E"]}, | 
|  | StoleFromIdx: 3, | 
|  | }) | 
|  |  | 
|  | // Ensure that task #8 really stole the commit from #3. | 
|  | task, err := cache.GetTask(ids[3]) | 
|  | require.NoError(t, err) | 
|  | require.False(t, util.In(hashes["E"], task.Commits), fmt.Sprintf("Expected not to find %s in %v", hashes["E"], task.Commits)) | 
|  |  | 
|  | // 8. Retry #7. | 
|  | test(&testCase{ | 
|  | Revision:     hashes["E"], | 
|  | Expected:     []string{hashes["E"]}, | 
|  | StoleFromIdx: 7, | 
|  | }) | 
|  |  | 
|  | // Ensure that task #8 really stole the commit from #7. | 
|  | task, err = cache.GetTask(ids[7]) | 
|  | require.NoError(t, err) | 
|  | require.Equal(t, 0, len(task.Commits)) | 
|  |  | 
|  | // Four more commits. | 
|  | commit("L") | 
|  | commit("M") | 
|  | commit("N") | 
|  | commit("O") | 
|  |  | 
|  | // 9. Not really a test case, but setting up for #10. | 
|  | test(&testCase{ | 
|  | Revision:     hashes["O"], | 
|  | Expected:     []string{hashes["L"], hashes["M"], hashes["N"], hashes["O"]}, | 
|  | StoleFromIdx: -1, | 
|  | }) | 
|  |  | 
|  | // 10. Steal *two* commits from #9. | 
|  | test(&testCase{ | 
|  | Revision:     hashes["M"], | 
|  | Expected:     []string{hashes["L"], hashes["M"]}, | 
|  | StoleFromIdx: 9, | 
|  | }) | 
|  |  | 
|  | // 11. Verify that we correctly track when task specs were added. | 
|  | mg.NewBranch("otherbranch2", hashes["O"]) | 
|  | commit("P") | 
|  | mg.CheckoutBranch(git.MainBranch) | 
|  | commit("Q") | 
|  | newTaskCfg := taskCfg.Copy() | 
|  | newTaskCfg.Tasks["added-task"] = &specs.TaskSpec{} | 
|  | require.NoError(t, tcc.Set(ctx, types.RepoState{ | 
|  | Repo:     rs1.Repo, | 
|  | Revision: hashes["Q"], | 
|  | }, newTaskCfg, nil)) | 
|  | hashes["R"] = mg.Merge("otherbranch2") | 
|  | require.NoError(t, tcc.Set(ctx, types.RepoState{ | 
|  | Repo:     rs1.Repo, | 
|  | Revision: hashes["R"], | 
|  | }, newTaskCfg, nil)) | 
|  | // Existing task should get a normal blamelist of all three new commits. | 
|  | test(&testCase{ | 
|  | Revision:     hashes["R"], | 
|  | Expected:     []string{hashes["P"], hashes["Q"], hashes["R"]}, | 
|  | StoleFromIdx: -1, | 
|  | }) | 
|  | // The added task's blamelist should only include commits at which the | 
|  | // task was defined, ie. not P. | 
|  | test(&testCase{ | 
|  | Revision:     hashes["R"], | 
|  | Expected:     []string{hashes["Q"], hashes["R"]}, | 
|  | StoleFromIdx: -1, | 
|  | TaskName:     "added-task", | 
|  | }) | 
|  |  | 
|  | // 12. Stop computing blamelists when we reach a commit outside of the | 
|  | // scheduling window. | 
|  | hashes["S"] = mg.CommitAt("S", w.EarliestStart().Add(-time.Hour)) | 
|  | require.NoError(t, tcc.Set(ctx, types.RepoState{ | 
|  | Repo:     rs1.Repo, | 
|  | Revision: hashes["S"], | 
|  | }, taskCfg, nil)) | 
|  | commit("T") | 
|  | test(&testCase{ | 
|  | Revision:     hashes["T"], | 
|  | Expected:     []string{hashes["T"]}, | 
|  | StoleFromIdx: -1, | 
|  | }) | 
|  | } | 
|  |  | 
|  | func TestTimeDecay24Hr(t *testing.T) { | 
|  | tc := []struct { | 
|  | decayAmt24Hr float64 | 
|  | elapsed      time.Duration | 
|  | out          float64 | 
|  | }{ | 
|  | { | 
|  | decayAmt24Hr: 1.0, | 
|  | elapsed:      10 * time.Hour, | 
|  | out:          1.0, | 
|  | }, | 
|  | { | 
|  | decayAmt24Hr: 0.5, | 
|  | elapsed:      0 * time.Hour, | 
|  | out:          1.0, | 
|  | }, | 
|  | { | 
|  | decayAmt24Hr: 0.5, | 
|  | elapsed:      24 * time.Hour, | 
|  | out:          0.5, | 
|  | }, | 
|  | { | 
|  | decayAmt24Hr: 0.5, | 
|  | elapsed:      12 * time.Hour, | 
|  | out:          0.75, | 
|  | }, | 
|  | { | 
|  | decayAmt24Hr: 0.5, | 
|  | elapsed:      36 * time.Hour, | 
|  | out:          0.25, | 
|  | }, | 
|  | { | 
|  | decayAmt24Hr: 0.5, | 
|  | elapsed:      48 * time.Hour, | 
|  | out:          0.0, | 
|  | }, | 
|  | { | 
|  | decayAmt24Hr: 0.5, | 
|  | elapsed:      72 * time.Hour, | 
|  | out:          0.0, | 
|  | }, | 
|  | } | 
|  | for i, c := range tc { | 
|  | require.Equal(t, c.out, timeDecay24Hr(c.decayAmt24Hr, c.elapsed), fmt.Sprintf("test case #%d", i)) | 
|  | } | 
|  | } | 
|  |  | 
|  | func TestRegenerateTaskQueue(t *testing.T) { | 
|  | ctx, _, _, _, s, _, _, cleanup := setup(t) | 
|  | defer cleanup() | 
|  |  | 
|  | // Ensure that the queue is initially empty. | 
|  | require.Equal(t, 0, len(s.queue)) | 
|  |  | 
|  | c1 := rs1.Revision | 
|  | c2 := rs2.Revision | 
|  |  | 
|  | // Regenerate the task queue. | 
|  | queue, _, err := s.regenerateTaskQueue(ctx) | 
|  | require.NoError(t, err) | 
|  | require.Len(t, queue, 2) // Two Build tasks. | 
|  |  | 
|  | testSort := func() { | 
|  | // Ensure that we sorted correctly. | 
|  | if len(queue) == 0 { | 
|  | return | 
|  | } | 
|  | highScore := queue[0].Score | 
|  | for _, c := range queue { | 
|  | require.True(t, highScore >= c.Score) | 
|  | highScore = c.Score | 
|  | } | 
|  | } | 
|  | testSort() | 
|  |  | 
|  | // Since we haven't run any task yet, we should have the two Build | 
|  | // tasks. | 
|  | // The one at HEAD should have a two-commit blamelist and a | 
|  | // score of 3.5, scaled by a priority of 0.875 due to three jobs | 
|  | // depending on it (1 - 0.5^3). | 
|  | require.Equal(t, tcc_testutils.BuildTaskName, queue[0].Name) | 
|  | require.Equal(t, []string{c2, c1}, queue[0].Commits) | 
|  | require.Equal(t, 3, len(queue[0].Jobs)) | 
|  | require.InDelta(t, 3.5*0.875, queue[0].Score, scoreDelta) | 
|  | // The other should have one commit in its blamelist and | 
|  | // a score of 0.5, scaled by a priority of 0.75 due to two jobs. | 
|  | require.Equal(t, tcc_testutils.BuildTaskName, queue[1].Name) | 
|  | require.Equal(t, []string{c1}, queue[1].Commits) | 
|  | require.InDelta(t, 0.5*0.75, queue[1].Score, scoreDelta) | 
|  |  | 
|  | // Insert the task at c1, even though it scored lower. | 
|  | t1 := makeTask(ctx, queue[1].Name, queue[1].Repo, queue[1].Revision) | 
|  | require.NotNil(t, t1) | 
|  | t1.Status = types.TASK_STATUS_SUCCESS | 
|  | t1.IsolatedOutput = "fake isolated hash" | 
|  | require.NoError(t, s.putTask(ctx, t1)) | 
|  |  | 
|  | // Regenerate the task queue. | 
|  | queue, _, err = s.regenerateTaskQueue(ctx) | 
|  | require.NoError(t, err) | 
|  |  | 
|  | // Now we expect the queue to contain the other Build task and the one | 
|  | // Test task we unblocked by running the first Build task. | 
|  | require.Len(t, queue, 2) | 
|  | testSort() | 
|  | for _, c := range queue { | 
|  | if c.Name == tcc_testutils.TestTaskName { | 
|  | require.InDelta(t, 2.0*0.5, c.Score, scoreDelta) | 
|  | require.Equal(t, 1, len(c.Commits)) | 
|  | } else { | 
|  | require.Equal(t, c.Name, tcc_testutils.BuildTaskName) | 
|  | require.InDelta(t, 2.0*0.875, c.Score, scoreDelta) | 
|  | require.Equal(t, []string{c.Revision}, c.Commits) | 
|  | } | 
|  | } | 
|  | buildIdx := 0 | 
|  | testIdx := 1 | 
|  | if queue[1].Name == tcc_testutils.BuildTaskName { | 
|  | buildIdx = 1 | 
|  | testIdx = 0 | 
|  | } | 
|  | require.Equal(t, tcc_testutils.BuildTaskName, queue[buildIdx].Name) | 
|  | require.Equal(t, c2, queue[buildIdx].Revision) | 
|  |  | 
|  | require.Equal(t, tcc_testutils.TestTaskName, queue[testIdx].Name) | 
|  | require.Equal(t, c1, queue[testIdx].Revision) | 
|  |  | 
|  | // Run the other Build task. | 
|  | t2 := makeTask(ctx, queue[buildIdx].Name, queue[buildIdx].Repo, queue[buildIdx].Revision) | 
|  | t2.Status = types.TASK_STATUS_SUCCESS | 
|  | t2.IsolatedOutput = "fake isolated hash" | 
|  | require.NoError(t, s.putTask(ctx, t2)) | 
|  |  | 
|  | // Regenerate the task queue. | 
|  | queue, _, err = s.regenerateTaskQueue(ctx) | 
|  | require.NoError(t, err) | 
|  | require.Len(t, queue, 3) | 
|  | testSort() | 
|  | perfIdx := -1 | 
|  | for i, c := range queue { | 
|  | if c.Name == tcc_testutils.PerfTaskName { | 
|  | perfIdx = i | 
|  | require.Equal(t, c2, c.Revision) | 
|  | require.InDelta(t, 2.0*0.5, c.Score, scoreDelta) | 
|  | require.Equal(t, []string{c.Revision}, c.Commits) | 
|  | } else { | 
|  | require.Equal(t, c.Name, tcc_testutils.TestTaskName) | 
|  | if c.Revision == c2 { | 
|  | require.InDelta(t, 3.5*0.5, c.Score, scoreDelta) | 
|  | require.Equal(t, []string{c2, c1}, c.Commits) | 
|  | } else { | 
|  | require.InDelta(t, 0.5*0.5, c.Score, scoreDelta) | 
|  | require.Equal(t, []string{c.Revision}, c.Commits) | 
|  | } | 
|  | } | 
|  | } | 
|  | require.True(t, perfIdx > -1) | 
|  |  | 
|  | // Run the Test task at tip of tree; its blamelist covers both commits. | 
|  | t3 := makeTask(ctx, tcc_testutils.TestTaskName, rs1.Repo, c2) | 
|  | t3.Commits = []string{c2, c1} | 
|  | t3.Status = types.TASK_STATUS_SUCCESS | 
|  | t3.IsolatedOutput = "ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff/256" | 
|  | require.NoError(t, s.putTask(ctx, t3)) | 
|  |  | 
|  | // Regenerate the task queue. | 
|  | queue, _, err = s.regenerateTaskQueue(ctx) | 
|  | require.NoError(t, err) | 
|  |  | 
|  | // Now we expect the queue to contain one Test and one Perf task. The | 
|  | // Test task is a backfill, and should have a score of 0.5, scaled by | 
|  | // the priority of 0.5. | 
|  | require.Len(t, queue, 2) | 
|  | testSort() | 
|  | // First candidate should be the perf task. | 
|  | require.Equal(t, tcc_testutils.PerfTaskName, queue[0].Name) | 
|  | require.InDelta(t, 2.0*0.5, queue[0].Score, scoreDelta) | 
|  | // The test task is next, a backfill. | 
|  | require.Equal(t, tcc_testutils.TestTaskName, queue[1].Name) | 
|  | require.InDelta(t, 0.5*0.5, queue[1].Score, scoreDelta) | 
|  | } | 
|  |  | 
|  | func makeTaskCandidate(name string, dims []string) *TaskCandidate { | 
|  | return &TaskCandidate{ | 
|  | Score: 1.0, | 
|  | TaskKey: types.TaskKey{ | 
|  | Name: name, | 
|  | }, | 
|  | TaskSpec: &specs.TaskSpec{ | 
|  | Dimensions: dims, | 
|  | }, | 
|  | } | 
|  | } | 
|  |  | 
|  | func makeSwarmingBot(id string, dims []string) *types.Machine { | 
|  | return &types.Machine{ | 
|  | ID:         id, | 
|  | Dimensions: dims, | 
|  | } | 
|  | } | 
|  |  | 
|  | func TestGetCandidatesToSchedule(t *testing.T) { | 
|  | ctx := context.Background() | 
|  | // Empty lists. | 
|  | rv := getCandidatesToSchedule(ctx, []*types.Machine{}, []*TaskCandidate{}) | 
|  | require.Empty(t, rv) | 
|  |  | 
|  | // checkDiags takes a list of bots with the same dimensions and a list of | 
|  | // ordered candidates that match those bots and checks the Diagnostics for | 
|  | // candidates. | 
|  | checkDiags := func(bots []*types.Machine, candidates []*TaskCandidate) { | 
|  | var expectedBots []string | 
|  | if len(bots) > 0 { | 
|  | expectedBots = make([]string, len(bots), len(bots)) | 
|  | for i, b := range bots { | 
|  | expectedBots[i] = b.ID | 
|  | } | 
|  | } | 
|  | for i, c := range candidates { | 
|  | // These conditions are not tested. | 
|  | require.False(t, c.Diagnostics.Scheduling.OverSchedulingLimitPerTaskSpec) | 
|  | require.False(t, c.Diagnostics.Scheduling.ScoreBelowThreshold) | 
|  | // NoBotsAvailable and MatchingBots will be the same for all candidates. | 
|  | require.Equal(t, len(expectedBots) == 0, c.Diagnostics.Scheduling.NoBotsAvailable) | 
|  | require.Equal(t, expectedBots, c.Diagnostics.Scheduling.MatchingBots) | 
|  | require.Equal(t, i, c.Diagnostics.Scheduling.NumHigherScoreSimilarCandidates) | 
|  | if i == 0 { | 
|  | // First candidate. | 
|  | require.Nil(t, c.Diagnostics.Scheduling.LastSimilarCandidate) | 
|  | } else { | 
|  | last := candidates[i-1] | 
|  | require.Equal(t, &last.TaskKey, c.Diagnostics.Scheduling.LastSimilarCandidate) | 
|  | } | 
|  | require.Equal(t, i < len(bots), c.Diagnostics.Scheduling.Selected) | 
|  | } | 
|  | // Clear diagnostics for next test. | 
|  | for _, c := range candidates { | 
|  | c.Diagnostics = nil | 
|  | } | 
|  | } | 
|  |  | 
|  | t1 := makeTaskCandidate("task1", []string{"k:v"}) | 
|  | rv = getCandidatesToSchedule(ctx, []*types.Machine{}, []*TaskCandidate{t1}) | 
|  | require.Empty(t, rv) | 
|  | checkDiags([]*types.Machine{}, []*TaskCandidate{t1}) | 
|  |  | 
|  | b1 := makeSwarmingBot("bot1", []string{"k:v"}) | 
|  | rv = getCandidatesToSchedule(ctx, []*types.Machine{b1}, []*TaskCandidate{}) | 
|  | require.Empty(t, rv) | 
|  |  | 
|  | // Single match. | 
|  | rv = getCandidatesToSchedule(ctx, []*types.Machine{b1}, []*TaskCandidate{t1}) | 
|  | assertdeep.Equal(t, []*TaskCandidate{t1}, rv) | 
|  | checkDiags([]*types.Machine{b1}, []*TaskCandidate{t1}) | 
|  |  | 
|  | // No match. | 
|  | t1.TaskSpec.Dimensions[0] = "k:v2" | 
|  | rv = getCandidatesToSchedule(ctx, []*types.Machine{b1}, []*TaskCandidate{t1}) | 
|  | require.Empty(t, rv) | 
|  | checkDiags([]*types.Machine{}, []*TaskCandidate{t1}) | 
|  |  | 
|  | // Add a task candidate to match b1. | 
|  | t1 = makeTaskCandidate("task1", []string{"k:v2"}) | 
|  | t2 := makeTaskCandidate("task2", []string{"k:v"}) | 
|  | rv = getCandidatesToSchedule(ctx, []*types.Machine{b1}, []*TaskCandidate{t1, t2}) | 
|  | assertdeep.Equal(t, []*TaskCandidate{t2}, rv) | 
|  | checkDiags([]*types.Machine{}, []*TaskCandidate{t1}) | 
|  | checkDiags([]*types.Machine{b1}, []*TaskCandidate{t2}) | 
|  |  | 
|  | // Switch the task order. | 
|  | t1 = makeTaskCandidate("task1", []string{"k:v2"}) | 
|  | t2 = makeTaskCandidate("task2", []string{"k:v"}) | 
|  | rv = getCandidatesToSchedule(ctx, []*types.Machine{b1}, []*TaskCandidate{t2, t1}) | 
|  | assertdeep.Equal(t, []*TaskCandidate{t2}, rv) | 
|  | checkDiags([]*types.Machine{}, []*TaskCandidate{t1}) | 
|  | checkDiags([]*types.Machine{b1}, []*TaskCandidate{t2}) | 
|  |  | 
|  | // Make both tasks match the bot, ensure that we pick the first one. | 
|  | t1 = makeTaskCandidate("task1", []string{"k:v"}) | 
|  | t2 = makeTaskCandidate("task2", []string{"k:v"}) | 
|  | rv = getCandidatesToSchedule(ctx, []*types.Machine{b1}, []*TaskCandidate{t1, t2}) | 
|  | assertdeep.Equal(t, []*TaskCandidate{t1}, rv) | 
|  | checkDiags([]*types.Machine{b1}, []*TaskCandidate{t1, t2}) | 
|  | rv = getCandidatesToSchedule(ctx, []*types.Machine{b1}, []*TaskCandidate{t2, t1}) | 
|  | assertdeep.Equal(t, []*TaskCandidate{t2}, rv) | 
|  | checkDiags([]*types.Machine{b1}, []*TaskCandidate{t2, t1}) | 
|  |  | 
|  | // Multiple dimensions. Ensure that different permutations of the bots | 
|  | // and tasks lists give us the expected results. | 
|  | dims := []string{"k:v", "k2:v2", "k3:v3"} | 
|  | b1 = makeSwarmingBot("bot1", dims) | 
|  | b2 := makeSwarmingBot("bot2", t1.TaskSpec.Dimensions) | 
|  | t1 = makeTaskCandidate("task1", []string{"k:v"}) | 
|  | t2 = makeTaskCandidate("task2", dims) | 
|  | // In the first two cases, the task with fewer dimensions has the | 
|  | // higher priority. It gets the bot with more dimensions because it | 
|  | // is first in sorted order. The second task does not get scheduled | 
|  | // because there is no bot available which can run it. | 
|  | // TODO(borenet): Use a more optimal solution to avoid this case. | 
|  | rv = getCandidatesToSchedule(ctx, []*types.Machine{b1, b2}, []*TaskCandidate{t1, t2}) | 
|  | assertdeep.Equal(t, []*TaskCandidate{t1}, rv) | 
|  | // Can't use checkDiags for these cases. | 
|  | require.Equal(t, []string{b1.ID, b2.ID}, t1.Diagnostics.Scheduling.MatchingBots) | 
|  | require.Equal(t, 0, t1.Diagnostics.Scheduling.NumHigherScoreSimilarCandidates) | 
|  | require.Nil(t, t1.Diagnostics.Scheduling.LastSimilarCandidate) | 
|  | require.True(t, t1.Diagnostics.Scheduling.Selected) | 
|  | t1.Diagnostics = nil | 
|  | require.Equal(t, []string{b1.ID}, t2.Diagnostics.Scheduling.MatchingBots) | 
|  | require.Equal(t, 1, t2.Diagnostics.Scheduling.NumHigherScoreSimilarCandidates) | 
|  | require.Equal(t, &t1.TaskKey, t2.Diagnostics.Scheduling.LastSimilarCandidate) | 
|  | require.False(t, t2.Diagnostics.Scheduling.Selected) | 
|  | t2.Diagnostics = nil | 
|  |  | 
|  | t1 = makeTaskCandidate("task1", []string{"k:v"}) | 
|  | t2 = makeTaskCandidate("task2", dims) | 
|  | rv = getCandidatesToSchedule(ctx, []*types.Machine{b2, b1}, []*TaskCandidate{t1, t2}) | 
|  | assertdeep.Equal(t, []*TaskCandidate{t1}, rv) | 
|  | require.Equal(t, []string{b1.ID, b2.ID}, t1.Diagnostics.Scheduling.MatchingBots) | 
|  | require.Equal(t, 0, t1.Diagnostics.Scheduling.NumHigherScoreSimilarCandidates) | 
|  | require.Nil(t, t1.Diagnostics.Scheduling.LastSimilarCandidate) | 
|  | require.True(t, t1.Diagnostics.Scheduling.Selected) | 
|  | t1.Diagnostics = nil | 
|  | require.Equal(t, []string{b1.ID}, t2.Diagnostics.Scheduling.MatchingBots) | 
|  | require.Equal(t, 1, t2.Diagnostics.Scheduling.NumHigherScoreSimilarCandidates) | 
|  | require.Equal(t, &t1.TaskKey, t2.Diagnostics.Scheduling.LastSimilarCandidate) | 
|  | require.False(t, t2.Diagnostics.Scheduling.Selected) | 
|  | t2.Diagnostics = nil | 
|  |  | 
|  | // In these two cases, the task with more dimensions has the higher | 
|  | // priority. Both tasks get scheduled. | 
|  | t1 = makeTaskCandidate("task1", []string{"k:v"}) | 
|  | t2 = makeTaskCandidate("task2", dims) | 
|  | rv = getCandidatesToSchedule(ctx, []*types.Machine{b1, b2}, []*TaskCandidate{t2, t1}) | 
|  | assertdeep.Equal(t, []*TaskCandidate{t2, t1}, rv) | 
|  | require.Equal(t, []string{b1.ID, b2.ID}, t1.Diagnostics.Scheduling.MatchingBots) | 
|  | require.Equal(t, 1, t1.Diagnostics.Scheduling.NumHigherScoreSimilarCandidates) | 
|  | require.Equal(t, &t2.TaskKey, t1.Diagnostics.Scheduling.LastSimilarCandidate) | 
|  | require.True(t, t1.Diagnostics.Scheduling.Selected) | 
|  | t1.Diagnostics = nil | 
|  | require.Equal(t, []string{b1.ID}, t2.Diagnostics.Scheduling.MatchingBots) | 
|  | require.Equal(t, 0, t2.Diagnostics.Scheduling.NumHigherScoreSimilarCandidates) | 
|  | require.Nil(t, t2.Diagnostics.Scheduling.LastSimilarCandidate) | 
|  | require.True(t, t2.Diagnostics.Scheduling.Selected) | 
|  | t2.Diagnostics = nil | 
|  |  | 
|  | t1 = makeTaskCandidate("task1", []string{"k:v"}) | 
|  | t2 = makeTaskCandidate("task2", dims) | 
|  | rv = getCandidatesToSchedule(ctx, []*types.Machine{b2, b1}, []*TaskCandidate{t2, t1}) | 
|  | assertdeep.Equal(t, []*TaskCandidate{t2, t1}, rv) | 
|  | require.Equal(t, []string{b1.ID, b2.ID}, t1.Diagnostics.Scheduling.MatchingBots) | 
|  | require.Equal(t, 1, t1.Diagnostics.Scheduling.NumHigherScoreSimilarCandidates) | 
|  | require.Equal(t, &t2.TaskKey, t1.Diagnostics.Scheduling.LastSimilarCandidate) | 
|  | require.True(t, t1.Diagnostics.Scheduling.Selected) | 
|  | t1.Diagnostics = nil | 
|  | require.Equal(t, []string{b1.ID}, t2.Diagnostics.Scheduling.MatchingBots) | 
|  | require.Equal(t, 0, t2.Diagnostics.Scheduling.NumHigherScoreSimilarCandidates) | 
|  | require.Nil(t, t2.Diagnostics.Scheduling.LastSimilarCandidate) | 
|  | require.True(t, t2.Diagnostics.Scheduling.Selected) | 
|  | t2.Diagnostics = nil | 
|  |  | 
|  | // Matching dimensions. More bots than tasks. | 
|  | b2 = makeSwarmingBot("bot2", dims) | 
|  | b3 := makeSwarmingBot("bot3", dims) | 
|  | t1 = makeTaskCandidate("task1", dims) | 
|  | t2 = makeTaskCandidate("task2", dims) | 
|  | t3 := makeTaskCandidate("task3", dims) | 
|  | rv = getCandidatesToSchedule(ctx, []*types.Machine{b1, b2, b3}, []*TaskCandidate{t1, t2}) | 
|  | assertdeep.Equal(t, []*TaskCandidate{t1, t2}, rv) | 
|  | checkDiags([]*types.Machine{b1, b2, b3}, []*TaskCandidate{t1, t2}) | 
|  |  | 
|  | // More tasks than bots. | 
|  | t1 = makeTaskCandidate("task1", dims) | 
|  | t2 = makeTaskCandidate("task2", dims) | 
|  | t3 = makeTaskCandidate("task3", dims) | 
|  | rv = getCandidatesToSchedule(ctx, []*types.Machine{b1, b2}, []*TaskCandidate{t1, t2, t3}) | 
|  | assertdeep.Equal(t, []*TaskCandidate{t1, t2}, rv) | 
|  | checkDiags([]*types.Machine{b1, b2}, []*TaskCandidate{t1, t2, t3}) | 
|  | } | 
|  |  | 
|  | func makeBot(id string, dims map[string]string) *types.Machine { | 
|  | dimensions := make([]string, 0, len(dims)) | 
|  | for k, v := range dims { | 
|  | dimensions = append(dimensions, fmt.Sprintf("%s:%s", k, v)) | 
|  | } | 
|  | return &types.Machine{ | 
|  | ID:         id, | 
|  | Dimensions: dimensions, | 
|  | } | 
|  | } | 
|  |  | 
|  | func mockBots(t sktest.TestingT, swarmingClient *swarming_testutils.TestClient, bots ...*types.Machine) { | 
|  | swarmBots := make([]*swarming_api.SwarmingRpcsBotInfo, 0, len(bots)) | 
|  | for _, bot := range bots { | 
|  | dims, err := swarming.ParseDimensions(bot.Dimensions) | 
|  | require.NoError(t, err) | 
|  | swarmBots = append(swarmBots, &swarming_api.SwarmingRpcsBotInfo{ | 
|  | BotId:      bot.ID, | 
|  | Dimensions: swarming.StringMapToBotDimensions(dims), | 
|  | }) | 
|  | } | 
|  | swarmingClient.MockBots(swarmBots) | 
|  | } | 
|  |  | 
// TestSchedulingE2E runs the scheduler's main loop repeatedly against mocked
// Swarming bots and tasks. After each cycle it checks the tasks recorded in
// s.tCache for commits c1 and c2 and the number of candidates left in s.queue.
func TestSchedulingE2E(t *testing.T) {
	ctx, _, _, swarmingClient, s, _, cas, cleanup := setup(t)
	defer cleanup()

	c1 := rs1.Revision
	c2 := rs2.Revision

	// Start testing. No free bots, so we get a full queue with nothing
	// scheduled.
	runMainLoop(t, s, ctx)
	tasks, err := s.tCache.GetTasksForCommits(rs1.Repo, []string{c1, c2})
	require.NoError(t, err)
	expect := map[string]map[string]*types.Task{
		c1: {},
		c2: {},
	}
	assertdeep.Equal(t, expect, tasks)
	require.Equal(t, 2, len(s.queue)) // Two compile tasks.

	// A bot is free but doesn't have all of the right dimensions to run a task.
	bot1 := makeBot("bot1", map[string]string{"pool": "Skia"})
	mockBots(t, swarmingClient, bot1)
	runMainLoop(t, s, ctx)
	tasks, err = s.tCache.GetTasksForCommits(rs1.Repo, []string{c1, c2})
	require.NoError(t, err)
	expect = map[string]map[string]*types.Task{
		c1: {},
		c2: {},
	}
	assertdeep.Equal(t, expect, tasks)
	require.Equal(t, 2, len(s.queue)) // Still two compile tasks.

	// One bot free, schedule a task, ensure it's not in the queue.
	bot1.Dimensions = append(bot1.Dimensions, "os:Ubuntu")
	mockBots(t, swarmingClient, bot1)
	runMainLoop(t, s, ctx)
	require.NoError(t, s.tCache.Update(ctx))
	tasks, err = s.tCache.GetTasksForCommits(rs1.Repo, []string{c1, c2})
	require.NoError(t, err)
	t1 := tasks[c2][tcc_testutils.BuildTaskName]
	require.NotNil(t, t1)
	require.Equal(t, c2, t1.Revision)
	require.Equal(t, tcc_testutils.BuildTaskName, t1.Name)
	require.Equal(t, []string{c2, c1}, t1.Commits)
	require.Equal(t, 1, len(s.queue))

	// The task is complete.
	t1.Status = types.TASK_STATUS_SUCCESS
	t1.Finished = now.Now(ctx)
	t1.IsolatedOutput = "dddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddd/86"
	require.NoError(t, s.putTask(ctx, t1))
	swarmingClient.MockTasks([]*swarming_api.SwarmingRpcsTaskRequestMetadata{
		makeSwarmingRpcsTaskRequestMetadata(t, t1, linuxTaskDims),
	})
	// Mock the CAS merges of the test and perf inputs with t1's output.
	cas.On("Merge", testutils.AnyContext, []string{tcc_testutils.TestCASDigest, t1.IsolatedOutput}).Return("bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbabc123/56", nil)
	cas.On("Merge", testutils.AnyContext, []string{tcc_testutils.PerfCASDigest, t1.IsolatedOutput}).Return("ccccccccccccccccccccccccccccccccccccccccccccccccccccccccccabc123/56", nil)

	// No bots free. Ensure that the queue is correct.
	mockBots(t, swarmingClient)
	runMainLoop(t, s, ctx)
	require.NoError(t, s.tCache.Update(ctx))
	tasks, err = s.tCache.GetTasksForCommits(rs1.Repo, []string{c1, c2})
	require.NoError(t, err)
	// t1 is expected under every commit in its blamelist.
	for _, c := range t1.Commits {
		expect[c][t1.Name] = t1
	}
	assertdeep.Equal(t, expect, tasks)
	expectLen := 3 // One remaining build task, plus one test task and one perf task.
	require.Equal(t, expectLen, len(s.queue))

	// More bots than tasks free, ensure the queue is correct.
	bot2 := makeBot("bot2", androidTaskDims)
	bot3 := makeBot("bot3", androidTaskDims)
	bot4 := makeBot("bot4", linuxTaskDims)
	mockBots(t, swarmingClient, bot1, bot2, bot3, bot4)
	runMainLoop(t, s, ctx)
	require.NoError(t, s.tCache.Update(ctx))
	// NOTE(review): the result of this lookup is discarded and the same
	// lookup is repeated a few lines below; only the error is checked here.
	_, err = s.tCache.GetTasksForCommits(rs1.Repo, []string{c1, c2})
	require.NoError(t, err)
	require.Equal(t, 0, len(s.queue))

	// The build, test, and perf tasks should have triggered.
	var t2 *types.Task
	var t3 *types.Task
	var t4 *types.Task
	tasks, err = s.tCache.GetTasksForCommits(rs1.Repo, []string{c1, c2})
	require.NoError(t, err)
	require.Len(t, tasks, 2)
	for commit, v := range tasks {
		if commit == c1 {
			// Build task at c1 and test task at c2 whose blamelist also has c1.
			require.Len(t, v, 2)
			for _, task := range v {
				// Only the task whose own revision is c1 is of interest here.
				if task.Revision != commit {
					continue
				}
				require.Equal(t, tcc_testutils.BuildTaskName, task.Name)
				require.Nil(t, t4)
				t4 = task
				require.Equal(t, c1, task.Revision)
				require.Equal(t, []string{c1}, task.Commits)
			}
		} else {
			require.Len(t, v, 3)
			for _, task := range v {
				if task.Name == tcc_testutils.TestTaskName {
					require.Nil(t, t2)
					t2 = task
					require.Equal(t, c2, task.Revision)
					require.Equal(t, []string{c2, c1}, task.Commits)
				} else if task.Name == tcc_testutils.PerfTaskName {
					require.Nil(t, t3)
					t3 = task
					require.Equal(t, c2, task.Revision)
					require.Equal(t, []string{c2}, task.Commits)
				} else {
					// This is the first task we triggered.
					require.Equal(t, tcc_testutils.BuildTaskName, task.Name)
				}
			}
		}
	}
	require.NotNil(t, t2)
	require.NotNil(t, t3)
	require.NotNil(t, t4)
	// Mark the build task at c1 as finished and store it.
	t4.Status = types.TASK_STATUS_SUCCESS
	t4.Finished = now.Now(ctx)
	t4.IsolatedOutput = "eeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee/99"
	require.NoError(t, s.putTask(ctx, t4))

	// No new bots free; only the remaining test task should be in the queue.
	mockBots(t, swarmingClient)
	mockTasks := []*swarming_api.SwarmingRpcsTaskRequestMetadata{
		makeSwarmingRpcsTaskRequestMetadata(t, t2, linuxTaskDims),
		makeSwarmingRpcsTaskRequestMetadata(t, t3, linuxTaskDims),
		makeSwarmingRpcsTaskRequestMetadata(t, t4, linuxTaskDims),
	}
	swarmingClient.MockTasks(mockTasks)
	require.NoError(t, s.updateUnfinishedTasks(ctx))
	runMainLoop(t, s, ctx)
	require.NoError(t, s.tCache.Update(ctx))
	expectLen = 1 // Test task from c1
	require.Equal(t, expectLen, len(s.queue))

	// Finish the other task.
	// NOTE(review): t3 is modified here but no putTask call follows in this
	// stretch — confirm that the update is meant to stay local.
	t3, err = s.tCache.GetTask(t3.Id)
	require.NoError(t, err)
	t3.Status = types.TASK_STATUS_SUCCESS
	t3.Finished = now.Now(ctx)
	t3.IsolatedOutput = "ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff/256"

	// Ensure that we finalize all of the tasks and insert into the DB.
	mockBots(t, swarmingClient, bot1, bot2, bot3, bot4)
	// NOTE(review): mockTasks is reassigned here but is not passed to
	// swarmingClient.MockTasks before it is overwritten further down —
	// confirm whether a MockTasks call was intended.
	mockTasks = []*swarming_api.SwarmingRpcsTaskRequestMetadata{
		makeSwarmingRpcsTaskRequestMetadata(t, t3, linuxTaskDims),
	}
	cas.On("Merge", testutils.AnyContext, []string{tcc_testutils.TestCASDigest, t4.IsolatedOutput}).Return("bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbabc123/56", nil)
	runMainLoop(t, s, ctx)
	require.NoError(t, s.tCache.Update(ctx))
	tasks, err = s.tCache.GetTasksForCommits(rs1.Repo, []string{c1, c2})
	require.NoError(t, err)
	require.Equal(t, 2, len(tasks[c1]))
	require.Equal(t, 3, len(tasks[c2]))
	require.Equal(t, 0, len(s.queue))

	// Mark everything as finished. Ensure that the queue still ends up empty.
	tasksList := []*types.Task{}
	for _, v := range tasks {
		for _, task := range v {
			if task.Status != types.TASK_STATUS_SUCCESS {
				task.Status = types.TASK_STATUS_SUCCESS
				task.Finished = now.Now(ctx)
				task.IsolatedOutput = "abcd1234abcd1234abcd1234abcd1234abcd1234abcd1234abcd1234abcd1234/56"
				tasksList = append(tasksList, task)
			}
		}
	}
	// Report every newly finished task through the mocked Swarming client.
	mockTasks = make([]*swarming_api.SwarmingRpcsTaskRequestMetadata, 0, len(tasksList))
	for _, task := range tasksList {
		mockTasks = append(mockTasks, makeSwarmingRpcsTaskRequestMetadata(t, task, linuxTaskDims))
	}
	swarmingClient.MockTasks(mockTasks)
	mockBots(t, swarmingClient, bot1, bot2, bot3, bot4)
	require.NoError(t, s.updateUnfinishedTasks(ctx))
	runMainLoop(t, s, ctx)
	require.Equal(t, 0, len(s.queue))
}
|  |  | 
|  | func TestSchedulerStealingFrom(t *testing.T) { | 
|  | ctx, mg, _, swarmingClient, s, _, _, cleanup := setup(t) | 
|  | defer cleanup() | 
|  |  | 
|  | c1 := rs1.Revision | 
|  | c2 := rs2.Revision | 
|  |  | 
|  | // Run the available compile task at c2. | 
|  | bot1 := makeBot("bot1", linuxTaskDims) | 
|  | bot2 := makeBot("bot2", linuxTaskDims) | 
|  | mockBots(t, swarmingClient, bot1, bot2) | 
|  | runMainLoop(t, s, ctx) | 
|  | require.NoError(t, s.tCache.Update(ctx)) | 
|  | tasks, err := s.tCache.GetTasksForCommits(rs1.Repo, []string{c1, c2}) | 
|  | require.NoError(t, err) | 
|  | require.Equal(t, 1, len(tasks[c1])) | 
|  | require.Equal(t, 1, len(tasks[c2])) | 
|  |  | 
|  | // Finish the one task. | 
|  | tasksList := []*types.Task{} | 
|  | t1 := tasks[c2][tcc_testutils.BuildTaskName] | 
|  | t1.Status = types.TASK_STATUS_SUCCESS | 
|  | t1.Finished = now.Now(ctx) | 
|  | t1.IsolatedOutput = "abc123" | 
|  | tasksList = append(tasksList, t1) | 
|  |  | 
|  | // Forcibly create and insert a second task at c1. | 
|  | t2 := t1.Copy() | 
|  | t2.Id = "t2id" | 
|  | t2.Revision = c1 | 
|  | t2.Commits = []string{c1} | 
|  | tasksList = append(tasksList, t2) | 
|  |  | 
|  | require.NoError(t, s.putTasks(ctx, tasksList)) | 
|  |  | 
|  | // Add some commits. | 
|  | commits := mg.CommitN(10) | 
|  | require.NoError(t, s.repos[rs1.Repo].Update(ctx)) | 
|  | for _, h := range commits { | 
|  | rs := types.RepoState{ | 
|  | Repo:     rs1.Repo, | 
|  | Revision: h, | 
|  | } | 
|  | fillCaches(t, ctx, s.taskCfgCache, rs, tcc_testutils.TasksCfg2) | 
|  | insertJobs(t, ctx, s, rs) | 
|  | } | 
|  |  | 
|  | // Run one task. Ensure that it's at tip-of-tree. | 
|  | head := s.repos[rs1.Repo].Get(git.MainBranch).Hash | 
|  | mockBots(t, swarmingClient, bot1) | 
|  | runMainLoop(t, s, ctx) | 
|  | require.NoError(t, s.tCache.Update(ctx)) | 
|  | tasks, err = s.tCache.GetTasksForCommits(rs1.Repo, commits) | 
|  | require.NoError(t, err) | 
|  | require.Len(t, tasks[head], 1) | 
|  | task := tasks[head][tcc_testutils.BuildTaskName] | 
|  | require.Equal(t, head, task.Revision) | 
|  | expect := commits[:] | 
|  | sort.Strings(expect) | 
|  | sort.Strings(task.Commits) | 
|  | assertdeep.Equal(t, expect, task.Commits) | 
|  |  | 
|  | task.Status = types.TASK_STATUS_SUCCESS | 
|  | task.Finished = now.Now(ctx) | 
|  | task.IsolatedOutput = "abc123" | 
|  | require.NoError(t, s.putTask(ctx, task)) | 
|  |  | 
|  | oldTasksByCommit := tasks | 
|  |  | 
|  | // Run backfills, ensuring that each one steals the right set of commits | 
|  | // from previous builds, until all of the build task candidates have run. | 
|  | for i := 0; i < 9; i++ { | 
|  | // Now, run another task. The new task should bisect the old one. | 
|  | mockBots(t, swarmingClient, bot1) | 
|  | runMainLoop(t, s, ctx) | 
|  | require.NoError(t, s.tCache.Update(ctx)) | 
|  | tasks, err = s.tCache.GetTasksForCommits(rs1.Repo, commits) | 
|  | require.NoError(t, err) | 
|  | var newTask *types.Task | 
|  | for _, v := range tasks { | 
|  | for _, task := range v { | 
|  | if task.Status == types.TASK_STATUS_PENDING { | 
|  | require.True(t, newTask == nil || task.Id == newTask.Id) | 
|  | newTask = task | 
|  | } | 
|  | } | 
|  | } | 
|  | require.NotNil(t, newTask) | 
|  |  | 
|  | oldTask := oldTasksByCommit[newTask.Revision][newTask.Name] | 
|  | require.NotNil(t, oldTask) | 
|  | require.True(t, util.In(newTask.Revision, oldTask.Commits)) | 
|  |  | 
|  | // Find the updated old task. | 
|  | updatedOldTask, err := s.tCache.GetTask(oldTask.Id) | 
|  | require.NoError(t, err) | 
|  | require.NotNil(t, updatedOldTask) | 
|  |  | 
|  | // Ensure that the blamelists are correct. | 
|  | old := util.NewStringSet(oldTask.Commits) | 
|  | new := util.NewStringSet(newTask.Commits) | 
|  | updatedOld := util.NewStringSet(updatedOldTask.Commits) | 
|  |  | 
|  | assertdeep.Equal(t, old, new.Union(updatedOld)) | 
|  | require.Equal(t, 0, len(new.Intersect(updatedOld))) | 
|  | // Finish the new task. | 
|  | newTask.Status = types.TASK_STATUS_SUCCESS | 
|  | newTask.Finished = now.Now(ctx) | 
|  | newTask.IsolatedOutput = "abc123" | 
|  | require.NoError(t, s.putTask(ctx, newTask)) | 
|  | oldTasksByCommit = tasks | 
|  | } | 
|  |  | 
|  | // Ensure that we're really done. | 
|  | mockBots(t, swarmingClient, bot1) | 
|  | runMainLoop(t, s, ctx) | 
|  | require.NoError(t, s.tCache.Update(ctx)) | 
|  | tasks, err = s.tCache.GetTasksForCommits(rs1.Repo, commits) | 
|  | require.NoError(t, err) | 
|  | var newTask *types.Task | 
|  | for _, v := range tasks { | 
|  | for _, task := range v { | 
|  | if task.Status == types.TASK_STATUS_PENDING { | 
|  | require.True(t, newTask == nil || task.Id == newTask.Id) | 
|  | newTask = task | 
|  | } | 
|  | } | 
|  | } | 
|  | require.Nil(t, newTask) | 
|  | } | 
|  |  | 
|  | // spyDB calls onPutTasks before delegating PutTask(s) to DB. | 
|  | type spyDB struct { | 
|  | db.DB | 
|  | onPutTasks func([]*types.Task) | 
|  | } | 
|  |  | 
|  | func (s *spyDB) PutTask(ctx context.Context, task *types.Task) error { | 
|  | s.onPutTasks([]*types.Task{task}) | 
|  | return s.DB.PutTask(ctx, task) | 
|  | } | 
|  |  | 
|  | func (s *spyDB) PutTasks(ctx context.Context, tasks []*types.Task) error { | 
|  | s.onPutTasks(tasks) | 
|  | return s.DB.PutTasks(ctx, tasks) | 
|  | } | 
|  |  | 
|  | func testMultipleCandidatesBackfillingEachOtherSetup(t *testing.T) (context.Context, *mem_git.MemGit, db.DB, *TaskScheduler, *swarming_testutils.TestClient, []string, func(*types.Task), *specs.TasksCfg, func()) { | 
|  |  | 
|  | ctx, cancel := context.WithCancel(context.Background()) | 
|  | workdir, err := os.MkdirTemp("", "") | 
|  | require.NoError(t, err) | 
|  |  | 
|  | // Setup the scheduler. | 
|  | d := memory.NewInMemoryDB() | 
|  | swarmingClient := swarming_testutils.NewTestClient() | 
|  | mg, repo := newMemRepo(t) | 
|  | repos := repograph.Map{ | 
|  | rs1.Repo: repo, | 
|  | } | 
|  | btProject, btInstance, btCleanup := tcc_testutils.SetupBigTable(t) | 
|  | taskCfgCache, err := task_cfg_cache.NewTaskCfgCache(ctx, repos, btProject, btInstance, nil) | 
|  | require.NoError(t, err) | 
|  |  | 
|  | // Create a single task in the config. | 
|  | taskName := "faketask" | 
|  | cfg := &specs.TasksCfg{ | 
|  | CasSpecs: map[string]*specs.CasSpec{ | 
|  | "compile": { | 
|  | Digest: tcc_testutils.CompileCASDigest, | 
|  | }, | 
|  | }, | 
|  | Tasks: map[string]*specs.TaskSpec{ | 
|  | taskName: { | 
|  | CasSpec:      "compile", | 
|  | CipdPackages: []*specs.CipdPackage{}, | 
|  | Dependencies: []string{}, | 
|  | Dimensions:   []string{"pool:Skia"}, | 
|  | Priority:     1.0, | 
|  | }, | 
|  | }, | 
|  | Jobs: map[string]*specs.JobSpec{ | 
|  | "j1": { | 
|  | TaskSpecs: []string{taskName}, | 
|  | }, | 
|  | }, | 
|  | } | 
|  | hashes := mg.CommitN(1) | 
|  |  | 
|  | mockTasks := []*swarming_api.SwarmingRpcsTaskRequestMetadata{} | 
|  | mock := func(task *types.Task) { | 
|  | task.Status = types.TASK_STATUS_SUCCESS | 
|  | task.Finished = now.Now(ctx) | 
|  | task.IsolatedOutput = tcc_testutils.CompileCASDigest | 
|  | mockTasks = append(mockTasks, makeSwarmingRpcsTaskRequestMetadata(t, task, linuxTaskDims)) | 
|  | swarmingClient.MockTasks(mockTasks) | 
|  | } | 
|  |  | 
|  | // Create the TaskScheduler. | 
|  | cas := &mocks.CAS{} | 
|  | cas.On("Close").Return(nil) | 
|  | // Go ahead and mock the single-input merge calls. | 
|  | cas.On("Merge", testutils.AnyContext, []string{tcc_testutils.CompileCASDigest}).Return(tcc_testutils.CompileCASDigest, nil) | 
|  | cas.On("Merge", testutils.AnyContext, []string{tcc_testutils.TestCASDigest}).Return(tcc_testutils.TestCASDigest, nil) | 
|  | cas.On("Merge", testutils.AnyContext, []string{tcc_testutils.PerfCASDigest}).Return(tcc_testutils.PerfCASDigest, nil) | 
|  |  | 
|  | taskExec := swarming_task_execution.NewSwarmingTaskExecutor(swarmingClient, "fake-cas-instance", "") | 
|  | taskExecs := map[string]types.TaskExecutor{ | 
|  | types.TaskExecutor_Swarming:   taskExec, | 
|  | types.TaskExecutor_UseDefault: taskExec, | 
|  | } | 
|  | s, err := NewTaskScheduler(ctx, d, nil, time.Duration(math.MaxInt64), 0, repos, cas, "fake-cas-instance", taskExecs, mockhttpclient.NewURLMock().Client(), 1.0, swarming.POOLS_PUBLIC, cdPoolName, "", taskCfgCache, nil, mem_gcsclient.New("diag_unit_tests"), btInstance, BusyBotsDebugLoggingOff) | 
|  | require.NoError(t, err) | 
|  |  | 
|  | for _, h := range hashes { | 
|  | rs := types.RepoState{ | 
|  | Repo:     rs1.Repo, | 
|  | Revision: h, | 
|  | } | 
|  | fillCaches(t, ctx, taskCfgCache, rs, cfg) | 
|  | insertJobs(t, ctx, s, rs) | 
|  | } | 
|  |  | 
|  | // Cycle once. | 
|  | bot1 := makeBot("bot1", map[string]string{"pool": "Skia"}) | 
|  | mockBots(t, swarmingClient, bot1) | 
|  | runMainLoop(t, s, ctx) | 
|  | require.NoError(t, s.tCache.Update(ctx)) | 
|  | require.Equal(t, 0, len(s.queue)) | 
|  | head := s.repos[rs1.Repo].Get(git.MainBranch).Hash | 
|  | tasks, err := s.tCache.GetTasksForCommits(rs1.Repo, []string{head}) | 
|  | require.NoError(t, err) | 
|  | require.Len(t, tasks[head], 1) | 
|  | mock(tasks[head][taskName]) | 
|  |  | 
|  | // Add some commits to the repo. | 
|  | newHashes := mg.CommitN(8) | 
|  | for _, h := range newHashes { | 
|  | rs := types.RepoState{ | 
|  | Repo:     rs1.Repo, | 
|  | Revision: h, | 
|  | } | 
|  | fillCaches(t, ctx, taskCfgCache, rs, cfg) | 
|  | insertJobs(t, ctx, s, rs) | 
|  | } | 
|  | require.NoError(t, s.repos[rs1.Repo].Update(ctx)) | 
|  | return ctx, mg, d, s, swarmingClient, newHashes, mock, cfg, func() { | 
|  | testutils.AssertCloses(t, s) | 
|  | testutils.RemoveAll(t, workdir) | 
|  | btCleanup() | 
|  | cancel() | 
|  | } | 
|  | } | 
|  |  | 
|  | func TestMultipleCandidatesBackfillingEachOther(t *testing.T) { | 
|  | ctx, _, d, s, swarmingClient, commits, mock, _, cleanup := testMultipleCandidatesBackfillingEachOtherSetup(t) | 
|  | defer cleanup() | 
|  |  | 
|  | // Trigger builds simultaneously. | 
|  | bot1 := makeBot("bot1", map[string]string{"pool": "Skia"}) | 
|  | bot2 := makeBot("bot2", map[string]string{"pool": "Skia"}) | 
|  | bot3 := makeBot("bot3", map[string]string{"pool": "Skia"}) | 
|  | mockBots(t, swarmingClient, bot1, bot2, bot3) | 
|  | runMainLoop(t, s, ctx) | 
|  | require.NoError(t, s.tCache.Update(ctx)) | 
|  | require.Equal(t, 5, len(s.queue)) | 
|  | tasks, err := s.tCache.GetTasksForCommits(rs1.Repo, commits) | 
|  | require.NoError(t, err) | 
|  |  | 
|  | // If we're queueing correctly, we should've triggered tasks at | 
|  | // commits[0], commits[4], and either commits[2] or commits[6]. | 
|  | var t1, t2, t3 *types.Task | 
|  | for _, byName := range tasks { | 
|  | for _, task := range byName { | 
|  | if task.Revision == commits[0] { | 
|  | t1 = task | 
|  | } else if task.Revision == commits[4] { | 
|  | t2 = task | 
|  | } else if task.Revision == commits[2] || task.Revision == commits[6] { | 
|  | t3 = task | 
|  | } else { | 
|  | require.FailNow(t, fmt.Sprintf("Task has unknown revision: %+v", task)) | 
|  | } | 
|  | } | 
|  | } | 
|  | require.NotNil(t, t1) | 
|  | require.NotNil(t, t2) | 
|  | require.NotNil(t, t3) | 
|  | mock(t1) | 
|  | mock(t2) | 
|  | mock(t3) | 
|  |  | 
|  | // Ensure that we got the blamelists right. | 
|  | var expect1, expect2, expect3 []string | 
|  | if t3.Revision == commits[2] { | 
|  | expect1 = util.CopyStringSlice(commits[:2]) | 
|  | expect2 = util.CopyStringSlice(commits[4:]) | 
|  | expect3 = util.CopyStringSlice(commits[2:4]) | 
|  | } else { | 
|  | expect1 = util.CopyStringSlice(commits[:4]) | 
|  | expect2 = util.CopyStringSlice(commits[4:6]) | 
|  | expect3 = util.CopyStringSlice(commits[6:]) | 
|  | } | 
|  | sort.Strings(expect1) | 
|  | sort.Strings(expect2) | 
|  | sort.Strings(expect3) | 
|  | sort.Strings(t1.Commits) | 
|  | sort.Strings(t2.Commits) | 
|  | sort.Strings(t3.Commits) | 
|  | assertdeep.Equal(t, expect1, t1.Commits) | 
|  | assertdeep.Equal(t, expect2, t2.Commits) | 
|  | assertdeep.Equal(t, expect3, t3.Commits) | 
|  |  | 
|  | // Just for good measure, check the task at the head of the queue. | 
|  | expectIdx := 2 | 
|  | if t3.Revision == commits[expectIdx] { | 
|  | expectIdx = 6 | 
|  | } | 
|  | require.Equal(t, commits[expectIdx], s.queue[0].Revision) | 
|  |  | 
|  | retryCount := 0 | 
|  | causeConcurrentUpdate := func(tasks []*types.Task) { | 
|  | // HACK(benjaminwagner): Filter out PutTask calls from | 
|  | // updateUnfinishedTasks by looking for new tasks. | 
|  | anyNew := false | 
|  | for _, task := range tasks { | 
|  | if util.TimeIsZero(task.DbModified) { | 
|  | anyNew = true | 
|  | break | 
|  | } | 
|  | } | 
|  | if !anyNew { | 
|  | return | 
|  | } | 
|  | if retryCount < 3 { | 
|  | taskToUpdate := []*types.Task{t1, t2, t3}[retryCount] | 
|  | retryCount++ | 
|  | taskInDb, err := d.GetTaskById(ctx, taskToUpdate.Id) | 
|  | require.NoError(t, err) | 
|  | taskInDb.Status = types.TASK_STATUS_SUCCESS | 
|  | require.NoError(t, d.PutTask(ctx, taskInDb)) | 
|  | s.tCache.AddTasks([]*types.Task{taskInDb}) | 
|  | } | 
|  | } | 
|  | s.db = &spyDB{ | 
|  | DB:         d, | 
|  | onPutTasks: causeConcurrentUpdate, | 
|  | } | 
|  |  | 
|  | // Run again with 5 bots to check the case where we bisect the same | 
|  | // task twice. | 
|  | bot4 := makeBot("bot4", map[string]string{"pool": "Skia"}) | 
|  | bot5 := makeBot("bot5", map[string]string{"pool": "Skia"}) | 
|  | mockBots(t, swarmingClient, bot1, bot2, bot3, bot4, bot5) | 
|  | runMainLoop(t, s, ctx) | 
|  | require.NoError(t, s.tCache.Update(ctx)) | 
|  | require.Equal(t, 0, len(s.queue)) | 
|  | require.Equal(t, 3, retryCount) | 
|  | tasks, err = s.tCache.GetTasksForCommits(rs1.Repo, commits) | 
|  | require.NoError(t, err) | 
|  | for _, byName := range tasks { | 
|  | for _, task := range byName { | 
|  | require.Equal(t, 1, len(task.Commits)) | 
|  | require.Equal(t, task.Revision, task.Commits[0]) | 
|  | if util.In(task.Id, []string{t1.Id, t2.Id, t3.Id}) { | 
|  | require.Equal(t, types.TASK_STATUS_SUCCESS, task.Status) | 
|  | } else { | 
|  | require.Equal(t, types.TASK_STATUS_PENDING, task.Status) | 
|  | } | 
|  | } | 
|  | } | 
|  | } | 
|  |  | 
|  | func TestSchedulingRetry(t *testing.T) { | 
|  | ctx, _, _, swarmingClient, s, _, _, cleanup := setup(t) | 
|  | defer cleanup() | 
|  |  | 
|  | c1 := rs1.Revision | 
|  |  | 
|  | // Run the available compile task at c2. | 
|  | bot1 := makeBot("bot1", linuxTaskDims) | 
|  | mockBots(t, swarmingClient, bot1) | 
|  | runMainLoop(t, s, ctx) | 
|  | require.NoError(t, s.tCache.Update(ctx)) | 
|  | tasks, err := s.tCache.UnfinishedTasks() | 
|  | require.NoError(t, err) | 
|  | require.Len(t, tasks, 1) | 
|  | t1 := tasks[0] | 
|  | require.NotNil(t, t1) | 
|  | // Ensure c2, not c1. | 
|  | require.NotEqual(t, c1, t1.Revision) | 
|  | c2 := t1.Revision | 
|  |  | 
|  | // Forcibly add a second build task at c1. | 
|  | t2 := t1.Copy() | 
|  | t2.Id = "t2Id" | 
|  | t2.Revision = c1 | 
|  | t2.Commits = []string{c1} | 
|  | t1.Commits = []string{c2} | 
|  |  | 
|  | // One task successful, the other not. | 
|  | t1.Status = types.TASK_STATUS_FAILURE | 
|  | t1.Finished = now.Now(ctx) | 
|  | t2.Status = types.TASK_STATUS_SUCCESS | 
|  | t2.Finished = now.Now(ctx) | 
|  | t2.IsolatedOutput = "abc123" | 
|  |  | 
|  | require.NoError(t, s.putTasks(ctx, []*types.Task{t1, t2})) | 
|  |  | 
|  | // Cycle. Ensure that we schedule a retry of t1. | 
|  | prev := t1 | 
|  | i := 1 | 
|  | for { | 
|  | runMainLoop(t, s, ctx) | 
|  | require.NoError(t, s.tCache.Update(ctx)) | 
|  | tasks, err = s.tCache.UnfinishedTasks() | 
|  | require.NoError(t, err) | 
|  | if len(tasks) == 0 { | 
|  | break | 
|  | } | 
|  | require.Len(t, tasks, 1) | 
|  | retry := tasks[0] | 
|  | require.NotNil(t, retry) | 
|  | require.Equal(t, prev.Id, retry.RetryOf) | 
|  | require.Equal(t, i, retry.Attempt) | 
|  | require.Equal(t, c2, retry.Revision) | 
|  | retry.Status = types.TASK_STATUS_FAILURE | 
|  | retry.Finished = now.Now(ctx) | 
|  | require.NoError(t, s.putTask(ctx, retry)) | 
|  |  | 
|  | prev = retry | 
|  | i++ | 
|  | } | 
|  | require.Equal(t, 5, i) | 
|  | } | 
|  |  | 
|  | // TestParentTaskId verifies that tasks triggered after their dependency | 
|  | // finished record that dependency's ID in ParentTaskIds, and that a TaskResult | 
|  | // built from such a task's own fields does not register as an update. | 
|  | func TestParentTaskId(t *testing.T) { | 
|  | ctx, _, _, swarmingClient, s, _, cas, cleanup := setup(t) | 
|  | defer cleanup() | 
|  |  | 
|  | // Run the available compile task at c2. | 
|  | bot1 := makeBot("bot1", linuxTaskDims) | 
|  | mockBots(t, swarmingClient, bot1) | 
|  | runMainLoop(t, s, ctx) | 
|  | require.NoError(t, s.tCache.Update(ctx)) | 
|  | tasks, err := s.tCache.UnfinishedTasks() | 
|  | require.NoError(t, err) | 
|  | require.Len(t, tasks, 1) | 
|  | t1 := tasks[0] | 
|  | t1.Status = types.TASK_STATUS_SUCCESS | 
|  | t1.Finished = now.Now(ctx) | 
|  | t1.IsolatedOutput = "abc123/45" | 
|  | // The first task has no parents. | 
|  | require.Empty(t, t1.ParentTaskIds) | 
|  | require.NoError(t, s.putTasks(ctx, []*types.Task{t1})) | 
|  |  | 
|  | // Run the dependent tasks. Ensure that their parent IDs are correct. | 
|  | bot3 := makeBot("bot3", androidTaskDims) | 
|  | bot4 := makeBot("bot4", androidTaskDims) | 
|  | mockBots(t, swarmingClient, bot3, bot4) | 
|  | cas.On("Merge", testutils.AnyContext, []string{"bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb/42", "abc123/45"}).Return("bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbabc123/87", nil) | 
|  | cas.On("Merge", testutils.AnyContext, []string{"cccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccc/42", "abc123/45"}).Return("ccccccccccccccccccccccccccccccccccccccccccccccccccccccccccabc123/87", nil) | 
|  | runMainLoop(t, s, ctx) | 
|  | require.NoError(t, s.tCache.Update(ctx)) | 
|  | tasks, err = s.tCache.UnfinishedTasks() | 
|  | require.NoError(t, err) | 
|  | require.Len(t, tasks, 2) | 
|  | for _, task := range tasks { | 
|  | // Expected value first, so that failure output is labeled correctly. | 
|  | require.Len(t, task.ParentTaskIds, 1) | 
|  | require.Equal(t, t1.Id, task.ParentTaskIds[0]) | 
|  |  | 
|  | // Feed the task's own fields back as a TaskResult; nothing differs, so | 
|  | // UpdateFromTaskResult must report that the task was not updated. | 
|  | updated, err := task.UpdateFromTaskResult(&types.TaskResult{ | 
|  | ID:        task.SwarmingTaskId, | 
|  | CasOutput: task.IsolatedOutput, | 
|  | Created:   task.Created, | 
|  | Finished:  task.Finished, | 
|  | Started:   task.Started, | 
|  | Status:    task.Status, | 
|  | Tags: map[string][]string{ | 
|  | types.SWARMING_TAG_ID:             {task.Id}, | 
|  | types.SWARMING_TAG_NAME:           {task.Name}, | 
|  | types.SWARMING_TAG_REPO:           {task.Repo}, | 
|  | types.SWARMING_TAG_REVISION:       {task.Revision}, | 
|  | types.SWARMING_TAG_PARENT_TASK_ID: task.ParentTaskIds, | 
|  | types.SWARMING_TAG_FORCED_JOB_ID:  {task.ForcedJobId}, | 
|  | }, | 
|  | MachineID: task.SwarmingBotId, | 
|  | }) | 
|  | require.NoError(t, err) | 
|  | require.False(t, updated) | 
|  | } | 
|  | } | 
|  |  | 
|  | // TestSkipTasks verifies that a skip_tasks rule installed on the scheduler | 
|  | // prevents candidates at the matching commit from being triggered, and that | 
|  | // the candidate diagnostics name the rule that skipped them. | 
|  | func TestSkipTasks(t *testing.T) { | 
|  | // skip_tasks has its own tests, so this test just verifies that it's | 
|  | // actually integrated into the scheduler. | 
|  | ctx, _, _, swarmingClient, s, _, _, cleanup := setup(t) | 
|  | defer cleanup() | 
|  | fsClient, cleanupfs := ftestutils.NewClientForTesting(context.Background(), t) | 
|  | defer cleanupfs() | 
|  | bl, err := skip_tasks.New(context.Background(), fsClient) | 
|  | require.NoError(t, err) | 
|  | s.skipTasks = bl | 
|  |  | 
|  | c1 := rs1.Revision | 
|  |  | 
|  | // Mock some bots, add one of the build tasks to the skip_tasks list. | 
|  | bot1 := makeBot("bot1", linuxTaskDims) | 
|  | bot2 := makeBot("bot2", linuxTaskDims) | 
|  | mockBots(t, swarmingClient, bot1, bot2) | 
|  | require.NoError(t, s.GetSkipTasks().AddRule(ctx, &skip_tasks.Rule{ | 
|  | AddedBy:          "Tests", | 
|  | TaskSpecPatterns: []string{".*"}, | 
|  | Commits:          []string{c1}, | 
|  | Description:      "desc", | 
|  | Name:             "My-Rule", | 
|  | }, s.repos)) | 
|  | runMainLoop(t, s, ctx) | 
|  | require.NoError(t, s.tCache.Update(ctx)) | 
|  | tasks, err := s.tCache.UnfinishedTasks() | 
|  | require.NoError(t, err) | 
|  | // The skipped commit should not have been triggered. | 
|  | require.Len(t, tasks, 1) | 
|  | require.NotEqual(t, c1, tasks[0].Revision) | 
|  | // Candidate diagnostics should indicate the skip rule. | 
|  | diag := lastDiagnostics(t, s) | 
|  | foundSkipped := 0 | 
|  | for _, cand := range diag.Candidates { | 
|  | switch { | 
|  | case cand.Revision == c1: | 
|  | // Candidates at the skipped commit must name the rule. | 
|  | foundSkipped++ | 
|  | require.NotNil(t, cand.Diagnostics.Filtering) | 
|  | require.Equal(t, "My-Rule", cand.Diagnostics.Filtering.SkippedByRule) | 
|  | case cand.TaskKey == tasks[0].TaskKey: | 
|  | // The candidate that was actually triggered was not filtered. | 
|  | require.Nil(t, cand.Diagnostics.Filtering) | 
|  | default: | 
|  | // Everything else was filtered for unmet dependencies, not by the | 
|  | // rule. Check for nil first so a missing Filtering fails the test | 
|  | // with a message instead of panicking. | 
|  | require.NotNil(t, cand.Diagnostics.Filtering) | 
|  | require.Equal(t, "", cand.Diagnostics.Filtering.SkippedByRule) | 
|  | require.True(t, len(cand.Diagnostics.Filtering.UnmetDependencies) > 0) | 
|  | } | 
|  | } | 
|  | // Should be one Build task and one Test task skipped. | 
|  | require.Equal(t, 2, foundSkipped) | 
|  | } | 
|  |  | 
|  | // TestGetTasksForJob verifies that getTasksForJob returns, for each unfinished | 
|  | // job, the tasks belonging to it keyed by task spec name, and that the result | 
|  | // tracks tasks as they are triggered, fail, are retried and finish. | 
|  | func TestGetTasksForJob(t *testing.T) { | 
|  | ctx, _, _, swarmingClient, s, _, cas, cleanup := setup(t) | 
|  | defer cleanup() | 
|  |  | 
|  | c1 := rs1.Revision | 
|  | c2 := rs2.Revision | 
|  |  | 
|  | // Cycle once, check that we have empty sets for all Jobs. | 
|  | runMainLoop(t, s, ctx) | 
|  | jobs, err := s.jCache.UnfinishedJobs() | 
|  | require.NoError(t, err) | 
|  | require.Len(t, jobs, 5) | 
|  | // j1/j2 are the Build job and the other job at c1; j3/j4/j5 are the Build, | 
|  | // Test and remaining job at the other revision. | 
|  | var j1, j2, j3, j4, j5 *types.Job | 
|  | for _, j := range jobs { | 
|  | if j.Revision == c1 { | 
|  | if j.Name == tcc_testutils.BuildTaskName { | 
|  | j1 = j | 
|  | } else { | 
|  | j2 = j | 
|  | } | 
|  | } else { | 
|  | if j.Name == tcc_testutils.BuildTaskName { | 
|  | j3 = j | 
|  | } else if j.Name == tcc_testutils.TestTaskName { | 
|  | j4 = j | 
|  | } else { | 
|  | j5 = j | 
|  | } | 
|  | } | 
|  | tasksByName, err := s.getTasksForJob(j) | 
|  | require.NoError(t, err) | 
|  | for _, tasks := range tasksByName { | 
|  | require.Empty(t, tasks) | 
|  | } | 
|  | } | 
|  | require.NotNil(t, j1) | 
|  | require.NotNil(t, j2) | 
|  | require.NotNil(t, j3) | 
|  | require.NotNil(t, j4) | 
|  | require.NotNil(t, j5) | 
|  |  | 
|  | // Run the available compile task at c2. | 
|  | bot1 := makeBot("bot1", linuxTaskDims) | 
|  | mockBots(t, swarmingClient, bot1) | 
|  | runMainLoop(t, s, ctx) | 
|  | require.NoError(t, s.tCache.Update(ctx)) | 
|  | tasks, err := s.tCache.UnfinishedTasks() | 
|  | require.NoError(t, err) | 
|  | require.Len(t, tasks, 1) | 
|  | t1 := tasks[0] | 
|  | require.NotNil(t, t1) | 
|  | require.Equal(t, c2, t1.Revision) | 
|  |  | 
|  | // Test that we get the new tasks where applicable. | 
|  | expect := map[string]map[string][]*types.Task{ | 
|  | j1.Id: { | 
|  | tcc_testutils.BuildTaskName: {}, | 
|  | }, | 
|  | j2.Id: { | 
|  | tcc_testutils.BuildTaskName: {}, | 
|  | tcc_testutils.TestTaskName:  {}, | 
|  | }, | 
|  | j3.Id: { | 
|  | tcc_testutils.BuildTaskName: {t1}, | 
|  | }, | 
|  | j4.Id: { | 
|  | tcc_testutils.BuildTaskName: {t1}, | 
|  | tcc_testutils.TestTaskName:  {}, | 
|  | }, | 
|  | j5.Id: { | 
|  | tcc_testutils.BuildTaskName: {t1}, | 
|  | tcc_testutils.PerfTaskName:  {}, | 
|  | }, | 
|  | } | 
|  | for _, j := range jobs { | 
|  | tasksByName, err := s.getTasksForJob(j) | 
|  | require.NoError(t, err) | 
|  | assertdeep.Equal(t, expect[j.Id], tasksByName) | 
|  | } | 
|  |  | 
|  | // Mark the task as failed. | 
|  | t1.Status = types.TASK_STATUS_FAILURE | 
|  | t1.Finished = now.Now(ctx) | 
|  | require.NoError(t, s.putTasks(ctx, []*types.Task{t1})) | 
|  |  | 
|  | // Test that the results propagated through. | 
|  | for _, j := range jobs { | 
|  | tasksByName, err := s.getTasksForJob(j) | 
|  | require.NoError(t, err) | 
|  | assertdeep.Equal(t, expect[j.Id], tasksByName) | 
|  | } | 
|  |  | 
|  | // Cycle. Ensure that we schedule a retry of t1. | 
|  | // Need two bots, since the retry will score lower than the Build task at c1. | 
|  | bot2 := makeBot("bot2", linuxTaskDims) | 
|  | mockBots(t, swarmingClient, bot1, bot2) | 
|  | runMainLoop(t, s, ctx) | 
|  | require.NoError(t, s.tCache.Update(ctx)) | 
|  | tasks, err = s.tCache.UnfinishedTasks() | 
|  | require.NoError(t, err) | 
|  | require.Len(t, tasks, 2) | 
|  | // t2 is the retry of t1 (same TaskKey); t3 is the other new task. | 
|  | var t2, t3 *types.Task | 
|  | for _, task := range tasks { | 
|  | if task.TaskKey == t1.TaskKey { | 
|  | t2 = task | 
|  | } else { | 
|  | t3 = task | 
|  | } | 
|  | } | 
|  | require.NotNil(t, t2) | 
|  | // t3 is dereferenced below; fail here rather than panic if it is missing. | 
|  | require.NotNil(t, t3) | 
|  | require.Equal(t, t1.Id, t2.RetryOf) | 
|  |  | 
|  | // Verify that both the original t1 and its retry show up. | 
|  | t1, err = s.tCache.GetTask(t1.Id) // t1 was updated. | 
|  | require.NoError(t, err) | 
|  | expect[j1.Id][tcc_testutils.BuildTaskName] = []*types.Task{t3} | 
|  | expect[j2.Id][tcc_testutils.BuildTaskName] = []*types.Task{t3} | 
|  | expect[j3.Id][tcc_testutils.BuildTaskName] = []*types.Task{t1, t2} | 
|  | expect[j4.Id][tcc_testutils.BuildTaskName] = []*types.Task{t1, t2} | 
|  | expect[j5.Id][tcc_testutils.BuildTaskName] = []*types.Task{t1, t2} | 
|  | for _, j := range jobs { | 
|  | tasksByName, err := s.getTasksForJob(j) | 
|  | require.NoError(t, err) | 
|  | assertdeep.Equal(t, expect[j.Id], tasksByName) | 
|  | } | 
|  |  | 
|  | // The retry succeeded. | 
|  | t2.Status = types.TASK_STATUS_SUCCESS | 
|  | t2.Finished = now.Now(ctx) | 
|  | t2.IsolatedOutput = "abc123/45" | 
|  | // The Build at c1 failed. | 
|  | t3.Status = types.TASK_STATUS_FAILURE | 
|  | t3.Finished = now.Now(ctx) | 
|  | require.NoError(t, s.putTasks(ctx, []*types.Task{t2, t3})) | 
|  | // Cycle with no bots mocked; no unfinished tasks may remain afterwards. | 
|  | mockBots(t, swarmingClient) | 
|  | runMainLoop(t, s, ctx) | 
|  | require.NoError(t, s.tCache.Update(ctx)) | 
|  | tasks, err = s.tCache.UnfinishedTasks() | 
|  | require.NoError(t, err) | 
|  | require.Empty(t, tasks) | 
|  |  | 
|  | // Test that the results propagated through. | 
|  | for _, j := range jobs { | 
|  | tasksByName, err := s.getTasksForJob(j) | 
|  | require.NoError(t, err) | 
|  | assertdeep.Equal(t, expect[j.Id], tasksByName) | 
|  | } | 
|  |  | 
|  | // Schedule the remaining tasks. | 
|  | bot3 := makeBot("bot3", androidTaskDims) | 
|  | bot4 := makeBot("bot4", androidTaskDims) | 
|  | bot5 := makeBot("bot5", androidTaskDims) | 
|  | mockBots(t, swarmingClient, bot3, bot4, bot5) | 
|  | cas.On("Merge", testutils.AnyContext, []string{"bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb/42", "abc123/45"}).Return("bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbabc123/87", nil) | 
|  | cas.On("Merge", testutils.AnyContext, []string{"cccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccc/42", "abc123/45"}).Return("ccccccccccccccccccccccccccccccccccccccccccccccccccccccccccabc123/87", nil) | 
|  | runMainLoop(t, s, ctx) | 
|  | require.NoError(t, s.tCache.Update(ctx)) | 
|  |  | 
|  | // Verify that the new tasks show up. | 
|  | tasks, err = s.tCache.UnfinishedTasks() | 
|  | require.NoError(t, err) | 
|  | require.Len(t, tasks, 2) // Test and perf at c2. | 
|  | var t4, t5 *types.Task | 
|  | for _, task := range tasks { | 
|  | if task.Name == tcc_testutils.TestTaskName { | 
|  | t4 = task | 
|  | } else { | 
|  | t5 = task | 
|  | } | 
|  | } | 
|  | // Both must have been found, otherwise the expectations below hold nil. | 
|  | require.NotNil(t, t4) | 
|  | require.NotNil(t, t5) | 
|  | expect[j4.Id][tcc_testutils.TestTaskName] = []*types.Task{t4} | 
|  | expect[j5.Id][tcc_testutils.PerfTaskName] = []*types.Task{t5} | 
|  | for _, j := range jobs { | 
|  | tasksByName, err := s.getTasksForJob(j) | 
|  | require.NoError(t, err) | 
|  | assertdeep.Equal(t, expect[j.Id], tasksByName) | 
|  | } | 
|  | } | 
|  |  | 
|  | // TestTaskTimeouts checks the timeouts and expiration placed on the Swarming | 
|  | // task request: first for a task spec that sets none, then for a task spec | 
|  | // that sets ExecutionTimeout, IoTimeout and Expiration explicitly. | 
|  | func TestTaskTimeouts(t *testing.T) { | 
|  | ctx, mg, _, swarmingClient, s, _, _, cleanup := setup(t) | 
|  | defer cleanup() | 
|  |  | 
|  | // The test repo does not set any timeouts. Ensure that we get | 
|  | // reasonable default values. | 
|  | ubuntuBot := makeBot("bot1", map[string]string{"pool": "Skia", "os": "Ubuntu", "gpu": "none"}) | 
|  | mockBots(t, swarmingClient, ubuntuBot) | 
|  | runMainLoop(t, s, ctx) | 
|  | require.NoError(t, s.tCache.Update(ctx)) | 
|  | pending, err := s.tCache.UnfinishedTasks() | 
|  | require.NoError(t, err) | 
|  | require.Len(t, pending, 1) | 
|  | tk := pending[0] | 
|  | meta, err := swarmingClient.GetTaskMetadata(ctx, tk.SwarmingTaskId) | 
|  | require.NoError(t, err) | 
|  | // These are the defaults in go/swarming/swarming.go. | 
|  | require.Len(t, meta.Request.TaskSlices, 1) | 
|  | slice := meta.Request.TaskSlices[0] | 
|  | require.Equal(t, int64(60*60), slice.Properties.ExecutionTimeoutSecs) | 
|  | require.Equal(t, int64(20*60), slice.Properties.IoTimeoutSecs) | 
|  | require.Equal(t, int64(4*60*60), slice.ExpirationSecs) | 
|  | // Fail the task to get it out of the unfinished list. | 
|  | tk.Status = types.TASK_STATUS_FAILURE | 
|  | require.NoError(t, s.putTask(ctx, tk)) | 
|  |  | 
|  | // Rewrite tasks.json with some timeouts. | 
|  | taskName := "Timeout-Task" | 
|  | tasksCfg := &specs.TasksCfg{ | 
|  | CasSpecs: map[string]*specs.CasSpec{ | 
|  | "compile": { | 
|  | Digest: tcc_testutils.CompileCASDigest, | 
|  | }, | 
|  | }, | 
|  | Jobs: map[string]*specs.JobSpec{ | 
|  | "Timeout-Job": { | 
|  | Priority:  1.0, | 
|  | TaskSpecs: []string{taskName}, | 
|  | }, | 
|  | }, | 
|  | Tasks: map[string]*specs.TaskSpec{ | 
|  | taskName: { | 
|  | CasSpec:      "compile", | 
|  | CipdPackages: []*specs.CipdPackage{}, | 
|  | Dependencies: []string{}, | 
|  | Dimensions: []string{ | 
|  | "pool:Skia", | 
|  | "os:Mac", | 
|  | "gpu:my-gpu", | 
|  | }, | 
|  | ExecutionTimeout: 40 * time.Minute, | 
|  | Expiration:       2 * time.Hour, | 
|  | IoTimeout:        3 * time.Minute, | 
|  | Priority:         1.0, | 
|  | }, | 
|  | }, | 
|  | } | 
|  | // Land one new commit and attach the config above to it. | 
|  | newCommits := mg.CommitN(1) | 
|  | require.NoError(t, s.repos.Update(ctx)) | 
|  | repoState := types.RepoState{ | 
|  | Repo:     "fake.git", | 
|  | Revision: newCommits[0], | 
|  | } | 
|  | fillCaches(t, ctx, s.taskCfgCache, repoState, tasksCfg) | 
|  | insertJobs(t, ctx, s, repoState) | 
|  |  | 
|  | // Cycle, ensure that we get the expected timeouts. | 
|  | macBot := makeBot("bot2", map[string]string{"pool": "Skia", "os": "Mac", "gpu": "my-gpu"}) | 
|  | mockBots(t, swarmingClient, macBot) | 
|  | runMainLoop(t, s, ctx) | 
|  | require.NoError(t, s.tCache.Update(ctx)) | 
|  | pending, err = s.tCache.UnfinishedTasks() | 
|  | require.NoError(t, err) | 
|  | require.Len(t, pending, 1) | 
|  | tk = pending[0] | 
|  | require.Equal(t, taskName, tk.Name) | 
|  | meta, err = swarmingClient.GetTaskMetadata(ctx, tk.SwarmingTaskId) | 
|  | require.NoError(t, err) | 
|  | require.Len(t, meta.Request.TaskSlices, 1) | 
|  | slice = meta.Request.TaskSlices[0] | 
|  | require.Equal(t, int64(40*60), slice.Properties.ExecutionTimeoutSecs) | 
|  | require.Equal(t, int64(3*60), slice.Properties.IoTimeoutSecs) | 
|  | require.Equal(t, int64(2*60*60), slice.ExpirationSecs) | 
|  | } | 
|  |  | 
|  | // TestUpdateUnfinishedTasks verifies that updateUnfinishedTasks brings tasks | 
|  | // stored in the DB up to date with their mocked Swarming state, including a | 
|  | // task created outside the 4-hour window used by the ListTasks query below, | 
|  | // and leaves a task without a SwarmingTaskId unchanged. | 
|  | func TestUpdateUnfinishedTasks(t *testing.T) { | 
|  | ctx, _, _, swarmingClient, s, _, _, cleanup := setup(t) | 
|  | defer cleanup() | 
|  |  | 
|  | // Create a few tasks. | 
|  | // ts is a fixed reference time; it is deliberately not named "now" so | 
|  | // that it does not shadow the imported now package. | 
|  | ts := time.Unix(1480683321, 0).UTC() | 
|  | t1 := &types.Task{ | 
|  | Id:             "t1", | 
|  | Created:        ts.Add(-time.Minute), | 
|  | Status:         types.TASK_STATUS_RUNNING, | 
|  | SwarmingTaskId: "swarmt1", | 
|  | } | 
|  | t2 := &types.Task{ | 
|  | Id:             "t2", | 
|  | Created:        ts.Add(-10 * time.Minute), | 
|  | Status:         types.TASK_STATUS_PENDING, | 
|  | SwarmingTaskId: "swarmt2", | 
|  | } | 
|  | t3 := &types.Task{ | 
|  | Id:             "t3", | 
|  | Created:        ts.Add(-5 * time.Hour), // Outside the 4-hour window. | 
|  | Status:         types.TASK_STATUS_PENDING, | 
|  | SwarmingTaskId: "swarmt3", | 
|  | } | 
|  | // Include a fake task to ensure it's ignored. | 
|  | t4 := &types.Task{ | 
|  | Id:      "t4", | 
|  | Created: ts.Add(-time.Minute), | 
|  | Status:  types.TASK_STATUS_PENDING, | 
|  | } | 
|  |  | 
|  | // Insert the tasks into the DB. | 
|  | tasks := []*types.Task{t1, t2, t3, t4} | 
|  | require.NoError(t, s.putTasks(ctx, tasks)) | 
|  |  | 
|  | // Update the tasks, mock in Swarming. | 
|  | t1.Status = types.TASK_STATUS_SUCCESS | 
|  | t1.IsolatedOutput = "dddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddd/86" | 
|  | t2.Status = types.TASK_STATUS_FAILURE | 
|  | t3.Status = types.TASK_STATUS_SUCCESS | 
|  | t3.IsolatedOutput = "ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff/256" | 
|  |  | 
|  | m1 := makeSwarmingRpcsTaskRequestMetadata(t, t1, linuxTaskDims) | 
|  | m2 := makeSwarmingRpcsTaskRequestMetadata(t, t2, linuxTaskDims) | 
|  | m3 := makeSwarmingRpcsTaskRequestMetadata(t, t3, linuxTaskDims) | 
|  | swarmingClient.MockTasks([]*swarming_api.SwarmingRpcsTaskRequestMetadata{m1, m2, m3}) | 
|  |  | 
|  | // Assert that the third task doesn't show up in the time range query. | 
|  | got, err := swarmingClient.ListTasks(ctx, ts.Add(-4*time.Hour), ts, []string{"pool:Skia"}, "") | 
|  | require.NoError(t, err) | 
|  | assertdeep.Equal(t, []*swarming_api.SwarmingRpcsTaskRequestMetadata{m1, m2}, got) | 
|  |  | 
|  | // Ensure that we update the tasks as expected. | 
|  | require.NoError(t, s.updateUnfinishedTasks(ctx)) | 
|  | for _, task := range tasks { | 
|  | fromDB, err := s.db.GetTaskById(ctx, task.Id) | 
|  | require.NoError(t, err) | 
|  | // Ignore DbModified when comparing. | 
|  | task.DbModified = fromDB.DbModified | 
|  | assertdeep.Equal(t, task, fromDB) | 
|  | } | 
|  | } | 
|  |  | 
|  | // setupAddTasksTest runs setup, adds 7 commits to the repo and stores a | 
|  | // TasksCfg declaring the task specs "duty", "onus", "toil" and "work" for every | 
|  | // commit on the main branch. It returns the hashes of all commits on the main | 
|  | // branch as reported by AllCommits, plus a function that runs setup's cleanup. | 
|  | func setupAddTasksTest(t *testing.T) (context.Context, *mem_git.MemGit, []string, *memory.InMemoryDB, *TaskScheduler, func()) { | 
|  | ctx, mg, d, _, s, _, _, cleanup := setup(t) | 
|  |  | 
|  | // Add some commits to test blamelist calculation. | 
|  | mg.CommitN(7) | 
|  | require.NoError(t, s.repos.Update(ctx)) | 
|  | hashes, err := s.repos[rs1.Repo].Get(git.MainBranch).AllCommits() | 
|  | require.NoError(t, err) | 
|  | for _, rev := range hashes { | 
|  | repoState := types.RepoState{ | 
|  | Repo:     rs1.Repo, | 
|  | Revision: rev, | 
|  | } | 
|  | // Each commit gets its own TasksCfg value with four empty task specs. | 
|  | cfg := &specs.TasksCfg{ | 
|  | Tasks: map[string]*specs.TaskSpec{ | 
|  | "duty": {}, | 
|  | "onus": {}, | 
|  | "toil": {}, | 
|  | "work": {}, | 
|  | }, | 
|  | } | 
|  | require.NoError(t, s.taskCfgCache.Set(ctx, repoState, cfg, nil)) | 
|  | } | 
|  |  | 
|  | return ctx, mg, hashes, d, s, func() { | 
|  | cleanup() | 
|  | } | 
|  | } | 
|  |  | 
|  | // assertBlamelist asserts task.Commits contains exactly the hashes at the given | 
|  | // indexes of hashes, in any order. Both sides are compared as string sets, so | 
|  | // duplicate entries are not detected. | 
|  | func assertBlamelist(t *testing.T, hashes []string, task *types.Task, indexes []int) { | 
|  | // Attribute failures to the calling test line rather than to this helper. | 
|  | t.Helper() | 
|  | expected := util.NewStringSet() | 
|  | for _, idx := range indexes { | 
|  | expected[hashes[idx]] = true | 
|  | } | 
|  | require.Equal(t, expected, util.NewStringSet(task.Commits)) | 
|  | } | 
|  |  | 
|  | // assertModifiedTasks asserts that every task in expected is eventually | 
|  | // delivered on mod with deep-equal contents. Batches received from mod are | 
|  | // accumulated by task ID, with later batches overwriting earlier entries, so | 
|  | // ordering does not matter and additional tasks on the channel are ignored. | 
|  | // The assertion fails if this does not happen within 10 seconds. d is unused | 
|  | // and is kept only so existing call sites continue to compile. | 
|  | func assertModifiedTasks(t *testing.T, d db.TaskReader, mod <-chan []*types.Task, expected []*types.Task) { | 
|  | // Attribute failures to the calling test line rather than to this helper. | 
|  | t.Helper() | 
|  | tasksById := map[string]*types.Task{} | 
|  | require.Eventually(t, func() bool { | 
|  | // Use a non-blocking select so that the test will fail after 10 | 
|  | // seconds rather than time out after 10 minutes (or whatever the | 
|  | // overall timeout is set to). | 
|  | select { | 
|  | case modTasks := <-mod: | 
|  | for _, task := range modTasks { | 
|  | tasksById[task.Id] = task | 
|  | } | 
|  | default: | 
|  | // No new batch yet; check again on the next tick. | 
|  | return false | 
|  | } | 
|  | // A batch arrived: succeed once every expected task has been seen with | 
|  | // matching contents. | 
|  | for _, expectedTask := range expected { | 
|  | actualTask, ok := tasksById[expectedTask.Id] | 
|  | if !ok || !deepequal.DeepEqual(expectedTask, actualTask) { | 
|  | return false | 
|  | } | 
|  | } | 
|  | return true | 
|  | }, 10*time.Second, 50*time.Millisecond) | 
|  | } | 
|  |  | 
|  | // TestAddTasksSingleTaskSpecSimple checks that addTasksSingleTaskSpec inserts new tasks and computes the expected blamelist for each, ignoring any Commits already set on the input tasks. | 
|  | func TestAddTasksSingleTaskSpecSimple(t *testing.T) { | 
|  | ctx, _, hashes, d, s, cleanup := setupAddTasksTest(t) | 
|  | defer cleanup() | 
|  |  | 
|  | t1 := makeTask(ctx, "toil", rs1.Repo, hashes[6]) | 
|  | require.NoError(t, s.putTask(ctx, t1)) | 
|  |  | 
|  | mod := d.ModifiedTasksCh(ctx) | 
|  | <-mod // The first batch is unused. | 
|  |  | 
|  | t2 := makeTask(ctx, "toil", rs1.Repo, hashes[5]) // Commits should be {5} (numbers are indexes into hashes) | 
|  | t3 := makeTask(ctx, "toil", rs1.Repo, hashes[3]) // Commits should be {3, 4} | 
|  | t4 := makeTask(ctx, "toil", rs1.Repo, hashes[2]) // Commits should be {2} | 
|  | t5 := makeTask(ctx, "toil", rs1.Repo, hashes[0]) // Commits should be {0, 1} | 
|  |  | 
|  | // Clear Commits on some tasks, set incorrect Commits on others to | 
|  | // ensure the value supplied on the input is ignored and recomputed. | 
|  | t3.Commits = nil | 
|  | t4.Commits = []string{hashes[5], hashes[4], hashes[3], hashes[2]} | 
|  | sort.Strings(t4.Commits) | 
|  |  | 
|  | // Pass the tasks in an order that does not follow commit order, to ensure the results do not depend on input order. | 
|  | require.NoError(t, s.addTasksSingleTaskSpec(ctx, []*types.Task{t5, t2, t3, t4})) | 
|  |  | 
|  | assertBlamelist(t, hashes, t2, []int{5}) | 
|  | assertBlamelist(t, hashes, t3, []int{3, 4}) | 
|  | assertBlamelist(t, hashes, t4, []int{2}) | 
|  | assertBlamelist(t, hashes, t5, []int{0, 1}) | 
|  |  | 
|  | // Check that the tasks were inserted into the DB, i.e. that all four were reported on the modified-tasks channel. | 
|  | assertModifiedTasks(t, d, mod, []*types.Task{t2, t3, t4, t5}) | 
|  | } | 
|  |  | 
|  | // TestAddTasksSingleTaskSpecBisectNew checks that addTasksSingleTaskSpec computes blamelists when tasks added in the same call bisect each | 
|  | // other. The inline comments trace how each blamelist is expected to shrink as further tasks are considered; numbers are indexes into hashes. | 
|  | func TestAddTasksSingleTaskSpecBisectNew(t *testing.T) { | 
|  | ctx, _, hashes, d, s, cleanup := setupAddTasksTest(t) | 
|  | defer cleanup() | 
|  |  | 
|  | t1 := makeTask(ctx, "toil", rs1.Repo, hashes[6]) | 
|  | require.NoError(t, s.putTask(ctx, t1)) | 
|  |  | 
|  | mod := d.ModifiedTasksCh(ctx) | 
|  | <-mod // The first batch is unused. | 
|  |  | 
|  | // t2.Commits = {1, 2, 3, 4, 5} | 
|  | t2 := makeTask(ctx, "toil", rs1.Repo, hashes[1]) | 
|  | // t3.Commits = {3, 4, 5} | 
|  | // t2.Commits = {1, 2} | 
|  | t3 := makeTask(ctx, "toil", rs1.Repo, hashes[3]) | 
|  | // t4.Commits = {4, 5} | 
|  | // t3.Commits = {3} | 
|  | t4 := makeTask(ctx, "toil", rs1.Repo, hashes[4]) | 
|  | // t5.Commits = {0} | 
|  | t5 := makeTask(ctx, "toil", rs1.Repo, hashes[0]) | 
|  | // t6.Commits = {2} | 
|  | // t2.Commits = {1} | 
|  | t6 := makeTask(ctx, "toil", rs1.Repo, hashes[2]) | 
|  | // t7.Commits = {1} (t7 runs at the same revision as t2 and takes over its last commit) | 
|  | // t2.Commits = {} | 
|  | t7 := makeTask(ctx, "toil", rs1.Repo, hashes[1]) | 
|  |  | 
|  | // Pass the tasks in an order that does not follow commit order, to ensure results are deterministic. | 
|  | tasks := []*types.Task{t5, t2, t7, t3, t6, t4} | 
|  |  | 
|  | // Assign Ids before calling addTasksSingleTaskSpec. | 
|  | for _, task := range tasks { | 
|  | require.NoError(t, d.AssignId(ctx, task)) | 
|  | } | 
|  |  | 
|  | require.NoError(t, s.addTasksSingleTaskSpec(ctx, tasks)) | 
|  |  | 
|  | assertBlamelist(t, hashes, t2, []int{}) | 
|  | assertBlamelist(t, hashes, t3, []int{3}) | 
|  | assertBlamelist(t, hashes, t4, []int{4, 5}) | 
|  | assertBlamelist(t, hashes, t5, []int{0}) | 
|  | assertBlamelist(t, hashes, t6, []int{2}) | 
|  | assertBlamelist(t, hashes, t7, []int{1}) | 
|  |  | 
|  | // Check that the tasks were inserted into the DB, i.e. that all six were reported on the modified-tasks channel. | 
|  | assertModifiedTasks(t, d, mod, tasks) | 
|  | } | 
|  |  | 
|  | // TestAddTasksSingleTaskSpecBisectOld checks that addTasksSingleTaskSpec computes blamelists when new tasks bisect the blamelist of a | 
|  | // task already in the DB (t2), and that the stored task's blamelist shrinks accordingly. Numbers in comments are indexes into hashes. | 
|  | func TestAddTasksSingleTaskSpecBisectOld(t *testing.T) { | 
|  | ctx, _, hashes, d, s, cleanup := setupAddTasksTest(t) | 
|  | defer cleanup() | 
|  |  | 
|  | t1 := makeTask(ctx, "toil", rs1.Repo, hashes[6]) | 
|  | t2 := makeTask(ctx, "toil", rs1.Repo, hashes[1]) | 
|  | t2.Commits = []string{hashes[1], hashes[2], hashes[3], hashes[4], hashes[5]} | 
|  | sort.Strings(t2.Commits) | 
|  | require.NoError(t, s.putTasks(ctx, []*types.Task{t1, t2})) | 
|  |  | 
|  | mod := d.ModifiedTasksCh(ctx) | 
|  | <-mod // The first batch is unused. | 
|  |  | 
|  | // t3.Commits = {3, 4, 5} | 
|  | // t2.Commits = {1, 2} | 
|  | t3 := makeTask(ctx, "toil", rs1.Repo, hashes[3]) | 
|  | // t4.Commits = {4, 5} | 
|  | // t3.Commits = {3} | 
|  | t4 := makeTask(ctx, "toil", rs1.Repo, hashes[4]) | 
|  | // t5.Commits = {2} | 
|  | // t2.Commits = {1} | 
|  | t5 := makeTask(ctx, "toil", rs1.Repo, hashes[2]) | 
|  | // t6.Commits = {4, 5} (t6 runs at the same revision as t4 and takes over its commits) | 
|  | // t4.Commits = {} | 
|  | t6 := makeTask(ctx, "toil", rs1.Repo, hashes[4]) | 
|  |  | 
|  | // Pass the tasks in an order that does not follow commit order, to ensure results are deterministic. | 
|  | require.NoError(t, s.addTasksSingleTaskSpec(ctx, []*types.Task{t5, t3, t6, t4})) | 
|  |  | 
|  | t2Updated, err := d.GetTaskById(ctx, t2.Id) | 
|  | require.NoError(t, err) | 
|  | assertBlamelist(t, hashes, t2Updated, []int{1}) | 
|  | assertBlamelist(t, hashes, t3, []int{3}) | 
|  | assertBlamelist(t, hashes, t4, []int{}) | 
|  | assertBlamelist(t, hashes, t5, []int{2}) | 
|  | assertBlamelist(t, hashes, t6, []int{4, 5}) | 
|  |  | 
|  | // Check that the tasks were inserted into the DB. The local t2 was not passed to addTasksSingleTaskSpec, so copy the stored Commits and DbModified into it before comparing. | 
|  | t2.Commits = t2Updated.Commits | 
|  | t2.DbModified = t2Updated.DbModified | 
|  | assertModifiedTasks(t, d, mod, []*types.Task{t2, t3, t4, t5, t6}) | 
|  | } | 
|  |  | 
|  | // TestAddTasksSingleTaskSpecUpdate checks that addTasksSingleTaskSpec writes updates to tasks that are already in the DB while | 
|  | // leaving their existing blamelists unchanged. | 
|  | func TestAddTasksSingleTaskSpecUpdate(t *testing.T) { | 
|  | ctx, _, hashes, d, s, cleanup := setupAddTasksTest(t) | 
|  | defer cleanup() | 
|  |  | 
|  | t1 := makeTask(ctx, "toil", rs1.Repo, hashes[6]) | 
|  | t2 := makeTask(ctx, "toil", rs1.Repo, hashes[3]) | 
|  | t3 := makeTask(ctx, "toil", rs1.Repo, hashes[4]) | 
|  | t3.Commits = nil // Stolen by t5, which runs at the same revision. | 
|  | t4 := makeTask(ctx, "toil", rs1.Repo, hashes[0]) | 
|  | t4.Commits = []string{hashes[0], hashes[1], hashes[2]} | 
|  | sort.Strings(t4.Commits) | 
|  | t5 := makeTask(ctx, "toil", rs1.Repo, hashes[4]) | 
|  | t5.Commits = []string{hashes[4], hashes[5]} | 
|  | sort.Strings(t5.Commits) | 
|  |  | 
|  | tasks := []*types.Task{t1, t2, t3, t4, t5} | 
|  | require.NoError(t, s.putTasks(ctx, tasks)) | 
|  |  | 
|  | mod := d.ModifiedTasksCh(ctx) | 
|  | <-mod // The first batch is unused. | 
|  |  | 
|  | // Make an update: change the status of every stored task. | 
|  | for _, task := range tasks { | 
|  | task.Status = types.TASK_STATUS_MISHAP | 
|  | } | 
|  |  | 
|  | // Pass the tasks in an order that does not follow commit order, to ensure results are deterministic. | 
|  | require.NoError(t, s.addTasksSingleTaskSpec(ctx, []*types.Task{t5, t3, t1, t4, t2})) | 
|  |  | 
|  | // Check that blamelists did not change. | 
|  | assertBlamelist(t, hashes, t1, []int{6}) | 
|  | assertBlamelist(t, hashes, t2, []int{3}) | 
|  | assertBlamelist(t, hashes, t3, []int{}) | 
|  | assertBlamelist(t, hashes, t4, []int{0, 1, 2}) | 
|  | assertBlamelist(t, hashes, t5, []int{4, 5}) | 
|  |  | 
|  | // Check that the updated tasks were written to the DB, i.e. that all five were reported on the modified-tasks channel. | 
|  | assertModifiedTasks(t, d, mod, []*types.Task{t1, t2, t3, t4, t5}) | 
|  | } | 
|  |  | 
|  | // TestAddTasks checks that addTasks handles several task specs in one call, computing blamelists separately for each task name within the repo (presumably by calling addTasksSingleTaskSpec per group). | 
|  | func TestAddTasks(t *testing.T) { | 
|  | ctx, _, hashes, d, s, cleanup := setupAddTasksTest(t) | 
|  | defer cleanup() | 
|  |  | 
|  | toil1 := makeTask(ctx, "toil", rs1.Repo, hashes[6]) | 
|  | duty1 := makeTask(ctx, "duty", rs1.Repo, hashes[6]) | 
|  | work1 := makeTask(ctx, "work", rs1.Repo, hashes[6]) | 
|  | work2 := makeTask(ctx, "work", rs1.Repo, hashes[1]) | 
|  | work2.Commits = []string{hashes[1], hashes[2], hashes[3], hashes[4], hashes[5]} | 
|  | sort.Strings(work2.Commits) | 
|  | onus1 := makeTask(ctx, "onus", rs1.Repo, hashes[6]) | 
|  | onus2 := makeTask(ctx, "onus", rs1.Repo, hashes[3]) | 
|  | onus2.Commits = []string{hashes[3], hashes[4], hashes[5]} | 
|  | sort.Strings(onus2.Commits) | 
|  | require.NoError(t, s.putTasks(ctx, []*types.Task{toil1, duty1, work1, work2, onus1, onus2})) | 
|  |  | 
|  | mod := d.ModifiedTasksCh(ctx) | 
|  | <-mod // The first batch is unused. | 
|  |  | 
|  | // toil2.Commits = {5} (numbers in these comments are indexes into hashes) | 
|  | toil2 := makeTask(ctx, "toil", rs1.Repo, hashes[5]) | 
|  | // toil3.Commits = {3, 4} | 
|  | toil3 := makeTask(ctx, "toil", rs1.Repo, hashes[3]) | 
|  |  | 
|  | // duty2.Commits = {1, 2, 3, 4, 5} | 
|  | duty2 := makeTask(ctx, "duty", rs1.Repo, hashes[1]) | 
|  | // duty3.Commits = {3, 4, 5} | 
|  | // duty2.Commits = {1, 2} | 
|  | duty3 := makeTask(ctx, "duty", rs1.Repo, hashes[3]) | 
|  |  | 
|  | // work3.Commits = {3, 4, 5} | 
|  | // work2.Commits = {1, 2} (work2 is already in the DB and is not part of this call) | 
|  | work3 := makeTask(ctx, "work", rs1.Repo, hashes[3]) | 
|  | // work4.Commits = {2} | 
|  | // work2.Commits = {1} | 
|  | work4 := makeTask(ctx, "work", rs1.Repo, hashes[2]) | 
|  |  | 
|  | onus2.Status = types.TASK_STATUS_MISHAP | 
|  | // onus2 is already in the DB and is updated in this call; onus3 steals all commits from onus2 | 
|  | onus3 := makeTask(ctx, "onus", rs1.Repo, hashes[3]) | 
|  | // onus4 steals all commits from onus3 | 
|  | onus4 := makeTask(ctx, "onus", rs1.Repo, hashes[3]) | 
|  |  | 
|  | tasks := map[string]map[string][]*types.Task{ | 
|  | rs1.Repo: { | 
|  | "toil": {toil2, toil3}, | 
|  | "duty": {duty2, duty3}, | 
|  | "work": {work3, work4}, | 
|  | "onus": {onus2, onus3, onus4}, | 
|  | }, | 
|  | } | 
|  |  | 
|  | require.NoError(t, s.addTasks(ctx, tasks)) | 
|  |  | 
|  | assertBlamelist(t, hashes, toil2, []int{5}) | 
|  | assertBlamelist(t, hashes, toil3, []int{3, 4}) | 
|  |  | 
|  | assertBlamelist(t, hashes, duty2, []int{1, 2}) | 
|  | assertBlamelist(t, hashes, duty3, []int{3, 4, 5}) | 
|  |  | 
|  | work2Updated, err := d.GetTaskById(ctx, work2.Id) | 
|  | require.NoError(t, err) | 
|  | assertBlamelist(t, hashes, work2Updated, []int{1}) | 
|  | assertBlamelist(t, hashes, work3, []int{3, 4, 5}) | 
|  | assertBlamelist(t, hashes, work4, []int{2}) | 
|  |  | 
|  | assertBlamelist(t, hashes, onus2, []int{}) | 
|  | assertBlamelist(t, hashes, onus3, []int{}) | 
|  | assertBlamelist(t, hashes, onus4, []int{3, 4, 5}) | 
|  |  | 
|  | // Check that the tasks were inserted into the DB. The local work2 was not passed to addTasks, so copy the stored Commits and DbModified into it before comparing. | 
|  | work2.Commits = work2Updated.Commits | 
|  | work2.DbModified = work2Updated.DbModified | 
|  | assertModifiedTasks(t, d, mod, []*types.Task{toil2, toil3, duty2, duty3, work2, work3, work4, onus2, onus3, onus4}) | 
|  | } | 
|  |  | 
|  | // TestAddTasksFailure checks that addTasks does not leave the DB in an inconsistent state if there is a partial error: while the "duty" group fails, no "duty" task may be modified, and a later call with fresh data succeeds. | 
|  | func TestAddTasksFailure(t *testing.T) { | 
|  | ctx, _, hashes, d, s, cleanup := setupAddTasksTest(t) | 
|  | defer cleanup() | 
|  |  | 
|  | toil1 := makeTask(ctx, "toil", rs1.Repo, hashes[6]) | 
|  | duty1 := makeTask(ctx, "duty", rs1.Repo, hashes[6]) | 
|  | duty2 := makeTask(ctx, "duty", rs1.Repo, hashes[5]) | 
|  | require.NoError(t, s.putTasks(ctx, []*types.Task{toil1, duty1, duty2})) | 
|  | d.Wait() | 
|  |  | 
|  | // Cause ErrConcurrentUpdate in AddTasks: cachedDuty2 is copied before duty2 is updated in the DB, so it is stale when written later. | 
|  | cachedDuty2 := duty2.Copy() | 
|  | duty2.Status = types.TASK_STATUS_MISHAP | 
|  | require.NoError(t, d.PutTask(ctx, duty2)) | 
|  | d.Wait() | 
|  |  | 
|  | mod := d.ModifiedTasksCh(ctx) | 
|  | <-mod // The first batch is unused. | 
|  |  | 
|  | // toil2.Commits = {3, 4, 5} (numbers are indexes into hashes) | 
|  | toil2 := makeTask(ctx, "toil", rs1.Repo, hashes[3]) | 
|  |  | 
|  | cachedDuty2.Status = types.TASK_STATUS_FAILURE | 
|  | // duty3.Commits = {3, 4} | 
|  | duty3 := makeTask(ctx, "duty", rs1.Repo, hashes[3]) | 
|  |  | 
|  | tasks := map[string]map[string][]*types.Task{ | 
|  | rs1.Repo: { | 
|  | "toil": {toil2}, | 
|  | "duty": {cachedDuty2, duty3}, | 
|  | }, | 
|  | } | 
|  |  | 
|  | // Try multiple times to reduce chance of test passing flakily. | 
|  | for i := 0; i < 3; i++ { | 
|  | err := s.addTasks(ctx, tasks) | 
|  | require.Error(t, err) | 
|  | modTasks := <-mod | 
|  | // "duty" tasks should never be updated; every modified task must be toil2. | 
|  | for _, task := range modTasks { | 
|  | require.Equal(t, "toil", task.Name) | 
|  | assertdeep.Equal(t, toil2, task) | 
|  | assertBlamelist(t, hashes, toil2, []int{3, 4, 5}) | 
|  | } | 
|  | } | 
|  |  | 
|  | duty2.Status = types.TASK_STATUS_FAILURE | 
|  | tasks[rs1.Repo]["duty"] = []*types.Task{duty2, duty3} | 
|  | require.NoError(t, s.addTasks(ctx, tasks)) | 
|  |  | 
|  | assertBlamelist(t, hashes, toil2, []int{3, 4, 5}) | 
|  |  | 
|  | assertBlamelist(t, hashes, duty2, []int{5}) | 
|  | assertBlamelist(t, hashes, duty3, []int{3, 4}) | 
|  |  | 
|  | // Check that the tasks were inserted into the DB once the up-to-date duty2 replaced the stale copy. | 
|  | assertModifiedTasks(t, d, mod, []*types.Task{toil2, duty2, duty3}) | 
|  | } | 
|  |  | 
|  | // AddTasks should retry on ErrConcurrentUpdate. | 
|  | func TestAddTasksRetries(t *testing.T) { | 
|  | ctx, _, hashes, d, s, cleanup := setupAddTasksTest(t) | 
|  | defer cleanup() | 
|  |  | 
|  | toil1 := makeTask(ctx, "toil", rs1.Repo, hashes[6]) | 
|  | duty1 := makeTask(ctx, "duty", rs1.Repo, hashes[6]) | 
|  | work1 := makeTask(ctx, "work", rs1.Repo, hashes[6]) | 
|  | toil2 := makeTask(ctx, "toil", rs1.Repo, hashes[1]) | 
|  | toil2.Commits = []string{hashes[1], hashes[2], hashes[3], hashes[4], hashes[5]} | 
|  | sort.Strings(toil2.Commits) | 
|  | duty2 := makeTask(ctx, "duty", rs1.Repo, hashes[1]) | 
|  | duty2.Commits = util.CopyStringSlice(toil2.Commits) | 
|  | work2 := makeTask(ctx, "work", rs1.Repo, hashes[1]) | 
|  | work2.Commits = util.CopyStringSlice(toil2.Commits) | 
|  | require.NoError(t, s.putTasks(ctx, []*types.Task{toil1, toil2, duty1, duty2, work1, work2})) | 
|  |  | 
|  | mod := d.ModifiedTasksCh(ctx) | 
|  | <-mod // The first batch is unused. | 
|  |  | 
|  | // *3.Commits = {3, 4, 5} | 
|  | // *2.Commits = {1, 2} | 
|  | toil3 := makeTask(ctx, "toil", rs1.Repo, hashes[3]) | 
|  | duty3 := makeTask(ctx, "duty", rs1.Repo, hashes[3]) | 
|  | work3 := makeTask(ctx, "work", rs1.Repo, hashes[3]) | 
|  | // *4.Commits = {2} | 
|  | // *2.Commits = {1} | 
|  | toil4 := makeTask(ctx, "toil", rs1.Repo, hashes[2]) | 
|  | duty4 := makeTask(ctx, "duty", rs1.Repo, hashes[2]) | 
|  | work4 := makeTask(ctx, "work", rs1.Repo, hashes[2]) | 
|  |  | 
|  | tasks := map[string]map[string][]*types.Task{ | 
|  | rs1.Repo: { | 
|  | "toil": {toil3.Copy(), toil4.Copy()}, | 
|  | "duty": {duty3.Copy(), duty4.Copy()}, | 
|  | "work": {work3.Copy(), work4.Copy()}, | 
|  | }, | 
|  | } | 
|  |  | 
|  | retryCountMtx := sync.Mutex{} | 
|  | retryCount := map[string]int{} | 
|  | causeConcurrentUpdate := func(tasks []*types.Task) { | 
|  | retryCountMtx.Lock() | 
|  | defer retryCountMtx.Unlock() | 
|  | retryCount[tasks[0].Name]++ | 
|  | if tasks[0].Name == "toil" && retryCount["toil"] < 2 { | 
|  | toil2.Started = now.Now(ctx).UTC() | 
|  | require.NoError(t, d.PutTasks(ctx, []*types.Task{toil2})) | 
|  | s.tCache.AddTasks([]*types.Task{toil2}) | 
|  | } | 
|  | if tasks[0].Name == "duty" && retryCount["duty"] < 3 { | 
|  | duty2.Started = now.Now(ctx).UTC() | 
|  | require.NoError(t, d.PutTasks(ctx, []*types.Task{duty2})) | 
|  | s.tCache.AddTasks([]*types.Task{duty2}) | 
|  | } | 
|  | if tasks[0].Name == "work" && retryCount["work"] < 4 { | 
|  | work2.Started = now.Now(ctx).UTC() | 
|  | require.NoError(t, d.PutTasks(ctx, []*types.Task{work2})) | 
|  | s.tCache.AddTasks([]*types.Task{work2}) | 
|  | } | 
|  | } | 
|  | s.db = &spyDB{ | 
|  | DB:         d, | 
|  | onPutTasks: causeConcurrentUpdate, | 
|  | } | 
|  |  | 
|  | require.NoError(t, s.addTasks(ctx, tasks)) | 
|  |  | 
|  | retryCountMtx.Lock() | 
|  | defer retryCountMtx.Unlock() | 
|  | require.Equal(t, 2, retryCount["toil"]) | 
|  | require.Equal(t, 3, retryCount["duty"]) | 
|  | require.Equal(t, 4, retryCount["work"]) | 
|  |  | 
|  | modified := []*types.Task{} | 
|  | check := func(t2, t3, t4 *types.Task) { | 
|  | t2InDB, err := d.GetTaskById(ctx, t2.Id) | 
|  | require.NoError(t, err) | 
|  | assertBlamelist(t, hashes, t2InDB, []int{1}) | 
|  | t3Arg := tasks[t3.Repo][t3.Name][0] | 
|  | t3InDB, err := d.GetTaskById(ctx, t3Arg.Id) | 
|  | require.NoError(t, err) | 
|  | assertdeep.Equal(t, t3Arg, t3InDB) | 
|  | assertBlamelist(t, hashes, t3InDB, []int{3, 4, 5}) | 
|  | t4Arg := tasks[t4.Repo][t4.Name][1] | 
|  | t4InDB, err := d.GetTaskById(ctx, t4Arg.Id) | 
|  | require.NoError(t, err) | 
|  | assertdeep.Equal(t, t4Arg, t4InDB) | 
|  | assertBlamelist(t, hashes, t4InDB, []int{2}) | 
|  | t2.Commits = t2InDB.Commits | 
|  | t2.DbModified = t2InDB.DbModified | 
|  | t3.Id = t3InDB.Id | 
|  | t3.Commits = t3InDB.Commits | 
|  | t3.DbModified = t3InDB.DbModified | 
|  | t4.Id = t4InDB.Id | 
|  | t4.Commits = t4InDB.Commits | 
|  | t4.DbModified = t4InDB.DbModified | 
|  | modified = append(modified, t2, t3, t4) | 
|  | } | 
|  |  | 
|  | check(toil2, toil3, toil4) | 
|  | check(duty2, duty3, duty4) | 
|  | check(work2, work3, work4) | 
|  | assertModifiedTasks(t, d, mod, modified) | 
|  | } | 
|  |  | 
|  | func TestTriggerTaskFailed(t *testing.T) { | 
|  | // Verify that if one task out of a set fails to trigger, the others are | 
|  | // still inserted into the DB and handled properly, eg. wrt. blamelists. | 
|  | ctx, _, _, s, swarmingClient, commits, _, _, cleanup := testMultipleCandidatesBackfillingEachOtherSetup(t) | 
|  | defer cleanup() | 
|  |  | 
|  | // Trigger three tasks. We should attempt to trigger tasks at | 
|  | // commits[0], commits[4], and either commits[2] or commits[6]. Mock | 
|  | // failure to trigger the task at commits[4] and ensure that the other | 
|  | // two tasks get inserted with the correct blamelists. | 
|  | bot1 := makeBot("bot1", map[string]string{"pool": "Skia"}) | 
|  | bot2 := makeBot("bot2", map[string]string{"pool": "Skia"}) | 
|  | bot3 := makeBot("bot3", map[string]string{"pool": "Skia"}) | 
|  | makeTags := func(commit string) []string { | 
|  | return []string{ | 
|  | "luci_project:", | 
|  | "milo_host:https://ci.chromium.org/raw/build/%s", | 
|  | "sk_attempt:0", | 
|  | "sk_dim_pool:Skia", | 
|  | "sk_retry_of:", | 
|  | fmt.Sprintf("source_revision:%s", commit), | 
|  | "source_repo:" + fmt.Sprintf(gitiles.CommitURL, rs1.Repo, "%s"), | 
|  | fmt.Sprintf("sk_repo:%s", rs1.Repo), | 
|  | fmt.Sprintf("sk_revision:%s", commit), | 
|  | "sk_forced_job_id:", | 
|  | "sk_name:faketask", | 
|  | } | 
|  | } | 
|  | mockBots(t, swarmingClient, bot1, bot2, bot3) | 
|  | swarmingClient.MockTriggerTaskFailure(makeTags(commits[4])) | 
|  | err := s.MainLoop(ctx) | 
|  | s.testWaitGroup.Wait() | 
|  | require.NotNil(t, err) | 
|  | require.True(t, strings.Contains(err.Error(), "Mocked trigger failure!")) | 
|  | require.NoError(t, s.tCache.Update(ctx)) | 
|  | require.Equal(t, 6, len(s.queue)) | 
|  | tasks, err := s.tCache.GetTasksForCommits(rs1.Repo, commits) | 
|  | require.NoError(t, err) | 
|  |  | 
|  | var t1, t2, t3 *types.Task | 
|  | for _, byName := range tasks { | 
|  | for _, task := range byName { | 
|  | if task.Revision == commits[0] { | 
|  | t1 = task | 
|  | } else if task.Revision == commits[4] { | 
|  | t2 = task | 
|  | } else if task.Revision == commits[2] || task.Revision == commits[6] { | 
|  | t3 = task | 
|  | } else { | 
|  | require.FailNow(t, fmt.Sprintf("Task has unknown revision: %v", task)) | 
|  | } | 
|  | } | 
|  | } | 
|  | require.NotNil(t, t1) | 
|  | require.Nil(t, t2) | 
|  | require.NotNil(t, t3) | 
|  |  | 
|  | // Ensure that we got the blamelists right. | 
|  | var expect1, expect3 []string | 
|  | if t3.Revision == commits[2] { | 
|  | expect1 = util.CopyStringSlice(commits[:2]) | 
|  | expect3 = util.CopyStringSlice(commits[2:]) | 
|  | } else { | 
|  | expect1 = util.CopyStringSlice(commits[:6]) | 
|  | expect3 = util.CopyStringSlice(commits[6:]) | 
|  | } | 
|  | sort.Strings(expect1) | 
|  | sort.Strings(expect3) | 
|  | sort.Strings(t1.Commits) | 
|  | sort.Strings(t3.Commits) | 
|  | assertdeep.Equal(t, expect1, t1.Commits) | 
|  | assertdeep.Equal(t, expect3, t3.Commits) | 
|  |  | 
|  | // Check diagnostics. | 
|  | diag := lastDiagnostics(t, s) | 
|  | failedTrigger := 0 | 
|  | for _, c := range diag.Candidates { | 
|  | if c.Revision == commits[4] { | 
|  | require.True(t, strings.Contains(c.Diagnostics.Triggering.TriggerError, "Mocked trigger failure!")) | 
|  | failedTrigger++ | 
|  | } else { | 
|  | if c.TaskKey == t1.TaskKey { | 
|  | require.Equal(t, "", c.Diagnostics.Triggering.TriggerError) | 
|  | require.Equal(t, t1.Id, c.Diagnostics.Triggering.TaskId) | 
|  | } else if c.TaskKey == t3.TaskKey { | 
|  | require.Equal(t, "", c.Diagnostics.Triggering.TriggerError) | 
|  | require.Equal(t, t3.Id, c.Diagnostics.Triggering.TaskId) | 
|  | } else { | 
|  | require.Nil(t, c.Diagnostics.Triggering) | 
|  | } | 
|  | } | 
|  | } | 
|  | // Should be one task that failed | 
|  | require.Equal(t, 1, failedTrigger) | 
|  | } | 
|  |  | 
|  | const badTaskName = "badtask" | 
|  |  | 
|  | type mockDB struct { | 
|  | db.DB | 
|  | failAssignId bool | 
|  | failPutTasks bool | 
|  | } | 
|  |  | 
|  | func (d *mockDB) AssignId(ctx context.Context, t *types.Task) error { | 
|  | if t.Name == badTaskName && d.failAssignId { | 
|  | return errors.New(badTaskName) | 
|  | } | 
|  | return d.DB.AssignId(ctx, t) | 
|  | } | 
|  |  | 
|  | func (d *mockDB) PutTasks(ctx context.Context, tasks []*types.Task) error { | 
|  | for _, t := range tasks { | 
|  | if t.Name == badTaskName && d.failPutTasks { | 
|  | return errors.New(badTaskName) | 
|  | } | 
|  | } | 
|  | return d.DB.PutTasks(ctx, tasks) | 
|  | } | 
|  |  | 
|  | func TestContinueOnTriggerTaskFailure(t *testing.T) { | 
|  | // Verify that if one task out of a set fails any part of the triggering | 
|  | // process, the others are still triggered, inserted into the DB, etc. | 
|  | ctx, mg, _, s, swarmingClient, commits, _, cfg, cleanup := testMultipleCandidatesBackfillingEachOtherSetup(t) | 
|  | defer cleanup() | 
|  |  | 
|  | // Add several more commits. | 
|  | newCommits := mg.CommitN(10) | 
|  | commits = append(newCommits, commits...) | 
|  | badCommit := commits[0] | 
|  | badTaskName := "badtask" | 
|  | badPool := "BadPool" | 
|  | s.pools = []string{"Skia", badPool} | 
|  | require.NoError(t, s.repos.Update(ctx)) | 
|  | for _, hash := range newCommits { | 
|  | rs := types.RepoState{ | 
|  | Repo:     rs1.Repo, | 
|  | Revision: hash, | 
|  | } | 
|  | c := cfg.Copy() | 
|  | if hash == badCommit { | 
|  | c.Tasks[badTaskName] = &specs.TaskSpec{ | 
|  | CasSpec:      "compile", | 
|  | CipdPackages: []*specs.CipdPackage{}, | 
|  | Dependencies: []string{}, | 
|  | Dimensions:   []string{fmt.Sprintf("pool:%s", badPool)}, | 
|  | Priority:     1.0, | 
|  | } | 
|  | c.Jobs["badjob"] = &specs.JobSpec{ | 
|  | TaskSpecs: []string{badTaskName}, | 
|  | } | 
|  | } | 
|  | fillCaches(t, ctx, s.taskCfgCache, rs, c) | 
|  | insertJobs(t, ctx, s, rs) | 
|  | } | 
|  |  | 
|  | finishAll := func() { | 
|  | tasks, err := s.tCache.UnfinishedTasks() | 
|  | require.NoError(t, err) | 
|  | for _, task := range tasks { | 
|  | task.Finished = now.Now(ctx) | 
|  | task.Status = types.TASK_STATUS_SUCCESS | 
|  | } | 
|  | require.NoError(t, s.db.PutTasks(ctx, tasks)) | 
|  | } | 
|  | finishAll() // The setup func triggers a task. | 
|  |  | 
|  | badTags := []string{ | 
|  | "luci_project:", | 
|  | "milo_host:https://ci.chromium.org/raw/build/%s", | 
|  | "sk_attempt:0", | 
|  | fmt.Sprintf("sk_dim_pool:%s", badPool), | 
|  | "sk_retry_of:", | 
|  | fmt.Sprintf("source_revision:%s", badCommit), | 
|  | "source_repo:" + fmt.Sprintf(gitiles.CommitURL, rs1.Repo, "%s"), | 
|  | fmt.Sprintf("sk_repo:%s", rs1.Repo), | 
|  | fmt.Sprintf("sk_revision:%s", badCommit), | 
|  | "sk_forced_job_id:", | 
|  | fmt.Sprintf("sk_name:%s", badTaskName), | 
|  | } | 
|  |  | 
|  | test := func() { | 
|  | // Pretend there are two bots available. | 
|  | bots := []*types.Machine{ | 
|  | makeBot("bot0", map[string]string{"pool": "Skia"}), | 
|  | makeBot("bot1", map[string]string{"pool": badPool}), | 
|  | } | 
|  | mockBots(t, swarmingClient, bots...) | 
|  |  | 
|  | // Run MainLoop. | 
|  | err := s.MainLoop(ctx) | 
|  | s.testWaitGroup.Wait() | 
|  | require.NotNil(t, err) | 
|  | require.NoError(t, s.tCache.Update(ctx)) | 
|  |  | 
|  | // We'll try to trigger all tasks but the one for the bad commit will | 
|  | // fail. Ensure that we triggered all of the others. | 
|  | tasks, err := s.tCache.UnfinishedTasks() | 
|  | require.NoError(t, err) | 
|  | require.Len(t, tasks, 1) | 
|  | require.NotEqual(t, badTaskName, tasks[0].Name) | 
|  |  | 
|  | // Clean up for the next iteration. | 
|  | finishAll() | 
|  | } | 
|  |  | 
|  | // 1. Actual triggering failed. | 
|  | swarmingClient.MockTriggerTaskFailure(badTags) | 
|  | test() | 
|  |  | 
|  | // 2. DB.AssignId failed. | 
|  | mdb := &mockDB{ | 
|  | DB:           s.db, | 
|  | failAssignId: true, | 
|  | } | 
|  | s.db = mdb | 
|  | test() | 
|  |  | 
|  | // 3. DB.PutTasks failed. | 
|  | mdb.failAssignId = false | 
|  | mdb.failPutTasks = true | 
|  | test() | 
|  |  | 
|  | // 4. Failure to load details of de-duplicated task after triggering. | 
|  | mdb.failPutTasks = false | 
|  | swarmingClient.MockTriggerTaskDeduped(badTags, "thistagwontparse") | 
|  | test() | 
|  | s.db = mdb.DB | 
|  |  | 
|  | // 5. Failure to merge CAS entries. | 
|  | // TODO(borenet) | 
|  | } | 
|  |  | 
|  | func TestTriggerTaskDeduped(t *testing.T) { | 
|  | // Verify that we properly handle de-duplicated tasks. | 
|  | ctx, _, _, s, swarmingClient, commits, _, _, cleanup := testMultipleCandidatesBackfillingEachOtherSetup(t) | 
|  | defer cleanup() | 
|  |  | 
|  | // Trigger three tasks. We should attempt to trigger tasks at | 
|  | // commits[0], commits[4], and either commits[2] or commits[6]. Mock | 
|  | // deduplication of the task at commits[4] and ensure that the other | 
|  | // two tasks are not deduped. | 
|  | bot1 := makeBot("bot1", map[string]string{"pool": "Skia"}) | 
|  | bot2 := makeBot("bot2", map[string]string{"pool": "Skia"}) | 
|  | bot3 := makeBot("bot3", map[string]string{"pool": "Skia"}) | 
|  | makeTags := func(commit string) []string { | 
|  | return []string{ | 
|  | "luci_project:", | 
|  | "milo_host:https://ci.chromium.org/raw/build/%s", | 
|  | "sk_attempt:0", | 
|  | "sk_dim_pool:Skia", | 
|  | "sk_retry_of:", | 
|  | fmt.Sprintf("source_revision:%s", commit), | 
|  | "source_repo:" + fmt.Sprintf(gitiles.CommitURL, rs1.Repo, "%s"), | 
|  | fmt.Sprintf("sk_repo:%s", rs1.Repo), | 
|  | fmt.Sprintf("sk_revision:%s", commit), | 
|  | "sk_forced_job_id:", | 
|  | "sk_name:faketask", | 
|  | } | 
|  | } | 
|  | mockBots(t, swarmingClient, bot1, bot2, bot3) | 
|  | swarmingClient.MockTriggerTaskDeduped(makeTags(commits[4])) | 
|  | require.NoError(t, s.MainLoop(ctx)) | 
|  | s.testWaitGroup.Wait() | 
|  | require.NoError(t, s.tCache.Update(ctx)) | 
|  | require.Equal(t, 5, len(s.queue)) | 
|  | tasks, err := s.tCache.GetTasksForCommits(rs1.Repo, commits) | 
|  | require.NoError(t, err) | 
|  |  | 
|  | var t1, t2, t3 *types.Task | 
|  | for _, byName := range tasks { | 
|  | for _, task := range byName { | 
|  | if task.Revision == commits[0] { | 
|  | t1 = task | 
|  | } else if task.Revision == commits[4] { | 
|  | t2 = task | 
|  | } else if task.Revision == commits[2] || task.Revision == commits[6] { | 
|  | t3 = task | 
|  | } else { | 
|  | require.FailNow(t, fmt.Sprintf("Task has unknown revision: %v", task)) | 
|  | } | 
|  | } | 
|  | } | 
|  | require.NotNil(t, t1) | 
|  | require.NotNil(t, t2) | 
|  | require.NotNil(t, t3) | 
|  |  | 
|  | // Ensure that t2 was correctly deduped, and the others weren't. | 
|  | require.Equal(t, types.TASK_STATUS_PENDING, t1.Status) | 
|  | require.Equal(t, types.TASK_STATUS_SUCCESS, t2.Status) | 
|  | require.Equal(t, types.TASK_STATUS_PENDING, t3.Status) | 
|  | } | 
|  |  | 
|  | func TestTriggerTaskNoResource(t *testing.T) { | 
|  | // Verify that we properly handle tasks which are rejected due to lack | 
|  | // of matching bots. | 
|  | ctx, _, _, s, swarmingClient, commits, _, _, cleanup := testMultipleCandidatesBackfillingEachOtherSetup(t) | 
|  | defer cleanup() | 
|  |  | 
|  | // Attempt to trigger a task. It gets rejected because the bot we | 
|  | // thought was available to run it is now busy/quarantined/offline. | 
|  | bot1 := makeBot("bot1", map[string]string{"pool": "Skia"}) | 
|  | makeTags := func(commit string) []string { | 
|  | return []string{ | 
|  | "luci_project:", | 
|  | "milo_host:https://ci.chromium.org/raw/build/%s", | 
|  | "sk_attempt:0", | 
|  | "sk_dim_pool:Skia", | 
|  | "sk_retry_of:", | 
|  | fmt.Sprintf("source_revision:%s", commit), | 
|  | "source_repo:" + fmt.Sprintf(gitiles.CommitURL, rs1.Repo, "%s"), | 
|  | fmt.Sprintf("sk_repo:%s", rs1.Repo), | 
|  | fmt.Sprintf("sk_revision:%s", commit), | 
|  | "sk_forced_job_id:", | 
|  | "sk_name:faketask", | 
|  | } | 
|  | } | 
|  | mockBots(t, swarmingClient, bot1) | 
|  | swarmingClient.MockTriggerTaskNoResource(makeTags(commits[0])) | 
|  | err := s.MainLoop(ctx) | 
|  | require.NotNil(t, err) | 
|  | require.True(t, strings.Contains(err.Error(), "No bots available to run faketask with dimensions: pool:Skia")) | 
|  | s.testWaitGroup.Wait() | 
|  | require.NoError(t, s.tCache.Update(ctx)) | 
|  | require.Equal(t, 8, len(s.queue)) | 
|  | tasks, err := s.tCache.GetTasksForCommits(rs1.Repo, []string{commits[0]}) | 
|  | require.NoError(t, err) | 
|  | require.Equal(t, 0, len(tasks[commits[0]])) | 
|  | } | 
|  |  | 
|  | func TestScoreCandidate_TryJob_PrioritizedHigherByWaitTime(t *testing.T) { | 
|  | ctx := context.Background() | 
|  |  | 
|  | test := func(name, jobCreated, now string, expectedScore float64) { | 
|  | t.Run(name, func(t *testing.T) { | 
|  | s := TaskScheduler{} | 
|  | job := types.Job{ | 
|  | Created:  rfc3339(t, jobCreated), | 
|  | Priority: specs.DEFAULT_JOB_SPEC_PRIORITY, | 
|  | } | 
|  | tc := asTryJob(TaskCandidate{ | 
|  | Jobs: []*types.Job{&job}, | 
|  | }) | 
|  | s.scoreCandidate(ctx, &tc, rfc3339(t, now), timeDoesNotMatter, nil) | 
|  | assert.InDelta(t, expectedScore, tc.Score, 0.0001) | 
|  | }) | 
|  | } | 
|  |  | 
|  | test("no waiting", "2021-10-01T15:00:00Z", "2021-10-01T15:00:00Z", 5) | 
|  | test("10 min waiting", "2021-10-01T15:00:00Z", "2021-10-01T15:10:00Z", 5.08333) | 
|  | test("30 min waiting", "2021-10-01T08:00:00Z", "2021-10-01T08:30:00Z", 5.25) | 
|  | test("2 hours waiting", "2021-10-01T13:00:00Z", "2021-10-01T15:00:00Z", 6) | 
|  | } | 
|  |  | 
|  | func TestScoreCandidate_TryJob_PrioritizedLowerByNumRetries(t *testing.T) { | 
|  | ctx := context.Background() | 
|  |  | 
|  | test := func(name string, numRetries int, expectedScore float64) { | 
|  | t.Run(name, func(t *testing.T) { | 
|  | s := TaskScheduler{} | 
|  | ts := rfc3339(t, "2021-10-01T15:00:00Z") // fixed time indicating no waiting | 
|  | job := types.Job{ | 
|  | Created:  ts, | 
|  | Priority: specs.DEFAULT_JOB_SPEC_PRIORITY, | 
|  | } | 
|  | tc := asTryJob(TaskCandidate{ | 
|  | Jobs:    []*types.Job{&job}, | 
|  | Attempt: numRetries, | 
|  | }) | 
|  | s.scoreCandidate(ctx, &tc, ts, timeDoesNotMatter, nil) | 
|  | assert.InDelta(t, expectedScore, tc.Score, 0.0001) | 
|  | }) | 
|  | } | 
|  |  | 
|  | test("first try", 0, 5) | 
|  | test("second try", 1, 3.75) | 
|  | test("third try", 2, 2.8125) | 
|  | test("tenth try", 9, 0.3754) | 
|  | } | 
|  |  | 
|  | func TestScoreCandidate_TryJob_MultipleJobPrioritiesImpactScore(t *testing.T) { | 
|  | ctx := context.Background() | 
|  |  | 
|  | test := func(name string, expectedScore float64, jobPriorities ...float64) { | 
|  | t.Run(name, func(t *testing.T) { | 
|  | s := TaskScheduler{} | 
|  | ts := rfc3339(t, "2021-10-01T15:00:00Z") // fixed time indicating no waiting | 
|  | tc := asTryJob(TaskCandidate{}) | 
|  | // For each job priority passed in, create a job with that priority that's a part | 
|  | // of this task candidate. | 
|  | for _, jp := range jobPriorities { | 
|  | tc.Jobs = append(tc.Jobs, &types.Job{ | 
|  | Created:  ts, | 
|  | Priority: jp, | 
|  | }) | 
|  | } | 
|  | s.scoreCandidate(ctx, &tc, ts, timeDoesNotMatter, nil) | 
|  | assert.InDelta(t, expectedScore, tc.Score, 0.0001) | 
|  | }) | 
|  | } | 
|  |  | 
|  | test("one job, default priority", 5, specs.DEFAULT_JOB_SPEC_PRIORITY) | 
|  | test("one job, higher priority", 7.5, 0.75) | 
|  | test("one job, highest priority", 10, 1.0) | 
|  | test("one job, lower priority", 2.5, 0.25) | 
|  | test("one job, lowest priority", 0.001, 0.0001) | 
|  |  | 
|  | test("out of range priority results in default", 5, 1234567) | 
|  | test("zero priority results in default", 5, 0) | 
|  |  | 
|  | // We score tasks higher if they have more jobs. | 
|  | test("two jobs, medium priority", 7.5, 0.5, 0.5) | 
|  | test("three jobs, medium priority", 8.75, 0.5, 0.5, 0.5) | 
|  | test("four jobs, medium priority", 9.375, 0.5, 0.5, 0.5, 0.5) | 
|  | test("four jobs, mixed priorities", 9.811, 0.1, 0.7, 0.3, 0.9) | 
|  | // Double-check the math for the mixed priorities test | 
|  | assert.InDelta(t, .9811, 1-(1-0.1)*(1-0.7)*(1-0.3)*(1-0.9), 0.0001) | 
|  | } | 
|  |  | 
|  | func TestScoreCandidate_TryJob_OldestJobOnlyUsedForWaitTime(t *testing.T) { | 
|  | ctx := context.Background() | 
|  |  | 
|  | cycleStart := rfc3339(t, "2021-10-01T15:00:00Z") | 
|  | test := func(name string, expectedScore float64, jobCreated ...string) { | 
|  | t.Run(name, func(t *testing.T) { | 
|  | s := TaskScheduler{} | 
|  | tc := asTryJob(TaskCandidate{}) | 
|  | for _, ts := range jobCreated { | 
|  | tc.Jobs = append(tc.Jobs, &types.Job{ | 
|  | Created:  rfc3339(t, ts), | 
|  | Priority: specs.DEFAULT_JOB_SPEC_PRIORITY, | 
|  | }) | 
|  | } | 
|  | s.scoreCandidate(ctx, &tc, cycleStart, timeDoesNotMatter, nil) | 
|  | assert.InDelta(t, expectedScore, tc.Score, 0.0001) | 
|  | }) | 
|  | } | 
|  |  | 
|  | // These should all be the same because the first job (which is presumed to be the oldest) | 
|  | // is the time that is used. | 
|  | test("two jobs, both waited 2 hours", 9, "2021-10-01T13:00:00Z", "2021-10-01T13:00:00Z") | 
|  | test("two jobs, only one waited 2 hours", 9, "2021-10-01T13:00:00Z", "2021-10-01T14:55:00Z") | 
|  | } | 
|  |  | 
|  | func TestScoreCandidate_ForcedJob_PrioritizedHigherByWaitTime(t *testing.T) { | 
|  | ctx := context.Background() | 
|  |  | 
|  | test := func(name, jobCreated, now string, expectedScore float64) { | 
|  | t.Run(name, func(t *testing.T) { | 
|  | s := TaskScheduler{} | 
|  | job := types.Job{ | 
|  | Created:  rfc3339(t, jobCreated), | 
|  | Priority: specs.DEFAULT_JOB_SPEC_PRIORITY, | 
|  | } | 
|  | tc := asForcedJob(TaskCandidate{ | 
|  | Jobs: []*types.Job{&job}, | 
|  | }) | 
|  | s.scoreCandidate(ctx, &tc, rfc3339(t, now), timeDoesNotMatter, nil) | 
|  | assert.InDelta(t, expectedScore, tc.Score, 0.0001) | 
|  | }) | 
|  | } | 
|  |  | 
|  | test("no waiting", "2021-10-01T15:00:00Z", "2021-10-01T15:00:00Z", 50) | 
|  | test("10 min waiting", "2021-10-01T15:00:00Z", "2021-10-01T15:10:00Z", 50.08333) | 
|  | test("30 min waiting", "2021-10-01T08:00:00Z", "2021-10-01T08:30:00Z", 50.25) | 
|  | test("2 hours waiting", "2021-10-01T13:00:00Z", "2021-10-01T15:00:00Z", 51) | 
|  | } | 
|  |  | 
|  | func TestScoreCandidate_ForcedJob_MultipleJobPrioritiesImpactScore(t *testing.T) { | 
|  | ctx := context.Background() | 
|  |  | 
|  | test := func(name string, expectedScore float64, jobPriorities ...float64) { | 
|  | t.Run(name, func(t *testing.T) { | 
|  | s := TaskScheduler{} | 
|  | ts := rfc3339(t, "2021-10-01T15:00:00Z") // fixed time indicating no waiting | 
|  | tc := asForcedJob(TaskCandidate{}) | 
|  | // For each job priority passed in, create a job with that priority that's a part | 
|  | // of this task candidate. | 
|  | for _, jp := range jobPriorities { | 
|  | tc.Jobs = append(tc.Jobs, &types.Job{ | 
|  | Created:  ts, | 
|  | Priority: jp, | 
|  | }) | 
|  | } | 
|  | s.scoreCandidate(ctx, &tc, ts, timeDoesNotMatter, nil) | 
|  | assert.InDelta(t, expectedScore, tc.Score, 0.0001) | 
|  | }) | 
|  | } | 
|  |  | 
|  | test("one job, default priority", 50, specs.DEFAULT_JOB_SPEC_PRIORITY) | 
|  | test("one job, higher priority", 75, 0.75) | 
|  | test("one job, highest priority", 100, 1.0) | 
|  | test("one job, lower priority", 25, 0.25) | 
|  | test("one job, lowest priority", 0.01, 0.0001) | 
|  |  | 
|  | test("out of range priority results in default", 50, 1234567) | 
|  | test("zero priority results in default", 50, 0) | 
|  |  | 
|  | // We score tasks higher if they have more jobs. | 
|  | test("two jobs, medium priority", 75, 0.5, 0.5) | 
|  | test("three jobs, medium priority", 87.5, 0.5, 0.5, 0.5) | 
|  | test("four jobs, medium priority", 93.75, 0.5, 0.5, 0.5, 0.5) | 
|  | test("four jobs, mixed priorities", 98.11, 0.1, 0.7, 0.3, 0.9) | 
|  | // Double-check the math for the mixed priorities test | 
|  | assert.InDelta(t, .9811, 1-(1-0.1)*(1-0.7)*(1-0.3)*(1-0.9), 0.0001) | 
|  | } | 
|  |  | 
|  | func TestScoreCandidate_ForcedJob_OldestJobOnlyUsedForWaitTime(t *testing.T) { | 
|  | ctx := context.Background() | 
|  |  | 
|  | cycleStart := rfc3339(t, "2021-10-01T15:00:00Z") | 
|  | test := func(name string, expectedScore float64, jobCreated ...string) { | 
|  | t.Run(name, func(t *testing.T) { | 
|  | s := TaskScheduler{} | 
|  | tc := asForcedJob(TaskCandidate{}) | 
|  | for _, ts := range jobCreated { | 
|  | tc.Jobs = append(tc.Jobs, &types.Job{ | 
|  | Created:  rfc3339(t, ts), | 
|  | Priority: specs.DEFAULT_JOB_SPEC_PRIORITY, | 
|  | }) | 
|  | } | 
|  | s.scoreCandidate(ctx, &tc, cycleStart, timeDoesNotMatter, nil) | 
|  | assert.InDelta(t, expectedScore, tc.Score, 0.0001) | 
|  | }) | 
|  | } | 
|  |  | 
|  | // These should all be the same because the first job (which is presumed to be the oldest) | 
|  | // is the time that is used. | 
|  | test("two jobs, both waited 2 hours", 76.5, "2021-10-01T13:00:00Z", "2021-10-01T13:00:00Z") | 
|  | test("two jobs, only one waited 2 hours", 76.5, "2021-10-01T13:00:00Z", "2021-10-01T14:55:00Z") | 
|  | } | 
|  |  | 
|  | func TestComputeBlamelist_NoExistingTests(t *testing.T) { | 
|  | ctx := context.Background() | 
|  |  | 
|  | const repoName = "some_repo" | 
|  | const taskName = "Test-Foo-Bar" | 
|  | tcg := tcc_mocks.TasksAlwaysDefined(taskName) | 
|  |  | 
|  | mg, repo := newMemRepo(t) | 
|  | firstCommit := mg.Commit("a") | 
|  | secondCommit := mg.Commit("b", firstCommit) | 
|  | thirdCommit := mg.Commit("c", secondCommit) | 
|  |  | 
|  | mtc := &cache_mocks.TaskCache{} | 
|  | mtc.On("GetTaskForCommit", repoName, mock.Anything, taskName).Return(nil, nil) | 
|  |  | 
|  | blamelistNoTask := func(name string, revision string, expectedBlamelist []string) { | 
|  | t.Run(name, func(t *testing.T) { | 
|  | blamedCommits, stealFromTask, err := ComputeBlamelist(ctx, mtc, repo, taskName, repoName, repo.Get(revision), commitsBuffer, tcg, window_mocks.AllInclusiveWindow()) | 
|  | require.NoError(t, err) | 
|  | assert.Equal(t, expectedBlamelist, blamedCommits) | 
|  | assert.Nil(t, stealFromTask) | 
|  | }) | 
|  | } | 
|  | // We expect this commit and all following commits to be blamed | 
|  | blamelistNoTask("first commit", firstCommit, []string{firstCommit}) | 
|  | blamelistNoTask("second commit", secondCommit, []string{secondCommit, firstCommit}) | 
|  | blamelistNoTask("third commit", thirdCommit, []string{thirdCommit, secondCommit, firstCommit}) | 
|  | } | 
|  |  | 
|  | func TestComputeBlamelist_FirstCommitTested(t *testing.T) { | 
|  | ctx := context.Background() | 
|  |  | 
|  | const repoName = "some_repo" | 
|  | const taskName = "Test-Foo-Bar" | 
|  | tcg := tcc_mocks.TasksAlwaysDefined(taskName) | 
|  |  | 
|  | mg, repo := newMemRepo(t) | 
|  | firstCommit := mg.Commit("a") | 
|  | secondCommit := mg.Commit("b", firstCommit) | 
|  | thirdCommit := mg.Commit("c", secondCommit) | 
|  |  | 
|  | firstCommitTask := newTask("task-1", firstCommit) | 
|  |  | 
|  | mtc := &cache_mocks.TaskCache{} | 
|  | mtc.On("GetTaskForCommit", repoName, firstCommit, taskName).Return(firstCommitTask, nil) | 
|  | mtc.On("GetTaskForCommit", repoName, mock.Anything, taskName).Return(nil, nil) | 
|  |  | 
|  | blamelistNoTask := func(name string, revision string, expectedBlamelist []string) { | 
|  | t.Run(name, func(t *testing.T) { | 
|  | blamedCommits, stealFromTask, err := ComputeBlamelist(ctx, mtc, repo, taskName, repoName, repo.Get(revision), commitsBuffer, tcg, window_mocks.AllInclusiveWindow()) | 
|  | require.NoError(t, err) | 
|  | assert.Equal(t, expectedBlamelist, blamedCommits) | 
|  | assert.Nil(t, stealFromTask) | 
|  | }) | 
|  | } | 
|  | // For commits after the task, the blamelist should extend back, but not include that commit. | 
|  | blamelistNoTask("second commit", secondCommit, []string{secondCommit}) | 
|  | blamelistNoTask("third commit", thirdCommit, []string{thirdCommit, secondCommit}) | 
|  |  | 
|  | // Calculating a blame for a commit which already ran a task is considered a retry. As such, | 
|  | // the entire blamelist is returned. | 
|  | t.Run("first commit (retry)", func(t *testing.T) { | 
|  | blamedCommits, stealFromTask, err := ComputeBlamelist(ctx, mtc, repo, taskName, repoName, repo.Get(firstCommit), commitsBuffer, tcg, window_mocks.AllInclusiveWindow()) | 
|  | require.NoError(t, err) | 
|  | assert.Equal(t, []string{firstCommit}, blamedCommits) | 
|  | assert.Equal(t, firstCommitTask, stealFromTask) | 
|  | }) | 
|  | } | 
|  |  | 
|  | func TestComputeBlamelist_LastCommitTested_FollowingCommitsBlamed(t *testing.T) { | 
|  | ctx := context.Background() | 
|  |  | 
|  | const repoName = "some_repo" | 
|  | const taskName = "Test-Foo-Bar" | 
|  | tcg := tcc_mocks.TasksAlwaysDefined(taskName) | 
|  |  | 
|  | mg, repo := newMemRepo(t) | 
|  | firstCommit := mg.Commit("a") | 
|  | secondCommit := mg.Commit("b", firstCommit) | 
|  | thirdCommit := mg.Commit("c", secondCommit) | 
|  |  | 
|  | thirdCommitTask := newTask("task-1", thirdCommit, secondCommit, firstCommit) | 
|  |  | 
|  | mtc := &cache_mocks.TaskCache{} | 
|  | // By returning this task for all commits, we are saying that every commit is covered by | 
|  | // the task that ran on the last commit. | 
|  | mtc.On("GetTaskForCommit", repoName, mock.Anything, taskName).Return(thirdCommitTask, nil) | 
|  |  | 
|  | blamelistAndTask := func(name string, revision string, expectedBlamelist []string, expectedTask *types.Task) { | 
|  | t.Run(name, func(t *testing.T) { | 
|  | blamedCommits, stealFromTask, err := ComputeBlamelist(ctx, mtc, repo, taskName, repoName, repo.Get(revision), commitsBuffer, tcg, window_mocks.AllInclusiveWindow()) | 
|  | require.NoError(t, err) | 
|  | assert.Equal(t, expectedBlamelist, blamedCommits) | 
|  | assert.Equal(t, expectedTask, stealFromTask) | 
|  | }) | 
|  | } | 
|  |  | 
|  | blamelistAndTask("first commit", firstCommit, []string{firstCommit}, thirdCommitTask) | 
|  | blamelistAndTask("second commit", secondCommit, []string{secondCommit, firstCommit}, thirdCommitTask) | 
|  | // Calculating a blame for a commit which already ran a task is considered a retry. As such, | 
|  | // the entire blamelist is returned. | 
|  | blamelistAndTask("third commit (retry)", thirdCommit, []string{thirdCommit, secondCommit, firstCommit}, thirdCommitTask) | 
|  | } | 
|  |  | 
|  | func TestComputeBlamelist_BlamelistTooLong_UseProvidedCommit(t *testing.T) { | 
|  | ctx := context.Background() | 
|  |  | 
|  | const repoName = "some_repo" | 
|  | const taskName = "Test-Foo-Bar" | 
|  | tcg := tcc_mocks.TasksAlwaysDefined(taskName) | 
|  |  | 
|  | // Pretend we have 1000 commits in a row. We make sure their hashes are unique but predictable | 
|  | // by using the integer values from 0 to 999 prefixed with enough zeros to make them 40 digits | 
|  | // (the length of a real-world git SHA1 hash). | 
|  | require.Greater(t, 1000, MAX_BLAMELIST_COMMITS) | 
|  | mg, repo := newMemRepo(t) | 
|  | mg.Commit("0") | 
|  | for i := 1; i < 1000; i++ { | 
|  | h := fmt.Sprintf("Commit_%d", i) | 
|  | mg.Commit(h) | 
|  | } | 
|  | lastCommit := repo.Get(repo.Branches()[0]) | 
|  | require.Equal(t, "Commit_999", lastCommit.Subject) | 
|  |  | 
|  | mtc := &cache_mocks.TaskCache{} | 
|  | mtc.On("GetTaskForCommit", repoName, mock.Anything, taskName).Return(nil, nil) | 
|  |  | 
|  | blamedCommits, stealFromTask, err := ComputeBlamelist(ctx, mtc, repo, taskName, repoName, lastCommit, commitsBuffer, tcg, window_mocks.AllInclusiveWindow()) | 
|  | require.NoError(t, err) | 
|  | assert.Equal(t, []string{lastCommit.Hash}, blamedCommits) | 
|  | assert.Nil(t, stealFromTask) | 
|  | } | 
|  |  | 
|  | func TestComputeBlamelist_BranchInHistory_NonOverlappingCoverage(t *testing.T) { | 
|  | ctx := context.Background() | 
|  |  | 
|  | const repoName = "some_repo" | 
|  | const taskName = "Test-Foo-Bar" | 
|  | tcg := tcc_mocks.TasksAlwaysDefined(taskName) | 
|  |  | 
|  | // This represents a git history with a branch and merge like this. | 
|  | // The asterisks represent at which commits a task has run. Only one of the branches | 
|  | // has a test, so commit 2 is not "double covered". | 
|  | //     0 | 
|  | //     1* | 
|  | //     2 | 
|  | //   3a  3b | 
|  | //   4a* 4b | 
|  | //   5a   | | 
|  | //      6 | 
|  | mg, repo := newMemRepo(t) | 
|  | zero := mg.Commit("0") | 
|  | one := mg.Commit("1", zero) | 
|  | two := mg.Commit("2", one) | 
|  | threeA := mg.Commit("3a", two) | 
|  | threeB := mg.Commit("3b", two) | 
|  | fourA := mg.Commit("4a", threeA) | 
|  | fourB := mg.Commit("4b", threeB) | 
|  | fiveA := mg.Commit("5a", fourA) | 
|  | six := mg.Commit("6", fourB, fiveA) | 
|  |  | 
|  | commitOneTask := newTask("task-1", one, zero) | 
|  | commitFourATask := newTask("task-2", fourA, threeA, two) | 
|  |  | 
|  | mtc := &cache_mocks.TaskCache{} | 
|  | mtc.On("GetTaskForCommit", repoName, zero, taskName).Return(commitOneTask, nil) | 
|  | mtc.On("GetTaskForCommit", repoName, one, taskName).Return(commitOneTask, nil) | 
|  | mtc.On("GetTaskForCommit", repoName, two, taskName).Return(commitFourATask, nil) | 
|  | mtc.On("GetTaskForCommit", repoName, threeA, taskName).Return(commitFourATask, nil) | 
|  | mtc.On("GetTaskForCommit", repoName, fourA, taskName).Return(commitFourATask, nil) | 
|  | mtc.On("GetTaskForCommit", repoName, mock.Anything, taskName).Return(nil, nil) | 
|  |  | 
|  | blamelistAndTask := func(name string, revision string, expectedBlamelist []string, expectedTask *types.Task) { | 
|  | t.Run(name, func(t *testing.T) { | 
|  | blamedCommits, stealFromTask, err := ComputeBlamelist(ctx, mtc, repo, taskName, repoName, repo.Get(revision), commitsBuffer, tcg, window_mocks.AllInclusiveWindow()) | 
|  | require.NoError(t, err) | 
|  | assert.Equal(t, expectedBlamelist, blamedCommits) | 
|  | assert.Equal(t, expectedTask, stealFromTask) | 
|  | }) | 
|  | } | 
|  |  | 
|  | blamelistAndTask("commit zero", zero, []string{zero}, commitOneTask) | 
|  | blamelistAndTask("commit one", one, []string{one, zero}, commitOneTask) | 
|  | blamelistAndTask("commit two", two, []string{two}, commitFourATask) | 
|  | blamelistAndTask("commit threeA", threeA, []string{threeA, two}, commitFourATask) | 
|  | blamelistAndTask("commit threeB", threeB, []string{threeB}, nil) | 
|  | blamelistAndTask("commit fourA", fourA, []string{fourA, threeA, two}, commitFourATask) | 
|  | blamelistAndTask("commit fourB", fourB, []string{fourB, threeB}, nil) | 
|  | blamelistAndTask("commit fiveA", fiveA, []string{fiveA}, nil) | 
|  | blamelistAndTask("commit six", six, []string{six, fourB, threeB, fiveA}, nil) | 
|  | } | 
|  |  | 
|  | var ( | 
|  | // Use of timeDoesNotMatter indicates that the time does not affect the outputs. | 
|  | timeDoesNotMatter = time.Time{} | 
|  | commitsBuffer     = make([]*repograph.Commit, 0, MAX_BLAMELIST_COMMITS) | 
|  | ) | 
|  |  | 
|  | func rfc3339(t *testing.T, n string) time.Time { | 
|  | ts, err := time.Parse(time.RFC3339, n) | 
|  | require.NoError(t, err) | 
|  | return ts | 
|  | } | 
|  |  | 
|  | func asTryJob(c TaskCandidate) TaskCandidate { | 
|  | c.Issue = "nonempty" | 
|  | c.Patchset = "nonempty" | 
|  | c.Server = "nonempty" | 
|  | if !c.IsTryJob() { | 
|  | panic("Should be a tryjob task candidate now") | 
|  | } | 
|  | return c | 
|  | } | 
|  |  | 
|  | func asForcedJob(c TaskCandidate) TaskCandidate { | 
|  | c.ForcedJobId = "nonempty" | 
|  | if !c.IsForceRun() { | 
|  | panic("Should be a tryjob task candidate now") | 
|  | } | 
|  | return c | 
|  | } | 
|  |  | 
|  | func newTask(id string, ranAt string, alsoCovered ...string) *types.Task { | 
|  | nt := types.Task{Id: id} | 
|  | nt.Revision = ranAt | 
|  | nt.Commits = append(nt.Commits, ranAt) | 
|  | for _, c := range alsoCovered { | 
|  | nt.Commits = append(nt.Commits, c) | 
|  | } | 
|  | return &nt | 
|  | } | 
|  |  | 
|  | func TestRegenerateTaskQueue_OnlyBestCandidateForCD(t *testing.T) { | 
|  |  | 
|  | ctx := context.Background() | 
|  |  | 
|  | // Create a TasksCfg. | 
|  | tc := &specs.TasksCfg{ | 
|  | Jobs: map[string]*specs.JobSpec{ | 
|  | "cd-job": { | 
|  | IsCD: true, | 
|  | TaskSpecs: []string{ | 
|  | "cd-task", | 
|  | }, | 
|  | }, | 
|  | }, | 
|  | Tasks: map[string]*specs.TaskSpec{ | 
|  | "cd-task": { | 
|  | CasSpec:    "empty", | 
|  | Dimensions: []string{fmt.Sprintf("pool:%s", cdPoolName)}, | 
|  | }, | 
|  | }, | 
|  | CasSpecs: map[string]*specs.CasSpec{ | 
|  | "empty": { | 
|  | Digest: rbe.EmptyDigest, | 
|  | }, | 
|  | }, | 
|  | } | 
|  | tcc := &tcc_mocks.TaskCfgCache{} | 
|  | tcc.On("Get", mock.Anything, mock.Anything).Return(tc, nil, nil) | 
|  |  | 
|  | // There are three commits in the repo. | 
|  | mg, repo := newMemRepo(t) | 
|  | hashes := mg.CommitN(3) | 
|  | rs1 := types.RepoState{ | 
|  | Repo:     "fake-repo", | 
|  | Revision: hashes[2], | 
|  | } | 
|  | rs2 := types.RepoState{ | 
|  | Repo:     "fake-repo", | 
|  | Revision: hashes[1], | 
|  | } | 
|  | rs3 := types.RepoState{ | 
|  | Repo:     "fake-repo", | 
|  | Revision: hashes[0], | 
|  | } | 
|  | repos := repograph.Map{ | 
|  | rs1.Repo: repo, | 
|  | } | 
|  |  | 
|  | // Add jobs to the cache. | 
|  | jCache := &cache_mocks.JobCache{} | 
|  | jobs := []*types.Job{ | 
|  | { | 
|  | Name:         "cd-job", | 
|  | RepoState:    rs1, | 
|  | Created:      rfc3339(t, "2021-10-01T15:00:00Z"), | 
|  | Dependencies: map[string][]string{"cd-task": {}}, | 
|  | }, | 
|  | { | 
|  | Name:         "cd-job", | 
|  | RepoState:    rs2, | 
|  | Created:      rfc3339(t, "2021-10-01T15:00:00Z"), | 
|  | Dependencies: map[string][]string{"cd-task": {}}, | 
|  | }, | 
|  | { | 
|  | Name:         "cd-job", | 
|  | RepoState:    rs3, | 
|  | Created:      rfc3339(t, "2021-10-01T15:00:00Z"), | 
|  | Dependencies: map[string][]string{"cd-task": {}}, | 
|  | }, | 
|  | } | 
|  | jCache.On("UnfinishedJobs").Return(jobs, nil) | 
|  |  | 
|  | // No tasks in the TaskCache. | 
|  | tCache := &cache_mocks.TaskCache{} | 
|  | tCache.On("GetTasksByKey", mock.Anything).Return([]*types.Task{}, nil) | 
|  | tCache.On("GetTaskForCommit", mock.Anything, mock.Anything, mock.Anything).Return(nil, nil) | 
|  |  | 
|  | // Set up a time window to include everything. | 
|  | w := window_mocks.AllInclusiveWindow() | 
|  |  | 
|  | // Create the scheduler. | 
|  | s := &TaskScheduler{ | 
|  | candidateMetrics: map[string]metrics2.Int64Metric{}, | 
|  | jCache:           jCache, | 
|  | repos:            repos, | 
|  | taskCfgCache:     tcc, | 
|  | tCache:           tCache, | 
|  | window:           w, | 
|  | } | 
|  |  | 
|  | // Regenerate the task queue. | 
|  | queue, _, err := s.regenerateTaskQueue(ctx) | 
|  | require.NoError(t, err) | 
|  |  | 
|  | // There should only be one task in the queue, despite the fact that there | 
|  | // are three candidates available, since we only consider running the newest | 
|  | // candidate. | 
|  | require.Len(t, queue, 1) | 
|  | c := queue[0] | 
|  | require.Len(t, c.Commits, 3) | 
|  | require.True(t, c.IsCD) | 
|  | require.Equal(t, rs3.Revision, c.Revision) | 
|  | } | 
|  |  | 
|  | func TestRegenerateTaskQueue_NoBackfillForCD(t *testing.T) { | 
|  |  | 
|  | ctx := context.Background() | 
|  |  | 
|  | // Create a TasksCfg. | 
|  | tc := &specs.TasksCfg{ | 
|  | Jobs: map[string]*specs.JobSpec{ | 
|  | "cd-job": { | 
|  | IsCD: true, | 
|  | TaskSpecs: []string{ | 
|  | "cd-task", | 
|  | }, | 
|  | }, | 
|  | }, | 
|  | Tasks: map[string]*specs.TaskSpec{ | 
|  | "cd-task": { | 
|  | CasSpec:    "empty", | 
|  | Dimensions: []string{fmt.Sprintf("pool:%s", cdPoolName)}, | 
|  | }, | 
|  | }, | 
|  | CasSpecs: map[string]*specs.CasSpec{ | 
|  | "empty": { | 
|  | Digest: rbe.EmptyDigest, | 
|  | }, | 
|  | }, | 
|  | } | 
|  | tcc := &tcc_mocks.TaskCfgCache{} | 
|  | tcc.On("Get", mock.Anything, mock.Anything).Return(tc, nil, nil) | 
|  |  | 
|  | // There are three commits in the repo. | 
|  | gb, repo := newMemRepo(t) | 
|  | hashes := gb.CommitN(3) | 
|  | rs1 := types.RepoState{ | 
|  | Repo:     "fake-repo", | 
|  | Revision: hashes[2], | 
|  | } | 
|  | rs2 := types.RepoState{ | 
|  | Repo:     "fake-repo", | 
|  | Revision: hashes[1], | 
|  | } | 
|  | rs3 := types.RepoState{ | 
|  | Repo:     "fake-repo", | 
|  | Revision: hashes[0], | 
|  | } | 
|  | repos := repograph.Map{ | 
|  | rs1.Repo: repo, | 
|  | } | 
|  |  | 
|  | // Add jobs to the cache. | 
|  | jCache := &cache_mocks.JobCache{} | 
|  | jobs := []*types.Job{ | 
|  | { | 
|  | Name:         "cd-job", | 
|  | RepoState:    rs1, | 
|  | Created:      rfc3339(t, "2021-10-01T15:00:00Z"), | 
|  | Dependencies: map[string][]string{"cd-task": {}}, | 
|  | }, | 
|  | { | 
|  | Name:         "cd-job", | 
|  | RepoState:    rs2, | 
|  | Created:      rfc3339(t, "2021-10-01T15:00:00Z"), | 
|  | Dependencies: map[string][]string{"cd-task": {}}, | 
|  | }, | 
|  | { | 
|  | Name:         "cd-job", | 
|  | RepoState:    rs3, | 
|  | Created:      rfc3339(t, "2021-10-01T15:00:00Z"), | 
|  | Dependencies: map[string][]string{"cd-task": {}}, | 
|  | }, | 
|  | } | 
|  | jCache.On("UnfinishedJobs").Return(jobs, nil) | 
|  |  | 
|  | // One task already ran at the newest commit. | 
|  | t3 := &types.Task{ | 
|  | Commits: []string{rs3.Revision, rs2.Revision, rs1.Revision}, | 
|  | Id:      "fake-task", | 
|  | TaskKey: types.TaskKey{ | 
|  | Name:      "cd-task", | 
|  | RepoState: rs3, | 
|  | }, | 
|  | Status: types.TASK_STATUS_SUCCESS, | 
|  | } | 
|  | tCache := &cache_mocks.TaskCache{} | 
|  | tCache.On("GetTasksByKey", types.TaskKey{ | 
|  | RepoState: rs3, | 
|  | Name:      "cd-task", | 
|  | }).Return([]*types.Task{t3}, nil) | 
|  | tCache.On("GetTasksByKey", mock.Anything).Return([]*types.Task{}, nil) | 
|  | tCache.On("GetTaskForCommit", rs1.Repo, rs1.Revision, "cd-task").Return(t3, nil) | 
|  | tCache.On("GetTaskForCommit", rs2.Repo, rs2.Revision, "cd-task").Return(t3, nil) | 
|  |  | 
|  | // Set up a time window to include everything. | 
|  | w := window_mocks.AllInclusiveWindow() | 
|  |  | 
|  | // Create the scheduler. | 
|  | s := &TaskScheduler{ | 
|  | candidateMetrics: map[string]metrics2.Int64Metric{}, | 
|  | jCache:           jCache, | 
|  | repos:            repos, | 
|  | taskCfgCache:     tcc, | 
|  | tCache:           tCache, | 
|  | window:           w, | 
|  | } | 
|  |  | 
|  | // Regenerate the task queue. We should end up with zero candidates, despite | 
|  | // the fact that we have two unfinished jobs at the older commits. | 
|  | queue, _, err := s.regenerateTaskQueue(ctx) | 
|  | require.NoError(t, err) | 
|  | require.Empty(t, queue) | 
|  | } | 
|  |  | 
|  | func TestFilterTaskCandidates_NoCDTasksInRegularPools(t *testing.T) { | 
|  |  | 
|  | ctx := context.Background() | 
|  |  | 
|  | // Create a single candidate, which is a CD task but requests to be run in a | 
|  | // non-CD pool. | 
|  | k1 := types.TaskKey{ | 
|  | RepoState: rs1, | 
|  | Name:      tcc_testutils.BuildTaskName, | 
|  | } | 
|  | candidate := &TaskCandidate{ | 
|  | IsCD:    true, | 
|  | TaskKey: k1, | 
|  | TaskSpec: &specs.TaskSpec{ | 
|  | Dimensions: []string{"pool:Skia"}, | 
|  | }, | 
|  | } | 
|  | candidates := map[types.TaskKey]*TaskCandidate{ | 
|  | k1: candidate, | 
|  | } | 
|  |  | 
|  | // Set up mocks. | 
|  | tCache := &cache_mocks.TaskCache{} | 
|  | tCache.On("GetTasksByKey", candidate.TaskKey).Return([]*types.Task{}, nil) | 
|  | s := &TaskScheduler{ | 
|  | cdPool: cdPoolName, | 
|  | tCache: tCache, | 
|  | window: window_mocks.AllInclusiveWindow(), | 
|  | } | 
|  |  | 
|  | // Ensure that the candidate gets filtered out, and that the diagnostics | 
|  | // tell us why. | 
|  | c, err := s.filterTaskCandidates(ctx, candidates) | 
|  | require.NoError(t, err) | 
|  | require.Empty(t, c) | 
|  | require.Equal(t, "Skia", candidate.GetDiagnostics().Filtering.ForbiddenPool) | 
|  | } | 
|  |  | 
|  | func TestFilterTaskCandidates_NoRegularTasksInCDPool(t *testing.T) { | 
|  |  | 
|  | ctx := context.Background() | 
|  |  | 
|  | // Create a single candidate, which is a non-CD task but requests to be run | 
|  | // in the CD pool. | 
|  | k1 := types.TaskKey{ | 
|  | RepoState: rs1, | 
|  | Name:      tcc_testutils.BuildTaskName, | 
|  | } | 
|  | candidate := &TaskCandidate{ | 
|  | TaskKey: k1, | 
|  | TaskSpec: &specs.TaskSpec{ | 
|  | Dimensions: []string{fmt.Sprintf("pool:%s", cdPoolName)}, | 
|  | }, | 
|  | } | 
|  | candidates := map[types.TaskKey]*TaskCandidate{ | 
|  | k1: candidate, | 
|  | } | 
|  |  | 
|  | // Set up mocks. | 
|  | tCache := &cache_mocks.TaskCache{} | 
|  | tCache.On("GetTasksByKey", candidate.TaskKey).Return([]*types.Task{}, nil) | 
|  | s := &TaskScheduler{ | 
|  | cdPool: cdPoolName, | 
|  | tCache: tCache, | 
|  | window: window_mocks.AllInclusiveWindow(), | 
|  | } | 
|  |  | 
|  | // Ensure that the candidate gets filtered out, and that the diagnostics | 
|  | // tell us why. | 
|  | c, err := s.filterTaskCandidates(ctx, candidates) | 
|  | require.NoError(t, err) | 
|  | require.Empty(t, c) | 
|  | require.Equal(t, cdPoolName, candidate.GetDiagnostics().Filtering.ForbiddenPool) | 
|  | } | 
|  |  | 
|  | // newMemRepo is a convenience function which creates a MemGit backed by an | 
|  | // in-memory GitStore and builds a repograph.Graph from it. MemGit automatically | 
|  | // calls Graph.Update() whenever it is mutated. | 
|  | func newMemRepo(t sktest.TestingT) (*mem_git.MemGit, *repograph.Graph) { | 
|  | gs := mem_gitstore.New() | 
|  | gb := mem_git.New(t, gs) | 
|  | ctx := context.Background() | 
|  | ri, err := gitstore.NewGitStoreRepoImpl(ctx, gs) | 
|  | require.NoError(t, err) | 
|  | repo, err := repograph.NewWithRepoImpl(ctx, ri) | 
|  | require.NoError(t, err) | 
|  | gb.AddUpdater(repo) | 
|  | return gb, repo | 
|  | } |