[task scheduler] Add code paths for Buildbucket V2
Bug: b/288158829
Change-Id: I409c7283e635a3492f500944837d1a6eeb70b173
Reviewed-on: https://skia-review.googlesource.com/c/buildbot/+/786177
Reviewed-by: Kevin Lubick <kjlubick@google.com>
diff --git a/go/buildbucket/BUILD.bazel b/go/buildbucket/BUILD.bazel
index 2ede382..a43b307 100644
--- a/go/buildbucket/BUILD.bazel
+++ b/go/buildbucket/BUILD.bazel
@@ -9,8 +9,11 @@
deps = [
"//go/buildbucket/common",
"//go/skerr",
+ "@com_github_google_uuid//:uuid",
"@org_chromium_go_luci//buildbucket/proto",
"@org_chromium_go_luci//grpc/prpc",
+ "@org_golang_google_grpc//metadata",
+ "@org_golang_google_protobuf//types/known/fieldmaskpb",
"@org_golang_google_protobuf//types/known/structpb",
],
)
diff --git a/go/buildbucket/buildbucket.go b/go/buildbucket/buildbucket.go
index 8d00ed0..edea2ea 100644
--- a/go/buildbucket/buildbucket.go
+++ b/go/buildbucket/buildbucket.go
@@ -5,7 +5,10 @@
"context"
"net/http"
+ "github.com/google/uuid"
buildbucketpb "go.chromium.org/luci/buildbucket/proto"
+ "google.golang.org/grpc/metadata"
+ "google.golang.org/protobuf/types/known/fieldmaskpb"
structpb "google.golang.org/protobuf/types/known/structpb"
"go.chromium.org/luci/grpc/prpc"
@@ -16,6 +19,7 @@
const (
BUILD_URL_TMPL = "https://%s/build/%d"
DEFAULT_HOST = "cr-buildbucket.appspot.com"
+ headerToken = "x-buildbucket-token"
)
var (
@@ -26,10 +30,12 @@
)
type BuildBucketInterface interface {
+ // CancelBuilds cancels the specified buildIDs with the specified
+ // summaryMarkdown. Builds are cancelled with one batch request
+ // to buildbucket.
+ CancelBuilds(ctx context.Context, buildIDs []int64, summaryMarkdown string) ([]*buildbucketpb.Build, error)
// GetBuild retrieves the build with the given ID.
GetBuild(ctx context.Context, buildId int64) (*buildbucketpb.Build, error)
- // Search retrieves Builds which match the given criteria.
- Search(ctx context.Context, pred *buildbucketpb.BuildPredicate) ([]*buildbucketpb.Build, error)
// GetTrybotsForCL retrieves trybot results for the given CL using the
// optional tags.
GetTrybotsForCL(ctx context.Context, issue, patchset int64, gerritUrl string, tags map[string]string) ([]*buildbucketpb.Build, error)
@@ -42,10 +48,13 @@
// means that the Infra-PerCommit-Race build should be scheduled with the
// "triggered_by: skcq" tag.
ScheduleBuilds(ctx context.Context, builds []string, buildsToTags map[string]map[string]string, issue, patchset int64, gerritUrl, repo, bbProject, bbBucket string) ([]*buildbucketpb.Build, error)
- // CancelBuilds cancels the specified buildIDs with the specified
- // summaryMarkdown. Builds are cancelled with one batch request
- // to buildbucket.
- CancelBuilds(ctx context.Context, buildIDs []int64, summaryMarkdown string) ([]*buildbucketpb.Build, error)
+ // Search retrieves Builds which match the given criteria.
+ Search(ctx context.Context, pred *buildbucketpb.BuildPredicate) ([]*buildbucketpb.Build, error)
+ // UpdateBuild sends an update for the given build.
+ UpdateBuild(ctx context.Context, build *buildbucketpb.Build, token string) error
+ // StartBuild notifies Buildbucket that the build has started. Returns the
+ // token which should be passed to UpdateBuild for subsequent calls.
+ StartBuild(ctx context.Context, buildId int64, taskId, token string) (string, error)
}
// Client is used for interacting with the BuildBucket API.
@@ -207,5 +216,76 @@
return c.Search(ctx, pred)
}
+// contextWithTokenMetadata returns a context derived from ctx which carries
+// the given token as the value of the headerToken key in its outgoing gRPC
+// metadata.
+func contextWithTokenMetadata(ctx context.Context, token string) context.Context {
+	md := metadata.MD{headerToken: []string{token}}
+	return metadata.NewOutgoingContext(ctx, md)
+}
+
+// StartBuild implements BuildBucketInterface. It sends a StartBuildRequest for
+// the given buildId and taskId, attaching token to the outgoing request
+// metadata, and returns the UpdateBuildToken from the response. A freshly
+// generated UUID is used as the RequestId of every call.
+func (c *Client) StartBuild(ctx context.Context, buildId int64, taskId, token string) (string, error) {
+	req := &buildbucketpb.StartBuildRequest{
+		RequestId: uuid.New().String(),
+		BuildId:   buildId,
+		TaskId:    taskId,
+	}
+	resp, err := c.bc.StartBuild(contextWithTokenMetadata(ctx, token), req)
+	if err != nil {
+		// Wrap the error so that it is reported the same way as errors from
+		// UpdateBuild.
+		return "", skerr.Wrap(err)
+	}
+	return resp.UpdateBuildToken, nil
+}
+
+// UpdateBuild implements BuildBucketInterface. It sends an UpdateBuildRequest
+// for the given build, attaching token to the outgoing request metadata. The
+// update mask lists only those fields which are set on build (see
+// updateBuildMaskPaths). Returns an error if build is nil or if the request
+// fails.
+func (c *Client) UpdateBuild(ctx context.Context, build *buildbucketpb.Build, token string) error {
+	if build == nil {
+		// Fail cleanly instead of panicking on a nil dereference.
+		return skerr.Fmt("build must not be nil")
+	}
+	_, err := c.bc.UpdateBuild(contextWithTokenMetadata(ctx, token), &buildbucketpb.UpdateBuildRequest{
+		Build: build,
+		UpdateMask: &fieldmaskpb.FieldMask{
+			Paths: updateBuildMaskPaths(build),
+		},
+	})
+	return skerr.Wrap(err)
+}
+
+// updateBuildMaskPaths returns the update mask paths for the fields of build
+// which are set, ie. non-nil messages and maps, non-empty strings and slices,
+// and statuses other than STATUS_UNSPECIFIED.
+func updateBuildMaskPaths(build *buildbucketpb.Build) []string {
+	var paths []string
+	addIf := func(isSet bool, path string) {
+		if isSet {
+			paths = append(paths, path)
+		}
+	}
+
+	// The generated getters are nil-safe, so a missing Output simply results
+	// in none of its sub-fields being added.
+	output := build.GetOutput()
+	addIf(output.GetProperties() != nil, "build.output.properties")
+	addIf(output.GetGitilesCommit() != nil, "build.output.gitiles_commit")
+	addIf(output.GetStatus() != buildbucketpb.Status_STATUS_UNSPECIFIED, "build.output.status")
+	addIf(output.GetStatusDetails() != nil, "build.output.status_details")
+	addIf(output.GetSummaryMarkdown() != "", "build.output.summary_markdown")
+
+	addIf(build.GetStatus() != buildbucketpb.Status_STATUS_UNSPECIFIED, "build.status")
+	addIf(build.GetStatusDetails() != nil, "build.status_details")
+	addIf(len(build.GetSteps()) > 0, "build.steps")
+	addIf(build.GetSummaryMarkdown() != "", "build.summary_markdown")
+	addIf(len(build.GetTags()) > 0, "build.tags")
+
+	// Likewise, a missing Infra, Buildbucket or Agent yields a nil agent.
+	agent := build.GetInfra().GetBuildbucket().GetAgent()
+	addIf(agent.GetOutput() != nil, "build.infra.buildbucket.agent.output")
+	addIf(agent.GetPurposes() != nil, "build.infra.buildbucket.agent.purposes")
+	return paths
+}
+
// Make sure Client fulfills the BuildBucketInterface interface.
var _ BuildBucketInterface = (*Client)(nil)
diff --git a/go/buildbucket/mocks/BuildBucketInterface.go b/go/buildbucket/mocks/BuildBucketInterface.go
index a965b5f..5531818 100644
--- a/go/buildbucket/mocks/BuildBucketInterface.go
+++ b/go/buildbucket/mocks/BuildBucketInterface.go
@@ -165,6 +165,52 @@
return r0, r1
}
+// StartBuild provides a mock function with given fields: ctx, buildId, taskId, token
+func (_m *BuildBucketInterface) StartBuild(ctx context.Context, buildId int64, taskId string, token string) (string, error) {
+ ret := _m.Called(ctx, buildId, taskId, token) // Values configured for this call.
+
+ if len(ret) == 0 {
+ panic("no return value specified for StartBuild")
+ }
+
+ var r0 string
+ var r1 error
+ if rf, ok := ret.Get(0).(func(context.Context, int64, string, string) (string, error)); ok { // A single function may compute both results.
+ return rf(ctx, buildId, taskId, token)
+ }
+ if rf, ok := ret.Get(0).(func(context.Context, int64, string, string) string); ok { // First result: computed by a function...
+ r0 = rf(ctx, buildId, taskId, token)
+ } else { // ...or given as a fixed string.
+ r0 = ret.Get(0).(string)
+ }
+
+ if rf, ok := ret.Get(1).(func(context.Context, int64, string, string) error); ok { // Second result: computed by a function...
+ r1 = rf(ctx, buildId, taskId, token)
+ } else { // ...or given as a fixed error.
+ r1 = ret.Error(1)
+ }
+
+ return r0, r1
+}
+
+// UpdateBuild provides a mock function with given fields: ctx, build, token
+func (_m *BuildBucketInterface) UpdateBuild(ctx context.Context, build *buildbucketpb.Build, token string) error {
+ ret := _m.Called(ctx, build, token) // Values configured for this call.
+
+ if len(ret) == 0 {
+ panic("no return value specified for UpdateBuild")
+ }
+
+ var r0 error
+ if rf, ok := ret.Get(0).(func(context.Context, *buildbucketpb.Build, string) error); ok { // Result computed by a function...
+ r0 = rf(ctx, build, token)
+ } else { // ...or given as a fixed error.
+ r0 = ret.Error(0)
+ }
+
+ return r0
+}
+
// NewBuildBucketInterface creates a new instance of BuildBucketInterface. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
// The first argument is typically a *testing.T value.
func NewBuildBucketInterface(t interface {
diff --git a/task_scheduler/go/job_creation/BUILD.bazel b/task_scheduler/go/job_creation/BUILD.bazel
index 5a5d2a2..97b32f3 100644
--- a/task_scheduler/go/job_creation/BUILD.bazel
+++ b/task_scheduler/go/job_creation/BUILD.bazel
@@ -15,6 +15,7 @@
"//go/git/repograph",
"//go/metrics2",
"//go/now",
+ "//go/pubsub",
"//go/skerr",
"//go/sklog",
"//go/util",
@@ -27,7 +28,6 @@
"//task_scheduler/go/tryjobs",
"//task_scheduler/go/types",
"//task_scheduler/go/window",
- "@org_golang_x_oauth2//:oauth2",
"@org_golang_x_sync//errgroup",
],
)
diff --git a/task_scheduler/go/job_creation/job_creation.go b/task_scheduler/go/job_creation/job_creation.go
index fddb2e9..5498d5d 100644
--- a/task_scheduler/go/job_creation/job_creation.go
+++ b/task_scheduler/go/job_creation/job_creation.go
@@ -2,7 +2,6 @@
import (
"context"
- "fmt"
"net/http"
"strings"
"time"
@@ -15,6 +14,7 @@
"go.skia.org/infra/go/git/repograph"
"go.skia.org/infra/go/metrics2"
"go.skia.org/infra/go/now"
+ "go.skia.org/infra/go/pubsub"
"go.skia.org/infra/go/skerr"
"go.skia.org/infra/go/sklog"
"go.skia.org/infra/go/util"
@@ -27,7 +27,6 @@
"go.skia.org/infra/task_scheduler/go/tryjobs"
"go.skia.org/infra/task_scheduler/go/types"
"go.skia.org/infra/task_scheduler/go/window"
- "golang.org/x/oauth2"
"golang.org/x/sync/errgroup"
)
@@ -64,31 +63,31 @@
}
// NewJobCreator returns a JobCreator instance.
-func NewJobCreator(ctx context.Context, d db.DB, period time.Duration, numCommits int, workdir, host string, repos repograph.Map, rbe cas.CAS, c *http.Client, buildbucketApiUrl, trybotBucket string, projectRepoMapping map[string]string, depotTools string, gerrit gerrit.GerritInterface, taskCfgCache task_cfg_cache.TaskCfgCache, ts oauth2.TokenSource) (*JobCreator, error) {
+func NewJobCreator(ctx context.Context, d db.DB, period time.Duration, numCommits int, workdir, host string, repos repograph.Map, rbe cas.CAS, c *http.Client, buildbucketApiUrl, buildbucketTarget, buildbucketBucket string, projectRepoMapping map[string]string, depotTools string, gerrit gerrit.GerritInterface, taskCfgCache task_cfg_cache.TaskCfgCache, pubsubClient pubsub.Client) (*JobCreator, error) {
// Repos must be updated before window is initialized; otherwise the repos may be uninitialized,
// resulting in the window being too short, causing the caches to be loaded with incomplete data.
for _, r := range repos {
if err := r.Update(ctx); err != nil {
- return nil, fmt.Errorf("Failed initial repo sync: %s", err)
+ return nil, skerr.Wrapf(err, "failed initial repo sync")
}
}
w, err := window.New(ctx, period, numCommits, repos)
if err != nil {
- return nil, fmt.Errorf("Failed to create window: %s", err)
+ return nil, skerr.Wrapf(err, "failed to create window")
}
// Create caches.
jCache, err := cache.NewJobCache(ctx, d, w, nil)
if err != nil {
- return nil, fmt.Errorf("Failed to create JobCache: %s", err)
+ return nil, skerr.Wrapf(err, "failed to create JobCache")
}
sc := syncer.New(ctx, repos, depotTools, workdir, syncer.DefaultNumWorkers)
chr := cacher.New(sc, taskCfgCache, rbe)
- tryjobs, err := tryjobs.NewTryJobIntegrator(buildbucketApiUrl, trybotBucket, host, c, d, jCache, projectRepoMapping, repos, taskCfgCache, chr, gerrit)
+ tryjobs, err := tryjobs.NewTryJobIntegrator(ctx, buildbucketApiUrl, buildbucketTarget, buildbucketBucket, host, c, d, jCache, projectRepoMapping, repos, taskCfgCache, chr, gerrit, pubsubClient)
if err != nil {
- return nil, fmt.Errorf("Failed to create TryJobIntegrator: %s", err)
+ return nil, skerr.Wrapf(err, "failed to create TryJobIntegrator")
}
jc := &JobCreator{
cacher: chr,
@@ -334,7 +333,7 @@
// because of poorly-timed process restarts. Go through the unfinished
// jobs and cache them if necessary.
if err := jc.jCache.Update(ctx); err != nil {
- return fmt.Errorf("Failed to update job cache: %s", err)
+ return skerr.Wrapf(err, "failed to update job cache")
}
unfinishedJobs, err := jc.jCache.InProgressJobs()
if err != nil {
@@ -372,7 +371,7 @@
// very long.
sklog.Errorf("Have cached error for RepoState %s", rs.RowKey())
} else {
- return fmt.Errorf("Failed to cache RepoState: %s", err)
+ return skerr.Wrapf(err, "failed to cache RepoState")
}
}
return nil
@@ -413,7 +412,7 @@
main = repo.Get(git.MainBranch)
}
if main == nil {
- return fmt.Errorf("Failed to retrieve branch %q or %q for %s", git.MasterBranch, git.MainBranch, repoUrl)
+ return skerr.Fmt("failed to retrieve branch %q or %q for %s", git.MasterBranch, git.MainBranch, repoUrl)
}
rs := types.RepoState{
Repo: repoUrl,
@@ -424,13 +423,13 @@
err = cachedErr
}
if err != nil {
- return fmt.Errorf("Failed to retrieve TaskCfg from %s: %s", repoUrl, err)
+ return skerr.Wrapf(err, "failed to retrieve TaskCfg from %s", repoUrl)
}
for name, js := range cfg.Jobs {
if js.Trigger == triggerName {
job, err := task_cfg_cache.MakeJob(ctx, jc.taskCfgCache, rs, name)
if err != nil {
- return fmt.Errorf("Failed to create job: %s", err)
+ return skerr.Wrapf(err, "failed to create job")
}
job.Requested = job.Created
jobs = append(jobs, job)
@@ -474,7 +473,7 @@
// Insert the new jobs into the DB.
if err := jc.putJobsInChunks(ctx, jobsToInsert); err != nil {
- return fmt.Errorf("Failed to add periodic jobs: %s", err)
+ return skerr.Wrapf(err, "failed to add periodic jobs")
}
sklog.Infof("Created %d periodic jobs for trigger %q", len(jobs), triggerName)
return nil
diff --git a/task_scheduler/go/job_creation/job_creation_test.go b/task_scheduler/go/job_creation/job_creation_test.go
index d38a565..4bc40ac 100644
--- a/task_scheduler/go/job_creation/job_creation_test.go
+++ b/task_scheduler/go/job_creation/job_creation_test.go
@@ -66,7 +66,7 @@
cas.On("Merge", testutils.AnyContext, []string{tcc_testutils.TestCASDigest}).Return(tcc_testutils.TestCASDigest, nil)
cas.On("Merge", testutils.AnyContext, []string{tcc_testutils.PerfCASDigest}).Return(tcc_testutils.PerfCASDigest, nil)
- jc, err := NewJobCreator(ctx, d, time.Duration(math.MaxInt64), 0, tmp, "fake.server", repos, cas, urlMock.Client(), tryjobs.API_URL_TESTING, tryjobs.BUCKET_TESTING, projectRepoMapping, depotTools, g, taskCfgCache, nil)
+ jc, err := NewJobCreator(ctx, d, time.Duration(math.MaxInt64), 0, tmp, "fake.server", repos, cas, urlMock.Client(), tryjobs.API_URL_TESTING, "fake-bb-target", tryjobs.BUCKET_TESTING, projectRepoMapping, depotTools, g, taskCfgCache, nil)
require.NoError(t, err)
return ctx, gb, d, jc, urlMock, cas, func() {
testutils.AssertCloses(t, jc)
diff --git a/task_scheduler/go/scheduling/perftest/BUILD.bazel b/task_scheduler/go/scheduling/perftest/BUILD.bazel
index 1a66c78..754bb55 100644
--- a/task_scheduler/go/scheduling/perftest/BUILD.bazel
+++ b/task_scheduler/go/scheduling/perftest/BUILD.bazel
@@ -16,6 +16,7 @@
"//go/git/repograph",
"//go/httputils",
"//go/now",
+ "//go/pubsub/mocks",
"//go/sklog",
"//go/swarming",
"//go/util",
diff --git a/task_scheduler/go/scheduling/perftest/perftest.go b/task_scheduler/go/scheduling/perftest/perftest.go
index 7bfac9e..5e1298d 100644
--- a/task_scheduler/go/scheduling/perftest/perftest.go
+++ b/task_scheduler/go/scheduling/perftest/perftest.go
@@ -34,6 +34,7 @@
"go.skia.org/infra/go/git/repograph"
"go.skia.org/infra/go/httputils"
"go.skia.org/infra/go/now"
+ pubsub_mocks "go.skia.org/infra/go/pubsub/mocks"
"go.skia.org/infra/go/sklog"
"go.skia.org/infra/go/swarming"
"go.skia.org/infra/go/util"
@@ -303,6 +304,7 @@
assertEqual(head, commits[0])
ts, err := google.DefaultTokenSource(ctx, datastore.ScopeDatastore)
+ assertNoError(err)
fsInstance := uuid.New().String()
d, err := firestore.NewDBWithParams(ctx, firestore.FIRESTORE_PROJECT, fsInstance, ts)
assertNoError(err)
@@ -346,7 +348,8 @@
}
depotTools, err := depot_tools.GetDepotTools(ctx, workdir, *recipesCfgFile)
assertNoError(err)
- jc, err := job_creation.NewJobCreator(ctx, d, windowPeriod, 0, workdir, "localhost", repos, cas, client, "", "", nil, depotTools, nil, taskCfgCache, ts)
+ pubsubClient := &pubsub_mocks.Client{}
+ jc, err := job_creation.NewJobCreator(ctx, d, windowPeriod, 0, workdir, "localhost", repos, cas, client, "fake-bb-url", "fake-bb-target", "fake-bb-bucket", nil, depotTools, nil, taskCfgCache, pubsubClient)
assertNoError(err)
// Wait for job-creator to process the jobs from the repo.
diff --git a/task_scheduler/go/task-scheduler-jc/BUILD.bazel b/task_scheduler/go/task-scheduler-jc/BUILD.bazel
index ab043ee..530c91a 100644
--- a/task_scheduler/go/task-scheduler-jc/BUILD.bazel
+++ b/task_scheduler/go/task-scheduler-jc/BUILD.bazel
@@ -18,6 +18,7 @@
"//go/httputils",
"//go/human",
"//go/periodic",
+ "//go/pubsub",
"//go/sklog",
"//go/tracing",
"//go/util",
@@ -28,8 +29,8 @@
"//task_scheduler/go/types",
"@com_google_cloud_go_bigtable//:bigtable",
"@com_google_cloud_go_datastore//:datastore",
- "@com_google_cloud_go_pubsub//:pubsub",
"@org_golang_google_api//compute/v1:compute",
+ "@org_golang_google_api//option",
"@org_golang_x_oauth2//:oauth2",
"@org_golang_x_oauth2//google",
],
diff --git a/task_scheduler/go/task-scheduler-jc/main.go b/task_scheduler/go/task-scheduler-jc/main.go
index 71147a9..8ea6330 100644
--- a/task_scheduler/go/task-scheduler-jc/main.go
+++ b/task_scheduler/go/task-scheduler-jc/main.go
@@ -11,10 +11,10 @@
"cloud.google.com/go/bigtable"
"cloud.google.com/go/datastore"
- "cloud.google.com/go/pubsub"
"golang.org/x/oauth2"
"golang.org/x/oauth2/google"
"google.golang.org/api/compute/v1"
+ "google.golang.org/api/option"
"go.skia.org/infra/go/auth"
"go.skia.org/infra/go/cas/rbe"
@@ -28,6 +28,7 @@
"go.skia.org/infra/go/httputils"
"go.skia.org/infra/go/human"
"go.skia.org/infra/go/periodic"
+ "go.skia.org/infra/go/pubsub"
"go.skia.org/infra/go/sklog"
"go.skia.org/infra/go/tracing"
"go.skia.org/infra/go/util"
@@ -48,22 +49,24 @@
var (
// Flags.
- btInstance = flag.String("bigtable_instance", "", "BigTable instance to use.")
- btProject = flag.String("bigtable_project", "", "GCE project to use for BigTable.")
- host = flag.String("host", "localhost", "HTTP service host")
- port = flag.String("port", ":8000", "HTTP service port for the web server (e.g., ':8000')")
- disableTryjobs = flag.Bool("disable_try_jobs", false, "If set, no try jobs will be picked up.")
- firestoreInstance = flag.String("firestore_instance", "", "Firestore instance to use, eg. \"production\"")
- gitstoreTable = flag.String("gitstore_bt_table", "git-repos2", "BigTable table used for GitStore.")
- local = flag.Bool("local", false, "Whether we're running on a dev machine vs in production.")
- rbeInstance = flag.String("rbe_instance", "projects/chromium-swarm/instances/default_instance", "CAS instance to use")
- repoUrls = common.NewMultiStringFlag("repo", nil, "Repositories for which to schedule tasks.")
- recipesCfgFile = flag.String("recipes_cfg", "", "Path to the recipes.cfg file.")
- timePeriod = flag.String("timeWindow", "4d", "Time period to use.")
- tryJobBucket = flag.String("tryjob_bucket", tryjobs.BUCKET_PRIMARY, "Which Buildbucket bucket to use for try jobs.")
- commitWindow = flag.Int("commitWindow", 10, "Minimum number of recent commits to keep in the timeWindow.")
- workdir = flag.String("workdir", "workdir", "Working directory to use.")
- promPort = flag.String("prom_port", ":20000", "Metrics service address (e.g., ':10110')")
+ btInstance = flag.String("bigtable_instance", "", "BigTable instance to use.")
+ btProject = flag.String("bigtable_project", "", "GCE project to use for BigTable.")
+ buildbucketBucket = flag.String("tryjob_bucket", tryjobs.BUCKET_PRIMARY, "Which Buildbucket bucket to use for try jobs.")
+ buildbucketTarget = flag.String("buildbucket_target", "", "Buildbucket backend target name used to address this scheduler.")
+ buildbucketPubSubProject = flag.String("buildbucket_pubsub_project", "", "Pub/sub project used for sending messages to Buildbucket.")
+ host = flag.String("host", "localhost", "HTTP service host")
+ port = flag.String("port", ":8000", "HTTP service port for the web server (e.g., ':8000')")
+ disableTryjobs = flag.Bool("disable_try_jobs", false, "If set, no try jobs will be picked up.")
+ firestoreInstance = flag.String("firestore_instance", "", "Firestore instance to use, eg. \"production\"")
+ gitstoreTable = flag.String("gitstore_bt_table", "git-repos2", "BigTable table used for GitStore.")
+ local = flag.Bool("local", false, "Whether we're running on a dev machine vs in production.")
+ rbeInstance = flag.String("rbe_instance", "projects/chromium-swarm/instances/default_instance", "CAS instance to use")
+ repoUrls = common.NewMultiStringFlag("repo", nil, "Repositories for which to schedule tasks.")
+ recipesCfgFile = flag.String("recipes_cfg", "", "Path to the recipes.cfg file.")
+ timePeriod = flag.String("timeWindow", "4d", "Time period to use.")
+ commitWindow = flag.Int("commitWindow", 10, "Minimum number of recent commits to keep in the timeWindow.")
+ workdir = flag.String("workdir", "workdir", "Working directory to use.")
+ promPort = flag.String("prom_port", ":20000", "Metrics service address (e.g., ':10110')")
)
func main() {
@@ -174,9 +177,18 @@
sklog.Fatalf("Failed to create TaskCfgCache: %s", err)
}
+ // Create pubsub client. It is left nil if no Pub/Sub project was provided.
+ var pubsubClient pubsub.Client
+ if *buildbucketPubSubProject != "" {
+ pubsubClient, err = pubsub.NewClient(ctx, *buildbucketPubSubProject, option.WithTokenSource(tokenSource))
+ if err != nil {
+ sklog.Fatalf("Failed to create pubsub client: %s", err)
+ }
+ }
+
// Create and start the JobCreator.
sklog.Infof("Creating JobCreator.")
- jc, err := job_creation.NewJobCreator(ctx, tsDb, period, *commitWindow, wdAbs, serverURL, repos, cas, httpClient, tryjobs.API_URL_PROD, *tryJobBucket, common.PROJECT_REPO_MAPPING, depotTools, gerrit, taskCfgCache, tokenSource)
+ jc, err := job_creation.NewJobCreator(ctx, tsDb, period, *commitWindow, wdAbs, serverURL, repos, cas, httpClient, tryjobs.API_URL_PROD, *buildbucketTarget, *buildbucketBucket, common.PROJECT_REPO_MAPPING, depotTools, gerrit, taskCfgCache, pubsubClient)
if err != nil {
sklog.Fatal(err)
}
diff --git a/task_scheduler/go/tryjobs/BUILD.bazel b/task_scheduler/go/tryjobs/BUILD.bazel
index de63425..3694eb8 100644
--- a/task_scheduler/go/tryjobs/BUILD.bazel
+++ b/task_scheduler/go/tryjobs/BUILD.bazel
@@ -14,6 +14,7 @@
"//go/git/repograph",
"//go/metrics2",
"//go/now",
+ "//go/pubsub",
"//go/skerr",
"//go/sklog",
"//go/util",
@@ -23,8 +24,11 @@
"//task_scheduler/go/task_cfg_cache",
"//task_scheduler/go/types",
"@com_github_golang_protobuf//ptypes:go_default_library_gen",
+ "@com_github_hashicorp_go_multierror//:go-multierror",
+ "@com_google_cloud_go_pubsub//:pubsub",
"@org_chromium_go_luci//buildbucket/proto",
"@org_chromium_go_luci//common/api/buildbucket/buildbucket/v1:buildbucket",
+ "@org_golang_google_protobuf//encoding/prototext",
],
)
@@ -44,6 +48,7 @@
"//go/git/repograph",
"//go/mockhttpclient",
"//go/now",
+ "//go/pubsub/mocks",
"//go/sklog",
"//go/sktest",
"//go/testutils",
@@ -57,8 +62,11 @@
"//task_scheduler/go/types",
"//task_scheduler/go/window",
"@com_github_golang_protobuf//ptypes:go_default_library_gen",
+ "@com_github_stretchr_testify//mock",
"@com_github_stretchr_testify//require",
+ "@com_google_cloud_go_pubsub//:pubsub",
"@org_chromium_go_luci//buildbucket/proto",
"@org_chromium_go_luci//common/api/buildbucket/buildbucket/v1:buildbucket",
+ "@org_golang_google_protobuf//encoding/prototext",
],
)
diff --git a/task_scheduler/go/tryjobs/tryjobs.go b/task_scheduler/go/tryjobs/tryjobs.go
index 644ffe1..1a6d497 100644
--- a/task_scheduler/go/tryjobs/tryjobs.go
+++ b/task_scheduler/go/tryjobs/tryjobs.go
@@ -11,7 +11,9 @@
"sync"
"time"
+ pubsub_api "cloud.google.com/go/pubsub"
"github.com/golang/protobuf/ptypes"
+ "github.com/hashicorp/go-multierror"
buildbucketpb "go.chromium.org/luci/buildbucket/proto"
buildbucket_api "go.chromium.org/luci/common/api/buildbucket/buildbucket/v1"
"go.skia.org/infra/go/buildbucket"
@@ -21,6 +23,7 @@
"go.skia.org/infra/go/git/repograph"
"go.skia.org/infra/go/metrics2"
"go.skia.org/infra/go/now"
+ "go.skia.org/infra/go/pubsub"
"go.skia.org/infra/go/skerr"
"go.skia.org/infra/go/sklog"
"go.skia.org/infra/go/util"
@@ -29,6 +32,7 @@
"go.skia.org/infra/task_scheduler/go/db/cache"
"go.skia.org/infra/task_scheduler/go/task_cfg_cache"
"go.skia.org/infra/task_scheduler/go/types"
+ "google.golang.org/protobuf/encoding/prototext"
)
/*
@@ -85,34 +89,38 @@
type TryJobIntegrator struct {
bb *buildbucket_api.Service // Client for the Buildbucket v1 API.
bb2 buildbucket.BuildBucketInterface // Client used to retrieve build details from the Buildbucket v2 API.
- bucket string
+ buildbucketBucket string // Buildbucket bucket used for try jobs.
+ buildbucketTarget string // Buildbucket backend target name; set as the Target of task IDs in Pub/Sub updates.
chr cacher.Cacher
db db.JobDB
gerrit gerrit.GerritInterface
host string
jCache cache.JobCache
projectRepoMapping map[string]string
+ pubsub pubsub.Client // Publishes task updates to each Job's Buildbucket Pub/Sub topic; callers may pass nil.
rm repograph.Map
taskCfgCache task_cfg_cache.TaskCfgCache
}
// NewTryJobIntegrator returns a TryJobIntegrator instance.
-func NewTryJobIntegrator(apiUrl, bucket, host string, c *http.Client, d db.JobDB, jCache cache.JobCache, projectRepoMapping map[string]string, rm repograph.Map, taskCfgCache task_cfg_cache.TaskCfgCache, chr cacher.Cacher, gerrit gerrit.GerritInterface) (*TryJobIntegrator, error) {
+func NewTryJobIntegrator(ctx context.Context, buildbucketAPIURL, buildbucketTarget, buildbucketBucket, host string, c *http.Client, d db.JobDB, jCache cache.JobCache, projectRepoMapping map[string]string, rm repograph.Map, taskCfgCache task_cfg_cache.TaskCfgCache, chr cacher.Cacher, gerrit gerrit.GerritInterface, pubsubClient pubsub.Client) (*TryJobIntegrator, error) {
bb, err := buildbucket_api.New(c)
if err != nil {
return nil, err
}
- bb.BasePath = apiUrl
+ bb.BasePath = buildbucketAPIURL
rv := &TryJobIntegrator{
bb: bb,
bb2: buildbucket.NewClient(c),
- bucket: bucket,
+ buildbucketBucket: buildbucketBucket,
+ buildbucketTarget: buildbucketTarget,
db: d,
chr: chr,
gerrit: gerrit,
host: host,
jCache: jCache,
projectRepoMapping: projectRepoMapping,
+ pubsub: pubsubClient,
rm: rm,
taskCfgCache: taskCfgCache,
}
@@ -161,7 +169,7 @@
jobs := t.jCache.GetAllCachedJobs()
rv := []*types.Job{}
for _, job := range jobs {
- if job.BuildbucketLeaseKey != 0 && job.Status != types.JOB_STATUS_REQUESTED {
+ if (job.BuildbucketLeaseKey != 0 || job.BuildbucketToken != "") && job.Status != types.JOB_STATUS_REQUESTED {
rv = append(rv, job)
}
}
@@ -178,12 +186,15 @@
// Divide up finished and unfinished Jobs.
finished := make([]*types.Job, 0, len(jobs))
- unfinished := make([]*types.Job, 0, len(jobs))
+ unfinishedV1 := make([]*types.Job, 0, len(jobs))
+ unfinishedV2 := make([]*types.Job, 0, len(jobs))
for _, j := range jobs {
if j.Done() {
finished = append(finished, j)
+ } else if isBBv2(j) {
+ unfinishedV2 = append(unfinishedV2, j)
} else {
- unfinished = append(unfinished, j)
+ unfinishedV1 = append(unfinishedV1, j)
}
}
@@ -193,7 +204,14 @@
wg.Add(1)
go func() {
defer wg.Done()
- heartbeatErr = t.sendHeartbeats(ctx, unfinished)
+ heartbeatErr = t.sendHeartbeats(ctx, unfinishedV1)
+ }()
+
+ var pubsubErr error
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ pubsubErr = t.sendPubsubUpdates(ctx, unfinishedV2)
}()
// Send updates for finished Jobs, empty the lease keys to mark them
@@ -201,10 +219,11 @@
errs := []error{}
insert := make([]*types.Job, 0, len(finished))
for _, j := range finished {
- if err := t.jobFinished(j); err != nil {
+ if err := t.jobFinished(ctx, j); err != nil {
errs = append(errs, err)
} else {
j.BuildbucketLeaseKey = 0
+ j.BuildbucketToken = ""
insert = append(insert, j)
}
}
@@ -217,6 +236,9 @@
if heartbeatErr != nil {
errs = append(errs, heartbeatErr)
}
+ if pubsubErr != nil {
+ errs = append(errs, pubsubErr)
+ }
if len(errs) > 0 {
return skerr.Fmt("Failed to update jobs; got errors: %v", errs)
@@ -237,6 +259,11 @@
s[i], s[j] = s[j], s[i]
}
+// isBBv2 reports whether the Job was triggered using Buildbucket V2, which is
+// the case exactly when the Job has a non-empty BuildbucketPubSubTopic.
+func isBBv2(j *types.Job) bool {
+	return len(j.BuildbucketPubSubTopic) > 0
+}
+
// sendHeartbeats sends heartbeats to Buildbucket for all of the unfinished try
// Jobs.
func (t *TryJobIntegrator) sendHeartbeats(ctx context.Context, jobs []*types.Job) error {
@@ -251,13 +278,6 @@
// Send heartbeats for all leases.
send := func(jobs []*types.Job) {
- // TODO(borenet): Remove this logging when no longer needed.
- ids := make([]string, 0, len(jobs))
- for _, job := range jobs {
- ids = append(ids, job.Id)
- }
- sklog.Infof("Sending heartbeats for jobs: %s", strings.Join(ids, ", "))
-
heartbeats := make([]*buildbucket_api.LegacyApiHeartbeatBatchRequestMessageOneHeartbeat, 0, len(jobs))
for _, j := range jobs {
heartbeats = append(heartbeats, &buildbucket_api.LegacyApiHeartbeatBatchRequestMessageOneHeartbeat{
@@ -319,6 +339,38 @@
return nil
}
+// sendPubsubUpdates sends updates to Buildbucket via Pub/Sub for in-progress
+// Jobs. One BuildTaskUpdate is published per Job, concurrently, to the Job's
+// BuildbucketPubSubTopic, and each publish is awaited. Returns nil if there
+// are no Jobs, an error if Jobs were provided but no Pub/Sub client is
+// configured, and otherwise the combined errors of all failed updates.
+func (t *TryJobIntegrator) sendPubsubUpdates(ctx context.Context, jobs []*types.Job) error {
+	if len(jobs) == 0 {
+		return nil
+	}
+	if t.pubsub == nil {
+		// The Pub/Sub client is optional at construction time. Report an
+		// error rather than panicking inside one of the goroutines below.
+		return skerr.Fmt("no Pub/Sub client configured; unable to send updates for %d jobs", len(jobs))
+	}
+
+	g := multierror.Group{}
+	for _, job := range jobs {
+		job := job // https://golang.org/doc/faq#closures_and_goroutines
+		g.Go(func() error {
+			update := &buildbucketpb.BuildTaskUpdate{
+				BuildId: strconv.FormatInt(job.BuildbucketBuildId, 10),
+				Task: &buildbucketpb.Task{
+					Id: &buildbucketpb.TaskID{
+						Target: t.buildbucketTarget,
+						Id:     job.Id,
+					},
+					Link:   job.URL(t.host),
+					Status: jobStatusToBuildbucketStatus(job.Status),
+					// The current time in nanoseconds serves as the update ID.
+					UpdateId: now.Now(ctx).UnixNano(),
+				},
+			}
+			b, err := prototext.Marshal(update)
+			if err != nil {
+				return skerr.Wrapf(err, "failed to encode BuildTaskUpdate")
+			}
+			// NOTE(review): a new Topic handle is obtained for every update;
+			// confirm whether handles should be cached or stopped after use.
+			msg := &pubsub_api.Message{Data: b}
+			if _, err := t.pubsub.Topic(job.BuildbucketPubSubTopic).Publish(ctx, msg).Get(ctx); err != nil {
+				return skerr.Wrapf(err, "failed to publish update for job %s (build %d) to topic %q", job.Id, job.BuildbucketBuildId, job.BuildbucketPubSubTopic)
+			}
+			return nil
+		})
+	}
+	return g.Wait().ErrorOrNil()
+}
+
// getRepo returns the repo information associated with the given URL.
func (t *TryJobIntegrator) getRepo(repoUrl string) (*repograph.Graph, error) {
r, ok := t.rm[repoUrl]
@@ -363,8 +415,8 @@
return nil
}
-func (t *TryJobIntegrator) remoteCancelBuild(id int64, msg string) error {
- sklog.Warningf("Canceling Buildbucket build %d. Reason: %s", id, msg)
+func (t *TryJobIntegrator) remoteCancelV1Build(buildId int64, msg string) error {
+ sklog.Warningf("Canceling Buildbucket build %d. Reason: %s", buildId, msg)
message := struct {
Message string `json:"message"`
}{
@@ -374,7 +426,7 @@
if err != nil {
return err
}
- resp, err := t.bb.Cancel(id, &buildbucket_api.LegacyApiCancelRequestBodyMessage{
+ resp, err := t.bb.Cancel(buildId, &buildbucket_api.LegacyApiCancelRequestBodyMessage{
ResultDetailsJson: string(b),
}).Do()
if err != nil {
@@ -386,7 +438,7 @@
return nil
}
-func (t *TryJobIntegrator) tryLeaseBuild(ctx context.Context, id int64) (int64, *buildbucket_api.LegacyApiErrorMessage, error) {
+func (t *TryJobIntegrator) tryLeaseV1Build(ctx context.Context, id int64) (int64, *buildbucket_api.LegacyApiErrorMessage, error) {
expiration := now.Now(ctx).Add(LEASE_DURATION_INITIAL).Unix() * secondsToMicros
sklog.Infof("Attempting to lease build %d", id)
resp, err := t.bb.Lease(id, &buildbucket_api.LegacyApiLeaseRequestBodyMessage{
@@ -402,7 +454,7 @@
return leaseKey, resp.Error, nil
}
-func (t *TryJobIntegrator) insertNewJob(ctx context.Context, buildId int64) error {
+func (t *TryJobIntegrator) insertNewJobV1(ctx context.Context, buildId int64) error {
// Get the build details from the v2 API.
build, err := t.bb2.GetBuild(ctx, buildId)
if err != nil {
@@ -410,13 +462,13 @@
}
if build.Status != buildbucketpb.Status_SCHEDULED {
sklog.Warningf("Found build %d with status: %s; attempting to lease anyway, to trigger the fix in Buildbucket.", build.Id, build.Status)
- _, bbError, err := t.tryLeaseBuild(ctx, buildId)
+ _, bbError, err := t.tryLeaseV1Build(ctx, buildId)
if err != nil || bbError != nil {
// This is expected.
return nil
}
sklog.Warningf("Unexpectedly able to lease build %d with status %s; canceling it.", buildId, build.Status)
- if err := t.remoteCancelBuild(buildId, fmt.Sprintf("Unexpected status %s", build.Status)); err != nil {
+ if err := t.remoteCancelV1Build(buildId, fmt.Sprintf("Unexpected status %s", build.Status)); err != nil {
sklog.Warningf("Failed to cancel errant build %d", buildId)
return nil
}
@@ -424,12 +476,12 @@
// Obtain and validate the RepoState.
if build.Input.GerritChanges == nil || len(build.Input.GerritChanges) != 1 {
- return t.remoteCancelBuild(buildId, fmt.Sprintf("Invalid Build %d: input should have exactly one GerritChanges: %+v", buildId, build.Input))
+ return t.remoteCancelV1Build(buildId, fmt.Sprintf("Invalid Build %d: input should have exactly one GerritChanges: %+v", buildId, build.Input))
}
gerritChange := build.Input.GerritChanges[0]
repoUrl, ok := t.projectRepoMapping[gerritChange.Project]
if !ok {
- return t.remoteCancelBuild(buildId, fmt.Sprintf("Unknown patch project %q", gerritChange.Project))
+ return t.remoteCancelV1Build(buildId, fmt.Sprintf("Unknown patch project %q", gerritChange.Project))
}
server := gerritChange.Host
if !strings.Contains(server, "://") {
@@ -449,7 +501,7 @@
}
requested, err := ptypes.Timestamp(build.CreateTime)
if err != nil {
- return t.remoteCancelBuild(buildId, fmt.Sprintf("Failed to convert timestamp for %d: %s", build.Id, err))
+ return t.remoteCancelV1Build(buildId, fmt.Sprintf("Failed to convert timestamp for %d: %s", build.Id, err))
}
j := &types.Job{
Name: build.Builder.Builder,
@@ -464,7 +516,7 @@
j.Requested = j.Created.Add(-firestore.TS_RESOLUTION)
}
// Attempt to lease the build.
- leaseKey, bbError, err := t.tryLeaseBuild(ctx, j.BuildbucketBuildId)
+ leaseKey, bbError, err := t.tryLeaseV1Build(ctx, j.BuildbucketBuildId)
if err != nil {
return skerr.Wrapf(err, "failed to lease build %d", j.BuildbucketBuildId)
} else if bbError != nil {
@@ -472,14 +524,14 @@
// return an error is that the Build has been canceled. While this
// is the most likely reason, others are possible, and we may gain
// some information by reading the error and behaving accordingly.
- return t.remoteCancelBuild(buildId, fmt.Sprintf("Buildbucket refused lease with %q", bbError.Message))
+ return t.remoteCancelV1Build(buildId, fmt.Sprintf("Buildbucket refused lease with %q", bbError.Message))
} else if leaseKey == 0 {
- return t.remoteCancelBuild(buildId, "Buildbucket returned zero lease key")
+ return t.remoteCancelV1Build(buildId, "Buildbucket returned zero lease key")
}
j.BuildbucketLeaseKey = leaseKey
if err := t.db.PutJob(ctx, j); err != nil {
- return t.remoteCancelBuild(j.BuildbucketBuildId, fmt.Sprintf("Failed to insert Job into the DB: %s", err))
+ return t.remoteCancelV1Build(j.BuildbucketBuildId, fmt.Sprintf("Failed to insert Job into the DB: %s", err))
}
t.jCache.AddJobs([]*types.Job{j})
return nil
@@ -594,12 +646,12 @@
if err := startJobHelper(); err != nil {
sklog.Infof("Failed to start job %s (build %d) with: %s", job.Id, job.BuildbucketBuildId, err)
job.Status = types.JOB_STATUS_MISHAP
- job.StatusDetails = util.Truncate(fmt.Sprintf("Failed to start Job: %s", err), 1024)
+ job.StatusDetails = util.Truncate(fmt.Sprintf("Failed to start Job: %s", skerr.Unwrap(err)), 1024)
} else {
job.Status = types.JOB_STATUS_IN_PROGRESS
// Notify Buildbucket that the Job has started.
- bbError, err := t.jobStarted(job)
+ bbToken, bbError, err := t.jobStarted(ctx, job)
if err != nil {
return skerr.Wrapf(err, "failed to send job-started notification")
} else if bbError != nil {
@@ -613,6 +665,8 @@
} else {
return skerr.Fmt("failed to start build %d with %q", job.BuildbucketBuildId, bbError.Message)
}
+ } else if bbToken != "" {
+ job.BuildbucketToken = bbToken
}
}
@@ -630,14 +684,12 @@
}
// Grab all of the pending Builds from Buildbucket.
- // TODO(borenet): Buildbot maintains a maximum lease count. Should we do
- // that too?
cursor := ""
errs := []error{}
var mtx sync.Mutex
for {
- sklog.Infof("Running 'peek' on %s", t.bucket)
- resp, err := t.bb.Peek().Bucket(t.bucket).MaxBuilds(PEEK_MAX_BUILDS).StartCursor(cursor).Do()
+ sklog.Infof("Running 'peek' on %s", t.buildbucketBucket)
+ resp, err := t.bb.Peek().Bucket(t.buildbucketBucket).MaxBuilds(PEEK_MAX_BUILDS).StartCursor(cursor).Do()
if err != nil {
errs = append(errs, err)
break
@@ -651,7 +703,7 @@
wg.Add(1)
go func(b *buildbucket_api.LegacyApiCommonBuildMessage) {
defer wg.Done()
- if err := t.insertNewJob(ctx, b.Id); err != nil {
+ if err := t.insertNewJobV1(ctx, b.Id); err != nil {
mtx.Lock()
errs = append(errs, err)
mtx.Unlock()
@@ -673,26 +725,31 @@
return nil
}
-// jobStarted notifies Buildbucket that the given Job has started. Returns any
-// error object returned by Buildbucket (eg. if the Build has been canceled), or
-// any error which occurred when attempting the request.
-func (t *TryJobIntegrator) jobStarted(j *types.Job) (*buildbucket_api.LegacyApiErrorMessage, error) {
- sklog.Infof("bb.Start for job %s (build %d)", j.Id, j.BuildbucketBuildId)
- resp, err := t.bb.Start(j.BuildbucketBuildId, &buildbucket_api.LegacyApiStartRequestBodyMessage{
- LeaseKey: j.BuildbucketLeaseKey,
- Url: j.URL(t.host),
- }).Do()
- if err != nil {
- return nil, err
+// jobStarted notifies Buildbucket that the given Job has started. Returns the
+// Buildbucket token returned by Buildbucket, any error object returned by
+// Buildbucket (eg. if the Build has been canceled), or any error which occurred
+// when attempting the request.
+func (t *TryJobIntegrator) jobStarted(ctx context.Context, j *types.Job) (string, *buildbucket_api.LegacyApiErrorMessage, error) {
+ if isBBv2(j) {
+ sklog.Infof("bb2.Start for job %s (build %d)", j.Id, j.BuildbucketBuildId)
+ updateToken, err := t.bb2.StartBuild(ctx, j.BuildbucketBuildId, j.Id, j.BuildbucketToken)
+ return updateToken, nil, skerr.Wrap(err)
+ } else {
+ sklog.Infof("bb.Start for job %s (build %d)", j.Id, j.BuildbucketBuildId)
+ resp, err := t.bb.Start(j.BuildbucketBuildId, &buildbucket_api.LegacyApiStartRequestBodyMessage{
+ LeaseKey: j.BuildbucketLeaseKey,
+ Url: j.URL(t.host),
+ }).Do()
+ if err != nil {
+ return "", nil, err
+ }
+ return "", resp.Error, nil
}
- return resp.Error, nil
}
-// jobFinished notifies Buildbucket that the given Job has finished.
-func (t *TryJobIntegrator) jobFinished(j *types.Job) error {
- if !j.Done() {
- return skerr.Fmt("JobFinished called for unfinished Job!")
- }
+// buildSucceededV1 sends a success notification to Buildbucket.
+func (t *TryJobIntegrator) buildSucceededV1(j *types.Job) error {
+ sklog.Infof("bb.Succeed for job %s (build %d)", j.Id, j.BuildbucketBuildId)
b, err := json.Marshal(struct {
Job *types.Job `json:"job"`
}{
@@ -701,49 +758,77 @@
if err != nil {
return err
}
- if j.Status == types.JOB_STATUS_SUCCESS {
- sklog.Infof("bb.Succeed for job %s (build %d)", j.Id, j.BuildbucketBuildId)
- resp, err := t.bb.Succeed(j.BuildbucketBuildId, &buildbucket_api.LegacyApiSucceedRequestBodyMessage{
- LeaseKey: j.BuildbucketLeaseKey,
- ResultDetailsJson: string(b),
- Url: j.URL(t.host),
- }).Do()
- if err != nil {
- return err
- }
- if resp.Error != nil {
- if resp.Error.Reason == BUILDBUCKET_API_ERROR_REASON_COMPLETED {
- sklog.Warningf("Sent success status for build %d after completion.", j.BuildbucketBuildId)
- } else {
- return fmt.Errorf(resp.Error.Message)
- }
- }
- } else {
- failureReason := "BUILD_FAILURE"
- if j.Status == types.JOB_STATUS_MISHAP {
- failureReason = "INFRA_FAILURE"
- }
- sklog.Infof("bb.Fail for job %s (build %d)", j.Id, j.BuildbucketBuildId)
- resp, err := t.bb.Fail(j.BuildbucketBuildId, &buildbucket_api.LegacyApiFailRequestBodyMessage{
- FailureReason: failureReason,
- LeaseKey: j.BuildbucketLeaseKey,
- ResultDetailsJson: string(b),
- Url: j.URL(t.host),
- }).Do()
- if err != nil {
- return err
- }
- if resp.Error != nil {
- if resp.Error.Reason == BUILDBUCKET_API_ERROR_REASON_COMPLETED {
- sklog.Warningf("Sent failure status for build %d after completion.", j.BuildbucketBuildId)
- } else {
- return fmt.Errorf(resp.Error.Message)
- }
+ resp, err := t.bb.Succeed(j.BuildbucketBuildId, &buildbucket_api.LegacyApiSucceedRequestBodyMessage{
+ LeaseKey: j.BuildbucketLeaseKey,
+ ResultDetailsJson: string(b),
+ Url: j.URL(t.host),
+ }).Do()
+ if err != nil {
+ return err
+ }
+ if resp.Error != nil {
+ if resp.Error.Reason == BUILDBUCKET_API_ERROR_REASON_COMPLETED {
+ sklog.Warningf("Sent success status for build %d after completion.", j.BuildbucketBuildId)
+ } else {
+ return fmt.Errorf(resp.Error.Message)
}
}
return nil
}
+// buildFailed sends a failure notification to Buildbucket via the legacy API.
+// Errors other than BUILDBUCKET_API_ERROR_REASON_COMPLETED are returned as-is.
+func (t *TryJobIntegrator) buildFailed(j *types.Job) error {
+	b, err := json.Marshal(struct {
+		Job *types.Job `json:"job"`
+	}{
+		Job: j,
+	})
+	if err != nil {
+		return err
+	}
+	failureReason := "BUILD_FAILURE"
+	if j.Status == types.JOB_STATUS_MISHAP {
+		failureReason = "INFRA_FAILURE"
+	}
+	sklog.Infof("bb.Fail for job %s (build %d)", j.Id, j.BuildbucketBuildId)
+	resp, err := t.bb.Fail(j.BuildbucketBuildId, &buildbucket_api.LegacyApiFailRequestBodyMessage{
+		FailureReason:     failureReason,
+		LeaseKey:          j.BuildbucketLeaseKey,
+		ResultDetailsJson: string(b),
+		Url:               j.URL(t.host),
+	}).Do()
+	if err != nil {
+		return err
+	}
+	if bbErr := resp.Error; bbErr != nil {
+		if bbErr.Reason != BUILDBUCKET_API_ERROR_REASON_COMPLETED {
+			return fmt.Errorf("%s", bbErr.Message) // Message is data, not a format string.
+		}
+		sklog.Warningf("Sent failure status for build %d after completion.", j.BuildbucketBuildId)
+	}
+	return nil
+}
+
+// updateBuild sends the Job's current state to Buildbucket via bb2.UpdateBuild.
+func (t *TryJobIntegrator) updateBuild(ctx context.Context, j *types.Job) error {
+	sklog.Infof("bb2.UpdateBuild for job %s (build %d)", j.Id, j.BuildbucketBuildId)
+	return t.bb2.UpdateBuild(ctx, jobToBuildV2(j), j.BuildbucketToken)
+}
+
+// jobFinished reports a finished Job to Buildbucket; unfinished Jobs are an error.
+func (t *TryJobIntegrator) jobFinished(ctx context.Context, j *types.Job) error {
+	switch {
+	case !j.Done():
+		return skerr.Fmt("JobFinished called for unfinished Job!")
+	case isBBv2(j):
+		return t.updateBuild(ctx, j)
+	case j.Status == types.JOB_STATUS_SUCCESS:
+		return t.buildSucceededV1(j)
+	}
+	return t.buildFailed(j)
+}
+
// skipRepoState determines whether we should skip try jobs for this RepoState,
// eg. problematic CLs.
func skipRepoState(rs types.RepoState) bool {
@@ -753,3 +838,39 @@
}
return false
}
+
+// jobStatusToBuildbucketStatus maps a JobStatus to its Buildbucket Status.
+func jobStatusToBuildbucketStatus(status types.JobStatus) buildbucketpb.Status {
+	switch status {
+	case types.JOB_STATUS_REQUESTED:
+		return buildbucketpb.Status_SCHEDULED
+	case types.JOB_STATUS_IN_PROGRESS:
+		return buildbucketpb.Status_STARTED
+	case types.JOB_STATUS_SUCCESS:
+		return buildbucketpb.Status_SUCCESS
+	case types.JOB_STATUS_FAILURE:
+		return buildbucketpb.Status_FAILURE
+	case types.JOB_STATUS_MISHAP:
+		return buildbucketpb.Status_INFRA_FAILURE
+	case types.JOB_STATUS_CANCELED:
+		return buildbucketpb.Status_CANCELED
+	}
+	return buildbucketpb.Status_STATUS_UNSPECIFIED // Unrecognized JobStatus.
+}
+
+// jobToBuildV2 builds the Buildbucket V2 Build message passed to UpdateBuild
+// for the given Job: its build ID, mapped status, and status details.
+func jobToBuildV2(job *types.Job) *buildbucketpb.Build {
+	// Note: Only the fields below are populated. Others could be filled in,
+	// but it is not clear they would provide any value, since Buildbucket
+	// builds are not actually used for anything here.
+	output := &buildbucketpb.Build_Output{
+		Status:          jobStatusToBuildbucketStatus(job.Status),
+		SummaryMarkdown: job.StatusDetails,
+	}
+
+	return &buildbucketpb.Build{
+		Id:     job.BuildbucketBuildId,
+		Output: output,
+	}
+}
diff --git a/task_scheduler/go/tryjobs/tryjobs_test.go b/task_scheduler/go/tryjobs/tryjobs_test.go
index 4875f1e..3d8aba7 100644
--- a/task_scheduler/go/tryjobs/tryjobs_test.go
+++ b/task_scheduler/go/tryjobs/tryjobs_test.go
@@ -3,6 +3,7 @@
import (
"context"
"encoding/json"
+ "errors"
"fmt"
"sort"
"strconv"
@@ -10,6 +11,8 @@
"testing"
"time"
+ "cloud.google.com/go/pubsub"
+ "github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
buildbucketpb "go.chromium.org/luci/buildbucket/proto"
"go.skia.org/infra/go/buildbucket/mocks"
@@ -17,9 +20,12 @@
"go.skia.org/infra/go/gerrit"
"go.skia.org/infra/go/git"
"go.skia.org/infra/go/mockhttpclient"
+ "go.skia.org/infra/go/now"
+ pubsub_mocks "go.skia.org/infra/go/pubsub/mocks"
"go.skia.org/infra/go/testutils"
"go.skia.org/infra/task_scheduler/go/db"
"go.skia.org/infra/task_scheduler/go/types"
+ "google.golang.org/protobuf/encoding/prototext"
)
var distantFutureTime = time.Date(3000, time.January, 1, 0, 0, 0, 0, time.UTC)
@@ -40,18 +46,18 @@
// Verify that updateJobs sends heartbeats for unfinished try Jobs and
// success/failure for finished Jobs.
-func TestUpdateJobs_NoJobs_NoAction(t *testing.T) {
- ctx, trybots, mock, _ := setup(t)
+func TestUpdateJob_NoJobs_NoAction(t *testing.T) {
+ ctx, trybots, mock, _, _ := setup(t)
assertNoActiveTryJobs(t, trybots)
require.NoError(t, trybots.updateJobs(ctx))
require.True(t, mock.Empty(), mock.List())
}
-func TestUpdateJobs_OneUnfinished_SendsHeartbeat(t *testing.T) {
- ctx, trybots, mock, _ := setup(t)
+func TestUpdateJobsV1_OneUnfinished_SendsHeartbeat(t *testing.T) {
+ ctx, trybots, mock, _, _ := setup(t)
- j1 := tryjob(ctx, repoUrl)
+ j1 := tryjobV1(ctx, repoUrl)
MockHeartbeats(t, mock, ts, []*types.Job{j1}, nil)
require.NoError(t, trybots.db.PutJobs(ctx, []*types.Job{j1}))
trybots.jCache.AddJobs([]*types.Job{j1})
@@ -60,41 +66,133 @@
assertActiveTryJob(t, trybots, j1)
}
-func TestUpdateJobs_FinishedJob_SendSuccess(t *testing.T) {
- ctx, trybots, mock, _ := setup(t)
+func TestUpdateJobsV2_OneUnfinished_SendsPubSub(t *testing.T) {
+ ctx, trybots, _, _, topic := setup(t)
- j1 := tryjob(ctx, repoUrl)
+ // Create the Job.
+ j1 := tryjobV2(ctx, repoUrl)
+ require.NoError(t, trybots.db.PutJobs(ctx, []*types.Job{j1}))
+ trybots.jCache.AddJobs([]*types.Job{j1})
+
+ // Mock the pubsub message.
+ update := &buildbucketpb.BuildTaskUpdate{
+ BuildId: strconv.FormatInt(j1.BuildbucketBuildId, 10),
+ Task: &buildbucketpb.Task{
+ Id: &buildbucketpb.TaskID{
+ Target: trybots.buildbucketTarget,
+ Id: j1.Id,
+ },
+ Link: j1.URL(trybots.host),
+ Status: jobStatusToBuildbucketStatus(j1.Status),
+ UpdateId: now.Now(ctx).UnixNano(),
+ },
+ }
+ b, err := prototext.Marshal(update)
+ require.NoError(t, err)
+ result := &pubsub_mocks.PublishResult{}
+ result.On("Get", testutils.AnyContext).Return("fake-server-id", nil)
+ topic.On("Publish", testutils.AnyContext, &pubsub.Message{Data: b}).Return(result)
+
+ // Run updateJobs, assert that we sent the message.
+ require.NoError(t, trybots.updateJobs(ctx))
+ assertActiveTryJob(t, trybots, j1)
+ topic.AssertExpectations(t)
+ result.AssertExpectations(t)
+}
+
+func TestUpdateJobsV1_FinishedJob_SendSuccess(t *testing.T) {
+ ctx, trybots, mock, _, _ := setup(t)
+
+ j1 := tryjobV1(ctx, repoUrl)
j1.Status = types.JOB_STATUS_SUCCESS
j1.Finished = ts
require.NoError(t, trybots.db.PutJobs(ctx, []*types.Job{j1}))
trybots.jCache.AddJobs([]*types.Job{j1})
+ require.NotEmpty(t, j1.BuildbucketLeaseKey)
MockJobSuccess(mock, j1, ts, false)
require.NoError(t, trybots.updateJobs(ctx))
require.True(t, mock.Empty(), mock.List())
assertNoActiveTryJobs(t, trybots)
+ j1, err := trybots.db.GetJobById(ctx, j1.Id)
+ require.NoError(t, err)
+ require.Empty(t, j1.BuildbucketLeaseKey)
+ require.Empty(t, j1.BuildbucketToken)
}
-func TestUpdateJobs_FailedJob_SendFailure(t *testing.T) {
- ctx, trybots, mock, _ := setup(t)
+func TestUpdateJobsV2_FinishedJob_SendSuccess(t *testing.T) {
+ ctx, trybots, _, mockBB, _ := setup(t)
- j1 := tryjob(ctx, repoUrl)
+ j1 := tryjobV2(ctx, repoUrl)
+ j1.Status = types.JOB_STATUS_SUCCESS
+ j1.Finished = ts
+ require.NoError(t, trybots.db.PutJobs(ctx, []*types.Job{j1}))
+ trybots.jCache.AddJobs([]*types.Job{j1})
+ require.NotEmpty(t, j1.BuildbucketToken)
+ mockBB.On("UpdateBuild", testutils.AnyContext, &buildbucketpb.Build{
+ Id: j1.BuildbucketBuildId,
+ Output: &buildbucketpb.Build_Output{
+ Status: buildbucketpb.Status_SUCCESS,
+ },
+ }, j1.BuildbucketToken).Return(nil)
+ require.NoError(t, trybots.updateJobs(ctx))
+ mockBB.AssertExpectations(t)
+ assertNoActiveTryJobs(t, trybots)
+ j1, err := trybots.db.GetJobById(ctx, j1.Id)
+ require.NoError(t, err)
+ require.Empty(t, j1.BuildbucketLeaseKey)
+ require.Empty(t, j1.BuildbucketToken)
+}
+
+func TestUpdateJobsV1_FailedJob_SendFailure(t *testing.T) {
+ ctx, trybots, mock, _, _ := setup(t)
+
+ j1 := tryjobV1(ctx, repoUrl)
j1.Status = types.JOB_STATUS_FAILURE
j1.Finished = ts
j1.BuildbucketLeaseKey = 12345
require.NoError(t, trybots.db.PutJobs(ctx, []*types.Job{j1}))
trybots.jCache.AddJobs([]*types.Job{j1})
+ require.NotEmpty(t, j1.BuildbucketLeaseKey)
MockJobFailure(mock, j1, ts)
require.NoError(t, trybots.updateJobs(ctx))
require.True(t, mock.Empty(), mock.List())
assertNoActiveTryJobs(t, trybots)
+ j1, err := trybots.db.GetJobById(ctx, j1.Id)
+ require.NoError(t, err)
+ require.Empty(t, j1.BuildbucketLeaseKey)
+ require.Empty(t, j1.BuildbucketToken)
}
-func TestUpdateJobs_ManyInProgress_MultipleHeartbeatBatches(t *testing.T) {
- ctx, trybots, mock, _ := setup(t)
+// A failed V2 job is reported via UpdateBuild and its lease key and token are cleared.
+func TestUpdateJobsV2_FailedJob_SendFailure(t *testing.T) {
+	ctx, trybots, _, mockBB, _ := setup(t)
+
+	job := tryjobV2(ctx, repoUrl)
+	job.Status = types.JOB_STATUS_FAILURE
+	job.Finished = ts
+	require.NoError(t, trybots.db.PutJobs(ctx, []*types.Job{job}))
+	trybots.jCache.AddJobs([]*types.Job{job})
+	require.NotEmpty(t, job.BuildbucketToken)
+	wantBuild := &buildbucketpb.Build{
+		Id:     job.BuildbucketBuildId,
+		Output: &buildbucketpb.Build_Output{Status: buildbucketpb.Status_FAILURE},
+	}
+	mockBB.On("UpdateBuild", testutils.AnyContext, wantBuild, job.BuildbucketToken).Return(nil)
+	require.NoError(t, trybots.updateJobs(ctx))
+	mockBB.AssertExpectations(t)
+	assertNoActiveTryJobs(t, trybots)
+	stored, err := trybots.db.GetJobById(ctx, job.Id)
+	require.NoError(t, err)
+	require.Empty(t, stored.BuildbucketLeaseKey)
+	require.Empty(t, stored.BuildbucketToken)
+}
+
+func TestUpdateJobsV1_ManyInProgress_MultipleHeartbeatBatches(t *testing.T) {
+ ctx, trybots, mock, _, _ := setup(t)
jobs := []*types.Job{}
for i := 0; i < LEASE_BATCH_SIZE+2; i++ {
- jobs = append(jobs, tryjob(ctx, repoUrl))
+ jobs = append(jobs, tryjobV1(ctx, repoUrl))
}
sort.Sort(heartbeatJobSlice(jobs))
MockHeartbeats(t, mock, ts, jobs[:LEASE_BATCH_SIZE], nil)
@@ -105,12 +203,55 @@
require.True(t, mock.Empty(), mock.List())
}
-func TestUpdateJobs_HeartbeatBatchOneFailed_JobIsCanceled(t *testing.T) {
- ctx, trybots, mock, _ := setup(t)
+// TestUpdateJobsV2_ManyInProgress_MultiplePubSubMessages checks that updateJobs
+// publishes one BuildTaskUpdate message for each of the V2 try jobs it creates.
+func TestUpdateJobsV2_ManyInProgress_MultiplePubSubMessages(t *testing.T) {
+	ctx, trybots, _, _, topic := setup(t)
+
+	// Create the Jobs.
+	const numJobs = 27 // Arbitrary number of jobs.
+	var jobs []*types.Job
+	for i := 0; i < numJobs; i++ {
+		j := tryjobV2(ctx, repoUrl)
+		j.BuildbucketBuildId = int64(i) // Easier to debug.
+		jobs = append(jobs, j)
+	}
+	require.NoError(t, trybots.db.PutJobs(ctx, jobs))
+	trybots.jCache.AddJobs(jobs)
+
+	// Mock the pubsub messages.
+	expectations := []*mock.Mock{&topic.Mock}
+	for _, j := range jobs {
+		taskID := &buildbucketpb.TaskID{Target: trybots.buildbucketTarget, Id: j.Id}
+		data, err := prototext.Marshal(&buildbucketpb.BuildTaskUpdate{
+			BuildId: strconv.FormatInt(j.BuildbucketBuildId, 10),
+			Task: &buildbucketpb.Task{
+				Id:       taskID,
+				Link:     j.URL(trybots.host),
+				Status:   jobStatusToBuildbucketStatus(j.Status),
+				UpdateId: now.Now(ctx).UnixNano(),
+			},
+		})
+		require.NoError(t, err)
+		published := &pubsub_mocks.PublishResult{}
+		published.On("Get", testutils.AnyContext).Return("fake-server-id", nil)
+		topic.On("Publish", testutils.AnyContext, &pubsub.Message{Data: data}).Return(published)
+		expectations = append(expectations, &published.Mock)
+	}
+
+	// Call updateJobs, assert that we sent the messages.
+	require.NoError(t, trybots.updateJobs(ctx))
+	for _, m := range expectations {
+		m.AssertExpectations(t)
+	}
+}
+
+func TestUpdateJobsV1_HeartbeatBatchOneFailed_JobIsCanceled(t *testing.T) {
+ ctx, trybots, mock, _, _ := setup(t)
jobs := []*types.Job{}
for i := 0; i < LEASE_BATCH_SIZE+2; i++ {
- jobs = append(jobs, tryjob(ctx, repoUrl))
+ jobs = append(jobs, tryjobV1(ctx, repoUrl))
}
j1, j2 := jobs[0], jobs[1]
for _, j := range jobs[2:] {
@@ -124,7 +265,7 @@
}
MockHeartbeats(t, mock, ts, []*types.Job{j1, j2}, map[string]*heartbeatResp{
j1.Id: {
- BuildId: fmt.Sprintf("%d", j1.BuildbucketBuildId),
+ BuildId: strconv.FormatInt(j1.BuildbucketBuildId, 10),
Error: &errMsg{
Message: "fail",
},
@@ -142,7 +283,7 @@
}
func TestGetRevision(t *testing.T) {
- ctx, trybots, mock, _ := setup(t)
+ ctx, trybots, mock, _, _ := setup(t)
// Get the (only) commit from the repo.
r, err := trybots.getRepo(repoUrl)
@@ -164,111 +305,159 @@
require.Equal(t, c, got)
}
-func TestCancelBuild_Success(t *testing.T) {
- _, trybots, mock, _ := setup(t)
+func TestRemoteCancelV1Build_Success(t *testing.T) {
+ _, trybots, mock, _, _ := setup(t)
const id = int64(12345)
MockCancelBuild(mock, id, "Canceling!")
- require.NoError(t, trybots.remoteCancelBuild(id, "Canceling!"))
+ require.NoError(t, trybots.remoteCancelV1Build(id, "Canceling!"))
require.True(t, mock.Empty(), mock.List())
}
-func TestCancelBuild_Success_LongMessageTruncated(t *testing.T) {
- _, trybots, mock, _ := setup(t)
+func TestRemoteCancelV1Build_Success_LongMessageTruncated(t *testing.T) {
+ _, trybots, mock, _, _ := setup(t)
const id = int64(12345)
MockCancelBuild(mock, id, strings.Repeat("X", maxCancelReasonLen-3)+"...")
- require.NoError(t, trybots.remoteCancelBuild(id, strings.Repeat("X", maxCancelReasonLen+50)))
+ require.NoError(t, trybots.remoteCancelV1Build(id, strings.Repeat("X", maxCancelReasonLen+50)))
require.True(t, mock.Empty(), mock.List())
}
-func TestCancelBuild_Failed(t *testing.T) {
- _, trybots, mock, _ := setup(t)
+func TestRemoteCancelV1Build_Failed(t *testing.T) {
+ _, trybots, mock, _, _ := setup(t)
const id = int64(12345)
expectErr := "Build does not exist!"
MockCancelBuildFailed(mock, id, "Canceling!", expectErr)
- require.EqualError(t, trybots.remoteCancelBuild(id, "Canceling!"), expectErr)
+ require.EqualError(t, trybots.remoteCancelV1Build(id, "Canceling!"), expectErr)
require.True(t, mock.Empty(), mock.List())
}
-func TestTryLeaseBuild_Success(t *testing.T) {
- ctx, trybots, mock, _ := setup(t)
+func TestTryLeaseV1Build_Success(t *testing.T) {
+ ctx, trybots, mock, _, _ := setup(t)
const id = int64(12345)
MockTryLeaseBuild(mock, id)
- k, bbError, err := trybots.tryLeaseBuild(ctx, id)
+ k, bbError, err := trybots.tryLeaseV1Build(ctx, id)
require.NoError(t, err)
require.Nil(t, bbError)
require.NotEqual(t, k, 0)
require.True(t, mock.Empty(), mock.List())
}
-func TestTryLeaseBuild_Failure(t *testing.T) {
- ctx, trybots, mock, _ := setup(t)
+func TestTryLeaseV1Build_Failure(t *testing.T) {
+ ctx, trybots, mock, _, _ := setup(t)
const id = int64(12345)
expect := "Can't lease this!"
MockTryLeaseBuildFailed(mock, id, expect)
- _, bbError, err := trybots.tryLeaseBuild(ctx, id)
+ _, bbError, err := trybots.tryLeaseV1Build(ctx, id)
require.NoError(t, err)
require.NotNil(t, bbError)
require.Contains(t, bbError.Message, expect)
require.True(t, mock.Empty(), mock.List())
}
-func TestJobStarted_Success(t *testing.T) {
- ctx, trybots, mock, _ := setup(t)
+func TestJobStartedV1_Success(t *testing.T) {
+ ctx, trybots, mock, _, _ := setup(t)
- j := tryjob(ctx, repoUrl)
+ j := tryjobV1(ctx, repoUrl)
MockJobStarted(mock, j.BuildbucketBuildId)
- bbError, err := trybots.jobStarted(j)
+ bbToken, bbError, err := trybots.jobStarted(ctx, j)
require.NoError(t, err)
require.Nil(t, bbError)
+ require.Empty(t, bbToken) // No update token for V1 builds.
require.True(t, mock.Empty(), mock.List())
}
-func TestJobStarted_Failure(t *testing.T) {
- ctx, trybots, mock, _ := setup(t)
+func TestJobStartedV2_Success(t *testing.T) {
+ ctx, trybots, _, mockBB, _ := setup(t)
- j := tryjob(ctx, repoUrl)
+ j := tryjobV2(ctx, repoUrl)
+
+ mockBB.On("StartBuild", testutils.AnyContext, j.BuildbucketBuildId, j.Id, j.BuildbucketToken).Return(bbFakeUpdateToken, nil)
+ bbToken, bbError, err := trybots.jobStarted(ctx, j)
+ require.NoError(t, err)
+ require.Nil(t, bbError)
+ require.Equal(t, bbFakeUpdateToken, bbToken)
+ mockBB.AssertExpectations(t)
+}
+
+func TestJobStartedV1_Failure(t *testing.T) {
+ ctx, trybots, mock, _, _ := setup(t)
+
+ j := tryjobV1(ctx, repoUrl)
expectErr := "fail"
- MockJobStartedFailed(mock, j.BuildbucketBuildId, expectErr)
- bbError, err := trybots.jobStarted(j)
+ MockJobStartedFailed(mock, j.BuildbucketBuildId, expectErr, "INVALID_INPUT")
+ bbToken, bbError, err := trybots.jobStarted(ctx, j)
require.NoError(t, err)
require.NotNil(t, bbError)
+ require.Empty(t, bbToken)
require.Contains(t, bbError.Message, expectErr)
+ require.Contains(t, bbError.Reason, "INVALID_INPUT")
require.True(t, mock.Empty(), mock.List())
}
-func TestJobFinished_NotActuallyFinished(t *testing.T) {
- ctx, trybots, _, _ := setup(t)
+func TestJobStartedV2_Failure(t *testing.T) {
+ ctx, trybots, _, mockBB, _ := setup(t)
- j := tryjob(ctx, repoUrl)
+ j := tryjobV2(ctx, repoUrl)
- require.ErrorContains(t, trybots.jobFinished(j), "JobFinished called for unfinished Job!")
+ mockBB.On("StartBuild", testutils.AnyContext, j.BuildbucketBuildId, j.Id, j.BuildbucketToken).Return("", errors.New("failed"))
+ bbToken, bbError, err := trybots.jobStarted(ctx, j)
+ require.ErrorContains(t, err, "failed")
+ require.Nil(t, bbError)
+ require.Empty(t, bbToken)
+ mockBB.AssertExpectations(t)
}
-func TestJobFinished_JobSucceeded_UpdateSucceeds(t *testing.T) {
- ctx, trybots, mock, _ := setup(t)
+func TestJobFinishedV1_NotActuallyFinished(t *testing.T) {
+ ctx, trybots, _, _, _ := setup(t)
- j := tryjob(ctx, repoUrl)
+ j := tryjobV1(ctx, repoUrl)
+
+ require.ErrorContains(t, trybots.jobFinished(ctx, j), "JobFinished called for unfinished Job!")
+}
+
+func TestJobFinishedV1_JobSucceeded_UpdateSucceeds(t *testing.T) {
+ ctx, trybots, mock, _, _ := setup(t)
+
+ j := tryjobV1(ctx, repoUrl)
now := time.Date(2021, time.April, 27, 0, 0, 0, 0, time.UTC)
j.Status = types.JOB_STATUS_SUCCESS
j.Finished = now
require.NoError(t, trybots.db.PutJobs(ctx, []*types.Job{j}))
trybots.jCache.AddJobs([]*types.Job{j})
MockJobSuccess(mock, j, now, false)
- require.NoError(t, trybots.jobFinished(j))
+ require.NoError(t, trybots.jobFinished(ctx, j))
require.True(t, mock.Empty(), mock.List())
}
-func TestJobFinished_JobSucceeded_UpdateFails(t *testing.T) {
- ctx, trybots, mock, _ := setup(t)
+func TestJobFinishedV2_JobSucceeded_UpdateSucceeds(t *testing.T) {
+ ctx, trybots, _, mockBB, _ := setup(t)
- j := tryjob(ctx, repoUrl)
+ j := tryjobV2(ctx, repoUrl)
+ now := time.Date(2021, time.April, 27, 0, 0, 0, 0, time.UTC)
+ j.Status = types.JOB_STATUS_SUCCESS
+ j.Finished = now
+ require.NoError(t, trybots.db.PutJobs(ctx, []*types.Job{j}))
+ trybots.jCache.AddJobs([]*types.Job{j})
+ mockBB.On("UpdateBuild", testutils.AnyContext, &buildbucketpb.Build{
+ Id: j.BuildbucketBuildId,
+ Output: &buildbucketpb.Build_Output{
+ Status: buildbucketpb.Status_SUCCESS,
+ },
+ }, j.BuildbucketToken).Return(nil)
+ require.NoError(t, trybots.jobFinished(ctx, j))
+ mockBB.AssertExpectations(t)
+}
+
+func TestJobFinishedV1_JobSucceeded_UpdateFails(t *testing.T) {
+ ctx, trybots, mock, _, _ := setup(t)
+
+ j := tryjobV1(ctx, repoUrl)
now := time.Date(2021, time.April, 27, 0, 0, 0, 0, time.UTC)
j.Status = types.JOB_STATUS_SUCCESS
j.Finished = now
@@ -276,28 +465,66 @@
trybots.jCache.AddJobs([]*types.Job{j})
expectErr := "fail"
MockJobSuccess_Failed(mock, j, now, false, expectErr)
- require.EqualError(t, trybots.jobFinished(j), expectErr)
+ require.EqualError(t, trybots.jobFinished(ctx, j), expectErr)
require.True(t, mock.Empty(), mock.List())
}
-func TestJobFinished_JobFailed_UpdateSucceeds(t *testing.T) {
- ctx, trybots, mock, _ := setup(t)
+func TestJobFinishedV2_JobSucceeded_UpdateFails(t *testing.T) {
+ ctx, trybots, _, mockBB, _ := setup(t)
- j := tryjob(ctx, repoUrl)
+ j := tryjobV2(ctx, repoUrl)
+ now := time.Date(2021, time.April, 27, 0, 0, 0, 0, time.UTC)
+ j.Status = types.JOB_STATUS_SUCCESS
+ j.Finished = now
+ require.NoError(t, trybots.db.PutJobs(ctx, []*types.Job{j}))
+ trybots.jCache.AddJobs([]*types.Job{j})
+ mockBB.On("UpdateBuild", testutils.AnyContext, &buildbucketpb.Build{
+ Id: j.BuildbucketBuildId,
+ Output: &buildbucketpb.Build_Output{
+ Status: buildbucketpb.Status_SUCCESS,
+ },
+ }, j.BuildbucketToken).Return(errors.New("failed"))
+ require.ErrorContains(t, trybots.jobFinished(ctx, j), "failed")
+ mockBB.AssertExpectations(t)
+}
+
+func TestJobFinishedV1_JobFailed_UpdateSucceeds(t *testing.T) {
+ ctx, trybots, mock, _, _ := setup(t)
+
+ j := tryjobV1(ctx, repoUrl)
now := time.Date(2021, time.April, 27, 0, 0, 0, 0, time.UTC)
j.Status = types.JOB_STATUS_FAILURE
j.Finished = now
require.NoError(t, trybots.db.PutJobs(ctx, []*types.Job{j}))
trybots.jCache.AddJobs([]*types.Job{j})
MockJobFailure(mock, j, now)
- require.NoError(t, trybots.jobFinished(j))
+ require.NoError(t, trybots.jobFinished(ctx, j))
require.True(t, mock.Empty(), mock.List())
}
-func TestJobFinished_JobFailed_UpdateFails(t *testing.T) {
- ctx, trybots, mock, _ := setup(t)
+func TestJobFinishedV2_JobFailed_UpdateSucceeds(t *testing.T) {
+ ctx, trybots, _, mockBB, _ := setup(t)
- j := tryjob(ctx, repoUrl)
+ j := tryjobV2(ctx, repoUrl)
+ now := time.Date(2021, time.April, 27, 0, 0, 0, 0, time.UTC)
+ j.Status = types.JOB_STATUS_FAILURE
+ j.Finished = now
+ require.NoError(t, trybots.db.PutJobs(ctx, []*types.Job{j}))
+ trybots.jCache.AddJobs([]*types.Job{j})
+ mockBB.On("UpdateBuild", testutils.AnyContext, &buildbucketpb.Build{
+ Id: j.BuildbucketBuildId,
+ Output: &buildbucketpb.Build_Output{
+ Status: buildbucketpb.Status_FAILURE,
+ },
+ }, j.BuildbucketToken).Return(nil)
+ require.NoError(t, trybots.jobFinished(ctx, j))
+ mockBB.AssertExpectations(t)
+}
+
+func TestJobFinishedV1_JobFailed_UpdateFails(t *testing.T) {
+ ctx, trybots, mock, _, _ := setup(t)
+
+ j := tryjobV1(ctx, repoUrl)
now := time.Date(2021, time.April, 27, 0, 0, 0, 0, time.UTC)
j.Status = types.JOB_STATUS_FAILURE
j.Finished = now
@@ -305,28 +532,66 @@
trybots.jCache.AddJobs([]*types.Job{j})
expectErr := "fail"
MockJobFailure_Failed(mock, j, now, expectErr)
- require.EqualError(t, trybots.jobFinished(j), expectErr)
+ require.EqualError(t, trybots.jobFinished(ctx, j), expectErr)
require.True(t, mock.Empty(), mock.List())
}
-func TestJobFinished_JobMishap_UpdateSucceeds(t *testing.T) {
- ctx, trybots, mock, _ := setup(t)
+func TestJobFinishedV2_JobFailed_UpdateFails(t *testing.T) {
+ ctx, trybots, _, mockBB, _ := setup(t)
- j := tryjob(ctx, repoUrl)
+ j := tryjobV2(ctx, repoUrl)
+ now := time.Date(2021, time.April, 27, 0, 0, 0, 0, time.UTC)
+ j.Status = types.JOB_STATUS_FAILURE
+ j.Finished = now
+ require.NoError(t, trybots.db.PutJobs(ctx, []*types.Job{j}))
+ trybots.jCache.AddJobs([]*types.Job{j})
+ mockBB.On("UpdateBuild", testutils.AnyContext, &buildbucketpb.Build{
+ Id: j.BuildbucketBuildId,
+ Output: &buildbucketpb.Build_Output{
+ Status: buildbucketpb.Status_FAILURE,
+ },
+ }, j.BuildbucketToken).Return(errors.New("failed"))
+ require.ErrorContains(t, trybots.jobFinished(ctx, j), "failed")
+ mockBB.AssertExpectations(t)
+}
+
+func TestJobFinishedV1_JobMishap_UpdateSucceeds(t *testing.T) {
+ ctx, trybots, mock, _, _ := setup(t)
+
+ j := tryjobV1(ctx, repoUrl)
now := time.Date(2021, time.April, 27, 0, 0, 0, 0, time.UTC)
j.Status = types.JOB_STATUS_MISHAP
j.Finished = now
require.NoError(t, trybots.db.PutJobs(ctx, []*types.Job{j}))
trybots.jCache.AddJobs([]*types.Job{j})
MockJobMishap(mock, j, now)
- require.NoError(t, trybots.jobFinished(j))
+ require.NoError(t, trybots.jobFinished(ctx, j))
require.True(t, mock.Empty(), mock.List())
}
-func TestJobFinished_JobMishap_UpdateFails(t *testing.T) {
- ctx, trybots, mock, _ := setup(t)
+func TestJobFinishedV2_JobMishap_UpdateSucceeds(t *testing.T) {
+ ctx, trybots, _, mockBB, _ := setup(t)
- j := tryjob(ctx, repoUrl)
+ j := tryjobV2(ctx, repoUrl)
+ now := time.Date(2021, time.April, 27, 0, 0, 0, 0, time.UTC)
+ j.Status = types.JOB_STATUS_MISHAP
+ j.Finished = now
+ require.NoError(t, trybots.db.PutJobs(ctx, []*types.Job{j}))
+ trybots.jCache.AddJobs([]*types.Job{j})
+ mockBB.On("UpdateBuild", testutils.AnyContext, &buildbucketpb.Build{
+ Id: j.BuildbucketBuildId,
+ Output: &buildbucketpb.Build_Output{
+ Status: buildbucketpb.Status_INFRA_FAILURE,
+ },
+ }, j.BuildbucketToken).Return(nil)
+ require.NoError(t, trybots.jobFinished(ctx, j))
+ mockBB.AssertExpectations(t)
+}
+
+func TestJobFinishedV1_JobMishap_UpdateFails(t *testing.T) {
+ ctx, trybots, mock, _, _ := setup(t)
+
+ j := tryjobV1(ctx, repoUrl)
now := time.Date(2021, time.April, 27, 0, 0, 0, 0, time.UTC)
j.Status = types.JOB_STATUS_MISHAP
j.Finished = now
@@ -334,10 +599,29 @@
trybots.jCache.AddJobs([]*types.Job{j})
expectErr := "fail"
MockJobMishap_Failed(mock, j, now, expectErr)
- require.EqualError(t, trybots.jobFinished(j), expectErr)
+ require.EqualError(t, trybots.jobFinished(ctx, j), expectErr)
require.True(t, mock.Empty(), mock.List())
}
+func TestJobFinishedV2_JobMishap_UpdateFails(t *testing.T) {
+ ctx, trybots, _, mockBB, _ := setup(t)
+
+ j := tryjobV2(ctx, repoUrl)
+ now := time.Date(2021, time.April, 27, 0, 0, 0, 0, time.UTC)
+ j.Status = types.JOB_STATUS_MISHAP
+ j.Finished = now
+ require.NoError(t, trybots.db.PutJobs(ctx, []*types.Job{j}))
+ trybots.jCache.AddJobs([]*types.Job{j})
+ mockBB.On("UpdateBuild", testutils.AnyContext, &buildbucketpb.Build{
+ Id: j.BuildbucketBuildId,
+ Output: &buildbucketpb.Build_Output{
+ Status: buildbucketpb.Status_INFRA_FAILURE,
+ },
+ }, j.BuildbucketToken).Return(errors.New("failed"))
+ require.ErrorContains(t, trybots.jobFinished(ctx, j), "failed")
+ mockBB.AssertExpectations(t)
+}
+
type addedJobs map[string]*types.Job
func (aj addedJobs) getAddedJob(ctx context.Context, t *testing.T, d db.JobReader) *types.Job {
@@ -352,8 +636,8 @@
return nil
}
-func TestInsertNewJob_LeaseSucceeds_StatusIsRequested(t *testing.T) {
- ctx, trybots, mock, mockBB := setup(t)
+func TestInsertNewJobV1_LeaseSucceeds_StatusIsRequested(t *testing.T) {
+ ctx, trybots, mock, mockBB, _ := setup(t)
now := time.Date(2021, time.April, 27, 0, 0, 0, 0, time.UTC)
aj := addedJobs(map[string]*types.Job{})
@@ -364,7 +648,7 @@
b1 := Build(t, now)
mockBB.On("GetBuild", ctx, b1.Id).Return(b1, nil)
MockTryLeaseBuild(mock, b1.Id)
- err := trybots.insertNewJob(ctx, b1.Id)
+ err := trybots.insertNewJobV1(ctx, b1.Id)
require.NoError(t, err)
require.True(t, mock.Empty(), mock.List())
result := aj.getAddedJob(ctx, t, trybots.db)
@@ -380,8 +664,8 @@
require.True(t, result.RepoState.Valid())
}
-func TestInsertNewJob_NoGerritChanges_BuildIsCanceled(t *testing.T) {
- ctx, trybots, mock, mockBB := setup(t)
+func TestInsertNewJobV1_NoGerritChanges_BuildIsCanceled(t *testing.T) {
+ ctx, trybots, mock, mockBB, _ := setup(t)
now := time.Date(2021, time.April, 27, 0, 0, 0, 0, time.UTC)
aj := addedJobs(map[string]*types.Job{})
@@ -390,15 +674,15 @@
b2.Input.GerritChanges = nil
MockCancelBuild(mock, b2.Id, fmt.Sprintf("Invalid Build %d: input should have exactly one GerritChanges: ", b2.Id))
mockBB.On("GetBuild", ctx, b2.Id).Return(b2, nil)
- err := trybots.insertNewJob(ctx, b2.Id)
+ err := trybots.insertNewJobV1(ctx, b2.Id)
require.NoError(t, err) // We don't report errors for bad data from buildbucket.
result := aj.getAddedJob(ctx, t, trybots.db)
require.Nil(t, result)
require.True(t, mock.Empty(), mock.List())
}
-func TestInsertNewJob_InvalidRepo_BuildIsCanceled(t *testing.T) {
- ctx, trybots, mock, mockBB := setup(t)
+func TestInsertNewJobV1_InvalidRepo_BuildIsCanceled(t *testing.T) {
+ ctx, trybots, mock, mockBB, _ := setup(t)
now := time.Date(2021, time.April, 27, 0, 0, 0, 0, time.UTC)
aj := addedJobs(map[string]*types.Job{})
@@ -407,15 +691,15 @@
b3.Input.GerritChanges[0].Project = "bogus-repo"
MockCancelBuild(mock, b3.Id, `Unknown patch project \\\"bogus-repo\\\"`)
mockBB.On("GetBuild", ctx, b3.Id).Return(b3, nil)
- err := trybots.insertNewJob(ctx, b3.Id)
+ err := trybots.insertNewJobV1(ctx, b3.Id)
require.NoError(t, err) // We don't report errors for bad data from buildbucket.
result := aj.getAddedJob(ctx, t, trybots.db)
require.Nil(t, result)
require.True(t, mock.Empty(), mock.List())
}
-func TestInsertNewJob_LeaseFailed_BuildIsCanceled(t *testing.T) {
- ctx, trybots, mock, mockBB := setup(t)
+func TestInsertNewJobV1_LeaseFailed_BuildIsCanceled(t *testing.T) {
+ ctx, trybots, mock, mockBB, _ := setup(t)
now := time.Date(2021, time.April, 27, 0, 0, 0, 0, time.UTC)
aj := addedJobs(map[string]*types.Job{})
@@ -425,15 +709,15 @@
expectErr := "Can't lease this!"
MockTryLeaseBuildFailed(mock, b4.Id, expectErr)
MockCancelBuild(mock, b4.Id, `Buildbucket refused lease with \\\"Can't lease this!\\\"`)
- err := trybots.insertNewJob(ctx, b4.Id)
+ err := trybots.insertNewJobV1(ctx, b4.Id)
require.NoError(t, err) // We don't report errors for bad data from buildbucket.
result := aj.getAddedJob(ctx, t, trybots.db)
require.Nil(t, result)
require.True(t, mock.Empty(), mock.List())
}
-func TestStartJob_NormalJob_Succeeds(t *testing.T) {
- ctx, trybots, mock, mockBB := setup(t)
+func TestStartJobV1_NormalJob_Succeeds(t *testing.T) {
+ ctx, trybots, mock, mockBB, _ := setup(t)
mockGetChangeInfo(t, mock, gerritIssue, patchProject, git.MainBranch)
now := time.Date(2021, time.April, 27, 0, 0, 0, 0, time.UTC)
@@ -441,21 +725,41 @@
b1 := Build(t, now)
mockBB.On("GetBuild", ctx, b1.Id).Return(b1, nil)
MockTryLeaseBuild(mock, b1.Id)
- require.NoError(t, trybots.insertNewJob(ctx, b1.Id))
+ require.NoError(t, trybots.insertNewJobV1(ctx, b1.Id))
j1 := aj.getAddedJob(ctx, t, trybots.db)
require.Empty(t, j1.Revision) // Revision isn't set until startJob runs.
MockJobStarted(mock, b1.Id)
- err := trybots.startJob(ctx, j1)
- require.NoError(t, err)
+ require.NoError(t, trybots.startJob(ctx, j1))
require.True(t, mock.Empty(), mock.List())
require.Equal(t, j1.BuildbucketBuildId, b1.Id)
require.NotEqual(t, "", j1.BuildbucketLeaseKey)
require.Equal(t, commit2.Hash, j1.Revision)
}
-func TestStartJob_RevisionAlreadySet_Succeeds(t *testing.T) {
- ctx, trybots, mock, mockBB := setup(t)
+func TestStartJobV2_NormalJob_Succeeds(t *testing.T) {
+ ctx, trybots, mock, mockBB, _ := setup(t)
+
+ j1 := tryjobV2(ctx, repoUrl)
+ j1.Revision = "" // No revision is set initially; it's derived in startJob.
+ j1.Status = types.JOB_STATUS_REQUESTED
+ require.NoError(t, trybots.db.PutJob(ctx, j1))
+ oldToken := j1.BuildbucketToken
+ mockGetChangeInfo(t, mock, gerritIssue, patchProject, git.MainBranch)
+ mockBB.On("StartBuild", testutils.AnyContext, j1.BuildbucketBuildId, j1.Id, j1.BuildbucketToken).Return(bbFakeUpdateToken, nil)
+ require.NoError(t, trybots.startJob(ctx, j1))
+ j1, err := trybots.db.GetJobById(ctx, j1.Id)
+ require.NoError(t, err)
+ require.Equal(t, commit2.Hash, j1.Revision)
+ require.Equal(t, types.JOB_STATUS_IN_PROGRESS, j1.Status)
+ // Start token is exchanged for an update token.
+ require.NotEmpty(t, j1.BuildbucketToken)
+ require.NotEqual(t, oldToken, j1.BuildbucketToken)
+ mockBB.AssertExpectations(t)
+}
+
+func TestStartJobV1_RevisionAlreadySet_Succeeds(t *testing.T) {
+ ctx, trybots, mock, mockBB, _ := setup(t)
mockGetChangeInfo(t, mock, gerritIssue, patchProject, git.MainBranch)
now := time.Date(2021, time.April, 27, 0, 0, 0, 0, time.UTC)
@@ -463,21 +767,43 @@
b1 := Build(t, now)
mockBB.On("GetBuild", ctx, b1.Id).Return(b1, nil)
MockTryLeaseBuild(mock, b1.Id)
- require.NoError(t, trybots.insertNewJob(ctx, b1.Id))
+ require.NoError(t, trybots.insertNewJobV1(ctx, b1.Id))
j1 := aj.getAddedJob(ctx, t, trybots.db)
j1.Revision = oldBranchName // We'll resolve this to the actual hash.
MockJobStarted(mock, b1.Id)
- err := trybots.startJob(ctx, j1)
- require.NoError(t, err)
+ require.NoError(t, trybots.startJob(ctx, j1))
require.True(t, mock.Empty(), mock.List())
require.Equal(t, j1.BuildbucketBuildId, b1.Id)
require.NotEqual(t, "", j1.BuildbucketLeaseKey)
require.Equal(t, commit1.Hash, j1.Revision) // Ensure we resolved the branch
}
-func TestStartJob_NormalJob_Failed(t *testing.T) {
- ctx, trybots, mock, mockBB := setup(t)
+func TestStartJobV2_RevisionAlreadySet_Succeeds(t *testing.T) {
+ ctx, trybots, mock, mockBB, _ := setup(t)
+
+ j1 := tryjobV2(ctx, repoUrl)
+ // Set revision to a different value; ensure we don't override.
+ j1.Revision = commit1.Hash
+ j1.Status = types.JOB_STATUS_REQUESTED
+ require.NoError(t, trybots.db.PutJob(ctx, j1))
+ oldToken := j1.BuildbucketToken
+ mockGetChangeInfo(t, mock, gerritIssue, patchProject, git.MainBranch)
+ mockBB.On("StartBuild", testutils.AnyContext, j1.BuildbucketBuildId, j1.Id, j1.BuildbucketToken).Return(bbFakeUpdateToken, nil)
+ require.NoError(t, trybots.startJob(ctx, j1))
+ j1, err := trybots.db.GetJobById(ctx, j1.Id)
+ require.NoError(t, err)
+ require.Equal(t, commit1.Hash, j1.Revision)
+ require.Equal(t, types.JOB_STATUS_IN_PROGRESS, j1.Status)
+ // Start token is exchanged for an update token.
+ require.NotEmpty(t, j1.BuildbucketToken)
+ require.NotEqual(t, oldToken, j1.BuildbucketToken)
+ require.True(t, j1.Valid())
+ mockBB.AssertExpectations(t)
+}
+
+func TestStartJobV1_NormalJob_Failed(t *testing.T) {
+ ctx, trybots, mock, mockBB, _ := setup(t)
mockGetChangeInfo(t, mock, gerritIssue, patchProject, git.MainBranch)
now := time.Date(2021, time.April, 27, 0, 0, 0, 0, time.UTC)
@@ -485,22 +811,44 @@
b1 := Build(t, now)
mockBB.On("GetBuild", ctx, b1.Id).Return(b1, nil)
MockTryLeaseBuild(mock, b1.Id)
- require.NoError(t, trybots.insertNewJob(ctx, b1.Id))
+ require.NoError(t, trybots.insertNewJobV1(ctx, b1.Id))
j1 := aj.getAddedJob(ctx, t, trybots.db)
expectErr := "Can't start this build!"
- MockJobStartedFailed(mock, b1.Id, expectErr)
+ MockJobStartedFailed(mock, b1.Id, expectErr, "INVALID_INPUT")
err := trybots.startJob(ctx, j1)
require.ErrorContains(t, err, expectErr)
require.True(t, mock.Empty(), mock.List())
- updatedJ1, err := trybots.jCache.GetJob(j1.Id)
+ j1, err = trybots.jCache.GetJob(j1.Id)
require.NoError(t, err)
- require.Equal(t, types.JOB_STATUS_CANCELED, updatedJ1.Status)
- // TODO(borenet): Add a field to Job to give more details and check it here.
+ require.Equal(t, types.JOB_STATUS_CANCELED, j1.Status)
+ require.Contains(t, j1.StatusDetails, "INVALID_INPUT")
}
-func TestStartJob_InvalidJobSpec_Failed(t *testing.T) {
- ctx, trybots, mock, mockBB := setup(t)
+func TestStartJobV2_NormalJob_Failed(t *testing.T) {
+ ctx, trybots, mock, mockBB, _ := setup(t)
+
+ j1 := tryjobV2(ctx, repoUrl)
+ j1.Revision = "" // No revision is set initially; it's derived in startJob.
+ j1.Status = types.JOB_STATUS_REQUESTED
+ require.NoError(t, trybots.db.PutJob(ctx, j1))
+ oldToken := j1.BuildbucketToken
+ mockGetChangeInfo(t, mock, gerritIssue, patchProject, git.MainBranch)
+ mockBB.On("StartBuild", testutils.AnyContext, j1.BuildbucketBuildId, j1.Id, j1.BuildbucketToken).Return("", errors.New("can't start this build"))
+ err := trybots.startJob(ctx, j1)
+ require.ErrorContains(t, err, "can't start this build")
+ require.ErrorContains(t, err, "failed to send job-started notification")
+ j1, err = trybots.db.GetJobById(ctx, j1.Id)
+ require.NoError(t, err)
+ require.Empty(t, j1.Revision)
+ require.Equal(t, types.JOB_STATUS_REQUESTED, j1.Status)
+ require.NotEmpty(t, j1.BuildbucketToken)
+ require.Equal(t, oldToken, j1.BuildbucketToken)
+ mockBB.AssertExpectations(t)
+}
+
+func TestStartJobV1_InvalidJobSpec_Failed(t *testing.T) {
+ ctx, trybots, mock, mockBB, _ := setup(t)
mockGetChangeInfo(t, mock, gerritIssue, patchProject, git.MainBranch)
now := time.Date(2021, time.April, 27, 0, 0, 0, 0, time.UTC)
@@ -510,7 +858,7 @@
b2.Builder.Builder = "bogus-job"
mockBB.On("GetBuild", ctx, b2.Id).Return(b2, nil)
MockTryLeaseBuild(mock, b2.Id)
- require.NoError(t, trybots.insertNewJob(ctx, b2.Id))
+ require.NoError(t, trybots.insertNewJobV1(ctx, b2.Id))
j2 := aj.getAddedJob(ctx, t, trybots.db)
mockBB.On("GetBuild", ctx, b2.Id).Return(b2, nil)
err := trybots.startJob(ctx, j2)
@@ -519,7 +867,27 @@
j2, err = trybots.jCache.GetJob(j2.Id)
require.NoError(t, err)
require.Equal(t, types.JOB_STATUS_MISHAP, j2.Status)
- // TODO(borenet): Add a field to Job to give more details and check it here.
+ require.Contains(t, j2.StatusDetails, "Failed to start Job: no such job: bogus-job")
+}
+
+func TestStartJobV2_InvalidJobSpec_Failed(t *testing.T) {
+ ctx, trybots, mock, mockBB, _ := setup(t)
+
+ j1 := tryjobV2(ctx, repoUrl)
+ j1.Name = "bogus-job"
+ j1.Revision = "" // No revision is set initially; it's derived in startJob.
+ j1.Status = types.JOB_STATUS_REQUESTED
+ require.NoError(t, trybots.db.PutJob(ctx, j1))
+ mockGetChangeInfo(t, mock, gerritIssue, patchProject, git.MainBranch)
+ require.NoError(t, trybots.startJob(ctx, j1))
+ j1, err := trybots.db.GetJobById(ctx, j1.Id)
+ require.NoError(t, err)
+ require.Equal(t, commit2.Hash, j1.Revision)
+ require.Equal(t, types.JOB_STATUS_MISHAP, j1.Status)
+ require.Equal(t, bbFakeStartToken, j1.BuildbucketToken)
+ require.True(t, j1.Valid())
+ require.Contains(t, j1.StatusDetails, "Failed to start Job: no such job: bogus-job")
+ mockBB.AssertExpectations(t)
}
func mockGetChangeInfo(t *testing.T, mock *mockhttpclient.URLMock, id int, project, branch string) {
@@ -534,8 +902,8 @@
mock.Mock(fmt.Sprintf("%s/a%s", fakeGerritUrl, fmt.Sprintf(gerrit.URLTmplChange, ci.Id)), mockhttpclient.MockGetDialogue(issueBytes))
}
-func TestRetry(t *testing.T) {
- ctx, trybots, mock, mockBB := setup(t)
+func TestRetryV1(t *testing.T) {
+ ctx, trybots, mock, mockBB, _ := setup(t)
mockGetChangeInfo(t, mock, gerritIssue, patchProject, git.MainBranch)
@@ -547,7 +915,7 @@
MockTryLeaseBuild(mock, b1.Id)
MockJobStarted(mock, b1.Id)
mockBB.On("GetBuild", ctx, b1.Id).Return(b1, nil)
- require.NoError(t, trybots.insertNewJob(ctx, b1.Id))
+ require.NoError(t, trybots.insertNewJobV1(ctx, b1.Id))
j1 := aj.getAddedJob(ctx, t, trybots.db)
require.NoError(t, trybots.startJob(ctx, j1))
require.True(t, mock.Empty(), mock.List())
@@ -564,14 +932,47 @@
MockTryLeaseBuild(mock, b2.Id)
MockJobStarted(mock, b2.Id)
mockBB.On("GetBuild", ctx, b2.Id).Return(b2, nil)
- require.NoError(t, trybots.insertNewJob(ctx, b2.Id))
+ require.NoError(t, trybots.insertNewJobV1(ctx, b2.Id))
j2 := aj.getAddedJob(ctx, t, trybots.db)
require.NoError(t, trybots.startJob(ctx, j2))
require.True(t, mock.Empty(), mock.List())
require.Equal(t, j2.BuildbucketBuildId, b2.Id)
require.NotEqual(t, "", j2.BuildbucketLeaseKey)
require.True(t, j2.Valid())
+}
+
+func TestRetryV2(t *testing.T) {
+ ctx, trybots, mock, mockBB, _ := setup(t)
+
+ mockGetChangeInfo(t, mock, gerritIssue, patchProject, git.MainBranch)
+
+ // Insert one try job.
+ j1 := tryjobV2(ctx, repoUrl)
+ j1.Revision = "" // No revision is set initially; it's derived in startJob.
+ j1.Status = types.JOB_STATUS_REQUESTED
+ require.NoError(t, trybots.db.PutJob(ctx, j1))
+ mockGetChangeInfo(t, mock, gerritIssue, patchProject, git.MainBranch)
+ mockBB.On("StartBuild", testutils.AnyContext, j1.BuildbucketBuildId, j1.Id, j1.BuildbucketToken).Return(bbFakeUpdateToken, nil)
+ require.NoError(t, trybots.startJob(ctx, j1))
+ mockBB.AssertExpectations(t)
+
+ // Insert a second try job; ensure that it gets IsForce = true.
+ j2 := tryjobV2(ctx, repoUrl)
+ j2.Revision = "" // No revision is set initially; it's derived in startJob.
+ j2.Status = types.JOB_STATUS_REQUESTED
+ require.NoError(t, trybots.db.PutJob(ctx, j2))
+ mockGetChangeInfo(t, mock, gerritIssue, patchProject, git.MainBranch)
+ mockBB.On("StartBuild", testutils.AnyContext, j2.BuildbucketBuildId, j2.Id, j2.BuildbucketToken).Return(bbFakeUpdateToken, nil)
+ require.NoError(t, trybots.startJob(ctx, j2))
+ j2, err := trybots.db.GetJobById(ctx, j2.Id)
+ require.NoError(t, err)
+ require.Equal(t, commit2.Hash, j2.Revision)
+ require.Equal(t, types.JOB_STATUS_IN_PROGRESS, j2.Status)
+ // Start token is exchanged for an update token.
+ require.NotEmpty(t, j2.BuildbucketToken)
require.True(t, j2.IsForce)
+ require.True(t, j2.Valid())
+ mockBB.AssertExpectations(t)
}
func testPollAssertAdded(t *testing.T, now time.Time, trybots *TryJobIntegrator, builds []*buildbucketpb.Build) {
@@ -617,7 +1018,7 @@
}
func TestPoll_OneNewBuild_Success(t *testing.T) {
- _, trybots, mock, mockBB := setup(t)
+ _, trybots, mock, mockBB, _ := setup(t)
mockGetChangeInfo(t, mock, gerritIssue, patchProject, git.MainBranch)
now := time.Date(2021, time.April, 27, 0, 0, 0, 0, time.UTC)
@@ -625,7 +1026,7 @@
}
func TestPoll_MultipleNewBuilds_Success(t *testing.T) {
- _, trybots, mock, mockBB := setup(t)
+ _, trybots, mock, mockBB, _ := setup(t)
mockGetChangeInfo(t, mock, gerritIssue, patchProject, git.MainBranch)
now := time.Date(2021, time.April, 27, 0, 0, 0, 0, time.UTC)
@@ -633,7 +1034,7 @@
}
func TestPoll_MultiplePagesOfNewBuilds_Success(t *testing.T) {
- _, trybots, mock, mockBB := setup(t)
+ _, trybots, mock, mockBB, _ := setup(t)
mockGetChangeInfo(t, mock, gerritIssue, patchProject, git.MainBranch)
now := time.Date(2021, time.April, 27, 0, 0, 0, 0, time.UTC)
@@ -648,7 +1049,7 @@
}
func TestPoll_MultipleNewBuilds_OneFailsInsert_OthersInsertSuccessfully(t *testing.T) {
- _, trybots, mock, mockBB := setup(t)
+ _, trybots, mock, mockBB, _ := setup(t)
mockGetChangeInfo(t, mock, gerritIssue, patchProject, git.MainBranch)
now := time.Date(2021, time.April, 27, 0, 0, 0, 0, time.UTC)
@@ -668,7 +1069,7 @@
}
func TestPoll_MultiplePagesOfNewBuilds_OnePeekFails_OthersInsertSuccessfully(t *testing.T) {
- _, trybots, mock, mockBB := setup(t)
+ _, trybots, mock, mockBB, _ := setup(t)
mockGetChangeInfo(t, mock, gerritIssue, patchProject, git.MainBranch)
now := time.Date(2021, time.April, 27, 0, 0, 0, 0, time.UTC)
diff --git a/task_scheduler/go/tryjobs/utils_test.go b/task_scheduler/go/tryjobs/utils_test.go
index d2bcaf3..0d562ff 100644
--- a/task_scheduler/go/tryjobs/utils_test.go
+++ b/task_scheduler/go/tryjobs/utils_test.go
@@ -20,6 +20,7 @@
"go.skia.org/infra/go/git/repograph"
"go.skia.org/infra/go/mockhttpclient"
"go.skia.org/infra/go/now"
+ pubsub_mocks "go.skia.org/infra/go/pubsub/mocks"
"go.skia.org/infra/go/sklog"
"go.skia.org/infra/go/sktest"
"go.skia.org/infra/go/testutils"
@@ -41,8 +42,12 @@
patchProject = "skia"
parentProject = "parent-project"
- fakeGerritUrl = "https://fake-skia-review.googlesource.com"
- oldBranchName = "old-branch"
+ fakeGerritUrl = "https://fake-skia-review.googlesource.com"
+ oldBranchName = "old-branch"
+ bbPubSubProject = "fake-bb-pubsub-project"
+ bbPubSubTopic = "fake-bb-pubsub-topic"
+ bbFakeStartToken = "fake-bb-start-token"
+ bbFakeUpdateToken = "fake-bb-update-token"
)
var (
@@ -93,7 +98,7 @@
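-// setup prepares the tests to run. Returns the created temporary dir,
-// TryJobIntegrator instance, and URLMock instance.
+// setup prepares the tests to run. Returns the context, TryJobIntegrator
+// instance, URLMock instance, mock Buildbucket client, and mock pubsub topic.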
-func setup(t sktest.TestingT) (context.Context, *TryJobIntegrator, *mockhttpclient.URLMock, *mocks.BuildBucketInterface) {
+func setup(t sktest.TestingT) (context.Context, *TryJobIntegrator, *mockhttpclient.URLMock, *mocks.BuildBucketInterface, *pubsub_mocks.Topic) {
ctx := context.WithValue(context.Background(), now.ContextKey, ts)
// Set up other TryJobIntegrator inputs.
@@ -130,9 +135,12 @@
require.NoError(t, err)
jCache, err := cache.NewJobCache(ctx, d, window, nil)
require.NoError(t, err)
- integrator, err := NewTryJobIntegrator(API_URL_TESTING, BUCKET_TESTING, "fake-server", mock.Client(), d, jCache, projectRepoMapping, rm, taskCfgCache, chr, g)
+ pubsubClient := &pubsub_mocks.Client{}
+ pubsubTopic := &pubsub_mocks.Topic{}
+ pubsubClient.On("Topic", bbPubSubTopic).Return(pubsubTopic, nil)
+ integrator, err := NewTryJobIntegrator(ctx, API_URL_TESTING, "fake-bb-target", BUCKET_TESTING, "fake-server", mock.Client(), d, jCache, projectRepoMapping, rm, taskCfgCache, chr, g, pubsubClient)
require.NoError(t, err)
- return ctx, integrator, mock, MockBuildbucket(integrator)
+ return ctx, integrator, mock, MockBuildbucket(integrator), pubsubTopic
}
func MockBuildbucket(tj *TryJobIntegrator) *mocks.BuildBucketInterface {
@@ -171,24 +179,24 @@
}
}
-func tryjob(ctx context.Context, repoName string) *types.Job {
+func tryjobV1(ctx context.Context, repoName string) *types.Job {
return &types.Job{
BuildbucketBuildId: rand.Int63(),
BuildbucketLeaseKey: rand.Int63(),
Created: now.Now(ctx),
- Name: "fake-name",
- RepoState: types.RepoState{
- Patch: types.Patch{
- Server: "fake-server",
- Issue: "fake-issue",
- Patchset: "fake-patchset",
- },
- Repo: repoName,
- Revision: "fake-revision",
- },
+ Name: tcc_testutils.BuildTaskName,
+ RepoState: repoState2,
}
}
+func tryjobV2(ctx context.Context, repoName string) *types.Job {
+ job := tryjobV1(ctx, repoName)
+ job.BuildbucketLeaseKey = 0
+ job.BuildbucketToken = bbFakeStartToken
+ job.BuildbucketPubSubTopic = bbPubSubTopic
+ return job
+}
+
type errMsg struct {
Message string `json:"message"`
}
@@ -277,15 +285,15 @@
// We have to use this because we don't know what the Job ID is going to
// be until after it's inserted into the DB.
req := mockhttpclient.DONT_CARE_REQUEST
- resp := []byte("{}")
+ resp := []byte(fmt.Sprintf(`{"build": {},"update_build_token":"%s"}`, bbFakeUpdateToken))
mock.MockOnce(fmt.Sprintf("%sbuilds/%d/start?alt=json&prettyPrint=false", API_URL_TESTING, id), mockhttpclient.MockPostDialogue("application/json", req, resp))
}
-func MockJobStartedFailed(mock *mockhttpclient.URLMock, id int64, mockErr string) {
+func MockJobStartedFailed(mock *mockhttpclient.URLMock, id int64, mockErr, mockReason string) {
// We have to use this because we don't know what the Job ID is going to
// be until after it's inserted into the DB.
req := mockhttpclient.DONT_CARE_REQUEST
- resp := []byte(fmt.Sprintf("{\"error\":{\"message\":\"%s\"}}", mockErr))
+ resp := []byte(fmt.Sprintf(`{"error":{"message":"%s","reason":"%s"}}`, mockErr, mockReason))
mock.MockOnce(fmt.Sprintf("%sbuilds/%d/start?alt=json&prettyPrint=false", API_URL_TESTING, id), mockhttpclient.MockPostDialogue("application/json", req, resp))
}