/*
Provides roll-up statuses for Skia build/test/perf.
*/
package main
import (
"context"
"flag"
"fmt"
"net/http"
"os"
"path"
"path/filepath"
"runtime"
"sort"
"strings"
"sync"
"text/template"
"time"
"unicode"
"cloud.google.com/go/bigtable"
"cloud.google.com/go/datastore"
"cloud.google.com/go/pubsub"
"github.com/go-chi/chi/v5"
"github.com/google/uuid"
"go.skia.org/infra/autoroll/go/status"
autoroll_status "go.skia.org/infra/autoroll/go/status"
"go.skia.org/infra/go/alogin"
"go.skia.org/infra/go/alogin/proxylogin"
"go.skia.org/infra/go/auth"
"go.skia.org/infra/go/common"
"go.skia.org/infra/go/ds"
"go.skia.org/infra/go/git/repograph"
"go.skia.org/infra/go/gitiles"
"go.skia.org/infra/go/gitstore/bt_gitstore"
"go.skia.org/infra/go/httputils"
"go.skia.org/infra/go/metrics2"
"go.skia.org/infra/go/skerr"
"go.skia.org/infra/go/sklog"
"go.skia.org/infra/go/util"
"go.skia.org/infra/status/go/capacity"
"go.skia.org/infra/status/go/incremental"
"go.skia.org/infra/status/go/lkgr"
"go.skia.org/infra/status/go/rpc"
task_driver_db "go.skia.org/infra/task_driver/go/db"
bigtable_db "go.skia.org/infra/task_driver/go/db/bigtable"
"go.skia.org/infra/task_driver/go/handlers"
"go.skia.org/infra/task_driver/go/logs"
"go.skia.org/infra/task_scheduler/go/db"
"go.skia.org/infra/task_scheduler/go/db/cache"
"go.skia.org/infra/task_scheduler/go/db/firestore"
"go.skia.org/infra/task_scheduler/go/window"
"golang.org/x/oauth2/google"
"google.golang.org/api/option"
)
const (
appName = "status"
defaultCommitsToLoad = 35
maxCommitsToLoad = 100
)
var (
autorollMtx sync.RWMutex
autorollStatusTwirp *rpc.GetAutorollerStatusesResponse = nil
capacityClient *capacity.CapacityClientImpl = nil
capacityTemplate *template.Template = nil
commitsTemplate *template.Template = nil
iCache *incremental.IncrementalCacheImpl = nil
lkgrObj *lkgr.LKGR = nil
taskDb db.RemoteDB = nil
taskDriverDb task_driver_db.DB = nil
taskDriverLogs *logs.LogsManager = nil
tasksPerCommit *tasksPerCommitCache = nil
tCache cache.TaskCache = nil
plogin alogin.Login
// autorollerIDsToNames maps autoroll frontend host to maps of roller IDs to
// their human-friendly display names.
autorollerIDsToNames = map[string]map[string]string{
"autoroll.skia.org": {
"skia-flutter-autoroll": "Flutter",
"skia-autoroll": "Chrome",
"angle-skia-autoroll": "ANGLE",
"dawn-skia-autoroll": "Dawn",
"skcms-skia-autoroll": "skcms",
"swiftshader-skia-autoroll": "SwiftSh",
"vulkan-deps-skia-autoroll": "VkDeps",
},
"skia-autoroll.corp.goog": {
"android-master-autoroll": "Android",
"google3-autoroll": "Google3",
},
}
)
// flags
var (
chromeInfraAuthJWT = flag.String("chrome_infra_auth_jwt", "/var/secrets/skia-public-auth/key.json", "Path to a local file, or name of a GCP secret, containing the JWT key for the service account that has access to chrome infra auth.")
// TODO(borenet): Combine btInstance and firestoreInstance.
btInstance = flag.String("bigtable_instance", "", "BigTable instance to use.")
btProject = flag.String("bigtable_project", "", "GCE project to use for BigTable.")
capacityRecalculateInterval = flag.Duration("capacity_recalculate_interval", 10*time.Minute, "How often to re-calculate capacity statistics.")
firestoreInstance = flag.String("firestore_instance", "", "Firestore instance to use, e.g. \"production\".")
gitstoreTable = flag.String("gitstore_bt_table", "git-repos2", "BigTable table used for GitStore.")
host = flag.String("host", "localhost", "HTTP service host")
port = flag.String("port", ":8002", "HTTP service port (e.g., ':8002')")
promPort = flag.String("prom_port", ":20000", "Metrics service address (e.g., ':20000')")
repoUrls = common.NewMultiStringFlag("repo", nil, "Repositories to query for status.")
resourcesDir = flag.String("resources_dir", "", "The directory to find templates, JS, and CSS files. If blank the current directory will be used.")
secretProject = flag.String("secret-project", "skia-infra-public", "Name of the GCP project used for secret management.")
swarmingUrl = flag.String("swarming_url", "https://chromium-swarm.appspot.com", "URL of the Swarming server.")
taskLogsUrlTemplate = flag.String("task_logs_url_template", "https://ci.chromium.org/raw/build/logs.chromium.org/skia/{{TaskID}}/+/annotations", "Template URL for direct link to logs, with {{TaskID}} as placeholder.")
taskSchedulerUrl = flag.String("task_scheduler_url", "https://task-scheduler.skia.org", "URL of the Task Scheduler server.")
testing = flag.Bool("testing", false, "Set to true when running locally for testing; serves over plain HTTP and reloads templates on every request.")
treeStatusBaseUrl = flag.String("tree_status_base_url", "https://tree-status.skia.org", "Repo-specific tree status URLs will be created using this base URL, e.g. https://tree-status.skia.org or https://skia-tree-status.corp.goog")
podId string
repos repograph.Map
// repoURLsByName maps repo nicknames to the base URLs used to create links to their commits.
repoURLsByName map[string]string
)
// StringIsInteresting returns true iff the string contains non-whitespace characters.
func StringIsInteresting(s string) bool {
for _, c := range s {
if !unicode.IsSpace(c) {
return true
}
}
return false
}
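// reloadTemplates loads (or reloads) the HTML templates from the resources directory.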
func reloadTemplates() {
// If no resources directory was specified, default to two directories up from this
// source file so that we can read templates and serve static (dist/) files.
if *resourcesDir == "" {
_, filename, _, _ := runtime.Caller(0)
*resourcesDir = filepath.Join(filepath.Dir(filename), "../..")
}
commitsTemplate = template.Must(template.ParseFiles(
filepath.Join(*resourcesDir, "dist", "status.html"),
))
capacityTemplate = template.Must(template.ParseFiles(
filepath.Join(*resourcesDir, "dist", "capacity.html"),
))
}
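// Init performs one-time initialization; currently it just loads the HTML templates.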
func Init() {
reloadTemplates()
}
// repoUrlToName returns a short repo nickname given a full repo URL.
func repoUrlToName(repoUrl string) string {
// Special case: we like "infra" better than "buildbot".
if repoUrl == common.REPO_SKIA_INFRA {
return "infra"
}
return strings.TrimSuffix(path.Base(repoUrl), ".git")
}
// repoNameToUrl returns a full repo URL given a short nickname, or an error
// if no matching repo URL is found.
func repoNameToUrl(repoName string) (string, error) {
// Special case: we like "infra" better than "buildbot".
if repoName == "infra" {
return common.REPO_SKIA_INFRA, nil
}
// Search the list of repos used by this server.
for _, repoUrl := range *repoUrls {
if repoUrlToName(repoUrl) == repoName {
return repoUrl, nil
}
}
return "", fmt.Errorf("No such repo.")
}
// getRepoTwirp resolves the given repo nickname to its short name and URL, like
// repoNameToUrl above, for the new WIP Twirp server.
// TODO(westont): Refactor once Twirp server is in use.
func getRepoTwirp(repo string) (string, string, error) {
repoURL, err := repoNameToUrl(repo)
if err != nil {
return "", "", err
}
return repoUrlToName(repoURL), repoURL, nil
}
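// defaultHandler serves the main commits/status page for the default repo.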
func defaultHandler(w http.ResponseWriter, _ *http.Request) {
defer metrics2.FuncTimer().Stop()
w.Header().Set("Content-Type", "text/html")
defaultRepo := repoUrlToName((*repoUrls)[0])
// Don't use cached templates in testing mode.
if *testing {
reloadTemplates()
}
d := struct {
Title string
SwarmingURL string
TreeStatusBaseURL string
LogsURLTemplate string
TaskSchedulerURL string
DefaultRepo string
// Repo name to repo URL.
Repos map[string]string
}{
Title: fmt.Sprintf("Status: %s", defaultRepo),
SwarmingURL: *swarmingUrl,
TreeStatusBaseURL: *treeStatusBaseUrl,
LogsURLTemplate: *taskLogsUrlTemplate,
TaskSchedulerURL: *taskSchedulerUrl,
DefaultRepo: defaultRepo,
Repos: repoURLsByName,
}
if err := commitsTemplate.Execute(w, d); err != nil {
httputils.ReportError(w, err, fmt.Sprintf("Failed to expand template: %v", err), http.StatusInternalServerError)
}
}
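// capacityHandler serves the bot capacity statistics page.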
func capacityHandler(w http.ResponseWriter, _ *http.Request) {
defer metrics2.FuncTimer().Stop()
w.Header().Set("Content-Type", "text/html")
defaultRepo := repoUrlToName((*repoUrls)[0])
// Don't use cached templates in testing mode.
if *testing {
reloadTemplates()
}
d := struct {
Title string
SwarmingURL string
LogsURLTemplate string
TaskSchedulerURL string
DefaultRepo string
// Repo name to repo URL.
Repos map[string]string
}{
Title: "Capacity Statistics for Skia Bots",
SwarmingURL: *swarmingUrl,
LogsURLTemplate: *taskLogsUrlTemplate,
TaskSchedulerURL: *taskSchedulerUrl,
DefaultRepo: defaultRepo,
Repos: repoURLsByName,
}
if err := capacityTemplate.Execute(w, d); err != nil {
httputils.ReportError(w, err, fmt.Sprintf("Failed to expand template: %v", err), http.StatusInternalServerError)
}
}
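// lkgrHandler writes the current last-known-good revision, as tracked by lkgrObj.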
func lkgrHandler(w http.ResponseWriter, _ *http.Request) {
w.Header().Set("Content-Type", "text/html")
if _, err := w.Write([]byte(lkgrObj.Get())); err != nil {
httputils.ReportError(w, err, "Failed to write response.", http.StatusInternalServerError)
return
}
}
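// getAutorollerStatusesTwirp returns the most recently cached autoroller statuses.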
func getAutorollerStatusesTwirp() *rpc.GetAutorollerStatusesResponse {
autorollMtx.RLock()
defer autorollMtx.RUnlock()
return autorollStatusTwirp
}
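// runServer registers all HTTP routes and serves until the process exits.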
// Note: srv already has the twirp handlers on it when passed into this function.
func runServer(serverURL string, srv http.Handler) {
topLevelRouter := chi.NewRouter()
topLevelRouter.Use(alogin.StatusMiddleware(plogin))
// Register the Twirp handler without the gzip middleware used below; the Twirp server already gzips its responses, so wrapping it would double-gzip them.
topLevelRouter.Handle(rpc.StatusServicePathPrefix+"*", httputils.LoggingRequestResponse(srv))
topLevelRouter.With(httputils.LoggingGzipRequestResponse).Route("/", func(r chi.Router) {
r.HandleFunc("/", httputils.CorsHandler(defaultHandler))
r.HandleFunc("/capacity", capacityHandler)
r.HandleFunc("/lkgr", lkgrHandler)
r.HandleFunc("/_/login/status", alogin.LoginStatusHandler(plogin))
r.HandleFunc("/dist/*", httputils.MakeResourceHandler(*resourcesDir))
handlers.AddTaskDriverHandlers(r, taskDriverDb, taskDriverLogs)
})
var h http.Handler = topLevelRouter
if !*testing {
h = httputils.HealthzAndHTTPS(topLevelRouter)
}
h = httputils.XFrameOptionsDeny(h)
http.Handle("/", h)
sklog.Infof("Ready to serve on %s", serverURL)
sklog.Fatal(http.ListenAndServe(*port, nil))
}
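// autoRollStatus combines an autoroller's mini status with a link to its status page.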
type autoRollStatus struct {
autoroll_status.AutoRollMiniStatus
Url string `json:"url"`
}
func main() {
// Setup flags.
common.InitWithMust(
appName,
common.PrometheusOpt(promPort),
)
Init()
serverURL := "https://" + *host
if *testing {
serverURL = "http://" + *host + *port
}
ctx := context.Background()
podId = os.Getenv("POD_ID")
if podId == "" {
sklog.Error("POD_ID not defined; falling back to UUID.")
podId = uuid.New().String()
}
repoURLsByName = make(map[string]string)
for _, repoURL := range *repoUrls {
repoURLsByName[repoUrlToName(repoURL)] = fmt.Sprintf(gitiles.CommitURL, repoURL, "")
}
ts, err := google.DefaultTokenSource(ctx, auth.ScopeUserinfoEmail, auth.ScopeGerrit, bigtable.Scope, pubsub.ScopePubSub, datastore.ScopeDatastore)
if err != nil {
sklog.Fatal(err)
}
// Create LKGR object.
lkgrObj, err = lkgr.New(ctx)
if err != nil {
sklog.Fatalf("Failed to create LKGR: %s", err)
}
lkgrObj.UpdateLoop(10*time.Minute, ctx)
// Create remote Tasks DB.
taskDb, err = firestore.NewDBWithParams(ctx, firestore.FIRESTORE_PROJECT, *firestoreInstance, ts)
if err != nil {
sklog.Fatalf("Failed to create Firestore DB client: %s", err)
}
plogin = proxylogin.NewWithDefaults()
// Check out source code.
if *repoUrls == nil {
sklog.Fatal("At least one --repo is required.")
}
btConf := &bt_gitstore.BTConfig{
ProjectID: *btProject,
InstanceID: *btInstance,
TableID: *gitstoreTable,
AppProfile: appName,
}
repos, err = bt_gitstore.NewBTGitStoreMap(ctx, *repoUrls, btConf)
if err != nil {
sklog.Fatal(err)
}
sklog.Info("Checkout complete")
// Cache for buildProgressHandler.
tasksPerCommit, err = newTasksPerCommitCache(ctx, repos, 14*24*time.Hour, *btProject, *btInstance, ts)
if err != nil {
sklog.Fatalf("Failed to create tasksPerCommitCache: %s", err)
}
// Create the IncrementalCacheImpl.
w, err := window.New(ctx, time.Minute, maxCommitsToLoad, repos)
if err != nil {
sklog.Fatalf("Failed to create time window: %s", err)
}
iCache, err = incremental.NewIncrementalCacheImpl(ctx, taskDb, w, repos, maxCommitsToLoad, *swarmingUrl, *taskSchedulerUrl)
if err != nil {
sklog.Fatalf("Failed to create IncrementalCacheImpl: %s", err)
}
iCache.UpdateLoop(ctx, 60*time.Second)
// Create a regular task cache.
tCache, err = cache.NewTaskCache(ctx, taskDb, w, nil)
if err != nil {
sklog.Fatalf("Failed to create TaskCache: %s", err)
}
lvTaskCache := metrics2.NewLiveness("status_task_cache")
go util.RepeatCtx(ctx, 60*time.Second, func(ctx context.Context) {
if err := tCache.Update(ctx); err != nil {
sklog.Errorf("Failed to update TaskCache: %s", err)
} else {
lvTaskCache.Reset()
}
})
// Capacity stats.
capacityClient = capacity.New(tasksPerCommit.tcc, tCache, repos)
capacityClient.StartLoading(ctx, *capacityRecalculateInterval)
// Periodically obtain the autoroller statuses.
if err := ds.InitWithOpt(common.PROJECT_ID, ds.AUTOROLL_NS, option.WithTokenSource(ts)); err != nil {
sklog.Fatalf("Failed to initialize datastore: %s", err)
}
// TODO(borenet): We're hard-coding the Firestore instance. We should find a
// way not to do so.
autorollStatusDB, err := status.NewDB(ctx, firestore.FIRESTORE_PROJECT, ds.AUTOROLL_NS, "production", ts)
if err != nil {
sklog.Fatalf("Failed to create status DB: %s", err)
}
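// updateAutorollStatus fetches the latest status for each known autoroller and atomically swaps in a new cached response for the Twirp endpoint.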
updateAutorollStatus := func(ctx context.Context) error {
statuses := map[string]autoRollStatus{}
statusesTwirp := []*rpc.AutorollerStatus{}
for host, subMap := range autorollerIDsToNames {
for roller, friendlyName := range subMap {
s, err := autorollStatusDB.Get(ctx, roller)
if err != nil {
return skerr.Wrapf(err, "retrieving status for %s", roller)
}
miniStatus := s.AutoRollMiniStatus
url := fmt.Sprintf("https://%s/r/%s", host, roller)
statuses[friendlyName] = autoRollStatus{
AutoRollMiniStatus: miniStatus,
Url: url,
}
statusesTwirp = append(statusesTwirp,
&rpc.AutorollerStatus{
Name: friendlyName,
CurrentRollRev: miniStatus.CurrentRollRev,
LastRollRev: miniStatus.LastRollRev,
Mode: miniStatus.Mode,
NumBehind: int32(miniStatus.NumNotRolledCommits),
NumFailed: int32(miniStatus.NumFailedRolls),
Url: url})
}
}
sort.Slice(statusesTwirp, func(i, j int) bool {
return statusesTwirp[i].Name < statusesTwirp[j].Name
})
autorollMtx.Lock()
defer autorollMtx.Unlock()
autorollStatusTwirp = &rpc.GetAutorollerStatusesResponse{Rollers: statusesTwirp}
return nil
}
if err := updateAutorollStatus(ctx); err != nil {
sklog.Fatal(err)
}
go util.RepeatCtx(ctx, 60*time.Second, func(ctx context.Context) {
if err := updateAutorollStatus(ctx); err != nil {
sklog.Errorf("Failed to update autoroll status: %s", err)
}
})
// Create the TaskDriver DB.
taskDriverBtInstance := "staging" // Task Drivers aren't in prod yet.
taskDriverDb, err = bigtable_db.NewBigTableDB(ctx, *btProject, taskDriverBtInstance, ts)
if err != nil {
sklog.Fatal(err)
}
taskDriverLogs, err = logs.NewLogsManager(ctx, *btProject, taskDriverBtInstance, ts)
if err != nil {
sklog.Fatal(err)
}
// Create Twirp Server.
twirpServer := rpc.NewStatusServer(iCache, taskDb, capacityClient, getAutorollerStatusesTwirp, getRepoTwirp, maxCommitsToLoad, defaultCommitsToLoad, podId)
// Run the server.
runServer(serverURL, twirpServer)
}