[gitstore] Use gitiles instead of a local git repo

Use a repograph.Graph to update the GitStore, with a RepoImpl that reads from
Gitiles and writes to GitStore as the Graph requests commits.

Bug: skia:9084
Change-Id: Ia19271890ec4240779885c5f7b79032fad3d9435
Reviewed-on: https://skia-review.googlesource.com/c/buildbot/+/233361
Commit-Queue: Eric Boren <borenet@google.com>
Reviewed-by: Kevin Lubick <kjlubick@google.com>
diff --git a/gitsync/Makefile b/gitsync/Makefile
index be0d588..52cc4df0 100644
--- a/gitsync/Makefile
+++ b/gitsync/Makefile
@@ -12,7 +12,7 @@
 	go install ./go/btgit/...
 
 .PHONY: build_release
-build_release: build-static-gitsync
+release: build-static-gitsync
 	./build_release
 
 .PHONY: build-static-gitsync
diff --git a/gitsync/go/gitsync/config.go b/gitsync/go/gitsync/config.go
index 74405ce..c8c749d 100644
--- a/gitsync/go/gitsync/config.go
+++ b/gitsync/go/gitsync/config.go
@@ -10,15 +10,35 @@
 // gitSyncConfig contains the configuration options that can be defined in a config file.
 // The JSON names of the fields match the flags defined in main.go.
 type gitSyncConfig struct {
-	BTInstanceID    string             `json:"bt_instance"` // BigTable instance
-	BTTableID       string             `json:"bt_table"`    // BigTable table ID.
-	HttpPort        string             `json:"http_port"`   // HTTP port for the health endpoint.
-	Local           bool               `json:"local"`       // Indicating whether this is running local.
-	ProjectID       string             `json:"project"`     // GCP project ID.
-	PromPort        string             `json:"prom_port"`   // Port at which the Prometheus metrics are be exposed.
-	RepoURLs        []string           `json:"repo_url"`    // List of repository URLs that should be updated.
-	RefreshInterval human.JSONDuration `json:"refresh"`     // Interval at which to poll each git repository.
-	WorkDir         string             `json:"workdir"`     // Work directory that should contain the checkouts.
+	// BigTable instance.
+	BTInstanceID string `json:"bt_instance"`
+	// BigTable table ID.
+	BTTableID string `json:"bt_table"`
+	// Number of goroutines to use when writing to BigTable. This is a
+	// tradeoff between write throughput and memory usage; more goroutines
+	// will achieve higher throughput but will also use more memory. There
+	// are diminishing returns here, as the number of CPU cores and BigTable
+	// performance will also limit throughput. The default value in
+	// bt_gitstore.DefaultWriteGoroutines has been shown to keep memory
+	// usage within a reasonable range while still providing decent
+	// throughput; you should only need to override this value in the case
+	// of high memory pressure (fewer goroutines) or the initial ingestion
+	// of an exceptionally large repository (more goroutines).
+	BTWriteGoroutines int `json:"bt_write_goroutines"`
+	// HTTP port for the health endpoint.
+	HttpPort string `json:"http_port"`
+	// Indicating whether this is running local.
+	Local bool `json:"local"`
+	// GCP project ID.
+	ProjectID string `json:"project"`
+	// Port at which the Prometheus metrics are be exposed.
+	PromPort string `json:"prom_port"`
+	// List of repository URLs that should be updated.
+	RepoURLs []string `json:"repo_url"`
+	// Interval at which to poll each git repository.
+	RefreshInterval human.JSONDuration `json:"refresh"`
+	// Work directory that should contain the checkouts.
+	WorkDir string `json:"workdir"`
 }
 
 // String returns all configuration settings as a string intended to be printed upon startup
@@ -26,16 +46,17 @@
 func (g *gitSyncConfig) String() string {
 	ret := ""
 	prefix := "      "
-	ret += fmt.Sprintf("%s bt_instance  : %s\n", prefix, g.BTInstanceID)
-	ret += fmt.Sprintf("%s bt_table     : %s\n", prefix, g.BTTableID)
-	ret += fmt.Sprintf("%s http_port    : %s\n", prefix, g.HttpPort)
-	ret += fmt.Sprintf("%s local        : %s\n", prefix, strconv.FormatBool(g.Local))
-	ret += fmt.Sprintf("%s project      : %s\n", prefix, g.ProjectID)
-	ret += fmt.Sprintf("%s prom_port    : %s\n", prefix, g.PromPort)
+	ret += fmt.Sprintf("%s bt_instance        : %s\n", prefix, g.BTInstanceID)
+	ret += fmt.Sprintf("%s bt_table           : %s\n", prefix, g.BTTableID)
+	ret += fmt.Sprintf("%s bt_write_goroutines: %d\n", prefix, g.BTWriteGoroutines)
+	ret += fmt.Sprintf("%s http_port          : %s\n", prefix, g.HttpPort)
+	ret += fmt.Sprintf("%s local              : %s\n", prefix, strconv.FormatBool(g.Local))
+	ret += fmt.Sprintf("%s project            : %s\n", prefix, g.ProjectID)
+	ret += fmt.Sprintf("%s prom_port          : %s\n", prefix, g.PromPort)
 	for _, url := range g.RepoURLs {
-		ret += fmt.Sprintf("%s repo_url     : %s\n", prefix, url)
+		ret += fmt.Sprintf("%s repo_url           : %s\n", prefix, url)
 	}
-	ret += fmt.Sprintf("%s refresh      : %s\n", prefix, g.RefreshInterval.String())
-	ret += fmt.Sprintf("%s workdir      : %s\n", prefix, g.WorkDir)
+	ret += fmt.Sprintf("%s refresh            : %s\n", prefix, g.RefreshInterval.String())
+	ret += fmt.Sprintf("%s workdir            : %s\n", prefix, g.WorkDir)
 	return ret
 }
diff --git a/gitsync/go/gitsync/main.go b/gitsync/go/gitsync/main.go
index 3875b44..5038f4d 100644
--- a/gitsync/go/gitsync/main.go
+++ b/gitsync/go/gitsync/main.go
@@ -6,15 +6,12 @@
 	"io/ioutil"
 	"log"
 	"net/http"
-	"path/filepath"
-	"strings"
 	"time"
 
 	"github.com/flynn/json5"
+	"go.skia.org/infra/gitsync/go/watcher"
 	"go.skia.org/infra/go/auth"
 	"go.skia.org/infra/go/common"
-	"go.skia.org/infra/go/fileutil"
-	"go.skia.org/infra/go/git"
 	"go.skia.org/infra/go/gitauth"
 	"go.skia.org/infra/go/gitstore/bt_gitstore"
 	"go.skia.org/infra/go/httputils"
