| // Package bt_tracestore implements a tracestore backed by BigTable |
| // See BIGTABLE.md for an overview of the schema and design. |
| package bt_tracestore |
| |
| import ( |
| "context" |
| "encoding/binary" |
| "hash/crc32" |
| "sort" |
| "strconv" |
| "strings" |
| "sync" |
| "sync/atomic" |
| "time" |
| |
| "cloud.google.com/go/bigtable" |
| "go.skia.org/infra/go/bt" |
| "go.skia.org/infra/go/metrics2" |
| "go.skia.org/infra/go/paramtools" |
| "go.skia.org/infra/go/skerr" |
| "go.skia.org/infra/go/sklog" |
| "go.skia.org/infra/go/tiling" |
| "go.skia.org/infra/go/util" |
| "go.skia.org/infra/go/vcsinfo" |
| "go.skia.org/infra/golden/go/tracestore" |
| "go.skia.org/infra/golden/go/types" |
| "golang.org/x/sync/errgroup" |
| ) |
| |
| // InitBT initializes the BigTable instance for the given configuration. It obtains auth |
| // information from the environment in the default way and must be called with an account |
| // that has admin rights. |
| func InitBT(conf BTConfig) error { |
| return bt.InitBigtable(conf.ProjectID, conf.InstanceID, conf.TableID, btColumnFamilies) |
| } |
| |
| // BTConfig contains the configuration information for the BigTable-based implementation of |
| // TraceStore. |
| type BTConfig struct { |
| ProjectID string |
| InstanceID string |
| TableID string |
| VCS vcsinfo.VCS |
| } |
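|
| // As a rough, illustrative sketch (the project/instance/table names and the VCS below are |
| // placeholders, not real values), a caller might wire these pieces together like this: |
| // |
| //     conf := BTConfig{ |
| //         ProjectID:  "some-project", |
| //         InstanceID: "some-bt-instance", |
| //         TableID:    "gold-traces", |
| //         VCS:        someVCS, |
| //     } |
| //     if err := InitBT(conf); err != nil { /* handle error */ } |
| //     store, err := New(ctx, conf, true /* cache OrderedParamSets */) |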
| |
| // BTTraceStore implements the TraceStore interface. |
| type BTTraceStore struct { |
| vcs vcsinfo.VCS |
| client *bigtable.Client |
| table *bigtable.Table |
| |
| tileSize int32 |
| shards int32 |
| |
| // if cacheOps is true, then cache the OrderedParamSets between calls |
| // where possible. |
| cacheOps bool |
| // maps rowName (string) -> *opsCacheEntry |
| opsCache sync.Map |
| |
| availIDsMutex sync.Mutex |
| availIDs []digestID |
| } |
| |
| // New implements the TraceStore interface backed by BigTable. If cache is true, |
| // the OrderedParamSets will be cached based on the row name. |
| func New(ctx context.Context, conf BTConfig, cache bool) (*BTTraceStore, error) { |
| client, err := bigtable.NewClient(ctx, conf.ProjectID, conf.InstanceID) |
| if err != nil { |
| return nil, skerr.Fmt("could not instantiate client: %s", err) |
| } |
| |
| ret := &BTTraceStore{ |
| vcs: conf.VCS, |
| client: client, |
| tileSize: DefaultTileSize, |
| shards: DefaultShards, |
| table: client.Open(conf.TableID), |
| cacheOps: cache, |
| availIDs: []digestID{}, |
| } |
| return ret, nil |
| } |
| |
| // Put implements the TraceStore interface. |
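| // As a purely illustrative sketch (the commit hash, params, and digest below are made up), |
| // a call might look like: |
| // |
| //     err := store.Put(ctx, "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", []*tracestore.Entry{ |
| //         { |
| //             Params: map[string]string{"name": "some_test", "config": "gpu"}, |
| //             Digest: "a152b33e2e2f0455bd85e4a1f8a84371", |
| //         }, |
| //     }, time.Now()) |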
| func (b *BTTraceStore) Put(ctx context.Context, commitHash string, entries []*tracestore.Entry, ts time.Time) error { |
| defer metrics2.FuncTimer().Stop() |
| // If there are no entries, this is a no-op. |
| if len(entries) == 0 { |
| return nil |
| } |
| |
| // Accumulate all parameters into a paramset and collect all the digests. |
| paramSet := make(paramtools.ParamSet, len(entries[0].Params)) |
| digestSet := make(types.DigestSet, len(entries)) |
| for _, entry := range entries { |
| paramSet.AddParams(entry.Params) |
| digestSet[entry.Digest] = true |
| } |
| |
| repoIndex, err := b.vcs.IndexOf(ctx, commitHash) |
| if err != nil { |
| return skerr.Fmt("could not look up commit %s: %s", commitHash, err) |
| } |
| |
| // Find out what tile we need to fetch and what index into that tile we need. |
| // Reminder that tileKeys start at 2^32-1 and decrease in value. |
| tileKey, commitIndex := b.getTileKey(repoIndex) |
| |
| // If these entries have any params we haven't seen before, we need to store those in BigTable. |
| ops, err := b.updateOrderedParamSet(ctx, tileKey, paramSet) |
| if err != nil { |
| sklog.Warningf("Bad paramset: %#v", paramSet) |
| return skerr.Fmt("cannot update paramset: %s", err) |
| } |
| |
| // Similarly, if we have some new digests (almost certainly), we need to add |
| // them to the digestMap. Of note, we store this |
| // map of string (types.Digest) -> int64 (digestID) in BigTable, then refer to |
| // the digestID elsewhere in the table. DigestIDs are essentially arbitrary, |
| // monotonically increasing numbers. |
| digestMap, err := b.updateDigestMap(ctx, digestSet) |
| if err != nil { |
| sklog.Warningf("Bad digestSet: %#v", digestSet) |
| return skerr.Fmt("cannot update digest map: %s", err) |
| } |
| |
| metrics2.GetInt64Metric("gold_digest_map_size").Update(int64(digestMap.Len())) |
| |
| if len(digestMap.Delta(digestSet)) != 0 { |
| // Should never happen |
| return skerr.Fmt("delta should be empty at this point: %v", digestMap.Delta(digestSet)) |
| } |
| |
| // These are two parallel arrays. mutations[i] should be applied to rowNames[i] for all i. |
| rowNames, mutations, err := b.createPutMutations(entries, ts, tileKey, commitIndex, ops, digestMap) |
| if err != nil { |
| return skerr.Fmt("could not create mutations to put data: %s", err) |
| } |
| |
| // Write the trace data. We pick a batch size based on the assumption |
| // that the whole batch should be about 2MB and each entry is ~200 bytes of data. |
| // 2MB / 200B = 10000. This is extremely conservative but should not be a problem |
| // since the batches are written in parallel. |
| return b.applyBulkBatched(ctx, rowNames, mutations, 10000) |
| } |
| |
| // createPutMutations is a helper function that returns two parallel arrays of |
| // the rows that need updating and the mutations to apply to those rows. |
| // Specifically, the mutations will add the given entries to BT, clearing out |
| // anything that was there previously. |
| func (b *BTTraceStore) createPutMutations(entries []*tracestore.Entry, ts time.Time, tk tileKey, commitIndex int, ops *paramtools.OrderedParamSet, dm *digestMap) ([]string, []*bigtable.Mutation, error) { |
| // These mutations... |
| mutations := make([]*bigtable.Mutation, 0, len(entries)) |
| // ... should be applied to these rows. |
| rowNames := make([]string, 0, len(entries)) |
| btTS := bigtable.Time(ts) |
| before := bigtable.Time(ts.Add(-1 * time.Millisecond)) |
| |
| for _, entry := range entries { |
| // To save space, traceID isn't the long form tiling.TraceId |
| // (e.g. ,foo=bar,baz=gm,); it's a string of key-value numbers |
| // that refer to the params (e.g. ,0=3,2=18,). |
| // See params.paramsEncoder. |
| sTrace, err := ops.EncodeParamsAsString(entry.Params) |
| if err != nil { |
| return nil, nil, skerr.Fmt("invalid params: %s", err) |
| } |
| traceID := encodedTraceID(sTrace) |
| |
| rowName := b.calcShardedRowName(tk, typeTrace, string(traceID)) |
| rowNames = append(rowNames, rowName) |
| |
| dID, err := dm.ID(entry.Digest) |
| if err != nil { |
| // this should never happen, the digest map should know about every digest already. |
| return nil, nil, skerr.Fmt("could not fetch id for digest %s: %s", entry.Digest, err) |
| } |
| |
| // Create a mutation that puts the given digest at the given row |
| // (i.e. the trace combined with the tile), at the given column |
| // (i.e. the commit offset into this tile). |
| mut := bigtable.NewMutation() |
| column := strconv.Itoa(commitIndex) |
| dBytes, err := dID.MarshalBinary() |
| if err != nil { |
| // this should never happen, we are just marshalling an int to binary |
| return nil, nil, skerr.Fmt("could not encode digest id %d to bytes: %s", dID, err) |
| } |
| mut.Set(traceFamily, column, btTS, dBytes) |
| // Delete anything that existed at this cell before now. |
| mut.DeleteTimestampRange(traceFamily, column, 0, before) |
| mutations = append(mutations, mut) |
| } |
| return rowNames, mutations, nil |
| } |
| |
| // GetTile implements the TraceStore interface. |
| // Of note, because this request can span multiple tiles, the returned ParamSet may contain |
| // params that do not actually correspond to any trace in the returned tile (this shouldn't |
| // be a problem, but is worth calling out). For example, suppose a trace with param |
| // "device=alpha" abruptly ends on tile 4, commit 7 (where the device was removed from |
| // testing). If we are on tile 5 and need to query both tile 4 starting at commit 10 and |
| // tile 5 (the whole thing), we merge the paramsets from both tiles, which include the |
| // "device=alpha" param even though no trace in the returned tile has it (since the trace |
| // ended prior to our cutoff point). |
| func (b *BTTraceStore) GetTile(ctx context.Context, nCommits int) (*tiling.Tile, []*tiling.Commit, error) { |
| defer metrics2.FuncTimer().Stop() |
| // Look up the commits we need to query from BT |
| idxCommits := b.vcs.LastNIndex(nCommits) |
| if len(idxCommits) == 0 { |
| return nil, nil, skerr.Fmt("No commits found.") |
| } |
| |
| // These commits could span across multiple tiles, so derive the tiles we need to query. |
| c := idxCommits[0] |
| startTileKey, startCommitIndex := b.getTileKey(c.Index) |
| |
| c = idxCommits[len(idxCommits)-1] |
| endTileKey, endCommitIndex := b.getTileKey(c.Index) |
| |
| var egroup errgroup.Group |
| |
| var commits []*tiling.Commit |
| egroup.Go(func() error { |
| hashes := make([]string, 0, len(idxCommits)) |
| for _, ic := range idxCommits { |
| hashes = append(hashes, ic.Hash) |
| } |
| var err error |
| commits, err = b.makeTileCommits(ctx, hashes) |
| if err != nil { |
| return skerr.Fmt("could not load tile commits: %s", err) |
| } |
| return nil |
| }) |
| |
| var traces traceMap |
| var params paramtools.ParamSet |
| egroup.Go(func() error { |
| var err error |
| traces, params, err = b.getTracesInRange(ctx, startTileKey, endTileKey, startCommitIndex, endCommitIndex) |
| |
| if err != nil { |
| return skerr.Fmt("could not load tile commits: %s", err) |
| } |
| return nil |
| }) |
| |
| if err := egroup.Wait(); err != nil { |
| return nil, nil, skerr.Fmt("could not load last %d commits into tile: %s", nCommits, err) |
| } |
| |
| ret := &tiling.Tile{ |
| Traces: traces, |
| ParamSet: params, |
| Commits: commits, |
| Scale: 0, |
| } |
| |
| return ret, commits, nil |
| } |
| |
| // getTracesInRange returns a traceMap with data from the given start and stop points (tile and index). |
| // It also includes the ParamSet for that range. |
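| // For example (assuming a tile size of 256): a range starting at commit index 250 of the |
| // older tile and ending at commit index 10 of the next (newer) tile covers the 6 commits |
| // 250..255 of the older tile plus commits 0..10 of the newer one, i.e. |
| // 1*256 + (10 - 250) + 1 = 17 commits in total. |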
| func (b *BTTraceStore) getTracesInRange(ctx context.Context, startTileKey, endTileKey tileKey, startCommitIndex, endCommitIndex int) (traceMap, paramtools.ParamSet, error) { |
| // Query those tiles. |
| nTiles := int(startTileKey - endTileKey + 1) |
| nCommits := int(startTileKey-endTileKey)*int(b.tileSize) + (endCommitIndex - startCommitIndex) + 1 |
| encTiles := make([]*encTile, nTiles) |
| var egroup errgroup.Group |
| tk := startTileKey |
| for idx := 0; idx < nTiles; idx++ { |
| func(idx int, tk tileKey) { |
| egroup.Go(func() error { |
| var err error |
| encTiles[idx], err = b.loadTile(ctx, tk) |
| if err != nil { |
| return skerr.Fmt("could not load tile with key %d to index %d: %s", tk, idx, err) |
| } |
| return nil |
| }) |
| }(idx, tk) |
| tk-- |
| } |
| |
| var digestMap *digestMap |
| egroup.Go(func() error { |
| var err error |
| digestMap, err = b.getDigestMap(ctx) |
| if err != nil { |
| return skerr.Fmt("could not load digestMap: %s", err) |
| } |
| return nil |
| }) |
| |
| if err := egroup.Wait(); err != nil { |
| return nil, nil, skerr.Fmt("could not load %d tiles: %s", nTiles, err) |
| } |
| |
| // This is the full tile we are going to return. |
| tileTraces := make(traceMap, len(encTiles[0].traces)) |
| paramSet := paramtools.ParamSet{} |
| |
| commitIDX := 0 |
| for idx, encTile := range encTiles { |
| // Determine the offset within the tile that we should consider. |
| endOffset := int(b.tileSize - 1) |
| if idx == (len(encTiles) - 1) { |
| // If we are on the last tile, stop early (that is, at endCommitIndex) |
| endOffset = endCommitIndex |
| } |
| segLen := endOffset - startCommitIndex + 1 |
| |
| for encodedKey, encValues := range encTile.traces { |
| // at this point, the encodedKey looks like ,0=1,1=3,3=0, |
| // See params.paramsEncoder |
| params, err := encTile.ops.DecodeParamsFromString(string(encodedKey)) |
| if err != nil { |
| sklog.Warningf("Incomplete OPS: %#v\n", encTile.ops) |
| return nil, nil, skerr.Fmt("corrupted trace key - could not decode %s: %s", encodedKey, err) |
| } |
| |
| // Turn the params into the tiling.TraceId we expect elsewhere. |
| traceKey := tracestore.TraceIDFromParams(params) |
| if _, ok := tileTraces[traceKey]; !ok { |
| tileTraces[traceKey] = types.NewGoldenTraceN(nCommits) |
| } |
| gt := tileTraces[traceKey].(*types.GoldenTrace) |
| gt.Keys = params |
| // Build up the total set of params |
| paramSet.AddParams(params) |
| |
| // Convert the digests from integer IDs to strings. |
| digestIDs := encValues[startCommitIndex : startCommitIndex+segLen] |
| digests, err := digestMap.DecodeIDs(digestIDs) |
| if err != nil { |
| return nil, nil, skerr.Fmt("corrupted digest id - could not decode: %s", err) |
| } |
| copy(gt.Digests[commitIDX:commitIDX+segLen], digests) |
| } |
| |
| // After the first tile we always start at the first entry and advance the |
| // overall commit index by the segment length. |
| commitIDX += segLen |
| startCommitIndex = 0 |
| } |
| |
| // Sort the params for determinism. |
| paramSet.Normalize() |
| |
| return tileTraces, paramSet, nil |
| } |
| |
| // GetDenseTile implements the TraceStore interface. It fetches the most recent tile and sees if |
| // there is enough non-empty data, then queries the next oldest tile until it has nCommits |
| // non-empty commits. |
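| // For example, if nCommits is 200 and the most recent tile only has 150 commits with data, |
| // the next older tile is also fetched and the newest 50 commits with data from it are |
| // prepended, so the returned tile covers exactly 200 non-empty commits (or fewer, if the |
| // repo runs out of tiles first). |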
| func (b *BTTraceStore) GetDenseTile(ctx context.Context, nCommits int) (*tiling.Tile, []*tiling.Commit, error) { |
| defer metrics2.FuncTimer().Stop() |
| // Figure out what index we are on. |
| idxCommits := b.vcs.LastNIndex(1) |
| if len(idxCommits) == 0 { |
| return nil, nil, skerr.Fmt("No commits found.") |
| } |
| |
| c := idxCommits[0] |
| tKey, endIdx := b.getTileKey(c.Index) |
| tileStartCommitIdx := c.Index - endIdx |
| |
| // commitsWithData is a slice of indexes of commits that have data. These indexes are |
| // relative to the repo itself, with index 0 being the first (oldest) commit in the repo. |
| commitsWithData := make([]int, 0, nCommits) |
| paramSet := paramtools.ParamSet{} |
| allTraces := traceMap{} |
| |
| // Start at the most recent tile and step backwards until we have enough commits with data. |
| for { |
| traces, params, err := b.getTracesInRange(ctx, tKey, tKey, 0, endIdx) |
| |
| if err != nil { |
| return nil, nil, skerr.Fmt("could not load commits from tile %d: %s", tKey, err) |
| } |
| |
| paramSet.AddParamSet(params) |
| // filledCommits are the indexes in the traces that have data. |
| // That is, they are the indexes of commits in this tile. |
| // It is sorted from lowest index to highest. |
| filledCommits := traces.CommitIndicesWithData() |
| |
| if len(filledCommits)+len(commitsWithData) > nCommits { |
| targetLength := nCommits - len(commitsWithData) |
| // trim filledCommits so we get to exactly nCommits |
| filledCommits = filledCommits[len(filledCommits)-targetLength:] |
| } |
| |
| for _, tileIdx := range filledCommits { |
| commitsWithData = append(commitsWithData, tileStartCommitIdx+tileIdx) |
| } |
| cTraces := traces.MakeFromCommitIndexes(filledCommits) |
| allTraces.PrependTraces(cTraces) |
| |
| if len(commitsWithData) >= nCommits || tKey == tileKeyFromIndex(0) { |
| break |
| } |
| |
| tKey++ // go backwards in time one tile |
| endIdx = int(b.tileSize - 1) // fetch the whole previous tile |
| tileStartCommitIdx -= int(b.tileSize) |
| } |
| |
| if len(commitsWithData) == 0 { |
| return &tiling.Tile{}, nil, nil |
| } |
| // put them in oldest to newest order |
| sort.Ints(commitsWithData) |
| |
| oldestIdx := commitsWithData[0] |
| oldestCommit, err := b.vcs.ByIndex(ctx, oldestIdx) |
| if err != nil { |
| return nil, nil, skerr.Fmt("invalid oldest index %d: %s", oldestIdx, err) |
| } |
| hashes := b.vcs.From(oldestCommit.Timestamp.Add(-1 * time.Millisecond)) |
| |
| // There's no guarantee that hashes[0] == oldestCommit.Hash (e.g. two commits at the same |
| // timestamp), so we trim hashes down if necessary. |
| for i := 0; i < len(hashes); i++ { |
| if hashes[i] == oldestCommit.Hash { |
| hashes = hashes[i:] |
| break |
| } |
| } |
| |
| allCommits, err := b.makeTileCommits(ctx, hashes) |
| if err != nil { |
| return nil, nil, skerr.Fmt("could not make tile commits: %s", err) |
| } |
| |
| denseCommits := make([]*tiling.Commit, len(commitsWithData)) |
| for i, idx := range commitsWithData { |
| denseCommits[i] = allCommits[idx-oldestIdx] |
| } |
| |
| ret := &tiling.Tile{ |
| Traces: allTraces, |
| ParamSet: paramSet, |
| Commits: denseCommits, |
| Scale: 0, |
| } |
| return ret, allCommits, nil |
| } |
| |
| // getTileKey retrieves the tile key and the index of the commit in the given tile (commitIndex) |
| // given the index of a commit in the repo (repoIndex). |
| // commitIndex starts at 0 for the oldest commit in the tile. |
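| // For illustration (using a hypothetical tile size of 256, not necessarily DefaultTileSize): |
| // repoIndex 600 falls in tile index 600/256 = 2 with commitIndex 600%256 = 88; the returned |
| // tileKey is whatever tileKeyFromIndex maps tile index 2 to under the decreasing key scheme |
| // described in Put. |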
| func (b *BTTraceStore) getTileKey(repoIndex int) (tileKey, int) { |
| tileIndex := int32(repoIndex) / b.tileSize |
| commitIndex := repoIndex % int(b.tileSize) |
| return tileKeyFromIndex(tileIndex), commitIndex |
| } |
| |
| // loadTile returns an *encTile corresponding to the tileKey. |
| func (b *BTTraceStore) loadTile(ctx context.Context, tileKey tileKey) (*encTile, error) { |
| defer metrics2.FuncTimer().Stop() |
| var egroup errgroup.Group |
| |
| // Load the OrderedParamSet so the caller can decode the data from the tile. |
| var ops *paramtools.OrderedParamSet |
| egroup.Go(func() error { |
| opsEntry, _, err := b.getOPS(ctx, tileKey) |
| if err != nil { |
| return skerr.Fmt("could not load OPS: %s", err) |
| } |
| ops = opsEntry.ops |
| return nil |
| }) |
| |
| var traces map[encodedTraceID][]digestID |
| egroup.Go(func() error { |
| var err error |
| traces, err = b.loadEncodedTraces(ctx, tileKey) |
| if err != nil { |
| return skerr.Fmt("could not load traces: %s", err) |
| } |
| return nil |
| }) |
| |
| if err := egroup.Wait(); err != nil { |
| return nil, err |
| } |
| |
| return &encTile{ |
| ops: ops, |
| traces: traces, |
| }, nil |
| } |
| |
| // loadEncodedTraces returns all traces belonging to the given tileKey. |
| // As outlined in BIGTABLE.md, the trace ids and the digest ids they |
| // map to are in an encoded form and will need to be expanded prior to use. |
| func (b *BTTraceStore) loadEncodedTraces(ctx context.Context, tileKey tileKey) (map[encodedTraceID][]digestID, error) { |
| defer metrics2.FuncTimer().Stop() |
| var egroup errgroup.Group |
| shardResults := make([]map[encodedTraceID][]digestID, b.shards) |
| traceCount := int64(0) |
| |
| // Query all shards in parallel. |
| for shard := int32(0); shard < b.shards; shard++ { |
| func(shard int32) { |
| egroup.Go(func() error { |
| // This prefix will match all traces belonging to the |
| // current shard in the current tile. |
| prefixRange := bigtable.PrefixRange(shardedRowName(shard, typeTrace, tileKey, "")) |
| target := map[encodedTraceID][]digestID{} |
| shardResults[shard] = target |
| var parseErr error |
| err := b.table.ReadRows(ctx, prefixRange, func(row bigtable.Row) bool { |
| // The encoded trace id is the "subkey" part of the row name. |
| traceKey := encodedTraceID(extractSubkey(row.Key())) |
| // If this is the first time we've seen the trace, initialize the |
| // slice of digest ids for it. |
| if _, ok := target[traceKey]; !ok { |
| target[traceKey] = make([]digestID, b.tileSize) |
| atomic.AddInt64(&traceCount, 1) |
| } |
| |
| for _, col := range row[traceFamily] { |
| // The columns are something like T:35 where the part |
| // after the colon is the commitIndex i.e. the index |
| // of this commit in the current tile. |
| idx, err := strconv.Atoi(strings.TrimPrefix(col.Column, traceFamilyPrefix)) |
| if err != nil { |
| // Should never happen |
| parseErr = err |
| return false |
| } |
| var dID digestID |
| if err := dID.UnmarshalBinary(col.Value); err != nil { |
| // This should never happen |
| parseErr = err |
| return false |
| } |
| if idx < 0 || idx >= int(b.tileSize) { |
| // This would happen if the tile size changed from a past |
| // value. It shouldn't be changed, even if the Gold tile size |
| // (n_commits) changes. |
| parseErr = skerr.Fmt("got index %d that is outside of the target slice of length %d", idx, len(target)) |
| return false |
| } |
| target[traceKey][idx] = dID |
| } |
| return true |
| }, bigtable.RowFilter(bigtable.LatestNFilter(1))) |
| if err != nil { |
| return skerr.Fmt("could not read rows: %s", err) |
| } |
| return parseErr |
| }) |
| }(shard) |
| } |
| |
| if err := egroup.Wait(); err != nil { |
| return nil, err |
| } |
| |
| // Merge all the results together |
| ret := make(map[encodedTraceID][]digestID, traceCount) |
| for _, r := range shardResults { |
| for traceKey, digestIDs := range r { |
| // different shards should never share results for a tracekey |
| // since a trace always maps to the same shard. |
| ret[traceKey] = digestIDs |
| } |
| } |
| |
| return ret, nil |
| } |
| |
| // applyBulkBatched writes the given rowNames/mutation pairs to BigTable in batches that are |
| // maximally of size 'batchSize'. The batches are written in parallel. |
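| // For example (a sketch of the batching only): 25,000 rows with a batchSize of 10,000 are |
| // written as three parallel ApplyBulk calls covering [0:10000), [10000:20000) and |
| // [20000:25000). |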
| func (b *BTTraceStore) applyBulkBatched(ctx context.Context, rowNames []string, mutations []*bigtable.Mutation, batchSize int) error { |
| |
| var egroup errgroup.Group |
| err := util.ChunkIter(len(rowNames), batchSize, func(chunkStart, chunkEnd int) error { |
| egroup.Go(func() error { |
| tctx, cancel := context.WithTimeout(ctx, writeTimeout) |
| defer cancel() |
| rowNames := rowNames[chunkStart:chunkEnd] |
| mutations := mutations[chunkStart:chunkEnd] |
| errs, err := b.table.ApplyBulk(tctx, rowNames, mutations) |
| if err != nil { |
| return skerr.Fmt("error writing batch [%d:%d]: %s", chunkStart, chunkEnd, err) |
| } |
| if errs != nil { |
| return skerr.Fmt("error writing some portions of batch [%d:%d]: %s", chunkStart, chunkEnd, errs) |
| } |
| return nil |
| }) |
| return nil |
| }) |
| if err != nil { |
| return skerr.Fmt("error running ChunkIter: %s", err) |
| } |
| return egroup.Wait() |
| } |
| |
| // calcShardedRowName deterministically assigns a shard for the given subkey (e.g. traceID) |
| // Once this is done, the shard, rowtype, tileKey and the subkey are combined into a |
| // single string to be used as a row name in BT. |
| func (b *BTTraceStore) calcShardedRowName(tileKey tileKey, rowType, subkey string) string { |
| shard := int32(crc32.ChecksumIEEE([]byte(subkey)) % uint32(b.shards)) |
| return shardedRowName(shard, rowType, tileKey, subkey) |
| } |
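|
| // For example (the exact row name layout lives in shardedRowName): a subkey like ",0=1,1=3," |
| // is hashed with CRC-32 (IEEE) and taken modulo the shard count, so the same subkey always |
| // lands in the same shard. This is why the per-shard results merged in loadEncodedTraces and |
| // getDigestMap never overlap. |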
| |
| // rowAndColNameFromDigest splits a digest into a sharded row name and a column name. |
| // To avoid having one monolithic row, we take the first three characters of the digest |
| // and use them as a subkey in the row; what remains is used as the column name. |
| // In practice this means our digests will be split using three hexadecimal characters, so |
| // we will have 16^3 = 4096 rows for our digest map. |
| func (b *BTTraceStore) rowAndColNameFromDigest(digest types.Digest) (string, string) { |
| subkey := string(digest[:3]) |
| colName := string(digest[3:]) |
| return b.calcShardedRowName(digestMapTile, typeDigestMap, subkey), colName |
| } |
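|
| // For example, an illustrative digest "a152b33e..." is stored under subkey "a15" (which picks |
| // the sharded row) with column name "2b33e...", i.e. the remaining characters of the digest. |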
| |
| // getDigestMap gets the global (i.e. same for all tiles) digestMap. |
| func (b *BTTraceStore) getDigestMap(ctx context.Context) (*digestMap, error) { |
| defer metrics2.FuncTimer().Stop() |
| // Query all shards in parallel. |
| var egroup errgroup.Group |
| shardResults := make([]map[types.Digest]digestID, b.shards) |
| total := int64(0) |
| for shard := int32(0); shard < b.shards; shard++ { |
| func(shard int32) { |
| egroup.Go(func() error { |
| prefRange := bigtable.PrefixRange(shardedRowName(shard, typeDigestMap, digestMapTile, "")) |
| var idx int64 |
| var parseErr error |
| ret := map[types.Digest]digestID{} |
| err := b.table.ReadRows(ctx, prefRange, func(row bigtable.Row) bool { |
| digestPrefix := extractSubkey(row.Key()) |
| for _, col := range row[digestMapFamily] { |
| idx, parseErr = strconv.ParseInt(string(col.Value), 10, 64) |
| if parseErr != nil { |
| // Should never happen |
| return false |
| } |
| digest := types.Digest(digestPrefix + strings.TrimPrefix(col.Column, digestMapFamilyPrefix)) |
| ret[digest] = digestID(idx) |
| } |
| return true |
| }, bigtable.RowFilter(bigtable.LatestNFilter(1))) |
| |
| if err != nil { |
| return skerr.Fmt("problem fetching shard %d of digestmap: %s", shard, err) |
| } |
| if parseErr != nil { |
| return parseErr |
| } |
| |
| shardResults[shard] = ret |
| atomic.AddInt64(&total, int64(len(ret))) |
| return nil |
| }) |
| }(shard) |
| } |
| if err := egroup.Wait(); err != nil { |
| return nil, skerr.Fmt("problem fetching digestmap: %s", err) |
| } |
| |
| ret := newDigestMap(int(total)) |
| for _, dm := range shardResults { |
| if err := ret.Add(dm); err != nil { |
| // List the digest map last in case the message gets truncated. |
| return nil, skerr.Fmt("could not build DigestMap: %s \nresults %#v", err, dm) |
| } |
| } |
| return ret, nil |
| } |
| |
| // getIDs returns a []digestID of length n where each of the |
| // digestIDs is unique (even between processes). |
| func (b *BTTraceStore) getIDs(ctx context.Context, n int) ([]digestID, error) { |
| defer metrics2.FuncTimer().Stop() |
| // Extract up to n ids from those we have already cached. |
| b.availIDsMutex.Lock() |
| defer b.availIDsMutex.Unlock() |
| toExtract := util.MinInt(len(b.availIDs), n) |
| |
| ids := make([]digestID, 0, n) |
| ids = append(ids, b.availIDs[:toExtract]...) |
| b.availIDs = b.availIDs[toExtract:] |
| |
| // missing is how many ids we are short |
| missing := int64(n - len(ids)) |
| if missing == 0 { |
| return ids, nil |
| } |
| // For performance reasons, make a few big requests for ids instead of many small ones. |
| // That is, always request numReservedIds extra. |
| toRequest := missing + numReservedIds |
| // Reserve new IDs via the ID counter |
| rmw := bigtable.NewReadModifyWrite() |
| rmw.Increment(idCounterFamily, idCounterColumn, toRequest) |
| row, err := b.table.ApplyReadModifyWrite(ctx, idCounterRow, rmw) |
| if err != nil { |
| return nil, skerr.Fmt("could not fetch counter from BT: %s", err) |
| } |
| |
| // ri holds the cells in the row for the id counter family. |
| // This should be 1 cell belonging to 1 column. |
| ri, ok := row[idCounterFamily] |
| if !ok { |
| // should never happen |
| return nil, skerr.Fmt("malformed response - no id counter family: %#v", ri) |
| } |
| if len(ri) != 1 { |
| // should never happen |
| return nil, skerr.Fmt("malformed response - expected 1 cell: %#v", ri) |
| } |
| |
| maxID := digestID(binary.BigEndian.Uint64(ri[0].Value)) |
| |
| lastID := maxID - digestID(toRequest) |
| // ID of 0 is a special case - it's already assigned to MISSING_DIGEST, so skip it. |
| if lastID == missingDigestID { |
| lastID++ |
| } |
| for i := lastID; i < maxID; i++ { |
| // Give the first ids to the current allocation request... |
| if missing > 0 { |
| ids = append(ids, i) |
| } else { |
| // ... and put the remainder in the store for later. |
| b.availIDs = append(b.availIDs, i) |
| } |
| missing-- |
| } |
| |
| return ids, nil |
| } |
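|
| // As a worked example of the reservation scheme above (all numbers illustrative): if the |
| // counter in BT was 100 and getIDs needs 3 ids with, say, numReservedIds == 50, the |
| // ReadModifyWrite bumps the counter to 153; ids 100, 101 and 102 satisfy the request and |
| // ids 103..152 are kept in availIDs for future calls. |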
| |
| // returnIDs can be called with a []digestID of ids that were not actually |
| // assigned to digests. This allows them to be used by future requests to |
| // getIDs. |
| func (b *BTTraceStore) returnIDs(unusedIDs []digestID) { |
| b.availIDsMutex.Lock() |
| defer b.availIDsMutex.Unlock() |
| b.availIDs = append(b.availIDs, unusedIDs...) |
| } |
| |
| // getOrAddDigests fills the given digestMap with the given digests |
| // assigned to a DigestID if they don't already have an assignment. |
| // This is a helper function for updateDigestMap |
| // TODO(kjlubick): This currently makes a lot of requests to BT - |
| // Should there be some caching done here to prevent that? |
| func (b *BTTraceStore) getOrAddDigests(ctx context.Context, digests []types.Digest, digestMap *digestMap) (*digestMap, error) { |
| defer metrics2.FuncTimer().Stop() |
| availIDs, err := b.getIDs(ctx, len(digests)) |
| if err != nil { |
| return nil, err |
| } |
| |
| now := bigtable.Time(time.Now()) |
| newIDMapping := make(map[types.Digest]digestID, len(digests)) |
| unusedIDs := make([]digestID, 0, len(availIDs)) |
| for idx, digest := range digests { |
| idVal := availIDs[idx] |
| if _, err := digestMap.ID(digest); err == nil { |
| // digestMap already has a mapping for this digest, no need to check |
| // if BT has seen it yet (because it has). |
| // Should never happen because we've already done this check in updateDigestMap. |
| unusedIDs = append(unusedIDs, idVal) |
| continue |
| } |
| rowName, colName := b.rowAndColNameFromDigest(digest) |
| // This mutation says "Add an entry to the map for digest -> idVal iff |
| // the digest doesn't already have a mapping". |
| addMut := bigtable.NewMutation() |
| addMut.Set(digestMapFamily, colName, now, []byte(strconv.FormatInt(int64(idVal), 10))) |
| filter := bigtable.ColumnFilter(colName) |
| // Note that we only add the value if filter is false, i.e. the column does not |
| // already exist. |
| condMut := bigtable.NewCondMutation(filter, nil, addMut) |
| var digestAlreadyHadId bool |
| if err := b.table.Apply(ctx, rowName, condMut, bigtable.GetCondMutationResult(&digestAlreadyHadId)); err != nil { |
| return nil, skerr.Fmt("could not check if row %s col %s already had a DigestID: %s", rowName, colName, err) |
| } |
| |
| // We didn't need this ID so let's re-use it later. |
| if digestAlreadyHadId { |
| unusedIDs = append(unusedIDs, idVal) |
| } else { |
| newIDMapping[digest] = idVal |
| } |
| } |
| |
| // If all ids were added to BT, then we know our newIDMapping can simply be added |
| // to what we already have, since there were no collisions between digests and what |
| // was in the table already. |
| if len(unusedIDs) == 0 { |
| if err := digestMap.Add(newIDMapping); err != nil { |
| return nil, err |
| } |
| return digestMap, nil |
| } |
| // At this point, some of the digests already had ids, so we should reload |
| // the entire digestMap to make sure we have the full picture. |
| // TODO(kjlubick): Can we not just add what new ones we saw to what we already have? |
| |
| // Return the unused IDs for later use. |
| b.returnIDs(unusedIDs) |
| return b.getDigestMap(ctx) |
| } |
| |
| // updateDigestMap returns the current global DigestMap after making sure the given |
| // digests are a part of it. |
| func (b *BTTraceStore) updateDigestMap(ctx context.Context, digests types.DigestSet) (*digestMap, error) { |
| defer metrics2.FuncTimer().Stop() |
| // Load the digest map from BT. |
| // TODO(kjlubick): should we cache this map and first check to see if the digests |
| // are all in there? |
| digestMap, err := b.getDigestMap(ctx) |
| if err != nil { |
| return nil, err |
| } |
| |
| delta := digestMap.Delta(digests) |
| if len(delta) == 0 { |
| return digestMap, nil |
| } |
| |
| return b.getOrAddDigests(ctx, delta, digestMap) |
| } |
| |
| // Copied from btts.go in infra/perf |
| |
| // updateOrderedParamSet will add all params from 'p' to the OrderedParamSet |
| // for 'tileKey' and write it back to BigTable. |
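| // The write is guarded by a compare-and-swap style conditional mutation: the new OPS is only |
| // written if the stored OPS hash still matches the one we read, so a concurrent writer makes |
| // the loop below re-read and retry rather than silently losing an update. |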
| func (b *BTTraceStore) updateOrderedParamSet(ctx context.Context, tileKey tileKey, p paramtools.ParamSet) (*paramtools.OrderedParamSet, error) { |
| defer metrics2.FuncTimer().Stop() |
| |
| tctx, cancel := context.WithTimeout(ctx, writeTimeout) |
| defer cancel() |
| var newEntry *opsCacheEntry |
| for { |
| // Get OPS. |
| entry, existsInBT, err := b.getOPS(ctx, tileKey) |
| if err != nil { |
| return nil, skerr.Fmt("failed to get OPS: %s", err) |
| } |
| |
| // If the OPS contains our paramset then we're done. |
| if delta := entry.ops.Delta(p); len(delta) == 0 { |
| return entry.ops, nil |
| } |
| |
| // Create a new updated ops. |
| ops := entry.ops.Copy() |
| ops.Update(p) |
| newEntry, err = opsCacheEntryFromOPS(ops) |
| if err != nil { |
| return nil, skerr.Fmt("failed to create cache entry: %s", err) |
| } |
| encodedOps, err := newEntry.ops.Encode() |
| if err != nil { |
| return nil, skerr.Fmt("failed to encode new ops: %s", err) |
| } |
| |
| now := bigtable.Time(time.Now()) |
| condTrue := false |
| if existsInBT { |
| // Create an update that avoids the lost update problem. |
| cond := bigtable.ChainFilters( |
| bigtable.LatestNFilter(1), |
| bigtable.FamilyFilter(opsFamily), |
| bigtable.ColumnFilter(opsHashColumn), |
| bigtable.ValueFilter(string(entry.hash)), |
| ) |
| updateMutation := bigtable.NewMutation() |
| updateMutation.Set(opsFamily, opsHashColumn, now, []byte(newEntry.hash)) |
| updateMutation.Set(opsFamily, opsOpsColumn, now, encodedOps) |
| |
| // Add a mutation that cleans up old versions. |
| before := bigtable.Time(now.Time().Add(-1 * time.Second)) |
| updateMutation.DeleteTimestampRange(opsFamily, opsHashColumn, 0, before) |
| updateMutation.DeleteTimestampRange(opsFamily, opsOpsColumn, 0, before) |
| condUpdate := bigtable.NewCondMutation(cond, updateMutation, nil) |
| |
| if err := b.table.Apply(tctx, tileKey.OpsRowName(), condUpdate, bigtable.GetCondMutationResult(&condTrue)); err != nil { |
| sklog.Warningf("Failed to apply: %s", err) |
| return nil, err |
| } |
| |
| // If !condTrue then we need to try again, |
| // and clear our local cache. |
| if !condTrue { |
| sklog.Warningf("Exists !condTrue - clearing cache and trying again.") |
| b.opsCache.Delete(tileKey.OpsRowName()) |
| continue |
| } |
| } else { |
| // Create an update that only works if the ops entry doesn't exist yet. |
| // I.e. only apply the mutation if the HASH column doesn't exist for this row. |
| cond := bigtable.ChainFilters( |
| bigtable.FamilyFilter(opsFamily), |
| bigtable.ColumnFilter(opsHashColumn), |
| ) |
| updateMutation := bigtable.NewMutation() |
| updateMutation.Set(opsFamily, opsHashColumn, now, []byte(newEntry.hash)) |
| updateMutation.Set(opsFamily, opsOpsColumn, now, encodedOps) |
| |
| condUpdate := bigtable.NewCondMutation(cond, nil, updateMutation) |
| if err := b.table.Apply(tctx, tileKey.OpsRowName(), condUpdate, bigtable.GetCondMutationResult(&condTrue)); err != nil { |
| sklog.Warningf("Failed to apply: %s", err) |
| // clear cache and try again |
| b.opsCache.Delete(tileKey.OpsRowName()) |
| continue |
| } |
| |
| // If condTrue then we need to try again, |
| // and clear our local cache. |
| if condTrue { |
| sklog.Warningf("First Write condTrue - clearing cache and trying again.") |
| b.opsCache.Delete(tileKey.OpsRowName()) |
| continue |
| } |
| } |
| |
| // Successfully wrote OPS, so update the cache. |
| if b.cacheOps { |
| b.opsCache.Store(tileKey.OpsRowName(), newEntry) |
| } |
| break |
| } |
| return newEntry.ops, nil |
| } |
| |
| // getOPS returns the opsCacheEntry for a given tile. |
| // |
| // Note that it will create a new opsCacheEntry if none exists. |
| // |
| // The second return value is false if the OPS was not found in BT, true otherwise (even if |
| // the entry came from the cache). |
| func (b *BTTraceStore) getOPS(ctx context.Context, tileKey tileKey) (*opsCacheEntry, bool, error) { |
| defer metrics2.FuncTimer().Stop() |
| if b.cacheOps { |
| entry, ok := b.opsCache.Load(tileKey.OpsRowName()) |
| if ok { |
| return entry.(*opsCacheEntry), true, nil |
| } |
| } |
| tctx, cancel := context.WithTimeout(ctx, readTimeout) |
| defer cancel() |
| row, err := b.table.ReadRow(tctx, tileKey.OpsRowName(), bigtable.RowFilter(bigtable.LatestNFilter(1))) |
| if err != nil { |
| return nil, false, skerr.Fmt("failed to read OPS from BigTable for %s: %s", tileKey.OpsRowName(), err) |
| } |
| // If there is no entry in BigTable then return an empty OPS. |
| if len(row) == 0 { |
| sklog.Warningf("Failed to read OPS from BT for %s.", tileKey.OpsRowName()) |
| entry, err := newOpsCacheEntry() |
| return entry, false, err |
| } |
| entry, err := newOpsCacheEntryFromRow(row) |
| if err == nil && b.cacheOps { |
| b.opsCache.Store(tileKey.OpsRowName(), entry) |
| } |
| return entry, true, err |
| } |
| |
| // makeTileCommits creates a slice of tiling.Commit from the given git hashes. |
| // Specifically, we need to look up the details to get the author information. |
| func (b *BTTraceStore) makeTileCommits(ctx context.Context, hashes []string) ([]*tiling.Commit, error) { |
| longCommits, err := b.vcs.DetailsMulti(ctx, hashes, false) |
| if err != nil { |
| // List hashes last in case the message gets truncated, since the list can be quite long. |
| return nil, skerr.Fmt("could not fetch commit data: %s (hashes: %q)", err, hashes) |
| } |
| |
| commits := make([]*tiling.Commit, len(hashes)) |
| for i, lc := range longCommits { |
| if lc == nil { |
| return nil, skerr.Fmt("commit %s not found from VCS", hashes[i]) |
| } |
| commits[i] = &tiling.Commit{ |
| Hash: lc.Hash, |
| Author: lc.Author, |
| CommitTime: lc.Timestamp.Unix(), |
| } |
| } |
| return commits, nil |
| } |
| |
| // Make sure BTTraceStore fulfills the TraceStore interface. |
| var _ tracestore.TraceStore = (*BTTraceStore)(nil) |