[task scheduler] Add Buildbucket TaskBackendServer implementation

Bug: b/288158829
Change-Id: Id6949b19ad83fd4bd34fc75ce35f9369fd04b476
Reviewed-on: https://skia-review.googlesource.com/c/buildbot/+/787324
Reviewed-by: Kevin Lubick <kjlubick@google.com>
Commit-Queue: Eric Boren <borenet@google.com>
diff --git a/.mockery.yaml b/.mockery.yaml
index 0eb0f92..88a238c 100644
--- a/.mockery.yaml
+++ b/.mockery.yaml
@@ -307,6 +307,7 @@
       dir: "{{.InterfaceDir}}/../mocks"
     interfaces:
       RemoteDB:
+      JobDB:
   go.skia.org/infra/task_scheduler/go/db/cache:
     interfaces:
       JobCache:
diff --git a/task_scheduler/go/job_creation/buildbucket_taskbackend/BUILD.bazel b/task_scheduler/go/job_creation/buildbucket_taskbackend/BUILD.bazel
new file mode 100644
index 0000000..492ee16
--- /dev/null
+++ b/task_scheduler/go/job_creation/buildbucket_taskbackend/BUILD.bazel
@@ -0,0 +1,39 @@
+load("@io_bazel_rules_go//go:def.bzl", "go_library")
+load("//bazel/go:go_test.bzl", "go_test")
+
+go_library(
+    name = "buildbucket_taskbackend",
+    srcs = ["buildbucket_taskbackend.go"],
+    importpath = "go.skia.org/infra/task_scheduler/go/job_creation/buildbucket_taskbackend",
+    visibility = ["//visibility:public"],
+    deps = [
+        "//go/buildbucket",
+        "//go/firestore",
+        "//go/now",
+        "//go/skerr",
+        "//go/sklog",
+        "//task_scheduler/go/db",
+        "//task_scheduler/go/types",
+        "@org_chromium_go_luci//buildbucket/proto",
+        "@org_golang_google_genproto_googleapis_rpc//status",
+    ],
+)
+
+go_test(
+    name = "buildbucket_taskbackend_test",
+    srcs = ["buildbucket_taskbackend_test.go"],
+    embed = [":buildbucket_taskbackend"],
+    deps = [
+        "//go/buildbucket/mocks",
+        "//go/firestore",
+        "//go/now",
+        "//go/testutils",
+        "//task_scheduler/go/db",
+        "//task_scheduler/go/mocks",
+        "//task_scheduler/go/types",
+        "@com_github_stretchr_testify//require",
+        "@org_chromium_go_luci//buildbucket/proto",
+        "@org_golang_google_genproto_googleapis_rpc//status",
+        "@org_golang_google_protobuf//types/known/timestamppb",
+    ],
+)
diff --git a/task_scheduler/go/job_creation/buildbucket_taskbackend/buildbucket_taskbackend.go b/task_scheduler/go/job_creation/buildbucket_taskbackend/buildbucket_taskbackend.go
new file mode 100644
index 0000000..62f56b3
--- /dev/null
+++ b/task_scheduler/go/job_creation/buildbucket_taskbackend/buildbucket_taskbackend.go
@@ -0,0 +1,273 @@
+package buildbucket_taskbackend
+
+import (
+	"context"
+	"fmt"
+	"net/http"
+	"strconv"
+	"strings"
+
+	buildbucketpb "go.chromium.org/luci/buildbucket/proto"
+	"go.skia.org/infra/go/buildbucket"
+	"go.skia.org/infra/go/firestore"
+	"go.skia.org/infra/go/now"
+	"go.skia.org/infra/go/skerr"
+	"go.skia.org/infra/go/sklog"
+	"go.skia.org/infra/task_scheduler/go/db"
+	"go.skia.org/infra/task_scheduler/go/types"
+	"google.golang.org/genproto/googleapis/rpc/status"
+)
+
+// TaskBackend implements TaskBackendServer in terms of Task Scheduler Jobs.
+type TaskBackend struct {
+	bb2                buildbucket.BuildBucketInterface
+	buildbucketTarget  string
+	db                 db.JobDB
+	projectRepoMapping map[string]string
+	taskSchedulerHost  string
+}
+
+// NewTaskBackend returns a TaskBackend instance.
+func NewTaskBackend(ctx context.Context, buildbucketTarget, taskSchedulerHost string, projectRepoMapping map[string]string, d db.JobDB, bb2 buildbucket.BuildBucketInterface) (*TaskBackend, error) {
+	return &TaskBackend{
+		bb2:                bb2,
+		buildbucketTarget:  buildbucketTarget,
+		db:                 d,
+		projectRepoMapping: projectRepoMapping,
+		taskSchedulerHost:  taskSchedulerHost,
+	}, nil
+}
+
+// RunTask implements TaskBackendServer.
+func (tb *TaskBackend) RunTask(ctx context.Context, req *buildbucketpb.RunTaskRequest) (*buildbucketpb.RunTaskResponse, error) {
+	// Validation.
+	if req.Target != tb.buildbucketTarget {
+		return nil, skerr.Fmt("incorrect target for this scheduler; expected %s", tb.buildbucketTarget)
+	}
+	if req.Secrets == nil {
+		return nil, skerr.Fmt("secrets not set on request")
+	}
+	buildId, err := strconv.ParseInt(req.BuildId, 10, 64)
+	if err != nil {
+		return nil, skerr.Wrapf(err, "invalid build ID")
+	}
+
+	// Look for any Jobs which we might have already created for this Build.
+	duplicates, err := tb.db.SearchJobs(ctx, &db.JobSearchParams{
+		BuildbucketBuildID: &buildId,
+	})
+	if err != nil {
+		return nil, skerr.Wrapf(err, "failed looking for duplicate jobs")
+	}
+	if len(duplicates) > 0 {
+		return &buildbucketpb.RunTaskResponse{
+			Task: JobToBuildbucketTask(ctx, duplicates[0], tb.buildbucketTarget, tb.taskSchedulerHost),
+		}, nil
+	}
+
+	// Get the build details from the v2 API.
+	// TODO(borenet): It would be much better to avoid sending an extra request
+	// back to Buildbucket. It seems like this information should be part of the
+	// RunTaskRequest - maybe it's already in RunTaskRequest.BackendConfig?
+	build, err := tb.bb2.GetBuild(ctx, buildId)
+	if err != nil {
+		return nil, skerr.Wrapf(err, "failed to retrieve build %d", buildId)
+	}
+	if build.Builder == nil {
+		return nil, skerr.Fmt("builder isn't set on build %d", buildId)
+	}
+
+	// Obtain and validate the RepoState.
+	if build.Input == nil || build.Input.GerritChanges == nil || len(build.Input.GerritChanges) != 1 {
+		return nil, skerr.Fmt("invalid Build %d: input should have exactly one GerritChanges: %+v", buildId, build.Input)
+	}
+	gerritChange := build.Input.GerritChanges[0]
+	repoUrl, ok := tb.projectRepoMapping[gerritChange.Project]
+	if !ok {
+		return nil, skerr.Fmt("unknown patch project %q", gerritChange.Project)
+	}
+	server := gerritChange.Host
+	if !strings.Contains(server, "://") {
+		server = fmt.Sprintf("https://%s", server)
+	}
+	rs := types.RepoState{
+		Patch: types.Patch{
+			Server:    server,
+			Issue:     strconv.FormatInt(gerritChange.Change, 10),
+			PatchRepo: repoUrl,
+			Patchset:  strconv.FormatInt(gerritChange.Patchset, 10),
+		},
+		Repo: repoUrl,
+		// We can't fill this out without retrieving the Gerrit ChangeInfo and
+		// resolving the branch to a commit hash. Defer that work until later.
+		Revision: "",
+	}
+
+	// Create the Job.
+	j := &types.Job{
+		Name:                   build.Builder.Builder,
+		BuildbucketBuildId:     buildId,
+		BuildbucketPubSubTopic: req.PubsubTopic,
+		BuildbucketToken:       req.Secrets.StartBuildToken,
+		Requested:              firestore.FixTimestamp(build.CreateTime.AsTime().UTC()),
+		Created:                firestore.FixTimestamp(now.Now(ctx)),
+		RepoState:              rs,
+		Status:                 types.JOB_STATUS_REQUESTED,
+	}
+	if !j.Requested.Before(j.Created) {
+		sklog.Errorf("Try job created time %s is before requested time %s! Setting equal.", j.Created, j.Requested)
+		j.Requested = j.Created.Add(-firestore.TS_RESOLUTION)
+	}
+
+	// Insert the Job into the DB.
+	if err := tb.db.PutJob(ctx, j); err != nil {
+		return nil, skerr.Wrapf(err, "failed to insert Job into the DB")
+	}
+	return &buildbucketpb.RunTaskResponse{
+		Task: JobToBuildbucketTask(ctx, j, tb.buildbucketTarget, tb.taskSchedulerHost),
+	}, nil
+}
+
+// FetchTasks implements TaskBackendServer.
+func (tb *TaskBackend) FetchTasks(ctx context.Context, req *buildbucketpb.FetchTasksRequest) (*buildbucketpb.FetchTasksResponse, error) {
+	resps := make([]*buildbucketpb.FetchTasksResponse_Response, 0, len(req.TaskIds))
+	for _, id := range req.TaskIds {
+		resp := &buildbucketpb.FetchTasksResponse_Response{}
+		if id.Target != tb.buildbucketTarget {
+			resp.Response = &buildbucketpb.FetchTasksResponse_Response_Error{
+				Error: &status.Status{
+					Code:    http.StatusBadRequest,
+					Message: fmt.Sprintf("incorrect target for this scheduler; expected %s", tb.buildbucketTarget),
+				},
+			}
+			resps = append(resps, resp)
+			continue
+		}
+		job, err := tb.db.GetJobById(ctx, id.Id)
+		if err != nil {
+			resp.Response = &buildbucketpb.FetchTasksResponse_Response_Error{
+				Error: &status.Status{
+					Code:    http.StatusInternalServerError,
+					Message: err.Error(),
+				},
+			}
+			resps = append(resps, resp)
+			continue
+		} else if job == nil {
+			resp.Response = &buildbucketpb.FetchTasksResponse_Response_Error{
+				Error: &status.Status{
+					Code:    http.StatusNotFound,
+					Message: "unknown task",
+				},
+			}
+			resps = append(resps, resp)
+			continue
+		}
+		resp.Response = &buildbucketpb.FetchTasksResponse_Response_Task{
+			Task: JobToBuildbucketTask(ctx, job, tb.buildbucketTarget, tb.taskSchedulerHost),
+		}
+		resps = append(resps, resp)
+	}
+	return &buildbucketpb.FetchTasksResponse{
+		Responses: resps,
+	}, nil
+}
+
+// CancelTasks implements TaskBackendServer.
+func (tb *TaskBackend) CancelTasks(ctx context.Context, req *buildbucketpb.CancelTasksRequest) (*buildbucketpb.CancelTasksResponse, error) {
+	// Note: According to the Buildbucket docs, we're supposed to be ensuring
+	// that the tasks are fully canceled (ie. any underlying work is no longer
+	// running) before we return with a "canceled" status. We have the
+	// capability of canceling Swarming tasks, but given the complexity involved
+	// (we might, for example be sharing a given Swarming task between multiple
+	// jobs), I'm not sure we actually want to do that.
+	jobs := make([]*types.Job, 0, len(req.TaskIds))
+	for _, id := range req.TaskIds {
+		if id.Target != tb.buildbucketTarget {
+			return nil, skerr.Fmt("incorrect target for this scheduler; expected %s", tb.buildbucketTarget)
+		}
+		job, err := tb.db.GetJobById(ctx, id.Id)
+		if err != nil {
+			return nil, skerr.Wrap(err)
+		} else if job == nil {
+			return nil, skerr.Fmt("unknown job %q", id.Id)
+		}
+		jobs = append(jobs, job)
+	}
+	updated := make([]*types.Job, 0, len(jobs))
+	for _, job := range jobs {
+		if !job.Done() {
+			job.Status = types.JOB_STATUS_CANCELED
+			job.StatusDetails = "Canceled by Buildbucket"
+			updated = append(updated, job)
+		}
+	}
+	if len(updated) > 0 {
+		if err := tb.db.PutJobs(ctx, updated); err != nil {
+			return nil, skerr.Wrap(err)
+		}
+	}
+	resps := make([]*buildbucketpb.Task, 0, len(req.TaskIds))
+	for _, job := range jobs {
+		resps = append(resps, JobToBuildbucketTask(ctx, job, tb.buildbucketTarget, tb.taskSchedulerHost))
+	}
+	return &buildbucketpb.CancelTasksResponse{
+		Tasks: resps,
+	}, nil
+}
+
+// ValidateConfigs implements TaskBackendServer.
+func (tb *TaskBackend) ValidateConfigs(ctx context.Context, req *buildbucketpb.ValidateConfigsRequest) (*buildbucketpb.ValidateConfigsResponse, error) {
+	// TODO(borenet): I'm not sure what we're actually supposed to be validating
+	// in this method.
+	var errs []*buildbucketpb.ValidateConfigsResponse_ErrorDetail
+	for idx, cfg := range req.Configs {
+		if cfg.Target != tb.buildbucketTarget {
+			errs = append(errs, &buildbucketpb.ValidateConfigsResponse_ErrorDetail{
+				Index: int32(idx),
+				Error: fmt.Sprintf("incorrect target for this scheduler; expected %s", tb.buildbucketTarget),
+			})
+		}
+	}
+	return &buildbucketpb.ValidateConfigsResponse{
+		ConfigErrors: errs,
+	}, nil
+}
+
+// Assert that TaskBackend implements TaskBackendServer.
+var _ buildbucketpb.TaskBackendServer = &TaskBackend{}
+
+// JobStatusToBuildbucketStatus converts a types.JobStatus to a
+// buildbucketpb.Status.
+func JobStatusToBuildbucketStatus(status types.JobStatus) buildbucketpb.Status {
+	switch status {
+	case types.JOB_STATUS_CANCELED:
+		return buildbucketpb.Status_CANCELED
+	case types.JOB_STATUS_FAILURE:
+		return buildbucketpb.Status_FAILURE
+	case types.JOB_STATUS_IN_PROGRESS:
+		return buildbucketpb.Status_STARTED
+	case types.JOB_STATUS_MISHAP:
+		return buildbucketpb.Status_INFRA_FAILURE
+	case types.JOB_STATUS_REQUESTED:
+		return buildbucketpb.Status_SCHEDULED
+	case types.JOB_STATUS_SUCCESS:
+		return buildbucketpb.Status_SUCCESS
+	default:
+		return buildbucketpb.Status_STATUS_UNSPECIFIED
+	}
+}
+
+// JobToBuildbucketTask converts a types.Job to a buildbucketpb.Task.
+func JobToBuildbucketTask(ctx context.Context, job *types.Job, buildbucketTarget, taskSchedulerHost string) *buildbucketpb.Task {
+	return &buildbucketpb.Task{
+		Id: &buildbucketpb.TaskID{
+			Target: buildbucketTarget,
+			Id:     job.Id,
+		},
+		Link:            job.URL(taskSchedulerHost),
+		Status:          JobStatusToBuildbucketStatus(job.Status),
+		SummaryMarkdown: job.StatusDetails,
+		UpdateId:        now.Now(ctx).UnixNano(),
+	}
+}
diff --git a/task_scheduler/go/job_creation/buildbucket_taskbackend/buildbucket_taskbackend_test.go b/task_scheduler/go/job_creation/buildbucket_taskbackend/buildbucket_taskbackend_test.go
new file mode 100644
index 0000000..ef15f76
--- /dev/null
+++ b/task_scheduler/go/job_creation/buildbucket_taskbackend/buildbucket_taskbackend_test.go
@@ -0,0 +1,518 @@
+package buildbucket_taskbackend
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"net/http"
+	"strconv"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/require"
+	buildbucketpb "go.chromium.org/luci/buildbucket/proto"
+	buildbucket_mocks "go.skia.org/infra/go/buildbucket/mocks"
+	"go.skia.org/infra/go/firestore"
+	"go.skia.org/infra/go/now"
+	"go.skia.org/infra/go/testutils"
+	"go.skia.org/infra/task_scheduler/go/db"
+	"go.skia.org/infra/task_scheduler/go/mocks"
+	"go.skia.org/infra/task_scheduler/go/types"
+	"google.golang.org/genproto/googleapis/rpc/status"
+	"google.golang.org/protobuf/types/known/timestamppb"
+)
+
+const (
+	fakeBuildbucketPubsubTopic = "fake-bb-pubsub"
+	fakeBuildbucketTarget      = "skia://fake-scheduler"
+	fakeBuildbucketToken       = "fake-token"
+	fakeBuildIdStr             = "12345"
+	fakeJobName                = "fake-job-name"
+	fakeTaskSchedulerHost      = "https://fake-scheduler"
+	fakeProject                = "fake-project"
+	fakeRepo                   = "https://fake.git"
+	fakeGerritHost             = "fake-project-review.googlesource.com"
+	fakeGerritChange           = 6789
+	fakeGerritPatchset         = 3
+)
+
+var (
+	// This is a var instead of a constant so that we can take its address.
+	fakeBuildIdInt int64 = 12345
+	fakeCreateTime       = firestore.FixTimestamp(time.Unix(1702395110, 0)) // Arbitrary timestamp.
+)
+
+func setup(t *testing.T) (context.Context, *TaskBackend, *mocks.JobDB, *buildbucket_mocks.BuildBucketInterface) {
+	ctx := now.TimeTravelingContext(fakeCreateTime.Add(time.Minute))
+	bb := &buildbucket_mocks.BuildBucketInterface{}
+	projectRepoMapping := map[string]string{
+		fakeProject: fakeRepo,
+	}
+	db := &mocks.JobDB{}
+	tb, err := NewTaskBackend(ctx, fakeBuildbucketTarget, fakeTaskSchedulerHost, projectRepoMapping, db, bb)
+	require.NoError(t, err)
+	t.Cleanup(func() {
+		db.AssertExpectations(t)
+		bb.AssertExpectations(t)
+	})
+	return ctx, tb, db, bb
+}
+
+func fakeBuild() *buildbucketpb.Build {
+	return &buildbucketpb.Build{
+		Id: fakeBuildIdInt,
+		Builder: &buildbucketpb.BuilderID{
+			Project: fakeProject,
+			Builder: fakeJobName,
+		},
+		CreateTime: timestamppb.New(fakeCreateTime),
+		Input: &buildbucketpb.Build_Input{
+			GerritChanges: []*buildbucketpb.GerritChange{
+				{
+					Project:  fakeProject,
+					Host:     fakeGerritHost,
+					Change:   fakeGerritChange,
+					Patchset: fakeGerritPatchset,
+				},
+			},
+		},
+	}
+}
+
+func fakeRunTaskRequest() *buildbucketpb.RunTaskRequest {
+	return &buildbucketpb.RunTaskRequest{
+		BuildId:     fakeBuildIdStr,
+		PubsubTopic: fakeBuildbucketPubsubTopic,
+		Secrets: &buildbucketpb.BuildSecrets{
+			StartBuildToken: fakeBuildbucketToken,
+		},
+		Target: fakeBuildbucketTarget,
+	}
+}
+
+func makeJob(ctx context.Context) *types.Job {
+	return &types.Job{
+		Name:                   fakeJobName,
+		BuildbucketBuildId:     fakeBuildIdInt,
+		BuildbucketPubSubTopic: fakeBuildbucketPubsubTopic,
+		BuildbucketToken:       fakeBuildbucketToken,
+		Requested:              fakeCreateTime,
+		Created:                firestore.FixTimestamp(now.Now(ctx)),
+		RepoState: types.RepoState{
+			Patch: types.Patch{
+				Server:    "https://" + fakeGerritHost,
+				Issue:     strconv.FormatInt(fakeGerritChange, 10),
+				PatchRepo: fakeRepo,
+				Patchset:  strconv.FormatInt(fakeGerritPatchset, 10),
+			},
+			Repo:     fakeRepo,
+			Revision: "",
+		},
+		Status: types.JOB_STATUS_REQUESTED,
+	}
+}
+
+func TestTaskBackend_RunTask_Success(t *testing.T) {
+	ctx, tb, d, bb := setup(t)
+	d.On("SearchJobs", testutils.AnyContext, &db.JobSearchParams{
+		BuildbucketBuildID: &fakeBuildIdInt,
+	}).Return(nil, nil)
+	bb.On("GetBuild", testutils.AnyContext, fakeBuildIdInt).Return(fakeBuild(), nil)
+	expectedJob := makeJob(ctx)
+	d.On("PutJob", testutils.AnyContext, expectedJob).Return(nil)
+	resp, err := tb.RunTask(ctx, fakeRunTaskRequest())
+	require.NoError(t, err)
+	require.Equal(t, &buildbucketpb.RunTaskResponse{
+		Task: JobToBuildbucketTask(ctx, expectedJob, fakeBuildbucketTarget, fakeTaskSchedulerHost),
+	}, resp)
+}
+
+func TestTaskBackend_RunTask_WrongTarget(t *testing.T) {
+	ctx, tb, _, _ := setup(t)
+	req := fakeRunTaskRequest()
+	req.Target = "bogus target"
+	resp, err := tb.RunTask(ctx, req)
+	require.ErrorContains(t, err, "incorrect target for this scheduler")
+	require.Nil(t, resp)
+}
+
+func TestTaskBackend_RunTask_InvalidID(t *testing.T) {
+	ctx, tb, _, _ := setup(t)
+	req := fakeRunTaskRequest()
+	req.BuildId = "not parseable as int64"
+	resp, err := tb.RunTask(ctx, req)
+	require.ErrorContains(t, err, "invalid build ID")
+	require.Nil(t, resp)
+}
+
+func TestTaskBackend_RunTask_Duplicate(t *testing.T) {
+	ctx, tb, d, _ := setup(t)
+	expectedJob := makeJob(ctx)
+	d.On("SearchJobs", testutils.AnyContext, &db.JobSearchParams{
+		BuildbucketBuildID: &fakeBuildIdInt,
+	}).Return([]*types.Job{expectedJob}, nil)
+	resp, err := tb.RunTask(ctx, fakeRunTaskRequest())
+	require.NoError(t, err)
+	require.Equal(t, &buildbucketpb.RunTaskResponse{
+		Task: JobToBuildbucketTask(ctx, expectedJob, fakeBuildbucketTarget, fakeTaskSchedulerHost),
+	}, resp)
+}
+
+func TestTaskBackend_RunTask_MultipleDuplicates_UseFirst(t *testing.T) {
+	ctx, tb, d, _ := setup(t)
+	expectedJob := makeJob(ctx)
+	otherJob := makeJob(ctx)
+	otherJob.Id = "not this one"
+	d.On("SearchJobs", testutils.AnyContext, &db.JobSearchParams{
+		BuildbucketBuildID: &fakeBuildIdInt,
+	}).Return([]*types.Job{expectedJob, otherJob}, nil)
+	resp, err := tb.RunTask(ctx, fakeRunTaskRequest())
+	require.NoError(t, err)
+	require.Equal(t, &buildbucketpb.RunTaskResponse{
+		Task: JobToBuildbucketTask(ctx, expectedJob, fakeBuildbucketTarget, fakeTaskSchedulerHost),
+	}, resp)
+}
+
+func TestTaskBackend_RunTask_FailedSearchingJobs(t *testing.T) {
+	ctx, tb, d, _ := setup(t)
+	req := fakeRunTaskRequest()
+	d.On("SearchJobs", testutils.AnyContext, &db.JobSearchParams{
+		BuildbucketBuildID: &fakeBuildIdInt,
+	}).Return(nil, errors.New("can't find the jobs!"))
+	resp, err := tb.RunTask(ctx, req)
+	require.ErrorContains(t, err, "failed looking for duplicate jobs")
+	require.Nil(t, resp)
+}
+
+func TestTaskBackend_RunTask_GetBuildFailed(t *testing.T) {
+	ctx, tb, d, bb := setup(t)
+	d.On("SearchJobs", testutils.AnyContext, &db.JobSearchParams{
+		BuildbucketBuildID: &fakeBuildIdInt,
+	}).Return(nil, nil)
+	bb.On("GetBuild", testutils.AnyContext, fakeBuildIdInt).Return(nil, errors.New("can't find the build!"))
+	req := fakeRunTaskRequest()
+	resp, err := tb.RunTask(ctx, req)
+	require.ErrorContains(t, err, "failed to retrieve build")
+	require.Nil(t, resp)
+}
+
+func TestTaskBackend_RunTask_NoBuilder(t *testing.T) {
+	ctx, tb, d, bb := setup(t)
+	d.On("SearchJobs", testutils.AnyContext, &db.JobSearchParams{
+		BuildbucketBuildID: &fakeBuildIdInt,
+	}).Return(nil, nil)
+	build := fakeBuild()
+	build.Builder = nil
+	bb.On("GetBuild", testutils.AnyContext, fakeBuildIdInt).Return(build, nil)
+	req := fakeRunTaskRequest()
+	resp, err := tb.RunTask(ctx, req)
+	require.ErrorContains(t, err, "builder isn't set")
+	require.Nil(t, resp)
+}
+
+func TestTaskBackend_RunTask_NoGerritChanges(t *testing.T) {
+	ctx, tb, d, bb := setup(t)
+	d.On("SearchJobs", testutils.AnyContext, &db.JobSearchParams{
+		BuildbucketBuildID: &fakeBuildIdInt,
+	}).Return(nil, nil)
+	build := fakeBuild()
+	build.Input.GerritChanges = nil
+	bb.On("GetBuild", testutils.AnyContext, fakeBuildIdInt).Return(build, nil)
+	req := fakeRunTaskRequest()
+	resp, err := tb.RunTask(ctx, req)
+	require.ErrorContains(t, err, "input should have exactly one GerritChanges")
+	require.Nil(t, resp)
+}
+
+func TestTaskBackend_RunTask_UnknownPatchProject(t *testing.T) {
+	ctx, tb, d, bb := setup(t)
+	d.On("SearchJobs", testutils.AnyContext, &db.JobSearchParams{
+		BuildbucketBuildID: &fakeBuildIdInt,
+	}).Return(nil, nil)
+	build := fakeBuild()
+	build.Input.GerritChanges[0].Project = "bogus project"
+	bb.On("GetBuild", testutils.AnyContext, fakeBuildIdInt).Return(build, nil)
+	req := fakeRunTaskRequest()
+	resp, err := tb.RunTask(ctx, req)
+	require.ErrorContains(t, err, "unknown patch project")
+	require.Nil(t, resp)
+}
+
+func TestTaskBackend_RunTask_FailedDBInsert(t *testing.T) {
+	ctx, tb, d, bb := setup(t)
+	d.On("SearchJobs", testutils.AnyContext, &db.JobSearchParams{
+		BuildbucketBuildID: &fakeBuildIdInt,
+	}).Return(nil, nil)
+	bb.On("GetBuild", testutils.AnyContext, fakeBuildIdInt).Return(fakeBuild(), nil)
+	expectedJob := makeJob(ctx)
+	d.On("PutJob", testutils.AnyContext, expectedJob).Return(errors.New("db failed"))
+	resp, err := tb.RunTask(ctx, fakeRunTaskRequest())
+	require.ErrorContains(t, err, "failed to insert Job into the DB")
+	require.Nil(t, resp)
+}
+
+func TestTaskBackend_FetchTasks_Success(t *testing.T) {
+	ctx, tb, d, _ := setup(t)
+	job := makeJob(ctx)
+	d.On("GetJobById", testutils.AnyContext, "my-job-id").Return(job, nil)
+	resp, err := tb.FetchTasks(ctx, &buildbucketpb.FetchTasksRequest{
+		TaskIds: []*buildbucketpb.TaskID{
+			{
+				Target: fakeBuildbucketTarget,
+				Id:     "my-job-id",
+			},
+		},
+	})
+	require.NoError(t, err)
+	require.Equal(t, &buildbucketpb.FetchTasksResponse{
+		Responses: []*buildbucketpb.FetchTasksResponse_Response{
+			{
+				Response: &buildbucketpb.FetchTasksResponse_Response_Task{
+					Task: JobToBuildbucketTask(ctx, job, fakeBuildbucketTarget, fakeTaskSchedulerHost),
+				},
+			},
+		},
+	}, resp)
+}
+
+func TestTaskBackend_FetchTasks_WrongTarget(t *testing.T) {
+	ctx, tb, d, _ := setup(t)
+	job := makeJob(ctx)
+	d.On("GetJobById", testutils.AnyContext, "my-job-id").Return(job, nil)
+	resp, err := tb.FetchTasks(ctx, &buildbucketpb.FetchTasksRequest{
+		TaskIds: []*buildbucketpb.TaskID{
+			{
+				Target: fakeBuildbucketTarget,
+				Id:     "my-job-id",
+			},
+			{
+				Target: "bogus target",
+				Id:     "fail-job-id",
+			},
+		},
+	})
+	require.NoError(t, err)
+	require.Equal(t, &buildbucketpb.FetchTasksResponse{
+		Responses: []*buildbucketpb.FetchTasksResponse_Response{
+			{
+				Response: &buildbucketpb.FetchTasksResponse_Response_Task{
+					Task: JobToBuildbucketTask(ctx, job, fakeBuildbucketTarget, fakeTaskSchedulerHost),
+				},
+			},
+			{
+				Response: &buildbucketpb.FetchTasksResponse_Response_Error{
+					Error: &status.Status{
+						Code:    http.StatusBadRequest,
+						Message: fmt.Sprintf("incorrect target for this scheduler; expected %s", fakeBuildbucketTarget),
+					},
+				},
+			},
+		},
+	}, resp)
+}
+
+func TestTaskBackend_FetchTasks_DBError(t *testing.T) {
+	ctx, tb, d, _ := setup(t)
+	job := makeJob(ctx)
+	d.On("GetJobById", testutils.AnyContext, "my-job-id").Return(job, nil)
+	d.On("GetJobById", testutils.AnyContext, "fail-job-id").Return(nil, errors.New("DB error"))
+	resp, err := tb.FetchTasks(ctx, &buildbucketpb.FetchTasksRequest{
+		TaskIds: []*buildbucketpb.TaskID{
+			{
+				Target: fakeBuildbucketTarget,
+				Id:     "my-job-id",
+			},
+			{
+				Target: fakeBuildbucketTarget,
+				Id:     "fail-job-id",
+			},
+		},
+	})
+	require.NoError(t, err)
+	require.Equal(t, &buildbucketpb.FetchTasksResponse{
+		Responses: []*buildbucketpb.FetchTasksResponse_Response{
+			{
+				Response: &buildbucketpb.FetchTasksResponse_Response_Task{
+					Task: JobToBuildbucketTask(ctx, job, fakeBuildbucketTarget, fakeTaskSchedulerHost),
+				},
+			},
+			{
+				Response: &buildbucketpb.FetchTasksResponse_Response_Error{
+					Error: &status.Status{
+						Code:    http.StatusInternalServerError,
+						Message: "DB error",
+					},
+				},
+			},
+		},
+	}, resp)
+}
+
+func TestTaskBackend_FetchTasks_NotFound(t *testing.T) {
+	ctx, tb, d, _ := setup(t)
+	job := makeJob(ctx)
+	d.On("GetJobById", testutils.AnyContext, "my-job-id").Return(job, nil)
+	d.On("GetJobById", testutils.AnyContext, "fail-job-id").Return(nil, nil)
+	resp, err := tb.FetchTasks(ctx, &buildbucketpb.FetchTasksRequest{
+		TaskIds: []*buildbucketpb.TaskID{
+			{
+				Target: fakeBuildbucketTarget,
+				Id:     "my-job-id",
+			},
+			{
+				Target: fakeBuildbucketTarget,
+				Id:     "fail-job-id",
+			},
+		},
+	})
+	require.NoError(t, err)
+	require.Equal(t, &buildbucketpb.FetchTasksResponse{
+		Responses: []*buildbucketpb.FetchTasksResponse_Response{
+			{
+				Response: &buildbucketpb.FetchTasksResponse_Response_Task{
+					Task: JobToBuildbucketTask(ctx, job, fakeBuildbucketTarget, fakeTaskSchedulerHost),
+				},
+			},
+			{
+				Response: &buildbucketpb.FetchTasksResponse_Response_Error{
+					Error: &status.Status{
+						Code:    http.StatusNotFound,
+						Message: "unknown task",
+					},
+				},
+			},
+		},
+	}, resp)
+}
+
+func TestTaskBackend_CancelTasks_Success(t *testing.T) {
+	ctx, tb, d, _ := setup(t)
+	beforeJob := makeJob(ctx)
+	afterJob := beforeJob.Copy()
+	d.On("GetJobById", testutils.AnyContext, "my-job-id").Return(beforeJob, nil)
+	afterJob.Status = types.JOB_STATUS_CANCELED
+	afterJob.StatusDetails = "Canceled by Buildbucket"
+	d.On("PutJobs", testutils.AnyContext, []*types.Job{afterJob}).Return(nil)
+	resp, err := tb.CancelTasks(ctx, &buildbucketpb.CancelTasksRequest{
+		TaskIds: []*buildbucketpb.TaskID{
+			{
+				Target: fakeBuildbucketTarget,
+				Id:     "my-job-id",
+			},
+		},
+	})
+	require.NoError(t, err)
+	require.Equal(t, &buildbucketpb.CancelTasksResponse{
+		Tasks: []*buildbucketpb.Task{
+			JobToBuildbucketTask(ctx, afterJob, fakeBuildbucketTarget, fakeTaskSchedulerHost),
+		},
+	}, resp)
+}
+
+func TestTaskBackend_CancelTasks_WrongTarget(t *testing.T) {
+	ctx, tb, _, _ := setup(t)
+	resp, err := tb.CancelTasks(ctx, &buildbucketpb.CancelTasksRequest{
+		TaskIds: []*buildbucketpb.TaskID{
+			{
+				Target: "bogus-target",
+				Id:     "my-job-id",
+			},
+		},
+	})
+	require.ErrorContains(t, err, "incorrect target for this scheduler")
+	require.Nil(t, resp)
+}
+
+func TestTaskBackend_CancelTasks_FailedDBRetrieve(t *testing.T) {
+	ctx, tb, d, _ := setup(t)
+	d.On("GetJobById", testutils.AnyContext, "my-job-id").Return(nil, nil)
+	resp, err := tb.CancelTasks(ctx, &buildbucketpb.CancelTasksRequest{
+		TaskIds: []*buildbucketpb.TaskID{
+			{
+				Target: fakeBuildbucketTarget,
+				Id:     "my-job-id",
+			},
+		},
+	})
+	require.ErrorContains(t, err, "unknown job")
+	require.Nil(t, resp)
+}
+
+func TestTaskBackend_CancelTasks_FailedDBInsert(t *testing.T) {
+	ctx, tb, d, _ := setup(t)
+	beforeJob := makeJob(ctx)
+	afterJob := beforeJob.Copy()
+	d.On("GetJobById", testutils.AnyContext, "my-job-id").Return(beforeJob, nil)
+	afterJob.Status = types.JOB_STATUS_CANCELED
+	afterJob.StatusDetails = "Canceled by Buildbucket"
+	d.On("PutJobs", testutils.AnyContext, []*types.Job{afterJob}).Return(errors.New("DB error"))
+	resp, err := tb.CancelTasks(ctx, &buildbucketpb.CancelTasksRequest{
+		TaskIds: []*buildbucketpb.TaskID{
+			{
+				Target: fakeBuildbucketTarget,
+				Id:     "my-job-id",
+			},
+		},
+	})
+	require.ErrorContains(t, err, "DB error")
+	require.Nil(t, resp)
+}
+
+func TestTaskBackend_CancelTasks_NoUpdates(t *testing.T) {
+	ctx, tb, d, _ := setup(t)
+	job := makeJob(ctx)
+	job.Status = types.JOB_STATUS_CANCELED
+	job.StatusDetails = "Canceled by Buildbucket"
+	d.On("GetJobById", testutils.AnyContext, "my-job-id").Return(job, nil)
+	resp, err := tb.CancelTasks(ctx, &buildbucketpb.CancelTasksRequest{
+		TaskIds: []*buildbucketpb.TaskID{
+			{
+				Target: fakeBuildbucketTarget,
+				Id:     "my-job-id",
+			},
+		},
+	})
+	require.NoError(t, err)
+	require.Equal(t, &buildbucketpb.CancelTasksResponse{
+		Tasks: []*buildbucketpb.Task{
+			JobToBuildbucketTask(ctx, job, fakeBuildbucketTarget, fakeTaskSchedulerHost),
+		},
+	}, resp)
+}
+
+func TestTaskBackend_ValidateConfigs_Success(t *testing.T) {
+	ctx, tb, _, _ := setup(t)
+	resp, err := tb.ValidateConfigs(ctx, &buildbucketpb.ValidateConfigsRequest{
+		Configs: []*buildbucketpb.ValidateConfigsRequest_ConfigContext{
+			{
+				Target:     fakeBuildbucketTarget,
+				ConfigJson: nil,
+			},
+		},
+	})
+	require.NoError(t, err)
+	require.Equal(t, &buildbucketpb.ValidateConfigsResponse{
+		ConfigErrors: nil,
+	}, resp)
+}
+
+func TestTaskBackend_ValidateConfigs_WrongTarget(t *testing.T) {
+	ctx, tb, _, _ := setup(t)
+	resp, err := tb.ValidateConfigs(ctx, &buildbucketpb.ValidateConfigsRequest{
+		Configs: []*buildbucketpb.ValidateConfigsRequest_ConfigContext{
+			{
+				Target:     "bogus-target",
+				ConfigJson: nil,
+			},
+		},
+	})
+	require.NoError(t, err)
+	require.Equal(t, &buildbucketpb.ValidateConfigsResponse{
+		ConfigErrors: []*buildbucketpb.ValidateConfigsResponse_ErrorDetail{
+			{
+				Index: 0,
+				Error: fmt.Sprintf("incorrect target for this scheduler; expected %s", fakeBuildbucketTarget),
+			},
+		},
+	}, resp)
+}
diff --git a/task_scheduler/go/mocks/BUILD.bazel b/task_scheduler/go/mocks/BUILD.bazel
index 9c1b03c..93922c3 100644
--- a/task_scheduler/go/mocks/BUILD.bazel
+++ b/task_scheduler/go/mocks/BUILD.bazel
@@ -2,7 +2,10 @@
 
 go_library(
     name = "mocks",
-    srcs = ["RemoteDB.go"],
+    srcs = [
+        "JobDB.go",
+        "RemoteDB.go",
+    ],
     importpath = "go.skia.org/infra/task_scheduler/go/mocks",
     visibility = ["//visibility:public"],
     deps = [
diff --git a/task_scheduler/go/mocks/JobDB.go b/task_scheduler/go/mocks/JobDB.go
new file mode 100644
index 0000000..3e7825e
--- /dev/null
+++ b/task_scheduler/go/mocks/JobDB.go
@@ -0,0 +1,197 @@
+// Code generated by mockery v0.0.0-dev. DO NOT EDIT.
+
+package mocks
+
+import (
+	context "context"
+
+	mock "github.com/stretchr/testify/mock"
+	db "go.skia.org/infra/task_scheduler/go/db"
+
+	time "time"
+
+	types "go.skia.org/infra/task_scheduler/go/types"
+)
+
+// JobDB is an autogenerated mock type for the JobDB type
+type JobDB struct {
+	mock.Mock
+}
+
+// GetJobById provides a mock function with given fields: _a0, _a1
+func (_m *JobDB) GetJobById(_a0 context.Context, _a1 string) (*types.Job, error) {
+	ret := _m.Called(_a0, _a1)
+
+	if len(ret) == 0 {
+		panic("no return value specified for GetJobById")
+	}
+
+	var r0 *types.Job
+	var r1 error
+	if rf, ok := ret.Get(0).(func(context.Context, string) (*types.Job, error)); ok {
+		return rf(_a0, _a1)
+	}
+	if rf, ok := ret.Get(0).(func(context.Context, string) *types.Job); ok {
+		r0 = rf(_a0, _a1)
+	} else {
+		if ret.Get(0) != nil {
+			r0 = ret.Get(0).(*types.Job)
+		}
+	}
+
+	if rf, ok := ret.Get(1).(func(context.Context, string) error); ok {
+		r1 = rf(_a0, _a1)
+	} else {
+		r1 = ret.Error(1)
+	}
+
+	return r0, r1
+}
+
+// GetJobsFromDateRange provides a mock function with given fields: _a0, _a1, _a2, _a3
+func (_m *JobDB) GetJobsFromDateRange(_a0 context.Context, _a1 time.Time, _a2 time.Time, _a3 string) ([]*types.Job, error) {
+	ret := _m.Called(_a0, _a1, _a2, _a3)
+
+	if len(ret) == 0 {
+		panic("no return value specified for GetJobsFromDateRange")
+	}
+
+	var r0 []*types.Job
+	var r1 error
+	if rf, ok := ret.Get(0).(func(context.Context, time.Time, time.Time, string) ([]*types.Job, error)); ok {
+		return rf(_a0, _a1, _a2, _a3)
+	}
+	if rf, ok := ret.Get(0).(func(context.Context, time.Time, time.Time, string) []*types.Job); ok {
+		r0 = rf(_a0, _a1, _a2, _a3)
+	} else {
+		if ret.Get(0) != nil {
+			r0 = ret.Get(0).([]*types.Job)
+		}
+	}
+
+	if rf, ok := ret.Get(1).(func(context.Context, time.Time, time.Time, string) error); ok {
+		r1 = rf(_a0, _a1, _a2, _a3)
+	} else {
+		r1 = ret.Error(1)
+	}
+
+	return r0, r1
+}
+
+// ModifiedJobsCh provides a mock function with given fields: _a0
+func (_m *JobDB) ModifiedJobsCh(_a0 context.Context) <-chan []*types.Job {
+	ret := _m.Called(_a0)
+
+	if len(ret) == 0 {
+		panic("no return value specified for ModifiedJobsCh")
+	}
+
+	var r0 <-chan []*types.Job
+	if rf, ok := ret.Get(0).(func(context.Context) <-chan []*types.Job); ok {
+		r0 = rf(_a0)
+	} else {
+		if ret.Get(0) != nil {
+			r0 = ret.Get(0).(<-chan []*types.Job)
+		}
+	}
+
+	return r0
+}
+
+// PutJob provides a mock function with given fields: _a0, _a1
+func (_m *JobDB) PutJob(_a0 context.Context, _a1 *types.Job) error {
+	ret := _m.Called(_a0, _a1)
+
+	if len(ret) == 0 {
+		panic("no return value specified for PutJob")
+	}
+
+	var r0 error
+	if rf, ok := ret.Get(0).(func(context.Context, *types.Job) error); ok {
+		r0 = rf(_a0, _a1)
+	} else {
+		r0 = ret.Error(0)
+	}
+
+	return r0
+}
+
+// PutJobs provides a mock function with given fields: _a0, _a1
+func (_m *JobDB) PutJobs(_a0 context.Context, _a1 []*types.Job) error {
+	ret := _m.Called(_a0, _a1)
+
+	if len(ret) == 0 {
+		panic("no return value specified for PutJobs")
+	}
+
+	var r0 error
+	if rf, ok := ret.Get(0).(func(context.Context, []*types.Job) error); ok {
+		r0 = rf(_a0, _a1)
+	} else {
+		r0 = ret.Error(0)
+	}
+
+	return r0
+}
+
+// PutJobsInChunks provides a mock function with given fields: _a0, _a1
+func (_m *JobDB) PutJobsInChunks(_a0 context.Context, _a1 []*types.Job) error {
+	ret := _m.Called(_a0, _a1)
+
+	if len(ret) == 0 {
+		panic("no return value specified for PutJobsInChunks")
+	}
+
+	var r0 error
+	if rf, ok := ret.Get(0).(func(context.Context, []*types.Job) error); ok {
+		r0 = rf(_a0, _a1)
+	} else {
+		r0 = ret.Error(0)
+	}
+
+	return r0
+}
+
+// SearchJobs provides a mock function with given fields: _a0, _a1
+func (_m *JobDB) SearchJobs(_a0 context.Context, _a1 *db.JobSearchParams) ([]*types.Job, error) {
+	ret := _m.Called(_a0, _a1)
+
+	if len(ret) == 0 {
+		panic("no return value specified for SearchJobs")
+	}
+
+	var r0 []*types.Job
+	var r1 error
+	if rf, ok := ret.Get(0).(func(context.Context, *db.JobSearchParams) ([]*types.Job, error)); ok {
+		return rf(_a0, _a1)
+	}
+	if rf, ok := ret.Get(0).(func(context.Context, *db.JobSearchParams) []*types.Job); ok {
+		r0 = rf(_a0, _a1)
+	} else {
+		if ret.Get(0) != nil {
+			r0 = ret.Get(0).([]*types.Job)
+		}
+	}
+
+	if rf, ok := ret.Get(1).(func(context.Context, *db.JobSearchParams) error); ok {
+		r1 = rf(_a0, _a1)
+	} else {
+		r1 = ret.Error(1)
+	}
+
+	return r0, r1
+}
+
+// NewJobDB creates a new instance of JobDB. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
+// The first argument is typically a *testing.T value.
+func NewJobDB(t interface {
+	mock.TestingT
+	Cleanup(func())
+}) *JobDB {
+	mock := &JobDB{}
+	mock.Mock.Test(t)
+
+	t.Cleanup(func() { mock.AssertExpectations(t) })
+
+	return mock
+}
diff --git a/task_scheduler/go/tryjobs/BUILD.bazel b/task_scheduler/go/tryjobs/BUILD.bazel
index 3694eb8..2603f70 100644
--- a/task_scheduler/go/tryjobs/BUILD.bazel
+++ b/task_scheduler/go/tryjobs/BUILD.bazel
@@ -21,6 +21,7 @@
         "//task_scheduler/go/cacher",
         "//task_scheduler/go/db",
         "//task_scheduler/go/db/cache",
+        "//task_scheduler/go/job_creation/buildbucket_taskbackend",
         "//task_scheduler/go/task_cfg_cache",
         "//task_scheduler/go/types",
         "@com_github_golang_protobuf//ptypes:go_default_library_gen",
@@ -57,6 +58,7 @@
         "//task_scheduler/go/db",
         "//task_scheduler/go/db/cache",
         "//task_scheduler/go/db/memory",
+        "//task_scheduler/go/job_creation/buildbucket_taskbackend",
         "//task_scheduler/go/task_cfg_cache/mocks",
         "//task_scheduler/go/task_cfg_cache/testutils",
         "//task_scheduler/go/types",
diff --git a/task_scheduler/go/tryjobs/tryjobs.go b/task_scheduler/go/tryjobs/tryjobs.go
index 1a6d497..0599f1a 100644
--- a/task_scheduler/go/tryjobs/tryjobs.go
+++ b/task_scheduler/go/tryjobs/tryjobs.go
@@ -30,6 +30,7 @@
 	"go.skia.org/infra/task_scheduler/go/cacher"
 	"go.skia.org/infra/task_scheduler/go/db"
 	"go.skia.org/infra/task_scheduler/go/db/cache"
+	"go.skia.org/infra/task_scheduler/go/job_creation/buildbucket_taskbackend"
 	"go.skia.org/infra/task_scheduler/go/task_cfg_cache"
 	"go.skia.org/infra/task_scheduler/go/types"
 	"google.golang.org/protobuf/encoding/prototext"
@@ -354,7 +355,7 @@
 						Id:     job.Id,
 					},
 					Link:     job.URL(t.host),
-					Status:   jobStatusToBuildbucketStatus(job.Status),
+					Status:   buildbucket_taskbackend.JobStatusToBuildbucketStatus(job.Status),
 					UpdateId: now.Now(ctx).UnixNano(),
 				},
 			}
