Add ability to bring up multiple Android compile servers
* Separates FE and BE and creates new FE server.
* Creates new AndroidCompileInstances Kind in Datastore that keeps track of when mirror was last synced and if mirror should be force synced.
* Changed UI to account for and display multiple backends. Screenshot: https://screenshot.googleplex.com/E9fA5A9QNPU
* The force mirror sync button will now signal all instances to update their mirrors, so it behaves similarly to how it did before.
* Support for running multiple backends
* Multiple instances satisfy new requests by Ack'ing and Nack'ing pubsub storage notifications.
* Care has been taken to make sure duplicate tasks are not triggered in case pubsub sends the same message again (At-least-once delivery from https://cloud.google.com/pubsub/docs/subscriber).
* In case a backend is restarted it will run all tasks that were interrupted before picking up new tasks.
* Other improvements
* Changed startup to only sync checkouts that do not exist. Otherwise it syncs all checkouts during startup which is unnecessary because checkouts are synced before running tasks anyway.
* Removed UpdateInfraFailureMetric. The alert was only triggered for merge errors and the error is already exposed in the recipe output anyway.
Bug: skia:9302
Change-Id: I662f26c7fc0c841b6b36e39b20f06d50289f7eaa
Reviewed-on: https://skia-review.googlesource.com/c/buildbot/+/231096
Commit-Queue: Ravi Mistry <rmistry@google.com>
Reviewed-by: Eric Boren <borenet@google.com>
diff --git a/android_compile/Makefile b/android_compile/Makefile
index 47f8d55..9e2d28d 100644
--- a/android_compile/Makefile
+++ b/android_compile/Makefile
@@ -11,20 +11,32 @@
 include ../go/skiaversion/skiaversion.mk
 
 # Build debug versions of core.js and elements.html.
-.PHONY: debug_android_compile
-debug_android_compile: clean_webtools debug_core_js debug_elements_html skiaversion
-	go install -v ./go/...
+.PHONY: debug_android_compile_fe
+debug_android_compile_fe: clean_webtools debug_core_js debug_elements_html skiaversion
+	go install -v ./go/android_compile_fe/...
 
-.PHONY: android_compile
-android_compile: clean_webtools elements_html
-	GOOS=linux go install -v ./go/...
+.PHONY: android_compile_fe
+android_compile_fe: clean_webtools elements_html
+	GOOS=linux go install -v ./go/android_compile_fe
 
-.PHONY: release
-release: android_compile
-	./build_docker_release
+.PHONY: release_fe
+release_fe: android_compile_fe
+	./build_fe_release
 
-.PHONY: push
-push: skia-corp
-	pushk --cluster=skia-corp android_compile
+.PHONY: android_compile_be
+android_compile_be:
+	GOOS=linux go install -v ./go/android_compile_be
+
+.PHONY: release_be
+release_be: android_compile_be
+	./build_be_release
+
+.PHONY: push_fe
+push_fe: skia-corp
+	pushk --cluster=skia-corp android_compile_fe
+
+.PHONY: push_be
+push_be: skia-corp
+	pushk --cluster=skia-corp android_compile_be
 
 include ../make/clusters.mk
diff --git a/android_compile/build_be_release b/android_compile/build_be_release
new file mode 100755
index 0000000..4bec0ba
--- /dev/null
+++ b/android_compile/build_be_release
@@ -0,0 +1,20 @@
+#!/bin/bash
+APPNAME=android_compile_be
+
+set -x -e
+
+# Copy files into the right locations in ${ROOT}.
+copy_release_files()
+{
+INSTALL="install -D --verbose --backup=none"
+INSTALL_DIR="install -d --verbose --backup=none"
+
+${INSTALL} --mode=644 -T ./go/android_compile_be/Dockerfile ${ROOT}/Dockerfile
+${INSTALL} --mode=755 -T ${GOPATH}/bin/${APPNAME} ${ROOT}/usr/local/bin/${APPNAME}
+
+${INSTALL_DIR} --mode=755 ${ROOT}/usr/local/share/${APPNAME}
+${INSTALL} --mode=755 -T compile.sh ${ROOT}/usr/local/share/${APPNAME}/compile.sh
+${INSTALL} --mode=755 -T clean-checkout.sh ${ROOT}/usr/local/share/${APPNAME}/clean-checkout.sh
+}
+
+source ../bash/docker_build.sh
diff --git a/android_compile/build_docker_release b/android_compile/build_fe_release
similarity index 69%
rename from android_compile/build_docker_release
rename to android_compile/build_fe_release
index 61b013b..b98d218 100755
--- a/android_compile/build_docker_release
+++ b/android_compile/build_fe_release
@@ -1,5 +1,5 @@
#!/bin/bash
-APPNAME=android_compile
+APPNAME=android_compile_fe
set -x -e
@@ -9,13 +9,10 @@
INSTALL="install -D --verbose --backup=none"
INSTALL_DIR="install -d --verbose --backup=none"
-${INSTALL} --mode=644 -T Dockerfile ${ROOT}/Dockerfile
-${INSTALL} --mode=755 -T ${GOPATH}/bin/${APPNAME} ${ROOT}/usr/local/bin/${APPNAME}
+${INSTALL} --mode=644 -T ./go/android_compile_fe/Dockerfile ${ROOT}/Dockerfile
+${INSTALL} --mode=755 -T ${GOPATH}/bin/${APPNAME} ${ROOT}/usr/local/bin/${APPNAME}
-${INSTALL_DIR} --mode=755 ${ROOT}/usr/local/share/${APPNAME}
-${INSTALL} --mode=755 -T compile.sh ${ROOT}/usr/local/share/${APPNAME}/compile.sh
-${INSTALL} --mode=755 -T clean-checkout.sh ${ROOT}/usr/local/share/${APPNAME}/clean-checkout.sh
-
+${INSTALL_DIR} --mode=755 ${ROOT}/usr/local/share/${APPNAME}
${INSTALL_DIR} --mode=755 ${ROOT}/usr/local/share/${APPNAME}/res/img
${INSTALL} --mode=644 ./res/img/* ${ROOT}/usr/local/share/${APPNAME}/res/img
${INSTALL_DIR} --mode=755 ${ROOT}/usr/local/share/${APPNAME}/res/js
diff --git a/android_compile/go/android_compile/datastore.go b/android_compile/go/android_compile/datastore.go
deleted file mode 100644
index 1c9dd38..0000000
--- a/android_compile/go/android_compile/datastore.go
+++ /dev/null
@@ -1,129 +0,0 @@
-/*
- Used by the Android Compile Server to interact with the cloud datastore.
-*/
-
-package main
-
-import (
- "context"
- "fmt"
- "sort"
- "time"
-
- "cloud.google.com/go/datastore"
- "go.skia.org/infra/go/ds"
- "golang.org/x/oauth2"
- "google.golang.org/api/iterator"
- "google.golang.org/api/option"
-)
-
-type CompileTask struct {
- Issue int `json:"issue"`
- PatchSet int `json:"patchset"`
- Hash string `json:"hash"`
-
- LunchTarget string `json:"lunch_target"`
- MMMATargets string `json:"mmma_targets"`
-
- Checkout string `json:"checkout"`
-
- Created time.Time `json:"created"`
- Completed time.Time `json:"completed"`
-
- WithPatchSucceeded bool `json:"withpatch_success"`
- NoPatchSucceeded bool `json:"nopatch_success"`
-
- WithPatchLog string `json:"withpatch_log"`
- NoPatchLog string `json:"nopatch_log"`
-
- IsMasterBranch bool `json:"is_master_branch"`
- Done bool `json:"done"`
- Error string `json:"error"`
- InfraFailure bool `json:"infra_failure"`
-}
-
-type CompileTaskAndKey struct {
- task *CompileTask
- key *datastore.Key
-}
-type sortTasks []*CompileTaskAndKey
-
-func (a sortTasks) Len() int { return len(a) }
-func (a sortTasks) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
-func (a sortTasks) Less(i, j int) bool {
- return a[i].task.Created.Before(a[j].task.Created)
-}
-
-func GetCompileTasksAndKeys() ([]*CompileTaskAndKey, []*CompileTaskAndKey, error) {
- waitingTasksAndKeys := []*CompileTaskAndKey{}
- runningTasksAndKeys := []*CompileTaskAndKey{}
-
- it := GetPendingTasks()
- for {
- t := &CompileTask{}
- datastoreKey, err := it.Next(t)
- if err == iterator.Done {
- break
- } else if err != nil {
- return nil, nil, fmt.Errorf("Failed to retrieve list of tasks: %s", err)
- }
- taskAndKey := &CompileTaskAndKey{task: t, key: datastoreKey}
- if t.Checkout == "" {
- waitingTasksAndKeys = append(waitingTasksAndKeys, taskAndKey)
- } else {
- runningTasksAndKeys = append(runningTasksAndKeys, taskAndKey)
- }
- }
- sort.Sort(sortTasks(waitingTasksAndKeys))
- sort.Sort(sortTasks(runningTasksAndKeys))
-
- return waitingTasksAndKeys, runningTasksAndKeys, nil
-}
-
-func GetCompileTasks() ([]*CompileTask, []*CompileTask, error) {
- waitingTasksAndKeys, runningTasksAndKeys, err := GetCompileTasksAndKeys()
- if err != nil {
- return nil, nil, err
- }
- waitingTasks := []*CompileTask{}
- for _, taskAndKey := range waitingTasksAndKeys {
- waitingTasks = append(waitingTasks, taskAndKey.task)
- }
- runningTasks := []*CompileTask{}
- for _, taskAndKey := range runningTasksAndKeys {
- runningTasks = append(runningTasks, taskAndKey.task)
- }
- return waitingTasks, runningTasks, nil
-}
-
-func DatastoreInit(project string, ns string, ts oauth2.TokenSource) error {
- return ds.InitWithOpt(project, ns, option.WithTokenSource(ts))
-}
-
-func GetPendingTasks() *datastore.Iterator {
- q := ds.NewQuery(ds.COMPILE_TASK).EventualConsistency().Filter("Done =", false)
- return ds.DS.Run(context.TODO(), q)
-}
-
-func GetNewDSKey() *datastore.Key {
- return ds.NewKey(ds.COMPILE_TASK)
-}
-
-func GetDSTask(taskID int64) (*datastore.Key, *CompileTask, error) {
- key := ds.NewKey(ds.COMPILE_TASK)
- key.ID = taskID
-
- task := &CompileTask{}
- if err := ds.DS.Get(context.TODO(), key, task); err != nil {
- return nil, nil, fmt.Errorf("Error retrieving task from Datastore: %v", err)
- }
- return key, task, nil
-}
-
-func PutDSTask(ctx context.Context, k *datastore.Key, t *CompileTask) (*datastore.Key, error) {
- return ds.DS.Put(ctx, k, t)
-}
-
-func UpdateDSTask(ctx context.Context, k *datastore.Key, t *CompileTask) (*datastore.Key, error) {
- return ds.DS.Put(ctx, k, t)
-}
diff --git a/android_compile/go/android_compile/main.go b/android_compile/go/android_compile/main.go
deleted file mode 100644
index f9dc0c7..0000000
--- a/android_compile/go/android_compile/main.go
+++ /dev/null
@@ -1,421 +0,0 @@
-/*
- Android Compile Server for Skia Bots.
-*/
-
-package main
-
-import (
- "context"
- "encoding/json"
- "errors"
- "flag"
- "fmt"
- "html/template"
- "net/http"
- "os/user"
- "path/filepath"
- "sync"
- "time"
-
- "cloud.google.com/go/datastore"
- "cloud.google.com/go/storage"
- "github.com/gorilla/mux"
- "google.golang.org/api/option"
-
- "go.skia.org/infra/go/allowed"
- "go.skia.org/infra/go/auth"
- "go.skia.org/infra/go/common"
- "go.skia.org/infra/go/eventbus"
- "go.skia.org/infra/go/gevent"
- "go.skia.org/infra/go/gitauth"
- "go.skia.org/infra/go/httputils"
- "go.skia.org/infra/go/login"
- "go.skia.org/infra/go/skiaversion"
- "go.skia.org/infra/go/sklog"
- "go.skia.org/infra/go/util"
-)
-
-const (
- FORCE_SYNC_POST_URL = "/_/force_sync"
-)
-
-var (
- // Flags
- local = flag.Bool("local", false, "Running locally if true. As opposed to in production.")
- host = flag.String("host", "localhost", "HTTP service host")
- port = flag.String("port", ":8000", "HTTP service port.")
- promPort = flag.String("prom_port", ":20000", "Metrics service address (e.g., ':20000')")
- workdir = flag.String("workdir", ".", "Directory to use for scratch work.")
- resourcesDir = flag.String("resources_dir", "", "The directory to find compile.sh and template files. If blank then the directory two directories up from this source file will be used.")
- numCheckouts = flag.Int("num_checkouts", 10, "The number of checkouts the Android compile server should maintain.")
- repoUpdateDuration = flag.Duration("repo_update_duration", 1*time.Hour, "How often to update the main Android repository.")
- serviceAccount = flag.String("service_account", "", "Should be set when running in K8s.")
- authWhiteList = flag.String("auth_whitelist", "google.com", "White space separated list of domains and email addresses that are allowed to login.")
-
- // Useful for debugging.
- hang = flag.Bool("hang", false, "If true, just hang and do nothing.")
-
- // Pubsub for storage flags.
- projectID = flag.String("project_id", "google.com:skia-corp", "Project ID of the Cloud project where the PubSub topic and GS bucket lives.")
- storageBucket = flag.String("bucket", "android-compile-tasks", "Storage bucket where android compile task JSON files will be kept.")
- subscriberName = flag.String("subscriber", "android-compile-tasks", "ID of the pubsub subscriber.")
- topic = flag.String("topic", "android-compile-tasks", "Google Cloud PubSub topic of the eventbus.")
-
- // Datastore params
- namespace = flag.String("namespace", "android-compile-staging", "The Cloud Datastore namespace, such as 'android-compile'.")
- projectName = flag.String("project_name", "google.com:skia-corp", "The Google Cloud project name.")
-
- // Used to signal when checkouts are ready to serve requests.
- checkoutsReadyMutex sync.RWMutex
-
- // indexTemplate is the main index.html page we serve.
- indexTemplate *template.Template = nil
-
- serverURL string
-)
-
-func reloadTemplates() {
- indexTemplate = template.Must(template.ParseFiles(
- filepath.Join(*resourcesDir, "templates/index.html"),
- filepath.Join(*resourcesDir, "templates/header.html"),
- ))
-}
-
-func loginHandler(w http.ResponseWriter, r *http.Request) {
- http.Redirect(w, r, login.LoginURL(w, r), http.StatusFound)
- return
-}
-
-func indexHandler(w http.ResponseWriter, r *http.Request) {
- if *local {
- reloadTemplates()
- }
- w.Header().Set("Content-Type", "text/html")
-
- if login.LoggedInAs(r) == "" {
- http.Redirect(w, r, login.LoginURL(w, r), http.StatusSeeOther)
- return
- }
-
- waitingTasks, runningTasks, err := GetCompileTasks()
- if err != nil {
- httputils.ReportError(w, r, err, "Failed to get compile tasks")
- return
- }
-
- var info = struct {
- WaitingTasks []*CompileTask
- RunningTasks []*CompileTask
- MirrorLastSynced string
- MirrorUpdateDuration time.Duration
- MirrorUpdateRunning bool
- }{
- WaitingTasks: waitingTasks,
- RunningTasks: runningTasks,
- MirrorLastSynced: MirrorLastSynced.Format("Mon Jan 2 15:04:05 MST"),
- MirrorUpdateDuration: *repoUpdateDuration,
- MirrorUpdateRunning: getMirrorUpdateRunning(),
- }
-
- if err := indexTemplate.Execute(w, info); err != nil {
- httputils.ReportError(w, r, err, "Failed to expand template")
- return
- }
- return
-}
-
-func forceSyncHandler(w http.ResponseWriter, r *http.Request) {
- if *local {
- reloadTemplates()
- }
-
- if login.LoggedInAs(r) == "" {
- http.Redirect(w, r, login.LoginURL(w, r), http.StatusSeeOther)
- return
- }
-
- if getMirrorUpdateRunning() {
- httputils.ReportError(w, r, nil, "Checkout sync is currently in progress")
- return
- }
-
- sklog.Infof("Force sync button has been pressed by %s", login.LoggedInAs(r))
- UpdateMirror(context.Background())
- return
-}
-
-func readGSAndTriggerCompileTask(ctx context.Context, g *gsFileLocation) error {
- data, err := g.storageClient.Bucket(g.bucket).Object(g.name).NewReader(ctx)
- if err != nil {
- return fmt.Errorf("New reader failed for %s/%s: %s", g.bucket, g.name, err)
- }
-
- task := CompileTask{}
- if err := json.NewDecoder(data).Decode(&task); err != nil {
- return fmt.Errorf("Failed to parse request: %s", err)
- }
-
- // Either hash or (issue & patchset) must be specified.
- if task.Hash == "" && (task.Issue == 0 || task.PatchSet == 0) {
- return errors.New("Either hash or (issue & patchset) must be specified")
- }
-
- // Set default values if LunchTarget and MMMATargets are not specified.
- // This is done for backwards compatibility.
- if task.LunchTarget == "" {
- task.LunchTarget = DEFAULT_LUNCH_TARGET
- }
- if task.MMMATargets == "" {
- task.MMMATargets = DEFAULT_MMMA_TARGETS
- }
-
- // Check to see if this task has already been requested and is currently
- // waiting/running. If it is then do not trigger a new task. This is done
- // to avoid creating unnecessary duplicate tasks.
- waitingTasksAndKeys, runningTasksAndKeys, err := GetCompileTasksAndKeys()
- if err != nil {
- return fmt.Errorf("Failed to retrieve currently waiting/running compile tasks and keys: %s", err)
- }
- for _, existingTaskAndKey := range append(waitingTasksAndKeys, runningTasksAndKeys...) {
- if task.LunchTarget == existingTaskAndKey.task.LunchTarget &&
- ((task.Hash != "" && task.Hash == existingTaskAndKey.task.Hash) ||
- (task.Hash == "" && task.Issue == existingTaskAndKey.task.Issue && task.PatchSet == existingTaskAndKey.task.PatchSet)) {
- sklog.Infof("Got request for already existing task [lunch_target: %s, hash: %s, issue: %d, patchset: %d, id: %d]", task.LunchTarget, task.Hash, task.Issue, task.PatchSet, existingTaskAndKey.key.ID)
- return nil
- }
- }
-
- key := GetNewDSKey()
- task.Created = time.Now()
- datastoreKey, err := PutDSTask(ctx, key, &task)
- if err != nil {
- return fmt.Errorf("Error putting task in datastore: %s", err)
- }
-
- // Kick off the task and return the task ID.
- triggerCompileTask(ctx, g, &task, datastoreKey)
-
- // Update the Google storage file.
- if err := updateTaskInGoogleStorage(ctx, g, task, datastoreKey.ID); err != nil {
- return fmt.Errorf("Could not update task in Google storage: %s", err)
- }
-
- return nil
-}
-
-// triggerCompileTask runs the specified CompileTask in a goroutine. After
-// completion the task is marked as Done and updated in the Datastore.
-func triggerCompileTask(ctx context.Context, g *gsFileLocation, task *CompileTask, datastoreKey *datastore.Key) {
- go func() {
- checkoutsReadyMutex.RLock()
- defer checkoutsReadyMutex.RUnlock()
- pathToCompileScript := filepath.Join(*resourcesDir, "compile.sh")
- if err := RunCompileTask(ctx, g, task, datastoreKey, pathToCompileScript); err != nil {
- task.InfraFailure = true
- task.Error = err.Error()
- sklog.Errorf("Error when compiling task with ID %d: %s", datastoreKey.ID, err)
- }
- updateInfraFailureMetric(task.InfraFailure)
- task.Done = true
- task.Completed = time.Now()
- if err := UpdateCompileTask(ctx, g, datastoreKey, task); err != nil {
- sklog.Errorf("Could not update compile task with ID %d: %s", datastoreKey.ID, err)
- }
- }()
-}
-
-// UpdateCompileTask updates the task in both Google storage and in Datastore.
-func UpdateCompileTask(ctx context.Context, g *gsFileLocation, datastoreKey *datastore.Key, task *CompileTask) error {
- if err := updateTaskInGoogleStorage(ctx, g, *task, datastoreKey.ID); err != nil {
- return fmt.Errorf("Could not update in Google storage compile task in with ID %d: %s", datastoreKey.ID, err)
- }
- if _, err := UpdateDSTask(ctx, datastoreKey, task); err != nil {
- return fmt.Errorf("Could not update in Datastore compile task with ID %d: %s", datastoreKey.ID, err)
- }
- return nil
-}
-
-func updateTaskInGoogleStorage(ctx context.Context, g *gsFileLocation, task CompileTask, taskID int64) error {
- // Update the Google storage file with the taskID.
- b, err := json.Marshal(struct {
- CompileTask
- TaskID int64 `json:"task_id"`
- }{
- CompileTask: task,
- TaskID: taskID,
- })
- if err != nil {
- return fmt.Errorf("Could not re-encode compile task: %s", err)
- }
- wr := g.storageClient.Bucket(g.bucket).Object(g.name).NewWriter(ctx)
- defer util.Close(wr)
- wr.ObjectAttrs.ContentEncoding = "application/json"
- if _, err := wr.Write(b); err != nil {
- return fmt.Errorf("Failed writing JSON to GCS: %s", err)
- }
- return nil
-}
-
-type gsFileLocation struct {
- bucket string
- name string
- storageClient *storage.Client
-}
-
-func newGCSFileLocation(result *storage.ObjectAttrs, storageClient *storage.Client) *gsFileLocation {
- return &gsFileLocation{
- bucket: result.Bucket,
- name: result.Name,
- storageClient: storageClient,
- }
-}
-
-func runServer() {
- r := mux.NewRouter()
- r.PathPrefix("/res/").HandlerFunc(httputils.MakeResourceHandler(*resourcesDir))
- r.HandleFunc("/", indexHandler)
- r.HandleFunc(FORCE_SYNC_POST_URL, forceSyncHandler)
-
- r.HandleFunc("/json/version", skiaversion.JsonHandler)
- r.HandleFunc(login.DEFAULT_OAUTH2_CALLBACK, login.OAuth2CallbackHandler)
- r.HandleFunc("/login/", loginHandler)
- r.HandleFunc("/logout/", login.LogoutHandler)
- r.HandleFunc("/loginstatus/", login.StatusHandler)
- h := httputils.LoggingGzipRequestResponse(r)
- if !*local {
- h = httputils.HealthzAndHTTPS(h)
- }
-
- http.Handle("/", h)
- sklog.Infof("Ready to serve on %s", serverURL)
- sklog.Fatal(http.ListenAndServe(*port, nil))
-}
-
-func main() {
- flag.Parse()
-
- common.InitWithMust("android_compile", common.PrometheusOpt(promPort), common.MetricsLoggingOpt())
- defer common.Defer()
- skiaversion.MustLogVersion()
- ctx := context.Background()
-
- if *projectID == "" || *topic == "" || *subscriberName == "" {
- sklog.Fatalf("project_id, topic and subscriber flags must all be set.")
- }
-
- reloadTemplates()
- serverURL = "https://" + *host
- if *local {
- serverURL = "http://" + *host + *port
- }
- login.InitWithAllow(serverURL+login.DEFAULT_OAUTH2_CALLBACK, allowed.Googlers(), allowed.Googlers(), nil)
-
- if *hang {
- sklog.Infof("--hang provided; doing nothing.")
- httputils.RunHealthCheckServer(*port)
- }
-
- // Create token source.
- ts, err := auth.NewDefaultTokenSource(*local, auth.SCOPE_READ_WRITE, auth.SCOPE_USERINFO_EMAIL, auth.SCOPE_GERRIT, datastore.ScopeDatastore)
- if err != nil {
- sklog.Fatalf("Problem setting up default token source: %s", err)
- }
-
- // Instantiate storage client.
- storageClient, err := storage.NewClient(ctx, option.WithTokenSource(ts))
- if err != nil {
- sklog.Fatalf("Failed to create a Google Storage API client: %s", err)
- }
-
- // Initialize cloud datastore.
- if err := DatastoreInit(*projectName, *namespace, ts); err != nil {
- sklog.Fatalf("Failed to init cloud datastore: %s", err)
- }
-
- if !*local {
- // Use the gitcookie created by gitauth package.
- user, err := user.Current()
- if err != nil {
- sklog.Fatal(err)
- }
- gitcookiesPath := filepath.Join(user.HomeDir, ".gitcookies")
- if _, err := gitauth.New(ts, gitcookiesPath, true, *serviceAccount); err != nil {
- sklog.Fatalf("Failed to create git cookie updater: %s", err)
- }
- }
-
- // Initialize checkouts but do not block bringing up the server.
- go func() {
- checkoutsReadyMutex.Lock()
- defer checkoutsReadyMutex.Unlock()
- if err := CheckoutsInit(*numCheckouts, *workdir, *repoUpdateDuration, storageClient); err != nil {
- sklog.Fatalf("Failed to init checkouts: %s", err)
- }
- }()
-
- // Subscribe to storage pubsub events.
- eventBus, err := gevent.New(*projectID, *topic, *subscriberName)
- if err != nil {
- sklog.Fatalf("Error creating event bus: %s", err)
- }
- eventType, err := eventBus.RegisterStorageEvents(*storageBucket, "", nil, storageClient)
- if err != nil {
- sklog.Fatalf("Error: %s", err)
- }
- resultCh := make(chan *gsFileLocation)
- sklog.Infof("Registered storage events. Eventtype: %s", eventType)
- eventBus.SubscribeAsync(eventType, func(evt interface{}) {
- file := evt.(*eventbus.StorageEvent)
- if file.OverwroteGeneration != "" {
- return
- }
- sklog.Infof("Received storage event: %s / %s\n", file.BucketID, file.ObjectID)
-
- // Fetch the object attributes.
- objAttr, err := storageClient.Bucket(file.BucketID).Object(file.ObjectID).Attrs(ctx)
- if err != nil {
- sklog.Errorf("Unable to get handle for '%s/%s': %s", file.BucketID, file.ObjectID, err)
- return
- }
-
- resultCh <- newGCSFileLocation(objAttr, storageClient)
- })
-
- // Reset metrics on server startup.
- resetMetrics()
-
- // Find and reschedule all CompileTasks that are in "running" state. Any
- // "running" CompileTasks means that the server was restarted in the middle
- // of run(s). Do not block bringing up the server.
- go func() {
- _, runningTasksAndKeys, err := GetCompileTasksAndKeys()
- if err != nil {
- sklog.Fatalf("Failed to retrieve compile tasks and keys: %s", err)
- }
-
- for _, taskAndKey := range runningTasksAndKeys {
- sklog.Infof("Found orphaned task %d. Retriggering it...", taskAndKey.key.ID)
- // Fetch the object attributes.
- fileName := fmt.Sprintf("%s-%d-%d.json", taskAndKey.task.LunchTarget, taskAndKey.task.Issue, taskAndKey.task.PatchSet)
- objAttr, err := storageClient.Bucket(*storageBucket).Object(fileName).Attrs(ctx)
- if err != nil {
- sklog.Fatalf("Unable to get handle for orphaned task '%s/%s': %s", *storageBucket, fileName, err)
- }
-
- triggerCompileTask(ctx, newGCSFileLocation(objAttr, storageClient), taskAndKey.task, taskAndKey.key)
- }
- }()
-
- // Wait for compile task requests that come in.
- go func() {
- for true {
- fileLocation := <-resultCh
- if err = readGSAndTriggerCompileTask(ctx, fileLocation); err != nil {
- sklog.Errorf("Error when reading from GS and triggering compile task: %s", err)
- continue
- }
- }
- }()
-
- runServer()
-}
diff --git a/android_compile/go/android_compile/metrics.go b/android_compile/go/android_compile/metrics.go
deleted file mode 100644
index 7ba65de..0000000
--- a/android_compile/go/android_compile/metrics.go
+++ /dev/null
@@ -1,71 +0,0 @@
-package main
-
-import (
- "sync"
-
- "go.skia.org/infra/go/metrics2"
-)
-
-var (
- // Metrics regarding number of waiting and running tasks.
- queueLengthMetric = metrics2.GetCounter("android_compile_waiting_tasks", nil)
- runningLengthMetric = metrics2.GetCounter("android_compile_running_tasks", nil)
- // Mutex to control access to the above metrics.
- lengthMetricsMutex = sync.Mutex{}
-
- // Metric regarding infra failures and it's mutex.
- infraFailureMetric = metrics2.GetInt64Metric("android_compile_infra_failure", nil)
- infraFailureMetricMutex = sync.Mutex{}
-
- // Metric regarding broken android tree and it's mutex.
- androidTreeBrokenMetric = metrics2.GetInt64Metric("android_compile_tree_broken", nil)
- androidTreeBrokenMetricMutex = sync.Mutex{}
-
- // Metric regarding mirror syncs. Does not need a mutex because the tree is
- // only updated after a mutex lock.
- mirrorSyncFailureMetric = metrics2.GetInt64Metric("android_compile_mirror_sync_failure", nil)
-)
-
-func resetMetrics() {
- queueLengthMetric.Reset()
- runningLengthMetric.Reset()
-}
-
-func updateInfraFailureMetric(failure bool) {
- val := 0
- if failure {
- val = 1
- }
- infraFailureMetricMutex.Lock()
- defer infraFailureMetricMutex.Unlock()
- infraFailureMetric.Update(int64(val))
-}
-
-func updateAndroidTreeBrokenMetric(broken bool) {
- val := 0
- if broken {
- val = 1
- }
- androidTreeBrokenMetricMutex.Lock()
- defer androidTreeBrokenMetricMutex.Unlock()
- androidTreeBrokenMetric.Update(int64(val))
-}
-
-func moveToRunningMetric() {
- lengthMetricsMutex.Lock()
- defer lengthMetricsMutex.Unlock()
- queueLengthMetric.Dec(1)
- runningLengthMetric.Inc(1)
-}
-
-func decRunningMetric() {
- lengthMetricsMutex.Lock()
- defer lengthMetricsMutex.Unlock()
- runningLengthMetric.Dec(1)
-}
-
-func incWaitingMetric() {
- lengthMetricsMutex.Lock()
- defer lengthMetricsMutex.Unlock()
- queueLengthMetric.Inc(1)
-}
diff --git a/android_compile/Dockerfile b/android_compile/go/android_compile_be/Dockerfile
similarity index 96%
rename from android_compile/Dockerfile
rename to android_compile/go/android_compile_be/Dockerfile
index 063e242..fa38f47 100644
--- a/android_compile/Dockerfile
+++ b/android_compile/go/android_compile_be/Dockerfile
@@ -39,4 +39,4 @@
ENV JAVA_HOME /usr/lib/jvm/java-1.8.0-openjdk-amd64
ENV PATH /home/skia/bin:$PATH
-ENTRYPOINT ["/usr/local/bin/android_compile"]
+ENTRYPOINT ["/usr/local/bin/android_compile_be"]
diff --git a/android_compile/go/android_compile/checkouts.go b/android_compile/go/android_compile_be/checkouts.go
similarity index 93%
rename from android_compile/go/android_compile/checkouts.go
rename to android_compile/go/android_compile_be/checkouts.go
index 5cf0e59..70cd5b1 100644
--- a/android_compile/go/android_compile/checkouts.go
+++ b/android_compile/go/android_compile_be/checkouts.go
@@ -14,9 +14,9 @@
"sync"
"time"
- "cloud.google.com/go/datastore"
"cloud.google.com/go/storage"
"github.com/cenkalti/backoff"
+ ac_util "go.skia.org/infra/android_compile/go/util"
"go.skia.org/infra/go/android_skia_checkout"
"go.skia.org/infra/go/cleanup"
sk_exec "go.skia.org/infra/go/exec"
@@ -69,7 +69,7 @@
)
var (
- availableCheckoutsChan chan string
+ AvailableCheckoutsChan chan string
bucketHandle *storage.BucketHandle
@@ -102,9 +102,9 @@
func UpdateMirror(ctx context.Context) {
if err := updateCheckout(ctx, pathToMirror, true); err != nil {
sklog.Errorf("Error when updating the mirror: %s", err)
- mirrorSyncFailureMetric.Update(1)
+ ac_util.MirrorSyncFailureMetric.Update(1)
} else {
- mirrorSyncFailureMetric.Update(0)
+ ac_util.MirrorSyncFailureMetric.Update(0)
}
}
@@ -143,7 +143,7 @@
// Slice that will be used to update all checkouts in parallel.
checkoutsToUpdate := []string{}
// Channel that will be used to determine which checkouts are available.
- availableCheckoutsChan = make(chan string, numCheckouts)
+ AvailableCheckoutsChan = make(chan string, numCheckouts)
// Populate the channel with available checkouts.
pathToMirrorManifest := filepath.Join(pathToMirror, "platform", "manifest.git")
for i := 1; i <= numCheckouts; i++ {
@@ -160,9 +160,10 @@
return fmt.Errorf("Error running init on %s: %s", checkoutPath, err)
}
checkoutsMutex.RUnlock()
+ checkoutsToUpdate = append(checkoutsToUpdate, checkoutPath)
+
}
}
- checkoutsToUpdate = append(checkoutsToUpdate, checkoutPath)
addToCheckoutsChannel(checkoutPath)
}
@@ -307,6 +308,9 @@
sklog.Info("Will recreate mirror before next sync.")
}
MirrorLastSynced = time.Now()
+ if err := ac_util.UpdateInstanceInDS(ctx, hostname, MirrorLastSynced.Format("Mon Jan 2 15:04:05 MST"), *repoUpdateDuration, false); err != nil {
+ sklog.Errorf("Failed to update instance kind in datastore: %s", err)
+ }
}
}()
@@ -433,7 +437,7 @@
}
func addToCheckoutsChannel(checkout string) {
- availableCheckoutsChan <- checkout
+ AvailableCheckoutsChan <- checkout
}
// RunCompileTask runs the specified CompileTask using the following algorithm-
@@ -464,19 +468,17 @@
// that the tree is not broken by building at Skia HEAD. Update CompileTask
// with link to logs and whether the no patch run was successful.
//
-func RunCompileTask(ctx context.Context, g *gsFileLocation, task *CompileTask, datastoreKey *datastore.Key, pathToCompileScript string) error {
- incWaitingMetric()
+func RunCompileTask(ctx context.Context, g *gsFileLocation, task *ac_util.CompileTask, pathToCompileScript string) error {
// Blocking call to wait for an available checkout.
- checkoutPath := <-availableCheckoutsChan
- moveToRunningMetric()
- defer decRunningMetric()
+ checkoutPath := <-AvailableCheckoutsChan
defer addToCheckoutsChannel(checkoutPath)
// Step 1: Find an available Android checkout and update the CompileTask
// with the checkout. This is done for the UI and for easier debugging.
+ datastoreKey := ac_util.GetTaskDSKey(task.LunchTarget, task.Issue, task.PatchSet)
task.Checkout = path.Base(checkoutPath)
- if err := UpdateCompileTask(ctx, g, datastoreKey, task); err != nil {
- return fmt.Errorf("Could not update compile task with ID %d: %s", datastoreKey.ID, err)
+ if err := UpdateCompileTask(ctx, g, task); err != nil {
+ return fmt.Errorf("Could not update compile task with Key %s: %s", datastoreKey.Name, err)
}
skiaPath := filepath.Join(checkoutPath, "external", "skia")
@@ -531,8 +533,8 @@
}
task.IsMasterBranch = fromMaster
if !task.IsMasterBranch {
- if err := UpdateCompileTask(ctx, g, datastoreKey, task); err != nil {
- return fmt.Errorf("Could not update compile task with ID %d: %s", datastoreKey.ID, err)
+ if err := UpdateCompileTask(ctx, g, task); err != nil {
+ return fmt.Errorf("Could not update compile task with Key %s: %s", datastoreKey.Name, err)
}
sklog.Infof("Patch with issue %d and patchset %d is not on master branch.", task.Issue, task.PatchSet)
return nil
@@ -555,7 +557,7 @@
}
task.IsMasterBranch = fromMaster
if !task.IsMasterBranch {
- if err := UpdateCompileTask(ctx, g, datastoreKey, task); err != nil {
+ if err := UpdateCompileTask(ctx, g, task); err != nil {
return fmt.Errorf("Could not update compile task with ID %d: %s", datastoreKey.ID, err)
}
sklog.Infof("Hash %s is not on master branch.", task.Hash)
@@ -571,14 +573,14 @@
// Step 8: Do the with patch or with hash compilation and update CompileTask
// with link to logs and whether it was successful.
- withPatchSuccess, gsWithPatchLink, err := compileCheckout(ctx, checkoutPath, task.LunchTarget, task.MMMATargets, fmt.Sprintf("%d_withpatch_", datastoreKey.ID), pathToCompileScript)
+ withPatchSuccess, gsWithPatchLink, err := compileCheckout(ctx, checkoutPath, task.LunchTarget, task.MMMATargets, fmt.Sprintf("%s_withpatch_", datastoreKey.Name), pathToCompileScript)
if err != nil {
return fmt.Errorf("Error when compiling checkout withpatch at %s: %s", checkoutPath, err)
}
task.WithPatchSucceeded = withPatchSuccess
task.WithPatchLog = gsWithPatchLink
- if err := UpdateCompileTask(ctx, g, datastoreKey, task); err != nil {
- return fmt.Errorf("Could not update compile task with ID %d: %s", datastoreKey.ID, err)
+ if err := UpdateCompileTask(ctx, g, task); err != nil {
+ return fmt.Errorf("Could not update compile task with Key %s: %s", datastoreKey.Name, err)
}
// Step 9: If the compilation failed and if it is a trybot run then verify
@@ -597,19 +599,19 @@
return fmt.Errorf("Could not prepare Skia checkout for compile: %s", err)
}
// Do the no patch compilation.
- noPatchSuccess, gsNoPatchLink, err := compileCheckout(ctx, checkoutPath, task.LunchTarget, task.MMMATargets, fmt.Sprintf("%d_nopatch_", datastoreKey.ID), pathToCompileScript)
+ noPatchSuccess, gsNoPatchLink, err := compileCheckout(ctx, checkoutPath, task.LunchTarget, task.MMMATargets, fmt.Sprintf("%s_nopatch_", datastoreKey.Name), pathToCompileScript)
if err != nil {
return fmt.Errorf("Error when compiling checkout nopatch at %s: %s", checkoutPath, err)
}
- updateAndroidTreeBrokenMetric(!noPatchSuccess)
+ ac_util.UpdateAndroidTreeBrokenMetric(!noPatchSuccess)
task.NoPatchSucceeded = noPatchSuccess
task.NoPatchLog = gsNoPatchLink
- if err := UpdateCompileTask(ctx, g, datastoreKey, task); err != nil {
- return fmt.Errorf("Could not update compile task with ID %d: %s", datastoreKey.ID, err)
+ if err := UpdateCompileTask(ctx, g, task); err != nil {
+ return fmt.Errorf("Could not update compile task with Key %s: %s", datastoreKey.Name, err)
}
} else {
// The with patch run succeeded. Mark the android_compile_tree_broken metric accordingly.
- updateAndroidTreeBrokenMetric(false)
+ ac_util.UpdateAndroidTreeBrokenMetric(false)
}
return nil
diff --git a/android_compile/go/android_compile_be/main.go b/android_compile/go/android_compile_be/main.go
new file mode 100644
index 0000000..420a06e
--- /dev/null
+++ b/android_compile/go/android_compile_be/main.go
@@ -0,0 +1,420 @@
+/*
+ Android Compile Server for Skia Bots.
+*/
+
+package main
+
+import (
+ "context"
+ "encoding/json"
+ "flag"
+ "fmt"
+ "html/template"
+ "os"
+ "os/user"
+ "path/filepath"
+ "sync"
+ "time"
+
+ "cloud.google.com/go/datastore"
+ "cloud.google.com/go/pubsub"
+ "cloud.google.com/go/storage"
+ "golang.org/x/oauth2"
+ "google.golang.org/api/option"
+
+ ac_util "go.skia.org/infra/android_compile/go/util"
+ "go.skia.org/infra/go/auth"
+ "go.skia.org/infra/go/cleanup"
+ "go.skia.org/infra/go/common"
+ "go.skia.org/infra/go/gitauth"
+ "go.skia.org/infra/go/httputils"
+ "go.skia.org/infra/go/skiaversion"
+ "go.skia.org/infra/go/sklog"
+ "go.skia.org/infra/go/util"
+)
+
+const (
+ // This determines the number of go routines we want to run when receiving pubsub messages.
+ // When we receive a pubsub msg we handle it immediately and either Ack it or Nack it,
+ // there should be no need for parallel receives. Also we do not expect to get tons of parallel
+ // tryjob requests so handling one at a time should be ok.
+ MAX_PARALLEL_RECEIVES = 1
+)
+
+var (
+ // Flags
+ local = flag.Bool("local", false, "Running locally if true. As opposed to in production.")
+ port = flag.String("port", ":8000", "HTTP service port.")
+ promPort = flag.String("prom_port", ":20000", "Metrics service address (e.g., ':20000')")
+ resourcesDir = flag.String("resources_dir", "", "The directory to find compile.sh and template files. If blank then the directory two directories up from this source file will be used.")
+ workdir = flag.String("workdir", ".", "Directory to use for scratch work.")
+ numCheckouts = flag.Int("num_checkouts", 10, "The number of checkouts the Android compile server should maintain.")
+ repoUpdateDuration = flag.Duration("repo_update_duration", 1*time.Hour, "How often to update the main Android repository.")
+ serviceAccount = flag.String("service_account", "", "Should be set when running in K8s.")
+
+ // Useful for debugging.
+ hang = flag.Bool("hang", false, "If true, just hang and do nothing.")
+
+ // Pubsub for storage flags.
+ projectID = flag.String("project_id", "google.com:skia-corp", "Project ID of the Cloud project where the PubSub topic and GS bucket lives.")
+ storageBucket = flag.String("bucket", "android-compile-tasks-staging", "Storage bucket where android compile task JSON files will be kept.")
+ subscriberName = flag.String("subscriber", "android-compile-tasks-staging", "ID of the pubsub subscriber.")
+ topicName = flag.String("topic", "android-compile-tasks-staging", "Google Cloud PubSub topic of the eventbus.")
+
+ // Datastore params
+ namespace = flag.String("namespace", "android-compile-staging", "The Cloud Datastore namespace, such as 'android-compile'.")
+ projectName = flag.String("project_name", "google.com:skia-corp", "The Google Cloud project name.")
+
+ // Used to signal when checkouts are ready to serve requests.
+ checkoutsReadyMutex sync.RWMutex
+
+ // indexTemplate is the main index.html page we serve.
+ indexTemplate *template.Template = nil
+
+ hostname string
+)
+
+// triggerCompileTask runs the specified CompileTask in a goroutine. After
+// completion the task is marked as Done and updated in the Datastore.
+func triggerCompileTask(ctx context.Context, g *gsFileLocation, task *ac_util.CompileTask) {
+ go func() {
+ checkoutsReadyMutex.RLock()
+ defer checkoutsReadyMutex.RUnlock()
+ pathToCompileScript := filepath.Join(*resourcesDir, "compile.sh")
+ datastoreKey := ac_util.GetTaskDSKey(task.LunchTarget, task.Issue, task.PatchSet)
+ sklog.Infof("Triggering %s", datastoreKey.Name)
+ if err := RunCompileTask(ctx, g, task, pathToCompileScript); err != nil {
+ task.InfraFailure = true
+ task.Error = err.Error()
+ sklog.Errorf("Error when compiling task with Key %s: %s", datastoreKey.Name, err)
+ }
+ task.Done = true
+ task.Completed = time.Now()
+ if err := UpdateCompileTask(ctx, g, task); err != nil {
+ sklog.Errorf("Could not update compile task with Key %s: %s", datastoreKey.Name, err)
+ }
+ }()
+}
+
+// UpdateCompileTask updates the task in both Google storage and in Datastore.
+func UpdateCompileTask(ctx context.Context, g *gsFileLocation, task *ac_util.CompileTask) error {
+ datastoreKey := ac_util.GetTaskDSKey(task.LunchTarget, task.Issue, task.PatchSet)
+ if err := updateTaskInGoogleStorage(ctx, g, *task); err != nil {
+ return fmt.Errorf("Could not update in Google storage compile task with Key %s: %s", datastoreKey.Name, err)
+ }
+ if _, err := ac_util.UpdateTaskInDS(ctx, task); err != nil {
+ return fmt.Errorf("Could not update in Datastore compile task with Key %s: %s", datastoreKey.Name, err)
+ }
+ return nil
+}
+
+func updateTaskInGoogleStorage(ctx context.Context, g *gsFileLocation, task ac_util.CompileTask) error {
+ // Update the Google storage file with the taskID.
+ b, err := json.Marshal(struct {
+ ac_util.CompileTask
+ TaskID string `json:"task_id"`
+ }{
+ CompileTask: task,
+ TaskID: ac_util.GetTaskDSKey(task.LunchTarget, task.Issue, task.PatchSet).Name,
+ })
+ if err != nil {
+ return fmt.Errorf("Could not re-encode compile task: %s", err)
+ }
+ wr := g.storageClient.Bucket(g.bucket).Object(g.name).NewWriter(ctx)
+ defer util.Close(wr)
+ wr.ObjectAttrs.ContentEncoding = "application/json"
+ if _, err := wr.Write(b); err != nil {
+ return fmt.Errorf("Failed writing JSON to GCS: %s", err)
+ }
+ return nil
+}
+
+type gsFileLocation struct {
+ bucket string
+ name string
+ storageClient *storage.Client
+}
+
+func newGCSFileLocation(bucket, name string, storageClient *storage.Client) *gsFileLocation {
+ return &gsFileLocation{
+ bucket: bucket,
+ name: name,
+ storageClient: storageClient,
+ }
+}
+
+func initPubSub(ts oauth2.TokenSource, resultCh chan *ac_util.CompileTask, storageClient *storage.Client) error {
+ ctx := context.Background()
+
+ // Create a client.
+ client, err := pubsub.NewClient(ctx, *projectID, option.WithTokenSource(ts))
+ if err != nil {
+ return err
+ }
+
+ // Create topic and subscription if necessary.
+
+ // Topic.
+ topic := client.Topic(*topicName)
+ exists, err := topic.Exists(ctx)
+ if err != nil {
+ return err
+ }
+ if !exists {
+ if _, err := client.CreateTopic(ctx, *topicName); err != nil {
+ return err
+ }
+ }
+
+ // Subscription.
+ subName := fmt.Sprintf("%s+%s", *subscriberName, *topicName)
+ sub := client.Subscription(subName)
+ exists, err = sub.Exists(ctx)
+ if err != nil {
+ return err
+ }
+ if !exists {
+ if _, err := client.CreateSubscription(ctx, subName, pubsub.SubscriptionConfig{
+ Topic: topic,
+ AckDeadline: 10 * time.Second,
+ }); err != nil {
+ return err
+ }
+ }
+ sub.ReceiveSettings.MaxOutstandingMessages = MAX_PARALLEL_RECEIVES
+ sub.ReceiveSettings.NumGoroutines = MAX_PARALLEL_RECEIVES
+
+ go func() {
+ for {
+ // The pubsub receive callback method does either an Ack()
+ // or Nack() with a comment before all returns.
+ //
+ //
+ // The following cases are Nack'ed:
+ // * All checkouts on this instance are busy and cannot
+ // pickup the task yet.
+		// * This instance is syncing its mirror and cannot
+ // pickup the task till it is done.
+ // * Unknown errors when trying to claim the task.
+ //
+ //
+		// The following cases are Ack'ed:
+ // * Could not decode the pubsub message body.
+ // * When storage object is overwritten (not new file). We
+ // only care about processing new files.
+ // * The file no longer exists in Google storage.
+ // * Could not read the JSON in the file.
+ // * There is a datastore entry for the file that says that
+ // the current instance owns it but is not running it yet.
+ // Something probably went wrong and we will Ack it and run
+ // the task.
+ // * There is a datastore entry for the file that says that
+ // another instance owns it. This instance will Ack it and not
+ // run the task to avoid duplicated work.
+ // * If none of the above Nack or Ack cases matched it then we
+ // will Ack it and run the task.
+ //
+ if err := sub.Receive(ctx, func(ctx context.Context, m *pubsub.Message) {
+ var message struct {
+ Bucket string `json:"bucket"`
+ Name string `json:"name"`
+ }
+ if err := json.Unmarshal(m.Data, &message); err != nil {
+ sklog.Errorf("Failed to decode pubsub message body: %s", err)
+ m.Ack() // We'll never be able to handle this message.
+ return
+ }
+
+ // The overwroteGeneration attribute only appears in OBJECT_FINALIZE
+ // events in the case of an overwrite.
+ // Source: https://cloud.google.com/storage/docs/pubsub-notifications
+ // We ignore such messages because we only care about
+ // processing new files.
+ if m.Attributes["overwroteGeneration"] != "" {
+ sklog.Debugf("Override for %s", message.Name)
+ m.Ack() // An existing request.
+ return
+ }
+
+ sklog.Infof("Received storage event: %s / %s\n", message.Bucket, message.Name)
+ data, err := storageClient.Bucket(message.Bucket).Object(message.Name).NewReader(ctx)
+ if err != nil {
+ sklog.Errorf("New reader failed for %s/%s: %s", message.Bucket, message.Name, err)
+ m.Ack() // Maybe the file no longer exists.
+ return
+ }
+ task := ac_util.CompileTask{}
+ if err := json.NewDecoder(data).Decode(&task); err != nil {
+ sklog.Errorf("Failed to parse request: %s", err)
+ m.Ack() // We'll probably never be able to handle this message.
+ return
+ }
+
+ // Is this instance ready to pickup new tasks? Checks for the following to decide:
+ // * Are all checkouts busy?
+ // * Is mirror sync going on?
+ // If either of these cases are true then the message is Nack'ed. Hopefully another instance
+ // handles it, else this instance will pick it up when free.
+ if len(AvailableCheckoutsChan) == 0 {
+ sklog.Debugf("All %d checkouts are busy. Nack'ing %s .", *numCheckouts, message.Name)
+ if err := ac_util.AddUnownedCompileTask(&task); err != nil {
+ sklog.Error(err)
+ }
+ m.Nack()
+ return
+ } else if getMirrorUpdateRunning() {
+ sklog.Debugf("Mirror is being updated right now. Nack'ing %s.", message.Name)
+ if err := ac_util.AddUnownedCompileTask(&task); err != nil {
+ sklog.Error(err)
+ }
+ m.Nack()
+ return
+ }
+
+ // Check to see if another instance picked up this task, if not then claim it.
+ if err := ac_util.ClaimAndAddCompileTask(&task, hostname /* ownedByInstance */); err != nil {
+ if err == ac_util.ErrAnotherInstanceRunningTask || err == ac_util.ErrThisInstanceRunningTask {
+ sklog.Info(err.Error())
+ m.Ack() // An instance is already running this task.
+ return
+ } else if err == ac_util.ErrThisInstanceOwnsTaskButNotRunning {
+ sklog.Info(err.Error())
+ // This instance should run this task so continue.
+ // TODO(rmistry): Not sure if this should be Ack'ed instead.
+ } else {
+ sklog.Errorf("Could not claim %s: %s", message.Name, err)
+ m.Nack() // Failed due to unknown reason. Let's try again.
+ return
+ }
+ }
+
+ // Send the new task to the results channel and Ack the message afterwards.
+ resultCh <- &task
+ m.Ack()
+ return
+ }); err != nil {
+ sklog.Errorf("Failed to receive pubsub messages: %s", err)
+ }
+ }
+ }()
+ return nil
+}
+
+func main() {
+ flag.Parse()
+
+ common.InitWithMust("android_compile", common.PrometheusOpt(promPort), common.MetricsLoggingOpt())
+ defer common.Defer()
+ skiaversion.MustLogVersion()
+ ctx := context.Background()
+
+ var err error
+ hostname, err = os.Hostname()
+ if err != nil {
+ sklog.Fatalf("Could not find hostname: %s", err)
+ }
+
+ if *projectID == "" || *topicName == "" || *subscriberName == "" {
+ sklog.Fatalf("project_id, topic and subscriber flags must all be set.")
+ }
+
+ if *hang {
+ sklog.Infof("--hang provided; doing nothing.")
+ httputils.RunHealthCheckServer(*port)
+ }
+
+ // Create token source.
+ ts, err := auth.NewDefaultTokenSource(*local, auth.SCOPE_READ_WRITE, auth.SCOPE_USERINFO_EMAIL, auth.SCOPE_GERRIT, datastore.ScopeDatastore, pubsub.ScopePubSub)
+ if err != nil {
+ sklog.Fatalf("Problem setting up default token source: %s", err)
+ }
+
+ // Instantiate storage client.
+ storageClient, err := storage.NewClient(ctx, option.WithTokenSource(ts))
+ if err != nil {
+ sklog.Fatalf("Failed to create a Google Storage API client: %s", err)
+ }
+
+ // Initialize cloud datastore.
+ if err := ac_util.DatastoreInit(*projectName, *namespace, ts); err != nil {
+ sklog.Fatalf("Failed to init cloud datastore: %s", err)
+ }
+
+ // Register instance with frontend.
+ if err := ac_util.UpdateInstanceInDS(ctx, hostname, time.Now().Format("Mon Jan 2 15:04:05 MST"), *repoUpdateDuration, false); err != nil {
+ sklog.Fatalf("Failed to update instance kind in datastore: %s", err)
+ }
+
+ if !*local {
+ // Use the gitcookie created by gitauth package.
+ user, err := user.Current()
+ if err != nil {
+ sklog.Fatal(err)
+ }
+ gitcookiesPath := filepath.Join(user.HomeDir, ".gitcookies")
+ if _, err := gitauth.New(ts, gitcookiesPath, true, *serviceAccount); err != nil {
+ sklog.Fatalf("Failed to create git cookie updater: %s", err)
+ }
+ }
+
+ // Initialize checkouts.
+ if err := CheckoutsInit(*numCheckouts, *workdir, *repoUpdateDuration, storageClient); err != nil {
+ sklog.Fatalf("Failed to init checkouts: %s", err)
+ }
+
+ // Init pubsub.
+ resultCh := make(chan *ac_util.CompileTask)
+ if err := initPubSub(ts, resultCh, storageClient); err != nil {
+ sklog.Fatal(err)
+ }
+
+ // Start listener for when the mirror should be force synced.
+ // Update mirror here and then periodically.
+ cleanup.Repeat(time.Minute, func() {
+ // Check the datastore and if it is true then Update the mirror!
+ forceMirror, err := ac_util.GetForceMirrorUpdateBool(ctx, hostname)
+ if err != nil {
+ sklog.Errorf("Could not get force mirror update bool from datastore: %s", err)
+ } else if forceMirror {
+			sklog.Info("Got request to force sync mirror. Starting now.")
+ UpdateMirror(ctx)
+ }
+ }, nil)
+
+	// Find and reschedule all CompileTasks that are owned by this instance but did
+	// not run to completion. They likely did not run to completion because the
+	// server was restarted in the middle of run(s). Any such "running"
+	// CompileTasks means the server was restarted mid-run.
+	// Do not block bringing up the server.
+ go func() {
+ _, ownedPendingTasks, err := ac_util.GetPendingCompileTasks(hostname /* ownedByInstance */)
+ if err != nil {
+ sklog.Fatalf("Failed to retrieve compile tasks and keys: %s", err)
+ }
+
+ for _, t := range ownedPendingTasks {
+ taskKey := ac_util.GetTaskDSKey(t.LunchTarget, t.Issue, t.PatchSet)
+ sklog.Infof("Found orphaned task %s. Retriggering it...", taskKey.Name)
+ // Make sure the file exists in GS first.
+ fileName := fmt.Sprintf("%s.json", taskKey.Name)
+ _, err := storageClient.Bucket(*storageBucket).Object(fileName).Attrs(ctx)
+ if err != nil {
+ sklog.Fatalf("Unable to get handle for orphaned task '%s/%s': %s", *storageBucket, fileName, err)
+ }
+
+ triggerCompileTask(ctx, newGCSFileLocation(*storageBucket, fileName, storageClient), t)
+ }
+ }()
+
+ // Wait for compile task requests that come in.
+ go func() {
+ for true {
+ compileTask := <-resultCh
+ taskKey := ac_util.GetTaskDSKey(compileTask.LunchTarget, compileTask.Issue, compileTask.PatchSet)
+ fileName := fmt.Sprintf("%s.json", taskKey.Name)
+ triggerCompileTask(ctx, newGCSFileLocation(*storageBucket, fileName, storageClient), compileTask)
+ }
+ }()
+
+ httputils.RunHealthCheckServer(*port)
+}
diff --git a/android_compile/go/android_compile_fe/Dockerfile b/android_compile/go/android_compile_fe/Dockerfile
new file mode 100644
index 0000000..dca69ca
--- /dev/null
+++ b/android_compile/go/android_compile_fe/Dockerfile
@@ -0,0 +1,7 @@
+FROM gcr.io/skia-public/basedebian:testing-slim
+
+USER skia
+
+COPY . /
+
+ENTRYPOINT ["/usr/local/bin/android_compile_fe"]
diff --git a/android_compile/go/android_compile_fe/main.go b/android_compile/go/android_compile_fe/main.go
new file mode 100644
index 0000000..a22bd92
--- /dev/null
+++ b/android_compile/go/android_compile_fe/main.go
@@ -0,0 +1,179 @@
+/*
+ Android Compile Server Frontend.
+*/
+
+package main
+
+import (
+ "context"
+ "flag"
+ "html/template"
+ "net/http"
+ "path/filepath"
+ "time"
+
+ "cloud.google.com/go/datastore"
+ "github.com/gorilla/mux"
+
+ "go.skia.org/infra/android_compile/go/util"
+ "go.skia.org/infra/go/allowed"
+ "go.skia.org/infra/go/auth"
+ "go.skia.org/infra/go/cleanup"
+ "go.skia.org/infra/go/common"
+ "go.skia.org/infra/go/httputils"
+ "go.skia.org/infra/go/login"
+ "go.skia.org/infra/go/skiaversion"
+ "go.skia.org/infra/go/sklog"
+)
+
+const (
+ FORCE_SYNC_POST_URL = "/_/force_sync"
+)
+
+var (
+ // Flags
+ local = flag.Bool("local", false, "Running locally if true. As opposed to in production.")
+ host = flag.String("host", "localhost", "HTTP service host")
+ port = flag.String("port", ":8000", "HTTP service port.")
+ promPort = flag.String("prom_port", ":20000", "Metrics service address (e.g., ':20000')")
+ resourcesDir = flag.String("resources_dir", "", "The directory to find compile.sh and template files. If blank then the directory two directories up from this source file will be used.")
+
+ // Datastore params
+ namespace = flag.String("namespace", "android-compile-staging", "The Cloud Datastore namespace, such as 'android-compile'.")
+ projectName = flag.String("project_name", "google.com:skia-corp", "The Google Cloud project name.")
+
+ // indexTemplate is the main index.html page we serve.
+ indexTemplate *template.Template = nil
+
+ serverURL string
+)
+
+func reloadTemplates() {
+ indexTemplate = template.Must(template.ParseFiles(
+ filepath.Join(*resourcesDir, "templates/index.html"),
+ filepath.Join(*resourcesDir, "templates/header.html"),
+ ))
+}
+
+func loginHandler(w http.ResponseWriter, r *http.Request) {
+ http.Redirect(w, r, login.LoginURL(w, r), http.StatusFound)
+ return
+}
+
+func indexHandler(w http.ResponseWriter, r *http.Request) {
+ if *local {
+ reloadTemplates()
+ }
+ w.Header().Set("Content-Type", "text/html")
+
+ if login.LoggedInAs(r) == "" {
+ http.Redirect(w, r, login.LoginURL(w, r), http.StatusSeeOther)
+ return
+ }
+
+ unownedPendingTasks, ownedPendingTasks, err := util.GetPendingCompileTasks("" /* ownedByInstance */)
+ if err != nil {
+ httputils.ReportError(w, r, err, "Failed to get unowned/owned compile tasks")
+ return
+ }
+ androidCompileInstances, err := util.GetAllCompileInstances(context.Background())
+ if err != nil {
+ httputils.ReportError(w, r, err, "Failed to get android compile instances")
+ return
+ }
+
+ var info = struct {
+ AndroidCompileInstances []*util.AndroidCompileInstance
+ UnownedPendingTasks []*util.CompileTask
+ OwnedPendingTasks []*util.CompileTask
+ }{
+ AndroidCompileInstances: androidCompileInstances,
+ UnownedPendingTasks: unownedPendingTasks,
+ OwnedPendingTasks: ownedPendingTasks,
+ }
+
+ if err := indexTemplate.Execute(w, info); err != nil {
+ httputils.ReportError(w, r, err, "Failed to expand template")
+ return
+ }
+ return
+}
+
+func forceSyncHandler(w http.ResponseWriter, r *http.Request) {
+ if *local {
+ reloadTemplates()
+ }
+
+ if login.LoggedInAs(r) == "" {
+ http.Redirect(w, r, login.LoginURL(w, r), http.StatusSeeOther)
+ return
+ }
+
+ if err := util.SetForceMirrorUpdateOnAllInstances(context.Background()); err != nil {
+ httputils.ReportError(w, r, err, "Failed to set force mirror update on all instances")
+ return
+ }
+
+ sklog.Infof("Force sync button has been pressed by %s", login.LoggedInAs(r))
+ return
+}
+
+func runServer() {
+ r := mux.NewRouter()
+ r.PathPrefix("/res/").HandlerFunc(httputils.MakeResourceHandler(*resourcesDir))
+ r.HandleFunc("/", indexHandler)
+ r.HandleFunc(FORCE_SYNC_POST_URL, forceSyncHandler)
+
+ r.HandleFunc("/json/version", skiaversion.JsonHandler)
+ r.HandleFunc(login.DEFAULT_OAUTH2_CALLBACK, login.OAuth2CallbackHandler)
+ r.HandleFunc("/login/", loginHandler)
+ r.HandleFunc("/logout/", login.LogoutHandler)
+ r.HandleFunc("/loginstatus/", login.StatusHandler)
+ h := httputils.LoggingGzipRequestResponse(r)
+ if !*local {
+ h = httputils.HealthzAndHTTPS(h)
+ }
+
+ http.Handle("/", h)
+ sklog.Infof("Ready to serve on %s", serverURL)
+ sklog.Fatal(http.ListenAndServe(*port, nil))
+}
+
+func main() {
+ flag.Parse()
+
+ common.InitWithMust("android_compile_fe", common.PrometheusOpt(promPort), common.MetricsLoggingOpt())
+ defer common.Defer()
+ skiaversion.MustLogVersion()
+
+ reloadTemplates()
+ serverURL = "https://" + *host
+ if *local {
+ serverURL = "http://" + *host + *port
+ }
+ login.InitWithAllow(serverURL+login.DEFAULT_OAUTH2_CALLBACK, allowed.Googlers(), allowed.Googlers(), nil)
+
+ // Create token source.
+ ts, err := auth.NewDefaultTokenSource(*local, auth.SCOPE_READ_WRITE, auth.SCOPE_USERINFO_EMAIL, auth.SCOPE_GERRIT, datastore.ScopeDatastore)
+ if err != nil {
+ sklog.Fatalf("Problem setting up default token source: %s", err)
+ }
+
+ // Initialize cloud datastore.
+ if err := util.DatastoreInit(*projectName, *namespace, ts); err != nil {
+ sklog.Fatalf("Failed to init cloud datastore: %s", err)
+ }
+
+ // Start updater for the queue length metrics.
+ cleanup.Repeat(time.Minute, func() {
+ unownedPendingTasks, ownedPendingTasks, err := util.GetPendingCompileTasks("" /* ownedByInstance */)
+ if err != nil {
+ sklog.Errorf("Failed to get unowned/owned compile tasks: %s", err)
+ } else {
+ util.QueueLengthMetric.Update(int64(len(unownedPendingTasks)))
+ util.RunningLengthMetric.Update(int64(len(ownedPendingTasks)))
+ }
+ }, nil)
+
+ runServer()
+}
diff --git a/android_compile/go/util/datastore.go b/android_compile/go/util/datastore.go
new file mode 100644
index 0000000..31c56d8
--- /dev/null
+++ b/android_compile/go/util/datastore.go
@@ -0,0 +1,243 @@
+/*
+ Used by the Android Compile Server to interact with the cloud datastore.
+*/
+
+package util
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "sort"
+ "time"
+
+ "cloud.google.com/go/datastore"
+ "go.skia.org/infra/go/ds"
+ "golang.org/x/oauth2"
+ "google.golang.org/api/iterator"
+ "google.golang.org/api/option"
+
+ "go.skia.org/infra/go/sklog"
+)
+
+var (
+ ErrAnotherInstanceRunningTask = errors.New("Another instance has picked up this task")
+ ErrThisInstanceRunningTask = errors.New("This instance is already running this task")
+ ErrThisInstanceOwnsTaskButNotRunning = errors.New("This instance has picked up this task but it is not running yet")
+)
+
+type AndroidCompileInstance struct {
+ MirrorLastSynced string `json:"mirror_last_synced"`
+ MirrorUpdateDuration string `json:"mirror_update_duration"`
+ ForceMirrorUpdate bool `json:"force_mirror_update"`
+ Name string `json:"name"`
+}
+
+type CompileTask struct {
+ Issue int `json:"issue"`
+ PatchSet int `json:"patchset"`
+ Hash string `json:"hash"`
+
+ LunchTarget string `json:"lunch_target"`
+ MMMATargets string `json:"mmma_targets"`
+
+ Checkout string `json:"checkout"`
+
+ Created time.Time `json:"created"`
+ Completed time.Time `json:"completed"`
+
+ WithPatchSucceeded bool `json:"withpatch_success"`
+ NoPatchSucceeded bool `json:"nopatch_success"`
+
+ WithPatchLog string `json:"withpatch_log"`
+ NoPatchLog string `json:"nopatch_log"`
+
+ CompileServerInstance string `json:"compile_server_instance"`
+ IsMasterBranch bool `json:"is_master_branch"`
+ Done bool `json:"done"`
+ Error string `json:"error"`
+ InfraFailure bool `json:"infra_failure"`
+}
+
+type sortTasks []*CompileTask
+
+func (a sortTasks) Len() int { return len(a) }
+func (a sortTasks) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
+func (a sortTasks) Less(i, j int) bool {
+ return a[i].Created.Before(a[j].Created)
+}
+
+// ClaimAndAddCompileTask adds the compile task to the datastore and marks it
+// as being owned by the specified instance.
+// The function throws the following custom errors:
+// * ErrThisInstanceOwnsTaskButNotRunning - Thrown when the specified instance
+// owns the task but it is not running yet.
+// * ErrThisInstanceRunningTask - Thrown when the specified instance owns the task
+//   and it is currently running.
+// * ErrAnotherInstanceRunningTask - Thrown when another instance (not the specified
+// instance) owns the task.
+func ClaimAndAddCompileTask(taskFromGS *CompileTask, currentInstance string) error {
+ var err error
+ _, err = ds.DS.RunInTransaction(context.Background(), func(tx *datastore.Transaction) error {
+ var taskFromDS CompileTask
+ // Use the task from GS to construct the Key and look in Datastore.
+ k := GetTaskDSKey(taskFromGS.LunchTarget, taskFromGS.Issue, taskFromGS.PatchSet)
+ if err := tx.Get(k, &taskFromDS); err != nil && err != datastore.ErrNoSuchEntity {
+ return err
+ }
+ if taskFromDS.Done {
+ sklog.Infof("%s exists in Datastore and is completed but there was a new request for it. Running it..", k)
+ } else if taskFromDS.CompileServerInstance != "" {
+ if taskFromDS.CompileServerInstance == currentInstance {
+ if taskFromDS.Checkout == "" {
+ sklog.Infof("%s has already been picked up by this instance but task is not running.", k)
+ return ErrThisInstanceOwnsTaskButNotRunning
+ } else {
+ sklog.Infof("%s has already been picked up by this instance", k)
+ return ErrThisInstanceRunningTask
+ }
+ } else {
+ sklog.Infof("%s has been picked up by %s", k, taskFromDS.CompileServerInstance)
+ return ErrAnotherInstanceRunningTask
+ }
+ }
+ // Populate some taskFromGS properties before adding to datastore.
+ taskFromGS.CompileServerInstance = currentInstance
+ taskFromGS.Created = time.Now()
+ if _, err := tx.Put(k, taskFromGS); err != nil {
+ return err
+ }
+ return nil
+ })
+ return err
+}
+
+// AddUnownedCompileTask adds the task to the datastore without an owner instance.
+// Task is added to the datastore if it does not already exist in the datastore or
+// if it exists but is marked as completed.
+func AddUnownedCompileTask(taskFromGS *CompileTask) error {
+ var err error
+ _, err = ds.DS.RunInTransaction(context.Background(), func(tx *datastore.Transaction) error {
+ var taskFromDS CompileTask
+ k := GetTaskDSKey(taskFromGS.LunchTarget, taskFromGS.Issue, taskFromGS.PatchSet)
+ if err := tx.Get(k, &taskFromDS); err != nil {
+ if err == datastore.ErrNoSuchEntity {
+ // If task does not exist then add it as a pending task.
+ taskFromGS.Created = time.Now()
+ if _, err := tx.Put(k, taskFromGS); err != nil {
+ return err
+ }
+ } else {
+ return err
+ }
+ }
+ if taskFromDS.Done {
+ // Task is in Datastore and has completed, but a new request
+ // has come in so override the old task.
+ taskFromGS.Created = time.Now()
+ if _, err := tx.Put(k, taskFromGS); err != nil {
+ return err
+ }
+ }
+ return nil
+ })
+ return err
+}
+
+// GetPendingCompileTasks returns slices of unowned tasks and currently running
+// (but not yet completed) tasks.
+func GetPendingCompileTasks(ownedByInstance string) ([]*CompileTask, []*CompileTask, error) {
+ // Pending tasks that have not been picked up by an instance yet.
+ unownedPendingTasks := []*CompileTask{}
+ // Pending tasks that have been picked up by an instance.
+ ownedPendingTasks := []*CompileTask{}
+
+ q := ds.NewQuery(ds.COMPILE_TASK).EventualConsistency().Filter("Done =", false)
+ it := ds.DS.Run(context.TODO(), q)
+ for {
+ t := &CompileTask{}
+ _, err := it.Next(t)
+ if err == iterator.Done {
+ break
+ } else if err != nil {
+ return nil, nil, fmt.Errorf("Failed to retrieve list of tasks: %s", err)
+ }
+ if t.CompileServerInstance == "" {
+ unownedPendingTasks = append(unownedPendingTasks, t)
+ } else {
+ if ownedByInstance == "" {
+ ownedPendingTasks = append(ownedPendingTasks, t)
+ } else if t.CompileServerInstance == ownedByInstance {
+ ownedPendingTasks = append(ownedPendingTasks, t)
+ }
+ }
+ }
+ sort.Sort(sortTasks(unownedPendingTasks))
+ sort.Sort(sortTasks(ownedPendingTasks))
+
+ return unownedPendingTasks, ownedPendingTasks, nil
+}
+
+func DatastoreInit(project, ns string, ts oauth2.TokenSource) error {
+ return ds.InitWithOpt(project, ns, option.WithTokenSource(ts))
+}
+
+func UpdateInstanceInDS(ctx context.Context, hostname, mirrorLastSynced string, mirrorUpdateDuration time.Duration, forceMirrorUpdate bool) error {
+ k := GetInstanceDSKey(hostname)
+ i := AndroidCompileInstance{
+ MirrorLastSynced: mirrorLastSynced,
+ MirrorUpdateDuration: mirrorUpdateDuration.String(),
+ ForceMirrorUpdate: forceMirrorUpdate,
+ Name: hostname,
+ }
+ _, err := ds.DS.Put(ctx, k, &i)
+ return err
+}
+
+func GetAllCompileInstances(ctx context.Context) ([]*AndroidCompileInstance, error) {
+ var instances []*AndroidCompileInstance
+ q := ds.NewQuery(ds.ANDROID_COMPILE_INSTANCES)
+ _, err := ds.DS.GetAll(ctx, q, &instances)
+ return instances, err
+}
+
+func SetForceMirrorUpdateOnAllInstances(ctx context.Context) error {
+ var instances []*AndroidCompileInstance
+ q := ds.NewQuery(ds.ANDROID_COMPILE_INSTANCES)
+ if _, err := ds.DS.GetAll(ctx, q, &instances); err != nil {
+ return err
+ }
+ for _, i := range instances {
+ i.ForceMirrorUpdate = true
+ if _, err := ds.DS.Put(ctx, GetInstanceDSKey(i.Name), i); err != nil {
+ return err
+ }
+ }
+ return nil
+}
+
+func GetForceMirrorUpdateBool(ctx context.Context, hostname string) (bool, error) {
+ k := GetInstanceDSKey(hostname)
+ var i AndroidCompileInstance
+ if err := ds.DS.Get(ctx, k, &i); err != nil {
+ return false, err
+ }
+ return i.ForceMirrorUpdate, nil
+}
+
+func GetInstanceDSKey(hostname string) *datastore.Key {
+ k := ds.NewKey(ds.ANDROID_COMPILE_INSTANCES)
+ k.Name = hostname
+ return k
+}
+
+func GetTaskDSKey(lunchTarget string, issue, patchset int) *datastore.Key {
+ k := ds.NewKey(ds.COMPILE_TASK)
+ k.Name = fmt.Sprintf("%s-%d-%d", lunchTarget, issue, patchset)
+ return k
+}
+
+func UpdateTaskInDS(ctx context.Context, t *CompileTask) (*datastore.Key, error) {
+ k := GetTaskDSKey(t.LunchTarget, t.Issue, t.PatchSet)
+ return ds.DS.Put(ctx, k, t)
+}
diff --git a/android_compile/go/util/metrics.go b/android_compile/go/util/metrics.go
new file mode 100644
index 0000000..d33f7f4
--- /dev/null
+++ b/android_compile/go/util/metrics.go
@@ -0,0 +1,31 @@
+package util
+
+import (
+ "sync"
+
+ "go.skia.org/infra/go/metrics2"
+)
+
+var (
+ // Metrics regarding number of waiting and running tasks.
+ QueueLengthMetric = metrics2.GetInt64Metric("android_compile_waiting_tasks", nil)
+ RunningLengthMetric = metrics2.GetInt64Metric("android_compile_running_tasks", nil)
+
+	// Metric regarding broken android tree and its mutex.
+ androidTreeBrokenMetric = metrics2.GetInt64Metric("android_compile_tree_broken", nil)
+ androidTreeBrokenMetricMutex = sync.Mutex{}
+
+ // Metric regarding mirror syncs. Does not need a mutex because the tree is
+ // only updated after a mutex lock.
+ MirrorSyncFailureMetric = metrics2.GetInt64Metric("android_compile_mirror_sync_failure", nil)
+)
+
+func UpdateAndroidTreeBrokenMetric(broken bool) {
+ val := 0
+ if broken {
+ val = 1
+ }
+ androidTreeBrokenMetricMutex.Lock()
+ defer androidTreeBrokenMetricMutex.Unlock()
+ androidTreeBrokenMetric.Update(int64(val))
+}
diff --git a/android_compile/res/imp/force-sync-sk.html b/android_compile/res/imp/force-sync-sk.html
index 53e8230..8b4e3a6 100644
--- a/android_compile/res/imp/force-sync-sk.html
+++ b/android_compile/res/imp/force-sync-sk.html
@@ -32,6 +32,10 @@
border-spacing: 0px;
width: 80%;
}
+ tr.headers {
+ background-color: #CCCCFF;
+ text-align: center;
+ }
tr {
text-align: center;
}
@@ -46,19 +50,25 @@
</style>
<table class="forcesync">
- <col width = "50%">
- <tr>
- <td>
- Checkout synced at: {{mirrorLastSynced}}
- </td>
- <td>
- Periodic syncs done every: {{mirrorUpdateDuration}}
- </td>
+ <tr class="headers">
+ <td colspan=3>Instances</td>
</tr>
+ <template is="dom-repeat" items="[[androidCompileInstances]]">
+ <tr>
+ <td>
+ {{item.name}}
+ </td>
+ <td>
+ Last synced at: {{item.mirror_last_synced}}
+ </td>
+ <td>
+ Periodic syncs done every: {{item.mirror_update_duration}}
+ </td>
+ </tr>
+ </template>
<tr>
- <td colspan=2>
- <paper-button raised id="force_sync" disabled={{mirrorUpdateRunning}}>Force Sync Android Checkout</paper-button>
- <template is="dom-if" if="{{mirrorUpdateRunning}}"><br/>Checkout is currently being synced</template>
+ <td colspan=3>
+ <paper-button raised id="force_sync">Force Sync All Instances</paper-button>
</td>
</tr>
</table>
@@ -69,17 +79,9 @@
Polymer({
is: "force-sync-sk",
properties: {
- mirrorLastSynced: {
- type: Object,
- value: null,
- },
- mirrorUpdateDuration: {
- type: Object,
- value: null,
- },
- mirrorUpdateRunning: {
- type: Boolean,
- value: false,
+ androidCompileInstances: {
+ type: Array,
+ value: [],
},
},
diff --git a/android_compile/res/imp/list-tasks-sk.html b/android_compile/res/imp/list-tasks-sk.html
index b3c6137..f20835b 100644
--- a/android_compile/res/imp/list-tasks-sk.html
+++ b/android_compile/res/imp/list-tasks-sk.html
@@ -16,8 +16,9 @@
<list-tasks-sk></list-tasks-sk>
Attributes:
- waitingTasks - An array of compile tasks waiting to be picked up.
- runningTasks - An array of currently running compile tasks.
+ androidCompileInstances - An array of Android compile instances.
+ unownedPendingTasks - An array of compile tasks waiting to be picked up.
+ ownedPendingTasks - An array of compile tasks currently owned and being run by backend instances.
Events:
None
@@ -55,9 +56,7 @@
<compile-menu-sk navigation></compile-menu-sk>
<br/><br/>
- <force-sync-sk mirror-last-synced="{{mirrorLastSynced}}"
- mirror-update-duration="{{mirrorUpdateDuration}}"
- mirror-update-running="{{mirrorUpdateRunning}}">
+ <force-sync-sk android-compile-instances="[[androidCompileInstances]]">
</force-sync-sk>
<br/><br/>
@@ -66,10 +65,10 @@
<col width = "50%">
<tr class="headers">
<td colspan=2>
- [[waitingTasks.length]] Tasks Waiting in Queue
+ [[unownedPendingTasks.length]] Tasks Waiting in Queue
</td>
</tr>
- <template is="dom-repeat" items="{{waitingTasks}}">
+ <template is="dom-repeat" items="{{unownedPendingTasks}}">
<tr>
<td>
<template is="dom-if" if="[[item.hash]]">
@@ -93,19 +92,14 @@
<col width = "50%">
<tr class="headers">
<td colspan=2>
- [[runningTasks.length]] Tasks Currently Running
+ [[ownedPendingTasks.length]] Tasks Currently Running
</td>
</tr>
- <template is="dom-repeat" items="{{runningTasks}}">
+ <template is="dom-repeat" items="{{ownedPendingTasks}}">
<tr>
<td>
- Running on [[item.checkout]] :
- <template is="dom-if" if="[[item.hash]]">
- <a href="[[ _getHashLink(item.hash) ]]" target="_blank">[[ _getShortHash(item.hash) ]]</a>
- </template>
- <template is="dom-if" if="[[item.issue]]">
- <a href="[[ _getGerritLink(item.issue, item.patchset) ]]" target="_blank">skrev/[[item.issue]]/[[item.patchset]]</a>
- </template>
+ Running on [[item.compile_server_instance]] ([[item.checkout]]):
+ <a href="[[ _getGerritLink(item.issue, item.patchset) ]]" target="_blank">skrev/[[item.issue]]/[[item.patchset]]</a>
</td>
<td>
Created: [[ _formatTimestamp(item.created) ]]
@@ -120,23 +114,15 @@
Polymer({
is: "list-tasks-sk",
properties: {
- mirrorLastSynced: {
- type: Object,
- value: null,
- },
- mirrorUpdateDuration: {
- type: Object,
- value: null,
- },
- mirrorUpdateRunning: {
- type: Boolean,
- value: false,
- },
- waitingTasks: {
+ androidCompileInstances: {
type: Array,
value: [],
},
- runningTasks: {
+ unownedPendingTasks: {
+ type: Array,
+ value: [],
+ },
+ ownedPendingTasks: {
type: Array,
value: [],
},
diff --git a/android_compile/templates/index.html b/android_compile/templates/index.html
index 96b4c6c..ac88633 100644
--- a/android_compile/templates/index.html
+++ b/android_compile/templates/index.html
@@ -9,11 +9,9 @@
<script type="text/javascript" charset="utf-8">
(function() {
- $$$('list-tasks-sk').mirrorLastSynced = {{.MirrorLastSynced}};
- $$$('list-tasks-sk').mirrorUpdateDuration = {{.MirrorUpdateDuration}};
- $$$('list-tasks-sk').mirrorUpdateRunning = {{.MirrorUpdateRunning}};
- $$$('list-tasks-sk').waitingTasks = {{.WaitingTasks}};
- $$$('list-tasks-sk').runningTasks = {{.RunningTasks}};
+ $$$('list-tasks-sk').androidCompileInstances = {{.AndroidCompileInstances}};
+ $$$('list-tasks-sk').unownedPendingTasks = {{.UnownedPendingTasks}};
+ $$$('list-tasks-sk').ownedPendingTasks = {{.OwnedPendingTasks}};
})();
</script>
</body>
diff --git a/go/ds/ds.go b/go/ds/ds.go
index 0378b0a..b52f03c 100644
--- a/go/ds/ds.go
+++ b/go/ds/ds.go
@@ -49,7 +49,8 @@
HELPER_RECENT_KEYS Kind = "HelperRecentKeys"
// Android Compile
- COMPILE_TASK Kind = "CompileTask"
+ COMPILE_TASK Kind = "CompileTask"
+ ANDROID_COMPILE_INSTANCES Kind = "AndroidCompileInstances"
// Leasing
TASK Kind = "Task"
@@ -139,7 +140,7 @@
GOLD_LOTTIE_NS: goldKinds,
GOLD_PDFIUM_NS: goldKinds,
GOLD_SKIA_PROD_NS: goldKinds,
- ANDROID_COMPILE_NS: {COMPILE_TASK},
+ ANDROID_COMPILE_NS: {COMPILE_TASK, ANDROID_COMPILE_INSTANCES},
LEASING_SERVER_NS: {TASK},
CT_NS: {CAPTURE_SKPS_TASKS, CHROMIUM_ANALYSIS_TASKS, CHROMIUM_BUILD_TASKS, CHROMIUM_PERF_TASKS, LUA_SCRIPT_TASKS, METRICS_ANALYSIS_TASKS, PIXEL_DIFF_TASKS, RECREATE_PAGESETS_TASKS, RECREATE_WEBPAGE_ARCHIVES_TASKS, CLUSTER_TELEMETRY_IDS},
ALERT_MANAGER_NS: {INCIDENT_AM, INCIDENT_ACTIVE_PARENT_AM, SILENCE_AM, SILENCE_ACTIVE_PARENT_AM},