[task scheduler] Start jobs in parallel
This should speed things up substantially. We still process jobs that
have the same RepoState (same issue+patchset+commit) sequentially, since
the cache would prevent parallelizing those anyway.
Bug: b/339625138
Change-Id: Ida88a4f0a96165656c9ed8fd817adba382e2289f
Reviewed-on: https://skia-review.googlesource.com/c/buildbot/+/875080
Auto-Submit: Eric Boren <borenet@google.com>
Reviewed-by: Ravi Mistry <rmistry@google.com>
Commit-Queue: Ravi Mistry <rmistry@google.com>
diff --git a/task_scheduler/go/tryjobs/BUILD.bazel b/task_scheduler/go/tryjobs/BUILD.bazel
index 70a5432..4bdae9d 100644
--- a/task_scheduler/go/tryjobs/BUILD.bazel
+++ b/task_scheduler/go/tryjobs/BUILD.bazel
@@ -11,7 +11,6 @@
"//go/cleanup",
"//go/gerrit",
"//go/git/repograph",
- "//go/human",
"//go/metrics2",
"//go/now",
"//go/pubsub",
diff --git a/task_scheduler/go/tryjobs/tryjobs.go b/task_scheduler/go/tryjobs/tryjobs.go
index 7d280fe..26f5bfd 100644
--- a/task_scheduler/go/tryjobs/tryjobs.go
+++ b/task_scheduler/go/tryjobs/tryjobs.go
@@ -18,7 +18,6 @@
"go.skia.org/infra/go/cleanup"
"go.skia.org/infra/go/gerrit"
"go.skia.org/infra/go/git/repograph"
- "go.skia.org/infra/go/human"
"go.skia.org/infra/go/metrics2"
"go.skia.org/infra/go/now"
"go.skia.org/infra/go/pubsub"
@@ -361,35 +360,14 @@
for {
select {
case jobs := <-jobsCh:
- sklog.Infof("Start processing jobs from modified jobs channel.")
- for _, job := range jobs {
- sklog.Infof("Found job %s (build %d) via modified jobs channel", job.Id, job.BuildbucketBuildId)
- }
- for _, job := range jobs {
- if job.Status != types.JOB_STATUS_REQUESTED {
- continue
- }
- if err := t.startJob(ctx, job); err != nil {
- sklog.Errorf("failed to start job %s (build %d): %s", job.Id, job.BuildbucketBuildId, err)
- }
- }
- sklog.Infof("Done processing jobs from modified jobs channel.")
+ t.startJobs(ctx, jobs, "modified jobs channel")
case <-tickCh:
- sklog.Infof("Start processing jobs from periodic DB poll, cache updated %s ago.", human.Duration(time.Now().Sub(t.jCache.LastUpdated())))
jobs, err := t.jCache.RequestedJobs()
if err != nil {
sklog.Errorf("failed retrieving Jobs: %s", err)
} else {
- for _, job := range jobs {
- sklog.Infof("Found job %s (build %d) via periodic DB poll", job.Id, job.BuildbucketBuildId)
- }
- for _, job := range jobs {
- if err := t.startJob(ctx, job); err != nil {
- sklog.Errorf("failed to start job %s (build %d): %s", job.Id, job.BuildbucketBuildId, err)
- }
- }
+ t.startJobs(ctx, jobs, "periodic DB poll")
}
- sklog.Infof("Done processing jobs from periodic DB poll.")
case <-doneCh:
ticker.Stop()
return
@@ -397,6 +375,32 @@
}
}
+func (t *TryJobIntegrator) startJobs(ctx context.Context, jobs []*types.Job, foundVia string) {
+ sklog.Infof("Start processing jobs from %s.", foundVia)
+ // Organize jobs by RepoState, since jobs with the same RepoState must be
+ // processed serially.
+ byRepoState := map[types.RepoState][]*types.Job{}
+ for _, job := range jobs {
+ sklog.Infof("Found job %s (build %d) via %s", job.Id, job.BuildbucketBuildId, foundVia)
+ byRepoState[job.RepoState] = append(byRepoState[job.RepoState], job)
+ }
+ var wg sync.WaitGroup
+ for _, jobs := range byRepoState {
+ wg.Add(1)
+ jobs := jobs // https://golang.org/doc/faq#closures_and_goroutines
+ go func() {
+ defer wg.Done()
+ for _, job := range jobs {
+ if err := t.startJob(ctx, job); err != nil {
+ sklog.Errorf("failed to start job %s (build %d): %s", job.Id, job.BuildbucketBuildId, err)
+ }
+ }
+ }()
+ }
+ wg.Wait()
+ sklog.Infof("Done processing jobs from %s.", foundVia)
+}
+
func isBuildAlreadyStartedError(err error) bool {
return err != nil && strings.Contains(err.Error(), buildAlreadyStartedErr)
}