[status] Fixes for status in k8s

Bug: skia:
Change-Id: I15a36efde7f5abe932adbf3dc80f8b896363ef03
Reviewed-on: https://skia-review.googlesource.com/c/buildbot/+/199681
Reviewed-by: Ben Wagner <benjaminwagner@google.com>
Commit-Queue: Eric Boren <borenet@google.com>
diff --git a/status/build_docker_release b/status/build_docker_release
index bea978b..46f6d29 100755
--- a/status/build_docker_release
+++ b/status/build_docker_release
@@ -17,7 +17,6 @@
 ${INSTALL} --mode=644 -T ./templates/header.html      ${ROOT}/usr/local/share/status/templates/header.html
 ${INSTALL} --mode=644 -T ./templates/commits.html     ${ROOT}/usr/local/share/status/templates/commits.html
 ${INSTALL} --mode=644 -T ./templates/capacity.html    ${ROOT}/usr/local/share/status/templates/capacity.html
-${INSTALL} --mode=644 -T ../infra/config/recipes.cfg  ${ROOT}/usr/local/share/status/recipes.cfg
 }
 
 source ../bash/docker_build.sh
diff --git a/status/go/status/main.go b/status/go/status/main.go
index d72871a..d3dff37 100644
--- a/status/go/status/main.go
+++ b/status/go/status/main.go
@@ -100,7 +100,6 @@
 	swarmingUrl                 = flag.String("swarming_url", "https://chromium-swarm.appspot.com", "URL of the Swarming server.")
 	taskSchedulerUrl            = flag.String("task_scheduler_url", "https://task-scheduler.skia.org", "URL of the Task Scheduler server.")
 	testing                     = flag.Bool("testing", false, "Set to true for locally testing rules. No email will be sent.")
-	useMetadata                 = flag.Bool("use_metadata", true, "Load sensitive values from metadata not from flags.")
 	workdir                     = flag.String("workdir", ".", "Directory to use for scratch work.")
 	pubsubTopicSet              = flag.String("pubsub_topic_set", "", fmt.Sprintf("Pubsub topic set; one of: %v", pubsub.VALID_TOPIC_SETS))
 
@@ -705,9 +704,6 @@
 	skiaversion.MustLogVersion()
 
 	Init()
