[task scheduler] Tryjob-related fixes

- Don't set the lease expiration time before syncing the code.
- Lease, insert into the DB, and mark a job as started all in parallel
  rather than doing so in batches.

Bug: skia:
Change-Id: Ifdcd27f88e92220669121819e1812b789972d626
Reviewed-on: https://skia-review.googlesource.com/c/buildbot/+/199343
Reviewed-by: Ben Wagner <benjaminwagner@google.com>
Commit-Queue: Eric Boren <borenet@google.com>
diff --git a/task_scheduler/go/scheduling/task_scheduler_test.go b/task_scheduler/go/scheduling/task_scheduler_test.go
index c8d5f78..215c753 100644
--- a/task_scheduler/go/scheduling/task_scheduler_test.go
+++ b/task_scheduler/go/scheduling/task_scheduler_test.go
@@ -2412,9 +2412,9 @@
 	}
 	b.ParametersJson = testutils.MarshalJSON(t, tryjobs.Params(t, tcc_testutils.TestTaskName, "skia", rs.Revision, rs.Server, rs.Issue, rs.Patchset))
 	tryjobs.MockPeek(mock, []*buildbucket_api.ApiCommonBuildMessage{b}, now, "", "", nil)
-	tryjobs.MockTryLeaseBuild(mock, b.Id, now, nil)
-	tryjobs.MockJobStarted(mock, b.Id, now, nil)
-	assert.NoError(t, s.tryjobs.Poll(ctx, now))
+	tryjobs.MockTryLeaseBuild(mock, b.Id, nil)
+	tryjobs.MockJobStarted(mock, b.Id, nil)
+	assert.NoError(t, s.tryjobs.Poll(ctx))
 	assert.True(t, mock.Empty())
 
 	// Ensure that we added a Job.
diff --git a/task_scheduler/go/tryjobs/testutils.go b/task_scheduler/go/tryjobs/testutils.go
index 463512f..534fcca 100644
--- a/task_scheduler/go/tryjobs/testutils.go
+++ b/task_scheduler/go/tryjobs/testutils.go
@@ -273,8 +273,8 @@
 	mock.MockOnce(fmt.Sprintf("%sbuilds/%d/cancel?alt=json&prettyPrint=false", API_URL_TESTING, id), mockhttpclient.MockPostDialogue("application/json", req, resp))
 }
 
-func MockTryLeaseBuild(mock *mockhttpclient.URLMock, id int64, now time.Time, err error) {
-	req := []byte(fmt.Sprintf("{\"lease_expiration_ts\":\"%d\"}\n", now.Add(LEASE_DURATION_INITIAL).Unix()*1000000))
+func MockTryLeaseBuild(mock *mockhttpclient.URLMock, id int64, err error) {
+	req := mockhttpclient.DONT_CARE_REQUEST
 	respStr := fmt.Sprintf("{\"build\": {\"lease_key\": \"%d\"}}", 987654321)
 	if err != nil {
 		respStr = fmt.Sprintf("{\"error\": {\"message\": \"%s\"}}", err)
@@ -283,7 +283,7 @@
 	mock.MockOnce(fmt.Sprintf("%sbuilds/%d/lease?alt=json&prettyPrint=false", API_URL_TESTING, id), mockhttpclient.MockPostDialogue("application/json", req, resp))
 }
 
-func MockJobStarted(mock *mockhttpclient.URLMock, id int64, now time.Time, err error) {
+func MockJobStarted(mock *mockhttpclient.URLMock, id int64, err error) {
 	// We have to use this because we don't know what the Job ID is going to
 	// be until after it's inserted into the DB.
 	req := mockhttpclient.DONT_CARE_REQUEST
diff --git a/task_scheduler/go/tryjobs/tryjobs.go b/task_scheduler/go/tryjobs/tryjobs.go
index c735b28..6038cce 100644
--- a/task_scheduler/go/tryjobs/tryjobs.go
+++ b/task_scheduler/go/tryjobs/tryjobs.go
@@ -19,7 +19,6 @@
 	"go.skia.org/infra/task_scheduler/go/cacher"
 	"go.skia.org/infra/task_scheduler/go/db"
 	"go.skia.org/infra/task_scheduler/go/db/cache"
-	"go.skia.org/infra/task_scheduler/go/db/firestore"
 	"go.skia.org/infra/task_scheduler/go/task_cfg_cache"
 	"go.skia.org/infra/task_scheduler/go/types"
 )
@@ -112,7 +111,7 @@
 		}
 	})
 	go util.RepeatCtx(POLL_INTERVAL, ctx, func() {
-		if err := t.Poll(ctx, time.Now()); err != nil {
+		if err := t.Poll(ctx); err != nil {
 			sklog.Errorf("Failed to poll for new try jobs: %s", err)
 		}
 	})
@@ -323,8 +322,8 @@
 	return nil
 }
 
