[gitstore] Use gitiles instead of a local git repo
Use a repograph.Graph to update the GitStore, with a RepoImpl that reads from
Gitiles and writes to GitStore as the Graph requests commits.
Bug: skia:9084
Change-Id: Ia19271890ec4240779885c5f7b79032fad3d9435
Reviewed-on: https://skia-review.googlesource.com/c/buildbot/+/233361
Commit-Queue: Eric Boren <borenet@google.com>
Reviewed-by: Kevin Lubick <kjlubick@google.com>
diff --git a/gitsync/Makefile b/gitsync/Makefile
index be0d588..52cc4df0 100644
--- a/gitsync/Makefile
+++ b/gitsync/Makefile
@@ -12,7 +12,7 @@
go install ./go/btgit/...
.PHONY: build_release
-build_release: build-static-gitsync
+release: build-static-gitsync
./build_release
.PHONY: build-static-gitsync
diff --git a/gitsync/go/gitsync/config.go b/gitsync/go/gitsync/config.go
index 74405ce..c8c749d 100644
--- a/gitsync/go/gitsync/config.go
+++ b/gitsync/go/gitsync/config.go
@@ -10,15 +10,35 @@
// gitSyncConfig contains the configuration options that can be defined in a config file.
// The JSON names of the fields match the flags defined in main.go.
type gitSyncConfig struct {
- BTInstanceID string `json:"bt_instance"` // BigTable instance
- BTTableID string `json:"bt_table"` // BigTable table ID.
- HttpPort string `json:"http_port"` // HTTP port for the health endpoint.
- Local bool `json:"local"` // Indicating whether this is running local.
- ProjectID string `json:"project"` // GCP project ID.
- PromPort string `json:"prom_port"` // Port at which the Prometheus metrics are be exposed.
- RepoURLs []string `json:"repo_url"` // List of repository URLs that should be updated.
- RefreshInterval human.JSONDuration `json:"refresh"` // Interval at which to poll each git repository.
- WorkDir string `json:"workdir"` // Work directory that should contain the checkouts.
+ // BigTable instance.
+ BTInstanceID string `json:"bt_instance"`
+ // BigTable table ID.
+ BTTableID string `json:"bt_table"`
+ // Number of goroutines to use when writing to BigTable. This is a
+ // tradeoff between write throughput and memory usage; more goroutines
+ // will achieve higher throughput but will also use more memory. There
+ // are diminishing returns here, as the number of CPU cores and BigTable
+ // performance will also limit throughput. The default value in
+ // bt_gitstore.DefaultWriteGoroutines has been shown to keep memory
+ // usage within a reasonable range while still providing decent
+ // throughput; you should only need to override this value in the case
+ // of high memory pressure (fewer goroutines) or the initial ingestion
+ // of an exceptionally large repository (more goroutines).
+ BTWriteGoroutines int `json:"bt_write_goroutines"`
+ // HTTP port for the health endpoint.
+ HttpPort string `json:"http_port"`
+ // Indicating whether this is running local.
+ Local bool `json:"local"`
+ // GCP project ID.
+ ProjectID string `json:"project"`
+ // Port at which the Prometheus metrics are exposed.
+ PromPort string `json:"prom_port"`
+ // List of repository URLs that should be updated.
+ RepoURLs []string `json:"repo_url"`
+ // Interval at which to poll each git repository.
+ RefreshInterval human.JSONDuration `json:"refresh"`
+ // Work directory that should contain the checkouts.
+ WorkDir string `json:"workdir"`
}
// String returns all configuration settings as a string intended to be printed upon startup
@@ -26,16 +46,17 @@
func (g *gitSyncConfig) String() string {
ret := ""
prefix := " "
- ret += fmt.Sprintf("%s bt_instance : %s\n", prefix, g.BTInstanceID)
- ret += fmt.Sprintf("%s bt_table : %s\n", prefix, g.BTTableID)
- ret += fmt.Sprintf("%s http_port : %s\n", prefix, g.HttpPort)
- ret += fmt.Sprintf("%s local : %s\n", prefix, strconv.FormatBool(g.Local))
- ret += fmt.Sprintf("%s project : %s\n", prefix, g.ProjectID)
- ret += fmt.Sprintf("%s prom_port : %s\n", prefix, g.PromPort)
+ ret += fmt.Sprintf("%s bt_instance : %s\n", prefix, g.BTInstanceID)
+ ret += fmt.Sprintf("%s bt_table : %s\n", prefix, g.BTTableID)
+ ret += fmt.Sprintf("%s bt_write_goroutines: %d\n", prefix, g.BTWriteGoroutines)
+ ret += fmt.Sprintf("%s http_port : %s\n", prefix, g.HttpPort)
+ ret += fmt.Sprintf("%s local : %s\n", prefix, strconv.FormatBool(g.Local))
+ ret += fmt.Sprintf("%s project : %s\n", prefix, g.ProjectID)
+ ret += fmt.Sprintf("%s prom_port : %s\n", prefix, g.PromPort)
for _, url := range g.RepoURLs {
- ret += fmt.Sprintf("%s repo_url : %s\n", prefix, url)
+ ret += fmt.Sprintf("%s repo_url : %s\n", prefix, url)
}
- ret += fmt.Sprintf("%s refresh : %s\n", prefix, g.RefreshInterval.String())
- ret += fmt.Sprintf("%s workdir : %s\n", prefix, g.WorkDir)
+ ret += fmt.Sprintf("%s refresh : %s\n", prefix, g.RefreshInterval.String())
+ ret += fmt.Sprintf("%s workdir : %s\n", prefix, g.WorkDir)
return ret
}
diff --git a/gitsync/go/gitsync/main.go b/gitsync/go/gitsync/main.go
index 3875b44..5038f4d 100644
--- a/gitsync/go/gitsync/main.go
+++ b/gitsync/go/gitsync/main.go
@@ -6,15 +6,12 @@
"io/ioutil"
"log"
"net/http"
- "path/filepath"
- "strings"
"time"
"github.com/flynn/json5"
+ "go.skia.org/infra/gitsync/go/watcher"
"go.skia.org/infra/go/auth"
"go.skia.org/infra/go/common"
- "go.skia.org/infra/go/fileutil"
- "go.skia.org/infra/go/git"
"go.skia.org/infra/go/gitauth"
"go.skia.org/infra/go/gitstore/bt_gitstore"
"go.skia.org/infra/go/httputils"
@@ -27,15 +24,15 @@
// Default config/flag values
var defaultConf = gitSyncConfig{
- BTInstanceID: "production",
- BTTableID: "git-repos2",
- HttpPort: ":9091",
- Local: false,
- ProjectID: "skia-public",
- PromPort: ":20000",
- RepoURLs: []string{},
- RefreshInterval: human.JSONDuration(10 * time.Minute),
- WorkDir: "",
+ BTInstanceID: "production",
+ BTTableID: "git-repos2",
+ BTWriteGoroutines: bt_gitstore.DefaultWriteGoroutines,
+ HttpPort: ":9091",
+ Local: false,
+ ProjectID: "skia-public",
+ PromPort: ":20000",
+ RepoURLs: []string{},
+ RefreshInterval: human.JSONDuration(10 * time.Minute),
}
func main() {
@@ -45,17 +42,19 @@
// Flags that cause the flags below to be disregarded.
configFile := flag.String("config", "", "Disregard flags and load the configuration from this JSON5 config file. The keys and types of the config file match the flags.")
runInit := flag.Bool("init", false, "Initialize the BigTable instance and quit. This should be run with a different different user who has admin rights.")
+ gcsBucket := flag.String("gcs_bucket", "", "GCS bucket used for temporary storage during ingestion.")
+ gcsPath := flag.String("gcs_path", "", "GCS path used for temporary storage during ingestion.")
// Define flags that map to field in the configuration struct.
flag.StringVar(&config.BTInstanceID, "bt_instance", defaultConf.BTInstanceID, "Big Table instance")
flag.StringVar(&config.BTTableID, "bt_table", defaultConf.BTTableID, "BigTable table ID")
+ flag.IntVar(&config.BTWriteGoroutines, "bt_write_goroutines", defaultConf.BTWriteGoroutines, "Number of goroutines to use when writing to BigTable.")
flag.StringVar(&config.HttpPort, "http_port", defaultConf.HttpPort, "The http port where ready-ness endpoints are served.")
flag.BoolVar(&config.Local, "local", defaultConf.Local, "Running locally if true. As opposed to in production.")
flag.StringVar(&config.ProjectID, "project", defaultConf.ProjectID, "ID of the GCP project")
flag.StringVar(&config.PromPort, "prom_port", defaultConf.PromPort, "Metrics service address (e.g., ':10110')")
common.MultiStringFlagVar(&config.RepoURLs, "repo_url", defaultConf.RepoURLs, "Repo url")
flag.DurationVar((*time.Duration)(&config.RefreshInterval), "refresh", time.Duration(defaultConf.RefreshInterval), "Interval in which to poll git and refresh the GitStore.")
- flag.StringVar(&config.WorkDir, "workdir", defaultConf.WorkDir, "Working directory where repos are cached. Use the same directory between calls to speed up checkout time.")
common.InitWithMust(
"gitsync",
@@ -81,9 +80,10 @@
// Configure the bigtable instance.
btConfig := &bt_gitstore.BTConfig{
- ProjectID: config.ProjectID,
- InstanceID: config.BTInstanceID,
- TableID: config.BTTableID,
+ ProjectID: config.ProjectID,
+ InstanceID: config.BTInstanceID,
+ TableID: config.BTTableID,
+ WriteGoroutines: config.BTWriteGoroutines,
}
// Initialize bigtable if invoked with --init and quit.
@@ -97,12 +97,6 @@
return
}
- // Make sure we have a data directory and it exists or can be created.
- if config.WorkDir == "" {
- sklog.Fatal("No workdir specified.")
- }
- useWorkDir := fileutil.Must(fileutil.EnsureDirExists(config.WorkDir))
-
// Make sure we have at least one repo configured.
if len(config.RepoURLs) == 0 {
sklog.Fatalf("At least one repository URL must be configured.")
@@ -117,9 +111,10 @@
}
// Set up Git authentication if a service account email was set.
+ gitcookiesPath := ""
if !config.Local {
// Use the gitcookie created by the gitauth package.
- gitcookiesPath := "/tmp/gitcookies"
+ gitcookiesPath = "/tmp/gitcookies"
sklog.Infof("Writing gitcookies to %s", gitcookiesPath)
if _, err := gitauth.New(ts, gitcookiesPath, true, ""); err != nil {
sklog.Fatalf("Failed to create git cookie updater: %s", err)
@@ -127,24 +122,12 @@
sklog.Infof("Git authentication set up successfully.")
}
- // Start all repo watchers in the background.
+ // Start all repo watchers.
ctx := context.Background()
for _, repoURL := range config.RepoURLs {
- go func(repoURL string) {
- repoDir, err := git.NormalizeURL(repoURL)
- if err != nil {
- sklog.Fatalf("Error getting normalized URL for %q: %s", repoURL, err)
- }
- repoDir = strings.Replace(repoDir, "/", "_", -1)
- repoDir = filepath.Join(useWorkDir, repoDir)
- sklog.Infof("Checking out %s into %s", repoURL, repoDir)
-
- watcher, err := NewRepoWatcher(ctx, btConfig, repoURL, repoDir)
- if err != nil {
- sklog.Fatalf("Error initializing repo watcher: %s", err)
- }
- watcher.Start(ctx, time.Duration(config.RefreshInterval))
- }(repoURL)
+ if err := watcher.Start(ctx, btConfig, repoURL, gitcookiesPath, *gcsBucket, *gcsPath, time.Duration(config.RefreshInterval)); err != nil {
+ sklog.Fatalf("Error initializing repo watcher: %s", err)
+ }
}
// Set up the http handler to indicate ready-ness and start serving.
diff --git a/gitsync/go/gitsync/watcher.go b/gitsync/go/gitsync/watcher.go
deleted file mode 100644
index 51691f5..0000000
--- a/gitsync/go/gitsync/watcher.go
+++ /dev/null
@@ -1,199 +0,0 @@
-package main
-
-import (
- "context"
- "fmt"
- "runtime"
- "time"
-
- "go.skia.org/infra/go/fileutil"
- "go.skia.org/infra/go/git"
- "go.skia.org/infra/go/gitstore"
- "go.skia.org/infra/go/gitstore/bt_gitstore"
- "go.skia.org/infra/go/metrics2"
- "go.skia.org/infra/go/skerr"
- "go.skia.org/infra/go/sklog"
- "go.skia.org/infra/go/util"
- "go.skia.org/infra/go/vcsinfo"
-)
-
-const (
- // batchSize is the size of a batch of commits that is imported into BTGit.
- batchSize = 10000
-)
-
-// RepoWatcher continuously watches a repository and uploads changes to a BigTable Gitstore.
-type RepoWatcher struct {
- gitStore gitstore.GitStore
- repo *git.Repo
- repoDir string
- repoURL string
-}
-
-// NewRepoWatcher creates a GitStore with the provided information and checks out the git repo
-// at repoURL into repoDir. It's Start(...) function will watch a repo in the background.
-func NewRepoWatcher(ctx context.Context, conf *bt_gitstore.BTConfig, repoURL, repoDir string) (*RepoWatcher, error) {
- repoDir, err := fileutil.EnsureDirExists(repoDir)
- if err != nil {
- return nil, err
- }
-
- gitStore, err := bt_gitstore.New(ctx, conf, repoURL)
- if err != nil {
- return nil, skerr.Fmt("Error instantiating git store: %s", err)
- }
-
- repo, err := git.NewRepo(ctx, repoURL, repoDir)
- if err != nil {
- return nil, fmt.Errorf("Failed to create git repo: %s", err)
- }
-
- return &RepoWatcher{
- gitStore: gitStore,
- repo: repo,
- repoDir: repoDir,
- repoURL: repoURL,
- }, nil
-}
-
-// Start watches the repo in the background and updates the BT GitStore. The frequency is
-// defined by 'interval'.
-func (r *RepoWatcher) Start(ctx context.Context, interval time.Duration) {
- lvGitSync := metrics2.NewLiveness("last_successful_git_sync", map[string]string{"repo": r.repoURL})
- go util.RepeatCtx(interval, ctx, func(ctx context.Context) {
- // Catch any panic and log relevant information to find the root cause.
- defer func() {
- if err := recover(); err != nil {
- const size = 64 << 10
- buf := make([]byte, size)
- buf = buf[:runtime.Stack(buf, false)]
- sklog.Errorf("Panic updating %s in %s: %s\n%s", r.repoURL, r.repoDir, err, buf)
- }
- }()
-
- if err := r.updateFn(); err != nil {
- sklog.Errorf("Error updating %s: %s", r.repoURL, err)
- } else {
- lvGitSync.Reset()
- }
- })
-}
-
-// updateFn retrieves git info from the repository and updates the GitStore.
-func (r *RepoWatcher) updateFn() error {
- // Update the git repo.
- ctx := context.Background()
- sklog.Infof("Updating repo ...")
- if err := r.repo.Update(ctx); err != nil {
- return skerr.Fmt("Failed to update repo: %s", err)
- }
-
- // Get the branches from the repo.
- sklog.Info("Getting branches...")
- branches, err := r.repo.Branches(ctx)
- if err != nil {
- return skerr.Fmt("Failed to get branches from Git repo: %s", err)
- }
-
- // Get the current branches from the GitStore.
- currBranches, err := r.gitStore.GetBranches(ctx)
- if err != nil {
- return skerr.Fmt("Error retrieving branches from GitStore: %s", err)
- }
-
- // Find the hashes all all commits that need to be added to the GitStore. This
- // considers all branches in the repo and whether they are already in the GitStore.
- hashes := util.StringSet{}
- for _, newBranch := range branches {
- // revListStr is an argument to repo.RevList below and controls how many commits we
- // retrieve. By default we retrieve all commits in the branch, but may restrict that if
- // we find an ancester to the current branch (see below).
- revListStr := newBranch.Head
-
- // See if we have the branch in the repo already.
- foundBranch, ok := currBranches[newBranch.Name]
- if ok {
- // If the branch hasn't changed we are done.
- if foundBranch.Head == newBranch.Head {
- continue
- }
-
- // See if the new branch head is a descendant of the old branch head.
- anc, err := r.repo.IsAncestor(ctx, foundBranch.Head, newBranch.Head)
- if err != nil {
- return skerr.Fmt("Error checking if %s is an ancestor of %s: %s", foundBranch.Head, newBranch.Head, err)
- }
-
- if anc {
- // Only get the commits between the old and new head.
- revListStr = git.LogFromTo(foundBranch.Head, newBranch.Head)
- }
- }
-
- // Retrieve the target commits.
- foundHashes, err := r.repo.RevList(ctx, "--topo-order", revListStr)
- if err != nil {
- return skerr.Fmt("Error retrieving hashes with the argument %q: %s", revListStr, err)
- }
- hashes.AddLists(foundHashes)
- }
- sklog.Infof("Repo @ %s: Found %d unique hashes in %d branches.", r.repoURL, len(hashes), len(branches))
-
- // Iterate over the LongCommits that correspond to batches.
- ctx, cancelFn := context.WithCancel(context.Background())
- defer cancelFn()
-
- commitsCh, err := r.iterateLongCommits(ctx, hashes.Keys(), batchSize)
- if err != nil {
- return skerr.Fmt("Error iterating over new commits: %s", err)
- }
-
- // Make sure we iterate over all commits, so we don't leak go-routine.
- for commits := range commitsCh {
- if err := r.gitStore.Put(ctx, commits); err != nil {
- return skerr.Fmt("Error writing commits to BigTable: %s", err)
- }
- }
-
- branchMap := make(map[string]string, len(branches))
- for _, gb := range branches {
- branchMap[gb.Name] = gb.Head
- }
- if err := r.gitStore.PutBranches(ctx, branchMap); err != nil {
- return skerr.Fmt("Error calling PutBranches on GitStore: %s", err)
- }
- sklog.Infof("Repo @ %s: Branches updated successfully.", r.repoURL)
- return nil
-}
-
-// iterateLongCommit returns batches of commits corresponding to the given hashes.
-func (r *RepoWatcher) iterateLongCommits(ctx context.Context, hashes []string, batchSize int) (<-chan []*vcsinfo.LongCommit, error) {
- // Allocate a channel so can always send all batches and are not dependent on the speed of the receiver.
- retCh := make(chan []*vcsinfo.LongCommit, len(hashes)/batchSize+1)
-
- go func() {
- longCommits := make([]*vcsinfo.LongCommit, 0, batchSize)
- for idx, hash := range hashes {
- // Check whether the context has been canceled.
- select {
- case <-ctx.Done():
- return
- default:
- }
-
- c, err := r.repo.Details(ctx, hash)
- if err != nil {
- sklog.Errorf("Error fetching commit %q: %s", hash, err)
- continue
- }
-
- longCommits = append(longCommits, c)
- if len(longCommits) >= batchSize || idx == (len(hashes)-1) {
- retCh <- longCommits
- longCommits = make([]*vcsinfo.LongCommit, 0, batchSize)
- }
- }
- close(retCh)
- }()
- return retCh, nil
-}
diff --git a/gitsync/go/watcher/initial.go b/gitsync/go/watcher/initial.go
new file mode 100644
index 0000000..90f65fc
--- /dev/null
+++ b/gitsync/go/watcher/initial.go
@@ -0,0 +1,239 @@
+package watcher
+
+/*
+ This file contains code related to the initial ingestion of a git repo.
+*/
+
+import (
+ "context"
+ "fmt"
+ "path"
+ "strings"
+ "sync"
+ "time"
+
+ "cloud.google.com/go/storage"
+ "github.com/google/uuid"
+ "go.skia.org/infra/go/gcs"
+ "go.skia.org/infra/go/git"
+ "go.skia.org/infra/go/git/repograph"
+ "go.skia.org/infra/go/skerr"
+ "go.skia.org/infra/go/sklog"
+ "go.skia.org/infra/go/util"
+ "go.skia.org/infra/go/vcsinfo"
+)
+
+const (
+ fakeBranchPrefix = "///Fake"
+ gcsIdleWait = time.Second
+ gcsRetryWait = 3 * time.Second
+)
+
+// isFakeBranch returns true iff the given branch name is a fake one created by
+// getFakeBranch.
+func isFakeBranch(branch string) bool {
+ return strings.HasPrefix(branch, fakeBranchPrefix)
+}
+
+// getFakeBranch returns a new fake branch name consisting of fakeBranchPrefix
+// followed by a freshly generated UUID. existingBranches is currently unused.
+func getFakeBranch(existingBranches util.StringSet) string {
+ return fmt.Sprintf("%s%s", fakeBranchPrefix, uuid.New())
+}
+
+// initialIngestCommitBatch ingests the given batch of commits by adding them to
+// the Graph, ensuring that all of them are reachable from branch heads. Because
+// we're dealing with an incomplete Graph at this stage, it's possible that the
+// commitBatch may contain commits which are not reachable from any branch. In
+// that case, initialIngestCommitBatch will create fake branches as needed to
+// ensure that all commits are reachable. The commits in the batch must be in
+// topological order, i.e. a commit's parents must appear before the commit.
+func initialIngestCommitBatch(ctx context.Context, graph *repograph.Graph, ri *repograph.MemCacheRepoImpl, cb *commitBatch) error {
+ // Get the current state of the branch heads.
+
+ // branchSet tracks which branches are present.
+ branchSet := util.NewStringSet()
+ // reverseBranchMap has commit hashes as keys and branch names as
+ // values. This will cause branches which point to the same commit to be
+ // deduplicated, which is desirable in the case of fake branches and is
+ // not a problem for real branches, because they will be fixed once the
+ // initial ingestion is complete.
+ reverseBranchMap := map[string]string{}
+ for _, b := range graph.BranchHeads() {
+ branchSet[b.Name] = true
+ reverseBranchMap[b.Head] = b.Name
+ }
+
+ // Loop over the commits in the commitBatch, "walking" the branch heads
+ // forward to account for the new commits.
+ for _, c := range cb.commits {
+ // Add the commit to the RepoImpl so that it can be found
+ // during graph.Update().
+ ri.Commits[c.Hash] = c
+
+ // Figure out which branches point to this commit's parents.
+ // Only keep the first real and fake branches we find; the rest
+ // can be thrown away. We assume that the first parent is the
+ // more important line of history.
+ var realBranch string
+ var fakeBranch string
+ for _, p := range c.Parents {
+ if name, ok := reverseBranchMap[p]; ok {
+ delete(branchSet, name)
+ delete(reverseBranchMap, p)
+ if isFakeBranch(name) {
+ if fakeBranch == "" {
+ fakeBranch = name
+ }
+ } else {
+ if realBranch == "" {
+ realBranch = name
+ }
+ }
+ }
+ }
+ var branch string
+ if realBranch != "" {
+ // If we have a real branch, use that.
+ branch = realBranch
+ } else if !branchSet[cb.branch] {
+ // If we haven't yet used the branch on the commitBatch,
+ // use that.
+ branch = cb.branch
+ } else if fakeBranch != "" {
+ // If we have a fake branch, fall back to that.
+ branch = fakeBranch
+ } else {
+ // No branch points to any of this commit's parents;
+ // create a fake branch to point to this commit.
+ branch = getFakeBranch(branchSet)
+ }
+ branchSet[branch] = true
+ reverseBranchMap[c.Hash] = branch
+ }
+
+ // Create the new set of branch heads.
+ branches := make([]*git.Branch, 0, len(reverseBranchMap))
+ for head, name := range reverseBranchMap {
+ branches = append(branches, &git.Branch{
+ Name: name,
+ Head: head,
+ })
+ }
+ ri.BranchList = branches
+
+ // Update the graph. This triggers a request to save the Graph to GCS.
+ if err := graph.Update(ctx); err != nil {
+ return skerr.Wrapf(err, "Failed to update Graph with new commits and branches.")
+ }
+ return nil
+}
+
+// setupInitialIngest creates a repograph.Graph and a RepoImpl to be used for
+// the initial ingestion of a git repo.
+func setupInitialIngest(ctx context.Context, gcsClient gcs.GCSClient, gcsPath, repoUrl string) (*repograph.Graph, *initialIngestRepoImpl, error) {
+ normUrl, err := git.NormalizeURL(repoUrl)
+ if err != nil {
+ return nil, nil, skerr.Wrapf(err, "Failed to normalize repo URL: %s", repoUrl)
+ }
+ file := path.Join(gcsPath, strings.ReplaceAll(normUrl, "/", "_"))
+ ri := newInitialIngestRepoImpl(ctx, gcsClient, file)
+ r, err := gcsClient.FileReader(ctx, file)
+ if err != nil {
+ if err == storage.ErrObjectNotExist {
+ g, err := repograph.NewWithRepoImpl(ctx, ri)
+ if err != nil {
+ return nil, nil, skerr.Wrapf(err, "Failed to create repo graph.")
+ }
+ ri.graph = g
+ return g, ri, nil
+ } else {
+ return nil, nil, skerr.Wrapf(err, "Failed to read Graph from GCS.")
+ }
+ }
+ defer util.Close(r)
+ g, err := repograph.NewFromGob(ctx, r, ri)
+ if err != nil {
+ return nil, nil, skerr.Wrapf(err, "Failed to create Graph from GCS.")
+ }
+ ri.graph = g
+ return g, ri, nil
+}
+
+// initialIngestRepoImpl is a struct used during initial ingestion of a git repo.
+type initialIngestRepoImpl struct {
+ *repograph.MemCacheRepoImpl
+ file string
+ gcs gcs.GCSClient
+ graph *repograph.Graph
+ writeRequests int
+ writeRequestsMtx sync.Mutex
+}
+
+// newInitialIngestRepoImpl returns a repograph.RepoImpl used for initial
+// ingestion of a git repo.
+func newInitialIngestRepoImpl(ctx context.Context, gcsClient gcs.GCSClient, file string) *initialIngestRepoImpl {
+ mem := repograph.NewMemCacheRepoImpl(map[string]*vcsinfo.LongCommit{}, nil)
+ ri := &initialIngestRepoImpl{
+ MemCacheRepoImpl: mem,
+ file: file,
+ gcs: gcsClient,
+ }
+ go func() {
+ for {
+ ri.writeRequestsMtx.Lock()
+ writeRequests := ri.writeRequests
+ ri.writeRequestsMtx.Unlock()
+ if writeRequests > 0 {
+ if err := ri.write(ctx); err != nil {
+ sklog.Errorf("Failed to write Graph to GCS: %s; will retry in %s", err, gcsRetryWait)
+ time.Sleep(gcsRetryWait)
+ } else {
+ ri.writeRequestsMtx.Lock()
+ ri.writeRequests -= writeRequests
+ ri.writeRequestsMtx.Unlock()
+ }
+ } else {
+ time.Sleep(gcsIdleWait)
+ }
+ }
+ }()
+ return ri
+}
+
+// See documentation for RepoImpl interface.
+func (ri *initialIngestRepoImpl) UpdateCallback(ctx context.Context, _, _ []*vcsinfo.LongCommit, _ *repograph.Graph) (rv error) {
+ ri.writeRequestsMtx.Lock()
+ defer ri.writeRequestsMtx.Unlock()
+ ri.writeRequests += 1
+ return nil
+}
+
+// write serializes the Graph via WriteGob into the GCS file at ri.file.
+func (ri *initialIngestRepoImpl) write(ctx context.Context) error {
+ sklog.Infof("Backing up graph with %d commits.", ri.graph.Len())
+ w := ri.gcs.FileWriter(ctx, ri.file, gcs.FILE_WRITE_OPTS_TEXT)
+ writeErr := ri.graph.WriteGob(w)
+ closeErr := w.Close()
+ if writeErr != nil && closeErr != nil {
+ return skerr.Wrapf(writeErr, "Failed to write Graph to GCS and failed to close GCS file with: %s", closeErr)
+ } else if writeErr != nil {
+ return skerr.Wrapf(writeErr, "Failed to write Graph to GCS.")
+ } else if closeErr != nil {
+ return skerr.Wrapf(closeErr, "Failed to close GCS file.")
+ }
+ return nil
+}
+
+// Wait blocks until no write requests are pending, polling once per second.
+func (ri *initialIngestRepoImpl) Wait() {
+ for {
+ ri.writeRequestsMtx.Lock()
+ writeRequests := ri.writeRequests
+ ri.writeRequestsMtx.Unlock()
+ if writeRequests == 0 {
+ return
+ }
+ time.Sleep(time.Second)
+ }
+}
diff --git a/gitsync/go/watcher/initial_test.go b/gitsync/go/watcher/initial_test.go
new file mode 100644
index 0000000..4da1246
--- /dev/null
+++ b/gitsync/go/watcher/initial_test.go
@@ -0,0 +1,221 @@
+package watcher
+
+import (
+ "context"
+ "fmt"
+ "testing"
+
+ "github.com/google/uuid"
+ assert "github.com/stretchr/testify/require"
+ "go.skia.org/infra/go/deepequal"
+ "go.skia.org/infra/go/gcs/test_gcsclient"
+ "go.skia.org/infra/go/git"
+ "go.skia.org/infra/go/git/repograph"
+ git_testutils "go.skia.org/infra/go/git/testutils"
+ "go.skia.org/infra/go/gitiles"
+ gitiles_testutils "go.skia.org/infra/go/gitiles/testutils"
+ "go.skia.org/infra/go/gitstore"
+ "go.skia.org/infra/go/gitstore/mocks"
+ "go.skia.org/infra/go/mockhttpclient"
+ "go.skia.org/infra/go/testutils/unittest"
+ "go.skia.org/infra/go/util"
+ "go.skia.org/infra/go/vcsinfo"
+)
+
+func TestInitialIngestCommitBatch(t *testing.T) {
+ unittest.MediumTest(t)
+
+ ctx := context.Background()
+ ri := repograph.NewMemCacheRepoImpl(nil, nil)
+ graph, err := repograph.NewWithRepoImpl(ctx, ri)
+ assert.NoError(t, err)
+ assert.Equal(t, 0, graph.Len())
+ assert.Equal(t, 0, len(graph.Branches()))
+
+ gb := git_testutils.GitInit(t, ctx)
+ defer gb.Cleanup()
+ gd := git.GitDir(gb.Dir())
+
+ commit := func() *vcsinfo.LongCommit {
+ h := gb.CommitGen(ctx, uuid.New().String())
+ d, err := gd.Details(ctx, h)
+ assert.NoError(t, err)
+ return d
+ }
+
+ test := func(cb *commitBatch, expectCommits, expectBranches int) {
+ assert.NoError(t, initialIngestCommitBatch(ctx, graph, ri, cb))
+ assert.Equal(t, expectCommits, graph.Len())
+ assert.Equal(t, expectBranches, len(graph.Branches()))
+ }
+
+ // Empty batch, nothing to do.
+ test(&commitBatch{}, 0, 0)
+
+ // Verify that we create a new branch.
+ c0 := commit()
+ test(&commitBatch{
+ branch: "mybranch", // Don't use master, to make sure we didn't pick it up accidentally.
+ commits: []*vcsinfo.LongCommit{c0},
+ }, 1, 1)
+ assert.True(t, util.In("mybranch", graph.Branches()))
+
+ // Verify that we walk the branch head forward for new commits.
+ c1 := commit()
+ test(&commitBatch{
+ branch: "mybranch",
+ commits: []*vcsinfo.LongCommit{c1},
+ }, 2, 1)
+ assert.Equal(t, c1.Hash, graph.BranchHeads()[0].Head)
+ assert.False(t, isFakeBranch(graph.BranchHeads()[0].Name))
+
+ // Add two commits, both based at c1. Ensure that we create a fake
+ // branch for the second one.
+ c2 := commit()
+ gb.CreateBranchTrackBranch(ctx, "mybranch2", "master")
+ gb.Reset(ctx, "--hard", c1.Hash)
+ c3 := commit()
+ test(&commitBatch{
+ branch: "mybranch",
+ commits: []*vcsinfo.LongCommit{c2, c3},
+ }, 4, 2)
+ var fakeBranch string
+ for _, b := range graph.BranchHeads() {
+ if b.Name == "mybranch" {
+ assert.False(t, isFakeBranch(b.Name))
+ assert.Equal(t, c2.Hash, b.Head)
+ } else {
+ fakeBranch = b.Name
+ assert.True(t, isFakeBranch(b.Name))
+ assert.Equal(t, c3.Hash, b.Head)
+ }
+ }
+ assert.NotEqual(t, "", fakeBranch)
+
+ // Add another commit on each branch. Ensure that we walk both branches
+ // forward as expected.
+ c4 := commit()
+ gb.CheckoutBranch(ctx, "master")
+ c5 := commit()
+ test(&commitBatch{
+ branch: "mybranch",
+ commits: []*vcsinfo.LongCommit{c4, c5},
+ }, 6, 2)
+ for _, b := range graph.BranchHeads() {
+ if b.Name == "mybranch" {
+ assert.False(t, isFakeBranch(b.Name))
+ assert.Equal(t, c5.Hash, b.Head)
+ } else {
+ assert.Equal(t, fakeBranch, b.Name)
+ assert.True(t, isFakeBranch(b.Name))
+ assert.Equal(t, c4.Hash, b.Head)
+ }
+ }
+
+ // Another commit on each, then merge. Ensure that we kept the real
+ // branch, not the fake one.
+ c6 := commit()
+ gb.CheckoutBranch(ctx, "mybranch2")
+ c7 := commit()
+ c8Hash := gb.MergeBranch(ctx, "master")
+ c8, err := gd.Details(ctx, c8Hash)
+ assert.NoError(t, err)
+ test(&commitBatch{
+ branch: "mybranch",
+ commits: []*vcsinfo.LongCommit{c6, c7, c8},
+ }, 9, 1)
+ b := graph.BranchHeads()[0]
+ assert.False(t, isFakeBranch(b.Name))
+ assert.Equal(t, "mybranch", b.Name)
+ assert.Equal(t, c8.Hash, b.Head)
+}
+
+func setupTestInitial(t *testing.T) (context.Context, *git_testutils.GitBuilder, *gitiles_testutils.MockRepo, *repoImpl, func()) {
+ ctx := context.Background()
+ g := git_testutils.GitInit(t, ctx)
+ gs := mocks.GitStore{}
+ gs.On("RangeByTime", ctx, vcsinfo.MinTime, vcsinfo.MaxTime, gitstore.ALL_BRANCHES).Return(nil, nil)
+ gs.On("GetBranches", ctx).Return(nil, nil)
+ urlMock := mockhttpclient.NewURLMock()
+ mockRepo := gitiles_testutils.NewMockRepo(t, g.RepoUrl(), git.GitDir(g.Dir()), urlMock)
+ repo := gitiles.NewRepo(g.RepoUrl(), "", urlMock.Client())
+ gcsClient := test_gcsclient.NewMemoryClient("fake-bucket")
+ ri, err := newRepoImpl(ctx, &gs, repo, gcsClient, "repo-ingestion")
+ assert.NoError(t, err)
+ return ctx, g, mockRepo, ri.(*repoImpl), g.Cleanup
+}
+
+func TestInitialIngestion(t *testing.T) {
+ unittest.LargeTest(t)
+ ctx, gb, mockRepo, ri, cleanup := setupTestInitial(t)
+ defer cleanup()
+
+ gd := git.GitDir(gb.Dir())
+
+ commit := func() *vcsinfo.LongCommit {
+ h := gb.CommitGen(ctx, uuid.New().String())
+ c, err := gd.Details(ctx, h)
+ assert.NoError(t, err)
+ return c
+ }
+
+ test := func(expectBranches, expectCommits int) {
+ // Clear the cache between every attempt.
+ ri.MemCacheRepoImpl = repograph.NewMemCacheRepoImpl(nil, nil)
+ mockRepo.MockBranches(ctx)
+ assert.NoError(t, ri.initialIngestion(ctx))
+ assert.Equal(t, expectBranches, len(ri.BranchList))
+ assert.Equal(t, expectCommits, len(ri.Commits))
+ assert.True(t, mockRepo.Empty())
+ }
+
+ // No commits, no branches; nothing to do.
+ test(0, 0)
+
+ // One commit.
+ c0 := commit()
+ mockRepo.MockLog(ctx, c0.Hash, gitiles.LogReverse(), gitiles.LogBatchSize(batchSize))
+ test(1, 1)
+ assert.Equal(t, "master", ri.BranchList[0].Name)
+ assert.Equal(t, c0.Hash, ri.BranchList[0].Head)
+ deepequal.AssertDeepEqual(t, ri.Commits[c0.Hash], c0)
+
+ // No new commits. Clear out the cache and ensure that we don't request
+ // the log of c0 again, because it's backed up in GCS.
+ test(1, 1)
+
+ // New commits on a non-master branch.
+ gb.CreateBranchTrackBranch(ctx, "branch2", "master")
+ var newBranchCommits []*vcsinfo.LongCommit
+ for i := 0; i < 10; i++ {
+ newBranchCommits = append(newBranchCommits, commit())
+ }
+ last := newBranchCommits[len(newBranchCommits)-1]
+ mockRepo.MockLog(ctx, last.Hash, gitiles.LogBatchSize(batchSize))
+ test(2, 11)
+ for _, b := range ri.BranchList {
+ if b.Name == "master" {
+ assert.Equal(t, c0.Hash, b.Head)
+ } else {
+ assert.Equal(t, "branch2", b.Name)
+ assert.Equal(t, last.Hash, b.Head)
+ }
+ }
+
+ // New commits on several new branches.
+ for i := 0; i < 10; i++ {
+ gb.CreateBranchTrackBranch(ctx, fmt.Sprintf("b%d", i), "master")
+ commits := []*vcsinfo.LongCommit{}
+ for j := 0; j < 10; j++ {
+ commits = append(commits, commit())
+ }
+ last = commits[len(commits)-1]
+ mockRepo.MockLog(ctx, last.Hash, gitiles.LogBatchSize(batchSize))
+ }
+ test(12, 111)
+
+ // One new commit on one of the branches. Ensure that we only request
+ // the new commit.
+ mockRepo.MockLog(ctx, git.LogFromTo(last.Hash, commit().Hash), gitiles.LogBatchSize(batchSize))
+ test(12, 112)
+}
diff --git a/gitsync/go/watcher/watcher.go b/gitsync/go/watcher/watcher.go
new file mode 100644
index 0000000..d2903fd
--- /dev/null
+++ b/gitsync/go/watcher/watcher.go
@@ -0,0 +1,548 @@
+package watcher
+
+import (
+ "context"
+ "fmt"
+ "path"
+ "runtime/debug"
+ "sync"
+ "time"
+
+ "cloud.google.com/go/storage"
+ "go.skia.org/infra/go/cleanup"
+ "go.skia.org/infra/go/gcs"
+ "go.skia.org/infra/go/gcs/gcsclient"
+ "go.skia.org/infra/go/git"
+ "go.skia.org/infra/go/git/repograph"
+ "go.skia.org/infra/go/gitiles"
+ "go.skia.org/infra/go/gitstore"
+ "go.skia.org/infra/go/gitstore/bt_gitstore"
+ "go.skia.org/infra/go/metrics2"
+ "go.skia.org/infra/go/skerr"
+ "go.skia.org/infra/go/sklog"
+ "go.skia.org/infra/go/timer"
+ "go.skia.org/infra/go/vcsinfo"
+ "golang.org/x/sync/errgroup"
+)
+
+const (
+ // batchSize is the number of commits retrieved at a time from Gitiles.
+ batchSize = 10000
+)
+
+// Start creates a BigTable-backed GitStore for repoURL, builds a repograph.Graph
+// over it using Gitiles, and schedules a sync every interval via cleanup.Repeat.
+func Start(ctx context.Context, conf *bt_gitstore.BTConfig, repoURL, gitcookiesPath, gcsBucket, gcsPath string, interval time.Duration) error {
+ sklog.Infof("Initializing watcher for %s", repoURL)
+ gitStore, err := bt_gitstore.New(ctx, conf, repoURL)
+ if err != nil {
+ return skerr.Wrapf(err, "Error instantiating git store for %s.", repoURL)
+ }
+ gr := gitiles.NewRepo(repoURL, gitcookiesPath, nil)
+ s, err := storage.NewClient(ctx)
+ if err != nil {
+ return skerr.Wrapf(err, "Failed to create storage client for %s.", gcsBucket)
+ }
+ gcsClient := gcsclient.New(s, gcsBucket)
+ ri, err := newRepoImpl(ctx, gitStore, gr, gcsClient, gcsPath)
+ if err != nil {
+ return skerr.Wrapf(err, "Failed to create RepoImpl for %s; using gs://%s/%s.", repoURL, gcsBucket, gcsPath)
+ }
+ sklog.Infof("Building Graph for %s...", repoURL)
+ repo, err := repograph.NewWithRepoImpl(ctx, ri)
+ if err != nil {
+ return skerr.Wrapf(err, "Failed to create repo graph for %s.", repoURL)
+ }
+ repo.UpdateBranchInfo()
+
+ // Start periodic ingestion; the liveness metric is only reset after a successful repo.Update.
+ lvGitSync := metrics2.NewLiveness("last_successful_git_sync", map[string]string{"repo": repoURL})
+ cleanup.Repeat(interval, func(ctx context.Context) {
+ defer timer.New("Sync " + repoURL).Stop()
+ // Catch any panic and log relevant information to find the root cause.
+ defer func() {
+ if err := recover(); err != nil {
+ sklog.Errorf("Panic updating %s: %s\n%s", repoURL, err, string(debug.Stack()))
+ }
+ }()
+
+ sklog.Infof("Updating %s...", repoURL)
+ if err := repo.Update(ctx); err != nil {
+ sklog.Errorf("Error updating %s: %s", repoURL, err)
+ } else {
+ gotBranches, err := gitStore.GetBranches(ctx)
+ if err != nil {
+ sklog.Errorf("Successfully updated %s but failed to retrieve branch heads: %s", repoURL, err)
+ } else {
+ sklog.Infof("Successfully updated %s", repoURL)
+ for name, branch := range gotBranches {
+ sklog.Debugf(" %s@%s: %d, %s", path.Base(repoURL), name, branch.Index, branch.Head)
+ }
+ }
+ lvGitSync.Reset() // The update itself succeeded, even if reading the branches back failed.
+ }
+ }, nil)
+ return nil
+}
+
+// repoImpl is an implementation of repograph.RepoImpl which reads commits from
+// Gitiles, caches them in memory, and writes them into a GitStore.
+type repoImpl struct {
+ *repograph.MemCacheRepoImpl // In-memory cache; provides Commits and BranchList.
+ gcsClient gcs.GCSClient // Passed to setupInitialIngest during initial ingestion.
+ gcsPath string // GCS path passed to setupInitialIngest alongside gcsClient.
+ gitiles *gitiles.Repo // Source of branch heads and commit logs.
+ gitstore gitstore.GitStore // Destination for commits and branch heads.
+}
+
+// newRepoImpl returns a repograph.RepoImpl which uses both Gitiles and GitStore.
+// Its in-memory cache starts out with every commit and branch already in gs.
+func newRepoImpl(ctx context.Context, gs gitstore.GitStore, repo *gitiles.Repo, gcsClient gcs.GCSClient, gcsPath string) (repograph.RepoImpl, error) {
+ indexCommits, err := gs.RangeByTime(ctx, vcsinfo.MinTime, vcsinfo.MaxTime, gitstore.ALL_BRANCHES)
+ if err != nil {
+ return nil, skerr.Wrapf(err, "Failed loading IndexCommits from GitStore.")
+ }
+ var commits []*vcsinfo.LongCommit
+ if len(indexCommits) > 0 {
+ hashes := make([]string, 0, len(indexCommits))
+ for _, c := range indexCommits {
+ hashes = append(hashes, c.Hash)
+ }
+ commits, err = gs.Get(ctx, hashes)
+ if err != nil {
+ return nil, skerr.Wrapf(err, "Failed loading LongCommits from GitStore.")
+ }
+ }
+ gb, err := gs.GetBranches(ctx)
+ if err != nil {
+ return nil, skerr.Wrapf(err, "Failed loading branches from GitStore.")
+ }
+ branches := make([]*git.Branch, 0, len(gb))
+ for name, branch := range gb {
+ branches = append(branches, &git.Branch{
+ Name: name,
+ Head: branch.Head,
+ })
+ }
+ commitsMap := make(map[string]*vcsinfo.LongCommit, len(commits))
+ for _, c := range commits {
+ commitsMap[c.Hash] = c
+ }
+ sklog.Infof("Repo %s has %d commits and %d branches.", repo.URL, len(commits), len(branches))
+ for _, b := range branches {
+ sklog.Infof(" branch %s @ %s", b.Name, b.Head)
+ }
+ return &repoImpl{
+ MemCacheRepoImpl: repograph.NewMemCacheRepoImpl(commitsMap, branches),
+ gcsClient: gcsClient,
+ gcsPath: gcsPath,
+ gitiles: repo,
+ gitstore: gs,
+ }, nil
+}
+
+type commitBatch struct {
+ branch string // Name of the branch the commits were requested for; may be empty.
+ commits []*vcsinfo.LongCommit // One batch of commits as delivered by the loading func.
+}
+
+// processCommits runs loadCommits in the calling goroutine while process handles
+// its batches in a second one. gitiles.ErrStopIteration from process is not an error.
+func (r *repoImpl) processCommits(ctx context.Context, process func(context.Context, *commitBatch) error, loadCommits func(context.Context, chan<- *commitBatch) error) error {
+ // Run process in a goroutine. Create a cancelable context to halt
+ // requests made by loadCommits if process fails or asks to stop.
+ ctx, cancel := context.WithCancel(ctx)
+ defer cancel()
+ commitsCh := make(chan *commitBatch)
+ errCh := make(chan error)
+ go func() {
+ var err error
+ for cb := range commitsCh {
+ if err != nil {
+ // We've hit an error but we need to consume
+ // all of the commits from the channel before
+ // returning, or the passed-in func may block
+ // forever.
+ sklog.Warningf("Skipping %d commits due to previous error.", len(cb.commits))
+ continue
+ }
+ // Process the commits.
+ if process != nil {
+ err = process(ctx, cb)
+ }
+ if err != nil {
+ if err != gitiles.ErrStopIteration {
+ sklog.Errorf("processCommits encountered error: %s", err)
+ }
+ // Cancel the context passed to loadCommits().
+ // If it respects context.Done() as it's
+ // supposed to, then it should exit early with
+ // an error.
+ cancel()
+ }
+ }
+ // Signal that we're done processing commits.
+ errCh <- err
+ }()
+
+ // Run the passed-in func.
+ loadingErr := loadCommits(ctx, commitsCh)
+
+ // Close the commits channel, wait for the goroutine to complete.
+ close(commitsCh)
+ processErr := <-errCh
+
+ // The error returned from the processing goroutine takes precedence,
+ // because we cancel the context when process fails, and thus
+ // loadCommits() may return an error simply stating that the context was
+ // canceled.
+ if processErr != nil {
+ if processErr == gitiles.ErrStopIteration {
+ // Ignore the loadingErr in this case, since it's almost
+ // certainly just "context canceled".
+ return nil
+ }
+ if loadingErr != nil {
+ return skerr.Wrapf(processErr, "GitStore ingestion failed, and commit-loading func failed with: %s", loadingErr)
+ }
+ return processErr
+ }
+ return loadingErr
+}
+
+// loadCommitsFromGitiles requests the log for logExpr from Gitiles and pushes
+// each batch of commits, tagged with the given branch name, onto commitsCh.
+// opts are passed through to gitiles.LogFnBatch.
+func (r *repoImpl) loadCommitsFromGitiles(ctx context.Context, branch, logExpr string, commitsCh chan<- *commitBatch, opts ...gitiles.LogOption) error {
+ return r.gitiles.LogFnBatch(ctx, logExpr, func(ctx context.Context, commits []*vcsinfo.LongCommit) error {
+ commitsCh <- &commitBatch{
+ branch: branch,
+ commits: commits,
+ }
+ return nil
+ }, opts...)
+}
+
+// initialIngestion performs the first-time ingestion of the repo: master first, then all other branches concurrently.
+func (r *repoImpl) initialIngestion(ctx context.Context) error {
+ sklog.Warningf("Performing initial ingestion of %s.", r.gitiles.URL)
+ defer timer.New("Initial ingestion").Stop()
+
+ // Set up the temporary graph and RepoImpl used during initial ingestion.
+ sklog.Info("Retrieving graph from temporary store.")
+ t := timer.New("Retrieving graph from temp store")
+ graph, ri, err := setupInitialIngest(ctx, r.gcsClient, r.gcsPath, r.gitiles.URL)
+ if err != nil {
+ return skerr.Wrapf(err, "Failed initial ingestion of %s using GCS file %s", r.gitiles.URL, r.gcsPath)
+ }
+ for _, c := range graph.GetAll() {
+ r.Commits[c.Hash] = c.LongCommit
+ }
+ // oldBranches maps branch names to commit hashes for the existing
+ // branches.
+ oldBranches := map[string]string{}
+ for _, b := range graph.BranchHeads() {
+ oldBranches[b.Name] = b.Head
+ }
+ t.Stop()
+
+ // Find the current set of branches.
+ t = timer.New("Loading commits from gitiles")
+ branches, err := r.gitiles.Branches(ctx)
+ if err != nil {
+ return skerr.Wrapf(err, "Failed loading branches from Gitiles.")
+ }
+
+ // Load commits from gitiles.
+
+ // We assume that master contains the majority of the commits in the
+ // repo, and the other branches are comparatively small, with most of
+ // their ancestry being on master itself.
+ var master *git.Branch
+ for _, b := range branches {
+ if b.Name == "master" {
+ master = b
+ break
+ }
+ }
+ if master != nil {
+ if master.Head == oldBranches[master.Name] {
+ sklog.Infof("Master is up to date; skipping.")
+ } else {
+ sklog.Info("Loading commits from gitiles for master.")
+ if err := r.processCommits(ctx, func(ctx context.Context, cb *commitBatch) error {
+ // Add the new commits to our local cache.
+ sklog.Infof("Adding batch of %d commits", len(cb.commits))
+ for _, c := range cb.commits {
+ r.Commits[c.Hash] = c
+ }
+ return initialIngestCommitBatch(ctx, graph, ri.MemCacheRepoImpl, cb)
+ }, func(ctx context.Context, commitsCh chan<- *commitBatch) error {
+ logExpr := master.Head
+ if oldHead, ok := oldBranches[master.Name]; ok {
+ sklog.Infof("Have master @ %s; requesting %s", oldHead, master.Head)
+ logExpr = fmt.Sprintf("%s..%s", oldHead, master.Head)
+ }
+ return r.loadCommitsFromGitiles(ctx, master.Name, logExpr, commitsCh, gitiles.LogReverse(), gitiles.LogBatchSize(batchSize))
+ }); err != nil {
+ return skerr.Wrapf(err, "Failed to ingest commits for master.")
+ }
+ }
+ }
+ ri.Wait()
+
+ // mtx protects graph, ri and r.Commits as we load commits for
+ // non-master branches in different goroutines.
+ var mtx sync.Mutex
+
+ // Load commits for other branches, in non-reverse order, so that we can
+ // stop once we reach commits already on the master branch.
+ var egroup errgroup.Group
+
+ for _, branch := range branches {
+ // https://golang.org/doc/faq#closures_and_goroutines
+ branch := branch
+ if branch == master {
+ continue
+ }
+ sklog.Infof("Loading commits for %s", branch.Name)
+ mtx.Lock()
+ _, exists := r.Commits[branch.Head]
+ mtx.Unlock()
+ if exists {
+ sklog.Infof(" ... already have %s, skip", branch.Head)
+ continue
+ }
+ egroup.Go(func() error {
+ var commits []*vcsinfo.LongCommit
+ mtx.Lock()
+ localCache := make(map[string]*vcsinfo.LongCommit, len(r.Commits))
+ for h, c := range r.Commits {
+ localCache[h] = c
+ }
+ mtx.Unlock()
+
+ // lookingFor tracks which commits are wanted by this
+ // branch. As we traverse back through git history, we
+ // may find commits which we already have in our local
+ // cache. One might think that we could stop requesting
+ // batches of commits at that point, but if there's a
+ // commit with multiple parents on this branch, we have
+ // to make sure we follow all lines of history, which
+ // means that we need to track all of the commit hashes
+ // which we expect to find but have not yet. We can stop
+ // when we've found all of the hashes we're looking for.
+ lookingFor := map[string]bool{}
+ if err := r.processCommits(ctx, func(ctx context.Context, cb *commitBatch) error {
+ numIngested := 0
+ defer func() {
+ sklog.Infof("Added %d of batch of %d commits", numIngested, len(cb.commits))
+ }()
+ for _, c := range cb.commits {
+ // Remove this commit from lookingFor,
+ // now that we've found it.
+ delete(lookingFor, c.Hash)
+ // Add the commit to the local cache,
+ // if it's not already present. Track
+ // the number of new commits we've seen.
+ if _, ok := localCache[c.Hash]; !ok {
+ commits = append(commits, c)
+ localCache[c.Hash] = c
+ numIngested++
+ }
+ // Add any parents of this commit which
+ // are not already in our cache to the
+ // lookingFor set.
+ for _, p := range c.Parents {
+ if _, ok := localCache[p]; !ok {
+ lookingFor[p] = true
+ }
+ }
+ // If we've found all the commits we
+ // need, we can stop.
+ if len(lookingFor) == 0 {
+ return gitiles.ErrStopIteration
+ }
+ }
+ return nil
+ }, func(ctx context.Context, commitsCh chan<- *commitBatch) error {
+ logExpr := branch.Head
+ if oldHead, ok := oldBranches[branch.Name]; ok {
+ logExpr = git.LogFromTo(oldHead, branch.Head)
+ }
+ return r.loadCommitsFromGitiles(ctx, branch.Name, logExpr, commitsCh, gitiles.LogBatchSize(batchSize))
+ }); err != nil {
+ return skerr.Wrapf(err, "Failed to ingest commits for branch %s.", branch.Name)
+ }
+ // Reverse the slice of commits so that they can be added in
+ // order.
+ for i := 0; i < len(commits)/2; i++ {
+ j := len(commits) - i - 1
+ commits[i], commits[j] = commits[j], commits[i]
+ }
+ mtx.Lock()
+ defer mtx.Unlock()
+ if err := initialIngestCommitBatch(ctx, graph, ri.MemCacheRepoImpl, &commitBatch{
+ branch: branch.Name,
+ commits: commits,
+ }); err != nil {
+ return skerr.Wrapf(err, "Failed to add commits to graph for %s", branch.Name)
+ }
+ for _, c := range commits {
+ r.Commits[c.Hash] = c
+ }
+ sklog.Infof("Loading commits for %s done", branch.Name)
+ return nil
+ })
+ }
+ // Wait for the above goroutines to finish.
+ if err := egroup.Wait(); err != nil {
+ return skerr.Wrap(err)
+ }
+ // Wait for the initialIngestRepoImpl to finish backing up to GCS.
+ ri.Wait()
+ t.Stop()
+
+ // Replace the fake branches with real ones. Update branch membership.
+ ri.BranchList = branches
+ if err := graph.Update(ctx); err != nil {
+ return skerr.Wrapf(err, "Failed final Graph update.")
+ }
+ ri.Wait()
+ r.BranchList = branches
+ sklog.Infof("Finished initial ingestion of %s; have %d commits and %d branches.", r.gitiles.URL, graph.Len(), len(graph.Branches()))
+ return nil
+}
+
+// Update implements repograph.RepoImpl: it loads commits for any changed Gitiles branch heads into the cache.
+func (r *repoImpl) Update(ctx context.Context) error {
+ sklog.Infof("repoImpl.Update for %s", r.gitiles.URL)
+ defer timer.New("repoImpl.Update for " + r.gitiles.URL).Stop()
+ if len(r.BranchList) == 0 {
+ if err := r.initialIngestion(ctx); err != nil {
+ return skerr.Wrapf(err, "Failed initial ingestion.")
+ }
+ }
+
+ // Find the old and new branch heads.
+ sklog.Infof("Getting branches for %s.", r.gitiles.URL)
+ oldBranches := make(map[string]*git.Branch, len(r.BranchList))
+ for _, branch := range r.BranchList {
+ oldBranches[branch.Name] = branch
+ }
+ branches, err := r.gitiles.Branches(ctx)
+ if err != nil {
+ return skerr.Wrapf(err, "Failed loading branches from Gitiles.")
+ }
+
+ // Download any new commits and add them to the local cache.
+ sklog.Infof("Processing new commits for %s.", r.gitiles.URL)
+ if err := r.processCommits(ctx, func(ctx context.Context, cb *commitBatch) error {
+ // Add the new commits to our local cache.
+ for _, c := range cb.commits {
+ r.Commits[c.Hash] = c
+ }
+ return nil
+ }, func(ctx context.Context, commitsCh chan<- *commitBatch) error {
+ for _, branch := range branches {
+ logExpr := branch.Head
+ if oldBranch, ok := oldBranches[branch.Name]; ok {
+ // If there's nothing new, skip this branch.
+ if branch.Head == oldBranch.Head {
+ continue
+ }
+ // Only load back to the previous branch head.
+ logExpr = fmt.Sprintf("%s..%s", oldBranch.Head, branch.Head)
+ }
+ if err := r.loadCommitsFromGitiles(ctx, branch.Name, logExpr, commitsCh); err != nil {
+ return skerr.Wrapf(err, "Failed loading commits for %s", branch.Head)
+ }
+ }
+ return nil
+ }); err != nil {
+ return err
+ }
+ r.BranchList = branches
+ return nil
+}
+
+// Details implements repograph.RepoImpl: it returns the cached commit, loading it from Gitiles on a cache miss.
+func (r *repoImpl) Details(ctx context.Context, hash string) (*vcsinfo.LongCommit, error) {
+ c, err := r.MemCacheRepoImpl.Details(ctx, hash)
+ if err == nil {
+ return c, nil
+ }
+ // Fall back to retrieving from Gitiles, store any new commits in the
+ // local cache. %.7s abbreviates the hash without panicking on short input.
+ sklog.Errorf("Missing commit %.7s in %s", hash, r.gitiles.URL)
+ lookingFor := map[string]bool{}
+ if err := r.processCommits(ctx, func(ctx context.Context, cb *commitBatch) error {
+ // Cache each commit; stop once no uncached parent remains outstanding.
+ for _, c := range cb.commits {
+ delete(lookingFor, c.Hash)
+ for _, p := range c.Parents {
+ if _, ok := r.Commits[p]; !ok {
+ lookingFor[p] = true
+ }
+ }
+ r.Commits[c.Hash] = c
+ if len(lookingFor) == 0 {
+ return gitiles.ErrStopIteration
+ }
+ }
+ return nil
+ }, func(ctx context.Context, commitsCh chan<- *commitBatch) error {
+ return r.loadCommitsFromGitiles(ctx, "", hash, commitsCh)
+ }); err != nil {
+ return nil, err
+ }
+ c, ok := r.Commits[hash]
+ if !ok {
+ return nil, skerr.Fmt("Commit %s in %s is still missing despite attempting to load it from gitiles.", hash, r.gitiles.URL)
+ }
+ return c, nil
+}
+
+// UpdateCallback implements repograph.RepoImpl: it writes added/modified commits and the branch heads to GitStore.
+func (r *repoImpl) UpdateCallback(ctx context.Context, added, removed []*vcsinfo.LongCommit, graph *repograph.Graph) error {
+ sklog.Infof("repoImpl.UpdateCallback for %s", r.gitiles.URL)
+ defer timer.New("repoImpl.UpdateCallback for " + r.gitiles.URL).Stop()
+ // Ensure that branch membership is up to date.
+ modified := graph.UpdateBranchInfo()
+ modifiedMap := make(map[string]*vcsinfo.LongCommit, len(modified)+len(added))
+ for _, c := range added {
+ modifiedMap[c.Hash] = c
+ }
+ for _, c := range modified {
+ modifiedMap[c.Hash] = c
+ }
+ // Don't include commits in the 'removed' list.
+ for _, c := range removed {
+ delete(modifiedMap, c.Hash)
+ }
+ putCommits := make([]*vcsinfo.LongCommit, 0, len(modifiedMap))
+ for _, c := range modifiedMap {
+ putCommits = append(putCommits, c)
+ }
+ sklog.Infof("Put %d new and %d modified commits for %s.", len(added), len(modified), r.gitiles.URL)
+ if err := r.gitstore.Put(ctx, putCommits); err != nil {
+ return skerr.Wrapf(err, "Failed putting commits into GitStore.")
+ }
+ // TODO(borenet): Should we delete commits which were removed?
+ branchHeads := graph.BranchHeads()
+ branches := make(map[string]string, len(branchHeads))
+ for _, b := range branchHeads {
+ branches[b.Name] = b.Head
+ }
+ // Explicitly delete any old branches which are no longer present.
+ oldBranches, err := r.gitstore.GetBranches(ctx)
+ if err != nil {
+ return skerr.Wrapf(err, "Failed to retrieve old branch heads.")
+ }
+ for name := range oldBranches {
+ if _, ok := branches[name]; !ok {
+ branches[name] = gitstore.DELETE_BRANCH
+ }
+ }
+ return r.gitstore.PutBranches(ctx, branches)
+}
diff --git a/gitsync/go/watcher/watcher_test.go b/gitsync/go/watcher/watcher_test.go
new file mode 100644
index 0000000..8475389
--- /dev/null
+++ b/gitsync/go/watcher/watcher_test.go
@@ -0,0 +1,368 @@
+package watcher
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "io/ioutil"
+ "strings"
+ "testing"
+ "time"
+
+ assert "github.com/stretchr/testify/require"
+ "go.skia.org/infra/go/deepequal"
+ "go.skia.org/infra/go/gcs/test_gcsclient"
+ "go.skia.org/infra/go/git"
+ "go.skia.org/infra/go/git/repograph"
+ repograph_shared_tests "go.skia.org/infra/go/git/repograph/shared_tests"
+ git_testutils "go.skia.org/infra/go/git/testutils"
+ "go.skia.org/infra/go/gitiles"
+ gitiles_testutils "go.skia.org/infra/go/gitiles/testutils"
+ "go.skia.org/infra/go/gitstore"
+ "go.skia.org/infra/go/gitstore/mocks"
+ gitstore_testutils "go.skia.org/infra/go/gitstore/testutils"
+ "go.skia.org/infra/go/mockhttpclient"
+ "go.skia.org/infra/go/sklog"
+ "go.skia.org/infra/go/testutils"
+ "go.skia.org/infra/go/testutils/unittest"
+ "go.skia.org/infra/go/util"
+ "go.skia.org/infra/go/vcsinfo"
+)
+
+func TestIngestCommits(t *testing.T) {
+ unittest.SmallTest(t)
+
+ ctx := context.Background()
+ gs := &mocks.GitStore{}
+ ri := &repoImpl{
+ MemCacheRepoImpl: repograph.NewMemCacheRepoImpl(nil, nil),
+ gitstore: gs,
+ }
+ idx := 0
+ makeCommits := func(hashes ...string) []*vcsinfo.LongCommit {
+ rv := make([]*vcsinfo.LongCommit, 0, len(hashes))
+ for _, h := range hashes {
+ rv = append(rv, &vcsinfo.LongCommit{
+ ShortCommit: &vcsinfo.ShortCommit{
+ Hash: h,
+ },
+ Index: idx,
+ Branches: map[string]bool{"master": true},
+ })
+ idx++
+ }
+ return rv
+ }
+
+ totalIngested := 0
+ assertNew := func(numNew int) {
+ totalIngested += numNew
+ assert.Equal(t, totalIngested, len(ri.Commits))
+ }
+ process := func(ctx context.Context, cb *commitBatch) error {
+ if err := ri.gitstore.Put(ctx, cb.commits); err != nil {
+ return err
+ }
+ for _, c := range cb.commits {
+ ri.Commits[c.Hash] = c
+ }
+ return nil
+ }
+ // Ingest a single commit.
+ assert.NoError(t, ri.processCommits(ctx, process, func(ctx context.Context, ch chan<- *commitBatch) error {
+ commits := makeCommits("abc123")
+ gs.On("Put", ctx, commits).Return(nil)
+ gs.On("PutBranches", ctx, map[string]string{"master": commits[len(commits)-1].Hash}).Return(nil)
+ ch <- &commitBatch{
+ commits: commits,
+ }
+ return nil
+ }))
+ assertNew(1)
+
+ // Ingest a series of commits.
+ assert.NoError(t, ri.processCommits(ctx, process, func(ctx context.Context, ch chan<- *commitBatch) error {
+ for i := 1; i < 5; i++ {
+ hashes := make([]string, 0, i)
+ for j := 0; j < i; j++ {
+ hashes = append(hashes, fmt.Sprintf("%dabc%d", i, j))
+ }
+ commits := makeCommits(hashes...)
+ gs.On("Put", ctx, commits).Return(nil)
+ gs.On("PutBranches", ctx, map[string]string{"master": commits[len(commits)-1].Hash}).Return(nil)
+ ch <- &commitBatch{
+ commits: commits,
+ }
+ }
+ return nil
+ }))
+ assertNew(1 + 2 + 3 + 4)
+
+ // If the passed-in func returns an error, it should propagate, and the
+ // previously-queued commits should still get ingested.
+ err := errors.New("commit retrieval failed.")
+ assert.Equal(t, err, ri.processCommits(ctx, process, func(ctx context.Context, ch chan<- *commitBatch) error {
+ commits := makeCommits("def456")
+ gs.On("Put", ctx, commits).Return(nil)
+ gs.On("PutBranches", ctx, map[string]string{"master": commits[len(commits)-1].Hash}).Return(nil)
+ ch <- &commitBatch{
+ commits: commits,
+ }
+ return err
+ }))
+ assertNew(1)
+
+ // Ensure that the context gets canceled if ingestion fails.
+ err = ri.processCommits(ctx, process, func(ctx context.Context, ch chan<- *commitBatch) error {
+ for i := 5; i < 10; i++ {
+ if err := ctx.Err(); err != nil {
+ return err
+ }
+ hashes := make([]string, 0, i)
+ for j := 0; j < i; j++ {
+ hashes = append(hashes, fmt.Sprintf("%dabc%d", i, j))
+ }
+ commits := makeCommits(hashes...)
+ if i == 7 {
+ gs.On("Put", ctx, commits).Return(errors.New("commit ingestion failed."))
+ } else {
+ gs.On("Put", ctx, commits).Return(nil)
+ gs.On("PutBranches", ctx, map[string]string{"master": commits[len(commits)-1].Hash}).Return(nil)
+ }
+ ch <- &commitBatch{
+ commits: commits,
+ }
+ }
+ return nil
+ })
+ assert.Error(t, err) // Fail cleanly instead of dereferencing a nil error below.
+ assert.True(t, strings.Contains(err.Error(), "commit ingestion failed"))
+ assert.True(t, strings.Contains(err.Error(), "and commit-loading func failed with: context canceled"))
+ assertNew(5 + 6)
+}
+
+// gitsyncRefresher is an implementation of repograph_shared_tests.RepoImplRefresher
+// used for testing gitsync.
+type gitsyncRefresher struct {
+ gitiles *gitiles_testutils.MockRepo // Mock Gitiles on which Refresh registers expected requests.
+ graph *repograph.Graph // Graph under test; nil until set by setupGitsync.
+ gs gitstore.GitStore // GitStore compared against graph in checkIngestion.
+ initialSync bool // True until the first Refresh has run.
+ oldBranches map[string]string // Branch name -> head as of the previous Refresh.
+ repo *git.Repo // Repo rooted at the GitBuilder's directory.
+ t *testing.T
+}
+
+func newGitsyncRefresher(t *testing.T, ctx context.Context, gs gitstore.GitStore, gb *git_testutils.GitBuilder, mr *gitiles_testutils.MockRepo) repograph_shared_tests.RepoImplRefresher {
+ repo := &git.Repo{GitDir: git.GitDir(gb.Dir())}
+ branches, err := repo.Branches(ctx)
+ assert.NoError(t, err)
+ oldBranches := make(map[string]string, len(branches))
+ for _, b := range branches {
+ oldBranches[b.Name] = b.Head
+ }
+ return &gitsyncRefresher{
+ gitiles: mr,
+ graph: nil, // Set later in setupGitsync, after the graph is created.
+ gs: gs,
+ initialSync: true,
+ oldBranches: oldBranches,
+ repo: repo,
+ t: t,
+ }
+}
+
+func (u *gitsyncRefresher) Refresh(commits ...*vcsinfo.LongCommit) {
+ ctx := context.Background()
+
+ assert.True(u.t, u.gitiles.Empty())
+
+ // Check the GitStore contents before updating the underlying repo.
+ u.checkIngestion(ctx)
+
+ // Update the backing repo.
+ assert.NoError(u.t, u.repo.Update(ctx))
+
+ // Mock calls to gitiles.
+ branches, err := u.repo.Branches(ctx)
+ assert.NoError(u.t, err)
+ branchMap := make(map[string]string, len(branches))
+ for _, b := range branches {
+ oldHead := u.oldBranches[b.Name]
+ if b.Head != oldHead {
+ logExpr := b.Head
+ if oldHead != "" {
+ logExpr = fmt.Sprintf("%s..%s", oldHead, b.Head)
+ }
+ var opts []gitiles.LogOption
+ if u.initialSync && b.Name == "master" {
+ opts = append(opts, gitiles.LogReverse(), gitiles.LogBatchSize(batchSize))
+ }
+ u.gitiles.MockLog(ctx, logExpr, opts...)
+ }
+ branchMap[b.Name] = b.Head
+ }
+ u.oldBranches = branchMap
+ u.gitiles.MockBranches(ctx)
+ if u.initialSync {
+ u.gitiles.MockBranches(ctx)
+ u.initialSync = false
+ }
+}
+
+// checkIngestion asserts that the contents of the GitStore match those of the
+// repograph.Graph.
+func (u *gitsyncRefresher) checkIngestion(ctx context.Context) {
+ if u.graph == nil {
+ return
+ }
+
+ // Wait for GitStore to be up to date.
+ branchHeads := u.graph.BranchHeads()
+ expectBranches := make(map[string]string, len(branchHeads))
+ for _, b := range branchHeads {
+ expectBranches[b.Name] = b.Head
+ }
+ assert.NoError(u.t, testutils.EventuallyConsistent(time.Second, func() error {
+ actual, err := u.gs.GetBranches(ctx)
+ assert.NoError(u.t, err)
+ for name, expect := range expectBranches {
+ actualBranch, ok := actual[name]
+ if !ok || actualBranch.Head != expect {
+ sklog.Errorf("%s is %+v, expect %s", name, actualBranch, expect)
+ time.Sleep(10 * time.Millisecond)
+ return testutils.TryAgainErr
+ }
+ }
+ for name := range actual {
+ if _, ok := expectBranches[name]; name != gitstore.ALL_BRANCHES && !ok {
+ sklog.Errorf("Expected %s not to be present", name)
+ time.Sleep(10 * time.Millisecond)
+ return testutils.TryAgainErr
+ }
+ }
+ return nil
+ }))
+
+ // Assert that the branch heads are the same.
+ gotBranches, err := u.gs.GetBranches(ctx)
+ assert.NoError(u.t, err)
+ delete(gotBranches, gitstore.ALL_BRANCHES)
+ assert.Equal(u.t, len(expectBranches), len(gotBranches))
+ for name, head := range expectBranches {
+ assert.Equal(u.t, head, gotBranches[name].Head)
+ }
+
+ // Assert that all LongCommits are present and correct.
+ iCommits, err := u.gs.RangeByTime(ctx, vcsinfo.MinTime, vcsinfo.MaxTime, gitstore.ALL_BRANCHES)
+ assert.NoError(u.t, err)
+ hashes := make([]string, 0, len(iCommits))
+ for _, c := range iCommits {
+ hashes = append(hashes, c.Hash)
+ }
+ longCommits, err := u.gs.Get(ctx, hashes)
+ assert.NoError(u.t, err)
+ commits := make(map[string]*vcsinfo.LongCommit, len(hashes))
+ for _, c := range longCommits {
+ assert.NotNil(u.t, c)
+ commits[c.Hash] = c
+ }
+ for _, c := range u.graph.GetAll() {
+ deepequal.AssertDeepEqual(u.t, c.LongCommit, commits[c.Hash])
+ }
+
+ // Assert that the IndexCommits are correct for each branch.
+ for name := range expectBranches {
+ branchPtr := gotBranches[name]
+ branchCommits, err := u.graph.LogLinear("", name)
+ assert.NoError(u.t, err)
+ expectIndexCommits := make([]*vcsinfo.IndexCommit, 0, len(branchCommits))
+ for i := len(branchCommits) - 1; i >= 0; i-- {
+ c := branchCommits[i]
+ expectIndexCommits = append(expectIndexCommits, &vcsinfo.IndexCommit{
+ Hash: c.Hash,
+ Index: len(expectIndexCommits),
+ Timestamp: c.Timestamp.UTC(),
+ })
+ }
+
+ // RangeN.
+ gotIndexCommits, err := u.gs.RangeN(ctx, 0, branchPtr.Index+1, name)
+ assert.NoError(u.t, err)
+ deepequal.AssertDeepEqual(u.t, expectIndexCommits, gotIndexCommits)
+
+ // RangeByTime.
+ gotIndexCommits, err = u.gs.RangeByTime(ctx, vcsinfo.MinTime, vcsinfo.MaxTime, name)
+ assert.NoError(u.t, err)
+ deepequal.AssertDeepEqual(u.t, expectIndexCommits, gotIndexCommits)
+ }
+}
+
+// setupGitsync performs common setup for GitStore based Graphs.
+func setupGitsync(t *testing.T) (context.Context, *git_testutils.GitBuilder, *repograph.Graph, repograph_shared_tests.RepoImplRefresher, func()) {
+ ctx, g, cleanup := repograph_shared_tests.CommonSetup(t)
+ wd, err := ioutil.TempDir("", "")
+ assert.NoError(t, err)
+ defer util.RemoveAll(wd)
+ _, _, gs := gitstore_testutils.SetupAndLoadBTGitStore(t, ctx, wd, g.RepoUrl(), true)
+ urlMock := mockhttpclient.NewURLMock()
+ mockRepo := gitiles_testutils.NewMockRepo(t, g.RepoUrl(), git.GitDir(g.Dir()), urlMock)
+ repo := gitiles.NewRepo(g.RepoUrl(), "", urlMock.Client())
+ gcsClient := test_gcsclient.NewMemoryClient("fake-bucket")
+ ri, err := newRepoImpl(ctx, gs, repo, gcsClient, "repo-ingestion")
+ assert.NoError(t, err)
+ ud := newGitsyncRefresher(t, ctx, gs, g, mockRepo)
+ graph, err := repograph.NewWithRepoImpl(ctx, ri)
+ assert.NoError(t, err)
+ ud.(*gitsyncRefresher).graph = graph
+ return ctx, g, graph, ud, cleanup
+}
+
+func TestGraphWellFormedGitSync(t *testing.T) {
+ unittest.LargeTest(t)
+ ctx, g, repo, ud, cleanup := setupGitsync(t)
+ defer cleanup()
+ repograph_shared_tests.TestGraphWellFormed(t, ctx, g, repo, ud)
+}
+
+func TestRecurseGitSync(t *testing.T) {
+ unittest.LargeTest(t)
+ ctx, g, repo, ud, cleanup := setupGitsync(t)
+ defer cleanup()
+ repograph_shared_tests.TestRecurse(t, ctx, g, repo, ud)
+}
+
+func TestRecurseAllBranchesGitSync(t *testing.T) {
+ unittest.LargeTest(t)
+ ctx, g, repo, ud, cleanup := setupGitsync(t)
+ defer cleanup()
+ repograph_shared_tests.TestRecurseAllBranches(t, ctx, g, repo, ud)
+}
+
+func TestUpdateHistoryChangedGitSync(t *testing.T) {
+ unittest.LargeTest(t)
+ ctx, g, repo, ud, cleanup := setupGitsync(t)
+ defer cleanup()
+ repograph_shared_tests.TestUpdateHistoryChanged(t, ctx, g, repo, ud)
+}
+
+func TestUpdateAndReturnCommitDiffsGitSync(t *testing.T) {
+ unittest.LargeTest(t)
+ ctx, g, repo, ud, cleanup := setupGitsync(t)
+ defer cleanup()
+ repograph_shared_tests.TestUpdateAndReturnCommitDiffs(t, ctx, g, repo, ud)
+}
+
+func TestRevListGitSync(t *testing.T) {
+ unittest.LargeTest(t)
+ ctx, g, repo, ud, cleanup := setupGitsync(t)
+ defer cleanup()
+ repograph_shared_tests.TestRevList(t, ctx, g, repo, ud)
+}
+
+func TestBranchMembershipGitSync(t *testing.T) {
+ unittest.LargeTest(t)
+ ctx, g, repo, ud, cleanup := setupGitsync(t)
+ defer cleanup()
+ repograph_shared_tests.TestBranchMembership(t, ctx, g, repo, ud)
+}
diff --git a/go/gitstore/bt_gitstore/bt_gitstore.go b/go/gitstore/bt_gitstore/bt_gitstore.go
index 8a0e503..35acd3b 100644
--- a/go/gitstore/bt_gitstore/bt_gitstore.go
+++ b/go/gitstore/bt_gitstore/bt_gitstore.go
@@ -145,7 +145,7 @@
close(mutations)
}()
// Create IndexCommits mutations for each branch for each commit.
- for _, c := range commits {
+ for i, c := range commits {
// Validation.
if c.Index == 0 && len(c.Parents) != 0 {
return skerr.Fmt("Commit %s has index zero but has at least one parent. This cannot be correct.", c.Hash)
@@ -176,6 +176,9 @@
mutations <- b.mutationForTimestampCommit(branch, ic)
}
}
+ if i%1000 == 0 {
+ sklog.Infof("Created mutations for %d of %d commits.", i+1, len(commits))
+ }
}
return nil
})