blob: b76f3ed8c52f292a0dedf5410d42503f2e4fc635 [file] [log] [blame]
package bt_gitstore
// The bt_gitstore package implements a way to store Git metadata in BigTable for faster retrieval
// than requiring a local checkout.
import (
"bytes"
"context"
"encoding/binary"
"encoding/json"
"fmt"
"hash/crc32"
"strconv"
"strings"
"sync/atomic"
"time"
"cloud.google.com/go/bigtable"
"go.skia.org/infra/go/git"
"go.skia.org/infra/go/gitstore"
"go.skia.org/infra/go/metrics2"
"go.skia.org/infra/go/skerr"
"go.skia.org/infra/go/sklog"
"go.skia.org/infra/go/util"
"go.skia.org/infra/go/vcsinfo"
"golang.org/x/sync/errgroup"
)
const (
	// DEPRECATED_TABLE_ID is a table ID this implementation refuses to use;
	// New returns an error when the config points at it.
	DEPRECATED_TABLE_ID = "git-repos"

	// Names of the counters tracking BigTable requests and rows, read and
	// written, tagged per project/instance/table/repo.
	METRIC_BT_REQS_READ  = "bt_gitstore_reqs_read"
	METRIC_BT_REQS_WRITE = "bt_gitstore_reqs_write"
	METRIC_BT_ROWS_READ  = "bt_gitstore_rows_read"
	METRIC_BT_ROWS_WRITE = "bt_gitstore_rows_write"
)
// BigTableGitStore implements the GitStore interface based on BigTable.
type BigTableGitStore struct {
	// RepoID is the numeric ID assigned to this repo in the metadata rows.
	RepoID int64
	// RepoURL is the normalized URL identifying the repository.
	RepoURL string
	// shards is the number of shards that sharded row types are spread across.
	shards uint32
	// writeGoroutines is the number of concurrent BT writers used by Put.
	writeGoroutines int
	// table is the BigTable table holding all rows for this store.
	table *bigtable.Table
	// Counters tracking BigTable requests and rows, read and written.
	metricReqsRead  metrics2.Counter
	metricReqsWrite metrics2.Counter
	metricRowsRead  metrics2.Counter
	metricRowsWrite metrics2.Counter
}
// New returns an instance of GitStore that uses BigTable as its backend storage.
// The given repoURL serves to identify the repository. Internally it is stored normalized
// via a call to git.NormalizeURL.
func New(ctx context.Context, config *BTConfig, repoURL string) (*BigTableGitStore, error) {
	if config.TableID == DEPRECATED_TABLE_ID {
		return nil, skerr.Fmt("This implementation of BigTableGitStore cannot be used with deprecated table %q", DEPRECATED_TABLE_ID)
	}
	if config.AppProfile == "" {
		return nil, skerr.Fmt("BTConfig.AppProfile is required.")
	}

	// Create the client.
	client, err := bigtable.NewClientWithConfig(ctx, config.ProjectID, config.InstanceID, bigtable.ClientConfig{
		AppProfile: config.AppProfile,
	})
	if err != nil {
		return nil, skerr.Wrapf(err, "creating bigtable client (project: %s; instance: %s)", config.ProjectID, config.InstanceID)
	}

	// Normalize the URL into a separate variable so the error message can
	// reference the caller-supplied value: on failure git.NormalizeURL
	// returns an empty string, and assigning it to repoURL before the check
	// (as the previous code did) made the %q in the message useless.
	normURL, err := git.NormalizeURL(repoURL)
	if err != nil {
		return nil, skerr.Wrapf(err, "normalizing URL %q", repoURL)
	}
	repoURL = normURL

	// Fall back to defaults for any tuning options that are unset.
	shards := config.Shards
	if shards <= 0 {
		shards = DefaultShards
	}
	writeGoroutines := config.WriteGoroutines
	if writeGoroutines <= 0 {
		writeGoroutines = DefaultWriteGoroutines
	}

	// Metrics are tagged per project/instance/table/repo.
	tags := map[string]string{
		"project":  config.ProjectID,
		"instance": config.InstanceID,
		"table":    config.TableID,
		"repo":     repoURL,
	}
	ret := &BigTableGitStore{
		table:           client.Open(config.TableID),
		shards:          uint32(shards),
		RepoURL:         repoURL,
		writeGoroutines: writeGoroutines,
		metricReqsRead:  metrics2.GetCounter(METRIC_BT_REQS_READ, tags),
		metricReqsWrite: metrics2.GetCounter(METRIC_BT_REQS_WRITE, tags),
		metricRowsRead:  metrics2.GetCounter(METRIC_BT_ROWS_READ, tags),
		metricRowsWrite: metrics2.GetCounter(METRIC_BT_ROWS_WRITE, tags),
	}

	// Look up (or create) the repo's metadata row to obtain its numeric ID.
	repoInfo, err := ret.loadRepoInfo(ctx, true)
	if err != nil {
		return nil, skerr.Wrapf(err, "getting initial repo info for %s (project %s; instance: %s)", repoURL, config.ProjectID, config.InstanceID)
	}
	ret.RepoID = repoInfo.ID
	return ret, nil
}
const (
	// Column families.
	cfBranches = "B" // branch pointers, stored on the repo info row.
	cfCommit   = "C" // LongCommit data and index-commit rows.
	cfTsCommit = "T" // timestamp-keyed index-commit rows.
	cfMeta     = "M" // repo metadata (repo ID, ID counter).

	// meta data columns.
	colMetaID        = "metaID"
	colMetaIDCounter = "metaIDCounter"

	// Keys of meta data rows.
	metaVarRepo      = "repo"
	metaVarIDCounter = "idcounter"

	// index commit: single column holding "<hash>#<unix_seconds>".
	colHashTs = "ht"

	// long commit columns.
	colAuthor   = "a"
	colSubject  = "s"
	colParents  = "p"
	colBody     = "b"
	colBranches = "br"
	colHash     = "h"
	colIndex    = "i"

	// Define the row types (the segment of the row name after the branch).
	typIndex     = "i" // rows keyed by commit index.
	typTimeStamp = "z" // rows keyed by timestamp.
	typCommit    = "k" // rows keyed by commit hash.
	typMeta      = "!" // unsharded metadata rows.

	// getBatchSize is the batchsize for the Get operation. Each call to bigtable is made with maximally
	// this number of git hashes. This is a conservative number to stay within the 1M request
	// size limit. Since requests are sharded this will not limit throughput in practice.
	getBatchSize = 5000

	// writeBatchSize is the number of mutations to write at once. This could be fine tuned for different
	// row types.
	writeBatchSize = 1000

	// DefaultShards is the default number of shards used, if no shards are provided in BTConfig.
	DefaultShards = 32

	// DefaultWriteGoroutines defines the maximum number of goroutines
	// used to write to BigTable concurrently, if not provided in BTConfig.
	// This number was shown to keep memory usage reasonably low while still
	// providing decent throughput.
	DefaultWriteGoroutines = 100
)
// rowMutation is a mutation for a single BT row: the row name paired with the
// mutation to apply to it.
type rowMutation struct {
	// row is the full BT row name.
	row string
	// mut is the mutation to apply to that row.
	mut *bigtable.Mutation
}
// Put implements the GitStore interface.
//
// The work is pipelined: a single producer goroutine converts the commits into
// BT row mutations and feeds them into a channel, while b.writeGoroutines
// workers drain the channel and write the mutations in batches of
// writeBatchSize. After all rows are written, the ALL_BRANCHES branch pointer
// is advanced to the last commit in the slice.
func (b *BigTableGitStore) Put(ctx context.Context, commits []*vcsinfo.LongCommit) error {
	if len(commits) == 0 {
		return nil
	}
	// Spin up a goroutine to create mutations for commits.
	mutations := make(chan rowMutation, writeBatchSize)
	var egroup errgroup.Group
	egroup.Go(func() error {
		// Closing the channel lets the writer goroutines below terminate.
		defer func() {
			close(mutations)
		}()
		// Create IndexCommits mutations for each branch for each commit.
		for i, c := range commits {
			// Validation.
			if c.Index == 0 && len(c.Parents) != 0 {
				return skerr.Fmt("Commit %s has index zero but has at least one parent. This cannot be correct.", c.Hash)
			}
			if len(c.Branches) == 0 {
				// TODO(borenet): Is there any way to check for this?
				sklog.Warningf("Commit %s has no branch information; this is valid if it is not on the first-parent ancestry chain of any branch.", c.Hash)
			}
			// LongCommit mutation.
			mut, err := b.mutationForLongCommit(c)
			if err != nil {
				return skerr.Wrapf(err, "Failed to Put commits; failed to create mutation.")
			}
			mutations <- mut
			// Create the IndexCommit. Index and timestamp rows are written
			// both under the synthetic ALL_BRANCHES branch and under each
			// real branch of the commit.
			ic := c.IndexCommit()
			mutations <- b.mutationForIndexCommit(gitstore.ALL_BRANCHES, ic)
			mutations <- b.mutationForTimestampCommit(gitstore.ALL_BRANCHES, ic)
			for branch := range c.Branches {
				if branch != gitstore.ALL_BRANCHES {
					mutations <- b.mutationForIndexCommit(branch, ic)
					mutations <- b.mutationForTimestampCommit(branch, ic)
				}
			}
			if i%1000 == 0 {
				sklog.Infof("Created mutations for %d of %d commits.", i+1, len(commits))
			}
		}
		return nil
	})

	// Spin up workers to write to BT.
	empty := ""
	var egroup2 errgroup.Group
	for i := 0; i < b.writeGoroutines; i++ {
		egroup2.Go(func() error {
			rows := make([]string, 0, writeBatchSize)
			muts := make([]*bigtable.Mutation, 0, writeBatchSize)
			for rowMut := range mutations {
				rows = append(rows, rowMut.row)
				muts = append(muts, rowMut.mut)
				if len(rows) == writeBatchSize {
					if err := b.applyBulk(ctx, rows, muts); err != nil {
						return skerr.Wrapf(err, "Failed to write commits.")
					}
					// Reuse the buffers. We need to clear
					// out the values so that the underlying
					// elements can be GC'd.
					for i := 0; i < len(rows); i++ {
						rows[i] = empty
						muts[i] = nil
					}
					rows = rows[:0]
					muts = muts[:0]
				}
			}
			// Flush the final partial batch after the channel closes.
			if len(rows) > 0 {
				if err := b.applyBulk(ctx, rows, muts); err != nil {
					return skerr.Wrapf(err, "Failed to write commits.")
				}
			}
			return nil
		})
	}

	// Wait for the inserts to finish.
	insertErr := egroup2.Wait()
	if insertErr != nil {
		// We need to consume all of the mutations so that the first
		// goroutine can exit (its channel sends would otherwise block).
		for range mutations {
		}
	}
	generateErr := egroup.Wait()
	if insertErr != nil && generateErr != nil {
		return skerr.Wrapf(generateErr, "Failed to generate BT mutations, and failed to apply with: %s", insertErr)
	} else if insertErr != nil {
		return skerr.Wrapf(insertErr, "Failed to apply BT mutations.")
	} else if generateErr != nil {
		return skerr.Wrapf(generateErr, "Failed to generate BT mutations.")
	}

	// Write the ALL_BRANCHES branch pointer.
	lastCommit := commits[len(commits)-1]
	ic := &vcsinfo.IndexCommit{
		Hash:      lastCommit.Hash,
		Index:     lastCommit.Index,
		Timestamp: lastCommit.Timestamp,
	}
	return b.putBranchPointer(ctx, getRepoInfoRowName(b.RepoURL), gitstore.ALL_BRANCHES, ic)
}
// mutationForLongCommit wraps the BT mutation for the given LongCommit
// together with its commit-keyed row name.
func (b *BigTableGitStore) mutationForLongCommit(commit *vcsinfo.LongCommit) (rowMutation, error) {
	m, err := b.getCommitMutation(commit)
	if err != nil {
		return rowMutation{}, skerr.Wrapf(err, "Failed to create BT mutation")
	}
	rm := rowMutation{
		row: b.rowName("", typCommit, commit.Hash),
		mut: m,
	}
	return rm, nil
}
// mutationForIndexCommit returns a rowMutation for the given IndexCommit,
// keyed by its index within the branch.
func (b *BigTableGitStore) mutationForIndexCommit(branch string, commit *vcsinfo.IndexCommit) rowMutation {
	// Git has a timestamp resolution of 1s, so Unix seconds are sufficient.
	hashTs := fmt.Sprintf("%s#%d", commit.Hash, commit.Timestamp.Unix())
	pair := [2]string{colHashTs, hashTs}
	return rowMutation{
		row: b.rowName(branch, typIndex, sortableIndex(commit.Index)),
		mut: b.simpleMutation(cfCommit, pair),
	}
}
// mutationForTimestampCommit returns a rowMutation for the given IndexCommit,
// keyed by its timestamp; the column name is the hash and the value the index.
func (b *BigTableGitStore) mutationForTimestampCommit(branch string, commit *vcsinfo.IndexCommit) rowMutation {
	pair := [2]string{commit.Hash, sortableIndex(commit.Index)}
	return rowMutation{
		row: b.rowName(branch, typTimeStamp, sortableTimestamp(commit.Timestamp)),
		mut: b.simpleMutation(cfTsCommit, pair),
	}
}
// apply wraps b.table.Apply, counting the request and the single written row.
func (b *BigTableGitStore) apply(ctx context.Context, row string, mut *bigtable.Mutation) error {
	for _, c := range []metrics2.Counter{b.metricReqsWrite, b.metricRowsWrite} {
		c.Inc(1)
	}
	return b.table.Apply(ctx, row, mut)
}
// applyBulk is a helper function for b.table.ApplyBulk which also updates the
// write metrics. rows and muts must be parallel slices.
func (b *BigTableGitStore) applyBulk(ctx context.Context, rows []string, muts []*bigtable.Mutation) error {
	b.metricReqsWrite.Inc(1)
	b.metricRowsWrite.Inc(int64(len(rows)))
	errs, err := b.table.ApplyBulk(ctx, rows, muts)
	if err != nil {
		// Whole-request failure. Use Wrapf (rather than Fmt with %s, as the
		// old code did) so the error chain and stack are preserved, matching
		// the rest of this file.
		return skerr.Wrapf(err, "writing batch of %d mutations", len(rows))
	}
	if errs != nil {
		// Per-row failures; errs[i] corresponds to rows[i].
		return skerr.Fmt("Error writing some portions of batch: %s", errs)
	}
	return nil
}
// applyReadModifyWrite wraps b.table.ApplyReadModifyWrite. A read-modify-write
// counts as one read and one write of a single row.
func (b *BigTableGitStore) applyReadModifyWrite(ctx context.Context, row string, m *bigtable.ReadModifyWrite) (bigtable.Row, error) {
	for _, c := range []metrics2.Counter{b.metricReqsWrite, b.metricRowsWrite, b.metricReqsRead, b.metricRowsRead} {
		c.Inc(1)
	}
	return b.table.ApplyReadModifyWrite(ctx, row, m)
}
// readRow wraps b.table.ReadRow, counting the request and the single row read.
func (b *BigTableGitStore) readRow(ctx context.Context, row string, opts ...bigtable.ReadOption) (bigtable.Row, error) {
	for _, c := range []metrics2.Counter{b.metricReqsRead, b.metricRowsRead} {
		c.Inc(1)
	}
	return b.table.ReadRow(ctx, row, opts...)
}
// readRows wraps b.table.ReadRows, counting the request and every row that
// streams back before handing it to the caller's callback.
func (b *BigTableGitStore) readRows(ctx context.Context, arg bigtable.RowSet, f func(bigtable.Row) bool, opts ...bigtable.ReadOption) error {
	b.metricReqsRead.Inc(1)
	counted := func(row bigtable.Row) bool {
		b.metricRowsRead.Inc(1)
		return f(row)
	}
	return b.table.ReadRows(ctx, arg, counted, opts...)
}
// Get implements the GitStore interface.
//
// It returns the LongCommits for the given hashes in the same order as the
// input slice; hashes not found in BigTable yield nil entries.
func (b *BigTableGitStore) Get(ctx context.Context, hashes []string) ([]*vcsinfo.LongCommit, error) {
	// hashOrder tracks the original index(es) of each hash in the passed-in
	// slice. It is used to ensure that we return the LongCommits in the
	// desired order, despite our receiving them from BT in arbitrary order.
	hashOrder := make(map[string][]int, len(hashes))
	for idx, h := range hashes {
		hashOrder[h] = append(hashOrder[h], idx)
	}
	rowNames := make(bigtable.RowList, 0, len(hashOrder))
	for h := range hashOrder {
		rowNames = append(rowNames, b.rowName("", typCommit, h))
	}
	tempRet := make([]*vcsinfo.LongCommit, len(rowNames))
	prefix := cfCommit + ":"
	// Read the rows in parallel chunks of getBatchSize, decoding each row
	// into a LongCommit.
	err := util.ChunkIterParallel(ctx, len(rowNames), getBatchSize, func(ctx context.Context, bStart, bEnd int) error {
		bRowNames := rowNames[bStart:bEnd]
		batchIdx := int64(bStart - 1)
		err := b.readRows(ctx, bRowNames, func(row bigtable.Row) bool {
			longCommit := vcsinfo.NewLongCommit()
			longCommit.Hash = keyFromRowName(row.Key())
			for _, col := range row[cfCommit] {
				switch strings.TrimPrefix(col.Column, prefix) {
				case colHash:
					// The commit timestamp is stored as the cell timestamp
					// of the hash column (see getCommitMutation).
					longCommit.Timestamp = col.Timestamp.Time().UTC()
				case colAuthor:
					longCommit.Author = string(col.Value)
				case colSubject:
					longCommit.Subject = string(col.Value)
				case colParents:
					if len(col.Value) > 0 {
						longCommit.Parents = strings.Split(string(col.Value), ":")
					}
				case colBody:
					longCommit.Body = string(col.Value)
				case colBranches:
					if err := json.Unmarshal(col.Value, &longCommit.Branches); err != nil {
						// We don't want to fail forever if there's a bad value in
						// BigTable. Log an error and move on.
						sklog.Errorf("Failed to decode LongCommit branches: %s\nStored value: %s", err, string(col.Value))
					}
				case colIndex:
					index, err := strconv.Atoi(string(col.Value))
					if err != nil {
						// We don't want to fail forever if there's a bad value in
						// BigTable. Log an error and move on. (This message used
						// to incorrectly say "branches".)
						sklog.Errorf("Failed to decode LongCommit index: %s\nStored value: %s", err, string(col.Value))
					}
					longCommit.Index = index
				}
			}
			// Rows arrive in arbitrary order; atomically reserve the next
			// slot in this chunk's portion of tempRet.
			targetIdx := atomic.AddInt64(&batchIdx, 1)
			tempRet[targetIdx] = longCommit
			return true
		})
		if err != nil {
			return skerr.Wrapf(err, "running ReadRows")
		}
		return nil
	})
	if err != nil {
		return nil, skerr.Wrapf(err, "Failed loading commits from BT.")
	}

	// Order the LongCommits to match the passed-in slice of hashes.
	ret := make([]*vcsinfo.LongCommit, len(hashes))
	for _, commit := range tempRet {
		if commit != nil {
			for _, targetIdx := range hashOrder[commit.Hash] {
				ret[targetIdx] = commit
			}
		}
	}
	return ret, nil
}
// PutBranches implements the GitStore interface.
//
// For each entry in branches, the branch pointer is updated to the given head
// commit hash, or deleted if the head is gitstore.DELETE_BRANCH. All
// non-deleted heads must already exist in the store.
func (b *BigTableGitStore) PutBranches(ctx context.Context, branches map[string]string) error {
	// Get the commits pointed to by the branches.
	hashes := make([]string, 0, len(branches))
	for _, head := range branches {
		if head != gitstore.DELETE_BRANCH {
			hashes = append(hashes, head)
		}
	}
	longCommits, err := b.Get(ctx, hashes)
	if err != nil {
		return skerr.Wrapf(err, "Failed to retrieve branch heads.")
	}
	// Build the IndexCommits for the new heads; Get returns nil for a hash
	// that is missing from the store, which is an error here.
	indexCommitsByHash := make(map[string]*vcsinfo.IndexCommit, len(longCommits))
	for idx, c := range longCommits {
		if c == nil {
			return skerr.Fmt("Commit %s is missing from GitStore", hashes[idx])
		}
		indexCommitsByHash[c.Hash] = &vcsinfo.IndexCommit{
			Hash:      c.Hash,
			Index:     c.Index,
			Timestamp: c.Timestamp,
		}
	}
	// Update or delete all branch pointers concurrently.
	var egroup errgroup.Group
	for name, head := range branches {
		// https://golang.org/doc/faq#closures_and_goroutines
		name := name
		head := head
		egroup.Go(func() error {
			if head == gitstore.DELETE_BRANCH {
				return b.deleteBranchPointer(ctx, name)
			} else {
				return b.putBranchPointer(ctx, getRepoInfoRowName(b.RepoURL), name, indexCommitsByHash[head])
			}
		})
	}
	if err := egroup.Wait(); err != nil {
		return skerr.Fmt("Error updating branches: %s", err)
	}
	return nil
}
// GetBranches implements the GitStore interface. It returns the branch
// pointers stored in the repo's metadata row.
func (b *BigTableGitStore) GetBranches(ctx context.Context) (map[string]*gitstore.BranchPointer, error) {
	info, err := b.loadRepoInfo(ctx, false)
	if err != nil {
		return nil, err
	}
	return info.Branches, nil
}
// RangeByTime implements the GitStore interface. It returns the IndexCommits
// with timestamps in [start, end) on the given branch.
func (b *BigTableGitStore) RangeByTime(ctx context.Context, start, end time.Time, branch string) ([]*vcsinfo.IndexCommit, error) {
	startTS := sortableTimestamp(start)
	endTS := sortableTimestamp(end)

	// If a branch was supplied, retrieve the pointer. This runs concurrently
	// with the sharded range scan below.
	var branchPtr *gitstore.BranchPointer
	var egroup errgroup.Group
	if branch != gitstore.ALL_BRANCHES {
		egroup.Go(func() error {
			branches, err := b.GetBranches(ctx)
			if err != nil {
				return err
			}
			branchPtr = branches[branch]
			return nil
		})
	}

	result := newSRTimestampCommits(b.shards)
	// Note that we do NOT use a LatestN filter here, because that would
	// result in incomplete results in the case of commits which have the
	// same timestamp. Git has a timestamp resolution of one second, which
	// makes this likely, especially in tests.
	filters := []bigtable.Filter{bigtable.FamilyFilter(cfTsCommit)}
	err := b.iterShardedRange(ctx, branch, typTimeStamp, startTS, endTS, filters, result)
	if err != nil {
		return nil, err
	}
	indexCommits, timestamps := result.Sorted()

	// Filter out results which do not belong on the given branch.
	if err := egroup.Wait(); err != nil {
		return nil, skerr.Wrapf(err, "Failed to retrieve branch pointer for %s", branch)
	}
	if branchPtr == nil && branch != gitstore.ALL_BRANCHES {
		// If we don't know about the requested branch, return nil even
		// if we found IndexCommits. This is correct behavior for
		// deleted branches, because we don't delete the IndexCommits.
		return nil, nil
	}
	if branchPtr != nil {
		// Group the results by index, dropping anything beyond the branch
		// head, so duplicates from history rewrites can be resolved.
		filtered := make(map[int][]*vcsinfo.IndexCommit, len(indexCommits))
		for _, ic := range indexCommits {
			if ic.Index <= branchPtr.Index {
				filtered[ic.Index] = append(filtered[ic.Index], ic)
			}
		}
		indexCommits = make([]*vcsinfo.IndexCommit, 0, len(filtered))
		for idx := 0; idx <= branchPtr.Index; idx++ {
			commits, ok := filtered[idx]
			if !ok {
				// NOTE(review): this errors whenever the time window excludes
				// some index <= the branch head (e.g. a window not reaching
				// back to commit 0); it appears to assume the window covers
				// the branch's whole first-parent history — confirm with
				// callers. RangeN tolerates gaps instead.
				return nil, skerr.Fmt("Missing index %d for branch %s.", idx, branch)
			}
			if len(commits) == 1 {
				indexCommits = append(indexCommits, commits[0])
			} else {
				// More than one commit for this index: history was rewritten.
				// Keep the commit most recently inserted into BT.
				sklog.Warningf("History was changed for branch %s. Deduplicating by last insertion into BT.", branch)
				var mostRecent *vcsinfo.IndexCommit
				for _, ic := range commits {
					if mostRecent == nil || timestamps[ic].After(timestamps[mostRecent]) {
						mostRecent = ic
					}
				}
				indexCommits = append(indexCommits, mostRecent)
			}
		}
	}
	return indexCommits, nil
}
// RangeN implements the GitStore interface. It returns the IndexCommits with
// indices in [startIndex, endIndex) on the given branch.
func (b *BigTableGitStore) RangeN(ctx context.Context, startIndex, endIndex int, branch string) ([]*vcsinfo.IndexCommit, error) {
	startIdx := sortableIndex(startIndex)
	endIdx := sortableIndex(endIndex)

	// If a branch was supplied, retrieve the pointer. This runs concurrently
	// with the sharded range scan below.
	var branchPtr *gitstore.BranchPointer
	var egroup errgroup.Group
	if branch != gitstore.ALL_BRANCHES {
		egroup.Go(func() error {
			branches, err := b.GetBranches(ctx)
			if err != nil {
				return err
			}
			branchPtr = branches[branch]
			return nil
		})
	}

	result := newSRIndexCommits(b.shards)
	filters := []bigtable.Filter{bigtable.FamilyFilter(cfCommit), bigtable.LatestNFilter(1)}
	err := b.iterShardedRange(ctx, branch, typIndex, startIdx, endIdx, filters, result)
	if err != nil {
		return nil, err
	}
	indexCommits, timestamps := result.Sorted()

	// Filter out results which do not belong on the given branch.
	if err := egroup.Wait(); err != nil {
		return nil, skerr.Wrapf(err, "Failed to retrieve branch pointer for %s", branch)
	}
	if branchPtr == nil && branch != gitstore.ALL_BRANCHES {
		// If we don't know about the requested branch, return nil even
		// if we found IndexCommits. This is correct behavior for
		// deleted branches, because we don't delete the IndexCommits.
		return nil, nil
	}
	if branchPtr != nil {
		// Group the results by index, dropping anything beyond the branch
		// head, so duplicates from history rewrites can be resolved.
		filtered := make(map[int][]*vcsinfo.IndexCommit, len(indexCommits))
		for _, ic := range indexCommits {
			if ic.Index <= branchPtr.Index {
				filtered[ic.Index] = append(filtered[ic.Index], ic)
			}
		}
		indexCommits = make([]*vcsinfo.IndexCommit, 0, len(filtered))
		// Unlike RangeByTime, missing indices are silently skipped here; the
		// requested index range need not start at zero.
		for idx := 0; idx <= branchPtr.Index; idx++ {
			commits, ok := filtered[idx]
			if ok {
				if len(commits) == 1 {
					indexCommits = append(indexCommits, commits[0])
				} else {
					// More than one commit for this index: history was
					// rewritten. Keep the commit most recently inserted into BT.
					sklog.Warningf("History was changed for branch %s. Deduplicating by last insertion into BT.", branch)
					var mostRecent *vcsinfo.IndexCommit
					for _, ic := range commits {
						if mostRecent == nil || timestamps[ic].After(timestamps[mostRecent]) {
							mostRecent = ic
						}
					}
					indexCommits = append(indexCommits, mostRecent)
				}
			}
		}
	}
	return indexCommits, nil
}
// loadRepoInfo reads the repo metadata row for b.RepoURL. If the row does not
// exist and create is true, a new repo ID is allocated via an atomic counter
// row and the metadata row is written; if create is false an error is
// returned instead. On success with create, b.RepoID is updated as a side
// effect.
func (b *BigTableGitStore) loadRepoInfo(ctx context.Context, create bool) (*gitstore.RepoInfo, error) {
	// load repo info
	rowName := getRepoInfoRowName(b.RepoURL)
	row, err := b.readRow(ctx, rowName, bigtable.RowFilter(bigtable.LatestNFilter(1)))
	if err != nil {
		return nil, err
	}
	if row != nil {
		return extractRepoInfo(row)
	}

	// If we should not create new repo information, return an error.
	if !create {
		return nil, skerr.Fmt("Repo information for %s not found.", b.RepoURL)
	}

	// Get a new ID from the DB: atomically increment the shared ID counter
	// row and use the resulting value as this repo's ID.
	rmw := bigtable.NewReadModifyWrite()
	rmw.Increment(cfMeta, colMetaIDCounter, 1)
	row, err = b.applyReadModifyWrite(ctx, unshardedRowName(typMeta, metaVarIDCounter), rmw)
	if err != nil {
		return nil, err
	}

	// encID contains the big-endian encoded ID.
	encID := []byte(rowMap(row).GetStr(cfMeta, colMetaIDCounter))
	id := int64(binary.BigEndian.Uint64(encID))

	// Store the new ID in the repo's metadata row.
	mut := bigtable.NewMutation()
	mut.Set(cfMeta, colMetaID, bigtable.ServerTime, encID)
	if err := b.apply(ctx, rowName, mut); err != nil {
		return nil, err
	}

	b.RepoID = id
	return &gitstore.RepoInfo{
		RepoURL:  b.RepoURL,
		ID:       id,
		Branches: map[string]*gitstore.BranchPointer{},
	}, nil
}
// putBranchPointer writes the branch pointer (the HEAD of a branch) to the row
// that stores the repo information. idxCommit is the index commit of the HEAD
// of the branch.
func (b *BigTableGitStore) putBranchPointer(ctx context.Context, repoInfoRowName, branchName string, idxCommit *vcsinfo.IndexCommit) error {
	ts := bigtable.Now()
	m := bigtable.NewMutation()
	// Write the new pointer at ts, then drop every older cell in [0, ts) so
	// the column only ever holds the latest value.
	m.Set(cfBranches, branchName, ts, encBranchPointer(idxCommit.Hash, idxCommit.Index))
	m.DeleteTimestampRange(cfBranches, branchName, 0, ts)
	return b.apply(ctx, repoInfoRowName, m)
}
// deleteBranchPointer removes the cells holding the pointer for the given
// branch from the repo info row.
func (b *BigTableGitStore) deleteBranchPointer(ctx context.Context, branchName string) error {
	m := bigtable.NewMutation()
	m.DeleteCellsInColumn(cfBranches, branchName)
	row := getRepoInfoRowName(b.RepoURL)
	return b.apply(ctx, row, m)
}
// iterShardedRange iterates the keys in the half open interval [startKey, endKey)
// across all shards, triggering as many queries as there are shards. If endKey
// is empty, then startKey is used as a prefix and a Prefix scan is performed.
// The results of the queries are added to the given shardedResults.
func (b *BigTableGitStore) iterShardedRange(ctx context.Context, branch, rowType, startKey, endKey string, filters []bigtable.Filter, result shardedResults) error {
	opts := filtersToReadOptions(filters)
	var egroup errgroup.Group
	// Fire off one query per shard in parallel.
	for s := uint32(0); s < b.shards; s++ {
		shard := s // https://golang.org/doc/faq#closures_and_goroutines
		egroup.Go(func() error {
			defer result.Finish(shard)

			// An empty endKey means startKey is a prefix; otherwise scan the
			// half-open interval [startKey, endKey).
			var rr bigtable.RowRange
			if endKey == "" {
				rr = bigtable.PrefixRange(b.shardedRowName(shard, branch, rowType, startKey))
			} else {
				rr = bigtable.NewRange(
					b.shardedRowName(shard, branch, rowType, startKey),
					b.shardedRowName(shard, branch, rowType, endKey))
			}

			// Stop iterating as soon as Add reports an error; surface it
			// separately since readRows itself won't return it.
			var addErr error
			if err := b.readRows(ctx, rr, func(row bigtable.Row) bool {
				addErr = result.Add(shard, row)
				return addErr == nil
			}, opts...); err != nil {
				return err
			}
			return addErr
		})
	}
	return egroup.Wait()
}
// simpleMutation assembles a mutation that overwrites each given column of
// column family cfFam with its paired value, stamped at server time.
func (b *BigTableGitStore) simpleMutation(cfFam string, colValPairs ...[2]string) *bigtable.Mutation {
	m := bigtable.NewMutation()
	for _, p := range colValPairs {
		col, val := p[0], p[1]
		// Delete any existing cells first so only the new value remains.
		m.DeleteCellsInColumn(cfFam, col)
		m.Set(cfFam, col, bigtable.ServerTime, []byte(val))
	}
	return m
}
// getCommitMutation builds the mutation that writes a LongCommit. Every cell
// is stamped with the commit's own timestamp, which makes the write
// idempotent.
func (b *BigTableGitStore) getCommitMutation(commit *vcsinfo.LongCommit) (*bigtable.Mutation, error) {
	encBranches, err := json.Marshal(commit.Branches)
	if err != nil {
		return nil, err
	}
	ts := bigtable.Time(commit.Timestamp.UTC())
	m := bigtable.NewMutation()
	m.Set(cfCommit, colHash, ts, []byte(commit.Hash))
	m.Set(cfCommit, colAuthor, ts, []byte(commit.Author))
	m.Set(cfCommit, colSubject, ts, []byte(commit.Subject))
	m.Set(cfCommit, colParents, ts, []byte(strings.Join(commit.Parents, ":")))
	m.Set(cfCommit, colBody, ts, []byte(commit.Body))
	m.Set(cfCommit, colBranches, ts, encBranches)
	m.Set(cfCommit, colIndex, ts, []byte(strconv.Itoa(commit.Index)))
	return m, nil
}
// rowName computes the complete BT row name for the tuple (branch, rowType,
// key), deriving the shard from a CRC32 of the key so rows spread evenly
// across shards.
func (b *BigTableGitStore) rowName(branch string, rowType string, key string) string {
	shard := crc32.ChecksumIEEE([]byte(key)) % b.shards
	return b.shardedRowName(shard, branch, rowType, key)
}
// shardedRowName builds the row name from (shard, branch, rowType, key) with
// an explicitly chosen shard.
func (b *BigTableGitStore) shardedRowName(shard uint32, branch, rowType, key string) string {
	// Layout: <2-digit shard>:<4-digit repo ID>:<branch>:<rowType>:<key>.
	const layout = "%02d:%04d:%s:%s:%s"
	return fmt.Sprintf(layout, shard, b.RepoID, branch, rowType, key)
}
// unshardedRowName concatenates parts with ':' without prefixing any sharding
// information. This is for row types that are not sharded.
func unshardedRowName(parts ...string) string {
	var sb strings.Builder
	for i, p := range parts {
		if i > 0 {
			sb.WriteByte(':')
		}
		sb.WriteString(p)
	}
	return sb.String()
}
// getRepoInfoRowName returns the name of the row where the repo meta data is stored based on the repoURL.
func getRepoInfoRowName(repoURL string) string {
return unshardedRowName(typMeta, metaVarRepo, repoURL)
}
// getRepoInfoRowNamePrefix returns the row Name prefix to scan all rows containing repo meta data.
func getRepoInfoRowNamePrefix() string {
return unshardedRowName(typMeta, metaVarRepo)
}
// extractRepoInfo extracts the repo meta data from a row read from the repo
// info row: the branch pointers and the numeric repo ID.
func extractRepoInfo(row bigtable.Row) (*gitstore.RepoInfo, error) {
	rm := rowMap(row)

	// Extract the branch info.
	branchInfo := rm.GetStrMap(cfBranches)
	branches := make(map[string]*gitstore.BranchPointer, len(branchInfo))
	for name, b := range branchInfo {
		bp, err := decBranchPointer([]byte(b))
		if err != nil {
			return nil, skerr.Wrapf(err, "decoding branch pointer for branch %q", name)
		}
		branches[name] = bp
	}

	// Extract the repo ID, which is stored as a big-endian uint64.
	idBytes := []byte(rm.GetStr(cfMeta, colMetaID))
	if len(idBytes) != 8 {
		// Note: the previous version appended an error value to this message,
		// but it was always nil at this point.
		return nil, skerr.Fmt("Error: Got id that's not exactly 8 bytes: '%x'", idBytes)
	}

	ret := &gitstore.RepoInfo{
		RepoURL:  keyFromRowName(row.Key()),
		ID:       int64(binary.BigEndian.Uint64(idBytes)),
		Branches: branches,
	}
	return ret, nil
}
// filtersToReadOptions converts a list of filters to []bigtable.ReadOption,
// injecting a ChainFilters(...) instance when multiple filters are given. By
// returning a slice we can pass 0 - n filters to a ReadRows call.
func filtersToReadOptions(filters []bigtable.Filter) []bigtable.ReadOption {
	switch len(filters) {
	case 0:
		// No filters: an empty (non-nil) slice.
		return []bigtable.ReadOption{}
	case 1:
		return []bigtable.ReadOption{bigtable.RowFilter(filters[0])}
	default:
		// Multiple filters must be chained into a single one.
		return []bigtable.ReadOption{bigtable.RowFilter(bigtable.ChainFilters(filters...))}
	}
}
// keyFromRowName assumes that key segments are separated by ':' and returns
// the last segment, which is the actual key of interest.
func keyFromRowName(rowName string) string {
	if i := strings.LastIndexByte(rowName, ':'); i >= 0 {
		return rowName[i+1:]
	}
	return rowName
}
// sortableTimestamp returns a timestamp as a string (in seconds) that can used as a key in BT.
func sortableTimestamp(ts time.Time) string {
// Convert the timestamp in seconds to a string that is sortable and limit it to 10 digits.
// That is the equivalent of valid timestamps up to November 2286.
return fmt.Sprintf("%010d", util.MinInt64(9999999999, ts.Unix()))
}
// sortableIndex returns an index as a fixed-width string that can be used as
// a key in BT.
func sortableIndex(index int) string {
	// Clamp to 8 digits so the strings sort lexicographically in index order.
	if index > 99999999 {
		index = 99999999
	}
	return fmt.Sprintf("%08d", index)
}
// parseIndex parses an index previously generated with sortableIndex,
// returning -1 if the string is not a valid integer.
func parseIndex(indexStr string) int {
	parsed := -1
	if v, err := strconv.ParseInt(indexStr, 10, 64); err == nil {
		parsed = int(v)
	}
	return parsed
}
// encBranchPointer encodes a hash and an index as "<hash>:" followed by the
// big-endian 64-bit encoding of the index.
func encBranchPointer(hash string, index int) []byte {
	buf := make([]byte, 0, len(hash)+1+8)
	buf = append(buf, hash...)
	buf = append(buf, ':')
	var idx [8]byte
	binary.BigEndian.PutUint64(idx[:], uint64(index))
	return append(buf, idx[:]...)
}
// decBranchPointer decodes a string previously generated by encBranchPointer
// into a BranchPointer containing a head and an index.
func decBranchPointer(encPointer []byte) (*gitstore.BranchPointer, error) {
	parts := bytes.SplitN(encPointer, []byte(":"), 2)
	if len(parts) != 2 || len(parts[1]) != 8 {
		return nil, skerr.Fmt("Received wrong branch pointer. Expected format <commit>:<big_endian_64_bit>")
	}
	head, idx := parts[0], parts[1]
	ret := &gitstore.BranchPointer{
		Head:  string(head),
		Index: int(binary.BigEndian.Uint64(idx)),
	}
	return ret, nil
}
// rowMap is a helper type that wraps around a bigtable.Row and allows
// extracting columns and their values.
type rowMap bigtable.Row
// GetStr extracts the value of colFam:colName as a string from the row. If the
// column doesn't exist it returns "".
func (r rowMap) GetStr(colFamName, colName string) string {
	prefix := colFamName + ":"
	for _, cell := range r[colFamName] {
		name := strings.TrimPrefix(cell.Column, prefix)
		if name != colName {
			continue
		}
		return string(cell.Value)
	}
	return ""
}
// GetStrMap extracts a map[string]string from the row that maps
// column -> value for the given column family.
func (r rowMap) GetStrMap(colFamName string) map[string]string {
	cells := r[colFamName]
	prefix := colFamName + ":"
	out := make(map[string]string, len(cells))
	for _, cell := range cells {
		out[strings.TrimPrefix(cell.Column, prefix)] = string(cell.Value)
	}
	return out
}
// Make sure BigTableGitStore fulfills the GitStore interface.
var _ gitstore.GitStore = (*BigTableGitStore)(nil)