blob: 7f4b342719b1b8f1cbe8b59b9b32d735e26a9a2b [file] [log] [blame]
package buildbucket_taskbackend
import (
"context"
"fmt"
"net/http"
"strconv"
"strings"
buildbucketpb "go.chromium.org/luci/buildbucket/proto"
"go.skia.org/infra/go/buildbucket"
"go.skia.org/infra/go/firestore"
"go.skia.org/infra/go/now"
"go.skia.org/infra/go/skerr"
"go.skia.org/infra/go/sklog"
"go.skia.org/infra/task_scheduler/go/db"
"go.skia.org/infra/task_scheduler/go/types"
"google.golang.org/genproto/googleapis/rpc/status"
)
// TaskBackend implements TaskBackendServer in terms of Task Scheduler Jobs.
// Every Buildbucket task handled by this backend is represented by a
// types.Job stored in the JobDB; the Job's ID doubles as the task ID.
type TaskBackend struct {
	// bb2 is the Buildbucket client used by RunTask to retrieve the details
	// of the Build for which a task was requested.
	bb2 buildbucket.BuildBucketInterface
	// buildbucketTarget is the target which incoming requests must specify.
	// It is also used as the target of every TaskID we hand back.
	buildbucketTarget string
	// db is where Jobs are searched for, retrieved, inserted and updated.
	db db.JobDB
	// projectRepoMapping maps the project of a Gerrit change to the URL of
	// the corresponding repo.
	projectRepoMapping map[string]string
	// taskSchedulerHost is passed to Job.URL to build the link reported to
	// Buildbucket for each task.
	taskSchedulerHost string
}
// NewTaskBackend returns a TaskBackend instance which accepts requests
// addressed to buildbucketTarget, stores its Jobs in d, retrieves Build
// details through bb2, translates Gerrit projects into repo URLs using
// projectRepoMapping, and uses taskSchedulerHost when building Job links.
func NewTaskBackend(buildbucketTarget, taskSchedulerHost string, projectRepoMapping map[string]string, d db.JobDB, bb2 buildbucket.BuildBucketInterface) *TaskBackend {
	backend := new(TaskBackend)
	backend.bb2 = bb2
	backend.buildbucketTarget = buildbucketTarget
	backend.db = d
	backend.projectRepoMapping = projectRepoMapping
	backend.taskSchedulerHost = taskSchedulerHost
	return backend
}
// RunTask implements TaskBackendServer. It creates a Job with status
// JOB_STATUS_REQUESTED for the Buildbucket Build named in the request, inserts
// it into the DB, and returns the Buildbucket Task which represents it.
//
// An error is returned if the request's target does not match this backend,
// if the request carries no StartBuildToken, or if its build ID is not a
// base-10 integer. If the DB already contains a Job for the Build, no new Job
// is created and the Task for the existing Job is returned, so that repeated
// requests for the same Build do not create additional Jobs. Otherwise the
// Build is retrieved from Buildbucket; it must name a builder and carry
// exactly one Gerrit change whose project appears in projectRepoMapping.
func (tb *TaskBackend) RunTask(ctx context.Context, req *buildbucketpb.RunTaskRequest) (*buildbucketpb.RunTaskResponse, error) {
	// Validation.
	if req.Target != tb.buildbucketTarget {
		return nil, skerr.Fmt("incorrect target for this scheduler; expected %s", tb.buildbucketTarget)
	}
	if req.Secrets == nil {
		return nil, skerr.Fmt("secrets not set on request")
	}
	if req.Secrets.StartBuildToken == "" {
		return nil, skerr.Fmt("missing StartBuildToken")
	}
	buildId, err := strconv.ParseInt(req.BuildId, 10, 64)
	if err != nil {
		return nil, skerr.Wrapf(err, "invalid build ID")
	}

	// Look for any Jobs which we might have already created for this Build.
	duplicates, err := tb.db.SearchJobs(ctx, &db.JobSearchParams{
		BuildbucketBuildID: &buildId,
	})
	if err != nil {
		return nil, skerr.Wrapf(err, "failed looking for duplicate jobs")
	}
	if len(duplicates) > 0 {
		return &buildbucketpb.RunTaskResponse{
			Task: JobToBuildbucketTask(ctx, duplicates[0], tb.buildbucketTarget, tb.taskSchedulerHost),
		}, nil
	}

	// Get the build details from the v2 API.
	// TODO(borenet): It would be much better to avoid sending an extra request
	// back to Buildbucket. It seems like this information should be part of the
	// RunTaskRequest - maybe it's already in RunTaskRequest.BackendConfig?
	build, err := tb.bb2.GetBuild(ctx, buildId)
	if err != nil {
		return nil, skerr.Wrapf(err, "failed to retrieve build %d", buildId)
	}
	if build.Builder == nil {
		return nil, skerr.Fmt("builder isn't set on build %d", buildId)
	}

	// Obtain and validate the RepoState. The length check also rejects a nil
	// GerritChanges slice, since its length is zero.
	if build.Input == nil || len(build.Input.GerritChanges) != 1 {
		return nil, skerr.Fmt("invalid Build %d: input should have exactly one GerritChanges: %+v", buildId, build.Input)
	}
	gerritChange := build.Input.GerritChanges[0]
	repoUrl, ok := tb.projectRepoMapping[gerritChange.Project]
	if !ok {
		return nil, skerr.Fmt("unknown patch project %q", gerritChange.Project)
	}
	// The patch server is stored as a URL; add a scheme if the Gerrit host
	// was given without one.
	server := gerritChange.Host
	if !strings.Contains(server, "://") {
		server = fmt.Sprintf("https://%s", server)
	}
	rs := types.RepoState{
		Patch: types.Patch{
			Server:    server,
			Issue:     strconv.FormatInt(gerritChange.Change, 10),
			PatchRepo: repoUrl,
			Patchset:  strconv.FormatInt(gerritChange.Patchset, 10),
		},
		Repo: repoUrl,
		// We can't fill this out without retrieving the Gerrit ChangeInfo and
		// resolving the branch to a commit hash. Defer that work until later.
		Revision: "",
	}

	// Create the Job.
	j := &types.Job{
		Name:                   build.Builder.Builder,
		BuildbucketBuildId:     buildId,
		BuildbucketPubSubTopic: req.PubsubTopic,
		BuildbucketToken:       req.Secrets.StartBuildToken,
		Requested:              firestore.FixTimestamp(build.CreateTime.AsTime().UTC()),
		Created:                firestore.FixTimestamp(now.Now(ctx)),
		RepoState:              rs,
		Status:                 types.JOB_STATUS_REQUESTED,
	}
	// Make sure that the requested time is strictly before the created time:
	// if it is equal or later, move it to one timestamp resolution before the
	// created time.
	// NOTE(review): presumably a Job is required to have been requested
	// before it was created - confirm against the Job validation code.
	if !j.Requested.Before(j.Created) {
		sklog.Errorf("Try job created time %s is not after requested time %s! Setting requested time to just before created time.", j.Created, j.Requested)
		j.Requested = j.Created.Add(-firestore.TS_RESOLUTION)
	}

	// Insert the Job into the DB.
	if err := tb.db.PutJob(ctx, j); err != nil {
		return nil, skerr.Wrapf(err, "failed to insert Job into the DB")
	}
	return &buildbucketpb.RunTaskResponse{
		Task: JobToBuildbucketTask(ctx, j, tb.buildbucketTarget, tb.taskSchedulerHost),
	}, nil
}
// FetchTasks implements TaskBackendServer. It produces one response entry per
// requested task ID, in request order. Problems with an individual task (wrong
// target, DB error, unknown ID) are reported inside that task's entry rather
// than failing the whole request; the returned error is always nil.
//
// NOTE(review): the codes placed in the per-task errors are HTTP status codes,
// while google.rpc.Status documents its code field as a google.rpc.Code value.
// Confirm which of the two Buildbucket expects before changing them.
func (tb *TaskBackend) FetchTasks(ctx context.Context, req *buildbucketpb.FetchTasksRequest) (*buildbucketpb.FetchTasksResponse, error) {
	// failure builds the response entry for a task which could not be fetched.
	failure := func(code int32, msg string) *buildbucketpb.FetchTasksResponse_Response {
		return &buildbucketpb.FetchTasksResponse_Response{
			Response: &buildbucketpb.FetchTasksResponse_Response_Error{
				Error: &status.Status{
					Code:    code,
					Message: msg,
				},
			},
		}
	}

	results := make([]*buildbucketpb.FetchTasksResponse_Response, 0, len(req.TaskIds))
	for _, taskID := range req.TaskIds {
		// Only tasks addressed to this backend are looked up in the DB.
		if taskID.Target != tb.buildbucketTarget {
			results = append(results, failure(http.StatusBadRequest, fmt.Sprintf("incorrect target for this scheduler; expected %s", tb.buildbucketTarget)))
			continue
		}
		job, err := tb.db.GetJobById(ctx, taskID.Id)
		switch {
		case err != nil:
			results = append(results, failure(http.StatusInternalServerError, err.Error()))
		case job == nil:
			// The lookup succeeded but returned no Job for this ID.
			results = append(results, failure(http.StatusNotFound, "unknown task"))
		default:
			results = append(results, &buildbucketpb.FetchTasksResponse_Response{
				Response: &buildbucketpb.FetchTasksResponse_Response_Task{
					Task: JobToBuildbucketTask(ctx, job, tb.buildbucketTarget, tb.taskSchedulerHost),
				},
			})
		}
	}
	return &buildbucketpb.FetchTasksResponse{
		Responses: results,
	}, nil
}
// CancelTasks implements TaskBackendServer. It marks every requested Job which
// is not already done as canceled, writes the changed Jobs to the DB in a
// single call, and returns one Task per requested ID, in request order. Jobs
// which were already done are returned with their existing status.
//
// Unlike FetchTasks, any invalid task ID (wrong target, DB error, unknown Job)
// fails the entire request, and it does so before any Job is modified.
func (tb *TaskBackend) CancelTasks(ctx context.Context, req *buildbucketpb.CancelTasksRequest) (*buildbucketpb.CancelTasksResponse, error) {
	// Note: According to the Buildbucket docs, we're supposed to be ensuring
	// that the tasks are fully canceled (ie. any underlying work is no longer
	// running) before we return with a "canceled" status. We have the
	// capability of canceling Swarming tasks, but given the complexity involved
	// (we might, for example be sharing a given Swarming task between multiple
	// jobs), I'm not sure we actually want to do that.

	// Resolve all of the requested tasks to Jobs up front.
	requested := make([]*types.Job, 0, len(req.TaskIds))
	for _, taskID := range req.TaskIds {
		if taskID.Target != tb.buildbucketTarget {
			return nil, skerr.Fmt("incorrect target for this scheduler; expected %s", tb.buildbucketTarget)
		}
		job, err := tb.db.GetJobById(ctx, taskID.Id)
		if err != nil {
			return nil, skerr.Wrap(err)
		}
		if job == nil {
			return nil, skerr.Fmt("unknown job %q", taskID.Id)
		}
		requested = append(requested, job)
	}

	// Cancel the Jobs which haven't finished yet; finished Jobs are left as
	// they are.
	// NOTE(review): only Status and StatusDetails are changed here; confirm
	// whether a canceled Job is also expected to carry a finished timestamp.
	toCancel := make([]*types.Job, 0, len(requested))
	for _, job := range requested {
		if job.Done() {
			continue
		}
		job.Status = types.JOB_STATUS_CANCELED
		job.StatusDetails = "Canceled by Buildbucket"
		toCancel = append(toCancel, job)
	}
	if len(toCancel) > 0 {
		if err := tb.db.PutJobs(ctx, toCancel); err != nil {
			return nil, skerr.Wrap(err)
		}
	}

	// Report the resulting state of every requested Job.
	tasks := make([]*buildbucketpb.Task, 0, len(req.TaskIds))
	for _, job := range requested {
		tasks = append(tasks, JobToBuildbucketTask(ctx, job, tb.buildbucketTarget, tb.taskSchedulerHost))
	}
	return &buildbucketpb.CancelTasksResponse{
		Tasks: tasks,
	}, nil
}
// ValidateConfigs implements TaskBackendServer. The only check performed is
// that each config is addressed to this backend's target; every config which
// is not gets an ErrorDetail carrying its index within the request. The
// returned error is always nil.
func (tb *TaskBackend) ValidateConfigs(ctx context.Context, req *buildbucketpb.ValidateConfigsRequest) (*buildbucketpb.ValidateConfigsResponse, error) {
	// TODO(borenet): I'm not sure what we're actually supposed to be validating
	// in this method.
	var configErrs []*buildbucketpb.ValidateConfigsResponse_ErrorDetail
	for i, config := range req.Configs {
		if config.Target == tb.buildbucketTarget {
			continue
		}
		detail := &buildbucketpb.ValidateConfigsResponse_ErrorDetail{
			Index: int32(i),
			Error: fmt.Sprintf("incorrect target for this scheduler; expected %s", tb.buildbucketTarget),
		}
		configErrs = append(configErrs, detail)
	}
	return &buildbucketpb.ValidateConfigsResponse{
		ConfigErrors: configErrs,
	}, nil
}
// Compile-time check that *TaskBackend satisfies TaskBackendServer.
var _ buildbucketpb.TaskBackendServer = (*TaskBackend)(nil)
// JobStatusToBuildbucketStatus converts a types.JobStatus to the equivalent
// buildbucketpb.Status. Any JobStatus which is not listed below maps to
// Status_STATUS_UNSPECIFIED.
func JobStatusToBuildbucketStatus(status types.JobStatus) buildbucketpb.Status {
	// Start from the fallback and overwrite it for the statuses we recognize.
	converted := buildbucketpb.Status_STATUS_UNSPECIFIED
	switch status {
	case types.JOB_STATUS_REQUESTED:
		converted = buildbucketpb.Status_SCHEDULED
	case types.JOB_STATUS_IN_PROGRESS:
		converted = buildbucketpb.Status_STARTED
	case types.JOB_STATUS_SUCCESS:
		converted = buildbucketpb.Status_SUCCESS
	case types.JOB_STATUS_FAILURE:
		converted = buildbucketpb.Status_FAILURE
	case types.JOB_STATUS_MISHAP:
		converted = buildbucketpb.Status_INFRA_FAILURE
	case types.JOB_STATUS_CANCELED:
		converted = buildbucketpb.Status_CANCELED
	}
	return converted
}
// JobToBuildbucketTask converts a types.Job to a buildbucketpb.Task. The task
// is identified by buildbucketTarget plus the Job's ID, links to the Job's URL
// on taskSchedulerHost, and carries the Job's converted status with its
// status details as the summary. job must not be nil.
func JobToBuildbucketTask(ctx context.Context, job *types.Job, buildbucketTarget, taskSchedulerHost string) *buildbucketpb.Task {
	taskID := &buildbucketpb.TaskID{
		Target: buildbucketTarget,
		Id:     job.Id,
	}
	task := &buildbucketpb.Task{
		Id:              taskID,
		Link:            job.URL(taskSchedulerHost),
		Status:          JobStatusToBuildbucketStatus(job.Status),
		SummaryMarkdown: job.StatusDetails,
	}
	// The update ID is taken from the current time (in nanoseconds) at the
	// moment of conversion, not from anything stored on the Job.
	task.UpdateId = now.Now(ctx).UnixNano()
	return task
}