[gold] Implements a tracestore based on BigTable

How to review this CL:

  1. Read bt_tracestore/BIGTABLE.md to get an overview of how the
     data is stored in BT.
  2. Browse through the consts and types in bt_tracestore/types.go
  3. Read TestBTTraceStorePutGet to get a sense of how this code will be used.
  4. Read the implementations for Put and GetTile, as that's the core pathway.
  5. Look at everything else.

Initially uploaded as https://skia-review.googlesource.com/c/buildbot/+/213381
but taken over by me.

Bug: skia:9096
Change-Id: Ida1279f315abc5c21fcca42563e77b16342cc4c4
Reviewed-on: https://skia-review.googlesource.com/c/buildbot/+/213426
Commit-Queue: Kevin Lubick <kjlubick@google.com>
Reviewed-by: Ben Wagner aka dogben <benjaminwagner@google.com>
diff --git a/go/Makefile b/go/Makefile
index 12feed1..398314d 100644
--- a/go/Makefile
+++ b/go/Makefile
@@ -26,4 +26,4 @@
 mocks:
 	go get github.com/vektra/mockery/...
 
-	go generate ./...
\ No newline at end of file
+	go generate ./...
diff --git a/go/gitstore/bt_gitstore/bt_gitstore.go b/go/gitstore/bt_gitstore/bt_gitstore.go
index 77bb840..fa1d4d3 100644
--- a/go/gitstore/bt_gitstore/bt_gitstore.go
+++ b/go/gitstore/bt_gitstore/bt_gitstore.go
@@ -454,12 +454,12 @@
 		egroup.Go(func() error {
 			rowNames := rowNames[chunkStart:chunkEnd]
 			mutations := mutations[chunkStart:chunkEnd]
-			errs, err := b.table.ApplyBulk(context.TODO(), rowNames, mutations)
+			errs, err := b.table.ApplyBulk(ctx, rowNames, mutations)
 			if err != nil {
-				return skerr.Fmt("Error writing batch: %s", err)
+				return skerr.Fmt("Error writing batch [%d:%d]: %s", chunkStart, chunkEnd, err)
 			}
 			if errs != nil {
-				return skerr.Fmt("Error writing some portions of batch: %s", errs)
+				return skerr.Fmt("Error writing some portions of batch [%d:%d]: %s", chunkStart, chunkEnd, errs)
 			}
 			return nil
 		})
diff --git a/go/paramtools/params.go b/go/paramtools/params.go
index ddf32bf..7e33bb2 100644
--- a/go/paramtools/params.go
+++ b/go/paramtools/params.go
@@ -10,6 +10,7 @@
 	"strings"
 	"sync"
 
+	"go.skia.org/infra/go/skerr"
 	"go.skia.org/infra/go/util"
 )
 
@@ -378,15 +379,15 @@
 		}
 		parts := strings.Split(pair, "=")
 		if len(parts) != 2 {
-			return nil, fmt.Errorf("Failed to parse: %s", pair)
+			return nil, skerr.Fmt("failed to parse: %s", pair)
 		}
 		key, ok := p.keys[parts[0]]
 		if !ok {
-			return nil, fmt.Errorf("Failed to find key: %q", parts[0])
+			return nil, skerr.Fmt("failed to find key: %q", parts[0])
 		}
 		value, ok := p.values[parts[0]][parts[1]]
 		if !ok {
-			return nil, fmt.Errorf("Failed to find value: %q", parts[1])
+			return nil, skerr.Fmt("failed to find value %q in %q (%q)", parts[1], p.values[parts[0]], parts[0])
 		}
 		ret[key] = value
 	}
@@ -450,7 +451,7 @@
 	return o.paramsEncoder
 }
 
-// EncodeParamsAsString encodes the Params as a string.
+// EncodeParamsAsString encodes the Params as a string containing indices.
 func (o *OrderedParamSet) EncodeParamsAsString(p Params) (string, error) {
 	return o.getParamsEncoder().encodeAsString(p)
 }
diff --git a/go/util/util.go b/go/util/util.go
index 1f71be7..6f28a0d 100644
--- a/go/util/util.go
+++ b/go/util/util.go
@@ -185,6 +185,23 @@
 	return r
 }
 
