package bt_tracestore
import (
"crypto/md5"
"fmt"
"runtime"
"sync"
"time"
"cloud.google.com/go/bigtable"
"go.skia.org/infra/go/paramtools"
"go.skia.org/infra/go/skerr"
"go.skia.org/infra/go/tiling"
"go.skia.org/infra/golden/go/types"
)
// Constants adapted from btts.go
// See BIGTABLE.md for an overview of how the data is stored in BT.
const (
// Namespace for this package's data. Because there is one table per instance,
// this makes sure we don't have collisions between traces and anything else
// stored in BT (e.g. git info).
traceStoreNameSpace = "ts"
// Column Families.
// https://cloud.google.com/bigtable/docs/schema-design#column_families_and_column_qualifiers
opsFamily = "O" // ops short for OrderedParamSet
optionsFamily = "P" // oPtions map for a trace
traceFamily = "T" // Holds "0"..."tilesize-1" columns with a DigestID at each cell
// Columns in the OrderedParamSet column family.
opsHashColumn = "H"
opsOpsColumn = "OPS"
hashFullColName = opsFamily + ":" + opsHashColumn
opsFullColName = opsFamily + ":" + opsOpsColumn
// The only column in the optionsFamily - used to store the param map encoded
// like a trace id: `,key1=value1,`
optionsBytesColumn = "B"
// The columns in the trace family are "0", "1", "2"..."N" where N is
// the BT tile size (default below). These values correspond to the commitOffset,
// where 0 is the first (most recent) commit in the tile and N is the last (oldest)
// commit in the tile.
// They will all have the following prefix.
traceFamilyPrefix = traceFamily + ":"
// Define the row types.
typeOPS = "o"
typeOptions = "p"
typeTrace = "t"
// We pad the columns so they are properly sorted lexicographically.
columnPad = "%03d"
// This is the size of the tile in BigTable. That is, how many commits we store in one tile.
// We can have up to 2^32 tiles in BigTable, so this would let us store 1 trillion
// commits worth of data. This tile size does not need to be related to the tile size that
// Gold operates on (although when tuning, it should be greater than, or an even divisor
// of the Gold tile size). The first commit in the repo belongs to tile 2^32-1 and tile numbers
// decrease for newer commits. The columnPad const also depends on the number of digits of
// DefaultTileSize.
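// For example (illustrative): with columnPad = "%03d" and DefaultTileSize = 256,
// fmt.Sprintf(columnPad, 17) produces the column name "017", so the columns
// "000" through "255" sort lexicographically in commit order.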
DefaultTileSize = 256
// Default number of shards used. A shard splits the traces up on a tile.
// If a trace exists on shard N in tile A, it will be on shard N for all tiles.
// Having traces on shards lets BT split up the work more evenly.
DefaultShards = 32
readTimeout = 4 * time.Minute
writeTimeout = 10 * time.Minute
maxTilesForDenseTile = 50
// badTileKey is returned in error conditions.
badTileKey = tileKey(-1)
)
var (
// missingDigestBytes is the sentinel for types.MISSING_DIGEST
missingDigestBytes = []byte("")
)
// tileKey is the identifier for each tile held in BigTable.
//
// Note that tile keys are in the opposite order of tile offset, that is, the first commit
// in a repo goes in the first tile, which has key 2^32-1. We do this so more recent
// tiles come first in sort order.
type tileKey int32
// encodedTraceID is a shortened form of a tiling.TraceID, e.g. 0=1,1=3,3=0,
// Those indices are references to the OrderedParamSet stored in encTile.
// See params.paramsEncoder
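// For example (illustrative, assuming an OrderedParamSet whose key at index 0 is
// "config" with values ["565", "8888"]): the pair 0=1 decodes back to config=8888.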
type encodedTraceID string
// encTile contains an encoded tile.
type encTile struct {
// This being a slice is more performant than a map, since we only really
// need to iterate over the data structure, not look anything up by trace.
traces []*encodedTracePair
ops *paramtools.OrderedParamSet
}
// encodedTracePair maps a trace id to its list of digests.
type encodedTracePair struct {
ID encodedTraceID
// This corresponds to the commits, with index 0 being the oldest commit
// and the last commit being the most recent.
Digests [DefaultTileSize]types.Digest
}
// When ingesting we keep a cache of the OrderedParamSets we have seen per-tile.
type opsCacheEntry struct {
ops *paramtools.OrderedParamSet
hash string // md5 hash of the serialized ops - used for deterministic querying.
}
// opsCacheEntryFromOPS creates and fills in an opsCacheEntry from the given
// OrderedParamSet and sets the hash appropriately.
func opsCacheEntryFromOPS(ops *paramtools.OrderedParamSet) (*opsCacheEntry, error) {
buf, err := ops.Encode()
if err != nil {
return nil, skerr.Fmt("could not encode the given ops to bytes: %s", err)
}
hash := fmt.Sprintf("%x", md5.Sum(buf))
return &opsCacheEntry{
ops: ops,
hash: hash,
}, nil
}
// newOpsCacheEntry returns an empty opsCacheEntry.
func newOpsCacheEntry() (*opsCacheEntry, error) {
return opsCacheEntryFromOPS(paramtools.NewOrderedParamSet())
}
// newOpsCacheEntryFromRow loads the appropriate data from the given BT row
// and returns an opsCacheEntry with that data.
func newOpsCacheEntryFromRow(row bigtable.Row) (*opsCacheEntry, error) {
family := row[opsFamily]
if len(family) != 2 {
// This should never happen
return nil, skerr.Fmt("incorrect number of of OPS columns in BT for key %s, %d != 2", row.Key(), len(family))
}
ops := &paramtools.OrderedParamSet{}
hash := ""
for _, col := range family {
if col.Column == opsFullColName {
var err error
ops, err = paramtools.NewOrderedParamSetFromBytes(col.Value)
if err != nil {
// should never happen
return nil, skerr.Fmt("corrupted paramset in BT for key %s: %s", row.Key(), err)
}
} else if col.Column == hashFullColName {
hash = string(col.Value)
}
}
if hash == "" {
return nil, skerr.Fmt("missing hash for OPS for key %s: %#v", row.Key(), ops)
}
// You might be tempted to use opsCacheEntryFromOPS and
// check that entry.hash == hash here, but that will fail
// because gob encoding of maps is not deterministic.
entry := opsCacheEntry{
ops: ops,
hash: hash,
}
return &entry, nil
}
// Define this as a type so we can define some helper functions.
type traceMap map[tiling.TraceID]tiling.Trace
// CommitIndicesWithData returns the indices of the commits with at least one non-missing
// digest in at least one trace. Since the traces always have DefaultTileSize commits
// and might not be fully filled in, maxIndex tells us where to cut off the search so as
// to avoid unneeded work.
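// For example (illustrative): given a traceMap whose traces have non-missing digests
// only at commit offsets 0 and 5, CommitIndicesWithData(10) returns []int{0, 5}.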
func (t traceMap) CommitIndicesWithData(maxIndex int) []int {
if len(t) == 0 {
return nil
}
nCommits := 0
for _, trace := range t {
nCommits = trace.Len()
break
}
// This is pretty expensive, as in the worst case, it will have to go through all digests of
// all traces.
// One optimization is: break the work up into chunks and run each chunk on a
// goroutine. That way all the CPUs can get involved searching every trace (if needed).
// Finding the best params for breaking the data up is potentially tricky, so right now we do
// the naive thing and break it up based on the number of CPUs. On a 4 core laptop, this improved
// the sparse case by 2x and the dense case by 1.5x over the naive implementation.
chunkSize := maxIndex / runtime.NumCPU()
// Prevent integer division from making this 0 (or other small numbers where the overhead
// may not be worth it).
if chunkSize < 4 {
chunkSize = 4
}
wg := sync.WaitGroup{}
// Store data in a slice of bools so we can safely share it between the goroutines without
// a mutex (which had some contention problems in the dense case).
haveData := make([]bool, maxIndex)
for i := 0; i < maxIndex; i += chunkSize {
wg.Add(1)
go func(start int) {
defer wg.Done()
for i := start; i < start+chunkSize && i < maxIndex && i < nCommits; i++ {
for _, trace := range t {
gt := trace.(*types.GoldenTrace)
if !gt.IsMissing(i) {
haveData[i] = true
break
}
}
}
}(i)
}
wg.Wait()
var indices []int
for i, b := range haveData {
if b {
indices = append(indices, i)
}
}
return indices
}
// MakeFromCommitIndexes creates a new traceMap from the data in this one that
// only has the digests belonging to the given commit indices. Conceptually,
// this grabs a subset of the commit columns from the tile.
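// For example (illustrative): calling MakeFromCommitIndexes([]int{0, 5}) returns a
// traceMap whose traces are two digests long, holding the digests from commit
// offsets 0 and 5 of the original traces.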
func (t traceMap) MakeFromCommitIndexes(indices []int) traceMap {
if len(indices) == 0 {
return traceMap{}
}
r := make(traceMap, len(t))
for id, trace := range t {
gt := trace.(*types.GoldenTrace)
newDigests := make([]types.Digest, len(indices))
for i, idx := range indices {
newDigests[i] = gt.Digests[idx]
}
r[id] = &types.GoldenTrace{
Keys: gt.Keys,
Digests: newDigests,
}
}
return r
}
// PrependTraces augments this traceMap with the data from the given one.
// Specifically, it prepends that data, assuming the "other" data came
// before the data in this map.
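// For example (illustrative): if the traces in t are 3 commits long and the traces in
// other are 2 commits long, then after PrependTraces every trace in t is 5 commits long,
// with other's digests first and MISSING_DIGEST padding wherever a trace was absent
// from one of the two maps.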
// TODO(kjlubick): Deduplicate this with tiling.Merge
func (t traceMap) PrependTraces(other traceMap) {
numCommits := 0
for _, trace := range t {
numCommits = trace.Len()
break
}
numOtherCommits := 0
for id, trace := range other {
numOtherCommits = trace.Len()
original, ok := t[id]
if ok {
// Keys are constant and are what the id is derived from
t[id] = trace.Merge(original)
} else {
// if we stopped seeing the trace in t, we need to pad the end with MISSING_DIGEST
trace.Grow(numOtherCommits+numCommits, tiling.FILL_AFTER) // Assumes we can modify other
t[id] = trace
}
}
// if we saw a trace in t, but not in other, we need to pad the beginning with MISSING_DIGEST
for id, trace := range t {
if _, ok := other[id]; !ok {
trace.Grow(numOtherCommits+numCommits, tiling.FILL_BEFORE)
}
}
}
// Most of the strings aren't unique. For example, in a single, stable trace,
// the same digest may be used for all commits in the row. Thus, we don't want
// to allocate memory on the heap for each of those strings; we can
// just reuse the (immutable) strings we have already seen.
// THIS IS NOT SAFE to be shared between goroutines.
type digestCache map[[md5.Size]byte]types.Digest
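// FromBytesOrCache converts the raw md5 bytes b into a types.Digest, returning the
// cached string if these bytes have been seen before. Example (illustrative):
//
//	c := digestCache{}
//	d1 := c.FromBytesOrCache(b) // b holds 16 raw md5 bytes
//	d2 := c.FromBytesOrCache(b) // returns the cached types.Digest; no new allocation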
func (c digestCache) FromBytesOrCache(b []byte) types.Digest {
if len(b) != md5.Size {
return types.MISSING_DIGEST
}
// Allocate a small array on the stack, then copy the bytes
// into it and use that as the key in the map.
// This is faster than k := string(b), maybe because of
// extra copies or simply that runtime.mapaccess2_faststr is slower
// than runtime.mapaccess2 (used by array).
k := [md5.Size]byte{}
copy(k[:], b)
d, ok := c[k]
if ok {
return d
}
d = fromBytes(b)
c[k] = d
return d
}
// Most of the options aren't unique. For example, in a single tile,
// the same options can be shared across most traces with the same name.
// THIS IS NOT SAFE to be shared between goroutines.
type paramCache map[string]paramtools.Params
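// FromBytesOrCache decodes the BT options bytes b into a paramtools.Params, returning
// the cached map if these bytes have been seen before. Example (illustrative, assuming
// the `,key1=value1,` encoding described for optionsBytesColumn above):
//
//	c := paramCache{}
//	p := c.FromBytesOrCache([]byte(",key1=value1,")) // paramtools.Params{"key1": "value1"}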
func (c paramCache) FromBytesOrCache(b []byte) paramtools.Params {
if len(b) == 0 {
return paramtools.Params{}
}
k := string(b)
p, ok := c[k]
if ok {
return p
}
p = decodeParams(b)
c[k] = p
return p
}