[task scheduler] Add code paths for Buildbucket V2

Bug: b/288158829
Change-Id: I409c7283e635a3492f500944837d1a6eeb70b173
Reviewed-on: https://skia-review.googlesource.com/c/buildbot/+/786177
Reviewed-by: Kevin Lubick <kjlubick@google.com>
diff --git a/go/buildbucket/BUILD.bazel b/go/buildbucket/BUILD.bazel
index 2ede382..a43b307 100644
--- a/go/buildbucket/BUILD.bazel
+++ b/go/buildbucket/BUILD.bazel
@@ -9,8 +9,11 @@
     deps = [
         "//go/buildbucket/common",
         "//go/skerr",
+        "@com_github_google_uuid//:uuid",
         "@org_chromium_go_luci//buildbucket/proto",
         "@org_chromium_go_luci//grpc/prpc",
+        "@org_golang_google_grpc//metadata",
+        "@org_golang_google_protobuf//types/known/fieldmaskpb",
         "@org_golang_google_protobuf//types/known/structpb",
     ],
 )
diff --git a/go/buildbucket/buildbucket.go b/go/buildbucket/buildbucket.go
index 8d00ed0..edea2ea 100644
--- a/go/buildbucket/buildbucket.go
+++ b/go/buildbucket/buildbucket.go
@@ -5,7 +5,10 @@
 	"context"
 	"net/http"
 
+	"github.com/google/uuid"
 	buildbucketpb "go.chromium.org/luci/buildbucket/proto"
+	"google.golang.org/grpc/metadata"
+	"google.golang.org/protobuf/types/known/fieldmaskpb"
 	structpb "google.golang.org/protobuf/types/known/structpb"
 
 	"go.chromium.org/luci/grpc/prpc"
@@ -16,6 +19,7 @@
 const (
 	BUILD_URL_TMPL = "https://%s/build/%d"
 	DEFAULT_HOST   = "cr-buildbucket.appspot.com"
+	headerToken    = "x-buildbucket-token"
 )
 
 var (
@@ -26,10 +30,12 @@
 )
 
 type BuildBucketInterface interface {
+	// CancelBuilds cancels the specified buildIDs with the specified
+	// summaryMarkdown. Builds are cancelled with one batch request
+	// to buildbucket.
+	CancelBuilds(ctx context.Context, buildIDs []int64, summaryMarkdown string) ([]*buildbucketpb.Build, error)
 	// GetBuild retrieves the build with the given ID.
 	GetBuild(ctx context.Context, buildId int64) (*buildbucketpb.Build, error)
-	// Search retrieves Builds which match the given criteria.
-	Search(ctx context.Context, pred *buildbucketpb.BuildPredicate) ([]*buildbucketpb.Build, error)
 	// GetTrybotsForCL retrieves trybot results for the given CL using the
 	// optional tags.
 	GetTrybotsForCL(ctx context.Context, issue, patchset int64, gerritUrl string, tags map[string]string) ([]*buildbucketpb.Build, error)
@@ -42,10 +48,13 @@
 	// means that the Infra-PerCommit-Race build should be scheduled with the
 	// "triggered_by: skcq" tag.
 	ScheduleBuilds(ctx context.Context, builds []string, buildsToTags map[string]map[string]string, issue, patchset int64, gerritUrl, repo, bbProject, bbBucket string) ([]*buildbucketpb.Build, error)
-	// CancelBuilds cancels the specified buildIDs with the specified
-	// summaryMarkdown. Builds are cancelled with one batch request
-	// to buildbucket.
-	CancelBuilds(ctx context.Context, buildIDs []int64, summaryMarkdown string) ([]*buildbucketpb.Build, error)
+	// Search retrieves Builds which match the given criteria.
+	Search(ctx context.Context, pred *buildbucketpb.BuildPredicate) ([]*buildbucketpb.Build, error)
+	// UpdateBuild sends an update for the given build.
+	UpdateBuild(ctx context.Context, build *buildbucketpb.Build, token string) error
+	// StartBuild notifies Buildbucket that the build has started. Returns the
+	// token which should be passed to UpdateBuild for subsequent calls.
+	StartBuild(ctx context.Context, buildId int64, taskId, token string) (string, error)
 }
 
 // Client is used for interacting with the BuildBucket API.
@@ -207,5 +216,76 @@
 	return c.Search(ctx, pred)
 }
 
+func contextWithTokenMetadata(ctx context.Context, token string) context.Context {
+	return metadata.NewOutgoingContext(ctx, map[string][]string{
+		headerToken: {token},
+	})
+}
+
+// StartBuild implements BuildbucketInterface.
+func (c *Client) StartBuild(ctx context.Context, buildId int64, taskId, token string) (string, error) {
+	resp, err := c.bc.StartBuild(contextWithTokenMetadata(ctx, token), &buildbucketpb.StartBuildRequest{
+		RequestId: uuid.New().String(),
+		BuildId:   buildId,
+		TaskId:    taskId,
+	})
+	if err != nil {
+		return "", err
+	}
+	return resp.UpdateBuildToken, nil
+}
+
+// UpdateBuild implements BuildbucketInterface.
+func (c *Client) UpdateBuild(ctx context.Context, build *buildbucketpb.Build, token string) error {
+	var updatePaths []string
+	if build.Output != nil {
+		if build.Output.Properties != nil {
+			updatePaths = append(updatePaths, "build.output.properties")
+		}
+		if build.Output.GitilesCommit != nil {
+			updatePaths = append(updatePaths, "build.output.gitiles_commit")
+		}
+		if build.Output.Status != buildbucketpb.Status_STATUS_UNSPECIFIED {
+			updatePaths = append(updatePaths, "build.output.status")
+		}
+		if build.Output.StatusDetails != nil {
+			updatePaths = append(updatePaths, "build.output.status_details")
+		}
+		if build.Output.SummaryMarkdown != "" {
+			updatePaths = append(updatePaths, "build.output.summary_markdown")
+		}
+	}
+	if build.Status != buildbucketpb.Status_STATUS_UNSPECIFIED {
+		updatePaths = append(updatePaths, "build.status")
+	}
+	if build.StatusDetails != nil {
+		updatePaths = append(updatePaths, "build.status_details")
+	}
+	if len(build.Steps) > 0 {
+		updatePaths = append(updatePaths, "build.steps")
+	}
+	if build.SummaryMarkdown != "" {
+		updatePaths = append(updatePaths, "build.summary_markdown")
+	}
+	if len(build.Tags) > 0 {
+		updatePaths = append(updatePaths, "build.tags")
+	}
+	if build.Infra != nil && build.Infra.Buildbucket != nil && build.Infra.Buildbucket.Agent != nil {
+		if build.Infra.Buildbucket.Agent.Output != nil {
+			updatePaths = append(updatePaths, "build.infra.buildbucket.agent.output")
+		}
+		if build.Infra.Buildbucket.Agent.Purposes != nil {
+			updatePaths = append(updatePaths, "build.infra.buildbucket.agent.purposes")
+		}
+	}
+	_, err := c.bc.UpdateBuild(contextWithTokenMetadata(ctx, token), &buildbucketpb.UpdateBuildRequest{
+		Build: build,
+		UpdateMask: &fieldmaskpb.FieldMask{
+			Paths: updatePaths,
+		},
+	})
+	return skerr.Wrap(err)
+}
+
 // Make sure Client fulfills the BuildBucketInterface interface.
 var _ BuildBucketInterface = (*Client)(nil)
diff --git a/go/buildbucket/mocks/BuildBucketInterface.go b/go/buildbucket/mocks/BuildBucketInterface.go
index a965b5f..5531818 100644
--- a/go/buildbucket/mocks/BuildBucketInterface.go
+++ b/go/buildbucket/mocks/BuildBucketInterface.go
@@ -165,6 +165,52 @@
 	return r0, r1
 }
 
+// StartBuild provides a mock function with given fields: ctx, buildId, taskId, token
+func (_m *BuildBucketInterface) StartBuild(ctx context.Context, buildId int64, taskId string, token string) (string, error) {
+	ret := _m.Called(ctx, buildId, taskId, token)
+
+	if len(ret) == 0 {
+		panic("no return value specified for StartBuild")
+	}
+
+	var r0 string
+	var r1 error
+	if rf, ok := ret.Get(0).(func(context.Context, int64, string, string) (string, error)); ok {
+		return rf(ctx, buildId, taskId, token)
+	}
+	if rf, ok := ret.Get(0).(func(context.Context, int64, string, string) string); ok {
+		r0 = rf(ctx, buildId, taskId, token)
+	} else {
+		r0 = ret.Get(0).(string)
+	}
+
+	if rf, ok := ret.Get(1).(func(context.Context, int64, string, string) error); ok {
+		r1 = rf(ctx, buildId, taskId, token)
+	} else {
+		r1 = ret.Error(1)
+	}
+
+	return r0, r1
+}
+
+// UpdateBuild provides a mock function with given fields: ctx, build, token
+func (_m *BuildBucketInterface) UpdateBuild(ctx context.Context, build *buildbucketpb.Build, token string) error {
+	ret := _m.Called(ctx, build, token)
+
+	if len(ret) == 0 {
+		panic("no return value specified for UpdateBuild")
+	}
+
+	var r0 error
+	if rf, ok := ret.Get(0).(func(context.Context, *buildbucketpb.Build, string) error); ok {
+		r0 = rf(ctx, build, token)
+	} else {
+		r0 = ret.Error(0)
+	}
+
+	return r0
+}
+
 // NewBuildBucketInterface creates a new instance of BuildBucketInterface. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
 // The first argument is typically a *testing.T value.
 func NewBuildBucketInterface(t interface {
diff --git a/task_scheduler/go/job_creation/BUILD.bazel b/task_scheduler/go/job_creation/BUILD.bazel
index 5a5d2a2..97b32f3 100644
--- a/task_scheduler/go/job_creation/BUILD.bazel
+++ b/task_scheduler/go/job_creation/BUILD.bazel
@@ -15,6 +15,7 @@
         "//go/git/repograph",
         "//go/metrics2",
         "//go/now",
+        "//go/pubsub",
         "//go/skerr",
         "//go/sklog",
         "//go/util",
@@ -27,7 +28,6 @@
         "//task_scheduler/go/tryjobs",
         "//task_scheduler/go/types",
         "//task_scheduler/go/window",
-        "@org_golang_x_oauth2//:oauth2",
         "@org_golang_x_sync//errgroup",
     ],
 )
diff --git a/task_scheduler/go/job_creation/job_creation.go b/task_scheduler/go/job_creation/job_creation.go
index fddb2e9..5498d5d 100644
--- a/task_scheduler/go/job_creation/job_creation.go
+++ b/task_scheduler/go/job_creation/job_creation.go
@@ -2,7 +2,6 @@
 
 import (
 	"context"
-	"fmt"
 	"net/http"
 	"strings"
 	"time"
@@ -15,6 +14,7 @@
 	"go.skia.org/infra/go/git/repograph"
 	"go.skia.org/infra/go/metrics2"
 	"go.skia.org/infra/go/now"
+	"go.skia.org/infra/go/pubsub"
 	"go.skia.org/infra/go/skerr"
 	"go.skia.org/infra/go/sklog"
 	"go.skia.org/infra/go/util"
@@ -27,7 +27,6 @@
 	"go.skia.org/infra/task_scheduler/go/tryjobs"
 	"go.skia.org/infra/task_scheduler/go/types"
 	"go.skia.org/infra/task_scheduler/go/window"
-	"golang.org/x/oauth2"
 	"golang.org/x/sync/errgroup"
 )
 
@@ -64,31 +63,31 @@
 }
 
 // NewJobCreator returns a JobCreator instance.
