| package main |
| |
| /* |
| Performance test for TaskScheduler. |
| */ |
| |
| import ( |
| "context" |
| "encoding/json" |
| "flag" |
| "fmt" |
| "io" |
| "io/ioutil" |
| "math" |
| "net/http" |
| _ "net/http/pprof" |
| "os" |
| "path" |
| "path/filepath" |
| "reflect" |
| "time" |
| |
| "cloud.google.com/go/datastore" |
| "github.com/davecgh/go-spew/spew" |
| swarming_api "go.chromium.org/luci/common/api/swarming/swarming/v1" |
| "go.skia.org/infra/go/auth" |
| "go.skia.org/infra/go/common" |
| "go.skia.org/infra/go/depot_tools" |
| "go.skia.org/infra/go/exec" |
| "go.skia.org/infra/go/gerrit" |
| "go.skia.org/infra/go/git" |
| "go.skia.org/infra/go/git/repograph" |
| "go.skia.org/infra/go/isolate" |
| "go.skia.org/infra/go/mockhttpclient" |
| "go.skia.org/infra/go/recipe_cfg" |
| "go.skia.org/infra/go/repo_root" |
| "go.skia.org/infra/go/sklog" |
| "go.skia.org/infra/go/swarming" |
| "go.skia.org/infra/go/util" |
| "go.skia.org/infra/task_scheduler/go/db/cache" |
| "go.skia.org/infra/task_scheduler/go/db/firestore" |
| "go.skia.org/infra/task_scheduler/go/scheduling" |
| "go.skia.org/infra/task_scheduler/go/specs" |
| "go.skia.org/infra/task_scheduler/go/testutils" |
| "go.skia.org/infra/task_scheduler/go/tryjobs" |
| "go.skia.org/infra/task_scheduler/go/types" |
| "go.skia.org/infra/task_scheduler/go/window" |
| ) |
| |
| var ( |
| fsInstance = flag.String("firestore_instance", "", "Firestore instance to use, eg. \"testing\"") |
| ) |
| |
| func assertNoError(err error) { |
| if err != nil { |
| sklog.Fatalf("Expected no error but got: %s", err.Error()) |
| } |
| } |
| |
| func assertEqual(a, b interface{}) { |
| if a != b { |
| sklog.Fatalf("Expected %v but got %v", a, b) |
| } |
| } |
| |
| func assertDeepEqual(a, b interface{}) { |
| if !reflect.DeepEqual(a, b) { |
| sklog.Fatalf("Objects do not match: \na:\n%s\n\nb:\n%s\n", spew.Sprint(a), spew.Sprint(b)) |
| } |
| } |
| |
| func makeBot(id string, dims map[string]string) *swarming_api.SwarmingRpcsBotInfo { |
| dimensions := make([]*swarming_api.SwarmingRpcsStringListPair, 0, len(dims)) |
| for k, v := range dims { |
| dimensions = append(dimensions, &swarming_api.SwarmingRpcsStringListPair{ |
| Key: k, |
| Value: []string{v}, |
| }) |
| } |
| return &swarming_api.SwarmingRpcsBotInfo{ |
| BotId: id, |
| Dimensions: dimensions, |
| } |
| } |
| |
| var commitDate = time.Unix(1472647568, 0) |
| |
| func commit(ctx context.Context, repoDir, message string) { |
| gitExec, err := git.Executable(ctx) |
| assertNoError(err) |
| assertNoError(exec.Run(ctx, &exec.Command{ |
| Name: gitExec, |
| Args: []string{"commit", "-m", message}, |
| Env: []string{fmt.Sprintf("GIT_AUTHOR_DATE=%d +0000", commitDate.Unix()), fmt.Sprintf("GIT_COMMITTER_DATE=%d +0000", commitDate.Unix())}, |
| InheritPath: true, |
| Dir: repoDir, |
| Verbose: exec.Silent, |
| })) |
| commitDate = commitDate.Add(10 * time.Second) |
| } |
| |
| func makeDummyCommits(ctx context.Context, repoDir string, numCommits int) { |
| gd := git.GitDir(repoDir) |
| _, err := gd.Git(ctx, "checkout", "master") |
| assertNoError(err) |
| dummyFile := path.Join(repoDir, "dummyfile.txt") |
| for i := 0; i < numCommits; i++ { |
| title := fmt.Sprintf("Dummy #%d", i) |
| assertNoError(ioutil.WriteFile(dummyFile, []byte(title), os.ModePerm)) |
| _, err = gd.Git(ctx, "add", dummyFile) |
| assertNoError(err) |
| commit(ctx, repoDir, title) |
| _, err = gd.Git(ctx, "push", "origin", "master") |
| assertNoError(err) |
| } |
| } |
| |
| func run(ctx context.Context, dir string, cmd ...string) { |
| if _, err := exec.RunCwd(ctx, dir, cmd...); err != nil { |
| sklog.Fatal(err) |
| } |
| } |
| |
| func addFile(ctx context.Context, repoDir, subPath, contents string) { |
| assertNoError(ioutil.WriteFile(path.Join(repoDir, subPath), []byte(contents), os.ModePerm)) |
| _, err := git.GitDir(repoDir).Git(ctx, "add", subPath) |
| assertNoError(err) |
| } |
| |
| func main() { |
| common.Init() |
| |
| // Create a repo with lots of commits. |
| workdir, err := ioutil.TempDir("", "") |
| assertNoError(err) |
| defer func() { |
| if err := os.RemoveAll(workdir); err != nil { |
| sklog.Fatal(err) |
| } |
| }() |
| ctx := context.Background() |
| repoName := "skia.git" |
| repoDir := path.Join(workdir, repoName) |
| assertNoError(os.Mkdir(path.Join(workdir, repoName), os.ModePerm)) |
| gd := git.GitDir(repoDir) |
| _, err = gd.Git(ctx, "init") |
| assertNoError(err) |
| _, err = gd.Git(ctx, "remote", "add", "origin", ".") |
| assertNoError(err) |
| |
| // Write some files. |
| assertNoError(ioutil.WriteFile(path.Join(workdir, ".gclient"), []byte("dummy"), os.ModePerm)) |
| addFile(ctx, repoDir, "a.txt", "dummy2") |
| addFile(ctx, repoDir, "somefile.txt", "dummy3") |
| infraBotsSubDir := path.Join("infra", "bots") |
| infraBotsDir := path.Join(repoDir, infraBotsSubDir) |
| assertNoError(os.MkdirAll(infraBotsDir, os.ModePerm)) |
| addFile(ctx, repoDir, path.Join(infraBotsSubDir, "compile_skia.isolate"), `{ |
| 'includes': [ |
| 'swarm_recipe.isolate', |
| ], |
| 'variables': { |
| 'files': [ |
| '../../../.gclient', |
| ], |
| }, |
| }`) |
| addFile(ctx, repoDir, path.Join(infraBotsSubDir, "perf_skia.isolate"), `{ |
| 'includes': [ |
| 'swarm_recipe.isolate', |
| ], |
| 'variables': { |
| 'files': [ |
| '../../../.gclient', |
| ], |
| }, |
| }`) |
| addFile(ctx, repoDir, path.Join(infraBotsSubDir, "test_skia.isolate"), `{ |
| 'includes': [ |
| 'swarm_recipe.isolate', |
| ], |
| 'variables': { |
| 'files': [ |
| '../../../.gclient', |
| ], |
| }, |
| }`) |
| addFile(ctx, repoDir, path.Join(infraBotsSubDir, "swarm_recipe.isolate"), `{ |
| 'variables': { |
| 'command': [ |
| 'python', 'recipes.py', 'run', |
| ], |
| 'files': [ |
| '../../somefile.txt', |
| ], |
| }, |
| }`) |
| |
| // Add tasks to the repo. |
| var tasks = map[string]*specs.TaskSpec{ |
| "Build-Ubuntu-GCC-Arm7-Release-Android": { |
| CipdPackages: []*specs.CipdPackage{}, |
| Dependencies: []string{}, |
| Dimensions: []string{"pool:Skia", "os:Ubuntu"}, |
| Isolate: "compile_skia.isolate", |
| Priority: 0.9, |
| }, |
| "Test-Android-GCC-Nexus7-GPU-Tegra3-Arm7-Release": { |
| CipdPackages: []*specs.CipdPackage{}, |
| Dependencies: []string{"Build-Ubuntu-GCC-Arm7-Release-Android"}, |
| Dimensions: []string{"pool:Skia", "os:Android", "device_type:grouper"}, |
| Isolate: "test_skia.isolate", |
| Priority: 0.9, |
| }, |
| "Perf-Android-GCC-Nexus7-GPU-Tegra3-Arm7-Release": { |
| CipdPackages: []*specs.CipdPackage{}, |
| Dependencies: []string{"Build-Ubuntu-GCC-Arm7-Release-Android"}, |
| Dimensions: []string{"pool:Skia", "os:Android", "device_type:grouper"}, |
| Isolate: "perf_skia.isolate", |
| Priority: 0.9, |
| }, |
| } |
| moarTasks := map[string]*specs.TaskSpec{} |
| jobs := map[string]*specs.JobSpec{} |
| for name, task := range tasks { |
| for i := 0; i < 100; i++ { |
| newName := fmt.Sprintf("%s%d", name, i) |
| deps := make([]string, 0, len(task.Dependencies)) |
| for _, d := range task.Dependencies { |
| deps = append(deps, fmt.Sprintf("%s%d", d, i)) |
| } |
| newTask := &specs.TaskSpec{ |
| CipdPackages: task.CipdPackages, |
| Dependencies: deps, |
| Dimensions: task.Dimensions, |
| Isolate: task.Isolate, |
| Priority: task.Priority, |
| } |
| moarTasks[newName] = newTask |
| jobs[newName] = &specs.JobSpec{ |
| Priority: task.Priority, |
| TaskSpecs: []string{newName}, |
| } |
| } |
| } |
| cfg := specs.TasksCfg{ |
| Tasks: moarTasks, |
| Jobs: jobs, |
| } |
| assertNoError(util.WithWriteFile(path.Join(repoDir, specs.TASKS_CFG_FILE), func(w io.Writer) error { |
| return json.NewEncoder(w).Encode(&cfg) |
| })) |
| _, err = gd.Git(ctx, "add", specs.TASKS_CFG_FILE) |
| assertNoError(err) |
| commit(ctx, repoDir, "Add more tasks!") |
| _, err = gd.Git(ctx, "push", "origin", "master") |
| assertNoError(err) |
| _, err = gd.Git(ctx, "branch", "-u", "origin/master") |
| assertNoError(err) |
| |
| // Create a bunch of bots. |
| bots := make([]*swarming_api.SwarmingRpcsBotInfo, 100) |
| for idx := range bots { |
| dims := map[string]string{ |
| "pool": "Skia", |
| } |
| if idx >= 50 { |
| dims["os"] = "Ubuntu" |
| } else { |
| dims["os"] = "Android" |
| dims["device_type"] = "grouper" |
| } |
| bots[idx] = makeBot(fmt.Sprintf("bot%d", idx), dims) |
| } |
| |
| // Create the task scheduler. |
| repo, err := repograph.NewLocalGraph(ctx, repoName, workdir) |
| assertNoError(err) |
| assertNoError(repo.Update(ctx)) |
| headCommit := repo.Get("master") |
| if headCommit == nil { |
| sklog.Fatal("Could not find HEAD of master.") |
| } |
| head := headCommit.Hash |
| |
| commits, err := repo.Get(head).AllCommits() |
| assertNoError(err) |
| assertDeepEqual([]string{head}, commits) |
| |
| ts, err := auth.NewDefaultTokenSource(true, datastore.ScopeDatastore) |
| d, err := firestore.NewDBWithParams(ctx, firestore.FIRESTORE_PROJECT, *fsInstance, ts) |
| assertNoError(err) |
| w, err := window.New(time.Hour, 0, nil) |
| assertNoError(err) |
| tCache, err := cache.NewTaskCache(ctx, d, w, nil) |
| assertNoError(err) |
| jCache, err := cache.NewJobCache(ctx, d, w, nil) |
| assertNoError(err) |
| |
| isolateClient, err := isolate.NewClient(workdir, isolate.ISOLATE_SERVER_URL_FAKE) |
| assertNoError(err) |
| swarmingClient := testutils.NewTestClient() |
| |
| // This is okay since this only runs locally. |
| root, err := repo_root.Get() |
| assertNoError(err) |
| recipesCfgFile := filepath.Join(root, recipe_cfg.RECIPE_CFG_PATH) |
| depotTools, err := depot_tools.Sync(ctx, workdir, recipesCfgFile) |
| assertNoError(err) |
| urlMock := mockhttpclient.NewURLMock() |
| gitcookies := path.Join(workdir, "gitcookies_fake") |
| assertNoError(ioutil.WriteFile(gitcookies, []byte(".googlesource.com\tTRUE\t/\tTRUE\t123\to\tgit-user.google.com=abc123"), os.ModePerm)) |
| g, err := gerrit.NewGerrit("https://fake-skia-review.googlesource.com", gitcookies, urlMock.Client()) |
| assertNoError(err) |
| s, err := scheduling.NewTaskScheduler(ctx, d, nil, time.Duration(math.MaxInt64), 0, workdir, "fake.server", repograph.Map{repoName: repo}, isolateClient, swarmingClient, http.DefaultClient, 0.9, tryjobs.API_URL_TESTING, tryjobs.BUCKET_TESTING, map[string]string{"skia": repoName}, swarming.POOLS_PUBLIC, "", depotTools, g, "test-project", "test-instance", nil, nil, "") |
| assertNoError(err) |
| |
| runTasks := func(bots []*swarming_api.SwarmingRpcsBotInfo) { |
| swarmingClient.MockBots(bots) |
| assertNoError(s.MainLoop(ctx)) |
| assertNoError(w.Update()) |
| assertNoError(tCache.Update()) |
| tasks, err := tCache.GetTasksForCommits(repoName, commits) |
| assertNoError(err) |
| newTasks := map[string]*types.Task{} |
| for _, v := range tasks { |
| for _, task := range v { |
| if task.Status == types.TASK_STATUS_PENDING { |
| if _, ok := newTasks[task.Id]; !ok { |
| newTasks[task.Id] = task |
| } |
| } |
| } |
| } |
| insert := make([]*types.Task, 0, len(newTasks)) |
| for _, task := range newTasks { |
| task.Status = types.TASK_STATUS_SUCCESS |
| task.Finished = time.Now() |
| task.IsolatedOutput = "abc123" |
| insert = append(insert, task) |
| } |
| assertNoError(d.PutTasks(insert)) |
| assertNoError(tCache.Update()) |
| assertNoError(jCache.Update()) |
| } |
| |
| // Consume all tasks. |
| for { |
| runTasks(bots) |
| unfinished, err := jCache.UnfinishedJobs() |
| assertNoError(err) |
| sklog.Infof("Found %d unfinished jobs.", len(unfinished)) |
| if len(unfinished) == 0 { |
| tasks, err := tCache.GetTasksForCommits(repoName, commits) |
| assertNoError(err) |
| assertEqual(s.QueueLen(), 0) |
| assertEqual(len(moarTasks), len(tasks[head])) |
| break |
| } |
| } |
| |
| // Add more commits to the repo. |
| makeDummyCommits(ctx, repoDir, 200) |
| commits, err = repo.RevList(head, "master") |
| assertNoError(err) |
| |
| // Start the profiler. |
| go func() { |
| sklog.Fatal(http.ListenAndServe("localhost:6060", nil)) |
| }() |
| |
| // Actually run the test. |
| i := 0 |
| for ; ; i++ { |
| runTasks(bots) |
| if s.QueueLen() == 0 { |
| break |
| } |
| } |
| sklog.Infof("Finished in %d iterations.", i) |
| } |