+// ReverseString reverses a string. It may not correctly handle UTF-8
+// combining characters.
+// https://groups.google.com/forum/#!topic/golang-nuts/oPuBaYJ17t4
+func ReverseString(input string) string {
+	// Get Unicode code points.
+	runes := []rune(input)
+	n := len(runes)
+
+	// Reverse
+	for i := 0; i < n/2; i++ {
+		runes[i], runes[n-1-i] = runes[n-1-i], runes[i]
+	}
+
+	// Convert back to UTF-8.
+	return string(runes)
+}
+
 // InsertString inserts the given string into the slice at the given index.
 func InsertString(strs []string, idx int, s string) []string {
 	oldLen := len(strs)
diff --git a/go/vcsinfo/mocks/VCS.go b/go/vcsinfo/mocks/VCS.go
new file mode 100644
index 0000000..176048c
--- /dev/null
+++ b/go/vcsinfo/mocks/VCS.go
@@ -0,0 +1,221 @@
+// Code generated by mockery v1.0.0. DO NOT EDIT.
+
+package mocks
+
+import context "context"
+import mock "github.com/stretchr/testify/mock"
+import time "time"
+import vcsinfo "go.skia.org/infra/go/vcsinfo"
+
+// VCS is an autogenerated mock type for the VCS type
+type VCS struct {
+	mock.Mock
+}
+
+// ByIndex provides a mock function with given fields: ctx, N
+func (_m *VCS) ByIndex(ctx context.Context, N int) (*vcsinfo.LongCommit, error) {
+	ret := _m.Called(ctx, N)
+
+	var r0 *vcsinfo.LongCommit
+	if rf, ok := ret.Get(0).(func(context.Context, int) *vcsinfo.LongCommit); ok {
+		r0 = rf(ctx, N)
+	} else {
+		if ret.Get(0) != nil {
+			r0 = ret.Get(0).(*vcsinfo.LongCommit)
+		}
+	}
+
+	var r1 error
+	if rf, ok := ret.Get(1).(func(context.Context, int) error); ok {
+		r1 = rf(ctx, N)
+	} else {
+		r1 = ret.Error(1)
+	}
+
+	return r0, r1
+}
+
+// Details provides a mock function with given fields: ctx, hash, includeBranchInfo
+func (_m *VCS) Details(ctx context.Context, hash string, includeBranchInfo bool) (*vcsinfo.LongCommit, error) {
+	ret := _m.Called(ctx, hash, includeBranchInfo)
+
+	var r0 *vcsinfo.LongCommit
+	if rf, ok := ret.Get(0).(func(context.Context, string, bool) *vcsinfo.LongCommit); ok {
+		r0 = rf(ctx, hash, includeBranchInfo)
+	} else {
+		if ret.Get(0) != nil {
+			r0 = ret.Get(0).(*vcsinfo.LongCommit)
+		}
+	}
+
+	var r1 error
+	if rf, ok := ret.Get(1).(func(context.Context, string, bool) error); ok {
+		r1 = rf(ctx, hash, includeBranchInfo)
+	} else {
+		r1 = ret.Error(1)
+	}
+
+	return r0, r1
+}
+
+// DetailsMulti provides a mock function with given fields: ctx, hashes, includeBranchInfo
+func (_m *VCS) DetailsMulti(ctx context.Context, hashes []string, includeBranchInfo bool) ([]*vcsinfo.LongCommit, error) {
+	ret := _m.Called(ctx, hashes, includeBranchInfo)
+
+	var r0 []*vcsinfo.LongCommit
+	if rf, ok := ret.Get(0).(func(context.Context, []string, bool) []*vcsinfo.LongCommit); ok {
+		r0 = rf(ctx, hashes, includeBranchInfo)
+	} else {
+		if ret.Get(0) != nil {
+			r0 = ret.Get(0).([]*vcsinfo.LongCommit)
+		}
+	}
+
+	var r1 error
+	if rf, ok := ret.Get(1).(func(context.Context, []string, bool) error); ok {
+		r1 = rf(ctx, hashes, includeBranchInfo)
+	} else {
+		r1 = ret.Error(1)
+	}
+
+	return r0, r1
+}
+
+// From provides a mock function with given fields: start
+func (_m *VCS) From(start time.Time) []string {
+	ret := _m.Called(start)
+
+	var r0 []string
+	if rf, ok := ret.Get(0).(func(time.Time) []string); ok {
+		r0 = rf(start)
+	} else {
+		if ret.Get(0) != nil {
+			r0 = ret.Get(0).([]string)
+		}
+	}
+
+	return r0
+}
+
+// GetBranch provides a mock function with given fields:
+func (_m *VCS) GetBranch() string {
+	ret := _m.Called()
+
+	var r0 string
+	if rf, ok := ret.Get(0).(func() string); ok {
+		r0 = rf()
+	} else {
+		r0 = ret.Get(0).(string)
+	}
+
+	return r0
+}
+
+// GetFile provides a mock function with given fields: ctx, fileName, commitHash
+func (_m *VCS) GetFile(ctx context.Context, fileName string, commitHash string) (string, error) {
+	ret := _m.Called(ctx, fileName, commitHash)
+
+	var r0 string
+	if rf, ok := ret.Get(0).(func(context.Context, string, string) string); ok {
+		r0 = rf(ctx, fileName, commitHash)
+	} else {
+		r0 = ret.Get(0).(string)
+	}
+
+	var r1 error
+	if rf, ok := ret.Get(1).(func(context.Context, string, string) error); ok {
+		r1 = rf(ctx, fileName, commitHash)
+	} else {
+		r1 = ret.Error(1)
+	}
+
+	return r0, r1
+}
+
+// IndexOf provides a mock function with given fields: ctx, hash
+func (_m *VCS) IndexOf(ctx context.Context, hash string) (int, error) {
+	ret := _m.Called(ctx, hash)
+
+	var r0 int
+	if rf, ok := ret.Get(0).(func(context.Context, string) int); ok {
+		r0 = rf(ctx, hash)
+	} else {
+		r0 = ret.Get(0).(int)
+	}
+
+	var r1 error
+	if rf, ok := ret.Get(1).(func(context.Context, string) error); ok {
+		r1 = rf(ctx, hash)
+	} else {
+		r1 = ret.Error(1)
+	}
+
+	return r0, r1
+}
+
+// LastNIndex provides a mock function with given fields: N
+func (_m *VCS) LastNIndex(N int) []*vcsinfo.IndexCommit {
+	ret := _m.Called(N)
+
+	var r0 []*vcsinfo.IndexCommit
+	if rf, ok := ret.Get(0).(func(int) []*vcsinfo.IndexCommit); ok {
+		r0 = rf(N)
+	} else {
+		if ret.Get(0) != nil {
+			r0 = ret.Get(0).([]*vcsinfo.IndexCommit)
+		}
+	}
+
+	return r0
+}
+
+// Range provides a mock function with given fields: begin, end
+func (_m *VCS) Range(begin time.Time, end time.Time) []*vcsinfo.IndexCommit {
+	ret := _m.Called(begin, end)
+
+	var r0 []*vcsinfo.IndexCommit
+	if rf, ok := ret.Get(0).(func(time.Time, time.Time) []*vcsinfo.IndexCommit); ok {
+		r0 = rf(begin, end)
+	} else {
+		if ret.Get(0) != nil {
+			r0 = ret.Get(0).([]*vcsinfo.IndexCommit)
+		}
+	}
+
+	return r0
+}
+
+// ResolveCommit provides a mock function with given fields: ctx, commitHash
+func (_m *VCS) ResolveCommit(ctx context.Context, commitHash string) (string, error) {
+	ret := _m.Called(ctx, commitHash)
+
+	var r0 string
+	if rf, ok := ret.Get(0).(func(context.Context, string) string); ok {
+		r0 = rf(ctx, commitHash)
+	} else {
+		r0 = ret.Get(0).(string)
+	}
+
+	var r1 error
+	if rf, ok := ret.Get(1).(func(context.Context, string) error); ok {
+		r1 = rf(ctx, commitHash)
+	} else {
+		r1 = ret.Error(1)
+	}
+
+	return r0, r1
+}
+
+// Update provides a mock function with given fields: ctx, pull, allBranches
+func (_m *VCS) Update(ctx context.Context, pull bool, allBranches bool) error {
+	ret := _m.Called(ctx, pull, allBranches)
+
+	var r0 error
+	if rf, ok := ret.Get(0).(func(context.Context, bool, bool) error); ok {
+		r0 = rf(ctx, pull, allBranches)
+	} else {
+		r0 = ret.Error(0)
+	}
+
+	return r0
+}
diff --git a/go/vcsinfo/mocks/generate.go b/go/vcsinfo/mocks/generate.go
new file mode 100644
index 0000000..f926ff3
--- /dev/null
+++ b/go/vcsinfo/mocks/generate.go
@@ -0,0 +1,3 @@
+package mocks
+
+//go:generate mockery -name VCS -dir .. -output .
diff --git a/go/vcsinfo/types.go b/go/vcsinfo/types.go
index e6a980c..593ff23 100644
--- a/go/vcsinfo/types.go
+++ b/go/vcsinfo/types.go
@@ -83,7 +83,7 @@
 	// includes 'begin' and excludes 'end'.
 	Range(begin, end time.Time) []*IndexCommit
 
-	// IndexOf returns the index of the commit hash, where 0 is the index of the first hash.
+	// IndexOf returns the index of the commit hash, where 0 is the index of the first commit.
 	IndexOf(ctx context.Context, hash string) (int, error)
 
 	// ByIndex returns a LongCommit describing the commit
diff --git a/golden/go/digest_counter/digest_counter_test.go b/golden/go/digest_counter/digest_counter_test.go
index 0656410..58a2782 100644
--- a/golden/go/digest_counter/digest_counter_test.go
+++ b/golden/go/digest_counter/digest_counter_test.go
@@ -120,19 +120,17 @@
 	AlphaTest = types.TestName("test_alpha")
 	BetaTest  = types.TestName("test_beta")
 
-	x86TestAlphaTraceID = tiling.TraceId("x86:test_alpha:gm")
-	x64TestAlphaTraceID = tiling.TraceId("x86_64:test_alpha:image")
+	// TraceIDs are created like tracestore.TraceIDFromParams
+	x86TestAlphaTraceID = tiling.TraceId(",config=x86,source_type=test_alpha,name=gm")
+	x64TestAlphaTraceID = tiling.TraceId(",config=x86_64,source_type=test_alpha,name=image")
 
-	x64TestBetaTraceID = tiling.TraceId("x86_64:test_beta:image")
+	x64TestBetaTraceID = tiling.TraceId(",config=x86_64,source_type=test_beta,name=image")
 )
 
 func makePartialTileOne() *tiling.Tile {
 	return &tiling.Tile{
 		// Commits, Scale and Tile Index omitted (should not affect things)
-
 		Traces: map[tiling.TraceId]tiling.Trace{
-			// Reminder that the ids for the traces are created by concatenating
-			// all the values in alphabetical order of the keys.
 			x86TestAlphaTraceID: &types.GoldenTrace{
 				Digests: types.DigestSlice{FirstDigest, FirstDigest, SecondDigest},
 				Keys: map[string]string{
diff --git a/golden/go/goldingestion/goldingestion_test.go b/golden/go/goldingestion/goldingestion_test.go
index 137f093..e3d0bdf 100644
--- a/golden/go/goldingestion/goldingestion_test.go
+++ b/golden/go/goldingestion/goldingestion_test.go
@@ -41,6 +41,8 @@
 
 var (
 	// trace ids and values that are contained in the test file.
+	// These trace ids are the old format, new ones are like:
+	//   ,key1=value1,key2=value2
 	TEST_ENTRIES = []struct {
 		key   tiling.TraceId
 		value types.Digest
diff --git a/golden/go/indexer/indexer_test.go b/golden/go/indexer/indexer_test.go
index 094fc4e..9ee68e2 100644
--- a/golden/go/indexer/indexer_test.go
+++ b/golden/go/indexer/indexer_test.go
@@ -247,8 +247,8 @@
 func makeComplexTileWithCrosshatchIgnores() (types.ComplexTile, *tiling.Tile, *tiling.Tile) {
 	fullTile := data.MakeTestTile()
 	partialTile := data.MakeTestTile()
-	delete(partialTile.Traces, "crosshatch:test_alpha:gm")
-	delete(partialTile.Traces, "crosshatch:test_beta:gm")
+	delete(partialTile.Traces, ",device=crosshatch,name=test_alpha,source_type=gm,")
+	delete(partialTile.Traces, ",device=crosshatch,name=test_beta,source_type=gm,")
 
 	ct := types.NewComplexTile(fullTile)
 	ct.SetIgnoreRules(partialTile, []paramtools.ParamSet{
diff --git a/golden/go/paramsets/paramsets_test.go b/golden/go/paramsets/paramsets_test.go
index 2aa9930..03ad627 100644
--- a/golden/go/paramsets/paramsets_test.go
+++ b/golden/go/paramsets/paramsets_test.go
@@ -158,7 +158,7 @@
 		// Commits, Scale and Tile Index omitted (should not affect things)
 		Traces: map[tiling.TraceId]tiling.Trace{
 			// These trace ids have been shortened for test terseness.
-			// A real trace id would be like "8888:gm:foo"
+			// A real trace id would be like ",config=8888,source_type=gm,name=foo,"
 			"a": &types.GoldenTrace{
 				Digests: types.DigestSlice{DigestA, DigestB},
 				Keys: map[string]string{
diff --git a/golden/go/summary/summary_test.go b/golden/go/summary/summary_test.go
index 4b64d0c..1ef9692 100644
--- a/golden/go/summary/summary_test.go
+++ b/golden/go/summary/summary_test.go
@@ -86,7 +86,7 @@
 	tile := &tiling.Tile{
 		Traces: map[tiling.TraceId]tiling.Trace{
 			// These trace ids have been shortened for test terseness.
-			// A real trace id would be like "8888:gm:test_first"
+			// A real trace id would be like ",config=8888,source_type=gm,name=foo,"
 			"a": &types.GoldenTrace{
 				Digests: types.DigestSlice{"aaa", "bbb"},
 				Keys: map[string]string{
diff --git a/golden/go/testutils/data_three_devices/three_devices.go b/golden/go/testutils/data_three_devices/three_devices.go
index fa88386..1c281ebf 100644
--- a/golden/go/testutils/data_three_devices/three_devices.go
+++ b/golden/go/testutils/data_three_devices/three_devices.go
@@ -44,6 +44,10 @@
 
 	AlphaTest = types.TestName("test_alpha")
 	BetaTest  = types.TestName("test_beta")
+
+	AnglerDevice     = "angler"
+	BullheadDevice   = "bullhead"
+	CrosshatchDevice = "crosshatch"
 )
 
 func MakeTestBaseline() *baseline.Baseline {
@@ -95,63 +99,71 @@
 func MakeTestTile() *tiling.Tile {
 	return &tiling.Tile{
 		Commits:   MakeTestCommits(),
-		Scale:     1,
+		Scale:     0, // tile contains every data point.
 		TileIndex: 0,
 
 		Traces: map[tiling.TraceId]tiling.Trace{
-			// Reminder that the ids for the traces are created by concatenating
-			// all the values in alphabetical order of the keys.
-			"angler:test_alpha:gm": &types.GoldenTrace{
+			// Reminder that the ids for the traces are created using the
+			// logic in query.MakeKeyFast
+			",device=angler,name=test_alpha,source_type=gm,": &types.GoldenTrace{
 				Digests: types.DigestSlice{AlphaBad1Digest, AlphaBad1Digest, AlphaGood1Digest},
 				Keys: map[string]string{
-					"device":                "angler",
+					"device":                AnglerDevice,
 					types.PRIMARY_KEY_FIELD: string(AlphaTest),
 					types.CORPUS_FIELD:      "gm",
 				},
 			},
-			"angler:test_beta:gm": &types.GoldenTrace{
+			",device=angler,name=test_beta,source_type=gm,": &types.GoldenTrace{
 				Digests: types.DigestSlice{BetaGood1Digest, BetaGood1Digest, BetaGood1Digest},
 				Keys: map[string]string{
-					"device":                "angler",
+					"device":                AnglerDevice,
 					types.PRIMARY_KEY_FIELD: string(BetaTest),
 					types.CORPUS_FIELD:      "gm",
 				},
 			},
 
-			"bullhead:test_alpha:gm": &types.GoldenTrace{
+			",device=bullhead,name=test_alpha,source_type=gm,": &types.GoldenTrace{
 				Digests: types.DigestSlice{AlphaBad1Digest, AlphaBad1Digest, AlphaUntriaged1Digest},
 				Keys: map[string]string{
-					"device":                "bullhead",
+					"device":                BullheadDevice,
 					types.PRIMARY_KEY_FIELD: string(AlphaTest),
 					types.CORPUS_FIELD:      "gm",
 				},
 			},
-			"bullhead:test_beta:gm": &types.GoldenTrace{
+			",device=bullhead,name=test_beta,source_type=gm,": &types.GoldenTrace{
 				Digests: types.DigestSlice{BetaGood1Digest, BetaGood1Digest, BetaGood1Digest},
 				Keys: map[string]string{
-					"device":                "bullhead",
+					"device":                BullheadDevice,
 					types.PRIMARY_KEY_FIELD: string(BetaTest),
 					types.CORPUS_FIELD:      "gm",
 				},
 			},
 
-			"crosshatch:test_alpha:gm": &types.GoldenTrace{
+			",device=crosshatch,name=test_alpha,source_type=gm,": &types.GoldenTrace{
 				Digests: types.DigestSlice{AlphaBad1Digest, AlphaBad1Digest, AlphaGood1Digest},
 				Keys: map[string]string{
-					"device":                "crosshatch",
+					"device":                CrosshatchDevice,
 					types.PRIMARY_KEY_FIELD: string(AlphaTest),
 					types.CORPUS_FIELD:      "gm",
 				},
 			},
-			"crosshatch:test_beta:gm": &types.GoldenTrace{
+			",device=crosshatch,name=test_beta,source_type=gm,": &types.GoldenTrace{
 				Digests: types.DigestSlice{BetaUntriaged1Digest, types.MISSING_DIGEST, types.MISSING_DIGEST},
 				Keys: map[string]string{
-					"device":                "crosshatch",
+					"device":                CrosshatchDevice,
 					types.PRIMARY_KEY_FIELD: string(BetaTest),
 					types.CORPUS_FIELD:      "gm",
 				},
 			},
 		},
+
+		// Summarizes all the keys and values seen in this tile
+		// The values should be in alphabetical order (see paramset.Normalize())
+		ParamSet: map[string][]string{
+			"device":                {AnglerDevice, BullheadDevice, CrosshatchDevice},
+			types.PRIMARY_KEY_FIELD: {string(AlphaTest), string(BetaTest)},
+			types.CORPUS_FIELD:      {"gm"},
+		},
 	}
 }
 
diff --git a/golden/go/tracestore/bt_tracestore/.gitignore b/golden/go/tracestore/bt_tracestore/.gitignore
new file mode 100644
index 0000000..d383c56
--- /dev/null
+++ b/golden/go/tracestore/bt_tracestore/.gitignore
@@ -0,0 +1 @@
+testdata
diff --git a/golden/go/tracestore/bt_tracestore/BIGTABLE.md b/golden/go/tracestore/bt_tracestore/BIGTABLE.md
new file mode 100644
index 0000000..ac6cec6
--- /dev/null
+++ b/golden/go/tracestore/bt_tracestore/BIGTABLE.md
@@ -0,0 +1,209 @@
+Storing Gold traces in BigTable
+===============================
+
+This implementation is based on the original btts.go made for Perf.
+See perf/BIGTABLE.md for an overview of that schema.
+
+Note that we assume our Big Table instance is strongly consistent, which is
+only valid if there is no replication (which we shouldn't need for now).
+
+Data we want to store
+---------------------
+
+We need to store traces, which are arranged in a table.
+Traces have commits as their columns, a tiling.traceID as their rows, and
+a digest as their cells. These traceIDs are derived from the key:value pairs
+associated with the digest, e.g. {"os":"Android", "gpu": "NVidia1080"}
+
+Gold has an idea of a tile, which is a collection of N of these trace columns (i.e. commits).
+
+
+Mapping the data to BigTable
+----------------------------
+
+BigTable (BT) stores its data as one big table. Concretely, we have
+
+```
+     | col_1  | col_2  | col_3  | ...
+==================================
+row_1| [data] | [data] | [data] | ...
+row_2| [data] | [data] | [data] | ...
+...
+```
+
+For performance, BT provides [Column Families](https://cloud.google.com/bigtable/docs/schema-design#column_families_and_column_qualifiers)
+which group columns together so you can query some cells from a row belonging
+to that family w/o getting everything. As an analogy, if the whole table is a Sheets
+workbook, a column family can be thought of as a single spreadsheet in that workbook (at least,
+for our purposes).
+
+BT uses the term "Column Qualifier", which essentially just means "column name", because they
+can be arbitrary strings.
+
+It's tempting to just put the commit hashes as columns, the trace ids as the row names and store the
+digests in the cells, but this isn't ideal for these reasons:
+
+  1. traceIDs get really long with lots of params - BT has a limit of 4kb per
+     row name (aka a row key).
+  2. digests are long strings that are repeated a lot - if we stored them as int64s, it would save
+     a lot of data when we fetch a row (32 bytes per cell -> 8 bytes per cell).
+  3. BT has a soft limit of 100Mb of data per row.
+  4. BT will perform better if each row contains only the columns we are interested in
+    reading. Since we only really want to fetch the last few hundred commits, we would
+    like to limit each row to contain enough data for a single tile.
+    See also https://cloud.google.com/bigtable/docs/performance#slower-perf
+  5. We want to split our rows up further into shards, so we can execute multiple queries at once
+     (one query per shard).
+
+To address these performance issues, we need to store our traces and some auxiliary data.
+
+  1. An OrderedParamSet that can convert tiling.TraceId (long string) <->
+     EncodedTraceId (short string).
+  2. A DigestMap that can convert types.Digest (string) <-> DigestID (int64).
+  3. A monotonically increasing counter to derive more DigestIDs.
+
+These 3 "tables" along with the traces will be stored using 4 Column Families and can
+logically be thought of as being stored as independent "tables" or "spreadsheets" even
+though they are all stored in the "one big table".
+
+Row naming scheme
+-----------------
+There is a row naming scheme (for all 4 tables) as follows:
+
+    [shard]:[namespace]:[type]:[tile]:[subkey]
+
+Where shard and subkey are optional (can be ""). Some tables have tile with a constant value.
+"namespace" is constant for all tracestore data: "ts". Reminder that there's one table per Gold
+instance, so if we store other data to BT (e.g. expectations, tryjobs, etc) we need to have
+several unique namespaces.
+
+tile is a 0 padded 32 bit number (2^32-1) - [tile number].
+For example, tile 0 (the oldest commits) is number `2147483646`.
+Note that this reverses the order of the tiles, i.e. new tiles have
+smaller numbers, so that we can do a simple query to find the newest tile.
+
+BigTable can easily fetch rows starting with a given prefix, so this naming schema
+is set up to request things of a type for one or more tiles, with optional sharding.
+
+Note that sharding is a thing we have enabled by our choice of row names, not something
+given out for free by BigTable.
+
+Gold (and Perf) shards its data based on the subkey (conceptually subkey % 32).
+This makes the workload be spread more evenly, even when fetching only one tile.
+The shards come first in the naming convention to try to spread the rows across multiple
+tablets for better performance (rows on BT are stored on tablets sorted by the entire row name).
+
+Storing the OrderedParamSet for traceIDs
+----------------------------------------
+As mentioned above, traceIDs are derived from the paramtools.Params map of key-value pairs.
+We compress these maps using a paramtools.OrderedParamSet, which concretely look like:
+
+    ,0=1,4=2,3=1,
+
+To do this compression/decompression, we need to store the OrderedParamSet (OPS).
+There is one OPS per tile. Conceptually, an OPS is stored like:
+```
+           |   OPS   |    H   |
+==============================
+ops_tile0 | [bytes] | [hash] |
+ops_tile1 | [bytes] | [hash] |
+...
+```
+
+The bytes stored under the "OPS" column are just a gob encoded OrderedParamSet and
+the hash stored under the "H" column is the hash of the OPS, used to query the row when updating.
+
+As mentioned before, ops_tile0 is a conceptual simplification of the actual
+row name. Given the row naming schema, we define "type" for OPS to be "o" and
+let "shard" and "subkey" both be "".
+Thus, the actual row for tile 0 would be
+
+    :ts:o:2147483646:
+
+
+Storing the Digest Map
+----------------------
+We need to store a (potentially large) map of types.Digest (string) <-> DigestID (int64).
+This map is global across tiles. Conceptually, it is stored like:
+```
+        | [digest1] | [digest2] | [digest3] | ...
+====================================================
+map_000 |  [blank]  |  [int64]  |  [blank]  |
+map_001 |  [int64]  |  [blank]  |  [blank]  |
+...
+map_ffe |  [blank]  |  [blank]  |  [blank]  |
+map_fff |  [blank]  |  [blank]  |  [int64]  |
+...
+```
+
+Basically, we take a digest, chop off the first three characters, use those as
+the "subkey" in the row so we can make sure our rows don't exceed the maximum size.
+Given the soft row limit of 100Mb, a digest column + a cell is at most 40 bytes,
+which means a soft limit of 2.5 million digests per row. Splitting on 3 hex
+characters means we have 4096-way splitting, so a soft limit of 10 billion digests.
+
+Why not just have rows be the digests and a single column with the id? BT can only fetch
+so many rows per second (see https://cloud.google.com/bigtable/docs/performance#typical-workloads)
+If we had a million digests in the single column schema, one SSD node would take 100 seconds
+to request those million rows, where for the three character schema, it would take about .4 seconds.
+
+Given that we define "type" for the digest map to be "d", an
+example row for digest "92eb5ffee6ae2fec3ad71c777531578f"
+(assume this subkey is tied to shard 19) would be:
+
+    19:ts:d:0000000000:92e
+
+The ids of digests start at 0 (for MISSING_DIGEST aka "") and increase monotonically.
+We manage these ids ourselves because using the autogenerated ids can lead to issues when
+migrating data from one table to another.
+
+Storing the Digest ID counter
+-----------------------------
+We assign newly seen digests an id that is simply an increasing int64. To store
+this int64 in BT, we essentially have a single cell dedicated to this:
+
+```
+               |    idc   |
+====================================================
+digest_counter |  [int64] |
+```
+We have one global id counter to go with the one global digestMap.
+
+The interfacing code will take care not to constantly update this value (BT frowns upon
+having very "hot" rows/cells) by requesting new ids in batches and saving them locally.
+
+There's only one row for the counter, which is:
+
+    :ts:i:0000000000:
+
+Storing the traces
+------------------
+
+With all the auxiliary data set, we can look at how the traces themselves are stored.
+Going with the default tile size of 256, the data would be like:
+
+```
+             | offset0 | offset1 | ... | offset255
+====================================================
+tile0_trace0 | [dig_1] | [dig_1] | ... | [dig_2] |
+tile0_trace1 | [dig_1] | [dig_2] | ... | [dig_2] |
+...
+tile0_traceN | [dig_8] | [blank] | ... | [dig_6] |
+tile1_trace0 | [blank] | [dig_2] | ... | [blank] |
+...
+```
+
+The columns are the offset into a tile of a commit. For example, the third commit in a repo would
+end up in tile 0, offset 3. The 1000th commit with a tile size of 256 would be in
+tile 3 (1000 / 256), offset 232 (1000 % 256). This has the effect of wrapping a given
+trace across many tiles.
+
+The rows follow the standard naming scheme, using "t" as "type", and making use of the shards
+(32 by default). The value for "subkey" is the encoded ParamSet (and from this subkey a shard
+is derived). An example row for encoded ParamSet ",0=1,1=3,3=0," on tile 0 (assume shard
+calculates to be 7) would be:
+
+    07:ts:t:2147483646:,0=1,1=3,3=0,
+
+The cells are the int64 ids of the digest that were drawn according to that ParamSet.
+Blank cells will be read as id 0, which is hard-coded to belong to MISSING_DIGEST ("").
diff --git a/golden/go/tracestore/bt_tracestore/bt_tracestore.go b/golden/go/tracestore/bt_tracestore/bt_tracestore.go
new file mode 100644
index 0000000..22f0069
--- /dev/null
+++ b/golden/go/tracestore/bt_tracestore/bt_tracestore.go
@@ -0,0 +1,992 @@
+// Package bt_tracestore implements a tracestore backed by BigTable
+// See BIGTABLE.md for an overview of the schema and design.
+package bt_tracestore
+
+import (
+	"context"
+	"encoding/binary"
+	"hash/crc32"
+	"sort"
+	"strconv"
+	"strings"
+	"sync"
+	"sync/atomic"
+	"time"
+
+	"cloud.google.com/go/bigtable"
+	"go.skia.org/infra/go/bt"
+	"go.skia.org/infra/go/metrics2"
+	"go.skia.org/infra/go/paramtools"
+	"go.skia.org/infra/go/skerr"
+	"go.skia.org/infra/go/sklog"
+	"go.skia.org/infra/go/tiling"
+	"go.skia.org/infra/go/util"
+	"go.skia.org/infra/go/vcsinfo"
+	"go.skia.org/infra/golden/go/tracestore"
+	"go.skia.org/infra/golden/go/types"
+	"golang.org/x/sync/errgroup"
+)
+
+// InitBT initializes the BT instance for the given configuration. It uses the default way
+// to get auth information from the environment and must be called with an account that has
+// admin rights.
+//
+// NOTE(review): presumably bt.InitBigtable creates the table and the btColumnFamilies if
+// they do not exist yet and is idempotent — confirm against go/bt before relying on this.
+func InitBT(conf BTConfig) error {
+	return bt.InitBigtable(conf.ProjectID, conf.InstanceID, conf.TableID, btColumnFamilies)
+}
+
+// BTConfig contains the configuration information for the BigTable-based implementation of
+// TraceStore.
+type BTConfig struct {
+	// ProjectID is the GCP project that hosts the BigTable instance.
+	ProjectID  string
+	// InstanceID is the BigTable instance to connect to.
+	InstanceID string
+	// TableID is the BigTable table in which the trace data is stored.
+	TableID    string
+	// VCS provides commit information (e.g. hash <-> repo index lookups).
+	VCS        vcsinfo.VCS
+}
+
+// BTTraceStore implements the TraceStore interface.
+type BTTraceStore struct {
+	vcs    vcsinfo.VCS
+	client *bigtable.Client
+	table  *bigtable.Table
+
+	// tileSize is the number of commits per tile, i.e. the number of columns
+	// a trace row can have (see BIGTABLE.md).
+	tileSize int32
+	// shards is how many ways the rows are sharded to spread load.
+	shards   int32
+
+	// if cacheOps is true, then cache the OrderedParamSets between calls
+	// where possible.
+	cacheOps bool
+	// maps rowName (string) -> *OpsCacheEntry
+	opsCache sync.Map
+
+	// availIDsMutex guards availIDs: digest ids reserved from the global BT
+	// counter (see getIDs) but not yet assigned to a digest.
+	availIDsMutex sync.Mutex
+	availIDs      []digestID
+}
+
+// New implements the TraceStore interface backed by BigTable. If cache is true,
+// the OrderedParamSets will be cached based on the row name.
+func New(ctx context.Context, conf BTConfig, cache bool) (*BTTraceStore, error) {
+	client, err := bigtable.NewClient(ctx, conf.ProjectID, conf.InstanceID)
+	if err != nil {
+		return nil, skerr.Fmt("could not instantiate client: %s", err)
+	}
+
+	// tileSize and shards are fixed at construction; see types.go for the defaults.
+	return &BTTraceStore{
+		vcs:      conf.VCS,
+		client:   client,
+		table:    client.Open(conf.TableID),
+		tileSize: DefaultTileSize,
+		shards:   DefaultShards,
+		cacheOps: cache,
+		availIDs: []digestID{},
+	}, nil
+}
+
+// Put implements the TraceStore interface.
+// It writes one digest per entry into the cell addressed by (trace row, commit offset),
+// first making sure the tile's OrderedParamSet and the global digest map cover all the
+// params/digests in the entries.
+func (b *BTTraceStore) Put(ctx context.Context, commitHash string, entries []*tracestore.Entry, ts time.Time) error {
+	defer metrics2.FuncTimer().Stop()
+	// if there are no entries this becomes a no-op.
+	if len(entries) == 0 {
+		return nil
+	}
+
+	// Accumulate all parameters into a paramset and collect all the digests.
+	paramSet := make(paramtools.ParamSet, len(entries[0].Params))
+	digestSet := make(types.DigestSet, len(entries))
+	for _, entry := range entries {
+		paramSet.AddParams(entry.Params)
+		digestSet[entry.Digest] = true
+	}
+
+	repoIndex, err := b.vcs.IndexOf(ctx, commitHash)
+	if err != nil {
+		return skerr.Fmt("could not look up commit %s: %s", commitHash, err)
+	}
+
+	// Find out what tile we need to fetch and what index into that tile we need.
+	// Reminder that tileKeys start at 2^32-1 and decrease in value.
+	tileKey, commitIndex := b.getTileKey(repoIndex)
+
+	// If these entries have any params we haven't seen before, we need to store those in BigTable.
+	ops, err := b.updateOrderedParamSet(ctx, tileKey, paramSet)
+	if err != nil {
+		sklog.Warningf("Bad paramset: %#v", paramSet)
+		return skerr.Fmt("cannot update paramset: %s", err)
+	}
+
+	// Similarly, if we have some new digests (almost certainly), we need to update
+	// the digestMap with them in there. Of note, we store this
+	// map of string (types.Digest) -> int64(DigestId) in big table, then refer to
+	// the DigestID elsewhere in the table. DigestIds are essentially a monotonically
+	// increasing arbitrary number.
+	digestMap, err := b.updateDigestMap(ctx, digestSet)
+	if err != nil {
+		sklog.Warningf("Bad digestSet: %#v", digestSet)
+		return skerr.Fmt("cannot update digest map: %s", err)
+	}
+
+	metrics2.GetInt64Metric("gold_digest_map_size").Update(int64(digestMap.Len()))
+
+	// Sanity check: after updateDigestMap, every digest we are about to write
+	// must have an id.
+	if len(digestMap.Delta(digestSet)) != 0 {
+		// Should never happen
+		return skerr.Fmt("delta should be empty at this point: %v", digestMap.Delta(digestSet))
+	}
+
+	// These are two parallel arrays. mutations[i] should be applied to rowNames[i] for all i.
+	rowNames, mutations, err := b.createPutMutations(entries, ts, tileKey, commitIndex, ops, digestMap)
+	if err != nil {
+		return skerr.Fmt("could not create mutations to put data: %s", err)
+	}
+
+	// Write the trace data. We pick a batchsize based on the assumption
+	// that the whole batch should be 2MB large and each entry is ~200 Bytes of data.
+	// 2MB / 200B = 10000. This is extremely conservative but should not be a problem
+	// since the batches are written in parallel.
+	return b.applyBulkBatched(ctx, rowNames, mutations, 10000)
+}
+
+// createPutMutations is a helper function that returns two parallel arrays of
+// the rows that need updating and the mutations to apply to those rows.
+// Specifically, the mutations will add the given entries to BT, clearing out
+// anything that was there previously.
+func (b *BTTraceStore) createPutMutations(entries []*tracestore.Entry, ts time.Time, tk tileKey, commitIndex int, ops *paramtools.OrderedParamSet, dm *digestMap) ([]string, []*bigtable.Mutation, error) {
+	// Parallel slices: mutations[i] applies to rowNames[i].
+	rowNames := make([]string, 0, len(entries))
+	mutations := make([]*bigtable.Mutation, 0, len(entries))
+
+	writeTS := bigtable.Time(ts)
+	// Anything written to the cell strictly before ts is considered stale.
+	deleteBefore := bigtable.Time(ts.Add(-1 * time.Millisecond))
+	// The column is the commit's offset within the tile; it is the same for
+	// every entry in this call, so compute it once.
+	column := strconv.Itoa(commitIndex)
+
+	for _, entry := range entries {
+		// To save space, traceID isn't the long form tiling.TraceId
+		// (e.g. ,foo=bar,baz=gm,); it's a string of key-value numbers that
+		// refer to the params (e.g. ,0=3,2=18,). See params.paramsEncoder.
+		encoded, err := ops.EncodeParamsAsString(entry.Params)
+		if err != nil {
+			return nil, nil, skerr.Fmt("invalid params: %s", err)
+		}
+		traceID := encodedTraceID(encoded)
+
+		dID, err := dm.ID(entry.Digest)
+		if err != nil {
+			// this should never happen, the digest map should know about every digest already.
+			return nil, nil, skerr.Fmt("could not fetch id for digest %s: %s", entry.Digest, err)
+		}
+		dBytes, err := dID.MarshalBinary()
+		if err != nil {
+			// this should never happen, we are just marshalling an int to binary
+			return nil, nil, skerr.Fmt("could not encode digest id %d to bytes: %s", dID, err)
+		}
+
+		// Put the digest id at the row for this trace+tile, in the column for
+		// this commit offset, and delete whatever was in that cell before now.
+		mut := bigtable.NewMutation()
+		mut.Set(traceFamily, column, writeTS, dBytes)
+		mut.DeleteTimestampRange(traceFamily, column, 0, deleteBefore)
+
+		rowNames = append(rowNames, b.calcShardedRowName(tk, typeTrace, string(traceID)))
+		mutations = append(mutations, mut)
+	}
+	return rowNames, mutations, nil
+}
+
+// GetTile implements the TraceStore interface.
+// Of note, due to this request possibly spanning over multiple tiles, the ParamsSet may have a
+// set of params that does not actually correspond to a trace (this shouldn't be a problem, but is
+// worth calling out). For example, suppose a trace with param " device=alpha" abruptly ends on
+// tile 4, commit 7 (where the device was removed from testing). If we are on tile 5 and need to
+// query both tile 4 starting at commit 10 and tile 5 (the whole thing), we'll just merge the
+// paramsets from both tiles, which includes the "device=alpha" params, but they don't exist in
+// any traces seen in the tile (since it ended prior to our cutoff point).
+func (b *BTTraceStore) GetTile(ctx context.Context, nCommits int) (*tiling.Tile, []*tiling.Commit, error) {
+	defer metrics2.FuncTimer().Stop()
+	// Look up the commits we need to query from BT
+	idxCommits := b.vcs.LastNIndex(nCommits)
+	if len(idxCommits) == 0 {
+		return nil, nil, skerr.Fmt("No commits found.")
+	}
+
+	// These commits could span across multiple tiles, so derive the tiles we need to query.
+	c := idxCommits[0]
+	startTileKey, startCommitIndex := b.getTileKey(c.Index)
+
+	c = idxCommits[len(idxCommits)-1]
+	endTileKey, endCommitIndex := b.getTileKey(c.Index)
+
+	var egroup errgroup.Group
+
+	// Fetch the commit details (author information etc.) in parallel with the traces.
+	var commits []*tiling.Commit
+	egroup.Go(func() error {
+		hashes := make([]string, 0, len(idxCommits))
+		for _, ic := range idxCommits {
+			hashes = append(hashes, ic.Hash)
+		}
+		var err error
+		commits, err = b.makeTileCommits(ctx, hashes)
+		if err != nil {
+			return skerr.Fmt("could not load tile commits: %s", err)
+		}
+		return nil
+	})
+
+	var traces traceMap
+	var params paramtools.ParamSet
+	egroup.Go(func() error {
+		var err error
+		traces, params, err = b.getTracesInRange(ctx, startTileKey, endTileKey, startCommitIndex, endCommitIndex)
+		if err != nil {
+			return skerr.Fmt("could not load traces in range: %s", err)
+		}
+		return nil
+	})
+
+	if err := egroup.Wait(); err != nil {
+		return nil, nil, skerr.Fmt("could not load last %d commits into tile: %s", nCommits, err)
+	}
+
+	ret := &tiling.Tile{
+		Traces:   traces,
+		ParamSet: params,
+		Commits:  commits,
+		Scale:    0,
+	}
+
+	return ret, commits, nil
+}
+
+// getTracesInRange returns a traceMap with data from the given start and stop points (tile and index).
+// It also includes the ParamSet for that range.
+// Reminder: tileKeys decrease as tiles get newer, so startTileKey >= endTileKey here.
+func (b *BTTraceStore) getTracesInRange(ctx context.Context, startTileKey, endTileKey tileKey, startCommitIndex, endCommitIndex int) (traceMap, paramtools.ParamSet, error) {
+	// Query those tiles.
+	nTiles := int(startTileKey - endTileKey + 1)
+	// nCommits is the total width of the returned traces: whole tiles in between
+	// plus the partial spans at either end.
+	nCommits := int(startTileKey-endTileKey)*int(b.tileSize) + (endCommitIndex - startCommitIndex) + 1
+	encTiles := make([]*encTile, nTiles)
+	var egroup errgroup.Group
+	tk := startTileKey
+	for idx := 0; idx < nTiles; idx++ {
+		// idx and tk are passed as arguments so each goroutine captures its own copy.
+		func(idx int, tk tileKey) {
+			egroup.Go(func() error {
+				var err error
+				encTiles[idx], err = b.loadTile(ctx, tk)
+				if err != nil {
+					return skerr.Fmt("could not load tile with key %d to index %d: %s", tk, idx, err)
+				}
+				return nil
+			})
+		}(idx, tk)
+		tk--
+	}
+
+	// Load the global digest id -> digest mapping in parallel with the tiles.
+	var digestMap *digestMap
+	egroup.Go(func() error {
+		var err error
+		digestMap, err = b.getDigestMap(ctx)
+		if err != nil {
+			return skerr.Fmt("could not load digestMap: %s", err)
+		}
+		return nil
+	})
+
+	if err := egroup.Wait(); err != nil {
+		return nil, nil, skerr.Fmt("could not load %d tiles: %s", nTiles, err)
+	}
+
+	// This is the full tile we are going to return.
+	// nTiles >= 1, so indexing encTiles[0] is safe.
+	tileTraces := make(traceMap, len(encTiles[0].traces))
+	paramSet := paramtools.ParamSet{}
+
+	// commitIDX is where the current tile's segment begins in the output traces.
+	commitIDX := 0
+	for idx, encTile := range encTiles {
+		// Determine the offset within the tile that we should consider.
+		endOffset := int(b.tileSize - 1)
+		if idx == (len(encTiles) - 1) {
+			// If we are on the last tile, stop early (that is, at endCommitIndex)
+			endOffset = endCommitIndex
+		}
+		segLen := endOffset - startCommitIndex + 1
+
+		for encodedKey, encValues := range encTile.traces {
+			// at this point, the encodedKey looks like ,0=1,1=3,3=0,
+			// See params.paramsEncoder
+			params, err := encTile.ops.DecodeParamsFromString(string(encodedKey))
+			if err != nil {
+				sklog.Warningf("Incomplete OPS: %#v\n", encTile.ops)
+				return nil, nil, skerr.Fmt("corrupted trace key - could not decode %s: %s", encodedKey, err)
+			}
+
+			// Turn the params into the tiling.TraceId we expect elsewhere.
+			traceKey := tracestore.TraceIDFromParams(params)
+			if _, ok := tileTraces[traceKey]; !ok {
+				tileTraces[traceKey] = types.NewGoldenTraceN(nCommits)
+			}
+			gt := tileTraces[traceKey].(*types.GoldenTrace)
+			gt.Keys = params
+			// Build up the total set of params
+			paramSet.AddParams(params)
+
+			// Convert the digests from integer IDs to strings.
+			digestIDs := encValues[startCommitIndex : startCommitIndex+segLen]
+			digests, err := digestMap.DecodeIDs(digestIDs)
+			if err != nil {
+				return nil, nil, skerr.Fmt("corrupted digest id - could not decode: %s", err)
+			}
+			copy(gt.Digests[commitIDX:commitIDX+segLen], digests)
+		}
+
+		// After the first tile we always start at the first entry and advance the
+		// overall commit index by the segment length.
+		commitIDX += segLen
+		startCommitIndex = 0
+	}
+
+	// Sort the params for determinism.
+	paramSet.Normalize()
+
+	return tileTraces, paramSet, nil
+}
+
+// GetDenseTile implements the TraceStore interface. It fetches the most recent tile and sees if
+// there is enough non-empty data, then queries the next oldest tile until it has nCommits
+// non-empty commits.
+func (b *BTTraceStore) GetDenseTile(ctx context.Context, nCommits int) (*tiling.Tile, []*tiling.Commit, error) {
+	defer metrics2.FuncTimer().Stop()
+	// Figure out what index we are on.
+	idxCommits := b.vcs.LastNIndex(1)
+	if len(idxCommits) == 0 {
+		return nil, nil, skerr.Fmt("No commits found.")
+	}
+
+	c := idxCommits[0]
+	tKey, endIdx := b.getTileKey(c.Index)
+	// tileStartCommitIdx is the repo index of the first (oldest) commit in the
+	// tile currently being examined.
+	tileStartCommitIdx := c.Index - endIdx
+
+	// commitsWithData is a slice of indexes of commits that have data. These indexes are
+	// relative to the repo itself, with index 0 being the first (oldest) commit in the repo.
+	commitsWithData := make([]int, 0, nCommits)
+	paramSet := paramtools.ParamSet{}
+	allTraces := traceMap{}
+
+	// Start at the most recent tile and step backwards until we have enough commits with data.
+	for {
+		traces, params, err := b.getTracesInRange(ctx, tKey, tKey, 0, endIdx)
+
+		if err != nil {
+			return nil, nil, skerr.Fmt("could not load commits from tile %d: %s", tKey, err)
+		}
+
+		paramSet.AddParamSet(params)
+		// filledCommits are the indexes in the traces that have data.
+		// That is, they are the indexes of commits in this tile.
+		// It will be sorted from low indexes to high indexes
+		filledCommits := traces.CommitIndicesWithData()
+
+		if len(filledCommits)+len(commitsWithData) > nCommits {
+			targetLength := nCommits - len(commitsWithData)
+			// trim filledCommits so we get to exactly nCommits, keeping the
+			// newest (highest-index) entries of this tile.
+			filledCommits = filledCommits[len(filledCommits)-targetLength:]
+		}
+
+		for _, tileIdx := range filledCommits {
+			commitsWithData = append(commitsWithData, tileStartCommitIdx+tileIdx)
+		}
+		cTraces := traces.MakeFromCommitIndexes(filledCommits)
+		// Prepend because we are walking from newest tile to oldest.
+		allTraces.PrependTraces(cTraces)
+
+		// Stop once we have enough data or we've reached the repo's first tile.
+		if len(commitsWithData) >= nCommits || tKey == tileKeyFromIndex(0) {
+			break
+		}
+
+		tKey++                       // go backwards in time one tile
+		endIdx = int(b.tileSize - 1) // fetch the whole previous tile
+		tileStartCommitIdx -= int(b.tileSize)
+	}
+
+	if len(commitsWithData) == 0 {
+		return &tiling.Tile{}, nil, nil
+	}
+	// put them in oldest to newest order
+	sort.Ints(commitsWithData)
+
+	oldestIdx := commitsWithData[0]
+	oldestCommit, err := b.vcs.ByIndex(ctx, oldestIdx)
+	if err != nil {
+		return nil, nil, skerr.Fmt("invalid oldest index %d: %s", oldestIdx, err)
+	}
+	hashes := b.vcs.From(oldestCommit.Timestamp.Add(-1 * time.Millisecond))
+
+	// There's no guarantee that hashes[0] == oldestCommit[0] (e.g. two commits at same timestamp)
+	// So we trim hashes down if necessary
+	for i := 0; i < len(hashes); i++ {
+		if hashes[i] == oldestCommit.Hash {
+			hashes = hashes[i:]
+			break
+		}
+	}
+
+	allCommits, err := b.makeTileCommits(ctx, hashes)
+	if err != nil {
+		return nil, nil, skerr.Fmt("could not make tile commits: %s", err)
+	}
+
+	// denseCommits keeps only the commits that actually have data; allCommits
+	// is indexed relative to oldestIdx.
+	denseCommits := make([]*tiling.Commit, len(commitsWithData))
+	for i, idx := range commitsWithData {
+		denseCommits[i] = allCommits[idx-oldestIdx]
+	}
+
+	ret := &tiling.Tile{
+		Traces:   allTraces,
+		ParamSet: paramSet,
+		Commits:  denseCommits,
+		Scale:    0,
+	}
+	return ret, allCommits, nil
+}
+
+// getTileKey retrieves the tile key and the index of the commit in the given tile (commitIndex)
+// given the index of a commit in the repo (repoIndex).
+// commitIndex starts at 0 for the oldest commit in the tile.
+func (b *BTTraceStore) getTileKey(repoIndex int) (tileKey, int) {
+	size := int(b.tileSize)
+	return tileKeyFromIndex(int32(repoIndex / size)), repoIndex % size
+}
+
+// loadTile returns an *encTile corresponding to the tileKey.
+func (b *BTTraceStore) loadTile(ctx context.Context, tileKey tileKey) (*encTile, error) {
+	defer metrics2.FuncTimer().Stop()
+
+	// Fetch the OrderedParamSet (needed by callers to decode the trace ids)
+	// and the encoded traces concurrently.
+	var (
+		egroup errgroup.Group
+		ops    *paramtools.OrderedParamSet
+		traces map[encodedTraceID][]digestID
+	)
+
+	egroup.Go(func() error {
+		entry, _, err := b.getOPS(ctx, tileKey)
+		if err != nil {
+			return skerr.Fmt("could not load OPS: %s", err)
+		}
+		ops = entry.ops
+		return nil
+	})
+
+	egroup.Go(func() error {
+		loaded, err := b.loadEncodedTraces(ctx, tileKey)
+		if err != nil {
+			return skerr.Fmt("could not load traces: %s", err)
+		}
+		traces = loaded
+		return nil
+	})
+
+	if err := egroup.Wait(); err != nil {
+		return nil, err
+	}
+	return &encTile{ops: ops, traces: traces}, nil
+}
+
+// loadEncodedTraces returns all traces belonging to the given tileKey.
+// As outlined in BIGTABLE.md, the trace ids and the digest ids they
+// map to are in an encoded form and will need to be expanded prior to use.
+func (b *BTTraceStore) loadEncodedTraces(ctx context.Context, tileKey tileKey) (map[encodedTraceID][]digestID, error) {
+	defer metrics2.FuncTimer().Stop()
+	var egroup errgroup.Group
+	shardResults := make([]map[encodedTraceID][]digestID, b.shards)
+	traceCount := int64(0)
+
+	// Query all shards in parallel.
+	for shard := int32(0); shard < b.shards; shard++ {
+		func(shard int32) {
+			egroup.Go(func() error {
+				// This prefix will match all traces belonging to the
+				// current shard in the current tile.
+				prefixRange := bigtable.PrefixRange(shardedRowName(shard, typeTrace, tileKey, ""))
+				target := map[encodedTraceID][]digestID{}
+				shardResults[shard] = target
+				// parseErr carries any error out of the row callback; returning
+				// false from the callback stops the scan early.
+				var parseErr error
+				err := b.table.ReadRows(ctx, prefixRange, func(row bigtable.Row) bool {
+					// The encoded trace id is the "subkey" part of the row name.
+					traceKey := encodedTraceID(extractSubkey(row.Key()))
+					// If this is the first time we've seen the trace, initialize the
+					// slice of digest ids for it.
+					if _, ok := target[traceKey]; !ok {
+						target[traceKey] = make([]digestID, b.tileSize)
+						atomic.AddInt64(&traceCount, 1)
+					}
+
+					for _, col := range row[traceFamily] {
+						// The columns are something like T:35 where the part
+						// after the colon is the commitIndex i.e. the index
+						// of this commit in the current tile.
+						idx, err := strconv.Atoi(strings.TrimPrefix(col.Column, traceFamilyPrefix))
+						if err != nil {
+							// Should never happen
+							parseErr = err
+							return false
+						}
+						var dID digestID
+						if err := dID.UnmarshalBinary(col.Value); err != nil {
+							// This should never happen
+							parseErr = err
+							return false
+						}
+						if idx < 0 || idx >= int(b.tileSize) {
+							// This would happen if the tile size changed from a past
+							// value. It shouldn't be changed, even if the Gold tile size
+							// (n_commits) changes.
+							// Report b.tileSize (the slice length) here - len(target) is
+							// the number of traces seen so far, which is not the bound
+							// that idx violated.
+							parseErr = skerr.Fmt("got index %d that is outside of the target slice of length %d", idx, b.tileSize)
+							return false
+						}
+						target[traceKey][idx] = dID
+					}
+					return true
+				}, bigtable.RowFilter(bigtable.LatestNFilter(1)))
+				if err != nil {
+					return skerr.Fmt("could not read rows: %s", err)
+				}
+				return parseErr
+			})
+		}(shard)
+	}
+
+	if err := egroup.Wait(); err != nil {
+		return nil, err
+	}
+
+	// Merge all the results together
+	ret := make(map[encodedTraceID][]digestID, traceCount)
+	for _, r := range shardResults {
+		for traceKey, digestIDs := range r {
+			// different shards should never share results for a tracekey
+			// since a trace always maps to the same shard.
+			ret[traceKey] = digestIDs
+		}
+	}
+
+	return ret, nil
+}
+
+// applyBulkBatched writes the given rowNames/mutation pairs to BigTable in batches that are
+// maximally of size 'batchSize'. The batches are written in parallel.
+func (b *BTTraceStore) applyBulkBatched(ctx context.Context, rowNames []string, mutations []*bigtable.Mutation, batchSize int) error {
+	var egroup errgroup.Group
+	// ChunkIter only partitions the index space; each chunk gets its own goroutine.
+	chunkErr := util.ChunkIter(len(rowNames), batchSize, func(chunkStart, chunkEnd int) error {
+		egroup.Go(func() error {
+			tctx, cancel := context.WithTimeout(ctx, writeTimeout)
+			defer cancel()
+			names := rowNames[chunkStart:chunkEnd]
+			muts := mutations[chunkStart:chunkEnd]
+			errs, err := b.table.ApplyBulk(tctx, names, muts)
+			if err != nil {
+				return skerr.Fmt("error writing batch [%d:%d]: %s", chunkStart, chunkEnd, err)
+			}
+			if errs != nil {
+				return skerr.Fmt("error writing some portions of batch [%d:%d]: %s", chunkStart, chunkEnd, errs)
+			}
+			return nil
+		})
+		return nil
+	})
+	if chunkErr != nil {
+		return skerr.Fmt("error running ChunkIter: %s", chunkErr)
+	}
+	return egroup.Wait()
+}
+
+// calcShardedRowName deterministically assigns a shard for the given subkey (e.g. traceID)
+// Once this is done, the shard, rowtype, tileKey and the subkey are combined into a
+// single string to be used as a row name in BT.
+func (b *BTTraceStore) calcShardedRowName(tileKey tileKey, rowType, subkey string) string {
+	// CRC32 gives a stable hash, so the same subkey always lands on the same shard.
+	checksum := crc32.ChecksumIEEE([]byte(subkey))
+	return shardedRowName(int32(checksum%uint32(b.shards)), rowType, tileKey, subkey)
+}
+
+// rowAndColNameFromDigest splits a digest into a row subkey and a column name.
+// To avoid having one monolithic row, the first three characters of the digest
+// become the subkey in the row and the remainder becomes the column name.
+// In practice this means our digests will be split using three hexadecimal characters, so
+// we will have 16^3 = 4096 rows for our digest map.
+func (b *BTTraceStore) rowAndColNameFromDigest(digest types.Digest) (string, string) {
+	prefix, remainder := string(digest[:3]), string(digest[3:])
+	return b.calcShardedRowName(digestMapTile, typeDigestMap, prefix), remainder
+}
+
+// getDigestMap gets the global (i.e. same for all tiles) digestMap.
+// It scans every shard of the digest map rows (see rowAndColNameFromDigest for
+// how digests are split into row subkey + column) and merges the results.
+func (b *BTTraceStore) getDigestMap(ctx context.Context) (*digestMap, error) {
+	defer metrics2.FuncTimer().Stop()
+	// Query all shards in parallel.
+	var egroup errgroup.Group
+	shardResults := make([]map[types.Digest]digestID, b.shards)
+	total := int64(0)
+	for shard := int32(0); shard < b.shards; shard++ {
+		func(shard int32) {
+			egroup.Go(func() error {
+				prefRange := bigtable.PrefixRange(shardedRowName(shard, typeDigestMap, digestMapTile, ""))
+				var idx int64
+				// parseErr carries any decode error out of the row callback;
+				// returning false from the callback stops the scan early.
+				var parseErr error = nil
+				ret := map[types.Digest]digestID{}
+				err := b.table.ReadRows(ctx, prefRange, func(row bigtable.Row) bool {
+					digestPrefix := extractSubkey(row.Key())
+					for _, col := range row[digestMapFamily] {
+						// The cell value is the decimal string form of the digest id.
+						idx, parseErr = strconv.ParseInt(string(col.Value), 10, 64)
+						if parseErr != nil {
+							// Should never happen
+							return false
+						}
+						// The full digest is the row's subkey + the column name.
+						digest := types.Digest(digestPrefix + strings.TrimPrefix(col.Column, digestMapFamilyPrefix))
+						ret[digest] = digestID(idx)
+					}
+					return true
+				}, bigtable.RowFilter(bigtable.LatestNFilter(1)))
+
+				if err != nil {
+					return skerr.Fmt("problem fetching shard %d of digestmap: %s", shard, err)
+				}
+				if parseErr != nil {
+					return parseErr
+				}
+
+				shardResults[shard] = ret
+				atomic.AddInt64(&total, int64(len(ret)))
+				return nil
+			})
+		}(shard)
+	}
+	if err := egroup.Wait(); err != nil {
+		return nil, skerr.Fmt("problem fetching digestmap: %s", err)
+	}
+
+	ret := newDigestMap(int(total))
+	for _, dm := range shardResults {
+		if err := ret.Add(dm); err != nil {
+			// put the digest map later in the message in case it gets truncated
+			return nil, skerr.Fmt("could not build DigestMap: %s \nresults %#v", err, dm)
+		}
+	}
+	return ret, nil
+}
+
+// getIDs returns a []DigestID of length n where each of the
+// digestIDs are unique (even between processes).
+// Uniqueness comes from a single global counter row in BT that is incremented
+// atomically; ids are reserved in blocks and surplus ids are kept in b.availIDs.
+func (b *BTTraceStore) getIDs(ctx context.Context, n int) ([]digestID, error) {
+	defer metrics2.FuncTimer().Stop()
+	// Extract up to n ids from those we have already cached.
+	b.availIDsMutex.Lock()
+	defer b.availIDsMutex.Unlock()
+	toExtract := util.MinInt(len(b.availIDs), n)
+
+	ids := make([]digestID, 0, n)
+	ids = append(ids, b.availIDs[:toExtract]...)
+	b.availIDs = b.availIDs[toExtract:]
+
+	// missing is how many ids we are short
+	missing := int64(n - len(ids))
+	if missing == 0 {
+		return ids, nil
+	}
+	// For performance reasons, make a few big requests for ids instead of many small ones.
+	// That is, always request numReservedIds extra.
+	toRequest := missing + numReservedIds
+	// Reserve new IDs via the ID counter
+	rmw := bigtable.NewReadModifyWrite()
+	rmw.Increment(idCounterFamily, idCounterColumn, toRequest)
+	row, err := b.table.ApplyReadModifyWrite(ctx, idCounterRow, rmw)
+	if err != nil {
+		return nil, skerr.Fmt("could not fetch counter from BT: %s", err)
+	}
+
+	// ri are the cells in Row of the given counter family
+	// This should be 1 cell belonging to 1 column.
+	ri, ok := row[idCounterFamily]
+	if !ok {
+		// should never happen
+		return nil, skerr.Fmt("malformed response - no id counter family: %#v", ri)
+	}
+	if len(ri) != 1 {
+		// should never happen
+		return nil, skerr.Fmt("malformed response - expected 1 cell: %#v", ri)
+	}
+
+	// maxID is the counter value after our increment, so the block of ids we
+	// just reserved is [maxID-toRequest, maxID).
+	maxID := digestID(binary.BigEndian.Uint64(ri[0].Value))
+
+	lastID := maxID - digestID(toRequest)
+	// ID of 0 is a special case - it's already assigned to MISSING_DIGEST, so skip it.
+	// We still end up with at least `missing` ids because numReservedIds extra were requested.
+	if lastID == missingDigestID {
+		lastID++
+	}
+	for i := lastID; i < maxID; i++ {
+		// Give the first ids to the current allocation request...
+		if missing > 0 {
+			ids = append(ids, i)
+		} else {
+			// ... and put the remainder in the store for later.
+			b.availIDs = append(b.availIDs, i)
+		}
+		missing--
+	}
+
+	return ids, nil
+}
+
+// returnIDs can be called with a []DigestID of ids that were not actually
+// assigned to digests. This allows them to be used by future requests to
+// getIDs.
+func (b *BTTraceStore) returnIDs(unusedIDs []digestID) {
+	b.availIDsMutex.Lock()
+	b.availIDs = append(b.availIDs, unusedIDs...)
+	b.availIDsMutex.Unlock()
+}
+
+// getOrAddDigests fills the given digestMap with the given digests
+// assigned to a DigestID if they don't already have an assignment.
+// This is a helper function for updateDigestMap
+// TODO(kjlubick): This currently makes a lot of requests to BT -
+// Should there be some caching done here to prevent that?
+func (b *BTTraceStore) getOrAddDigests(ctx context.Context, digests []types.Digest, digestMap *digestMap) (*digestMap, error) {
+	defer metrics2.FuncTimer().Stop()
+	// Reserve one candidate id per digest up front; any that turn out to be
+	// unnecessary are returned to the pool below.
+	availIDs, err := b.getIDs(ctx, len(digests))
+	if err != nil {
+		return nil, err
+	}
+
+	now := bigtable.Time(time.Now())
+	newIDMapping := make(map[types.Digest]digestID, len(digests))
+	unusedIDs := make([]digestID, 0, len(availIDs))
+	for idx, digest := range digests {
+		idVal := availIDs[idx]
+		if _, err := digestMap.ID(digest); err == nil {
+			// digestMap already has a mapping for this digest, no need to check
+			// if BT has seen it yet (because it has).
+			// Should never happen because we've already done this check in updateDigestMap.
+			unusedIDs = append(unusedIDs, idVal)
+			continue
+		}
+		rowName, colName := b.rowAndColNameFromDigest(digest)
+		// This mutation says "Add an entry to the map for digest -> idVal iff
+		// the digest doesn't already have a mapping".
+		addMut := bigtable.NewMutation()
+		addMut.Set(digestMapFamily, colName, now, []byte(strconv.FormatInt(int64(idVal), 10)))
+		filter := bigtable.ColumnFilter(colName)
+		// Note that we only add the value if filter is false, i.e. the column does not
+		// already exist.
+		condMut := bigtable.NewCondMutation(filter, nil, addMut)
+		var digestAlreadyHadId bool
+		if err := b.table.Apply(ctx, rowName, condMut, bigtable.GetCondMutationResult(&digestAlreadyHadId)); err != nil {
+			return nil, skerr.Fmt("could not check if row %s col %s already had a DigestID: %s", rowName, colName, err)
+		}
+
+		// We didn't need this ID so let's re-use it later.
+		if digestAlreadyHadId {
+			unusedIDs = append(unusedIDs, idVal)
+		} else {
+			newIDMapping[digest] = idVal
+		}
+	}
+
+	// If all ids were added to BT, then we know our newIDMapping can simply be added
+	// to what we already have, since there were no collisions between digests and what
+	// was in the table already.
+	if len(unusedIDs) == 0 {
+		if err := digestMap.Add(newIDMapping); err != nil {
+			return nil, err
+		}
+		return digestMap, nil
+	}
+	// At this point, some of the digests already had ids, so we should reload
+	// the entire digestMap to make sure we have the full picture.
+	// TODO(kjlubick): Can we not just add what new ones we saw to what we already have?
+
+	// Return the unused IDs for later use.
+	b.returnIDs(unusedIDs)
+	return b.getDigestMap(ctx)
+}
+
+// updateDigestMap returns the current global DigestMap after making sure the given
+// digests are a part of it.
+func (b *BTTraceStore) updateDigestMap(ctx context.Context, digests types.DigestSet) (*digestMap, error) {
+	defer metrics2.FuncTimer().Stop()
+	// Load the digest map from BT.
+	// TODO(kjlubick): should we cache this map and first check to see if the digests
+	// are all in there?
+	dm, err := b.getDigestMap(ctx)
+	if err != nil {
+		return nil, err
+	}
+
+	// Only digests BT hasn't seen yet need ids assigned.
+	if delta := dm.Delta(digests); len(delta) != 0 {
+		return b.getOrAddDigests(ctx, delta, dm)
+	}
+	return dm, nil
+}
+
+// Copied from btts.go in infra/perf
+
+// UpdateOrderedParamSet will add all params from 'p' to the OrderedParamSet
+// for 'tileKey' and write it back to BigTable.
+// It uses conditional mutations keyed on the OPS hash to avoid the lost-update
+// problem, retrying (after clearing the local cache) until the write sticks.
+func (b *BTTraceStore) updateOrderedParamSet(ctx context.Context, tileKey tileKey, p paramtools.ParamSet) (*paramtools.OrderedParamSet, error) {
+	defer metrics2.FuncTimer().Stop()
+
+	tctx, cancel := context.WithTimeout(ctx, writeTimeout)
+	defer cancel()
+	var newEntry *opsCacheEntry
+	for {
+		// Get OPS.
+		entry, existsInBT, err := b.getOPS(ctx, tileKey)
+		if err != nil {
+			return nil, skerr.Fmt("failed to get OPS: %s", err)
+		}
+
+		// If the OPS contains our paramset then we're done.
+		if delta := entry.ops.Delta(p); len(delta) == 0 {
+			return entry.ops, nil
+		}
+
+		// Create a new updated ops.
+		ops := entry.ops.Copy()
+		ops.Update(p)
+		newEntry, err = opsCacheEntryFromOPS(ops)
+		if err != nil {
+			return nil, skerr.Fmt("failed to create cache entry: %s", err)
+		}
+		encodedOps, err := newEntry.ops.Encode()
+		if err != nil {
+			return nil, skerr.Fmt("failed to encode new ops: %s", err)
+		}
+
+		now := bigtable.Time(time.Now())
+		condTrue := false
+		if existsInBT {
+			// Create an update that avoids the lost update problem.
+			// Only apply the mutation if the stored hash still matches the
+			// hash of the OPS we based our update on.
+			cond := bigtable.ChainFilters(
+				bigtable.LatestNFilter(1),
+				bigtable.FamilyFilter(opsFamily),
+				bigtable.ColumnFilter(opsHashColumn),
+				bigtable.ValueFilter(string(entry.hash)),
+			)
+			updateMutation := bigtable.NewMutation()
+			updateMutation.Set(opsFamily, opsHashColumn, now, []byte(newEntry.hash))
+			updateMutation.Set(opsFamily, opsOpsColumn, now, encodedOps)
+
+			// Add a mutation that cleans up old versions.
+			before := bigtable.Time(now.Time().Add(-1 * time.Second))
+			updateMutation.DeleteTimestampRange(opsFamily, opsHashColumn, 0, before)
+			updateMutation.DeleteTimestampRange(opsFamily, opsOpsColumn, 0, before)
+			condUpdate := bigtable.NewCondMutation(cond, updateMutation, nil)
+
+			if err := b.table.Apply(tctx, tileKey.OpsRowName(), condUpdate, bigtable.GetCondMutationResult(&condTrue)); err != nil {
+				sklog.Warningf("Failed to apply: %s", err)
+				return nil, err
+			}
+
+			// If !condTrue then we need to try again,
+			// and clear our local cache.
+			if !condTrue {
+				sklog.Warningf("Exists !condTrue - clearing cache and trying again.")
+				b.opsCache.Delete(tileKey.OpsRowName())
+				continue
+			}
+		} else {
+			// Create an update that only works if the ops entry doesn't exist yet.
+			// I.e. only apply the mutation if the HASH column doesn't exist for this row.
+			cond := bigtable.ChainFilters(
+				bigtable.FamilyFilter(opsFamily),
+				bigtable.ColumnFilter(opsHashColumn),
+			)
+			updateMutation := bigtable.NewMutation()
+			updateMutation.Set(opsFamily, opsHashColumn, now, []byte(newEntry.hash))
+			updateMutation.Set(opsFamily, opsOpsColumn, now, encodedOps)
+
+			condUpdate := bigtable.NewCondMutation(cond, nil, updateMutation)
+			if err := b.table.Apply(tctx, tileKey.OpsRowName(), condUpdate, bigtable.GetCondMutationResult(&condTrue)); err != nil {
+				sklog.Warningf("Failed to apply: %s", err)
+				// clear cache and try again
+				b.opsCache.Delete(tileKey.OpsRowName())
+				continue
+			}
+
+			// If condTrue then we need to try again,
+			// and clear our local cache.
+			// (condTrue means the row existed, i.e. someone else created it first.)
+			if condTrue {
+				sklog.Warningf("First Write condTrue - clearing cache and trying again.")
+				b.opsCache.Delete(tileKey.OpsRowName())
+				continue
+			}
+		}
+
+		// Successfully wrote OPS, so update the cache.
+		if b.cacheOps {
+			b.opsCache.Store(tileKey.OpsRowName(), newEntry)
+		}
+		break
+	}
+	return newEntry.ops, nil
+}
+
+// getOPS returns the OpsCacheEntry for a given tile.
+//
+// Note that it will create a new OpsCacheEntry if none exists.
+//
+// getOPS returns false if the OPS in BT was empty, true otherwise (even if cached).
+func (b *BTTraceStore) getOPS(ctx context.Context, tileKey tileKey) (*opsCacheEntry, bool, error) {
+	defer metrics2.FuncTimer().Stop()
+	// Check the in-process cache first, if enabled.
+	if b.cacheOps {
+		entry, ok := b.opsCache.Load(tileKey.OpsRowName())
+		if ok {
+			return entry.(*opsCacheEntry), true, nil
+		}
+	}
+	tctx, cancel := context.WithTimeout(ctx, readTimeout)
+	defer cancel()
+	row, err := b.table.ReadRow(tctx, tileKey.OpsRowName(), bigtable.RowFilter(bigtable.LatestNFilter(1)))
+	if err != nil {
+		return nil, false, skerr.Fmt("failed to read OPS from BigTable for %s: %s", tileKey.OpsRowName(), err)
+	}
+	// If there is no entry in BigTable then return an empty OPS.
+	if len(row) == 0 {
+		sklog.Warningf("Failed to read OPS from BT for %s.", tileKey.OpsRowName())
+		entry, err := newOpsCacheEntry()
+		return entry, false, err
+	}
+	entry, err := newOpsCacheEntryFromRow(row)
+	// Only cache a successfully-parsed entry.
+	if err == nil && b.cacheOps {
+		b.opsCache.Store(tileKey.OpsRowName(), entry)
+	}
+	return entry, true, err
+}
+
+// makeTileCommits creates a slice of tiling.Commit from the given git hashes.
+// Specifically, we need to look up the details to get the author information.
+// It returns an error if any hash is unknown to the VCS.
+func (b *BTTraceStore) makeTileCommits(ctx context.Context, hashes []string) ([]*tiling.Commit, error) {
+	longCommits, err := b.vcs.DetailsMulti(ctx, hashes, false)
+	if err != nil {
+		// Put the hashes at the end of the message in case the list is long
+		// enough to get truncated - the underlying error is the important part.
+		return nil, skerr.Fmt("could not fetch commit data: %s (hashes: %q)", err, hashes)
+	}
+
+	commits := make([]*tiling.Commit, len(hashes))
+	for i, lc := range longCommits {
+		if lc == nil {
+			return nil, skerr.Fmt("commit %s not found from VCS", hashes[i])
+		}
+		commits[i] = &tiling.Commit{
+			Hash:       lc.Hash,
+			Author:     lc.Author,
+			CommitTime: lc.Timestamp.Unix(),
+		}
+	}
+	return commits, nil
+}
+
+// Make sure BTTraceStore fulfills the TraceStore Interface
+var _ tracestore.TraceStore = (*BTTraceStore)(nil)
diff --git a/golden/go/tracestore/bt_tracestore/bt_tracestore_test.go b/golden/go/tracestore/bt_tracestore/bt_tracestore_test.go
new file mode 100644
index 0000000..56f37cd
--- /dev/null
+++ b/golden/go/tracestore/bt_tracestore/bt_tracestore_test.go
@@ -0,0 +1,990 @@
+package bt_tracestore
+
+import (
+	"context"
+	"fmt"
+	"os"
+	"sync"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/mock"
+	assert "github.com/stretchr/testify/require"
+	"go.skia.org/infra/go/bt"
+	"go.skia.org/infra/go/deepequal"
+	"go.skia.org/infra/go/fileutil"
+	"go.skia.org/infra/go/gcs/gcs_testutils"
+	"go.skia.org/infra/go/sktest"
+	"go.skia.org/infra/go/testutils/unittest"
+	"go.skia.org/infra/go/tiling"
+	"go.skia.org/infra/go/util"
+	"go.skia.org/infra/go/vcsinfo"
+	mock_vcs "go.skia.org/infra/go/vcsinfo/mocks"
+	"go.skia.org/infra/golden/go/serialize"
+	data "go.skia.org/infra/golden/go/testutils/data_three_devices"
+	"go.skia.org/infra/golden/go/tracestore"
+	"go.skia.org/infra/golden/go/types"
+)
+
+// TestBTTraceStorePutGet adds a bunch of entries one at a time and
+// then retrieves the full tile.
+func TestBTTraceStorePutGet(t *testing.T) {
+	unittest.LargeTest(t)
+	unittest.RequiresBigTableEmulator(t)
+
+	commits := data.MakeTestCommits()
+	mvcs := MockVCSWithCommits(commits, 0)
+	defer mvcs.AssertExpectations(t)
+
+	btConf := BTConfig{
+		ProjectID:  "should-use-the-emulator",
+		InstanceID: "testinstance",
+		TableID:    "three_devices_test",
+		VCS:        mvcs,
+	}
+
+	assert.NoError(t, bt.DeleteTables(btConf.ProjectID, btConf.InstanceID, btConf.TableID))
+	assert.NoError(t, InitBT(btConf))
+
+	ctx := context.Background()
+	traceStore, err := New(ctx, btConf, true)
+	assert.NoError(t, err)
+
+	// Before anything is written, GetTile should return a non-nil, empty tile.
+	actualTile, _, err := traceStore.GetTile(ctx, len(commits))
+	assert.NoError(t, err)
+	assert.NotNil(t, actualTile)
+	assert.Empty(t, actualTile.Traces)
+
+	// This time is an arbitrary point in time
+	now := time.Date(2019, time.May, 5, 1, 3, 4, 0, time.UTC)
+
+	// Write every digest of the hand-crafted test tile as its own Put call.
+	for _, trace := range data.MakeTestTile().Traces {
+		gt, ok := trace.(*types.GoldenTrace)
+		assert.True(t, ok)
+
+		// Put them in backwards, just to test that order doesn't matter
+		for i := len(gt.Digests) - 1; i >= 0; i-- {
+			entry := tracestore.Entry{
+				Digest: gt.Digests[i],
+				Params: gt.Keys,
+			}
+			assert.NoError(t, traceStore.Put(ctx, commits[i].Hash, []*tracestore.Entry{&entry}, now))
+			// roll forward the clock by an arbitrary amount of time
+			now = now.Add(7 * time.Second)
+		}
+	}
+
+	// Get the tile back and make sure it exactly matches the tile
+	// we hand-crafted for the test data.
+	actualTile, actualCommits, err := traceStore.GetTile(ctx, len(commits))
+	assert.NoError(t, err)
+
+	assert.Equal(t, data.MakeTestTile(), actualTile)
+	assert.Equal(t, commits, actualCommits)
+}
+
+// TestBTTraceStorePutGetSpanTile is like TestBTTraceStorePutGet except the 3 commits
+// are lined up to go across two tiles.
+func TestBTTraceStorePutGetSpanTile(t *testing.T) {
+	unittest.LargeTest(t)
+	unittest.RequiresBigTableEmulator(t)
+
+	commits := data.MakeTestCommits()
+	// Offsetting the first commit to DefaultTileSize-2 places the three commits
+	// at indices tileSize-2, tileSize-1 and tileSize, i.e. straddling a tile boundary.
+	mvcs := MockVCSWithCommits(commits, DefaultTileSize-2)
+	defer mvcs.AssertExpectations(t)
+
+	btConf := BTConfig{
+		ProjectID:  "should-use-the-emulator",
+		InstanceID: "testinstance",
+		TableID:    "three_devices_test_span",
+		VCS:        mvcs,
+	}
+
+	assert.NoError(t, bt.DeleteTables(btConf.ProjectID, btConf.InstanceID, btConf.TableID))
+	assert.NoError(t, InitBT(btConf))
+
+	ctx := context.Background()
+	traceStore, err := New(ctx, btConf, true)
+	assert.NoError(t, err)
+
+	// With no data, we should get an empty tile
+	actualTile, _, err := traceStore.GetTile(ctx, len(commits))
+	assert.NoError(t, err)
+	assert.NotNil(t, actualTile)
+	assert.Empty(t, actualTile.Traces)
+
+	// This time is an arbitrary point in time
+	now := time.Date(2019, time.May, 5, 1, 3, 4, 0, time.UTC)
+
+	// Build a tile up from the individual data points, one at a time
+	traces := data.MakeTestTile().Traces
+	for _, trace := range traces {
+		gTrace, ok := trace.(*types.GoldenTrace)
+		assert.True(t, ok)
+
+		// Put them in backwards, just to test that order doesn't matter
+		for i := len(gTrace.Digests) - 1; i >= 0; i-- {
+			e := tracestore.Entry{
+				Digest: gTrace.Digests[i],
+				Params: gTrace.Keys,
+			}
+			err := traceStore.Put(ctx, commits[i].Hash, []*tracestore.Entry{&e}, now)
+			assert.NoError(t, err)
+			// roll forward the clock by an arbitrary amount of time
+			now = now.Add(7 * time.Second)
+		}
+	}
+
+	// Get the tile back and make sure it exactly matches the tile
+	// we hand-crafted for the test data.
+	actualTile, actualCommits, err := traceStore.GetTile(ctx, len(commits))
+	assert.NoError(t, err)
+
+	assert.Equal(t, data.MakeTestTile(), actualTile)
+	assert.Equal(t, commits, actualCommits)
+}
+
+// TestBTTraceStorePutGetGrouped adds a bunch of entries batched by device and
+// then retrieves the full Tile.
+func TestBTTraceStorePutGetGrouped(t *testing.T) {
+	unittest.LargeTest(t)
+	unittest.RequiresBigTableEmulator(t)
+
+	commits := data.MakeTestCommits()
+	mvcs := MockVCSWithCommits(commits, 0)
+	defer mvcs.AssertExpectations(t)
+
+	btConf := BTConfig{
+		ProjectID:  "should-use-the-emulator",
+		InstanceID: "testinstance",
+		TableID:    "three_devices_test_grouped",
+		VCS:        mvcs,
+	}
+
+	assert.NoError(t, bt.DeleteTables(btConf.ProjectID, btConf.InstanceID, btConf.TableID))
+	assert.NoError(t, InitBT(btConf))
+
+	ctx := context.Background()
+	traceStore, err := New(ctx, btConf, true)
+	assert.NoError(t, err)
+
+	// With no data, we should get an empty tile
+	actualTile, _, err := traceStore.GetTile(ctx, len(commits))
+	assert.NoError(t, err)
+	assert.NotNil(t, actualTile)
+	assert.Empty(t, actualTile.Traces)
+
+	// Build a tile up from the individual data points, one at a time
+	now := time.Date(2019, time.May, 5, 1, 3, 4, 0, time.UTC)
+
+	// Group the traces by device, so we should have 3 groups of 2 traces.
+	traces := data.MakeTestTile().Traces
+	// Seed the map with the devices we expect; a typo in the test data would
+	// show up as an unexpected 4th key in the length check below.
+	byDevice := map[string][]*types.GoldenTrace{
+		data.AnglerDevice:     nil,
+		data.BullheadDevice:   nil,
+		data.CrosshatchDevice: nil,
+	}
+	for _, trace := range traces {
+		gTrace, ok := trace.(*types.GoldenTrace)
+		assert.True(t, ok)
+		assert.Len(t, gTrace.Digests, len(commits), "test data should have one digest per commit")
+		dev := gTrace.Keys["device"]
+		byDevice[dev] = append(byDevice[dev], gTrace)
+	}
+	assert.Len(t, byDevice, 3, "test data should have exactly 3 devices")
+
+	// for each trace, report a group of two digests for each commit.
+	for dev, gTraces := range byDevice {
+		assert.Len(t, gTraces, 2, "test data for %s should have exactly 2 traces", dev)
+
+		for i := 0; i < len(commits); i++ {
+			var entries []*tracestore.Entry
+			for _, gTrace := range gTraces {
+				entries = append(entries, &tracestore.Entry{
+					Digest: gTrace.Digests[i],
+					Params: gTrace.Keys,
+				})
+			}
+
+			// A single Put with multiple entries exercises the batched write path.
+			err = traceStore.Put(ctx, commits[i].Hash, entries, now)
+			assert.NoError(t, err)
+			// roll forward the clock by an arbitrary amount of time
+			now = now.Add(3 * time.Minute)
+		}
+	}
+
+	// Get the tile back and make sure it exactly matches the tile
+	// we hand-crafted for the test data.
+	actualTile, actualCommits, err := traceStore.GetTile(ctx, len(commits))
+	assert.NoError(t, err)
+
+	assert.Equal(t, data.MakeTestTile(), actualTile)
+	assert.Equal(t, commits, actualCommits)
+}
+
+// TestBTTraceStorePutGetThreaded is like TestBTTraceStorePutGet, just
+// with a bunch of reads/writes done in simultaneous go routines in
+// an effort to catch any race conditions.
+func TestBTTraceStorePutGetThreaded(t *testing.T) {
+	unittest.LargeTest(t)
+	unittest.RequiresBigTableEmulator(t)
+
+	commits := data.MakeTestCommits()
+	mvcs := MockVCSWithCommits(commits, 0)
+	defer mvcs.AssertExpectations(t)
+
+	btConf := BTConfig{
+		ProjectID:  "should-use-the-emulator",
+		InstanceID: "testinstance",
+		TableID:    "three_devices_test_threaded",
+		VCS:        mvcs,
+	}
+
+	assert.NoError(t, bt.DeleteTables(btConf.ProjectID, btConf.InstanceID, btConf.TableID))
+	assert.NoError(t, InitBT(btConf))
+
+	ctx := context.Background()
+	traceStore, err := New(ctx, btConf, true)
+	assert.NoError(t, err)
+
+	wg := sync.WaitGroup{}
+	wg.Add(2)
+
+	now := time.Date(2019, time.May, 5, 1, 3, 4, 0, time.UTC)
+
+	// Note: assert (testify's require) must not be used off the test
+	// goroutine; FailNow only works when called from the goroutine running
+	// the test. t.Errorf is safe to call from other goroutines.
+	readTile := func() {
+		defer wg.Done()
+		if _, _, err := traceStore.GetTile(ctx, len(commits)); err != nil {
+			t.Errorf("GetTile failed: %s", err)
+		}
+	}
+	go readTile()
+
+	// Build a tile up from the individual data points, one at a time
+	traces := data.MakeTestTile().Traces
+	for _, trace := range traces {
+		gTrace, ok := trace.(*types.GoldenTrace)
+		assert.True(t, ok)
+
+		// Put them in backwards, just to test that order doesn't matter
+		for i := len(gTrace.Digests) - 1; i >= 0; i-- {
+			wg.Add(1)
+			// now and i are passed as arguments so each goroutine gets the
+			// values from its own iteration.
+			go func(now time.Time, i int) {
+				defer wg.Done()
+				e := tracestore.Entry{
+					Digest: gTrace.Digests[i],
+					Params: gTrace.Keys,
+				}
+				if err := traceStore.Put(ctx, commits[i].Hash, []*tracestore.Entry{&e}, now); err != nil {
+					t.Errorf("Put for commit %s failed: %s", commits[i].Hash, err)
+				}
+			}(now, i)
+			now = now.Add(7 * time.Second)
+		}
+	}
+	go readTile()
+
+	wg.Wait()
+
+	// Get the tile back and make sure it exactly matches the tile
+	// we hand-crafted for the test data.
+	actualTile, actualCommits, err := traceStore.GetTile(ctx, len(commits))
+	assert.NoError(t, err)
+
+	assert.Equal(t, data.MakeTestTile(), actualTile)
+	assert.Equal(t, commits, actualCommits)
+}
+
+// TestBTTraceStoreGetDenseTileEmpty makes sure we get an empty tile (and no
+// commits) from GetDenseTile when no data has been written.
+func TestBTTraceStoreGetDenseTileEmpty(t *testing.T) {
+	unittest.LargeTest(t)
+	unittest.RequiresBigTableEmulator(t)
+
+	commits := data.MakeTestCommits()
+	// Spread the commits sparsely across several tiles; the indices are arbitrary.
+	realCommitIndices := []int{300, 501, 557}
+	totalCommits := 1101
+	mvcs, _ := MockSparseVCSWithCommits(commits, realCommitIndices, totalCommits)
+	defer mvcs.AssertExpectations(t)
+
+	btConf := BTConfig{
+		ProjectID:  "should-use-the-emulator",
+		InstanceID: "testinstance",
+		TableID:    "three_devices_test_dense_empty",
+		VCS:        mvcs,
+	}
+
+	assert.NoError(t, bt.DeleteTables(btConf.ProjectID, btConf.InstanceID, btConf.TableID))
+	assert.NoError(t, InitBT(btConf))
+
+	ctx := context.Background()
+	traceStore, err := New(ctx, btConf, true)
+	assert.NoError(t, err)
+
+	// With no data, we should get an empty tile
+	actualTile, actualCommits, err := traceStore.GetDenseTile(ctx, len(commits))
+	assert.NoError(t, err)
+	assert.NotNil(t, actualTile)
+	assert.Empty(t, actualCommits)
+	assert.Empty(t, actualTile.Traces)
+
+}
+
+// TestBTTraceStoreGetDenseTile puts in a few data points sparsely spaced throughout
+// time and makes sure we can call GetDenseTile to get them condensed together
+// (i.e. with all the empty commits tossed out). It puts them in a variety of conditions
+// to try to identify any edge cases.
+// Each scenario re-initializes the emulator tables via testDenseTile, so the
+// scenarios are independent of each other.
+func TestBTTraceStoreGetDenseTile(t *testing.T) {
+	unittest.LargeTest(t)
+	unittest.RequiresBigTableEmulator(t)
+
+	// 3 commits, arbitrarily spaced out across the last tile
+	commits := data.MakeTestCommits()
+	realCommitIndices := []int{795, 987, 1001}
+	totalCommits := (256 * 4) - 1
+	mvcs, lCommits := MockSparseVCSWithCommits(commits, realCommitIndices, totalCommits)
+	expectedTile := data.MakeTestTile()
+	testDenseTile(t, expectedTile, mvcs, commits, lCommits, realCommitIndices)
+
+	// 3 commits, arbitrarily spaced out across 3 tiles, with no data
+	// in the most recent tile
+	commits = data.MakeTestCommits()
+	realCommitIndices = []int{300, 501, 557}
+	totalCommits = 1101
+	mvcs, lCommits = MockSparseVCSWithCommits(commits, realCommitIndices, totalCommits)
+	expectedTile = data.MakeTestTile()
+	testDenseTile(t, expectedTile, mvcs, commits, lCommits, realCommitIndices)
+
+	// As above, just 2 commits
+	commits = data.MakeTestCommits()[1:]
+	realCommitIndices = []int{501, 557}
+	totalCommits = 1101
+	mvcs, lCommits = MockSparseVCSWithCommits(commits, realCommitIndices, totalCommits)
+	expectedTile = data.MakeTestTile()
+	// Trim the expected tile down to the last two commits to match.
+	expectedTile, err := expectedTile.Trim(1, 3)
+	assert.NoError(t, err)
+	testDenseTile(t, expectedTile, mvcs, commits, lCommits, realCommitIndices)
+
+	// All commits are on the first commit of their tile
+	commits = data.MakeTestCommits()
+	realCommitIndices = []int{0, 256, 512}
+	totalCommits = 1101
+	mvcs, lCommits = MockSparseVCSWithCommits(commits, realCommitIndices, totalCommits)
+	expectedTile = data.MakeTestTile()
+	testDenseTile(t, expectedTile, mvcs, commits, lCommits, realCommitIndices)
+
+	// All commits are on the last commit of their tile
+	commits = data.MakeTestCommits()
+	realCommitIndices = []int{255, 511, 767}
+	totalCommits = 1101
+	mvcs, lCommits = MockSparseVCSWithCommits(commits, realCommitIndices, totalCommits)
+	expectedTile = data.MakeTestTile()
+	testDenseTile(t, expectedTile, mvcs, commits, lCommits, realCommitIndices)
+
+	// Empty tiles between commits
+	commits = data.MakeTestCommits()
+	realCommitIndices = []int{50, 800, 1100}
+	totalCommits = 1101
+	mvcs, lCommits = MockSparseVCSWithCommits(commits, realCommitIndices, totalCommits)
+	expectedTile = data.MakeTestTile()
+	testDenseTile(t, expectedTile, mvcs, commits, lCommits, realCommitIndices)
+}
+
+// testDenseTile takes the data from tile, Puts it into BT, then pulls the tile given
+// the commit layout in VCS and returns it.
+// Note: it mutates both tile and commits (the CommitTime fields) to line the
+// expected data up with the timestamps synthesized by MockSparseVCSWithCommits.
+func testDenseTile(t *testing.T, tile *tiling.Tile, mvcs *mock_vcs.VCS, commits []*tiling.Commit, lCommits []*vcsinfo.LongCommit, realCommitIndices []int) {
+	defer mvcs.AssertExpectations(t)
+
+	btConf := BTConfig{
+		ProjectID:  "should-use-the-emulator",
+		InstanceID: "testinstance",
+		TableID:    "three_devices_test_dense",
+		VCS:        mvcs,
+	}
+
+	// Start from a clean table so earlier scenarios don't bleed into this one.
+	assert.NoError(t, bt.DeleteTables(btConf.ProjectID, btConf.InstanceID, btConf.TableID))
+	assert.NoError(t, InitBT(btConf))
+
+	ctx := context.Background()
+	traceStore, err := New(ctx, btConf, true)
+	assert.NoError(t, err)
+
+	// This time is an arbitrary point in time
+	now := time.Date(2019, time.May, 5, 1, 3, 4, 0, time.UTC)
+
+	// Build a tile up from the individual data points, one at a time
+	traces := tile.Traces
+	for _, trace := range traces {
+		gTrace, ok := trace.(*types.GoldenTrace)
+		assert.True(t, ok)
+
+		// Put them in backwards, just to test that order doesn't matter
+		for i := len(gTrace.Digests) - 1; i >= 0; i-- {
+			e := tracestore.Entry{
+				Digest: gTrace.Digests[i],
+				Params: gTrace.Keys,
+			}
+			err := traceStore.Put(ctx, commits[i].Hash, []*tracestore.Entry{&e}, now)
+			assert.NoError(t, err)
+			// roll forward the clock by an arbitrary amount of time
+			now = now.Add(7 * time.Second)
+		}
+	}
+
+	// Get the tile back and make sure it exactly matches the tile
+	// we hand-crafted for the test data.
+	actualTile, allCommits, err := traceStore.GetDenseTile(ctx, len(commits))
+	assert.NoError(t, err)
+	// GetDenseTile should return every commit from the first real commit onward.
+	assert.Len(t, allCommits, len(lCommits)-realCommitIndices[0])
+
+	// In MockSparseVCSWithCommits, we change the time of the commits, so we need
+	// to update the expected times to match.
+	for i, c := range commits {
+		c.CommitTime = lCommits[realCommitIndices[i]].Timestamp.Unix()
+	}
+	tile.Commits = commits
+
+	assert.Equal(t, tile, actualTile)
+}
+
+// TestBTDigestMap tests the internal workings of storing the
+// DigestMap. See BIGTABLE.md for more about the schemas for
+// the DigestMap family and the id counter family.
+func TestBTDigestMap(t *testing.T) {
+	unittest.LargeTest(t)
+	unittest.RequiresBigTableEmulator(t)
+
+	btConf := BTConfig{
+		ProjectID:  "should-use-the-emulator",
+		InstanceID: "testinstance",
+		TableID:    "digest_map_test",
+		VCS:        nil,
+	}
+
+	assert.NoError(t, bt.DeleteTables(btConf.ProjectID, btConf.InstanceID, btConf.TableID))
+	assert.NoError(t, InitBT(btConf))
+
+	ctx := context.Background()
+	traceStore, err := New(ctx, btConf, true)
+	assert.NoError(t, err)
+
+	dm, err := traceStore.getDigestMap(ctx)
+	assert.NoError(t, err)
+	assert.NotNil(t, dm)
+	// should be empty, except for the initial mapping.
+	assert.Equal(t, 1, dm.Len())
+	i, err := dm.ID(types.MISSING_DIGEST)
+	assert.NoError(t, err)
+	assert.Equal(t, missingDigestID, i)
+
+	digests := makeTestDigests(0, 100)
+
+	// add 90 of the 100 digests using update
+	ninety := make(types.DigestSet, 90)
+	for _, d := range digests[0:90] {
+		ninety[d] = true
+	}
+	dm, err = traceStore.updateDigestMap(ctx, ninety)
+	assert.NoError(t, err)
+	assert.NotNil(t, dm)
+	assert.Equal(t, 91, dm.Len())
+	// We can't check to see if our known digests map to
+	// a specific id because the digest map could present
+	// the digests in a non-deterministic order.
+	// We can spot check one of the ids though
+	_, err = dm.Digest(88)
+	assert.NoError(t, err)
+
+	ids, err := traceStore.getIDs(ctx, 3)
+	// The next 3 numbers should be 91, 92, 93 because they are
+	// monotonically increasing
+	assert.NoError(t, err)
+	assert.Equal(t, []digestID{91, 92, 93}, ids)
+	func() {
+		traceStore.availIDsMutex.Lock()
+		defer traceStore.availIDsMutex.Unlock()
+		// All three handed-out ids should be gone from the available pool;
+		// the next unused id (94) should still be there.
+		assert.NotContains(t, traceStore.availIDs, digestID(91))
+		assert.NotContains(t, traceStore.availIDs, digestID(92))
+		assert.NotContains(t, traceStore.availIDs, digestID(93))
+		assert.Contains(t, traceStore.availIDs, digestID(94))
+	}()
+
+	// give two ids back (pretend we used id 92)
+	traceStore.returnIDs([]digestID{91, 93})
+
+	func() {
+		traceStore.availIDsMutex.Lock()
+		defer traceStore.availIDsMutex.Unlock()
+		// Both returned ids (91 and 93) should be available again; 92 is
+		// still in use.
+		assert.Contains(t, traceStore.availIDs, digestID(91))
+		assert.NotContains(t, traceStore.availIDs, digestID(92))
+		assert.Contains(t, traceStore.availIDs, digestID(93))
+		assert.Contains(t, traceStore.availIDs, digestID(94))
+	}()
+
+	// call update with an overlap of new and old
+	// (capacity 20 here; the previous "90" was a copy-paste of the ninety set).
+	twenty := make(types.DigestSet, 20)
+	for _, d := range digests[80:] {
+		twenty[d] = true
+	}
+	dm, err = traceStore.updateDigestMap(ctx, twenty)
+	assert.NoError(t, err)
+	assert.NotNil(t, dm)
+	assert.Equal(t, 101, dm.Len())
+
+	// Get it again and make sure it matches the last update phase.
+	dm2, err := traceStore.getDigestMap(ctx)
+	assert.NoError(t, err)
+	assert.NotNil(t, dm2)
+	assert.Equal(t, dm, dm2)
+
+	// Add a lot more digests to make sure the bulk requesting works
+	for i := 1; i < 10; i++ {
+		// 113 is an arbitrary prime number that does not divide batchIdRequest.
+		ds := make(types.DigestSet, 113)
+		ds.AddLists(makeTestDigests(113*i, 113))
+
+		dm, err := traceStore.updateDigestMap(ctx, ds)
+		assert.NoError(t, err)
+		assert.NotNil(t, dm)
+	}
+}
+
+// makeTestDigests returns n valid digests. These digests are easy
+// for humans to understand, as they are just the hex values
+// [start, start+n) reversed and 0-padded to 32 chars long (a valid md5 hash).
+func makeTestDigests(start, n int) []types.Digest {
+	digests := make([]types.Digest, 0, n)
+	for i := start; i < start+n; i++ {
+		// Reverse them to exercise the prefixing of the digestMap.
+		digests = append(digests, types.Digest(util.ReverseString(fmt.Sprintf("%032x", i))))
+	}
+	return digests
+}
+
+// TestGetTileKey tests the internal workings of deriving a
+// tileKey from the commit index. See BIGTABLE.md for more.
+// The expected keys count down from 2147483647 (MaxInt32) as the repo index
+// crosses tile boundaries — e.g. with a tile size of 256, index 300 is offset
+// 44 into the second tile (key MaxInt32-1).
+// NOTE(review): this test never appears to touch BT (the config is blank), so
+// RequiresBigTableEmulator may be unnecessary — confirm New() does not connect.
+func TestGetTileKey(t *testing.T) {
+	unittest.LargeTest(t)
+	unittest.RequiresBigTableEmulator(t)
+
+	btConf := BTConfig{
+		// Leaving other things blank because we won't actually hit BT or use VCS.
+	}
+
+	ctx := context.Background()
+	traceStore, err := New(ctx, btConf, true)
+	assert.NoError(t, err)
+
+	type testStruct struct {
+		InputRepoIndex int
+
+		ExpectedKey   tileKey
+		ExpectedIndex int
+	}
+	// test data is valid, but arbitrary.
+	tests := []testStruct{
+		{
+			InputRepoIndex: 0,
+			ExpectedKey:    tileKey(2147483647),
+			ExpectedIndex:  0,
+		},
+		{
+			InputRepoIndex: 10,
+			ExpectedKey:    tileKey(2147483647),
+			ExpectedIndex:  10,
+		},
+		{
+			InputRepoIndex: 300,
+			ExpectedKey:    tileKey(2147483646),
+			ExpectedIndex:  44,
+		},
+		{
+			InputRepoIndex: 123456,
+			ExpectedKey:    tileKey(2147483165),
+			ExpectedIndex:  64,
+		},
+	}
+
+	for _, test := range tests {
+		key, index := traceStore.getTileKey(test.InputRepoIndex)
+		assert.Equal(t, test.ExpectedKey, key)
+		assert.Equal(t, test.ExpectedIndex, index)
+	}
+}
+
+// TestCalcShardedRowName tests the internal workings of sharding
+// a given subkey.
+// Row names have the form "<shard>:<namespace>:<rowtype>:<tilekey>:<subkey>";
+// the leading shard number is derived from the subkey (see calcShardedRowName).
+func TestCalcShardedRowName(t *testing.T) {
+	unittest.LargeTest(t)
+	unittest.RequiresBigTableEmulator(t)
+
+	btConf := BTConfig{
+		// Leaving other things blank because we won't actually hit BT
+		// or use the VCS.
+	}
+
+	ctx := context.Background()
+	traceStore, err := New(ctx, btConf, true)
+	assert.NoError(t, err)
+
+	type testStruct struct {
+		InputKey     tileKey
+		InputRowType string
+		InputSubKey  string
+
+		ExpectedRowName string
+	}
+	// test data is valid, but arbitrary.
+	tests := []testStruct{
+		{
+			InputKey:     tileKey(2147483647),
+			InputRowType: typeTrace,
+			InputSubKey:  ",0=1,1=3,3=0,",
+
+			ExpectedRowName: "09:ts:t:2147483647:,0=1,1=3,3=0,",
+		},
+		{
+			InputKey:     tileKey(2147483647),
+			InputRowType: typeTrace,
+			InputSubKey:  ",0=1,1=3,9=0,",
+
+			ExpectedRowName: "13:ts:t:2147483647:,0=1,1=3,9=0,",
+		},
+		{
+			InputKey:     tileKey(2147483540),
+			InputRowType: typeDigestMap,
+			InputSubKey:  "abc",
+
+			ExpectedRowName: "02:ts:d:2147483540:abc",
+		},
+		{
+			InputKey:     tileKey(2147483540),
+			InputRowType: typeDigestMap,
+			InputSubKey:  "bcd",
+
+			ExpectedRowName: "25:ts:d:2147483540:bcd",
+		},
+	}
+
+	for _, test := range tests {
+		row := traceStore.calcShardedRowName(test.InputKey, test.InputRowType, test.InputSubKey)
+		assert.Equal(t, test.ExpectedRowName, row)
+	}
+}
+
+// TestRowAndColNameFromDigest tests the internal workings of sharding
+// a digest for use in the digest map.
+// The row is built from the first three characters of the md5 digest and the
+// column is the remaining characters (see rowAndColNameFromDigest).
+func TestRowAndColNameFromDigest(t *testing.T) {
+	unittest.LargeTest(t)
+	unittest.RequiresBigTableEmulator(t)
+
+	btConf := BTConfig{
+		// Leaving other things blank because we won't actually hit BT
+		// or use the VCS.
+	}
+
+	ctx := context.Background()
+	traceStore, err := New(ctx, btConf, true)
+	assert.NoError(t, err)
+
+	type testStruct struct {
+		InputDigest types.Digest
+
+		ExpectedRowName string
+		ExpectedColName string
+	}
+	// test data is valid, but arbitrary.
+	tests := []testStruct{
+		{
+			InputDigest:     types.Digest("9e1d402515193304cafbdf02b8fd751b"),
+			ExpectedRowName: "21:ts:d:0000000000:9e1",
+			ExpectedColName: "d402515193304cafbdf02b8fd751b",
+		},
+		{
+			InputDigest:     types.Digest("e30a49351a7e45b9591a989073e755b2"),
+			ExpectedRowName: "05:ts:d:0000000000:e30",
+			ExpectedColName: "a49351a7e45b9591a989073e755b2",
+		},
+		{
+			InputDigest:     types.Digest("60b5e978d116cccc0ef12278d724245b"),
+			ExpectedRowName: "23:ts:d:0000000000:60b",
+			ExpectedColName: "5e978d116cccc0ef12278d724245b",
+		},
+	}
+
+	for _, test := range tests {
+		row, col := traceStore.rowAndColNameFromDigest(test.InputDigest)
+		assert.Equal(t, test.ExpectedRowName, row)
+		assert.Equal(t, test.ExpectedColName, col)
+	}
+}
+
+// NOTE(review): ALL_CAPS constant names are not idiomatic Go (MixedCaps is the
+// convention), but they match the style used by the existing testutils code.
+const (
+	// Directory with testdata.
+	TEST_DATA_DIR = "./testdata"
+
+	// Local file location of the test data.
+	TEST_DATA_PATH = TEST_DATA_DIR + "/10-test-sample-4bytes.tile"
+
+	// Folder in the testdata bucket. See go/testutils for details.
+	TEST_DATA_STORAGE_PATH = "gold-testdata/10-test-sample-4bytes.tile"
+
+	// Number of commits in the sample tile.
+	TILE_LENGTH = 50
+)
+
+// TestBTTraceStoreLargeTile stores a large amount of data into the tracestore
+// and retrieves it.
+func TestBTTraceStoreLargeTile(t *testing.T) {
+	unittest.LargeTest(t)
+	unittest.RequiresBigTableEmulator(t)
+
+	btConf, mvcs, tile := setupLargeTile(t)
+	defer mvcs.AssertExpectations(t)
+
+	ctx := context.Background()
+
+	traceStore, err := New(ctx, btConf, true)
+	assert.NoError(t, err)
+
+	// For each value in tile get the traceIDs that are not empty.
+	traceIDsPerCommit := make([]tiling.TraceIdSlice, TILE_LENGTH)
+	for traceID, trace := range tile.Traces {
+		gTrace := trace.(*types.GoldenTrace)
+		for i := 0; i < TILE_LENGTH; i++ {
+			if gTrace.Digests[i] != types.MISSING_DIGEST {
+				traceIDsPerCommit[i] = append(traceIDsPerCommit[i], traceID)
+			}
+		}
+	}
+	// Find the commit with the most data.
+	indices := make([]int, TILE_LENGTH)
+	maxIndex := 0
+	maxLen := len(traceIDsPerCommit[0])
+	for idx := range indices {
+		if len(traceIDsPerCommit[idx]) > maxLen {
+			maxLen = len(traceIDsPerCommit[idx])
+			maxIndex = idx
+		}
+		indices[idx] = idx
+	}
+
+	// Ingest the biggest tile.
+	// Note: the per-trace loop variable is named gt, not t, so it does not
+	// shadow the *testing.T.
+	entries := []*tracestore.Entry{}
+	allDigests := map[types.Digest]bool{"": true}
+	for _, traceID := range traceIDsPerCommit[maxIndex] {
+		gt := tile.Traces[traceID].(*types.GoldenTrace)
+		digest := gt.Digests[maxIndex]
+		allDigests[digest] = true
+		entries = append(entries, &tracestore.Entry{Digest: digest, Params: gt.Params()})
+	}
+	assert.NoError(t, traceStore.Put(ctx, tile.Commits[maxIndex].Hash, entries, time.Now()))
+
+	foundDigestMap, err := traceStore.getDigestMap(ctx)
+	assert.NoError(t, err)
+	assert.Equal(t, len(allDigests), foundDigestMap.Len())
+
+	for digest := range allDigests {
+		id, err := foundDigestMap.ID(digest)
+		assert.NoError(t, err)
+		if digest == "" {
+			assert.Equal(t, missingDigestID, id)
+		} else {
+			assert.NotEqual(t, missingDigestID, id)
+		}
+	}
+
+	// This commit was already ingested above; clear it so the loop below
+	// skips it.
+	traceIDsPerCommit[maxIndex] = []tiling.TraceId{}
+
+	// Randomly add samples from the tile to that
+	for len(indices) > 0 {
+		idx := indices[0]
+		indices = indices[1:]
+		if len(traceIDsPerCommit[idx]) == 0 {
+			continue
+		}
+
+		entries := []*tracestore.Entry{}
+		for _, traceID := range traceIDsPerCommit[idx] {
+			gt := tile.Traces[traceID].(*types.GoldenTrace)
+			digest := gt.Digests[idx]
+			allDigests[digest] = true
+			entries = append(entries, &tracestore.Entry{Digest: digest, Params: gt.Params()})
+		}
+		assert.NoError(t, traceStore.Put(ctx, tile.Commits[idx].Hash, entries, time.Now()))
+	}
+
+	// Load the tile and verify it's identical.
+	foundTile, commits, err := traceStore.GetTile(ctx, TILE_LENGTH)
+	assert.NoError(t, err)
+	assert.NotNil(t, commits)
+	assert.Equal(t, tile.Commits[len(tile.Commits)-TILE_LENGTH:], commits)
+
+	assert.Equal(t, len(tile.Traces), len(foundTile.Traces))
+	for traceID, trace := range tile.Traces {
+		gt := trace.(*types.GoldenTrace)
+		params := gt.Params()
+		found := false
+
+		// Exactly one trace in the loaded tile should match these params.
+		foundCount := 0
+		for _, foundTrace := range foundTile.Traces {
+			if deepequal.DeepEqual(params, foundTrace.Params()) {
+				foundCount++
+			}
+		}
+		assert.Equal(t, 1, foundCount)
+
+		for foundID, foundTrace := range foundTile.Traces {
+			if deepequal.DeepEqual(params, foundTrace.Params()) {
+				expDigests := gt.Digests[len(gt.Digests)-TILE_LENGTH:]
+				found = true
+				fgt := foundTrace.(*types.GoldenTrace)
+				assert.Equal(t, len(expDigests), len(fgt.Digests))
+
+				// Collect any mismatching digests so a failure shows exactly
+				// which indices differ.
+				var diff []string
+				diffStr := ""
+				for idx, digest := range expDigests {
+					isDiff := digest != fgt.Digests[idx]
+					if isDiff {
+						diff = append(diff, fmt.Sprintf("%d", idx))
+						diffStr += fmt.Sprintf("    %q  !=  %q   \n", digest, fgt.Digests[idx])
+					}
+				}
+				// Nothing should be different
+				assert.Nil(t, diff)
+				assert.Equal(t, "", diffStr)
+
+				delete(foundTile.Traces, foundID)
+				break
+			}
+		}
+		assert.True(t, found)
+		delete(tile.Traces, traceID)
+	}
+	// Deleting matched traces from both maps should leave both empty.
+	assert.Equal(t, 0, len(foundTile.Traces))
+	assert.Equal(t, 0, len(tile.Traces))
+}
+
+// setupLargeTile downloads (if necessary) and deserializes the large sample
+// tile, initializes the BT emulator tables for it, and returns the BT config,
+// the mocked VCS and the tile itself.
+func setupLargeTile(t sktest.TestingT) (BTConfig, *mock_vcs.VCS, *tiling.Tile) {
+	if !fileutil.FileExists(TEST_DATA_PATH) {
+		err := gcs_testutils.DownloadTestDataFile(t, gcs_testutils.TEST_DATA_BUCKET, TEST_DATA_STORAGE_PATH, TEST_DATA_PATH)
+		assert.NoError(t, err, "Unable to download testdata.")
+	}
+
+	tile := makeSampleTile(t, TEST_DATA_PATH)
+	assert.Len(t, tile.Commits, TILE_LENGTH)
+
+	mvcs := MockVCSWithCommits(tile.Commits, 0)
+
+	btConf := BTConfig{
+		ProjectID:  "should-use-the-emulator",
+		InstanceID: "testinstance",
+		TableID:    "large_tile_test",
+		VCS:        mvcs,
+	}
+
+	assert.NoError(t, bt.DeleteTables(btConf.ProjectID, btConf.InstanceID, btConf.TableID))
+	assert.NoError(t, InitBT(btConf))
+	// (removed a leftover debug fmt.Println that spammed test output)
+	return btConf, mvcs, tile
+}
+
+// makeSampleTile opens and deserializes a sample tile from the given file.
+func makeSampleTile(t sktest.TestingT, fileName string) *tiling.Tile {
+	file, err := os.Open(fileName)
+	assert.NoError(t, err)
+	// Close the handle when done; previously it was leaked.
+	defer util.Close(file)
+
+	sample, err := serialize.DeserializeSample(file)
+	assert.NoError(t, err)
+
+	return sample.Tile
+}
+
+// MockVCSWithCommits returns a mock VCS that knows about the given commits,
+// placing them at contiguous repo indices starting at offset. It sets up
+// expectations for IndexOf (per commit), LastNIndex and DetailsMulti.
+func MockVCSWithCommits(commits []*tiling.Commit, offset int) *mock_vcs.VCS {
+	mvcs := &mock_vcs.VCS{}
+
+	indexCommits := make([]*vcsinfo.IndexCommit, 0, len(commits))
+	hashes := make([]string, 0, len(commits))
+	longCommits := make([]*vcsinfo.LongCommit, 0, len(commits))
+	for i, c := range commits {
+		// Maybe() because not every test path looks up every commit.
+		mvcs.On("IndexOf", ctx, c.Hash).Return(i+offset, nil).Maybe()
+
+		indexCommits = append(indexCommits, &vcsinfo.IndexCommit{
+			Hash:      c.Hash,
+			Index:     i + offset,
+			Timestamp: time.Unix(c.CommitTime, 0),
+		})
+		hashes = append(hashes, c.Hash)
+		longCommits = append(longCommits, &vcsinfo.LongCommit{
+			ShortCommit: &vcsinfo.ShortCommit{
+				Hash:    c.Hash,
+				Author:  c.Author,
+				Subject: fmt.Sprintf("Commit #%d in test", i),
+			},
+			Timestamp: time.Unix(c.CommitTime, 0),
+		})
+	}
+
+	mvcs.On("LastNIndex", len(commits)).Return(indexCommits)
+	mvcs.On("DetailsMulti", ctx, hashes, false).Return(longCommits, nil)
+
+	return mvcs
+}
+
+// MockSparseVCSWithCommits returns a mock VCS with totalCommits synthetic
+// commits in which the given real commits are placed at realCommitIndices
+// (the two slices must be the same length). It returns the mock and the full
+// list of LongCommits, synthetic and real. Note that the real commits get new
+// timestamps (spaced 1700 seconds apart like the synthetic ones), so callers
+// comparing times must use the returned LongCommits.
+func MockSparseVCSWithCommits(commits []*tiling.Commit, realCommitIndices []int, totalCommits int) (*mock_vcs.VCS, []*vcsinfo.LongCommit) {
+	mvcs := &mock_vcs.VCS{}
+	if len(commits) != len(realCommitIndices) {
+		panic("commits should be same length as realCommitIndices")
+	}
+
+	// Create many synthetic commits.
+	indexCommits := make([]*vcsinfo.IndexCommit, totalCommits)
+	longCommits := make([]*vcsinfo.LongCommit, totalCommits)
+	hashes := []string{}
+	for i := 0; i < totalCommits; i++ {
+		h := fmt.Sprintf("%040d", i)
+		indexCommits[i] = &vcsinfo.IndexCommit{
+			Hash:  h,
+			Index: i,
+			// space the commits 1700 seconds apart, starting at the epoch
+			// This is an arbitrary amount of space.
+			Timestamp: time.Unix(int64(i*1700), 0),
+		}
+
+		longCommits[i] = &vcsinfo.LongCommit{
+			ShortCommit: &vcsinfo.ShortCommit{
+				Hash:   h,
+				Author: "nobody@example.com",
+			},
+			Timestamp: time.Unix(int64(i*1700), 0),
+		}
+		hashes = append(hashes, h)
+
+	}
+
+	// Overwrite the synthetic commits at the requested indices with the
+	// real ones, keeping the synthetic timestamps.
+	for i, c := range commits {
+		index := realCommitIndices[i]
+		mvcs.On("IndexOf", ctx, c.Hash).Return(index, nil).Maybe()
+		indexCommits[index] = &vcsinfo.IndexCommit{
+			Hash:      c.Hash,
+			Index:     index,
+			Timestamp: time.Unix(int64(index*1700), 0),
+		}
+		hashes[index] = c.Hash
+		longCommits[index] = &vcsinfo.LongCommit{
+			ShortCommit: &vcsinfo.ShortCommit{
+				Hash:    c.Hash,
+				Author:  c.Author,
+				Subject: fmt.Sprintf("Real commit #%d in test", i),
+			},
+			Timestamp: time.Unix(int64(index*1700), 0),
+		}
+	}
+
+	// The dense-tile lookups only care about commits from the first real
+	// commit onward.
+	firstRealCommitIdx := realCommitIndices[0]
+	mvcs.On("ByIndex", ctx, firstRealCommitIdx).Return(longCommits[firstRealCommitIdx], nil).Maybe()
+	mvcs.On("From", mock.Anything).Return(hashes[firstRealCommitIdx:], nil).Maybe()
+	mvcs.On("LastNIndex", 1).Return(indexCommits[totalCommits-1:]).Maybe()
+	mvcs.On("DetailsMulti", ctx, hashes[firstRealCommitIdx:], false).Return(longCommits[firstRealCommitIdx:], nil).Maybe()
+
+	return mvcs, longCommits
+}
+
+// ctx matches the context argument in mock expectations.
+// NOTE(review): matching on the concrete type name "*context.emptyCtx" is
+// brittle — it breaks if a test passes a derived context (e.g. WithTimeout)
+// or if the stdlib renames the type; consider mock.Anything instead.
+var ctx = mock.AnythingOfType("*context.emptyCtx")
diff --git a/golden/go/tracestore/bt_tracestore/types.go b/golden/go/tracestore/bt_tracestore/types.go
new file mode 100644
index 0000000..20d5d56
--- /dev/null
+++ b/golden/go/tracestore/bt_tracestore/types.go
@@ -0,0 +1,386 @@
+package bt_tracestore
+
+import (
+	"crypto/md5"
+	"encoding"
+	"encoding/binary"
+	"fmt"
+	"time"
+
+	"cloud.google.com/go/bigtable"
+	"go.skia.org/infra/go/paramtools"
+	"go.skia.org/infra/go/skerr"
+	"go.skia.org/infra/go/tiling"
+	"go.skia.org/infra/golden/go/types"
+)
+
+// Constants adapted from btts.go
+// See BIGTABLE.md for an overview of how the data is stored in BT.
+const (
+	// Namespace for this package's data. Due to the fact that there is one table per instance,
+	// this makes sure we don't have collisions between traces and something else
+	// in BT (i.e. git info)
+	traceStoreNameSpace = "ts"
+
+	// Column Families.
+	// https://cloud.google.com/bigtable/docs/schema-design#column_families_and_column_qualifiers
+	digestMapFamily = "D" // Store a mapping of Digest->DigestID
+	idCounterFamily = "I" // Keeps a monotonically increasing number to generate DigestIDs
+	opsFamily       = "O" // ops short for Ordered Param Set
+	traceFamily     = "T" // Holds "0"..."tilesize-1" columns with a DigestID at each cell
+
+	// The columns (and rows) in the digest map family are derived from the digest, which are md5
+	// hashes. The row will be the first three characters of the hash and the column will
+	// be the remaining characters (see b.rowAndColNameFromDigest)
+	// All entries will have the following prefix
+	digestMapFamilyPrefix = digestMapFamily + ":"
+	// All entries are part of a global map (set tile to 0)
+	digestMapTile = tileKey(0)
+
+	// Columns in the ID counter family. There is only one row and one column.
+	idCounterColumn = "idc"
+
+	// Columns in the OrderedParamSet column family.
+	opsHashColumn   = "H"
+	opsOpsColumn    = "OPS"
+	hashFullColName = opsFamily + ":" + opsHashColumn
+	opsFullColName  = opsFamily + ":" + opsOpsColumn
+
+	// The columns in the trace family are "0", "1", "2"..."N" where N is
+	// the BT tile size (default below). These values correspond to the commitOffset,
+	// where 0 is the first (most recent) commit in the tile and N is the last (oldest)
+	// commit in the tile.
+	// They will all have the following prefix.
+	traceFamilyPrefix = traceFamily + ":"
+
+	// Define the row types.
+	typeDigestMap = "d"
+	typeIdCounter = "i"
+	typeOPS       = "o"
+	typeTrace     = "t"
+
+	// This is the size of the tile in Big Table. That is, how many commits do we store in one tile.
+	// We can have up to 2^32 tiles in big table, so this would let us store 1 trillion
+	// commits worth of data. This tile size does not need to be related to the tile size that
+	// Gold operates on (although when tuning, it should be greater than, or an even divisor
+	// of the Gold tile size). The first commit in the repo belongs to tile 2^32-1 and tile numbers
+	// decrease for newer commits.
+	DefaultTileSize = 256
+
+	// Default number of shards used. A shard splits the traces up on a tile.
+	// If a trace exists on shard N in tile A, it will be on shard N for all tiles.
+	// Having traces on shards lets BT split up the work more evenly.
+	DefaultShards = 32
+
+	// To avoid many successive increment calls to the id counter cell, we request a number of
+	// ids at once. This number is arbitrarily picked and can be increased if need be.
+	numReservedIds = 256
+
+	// Timeouts applied to BT read and write operations respectively.
+	readTimeout  = 4 * time.Minute
+	writeTimeout = 10 * time.Minute
+
+	// badTileKey is returned in error conditions.
+	badTileKey = tileKey(-1)
+
+	// missingDigestID is the id for types.MISSING_DIGEST
+	missingDigestID = digestID(0)
+)
+
+// btColumnFamilies is the list of families (conceptually similar to tables)
+// we are creating in BT.
+var btColumnFamilies = []string{
+	traceFamily,
+	opsFamily,
+	idCounterFamily,
+	digestMapFamily,
+}
+
+// We have one global digest id counter, so just hard-code it to tile 0.
+var idCounterRow = unshardedRowName(typeIdCounter, 0)
+
+// tileKey is the identifier for each tile held in BigTable.
+//
+// Note that tile keys are in the opposite order of tile offset, that is, the first commit
+// in a repo goes in the first tile, which has key 2^32-1. We do this so more recent
+// tiles come first in sort order. See tileKeyFromIndex (util.go) for the conversion.
+type tileKey int32
+
+// digestID is an arbitrary number for referring to a types.Digest (string)
+// that is stored in a digestMap. The zero value is reserved for
+// types.MISSING_DIGEST (see missingDigestID).
+type digestID uint64
+
+// MarshalBinary implements the encoding.BinaryMarshaler interface so digestIDs
+// can be stored compactly in BT (about 50% space savings over the string
+// representation). It writes the id as an unsigned varint.
+func (d digestID) MarshalBinary() ([]byte, error) {
+	buf := make([]byte, binary.MaxVarintLen64)
+	written := binary.PutUvarint(buf, uint64(d))
+	return buf[:written], nil
+}
+
+var _ encoding.BinaryMarshaler = digestID(0)
+
+// UnmarshalBinary implements the encoding.BinaryUnmarshaler interface,
+// reversing MarshalBinary. It requires that the varint consume every byte of
+// data; trailing bytes indicate corruption.
+func (d *digestID) UnmarshalBinary(data []byte) error {
+	decoded, consumed := binary.Uvarint(data)
+	if consumed != len(data) {
+		return skerr.Fmt("Error decoding digestID from %x; Uvarint consumed %d bytes instead of %d", data, consumed, len(data))
+	}
+	*d = digestID(decoded)
+	return nil
+}
+
+var _ encoding.BinaryUnmarshaler = (*digestID)(nil)
+
+// encodedTraceID is a shortened form of a tiling.TraceId, e.g. 0=1,1=3,3=0,
+// Those indices are references to the OrderedParamSet stored in encTile.
+// See params.paramsEncoder
+type encodedTraceID string
+
+// encTile contains an encoded tile.
+type encTile struct {
+	// maps a trace id to the list of digest ids. The list corresponds to the commits, with index
+	// 0 being the oldest commit and the last commit being the most recent.
+	traces map[encodedTraceID][]digestID
+	// ops is the OrderedParamSet needed to decode the encodedTraceID keys above.
+	ops    *paramtools.OrderedParamSet
+}
+
+// opsCacheEntry pairs an OrderedParamSet with its hash. When ingesting we keep
+// a cache of the OrderedParamSets we have seen per-tile.
+type opsCacheEntry struct {
+	ops  *paramtools.OrderedParamSet
+	hash string // md5 hash of the serialized ops - used for deterministic querying.
+}
+
+// opsCacheEntryFromOPS builds an opsCacheEntry for the given OrderedParamSet,
+// deriving the hash from the md5 sum of its serialized form.
+func opsCacheEntryFromOPS(ops *paramtools.OrderedParamSet) (*opsCacheEntry, error) {
+	serialized, err := ops.Encode()
+	if err != nil {
+		return nil, skerr.Fmt("could not encode the given ops to bytes: %s", err)
+	}
+	entry := opsCacheEntry{
+		ops:  ops,
+		hash: fmt.Sprintf("%x", md5.Sum(serialized)),
+	}
+	return &entry, nil
+}
+
+// newOpsCacheEntry returns an OpsCacheEntry wrapping a fresh, empty
+// OrderedParamSet (with its corresponding hash).
+func newOpsCacheEntry() (*opsCacheEntry, error) {
+	return opsCacheEntryFromOPS(paramtools.NewOrderedParamSet())
+}
+
+// newOpsCacheEntryFromRow loads the appropriate data from the given BT row
+// and returns an opsCacheEntry with that data. It errors if either of the two
+// expected columns (the serialized OPS and its hash) is absent or corrupt.
+func newOpsCacheEntryFromRow(row bigtable.Row) (*opsCacheEntry, error) {
+	family := row[opsFamily]
+	if len(family) != 2 {
+		// This should never happen
+		return nil, skerr.Fmt("incorrect number of OPS columns in BT for key %s, %d != 2", row.Key(), len(family))
+	}
+	var ops *paramtools.OrderedParamSet
+	hash := ""
+	for _, col := range family {
+		if col.Column == opsFullColName {
+			var err error
+			ops, err = paramtools.NewOrderedParamSetFromBytes(col.Value)
+			if err != nil {
+				// should never happen
+				return nil, skerr.Fmt("corrupted paramset in BT for key %s: %s", row.Key(), err)
+			}
+		} else if col.Column == hashFullColName {
+			hash = string(col.Value)
+		}
+	}
+	// Guard against the OPS column being absent (or replaced by an unexpected
+	// column), which would otherwise silently produce an empty OrderedParamSet.
+	if ops == nil {
+		return nil, skerr.Fmt("missing OPS column in BT for key %s", row.Key())
+	}
+	if hash == "" {
+		return nil, skerr.Fmt("missing hash for OPS for key %s: %#v", row.Key(), ops)
+	}
+	// You might be tempted to use opsCacheEntryFromOps and
+	// check that entry.hash == hash here, but that will fail
+	// because GoB encoding of maps is not deterministic.
+	entry := opsCacheEntry{
+		ops:  ops,
+		hash: hash,
+	}
+	return &entry, nil
+}
+
+// A digestMap keeps track of the mapping between encoded digestID and their corresponding
+// types.Digest. The two internal maps are inverses of each other and are kept
+// in sync by Add.
+type digestMap struct {
+	intMap map[digestID]types.Digest
+	strMap map[types.Digest]digestID
+}
+
+// newDigestMap creates a digestMap with the given initial capacity, pre-seeded
+// with the reserved mapping missingDigestID <-> types.MISSING_DIGEST.
+func newDigestMap(cap int) *digestMap {
+	ret := &digestMap{
+		intMap: make(map[digestID]types.Digest, cap),
+		strMap: make(map[types.Digest]digestID, cap),
+	}
+	ret.intMap[missingDigestID] = types.MISSING_DIGEST
+	ret.strMap[types.MISSING_DIGEST] = missingDigestID
+	return ret
+}
+
+// Delta returns the subset of the passed-in digests that are not yet
+// in this mapping.
+func (d *digestMap) Delta(digests map[types.Digest]bool) []types.Digest {
+	missing := make([]types.Digest, 0, len(digests))
+	for digest := range digests {
+		if _, exists := d.strMap[digest]; exists {
+			continue
+		}
+		missing = append(missing, digest)
+	}
+	return missing
+}
+
+// Add expands the map with the given entries. Entries that are already present
+// with an identical mapping are skipped; it is an error for any key or value
+// to conflict with an existing mapping, or to use the reserved missing-digest
+// sentinel (id 0 / types.MISSING_DIGEST).
+func (d *digestMap) Add(newEntries map[types.Digest]digestID) error {
+	for digest, id := range newEntries {
+		if digest == types.MISSING_DIGEST || id == 0 {
+			return skerr.Fmt("invalid input id or digest: (%q -> %d)", digest, id)
+		}
+
+		foundID, strExists := d.strMap[digest]
+		foundDigest, intExists := d.intMap[id]
+		if strExists && intExists {
+			if (foundID != id) || (foundDigest != digest) {
+				return skerr.Fmt("inconsistent data - got (%q -> %d) when (%q -> %d) was expected", digest, id, foundDigest, foundID)
+			}
+			// Already contained, so this entry is a no-op. Note: continue, not
+			// return - returning here would silently drop all remaining
+			// entries of the bulk add (in nondeterministic map order).
+			continue
+		}
+
+		if strExists || intExists {
+			return skerr.Fmt("internal inconsistency - expected forward mapping (%q -> %d) and reverse mapping (%d -> %q) to both be present", digest, foundID, id, foundDigest)
+		}
+
+		// New mapping. Add it to both directions.
+		d.intMap[id] = digest
+		d.strMap[digest] = id
+	}
+	return nil
+}
+
+// ID returns the digestID for a given types.Digest, or an error if the digest
+// is unknown.
+func (d *digestMap) ID(digest types.Digest) (digestID, error) {
+	if id, ok := d.strMap[digest]; ok {
+		return id, nil
+	}
+	return 0, skerr.Fmt("unable to find id for %q", digest)
+}
+
+// DecodeIDs is like Digest but in bulk.
+func (d *digestMap) DecodeIDs(ids []digestID) ([]types.Digest, error) {
+	ret := make([]types.Digest, len(ids))
+	var ok bool
+	for idx, id := range ids {
+		ret[idx], ok = d.intMap[id]
+		if !ok {
+			return nil, skerr.Fmt("unable to find id %d in intMap", id)
+		}
+	}
+	return ret, nil
+}
+
+// Digest returns the types.Digest for a given digestID, or an error if the id
+// is unknown.
+func (d *digestMap) Digest(id digestID) (types.Digest, error) {
+	if digest, ok := d.intMap[id]; ok {
+		return digest, nil
+	}
+	return "", skerr.Fmt("unable to find digest for %d", id)
+}
+
+// Len returns how many map entries are in this map, including the reserved
+// missing-digest entry (so a freshly created map has Len() == 1).
+func (d *digestMap) Len() int {
+	return len(d.strMap)
+}
+
+// traceMap is a collection of traces keyed by trace id. Define this as a type
+// so we can define some helper functions; the helpers assume every value is a
+// *types.GoldenTrace (they type-assert accordingly).
+type traceMap map[tiling.TraceId]tiling.Trace
+
+// CommitIndicesWithData returns the indexes of the commits with at least one non-missing
+// digest in at least one trace.
+func (t traceMap) CommitIndicesWithData() []int {
+	if len(t) == 0 {
+		return nil
+	}
+	// All traces in a traceMap are assumed to have one digest per commit, so
+	// any single trace's length gives the commit count.
+	numCommits := 0
+	for _, trace := range t {
+		gt := trace.(*types.GoldenTrace)
+		numCommits = len(gt.Digests)
+		break
+	}
+	var haveData []int
+	// Worst case O(numCommits * len(t)), but the inner loop stops at the
+	// first trace with data for a given commit.
+	for i := 0; i < numCommits; i++ {
+		for _, trace := range t {
+			gt := trace.(*types.GoldenTrace)
+			if !gt.IsMissing(i) {
+				haveData = append(haveData, i)
+				break
+			}
+		}
+	}
+	return haveData
+}
+
+// MakeFromCommitIndexes creates a new traceMap from the data in this one that
+// only has the digests belonging to the given commit indices. Conceptually,
+// this grabs a subset of the commit columns from the tile.
+// All indices must be in range for every trace; an out-of-range index panics.
+func (t traceMap) MakeFromCommitIndexes(indices []int) traceMap {
+	if len(indices) == 0 {
+		return traceMap{}
+	}
+	r := make(traceMap, len(t))
+	for id, trace := range t {
+		gt := trace.(*types.GoldenTrace)
+
+		newDigests := make([]types.Digest, len(indices))
+		for i, idx := range indices {
+			newDigests[i] = gt.Digests[idx]
+		}
+
+		r[id] = &types.GoldenTrace{
+			// Note: Keys is shared with (not copied from) the source trace.
+			// NOTE(review): callers must not mutate Keys of either map.
+			Keys:    gt.Keys,
+			Digests: newDigests,
+		}
+	}
+	return r
+}
+
+// PrependTraces augments this traceMap with the data from the given one.
+// Specifically, it prepends that data, assuming the "other" data came
+// before the data in this map. Both maps may be mutated: traces existing only
+// in other are grown in place and then stored into t.
+// TODO(kjlubick): Deduplicate this with tiling.Merge
+func (t traceMap) PrependTraces(other traceMap) {
+	// Number of commits spanned by the traces already in t; all traces within
+	// one traceMap are assumed to be the same length.
+	numCommits := 0
+	for _, trace := range t {
+		gt := trace.(*types.GoldenTrace)
+		numCommits = len(gt.Digests)
+		break
+	}
+
+	numOtherCommits := 0
+	for id, trace := range other {
+		numOtherCommits = trace.Len()
+		original, ok := t[id]
+		if ok {
+			// Keys are constant and are what the id is derived from
+			t[id] = trace.Merge(original)
+		} else {
+			// if we stopped seeing the trace in t, we need to pad the end with MISSING_DIGEST
+			trace.Grow(numOtherCommits+numCommits, tiling.FILL_AFTER) // Assumes we can modify other
+			t[id] = trace
+		}
+	}
+
+	// if we saw a trace in t, but not in other, we need to pad the beginning with MISSING_DIGEST
+	for id, trace := range t {
+		if _, ok := other[id]; !ok {
+			trace.Grow(numOtherCommits+numCommits, tiling.FILL_BEFORE)
+		}
+	}
+}
+}
diff --git a/golden/go/tracestore/bt_tracestore/types_test.go b/golden/go/tracestore/bt_tracestore/types_test.go
new file mode 100644
index 0000000..6aefc3d
--- /dev/null
+++ b/golden/go/tracestore/bt_tracestore/types_test.go
@@ -0,0 +1,267 @@
+package bt_tracestore
+
+import (
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+	"go.skia.org/infra/go/testutils/unittest"
+	"go.skia.org/infra/golden/go/types"
+)
+
+// TestDigestMapMissingDigest verifies a newly created digest map
+// starts off with only the reserved mapping for the missing digest.
+func TestDigestMapMissingDigest(t *testing.T) {
+	unittest.SmallTest(t)
+
+	digestMap := newDigestMap(1000)
+	assert.Equal(t, 1, digestMap.Len())
+	id, err := digestMap.ID(types.MISSING_DIGEST)
+	assert.NoError(t, err)
+	assert.Equal(t, id, missingDigestID)
+
+	// Any other digest is unknown to a fresh map.
+	_, err = digestMap.ID(AlphaDigest)
+	assert.Error(t, err)
+	assert.Contains(t, err.Error(), "unable to find")
+}
+
+// TestDigestMapAddGet tests the three ways to get data out of the map
+// (bulk decode, by id, and by digest).
+func TestDigestMapAddGet(t *testing.T) {
+	unittest.SmallTest(t)
+
+	digestMap := newDigestMap(1)
+	err := digestMap.Add(map[types.Digest]digestID{
+		AlphaDigest: AlphaID,
+		BetaDigest:  BetaID,
+		GammaDigest: GammaID,
+	})
+	assert.NoError(t, err)
+	// 3 added entries plus the implicit missing-digest entry.
+	assert.Equal(t, 4, digestMap.Len())
+
+	digests, err := digestMap.DecodeIDs([]digestID{GammaID, AlphaID})
+	assert.NoError(t, err)
+	assert.Equal(t, digests, []types.Digest{GammaDigest, AlphaDigest})
+
+	d, err := digestMap.Digest(BetaID)
+	assert.NoError(t, err)
+	assert.Equal(t, BetaDigest, d)
+
+	id, err := digestMap.ID(BetaDigest)
+	assert.NoError(t, err)
+	assert.Equal(t, BetaID, id)
+}
+
+// TestDigestMapAddBadGet tests getting things out of the map that don't exist.
+func TestDigestMapAddBadGet(t *testing.T) {
+	unittest.SmallTest(t)
+
+	notExistID := digestID(99)
+	notExistDigest := types.Digest("fffe39544765f38baab53350aef79966")
+
+	digestMap := newDigestMap(1)
+	err := digestMap.Add(map[types.Digest]digestID{
+		AlphaDigest: AlphaID,
+		BetaDigest:  BetaID,
+		GammaDigest: GammaID,
+	})
+	assert.NoError(t, err)
+	// A single unknown id fails the whole bulk lookup.
+	_, err = digestMap.DecodeIDs([]digestID{AlphaID, notExistID})
+	assert.Error(t, err)
+	assert.Contains(t, err.Error(), "unable to find id")
+
+	_, err = digestMap.Digest(notExistID)
+	assert.Error(t, err)
+	assert.Contains(t, err.Error(), "unable to find digest")
+
+	_, err = digestMap.ID(notExistDigest)
+	assert.Error(t, err)
+	assert.Contains(t, err.Error(), "unable to find id")
+}
+
+// TestDigestMapBadAdd tests some corner cases with adding things to the map.
+func TestDigestMapBadAdd(t *testing.T) {
+	unittest.SmallTest(t)
+
+	digestMap := newDigestMap(1)
+	err := digestMap.Add(map[types.Digest]digestID{AlphaDigest: AlphaID})
+	assert.NoError(t, err)
+	err = digestMap.Add(map[types.Digest]digestID{BetaDigest: BetaID})
+	assert.NoError(t, err)
+
+	// Adding an identical mapping multiple times is no error
+	err = digestMap.Add(map[types.Digest]digestID{BetaDigest: BetaID})
+	assert.NoError(t, err)
+	err = digestMap.Add(map[types.Digest]digestID{BetaDigest: BetaID})
+	assert.NoError(t, err)
+
+	// Can't add something with MISSING_DIGEST as a key ...
+	err = digestMap.Add(map[types.Digest]digestID{types.MISSING_DIGEST: 5})
+	assert.Error(t, err)
+	assert.Contains(t, err.Error(), "invalid input id")
+	// ... or with the reserved missing-digest id as a value.
+	err = digestMap.Add(map[types.Digest]digestID{BetaDigest: missingDigestID})
+	assert.Error(t, err)
+	assert.Contains(t, err.Error(), "invalid input id")
+
+	// Can't remap an existing key to an id that is not in the map ...
+	err = digestMap.Add(map[types.Digest]digestID{BetaDigest: GammaID})
+	assert.Error(t, err)
+	assert.Contains(t, err.Error(), "internal inconsistency")
+	// ... nor an existing id to a digest that is not in the map.
+	err = digestMap.Add(map[types.Digest]digestID{GammaDigest: BetaID})
+	assert.Error(t, err)
+	assert.Contains(t, err.Error(), "internal inconsistency")
+
+	// Can't mix up data that has already been seen before
+	err = digestMap.Add(map[types.Digest]digestID{AlphaDigest: BetaID})
+	assert.Error(t, err)
+	assert.Contains(t, err.Error(), "inconsistent data")
+}
+
+// TestTraceMapCommitIndicesWithData checks that only commit indices for which
+// at least one trace has a non-missing digest are returned, and that maps with
+// no data at all (or no traces) return nil.
+func TestTraceMapCommitIndicesWithData(t *testing.T) {
+	unittest.SmallTest(t)
+
+	tm := traceMap{
+		",key=first,": &types.GoldenTrace{
+			Digests: []types.Digest{
+				types.MISSING_DIGEST, types.MISSING_DIGEST, AlphaDigest,
+				AlphaDigest, types.MISSING_DIGEST, BetaDigest,
+			},
+		},
+		",key=second,": &types.GoldenTrace{
+			Digests: []types.Digest{
+				GammaDigest, types.MISSING_DIGEST, types.MISSING_DIGEST,
+				GammaDigest, types.MISSING_DIGEST, GammaDigest,
+			},
+		},
+	}
+	assert.Equal(t, []int{0, 2, 3, 5}, tm.CommitIndicesWithData())
+
+	empty := traceMap{
+		",key=first,": &types.GoldenTrace{
+			Digests: []types.Digest{
+				types.MISSING_DIGEST, types.MISSING_DIGEST, types.MISSING_DIGEST,
+			},
+		},
+		",key=second,": &types.GoldenTrace{
+			Digests: []types.Digest{
+				types.MISSING_DIGEST, types.MISSING_DIGEST, types.MISSING_DIGEST,
+			},
+		},
+	}
+	assert.Nil(t, empty.CommitIndicesWithData())
+
+	assert.Nil(t, traceMap{}.CommitIndicesWithData())
+}
+
+// TestTraceMapMakeFromCommitIndexes checks that grabbing a subset of commit
+// columns keeps the digests in index order and that empty/nil index lists
+// produce an empty traceMap.
+func TestTraceMapMakeFromCommitIndexes(t *testing.T) {
+	unittest.SmallTest(t)
+
+	tm := traceMap{
+		",key=first,": &types.GoldenTrace{
+			Digests: []types.Digest{
+				types.MISSING_DIGEST, types.MISSING_DIGEST, AlphaDigest,
+				AlphaDigest, types.MISSING_DIGEST, BetaDigest,
+			},
+		},
+		",key=second,": &types.GoldenTrace{
+			Digests: []types.Digest{
+				GammaDigest, types.MISSING_DIGEST, types.MISSING_DIGEST,
+				GammaDigest, types.MISSING_DIGEST, GammaDigest,
+			},
+		},
+	}
+
+	assert.Equal(t, traceMap{
+		",key=first,": &types.GoldenTrace{
+			Digests: []types.Digest{
+				types.MISSING_DIGEST, AlphaDigest,
+				AlphaDigest, BetaDigest,
+			},
+		},
+		",key=second,": &types.GoldenTrace{
+			Digests: []types.Digest{
+				GammaDigest, types.MISSING_DIGEST,
+				GammaDigest, GammaDigest,
+			},
+		},
+	}, tm.MakeFromCommitIndexes([]int{0, 2, 3, 5}))
+
+	assert.Equal(t, traceMap{
+		",key=first,": &types.GoldenTrace{
+			Digests: []types.Digest{
+				types.MISSING_DIGEST, types.MISSING_DIGEST, types.MISSING_DIGEST,
+			},
+		},
+		",key=second,": &types.GoldenTrace{
+			Digests: []types.Digest{
+				GammaDigest, types.MISSING_DIGEST, types.MISSING_DIGEST,
+			},
+		},
+	}, tm.MakeFromCommitIndexes([]int{0, 1, 4}))
+
+	assert.Equal(t, traceMap{}, tm.MakeFromCommitIndexes([]int{}))
+	assert.Equal(t, traceMap{}, tm.MakeFromCommitIndexes(nil))
+}
+
+// TestTraceMapPrependTraces checks that older data is prepended and that
+// traces present in only one of the two maps are padded with MISSING_DIGEST
+// (at the front for t-only traces, at the back for other-only traces).
+func TestTraceMapPrependTraces(t *testing.T) {
+	unittest.SmallTest(t)
+
+	tm1 := traceMap{
+		",key=first,": &types.GoldenTrace{
+			Digests: []types.Digest{
+				types.MISSING_DIGEST, types.MISSING_DIGEST, AlphaDigest,
+			},
+		},
+		",key=second,": &types.GoldenTrace{
+			Digests: []types.Digest{
+				GammaDigest, types.MISSING_DIGEST, types.MISSING_DIGEST,
+			},
+		},
+	}
+
+	tm2 := traceMap{
+		",key=first,": &types.GoldenTrace{
+			Digests: []types.Digest{
+				types.MISSING_DIGEST, GammaDigest,
+			},
+		},
+		",key=third,": &types.GoldenTrace{
+			Digests: []types.Digest{
+				GammaDigest, BetaDigest,
+			},
+		},
+	}
+
+	tm1.PrependTraces(tm2)
+
+	assert.Equal(t, traceMap{
+		",key=first,": &types.GoldenTrace{
+			Digests: []types.Digest{
+				types.MISSING_DIGEST, GammaDigest,
+				types.MISSING_DIGEST, types.MISSING_DIGEST, AlphaDigest,
+			},
+		},
+		",key=second,": &types.GoldenTrace{
+			Digests: []types.Digest{
+				types.MISSING_DIGEST, types.MISSING_DIGEST,
+				GammaDigest, types.MISSING_DIGEST, types.MISSING_DIGEST,
+			},
+		},
+		",key=third,": &types.GoldenTrace{
+			Digests: []types.Digest{
+				GammaDigest, BetaDigest,
+				types.MISSING_DIGEST, types.MISSING_DIGEST, types.MISSING_DIGEST,
+			},
+		},
+	}, tm1)
+}
+
+// Shared test digests and ids. The digests are syntactically valid md5 hashes
+// whose distinctive prefixes (aaa, bbb, ccc) make expected values easy to read.
+const (
+	AlphaDigest = types.Digest("aaa6fc936d06e6569788366f1e3fda4e")
+	BetaDigest  = types.Digest("bbb15c047d150d961573062854f35a55")
+	GammaDigest = types.Digest("cccd42f3ee0b02687f63963adb36a580")
+
+	AlphaID = digestID(1)
+	BetaID  = digestID(2)
+	GammaID = digestID(3)
+)
diff --git a/golden/go/tracestore/bt_tracestore/util.go b/golden/go/tracestore/bt_tracestore/util.go
new file mode 100644
index 0000000..9bb8c87
--- /dev/null
+++ b/golden/go/tracestore/bt_tracestore/util.go
@@ -0,0 +1,45 @@
+package bt_tracestore
+
+import (
+	"fmt"
+	"math"
+	"strings"
+)
+
+// tileKeyFromIndex converts the tile index to the tileKey, reversing the
+// ordering so newer tiles sort first. A negative index yields badTileKey.
+// See BIGTABLE.md for more on this conversion.
+func tileKeyFromIndex(tileIndex int32) tileKey {
+	if tileIndex >= 0 {
+		return tileKey(math.MaxInt32 - tileIndex)
+	}
+	return badTileKey
+}
+
+// OpsRowName returns the name of the BigTable row which stores the OrderedParamSet
+// for this tile. OPS rows are unsharded and have an empty subkey.
+func (t tileKey) OpsRowName() string {
+	return unshardedRowName(typeOPS, t)
+}
+
+// unshardedRowName calculates the row for unsharded data, which always has
+// the format:
+// :[namespace]:[type]:[tile]:
+func unshardedRowName(rowType string, tile tileKey) string {
+	return fmt.Sprintf(":%s:%s:%010d:", traceStoreNameSpace, rowType, tile)
+}
+
+// shardedRowName calculates the row for sharded data, which always has the
+// format:
+// [shard]:[namespace]:[type]:[tile]:[subkey]
+// For some data types, where there is only one row, or when doing a
+// prefix-match, subkey may be "".
+func shardedRowName(shard int32, rowType string, tile tileKey, subkey string) string {
+	return fmt.Sprintf("%02d:%s:%s:%010d:%s", shard, traceStoreNameSpace, rowType, tile, subkey)
+}
+
+// extractSubkey returns the subkey (the segment after the final ':') from the
+// given row name. This can be "" (e.g. OPS rows, which have no subkey). A row
+// name with no ':' at all is returned unchanged, matching strings.Split's
+// behavior in the previous implementation.
+func extractSubkey(rowName string) string {
+	idx := strings.LastIndex(rowName, ":")
+	if idx == -1 {
+		return rowName
+	}
+	return rowName[idx+1:]
+}
diff --git a/golden/go/tracestore/bt_tracestore/util_test.go b/golden/go/tracestore/bt_tracestore/util_test.go
new file mode 100644
index 0000000..586a593
--- /dev/null
+++ b/golden/go/tracestore/bt_tracestore/util_test.go
@@ -0,0 +1,57 @@
+package bt_tracestore
+
+import (
+	"math"
+	"testing"
+
+	assert "github.com/stretchr/testify/require"
+	"go.skia.org/infra/go/testutils/unittest"
+)
+
+// The tests in this package are mainly to make sure changes that
+// are not backward-compatible are detected.
+
+// TestTileKeyFromIndex spot-checks the index -> key reversal.
+// NOTE(review): the negative-index -> badTileKey case is untested.
+func TestTileKeyFromIndex(t *testing.T) {
+	unittest.SmallTest(t)
+
+	// spot-check some arbitrary values
+	assert.Equal(t, tileKey(2147483647), tileKeyFromIndex(0))
+	assert.Equal(t, tileKey(2147483451), tileKeyFromIndex(196))
+	assert.Equal(t, tileKey(908536335), tileKeyFromIndex(1238947312))
+}
+
+// TestOpsRowName spot-checks the unsharded, zero-padded OPS row-name format.
+func TestOpsRowName(t *testing.T) {
+	unittest.SmallTest(t)
+
+	// spot-check some arbitrary values
+	assert.Equal(t, ":ts:o:2147483647:", tileKeyFromIndex(0).OpsRowName())
+	assert.Equal(t, ":ts:o:2147483451:", tileKeyFromIndex(196).OpsRowName())
+	assert.Equal(t, ":ts:o:0908536335:", tileKeyFromIndex(1238947312).OpsRowName())
+}
+
+// TestShardedRowName spot-checks sharded row names for trace and digest rows.
+func TestShardedRowName(t *testing.T) {
+	unittest.SmallTest(t)
+
+	shard := int32(3) // arbitrarily picked
+	// NOTE(review): math.MaxInt32 - 1 is the key for tile index 1, not 0
+	// (tileKeyFromIndex(0) == math.MaxInt32); consider renaming this variable.
+	tileZeroKey := tileKey(math.MaxInt32 - 1)
+	veryNewTileKey := tileKey(57)
+
+	// Example RowName for a trace
+	encodedTrace := ",0=1,1=3,3=0,"
+	assert.Equal(t, "03:ts:t:2147483646:,0=1,1=3,3=0,", shardedRowName(shard, typeTrace, tileZeroKey, encodedTrace))
+	assert.Equal(t, "03:ts:t:0000000057:,0=1,1=3,3=0,", shardedRowName(shard, typeTrace, veryNewTileKey, encodedTrace))
+
+	// Example RowName for a digest
+	// digests are stored in a row based on the first three characters and a
+	// column with the remaining characters.
+	digestPrefix := string(AlphaDigest[:3])
+	assert.Equal(t, "03:ts:d:2147483646:aaa", shardedRowName(shard, typeDigestMap, tileZeroKey, digestPrefix))
+}
+
+// TestExtractSubkey spot-checks extractSubkey against the row-name shapes used
+// in this package: sharded digest rows, unsharded OPS rows (which have an
+// empty subkey), and sharded trace rows. (Renamed from
+// TestExtractKeyFromRowName to match the function under test.)
+func TestExtractSubkey(t *testing.T) {
+	unittest.SmallTest(t)
+
+	assert.Equal(t, "ae3", extractSubkey("07:ts:d:2147483646:ae3"))
+	assert.Equal(t, "", extractSubkey(":ts:o:2147483646:"))
+	assert.Equal(t, ",0=1,1=3,3=0,", extractSubkey("03:ts:t:2147483646:,0=1,1=3,3=0,"))
+}
diff --git a/golden/go/tracestore/types.go b/golden/go/tracestore/types.go
new file mode 100644
index 0000000..7b70785
--- /dev/null
+++ b/golden/go/tracestore/types.go
@@ -0,0 +1,75 @@
+package tracestore
+
+import (
+	"context"
+	"regexp"
+	"time"
+
+	"go.skia.org/infra/go/paramtools"
+	"go.skia.org/infra/go/query"
+	"go.skia.org/infra/go/sklog"
+	"go.skia.org/infra/go/tiling"
+	"go.skia.org/infra/golden/go/types"
+)
+
+// Entry is one digest and its related params to be added to the TraceStore.
+// TODO(kjlubick): Make sure this does not include the keys included in
+// options.
+type Entry struct {
+	// Params describe the configuration that produced the digest/image.
+	Params map[string]string
+
+	// Digest references the image that was generated by the test.
+	Digest types.Digest
+}
+
+// TraceStore is the interface to store and retrieve trace data.
+type TraceStore interface {
+	// Put writes the given entries to the TraceStore at the given commit hash. The timestamp is
+	// assumed to be the time when the entries were generated.
+	// It is undefined behavior to have multiple entries with the exact same Params.
+	Put(ctx context.Context, commitHash string, entries []*Entry, ts time.Time) error
+
+	// GetTile reads the last n commits and returns them as a tile.
+	// The second return value is all commits that are in the tile.
+	GetTile(ctx context.Context, nCommits int) (*tiling.Tile, []*tiling.Commit, error)
+
+	// GetDenseTile constructs a tile containing only commits that have data for at least one trace.
+	// The returned tile will always have length exactly nCommits unless there are fewer than
+	// nCommits commits with data. The second return value contains all commits starting with the
+	// first commit of the tile and ending with the most recent commit, in order; i.e. it includes
+	// all commits in the tile as well as the omitted commits.
+	GetDenseTile(ctx context.Context, nCommits int) (*tiling.Tile, []*tiling.Commit, error)
+}
+
+// TraceIDFromParams deterministically returns a TraceId that uniquely encodes
+// the given params. It follows the same convention as perf's trace ids, that
+// is something like ",key1=value1,key2=value2,...," where the keys
+// are in alphabetical order.
+func TraceIDFromParams(params paramtools.Params) tiling.TraceId {
+	// Clean up any params with , or =
+	params = forceValid(params)
+	s, err := query.MakeKeyFast(params)
+	if err != nil {
+		// NOTE(review): the error is only logged; the (possibly empty) id is
+		// still returned. Confirm callers are OK with a degraded id here.
+		sklog.Warningf("Invalid params passed in for trace id %#v: %s", params, err)
+	}
+	return tiling.TraceId(s)
+}
+
+var (
+	// invalidChar matches the characters that are structurally meaningful in a
+	// trace id: the ',' pair separator and the '=' key/value separator.
+	invalidChar = regexp.MustCompile("([,=])")
+)
+
+// clean replaces any characters that would break the structured key format
+// with an underscore.
+func clean(s string) string {
+	return invalidChar.ReplaceAllLiteralString(s, "_")
+}
+
+// forceValid returns a copy of m in which every key and value has been
+// cleaned, so the resulting map will make a valid structured key.
+func forceValid(m map[string]string) map[string]string {
+	cleaned := make(map[string]string, len(m))
+	for k, v := range m {
+		cleaned[clean(k)] = clean(v)
+	}
+	return cleaned
+}
diff --git a/golden/go/tracestore/types_test.go b/golden/go/tracestore/types_test.go
new file mode 100644
index 0000000..d535146
--- /dev/null
+++ b/golden/go/tracestore/types_test.go
@@ -0,0 +1,42 @@
+package tracestore
+
+import (
+	"testing"
+
+	assert "github.com/stretchr/testify/require"
+	"go.skia.org/infra/go/paramtools"
+	"go.skia.org/infra/go/testutils/unittest"
+	"go.skia.org/infra/go/tiling"
+	"go.skia.org/infra/golden/go/types"
+)
+
+// TestTraceIDFromParams tests a basic conversion of params to a trace id,
+// with keys sorted alphabetically.
+func TestTraceIDFromParams(t *testing.T) {
+	unittest.SmallTest(t)
+
+	input := paramtools.Params{
+		"cpu":                   "x86",
+		"gpu":                   "nVidia",
+		types.PRIMARY_KEY_FIELD: "test_alpha",
+		types.CORPUS_FIELD:      "dm",
+	}
+
+	expected := tiling.TraceId(",cpu=x86,gpu=nVidia,name=test_alpha,source_type=dm,")
+
+	assert.Equal(t, expected, TraceIDFromParams(input))
+}
+
+// TestTraceIDFromParamsMalicious adds some values with invalid chars; ','
+// and '=' are replaced with '_' while other special characters pass through.
+func TestTraceIDFromParamsMalicious(t *testing.T) {
+	unittest.SmallTest(t)
+
+	input := paramtools.Params{
+		"c=p,u":                 `"x86"`,
+		"gpu":                   "nVi,,=dia",
+		types.PRIMARY_KEY_FIELD: "test=alpha",
+		types.CORPUS_FIELD:      "dm!",
+	}
+
+	expected := tiling.TraceId(`,c_p_u="x86",gpu=nVi___dia,name=test_alpha,source_type=dm!,`)
+
+	assert.Equal(t, expected, TraceIDFromParams(input))
+}
diff --git a/golden/go/types/types.go b/golden/go/types/types.go
index 62b8dc3..10ca302 100644
--- a/golden/go/types/types.go
+++ b/golden/go/types/types.go
@@ -387,6 +387,11 @@
 	return -1
 }
 
+// String prints a human friendly version of this trace (e.g. for debugging
+// and test-failure output). %q quotes each digest, making empty/missing
+// entries visible.
+func (g *GoldenTrace) String() string {
+	return fmt.Sprintf("Keys: %#v, Digests: %q", g.Keys, g.Digests)
+}
+
 // NewGoldenTrace allocates a new Trace set up for the given number of samples.
 //
 // The Trace Digests are pre-filled in with the missing data sentinel since not
diff --git a/perf/BIGTABLE.md b/perf/BIGTABLE.md
index 6990c05..f219726 100644
--- a/perf/BIGTABLE.md
+++ b/perf/BIGTABLE.md
@@ -94,8 +94,8 @@
 
 Values used in row names:
 
-    TileKey = 2^22 - (tile number)
-       - With 256 values per tile this let's us store 1 billion values per trace.
+    TileKey = (2^32-1) - (tile number)
+       - With 256 values per tile this lets us store 1 trillion values per trace.
+       - NOTE(review): with TileKey values up to 2^32-1 (10 decimal digits),
+         the %07d format below no longer produces fixed-width row names, which
+         breaks newest-tile-first lexicographic ordering once the key exceeds
+         9999999. Verify against the code (bt_tracestore uses %010d).
        - Formatted as %07d
        - Note that this reverses the order of the tiles, i.e. new tiles have
          smaller numbers, so that we can do a simple query to find the newest tile.