-func NewJobCreator(ctx context.Context, d db.DB, period time.Duration, numCommits int, workdir, host string, repos repograph.Map, rbe cas.CAS, c *http.Client, buildbucketApiUrl, trybotBucket string, projectRepoMapping map[string]string, depotTools string, gerrit gerrit.GerritInterface, taskCfgCache task_cfg_cache.TaskCfgCache, ts oauth2.TokenSource) (*JobCreator, error) {
+func NewJobCreator(ctx context.Context, d db.DB, period time.Duration, numCommits int, workdir, host string, repos repograph.Map, rbe cas.CAS, c *http.Client, buildbucketApiUrl, buildbucketTarget, buildbucketBucket string, projectRepoMapping map[string]string, depotTools string, gerrit gerrit.GerritInterface, taskCfgCache task_cfg_cache.TaskCfgCache, pubsubClient pubsub.Client) (*JobCreator, error) {
 	// Repos must be updated before window is initialized; otherwise the repos may be uninitialized,
 	// resulting in the window being too short, causing the caches to be loaded with incomplete data.
 	for _, r := range repos {
 		if err := r.Update(ctx); err != nil {
-			return nil, fmt.Errorf("Failed initial repo sync: %s", err)
+			return nil, skerr.Wrapf(err, "failed initial repo sync")
 		}
 	}
 	w, err := window.New(ctx, period, numCommits, repos)
 	if err != nil {
-		return nil, fmt.Errorf("Failed to create window: %s", err)
+		return nil, skerr.Wrapf(err, "failed to create window")
 	}
 
 	// Create caches.
 	jCache, err := cache.NewJobCache(ctx, d, w, nil)
 	if err != nil {
-		return nil, fmt.Errorf("Failed to create JobCache: %s", err)
+		return nil, skerr.Wrapf(err, "fFailed to create JobCache")
 	}
 
 	sc := syncer.New(ctx, repos, depotTools, workdir, syncer.DefaultNumWorkers)
 	chr := cacher.New(sc, taskCfgCache, rbe)
 
-	tryjobs, err := tryjobs.NewTryJobIntegrator(buildbucketApiUrl, trybotBucket, host, c, d, jCache, projectRepoMapping, repos, taskCfgCache, chr, gerrit)
+	tryjobs, err := tryjobs.NewTryJobIntegrator(ctx, buildbucketApiUrl, buildbucketTarget, buildbucketBucket, host, c, d, jCache, projectRepoMapping, repos, taskCfgCache, chr, gerrit, pubsubClient)
 	if err != nil {
-		return nil, fmt.Errorf("Failed to create TryJobIntegrator: %s", err)
+		return nil, skerr.Wrapf(err, "failed to create TryJobIntegrator")
 	}
 	jc := &JobCreator{
 		cacher:        chr,
@@ -334,7 +333,7 @@
 	// because of poorly-timed process restarts. Go through the unfinished
 	// jobs and cache them if necessary.
 	if err := jc.jCache.Update(ctx); err != nil {
-		return fmt.Errorf("Failed to update job cache: %s", err)
+		return skerr.Wrapf(err, "failed to update job cache")
 	}
 	unfinishedJobs, err := jc.jCache.InProgressJobs()
 	if err != nil {
@@ -372,7 +371,7 @@
 					// very long.
 					sklog.Errorf("Have cached error for RepoState %s", rs.RowKey())
 				} else {
-					return fmt.Errorf("Failed to cache RepoState: %s", err)
+					return skerr.Wrapf(err, "failed to cache RepoState")
 				}
 			}
 			return nil
@@ -413,7 +412,7 @@
 			main = repo.Get(git.MainBranch)
 		}
 		if main == nil {
-			return fmt.Errorf("Failed to retrieve branch %q or %q for %s", git.MasterBranch, git.MainBranch, repoUrl)
+			return skerr.Fmt("failed to retrieve branch %q or %q for %s", git.MasterBranch, git.MainBranch, repoUrl)
 		}
 		rs := types.RepoState{
 			Repo:     repoUrl,
@@ -424,13 +423,13 @@
 			err = cachedErr
 		}
 		if err != nil {
-			return fmt.Errorf("Failed to retrieve TaskCfg from %s: %s", repoUrl, err)
+			return skerr.Wrapf(err, "failed to retrieve TaskCfg from %s", repoUrl)
 		}
 		for name, js := range cfg.Jobs {
 			if js.Trigger == triggerName {
 				job, err := task_cfg_cache.MakeJob(ctx, jc.taskCfgCache, rs, name)
 				if err != nil {
-					return fmt.Errorf("Failed to create job: %s", err)
+					return skerr.Wrapf(err, "failed to create job")
 				}
 				job.Requested = job.Created
 				jobs = append(jobs, job)
@@ -474,7 +473,7 @@
 
 	// Insert the new jobs into the DB.
 	if err := jc.putJobsInChunks(ctx, jobsToInsert); err != nil {
-		return fmt.Errorf("Failed to add periodic jobs: %s", err)
+		return skerr.Wrapf(err, "failed to add periodic jobs")
 	}
 	sklog.Infof("Created %d periodic jobs for trigger %q", len(jobs), triggerName)
 	return nil
diff --git a/task_scheduler/go/job_creation/job_creation_test.go b/task_scheduler/go/job_creation/job_creation_test.go
index d38a565..4bc40ac 100644
--- a/task_scheduler/go/job_creation/job_creation_test.go
+++ b/task_scheduler/go/job_creation/job_creation_test.go
@@ -66,7 +66,7 @@
 	cas.On("Merge", testutils.AnyContext, []string{tcc_testutils.TestCASDigest}).Return(tcc_testutils.TestCASDigest, nil)
 	cas.On("Merge", testutils.AnyContext, []string{tcc_testutils.PerfCASDigest}).Return(tcc_testutils.PerfCASDigest, nil)
 
-	jc, err := NewJobCreator(ctx, d, time.Duration(math.MaxInt64), 0, tmp, "fake.server", repos, cas, urlMock.Client(), tryjobs.API_URL_TESTING, tryjobs.BUCKET_TESTING, projectRepoMapping, depotTools, g, taskCfgCache, nil)
+	jc, err := NewJobCreator(ctx, d, time.Duration(math.MaxInt64), 0, tmp, "fake.server", repos, cas, urlMock.Client(), tryjobs.API_URL_TESTING, "fake-bb-target", tryjobs.BUCKET_TESTING, projectRepoMapping, depotTools, g, taskCfgCache, nil)
 	require.NoError(t, err)
 	return ctx, gb, d, jc, urlMock, cas, func() {
 		testutils.AssertCloses(t, jc)
diff --git a/task_scheduler/go/scheduling/perftest/BUILD.bazel b/task_scheduler/go/scheduling/perftest/BUILD.bazel
index 1a66c78..754bb55 100644
--- a/task_scheduler/go/scheduling/perftest/BUILD.bazel
+++ b/task_scheduler/go/scheduling/perftest/BUILD.bazel
@@ -16,6 +16,7 @@
         "//go/git/repograph",
         "//go/httputils",
         "//go/now",
+        "//go/pubsub/mocks",
         "//go/sklog",
         "//go/swarming",
         "//go/util",
diff --git a/task_scheduler/go/scheduling/perftest/perftest.go b/task_scheduler/go/scheduling/perftest/perftest.go
index 7bfac9e..5e1298d 100644
--- a/task_scheduler/go/scheduling/perftest/perftest.go
+++ b/task_scheduler/go/scheduling/perftest/perftest.go
@@ -34,6 +34,7 @@
 	"go.skia.org/infra/go/git/repograph"
 	"go.skia.org/infra/go/httputils"
 	"go.skia.org/infra/go/now"
+	pubsub_mocks "go.skia.org/infra/go/pubsub/mocks"
 	"go.skia.org/infra/go/sklog"
 	"go.skia.org/infra/go/swarming"
 	"go.skia.org/infra/go/util"
@@ -303,6 +304,7 @@
 	assertEqual(head, commits[0])
 
 	ts, err := google.DefaultTokenSource(ctx, datastore.ScopeDatastore)
+	assertNoError(err)
 	fsInstance := uuid.New().String()
 	d, err := firestore.NewDBWithParams(ctx, firestore.FIRESTORE_PROJECT, fsInstance, ts)
 	assertNoError(err)
@@ -346,7 +348,8 @@
 	}
 	depotTools, err := depot_tools.GetDepotTools(ctx, workdir, *recipesCfgFile)
 	assertNoError(err)
-	jc, err := job_creation.NewJobCreator(ctx, d, windowPeriod, 0, workdir, "localhost", repos, cas, client, "", "", nil, depotTools, nil, taskCfgCache, ts)
+	pubsubClient := &pubsub_mocks.Client{}
+	jc, err := job_creation.NewJobCreator(ctx, d, windowPeriod, 0, workdir, "localhost", repos, cas, client, "fake-bb-url", "fake-bb-target", "fake-bb-bucket", nil, depotTools, nil, taskCfgCache, pubsubClient)
 	assertNoError(err)
 
 	// Wait for job-creator to process the jobs from the repo.
diff --git a/task_scheduler/go/task-scheduler-jc/BUILD.bazel b/task_scheduler/go/task-scheduler-jc/BUILD.bazel
index ab043ee..530c91a 100644
--- a/task_scheduler/go/task-scheduler-jc/BUILD.bazel
+++ b/task_scheduler/go/task-scheduler-jc/BUILD.bazel
@@ -18,6 +18,7 @@
         "//go/httputils",
         "//go/human",
         "//go/periodic",
+        "//go/pubsub",
         "//go/sklog",
         "//go/tracing",
         "//go/util",
@@ -28,8 +29,8 @@
         "//task_scheduler/go/types",
         "@com_google_cloud_go_bigtable//:bigtable",
         "@com_google_cloud_go_datastore//:datastore",
-        "@com_google_cloud_go_pubsub//:pubsub",
         "@org_golang_google_api//compute/v1:compute",
+        "@org_golang_google_api//option",
         "@org_golang_x_oauth2//:oauth2",
         "@org_golang_x_oauth2//google",
     ],
diff --git a/task_scheduler/go/task-scheduler-jc/main.go b/task_scheduler/go/task-scheduler-jc/main.go
index 71147a9..8ea6330 100644
--- a/task_scheduler/go/task-scheduler-jc/main.go
+++ b/task_scheduler/go/task-scheduler-jc/main.go
@@ -11,10 +11,10 @@
 
 	"cloud.google.com/go/bigtable"
 	"cloud.google.com/go/datastore"
-	"cloud.google.com/go/pubsub"
 	"golang.org/x/oauth2"
 	"golang.org/x/oauth2/google"
 	"google.golang.org/api/compute/v1"
+	"google.golang.org/api/option"
 
 	"go.skia.org/infra/go/auth"
 	"go.skia.org/infra/go/cas/rbe"
@@ -28,6 +28,7 @@
 	"go.skia.org/infra/go/httputils"
 	"go.skia.org/infra/go/human"
 	"go.skia.org/infra/go/periodic"
+	"go.skia.org/infra/go/pubsub"
 	"go.skia.org/infra/go/sklog"
 	"go.skia.org/infra/go/tracing"
 	"go.skia.org/infra/go/util"
@@ -48,22 +49,24 @@
 
 var (
 	// Flags.
-	btInstance        = flag.String("bigtable_instance", "", "BigTable instance to use.")
-	btProject         = flag.String("bigtable_project", "", "GCE project to use for BigTable.")
-	host              = flag.String("host", "localhost", "HTTP service host")
-	port              = flag.String("port", ":8000", "HTTP service port for the web server (e.g., ':8000')")
-	disableTryjobs    = flag.Bool("disable_try_jobs", false, "If set, no try jobs will be picked up.")
-	firestoreInstance = flag.String("firestore_instance", "", "Firestore instance to use, eg. \"production\"")
-	gitstoreTable     = flag.String("gitstore_bt_table", "git-repos2", "BigTable table used for GitStore.")
-	local             = flag.Bool("local", false, "Whether we're running on a dev machine vs in production.")
-	rbeInstance       = flag.String("rbe_instance", "projects/chromium-swarm/instances/default_instance", "CAS instance to use")
-	repoUrls          = common.NewMultiStringFlag("repo", nil, "Repositories for which to schedule tasks.")
-	recipesCfgFile    = flag.String("recipes_cfg", "", "Path to the recipes.cfg file.")
-	timePeriod        = flag.String("timeWindow", "4d", "Time period to use.")
-	tryJobBucket      = flag.String("tryjob_bucket", tryjobs.BUCKET_PRIMARY, "Which Buildbucket bucket to use for try jobs.")
-	commitWindow      = flag.Int("commitWindow", 10, "Minimum number of recent commits to keep in the timeWindow.")
-	workdir           = flag.String("workdir", "workdir", "Working directory to use.")
-	promPort          = flag.String("prom_port", ":20000", "Metrics service address (e.g., ':10110')")
+	btInstance               = flag.String("bigtable_instance", "", "BigTable instance to use.")
+	btProject                = flag.String("bigtable_project", "", "GCE project to use for BigTable.")
+	buildbucketBucket        = flag.String("tryjob_bucket", tryjobs.BUCKET_PRIMARY, "Which Buildbucket bucket to use for try jobs.")
+	buildbucketTarget        = flag.String("buildbucket_target", "", "Buildbucket backend target name used to address this scheduler.")
+	buildbucketPubSubProject = flag.String("buildbucket_pubsub_project", "", "Pub/sub project used for sending messages to Buildbucket.")
+	host                     = flag.String("host", "localhost", "HTTP service host")
+	port                     = flag.String("port", ":8000", "HTTP service port for the web server (e.g., ':8000')")
+	disableTryjobs           = flag.Bool("disable_try_jobs", false, "If set, no try jobs will be picked up.")
+	firestoreInstance        = flag.String("firestore_instance", "", "Firestore instance to use, eg. \"production\"")
+	gitstoreTable            = flag.String("gitstore_bt_table", "git-repos2", "BigTable table used for GitStore.")
+	local                    = flag.Bool("local", false, "Whether we're running on a dev machine vs in production.")
+	rbeInstance              = flag.String("rbe_instance", "projects/chromium-swarm/instances/default_instance", "CAS instance to use")
+	repoUrls                 = common.NewMultiStringFlag("repo", nil, "Repositories for which to schedule tasks.")
+	recipesCfgFile           = flag.String("recipes_cfg", "", "Path to the recipes.cfg file.")
+	timePeriod               = flag.String("timeWindow", "4d", "Time period to use.")
+	commitWindow             = flag.Int("commitWindow", 10, "Minimum number of recent commits to keep in the timeWindow.")
+	workdir                  = flag.String("workdir", "workdir", "Working directory to use.")
+	promPort                 = flag.String("prom_port", ":20000", "Metrics service address (e.g., ':10110')")
 )
 
 func main() {
@@ -174,9 +177,15 @@
 		sklog.Fatalf("Failed to create TaskCfgCache: %s", err)
 	}
 
+	// Create pubsub client.
+	var pubsubClient pubsub.Client
+	if *buildbucketPubSubProject != "" {
+		pubsubClient, err = pubsub.NewClient(ctx, *buildbucketPubSubProject, option.WithTokenSource(tokenSource))
+	}
+
 	// Create and start the JobCreator.
 	sklog.Infof("Creating JobCreator.")
-	jc, err := job_creation.NewJobCreator(ctx, tsDb, period, *commitWindow, wdAbs, serverURL, repos, cas, httpClient, tryjobs.API_URL_PROD, *tryJobBucket, common.PROJECT_REPO_MAPPING, depotTools, gerrit, taskCfgCache, tokenSource)
+	jc, err := job_creation.NewJobCreator(ctx, tsDb, period, *commitWindow, wdAbs, serverURL, repos, cas, httpClient, tryjobs.API_URL_PROD, *buildbucketTarget, *buildbucketBucket, common.PROJECT_REPO_MAPPING, depotTools, gerrit, taskCfgCache, pubsubClient)
 	if err != nil {
 		sklog.Fatal(err)
 	}
diff --git a/task_scheduler/go/tryjobs/BUILD.bazel b/task_scheduler/go/tryjobs/BUILD.bazel
index de63425..3694eb8 100644
--- a/task_scheduler/go/tryjobs/BUILD.bazel
+++ b/task_scheduler/go/tryjobs/BUILD.bazel
@@ -14,6 +14,7 @@
         "//go/git/repograph",
         "//go/metrics2",
         "//go/now",
+        "//go/pubsub",
         "//go/skerr",
         "//go/sklog",
         "//go/util",
@@ -23,8 +24,11 @@
         "//task_scheduler/go/task_cfg_cache",
         "//task_scheduler/go/types",
         "@com_github_golang_protobuf//ptypes:go_default_library_gen",
+        "@com_github_hashicorp_go_multierror//:go-multierror",
+        "@com_google_cloud_go_pubsub//:pubsub",
         "@org_chromium_go_luci//buildbucket/proto",
         "@org_chromium_go_luci//common/api/buildbucket/buildbucket/v1:buildbucket",
+        "@org_golang_google_protobuf//encoding/prototext",
     ],
 )
 
@@ -44,6 +48,7 @@
         "//go/git/repograph",
         "//go/mockhttpclient",
         "//go/now",
+        "//go/pubsub/mocks",
         "//go/sklog",
         "//go/sktest",
         "//go/testutils",
@@ -57,8 +62,11 @@
         "//task_scheduler/go/types",
         "//task_scheduler/go/window",
         "@com_github_golang_protobuf//ptypes:go_default_library_gen",
+        "@com_github_stretchr_testify//mock",
         "@com_github_stretchr_testify//require",
