[task scheduler] Start jobs in parallel

This should speed things up substantially. Jobs that share the same
RepoState (same issue+patchset+commit) are still processed sequentially,
since the cache would prevent parallelizing those anyway.

Bug: b/339625138
Change-Id: Ida88a4f0a96165656c9ed8fd817adba382e2289f
Reviewed-on: https://skia-review.googlesource.com/c/buildbot/+/875080
Auto-Submit: Eric Boren <borenet@google.com>
Reviewed-by: Ravi Mistry <rmistry@google.com>
Commit-Queue: Ravi Mistry <rmistry@google.com>
diff --git a/task_scheduler/go/tryjobs/BUILD.bazel b/task_scheduler/go/tryjobs/BUILD.bazel
index 70a5432..4bdae9d 100644
--- a/task_scheduler/go/tryjobs/BUILD.bazel
+++ b/task_scheduler/go/tryjobs/BUILD.bazel
@@ -11,7 +11,6 @@
         "//go/cleanup",
         "//go/gerrit",
         "//go/git/repograph",
-        "//go/human",
         "//go/metrics2",
         "//go/now",
         "//go/pubsub",
diff --git a/task_scheduler/go/tryjobs/tryjobs.go b/task_scheduler/go/tryjobs/tryjobs.go
index 7d280fe..26f5bfd 100644
--- a/task_scheduler/go/tryjobs/tryjobs.go
+++ b/task_scheduler/go/tryjobs/tryjobs.go
@@ -18,7 +18,6 @@
 	"go.skia.org/infra/go/cleanup"
 	"go.skia.org/infra/go/gerrit"
 	"go.skia.org/infra/go/git/repograph"
-	"go.skia.org/infra/go/human"
 	"go.skia.org/infra/go/metrics2"
 	"go.skia.org/infra/go/now"
 	"go.skia.org/infra/go/pubsub"
@@ -361,35 +360,14 @@
 	for {
 		select {
 		case jobs := <-jobsCh:
-			sklog.Infof("Start processing jobs from modified jobs channel.")
-			for _, job := range jobs {
-				sklog.Infof("Found job %s (build %d) via modified jobs channel", job.Id, job.BuildbucketBuildId)
-			}
-			for _, job := range jobs {
-				if job.Status != types.JOB_STATUS_REQUESTED {
-					continue
-				}
-				if err := t.startJob(ctx, job); err != nil {
-					sklog.Errorf("failed to start job %s (build %d): %s", job.Id, job.BuildbucketBuildId, err)
-				}
-			}
-			sklog.Infof("Done processing jobs from modified jobs channel.")
+			t.startJobs(ctx, jobs, "modified jobs channel")
 		case <-tickCh:
-			sklog.Infof("Start processing jobs from periodic DB poll, cache updated %s ago.", human.Duration(time.Now().Sub(t.jCache.LastUpdated())))
 			jobs, err := t.jCache.RequestedJobs()
 			if err != nil {
 				sklog.Errorf("failed retrieving Jobs: %s", err)
 			} else {
-				for _, job := range jobs {
-					sklog.Infof("Found job %s (build %d) via periodic DB poll", job.Id, job.BuildbucketBuildId)
-				}
-				for _, job := range jobs {
-					if err := t.startJob(ctx, job); err != nil {
-						sklog.Errorf("failed to start job %s (build %d): %s", job.Id, job.BuildbucketBuildId, err)
-					}
-				}
+				t.startJobs(ctx, jobs, "periodic DB poll")
 			}
-			sklog.Infof("Done processing jobs from periodic DB poll.")
 		case <-doneCh:
 			ticker.Stop()
 			return
@@ -397,6 +375,32 @@
 	}
 }
 
+func (t *TryJobIntegrator) startJobs(ctx context.Context, jobs []*types.Job, foundVia string) {
+	sklog.Infof("Start processing jobs from %s.", foundVia)
+	// Organize jobs by RepoState, since jobs with the same RepoState must be
+	// processed serially.
+	byRepoState := map[types.RepoState][]*types.Job{}
+	for _, job := range jobs {
+		sklog.Infof("Found job %s (build %d) via %s", job.Id, job.BuildbucketBuildId, foundVia)
+		byRepoState[job.RepoState] = append(byRepoState[job.RepoState], job)
+	}
+	var wg sync.WaitGroup
+	for _, jobs := range byRepoState {
+		wg.Add(1)
+		jobs := jobs // https://golang.org/doc/faq#closures_and_goroutines
+		go func() {
+			defer wg.Done()
+			for _, job := range jobs {
+				if err := t.startJob(ctx, job); err != nil {
+					sklog.Errorf("failed to start job %s (build %d): %s", job.Id, job.BuildbucketBuildId, err)
+				}
+			}
+		}()
+	}
+	wg.Wait()
+	sklog.Infof("Done processing jobs from %s.", foundVia)
+}
+
 func isBuildAlreadyStartedError(err error) bool {
 	return err != nil && strings.Contains(err.Error(), buildAlreadyStartedErr)
 }