blob: e165d64c5b39c72a34b929d4c161785eda2a0573 [file] [log] [blame]
package btts
import (
"context"
"encoding/json"
"fmt"
"math"
"net/url"
"sort"
"strings"
"sync"
"testing"
"time"
"cloud.google.com/go/bigtable"
"github.com/stretchr/testify/assert"
"go.skia.org/infra/go/paramtools"
"go.skia.org/infra/go/query"
"go.skia.org/infra/go/testutils/unittest"
"go.skia.org/infra/go/vec32"
"go.skia.org/infra/perf/go/config"
"go.skia.org/infra/perf/go/tracestore/btts/btts_testutils"
"go.skia.org/infra/perf/go/types"
)
var (
cfg = &config.InstanceConfig{
DataStoreConfig: config.DataStoreConfig{
TileSize: 256,
Project: "testtest",
Instance: "testtest",
Table: "testtest",
Shards: 8,
},
}
)
func TestBasic(t *testing.T) {
unittest.LargeTest(t)
unittest.RequiresBigTableEmulator(t)
ctx := context.Background()
btts_testutils.CreateTestTable(t)
defer btts_testutils.CleanUpTestTable(t)
b, err := NewBigTableTraceStoreFromConfig(ctx, cfg, &btts_testutils.MockTS{}, true)
assert.NoError(t, err)
// Create an OPS in a fresh tile.
tileKey := TileKeyFromTileNumber(1)
op, err := b.updateOrderedParamSet(tileKey, paramtools.ParamSet{
"cpu": []string{"x86", "arm"},
"config": []string{"8888", "565"},
})
assert.NoError(t, err)
assert.Len(t, op.KeyOrder, 2)
// Then update that OPS.
op, err = b.updateOrderedParamSet(tileKey, paramtools.ParamSet{
"os": []string{"linux", "win"},
})
assert.NoError(t, err)
assert.Len(t, op.KeyOrder, 3)
// Do we calculate LatestTile correctly?
latest, err := b.GetLatestTile()
assert.NoError(t, err)
assert.Equal(t, types.TileNumber(1), latest)
// Add an OPS for a new tile.
tileKey2 := TileKeyFromTileNumber(4)
op, err = b.updateOrderedParamSet(tileKey2, paramtools.ParamSet{
"os": []string{"win", "linux"},
})
// Do we calculate LatestTile correctly?
latest, err = b.GetLatestTile()
assert.NoError(t, err)
assert.Equal(t, types.TileNumber(4), latest)
// Create another instance, so it has no cache.
b2, err := NewBigTableTraceStoreFromConfig(ctx, cfg, &btts_testutils.MockTS{}, false)
assert.NoError(t, err)
// OPS for tile 4 should be a no-op since it's already in BT.
op2, err := b2.updateOrderedParamSet(tileKey2, paramtools.ParamSet{
// Note we reverse "linux", "win" order, but still get the same
// result as op.
"os": []string{"linux", "win"},
})
assert.Equal(t, op, op2)
}
func TestOPSThreaded(t *testing.T) {
unittest.LargeTest(t)
unittest.RequiresBigTableEmulator(t)
ctx := context.Background()
btts_testutils.CreateTestTable(t)
defer btts_testutils.CleanUpTestTable(t)
b, err := NewBigTableTraceStoreFromConfig(ctx, cfg, &btts_testutils.MockTS{}, true)
assert.NoError(t, err)
tileKey := TileKeyFromTileNumber(1)
expected := paramtools.ParamSet{}
// Add multiple params to the OPS in goroutines.
wg := sync.WaitGroup{}
for _, cpu := range []string{"x86", "arm"} {
for _, config := range []string{"8888", "565"} {
for _, os := range []string{"linux", "win"} {
paramset := paramtools.ParamSet{
"cpu": []string{cpu},
"config": []string{config},
"os": []string{os},
}
expected.AddParamSet(paramset)
wg.Add(1)
go func() {
defer wg.Done()
_, err := b.updateOrderedParamSet(tileKey, paramset)
assert.NoError(t, err)
}()
}
}
}
wg.Wait()
// read current OPS
entry, _, err := b.getOPS(tileKey)
assert.NoError(t, err)
expected.Normalize()
entry.ops.ParamSet.Normalize()
assert.Equal(t, expected, entry.ops.ParamSet)
}
// assertIndices asserts that the indices in the table match expectedKeys and
// expectedColumns. The 'params' is a slice of Params, one for each trace id we
// expect to find. Returns the number of rows actually retrieved.
func assertIndices(t *testing.T, ops *paramtools.OrderedParamSet, b *BigTableTraceStore, params []paramtools.Params, msg string) int {
var count int
err := b.getTable().ReadRows(context.Background(), bigtable.PrefixRange(INDEX_PREFIX), func(row bigtable.Row) bool {
count++
parts := strings.Split(row.Key(), ":")
// The key (1) and value (2) should appear as part of the encoded
// structured key (trace id) (3)
substr := fmt.Sprintf(",%s=%s,", parts[1], parts[2])
assert.Contains(t, parts[3], substr)
p, err := ops.DecodeParamsFromString(parts[3])
assert.NoError(t, err)
assert.Contains(t, params, p)
return true
}, bigtable.RowFilter(
bigtable.ChainFilters(
bigtable.LatestNFilter(1),
bigtable.FamilyFilter(INDEX_FAMILY),
),
),
)
assert.NoError(t, err)
return count
}
// getIndexRowKeys returns the row keys for all the index entries.
func getIndexRowKeys(t *testing.T, b *BigTableTraceStore) []string {
ret := []string{}
err := b.getTable().ReadRows(context.Background(), bigtable.PrefixRange(INDEX_PREFIX), func(row bigtable.Row) bool {
ret = append(ret, row.Key())
return true
})
assert.NoError(t, err)
return ret
}
func TestTraces(t *testing.T) {
unittest.LargeTest(t)
unittest.RequiresBigTableEmulator(t)
ctx := context.Background()
btts_testutils.CreateTestTable(t)
defer btts_testutils.CleanUpTestTable(t)
now := time.Now()
b, err := NewBigTableTraceStoreFromConfig(ctx, cfg, &btts_testutils.MockTS{}, true)
assert.NoError(t, err)
tileNumber := types.TileNumber(1)
ops, err := b.GetOrderedParamSet(ctx, tileNumber, time.Now())
assert.NoError(t, err)
assertIndices(t, ops, b, nil, "Start empty")
paramset := paramtools.ParamSet{
"config": []string{"8888", "565"},
"cpu": []string{"x86", "arm"},
}
assert.NoError(t, err)
expectedParams := []paramtools.Params{
{"cpu": "x86", "config": "8888"},
{"cpu": "x86", "config": "565"},
{"cpu": "arm", "config": "8888"},
{"cpu": "arm", "config": "565"},
}
values := []float32{
1.0,
1.1,
1.2,
1.3,
}
err = b.WriteTraces(257, expectedParams, values, paramset, "gs://some/test/location", now)
assert.NoError(t, err)
ops, err = b.GetOrderedParamSet(ctx, tileNumber, time.Now())
assert.NoError(t, err)
count := assertIndices(t, ops, b, expectedParams, "First write")
assert.Equal(t, 8, count)
indexCount, err := b.CountIndices(context.Background(), tileNumber)
assert.NoError(t, err)
assert.Equal(t, int64(8), indexCount)
q, err := query.New(url.Values{"config": []string{"8888"}})
assert.NoError(t, err)
results, err := b.queryTraces(ctx, tileNumber, q)
assert.NoError(t, err)
vec1 := vec32.New(256)
vec1[1] = 1.0
vec2 := vec32.New(256)
vec2[1] = 1.2
expected := types.TraceSet{
",config=8888,cpu=x86,": vec1,
",config=8888,cpu=arm,": vec2,
}
assert.Equal(t, expected, results)
results, err = b.QueryTracesByIndex(context.Background(), tileNumber, q)
assert.NoError(t, err)
assert.Equal(t, expected, results)
outCh, err := b.QueryTracesIDOnlyByIndex(ctx, tileNumber, q)
keys := []string{}
for key := range expected {
keys = append(keys, key)
}
assert.NoError(t, err)
for p := range outCh {
key, err := query.MakeKeyFast(p)
assert.NoError(t, err)
assert.Contains(t, keys, key)
}
out, errCh := b.tileKeys(ctx, tileNumber)
assert.Empty(t, errCh)
keys = []string{}
for s := range out {
ps, err := ops.DecodeParamsFromString(s)
assert.NoError(t, err)
key, err := query.MakeKeyFast(ps)
assert.NoError(t, err)
keys = append(keys, key)
}
assert.NoError(t, err)
sort.Strings(keys)
assert.Equal(t, []string{",config=565,cpu=arm,", ",config=565,cpu=x86,", ",config=8888,cpu=arm,", ",config=8888,cpu=x86,"}, keys)
// Now overwrite a value.
overWriteParams := []paramtools.Params{
{"cpu": "x86", "config": "8888"},
}
values = []float32{
2.0,
}
err = b.WriteTraces(257, overWriteParams, values, paramset, "gs://some/other/test/location", now)
assert.NoError(t, err)
ops, err = b.GetOrderedParamSet(ctx, tileNumber, time.Now())
assert.NoError(t, err)
count = assertIndices(t, ops, b, expectedParams, "Overwrite")
assert.Equal(t, 8, count)
// Query again to get the updated value.
results, err = b.queryTraces(context.Background(), tileNumber, q)
assert.NoError(t, err)
vec1 = vec32.New(256)
vec1[1] = 2.0
vec2 = vec32.New(256)
vec2[1] = 1.2
expected = types.TraceSet{
",config=8888,cpu=x86,": vec1,
",config=8888,cpu=arm,": vec2,
}
assert.Equal(t, expected, results)
results, err = b.QueryTracesByIndex(context.Background(), tileNumber, q)
assert.NoError(t, err)
assert.Equal(t, expected, results)
// Write in the next column.
writeParams := []paramtools.Params{
{"cpu": "x86", "config": "8888"},
}
values = []float32{
3.0,
}
err = b.WriteTraces(258, writeParams, values, paramset, "gs://some/other/test/location", now)
assert.NoError(t, err)
// Query again to get the updated value.
results, err = b.queryTraces(context.Background(), tileNumber, q)
assert.NoError(t, err)
vec1 = vec32.New(256)
vec1[1] = 2.0
vec1[2] = 3.0
vec2 = vec32.New(256)
vec2[1] = 1.2
expected = types.TraceSet{
",config=8888,cpu=x86,": vec1,
",config=8888,cpu=arm,": vec2,
}
assert.Equal(t, expected, results)
count = assertIndices(t, ops, b, expectedParams, "Write new value.")
assert.Equal(t, 8, count)
results, err = b.QueryTracesByIndex(context.Background(), tileNumber, q)
assert.NoError(t, err)
assert.Equal(t, expected, results)
// Write to a new trace.
writeParams = []paramtools.Params{
{"cpu": "risc-v", "config": "8888"},
}
values = []float32{
2.0,
}
paramset.AddParams(writeParams[0])
err = b.WriteTraces(258, writeParams, values, paramset, "gs://some/other/test/location", now)
assert.NoError(t, err)
// Add new trace to expectations.
expectedParams = append(expectedParams, writeParams[0])
ops, err = b.GetOrderedParamSet(ctx, tileNumber, time.Now())
assert.NoError(t, err)
count = assertIndices(t, ops, b, expectedParams, "Add new trace")
assert.Equal(t, 10, count)
// Remove indices and repopulate with WriteIndices.
muts := []*bigtable.Mutation{}
indexRowKeys := getIndexRowKeys(t, b)
for range indexRowKeys {
mut := bigtable.NewMutation()
mut.DeleteRow()
muts = append(muts, mut)
}
errs, err := b.getTable().ApplyBulk(ctx, indexRowKeys, muts)
assert.NoError(t, err)
for _, e := range errs {
assert.NoError(t, e)
}
// Confirm they have all been deleted.
count = assertIndices(t, ops, b, expectedParams, "Add new trace")
assert.Equal(t, 0, count)
// Write fresh indices.
err = b.WriteIndices(ctx, tileNumber)
assert.NoError(t, err)
// Confirm they are correct.
count = assertIndices(t, ops, b, expectedParams, "Add new trace")
assert.Equal(t, 10, count)
// Confirm we can get the source file location back.
traceId, err := query.MakeKey(paramtools.Params{"cpu": "x86", "config": "8888"})
assert.NoError(t, err)
s, err := b.GetSource(context.Background(), 258, traceId)
assert.NoError(t, err)
assert.Equal(t, "gs://some/other/test/location", s)
// Confirm we get an error trying to retrieve a source file that doesn't exist.
s, err = b.GetSource(context.Background(), 259, traceId)
assert.Error(t, err)
assert.Equal(t, "", s)
}
func TestTileKey(t *testing.T) {
unittest.SmallTest(t)
numShards := int32(3)
tileKey := TileKeyFromTileNumber(0)
assert.Equal(t, int32(math.MaxInt32), int32(tileKey))
assert.Equal(t, types.TileNumber(0), tileKey.Offset())
assert.Equal(t, "@2147483647", tileKey.OpsRowName())
assert.Equal(t, "2:2147483647:", tileKey.TraceRowPrefix(2))
assert.Equal(t, "1:2147483647:,0=1,", tileKey.TraceRowName(",0=1,", numShards))
rowName, shard := tileKey.TraceRowNameAndShard(",0=1,", numShards)
assert.Equal(t, "1:2147483647:,0=1,", rowName)
assert.Equal(t, uint32(1), shard)
tileKey = TileKeyFromTileNumber(1)
assert.Equal(t, int32(math.MaxInt32-1), int32(tileKey))
assert.Equal(t, "@2147483646", tileKey.OpsRowName())
assert.Equal(t, "3:2147483646:", tileKey.TraceRowPrefix(3))
rowName, shard = tileKey.TraceRowNameAndShard(",0=1,", numShards)
assert.Equal(t, "1:2147483646:,0=1,", rowName)
assert.Equal(t, uint32(1), shard)
rowName, shard = tileKey.TraceRowNameAndShard(",0=2,", numShards)
assert.Equal(t, "0:2147483646:,0=2,", rowName)
assert.Equal(t, uint32(0), shard)
tileKey = TileKeyFromTileNumber(-1)
assert.Equal(t, badBttsTileKey, tileKey)
var err error
tileKey, err = TileKeyFromOpsRowName("2147483646")
assert.Error(t, err)
assert.Equal(t, badBttsTileKey, tileKey)
tileKey, err = TileKeyFromOpsRowName("@2147483637")
assert.NoError(t, err)
assert.Equal(t, "@2147483637", tileKey.OpsRowName())
}
const rowJson = `{
"D": [
{
"Row": "@2147483643",
"Column": "D:H",
"Timestamp": 1536145696388000,
"Value": "NWY2MDQ5ZTk3ODdiMDcxMGFhY2U2MTYzNDU3NzRiNTk="
},
{
"Row": "@2147483643",
"Column": "D:OPS",
"Timestamp": 1536145696388000,
"Value": "Of+BAwEBD09yZGVyZWRQYXJhbVNldAH/ggABAgEIS2V5T3JkZXIB/4QAAQhQYXJhbVNldAH/hgAAABb/gwIBAQhbXXN0cmluZwH/hAABDAAAGf+FBAEBCFBhcmFtU2V0Af+GAAEMAf+EAAAY/4IBAQJvcwEBAm9zAgN3aW4FbGludXgA"
}
]
}`
func TestOpsCacheEntry(t *testing.T) {
unittest.SmallTest(t)
// Entry for an empty OPS.
o, err := NewOpsCacheEntry()
assert.NoError(t, err)
assert.Equal(t, 0, len(o.ops.KeyOrder))
assert.Equal(t, "c011636276b346664a4d3a473ff07fc5", o.hash)
// Entry for an OPS with just one key-value pair:
ops := o.ops.Copy()
ops.Update(paramtools.ParamSet{"config": []string{"8888"}})
o2, err := opsCacheEntryFromOPS(ops)
assert.NoError(t, err)
assert.Equal(t, []string{"config"}, o2.ops.KeyOrder)
assert.Equal(t, "7a59e4600a8f20900c933037d8e0011a", o2.hash)
// From a BigTable row.
goodRow := bigtable.Row{}
err = json.Unmarshal([]byte(rowJson), &goodRow)
assert.NoError(t, err)
o3, err := NewOpsCacheEntryFromRow(goodRow)
assert.NoError(t, err)
assert.Equal(t, []string{"os"}, o3.ops.KeyOrder)
assert.Equal(t, "5f6049e9787b0710aace616345774b59", o3.hash)
// Empty BT row.
row := bigtable.Row{}
_, err = NewOpsCacheEntryFromRow(row)
assert.Error(t, err)
// Nothing missing.
row = bigtable.Row{
"D": []bigtable.ReadItem{
goodRow["D"][0],
goodRow["D"][1],
},
}
_, err = NewOpsCacheEntryFromRow(row)
assert.NoError(t, err)
// Missing H.
row = bigtable.Row{
"D": []bigtable.ReadItem{
goodRow["D"][0],
},
}
_, err = NewOpsCacheEntryFromRow(row)
assert.Error(t, err)
// Missing OPS.
row = bigtable.Row{
"D": []bigtable.ReadItem{
goodRow["D"][1],
},
}
_, err = NewOpsCacheEntryFromRow(row)
assert.Error(t, err)
}
func TestBigTableTraceStore_IndexOfTileStart(t *testing.T) {
unittest.SmallTest(t)
tests := []struct {
name string
tileSize int32
index types.CommitNumber
want types.CommitNumber
}{
{
name: "basic",
tileSize: 100,
index: 2,
want: 0,
},
{
name: "offset",
tileSize: 100,
index: 202,
want: 200,
},
{
name: "offset exact",
tileSize: 100,
index: 200,
want: 200,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
b := &BigTableTraceStore{
tileSize: tt.tileSize,
}
if got := b.CommitNumberOfTileStart(tt.index); got != tt.want {
t.Errorf("BigTableTraceStore.IndexOfTileStart() = %v, want %v", got, tt.want)
}
})
}
}