+        "@com_google_cloud_go_pubsub//:pubsub",
         "@org_chromium_go_luci//buildbucket/proto",
         "@org_chromium_go_luci//common/api/buildbucket/buildbucket/v1:buildbucket",
+        "@org_golang_google_protobuf//encoding/prototext",
     ],
 )
diff --git a/task_scheduler/go/tryjobs/tryjobs.go b/task_scheduler/go/tryjobs/tryjobs.go
index 644ffe1..1a6d497 100644
--- a/task_scheduler/go/tryjobs/tryjobs.go
+++ b/task_scheduler/go/tryjobs/tryjobs.go
@@ -11,7 +11,9 @@
 	"sync"
 	"time"
 
+	pubsub_api "cloud.google.com/go/pubsub"
 	"github.com/golang/protobuf/ptypes"
+	"github.com/hashicorp/go-multierror"
 	buildbucketpb "go.chromium.org/luci/buildbucket/proto"
 	buildbucket_api "go.chromium.org/luci/common/api/buildbucket/buildbucket/v1"
 	"go.skia.org/infra/go/buildbucket"
@@ -21,6 +23,7 @@
 	"go.skia.org/infra/go/git/repograph"
 	"go.skia.org/infra/go/metrics2"
 	"go.skia.org/infra/go/now"
+	"go.skia.org/infra/go/pubsub"
 	"go.skia.org/infra/go/skerr"
 	"go.skia.org/infra/go/sklog"
 	"go.skia.org/infra/go/util"
@@ -29,6 +32,7 @@
 	"go.skia.org/infra/task_scheduler/go/db/cache"
 	"go.skia.org/infra/task_scheduler/go/task_cfg_cache"
 	"go.skia.org/infra/task_scheduler/go/types"
