package watcher

import (
"context"
"fmt"
"path"
"runtime/debug"
"sync"
"time"
"cloud.google.com/go/storage"
"go.skia.org/infra/go/cleanup"
"go.skia.org/infra/go/gcs"
"go.skia.org/infra/go/gcs/gcsclient"
"go.skia.org/infra/go/git"
"go.skia.org/infra/go/git/repograph"
"go.skia.org/infra/go/gitiles"
"go.skia.org/infra/go/gitstore"
"go.skia.org/infra/go/gitstore/bt_gitstore"
"go.skia.org/infra/go/metrics2"
"go.skia.org/infra/go/skerr"
"go.skia.org/infra/go/sklog"
"go.skia.org/infra/go/timer"
"go.skia.org/infra/go/vcsinfo"
"golang.org/x/sync/errgroup"
)

const (
// batchSize is the number of commits retrieved at a time from Gitiles.
batchSize = 10000
)

// Start creates a GitStore with the provided information and starts periodic
// ingestion.
func Start(ctx context.Context, conf *bt_gitstore.BTConfig, repoURL, gitcookiesPath, gcsBucket, gcsPath string, interval time.Duration) error {
sklog.Infof("Initializing watcher for %s", repoURL)
gitStore, err := bt_gitstore.New(ctx, conf, repoURL)
if err != nil {
return skerr.Wrapf(err, "Error instantiating git store for %s.", repoURL)
}
gr := gitiles.NewRepo(repoURL, gitcookiesPath, nil)
s, err := storage.NewClient(ctx)
if err != nil {
return skerr.Wrapf(err, "Failed to create storage client for %s.", gcsBucket)
}
gcsClient := gcsclient.New(s, gcsBucket)
ri, err := newRepoImpl(ctx, gitStore, gr, gcsClient, gcsPath)
if err != nil {
return skerr.Wrapf(err, "Failed to create RepoImpl for %s; using gs://%s/%s.", repoURL, gcsBucket, gcsPath)
}
sklog.Infof("Building Graph for %s...", repoURL)
repo, err := repograph.NewWithRepoImpl(ctx, ri)
if err != nil {
return skerr.Wrapf(err, "Failed to create repo graph for %s.", repoURL)
}
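// Ensure that branch membership info is up to date before we start
// periodic ingestion.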
repo.UpdateBranchInfo()
// Start periodic ingestion.
lvGitSync := metrics2.NewLiveness("last_successful_git_sync", map[string]string{"repo": repoURL})
cleanup.Repeat(interval, func(ctx context.Context) {
defer timer.New("Sync " + repoURL).Stop()
// Catch any panic and log relevant information to find the root cause.
defer func() {
if err := recover(); err != nil {
sklog.Errorf("Panic updating %s: %s\n%s", repoURL, err, string(debug.Stack()))
}
}()
sklog.Infof("Updating %s...", repoURL)
if err := repo.Update(ctx); err != nil {
sklog.Errorf("Error updating %s: %s", repoURL, err)
} else {
gotBranches, err := gitStore.GetBranches(ctx)
if err != nil {
sklog.Errorf("Successfully updated %s but failed to retrieve branch heads: %s", repoURL, err)
} else {
sklog.Infof("Successfully updated %s", repoURL)
for name, branch := range gotBranches {
sklog.Debugf(" %s@%s: %d, %s", path.Base(repoURL), name, branch.Index, branch.Head)
}
}
lvGitSync.Reset()
}
}, nil)
return nil
}

// repoImpl is an implementation of repograph.RepoImpl which loads commits into
// a GitStore.
type repoImpl struct {
*repograph.MemCacheRepoImpl
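// gcsClient and gcsPath identify the GCS location used to back up
// progress during initial ingestion.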
gcsClient gcs.GCSClient
gcsPath string
gitiles *gitiles.Repo
gitstore gitstore.GitStore
}

// newRepoImpl returns a repograph.RepoImpl which uses both Gitiles and
// GitStore.
func newRepoImpl(ctx context.Context, gs gitstore.GitStore, repo *gitiles.Repo, gcsClient gcs.GCSClient, gcsPath string) (repograph.RepoImpl, error) {
indexCommits, err := gs.RangeByTime(ctx, vcsinfo.MinTime, vcsinfo.MaxTime, gitstore.ALL_BRANCHES)
if err != nil {
return nil, skerr.Wrapf(err, "Failed loading IndexCommits from GitStore.")
}
var commits []*vcsinfo.LongCommit
if len(indexCommits) > 0 {
hashes := make([]string, 0, len(indexCommits))
for _, c := range indexCommits {
hashes = append(hashes, c.Hash)
}
commits, err = gs.Get(ctx, hashes)
if err != nil {
return nil, skerr.Wrapf(err, "Failed loading LongCommits from GitStore.")
}
}
gb, err := gs.GetBranches(ctx)
if err != nil {
return nil, skerr.Wrapf(err, "Failed loading branches from GitStore.")
}
branches := make([]*git.Branch, 0, len(gb))
for name, branch := range gb {
branches = append(branches, &git.Branch{
Name: name,
Head: branch.Head,
})
}
commitsMap := make(map[string]*vcsinfo.LongCommit, len(commits))
for _, c := range commits {
commitsMap[c.Hash] = c
}
sklog.Infof("Repo %s has %d commits and %d branches.", repo.URL, len(commits), len(branches))
for _, b := range branches {
sklog.Infof(" branch %s @ %s", b.Name, b.Head)
}
return &repoImpl{
MemCacheRepoImpl: repograph.NewMemCacheRepoImpl(commitsMap, branches),
gcsClient: gcsClient,
gcsPath: gcsPath,
gitiles: repo,
gitstore: gs,
}, nil
}
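
// commitBatch is a batch of commits loaded from Gitiles for a single
// branch.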
type commitBatch struct {
branch string
commits []*vcsinfo.LongCommit
}

// processCommits processes commits in a separate goroutine while the
// passed-in func loads commits from Gitiles.
func (r *repoImpl) processCommits(ctx context.Context, process func(context.Context, *commitBatch) error, loadCommits func(context.Context, chan<- *commitBatch) error) error {
// Run GitStore ingestion in a goroutine. Create a cancelable context
// to halt requests to Gitiles if GitStore ingestion fails.
ctx, cancel := context.WithCancel(ctx)
defer cancel()
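// commitsCh carries batches of commits from the passed-in loadCommits
// func to the processing goroutine; errCh reports the result of
// processing once the channel is closed.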
commitsCh := make(chan *commitBatch)
errCh := make(chan error)
go func() {
var err error
for cb := range commitsCh {
if err != nil {
// We've hit an error but we need to consume
// all of the commits from the channel before
// returning, or the passed-in func may block
// forever.
sklog.Warningf("Skipping %d commits due to previous error.", len(cb.commits))
continue
}
// Process the commits.
if process != nil {
err = process(ctx, cb)
}
if err != nil {
if err != gitiles.ErrStopIteration {
sklog.Errorf("processCommits encountered error: %s", err)
}
// Cancel the context passed to loadCommits().
// If it respects context.Done() as it's
// supposed to, then it should exit early with
// an error.
cancel()
}
}
// Signal that we're done ingesting commits.
errCh <- err
}()
// Run the passed-in func.
loadingErr := loadCommits(ctx, commitsCh)
// Close the commits channel, wait for the goroutine to complete.
close(commitsCh)
processErr := <-errCh
// The error returned from the gitstore goroutine takes precedence,
// because we cancel the context when gitstore.Put fails, and thus
// loadCommits() may return an error simply stating that the context was
// canceled.
if processErr != nil {
if processErr == gitiles.ErrStopIteration {
// Ignore the loadingErr in this case, since it's almost
// certainly just "context canceled".
return nil
}
if loadingErr != nil {
return skerr.Wrapf(processErr, "GitStore ingestion failed, and commit-loading func failed with: %s", loadingErr)
}
return processErr
}
return loadingErr
}

// loadCommitsFromGitiles loads commits from Gitiles and pushes them in batches
// onto the given channel, until we reach the optional "from" commit or any
// other commit we've seen before.
func (r *repoImpl) loadCommitsFromGitiles(ctx context.Context, branch, logExpr string, commitsCh chan<- *commitBatch, opts ...gitiles.LogOption) error {
return r.gitiles.LogFnBatch(ctx, logExpr, func(ctx context.Context, commits []*vcsinfo.LongCommit) error {
commitsCh <- &commitBatch{
branch: branch,
commits: commits,
}
return nil
}, opts...)
}

// initialIngestion performs the first-time ingestion of the repo.
func (r *repoImpl) initialIngestion(ctx context.Context) error {
sklog.Warningf("Performing initial ingestion of %s.", r.gitiles.URL)
defer timer.New("Initial ingestion").Stop()
// Create a tmpGitStore.
sklog.Info("Retrieving graph from temporary store.")
t := timer.New("Retrieving graph from temp store")
graph, ri, err := setupInitialIngest(ctx, r.gcsClient, r.gcsPath, r.gitiles.URL)
if err != nil {
return skerr.Wrapf(err, "Failed initial ingestion of %s using GCS file %s", r.gitiles.URL, r.gcsPath)
}
for _, c := range graph.GetAll() {
r.Commits[c.Hash] = c.LongCommit
}
// oldBranches maps branch names to commit hashes for the existing
// branches.
oldBranches := map[string]string{}
for _, b := range graph.BranchHeads() {
oldBranches[b.Name] = b.Head
}
t.Stop()
// Find the current set of branches.
t = timer.New("Loading commits from gitiles")
branches, err := r.gitiles.Branches(ctx)
if err != nil {
return skerr.Wrapf(err, "Failed loading branches from Gitiles.")
}
// Load commits from gitiles.
// We assume that master contains the majority of the commits in the
// repo, and the other branches are comparatively small, with most of
// their ancestry being on master itself.
var master *git.Branch
for _, b := range branches {
if b.Name == "master" {
master = b
break
}
}
if master != nil {
if master.Head == oldBranches[master.Name] {
sklog.Infof("Master is up to date; skipping.")
} else {
sklog.Info("Loading commits from gitiles for master.")
if err := r.processCommits(ctx, func(ctx context.Context, cb *commitBatch) error {
// Add the new commits to our local cache.
sklog.Infof("Adding batch of %d commits", len(cb.commits))
for _, c := range cb.commits {
r.Commits[c.Hash] = c
}
return initialIngestCommitBatch(ctx, graph, ri.MemCacheRepoImpl, cb)
}, func(ctx context.Context, commitsCh chan<- *commitBatch) error {
logExpr := master.Head
if oldHead, ok := oldBranches[master.Name]; ok {
sklog.Errorf("Have master @ %s; requesting %s", oldHead, master.Head)
logExpr = fmt.Sprintf("%s..%s", oldHead, master.Head)
}
return r.loadCommitsFromGitiles(ctx, master.Name, logExpr, commitsCh, gitiles.LogReverse(), gitiles.LogBatchSize(batchSize))
}); err != nil {
return skerr.Wrapf(err, "Failed to ingest commits for master.")
}
}
}
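// Wait for the initialIngestRepoImpl to finish backing up to GCS before
// we start loading the other branches.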
ri.Wait()
// mtx protects graph and ri.commits as we load commits for non-master
// branches in different goroutines.
var mtx sync.Mutex
// Load commits for other branches, in non-reverse order, so that we can
// stop once we reach commits already on the master branch.
var egroup errgroup.Group
for _, branch := range branches {
// https://golang.org/doc/faq#closures_and_goroutines
branch := branch
if branch == master {
continue
}
sklog.Infof("Loading commits for %s", branch.Name)
mtx.Lock()
_, exists := r.Commits[branch.Head]
mtx.Unlock()
if exists {
sklog.Infof(" ... already have %s, skip", branch.Head)
continue
}
egroup.Go(func() error {
var commits []*vcsinfo.LongCommit
mtx.Lock()
localCache := make(map[string]*vcsinfo.LongCommit, len(r.Commits))
for h, c := range r.Commits {
localCache[h] = c
}
mtx.Unlock()
// lookingFor tracks which commits are wanted by this
// branch. As we traverse back through git history, we
// may find commits which we already have in our local
// cache. One might think that we could stop requesting
// batches of commits at that point, but if there's a
// commit with multiple parents on this branch, we have
// to make sure we follow all lines of history, which
// means that we need to track all of the commit hashes
// which we expect to find but have not yet. We can stop
// when we've found all of the hashes we're looking for.
lookingFor := map[string]bool{}
if err := r.processCommits(ctx, func(ctx context.Context, cb *commitBatch) error {
numIngested := 0
defer func() {
sklog.Infof("Added %d of batch of %d commits", numIngested, len(cb.commits))
}()
for _, c := range cb.commits {
// Remove this commit from lookingFor,
// now that we've found it.
delete(lookingFor, c.Hash)
// Add the commit to the local cache,
// if it's not already present. Track
// the number of new commits we've seen.
if _, ok := localCache[c.Hash]; !ok {
commits = append(commits, c)
localCache[c.Hash] = c
numIngested++
}
// Add any parents of this commit which
// are not already in our cache to the
// lookingFor set.
for _, p := range c.Parents {
if _, ok := localCache[p]; !ok {
lookingFor[p] = true
}
}
// If we've found all the commits we
// need, we can stop.
if len(lookingFor) == 0 {
return gitiles.ErrStopIteration
}
}
return nil
}, func(ctx context.Context, commitsCh chan<- *commitBatch) error {
logExpr := branch.Head
if oldHead, ok := oldBranches[branch.Name]; ok {
logExpr = git.LogFromTo(oldHead, branch.Head)
}
return r.loadCommitsFromGitiles(ctx, branch.Name, logExpr, commitsCh, gitiles.LogBatchSize(batchSize))
}); err != nil {
return skerr.Wrapf(err, "Failed to ingest commits for branch %s.", branch.Name)
}
// Reverse the slice of commits so that they can be added in
// order.
for i := 0; i < len(commits)/2; i++ {
j := len(commits) - i - 1
commits[i], commits[j] = commits[j], commits[i]
}
mtx.Lock()
defer mtx.Unlock()
if err := initialIngestCommitBatch(ctx, graph, ri.MemCacheRepoImpl, &commitBatch{
branch: branch.Name,
commits: commits,
}); err != nil {
return skerr.Wrapf(err, "Failed to add commits to graph for %s", branch.Name)
}
for _, c := range commits {
r.Commits[c.Hash] = c
}
sklog.Infof("Loading commits for %s done", branch.Name)
return nil
})
}
// Wait for the above goroutines to finish.
if err := egroup.Wait(); err != nil {
return skerr.Wrap(err)
}
// Wait for the initialIngestRepoImpl to finish backing up to GCS.
ri.Wait()
t.Stop()
// Replace the fake branches with real ones. Update branch membership.
ri.BranchList = branches
if err := graph.Update(ctx); err != nil {
return skerr.Wrapf(err, "Failed final Graph update.")
}
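// Wait for the final backup to GCS to complete.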
ri.Wait()
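// Record the real branches so that Update() does not re-run the initial
// ingestion.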
r.BranchList = branches
sklog.Infof("Finished initial ingestion of %s; have %d commits and %d branches.", r.gitiles.URL, graph.Len(), len(graph.Branches()))
return nil
}

// See documentation for RepoImpl interface.
func (r *repoImpl) Update(ctx context.Context) error {
sklog.Infof("repoImpl.Update for %s", r.gitiles.URL)
defer timer.New("repoImpl.Update for " + r.gitiles.URL).Stop()
if len(r.BranchList) == 0 {
if err := r.initialIngestion(ctx); err != nil {
return skerr.Wrapf(err, "Failed initial ingestion.")
}
}
// Find the old and new branch heads.
sklog.Infof("Getting branches for %s.", r.gitiles.URL)
oldBranches := make(map[string]*git.Branch, len(r.BranchList))
for _, branch := range r.BranchList {
oldBranches[branch.Name] = branch
}
branches, err := r.gitiles.Branches(ctx)
if err != nil {
return skerr.Wrapf(err, "Failed loading branches from Gitiles.")
}
// Download any new commits and add them to the local cache.
sklog.Infof("Processing new commits for %s.", r.gitiles.URL)
if err := r.processCommits(ctx, func(ctx context.Context, cb *commitBatch) error {
// Add the new commits to our local cache.
for _, c := range cb.commits {
r.Commits[c.Hash] = c
}
return nil
}, func(ctx context.Context, commitsCh chan<- *commitBatch) error {
for _, branch := range branches {
logExpr := branch.Head
if oldBranch, ok := oldBranches[branch.Name]; ok {
// If there's nothing new, skip this branch.
if branch.Head == oldBranch.Head {
continue
}
// Only load back to the previous branch head.
logExpr = fmt.Sprintf("%s..%s", oldBranch.Head, branch.Head)
}
if err := r.loadCommitsFromGitiles(ctx, branch.Name, logExpr, commitsCh); err != nil {
return skerr.Wrapf(err, "Failed loading commits for %s", branch.Head)
}
}
return nil
}); err != nil {
return err
}
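// Remember the new branch heads for the next call to Update().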
r.BranchList = branches
return nil
}

// See documentation for RepoImpl interface.
func (r *repoImpl) Details(ctx context.Context, hash string) (*vcsinfo.LongCommit, error) {
c, err := r.MemCacheRepoImpl.Details(ctx, hash)
if err == nil {
return c, nil
}
// Fall back to retrieving from Gitiles, and store any new commits in
// the local cache.
sklog.Errorf("Missing commit %s in %s", hash[:7], r.gitiles.URL)
lookingFor := map[string]bool{}
if err := r.processCommits(ctx, func(ctx context.Context, cb *commitBatch) error {
// Add the new commits to our local cache.
for _, c := range cb.commits {
delete(lookingFor, c.Hash)
for _, p := range c.Parents {
if _, ok := r.Commits[p]; !ok {
lookingFor[p] = true
}
}
r.Commits[c.Hash] = c
if len(lookingFor) == 0 {
return gitiles.ErrStopIteration
}
}
return nil
}, func(ctx context.Context, commitsCh chan<- *commitBatch) error {
return r.loadCommitsFromGitiles(ctx, "", hash, commitsCh)
}); err != nil {
return nil, err
}
c, ok := r.Commits[hash]
if !ok {
return nil, skerr.Fmt("Commit %s in %s is still missing despite attempting to load it from gitiles.", hash, r.gitiles.URL)
}
return c, nil
}

// See documentation for RepoImpl interface.
func (r *repoImpl) UpdateCallback(ctx context.Context, added, removed []*vcsinfo.LongCommit, graph *repograph.Graph) error {
sklog.Infof("repoImpl.UpdateCallback for %s", r.gitiles.URL)
defer timer.New("repoImpl.UpdateCallback for " + r.gitiles.URL).Stop()
// Ensure that branch membership is up to date.
modified := graph.UpdateBranchInfo()
modifiedMap := make(map[string]*vcsinfo.LongCommit, len(modified)+len(added))
for _, c := range added {
modifiedMap[c.Hash] = c
}
for _, c := range modified {
modifiedMap[c.Hash] = c
}
// Don't include commits in the 'removed' list.
for _, c := range removed {
delete(modifiedMap, c.Hash)
}
putCommits := make([]*vcsinfo.LongCommit, 0, len(modifiedMap))
for _, c := range modifiedMap {
putCommits = append(putCommits, c)
}
sklog.Infof("Put %d new and %d modified commits for %s.", len(added), len(modified), r.gitiles.URL)
if err := r.gitstore.Put(ctx, putCommits); err != nil {
return skerr.Wrapf(err, "Failed putting commits into GitStore.")
}
// TODO(borenet): Should we delete commits which were removed?
branchHeads := graph.BranchHeads()
branches := make(map[string]string, len(branchHeads))
for _, b := range branchHeads {
branches[b.Name] = b.Head
}
// Explicitly delete any old branches which are no longer present.
oldBranches, err := r.gitstore.GetBranches(ctx)
if err != nil {
return skerr.Wrapf(err, "Failed to retrieve old branch heads.")
}
for name := range oldBranches {
if _, ok := branches[name]; !ok {
branches[name] = gitstore.DELETE_BRANCH
}
}
return r.gitstore.PutBranches(ctx, branches)
}