// Package bt_tracestore implements a tracestore backed by BigTable
// See BIGTABLE.md for an overview of the schema and design.
package bt_tracestore
import (
"context"
"fmt"
"hash/crc32"
"sort"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
"cloud.google.com/go/bigtable"
"go.skia.org/infra/go/bt"
"go.skia.org/infra/go/metrics2"
"go.skia.org/infra/go/paramtools"
"go.skia.org/infra/go/skerr"
"go.skia.org/infra/go/sklog"
"go.skia.org/infra/go/tiling"
"go.skia.org/infra/go/util"
"go.skia.org/infra/go/vcsinfo"
"go.skia.org/infra/golden/go/tracestore"
"go.skia.org/infra/golden/go/types"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc/codes"
)
// InitBT initializes the BT instance for the given configuration. It uses the default
// mechanism for getting auth information from the environment and must be called with an
// account that has admin rights.
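// A typical call looks like this (the IDs are hypothetical):
//
//	err := InitBT(ctx, BTConfig{
//		ProjectID:  "my-project",
//		InstanceID: "my-bt-instance",
//		TableID:    "gold-traces",
//	})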
func InitBT(ctx context.Context, conf BTConfig) error {
if conf.ProjectID == "" || conf.InstanceID == "" || conf.TableID == "" {
return skerr.Fmt("invalid config: must specify all parts of BTConfig")
}
// Set up admin client, tables, and column families.
adminClient, err := bigtable.NewAdminClient(ctx, conf.ProjectID, conf.InstanceID)
if err != nil {
return skerr.Wrapf(err, "creating admin client for project %s and instance %s", conf.ProjectID, conf.InstanceID)
}
err = adminClient.CreateTableFromConf(ctx, &bigtable.TableConf{
TableID: conf.TableID,
Families: map[string]bigtable.GCPolicy{
opsFamily: bigtable.MaxVersionsPolicy(1),
optionsFamily: bigtable.MaxVersionsPolicy(1),
traceFamily: bigtable.MaxVersionsPolicy(1),
},
})
// Ignore the error if the table already existed.
err, code := bt.ErrToCode(err)
if err != nil && code != codes.AlreadyExists {
return skerr.Wrapf(err, "creating table %s", conf.TableID)
}
sklog.Infof("Created table %s on %s instance in project %s", conf.TableID, conf.InstanceID, conf.ProjectID)
return nil
}
// BTConfig contains the configuration information for the BigTable-based implementation of
// TraceStore.
type BTConfig struct {
// ProjectID is the GCP project that contains the BigTable instance.
ProjectID string
// InstanceID is the BigTable instance within that project.
InstanceID string
// TableID is the BigTable table that stores the trace data.
TableID string
// VCS is used to map commit hashes to commit indices in the repo.
VCS vcsinfo.VCS
}
// BTTraceStore implements the TraceStore interface.
type BTTraceStore struct {
vcs vcsinfo.VCS
client *bigtable.Client
table *bigtable.Table
// if cacheOps is true, then cache the OrderedParamSets between calls
// where possible.
cacheOps bool
// maps rowName (string) -> *opsCacheEntry
opsCache sync.Map
}
// New returns a new BTTraceStore, a TraceStore implementation backed by BigTable. If cache
// is true, the OrderedParamSets will be cached based on the row name.
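// A minimal usage sketch (config values are hypothetical):
//
//	store, err := New(ctx, BTConfig{
//		ProjectID:  "my-project",
//		InstanceID: "my-bt-instance",
//		TableID:    "gold-traces",
//		VCS:        vcs,
//	}, true /* cache OPS */)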
func New(ctx context.Context, conf BTConfig, cache bool) (*BTTraceStore, error) {
client, err := bigtable.NewClient(ctx, conf.ProjectID, conf.InstanceID)
if err != nil {
return nil, skerr.Wrapf(err, "could not instantiate client")
}
ret := &BTTraceStore{
vcs: conf.VCS,
client: client,
table: client.Open(conf.TableID),
cacheOps: cache,
}
return ret, nil
}
// Put implements the TraceStore interface.
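// A hypothetical call:
//
//	err := store.Put(ctx, "aabbcc...", []*tracestore.Entry{{
//		Params:  paramtools.Params{"name": "foo_test", "config": "gles"},
//		Options: paramtools.Params{"ext": "png"},
//		Digest:  "a152b33d81d7b1a12f23b6eb85fb1dcb",
//	}}, time.Now())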
func (b *BTTraceStore) Put(ctx context.Context, commitHash string, entries []*tracestore.Entry, ts time.Time) error {
defer metrics2.FuncTimer().Stop()
// If there are no entries, this is a no-op.
if len(entries) == 0 {
return nil
}
// Accumulate all parameters into a paramset and collect all the digests.
paramSet := make(paramtools.ParamSet, len(entries[0].Params))
digestSet := make(types.DigestSet, len(entries))
for _, entry := range entries {
paramSet.AddParams(entry.Params)
digestSet[entry.Digest] = true
}
repoIndex, err := b.vcs.IndexOf(ctx, commitHash)
if err != nil {
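// The commit may be newer than our local view of the repo; sync and retry.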
err = b.vcs.Update(ctx, true, false)
if err != nil {
return skerr.Wrapf(err, "could not update VCS to look up %s", commitHash)
}
repoIndex, err = b.vcs.IndexOf(ctx, commitHash)
}
if err != nil {
return skerr.Wrapf(err, "could not look up commit index of %s", commitHash)
}
commit, err := b.vcs.Details(ctx, commitHash, false)
if err != nil {
return skerr.Wrapf(err, "could not look up commit details for %s", commitHash)
}
// Find out what tile we need to fetch and what index into that tile we need.
// Reminder that tileKeys start at 2^32-1 and decrease in value.
tileKey, commitIndex := b.getTileKey(repoIndex)
// If these entries have any params we haven't seen before, we need to store those in BigTable.
ops, err := b.updateOrderedParamSet(ctx, tileKey, paramSet)
if err != nil {
sklog.Warningf("Bad paramset: %#v", paramSet)
return skerr.Wrapf(err, "cannot update paramset")
}
// These are two parallel arrays. mutations[i] should be applied to rowNames[i] for all i.
rowNames, mutations, err := b.createPutMutations(entries, tileKey, commitIndex, ops, ts, commit.Timestamp)
if err != nil {
return skerr.Wrapf(err, "could not create mutations to put data for tile %d", tileKey)
}
// Write the trace data. We pick a batch size based on the assumption
// that a whole batch should be about 2 MB and each entry is ~200 bytes of data:
// 2 MB / 200 B = 10000. This is extremely conservative, but should not be a problem
// since the batches are written in parallel.
return b.applyBulkBatched(ctx, rowNames, mutations, 10000)
}
// createPutMutations is a helper function that returns two parallel arrays of
// the rows that need updating and the mutations to apply to those rows.
// Specifically, the mutations will add the given entries to BT, clearing out
// anything that was there previously.
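// For an entry that also has Options, two (rowName, mutation) pairs are
// emitted: one targeting the trace row and one targeting the options row.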
func (b *BTTraceStore) createPutMutations(entries []*tracestore.Entry, tk tileKey, commitIndex int, ops *paramtools.OrderedParamSet, digestTS, optionsTS time.Time) ([]string, []*bigtable.Mutation, error) {
// These mutations...
mutations := make([]*bigtable.Mutation, 0, len(entries))
// ...should be applied to these rows.
rowNames := make([]string, 0, len(entries))
for _, entry := range entries {
// To save space, traceID isn't the long form tiling.TraceID
// (e.g. ,foo=bar,baz=gm,); it's a string of key-value numbers
// that refer to the params (e.g. ,0=3,2=18,).
// See params.paramsEncoder.
sTrace, err := ops.EncodeParamsAsString(entry.Params)
if err != nil {
return nil, nil, skerr.Wrapf(err, "invalid params")
}
traceID := encodedTraceID(sTrace)
rowName := b.calcShardedRowName(tk, typeTrace, string(traceID))
rowNames = append(rowNames, rowName)
// Create a mutation that puts the given digest at the given row
// (i.e. the trace combined with the tile), at the given column
// (i.e. the commit offset into this tile).
mut := bigtable.NewMutation()
column := fmt.Sprintf(columnPad, commitIndex)
mut.Set(traceFamily, column, bigtable.Time(digestTS), toBytes(entry.Digest))
mutations = append(mutations, mut)
if len(entry.Options) == 0 {
continue
}
rowName = b.calcShardedRowName(tk, typeOptions, string(traceID))
rowNames = append(rowNames, rowName)
// Use a fresh mutation here. Reusing the trace mutation would write the
// trace column to the options row (and vice versa), since both slots in
// the mutations slice would share one *bigtable.Mutation.
mut = bigtable.NewMutation()
pBytes := encodeParams(entry.Options)
mut.Set(optionsFamily, optionsBytesColumn, bigtable.Time(optionsTS), pBytes)
mutations = append(mutations, mut)
}
return rowNames, mutations, nil
}
// GetTile implements the TraceStore interface.
// Of note, because this request may span multiple tiles, the returned ParamSet can contain
// params that do not actually correspond to any returned trace (this shouldn't be a problem,
// but is worth calling out). For example, suppose a trace with param "device=alpha" abruptly
// ends on tile 4, commit 7 (where the device was removed from testing). If we are on tile 5
// and need to query both tile 4 starting at commit 10 and all of tile 5, we merge the
// paramsets of both tiles, which include the "device=alpha" params even though no returned
// trace has them (the trace ended before our cutoff point).
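// A typical call to assemble a tile of the last 200 commits (the count is
// illustrative) looks like:
//
//	tile, commits, err := store.GetTile(ctx, 200)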
func (b *BTTraceStore) GetTile(ctx context.Context, nCommits int) (*tiling.Tile, []*tiling.Commit, error) {
defer metrics2.FuncTimer().Stop()
// Look up the commits we need to query from BT
idxCommits := b.vcs.LastNIndex(nCommits)
if len(idxCommits) == 0 {
return nil, nil, skerr.Fmt("No commits found.")
}
// These commits could span across multiple tiles, so derive the tiles we need to query.
c := idxCommits[0]
startTileKey, startCommitIndex := b.getTileKey(c.Index)
c = idxCommits[len(idxCommits)-1]
endTileKey, endCommitIndex := b.getTileKey(c.Index)
var egroup errgroup.Group
var commits []*tiling.Commit
egroup.Go(func() error {
hashes := make([]string, 0, len(idxCommits))
for _, ic := range idxCommits {
hashes = append(hashes, ic.Hash)
}
var err error
commits, err = b.makeTileCommits(ctx, hashes)
if err != nil {
return skerr.Wrapf(err, "could not load tile commits")
}
return nil
})
var traces traceMap
var params paramtools.ParamSet
egroup.Go(func() error {
var err error
traces, params, err = b.getTracesInRange(ctx, startTileKey, endTileKey, startCommitIndex, endCommitIndex)
if err != nil {
return skerr.Wrapf(err, "could not load tile commits (%d, %d, %d, %d)", startTileKey, endTileKey, startCommitIndex, endCommitIndex)
}
return nil
})
if err := egroup.Wait(); err != nil {
return nil, nil, skerr.Wrapf(err, "could not load last %d commits into tile", nCommits)
}
ret := &tiling.Tile{
Traces: traces,
ParamSet: params,
Commits: commits,
Scale: 0,
}
return ret, commits, nil
}
// getTracesInRange returns a traceMap with data from the given start and stop points (tile and index).
// It also includes the ParamSet for that range.
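// As a worked example (with an illustrative DefaultTileSize of 256): spanning from
// (startTileKey, startCommitIndex=10) to the next-newer tile (endTileKey = startTileKey-1,
// endCommitIndex=3) gives nTiles = 2 and nCommits = 1*256 + (3-10) + 1 = 250, i.e.
// commits 10..255 of the older tile plus commits 0..3 of the newer one.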
func (b *BTTraceStore) getTracesInRange(ctx context.Context, startTileKey, endTileKey tileKey, startCommitIndex, endCommitIndex int) (traceMap, paramtools.ParamSet, error) {
sklog.Debugf("getTracesInRange(%d, %d, %d, %d)", startTileKey, endTileKey, startCommitIndex, endCommitIndex)
// Query those tiles.
nTiles := int(startTileKey - endTileKey + 1)
nCommits := int(startTileKey-endTileKey)*int(DefaultTileSize) + (endCommitIndex - startCommitIndex) + 1
encTiles := make([]*encTile, nTiles)
options := make([]map[encodedTraceID]paramtools.Params, nTiles)
var egroup errgroup.Group
tk := startTileKey
for idx := 0; idx < nTiles; idx++ {
func(idx int, tk tileKey) {
egroup.Go(func() error {
var err error
encTiles[idx], err = b.loadTile(ctx, tk)
if err != nil {
return skerr.Wrapf(err, "could not load tile with key %d to index %d", tk, idx)
}
return nil
})
egroup.Go(func() error {
var err error
options[idx], err = b.loadOptions(ctx, tk)
if err != nil {
return skerr.Wrapf(err, "could not load options with key %d to index %d", tk, idx)
}
return nil
})
}(idx, tk)
tk--
}
if err := egroup.Wait(); err != nil {
return nil, nil, skerr.Wrapf(err, "could not load %d tiles", nTiles)
}
// This is the full tile we are going to return.
tileTraces := make(traceMap, len(encTiles[0].traces))
paramSet := paramtools.ParamSet{}
// traceIdx tracks the index at which we are writing digests into each trace.
traceIdx := nCommits
// We go backwards to make it easier to identify the most recent Options in the map.
for idx := nTiles - 1; idx >= 0; idx-- {
encTile := encTiles[idx]
// Determine the offset within the tile that we should consider.
startOffset := 0
if idx == 0 {
// If we are on the first tile, start at startCommitIndex
startOffset = startCommitIndex
}
endOffset := DefaultTileSize - 1
if idx == (nTiles - 1) {
// If we are on the last tile, end at endCommitIndex
endOffset = endCommitIndex
}
segLen := endOffset - startOffset + 1
traceIdx -= segLen
if idx == 0 && traceIdx != 0 {
// Check our math - should never happen.
return nil, nil, skerr.Fmt("incorrect tile math (tile %d, index %d) => tile(%d, index %d) was not %d commits", startTileKey, startCommitIndex, endTileKey, endCommitIndex, nCommits)
}
for _, pair := range encTile.traces {
// at this point, pair.ID looks like ,0=1,1=3,3=0,
// See params.paramsEncoder
params, err := encTile.ops.DecodeParamsFromString(string(pair.ID))
if err != nil {
// This can occur because we read the tile's OPS and the tile's
// traces concurrently. If Put adds a new trace to the tile after
// we have read the OPS, we may see the new trace, which may contain
// params that are not in our copy of the OPS. We don't promise that
// Put is atomic, so it's fine to just skip this trace.
sklog.Warningf("Unreadable trace key - could not decode %s: %s", pair.ID, err)
continue
}
// Turn the params into the tiling.TraceID we expect elsewhere.
traceKey := tracestore.TraceIDFromParams(params)
if _, ok := tileTraces[traceKey]; !ok {
if opts, ok := options[idx][pair.ID]; ok {
params.Add(opts)
}
gt := types.NewEmptyGoldenTrace(nCommits, params)
tileTraces[traceKey] = gt
// Build up the total set of params
paramSet.AddParams(params)
}
trace := tileTraces[traceKey].(*types.GoldenTrace)
digests := pair.Digests[startOffset : startOffset+segLen]
copy(trace.Digests[traceIdx:traceIdx+segLen], digests)
}
}
// Sort the params for determinism.
paramSet.Normalize()
return tileTraces, paramSet, nil
}
// GetDenseTile implements the TraceStore interface. It fetches the most recent tile, checks
// whether it contains enough non-empty data, and then queries progressively older tiles until
// it has nCommits non-empty commits.
func (b *BTTraceStore) GetDenseTile(ctx context.Context, nCommits int) (*tiling.Tile, []*tiling.Commit, error) {
sklog.Debugf("GetDenseTile(%d)", nCommits)
defer metrics2.FuncTimer().Stop()
// Figure out what index we are on.
idxCommits := b.vcs.LastNIndex(1)
if len(idxCommits) == 0 {
return nil, nil, skerr.Fmt("No commits found.")
}
c := idxCommits[0]
endKey, endIdx := b.getTileKey(c.Index)
tileStartCommitIdx := c.Index - endIdx
// Given nCommits and the current index, we can figure out how many tiles to
// request on the first batch (assuming everything has data).
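// For example, with an illustrative DefaultTileSize of 256, nCommits = 300,
// and the newest commit at endIdx = 99 of its tile: n starts at 201, so the
// loop widens the range by one tile and the first fetch covers two tiles
// (100 + 256 = 356 candidate commits).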
n := nCommits - endIdx
startKey := endKey
for n > 0 {
n -= DefaultTileSize
tileStartCommitIdx -= DefaultTileSize
startKey++
}
// commitsWithData is a slice of indexes of commits that have data. These indexes are
// relative to the repo itself, with index 0 being the first (oldest) commit in the repo.
commitsWithData := make([]int, 0, nCommits)
paramSet := paramtools.ParamSet{}
allTraces := traceMap{}
// Start at the most recent tile(s) and step backwards until we have enough commits with data.
for i := 0; i < maxTilesForDenseTile; i++ {
commitsToFetch := int(startKey-endKey)*DefaultTileSize + endIdx + 1
traces, params, err := b.getTracesInRange(ctx, startKey, endKey, 0, endIdx)
if err != nil {
return nil, nil, skerr.Wrapf(err, "could not load commits from %d-0 to %d-%d", startKey, endKey, endIdx)
}
paramSet.AddParamSet(params)
// filledCommits are the indexes in the traces that have data.
// That is, they are the indexes of commits in this tile.
// It will be sorted from low indexes to high indexes
filledCommits := traces.CommitIndicesWithData(commitsToFetch)
sklog.Debugf("Got the following commits with data: %d", filledCommits)
density := float64(len(filledCommits)) / float64(commitsToFetch)
metrics2.GetFloat64SummaryMetric("tile_density").Observe(density)
if len(filledCommits)+len(commitsWithData) > nCommits {
targetLength := nCommits - len(commitsWithData)
// trim filledCommits so we get to exactly nCommits
filledCommits = filledCommits[len(filledCommits)-targetLength:]
}
for _, tileIdx := range filledCommits {
commitsWithData = append(commitsWithData, tileStartCommitIdx+tileIdx)
}
cTraces := traces.MakeFromCommitIndexes(filledCommits)
allTraces.PrependTraces(cTraces)
if len(commitsWithData) >= nCommits || startKey == tileKeyFromIndex(0) {
break
}
startKey++ // go backwards in time one tile
endKey = startKey
endIdx = DefaultTileSize - 1 // fetch the whole previous tile
tileStartCommitIdx -= DefaultTileSize
}
if len(commitsWithData) == 0 {
return &tiling.Tile{}, nil, nil
}
// put them in oldest to newest order
sort.Ints(commitsWithData)
oldestIdx := commitsWithData[0]
oldestCommit, err := b.vcs.ByIndex(ctx, oldestIdx)
if err != nil {
return nil, nil, skerr.Wrapf(err, "invalid oldest index %d", oldestIdx)
}
hashes := b.vcs.From(oldestCommit.Timestamp.Add(-1 * time.Millisecond))
// There's no guarantee that hashes[0] == oldestCommit.Hash (e.g. two commits at the same
// timestamp), so we trim hashes down if necessary.
for i := 0; i < len(hashes); i++ {
if hashes[i] == oldestCommit.Hash {
hashes = hashes[i:]
break
}
}
allCommits, err := b.makeTileCommits(ctx, hashes)
if err != nil {
return nil, nil, skerr.Wrapf(err, "could not make tile commits")
}
denseCommits := make([]*tiling.Commit, len(commitsWithData))
for i, idx := range commitsWithData {
denseCommits[i] = allCommits[idx-oldestIdx]
}
ret := &tiling.Tile{
Traces: allTraces,
ParamSet: paramSet,
Commits: denseCommits,
Scale: 0,
}
sklog.Debugf("GetDenseTile complete")
return ret, allCommits, nil
}
// getTileKey retrieves the tile key and the index of the commit in the given tile (commitIndex)
// given the index of a commit in the repo (repoIndex).
// commitIndex starts at 0 for the oldest commit in the tile.
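// For example, with an illustrative DefaultTileSize of 256, repoIndex 600 maps to
// tile index 2 (tileKey 2^32-3, since keys count down from 2^32-1) and commitIndex 88.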
func (b *BTTraceStore) getTileKey(repoIndex int) (tileKey, int) {
tileIndex := int32(repoIndex) / DefaultTileSize
commitIndex := repoIndex % int(DefaultTileSize)
return tileKeyFromIndex(tileIndex), commitIndex
}
// loadTile returns an *encTile corresponding to the tileKey.
func (b *BTTraceStore) loadTile(ctx context.Context, tileKey tileKey) (*encTile, error) {
defer metrics2.FuncTimer().Stop()
var egroup errgroup.Group
// Load the OrderedParamSet so the caller can decode the data from the tile.
var ops *paramtools.OrderedParamSet
egroup.Go(func() error {
opsEntry, _, err := b.getOPS(ctx, tileKey)
if err != nil {
return skerr.Wrapf(err, "could not load OPS")
}
ops = opsEntry.ops
return nil
})
var traces []*encodedTracePair
egroup.Go(func() error {
var err error
traces, err = b.loadEncodedTraces(ctx, tileKey)
if err != nil {
return skerr.Wrapf(err, "could not load traces")
}
return nil
})
if err := egroup.Wait(); err != nil {
return nil, err
}
return &encTile{
ops: ops,
traces: traces,
}, nil
}
// loadOptions returns the options map corresponding to the given tile.
func (b *BTTraceStore) loadOptions(ctx context.Context, tileKey tileKey) (map[encodedTraceID]paramtools.Params, error) {
defer metrics2.FuncTimer().Stop()
var egroup errgroup.Group
shardResults := make([]map[encodedTraceID]paramtools.Params, DefaultShards)
traceCount := int64(0)
// Query all shards in parallel.
for shard := int32(0); shard < DefaultShards; shard++ {
func(shard int32) {
egroup.Go(func() error {
// Most of the options aren't unique. For example, in a single tile,
// the same options can be shared across most traces with the same name.
uniqueParams := make(paramCache, 1000)
// This prefix will match all traces belonging to the
// current shard in the current tile.
prefixRange := bigtable.PrefixRange(shardedRowName(shard, typeOptions, tileKey, ""))
target := map[encodedTraceID]paramtools.Params{}
shardResults[shard] = target
// There should be only one column per row (optionsBytesColumn).
// The cell in that column should be the most recent, based on the timestamp set in Put.
err := b.table.ReadRows(ctx, prefixRange, func(row bigtable.Row) bool {
// The encoded trace id is the "subkey" part of the row name.
encID := encodedTraceID(extractSubkey(row.Key()))
atomic.AddInt64(&traceCount, 1)
var p paramtools.Params
for _, c := range row[optionsFamily] {
p = uniqueParams.FromBytesOrCache(c.Value)
break
}
if len(p) == 0 {
return true
}
target[encID] = p
return true
}, bigtable.RowFilter(
bigtable.ChainFilters(
bigtable.FamilyFilter(optionsFamily),
bigtable.LatestNFilter(1),
bigtable.CellsPerRowLimitFilter(1),
),
))
if err != nil {
return skerr.Wrapf(err, "could not read options on shard %d", shard)
}
return nil
})
}(shard)
}
if err := egroup.Wait(); err != nil {
return nil, skerr.Wrapf(err, "could not read options")
}
// Merge all the results together
ret := make(map[encodedTraceID]paramtools.Params, traceCount)
for _, r := range shardResults {
for encID, opts := range r {
// different shards should never share results for a tracekey
// since a trace always maps to the same shard.
ret[encID] = opts
}
}
return ret, nil
}
// loadEncodedTraces returns all traces belonging to the given tileKey.
// As outlined in BIGTABLE.md, the trace ids and the digest ids they
// map to are in an encoded form and will need to be expanded prior to use.
func (b *BTTraceStore) loadEncodedTraces(ctx context.Context, tileKey tileKey) ([]*encodedTracePair, error) {
defer metrics2.FuncTimer().Stop()
var egroup errgroup.Group
shardResults := [DefaultShards][]*encodedTracePair{}
traceCount := int64(0)
// Query all shards in parallel.
for shard := int32(0); shard < DefaultShards; shard++ {
func(shard int32) {
// Most of the strings aren't unique. For example, in a single, stable trace,
// the same digest may be used for all commits in the row. Thus, we don't want
// to have to allocate memory on the heap for each of those strings, we can
// just reuse those (immutable) strings.
uniqueDigests := make(digestCache, 1000)
egroup.Go(func() error {
// This prefix will match all traces belonging to the
// current shard in the current tile.
prefixRange := bigtable.PrefixRange(shardedRowName(shard, typeTrace, tileKey, ""))
var parseErr error
err := b.table.ReadRows(ctx, prefixRange, func(row bigtable.Row) bool {
atomic.AddInt64(&traceCount, 1)
// The encoded trace id is the "subkey" part of the row name.
traceKey := encodedTraceID(extractSubkey(row.Key()))
pair := encodedTracePair{
ID: traceKey,
Digests: [DefaultTileSize]types.Digest{},
}
// kjlubick@ and benjaminwagner@ are not super sure why, but this
// helps reduce memory usage by allowing the allocated pair to be
// cleaned up after GetTile/GetDenseTile completes.
// Without this initialization step, the Digest arrays seem to
// persist, causing a 2-3x increase in tile RAM size.
// It might be due to a bug in golang and/or just that the GC gets a
// little confused due to the complex procedure of passing things around.
for i := range pair.Digests {
pair.Digests[i] = types.MISSING_DIGEST
}
for _, col := range row[traceFamily] {
// The columns are something like T:35 where the part
// after the colon is the commitIndex i.e. the index
// of this commit in the current tile.
idx, err := strconv.Atoi(strings.TrimPrefix(col.Column, traceFamilyPrefix))
if err != nil {
// Should never happen
parseErr = err
return false
}
if idx < 0 || idx >= DefaultTileSize {
// This would happen if the tile size changed from a past
// value. It shouldn't be changed, even if the Gold tile size
// (n_commits) changes.
parseErr = skerr.Fmt("got index %d that is outside of the target slice of length %d", idx, DefaultTileSize)
return false
}
d := uniqueDigests.FromBytesOrCache(col.Value)
pair.Digests[idx] = d
}
shardResults[shard] = append(shardResults[shard], &pair)
return true
}, bigtable.RowFilter(
bigtable.ChainFilters(
bigtable.FamilyFilter(traceFamily),
// can be used for local testing to keep RAM usage lower
//bigtable.RowSampleFilter(0.1),
bigtable.LatestNFilter(1),
bigtable.CellsPerRowLimitFilter(DefaultTileSize),
),
))
if err != nil {
return skerr.Wrapf(err, "could not read rows")
}
return parseErr
})
}(shard)
}
if err := egroup.Wait(); err != nil {
return nil, err
}
// Merge all the results together
ret := make([]*encodedTracePair, 0, traceCount)
for _, r := range shardResults {
// different shards should never share results for a tracekey
// since a trace always maps to the same shard.
ret = append(ret, r...)
}
return ret, nil
}
// applyBulkBatched writes the given rowNames/mutation pairs to BigTable in batches that are
// maximally of size 'batchSize'. The batches are written in parallel.
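// For example, 25,000 row/mutation pairs with a batchSize of 10,000 are written as
// three parallel batches covering [0:10000], [10000:20000], and [20000:25000].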
func (b *BTTraceStore) applyBulkBatched(ctx context.Context, rowNames []string, mutations []*bigtable.Mutation, batchSize int) error {
if len(rowNames) == 0 {
return nil
}
var egroup errgroup.Group
err := util.ChunkIter(len(rowNames), batchSize, func(chunkStart, chunkEnd int) error {
egroup.Go(func() error {
tctx, cancel := context.WithTimeout(ctx, writeTimeout)
defer cancel()
rowNames := rowNames[chunkStart:chunkEnd]
mutations := mutations[chunkStart:chunkEnd]
errs, err := b.table.ApplyBulk(tctx, rowNames, mutations)
if err != nil {
return skerr.Wrapf(err, "writing batch [%d:%d]", chunkStart, chunkEnd)
}
if errs != nil {
// err is nil here; report the per-row errors instead.
return skerr.Fmt("writing some portions of batch [%d:%d]: %v", chunkStart, chunkEnd, errs)
}
return nil
})
return nil
})
if err != nil {
return skerr.Wrapf(err, "running ChunkIter(%s...%s) batch size %d", rowNames[0], rowNames[len(rowNames)-1], batchSize)
}
return egroup.Wait()
}
// calcShardedRowName deterministically assigns a shard for the given subkey (e.g. traceID)
// Once this is done, the shard, rowtype, tileKey and the subkey are combined into a
// single string to be used as a row name in BT.
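// For example, the subkey ",0=1,1=3," is CRC-32 hashed and reduced modulo
// DefaultShards, so a given trace always lands on the same shard.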
func (b *BTTraceStore) calcShardedRowName(tileKey tileKey, rowType, subkey string) string {
shard := int32(crc32.ChecksumIEEE([]byte(subkey)) % uint32(DefaultShards))
return shardedRowName(shard, rowType, tileKey, subkey)
}
// Copied from btts.go in infra/perf
// updateOrderedParamSet adds all params from 'p' to the OrderedParamSet
// for 'tileKey' and writes it back to BigTable.
func (b *BTTraceStore) updateOrderedParamSet(ctx context.Context, tileKey tileKey, p paramtools.ParamSet) (*paramtools.OrderedParamSet, error) {
defer metrics2.FuncTimer().Stop()
tctx, cancel := context.WithTimeout(ctx, writeTimeout)
defer cancel()
var newEntry *opsCacheEntry
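// Retry loop: read the current OPS, merge p into a copy, and write the copy back
// guarded by a conditional mutation on the hash we read. If another writer raced
// us, the condition fails and we loop to re-read.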
for {
// Get OPS.
entry, existsInBT, err := b.getOPS(ctx, tileKey)
if err != nil {
return nil, skerr.Wrapf(err, "failed to get OPS for tile %d", tileKey)
}
// If the OPS contains our paramset then we're done.
if delta := entry.ops.Delta(p); len(delta) == 0 {
return entry.ops, nil
}
// Create a new updated ops.
ops := entry.ops.Copy()
ops.Update(p)
newEntry, err = opsCacheEntryFromOPS(ops)
if err != nil {
return nil, skerr.Wrapf(err, "failed to create cache entry")
}
encodedOps, err := newEntry.ops.Encode()
if err != nil {
return nil, skerr.Wrapf(err, "failed to encode new ops")
}
now := bigtable.Time(time.Now())
condTrue := false
if existsInBT {
// Create an update that avoids the lost update problem.
cond := bigtable.ChainFilters(
bigtable.LatestNFilter(1),
bigtable.FamilyFilter(opsFamily),
bigtable.ColumnFilter(opsHashColumn),
bigtable.ValueFilter(string(entry.hash)),
)
updateMutation := bigtable.NewMutation()
updateMutation.Set(opsFamily, opsHashColumn, now, []byte(newEntry.hash))
updateMutation.Set(opsFamily, opsOpsColumn, now, encodedOps)
// Add a mutation that cleans up old versions.
before := bigtable.Time(now.Time().Add(-1 * time.Second))
updateMutation.DeleteTimestampRange(opsFamily, opsHashColumn, 0, before)
updateMutation.DeleteTimestampRange(opsFamily, opsOpsColumn, 0, before)
condUpdate := bigtable.NewCondMutation(cond, updateMutation, nil)
if err := b.table.Apply(tctx, tileKey.OpsRowName(), condUpdate, bigtable.GetCondMutationResult(&condTrue)); err != nil {
sklog.Warningf("Failed to apply: %s", err)
return nil, err
}
// If !condTrue then we need to try again,
// and clear our local cache.
if !condTrue {
sklog.Warningf("Exists !condTrue - clearing cache and trying again.")
b.opsCache.Delete(tileKey.OpsRowName())
continue
}
} else {
// Create an update that only works if the ops entry doesn't exist yet.
// I.e. only apply the mutation if the HASH column doesn't exist for this row.
cond := bigtable.ChainFilters(
bigtable.FamilyFilter(opsFamily),
bigtable.ColumnFilter(opsHashColumn),
)
updateMutation := bigtable.NewMutation()
updateMutation.Set(opsFamily, opsHashColumn, now, []byte(newEntry.hash))
updateMutation.Set(opsFamily, opsOpsColumn, now, encodedOps)
condUpdate := bigtable.NewCondMutation(cond, nil, updateMutation)
if err := b.table.Apply(tctx, tileKey.OpsRowName(), condUpdate, bigtable.GetCondMutationResult(&condTrue)); err != nil {
sklog.Warningf("Failed to apply: %s", err)
// clear cache and try again
b.opsCache.Delete(tileKey.OpsRowName())
continue
}
// If condTrue then we need to try again,
// and clear our local cache.
if condTrue {
sklog.Warningf("First Write condTrue - clearing cache and trying again.")
b.opsCache.Delete(tileKey.OpsRowName())
continue
}
}
// Successfully wrote OPS, so update the cache.
if b.cacheOps {
b.opsCache.Store(tileKey.OpsRowName(), newEntry)
}
break
}
return newEntry.ops, nil
}
// getOPS returns the opsCacheEntry for a given tile.
//
// Note that it will create a new opsCacheEntry if none exists.
//
// getOPS returns false if the OPS in BT was empty, true otherwise (even if cached).
func (b *BTTraceStore) getOPS(ctx context.Context, tk tileKey) (*opsCacheEntry, bool, error) {
defer metrics2.FuncTimer().Stop()
if b.cacheOps {
entry, ok := b.opsCache.Load(tk.OpsRowName())
if ok {
return entry.(*opsCacheEntry), true, nil
}
}
tctx, cancel := context.WithTimeout(ctx, readTimeout)
defer cancel()
row, err := b.table.ReadRow(tctx, tk.OpsRowName(), bigtable.RowFilter(
bigtable.ChainFilters(
bigtable.LatestNFilter(1),
bigtable.FamilyFilter(opsFamily),
),
))
if err != nil {
return nil, false, skerr.Wrapf(err, "failed to read OPS row %s from BigTable", tk.OpsRowName())
}
// If there is no entry in BigTable then return an empty OPS.
if len(row) == 0 {
sklog.Warningf("Failed to read OPS from BT for %s; the tile could be empty", tk.OpsRowName())
entry, err := newOpsCacheEntry()
return entry, false, err
}
entry, err := newOpsCacheEntryFromRow(row)
if err == nil && b.cacheOps {
b.opsCache.Store(tk.OpsRowName(), entry)
}
return entry, true, err
}
// makeTileCommits creates a slice of tiling.Commit from the given git hashes.
// Specifically, we need to look up the details to get the author information.
func (b *BTTraceStore) makeTileCommits(ctx context.Context, hashes []string) ([]*tiling.Commit, error) {
longCommits, err := b.vcs.DetailsMulti(ctx, hashes, false)
if err != nil {
// Put hashes second in case the message gets truncated, since hashes can be quite long.
return nil, skerr.Wrapf(err, "could not fetch commit data for commits with hashes %q", hashes)
}
commits := make([]*tiling.Commit, len(hashes))
for i, lc := range longCommits {
if lc == nil {
return nil, skerr.Fmt("commit %s not found from VCS", hashes[i])
}
commits[i] = &tiling.Commit{
Hash: lc.Hash,
Author: lc.Author,
CommitTime: lc.Timestamp.Unix(),
}
}
return commits, nil
}
// Make sure BTTraceStore fulfills the TraceStore interface.
var _ tracestore.TraceStore = (*BTTraceStore)(nil)