-	if *testing {
-		*useMetadata = false
-	}
 	serverURL := "https://" + *host
 	if *testing {
 		serverURL = "http://" + *host + *port
@@ -732,7 +728,7 @@
 	label := *host
 	mod, err := pubsub.NewModifiedData(*pubsubTopicSet, label, ts)
 	if err != nil {
-		sklog.Fatal(err)
+		sklog.Fatalf("Failed to initialize pubsub: %s", err)
 	}
 	taskDb, err = firestore.NewDB(ctx, firestore.FIRESTORE_PROJECT, *firestoreInstance, ts, mod)
 	if err != nil {
@@ -759,7 +755,7 @@
 	sklog.Info("Checkout complete")
 
 	// Cache for buildProgressHandler.
-	tasksPerCommit, err = newTasksPerCommitCache(ctx, *workdir, *repoUrls, 14*24*time.Hour, *btProject, *btInstance, ts)
+	tasksPerCommit, err = newTasksPerCommitCache(ctx, repos, 14*24*time.Hour, *btProject, *btInstance, ts)
 	if err != nil {
 		sklog.Fatalf("Failed to create tasksPerCommitCache: %s", err)
 	}
diff --git a/status/go/status/tasks_per_commit.go b/status/go/status/tasks_per_commit.go
index 909a359..8566973 100644
--- a/status/go/status/tasks_per_commit.go
+++ b/status/go/status/tasks_per_commit.go
@@ -3,8 +3,6 @@
 import (
 	"context"
 	"fmt"
-	"os"
-	"path"
 	"sync"
 	"time"
 
@@ -27,21 +25,7 @@
 }
 
 // newTasksPerCommitCache returns a tasksPerCommitCache instance.
-func newTasksPerCommitCache(ctx context.Context, workdir string, repoUrls []string, period time.Duration, btProject, btInstance string, ts oauth2.TokenSource) (*tasksPerCommitCache, error) {
-	wd := path.Join(workdir, "tasksPerCommitCache")
-	if _, err := os.Stat(wd); err != nil {
-		if os.IsNotExist(err) {
-			if err := os.Mkdir(wd, os.ModePerm); err != nil {
-				return nil, err
-			}
-		} else {
-			return nil, fmt.Errorf("There is a problem with the workdir: %s", err)
-		}
-	}
-	repos, err := repograph.NewMap(ctx, repoUrls, wd)
-	if err != nil {
-		return nil, err
-	}
+func newTasksPerCommitCache(ctx context.Context, repos repograph.Map, period time.Duration, btProject, btInstance string, ts oauth2.TokenSource) (*tasksPerCommitCache, error) {
 	tcc, err := task_cfg_cache.NewTaskCfgCache(ctx, repos, btProject, btInstance, ts)
 	if err != nil {
 		return nil, err
@@ -101,9 +85,6 @@
 
 // update pulls down new commits and evicts old entries from the cache.
 func (c *tasksPerCommitCache) update(ctx context.Context) error {
-	if err := c.repos.Update(ctx); err != nil {
-		return err
-	}
 	c.mtx.Lock()
 	defer c.mtx.Unlock()
 	start := time.Now().Add(-c.period)
diff --git a/status/go/statusk/main.go b/status/go/statusk/main.go
index ef551d7..457f91c 100644
--- a/status/go/statusk/main.go
+++ b/status/go/statusk/main.go
@@ -22,6 +22,7 @@
 	"unicode"
 
 	"cloud.google.com/go/bigtable"
+	"cloud.google.com/go/datastore"
 	"github.com/gorilla/mux"
 	"go.skia.org/infra/go/allowed"
 	"go.skia.org/infra/go/auth"
@@ -101,14 +102,12 @@
 	host                        = flag.String("host", "localhost", "HTTP service host")
 	port                        = flag.String("port", ":8002", "HTTP service port (e.g., ':8002')")
 	promPort                    = flag.String("prom_port", ":20000", "Metrics service address (e.g., ':10110')")
-	recipesCfgFile              = flag.String("recipes_cfg", "", "Path to the recipes.cfg file.")
 	repoUrls                    = common.NewMultiStringFlag("repo", nil, "Repositories to query for status.")
 	resourcesDir                = flag.String("resources_dir", "", "The directory to find templates, JS, and CSS files. If blank the current directory will be used.")
 	chromeInfraAuthJWT          = flag.String("service_account_jwt", "/var/secrets/skia-public-auth/key.json", "The JWT key for the service account that has access to chrome infra auth.")
 	swarmingUrl                 = flag.String("swarming_url", "https://chromium-swarm.appspot.com", "URL of the Swarming server.")
 	taskSchedulerUrl            = flag.String("task_scheduler_url", "https://task-scheduler.skia.org", "URL of the Task Scheduler server.")
 	testing                     = flag.Bool("testing", false, "Set to true for locally testing rules. No email will be sent.")
-	useMetadata                 = flag.Bool("use_metadata", true, "Load sensitive values from metadata not from flags.")
 	workdir                     = flag.String("workdir", ".", "Directory to use for scratch work.")
 	pubsubTopicSet              = flag.String("pubsub_topic_set", "", fmt.Sprintf("Pubsub topic set; one of: %v", pubsub.VALID_TOPIC_SETS))
 
@@ -692,16 +691,13 @@
 	skiaversion.MustLogVersion()
 
 	Init()
-	if *testing {
-		*useMetadata = false
-	}
 	serverURL := "https://" + *host
 	if *testing {
 		serverURL = "http://" + *host + *port
 	}
 	ctx := context.Background()
 
-	ts, err := auth.NewDefaultTokenSource(false, auth.SCOPE_USERINFO_EMAIL, auth.SCOPE_GERRIT, bigtable.Scope, pubsub.AUTH_SCOPE)
+	ts, err := auth.NewDefaultTokenSource(*testing, auth.SCOPE_USERINFO_EMAIL, auth.SCOPE_GERRIT, bigtable.Scope, pubsub.AUTH_SCOPE, datastore.ScopeDatastore)
 	if err != nil {
 		sklog.Fatal(err)
 	}
@@ -725,7 +721,7 @@
 	label := *host
 	mod, err := pubsub.NewModifiedData(*pubsubTopicSet, label, ts)
 	if err != nil {
-		sklog.Fatal(err)
+		sklog.Fatalf("Failed to initialize pubsub: %s", err)
 	}
 	taskDb, err = firestore.NewDB(ctx, firestore.FIRESTORE_PROJECT, *firestoreInstance, ts, mod)
 	if err != nil {
@@ -753,7 +749,7 @@
 		sklog.Fatalf("Failed to create repos dir: %s", err)
 	}
 	if *repoUrls == nil {
-		*repoUrls = common.PUBLIC_REPOS
+		sklog.Fatal("At least one --repo is required.")
 	}
 	repos, err = repograph.NewMap(ctx, *repoUrls, reposDir)
 	if err != nil {
@@ -765,7 +761,7 @@
 	sklog.Info("Checkout complete")
 
 	// Cache for buildProgressHandler.
-	tasksPerCommit, err = newTasksPerCommitCache(ctx, *workdir, *recipesCfgFile, repos, 14*24*time.Hour, *btProject, *btInstance, ts)
+	tasksPerCommit, err = newTasksPerCommitCache(ctx, repos, 14*24*time.Hour, *btProject, *btInstance, ts)
 	if err != nil {
 		sklog.Fatalf("Failed to create tasksPerCommitCache: %s", err)
 	}
@@ -839,11 +835,12 @@
 	})
 
 	// Create the TaskDriver DB.
-	taskDriverDb, err = bigtable_db.NewBigTableDB(ctx, *btProject, *btInstance, ts)
+	taskDriverBtInstance := "staging" // Task Drivers aren't in prod yet.
+	taskDriverDb, err = bigtable_db.NewBigTableDB(ctx, *btProject, taskDriverBtInstance, ts)
 	if err != nil {
 		sklog.Fatal(err)
 	}
-	taskDriverLogs, err = logs.NewLogsManager(ctx, *btProject, *btInstance, ts)
+	taskDriverLogs, err = logs.NewLogsManager(ctx, *btProject, taskDriverBtInstance, ts)
 	if err != nil {
 		sklog.Fatal(err)
 	}
diff --git a/status/go/statusk/tasks_per_commit.go b/status/go/statusk/tasks_per_commit.go
index a6d3bce..8566973 100644
--- a/status/go/statusk/tasks_per_commit.go
+++ b/status/go/statusk/tasks_per_commit.go
@@ -3,8 +3,6 @@
 import (
 	"context"
 	"fmt"
-	"os"
-	"path"
 	"sync"
 	"time"
 
@@ -27,17 +25,7 @@
 }
 
 // newTasksPerCommitCache returns a tasksPerCommitCache instance.
-func newTasksPerCommitCache(ctx context.Context, workdir, recipesCfgFile string, repos repograph.Map, period time.Duration, btProject, btInstance string, ts oauth2.TokenSource) (*tasksPerCommitCache, error) {
-	wd := path.Join(workdir, "tasksPerCommitCache")
-	if _, err := os.Stat(wd); err != nil {
-		if os.IsNotExist(err) {
-			if err := os.Mkdir(wd, os.ModePerm); err != nil {
-				return nil, err
-			}
-		} else {
-			return nil, fmt.Errorf("There is a problem with the workdir: %s", err)
-		}
-	}
+func newTasksPerCommitCache(ctx context.Context, repos repograph.Map, period time.Duration, btProject, btInstance string, ts oauth2.TokenSource) (*tasksPerCommitCache, error) {
 	tcc, err := task_cfg_cache.NewTaskCfgCache(ctx, repos, btProject, btInstance, ts)
 	if err != nil {
 		return nil, err
diff --git a/status/sys/status-internal.service b/status/sys/status-internal.service
index 78e4b7d..0e201df 100644
--- a/status/sys/status-internal.service
+++ b/status/sys/status-internal.service
@@ -10,7 +10,6 @@
     --bigtable_instance=internal \
     --logtostderr \
     --workdir=/mnt/pd0/status_workdir \
-    --use_metadata=true \
     --host=status-internal.skia.org \
     --resources_dir=/usr/local/share/status \
     --capacity_recalculate_interval=30m \
diff --git a/status/sys/status-staging.service b/status/sys/status-staging.service
index 94cf735..5c5e902 100644
--- a/status/sys/status-staging.service
+++ b/status/sys/status-staging.service
@@ -10,7 +10,6 @@
     --bigtable_instance=staging \
     --logtostderr \
     --workdir=/mnt/pd0/status_workdir \
-    --use_metadata=true \
     --host=status-staging.skia.org \
     --resources_dir=/usr/local/share/status \
     --capacity_recalculate_interval=30m \
diff --git a/status/sys/statusd.service b/status/sys/statusd.service
index 92d2e87..1cd2baf 100644
--- a/status/sys/statusd.service
+++ b/status/sys/statusd.service
@@ -10,7 +10,6 @@
     --bigtable_instance=production \
     --logtostderr \
     --workdir=/mnt/pd0/status_workdir \
-    --use_metadata=true \
     --host=status.skia.org \
     --resources_dir=/usr/local/share/status \
     --capacity_recalculate_interval=30m \
diff --git a/task_scheduler/go/db/pubsub/pubsub.go b/task_scheduler/go/db/pubsub/pubsub.go
index a6dceda..d475e10 100644
--- a/task_scheduler/go/db/pubsub/pubsub.go
+++ b/task_scheduler/go/db/pubsub/pubsub.go
@@ -4,6 +4,7 @@
 	"bytes"
 	"context"
 	"encoding/gob"
+	"fmt"
 	"sync"
 	"time"
 
@@ -109,11 +110,11 @@
 	t := c.Topic(topic)
 	exists, err := t.Exists(context.Background())
 	if err != nil {
-		return nil, err
+		return nil, fmt.Errorf("Failed to check for topic %q existence: %s", topic, err)
 	}
 	if !exists {
 		if _, err := c.CreateTopic(context.Background(), topic); err != nil {
-			return nil, err
+			return nil, fmt.Errorf("Failed to create topic: %s", err)
 		}
 	}
 	p := &publisher{
@@ -356,7 +357,7 @@
 	}()
 	err := <-errCh
 	if err != nil {
-		return nil, err
+		return nil, fmt.Errorf("Failed to create subscription: %s", err)
 	}
 	return func() {
 		cancelFn()