blob: a2f8005a7b06364bd1da26400edfe562e0805b6e [file] [log] [blame]
// Provides roll-up statuses for Skia build/test/perf.
package main
import (
autoroll_status ""
task_driver_db ""
bigtable_db ""
// Application-wide constants.
const (
// appName is passed as the BigTable AppProfile when opening the GitStore.
appName = "status"
// defaultCommitsToLoad is handed to the Twirp status server as its default
// commit count.
defaultCommitsToLoad = 35
// maxCommitsToLoad bounds the time window and the incremental cache, and is
// also handed to the Twirp status server as its maximum commit count.
maxCommitsToLoad = 100
// Package-level state. Everything below is assigned during startup in main
// (or by reloadTemplates) and then read by the HTTP handlers.
var (
// autorollMtx is held around reads and writes of autorollStatusTwirp.
autorollMtx sync.RWMutex
// autorollStatusTwirp is the latest autoroller status response; it is
// replaced wholesale on every refresh.
autorollStatusTwirp *rpc.GetAutorollerStatusesResponse = nil
capacityClient *capacity.CapacityClientImpl = nil
// capacityTemplate and commitsTemplate are parsed by reloadTemplates.
capacityTemplate *template.Template = nil
commitsTemplate *template.Template = nil
iCache *incremental.IncrementalCacheImpl = nil
lkgrObj *lkgr.LKGR = nil
taskDb db.RemoteDB = nil
taskDriverDb task_driver_db.DB = nil
taskDriverLogs *logs.LogsManager = nil
tasksPerCommit *tasksPerCommitCache = nil
tCache cache.TaskCache = nil
// plogin backs the /_/login/status endpoint.
plogin alogin.Login
// autorollerIDsToNames maps autoroll frontend host to maps of roller IDs to
// their human-friendly display names.
// NOTE(review): both host keys below are empty strings in this copy of the
// file. Duplicate constant keys are rejected in a Go map literal, and the
// host is used to build "https://<host>/r/<roller>" links, so the real host
// names need to be restored — confirm against the upstream file.
autorollerIDsToNames = map[string]map[string]string{
"": {
"skia-flutter-autoroll": "Flutter",
"skia-autoroll": "Chrome",
"angle-skia-autoroll": "ANGLE",
"dawn-skia-autoroll": "Dawn",
"skcms-skia-autoroll": "skcms",
"swiftshader-skia-autoroll": "SwiftSh",
"vulkan-deps-skia-autoroll": "VkDeps",
"": {
"android-master-autoroll": "Android",
"google3-autoroll": "Google3",
// flags
// Command-line flags plus the remaining package-level state derived from them.
var (
chromeInfraAuthJWT = flag.String("chrome_infra_auth_jwt", "/var/secrets/skia-public-auth/key.json", "Path to a local file, or name of a GCP secret, containing the JWT key for the service account that has access to chrome infra auth.")
// TODO(borenet): Combine btInstance and firestoreInstance.
btInstance = flag.String("bigtable_instance", "", "BigTable instance to use.")
btProject = flag.String("bigtable_project", "", "GCE project to use for BigTable.")
capacityRecalculateInterval = flag.Duration("capacity_recalculate_interval", 10*time.Minute, "How often to re-calculate capacity statistics.")
firestoreInstance = flag.String("firestore_instance", "", "Firestore instance to use, eg. \"production\"")
gitstoreTable = flag.String("gitstore_bt_table", "git-repos2", "BigTable table used for GitStore.")
// host and port are combined into the server URL that is logged at startup;
// port is also the address passed to http.ListenAndServe.
host = flag.String("host", "localhost", "HTTP service host")
port = flag.String("port", ":8002", "HTTP service port (e.g., ':8002')")
promPort = flag.String("prom_port", ":20000", "Metrics service address (e.g., ':10110')")
// repoUrls must contain at least one entry; the first entry is treated as the
// default repo by the page handlers.
repoUrls = common.NewMultiStringFlag("repo", nil, "Repositories to query for status.")
resourcesDir = flag.String("resources_dir", "", "The directory to find templates, JS, and CSS files. If blank the current directory will be used.")
secretProject = flag.String("secret-project", "skia-infra-public", "Name of the GCP project used for secret management.")
swarmingUrl = flag.String("swarming_url", "", "URL of the Swarming server.")
taskLogsUrlTemplate = flag.String("task_logs_url_template", "{{TaskID}}/+/annotations", "Template URL for direct link to logs, with {{TaskID}} as placeholder.")
taskSchedulerUrl = flag.String("task_scheduler_url", "", "URL of the Task Scheduler server.")
// In this file, testing switches the server URL to plain http with the port
// appended and skips the HealthzAndHTTPS wrapper.
testing = flag.Bool("testing", false, "Set to true for locally testing rules. No email will be sent.")
// NOTE(review): the example URLs in the help text below appear to have been
// lost ("Eg: or") — confirm the intended wording.
treeStatusBaseUrl = flag.String("tree_status_base_url", "", "Repo specific tree status URLs will be created using this base url. Eg: or")
// podId is read from the POD_ID environment variable in main, falling back
// to a freshly generated UUID.
podId string
// repos is the repo graph map built from the BigTable GitStore in main.
repos repograph.Map
// Repos and associated templates for creating links to their commits.
// Keyed by short repo name (see repoUrlToName).
repoURLsByName map[string]string
// StringIsInteresting returns true iff the string contains non-whitespace
// characters. Whitespace is decided per rune by unicode.IsSpace, so the empty
// string and strings made only of spaces, tabs, newlines, etc. are not
// interesting.
func StringIsInteresting(s string) bool {
	for _, c := range s {
		if !unicode.IsSpace(c) {
			return true
		}
	}
	return false
}
func reloadTemplates() {
// Change the current working directory to two directories up from this source file so that we
// can read templates and serve static (res/) files.
if *resourcesDir == "" {
_, filename, _, _ := runtime.Caller(0)
*resourcesDir = filepath.Join(filepath.Dir(filename), "../..")
commitsTemplate = template.Must(template.ParseFiles(
filepath.Join(*resourcesDir, "dist", "status.html"),
capacityTemplate = template.Must(template.ParseFiles(
filepath.Join(*resourcesDir, "dist", "capacity.html"),
// Init is an exported, parameterless setup function.
// NOTE(review): the body and closing brace are not present in this copy of
// the file — presumably it loads the templates; confirm against upstream.
func Init() {
// repoUrlToName returns a short repo nickname given a full repo URL.
func repoUrlToName(repoUrl string) string {
// Special case: we like "infra" better than "buildbot".
if repoUrl == common.REPO_SKIA_INFRA {
return "infra"
return strings.TrimSuffix(path.Base(repoUrl), ".git")
// repoNameToUrl returns a full repo URL given a short nickname, or an error
// if no matching repo URL is found.
func repoNameToUrl(repoName string) (string, error) {
// Special case: we like "infra" better than "buildbot".
if repoName == "infra" {
return common.REPO_SKIA_INFRA, nil
// Search the list of repos used by this server.
for _, repoUrl := range *repoUrls {
if repoUrlToName(repoUrl) == repoName {
return repoUrl, nil
return "", fmt.Errorf("No such repo.")
// Same as above, for new WIP Twirp server.
// TODO(westont): Refactor once Twirp server is in use.
func getRepoTwirp(repo string) (string, string, error) {
repoURL, err := repoNameToUrl(repo)
if err != nil {
return "", "", err
return repoUrlToName(repoURL), repoURL, nil
func defaultHandler(w http.ResponseWriter, _ *http.Request) {
defer metrics2.FuncTimer().Stop()
w.Header().Set("Content-Type", "text/html")
defaultRepo := repoUrlToName((*repoUrls)[0])
// Don't use cached templates in testing mode.
if *testing {
d := struct {
Title string
SwarmingURL string
TreeStatusBaseURL string
LogsURLTemplate string
TaskSchedulerURL string
DefaultRepo string
// Repo name to repo URL.
Repos map[string]string
Title: fmt.Sprintf("Status: %s", defaultRepo),
SwarmingURL: *swarmingUrl,
TreeStatusBaseURL: *treeStatusBaseUrl,
LogsURLTemplate: *taskLogsUrlTemplate,
TaskSchedulerURL: *taskSchedulerUrl,
DefaultRepo: defaultRepo,
Repos: repoURLsByName,
if err := commitsTemplate.Execute(w, d); err != nil {
httputils.ReportError(w, err, fmt.Sprintf("Failed to expand template: %v", err), http.StatusInternalServerError)
func capacityHandler(w http.ResponseWriter, _ *http.Request) {
defer metrics2.FuncTimer().Stop()
w.Header().Set("Content-Type", "text/html")
defaultRepo := repoUrlToName((*repoUrls)[0])
// Don't use cached templates in testing mode.
if *testing {
d := struct {
Title string
SwarmingURL string
LogsURLTemplate string
TaskSchedulerURL string
DefaultRepo string
// Repo name to repo URL.
Repos map[string]string
Title: "Capacity Statistics for Skia Bots",
SwarmingURL: *swarmingUrl,
LogsURLTemplate: *taskLogsUrlTemplate,
TaskSchedulerURL: *taskSchedulerUrl,
DefaultRepo: defaultRepo,
Repos: repoURLsByName,
if err := capacityTemplate.Execute(w, d); err != nil {
httputils.ReportError(w, err, fmt.Sprintf("Failed to expand template: %v", err), http.StatusInternalServerError)
func lkgrHandler(w http.ResponseWriter, _ *http.Request) {
w.Header().Set("Content-Type", "text/html")
if _, err := w.Write([]byte(lkgrObj.Get())); err != nil {
httputils.ReportError(w, err, "Failed to write response.", http.StatusInternalServerError)
func getAutorollerStatusesTwirp() *rpc.GetAutorollerStatusesResponse {
defer autorollMtx.RUnlock()
return autorollStatusTwirp
// runServer wires up all HTTP routes and serves them on *port. It only returns
// by way of sklog.Fatal when http.ListenAndServe fails. serverURL is used
// solely for the startup log message.
// Note: srv already has the twirp handlers on it when passed into this function.
// NOTE(review): the closing delimiters of the Route callback, of the
// `if !*testing` block and of the function itself are not visible in this copy
// of the file; in particular, confirm whether XFrameOptionsDeny is meant to be
// applied unconditionally or only outside testing mode.
func runServer(serverURL string, srv http.Handler) {
topLevelRouter := chi.NewRouter()
// Our 'main' router doesn't include the Twirp server, since it would double gzip responses.
// The Twirp handler is therefore mounted with plain request logging only.
topLevelRouter.Handle(rpc.StatusServicePathPrefix+"*", httputils.LoggingRequestResponse(srv))
// Everything else goes through the gzip-and-log middleware.
topLevelRouter.With(httputils.LoggingGzipRequestResponse).Route("/", func(r chi.Router) {
r.HandleFunc("/", httputils.CorsHandler(defaultHandler))
r.HandleFunc("/capacity", capacityHandler)
r.HandleFunc("/lkgr", lkgrHandler)
r.HandleFunc("/_/login/status", alogin.LoginStatusHandler(plogin))
r.HandleFunc("/dist/*", httputils.MakeResourceHandler(*resourcesDir))
handlers.AddTaskDriverHandlers(r, taskDriverDb, taskDriverLogs)
var h http.Handler = topLevelRouter
// Outside of testing mode the router is wrapped with HealthzAndHTTPS.
if !*testing {
h = httputils.HealthzAndHTTPS(topLevelRouter)
h = httputils.XFrameOptionsDeny(h)
// Register on the default mux, which ListenAndServe uses when given a nil
// handler.
http.Handle("/", h)
sklog.Infof("Ready to serve on %s", serverURL)
sklog.Fatal(http.ListenAndServe(*port, nil))
// autoRollStatus is the per-roller entry collected while refreshing autoroll
// statuses; Url is serialized under the JSON key "url".
// NOTE(review): code elsewhere in this file populates an AutoRollMiniStatus
// field on this struct, but no such field (nor the closing brace) is visible
// here — confirm against the full file.
type autoRollStatus struct {
Url string `json:"url"`
// main initializes every dependency of the status server — token source, LKGR,
// Firestore task DB, login, BigTable-backed repos, the task and incremental
// caches, the capacity client, autoroll status polling and the TaskDriver
// DB/logs — then builds the Twirp server and hands it to runServer.
// NOTE(review): several statements appear to be missing from this copy of the
// file (closing braces, the bodies of some `if err != nil` checks, and others
// called out inline below); confirm each against the upstream source.
func main() {
// Setup flags.
// NOTE(review): no flag-parsing or common-init call is visible after the
// comment above, yet the flag values are dereferenced immediately — confirm.
serverURL := "https://" + *host
if *testing {
// Local testing serves plain HTTP on the configured port.
serverURL = "http://" + *host + *port
ctx := context.Background()
// Identify this instance; the pod ID is later passed to the Twirp server.
podId = os.Getenv("POD_ID")
if podId == "" {
sklog.Error("POD_ID not defined; falling back to UUID.")
podId = uuid.New().String()
// Map each repo's short name to its commit-link URL.
repoURLsByName = make(map[string]string)
for _, repoURL := range *repoUrls {
repoURLsByName[repoUrlToName(repoURL)] = fmt.Sprintf(gitiles.CommitURL, repoURL, "")
// One token source is shared by all of the Google API clients below.
ts, err := google.DefaultTokenSource(ctx, auth.ScopeUserinfoEmail, auth.ScopeGerrit, bigtable.Scope, pubsub.ScopePubSub, datastore.ScopeDatastore)
// NOTE(review): the body of this error check is not visible.
if err != nil {
// Create LKGR object.
lkgrObj, err = lkgr.New(ctx)
if err != nil {
sklog.Fatalf("Failed to create LKGR: %s", err)
lkgrObj.UpdateLoop(10*time.Minute, ctx)
// Create remote Tasks DB.
taskDb, err = firestore.NewDBWithParams(ctx, firestore.FIRESTORE_PROJECT, *firestoreInstance, ts)
if err != nil {
sklog.Fatalf("Failed to create Firestore DB client: %s", err)
plogin = proxylogin.NewWithDefaults()
// Check out source code.
// NOTE(review): repoUrls is already ranged over above, before this check.
if *repoUrls == nil {
sklog.Fatal("At least one --repo is required.")
btConf := &bt_gitstore.BTConfig{
ProjectID: *btProject,
InstanceID: *btInstance,
TableID: *gitstoreTable,
AppProfile: appName,
repos, err = bt_gitstore.NewBTGitStoreMap(ctx, *repoUrls, btConf)
// NOTE(review): the error-handling body is not visible; as written only the
// log line below follows the check.
if err != nil {
sklog.Info("Checkout complete")
// Cache for buildProgressHandler.
tasksPerCommit, err = newTasksPerCommitCache(ctx, repos, 14*24*time.Hour, *btProject, *btInstance, ts)
if err != nil {
sklog.Fatalf("Failed to create tasksPerCommitCache: %s", err)
// Create the IncrementalCacheImpl.
// The same time window is shared with the regular task cache created below.
w, err := window.New(ctx, time.Minute, maxCommitsToLoad, repos)
if err != nil {
sklog.Fatalf("Failed to create time window: %s", err)
iCache, err = incremental.NewIncrementalCacheImpl(ctx, taskDb, w, repos, maxCommitsToLoad, *swarmingUrl, *taskSchedulerUrl)
if err != nil {
sklog.Fatalf("Failed to create IncrementalCacheImpl: %s", err)
iCache.UpdateLoop(ctx, 60*time.Second)
// Create a regular task cache.
tCache, err = cache.NewTaskCache(ctx, taskDb, w, nil)
if err != nil {
sklog.Fatalf("Failed to create TaskCache: %s", err)
// Refresh the task cache once a minute in the background.
lvTaskCache := metrics2.NewLiveness("status_task_cache")
go util.RepeatCtx(ctx, 60*time.Second, func(ctx context.Context) {
if err := tCache.Update(ctx); err != nil {
sklog.Errorf("Failed to update TaskCache: %s", err)
// NOTE(review): the success branch is empty in this copy and lvTaskCache is
// never used afterwards — presumably the liveness metric is reset here; confirm.
} else {
// Capacity stats.
capacityClient = capacity.New(tasksPerCommit.tcc, tCache, repos)
capacityClient.StartLoading(ctx, *capacityRecalculateInterval)
// Periodically obtain the autoroller statuses.
if err := ds.InitWithOpt(common.PROJECT_ID, ds.AUTOROLL_NS, option.WithTokenSource(ts)); err != nil {
sklog.Fatalf("Failed to initialize datastore: %s", err)
// TODO(borenet): We're hard-coding the Firestore instance. We should find a
// way not to do so.
autorollStatusDB, err := status.NewDB(ctx, firestore.FIRESTORE_PROJECT, ds.AUTOROLL_NS, "production", ts)
if err != nil {
sklog.Fatalf("Failed to create status DB: %s", err)
// updateAutorollStatus fetches the status of every configured roller and
// publishes the result, sorted by display name, in autorollStatusTwirp.
// Any single lookup failure aborts the whole refresh.
updateAutorollStatus := func(ctx context.Context) error {
// NOTE(review): statuses is filled in but never read in the visible code.
statuses := map[string]autoRollStatus{}
statusesTwirp := []*rpc.AutorollerStatus{}
for host, subMap := range autorollerIDsToNames {
for roller, friendlyName := range subMap {
s, err := autorollStatusDB.Get(ctx, roller)
if err != nil {
return skerr.Wrapf(err, "retrieving status for %s", roller)
miniStatus := s.AutoRollMiniStatus
url := fmt.Sprintf("https://%s/r/%s", host, roller)
statuses[friendlyName] = autoRollStatus{
AutoRollMiniStatus: miniStatus,
Url: url,
// NOTE(review): the opening of the appended element literal is not visible.
statusesTwirp = append(statusesTwirp,
Name: friendlyName,
CurrentRollRev: miniStatus.CurrentRollRev,
LastRollRev: miniStatus.LastRollRev,
Mode: miniStatus.Mode,
NumBehind: int32(miniStatus.NumNotRolledCommits),
NumFailed: int32(miniStatus.NumFailedRolls),
Url: url})
// Map iteration order is random, so sort for a stable response.
sort.Slice(statusesTwirp, func(i, j int) bool {
return statusesTwirp[i].Name < statusesTwirp[j].Name
// NOTE(review): no matching autorollMtx.Lock() is visible before this deferred
// Unlock; unlocking an unlocked mutex is a fatal runtime error — confirm the
// Lock call exists upstream.
defer autorollMtx.Unlock()
autorollStatusTwirp = &rpc.GetAutorollerStatusesResponse{Rollers: statusesTwirp}
return nil
// Populate the statuses once synchronously, then refresh every minute.
// NOTE(review): the body of this error check is not visible.
if err := updateAutorollStatus(ctx); err != nil {
go util.RepeatCtx(ctx, 60*time.Second, func(ctx context.Context) {
if err := updateAutorollStatus(ctx); err != nil {
sklog.Errorf("Failed to update autoroll status: %s", err)
// Create the TaskDriver DB.
taskDriverBtInstance := "staging" // Task Drivers aren't in prod yet.
taskDriverDb, err = bigtable_db.NewBigTableDB(ctx, *btProject, taskDriverBtInstance, ts)
// NOTE(review): the bodies of the next two error checks are not visible.
if err != nil {
taskDriverLogs, err = logs.NewLogsManager(ctx, *btProject, taskDriverBtInstance, ts)
if err != nil {
// Create Twirp Server.
twirpServer := rpc.NewStatusServer(iCache, taskDb, capacityClient, getAutorollerStatusesTwirp, getRepoTwirp, maxCommitsToLoad, defaultCommitsToLoad, podId)
// Run the server.
runServer(serverURL, twirpServer)