@@ -839,29 +840,10 @@
 	return false
 }
 
-func jobStatusToBuildbucketStatus(status types.JobStatus) buildbucketpb.Status {
-	switch status {
-	case types.JOB_STATUS_CANCELED:
-		return buildbucketpb.Status_CANCELED
-	case types.JOB_STATUS_FAILURE:
-		return buildbucketpb.Status_FAILURE
-	case types.JOB_STATUS_IN_PROGRESS:
-		return buildbucketpb.Status_STARTED
-	case types.JOB_STATUS_MISHAP:
-		return buildbucketpb.Status_INFRA_FAILURE
-	case types.JOB_STATUS_REQUESTED:
-		return buildbucketpb.Status_SCHEDULED
-	case types.JOB_STATUS_SUCCESS:
-		return buildbucketpb.Status_SUCCESS
-	default:
-		return buildbucketpb.Status_STATUS_UNSPECIFIED
-	}
-}
-
 // jobToBuildV2 converts a Job to a Buildbucket V2 Build to be used with
 // UpdateBuild.
 func jobToBuildV2(job *types.Job) *buildbucketpb.Build {
-	status := jobStatusToBuildbucketStatus(job.Status)
+	status := buildbucket_taskbackend.JobStatusToBuildbucketStatus(job.Status)
 
 	// Note: There are other fields we could fill in, but I'm not sure they
 	// would provide any value since we don't actually use Buildbucket builds
diff --git a/task_scheduler/go/tryjobs/tryjobs_test.go b/task_scheduler/go/tryjobs/tryjobs_test.go
index 3d8aba7..1017ec8 100644
--- a/task_scheduler/go/tryjobs/tryjobs_test.go
+++ b/task_scheduler/go/tryjobs/tryjobs_test.go
@@ -20,10 +20,10 @@
 	"go.skia.org/infra/go/gerrit"
 	"go.skia.org/infra/go/git"
 	"go.skia.org/infra/go/mockhttpclient"
-	"go.skia.org/infra/go/now"
 	pubsub_mocks "go.skia.org/infra/go/pubsub/mocks"
 	"go.skia.org/infra/go/testutils"
 	"go.skia.org/infra/task_scheduler/go/db"
+	"go.skia.org/infra/task_scheduler/go/job_creation/buildbucket_taskbackend"
 	"go.skia.org/infra/task_scheduler/go/types"
 	"google.golang.org/protobuf/encoding/prototext"
 )
