[gold] Implements a tracestore based on BigTable
How to review this CL:
1. Read bt_tracestore/BIGTABLE.md to get an overview of how the
data is stored in BT.
2. Browse through the consts and types in bt_tracestore/types.go
3. Read TestBTTraceStorePutGet to get a sense of how this code will be used.
4. Read the implementations for Put and GetTile, as that's the core pathway.
5. Look at everything else.
Initially uploaded as https://skia-review.googlesource.com/c/buildbot/+/213381
but taken over by me.
Bug: skia:9096
Change-Id: Ida1279f315abc5c21fcca42563e77b16342cc4c4
Reviewed-on: https://skia-review.googlesource.com/c/buildbot/+/213426
Commit-Queue: Kevin Lubick <kjlubick@google.com>
Reviewed-by: Ben Wagner aka dogben <benjaminwagner@google.com>
diff --git a/go/Makefile b/go/Makefile
index 12feed1..398314d 100644
--- a/go/Makefile
+++ b/go/Makefile
@@ -26,4 +26,4 @@
mocks:
go get github.com/vektra/mockery/...
- go generate ./...
\ No newline at end of file
+ go generate ./...
diff --git a/go/gitstore/bt_gitstore/bt_gitstore.go b/go/gitstore/bt_gitstore/bt_gitstore.go
index 77bb840..fa1d4d3 100644
--- a/go/gitstore/bt_gitstore/bt_gitstore.go
+++ b/go/gitstore/bt_gitstore/bt_gitstore.go
@@ -454,12 +454,12 @@
egroup.Go(func() error {
rowNames := rowNames[chunkStart:chunkEnd]
mutations := mutations[chunkStart:chunkEnd]
- errs, err := b.table.ApplyBulk(context.TODO(), rowNames, mutations)
+ errs, err := b.table.ApplyBulk(ctx, rowNames, mutations)
if err != nil {
- return skerr.Fmt("Error writing batch: %s", err)
+ return skerr.Fmt("Error writing batch [%d:%d]: %s", chunkStart, chunkEnd, err)
}
if errs != nil {
- return skerr.Fmt("Error writing some portions of batch: %s", errs)
+ return skerr.Fmt("Error writing some portions of batch [%d:%d]: %s", chunkStart, chunkEnd, errs)
}
return nil
})
diff --git a/go/paramtools/params.go b/go/paramtools/params.go
index ddf32bf..7e33bb2 100644
--- a/go/paramtools/params.go
+++ b/go/paramtools/params.go
@@ -10,6 +10,7 @@
"strings"
"sync"
+ "go.skia.org/infra/go/skerr"
"go.skia.org/infra/go/util"
)
@@ -378,15 +379,15 @@
}
parts := strings.Split(pair, "=")
if len(parts) != 2 {
- return nil, fmt.Errorf("Failed to parse: %s", pair)
+ return nil, skerr.Fmt("failed to parse: %s", pair)
}
key, ok := p.keys[parts[0]]
if !ok {
- return nil, fmt.Errorf("Failed to find key: %q", parts[0])
+ return nil, skerr.Fmt("failed to find key: %q", parts[0])
}
value, ok := p.values[parts[0]][parts[1]]
if !ok {
- return nil, fmt.Errorf("Failed to find value: %q", parts[1])
+ return nil, skerr.Fmt("failed to find value %q in %q (%q)", parts[1], p.values[parts[0]], parts[0])
}
ret[key] = value
}
@@ -450,7 +451,7 @@
return o.paramsEncoder
}
-// EncodeParamsAsString encodes the Params as a string.
+// EncodeParamsAsString encodes the Params as a string containing indices.
func (o *OrderedParamSet) EncodeParamsAsString(p Params) (string, error) {
return o.getParamsEncoder().encodeAsString(p)
}
diff --git a/go/util/util.go b/go/util/util.go
index 1f71be7..6f28a0d 100644
--- a/go/util/util.go
+++ b/go/util/util.go
@@ -185,6 +185,23 @@
return r
}
+// ReverseString reverses a string. It may not handle Unicode
+// combining characters correctly.
+// https://groups.google.com/forum/#!topic/golang-nuts/oPuBaYJ17t4
+func ReverseString(input string) string {
+ // Get Unicode code points.
+ runes := []rune(input)
+ n := len(runes)
+
+ // Reverse
+ for i := 0; i < n/2; i++ {
+ runes[i], runes[n-1-i] = runes[n-1-i], runes[i]
+ }
+
+ // Convert back to UTF-8.
+ return string(runes)
+}
+
// InsertString inserts the given string into the slice at the given index.
func InsertString(strs []string, idx int, s string) []string {
oldLen := len(strs)
diff --git a/go/vcsinfo/mocks/VCS.go b/go/vcsinfo/mocks/VCS.go
new file mode 100644
index 0000000..176048c
--- /dev/null
+++ b/go/vcsinfo/mocks/VCS.go
@@ -0,0 +1,221 @@
+// Code generated by mockery v1.0.0. DO NOT EDIT.
+
+package mocks
+
+import context "context"
+import mock "github.com/stretchr/testify/mock"
+import time "time"
+import vcsinfo "go.skia.org/infra/go/vcsinfo"
+
+// VCS is an autogenerated mock type for the VCS type
+type VCS struct {
+ mock.Mock
+}
+
+// ByIndex provides a mock function with given fields: ctx, N
+func (_m *VCS) ByIndex(ctx context.Context, N int) (*vcsinfo.LongCommit, error) {
+ ret := _m.Called(ctx, N)
+
+ var r0 *vcsinfo.LongCommit
+ if rf, ok := ret.Get(0).(func(context.Context, int) *vcsinfo.LongCommit); ok {
+ r0 = rf(ctx, N)
+ } else {
+ if ret.Get(0) != nil {
+ r0 = ret.Get(0).(*vcsinfo.LongCommit)
+ }
+ }
+
+ var r1 error
+ if rf, ok := ret.Get(1).(func(context.Context, int) error); ok {
+ r1 = rf(ctx, N)
+ } else {
+ r1 = ret.Error(1)
+ }
+
+ return r0, r1
+}
+
+// Details provides a mock function with given fields: ctx, hash, includeBranchInfo
+func (_m *VCS) Details(ctx context.Context, hash string, includeBranchInfo bool) (*vcsinfo.LongCommit, error) {
+ ret := _m.Called(ctx, hash, includeBranchInfo)
+
+ var r0 *vcsinfo.LongCommit
+ if rf, ok := ret.Get(0).(func(context.Context, string, bool) *vcsinfo.LongCommit); ok {
+ r0 = rf(ctx, hash, includeBranchInfo)
+ } else {
+ if ret.Get(0) != nil {
+ r0 = ret.Get(0).(*vcsinfo.LongCommit)
+ }
+ }
+
+ var r1 error
+ if rf, ok := ret.Get(1).(func(context.Context, string, bool) error); ok {
+ r1 = rf(ctx, hash, includeBranchInfo)
+ } else {
+ r1 = ret.Error(1)
+ }
+
+ return r0, r1
+}
+
+// DetailsMulti provides a mock function with given fields: ctx, hashes, includeBranchInfo
+func (_m *VCS) DetailsMulti(ctx context.Context, hashes []string, includeBranchInfo bool) ([]*vcsinfo.LongCommit, error) {
+ ret := _m.Called(ctx, hashes, includeBranchInfo)
+
+ var r0 []*vcsinfo.LongCommit
+ if rf, ok := ret.Get(0).(func(context.Context, []string, bool) []*vcsinfo.LongCommit); ok {
+ r0 = rf(ctx, hashes, includeBranchInfo)
+ } else {
+ if ret.Get(0) != nil {
+ r0 = ret.Get(0).([]*vcsinfo.LongCommit)
+ }
+ }
+
+ var r1 error
+ if rf, ok := ret.Get(1).(func(context.Context, []string, bool) error); ok {
+ r1 = rf(ctx, hashes, includeBranchInfo)
+ } else {
+ r1 = ret.Error(1)
+ }
+
+ return r0, r1
+}
+
+// From provides a mock function with given fields: start
+func (_m *VCS) From(start time.Time) []string {
+ ret := _m.Called(start)
+
+ var r0 []string
+ if rf, ok := ret.Get(0).(func(time.Time) []string); ok {
+ r0 = rf(start)
+ } else {
+ if ret.Get(0) != nil {
+ r0 = ret.Get(0).([]string)
+ }
+ }
+
+ return r0
+}
+
+// GetBranch provides a mock function with given fields:
+func (_m *VCS) GetBranch() string {
+ ret := _m.Called()
+
+ var r0 string
+ if rf, ok := ret.Get(0).(func() string); ok {
+ r0 = rf()
+ } else {
+ r0 = ret.Get(0).(string)
+ }
+
+ return r0
+}
+
+// GetFile provides a mock function with given fields: ctx, fileName, commitHash
+func (_m *VCS) GetFile(ctx context.Context, fileName string, commitHash string) (string, error) {
+ ret := _m.Called(ctx, fileName, commitHash)
+
+ var r0 string
+ if rf, ok := ret.Get(0).(func(context.Context, string, string) string); ok {
+ r0 = rf(ctx, fileName, commitHash)
+ } else {
+ r0 = ret.Get(0).(string)
+ }
+
+ var r1 error
+ if rf, ok := ret.Get(1).(func(context.Context, string, string) error); ok {
+ r1 = rf(ctx, fileName, commitHash)
+ } else {
+ r1 = ret.Error(1)
+ }
+
+ return r0, r1
+}
+
+// IndexOf provides a mock function with given fields: ctx, hash
+func (_m *VCS) IndexOf(ctx context.Context, hash string) (int, error) {
+ ret := _m.Called(ctx, hash)
+
+ var r0 int
+ if rf, ok := ret.Get(0).(func(context.Context, string) int); ok {
+ r0 = rf(ctx, hash)
+ } else {
+ r0 = ret.Get(0).(int)
+ }
+
+ var r1 error
+ if rf, ok := ret.Get(1).(func(context.Context, string) error); ok {
+ r1 = rf(ctx, hash)
+ } else {
+ r1 = ret.Error(1)
+ }
+
+ return r0, r1
+}
+
+// LastNIndex provides a mock function with given fields: N
+func (_m *VCS) LastNIndex(N int) []*vcsinfo.IndexCommit {
+ ret := _m.Called(N)
+
+ var r0 []*vcsinfo.IndexCommit
+ if rf, ok := ret.Get(0).(func(int) []*vcsinfo.IndexCommit); ok {
+ r0 = rf(N)
+ } else {
+ if ret.Get(0) != nil {
+ r0 = ret.Get(0).([]*vcsinfo.IndexCommit)
+ }
+ }
+
+ return r0
+}
+
+// Range provides a mock function with given fields: begin, end
+func (_m *VCS) Range(begin time.Time, end time.Time) []*vcsinfo.IndexCommit {
+ ret := _m.Called(begin, end)
+
+ var r0 []*vcsinfo.IndexCommit
+ if rf, ok := ret.Get(0).(func(time.Time, time.Time) []*vcsinfo.IndexCommit); ok {
+ r0 = rf(begin, end)
+ } else {
+ if ret.Get(0) != nil {
+ r0 = ret.Get(0).([]*vcsinfo.IndexCommit)
+ }
+ }
+
+ return r0
+}
+
+// ResolveCommit provides a mock function with given fields: ctx, commitHash
+func (_m *VCS) ResolveCommit(ctx context.Context, commitHash string) (string, error) {
+ ret := _m.Called(ctx, commitHash)
+
+ var r0 string
+ if rf, ok := ret.Get(0).(func(context.Context, string) string); ok {
+ r0 = rf(ctx, commitHash)
+ } else {
+ r0 = ret.Get(0).(string)
+ }
+
+ var r1 error
+ if rf, ok := ret.Get(1).(func(context.Context, string) error); ok {
+ r1 = rf(ctx, commitHash)
+ } else {
+ r1 = ret.Error(1)
+ }
+
+ return r0, r1
+}
+
+// Update provides a mock function with given fields: ctx, pull, allBranches
+func (_m *VCS) Update(ctx context.Context, pull bool, allBranches bool) error {
+ ret := _m.Called(ctx, pull, allBranches)
+
+ var r0 error
+ if rf, ok := ret.Get(0).(func(context.Context, bool, bool) error); ok {
+ r0 = rf(ctx, pull, allBranches)
+ } else {
+ r0 = ret.Error(0)
+ }
+
+ return r0
+}
diff --git a/go/vcsinfo/mocks/generate.go b/go/vcsinfo/mocks/generate.go
new file mode 100644
index 0000000..f926ff3
--- /dev/null
+++ b/go/vcsinfo/mocks/generate.go
@@ -0,0 +1,3 @@
+package mocks
+
+//go:generate mockery -name VCS -dir .. -output .
diff --git a/go/vcsinfo/types.go b/go/vcsinfo/types.go
index e6a980c..593ff23 100644
--- a/go/vcsinfo/types.go
+++ b/go/vcsinfo/types.go
@@ -83,7 +83,7 @@
// includes 'begin' and excludes 'end'.
Range(begin, end time.Time) []*IndexCommit
- // IndexOf returns the index of the commit hash, where 0 is the index of the first hash.
+ // IndexOf returns the index of the commit hash, where 0 is the index of the first commit.
IndexOf(ctx context.Context, hash string) (int, error)
// ByIndex returns a LongCommit describing the commit
diff --git a/golden/go/digest_counter/digest_counter_test.go b/golden/go/digest_counter/digest_counter_test.go
index 0656410..58a2782 100644
--- a/golden/go/digest_counter/digest_counter_test.go
+++ b/golden/go/digest_counter/digest_counter_test.go
@@ -120,19 +120,17 @@
AlphaTest = types.TestName("test_alpha")
BetaTest = types.TestName("test_beta")
- x86TestAlphaTraceID = tiling.TraceId("x86:test_alpha:gm")
- x64TestAlphaTraceID = tiling.TraceId("x86_64:test_alpha:image")
+ // TraceIDs are created like tracestore.TraceIDFromParams
+ x86TestAlphaTraceID = tiling.TraceId(",config=x86,source_type=test_alpha,name=gm")
+ x64TestAlphaTraceID = tiling.TraceId(",config=x86_64,source_type=test_alpha,name=image")
- x64TestBetaTraceID = tiling.TraceId("x86_64:test_beta:image")
+ x64TestBetaTraceID = tiling.TraceId(",config=x86_64,source_type=test_beta,name=image")
)
func makePartialTileOne() *tiling.Tile {
return &tiling.Tile{
// Commits, Scale and Tile Index omitted (should not affect things)
-
Traces: map[tiling.TraceId]tiling.Trace{
- // Reminder that the ids for the traces are created by concatenating
- // all the values in alphabetical order of the keys.
x86TestAlphaTraceID: &types.GoldenTrace{
Digests: types.DigestSlice{FirstDigest, FirstDigest, SecondDigest},
Keys: map[string]string{
diff --git a/golden/go/goldingestion/goldingestion_test.go b/golden/go/goldingestion/goldingestion_test.go
index 137f093..e3d0bdf 100644
--- a/golden/go/goldingestion/goldingestion_test.go
+++ b/golden/go/goldingestion/goldingestion_test.go
@@ -41,6 +41,8 @@
var (
// trace ids and values that are contained in the test file.
+ // These trace ids are the old format, new ones are like:
+ // ,key1=value1,key2=value2
TEST_ENTRIES = []struct {
key tiling.TraceId
value types.Digest
diff --git a/golden/go/indexer/indexer_test.go b/golden/go/indexer/indexer_test.go
index 094fc4e..9ee68e2 100644
--- a/golden/go/indexer/indexer_test.go
+++ b/golden/go/indexer/indexer_test.go
@@ -247,8 +247,8 @@
func makeComplexTileWithCrosshatchIgnores() (types.ComplexTile, *tiling.Tile, *tiling.Tile) {
fullTile := data.MakeTestTile()
partialTile := data.MakeTestTile()
- delete(partialTile.Traces, "crosshatch:test_alpha:gm")
- delete(partialTile.Traces, "crosshatch:test_beta:gm")
+ delete(partialTile.Traces, ",device=crosshatch,name=test_alpha,source_type=gm,")
+ delete(partialTile.Traces, ",device=crosshatch,name=test_beta,source_type=gm,")
ct := types.NewComplexTile(fullTile)
ct.SetIgnoreRules(partialTile, []paramtools.ParamSet{
diff --git a/golden/go/paramsets/paramsets_test.go b/golden/go/paramsets/paramsets_test.go
index 2aa9930..03ad627 100644
--- a/golden/go/paramsets/paramsets_test.go
+++ b/golden/go/paramsets/paramsets_test.go
@@ -158,7 +158,7 @@
// Commits, Scale and Tile Index omitted (should not affect things)
Traces: map[tiling.TraceId]tiling.Trace{
// These trace ids have been shortened for test terseness.
- // A real trace id would be like "8888:gm:foo"
+ // A real trace id would be like ",config=8888,source_type=gm,name=foo,"
"a": &types.GoldenTrace{
Digests: types.DigestSlice{DigestA, DigestB},
Keys: map[string]string{
diff --git a/golden/go/summary/summary_test.go b/golden/go/summary/summary_test.go
index 4b64d0c..1ef9692 100644
--- a/golden/go/summary/summary_test.go
+++ b/golden/go/summary/summary_test.go
@@ -86,7 +86,7 @@
tile := &tiling.Tile{
Traces: map[tiling.TraceId]tiling.Trace{
// These trace ids have been shortened for test terseness.
- // A real trace id would be like "8888:gm:test_first"
+ // A real trace id would be like ",config=8888,source_type=gm,name=foo,"
"a": &types.GoldenTrace{
Digests: types.DigestSlice{"aaa", "bbb"},
Keys: map[string]string{
diff --git a/golden/go/testutils/data_three_devices/three_devices.go b/golden/go/testutils/data_three_devices/three_devices.go
index fa88386..1c281ebf 100644
--- a/golden/go/testutils/data_three_devices/three_devices.go
+++ b/golden/go/testutils/data_three_devices/three_devices.go
@@ -44,6 +44,10 @@
AlphaTest = types.TestName("test_alpha")
BetaTest = types.TestName("test_beta")
+
+ AnglerDevice = "angler"
+ BullheadDevice = "bullhead"
+ CrosshatchDevice = "crosshatch"
)
func MakeTestBaseline() *baseline.Baseline {
@@ -95,63 +99,71 @@
func MakeTestTile() *tiling.Tile {
return &tiling.Tile{
Commits: MakeTestCommits(),
- Scale: 1,
+ Scale: 0, // tile contains every data point.
TileIndex: 0,
Traces: map[tiling.TraceId]tiling.Trace{
- // Reminder that the ids for the traces are created by concatenating
- // all the values in alphabetical order of the keys.
- "angler:test_alpha:gm": &types.GoldenTrace{
+ // Reminder that the ids for the traces are created using the
+ // logic in query.MakeKeyFast
+ ",device=angler,name=test_alpha,source_type=gm,": &types.GoldenTrace{
Digests: types.DigestSlice{AlphaBad1Digest, AlphaBad1Digest, AlphaGood1Digest},
Keys: map[string]string{
- "device": "angler",
+ "device": AnglerDevice,
types.PRIMARY_KEY_FIELD: string(AlphaTest),
types.CORPUS_FIELD: "gm",
},
},
- "angler:test_beta:gm": &types.GoldenTrace{
+ ",device=angler,name=test_beta,source_type=gm,": &types.GoldenTrace{
Digests: types.DigestSlice{BetaGood1Digest, BetaGood1Digest, BetaGood1Digest},
Keys: map[string]string{
- "device": "angler",
+ "device": AnglerDevice,
types.PRIMARY_KEY_FIELD: string(BetaTest),
types.CORPUS_FIELD: "gm",
},
},
- "bullhead:test_alpha:gm": &types.GoldenTrace{
+ ",device=bullhead,name=test_alpha,source_type=gm,": &types.GoldenTrace{
Digests: types.DigestSlice{AlphaBad1Digest, AlphaBad1Digest, AlphaUntriaged1Digest},
Keys: map[string]string{
- "device": "bullhead",
+ "device": BullheadDevice,
types.PRIMARY_KEY_FIELD: string(AlphaTest),
types.CORPUS_FIELD: "gm",
},
},
- "bullhead:test_beta:gm": &types.GoldenTrace{
+ ",device=bullhead,name=test_beta,source_type=gm,": &types.GoldenTrace{
Digests: types.DigestSlice{BetaGood1Digest, BetaGood1Digest, BetaGood1Digest},
Keys: map[string]string{
- "device": "bullhead",
+ "device": BullheadDevice,
types.PRIMARY_KEY_FIELD: string(BetaTest),
types.CORPUS_FIELD: "gm",
},
},
- "crosshatch:test_alpha:gm": &types.GoldenTrace{
+ ",device=crosshatch,name=test_alpha,source_type=gm,": &types.GoldenTrace{
Digests: types.DigestSlice{AlphaBad1Digest, AlphaBad1Digest, AlphaGood1Digest},
Keys: map[string]string{
- "device": "crosshatch",
+ "device": CrosshatchDevice,
types.PRIMARY_KEY_FIELD: string(AlphaTest),
types.CORPUS_FIELD: "gm",
},
},
- "crosshatch:test_beta:gm": &types.GoldenTrace{
+ ",device=crosshatch,name=test_beta,source_type=gm,": &types.GoldenTrace{
Digests: types.DigestSlice{BetaUntriaged1Digest, types.MISSING_DIGEST, types.MISSING_DIGEST},
Keys: map[string]string{
- "device": "crosshatch",
+ "device": CrosshatchDevice,
types.PRIMARY_KEY_FIELD: string(BetaTest),
types.CORPUS_FIELD: "gm",
},
},
},
+
+ // Summarizes all the keys and values seen in this tile
+ // The values should be in alphabetical order (see paramset.Normalize())
+ ParamSet: map[string][]string{
+ "device": {AnglerDevice, BullheadDevice, CrosshatchDevice},
+ types.PRIMARY_KEY_FIELD: {string(AlphaTest), string(BetaTest)},
+ types.CORPUS_FIELD: {"gm"},
+ },
}
}
diff --git a/golden/go/tracestore/bt_tracestore/.gitignore b/golden/go/tracestore/bt_tracestore/.gitignore
new file mode 100644
index 0000000..d383c56
--- /dev/null
+++ b/golden/go/tracestore/bt_tracestore/.gitignore
@@ -0,0 +1 @@
+testdata
diff --git a/golden/go/tracestore/bt_tracestore/BIGTABLE.md b/golden/go/tracestore/bt_tracestore/BIGTABLE.md
new file mode 100644
index 0000000..ac6cec6
--- /dev/null
+++ b/golden/go/tracestore/bt_tracestore/BIGTABLE.md
@@ -0,0 +1,209 @@
+Storing Gold traces in BigTable
+===============================
+
+This implementation is based on the original btts.go made for Perf.
+See perf/BIGTABLE.md for an overview of that schema.
+
+Note that we assume our BigTable instance is strongly consistent, which is
+only valid if there is no replication (which we shouldn't need for now).
+
+Data we want to store
+---------------------
+
+We need to store traces, which are arranged in a table.
+Traces have commits as their columns, a tiling.traceID as their rows, and
+a digest as their cells. These traceIDs are derived from the key:value pairs
+associated with the digest, e.g. {"os":"Android", "gpu": "NVidia1080"}
+
+Gold has an idea of a tile, which is a collection of N of these trace columns (i.e. commits).
+
+
+Mapping the data to BigTable
+----------------------------
+
+BigTable (BT) stores its data as one big table. Concretely, we have
+
+```
+ | col_1 | col_2 | col_3 | ...
+==================================
+row_1| [data] | [data] | [data] | ...
+row_2| [data] | [data] | [data] | ...
+...
+```
+
+For performance, BT provides [Column Families](https://cloud.google.com/bigtable/docs/schema-design#column_families_and_column_qualifiers)
+which group columns together so you can query some cells from a row belonging
+to that family w/o getting everything. As an analogy, if the whole table is a Sheets
+workbook, a column family can be thought of as a single spreadsheet in that workbook (at least,
+for our purposes).
+
+BT uses the term "Column Qualifier", which essentially just means "column name", because they
+can be arbitrary strings.
+
+It's tempting to just put the commit hashes as columns, the trace ids as the row names and store the
+digests in the cells, but this isn't ideal for these reasons:
+
+ 1. traceIDs get really long with lots of params - BT has a limit of 4kb per
+ row name (aka a row key).
+ 2. digests are long strings that are repeated a lot - if we stored them as int64s, it would save
+ a lot of data when we fetch a row (32 bytes per cell -> 8 bytes per cell).
+ 3. BT has a soft limit of 100Mb of data per row.
+ 4. BT will perform better if each row contains only the columns we are interested in
+ reading. Since we only really want to fetch the last few hundred commits, we would
+ like to limit each row to contain enough data for a single tile.
+ See also https://cloud.google.com/bigtable/docs/performance#slower-perf
+ 5. We want to split our rows up further into shards, so we can execute multiple queries at once
+ (one query per shard).
+
+To address these performance issues, we need to store our traces and some auxiliary data.
+
+ 1. An OrderedParamSet that can convert tiling.TraceId (long string) <->
+ EncodedTraceId (short string).
+ 2. A DigestMap that can convert types.Digest (string) <-> DigestID (int64).
+ 3. A monotonically increasing counter to derive more DigestIDs.
+
+These 3 "tables" along with the traces will be stored using 4 Column Families and can be
+logically thought of as independent "tables" or "spreadsheets" even
+though they are all stored in the "one big table".
+
+Row naming scheme
+-----------------
+There is a row naming scheme (for all 4 tables) as follows:
+
+ [shard]:[namespace]:[type]:[tile]:[subkey]
+
+Where shard and subkey are optional (can be ""). Some tables have tile with a constant value.
+"namespace" is constant for all tracestore data: "ts". Reminder that there's one table per Gold
+instance, so if we store other data to BT (e.g. expectations, tryjobs, etc) we need to have
+several unique namespaces.
+
+tile is a 0 padded 32 bit number computed as (2^31-2) - [tile number].
+For example, tile 0 (the oldest commits) is number `2147483646`.
+Note that this reverses the order of the tiles, i.e. new tiles have
+smaller numbers, so that we can do a simple query to find the newest tile.
+
+BigTable can easily fetch rows starting with a given prefix, so this naming schema
+is set up to request things of a type for one or more tiles, with optional sharding.
+
+Note that sharding is a thing we have enabled by our choice of row names, not something
+given out for free by BigTable.
+
+Gold (and Perf) shards its data based on the subkey (conceptually subkey % 32)
+This makes the workload be spread more evenly, even when fetching only one tile.
+The shards come first in the naming convention to try to spread the rows across multiple
+tablets for better performance (rows on BT are stored on tablets sorted by the entire row name).
+
+Storing the OrderedParamSet for traceIDs
+----------------------------------------
+As mentioned above, traceIDs are derived from the paramtools.Params map of key-value pairs.
+We compress these maps using a paramtools.OrderedParamSet, which concretely looks like:
+
+ ,0=1,4=2,3=1,
+
+To do this compression/decompression, we need to store the OrderedParamSet (OPS).
+There is one OPS per tile. Conceptually, an OPS is stored like:
+```
+ | OPS | H |
+==============================
+ops_tile0 | [bytes] | [hash] |
+ops_tile1 | [bytes] | [hash] |
+...
+```
+
+The bytes stored under the "OPS" column are just a gob encoded OrderedParamSet and
+the hash stored under the "H" column is the hash of the OPS, used to query the row when updating.
+
+As mentioned before, ops_tile0 is a conceptual simplification of the actual
+row name. Given the row naming schema, we define "type" for OPS to be "o" and
+let "shard" and "subkey" both be "".
+Thus, the actual row for tile 0 would be
+
+ :ts:o:2147483646:
+
+
+Storing the Digest Map
+----------------------
+We need to store a (potentially large) map of types.Digest (string) <-> DigestID (int64).
+This map is global across tiles. Conceptually, it is stored like:
+```
+ | [digest1] | [digest2] | [digest3] | ...
+====================================================
+map_000 | [blank] | [int64] | [blank] |
+map_001 | [int64] | [blank] | [blank] |
+...
+map_ffe | [blank] | [blank] | [blank] |
+map_fff | [blank] | [blank] | [int64] |
+...
+```
+
+Basically, we take the first three characters of a digest and use those as
+the "subkey" in the row so we can make sure our rows don't exceed the maximum size.
+Given the soft row limit of 100Mb, a digest column + a cell is at most 40 bytes,
+which means a soft limit of 2.5 million digests per row. Splitting on 3 hex
+characters means we have 4096-way splitting, so a soft limit of 10 billion digests.
+
+Why not just have rows be the digests and a single column with the id? BT can only fetch
+so many rows per second (see https://cloud.google.com/bigtable/docs/performance#typical-workloads)
+If we had a million digests in the single column schema, one SSD node would take 100 seconds
+to request those million rows, where for the three character schema, it would take about .4 seconds.
+
+Given that we define "type" for the digest map to be "d", an
+example row for digest "92eb5ffee6ae2fec3ad71c777531578f"
+(assume this subkey is tied to shard 19) would be:
+
+ 19:ts:d:0000000000:92e
+
+The ids of digests start out at 0 (for MISSING_DIGEST aka "") and increase monotonically.
+We manage these ids ourselves because using the autogenerated ids can lead to issues when
+migrating data from one table to another.
+
+Storing the Digest ID counter
+-----------------------------
+We assign newly seen digests an id that is simply an increasing int64. To store
+this int64 in BT, we essentially have a single cell dedicated to this:
+
+```
+ | idc |
+====================================================
+digest_counter | [int64] |
+```
+We have one global id counter to go with the one global digestMap.
+
+The interfacing code will take care not to constantly update this value (BT frowns upon
+having very "hot" rows/cells) by requesting new ids in batches and saving them locally.
+
+There's only one row for the counter, which is:
+
+ :ts:i:0000000000:
+
+Storing the traces
+------------------
+
+With all the auxiliary data set, we can look at how the traces themselves are stored.
+Going with the default tile size of 256, the data would be like:
+
+```
+ | offset0 | offset1 | ... | offset255
+====================================================
+tile0_trace0 | [dig_1] | [dig_1] | ... | [dig_2] |
+tile0_trace1 | [dig_1] | [dig_2] | ... | [dig_2] |
+...
+tile0_traceN | [dig_8] | [blank] | ... | [dig_6] |
+tile1_trace0 | [blank] | [dig_2] | ... | [blank] |
+...
+```
+
+The columns are the offset into a tile of a commit. For example, the third commit in a repo would
+end up in tile 0, offset 3. The 1000th commit with a tile size of 256 would be in
+tile 3 (1000 / 256), offset 232 (1000 % 256). This has the effect of wrapping a given
+trace across many tiles.
+
+The rows follow the standard naming scheme, using "t" as "type", and making use of the shards
+(32 by default). The value for "subkey" is the encoded ParamSet (and from this subkey a shard
+is derived). An example row for encoded ParamSet ",0=1,1=3,3=0," on tile 0 (assume shard
+calculates to be 7) would be:
+
+ 07:ts:t:2147483646:,0=1,1=3,3=0,
+
+The cells are the int64 ids of the digest that were drawn according to that ParamSet.
+Blank cells will be read as id 0, which is hard-coded to belong to MISSING_DIGEST ("").
diff --git a/golden/go/tracestore/bt_tracestore/bt_tracestore.go b/golden/go/tracestore/bt_tracestore/bt_tracestore.go
new file mode 100644
index 0000000..22f0069
--- /dev/null
+++ b/golden/go/tracestore/bt_tracestore/bt_tracestore.go
@@ -0,0 +1,992 @@
+// Package bt_tracestore implements a tracestore backed by BigTable
+// See BIGTABLE.md for an overview of the schema and design.
+package bt_tracestore
+
+import (
+ "context"
+ "encoding/binary"
+ "hash/crc32"
+ "sort"
+ "strconv"
+ "strings"
+ "sync"
+ "sync/atomic"
+ "time"
+
+ "cloud.google.com/go/bigtable"
+ "go.skia.org/infra/go/bt"
+ "go.skia.org/infra/go/metrics2"
+ "go.skia.org/infra/go/paramtools"
+ "go.skia.org/infra/go/skerr"
+ "go.skia.org/infra/go/sklog"
+ "go.skia.org/infra/go/tiling"
+ "go.skia.org/infra/go/util"
+ "go.skia.org/infra/go/vcsinfo"
+ "go.skia.org/infra/golden/go/tracestore"
+ "go.skia.org/infra/golden/go/types"
+ "golang.org/x/sync/errgroup"
+)
+
+// InitBT initializes the BT instance for the given configuration. It uses the default way
+// to get auth information from the environment and must be called with an account that has
+// admin rights. The column families it sets up are the ones declared in this package
+// (btColumnFamilies, see types.go).
+func InitBT(conf BTConfig) error {
+	return bt.InitBigtable(conf.ProjectID, conf.InstanceID, conf.TableID, btColumnFamilies)
+}
+
+// BTConfig contains the configuration information for the BigTable-based implementation of
+// TraceStore.
+type BTConfig struct {
+	// ProjectID is the GCP project that hosts the BigTable instance.
+	ProjectID string
+	// InstanceID is the BigTable instance to connect to.
+	InstanceID string
+	// TableID is the table in which the trace data is stored.
+	TableID string
+	// VCS is used to translate between commit hashes and commit indices in the repo.
+	VCS vcsinfo.VCS
+}
+
+// BTTraceStore implements the TraceStore interface.
+type BTTraceStore struct {
+	// vcs translates commit hashes to/from repo indices (see Put, GetTile).
+	vcs    vcsinfo.VCS
+	client *bigtable.Client
+	table  *bigtable.Table
+
+	// tileSize is the number of commits per tile; it must not change once data
+	// has been written (see the check in loadEncodedTraces).
+	tileSize int32
+	// shards is the number of row shards used to spread hot rows.
+	shards   int32
+
+	// if cacheOps is true, then cache the OrderedParamSets between calls
+	// where possible.
+	cacheOps bool
+	// maps rowName (string) -> *OpsCacheEntry
+	opsCache sync.Map
+
+	// availIDsMutex guards availIDs, the pool of digest ids reserved from BT
+	// but not yet assigned to a digest (see getIDs / returnIDs).
+	availIDsMutex sync.Mutex
+	availIDs      []digestID
+}
+
+// New returns a BTTraceStore backed by the BigTable table named in conf.
+// When cache is true, OrderedParamSets are memoized per row name between calls.
+func New(ctx context.Context, conf BTConfig, cache bool) (*BTTraceStore, error) {
+	c, err := bigtable.NewClient(ctx, conf.ProjectID, conf.InstanceID)
+	if err != nil {
+		return nil, skerr.Fmt("could not instantiate client: %s", err)
+	}
+
+	return &BTTraceStore{
+		vcs:      conf.VCS,
+		client:   c,
+		table:    c.Open(conf.TableID),
+		tileSize: DefaultTileSize,
+		shards:   DefaultShards,
+		cacheOps: cache,
+		availIDs: []digestID{},
+	}, nil
+}
+
+// Put implements the TraceStore interface. It writes the given entries for the
+// commit identified by commitHash, using ts as the BigTable cell timestamp.
+func (b *BTTraceStore) Put(ctx context.Context, commitHash string, entries []*tracestore.Entry, ts time.Time) error {
+	defer metrics2.FuncTimer().Stop()
+	// if there are no entries this becomes a no-op.
+	if len(entries) == 0 {
+		return nil
+	}
+
+	// Accumulate all parameters into a paramset and collect all the digests.
+	paramSet := make(paramtools.ParamSet, len(entries[0].Params))
+	digestSet := make(types.DigestSet, len(entries))
+	for _, entry := range entries {
+		paramSet.AddParams(entry.Params)
+		digestSet[entry.Digest] = true
+	}
+
+	repoIndex, err := b.vcs.IndexOf(ctx, commitHash)
+	if err != nil {
+		return skerr.Fmt("could not look up commit %s: %s", commitHash, err)
+	}
+
+	// Find out what tile we need to fetch and what index into that tile we need.
+	// Reminder that tileKeys start at 2^32-1 and decrease in value.
+	tileKey, commitIndex := b.getTileKey(repoIndex)
+
+	// If these entries have any params we haven't seen before, we need to store those in BigTable.
+	ops, err := b.updateOrderedParamSet(ctx, tileKey, paramSet)
+	if err != nil {
+		sklog.Warningf("Bad paramset: %#v", paramSet)
+		return skerr.Fmt("cannot update paramset: %s", err)
+	}
+
+	// Similarly, if we have some new digests (almost certainly), we need to update
+	// the digestMap with them in there. Of note, we store this
+	// map of string (types.Digest) -> int64(DigestId) in big table, then refer to
+	// the DigestID elsewhere in the table. DigestIds are essentially a monotonically
+	// increasing arbitrary number.
+	digestMap, err := b.updateDigestMap(ctx, digestSet)
+	if err != nil {
+		sklog.Warningf("Bad digestSet: %#v", digestSet)
+		return skerr.Fmt("cannot update digest map: %s", err)
+	}
+
+	metrics2.GetInt64Metric("gold_digest_map_size").Update(int64(digestMap.Len()))
+
+	// Sanity check: after updateDigestMap, every digest in digestSet must have an id.
+	if len(digestMap.Delta(digestSet)) != 0 {
+		// Should never happen
+		return skerr.Fmt("delta should be empty at this point: %v", digestMap.Delta(digestSet))
+	}
+
+	// These are two parallel arrays. mutations[i] should be applied to rowNames[i] for all i.
+	rowNames, mutations, err := b.createPutMutations(entries, ts, tileKey, commitIndex, ops, digestMap)
+	if err != nil {
+		return skerr.Fmt("could not create mutations to put data: %s", err)
+	}
+
+	// Write the trace data. We pick a batchsize based on the assumption
+	// that the whole batch should be 2MB large and each entry is ~200 Bytes of data.
+	// 2MB / 200B = 10000. This is extremely conservative but should not be a problem
+	// since the batches are written in parallel.
+	return b.applyBulkBatched(ctx, rowNames, mutations, 10000)
+}
+
+// createPutMutations is a helper for Put that returns two parallel slices:
+// the BT row names to touch and the mutations to apply to them. Each mutation
+// writes one entry's digest id into the commit's column and clears out any
+// value that was in that cell previously.
+func (b *BTTraceStore) createPutMutations(entries []*tracestore.Entry, ts time.Time, tk tileKey, commitIndex int, ops *paramtools.OrderedParamSet, dm *digestMap) ([]string, []*bigtable.Mutation, error) {
+	rowNames := make([]string, 0, len(entries))
+	mutations := make([]*bigtable.Mutation, 0, len(entries))
+	writeTS := bigtable.Time(ts)
+	cutoff := bigtable.Time(ts.Add(-1 * time.Millisecond))
+	// The column is the commit's offset into the tile; it is the same for
+	// every entry, so compute it once.
+	column := strconv.Itoa(commitIndex)
+
+	for _, entry := range entries {
+		// To save space, traces are keyed by the encoded form of their params
+		// (e.g. ,0=3,2=18,) instead of the long form tiling.TraceId
+		// (e.g. ,foo=bar,baz=gm,). See params.paramsEncoder.
+		encoded, err := ops.EncodeParamsAsString(entry.Params)
+		if err != nil {
+			return nil, nil, skerr.Fmt("invalid params: %s", err)
+		}
+		traceID := encodedTraceID(encoded)
+		rowNames = append(rowNames, b.calcShardedRowName(tk, typeTrace, string(traceID)))
+
+		dID, err := dm.ID(entry.Digest)
+		if err != nil {
+			// this should never happen, the digest map should know about every digest already.
+			return nil, nil, skerr.Fmt("could not fetch id for digest %s: %s", entry.Digest, err)
+		}
+		dBytes, err := dID.MarshalBinary()
+		if err != nil {
+			// this should never happen, we are just marshalling an int to binary
+			return nil, nil, skerr.Fmt("could not encode digest id %d to bytes: %s", dID, err)
+		}
+
+		// Put the digest id at row = (trace, tile), column = commit offset,
+		// then delete anything that existed at this cell before now.
+		mut := bigtable.NewMutation()
+		mut.Set(traceFamily, column, writeTS, dBytes)
+		mut.DeleteTimestampRange(traceFamily, column, 0, cutoff)
+		mutations = append(mutations, mut)
+	}
+	return rowNames, mutations, nil
+}
+
+// GetTile implements the TraceStore interface. It returns a tile covering the
+// last nCommits commits, along with the slice of those commits.
+// Of note, due to this request possibly spanning over multiple tiles, the ParamsSet may have a
+// set of params that does not actually correspond to a trace (this shouldn't be a problem, but is
+// worth calling out). For example, suppose a trace with param " device=alpha" abruptly ends on
+// tile 4, commit 7 (where the device was removed from testing). If we are on tile 5 and need to
+// query both tile 4 starting at commit 10 and tile 5 (the whole thing), we'll just merge the
+// paramsets from both tiles, which includes the "device=alpha" params, but they don't exist in
+// any traces seen in the tile (since it ended prior to our cutoff point).
+func (b *BTTraceStore) GetTile(ctx context.Context, nCommits int) (*tiling.Tile, []*tiling.Commit, error) {
+	defer metrics2.FuncTimer().Stop()
+	// Look up the commits we need to query from BT
+	idxCommits := b.vcs.LastNIndex(nCommits)
+	if len(idxCommits) == 0 {
+		return nil, nil, skerr.Fmt("No commits found.")
+	}
+
+	// These commits could span across multiple tiles, so derive the tiles we need to query.
+	c := idxCommits[0]
+	startTileKey, startCommitIndex := b.getTileKey(c.Index)
+
+	c = idxCommits[len(idxCommits)-1]
+	endTileKey, endCommitIndex := b.getTileKey(c.Index)
+
+	var egroup errgroup.Group
+
+	// In parallel: resolve commit hashes into tiling.Commits ...
+	var commits []*tiling.Commit
+	egroup.Go(func() error {
+		hashes := make([]string, 0, len(idxCommits))
+		for _, ic := range idxCommits {
+			hashes = append(hashes, ic.Hash)
+		}
+		var err error
+		commits, err = b.makeTileCommits(ctx, hashes)
+		if err != nil {
+			return skerr.Fmt("could not load tile commits: %s", err)
+		}
+		return nil
+	})
+
+	// ... and load the trace data for the commit range.
+	var traces traceMap
+	var params paramtools.ParamSet
+	egroup.Go(func() error {
+		var err error
+		traces, params, err = b.getTracesInRange(ctx, startTileKey, endTileKey, startCommitIndex, endCommitIndex)
+
+		if err != nil {
+			// BUGFIX: this error previously said "could not load tile commits",
+			// copy-pasted from the goroutine above, which made failures here
+			// indistinguishable from failures loading commits.
+			return skerr.Fmt("could not load traces: %s", err)
+		}
+		return nil
+	})
+
+	if err := egroup.Wait(); err != nil {
+		return nil, nil, skerr.Fmt("could not load last %d commits into tile: %s", nCommits, err)
+	}
+
+	ret := &tiling.Tile{
+		Traces:   traces,
+		ParamSet: params,
+		Commits:  commits,
+		Scale:    0,
+	}
+
+	return ret, commits, nil
+}
+
+// getTracesInRange returns a traceMap with data from the given start and stop points (tile and index).
+// It also includes the ParamSet for that range.
+// Reminder that tile keys decrease as tiles get newer, so startTileKey >= endTileKey.
+func (b *BTTraceStore) getTracesInRange(ctx context.Context, startTileKey, endTileKey tileKey, startCommitIndex, endCommitIndex int) (traceMap, paramtools.ParamSet, error) {
+	// Query those tiles.
+	nTiles := int(startTileKey - endTileKey + 1)
+	// Total number of commits spanned: whole tiles in between plus the partial
+	// ranges at each end.
+	nCommits := int(startTileKey-endTileKey)*int(b.tileSize) + (endCommitIndex - startCommitIndex) + 1
+	encTiles := make([]*encTile, nTiles)
+	var egroup errgroup.Group
+	tk := startTileKey
+	// Load all tiles in parallel; the immediately-invoked func pins idx and tk
+	// per iteration for the closure.
+	for idx := 0; idx < nTiles; idx++ {
+		func(idx int, tk tileKey) {
+			egroup.Go(func() error {
+				var err error
+				encTiles[idx], err = b.loadTile(ctx, tk)
+				if err != nil {
+					return skerr.Fmt("could not load tile with key %d to index %d: %s", tk, idx, err)
+				}
+				return nil
+			})
+		}(idx, tk)
+		tk--
+	}
+
+	// Load the global digest id -> digest mapping concurrently with the tiles.
+	var digestMap *digestMap
+	egroup.Go(func() error {
+		var err error
+		digestMap, err = b.getDigestMap(ctx)
+		if err != nil {
+			return skerr.Fmt("could not load digestMap: %s", err)
+		}
+		return nil
+	})
+
+	if err := egroup.Wait(); err != nil {
+		return nil, nil, skerr.Fmt("could not load %d tiles: %s", nTiles, err)
+	}
+
+	// This is the full tile we are going to return.
+	tileTraces := make(traceMap, len(encTiles[0].traces))
+	paramSet := paramtools.ParamSet{}
+
+	// commitIDX is the write offset into the output traces; it advances by one
+	// segment per tile processed.
+	commitIDX := 0
+	for idx, encTile := range encTiles {
+		// Determine the offset within the tile that we should consider.
+		endOffset := int(b.tileSize - 1)
+		if idx == (len(encTiles) - 1) {
+			// If we are on the last tile, stop early (that is, at endCommitIndex)
+			endOffset = endCommitIndex
+		}
+		segLen := endOffset - startCommitIndex + 1
+
+		for encodedKey, encValues := range encTile.traces {
+			// at this point, the encodedKey looks like ,0=1,1=3,3=0,
+			// See params.paramsEncoder
+			params, err := encTile.ops.DecodeParamsFromString(string(encodedKey))
+			if err != nil {
+				sklog.Warningf("Incomplete OPS: %#v\n", encTile.ops)
+				return nil, nil, skerr.Fmt("corrupted trace key - could not decode %s: %s", encodedKey, err)
+			}
+
+			// Turn the params into the tiling.TraceId we expect elsewhere.
+			traceKey := tracestore.TraceIDFromParams(params)
+			if _, ok := tileTraces[traceKey]; !ok {
+				tileTraces[traceKey] = types.NewGoldenTraceN(nCommits)
+			}
+			gt := tileTraces[traceKey].(*types.GoldenTrace)
+			gt.Keys = params
+			// Build up the total set of params
+			paramSet.AddParams(params)
+
+			// Convert the digests from integer IDs to strings.
+			digestIDs := encValues[startCommitIndex : startCommitIndex+segLen]
+			digests, err := digestMap.DecodeIDs(digestIDs)
+			if err != nil {
+				return nil, nil, skerr.Fmt("corrupted digest id - could not decode: %s", err)
+			}
+			copy(gt.Digests[commitIDX:commitIDX+segLen], digests)
+		}
+
+		// After the first tile we always start at the first entry and advance the
+		// overall commit index by the segment length.
+		commitIDX += segLen
+		startCommitIndex = 0
+	}
+
+	// Sort the params for determinism.
+	paramSet.Normalize()
+
+	return tileTraces, paramSet, nil
+}
+
+// GetDenseTile implements the TraceStore interface. It fetches the most recent tile and sees if
+// there is enough non-empty data, then queries the next oldest tile until it has nCommits
+// non-empty commits.
+func (b *BTTraceStore) GetDenseTile(ctx context.Context, nCommits int) (*tiling.Tile, []*tiling.Commit, error) {
+	defer metrics2.FuncTimer().Stop()
+	// Figure out what index we are on.
+	idxCommits := b.vcs.LastNIndex(1)
+	if len(idxCommits) == 0 {
+		return nil, nil, skerr.Fmt("No commits found.")
+	}
+
+	c := idxCommits[0]
+	tKey, endIdx := b.getTileKey(c.Index)
+	// tileStartCommitIdx is the repo index of the first (oldest) commit of the
+	// tile currently being examined.
+	tileStartCommitIdx := c.Index - endIdx
+
+	// commitsWithData is a slice of indexes of commits that have data. These indexes are
+	// relative to the repo itself, with index 0 being the first (oldest) commit in the repo.
+	commitsWithData := make([]int, 0, nCommits)
+	paramSet := paramtools.ParamSet{}
+	allTraces := traceMap{}
+
+	// Start at the most recent tile and step backwards until we have enough commits with data.
+	for {
+		traces, params, err := b.getTracesInRange(ctx, tKey, tKey, 0, endIdx)
+
+		if err != nil {
+			return nil, nil, skerr.Fmt("could not load commits from tile %d: %s", tKey, err)
+		}
+
+		paramSet.AddParamSet(params)
+		// filledCommits are the indexes in the traces that have data.
+		// That is, they are the indexes of commits in this tile.
+		// It will be sorted from low indexes to high indexes
+		filledCommits := traces.CommitIndicesWithData()
+
+		if len(filledCommits)+len(commitsWithData) > nCommits {
+			targetLength := nCommits - len(commitsWithData)
+			// trim filledCommits so we get to exactly nCommits; keep the newest
+			// (highest-index) entries of this tile.
+			filledCommits = filledCommits[len(filledCommits)-targetLength:]
+		}
+
+		// Translate tile-relative indices into repo-relative indices.
+		for _, tileIdx := range filledCommits {
+			commitsWithData = append(commitsWithData, tileStartCommitIdx+tileIdx)
+		}
+		cTraces := traces.MakeFromCommitIndexes(filledCommits)
+		// This tile is older than what we've gathered so far, so prepend it.
+		allTraces.PrependTraces(cTraces)
+
+		// Stop when we have enough commits or have reached the oldest tile.
+		if len(commitsWithData) >= nCommits || tKey == tileKeyFromIndex(0) {
+			break
+		}
+
+		// Tile keys decrease as tiles get newer, so incrementing moves to the
+		// previous (older) tile.
+		tKey++                       // go backwards in time one tile
+		endIdx = int(b.tileSize - 1) // fetch the whole previous tile
+		tileStartCommitIdx -= int(b.tileSize)
+	}
+
+	if len(commitsWithData) == 0 {
+		return &tiling.Tile{}, nil, nil
+	}
+	// put them in oldest to newest order
+	sort.Ints(commitsWithData)
+
+	oldestIdx := commitsWithData[0]
+	oldestCommit, err := b.vcs.ByIndex(ctx, oldestIdx)
+	if err != nil {
+		return nil, nil, skerr.Fmt("invalid oldest index %d: %s", oldestIdx, err)
+	}
+	hashes := b.vcs.From(oldestCommit.Timestamp.Add(-1 * time.Millisecond))
+
+	// There's no guarantee that hashes[0] == oldestCommit[0] (e.g. two commits at same timestamp)
+	// So we trim hashes down if necessary
+	for i := 0; i < len(hashes); i++ {
+		if hashes[i] == oldestCommit.Hash {
+			hashes = hashes[i:]
+			break
+		}
+	}
+
+	allCommits, err := b.makeTileCommits(ctx, hashes)
+	if err != nil {
+		return nil, nil, skerr.Fmt("could not make tile commits: %s", err)
+	}
+
+	// denseCommits keeps only the commits that actually had data; allCommits
+	// is indexed by offset from the oldest data-bearing commit.
+	denseCommits := make([]*tiling.Commit, len(commitsWithData))
+	for i, idx := range commitsWithData {
+		denseCommits[i] = allCommits[idx-oldestIdx]
+	}
+
+	ret := &tiling.Tile{
+		Traces:   allTraces,
+		ParamSet: paramSet,
+		Commits:  denseCommits,
+		Scale:    0,
+	}
+	return ret, allCommits, nil
+}
+
+// getTileKey maps a repo-wide commit index (repoIndex) to the key of the tile
+// holding that commit plus the commit's position within the tile (commitIndex);
+// commitIndex 0 is the oldest commit of the tile.
+func (b *BTTraceStore) getTileKey(repoIndex int) (tileKey, int) {
+	size := int(b.tileSize)
+	return tileKeyFromIndex(int32(repoIndex / size)), repoIndex % size
+}
+
+// loadTile fetches the OrderedParamSet and the encoded traces for tileKey in
+// parallel and bundles them into an *encTile.
+func (b *BTTraceStore) loadTile(ctx context.Context, tileKey tileKey) (*encTile, error) {
+	defer metrics2.FuncTimer().Stop()
+	var egroup errgroup.Group
+
+	// The OPS lets the caller decode the encoded trace ids below.
+	var ops *paramtools.OrderedParamSet
+	egroup.Go(func() error {
+		entry, _, err := b.getOPS(ctx, tileKey)
+		if err != nil {
+			return skerr.Fmt("could not load OPS: %s", err)
+		}
+		ops = entry.ops
+		return nil
+	})
+
+	var traces map[encodedTraceID][]digestID
+	egroup.Go(func() error {
+		var err error
+		traces, err = b.loadEncodedTraces(ctx, tileKey)
+		if err != nil {
+			return skerr.Fmt("could not load traces: %s", err)
+		}
+		return nil
+	})
+
+	if err := egroup.Wait(); err != nil {
+		return nil, err
+	}
+	return &encTile{ops: ops, traces: traces}, nil
+}
+
+// loadEncodedTraces returns all traces belonging to the given tileKey.
+// As outlined in BIGTABLE.md, the trace ids and the digest ids they
+// map to are in an encoded form and will need to be expanded prior to use.
+func (b *BTTraceStore) loadEncodedTraces(ctx context.Context, tileKey tileKey) (map[encodedTraceID][]digestID, error) {
+	defer metrics2.FuncTimer().Stop()
+	var egroup errgroup.Group
+	shardResults := make([]map[encodedTraceID][]digestID, b.shards)
+	traceCount := int64(0)
+
+	// Query all shards in parallel. Each goroutine writes only to its own
+	// shardResults slot, so no locking is needed; traceCount is shared and
+	// therefore updated atomically.
+	for shard := int32(0); shard < b.shards; shard++ {
+		func(shard int32) {
+			egroup.Go(func() error {
+				// This prefix will match all traces belonging to the
+				// current shard in the current tile.
+				prefixRange := bigtable.PrefixRange(shardedRowName(shard, typeTrace, tileKey, ""))
+				target := map[encodedTraceID][]digestID{}
+				shardResults[shard] = target
+				var parseErr error
+				err := b.table.ReadRows(ctx, prefixRange, func(row bigtable.Row) bool {
+					// The encoded trace id is the "subkey" part of the row name.
+					traceKey := encodedTraceID(extractSubkey(row.Key()))
+					// If this is the first time we've seen the trace, initialize the
+					// slice of digest ids for it.
+					if _, ok := target[traceKey]; !ok {
+						target[traceKey] = make([]digestID, b.tileSize)
+						atomic.AddInt64(&traceCount, 1)
+					}
+
+					for _, col := range row[traceFamily] {
+						// The columns are something like T:35 where the part
+						// after the colon is the commitIndex i.e. the index
+						// of this commit in the current tile.
+						idx, err := strconv.Atoi(strings.TrimPrefix(col.Column, traceFamilyPrefix))
+						if err != nil {
+							// Should never happen
+							parseErr = err
+							return false
+						}
+						var dID digestID
+						if err := dID.UnmarshalBinary(col.Value); err != nil {
+							// This should never happen
+							parseErr = err
+							return false
+						}
+						if idx < 0 || idx >= int(b.tileSize) {
+							// This would happen if the tile size changed from a past
+							// value. It shouldn't be changed, even if the Gold tile size
+							// (n_commits) changes.
+							// BUGFIX: the message previously printed len(target), which is
+							// the number of traces in the map, not the per-trace slice
+							// length the index is checked against.
+							parseErr = skerr.Fmt("got index %d that is outside of the tile size %d", idx, b.tileSize)
+							return false
+						}
+						target[traceKey][idx] = dID
+					}
+					return true
+				}, bigtable.RowFilter(bigtable.LatestNFilter(1)))
+				if err != nil {
+					return skerr.Fmt("could not read rows: %s", err)
+				}
+				return parseErr
+			})
+		}(shard)
+	}
+
+	if err := egroup.Wait(); err != nil {
+		return nil, err
+	}
+
+	// Merge all the results together
+	ret := make(map[encodedTraceID][]digestID, traceCount)
+	for _, r := range shardResults {
+		for traceKey, digestIDs := range r {
+			// different shards should never share results for a tracekey
+			// since a trace always maps to the same shard.
+			ret[traceKey] = digestIDs
+		}
+	}
+
+	return ret, nil
+}
+
+// applyBulkBatched writes the given rowNames/mutation pairs to BigTable in batches that are
+// maximally of size 'batchSize'. The batches are written in parallel.
+func (b *BTTraceStore) applyBulkBatched(ctx context.Context, rowNames []string, mutations []*bigtable.Mutation, batchSize int) error {
+
+	var egroup errgroup.Group
+	// chunkStart/chunkEnd are parameters of this callback, so each spawned
+	// goroutine safely captures its own pair of values.
+	err := util.ChunkIter(len(rowNames), batchSize, func(chunkStart, chunkEnd int) error {
+		egroup.Go(func() error {
+			tctx, cancel := context.WithTimeout(ctx, writeTimeout)
+			defer cancel()
+			rowNames := rowNames[chunkStart:chunkEnd]
+			mutations := mutations[chunkStart:chunkEnd]
+			errs, err := b.table.ApplyBulk(tctx, rowNames, mutations)
+			if err != nil {
+				return skerr.Fmt("error writing batch [%d:%d]: %s", chunkStart, chunkEnd, err)
+			}
+			// ApplyBulk can partially succeed; errs holds per-row errors.
+			if errs != nil {
+				return skerr.Fmt("error writing some portions of batch [%d:%d]: %s", chunkStart, chunkEnd, errs)
+			}
+			return nil
+		})
+		return nil
+	})
+	if err != nil {
+		return skerr.Fmt("error running ChunkIter: %s", err)
+	}
+	// Wait for all batches; the first non-nil error (if any) is returned.
+	return egroup.Wait()
+}
+
+// calcShardedRowName deterministically assigns a shard to the given subkey
+// (e.g. a traceID) via a CRC32 checksum, then combines shard, row type,
+// tileKey and subkey into a single BT row name.
+func (b *BTTraceStore) calcShardedRowName(tileKey tileKey, rowType, subkey string) string {
+	checksum := crc32.ChecksumIEEE([]byte(subkey))
+	shard := int32(checksum % uint32(b.shards))
+	return shardedRowName(shard, rowType, tileKey, subkey)
+}
+
+// rowAndColNameFromDigest splits a digest into a 3-character row subkey and
+// uses the remainder as the column name. This avoids one monolithic row: with
+// hexadecimal digests the map is spread over 16^3 = 4096 rows.
+func (b *BTTraceStore) rowAndColNameFromDigest(digest types.Digest) (string, string) {
+	prefix, rest := string(digest[:3]), string(digest[3:])
+	return b.calcShardedRowName(digestMapTile, typeDigestMap, prefix), rest
+}
+
+// getDigestMap gets the global (i.e. same for all tiles) digestMap, reading
+// all shards of the map from BT in parallel and merging them.
+func (b *BTTraceStore) getDigestMap(ctx context.Context) (*digestMap, error) {
+	defer metrics2.FuncTimer().Stop()
+	// Query all shards in parallel. Each goroutine only writes its own
+	// shardResults slot; total is shared, so it is updated atomically.
+	var egroup errgroup.Group
+	shardResults := make([]map[types.Digest]digestID, b.shards)
+	total := int64(0)
+	for shard := int32(0); shard < b.shards; shard++ {
+		func(shard int32) {
+			egroup.Go(func() error {
+				prefRange := bigtable.PrefixRange(shardedRowName(shard, typeDigestMap, digestMapTile, ""))
+				var idx int64
+				var parseErr error = nil
+				ret := map[types.Digest]digestID{}
+				err := b.table.ReadRows(ctx, prefRange, func(row bigtable.Row) bool {
+					// The first 3 characters of the digest are the row subkey;
+					// the rest is the column name (see rowAndColNameFromDigest).
+					digestPrefix := extractSubkey(row.Key())
+					for _, col := range row[digestMapFamily] {
+						idx, parseErr = strconv.ParseInt(string(col.Value), 10, 64)
+						if parseErr != nil {
+							// Should never happen
+							return false
+						}
+						digest := types.Digest(digestPrefix + strings.TrimPrefix(col.Column, digestMapFamilyPrefix))
+						ret[digest] = digestID(idx)
+					}
+					return true
+				}, bigtable.RowFilter(bigtable.LatestNFilter(1)))
+
+				if err != nil {
+					return skerr.Fmt("problem fetching shard %d of digestmap: %s", shard, err)
+				}
+				if parseErr != nil {
+					return parseErr
+				}
+
+				shardResults[shard] = ret
+				atomic.AddInt64(&total, int64(len(ret)))
+				return nil
+			})
+		}(shard)
+	}
+	if err := egroup.Wait(); err != nil {
+		return nil, skerr.Fmt("problem fetching digestmap: %s", err)
+	}
+
+	ret := newDigestMap(int(total))
+	for _, dm := range shardResults {
+		if err := ret.Add(dm); err != nil {
+			// put the digest map later in case it gets truncated
+			return nil, skerr.Fmt("could not build DigestMap: %s \nresults %#v", err, dm)
+		}
+	}
+	return ret, nil
+}
+
+// getIDs returns a []DigestID of length n where each of the
+// digestIDs are unique (even between processes). It first draws from the
+// locally cached pool and only goes to BT when the pool runs dry.
+func (b *BTTraceStore) getIDs(ctx context.Context, n int) ([]digestID, error) {
+	defer metrics2.FuncTimer().Stop()
+	// Extract up to n ids from those we have already cached.
+	b.availIDsMutex.Lock()
+	defer b.availIDsMutex.Unlock()
+	toExtract := util.MinInt(len(b.availIDs), n)
+
+	ids := make([]digestID, 0, n)
+	ids = append(ids, b.availIDs[:toExtract]...)
+	b.availIDs = b.availIDs[toExtract:]
+
+	// missing is how many ids we are short
+	missing := int64(n - len(ids))
+	if missing == 0 {
+		return ids, nil
+	}
+	// For performance reasons, make a few big requests for ids instead of many small ones.
+	// That is, always request numReservedIds extra.
+	toRequest := missing + numReservedIds
+	// Reserve new IDs via the ID counter. The atomic increment hands this
+	// process the exclusive range (counter-toRequest, counter].
+	rmw := bigtable.NewReadModifyWrite()
+	rmw.Increment(idCounterFamily, idCounterColumn, toRequest)
+	row, err := b.table.ApplyReadModifyWrite(ctx, idCounterRow, rmw)
+	if err != nil {
+		return nil, skerr.Fmt("could not fetch counter from BT: %s", err)
+	}
+
+	// ri are the cells in Row of the given counter family
+	// This should be 1 cell belonging to 1 column.
+	ri, ok := row[idCounterFamily]
+	if !ok {
+		// should never happen
+		return nil, skerr.Fmt("malformed response - no id counter family: %#v", ri)
+	}
+	if len(ri) != 1 {
+		// should never happen
+		return nil, skerr.Fmt("malformed response - expected 1 cell: %#v", ri)
+	}
+
+	// maxID is the post-increment counter value, i.e. the end of our range.
+	maxID := digestID(binary.BigEndian.Uint64(ri[0].Value))
+
+	lastID := maxID - digestID(toRequest)
+	// ID of 0 is a special case - it's already assigned to MISSING_DIGEST, so skip it.
+	// Losing one id here is fine because we requested numReservedIds extra.
+	if lastID == missingDigestID {
+		lastID++
+	}
+	for i := lastID; i < maxID; i++ {
+		// Give the first ids to the current allocation request...
+		if missing > 0 {
+			ids = append(ids, i)
+		} else {
+			// ... and put the remainder in the store for later.
+			b.availIDs = append(b.availIDs, i)
+		}
+		missing--
+	}
+
+	return ids, nil
+}
+
+// returnIDs can be called with a []DigestID of ids that were not actually
+// assigned to digests. This allows them to be used by future requests to
+// getIDs. It is safe for concurrent use (guarded by availIDsMutex).
+func (b *BTTraceStore) returnIDs(unusedIDs []digestID) {
+	b.availIDsMutex.Lock()
+	defer b.availIDsMutex.Unlock()
+	b.availIDs = append(b.availIDs, unusedIDs...)
+}
+
+// getOrAddDigests fills the given digestMap with the given digests
+// assigned to a DigestID if they don't already have an assignment.
+// This is a helper function for updateDigestMap
+// TODO(kjlubick): This currently makes a lot of requests to BT -
+// Should there be some caching done here to prevent that?
+func (b *BTTraceStore) getOrAddDigests(ctx context.Context, digests []types.Digest, digestMap *digestMap) (*digestMap, error) {
+	defer metrics2.FuncTimer().Stop()
+	// Reserve one candidate id per digest up front.
+	availIDs, err := b.getIDs(ctx, len(digests))
+	if err != nil {
+		return nil, err
+	}
+
+	now := bigtable.Time(time.Now())
+	newIDMapping := make(map[types.Digest]digestID, len(digests))
+	unusedIDs := make([]digestID, 0, len(availIDs))
+	for idx, digest := range digests {
+		idVal := availIDs[idx]
+		if _, err := digestMap.ID(digest); err == nil {
+			// digestMap already has a mapping for this digest, no need to check
+			// if BT has seen it yet (because it has).
+			// Should never happen because we've already done this check in updateDigestMap.
+			unusedIDs = append(unusedIDs, idVal)
+			continue
+		}
+		rowName, colName := b.rowAndColNameFromDigest(digest)
+		// This mutation says "Add an entry to the map for digest -> idVal iff
+		// the digest doesn't already have a mapping".
+		addMut := bigtable.NewMutation()
+		addMut.Set(digestMapFamily, colName, now, []byte(strconv.FormatInt(int64(idVal), 10)))
+		filter := bigtable.ColumnFilter(colName)
+		// Note that we only add the value if filter is false, i.e. the column does not
+		// already exist. This makes the insert race-safe against other processes.
+		condMut := bigtable.NewCondMutation(filter, nil, addMut)
+		var digestAlreadyHadId bool
+		if err := b.table.Apply(ctx, rowName, condMut, bigtable.GetCondMutationResult(&digestAlreadyHadId)); err != nil {
+			return nil, skerr.Fmt("could not check if row %s col %s already had a DigestID: %s", rowName, colName, err)
+		}
+
+		// We didn't need this ID so let's re-use it later.
+		if digestAlreadyHadId {
+			unusedIDs = append(unusedIDs, idVal)
+		} else {
+			newIDMapping[digest] = idVal
+		}
+	}
+
+	// If all ids were added to BT, then we know our newIDMapping can simply be added
+	// to what we already have, since there were no collisions between digests and what
+	// was in the table already.
+	if len(unusedIDs) == 0 {
+		if err := digestMap.Add(newIDMapping); err != nil {
+			return nil, err
+		}
+		return digestMap, nil
+	}
+	// At this point, some of the digests already had ids, so we should reload
+	// the entire digestMap to make sure we have the full picture.
+	// TODO(kjlubick): Can we not just add what new ones we saw to what we already have?
+
+	// Return the unused IDs for later use.
+	b.returnIDs(unusedIDs)
+	return b.getDigestMap(ctx)
+}
+
+// updateDigestMap returns the current global DigestMap after making sure the given
+// digests are a part of it.
+func (b *BTTraceStore) updateDigestMap(ctx context.Context, digests types.DigestSet) (*digestMap, error) {
+	defer metrics2.FuncTimer().Stop()
+	// Fetch the current global map from BT.
+	// TODO(kjlubick): should we cache this map and first check to see if the digests
+	// are all in there?
+	dm, err := b.getDigestMap(ctx)
+	if err != nil {
+		return nil, err
+	}
+
+	// Anything not mapped yet needs an id assigned.
+	if missing := dm.Delta(digests); len(missing) != 0 {
+		return b.getOrAddDigests(ctx, missing, dm)
+	}
+	return dm, nil
+}
+
+// Copied from btts.go in infra/perf
+
+// updateOrderedParamSet will add all params from 'p' to the OrderedParamSet
+// for 'tileKey' and write it back to BigTable. It uses conditional mutations
+// keyed on the OPS hash to avoid the lost-update problem, retrying until the
+// write succeeds.
+func (b *BTTraceStore) updateOrderedParamSet(ctx context.Context, tileKey tileKey, p paramtools.ParamSet) (*paramtools.OrderedParamSet, error) {
+	defer metrics2.FuncTimer().Stop()
+
+	tctx, cancel := context.WithTimeout(ctx, writeTimeout)
+	defer cancel()
+	var newEntry *opsCacheEntry
+	for {
+		// Get OPS.
+		entry, existsInBT, err := b.getOPS(ctx, tileKey)
+		if err != nil {
+			return nil, skerr.Fmt("failed to get OPS: %s", err)
+		}
+
+		// If the OPS contains our paramset then we're done.
+		if delta := entry.ops.Delta(p); len(delta) == 0 {
+			return entry.ops, nil
+		}
+
+		// Create a new updated ops.
+		ops := entry.ops.Copy()
+		ops.Update(p)
+		newEntry, err = opsCacheEntryFromOPS(ops)
+		if err != nil {
+			return nil, skerr.Fmt("failed to create cache entry: %s", err)
+		}
+		encodedOps, err := newEntry.ops.Encode()
+		if err != nil {
+			return nil, skerr.Fmt("failed to encode new ops: %s", err)
+		}
+
+		now := bigtable.Time(time.Now())
+		condTrue := false
+		if existsInBT {
+			// Create an update that avoids the lost update problem: only apply
+			// the mutation if the stored hash is still the one we read above.
+			cond := bigtable.ChainFilters(
+				bigtable.LatestNFilter(1),
+				bigtable.FamilyFilter(opsFamily),
+				bigtable.ColumnFilter(opsHashColumn),
+				bigtable.ValueFilter(string(entry.hash)),
+			)
+			updateMutation := bigtable.NewMutation()
+			updateMutation.Set(opsFamily, opsHashColumn, now, []byte(newEntry.hash))
+			updateMutation.Set(opsFamily, opsOpsColumn, now, encodedOps)
+
+			// Add a mutation that cleans up old versions.
+			before := bigtable.Time(now.Time().Add(-1 * time.Second))
+			updateMutation.DeleteTimestampRange(opsFamily, opsHashColumn, 0, before)
+			updateMutation.DeleteTimestampRange(opsFamily, opsOpsColumn, 0, before)
+			condUpdate := bigtable.NewCondMutation(cond, updateMutation, nil)
+
+			if err := b.table.Apply(tctx, tileKey.OpsRowName(), condUpdate, bigtable.GetCondMutationResult(&condTrue)); err != nil {
+				sklog.Warningf("Failed to apply: %s", err)
+				return nil, err
+			}
+
+			// If !condTrue then someone else wrote in between our read and
+			// write; we need to try again, and clear our local cache.
+			if !condTrue {
+				sklog.Warningf("Exists !condTrue - clearing cache and trying again.")
+				b.opsCache.Delete(tileKey.OpsRowName())
+				continue
+			}
+		} else {
+			// Create an update that only works if the ops entry doesn't exist yet.
+			// I.e. only apply the mutation if the HASH column doesn't exist for this row.
+			cond := bigtable.ChainFilters(
+				bigtable.FamilyFilter(opsFamily),
+				bigtable.ColumnFilter(opsHashColumn),
+			)
+			updateMutation := bigtable.NewMutation()
+			updateMutation.Set(opsFamily, opsHashColumn, now, []byte(newEntry.hash))
+			updateMutation.Set(opsFamily, opsOpsColumn, now, encodedOps)
+
+			condUpdate := bigtable.NewCondMutation(cond, nil, updateMutation)
+			if err := b.table.Apply(tctx, tileKey.OpsRowName(), condUpdate, bigtable.GetCondMutationResult(&condTrue)); err != nil {
+				sklog.Warningf("Failed to apply: %s", err)
+				// clear cache and try again
+				b.opsCache.Delete(tileKey.OpsRowName())
+				continue
+			}
+
+			// If condTrue then the row was created by someone else in between
+			// our read and write; we need to try again, and clear our local cache.
+			if condTrue {
+				sklog.Warningf("First Write condTrue - clearing cache and trying again.")
+				b.opsCache.Delete(tileKey.OpsRowName())
+				continue
+			}
+		}
+
+		// Successfully wrote OPS, so update the cache.
+		if b.cacheOps {
+			b.opsCache.Store(tileKey.OpsRowName(), newEntry)
+		}
+		break
+	}
+	return newEntry.ops, nil
+}
+
+// getOPS returns the OpsCacheEntry for a given tile.
+//
+// Note that it will create a new OpsCacheEntry if none exists.
+//
+// getOPS returns false if the OPS in BT was empty, true otherwise (even if cached).
+func (b *BTTraceStore) getOPS(ctx context.Context, tileKey tileKey) (*opsCacheEntry, bool, error) {
+	defer metrics2.FuncTimer().Stop()
+	// Serve from the local cache when enabled and populated.
+	if b.cacheOps {
+		entry, ok := b.opsCache.Load(tileKey.OpsRowName())
+		if ok {
+			return entry.(*opsCacheEntry), true, nil
+		}
+	}
+	tctx, cancel := context.WithTimeout(ctx, readTimeout)
+	defer cancel()
+	row, err := b.table.ReadRow(tctx, tileKey.OpsRowName(), bigtable.RowFilter(bigtable.LatestNFilter(1)))
+	if err != nil {
+		return nil, false, skerr.Fmt("failed to read OPS from BigTable for %s: %s", tileKey.OpsRowName(), err)
+	}
+	// If there is no entry in BigTable then return an empty OPS.
+	if len(row) == 0 {
+		sklog.Warningf("Failed to read OPS from BT for %s.", tileKey.OpsRowName())
+		entry, err := newOpsCacheEntry()
+		return entry, false, err
+	}
+	entry, err := newOpsCacheEntryFromRow(row)
+	if err == nil && b.cacheOps {
+		b.opsCache.Store(tileKey.OpsRowName(), entry)
+	}
+	return entry, true, err
+}
+
+// makeTileCommits creates a slice of tiling.Commit from the given git hashes.
+// Specifically, we need to look up the details to get the author information.
+func (b *BTTraceStore) makeTileCommits(ctx context.Context, hashes []string) ([]*tiling.Commit, error) {
+	longCommits, err := b.vcs.DetailsMulti(ctx, hashes, false)
+	if err != nil {
+		// put hashes second in case they get truncated for being quite long.
+		// BUGFIX: the format was previously "could not fetch commit data for
+		// commits %s (hashes: %q)", which interpolated the error text where a
+		// list of commits reads naturally.
+		return nil, skerr.Fmt("could not fetch commit data: %s (hashes: %q)", err, hashes)
+	}
+
+	commits := make([]*tiling.Commit, len(hashes))
+	for i, lc := range longCommits {
+		if lc == nil {
+			return nil, skerr.Fmt("commit %s not found from VCS", hashes[i])
+		}
+		commits[i] = &tiling.Commit{
+			Hash:       lc.Hash,
+			Author:     lc.Author,
+			CommitTime: lc.Timestamp.Unix(),
+		}
+	}
+	return commits, nil
+}
+
+// Make sure BTTraceStore fulfills the TraceStore Interface
+var _ tracestore.TraceStore = (*BTTraceStore)(nil)
diff --git a/golden/go/tracestore/bt_tracestore/bt_tracestore_test.go b/golden/go/tracestore/bt_tracestore/bt_tracestore_test.go
new file mode 100644
index 0000000..56f37cd
--- /dev/null
+++ b/golden/go/tracestore/bt_tracestore/bt_tracestore_test.go
@@ -0,0 +1,990 @@
+package bt_tracestore
+
+import (
+ "context"
+ "fmt"
+ "os"
+ "sync"
+ "testing"
+ "time"
+
+ "github.com/stretchr/testify/mock"
+ assert "github.com/stretchr/testify/require"
+ "go.skia.org/infra/go/bt"
+ "go.skia.org/infra/go/deepequal"
+ "go.skia.org/infra/go/fileutil"
+ "go.skia.org/infra/go/gcs/gcs_testutils"
+ "go.skia.org/infra/go/sktest"
+ "go.skia.org/infra/go/testutils/unittest"
+ "go.skia.org/infra/go/tiling"
+ "go.skia.org/infra/go/util"
+ "go.skia.org/infra/go/vcsinfo"
+ mock_vcs "go.skia.org/infra/go/vcsinfo/mocks"
+ "go.skia.org/infra/golden/go/serialize"
+ data "go.skia.org/infra/golden/go/testutils/data_three_devices"
+ "go.skia.org/infra/golden/go/tracestore"
+ "go.skia.org/infra/golden/go/types"
+)
+
+// TestBTTraceStorePutGet adds a bunch of entries one at a time and
+// then retrieves the full tile.
+func TestBTTraceStorePutGet(t *testing.T) {
+ unittest.LargeTest(t)
+ unittest.RequiresBigTableEmulator(t)
+
+ commits := data.MakeTestCommits()
+ mvcs := MockVCSWithCommits(commits, 0)
+ defer mvcs.AssertExpectations(t)
+
+ btConf := BTConfig{
+ ProjectID: "should-use-the-emulator",
+ InstanceID: "testinstance",
+ TableID: "three_devices_test",
+ VCS: mvcs,
+ }
+
+ // Wipe any leftover tables from a previous run and recreate the schema
+ // so the test starts from a clean slate.
+ assert.NoError(t, bt.DeleteTables(btConf.ProjectID, btConf.InstanceID, btConf.TableID))
+ assert.NoError(t, InitBT(btConf))
+
+ ctx := context.Background()
+ // NOTE(review): the trailing bool presumably enables the OPS cache —
+ // confirm against New's signature.
+ traceStore, err := New(ctx, btConf, true)
+ assert.NoError(t, err)
+
+ // With no data, we should get an empty tile
+ actualTile, _, err := traceStore.GetTile(ctx, len(commits))
+ assert.NoError(t, err)
+ assert.NotNil(t, actualTile)
+ assert.Empty(t, actualTile.Traces)
+
+ // This time is an arbitrary point in time
+ now := time.Date(2019, time.May, 5, 1, 3, 4, 0, time.UTC)
+
+ // Build a tile up from the individual data points, one at a time
+ traces := data.MakeTestTile().Traces
+ for _, trace := range traces {
+ gTrace, ok := trace.(*types.GoldenTrace)
+ assert.True(t, ok)
+
+ // Put them in backwards, just to test that order doesn't matter
+ for i := len(gTrace.Digests) - 1; i >= 0; i-- {
+ e := tracestore.Entry{
+ Digest: gTrace.Digests[i],
+ Params: gTrace.Keys,
+ }
+ err := traceStore.Put(ctx, commits[i].Hash, []*tracestore.Entry{&e}, now)
+ assert.NoError(t, err)
+ // roll forward the clock by an arbitrary amount of time
+ now = now.Add(7 * time.Second)
+ }
+ }
+
+ // Get the tile back and make sure it exactly matches the tile
+ // we hand-crafted for the test data.
+ actualTile, actualCommits, err := traceStore.GetTile(ctx, len(commits))
+ assert.NoError(t, err)
+
+ assert.Equal(t, data.MakeTestTile(), actualTile)
+ assert.Equal(t, commits, actualCommits)
+}
+
+// TestBTTraceStorePutGetSpanTile is like TestBTTraceStorePutGet except the 3 commits
+// are lined up to go across two tiles.
+func TestBTTraceStorePutGetSpanTile(t *testing.T) {
+ unittest.LargeTest(t)
+ unittest.RequiresBigTableEmulator(t)
+
+ commits := data.MakeTestCommits()
+ // Offsetting the commit indices by DefaultTileSize-2 puts the three
+ // commits across a tile boundary (two at the end of one tile, one at
+ // the start of the next).
+ mvcs := MockVCSWithCommits(commits, DefaultTileSize-2)
+ defer mvcs.AssertExpectations(t)
+
+ btConf := BTConfig{
+ ProjectID: "should-use-the-emulator",
+ InstanceID: "testinstance",
+ TableID: "three_devices_test_span",
+ VCS: mvcs,
+ }
+
+ // Start from a clean slate: drop any leftover tables and recreate the schema.
+ assert.NoError(t, bt.DeleteTables(btConf.ProjectID, btConf.InstanceID, btConf.TableID))
+ assert.NoError(t, InitBT(btConf))
+
+ ctx := context.Background()
+ traceStore, err := New(ctx, btConf, true)
+ assert.NoError(t, err)
+
+ // With no data, we should get an empty tile
+ actualTile, _, err := traceStore.GetTile(ctx, len(commits))
+ assert.NoError(t, err)
+ assert.NotNil(t, actualTile)
+ assert.Empty(t, actualTile.Traces)
+
+ // This time is an arbitrary point in time
+ now := time.Date(2019, time.May, 5, 1, 3, 4, 0, time.UTC)
+
+ // Build a tile up from the individual data points, one at a time
+ traces := data.MakeTestTile().Traces
+ for _, trace := range traces {
+ gTrace, ok := trace.(*types.GoldenTrace)
+ assert.True(t, ok)
+
+ // Put them in backwards, just to test that order doesn't matter
+ for i := len(gTrace.Digests) - 1; i >= 0; i-- {
+ e := tracestore.Entry{
+ Digest: gTrace.Digests[i],
+ Params: gTrace.Keys,
+ }
+ err := traceStore.Put(ctx, commits[i].Hash, []*tracestore.Entry{&e}, now)
+ assert.NoError(t, err)
+ // roll forward the clock by an arbitrary amount of time
+ now = now.Add(7 * time.Second)
+ }
+ }
+
+ // Get the tile back and make sure it exactly matches the tile
+ // we hand-crafted for the test data.
+ actualTile, actualCommits, err := traceStore.GetTile(ctx, len(commits))
+ assert.NoError(t, err)
+
+ assert.Equal(t, data.MakeTestTile(), actualTile)
+ assert.Equal(t, commits, actualCommits)
+}
+
+// TestBTTraceStorePutGetGrouped adds a bunch of entries batched by device and
+// then retrieves the full Tile.
+func TestBTTraceStorePutGetGrouped(t *testing.T) {
+ unittest.LargeTest(t)
+ unittest.RequiresBigTableEmulator(t)
+
+ commits := data.MakeTestCommits()
+ mvcs := MockVCSWithCommits(commits, 0)
+ defer mvcs.AssertExpectations(t)
+
+ btConf := BTConfig{
+ ProjectID: "should-use-the-emulator",
+ InstanceID: "testinstance",
+ TableID: "three_devices_test_grouped",
+ VCS: mvcs,
+ }
+
+ // Start from a clean slate: drop any leftover tables and recreate the schema.
+ assert.NoError(t, bt.DeleteTables(btConf.ProjectID, btConf.InstanceID, btConf.TableID))
+ assert.NoError(t, InitBT(btConf))
+
+ ctx := context.Background()
+ traceStore, err := New(ctx, btConf, true)
+ assert.NoError(t, err)
+
+ // With no data, we should get an empty tile
+ actualTile, _, err := traceStore.GetTile(ctx, len(commits))
+ assert.NoError(t, err)
+ assert.NotNil(t, actualTile)
+ assert.Empty(t, actualTile.Traces)
+
+ // Build a tile up from the individual data points, one at a time
+ now := time.Date(2019, time.May, 5, 1, 3, 4, 0, time.UTC)
+
+ // Group the traces by device, so we should have 3 groups of 2 traces.
+ traces := data.MakeTestTile().Traces
+ byDevice := map[string][]*types.GoldenTrace{
+ data.AnglerDevice: nil,
+ data.BullheadDevice: nil,
+ data.CrosshatchDevice: nil,
+ }
+ for _, trace := range traces {
+ gTrace, ok := trace.(*types.GoldenTrace)
+ assert.True(t, ok)
+ assert.Len(t, gTrace.Digests, len(commits), "test data should have one digest per commit")
+ dev := gTrace.Keys["device"]
+ byDevice[dev] = append(byDevice[dev], gTrace)
+ }
+ assert.Len(t, byDevice, 3, "test data should have exactly 3 devices")
+
+ // for each trace, report a group of two digests for each commit.
+ for dev, gTraces := range byDevice {
+ assert.Len(t, gTraces, 2, "test data for %s should have exactly 2 traces", dev)
+
+ for i := 0; i < len(commits); i++ {
+ var entries []*tracestore.Entry
+ for _, gTrace := range gTraces {
+ entries = append(entries, &tracestore.Entry{
+ Digest: gTrace.Digests[i],
+ Params: gTrace.Keys,
+ })
+ }
+
+ // One Put per commit carries both entries for this device.
+ err = traceStore.Put(ctx, commits[i].Hash, entries, now)
+ assert.NoError(t, err)
+ // roll forward the clock by an arbitrary amount of time
+ now = now.Add(3 * time.Minute)
+ }
+ }
+
+ // Get the tile back and make sure it exactly matches the tile
+ // we hand-crafted for the test data.
+ actualTile, actualCommits, err := traceStore.GetTile(ctx, len(commits))
+ assert.NoError(t, err)
+
+ assert.Equal(t, data.MakeTestTile(), actualTile)
+ assert.Equal(t, commits, actualCommits)
+}
+
+// TestBTTraceStorePutGetThreaded is like TestBTTraceStorePutGet, just
+// with a bunch of reads/writes done in simultaneous go routines in
+// an effort to catch any race conditions.
+func TestBTTraceStorePutGetThreaded(t *testing.T) {
+ unittest.LargeTest(t)
+ unittest.RequiresBigTableEmulator(t)
+
+ commits := data.MakeTestCommits()
+ mvcs := MockVCSWithCommits(commits, 0)
+ defer mvcs.AssertExpectations(t)
+
+ btConf := BTConfig{
+ ProjectID: "should-use-the-emulator",
+ InstanceID: "testinstance",
+ TableID: "three_devices_test_threaded",
+ VCS: mvcs,
+ }
+
+ assert.NoError(t, bt.DeleteTables(btConf.ProjectID, btConf.InstanceID, btConf.TableID))
+ assert.NoError(t, InitBT(btConf))
+
+ ctx := context.Background()
+ traceStore, err := New(ctx, btConf, true)
+ assert.NoError(t, err)
+
+ wg := sync.WaitGroup{}
+ // These two counts are consumed by the two readTile goroutines launched
+ // below; each Put goroutine does its own wg.Add(1).
+ wg.Add(2)
+
+ now := time.Date(2019, time.May, 5, 1, 3, 4, 0, time.UTC)
+
+ readTile := func() {
+ defer wg.Done()
+ _, _, err := traceStore.GetTile(ctx, len(commits))
+ assert.NoError(t, err)
+ }
+ go readTile()
+
+ // Build a tile up from the individual data points, one at a time
+ traces := data.MakeTestTile().Traces
+ for _, trace := range traces {
+ gTrace, ok := trace.(*types.GoldenTrace)
+ assert.True(t, ok)
+
+ // Put them in backwards, just to test that order doesn't matter
+ for i := len(gTrace.Digests) - 1; i >= 0; i-- {
+ wg.Add(1)
+ // now and i are passed by value so each goroutine gets a stable
+ // snapshot; gTrace is freshly declared per outer iteration, so
+ // capturing it in the closure is safe.
+ go func(now time.Time, i int) {
+ defer wg.Done()
+ e := tracestore.Entry{
+ Digest: gTrace.Digests[i],
+ Params: gTrace.Keys,
+ }
+ err := traceStore.Put(ctx, commits[i].Hash, []*tracestore.Entry{&e}, now)
+ assert.NoError(t, err)
+ }(now, i)
+ now = now.Add(7 * time.Second)
+ }
+ }
+ go readTile()
+
+ wg.Wait()
+
+ // Get the tile back and make sure it exactly matches the tile
+ // we hand-crafted for the test data.
+ actualTile, actualCommits, err := traceStore.GetTile(ctx, len(commits))
+ assert.NoError(t, err)
+
+ assert.Equal(t, data.MakeTestTile(), actualTile)
+ assert.Equal(t, commits, actualCommits)
+}
+
+// TestBTTraceStoreGetDenseTileEmpty makes sure GetDenseTile returns an empty
+// tile (and no commits) when no data has been stored.
+func TestBTTraceStoreGetDenseTileEmpty(t *testing.T) {
+ unittest.LargeTest(t)
+ unittest.RequiresBigTableEmulator(t)
+
+ commits := data.MakeTestCommits()
+ realCommitIndices := []int{300, 501, 557}
+ totalCommits := 1101
+ mvcs, _ := MockSparseVCSWithCommits(commits, realCommitIndices, totalCommits)
+ defer mvcs.AssertExpectations(t)
+
+ btConf := BTConfig{
+ ProjectID: "should-use-the-emulator",
+ InstanceID: "testinstance",
+ TableID: "three_devices_test_dense_empty",
+ VCS: mvcs,
+ }
+
+ assert.NoError(t, bt.DeleteTables(btConf.ProjectID, btConf.InstanceID, btConf.TableID))
+ assert.NoError(t, InitBT(btConf))
+
+ ctx := context.Background()
+ traceStore, err := New(ctx, btConf, true)
+ assert.NoError(t, err)
+
+ // With no data, we should get an empty tile
+ actualTile, actualCommits, err := traceStore.GetDenseTile(ctx, len(commits))
+ assert.NoError(t, err)
+ assert.NotNil(t, actualTile)
+ assert.Empty(t, actualCommits)
+ assert.Empty(t, actualTile.Traces)
+
+}
+
+// TestBTTraceStoreGetDenseTile puts in a few data points sparsely spaced throughout
+// time and makes sure we can call GetDenseTile to get them condensed together
+// (i.e. with all the empty commits tossed out). It puts them in a variety of conditions
+// to try to identify any edge cases.
+func TestBTTraceStoreGetDenseTile(t *testing.T) {
+ unittest.LargeTest(t)
+ unittest.RequiresBigTableEmulator(t)
+
+ // NOTE(review): the scenarios below (e.g. indices 0/256/512 being "first
+ // commit of their tile") assume a tile size of 256 — confirm DefaultTileSize.
+
+ // 3 commits, arbitrarily spaced out across the last tile
+ commits := data.MakeTestCommits()
+ realCommitIndices := []int{795, 987, 1001}
+ totalCommits := (256 * 4) - 1
+ mvcs, lCommits := MockSparseVCSWithCommits(commits, realCommitIndices, totalCommits)
+ expectedTile := data.MakeTestTile()
+ testDenseTile(t, expectedTile, mvcs, commits, lCommits, realCommitIndices)
+
+ // 3 commits, arbitrarily spaced out across 3 tiles, with no data
+ // in the most recent tile
+ commits = data.MakeTestCommits()
+ realCommitIndices = []int{300, 501, 557}
+ totalCommits = 1101
+ mvcs, lCommits = MockSparseVCSWithCommits(commits, realCommitIndices, totalCommits)
+ expectedTile = data.MakeTestTile()
+ testDenseTile(t, expectedTile, mvcs, commits, lCommits, realCommitIndices)
+
+ // As above, just 2 commits
+ commits = data.MakeTestCommits()[1:]
+ realCommitIndices = []int{501, 557}
+ totalCommits = 1101
+ mvcs, lCommits = MockSparseVCSWithCommits(commits, realCommitIndices, totalCommits)
+ expectedTile = data.MakeTestTile()
+ expectedTile, err := expectedTile.Trim(1, 3)
+ assert.NoError(t, err)
+ testDenseTile(t, expectedTile, mvcs, commits, lCommits, realCommitIndices)
+
+ // All commits are on the first commit of their tile
+ commits = data.MakeTestCommits()
+ realCommitIndices = []int{0, 256, 512}
+ totalCommits = 1101
+ mvcs, lCommits = MockSparseVCSWithCommits(commits, realCommitIndices, totalCommits)
+ expectedTile = data.MakeTestTile()
+ testDenseTile(t, expectedTile, mvcs, commits, lCommits, realCommitIndices)
+
+ // All commits are on the last commit of their tile
+ commits = data.MakeTestCommits()
+ realCommitIndices = []int{255, 511, 767}
+ totalCommits = 1101
+ mvcs, lCommits = MockSparseVCSWithCommits(commits, realCommitIndices, totalCommits)
+ expectedTile = data.MakeTestTile()
+ testDenseTile(t, expectedTile, mvcs, commits, lCommits, realCommitIndices)
+
+ // Empty tiles between commits
+ commits = data.MakeTestCommits()
+ realCommitIndices = []int{50, 800, 1100}
+ totalCommits = 1101
+ mvcs, lCommits = MockSparseVCSWithCommits(commits, realCommitIndices, totalCommits)
+ expectedTile = data.MakeTestTile()
+ testDenseTile(t, expectedTile, mvcs, commits, lCommits, realCommitIndices)
+}
+
+// testDenseTile takes the data from tile, Puts it into BT, then pulls the tile given
+// the commit layout in VCS and returns it.
+// Note: it mutates its arguments — each commit's CommitTime is rewritten and
+// tile.Commits is reassigned — so callers must pass freshly-built values on
+// every invocation. Each call also wipes and recreates the shared BT table.
+func testDenseTile(t *testing.T, tile *tiling.Tile, mvcs *mock_vcs.VCS, commits []*tiling.Commit, lCommits []*vcsinfo.LongCommit, realCommitIndices []int) {
+ defer mvcs.AssertExpectations(t)
+
+ btConf := BTConfig{
+ ProjectID: "should-use-the-emulator",
+ InstanceID: "testinstance",
+ TableID: "three_devices_test_dense",
+ VCS: mvcs,
+ }
+
+ assert.NoError(t, bt.DeleteTables(btConf.ProjectID, btConf.InstanceID, btConf.TableID))
+ assert.NoError(t, InitBT(btConf))
+
+ ctx := context.Background()
+ traceStore, err := New(ctx, btConf, true)
+ assert.NoError(t, err)
+
+ // This time is an arbitrary point in time
+ now := time.Date(2019, time.May, 5, 1, 3, 4, 0, time.UTC)
+
+ // Build a tile up from the individual data points, one at a time
+ traces := tile.Traces
+ for _, trace := range traces {
+ gTrace, ok := trace.(*types.GoldenTrace)
+ assert.True(t, ok)
+
+ // Put them in backwards, just to test that order doesn't matter
+ for i := len(gTrace.Digests) - 1; i >= 0; i-- {
+ e := tracestore.Entry{
+ Digest: gTrace.Digests[i],
+ Params: gTrace.Keys,
+ }
+ err := traceStore.Put(ctx, commits[i].Hash, []*tracestore.Entry{&e}, now)
+ assert.NoError(t, err)
+ // roll forward the clock by an arbitrary amount of time
+ now = now.Add(7 * time.Second)
+ }
+ }
+
+ // Get the tile back and make sure it exactly matches the tile
+ // we hand-crafted for the test data.
+ actualTile, allCommits, err := traceStore.GetDenseTile(ctx, len(commits))
+ assert.NoError(t, err)
+ assert.Len(t, allCommits, len(lCommits)-realCommitIndices[0])
+
+ // In MockSparseVCSWithCommits, we change the time of the commits, so we need
+ // to update the expected times to match.
+ for i, c := range commits {
+ c.CommitTime = lCommits[realCommitIndices[i]].Timestamp.Unix()
+ }
+ tile.Commits = commits
+
+ assert.Equal(t, tile, actualTile)
+}
+
+// TestBTDigestMap tests the internal workings of storing the
+// DigestMap. See BIGTABLE.md for more about the schemas for
+// the DigestMap family and the id counter family.
+func TestBTDigestMap(t *testing.T) {
+ unittest.LargeTest(t)
+ unittest.RequiresBigTableEmulator(t)
+
+ btConf := BTConfig{
+ ProjectID: "should-use-the-emulator",
+ InstanceID: "testinstance",
+ TableID: "digest_map_test",
+ VCS: nil,
+ }
+
+ assert.NoError(t, bt.DeleteTables(btConf.ProjectID, btConf.InstanceID, btConf.TableID))
+ assert.NoError(t, InitBT(btConf))
+
+ ctx := context.Background()
+ traceStore, err := New(ctx, btConf, true)
+ assert.NoError(t, err)
+
+ dm, err := traceStore.getDigestMap(ctx)
+ assert.NoError(t, err)
+ assert.NotNil(t, dm)
+ // should be empty, except for the initial mapping.
+ assert.Equal(t, 1, dm.Len())
+ i, err := dm.ID(types.MISSING_DIGEST)
+ assert.NoError(t, err)
+ assert.Equal(t, missingDigestID, i)
+
+ digests := makeTestDigests(0, 100)
+
+ // add 90 of the 100 digests using update
+ ninety := make(types.DigestSet, 90)
+ for _, d := range digests[0:90] {
+ ninety[d] = true
+ }
+ dm, err = traceStore.updateDigestMap(ctx, ninety)
+ assert.NoError(t, err)
+ assert.NotNil(t, dm)
+ assert.Equal(t, 91, dm.Len())
+ // We can't check to see if our known digests map to
+ // a specific id because the digest map could present
+ // the digests in a non-deterministic order.
+ // We can spot check one of the ids though
+ _, err = dm.Digest(88)
+ assert.NoError(t, err)
+
+ ids, err := traceStore.getIDs(ctx, 3)
+ // The next 3 numbers should be 91, 92, 93 because they are
+ // monotonically increasing
+ assert.NoError(t, err)
+ assert.Equal(t, []digestID{91, 92, 93}, ids)
+ func() {
+ traceStore.availIDsMutex.Lock()
+ defer traceStore.availIDsMutex.Unlock()
+ assert.NotContains(t, traceStore.availIDs, digestID(92))
+ assert.NotContains(t, traceStore.availIDs, digestID(93))
+ assert.Contains(t, traceStore.availIDs, digestID(94))
+ }()
+
+ // give two ids back (pretend we used id 92)
+ // NOTE(review): 91 is returned as well but never asserted on below.
+ traceStore.returnIDs([]digestID{91, 93})
+
+ func() {
+ traceStore.availIDsMutex.Lock()
+ defer traceStore.availIDsMutex.Unlock()
+ assert.NotContains(t, traceStore.availIDs, digestID(92))
+ assert.Contains(t, traceStore.availIDs, digestID(93))
+ assert.Contains(t, traceStore.availIDs, digestID(94))
+ }()
+
+ // call update with an overlap of new and old: digests [80, 100) — the
+ // first 10 already exist in the map, the last 10 are new.
+ twenty := make(types.DigestSet, 20)
+ for _, d := range digests[80:] {
+ twenty[d] = true
+ }
+ dm, err = traceStore.updateDigestMap(ctx, twenty)
+ assert.NoError(t, err)
+ assert.NotNil(t, dm)
+ assert.Equal(t, 101, dm.Len())
+
+ // Get it again and make sure it matches the last update phase.
+ dm2, err := traceStore.getDigestMap(ctx)
+ assert.NoError(t, err)
+ assert.NotNil(t, dm2)
+ assert.Equal(t, dm, dm2)
+
+ // Add a lot more digests to make sure the bulk requesting works
+ for i := 1; i < 10; i++ {
+ // 113 is an arbitrary prime number that does not divide batchIdRequest.
+ ds := make(types.DigestSet, 113)
+ ds.AddLists(makeTestDigests(113*i, 113))
+
+ dm, err := traceStore.updateDigestMap(ctx, ds)
+ assert.NoError(t, err)
+ assert.NotNil(t, dm)
+ }
+}
+
+// makeTestDigests returns n valid digests. These digests are easy
+// for humans to understand, as they are just the hex values
+// [start, start+n) reversed and 0-padded to 32 chars long (the length
+// of a valid md5 hash).
+func makeTestDigests(start, n int) []types.Digest {
+ xd := make([]types.Digest, n)
+ for i := 0; i < n; i++ {
+ // Reverse them to exercise the prefixing of the digestMap.
+ s := util.ReverseString(fmt.Sprintf("%032x", start+i))
+ xd[i] = types.Digest(s)
+ }
+ return xd
+}
+
+// TestGetTileKey tests the internal workings of deriving a
+// tileKey from the commit index. See BIGTABLE.md for more.
+func TestGetTileKey(t *testing.T) {
+ unittest.LargeTest(t)
+ // NOTE(review): per the comment below, this test never hits BT; if New()
+ // succeeds without a live emulator, this gate could likely be dropped.
+ unittest.RequiresBigTableEmulator(t)
+
+ btConf := BTConfig{
+ // Leaving other things blank because we won't actually hit BT or use VCS.
+ }
+
+ ctx := context.Background()
+ traceStore, err := New(ctx, btConf, true)
+ assert.NoError(t, err)
+
+ type testStruct struct {
+ InputRepoIndex int
+
+ ExpectedKey tileKey
+ ExpectedIndex int
+ }
+ // test data is valid, but arbitrary.
+ tests := []testStruct{
+ {
+ InputRepoIndex: 0,
+ ExpectedKey: tileKey(2147483647),
+ ExpectedIndex: 0,
+ },
+ {
+ InputRepoIndex: 10,
+ ExpectedKey: tileKey(2147483647),
+ ExpectedIndex: 10,
+ },
+ {
+ InputRepoIndex: 300,
+ ExpectedKey: tileKey(2147483646),
+ ExpectedIndex: 44,
+ },
+ {
+ InputRepoIndex: 123456,
+ ExpectedKey: tileKey(2147483165),
+ ExpectedIndex: 64,
+ },
+ }
+
+ for _, test := range tests {
+ key, index := traceStore.getTileKey(test.InputRepoIndex)
+ assert.Equal(t, test.ExpectedKey, key)
+ assert.Equal(t, test.ExpectedIndex, index)
+ }
+}
+
+// TestCalcShardedRowName tests the internal workings of sharding
+// a given subkey.
+func TestCalcShardedRowName(t *testing.T) {
+ unittest.LargeTest(t)
+ // NOTE(review): per the comment below, this test never hits BT; if New()
+ // succeeds without a live emulator, this gate could likely be dropped.
+ unittest.RequiresBigTableEmulator(t)
+
+ btConf := BTConfig{
+ // Leaving other things blank because we won't actually hit BT
+ // or use the VCS.
+ }
+
+ ctx := context.Background()
+ traceStore, err := New(ctx, btConf, true)
+ assert.NoError(t, err)
+
+ type testStruct struct {
+ InputKey tileKey
+ InputRowType string
+ InputSubKey string
+
+ ExpectedRowName string
+ }
+ // test data is valid, but arbitrary.
+ tests := []testStruct{
+ {
+ InputKey: tileKey(2147483647),
+ InputRowType: typeTrace,
+ InputSubKey: ",0=1,1=3,3=0,",
+
+ ExpectedRowName: "09:ts:t:2147483647:,0=1,1=3,3=0,",
+ },
+ {
+ InputKey: tileKey(2147483647),
+ InputRowType: typeTrace,
+ InputSubKey: ",0=1,1=3,9=0,",
+
+ ExpectedRowName: "13:ts:t:2147483647:,0=1,1=3,9=0,",
+ },
+ {
+ InputKey: tileKey(2147483540),
+ InputRowType: typeDigestMap,
+ InputSubKey: "abc",
+
+ ExpectedRowName: "02:ts:d:2147483540:abc",
+ },
+ {
+ InputKey: tileKey(2147483540),
+ InputRowType: typeDigestMap,
+ InputSubKey: "bcd",
+
+ ExpectedRowName: "25:ts:d:2147483540:bcd",
+ },
+ }
+
+ for _, test := range tests {
+ row := traceStore.calcShardedRowName(test.InputKey, test.InputRowType, test.InputSubKey)
+ assert.Equal(t, test.ExpectedRowName, row)
+ }
+}
+
+// TestRowAndColNameFromDigest tests the internal workings of sharding
+// a digest for use in the digest map.
+func TestRowAndColNameFromDigest(t *testing.T) {
+ unittest.LargeTest(t)
+ // NOTE(review): per the comment below, this test never hits BT; if New()
+ // succeeds without a live emulator, this gate could likely be dropped.
+ unittest.RequiresBigTableEmulator(t)
+
+ btConf := BTConfig{
+ // Leaving other things blank because we won't actually hit BT
+ // or use the VCS.
+ }
+
+ ctx := context.Background()
+ traceStore, err := New(ctx, btConf, true)
+ assert.NoError(t, err)
+
+ type testStruct struct {
+ InputDigest types.Digest
+
+ ExpectedRowName string
+ ExpectedColName string
+ }
+ // test data is valid, but arbitrary.
+ tests := []testStruct{
+ {
+ InputDigest: types.Digest("9e1d402515193304cafbdf02b8fd751b"),
+ ExpectedRowName: "21:ts:d:0000000000:9e1",
+ ExpectedColName: "d402515193304cafbdf02b8fd751b",
+ },
+ {
+ InputDigest: types.Digest("e30a49351a7e45b9591a989073e755b2"),
+ ExpectedRowName: "05:ts:d:0000000000:e30",
+ ExpectedColName: "a49351a7e45b9591a989073e755b2",
+ },
+ {
+ InputDigest: types.Digest("60b5e978d116cccc0ef12278d724245b"),
+ ExpectedRowName: "23:ts:d:0000000000:60b",
+ ExpectedColName: "5e978d116cccc0ef12278d724245b",
+ },
+ }
+
+ for _, test := range tests {
+ row, col := traceStore.rowAndColNameFromDigest(test.InputDigest)
+ assert.Equal(t, test.ExpectedRowName, row)
+ assert.Equal(t, test.ExpectedColName, col)
+ }
+}
+
+// NOTE(review): ALL_CAPS const names are not idiomatic Go (MixedCaps is
+// preferred); consider renaming in a follow-up since these are referenced
+// throughout this file.
+const (
+ // Directory with testdata.
+ TEST_DATA_DIR = "./testdata"
+
+ // Local file location of the test data.
+ TEST_DATA_PATH = TEST_DATA_DIR + "/10-test-sample-4bytes.tile"
+
+ // Folder in the testdata bucket. See go/testutils for details.
+ TEST_DATA_STORAGE_PATH = "gold-testdata/10-test-sample-4bytes.tile"
+
+ // Number of commits in the sample tile (asserted in setupLargeTile).
+ TILE_LENGTH = 50
+)
+
+// TestBTTraceStoreLargeTile stores a large amount of data into the tracestore
+// and retrieves it.
+func TestBTTraceStoreLargeTile(t *testing.T) {
+ unittest.LargeTest(t)
+ unittest.RequiresBigTableEmulator(t)
+
+ btConf, mvcs, tile := setupLargeTile(t)
+ defer mvcs.AssertExpectations(t)
+
+ ctx := context.Background()
+
+ traceStore, err := New(ctx, btConf, true)
+ assert.NoError(t, err)
+
+ // For each value in tile get the traceIDs that are not empty.
+ traceIDsPerCommit := make([]tiling.TraceIdSlice, TILE_LENGTH)
+ for traceID, trace := range tile.Traces {
+ gTrace := trace.(*types.GoldenTrace)
+ for i := 0; i < TILE_LENGTH; i++ {
+ if gTrace.Digests[i] != types.MISSING_DIGEST {
+ traceIDsPerCommit[i] = append(traceIDsPerCommit[i], traceID)
+ }
+ }
+ }
+ // Find the commit index with the most data points and remember every index.
+ indices := make([]int, TILE_LENGTH)
+ maxIndex := 0
+ maxLen := len(traceIDsPerCommit[0])
+ for idx := range indices {
+ if len(traceIDsPerCommit[idx]) > maxLen {
+ maxLen = len(traceIDsPerCommit[idx])
+ maxIndex = idx
+ }
+ indices[idx] = idx
+ }
+
+ // Ingest the biggest tile.
+ entries := []*tracestore.Entry{}
+ allDigests := map[types.Digest]bool{"": true}
+ for _, traceID := range traceIDsPerCommit[maxIndex] {
+ // Named gt (was t) so it does not shadow the *testing.T parameter.
+ gt := tile.Traces[traceID].(*types.GoldenTrace)
+ digest := gt.Digests[maxIndex]
+ allDigests[digest] = true
+ entries = append(entries, &tracestore.Entry{Digest: digest, Params: gt.Params()})
+ }
+ assert.NoError(t, traceStore.Put(ctx, tile.Commits[maxIndex].Hash, entries, time.Now()))
+
+ foundDigestMap, err := traceStore.getDigestMap(ctx)
+ assert.NoError(t, err)
+ assert.Equal(t, len(allDigests), foundDigestMap.Len())
+
+ for digest := range allDigests {
+ id, err := foundDigestMap.ID(digest)
+ assert.NoError(t, err)
+ if digest == "" {
+ assert.Equal(t, missingDigestID, id)
+ } else {
+ assert.NotEqual(t, missingDigestID, id)
+ }
+ }
+
+ traceIDsPerCommit[maxIndex] = []tiling.TraceId{}
+
+ // Randomly add samples from the tile to that
+ for len(indices) > 0 {
+ idx := indices[0]
+ indices = indices[1:]
+ if len(traceIDsPerCommit[idx]) == 0 {
+ continue
+ }
+
+ entries := []*tracestore.Entry{}
+ for _, traceID := range traceIDsPerCommit[idx] {
+ // As above, named gt to avoid shadowing the *testing.T parameter.
+ gt := tile.Traces[traceID].(*types.GoldenTrace)
+ digest := gt.Digests[idx]
+ allDigests[digest] = true
+ entries = append(entries, &tracestore.Entry{Digest: digest, Params: gt.Params()})
+ }
+ assert.NoError(t, traceStore.Put(ctx, tile.Commits[idx].Hash, entries, time.Now()))
+ }
+
+ // Load the tile and verify it's identical.
+ foundTile, commits, err := traceStore.GetTile(ctx, TILE_LENGTH)
+ assert.NoError(t, err)
+ assert.NotNil(t, commits)
+ assert.Equal(t, tile.Commits[len(tile.Commits)-TILE_LENGTH:], commits)
+
+ assert.Equal(t, len(tile.Traces), len(foundTile.Traces))
+ for traceID, trace := range tile.Traces {
+ gt := trace.(*types.GoldenTrace)
+ params := gt.Params()
+ found := false
+
+ // Exactly one trace in the retrieved tile should match these params.
+ foundCount := 0
+ for _, foundTrace := range foundTile.Traces {
+ if deepequal.DeepEqual(params, foundTrace.Params()) {
+ foundCount++
+ }
+ }
+ assert.Equal(t, 1, foundCount)
+
+ for foundID, foundTrace := range foundTile.Traces {
+ if deepequal.DeepEqual(params, foundTrace.Params()) {
+ expDigests := gt.Digests[len(gt.Digests)-TILE_LENGTH:]
+ found = true
+ fgt := foundTrace.(*types.GoldenTrace)
+ assert.Equal(t, len(expDigests), len(fgt.Digests))
+
+ var diff []string
+ diffStr := ""
+ for idx, digest := range expDigests {
+ isDiff := digest != fgt.Digests[idx]
+ if isDiff {
+ diff = append(diff, fmt.Sprintf("%d", idx))
+ diffStr += fmt.Sprintf(" %q != %q \n", digest, fgt.Digests[idx])
+ }
+ }
+ // Nothing should be different
+ assert.Nil(t, diff)
+ assert.Equal(t, "", diffStr)
+
+ delete(foundTile.Traces, foundID)
+ break
+ }
+ }
+ assert.True(t, found)
+ delete(tile.Traces, traceID)
+ }
+ assert.Equal(t, 0, len(foundTile.Traces))
+ assert.Equal(t, 0, len(tile.Traces))
+}
+
+// setupLargeTile downloads (if not already present) and deserializes the
+// sample tile, wires up a mock VCS that matches its commits, and wipes and
+// re-initializes a clean BT table for the large-tile test.
+func setupLargeTile(t sktest.TestingT) (BTConfig, *mock_vcs.VCS, *tiling.Tile) {
+ if !fileutil.FileExists(TEST_DATA_PATH) {
+ err := gcs_testutils.DownloadTestDataFile(t, gcs_testutils.TEST_DATA_BUCKET, TEST_DATA_STORAGE_PATH, TEST_DATA_PATH)
+ assert.NoError(t, err, "Unable to download testdata.")
+ }
+
+ tile := makeSampleTile(t, TEST_DATA_PATH)
+ assert.Len(t, tile.Commits, TILE_LENGTH)
+
+ mvcs := MockVCSWithCommits(tile.Commits, 0)
+
+ btConf := BTConfig{
+ ProjectID: "should-use-the-emulator",
+ InstanceID: "testinstance",
+ TableID: "large_tile_test",
+ VCS: mvcs,
+ }
+
+ assert.NoError(t, bt.DeleteTables(btConf.ProjectID, btConf.InstanceID, btConf.TableID))
+ assert.NoError(t, InitBT(btConf))
+ // NOTE(review): plain Println — consider sklog or t.Log for consistency.
+ fmt.Println("BT emulator set up")
+ return btConf, mvcs, tile
+}
+
+// makeSampleTile opens the serialized sample at fileName, deserializes it,
+// and returns its Tile. Any failure is reported through t.
+func makeSampleTile(t sktest.TestingT, fileName string) *tiling.Tile {
+ file, err := os.Open(fileName)
+ assert.NoError(t, err)
+ // Close the handle when done; the original version leaked the descriptor.
+ defer util.Close(file)
+
+ sample, err := serialize.DeserializeSample(file)
+ assert.NoError(t, err)
+
+ return sample.Tile
+}
+
+// MockVCSWithCommits returns a mock VCS that knows about the given commits,
+// where commit i reports repo index i+offset. The IndexOf expectations are
+// marked Maybe() because not every test exercises them.
+func MockVCSWithCommits(commits []*tiling.Commit, offset int) *mock_vcs.VCS {
+ mvcs := &mock_vcs.VCS{}
+
+ indexCommits := make([]*vcsinfo.IndexCommit, 0, len(commits))
+ hashes := make([]string, 0, len(commits))
+ longCommits := make([]*vcsinfo.LongCommit, 0, len(commits))
+ for i, c := range commits {
+ mvcs.On("IndexOf", ctx, c.Hash).Return(i+offset, nil).Maybe()
+
+ indexCommits = append(indexCommits, &vcsinfo.IndexCommit{
+ Hash: c.Hash,
+ Index: i + offset,
+ Timestamp: time.Unix(c.CommitTime, 0),
+ })
+ hashes = append(hashes, c.Hash)
+ longCommits = append(longCommits, &vcsinfo.LongCommit{
+ ShortCommit: &vcsinfo.ShortCommit{
+ Hash: c.Hash,
+ Author: c.Author,
+ Subject: fmt.Sprintf("Commit #%d in test", i),
+ },
+ Timestamp: time.Unix(c.CommitTime, 0),
+ })
+ }
+
+ mvcs.On("LastNIndex", len(commits)).Return(indexCommits)
+ mvcs.On("DetailsMulti", ctx, hashes, false).Return(longCommits, nil)
+
+ return mvcs
+}
+
+// MockSparseVCSWithCommits returns a mock VCS that simulates a repo of
+// totalCommits synthetic commits (spaced 1700s apart, starting at the epoch)
+// into which the given real commits are spliced at realCommitIndices. It
+// returns the mock and the full list of LongCommits (synthetic + real).
+// commits and realCommitIndices must be the same length.
+func MockSparseVCSWithCommits(commits []*tiling.Commit, realCommitIndices []int, totalCommits int) (*mock_vcs.VCS, []*vcsinfo.LongCommit) {
+ mvcs := &mock_vcs.VCS{}
+ if len(commits) != len(realCommitIndices) {
+ panic("commits should be same length as realCommitIndices")
+ }
+
+ // Create many synthetic commits.
+ indexCommits := make([]*vcsinfo.IndexCommit, totalCommits)
+ longCommits := make([]*vcsinfo.LongCommit, totalCommits)
+ hashes := []string{}
+ for i := 0; i < totalCommits; i++ {
+ h := fmt.Sprintf("%040d", i)
+ indexCommits[i] = &vcsinfo.IndexCommit{
+ Hash: h,
+ Index: i,
+ // space the commits 1700 seconds apart, starting at the epoch
+ // This is an arbitrary amount of space.
+ Timestamp: time.Unix(int64(i*1700), 0),
+ }
+
+ longCommits[i] = &vcsinfo.LongCommit{
+ ShortCommit: &vcsinfo.ShortCommit{
+ Hash: h,
+ Author: "nobody@example.com",
+ },
+ Timestamp: time.Unix(int64(i*1700), 0),
+ }
+ hashes = append(hashes, h)
+
+ }
+
+ // Splice the real commits in at their requested indices, overwriting the
+ // synthetic entries there (timestamps follow the synthetic spacing).
+ for i, c := range commits {
+ index := realCommitIndices[i]
+ mvcs.On("IndexOf", ctx, c.Hash).Return(index, nil).Maybe()
+ indexCommits[index] = &vcsinfo.IndexCommit{
+ Hash: c.Hash,
+ Index: index,
+ Timestamp: time.Unix(int64(index*1700), 0),
+ }
+ hashes[index] = c.Hash
+ longCommits[index] = &vcsinfo.LongCommit{
+ ShortCommit: &vcsinfo.ShortCommit{
+ Hash: c.Hash,
+ Author: c.Author,
+ Subject: fmt.Sprintf("Real commit #%d in test", i),
+ },
+ Timestamp: time.Unix(int64(index*1700), 0),
+ }
+ }
+
+ firstRealCommitIdx := realCommitIndices[0]
+ mvcs.On("ByIndex", ctx, firstRealCommitIdx).Return(longCommits[firstRealCommitIdx], nil).Maybe()
+ mvcs.On("From", mock.Anything).Return(hashes[firstRealCommitIdx:], nil).Maybe()
+ mvcs.On("LastNIndex", 1).Return(indexCommits[totalCommits-1:]).Maybe()
+ mvcs.On("DetailsMulti", ctx, hashes[firstRealCommitIdx:], false).Return(longCommits[firstRealCommitIdx:], nil).Maybe()
+
+ return mvcs, longCommits
+}
+
+// ctx matches the context argument in the mocked VCS expectations above.
+// NOTE(review): matching on the concrete type name "*context.emptyCtx" is
+// brittle — it breaks if the standard library changes the dynamic type
+// returned by context.Background(). Consider mock.Anything instead.
+var ctx = mock.AnythingOfType("*context.emptyCtx")
diff --git a/golden/go/tracestore/bt_tracestore/types.go b/golden/go/tracestore/bt_tracestore/types.go
new file mode 100644
index 0000000..20d5d56
--- /dev/null
+++ b/golden/go/tracestore/bt_tracestore/types.go
@@ -0,0 +1,386 @@
+package bt_tracestore
+
+import (
+ "crypto/md5"
+ "encoding"
+ "encoding/binary"
+ "fmt"
+ "time"
+
+ "cloud.google.com/go/bigtable"
+ "go.skia.org/infra/go/paramtools"
+ "go.skia.org/infra/go/skerr"
+ "go.skia.org/infra/go/tiling"
+ "go.skia.org/infra/golden/go/types"
+)
+
// Constants adapted from btts.go
// See BIGTABLE.md for an overview of how the data is stored in BT.
const (
	// Namespace for this package's data. Due to the fact that there is one table per instance,
	// this makes sure we don't have collisions between traces and something else
	// in BT (i.e. git info).
	traceStoreNameSpace = "ts"

	// Column Families.
	// https://cloud.google.com/bigtable/docs/schema-design#column_families_and_column_qualifiers
	digestMapFamily = "D" // Store a mapping of Digest->DigestID
	idCounterFamily = "I" // Keeps a monotonically increasing number to generate DigestIDs
	opsFamily       = "O" // ops short for Ordered Param Set
	traceFamily     = "T" // Holds "0"..."tilesize-1" columns with a DigestID at each cell

	// The columns (and rows) in the digest map family are derived from the digests, which are
	// md5 hashes. The row will be the first three characters of the hash and the column will
	// be the remaining characters (see b.rowAndColNameFromDigest).
	// All entries will have the following prefix.
	digestMapFamilyPrefix = digestMapFamily + ":"
	// All entries are part of a global map (set tile to 0).
	digestMapTile = tileKey(0)

	// Columns in the ID counter family. There is only one row and one column.
	idCounterColumn = "idc"

	// Columns in the OrderedParamSet column family.
	opsHashColumn   = "H"
	opsOpsColumn    = "OPS"
	hashFullColName = opsFamily + ":" + opsHashColumn
	opsFullColName  = opsFamily + ":" + opsOpsColumn

	// The columns in the trace family are "0", "1", "2"..."N" where N is
	// the BT tile size (default below). These values correspond to the commitOffset,
	// where 0 is the first (most recent) commit in the tile and N is the last (oldest)
	// commit in the tile.
	// They will all have the following prefix.
	traceFamilyPrefix = traceFamily + ":"

	// Define the row types.
	typeDigestMap = "d"
	typeIdCounter = "i"
	typeOPS       = "o"
	typeTrace     = "t"

	// This is the size of the tile in Big Table. That is, how many commits do we store in one tile.
	// We can have up to 2^32 tiles in big table, so this would let us store 1 trillion
	// commits worth of data. This tile size does not need to be related to the tile size that
	// Gold operates on (although when tuning, it should be greater than, or an even divisor
	// of the Gold tile size). The first commit in the repo belongs to tile 2^32-1 and tile numbers
	// decrease for newer commits.
	// NOTE(review): tileKey is an int32 computed from math.MaxInt32 (see tileKeyFromIndex),
	// which gives at most 2^31 tiles, not 2^32 — confirm and reconcile this comment
	// (and BIGTABLE.md) with the implementation.
	DefaultTileSize = 256

	// Default number of shards used. A shard splits the traces up on a tile.
	// If a trace exists on shard N in tile A, it will be on shard N for all tiles.
	// Having traces on shards lets BT split up the work more evenly.
	DefaultShards = 32

	// To avoid many successive increment calls to the id counter cell, we request a number of
	// ids at once. This number is arbitrarily picked and can be increased if need be.
	numReservedIds = 256

	// Timeouts applied to BT read and write operations.
	readTimeout  = 4 * time.Minute
	writeTimeout = 10 * time.Minute

	// badTileKey is returned in error conditions.
	badTileKey = tileKey(-1)

	// missingDigestID is the id reserved for types.MISSING_DIGEST.
	missingDigestID = digestID(0)
)
+
// btColumnFamilies lists the families (conceptually similar to tables) we are
// creating in BT; see the family constants above for what each one stores.
var btColumnFamilies = []string{
	traceFamily,
	opsFamily,
	idCounterFamily,
	digestMapFamily,
}

// We have one global digest id counter, so just hard-code it to tile 0.
var idCounterRow = unshardedRowName(typeIdCounter, 0)
+
// tileKey is the identifier for each tile held in BigTable.
//
// Note that tile keys are in the opposite order of tile offset, that is, the first commit
// in a repo goes in the first tile, which has the largest key (see tileKeyFromIndex).
// We do this so more recent tiles come first in sort order.
type tileKey int32

// digestID is an arbitrary number for referring to a types.Digest (string)
// that is stored in a digestMap. The zero id is reserved for the missing digest.
type digestID uint64
+
+// MarshalBinary implements the encoding.BinaryMarshaler interface, allowing for
+// us to compactly store these to BT (about 50% space savings over string representation).
+func (d digestID) MarshalBinary() ([]byte, error) {
+ rv := make([]byte, binary.MaxVarintLen64)
+ n := binary.PutUvarint(rv, uint64(d))
+ return rv[:n], nil
+}
+
+var _ encoding.BinaryMarshaler = digestID(0)
+
+// UnmarshalBinary implements the encoding.BinaryUnmarshaler interface, allowing for
+// us to compactly store these to BT (about 50% space savings over string representation).
+func (d *digestID) UnmarshalBinary(data []byte) error {
+ val, n := binary.Uvarint(data)
+ if n != len(data) {
+ return skerr.Fmt("Error decoding digestID from %x; Uvarint consumed %d bytes instead of %d", data, n, len(data))
+ }
+ *d = digestID(val)
+ return nil
+}
+
+var _ encoding.BinaryUnmarshaler = (*digestID)(nil)
+
// encodedTraceID is a shortened form of a tiling.TraceId, e.g. 0=1,1=3,3=0,
// Those indices are references to the OrderedParamSet stored in encTile.
// See params.paramsEncoder for the encoding details.
type encodedTraceID string

// encTile contains an encoded tile.
type encTile struct {
	// traces maps a trace id to the list of digest ids. The list corresponds to the commits,
	// with index 0 being the oldest commit and the last index being the most recent.
	traces map[encodedTraceID][]digestID
	// ops is the OrderedParamSet needed to decode the trace ids above.
	ops *paramtools.OrderedParamSet
}
+
// opsCacheEntry is a cache entry for the OrderedParamSets we have seen
// per-tile while ingesting.
type opsCacheEntry struct {
	ops  *paramtools.OrderedParamSet
	hash string // md5 hash of the serialized ops - used for deterministic querying.
}
+
+// opsCacheEntryFromOPS creates and fills in an OpsCacheEntry from the given
+// OrderedParamSet and sets the hash appropriately.
+func opsCacheEntryFromOPS(ops *paramtools.OrderedParamSet) (*opsCacheEntry, error) {
+ buf, err := ops.Encode()
+ if err != nil {
+ return nil, skerr.Fmt("could not encode the given ops to bytes: %s", err)
+ }
+ hash := fmt.Sprintf("%x", md5.Sum(buf))
+ return &opsCacheEntry{
+ ops: ops,
+ hash: hash,
+ }, nil
+}
+
// newOpsCacheEntry returns an opsCacheEntry wrapping an empty OrderedParamSet.
func newOpsCacheEntry() (*opsCacheEntry, error) {
	return opsCacheEntryFromOPS(paramtools.NewOrderedParamSet())
}
+
+// newOpsCacheEntryFromRow loads the appropriate data from the given BT row
+// and returns a OpsCacheEntry with that data.
+func newOpsCacheEntryFromRow(row bigtable.Row) (*opsCacheEntry, error) {
+ family := row[opsFamily]
+ if len(family) != 2 {
+ // This should never happen
+ return nil, skerr.Fmt("incorrect number of of OPS columns in BT for key %s, %d != 2", row.Key(), len(family))
+ }
+ ops := ¶mtools.OrderedParamSet{}
+ hash := ""
+ for _, col := range family {
+ if col.Column == opsFullColName {
+ var err error
+ ops, err = paramtools.NewOrderedParamSetFromBytes(col.Value)
+ if err != nil {
+ // should never happen
+ return nil, skerr.Fmt("corrupted paramset in BT for key %s: %s", row.Key(), err)
+ }
+ } else if col.Column == hashFullColName {
+ hash = string(col.Value)
+ }
+ }
+ if hash == "" {
+ return nil, skerr.Fmt("missing hash for OPS for key %s: %#v", row.Key(), ops)
+ }
+ // You might be tempted to use opsCacheEntryFromOps and
+ // check that entry.hash == hash here, but that will fail
+ // because GoB encoding of maps is not deterministic.
+ entry := opsCacheEntry{
+ ops: ops,
+ hash: hash,
+ }
+ return &entry, nil
+}
+
// A digestMap keeps track of the bidirectional mapping between encoded
// digestIDs and their corresponding types.Digest.
type digestMap struct {
	intMap map[digestID]types.Digest // id -> digest
	strMap map[types.Digest]digestID // digest -> id
}
+
+// newDigestMap creates an empty digestMap with the given capacity.
+func newDigestMap(cap int) *digestMap {
+ ret := &digestMap{
+ intMap: make(map[digestID]types.Digest, cap),
+ strMap: make(map[types.Digest]digestID, cap),
+ }
+ ret.intMap[missingDigestID] = types.MISSING_DIGEST
+ ret.strMap[types.MISSING_DIGEST] = missingDigestID
+ return ret
+}
+
+// Delta returns a []types.Digest of those passed in digests that are
+// not in this mapping currently.
+func (d *digestMap) Delta(digests map[types.Digest]bool) []types.Digest {
+ ret := make([]types.Digest, 0, len(digests))
+ for digest := range digests {
+ if _, ok := d.strMap[digest]; !ok {
+ ret = append(ret, digest)
+ }
+ }
+ return ret
+}
+
+// Add expands the map with the given entries. It fails if any key or any value
+// is already in the mapping.
+func (d *digestMap) Add(newEntries map[types.Digest]digestID) error {
+ for digest, id := range newEntries {
+ if digest == types.MISSING_DIGEST || id == 0 {
+ return skerr.Fmt("invalid input id or digest: (%q -> %d)", digest, id)
+ }
+
+ foundID, strExists := d.strMap[digest]
+ foundDigest, intExists := d.intMap[id]
+ if strExists && intExists {
+ if (foundID != id) || (foundDigest != digest) {
+ return skerr.Fmt("inconsistent data - got (%q -> %d) when (%q -> %d) was expected", digest, id, foundDigest, foundID)
+ }
+ // Already contained so this is a no-op.
+ return nil
+ }
+
+ if strExists || intExists {
+ return skerr.Fmt("internal inconsistency - expected forward mapping (%q -> %d) and reverse mapping (%d -> %q) to both be present", digest, foundID, id, foundDigest)
+ }
+
+ // New mapping. Add it.
+ d.intMap[id] = digest
+ d.strMap[digest] = id
+ }
+ return nil
+}
+
+// ID returns the DigestID for a given types.Digest.
+func (d *digestMap) ID(digest types.Digest) (digestID, error) {
+ ret, ok := d.strMap[digest]
+ if !ok {
+ return 0, skerr.Fmt("unable to find id for %q", digest)
+ }
+ return ret, nil
+}
+
+// DecodeIDs is like Digest but in bulk.
+func (d *digestMap) DecodeIDs(ids []digestID) ([]types.Digest, error) {
+ ret := make([]types.Digest, len(ids))
+ var ok bool
+ for idx, id := range ids {
+ ret[idx], ok = d.intMap[id]
+ if !ok {
+ return nil, skerr.Fmt("unable to find id %d in intMap", id)
+ }
+ }
+ return ret, nil
+}
+
+// Digest returns the types.Digest for a given DigestID.
+func (d *digestMap) Digest(id digestID) (types.Digest, error) {
+ ret, ok := d.intMap[id]
+ if !ok {
+ return "", skerr.Fmt("unable to find digest for %d", id)
+ }
+ return ret, nil
+}
+
// Len returns how many map entries are in this map, including the built-in
// entry for the missing digest.
func (d *digestMap) Len() int {
	return len(d.strMap)
}
+
// traceMap maps trace ids to traces; it is a named type so we can define
// some helper functions on it.
type traceMap map[tiling.TraceId]tiling.Trace
+
+// CommitIndicesWithData returns the indexes of the commits with at least one non-missing
+// digest in at least one trace.
+func (t traceMap) CommitIndicesWithData() []int {
+ if len(t) == 0 {
+ return nil
+ }
+ numCommits := 0
+ for _, trace := range t {
+ gt := trace.(*types.GoldenTrace)
+ numCommits = len(gt.Digests)
+ break
+ }
+ var haveData []int
+ for i := 0; i < numCommits; i++ {
+ for _, trace := range t {
+ gt := trace.(*types.GoldenTrace)
+ if !gt.IsMissing(i) {
+ haveData = append(haveData, i)
+ break
+ }
+ }
+ }
+ return haveData
+}
+
+// MakeFromCommitIndexes creates a new traceMap from the data in this one that
+// only has the digests belonging to the given commit indices. Conceptually,
+// this grabs a subset of the commit columns from the tile.
+func (t traceMap) MakeFromCommitIndexes(indices []int) traceMap {
+ if len(indices) == 0 {
+ return traceMap{}
+ }
+ r := make(traceMap, len(t))
+ for id, trace := range t {
+ gt := trace.(*types.GoldenTrace)
+
+ newDigests := make([]types.Digest, len(indices))
+ for i, idx := range indices {
+ newDigests[i] = gt.Digests[idx]
+ }
+
+ r[id] = &types.GoldenTrace{
+ Keys: gt.Keys,
+ Digests: newDigests,
+ }
+ }
+ return r
+}
+
// PrependTraces augments this traceMap with the data from the given one.
// Specifically, it prepends that data, assuming the "other" data came
// before the data in this map. Traces seen on only one side are padded with
// MISSING_DIGEST so that afterwards every trace in t spans the combined
// commit range. Note that traces from other may be mutated and/or aliased
// into t.
// TODO(kjlubick): Deduplicate this with tiling.Merge
func (t traceMap) PrependTraces(other traceMap) {
	// All traces in a traceMap span the same commits, so take the length
	// from an arbitrary trace (0 if t is empty).
	numCommits := 0
	for _, trace := range t {
		gt := trace.(*types.GoldenTrace)
		numCommits = len(gt.Digests)
		break
	}

	numOtherCommits := 0
	for id, trace := range other {
		numOtherCommits = trace.Len()
		original, ok := t[id]
		if ok {
			// Keys are constant and are what the id is derived from
			t[id] = trace.Merge(original)
		} else {
			// if we stopped seeing the trace in t, we need to pad the end with MISSING_DIGEST
			trace.Grow(numOtherCommits+numCommits, tiling.FILL_AFTER) // Assumes we can modify other
			t[id] = trace
		}
	}

	// if we saw a trace in t, but not in other, we need to pad the beginning with MISSING_DIGEST
	for id, trace := range t {
		if _, ok := other[id]; !ok {
			trace.Grow(numOtherCommits+numCommits, tiling.FILL_BEFORE)
		}
	}
}
diff --git a/golden/go/tracestore/bt_tracestore/types_test.go b/golden/go/tracestore/bt_tracestore/types_test.go
new file mode 100644
index 0000000..6aefc3d
--- /dev/null
+++ b/golden/go/tracestore/bt_tracestore/types_test.go
@@ -0,0 +1,267 @@
+package bt_tracestore
+
+import (
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+ "go.skia.org/infra/go/testutils/unittest"
+ "go.skia.org/infra/golden/go/types"
+)
+
+// TestDigestMapMissingDigest verifies a newly created digest map
+// starts off with the mapping for a missing digest and nothing else.
+func TestDigestMapMissingDigest(t *testing.T) {
+ unittest.SmallTest(t)
+
+ digestMap := newDigestMap(1000)
+ assert.Equal(t, 1, digestMap.Len())
+ id, err := digestMap.ID(types.MISSING_DIGEST)
+ assert.NoError(t, err)
+ assert.Equal(t, id, missingDigestID)
+
+ _, err = digestMap.ID(AlphaDigest)
+ assert.Error(t, err)
+ assert.Contains(t, err.Error(), "unable to find")
+}
+
+// TestDigestMapAddGet tests the three ways to get data out of the map.
+func TestDigestMapAddGet(t *testing.T) {
+ unittest.SmallTest(t)
+
+ digestMap := newDigestMap(1)
+ err := digestMap.Add(map[types.Digest]digestID{
+ AlphaDigest: AlphaID,
+ BetaDigest: BetaID,
+ GammaDigest: GammaID,
+ })
+ assert.NoError(t, err)
+ assert.Equal(t, 4, digestMap.Len())
+
+ digests, err := digestMap.DecodeIDs([]digestID{GammaID, AlphaID})
+ assert.NoError(t, err)
+ assert.Equal(t, digests, []types.Digest{GammaDigest, AlphaDigest})
+
+ d, err := digestMap.Digest(BetaID)
+ assert.NoError(t, err)
+ assert.Equal(t, BetaDigest, d)
+
+ id, err := digestMap.ID(BetaDigest)
+ assert.NoError(t, err)
+ assert.Equal(t, BetaID, id)
+}
+
+// TestDigestMapAddBadGet tests getting things out of the map that don't exist.
+func TestDigestMapAddBadGet(t *testing.T) {
+ unittest.SmallTest(t)
+
+ notExistID := digestID(99)
+ notExistDigest := types.Digest("fffe39544765f38baab53350aef79966")
+
+ digestMap := newDigestMap(1)
+ err := digestMap.Add(map[types.Digest]digestID{
+ AlphaDigest: AlphaID,
+ BetaDigest: BetaID,
+ GammaDigest: GammaID,
+ })
+ assert.NoError(t, err)
+ _, err = digestMap.DecodeIDs([]digestID{AlphaID, notExistID})
+ assert.Error(t, err)
+ assert.Contains(t, err.Error(), "unable to find id")
+
+ _, err = digestMap.Digest(notExistID)
+ assert.Error(t, err)
+ assert.Contains(t, err.Error(), "unable to find digest")
+
+ _, err = digestMap.ID(notExistDigest)
+ assert.Error(t, err)
+ assert.Contains(t, err.Error(), "unable to find id")
+}
+
// TestDigestMapBadAdd tests some corner cases with adding things to the map.
// TODO(review): also cover an Add call that mixes an already-present entry
// with a brand new one, to pin down the intended semantics of partial overlap.
func TestDigestMapBadAdd(t *testing.T) {
	unittest.SmallTest(t)

	digestMap := newDigestMap(1)
	err := digestMap.Add(map[types.Digest]digestID{AlphaDigest: AlphaID})
	assert.NoError(t, err)
	err = digestMap.Add(map[types.Digest]digestID{BetaDigest: BetaID})
	assert.NoError(t, err)

	// Adding something multiple times is no error
	err = digestMap.Add(map[types.Digest]digestID{BetaDigest: BetaID})
	assert.NoError(t, err)
	err = digestMap.Add(map[types.Digest]digestID{BetaDigest: BetaID})
	assert.NoError(t, err)

	// Can't add something with MISSING_DIGEST as a key ...
	err = digestMap.Add(map[types.Digest]digestID{types.MISSING_DIGEST: 5})
	assert.Error(t, err)
	assert.Contains(t, err.Error(), "invalid input id")
	// ... or with missingDigestID as a value.
	err = digestMap.Add(map[types.Digest]digestID{BetaDigest: missingDigestID})
	assert.Error(t, err)
	assert.Contains(t, err.Error(), "invalid input id")

	// Can't reuse a digest that is already in the map with a new id ...
	err = digestMap.Add(map[types.Digest]digestID{BetaDigest: GammaID})
	assert.Error(t, err)
	assert.Contains(t, err.Error(), "internal inconsistency")
	// ... or reuse an id with a new digest.
	err = digestMap.Add(map[types.Digest]digestID{GammaDigest: BetaID})
	assert.Error(t, err)
	assert.Contains(t, err.Error(), "internal inconsistency")

	// Can't mix up data that has already been seen before
	err = digestMap.Add(map[types.Digest]digestID{AlphaDigest: BetaID})
	assert.Error(t, err)
	assert.Contains(t, err.Error(), "inconsistent data")
}
+
// TestTraceMapCommitIndicesWithData verifies that CommitIndicesWithData
// reports exactly the commit columns with at least one non-missing digest,
// and returns nil for all-missing or empty trace maps.
func TestTraceMapCommitIndicesWithData(t *testing.T) {
	unittest.SmallTest(t)

	tm := traceMap{
		",key=first,": &types.GoldenTrace{
			Digests: []types.Digest{
				types.MISSING_DIGEST, types.MISSING_DIGEST, AlphaDigest,
				AlphaDigest, types.MISSING_DIGEST, BetaDigest,
			},
		},
		",key=second,": &types.GoldenTrace{
			Digests: []types.Digest{
				GammaDigest, types.MISSING_DIGEST, types.MISSING_DIGEST,
				GammaDigest, types.MISSING_DIGEST, GammaDigest,
			},
		},
	}
	assert.Equal(t, []int{0, 2, 3, 5}, tm.CommitIndicesWithData())

	empty := traceMap{
		",key=first,": &types.GoldenTrace{
			Digests: []types.Digest{
				types.MISSING_DIGEST, types.MISSING_DIGEST, types.MISSING_DIGEST,
			},
		},
		",key=second,": &types.GoldenTrace{
			Digests: []types.Digest{
				types.MISSING_DIGEST, types.MISSING_DIGEST, types.MISSING_DIGEST,
			},
		},
	}
	assert.Nil(t, empty.CommitIndicesWithData())

	assert.Nil(t, traceMap{}.CommitIndicesWithData())
}
+
// TestTraceMapMakeFromCommitIndexes verifies that MakeFromCommitIndexes
// selects exactly the requested commit columns (in the given order) and
// returns an empty traceMap for empty or nil index slices.
func TestTraceMapMakeFromCommitIndexes(t *testing.T) {
	unittest.SmallTest(t)

	tm := traceMap{
		",key=first,": &types.GoldenTrace{
			Digests: []types.Digest{
				types.MISSING_DIGEST, types.MISSING_DIGEST, AlphaDigest,
				AlphaDigest, types.MISSING_DIGEST, BetaDigest,
			},
		},
		",key=second,": &types.GoldenTrace{
			Digests: []types.Digest{
				GammaDigest, types.MISSING_DIGEST, types.MISSING_DIGEST,
				GammaDigest, types.MISSING_DIGEST, GammaDigest,
			},
		},
	}

	assert.Equal(t, traceMap{
		",key=first,": &types.GoldenTrace{
			Digests: []types.Digest{
				types.MISSING_DIGEST, AlphaDigest,
				AlphaDigest, BetaDigest,
			},
		},
		",key=second,": &types.GoldenTrace{
			Digests: []types.Digest{
				GammaDigest, types.MISSING_DIGEST,
				GammaDigest, GammaDigest,
			},
		},
	}, tm.MakeFromCommitIndexes([]int{0, 2, 3, 5}))

	assert.Equal(t, traceMap{
		",key=first,": &types.GoldenTrace{
			Digests: []types.Digest{
				types.MISSING_DIGEST, types.MISSING_DIGEST, types.MISSING_DIGEST,
			},
		},
		",key=second,": &types.GoldenTrace{
			Digests: []types.Digest{
				GammaDigest, types.MISSING_DIGEST, types.MISSING_DIGEST,
			},
		},
	}, tm.MakeFromCommitIndexes([]int{0, 1, 4}))

	assert.Equal(t, traceMap{}, tm.MakeFromCommitIndexes([]int{}))
	assert.Equal(t, traceMap{}, tm.MakeFromCommitIndexes(nil))
}
+
// TestTraceMapPrependTraces verifies that PrependTraces prepends the other
// tile's digests to this map's, padding with MISSING_DIGEST where a trace is
// absent on either side.
func TestTraceMapPrependTraces(t *testing.T) {
	unittest.SmallTest(t)

	tm1 := traceMap{
		",key=first,": &types.GoldenTrace{
			Digests: []types.Digest{
				types.MISSING_DIGEST, types.MISSING_DIGEST, AlphaDigest,
			},
		},
		",key=second,": &types.GoldenTrace{
			Digests: []types.Digest{
				GammaDigest, types.MISSING_DIGEST, types.MISSING_DIGEST,
			},
		},
	}

	tm2 := traceMap{
		",key=first,": &types.GoldenTrace{
			Digests: []types.Digest{
				types.MISSING_DIGEST, GammaDigest,
			},
		},
		",key=third,": &types.GoldenTrace{
			Digests: []types.Digest{
				GammaDigest, BetaDigest,
			},
		},
	}

	tm1.PrependTraces(tm2)

	assert.Equal(t, traceMap{
		",key=first,": &types.GoldenTrace{
			Digests: []types.Digest{
				types.MISSING_DIGEST, GammaDigest,
				types.MISSING_DIGEST, types.MISSING_DIGEST, AlphaDigest,
			},
		},
		",key=second,": &types.GoldenTrace{
			Digests: []types.Digest{
				types.MISSING_DIGEST, types.MISSING_DIGEST,
				GammaDigest, types.MISSING_DIGEST, types.MISSING_DIGEST,
			},
		},
		",key=third,": &types.GoldenTrace{
			Digests: []types.Digest{
				GammaDigest, BetaDigest,
				types.MISSING_DIGEST, types.MISSING_DIGEST, types.MISSING_DIGEST,
			},
		},
	}, tm1)
}
+
// Shared test data: syntactically valid md5 digests with recognizable
// prefixes, and arbitrary non-zero, distinct ids for them.
const (
	AlphaDigest = types.Digest("aaa6fc936d06e6569788366f1e3fda4e")
	BetaDigest  = types.Digest("bbb15c047d150d961573062854f35a55")
	GammaDigest = types.Digest("cccd42f3ee0b02687f63963adb36a580")

	AlphaID = digestID(1)
	BetaID  = digestID(2)
	GammaID = digestID(3)
)
diff --git a/golden/go/tracestore/bt_tracestore/util.go b/golden/go/tracestore/bt_tracestore/util.go
new file mode 100644
index 0000000..9bb8c87
--- /dev/null
+++ b/golden/go/tracestore/bt_tracestore/util.go
@@ -0,0 +1,45 @@
+package bt_tracestore
+
+import (
+ "fmt"
+ "math"
+ "strings"
+)
+
+// tileKeyFromIndex converts the tile index to the tileKey.
+// See BIGTABLE.md for more on this conversion.
+func tileKeyFromIndex(tileIndex int32) tileKey {
+ if tileIndex < 0 {
+ return badTileKey
+ }
+ return tileKey(math.MaxInt32 - tileIndex)
+}
+
// OpsRowName returns the name of the BigTable row which stores the OrderedParamSet
// for this tile. OPS rows are not sharded, so there is exactly one per tile.
func (t tileKey) OpsRowName() string {
	return unshardedRowName(typeOPS, t)
}
+
+// unshardedRowName calculates the row for the given data which all has the same format:
+// :[namespace]:[type]:[tile]:
+func unshardedRowName(rowType string, tileKey tileKey) string {
+ return fmt.Sprintf(":%s:%s:%010d:", traceStoreNameSpace, rowType, tileKey)
+}
+
+// shardedRowName calculates the row for the given data which all has the same format:
+// [shard]:[namespace]:[type]:[tile]:[subkey]
+// For some data types, where there is only one row, or when doing a prefix-match,
+// subkey may be "".
+func shardedRowName(shard int32, rowType string, tileKey tileKey, subkey string) string {
+ return fmt.Sprintf("%02d:%s:%s:%010d:%s", shard, traceStoreNameSpace, rowType, tileKey, subkey)
+}
+
// extractSubkey returns the subkey (the portion after the final ":") from the
// given row name. This could be "". If the row name contains no ":" at all,
// the whole input is returned (matching the previous strings.Split behavior,
// whose len(parts) == 0 branch was unreachable - Split always returns at
// least one element).
func extractSubkey(rowName string) string {
	i := strings.LastIndex(rowName, ":")
	if i == -1 {
		return rowName
	}
	return rowName[i+1:]
}
diff --git a/golden/go/tracestore/bt_tracestore/util_test.go b/golden/go/tracestore/bt_tracestore/util_test.go
new file mode 100644
index 0000000..586a593
--- /dev/null
+++ b/golden/go/tracestore/bt_tracestore/util_test.go
@@ -0,0 +1,57 @@
+package bt_tracestore
+
+import (
+ "math"
+ "testing"
+
+ assert "github.com/stretchr/testify/require"
+ "go.skia.org/infra/go/testutils/unittest"
+)
+
+// The tests in this package are mainly to make sure changes that
+// are not backward-compatible are detected.
+
+func TestTileKeyFromIndex(t *testing.T) {
+ unittest.SmallTest(t)
+
+ // spot-check some arbitrary values
+ assert.Equal(t, tileKey(2147483647), tileKeyFromIndex(0))
+ assert.Equal(t, tileKey(2147483451), tileKeyFromIndex(196))
+ assert.Equal(t, tileKey(908536335), tileKeyFromIndex(1238947312))
+}
+
// TestOpsRowName spot-checks the OPS row names produced for some arbitrary
// tile indices, to catch non-backward-compatible changes to the row format.
func TestOpsRowName(t *testing.T) {
	unittest.SmallTest(t)

	// spot-check some arbitrary values
	assert.Equal(t, ":ts:o:2147483647:", tileKeyFromIndex(0).OpsRowName())
	assert.Equal(t, ":ts:o:2147483451:", tileKeyFromIndex(196).OpsRowName())
	assert.Equal(t, ":ts:o:0908536335:", tileKeyFromIndex(1238947312).OpsRowName())
}
+
// TestShardedRowName spot-checks sharded row names for traces and digests,
// to catch non-backward-compatible changes to the row format.
func TestShardedRowName(t *testing.T) {
	unittest.SmallTest(t)

	shard := int32(3) // arbitrarily picked
	// NOTE(review): math.MaxInt32-1 corresponds to tile index 1, not 0
	// (tileKeyFromIndex(0) == math.MaxInt32) — consider renaming this variable.
	tileZeroKey := tileKey(math.MaxInt32 - 1)
	veryNewTileKey := tileKey(57)

	// Example RowName for a trace
	encodedTrace := ",0=1,1=3,3=0,"
	assert.Equal(t, "03:ts:t:2147483646:,0=1,1=3,3=0,", shardedRowName(shard, typeTrace, tileZeroKey, encodedTrace))
	assert.Equal(t, "03:ts:t:0000000057:,0=1,1=3,3=0,", shardedRowName(shard, typeTrace, veryNewTileKey, encodedTrace))

	// Example RowName for a digest
	// digests are stored in a row based on the first three characters and a
	// column with the remaining characters.
	digestPrefix := string(AlphaDigest[:3])
	assert.Equal(t, "03:ts:d:2147483646:aaa", shardedRowName(shard, typeDigestMap, tileZeroKey, digestPrefix))
}
+
+func TestExtractKeyFromRowName(t *testing.T) {
+ unittest.SmallTest(t)
+
+ assert.Equal(t, "ae3", extractSubkey("07:ts:d:2147483646:ae3"))
+ assert.Equal(t, "", extractSubkey(":ts:o:2147483646:"))
+ assert.Equal(t, ",0=1,1=3,3=0,", extractSubkey("03:ts:t:2147483646:,0=1,1=3,3=0,"))
+}
diff --git a/golden/go/tracestore/types.go b/golden/go/tracestore/types.go
new file mode 100644
index 0000000..7b70785
--- /dev/null
+++ b/golden/go/tracestore/types.go
@@ -0,0 +1,75 @@
+package tracestore
+
+import (
+ "context"
+ "regexp"
+ "time"
+
+ "go.skia.org/infra/go/paramtools"
+ "go.skia.org/infra/go/query"
+ "go.skia.org/infra/go/sklog"
+ "go.skia.org/infra/go/tiling"
+ "go.skia.org/infra/golden/go/types"
+)
+
// Entry is one digest and its related params to be added to the TraceStore.
// TODO(kjlubick): Make sure this does not include the keys included in
// options.
type Entry struct {
	// Params describe the configuration that produced the digest/image.
	Params map[string]string

	// Digest references the image that was generated by the test.
	Digest types.Digest
}
+
// TraceStore is the interface to store Gold's trace data.
type TraceStore interface {
	// Put writes the given entries to the TraceStore at the given commit hash. The timestamp is
	// assumed to be the time when the entries were generated.
	// It is undefined behavior to have multiple entries with the exact same Params.
	Put(ctx context.Context, commitHash string, entries []*Entry, ts time.Time) error

	// GetTile reads the last n commits and returns them as a tile.
	// The second return value is all commits that are in the tile.
	GetTile(ctx context.Context, nCommits int) (*tiling.Tile, []*tiling.Commit, error)

	// GetDenseTile constructs a tile containing only commits that have data for at least one trace.
	// The returned tile will always have length exactly nCommits unless there are fewer than
	// nCommits commits with data. The second return value contains all commits starting with the
	// first commit of the tile and ending with the most recent commit, in order; i.e. it includes
	// all commits in the tile as well as the omitted commits.
	GetDenseTile(ctx context.Context, nCommits int) (*tiling.Tile, []*tiling.Commit, error)
}
+
+// TraceIDFromParams deterministically returns a TraceId that uniquely encodes
+// the given params. It follows the same convention as perf's trace ids, that
+// is something like ",key1=value1,key2=value2,...," where the keys
+// are in alphabetical order.
+func TraceIDFromParams(params paramtools.Params) tiling.TraceId {
+ // Clean up any params with , or =
+ params = forceValid(params)
+ s, err := query.MakeKeyFast(params)
+ if err != nil {
+ sklog.Warningf("Invalid params passed in for trace id %#v: %s", params, err)
+ }
+ return tiling.TraceId(s)
+}
+
var (
	// invalidChar matches the characters that are structurally significant in
	// a trace id and therefore may not appear in keys or values.
	invalidChar = regexp.MustCompile("([,=])")
)

// clean replaces each comma or equals sign in s with an underscore.
func clean(s string) string {
	return invalidChar.ReplaceAllLiteralString(s, "_")
}

// forceValid ensures that the resulting map will make a valid structured key.
func forceValid(m map[string]string) map[string]string {
	cleaned := make(map[string]string, len(m))
	for k, v := range m {
		cleaned[clean(k)] = clean(v)
	}
	return cleaned
}
diff --git a/golden/go/tracestore/types_test.go b/golden/go/tracestore/types_test.go
new file mode 100644
index 0000000..d535146
--- /dev/null
+++ b/golden/go/tracestore/types_test.go
@@ -0,0 +1,42 @@
+package tracestore
+
+import (
+ "testing"
+
+ assert "github.com/stretchr/testify/require"
+ "go.skia.org/infra/go/paramtools"
+ "go.skia.org/infra/go/testutils/unittest"
+ "go.skia.org/infra/go/tiling"
+ "go.skia.org/infra/golden/go/types"
+)
+
// TestTraceIDFromParams verifies that params are serialized into a trace id
// with keys in alphabetical order.
func TestTraceIDFromParams(t *testing.T) {
	unittest.SmallTest(t)

	input := paramtools.Params{
		"cpu":                   "x86",
		"gpu":                   "nVidia",
		types.PRIMARY_KEY_FIELD: "test_alpha",
		types.CORPUS_FIELD:      "dm",
	}

	expected := tiling.TraceId(",cpu=x86,gpu=nVidia,name=test_alpha,source_type=dm,")

	assert.Equal(t, expected, TraceIDFromParams(input))
}
+
// TestTraceIDFromParamsMalicious adds some values with invalid chars (commas
// and equals signs) and checks that each is replaced with an underscore.
func TestTraceIDFromParamsMalicious(t *testing.T) {
	unittest.SmallTest(t)

	input := paramtools.Params{
		"c=p,u":                 `"x86"`,
		"gpu":                   "nVi,,=dia",
		types.PRIMARY_KEY_FIELD: "test=alpha",
		types.CORPUS_FIELD:      "dm!",
	}

	expected := tiling.TraceId(`,c_p_u="x86",gpu=nVi___dia,name=test_alpha,source_type=dm!,`)

	assert.Equal(t, expected, TraceIDFromParams(input))
}
diff --git a/golden/go/types/types.go b/golden/go/types/types.go
index 62b8dc3..10ca302 100644
--- a/golden/go/types/types.go
+++ b/golden/go/types/types.go
@@ -387,6 +387,11 @@
return -1
}
// String prints a human friendly version of this trace, for debugging.
func (g *GoldenTrace) String() string {
	return fmt.Sprintf("Keys: %#v, Digests: %q", g.Keys, g.Digests)
}
+
// NewGoldenTrace allocates a new Trace set up for the given number of samples.
//
// The Trace Digests are pre-filled in with the missing data sentinel since not
diff --git a/perf/BIGTABLE.md b/perf/BIGTABLE.md
index 6990c05..f219726 100644
--- a/perf/BIGTABLE.md
+++ b/perf/BIGTABLE.md
@@ -94,8 +94,8 @@
Values used in row names:
- TileKey = 2^22 - (tile number)
- - With 256 values per tile this let's us store 1 billion values per trace.
+ TileKey = (2^32-1) - (tile number)
+ - With 256 values per tile this lets us store 1 trillion values per trace.
- Formatted as %07d
- Note that this reverses the order of the tiles, i.e. new tiles have
smaller numbers, so that we can do a simple query to find the newest tile.