| package scheduling |
| |
| import ( |
| "context" |
| "encoding/json" |
| "errors" |
| "fmt" |
| "io/ioutil" |
| "math" |
| "path" |
| "sort" |
| "strings" |
| "sync" |
| "testing" |
| "time" |
| |
| "cloud.google.com/go/storage" |
| "github.com/stretchr/testify/assert" |
| "github.com/stretchr/testify/mock" |
| "github.com/stretchr/testify/require" |
| swarming_api "go.chromium.org/luci/common/api/swarming/swarming/v1" |
| |
| "go.skia.org/infra/go/cas/mocks" |
| "go.skia.org/infra/go/cas/rbe" |
| "go.skia.org/infra/go/deepequal" |
| "go.skia.org/infra/go/deepequal/assertdeep" |
| ftestutils "go.skia.org/infra/go/firestore/testutils" |
| "go.skia.org/infra/go/gcs/mem_gcsclient" |
| "go.skia.org/infra/go/git" |
| "go.skia.org/infra/go/git/repograph" |
| "go.skia.org/infra/go/git/testutils/mem_git" |
| "go.skia.org/infra/go/gitiles" |
| "go.skia.org/infra/go/gitstore" |
| "go.skia.org/infra/go/gitstore/mem_gitstore" |
| "go.skia.org/infra/go/metrics2" |
| "go.skia.org/infra/go/mockhttpclient" |
| "go.skia.org/infra/go/now" |
| "go.skia.org/infra/go/sktest" |
| "go.skia.org/infra/go/swarming" |
| "go.skia.org/infra/go/testutils" |
| "go.skia.org/infra/go/util" |
| "go.skia.org/infra/go/vcsinfo" |
| "go.skia.org/infra/task_scheduler/go/db" |
| "go.skia.org/infra/task_scheduler/go/db/cache" |
| cache_mocks "go.skia.org/infra/task_scheduler/go/db/cache/mocks" |
| "go.skia.org/infra/task_scheduler/go/db/memory" |
| "go.skia.org/infra/task_scheduler/go/skip_tasks" |
| "go.skia.org/infra/task_scheduler/go/specs" |
| "go.skia.org/infra/task_scheduler/go/task_cfg_cache" |
| tcc_mocks "go.skia.org/infra/task_scheduler/go/task_cfg_cache/mocks" |
| tcc_testutils "go.skia.org/infra/task_scheduler/go/task_cfg_cache/testutils" |
| swarming_task_execution "go.skia.org/infra/task_scheduler/go/task_execution/swarming" |
| swarming_testutils "go.skia.org/infra/task_scheduler/go/testutils" |
| "go.skia.org/infra/task_scheduler/go/types" |
| "go.skia.org/infra/task_scheduler/go/window" |
| window_mocks "go.skia.org/infra/task_scheduler/go/window/mocks" |
| ) |
| |
| const ( |
// scoreDelta is the tolerance for floating-point comparisons of candidate scores.
scoreDelta = 0.000001
| cdPoolName = "cd-pool" |
| ) |
| |
| var ( |
| androidTaskDims = map[string]string{ |
| "pool": "Skia", |
| "os": "Android", |
| "device_type": "grouper", |
| } |
| |
| linuxTaskDims = map[string]string{ |
| "os": "Ubuntu", |
| "pool": "Skia", |
| } |
| |
| androidBotDims = map[string][]string{ |
| "pool": {"Skia"}, |
| "os": {"Android"}, |
| "device_type": {"grouper"}, |
| } |
| |
| linuxBotDims = map[string][]string{ |
| "os": {"Ubuntu"}, |
| "pool": {"Skia"}, |
| } |
| |
| // The following are commits and RepoStates used in tests. The commit |
| // hashes were chosen to be equal to those generated by MemGit, but the |
| // contents are completely arbitrary. |
| lc1 = &vcsinfo.LongCommit{ |
| ShortCommit: &vcsinfo.ShortCommit{ |
| Hash: mem_git.Commit0, |
| Author: "me@google.com", |
| Subject: "First Commit", |
| }, |
| Body: "My first commit", |
| Timestamp: time.Unix(1571926390, 0), |
| Index: 0, |
| Branches: map[string]bool{ |
| git.MainBranch: true, |
| }, |
| } |
| lc2 = &vcsinfo.LongCommit{ |
| ShortCommit: &vcsinfo.ShortCommit{ |
| Hash: mem_git.Commit1, |
| Author: "you@google.com", |
| Subject: "Second Commit", |
| }, |
| Body: "My second commit", |
| Parents: []string{lc1.Hash}, |
| Timestamp: time.Unix(1571926450, 0), |
| Index: 1, |
| Branches: map[string]bool{ |
| git.MainBranch: true, |
| }, |
| } |
| |
| ic1 = &vcsinfo.IndexCommit{ |
| Hash: lc1.Hash, |
| Index: lc1.Index, |
| Timestamp: lc1.Timestamp, |
| } |
| ic2 = &vcsinfo.IndexCommit{ |
| Hash: lc2.Hash, |
| Index: lc2.Index, |
| Timestamp: lc2.Timestamp, |
| } |
| |
| rs1 = types.RepoState{ |
| Repo: "fake.git", |
| Revision: lc1.Hash, |
| } |
| rs2 = types.RepoState{ |
| Repo: "fake.git", |
| Revision: lc2.Hash, |
| } |
| ) |
| |
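// makeTask returns a Task for the given task name at the given repo and
// revision, with default values used throughout these tests.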
| func makeTask(ctx context.Context, name, repo, revision string) *types.Task { |
| return &types.Task{ |
| Commits: []string{revision}, |
| Created: now.Now(ctx), |
| TaskKey: types.TaskKey{ |
| RepoState: types.RepoState{ |
| Repo: repo, |
| Revision: revision, |
| }, |
| Name: name, |
| }, |
| MaxAttempts: types.DEFAULT_MAX_TASK_ATTEMPTS, |
| SwarmingTaskId: "swarmid", |
| } |
| } |
| |
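// makeSwarmingRpcsTaskRequestMetadata constructs Swarming task request and
// result metadata mirroring the given Task and dimensions, translating the
// Task's status into the corresponding Swarming task state.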
| func makeSwarmingRpcsTaskRequestMetadata(t *testing.T, task *types.Task, dims map[string]string) *swarming_api.SwarmingRpcsTaskRequestMetadata { |
| tag := func(k, v string) string { |
| return fmt.Sprintf("%s:%s", k, v) |
| } |
| ts := func(t time.Time) string { |
| if util.TimeIsZero(t) { |
| return "" |
| } |
| return t.Format(swarming.TIMESTAMP_FORMAT) |
| } |
| abandoned := "" |
| state := swarming.TASK_STATE_PENDING |
| failed := false |
| switch task.Status { |
| case types.TASK_STATUS_MISHAP: |
| state = swarming.TASK_STATE_BOT_DIED |
| abandoned = ts(task.Finished) |
| case types.TASK_STATUS_RUNNING: |
| state = swarming.TASK_STATE_RUNNING |
| case types.TASK_STATUS_FAILURE: |
| state = swarming.TASK_STATE_COMPLETED |
| failed = true |
| case types.TASK_STATUS_SUCCESS: |
| state = swarming.TASK_STATE_COMPLETED |
| case types.TASK_STATUS_PENDING: |
| // noop |
| default: |
require.FailNow(t, fmt.Sprintf("Unknown task status: %s", task.Status))
| } |
| tags := []string{ |
| tag(types.SWARMING_TAG_ID, task.Id), |
| tag(types.SWARMING_TAG_FORCED_JOB_ID, task.ForcedJobId), |
| tag(types.SWARMING_TAG_NAME, task.Name), |
| tag(swarming.DIMENSION_POOL_KEY, swarming.DIMENSION_POOL_VALUE_SKIA), |
| tag(types.SWARMING_TAG_REPO, task.Repo), |
| tag(types.SWARMING_TAG_REVISION, task.Revision), |
| } |
| for _, p := range task.ParentTaskIds { |
| tags = append(tags, tag(types.SWARMING_TAG_PARENT_TASK_ID, p)) |
| } |
| |
| dimensions := make([]*swarming_api.SwarmingRpcsStringPair, 0, len(dims)) |
| for k, v := range dims { |
| dimensions = append(dimensions, &swarming_api.SwarmingRpcsStringPair{ |
| Key: k, |
| Value: v, |
| }) |
| } |
| |
| var casOutput *swarming_api.SwarmingRpcsCASReference |
| if task.IsolatedOutput != "" { |
| var err error |
| casOutput, err = swarming.MakeCASReference(task.IsolatedOutput, "fake-cas-instance") |
| require.NoError(t, err) |
| } |
| |
| return &swarming_api.SwarmingRpcsTaskRequestMetadata{ |
| Request: &swarming_api.SwarmingRpcsTaskRequest{ |
| CreatedTs: ts(task.Created), |
| TaskSlices: []*swarming_api.SwarmingRpcsTaskSlice{ |
| { |
| Properties: &swarming_api.SwarmingRpcsTaskProperties{ |
| Dimensions: dimensions, |
| }, |
| }, |
| }, |
| Tags: tags, |
| }, |
| TaskId: task.SwarmingTaskId, |
| TaskResult: &swarming_api.SwarmingRpcsTaskResult{ |
| AbandonedTs: abandoned, |
| BotId: task.SwarmingBotId, |
| CreatedTs: ts(task.Created), |
| CompletedTs: ts(task.Finished), |
| Failure: failed, |
| CasOutputRoot: casOutput, |
| StartedTs: ts(task.Started), |
| State: state, |
| Tags: tags, |
| TaskId: task.SwarmingTaskId, |
| }, |
| } |
| } |
| |
| // Insert the given data into the caches. |
| func fillCaches(t sktest.TestingT, ctx context.Context, taskCfgCache task_cfg_cache.TaskCfgCache, rs types.RepoState, cfg *specs.TasksCfg) { |
| require.NoError(t, taskCfgCache.Set(ctx, rs, cfg, nil)) |
| } |
| |
| // Add jobs for the given RepoStates. Assumes that the RepoStates have already |
| // been added to the caches. |
| func insertJobs(t sktest.TestingT, ctx context.Context, s *TaskScheduler, rss ...types.RepoState) { |
| jobs := []*types.Job{} |
| for _, rs := range rss { |
| cfg, cachedErr, err := s.taskCfgCache.Get(ctx, rs) |
| require.NoError(t, err) |
| require.NoError(t, cachedErr) |
| for name := range cfg.Jobs { |
| j, err := task_cfg_cache.MakeJob(ctx, s.taskCfgCache, rs, name) |
| require.NoError(t, err) |
| jobs = append(jobs, j) |
| } |
| } |
| require.NoError(t, s.putJobsInChunks(ctx, jobs)) |
| } |
| |
| // Common setup for TaskScheduler tests. |
| func setup(t *testing.T) (context.Context, *mem_git.MemGit, *memory.InMemoryDB, *swarming_testutils.TestClient, *TaskScheduler, *mockhttpclient.URLMock, *mocks.CAS, func()) { |
| |
| ctx, cancel := context.WithCancel(context.Background()) |
| |
| tmp, err := ioutil.TempDir("", "") |
| require.NoError(t, err) |
| |
| d := memory.NewInMemoryDB() |
| swarmingClient := swarming_testutils.NewTestClient() |
| urlMock := mockhttpclient.NewURLMock() |
| mg, repo := newMemRepo(t) |
| hashes := mg.CommitN(2) |
| // Sanity check. |
| require.Equal(t, lc1.Hash, hashes[1]) |
| require.Equal(t, lc2.Hash, hashes[0]) |
| repos := repograph.Map{ |
| rs1.Repo: repo, |
| } |
| btProject, btInstance, btCleanup := tcc_testutils.SetupBigTable(t) |
| taskCfgCache, err := task_cfg_cache.NewTaskCfgCache(ctx, repos, btProject, btInstance, nil) |
| require.NoError(t, err) |
| |
| // Cache the RepoStates. This is normally done by the JobCreator. |
| fillCaches(t, ctx, taskCfgCache, rs1, tcc_testutils.TasksCfg1) |
| fillCaches(t, ctx, taskCfgCache, rs2, tcc_testutils.TasksCfg2) |
| |
| cas := &mocks.CAS{} |
| cas.On("Close").Return(nil) |
| // Go ahead and mock the single-input merge calls. |
| cas.On("Merge", testutils.AnyContext, []string{tcc_testutils.CompileCASDigest}).Return(tcc_testutils.CompileCASDigest, nil) |
| cas.On("Merge", testutils.AnyContext, []string{tcc_testutils.TestCASDigest}).Return(tcc_testutils.TestCASDigest, nil) |
| cas.On("Merge", testutils.AnyContext, []string{tcc_testutils.PerfCASDigest}).Return(tcc_testutils.PerfCASDigest, nil) |
| |
| taskExec := swarming_task_execution.NewSwarmingTaskExecutor(swarmingClient, "fake-cas-instance", "") |
| taskExecs := map[string]types.TaskExecutor{ |
| types.TaskExecutor_Swarming: taskExec, |
| types.TaskExecutor_UseDefault: taskExec, |
| } |
| s, err := NewTaskScheduler(ctx, d, nil, time.Duration(math.MaxInt64), 0, repos, cas, "fake-cas-instance", taskExecs, urlMock.Client(), 1.0, swarming.POOLS_PUBLIC, cdPoolName, "", taskCfgCache, nil, mem_gcsclient.New("diag_unit_tests"), btInstance, false) |
| require.NoError(t, err) |
| |
| // Insert jobs. This is normally done by the JobCreator. |
| insertJobs(t, ctx, s, rs1, rs2) |
| |
| return ctx, mg, d, swarmingClient, s, urlMock, cas, func() { |
| testutils.AssertCloses(t, s) |
| testutils.RemoveAll(t, tmp) |
| btCleanup() |
| cancel() |
| } |
| } |
| |
| // runMainLoop calls s.MainLoop, asserts there was no error, and waits for |
| // background goroutines to finish. |
| func runMainLoop(t *testing.T, s *TaskScheduler, ctx context.Context) { |
| require.NoError(t, s.MainLoop(ctx)) |
| s.testWaitGroup.Wait() |
| } |
| |
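// lastDiagnostics reads and decodes the most recently written main-loop
// diagnostics file from the scheduler's diagnostics GCS client.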
| func lastDiagnostics(t *testing.T, s *TaskScheduler) taskSchedulerMainLoopDiagnostics { |
| ctx := context.Background() |
| lastname := "" |
| require.NoError(t, s.diagClient.AllFilesInDirectory(ctx, path.Join(s.diagInstance, GCS_MAIN_LOOP_DIAGNOSTICS_DIR), func(item *storage.ObjectAttrs) error { |
| if lastname == "" || item.Name > lastname { |
| lastname = item.Name |
| } |
| return nil |
| })) |
| require.NotEqual(t, lastname, "") |
| reader, err := s.diagClient.FileReader(ctx, lastname) |
| require.NoError(t, err) |
| defer testutils.AssertCloses(t, reader) |
| rv := taskSchedulerMainLoopDiagnostics{} |
| require.NoError(t, json.NewDecoder(reader).Decode(&rv)) |
| return rv |
| } |
| |
| func TestFindTaskCandidatesForJobs(t *testing.T) { |
| ctx, _, _, _, s, _, _, cleanup := setup(t) |
| defer cleanup() |
| |
| test := func(jobs []*types.Job, expect map[types.TaskKey]*TaskCandidate) { |
| actual, err := s.findTaskCandidatesForJobs(ctx, jobs) |
| require.NoError(t, err) |
| assertdeep.Equal(t, expect, actual) |
| } |
| |
| cfg1, cachedErr, err := s.taskCfgCache.Get(ctx, rs1) |
| require.NoError(t, err) |
| require.NoError(t, cachedErr) |
| cfg2, cachedErr, err := s.taskCfgCache.Get(ctx, rs2) |
| require.NoError(t, err) |
| require.NoError(t, cachedErr) |
| |
| // Run on an empty job list, ensure empty list returned. |
| test([]*types.Job{}, map[types.TaskKey]*TaskCandidate{}) |
| |
| currentTime := now.Now(ctx).UTC() |
| |
| // Run for one job, ensure that we get the right set of task specs |
| // returned (ie. all dependencies and their dependencies). |
| j1 := &types.Job{ |
| Created: currentTime, |
| Id: "job1id", |
| Name: tcc_testutils.TestTaskName, |
| Dependencies: map[string][]string{tcc_testutils.TestTaskName: {tcc_testutils.BuildTaskName}, tcc_testutils.BuildTaskName: {}}, |
| Priority: 0.5, |
| RepoState: rs1.Copy(), |
| } |
| tc1 := &TaskCandidate{ |
| CasDigests: []string{tcc_testutils.CompileCASDigest}, |
| Jobs: []*types.Job{j1}, |
| TaskKey: types.TaskKey{ |
| RepoState: rs1.Copy(), |
| Name: tcc_testutils.BuildTaskName, |
| }, |
| TaskSpec: cfg1.Tasks[tcc_testutils.BuildTaskName].Copy(), |
| } |
| tc2 := &TaskCandidate{ |
| CasDigests: []string{tcc_testutils.TestCASDigest}, |
| Jobs: []*types.Job{j1}, |
| TaskKey: types.TaskKey{ |
| RepoState: rs1.Copy(), |
| Name: tcc_testutils.TestTaskName, |
| }, |
| TaskSpec: cfg1.Tasks[tcc_testutils.TestTaskName].Copy(), |
| } |
| |
| test([]*types.Job{j1}, map[types.TaskKey]*TaskCandidate{ |
| tc1.TaskKey: tc1, |
| tc2.TaskKey: tc2, |
| }) |
| |
| // Add a job, ensure that its dependencies are added and that the right |
| // dependencies are de-duplicated. |
| j2 := &types.Job{ |
| Created: currentTime, |
| Id: "job2id", |
| Name: tcc_testutils.TestTaskName, |
| Dependencies: map[string][]string{tcc_testutils.TestTaskName: {tcc_testutils.BuildTaskName}, tcc_testutils.BuildTaskName: {}}, |
| Priority: 0.6, |
| RepoState: rs2, |
| } |
| j3 := &types.Job{ |
| Created: currentTime, |
| Id: "job3id", |
| Name: tcc_testutils.PerfTaskName, |
| Dependencies: map[string][]string{tcc_testutils.PerfTaskName: {tcc_testutils.BuildTaskName}, tcc_testutils.BuildTaskName: {}}, |
| Priority: 0.6, |
| RepoState: rs2, |
| } |
| tc3 := &TaskCandidate{ |
| CasDigests: []string{tcc_testutils.CompileCASDigest}, |
| Jobs: []*types.Job{j2, j3}, |
| TaskKey: types.TaskKey{ |
| RepoState: rs2.Copy(), |
| Name: tcc_testutils.BuildTaskName, |
| }, |
| TaskSpec: cfg2.Tasks[tcc_testutils.BuildTaskName].Copy(), |
| } |
| tc4 := &TaskCandidate{ |
| CasDigests: []string{tcc_testutils.TestCASDigest}, |
| Jobs: []*types.Job{j2}, |
| TaskKey: types.TaskKey{ |
| RepoState: rs2.Copy(), |
| Name: tcc_testutils.TestTaskName, |
| }, |
| TaskSpec: cfg2.Tasks[tcc_testutils.TestTaskName].Copy(), |
| } |
| tc5 := &TaskCandidate{ |
| CasDigests: []string{tcc_testutils.PerfCASDigest}, |
| Jobs: []*types.Job{j3}, |
| TaskKey: types.TaskKey{ |
| RepoState: rs2.Copy(), |
| Name: tcc_testutils.PerfTaskName, |
| }, |
| TaskSpec: cfg2.Tasks[tcc_testutils.PerfTaskName].Copy(), |
| } |
| allCandidates := map[types.TaskKey]*TaskCandidate{ |
| tc1.TaskKey: tc1, |
| tc2.TaskKey: tc2, |
| tc3.TaskKey: tc3, |
| tc4.TaskKey: tc4, |
| tc5.TaskKey: tc5, |
| } |
| test([]*types.Job{j1, j2, j3}, allCandidates) |
| |
| // Finish j3, ensure that its task specs no longer show up. |
| delete(allCandidates, j3.MakeTaskKey(tcc_testutils.PerfTaskName)) |
| // This is hacky, but findTaskCandidatesForJobs accepts an already- |
| // filtered list of jobs, so we have to pretend it never existed. |
| tc3.Jobs = tc3.Jobs[:1] |
| test([]*types.Job{j1, j2}, allCandidates) |
| |
| // Ensure that we don't generate candidates for jobs at nonexistent commits. |
| j4 := &types.Job{ |
| Created: currentTime, |
| Id: "job4id", |
| Name: tcc_testutils.PerfTaskName, |
| Dependencies: map[string][]string{tcc_testutils.PerfTaskName: {tcc_testutils.BuildTaskName}, tcc_testutils.BuildTaskName: {}}, |
| Priority: 0.6, |
| RepoState: types.RepoState{ |
| Repo: rs2.Repo, |
| Revision: "aaaaabbbbbcccccdddddeeeeefffff1111122222", |
| }, |
| } |
| test([]*types.Job{j4}, map[types.TaskKey]*TaskCandidate{}) |
| } |
| |
| func TestFilterTaskCandidates(t *testing.T) { |
| ctx, _, _, _, s, _, _, cleanup := setup(t) |
| defer cleanup() |
| |
| c1 := rs1.Revision |
| c2 := rs2.Revision |
| |
| // Fake out the initial candidates. |
| k1 := types.TaskKey{ |
| RepoState: rs1, |
| Name: tcc_testutils.BuildTaskName, |
| } |
| k2 := types.TaskKey{ |
| RepoState: rs1, |
| Name: tcc_testutils.TestTaskName, |
| } |
| k3 := types.TaskKey{ |
| RepoState: rs2, |
| Name: tcc_testutils.BuildTaskName, |
| } |
| k4 := types.TaskKey{ |
| RepoState: rs2, |
| Name: tcc_testutils.TestTaskName, |
| } |
| k5 := types.TaskKey{ |
| RepoState: rs2, |
| Name: tcc_testutils.PerfTaskName, |
| } |
| candidates := map[types.TaskKey]*TaskCandidate{ |
| k1: { |
| TaskKey: k1, |
| TaskSpec: &specs.TaskSpec{}, |
| }, |
| k2: { |
| TaskKey: k2, |
| TaskSpec: &specs.TaskSpec{ |
| Dependencies: []string{tcc_testutils.BuildTaskName}, |
| }, |
| }, |
| k3: { |
| TaskKey: k3, |
| TaskSpec: &specs.TaskSpec{}, |
| }, |
| k4: { |
| TaskKey: k4, |
| TaskSpec: &specs.TaskSpec{ |
| Dependencies: []string{tcc_testutils.BuildTaskName}, |
| }, |
| }, |
| k5: { |
| TaskKey: k5, |
| TaskSpec: &specs.TaskSpec{ |
| Dependencies: []string{tcc_testutils.BuildTaskName}, |
| }, |
| }, |
| } |
| |
| clearDiagnostics := func(candidates map[types.TaskKey]*TaskCandidate) { |
| for _, c := range candidates { |
| c.Diagnostics = nil |
| } |
| } |
| |
| // Check the initial set of task candidates. The two Build tasks |
| // should be the only ones available. |
| c, err := s.filterTaskCandidates(ctx, candidates) |
| require.NoError(t, err) |
| require.Len(t, c, 1) |
| require.Equal(t, 1, len(c[rs1.Repo])) |
| require.Equal(t, 2, len(c[rs1.Repo][tcc_testutils.BuildTaskName])) |
| for _, byRepo := range c { |
| for _, byName := range byRepo { |
| for _, candidate := range byName { |
| require.Equal(t, candidate.Name, tcc_testutils.BuildTaskName) |
| } |
| } |
| } |
| // Check filtering diagnostics. Non-Build tasks have unmet dependencies. |
| for _, candidate := range candidates { |
| if candidate.Name != tcc_testutils.BuildTaskName { |
| require.Equal(t, candidate.Diagnostics.Filtering.UnmetDependencies, []string{tcc_testutils.BuildTaskName}) |
| } |
| } |
| |
// Insert the Build task at c1 (1 dependent) into the database and
// transition it through various states.
| var t1 *types.Task |
| for _, byRepo := range c { // Order not guaranteed, find the right candidate. |
| for _, byName := range byRepo { |
| for _, candidate := range byName { |
| if candidate.Revision == c1 { |
| t1 = makeTask(ctx, candidate.Name, candidate.Repo, candidate.Revision) |
| break |
| } |
| } |
| } |
| } |
| require.NotNil(t, t1) |
| |
| // We shouldn't duplicate pending or running tasks. |
| for _, status := range []types.TaskStatus{types.TASK_STATUS_PENDING, types.TASK_STATUS_RUNNING} { |
| clearDiagnostics(candidates) |
| |
| t1.Status = status |
| require.NoError(t, s.putTask(ctx, t1)) |
| |
| c, err = s.filterTaskCandidates(ctx, candidates) |
| require.NoError(t, err) |
| require.Len(t, c, 1) |
| for _, byRepo := range c { |
| for _, byName := range byRepo { |
| for _, candidate := range byName { |
| require.Equal(t, candidate.Name, tcc_testutils.BuildTaskName) |
| require.Equal(t, c2, candidate.Revision) |
| } |
| } |
| } |
| // Check filtering diagnostics. |
| for _, candidate := range candidates { |
| if candidate.Name != tcc_testutils.BuildTaskName { |
| // Non-Build tasks have unmet dependencies. |
| require.Equal(t, candidate.Diagnostics.Filtering.UnmetDependencies, []string{tcc_testutils.BuildTaskName}) |
| } else if candidate.Revision == c1 { |
| // Blocked by t1 |
| require.Equal(t, candidate.Diagnostics.Filtering.SupersededByTask, t1.Id) |
| } |
| } |
| } |
| |
| clearDiagnostics(candidates) |
| |
| // The task failed. Ensure that its dependents are not candidates, but |
| // the task itself is back in the list of candidates, in case we want |
| // to retry. |
| t1.Status = types.TASK_STATUS_FAILURE |
| require.NoError(t, s.putTask(ctx, t1)) |
| |
| c, err = s.filterTaskCandidates(ctx, candidates) |
| require.NoError(t, err) |
| require.Len(t, c, 1) |
| for _, byRepo := range c { |
| require.Len(t, byRepo, 1) |
| for _, byName := range byRepo { |
| require.Len(t, byName, 2) |
| for _, candidate := range byName { |
| require.Equal(t, candidate.Name, tcc_testutils.BuildTaskName) |
| } |
| } |
| } |
| // Check filtering diagnostics. |
| for _, candidate := range candidates { |
| if candidate.Name != tcc_testutils.BuildTaskName { |
| // Non-Build tasks have unmet dependencies. |
| require.Equal(t, candidate.Diagnostics.Filtering.UnmetDependencies, []string{tcc_testutils.BuildTaskName}) |
| } |
| } |
| |
| clearDiagnostics(candidates) |
| |
| // The task succeeded. Ensure that its dependents are candidates and |
| // the task itself is not. |
| t1.Status = types.TASK_STATUS_SUCCESS |
| t1.IsolatedOutput = "fake isolated hash" |
| require.NoError(t, s.putTask(ctx, t1)) |
| |
| c, err = s.filterTaskCandidates(ctx, candidates) |
| require.NoError(t, err) |
| require.Len(t, c, 1) |
| for _, byRepo := range c { |
| require.Len(t, byRepo, 2) |
| for _, byName := range byRepo { |
| for _, candidate := range byName { |
| require.False(t, t1.Name == candidate.Name && t1.Revision == candidate.Revision) |
| } |
| } |
| } |
| // Candidate with k1 is blocked by t1. |
| require.Equal(t, candidates[k1].Diagnostics.Filtering.SupersededByTask, t1.Id) |
| |
| clearDiagnostics(candidates) |
| |
| // Create the other Build task. |
| var t2 *types.Task |
| for _, byRepo := range c { |
| for _, byName := range byRepo { |
| for _, candidate := range byName { |
| if candidate.Revision == c2 && strings.HasPrefix(candidate.Name, "Build-") { |
| t2 = makeTask(ctx, candidate.Name, candidate.Repo, candidate.Revision) |
| break |
| } |
| } |
| } |
| } |
| require.NotNil(t, t2) |
| t2.Status = types.TASK_STATUS_SUCCESS |
| t2.IsolatedOutput = "fake isolated hash" |
| require.NoError(t, s.putTask(ctx, t2)) |
| |
| // All test and perf tasks are now candidates, no build tasks. |
| c, err = s.filterTaskCandidates(ctx, candidates) |
| require.NoError(t, err) |
| require.Len(t, c, 1) |
| require.Equal(t, 2, len(c[rs1.Repo][tcc_testutils.TestTaskName])) |
| require.Equal(t, 1, len(c[rs1.Repo][tcc_testutils.PerfTaskName])) |
| for _, byRepo := range c { |
| for _, byName := range byRepo { |
| for _, candidate := range byName { |
| require.NotEqual(t, candidate.Name, tcc_testutils.BuildTaskName) |
| } |
| } |
| } |
| // Build candidates are blocked by completed tasks. |
| require.Equal(t, candidates[k1].Diagnostics.Filtering.SupersededByTask, t1.Id) |
| require.Equal(t, candidates[k3].Diagnostics.Filtering.SupersededByTask, t2.Id) |
| |
| clearDiagnostics(candidates) |
| |
| // Add a try job. Ensure that no deps have been incorrectly satisfied. |
| tryKey := k4.Copy() |
| tryKey.Server = "dummy-server" |
| tryKey.Issue = "dummy-issue" |
| tryKey.Patchset = "dummy-patchset" |
| candidates[tryKey] = &TaskCandidate{ |
| TaskKey: tryKey, |
| TaskSpec: &specs.TaskSpec{ |
| Dependencies: []string{tcc_testutils.BuildTaskName}, |
| }, |
| } |
| c, err = s.filterTaskCandidates(ctx, candidates) |
| require.NoError(t, err) |
| require.Len(t, c, 1) |
| require.Equal(t, 2, len(c[rs1.Repo][tcc_testutils.TestTaskName])) |
| require.Equal(t, 1, len(c[rs1.Repo][tcc_testutils.PerfTaskName])) |
| for _, byRepo := range c { |
| for _, byName := range byRepo { |
| for _, candidate := range byName { |
| require.NotEqual(t, candidate.Name, tcc_testutils.BuildTaskName) |
| require.False(t, candidate.IsTryJob()) |
| } |
| } |
| } |
| // Check diagnostics for tryKey |
| require.Equal(t, candidates[tryKey].Diagnostics.Filtering.UnmetDependencies, []string{tcc_testutils.BuildTaskName}) |
| } |
| |
| // processTaskCandidate is a helper function for processing a single task |
| // candidate. |
| func processTaskCandidate(ctx context.Context, s *TaskScheduler, c *TaskCandidate) error { |
| candidates := map[string]map[string][]*TaskCandidate{ |
| c.Repo: { |
| c.Name: []*TaskCandidate{c}, |
| }, |
| } |
| _, err := s.processTaskCandidates(ctx, candidates) |
| return err |
| } |
| |
| func TestProcessTaskCandidate(t *testing.T) { |
| _, _, _, _, s, _, _, cleanup := setup(t) |
| defer cleanup() |
| |
| currentTime := time.Unix(0, 1470674884000000) |
| ctx := now.TimeTravelingContext(currentTime) |
| |
| checkDiagTryForced := func(c *TaskCandidate) { |
| require.NotNil(t, c.Diagnostics) |
| diag := c.Diagnostics.Scoring |
| require.NotNil(t, diag) |
| require.Equal(t, c.Jobs[0].Priority, diag.Priority) |
| require.Equal(t, currentTime.Sub(c.Jobs[0].Created).Hours(), diag.JobCreatedHours) |
| // The remaining fields should always be 0 for try/forced jobs. |
| require.Equal(t, 0, diag.StoleFromCommits) |
| require.Equal(t, 0.0, diag.TestednessIncrease) |
| require.Equal(t, 0.0, diag.TimeDecay) |
| } |
| |
| tryjobRs := types.RepoState{ |
| Patch: types.Patch{ |
| Server: "my-server", |
| Issue: "my-issue", |
| Patchset: "my-patchset", |
| }, |
| Repo: rs1.Repo, |
| Revision: rs1.Revision, |
| } |
| tryjob := &types.Job{ |
| Id: "tryjobId", |
| Created: currentTime.Add(-1 * time.Hour), |
| Name: "job", |
| Priority: 0.5, |
| RepoState: tryjobRs, |
| } |
| c := &TaskCandidate{ |
| Jobs: []*types.Job{tryjob}, |
| TaskKey: types.TaskKey{ |
| Name: tcc_testutils.BuildTaskName, |
| RepoState: tryjobRs, |
| }, |
| TaskSpec: tcc_testutils.TasksCfg1.Tasks[tcc_testutils.BuildTaskName], |
| } |
| require.NoError(t, processTaskCandidate(ctx, s, c)) |
// Try job candidates have no blamelist and a score of CANDIDATE_SCORE_TRY_JOB
// plus the job's age in hours (1.0 here), scaled by the job's priority.
| require.InDelta(t, (CANDIDATE_SCORE_TRY_JOB+1.0)*0.5, c.Score, scoreDelta) |
| require.Nil(t, c.Commits) |
| checkDiagTryForced(c) |
| |
| // Retries are scored lower. |
| c = &TaskCandidate{ |
| Attempt: 1, |
| Jobs: []*types.Job{tryjob}, |
| TaskKey: types.TaskKey{ |
| Name: tcc_testutils.BuildTaskName, |
| RepoState: tryjobRs, |
| }, |
| TaskSpec: tcc_testutils.TasksCfg1.Tasks[tcc_testutils.BuildTaskName], |
| } |
| require.NoError(t, processTaskCandidate(ctx, s, c)) |
| require.InDelta(t, (CANDIDATE_SCORE_TRY_JOB+1.0)*0.5*CANDIDATE_SCORE_TRY_JOB_RETRY_MULTIPLIER, c.Score, scoreDelta) |
| require.Nil(t, c.Commits) |
| checkDiagTryForced(c) |
| |
| forcedJob := &types.Job{ |
| Id: "forcedJobId", |
| Created: currentTime.Add(-2 * time.Hour), |
| Name: tcc_testutils.BuildTaskName, |
| Priority: 0.5, |
| RepoState: rs2, |
| } |
// Manually forced candidates have a blamelist and a score of
// CANDIDATE_SCORE_FORCE_RUN plus the job's age in hours (2.0 here), scaled
// by the job's priority.
| c = &TaskCandidate{ |
| Jobs: []*types.Job{forcedJob}, |
| TaskKey: types.TaskKey{ |
| Name: tcc_testutils.BuildTaskName, |
| RepoState: rs2, |
| ForcedJobId: forcedJob.Id, |
| }, |
| TaskSpec: tcc_testutils.TasksCfg2.Tasks[tcc_testutils.BuildTaskName], |
| } |
| require.NoError(t, processTaskCandidate(ctx, s, c)) |
| require.InDelta(t, (CANDIDATE_SCORE_FORCE_RUN+2.0)*0.5, c.Score, scoreDelta) |
| require.Equal(t, 2, len(c.Commits)) |
| checkDiagTryForced(c) |
| |
| // All other candidates have a blamelist and a time-decayed score. |
| regularJob := &types.Job{ |
| Id: "regularJobId", |
| Created: currentTime.Add(-1 * time.Hour), |
| Name: tcc_testutils.BuildTaskName, |
| Priority: 0.5, |
| RepoState: rs2, |
| } |
| c = &TaskCandidate{ |
| Jobs: []*types.Job{regularJob}, |
| TaskKey: types.TaskKey{ |
| Name: tcc_testutils.BuildTaskName, |
| RepoState: rs2, |
| }, |
| TaskSpec: tcc_testutils.TasksCfg2.Tasks[tcc_testutils.BuildTaskName], |
| } |
| require.NoError(t, processTaskCandidate(ctx, s, c)) |
| require.True(t, c.Score > 0) |
| require.Equal(t, 2, len(c.Commits)) |
| diag := c.GetDiagnostics().Scoring |
| require.Equal(t, 0.5, diag.Priority) |
| require.Equal(t, 1.0, diag.JobCreatedHours) |
| require.Equal(t, 0, diag.StoleFromCommits) |
| require.Equal(t, 3.5, diag.TestednessIncrease) |
| require.InDelta(t, 1.0, diag.TimeDecay, scoreDelta) |
| } |
| |
| func TestRegularJobRetryScoring(t *testing.T) { |
| _, _, _, _, s, _, _, cleanup := setup(t) |
| defer cleanup() |
| |
| ctx := now.TimeTravelingContext(time.Date(2021, time.October, 15, 0, 0, 0, 0, time.UTC)) |
| currentTime := now.Now(ctx) |
| |
| checkDiag := func(c *TaskCandidate) { |
| require.NotNil(t, c.Diagnostics) |
| diag := c.Diagnostics.Scoring |
| require.NotNil(t, diag) |
| // All candidates in this test have a single Job. |
| require.Equal(t, c.Jobs[0].Priority, diag.Priority) |
| require.InDelta(t, currentTime.Sub(c.Jobs[0].Created).Hours(), diag.JobCreatedHours, scoreDelta) |
| // The commits are added close enough to "now" that there is no time decay. |
| require.Equal(t, 1.0, diag.TimeDecay) |
| } |
| |
| j1 := &types.Job{ |
| Id: "regularJobId1", |
| Created: currentTime.Add(-1 * time.Hour), |
| Name: tcc_testutils.BuildTaskName, |
| Priority: 0.5, |
| RepoState: rs1, |
| } |
| j2 := &types.Job{ |
| Id: "regularJobId2", |
| Created: currentTime.Add(-1 * time.Hour), |
| Name: tcc_testutils.BuildTaskName, |
| Priority: 0.5, |
| RepoState: rs2, |
| } |
| // Candidates at rs1 and rs2 |
| c1 := &TaskCandidate{ |
| Jobs: []*types.Job{j1}, |
| TaskKey: types.TaskKey{ |
| Name: tcc_testutils.BuildTaskName, |
| RepoState: rs1, |
| }, |
| TaskSpec: tcc_testutils.TasksCfg1.Tasks[tcc_testutils.BuildTaskName], |
| } |
| c2 := &TaskCandidate{ |
| Jobs: []*types.Job{j2}, |
| TaskKey: types.TaskKey{ |
| Name: tcc_testutils.BuildTaskName, |
| RepoState: rs2, |
| }, |
| TaskSpec: tcc_testutils.TasksCfg2.Tasks[tcc_testutils.BuildTaskName], |
| } |
| // Regular task at HEAD with 2 commits has score 3.5 scaled by priority 0.5. |
| require.NoError(t, processTaskCandidate(ctx, s, c2)) |
| require.InDelta(t, 3.5*0.5, c2.Score, scoreDelta) |
| require.Equal(t, 2, len(c2.Commits)) |
| diag := c2.GetDiagnostics().Scoring |
| require.Equal(t, 0, diag.StoleFromCommits) |
| require.Equal(t, 3.5, diag.TestednessIncrease) |
| checkDiag(c2) |
| // Regular task at HEAD^ (no backfill) with 1 commit has score 2 scaled by |
| // priority 0.5. |
| require.NoError(t, processTaskCandidate(ctx, s, c1)) |
| diag = c1.GetDiagnostics().Scoring |
| require.InDelta(t, 2*0.5, c1.Score, scoreDelta) |
| require.Equal(t, 1, len(c1.Commits)) |
| require.Equal(t, 0, diag.StoleFromCommits) |
| require.Equal(t, 2.0, diag.TestednessIncrease) |
| checkDiag(c1) |
| |
| // Add a task at rs2 that failed. |
| t2 := makeTask(ctx, c2.Name, c2.Repo, c2.Revision) |
| t2.Status = types.TASK_STATUS_FAILURE |
| t2.Commits = util.CopyStringSlice(c2.Commits) |
| require.NoError(t, s.putTask(ctx, t2)) |
| |
| // Update Attempt and RetryOf before calling processTaskCandidate. |
| c2.Attempt = 1 |
| c2.RetryOf = t2.Id |
| |
| // Retry task at rs2 with 2 commits for 2nd of 2 attempts has score 0.75 |
| // scaled by priority 0.5. |
| require.NoError(t, processTaskCandidate(ctx, s, c2)) |
| require.InDelta(t, 0.75*0.5, c2.Score, scoreDelta) |
| require.Equal(t, 2, len(c2.Commits)) |
| diag = c2.GetDiagnostics().Scoring |
| require.Equal(t, 2, diag.StoleFromCommits) |
| require.Equal(t, 0.0, diag.TestednessIncrease) |
| checkDiag(c2) |
| // Regular task at rs1 (backfilling failed task) with 1 commit has score 1.25 |
| // scaled by priority 0.5. |
| require.NoError(t, processTaskCandidate(ctx, s, c1)) |
| diag = c1.GetDiagnostics().Scoring |
| require.InDelta(t, 1.25*0.5, c1.Score, scoreDelta) |
| require.Equal(t, 1, len(c1.Commits)) |
| require.Equal(t, 2, diag.StoleFromCommits) |
| require.Equal(t, 0.5, diag.TestednessIncrease) |
| checkDiag(c1) |
| |
| // Actually, the task at rs2 had a mishap. |
| t2.Status = types.TASK_STATUS_MISHAP |
| require.NoError(t, s.putTask(ctx, t2)) |
| |
| // Scores should be same as for FAILURE. |
| require.NoError(t, processTaskCandidate(ctx, s, c2)) |
| diag = c2.GetDiagnostics().Scoring |
| require.InDelta(t, 0.75*0.5, c2.Score, scoreDelta) |
| require.Equal(t, 2, len(c2.Commits)) |
| require.Equal(t, 2, diag.StoleFromCommits) |
| require.Equal(t, 0.0, diag.TestednessIncrease) |
| checkDiag(c2) |
| require.NoError(t, processTaskCandidate(ctx, s, c1)) |
| diag = c1.GetDiagnostics().Scoring |
| require.InDelta(t, 1.25*0.5, c1.Score, scoreDelta) |
| require.Equal(t, 1, len(c1.Commits)) |
| require.Equal(t, 2, diag.StoleFromCommits) |
| require.Equal(t, 0.5, diag.TestednessIncrease) |
| checkDiag(c1) |
| } |
| |
| func TestProcessTaskCandidates(t *testing.T) { |
| ctx, _, _, _, s, _, _, cleanup := setup(t) |
| defer cleanup() |
| |
| ts := now.Now(ctx) |
| |
| // Processing of individual candidates is already tested; just verify |
| // that if we pass in a bunch of candidates they all get processed. |
| // The JobSpecs do not specify priority, so they use the default of 0.5. |
| assertProcessed := func(c *TaskCandidate) { |
| if c.IsTryJob() { |
| require.True(t, c.Score > CANDIDATE_SCORE_TRY_JOB*0.5) |
| require.Nil(t, c.Commits) |
| } else if c.IsForceRun() { |
| require.True(t, c.Score > CANDIDATE_SCORE_FORCE_RUN*0.5) |
| require.Equal(t, 2, len(c.Commits)) |
| } else if c.Revision == rs2.Revision { |
| if c.Name == tcc_testutils.PerfTaskName { |
| require.InDelta(t, 2.0*0.5, c.Score, scoreDelta) |
| require.Equal(t, 1, len(c.Commits)) |
| } else if c.Name == tcc_testutils.BuildTaskName { |
| // Already covered by the forced job, so zero score. |
| require.InDelta(t, 0, c.Score, scoreDelta) |
// It scores below the Build candidate at rs1, so its blamelist is only 1 commit.
| require.Equal(t, 1, len(c.Commits)) |
| } else { |
| require.InDelta(t, 3.5*0.5, c.Score, scoreDelta) |
| require.Equal(t, 2, len(c.Commits)) |
| } |
| } else { |
| require.InDelta(t, 0.5*0.5, c.Score, scoreDelta) // These will be backfills. |
| require.Equal(t, 1, len(c.Commits)) |
| } |
| } |
| |
| testJob1 := &types.Job{ |
| Id: "testJob1", |
| Created: ts, |
| Name: tcc_testutils.TestTaskName, |
| Priority: 0.5, |
| RepoState: rs1, |
| } |
| testJob2 := &types.Job{ |
| Id: "testJob2", |
| Created: ts, |
| Name: tcc_testutils.TestTaskName, |
| Priority: 0.5, |
| RepoState: rs2, |
| } |
| perfJob2 := &types.Job{ |
| Id: "perfJob2", |
| Created: ts, |
| Name: tcc_testutils.PerfTaskName, |
| Priority: 0.5, |
| RepoState: rs2, |
| } |
| forcedBuildJob2 := &types.Job{ |
| Id: "forcedBuildJob2", |
| Created: ts, |
| Name: tcc_testutils.BuildTaskName, |
| Priority: 0.5, |
| RepoState: rs2, |
| } |
| tryjobRs := types.RepoState{ |
| Patch: types.Patch{ |
| Server: "my-server", |
| Issue: "my-issue", |
| Patchset: "my-patchset", |
| }, |
| Repo: rs1.Repo, |
| Revision: rs1.Revision, |
| } |
| perfTryjob2 := &types.Job{ |
| Id: "perfJob2", |
| Created: ts, |
| Name: tcc_testutils.PerfTaskName, |
| Priority: 0.5, |
| RepoState: tryjobRs, |
| } |
| |
| candidates := map[string]map[string][]*TaskCandidate{ |
| rs1.Repo: { |
| tcc_testutils.BuildTaskName: { |
| { |
| Jobs: []*types.Job{testJob1}, |
| TaskKey: types.TaskKey{ |
| RepoState: rs1, |
| Name: tcc_testutils.BuildTaskName, |
| }, |
| TaskSpec: &specs.TaskSpec{}, |
| }, |
| { |
| Jobs: []*types.Job{testJob2, perfJob2}, |
| TaskKey: types.TaskKey{ |
| RepoState: rs2, |
| Name: tcc_testutils.BuildTaskName, |
| }, |
| TaskSpec: &specs.TaskSpec{}, |
| }, |
| { |
| Jobs: []*types.Job{forcedBuildJob2}, |
| TaskKey: types.TaskKey{ |
| RepoState: rs2, |
| Name: tcc_testutils.BuildTaskName, |
| ForcedJobId: forcedBuildJob2.Id, |
| }, |
| TaskSpec: &specs.TaskSpec{}, |
| }, |
| }, |
| tcc_testutils.TestTaskName: { |
| { |
| Jobs: []*types.Job{testJob1}, |
| TaskKey: types.TaskKey{ |
| RepoState: rs1, |
| Name: tcc_testutils.TestTaskName, |
| }, |
| TaskSpec: &specs.TaskSpec{}, |
| }, |
| { |
| Jobs: []*types.Job{testJob2}, |
| TaskKey: types.TaskKey{ |
| RepoState: rs2, |
| Name: tcc_testutils.TestTaskName, |
| }, |
| TaskSpec: &specs.TaskSpec{}, |
| }, |
| }, |
| tcc_testutils.PerfTaskName: { |
| { |
| Jobs: []*types.Job{perfJob2}, |
| TaskKey: types.TaskKey{ |
| RepoState: rs2, |
| Name: tcc_testutils.PerfTaskName, |
| }, |
| TaskSpec: &specs.TaskSpec{}, |
| }, |
| { |
| Jobs: []*types.Job{perfTryjob2}, |
| TaskKey: types.TaskKey{ |
| RepoState: tryjobRs, |
| Name: tcc_testutils.PerfTaskName, |
| }, |
| TaskSpec: &specs.TaskSpec{}, |
| }, |
| }, |
| }, |
| } |
| |
| processed, err := s.processTaskCandidates(ctx, candidates) |
| require.NoError(t, err) |
| require.Len(t, processed, 7) |
| for _, c := range processed { |
| assertProcessed(c) |
| } |
| } |
| |
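// testedness assigns a value to a blamelist of length n: a longer blamelist
// is worth more in total, but each individual commit in it is worth less.
// Judging from the cases below, the expected behavior is:
//
//  n < 0:  -1.0 (invalid)
//  n == 0:  0.0
//  n > 0:   1.0 + (n-1)/n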
| func TestTestedness(t *testing.T) { |
| tc := []struct { |
| in int |
| out float64 |
| }{ |
| { |
| in: -1, |
| out: -1.0, |
| }, |
| { |
| in: 0, |
| out: 0.0, |
| }, |
| { |
| in: 1, |
| out: 1.0, |
| }, |
| { |
| in: 2, |
| out: 1.0 + 1.0/2.0, |
| }, |
| { |
| in: 3, |
| out: 1.0 + float64(2.0)/float64(3.0), |
| }, |
| { |
| in: 4, |
| out: 1.0 + 3.0/4.0, |
| }, |
| { |
| in: 4096, |
| out: 1.0 + float64(4095)/float64(4096), |
| }, |
| } |
| for i, c := range tc { |
| require.Equal(t, c.out, testedness(c.in), fmt.Sprintf("test case #%d", i)) |
| } |
| } |
| |
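// testednessIncrease computes the change in total testedness gained by
// running a task with a blamelist of length a which "steals" commits from a
// previous task whose blamelist had length b (b == 0 means the commits were
// previously untested). The cases below are consistent with:
//
//  b == 0 (new commits):     a + testedness(a)
//  b > 0 (retry or bisect):  testedness(a) + testedness(b-a) - testedness(b)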
| func TestTestednessIncrease(t *testing.T) { |
| tc := []struct { |
| a int |
| b int |
| out float64 |
| }{ |
| // Invalid cases. |
| { |
| a: -1, |
| b: 10, |
| out: -1.0, |
| }, |
| { |
| a: 10, |
| b: -1, |
| out: -1.0, |
| }, |
| { |
| a: 0, |
| b: -1, |
| out: -1.0, |
| }, |
| { |
| a: 0, |
| b: 0, |
| out: -1.0, |
| }, |
| // Invalid because if we're re-running at already-tested commits |
| // then we should have a blamelist which is at most the size of |
| // the blamelist of the previous task. We naturally get negative |
| // testedness increase in these cases. |
| { |
| a: 2, |
| b: 1, |
| out: -0.5, |
| }, |
| // Testing only new commits. |
| { |
| a: 1, |
| b: 0, |
| out: 1.0 + 1.0, |
| }, |
| { |
| a: 2, |
| b: 0, |
| out: 2.0 + (1.0 + 1.0/2.0), |
| }, |
| { |
| a: 3, |
| b: 0, |
| out: 3.0 + (1.0 + float64(2.0)/float64(3.0)), |
| }, |
| { |
| a: 4096, |
| b: 0, |
| out: 4096.0 + (1.0 + float64(4095.0)/float64(4096.0)), |
| }, |
| // Retries. |
| { |
| a: 1, |
| b: 1, |
| out: 0.0, |
| }, |
| { |
| a: 2, |
| b: 2, |
| out: 0.0, |
| }, |
| { |
| a: 3, |
| b: 3, |
| out: 0.0, |
| }, |
| { |
| a: 4096, |
| b: 4096, |
| out: 0.0, |
| }, |
| // Bisect/backfills. |
| { |
| a: 1, |
| b: 2, |
| out: 0.5, // (1 + 1) - (1 + 1/2) |
| }, |
| { |
| a: 1, |
| b: 3, |
| out: float64(2.5) - (1.0 + float64(2.0)/float64(3.0)), |
| }, |
| { |
| a: 5, |
| b: 10, |
| out: 2.0*(1.0+float64(4.0)/float64(5.0)) - (1.0 + float64(9.0)/float64(10.0)), |
| }, |
| } |
| for i, c := range tc { |
| require.Equal(t, c.out, testednessIncrease(c.a, c.b), fmt.Sprintf("test case #%d", i)) |
| } |
| } |
| |
| func TestComputeBlamelist(t *testing.T) { |
| |
| // Setup. |
| nowCtx := now.TimeTravelingContext(mem_git.BaseTime.Add(time.Hour)) |
| ctx, cancel := context.WithCancel(nowCtx) |
| defer cancel() |
| |
| mg, repo := newMemRepo(t) |
| repos := repograph.Map{ |
| rs1.Repo: repo, |
| } |
| |
| d := memory.NewInMemoryTaskDB() |
w, err := window.New(ctx, 2*time.Hour, 0, nil)
require.NoError(t, err)
cache, err := cache.NewTaskCache(ctx, d, w, nil)
require.NoError(t, err)
| |
| btProject, btInstance, btCleanup := tcc_testutils.SetupBigTable(t) |
| defer btCleanup() |
| tcc, err := task_cfg_cache.NewTaskCfgCache(ctx, repos, btProject, btInstance, nil) |
| require.NoError(t, err) |
| |
| // The test repo is laid out like this: |
| // * T (HEAD, main, Case #12) |
| // * S (Time travel commit; before the start of the window) |
| // * R (Case #11) |
| // |\ |
| // * | Q |
| // | * P |
| // |/ |
| // * O (Case #9) |
| // * N |
| // * M (Case #10) |
| // * L |
| // * K (Case #6) |
| // * J (Case #5) |
| // |\ |
| // | * I |
| // | * H (Case #4) |
| // * | G |
| // * | F (Case #3) |
| // * | E (Case #8, previously #7) |
| // |/ |
| // * D (Case #2) |
| // * C (Case #1) |
| // ... |
| // * B (Case #0) |
| // * A |
| // * _ (No TasksCfg; blamelists shouldn't include this) |
| // |
| hashes := map[string]string{} |
| name := "Test-Ubuntu12-ShuttleA-GTX660-x86-Release" |
| taskCfg := &specs.TasksCfg{ |
| Tasks: map[string]*specs.TaskSpec{ |
| name: {}, |
| }, |
| } |
| commit := func(name string) { |
| hashes[name] = mg.Commit(name) |
| require.NoError(t, tcc.Set(ctx, types.RepoState{ |
| Repo: rs1.Repo, |
| Revision: hashes[name], |
| }, taskCfg, nil)) |
| } |
| |
| // Initial commit. |
| hashes["_"] = mg.Commit("_") |
| |
| // First commit containing TasksCfg. |
| commit("A") |
| |
| type testCase struct { |
| Revision string |
| Expected []string |
| StoleFromIdx int |
| TaskName string |
| } |
| |
| ids := []string{} |
| commitsBuf := make([]*repograph.Commit, 0, MAX_BLAMELIST_COMMITS) |
| test := func(tc *testCase) { |
| // Update the repo. |
| require.NoError(t, repo.Update(ctx)) |
| // Self-check: make sure we don't pass in empty commit hashes. |
| for _, h := range tc.Expected { |
| require.NotEqual(t, h, "") |
| } |
| |
| // Ensure that we get the expected blamelist. |
| revision := repo.Get(tc.Revision) |
| require.NotNil(t, revision) |
| taskName := tc.TaskName |
| if taskName == "" { |
| taskName = name |
| } |
| commits, stoleFrom, err := ComputeBlamelist(ctx, cache, repo, taskName, rs1.Repo, revision, commitsBuf, tcc, w) |
if tc.Revision == "" {
require.Error(t, err)
return
}
require.NoError(t, err)
| sort.Strings(commits) |
| sort.Strings(tc.Expected) |
| assertdeep.Equal(t, tc.Expected, commits) |
| if tc.StoleFromIdx >= 0 { |
| require.NotNil(t, stoleFrom) |
| require.Equal(t, ids[tc.StoleFromIdx], stoleFrom.Id) |
| } else { |
| require.Nil(t, stoleFrom) |
| } |
| |
| // Insert the task into the DB. |
| c := &TaskCandidate{ |
| TaskKey: types.TaskKey{ |
| RepoState: types.RepoState{ |
| Repo: rs1.Repo, |
| Revision: tc.Revision, |
| }, |
| Name: taskName, |
| }, |
| TaskSpec: &specs.TaskSpec{}, |
| } |
| task := c.MakeTask() |
| task.Commits = commits |
| task.Created = now.Now(ctx) |
| if stoleFrom != nil { |
| // Re-insert the stoleFrom task without the commits |
| // which were stolen from it. |
| stoleFromCommits := make([]string, 0, len(stoleFrom.Commits)-len(commits)) |
| for _, commit := range stoleFrom.Commits { |
| if !util.In(commit, task.Commits) { |
| stoleFromCommits = append(stoleFromCommits, commit) |
| } |
| } |
| stoleFrom.Commits = stoleFromCommits |
| require.NoError(t, d.PutTasks(ctx, []*types.Task{task, stoleFrom})) |
| cache.AddTasks([]*types.Task{task, stoleFrom}) |
| } else { |
| require.NoError(t, d.PutTask(ctx, task)) |
| cache.AddTasks([]*types.Task{task}) |
| } |
| ids = append(ids, task.Id) |
| require.NoError(t, cache.Update(ctx)) |
| } |
| |
| // Commit B. |
| commit("B") |
| |
| // Test cases. Each test case builds on the previous cases. |
| |
| // 0. The first task, at HEAD. |
| test(&testCase{ |
| Revision: hashes["B"], |
| Expected: []string{hashes["B"], hashes["A"]}, |
| StoleFromIdx: -1, |
| }) |
| |
// Test the blamelist-too-long case by creating a bunch of commits; only the
// hash of the last "C" commit is retained.
| for i := 0; i < MAX_BLAMELIST_COMMITS+1; i++ { |
| commit("C") |
| } |
| commit("D") |
| |
| // 1. Blamelist too long, not a branch head. |
| test(&testCase{ |
| Revision: hashes["C"], |
| Expected: []string{hashes["C"]}, |
| StoleFromIdx: -1, |
| }) |
| |
| // 2. Blamelist too long, is a branch head. |
| test(&testCase{ |
| Revision: hashes["D"], |
| Expected: []string{hashes["D"]}, |
| StoleFromIdx: -1, |
| }) |
| |
| // Create the remaining commits. |
| commit("E") |
| commit("F") |
| commit("G") |
| mg.NewBranch("otherbranch", hashes["D"]) |
| mg.CheckoutBranch("otherbranch") |
| commit("H") |
| commit("I") |
| mg.CheckoutBranch(git.MainBranch) |
| hashes["J"] = mg.Merge("otherbranch") |
| require.NoError(t, tcc.Set(ctx, types.RepoState{ |
| Repo: rs1.Repo, |
| Revision: hashes["J"], |
| }, taskCfg, nil)) |
| |
| commit("K") |
| |
| // 3. On a linear set of commits, with at least one previous task. |
| test(&testCase{ |
| Revision: hashes["F"], |
| Expected: []string{hashes["E"], hashes["F"]}, |
| StoleFromIdx: -1, |
| }) |
| // 4. The first task on a new branch. |
| test(&testCase{ |
| Revision: hashes["H"], |
| Expected: []string{hashes["H"]}, |
| StoleFromIdx: -1, |
| }) |
| // 5. After a merge. |
| test(&testCase{ |
| Revision: hashes["J"], |
| Expected: []string{hashes["G"], hashes["I"], hashes["J"]}, |
| StoleFromIdx: -1, |
| }) |
| // 6. One last "normal" task. |
| test(&testCase{ |
| Revision: hashes["K"], |
| Expected: []string{hashes["K"]}, |
| StoleFromIdx: -1, |
| }) |
| // 7. Steal commits from a previously-ingested task. |
| test(&testCase{ |
| Revision: hashes["E"], |
| Expected: []string{hashes["E"]}, |
| StoleFromIdx: 3, |
| }) |
| |
// Ensure that task #7 really stole the commit from #3.
| task, err := cache.GetTask(ids[3]) |
| require.NoError(t, err) |
| require.False(t, util.In(hashes["E"], task.Commits), fmt.Sprintf("Expected not to find %s in %v", hashes["E"], task.Commits)) |
| |
| // 8. Retry #7. |
| test(&testCase{ |
| Revision: hashes["E"], |
| Expected: []string{hashes["E"]}, |
| StoleFromIdx: 7, |
| }) |
| |
| // Ensure that task #8 really stole the commit from #7. |
| task, err = cache.GetTask(ids[7]) |
| require.NoError(t, err) |
| require.Equal(t, 0, len(task.Commits)) |
| |
| // Four more commits. |
| commit("L") |
| commit("M") |
| commit("N") |
| commit("O") |
| |
| // 9. Not really a test case, but setting up for #10. |
| test(&testCase{ |
| Revision: hashes["O"], |
| Expected: []string{hashes["L"], hashes["M"], hashes["N"], hashes["O"]}, |
| StoleFromIdx: -1, |
| }) |
| |
| // 10. Steal *two* commits from #9. |
| test(&testCase{ |
| Revision: hashes["M"], |
| Expected: []string{hashes["L"], hashes["M"]}, |
| StoleFromIdx: 9, |
| }) |
| |
| // 11. Verify that we correctly track when task specs were added. |
| mg.NewBranch("otherbranch2", hashes["O"]) |
| commit("P") |
| mg.CheckoutBranch(git.MainBranch) |
| commit("Q") |
| newTaskCfg := taskCfg.Copy() |
| newTaskCfg.Tasks["added-task"] = &specs.TaskSpec{} |
| require.NoError(t, tcc.Set(ctx, types.RepoState{ |
| Repo: rs1.Repo, |
| Revision: hashes["Q"], |
| }, newTaskCfg, nil)) |
| hashes["R"] = mg.Merge("otherbranch2") |
| require.NoError(t, tcc.Set(ctx, types.RepoState{ |
| Repo: rs1.Repo, |
| Revision: hashes["R"], |
| }, newTaskCfg, nil)) |
| // Existing task should get a normal blamelist of all three new commits. |
| test(&testCase{ |
| Revision: hashes["R"], |
| Expected: []string{hashes["P"], hashes["Q"], hashes["R"]}, |
| StoleFromIdx: -1, |
| }) |
| // The added task's blamelist should only include commits at which the |
| // task was defined, ie. not P. |
| test(&testCase{ |
| Revision: hashes["R"], |
| Expected: []string{hashes["Q"], hashes["R"]}, |
| StoleFromIdx: -1, |
| TaskName: "added-task", |
| }) |
| |
| // 12. Stop computing blamelists when we reach a commit outside of the |
| // scheduling window. |
| hashes["S"] = mg.CommitAt("S", w.EarliestStart().Add(-time.Hour)) |
| require.NoError(t, tcc.Set(ctx, types.RepoState{ |
| Repo: rs1.Repo, |
| Revision: hashes["S"], |
| }, taskCfg, nil)) |
| commit("T") |
| test(&testCase{ |
| Revision: hashes["T"], |
| Expected: []string{hashes["T"]}, |
| StoleFromIdx: -1, |
| }) |
| } |
| |
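// timeDecay24Hr computes a score multiplier which decays linearly from 1.0
// at zero elapsed time to decayAmt24Hr after 24 hours, keeps decaying at the
// same rate beyond 24 hours, and clamps at zero. A sketch consistent with
// the cases below:
//
//  decay := 1.0 - (1.0-decayAmt24Hr)*elapsed.Hours()/24.0
//  if decay < 0 {
//      decay = 0
//  }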
| func TestTimeDecay24Hr(t *testing.T) { |
| tc := []struct { |
| decayAmt24Hr float64 |
| elapsed time.Duration |
| out float64 |
| }{ |
| { |
| decayAmt24Hr: 1.0, |
| elapsed: 10 * time.Hour, |
| out: 1.0, |
| }, |
| { |
| decayAmt24Hr: 0.5, |
| elapsed: 0 * time.Hour, |
| out: 1.0, |
| }, |
| { |
| decayAmt24Hr: 0.5, |
| elapsed: 24 * time.Hour, |
| out: 0.5, |
| }, |
| { |
| decayAmt24Hr: 0.5, |
| elapsed: 12 * time.Hour, |
| out: 0.75, |
| }, |
| { |
| decayAmt24Hr: 0.5, |
| elapsed: 36 * time.Hour, |
| out: 0.25, |
| }, |
| { |
| decayAmt24Hr: 0.5, |
| elapsed: 48 * time.Hour, |
| out: 0.0, |
| }, |
| { |
| decayAmt24Hr: 0.5, |
| elapsed: 72 * time.Hour, |
| out: 0.0, |
| }, |
| } |
| for i, c := range tc { |
| require.Equal(t, c.out, timeDecay24Hr(c.decayAmt24Hr, c.elapsed), fmt.Sprintf("test case #%d", i)) |
| } |
| } |
| |
| func TestRegenerateTaskQueue(t *testing.T) { |
| ctx, _, _, _, s, _, _, cleanup := setup(t) |
| defer cleanup() |
| |
| // Ensure that the queue is initially empty. |
| require.Equal(t, 0, len(s.queue)) |
| |
| c1 := rs1.Revision |
| c2 := rs2.Revision |
| |
| // Regenerate the task queue. |
| queue, _, err := s.regenerateTaskQueue(ctx) |
| require.NoError(t, err) |
| require.Len(t, queue, 2) // Two Build tasks. |
| |
| testSort := func() { |
| // Ensure that we sorted correctly. |
| if len(queue) == 0 { |
| return |
| } |
| highScore := queue[0].Score |
| for _, c := range queue { |
| require.True(t, highScore >= c.Score) |
| highScore = c.Score |
| } |
| } |
| testSort() |
| |
| // Since we haven't run any task yet, we should have the two Build |
| // tasks. |
| // The one at HEAD should have a two-commit blamelist and a |
| // score of 3.5, scaled by a priority of 0.875 due to three jobs |
| // depending on it (1 - 0.5^3). |
| require.Equal(t, tcc_testutils.BuildTaskName, queue[0].Name) |
| require.Equal(t, []string{c2, c1}, queue[0].Commits) |
| require.Equal(t, 3, len(queue[0].Jobs)) |
| require.InDelta(t, 3.5*0.875, queue[0].Score, scoreDelta) |
| // The other should have one commit in its blamelist and |
| // a score of 0.5, scaled by a priority of 0.75 due to two jobs. |
| require.Equal(t, tcc_testutils.BuildTaskName, queue[1].Name) |
| require.Equal(t, []string{c1}, queue[1].Commits) |
| require.InDelta(t, 0.5*0.75, queue[1].Score, scoreDelta) |
| |
| // Insert the task at c1, even though it scored lower. |
| t1 := makeTask(ctx, queue[1].Name, queue[1].Repo, queue[1].Revision) |
| require.NotNil(t, t1) |
| t1.Status = types.TASK_STATUS_SUCCESS |
| t1.IsolatedOutput = "fake isolated hash" |
| require.NoError(t, s.putTask(ctx, t1)) |
| |
| // Regenerate the task queue. |
| queue, _, err = s.regenerateTaskQueue(ctx) |
| require.NoError(t, err) |
| |
| // Now we expect the queue to contain the other Build task and the one |
| // Test task we unblocked by running the first Build task. |
| require.Len(t, queue, 2) |
| testSort() |
| for _, c := range queue { |
| if c.Name == tcc_testutils.TestTaskName { |
| require.InDelta(t, 2.0*0.5, c.Score, scoreDelta) |
| require.Equal(t, 1, len(c.Commits)) |
| } else { |
| require.Equal(t, c.Name, tcc_testutils.BuildTaskName) |
| require.InDelta(t, 2.0*0.875, c.Score, scoreDelta) |
| require.Equal(t, []string{c.Revision}, c.Commits) |
| } |
| } |
| buildIdx := 0 |
| testIdx := 1 |
| if queue[1].Name == tcc_testutils.BuildTaskName { |
| buildIdx = 1 |
| testIdx = 0 |
| } |
| require.Equal(t, tcc_testutils.BuildTaskName, queue[buildIdx].Name) |
| require.Equal(t, c2, queue[buildIdx].Revision) |
| |
| require.Equal(t, tcc_testutils.TestTaskName, queue[testIdx].Name) |
| require.Equal(t, c1, queue[testIdx].Revision) |
| |
| // Run the other Build task. |
| t2 := makeTask(ctx, queue[buildIdx].Name, queue[buildIdx].Repo, queue[buildIdx].Revision) |
| t2.Status = types.TASK_STATUS_SUCCESS |
| t2.IsolatedOutput = "fake isolated hash" |
| require.NoError(t, s.putTask(ctx, t2)) |
| |
| // Regenerate the task queue. |
| queue, _, err = s.regenerateTaskQueue(ctx) |
| require.NoError(t, err) |
| require.Len(t, queue, 3) |
| testSort() |
| perfIdx := -1 |
| for i, c := range queue { |
| if c.Name == tcc_testutils.PerfTaskName { |
| perfIdx = i |
| require.Equal(t, c2, c.Revision) |
| require.InDelta(t, 2.0*0.5, c.Score, scoreDelta) |
| require.Equal(t, []string{c.Revision}, c.Commits) |
| } else { |
| require.Equal(t, c.Name, tcc_testutils.TestTaskName) |
| if c.Revision == c2 { |
| require.InDelta(t, 3.5*0.5, c.Score, scoreDelta) |
| require.Equal(t, []string{c2, c1}, c.Commits) |
| } else { |
| require.InDelta(t, 0.5*0.5, c.Score, scoreDelta) |
| require.Equal(t, []string{c.Revision}, c.Commits) |
| } |
| } |
| } |
| require.True(t, perfIdx > -1) |
| |
| // Run the Test task at tip of tree; its blamelist covers both commits. |
| t3 := makeTask(ctx, tcc_testutils.TestTaskName, rs1.Repo, c2) |
| t3.Commits = []string{c2, c1} |
| t3.Status = types.TASK_STATUS_SUCCESS |
| t3.IsolatedOutput = "ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff/256" |
| require.NoError(t, s.putTask(ctx, t3)) |
| |
| // Regenerate the task queue. |
| queue, _, err = s.regenerateTaskQueue(ctx) |
| require.NoError(t, err) |
| |
| // Now we expect the queue to contain one Test and one Perf task. The |
| // Test task is a backfill, and should have a score of 0.5, scaled by |
| // the priority of 0.5. |
| require.Len(t, queue, 2) |
| testSort() |
| // First candidate should be the perf task. |
| require.Equal(t, tcc_testutils.PerfTaskName, queue[0].Name) |
| require.InDelta(t, 2.0*0.5, queue[0].Score, scoreDelta) |
| // The test task is next, a backfill. |
| require.Equal(t, tcc_testutils.TestTaskName, queue[1].Name) |
| require.InDelta(t, 0.5*0.5, queue[1].Score, scoreDelta) |
| } |
| |
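// makeTaskCandidate returns a TaskCandidate with the given name and Swarming
// dimensions and a default score of 1.0.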
| func makeTaskCandidate(name string, dims []string) *TaskCandidate { |
| return &TaskCandidate{ |
| Score: 1.0, |
| TaskKey: types.TaskKey{ |
| Name: name, |
| }, |
| TaskSpec: &specs.TaskSpec{ |
| Dimensions: dims, |
| }, |
| } |
| } |
| |
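// makeSwarmingBot returns a Machine representing a Swarming bot with the
// given ID and dimensions.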
| func makeSwarmingBot(id string, dims []string) *types.Machine { |
| return &types.Machine{ |
| ID: id, |
| Dimensions: dims, |
| } |
| } |
| |
| func TestGetCandidatesToSchedule(t *testing.T) { |
| ctx := context.Background() |
| // Empty lists. |
| rv := getCandidatesToSchedule(ctx, []*types.Machine{}, []*TaskCandidate{}) |
| require.Empty(t, rv) |
| |
| // checkDiags takes a list of bots with the same dimensions and a list of |
| // ordered candidates that match those bots and checks the Diagnostics for |
| // candidates. |
| checkDiags := func(bots []*types.Machine, candidates []*TaskCandidate) { |
| var expectedBots []string |
| if len(bots) > 0 { |
expectedBots = make([]string, len(bots))
| for i, b := range bots { |
| expectedBots[i] = b.ID |
| } |
| } |
| for i, c := range candidates { |
| // These conditions are not tested. |
| require.False(t, c.Diagnostics.Scheduling.OverSchedulingLimitPerTaskSpec) |
| require.False(t, c.Diagnostics.Scheduling.ScoreBelowThreshold) |
| // NoBotsAvailable and MatchingBots will be the same for all candidates. |
| require.Equal(t, len(expectedBots) == 0, c.Diagnostics.Scheduling.NoBotsAvailable) |
| require.Equal(t, expectedBots, c.Diagnostics.Scheduling.MatchingBots) |
| require.Equal(t, i, c.Diagnostics.Scheduling.NumHigherScoreSimilarCandidates) |
| if i == 0 { |
| // First candidate. |
| require.Nil(t, c.Diagnostics.Scheduling.LastSimilarCandidate) |
| } else { |
| last := candidates[i-1] |
| require.Equal(t, &last.TaskKey, c.Diagnostics.Scheduling.LastSimilarCandidate) |
| } |
| require.Equal(t, i < len(bots), c.Diagnostics.Scheduling.Selected) |
| } |
| // Clear diagnostics for next test. |
| for _, c := range candidates { |
| c.Diagnostics = nil |
| } |
| } |
| |
| t1 := makeTaskCandidate("task1", []string{"k:v"}) |
| rv = getCandidatesToSchedule(ctx, []*types.Machine{}, []*TaskCandidate{t1}) |
| require.Empty(t, rv) |
| checkDiags([]*types.Machine{}, []*TaskCandidate{t1}) |
| |
| b1 := makeSwarmingBot("bot1", []string{"k:v"}) |
| rv = getCandidatesToSchedule(ctx, []*types.Machine{b1}, []*TaskCandidate{}) |
| require.Empty(t, rv) |
| |
| // Single match. |
| rv = getCandidatesToSchedule(ctx, []*types.Machine{b1}, []*TaskCandidate{t1}) |
| assertdeep.Equal(t, []*TaskCandidate{t1}, rv) |
| checkDiags([]*types.Machine{b1}, []*TaskCandidate{t1}) |
| |
| // No match. |
| t1.TaskSpec.Dimensions[0] = "k:v2" |
| rv = getCandidatesToSchedule(ctx, []*types.Machine{b1}, []*TaskCandidate{t1}) |
| require.Empty(t, rv) |
| checkDiags([]*types.Machine{}, []*TaskCandidate{t1}) |
| |
| // Add a task candidate to match b1. |
| t1 = makeTaskCandidate("task1", []string{"k:v2"}) |
| t2 := makeTaskCandidate("task2", []string{"k:v"}) |
| rv = getCandidatesToSchedule(ctx, []*types.Machine{b1}, []*TaskCandidate{t1, t2}) |
| assertdeep.Equal(t, []*TaskCandidate{t2}, rv) |
| checkDiags([]*types.Machine{}, []*TaskCandidate{t1}) |
| checkDiags([]*types.Machine{b1}, []*TaskCandidate{t2}) |
| |
| // Switch the task order. |
| t1 = makeTaskCandidate("task1", []string{"k:v2"}) |
| t2 = makeTaskCandidate("task2", []string{"k:v"}) |
| rv = getCandidatesToSchedule(ctx, []*types.Machine{b1}, []*TaskCandidate{t2, t1}) |
| assertdeep.Equal(t, []*TaskCandidate{t2}, rv) |
| checkDiags([]*types.Machine{}, []*TaskCandidate{t1}) |
| checkDiags([]*types.Machine{b1}, []*TaskCandidate{t2}) |
| |
| // Make both tasks match the bot, ensure that we pick the first one. |
| t1 = makeTaskCandidate("task1", []string{"k:v"}) |
| t2 = makeTaskCandidate("task2", []string{"k:v"}) |
| rv = getCandidatesToSchedule(ctx, []*types.Machine{b1}, []*TaskCandidate{t1, t2}) |
| assertdeep.Equal(t, []*TaskCandidate{t1}, rv) |
| checkDiags([]*types.Machine{b1}, []*TaskCandidate{t1, t2}) |
| rv = getCandidatesToSchedule(ctx, []*types.Machine{b1}, []*TaskCandidate{t2, t1}) |
| assertdeep.Equal(t, []*TaskCandidate{t2}, rv) |
| checkDiags([]*types.Machine{b1}, []*TaskCandidate{t2, t1}) |
| |
| // Multiple dimensions. Ensure that different permutations of the bots |
| // and tasks lists give us the expected results. |
| dims := []string{"k:v", "k2:v2", "k3:v3"} |
| b1 = makeSwarmingBot("bot1", dims) |
| b2 := makeSwarmingBot("bot2", t1.TaskSpec.Dimensions) |
| t1 = makeTaskCandidate("task1", []string{"k:v"}) |
| t2 = makeTaskCandidate("task2", dims) |
| // In the first two cases, the task with fewer dimensions has the |
| // higher priority. It gets the bot with more dimensions because it |
| // is first in sorted order. The second task does not get scheduled |
| // because there is no bot available which can run it. |
| // TODO(borenet): Use a more optimal solution to avoid this case. |
| rv = getCandidatesToSchedule(ctx, []*types.Machine{b1, b2}, []*TaskCandidate{t1, t2}) |
| assertdeep.Equal(t, []*TaskCandidate{t1}, rv) |
| // Can't use checkDiags for these cases. |
| require.Equal(t, []string{b1.ID, b2.ID}, t1.Diagnostics.Scheduling.MatchingBots) |
| require.Equal(t, 0, t1.Diagnostics.Scheduling.NumHigherScoreSimilarCandidates) |
| require.Nil(t, t1.Diagnostics.Scheduling.LastSimilarCandidate) |
| require.True(t, t1.Diagnostics.Scheduling.Selected) |
| t1.Diagnostics = nil |
| require.Equal(t, []string{b1.ID}, t2.Diagnostics.Scheduling.MatchingBots) |
| require.Equal(t, 1, t2.Diagnostics.Scheduling.NumHigherScoreSimilarCandidates) |
| require.Equal(t, &t1.TaskKey, t2.Diagnostics.Scheduling.LastSimilarCandidate) |
| require.False(t, t2.Diagnostics.Scheduling.Selected) |
| t2.Diagnostics = nil |
| |
| t1 = makeTaskCandidate("task1", []string{"k:v"}) |
| t2 = makeTaskCandidate("task2", dims) |
| rv = getCandidatesToSchedule(ctx, []*types.Machine{b2, b1}, []*TaskCandidate{t1, t2}) |
| assertdeep.Equal(t, []*TaskCandidate{t1}, rv) |
| require.Equal(t, []string{b1.ID, b2.ID}, t1.Diagnostics.Scheduling.MatchingBots) |
| require.Equal(t, 0, t1.Diagnostics.Scheduling.NumHigherScoreSimilarCandidates) |
| require.Nil(t, t1.Diagnostics.Scheduling.LastSimilarCandidate) |
| require.True(t, t1.Diagnostics.Scheduling.Selected) |
| t1.Diagnostics = nil |
| require.Equal(t, []string{b1.ID}, t2.Diagnostics.Scheduling.MatchingBots) |
| require.Equal(t, 1, t2.Diagnostics.Scheduling.NumHigherScoreSimilarCandidates) |
| require.Equal(t, &t1.TaskKey, t2.Diagnostics.Scheduling.LastSimilarCandidate) |
| require.False(t, t2.Diagnostics.Scheduling.Selected) |
| t2.Diagnostics = nil |
| |
| // In these two cases, the task with more dimensions has the higher |
| // priority. Both tasks get scheduled. |
| t1 = makeTaskCandidate("task1", []string{"k:v"}) |
| t2 = makeTaskCandidate("task2", dims) |
| rv = getCandidatesToSchedule(ctx, []*types.Machine{b1, b2}, []*TaskCandidate{t2, t1}) |
| assertdeep.Equal(t, []*TaskCandidate{t2, t1}, rv) |
| require.Equal(t, []string{b1.ID, b2.ID}, t1.Diagnostics.Scheduling.MatchingBots) |
| require.Equal(t, 1, t1.Diagnostics.Scheduling.NumHigherScoreSimilarCandidates) |
| require.Equal(t, &t2.TaskKey, t1.Diagnostics.Scheduling.LastSimilarCandidate) |
| require.True(t, t1.Diagnostics.Scheduling.Selected) |
| t1.Diagnostics = nil |
| require.Equal(t, []string{b1.ID}, t2.Diagnostics.Scheduling.MatchingBots) |
| require.Equal(t, 0, t2.Diagnostics.Scheduling.NumHigherScoreSimilarCandidates) |
| require.Nil(t, t2.Diagnostics.Scheduling.LastSimilarCandidate) |
| require.True(t, t2.Diagnostics.Scheduling.Selected) |
| t2.Diagnostics = nil |
| |
| t1 = makeTaskCandidate("task1", []string{"k:v"}) |
| t2 = makeTaskCandidate("task2", dims) |
| rv = getCandidatesToSchedule(ctx, []*types.Machine{b2, b1}, []*TaskCandidate{t2, t1}) |
| assertdeep.Equal(t, []*TaskCandidate{t2, t1}, rv) |
| require.Equal(t, []string{b1.ID, b2.ID}, t1.Diagnostics.Scheduling.MatchingBots) |
| require.Equal(t, 1, t1.Diagnostics.Scheduling.NumHigherScoreSimilarCandidates) |
| require.Equal(t, &t2.TaskKey, t1.Diagnostics.Scheduling.LastSimilarCandidate) |
| require.True(t, t1.Diagnostics.Scheduling.Selected) |
| t1.Diagnostics = nil |
| require.Equal(t, []string{b1.ID}, t2.Diagnostics.Scheduling.MatchingBots) |
| require.Equal(t, 0, t2.Diagnostics.Scheduling.NumHigherScoreSimilarCandidates) |
| require.Nil(t, t2.Diagnostics.Scheduling.LastSimilarCandidate) |
| require.True(t, t2.Diagnostics.Scheduling.Selected) |
| t2.Diagnostics = nil |
| |
| // Matching dimensions. More bots than tasks. |
| b2 = makeSwarmingBot("bot2", dims) |
| b3 := makeSwarmingBot("bot3", dims) |
| t1 = makeTaskCandidate("task1", dims) |
| t2 = makeTaskCandidate("task2", dims) |
| rv = getCandidatesToSchedule(ctx, []*types.Machine{b1, b2, b3}, []*TaskCandidate{t1, t2}) |
| assertdeep.Equal(t, []*TaskCandidate{t1, t2}, rv) |
| checkDiags([]*types.Machine{b1, b2, b3}, []*TaskCandidate{t1, t2}) |
| |
| // More tasks than bots. |
| t1 = makeTaskCandidate("task1", dims) |
| t2 = makeTaskCandidate("task2", dims) |
| t3 := makeTaskCandidate("task3", dims) |
| rv = getCandidatesToSchedule(ctx, []*types.Machine{b1, b2}, []*TaskCandidate{t1, t2, t3}) |
| assertdeep.Equal(t, []*TaskCandidate{t1, t2}, rv) |
| checkDiags([]*types.Machine{b1, b2}, []*TaskCandidate{t1, t2, t3}) |
| } |
| |
| func makeBot(id string, dims map[string]string) *types.Machine { |
| dimensions := make([]string, 0, len(dims)) |
| for k, v := range dims { |
| dimensions = append(dimensions, fmt.Sprintf("%s:%s", k, v)) |
| } |
| return &types.Machine{ |
| ID: id, |
| Dimensions: dimensions, |
| } |
| } |
| |
| func mockBots(t sktest.TestingT, swarmingClient *swarming_testutils.TestClient, bots ...*types.Machine) { |
| swarmBots := make([]*swarming_api.SwarmingRpcsBotInfo, 0, len(bots)) |
| for _, bot := range bots { |
| dims, err := swarming.ParseDimensions(bot.Dimensions) |
| require.NoError(t, err) |
| swarmBots = append(swarmBots, &swarming_api.SwarmingRpcsBotInfo{ |
| BotId: bot.ID, |
| Dimensions: swarming.StringMapToBotDimensions(dims), |
| }) |
| } |
| swarmingClient.MockBots(swarmBots) |
| } |
| |
| func TestSchedulingE2E(t *testing.T) { |
| ctx, _, _, swarmingClient, s, _, cas, cleanup := setup(t) |
| defer cleanup() |
| |
| c1 := rs1.Revision |
| c2 := rs2.Revision |
| |
| // Start testing. No free bots, so we get a full queue with nothing |
| // scheduled. |
| runMainLoop(t, s, ctx) |
| tasks, err := s.tCache.GetTasksForCommits(rs1.Repo, []string{c1, c2}) |
| require.NoError(t, err) |
| expect := map[string]map[string]*types.Task{ |
| c1: {}, |
| c2: {}, |
| } |
| assertdeep.Equal(t, expect, tasks) |
| require.Equal(t, 2, len(s.queue)) // Two compile tasks. |
| |
| // A bot is free but doesn't have all of the right dimensions to run a task. |
| bot1 := makeBot("bot1", map[string]string{"pool": "Skia"}) |
| mockBots(t, swarmingClient, bot1) |
| runMainLoop(t, s, ctx) |
| tasks, err = s.tCache.GetTasksForCommits(rs1.Repo, []string{c1, c2}) |
| require.NoError(t, err) |
| expect = map[string]map[string]*types.Task{ |
| c1: {}, |
| c2: {}, |
| } |
| assertdeep.Equal(t, expect, tasks) |
| require.Equal(t, 2, len(s.queue)) // Still two compile tasks. |
| |
| // One bot free, schedule a task, ensure it's not in the queue. |
| bot1.Dimensions = append(bot1.Dimensions, "os:Ubuntu") |
| mockBots(t, swarmingClient, bot1) |
| runMainLoop(t, s, ctx) |
| require.NoError(t, s.tCache.Update(ctx)) |
| tasks, err = s.tCache.GetTasksForCommits(rs1.Repo, []string{c1, c2}) |
| require.NoError(t, err) |
| t1 := tasks[c2][tcc_testutils.BuildTaskName] |
| require.NotNil(t, t1) |
| require.Equal(t, c2, t1.Revision) |
| require.Equal(t, tcc_testutils.BuildTaskName, t1.Name) |
| require.Equal(t, []string{c2, c1}, t1.Commits) |
| require.Equal(t, 1, len(s.queue)) |
| |
| // The task is complete. |
| t1.Status = types.TASK_STATUS_SUCCESS |
| t1.Finished = now.Now(ctx) |
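| // CAS outputs are referenced by digest strings of the form "<hash>/<size>". |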
| t1.IsolatedOutput = "dddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddd/86" |
| require.NoError(t, s.putTask(ctx, t1)) |
| swarmingClient.MockTasks([]*swarming_api.SwarmingRpcsTaskRequestMetadata{ |
| makeSwarmingRpcsTaskRequestMetadata(t, t1, linuxTaskDims), |
| }) |
| cas.On("Merge", testutils.AnyContext, []string{tcc_testutils.TestCASDigest, t1.IsolatedOutput}).Return("bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbabc123/56", nil) |
| cas.On("Merge", testutils.AnyContext, []string{tcc_testutils.PerfCASDigest, t1.IsolatedOutput}).Return("ccccccccccccccccccccccccccccccccccccccccccccccccccccccccccabc123/56", nil) |
| |
| // No bots free. Ensure that the queue is correct. |
| mockBots(t, swarmingClient) |
| runMainLoop(t, s, ctx) |
| require.NoError(t, s.tCache.Update(ctx)) |
| tasks, err = s.tCache.GetTasksForCommits(rs1.Repo, []string{c1, c2}) |
| require.NoError(t, err) |
| for _, c := range t1.Commits { |
| expect[c][t1.Name] = t1 |
| } |
| assertdeep.Equal(t, expect, tasks) |
| expectLen := 3 // One remaining build task, plus one test task and one perf task. |
| require.Equal(t, expectLen, len(s.queue)) |
| |
| // More bots than tasks free, ensure the queue is correct. |
| bot2 := makeBot("bot2", androidTaskDims) |
| bot3 := makeBot("bot3", androidTaskDims) |
| bot4 := makeBot("bot4", linuxTaskDims) |
| mockBots(t, swarmingClient, bot1, bot2, bot3, bot4) |
| runMainLoop(t, s, ctx) |
| require.NoError(t, s.tCache.Update(ctx)) |
| _, err = s.tCache.GetTasksForCommits(rs1.Repo, []string{c1, c2}) |
| require.NoError(t, err) |
| require.Equal(t, 0, len(s.queue)) |
| |
| // The build, test, and perf tasks should have triggered. |
| var t2, t3, t4 *types.Task |
| tasks, err = s.tCache.GetTasksForCommits(rs1.Repo, []string{c1, c2}) |
| require.NoError(t, err) |
| require.Len(t, tasks, 2) |
| for commit, v := range tasks { |
| if commit == c1 { |
| // Build task at c1 and test task at c2 whose blamelist also has c1. |
| require.Len(t, v, 2) |
| for _, task := range v { |
| if task.Revision != commit { |
| continue |
| } |
| require.Equal(t, tcc_testutils.BuildTaskName, task.Name) |
| require.Nil(t, t4) |
| t4 = task |
| require.Equal(t, c1, task.Revision) |
| require.Equal(t, []string{c1}, task.Commits) |
| } |
| } else { |
| require.Len(t, v, 3) |
| for _, task := range v { |
| if task.Name == tcc_testutils.TestTaskName { |
| require.Nil(t, t2) |
| t2 = task |
| require.Equal(t, c2, task.Revision) |
| require.Equal(t, []string{c2, c1}, task.Commits) |
| } else if task.Name == tcc_testutils.PerfTaskName { |
| require.Nil(t, t3) |
| t3 = task |
| require.Equal(t, c2, task.Revision) |
| require.Equal(t, []string{c2}, task.Commits) |
| } else { |
| // This is the first task we triggered. |
| require.Equal(t, tcc_testutils.BuildTaskName, task.Name) |
| } |
| } |
| } |
| } |
| require.NotNil(t, t2) |
| require.NotNil(t, t3) |
| require.NotNil(t, t4) |
| t4.Status = types.TASK_STATUS_SUCCESS |
| t4.Finished = now.Now(ctx) |
| t4.IsolatedOutput = "eeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee/99" |
| require.NoError(t, s.putTask(ctx, t4)) |
| |
| // No new bots free; only the remaining test task should be in the queue. |
| mockBots(t, swarmingClient) |
| mockTasks := []*swarming_api.SwarmingRpcsTaskRequestMetadata{ |
| makeSwarmingRpcsTaskRequestMetadata(t, t2, linuxTaskDims), |
| makeSwarmingRpcsTaskRequestMetadata(t, t3, linuxTaskDims), |
| makeSwarmingRpcsTaskRequestMetadata(t, t4, linuxTaskDims), |
| } |
| swarmingClient.MockTasks(mockTasks) |
| require.NoError(t, s.updateUnfinishedTasks(ctx)) |
| runMainLoop(t, s, ctx) |
| require.NoError(t, s.tCache.Update(ctx)) |
| expectLen = 1 // Test task from c1 |
| require.Equal(t, expectLen, len(s.queue)) |
| |
| // Finish the other task. |
| t3, err = s.tCache.GetTask(t3.Id) |
| require.NoError(t, err) |
| t3.Status = types.TASK_STATUS_SUCCESS |
| t3.Finished = now.Now(ctx) |
| t3.IsolatedOutput = "ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff/256" |
| |
| // Ensure that we finalize all of the tasks and insert into the DB. |
| mockBots(t, swarmingClient, bot1, bot2, bot3, bot4) |
| mockTasks = []*swarming_api.SwarmingRpcsTaskRequestMetadata{ |
| makeSwarmingRpcsTaskRequestMetadata(t, t3, linuxTaskDims), |
| } |
| cas.On("Merge", testutils.AnyContext, []string{tcc_testutils.TestCASDigest, t4.IsolatedOutput}).Return("bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbabc123/56", nil) |
| runMainLoop(t, s, ctx) |
| require.NoError(t, s.tCache.Update(ctx)) |
| tasks, err = s.tCache.GetTasksForCommits(rs1.Repo, []string{c1, c2}) |
| require.NoError(t, err) |
| require.Equal(t, 2, len(tasks[c1])) |
| require.Equal(t, 3, len(tasks[c2])) |
| require.Equal(t, 0, len(s.queue)) |
| |
| // Mark everything as finished. Ensure that the queue still ends up empty. |
| tasksList := []*types.Task{} |
| for _, v := range tasks { |
| for _, task := range v { |
| if task.Status != types.TASK_STATUS_SUCCESS { |
| task.Status = types.TASK_STATUS_SUCCESS |
| task.Finished = now.Now(ctx) |
| task.IsolatedOutput = "abcd1234abcd1234abcd1234abcd1234abcd1234abcd1234abcd1234abcd1234/56" |
| tasksList = append(tasksList, task) |
| } |
| } |
| } |
| mockTasks = make([]*swarming_api.SwarmingRpcsTaskRequestMetadata, 0, len(tasksList)) |
| for _, task := range tasksList { |
| mockTasks = append(mockTasks, makeSwarmingRpcsTaskRequestMetadata(t, task, linuxTaskDims)) |
| } |
| swarmingClient.MockTasks(mockTasks) |
| mockBots(t, swarmingClient, bot1, bot2, bot3, bot4) |
| require.NoError(t, s.updateUnfinishedTasks(ctx)) |
| runMainLoop(t, s, ctx) |
| require.Equal(t, 0, len(s.queue)) |
| } |
| |
| func TestSchedulerStealingFrom(t *testing.T) { |
| ctx, mg, _, swarmingClient, s, _, _, cleanup := setup(t) |
| defer cleanup() |
| |
| c1 := rs1.Revision |
| c2 := rs2.Revision |
| |
| // Run the available compile task at c2. |
| bot1 := makeBot("bot1", linuxTaskDims) |
| bot2 := makeBot("bot2", linuxTaskDims) |
| mockBots(t, swarmingClient, bot1, bot2) |
| runMainLoop(t, s, ctx) |
| require.NoError(t, s.tCache.Update(ctx)) |
| tasks, err := s.tCache.GetTasksForCommits(rs1.Repo, []string{c1, c2}) |
| require.NoError(t, err) |
| require.Equal(t, 1, len(tasks[c1])) |
| require.Equal(t, 1, len(tasks[c2])) |
| |
| // Finish the one task. |
| tasksList := []*types.Task{} |
| t1 := tasks[c2][tcc_testutils.BuildTaskName] |
| t1.Status = types.TASK_STATUS_SUCCESS |
| t1.Finished = now.Now(ctx) |
| t1.IsolatedOutput = "abc123" |
| tasksList = append(tasksList, t1) |
| |
| // Forcibly create and insert a second task at c1. |
| t2 := t1.Copy() |
| t2.Id = "t2id" |
| t2.Revision = c1 |
| t2.Commits = []string{c1} |
| tasksList = append(tasksList, t2) |
| |
| require.NoError(t, s.putTasks(ctx, tasksList)) |
| |
| // Add some commits. |
| commits := mg.CommitN(10) |
| require.NoError(t, s.repos[rs1.Repo].Update(ctx)) |
| for _, h := range commits { |
| rs := types.RepoState{ |
| Repo: rs1.Repo, |
| Revision: h, |
| } |
| fillCaches(t, ctx, s.taskCfgCache, rs, tcc_testutils.TasksCfg2) |
| insertJobs(t, ctx, s, rs) |
| } |
| |
| // Run one task. Ensure that it's at tip-of-tree. |
| head := s.repos[rs1.Repo].Get(git.MainBranch).Hash |
| mockBots(t, swarmingClient, bot1) |
| runMainLoop(t, s, ctx) |
| require.NoError(t, s.tCache.Update(ctx)) |
| tasks, err = s.tCache.GetTasksForCommits(rs1.Repo, commits) |
| require.NoError(t, err) |
| require.Len(t, tasks[head], 1) |
| task := tasks[head][tcc_testutils.BuildTaskName] |
| require.Equal(t, head, task.Revision) |
| // Copy before sorting so that we don't reorder commits itself. |
| expect := util.CopyStringSlice(commits) |
| sort.Strings(expect) |
| sort.Strings(task.Commits) |
| assertdeep.Equal(t, expect, task.Commits) |
| |
| task.Status = types.TASK_STATUS_SUCCESS |
| task.Finished = now.Now(ctx) |
| task.IsolatedOutput = "abc123" |
| require.NoError(t, s.putTask(ctx, task)) |
| |
| oldTasksByCommit := tasks |
| |
| // Run backfills, ensuring that each one steals the right set of commits |
| // from previous builds, until all of the build task candidates have run. |
| for i := 0; i < 9; i++ { |
| // Now, run another task. The new task should bisect the old one. |
| mockBots(t, swarmingClient, bot1) |
| runMainLoop(t, s, ctx) |
| require.NoError(t, s.tCache.Update(ctx)) |
| tasks, err = s.tCache.GetTasksForCommits(rs1.Repo, commits) |
| require.NoError(t, err) |
| var newTask *types.Task |
| for _, v := range tasks { |
| for _, task := range v { |
| if task.Status == types.TASK_STATUS_PENDING { |
| require.True(t, newTask == nil || task.Id == newTask.Id) |
| newTask = task |
| } |
| } |
| } |
| require.NotNil(t, newTask) |
| |
| oldTask := oldTasksByCommit[newTask.Revision][newTask.Name] |
| require.NotNil(t, oldTask) |
| require.True(t, util.In(newTask.Revision, oldTask.Commits)) |
| |
| // Find the updated old task. |
| updatedOldTask, err := s.tCache.GetTask(oldTask.Id) |
| require.NoError(t, err) |
| require.NotNil(t, updatedOldTask) |
| |
| // Ensure that the blamelists are correct. |
| oldSet := util.NewStringSet(oldTask.Commits) |
| newSet := util.NewStringSet(newTask.Commits) |
| updatedOldSet := util.NewStringSet(updatedOldTask.Commits) |
| |
| assertdeep.Equal(t, oldSet, newSet.Union(updatedOldSet)) |
| require.Equal(t, 0, len(newSet.Intersect(updatedOldSet))) |
| // Finish the new task. |
| newTask.Status = types.TASK_STATUS_SUCCESS |
| newTask.Finished = now.Now(ctx) |
| newTask.IsolatedOutput = "abc123" |
| require.NoError(t, s.putTask(ctx, newTask)) |
| oldTasksByCommit = tasks |
| } |
| |
| // Ensure that we're really done. |
| mockBots(t, swarmingClient, bot1) |
| runMainLoop(t, s, ctx) |
| require.NoError(t, s.tCache.Update(ctx)) |
| tasks, err = s.tCache.GetTasksForCommits(rs1.Repo, commits) |
| require.NoError(t, err) |
| var newTask *types.Task |
| for _, v := range tasks { |
| for _, task := range v { |
| if task.Status == types.TASK_STATUS_PENDING { |
| require.True(t, newTask == nil || task.Id == newTask.Id) |
| newTask = task |
| } |
| } |
| } |
| require.Nil(t, newTask) |
| } |
| |
| // spyDB calls onPutTasks before delegating PutTask(s) to DB. |
| type spyDB struct { |
| db.DB |
| onPutTasks func([]*types.Task) |
| } |
| |
| func (s *spyDB) PutTask(ctx context.Context, task *types.Task) error { |
| s.onPutTasks([]*types.Task{task}) |
| return s.DB.PutTask(ctx, task) |
| } |
| |
| func (s *spyDB) PutTasks(ctx context.Context, tasks []*types.Task) error { |
| s.onPutTasks(tasks) |
| return s.DB.PutTasks(ctx, tasks) |
| } |
| |
| func testMultipleCandidatesBackfillingEachOtherSetup(t *testing.T) (context.Context, *mem_git.MemGit, db.DB, *TaskScheduler, *swarming_testutils.TestClient, []string, func(*types.Task), *specs.TasksCfg, func()) { |
| ctx, cancel := context.WithCancel(context.Background()) |
| workdir, err := ioutil.TempDir("", "") |
| require.NoError(t, err) |
| |
| // Setup the scheduler. |
| d := memory.NewInMemoryDB() |
| swarmingClient := swarming_testutils.NewTestClient() |
| mg, repo := newMemRepo(t) |
| repos := repograph.Map{ |
| rs1.Repo: repo, |
| } |
| btProject, btInstance, btCleanup := tcc_testutils.SetupBigTable(t) |
| taskCfgCache, err := task_cfg_cache.NewTaskCfgCache(ctx, repos, btProject, btInstance, nil) |
| require.NoError(t, err) |
| |
| // Create a single task in the config. |
| taskName := "faketask" |
| cfg := &specs.TasksCfg{ |
| CasSpecs: map[string]*specs.CasSpec{ |
| "compile": { |
| Digest: tcc_testutils.CompileCASDigest, |
| }, |
| }, |
| Tasks: map[string]*specs.TaskSpec{ |
| taskName: { |
| CasSpec: "compile", |
| CipdPackages: []*specs.CipdPackage{}, |
| Dependencies: []string{}, |
| Dimensions: []string{"pool:Skia"}, |
| Priority: 1.0, |
| }, |
| }, |
| Jobs: map[string]*specs.JobSpec{ |
| "j1": { |
| TaskSpecs: []string{taskName}, |
| }, |
| }, |
| } |
| hashes := mg.CommitN(1) |
| |
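| // mock marks the given task as successfully finished in the Swarming client |
| // so that subsequent scheduling iterations treat it as done. |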
| mockTasks := []*swarming_api.SwarmingRpcsTaskRequestMetadata{} |
| mock := func(task *types.Task) { |
| task.Status = types.TASK_STATUS_SUCCESS |
| task.Finished = now.Now(ctx) |
| task.IsolatedOutput = tcc_testutils.CompileCASDigest |
| mockTasks = append(mockTasks, makeSwarmingRpcsTaskRequestMetadata(t, task, linuxTaskDims)) |
| swarmingClient.MockTasks(mockTasks) |
| } |
| |
| // Create the TaskScheduler. |
| cas := &mocks.CAS{} |
| cas.On("Close").Return(nil) |
| // Go ahead and mock the single-input merge calls. |
| cas.On("Merge", testutils.AnyContext, []string{tcc_testutils.CompileCASDigest}).Return(tcc_testutils.CompileCASDigest, nil) |
| cas.On("Merge", testutils.AnyContext, []string{tcc_testutils.TestCASDigest}).Return(tcc_testutils.TestCASDigest, nil) |
| cas.On("Merge", testutils.AnyContext, []string{tcc_testutils.PerfCASDigest}).Return(tcc_testutils.PerfCASDigest, nil) |
| |
| taskExec := swarming_task_execution.NewSwarmingTaskExecutor(swarmingClient, "fake-cas-instance", "") |
| taskExecs := map[string]types.TaskExecutor{ |
| types.TaskExecutor_Swarming: taskExec, |
| types.TaskExecutor_UseDefault: taskExec, |
| } |
| s, err := NewTaskScheduler(ctx, d, nil, time.Duration(math.MaxInt64), 0, repos, cas, "fake-cas-instance", taskExecs, mockhttpclient.NewURLMock().Client(), 1.0, swarming.POOLS_PUBLIC, cdPoolName, "", taskCfgCache, nil, mem_gcsclient.New("diag_unit_tests"), btInstance, BusyBotsDebugLoggingOff) |
| require.NoError(t, err) |
| |
| for _, h := range hashes { |
| rs := types.RepoState{ |
| Repo: rs1.Repo, |
| Revision: h, |
| } |
| fillCaches(t, ctx, taskCfgCache, rs, cfg) |
| insertJobs(t, ctx, s, rs) |
| } |
| |
| // Cycle once. |
| bot1 := makeBot("bot1", map[string]string{"pool": "Skia"}) |
| mockBots(t, swarmingClient, bot1) |
| runMainLoop(t, s, ctx) |
| require.NoError(t, s.tCache.Update(ctx)) |
| require.Equal(t, 0, len(s.queue)) |
| head := s.repos[rs1.Repo].Get(git.MainBranch).Hash |
| tasks, err := s.tCache.GetTasksForCommits(rs1.Repo, []string{head}) |
| require.NoError(t, err) |
| require.Len(t, tasks[head], 1) |
| mock(tasks[head][taskName]) |
| |
| // Add some commits to the repo. |
| newHashes := mg.CommitN(8) |
| for _, h := range newHashes { |
| rs := types.RepoState{ |
| Repo: rs1.Repo, |
| Revision: h, |
| } |
| fillCaches(t, ctx, taskCfgCache, rs, cfg) |
| insertJobs(t, ctx, s, rs) |
| } |
| require.NoError(t, s.repos[rs1.Repo].Update(ctx)) |
| return ctx, mg, d, s, swarmingClient, newHashes, mock, cfg, func() { |
| testutils.AssertCloses(t, s) |
| testutils.RemoveAll(t, workdir) |
| btCleanup() |
| cancel() |
| } |
| } |
| |
| func TestMultipleCandidatesBackfillingEachOther(t *testing.T) { |
| ctx, _, d, s, swarmingClient, commits, mock, _, cleanup := testMultipleCandidatesBackfillingEachOtherSetup(t) |
| defer cleanup() |
| |
| // Trigger builds simultaneously. |
| bot1 := makeBot("bot1", map[string]string{"pool": "Skia"}) |
| bot2 := makeBot("bot2", map[string]string{"pool": "Skia"}) |
| bot3 := makeBot("bot3", map[string]string{"pool": "Skia"}) |
| mockBots(t, swarmingClient, bot1, bot2, bot3) |
| runMainLoop(t, s, ctx) |
| require.NoError(t, s.tCache.Update(ctx)) |
| require.Equal(t, 5, len(s.queue)) |
| tasks, err := s.tCache.GetTasksForCommits(rs1.Repo, commits) |
| require.NoError(t, err) |
| |
| // If we're queueing correctly, we should've triggered tasks at |
| // commits[0], commits[4], and either commits[2] or commits[6]. |
| var t1, t2, t3 *types.Task |
| for _, byName := range tasks { |
| for _, task := range byName { |
| if task.Revision == commits[0] { |
| t1 = task |
| } else if task.Revision == commits[4] { |
| t2 = task |
| } else if task.Revision == commits[2] || task.Revision == commits[6] { |
| t3 = task |
| } else { |
| require.FailNow(t, fmt.Sprintf("Task has unknown revision: %+v", task)) |
| } |
| } |
| } |
| require.NotNil(t, t1) |
| require.NotNil(t, t2) |
| require.NotNil(t, t3) |
| mock(t1) |
| mock(t2) |
| mock(t3) |
| |
| // Ensure that we got the blamelists right. |
| var expect1, expect2, expect3 []string |
| if t3.Revision == commits[2] { |
| expect1 = util.CopyStringSlice(commits[:2]) |
| expect2 = util.CopyStringSlice(commits[4:]) |
| expect3 = util.CopyStringSlice(commits[2:4]) |
| } else { |
| expect1 = util.CopyStringSlice(commits[:4]) |
| expect2 = util.CopyStringSlice(commits[4:6]) |
| expect3 = util.CopyStringSlice(commits[6:]) |
| } |
| sort.Strings(expect1) |
| sort.Strings(expect2) |
| sort.Strings(expect3) |
| sort.Strings(t1.Commits) |
| sort.Strings(t2.Commits) |
| sort.Strings(t3.Commits) |
| assertdeep.Equal(t, expect1, t1.Commits) |
| assertdeep.Equal(t, expect2, t2.Commits) |
| assertdeep.Equal(t, expect3, t3.Commits) |
| |
| // Just for good measure, check the task at the head of the queue. |
| expectIdx := 2 |
| if t3.Revision == commits[expectIdx] { |
| expectIdx = 6 |
| } |
| require.Equal(t, commits[expectIdx], s.queue[0].Revision) |
| |
| retryCount := 0 |
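| // causeConcurrentUpdate updates one of the existing tasks in the DB while |
| // the scheduler is inserting new tasks, which triggers ErrConcurrentUpdate |
| // and forces a retry. |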
| causeConcurrentUpdate := func(tasks []*types.Task) { |
| // HACK(benjaminwagner): Filter out PutTask calls from |
| // updateUnfinishedTasks by looking for new tasks. |
| anyNew := false |
| for _, task := range tasks { |
| if util.TimeIsZero(task.DbModified) { |
| anyNew = true |
| break |
| } |
| } |
| if !anyNew { |
| return |
| } |
| if retryCount < 3 { |
| taskToUpdate := []*types.Task{t1, t2, t3}[retryCount] |
| retryCount++ |
| taskInDb, err := d.GetTaskById(ctx, taskToUpdate.Id) |
| require.NoError(t, err) |
| taskInDb.Status = types.TASK_STATUS_SUCCESS |
| require.NoError(t, d.PutTask(ctx, taskInDb)) |
| s.tCache.AddTasks([]*types.Task{taskInDb}) |
| } |
| } |
| s.db = &spyDB{ |
| DB: d, |
| onPutTasks: causeConcurrentUpdate, |
| } |
| |
| // Run again with 5 bots to check the case where we bisect the same |
| // task twice. |
| bot4 := makeBot("bot4", map[string]string{"pool": "Skia"}) |
| bot5 := makeBot("bot5", map[string]string{"pool": "Skia"}) |
| mockBots(t, swarmingClient, bot1, bot2, bot3, bot4, bot5) |
| runMainLoop(t, s, ctx) |
| require.NoError(t, s.tCache.Update(ctx)) |
| require.Equal(t, 0, len(s.queue)) |
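| // Each of the three forced concurrent updates should have caused a retry. |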
| require.Equal(t, 3, retryCount) |
| tasks, err = s.tCache.GetTasksForCommits(rs1.Repo, commits) |
| require.NoError(t, err) |
| for _, byName := range tasks { |
| for _, task := range byName { |
| require.Equal(t, 1, len(task.Commits)) |
| require.Equal(t, task.Revision, task.Commits[0]) |
| if util.In(task.Id, []string{t1.Id, t2.Id, t3.Id}) { |
| require.Equal(t, types.TASK_STATUS_SUCCESS, task.Status) |
| } else { |
| require.Equal(t, types.TASK_STATUS_PENDING, task.Status) |
| } |
| } |
| } |
| } |
| |
| func TestSchedulingRetry(t *testing.T) { |
| ctx, _, _, swarmingClient, s, _, _, cleanup := setup(t) |
| defer cleanup() |
| |
| c1 := rs1.Revision |
| |
| // Run the available compile task at c2. |
| bot1 := makeBot("bot1", linuxTaskDims) |
| mockBots(t, swarmingClient, bot1) |
| runMainLoop(t, s, ctx) |
| require.NoError(t, s.tCache.Update(ctx)) |
| tasks, err := s.tCache.UnfinishedTasks() |
| require.NoError(t, err) |
| require.Len(t, tasks, 1) |
| t1 := tasks[0] |
| require.NotNil(t, t1) |
| // Ensure c2, not c1. |
| require.NotEqual(t, c1, t1.Revision) |
| c2 := t1.Revision |
| |
| // Forcibly add a second build task at c1. |
| t2 := t1.Copy() |
| t2.Id = "t2Id" |
| t2.Revision = c1 |
| t2.Commits = []string{c1} |
| t1.Commits = []string{c2} |
| |
| // One task successful, the other not. |
| t1.Status = types.TASK_STATUS_FAILURE |
| t1.Finished = now.Now(ctx) |
| t2.Status = types.TASK_STATUS_SUCCESS |
| t2.Finished = now.Now(ctx) |
| t2.IsolatedOutput = "abc123" |
| |
| require.NoError(t, s.putTasks(ctx, []*types.Task{t1, t2})) |
| |
| // Cycle. Ensure that we schedule a retry of t1. |
| prev := t1 |
| i := 1 |
| for { |
| runMainLoop(t, s, ctx) |
| require.NoError(t, s.tCache.Update(ctx)) |
| tasks, err = s.tCache.UnfinishedTasks() |
| require.NoError(t, err) |
| if len(tasks) == 0 { |
| break |
| } |
| require.Len(t, tasks, 1) |
| retry := tasks[0] |
| require.NotNil(t, retry) |
| require.Equal(t, prev.Id, retry.RetryOf) |
| require.Equal(t, i, retry.Attempt) |
| require.Equal(t, c2, retry.Revision) |
| retry.Status = types.TASK_STATUS_FAILURE |
| retry.Finished = now.Now(ctx) |
| require.NoError(t, s.putTask(ctx, retry)) |
| |
| prev = retry |
| i++ |
| } |
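| // The scheduler made four retries (attempts 1-4) before giving up. |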
| require.Equal(t, 5, i) |
| } |
| |
| func TestParentTaskId(t *testing.T) { |
| ctx, _, _, swarmingClient, s, _, cas, cleanup := setup(t) |
| defer cleanup() |
| |
| // Run the available compile task at c2. |
| bot1 := makeBot("bot1", linuxTaskDims) |
| mockBots(t, swarmingClient, bot1) |
| runMainLoop(t, s, ctx) |
| require.NoError(t, s.tCache.Update(ctx)) |
| tasks, err := s.tCache.UnfinishedTasks() |
| require.NoError(t, err) |
| require.Len(t, tasks, 1) |
| t1 := tasks[0] |
| t1.Status = types.TASK_STATUS_SUCCESS |
| t1.Finished = now.Now(ctx) |
| t1.IsolatedOutput = "abc123/45" |
| require.Equal(t, 0, len(t1.ParentTaskIds)) |
| require.NoError(t, s.putTasks(ctx, []*types.Task{t1})) |
| |
| // Run the dependent tasks. Ensure that their parent IDs are correct. |
| bot3 := makeBot("bot3", androidTaskDims) |
| bot4 := makeBot("bot4", androidTaskDims) |
| mockBots(t, swarmingClient, bot3, bot4) |
| cas.On("Merge", testutils.AnyContext, []string{"bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb/42", "abc123/45"}).Return("bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbabc123/87", nil) |
| cas.On("Merge", testutils.AnyContext, []string{"cccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccc/42", "abc123/45"}).Return("ccccccccccccccccccccccccccccccccccccccccccccccccccccccccccabc123/87", nil) |
| runMainLoop(t, s, ctx) |
| require.NoError(t, s.tCache.Update(ctx)) |
| tasks, err = s.tCache.UnfinishedTasks() |
| require.NoError(t, err) |
| require.Len(t, tasks, 2) |
| for _, task := range tasks { |
| require.Equal(t, 1, len(task.ParentTaskIds)) |
| p := task.ParentTaskIds[0] |
| require.Equal(t, p, t1.Id) |
| |
| updated, err := task.UpdateFromTaskResult(&types.TaskResult{ |
| ID: task.SwarmingTaskId, |
| CasOutput: task.IsolatedOutput, |
| Created: task.Created, |
| Finished: task.Finished, |
| Started: task.Started, |
| Status: task.Status, |
| Tags: map[string][]string{ |
| types.SWARMING_TAG_ID: {task.Id}, |
| types.SWARMING_TAG_NAME: {task.Name}, |
| types.SWARMING_TAG_REPO: {task.Repo}, |
| types.SWARMING_TAG_REVISION: {task.Revision}, |
| types.SWARMING_TAG_PARENT_TASK_ID: task.ParentTaskIds, |
| types.SWARMING_TAG_FORCED_JOB_ID: {task.ForcedJobId}, |
| }, |
| MachineID: task.SwarmingBotId, |
| }) |
| require.NoError(t, err) |
| require.False(t, updated) |
| } |
| } |
| |
| func TestSkipTasks(t *testing.T) { |
| // skip_tasks has its own tests, so this test just verifies that it's |
| // actually integrated into the scheduler. |
| ctx, _, _, swarmingClient, s, _, _, cleanup := setup(t) |
| defer cleanup() |
| c, cleanupfs := ftestutils.NewClientForTesting(context.Background(), t) |
| defer cleanupfs() |
| bl, err := skip_tasks.New(context.Background(), c) |
| require.NoError(t, err) |
| s.skipTasks = bl |
| |
| c1 := rs1.Revision |
| |
| // Mock some bots, add one of the build tasks to the skip_tasks list. |
| bot1 := makeBot("bot1", linuxTaskDims) |
| bot2 := makeBot("bot2", linuxTaskDims) |
| mockBots(t, swarmingClient, bot1, bot2) |
| require.NoError(t, s.GetSkipTasks().AddRule(ctx, &skip_tasks.Rule{ |
| AddedBy: "Tests", |
| TaskSpecPatterns: []string{".*"}, |
| Commits: []string{c1}, |
| Description: "desc", |
| Name: "My-Rule", |
| }, s.repos)) |
| runMainLoop(t, s, ctx) |
| require.NoError(t, s.tCache.Update(ctx)) |
| tasks, err := s.tCache.UnfinishedTasks() |
| require.NoError(t, err) |
| // The skipped commit should not have been triggered. |
| require.Len(t, tasks, 1) |
| require.NotEqual(t, c1, tasks[0].Revision) |
| // Candidate diagnostics should indicate the skip rule. |
| diag := lastDiagnostics(t, s) |
| foundSkipped := 0 |
| for _, c := range diag.Candidates { |
| if c.Revision == c1 { |
| foundSkipped++ |
| require.Equal(t, "My-Rule", c.Diagnostics.Filtering.SkippedByRule) |
| } else if c.TaskKey == tasks[0].TaskKey { |
| require.Nil(t, c.Diagnostics.Filtering) |
| } else { |
| require.Equal(t, "", c.Diagnostics.Filtering.SkippedByRule) |
| require.True(t, len(c.Diagnostics.Filtering.UnmetDependencies) > 0) |
| } |
| } |
| // Should be one Build task and one Test task skipped. |
| require.Equal(t, 2, foundSkipped) |
| } |
| |
| func TestGetTasksForJob(t *testing.T) { |
| ctx, _, _, swarmingClient, s, _, cas, cleanup := setup(t) |
| defer cleanup() |
| |
| c1 := rs1.Revision |
| c2 := rs2.Revision |
| |
| // Cycle once, check that we have empty sets for all Jobs. |
| runMainLoop(t, s, ctx) |
| jobs, err := s.jCache.UnfinishedJobs() |
| require.NoError(t, err) |
| require.Len(t, jobs, 5) |
| var j1, j2, j3, j4, j5 *types.Job |
| for _, j := range jobs { |
| if j.Revision == c1 { |
| if j.Name == tcc_testutils.BuildTaskName { |
| j1 = j |
| } else { |
| j2 = j |
| } |
| } else { |
| if j.Name == tcc_testutils.BuildTaskName { |
| j3 = j |
| } else if j.Name == tcc_testutils.TestTaskName { |
| j4 = j |
| } else { |
| j5 = j |
| } |
| } |
| tasksByName, err := s.getTasksForJob(j) |
| require.NoError(t, err) |
| for _, tasks := range tasksByName { |
| require.Empty(t, tasks) |
| } |
| } |
| require.NotNil(t, j1) |
| require.NotNil(t, j2) |
| require.NotNil(t, j3) |
| require.NotNil(t, j4) |
| require.NotNil(t, j5) |
| |
| // Run the available compile task at c2. |
| bot1 := makeBot("bot1", linuxTaskDims) |
| mockBots(t, swarmingClient, bot1) |
| runMainLoop(t, s, ctx) |
| require.NoError(t, s.tCache.Update(ctx)) |
| tasks, err := s.tCache.UnfinishedTasks() |
| require.NoError(t, err) |
| require.Len(t, tasks, 1) |
| t1 := tasks[0] |
| require.NotNil(t, t1) |
| require.Equal(t, t1.Revision, c2) |
| |
| // Test that we get the new tasks where applicable. |
| expect := map[string]map[string][]*types.Task{ |
| j1.Id: { |
| tcc_testutils.BuildTaskName: {}, |
| }, |
| j2.Id: { |
| tcc_testutils.BuildTaskName: {}, |
| tcc_testutils.TestTaskName: {}, |
| }, |
| j3.Id: { |
| tcc_testutils.BuildTaskName: {t1}, |
| }, |
| j4.Id: { |
| tcc_testutils.BuildTaskName: {t1}, |
| tcc_testutils.TestTaskName: {}, |
| }, |
| j5.Id: { |
| tcc_testutils.BuildTaskName: {t1}, |
| tcc_testutils.PerfTaskName: {}, |
| }, |
| } |
| for _, j := range jobs { |
| tasksByName, err := s.getTasksForJob(j) |
| require.NoError(t, err) |
| assertdeep.Equal(t, expect[j.Id], tasksByName) |
| } |
| |
| // Mark the task as failed. |
| t1.Status = types.TASK_STATUS_FAILURE |
| t1.Finished = now.Now(ctx) |
| require.NoError(t, s.putTasks(ctx, []*types.Task{t1})) |
| |
| // Test that the results propagated through. |
| for _, j := range jobs { |
| tasksByName, err := s.getTasksForJob(j) |
| require.NoError(t, err) |
| assertdeep.Equal(t, expect[j.Id], tasksByName) |
| } |
| |
| // Cycle. Ensure that we schedule a retry of t1. |
| // Need two bots, since the retry will score lower than the Build task at c1. |
| bot2 := makeBot("bot2", linuxTaskDims) |
| mockBots(t, swarmingClient, bot1, bot2) |
| runMainLoop(t, s, ctx) |
| require.NoError(t, s.tCache.Update(ctx)) |
| tasks, err = s.tCache.UnfinishedTasks() |
| require.NoError(t, err) |
| require.Len(t, tasks, 2) |
| var t2, t3 *types.Task |
| for _, task := range tasks { |
| if task.TaskKey == t1.TaskKey { |
| t2 = task |
| } else { |
| t3 = task |
| } |
| } |
| require.NotNil(t, t2) |
| require.Equal(t, t1.Id, t2.RetryOf) |
| |
| // Verify that both the original t1 and its retry show up. |
| t1, err = s.tCache.GetTask(t1.Id) // t1 was updated. |
| require.NoError(t, err) |
| expect[j1.Id][tcc_testutils.BuildTaskName] = []*types.Task{t3} |
| expect[j2.Id][tcc_testutils.BuildTaskName] = []*types.Task{t3} |
| expect[j3.Id][tcc_testutils.BuildTaskName] = []*types.Task{t1, t2} |
| expect[j4.Id][tcc_testutils.BuildTaskName] = []*types.Task{t1, t2} |
| expect[j5.Id][tcc_testutils.BuildTaskName] = []*types.Task{t1, t2} |
| for _, j := range jobs { |
| tasksByName, err := s.getTasksForJob(j) |
| require.NoError(t, err) |
| assertdeep.Equal(t, expect[j.Id], tasksByName) |
| } |
| |
| // The retry succeeded. |
| t2.Status = types.TASK_STATUS_SUCCESS |
| t2.Finished = now.Now(ctx) |
| t2.IsolatedOutput = "abc123/45" |
| // The Build at c1 failed. |
| t3.Status = types.TASK_STATUS_FAILURE |
| t3.Finished = now.Now(ctx) |
| require.NoError(t, s.putTasks(ctx, []*types.Task{t2, t3})) |
| mockBots(t, swarmingClient) |
| runMainLoop(t, s, ctx) |
| require.NoError(t, s.tCache.Update(ctx)) |
| tasks, err = s.tCache.UnfinishedTasks() |
| require.NoError(t, err) |
| require.Empty(t, tasks) |
| |
| // Test that the results propagated through. |
| for _, j := range jobs { |
| tasksByName, err := s.getTasksForJob(j) |
| require.NoError(t, err) |
| assertdeep.Equal(t, expect[j.Id], tasksByName) |
| } |
| |
| // Schedule the remaining tasks. |
| bot3 := makeBot("bot3", androidTaskDims) |
| bot4 := makeBot("bot4", androidTaskDims) |
| bot5 := makeBot("bot5", androidTaskDims) |
| mockBots(t, swarmingClient, bot3, bot4, bot5) |
| cas.On("Merge", testutils.AnyContext, []string{"bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb/42", "abc123/45"}).Return("bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbabc123/87", nil) |
| cas.On("Merge", testutils.AnyContext, []string{"cccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccc/42", "abc123/45"}).Return("ccccccccccccccccccccccccccccccccccccccccccccccccccccccccccabc123/87", nil) |
| runMainLoop(t, s, ctx) |
| require.NoError(t, s.tCache.Update(ctx)) |
| |
| // Verify that the new tasks show up. |
| tasks, err = s.tCache.UnfinishedTasks() |
| require.NoError(t, err) |
| require.Len(t, tasks, 2) // Test and perf at c2. |
| var t4, t5 *types.Task |
| for _, task := range tasks { |
| if task.Name == tcc_testutils.TestTaskName { |
| t4 = task |
| } else { |
| t5 = task |
| } |
| } |
| expect[j4.Id][tcc_testutils.TestTaskName] = []*types.Task{t4} |
| expect[j5.Id][tcc_testutils.PerfTaskName] = []*types.Task{t5} |
| for _, j := range jobs { |
| tasksByName, err := s.getTasksForJob(j) |
| require.NoError(t, err) |
| assertdeep.Equal(t, expect[j.Id], tasksByName) |
| } |
| } |
| |
| func TestTaskTimeouts(t *testing.T) { |
| ctx, mg, _, swarmingClient, s, _, _, cleanup := setup(t) |
| defer cleanup() |
| |
| // The test repo does not set any timeouts. Ensure that we get |
| // reasonable default values. |
| bot1 := makeBot("bot1", map[string]string{"pool": "Skia", "os": "Ubuntu", "gpu": "none"}) |
| mockBots(t, swarmingClient, bot1) |
| runMainLoop(t, s, ctx) |
| require.NoError(t, s.tCache.Update(ctx)) |
| unfinished, err := s.tCache.UnfinishedTasks() |
| require.NoError(t, err) |
| require.Len(t, unfinished, 1) |
| task := unfinished[0] |
| swarmingTask, err := swarmingClient.GetTaskMetadata(ctx, task.SwarmingTaskId) |
| require.NoError(t, err) |
| // These are the defaults in go/swarming/swarming.go. |
| require.Equal(t, 1, len(swarmingTask.Request.TaskSlices)) |
| require.Equal(t, int64(60*60), swarmingTask.Request.TaskSlices[0].Properties.ExecutionTimeoutSecs) |
| require.Equal(t, int64(20*60), swarmingTask.Request.TaskSlices[0].Properties.IoTimeoutSecs) |
| require.Equal(t, int64(4*60*60), swarmingTask.Request.TaskSlices[0].ExpirationSecs) |
| // Fail the task to get it out of the unfinished list. |
| task.Status = types.TASK_STATUS_FAILURE |
| require.NoError(t, s.putTask(ctx, task)) |
| |
| // Add a new commit whose TasksCfg sets explicit timeouts. |
| name := "Timeout-Task" |
| cfg := &specs.TasksCfg{ |
| CasSpecs: map[string]*specs.CasSpec{ |
| "compile": { |
| Digest: tcc_testutils.CompileCASDigest, |
| }, |
| }, |
| Jobs: map[string]*specs.JobSpec{ |
| "Timeout-Job": { |
| Priority: 1.0, |
| TaskSpecs: []string{name}, |
| }, |
| }, |
| Tasks: map[string]*specs.TaskSpec{ |
| name: { |
| CasSpec: "compile", |
| CipdPackages: []*specs.CipdPackage{}, |
| Dependencies: []string{}, |
| Dimensions: []string{ |
| "pool:Skia", |
| "os:Mac", |
| "gpu:my-gpu", |
| }, |
| ExecutionTimeout: 40 * time.Minute, |
| Expiration: 2 * time.Hour, |
| IoTimeout: 3 * time.Minute, |
| Priority: 1.0, |
| }, |
| }, |
| } |
| hashes := mg.CommitN(1) |
| require.NoError(t, s.repos.Update(ctx)) |
| rs := types.RepoState{ |
| Repo: rs1.Repo, |
| Revision: hashes[0], |
| } |
| fillCaches(t, ctx, s.taskCfgCache, rs, cfg) |
| insertJobs(t, ctx, s, rs) |
| |
| // Cycle, ensure that we get the expected timeouts. |
| bot2 := makeBot("bot2", map[string]string{"pool": "Skia", "os": "Mac", "gpu": "my-gpu"}) |
| mockBots(t, swarmingClient, bot2) |
| runMainLoop(t, s, ctx) |
| require.NoError(t, s.tCache.Update(ctx)) |
| unfinished, err = s.tCache.UnfinishedTasks() |
| require.NoError(t, err) |
| require.Len(t, unfinished, 1) |
| task = unfinished[0] |
| require.Equal(t, name, task.Name) |
| swarmingTask, err = swarmingClient.GetTaskMetadata(ctx, task.SwarmingTaskId) |
| require.NoError(t, err) |
| require.Equal(t, 1, len(swarmingTask.Request.TaskSlices)) |
| require.Equal(t, int64(40*60), swarmingTask.Request.TaskSlices[0].Properties.ExecutionTimeoutSecs) |
| require.Equal(t, int64(3*60), swarmingTask.Request.TaskSlices[0].Properties.IoTimeoutSecs) |
| require.Equal(t, int64(2*60*60), swarmingTask.Request.TaskSlices[0].ExpirationSecs) |
| } |
| |
| func TestUpdateUnfinishedTasks(t *testing.T) { |
| ctx, _, _, swarmingClient, s, _, _, cleanup := setup(t) |
| defer cleanup() |
| |
| // Create a few tasks. |
| ts := time.Unix(1480683321, 0).UTC() |
| t1 := &types.Task{ |
| Id: "t1", |
| Created: ts.Add(-time.Minute), |
| Status: types.TASK_STATUS_RUNNING, |
| SwarmingTaskId: "swarmt1", |
| } |
| t2 := &types.Task{ |
| Id: "t2", |
| Created: ts.Add(-10 * time.Minute), |
| Status: types.TASK_STATUS_PENDING, |
| SwarmingTaskId: "swarmt2", |
| } |
| t3 := &types.Task{ |
| Id: "t3", |
| Created: ts.Add(-5 * time.Hour), // Outside the 4-hour window. |
| Status: types.TASK_STATUS_PENDING, |
| SwarmingTaskId: "swarmt3", |
| } |
| // Include a task with no Swarming ID to ensure that it's ignored. |
| t4 := &types.Task{ |
| Id: "t4", |
| Created: ts.Add(-time.Minute), |
| Status: types.TASK_STATUS_PENDING, |
| } |
| |
| // Insert the tasks into the DB. |
| tasks := []*types.Task{t1, t2, t3, t4} |
| require.NoError(t, s.putTasks(ctx, tasks)) |
| |
| // Update the tasks, mock in Swarming. |
| t1.Status = types.TASK_STATUS_SUCCESS |
| t1.IsolatedOutput = "dddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddd/86" |
| t2.Status = types.TASK_STATUS_FAILURE |
| t3.Status = types.TASK_STATUS_SUCCESS |
| t3.IsolatedOutput = "ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff/256" |
| |
| m1 := makeSwarmingRpcsTaskRequestMetadata(t, t1, linuxTaskDims) |
| m2 := makeSwarmingRpcsTaskRequestMetadata(t, t2, linuxTaskDims) |
| m3 := makeSwarmingRpcsTaskRequestMetadata(t, t3, linuxTaskDims) |
| swarmingClient.MockTasks([]*swarming_api.SwarmingRpcsTaskRequestMetadata{m1, m2, m3}) |
| |
| // Assert that the third task doesn't show up in the time range query. |
| got, err := swarmingClient.ListTasks(ctx, ts.Add(-4*time.Hour), ts, []string{"pool:Skia"}, "") |
| require.NoError(t, err) |
| assertdeep.Equal(t, []*swarming_api.SwarmingRpcsTaskRequestMetadata{m1, m2}, got) |
| |
| // Ensure that we update the tasks as expected. |
| require.NoError(t, s.updateUnfinishedTasks(ctx)) |
| for _, task := range tasks { |
| got, err := s.db.GetTaskById(ctx, task.Id) |
| require.NoError(t, err) |
| // Ignore DbModified when comparing. |
| task.DbModified = got.DbModified |
| assertdeep.Equal(t, task, got) |
| } |
| } |
| |
| // setupAddTasksTest calls setup, then adds 7 commits to the repo and returns |
| // the hashes of all commits on the main branch. |
| func setupAddTasksTest(t *testing.T) (context.Context, *mem_git.MemGit, []string, *memory.InMemoryDB, *TaskScheduler, func()) { |
| ctx, mg, d, _, s, _, _, cleanup := setup(t) |
| |
| // Add some commits to test blamelist calculation. |
| mg.CommitN(7) |
| require.NoError(t, s.repos.Update(ctx)) |
| hashes, err := s.repos[rs1.Repo].Get(git.MainBranch).AllCommits() |
| require.NoError(t, err) |
| for _, hash := range hashes { |
| require.NoError(t, s.taskCfgCache.Set(ctx, types.RepoState{ |
| Repo: rs1.Repo, |
| Revision: hash, |
| }, &specs.TasksCfg{ |
| Tasks: map[string]*specs.TaskSpec{ |
| "duty": {}, |
| "onus": {}, |
| "toil": {}, |
| "work": {}, |
| }, |
| }, nil)) |
| } |
| |
| return ctx, mg, hashes, d, s, cleanup |
| } |
| |
| // assertBlamelist asserts that task.Commits contains exactly the entries of |
| // hashes at the given indexes, in any order. |
| func assertBlamelist(t *testing.T, hashes []string, task *types.Task, indexes []int) { |
| expected := util.NewStringSet() |
| for _, idx := range indexes { |
| expected[hashes[idx]] = true |
| } |
| require.Equal(t, expected, util.NewStringSet(task.Commits)) |
| } |
| |
| // assertModifiedTasks asserts that the tasks received on the mod channel are |
| // deep-equal to expected, in any order. |
| func assertModifiedTasks(t *testing.T, d db.TaskReader, mod <-chan []*types.Task, expected []*types.Task) { |
| tasksById := map[string]*types.Task{} |
| require.Eventually(t, func() bool { |
| // Use a non-blocking receive so that the test fails after 10 seconds |
| // rather than hanging until the overall test timeout. |
| select { |
| case modTasks := <-mod: |
| for _, task := range modTasks { |
| tasksById[task.Id] = task |
| } |
| for _, expectedTask := range expected { |
| actualTask, ok := tasksById[expectedTask.Id] |
| if !ok { |
| return false |
| } |
| if !deepequal.DeepEqual(expectedTask, actualTask) { |
| return false |
| } |
| } |
| return true |
| default: |
| // Nothing to do. |
| } |
| return false |
| }, 10*time.Second, 50*time.Millisecond) |
| } |
| |
| // addTasksSingleTaskSpec should add tasks and compute simple blamelists. |
| func TestAddTasksSingleTaskSpecSimple(t *testing.T) { |
| ctx, _, hashes, d, s, cleanup := setupAddTasksTest(t) |
| defer cleanup() |
| |
| t1 := makeTask(ctx, "toil", rs1.Repo, hashes[6]) |
| require.NoError(t, s.putTask(ctx, t1)) |
| |
| mod := d.ModifiedTasksCh(ctx) |
| <-mod // The first batch is unused. |
| |
| t2 := makeTask(ctx, "toil", rs1.Repo, hashes[5]) // Commits should be {5} |
| t3 := makeTask(ctx, "toil", rs1.Repo, hashes[3]) // Commits should be {3, 4} |
| t4 := makeTask(ctx, "toil", rs1.Repo, hashes[2]) // Commits should be {2} |
| t5 := makeTask(ctx, "toil", rs1.Repo, hashes[0]) // Commits should be {0, 1} |
| |
| // Clear Commits on some tasks and set incorrect Commits on others to |
| // ensure that the provided values are ignored. |
| t3.Commits = nil |
| t4.Commits = []string{hashes[5], hashes[4], hashes[3], hashes[2]} |
| sort.Strings(t4.Commits) |
| |
| // Specify the tasks in the wrong order to ensure that the results are deterministic. |
| require.NoError(t, s.addTasksSingleTaskSpec(ctx, []*types.Task{t5, t2, t3, t4})) |
| |
| assertBlamelist(t, hashes, t2, []int{5}) |
| assertBlamelist(t, hashes, t3, []int{3, 4}) |
| assertBlamelist(t, hashes, t4, []int{2}) |
| assertBlamelist(t, hashes, t5, []int{0, 1}) |
| |
| // Check that the tasks were inserted into the DB. |
| assertModifiedTasks(t, d, mod, []*types.Task{t2, t3, t4, t5}) |
| } |
| |
| // addTasksSingleTaskSpec should compute blamelists when new tasks bisect each |
| // other. |
| func TestAddTasksSingleTaskSpecBisectNew(t *testing.T) { |
| ctx, _, hashes, d, s, cleanup := setupAddTasksTest(t) |
| defer cleanup() |
| |
| t1 := makeTask(ctx, "toil", rs1.Repo, hashes[6]) |
| require.NoError(t, s.putTask(ctx, t1)) |
| |
| mod := d.ModifiedTasksCh(ctx) |
| <-mod // The first batch is unused. |
| |
| // t2.Commits = {1, 2, 3, 4, 5} |
| t2 := makeTask(ctx, "toil", rs1.Repo, hashes[1]) |
| // t3.Commits = {3, 4, 5} |
| // t2.Commits = {1, 2} |
| t3 := makeTask(ctx, "toil", rs1.Repo, hashes[3]) |
| // t4.Commits = {4, 5} |
| // t3.Commits = {3} |
| t4 := makeTask(ctx, "toil", rs1.Repo, hashes[4]) |
| // t5.Commits = {0} |
| t5 := makeTask(ctx, "toil", rs1.Repo, hashes[0]) |
| // t6.Commits = {2} |
| // t2.Commits = {1} |
| t6 := makeTask(ctx, "toil", rs1.Repo, hashes[2]) |
| // t7.Commits = {1} |
| // t2.Commits = {} |
| t7 := makeTask(ctx, "toil", rs1.Repo, hashes[1]) |
| |
| // Specify the tasks in the wrong order to ensure that the results are deterministic. |
| tasks := []*types.Task{t5, t2, t7, t3, t6, t4} |
| |
| // Assign Ids. |
| for _, task := range tasks { |
| require.NoError(t, d.AssignId(ctx, task)) |
| } |
| |
| require.NoError(t, s.addTasksSingleTaskSpec(ctx, tasks)) |
| |
| assertBlamelist(t, hashes, t2, []int{}) |
| assertBlamelist(t, hashes, t3, []int{3}) |
| assertBlamelist(t, hashes, t4, []int{4, 5}) |
| assertBlamelist(t, hashes, t5, []int{0}) |
| assertBlamelist(t, hashes, t6, []int{2}) |
| assertBlamelist(t, hashes, t7, []int{1}) |
| |
| // Check that the tasks were inserted into the DB. |
| assertModifiedTasks(t, d, mod, tasks) |
| } |
| |
| // addTasksSingleTaskSpec should compute blamelists when new tasks bisect old |
| // tasks. |
| func TestAddTasksSingleTaskSpecBisectOld(t *testing.T) { |
| ctx, _, hashes, d, s, cleanup := setupAddTasksTest(t) |
| defer cleanup() |
| |
| t1 := makeTask(ctx, "toil", rs1.Repo, hashes[6]) |
| t2 := makeTask(ctx, "toil", rs1.Repo, hashes[1]) |
| t2.Commits = []string{hashes[1], hashes[2], hashes[3], hashes[4], hashes[5]} |
| sort.Strings(t2.Commits) |
| require.NoError(t, s.putTasks(ctx, []*types.Task{t1, t2})) |
| |
| mod := d.ModifiedTasksCh(ctx) |
| <-mod // The first batch is unused. |
| |
| // t3.Commits = {3, 4, 5} |
| // t2.Commits = {1, 2} |
| t3 := makeTask(ctx, "toil", rs1.Repo, hashes[3]) |
| // t4.Commits = {4, 5} |
| // t3.Commits = {3} |
| t4 := makeTask(ctx, "toil", rs1.Repo, hashes[4]) |
| // t5.Commits = {2} |
| // t2.Commits = {1} |
| t5 := makeTask(ctx, "toil", rs1.Repo, hashes[2]) |
| // t6.Commits = {4, 5} |
| // t4.Commits = {} |
| t6 := makeTask(ctx, "toil", rs1.Repo, hashes[4]) |
| |
| // Specify the tasks in the wrong order to ensure that the results are deterministic. |
| require.NoError(t, s.addTasksSingleTaskSpec(ctx, []*types.Task{t5, t3, t6, t4})) |
| |
| t2Updated, err := d.GetTaskById(ctx, t2.Id) |
| require.NoError(t, err) |
| assertBlamelist(t, hashes, t2Updated, []int{1}) |
| assertBlamelist(t, hashes, t3, []int{3}) |
| assertBlamelist(t, hashes, t4, []int{}) |
| assertBlamelist(t, hashes, t5, []int{2}) |
| assertBlamelist(t, hashes, t6, []int{4, 5}) |
| |
| // Check that the tasks were inserted into the DB. |
| t2.Commits = t2Updated.Commits |
| t2.DbModified = t2Updated.DbModified |
| assertModifiedTasks(t, d, mod, []*types.Task{t2, t3, t4, t5, t6}) |
| } |
| |
| // addTasksSingleTaskSpec should update existing tasks, keeping the correct |
| // blamelist. |
| func TestAddTasksSingleTaskSpecUpdate(t *testing.T) { |
| ctx, _, hashes, d, s, cleanup := setupAddTasksTest(t) |
| defer cleanup() |
| |
| t1 := makeTask(ctx, "toil", rs1.Repo, hashes[6]) |
| t2 := makeTask(ctx, "toil", rs1.Repo, hashes[3]) |
| t3 := makeTask(ctx, "toil", rs1.Repo, hashes[4]) |
| t3.Commits = nil // Stolen by t5 |
| t4 := makeTask(ctx, "toil", rs1.Repo, hashes[0]) |
| t4.Commits = []string{hashes[0], hashes[1], hashes[2]} |
| sort.Strings(t4.Commits) |
| t5 := makeTask(ctx, "toil", rs1.Repo, hashes[4]) |
| t5.Commits = []string{hashes[4], hashes[5]} |
| sort.Strings(t5.Commits) |
| |
| tasks := []*types.Task{t1, t2, t3, t4, t5} |
| require.NoError(t, s.putTasks(ctx, tasks)) |
| |
| mod := d.ModifiedTasksCh(ctx) |
| <-mod // The first batch is unused. |
| |
| // Make an update. |
| for _, task := range tasks { |
| task.Status = types.TASK_STATUS_MISHAP |
| } |
| |
| // Specify the tasks in the wrong order to ensure that the results are deterministic. |
| require.NoError(t, s.addTasksSingleTaskSpec(ctx, []*types.Task{t5, t3, t1, t4, t2})) |
| |
| // Check that blamelists did not change. |
| assertBlamelist(t, hashes, t1, []int{6}) |
| assertBlamelist(t, hashes, t2, []int{3}) |
| assertBlamelist(t, hashes, t3, []int{}) |
| assertBlamelist(t, hashes, t4, []int{0, 1, 2}) |
| assertBlamelist(t, hashes, t5, []int{4, 5}) |
| |
| // Check that the tasks were inserted into the DB. |
| assertModifiedTasks(t, d, mod, []*types.Task{t1, t2, t3, t4, t5}) |
| } |
| |
| // AddTasks should call addTasksSingleTaskSpec for each group of tasks. |
| func TestAddTasks(t *testing.T) { |
| ctx, _, hashes, d, s, cleanup := setupAddTasksTest(t) |
| defer cleanup() |
| |
| toil1 := makeTask(ctx, "toil", rs1.Repo, hashes[6]) |
| duty1 := makeTask(ctx, "duty", rs1.Repo, hashes[6]) |
| work1 := makeTask(ctx, "work", rs1.Repo, hashes[6]) |
| work2 := makeTask(ctx, "work", rs1.Repo, hashes[1]) |
| work2.Commits = []string{hashes[1], hashes[2], hashes[3], hashes[4], hashes[5]} |
| sort.Strings(work2.Commits) |
| onus1 := makeTask(ctx, "onus", rs1.Repo, hashes[6]) |
| onus2 := makeTask(ctx, "onus", rs1.Repo, hashes[3]) |
| onus2.Commits = []string{hashes[3], hashes[4], hashes[5]} |
| sort.Strings(onus2.Commits) |
| require.NoError(t, s.putTasks(ctx, []*types.Task{toil1, duty1, work1, work2, onus1, onus2})) |
| |
| mod := d.ModifiedTasksCh(ctx) |
| <-mod // The first batch is unused. |
| |
| // toil2.Commits = {5} |
| toil2 := makeTask(ctx, "toil", rs1.Repo, hashes[5]) |
| // toil3.Commits = {3, 4} |
| toil3 := makeTask(ctx, "toil", rs1.Repo, hashes[3]) |
| |
| // duty2.Commits = {1, 2, 3, 4, 5} |
| duty2 := makeTask(ctx, "duty", rs1.Repo, hashes[1]) |
| // duty3.Commits = {3, 4, 5} |
| // duty2.Commits = {1, 2} |
| duty3 := makeTask(ctx, "duty", rs1.Repo, hashes[3]) |
| |
| // work3.Commits = {3, 4, 5} |
| // work2.Commits = {1, 2} |
| work3 := makeTask(ctx, "work", rs1.Repo, hashes[3]) |
| // work4.Commits = {2} |
| // work2.Commits = {1} |
| work4 := makeTask(ctx, "work", rs1.Repo, hashes[2]) |
| |
| onus2.Status = types.TASK_STATUS_MISHAP |
| // onus3 steals all commits from onus2 |
| onus3 := makeTask(ctx, "onus", rs1.Repo, hashes[3]) |
| // onus4 steals all commits from onus3 |
| onus4 := makeTask(ctx, "onus", rs1.Repo, hashes[3]) |
| |
| tasks := map[string]map[string][]*types.Task{ |
| rs1.Repo: { |
| "toil": {toil2, toil3}, |
| "duty": {duty2, duty3}, |
| "work": {work3, work4}, |
| "onus": {onus2, onus3, onus4}, |
| }, |
| } |
| |
| require.NoError(t, s.addTasks(ctx, tasks)) |
| |
| assertBlamelist(t, hashes, toil2, []int{5}) |
| assertBlamelist(t, hashes, toil3, []int{3, 4}) |
| |
| assertBlamelist(t, hashes, duty2, []int{1, 2}) |
| assertBlamelist(t, hashes, duty3, []int{3, 4, 5}) |
| |
| work2Updated, err := d.GetTaskById(ctx, work2.Id) |
| require.NoError(t, err) |
| assertBlamelist(t, hashes, work2Updated, []int{1}) |
| assertBlamelist(t, hashes, work3, []int{3, 4, 5}) |
| assertBlamelist(t, hashes, work4, []int{2}) |
| |
| assertBlamelist(t, hashes, onus2, []int{}) |
| assertBlamelist(t, hashes, onus3, []int{}) |
| assertBlamelist(t, hashes, onus4, []int{3, 4, 5}) |
| |
| // Check that the tasks were inserted into the DB. |
| work2.Commits = work2Updated.Commits |
| work2.DbModified = work2Updated.DbModified |
| assertModifiedTasks(t, d, mod, []*types.Task{toil2, toil3, duty2, duty3, work2, work3, work4, onus2, onus3, onus4}) |
| } |
| |
| // AddTasks should not leave DB in an inconsistent state if there is a partial error. |
| func TestAddTasksFailure(t *testing.T) { |
| ctx, _, hashes, d, s, cleanup := setupAddTasksTest(t) |
| defer cleanup() |
| |
| toil1 := makeTask(ctx, "toil", rs1.Repo, hashes[6]) |
| duty1 := makeTask(ctx, "duty", rs1.Repo, hashes[6]) |
| duty2 := makeTask(ctx, "duty", rs1.Repo, hashes[5]) |
| require.NoError(t, s.putTasks(ctx, []*types.Task{toil1, duty1, duty2})) |
| d.Wait() |
| |
| // Cause ErrConcurrentUpdate in AddTasks. |
| cachedDuty2 := duty2.Copy() |
| duty2.Status = types.TASK_STATUS_MISHAP |
| require.NoError(t, d.PutTask(ctx, duty2)) |
| d.Wait() |
| |
| mod := d.ModifiedTasksCh(ctx) |
| <-mod // The first batch is unused. |
| |
| // toil2.Commits = {3, 4, 5} |
| toil2 := makeTask(ctx, "toil", rs1.Repo, hashes[3]) |
| |
| cachedDuty2.Status = types.TASK_STATUS_FAILURE |
| // duty3.Commits = {3, 4} |
| duty3 := makeTask(ctx, "duty", rs1.Repo, hashes[3]) |
| |
| tasks := map[string]map[string][]*types.Task{ |
| rs1.Repo: { |
| "toil": {toil2}, |
| "duty": {cachedDuty2, duty3}, |
| }, |
| } |
| |
| // Try multiple times to reduce the chance of the test passing spuriously. |
| for i := 0; i < 3; i++ { |
| err := s.addTasks(ctx, tasks) |
| require.Error(t, err) |
| modTasks := <-mod |
| // "duty" tasks should never be updated. |
| for _, task := range modTasks { |
| require.Equal(t, "toil", task.Name) |
| assertdeep.Equal(t, toil2, task) |
| assertBlamelist(t, hashes, toil2, []int{3, 4, 5}) |
| } |
| } |
| |
| duty2.Status = types.TASK_STATUS_FAILURE |
| tasks[rs1.Repo]["duty"] = []*types.Task{duty2, duty3} |
| require.NoError(t, s.addTasks(ctx, tasks)) |
| |
| assertBlamelist(t, hashes, toil2, []int{3, 4, 5}) |
| |
| assertBlamelist(t, hashes, duty2, []int{5}) |
| assertBlamelist(t, hashes, duty3, []int{3, 4}) |
| |
| // Check that the tasks were inserted into the DB. |
| assertModifiedTasks(t, d, mod, []*types.Task{toil2, duty2, duty3}) |
| } |
| |
| // AddTasks should retry on ErrConcurrentUpdate. |
| func TestAddTasksRetries(t *testing.T) { |
| ctx, _, hashes, d, s, cleanup := setupAddTasksTest(t) |
| defer cleanup() |
| |
| toil1 := makeTask(ctx, "toil", rs1.Repo, hashes[6]) |
| duty1 := makeTask(ctx, "duty", rs1.Repo, hashes[6]) |
| work1 := makeTask(ctx, "work", rs1.Repo, hashes[6]) |
| toil2 := makeTask(ctx, "toil", rs1.Repo, hashes[1]) |
| toil2.Commits = []string{hashes[1], hashes[2], hashes[3], hashes[4], hashes[5]} |
| sort.Strings(toil2.Commits) |
| duty2 := makeTask(ctx, "duty", rs1.Repo, hashes[1]) |
| duty2.Commits = util.CopyStringSlice(toil2.Commits) |
| work2 := makeTask(ctx, "work", rs1.Repo, hashes[1]) |
| work2.Commits = util.CopyStringSlice(toil2.Commits) |
| require.NoError(t, s.putTasks(ctx, []*types.Task{toil1, toil2, duty1, duty2, work1, work2})) |
| |
| mod := d.ModifiedTasksCh(ctx) |
| <-mod // The first batch is unused. |
| |
| // *3.Commits = {3, 4, 5} |
| // *2.Commits = {1, 2} |
| toil3 := makeTask(ctx, "toil", rs1.Repo, hashes[3]) |
| duty3 := makeTask(ctx, "duty", rs1.Repo, hashes[3]) |
| work3 := makeTask(ctx, "work", rs1.Repo, hashes[3]) |
| // *4.Commits = {2} |
| // *2.Commits = {1} |
| toil4 := makeTask(ctx, "toil", rs1.Repo, hashes[2]) |
| duty4 := makeTask(ctx, "duty", rs1.Repo, hashes[2]) |
| work4 := makeTask(ctx, "work", rs1.Repo, hashes[2]) |
| |
| tasks := map[string]map[string][]*types.Task{ |
| rs1.Repo: { |
| "toil": {toil3.Copy(), toil4.Copy()}, |
| "duty": {duty3.Copy(), duty4.Copy()}, |
| "work": {work3.Copy(), work4.Copy()}, |
| }, |
| } |
| |
| var retryCountMtx sync.Mutex |
| retryCount := map[string]int{} |
| causeConcurrentUpdate := func(tasks []*types.Task) { |
| retryCountMtx.Lock() |
| defer retryCountMtx.Unlock() |
| retryCount[tasks[0].Name]++ |
| if tasks[0].Name == "toil" && retryCount["toil"] < 2 { |
| toil2.Started = now.Now(ctx).UTC() |
| require.NoError(t, d.PutTasks(ctx, []*types.Task{toil2})) |
| s.tCache.AddTasks([]*types.Task{toil2}) |
| } |
| if tasks[0].Name == "duty" && retryCount["duty"] < 3 { |
| duty2.Started = now.Now(ctx).UTC() |
| require.NoError(t, d.PutTasks(ctx, []*types.Task{duty2})) |
| s.tCache.AddTasks([]*types.Task{duty2}) |
| } |
| if tasks[0].Name == "work" && retryCount["work"] < 4 { |
| work2.Started = now.Now(ctx).UTC() |
| require.NoError(t, d.PutTasks(ctx, []*types.Task{work2})) |
| s.tCache.AddTasks([]*types.Task{work2}) |
| } |
| } |
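| // Wrap the DB in a spyDB so that each PutTasks call triggers a conflicting |
| // concurrent write, causing ErrConcurrentUpdate and a retry until the |
| // per-task threshold above is reached. |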
| s.db = &spyDB{ |
| DB: d, |
| onPutTasks: causeConcurrentUpdate, |
| } |
| |
| require.NoError(t, s.addTasks(ctx, tasks)) |
| |
| retryCountMtx.Lock() |
| defer retryCountMtx.Unlock() |
| require.Equal(t, 2, retryCount["toil"]) |
| require.Equal(t, 3, retryCount["duty"]) |
| require.Equal(t, 4, retryCount["work"]) |
| |
| modified := []*types.Task{} |
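| // check verifies that the *2 task's blamelist shrank to just commit 1 and |
| // that the *3 and *4 tasks landed in the DB with the expected blamelists, |
| // then syncs the local copies with the DB state for the final assertion. |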
| check := func(t2, t3, t4 *types.Task) { |
| t2InDB, err := d.GetTaskById(ctx, t2.Id) |
| require.NoError(t, err) |
| assertBlamelist(t, hashes, t2InDB, []int{1}) |
| t3Arg := tasks[t3.Repo][t3.Name][0] |
| t3InDB, err := d.GetTaskById(ctx, t3Arg.Id) |
| require.NoError(t, err) |
| assertdeep.Equal(t, t3Arg, t3InDB) |
| assertBlamelist(t, hashes, t3InDB, []int{3, 4, 5}) |
| t4Arg := tasks[t4.Repo][t4.Name][1] |
| t4InDB, err := d.GetTaskById(ctx, t4Arg.Id) |
| require.NoError(t, err) |
| assertdeep.Equal(t, t4Arg, t4InDB) |
| assertBlamelist(t, hashes, t4InDB, []int{2}) |
| t2.Commits = t2InDB.Commits |
| t2.DbModified = t2InDB.DbModified |
| t3.Id = t3InDB.Id |
| t3.Commits = t3InDB.Commits |
| t3.DbModified = t3InDB.DbModified |
| t4.Id = t4InDB.Id |
| t4.Commits = t4InDB.Commits |
| t4.DbModified = t4InDB.DbModified |
| modified = append(modified, t2, t3, t4) |
| } |
| |
| check(toil2, toil3, toil4) |
| check(duty2, duty3, duty4) |
| check(work2, work3, work4) |
| assertModifiedTasks(t, d, mod, modified) |
| } |
| |
| func TestTriggerTaskFailed(t *testing.T) { |
| // Verify that if one task out of a set fails to trigger, the others are |
| // still inserted into the DB and handled properly, e.g. with respect to |
| // blamelists. |
| ctx, _, _, s, swarmingClient, commits, _, _, cleanup := testMultipleCandidatesBackfillingEachOtherSetup(t) |
| defer cleanup() |
| |
| // Trigger three tasks. We should attempt to trigger tasks at |
| // commits[0], commits[4], and either commits[2] or commits[6]. Mock |
| // failure to trigger the task at commits[4] and ensure that the other |
| // two tasks get inserted with the correct blamelists. |
| bot1 := makeBot("bot1", map[string]string{"pool": "Skia"}) |
| bot2 := makeBot("bot2", map[string]string{"pool": "Skia"}) |
| bot3 := makeBot("bot3", map[string]string{"pool": "Skia"}) |
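| // makeTags builds the Swarming tags we expect the scheduler to attach to |
| // the task triggered at the given commit; the mock below uses them to |
| // decide which trigger to fail. |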
| makeTags := func(commit string) []string { |
| return []string{ |
| "luci_project:", |
| "milo_host:https://ci.chromium.org/raw/build/%s", |
| "sk_attempt:0", |
| "sk_dim_pool:Skia", |
| "sk_retry_of:", |
| fmt.Sprintf("source_revision:%s", commit), |
| "source_repo:" + fmt.Sprintf(gitiles.CommitURL, rs1.Repo, "%s"), |
| fmt.Sprintf("sk_repo:%s", rs1.Repo), |
| fmt.Sprintf("sk_revision:%s", commit), |
| "sk_forced_job_id:", |
| "sk_name:faketask", |
| } |
| } |
| mockBots(t, swarmingClient, bot1, bot2, bot3) |
| swarmingClient.MockTriggerTaskFailure(makeTags(commits[4])) |
| err := s.MainLoop(ctx) |
| s.testWaitGroup.Wait() |
| require.Error(t, err) |
| require.True(t, strings.Contains(err.Error(), "Mocked trigger failure!")) |
| require.NoError(t, s.tCache.Update(ctx)) |
| require.Len(t, s.queue, 6) |
| tasks, err := s.tCache.GetTasksForCommits(rs1.Repo, commits) |
| require.NoError(t, err) |
| |
| var t1, t2, t3 *types.Task |
| for _, byName := range tasks { |
| for _, task := range byName { |
| if task.Revision == commits[0] { |
| t1 = task |
| } else if task.Revision == commits[4] { |
| t2 = task |
| } else if task.Revision == commits[2] || task.Revision == commits[6] { |
| t3 = task |
| } else { |
| require.FailNow(t, fmt.Sprintf("Task has unknown revision: %v", task)) |
| } |
| } |
| } |
| require.NotNil(t, t1) |
| require.Nil(t, t2) |
| require.NotNil(t, t3) |
| |
| // Ensure that we got the blamelists right. |
| var expect1, expect3 []string |
| if t3.Revision == commits[2] { |
| expect1 = util.CopyStringSlice(commits[:2]) |
| expect3 = util.CopyStringSlice(commits[2:]) |
| } else { |
| expect1 = util.CopyStringSlice(commits[:6]) |
| expect3 = util.CopyStringSlice(commits[6:]) |
| } |
| sort.Strings(expect1) |
| sort.Strings(expect3) |
| sort.Strings(t1.Commits) |
| sort.Strings(t3.Commits) |
| assertdeep.Equal(t, expect1, t1.Commits) |
| assertdeep.Equal(t, expect3, t3.Commits) |
| |
| // Check diagnostics. |
| diag := lastDiagnostics(t, s) |
| failedTrigger := 0 |
| for _, c := range diag.Candidates { |
| if c.Revision == commits[4] { |
| require.True(t, strings.Contains(c.Diagnostics.Triggering.TriggerError, "Mocked trigger failure!")) |
| failedTrigger++ |
| } else { |
| if c.TaskKey == t1.TaskKey { |
| require.Equal(t, "", c.Diagnostics.Triggering.TriggerError) |
| require.Equal(t, t1.Id, c.Diagnostics.Triggering.TaskId) |
| } else if c.TaskKey == t3.TaskKey { |
| require.Equal(t, "", c.Diagnostics.Triggering.TriggerError) |
| require.Equal(t, t3.Id, c.Diagnostics.Triggering.TaskId) |
| } else { |
| require.Nil(t, c.Diagnostics.Triggering) |
| } |
| } |
| } |
| // Exactly one candidate should have failed to trigger. |
| require.Equal(t, 1, failedTrigger) |
| } |
| |
| const badTaskName = "badtask" |
| |
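| // mockDB wraps a db.DB and injects failures for tasks named badTaskName, |
| // allowing tests to exercise the AssignId and PutTasks error paths. |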
| type mockDB struct { |
| db.DB |
| failAssignId bool |
| failPutTasks bool |
| } |
| |
| func (d *mockDB) AssignId(ctx context.Context, t *types.Task) error { |
| if t.Name == badTaskName && d.failAssignId { |
| return errors.New(badTaskName) |
| } |
| return d.DB.AssignId(ctx, t) |
| } |
| |
| func (d *mockDB) PutTasks(ctx context.Context, tasks []*types.Task) error { |
| for _, t := range tasks { |
| if t.Name == badTaskName && d.failPutTasks { |
| return errors.New(badTaskName) |
| } |
| } |
| return d.DB.PutTasks(ctx, tasks) |
| } |
| |
| func TestContinueOnTriggerTaskFailure(t *testing.T) { |
| // Verify that if one task out of a set fails any part of the triggering |
| // process, the others are still triggered, inserted into the DB, etc. |
| ctx, mg, _, s, swarmingClient, commits, _, cfg, cleanup := testMultipleCandidatesBackfillingEachOtherSetup(t) |
| defer cleanup() |
| |
| // Add several more commits. |
| newCommits := mg.CommitN(10) |
| commits = append(newCommits, commits...) |
| badCommit := commits[0] |
| badPool := "BadPool" |
| s.pools = []string{"Skia", badPool} |
| require.NoError(t, s.repos.Update(ctx)) |
| for _, hash := range newCommits { |
| rs := types.RepoState{ |
| Repo: rs1.Repo, |
| Revision: hash, |
| } |
| c := cfg.Copy() |
| if hash == badCommit { |
| c.Tasks[badTaskName] = &specs.TaskSpec{ |
| CasSpec: "compile", |
| CipdPackages: []*specs.CipdPackage{}, |
| Dependencies: []string{}, |
| Dimensions: []string{fmt.Sprintf("pool:%s", badPool)}, |
| Priority: 1.0, |
| } |
| c.Jobs["badjob"] = &specs.JobSpec{ |
| TaskSpecs: []string{badTaskName}, |
| } |
| } |
| fillCaches(t, ctx, s.taskCfgCache, rs, c) |
| insertJobs(t, ctx, s, rs) |
| } |
| |
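| // finishAll marks every unfinished task as successful so that each |
| // iteration of the test starts from a clean slate. |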
| finishAll := func() { |
| tasks, err := s.tCache.UnfinishedTasks() |
| require.NoError(t, err) |
| for _, task := range tasks { |
| task.Finished = now.Now(ctx) |
| task.Status = types.TASK_STATUS_SUCCESS |
| } |
| require.NoError(t, s.db.PutTasks(ctx, tasks)) |
| } |
| finishAll() // The setup func triggers a task. |
| |
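| // badTags are the Swarming tags we expect for the task at the bad commit; |
| // the failure mocks below match on them. |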
| badTags := []string{ |
| "luci_project:", |
| "milo_host:https://ci.chromium.org/raw/build/%s", |
| "sk_attempt:0", |
| fmt.Sprintf("sk_dim_pool:%s", badPool), |
| "sk_retry_of:", |
| fmt.Sprintf("source_revision:%s", badCommit), |
| "source_repo:" + fmt.Sprintf(gitiles.CommitURL, rs1.Repo, "%s"), |
| fmt.Sprintf("sk_repo:%s", rs1.Repo), |
| fmt.Sprintf("sk_revision:%s", badCommit), |
| "sk_forced_job_id:", |
| fmt.Sprintf("sk_name:%s", badTaskName), |
| } |
| |
| test := func() { |
| // Pretend there are two bots available. |
| bots := []*types.Machine{ |
| makeBot("bot0", map[string]string{"pool": "Skia"}), |
| makeBot("bot1", map[string]string{"pool": badPool}), |
| } |
| mockBots(t, swarmingClient, bots...) |
| |
| // Run MainLoop. |
| err := s.MainLoop(ctx) |
| s.testWaitGroup.Wait() |
| require.Error(t, err) |
| require.NoError(t, s.tCache.Update(ctx)) |
| |
| // We'll try to trigger all tasks but the one for the bad commit will |
| // fail. Ensure that we triggered all of the others. |
| tasks, err := s.tCache.UnfinishedTasks() |
| require.NoError(t, err) |
| require.Len(t, tasks, 1) |
| require.NotEqual(t, badTaskName, tasks[0].Name) |
| |
| // Clean up for the next iteration. |
| finishAll() |
| } |
| |
| // 1. Actual triggering failed. |
| swarmingClient.MockTriggerTaskFailure(badTags) |
| test() |
| |
| // 2. DB.AssignId failed. |
| mdb := &mockDB{ |
| DB: s.db, |
| failAssignId: true, |
| } |
| s.db = mdb |
| test() |
| |
| // 3. DB.PutTasks failed. |
| mdb.failAssignId = false |
| mdb.failPutTasks = true |
| test() |
| |
| // 4. Failure to load details of de-duplicated task after triggering. |
| mdb.failPutTasks = false |
| swarmingClient.MockTriggerTaskDeduped(badTags, "thistagwontparse") |
| test() |
| s.db = mdb.DB |
| |
| // 5. Failure to merge CAS entries. |
| // TODO(borenet) |
| } |
| |
| func TestTriggerTaskDeduped(t *testing.T) { |
| // Verify that we properly handle de-duplicated tasks. |
| ctx, _, _, s, swarmingClient, commits, _, _, cleanup := testMultipleCandidatesBackfillingEachOtherSetup(t) |
| defer cleanup() |
| |
| // Trigger three tasks. We should attempt to trigger tasks at |
| // commits[0], commits[4], and either commits[2] or commits[6]. Mock |
| // deduplication of the task at commits[4] and ensure that the other |
| // two tasks are not deduped. |
| bot1 := makeBot("bot1", map[string]string{"pool": "Skia"}) |
| bot2 := makeBot("bot2", map[string]string{"pool": "Skia"}) |
| bot3 := makeBot("bot3", map[string]string{"pool": "Skia"}) |
| makeTags := func(commit string) []string { |
| return []string{ |
| "luci_project:", |
| "milo_host:https://ci.chromium.org/raw/build/%s", |
| "sk_attempt:0", |
| "sk_dim_pool:Skia", |
| "sk_retry_of:", |
| fmt.Sprintf("source_revision:%s", commit), |
| "source_repo:" + fmt.Sprintf(gitiles.CommitURL, rs1.Repo, "%s"), |
| fmt.Sprintf("sk_repo:%s", rs1.Repo), |
| fmt.Sprintf("sk_revision:%s", commit), |
| "sk_forced_job_id:", |
| "sk_name:faketask", |
| } |
| } |
| mockBots(t, swarmingClient, bot1, bot2, bot3) |
| swarmingClient.MockTriggerTaskDeduped(makeTags(commits[4])) |
| require.NoError(t, s.MainLoop(ctx)) |
| s.testWaitGroup.Wait() |
| require.NoError(t, s.tCache.Update(ctx)) |
| require.Len(t, s.queue, 5) |
| tasks, err := s.tCache.GetTasksForCommits(rs1.Repo, commits) |
| require.NoError(t, err) |
| |
| var t1, t2, t3 *types.Task |
| for _, byName := range tasks { |
| for _, task := range byName { |
| if task.Revision == commits[0] { |
| t1 = task |
| } else if task.Revision == commits[4] { |
| t2 = task |
| } else if task.Revision == commits[2] || task.Revision == commits[6] { |
| t3 = task |
| } else { |
| require.FailNow(t, fmt.Sprintf("Task has unknown revision: %v", task)) |
| } |
| } |
| } |
| require.NotNil(t, t1) |
| require.NotNil(t, t2) |
| require.NotNil(t, t3) |
| |
| // Ensure that t2 was correctly deduped, and the others weren't. |
| require.Equal(t, types.TASK_STATUS_PENDING, t1.Status) |
| require.Equal(t, types.TASK_STATUS_SUCCESS, t2.Status) |
| require.Equal(t, types.TASK_STATUS_PENDING, t3.Status) |
| } |
| |
| func TestTriggerTaskNoResource(t *testing.T) { |
| // Verify that we properly handle tasks which are rejected due to lack |
| // of matching bots. |
| ctx, _, _, s, swarmingClient, commits, _, _, cleanup := testMultipleCandidatesBackfillingEachOtherSetup(t) |
| defer cleanup() |
| |
| // Attempt to trigger a task. It gets rejected because the bot we |
| // thought was available to run it is now busy/quarantined/offline. |
| bot1 := makeBot("bot1", map[string]string{"pool": "Skia"}) |
| makeTags := func(commit string) []string { |
| return []string{ |
| "luci_project:", |
| "milo_host:https://ci.chromium.org/raw/build/%s", |
| "sk_attempt:0", |
| "sk_dim_pool:Skia", |
| "sk_retry_of:", |
| fmt.Sprintf("source_revision:%s", commit), |
| "source_repo:" + fmt.Sprintf(gitiles.CommitURL, rs1.Repo, "%s"), |
| fmt.Sprintf("sk_repo:%s", rs1.Repo), |
| fmt.Sprintf("sk_revision:%s", commit), |
| "sk_forced_job_id:", |
| "sk_name:faketask", |
| } |
| } |
| mockBots(t, swarmingClient, bot1) |
| swarmingClient.MockTriggerTaskNoResource(makeTags(commits[0])) |
| err := s.MainLoop(ctx) |
| require.Error(t, err) |
| require.True(t, strings.Contains(err.Error(), "No bots available to run faketask with dimensions: pool:Skia")) |
| s.testWaitGroup.Wait() |
| require.NoError(t, s.tCache.Update(ctx)) |
| require.Len(t, s.queue, 8) |
| tasks, err := s.tCache.GetTasksForCommits(rs1.Repo, []string{commits[0]}) |
| require.NoError(t, err) |
| require.Empty(t, tasks[commits[0]]) |
| } |
| |
| func TestScoreCandidate_TryJob_PrioritizedHigherByWaitTime(t *testing.T) { |
| ctx := context.Background() |
| |
| test := func(name, jobCreated, now string, expectedScore float64) { |
| t.Run(name, func(t *testing.T) { |
| s := TaskScheduler{} |
| job := types.Job{ |
| Created: rfc3339(t, jobCreated), |
| Priority: specs.DEFAULT_JOB_SPEC_PRIORITY, |
| } |
| tc := asTryJob(TaskCandidate{ |
| Jobs: []*types.Job{&job}, |
| }) |
| s.scoreCandidate(ctx, &tc, rfc3339(t, now), timeDoesNotMatter, nil) |
| assert.InDelta(t, expectedScore, tc.Score, 0.0001) |
| }) |
| } |
| |
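| // The expected scores below are consistent with multiplying the base score |
| // (5 for a single default-priority try job) by (1 + 0.1*hoursWaited). |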
| test("no waiting", "2021-10-01T15:00:00Z", "2021-10-01T15:00:00Z", 5) |
| test("10 min waiting", "2021-10-01T15:00:00Z", "2021-10-01T15:10:00Z", 5.08333) |
| test("30 min waiting", "2021-10-01T08:00:00Z", "2021-10-01T08:30:00Z", 5.25) |
| test("2 hours waiting", "2021-10-01T13:00:00Z", "2021-10-01T15:00:00Z", 6) |
| } |
| |
| func TestScoreCandidate_TryJob_PrioritizedLowerByNumRetries(t *testing.T) { |
| ctx := context.Background() |
| |
| test := func(name string, numRetries int, expectedScore float64) { |
| t.Run(name, func(t *testing.T) { |
| s := TaskScheduler{} |
| ts := rfc3339(t, "2021-10-01T15:00:00Z") // fixed time indicating no waiting |
| job := types.Job{ |
| Created: ts, |
| Priority: specs.DEFAULT_JOB_SPEC_PRIORITY, |
| } |
| tc := asTryJob(TaskCandidate{ |
| Jobs: []*types.Job{&job}, |
| Attempt: numRetries, |
| }) |
| s.scoreCandidate(ctx, &tc, ts, timeDoesNotMatter, nil) |
| assert.InDelta(t, expectedScore, tc.Score, 0.0001) |
| }) |
| } |
| |
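| // The expected scores are consistent with a geometric decay: the base |
| // score of 5 is multiplied by 0.75 for each retry, i.e. 5 * 0.75^numRetries. |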
| test("first try", 0, 5) |
| test("second try", 1, 3.75) |
| test("third try", 2, 2.8125) |
| test("tenth try", 9, 0.3754) |
| } |
| |
| func TestScoreCandidate_TryJob_MultipleJobPrioritiesImpactScore(t *testing.T) { |
| ctx := context.Background() |
| |
| test := func(name string, expectedScore float64, jobPriorities ...float64) { |
| t.Run(name, func(t *testing.T) { |
| s := TaskScheduler{} |
| ts := rfc3339(t, "2021-10-01T15:00:00Z") // fixed time indicating no waiting |
| tc := asTryJob(TaskCandidate{}) |
| // For each job priority passed in, create a job with that priority that's a part |
| // of this task candidate. |
| for _, jp := range jobPriorities { |
| tc.Jobs = append(tc.Jobs, &types.Job{ |
| Created: ts, |
| Priority: jp, |
| }) |
| } |
| s.scoreCandidate(ctx, &tc, ts, timeDoesNotMatter, nil) |
| assert.InDelta(t, expectedScore, tc.Score, 0.0001) |
| }) |
| } |
| |
| test("one job, default priority", 5, specs.DEFAULT_JOB_SPEC_PRIORITY) |
| test("one job, higher priority", 7.5, 0.75) |
| test("one job, highest priority", 10, 1.0) |
| test("one job, lower priority", 2.5, 0.25) |
| test("one job, lowest priority", 0.001, 0.0001) |
| |
| test("out of range priority results in default", 5, 1234567) |
| test("zero priority results in default", 5, 0) |
| |
| // We score tasks higher if they have more jobs. |
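| // The expected values below are consistent with combining job priorities |
| // like independent probabilities: score = 10 * (1 - (1-p1)*(1-p2)*...*(1-pN)). |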
| test("two jobs, medium priority", 7.5, 0.5, 0.5) |
| test("three jobs, medium priority", 8.75, 0.5, 0.5, 0.5) |
| test("four jobs, medium priority", 9.375, 0.5, 0.5, 0.5, 0.5) |
| test("four jobs, mixed priorities", 9.811, 0.1, 0.7, 0.3, 0.9) |
| // Double-check the math for the mixed priorities test |
| assert.InDelta(t, .9811, 1-(1-0.1)*(1-0.7)*(1-0.3)*(1-0.9), 0.0001) |
| } |
| |
| func TestScoreCandidate_TryJob_OldestJobOnlyUsedForWaitTime(t *testing.T) { |
| ctx := context.Background() |
| |
| cycleStart := rfc3339(t, "2021-10-01T15:00:00Z") |
| test := func(name string, expectedScore float64, jobCreated ...string) { |
| t.Run(name, func(t *testing.T) { |
| s := TaskScheduler{} |
| tc := asTryJob(TaskCandidate{}) |
| for _, ts := range jobCreated { |
| tc.Jobs = append(tc.Jobs, &types.Job{ |
| Created: rfc3339(t, ts), |
| Priority: specs.DEFAULT_JOB_SPEC_PRIORITY, |
| }) |
| } |
| s.scoreCandidate(ctx, &tc, cycleStart, timeDoesNotMatter, nil) |
| assert.InDelta(t, expectedScore, tc.Score, 0.0001) |
| }) |
| } |
| |
| // These should all score the same because only the creation time of the |
| // first job (presumed to be the oldest) is used. |
| test("two jobs, both waited 2 hours", 9, "2021-10-01T13:00:00Z", "2021-10-01T13:00:00Z") |
| test("two jobs, only one waited 2 hours", 9, "2021-10-01T13:00:00Z", "2021-10-01T14:55:00Z") |
| } |
| |
| func TestScoreCandidate_ForcedJob_PrioritizedHigherByWaitTime(t *testing.T) { |
| ctx := context.Background() |
| |
| test := func(name, jobCreated, now string, expectedScore float64) { |
| t.Run(name, func(t *testing.T) { |
| s := TaskScheduler{} |
| job := types.Job{ |
| Created: rfc3339(t, jobCreated), |
| Priority: specs.DEFAULT_JOB_SPEC_PRIORITY, |
| } |
| tc := asForcedJob(TaskCandidate{ |
| Jobs: []*types.Job{&job}, |
| }) |
| s.scoreCandidate(ctx, &tc, rfc3339(t, now), timeDoesNotMatter, nil) |
| assert.InDelta(t, expectedScore, tc.Score, 0.0001) |
| }) |
| } |
| |
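| // Forced jobs start from a base score of 50 (10x the try job base); the |
| // expected scores are consistent with multiplying by (1 + 0.01*hoursWaited). |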
| test("no waiting", "2021-10-01T15:00:00Z", "2021-10-01T15:00:00Z", 50) |
| test("10 min waiting", "2021-10-01T15:00:00Z", "2021-10-01T15:10:00Z", 50.08333) |
| test("30 min waiting", "2021-10-01T08:00:00Z", "2021-10-01T08:30:00Z", 50.25) |
| test("2 hours waiting", "2021-10-01T13:00:00Z", "2021-10-01T15:00:00Z", 51) |
| } |
| |
| func TestScoreCandidate_ForcedJob_MultipleJobPrioritiesImpactScore(t *testing.T) { |
| ctx := context.Background() |
| |
| test := func(name string, expectedScore float64, jobPriorities ...float64) { |
| t.Run(name, func(t *testing.T) { |
| s := TaskScheduler{} |
| ts := rfc3339(t, "2021-10-01T15:00:00Z") // fixed time indicating no waiting |
| tc := asForcedJob(TaskCandidate{}) |
| // For each job priority passed in, create a job with that priority that's a part |
| // of this task candidate. |
| for _, jp := range jobPriorities { |
| tc.Jobs = append(tc.Jobs, &types.Job{ |
| Created: ts, |
| Priority: jp, |
| }) |
| } |
| s.scoreCandidate(ctx, &tc, ts, timeDoesNotMatter, nil) |
| assert.InDelta(t, expectedScore, tc.Score, 0.0001) |
| }) |
| } |
| |
| test("one job, default priority", 50, specs.DEFAULT_JOB_SPEC_PRIORITY) |
| test("one job, higher priority", 75, 0.75) |
| test("one job, highest priority", 100, 1.0) |
| test("one job, lower priority", 25, 0.25) |
| test("one job, lowest priority", 0.01, 0.0001) |
| |
| test("out of range priority results in default", 50, 1234567) |
| test("zero priority results in default", 50, 0) |
| |
| // We score tasks higher if they have more jobs. |
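| // As in the try job case but scaled by 100: the expected values are |
| // consistent with score = 100 * (1 - (1-p1)*(1-p2)*...*(1-pN)). |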
| test("two jobs, medium priority", 75, 0.5, 0.5) |
| test("three jobs, medium priority", 87.5, 0.5, 0.5, 0.5) |
| test("four jobs, medium priority", 93.75, 0.5, 0.5, 0.5, 0.5) |
| test("four jobs, mixed priorities", 98.11, 0.1, 0.7, 0.3, 0.9) |
| // Double-check the math for the mixed priorities test |
| assert.InDelta(t, .9811, 1-(1-0.1)*(1-0.7)*(1-0.3)*(1-0.9), 0.0001) |
| } |
| |
| func TestScoreCandidate_ForcedJob_OldestJobOnlyUsedForWaitTime(t *testing.T) { |
| ctx := context.Background() |
| |
| cycleStart := rfc3339(t, "2021-10-01T15:00:00Z") |
| test := func(name string, expectedScore float64, jobCreated ...string) { |
| t.Run(name, func(t *testing.T) { |
| s := TaskScheduler{} |
| tc := asForcedJob(TaskCandidate{}) |
| for _, ts := range jobCreated { |
| tc.Jobs = append(tc.Jobs, &types.Job{ |
| Created: rfc3339(t, ts), |
| Priority: specs.DEFAULT_JOB_SPEC_PRIORITY, |
| }) |
| } |
| s.scoreCandidate(ctx, &tc, cycleStart, timeDoesNotMatter, nil) |
| assert.InDelta(t, expectedScore, tc.Score, 0.0001) |
| }) |
| } |
| |
| // These should all score the same because only the creation time of the |
| // first job (presumed to be the oldest) is used. |
| test("two jobs, both waited 2 hours", 76.5, "2021-10-01T13:00:00Z", "2021-10-01T13:00:00Z") |
| test("two jobs, only one waited 2 hours", 76.5, "2021-10-01T13:00:00Z", "2021-10-01T14:55:00Z") |
| } |
| |
| func TestComputeBlamelist_NoExistingTests(t *testing.T) { |
| ctx := context.Background() |
| |
| const repoName = "some_repo" |
| const taskName = "Test-Foo-Bar" |
| tcg := tcc_mocks.TasksAlwaysDefined(taskName) |
| |
| mg, repo := newMemRepo(t) |
| firstCommit := mg.Commit("a") |
| secondCommit := mg.Commit("b", firstCommit) |
| thirdCommit := mg.Commit("c", secondCommit) |
| |
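| // No task has ever run at any commit. |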
| mtc := &cache_mocks.TaskCache{} |
| mtc.On("GetTaskForCommit", repoName, mock.Anything, taskName).Return(nil, nil) |
| |
| blamelistNoTask := func(name string, revision string, expectedBlamelist []string) { |
| t.Run(name, func(t *testing.T) { |
| blamedCommits, stealFromTask, err := ComputeBlamelist(ctx, mtc, repo, taskName, repoName, repo.Get(revision), commitsBuffer, tcg, window_mocks.AllInclusiveWindow()) |
| require.NoError(t, err) |
| assert.Equal(t, expectedBlamelist, blamedCommits) |
| assert.Nil(t, stealFromTask) |
| }) |
| } |
| // With no existing tasks, each commit's blamelist includes that commit and |
| // all of its ancestors. |
| blamelistNoTask("first commit", firstCommit, []string{firstCommit}) |
| blamelistNoTask("second commit", secondCommit, []string{secondCommit, firstCommit}) |
| blamelistNoTask("third commit", thirdCommit, []string{thirdCommit, secondCommit, firstCommit}) |
| } |
| |
| func TestComputeBlamelist_FirstCommitTested(t *testing.T) { |
| ctx := context.Background() |
| |
| const repoName = "some_repo" |
| const taskName = "Test-Foo-Bar" |
| tcg := tcc_mocks.TasksAlwaysDefined(taskName) |
| |
| mg, repo := newMemRepo(t) |
| firstCommit := mg.Commit("a") |
| secondCommit := mg.Commit("b", firstCommit) |
| thirdCommit := mg.Commit("c", secondCommit) |
| |
| firstCommitTask := newTask("task-1", firstCommit) |
| |
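| // Specific expectations are declared before the mock.Anything catch-all so |
| // that they match first: only firstCommit has an associated task. |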
| mtc := &cache_mocks.TaskCache{} |
| mtc.On("GetTaskForCommit", repoName, firstCommit, taskName).Return(firstCommitTask, nil) |
| mtc.On("GetTaskForCommit", repoName, mock.Anything, taskName).Return(nil, nil) |
| |
| blamelistNoTask := func(name string, revision string, expectedBlamelist []string) { |
| t.Run(name, func(t *testing.T) { |
| blamedCommits, stealFromTask, err := ComputeBlamelist(ctx, mtc, repo, taskName, repoName, repo.Get(revision), commitsBuffer, tcg, window_mocks.AllInclusiveWindow()) |
| require.NoError(t, err) |
| assert.Equal(t, expectedBlamelist, blamedCommits) |
| assert.Nil(t, stealFromTask) |
| }) |
| } |
| // For commits after the task, the blamelist should extend back to, but not |
| // include, the commit the task already covered. |
| blamelistNoTask("second commit", secondCommit, []string{secondCommit}) |
| blamelistNoTask("third commit", thirdCommit, []string{thirdCommit, secondCommit}) |
| |
| // Calculating a blame for a commit which already ran a task is considered a retry. As such, |
| // the entire blamelist is returned. |
| t.Run("first commit (retry)", func(t *testing.T) { |
| blamedCommits, stealFromTask, err := ComputeBlamelist(ctx, mtc, repo, taskName, repoName, repo.Get(firstCommit), commitsBuffer, tcg, window_mocks.AllInclusiveWindow()) |
| require.NoError(t, err) |
| assert.Equal(t, []string{firstCommit}, blamedCommits) |
| assert.Equal(t, firstCommitTask, stealFromTask) |
| }) |
| } |
| |
| func TestComputeBlamelist_LastCommitTested_FollowingCommitsBlamed(t *testing.T) { |
| ctx := context.Background() |
| |
| const repoName = "some_repo" |
| const taskName = "Test-Foo-Bar" |
| tcg := tcc_mocks.TasksAlwaysDefined(taskName) |
| |
| mg, repo := newMemRepo(t) |
| firstCommit := mg.Commit("a") |
| secondCommit := mg.Commit("b", firstCommit) |
| thirdCommit := mg.Commit("c", secondCommit) |
| |
| thirdCommitTask := newTask("task-1", thirdCommit, secondCommit, firstCommit) |
| |
| mtc := &cache_mocks.TaskCache{} |
| // By returning this task for all commits, we are saying that every commit is covered by |
| // the task that ran on the last commit. |
| mtc.On("GetTaskForCommit", repoName, mock.Anything, taskName).Return(thirdCommitTask, nil) |
| |
| blamelistAndTask := func(name string, revision string, expectedBlamelist []string, expectedTask *types.Task) { |
| t.Run(name, func(t *testing.T) { |
| blamedCommits, stealFromTask, err := ComputeBlamelist(ctx, mtc, repo, taskName, repoName, repo.Get(revision), commitsBuffer, tcg, window_mocks.AllInclusiveWindow()) |
| require.NoError(t, err) |
| assert.Equal(t, expectedBlamelist, blamedCommits) |
| assert.Equal(t, expectedTask, stealFromTask) |
| }) |
| } |
| |
| blamelistAndTask("first commit", firstCommit, []string{firstCommit}, thirdCommitTask) |
| blamelistAndTask("second commit", secondCommit, []string{secondCommit, firstCommit}, thirdCommitTask) |
| // Calculating a blame for a commit which already ran a task is considered a retry. As such, |
| // the entire blamelist is returned. |
| blamelistAndTask("third commit (retry)", thirdCommit, []string{thirdCommit, secondCommit, firstCommit}, thirdCommitTask) |
| } |
| |
| func TestComputeBlamelist_BlamelistTooLong_UseProvidedCommit(t *testing.T) { |
| ctx := context.Background() |
| |
| const repoName = "some_repo" |
| const taskName = "Test-Foo-Bar" |
| tcg := tcc_mocks.TasksAlwaysDefined(taskName) |
| |
| // Pretend we have 1000 commits in a row, which is more than |
| // MAX_BLAMELIST_COMMITS. MemGit generates the hashes; the commit subjects |
| // ("0", then "Commit_1" through "Commit_999") are unique and predictable. |
| require.Greater(t, 1000, MAX_BLAMELIST_COMMITS) |
| mg, repo := newMemRepo(t) |
| mg.Commit("0") |
| for i := 1; i < 1000; i++ { |
| h := fmt.Sprintf("Commit_%d", i) |
| mg.Commit(h) |
| } |
| lastCommit := repo.Get(repo.Branches()[0]) |
| require.Equal(t, "Commit_999", lastCommit.Subject) |
| |
| mtc := &cache_mocks.TaskCache{} |
| mtc.On("GetTaskForCommit", repoName, mock.Anything, taskName).Return(nil, nil) |
| |
| blamedCommits, stealFromTask, err := ComputeBlamelist(ctx, mtc, repo, taskName, repoName, lastCommit, commitsBuffer, tcg, window_mocks.AllInclusiveWindow()) |
| require.NoError(t, err) |
| assert.Equal(t, []string{lastCommit.Hash}, blamedCommits) |
| assert.Nil(t, stealFromTask) |
| } |
| |
| func TestComputeBlamelist_BranchInHistory_NonOverlappingCoverage(t *testing.T) { |
| ctx := context.Background() |
| |
| const repoName = "some_repo" |
| const taskName = "Test-Foo-Bar" |
| tcg := tcc_mocks.TasksAlwaysDefined(taskName) |
| |
| // This represents a git history with a branch and a merge, like so. |
| // The asterisks mark the commits at which a task has run. Only one of the |
| // branches has a task, so commit 2 is not "double covered". |
| // |
| //      0 |
| //      1* |
| //      2 |
| //     / \ |
| //   3a   3b |
| //   4a*  4b |
| //   5a   | |
| //     \ / |
| //      6 |
| mg, repo := newMemRepo(t) |
| zero := mg.Commit("0") |
| one := mg.Commit("1", zero) |
| two := mg.Commit("2", one) |
| threeA := mg.Commit("3a", two) |
| threeB := mg.Commit("3b", two) |
| fourA := mg.Commit("4a", threeA) |
| fourB := mg.Commit("4b", threeB) |
| fiveA := mg.Commit("5a", fourA) |
| six := mg.Commit("6", fourB, fiveA) |
| |
| commitOneTask := newTask("task-1", one, zero) |
| commitFourATask := newTask("task-2", fourA, threeA, two) |
| |
| mtc := &cache_mocks.TaskCache{} |
| mtc.On("GetTaskForCommit", repoName, zero, taskName).Return(commitOneTask, nil) |
| mtc.On("GetTaskForCommit", repoName, one, taskName).Return(commitOneTask, nil) |
| mtc.On("GetTaskForCommit", repoName, two, taskName).Return(commitFourATask, nil) |
| mtc.On("GetTaskForCommit", repoName, threeA, taskName).Return(commitFourATask, nil) |
| mtc.On("GetTaskForCommit", repoName, fourA, taskName).Return(commitFourATask, nil) |
| mtc.On("GetTaskForCommit", repoName, mock.Anything, taskName).Return(nil, nil) |
| |
| blamelistAndTask := func(name string, revision string, expectedBlamelist []string, expectedTask *types.Task) { |
| t.Run(name, func(t *testing.T) { |
| blamedCommits, stealFromTask, err := ComputeBlamelist(ctx, mtc, repo, taskName, repoName, repo.Get(revision), commitsBuffer, tcg, window_mocks.AllInclusiveWindow()) |
| require.NoError(t, err) |
| assert.Equal(t, expectedBlamelist, blamedCommits) |
| assert.Equal(t, expectedTask, stealFromTask) |
| }) |
| } |
| |
| blamelistAndTask("commit zero", zero, []string{zero}, commitOneTask) |
| blamelistAndTask("commit one", one, []string{one, zero}, commitOneTask) |
| blamelistAndTask("commit two", two, []string{two}, commitFourATask) |
| blamelistAndTask("commit threeA", threeA, []string{threeA, two}, commitFourATask) |
| blamelistAndTask("commit threeB", threeB, []string{threeB}, nil) |
| blamelistAndTask("commit fourA", fourA, []string{fourA, threeA, two}, commitFourATask) |
| blamelistAndTask("commit fourB", fourB, []string{fourB, threeB}, nil) |
| blamelistAndTask("commit fiveA", fiveA, []string{fiveA}, nil) |
| blamelistAndTask("commit six", six, []string{six, fourB, threeB, fiveA}, nil) |
| } |
| |
| var ( |
| // Use of timeDoesNotMatter indicates that the time does not affect the outputs. |
| timeDoesNotMatter = time.Time{} |
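| // commitsBuffer is a reusable scratch buffer passed to ComputeBlamelist. |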
| commitsBuffer = make([]*repograph.Commit, 0, MAX_BLAMELIST_COMMITS) |
| ) |
| |
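| // rfc3339 parses the given RFC 3339 timestamp, failing the test on error. |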
| func rfc3339(t *testing.T, n string) time.Time { |
| ts, err := time.Parse(time.RFC3339, n) |
| require.NoError(t, err) |
| return ts |
| } |
| |
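| // asTryJob fills in the patch-related fields so that the candidate is |
| // treated as a try job. |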
| func asTryJob(c TaskCandidate) TaskCandidate { |
| c.Issue = "nonempty" |
| c.Patchset = "nonempty" |
| c.Server = "nonempty" |
| if !c.IsTryJob() { |
| panic("Should be a tryjob task candidate now") |
| } |
| return c |
| } |
| |
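| // asForcedJob fills in ForcedJobId so that the candidate is treated as a |
| // forced job. |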
| func asForcedJob(c TaskCandidate) TaskCandidate { |
| c.ForcedJobId = "nonempty" |
| if !c.IsForceRun() { |
| panic("Should be a tryjob task candidate now") |
| } |
| return c |
| } |
| |
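| // newTask returns a Task with the given ID whose blamelist consists of the |
| // commit it ran at plus any additional commits it covers. |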
| func newTask(id string, ranAt string, alsoCovered ...string) *types.Task { |
| nt := types.Task{Id: id} |
| nt.Revision = ranAt |
| nt.Commits = append([]string{ranAt}, alsoCovered...) |
| return &nt |
| } |
| |
| func TestRegenerateTaskQueue_OnlyBestCandidateForCD(t *testing.T) { |
| ctx := context.Background() |
| |
| // Create a TasksCfg. |
| tc := &specs.TasksCfg{ |
| Jobs: map[string]*specs.JobSpec{ |
| "cd-job": { |
| IsCD: true, |
| TaskSpecs: []string{ |
| "cd-task", |
| }, |
| }, |
| }, |
| Tasks: map[string]*specs.TaskSpec{ |
| "cd-task": { |
| CasSpec: "empty", |
| Dimensions: []string{fmt.Sprintf("pool:%s", cdPoolName)}, |
| }, |
| }, |
| CasSpecs: map[string]*specs.CasSpec{ |
| "empty": { |
| Digest: rbe.EmptyDigest, |
| }, |
| }, |
| } |
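| // The TaskCfgCache mock returns the same TasksCfg for every RepoState. |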
| tcc := &tcc_mocks.TaskCfgCache{} |
| tcc.On("Get", mock.Anything, mock.Anything).Return(tc, nil, nil) |
| |
| // There are three commits in the repo. |
| mg, repo := newMemRepo(t) |
| hashes := mg.CommitN(3) |
| rs1 := types.RepoState{ |
| Repo: "fake-repo", |
| Revision: hashes[2], |
| } |
| rs2 := types.RepoState{ |
| Repo: "fake-repo", |
| Revision: hashes[1], |
| } |
| rs3 := types.RepoState{ |
| Repo: "fake-repo", |
| Revision: hashes[0], |
| } |
| repos := repograph.Map{ |
| rs1.Repo: repo, |
| } |
| |
| // Add jobs to the cache. |
| jCache := &cache_mocks.JobCache{} |
| jobs := []*types.Job{ |
| { |
| Name: "cd-job", |
| RepoState: rs1, |
| Created: rfc3339(t, "2021-10-01T15:00:00Z"), |
| Dependencies: map[string][]string{"cd-task": {}}, |
| }, |
| { |
| Name: "cd-job", |
| RepoState: rs2, |
| Created: rfc3339(t, "2021-10-01T15:00:00Z"), |
| Dependencies: map[string][]string{"cd-task": {}}, |
| }, |
| { |
| Name: "cd-job", |
| RepoState: rs3, |
| Created: rfc3339(t, "2021-10-01T15:00:00Z"), |
| Dependencies: map[string][]string{"cd-task": {}}, |
| }, |
| } |
| jCache.On("UnfinishedJobs").Return(jobs, nil) |
| |
| // No tasks in the TaskCache. |
| tCache := &cache_mocks.TaskCache{} |
| tCache.On("GetTasksByKey", mock.Anything).Return([]*types.Task{}, nil) |
| tCache.On("GetTaskForCommit", mock.Anything, mock.Anything, mock.Anything).Return(nil, nil) |
| |
| // Set up a time window to include everything. |
| w := window_mocks.AllInclusiveWindow() |
| |
| // Create the scheduler. |
| s := &TaskScheduler{ |
| candidateMetrics: map[string]metrics2.Int64Metric{}, |
| jCache: jCache, |
| repos: repos, |
| taskCfgCache: tcc, |
| tCache: tCache, |
| window: w, |
| } |
| |
| // Regenerate the task queue. |
| queue, _, err := s.regenerateTaskQueue(ctx) |
| require.NoError(t, err) |
| |
| // There should only be one task in the queue, despite the fact that there |
| // are three candidates available, since we only consider running the newest |
| // candidate. |
| require.Len(t, queue, 1) |
| c := queue[0] |
| require.Len(t, c.Commits, 3) |
| require.True(t, c.IsCD) |
| require.Equal(t, rs3.Revision, c.Revision) |
| } |
| |
| func TestRegenerateTaskQueue_NoBackfillForCD(t *testing.T) { |
| ctx := context.Background() |
| |
| // Create a TasksCfg. |
| tc := &specs.TasksCfg{ |
| Jobs: map[string]*specs.JobSpec{ |
| "cd-job": { |
| IsCD: true, |
| TaskSpecs: []string{ |
| "cd-task", |
| }, |
| }, |
| }, |
| Tasks: map[string]*specs.TaskSpec{ |
| "cd-task": { |
| CasSpec: "empty", |
| Dimensions: []string{fmt.Sprintf("pool:%s", cdPoolName)}, |
| }, |
| }, |
| CasSpecs: map[string]*specs.CasSpec{ |
| "empty": { |
| Digest: rbe.EmptyDigest, |
| }, |
| }, |
| } |
| tcc := &tcc_mocks.TaskCfgCache{} |
| tcc.On("Get", mock.Anything, mock.Anything).Return(tc, nil, nil) |
| |
| // There are three commits in the repo. |
| gb, repo := newMemRepo(t) |
| hashes := gb.CommitN(3) |
| rs1 := types.RepoState{ |
| Repo: "fake-repo", |
| Revision: hashes[2], |
| } |
| rs2 := types.RepoState{ |
| Repo: "fake-repo", |
| Revision: hashes[1], |
| } |
| rs3 := types.RepoState{ |
| Repo: "fake-repo", |
| Revision: hashes[0], |
| } |
| repos := repograph.Map{ |
| rs1.Repo: repo, |
| } |
| |
| // Add jobs to the cache. |
| jCache := &cache_mocks.JobCache{} |
| jobs := []*types.Job{ |
| { |
| Name: "cd-job", |
| RepoState: rs1, |
| Created: rfc3339(t, "2021-10-01T15:00:00Z"), |
| Dependencies: map[string][]string{"cd-task": {}}, |
| }, |
| { |
| Name: "cd-job", |
| RepoState: rs2, |
| Created: rfc3339(t, "2021-10-01T15:00:00Z"), |
| Dependencies: map[string][]string{"cd-task": {}}, |
| }, |
| { |
| Name: "cd-job", |
| RepoState: rs3, |
| Created: rfc3339(t, "2021-10-01T15:00:00Z"), |
| Dependencies: map[string][]string{"cd-task": {}}, |
| }, |
| } |
| jCache.On("UnfinishedJobs").Return(jobs, nil) |
| |
| // One task already ran at the newest commit. |
| t3 := &types.Task{ |
| Commits: []string{rs3.Revision, rs2.Revision, rs1.Revision}, |
| Id: "fake-task", |
| TaskKey: types.TaskKey{ |
| Name: "cd-task", |
| RepoState: rs3, |
| }, |
| Status: types.TASK_STATUS_SUCCESS, |
| } |
| tCache := &cache_mocks.TaskCache{} |
| tCache.On("GetTasksByKey", types.TaskKey{ |
| RepoState: rs3, |
| Name: "cd-task", |
| }).Return([]*types.Task{t3}, nil) |
| tCache.On("GetTasksByKey", mock.Anything).Return([]*types.Task{}, nil) |
| tCache.On("GetTaskForCommit", rs1.Repo, rs1.Revision, "cd-task").Return(t3, nil) |
| tCache.On("GetTaskForCommit", rs2.Repo, rs2.Revision, "cd-task").Return(t3, nil) |
| |
| // Set up a time window to include everything. |
| w := window_mocks.AllInclusiveWindow() |
| |
| // Create the scheduler. |
| s := &TaskScheduler{ |
| candidateMetrics: map[string]metrics2.Int64Metric{}, |
| jCache: jCache, |
| repos: repos, |
| taskCfgCache: tcc, |
| tCache: tCache, |
| window: w, |
| } |
| |
| // Regenerate the task queue. We should end up with zero candidates, despite |
| // the fact that we have two unfinished jobs at the older commits. |
| queue, _, err := s.regenerateTaskQueue(ctx) |
| require.NoError(t, err) |
| require.Empty(t, queue) |
| } |
| |
| func TestFilterTaskCandidates_NoCDTasksInRegularPools(t *testing.T) { |
| ctx := context.Background() |
| |
| // Create a single candidate, which is a CD task but requests to be run in a |
| // non-CD pool. |
| k1 := types.TaskKey{ |
| RepoState: rs1, |
| Name: tcc_testutils.BuildTaskName, |
| } |
| candidate := &TaskCandidate{ |
| IsCD: true, |
| TaskKey: k1, |
| TaskSpec: &specs.TaskSpec{ |
| Dimensions: []string{"pool:Skia"}, |
| }, |
| } |
| candidates := map[types.TaskKey]*TaskCandidate{ |
| k1: candidate, |
| } |
| |
| // Set up mocks. |
| tCache := &cache_mocks.TaskCache{} |
| tCache.On("GetTasksByKey", candidate.TaskKey).Return([]*types.Task{}, nil) |
| s := &TaskScheduler{ |
| cdPool: cdPoolName, |
| tCache: tCache, |
| window: window_mocks.AllInclusiveWindow(), |
| } |
| |
| // Ensure that the candidate gets filtered out, and that the diagnostics |
| // tell us why. |
| c, err := s.filterTaskCandidates(ctx, candidates) |
| require.NoError(t, err) |
| require.Empty(t, c) |
| require.Equal(t, "Skia", candidate.GetDiagnostics().Filtering.ForbiddenPool) |
| } |
| |
| func TestFilterTaskCandidates_NoRegularTasksInCDPool(t *testing.T) { |
| ctx := context.Background() |
| |
| // Create a single candidate, which is a non-CD task but requests to be run |
| // in the CD pool. |
| k1 := types.TaskKey{ |
| RepoState: rs1, |
| Name: tcc_testutils.BuildTaskName, |
| } |
| candidate := &TaskCandidate{ |
| TaskKey: k1, |
| TaskSpec: &specs.TaskSpec{ |
| Dimensions: []string{fmt.Sprintf("pool:%s", cdPoolName)}, |
| }, |
| } |
| candidates := map[types.TaskKey]*TaskCandidate{ |
| k1: candidate, |
| } |
| |
| // Set up mocks. |
| tCache := &cache_mocks.TaskCache{} |
| tCache.On("GetTasksByKey", candidate.TaskKey).Return([]*types.Task{}, nil) |
| s := &TaskScheduler{ |
| cdPool: cdPoolName, |
| tCache: tCache, |
| window: window_mocks.AllInclusiveWindow(), |
| } |
| |
| // Ensure that the candidate gets filtered out, and that the diagnostics |
| // tell us why. |
| c, err := s.filterTaskCandidates(ctx, candidates) |
| require.NoError(t, err) |
| require.Empty(t, c) |
| require.Equal(t, cdPoolName, candidate.GetDiagnostics().Filtering.ForbiddenPool) |
| } |
| |
| // newMemRepo is a convenience function which creates a MemGit backed by an |
| // in-memory GitStore and builds a repograph.Graph from it. MemGit automatically |
| // calls Graph.Update() whenever it is mutated. |
| func newMemRepo(t sktest.TestingT) (*mem_git.MemGit, *repograph.Graph) { |
| gs := mem_gitstore.New() |
| gb := mem_git.New(t, gs) |
| ctx := context.Background() |
| ri, err := gitstore.NewGitStoreRepoImpl(ctx, gs) |
| require.NoError(t, err) |
| repo, err := repograph.NewWithRepoImpl(ctx, ri) |
| require.NoError(t, err) |
| gb.AddUpdater(repo) |
| return gb, repo |
| } |