[task scheduler] Make TaskCfgCache use BigTable instead of local gob

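TaskCfgCache now stores gob-encoded TaskSpecs and JobSpecs (plus cached merge
errors) in a BigTable table instead of a local taskCfgCache.gob file.
NewTaskCfgCache and NewTaskScheduler take a BigTable project, instance, and
oauth2.TokenSource; the task scheduler, status, and datahopper binaries gain
--tasks_cfg_project and --tasks_cfg_instance flags, and their token sources and
VM configs add the BigTable scope.

A minimal sketch of a call site under the new API (illustrative only; ctx,
repos, depotToolsDir, workdir, and the --local flag are assumed to be set up as
before):

    ts, err := auth.NewDefaultTokenSource(*local, bigtable.Scope)
    if err != nil {
        sklog.Fatal(err)
    }
    tcc, err := specs.NewTaskCfgCache(ctx, repos, depotToolsDir, workdir,
        specs.DEFAULT_NUM_WORKERS, "skia-public", specs.BT_INSTANCE_PROD, ts)
    if err != nil {
        sklog.Fatal(err)
    }
    defer util.Close(tcc)
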
Bug: skia:8636
Change-Id: I76ac5a00db0f146ff7d3702664c2f901a4747bc9
Reviewed-on: https://skia-review.googlesource.com/c/178840
Commit-Queue: Eric Boren <borenet@google.com>
Reviewed-by: Ben Wagner <benjaminwagner@google.com>
diff --git a/datahopper/go/bot_metrics/bot_metrics.go b/datahopper/go/bot_metrics/bot_metrics.go
index db3e6eb..7775837 100644
--- a/datahopper/go/bot_metrics/bot_metrics.go
+++ b/datahopper/go/bot_metrics/bot_metrics.go
@@ -29,6 +29,7 @@
 	"go.skia.org/infra/task_scheduler/go/db"
 	"go.skia.org/infra/task_scheduler/go/specs"
 	"go.skia.org/infra/task_scheduler/go/types"
+	"golang.org/x/oauth2"
 )
 
 const (
@@ -399,7 +400,7 @@
 }
 
 // Start initiates "average time to X% bot coverage" metrics data generation.
-func Start(ctx context.Context, taskDb db.TaskReader, workdir, recipesCfgFile string) error {
+func Start(ctx context.Context, taskDb db.TaskReader, workdir, recipesCfgFile, tasksCfgProject, tasksCfgInstance string, ts oauth2.TokenSource) error {
 	// Setup.
 	if err := os.MkdirAll(workdir, os.ModePerm); err != nil {
 		return err
@@ -417,7 +418,7 @@
 		return fmt.Errorf("Failed to sync depot_tools: %s", err)
 	}
 
-	tcc, err := specs.NewTaskCfgCache(ctx, repos, depotTools, path.Join(workdir, "taskCfgCache"), 1)
+	tcc, err := specs.NewTaskCfgCache(ctx, repos, depotTools, path.Join(workdir, "taskCfgCache"), 1, tasksCfgProject, tasksCfgInstance, ts)
 	if err != nil {
 		return fmt.Errorf("Failed to create TaskCfgCache: %s", err)
 	}
diff --git a/datahopper/go/datahopper/main.go b/datahopper/go/datahopper/main.go
index b13cf27..15484e8 100644
--- a/datahopper/go/datahopper/main.go
+++ b/datahopper/go/datahopper/main.go
@@ -14,6 +14,7 @@
 	"regexp"
 	"time"
 
+	"cloud.google.com/go/bigtable"
 	"cloud.google.com/go/storage"
 	"go.skia.org/infra/datahopper/go/bot_metrics"
 	"go.skia.org/infra/datahopper/go/swarming_metrics"
@@ -43,9 +44,11 @@
 	taskSchedulerDbUrl = flag.String("task_db_url", "http://skia-task-scheduler:8008/db/", "Where the Skia task scheduler database is hosted.")
 	workdir            = flag.String("workdir", ".", "Working directory used by data processors.")
 
-	perfBucket     = flag.String("perf_bucket", "skia-perf", "The GCS bucket that should be used for writing into perf")
-	perfPrefix     = flag.String("perf_duration_prefix", "task-duration", "The folder name in the bucket that task duration metric shoudl be written.")
-	pubsubTopicSet = flag.String("pubsub_topic_set", "", fmt.Sprintf("Pubsub topic set; one of: %v", pubsub.VALID_TOPIC_SETS))
+	perfBucket       = flag.String("perf_bucket", "skia-perf", "The GCS bucket that should be used for writing into perf")
+	perfPrefix       = flag.String("perf_duration_prefix", "task-duration", "The folder name in the bucket to which the task duration metric should be written.")
+	pubsubTopicSet   = flag.String("pubsub_topic_set", "", fmt.Sprintf("Pubsub topic set; one of: %v", pubsub.VALID_TOPIC_SETS))
+	tasksCfgProject  = flag.String("tasks_cfg_project", "", "GCP project to use for tasks cfg cache.")
+	tasksCfgInstance = flag.String("tasks_cfg_instance", "", "BigTable instance to use for tasks cfg cache.")
 )
 
 var (
@@ -159,7 +162,7 @@
 	}()
 
 	// Tasks metrics.
-	newTs, err := auth.NewDefaultTokenSource(*local, pubsub.AUTH_SCOPE)
+	newTs, err := auth.NewDefaultTokenSource(*local, pubsub.AUTH_SCOPE, bigtable.Scope)
 	if err != nil {
 		sklog.Fatal(err)
 	}
@@ -193,7 +196,7 @@
 	if *recipesCfgFile == "" {
 		*recipesCfgFile = path.Join(*workdir, "recipes.cfg")
 	}
-	if err := bot_metrics.Start(ctx, d, *workdir, *recipesCfgFile); err != nil {
+	if err := bot_metrics.Start(ctx, d, *workdir, *recipesCfgFile, *tasksCfgProject, *tasksCfgInstance, newTs); err != nil {
 		sklog.Fatal(err)
 	}
 
diff --git a/datahopper/sys/datahopperd.service b/datahopper/sys/datahopperd.service
index 1488e94..81a0cfe 100644
--- a/datahopper/sys/datahopperd.service
+++ b/datahopper/sys/datahopperd.service
@@ -6,6 +6,8 @@
 [Service]
 ExecStart=/usr/local/bin/datahopper \
     --logtostderr \
+    --tasks_cfg_project=skia-public \
+    --tasks_cfg_instance=tasks-cfg-prod \
     --workdir=/mnt/pd0/datahopper_workdir \
 Restart=always
 User=default
diff --git a/datahopper/vm.go b/datahopper/vm.go
index 715a87e..ac9cd74 100644
--- a/datahopper/vm.go
+++ b/datahopper/vm.go
@@ -1,6 +1,7 @@
 package main
 
 import (
+	"cloud.google.com/go/bigtable"
 	"go.skia.org/infra/go/gce"
 	"go.skia.org/infra/go/gce/server"
 )
@@ -11,6 +12,7 @@
 	vm.DataDisks[0].Type = gce.DISK_TYPE_PERSISTENT_SSD
 	vm.Metadata["owner_primary"] = "borenet"
 	vm.Metadata["owner_secondary"] = "jcgregorio"
+	vm.Scopes = append(vm.Scopes, bigtable.Scope)
 	return vm
 }
 
diff --git a/status/go/status/main.go b/status/go/status/main.go
index 03f0100..3e97390 100644
--- a/status/go/status/main.go
+++ b/status/go/status/main.go
@@ -96,6 +96,8 @@
 	repoUrls                    = common.NewMultiStringFlag("repo", nil, "Repositories to query for status.")
 	resourcesDir                = flag.String("resources_dir", "", "The directory to find templates, JS, and CSS files. If blank the current directory will be used.")
 	swarmingUrl                 = flag.String("swarming_url", "https://chromium-swarm.appspot.com", "URL of the Swarming server.")
+	tasksCfgProject             = flag.String("tasks_cfg_project", "", "GCP project to use for tasks cfg cache.")
+	tasksCfgInstance            = flag.String("tasks_cfg_instance", "", "BigTable instance to use for tasks cfg cache.")
 	taskSchedulerDbUrl          = flag.String("task_db_url", "http://skia-task-scheduler:8008/db/", "Where the Skia task scheduler database is hosted.")
 	taskSchedulerUrl            = flag.String("task_scheduler_url", "https://task-scheduler.skia.org", "URL of the Task Scheduler server.")
 	testing                     = flag.Bool("testing", false, "Set to true for locally testing rules. No email will be sent.")
@@ -713,7 +715,7 @@
 	}
 	ctx := context.Background()
 
-	ts, err := auth.NewDefaultTokenSource(*testing, auth.SCOPE_USERINFO_EMAIL, auth.SCOPE_GERRIT, bigtable.ReadonlyScope, pubsub.AUTH_SCOPE, datastore.ScopeDatastore)
+	ts, err := auth.NewDefaultTokenSource(*testing, auth.SCOPE_USERINFO_EMAIL, auth.SCOPE_GERRIT, bigtable.Scope, pubsub.AUTH_SCOPE, datastore.ScopeDatastore)
 	if err != nil {
 		sklog.Fatal(err)
 	}
@@ -773,7 +775,7 @@
 	sklog.Info("Checkout complete")
 
 	// Cache for buildProgressHandler.
-	tasksPerCommit, err = newTasksPerCommitCache(ctx, *workdir, *repoUrls, 14*24*time.Hour)
+	tasksPerCommit, err = newTasksPerCommitCache(ctx, *workdir, *repoUrls, 14*24*time.Hour, *tasksCfgProject, *tasksCfgInstance, ts)
 	if err != nil {
 		sklog.Fatalf("Failed to create tasksPerCommitCache: %s", err)
 	}
diff --git a/status/go/status/tasks_per_commit.go b/status/go/status/tasks_per_commit.go
index 918d08e..4a648ab 100644
--- a/status/go/status/tasks_per_commit.go
+++ b/status/go/status/tasks_per_commit.go
@@ -15,6 +15,7 @@
 	"go.skia.org/infra/go/util"
 	"go.skia.org/infra/task_scheduler/go/specs"
 	"go.skia.org/infra/task_scheduler/go/types"
+	"golang.org/x/oauth2"
 )
 
 // tasksPerCommitCache is a struct used for caching the number of task specs
@@ -28,7 +29,7 @@
 }
 
 // newTasksPerCommitCache returns a tasksPerCommitCache instance.
-func newTasksPerCommitCache(ctx context.Context, workdir string, repoUrls []string, period time.Duration) (*tasksPerCommitCache, error) {
+func newTasksPerCommitCache(ctx context.Context, workdir string, repoUrls []string, period time.Duration, tasksCfgProject, tasksCfgInstance string, ts oauth2.TokenSource) (*tasksPerCommitCache, error) {
 	wd := path.Join(workdir, "tasksPerCommitCache")
 	if _, err := os.Stat(wd); err != nil {
 		if os.IsNotExist(err) {
@@ -51,7 +52,7 @@
 		return nil, err
 	}
 	gitCache := path.Join(wd, "cache")
-	tcc, err := specs.NewTaskCfgCache(ctx, repos, depotTools.Dir(), gitCache, 3)
+	tcc, err := specs.NewTaskCfgCache(ctx, repos, depotTools.Dir(), gitCache, 3, tasksCfgProject, tasksCfgInstance, ts)
 	if err != nil {
 		return nil, err
 	}
diff --git a/status/go/statusk/main.go b/status/go/statusk/main.go
index 9334ff1..0a48154 100644
--- a/status/go/statusk/main.go
+++ b/status/go/statusk/main.go
@@ -104,6 +104,8 @@
 	resourcesDir                = flag.String("resources_dir", "", "The directory to find templates, JS, and CSS files. If blank the current directory will be used.")
 	chromeInfraAuthJWT          = flag.String("service_account_jwt", "/var/secrets/skia-public-auth/key.json", "The JWT key for the service account that has access to chrome infra auth.")
 	swarmingUrl                 = flag.String("swarming_url", "https://chromium-swarm.appspot.com", "URL of the Swarming server.")
+	tasksCfgProject             = flag.String("tasks_cfg_project", "", "GCP project to use for tasks cfg cache.")
+	tasksCfgInstance            = flag.String("tasks_cfg_instance", "", "BigTable instance to use for tasks cfg cache.")
 	taskSchedulerDbUrl          = flag.String("task_db_url", "https://task-scheduler.skia.org/db/", "Where the Skia task scheduler database is hosted.")
 	taskSchedulerUrl            = flag.String("task_scheduler_url", "https://task-scheduler.skia.org", "URL of the Task Scheduler server.")
 	testing                     = flag.Bool("testing", false, "Set to true for locally testing rules. No email will be sent.")
@@ -700,7 +702,7 @@
 	}
 	ctx := context.Background()
 
-	ts, err := auth.NewDefaultTokenSource(false, auth.SCOPE_USERINFO_EMAIL, auth.SCOPE_GERRIT, bigtable.ReadonlyScope, pubsub.AUTH_SCOPE)
+	ts, err := auth.NewDefaultTokenSource(false, auth.SCOPE_USERINFO_EMAIL, auth.SCOPE_GERRIT, bigtable.Scope, pubsub.AUTH_SCOPE)
 	if err != nil {
 		sklog.Fatal(err)
 	}
@@ -778,7 +780,7 @@
 	sklog.Info("Checkout complete")
 
 	// Cache for buildProgressHandler.
-	tasksPerCommit, err = newTasksPerCommitCache(ctx, *workdir, *recipesCfgFile, repos, 14*24*time.Hour)
+	tasksPerCommit, err = newTasksPerCommitCache(ctx, *workdir, *recipesCfgFile, repos, 14*24*time.Hour, *tasksCfgProject, *tasksCfgInstance, ts)
 	if err != nil {
 		sklog.Fatalf("Failed to create tasksPerCommitCache: %s", err)
 	}
diff --git a/status/go/statusk/tasks_per_commit.go b/status/go/statusk/tasks_per_commit.go
index 56b9534..3c28ddc 100644
--- a/status/go/statusk/tasks_per_commit.go
+++ b/status/go/statusk/tasks_per_commit.go
@@ -14,6 +14,7 @@
 	"go.skia.org/infra/go/util"
 	"go.skia.org/infra/task_scheduler/go/specs"
 	"go.skia.org/infra/task_scheduler/go/types"
+	"golang.org/x/oauth2"
 )
 
 // tasksPerCommitCache is a struct used for caching the number of task specs
@@ -27,7 +28,7 @@
 }
 
 // newTasksPerCommitCache returns a tasksPerCommitCache instance.
-func newTasksPerCommitCache(ctx context.Context, workdir, recipesCfgFile string, repos repograph.Map, period time.Duration) (*tasksPerCommitCache, error) {
+func newTasksPerCommitCache(ctx context.Context, workdir, recipesCfgFile string, repos repograph.Map, period time.Duration, tasksCfgProject, tasksCfgInstance string, ts oauth2.TokenSource) (*tasksPerCommitCache, error) {
 	wd := path.Join(workdir, "tasksPerCommitCache")
 	if _, err := os.Stat(wd); err != nil {
 		if os.IsNotExist(err) {
@@ -43,7 +44,7 @@
 		return nil, err
 	}
 	gitCache := path.Join(wd, "cache")
-	tcc, err := specs.NewTaskCfgCache(ctx, repos, depotTools, gitCache, 3)
+	tcc, err := specs.NewTaskCfgCache(ctx, repos, depotTools, gitCache, 3, tasksCfgProject, tasksCfgInstance, ts)
 	if err != nil {
 		return nil, err
 	}
diff --git a/status/sys/status-internal.service b/status/sys/status-internal.service
index 571d63a..b265e78 100644
--- a/status/sys/status-internal.service
+++ b/status/sys/status-internal.service
@@ -17,7 +17,9 @@
     --repo=https://skia.googlesource.com/internal_test.git \
     --swarming_url=https://chrome-swarming.appspot.com \
     --task_scheduler_url=https://skia-task-scheduler-internal-8000-proxy.skia.org \
-    --task_db_url=http://skia-task-scheduler-internal:8008/db/
+    --task_db_url=http://skia-task-scheduler-internal:8008/db/ \
+    --tasks_cfg_project=skia-corp \
+    --tasks_cfg_instance=tasks-cfg-internal
 Restart=always
 User=default
 Group=default
diff --git a/status/sys/status-staging.service b/status/sys/status-staging.service
index 708ac21..d2ac273 100644
--- a/status/sys/status-staging.service
+++ b/status/sys/status-staging.service
@@ -17,7 +17,9 @@
     --repo=https://skia.googlesource.com/skiabot-test.git \
     --swarming_url=https://chromium-swarm-dev.appspot.com \
     --task_scheduler_url=https://task-scheduler-staging.skia.org \
-    --task_db_url=http://skia-task-scheduler-staging:8008/db/
+    --task_db_url=http://skia-task-scheduler-staging:8008/db/ \
+    --tasks_cfg_project=skia-public \
+    --tasks_cfg_instance=tasks-cfg-staging
 Restart=always
 User=default
 Group=default
diff --git a/status/sys/statusd.service b/status/sys/statusd.service
index 8488151..0afd6b5 100644
--- a/status/sys/statusd.service
+++ b/status/sys/statusd.service
@@ -13,7 +13,9 @@
     --resources_dir=/usr/local/share/status \
     --capacity_recalculate_interval=30m \
     --pubsub_topic_set=prod \
-    --task_db_url=http://skia-task-scheduler:8008/db/
+    --task_db_url=http://skia-task-scheduler:8008/db/ \
+    --tasks_cfg_project=skia-public \
+    --tasks_cfg_instance=tasks-cfg-prod
 Restart=always
 User=default
 Group=default
diff --git a/status/vm.go b/status/vm.go
index 8938af6..e8679b0 100644
--- a/status/vm.go
+++ b/status/vm.go
@@ -17,7 +17,7 @@
 	vm.Metadata["owner_primary"] = "borenet"
 	vm.Metadata["owner_secondary"] = "kjlubick"
 	vm.Scopes = append(vm.Scopes,
-		bigtable.ReadonlyScope,
+		bigtable.Scope,
 		datastore.ScopeDatastore,
 	)
 
diff --git a/task_scheduler/go/scheduling/perftest/perftest.go b/task_scheduler/go/scheduling/perftest/perftest.go
index a2b9c85..6c796c0 100644
--- a/task_scheduler/go/scheduling/perftest/perftest.go
+++ b/task_scheduler/go/scheduling/perftest/perftest.go
@@ -294,7 +294,7 @@
 	assertNoError(ioutil.WriteFile(gitcookies, []byte(".googlesource.com\tTRUE\t/\tTRUE\t123\to\tgit-user.google.com=abc123"), os.ModePerm))
 	g, err := gerrit.NewGerrit("https://fake-skia-review.googlesource.com", gitcookies, urlMock.Client())
 	assertNoError(err)
-	s, err := scheduling.NewTaskScheduler(ctx, d, nil, time.Duration(math.MaxInt64), 0, workdir, "fake.server", repograph.Map{repoName: repo}, isolateClient, swarmingClient, http.DefaultClient, 0.9, tryjobs.API_URL_TESTING, tryjobs.BUCKET_TESTING, map[string]string{"skia": repoName}, swarming.POOLS_PUBLIC, "", depotTools, g)
+	s, err := scheduling.NewTaskScheduler(ctx, d, nil, time.Duration(math.MaxInt64), 0, workdir, "fake.server", repograph.Map{repoName: repo}, isolateClient, swarmingClient, http.DefaultClient, 0.9, tryjobs.API_URL_TESTING, tryjobs.BUCKET_TESTING, map[string]string{"skia": repoName}, swarming.POOLS_PUBLIC, "", depotTools, g, "test-project", "test-instance", nil)
 	assertNoError(err)
 
 	runTasks := func(bots []*swarming_api.SwarmingRpcsBotInfo) {
diff --git a/task_scheduler/go/scheduling/task_scheduler.go b/task_scheduler/go/scheduling/task_scheduler.go
index 30001ca..c770390 100644
--- a/task_scheduler/go/scheduling/task_scheduler.go
+++ b/task_scheduler/go/scheduling/task_scheduler.go
@@ -33,6 +33,7 @@
 	"go.skia.org/infra/task_scheduler/go/tryjobs"
 	"go.skia.org/infra/task_scheduler/go/types"
 	"go.skia.org/infra/task_scheduler/go/window"
+	"golang.org/x/oauth2"
 )
 
 const (
@@ -125,7 +126,7 @@
 	workdir        string
 }
 
-func NewTaskScheduler(ctx context.Context, d db.DB, bl *blacklist.Blacklist, period time.Duration, numCommits int, workdir, host string, repos repograph.Map, isolateClient *isolate.Client, swarmingClient swarming.ApiClient, c *http.Client, timeDecayAmt24Hr float64, buildbucketApiUrl, trybotBucket string, projectRepoMapping map[string]string, pools []string, pubsubTopic, depotTools string, gerrit gerrit.GerritInterface) (*TaskScheduler, error) {
+func NewTaskScheduler(ctx context.Context, d db.DB, bl *blacklist.Blacklist, period time.Duration, numCommits int, workdir, host string, repos repograph.Map, isolateClient *isolate.Client, swarmingClient swarming.ApiClient, c *http.Client, timeDecayAmt24Hr float64, buildbucketApiUrl, trybotBucket string, projectRepoMapping map[string]string, pools []string, pubsubTopic, depotTools string, gerrit gerrit.GerritInterface, tasksCfgProject, tasksCfgInstance string, ts oauth2.TokenSource) (*TaskScheduler, error) {
 	// Repos must be updated before window is initialized; otherwise the repos may be uninitialized,
 	// resulting in the window being too short, causing the caches to be loaded with incomplete data.
 	for _, r := range repos {
@@ -149,7 +150,7 @@
 		return nil, fmt.Errorf("Failed to create JobCache: %s", err)
 	}
 
-	taskCfgCache, err := specs.NewTaskCfgCache(ctx, repos, depotTools, path.Join(workdir, "taskCfgCache"), specs.DEFAULT_NUM_WORKERS)
+	taskCfgCache, err := specs.NewTaskCfgCache(ctx, repos, depotTools, path.Join(workdir, "taskCfgCache"), specs.DEFAULT_NUM_WORKERS, tasksCfgProject, tasksCfgInstance, ts)
 	if err != nil {
 		return nil, fmt.Errorf("Failed to create TaskCfgCache: %s", err)
 	}
diff --git a/task_scheduler/go/scheduling/task_scheduler_test.go b/task_scheduler/go/scheduling/task_scheduler_test.go
index 9eb108c..9646dd3 100644
--- a/task_scheduler/go/scheduling/task_scheduler_test.go
+++ b/task_scheduler/go/scheduling/task_scheduler_test.go
@@ -190,6 +190,7 @@
 func setup(t *testing.T) (context.Context, *git_testutils.GitBuilder, db.DB, *swarming_testutils.TestClient, *TaskScheduler, *mockhttpclient.URLMock, func()) {
 	testutils.LargeTest(t)
 
 	ctx, gb, _, _ := specs_testutils.SetupTestRepo(t)
 
 	tmp, err := ioutil.TempDir("", "")
@@ -212,11 +213,13 @@
 	assert.NoError(t, ioutil.WriteFile(gitcookies, []byte(".googlesource.com\tTRUE\t/\tTRUE\t123\to\tgit-user.google.com=abc123"), os.ModePerm))
 	g, err := gerrit.NewGerrit(fakeGerritUrl, gitcookies, urlMock.Client())
 	assert.NoError(t, err)
-	s, err := NewTaskScheduler(ctx, d, nil, time.Duration(math.MaxInt64), 0, tmp, "fake.server", repos, isolateClient, swarmingClient, urlMock.Client(), 1.0, tryjobs.API_URL_TESTING, tryjobs.BUCKET_TESTING, projectRepoMapping, swarming.POOLS_PUBLIC, "", depotTools, g)
+	btProject, btInstance, btCleanup := specs_testutils.SetupBigTable(t)
+	s, err := NewTaskScheduler(ctx, d, nil, time.Duration(math.MaxInt64), 0, tmp, "fake.server", repos, isolateClient, swarmingClient, urlMock.Client(), 1.0, tryjobs.API_URL_TESTING, tryjobs.BUCKET_TESTING, projectRepoMapping, swarming.POOLS_PUBLIC, "", depotTools, g, btProject, btInstance, nil)
 	assert.NoError(t, err)
 	return ctx, gb, d, swarmingClient, s, urlMock, func() {
 		testutils.RemoveAll(t, tmp)
 		gb.Cleanup()
+		btCleanup()
 	}
 }
 
@@ -1021,6 +1024,7 @@
 	testutils.LargeTest(t)
 
 	// Setup.
 	ctx := context.Background()
 	gb := git_testutils.GitInit(t, ctx)
 	defer gb.Cleanup()
@@ -1078,7 +1082,9 @@
 	assert.NoError(t, repos.Update(ctx))
 	repo := repos[gb.RepoUrl()]
 	depotTools := depot_tools_testutils.GetDepotTools(t, ctx)
-	tcc, err := specs.NewTaskCfgCache(ctx, repos, depotTools, tmp, 1)
+	btProject, btInstance, btCleanup := specs_testutils.SetupBigTable(t)
+	defer btCleanup()
+	tcc, err := specs.NewTaskCfgCache(ctx, repos, depotTools, tmp, 1, btProject, btInstance, nil)
 	assert.NoError(t, err)
 
 	ids := []string{}
@@ -1966,6 +1972,7 @@
 func testMultipleCandidatesBackfillingEachOtherSetup(t *testing.T) (context.Context, *git_testutils.GitBuilder, db.DB, *TaskScheduler, *swarming_testutils.TestClient, []string, func(*types.Task), func()) {
 	testutils.LargeTest(t)
 
 	ctx := context.Background()
 	gb := git_testutils.GitInit(t, ctx)
 	workdir, err := ioutil.TempDir("", "")
@@ -2027,7 +2034,8 @@
 	g, err := gerrit.NewGerrit(fakeGerritUrl, gitcookies, urlMock.Client())
 	assert.NoError(t, err)
 
-	s, err := NewTaskScheduler(ctx, d, nil, time.Duration(math.MaxInt64), 0, workdir, "fake.server", repos, isolateClient, swarmingClient, mockhttpclient.NewURLMock().Client(), 1.0, tryjobs.API_URL_TESTING, tryjobs.BUCKET_TESTING, projectRepoMapping, swarming.POOLS_PUBLIC, "", depotTools, g)
+	btProject, btInstance, btCleanup := specs_testutils.SetupBigTable(t)
+	s, err := NewTaskScheduler(ctx, d, nil, time.Duration(math.MaxInt64), 0, workdir, "fake.server", repos, isolateClient, swarmingClient, mockhttpclient.NewURLMock().Client(), 1.0, tryjobs.API_URL_TESTING, tryjobs.BUCKET_TESTING, projectRepoMapping, swarming.POOLS_PUBLIC, "", depotTools, g, btProject, btInstance, nil)
 	assert.NoError(t, err)
 
 	mockTasks := []*swarming_api.SwarmingRpcsTaskRequestMetadata{}
@@ -2062,6 +2070,7 @@
 	return ctx, gb, d, s, swarmingClient, commits, mock, func() {
 		gb.Cleanup()
 		testutils.RemoveAll(t, workdir)
+		btCleanup()
 	}
 }
 
diff --git a/task_scheduler/go/specs/specs.go b/task_scheduler/go/specs/specs.go
index 0a6fdc3..e3dc18d 100644
--- a/task_scheduler/go/specs/specs.go
+++ b/task_scheduler/go/specs/specs.go
@@ -3,12 +3,10 @@
 import (
 	"bytes"
 	"context"
-	"crypto/sha1"
 	"encoding/gob"
 	"encoding/json"
 	"errors"
 	"fmt"
-	"io"
 	"io/ioutil"
 	"os"
 	"path"
@@ -16,6 +14,7 @@
 	"sync"
 	"time"
 
+	"cloud.google.com/go/bigtable"
 	"go.skia.org/infra/go/exec"
 	"go.skia.org/infra/go/git"
 	"go.skia.org/infra/go/git/repograph"
@@ -23,6 +22,8 @@
 	"go.skia.org/infra/go/sklog"
 	"go.skia.org/infra/go/util"
 	"go.skia.org/infra/task_scheduler/go/types"
+	"golang.org/x/oauth2"
+	"google.golang.org/api/option"
 )
 
 const (
@@ -64,9 +65,33 @@
 	VARIABLE_REVISION             = "REVISION"
 	VARIABLE_TASK_ID              = "TASK_ID"
 	VARIABLE_TASK_NAME            = "TASK_NAME"
+
+	// BigTable configuration.
+
+	// Names of the BigTable instances used for storing TaskCfgs.
+	BT_INSTANCE_PROD     = "tasks-cfg-prod"
+	BT_INSTANCE_INTERNAL = "tasks-cfg-internal"
+	BT_INSTANCE_STAGING  = "tasks-cfg-staging"
+
+	// We use a single BigTable table for storing gob-encoded TaskSpecs and
+	// JobSpecs.
+	BT_TABLE = "tasks-cfg"
+
+	// We use a single BigTable column family.
+	BT_COLUMN_FAMILY = "CFGS"
+
+	// We use a single BigTable column which stores gob-encoded TaskSpecs
+	// and JobSpecs.
+	BT_COLUMN = "CFG"
+
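+	// Timeouts used for BigTable writes and reads, respectively.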
+	INSERT_TIMEOUT = 30 * time.Second
+	QUERY_TIMEOUT  = 5 * time.Second
 )
 
 var (
+	// Fully-qualified BigTable column name.
+	BT_COLUMN_FULL = fmt.Sprintf("%s:%s", BT_COLUMN_FAMILY, BT_COLUMN)
+
 	PLACEHOLDER_BUILDBUCKET_BUILD_ID = fmt.Sprintf(VARIABLE_SYNTAX, VARIABLE_BUILDBUCKET_BUILD_ID)
 	PLACEHOLDER_CODEREVIEW_SERVER    = fmt.Sprintf(VARIABLE_SYNTAX, VARIABLE_CODEREVIEW_SERVER)
 	PLACEHOLDER_ISSUE                = fmt.Sprintf(VARIABLE_SYNTAX, VARIABLE_ISSUE)
@@ -393,15 +418,12 @@
 	return rv, nil
 }
 
-// TasksCfgFileHash represents the SHA-1 checksum of the contents of
-// TASKS_CFG_FILE.
-type TasksCfgFileHash [sha1.Size]byte
-
 // TaskCfgCache is a struct used for caching tasks cfg files. The user should
 // periodically call Cleanup() to remove old entries.
 type TaskCfgCache struct {
 	// protected by mtx
 	cache         map[types.RepoState]*cacheEntry
+	client        *bigtable.Client
 	depotToolsDir string
 	file          string
 	mtx           sync.RWMutex
@@ -415,77 +437,35 @@
 	recentMtx       sync.RWMutex
 	recentTaskSpecs map[string]time.Time
 	repos           repograph.Map
+	table           *bigtable.Table
 	queue           chan func(int)
 	workdir         string
 }
 
-// gobCacheValue contains fields that can be deduplicated across cacheEntry
-// instances.
-type gobCacheValue struct {
-	Cfg  *TasksCfg
-	Err  string
-	Hash TasksCfgFileHash
-}
-
-// gobTaskCfgCache is a struct used for (de)serializing TaskCfgCache instance.
-type gobTaskCfgCache struct {
-	AddedTasksCache map[types.RepoState]util.StringSet
-	Values          []gobCacheValue
-	RecentCommits   map[string]time.Time
-	RecentJobSpecs  map[string]time.Time
-	RecentTaskSpecs map[string]time.Time
-	// Map value is an index into Values. (We can't just use pointers pointing to
-	// the same object because GOB-encoding flattens/dereferences all pointers,
-	// resulting in multiple copies in the encoded file.)
-	RepoStates map[types.RepoState]int
-}
-
 // NewTaskCfgCache returns a TaskCfgCache instance.
-func NewTaskCfgCache(ctx context.Context, repos repograph.Map, depotToolsDir, workdir string, numWorkers int) (*TaskCfgCache, error) {
-	file := path.Join(workdir, "taskCfgCache.gob")
+func NewTaskCfgCache(ctx context.Context, repos repograph.Map, depotToolsDir, workdir string, numWorkers int, project, instance string, ts oauth2.TokenSource) (*TaskCfgCache, error) {
+	client, err := bigtable.NewClient(ctx, project, instance, option.WithTokenSource(ts))
+	if err != nil {
+		return nil, fmt.Errorf("Failed to create BigTable client: %s", err)
+	}
+	table := client.Open(BT_TABLE)
 	queue := make(chan func(int))
 	c := &TaskCfgCache{
+		client:        client,
 		depotToolsDir: depotToolsDir,
-		file:          file,
 		queue:         queue,
 		repos:         repos,
+		table:         table,
 		workdir:       workdir,
 	}
-	f, err := os.Open(file)
-	if err == nil {
-		var gobCache gobTaskCfgCache
-		if err := gob.NewDecoder(f).Decode(&gobCache); err != nil {
-			util.Close(f)
-			return nil, err
-		}
-		util.Close(f)
-		c.addedTasksCache = gobCache.AddedTasksCache
-		c.cache = make(map[types.RepoState]*cacheEntry, len(gobCache.RepoStates))
-		c.recentCommits = gobCache.RecentCommits
-		c.recentJobSpecs = gobCache.RecentJobSpecs
-		c.recentTaskSpecs = gobCache.RecentTaskSpecs
-		for rs, idx := range gobCache.RepoStates {
-			if idx < 0 || idx >= len(gobCache.Values) {
-				return nil, fmt.Errorf("Corrupt cache file %q: RepoStates index %d out of range.", file, idx)
-			}
-			gobCacheValue := gobCache.Values[idx]
-			c.cache[rs] = &cacheEntry{
-				c:    c,
-				cfg:  gobCacheValue.Cfg,
-				err:  gobCacheValue.Err,
-				hash: gobCacheValue.Hash,
-				rs:   rs,
-			}
-		}
-	} else if !os.IsNotExist(err) {
-		return nil, fmt.Errorf("Failed to read cache file: %s", err)
-	} else {
-		c.cache = map[types.RepoState]*cacheEntry{}
-		c.addedTasksCache = map[types.RepoState]util.StringSet{}
-		c.recentCommits = map[string]time.Time{}
-		c.recentJobSpecs = map[string]time.Time{}
-		c.recentTaskSpecs = map[string]time.Time{}
-	}
+	// TODO(borenet): Pre-fetch entries for commits in range. This would be
+	// simpler if we passed in a Window or a list of commits or RepoStates.
+	// Maybe the recent* caches belong in a separate cache entirely?
+	c.cache = map[types.RepoState]*cacheEntry{}
+	c.addedTasksCache = map[types.RepoState]util.StringSet{}
+	c.recentCommits = map[string]time.Time{}
+	c.recentJobSpecs = map[string]time.Time{}
+	c.recentTaskSpecs = map[string]time.Time{}
 	for i := 0; i < numWorkers; i++ {
 		go func(i int) {
 			for f := range queue {
@@ -496,44 +476,183 @@
 	return c, nil
 }
 
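+// storedError wraps an error string which was previously cached in BigTable
+// for a RepoState, eg. a persistent patch-merge failure.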
+type storedError struct {
+	err string
+}
+
+func (e *storedError) Error() string {
+	return e.err
+}
+
+func isStoredError(err error) bool {
+	_, ok := err.(*storedError)
+	return ok
+}
+
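+// GetTasksCfgFromBigTable reads all BigTable rows sharing the RepoState's row
+// key prefix and reassembles them into a TasksCfg. Rows are keyed as
+// "<RepoState.RowKey()>#<t|j|e>#<name>": "t" rows hold gob-encoded TaskSpecs,
+// "j" rows hold gob-encoded JobSpecs, and an "e" row holds a cached error
+// string. Returns (nil, nil) if no complete entry exists for the RepoState.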
+func GetTasksCfgFromBigTable(table *bigtable.Table, rs types.RepoState) (*TasksCfg, error) {
+	// Retrieve all rows for the TasksCfg from BigTable.
+	tasks := map[string]*TaskSpec{}
+	jobs := map[string]*JobSpec{}
+	var processErr error
+	var storedErr error
+	ctx, cancel := context.WithTimeout(context.Background(), QUERY_TIMEOUT)
+	defer cancel()
+	prefix := rs.RowKey()
+	if err := table.ReadRows(ctx, bigtable.PrefixRange(prefix), func(row bigtable.Row) bool {
+		for _, ri := range row[BT_COLUMN_FAMILY] {
+			if ri.Column == BT_COLUMN_FULL {
+				suffix := strings.Split(strings.TrimPrefix(row.Key(), prefix+"#"), "#")
+				if len(suffix) != 2 {
+					processErr = fmt.Errorf("Invalid row key; expected two parts after %q; but have: %v", prefix, suffix)
+					return false
+				}
+				typ := suffix[0]
+				name := suffix[1]
+				if typ == "t" {
+					var task TaskSpec
+					processErr = gob.NewDecoder(bytes.NewReader(ri.Value)).Decode(&task)
+					if processErr != nil {
+						return false
+					}
+					tasks[name] = &task
+				} else if typ == "j" {
+					var job JobSpec
+					processErr = gob.NewDecoder(bytes.NewReader(ri.Value)).Decode(&job)
+					if processErr != nil {
+						return false
+					}
+					jobs[name] = &job
+				} else if typ == "e" {
+					storedErr = &storedError{string(ri.Value)}
+					return false
+				} else {
+					processErr = fmt.Errorf("Invalid row key %q; unknown entry type %q", row.Key(), suffix[0])
+					return false
+				}
+				// We only store one message per row.
+				return true
+			}
+		}
+		return true
+	}, bigtable.RowFilter(bigtable.LatestNFilter(1))); err != nil {
+		return nil, fmt.Errorf("Failed to retrieve data from BigTable: %s", err)
+	}
+	if processErr != nil {
+		return nil, fmt.Errorf("Failed to process row: %s", processErr)
+	}
+	if storedErr != nil {
+		return nil, storedErr
+	}
+	if len(tasks) == 0 {
+		return nil, nil
+	}
+	if len(jobs) == 0 {
+		return nil, nil
+	}
+	return &TasksCfg{
+		Tasks: tasks,
+		Jobs:  jobs,
+	}, nil
+}
+
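+// WriteTasksCfgToBigTable writes the given TasksCfg, or the given error if it
+// is non-nil, to BigTable under the RepoState's row key prefix, using one row
+// per TaskSpec and JobSpec.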
+func WriteTasksCfgToBigTable(table *bigtable.Table, rs types.RepoState, cfg *TasksCfg, err error) error {
+	var rks []string
+	var mts []*bigtable.Mutation
+	prefix := rs.RowKey() + "#"
+	if err != nil {
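+		// Store the error string in a single "e" row for this RepoState.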
+		rks = append(rks, prefix+"e#")
+		mt := bigtable.NewMutation()
+		mt.Set(BT_COLUMN_FAMILY, BT_COLUMN, bigtable.ServerTime, []byte(err.Error()))
+		mts = append(mts, mt)
+	} else {
+		rks = make([]string, 0, len(cfg.Tasks)+len(cfg.Jobs))
+		mts = make([]*bigtable.Mutation, 0, len(cfg.Tasks)+len(cfg.Jobs))
+		for name, task := range cfg.Tasks {
+			rks = append(rks, prefix+"t#"+name)
+			buf := bytes.Buffer{}
+			if err := gob.NewEncoder(&buf).Encode(task); err != nil {
+				return err
+			}
+			mt := bigtable.NewMutation()
+			mt.Set(BT_COLUMN_FAMILY, BT_COLUMN, bigtable.ServerTime, buf.Bytes())
+			mts = append(mts, mt)
+		}
+		for name, job := range cfg.Jobs {
+			rks = append(rks, prefix+"j#"+name)
+			buf := bytes.Buffer{}
+			if err := gob.NewEncoder(&buf).Encode(job); err != nil {
+				return err
+			}
+			mt := bigtable.NewMutation()
+			mt.Set(BT_COLUMN_FAMILY, BT_COLUMN, bigtable.ServerTime, buf.Bytes())
+			mts = append(mts, mt)
+		}
+	}
+	ctx, cancel := context.WithTimeout(context.Background(), INSERT_TIMEOUT)
+	defer cancel()
+	errs, err := table.ApplyBulk(ctx, rks, mts)
+	if err != nil {
+		return err
+	}
+	for _, err := range errs {
+		if err != nil {
+			// TODO(borenet): Should we retry? Delete the inserted entries?
+			return err
+		}
+	}
+	return nil
+}
+
 // Close frees up resources used by the TaskCfgCache.
 func (c *TaskCfgCache) Close() error {
 	close(c.queue)
-	return nil
+	return c.client.Close()
 }
 
 type cacheEntry struct {
 	c *TaskCfgCache
 	// Only one of cfg or err may be non-empty.
-	cfg  *TasksCfg
-	err  string
-	hash TasksCfgFileHash
-	mtx  sync.Mutex
-	rs   types.RepoState
+	cfg *TasksCfg
+	err string
+	mtx sync.Mutex
+	rs  types.RepoState
 }
 
 // Get returns the TasksCfg for this cache entry. If it does not already exist
-// in the cache, it is read from the repo and the bool return value is true,
-// indicating that the caller should write out the cache.
-func (e *cacheEntry) Get(ctx context.Context) (*TasksCfg, bool, error) {
+// in the cache, we attempt to read it from BigTable. If it does not exist in
+// BigTable, it is read from the repo and written to BigTable.
+func (e *cacheEntry) Get(ctx context.Context) (*TasksCfg, error) {
 	e.mtx.Lock()
 	defer e.mtx.Unlock()
 	if e.cfg != nil {
-		return e.cfg, false, nil
+		return e.cfg, nil
 	}
 	if e.err != "" {
-		return nil, false, errors.New(e.err)
+		return nil, errors.New(e.err)
+	}
+
+	r, ok := e.c.repos[e.rs.Repo]
+	if !ok {
+		return nil, fmt.Errorf("Unknown repo %q", e.rs.Repo)
+	}
+
+	// Try to read the TasksCfg from BigTable.
+	cfg, err := GetTasksCfgFromBigTable(e.c.table, e.rs)
+	if err != nil {
+		if isStoredError(err) {
+			e.err = err.Error()
+		}
+		return nil, err
+	}
+	if cfg != nil {
+		e.cfg = cfg
+		return cfg, e.c.updateSecondaryCaches(r, e.rs, cfg)
 	}
 
 	// We haven't seen this RepoState before, or it's scrolled out of our
 	// window. Read it.
 	// Point the upstream to a local source of truth to eliminate network
 	// latency.
-	r, ok := e.c.repos[e.rs.Repo]
-	if !ok {
-		return nil, false, fmt.Errorf("Unknown repo %q", e.rs.Repo)
-	}
-	var cfg *TasksCfg
 	if err := e.c.TempGitRepo(ctx, e.rs, e.rs.IsTryJob(), func(checkout *git.TempCheckout) error {
 		contents, err := ioutil.ReadFile(path.Join(checkout.Dir(), TASKS_CFG_FILE))
 		if err != nil {
@@ -548,40 +667,47 @@
 				return fmt.Errorf("Failed to read tasks cfg: could not read file: %s", err)
 			}
 		}
-		e.hash = TasksCfgFileHash(sha1.Sum(contents))
 		cfg, err = ParseTasksCfg(string(contents))
 		return err
 	}); err != nil {
 		if strings.Contains(err.Error(), "error: Failed to merge in the changes.") {
 			e.err = err.Error()
+			if err2 := WriteTasksCfgToBigTable(e.c.table, e.rs, nil, err); err2 != nil {
+				return nil, fmt.Errorf("Failed to obtain TasksCfg due to merge error and failed to cache the error with: %s", err2)
+			}
 		}
-		return nil, false, err
+		return nil, err
+	}
+	if err := e.c.updateSecondaryCaches(r, e.rs, cfg); err != nil {
+		return nil, err
 	}
 	e.cfg = cfg
+	return cfg, WriteTasksCfgToBigTable(e.c.table, e.rs, cfg, nil)
+}
 
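+// updateSecondaryCaches records the commit and the task/job spec names from
+// the given TasksCfg in the recentCommits, recentTaskSpecs, and recentJobSpecs
+// maps.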
+func (c *TaskCfgCache) updateSecondaryCaches(r *repograph.Graph, rs types.RepoState, cfg *TasksCfg) error {
 	// Write the commit and task specs into the recent lists.
-	// TODO(borenet): The below should probably go elsewhere.
-	e.c.recentMtx.Lock()
-	defer e.c.recentMtx.Unlock()
-	d := r.Get(e.rs.Revision)
+	c.recentMtx.Lock()
+	defer c.recentMtx.Unlock()
+	d := r.Get(rs.Revision)
 	if d == nil {
-		return nil, false, fmt.Errorf("Unknown revision %s in %s", e.rs.Revision, e.rs.Repo)
+		return fmt.Errorf("Unknown revision %s in %s", rs.Revision, rs.Repo)
 	}
 	ts := d.Timestamp
-	if ts.After(e.c.recentCommits[e.rs.Revision]) {
-		e.c.recentCommits[e.rs.Revision] = ts
+	if ts.After(c.recentCommits[rs.Revision]) {
+		c.recentCommits[rs.Revision] = ts
 	}
 	for name := range cfg.Tasks {
-		if ts.After(e.c.recentTaskSpecs[name]) {
-			e.c.recentTaskSpecs[name] = ts
+		if ts.After(c.recentTaskSpecs[name]) {
+			c.recentTaskSpecs[name] = ts
 		}
 	}
 	for name := range cfg.Jobs {
-		if ts.After(e.c.recentJobSpecs[name]) {
-			e.c.recentJobSpecs[name] = ts
+		if ts.After(c.recentJobSpecs[name]) {
+			c.recentJobSpecs[name] = ts
 		}
 	}
-	return cfg, true, nil
+	return nil
 }
 
 func (c *TaskCfgCache) getEntry(rs types.RepoState) *cacheEntry {
@@ -603,13 +729,9 @@
 	c.mtx.Lock()
 	defer c.mtx.Unlock()
 	entry := c.getEntry(rs)
-	rv, mustWrite, err := entry.Get(ctx)
+	rv, err := entry.Get(ctx)
 	if err != nil {
 		return nil, err
-	} else if mustWrite {
-		c.recentMtx.Lock()
-		defer c.recentMtx.Unlock()
-		return rv, c.write()
 	}
 	return rv, err
 }
@@ -628,12 +750,11 @@
 	var wg sync.WaitGroup
 	rv := make(map[types.RepoState]map[string]*TaskSpec, len(rs))
 	errs := []error{}
-	mustWrite := false
 	for s, entry := range entries {
 		wg.Add(1)
 		go func(s types.RepoState, entry *cacheEntry) {
 			defer wg.Done()
-			cfg, w, err := entry.Get(ctx)
+			cfg, err := entry.Get(ctx)
 			if err != nil {
 				m.Lock()
 				defer m.Unlock()
@@ -648,19 +769,9 @@
 			m.Lock()
 			defer m.Unlock()
 			rv[s] = subMap
-			if w {
-				mustWrite = true
-			}
 		}(s, entry)
 	}
 	wg.Wait()
-	if mustWrite {
-		c.recentMtx.Lock()
-		defer c.recentMtx.Unlock()
-		if err := c.write(); err != nil {
-			return nil, err
-		}
-	}
 	if len(errs) > 0 {
 		return nil, fmt.Errorf("Errors loading task cfgs: %v", errs)
 	}
@@ -819,59 +930,7 @@
 			delete(c.recentJobSpecs, k)
 		}
 	}
-	return c.write()
-}
-
-// write writes the TaskCfgCache to a file. Assumes the caller holds both c.mtx
-// and c.recentMtx.
-func (c *TaskCfgCache) write() error {
-	dir := path.Dir(c.file)
-	if err := os.MkdirAll(dir, os.ModePerm); err != nil {
-		return err
-	}
-	return util.WithWriteFile(c.file, func(w io.Writer) error {
-		gobCache := gobTaskCfgCache{
-			AddedTasksCache: c.addedTasksCache,
-			Values:          make([]gobCacheValue, 0, len(c.cache)),
-			RecentCommits:   c.recentCommits,
-			RecentJobSpecs:  c.recentJobSpecs,
-			RecentTaskSpecs: c.recentTaskSpecs,
-			RepoStates:      make(map[types.RepoState]int, len(c.cache)),
-		}
-		// When deduplicating gobCacheValue, we need to key by both hash and err. In
-		// the case that a patch doesn't apply, hash will always be all-zero, but
-		// err could vary.
-		type valueKey struct {
-			hash TasksCfgFileHash
-			err  string
-		}
-		valueIdxMap := make(map[valueKey]int, len(c.cache))
-		for _, e := range c.cache {
-			if e.cfg == nil && e.err == "" {
-				// Haven't called Get yet or Get failed; nothing to cache.
-				continue
-			}
-			key := valueKey{
-				hash: e.hash,
-				err:  e.err,
-			}
-			idx, ok := valueIdxMap[key]
-			if !ok {
-				idx = len(gobCache.Values)
-				valueIdxMap[key] = idx
-				gobCache.Values = append(gobCache.Values, gobCacheValue{
-					Cfg:  e.cfg,
-					Err:  e.err,
-					Hash: e.hash,
-				})
-			}
-			gobCache.RepoStates[e.rs] = idx
-		}
-		if err := gob.NewEncoder(w).Encode(&gobCache); err != nil {
-			return fmt.Errorf("Failed to encode TaskCfgCache: %s", err)
-		}
-		return nil
-	})
+	return nil
 }
 
 func stringMapKeys(m map[string]time.Time) []string {
diff --git a/task_scheduler/go/specs/specs_test.go b/task_scheduler/go/specs/specs_test.go
index aaed2dc..5dc82b4 100644
--- a/task_scheduler/go/specs/specs_test.go
+++ b/task_scheduler/go/specs/specs_test.go
@@ -1,13 +1,11 @@
 package specs
 
 import (
-	"bytes"
 	"context"
-	"encoding/gob"
+	"errors"
 	"fmt"
 	"io/ioutil"
 	"path"
-	"path/filepath"
 	"strings"
 	"sync"
 	"testing"
@@ -16,6 +14,7 @@
 	assert "github.com/stretchr/testify/require"
 	"go.skia.org/infra/go/deepequal"
 	depot_tools_testutils "go.skia.org/infra/go/depot_tools/testutils"
+	"go.skia.org/infra/go/exec"
 	"go.skia.org/infra/go/git"
 	"go.skia.org/infra/go/git/repograph"
 	git_testutils "go.skia.org/infra/go/git/testutils"
@@ -99,7 +98,9 @@
 	}
 	assert.NoError(t, repos.Update(ctx))
 
-	cache, err := NewTaskCfgCache(ctx, repos, depot_tools_testutils.GetDepotTools(t, ctx), tmp, DEFAULT_NUM_WORKERS)
+	project, instance, cleanup := specs_testutils.SetupBigTable(t)
+	defer cleanup()
+	cache, err := NewTaskCfgCache(ctx, repos, depot_tools_testutils.GetDepotTools(t, ctx), tmp, DEFAULT_NUM_WORKERS, project, instance, nil)
 	assert.NoError(t, err)
 
 	rs1 := types.RepoState{
@@ -161,7 +162,9 @@
 	}
 	assert.NoError(t, repos.Update(ctx))
 
-	cache, err := NewTaskCfgCache(ctx, repos, depot_tools_testutils.GetDepotTools(t, ctx), tmp, DEFAULT_NUM_WORKERS)
+	project, instance, cleanup := specs_testutils.SetupBigTable(t)
+	defer cleanup()
+	cache, err := NewTaskCfgCache(ctx, repos, depot_tools_testutils.GetDepotTools(t, ctx), tmp, DEFAULT_NUM_WORKERS, project, instance, nil)
 	assert.NoError(t, err)
 
 	rs1 := types.RepoState{
@@ -272,7 +275,9 @@
 		gb.RepoUrl(): repo,
 	}
 	assert.NoError(t, repos.Update(ctx))
-	cache, err := NewTaskCfgCache(ctx, repos, depot_tools_testutils.GetDepotTools(t, ctx), path.Join(tmp, "cache"), DEFAULT_NUM_WORKERS)
+	project, instance, cleanup := specs_testutils.SetupBigTable(t)
+	defer cleanup()
+	cache, err := NewTaskCfgCache(ctx, repos, depot_tools_testutils.GetDepotTools(t, ctx), path.Join(tmp, "cache"), DEFAULT_NUM_WORKERS, project, instance, nil)
 	assert.NoError(t, err)
 
 	// Load configs into the cache.
@@ -305,6 +310,81 @@
 	assert.Equal(t, 1, len(cache.addedTasksCache))
 }
 
+func TestTaskCfgCacheError(t *testing.T) {
+	testutils.LargeTest(t)
+
+	// Verify that we properly cache merge errors.
+	ctx, gb, c1, c2 := specs_testutils.SetupTestRepo(t)
+	defer gb.Cleanup()
+
+	tmp, err := ioutil.TempDir("", "")
+	assert.NoError(t, err)
+	defer testutils.RemoveAll(t, tmp)
+
+	repo, err := repograph.NewGraph(ctx, gb.RepoUrl(), tmp)
+	assert.NoError(t, err)
+	repos := repograph.Map{
+		gb.RepoUrl(): repo,
+	}
+	assert.NoError(t, repos.Update(ctx))
+	project, instance, cleanup := specs_testutils.SetupBigTable(t)
+	defer cleanup()
+	cache, err := NewTaskCfgCache(ctx, repos, depot_tools_testutils.GetDepotTools(t, ctx), path.Join(tmp, "cache"), DEFAULT_NUM_WORKERS, project, instance, nil)
+	assert.NoError(t, err)
+
+	// Load configs into the cache.
+	rs1 := types.RepoState{
+		Repo:     gb.RepoUrl(),
+		Revision: c1,
+	}
+	rs2 := types.RepoState{
+		Repo:     gb.RepoUrl(),
+		Revision: c2,
+	}
+	_, err = cache.GetTaskSpecsForRepoStates(ctx, []types.RepoState{rs1, rs2})
+	assert.NoError(t, err)
+	assert.Equal(t, 2, len(cache.cache))
+
+	botUpdateCount := 0
+	mock := exec.CommandCollector{}
+	mock.SetDelegateRun(func(cmd *exec.Command) error {
+		for _, arg := range cmd.Args {
+			if strings.Contains(arg, "bot_update") {
+				botUpdateCount++
+				return errors.New("error: Failed to merge in the changes.")
+			}
+		}
+		return exec.DefaultRun(cmd)
+	})
+	ctx = exec.NewContext(ctx, mock.Run)
+	repoStates := []types.RepoState{
+		types.RepoState{
+			Repo:     rs1.Repo,
+			Revision: rs1.Revision,
+			Patch: types.Patch{
+				Server:   "my-server",
+				Issue:    "12345",
+				Patchset: "1",
+			},
+		},
+	}
+	_, err = cache.GetTaskSpecsForRepoStates(ctx, repoStates)
+	assert.EqualError(t, err, "Errors loading task cfgs: [error: Failed to merge in the changes.; Stdout+Stderr:\n]")
+	assert.Equal(t, 1, botUpdateCount)
+
+	// Try again, assert that we didn't run bot_update again.
+	_, err = cache.GetTaskSpecsForRepoStates(ctx, repoStates)
+	assert.EqualError(t, err, "Errors loading task cfgs: [error: Failed to merge in the changes.; Stdout+Stderr:\n]")
+	assert.Equal(t, 1, botUpdateCount)
+
+	// Create a new cache, assert that it doesn't run bot_update.
+	cache2, err := NewTaskCfgCache(ctx, repos, depot_tools_testutils.GetDepotTools(t, ctx), path.Join(tmp, "cache2"), DEFAULT_NUM_WORKERS, project, instance, nil)
+	assert.NoError(t, err)
+	_, err = cache2.GetTaskSpecsForRepoStates(ctx, repoStates)
+	assert.EqualError(t, err, "Errors loading task cfgs: [error: Failed to merge in the changes.; Stdout+Stderr:\n]")
+	assert.Equal(t, 1, botUpdateCount)
+}
+
 // makeTasksCfg generates a JSON representation of a TasksCfg based on the given
 // tasks and jobs.
 func makeTasksCfg(t *testing.T, tasks, jobs map[string][]string) string {
@@ -535,7 +615,9 @@
 	repos, err := repograph.NewMap(ctx, []string{gb.RepoUrl()}, tmp)
 	assert.NoError(t, err)
 
-	cache, err := NewTaskCfgCache(ctx, repos, depot_tools_testutils.GetDepotTools(t, ctx), tmp, DEFAULT_NUM_WORKERS)
+	project, instance, cleanup := specs_testutils.SetupBigTable(t)
+	defer cleanup()
+	cache, err := NewTaskCfgCache(ctx, repos, depot_tools_testutils.GetDepotTools(t, ctx), tmp, DEFAULT_NUM_WORKERS, project, instance, nil)
 	assert.NoError(t, err)
 
 	rs := types.RepoState{
@@ -572,7 +654,9 @@
 	repos, err := repograph.NewMap(ctx, []string{gb.RepoUrl()}, tmp)
 	assert.NoError(t, err)
 
-	cache, err := NewTaskCfgCache(ctx, repos, depot_tools_testutils.GetDepotTools(t, ctx), tmp, DEFAULT_NUM_WORKERS)
+	project, instance, cleanup := specs_testutils.SetupBigTable(t)
+	defer cleanup()
+	cache, err := NewTaskCfgCache(ctx, repos, depot_tools_testutils.GetDepotTools(t, ctx), tmp, DEFAULT_NUM_WORKERS, project, instance, nil)
 	assert.NoError(t, err)
 
 	// bot_update will fail to apply the issue if we don't fake it in Git.
@@ -625,10 +709,10 @@
 	}, []string{"a", "g"})
 }
 
-func TestTaskCfgCacheSerialization(t *testing.T) {
+func TestTaskCfgCacheStorage(t *testing.T) {
 	testutils.LargeTest(t)
 
-	ctx, gb, r1, r2 := specs_testutils.SetupTestRepo(t)
+	ctx, gb, r1, _ := specs_testutils.SetupTestRepo(t)
 	defer gb.Cleanup()
 
 	tmp, err := ioutil.TempDir("", "")
@@ -639,58 +723,73 @@
 	assert.NoError(t, err)
 	assert.NoError(t, repos.Update(ctx))
 
-	c, err := NewTaskCfgCache(ctx, repos, depot_tools_testutils.GetDepotTools(t, ctx), tmp, DEFAULT_NUM_WORKERS)
+	botUpdateCount := 0
+	mock := exec.CommandCollector{}
+	mock.SetDelegateRun(func(cmd *exec.Command) error {
+		for _, arg := range cmd.Args {
+			if strings.Contains(arg, "bot_update") {
+				botUpdateCount++
+				break
+			}
+		}
+		return exec.DefaultRun(cmd)
+	})
+	ctx = exec.NewContext(ctx, mock.Run)
+
+	project, instance, cleanup := specs_testutils.SetupBigTable(t)
+	defer cleanup()
+	c, err := NewTaskCfgCache(ctx, repos, depot_tools_testutils.GetDepotTools(t, ctx), tmp, DEFAULT_NUM_WORKERS, project, instance, nil)
 	assert.NoError(t, err)
 
-	check := func() {
-		c2, err := NewTaskCfgCache(ctx, repos, depot_tools_testutils.GetDepotTools(t, ctx), tmp, DEFAULT_NUM_WORKERS)
+	check := func(rs ...types.RepoState) {
+		c2, err := NewTaskCfgCache(ctx, repos, depot_tools_testutils.GetDepotTools(t, ctx), tmp, DEFAULT_NUM_WORKERS, project, instance, nil)
 		assert.NoError(t, err)
+		expectBotUpdateCount := botUpdateCount
+		for _, r := range rs {
+			_, err := c2.ReadTasksCfg(ctx, r)
+			assert.NoError(t, err)
+		}
+		// Assert that we obtained the TasksCfg from BigTable and not by
+		// running bot_update.
+		assert.Equal(t, expectBotUpdateCount, botUpdateCount)
 
-		// We can't use reflect.DeepEqual on channels, so temporarily
-		// nil out the channels for comparison.
+		// Verify that the caches are updated as expected.
 		c.mtx.Lock()
 		defer c.mtx.Unlock()
 		c2.mtx.Lock()
 		defer c2.mtx.Unlock()
-		c1Queue := c.queue
-		c2Queue := c2.queue
-		c.queue = nil
-		c2.queue = nil
-		deepequal.AssertDeepEqual(t, c, c2)
-		c.queue = c1Queue
-		c2.queue = c2Queue
+		assert.Equal(t, len(c.cache), len(c2.cache))
+		for k, v := range c.cache {
+			v2, ok := c2.cache[k]
+			assert.True(t, ok)
+			assert.Equal(t, v.err, v2.err)
+			deepequal.AssertDeepEqual(t, v.cfg, v2.cfg)
+			deepequal.AssertDeepEqual(t, v.rs, v2.rs)
+		}
+		deepequal.AssertDeepEqual(t, c.addedTasksCache, c2.addedTasksCache)
+		deepequal.AssertDeepEqual(t, c.recentCommits, c2.recentCommits)
+		deepequal.AssertDeepEqual(t, c.recentJobSpecs, c2.recentJobSpecs)
+		deepequal.AssertDeepEqual(t, c.recentTaskSpecs, c2.recentTaskSpecs)
 	}
 
 	// Empty cache.
 	check()
 
 	// Insert one commit's worth of specs into the cache.
-	_, err = c.ReadTasksCfg(ctx, types.RepoState{
+	rs1 := types.RepoState{
 		Repo:     gb.RepoUrl(),
 		Revision: r1,
-	})
+	}
+	_, err = c.ReadTasksCfg(ctx, rs1)
 	assert.NoError(t, err)
 	assert.Equal(t, 1, len(c.cache))
-	check()
+	check(rs1)
 
 	// Cleanup() the cache to remove the entries.
 	assert.NoError(t, c.Cleanup(time.Duration(0)))
 	assert.Equal(t, 0, len(c.cache))
 	check()
 
-	// Insert an error into the cache.
-	rs2 := types.RepoState{
-		Repo:     gb.RepoUrl(),
-		Revision: r2,
-	}
-	c.cache[rs2] = &cacheEntry{
-		c:   c,
-		cfg: nil,
-		err: "fail!",
-		rs:  rs2,
-	}
-	assert.NoError(t, c.write())
-
 	// Add two commits with identical tasks.json hash and check serialization.
 	r3 := gb.CommitGen(ctx, "otherfile.txt")
 	rs3 := types.RepoState{
@@ -705,34 +804,6 @@
 	assert.NoError(t, repos.Update(ctx))
 	_, err = c.GetTaskSpecsForRepoStates(ctx, []types.RepoState{rs3, rs4})
 	assert.NoError(t, err)
-	assert.Equal(t, 3, len(c.cache))
-	check()
-
-	fileName := filepath.Join(tmp, "taskCfgCache.gob")
-	{
-		// Check that rs3 and rs4 share a gobCacheEntry.
-		file, err := ioutil.ReadFile(fileName)
-		assert.NoError(t, err)
-		var gobCache gobTaskCfgCache
-		assert.NoError(t, gob.NewDecoder(bytes.NewReader(file)).Decode(&gobCache))
-		assert.Len(t, gobCache.Values, 2)
-		assert.Equal(t, gobCache.RepoStates[rs3], gobCache.RepoStates[rs4])
-	}
-
-	// Check that different errors get different gobCacheEntry's.
-	c.cache[rs4] = &cacheEntry{
-		c:   c,
-		cfg: nil,
-		err: "To err is human.",
-		rs:  rs4,
-	}
-	assert.NoError(t, c.write())
-	{
-		file, err := ioutil.ReadFile(fileName)
-		assert.NoError(t, err)
-		var gobCache gobTaskCfgCache
-		assert.NoError(t, gob.NewDecoder(bytes.NewReader(file)).Decode(&gobCache))
-		assert.Len(t, gobCache.Values, 3)
-		assert.NotEqual(t, gobCache.RepoStates[rs2], gobCache.RepoStates[rs4])
-	}
+	assert.Equal(t, 2, len(c.cache))
+	check(rs3, rs4)
 }
diff --git a/task_scheduler/go/specs/testutils/testutils.go b/task_scheduler/go/specs/testutils/testutils.go
index 6e144dc..a81ed6c 100644
--- a/task_scheduler/go/specs/testutils/testutils.go
+++ b/task_scheduler/go/specs/testutils/testutils.go
@@ -2,8 +2,12 @@
 
 import (
 	"context"
+	"fmt"
 	"time"
 
+	"github.com/google/uuid"
+	assert "github.com/stretchr/testify/require"
+	"go.skia.org/infra/go/bt"
 	git_testutils "go.skia.org/infra/go/git/testutils"
 	"go.skia.org/infra/go/testutils"
 )
@@ -12,6 +16,8 @@
 	BuildTask = "Build-Ubuntu-GCC-Arm7-Release-Android"
 	TestTask  = "Test-Android-GCC-Nexus7-GPU-Tegra3-Arm7-Release"
 	PerfTask  = "Perf-Android-GCC-Nexus7-GPU-Tegra3-Arm7-Release"
+
+	BT_PROJECT = "test-project"
 )
 
 // The test repo has two commits. The first commit adds a tasks.cfg file
@@ -210,3 +216,22 @@
 
 	return ctx, gb, c1, c2
 }
+
+// SetupBigTable performs setup for the TaskCfgCache in BigTable. Returns the
+// project and instance names which should be used to instantiate TaskCfgCache
+// and a cleanup function which should be deferred.
+func SetupBigTable(t testutils.TestingT) (string, string, func()) {
+	// The table and column family names are specs.BT_TABLE and
+	// specs.BT_COLUMN_FAMILY, but are hard-coded here to avoid a dependency
+	// cycle.
+	cfg := bt.TableConfig{
+		"tasks-cfg": {
+			"CFGS",
+		},
+	}
+	instance := fmt.Sprintf("specs-testutils-%s", uuid.New())
+	assert.NoError(t, bt.InitBigtable(BT_PROJECT, instance, cfg))
+	return BT_PROJECT, instance, func() {
+		assert.NoError(t, bt.DeleteTables(BT_PROJECT, instance, cfg))
+	}
+}
diff --git a/task_scheduler/go/task_scheduler/main.go b/task_scheduler/go/task_scheduler/main.go
index 5aeb9a4..f9db99f 100644
--- a/task_scheduler/go/task_scheduler/main.go
+++ b/task_scheduler/go/task_scheduler/main.go
@@ -14,6 +14,7 @@
 	"runtime"
 	"time"
 
+	"cloud.google.com/go/bigtable"
 	"cloud.google.com/go/datastore"
 	"github.com/gorilla/mux"
 	"go.skia.org/infra/go/allowed"
@@ -94,19 +95,21 @@
 	// instance name. Once all schedulers are using Firestore for their
 	// task DB, firestoreInstance will have the same value. We should
 	// combine into a single instanceName flag.
-	pubsubTopicSet = flag.String("pubsub_topic_set", "", fmt.Sprintf("Pubsub topic set; one of: %v", pubsub.VALID_TOPIC_SETS))
-	repoUrls       = common.NewMultiStringFlag("repo", nil, "Repositories for which to schedule tasks.")
-	recipesCfgFile = flag.String("recipes_cfg", "", "Path to the recipes.cfg file.")
-	resourcesDir   = flag.String("resources_dir", "", "The directory to find templates, JS, and CSS files. If blank, assumes you're running inside a checkout and will attempt to find the resources relative to this source file.")
-	scoreDecay24Hr = flag.Float64("scoreDecay24Hr", 0.9, "Task candidate scores are penalized using linear time decay. This is the desired value after 24 hours. Setting it to 1.0 causes commits not to be prioritized according to commit time.")
-	swarmingPools  = common.NewMultiStringFlag("pool", swarming.POOLS_PUBLIC, "Which Swarming pools to use.")
-	swarmingServer = flag.String("swarming_server", swarming.SWARMING_SERVER, "Which Swarming server to use.")
-	timePeriod     = flag.String("timeWindow", "4d", "Time period to use.")
-	tryJobBucket   = flag.String("tryjob_bucket", tryjobs.BUCKET_PRIMARY, "Which Buildbucket bucket to use for try jobs.")
-	commitWindow   = flag.Int("commitWindow", 10, "Minimum number of recent commits to keep in the timeWindow.")
-	gsBucket       = flag.String("gsBucket", "skia-task-scheduler", "Name of Google Cloud Storage bucket to use for backups and recovery.")
-	workdir        = flag.String("workdir", "workdir", "Working directory to use.")
-	promPort       = flag.String("prom_port", ":20000", "Metrics service address (e.g., ':10110')")
+	pubsubTopicSet   = flag.String("pubsub_topic_set", "", fmt.Sprintf("Pubsub topic set; one of: %v", pubsub.VALID_TOPIC_SETS))
+	repoUrls         = common.NewMultiStringFlag("repo", nil, "Repositories for which to schedule tasks.")
+	recipesCfgFile   = flag.String("recipes_cfg", "", "Path to the recipes.cfg file.")
+	resourcesDir     = flag.String("resources_dir", "", "The directory to find templates, JS, and CSS files. If blank, assumes you're running inside a checkout and will attempt to find the resources relative to this source file.")
+	scoreDecay24Hr   = flag.Float64("scoreDecay24Hr", 0.9, "Task candidate scores are penalized using linear time decay. This is the desired value after 24 hours. Setting it to 1.0 causes commits not to be prioritized according to commit time.")
+	swarmingPools    = common.NewMultiStringFlag("pool", swarming.POOLS_PUBLIC, "Which Swarming pools to use.")
+	swarmingServer   = flag.String("swarming_server", swarming.SWARMING_SERVER, "Which Swarming server to use.")
+	tasksCfgProject  = flag.String("tasks_cfg_project", "", "GCP project to use for tasks cfg cache.")
+	tasksCfgInstance = flag.String("tasks_cfg_instance", "", "BigTable instance to use for tasks cfg cache.")
+	timePeriod       = flag.String("timeWindow", "4d", "Time period to use.")
+	tryJobBucket     = flag.String("tryjob_bucket", tryjobs.BUCKET_PRIMARY, "Which Buildbucket bucket to use for try jobs.")
+	commitWindow     = flag.Int("commitWindow", 10, "Minimum number of recent commits to keep in the timeWindow.")
+	gsBucket         = flag.String("gsBucket", "skia-task-scheduler", "Name of Google Cloud Storage bucket to use for backups and recovery.")
+	workdir          = flag.String("workdir", "workdir", "Working directory to use.")
+	promPort         = flag.String("prom_port", ":20000", "Metrics service address (e.g., ':10110')")
 
 	pubsubTopicName      = flag.String("pubsub_topic", swarming.PUBSUB_TOPIC_SWARMING_TASKS, "Pub/Sub topic to use for Swarming tasks.")
 	pubsubSubscriberName = flag.String("pubsub_subscriber", PUBSUB_SUBSCRIBER_TASK_SCHEDULER, "Pub/Sub subscriber name.")
@@ -625,7 +628,7 @@
 
 	// Authenticated HTTP client.
 	oauthCacheFile := path.Join(wdAbs, "google_storage_token.data")
-	tokenSource, err := auth.NewLegacyTokenSource(*local, oauthCacheFile, "", auth.SCOPE_READ_WRITE, pubsub.AUTH_SCOPE, datastore.ScopeDatastore)
+	tokenSource, err := auth.NewLegacyTokenSource(*local, oauthCacheFile, "", auth.SCOPE_READ_WRITE, pubsub.AUTH_SCOPE, datastore.ScopeDatastore, bigtable.Scope)
 	if err != nil {
 		sklog.Fatal(err)
 	}
@@ -745,7 +748,7 @@
 	if err := swarming.InitPubSub(serverURL, *pubsubTopicName, *pubsubSubscriberName); err != nil {
 		sklog.Fatal(err)
 	}
-	ts, err = scheduling.NewTaskScheduler(ctx, tsDb, bl, period, *commitWindow, wdAbs, serverURL, repos, isolateClient, swarm, httpClient, *scoreDecay24Hr, tryjobs.API_URL_PROD, *tryJobBucket, common.PROJECT_REPO_MAPPING, *swarmingPools, *pubsubTopicName, depotTools, gerrit)
+	ts, err = scheduling.NewTaskScheduler(ctx, tsDb, bl, period, *commitWindow, wdAbs, serverURL, repos, isolateClient, swarm, httpClient, *scoreDecay24Hr, tryjobs.API_URL_PROD, *tryJobBucket, common.PROJECT_REPO_MAPPING, *swarmingPools, *pubsubTopicName, depotTools, gerrit, *tasksCfgProject, *tasksCfgInstance, tokenSource)
 	if err != nil {
 		sklog.Fatal(err)
 	}
diff --git a/task_scheduler/go/tryjobs/testutils.go b/task_scheduler/go/tryjobs/testutils.go
index 4f29a5f..ad266f3 100644
--- a/task_scheduler/go/tryjobs/testutils.go
+++ b/task_scheduler/go/tryjobs/testutils.go
@@ -24,6 +24,7 @@
 	"go.skia.org/infra/go/testutils"
 	"go.skia.org/infra/task_scheduler/go/db/local_db"
 	"go.skia.org/infra/task_scheduler/go/specs"
+	specs_testutils "go.skia.org/infra/task_scheduler/go/specs/testutils"
 	"go.skia.org/infra/task_scheduler/go/types"
 	"go.skia.org/infra/task_scheduler/go/window"
 )
@@ -109,7 +110,8 @@
 	// Set up other TryJobIntegrator inputs.
 	window, err := window.New(time.Hour, 100, rm)
 	assert.NoError(t, err)
-	taskCfgCache, err := specs.NewTaskCfgCache(ctx, rm, depot_tools_testutils.GetDepotTools(t, ctx), path.Join(tmpDir, "cache"), specs.DEFAULT_NUM_WORKERS)
+	btProject, btInstance, btCleanup := specs_testutils.SetupBigTable(t)
+	taskCfgCache, err := specs.NewTaskCfgCache(ctx, rm, depot_tools_testutils.GetDepotTools(t, ctx), path.Join(tmpDir, "cache"), specs.DEFAULT_NUM_WORKERS, btProject, btInstance, nil)
 	assert.NoError(t, err)
 	d, err := local_db.NewDB("tasks_db", path.Join(tmpDir, "tasks.db"), nil)
 	assert.NoError(t, err)
@@ -130,6 +132,7 @@
 	return ctx, integrator, gb, mock, func() {
 		testutils.RemoveAll(t, tmpDir)
 		gb.Cleanup()
+		btCleanup()
 	}
 }
 
diff --git a/task_scheduler/go/types/repo_state.go b/task_scheduler/go/types/repo_state.go
index 376f6c9..7c44934 100644
--- a/task_scheduler/go/types/repo_state.go
+++ b/task_scheduler/go/types/repo_state.go
@@ -2,12 +2,17 @@
 
 import (
 	"fmt"
+	"strings"
 
+	"go.skia.org/infra/go/common"
 	"go.skia.org/infra/go/git/repograph"
+	"go.skia.org/infra/go/sklog"
 )
 
 const (
 	ISSUE_SHORT_LENGTH = 2
+
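+	// BT_ROW_KEY_VERSION is a version string included in the BigTable row
+	// keys produced by RowKey().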
+	BT_ROW_KEY_VERSION = "0"
 )
 
 // Patch describes a patch which may be applied to a code checkout.
@@ -40,6 +45,30 @@
 	return p.Issue != "" && p.Patchset != "" && p.Server != ""
 }
 
+// patchIdentifier returns the identifier sequence for the patch: short issue
+// number, full issue number, patch set number.
+func (p Patch) patchIdentifier() []string {
+	issueShort := p.Issue
+	if len(issueShort) > ISSUE_SHORT_LENGTH {
+		issueShort = issueShort[len(p.Issue)-ISSUE_SHORT_LENGTH:]
+	}
+	return []string{issueShort, p.Issue, p.Patchset}
+}
+
+// GetPatchRef returns the ref for the tryjob patch, if the RepoState includes
+// a patch, and "" otherwise.
+func (p Patch) GetPatchRef() string {
+	if p.Full() {
+		return fmt.Sprintf("refs/changes/%s", strings.Join(p.patchIdentifier(), "/"))
+	}
+	return ""
+}
+
+// RowKey returns a BigTable-compatible row key for the Patch.
+func (p Patch) RowKey() string {
+	return strings.Join(p.patchIdentifier(), "#")
+}
+
 // RepoState encapsulates all of the parameters which define the state of a
 // repo.
 type RepoState struct {
@@ -67,19 +96,6 @@
 	return s.Patch.Full()
 }
 
-// GetPatchRef returns the ref for the tryjob patch, if the RepoState includes
-// a patch, and "" otherwise.
-func (s RepoState) GetPatchRef() string {
-	if s.IsTryJob() {
-		issueShort := s.Issue
-		if len(issueShort) > ISSUE_SHORT_LENGTH {
-			issueShort = issueShort[len(s.Issue)-ISSUE_SHORT_LENGTH:]
-		}
-		return fmt.Sprintf("refs/changes/%s/%s/%s", issueShort, s.Issue, s.Patchset)
-	}
-	return ""
-}
-
 // GetCommit returns the repograph.Commit referenced by s, or an error if it
 // can't be found.
 func (s RepoState) GetCommit(repos repograph.Map) (*repograph.Commit, error) {
@@ -117,3 +133,13 @@
 	}
 	return rv, nil
 }
+
+// RowKey returns a BigTable-compatible row key for the RepoState.
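+// Keys have the form "<version>#<revision>#<project>#<issueShort>#<issue>#<patchset>".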
+func (s RepoState) RowKey() string {
+	repo, ok := common.REPO_PROJECT_MAPPING[s.Repo]
+	if !ok {
+		sklog.Errorf("Unknown repo: %s; can't shorten it to a row key.", s.Repo)
+		repo = s.Repo
+	}
+	return strings.Join([]string{BT_ROW_KEY_VERSION, s.Revision, repo, s.Patch.RowKey()}, "#")
+}
diff --git a/task_scheduler/sys/task-scheduler-internal.service b/task_scheduler/sys/task-scheduler-internal.service
index e13856a..d70f271 100644
--- a/task_scheduler/sys/task-scheduler-internal.service
+++ b/task_scheduler/sys/task-scheduler-internal.service
@@ -18,6 +18,8 @@
     --repo=https://skia.googlesource.com/skia_internal.git \
     --resources_dir=/usr/local/share/task-scheduler/ \
     --swarming_server=chrome-swarming.appspot.com \
+    --tasks_cfg_project=skia-corp \
+    --tasks_cfg_instance=tasks-cfg-internal \
     --tryjob_bucket=skia.internal \
     --workdir=/mnt/pd0/task_scheduler_workdir
 Restart=always
diff --git a/task_scheduler/sys/task-scheduler-staging.service b/task_scheduler/sys/task-scheduler-staging.service
index a322539..611b8f5 100644
--- a/task_scheduler/sys/task-scheduler-staging.service
+++ b/task_scheduler/sys/task-scheduler-staging.service
@@ -18,6 +18,8 @@
     --repo=https://skia.googlesource.com/skiabot-test.git \
     --resources_dir=/usr/local/share/task-scheduler/ \
     --swarming_server=chromium-swarm-dev.appspot.com \
+    --tasks_cfg_project=skia-public \
+    --tasks_cfg_instance=tasks-cfg-staging \
     --tryjob_bucket=skia.testing \
     --workdir=/mnt/pd0/task_scheduler_workdir
 Restart=always
diff --git a/task_scheduler/sys/task-scheduler.service b/task_scheduler/sys/task-scheduler.service
index 782014c..8702ef5 100644
--- a/task_scheduler/sys/task-scheduler.service
+++ b/task_scheduler/sys/task-scheduler.service
@@ -9,6 +9,8 @@
     --host=task-scheduler.skia.org \
     --logtostderr \
     --pubsub_topic_set=prod \
+    --tasks_cfg_project=skia-public \
+    --tasks_cfg_instance=tasks-cfg-prod \
     --workdir=/mnt/pd0/task_scheduler_workdir \
     --resources_dir=/usr/local/share/task-scheduler/
 Restart=always
diff --git a/task_scheduler/vm.go b/task_scheduler/vm.go
index 6211a6e..d4256e0 100644
--- a/task_scheduler/vm.go
+++ b/task_scheduler/vm.go
@@ -4,6 +4,7 @@
 	"path"
 	"runtime"
 
+	"cloud.google.com/go/bigtable"
 	"cloud.google.com/go/datastore"
 	"go.skia.org/infra/go/auth"
 	"go.skia.org/infra/go/gce"
@@ -20,6 +21,7 @@
 	vm.Scopes = append(vm.Scopes,
 		auth.SCOPE_GERRIT,
 		datastore.ScopeDatastore,
+		bigtable.Scope,
 	)
 
 	_, filename, _, _ := runtime.Caller(0)