[task scheduler] Add Buildbucket TaskBackendServer implementation
Bug: b/288158829
Change-Id: Id6949b19ad83fd4bd34fc75ce35f9369fd04b476
Reviewed-on: https://skia-review.googlesource.com/c/buildbot/+/787324
Reviewed-by: Kevin Lubick <kjlubick@google.com>
Commit-Queue: Eric Boren <borenet@google.com>
diff --git a/.mockery.yaml b/.mockery.yaml
index 0eb0f92..88a238c 100644
--- a/.mockery.yaml
+++ b/.mockery.yaml
@@ -307,6 +307,7 @@
dir: "{{.InterfaceDir}}/../mocks"
interfaces:
RemoteDB:
+ JobDB:
go.skia.org/infra/task_scheduler/go/db/cache:
interfaces:
JobCache:
diff --git a/task_scheduler/go/job_creation/buildbucket_taskbackend/BUILD.bazel b/task_scheduler/go/job_creation/buildbucket_taskbackend/BUILD.bazel
new file mode 100644
index 0000000..492ee16
--- /dev/null
+++ b/task_scheduler/go/job_creation/buildbucket_taskbackend/BUILD.bazel
@@ -0,0 +1,39 @@
+load("@io_bazel_rules_go//go:def.bzl", "go_library")
+load("//bazel/go:go_test.bzl", "go_test")
+
+go_library(
+ name = "buildbucket_taskbackend",
+ srcs = ["buildbucket_taskbackend.go"],
+ importpath = "go.skia.org/infra/task_scheduler/go/job_creation/buildbucket_taskbackend",
+ visibility = ["//visibility:public"],
+ deps = [
+ "//go/buildbucket",
+ "//go/firestore",
+ "//go/now",
+ "//go/skerr",
+ "//go/sklog",
+ "//task_scheduler/go/db",
+ "//task_scheduler/go/types",
+ "@org_chromium_go_luci//buildbucket/proto",
+ "@org_golang_google_genproto_googleapis_rpc//status",
+ ],
+)
+
+go_test(
+ name = "buildbucket_taskbackend_test",
+ srcs = ["buildbucket_taskbackend_test.go"],
+ embed = [":buildbucket_taskbackend"],
+ deps = [
+ "//go/buildbucket/mocks",
+ "//go/firestore",
+ "//go/now",
+ "//go/testutils",
+ "//task_scheduler/go/db",
+ "//task_scheduler/go/mocks",
+ "//task_scheduler/go/types",
+ "@com_github_stretchr_testify//require",
+ "@org_chromium_go_luci//buildbucket/proto",
+ "@org_golang_google_genproto_googleapis_rpc//status",
+ "@org_golang_google_protobuf//types/known/timestamppb",
+ ],
+)
diff --git a/task_scheduler/go/job_creation/buildbucket_taskbackend/buildbucket_taskbackend.go b/task_scheduler/go/job_creation/buildbucket_taskbackend/buildbucket_taskbackend.go
new file mode 100644
index 0000000..62f56b3
--- /dev/null
+++ b/task_scheduler/go/job_creation/buildbucket_taskbackend/buildbucket_taskbackend.go
@@ -0,0 +1,273 @@
+package buildbucket_taskbackend
+
+import (
+ "context"
+ "fmt"
+ "net/http"
+ "strconv"
+ "strings"
+
+ buildbucketpb "go.chromium.org/luci/buildbucket/proto"
+ "go.skia.org/infra/go/buildbucket"
+ "go.skia.org/infra/go/firestore"
+ "go.skia.org/infra/go/now"
+ "go.skia.org/infra/go/skerr"
+ "go.skia.org/infra/go/sklog"
+ "go.skia.org/infra/task_scheduler/go/db"
+ "go.skia.org/infra/task_scheduler/go/types"
+ "google.golang.org/genproto/googleapis/rpc/status"
+)
+
+// TaskBackend implements TaskBackendServer in terms of Task Scheduler Jobs.
+type TaskBackend struct {
+ bb2 buildbucket.BuildBucketInterface
+ buildbucketTarget string
+ db db.JobDB
+ projectRepoMapping map[string]string
+ taskSchedulerHost string
+}
+
+// NewTaskBackend returns a TaskBackend instance.
+func NewTaskBackend(ctx context.Context, buildbucketTarget, taskSchedulerHost string, projectRepoMapping map[string]string, d db.JobDB, bb2 buildbucket.BuildBucketInterface) (*TaskBackend, error) {
+ return &TaskBackend{
+ bb2: bb2,
+ buildbucketTarget: buildbucketTarget,
+ db: d,
+ projectRepoMapping: projectRepoMapping,
+ taskSchedulerHost: taskSchedulerHost,
+ }, nil
+}
+
+// RunTask implements TaskBackendServer.
+func (tb *TaskBackend) RunTask(ctx context.Context, req *buildbucketpb.RunTaskRequest) (*buildbucketpb.RunTaskResponse, error) {
+ // Validation.
+ if req.Target != tb.buildbucketTarget {
+ return nil, skerr.Fmt("incorrect target for this scheduler; expected %s", tb.buildbucketTarget)
+ }
+ if req.Secrets == nil {
+ return nil, skerr.Fmt("secrets not set on request")
+ }
+ buildId, err := strconv.ParseInt(req.BuildId, 10, 64)
+ if err != nil {
+ return nil, skerr.Wrapf(err, "invalid build ID")
+ }
+
+ // Look for any Jobs which we might have already created for this Build.
+ duplicates, err := tb.db.SearchJobs(ctx, &db.JobSearchParams{
+ BuildbucketBuildID: &buildId,
+ })
+ if err != nil {
+ return nil, skerr.Wrapf(err, "failed looking for duplicate jobs")
+ }
+ if len(duplicates) > 0 {
+ return &buildbucketpb.RunTaskResponse{
+ Task: JobToBuildbucketTask(ctx, duplicates[0], tb.buildbucketTarget, tb.taskSchedulerHost),
+ }, nil
+ }
+
+ // Get the build details from the v2 API.
+ // TODO(borenet): It would be much better to avoid sending an extra request
+ // back to Buildbucket. It seems like this information should be part of the
+ // RunTaskRequest - maybe it's already in RunTaskRequest.BackendConfig?
+ build, err := tb.bb2.GetBuild(ctx, buildId)
+ if err != nil {
+ return nil, skerr.Wrapf(err, "failed to retrieve build %d", buildId)
+ }
+ if build.Builder == nil {
+ return nil, skerr.Fmt("builder isn't set on build %d", buildId)
+ }
+
+ // Obtain and validate the RepoState.
+ if build.Input == nil || build.Input.GerritChanges == nil || len(build.Input.GerritChanges) != 1 {
+ return nil, skerr.Fmt("invalid Build %d: input should have exactly one GerritChanges: %+v", buildId, build.Input)
+ }
+ gerritChange := build.Input.GerritChanges[0]
+ repoUrl, ok := tb.projectRepoMapping[gerritChange.Project]
+ if !ok {
+ return nil, skerr.Fmt("unknown patch project %q", gerritChange.Project)
+ }
+ server := gerritChange.Host
+ if !strings.Contains(server, "://") {
+ server = fmt.Sprintf("https://%s", server)
+ }
+ rs := types.RepoState{
+ Patch: types.Patch{
+ Server: server,
+ Issue: strconv.FormatInt(gerritChange.Change, 10),
+ PatchRepo: repoUrl,
+ Patchset: strconv.FormatInt(gerritChange.Patchset, 10),
+ },
+ Repo: repoUrl,
+ // We can't fill this out without retrieving the Gerrit ChangeInfo and
+ // resolving the branch to a commit hash. Defer that work until later.
+ Revision: "",
+ }
+
+ // Create the Job.
+ j := &types.Job{
+ Name: build.Builder.Builder,
+ BuildbucketBuildId: buildId,
+ BuildbucketPubSubTopic: req.PubsubTopic,
+ BuildbucketToken: req.Secrets.StartBuildToken,
+ Requested: firestore.FixTimestamp(build.CreateTime.AsTime().UTC()),
+ Created: firestore.FixTimestamp(now.Now(ctx)),
+ RepoState: rs,
+ Status: types.JOB_STATUS_REQUESTED,
+ }
+ if !j.Requested.Before(j.Created) {
+ sklog.Errorf("Try job created time %s is before requested time %s! Setting equal.", j.Created, j.Requested)
+ j.Requested = j.Created.Add(-firestore.TS_RESOLUTION)
+ }
+
+ // Insert the Job into the DB.
+ if err := tb.db.PutJob(ctx, j); err != nil {
+ return nil, skerr.Wrapf(err, "failed to insert Job into the DB")
+ }
+ return &buildbucketpb.RunTaskResponse{
+ Task: JobToBuildbucketTask(ctx, j, tb.buildbucketTarget, tb.taskSchedulerHost),
+ }, nil
+}
+
+// FetchTasks implements TaskBackendServer.
+func (tb *TaskBackend) FetchTasks(ctx context.Context, req *buildbucketpb.FetchTasksRequest) (*buildbucketpb.FetchTasksResponse, error) {
+ resps := make([]*buildbucketpb.FetchTasksResponse_Response, 0, len(req.TaskIds))
+ for _, id := range req.TaskIds {
+ resp := &buildbucketpb.FetchTasksResponse_Response{}
+ if id.Target != tb.buildbucketTarget {
+ resp.Response = &buildbucketpb.FetchTasksResponse_Response_Error{
+ Error: &status.Status{
+ Code: http.StatusBadRequest,
+ Message: fmt.Sprintf("incorrect target for this scheduler; expected %s", tb.buildbucketTarget),
+ },
+ }
+ resps = append(resps, resp)
+ continue
+ }
+ job, err := tb.db.GetJobById(ctx, id.Id)
+ if err != nil {
+ resp.Response = &buildbucketpb.FetchTasksResponse_Response_Error{
+ Error: &status.Status{
+ Code: http.StatusInternalServerError,
+ Message: err.Error(),
+ },
+ }
+ resps = append(resps, resp)
+ continue
+ } else if job == nil {
+ resp.Response = &buildbucketpb.FetchTasksResponse_Response_Error{
+ Error: &status.Status{
+ Code: http.StatusNotFound,
+ Message: "unknown task",
+ },
+ }
+ resps = append(resps, resp)
+ continue
+ }
+ resp.Response = &buildbucketpb.FetchTasksResponse_Response_Task{
+ Task: JobToBuildbucketTask(ctx, job, tb.buildbucketTarget, tb.taskSchedulerHost),
+ }
+ resps = append(resps, resp)
+ }
+ return &buildbucketpb.FetchTasksResponse{
+ Responses: resps,
+ }, nil
+}
+
+// CancelTasks implements TaskBackendServer.
+func (tb *TaskBackend) CancelTasks(ctx context.Context, req *buildbucketpb.CancelTasksRequest) (*buildbucketpb.CancelTasksResponse, error) {
+ // Note: According to the Buildbucket docs, we're supposed to be ensuring
+ // that the tasks are fully canceled (ie. any underlying work is no longer
+ // running) before we return with a "canceled" status. We have the
+ // capability of canceling Swarming tasks, but given the complexity involved
+ // (we might, for example be sharing a given Swarming task between multiple
+ // jobs), I'm not sure we actually want to do that.
+ jobs := make([]*types.Job, 0, len(req.TaskIds))
+ for _, id := range req.TaskIds {
+ if id.Target != tb.buildbucketTarget {
+ return nil, skerr.Fmt("incorrect target for this scheduler; expected %s", tb.buildbucketTarget)
+ }
+ job, err := tb.db.GetJobById(ctx, id.Id)
+ if err != nil {
+ return nil, skerr.Wrap(err)
+ } else if job == nil {
+ return nil, skerr.Fmt("unknown job %q", id.Id)
+ }
+ jobs = append(jobs, job)
+ }
+ updated := make([]*types.Job, 0, len(jobs))
+ for _, job := range jobs {
+ if !job.Done() {
+ job.Status = types.JOB_STATUS_CANCELED
+ job.StatusDetails = "Canceled by Buildbucket"
+ updated = append(updated, job)
+ }
+ }
+ if len(updated) > 0 {
+ if err := tb.db.PutJobs(ctx, updated); err != nil {
+ return nil, skerr.Wrap(err)
+ }
+ }
+ resps := make([]*buildbucketpb.Task, 0, len(req.TaskIds))
+ for _, job := range jobs {
+ resps = append(resps, JobToBuildbucketTask(ctx, job, tb.buildbucketTarget, tb.taskSchedulerHost))
+ }
+ return &buildbucketpb.CancelTasksResponse{
+ Tasks: resps,
+ }, nil
+}
+
+// ValidateConfigs implements TaskBackendServer.
+func (tb *TaskBackend) ValidateConfigs(ctx context.Context, req *buildbucketpb.ValidateConfigsRequest) (*buildbucketpb.ValidateConfigsResponse, error) {
+ // TODO(borenet): I'm not sure what we're actually supposed to be validating
+ // in this method.
+ var errs []*buildbucketpb.ValidateConfigsResponse_ErrorDetail
+ for idx, cfg := range req.Configs {
+ if cfg.Target != tb.buildbucketTarget {
+ errs = append(errs, &buildbucketpb.ValidateConfigsResponse_ErrorDetail{
+ Index: int32(idx),
+ Error: fmt.Sprintf("incorrect target for this scheduler; expected %s", tb.buildbucketTarget),
+ })
+ }
+ }
+ return &buildbucketpb.ValidateConfigsResponse{
+ ConfigErrors: errs,
+ }, nil
+}
+
+// Assert that TaskBackend implements TaskBackendServer.
+var _ buildbucketpb.TaskBackendServer = &TaskBackend{}
+
+// JobStatusToBuildbucketStatus converts a types.JobStatus to a
+// buildbucketpb.Status.
+func JobStatusToBuildbucketStatus(status types.JobStatus) buildbucketpb.Status {
+ switch status {
+ case types.JOB_STATUS_CANCELED:
+ return buildbucketpb.Status_CANCELED
+ case types.JOB_STATUS_FAILURE:
+ return buildbucketpb.Status_FAILURE
+ case types.JOB_STATUS_IN_PROGRESS:
+ return buildbucketpb.Status_STARTED
+ case types.JOB_STATUS_MISHAP:
+ return buildbucketpb.Status_INFRA_FAILURE
+ case types.JOB_STATUS_REQUESTED:
+ return buildbucketpb.Status_SCHEDULED
+ case types.JOB_STATUS_SUCCESS:
+ return buildbucketpb.Status_SUCCESS
+ default:
+ return buildbucketpb.Status_STATUS_UNSPECIFIED
+ }
+}
+
+// JobToBuildbucketTask converts a types.Job to a buildbucketpb.Task.
+func JobToBuildbucketTask(ctx context.Context, job *types.Job, buildbucketTarget, taskSchedulerHost string) *buildbucketpb.Task {
+ return &buildbucketpb.Task{
+ Id: &buildbucketpb.TaskID{
+ Target: buildbucketTarget,
+ Id: job.Id,
+ },
+ Link: job.URL(taskSchedulerHost),
+ Status: JobStatusToBuildbucketStatus(job.Status),
+ SummaryMarkdown: job.StatusDetails,
+ UpdateId: now.Now(ctx).UnixNano(),
+ }
+}
diff --git a/task_scheduler/go/job_creation/buildbucket_taskbackend/buildbucket_taskbackend_test.go b/task_scheduler/go/job_creation/buildbucket_taskbackend/buildbucket_taskbackend_test.go
new file mode 100644
index 0000000..ef15f76
--- /dev/null
+++ b/task_scheduler/go/job_creation/buildbucket_taskbackend/buildbucket_taskbackend_test.go
@@ -0,0 +1,518 @@
+package buildbucket_taskbackend
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "net/http"
+ "strconv"
+ "testing"
+ "time"
+
+ "github.com/stretchr/testify/require"
+ buildbucketpb "go.chromium.org/luci/buildbucket/proto"
+ buildbucket_mocks "go.skia.org/infra/go/buildbucket/mocks"
+ "go.skia.org/infra/go/firestore"
+ "go.skia.org/infra/go/now"
+ "go.skia.org/infra/go/testutils"
+ "go.skia.org/infra/task_scheduler/go/db"
+ "go.skia.org/infra/task_scheduler/go/mocks"
+ "go.skia.org/infra/task_scheduler/go/types"
+ "google.golang.org/genproto/googleapis/rpc/status"
+ "google.golang.org/protobuf/types/known/timestamppb"
+)
+
+const (
+ fakeBuildbucketPubsubTopic = "fake-bb-pubsub"
+ fakeBuildbucketTarget = "skia://fake-scheduler"
+ fakeBuildbucketToken = "fake-token"
+ fakeBuildIdStr = "12345"
+ fakeJobName = "fake-job-name"
+ fakeTaskSchedulerHost = "https://fake-scheduler"
+ fakeProject = "fake-project"
+ fakeRepo = "https://fake.git"
+ fakeGerritHost = "fake-project-review.googlesource.com"
+ fakeGerritChange = 6789
+ fakeGerritPatchset = 3
+)
+
+var (
+ // This is a var instead of a constant so that we can take its address.
+ fakeBuildIdInt int64 = 12345
+ fakeCreateTime = firestore.FixTimestamp(time.Unix(1702395110, 0)) // Arbitrary timestamp.
+)
+
+func setup(t *testing.T) (context.Context, *TaskBackend, *mocks.JobDB, *buildbucket_mocks.BuildBucketInterface) {
+ ctx := now.TimeTravelingContext(fakeCreateTime.Add(time.Minute))
+ bb := &buildbucket_mocks.BuildBucketInterface{}
+ projectRepoMapping := map[string]string{
+ fakeProject: fakeRepo,
+ }
+ db := &mocks.JobDB{}
+ tb, err := NewTaskBackend(ctx, fakeBuildbucketTarget, fakeTaskSchedulerHost, projectRepoMapping, db, bb)
+ require.NoError(t, err)
+ t.Cleanup(func() {
+ db.AssertExpectations(t)
+ bb.AssertExpectations(t)
+ })
+ return ctx, tb, db, bb
+}
+
+func fakeBuild() *buildbucketpb.Build {
+ return &buildbucketpb.Build{
+ Id: fakeBuildIdInt,
+ Builder: &buildbucketpb.BuilderID{
+ Project: fakeProject,
+ Builder: fakeJobName,
+ },
+ CreateTime: timestamppb.New(fakeCreateTime),
+ Input: &buildbucketpb.Build_Input{
+ GerritChanges: []*buildbucketpb.GerritChange{
+ {
+ Project: fakeProject,
+ Host: fakeGerritHost,
+ Change: fakeGerritChange,
+ Patchset: fakeGerritPatchset,
+ },
+ },
+ },
+ }
+}
+
+func fakeRunTaskRequest() *buildbucketpb.RunTaskRequest {
+ return &buildbucketpb.RunTaskRequest{
+ BuildId: fakeBuildIdStr,
+ PubsubTopic: fakeBuildbucketPubsubTopic,
+ Secrets: &buildbucketpb.BuildSecrets{
+ StartBuildToken: fakeBuildbucketToken,
+ },
+ Target: fakeBuildbucketTarget,
+ }
+}
+
+func makeJob(ctx context.Context) *types.Job {
+ return &types.Job{
+ Name: fakeJobName,
+ BuildbucketBuildId: fakeBuildIdInt,
+ BuildbucketPubSubTopic: fakeBuildbucketPubsubTopic,
+ BuildbucketToken: fakeBuildbucketToken,
+ Requested: fakeCreateTime,
+ Created: firestore.FixTimestamp(now.Now(ctx)),
+ RepoState: types.RepoState{
+ Patch: types.Patch{
+ Server: "https://" + fakeGerritHost,
+ Issue: strconv.FormatInt(fakeGerritChange, 10),
+ PatchRepo: fakeRepo,
+ Patchset: strconv.FormatInt(fakeGerritPatchset, 10),
+ },
+ Repo: fakeRepo,
+ Revision: "",
+ },
+ Status: types.JOB_STATUS_REQUESTED,
+ }
+}
+
+func TestTaskBackend_RunTask_Success(t *testing.T) {
+ ctx, tb, d, bb := setup(t)
+ d.On("SearchJobs", testutils.AnyContext, &db.JobSearchParams{
+ BuildbucketBuildID: &fakeBuildIdInt,
+ }).Return(nil, nil)
+ bb.On("GetBuild", testutils.AnyContext, fakeBuildIdInt).Return(fakeBuild(), nil)
+ expectedJob := makeJob(ctx)
+ d.On("PutJob", testutils.AnyContext, expectedJob).Return(nil)
+ resp, err := tb.RunTask(ctx, fakeRunTaskRequest())
+ require.NoError(t, err)
+ require.Equal(t, &buildbucketpb.RunTaskResponse{
+ Task: JobToBuildbucketTask(ctx, expectedJob, fakeBuildbucketTarget, fakeTaskSchedulerHost),
+ }, resp)
+}
+
+func TestTaskBackend_RunTask_WrongTarget(t *testing.T) {
+ ctx, tb, _, _ := setup(t)
+ req := fakeRunTaskRequest()
+ req.Target = "bogus target"
+ resp, err := tb.RunTask(ctx, req)
+ require.ErrorContains(t, err, "incorrect target for this scheduler")
+ require.Nil(t, resp)
+}
+
+func TestTaskBackend_RunTask_InvalidID(t *testing.T) {
+ ctx, tb, _, _ := setup(t)
+ req := fakeRunTaskRequest()
+ req.BuildId = "not parseable as int64"
+ resp, err := tb.RunTask(ctx, req)
+ require.ErrorContains(t, err, "invalid build ID")
+ require.Nil(t, resp)
+}
+
+func TestTaskBackend_RunTask_Duplicate(t *testing.T) {
+ ctx, tb, d, _ := setup(t)
+ expectedJob := makeJob(ctx)
+ d.On("SearchJobs", testutils.AnyContext, &db.JobSearchParams{
+ BuildbucketBuildID: &fakeBuildIdInt,
+ }).Return([]*types.Job{expectedJob}, nil)
+ resp, err := tb.RunTask(ctx, fakeRunTaskRequest())
+ require.NoError(t, err)
+ require.Equal(t, &buildbucketpb.RunTaskResponse{
+ Task: JobToBuildbucketTask(ctx, expectedJob, fakeBuildbucketTarget, fakeTaskSchedulerHost),
+ }, resp)
+}
+
+func TestTaskBackend_RunTask_MultipleDuplicates_UseFirst(t *testing.T) {
+ ctx, tb, d, _ := setup(t)
+ expectedJob := makeJob(ctx)
+ otherJob := makeJob(ctx)
+ otherJob.Id = "not this one"
+ d.On("SearchJobs", testutils.AnyContext, &db.JobSearchParams{
+ BuildbucketBuildID: &fakeBuildIdInt,
+ }).Return([]*types.Job{expectedJob, otherJob}, nil)
+ resp, err := tb.RunTask(ctx, fakeRunTaskRequest())
+ require.NoError(t, err)
+ require.Equal(t, &buildbucketpb.RunTaskResponse{
+ Task: JobToBuildbucketTask(ctx, expectedJob, fakeBuildbucketTarget, fakeTaskSchedulerHost),
+ }, resp)
+}
+
+func TestTaskBackend_RunTask_FailedSearchingJobs(t *testing.T) {
+ ctx, tb, d, _ := setup(t)
+ req := fakeRunTaskRequest()
+ d.On("SearchJobs", testutils.AnyContext, &db.JobSearchParams{
+ BuildbucketBuildID: &fakeBuildIdInt,
+ }).Return(nil, errors.New("can't find the jobs!"))
+ resp, err := tb.RunTask(ctx, req)
+ require.ErrorContains(t, err, "failed looking for duplicate jobs")
+ require.Nil(t, resp)
+}
+
+func TestTaskBackend_RunTask_GetBuildFailed(t *testing.T) {
+ ctx, tb, d, bb := setup(t)
+ d.On("SearchJobs", testutils.AnyContext, &db.JobSearchParams{
+ BuildbucketBuildID: &fakeBuildIdInt,
+ }).Return(nil, nil)
+ bb.On("GetBuild", testutils.AnyContext, fakeBuildIdInt).Return(nil, errors.New("can't find the build!"))
+ req := fakeRunTaskRequest()
+ resp, err := tb.RunTask(ctx, req)
+ require.ErrorContains(t, err, "failed to retrieve build")
+ require.Nil(t, resp)
+}
+
+func TestTaskBackend_RunTask_NoBuilder(t *testing.T) {
+ ctx, tb, d, bb := setup(t)
+ d.On("SearchJobs", testutils.AnyContext, &db.JobSearchParams{
+ BuildbucketBuildID: &fakeBuildIdInt,
+ }).Return(nil, nil)
+ build := fakeBuild()
+ build.Builder = nil
+ bb.On("GetBuild", testutils.AnyContext, fakeBuildIdInt).Return(build, nil)
+ req := fakeRunTaskRequest()
+ resp, err := tb.RunTask(ctx, req)
+ require.ErrorContains(t, err, "builder isn't set")
+ require.Nil(t, resp)
+}
+
+func TestTaskBackend_RunTask_NoGerritChanges(t *testing.T) {
+ ctx, tb, d, bb := setup(t)
+ d.On("SearchJobs", testutils.AnyContext, &db.JobSearchParams{
+ BuildbucketBuildID: &fakeBuildIdInt,
+ }).Return(nil, nil)
+ build := fakeBuild()
+ build.Input.GerritChanges = nil
+ bb.On("GetBuild", testutils.AnyContext, fakeBuildIdInt).Return(build, nil)
+ req := fakeRunTaskRequest()
+ resp, err := tb.RunTask(ctx, req)
+ require.ErrorContains(t, err, "input should have exactly one GerritChanges")
+ require.Nil(t, resp)
+}
+
+func TestTaskBackend_RunTask_UnknownPatchProject(t *testing.T) {
+ ctx, tb, d, bb := setup(t)
+ d.On("SearchJobs", testutils.AnyContext, &db.JobSearchParams{
+ BuildbucketBuildID: &fakeBuildIdInt,
+ }).Return(nil, nil)
+ build := fakeBuild()
+ build.Input.GerritChanges[0].Project = "bogus project"
+ bb.On("GetBuild", testutils.AnyContext, fakeBuildIdInt).Return(build, nil)
+ req := fakeRunTaskRequest()
+ resp, err := tb.RunTask(ctx, req)
+ require.ErrorContains(t, err, "unknown patch project")
+ require.Nil(t, resp)
+}
+
+func TestTaskBackend_RunTask_FailedDBInsert(t *testing.T) {
+ ctx, tb, d, bb := setup(t)
+ d.On("SearchJobs", testutils.AnyContext, &db.JobSearchParams{
+ BuildbucketBuildID: &fakeBuildIdInt,
+ }).Return(nil, nil)
+ bb.On("GetBuild", testutils.AnyContext, fakeBuildIdInt).Return(fakeBuild(), nil)
+ expectedJob := makeJob(ctx)
+ d.On("PutJob", testutils.AnyContext, expectedJob).Return(errors.New("db failed"))
+ resp, err := tb.RunTask(ctx, fakeRunTaskRequest())
+ require.ErrorContains(t, err, "failed to insert Job into the DB")
+ require.Nil(t, resp)
+}
+
+func TestTaskBackend_FetchTasks_Success(t *testing.T) {
+ ctx, tb, d, _ := setup(t)
+ job := makeJob(ctx)
+ d.On("GetJobById", testutils.AnyContext, "my-job-id").Return(job, nil)
+ resp, err := tb.FetchTasks(ctx, &buildbucketpb.FetchTasksRequest{
+ TaskIds: []*buildbucketpb.TaskID{
+ {
+ Target: fakeBuildbucketTarget,
+ Id: "my-job-id",
+ },
+ },
+ })
+ require.NoError(t, err)
+ require.Equal(t, &buildbucketpb.FetchTasksResponse{
+ Responses: []*buildbucketpb.FetchTasksResponse_Response{
+ {
+ Response: &buildbucketpb.FetchTasksResponse_Response_Task{
+ Task: JobToBuildbucketTask(ctx, job, fakeBuildbucketTarget, fakeTaskSchedulerHost),
+ },
+ },
+ },
+ }, resp)
+}
+
+func TestTaskBackend_FetchTasks_WrongTarget(t *testing.T) {
+ ctx, tb, d, _ := setup(t)
+ job := makeJob(ctx)
+ d.On("GetJobById", testutils.AnyContext, "my-job-id").Return(job, nil)
+ resp, err := tb.FetchTasks(ctx, &buildbucketpb.FetchTasksRequest{
+ TaskIds: []*buildbucketpb.TaskID{
+ {
+ Target: fakeBuildbucketTarget,
+ Id: "my-job-id",
+ },
+ {
+ Target: "bogus target",
+ Id: "fail-job-id",
+ },
+ },
+ })
+ require.NoError(t, err)
+ require.Equal(t, &buildbucketpb.FetchTasksResponse{
+ Responses: []*buildbucketpb.FetchTasksResponse_Response{
+ {
+ Response: &buildbucketpb.FetchTasksResponse_Response_Task{
+ Task: JobToBuildbucketTask(ctx, job, fakeBuildbucketTarget, fakeTaskSchedulerHost),
+ },
+ },
+ {
+ Response: &buildbucketpb.FetchTasksResponse_Response_Error{
+ Error: &status.Status{
+ Code: http.StatusBadRequest,
+ Message: fmt.Sprintf("incorrect target for this scheduler; expected %s", fakeBuildbucketTarget),
+ },
+ },
+ },
+ },
+ }, resp)
+}
+
+func TestTaskBackend_FetchTasks_DBError(t *testing.T) {
+ ctx, tb, d, _ := setup(t)
+ job := makeJob(ctx)
+ d.On("GetJobById", testutils.AnyContext, "my-job-id").Return(job, nil)
+ d.On("GetJobById", testutils.AnyContext, "fail-job-id").Return(nil, errors.New("DB error"))
+ resp, err := tb.FetchTasks(ctx, &buildbucketpb.FetchTasksRequest{
+ TaskIds: []*buildbucketpb.TaskID{
+ {
+ Target: fakeBuildbucketTarget,
+ Id: "my-job-id",
+ },
+ {
+ Target: fakeBuildbucketTarget,
+ Id: "fail-job-id",
+ },
+ },
+ })
+ require.NoError(t, err)
+ require.Equal(t, &buildbucketpb.FetchTasksResponse{
+ Responses: []*buildbucketpb.FetchTasksResponse_Response{
+ {
+ Response: &buildbucketpb.FetchTasksResponse_Response_Task{
+ Task: JobToBuildbucketTask(ctx, job, fakeBuildbucketTarget, fakeTaskSchedulerHost),
+ },
+ },
+ {
+ Response: &buildbucketpb.FetchTasksResponse_Response_Error{
+ Error: &status.Status{
+ Code: http.StatusInternalServerError,
+ Message: "DB error",
+ },
+ },
+ },
+ },
+ }, resp)
+}
+
+func TestTaskBackend_FetchTasks_NotFound(t *testing.T) {
+ ctx, tb, d, _ := setup(t)
+ job := makeJob(ctx)
+ d.On("GetJobById", testutils.AnyContext, "my-job-id").Return(job, nil)
+ d.On("GetJobById", testutils.AnyContext, "fail-job-id").Return(nil, nil)
+ resp, err := tb.FetchTasks(ctx, &buildbucketpb.FetchTasksRequest{
+ TaskIds: []*buildbucketpb.TaskID{
+ {
+ Target: fakeBuildbucketTarget,
+ Id: "my-job-id",
+ },
+ {
+ Target: fakeBuildbucketTarget,
+ Id: "fail-job-id",
+ },
+ },
+ })
+ require.NoError(t, err)
+ require.Equal(t, &buildbucketpb.FetchTasksResponse{
+ Responses: []*buildbucketpb.FetchTasksResponse_Response{
+ {
+ Response: &buildbucketpb.FetchTasksResponse_Response_Task{
+ Task: JobToBuildbucketTask(ctx, job, fakeBuildbucketTarget, fakeTaskSchedulerHost),
+ },
+ },
+ {
+ Response: &buildbucketpb.FetchTasksResponse_Response_Error{
+ Error: &status.Status{
+ Code: http.StatusNotFound,
+ Message: "unknown task",
+ },
+ },
+ },
+ },
+ }, resp)
+}
+
+func TestTaskBackend_CancelTasks_Success(t *testing.T) {
+ ctx, tb, d, _ := setup(t)
+ beforeJob := makeJob(ctx)
+ afterJob := beforeJob.Copy()
+ d.On("GetJobById", testutils.AnyContext, "my-job-id").Return(beforeJob, nil)
+ afterJob.Status = types.JOB_STATUS_CANCELED
+ afterJob.StatusDetails = "Canceled by Buildbucket"
+ d.On("PutJobs", testutils.AnyContext, []*types.Job{afterJob}).Return(nil)
+ resp, err := tb.CancelTasks(ctx, &buildbucketpb.CancelTasksRequest{
+ TaskIds: []*buildbucketpb.TaskID{
+ {
+ Target: fakeBuildbucketTarget,
+ Id: "my-job-id",
+ },
+ },
+ })
+ require.NoError(t, err)
+ require.Equal(t, &buildbucketpb.CancelTasksResponse{
+ Tasks: []*buildbucketpb.Task{
+ JobToBuildbucketTask(ctx, afterJob, fakeBuildbucketTarget, fakeTaskSchedulerHost),
+ },
+ }, resp)
+}
+
+func TestTaskBackend_CancelTasks_WrongTarget(t *testing.T) {
+ ctx, tb, _, _ := setup(t)
+ resp, err := tb.CancelTasks(ctx, &buildbucketpb.CancelTasksRequest{
+ TaskIds: []*buildbucketpb.TaskID{
+ {
+ Target: "bogus-target",
+ Id: "my-job-id",
+ },
+ },
+ })
+ require.ErrorContains(t, err, "incorrect target for this scheduler")
+ require.Nil(t, resp)
+}
+
+func TestTaskBackend_CancelTasks_FailedDBRetrieve(t *testing.T) {
+ ctx, tb, d, _ := setup(t)
+ d.On("GetJobById", testutils.AnyContext, "my-job-id").Return(nil, nil)
+ resp, err := tb.CancelTasks(ctx, &buildbucketpb.CancelTasksRequest{
+ TaskIds: []*buildbucketpb.TaskID{
+ {
+ Target: fakeBuildbucketTarget,
+ Id: "my-job-id",
+ },
+ },
+ })
+ require.ErrorContains(t, err, "unknown job")
+ require.Nil(t, resp)
+}
+
+func TestTaskBackend_CancelTasks_FailedDBInsert(t *testing.T) {
+ ctx, tb, d, _ := setup(t)
+ beforeJob := makeJob(ctx)
+ afterJob := beforeJob.Copy()
+ d.On("GetJobById", testutils.AnyContext, "my-job-id").Return(beforeJob, nil)
+ afterJob.Status = types.JOB_STATUS_CANCELED
+ afterJob.StatusDetails = "Canceled by Buildbucket"
+ d.On("PutJobs", testutils.AnyContext, []*types.Job{afterJob}).Return(errors.New("DB error"))
+ resp, err := tb.CancelTasks(ctx, &buildbucketpb.CancelTasksRequest{
+ TaskIds: []*buildbucketpb.TaskID{
+ {
+ Target: fakeBuildbucketTarget,
+ Id: "my-job-id",
+ },
+ },
+ })
+ require.ErrorContains(t, err, "DB error")
+ require.Nil(t, resp)
+}
+
+func TestTaskBackend_CancelTasks_NoUpdates(t *testing.T) {
+ ctx, tb, d, _ := setup(t)
+ job := makeJob(ctx)
+ job.Status = types.JOB_STATUS_CANCELED
+ job.StatusDetails = "Canceled by Buildbucket"
+ d.On("GetJobById", testutils.AnyContext, "my-job-id").Return(job, nil)
+ resp, err := tb.CancelTasks(ctx, &buildbucketpb.CancelTasksRequest{
+ TaskIds: []*buildbucketpb.TaskID{
+ {
+ Target: fakeBuildbucketTarget,
+ Id: "my-job-id",
+ },
+ },
+ })
+ require.NoError(t, err)
+ require.Equal(t, &buildbucketpb.CancelTasksResponse{
+ Tasks: []*buildbucketpb.Task{
+ JobToBuildbucketTask(ctx, job, fakeBuildbucketTarget, fakeTaskSchedulerHost),
+ },
+ }, resp)
+}
+
+func TestTaskBackend_ValidateConfigs_Success(t *testing.T) {
+ ctx, tb, _, _ := setup(t)
+ resp, err := tb.ValidateConfigs(ctx, &buildbucketpb.ValidateConfigsRequest{
+ Configs: []*buildbucketpb.ValidateConfigsRequest_ConfigContext{
+ {
+ Target: fakeBuildbucketTarget,
+ ConfigJson: nil,
+ },
+ },
+ })
+ require.NoError(t, err)
+ require.Equal(t, &buildbucketpb.ValidateConfigsResponse{
+ ConfigErrors: nil,
+ }, resp)
+}
+
+func TestTaskBackend_ValidateConfigs_WrongTarget(t *testing.T) {
+ ctx, tb, _, _ := setup(t)
+ resp, err := tb.ValidateConfigs(ctx, &buildbucketpb.ValidateConfigsRequest{
+ Configs: []*buildbucketpb.ValidateConfigsRequest_ConfigContext{
+ {
+ Target: "bogus-target",
+ ConfigJson: nil,
+ },
+ },
+ })
+ require.NoError(t, err)
+ require.Equal(t, &buildbucketpb.ValidateConfigsResponse{
+ ConfigErrors: []*buildbucketpb.ValidateConfigsResponse_ErrorDetail{
+ {
+ Index: 0,
+ Error: fmt.Sprintf("incorrect target for this scheduler; expected %s", fakeBuildbucketTarget),
+ },
+ },
+ }, resp)
+}
diff --git a/task_scheduler/go/mocks/BUILD.bazel b/task_scheduler/go/mocks/BUILD.bazel
index 9c1b03c..93922c3 100644
--- a/task_scheduler/go/mocks/BUILD.bazel
+++ b/task_scheduler/go/mocks/BUILD.bazel
@@ -2,7 +2,10 @@
go_library(
name = "mocks",
- srcs = ["RemoteDB.go"],
+ srcs = [
+ "JobDB.go",
+ "RemoteDB.go",
+ ],
importpath = "go.skia.org/infra/task_scheduler/go/mocks",
visibility = ["//visibility:public"],
deps = [
diff --git a/task_scheduler/go/mocks/JobDB.go b/task_scheduler/go/mocks/JobDB.go
new file mode 100644
index 0000000..3e7825e
--- /dev/null
+++ b/task_scheduler/go/mocks/JobDB.go
@@ -0,0 +1,197 @@
+// Code generated by mockery v0.0.0-dev. DO NOT EDIT.
+
+package mocks
+
+import (
+ context "context"
+
+ mock "github.com/stretchr/testify/mock"
+ db "go.skia.org/infra/task_scheduler/go/db"
+
+ time "time"
+
+ types "go.skia.org/infra/task_scheduler/go/types"
+)
+
+// JobDB is an autogenerated mock type for the JobDB type
+type JobDB struct {
+ mock.Mock
+}
+
+// GetJobById provides a mock function with given fields: _a0, _a1
+func (_m *JobDB) GetJobById(_a0 context.Context, _a1 string) (*types.Job, error) {
+ ret := _m.Called(_a0, _a1)
+
+ if len(ret) == 0 {
+ panic("no return value specified for GetJobById")
+ }
+
+ var r0 *types.Job
+ var r1 error
+ if rf, ok := ret.Get(0).(func(context.Context, string) (*types.Job, error)); ok {
+ return rf(_a0, _a1)
+ }
+ if rf, ok := ret.Get(0).(func(context.Context, string) *types.Job); ok {
+ r0 = rf(_a0, _a1)
+ } else {
+ if ret.Get(0) != nil {
+ r0 = ret.Get(0).(*types.Job)
+ }
+ }
+
+ if rf, ok := ret.Get(1).(func(context.Context, string) error); ok {
+ r1 = rf(_a0, _a1)
+ } else {
+ r1 = ret.Error(1)
+ }
+
+ return r0, r1
+}
+
+// GetJobsFromDateRange provides a mock function with given fields: _a0, _a1, _a2, _a3
+func (_m *JobDB) GetJobsFromDateRange(_a0 context.Context, _a1 time.Time, _a2 time.Time, _a3 string) ([]*types.Job, error) {
+ ret := _m.Called(_a0, _a1, _a2, _a3)
+
+ if len(ret) == 0 {
+ panic("no return value specified for GetJobsFromDateRange")
+ }
+
+ var r0 []*types.Job
+ var r1 error
+ if rf, ok := ret.Get(0).(func(context.Context, time.Time, time.Time, string) ([]*types.Job, error)); ok {
+ return rf(_a0, _a1, _a2, _a3)
+ }
+ if rf, ok := ret.Get(0).(func(context.Context, time.Time, time.Time, string) []*types.Job); ok {
+ r0 = rf(_a0, _a1, _a2, _a3)
+ } else {
+ if ret.Get(0) != nil {
+ r0 = ret.Get(0).([]*types.Job)
+ }
+ }
+
+ if rf, ok := ret.Get(1).(func(context.Context, time.Time, time.Time, string) error); ok {
+ r1 = rf(_a0, _a1, _a2, _a3)
+ } else {
+ r1 = ret.Error(1)
+ }
+
+ return r0, r1
+}
+
+// ModifiedJobsCh provides a mock function with given fields: _a0
+func (_m *JobDB) ModifiedJobsCh(_a0 context.Context) <-chan []*types.Job {
+ ret := _m.Called(_a0)
+
+ if len(ret) == 0 {
+ panic("no return value specified for ModifiedJobsCh")
+ }
+
+ var r0 <-chan []*types.Job
+ if rf, ok := ret.Get(0).(func(context.Context) <-chan []*types.Job); ok {
+ r0 = rf(_a0)
+ } else {
+ if ret.Get(0) != nil {
+ r0 = ret.Get(0).(<-chan []*types.Job)
+ }
+ }
+
+ return r0
+}
+
+// PutJob provides a mock function with given fields: _a0, _a1
+func (_m *JobDB) PutJob(_a0 context.Context, _a1 *types.Job) error {
+ ret := _m.Called(_a0, _a1)
+
+ if len(ret) == 0 {
+ panic("no return value specified for PutJob")
+ }
+
+ var r0 error
+ if rf, ok := ret.Get(0).(func(context.Context, *types.Job) error); ok {
+ r0 = rf(_a0, _a1)
+ } else {
+ r0 = ret.Error(0)
+ }
+
+ return r0
+}
+
+// PutJobs provides a mock function with given fields: _a0, _a1
+func (_m *JobDB) PutJobs(_a0 context.Context, _a1 []*types.Job) error {
+ ret := _m.Called(_a0, _a1)
+
+ if len(ret) == 0 {
+ panic("no return value specified for PutJobs")
+ }
+
+ var r0 error
+ if rf, ok := ret.Get(0).(func(context.Context, []*types.Job) error); ok {
+ r0 = rf(_a0, _a1)
+ } else {
+ r0 = ret.Error(0)
+ }
+
+ return r0
+}
+
+// PutJobsInChunks provides a mock function with given fields: _a0, _a1
+func (_m *JobDB) PutJobsInChunks(_a0 context.Context, _a1 []*types.Job) error {
+ ret := _m.Called(_a0, _a1)
+
+ if len(ret) == 0 {
+ panic("no return value specified for PutJobsInChunks")
+ }
+
+ var r0 error
+ if rf, ok := ret.Get(0).(func(context.Context, []*types.Job) error); ok {
+ r0 = rf(_a0, _a1)
+ } else {
+ r0 = ret.Error(0)
+ }
+
+ return r0
+}
+
+// SearchJobs provides a mock function with given fields: _a0, _a1
+func (_m *JobDB) SearchJobs(_a0 context.Context, _a1 *db.JobSearchParams) ([]*types.Job, error) {
+ ret := _m.Called(_a0, _a1)
+
+ if len(ret) == 0 {
+ panic("no return value specified for SearchJobs")
+ }
+
+ var r0 []*types.Job
+ var r1 error
+ if rf, ok := ret.Get(0).(func(context.Context, *db.JobSearchParams) ([]*types.Job, error)); ok {
+ return rf(_a0, _a1)
+ }
+ if rf, ok := ret.Get(0).(func(context.Context, *db.JobSearchParams) []*types.Job); ok {
+ r0 = rf(_a0, _a1)
+ } else {
+ if ret.Get(0) != nil {
+ r0 = ret.Get(0).([]*types.Job)
+ }
+ }
+
+ if rf, ok := ret.Get(1).(func(context.Context, *db.JobSearchParams) error); ok {
+ r1 = rf(_a0, _a1)
+ } else {
+ r1 = ret.Error(1)
+ }
+
+ return r0, r1
+}
+
+// NewJobDB creates a new instance of JobDB. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
+// The first argument is typically a *testing.T value.
+func NewJobDB(t interface {
+ mock.TestingT
+ Cleanup(func())
+}) *JobDB {
+ mock := &JobDB{}
+ mock.Mock.Test(t)
+
+ t.Cleanup(func() { mock.AssertExpectations(t) })
+
+ return mock
+}
diff --git a/task_scheduler/go/tryjobs/BUILD.bazel b/task_scheduler/go/tryjobs/BUILD.bazel
index 3694eb8..2603f70 100644
--- a/task_scheduler/go/tryjobs/BUILD.bazel
+++ b/task_scheduler/go/tryjobs/BUILD.bazel
@@ -21,6 +21,7 @@
"//task_scheduler/go/cacher",
"//task_scheduler/go/db",
"//task_scheduler/go/db/cache",
+ "//task_scheduler/go/job_creation/buildbucket_taskbackend",
"//task_scheduler/go/task_cfg_cache",
"//task_scheduler/go/types",
"@com_github_golang_protobuf//ptypes:go_default_library_gen",
@@ -57,6 +58,7 @@
"//task_scheduler/go/db",
"//task_scheduler/go/db/cache",
"//task_scheduler/go/db/memory",
+ "//task_scheduler/go/job_creation/buildbucket_taskbackend",
"//task_scheduler/go/task_cfg_cache/mocks",
"//task_scheduler/go/task_cfg_cache/testutils",
"//task_scheduler/go/types",
diff --git a/task_scheduler/go/tryjobs/tryjobs.go b/task_scheduler/go/tryjobs/tryjobs.go
index 1a6d497..0599f1a 100644
--- a/task_scheduler/go/tryjobs/tryjobs.go
+++ b/task_scheduler/go/tryjobs/tryjobs.go
@@ -30,6 +30,7 @@
"go.skia.org/infra/task_scheduler/go/cacher"
"go.skia.org/infra/task_scheduler/go/db"
"go.skia.org/infra/task_scheduler/go/db/cache"
+ "go.skia.org/infra/task_scheduler/go/job_creation/buildbucket_taskbackend"
"go.skia.org/infra/task_scheduler/go/task_cfg_cache"
"go.skia.org/infra/task_scheduler/go/types"
"google.golang.org/protobuf/encoding/prototext"
@@ -354,7 +355,7 @@
Id: job.Id,
},
Link: job.URL(t.host),
- Status: jobStatusToBuildbucketStatus(job.Status),
+ Status: buildbucket_taskbackend.JobStatusToBuildbucketStatus(job.Status),
UpdateId: now.Now(ctx).UnixNano(),
},
}
@@ -839,29 +840,10 @@
return false
}
-func jobStatusToBuildbucketStatus(status types.JobStatus) buildbucketpb.Status {
- switch status {
- case types.JOB_STATUS_CANCELED:
- return buildbucketpb.Status_CANCELED
- case types.JOB_STATUS_FAILURE:
- return buildbucketpb.Status_FAILURE
- case types.JOB_STATUS_IN_PROGRESS:
- return buildbucketpb.Status_STARTED
- case types.JOB_STATUS_MISHAP:
- return buildbucketpb.Status_INFRA_FAILURE
- case types.JOB_STATUS_REQUESTED:
- return buildbucketpb.Status_SCHEDULED
- case types.JOB_STATUS_SUCCESS:
- return buildbucketpb.Status_SUCCESS
- default:
- return buildbucketpb.Status_STATUS_UNSPECIFIED
- }
-}
-
// jobToBuildV2 converts a Job to a Buildbucket V2 Build to be used with
// UpdateBuild.
func jobToBuildV2(job *types.Job) *buildbucketpb.Build {
- status := jobStatusToBuildbucketStatus(job.Status)
+ status := buildbucket_taskbackend.JobStatusToBuildbucketStatus(job.Status)
// Note: There are other fields we could fill in, but I'm not sure they
// would provide any value since we don't actually use Buildbucket builds
diff --git a/task_scheduler/go/tryjobs/tryjobs_test.go b/task_scheduler/go/tryjobs/tryjobs_test.go
index 3d8aba7..1017ec8 100644
--- a/task_scheduler/go/tryjobs/tryjobs_test.go
+++ b/task_scheduler/go/tryjobs/tryjobs_test.go
@@ -20,10 +20,10 @@
"go.skia.org/infra/go/gerrit"
"go.skia.org/infra/go/git"
"go.skia.org/infra/go/mockhttpclient"
- "go.skia.org/infra/go/now"
pubsub_mocks "go.skia.org/infra/go/pubsub/mocks"
"go.skia.org/infra/go/testutils"
"go.skia.org/infra/task_scheduler/go/db"
+ "go.skia.org/infra/task_scheduler/go/job_creation/buildbucket_taskbackend"
"go.skia.org/infra/task_scheduler/go/types"
"google.golang.org/protobuf/encoding/prototext"
)
@@ -77,15 +77,7 @@
// Mock the pubsub message.
update := &buildbucketpb.BuildTaskUpdate{
BuildId: strconv.FormatInt(j1.BuildbucketBuildId, 10),
- Task: &buildbucketpb.Task{
- Id: &buildbucketpb.TaskID{
- Target: trybots.buildbucketTarget,
- Id: j1.Id,
- },
- Link: j1.URL(trybots.host),
- Status: jobStatusToBuildbucketStatus(j1.Status),
- UpdateId: now.Now(ctx).UnixNano(),
- },
+ Task: buildbucket_taskbackend.JobToBuildbucketTask(ctx, j1, trybots.buildbucketTarget, trybots.host),
}
b, err := prototext.Marshal(update)
require.NoError(t, err)
@@ -221,15 +213,7 @@
for _, job := range jobs {
update := &buildbucketpb.BuildTaskUpdate{
BuildId: strconv.FormatInt(job.BuildbucketBuildId, 10),
- Task: &buildbucketpb.Task{
- Id: &buildbucketpb.TaskID{
- Target: trybots.buildbucketTarget,
- Id: job.Id,
- },
- Link: job.URL(trybots.host),
- Status: jobStatusToBuildbucketStatus(job.Status),
- UpdateId: now.Now(ctx).UnixNano(),
- },
+ Task: buildbucket_taskbackend.JobToBuildbucketTask(ctx, job, trybots.buildbucketTarget, trybots.host),
}
b, err := prototext.Marshal(update)
require.NoError(t, err)