package bt_vcs
import (
"bytes"
"context"
"errors"
"math"
"sort"
"sync"
"time"
"go.skia.org/infra/go/depot_tools"
"go.skia.org/infra/go/eventbus"
"go.skia.org/infra/go/gitiles"
"go.skia.org/infra/go/gitstore"
"go.skia.org/infra/go/gitstore/bt_gitstore"
"go.skia.org/infra/go/skerr"
"go.skia.org/infra/go/sklog"
"go.skia.org/infra/go/util"
"go.skia.org/infra/go/vcsinfo"
"golang.org/x/sync/errgroup"
)
const (
// EV_NEW_GIT_COMMIT is the event that is fired when a previously unseen Git commit is available.
// The event data is of type []*vcsinfo.IndexCommit and contains all new commits
// that have been added since the last event was fired. A minimal consumer sketch follows this const block.
EV_NEW_GIT_COMMIT = "gitstore:new-git-commit"
// defaultWatchInterval is the interval at which we check for new commits being added to the repo.
defaultWatchInterval = time.Second * 10
// nCommits is a parameter for StartTracking that tunes how many
// commits get sent over the event bus when new commits are
// detected.
nCommits = 20
)
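// handleNewCommits is an illustrative sketch (not used by this package) of a subscriber
// callback for EV_NEW_GIT_COMMIT. Only the payload type, []*vcsinfo.IndexCommit, is
// guaranteed by the constant's documentation; how the callback gets registered depends on
// the eventbus.EventBus implementation chosen by the caller.
func handleNewCommits(data interface{}) {
	commits, ok := data.([]*vcsinfo.IndexCommit)
	if !ok {
		sklog.Errorf("Unexpected payload type %T for %s", data, EV_NEW_GIT_COMMIT)
		return
	}
	for _, c := range commits {
		sklog.Infof("New commit %s at index %d (%s)", c.Hash, c.Index, c.Timestamp)
	}
}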
// BigTableVCS implements the vcsinfo.VCS interface based on a BT-backed GitStore.
type BigTableVCS struct {
gitStore gitstore.GitStore
repo *gitiles.Repo
defaultBranch string
secondaryVCS vcsinfo.VCS
secondaryExtractor depot_tools.DEPSExtractor
branchInfo *gitstore.BranchPointer
// This mutex protects detailsCache and indexCommits
mutex sync.RWMutex
// detailsCache is for LongCommits so we don't have to query gitStore every time
detailsCache map[string]*vcsinfo.LongCommit
indexCommits []*vcsinfo.IndexCommit
}
// New returns an instance of vcsinfo.VCS that is backed by the given GitStore and uses the
// gitiles.Repo to retrieve files. Each instance provides an interface to one branch.
// If defaultBranch is gitstore.ALL_BRANCHES all commits in the repository are considered.
// The gitiles.Repo instance is only used to fetch files.
func New(ctx context.Context, gitStore gitstore.GitStore, defaultBranch string, repo *gitiles.Repo) (*BigTableVCS, error) {
if gitStore == nil {
return nil, errors.New("Cannot have nil gitStore")
}
ret := &BigTableVCS{
gitStore: gitStore,
repo: repo,
defaultBranch: defaultBranch,
detailsCache: map[string]*vcsinfo.LongCommit{},
}
if err := ret.Update(ctx, true, false); err != nil {
return nil, skerr.Wrapf(err, "could not perform initial update")
}
return ret, nil
}
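// newTrackedVCS is an illustrative, hypothetical helper (not part of this package's API)
// showing how the pieces fit together: construct a BigTableVCS for a single branch and
// start watching for new commits. The branch name "main" and the pre-configured gs, repo
// and evt values are assumptions of this sketch, not requirements of the package.
func newTrackedVCS(ctx context.Context, gs gitstore.GitStore, repo *gitiles.Repo, evt eventbus.EventBus) (*BigTableVCS, error) {
	vcs, err := New(ctx, gs, "main", repo)
	if err != nil {
		return nil, skerr.Wrapf(err, "creating BigTableVCS for branch main")
	}
	// Publish EV_NEW_GIT_COMMIT events as new commits land on the branch.
	vcs.StartTracking(ctx, evt)
	return vcs, nil
}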
// GetBranch implements the vcsinfo.VCS interface.
func (b *BigTableVCS) GetBranch() string {
return b.defaultBranch
}
// SetSecondaryRepo adds a secondary repository and DEPS extractor to this instance.
// It is not included in the constructor since it is currently only used by the Gold ingesters.
func (b *BigTableVCS) SetSecondaryRepo(secVCS vcsinfo.VCS, extractor depot_tools.DEPSExtractor) {
b.secondaryVCS = secVCS
b.secondaryExtractor = extractor
}
// Update implements the vcsinfo.VCS interface
func (b *BigTableVCS) Update(ctx context.Context, pull, allBranches bool) error {
// Check if we need to pull across all branches.
targetBranch := b.defaultBranch
if allBranches {
targetBranch = gitstore.ALL_BRANCHES
}
// Simulate a pull by fetching the latest head of the target branch.
if pull {
branchHeads, err := b.gitStore.GetBranches(ctx)
if err != nil {
return skerr.Wrap(err)
}
var ok bool
b.branchInfo, ok = branchHeads[targetBranch]
if !ok {
r := (b.gitStore.(*bt_gitstore.BigTableGitStore)).RepoURL
return skerr.Fmt("unable to find branch %q in BigTable repo %s", targetBranch, r)
}
}
// Get all index commits for the current branch.
if err := b.fetchIndexRange(ctx, 0, b.branchInfo.Index+1); err != nil {
return skerr.Wrapf(err, "could not fetch index commits [0,%d]", b.branchInfo.Index+1)
}
// warm the cache of long commits
hashes := func() []string {
b.mutex.RLock()
defer b.mutex.RUnlock()
hashes := make([]string, 0, len(b.indexCommits))
for _, ic := range b.indexCommits {
hashes = append(hashes, ic.Hash)
}
return hashes
}()
_, err := b.DetailsMulti(ctx, hashes, false)
return err
}
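// refreshLoop is an illustrative, hypothetical helper (not part of this package's API)
// showing how a caller might keep a BigTableVCS fresh: it pulls the head of the default
// branch and re-warms the caches on a fixed interval until the context is cancelled.
func refreshLoop(ctx context.Context, vcs *BigTableVCS, interval time.Duration) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			if err := vcs.Update(ctx, true /* pull */, false /* allBranches */); err != nil {
				sklog.Errorf("Periodic update failed: %s", err)
			}
		}
	}
}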
// From implements the vcsinfo.VCS interface
func (b *BigTableVCS) From(start time.Time) []string {
b.mutex.RLock()
defer b.mutex.RUnlock()
// Add a millisecond because we only want commits strictly after the start time. Timestamps
// in Git only have second-level granularity.
found := b.timeRange(start.Add(time.Millisecond), vcsinfo.MaxTime)
ret := make([]string, len(found))
for i, c := range found {
ret[i] = c.Hash
}
return ret
}
// Details implements the vcsinfo.VCS interface
func (b *BigTableVCS) Details(ctx context.Context, hash string, includeBranchInfo bool) (*vcsinfo.LongCommit, error) {
// Check the cache first, being sure to unlock the mutex after so that
// if we have to poll BT for it, we aren't blocking the whole time.
rv := func() *vcsinfo.LongCommit {
b.mutex.RLock()
defer b.mutex.RUnlock()
c, ok := b.detailsCache[hash]
if ok {
// If the caller asked for branch info, only use the cached value if it already
// has branch data; otherwise fall through and query for it. If branch info was
// not requested, the cached value can be returned as-is.
if !includeBranchInfo || len(c.Branches) > 0 {
return c
}
}
return nil
}()
if rv != nil {
return rv, nil
}
// cache miss, so query for it
c, err := b.details(ctx, hash, includeBranchInfo)
if err != nil {
return nil, skerr.Wrapf(err, "could not fetch Details for commit %s", hash)
}
if c != nil {
b.mutex.Lock()
defer b.mutex.Unlock()
b.detailsCache[hash] = c
}
return c, err
}
// DetailsMulti implements the vcsinfo.VCS interface
func (b *BigTableVCS) DetailsMulti(ctx context.Context, hashes []string, includeBranchInfo bool) ([]*vcsinfo.LongCommit, error) {
// Instantiate a list of N nil values so we can directly insert them into the proper index.
rv := make([]*vcsinfo.LongCommit, len(hashes))
missedHashes := []string{}
// missedHashesIdx holds, for each missed hash, its index into the original hashes slice.
missedHashesIdx := []int{}
func() {
b.mutex.RLock()
defer b.mutex.RUnlock()
for i, hash := range hashes {
c, ok := b.detailsCache[hash]
// If the user is querying for branch info,
// we have to check to see if the cached value has it.
if ok && (!includeBranchInfo || len(c.Branches) > 0) {
rv[i] = c
} else {
missedHashesIdx = append(missedHashesIdx, i)
missedHashes = append(missedHashes, hash)
}
}
}()
if len(missedHashes) == 0 {
return rv, nil
}
// bulk fetch the missedHashes. A simpler, but potentially slower, approach
// could just do a b.details() for each commit (parallelized with an errgroup)
commits, err := b.gitStore.Get(ctx, missedHashes)
if err != nil {
return nil, skerr.Wrapf(err, "Get missed hashes %q (superset %q)", missedHashes, hashes)
}
if includeBranchInfo {
branchPointers, err := b.gitStore.GetBranches(ctx)
if err != nil {
return nil, skerr.Wrap(err)
}
// We need to do an individual request per commit for its branch info
var egroup errgroup.Group
for _, c := range commits {
if c != nil {
// Bind 'c' in a closure since each goroutine needs its own copy of the loop variable.
func(c *vcsinfo.LongCommit) {
egroup.Go(func() error {
branches, err := b.getBranchInfo(ctx, c, branchPointers)
if err != nil {
return skerr.Wrapf(err, "getBranchInfo for commit %s", c.Hash)
}
c.Branches = branches
return nil
})
}(c)
}
}
if err := egroup.Wait(); err != nil {
return nil, skerr.Wrapf(err, "Fetching branch info for %q", missedHashes)
}
}
b.mutex.Lock()
defer b.mutex.Unlock()
for i, hash := range missedHashes {
c := commits[i]
if c != nil {
rv[missedHashesIdx[i]] = c
b.detailsCache[hash] = c
}
}
return rv, nil
}
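// logRecentCommits is an illustrative sketch (not part of this package's API) combining
// LastNIndex and DetailsMulti: it fetches the LongCommit details for the n most recent
// IndexCommits in one bulk request and logs them.
func logRecentCommits(ctx context.Context, vcs *BigTableVCS, n int) error {
	recent := vcs.LastNIndex(n)
	hashes := make([]string, 0, len(recent))
	for _, ic := range recent {
		hashes = append(hashes, ic.Hash)
	}
	details, err := vcs.DetailsMulti(ctx, hashes, false /* includeBranchInfo */)
	if err != nil {
		return skerr.Wrapf(err, "fetching details for %d recent commits", len(hashes))
	}
	for _, c := range details {
		if c != nil {
			sklog.Infof("%s committed at %s", c.Hash, c.Timestamp)
		}
	}
	return nil
}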
// details returns all of the metadata we care about for the given commit.
func (b *BigTableVCS) details(ctx context.Context, hash string, includeBranchInfo bool) (*vcsinfo.LongCommit, error) {
commits, err := b.gitStore.Get(ctx, []string{hash})
if err != nil {
return nil, err
}
if len(commits) == 0 || commits[0] == nil {
return nil, skerr.Fmt("Commit %s not found", hash)
}
if includeBranchInfo {
branchPointers, err := b.gitStore.GetBranches(ctx)
if err != nil {
return nil, skerr.Wrap(err)
}
branches, err := b.getBranchInfo(ctx, commits[0], branchPointers)
if err != nil {
return nil, skerr.Wrapf(err, "getting branch info for commit %s", commits[0].Hash)
}
commits[0].Branches = branches
}
return commits[0], nil
}
// getBranchInfo determines which branches contain the given commit 'c'.
// This function can potentially spawn a huge number of goroutines (one per branch).
func (b *BigTableVCS) getBranchInfo(ctx context.Context, c *vcsinfo.LongCommit, allBranches map[string]*gitstore.BranchPointer) (map[string]bool, error) {
ret := make(map[string]bool, len(allBranches))
var mutex sync.Mutex
var egroup errgroup.Group
for branchName := range allBranches {
if branchName != gitstore.ALL_BRANCHES {
func(branchName string) {
egroup.Go(func() error {
// Since we cannot look up a commit in a branch directly we query for all commits that
// occurred at that specific timestamp (Git has second granularity) on the target branch.
// Then we check whether the target commit is returned as part of the result.
commits, err := b.gitStore.RangeByTime(ctx, c.Timestamp, c.Timestamp.Add(time.Second), branchName)
if err != nil {
return skerr.Wrapf(err, "range query for branch %s", branchName)
}
// Iterate over the commits at the given timestamp. Most of the time there should
// only be one commit in a given one-second time range.
for _, idxCommit := range commits {
if idxCommit.Hash == c.Hash {
mutex.Lock()
ret[branchName] = true
mutex.Unlock()
break
}
}
return nil
})
}(branchName)
}
}
if err := egroup.Wait(); err != nil {
return nil, skerr.Wrapf(err, "retrieving branch info for %s", c.Hash)
}
return ret, nil
}
// LastNIndex implements the vcsinfo.VCS interface
func (b *BigTableVCS) LastNIndex(N int) []*vcsinfo.IndexCommit {
b.mutex.RLock()
defer b.mutex.RUnlock()
if N > len(b.indexCommits) {
N = len(b.indexCommits)
}
ret := make([]*vcsinfo.IndexCommit, 0, N)
return append(ret, b.indexCommits[len(b.indexCommits)-N:]...)
}
// Range implements the vcsinfo.VCS interface
func (b *BigTableVCS) Range(begin, end time.Time) []*vcsinfo.IndexCommit {
return b.timeRange(begin, end)
}
// IndexOf implements the vcsinfo.VCS interface
func (b *BigTableVCS) IndexOf(ctx context.Context, hash string) (int, error) {
b.mutex.RLock()
defer b.mutex.RUnlock()
for i := len(b.indexCommits) - 1; i >= 0; i-- {
if hash == b.indexCommits[i].Hash {
return b.indexCommits[i].Index, nil
}
}
return -1, skerr.Fmt("commit %s not found", hash)
}
// ByIndex implements the vcsinfo.VCS interface
func (b *BigTableVCS) ByIndex(ctx context.Context, N int) (*vcsinfo.LongCommit, error) {
// findFn returns the hash when N is within commits
findFn := func(commits []*vcsinfo.IndexCommit) string {
i := sort.Search(len(commits), func(i int) bool { return commits[i].Index >= N })
return commits[i].Hash
}
hash := func() string {
b.mutex.RLock()
defer b.mutex.RUnlock()
if len(b.indexCommits) > 0 {
firstIdx := b.indexCommits[0].Index
lastIdx := b.indexCommits[len(b.indexCommits)-1].Index
if (N >= firstIdx) && (N <= lastIdx) {
return findFn(b.indexCommits)
}
}
return ""
}()
// Fetch the hash
if hash == "" {
return nil, skerr.Fmt("Hash index not found: %d", N)
}
return b.Details(ctx, hash, false)
}
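// commitAtOffset is an illustrative sketch (not part of this package's API) of the
// IndexOf/ByIndex round trip: it resolves a hash to its index on the default branch and
// then fetches the details of the commit 'offset' positions later.
func commitAtOffset(ctx context.Context, vcs *BigTableVCS, hash string, offset int) (*vcsinfo.LongCommit, error) {
	idx, err := vcs.IndexOf(ctx, hash)
	if err != nil {
		return nil, skerr.Wrapf(err, "resolving index of %s", hash)
	}
	return vcs.ByIndex(ctx, idx+offset)
}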
// GetFile implements the vcsinfo.VCS interface
func (b *BigTableVCS) GetFile(ctx context.Context, fileName, commitHash string) (string, error) {
var buf bytes.Buffer
if err := b.repo.ReadFileAtRef(ctx, fileName, commitHash, &buf); err != nil {
return "", skerr.Wrapf(err, "reading file %s @ %s via gitiles", fileName, commitHash)
}
return buf.String(), nil
}
// ResolveCommit implements the vcsinfo.VCS interface
func (b *BigTableVCS) ResolveCommit(ctx context.Context, commitHash string) (string, error) {
if b.secondaryVCS == nil {
return "", vcsinfo.NoSecondaryRepo
}
foundCommit, err := b.secondaryExtractor.ExtractCommit(b.secondaryVCS.GetFile(ctx, "DEPS", commitHash))
if err != nil {
return "", err
}
return foundCommit, nil
}
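// resolveViaDEPS is an illustrative, hypothetical sketch (not part of this package's API)
// of the secondary-repo flow mentioned in SetSecondaryRepo: the secondary VCS and DEPS
// extractor are attached once, after which a commit hash in the secondary repo can be
// mapped to the commit of this repo that its DEPS file pins.
func resolveViaDEPS(ctx context.Context, vcs *BigTableVCS, secondary vcsinfo.VCS, extractor depot_tools.DEPSExtractor, secondaryHash string) (string, error) {
	vcs.SetSecondaryRepo(secondary, extractor)
	hash, err := vcs.ResolveCommit(ctx, secondaryHash)
	if err != nil {
		return "", skerr.Wrapf(err, "resolving %s via DEPS", secondaryHash)
	}
	return hash, nil
}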
// GetGitStore implements the gitstore.GitStoreBased interface
func (b *BigTableVCS) GetGitStore() gitstore.GitStore {
return b.gitStore
}
// fetchIndexRange fetches the IndexCommits in the range [startIndex, endIndex) and stores them in indexCommits.
func (b *BigTableVCS) fetchIndexRange(ctx context.Context, startIndex, endIndex int) error {
newIC, err := b.gitStore.RangeN(ctx, startIndex, endIndex, b.defaultBranch)
if err != nil {
return err
}
if len(newIC) == 0 {
return nil
}
b.mutex.Lock()
defer b.mutex.Unlock()
b.indexCommits = newIC
return nil
}
func (b *BigTableVCS) timeRange(start time.Time, end time.Time) []*vcsinfo.IndexCommit {
n := len(b.indexCommits)
startIdx := 0
for ; startIdx < n; startIdx++ {
exp := b.indexCommits[startIdx].Timestamp.After(start) || b.indexCommits[startIdx].Timestamp.Equal(start)
if exp {
break
}
}
endIdx := startIdx
for ; endIdx < n; endIdx++ {
exp := b.indexCommits[endIdx].Timestamp.After(end) || b.indexCommits[endIdx].Timestamp.Equal(end)
if exp {
break
}
}
if endIdx <= startIdx {
return []*vcsinfo.IndexCommit{}
}
return b.indexCommits[startIdx:endIdx]
}
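// commitsInLastDay is an illustrative sketch (not part of this package's API) of the
// time-based queries: Range returns the cached IndexCommits in the half-open interval
// [begin, end), while From returns only the hashes of commits strictly after a given time.
func commitsInLastDay(vcs *BigTableVCS) []*vcsinfo.IndexCommit {
	end := time.Now()
	begin := end.Add(-24 * time.Hour)
	return vcs.Range(begin, end)
}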
// StartTracking begins watching the repo for changes on a background thread.
// When a new commit is detected an EV_NEW_GIT_COMMIT event is triggered on the event bus.
func (b *BigTableVCS) StartTracking(ctx context.Context, evt eventbus.EventBus) {
if evt == nil {
sklog.Warningf("Not starting tracking eventbus was nil")
return
}
// prevCommits holds the commits seen in the previous iteration so new ones can be detected.
var prevCommits []*vcsinfo.IndexCommit
go util.RepeatCtx(defaultWatchInterval, ctx, func(ctx context.Context) {
allBranches, err := b.gitStore.GetBranches(ctx)
if err != nil {
sklog.Errorf("Error retrieving branches: %s", err)
return
}
branchInfo, ok := allBranches[b.defaultBranch]
if !ok {
sklog.Errorf("Branch %s not found in gitstore", b.defaultBranch)
return
}
startIdx := util.MaxInt(0, branchInfo.Index+1-nCommits)
commits, err := b.gitStore.RangeN(ctx, startIdx, int(math.MaxInt32), b.defaultBranch)
if err != nil {
sklog.Errorf("Error getting last %d commits: %s", nCommits, err)
return
}
// If we received new commits then publish an event and save them for the next round.
if len(commits) > 0 && (len(prevCommits) != len(commits) || commits[len(commits)-1].Index > prevCommits[len(prevCommits)-1].Index) {
prevCommits = commits
cpCommits := append([]*vcsinfo.IndexCommit{}, commits...)
evt.Publish(EV_NEW_GIT_COMMIT, cpCommits, false)
}
})
}
// Make sure BigTableVCS fulfills the VCS interface
var _ vcsinfo.VCS = (*BigTableVCS)(nil)