-func (t *TryJobIntegrator) tryLeaseBuild(id int64, now time.Time) (int64, error) {
-	expiration := now.Add(LEASE_DURATION_INITIAL).Unix() * 1000000
+func (t *TryJobIntegrator) tryLeaseBuild(id int64) (int64, error) {
+	expiration := time.Now().Add(LEASE_DURATION_INITIAL).Unix() * 1000000
 	sklog.Infof("Attempting to lease build %d", id)
 	resp, err := t.bb.Lease(id, &buildbucket_api.ApiLeaseRequestBodyMessage{
 		LeaseExpirationTs: expiration,
@@ -338,21 +337,21 @@
 	return resp.Build.LeaseKey, nil
 }
 
-func (t *TryJobIntegrator) getJobToSchedule(ctx context.Context, b *buildbucket_api.ApiCommonBuildMessage, now time.Time) (*types.Job, error) {
+func (t *TryJobIntegrator) insertNewJob(ctx context.Context, b *buildbucket_api.ApiCommonBuildMessage) error {
 	// Parse the build parameters.
 	var params buildbucket.Parameters
 	if err := json.NewDecoder(strings.NewReader(b.ParametersJson)).Decode(&params); err != nil {
-		return nil, t.remoteCancelBuild(b.Id, fmt.Sprintf("Invalid parameters_json: %s;\n\n%s", err, b.ParametersJson))
+		return t.remoteCancelBuild(b.Id, fmt.Sprintf("Invalid parameters_json: %s;\n\n%s", err, b.ParametersJson))
 	}
 
 	// Obtain and validate the RepoState.
 	topRepoUrl, topRepoGraph, patchRepoUrl, err := t.getRepo(&params.Properties)
 	if err != nil {
-		return nil, t.remoteCancelBuild(b.Id, fmt.Sprintf("Unable to find repo: %s", err))
+		return t.remoteCancelBuild(b.Id, fmt.Sprintf("Unable to find repo: %s", err))
 	}
 	revision, err := t.getRevision(topRepoGraph, params.Properties.Revision, int64(params.Properties.GerritIssue))
 	if err != nil {
-		return nil, t.remoteCancelBuild(b.Id, fmt.Sprintf("Invalid revision: %s", err))
+		return t.remoteCancelBuild(b.Id, fmt.Sprintf("Invalid revision: %s", err))
 	}
 	server := params.Properties.Gerrit
 	issue := params.Properties.GerritIssue
@@ -361,7 +360,7 @@
 		psSplit := strings.Split(patchset, "/")
 		patchset = psSplit[len(psSplit)-1]
 	} else {
-		return nil, t.remoteCancelBuild(b.Id, fmt.Sprintf("Invalid patch storage: %s", params.Properties.PatchStorage))
+		return t.remoteCancelBuild(b.Id, fmt.Sprintf("Invalid patch storage: %s", params.Properties.PatchStorage))
 	}
 	rs := types.RepoState{
 		Patch: types.Patch{
@@ -374,16 +373,16 @@
 		Revision: revision,
 	}
 	if !rs.Valid() || !rs.IsTryJob() {
-		return nil, t.remoteCancelBuild(b.Id, fmt.Sprintf("Invalid RepoState: %s", rs))
+		return t.remoteCancelBuild(b.Id, fmt.Sprintf("Invalid RepoState: %s", rs))
 	}
 
 	// Create a Job.
 	if _, err := t.chr.GetOrCacheRepoState(ctx, rs); err != nil {
-		return nil, t.remoteCancelBuild(b.Id, fmt.Sprintf("Failed to obtain JobSpec: %s; \n\n%v", err, params))
+		return t.remoteCancelBuild(b.Id, fmt.Sprintf("Failed to obtain JobSpec: %s; \n\n%v", err, params))
 	}
 	j, err := t.taskCfgCache.MakeJob(ctx, rs, params.BuilderName)
 	if err != nil {
-		return nil, t.remoteCancelBuild(b.Id, fmt.Sprintf("Failed to create Job from JobSpec: %s; \n\n%v", err, params))
+		return t.remoteCancelBuild(b.Id, fmt.Sprintf("Failed to create Job from JobSpec: %s; \n\n%v", err, params))
 	}
 
 	// Determine if this is a manual retry of a previously-run try job. If
@@ -391,28 +390,44 @@
 	// of its tasks.
 	prevJobs, err := t.jCache.GetJobsByRepoState(j.Name, j.RepoState)
 	if err != nil {
-		return nil, err
+		return err
 	}
 	if len(prevJobs) > 0 {
 		j.IsForce = true
 	}
 
 	// Attempt to lease the build.
-	leaseKey, err := t.tryLeaseBuild(b.Id, now)
+	leaseKey, err := t.tryLeaseBuild(b.Id)
 	if err != nil {
 		// TODO(borenet): Buildbot cancels the build in this case.
 		// Should we do that too?
-		return nil, err
+		return err
 	}
 
-	// Update and return the Job.
+	// Update the job and insert into the DB.
 	j.BuildbucketBuildId = b.Id
 	j.BuildbucketLeaseKey = leaseKey
+	if err := t.db.PutJob(j); err != nil {
+		return t.remoteCancelBuild(b.Id, fmt.Sprintf("Failed to insert Job into the DB: %s", err))
+	}
 
-	return j, nil
+	// Since Jobs may consist of multiple Tasks, we consider them to be
+	// "started" as soon as we've picked them up.
+	// TODO(borenet): Sending "started" notifications after inserting the
+	// new Jobs into the database puts us at risk of never sending the
+	// notification if the process is interrupted. However, we need to
+	// include the Job ID with the notification, so we have to insert the
+	// Job into the DB first.
+	if err := t.jobStarted(j); err != nil {
+		if cancelErr := t.localCancelJobs([]*types.Job{j}); cancelErr != nil {
+			return fmt.Errorf("Failed to send job-started notification with: %s\nAnd failed to cancel the job with: %s", err, cancelErr)
+		}
+		return fmt.Errorf("Failed to send job-started notification with: %s", err)
+	}
+	return nil
 }
 
-func (t *TryJobIntegrator) Poll(ctx context.Context, now time.Time) error {
+func (t *TryJobIntegrator) Poll(ctx context.Context) error {
 	if err := t.jCache.Update(); err != nil {
 		return err
 	}
@@ -421,7 +436,6 @@
 	// TODO(borenet): Buildbot maintains a maximum lease count. Should we do
 	// that too?
 	cursor := ""
-	jobs := []*types.Job{}
 	errs := []error{}
 	var mtx sync.Mutex
 	for {
@@ -440,13 +454,10 @@
 			wg.Add(1)
 			go func(b *buildbucket_api.ApiCommonBuildMessage) {
 				defer wg.Done()
-				j, err := t.getJobToSchedule(ctx, b, now)
-				mtx.Lock()
-				defer mtx.Unlock()
-				if err != nil {
+				if err := t.insertNewJob(ctx, b); err != nil {
+					mtx.Lock()
 					errs = append(errs, err)
-				} else if j != nil {
-					jobs = append(jobs, j)
+					mtx.Unlock()
 				}
 			}(b)
 		}
@@ -457,40 +468,6 @@
 		}
 	}
 
-	// Insert Jobs into the database.
-	insertedJobs := make([]*types.Job, 0, len(jobs))
-	if len(jobs) > 0 {
-		if err := util.ChunkIter(len(jobs), firestore.MAX_TRANSACTION_DOCS, func(i, j int) error {
-			if err := t.db.PutJobs(jobs[i:j]); err != nil {
-				return err
-			}
-			insertedJobs = append(insertedJobs, jobs[i:j]...)
-			return nil
-		}); err != nil {
-			errs = append(errs, err)
-		}
-	}
-
-	// Since Jobs may consist of multiple Tasks, we consider them to be
-	// "started" as soon as we've picked them up.
-	// TODO(borenet): Sending "started" notifications after inserting the
-	// new Jobs into the database puts us at risk of never sending the
-	// notification if the process is interrupted. However, we need to
-	// include the Job ID with the notification, so we have to insert the
-	// Job into the DB first.
-	cancelJobs := []*types.Job{}
-	for _, j := range insertedJobs {
-		if err := t.jobStarted(j); err != nil {
-			errs = append(errs, err)
-			cancelJobs = append(cancelJobs, j)
-		}
-	}
-	if len(cancelJobs) > 0 {
-		if err := t.localCancelJobs(cancelJobs); err != nil {
-			errs = append(errs, err)
-		}
-	}
-
 	// Report any errors.
 	if len(errs) > 0 {
 		return fmt.Errorf("Got errors loading builds from Buildbucket: %v", errs)
diff --git a/task_scheduler/go/tryjobs/tryjobs_test.go b/task_scheduler/go/tryjobs/tryjobs_test.go
index 449317e..49ca48a 100644
--- a/task_scheduler/go/tryjobs/tryjobs_test.go
+++ b/task_scheduler/go/tryjobs/tryjobs_test.go
@@ -13,6 +13,7 @@
 	"go.skia.org/infra/go/gerrit"
 	"go.skia.org/infra/go/mockhttpclient"
 	"go.skia.org/infra/go/testutils"
+	"go.skia.org/infra/task_scheduler/go/db"
 	"go.skia.org/infra/task_scheduler/go/types"
 )
 
@@ -205,16 +206,15 @@
 	defer cleanup()
 
 	id := int64(12345)
-	now := time.Now()
-	MockTryLeaseBuild(mock, id, now, nil)
-	k, err := trybots.tryLeaseBuild(id, now)
+	MockTryLeaseBuild(mock, id, nil)
+	k, err := trybots.tryLeaseBuild(id)
 	assert.NoError(t, err)
 	assert.NotEqual(t, k, 0)
 	assert.True(t, mock.Empty())
 
 	expect := fmt.Errorf("Can't lease this!")
-	MockTryLeaseBuild(mock, id, now, expect)
-	_, err = trybots.tryLeaseBuild(id, now)
+	MockTryLeaseBuild(mock, id, expect)
+	_, err = trybots.tryLeaseBuild(id)
 	assert.Contains(t, err.Error(), expect.Error())
 	assert.True(t, mock.Empty())
 }
@@ -224,16 +224,15 @@
 	defer cleanup()
 
 	j := tryjob(gb.RepoUrl())
-	now := time.Now()
 
 	// Success
-	MockJobStarted(mock, j.BuildbucketBuildId, now, nil)
+	MockJobStarted(mock, j.BuildbucketBuildId, nil)
 	assert.NoError(t, trybots.jobStarted(j))
 	assert.True(t, mock.Empty())
 
 	// Failure
 	err := fmt.Errorf("fail")
-	MockJobStarted(mock, j.BuildbucketBuildId, now, err)
+	MockJobStarted(mock, j.BuildbucketBuildId, err)
 	assert.EqualError(t, trybots.jobStarted(j), err.Error())
 	assert.True(t, mock.Empty())
 }
@@ -287,27 +286,46 @@
 	assert.True(t, mock.Empty())
 }
 
-func TestGetJobToSchedule(t *testing.T) {
+type addedJobs map[string]*types.Job
+
+func (aj addedJobs) getAddedJob(t *testing.T, d db.JobReader) *types.Job {
+	allJobs, err := d.GetJobsFromDateRange(time.Time{}, time.Now())
+	assert.NoError(t, err)
+	for _, job := range allJobs {
+		if _, ok := aj[job.Id]; !ok {
+			aj[job.Id] = job
+			return job
+		}
+	}
+	return nil
+}
+
+func TestInsertNewJob(t *testing.T) {
 	ctx, trybots, _, mock, cleanup := setup(t)
 	defer cleanup()
 
 	now := time.Now()
 
+	aj := addedJobs(map[string]*types.Job{})
+
 	// Normal job, Gerrit patch.
 	b1 := Build(t, now)
-	MockTryLeaseBuild(mock, b1.Id, now, nil)
-	result, err := trybots.getJobToSchedule(ctx, b1, now)
+	MockTryLeaseBuild(mock, b1.Id, nil)
+	MockJobStarted(mock, b1.Id, nil)
+	err := trybots.insertNewJob(ctx, b1)
 	assert.NoError(t, err)
 	assert.True(t, mock.Empty())
+	result := aj.getAddedJob(t, trybots.db)
 	assert.Equal(t, result.BuildbucketBuildId, b1.Id)
 	assert.Equal(t, result.BuildbucketLeaseKey, b1.LeaseKey)
 	assert.True(t, result.Valid())
 
 	// Failed to lease build.
 	expectErr := fmt.Errorf("Can't lease this!")
-	MockTryLeaseBuild(mock, b1.Id, now, expectErr)
-	result, err = trybots.getJobToSchedule(ctx, b1, now)
+	MockTryLeaseBuild(mock, b1.Id, expectErr)
+	err = trybots.insertNewJob(ctx, b1)
 	assert.Contains(t, err.Error(), expectErr.Error())
+	result = aj.getAddedJob(t, trybots.db)
 	assert.Nil(t, result)
 	assert.True(t, mock.Empty())
 
@@ -315,8 +333,9 @@
 	b2 := Build(t, now)
 	b2.ParametersJson = "dklsadfklas"
 	MockCancelBuild(mock, b2.Id, "Invalid parameters_json: invalid character 'd' looking for beginning of value;\\\\n\\\\ndklsadfklas", nil)
-	result, err = trybots.getJobToSchedule(ctx, b2, now)
+	err = trybots.insertNewJob(ctx, b2)
 	assert.NoError(t, err) // We don't report errors for bad data from buildbucket.
+	result = aj.getAddedJob(t, trybots.db)
 	assert.Nil(t, result)
 	assert.True(t, mock.Empty())
 
@@ -324,8 +343,9 @@
 	b3 := Build(t, now)
 	b3.ParametersJson = testutils.MarshalJSON(t, Params(t, "fake-job", "bogus-repo", "master", gerritPatch.Server, gerritPatch.Issue, gerritPatch.Patchset))
 	MockCancelBuild(mock, b3.Id, "Unable to find repo: Unknown patch project \\\\\\\"bogus-repo\\\\\\\"", nil)
-	result, err = trybots.getJobToSchedule(ctx, b3, now)
+	err = trybots.insertNewJob(ctx, b3)
 	assert.NoError(t, err) // We don't report errors for bad data from buildbucket.
+	result = aj.getAddedJob(t, trybots.db)
 	assert.Nil(t, result)
 	assert.True(t, mock.Empty())
 
@@ -333,8 +353,9 @@
 	b4 := Build(t, now)
 	b4.ParametersJson = testutils.MarshalJSON(t, Params(t, "fake-job", patchProject, "abz", gerritPatch.Server, gerritPatch.Issue, gerritPatch.Patchset))
 	MockCancelBuild(mock, b4.Id, "Invalid revision: Unknown revision abz", nil)
-	result, err = trybots.getJobToSchedule(ctx, b4, now)
+	err = trybots.insertNewJob(ctx, b4)
 	assert.NoError(t, err) // We don't report errors for bad data from buildbucket.
+	result = aj.getAddedJob(t, trybots.db)
 	assert.Nil(t, result)
 	assert.True(t, mock.Empty())
 
@@ -344,8 +365,9 @@
 	p.Properties.PatchStorage = "???"
 	b6.ParametersJson = testutils.MarshalJSON(t, p)
 	MockCancelBuild(mock, b6.Id, "Invalid patch storage: ???", nil)
-	result, err = trybots.getJobToSchedule(ctx, b6, now)
+	err = trybots.insertNewJob(ctx, b6)
 	assert.NoError(t, err) // We don't report errors for bad data from buildbucket.
+	result = aj.getAddedJob(t, trybots.db)
 	assert.Nil(t, result)
 	assert.True(t, mock.Empty())
 
@@ -353,8 +375,9 @@
 	b7 := Build(t, now)
 	b7.ParametersJson = testutils.MarshalJSON(t, Params(t, "fake-job", patchProject, "bad-revision", gerritPatch.Server, gerritPatch.Issue, gerritPatch.Patchset))
 	MockCancelBuild(mock, b7.Id, "Invalid revision: Unknown revision bad-revision", nil)
-	result, err = trybots.getJobToSchedule(ctx, b7, now)
+	err = trybots.insertNewJob(ctx, b7)
 	assert.NoError(t, err) // We don't report errors for bad data from buildbucket.
+	result = aj.getAddedJob(t, trybots.db)
 	assert.Nil(t, result)
 	assert.True(t, mock.Empty())
 
@@ -362,8 +385,9 @@
 	b8 := Build(t, now)
 	b8.ParametersJson = testutils.MarshalJSON(t, Params(t, "bogus-job", patchProject, "master", gerritPatch.Server, gerritPatch.Issue, gerritPatch.Patchset))
 	MockCancelBuild(mock, b8.Id, "Failed to create Job from JobSpec: No such job: bogus-job; \\\\n\\\\n{bogus-job [] {0  https://skia-review.googlesource.com/ 2112 3  skia gerrit  master } \\\\u003cnil\\\\u003e}", nil)
-	result, err = trybots.getJobToSchedule(ctx, b8, now)
+	err = trybots.insertNewJob(ctx, b8)
 	assert.NoError(t, err) // We don't report errors for bad data from buildbucket.
+	result = aj.getAddedJob(t, trybots.db)
 	assert.Nil(t, result)
 	assert.True(t, mock.Empty())
 
@@ -372,8 +396,9 @@
 	b9.ParametersJson = testutils.MarshalJSON(t, Params(t, "bogus-job", patchProject, "master", gerritPatch.Server, gerritPatch.Issue, gerritPatch.Patchset))
 	expect := fmt.Errorf("no cancel!")
 	MockCancelBuild(mock, b9.Id, "Failed to create Job from JobSpec: No such job: bogus-job; \\\\n\\\\n{bogus-job [] {0  https://skia-review.googlesource.com/ 2112 3  skia gerrit  master } \\\\u003cnil\\\\u003e}", expect)
-	result, err = trybots.getJobToSchedule(ctx, b9, now)
+	err = trybots.insertNewJob(ctx, b9)
 	assert.EqualError(t, err, expect.Error())
+	result = aj.getAddedJob(t, trybots.db)
 	assert.Nil(t, result)
 	assert.True(t, mock.Empty())
 }
@@ -385,10 +410,13 @@
 	now := time.Now()
 
 	// Insert one try job.
+	aj := addedJobs(map[string]*types.Job{})
 	b1 := Build(t, now)
-	MockTryLeaseBuild(mock, b1.Id, now, nil)
-	j1, err := trybots.getJobToSchedule(ctx, b1, now)
+	MockTryLeaseBuild(mock, b1.Id, nil)
+	MockJobStarted(mock, b1.Id, nil)
+	err := trybots.insertNewJob(ctx, b1)
 	assert.NoError(t, err)
+	j1 := aj.getAddedJob(t, trybots.db)
 	assert.True(t, mock.Empty())
 	assert.Equal(t, j1.BuildbucketBuildId, b1.Id)
 	assert.Equal(t, j1.BuildbucketLeaseKey, b1.LeaseKey)
@@ -399,10 +427,12 @@
 
 	// Obtain a second try job, ensure that it gets IsForce = true.
 	b2 := Build(t, now)
-	MockTryLeaseBuild(mock, b2.Id, now, nil)
-	j2, err := trybots.getJobToSchedule(ctx, b2, now)
+	MockTryLeaseBuild(mock, b2.Id, nil)
+	MockJobStarted(mock, b2.Id, nil)
+	err = trybots.insertNewJob(ctx, b2)
 	assert.NoError(t, err)
 	assert.True(t, mock.Empty())
+	j2 := aj.getAddedJob(t, trybots.db)
 	assert.Equal(t, j2.BuildbucketBuildId, b2.Id)
 	assert.Equal(t, j2.BuildbucketLeaseKey, b2.LeaseKey)
 	assert.True(t, j2.Valid())
@@ -444,14 +474,14 @@
 	mockBuilds := func(builds []*buildbucket_api.ApiCommonBuildMessage) []*buildbucket_api.ApiCommonBuildMessage {
 		MockPeek(mock, builds, now, "", "", nil)
 		for _, b := range builds {
-			MockTryLeaseBuild(mock, b.Id, now, nil)
-			MockJobStarted(mock, b.Id, now, nil)
+			MockTryLeaseBuild(mock, b.Id, nil)
+			MockJobStarted(mock, b.Id, nil)
 		}
 		return builds
 	}
 
 	check := func(builds []*buildbucket_api.ApiCommonBuildMessage) {
-		assert.Nil(t, trybots.Poll(ctx, now))
+		assert.Nil(t, trybots.Poll(ctx))
 		assert.True(t, mock.Empty())
 		assertAdded(builds)
 	}
@@ -467,12 +497,12 @@
 	MockPeek(mock, builds[:PEEK_MAX_BUILDS], now, "", "cursor1", nil)
 	MockPeek(mock, builds[PEEK_MAX_BUILDS:], now, "cursor1", "", nil)
 	for _, b := range builds {
-		MockTryLeaseBuild(mock, b.Id, now, nil)
-		MockJobStarted(mock, b.Id, now, nil)
+		MockTryLeaseBuild(mock, b.Id, nil)
+		MockJobStarted(mock, b.Id, nil)
 	}
 	check(builds)
 
-	// Multiple new builds, fail getJobToSchedule, ensure successful builds
+	// Multiple new builds, fail insertNewJob, ensure successful builds
 	// are inserted.
 	builds = makeBuilds(5)
 	failIdx := 2
@@ -481,8 +511,8 @@
 	MockPeek(mock, builds, now, "", "", nil)
 	builds = append(builds[:failIdx], builds[failIdx+1:]...)
 	for _, b := range builds {
-		MockTryLeaseBuild(mock, b.Id, now, nil)
-		MockJobStarted(mock, b.Id, now, nil)
+		MockTryLeaseBuild(mock, b.Id, nil)
+		MockJobStarted(mock, b.Id, nil)
 	}
 	MockCancelBuild(mock, failBuild.Id, "Invalid parameters_json: invalid character '?' looking for beginning of value;\\\\n\\\\n???", nil)
 	check(builds)
@@ -494,12 +524,12 @@
 	MockPeek(mock, builds, now, "", "", nil)
 	builds = append(builds[:failIdx], builds[failIdx+1:]...)
 	for _, b := range builds {
-		MockTryLeaseBuild(mock, b.Id, now, nil)
-		MockJobStarted(mock, b.Id, now, nil)
+		MockTryLeaseBuild(mock, b.Id, nil)
+		MockJobStarted(mock, b.Id, nil)
 	}
-	MockTryLeaseBuild(mock, failBuild.Id, now, nil)
-	MockJobStarted(mock, failBuild.Id, now, fmt.Errorf("Failed to start build."))
-	assert.EqualError(t, trybots.Poll(ctx, now), "Got errors loading builds from Buildbucket: [Failed to start build.]")
+	MockTryLeaseBuild(mock, failBuild.Id, nil)
+	MockJobStarted(mock, failBuild.Id, fmt.Errorf("Failed to start build."))
+	assert.EqualError(t, trybots.Poll(ctx), "Got errors loading builds from Buildbucket: [Failed to send job-started notification with: Failed to start build.]")
 	assert.True(t, mock.Empty())
 	assertAdded(builds)
 
@@ -511,10 +541,10 @@
 	MockPeek(mock, builds[PEEK_MAX_BUILDS:], now, "cursor1", "", err)
 	builds = builds[:PEEK_MAX_BUILDS]
 	for _, b := range builds {
-		MockTryLeaseBuild(mock, b.Id, now, nil)
-		MockJobStarted(mock, b.Id, now, nil)
+		MockTryLeaseBuild(mock, b.Id, nil)
+		MockJobStarted(mock, b.Id, nil)
 	}
-	assert.EqualError(t, trybots.Poll(ctx, now), "Got errors loading builds from Buildbucket: [Failed peek]")
+	assert.EqualError(t, trybots.Poll(ctx), "Got errors loading builds from Buildbucket: [Failed peek]")
 	assert.True(t, mock.Empty())
 	assertAdded(builds)
 }