blob: 350ab80322c9bda8c5a4598b400992620d08389c [file] [log] [blame]
/*
Handlers, types, and functions common to all types of tasks.
*/
package task_common
import (
"context"
"encoding/json"
"fmt"
"net/http"
"reflect"
"regexp"
"sort"
"strconv"
"strings"
"sync"
"time"
"cloud.google.com/go/datastore"
"github.com/go-chi/chi/v5"
apipb "go.chromium.org/luci/swarming/proto/api_v2"
"go.skia.org/infra/ct/go/ct_autoscaler"
ctfeutil "go.skia.org/infra/ct/go/ctfe/util"
ctutil "go.skia.org/infra/ct/go/util"
"go.skia.org/infra/go/alogin"
"go.skia.org/infra/go/auth"
"go.skia.org/infra/go/cas"
"go.skia.org/infra/go/ds"
"go.skia.org/infra/go/gerrit"
"go.skia.org/infra/go/httputils"
"go.skia.org/infra/go/roles"
"go.skia.org/infra/go/sklog"
swarmingv2 "go.skia.org/infra/go/swarming/v2"
skutil "go.skia.org/infra/go/util"
"golang.org/x/oauth2/google"
"google.golang.org/api/iterator"
)
const (
// Default page size used for pagination.
DEFAULT_PAGE_SIZE = 10
// Maximum page size used for pagination.
MAX_PAGE_SIZE = 100
CANCEL_SWARMING_TASKS_WORKER_POOL_SIZE = 100
)
var (
httpClient *http.Client
datastoreIdMutex sync.Mutex
// CT autoscaler.
autoscaler ct_autoscaler.ICTAutoscaler
// The location of the service account JSON file.
ServiceAccountFile string
// Will be used to construct task specific URLs in emails. Will have a trailing "/".
WebappURL string
swarm swarmingv2.SwarmingV2Client
casClient cas.CAS
plogin alogin.Login
)
// SetLogin should be called before any HTTP traffic is served.
func SetLogin(p alogin.Login) {
plogin = p
}
func isAdmin(r *http.Request) bool {
return plogin.HasRole(r, roles.Admin)
}
type CommonCols struct {
DatastoreKey *datastore.Key `json:"-" datastore:"__key__"`
TsAdded int64 `json:"ts_added"`
TsStarted int64 `json:"ts_started"`
TsCompleted int64 `json:"ts_completed"`
Username string `json:"username"`
Failure bool `json:"failure"`
RepeatAfterDays int64 `json:"repeat_after_days"`
SwarmingLogs string `json:"swarming_logs"`
TaskDone bool `json:"task_done"`
SwarmingTaskID string `json:"swarming_task_id"`
Id int `json:"id" datastore:"-"`
CanRedo bool `json:"can_redo" datastore:"-"`
CanDelete bool `json:"can_delete" datastore:"-"`
FutureDate bool `json:"future_date" datastore:"-"`
TaskType string `json:"task_type" datastore:"-"`
GetURL string `json:"get_url" datastore:"-"`
DeleteURL string `json:"delete_url" datastore:"-"`
}
type Task interface {
GetCommonCols() *CommonCols
RunsOnGCEWorkers() bool
TriggerSwarmingTaskAndMail(ctx context.Context, swarmingClient swarmingv2.SwarmingV2Client, casClient cas.CAS) error
SendCompletionEmail(ctx context.Context, completedSuccessfully bool) error
GetTaskName() string
SetCompleted(success bool)
GetDatastoreKind() ds.Kind
GetDescription() string
// Returns a slice of the struct type.
Query(it *datastore.Iterator) (interface{}, error)
// Returns the struct type.
Get(c context.Context, key *datastore.Key) (Task, error)
// Returns the corresponding AddTaskVars instance of this Task. The returned
// instance is populated.
GetPopulatedAddTaskVars() (AddTaskVars, error)
// Returns the results link for this task if it completed successfully and if
// the task supports results links.
GetResultsLink() string
}
// UpdateTaskSetStarted sets the following on the task and updates it in Datastore:
// * TsStarted
// * SwarmingTaskID
// * SwarmingLogsLink
func UpdateTaskSetStarted(ctx context.Context, runID, swarmingTaskID string, task Task) error {
task.GetCommonCols().TsStarted = ctutil.GetCurrentTsInt64()
task.GetCommonCols().SwarmingTaskID = swarmingTaskID
task.GetCommonCols().SwarmingLogs = fmt.Sprintf(ctutil.SWARMING_RUN_ID_ALL_TASKS_LINK_TEMPLATE, runID)
if _, err := ds.DS.Put(ctx, task.GetCommonCols().DatastoreKey, task); err != nil {
return fmt.Errorf("Failed to update task %d in the datastore: %s", task.GetCommonCols().DatastoreKey.ID, err)
}
return nil
}
// UpdateTaskSetCompleted calls the task's SetCompleted method and updates it in Datastore.
func UpdateTaskSetCompleted(ctx context.Context, task Task, success bool) error {
task.SetCompleted(success)
if _, err := ds.DS.Put(ctx, task.GetCommonCols().DatastoreKey, task); err != nil {
return fmt.Errorf("Failed to update task %d in the datastore: %s", task.GetCommonCols().DatastoreKey.ID, err)
}
return nil
}
func (dbrow *CommonCols) GetCommonCols() *CommonCols {
return dbrow
}
// Takes the result of Task.Query and returns a slice of Tasks containing the same objects.
func AsTaskSlice(selectResult interface{}) []Task {
if selectResult == nil {
return []Task{}
}
sliceValue := reflect.ValueOf(selectResult)
sliceLen := sliceValue.Len()
result := make([]Task, sliceLen)
for i := 0; i < sliceLen; i++ {
result[i] = sliceValue.Index(i).Interface().(Task)
}
return result
}
// Generates a unique ID for this task.
func GetRunID(task Task) string {
return fmt.Sprintf("%s-%s-%d", strings.SplitN(task.GetCommonCols().Username, "@", 2)[0], task.GetTaskName(), task.GetCommonCols().DatastoreKey.ID)
}
// Data included in all tasks; set by AddTaskHandler.
type AddTaskCommonVars struct {
Username string `json:"username"`
TsAdded string `json:"ts_added"`
RepeatAfterDays string `json:"repeat_after_days"`
}
type AddTaskVars interface {
GetAddTaskCommonVars() *AddTaskCommonVars
IsAdminTask() bool
GetDatastoreKind() ds.Kind
GetPopulatedDatastoreTask(ctx context.Context) (Task, error)
}
func (vars *AddTaskCommonVars) GetAddTaskCommonVars() *AddTaskCommonVars {
return vars
}
func (vars *AddTaskCommonVars) IsAdminTask() bool {
return false
}
func AddTaskHandler(w http.ResponseWriter, r *http.Request, task AddTaskVars) {
if task.IsAdminTask() && !isAdmin(r) {
httputils.ReportError(w, nil, "Must be admin to add admin tasks; contact rmistry@", http.StatusInternalServerError)
return
}
w.Header().Set("Content-Type", "application/json")
if err := json.NewDecoder(r.Body).Decode(&task); err != nil {
httputils.ReportError(w, err, fmt.Sprintf("Failed to add %T task", task), http.StatusInternalServerError)
return
}
defer skutil.Close(r.Body)
task.GetAddTaskCommonVars().Username = string(plogin.LoggedInAs(r))
task.GetAddTaskCommonVars().TsAdded = ctutil.GetCurrentTs()
if len(task.GetAddTaskCommonVars().Username) > 255 {
httputils.ReportError(w, nil, "Username is too long, limit 255 bytes", http.StatusInternalServerError)
return
}
if err := AddAndTriggerTask(r.Context(), task); err != nil {
httputils.ReportError(w, err, fmt.Sprintf("Failed to insert or trigger %T task", task), http.StatusInternalServerError)
return
}
}
// AddAndTriggerTask adds the task to datastore and then triggers swarming tasks.
// The swarming tasks are triggered in a separate goroutine because if it is a GCE
// task then it can take a min or 2 to autoscale the GCE instances.
func AddAndTriggerTask(ctx context.Context, task AddTaskVars) error {
datastoreTask, err := AddTaskToDatastore(ctx, task)
if err != nil {
return fmt.Errorf("Failed to insert %T task: %s", task, err)
}
go func() {
// Use a new context because we want the following to finish even after the HTTP
// request is completed.
ctx := context.Background()
if err := TriggerTaskOnSwarming(ctx, task, datastoreTask); err != nil {
sklog.Errorf("Failed to trigger on swarming %T task: %s", task, err)
// Populate the started timestamp before we mark it as completed and failed.
datastoreTask.GetCommonCols().TsStarted = ctutil.GetCurrentTsInt64()
if err := UpdateTaskSetCompleted(ctx, datastoreTask, false); err != nil {
sklog.Error(err)
} else {
skutil.LogErr(datastoreTask.SendCompletionEmail(ctx, false))
}
}
}()
return nil
}
func AddTaskToDatastore(ctx context.Context, task AddTaskVars) (Task, error) {
datastoreTask, err := task.GetPopulatedDatastoreTask(ctx)
if err != nil {
return nil, fmt.Errorf("Could not get populated datastore task: %s", err)
}
// Create the key.
id, err := GetNextId(ctx, task.GetDatastoreKind(), datastoreTask)
if err != nil {
return nil, fmt.Errorf("Could not get highest id for %s: %s", task.GetDatastoreKind(), err)
}
key := ds.NewKey(task.GetDatastoreKind())
key.ID = id
datastoreTask.GetCommonCols().DatastoreKey = key
// Add the common columns to the task.
tsAdded, err := strconv.ParseInt(task.GetAddTaskCommonVars().TsAdded, 10, 64)
if err != nil {
return nil, fmt.Errorf("%s is not int64: %s", task.GetAddTaskCommonVars().TsAdded, err)
}
datastoreTask.GetCommonCols().TsAdded = tsAdded
datastoreTask.GetCommonCols().Username = task.GetAddTaskCommonVars().Username
repeatAfterDays, err := strconv.ParseInt(task.GetAddTaskCommonVars().RepeatAfterDays, 10, 64)
if err != nil {
return nil, fmt.Errorf("%s is not int64: %s", task.GetAddTaskCommonVars().RepeatAfterDays, err)
}
datastoreTask.GetCommonCols().RepeatAfterDays = repeatAfterDays
if _, err := ds.DS.Put(ctx, key, datastoreTask); err != nil {
return nil, fmt.Errorf("Error putting task in datastore: %s", err)
}
return datastoreTask, nil
}
func TriggerTaskOnSwarming(ctx context.Context, task AddTaskVars, datastoreTask Task) error {
if autoscaler != nil && datastoreTask.RunsOnGCEWorkers() {
taskId := fmt.Sprintf("%s.%d", datastoreTask.GetTaskName(), datastoreTask.GetCommonCols().DatastoreKey.ID)
autoscaler.RegisterGCETask(taskId)
}
return datastoreTask.TriggerSwarmingTaskAndMail(ctx, swarm, casClient)
}
type QueryParams struct {
// If non-empty, limits to only tasks with the given username.
Username string
// Include only tasks that have completed successfully.
SuccessfulOnly bool
// Include only tasks that have completed after the specified timestamp.
CompletedAfter int
// Include only tasks that are not yet completed.
PendingOnly bool
// Include only completed tasks that are scheduled to repeat.
FutureRunsOnly bool
// Exclude tasks where page_sets is PAGESET_TYPE_DUMMY_1k.
ExcludeDummyPageSets bool
// If true, SELECT COUNT(*). If false, SELECT * and include ORDER BY and LIMIT clauses.
CountQuery bool
// First term of LIMIT clause; ignored if countQuery is true.
Offset int
// Second term of LIMIT clause; ignored if countQuery is true.
Size int
}
func DatastoreTaskQuery(ctx context.Context, prototype Task, params QueryParams) *datastore.Iterator {
q := ds.NewQuery(prototype.GetDatastoreKind())
if params.CountQuery {
q = q.KeysOnly()
}
if params.Username != "" {
q = q.Filter("Username =", params.Username)
}
if params.SuccessfulOnly {
q = q.Filter("TaskDone =", true)
q = q.Filter("Failure =", false)
}
if params.CompletedAfter != 0 {
q = q.Filter("TsCompleted >", params.CompletedAfter)
q = q.Order("TsCompleted")
}
if params.PendingOnly {
q = q.Filter("TaskDone =", false)
}
if params.FutureRunsOnly {
q = q.Filter("RepeatAfterDays >", 0)
q = q.Order("RepeatAfterDays")
q = q.Filter("TaskDone =", true)
}
if params.ExcludeDummyPageSets {
q = q.Filter("IsTestPageSet =", false)
}
if !params.CountQuery {
q = q.Order("-__key__")
q = q.Limit(params.Size)
q = q.Offset(params.Offset)
}
return ds.DS.Run(ctx, q)
}
type ClusterTelemetryIDs struct {
HighestID int64
}
func GetNextId(ctx context.Context, kind ds.Kind, task Task) (int64, error) {
datastoreIdMutex.Lock()
defer datastoreIdMutex.Unlock()
// Hit the datastore to get the current highest ID.
key := ds.NewKey(ds.CLUSTER_TELEMETRY_IDS)
key.Name = string(kind)
var nextId int64 = -1
_, err := ds.DS.RunInTransaction(ctx, func(tx *datastore.Transaction) error {
ids := ClusterTelemetryIDs{}
if err := ds.DS.Get(ctx, key, &ids); err != nil && err != datastore.ErrNoSuchEntity {
return err
}
nextId = ids.HighestID + 1
ids.HighestID = nextId
_, err := ds.DS.Put(ctx, key, &ids)
return err
})
return nextId, err
}
type Permissions struct {
DeleteAllowed bool
RedoAllowed bool
}
type GetTasksResponse struct {
Data interface{} `json:"data"`
Permissions []Permissions `json:"permissions"`
Pagination *httputils.ResponsePagination `json:"pagination"`
IDs []int64 `json:"ids"`
}
func GetTasksHandler(prototype Task, w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
params := QueryParams{}
if ctfeutil.ParseBoolFormValue(r.FormValue("filter_by_logged_in_user")) {
params.Username = string(plogin.LoggedInAs(r))
}
params.SuccessfulOnly = ctfeutil.ParseBoolFormValue(r.FormValue("successful"))
params.PendingOnly = ctfeutil.ParseBoolFormValue(r.FormValue("not_completed"))
params.FutureRunsOnly = ctfeutil.ParseBoolFormValue(r.FormValue("include_future_runs"))
params.ExcludeDummyPageSets = ctfeutil.ParseBoolFormValue(r.FormValue("exclude_dummy_page_sets"))
if params.SuccessfulOnly && params.PendingOnly {
httputils.ReportError(w, fmt.Errorf("Inconsistent params: successful %v not_completed %v", r.FormValue("successful"), r.FormValue("not_completed")), "Inconsistent params", http.StatusInternalServerError)
return
}
offset, size, err := httputils.PaginationParams(r.URL.Query(), 0, DEFAULT_PAGE_SIZE, MAX_PAGE_SIZE)
if err == nil {
params.Offset, params.Size = offset, size
} else {
httputils.ReportError(w, err, "Failed to get pagination params", http.StatusInternalServerError)
return
}
params.CountQuery = false
it := DatastoreTaskQuery(r.Context(), prototype, params)
data, err := prototype.Query(it)
if err != nil {
httputils.ReportError(w, err, fmt.Sprintf("Failed to query %s tasks", prototype.GetTaskName()), http.StatusInternalServerError)
return
}
params.CountQuery = true
it = DatastoreTaskQuery(r.Context(), prototype, params)
count := 0
for {
var i int
_, err := it.Next(i)
if err == iterator.Done {
break
} else if err != nil {
httputils.ReportError(w, err, fmt.Sprintf("Failed to query %s tasks", prototype.GetTaskName()), http.StatusInternalServerError)
return
}
count++
}
pagination := &httputils.ResponsePagination{
Offset: offset,
Size: size,
Total: count,
}
tasks := AsTaskSlice(data)
ids := make([]int64, len(tasks))
permissions := make([]Permissions, len(tasks))
for i := 0; i < len(tasks); i++ {
deleteAllowed, _ := canDeleteTask(tasks[i], r)
redoAllowed, _ := canRedoTask(tasks[i], r)
permissions[i] = Permissions{DeleteAllowed: deleteAllowed, RedoAllowed: redoAllowed}
ids[i] = tasks[i].GetCommonCols().DatastoreKey.ID
}
// jsonResponse := map[string]interface{}{
jsonResponse := GetTasksResponse{
Data: data,
Permissions: permissions,
Pagination: pagination,
IDs: ids,
}
if err := json.NewEncoder(w).Encode(jsonResponse); err != nil {
httputils.ReportError(w, err, "Failed to encode JSON", http.StatusInternalServerError)
return
}
}
// Returns true if the given task can be deleted by the logged-in user; otherwise false and an error
// describing the problem.
func canDeleteTask(task Task, r *http.Request) (bool, error) {
if !isAdmin(r) {
username := string(plogin.LoggedInAs(r))
taskUser := task.GetCommonCols().Username
if taskUser != username {
return false, fmt.Errorf("Task is owned by %s but you are logged in as %s", taskUser, username)
}
}
return true, nil
}
// Returns true if the given task can be re-added by the logged-in user; otherwise false and an
// error describing the problem.
func canRedoTask(task Task, r *http.Request) (bool, error) {
if !task.GetCommonCols().TaskDone {
return false, fmt.Errorf("Cannot redo pending tasks.")
}
return true, nil
}
func getClosedTasksChannel(tasks []*apipb.TaskResultResponse) chan *apipb.TaskResultResponse {
// Create channel that contains specified tasks. This channel will be consumed by the worker
// pool in DeleteTaskHandler.
tasksChannel := make(chan *apipb.TaskResultResponse, len(tasks))
for _, t := range tasks {
tasksChannel <- t
}
close(tasksChannel)
return tasksChannel
}
type DeleteTaskRequest struct {
Id int64 `json:"id"`
}
func DeleteTaskHandler(prototype Task, w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
var req DeleteTaskRequest
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
httputils.ReportError(w, err, "Failed to parse delete request", http.StatusInternalServerError)
return
}
defer skutil.Close(r.Body)
key := ds.NewKey(prototype.GetDatastoreKind())
key.ID = req.Id
task, err := prototype.Get(r.Context(), key)
if err != nil {
httputils.ReportError(w, err, "Failed to find requested task", http.StatusInternalServerError)
return
}
// If the task is currently running then will have to cancel all of its swarming tasks as well.
if task.GetCommonCols().TsStarted != 0 && task.GetCommonCols().TsCompleted == 0 {
runID := GetRunID(task)
tasks, err := swarmingv2.ListTasksHelper(r.Context(), swarm, &apipb.TasksWithPerfRequest{
State: apipb.StateQuery_QUERY_ALL,
Tags: []string{fmt.Sprintf("runid:%s", runID)},
})
if err != nil {
httputils.ReportError(w, err, fmt.Sprintf("Could not list tasks for %s", runID), http.StatusInternalServerError)
}
sklog.Infof("Starting cancelation of %d tasks...", len(tasks))
tasksChannel := getClosedTasksChannel(tasks)
var wg sync.WaitGroup
// Loop through workers in the worker pool.
for i := 0; i < CANCEL_SWARMING_TASKS_WORKER_POOL_SIZE; i++ {
// Increment the WaitGroup counter.
wg.Add(1)
// Create and run a goroutine closure that cancels tasks.
go func() {
// Decrement the WaitGroup counter when the goroutine completes.
defer wg.Done()
for t := range tasksChannel {
resp, err := swarm.CancelTask(r.Context(), &apipb.TaskCancelRequest{
TaskId: t.TaskId,
KillRunning: true,
})
if err != nil || !resp.Canceled {
sklog.Errorf("Could not cancel %s: %s", t.TaskId, err)
continue
}
sklog.Infof("Canceled %s", t.TaskId)
}
}()
}
// Wait for all spawned goroutines to complete
wg.Wait()
sklog.Infof("Cancelled %d tasks.", len(tasks))
// Send completion email since tasks did start and there was a corresponding start email.
skutil.LogErr(task.SendCompletionEmail(r.Context(), false))
}
if err := ds.DS.Delete(r.Context(), key); err != nil {
httputils.ReportError(w, err, "Failed to delete", http.StatusInternalServerError)
return
}
sklog.Infof("%s task with ID %d deleted by %s", prototype.GetTaskName(), req.Id, string(plogin.LoggedInAs(r)))
}
type RedoTaskRequest struct {
Id int64 `json:"id"`
}
func RedoTaskHandler(prototype Task, w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
var req RedoTaskRequest
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
httputils.ReportError(w, err, "Failed to parse redo request", http.StatusInternalServerError)
return
}
defer skutil.Close(r.Body)
key := ds.NewKey(prototype.GetDatastoreKind())
key.ID = req.Id
task, err := prototype.Get(r.Context(), key)
if err != nil {
httputils.ReportError(w, err, "Failed to find requested task", http.StatusInternalServerError)
return
}
addTaskVars, err := task.GetPopulatedAddTaskVars()
if err != nil {
httputils.ReportError(w, err, "Could not GetPopulatedAddTaskVars", http.StatusInternalServerError)
}
// Replace the username with the new requester.
addTaskVars.GetAddTaskCommonVars().Username = string(plogin.LoggedInAs(r))
// Do not preserve repeat_after_days for retried tasks. Carrying over
// repeat_after_days causes the same task to be unknowingly repeated.
addTaskVars.GetAddTaskCommonVars().RepeatAfterDays = "0"
if err := AddAndTriggerTask(r.Context(), addTaskVars); err != nil {
httputils.ReportError(w, err, fmt.Sprintf("Failed to insert or trigger %T task", task), http.StatusInternalServerError)
return
}
}
type EditTaskRequest struct {
Id int64 `json:"id"`
}
func EditTaskHandler(prototype Task, w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
var req EditTaskRequest
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
httputils.ReportError(w, err, "Failed to parse edit request", http.StatusInternalServerError)
return
}
defer skutil.Close(r.Body)
key := ds.NewKey(prototype.GetDatastoreKind())
key.ID = req.Id
task, err := prototype.Get(r.Context(), key)
if err != nil {
httputils.ReportError(w, err, "Failed to find requested task", http.StatusInternalServerError)
return
}
addTaskVars, err := task.GetPopulatedAddTaskVars()
if err != nil {
httputils.ReportError(w, err, "Could not GetPopulatedAddTaskVars", http.StatusInternalServerError)
}
if err = json.NewEncoder(w).Encode(addTaskVars); err != nil {
httputils.ReportError(w, err, "Failed to encode JSON", http.StatusInternalServerError)
return
}
}
type PageSet struct {
Key string `json:"key"`
Description string `json:"description"`
}
// ByPageSetDesc implements sort.Interface to order PageSets by their descriptions.
type ByPageSetDesc []PageSet
func (p ByPageSetDesc) Len() int { return len(p) }
func (p ByPageSetDesc) Swap(i, j int) { p[i], p[j] = p[j], p[i] }
func (p ByPageSetDesc) Less(i, j int) bool { return p[i].Description < p[j].Description }
func pageSetsHandler(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
pageSets := []PageSet{}
for pageSet := range ctutil.PagesetTypeToInfo {
p := PageSet{
Key: pageSet,
Description: ctutil.PagesetTypeToInfo[pageSet].Description,
}
pageSets = append(pageSets, p)
}
sort.Sort(ByPageSetDesc(pageSets))
if err := json.NewEncoder(w).Encode(pageSets); err != nil {
httputils.ReportError(w, err, fmt.Sprintf("Failed to encode JSON: %v", err), http.StatusInternalServerError)
return
}
}
var gerritURLRegexp = regexp.MustCompile("^(https?://(?:[a-z]+)-review\\.googlesource\\.com)/(?:#/)?c/(?:.+/)?(\\d{3,})(?:/(\\d+)?)?$")
type clDetail struct {
Issue int64 `json:"issue"`
Subject string `json:"subject"`
Modified string `json:"modified"`
Project string `json:"project"`
Patchsets []int `json:"patchsets"`
CodereviewURL string
}
type CLDataResponse struct {
CL string `json:"cl"`
Subject string `json:"subject"`
URL string `json:"url"`
Modified string `json:"modified"`
ChromiumPatch string `json:"chromium_patch"`
SkiaPatch string `json:"skia_patch"`
V8Patch string `json:"v8_patch"`
CatapultPatch string `json:"catapult_patch"`
}
func gatherCLData(detail clDetail, patch string) (*CLDataResponse, error) {
clData := &CLDataResponse{
CL: strconv.FormatInt(detail.Issue, 10),
Subject: detail.Subject,
URL: detail.CodereviewURL,
}
modifiedTime, err := time.Parse("2006-01-02 15:04:05.999999", detail.Modified)
if err != nil {
sklog.Errorf("Unable to parse modified time for CL %d; input '%s', got %v", detail.Issue, detail.Modified, err)
} else {
clData.Modified = modifiedTime.UTC().Format(ctutil.TS_FORMAT)
}
switch detail.Project {
case "chromium", "chromium/src":
clData.ChromiumPatch = patch
case "skia":
clData.SkiaPatch = patch
case "v8/v8":
clData.V8Patch = patch
case "catapult":
clData.CatapultPatch = patch
default:
sklog.Errorf("CL project is %s; only chromium, skia, v8, catapult are supported.", detail.Project)
}
return clData, nil
}
func getCLHandler(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
clURLString := r.FormValue("cl")
var detail clDetail
var patch string
var err error
// See if it is a Gerrit URL.
matches := gerritURLRegexp.FindStringSubmatch(clURLString)
if len(matches) < 3 || matches[1] == "" || matches[2] == "" {
// Return successful empty response, since the user could still be typing.
if err := json.NewEncoder(w).Encode(map[string]interface{}{}); err != nil {
httputils.ReportError(w, err, "Failed to encode JSON", http.StatusInternalServerError)
}
return
}
crURL := matches[1]
clString := matches[2]
g, err := gerrit.NewGerrit(crURL, httpClient)
if err != nil {
httputils.ReportError(w, err, "Failed to talk to Gerrit", http.StatusInternalServerError)
return
}
cl, err := strconv.ParseInt(clString, 10, 32)
if err != nil {
httputils.ReportError(w, err, "Invalid Gerrit CL number", http.StatusInternalServerError)
return
}
change, err := g.GetIssueProperties(r.Context(), cl)
if err != nil {
httputils.ReportError(w, err, "Failed to get issue properties from Gerrit", http.StatusInternalServerError)
return
}
// Use latest patchset if patchset is not specified in the clURL.
patchset := strconv.Itoa(len(change.Patchsets))
if len(matches) > 3 && matches[3] != "" {
patchset = matches[3]
}
// Check to see if the change has any open dependencies
patchsetInt, err := strconv.Atoi(patchset)
if err != nil {
httputils.ReportError(w, err, "Patchset must be an int", http.StatusBadRequest)
return
}
activeDep, err := g.HasOpenDependency(r.Context(), cl, patchsetInt)
if err != nil {
httputils.ReportError(w, err, "Failed to get related changes from Gerrit", http.StatusInternalServerError)
return
}
if activeDep {
httputils.ReportError(w, err, fmt.Sprintf("This CL has an open dependency. Please squash your changes into a single CL."), http.StatusInternalServerError)
return
}
// Check to see if the change has a binary file.
isBinary, err := g.IsBinaryPatch(r.Context(), cl, patchset)
if err != nil {
httputils.ReportError(w, err, "Failed to get list of files from Gerrit", http.StatusInternalServerError)
return
}
if isBinary {
httputils.ReportError(w, err, fmt.Sprintf("CT cannot get a full index for binary files via the Gerrit API. Details in skbug.com/7302."), http.StatusInternalServerError)
return
}
detail = clDetail{
Issue: cl,
Subject: change.Subject,
Modified: change.UpdatedString,
Project: change.Project,
CodereviewURL: fmt.Sprintf("%s/c/%d/%s", crURL, cl, patchset),
}
patch, err = g.GetPatch(r.Context(), cl, patchset, "")
if err != nil {
httputils.ReportError(w, err, "Failed to download patch from Gerrit", http.StatusInternalServerError)
return
}
clData, err := gatherCLData(detail, patch)
if err != nil {
httputils.ReportError(w, err, "Failed to get CL data", http.StatusInternalServerError)
return
}
if err = json.NewEncoder(w).Encode(clData); err != nil {
httputils.ReportError(w, err, "Failed to encode JSON", http.StatusInternalServerError)
return
}
}
type BenchmarksPlatformsResponse struct {
Benchmarks map[string]string `json:"benchmarks"`
Platforms map[string]string `json:"platforms"`
}
func benchmarksPlatformsHandler(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
data := BenchmarksPlatformsResponse{
Benchmarks: ctutil.SupportedBenchmarksToDoc,
Platforms: ctutil.SupportedPlatformsToDesc,
}
if err := json.NewEncoder(w).Encode(data); err != nil {
httputils.ReportError(w, err, fmt.Sprintf("Failed to encode JSON: %v", err), http.StatusInternalServerError)
return
}
}
type TaskPrioritiesResponse struct {
TaskPriorities map[int]string `json:"task_priorities"`
}
func taskPrioritiesHandler(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
data := TaskPrioritiesResponse{
TaskPriorities: ctutil.TaskPrioritiesToDesc,
}
if err := json.NewEncoder(w).Encode(data); err != nil {
httputils.ReportError(w, err, fmt.Sprintf("Failed to encode JSON: %v", err), http.StatusInternalServerError)
return
}
}
func GetEmailRecipients(runOwner string, ccList []string) []string {
emails := []string{runOwner}
if ccList != nil {
emails = append(emails, ccList...)
}
return emails
}
// Additionally adds CtAdmins to the email list.
func GetFailureEmailRecipients(runOwner string, ccList []string) []string {
emails := []string{runOwner}
if ccList != nil {
emails = append(emails, ccList...)
}
emails = append(emails, ctutil.CtAdmins...)
return emails
}
func isAdminHandler(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
data := map[string]interface{}{
"isAdmin": isAdmin(r),
}
if err := json.NewEncoder(w).Encode(data); err != nil {
httputils.ReportError(w, err, fmt.Sprintf("Failed to encode JSON: %v", err), http.StatusInternalServerError)
return
}
}
func AddHandlers(externalRouter chi.Router) {
externalRouter.Post("/"+ctfeutil.PAGE_SETS_PARAMETERS_POST_URI, pageSetsHandler)
externalRouter.Post("/"+ctfeutil.CL_DATA_POST_URI, getCLHandler)
externalRouter.Post("/"+ctfeutil.BENCHMARKS_PLATFORMS_POST_URI, benchmarksPlatformsHandler)
externalRouter.Get("/"+ctfeutil.TASK_PRIORITIES_GET_URI, taskPrioritiesHandler)
externalRouter.Get("/"+ctfeutil.IS_ADMIN_GET_URI, isAdminHandler)
}
func Init(ctx context.Context, local, enableAutoscaler bool, ctfeURL, serviceAccountFileFlagVal string, swarmingClient swarmingv2.SwarmingV2Client, cas cas.CAS, getGCETasksCount func(ctx context.Context) (int, error)) error {
WebappURL = ctfeURL
if WebappURL[len(WebappURL)-1:] != "/" {
WebappURL = WebappURL + "/"
}
ServiceAccountFile = serviceAccountFileFlagVal
swarm = swarmingClient
casClient = cas
ts, err := google.DefaultTokenSource(ctx, auth.ScopeGerrit)
if err != nil {
sklog.Fatal(err)
}
httpClient = httputils.DefaultClientConfig().WithTokenSource(ts).With2xxOnly().Client()
if enableAutoscaler {
autoscaler, err = ct_autoscaler.NewCTAutoscaler(ctx, local, getGCETasksCount)
if err != nil {
return fmt.Errorf("Could not instantiate the CT autoscaler: %s", err)
}
}
return err
}