package watcher
import (
const (
	// batchSize is the number of commits retrieved at a time from Gitiles.
	// It is passed to gitiles.LogBatchSize when loading commits.
	batchSize = 10000
)
var (
// Don't delete these branches. For some reason, this branch is
// occasionally missing from the branch heads we get back from Gitiles,
// And updating the branch info and re-ingesting the commits wastes
// time. Maps repo URL to branch name to bool, indicating that deletion
// of this branch in this repo should be skipped.
// See http://b/139938100 for more information.
ignoreDeletedBranch = map[string]map[string]bool{
common.REPO_SKIA: {
"chrome/m65": true,
// Start creates a GitStore with the provided information and starts periodic
// ingestion.
func Start(ctx context.Context, conf *bt_gitstore.BTConfig, repoURL, gitilesURL, gcsBucket, gcsPath string, interval time.Duration, ts oauth2.TokenSource) error {
sklog.Infof("Initializing watcher for %s", repoURL)
gitStore, err := bt_gitstore.New(ctx, conf, repoURL)
if err != nil {
return skerr.Wrapf(err, "Error instantiating git store for %s.", repoURL)
client := httputils.DefaultClientConfig().WithTokenSource(ts).Client()
gr := gitiles.NewRepo(gitilesURL, client)
s, err := storage.NewClient(ctx)
if err != nil {
return skerr.Wrapf(err, "Failed to create storage client for %s.", gcsBucket)
gcsClient := gcsclient.New(s, gcsBucket)
p, err := pubsub.NewPublisher(ctx, conf, gitStore.RepoID, ts)
if err != nil {
return skerr.Wrapf(err, "Failed to create PubSub publisher for %s", repoURL)
ri, err := newRepoImpl(ctx, gitStore, gr, gcsClient, gcsPath, p)
if err != nil {
return skerr.Wrapf(err, "Failed to create RepoImpl for %s; using gs://%s/%s.", repoURL, gcsBucket, gcsPath)
sklog.Infof("Building Graph for %s...", repoURL)
repo, err := repograph.NewWithRepoImpl(ctx, ri)
if err != nil {
return skerr.Wrapf(err, "Failed to create repo graph for %s.", repoURL)
// Start periodic ingestion.
lvGitSync := metrics2.NewLiveness("last_successful_git_sync", map[string]string{"repo": repoURL})
cleanup.Repeat(interval, func(ctx context.Context) {
defer metrics2.FuncTimer().Stop()
// Catch any panic and log relevant information to find the root cause.
defer func() {
if err := recover(); err != nil {
sklog.Errorf("Panic updating %s: %s\n%s", repoURL, err, string(debug.Stack()))
sklog.Infof("Updating %s...", repoURL)
if err := repo.Update(ctx); err != nil {
sklog.Errorf("Error updating %s: %s", repoURL, err)
} else {
gotBranches, err := gitStore.GetBranches(ctx)
if err != nil {
sklog.Errorf("Successfully updated %s but failed to retrieve branch heads: %s", repoURL, err)
} else {
sklog.Infof("Successfully updated %s", repoURL)
for name, branch := range gotBranches {
sklog.Debugf(" %s@%s: %d, %s", path.Base(repoURL), name, branch.Index, branch.Head)
}, nil)
return nil
// repoImpl is an implementation of repograph.RepoImpl which loads commits into
// a GitStore.
type repoImpl struct {
gcsClient gcs.GCSClient
gcsPath string
gitiles *gitiles.Repo
gitstore gitstore.GitStore
// The Publisher may be nil, in which case no pubsub messages are sent.
pubsub *pubsub.Publisher
// newRepoImpl returns a repograph.RepoImpl which uses both Gitiles and
// GitStore.
func newRepoImpl(ctx context.Context, gs gitstore.GitStore, repo *gitiles.Repo, gcsClient gcs.GCSClient, gcsPath string, p *pubsub.Publisher) (repograph.RepoImpl, error) {
indexCommits, err := gs.RangeByTime(ctx, vcsinfo.MinTime, vcsinfo.MaxTime, gitstore.ALL_BRANCHES)
if err != nil {
return nil, skerr.Wrapf(err, "Failed loading IndexCommits from GitStore.")
var commits []*vcsinfo.LongCommit
if len(indexCommits) > 0 {
hashes := make([]string, 0, len(indexCommits))
for _, c := range indexCommits {
hashes = append(hashes, c.Hash)
commits, err = gs.Get(ctx, hashes)
if err != nil {
return nil, skerr.Wrapf(err, "Failed loading LongCommits from GitStore.")
gb, err := gs.GetBranches(ctx)
if err != nil {
return nil, skerr.Wrapf(err, "Failed loading branches from GitStore.")
branches := make([]*git.Branch, 0, len(gb))
for name, branch := range gb {
branches = append(branches, &git.Branch{
Name: name,
Head: branch.Head,
commitsMap := make(map[string]*vcsinfo.LongCommit, len(commits))
for _, c := range commits {
commitsMap[c.Hash] = c
sklog.Infof("Repo %s has %d commits and %d branches.", repo.URL, len(commits), len(branches))
for _, b := range branches {
sklog.Infof(" branch %s @ %s", b.Name, b.Head)
return &repoImpl{
MemCacheRepoImpl: repograph.NewMemCacheRepoImpl(commitsMap, branches),
gcsClient: gcsClient,
gcsPath: gcsPath,
gitiles: repo,
gitstore: gs,
pubsub: p,
}, nil
type commitBatch struct {
branch string
commits []*vcsinfo.LongCommit
// processCommits processes commits in a separate goroutine while the
// passed-in func loads commits from Gitiles.
func (r *repoImpl) processCommits(ctx context.Context, process func(context.Context, *commitBatch) error, loadCommits func(context.Context, chan<- *commitBatch) error) error {
// Run GitStore ingestion in a goroutine. Create a cancelable context
// to halt requests to Gitiles if GitStore ingestion fails.
ctx, cancel := context.WithCancel(ctx)
defer cancel()
commitsCh := make(chan *commitBatch)
errCh := make(chan error)
go func() {
var err error
for cb := range commitsCh {
if err != nil {
// We've hit an error but we need to consume
// all of the commits from the channel before
// returning, or the passed-in func may block
// forever.
sklog.Warningf("Skipping %d commits due to previous error.", len(cb.commits))
// Process the commits.
if process != nil {
err = process(ctx, cb)
if err != nil {
if err != gitiles.ErrStopIteration {
sklog.Errorf("processCommits encountered error: %s", err)
// Cancel the context passed to loadCommits().
// If it respects context.Done() as it's
// supposed to, then it should exit early with
// an error.
// Signal that we're done ingesting commits.
errCh <- err
// Run the passed-in func.
loadingErr := loadCommits(ctx, commitsCh)
// Close the commits channel, wait for the goroutine to complete.
processErr := <-errCh
// The error returned from the gitstore goroutine takes precedence,
// because we cancel the context when gitstore.Put fails, and thus
// loadCommits() may return an error simply stating that the context was
// canceled.
if processErr != nil {
if processErr == gitiles.ErrStopIteration {
// Ignore the loadingErr in this case, since it's almost
// certainly just "context canceled".
return nil
if loadingErr != nil {
return skerr.Wrapf(processErr, "GitStore ingestion failed, and commit-loading func failed with: %s", loadingErr)
return processErr
return loadingErr
// loadCommitsFromGitiles loads commits from Gitiles and pushes them onto the
// given channel, until we reach the optional from commit, or any other commit
// we've seen before.
func (r *repoImpl) loadCommitsFromGitiles(ctx context.Context, branch, logExpr string, commitsCh chan<- *commitBatch, opts ...gitiles.LogOption) error {
return r.gitiles.LogFnBatch(ctx, logExpr, func(ctx context.Context, commits []*vcsinfo.LongCommit) error {
commitsCh <- &commitBatch{
branch: branch,
commits: commits,
return nil
}, opts...)
// initialIngestion performs the first-time ingestion of the repo.
func (r *repoImpl) initialIngestion(ctx context.Context) error {
sklog.Warningf("Performing initial ingestion of %s.", r.gitiles.URL)
defer metrics2.FuncTimer().Stop()
// Create a tmpGitStore.
sklog.Info("Retrieving graph from temporary store.")
t := timer.New("Retrieving graph from temp store")
graph, ri, err := setupInitialIngest(ctx, r.gcsClient, r.gcsPath, r.gitiles.URL)
if err != nil {
return skerr.Wrapf(err, "Failed initial ingestion of %s using GCS file %s", r.gitiles.URL, r.gcsPath)
for _, c := range graph.GetAll() {
r.Commits[c.Hash] = c.LongCommit
// oldBranches maps branch names to commit hashes for the existing
// branches.
oldBranches := map[string]string{}
for _, b := range graph.BranchHeads() {
oldBranches[b.Name] = b.Head
// Find the current set of branches.
t = timer.New("Loading commits from gitiles")
branches, err := r.gitiles.Branches(ctx)
if err != nil {
return skerr.Wrapf(err, "Failed loading branches from Gitiles.")
// Load commits from gitiles.
// We assume that master contains the majority of the commits in the
// repo, and the other branches are comparatively small, with most of
// their ancestry being on master itself.
var master *git.Branch
for _, b := range branches {
if b.Name == "master" {
master = b
if master != nil {
if master.Head == oldBranches[master.Name] {
sklog.Infof("Master is up to date; skipping.")
} else {
sklog.Info("Loading commits from gitiles for master.")
if err := r.processCommits(ctx, func(ctx context.Context, cb *commitBatch) error {
// Add the new commits to our local cache.
sklog.Infof("Adding batch of %d commits", len(cb.commits))
for _, c := range cb.commits {
r.Commits[c.Hash] = c
return initialIngestCommitBatch(ctx, graph, ri.MemCacheRepoImpl, cb)
}, func(ctx context.Context, commitsCh chan<- *commitBatch) error {
logExpr := master.Head
if oldHead, ok := oldBranches[master.Name]; ok {
sklog.Errorf("Have master @ %s; requesting %s", oldHead, master.Head)
logExpr = fmt.Sprintf("%s..%s", oldHead, master.Head)
return r.loadCommitsFromGitiles(ctx, master.Name, logExpr, commitsCh, gitiles.LogReverse(), gitiles.LogBatchSize(batchSize))
}); err != nil {
return skerr.Wrapf(err, "Failed to ingest commits for master.")
// mtx protects graph and ri.commits as we load commits for non-master
// branches in different goroutines.
var mtx sync.Mutex
// Load commits for other branches, in non-reverse order, so that we can
// stop once we reach commits already on the master branch.
var egroup errgroup.Group
for _, branch := range branches {
branch := branch
if branch == master {
sklog.Infof("Loading commits for %s", branch.Name)
_, exists := r.Commits[branch.Head]
if exists {
sklog.Infof(" ... already have %s, skip", branch.Head)
egroup.Go(func() error {
var commits []*vcsinfo.LongCommit
localCache := make(map[string]*vcsinfo.LongCommit, len(r.Commits))
for h, c := range r.Commits {
localCache[h] = c
// lookingFor tracks which commits are wanted by this
// branch. As we traverse back through git history, we
// may find commits which we already have in our local
// cache. One might think that we could stop requesting
// batches of commits at that point, but if there's a
// commit with multiple parents on this branch, we have
// to make sure we follow all lines of history, which
// means that we need to track all of the commit hashes
// which we expect to find but have not yet. We can stop
// when we've found all of the hashes we're looking for.
lookingFor := map[string]bool{}
if err := r.processCommits(ctx, func(ctx context.Context, cb *commitBatch) error {
numIngested := 0
defer func() {
sklog.Infof("Added %d of batch of %d commits", numIngested, len(cb.commits))
for _, c := range cb.commits {
// Remove this commit from lookingFor,
// now that we've found it.
delete(lookingFor, c.Hash)
// Add the commit to the local cache,
// if it's not already present. Track
// the number of new commits we've seen.
if _, ok := localCache[c.Hash]; !ok {
commits = append(commits, c)
localCache[c.Hash] = c
// Add any parents of this commit which
// are not already in our cache to the
// lookingFor set.
for _, p := range c.Parents {
if _, ok := localCache[p]; !ok {
lookingFor[p] = true
// If we've found all the commits we
// need, we can stop.
if len(lookingFor) == 0 {
return gitiles.ErrStopIteration
return nil
}, func(ctx context.Context, commitsCh chan<- *commitBatch) error {
logExpr := branch.Head
if oldHead, ok := oldBranches[branch.Name]; ok {
logExpr = git.LogFromTo(oldHead, branch.Head)
return r.loadCommitsFromGitiles(ctx, branch.Name, logExpr, commitsCh, gitiles.LogBatchSize(batchSize))
}); err != nil {
return skerr.Wrapf(err, "Failed to ingest commits for branch %s.", branch.Name)
// Reverse the slice of commits so that they can be added in
// order.
for i := 0; i < len(commits)/2; i++ {
j := len(commits) - i - 1
commits[i], commits[j] = commits[j], commits[i]
defer mtx.Unlock()
if err := initialIngestCommitBatch(ctx, graph, ri.MemCacheRepoImpl, &commitBatch{
branch: branch.Name,
commits: commits,
}); err != nil {
return skerr.Wrapf(err, "Failed to add commits to graph for %s", branch.Name)
for _, c := range commits {
r.Commits[c.Hash] = c
sklog.Infof("Loading commits for %s done", branch.Name)
return nil
// Wait for the above goroutines to finish.
if err := egroup.Wait(); err != nil {
return skerr.Wrap(err)
// Wait for the initialIngestRepoImpl to finish backing up to GCS.
// Replace the fake branches with real ones. Update branch membership.
ri.BranchList = branches
if err := graph.Update(ctx); err != nil {
return skerr.Wrapf(err, "Failed final Graph update.")
r.BranchList = branches
sklog.Infof("Finished initial ingestion of %s; have %d commits and %d branches.", r.gitiles.URL, graph.Len(), len(graph.Branches()))
return nil
// addCommitsToCacheFn returns a function, intended to be passed as a
// parameter to processCommits, which adds any new commits to the local cache
// and stops iteration when all new commits have been added.
func (r *repoImpl) addCommitsToCacheFn() func(context.Context, *commitBatch) error {
lookingFor := map[string]bool{}
return func(_ context.Context, cb *commitBatch) error {
// Add the new commits to our local cache.
for _, c := range cb.commits {
delete(lookingFor, c.Hash)
for _, p := range c.Parents {
if _, ok := r.Commits[p]; !ok {
lookingFor[p] = true
r.Commits[c.Hash] = c
if len(lookingFor) == 0 {
return gitiles.ErrStopIteration
return nil
// See documentation for RepoImpl interface.
func (r *repoImpl) Update(ctx context.Context) error {
sklog.Infof("repoImpl.Update for %s", r.gitiles.URL)
defer metrics2.FuncTimer().Stop()
if len(r.BranchList) == 0 {
if err := r.initialIngestion(ctx); err != nil {
return skerr.Wrapf(err, "Failed initial ingestion.")
// Find the old and new branch heads.
sklog.Infof("Getting branches for %s.", r.gitiles.URL)
oldBranches := make(map[string]*git.Branch, len(r.BranchList))
for _, branch := range r.BranchList {
oldBranches[branch.Name] = branch
branches, err := r.gitiles.Branches(ctx)
if err != nil {
return skerr.Wrapf(err, "Failed loading branches from Gitiles.")
// If any of the whitelisted old branches disappeared, add it back.
newBranches := make(map[string]string, len(branches))
for _, branch := range branches {
newBranches[branch.Name] = branch.Head
for name, b := range oldBranches {
if _, ok := newBranches[name]; !ok && ignoreDeletedBranch[r.gitiles.URL][name] {
sklog.Warningf("Branch %q missing from new branches; ignoring due to explicit whitelist.", name)
branches = append(branches, b)
// Download any new commits and add them to the local cache.
sklog.Infof("Processing new commits for %s.", r.gitiles.URL)
for _, branch := range branches {
logExpr := branch.Head
if oldBranch, ok := oldBranches[branch.Name]; ok {
// If there's nothing new, skip this branch.
if branch.Head == oldBranch.Head {
// Only load back to the previous branch head.
logExpr = fmt.Sprintf("%s..%s", oldBranch.Head, branch.Head)
if err := r.processCommits(ctx, r.addCommitsToCacheFn(), func(ctx context.Context, commitsCh chan<- *commitBatch) error {
err := r.loadCommitsFromGitiles(ctx, branch.Name, logExpr, commitsCh)
if err != nil && strings.Contains(err.Error(), "404 Not Found") {
// If history was changed, the old branch head
// may not be present on the server. Try again
// as if the branch is new.
sklog.Errorf("Failed loading commits for %s (%q); trying %s: %s", branch.Name, logExpr, branch.Head, err)
err = r.loadCommitsFromGitiles(ctx, branch.Name, branch.Head, commitsCh)
if err != nil {
return skerr.Wrapf(err, "Failed loading commits for %s (%s); %q", branch.Name, branch.Head, logExpr)
return nil
}); err != nil {
return err
r.BranchList = branches
return nil
// See documentation for RepoImpl interface.
func (r *repoImpl) Details(ctx context.Context, hash string) (*vcsinfo.LongCommit, error) {
c, err := r.MemCacheRepoImpl.Details(ctx, hash)
if err == nil {
return c, nil
// Fall back to retrieving from Gitiles, store any new commits in the
// local cache.
sklog.Errorf("Missing commit %s in %s", hash[:7], r.gitiles.URL)
if err := r.processCommits(ctx, r.addCommitsToCacheFn(), func(ctx context.Context, commitsCh chan<- *commitBatch) error {
return r.loadCommitsFromGitiles(ctx, "", hash, commitsCh)
}); err != nil {
return nil, err
c, ok := r.Commits[hash]
if !ok {
return nil, skerr.Fmt("Commit %s in %s is still missing despite attempting to load it from gitiles.", hash, r.gitiles.URL)
return c, nil
// See documentation for RepoImpl interface.
func (r *repoImpl) UpdateCallback(ctx context.Context, added, removed []*vcsinfo.LongCommit, graph *repograph.Graph) error {
sklog.Infof("repoImpl.UpdateCallback for %s", r.gitiles.URL)
defer metrics2.FuncTimer().Stop()
// Ensure that branch membership is up to date.
modified := graph.UpdateBranchInfo()
modifiedMap := make(map[string]*vcsinfo.LongCommit, len(modified)+len(added))
for _, c := range added {
modifiedMap[c.Hash] = c
for _, c := range modified {
modifiedMap[c.Hash] = c
// Don't include commits in the 'removed' list.
for _, c := range removed {
delete(modifiedMap, c.Hash)
putCommits := make([]*vcsinfo.LongCommit, 0, len(modifiedMap))
for _, c := range modifiedMap {
putCommits = append(putCommits, c)
// TODO(borenet): Should we delete commits which were removed?
sklog.Infof("Put %d new and %d modified commits for %s.", len(added), len(modified), r.gitiles.URL)
if err := r.gitstore.Put(ctx, putCommits); err != nil {
return skerr.Wrapf(err, "Failed putting commits into GitStore.")
// Figure out which branches changed.
oldBranches, err := r.gitstore.GetBranches(ctx)
if err != nil {
return skerr.Wrapf(err, "Failed to retrieve old branch heads.")
branchHeads := graph.BranchHeads()
allBranches := make(map[string]string, len(branchHeads))
updateBranches := make(map[string]string, len(branchHeads))
for _, b := range branchHeads {
allBranches[b.Name] = b.Head
if old, ok := oldBranches[b.Name]; !ok || old.Head != b.Head {
updateBranches[b.Name] = b.Head
// Explicitly delete any old branches which are no longer present.
for name := range oldBranches {
if _, ok := allBranches[name]; !ok {
updateBranches[name] = gitstore.DELETE_BRANCH
if err := r.gitstore.PutBranches(ctx, updateBranches); err != nil {
return skerr.Wrapf(err, "Failed to put new branch heads.")
if r.pubsub != nil && len(updateBranches) > 0 {
r.pubsub.Publish(ctx, updateBranches)
return nil