| package tryjobs |
| |
| import ( |
| "context" |
| "encoding/json" |
| "fmt" |
| "net/http" |
| "sort" |
| "strconv" |
| "strings" |
| "sync" |
| "time" |
| |
| pubsub_api "cloud.google.com/go/pubsub" |
| "github.com/golang/protobuf/ptypes" |
| "github.com/hashicorp/go-multierror" |
| buildbucketpb "go.chromium.org/luci/buildbucket/proto" |
| buildbucket_api "go.chromium.org/luci/common/api/buildbucket/buildbucket/v1" |
| "go.skia.org/infra/go/buildbucket" |
| "go.skia.org/infra/go/cleanup" |
| "go.skia.org/infra/go/firestore" |
| "go.skia.org/infra/go/gerrit" |
| "go.skia.org/infra/go/git/repograph" |
| "go.skia.org/infra/go/metrics2" |
| "go.skia.org/infra/go/now" |
| "go.skia.org/infra/go/pubsub" |
| "go.skia.org/infra/go/skerr" |
| "go.skia.org/infra/go/sklog" |
| "go.skia.org/infra/go/util" |
| "go.skia.org/infra/task_scheduler/go/cacher" |
| "go.skia.org/infra/task_scheduler/go/db" |
| "go.skia.org/infra/task_scheduler/go/db/cache" |
| "go.skia.org/infra/task_scheduler/go/job_creation/buildbucket_taskbackend" |
| "go.skia.org/infra/task_scheduler/go/task_cfg_cache" |
| "go.skia.org/infra/task_scheduler/go/types" |
| "google.golang.org/protobuf/encoding/prototext" |
| ) |
| |
| /* |
| Integration of the Task Scheduler with Buildbucket for try jobs. |
| */ |
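| |
| // One possible wiring is sketched below; the variable names (httpClient, |
| // jobDB, jobCache, repos, etc.) are illustrative placeholders, and how those |
| // dependencies are constructed depends on the deployment rather than on this |
| // package. |
| // |
| // integrator, err := NewTryJobIntegrator( |
| // ctx, API_URL_PROD, buildbucketTarget, BUCKET_PRIMARY, host, |
| // httpClient, jobDB, jobCache, projectRepoMapping, repos, |
| // taskCfgCache, chr, gerritClient, pubsubClient) |
| // if err != nil { |
| // sklog.Fatal(err) |
| // } |
| // integrator.Start(ctx) |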
| |
| const ( |
| // API URLs |
| API_URL_PROD = "https://cr-buildbucket.appspot.com/api/buildbucket/v1/" |
| API_URL_TESTING = "http://localhost:8008/api/buildbucket/v1/" |
| |
| // Buildbucket buckets used for try jobs. |
| BUCKET_PRIMARY = "skia.primary" |
| BUCKET_INTERNAL = "skia.internal" |
| BUCKET_TESTING = "skia.testing" |
| |
| // How often to send updates to Buildbucket. |
| UPDATE_INTERVAL = 30 * time.Second |
| |
| // We attempt to renew leases in batches. This is the batch size. |
| LEASE_BATCH_SIZE = 25 |
| |
| // We lease a build for this amount of time, and if we don't renew the |
| // lease before the time is up, the build resets to "scheduled" status |
| // and becomes available for leasing again. |
| LEASE_DURATION = time.Hour |
| |
| // We use a shorter initial lease duration in case we succeed in leasing |
| // a build but fail to insert the associated Job into the DB, eg. |
| // because the scheduler was interrupted. |
| LEASE_DURATION_INITIAL = 5 * time.Minute |
| |
| // How many pending builds to read from the bucket at a time. |
| PEEK_MAX_BUILDS = 50 |
| |
| // How often to poll Buildbucket for newly-scheduled builds. |
| POLL_INTERVAL = 10 * time.Second |
| |
| // This error reason indicates that we already marked the build as |
| // finished. |
| BUILDBUCKET_API_ERROR_REASON_COMPLETED = "BUILD_IS_COMPLETED" |
| |
| secondsToMicros = 1000000 |
| microsToNanos = 1000 |
| |
| // In case the error is very verbose (e.g. bot_update output), only send a |
| // truncated cancel reason to Buildbucket to avoid exceeding limits in |
| // Buildbucket's DB. |
| maxCancelReasonLen = 1024 |
| ) |
| |
| // TryJobIntegrator is responsible for communicating with Buildbucket to |
| // trigger try jobs and report their results. |
| type TryJobIntegrator struct { |
| bb *buildbucket_api.Service |
| bb2 buildbucket.BuildBucketInterface |
| buildbucketBucket string |
| buildbucketTarget string |
| chr cacher.Cacher |
| db db.JobDB |
| gerrit gerrit.GerritInterface |
| host string |
| jCache cache.JobCache |
| projectRepoMapping map[string]string |
| pubsub pubsub.Client |
| rm repograph.Map |
| taskCfgCache task_cfg_cache.TaskCfgCache |
| } |
| |
| // NewTryJobIntegrator returns a TryJobIntegrator instance. |
| func NewTryJobIntegrator(ctx context.Context, buildbucketAPIURL, buildbucketTarget, buildbucketBucket, host string, c *http.Client, d db.JobDB, jCache cache.JobCache, projectRepoMapping map[string]string, rm repograph.Map, taskCfgCache task_cfg_cache.TaskCfgCache, chr cacher.Cacher, gerrit gerrit.GerritInterface, pubsubClient pubsub.Client) (*TryJobIntegrator, error) { |
| bb, err := buildbucket_api.New(c) |
| if err != nil { |
| return nil, err |
| } |
| bb.BasePath = buildbucketAPIURL |
| rv := &TryJobIntegrator{ |
| bb: bb, |
| bb2: buildbucket.NewClient(c), |
| buildbucketBucket: buildbucketBucket, |
| buildbucketTarget: buildbucketTarget, |
| db: d, |
| chr: chr, |
| gerrit: gerrit, |
| host: host, |
| jCache: jCache, |
| projectRepoMapping: projectRepoMapping, |
| pubsub: pubsubClient, |
| rm: rm, |
| taskCfgCache: taskCfgCache, |
| } |
| return rv, nil |
| } |
| |
| // Start initiates the TryJobIntegrator's heartbeat and polling loops. If the |
| // given Context is canceled, the loops stop. |
| func (t *TryJobIntegrator) Start(ctx context.Context) { |
| lvUpdate := metrics2.NewLiveness("last_successful_update_buildbucket_tryjob_state") |
| cleanup.Repeat(UPDATE_INTERVAL, func(_ context.Context) { |
| // Explicitly ignore the passed-in context; this allows us to |
| // finish sending heartbeats and updating finished jobs in the |
| // DB even if the context is canceled, which helps to prevent |
| // inconsistencies between Buildbucket and the Task Scheduler |
| // DB. |
| ctx := context.Background() |
| if err := t.updateJobs(ctx); err != nil { |
| sklog.Error(err) |
| } else { |
| lvUpdate.Reset() |
| } |
| }, nil) |
| lvPoll := metrics2.NewLiveness("last_successful_poll_buildbucket_for_new_tryjobs") |
| cleanup.Repeat(POLL_INTERVAL, func(_ context.Context) { |
| // Explicitly ignore the passed-in context; this allows us to |
| // finish leasing jobs from Buildbucket and inserting them into |
| // the DB even if the context is canceled, which helps to |
| // prevent inconsistencies between Buildbucket and the Task |
| // Scheduler DB. |
| ctx := context.Background() |
| if err := t.Poll(ctx); err != nil { |
| sklog.Errorf("Failed to poll for new try jobs: %s", err) |
| } else { |
| lvPoll.Reset() |
| } |
| }, nil) |
| go t.startJobsLoop(ctx) |
| } |
| |
| // getActiveTryJobs returns the active (started but not yet marked as finished |
| // in Buildbucket) tryjobs. |
| func (t *TryJobIntegrator) getActiveTryJobs(ctx context.Context) ([]*types.Job, error) { |
| if err := t.jCache.Update(ctx); err != nil { |
| return nil, err |
| } |
| jobs := t.jCache.GetAllCachedJobs() |
| rv := []*types.Job{} |
| for _, job := range jobs { |
| if (job.BuildbucketLeaseKey != 0 || job.BuildbucketToken != "") && job.Status != types.JOB_STATUS_REQUESTED { |
| rv = append(rv, job) |
| } |
| } |
| return rv, nil |
| } |
| |
| // updateJobs sends updates to Buildbucket for all active try Jobs. |
| func (t *TryJobIntegrator) updateJobs(ctx context.Context) error { |
| // Get all Jobs associated with in-progress Buildbucket builds. |
| jobs, err := t.getActiveTryJobs(ctx) |
| if err != nil { |
| return err |
| } |
| |
| // Divide up finished and unfinished Jobs. |
| finished := make([]*types.Job, 0, len(jobs)) |
| unfinishedV1 := make([]*types.Job, 0, len(jobs)) |
| unfinishedV2 := make([]*types.Job, 0, len(jobs)) |
| for _, j := range jobs { |
| if j.Done() { |
| finished = append(finished, j) |
| } else if isBBv2(j) { |
| unfinishedV2 = append(unfinishedV2, j) |
| } else { |
| unfinishedV1 = append(unfinishedV1, j) |
| } |
| } |
| |
| // Send heartbeats for unfinished Jobs. |
| var heartbeatErr error |
| var wg sync.WaitGroup |
| wg.Add(1) |
| go func() { |
| defer wg.Done() |
| heartbeatErr = t.sendHeartbeats(ctx, unfinishedV1) |
| }() |
| |
| var pubsubErr error |
| wg.Add(1) |
| go func() { |
| defer wg.Done() |
| pubsubErr = t.sendPubsubUpdates(ctx, unfinishedV2) |
| }() |
| |
| // Send updates for finished Jobs, then clear their lease keys and |
| // Buildbucket tokens to mark them as inactive in the DB. |
| errs := []error{} |
| insert := make([]*types.Job, 0, len(finished)) |
| for _, j := range finished { |
| if err := t.jobFinished(ctx, j); err != nil { |
| errs = append(errs, err) |
| } else { |
| j.BuildbucketLeaseKey = 0 |
| j.BuildbucketToken = "" |
| insert = append(insert, j) |
| } |
| } |
| if err := t.db.PutJobsInChunks(ctx, insert); err != nil { |
| errs = append(errs, err) |
| } |
| t.jCache.AddJobs(insert) |
| |
| wg.Wait() |
| if heartbeatErr != nil { |
| errs = append(errs, heartbeatErr) |
| } |
| if pubsubErr != nil { |
| errs = append(errs, pubsubErr) |
| } |
| |
| if len(errs) > 0 { |
| return skerr.Fmt("Failed to update jobs; got errors: %v", errs) |
| } |
| return nil |
| } |
| |
| // heartbeatJobSlice implements sort.Interface to sort Jobs by BuildbucketBuildId. |
| type heartbeatJobSlice []*types.Job |
| |
| func (s heartbeatJobSlice) Len() int { return len(s) } |
| |
| func (s heartbeatJobSlice) Less(i, j int) bool { |
| return s[i].BuildbucketBuildId < s[j].BuildbucketBuildId |
| } |
| |
| func (s heartbeatJobSlice) Swap(i, j int) { |
| s[i], s[j] = s[j], s[i] |
| } |
| |
| // isBBv2 returns true iff the Job was triggered using Buildbucket V2. |
| func isBBv2(j *types.Job) bool { |
| return j.BuildbucketPubSubTopic != "" |
| } |
| |
| // sendHeartbeats sends heartbeats to Buildbucket for all of the unfinished try |
| // Jobs. |
| func (t *TryJobIntegrator) sendHeartbeats(ctx context.Context, jobs []*types.Job) error { |
| defer metrics2.FuncTimer().Stop() |
| |
| // Sort the jobs by BuildbucketBuildId for consistency in testing. |
| sort.Sort(heartbeatJobSlice(jobs)) |
| |
| expiration := now.Now(ctx).Add(LEASE_DURATION).Unix() * secondsToMicros |
| |
| errs := []error{} |
| |
| // Send heartbeats for all leases. |
| send := func(jobs []*types.Job) { |
| heartbeats := make([]*buildbucket_api.LegacyApiHeartbeatBatchRequestMessageOneHeartbeat, 0, len(jobs)) |
| for _, j := range jobs { |
| heartbeats = append(heartbeats, &buildbucket_api.LegacyApiHeartbeatBatchRequestMessageOneHeartbeat{ |
| BuildId: j.BuildbucketBuildId, |
| LeaseKey: j.BuildbucketLeaseKey, |
| LeaseExpirationTs: expiration, |
| }) |
| } |
| sklog.Infof("Sending heartbeats for %d jobs...", len(jobs)) |
| resp, err := t.bb.HeartbeatBatch(&buildbucket_api.LegacyApiHeartbeatBatchRequestMessage{ |
| Heartbeats: heartbeats, |
| }).Do() |
| if err != nil { |
| errs = append(errs, skerr.Wrapf(err, "failed to send heartbeat request")) |
| return |
| } |
| // Results should follow the same ordering as the jobs we sent. |
| if len(resp.Results) != len(jobs) { |
| errs = append(errs, skerr.Fmt("Heartbeat response has incorrect number of results (%d results vs %d jobs)", len(resp.Results), len(jobs))) |
| return |
| } |
| var cancelJobs []*types.Job |
| var cancelReasons []string |
| for i, result := range resp.Results { |
| if result.Error != nil { |
| // Cancel the job. |
| if result.Error.Reason == BUILDBUCKET_API_ERROR_REASON_COMPLETED { |
| // This indicates that the build was canceled, eg. because |
| // a newer patchset was uploaded. This isn't an error, so we |
| // cancel the job but don't log an error. |
| } else { |
| sklog.Errorf("Error sending heartbeat for job; canceling %q: %s", jobs[i].Id, result.Error.Message) |
| } |
| cancelJobs = append(cancelJobs, jobs[i]) |
| cancelReasons = append(cancelReasons, fmt.Sprintf("Buildbucket rejected heartbeat with: %s", result.Error.Reason)) |
| } |
| } |
| if len(cancelJobs) > 0 { |
| sklog.Infof("Canceling %d jobs", len(cancelJobs)) |
| if err := t.localCancelJobs(ctx, cancelJobs, cancelReasons); err != nil { |
| errs = append(errs, err) |
| } |
| } |
| } |
| |
| // Send heartbeats in batches. |
| for len(jobs) > 0 { |
| j := LEASE_BATCH_SIZE |
| if j > len(jobs) { |
| j = len(jobs) |
| } |
| send(jobs[:j]) |
| jobs = jobs[j:] |
| } |
| sklog.Infof("Finished sending heartbeats.") |
| if len(errs) > 0 { |
| return skerr.Fmt("got errors sending heartbeats: %v", errs) |
| } |
| return nil |
| } |
| |
| // sendPubsubUpdates sends updates to Buildbucket via Pub/Sub for in-progress |
| // Jobs. |
| func (t *TryJobIntegrator) sendPubsubUpdates(ctx context.Context, jobs []*types.Job) error { |
| g := multierror.Group{} |
| for _, job := range jobs { |
| job := job // https://golang.org/doc/faq#closures_and_goroutines |
| g.Go(func() error { |
| update := &buildbucketpb.BuildTaskUpdate{ |
| BuildId: strconv.FormatInt(job.BuildbucketBuildId, 10), |
| Task: &buildbucketpb.Task{ |
| Id: &buildbucketpb.TaskID{ |
| Target: t.buildbucketTarget, |
| Id: job.Id, |
| }, |
| Link: job.URL(t.host), |
| Status: buildbucket_taskbackend.JobStatusToBuildbucketStatus(job.Status), |
| UpdateId: now.Now(ctx).UnixNano(), |
| }, |
| } |
| b, err := prototext.Marshal(update) |
| if err != nil { |
| return skerr.Wrapf(err, "failed to encode BuildTaskUpdate") |
| } |
| _, err = t.pubsub.Topic(job.BuildbucketPubSubTopic).Publish(ctx, &pubsub_api.Message{ |
| Data: b, |
| }).Get(ctx) |
| return err |
| }) |
| } |
| return g.Wait().ErrorOrNil() |
| } |
| |
| // getRepo returns the repo information associated with the given URL. |
| func (t *TryJobIntegrator) getRepo(repoUrl string) (*repograph.Graph, error) { |
| r, ok := t.rm[repoUrl] |
| if !ok { |
| return nil, skerr.Fmt("unknown repo %q", repoUrl) |
| } |
| return r, nil |
| } |
| |
| // getRevision obtains the branch name from Gerrit, then retrieves and returns |
| // the current commit at the head of that branch. |
| func (t *TryJobIntegrator) getRevision(ctx context.Context, repo *repograph.Graph, issue string) (string, error) { |
| issueNum, err := strconv.ParseInt(issue, 10, 64) |
| if err != nil { |
| return "", skerr.Wrapf(err, "failed to parse issue number") |
| } |
| changeInfo, err := t.gerrit.GetIssueProperties(ctx, issueNum) |
| if err != nil { |
| return "", skerr.Wrapf(err, "failed to get ChangeInfo") |
| } |
| c := repo.Get(changeInfo.Branch) |
| if c == nil { |
| return "", skerr.Fmt("Unknown branch %s", changeInfo.Branch) |
| } |
| return c.Hash, nil |
| } |
| |
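| // localCancelJobs cancels the given Jobs in the Task Scheduler DB and job |
| // cache, recording the corresponding reasons, without notifying Buildbucket. |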
| func (t *TryJobIntegrator) localCancelJobs(ctx context.Context, jobs []*types.Job, reasons []string) error { |
| if len(jobs) != len(reasons) { |
| return skerr.Fmt("expected jobs and reasons to have the same length") |
| } |
| for idx, j := range jobs { |
| j.BuildbucketLeaseKey = 0 |
| j.Status = types.JOB_STATUS_CANCELED |
| j.StatusDetails = reasons[idx] |
| j.Finished = now.Now(ctx) |
| } |
| if err := t.db.PutJobsInChunks(ctx, jobs); err != nil { |
| return err |
| } |
| t.jCache.AddJobs(jobs) |
| return nil |
| } |
| |
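| // remoteCancelV1Build cancels the given build in Buildbucket via the V1 API, |
| // attaching a (truncated) cancellation reason. |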
| func (t *TryJobIntegrator) remoteCancelV1Build(buildId int64, msg string) error { |
| sklog.Warningf("Canceling Buildbucket build %d. Reason: %s", buildId, msg) |
| message := struct { |
| Message string `json:"message"` |
| }{ |
| Message: util.Truncate(msg, maxCancelReasonLen), |
| } |
| b, err := json.Marshal(&message) |
| if err != nil { |
| return err |
| } |
| resp, err := t.bb.Cancel(buildId, &buildbucket_api.LegacyApiCancelRequestBodyMessage{ |
| ResultDetailsJson: string(b), |
| }).Do() |
| if err != nil { |
| return err |
| } |
| if resp.Error != nil { |
| return fmt.Errorf("%s", resp.Error.Message) |
| } |
| return nil |
| } |
| |
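| // tryLeaseV1Build attempts to lease the given build via the Buildbucket V1 |
| // API. It returns the lease key, any error object returned by Buildbucket, |
| // and any error which occurred when attempting the request. |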
| func (t *TryJobIntegrator) tryLeaseV1Build(ctx context.Context, id int64) (int64, *buildbucket_api.LegacyApiErrorMessage, error) { |
| expiration := now.Now(ctx).Add(LEASE_DURATION_INITIAL).Unix() * secondsToMicros |
| sklog.Infof("Attempting to lease build %d", id) |
| resp, err := t.bb.Lease(id, &buildbucket_api.LegacyApiLeaseRequestBodyMessage{ |
| LeaseExpirationTs: expiration, |
| }).Do() |
| if err != nil { |
| return 0, nil, skerr.Wrapf(err, "failed request to lease buildbucket build %d", id) |
| } |
| leaseKey := int64(0) |
| if resp.Build != nil { |
| leaseKey = resp.Build.LeaseKey |
| } |
| return leaseKey, resp.Error, nil |
| } |
| |
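| // insertNewJobV1 creates a Job for the given Buildbucket V1 build, leases the |
| // build, and inserts the Job into the DB. Builds which are invalid or cannot |
| // be leased are canceled in Buildbucket instead. |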
| func (t *TryJobIntegrator) insertNewJobV1(ctx context.Context, buildId int64) error { |
| // Get the build details from the v2 API. |
| build, err := t.bb2.GetBuild(ctx, buildId) |
| if err != nil { |
| return skerr.Wrapf(err, "failed to retrieve build %d", buildId) |
| } |
| if build.Status != buildbucketpb.Status_SCHEDULED { |
| sklog.Warningf("Found build %d with status: %s; attempting to lease anyway, to trigger the fix in Buildbucket.", build.Id, build.Status) |
| _, bbError, err := t.tryLeaseV1Build(ctx, buildId) |
| if err != nil || bbError != nil { |
| // This is expected. |
| return nil |
| } |
| sklog.Warningf("Unexpectedly able to lease build %d with status %s; canceling it.", buildId, build.Status) |
| if err := t.remoteCancelV1Build(buildId, fmt.Sprintf("Unexpected status %s", build.Status)); err != nil { |
| sklog.Warningf("Failed to cancel errant build %d: %s", buildId, err) |
| } |
| return nil |
| } |
| |
| // Obtain and validate the RepoState. |
| if build.Input.GerritChanges == nil || len(build.Input.GerritChanges) != 1 { |
| return t.remoteCancelV1Build(buildId, fmt.Sprintf("Invalid Build %d: input should have exactly one GerritChanges: %+v", buildId, build.Input)) |
| } |
| gerritChange := build.Input.GerritChanges[0] |
| repoUrl, ok := t.projectRepoMapping[gerritChange.Project] |
| if !ok { |
| return t.remoteCancelV1Build(buildId, fmt.Sprintf("Unknown patch project %q", gerritChange.Project)) |
| } |
| server := gerritChange.Host |
| if !strings.Contains(server, "://") { |
| server = fmt.Sprintf("https://%s", server) |
| } |
| rs := types.RepoState{ |
| Patch: types.Patch{ |
| Server: server, |
| Issue: strconv.FormatInt(gerritChange.Change, 10), |
| PatchRepo: repoUrl, |
| Patchset: strconv.FormatInt(gerritChange.Patchset, 10), |
| }, |
| Repo: repoUrl, |
| // We can't fill this out without retrieving the Gerrit ChangeInfo and |
| // resolving the branch to a commit hash. Defer that work until later. |
| Revision: "", |
| } |
| requested, err := ptypes.Timestamp(build.CreateTime) |
| if err != nil { |
| return t.remoteCancelV1Build(buildId, fmt.Sprintf("Failed to convert timestamp for %d: %s", build.Id, err)) |
| } |
| j := &types.Job{ |
| Name: build.Builder.Builder, |
| BuildbucketBuildId: buildId, |
| Requested: firestore.FixTimestamp(requested.UTC()), |
| Created: firestore.FixTimestamp(now.Now(ctx)), |
| RepoState: rs, |
| Status: types.JOB_STATUS_REQUESTED, |
| } |
| if !j.Requested.Before(j.Created) { |
| sklog.Errorf("Try job created time %s is before requested time %s! Setting equal.", j.Created, j.Requested) |
| j.Requested = j.Created.Add(-firestore.TS_RESOLUTION) |
| } |
| // Attempt to lease the build. |
| leaseKey, bbError, err := t.tryLeaseV1Build(ctx, j.BuildbucketBuildId) |
| if err != nil { |
| return skerr.Wrapf(err, "failed to lease build %d", j.BuildbucketBuildId) |
| } else if bbError != nil { |
| // Note: we're just assuming that the only reason Buildbucket would |
| // return an error is that the Build has been canceled. While this |
| // is the most likely reason, others are possible, and we may gain |
| // some information by reading the error and behaving accordingly. |
| return t.remoteCancelV1Build(buildId, fmt.Sprintf("Buildbucket refused lease with %q", bbError.Message)) |
| } else if leaseKey == 0 { |
| return t.remoteCancelV1Build(buildId, "Buildbucket returned zero lease key") |
| } |
| j.BuildbucketLeaseKey = leaseKey |
| |
| if err := t.db.PutJob(ctx, j); err != nil { |
| return t.remoteCancelV1Build(j.BuildbucketBuildId, fmt.Sprintf("Failed to insert Job into the DB: %s", err)) |
| } |
| t.jCache.AddJobs([]*types.Job{j}) |
| return nil |
| } |
| |
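| // startJobsLoop starts requested Jobs as they arrive via the modified-jobs |
| // channel, with a periodic DB poll as a fallback for Jobs which failed to |
| // start on the first attempt. It runs until the given Context is canceled. |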
| func (t *TryJobIntegrator) startJobsLoop(ctx context.Context) { |
| // The code in startJob makes the assumption that we'll come back to the job |
| // and try again if requests to Buildbucket fail for transient-looking |
| // reasons. ModifiedJobsCh only changes when jobs are modified in the |
| // database, so we also need a periodic poll to ensure that we retry any |
| // jobs we failed to start on the first try. A 5-minute period was chosen |
| // because it is short enough not to cause significant lag in handling try |
| // jobs but hopefully long enough that any transient errors are resolved |
| // before we try again. |
| jobsCh := t.db.ModifiedJobsCh(ctx) |
| ticker := time.NewTicker(5 * time.Minute) |
| tickCh := ticker.C |
| doneCh := ctx.Done() |
| for { |
| select { |
| case jobs := <-jobsCh: |
| for _, job := range jobs { |
| if job.Status != types.JOB_STATUS_REQUESTED { |
| continue |
| } |
| sklog.Infof("Found job %s (build %d) via modified jobs channel", job.Id, job.BuildbucketBuildId) |
| if err := t.startJob(ctx, job); err != nil { |
| sklog.Errorf("failed to start job: %s", err) |
| } |
| } |
| case <-tickCh: |
| jobs, err := t.jCache.RequestedJobs() |
| if err != nil { |
| sklog.Errorf("failed retrieving Jobs: %s", err) |
| } else { |
| for _, job := range jobs { |
| sklog.Infof("Found job %s (build %d) via periodic DB poll", job.Id, job.BuildbucketBuildId) |
| if err := t.startJob(ctx, job); err != nil { |
| sklog.Errorf("failed to start job: %s", err) |
| } |
| } |
| } |
| case <-doneCh: |
| ticker.Stop() |
| return |
| } |
| } |
| } |
| |
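| // startJob attempts to start the given Job: it resolves the Job's revision |
| // and task dependencies, notifies Buildbucket that the Job has started, and |
| // updates the Job in the DB. If setup fails, the Job is marked as a mishap |
| // instead. |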
| func (t *TryJobIntegrator) startJob(ctx context.Context, job *types.Job) error { |
| sklog.Infof("Starting job %s (build %d); lease key: %d", job.Id, job.BuildbucketBuildId, job.BuildbucketLeaseKey) |
| startJobHelper := func() error { |
| repoGraph, err := t.getRepo(job.Repo) |
| if err != nil { |
| return skerr.Wrapf(err, "unable to find repo %s", job.Repo) |
| } |
| if job.Revision == "" { |
| // Derive the revision from the branch specified by the Gerrit CL. |
| revision, err := t.getRevision(ctx, repoGraph, job.Issue) |
| if err != nil { |
| return skerr.Wrapf(err, "failed to find base revision for issue %s in %s", job.Issue, job.Repo) |
| } |
| job.Revision = revision |
| } else { |
| // Resolve the already-set revision (which might be a branch name) |
| // to a commit hash. |
| c := repoGraph.Get(job.Revision) |
| if c == nil { |
| return skerr.Fmt("Unknown revision %s", job.Revision) |
| } |
| job.Revision = c.Hash |
| } |
| if !job.RepoState.Valid() || !job.RepoState.IsTryJob() || skipRepoState(job.RepoState) { |
| return skerr.Fmt("invalid RepoState: %s", job.RepoState) |
| } |
| |
| // Create a Job. |
| if _, err := t.chr.GetOrCacheRepoState(ctx, job.RepoState); err != nil { |
| return skerr.Wrapf(err, "failed to obtain JobSpec") |
| } |
| cfg, cachedErr, err := t.taskCfgCache.Get(ctx, job.RepoState) |
| if err != nil { |
| return err |
| } |
| if cachedErr != nil { |
| return cachedErr |
| } |
| spec, ok := cfg.Jobs[job.Name] |
| if !ok { |
| return skerr.Fmt("no such job: %s", job.Name) |
| } |
| deps, err := spec.GetTaskSpecDAG(cfg) |
| if err != nil { |
| return skerr.Wrap(err) |
| } |
| job.Dependencies = deps |
| job.Tasks = map[string][]*types.TaskSummary{} |
| |
| // Determine if this is a manual retry of a previously-run try job. If |
| // so, set IsForce to ensure that we don't immediately de-duplicate all |
| // of its tasks. |
| prevJobs, err := t.jCache.GetJobsByRepoState(job.Name, job.RepoState) |
| if err != nil { |
| return skerr.Wrap(err) |
| } |
| if len(prevJobs) > 0 { |
| job.IsForce = true |
| } |
| return nil |
| } |
| |
| if err := startJobHelper(); err != nil { |
| sklog.Infof("Failed to start job %s (build %d) with: %s", job.Id, job.BuildbucketBuildId, err) |
| job.Status = types.JOB_STATUS_MISHAP |
| job.StatusDetails = util.Truncate(fmt.Sprintf("Failed to start Job: %s", skerr.Unwrap(err)), 1024) |
| } else { |
| job.Status = types.JOB_STATUS_IN_PROGRESS |
| |
| // Notify Buildbucket that the Job has started. |
| bbToken, bbError, err := t.jobStarted(ctx, job) |
| if err != nil { |
| return skerr.Wrapf(err, "failed to send job-started notification") |
| } else if bbError != nil { |
| // Note: we're just assuming that the only reason Buildbucket would |
| // return an error is that the Build has been canceled. While this |
| // is the most likely reason, others are possible, and we may gain |
| // some information by reading the error and behaving accordingly. |
| cancelReason := fmt.Sprintf("Buildbucket rejected Start with: %s", bbError.Reason) |
| if cancelErr := t.localCancelJobs(ctx, []*types.Job{job}, []string{cancelReason}); cancelErr != nil { |
| return skerr.Wrapf(cancelErr, "failed to start build %d with %q and failed to cancel job", job.BuildbucketBuildId, bbError.Message) |
| } else { |
| return skerr.Fmt("failed to start build %d with %q", job.BuildbucketBuildId, bbError.Message) |
| } |
| } else if bbToken != "" { |
| job.BuildbucketToken = bbToken |
| } |
| } |
| |
| // Update the job and insert into the DB. |
| if err := t.db.PutJob(ctx, job); err != nil { |
| return skerr.Wrapf(err, "failed to insert Job into the DB") |
| } |
| t.jCache.AddJobs([]*types.Job{job}) |
| return nil |
| } |
| |
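| // Poll retrieves all pending builds from the configured Buildbucket bucket |
| // and creates (and leases) Jobs for them. |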
| func (t *TryJobIntegrator) Poll(ctx context.Context) error { |
| if err := t.jCache.Update(ctx); err != nil { |
| return err |
| } |
| |
| // Grab all of the pending Builds from Buildbucket. |
| cursor := "" |
| errs := []error{} |
| var mtx sync.Mutex |
| for { |
| sklog.Infof("Running 'peek' on %s", t.buildbucketBucket) |
| resp, err := t.bb.Peek().Bucket(t.buildbucketBucket).MaxBuilds(PEEK_MAX_BUILDS).StartCursor(cursor).Do() |
| if err != nil { |
| errs = append(errs, err) |
| break |
| } |
| if resp.Error != nil { |
| errs = append(errs, fmt.Errorf("%s", resp.Error.Message)) |
| break |
| } |
| var wg sync.WaitGroup |
| for _, b := range resp.Builds { |
| wg.Add(1) |
| go func(b *buildbucket_api.LegacyApiCommonBuildMessage) { |
| defer wg.Done() |
| if err := t.insertNewJobV1(ctx, b.Id); err != nil { |
| mtx.Lock() |
| errs = append(errs, err) |
| mtx.Unlock() |
| } |
| }(b) |
| } |
| wg.Wait() |
| cursor = resp.NextCursor |
| if cursor == "" { |
| break |
| } |
| } |
| |
| // Report any errors. |
| if len(errs) > 0 { |
| return skerr.Fmt("got errors loading builds from Buildbucket: %v", errs) |
| } |
| |
| return nil |
| } |
| |
| // jobStarted notifies Buildbucket that the given Job has started. It returns |
| // the update token returned by Buildbucket (V2 builds only), any error object |
| // returned by Buildbucket (eg. if the Build has been canceled), and any error |
| // which occurred when attempting the request. |
| func (t *TryJobIntegrator) jobStarted(ctx context.Context, j *types.Job) (string, *buildbucket_api.LegacyApiErrorMessage, error) { |
| if isBBv2(j) { |
| sklog.Infof("bb2.Start for job %s (build %d)", j.Id, j.BuildbucketBuildId) |
| updateToken, err := t.bb2.StartBuild(ctx, j.BuildbucketBuildId, j.Id, j.BuildbucketToken) |
| return updateToken, nil, skerr.Wrap(err) |
| } else { |
| sklog.Infof("bb.Start for job %s (build %d)", j.Id, j.BuildbucketBuildId) |
| resp, err := t.bb.Start(j.BuildbucketBuildId, &buildbucket_api.LegacyApiStartRequestBodyMessage{ |
| LeaseKey: j.BuildbucketLeaseKey, |
| Url: j.URL(t.host), |
| }).Do() |
| if err != nil { |
| return "", nil, err |
| } |
| return "", resp.Error, nil |
| } |
| } |
| |
| // buildSucceededV1 sends a success notification to Buildbucket. |
| func (t *TryJobIntegrator) buildSucceededV1(j *types.Job) error { |
| sklog.Infof("bb.Succeed for job %s (build %d)", j.Id, j.BuildbucketBuildId) |
| b, err := json.Marshal(struct { |
| Job *types.Job `json:"job"` |
| }{ |
| Job: j, |
| }) |
| if err != nil { |
| return err |
| } |
| resp, err := t.bb.Succeed(j.BuildbucketBuildId, &buildbucket_api.LegacyApiSucceedRequestBodyMessage{ |
| LeaseKey: j.BuildbucketLeaseKey, |
| ResultDetailsJson: string(b), |
| Url: j.URL(t.host), |
| }).Do() |
| if err != nil { |
| return err |
| } |
| if resp.Error != nil { |
| if resp.Error.Reason == BUILDBUCKET_API_ERROR_REASON_COMPLETED { |
| sklog.Warningf("Sent success status for build %d after completion.", j.BuildbucketBuildId) |
| } else { |
| return fmt.Errorf("%s", resp.Error.Message) |
| } |
| } |
| return nil |
| } |
| |
| // buildFailed sends a failure notification to Buildbucket. |
| func (t *TryJobIntegrator) buildFailed(j *types.Job) error { |
| b, err := json.Marshal(struct { |
| Job *types.Job `json:"job"` |
| }{ |
| Job: j, |
| }) |
| if err != nil { |
| return err |
| } |
| failureReason := "BUILD_FAILURE" |
| if j.Status == types.JOB_STATUS_MISHAP { |
| failureReason = "INFRA_FAILURE" |
| } |
| sklog.Infof("bb.Fail for job %s (build %d)", j.Id, j.BuildbucketBuildId) |
| resp, err := t.bb.Fail(j.BuildbucketBuildId, &buildbucket_api.LegacyApiFailRequestBodyMessage{ |
| FailureReason: failureReason, |
| LeaseKey: j.BuildbucketLeaseKey, |
| ResultDetailsJson: string(b), |
| Url: j.URL(t.host), |
| }).Do() |
| if err != nil { |
| return err |
| } |
| if resp.Error != nil { |
| if resp.Error.Reason == BUILDBUCKET_API_ERROR_REASON_COMPLETED { |
| sklog.Warningf("Sent failure status for build %d after completion.", j.BuildbucketBuildId) |
| } else { |
| return fmt.Errorf("%s", resp.Error.Message) |
| } |
| } |
| return nil |
| } |
| |
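| // updateBuild reports the current state of the given Job to Buildbucket via |
| // the V2 UpdateBuild API. |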
| func (t *TryJobIntegrator) updateBuild(ctx context.Context, j *types.Job) error { |
| sklog.Infof("bb2.UpdateBuild for job %s (build %d)", j.Id, j.BuildbucketBuildId) |
| return t.bb2.UpdateBuild(ctx, jobToBuildV2(j), j.BuildbucketToken) |
| } |
| |
| // jobFinished notifies Buildbucket that the given Job has finished. |
| func (t *TryJobIntegrator) jobFinished(ctx context.Context, j *types.Job) error { |
| if !j.Done() { |
| return skerr.Fmt("JobFinished called for unfinished Job!") |
| } |
| if isBBv2(j) { |
| return t.updateBuild(ctx, j) |
| } else if j.Status == types.JOB_STATUS_SUCCESS { |
| return t.buildSucceededV1(j) |
| } else { |
| return t.buildFailed(j) |
| } |
| } |
| |
| // skipRepoState determines whether we should skip try jobs for this RepoState, |
| // eg. problematic CLs. |
| func skipRepoState(rs types.RepoState) bool { |
| // Invalid hash; this causes hours of wasted sync times. |
| if rs.Issue == "527502" && rs.Patchset == "1" { |
| return true |
| } |
| return false |
| } |
| |
| // jobToBuildV2 converts a Job to a Buildbucket V2 Build to be used with |
| // UpdateBuild. |
| func jobToBuildV2(job *types.Job) *buildbucketpb.Build { |
| status := buildbucket_taskbackend.JobStatusToBuildbucketStatus(job.Status) |
| |
| // Note: There are other fields we could fill in, but I'm not sure they |
| // would provide any value since we don't actually use Buildbucket builds |
| // for anything. |
| return &buildbucketpb.Build{ |
| Id: job.BuildbucketBuildId, |
| Output: &buildbucketpb.Build_Output{ |
| Status: status, |
| SummaryMarkdown: job.StatusDetails, |
| }, |
| } |
| } |