@@ -77,15 +77,7 @@
 	// Mock the pubsub message.
 	update := &buildbucketpb.BuildTaskUpdate{
 		BuildId: strconv.FormatInt(j1.BuildbucketBuildId, 10),
-		Task: &buildbucketpb.Task{
-			Id: &buildbucketpb.TaskID{
-				Target: trybots.buildbucketTarget,
-				Id:     j1.Id,
-			},
-			Link:     j1.URL(trybots.host),
-			Status:   jobStatusToBuildbucketStatus(j1.Status),
-			UpdateId: now.Now(ctx).UnixNano(),
-		},
+		Task:    buildbucket_taskbackend.JobToBuildbucketTask(ctx, j1, trybots.buildbucketTarget, trybots.host),
 	}
 	b, err := prototext.Marshal(update)
 	require.NoError(t, err)
@@ -221,15 +213,7 @@
 	for _, job := range jobs {
 		update := &buildbucketpb.BuildTaskUpdate{
 			BuildId: strconv.FormatInt(job.BuildbucketBuildId, 10),
-			Task: &buildbucketpb.Task{
-				Id: &buildbucketpb.TaskID{
-					Target: trybots.buildbucketTarget,
-					Id:     job.Id,
-				},
-				Link:     job.URL(trybots.host),
-				Status:   jobStatusToBuildbucketStatus(job.Status),
-				UpdateId: now.Now(ctx).UnixNano(),
-			},
+			Task:    buildbucket_taskbackend.JobToBuildbucketTask(ctx, job, trybots.buildbucketTarget, trybots.host),
 		}
 		b, err := prototext.Marshal(update)
 		require.NoError(t, err)