[task scheduler] More fixes for buildbucket
1. Stop trying to update already-finished builds.
2. Fix search range in cleanup.
Bug: b/288158829
Change-Id: I9295478bd4498980b57794eeb6dcc1898549682a
Reviewed-on: https://skia-review.googlesource.com/c/buildbot/+/803316
Commit-Queue: Eric Boren <borenet@google.com>
Auto-Submit: Eric Boren <borenet@google.com>
Reviewed-by: Kevin Lubick <kjlubick@google.com>
diff --git a/task_scheduler/go/tryjobs/tryjobs.go b/task_scheduler/go/tryjobs/tryjobs.go
index b882c14..522daa6 100644
--- a/task_scheduler/go/tryjobs/tryjobs.go
+++ b/task_scheduler/go/tryjobs/tryjobs.go
@@ -104,6 +104,10 @@
// Buildbucket when we call StartBuild more than once for the same build.
buildAlreadyStartedErr = "has recorded another StartBuild with request id"
+ // buildAlreadyFinishedErr is a substring of the error message returned by
+ // Buildbucket when we call UpdateBuild after the build has finished.
+ buildAlreadyFinishedErr = "cannot update an ended build"
+
// Project name used by buildbucket for all Skia builds.
buildbucketProject = "skia"
)
@@ -701,6 +705,10 @@
return err != nil && strings.Contains(err.Error(), buildAlreadyStartedErr)
}
+func isBuildAlreadyFinishedError(err error) bool {
+ return err != nil && strings.Contains(err.Error(), buildAlreadyFinishedErr)
+}
+
func (t *TryJobIntegrator) startJob(ctx context.Context, job *types.Job) error {
// We might encounter this Job via periodic polling or the query snapshot
// iterator, or both. We don't want to start the Job multiple times, so
@@ -977,14 +985,26 @@
if reason == "" {
reason = "Underlying job was canceled."
}
- return t.cancelBuild(ctx, j, reason)
+ return skerr.Wrap(t.cancelBuild(ctx, j, reason))
} else {
- return t.updateBuild(ctx, j)
+ if err := t.updateBuild(ctx, j); err != nil {
+ if isBuildAlreadyFinishedError(err) {
+ // Either we've already updated the build successfully, or
+ // someone else has updated it (likely canceled). Log a
+ // warning in case this persists and we need to investigate,
+ // but move on without returning an error.
+ sklog.Warningf("Tried to update already-finished job %s (build %d)", j.Id, j.BuildbucketBuildId)
+ return nil
+ }
+ return skerr.Wrap(err)
+ } else {
+ return nil
+ }
}
} else if j.Status == types.JOB_STATUS_SUCCESS {
- return t.buildSucceededV1(j)
+ return skerr.Wrap(t.buildSucceededV1(j))
} else {
- return t.buildFailed(j)
+ return skerr.Wrap(t.buildFailed(j))
}
}
@@ -998,7 +1018,7 @@
},
Status: buildbucketpb.Status_STARTED,
CreateTime: &buildbucketpb.TimeRange{
- EndTime: timestamppb.New(time.Now().Add(CLEANUP_AGE_THRESHOLD)),
+ EndTime: timestamppb.New(time.Now().Add(-CLEANUP_AGE_THRESHOLD)),
},
})
if err != nil {
@@ -1025,9 +1045,16 @@
} else {
sklog.Infof("Cleanup: attempting to update job %s for build %d", job.Id, build.Id)
if err := t.updateBuild(ctx, job); err != nil {
- sklog.Errorf("Cleanup: failed to update job %s for build %d; canceling. Error: %s", job.Id, build.Id, err)
- if err := t.cancelBuild(ctx, job, "Failed to UpdateBuild"); err != nil {
- return skerr.Wrapf(err, "failed to cancel build %d (job %s)", build.Id, job.Id)
+ if isBuildAlreadyFinishedError(err) {
+ // Ignore the error; the build shouldn't show up in the
+ // next round of cleanup, but log the error anyway just
+ // so that we're aware in case it does.
+ sklog.Warningf("Cleanup: tried to update already-finished job %s (build %d)", job.Id, build.Id)
+ } else {
+ sklog.Errorf("Cleanup: failed to update job %s for build %d; canceling. Error: %s", job.Id, build.Id, err)
+ if err := t.cancelBuild(ctx, job, "Failed to UpdateBuild"); err != nil {
+ return skerr.Wrapf(err, "failed to cancel build %d (job %s)", build.Id, job.Id)
+ }
}
}
}
diff --git a/task_scheduler/go/tryjobs/tryjobs_test.go b/task_scheduler/go/tryjobs/tryjobs_test.go
index 35c4f1f..aaf6ffe 100644
--- a/task_scheduler/go/tryjobs/tryjobs_test.go
+++ b/task_scheduler/go/tryjobs/tryjobs_test.go
@@ -371,7 +371,7 @@
const id = int64(12345)
expectErr := "Build does not exist!"
MockCancelBuildFailed(mock, id, "Canceling!", expectErr)
- require.EqualError(t, trybots.remoteCancelV1Build(id, "Canceling!"), expectErr)
+ require.ErrorContains(t, trybots.remoteCancelV1Build(id, "Canceling!"), expectErr)
require.True(t, mock.Empty(), mock.List())
}
@@ -512,7 +512,7 @@
trybots.jCache.AddJobs([]*types.Job{j})
expectErr := "fail"
MockJobSuccess_Failed(mock, j, now, false, expectErr)
- require.EqualError(t, trybots.jobFinished(ctx, j), expectErr)
+ require.ErrorContains(t, trybots.jobFinished(ctx, j), expectErr)
require.True(t, mock.Empty(), mock.List())
}
@@ -589,7 +589,7 @@
trybots.jCache.AddJobs([]*types.Job{j})
expectErr := "fail"
MockJobFailure_Failed(mock, j, now, expectErr)
- require.EqualError(t, trybots.jobFinished(ctx, j), expectErr)
+ require.ErrorContains(t, trybots.jobFinished(ctx, j), expectErr)
require.True(t, mock.Empty(), mock.List())
}
@@ -666,7 +666,7 @@
trybots.jCache.AddJobs([]*types.Job{j})
expectErr := "fail"
MockJobMishap_Failed(mock, j, now, expectErr)
- require.EqualError(t, trybots.jobFinished(ctx, j), expectErr)
+ require.ErrorContains(t, trybots.jobFinished(ctx, j), expectErr)
require.True(t, mock.Empty(), mock.List())
}
@@ -694,6 +694,30 @@
mockBB.AssertExpectations(t)
}
+func TestJobFinishedV2_BuildAlreadyDone_NoError(t *testing.T) {
+ ctx, trybots, _, mockBB, _ := setup(t)
+
+ j := tryjobV2(ctx, repoUrl)
+ now := time.Date(2021, time.April, 27, 0, 0, 0, 0, time.UTC)
+ j.Status = types.JOB_STATUS_MISHAP
+ j.Finished = now
+ require.NoError(t, trybots.db.PutJobs(ctx, []*types.Job{j}))
+ trybots.jCache.AddJobs([]*types.Job{j})
+ mockBB.On("UpdateBuild", testutils.AnyContext, &buildbucketpb.Build{
+ Id: j.BuildbucketBuildId,
+ Output: &buildbucketpb.Build_Output{
+ Status: buildbucketpb.Status_INFRA_FAILURE,
+ },
+ Infra: &buildbucketpb.BuildInfra{
+ Backend: &buildbucketpb.BuildInfra_Backend{
+ Task: buildbucket_taskbackend.JobToBuildbucketTask(ctx, j, trybots.buildbucketTarget, trybots.host),
+ },
+ },
+ }, j.BuildbucketToken).Return(errors.New(buildAlreadyFinishedErr))
+ require.NoError(t, trybots.jobFinished(ctx, j))
+ mockBB.AssertExpectations(t)
+}
+
type addedJobs map[string]*types.Job
func (aj addedJobs) getAddedJob(ctx context.Context, t *testing.T, d db.JobReader) *types.Job {