/*
Provides roll-up statuses for Skia build/test/perf.
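
Example invocation (flag values are illustrative only; repeat --repo for each repository to display):

status --bigtable_project=<project> --bigtable_instance=<instance> \
--firestore_instance=production \
--repo=https://skia.googlesource.com/skia.git \
--port=:8002 --prom_port=:20000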
*/
package main
import (
"context"
"encoding/json"
"flag"
"fmt"
"net/http"
"os"
"path"
"path/filepath"
"runtime"
"strconv"
"strings"
"sync"
"text/template"
"time"
"unicode"
"cloud.google.com/go/bigtable"
"cloud.google.com/go/datastore"
"cloud.google.com/go/pubsub"
"github.com/google/uuid"
"github.com/gorilla/mux"
autoroll_status "go.skia.org/infra/autoroll/go/status"
"go.skia.org/infra/go/allowed"
"go.skia.org/infra/go/auth"
"go.skia.org/infra/go/common"
"go.skia.org/infra/go/ds"
"go.skia.org/infra/go/git/repograph"
"go.skia.org/infra/go/gitstore/bt_gitstore"
"go.skia.org/infra/go/httputils"
"go.skia.org/infra/go/login"
"go.skia.org/infra/go/metrics2"
"go.skia.org/infra/go/skiaversion"
"go.skia.org/infra/go/sklog"
"go.skia.org/infra/go/util"
"go.skia.org/infra/status/go/capacity"
"go.skia.org/infra/status/go/incremental"
"go.skia.org/infra/status/go/lkgr"
task_driver_db "go.skia.org/infra/task_driver/go/db"
bigtable_db "go.skia.org/infra/task_driver/go/db/bigtable"
"go.skia.org/infra/task_driver/go/handlers"
"go.skia.org/infra/task_driver/go/logs"
"go.skia.org/infra/task_scheduler/go/db"
"go.skia.org/infra/task_scheduler/go/db/cache"
"go.skia.org/infra/task_scheduler/go/db/firestore"
"go.skia.org/infra/task_scheduler/go/types"
"go.skia.org/infra/task_scheduler/go/window"
"google.golang.org/api/option"
)
const (
APPNAME = "status"
// The chrome infra auth group to use for restricting admin rights.
AUTH_GROUP_ADMIN_RIGHTS = "google/skia-root@google.com"
// The chrome infra auth group to use for restricting edit rights.
AUTH_GROUP_EDIT_RIGHTS = "google/skia-staff@google.com"
DEFAULT_COMMITS_TO_LOAD = 35
MAX_COMMITS_TO_LOAD = 100
SKIA_REPO = "skia"
INFRA_REPO = "infra"
)
var (
autorollMtx sync.RWMutex
autorollStatus []byte = nil
capacityClient *capacity.CapacityClient = nil
capacityTemplate *template.Template = nil
commitsTemplate *template.Template = nil
iCache *incremental.IncrementalCache = nil
lkgrObj *lkgr.LKGR = nil
taskDb db.RemoteDB = nil
taskDriverDb task_driver_db.DB = nil
taskDriverLogs *logs.LogsManager = nil
tasksPerCommit *tasksPerCommitCache = nil
tCache cache.TaskCache = nil
// AUTOROLLERS maps autoroll frontend host to maps of roller IDs to
// their human-friendly display names.
AUTOROLLERS = map[string]map[string]string{
"autoroll.skia.org": {
"skia-flutter-autoroll": "Flutter",
"skia-autoroll": "Chrome",
"angle-skia-autoroll": "ANGLE",
"skcms-skia-autoroll": "skcms",
"swiftshader-skia-autoroll": "SwiftSh",
},
"skia-autoroll.corp.goog": {
"android-master-autoroll": "Android",
"google3-autoroll": "Google3",
},
}
)
// flags
var (
chromeInfraAuthJWT = flag.String("chrome_infra_auth_jwt", "/var/secrets/skia-public-auth/key.json", "The JWT key for the service account that has access to chrome infra auth.")
// TODO(borenet): Combine btInstance and firestoreInstance.
btInstance = flag.String("bigtable_instance", "", "BigTable instance to use.")
btProject = flag.String("bigtable_project", "", "GCE project to use for BigTable.")
capacityRecalculateInterval = flag.Duration("capacity_recalculate_interval", 10*time.Minute, "How often to re-calculate capacity statistics.")
firestoreInstance = flag.String("firestore_instance", "", "Firestore instance to use, e.g. \"production\"")
gitstoreTable = flag.String("gitstore_bt_table", "git-repos2", "BigTable table used for GitStore.")
host = flag.String("host", "localhost", "HTTP service host")
port = flag.String("port", ":8002", "HTTP service port (e.g., ':8002')")
promPort = flag.String("prom_port", ":20000", "Metrics service address (e.g., ':20000')")
repoUrls = common.NewMultiStringFlag("repo", nil, "Repositories to query for status.")
resourcesDir = flag.String("resources_dir", "", "The directory to find templates, JS, and CSS files. If blank the current directory will be used.")
swarmingUrl = flag.String("swarming_url", "https://chromium-swarm.appspot.com", "URL of the Swarming server.")
taskSchedulerUrl = flag.String("task_scheduler_url", "https://task-scheduler.skia.org", "URL of the Task Scheduler server.")
testing = flag.Bool("testing", false, "Set to true for local testing.")
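// Runtime state; these are set in main(), not via command-line flags.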
podId string
repos repograph.Map
)
// StringIsInteresting returns true iff the string contains non-whitespace characters.
func StringIsInteresting(s string) bool {
for _, c := range s {
if !unicode.IsSpace(c) {
return true
}
}
return false
}
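// reloadTemplates (re)loads the HTML templates for the commits and capacity
// pages, defaulting resourcesDir if it was not provided.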
func reloadTemplates() {
// If resourcesDir was not provided, default it to two directories up from this source file so
// that we can find the templates and static (res/) files.
if *resourcesDir == "" {
_, filename, _, _ := runtime.Caller(0)
*resourcesDir = filepath.Join(filepath.Dir(filename), "../..")
}
commitsTemplate = template.Must(template.ParseFiles(
filepath.Join(*resourcesDir, "templates/commits.html"),
filepath.Join(*resourcesDir, "templates/header.html"),
))
capacityTemplate = template.Must(template.ParseFiles(
filepath.Join(*resourcesDir, "templates/capacity.html"),
filepath.Join(*resourcesDir, "templates/header.html"),
))
}
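// Init performs one-time startup initialization; currently it just loads the templates.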
func Init() {
reloadTemplates()
}
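// getIntParam parses the named query parameter as an int64. It returns nil if
// the parameter is not present, and an error if it is not a valid integer.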
func getIntParam(name string, r *http.Request) (*int64, error) {
raw, ok := r.URL.Query()[name]
if !ok {
return nil, nil
}
v, err := strconv.ParseInt(raw[0], 10, 64)
if err != nil {
return nil, fmt.Errorf("Invalid integer value for parameter %q", name)
}
return &v, nil
}
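// getStringParam returns the named query parameter, or the empty string if it
// is not present.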
func getStringParam(name string, r *http.Request) string {
raw, ok := r.URL.Query()[name]
if !ok {
return ""
}
return raw[0]
}
// repoUrlToName returns a short repo nickname given a full repo URL.
func repoUrlToName(repoUrl string) string {
// Special case: we like "infra" better than "buildbot".
if repoUrl == common.REPO_SKIA_INFRA {
return "infra"
}
return strings.TrimSuffix(path.Base(repoUrl), ".git")
}
// repoNameToUrl returns a full repo URL given a short nickname, or an error
// if no matching repo URL is found.
func repoNameToUrl(repoName string) (string, error) {
// Special case: we like "infra" better than "buildbot".
if repoName == "infra" {
return common.REPO_SKIA_INFRA, nil
}
// Search the list of repos used by this server.
for _, repoUrl := range *repoUrls {
if repoUrlToName(repoUrl) == repoName {
return repoUrl, nil
}
}
return "", fmt.Errorf("No such repo.")
}
// getRepo returns a short repo nickname and a full repo URL based on the URL
// path of the given http.Request.
func getRepo(r *http.Request) (string, string, error) {
repoPath, _ := mux.Vars(r)["repo"]
repoUrl, err := repoNameToUrl(repoPath)
if err != nil {
return "", "", err
}
return repoUrlToName(repoUrl), repoUrl, nil
}
// getRepoNames returns the nicknames for all repos on this server.
func getRepoNames() []string {
repoNames := make([]string, 0, len(*repoUrls))
for _, repoUrl := range *repoUrls {
repoNames = append(repoNames, repoUrlToName(repoUrl))
}
return repoNames
}
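// commentsForRepoHandler writes all comments for the repo named in the URL as JSON.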
func commentsForRepoHandler(w http.ResponseWriter, r *http.Request) {
defer metrics2.FuncTimer().Stop()
w.Header().Set("Content-Type", "application/json")
_, repoUrl, err := getRepo(r)
if err != nil {
httputils.ReportError(w, err, err.Error(), http.StatusInternalServerError)
return
}
comments, err := taskDb.GetCommentsForRepos([]string{repoUrl}, time.Now().Add(-10000*time.Hour))
if err != nil {
httputils.ReportError(w, err, err.Error(), http.StatusInternalServerError)
return
}
if err := json.NewEncoder(w).Encode(comments); err != nil {
sklog.Errorf("Failed to encode comments as JSON: %s", err)
}
}
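// incrementalJsonHandler writes an incremental update of commits, tasks, and
// comments for the requested repo as JSON, honoring the optional "from", "to",
// "n", and "pod" query parameters.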
func incrementalJsonHandler(w http.ResponseWriter, r *http.Request) {
defer metrics2.FuncTimer().Stop()
w.Header().Set("Content-Type", "application/json")
_, repoUrl, err := getRepo(r)
if err != nil {
httputils.ReportError(w, err, err.Error(), http.StatusInternalServerError)
return
}
from, err := getIntParam("from", r)
if err != nil {
httputils.ReportError(w, err, fmt.Sprintf("Invalid parameter for \"from\": %s", err), http.StatusInternalServerError)
return
}
to, err := getIntParam("to", r)
if err != nil {
httputils.ReportError(w, err, fmt.Sprintf("Invalid parameter for \"to\": %s", err), http.StatusInternalServerError)
return
}
n, err := getIntParam("n", r)
if err != nil {
httputils.ReportError(w, err, fmt.Sprintf("Invalid parameter for \"n\": %s", err), http.StatusInternalServerError)
return
}
expectPodId := getStringParam("pod", r)
numCommits := DEFAULT_COMMITS_TO_LOAD
if n != nil {
numCommits = int(*n)
if numCommits > MAX_COMMITS_TO_LOAD {
numCommits = MAX_COMMITS_TO_LOAD
}
}
update := struct {
*incremental.Update
Pod string `json:"pod"`
}{
Pod: podId,
}
if (expectPodId != "" && expectPodId != podId) || from == nil {
update.Update, err = iCache.GetAll(repoUrl, numCommits)
} else {
fromTime := time.Unix(0, (*from)*util.MILLIS_TO_NANOS)
if to != nil {
toTime := time.Unix(0, (*to)*util.MILLIS_TO_NANOS)
update.Update, err = iCache.GetRange(repoUrl, fromTime, toTime, numCommits)
} else {
update.Update, err = iCache.Get(repoUrl, fromTime, numCommits)
}
}
if err != nil {
httputils.ReportError(w, err, fmt.Sprintf("Failed to retrieve updates: %s", err), http.StatusInternalServerError)
return
}
if err := json.NewEncoder(w).Encode(update); err != nil {
httputils.ReportError(w, err, fmt.Sprintf("Failed to encode response: %s", err), http.StatusInternalServerError)
return
}
}
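// addTaskCommentHandler adds a comment to the task whose ID is given in the URL.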
func addTaskCommentHandler(w http.ResponseWriter, r *http.Request) {
defer metrics2.FuncTimer().Stop()
defer util.Close(r.Body)
w.Header().Set("Content-Type", "application/json")
id, ok := mux.Vars(r)["id"]
if !ok {
httputils.ReportError(w, fmt.Errorf("No task ID given!"), "No task ID given!", http.StatusInternalServerError)
return
}
task, err := taskDb.GetTaskById(id)
if err != nil {
httputils.ReportError(w, err, "Failed to obtain task details.", http.StatusInternalServerError)
return
}
comment := struct {
Comment string `json:"comment"`
}{}
if err := json.NewDecoder(r.Body).Decode(&comment); err != nil {
httputils.ReportError(w, err, fmt.Sprintf("Failed to add comment: %s", err), http.StatusInternalServerError)
return
}
c := types.TaskComment{
Repo: task.Repo,
Revision: task.Revision,
Name: task.Name,
Timestamp: time.Now().UTC(),
TaskId: task.Id,
User: login.LoggedInAs(r),
Message: comment.Comment,
}
if err := taskDb.PutTaskComment(&c); err != nil {
httputils.ReportError(w, nil, fmt.Sprintf("Failed to add comment: %s", err), http.StatusInternalServerError)
return
}
if err := iCache.Update(context.Background(), false); err != nil {
httputils.ReportError(w, nil, fmt.Sprintf("Failed to update cache: %s", err), http.StatusInternalServerError)
return
}
}
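// deleteTaskCommentHandler deletes the comment identified by the task ID and
// timestamp in the URL.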
func deleteTaskCommentHandler(w http.ResponseWriter, r *http.Request) {
defer metrics2.FuncTimer().Stop()
w.Header().Set("Content-Type", "application/json")
id, ok := mux.Vars(r)["id"]
if !ok {
httputils.ReportError(w, fmt.Errorf("No task ID given!"), "No task ID given!", http.StatusInternalServerError)
return
}
task, err := taskDb.GetTaskById(id)
if err != nil {
httputils.ReportError(w, err, "Failed to obtain task details.", http.StatusInternalServerError)
return
}
timestamp, err := strconv.ParseInt(mux.Vars(r)["timestamp"], 10, 64)
if err != nil {
httputils.ReportError(w, err, fmt.Sprintf("Invalid comment id: %v", err), http.StatusInternalServerError)
return
}
c := &types.TaskComment{
Repo: task.Repo,
Revision: task.Revision,
Name: task.Name,
Timestamp: time.Unix(0, timestamp),
TaskId: task.Id,
}
if err := taskDb.DeleteTaskComment(c); err != nil {
httputils.ReportError(w, err, fmt.Sprintf("Failed to delete comment: %v", err), http.StatusInternalServerError)
return
}
if err := iCache.Update(context.Background(), false); err != nil {
httputils.ReportError(w, nil, fmt.Sprintf("Failed to update cache: %s", err), http.StatusInternalServerError)
return
}
}
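// addTaskSpecCommentHandler adds a comment (optionally marking the spec flaky
// or ignoring its failures) to the given task spec in the given repo.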
func addTaskSpecCommentHandler(w http.ResponseWriter, r *http.Request) {
defer metrics2.FuncTimer().Stop()
w.Header().Set("Content-Type", "application/json")
taskSpec, ok := mux.Vars(r)["taskSpec"]
if !ok {
httputils.ReportError(w, nil, "No taskSpec provided!", http.StatusInternalServerError)
return
}
_, repoUrl, err := getRepo(r)
if err != nil {
httputils.ReportError(w, err, err.Error(), http.StatusInternalServerError)
return
}
comment := struct {
Comment string `json:"comment"`
Flaky bool `json:"flaky"`
IgnoreFailure bool `json:"ignoreFailure"`
}{}
if err := json.NewDecoder(r.Body).Decode(&comment); err != nil {
httputils.ReportError(w, err, fmt.Sprintf("Failed to add comment: %v", err), http.StatusInternalServerError)
return
}
defer util.Close(r.Body)
c := types.TaskSpecComment{
Repo: repoUrl,
Name: taskSpec,
Timestamp: time.Now().UTC(),
User: login.LoggedInAs(r),
Flaky: comment.Flaky,
IgnoreFailure: comment.IgnoreFailure,
Message: comment.Comment,
}
if err := taskDb.PutTaskSpecComment(&c); err != nil {
httputils.ReportError(w, err, fmt.Sprintf("Failed to add task spec comment: %v", err), http.StatusInternalServerError)
return
}
if err := iCache.Update(context.Background(), false); err != nil {
httputils.ReportError(w, nil, fmt.Sprintf("Failed to update cache: %s", err), http.StatusInternalServerError)
return
}
}
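// deleteTaskSpecCommentHandler deletes the task spec comment identified by the
// repo, task spec, and timestamp in the URL.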
func deleteTaskSpecCommentHandler(w http.ResponseWriter, r *http.Request) {
defer metrics2.FuncTimer().Stop()
w.Header().Set("Content-Type", "application/json")
taskSpec, ok := mux.Vars(r)["taskSpec"]
if !ok {
httputils.ReportError(w, nil, "No taskSpec provided!", http.StatusInternalServerError)
return
}
_, repoUrl, err := getRepo(r)
if err != nil {
httputils.ReportError(w, err, err.Error(), http.StatusInternalServerError)
return
}
timestamp, err := strconv.ParseInt(mux.Vars(r)["timestamp"], 10, 64)
if err != nil {
httputils.ReportError(w, err, fmt.Sprintf("Invalid timestamp: %v", err), http.StatusInternalServerError)
return
}
c := types.TaskSpecComment{
Repo: repoUrl,
Name: taskSpec,
Timestamp: time.Unix(0, timestamp),
}
if err := taskDb.DeleteTaskSpecComment(&c); err != nil {
httputils.ReportError(w, err, fmt.Sprintf("Failed to delete comment: %v", err), http.StatusInternalServerError)
return
}
if err := iCache.Update(context.Background(), false); err != nil {
httputils.ReportError(w, nil, fmt.Sprintf("Failed to update cache: %s", err), http.StatusInternalServerError)
return
}
}
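// addCommitCommentHandler adds a comment to the given commit in the given repo.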
func addCommitCommentHandler(w http.ResponseWriter, r *http.Request) {
defer metrics2.FuncTimer().Stop()
w.Header().Set("Content-Type", "application/json")
_, repoUrl, err := getRepo(r)
if err != nil {
httputils.ReportError(w, err, err.Error(), http.StatusInternalServerError)
return
}
commit := mux.Vars(r)["commit"]
comment := struct {
Comment string `json:"comment"`
IgnoreFailure bool `json:"ignoreFailure"`
}{}
if err := json.NewDecoder(r.Body).Decode(&comment); err != nil {
httputils.ReportError(w, err, fmt.Sprintf("Failed to add comment: %v", err), http.StatusInternalServerError)
return
}
defer util.Close(r.Body)
c := types.CommitComment{
Repo: repoUrl,
Revision: commit,
Timestamp: time.Now().UTC(),
User: login.LoggedInAs(r),
IgnoreFailure: comment.IgnoreFailure,
Message: comment.Comment,
}
if err := taskDb.PutCommitComment(&c); err != nil {
httputils.ReportError(w, err, fmt.Sprintf("Failed to add commit comment: %s", err), http.StatusInternalServerError)
return
}
if err := iCache.Update(context.Background(), false); err != nil {
httputils.ReportError(w, nil, fmt.Sprintf("Failed to update cache: %s", err), http.StatusInternalServerError)
return
}
}
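// deleteCommitCommentHandler deletes the commit comment identified by the
// repo, commit, and timestamp in the URL.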
func deleteCommitCommentHandler(w http.ResponseWriter, r *http.Request) {
defer metrics2.FuncTimer().Stop()
w.Header().Set("Content-Type", "application/json")
_, repoUrl, err := getRepo(r)
if err != nil {
httputils.ReportError(w, err, err.Error(), http.StatusInternalServerError)
return
}
commit := mux.Vars(r)["commit"]
timestamp, err := strconv.ParseInt(mux.Vars(r)["timestamp"], 10, 64)
if err != nil {
httputils.ReportError(w, err, fmt.Sprintf("Invalid comment id: %v", err), http.StatusInternalServerError)
return
}
c := types.CommitComment{
Repo: repoUrl,
Revision: commit,
Timestamp: time.Unix(0, timestamp),
}
if err := taskDb.DeleteCommitComment(&c); err != nil {
httputils.ReportError(w, err, fmt.Sprintf("Failed to delete commit comment: %s", err), http.StatusInternalServerError)
return
}
if err := iCache.Update(context.Background(), false); err != nil {
httputils.ReportError(w, nil, fmt.Sprintf("Failed to update cache: %s", err), http.StatusInternalServerError)
return
}
}
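// commitsTemplateData is the data used to expand the commits (status) page template.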
type commitsTemplateData struct {
Repo string
Title string
RepoBase string
Repos []string
}
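// defaultRedirectHandler redirects "/" to the status page for the first configured repo.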
func defaultRedirectHandler(w http.ResponseWriter, r *http.Request) {
defaultRepo := repoUrlToName((*repoUrls)[0])
http.Redirect(w, r, fmt.Sprintf("/repo/%s", defaultRepo), http.StatusFound)
}
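// statusHandler expands and serves the commits (status) page for the requested repo.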
func statusHandler(w http.ResponseWriter, r *http.Request) {
defer metrics2.FuncTimer().Stop()
w.Header().Set("Content-Type", "text/html")
repoName, repoUrl, err := getRepo(r)
if err != nil {
httputils.ReportError(w, err, err.Error(), http.StatusInternalServerError)
return
}
// Don't use cached templates in testing mode.
if *testing {
reloadTemplates()
}
d := commitsTemplateData{
Repo: repoName,
RepoBase: fmt.Sprintf("%s/+/", repoUrl),
Repos: getRepoNames(),
Title: fmt.Sprintf("Status: %s", repoName),
}
if err := commitsTemplate.Execute(w, d); err != nil {
httputils.ReportError(w, err, fmt.Sprintf("Failed to expand template: %v", err), http.StatusInternalServerError)
}
}
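// capacityHandler expands and serves the capacity statistics page.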
func capacityHandler(w http.ResponseWriter, r *http.Request) {
defer metrics2.FuncTimer().Stop()
w.Header().Set("Content-Type", "text/html")
// Don't use cached templates in testing mode.
if *testing {
reloadTemplates()
}
page := struct {
Repos []string
}{
Repos: getRepoNames(),
}
if err := capacityTemplate.Execute(w, page); err != nil {
httputils.ReportError(w, err, fmt.Sprintf("Failed to expand template: %v", err), http.StatusInternalServerError)
}
}
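// capacityStatsHandler writes the current capacity metrics as JSON.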
func capacityStatsHandler(w http.ResponseWriter, r *http.Request) {
defer metrics2.FuncTimer().Stop()
w.Header().Set("Content-Type", "application/json")
if err := json.NewEncoder(w).Encode(capacityClient.CapacityMetrics()); err != nil {
httputils.ReportError(w, err, fmt.Sprintf("Failed to encode response: %s", err), http.StatusInternalServerError)
return
}
}
// buildProgressHandler returns the number of finished tasks at the given
// commit and the proportion of the total tasks expected for that commit.
func buildProgressHandler(w http.ResponseWriter, r *http.Request) {
defer metrics2.FuncTimer().Stop()
w.Header().Set("Content-Type", "application/json")
w.Header().Set("Access-Control-Allow-Origin", "*")
// Get the number of finished tasks for the requested commit.
hash := r.FormValue("commit")
if !util.ValidateCommit(hash) {
httputils.ReportError(w, nil, fmt.Sprintf("%q is not a valid commit hash.", hash), http.StatusInternalServerError)
return
}
_, repoUrl, err := getRepo(r)
if err != nil {
httputils.ReportError(w, err, err.Error(), http.StatusInternalServerError)
return
}
tasks, err := tCache.GetTasksForCommits(repoUrl, []string{hash})
if err != nil {
httputils.ReportError(w, err, fmt.Sprintf("Failed to get the number of finished builds."), http.StatusInternalServerError)
return
}
finished := 0
for _, byCommit := range tasks {
for _, t := range byCommit {
if t.Done() {
finished++
}
}
}
tasksForCommit, err := tasksPerCommit.Get(context.Background(), types.RepoState{
Repo: repoUrl,
Revision: hash,
})
if err != nil {
httputils.ReportError(w, err, fmt.Sprintf("Failed to get number of tasks at commit."), http.StatusInternalServerError)
return
}
proportion := 1.0
if tasksForCommit > 0 {
proportion = float64(finished) / float64(tasksForCommit)
}
res := struct {
Commit string `json:"commit"`
FinishedTasks int `json:"finishedTasks"`
FinishedProportion float64 `json:"finishedProportion"`
TotalTasks int `json:"totalTasks"`
}{
Commit: hash,
FinishedTasks: finished,
FinishedProportion: proportion,
TotalTasks: tasksForCommit,
}
if err := json.NewEncoder(w).Encode(res); err != nil {
httputils.ReportError(w, err, fmt.Sprintf("Failed to encode JSON."), http.StatusInternalServerError)
return
}
}
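// lkgrHandler writes the current last-known-good revision (LKGR).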
func lkgrHandler(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "text/html")
if _, err := w.Write([]byte(lkgrObj.Get())); err != nil {
httputils.ReportError(w, err, "Failed to write response.", http.StatusInternalServerError)
return
}
}
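// autorollStatusHandler writes the most recently cached autoroller statuses as JSON.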
func autorollStatusHandler(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
autorollMtx.RLock()
defer autorollMtx.RUnlock()
if _, err := w.Write(autorollStatus); err != nil {
httputils.ReportError(w, err, "Failed to write response.", http.StatusInternalServerError)
return
}
}
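// runServer registers all HTTP routes, applies the login restrictions, and
// serves until the process exits.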
func runServer(serverURL string) {
r := mux.NewRouter()
r.HandleFunc("/", defaultRedirectHandler)
r.HandleFunc("/repo/{repo}", statusHandler)
r.HandleFunc("/capacity", capacityHandler)
r.HandleFunc("/capacity/json", capacityStatsHandler)
r.HandleFunc("/json/autorollers", autorollStatusHandler)
r.HandleFunc("/json/version", skiaversion.JsonHandler)
r.HandleFunc("/json/{repo}/all_comments", commentsForRepoHandler)
r.HandleFunc("/json/{repo}/buildProgress", buildProgressHandler)
r.HandleFunc("/json/{repo}/incremental", incrementalJsonHandler)
r.HandleFunc("/lkgr", lkgrHandler)
r.HandleFunc("/logout/", login.LogoutHandler)
r.HandleFunc("/loginstatus/", login.StatusHandler)
r.HandleFunc(login.DEFAULT_OAUTH2_CALLBACK, login.OAuth2CallbackHandler)
r.PathPrefix("/res/").HandlerFunc(httputils.MakeResourceHandler(*resourcesDir))
taskComments := r.PathPrefix("/json/tasks/{id}").Subrouter()
taskComments.HandleFunc("/comments", addTaskCommentHandler).Methods("POST")
taskComments.HandleFunc("/comments/{timestamp:[0-9]+}", deleteTaskCommentHandler).Methods("DELETE")
taskComments.Use(login.RestrictEditor)
taskSpecs := r.PathPrefix("/json/{repo}/taskSpecs/{taskSpec}").Subrouter()
taskSpecs.HandleFunc("/comments", addTaskSpecCommentHandler).Methods("POST")
taskSpecs.HandleFunc("/comments/{timestamp:[0-9]+}", deleteTaskSpecCommentHandler).Methods("DELETE")
taskSpecs.Use(login.RestrictEditor)
commits := r.PathPrefix("/json/{repo}/commits").Subrouter()
commits.HandleFunc("/{commit:[a-f0-9]+}/comments", addCommitCommentHandler).Methods("POST")
commits.HandleFunc("/{commit:[a-f0-9]+}/comments/{timestamp:[0-9]+}", deleteCommitCommentHandler).Methods("DELETE")
commits.Use(login.RestrictEditor)
handlers.AddTaskDriverHandlers(r, taskDriverDb, taskDriverLogs)
h := httputils.LoggingGzipRequestResponse(login.RestrictViewer(r))
if !*testing {
h = httputils.HealthzAndHTTPS(h)
}
http.Handle("/", h)
sklog.Infof("Ready to serve on %s", serverURL)
sklog.Fatal(http.ListenAndServe(*port, nil))
}
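// autoRollStatus combines an autoroller's mini status with a link to its status page.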
type autoRollStatus struct {
autoroll_status.AutoRollMiniStatus
Url string `json:"url"`
}
func main() {
// Setup flags.
common.InitWithMust(
APPNAME,
common.PrometheusOpt(promPort),
common.MetricsLoggingOpt(),
)
skiaversion.MustLogVersion()
Init()
serverURL := "https://" + *host
if *testing {
serverURL = "http://" + *host + *port
}
ctx := context.Background()
podId = os.Getenv("POD_ID")
if podId == "" {
sklog.Error("POD_ID not defined; falling back to UUID.")
podId = uuid.New().String()
}
ts, err := auth.NewDefaultTokenSource(*testing, auth.SCOPE_USERINFO_EMAIL, auth.SCOPE_GERRIT, bigtable.Scope, pubsub.ScopePubSub, datastore.ScopeDatastore)
if err != nil {
sklog.Fatal(err)
}
// Create LKGR object.
lkgrObj, err = lkgr.New(ctx)
if err != nil {
sklog.Fatalf("Failed to create LKGR: %s", err)
}
lkgrObj.UpdateLoop(10*time.Minute, ctx)
// Create remote Tasks DB.
taskDb, err = firestore.NewDBWithParams(ctx, firestore.FIRESTORE_PROJECT, *firestoreInstance, ts)
if err != nil {
sklog.Fatalf("Failed to create Firestore DB client: %s", err)
}
criaTs, err := auth.NewJWTServiceAccountTokenSource("", *chromeInfraAuthJWT, auth.SCOPE_USERINFO_EMAIL)
if err != nil {
sklog.Fatal(err)
}
criaClient := httputils.DefaultClientConfig().WithTokenSource(criaTs).With2xxOnly().Client()
adminAllowed, err := allowed.NewAllowedFromChromeInfraAuth(criaClient, AUTH_GROUP_ADMIN_RIGHTS)
if err != nil {
sklog.Fatal(err)
}
editAllowed, err := allowed.NewAllowedFromChromeInfraAuth(criaClient, AUTH_GROUP_EDIT_RIGHTS)
if err != nil {
sklog.Fatal(err)
}
login.InitWithAllow(serverURL+login.DEFAULT_OAUTH2_CALLBACK, adminAllowed, editAllowed, nil)
// Check out source code.
if *repoUrls == nil {
sklog.Fatal("At least one --repo is required.")
}
btConf := &bt_gitstore.BTConfig{
ProjectID: *btProject,
InstanceID: *btInstance,
TableID: *gitstoreTable,
AppProfile: APPNAME,
}
repos, err = bt_gitstore.NewBTGitStoreMap(ctx, *repoUrls, btConf)
if err != nil {
sklog.Fatal(err)
}
sklog.Info("Checkout complete")
// Cache for buildProgressHandler.
tasksPerCommit, err = newTasksPerCommitCache(ctx, repos, 14*24*time.Hour, *btProject, *btInstance, ts)
if err != nil {
sklog.Fatalf("Failed to create tasksPerCommitCache: %s", err)
}
// Create the IncrementalCache.
w, err := window.New(time.Minute, MAX_COMMITS_TO_LOAD, repos)
if err != nil {
sklog.Fatalf("Failed to create time window: %s", err)
}
iCache, err = incremental.NewIncrementalCache(ctx, taskDb, w, repos, MAX_COMMITS_TO_LOAD, *swarmingUrl, *taskSchedulerUrl)
if err != nil {
sklog.Fatalf("Failed to create IncrementalCache: %s", err)
}
iCache.UpdateLoop(60*time.Second, ctx)
// Create a regular task cache.
tCache, err = cache.NewTaskCache(ctx, taskDb, w, nil)
if err != nil {
sklog.Fatalf("Failed to create TaskCache: %s", err)
}
lvTaskCache := metrics2.NewLiveness("status_task_cache")
go util.RepeatCtx(60*time.Second, ctx, func(ctx context.Context) {
if err := tCache.Update(); err != nil {
sklog.Errorf("Failed to update TaskCache: %s", err)
} else {
lvTaskCache.Reset()
}
})
// Capacity stats.
capacityClient = capacity.New(tasksPerCommit.tcc, tCache, repos)
capacityClient.StartLoading(ctx, *capacityRecalculateInterval)
// Periodically obtain the autoroller statuses.
if err := ds.InitWithOpt(common.PROJECT_ID, ds.AUTOROLL_NS, option.WithTokenSource(ts)); err != nil {
sklog.Fatalf("Failed to initialize datastore: %s", err)
}
updateAutorollStatus := func(ctx context.Context) error {
statuses := map[string]autoRollStatus{}
for host, subMap := range AUTOROLLERS {
for roller, friendlyName := range subMap {
s, err := autoroll_status.Get(ctx, roller)
if err != nil {
return err
}
statuses[friendlyName] = autoRollStatus{
AutoRollMiniStatus: s.AutoRollMiniStatus,
Url: fmt.Sprintf("https://%s/r/%s", host, roller),
}
}
}
b, err := json.Marshal(statuses)
if err != nil {
return err
}
autorollMtx.Lock()
defer autorollMtx.Unlock()
autorollStatus = b
return nil
}
if err := updateAutorollStatus(ctx); err != nil {
sklog.Fatal(err)
}
go util.RepeatCtx(60*time.Second, ctx, func(ctx context.Context) {
if err := updateAutorollStatus(ctx); err != nil {
sklog.Errorf("Failed to update autoroll status: %s", err)
}
})
// Create the TaskDriver DB.
taskDriverBtInstance := "staging" // Task Drivers aren't in prod yet.
taskDriverDb, err = bigtable_db.NewBigTableDB(ctx, *btProject, taskDriverBtInstance, ts)
if err != nil {
sklog.Fatal(err)
}
taskDriverLogs, err = logs.NewLogsManager(ctx, *btProject, taskDriverBtInstance, ts)
if err != nil {
sklog.Fatal(err)
}
// Run the server.
runServer(serverURL)
}