+	"google.golang.org/protobuf/encoding/prototext"
 )
 
 /*
@@ -85,34 +89,38 @@
 type TryJobIntegrator struct {
 	bb                 *buildbucket_api.Service
 	bb2                buildbucket.BuildBucketInterface
-	bucket             string
+	buildbucketBucket  string
+	buildbucketTarget  string
 	chr                cacher.Cacher
 	db                 db.JobDB
 	gerrit             gerrit.GerritInterface
 	host               string
 	jCache             cache.JobCache
 	projectRepoMapping map[string]string
+	pubsub             pubsub.Client
 	rm                 repograph.Map
 	taskCfgCache       task_cfg_cache.TaskCfgCache
 }
 
 // NewTryJobIntegrator returns a TryJobIntegrator instance.
-func NewTryJobIntegrator(apiUrl, bucket, host string, c *http.Client, d db.JobDB, jCache cache.JobCache, projectRepoMapping map[string]string, rm repograph.Map, taskCfgCache task_cfg_cache.TaskCfgCache, chr cacher.Cacher, gerrit gerrit.GerritInterface) (*TryJobIntegrator, error) {
+func NewTryJobIntegrator(ctx context.Context, buildbucketAPIURL, buildbucketTarget, buildbucketBucket, host string, c *http.Client, d db.JobDB, jCache cache.JobCache, projectRepoMapping map[string]string, rm repograph.Map, taskCfgCache task_cfg_cache.TaskCfgCache, chr cacher.Cacher, gerrit gerrit.GerritInterface, pubsubClient pubsub.Client) (*TryJobIntegrator, error) {
 	bb, err := buildbucket_api.New(c)
 	if err != nil {
 		return nil, err
 	}
-	bb.BasePath = apiUrl
+	bb.BasePath = buildbucketAPIURL
 	rv := &TryJobIntegrator{
 		bb:                 bb,
 		bb2:                buildbucket.NewClient(c),
-		bucket:             bucket,
+		buildbucketBucket:  buildbucketBucket,
+		buildbucketTarget:  buildbucketTarget,
 		db:                 d,
 		chr:                chr,
 		gerrit:             gerrit,
 		host:               host,
 		jCache:             jCache,
 		projectRepoMapping: projectRepoMapping,
+		pubsub:             pubsubClient,
 		rm:                 rm,
 		taskCfgCache:       taskCfgCache,
 	}
@@ -161,7 +169,7 @@
 	jobs := t.jCache.GetAllCachedJobs()
 	rv := []*types.Job{}
 	for _, job := range jobs {
-		if job.BuildbucketLeaseKey != 0 && job.Status != types.JOB_STATUS_REQUESTED {
+		if (job.BuildbucketLeaseKey != 0 || job.BuildbucketToken != "") && job.Status != types.JOB_STATUS_REQUESTED {
 			rv = append(rv, job)
 		}
 	}
@@ -178,12 +186,15 @@
 
 	// Divide up finished and unfinished Jobs.
 	finished := make([]*types.Job, 0, len(jobs))
-	unfinished := make([]*types.Job, 0, len(jobs))
+	unfinishedV1 := make([]*types.Job, 0, len(jobs))
+	unfinishedV2 := make([]*types.Job, 0, len(jobs))
 	for _, j := range jobs {
 		if j.Done() {
 			finished = append(finished, j)
+		} else if isBBv2(j) {
+			unfinishedV2 = append(unfinishedV2, j)
 		} else {
-			unfinished = append(unfinished, j)
+			unfinishedV1 = append(unfinishedV1, j)
 		}
 	}
 
@@ -193,7 +204,14 @@
 	wg.Add(1)
 	go func() {
 		defer wg.Done()
-		heartbeatErr = t.sendHeartbeats(ctx, unfinished)
+		heartbeatErr = t.sendHeartbeats(ctx, unfinishedV1)
+	}()
+
+	var pubsubErr error
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		pubsubErr = t.sendPubsubUpdates(ctx, unfinishedV2)
 	}()
 
 	// Send updates for finished Jobs, empty the lease keys to mark them
@@ -201,10 +219,11 @@
 	errs := []error{}
 	insert := make([]*types.Job, 0, len(finished))
 	for _, j := range finished {
-		if err := t.jobFinished(j); err != nil {
+		if err := t.jobFinished(ctx, j); err != nil {
 			errs = append(errs, err)
 		} else {
 			j.BuildbucketLeaseKey = 0
+			j.BuildbucketToken = ""
 			insert = append(insert, j)
 		}
 	}
@@ -217,6 +236,9 @@
 	if heartbeatErr != nil {
 		errs = append(errs, heartbeatErr)
 	}
+	if pubsubErr != nil {
+		errs = append(errs, pubsubErr)
+	}
 
 	if len(errs) > 0 {
 		return skerr.Fmt("Failed to update jobs; got errors: %v", errs)
@@ -237,6 +259,11 @@
 	s[i], s[j] = s[j], s[i]
 }
 
+// isBBv2 returns true iff the Job was triggered using Buildbucket V2.
+func isBBv2(j *types.Job) bool {
+	return j.BuildbucketPubSubTopic != ""
+}
+
 // sendHeartbeats sends heartbeats to Buildbucket for all of the unfinished try
 // Jobs.
 func (t *TryJobIntegrator) sendHeartbeats(ctx context.Context, jobs []*types.Job) error {
@@ -251,13 +278,6 @@
 
 	// Send heartbeats for all leases.
 	send := func(jobs []*types.Job) {
-		// TODO(borenet): Remove this logging when no longer needed.
-		ids := make([]string, 0, len(jobs))
-		for _, job := range jobs {
-			ids = append(ids, job.Id)
-		}
-		sklog.Infof("Sending heartbeats for jobs: %s", strings.Join(ids, ", "))
-
 		heartbeats := make([]*buildbucket_api.LegacyApiHeartbeatBatchRequestMessageOneHeartbeat, 0, len(jobs))
 		for _, j := range jobs {
 			heartbeats = append(heartbeats, &buildbucket_api.LegacyApiHeartbeatBatchRequestMessageOneHeartbeat{
@@ -319,6 +339,38 @@
 	return nil
 }
 
+// sendPubsubUpdates sends updates to Buildbucket via Pub/Sub for in-progress
+// Jobs.
+func (t *TryJobIntegrator) sendPubsubUpdates(ctx context.Context, jobs []*types.Job) error {
+	g := multierror.Group{}
+	for _, job := range jobs {
+		job := job // https://golang.org/doc/faq#closures_and_goroutines
+		g.Go(func() error {
+			update := &buildbucketpb.BuildTaskUpdate{
+				BuildId: strconv.FormatInt(job.BuildbucketBuildId, 10),
+				Task: &buildbucketpb.Task{
+					Id: &buildbucketpb.TaskID{
+						Target: t.buildbucketTarget,
+						Id:     job.Id,
+					},
+					Link:     job.URL(t.host),
+					Status:   jobStatusToBuildbucketStatus(job.Status),
+					UpdateId: now.Now(ctx).UnixNano(),
+				},
+			}
+			b, err := prototext.Marshal(update)
+			if err != nil {
+				return skerr.Wrapf(err, "failed to encode BuildTaskUpdate")
+			}
+			_, err = t.pubsub.Topic(job.BuildbucketPubSubTopic).Publish(ctx, &pubsub_api.Message{
+				Data: b,
+			}).Get(ctx)
+			return err
+		})
+	}
+	return g.Wait().ErrorOrNil()
+}
+
 // getRepo returns the repo information associated with the given URL.
 func (t *TryJobIntegrator) getRepo(repoUrl string) (*repograph.Graph, error) {
 	r, ok := t.rm[repoUrl]
@@ -363,8 +415,8 @@
 	return nil
 }
 
-func (t *TryJobIntegrator) remoteCancelBuild(id int64, msg string) error {
-	sklog.Warningf("Canceling Buildbucket build %d. Reason: %s", id, msg)
+func (t *TryJobIntegrator) remoteCancelV1Build(buildId int64, msg string) error {
+	sklog.Warningf("Canceling Buildbucket build %d. Reason: %s", buildId, msg)
 	message := struct {
 		Message string `json:"message"`
 	}{
@@ -374,7 +426,7 @@
 	if err != nil {
 		return err
 	}
-	resp, err := t.bb.Cancel(id, &buildbucket_api.LegacyApiCancelRequestBodyMessage{
+	resp, err := t.bb.Cancel(buildId, &buildbucket_api.LegacyApiCancelRequestBodyMessage{
 		ResultDetailsJson: string(b),
 	}).Do()
 	if err != nil {
@@ -386,7 +438,7 @@
 	return nil
 }
 
-func (t *TryJobIntegrator) tryLeaseBuild(ctx context.Context, id int64) (int64, *buildbucket_api.LegacyApiErrorMessage, error) {
+func (t *TryJobIntegrator) tryLeaseV1Build(ctx context.Context, id int64) (int64, *buildbucket_api.LegacyApiErrorMessage, error) {
 	expiration := now.Now(ctx).Add(LEASE_DURATION_INITIAL).Unix() * secondsToMicros
 	sklog.Infof("Attempting to lease build %d", id)
 	resp, err := t.bb.Lease(id, &buildbucket_api.LegacyApiLeaseRequestBodyMessage{
@@ -402,7 +454,7 @@
 	return leaseKey, resp.Error, nil
 }
 
-func (t *TryJobIntegrator) insertNewJob(ctx context.Context, buildId int64) error {
+func (t *TryJobIntegrator) insertNewJobV1(ctx context.Context, buildId int64) error {
 	// Get the build details from the v2 API.
 	build, err := t.bb2.GetBuild(ctx, buildId)
 	if err != nil {
@@ -410,13 +462,13 @@
 	}
 	if build.Status != buildbucketpb.Status_SCHEDULED {
 		sklog.Warningf("Found build %d with status: %s; attempting to lease anyway, to trigger the fix in Buildbucket.", build.Id, build.Status)
-		_, bbError, err := t.tryLeaseBuild(ctx, buildId)
+		_, bbError, err := t.tryLeaseV1Build(ctx, buildId)
 		if err != nil || bbError != nil {
 			// This is expected.
 			return nil
 		}
 		sklog.Warningf("Unexpectedly able to lease build %d with status %s; canceling it.", buildId, build.Status)
-		if err := t.remoteCancelBuild(buildId, fmt.Sprintf("Unexpected status %s", build.Status)); err != nil {
+		if err := t.remoteCancelV1Build(buildId, fmt.Sprintf("Unexpected status %s", build.Status)); err != nil {
 			sklog.Warningf("Failed to cancel errant build %d", buildId)
 			return nil
 		}
@@ -424,12 +476,12 @@
 
 	// Obtain and validate the RepoState.
 	if build.Input.GerritChanges == nil || len(build.Input.GerritChanges) != 1 {
-		return t.remoteCancelBuild(buildId, fmt.Sprintf("Invalid Build %d: input should have exactly one GerritChanges: %+v", buildId, build.Input))
+		return t.remoteCancelV1Build(buildId, fmt.Sprintf("Invalid Build %d: input should have exactly one GerritChanges: %+v", buildId, build.Input))
 	}
 	gerritChange := build.Input.GerritChanges[0]
 	repoUrl, ok := t.projectRepoMapping[gerritChange.Project]
 	if !ok {
-		return t.remoteCancelBuild(buildId, fmt.Sprintf("Unknown patch project %q", gerritChange.Project))
+		return t.remoteCancelV1Build(buildId, fmt.Sprintf("Unknown patch project %q", gerritChange.Project))
 	}
 	server := gerritChange.Host
 	if !strings.Contains(server, "://") {
@@ -449,7 +501,7 @@
 	}
 	requested, err := ptypes.Timestamp(build.CreateTime)
 	if err != nil {
-		return t.remoteCancelBuild(buildId, fmt.Sprintf("Failed to convert timestamp for %d: %s", build.Id, err))
+		return t.remoteCancelV1Build(buildId, fmt.Sprintf("Failed to convert timestamp for %d: %s", build.Id, err))
 	}
 	j := &types.Job{
 		Name:               build.Builder.Builder,
@@ -464,7 +516,7 @@
 		j.Requested = j.Created.Add(-firestore.TS_RESOLUTION)
 	}
 	// Attempt to lease the build.
-	leaseKey, bbError, err := t.tryLeaseBuild(ctx, j.BuildbucketBuildId)
+	leaseKey, bbError, err := t.tryLeaseV1Build(ctx, j.BuildbucketBuildId)
 	if err != nil {
 		return skerr.Wrapf(err, "failed to lease build %d", j.BuildbucketBuildId)
 	} else if bbError != nil {
@@ -472,14 +524,14 @@
 		// return an error is that the Build has been canceled. While this
 		// is the most likely reason, others are possible, and we may gain
 		// some information by reading the error and behaving accordingly.
-		return t.remoteCancelBuild(buildId, fmt.Sprintf("Buildbucket refused lease with %q", bbError.Message))
+		return t.remoteCancelV1Build(buildId, fmt.Sprintf("Buildbucket refused lease with %q", bbError.Message))
 	} else if leaseKey == 0 {
-		return t.remoteCancelBuild(buildId, "Buildbucket returned zero lease key")
+		return t.remoteCancelV1Build(buildId, "Buildbucket returned zero lease key")
 	}
 	j.BuildbucketLeaseKey = leaseKey
 
 	if err := t.db.PutJob(ctx, j); err != nil {
-		return t.remoteCancelBuild(j.BuildbucketBuildId, fmt.Sprintf("Failed to insert Job into the DB: %s", err))
+		return t.remoteCancelV1Build(j.BuildbucketBuildId, fmt.Sprintf("Failed to insert Job into the DB: %s", err))
 	}
 	t.jCache.AddJobs([]*types.Job{j})
 	return nil
@@ -594,12 +646,12 @@
 	if err := startJobHelper(); err != nil {
 		sklog.Infof("Failed to start job %s (build %d) with: %s", job.Id, job.BuildbucketBuildId, err)
 		job.Status = types.JOB_STATUS_MISHAP
-		job.StatusDetails = util.Truncate(fmt.Sprintf("Failed to start Job: %s", err), 1024)
+		job.StatusDetails = util.Truncate(fmt.Sprintf("Failed to start Job: %s", skerr.Unwrap(err)), 1024)
 	} else {
 		job.Status = types.JOB_STATUS_IN_PROGRESS
 
 		// Notify Buildbucket that the Job has started.
-		bbError, err := t.jobStarted(job)
+		bbToken, bbError, err := t.jobStarted(ctx, job)
 		if err != nil {
 			return skerr.Wrapf(err, "failed to send job-started notification")
 		} else if bbError != nil {
@@ -613,6 +665,8 @@
 			} else {
 				return skerr.Fmt("failed to start build %d with %q", job.BuildbucketBuildId, bbError.Message)
 			}
+		} else if bbToken != "" {
+			job.BuildbucketToken = bbToken
 		}
 	}
 
@@ -630,14 +684,12 @@
 	}
 
 	// Grab all of the pending Builds from Buildbucket.
-	// TODO(borenet): Buildbot maintains a maximum lease count. Should we do
-	// that too?
 	cursor := ""
 	errs := []error{}
 	var mtx sync.Mutex
 	for {
-		sklog.Infof("Running 'peek' on %s", t.bucket)
-		resp, err := t.bb.Peek().Bucket(t.bucket).MaxBuilds(PEEK_MAX_BUILDS).StartCursor(cursor).Do()
+		sklog.Infof("Running 'peek' on %s", t.buildbucketBucket)
+		resp, err := t.bb.Peek().Bucket(t.buildbucketBucket).MaxBuilds(PEEK_MAX_BUILDS).StartCursor(cursor).Do()
 		if err != nil {
 			errs = append(errs, err)
 			break
@@ -651,7 +703,7 @@
 			wg.Add(1)
 			go func(b *buildbucket_api.LegacyApiCommonBuildMessage) {
 				defer wg.Done()
-				if err := t.insertNewJob(ctx, b.Id); err != nil {
+				if err := t.insertNewJobV1(ctx, b.Id); err != nil {
 					mtx.Lock()
 					errs = append(errs, err)
 					mtx.Unlock()
@@ -673,26 +725,31 @@
 	return nil
 }
 
-// jobStarted notifies Buildbucket that the given Job has started. Returns any
-// error object returned by Buildbucket (eg. if the Build has been canceled), or
-// any error which occurred when attempting the request.
-func (t *TryJobIntegrator) jobStarted(j *types.Job) (*buildbucket_api.LegacyApiErrorMessage, error) {
-	sklog.Infof("bb.Start for job %s (build %d)", j.Id, j.BuildbucketBuildId)
-	resp, err := t.bb.Start(j.BuildbucketBuildId, &buildbucket_api.LegacyApiStartRequestBodyMessage{
-		LeaseKey: j.BuildbucketLeaseKey,
-		Url:      j.URL(t.host),
-	}).Do()
-	if err != nil {
-		return nil, err
+// jobStarted notifies Buildbucket that the given Job has started. Returns the
+// Buildbucket token returned by Buildbucket, any error object returned by
+// Buildbucket (eg. if the Build has been canceled), or any error which occurred
+// when attempting the request.
+func (t *TryJobIntegrator) jobStarted(ctx context.Context, j *types.Job) (string, *buildbucket_api.LegacyApiErrorMessage, error) {
+	if isBBv2(j) {
+		sklog.Infof("bb2.Start for job %s (build %d)", j.Id, j.BuildbucketBuildId)
+		updateToken, err := t.bb2.StartBuild(ctx, j.BuildbucketBuildId, j.Id, j.BuildbucketToken)
+		return updateToken, nil, skerr.Wrap(err)
+	} else {
+		sklog.Infof("bb.Start for job %s (build %d)", j.Id, j.BuildbucketBuildId)
+		resp, err := t.bb.Start(j.BuildbucketBuildId, &buildbucket_api.LegacyApiStartRequestBodyMessage{
+			LeaseKey: j.BuildbucketLeaseKey,
+			Url:      j.URL(t.host),
+		}).Do()
+		if err != nil {
+			return "", nil, err
+		}
+		return "", resp.Error, nil
 	}
-	return resp.Error, nil
 }
 
-// jobFinished notifies Buildbucket that the given Job has finished.
-func (t *TryJobIntegrator) jobFinished(j *types.Job) error {
-	if !j.Done() {
-		return skerr.Fmt("JobFinished called for unfinished Job!")
-	}
+// buildSucceededV1 sends a success notification to Buildbucket.
+func (t *TryJobIntegrator) buildSucceededV1(j *types.Job) error {
+	sklog.Infof("bb.Succeed for job %s (build %d)", j.Id, j.BuildbucketBuildId)
 	b, err := json.Marshal(struct {
 		Job *types.Job `json:"job"`
 	}{
@@ -701,49 +758,77 @@
 	if err != nil {
 		return err
 	}
-	if j.Status == types.JOB_STATUS_SUCCESS {
-		sklog.Infof("bb.Succeed for job %s (build %d)", j.Id, j.BuildbucketBuildId)
-		resp, err := t.bb.Succeed(j.BuildbucketBuildId, &buildbucket_api.LegacyApiSucceedRequestBodyMessage{
-			LeaseKey:          j.BuildbucketLeaseKey,
-			ResultDetailsJson: string(b),
-			Url:               j.URL(t.host),
-		}).Do()
-		if err != nil {
-			return err
-		}
-		if resp.Error != nil {
-			if resp.Error.Reason == BUILDBUCKET_API_ERROR_REASON_COMPLETED {
-				sklog.Warningf("Sent success status for build %d after completion.", j.BuildbucketBuildId)
-			} else {
-				return fmt.Errorf(resp.Error.Message)
-			}
-		}
-	} else {
-		failureReason := "BUILD_FAILURE"
-		if j.Status == types.JOB_STATUS_MISHAP {
-			failureReason = "INFRA_FAILURE"
-		}
-		sklog.Infof("bb.Fail for job %s (build %d)", j.Id, j.BuildbucketBuildId)
-		resp, err := t.bb.Fail(j.BuildbucketBuildId, &buildbucket_api.LegacyApiFailRequestBodyMessage{
-			FailureReason:     failureReason,
-			LeaseKey:          j.BuildbucketLeaseKey,
-			ResultDetailsJson: string(b),
-			Url:               j.URL(t.host),
-		}).Do()
-		if err != nil {
-			return err
-		}
-		if resp.Error != nil {
-			if resp.Error.Reason == BUILDBUCKET_API_ERROR_REASON_COMPLETED {
-				sklog.Warningf("Sent failure status for build %d after completion.", j.BuildbucketBuildId)
-			} else {
-				return fmt.Errorf(resp.Error.Message)
-			}
+	resp, err := t.bb.Succeed(j.BuildbucketBuildId, &buildbucket_api.LegacyApiSucceedRequestBodyMessage{
+		LeaseKey:          j.BuildbucketLeaseKey,
+		ResultDetailsJson: string(b),
+		Url:               j.URL(t.host),
+	}).Do()
+	if err != nil {
+		return err
+	}
+	if resp.Error != nil {
+		if resp.Error.Reason == BUILDBUCKET_API_ERROR_REASON_COMPLETED {
+			sklog.Warningf("Sent success status for build %d after completion.", j.BuildbucketBuildId)
+		} else {
+			return fmt.Errorf(resp.Error.Message)
 		}
 	}
 	return nil
 }
 
+// buildFailed sends a failure notification to Buildbucket.
+func (t *TryJobIntegrator) buildFailed(j *types.Job) error {
+	b, err := json.Marshal(struct {
+		Job *types.Job `json:"job"`
+	}{
+		Job: j,
+	})
+	if err != nil {
+		return err
+	}
+	failureReason := "BUILD_FAILURE"
+	if j.Status == types.JOB_STATUS_MISHAP {
+		failureReason = "INFRA_FAILURE"
+	}
+	sklog.Infof("bb.Fail for job %s (build %d)", j.Id, j.BuildbucketBuildId)
+	resp, err := t.bb.Fail(j.BuildbucketBuildId, &buildbucket_api.LegacyApiFailRequestBodyMessage{
+		FailureReason:     failureReason,
+		LeaseKey:          j.BuildbucketLeaseKey,
+		ResultDetailsJson: string(b),
+		Url:               j.URL(t.host),
+	}).Do()
+	if err != nil {
+		return err
+	}
+	if resp.Error != nil {
+		if resp.Error.Reason == BUILDBUCKET_API_ERROR_REASON_COMPLETED {
+			sklog.Warningf("Sent failure status for build %d after completion.", j.BuildbucketBuildId)
+		} else {
+			return fmt.Errorf(resp.Error.Message)
+		}
+	}
+	return nil
+}
+
+func (t *TryJobIntegrator) updateBuild(ctx context.Context, j *types.Job) error {
+	sklog.Infof("bb2.UpdateBuild for job %s (build %d)", j.Id, j.BuildbucketBuildId)
+	return t.bb2.UpdateBuild(ctx, jobToBuildV2(j), j.BuildbucketToken)
+}
+
+// jobFinished notifies Buildbucket that the given Job has finished.
+func (t *TryJobIntegrator) jobFinished(ctx context.Context, j *types.Job) error {
+	if !j.Done() {
+		return skerr.Fmt("JobFinished called for unfinished Job!")
+	}
+	if isBBv2(j) {
+		return t.updateBuild(ctx, j)
+	} else if j.Status == types.JOB_STATUS_SUCCESS {
+		return t.buildSucceededV1(j)
+	} else {
+		return t.buildFailed(j)
+	}
+}
+
 // skipRepoState determines whether we should skip try jobs for this RepoState,
 // eg. problematic CLs.
 func skipRepoState(rs types.RepoState) bool {
@@ -753,3 +838,39 @@
 	}
 	return false
 }
+
+func jobStatusToBuildbucketStatus(status types.JobStatus) buildbucketpb.Status {
+	switch status {
+	case types.JOB_STATUS_CANCELED:
+		return buildbucketpb.Status_CANCELED
+	case types.JOB_STATUS_FAILURE:
+		return buildbucketpb.Status_FAILURE
+	case types.JOB_STATUS_IN_PROGRESS:
+		return buildbucketpb.Status_STARTED
+	case types.JOB_STATUS_MISHAP:
+		return buildbucketpb.Status_INFRA_FAILURE
+	case types.JOB_STATUS_REQUESTED:
+		return buildbucketpb.Status_SCHEDULED
+	case types.JOB_STATUS_SUCCESS:
+		return buildbucketpb.Status_SUCCESS
+	default:
+		return buildbucketpb.Status_STATUS_UNSPECIFIED
+	}
+}
+
+// jobToBuildV2 converts a Job to a Buildbucket V2 Build to be used with
+// UpdateBuild.
+func jobToBuildV2(job *types.Job) *buildbucketpb.Build {
+	status := jobStatusToBuildbucketStatus(job.Status)
+
+	// Note: There are other fields we could fill in, but I'm not sure they
+	// would provide any value since we don't actually use Buildbucket builds
+	// for anything.
+	return &buildbucketpb.Build{
+		Id: job.BuildbucketBuildId,
+		Output: &buildbucketpb.Build_Output{
+			Status:          status,
+			SummaryMarkdown: job.StatusDetails,
+		},
+	}
+}
diff --git a/task_scheduler/go/tryjobs/tryjobs_test.go b/task_scheduler/go/tryjobs/tryjobs_test.go
index 4875f1e..3d8aba7 100644
--- a/task_scheduler/go/tryjobs/tryjobs_test.go
+++ b/task_scheduler/go/tryjobs/tryjobs_test.go
@@ -3,6 +3,7 @@
 import (
 	"context"
 	"encoding/json"
+	"errors"
 	"fmt"
 	"sort"
 	"strconv"
@@ -10,6 +11,8 @@
 	"testing"
 	"time"
 
+	"cloud.google.com/go/pubsub"
+	"github.com/stretchr/testify/mock"
 	"github.com/stretchr/testify/require"
 	buildbucketpb "go.chromium.org/luci/buildbucket/proto"
 	"go.skia.org/infra/go/buildbucket/mocks"
@@ -17,9 +20,12 @@
 	"go.skia.org/infra/go/gerrit"
 	"go.skia.org/infra/go/git"
 	"go.skia.org/infra/go/mockhttpclient"
+	"go.skia.org/infra/go/now"
+	pubsub_mocks "go.skia.org/infra/go/pubsub/mocks"
 	"go.skia.org/infra/go/testutils"
 	"go.skia.org/infra/task_scheduler/go/db"
 	"go.skia.org/infra/task_scheduler/go/types"
+	"google.golang.org/protobuf/encoding/prototext"
 )
 
 var distantFutureTime = time.Date(3000, time.January, 1, 0, 0, 0, 0, time.UTC)
@@ -40,18 +46,18 @@
 
 // Verify that updateJobs sends heartbeats for unfinished try Jobs and
 // success/failure for finished Jobs.
-func TestUpdateJobs_NoJobs_NoAction(t *testing.T) {
-	ctx, trybots, mock, _ := setup(t)
+func TestUpdateJob_NoJobs_NoAction(t *testing.T) {
+	ctx, trybots, mock, _, _ := setup(t)
 
 	assertNoActiveTryJobs(t, trybots)
 	require.NoError(t, trybots.updateJobs(ctx))
 	require.True(t, mock.Empty(), mock.List())
 }
 
-func TestUpdateJobs_OneUnfinished_SendsHeartbeat(t *testing.T) {
-	ctx, trybots, mock, _ := setup(t)
+func TestUpdateJobsV1_OneUnfinished_SendsHeartbeat(t *testing.T) {
+	ctx, trybots, mock, _, _ := setup(t)
 
-	j1 := tryjob(ctx, repoUrl)
+	j1 := tryjobV1(ctx, repoUrl)
 	MockHeartbeats(t, mock, ts, []*types.Job{j1}, nil)
 	require.NoError(t, trybots.db.PutJobs(ctx, []*types.Job{j1}))
 	trybots.jCache.AddJobs([]*types.Job{j1})
@@ -60,41 +66,133 @@
 	assertActiveTryJob(t, trybots, j1)
 }
 
-func TestUpdateJobs_FinishedJob_SendSuccess(t *testing.T) {
-	ctx, trybots, mock, _ := setup(t)
+func TestUpdateJobsV2_OneUnfinished_SendsPubSub(t *testing.T) {
+	ctx, trybots, _, _, topic := setup(t)
 
-	j1 := tryjob(ctx, repoUrl)
+	// Create the Job.
+	j1 := tryjobV2(ctx, repoUrl)
+	require.NoError(t, trybots.db.PutJobs(ctx, []*types.Job{j1}))
+	trybots.jCache.AddJobs([]*types.Job{j1})
+
+	// Mock the pubsub message.
+	update := &buildbucketpb.BuildTaskUpdate{
+		BuildId: strconv.FormatInt(j1.BuildbucketBuildId, 10),
+		Task: &buildbucketpb.Task{
+			Id: &buildbucketpb.TaskID{
+				Target: trybots.buildbucketTarget,
+				Id:     j1.Id,
+			},
+			Link:     j1.URL(trybots.host),
+			Status:   jobStatusToBuildbucketStatus(j1.Status),
+			UpdateId: now.Now(ctx).UnixNano(),
+		},
+	}
+	b, err := prototext.Marshal(update)
+	require.NoError(t, err)
+	result := &pubsub_mocks.PublishResult{}
+	result.On("Get", testutils.AnyContext).Return("fake-server-id", nil)
+	topic.On("Publish", testutils.AnyContext, &pubsub.Message{Data: b}).Return(result)
+
+	// Run updateJobs, assert that we sent the message.
+	require.NoError(t, trybots.updateJobs(ctx))
+	assertActiveTryJob(t, trybots, j1)
+	topic.AssertExpectations(t)
+	result.AssertExpectations(t)
+}
+
+func TestUpdateJobsV1_FinishedJob_SendSuccess(t *testing.T) {
+	ctx, trybots, mock, _, _ := setup(t)
+
+	j1 := tryjobV1(ctx, repoUrl)
 	j1.Status = types.JOB_STATUS_SUCCESS
 	j1.Finished = ts
 	require.NoError(t, trybots.db.PutJobs(ctx, []*types.Job{j1}))
 	trybots.jCache.AddJobs([]*types.Job{j1})
+	require.NotEmpty(t, j1.BuildbucketLeaseKey)
 	MockJobSuccess(mock, j1, ts, false)
 	require.NoError(t, trybots.updateJobs(ctx))
 	require.True(t, mock.Empty(), mock.List())
 	assertNoActiveTryJobs(t, trybots)
+	j1, err := trybots.db.GetJobById(ctx, j1.Id)
+	require.NoError(t, err)
+	require.Empty(t, j1.BuildbucketLeaseKey)
+	require.Empty(t, j1.BuildbucketToken)
 }
 
-func TestUpdateJobs_FailedJob_SendFailure(t *testing.T) {
-	ctx, trybots, mock, _ := setup(t)
+func TestUpdateJobsV2_FinishedJob_SendSuccess(t *testing.T) {
+	ctx, trybots, _, mockBB, _ := setup(t)
 
-	j1 := tryjob(ctx, repoUrl)
+	j1 := tryjobV2(ctx, repoUrl)
+	j1.Status = types.JOB_STATUS_SUCCESS
+	j1.Finished = ts
+	require.NoError(t, trybots.db.PutJobs(ctx, []*types.Job{j1}))
+	trybots.jCache.AddJobs([]*types.Job{j1})
+	require.NotEmpty(t, j1.BuildbucketToken)
+	mockBB.On("UpdateBuild", testutils.AnyContext, &buildbucketpb.Build{
+		Id: j1.BuildbucketBuildId,
+		Output: &buildbucketpb.Build_Output{
+			Status: buildbucketpb.Status_SUCCESS,
+		},
+	}, j1.BuildbucketToken).Return(nil)
+	require.NoError(t, trybots.updateJobs(ctx))
+	mockBB.AssertExpectations(t)
+	assertNoActiveTryJobs(t, trybots)
+	j1, err := trybots.db.GetJobById(ctx, j1.Id)
+	require.NoError(t, err)
+	require.Empty(t, j1.BuildbucketLeaseKey)
+	require.Empty(t, j1.BuildbucketToken)
+}
+
+func TestUpdateJobsV1_FailedJob_SendFailure(t *testing.T) {
+	ctx, trybots, mock, _, _ := setup(t)
+
+	j1 := tryjobV1(ctx, repoUrl)
 	j1.Status = types.JOB_STATUS_FAILURE
 	j1.Finished = ts
 	j1.BuildbucketLeaseKey = 12345
 	require.NoError(t, trybots.db.PutJobs(ctx, []*types.Job{j1}))
 	trybots.jCache.AddJobs([]*types.Job{j1})
+	require.NotEmpty(t, j1.BuildbucketLeaseKey)
 	MockJobFailure(mock, j1, ts)
 	require.NoError(t, trybots.updateJobs(ctx))
 	require.True(t, mock.Empty(), mock.List())
 	assertNoActiveTryJobs(t, trybots)
+	j1, err := trybots.db.GetJobById(ctx, j1.Id)
+	require.NoError(t, err)
+	require.Empty(t, j1.BuildbucketLeaseKey)
+	require.Empty(t, j1.BuildbucketToken)
 }
 
-func TestUpdateJobs_ManyInProgress_MultipleHeartbeatBatches(t *testing.T) {
-	ctx, trybots, mock, _ := setup(t)
+func TestUpdateJobsV2_FailedJob_SendFailure(t *testing.T) {
+	ctx, trybots, _, mockBB, _ := setup(t)
+
+	j1 := tryjobV2(ctx, repoUrl)
+	j1.Status = types.JOB_STATUS_FAILURE
+	j1.Finished = ts
+	require.NoError(t, trybots.db.PutJobs(ctx, []*types.Job{j1}))
+	trybots.jCache.AddJobs([]*types.Job{j1})
+	require.NotEmpty(t, j1.BuildbucketToken)
+	mockBB.On("UpdateBuild", testutils.AnyContext, &buildbucketpb.Build{
+		Id: j1.BuildbucketBuildId,
+		Output: &buildbucketpb.Build_Output{
+			Status: buildbucketpb.Status_FAILURE,
+		},
+	}, j1.BuildbucketToken).Return(nil)
+	require.NoError(t, trybots.updateJobs(ctx))
+	mockBB.AssertExpectations(t)
+	assertNoActiveTryJobs(t, trybots)
+	j1, err := trybots.db.GetJobById(ctx, j1.Id)
+	require.NoError(t, err)
+	require.Empty(t, j1.BuildbucketLeaseKey)
+	require.Empty(t, j1.BuildbucketToken)
+}
+
+func TestUpdateJobsV1_ManyInProgress_MultipleHeartbeatBatches(t *testing.T) {
+	ctx, trybots, mock, _, _ := setup(t)
 
 	jobs := []*types.Job{}
 	for i := 0; i < LEASE_BATCH_SIZE+2; i++ {
-		jobs = append(jobs, tryjob(ctx, repoUrl))
+		jobs = append(jobs, tryjobV1(ctx, repoUrl))
 	}
 	sort.Sort(heartbeatJobSlice(jobs))
 	MockHeartbeats(t, mock, ts, jobs[:LEASE_BATCH_SIZE], nil)
@@ -105,12 +203,55 @@
 	require.True(t, mock.Empty(), mock.List())
 }
 
-func TestUpdateJobs_HeartbeatBatchOneFailed_JobIsCanceled(t *testing.T) {
-	ctx, trybots, mock, _ := setup(t)
+func TestUpdateJobsV2_ManyInProgress_MultiplePubSubMessages(t *testing.T) {
+	ctx, trybots, _, _, topic := setup(t)
+
+	// Create the Jobs.
+	var jobs []*types.Job
+	for i := 0; i < 27; i++ { // Arbitrary number of jobs.
+		job := tryjobV2(ctx, repoUrl)
+		job.BuildbucketBuildId = int64(i) // Easier to debug.
+		jobs = append(jobs, job)
+	}
+	require.NoError(t, trybots.db.PutJobs(ctx, jobs))
+	trybots.jCache.AddJobs(jobs)
+
+	// Mock the pubsub messages.
+	allMocks := []*mock.Mock{&topic.Mock}
+	for _, job := range jobs {
+		update := &buildbucketpb.BuildTaskUpdate{
+			BuildId: strconv.FormatInt(job.BuildbucketBuildId, 10),
+			Task: &buildbucketpb.Task{
+				Id: &buildbucketpb.TaskID{
+					Target: trybots.buildbucketTarget,
+					Id:     job.Id,
+				},
+				Link:     job.URL(trybots.host),
+				Status:   jobStatusToBuildbucketStatus(job.Status),
+				UpdateId: now.Now(ctx).UnixNano(),
+			},
+		}
+		b, err := prototext.Marshal(update)
+		require.NoError(t, err)
+		result := &pubsub_mocks.PublishResult{}
+		result.On("Get", testutils.AnyContext).Return("fake-server-id", nil)
+		topic.On("Publish", testutils.AnyContext, &pubsub.Message{Data: b}).Return(result)
+		allMocks = append(allMocks, &result.Mock)
+	}
+
+	// Call updateJobs, assert that we sent the messages.
+	require.NoError(t, trybots.updateJobs(ctx))
+	for _, mock := range allMocks {
+		mock.AssertExpectations(t)
+	}
+}
+
+func TestUpdateJobsV1_HeartbeatBatchOneFailed_JobIsCanceled(t *testing.T) {
+	ctx, trybots, mock, _, _ := setup(t)
 
 	jobs := []*types.Job{}
 	for i := 0; i < LEASE_BATCH_SIZE+2; i++ {
-		jobs = append(jobs, tryjob(ctx, repoUrl))
+		jobs = append(jobs, tryjobV1(ctx, repoUrl))
 	}
 	j1, j2 := jobs[0], jobs[1]
 	for _, j := range jobs[2:] {
@@ -124,7 +265,7 @@
 	}
 	MockHeartbeats(t, mock, ts, []*types.Job{j1, j2}, map[string]*heartbeatResp{
 		j1.Id: {
-			BuildId: fmt.Sprintf("%d", j1.BuildbucketBuildId),
+			BuildId: strconv.FormatInt(j1.BuildbucketBuildId, 10),
 			Error: &errMsg{
 				Message: "fail",
 			},
@@ -142,7 +283,7 @@
 }
 
 func TestGetRevision(t *testing.T) {
-	ctx, trybots, mock, _ := setup(t)
+	ctx, trybots, mock, _, _ := setup(t)
 
 	// Get the (only) commit from the repo.
 	r, err := trybots.getRepo(repoUrl)
@@ -164,111 +305,159 @@
 	require.Equal(t, c, got)
 }
 
-func TestCancelBuild_Success(t *testing.T) {
-	_, trybots, mock, _ := setup(t)
+func TestRemoteCancelV1Build_Success(t *testing.T) {
+	_, trybots, mock, _, _ := setup(t)
 
 	const id = int64(12345)
 	MockCancelBuild(mock, id, "Canceling!")
-	require.NoError(t, trybots.remoteCancelBuild(id, "Canceling!"))
+	require.NoError(t, trybots.remoteCancelV1Build(id, "Canceling!"))
 	require.True(t, mock.Empty(), mock.List())
 }
 
-func TestCancelBuild_Success_LongMessageTruncated(t *testing.T) {
-	_, trybots, mock, _ := setup(t)
+func TestRemoteCancelV1Build_Success_LongMessageTruncated(t *testing.T) {
+	_, trybots, mock, _, _ := setup(t)
 
 	const id = int64(12345)
 	MockCancelBuild(mock, id, strings.Repeat("X", maxCancelReasonLen-3)+"...")
-	require.NoError(t, trybots.remoteCancelBuild(id, strings.Repeat("X", maxCancelReasonLen+50)))
+	require.NoError(t, trybots.remoteCancelV1Build(id, strings.Repeat("X", maxCancelReasonLen+50)))
 	require.True(t, mock.Empty(), mock.List())
 }
 
-func TestCancelBuild_Failed(t *testing.T) {
-	_, trybots, mock, _ := setup(t)
+func TestRemoteCancelV1Build_Failed(t *testing.T) {
+	_, trybots, mock, _, _ := setup(t)
 
 	const id = int64(12345)
 	expectErr := "Build does not exist!"
 	MockCancelBuildFailed(mock, id, "Canceling!", expectErr)
-	require.EqualError(t, trybots.remoteCancelBuild(id, "Canceling!"), expectErr)
+	require.EqualError(t, trybots.remoteCancelV1Build(id, "Canceling!"), expectErr)
 	require.True(t, mock.Empty(), mock.List())
 }
 
-func TestTryLeaseBuild_Success(t *testing.T) {
-	ctx, trybots, mock, _ := setup(t)
+func TestTryLeaseV1Build_Success(t *testing.T) {
+	ctx, trybots, mock, _, _ := setup(t)
 
 	const id = int64(12345)
 	MockTryLeaseBuild(mock, id)
-	k, bbError, err := trybots.tryLeaseBuild(ctx, id)
+	k, bbError, err := trybots.tryLeaseV1Build(ctx, id)
 	require.NoError(t, err)
 	require.Nil(t, bbError)
 	require.NotEqual(t, k, 0)
 	require.True(t, mock.Empty(), mock.List())
 }
 
-func TestTryLeaseBuild_Failure(t *testing.T) {
-	ctx, trybots, mock, _ := setup(t)
+func TestTryLeaseV1Build_Failure(t *testing.T) {
+	ctx, trybots, mock, _, _ := setup(t)
 
 	const id = int64(12345)
 	expect := "Can't lease this!"
 	MockTryLeaseBuildFailed(mock, id, expect)
-	_, bbError, err := trybots.tryLeaseBuild(ctx, id)
+	_, bbError, err := trybots.tryLeaseV1Build(ctx, id)
 	require.NoError(t, err)
 	require.NotNil(t, bbError)
 	require.Contains(t, bbError.Message, expect)
 	require.True(t, mock.Empty(), mock.List())
 }
 
-func TestJobStarted_Success(t *testing.T) {
-	ctx, trybots, mock, _ := setup(t)
+func TestJobStartedV1_Success(t *testing.T) {
+	ctx, trybots, mock, _, _ := setup(t)
 
-	j := tryjob(ctx, repoUrl)
+	j := tryjobV1(ctx, repoUrl)
 
 	MockJobStarted(mock, j.BuildbucketBuildId)
-	bbError, err := trybots.jobStarted(j)
+	bbToken, bbError, err := trybots.jobStarted(ctx, j)
 	require.NoError(t, err)
 	require.Nil(t, bbError)
+	require.Empty(t, bbToken) // No update token for V1 builds.
 	require.True(t, mock.Empty(), mock.List())
 }
 
-func TestJobStarted_Failure(t *testing.T) {
-	ctx, trybots, mock, _ := setup(t)
+func TestJobStartedV2_Success(t *testing.T) {
+	ctx, trybots, _, mockBB, _ := setup(t)
 
-	j := tryjob(ctx, repoUrl)
+	j := tryjobV2(ctx, repoUrl)
+
+	mockBB.On("StartBuild", testutils.AnyContext, j.BuildbucketBuildId, j.Id, j.BuildbucketToken).Return(bbFakeUpdateToken, nil)
+	bbToken, bbError, err := trybots.jobStarted(ctx, j)
+	require.NoError(t, err)
+	require.Nil(t, bbError)
+	require.Equal(t, bbFakeUpdateToken, bbToken)
+	mockBB.AssertExpectations(t)
+}
+
+func TestJobStartedV1_Failure(t *testing.T) {
+	ctx, trybots, mock, _, _ := setup(t)
+
+	j := tryjobV1(ctx, repoUrl)
 
 	expectErr := "fail"
-	MockJobStartedFailed(mock, j.BuildbucketBuildId, expectErr)
-	bbError, err := trybots.jobStarted(j)
+	MockJobStartedFailed(mock, j.BuildbucketBuildId, expectErr, "INVALID_INPUT")
+	bbToken, bbError, err := trybots.jobStarted(ctx, j)
 	require.NoError(t, err)
 	require.NotNil(t, bbError)
+	require.Empty(t, bbToken)
 	require.Contains(t, bbError.Message, expectErr)
+	require.Contains(t, bbError.Reason, "INVALID_INPUT")
 	require.True(t, mock.Empty(), mock.List())
 }
 
-func TestJobFinished_NotActuallyFinished(t *testing.T) {
-	ctx, trybots, _, _ := setup(t)
+func TestJobStartedV2_Failure(t *testing.T) {
+	ctx, trybots, _, mockBB, _ := setup(t)
 
-	j := tryjob(ctx, repoUrl)
+	j := tryjobV2(ctx, repoUrl)
 
-	require.ErrorContains(t, trybots.jobFinished(j), "JobFinished called for unfinished Job!")
+	mockBB.On("StartBuild", testutils.AnyContext, j.BuildbucketBuildId, j.Id, j.BuildbucketToken).Return("", errors.New("failed"))
+	bbToken, bbError, err := trybots.jobStarted(ctx, j)
+	require.ErrorContains(t, err, "failed")
+	require.Nil(t, bbError)
+	require.Empty(t, bbToken)
+	mockBB.AssertExpectations(t)
 }
 
-func TestJobFinished_JobSucceeded_UpdateSucceeds(t *testing.T) {
-	ctx, trybots, mock, _ := setup(t)
+func TestJobFinishedV1_NotActuallyFinished(t *testing.T) {
+	ctx, trybots, _, _, _ := setup(t)
 
-	j := tryjob(ctx, repoUrl)
+	j := tryjobV1(ctx, repoUrl)
+
+	require.ErrorContains(t, trybots.jobFinished(ctx, j), "JobFinished called for unfinished Job!")
+}
+
+func TestJobFinishedV1_JobSucceeded_UpdateSucceeds(t *testing.T) {
+	ctx, trybots, mock, _, _ := setup(t)
+
+	j := tryjobV1(ctx, repoUrl)
 	now := time.Date(2021, time.April, 27, 0, 0, 0, 0, time.UTC)
 	j.Status = types.JOB_STATUS_SUCCESS
 	j.Finished = now
 	require.NoError(t, trybots.db.PutJobs(ctx, []*types.Job{j}))
 	trybots.jCache.AddJobs([]*types.Job{j})
 	MockJobSuccess(mock, j, now, false)
-	require.NoError(t, trybots.jobFinished(j))
+	require.NoError(t, trybots.jobFinished(ctx, j))
 	require.True(t, mock.Empty(), mock.List())
 }
 
-func TestJobFinished_JobSucceeded_UpdateFails(t *testing.T) {
-	ctx, trybots, mock, _ := setup(t)
+func TestJobFinishedV2_JobSucceeded_UpdateSucceeds(t *testing.T) {
+	ctx, trybots, _, mockBB, _ := setup(t)
 
-	j := tryjob(ctx, repoUrl)
+	j := tryjobV2(ctx, repoUrl)
+	now := time.Date(2021, time.April, 27, 0, 0, 0, 0, time.UTC)
+	j.Status = types.JOB_STATUS_SUCCESS
+	j.Finished = now
+	require.NoError(t, trybots.db.PutJobs(ctx, []*types.Job{j}))
+	trybots.jCache.AddJobs([]*types.Job{j})
+	mockBB.On("UpdateBuild", testutils.AnyContext, &buildbucketpb.Build{
+		Id: j.BuildbucketBuildId,
+		Output: &buildbucketpb.Build_Output{
+			Status: buildbucketpb.Status_SUCCESS,
+		},
+	}, j.BuildbucketToken).Return(nil)
+	require.NoError(t, trybots.jobFinished(ctx, j))
+	mockBB.AssertExpectations(t)
+}
+
+func TestJobFinishedV1_JobSucceeded_UpdateFails(t *testing.T) {
+	ctx, trybots, mock, _, _ := setup(t)
+
+	j := tryjobV1(ctx, repoUrl)
 	now := time.Date(2021, time.April, 27, 0, 0, 0, 0, time.UTC)
 	j.Status = types.JOB_STATUS_SUCCESS
 	j.Finished = now
@@ -276,28 +465,66 @@
 	trybots.jCache.AddJobs([]*types.Job{j})
 	expectErr := "fail"
 	MockJobSuccess_Failed(mock, j, now, false, expectErr)
-	require.EqualError(t, trybots.jobFinished(j), expectErr)
+	require.EqualError(t, trybots.jobFinished(ctx, j), expectErr)
 	require.True(t, mock.Empty(), mock.List())
 }
 
-func TestJobFinished_JobFailed_UpdateSucceeds(t *testing.T) {
-	ctx, trybots, mock, _ := setup(t)
+func TestJobFinishedV2_JobSucceeded_UpdateFails(t *testing.T) {
+	ctx, trybots, _, mockBB, _ := setup(t)
 
-	j := tryjob(ctx, repoUrl)
+	j := tryjobV2(ctx, repoUrl)
+	now := time.Date(2021, time.April, 27, 0, 0, 0, 0, time.UTC)
+	j.Status = types.JOB_STATUS_SUCCESS
+	j.Finished = now
+	require.NoError(t, trybots.db.PutJobs(ctx, []*types.Job{j}))
+	trybots.jCache.AddJobs([]*types.Job{j})
+	mockBB.On("UpdateBuild", testutils.AnyContext, &buildbucketpb.Build{
+		Id: j.BuildbucketBuildId,
+		Output: &buildbucketpb.Build_Output{
+			Status: buildbucketpb.Status_SUCCESS,
+		},
+	}, j.BuildbucketToken).Return(errors.New("failed"))
+	require.ErrorContains(t, trybots.jobFinished(ctx, j), "failed")
+	mockBB.AssertExpectations(t)
+}
+
+func TestJobFinishedV1_JobFailed_UpdateSucceeds(t *testing.T) {
+	ctx, trybots, mock, _, _ := setup(t)
+
+	j := tryjobV1(ctx, repoUrl)
 	now := time.Date(2021, time.April, 27, 0, 0, 0, 0, time.UTC)
 	j.Status = types.JOB_STATUS_FAILURE
 	j.Finished = now
 	require.NoError(t, trybots.db.PutJobs(ctx, []*types.Job{j}))
 	trybots.jCache.AddJobs([]*types.Job{j})
 	MockJobFailure(mock, j, now)
-	require.NoError(t, trybots.jobFinished(j))
+	require.NoError(t, trybots.jobFinished(ctx, j))
 	require.True(t, mock.Empty(), mock.List())
 }
 
-func TestJobFinished_JobFailed_UpdateFails(t *testing.T) {
-	ctx, trybots, mock, _ := setup(t)
+func TestJobFinishedV2_JobFailed_UpdateSucceeds(t *testing.T) {
+	ctx, trybots, _, mockBB, _ := setup(t)
 
-	j := tryjob(ctx, repoUrl)
+	j := tryjobV2(ctx, repoUrl)
+	now := time.Date(2021, time.April, 27, 0, 0, 0, 0, time.UTC)
+	j.Status = types.JOB_STATUS_FAILURE
+	j.Finished = now
+	require.NoError(t, trybots.db.PutJobs(ctx, []*types.Job{j}))
+	trybots.jCache.AddJobs([]*types.Job{j})
+	mockBB.On("UpdateBuild", testutils.AnyContext, &buildbucketpb.Build{
+		Id: j.BuildbucketBuildId,
+		Output: &buildbucketpb.Build_Output{
+			Status: buildbucketpb.Status_FAILURE,
+		},
+	}, j.BuildbucketToken).Return(nil)
+	require.NoError(t, trybots.jobFinished(ctx, j))
+	mockBB.AssertExpectations(t)
+}
+
+func TestJobFinishedV1_JobFailed_UpdateFails(t *testing.T) {
+	ctx, trybots, mock, _, _ := setup(t)
+
+	j := tryjobV1(ctx, repoUrl)
 	now := time.Date(2021, time.April, 27, 0, 0, 0, 0, time.UTC)
 	j.Status = types.JOB_STATUS_FAILURE
 	j.Finished = now
@@ -305,28 +532,66 @@
 	trybots.jCache.AddJobs([]*types.Job{j})
 	expectErr := "fail"
 	MockJobFailure_Failed(mock, j, now, expectErr)
-	require.EqualError(t, trybots.jobFinished(j), expectErr)
+	require.EqualError(t, trybots.jobFinished(ctx, j), expectErr)
 	require.True(t, mock.Empty(), mock.List())
 }
 
-func TestJobFinished_JobMishap_UpdateSucceeds(t *testing.T) {
-	ctx, trybots, mock, _ := setup(t)
+func TestJobFinishedV2_JobFailed_UpdateFails(t *testing.T) {
+	ctx, trybots, _, mockBB, _ := setup(t)
 
-	j := tryjob(ctx, repoUrl)
+	j := tryjobV2(ctx, repoUrl)
+	now := time.Date(2021, time.April, 27, 0, 0, 0, 0, time.UTC)
+	j.Status = types.JOB_STATUS_FAILURE
+	j.Finished = now
+	require.NoError(t, trybots.db.PutJobs(ctx, []*types.Job{j}))
+	trybots.jCache.AddJobs([]*types.Job{j})
+	mockBB.On("UpdateBuild", testutils.AnyContext, &buildbucketpb.Build{
+		Id: j.BuildbucketBuildId,
+		Output: &buildbucketpb.Build_Output{
+			Status: buildbucketpb.Status_FAILURE,
+		},
+	}, j.BuildbucketToken).Return(errors.New("failed"))
+	require.ErrorContains(t, trybots.jobFinished(ctx, j), "failed")
+	mockBB.AssertExpectations(t)
+}
+
+func TestJobFinishedV1_JobMishap_UpdateSucceeds(t *testing.T) {
+	ctx, trybots, mock, _, _ := setup(t)
+
+	j := tryjobV1(ctx, repoUrl)
 	now := time.Date(2021, time.April, 27, 0, 0, 0, 0, time.UTC)
 	j.Status = types.JOB_STATUS_MISHAP
 	j.Finished = now
 	require.NoError(t, trybots.db.PutJobs(ctx, []*types.Job{j}))
 	trybots.jCache.AddJobs([]*types.Job{j})
 	MockJobMishap(mock, j, now)
-	require.NoError(t, trybots.jobFinished(j))
+	require.NoError(t, trybots.jobFinished(ctx, j))
 	require.True(t, mock.Empty(), mock.List())
 }
 
-func TestJobFinished_JobMishap_UpdateFails(t *testing.T) {
-	ctx, trybots, mock, _ := setup(t)
+func TestJobFinishedV2_JobMishap_UpdateSucceeds(t *testing.T) {
+	ctx, trybots, _, mockBB, _ := setup(t)
 
-	j := tryjob(ctx, repoUrl)
+	j := tryjobV2(ctx, repoUrl)
+	now := time.Date(2021, time.April, 27, 0, 0, 0, 0, time.UTC)
+	j.Status = types.JOB_STATUS_MISHAP
+	j.Finished = now
+	require.NoError(t, trybots.db.PutJobs(ctx, []*types.Job{j}))
+	trybots.jCache.AddJobs([]*types.Job{j})
+	mockBB.On("UpdateBuild", testutils.AnyContext, &buildbucketpb.Build{
+		Id: j.BuildbucketBuildId,
+		Output: &buildbucketpb.Build_Output{
+			Status: buildbucketpb.Status_INFRA_FAILURE,
+		},
+	}, j.BuildbucketToken).Return(nil)
+	require.NoError(t, trybots.jobFinished(ctx, j))
+	mockBB.AssertExpectations(t)
+}
+
+func TestJobFinishedV1_JobMishap_UpdateFails(t *testing.T) {
+	ctx, trybots, mock, _, _ := setup(t)
+
+	j := tryjobV1(ctx, repoUrl)
 	now := time.Date(2021, time.April, 27, 0, 0, 0, 0, time.UTC)
 	j.Status = types.JOB_STATUS_MISHAP
 	j.Finished = now
@@ -334,10 +599,29 @@
 	trybots.jCache.AddJobs([]*types.Job{j})
 	expectErr := "fail"
 	MockJobMishap_Failed(mock, j, now, expectErr)
-	require.EqualError(t, trybots.jobFinished(j), expectErr)
+	require.EqualError(t, trybots.jobFinished(ctx, j), expectErr)
 	require.True(t, mock.Empty(), mock.List())
 }
 
+func TestJobFinishedV2_JobMishap_UpdateFails(t *testing.T) {
+	ctx, trybots, _, mockBB, _ := setup(t)
+
+	j := tryjobV2(ctx, repoUrl)
+	now := time.Date(2021, time.April, 27, 0, 0, 0, 0, time.UTC)
+	j.Status = types.JOB_STATUS_MISHAP
+	j.Finished = now
+	require.NoError(t, trybots.db.PutJobs(ctx, []*types.Job{j}))
+	trybots.jCache.AddJobs([]*types.Job{j})
+	mockBB.On("UpdateBuild", testutils.AnyContext, &buildbucketpb.Build{
+		Id: j.BuildbucketBuildId,
+		Output: &buildbucketpb.Build_Output{
+			Status: buildbucketpb.Status_INFRA_FAILURE,
+		},
+	}, j.BuildbucketToken).Return(errors.New("failed"))
+	require.ErrorContains(t, trybots.jobFinished(ctx, j), "failed")
+	mockBB.AssertExpectations(t)
+}
+
 type addedJobs map[string]*types.Job
 
 func (aj addedJobs) getAddedJob(ctx context.Context, t *testing.T, d db.JobReader) *types.Job {
@@ -352,8 +636,8 @@
 	return nil
 }
 
-func TestInsertNewJob_LeaseSucceeds_StatusIsRequested(t *testing.T) {
-	ctx, trybots, mock, mockBB := setup(t)
+func TestInsertNewJobV1_LeaseSucceeds_StatusIsRequested(t *testing.T) {
+	ctx, trybots, mock, mockBB, _ := setup(t)
 
 	now := time.Date(2021, time.April, 27, 0, 0, 0, 0, time.UTC)
 	aj := addedJobs(map[string]*types.Job{})
@@ -364,7 +648,7 @@
 	b1 := Build(t, now)
 	mockBB.On("GetBuild", ctx, b1.Id).Return(b1, nil)
 	MockTryLeaseBuild(mock, b1.Id)
-	err := trybots.insertNewJob(ctx, b1.Id)
+	err := trybots.insertNewJobV1(ctx, b1.Id)
 	require.NoError(t, err)
 	require.True(t, mock.Empty(), mock.List())
 	result := aj.getAddedJob(ctx, t, trybots.db)
@@ -380,8 +664,8 @@
 	require.True(t, result.RepoState.Valid())
 }
 
-func TestInsertNewJob_NoGerritChanges_BuildIsCanceled(t *testing.T) {
-	ctx, trybots, mock, mockBB := setup(t)
+func TestInsertNewJobV1_NoGerritChanges_BuildIsCanceled(t *testing.T) {
+	ctx, trybots, mock, mockBB, _ := setup(t)
 
 	now := time.Date(2021, time.April, 27, 0, 0, 0, 0, time.UTC)
 	aj := addedJobs(map[string]*types.Job{})
@@ -390,15 +674,15 @@
 	b2.Input.GerritChanges = nil
 	MockCancelBuild(mock, b2.Id, fmt.Sprintf("Invalid Build %d: input should have exactly one GerritChanges: ", b2.Id))
 	mockBB.On("GetBuild", ctx, b2.Id).Return(b2, nil)
-	err := trybots.insertNewJob(ctx, b2.Id)
+	err := trybots.insertNewJobV1(ctx, b2.Id)
 	require.NoError(t, err) // We don't report errors for bad data from buildbucket.
 	result := aj.getAddedJob(ctx, t, trybots.db)
 	require.Nil(t, result)
 	require.True(t, mock.Empty(), mock.List())
 }
 
-func TestInsertNewJob_InvalidRepo_BuildIsCanceled(t *testing.T) {
-	ctx, trybots, mock, mockBB := setup(t)
+func TestInsertNewJobV1_InvalidRepo_BuildIsCanceled(t *testing.T) {
+	ctx, trybots, mock, mockBB, _ := setup(t)
 
 	now := time.Date(2021, time.April, 27, 0, 0, 0, 0, time.UTC)
 	aj := addedJobs(map[string]*types.Job{})
@@ -407,15 +691,15 @@
 	b3.Input.GerritChanges[0].Project = "bogus-repo"
 	MockCancelBuild(mock, b3.Id, `Unknown patch project \\\"bogus-repo\\\"`)
 	mockBB.On("GetBuild", ctx, b3.Id).Return(b3, nil)
-	err := trybots.insertNewJob(ctx, b3.Id)
+	err := trybots.insertNewJobV1(ctx, b3.Id)
 	require.NoError(t, err) // We don't report errors for bad data from buildbucket.
 	result := aj.getAddedJob(ctx, t, trybots.db)
 	require.Nil(t, result)
 	require.True(t, mock.Empty(), mock.List())
 }
 
-func TestInsertNewJob_LeaseFailed_BuildIsCanceled(t *testing.T) {
-	ctx, trybots, mock, mockBB := setup(t)
+func TestInsertNewJobV1_LeaseFailed_BuildIsCanceled(t *testing.T) {
+	ctx, trybots, mock, mockBB, _ := setup(t)
 
 	now := time.Date(2021, time.April, 27, 0, 0, 0, 0, time.UTC)
 	aj := addedJobs(map[string]*types.Job{})
@@ -425,15 +709,15 @@
 	expectErr := "Can't lease this!"
 	MockTryLeaseBuildFailed(mock, b4.Id, expectErr)
 	MockCancelBuild(mock, b4.Id, `Buildbucket refused lease with \\\"Can't lease this!\\\"`)
-	err := trybots.insertNewJob(ctx, b4.Id)
+	err := trybots.insertNewJobV1(ctx, b4.Id)
 	require.NoError(t, err) // We don't report errors for bad data from buildbucket.
 	result := aj.getAddedJob(ctx, t, trybots.db)
 	require.Nil(t, result)
 	require.True(t, mock.Empty(), mock.List())
 }
 
-func TestStartJob_NormalJob_Succeeds(t *testing.T) {
-	ctx, trybots, mock, mockBB := setup(t)
+func TestStartJobV1_NormalJob_Succeeds(t *testing.T) {
+	ctx, trybots, mock, mockBB, _ := setup(t)
 
 	mockGetChangeInfo(t, mock, gerritIssue, patchProject, git.MainBranch)
 	now := time.Date(2021, time.April, 27, 0, 0, 0, 0, time.UTC)
@@ -441,21 +725,41 @@
 	b1 := Build(t, now)
 	mockBB.On("GetBuild", ctx, b1.Id).Return(b1, nil)
 	MockTryLeaseBuild(mock, b1.Id)
-	require.NoError(t, trybots.insertNewJob(ctx, b1.Id))
+	require.NoError(t, trybots.insertNewJobV1(ctx, b1.Id))
 	j1 := aj.getAddedJob(ctx, t, trybots.db)
 	require.Empty(t, j1.Revision) // Revision isn't set until startJob runs.
 
 	MockJobStarted(mock, b1.Id)
-	err := trybots.startJob(ctx, j1)
-	require.NoError(t, err)
+	require.NoError(t, trybots.startJob(ctx, j1))
 	require.True(t, mock.Empty(), mock.List())
 	require.Equal(t, j1.BuildbucketBuildId, b1.Id)
 	require.NotEqual(t, "", j1.BuildbucketLeaseKey)
 	require.Equal(t, commit2.Hash, j1.Revision)
 }
 
-func TestStartJob_RevisionAlreadySet_Succeeds(t *testing.T) {
-	ctx, trybots, mock, mockBB := setup(t)
+func TestStartJobV2_NormalJob_Succeeds(t *testing.T) {
+	ctx, trybots, mock, mockBB, _ := setup(t)
+
+	j1 := tryjobV2(ctx, repoUrl)
+	j1.Revision = "" // No revision is set initially; it's derived in startJob.
+	j1.Status = types.JOB_STATUS_REQUESTED
+	require.NoError(t, trybots.db.PutJob(ctx, j1))
+	oldToken := j1.BuildbucketToken
+	mockGetChangeInfo(t, mock, gerritIssue, patchProject, git.MainBranch)
+	mockBB.On("StartBuild", testutils.AnyContext, j1.BuildbucketBuildId, j1.Id, j1.BuildbucketToken).Return(bbFakeUpdateToken, nil)
+	require.NoError(t, trybots.startJob(ctx, j1))
+	j1, err := trybots.db.GetJobById(ctx, j1.Id)
+	require.NoError(t, err)
+	require.Equal(t, commit2.Hash, j1.Revision)
+	require.Equal(t, types.JOB_STATUS_IN_PROGRESS, j1.Status)
+	// Start token is exchanged for an update token.
+	require.NotEmpty(t, j1.BuildbucketToken)
+	require.NotEqual(t, oldToken, j1.BuildbucketToken)
+	mockBB.AssertExpectations(t)
+}
+
+func TestStartJobV1_RevisionAlreadySet_Succeeds(t *testing.T) {
+	ctx, trybots, mock, mockBB, _ := setup(t)
 
 	mockGetChangeInfo(t, mock, gerritIssue, patchProject, git.MainBranch)
 	now := time.Date(2021, time.April, 27, 0, 0, 0, 0, time.UTC)
@@ -463,21 +767,43 @@
 	b1 := Build(t, now)
 	mockBB.On("GetBuild", ctx, b1.Id).Return(b1, nil)
 	MockTryLeaseBuild(mock, b1.Id)
-	require.NoError(t, trybots.insertNewJob(ctx, b1.Id))
+	require.NoError(t, trybots.insertNewJobV1(ctx, b1.Id))
 	j1 := aj.getAddedJob(ctx, t, trybots.db)
 	j1.Revision = oldBranchName // We'll resolve this to the actual hash.
 
 	MockJobStarted(mock, b1.Id)
-	err := trybots.startJob(ctx, j1)
-	require.NoError(t, err)
+	require.NoError(t, trybots.startJob(ctx, j1))
 	require.True(t, mock.Empty(), mock.List())
 	require.Equal(t, j1.BuildbucketBuildId, b1.Id)
 	require.NotEqual(t, "", j1.BuildbucketLeaseKey)
 	require.Equal(t, commit1.Hash, j1.Revision) // Ensure we resolved the branch
 }
 
-func TestStartJob_NormalJob_Failed(t *testing.T) {
-	ctx, trybots, mock, mockBB := setup(t)
+func TestStartJobV2_RevisionAlreadySet_Succeeds(t *testing.T) {
+	ctx, trybots, mock, mockBB, _ := setup(t)
+
+	j1 := tryjobV2(ctx, repoUrl)
+	// Set revision to a different value; ensure we don't override.
+	j1.Revision = commit1.Hash
+	j1.Status = types.JOB_STATUS_REQUESTED
+	require.NoError(t, trybots.db.PutJob(ctx, j1))
+	oldToken := j1.BuildbucketToken
+	mockGetChangeInfo(t, mock, gerritIssue, patchProject, git.MainBranch)
+	mockBB.On("StartBuild", testutils.AnyContext, j1.BuildbucketBuildId, j1.Id, j1.BuildbucketToken).Return(bbFakeUpdateToken, nil)
+	require.NoError(t, trybots.startJob(ctx, j1))
+	j1, err := trybots.db.GetJobById(ctx, j1.Id)
+	require.NoError(t, err)
+	require.Equal(t, commit1.Hash, j1.Revision)
+	require.Equal(t, types.JOB_STATUS_IN_PROGRESS, j1.Status)
+	// Start token is exchanged for an update token.
+	require.NotEmpty(t, j1.BuildbucketToken)
+	require.NotEqual(t, oldToken, j1.BuildbucketToken)
+	require.True(t, j1.Valid())
+	mockBB.AssertExpectations(t)
+}
+
+func TestStartJobV1_NormalJob_Failed(t *testing.T) {
+	ctx, trybots, mock, mockBB, _ := setup(t)
 
 	mockGetChangeInfo(t, mock, gerritIssue, patchProject, git.MainBranch)
 	now := time.Date(2021, time.April, 27, 0, 0, 0, 0, time.UTC)
@@ -485,22 +811,44 @@
 	b1 := Build(t, now)
 	mockBB.On("GetBuild", ctx, b1.Id).Return(b1, nil)
 	MockTryLeaseBuild(mock, b1.Id)
-	require.NoError(t, trybots.insertNewJob(ctx, b1.Id))
+	require.NoError(t, trybots.insertNewJobV1(ctx, b1.Id))
 	j1 := aj.getAddedJob(ctx, t, trybots.db)
 
 	expectErr := "Can't start this build!"
-	MockJobStartedFailed(mock, b1.Id, expectErr)
+	MockJobStartedFailed(mock, b1.Id, expectErr, "INVALID_INPUT")
 	err := trybots.startJob(ctx, j1)
 	require.ErrorContains(t, err, expectErr)
 	require.True(t, mock.Empty(), mock.List())
-	updatedJ1, err := trybots.jCache.GetJob(j1.Id)
+	j1, err = trybots.jCache.GetJob(j1.Id)
 	require.NoError(t, err)
-	require.Equal(t, types.JOB_STATUS_CANCELED, updatedJ1.Status)
-	// TODO(borenet): Add a field to Job to give more details and check it here.
+	require.Equal(t, types.JOB_STATUS_CANCELED, j1.Status)
+	require.Contains(t, j1.StatusDetails, "INVALID_INPUT")
 }
 
-func TestStartJob_InvalidJobSpec_Failed(t *testing.T) {
-	ctx, trybots, mock, mockBB := setup(t)
+func TestStartJobV2_NormalJob_Failed(t *testing.T) {
+	ctx, trybots, mock, mockBB, _ := setup(t)
+
+	j1 := tryjobV2(ctx, repoUrl)
+	j1.Revision = "" // No revision is set initially; it's derived in startJob.
+	j1.Status = types.JOB_STATUS_REQUESTED
+	require.NoError(t, trybots.db.PutJob(ctx, j1))
+	oldToken := j1.BuildbucketToken
+	mockGetChangeInfo(t, mock, gerritIssue, patchProject, git.MainBranch)
+	mockBB.On("StartBuild", testutils.AnyContext, j1.BuildbucketBuildId, j1.Id, j1.BuildbucketToken).Return("", errors.New("can't start this build"))
+	err := trybots.startJob(ctx, j1)
+	require.ErrorContains(t, err, "can't start this build")
+	require.ErrorContains(t, err, "failed to send job-started notification")
+	j1, err = trybots.db.GetJobById(ctx, j1.Id)
+	require.NoError(t, err)
+	require.Empty(t, j1.Revision)
+	require.Equal(t, types.JOB_STATUS_REQUESTED, j1.Status)
+	require.NotEmpty(t, j1.BuildbucketToken)
+	require.Equal(t, oldToken, j1.BuildbucketToken)
+	mockBB.AssertExpectations(t)
+}
+
+func TestStartJobV1_InvalidJobSpec_Failed(t *testing.T) {
+	ctx, trybots, mock, mockBB, _ := setup(t)
 
 	mockGetChangeInfo(t, mock, gerritIssue, patchProject, git.MainBranch)
 	now := time.Date(2021, time.April, 27, 0, 0, 0, 0, time.UTC)
@@ -510,7 +858,7 @@
 	b2.Builder.Builder = "bogus-job"
 	mockBB.On("GetBuild", ctx, b2.Id).Return(b2, nil)
 	MockTryLeaseBuild(mock, b2.Id)
-	require.NoError(t, trybots.insertNewJob(ctx, b2.Id))
+	require.NoError(t, trybots.insertNewJobV1(ctx, b2.Id))
 	j2 := aj.getAddedJob(ctx, t, trybots.db)
 	mockBB.On("GetBuild", ctx, b2.Id).Return(b2, nil)
 	err := trybots.startJob(ctx, j2)
@@ -519,7 +867,27 @@
 	j2, err = trybots.jCache.GetJob(j2.Id)
 	require.NoError(t, err)
 	require.Equal(t, types.JOB_STATUS_MISHAP, j2.Status)
-	// TODO(borenet): Add a field to Job to give more details and check it here.
+	require.Contains(t, j2.StatusDetails, "Failed to start Job: no such job: bogus-job")
+}
+
+func TestStartJobV2_InvalidJobSpec_Failed(t *testing.T) {
+	ctx, trybots, mock, mockBB, _ := setup(t)
+
+	j1 := tryjobV2(ctx, repoUrl)
+	j1.Name = "bogus-job"
+	j1.Revision = "" // No revision is set initially; it's derived in startJob.
+	j1.Status = types.JOB_STATUS_REQUESTED
+	require.NoError(t, trybots.db.PutJob(ctx, j1))
+	mockGetChangeInfo(t, mock, gerritIssue, patchProject, git.MainBranch)
+	require.NoError(t, trybots.startJob(ctx, j1))
+	j1, err := trybots.db.GetJobById(ctx, j1.Id)
+	require.NoError(t, err)
+	require.Equal(t, commit2.Hash, j1.Revision)
+	require.Equal(t, types.JOB_STATUS_MISHAP, j1.Status)
+	require.Equal(t, bbFakeStartToken, j1.BuildbucketToken)
+	require.True(t, j1.Valid())
+	require.Contains(t, j1.StatusDetails, "Failed to start Job: no such job: bogus-job")
+	mockBB.AssertExpectations(t)
 }
 
 func mockGetChangeInfo(t *testing.T, mock *mockhttpclient.URLMock, id int, project, branch string) {
@@ -534,8 +902,8 @@
 	mock.Mock(fmt.Sprintf("%s/a%s", fakeGerritUrl, fmt.Sprintf(gerrit.URLTmplChange, ci.Id)), mockhttpclient.MockGetDialogue(issueBytes))
 }
 
-func TestRetry(t *testing.T) {
-	ctx, trybots, mock, mockBB := setup(t)
+func TestRetryV1(t *testing.T) {
+	ctx, trybots, mock, mockBB, _ := setup(t)
 
 	mockGetChangeInfo(t, mock, gerritIssue, patchProject, git.MainBranch)
 
@@ -547,7 +915,7 @@
 	MockTryLeaseBuild(mock, b1.Id)
 	MockJobStarted(mock, b1.Id)
 	mockBB.On("GetBuild", ctx, b1.Id).Return(b1, nil)
-	require.NoError(t, trybots.insertNewJob(ctx, b1.Id))
+	require.NoError(t, trybots.insertNewJobV1(ctx, b1.Id))
 	j1 := aj.getAddedJob(ctx, t, trybots.db)
 	require.NoError(t, trybots.startJob(ctx, j1))
 	require.True(t, mock.Empty(), mock.List())
@@ -564,14 +932,47 @@
 	MockTryLeaseBuild(mock, b2.Id)
 	MockJobStarted(mock, b2.Id)
 	mockBB.On("GetBuild", ctx, b2.Id).Return(b2, nil)
-	require.NoError(t, trybots.insertNewJob(ctx, b2.Id))
+	require.NoError(t, trybots.insertNewJobV1(ctx, b2.Id))
 	j2 := aj.getAddedJob(ctx, t, trybots.db)
 	require.NoError(t, trybots.startJob(ctx, j2))
 	require.True(t, mock.Empty(), mock.List())
 	require.Equal(t, j2.BuildbucketBuildId, b2.Id)
 	require.NotEqual(t, "", j2.BuildbucketLeaseKey)
 	require.True(t, j2.Valid())
+}
+
+func TestRetryV2(t *testing.T) {
+	ctx, trybots, mock, mockBB, _ := setup(t)
+
+	mockGetChangeInfo(t, mock, gerritIssue, patchProject, git.MainBranch)
+
+	// Insert one try job.
+	j1 := tryjobV2(ctx, repoUrl)
+	j1.Revision = "" // No revision is set initially; it's derived in startJob.
+	j1.Status = types.JOB_STATUS_REQUESTED
+	require.NoError(t, trybots.db.PutJob(ctx, j1))
+	mockGetChangeInfo(t, mock, gerritIssue, patchProject, git.MainBranch)
+	mockBB.On("StartBuild", testutils.AnyContext, j1.BuildbucketBuildId, j1.Id, j1.BuildbucketToken).Return(bbFakeUpdateToken, nil)
+	require.NoError(t, trybots.startJob(ctx, j1))
+	mockBB.AssertExpectations(t)
+
+	// Obtain a second try job, ensure that it gets IsForce = true.
+	j2 := tryjobV2(ctx, repoUrl)
+	j2.Revision = "" // No revision is set initially; it's derived in startJob.
+	j2.Status = types.JOB_STATUS_REQUESTED
+	require.NoError(t, trybots.db.PutJob(ctx, j2))
+	mockGetChangeInfo(t, mock, gerritIssue, patchProject, git.MainBranch)
+	mockBB.On("StartBuild", testutils.AnyContext, j2.BuildbucketBuildId, j2.Id, j2.BuildbucketToken).Return(bbFakeUpdateToken, nil)
+	require.NoError(t, trybots.startJob(ctx, j2))
+	j2, err := trybots.db.GetJobById(ctx, j2.Id)
+	require.NoError(t, err)
+	require.Equal(t, commit2.Hash, j2.Revision)
+	require.Equal(t, types.JOB_STATUS_IN_PROGRESS, j2.Status)
+	// Start token is exchanged for an update token.
+	require.NotEmpty(t, j2.BuildbucketToken)
 	require.True(t, j2.IsForce)
+	require.True(t, j2.Valid())
+	mockBB.AssertExpectations(t)
 }
 
 func testPollAssertAdded(t *testing.T, now time.Time, trybots *TryJobIntegrator, builds []*buildbucketpb.Build) {
@@ -617,7 +1018,7 @@
 }
 
 func TestPoll_OneNewBuild_Success(t *testing.T) {
-	_, trybots, mock, mockBB := setup(t)
+	_, trybots, mock, mockBB, _ := setup(t)
 	mockGetChangeInfo(t, mock, gerritIssue, patchProject, git.MainBranch)
 	now := time.Date(2021, time.April, 27, 0, 0, 0, 0, time.UTC)
 
@@ -625,7 +1026,7 @@
 }
 
 func TestPoll_MultipleNewBuilds_Success(t *testing.T) {
-	_, trybots, mock, mockBB := setup(t)
+	_, trybots, mock, mockBB, _ := setup(t)
 	mockGetChangeInfo(t, mock, gerritIssue, patchProject, git.MainBranch)
 	now := time.Date(2021, time.April, 27, 0, 0, 0, 0, time.UTC)
 
@@ -633,7 +1034,7 @@
 }
 
 func TestPoll_MultiplePagesOfNewBuilds_Success(t *testing.T) {
-	_, trybots, mock, mockBB := setup(t)
+	_, trybots, mock, mockBB, _ := setup(t)
 	mockGetChangeInfo(t, mock, gerritIssue, patchProject, git.MainBranch)
 	now := time.Date(2021, time.April, 27, 0, 0, 0, 0, time.UTC)
 
@@ -648,7 +1049,7 @@
 }
 
 func TestPoll_MultipleNewBuilds_OneFailsInsert_OthersInsertSuccessfully(t *testing.T) {
-	_, trybots, mock, mockBB := setup(t)
+	_, trybots, mock, mockBB, _ := setup(t)
 	mockGetChangeInfo(t, mock, gerritIssue, patchProject, git.MainBranch)
 	now := time.Date(2021, time.April, 27, 0, 0, 0, 0, time.UTC)
 
@@ -668,7 +1069,7 @@
 }
 
 func TestPoll_MultiplePagesOfNewBuilds_OnePeekFails_OthersInsertSuccessfully(t *testing.T) {
-	_, trybots, mock, mockBB := setup(t)
+	_, trybots, mock, mockBB, _ := setup(t)
 	mockGetChangeInfo(t, mock, gerritIssue, patchProject, git.MainBranch)
 	now := time.Date(2021, time.April, 27, 0, 0, 0, 0, time.UTC)
 
diff --git a/task_scheduler/go/tryjobs/utils_test.go b/task_scheduler/go/tryjobs/utils_test.go
index d2bcaf3..0d562ff 100644
--- a/task_scheduler/go/tryjobs/utils_test.go
+++ b/task_scheduler/go/tryjobs/utils_test.go
@@ -20,6 +20,7 @@
 	"go.skia.org/infra/go/git/repograph"
 	"go.skia.org/infra/go/mockhttpclient"
 	"go.skia.org/infra/go/now"
+	pubsub_mocks "go.skia.org/infra/go/pubsub/mocks"
 	"go.skia.org/infra/go/sklog"
 	"go.skia.org/infra/go/sktest"
 	"go.skia.org/infra/go/testutils"
@@ -41,8 +42,12 @@
 	patchProject   = "skia"
 	parentProject  = "parent-project"
 
-	fakeGerritUrl = "https://fake-skia-review.googlesource.com"
-	oldBranchName = "old-branch"
+	fakeGerritUrl     = "https://fake-skia-review.googlesource.com"
+	oldBranchName     = "old-branch"
+	bbPubSubProject   = "fake-bb-pubsub-project"
+	bbPubSubTopic     = "fake-bb-pubsub-topic"
+	bbFakeStartToken  = "fake-bb-start-token"
+	bbFakeUpdateToken = "fake-bb-update-token"
 )
 
 var (
@@ -93,7 +98,7 @@
 
 // setup prepares the tests to run. Returns the created temporary dir,
 // TryJobIntegrator instance, and URLMock instance.
-func setup(t sktest.TestingT) (context.Context, *TryJobIntegrator, *mockhttpclient.URLMock, *mocks.BuildBucketInterface) {
+func setup(t sktest.TestingT) (context.Context, *TryJobIntegrator, *mockhttpclient.URLMock, *mocks.BuildBucketInterface, *pubsub_mocks.Topic) {
 	ctx := context.WithValue(context.Background(), now.ContextKey, ts)
 
 	// Set up other TryJobIntegrator inputs.
@@ -130,9 +135,12 @@
 	require.NoError(t, err)
 	jCache, err := cache.NewJobCache(ctx, d, window, nil)
 	require.NoError(t, err)
-	integrator, err := NewTryJobIntegrator(API_URL_TESTING, BUCKET_TESTING, "fake-server", mock.Client(), d, jCache, projectRepoMapping, rm, taskCfgCache, chr, g)
+	pubsubClient := &pubsub_mocks.Client{}
+	pubsubTopic := &pubsub_mocks.Topic{}
+	pubsubClient.On("Topic", bbPubSubTopic).Return(pubsubTopic, nil)
+	integrator, err := NewTryJobIntegrator(ctx, API_URL_TESTING, "fake-bb-target", BUCKET_TESTING, "fake-server", mock.Client(), d, jCache, projectRepoMapping, rm, taskCfgCache, chr, g, pubsubClient)
 	require.NoError(t, err)
-	return ctx, integrator, mock, MockBuildbucket(integrator)
+	return ctx, integrator, mock, MockBuildbucket(integrator), pubsubTopic
 }
 
 func MockBuildbucket(tj *TryJobIntegrator) *mocks.BuildBucketInterface {
@@ -171,24 +179,24 @@
 	}
 }
 
-func tryjob(ctx context.Context, repoName string) *types.Job {
+func tryjobV1(ctx context.Context, repoName string) *types.Job {
 	return &types.Job{
 		BuildbucketBuildId:  rand.Int63(),
 		BuildbucketLeaseKey: rand.Int63(),
 		Created:             now.Now(ctx),
-		Name:                "fake-name",
-		RepoState: types.RepoState{
-			Patch: types.Patch{
-				Server:   "fake-server",
-				Issue:    "fake-issue",
-				Patchset: "fake-patchset",
-			},
-			Repo:     repoName,
-			Revision: "fake-revision",
-		},
+		Name:                tcc_testutils.BuildTaskName,
+		RepoState:           repoState2,
 	}
 }
 
+func tryjobV2(ctx context.Context, repoName string) *types.Job {
+	job := tryjobV1(ctx, repoName)
+	job.BuildbucketLeaseKey = 0
+	job.BuildbucketToken = bbFakeStartToken
+	job.BuildbucketPubSubTopic = bbPubSubTopic
+	return job
+}
+
 type errMsg struct {
 	Message string `json:"message"`
 }
@@ -277,15 +285,15 @@
 	// We have to use this because we don't know what the Job ID is going to
 	// be until after it's inserted into the DB.
 	req := mockhttpclient.DONT_CARE_REQUEST
-	resp := []byte("{}")
+	resp := []byte(fmt.Sprintf(`{"build": {},"update_build_token":"%s"}`, bbFakeUpdateToken))
 	mock.MockOnce(fmt.Sprintf("%sbuilds/%d/start?alt=json&prettyPrint=false", API_URL_TESTING, id), mockhttpclient.MockPostDialogue("application/json", req, resp))
 }
 
-func MockJobStartedFailed(mock *mockhttpclient.URLMock, id int64, mockErr string) {
+func MockJobStartedFailed(mock *mockhttpclient.URLMock, id int64, mockErr, mockReason string) {
 	// We have to use this because we don't know what the Job ID is going to
 	// be until after it's inserted into the DB.
 	req := mockhttpclient.DONT_CARE_REQUEST
-	resp := []byte(fmt.Sprintf("{\"error\":{\"message\":\"%s\"}}", mockErr))
+	resp := []byte(fmt.Sprintf(`{"error":{"message":"%s","reason":"%s"}}`, mockErr, mockReason))
 	mock.MockOnce(fmt.Sprintf("%sbuilds/%d/start?alt=json&prettyPrint=false", API_URL_TESTING, id), mockhttpclient.MockPostDialogue("application/json", req, resp))
 }