| package main |
| |
| /* |
| Performance test for TaskScheduler. Builds a local git repo with a |
| configurable number of tasks per commit, runs scheduling cycles against |
| mocked Swarming bots until the queue drains, and reports the elapsed time. |
| */ |
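| |
| // Example invocation (flag values are illustrative): |
| // |
| //   go run . --tasks_per_commit=300 --num_commits=200 --cpuprofile=/tmp/ts.prof |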
| |
| import ( |
| "context" |
| "encoding/json" |
| "flag" |
| "fmt" |
| "io" |
| "math" |
| "net/http" |
| "os" |
| "path" |
| "path/filepath" |
| "runtime/pprof" |
| "sort" |
| "strings" |
| "time" |
| |
| "cloud.google.com/go/datastore" |
| "github.com/google/uuid" |
| apipb "go.chromium.org/luci/swarming/proto/api_v2" |
| "go.skia.org/infra/go/bt" |
| "go.skia.org/infra/go/cas/rbe" |
| "go.skia.org/infra/go/common" |
| "go.skia.org/infra/go/deepequal/assertdeep" |
| "go.skia.org/infra/go/depot_tools" |
| "go.skia.org/infra/go/exec" |
| "go.skia.org/infra/go/git" |
| "go.skia.org/infra/go/git/repograph" |
| "go.skia.org/infra/go/httputils" |
| "go.skia.org/infra/go/now" |
| pubsub_mocks "go.skia.org/infra/go/pubsub/mocks" |
| "go.skia.org/infra/go/sklog" |
| "go.skia.org/infra/go/swarming" |
| "go.skia.org/infra/go/util" |
| "go.skia.org/infra/task_scheduler/go/db/cache" |
| "go.skia.org/infra/task_scheduler/go/db/firestore" |
| "go.skia.org/infra/task_scheduler/go/job_creation" |
| "go.skia.org/infra/task_scheduler/go/scheduling" |
| "go.skia.org/infra/task_scheduler/go/specs" |
| "go.skia.org/infra/task_scheduler/go/syncer" |
| "go.skia.org/infra/task_scheduler/go/task_cfg_cache" |
| tcc_testutils "go.skia.org/infra/task_scheduler/go/task_cfg_cache/testutils" |
| swarming_task_execution "go.skia.org/infra/task_scheduler/go/task_execution/swarmingv2" |
| "go.skia.org/infra/task_scheduler/go/testutils" |
| "go.skia.org/infra/task_scheduler/go/types" |
| "go.skia.org/infra/task_scheduler/go/window" |
| "golang.org/x/oauth2/google" |
| ) |
| |
| const ( |
| rbeInstance = "projects/chromium-swarm-dev/instances/default_instance" |
| ) |
| |
| var ( |
| cpuprofile = flag.String("cpuprofile", "", "Write a CPU profile to this file.") |
| tasksPerCommit = flag.Int("tasks_per_commit", 300, "Number of tasks defined per commit.") |
| numCommits = flag.Int("num_commits", 200, "Number of commits.") |
| maxRounds = flag.Int("max_cycles", 0, "Stop after this many scheduling cycles; 0 means run until the queue is empty.") |
| saveQueue = flag.String("save_queue", "", "If set, dump the task candidate queue for every round of scheduling into this file.") |
| checkQueue = flag.String("check_queue", "", "If set, compare the task candidate queue at every round of scheduling to that contained in this file.") |
| ) |
| |
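| // assertNoError logs fatally and exits if err is non-nil. |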
| func assertNoError(err error) { |
| if err != nil { |
| sklog.Fatalf("Expected no error but got: %s", err.Error()) |
| } |
| } |
| |
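| // assertEqual logs fatally if a != b; a is the expected value, b the actual. |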
| func assertEqual(a, b interface{}) { |
| if a != b { |
| sklog.Fatalf("Expected %v but got %v", a, b) |
| } |
| } |
| |
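| // makeBot returns a Swarming BotInfo with the given ID and dimensions. |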
| func makeBot(id string, dims map[string]string) *apipb.BotInfo { |
| dimensions := make([]*apipb.StringListPair, 0, len(dims)) |
| for k, v := range dims { |
| dimensions = append(dimensions, &apipb.StringListPair{ |
| Key: k, |
| Value: []string{v}, |
| }) |
| } |
| return &apipb.BotInfo{ |
| BotId: id, |
| Dimensions: dimensions, |
| } |
| } |
| |
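| // commitDate is the timestamp assigned to the next commit; it advances by 10 |
| // seconds per commit so that runs are deterministic. |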
| var commitDate = time.Unix(1472647568, 0) |
| |
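| // commit creates a commit in repoDir, using commitDate as both the author |
| // and committer date. |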
| func commit(ctx context.Context, repoDir, message string) { |
| gitExec, err := git.Executable(ctx) |
| assertNoError(err) |
| assertNoError(exec.Run(ctx, &exec.Command{ |
| Name: gitExec, |
| Args: []string{"commit", "-m", message}, |
| Env: []string{fmt.Sprintf("GIT_AUTHOR_DATE=%d +0000", commitDate.Unix()), fmt.Sprintf("GIT_COMMITTER_DATE=%d +0000", commitDate.Unix())}, |
| InheritPath: true, |
| Dir: repoDir, |
| Verbose: exec.Silent, |
| })) |
| commitDate = commitDate.Add(10 * time.Second) |
| } |
| |
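| // makeCommits lands numCommits commits on the main branch of repoDir, each |
| // modifying a single fake file, and pushes each one. |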
| func makeCommits(ctx context.Context, repoDir string, numCommits int) { |
| gd := git.CheckoutDir(repoDir) |
| _, err := gd.Git(ctx, "checkout", git.MainBranch) |
| assertNoError(err) |
| fakeFile := path.Join(repoDir, "fakefile.txt") |
| for i := 0; i < numCommits; i++ { |
| title := fmt.Sprintf("Fake #%d", i) |
| assertNoError(os.WriteFile(fakeFile, []byte(title), os.ModePerm)) |
| _, err = gd.Git(ctx, "add", fakeFile) |
| assertNoError(err) |
| commit(ctx, repoDir, title) |
| _, err = gd.Git(ctx, "push", git.DefaultRemote, git.MainBranch) |
| assertNoError(err) |
| } |
| } |
| |
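| // addFile writes contents to subPath under repoDir and stages it via git add. |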
| func addFile(ctx context.Context, repoDir, subPath, contents string) { |
| assertNoError(os.WriteFile(path.Join(repoDir, subPath), []byte(contents), os.ModePerm)) |
| _, err := git.CheckoutDir(repoDir).Git(ctx, "add", subPath) |
| assertNoError(err) |
| } |
| |
| // waitForNewJobs waits for new jobs to appear in the DB and cache after new |
| // commits have landed. |
| func waitForNewJobs(ctx context.Context, repos repograph.Map, jc *job_creation.JobCreator, jCache cache.JobCache, expectJobs int) { |
| for repoDir, repo := range repos { |
| assertNoError(repo.Update(ctx)) |
| assertNoError(jc.HandleRepoUpdate(ctx, repoDir, repo, func() {}, func() {})) |
| } |
| sklog.Infof("Waiting for QuerySnapshotIterator...") |
| for { |
| time.Sleep(500 * time.Millisecond) |
| assertNoError(jCache.Update(ctx)) |
| unfinished, err := jCache.InProgressJobs() |
| assertNoError(err) |
| if len(unfinished) == expectJobs { |
| break |
| } |
| } |
| } |
| |
| func main() { |
| common.Init() |
| |
| // Create a repo with one commit. |
| workdir, err := os.MkdirTemp("", "") |
| assertNoError(err) |
| defer func() { |
| if err := os.RemoveAll(workdir); err != nil { |
| sklog.Fatal(err) |
| } |
| }() |
| ctx := now.TimeTravelingContext(commitDate.Add(24 * time.Hour)) |
| repoName := "skia.git" |
| repoDir := filepath.Join(workdir, repoName) |
| assertNoError(os.Mkdir(repoDir, os.ModePerm)) |
| gd := git.CheckoutDir(repoDir) |
| _, err = gd.Git(ctx, "init") |
| assertNoError(err) |
| // This sets the remote repo to be the repo itself, which avoids the need |
| // for a separate remote repo, either locally or on a server. |
| _, err = gd.Git(ctx, "remote", "add", git.DefaultRemote, ".") |
| assertNoError(err) |
| |
| // Write some files. |
| assertNoError(os.WriteFile(path.Join(workdir, ".gclient"), []byte("placeholder"), os.ModePerm)) |
| addFile(ctx, repoDir, "a.txt", "placeholder2") |
| addFile(ctx, repoDir, "somefile.txt", "placeholder3") |
| infraBotsSubDir := path.Join("infra", "bots") |
| infraBotsDir := path.Join(repoDir, infraBotsSubDir) |
| assertNoError(os.MkdirAll(infraBotsDir, os.ModePerm)) |
| |
| // CAS inputs. |
| casSpecs := map[string]*specs.CasSpec{ |
| "compile": { |
| Root: ".", |
| Paths: []string{"somefile.txt"}, |
| }, |
| "perf": { |
| Root: ".", |
| Paths: []string{"somefile.txt"}, |
| }, |
| "test": { |
| Root: ".", |
| Paths: []string{"somefile.txt"}, |
| }, |
| } |
| |
| // Add tasks to the repo. |
| taskNames := []string{ |
| tcc_testutils.BuildTaskName, |
| tcc_testutils.TestTaskName, |
| tcc_testutils.PerfTaskName, |
| } |
| taskTypes := []*specs.TaskSpec{ |
| { |
| CasSpec: "compile", |
| CipdPackages: []*specs.CipdPackage{}, |
| Dependencies: []string{}, |
| Dimensions: []string{"pool:Skia", "os:Ubuntu"}, |
| Priority: 1.0, |
| }, |
| { |
| CasSpec: "test", |
| CipdPackages: []*specs.CipdPackage{}, |
| Dependencies: []string{tcc_testutils.BuildTaskName}, |
| Dimensions: []string{"pool:Skia", "os:Android", "device_type:grouper"}, |
| Priority: 0.7, |
| }, |
| { |
| CasSpec: "perf", |
| CipdPackages: []*specs.CipdPackage{}, |
| Dependencies: []string{tcc_testutils.BuildTaskName}, |
| Dimensions: []string{"pool:Skia", "os:Android", "device_type:grouper"}, |
| Priority: 0.5, |
| }, |
| } |
| // Add the requested number of tasks to the TasksCfg, cycling through Build, |
| // Test, and Perf tasks to keep things interesting. |
| moarTasks := map[string]*specs.TaskSpec{} |
| jobs := map[string]*specs.JobSpec{} |
| taskCycleIndex := -1 |
| for numTasks := 0; numTasks < *tasksPerCommit; numTasks++ { |
| taskType := numTasks % 3 |
| if taskType == 0 { |
| taskCycleIndex++ |
| } |
| name := taskNames[taskType] |
| task := taskTypes[taskType] |
| newName := fmt.Sprintf("%s%d", name, taskCycleIndex) |
| deps := make([]string, 0, len(task.Dependencies)) |
| for _, d := range task.Dependencies { |
| deps = append(deps, fmt.Sprintf("%s%d", d, taskCycleIndex)) |
| } |
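| // Decay the priority very slightly per task so that no two candidates |
| // have exactly equal priority. |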
| priority := task.Priority * math.Pow(0.99999999, float64(numTasks)) |
| newTask := &specs.TaskSpec{ |
| CasSpec: task.CasSpec, |
| CipdPackages: task.CipdPackages, |
| Dependencies: deps, |
| Dimensions: task.Dimensions, |
| Priority: priority, |
| } |
| moarTasks[newName] = newTask |
| jobs[newName] = &specs.JobSpec{ |
| Priority: priority, |
| TaskSpecs: []string{newName}, |
| } |
| } |
| cfg := specs.TasksCfg{ |
| CasSpecs: casSpecs, |
| Tasks: moarTasks, |
| Jobs: jobs, |
| } |
| assertNoError(util.WithWriteFile(path.Join(repoDir, specs.TASKS_CFG_FILE), func(w io.Writer) error { |
| return json.NewEncoder(w).Encode(&cfg) |
| })) |
| _, err = gd.Git(ctx, "add", specs.TASKS_CFG_FILE) |
| assertNoError(err) |
| commit(ctx, repoDir, "Add more tasks!") |
| _, err = gd.Git(ctx, "push", git.DefaultRemote, git.MainBranch) |
| assertNoError(err) |
| _, err = gd.Git(ctx, "branch", "-u", git.DefaultRemote+"/"+git.MainBranch) |
| assertNoError(err) |
| |
| // Create a bunch of bots. |
| bots := make([]*apipb.BotInfo, 100) |
| for idx := range bots { |
| dims := map[string]string{ |
| "pool": "Skia", |
| } |
| if idx >= 50 { |
| dims["os"] = "Ubuntu" |
| } else { |
| dims["os"] = "Android" |
| dims["device_type"] = "grouper" |
| } |
| bots[idx] = makeBot(fmt.Sprintf("bot%d", idx), dims) |
| } |
| |
| // Create the task scheduler. |
| repo, err := repograph.NewLocalGraph(ctx, repoDir, workdir) |
| assertNoError(err) |
| assertNoError(repo.Update(ctx)) |
| headCommit := repo.Get(git.MainBranch) |
| if headCommit == nil { |
| sklog.Fatalf("Could not find HEAD of %s.", git.MainBranch) |
| } |
| head := headCommit.Hash |
| |
| commits, err := repo.Get(head).AllCommits() |
| assertNoError(err) |
| assertEqual(1, len(commits)) |
| assertEqual(head, commits[0]) |
| |
| ts, err := google.DefaultTokenSource(ctx, datastore.ScopeDatastore) |
| assertNoError(err) |
| fsInstance := uuid.New().String() |
| d, err := firestore.NewDBWithParams(ctx, firestore.FIRESTORE_PROJECT, fsInstance, ts) |
| assertNoError(err) |
| windowPeriod := time.Duration(math.MaxInt64) |
| w, err := window.New(ctx, windowPeriod, 0, nil) |
| assertNoError(err) |
| tCache, err := cache.NewTaskCache(ctx, d, w, nil) |
| assertNoError(err) |
| jCache, err := cache.NewJobCache(ctx, d, w, nil) |
| assertNoError(err) |
| |
| swarmingClient := testutils.NewTestClient() |
| |
| repos := repograph.Map{repoDir: repo} |
| |
| btProject := "test-project" |
| btInstance := "test-instance" |
| assertNoError(bt.InitBigtable(btProject, btInstance, task_cfg_cache.BT_TABLE, []string{task_cfg_cache.BT_COLUMN_FAMILY})) |
| defer func() { |
| assertNoError(bt.DeleteTables(btProject, btInstance, task_cfg_cache.BT_TABLE)) |
| }() |
| taskCfgCache, err := task_cfg_cache.NewTaskCfgCache(ctx, repos, btProject, btInstance, nil) |
| if err != nil { |
| sklog.Fatalf("Failed to create TaskCfgCache: %s", err) |
| } |
| cas, err := rbe.NewClient(ctx, rbeInstance, ts) |
| assertNoError(err) |
| swarmingTaskExec := swarming_task_execution.NewSwarmingV2TaskExecutor(swarmingClient, rbeInstance, "") |
| taskExecs := map[string]types.TaskExecutor{ |
| types.TaskExecutor_UseDefault: swarmingTaskExec, |
| types.TaskExecutor_Swarming: swarmingTaskExec, |
| } |
| s, err := scheduling.NewTaskScheduler(ctx, d, nil, windowPeriod, 0, repos, cas, rbeInstance, taskExecs, http.DefaultClient, 0.99999, swarming.POOLS_PUBLIC, "", taskCfgCache, nil, nil, "", scheduling.BusyBotsDebugLoggingOff) |
| assertNoError(err) |
| |
| client := httputils.DefaultClientConfig().WithTokenSource(ts).Client() |
| |
| depotTools, err := depot_tools.GetDepotTools(ctx, workdir) |
| assertNoError(err) |
| pubsubClient := &pubsub_mocks.Client{} |
| jc, err := job_creation.NewJobCreator(ctx, d, windowPeriod, 0, workdir, "localhost", repos, cas, client, "skia", "fake-bb-target", "fake-bb-bucket", nil, depotTools, nil, taskCfgCache, pubsubClient, syncer.DefaultNumWorkers) |
| assertNoError(err) |
| |
| // Wait for the JobCreator to create the jobs defined in the repo. |
| waitForNewJobs(ctx, repos, jc, jCache, *tasksPerCommit) |
| |
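| // runTasks triggers one scheduling cycle against the given bots, waits for |
| // the resulting tasks to appear in the cache, marks each newly-created task |
| // as successful, and returns the tasks it finished. |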
| runTasks := func(bots []*apipb.BotInfo) []*types.Task { |
| swarmingClient.MockBots(bots) |
| assertNoError(s.MainLoop(ctx)) |
| time.Sleep(5 * time.Second) // Wait for tasks to appear in the cache. TODO: wait deterministically instead of sleeping. |
| ctx.SetTime(now.Now(ctx).Add(10 * time.Second)) |
| assertNoError(w.Update(ctx)) |
| assertNoError(tCache.Update(ctx)) |
| tasks, err := tCache.GetTasksForCommits(repoDir, commits) |
| assertNoError(err) |
| newTasks := map[string]*types.Task{} |
| for _, v := range tasks { |
| for _, task := range v { |
| if task.Status == types.TASK_STATUS_PENDING { |
| if _, ok := newTasks[task.Id]; !ok { |
| newTasks[task.Id] = task |
| } |
| } |
| } |
| } |
| insert := make([]*types.Task, 0, len(newTasks)) |
| for _, task := range newTasks { |
| task.Status = types.TASK_STATUS_SUCCESS |
| task.Finished = now.Now(ctx) |
| task.IsolatedOutput = rbe.EmptyDigest |
| insert = append(insert, task) |
| } |
| assertNoError(d.PutTasks(ctx, insert)) |
| assertNoError(tCache.Update(ctx)) |
| assertNoError(jCache.Update(ctx)) |
| return insert |
| } |
| |
| assertNoError(jCache.Update(ctx)) |
| allJobs, err := jCache.GetJobsFromDateRange(time.Time{}, now.Now(ctx).Add(24*time.Hour)) |
| assertNoError(err) |
| sklog.Infof("Found %d total jobs", len(allJobs)) |
| assertEqual(*tasksPerCommit, len(allJobs)) |
| |
| // Consume all tasks. |
| for { |
| sklog.Infof("Running all tasks...") |
| runTasks(bots) |
| unfinished, err := jCache.InProgressJobs() |
| assertNoError(err) |
| sklog.Infof("Found %d unfinished jobs.", len(unfinished)) |
| if len(unfinished) == 0 { |
| tasks, err := tCache.GetTasksForCommits(repoDir, commits) |
| assertNoError(err) |
| assertEqual(0, s.QueueLen()) |
| assertEqual(len(moarTasks), len(tasks[head])) |
| break |
| } |
| } |
| sklog.Infof("Done consuming initial set of jobs.") |
| |
| // Add more commits to the repo. |
| ctx.SetTime(now.Now(ctx).Add(10 * time.Second)) |
| makeCommits(ctx, repoDir, *numCommits) |
| waitForNewJobs(ctx, repos, jc, jCache, (*numCommits)*(*tasksPerCommit)) |
| commits, err = repo.RevList(head, git.MainBranch) |
| assertNoError(err) |
| |
| // If checking the queue against a previous run, load the data now. |
| type schedulingRoundInfo struct { |
| Queue []*scheduling.TaskCandidate |
| Tasks map[string]map[string]*types.Task |
| } |
| var checkSchedulingData []*schedulingRoundInfo |
| if *checkQueue != "" { |
| assertNoError(util.WithReadFile(*checkQueue, func(f io.Reader) error { |
| return json.NewDecoder(f).Decode(&checkSchedulingData) |
| })) |
| } |
| |
| // Actually run the test. |
| sklog.Infof("Starting test...") |
| var queues [][]*scheduling.TaskCandidate |
| var taskLists [][]*types.Task |
| if *cpuprofile != "" { |
| f, err := os.Create(*cpuprofile) |
| assertNoError(err) |
| assertNoError(pprof.StartCPUProfile(f)) |
| defer pprof.StopCPUProfile() |
| } |
| schedulingRounds := 1 |
| started := time.Now() |
| for ; *maxRounds <= 0 || schedulingRounds <= *maxRounds; schedulingRounds++ { |
| tasks := runTasks(bots) |
| if *saveQueue != "" || *checkQueue != "" { |
| queues = append(queues, s.CloneQueue()) |
| taskLists = append(taskLists, tasks) |
| } |
| if s.QueueLen() == 0 { |
| break |
| } |
| } |
| elapsed := time.Since(started) |
| |
| // Sanitize the scheduling data so that we can serialize it and compare it |
| // later. |
| if *saveQueue != "" || *checkQueue != "" { |
| schedulingData := make([]*schedulingRoundInfo, 0, len(queues)) |
| for idx, queue := range queues { |
| tasksByCommit := taskLists[idx] |
| for _, candidate := range queue { |
| for _, job := range candidate.Jobs { |
| job.Id = "fake-job-id" |
| job.Repo = "fake-repo.git" |
| for _, deps := range job.Dependencies { |
| sort.Strings(deps) |
| } |
| for _, taskSummaries := range job.Tasks { |
| sort.Slice(taskSummaries, func(i, j int) bool { |
| return taskSummaries[i].Attempt < taskSummaries[j].Attempt |
| }) |
| for _, ts := range taskSummaries { |
| ts.Id = "fake-task-summary-id" |
| ts.SwarmingTaskId = "fake-swarming-task" |
| } |
| } |
| } |
| sort.Slice(candidate.Jobs, func(i, j int) bool { |
| return candidate.Jobs[i].Name < candidate.Jobs[j].Name |
| }) |
| sort.Strings(candidate.Commits) |
| sort.Strings(candidate.CasDigests) |
| candidate.Repo = "fake-repo.git" |
| for idx := range candidate.ParentTaskIds { |
| candidate.ParentTaskIds[idx] = "fake-parent-id" |
| } |
| if candidate.StealingFromId != "" { |
| if strings.HasPrefix(candidate.StealingFromId, "taskCandidate") { |
| candidate.StealingFromId = "fake-candidate" |
| } else { |
| candidate.StealingFromId = "fake-task" |
| } |
| } |
| } |
| // Sanitize timestamps and randomly-generated IDs from the data. |
| tasks := make(map[string]map[string]*types.Task, len(commits)) |
| for _, task := range tasksByCommit { |
| sort.Strings(task.Commits) |
| task.Id = "fake-task-id" |
| task.Repo = "fake-repo.git" |
| task.SwarmingTaskId = "fake" |
| for idx := range task.Jobs { |
| task.Jobs[idx] = "fake-job" |
| } |
| if _, ok := tasks[task.Revision]; !ok { |
| tasks[task.Revision] = map[string]*types.Task{} |
| } |
| tasks[task.Revision][task.Name] = task |
| } |
| info := &schedulingRoundInfo{ |
| Queue: queue, |
| Tasks: tasks, |
| } |
| schedulingData = append(schedulingData, info) |
| } |
| |
| // Save and/or compare the scheduling data, as requested. |
| if *saveQueue != "" { |
| assertNoError(util.WithWriteFile(*saveQueue, func(w io.Writer) error { |
| enc := json.NewEncoder(w) |
| enc.SetIndent("", " ") |
| return enc.Encode(schedulingData) |
| })) |
| } |
| if *checkQueue != "" { |
| if len(checkSchedulingData) < len(schedulingData) { |
| sklog.Fatalf("Not enough scheduling rounds in %s; have %d but needed %d", *checkQueue, len(schedulingData)) |
| } |
| for idx, info := range schedulingData { |
| diff := assertdeep.Diff(checkSchedulingData[idx], info) |
| if diff != "" { |
| sklog.Fatal(diff) |
| } |
| } |
| } |
| } |
| sklog.Infof("Finished %d scheduling cycles in %s.", schedulingRounds, elapsed) |
| } |