@@ -27,15 +24,15 @@
 
 // Default config/flag values
 var defaultConf = gitSyncConfig{
-	BTInstanceID:    "production",
-	BTTableID:       "git-repos2",
-	HttpPort:        ":9091",
-	Local:           false,
-	ProjectID:       "skia-public",
-	PromPort:        ":20000",
-	RepoURLs:        []string{},
-	RefreshInterval: human.JSONDuration(10 * time.Minute),
-	WorkDir:         "",
+	BTInstanceID:      "production",
+	BTTableID:         "git-repos2",
+	BTWriteGoroutines: bt_gitstore.DefaultWriteGoroutines,
+	HttpPort:          ":9091",
+	Local:             false,
+	ProjectID:         "skia-public",
+	PromPort:          ":20000",
+	RepoURLs:          []string{},
+	RefreshInterval:   human.JSONDuration(10 * time.Minute),
 }
 
 func main() {
@@ -45,17 +42,19 @@
 	// Flags that cause the flags below to be disregarded.
 	configFile := flag.String("config", "", "Disregard flags and load the configuration from this JSON5 config file. The keys and types of the config file match the flags.")
 	runInit := flag.Bool("init", false, "Initialize the BigTable instance and quit. This should be run with a different different user who has admin rights.")
+	gcsBucket := flag.String("gcs_bucket", "", "GCS bucket used for temporary storage during ingestion.")
+	gcsPath := flag.String("gcs_path", "", "GCS path used for temporary storage during ingestion.")
 
 	// Define flags that map to field in the configuration struct.
 	flag.StringVar(&config.BTInstanceID, "bt_instance", defaultConf.BTInstanceID, "Big Table instance")
 	flag.StringVar(&config.BTTableID, "bt_table", defaultConf.BTTableID, "BigTable table ID")
+	flag.IntVar(&config.BTWriteGoroutines, "bt_write_goroutines", defaultConf.BTWriteGoroutines, "Number of goroutines to use when writing to BigTable.")
 	flag.StringVar(&config.HttpPort, "http_port", defaultConf.HttpPort, "The http port where ready-ness endpoints are served.")
 	flag.BoolVar(&config.Local, "local", defaultConf.Local, "Running locally if true. As opposed to in production.")
 	flag.StringVar(&config.ProjectID, "project", defaultConf.ProjectID, "ID of the GCP project")
 	flag.StringVar(&config.PromPort, "prom_port", defaultConf.PromPort, "Metrics service address (e.g., ':10110')")
 	common.MultiStringFlagVar(&config.RepoURLs, "repo_url", defaultConf.RepoURLs, "Repo url")
 	flag.DurationVar((*time.Duration)(&config.RefreshInterval), "refresh", time.Duration(defaultConf.RefreshInterval), "Interval in which to poll git and refresh the GitStore.")
-	flag.StringVar(&config.WorkDir, "workdir", defaultConf.WorkDir, "Working directory where repos are cached. Use the same directory between calls to speed up checkout time.")
 
 	common.InitWithMust(
 		"gitsync",
@@ -81,9 +80,10 @@
 
 	// Configure the bigtable instance.
 	btConfig := &bt_gitstore.BTConfig{
-		ProjectID:  config.ProjectID,
-		InstanceID: config.BTInstanceID,
-		TableID:    config.BTTableID,
+		ProjectID:       config.ProjectID,
+		InstanceID:      config.BTInstanceID,
+		TableID:         config.BTTableID,
+		WriteGoroutines: config.BTWriteGoroutines,
 	}
 
 	// Initialize bigtable if invoked with --init and quit.
@@ -97,12 +97,6 @@
 		return
 	}
 
-	// Make sure we have a data directory and it exists or can be created.
-	if config.WorkDir == "" {
-		sklog.Fatal("No workdir specified.")
-	}
-	useWorkDir := fileutil.Must(fileutil.EnsureDirExists(config.WorkDir))
-
 	// Make sure we have at least one repo configured.
 	if len(config.RepoURLs) == 0 {
 		sklog.Fatalf("At least one repository URL must be configured.")
@@ -117,9 +111,10 @@
 	}
 
 	// Set up Git authentication if a service account email was set.
+	gitcookiesPath := ""
 	if !config.Local {
 		// Use the gitcookie created by the gitauth package.
-		gitcookiesPath := "/tmp/gitcookies"
+		gitcookiesPath = "/tmp/gitcookies"
 		sklog.Infof("Writing gitcookies to %s", gitcookiesPath)
 		if _, err := gitauth.New(ts, gitcookiesPath, true, ""); err != nil {
 			sklog.Fatalf("Failed to create git cookie updater: %s", err)
@@ -127,24 +122,12 @@
 		sklog.Infof("Git authentication set up successfully.")
 	}
 
-	// Start all repo watchers in the background.
+	// Start all repo watchers.
 	ctx := context.Background()
 	for _, repoURL := range config.RepoURLs {
-		go func(repoURL string) {
-			repoDir, err := git.NormalizeURL(repoURL)
-			if err != nil {
-				sklog.Fatalf("Error getting normalized URL for %q:  %s", repoURL, err)
-			}
-			repoDir = strings.Replace(repoDir, "/", "_", -1)
-			repoDir = filepath.Join(useWorkDir, repoDir)
-			sklog.Infof("Checking out %s into %s", repoURL, repoDir)
-
-			watcher, err := NewRepoWatcher(ctx, btConfig, repoURL, repoDir)
-			if err != nil {
-				sklog.Fatalf("Error initializing repo watcher: %s", err)
-			}
-			watcher.Start(ctx, time.Duration(config.RefreshInterval))
-		}(repoURL)
+		if err := watcher.Start(ctx, btConfig, repoURL, gitcookiesPath, *gcsBucket, *gcsPath, time.Duration(config.RefreshInterval)); err != nil {
+			sklog.Fatalf("Error initializing repo watcher: %s", err)
+		}
 	}
 
 	// Set up the http handler to indicate ready-ness and start serving.
diff --git a/gitsync/go/gitsync/watcher.go b/gitsync/go/gitsync/watcher.go
deleted file mode 100644
index 51691f5..0000000
--- a/gitsync/go/gitsync/watcher.go
+++ /dev/null
@@ -1,199 +0,0 @@
-package main
-
-import (
-	"context"
-	"fmt"
-	"runtime"
-	"time"
-
-	"go.skia.org/infra/go/fileutil"
-	"go.skia.org/infra/go/git"
-	"go.skia.org/infra/go/gitstore"
-	"go.skia.org/infra/go/gitstore/bt_gitstore"
-	"go.skia.org/infra/go/metrics2"
-	"go.skia.org/infra/go/skerr"
-	"go.skia.org/infra/go/sklog"
-	"go.skia.org/infra/go/util"
-	"go.skia.org/infra/go/vcsinfo"
-)
-
-const (
-	// batchSize is the size of a batch of commits that is imported into BTGit.
-	batchSize = 10000
-)
-
-// RepoWatcher continuously watches a repository and uploads changes to a BigTable Gitstore.
-type RepoWatcher struct {
-	gitStore gitstore.GitStore
-	repo     *git.Repo
-	repoDir  string
-	repoURL  string
-}
-
-// NewRepoWatcher creates a GitStore with the provided information and checks out the git repo
-// at repoURL into repoDir. It's Start(...) function will watch a repo in the background.
-func NewRepoWatcher(ctx context.Context, conf *bt_gitstore.BTConfig, repoURL, repoDir string) (*RepoWatcher, error) {
-	repoDir, err := fileutil.EnsureDirExists(repoDir)
-	if err != nil {
-		return nil, err
-	}
-
-	gitStore, err := bt_gitstore.New(ctx, conf, repoURL)
-	if err != nil {
-		return nil, skerr.Fmt("Error instantiating git store: %s", err)
-	}
-
-	repo, err := git.NewRepo(ctx, repoURL, repoDir)
-	if err != nil {
-		return nil, fmt.Errorf("Failed to create git repo: %s", err)
-	}
-
-	return &RepoWatcher{
-		gitStore: gitStore,
-		repo:     repo,
-		repoDir:  repoDir,
-		repoURL:  repoURL,
-	}, nil
-}
-
-// Start watches the repo in the background and updates the BT GitStore. The frequency is
-// defined by 'interval'.
-func (r *RepoWatcher) Start(ctx context.Context, interval time.Duration) {
-	lvGitSync := metrics2.NewLiveness("last_successful_git_sync", map[string]string{"repo": r.repoURL})
-	go util.RepeatCtx(interval, ctx, func(ctx context.Context) {
-		// Catch any panic and log relevant information to find the root cause.
-		defer func() {
-			if err := recover(); err != nil {
-				const size = 64 << 10
-				buf := make([]byte, size)
-				buf = buf[:runtime.Stack(buf, false)]
-				sklog.Errorf("Panic updating %s in %s:  %s\n%s", r.repoURL, r.repoDir, err, buf)
-			}
-		}()
-
-		if err := r.updateFn(); err != nil {
-			sklog.Errorf("Error updating %s: %s", r.repoURL, err)
-		} else {
-			lvGitSync.Reset()
-		}
-	})
-}
-
-// updateFn retrieves git info from the repository and updates the GitStore.
-func (r *RepoWatcher) updateFn() error {
-	// Update the git repo.
-	ctx := context.Background()
-	sklog.Infof("Updating repo ...")
-	if err := r.repo.Update(ctx); err != nil {
-		return skerr.Fmt("Failed to update repo: %s", err)
-	}
-
-	// Get the branches from the repo.
-	sklog.Info("Getting branches...")
-	branches, err := r.repo.Branches(ctx)
-	if err != nil {
-		return skerr.Fmt("Failed to get branches from Git repo: %s", err)
-	}
-
-	// Get the current branches from the GitStore.
-	currBranches, err := r.gitStore.GetBranches(ctx)
-	if err != nil {
-		return skerr.Fmt("Error retrieving branches from GitStore: %s", err)
-	}
-
-	// Find the hashes all all commits that need to be added to the GitStore. This
-	// considers all branches in the repo and whether they are already in the GitStore.
-	hashes := util.StringSet{}
-	for _, newBranch := range branches {
-		// revListStr is an argument to repo.RevList below and controls how many commits we
-		// retrieve. By default we retrieve all commits in the branch, but may restrict that if
-		// we find an ancester to the current branch (see below).
-		revListStr := newBranch.Head
-
-		// See if we have the branch in the repo already.
-		foundBranch, ok := currBranches[newBranch.Name]
-		if ok {
-			// If the branch hasn't changed we  are done.
-			if foundBranch.Head == newBranch.Head {
-				continue
-			}
-
-			// See if the new branch head is a descendant of the old branch head.
-			anc, err := r.repo.IsAncestor(ctx, foundBranch.Head, newBranch.Head)
-			if err != nil {
-				return skerr.Fmt("Error checking if %s is an ancestor of %s: %s", foundBranch.Head, newBranch.Head, err)
-			}
-
-			if anc {
-				// Only get the commits between the old and new head.
-				revListStr = git.LogFromTo(foundBranch.Head, newBranch.Head)
-			}
-		}
-
-		// Retrieve the target commits.
-		foundHashes, err := r.repo.RevList(ctx, "--topo-order", revListStr)
-		if err != nil {
-			return skerr.Fmt("Error retrieving hashes with the argument %q: %s", revListStr, err)
-		}
-		hashes.AddLists(foundHashes)
-	}
-	sklog.Infof("Repo @ %s: Found %d unique hashes in %d branches.", r.repoURL, len(hashes), len(branches))
-
-	// Iterate over the LongCommits that correspond to batches.
-	ctx, cancelFn := context.WithCancel(context.Background())
-	defer cancelFn()
-
-	commitsCh, err := r.iterateLongCommits(ctx, hashes.Keys(), batchSize)
-	if err != nil {
-		return skerr.Fmt("Error iterating over new commits: %s", err)
-	}
-
-	// Make sure we iterate over all commits, so we don't leak go-routine.
-	for commits := range commitsCh {
-		if err := r.gitStore.Put(ctx, commits); err != nil {
-			return skerr.Fmt("Error writing commits to BigTable: %s", err)
-		}
-	}
-
-	branchMap := make(map[string]string, len(branches))
-	for _, gb := range branches {
-		branchMap[gb.Name] = gb.Head
-	}
-	if err := r.gitStore.PutBranches(ctx, branchMap); err != nil {
-		return skerr.Fmt("Error calling PutBranches on GitStore: %s", err)
-	}
-	sklog.Infof("Repo @ %s: Branches updated successfully.", r.repoURL)
-	return nil
-}
-
-// iterateLongCommit returns batches of commits corresponding to the given hashes.
-func (r *RepoWatcher) iterateLongCommits(ctx context.Context, hashes []string, batchSize int) (<-chan []*vcsinfo.LongCommit, error) {
-	// Allocate a channel so can always send all batches and are not dependent on the speed of the receiver.
-	retCh := make(chan []*vcsinfo.LongCommit, len(hashes)/batchSize+1)
-
-	go func() {
-		longCommits := make([]*vcsinfo.LongCommit, 0, batchSize)
-		for idx, hash := range hashes {
-			// Check whether the context has been canceled.
-			select {
-			case <-ctx.Done():
-				return
-			default:
-			}
-
-			c, err := r.repo.Details(ctx, hash)
-			if err != nil {
-				sklog.Errorf("Error fetching commit %q: %s", hash, err)
-				continue
-			}
-
-			longCommits = append(longCommits, c)
-			if len(longCommits) >= batchSize || idx == (len(hashes)-1) {
-				retCh <- longCommits
-				longCommits = make([]*vcsinfo.LongCommit, 0, batchSize)
-			}
-		}
-		close(retCh)
-	}()
-	return retCh, nil
-}
diff --git a/gitsync/go/watcher/initial.go b/gitsync/go/watcher/initial.go
new file mode 100644
index 0000000..90f65fc
--- /dev/null
+++ b/gitsync/go/watcher/initial.go
@@ -0,0 +1,239 @@
+package watcher
+
+/*
+   This file contains code related to the initial ingestion of a git repo.
+*/
+
+import (
+	"context"
+	"fmt"
+	"path"
+	"strings"
+	"sync"
+	"time"
+
+	"cloud.google.com/go/storage"
+	"github.com/google/uuid"
+	"go.skia.org/infra/go/gcs"
+	"go.skia.org/infra/go/git"
+	"go.skia.org/infra/go/git/repograph"
+	"go.skia.org/infra/go/skerr"
+	"go.skia.org/infra/go/sklog"
+	"go.skia.org/infra/go/util"
+	"go.skia.org/infra/go/vcsinfo"
+)
+
+const (
+	fakeBranchPrefix = "///Fake"
+	gcsIdleWait      = time.Second
+	gcsRetryWait     = 3 * time.Second
+)
+
+// isFakeBranch returns true iff the given branch name is a fake one created by
+// getFakeBranch.
+func isFakeBranch(branch string) bool {
+	return strings.HasPrefix(branch, fakeBranchPrefix)
+}
+
+// getFakeBranch returns an unused fake branch name, given the map of existing
+// branch names to commit hashes.
+func getFakeBranch(existingBranches util.StringSet) string {
+	return fmt.Sprintf("%s%s", fakeBranchPrefix, uuid.New())
+}
+
+// initialIngestCommitBatch ingests the given batch of commits by adding them to
+// the Graph, ensuring that all of them are reachable from branch heads. Because
+// we're dealing with an incomplete Graph at this stage, it's possible that the
+// commitBatch may contain commits which are not reachable from any branch. In
+// that case, initialIngestCommitBatch will create fake branches as needed to
+// ensure that all commits are reachable. The commits in the batch must be in
+// topological order, ie. a commit's parents must appear before the commit.
+func initialIngestCommitBatch(ctx context.Context, graph *repograph.Graph, ri *repograph.MemCacheRepoImpl, cb *commitBatch) error {
+	// Get the current state of the branch heads.
+
+	// branchSet tracks which branches are present.
+	branchSet := util.NewStringSet()
+	// reverseBranchMap has commit hashes as keys and branch names as
+	// values. This will cause branches which point to the same commit to be
+	// deduplicated, which is desirable in the case of fake branches and is
+	// not a problem for real branches, because they will be fixed once the
+	// initial ingestion is complete.
+	reverseBranchMap := map[string]string{}
+	for _, b := range graph.BranchHeads() {
+		branchSet[b.Name] = true
+		reverseBranchMap[b.Head] = b.Name
+	}
+
+	// Loop over the commits in the commitBatch, "walking" the branch heads
+	// forward to account for the new commits.
+	for _, c := range cb.commits {
+		// Add the commit to the RepoImpl so that it can be found
+		// during graph.Update().
+		ri.Commits[c.Hash] = c
+
+		// Figure out which branches point to this commit's parents.
+		// Only keep the first real and fake branches we find; the rest
+		// can be thrown away. We assume that the first parent is the
+		// more important line of history.
+		var realBranch string
+		var fakeBranch string
+		for _, p := range c.Parents {
+			if name, ok := reverseBranchMap[p]; ok {
+				delete(branchSet, name)
+				delete(reverseBranchMap, p)
+				if isFakeBranch(name) {
+					if fakeBranch == "" {
+						fakeBranch = name
+					}
+				} else {
+					if realBranch == "" {
+						realBranch = name
+					}
+				}
+			}
+		}
+		var branch string
+		if realBranch != "" {
+			// If we have a real branch, use that.
+			branch = realBranch
+		} else if !branchSet[cb.branch] {
+			// If we haven't yet used the branch on the commitBatch,
+			// use that.
+			branch = cb.branch
+		} else if fakeBranch != "" {
+			// If we have a fake branch, fall back to that.
+			branch = fakeBranch
+		} else {
+			// No branch points to any of this commit's parents;
+			// create a fake branch to point to this commit.
+			branch = getFakeBranch(branchSet)
+		}
+		branchSet[branch] = true
+		reverseBranchMap[c.Hash] = branch
+	}
+
+	// Create the new set of branch heads.
+	branches := make([]*git.Branch, 0, len(reverseBranchMap))
+	for head, name := range reverseBranchMap {
+		branches = append(branches, &git.Branch{
+			Name: name,
+			Head: head,
+		})
+	}
+	ri.BranchList = branches
+
+	// Update the graph. This triggers a request to save the Graph to GCS.
+	if err := graph.Update(ctx); err != nil {
+		return skerr.Wrapf(err, "Failed to update Graph with new commits and branches.")
+	}
+	return nil
+}
+
+// setupInitialIngest creates a repograph.Graph and a RepoImpl to be used for
+// the initial ingestion of a git repo.
+func setupInitialIngest(ctx context.Context, gcsClient gcs.GCSClient, gcsPath, repoUrl string) (*repograph.Graph, *initialIngestRepoImpl, error) {
+	normUrl, err := git.NormalizeURL(repoUrl)
+	if err != nil {
+		return nil, nil, skerr.Wrapf(err, "Failed to normalize repo URL: %s", repoUrl)
+	}
+	file := path.Join(gcsPath, strings.ReplaceAll(normUrl, "/", "_"))
+	ri := newInitialIngestRepoImpl(ctx, gcsClient, file)
+	r, err := gcsClient.FileReader(ctx, file)
+	if err != nil {
+		if err == storage.ErrObjectNotExist {
+			g, err := repograph.NewWithRepoImpl(ctx, ri)
+			if err != nil {
+				return nil, nil, skerr.Wrapf(err, "Failed to create repo graph.")
+			}
+			ri.graph = g
+			return g, ri, nil
+		} else {
+			return nil, nil, skerr.Wrapf(err, "Failed to read Graph from GCS.")
+		}
+	}
+	defer util.Close(r)
+	g, err := repograph.NewFromGob(ctx, r, ri)
+	if err != nil {
+		return nil, nil, skerr.Wrapf(err, "Failed to create Graph from GCS.")
+	}
+	ri.graph = g
+	return g, ri, nil
+}
+
+// initialIngestRepoImpl is a struct used during initial ingestion of a git repo.
+type initialIngestRepoImpl struct {
+	*repograph.MemCacheRepoImpl
+	file             string
+	gcs              gcs.GCSClient
+	graph            *repograph.Graph
+	writeRequests    int
+	writeRequestsMtx sync.Mutex
+}
+
+// newInitialIngestRepoImpl returns a repograph.RepoImpl used for initial
+// ingestion of a git repo.
+func newInitialIngestRepoImpl(ctx context.Context, gcsClient gcs.GCSClient, file string) *initialIngestRepoImpl {
+	mem := repograph.NewMemCacheRepoImpl(map[string]*vcsinfo.LongCommit{}, nil)
+	ri := &initialIngestRepoImpl{
+		MemCacheRepoImpl: mem,
+		file:             file,
+		gcs:              gcsClient,
+	}
+	go func() {
+		for {
+			ri.writeRequestsMtx.Lock()
+			writeRequests := ri.writeRequests
+			ri.writeRequestsMtx.Unlock()
+			if writeRequests > 0 {
+				if err := ri.write(ctx); err != nil {
+					sklog.Errorf("Failed to write Graph to GCS: %s; will retry in %s", err, gcsRetryWait)
+					time.Sleep(gcsRetryWait)
+				} else {
+					ri.writeRequestsMtx.Lock()
+					ri.writeRequests -= writeRequests
+					ri.writeRequestsMtx.Unlock()
+				}
+			} else {
+				time.Sleep(gcsIdleWait)
+			}
+		}
+	}()
+	return ri
+}
+
+// See documentation for RepoImpl interface.
+func (ri *initialIngestRepoImpl) UpdateCallback(ctx context.Context, _, _ []*vcsinfo.LongCommit, _ *repograph.Graph) (rv error) {
+	ri.writeRequestsMtx.Lock()
+	defer ri.writeRequestsMtx.Unlock()
+	ri.writeRequests += 1
+	return nil
+}
+
+// Write the Graph to the backing store.
+func (ri *initialIngestRepoImpl) write(ctx context.Context) error {
+	sklog.Infof("Backing up graph with %d commits.", ri.graph.Len())
+	w := ri.gcs.FileWriter(ctx, ri.file, gcs.FILE_WRITE_OPTS_TEXT)
+	writeErr := ri.graph.WriteGob(w)
+	closeErr := w.Close()
+	if writeErr != nil && closeErr != nil {
+		return skerr.Wrapf(writeErr, "Failed to write Graph to GCS and failed to close GCS file with: %s", closeErr)
+	} else if writeErr != nil {
+		return skerr.Wrapf(writeErr, "Failed to write Graph to GCS.")
+	} else if closeErr != nil {
+		return skerr.Wrapf(closeErr, "Failed to close GCS file.")
+	}
+	return nil
+}
+
+// Wait for any push to the backing store to be finished.
+func (ri *initialIngestRepoImpl) Wait() {
+	for {
+		ri.writeRequestsMtx.Lock()
+		writeRequests := ri.writeRequests
+		ri.writeRequestsMtx.Unlock()
+		if writeRequests == 0 {
+			return
+		}
+		time.Sleep(time.Second)
+	}
+}
diff --git a/gitsync/go/watcher/initial_test.go b/gitsync/go/watcher/initial_test.go
new file mode 100644
index 0000000..4da1246
--- /dev/null
+++ b/gitsync/go/watcher/initial_test.go
@@ -0,0 +1,221 @@
+package watcher
+
+import (
+	"context"
+	"fmt"
+	"testing"
+
+	"github.com/google/uuid"
+	assert "github.com/stretchr/testify/require"
+	"go.skia.org/infra/go/deepequal"
+	"go.skia.org/infra/go/gcs/test_gcsclient"
+	"go.skia.org/infra/go/git"
+	"go.skia.org/infra/go/git/repograph"
+	git_testutils "go.skia.org/infra/go/git/testutils"
+	"go.skia.org/infra/go/gitiles"
+	gitiles_testutils "go.skia.org/infra/go/gitiles/testutils"
+	"go.skia.org/infra/go/gitstore"
+	"go.skia.org/infra/go/gitstore/mocks"
+	"go.skia.org/infra/go/mockhttpclient"
+	"go.skia.org/infra/go/testutils/unittest"
+	"go.skia.org/infra/go/util"
+	"go.skia.org/infra/go/vcsinfo"
+)
+
+func TestInitialIngestCommitBatch(t *testing.T) {
+	unittest.MediumTest(t)
+
+	ctx := context.Background()
+	ri := repograph.NewMemCacheRepoImpl(nil, nil)
+	graph, err := repograph.NewWithRepoImpl(ctx, ri)
+	assert.NoError(t, err)
+	assert.Equal(t, 0, graph.Len())
+	assert.Equal(t, 0, len(graph.Branches()))
+
+	gb := git_testutils.GitInit(t, ctx)
+	defer gb.Cleanup()
+	gd := git.GitDir(gb.Dir())
+
+	commit := func() *vcsinfo.LongCommit {
+		h := gb.CommitGen(ctx, uuid.New().String())
+		d, err := gd.Details(ctx, h)
+		assert.NoError(t, err)
+		return d
+	}
+
+	test := func(cb *commitBatch, expectCommits, expectBranches int) {
+		assert.NoError(t, initialIngestCommitBatch(ctx, graph, ri, cb))
+		assert.Equal(t, expectCommits, graph.Len())
+		assert.Equal(t, expectBranches, len(graph.Branches()))
+	}
+
+	// Empty batch, nothing to do.
+	test(&commitBatch{}, 0, 0)
+
+	// Verify that we create a new branch.
+	c0 := commit()
+	test(&commitBatch{
+		branch:  "mybranch", // Don't use master, to make sure we didn't pick it up accidentally.
+		commits: []*vcsinfo.LongCommit{c0},
+	}, 1, 1)
+	assert.True(t, util.In("mybranch", graph.Branches()))
+
+	// Verify that we walk the branch head forward for new commits.
+	c1 := commit()
+	test(&commitBatch{
+		branch:  "mybranch",
+		commits: []*vcsinfo.LongCommit{c1},
+	}, 2, 1)
+	assert.Equal(t, c1.Hash, graph.BranchHeads()[0].Head)
+	assert.False(t, isFakeBranch(graph.BranchHeads()[0].Name))
+
+	// Add two commits, both based at c1. Ensure that we create a fake
+	// branch for the second one.
+	c2 := commit()
+	gb.CreateBranchTrackBranch(ctx, "mybranch2", "master")
+	gb.Reset(ctx, "--hard", c1.Hash)
+	c3 := commit()
+	test(&commitBatch{
+		branch:  "mybranch",
+		commits: []*vcsinfo.LongCommit{c2, c3},
+	}, 4, 2)
+	var fakeBranch string
+	for _, b := range graph.BranchHeads() {
+		if b.Name == "mybranch" {
+			assert.False(t, isFakeBranch(b.Name))
+			assert.Equal(t, c2.Hash, b.Head)
+		} else {
+			fakeBranch = b.Name
+			assert.True(t, isFakeBranch(b.Name))
+			assert.Equal(t, c3.Hash, b.Head)
+		}
+	}
+	assert.NotEqual(t, "", fakeBranch)
+
+	// Add another commit on each branch. Ensure that we walk both branches
+	// forward as expected.
+	c4 := commit()
+	gb.CheckoutBranch(ctx, "master")
+	c5 := commit()
+	test(&commitBatch{
+		branch:  "mybranch",
+		commits: []*vcsinfo.LongCommit{c4, c5},
+	}, 6, 2)
+	for _, b := range graph.BranchHeads() {
+		if b.Name == "mybranch" {
+			assert.False(t, isFakeBranch(b.Name))
+			assert.Equal(t, c5.Hash, b.Head)
+		} else {
+			assert.Equal(t, fakeBranch, b.Name)
+			assert.True(t, isFakeBranch(b.Name))
+			assert.Equal(t, c4.Hash, b.Head)
+		}
+	}
+
+	// Another commit on each, then merge. Ensure that we kept the real
+	// branch, not the fake one.
+	c6 := commit()
+	gb.CheckoutBranch(ctx, "mybranch2")
+	c7 := commit()
+	c8Hash := gb.MergeBranch(ctx, "master")
+	c8, err := gd.Details(ctx, c8Hash)
+	assert.NoError(t, err)
+	test(&commitBatch{
+		branch:  "mybranch",
+		commits: []*vcsinfo.LongCommit{c6, c7, c8},
+	}, 9, 1)
+	b := graph.BranchHeads()[0]
+	assert.False(t, isFakeBranch(b.Name))
+	assert.Equal(t, "mybranch", b.Name)
+	assert.Equal(t, c8.Hash, b.Head)
+}
+
+func setupTestInitial(t *testing.T) (context.Context, *git_testutils.GitBuilder, *gitiles_testutils.MockRepo, *repoImpl, func()) {
+	ctx := context.Background()
+	g := git_testutils.GitInit(t, ctx)
+	gs := mocks.GitStore{}
+	gs.On("RangeByTime", ctx, vcsinfo.MinTime, vcsinfo.MaxTime, gitstore.ALL_BRANCHES).Return(nil, nil)
+	gs.On("GetBranches", ctx).Return(nil, nil)
+	urlMock := mockhttpclient.NewURLMock()
+	mockRepo := gitiles_testutils.NewMockRepo(t, g.RepoUrl(), git.GitDir(g.Dir()), urlMock)
+	repo := gitiles.NewRepo(g.RepoUrl(), "", urlMock.Client())
+	gcsClient := test_gcsclient.NewMemoryClient("fake-bucket")
+	ri, err := newRepoImpl(ctx, &gs, repo, gcsClient, "repo-ingestion")
+	assert.NoError(t, err)
+	return ctx, g, mockRepo, ri.(*repoImpl), g.Cleanup
+}
+
+func TestInitialIngestion(t *testing.T) {
+	unittest.LargeTest(t)
+	ctx, gb, mockRepo, ri, cleanup := setupTestInitial(t)
+	defer cleanup()
+
+	gd := git.GitDir(gb.Dir())
+
+	commit := func() *vcsinfo.LongCommit {
+		h := gb.CommitGen(ctx, uuid.New().String())
+		c, err := gd.Details(ctx, h)
+		assert.NoError(t, err)
+		return c
+	}
+
+	test := func(expectBranches, expectCommits int) {
+		// Clear the cache between every attempt.
+		ri.MemCacheRepoImpl = repograph.NewMemCacheRepoImpl(nil, nil)
+		mockRepo.MockBranches(ctx)
+		assert.NoError(t, ri.initialIngestion(ctx))
+		assert.Equal(t, expectBranches, len(ri.BranchList))
+		assert.Equal(t, expectCommits, len(ri.Commits))
+		assert.True(t, mockRepo.Empty())
+	}
+
+	// No commits, no branches; nothing to do.
+	test(0, 0)
+
+	// One commit.
+	c0 := commit()
+	mockRepo.MockLog(ctx, c0.Hash, gitiles.LogReverse(), gitiles.LogBatchSize(batchSize))
+	test(1, 1)
+	assert.Equal(t, "master", ri.BranchList[0].Name)
+	assert.Equal(t, c0.Hash, ri.BranchList[0].Head)
+	deepequal.AssertDeepEqual(t, ri.Commits[c0.Hash], c0)
+
+	// No new commits. Clear out the cache and ensure that we don't request
+	// the log of c0 again, because it's backed up in GCS.
+	test(1, 1)
+
+	// New commits on a non-master branch.
+	gb.CreateBranchTrackBranch(ctx, "branch2", "master")
+	var newBranchCommits []*vcsinfo.LongCommit
+	for i := 0; i < 10; i++ {
+		newBranchCommits = append(newBranchCommits, commit())
+	}
+	last := newBranchCommits[len(newBranchCommits)-1]
+	mockRepo.MockLog(ctx, last.Hash, gitiles.LogBatchSize(batchSize))
+	test(2, 11)
+	for _, b := range ri.BranchList {
+		if b.Name == "master" {
+			assert.Equal(t, c0.Hash, b.Head)
+		} else {
+			assert.Equal(t, "branch2", b.Name)
+			assert.Equal(t, last.Hash, b.Head)
+		}
+	}
+
+	// New commits on several new branches.
+	for i := 0; i < 10; i++ {
+		gb.CreateBranchTrackBranch(ctx, fmt.Sprintf("b%d", i), "master")
+		commits := []*vcsinfo.LongCommit{}
+		for j := 0; j < 10; j++ {
+			commits = append(commits, commit())
+		}
+		last = commits[len(commits)-1]
+		mockRepo.MockLog(ctx, last.Hash, gitiles.LogBatchSize(batchSize))
+	}
+	test(12, 111)
+
+	// One new commit on one of the branches. Ensure that we only request
+	// the new commit.
+	mockRepo.MockLog(ctx, git.LogFromTo(last.Hash, commit().Hash), gitiles.LogBatchSize(batchSize))
+	test(12, 112)
+}
diff --git a/gitsync/go/watcher/watcher.go b/gitsync/go/watcher/watcher.go
new file mode 100644
index 0000000..d2903fd
--- /dev/null
+++ b/gitsync/go/watcher/watcher.go
@@ -0,0 +1,548 @@
+package watcher
+
+import (
+	"context"
+	"fmt"
+	"path"
+	"runtime/debug"
+	"sync"
+	"time"
+
+	"cloud.google.com/go/storage"
+	"go.skia.org/infra/go/cleanup"
+	"go.skia.org/infra/go/gcs"
+	"go.skia.org/infra/go/gcs/gcsclient"
+	"go.skia.org/infra/go/git"
+	"go.skia.org/infra/go/git/repograph"
+	"go.skia.org/infra/go/gitiles"
+	"go.skia.org/infra/go/gitstore"
+	"go.skia.org/infra/go/gitstore/bt_gitstore"
+	"go.skia.org/infra/go/metrics2"
+	"go.skia.org/infra/go/skerr"
+	"go.skia.org/infra/go/sklog"
+	"go.skia.org/infra/go/timer"
+	"go.skia.org/infra/go/vcsinfo"
+	"golang.org/x/sync/errgroup"
+)
+
+const (
+	// batchSize is the number of commits retrieved at a time from Gitiles.
+	batchSize = 10000
+)
+
+// Start creates a GitStore with the provided information and starts periodic
+// ingestion.
+func Start(ctx context.Context, conf *bt_gitstore.BTConfig, repoURL, gitcookiesPath, gcsBucket, gcsPath string, interval time.Duration) error {
+	sklog.Infof("Initializing watcher for %s", repoURL)
+	gitStore, err := bt_gitstore.New(ctx, conf, repoURL)
+	if err != nil {
+		return skerr.Wrapf(err, "Error instantiating git store for %s.", repoURL)
+	}
+	gr := gitiles.NewRepo(repoURL, gitcookiesPath, nil)
+	s, err := storage.NewClient(ctx)
+	if err != nil {
+		return skerr.Wrapf(err, "Failed to create storage client for %s.", gcsBucket)
+	}
+	gcsClient := gcsclient.New(s, gcsBucket)
+	ri, err := newRepoImpl(ctx, gitStore, gr, gcsClient, gcsPath)
+	if err != nil {
+		return skerr.Wrapf(err, "Failed to create RepoImpl for %s; using gs://%s/%s.", repoURL, gcsBucket, gcsPath)
+	}
+	sklog.Infof("Building Graph for %s...", repoURL)
+	repo, err := repograph.NewWithRepoImpl(ctx, ri)
+	if err != nil {
+		return skerr.Wrapf(err, "Failed to create repo graph for %s.", repoURL)
+	}
+	repo.UpdateBranchInfo()
+
+	// Start periodic ingestion.
+	lvGitSync := metrics2.NewLiveness("last_successful_git_sync", map[string]string{"repo": repoURL})
+	cleanup.Repeat(interval, func(ctx context.Context) {
+		defer timer.New("Sync " + repoURL).Stop()
+		// Catch any panic and log relevant information to find the root cause.
+		defer func() {
+			if err := recover(); err != nil {
+				sklog.Errorf("Panic updating %s:  %s\n%s", repoURL, err, string(debug.Stack()))
+			}
+		}()
+
+		sklog.Infof("Updating %s...", repoURL)
+		if err := repo.Update(ctx); err != nil {
+			sklog.Errorf("Error updating %s: %s", repoURL, err)
+		} else {
+			gotBranches, err := gitStore.GetBranches(ctx)
+			if err != nil {
+				sklog.Errorf("Successfully updated %s but failed to retrieve branch heads: %s", repoURL, err)
+			} else {
+				sklog.Infof("Successfully updated %s", repoURL)
+				for name, branch := range gotBranches {
+					sklog.Debugf("  %s@%s: %d, %s", path.Base(repoURL), name, branch.Index, branch.Head)
+				}
+			}
+			lvGitSync.Reset()
+		}
+	}, nil)
+	return nil
+}
+
+// repoImpl is an implementation of repograph.RepoImpl which loads commits into
+// a GitStore.
+type repoImpl struct {
+	*repograph.MemCacheRepoImpl
+	gcsClient gcs.GCSClient
+	gcsPath   string
+	gitiles   *gitiles.Repo
+	gitstore  gitstore.GitStore
+}
+
+// newRepoImpl returns a repograph.RepoImpl which uses both Gitiles and
+// GitStore.
+func newRepoImpl(ctx context.Context, gs gitstore.GitStore, repo *gitiles.Repo, gcsClient gcs.GCSClient, gcsPath string) (repograph.RepoImpl, error) {
+	indexCommits, err := gs.RangeByTime(ctx, vcsinfo.MinTime, vcsinfo.MaxTime, gitstore.ALL_BRANCHES)
+	if err != nil {
+		return nil, skerr.Wrapf(err, "Failed loading IndexCommits from GitStore.")
+	}
+	var commits []*vcsinfo.LongCommit
+	if len(indexCommits) > 0 {
+		hashes := make([]string, 0, len(indexCommits))
+		for _, c := range indexCommits {
+			hashes = append(hashes, c.Hash)
+		}
+		commits, err = gs.Get(ctx, hashes)
+		if err != nil {
+			return nil, skerr.Wrapf(err, "Failed loading LongCommits from GitStore.")
+		}
+	}
+	gb, err := gs.GetBranches(ctx)
+	if err != nil {
+		return nil, skerr.Wrapf(err, "Failed loading branches from GitStore.")
+	}
+	branches := make([]*git.Branch, 0, len(gb))
+	for name, branch := range gb {
+		branches = append(branches, &git.Branch{
+			Name: name,
+			Head: branch.Head,
+		})
+	}
+	commitsMap := make(map[string]*vcsinfo.LongCommit, len(commits))
+	for _, c := range commits {
+		commitsMap[c.Hash] = c
+	}
+	sklog.Infof("Repo %s has %d commits and %d branches.", repo.URL, len(commits), len(branches))
+	for _, b := range branches {
+		sklog.Infof("  branch %s @ %s", b.Name, b.Head)
+	}
+	return &repoImpl{
+		MemCacheRepoImpl: repograph.NewMemCacheRepoImpl(commitsMap, branches),
+		gcsClient:        gcsClient,
+		gcsPath:          gcsPath,
+		gitiles:          repo,
+		gitstore:         gs,
+	}, nil
+}
+
+type commitBatch struct {
+	branch  string
+	commits []*vcsinfo.LongCommit
+}
+
+// processCommits processes commits in a separate goroutine while the
+// passed-in func loads commits from Gitiles.
+func (r *repoImpl) processCommits(ctx context.Context, process func(context.Context, *commitBatch) error, loadCommits func(context.Context, chan<- *commitBatch) error) error {
+	// Run GitStore ingestion in a goroutine. Create a cancelable context
+	// to halt requests to Gitiles if GitStore ingestion fails.
+	ctx, cancel := context.WithCancel(ctx)
+	defer cancel()
+	commitsCh := make(chan *commitBatch)
+	errCh := make(chan error)
+	go func() {
+		var err error
+		for cb := range commitsCh {
+			if err != nil {
+				// We've hit an error but we need to consume
+				// all of the commits from the channel before
+				// returning, or the passed-in func may block
+				// forever.
+				sklog.Warningf("Skipping %d commits due to previous error.", len(cb.commits))
+				continue
+			}
+			// Process the commits.
+			if process != nil {
+				err = process(ctx, cb)
+			}
+			if err != nil {
+				if err != gitiles.ErrStopIteration {
+					sklog.Errorf("processCommits encountered error: %s", err)
+				}
+				// Cancel the context passed to loadCommits().
+				// If it respects context.Done() as it's
+				// supposed to, then it should exit early with
+				// an error.
+				cancel()
+			}
+		}
+		// Signal that we're done ingesting commits.
+		errCh <- err
+	}()
+
+	// Run the passed-in func.
+	loadingErr := loadCommits(ctx, commitsCh)
+
+	// Close the commits channel, wait for the goroutine to complete.
+	close(commitsCh)
+	processErr := <-errCh
+
+	// The error returned from the gitstore goroutine takes precedence,
+	// because we cancel the context when gitstore.Put fails, and thus
+	// loadCommits() may return an error simply stating that the context was
+	// canceled.
+	if processErr != nil {
+		if processErr == gitiles.ErrStopIteration {
+			// Ignore the loadingErr in this case, since it's almost
+			// certainly just "context canceled".
+			return nil
+		}
+		if loadingErr != nil {
+			return skerr.Wrapf(processErr, "GitStore ingestion failed, and commit-loading func failed with: %s", loadingErr)
+		}
+		return processErr
+	}
+	return loadingErr
+}
+
+// loadCommitsFromGitiles loads commits from Gitiles and pushes them onto the
+// given channel, until we reach the optional from commit, or any other commit
+// we've seen before.
+func (r *repoImpl) loadCommitsFromGitiles(ctx context.Context, branch, logExpr string, commitsCh chan<- *commitBatch, opts ...gitiles.LogOption) error {
+	return r.gitiles.LogFnBatch(ctx, logExpr, func(ctx context.Context, commits []*vcsinfo.LongCommit) error {
+		commitsCh <- &commitBatch{
+			branch:  branch,
+			commits: commits,
+		}
+		return nil
+	}, opts...)
+}
+
+// initialIngestion performs the first-time ingestion of the repo.
+func (r *repoImpl) initialIngestion(ctx context.Context) error {
+	sklog.Warningf("Performing initial ingestion of %s.", r.gitiles.URL)
+	defer timer.New("Initial ingestion").Stop()
+
+	// Create a tmpGitStore.
+	sklog.Info("Retrieving graph from temporary store.")
+	t := timer.New("Retrieving graph from temp store")
+	graph, ri, err := setupInitialIngest(ctx, r.gcsClient, r.gcsPath, r.gitiles.URL)
+	if err != nil {
+		return skerr.Wrapf(err, "Failed initial ingestion of %s using GCS file %s", r.gitiles.URL, r.gcsPath)
+	}
+	for _, c := range graph.GetAll() {
+		r.Commits[c.Hash] = c.LongCommit
+	}
+	// oldBranches maps branch names to commit hashes for the existing
+	// branches.
+	oldBranches := map[string]string{}
+	for _, b := range graph.BranchHeads() {
+		oldBranches[b.Name] = b.Head
+	}
+	t.Stop()
+
+	// Find the current set of branches.
+	t = timer.New("Loading commits from gitiles")
+	branches, err := r.gitiles.Branches(ctx)
+	if err != nil {
+		return skerr.Wrapf(err, "Failed loading branches from Gitiles.")
+	}
+
+	// Load commits from gitiles.
+
+	// We assume that master contains the majority of the commits in the
+	// repo, and the other branches are comparatively small, with most of
+	// their ancestry being on master itself.
+	var master *git.Branch
+	for _, b := range branches {
+		if b.Name == "master" {
+			master = b
+			break
+		}
+	}
+	if master != nil {
+		if master.Head == oldBranches[master.Name] {
+			sklog.Infof("Master is up to date; skipping.")
+		} else {
+			sklog.Info("Loading commits from gitiles for master.")
+			if err := r.processCommits(ctx, func(ctx context.Context, cb *commitBatch) error {
+				// Add the new commits to our local cache.
+				sklog.Infof("Adding batch of %d commits", len(cb.commits))
+				for _, c := range cb.commits {
+					r.Commits[c.Hash] = c
+				}
+				return initialIngestCommitBatch(ctx, graph, ri.MemCacheRepoImpl, cb)
+			}, func(ctx context.Context, commitsCh chan<- *commitBatch) error {
+				logExpr := master.Head
+				if oldHead, ok := oldBranches[master.Name]; ok {
+					sklog.Errorf("Have master @ %s; requesting %s", oldHead, master.Head)
+					logExpr = fmt.Sprintf("%s..%s", oldHead, master.Head)
+				}
+				return r.loadCommitsFromGitiles(ctx, master.Name, logExpr, commitsCh, gitiles.LogReverse(), gitiles.LogBatchSize(batchSize))
+			}); err != nil {
+				return skerr.Wrapf(err, "Failed to ingest commits for master.")
+			}
+		}
+	}
+	ri.Wait()
+
+	// mtx protects graph and ri.commits as we load commits for non-master
+	// branches in different goroutines.
+	var mtx sync.Mutex
+
+	// Load commits for other branches, in non-reverse order, so that we can
+	// stop once we reach commits already on the master branch.
+	var egroup errgroup.Group
+
+	for _, branch := range branches {
+		// https://golang.org/doc/faq#closures_and_goroutines
+		branch := branch
+		if branch == master {
+			continue
+		}
+		sklog.Infof("Loading commits for %s", branch.Name)
+		mtx.Lock()
+		_, exists := r.Commits[branch.Head]
+		mtx.Unlock()
+		if exists {
+			sklog.Infof(" ... already have %s, skip", branch.Head)
+			continue
+		}
+		egroup.Go(func() error {
+			var commits []*vcsinfo.LongCommit
+			mtx.Lock()
+			localCache := make(map[string]*vcsinfo.LongCommit, len(r.Commits))
+			for h, c := range r.Commits {
+				localCache[h] = c
+			}
+			mtx.Unlock()
+
+			// lookingFor tracks which commits are wanted by this
+			// branch. As we traverse back through git history, we
+			// may find commits which we already have in our local
+			// cache. One might thing that we could stop requesting
+			// batches of commits at that point, but if there's a
+			// commit with multiple parents on this branch, we have
+			// to make sure we follow all lines of history, which
+			// means that we need to track all of the commit hashes
+			// which we expect to find but have not yet. We can stop
+			// when we've found all of the hashes we're looking for.
+			lookingFor := map[string]bool{}
+			if err := r.processCommits(ctx, func(ctx context.Context, cb *commitBatch) error {
+				numIngested := 0
+				defer func() {
+					sklog.Infof("Added %d of batch of %d commits", numIngested, len(cb.commits))
+				}()
+				for _, c := range cb.commits {
+					// Remove this commit from lookingFor,
+					// now that we've found it.
+					delete(lookingFor, c.Hash)
+					// Add the commit to the local cache,
+					// if it's not already present. Track
+					// the number of new commits we've seen.
+					if _, ok := localCache[c.Hash]; !ok {
+						commits = append(commits, c)
+						localCache[c.Hash] = c
+						numIngested++
+					}
+					// Add any parents of this commit which
+					// are not already in our cache to the
+					// lookingFor set.
+					for _, p := range c.Parents {
+						if _, ok := localCache[p]; !ok {
+							lookingFor[p] = true
+						}
+					}
+					// If we've found all the commits we
+					// need, we can stop.
+					if len(lookingFor) == 0 {
+						return gitiles.ErrStopIteration
+					}
+				}
+				return nil
+			}, func(ctx context.Context, commitsCh chan<- *commitBatch) error {
+				logExpr := branch.Head
+				if oldHead, ok := oldBranches[branch.Name]; ok {
+					logExpr = git.LogFromTo(oldHead, branch.Head)
+				}
+				return r.loadCommitsFromGitiles(ctx, branch.Name, logExpr, commitsCh, gitiles.LogBatchSize(batchSize))
+			}); err != nil {
+				return skerr.Wrapf(err, "Failed to ingest commits for branch %s.", branch.Name)
+			}
+			// Reverse the slice of commits so that they can be added in
+			// order.
+			for i := 0; i < len(commits)/2; i++ {
+				j := len(commits) - i - 1
+				commits[i], commits[j] = commits[j], commits[i]
+			}
+			mtx.Lock()
+			defer mtx.Unlock()
+			if err := initialIngestCommitBatch(ctx, graph, ri.MemCacheRepoImpl, &commitBatch{
+				branch:  branch.Name,
+				commits: commits,
+			}); err != nil {
+				return skerr.Wrapf(err, "Failed to add commits to graph for %s", branch.Name)
+			}
+			for _, c := range commits {
+				r.Commits[c.Hash] = c
+			}
+			sklog.Infof("Loading commits for %s done", branch.Name)
+			return nil
+		})
+	}
+	// Wait for the above goroutines to finish.
+	if err := egroup.Wait(); err != nil {
+		return skerr.Wrap(err)
+	}
+	// Wait for the initialIngestRepoImpl to finish backing up to GCS.
+	ri.Wait()
+	t.Stop()
+
+	// Replace the fake branches with real ones. Update branch membership.
+	ri.BranchList = branches
+	if err := graph.Update(ctx); err != nil {
+		return skerr.Wrapf(err, "Failed final Graph update.")
+	}
+	ri.Wait()
+	r.BranchList = branches
+	sklog.Infof("Finished initial ingestion of %s; have %d commits and %d branches.", r.gitiles.URL, graph.Len(), len(graph.Branches()))
+	return nil
+}
+
+// See documentation for RepoImpl interface.
+func (r *repoImpl) Update(ctx context.Context) error {
+	sklog.Infof("repoImpl.Update for %s", r.gitiles.URL)
+	defer timer.New("repoImpl.Update for " + r.gitiles.URL).Stop()
+	if len(r.BranchList) == 0 {
+		if err := r.initialIngestion(ctx); err != nil {
+			return skerr.Wrapf(err, "Failed initial ingestion.")
+		}
+	}
+
+	// Find the old and new branch heads.
+	sklog.Infof("Getting branches for %s.", r.gitiles.URL)
+	oldBranches := make(map[string]*git.Branch, len(r.BranchList))
+	for _, branch := range r.BranchList {
+		oldBranches[branch.Name] = branch
+	}
+	branches, err := r.gitiles.Branches(ctx)
+	if err != nil {
+		return skerr.Wrapf(err, "Failed loading branches from Gitiles.")
+	}
+
+	// Download any new commits and add them to the local cache.
+	sklog.Infof("Processing new commits for %s.", r.gitiles.URL)
+	if err := r.processCommits(ctx, func(ctx context.Context, cb *commitBatch) error {
+		// Add the new commits to our local cache.
+		for _, c := range cb.commits {
+			r.Commits[c.Hash] = c
+		}
+		return nil
+	}, func(ctx context.Context, commitsCh chan<- *commitBatch) error {
+		for _, branch := range branches {
+			logExpr := branch.Head
+			if oldBranch, ok := oldBranches[branch.Name]; ok {
+				// If there's nothing new, skip this branch.
+				if branch.Head == oldBranch.Head {
+					continue
+				}
+				// Only load back to the previous branch head.
+				logExpr = fmt.Sprintf("%s..%s", oldBranch.Head, branch.Head)
+			}
+			if err := r.loadCommitsFromGitiles(ctx, branch.Name, logExpr, commitsCh); err != nil {
+				return skerr.Wrapf(err, "Failed loading commits for %s", branch.Head)
+			}
+		}
+		return nil
+	}); err != nil {
+		return err
+	}
+	r.BranchList = branches
+	return nil
+}
+
+// See documentation for RepoImpl interface.
+func (r *repoImpl) Details(ctx context.Context, hash string) (*vcsinfo.LongCommit, error) {
+	c, err := r.MemCacheRepoImpl.Details(ctx, hash)
+	if err == nil {
+		return c, nil
+	}
+	// Fall back to retrieving from Gitiles, store any new commits in the
+	// local cache.
+	sklog.Errorf("Missing commit %s in %s", hash[:7], r.gitiles.URL)
+	lookingFor := map[string]bool{}
+	if err := r.processCommits(ctx, func(ctx context.Context, cb *commitBatch) error {
+		// Add the new commits to our local cache.
+		for _, c := range cb.commits {
+			delete(lookingFor, c.Hash)
+			for _, p := range c.Parents {
+				if _, ok := r.Commits[p]; !ok {
+					lookingFor[p] = true
+				}
+			}
+			r.Commits[c.Hash] = c
+			if len(lookingFor) == 0 {
+				return gitiles.ErrStopIteration
+			}
+		}
+		return nil
+	}, func(ctx context.Context, commitsCh chan<- *commitBatch) error {
+		return r.loadCommitsFromGitiles(ctx, "", hash, commitsCh)
+	}); err != nil {
+		return nil, err
+	}
+	c, ok := r.Commits[hash]
+	if !ok {
+		return nil, skerr.Fmt("Commit %s in %s is still missing despite attempting to load it from gitiles.", hash, r.gitiles.URL)
+	}
+	return c, nil
+}
+
+// See documentation for RepoImpl interface.
+func (r *repoImpl) UpdateCallback(ctx context.Context, added, removed []*vcsinfo.LongCommit, graph *repograph.Graph) error {
+	sklog.Infof("repoImpl.UpdateCallback for %s", r.gitiles.URL)
+	defer timer.New("repoImpl.UpdateCallback for " + r.gitiles.URL).Stop()
+	// Ensure that branch membership is up to date.
+	modified := graph.UpdateBranchInfo()
+	modifiedMap := make(map[string]*vcsinfo.LongCommit, len(modified)+len(added))
+	for _, c := range added {
+		modifiedMap[c.Hash] = c
+	}
+	for _, c := range modified {
+		modifiedMap[c.Hash] = c
+	}
+	// Don't include commits in the 'removed' list.
+	for _, c := range removed {
+		delete(modifiedMap, c.Hash)
+	}
+	putCommits := make([]*vcsinfo.LongCommit, 0, len(modifiedMap))
+	for _, c := range modifiedMap {
+		putCommits = append(putCommits, c)
+	}
+	sklog.Infof("Put %d new and %d modified commits for %s.", len(added), len(modified), r.gitiles.URL)
+	if err := r.gitstore.Put(ctx, putCommits); err != nil {
+		return skerr.Wrapf(err, "Failed putting commits into GitStore.")
+	}
+	// TODO(borenet): Should we delete commits which were removed?
+	branchHeads := graph.BranchHeads()
+	branches := make(map[string]string, len(branchHeads))
+	for _, b := range branchHeads {
+		branches[b.Name] = b.Head
+	}
+	// Explicitly delete any old branches which are no longer present.
+	oldBranches, err := r.gitstore.GetBranches(ctx)
+	if err != nil {
+		return skerr.Wrapf(err, "Failed to retrieve old branch heads.")
+	}
+	for name := range oldBranches {
+		if _, ok := branches[name]; !ok {
+			branches[name] = gitstore.DELETE_BRANCH
+		}
+	}
+	return r.gitstore.PutBranches(ctx, branches)
+}
diff --git a/gitsync/go/watcher/watcher_test.go b/gitsync/go/watcher/watcher_test.go
new file mode 100644
index 0000000..8475389
--- /dev/null
+++ b/gitsync/go/watcher/watcher_test.go
@@ -0,0 +1,368 @@
+package watcher
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"io/ioutil"
+	"strings"
+	"testing"
+	"time"
+
+	assert "github.com/stretchr/testify/require"
+	"go.skia.org/infra/go/deepequal"
+	"go.skia.org/infra/go/gcs/test_gcsclient"
+	"go.skia.org/infra/go/git"
+	"go.skia.org/infra/go/git/repograph"
+	repograph_shared_tests "go.skia.org/infra/go/git/repograph/shared_tests"
+	git_testutils "go.skia.org/infra/go/git/testutils"
+	"go.skia.org/infra/go/gitiles"
+	gitiles_testutils "go.skia.org/infra/go/gitiles/testutils"
+	"go.skia.org/infra/go/gitstore"
+	"go.skia.org/infra/go/gitstore/mocks"
+	gitstore_testutils "go.skia.org/infra/go/gitstore/testutils"
+	"go.skia.org/infra/go/mockhttpclient"
+	"go.skia.org/infra/go/sklog"
+	"go.skia.org/infra/go/testutils"
+	"go.skia.org/infra/go/testutils/unittest"
+	"go.skia.org/infra/go/util"
+	"go.skia.org/infra/go/vcsinfo"
+)
+
+func TestIngestCommits(t *testing.T) {
+	unittest.SmallTest(t)
+
+	ctx := context.Background()
+	gs := &mocks.GitStore{}
+	ri := &repoImpl{
+		MemCacheRepoImpl: repograph.NewMemCacheRepoImpl(nil, nil),
+		gitstore:         gs,
+	}
+	idx := 0
+	makeCommits := func(hashes ...string) []*vcsinfo.LongCommit {
+		rv := make([]*vcsinfo.LongCommit, 0, len(hashes))
+		for _, h := range hashes {
+			rv = append(rv, &vcsinfo.LongCommit{
+				ShortCommit: &vcsinfo.ShortCommit{
+					Hash: h,
+				},
+				Index:    idx,
+				Branches: map[string]bool{"master": true},
+			})
+			idx++
+		}
+		return rv
+	}
+
+	totalIngested := 0
+	assertNew := func(numNew int) {
+		totalIngested += numNew
+		assert.Equal(t, totalIngested, len(ri.Commits))
+	}
+	process := func(ctx context.Context, cb *commitBatch) error {
+		if err := ri.gitstore.Put(ctx, cb.commits); err != nil {
+			return err
+		}
+		for _, c := range cb.commits {
+			ri.Commits[c.Hash] = c
+		}
+		return nil
+	}
+	// Ingest a single commit.
+	assert.NoError(t, ri.processCommits(ctx, process, func(ctx context.Context, ch chan<- *commitBatch) error {
+		commits := makeCommits("abc123")
+		gs.On("Put", ctx, commits).Return(nil)
+		gs.On("PutBranches", ctx, map[string]string{"master": commits[len(commits)-1].Hash}).Return(nil)
+		ch <- &commitBatch{
+			commits: commits,
+		}
+		return nil
+	}))
+	assertNew(1)
+
+	// Ingest a series of commits.
+	assert.NoError(t, ri.processCommits(ctx, process, func(ctx context.Context, ch chan<- *commitBatch) error {
+		for i := 1; i < 5; i++ {
+			hashes := make([]string, 0, i)
+			for j := 0; j < i; j++ {
+				hashes = append(hashes, fmt.Sprintf("%dabc%d", i, j))
+			}
+			commits := makeCommits(hashes...)
+			gs.On("Put", ctx, commits).Return(nil)
+			gs.On("PutBranches", ctx, map[string]string{"master": commits[len(commits)-1].Hash}).Return(nil)
+			ch <- &commitBatch{
+				commits: commits,
+			}
+		}
+		return nil
+	}))
+	assertNew(1 + 2 + 3 + 4)
+
+	// If the passed-in func returns an error, it should propagate, and the
+	// previously-queued commits should still get ingested.
+	err := errors.New("commit retrieval failed.")
+	assert.Equal(t, err, ri.processCommits(ctx, process, func(ctx context.Context, ch chan<- *commitBatch) error {
+		commits := makeCommits("def456")
+		gs.On("Put", ctx, commits).Return(nil)
+		gs.On("PutBranches", ctx, map[string]string{"master": commits[len(commits)-1].Hash}).Return(nil)
+		ch <- &commitBatch{
+			commits: commits,
+		}
+		return err
+	}))
+	assertNew(1)
+
+	// Ensure that the context gets canceled if ingestion fails.
+	err = ri.processCommits(ctx, process, func(ctx context.Context, ch chan<- *commitBatch) error {
+		for i := 5; i < 10; i++ {
+			if err := ctx.Err(); err != nil {
+				return err
+			}
+			hashes := make([]string, 0, i)
+			for j := 0; j < i; j++ {
+				hashes = append(hashes, fmt.Sprintf("%dabc%d", i, j))
+			}
+			commits := makeCommits(hashes...)
+			if i == 7 {
+				gs.On("Put", ctx, commits).Return(errors.New("commit ingestion failed."))
+			} else {
+				gs.On("Put", ctx, commits).Return(nil)
+				gs.On("PutBranches", ctx, map[string]string{"master": commits[len(commits)-1].Hash}).Return(nil)
+			}
+			ch <- &commitBatch{
+				commits: commits,
+			}
+		}
+		return nil
+	})
+	sklog.Errorf("Error: %s", err.Error())
+	assert.True(t, strings.Contains(err.Error(), "commit ingestion failed"))
+	assert.True(t, strings.Contains(err.Error(), "and commit-loading func failed with: context canceled"))
+	assertNew(5 + 6)
+}
+
+// gitsyncRefresher is an implementation of repograph_shared_tests.RepoImplRefresher
+// used for testing gitsync.
+type gitsyncRefresher struct {
+	gitiles     *gitiles_testutils.MockRepo
+	graph       *repograph.Graph
+	gs          gitstore.GitStore
+	initialSync bool
+	oldBranches map[string]string
+	repo        *git.Repo
+	t           *testing.T
+}
+
+func newGitsyncRefresher(t *testing.T, ctx context.Context, gs gitstore.GitStore, gb *git_testutils.GitBuilder, mr *gitiles_testutils.MockRepo) repograph_shared_tests.RepoImplRefresher {
+	repo := &git.Repo{GitDir: git.GitDir(gb.Dir())}
+	branches, err := repo.Branches(ctx)
+	assert.NoError(t, err)
+	oldBranches := make(map[string]string, len(branches))
+	for _, b := range branches {
+		oldBranches[b.Name] = b.Head
+	}
+	return &gitsyncRefresher{
+		gitiles:     mr,
+		graph:       nil, // Set later in setupGitsync, after the graph is created.
+		gs:          gs,
+		initialSync: true,
+		oldBranches: oldBranches,
+		repo:        repo,
+		t:           t,
+	}
+}
+
+func (u *gitsyncRefresher) Refresh(commits ...*vcsinfo.LongCommit) {
+	ctx := context.Background()
+
+	assert.True(u.t, u.gitiles.Empty())
+
+	// Check the GitStore contents before updating the underlying repo.
+	u.checkIngestion(ctx)
+
+	// Update the backing repo.
+	assert.NoError(u.t, u.repo.Update(ctx))
+
+	// Mock calls to gitiles.
+	branches, err := u.repo.Branches(ctx)
+	assert.NoError(u.t, err)
+	branchMap := make(map[string]string, len(branches))
+	for _, b := range branches {
+		oldHead := u.oldBranches[b.Name]
+		if b.Head != oldHead {
+			logExpr := b.Head
+			if oldHead != "" {
+				logExpr = fmt.Sprintf("%s..%s", oldHead, b.Head)
+			}
+			var opts []gitiles.LogOption
+			if u.initialSync && b.Name == "master" {
+				opts = append(opts, gitiles.LogReverse(), gitiles.LogBatchSize(batchSize))
+			}
+			u.gitiles.MockLog(ctx, logExpr, opts...)
+		}
+		branchMap[b.Name] = b.Head
+	}
+	u.oldBranches = branchMap
+	u.gitiles.MockBranches(ctx)
+	if u.initialSync {
+		u.gitiles.MockBranches(ctx)
+		u.initialSync = false
+	}
+}
+
+// checkIngestion asserts that the contents of the GitStore match those of the
+// repograph.Graph.
+func (u *gitsyncRefresher) checkIngestion(ctx context.Context) {
+	if u.graph == nil {
+		return
+	}
+
+	// Wait for GitStore to be up to date.
+	branchHeads := u.graph.BranchHeads()
+	expectBranches := make(map[string]string, len(branchHeads))
+	for _, b := range branchHeads {
+		expectBranches[b.Name] = b.Head
+	}
+	assert.NoError(u.t, testutils.EventuallyConsistent(time.Second, func() error {
+		actual, err := u.gs.GetBranches(ctx)
+		assert.NoError(u.t, err)
+		for name, expect := range expectBranches {
+			actualBranch, ok := actual[name]
+			if !ok || actualBranch.Head != expect {
+				sklog.Errorf("%s is %+v, expect %s", name, actualBranch, expect)
+				time.Sleep(10 * time.Millisecond)
+				return testutils.TryAgainErr
+			}
+		}
+		for name := range actual {
+			if _, ok := expectBranches[name]; name != gitstore.ALL_BRANCHES && !ok {
+				sklog.Errorf("Expected %s not to be present", name)
+				time.Sleep(10 * time.Millisecond)
+				return testutils.TryAgainErr
+			}
+		}
+		return nil
+	}))
+
+	// Assert that the branch heads are the same.
+	gotBranches, err := u.gs.GetBranches(ctx)
+	assert.NoError(u.t, err)
+	delete(gotBranches, gitstore.ALL_BRANCHES)
+	assert.Equal(u.t, len(expectBranches), len(gotBranches))
+	for name, head := range expectBranches {
+		assert.Equal(u.t, head, gotBranches[name].Head)
+	}
+
+	// Assert that all LongCommits are present and correct.
+	iCommits, err := u.gs.RangeByTime(ctx, vcsinfo.MinTime, vcsinfo.MaxTime, gitstore.ALL_BRANCHES)
+	assert.NoError(u.t, err)
+	hashes := make([]string, 0, len(iCommits))
+	for _, c := range iCommits {
+		hashes = append(hashes, c.Hash)
+	}
+	longCommits, err := u.gs.Get(ctx, hashes)
+	assert.NoError(u.t, err)
+	commits := make(map[string]*vcsinfo.LongCommit, len(hashes))
+	for _, c := range longCommits {
+		assert.NotNil(u.t, c)
+		commits[c.Hash] = c
+	}
+	for _, c := range u.graph.GetAll() {
+		deepequal.AssertDeepEqual(u.t, c.LongCommit, commits[c.Hash])
+	}
+
+	// Assert that the IndexCommits are correct for each branch.
+	for name := range expectBranches {
+		branchPtr := gotBranches[name]
+		branchCommits, err := u.graph.LogLinear("", name)
+		assert.NoError(u.t, err)
+		expectIndexCommits := make([]*vcsinfo.IndexCommit, 0, len(branchCommits))
+		for i := len(branchCommits) - 1; i >= 0; i-- {
+			c := branchCommits[i]
+			expectIndexCommits = append(expectIndexCommits, &vcsinfo.IndexCommit{
+				Hash:      c.Hash,
+				Index:     len(expectIndexCommits),
+				Timestamp: c.Timestamp.UTC(),
+			})
+		}
+
+		// RangeN.
+		gotIndexCommits, err := u.gs.RangeN(ctx, 0, branchPtr.Index+1, name)
+		assert.NoError(u.t, err)
+		deepequal.AssertDeepEqual(u.t, expectIndexCommits, gotIndexCommits)
+
+		// RangeByTime.
+		gotIndexCommits, err = u.gs.RangeByTime(ctx, vcsinfo.MinTime, vcsinfo.MaxTime, name)
+		assert.NoError(u.t, err)
+		deepequal.AssertDeepEqual(u.t, expectIndexCommits, gotIndexCommits)
+	}
+}
+
+// setupGitsync performs common setup for GitStore based Graphs.
+func setupGitsync(t *testing.T) (context.Context, *git_testutils.GitBuilder, *repograph.Graph, repograph_shared_tests.RepoImplRefresher, func()) {
+	ctx, g, cleanup := repograph_shared_tests.CommonSetup(t)
+	wd, err := ioutil.TempDir("", "")
+	assert.NoError(t, err)
+	defer util.RemoveAll(wd)
+	_, _, gs := gitstore_testutils.SetupAndLoadBTGitStore(t, ctx, wd, g.RepoUrl(), true)
+	urlMock := mockhttpclient.NewURLMock()
+	mockRepo := gitiles_testutils.NewMockRepo(t, g.RepoUrl(), git.GitDir(g.Dir()), urlMock)
+	repo := gitiles.NewRepo(g.RepoUrl(), "", urlMock.Client())
+	gcsClient := test_gcsclient.NewMemoryClient("fake-bucket")
+	ri, err := newRepoImpl(ctx, gs, repo, gcsClient, "repo-ingestion")
+	assert.NoError(t, err)
+	ud := newGitsyncRefresher(t, ctx, gs, g, mockRepo)
+	graph, err := repograph.NewWithRepoImpl(ctx, ri)
+	assert.NoError(t, err)
+	ud.(*gitsyncRefresher).graph = graph
+	return ctx, g, graph, ud, cleanup
+}
+
+func TestGraphWellFormedGitSync(t *testing.T) {
+	unittest.LargeTest(t)
+	ctx, g, repo, ud, cleanup := setupGitsync(t)
+	defer cleanup()
+	repograph_shared_tests.TestGraphWellFormed(t, ctx, g, repo, ud)
+}
+
+func TestRecurseGitSync(t *testing.T) {
+	unittest.LargeTest(t)
+	ctx, g, repo, ud, cleanup := setupGitsync(t)
+	defer cleanup()
+	repograph_shared_tests.TestRecurse(t, ctx, g, repo, ud)
+}
+
+func TestRecurseAllBranchesGitSync(t *testing.T) {
+	unittest.LargeTest(t)
+	ctx, g, repo, ud, cleanup := setupGitsync(t)
+	defer cleanup()
+	repograph_shared_tests.TestRecurseAllBranches(t, ctx, g, repo, ud)
+}
+
+func TestUpdateHistoryChangedGitSync(t *testing.T) {
+	unittest.LargeTest(t)
+	ctx, g, repo, ud, cleanup := setupGitsync(t)
+	defer cleanup()
+	repograph_shared_tests.TestUpdateHistoryChanged(t, ctx, g, repo, ud)
+}
+
+func TestUpdateAndReturnCommitDiffsGitSync(t *testing.T) {
+	unittest.LargeTest(t)
+	ctx, g, repo, ud, cleanup := setupGitsync(t)
+	defer cleanup()
+	repograph_shared_tests.TestUpdateAndReturnCommitDiffs(t, ctx, g, repo, ud)
+}
+
+func TestRevListGitSync(t *testing.T) {
+	unittest.LargeTest(t)
+	ctx, g, repo, ud, cleanup := setupGitsync(t)
+	defer cleanup()
+	repograph_shared_tests.TestRevList(t, ctx, g, repo, ud)
+}
+
+func TestBranchMembershipGitSync(t *testing.T) {
+	unittest.LargeTest(t)
+	ctx, g, repo, ud, cleanup := setupGitsync(t)
+	defer cleanup()
+	repograph_shared_tests.TestBranchMembership(t, ctx, g, repo, ud)
+}
diff --git a/go/gitstore/bt_gitstore/bt_gitstore.go b/go/gitstore/bt_gitstore/bt_gitstore.go
index 8a0e503..35acd3b 100644
--- a/go/gitstore/bt_gitstore/bt_gitstore.go
+++ b/go/gitstore/bt_gitstore/bt_gitstore.go
@@ -145,7 +145,7 @@
 			close(mutations)
 		}()
 		// Create IndexCommits mutations for each branch for each commit.
-		for _, c := range commits {
+		for i, c := range commits {
 			// Validation.
 			if c.Index == 0 && len(c.Parents) != 0 {
 				return skerr.Fmt("Commit %s has index zero but has at least one parent. This cannot be correct.", c.Hash)
@@ -176,6 +176,9 @@
 					mutations <- b.mutationForTimestampCommit(branch, ic)
 				}
 			}
+			if i%1000 == 0 {
+				sklog.Infof("Created mutations for %d of %d commits.", i+1, len(commits))
+			}
 		}
 		return nil
 	})