blob: 4fefa63538fef6befec4e504824e0dbc260e0736 [file] [log] [blame]
package watcher
import (
"context"
"errors"
"fmt"
"io/ioutil"
"strings"
"sync"
"testing"
"time"
assert "github.com/stretchr/testify/require"
"go.skia.org/infra/go/deepequal"
"go.skia.org/infra/go/gcs/test_gcsclient"
"go.skia.org/infra/go/git"
"go.skia.org/infra/go/git/repograph"
repograph_shared_tests "go.skia.org/infra/go/git/repograph/shared_tests"
git_testutils "go.skia.org/infra/go/git/testutils"
"go.skia.org/infra/go/gitiles"
gitiles_testutils "go.skia.org/infra/go/gitiles/testutils"
"go.skia.org/infra/go/gitstore"
"go.skia.org/infra/go/gitstore/mocks"
gitstore_testutils "go.skia.org/infra/go/gitstore/testutils"
"go.skia.org/infra/go/mockhttpclient"
"go.skia.org/infra/go/sklog"
"go.skia.org/infra/go/testutils"
"go.skia.org/infra/go/testutils/unittest"
"go.skia.org/infra/go/util"
"go.skia.org/infra/go/vcsinfo"
)
func TestIngestCommits(t *testing.T) {
unittest.SmallTest(t)
ctx := context.Background()
gs := &mocks.GitStore{}
ri := &repoImpl{
MemCacheRepoImpl: repograph.NewMemCacheRepoImpl(nil, nil),
gitstore: gs,
}
idx := 0
makeCommits := func(hashes ...string) []*vcsinfo.LongCommit {
rv := make([]*vcsinfo.LongCommit, 0, len(hashes))
for _, h := range hashes {
rv = append(rv, &vcsinfo.LongCommit{
ShortCommit: &vcsinfo.ShortCommit{
Hash: h,
},
Index: idx,
Branches: map[string]bool{"master": true},
})
idx++
}
return rv
}
totalIngested := 0
assertNew := func(numNew int) {
totalIngested += numNew
assert.Equal(t, totalIngested, len(ri.Commits))
}
// There's a data race between the mocked GitStore's use of reflection
// and the lock/unlock of a mutex in Context.Err(). We add a mutex here
// to avoid that problem.
var ctxMtx sync.Mutex
process := func(ctx context.Context, cb *commitBatch) error {
ctxMtx.Lock()
err := ri.gitstore.Put(ctx, cb.commits)
ctxMtx.Unlock()
if err != nil {
return err
}
for _, c := range cb.commits {
ri.Commits[c.Hash] = c
}
return nil
}
// Ingest a single commit.
assert.NoError(t, ri.processCommits(ctx, process, func(ctx context.Context, ch chan<- *commitBatch) error {
commits := makeCommits("abc123")
gs.On("Put", ctx, commits).Return(nil)
gs.On("PutBranches", ctx, map[string]string{"master": commits[len(commits)-1].Hash}).Return(nil)
ch <- &commitBatch{
commits: commits,
}
return nil
}))
assertNew(1)
// Ingest a series of commits.
assert.NoError(t, ri.processCommits(ctx, process, func(ctx context.Context, ch chan<- *commitBatch) error {
for i := 1; i < 5; i++ {
hashes := make([]string, 0, i)
for j := 0; j < i; j++ {
hashes = append(hashes, fmt.Sprintf("%dabc%d", i, j))
}
commits := makeCommits(hashes...)
gs.On("Put", ctx, commits).Return(nil)
gs.On("PutBranches", ctx, map[string]string{"master": commits[len(commits)-1].Hash}).Return(nil)
ch <- &commitBatch{
commits: commits,
}
}
return nil
}))
assertNew(1 + 2 + 3 + 4)
// If the passed-in func returns an error, it should propagate, and the
// previously-queued commits should still get ingested.
err := errors.New("commit retrieval failed.")
assert.Equal(t, err, ri.processCommits(ctx, process, func(ctx context.Context, ch chan<- *commitBatch) error {
commits := makeCommits("def456")
gs.On("Put", ctx, commits).Return(nil)
gs.On("PutBranches", ctx, map[string]string{"master": commits[len(commits)-1].Hash}).Return(nil)
ch <- &commitBatch{
commits: commits,
}
return err
}))
assertNew(1)
// Ensure that the context gets canceled if ingestion fails.
err = ri.processCommits(ctx, process, func(ctx context.Context, ch chan<- *commitBatch) error {
for i := 5; i < 10; i++ {
// See the above comment about mocks, reflect, and race.
ctxMtx.Lock()
err := ctx.Err()
ctxMtx.Unlock()
if err != nil {
return err
}
hashes := make([]string, 0, i)
for j := 0; j < i; j++ {
hashes = append(hashes, fmt.Sprintf("%dabc%d", i, j))
}
commits := makeCommits(hashes...)
if i == 7 {
gs.On("Put", ctx, commits).Return(errors.New("commit ingestion failed."))
} else {
gs.On("Put", ctx, commits).Return(nil)
gs.On("PutBranches", ctx, map[string]string{"master": commits[len(commits)-1].Hash}).Return(nil)
}
ch <- &commitBatch{
commits: commits,
}
}
return nil
})
sklog.Errorf("Error: %s", err.Error())
assert.True(t, strings.Contains(err.Error(), "commit ingestion failed"))
assert.True(t, strings.Contains(err.Error(), "and commit-loading func failed with: context canceled"))
assertNew(5 + 6)
}
// gitsyncRefresher is an implementation of repograph_shared_tests.RepoImplRefresher
// used for testing gitsync.
type gitsyncRefresher struct {
gitiles *gitiles_testutils.MockRepo
graph *repograph.Graph
gs gitstore.GitStore
initialSync bool
oldBranches map[string]string
repo *git.Repo
t *testing.T
}
func newGitsyncRefresher(t *testing.T, ctx context.Context, gs gitstore.GitStore, gb *git_testutils.GitBuilder, mr *gitiles_testutils.MockRepo) repograph_shared_tests.RepoImplRefresher {
repo := &git.Repo{GitDir: git.GitDir(gb.Dir())}
branches, err := repo.Branches(ctx)
assert.NoError(t, err)
oldBranches := make(map[string]string, len(branches))
for _, b := range branches {
oldBranches[b.Name] = b.Head
}
return &gitsyncRefresher{
gitiles: mr,
graph: nil, // Set later in setupGitsync, after the graph is created.
gs: gs,
initialSync: true,
oldBranches: oldBranches,
repo: repo,
t: t,
}
}
func (u *gitsyncRefresher) Refresh(commits ...*vcsinfo.LongCommit) {
ctx := context.Background()
assert.True(u.t, u.gitiles.Empty())
// Check the GitStore contents before updating the underlying repo.
u.checkIngestion(ctx)
// Update the backing repo.
assert.NoError(u.t, u.repo.Update(ctx))
// Mock calls to gitiles.
branches, err := u.repo.Branches(ctx)
assert.NoError(u.t, err)
branchMap := make(map[string]string, len(branches))
for _, b := range branches {
oldHead := u.oldBranches[b.Name]
if b.Head != oldHead {
logExpr := b.Head
if oldHead != "" {
logExpr = fmt.Sprintf("%s..%s", oldHead, b.Head)
}
var opts []gitiles.LogOption
if u.initialSync && b.Name == "master" {
opts = append(opts, gitiles.LogReverse(), gitiles.LogBatchSize(batchSize))
}
u.gitiles.MockLog(ctx, logExpr, opts...)
}
branchMap[b.Name] = b.Head
}
u.oldBranches = branchMap
u.gitiles.MockBranches(ctx)
if u.initialSync {
u.gitiles.MockBranches(ctx)
u.initialSync = false
}
}
// checkIngestion asserts that the contents of the GitStore match those of the
// repograph.Graph.
func (u *gitsyncRefresher) checkIngestion(ctx context.Context) {
if u.graph == nil {
return
}
// Wait for GitStore to be up to date.
branchHeads := u.graph.BranchHeads()
expectBranches := make(map[string]string, len(branchHeads))
for _, b := range branchHeads {
expectBranches[b.Name] = b.Head
}
assert.NoError(u.t, testutils.EventuallyConsistent(time.Second, func() error {
actual, err := u.gs.GetBranches(ctx)
assert.NoError(u.t, err)
for name, expect := range expectBranches {
actualBranch, ok := actual[name]
if !ok || actualBranch.Head != expect {
sklog.Errorf("%s is %+v, expect %s", name, actualBranch, expect)
time.Sleep(10 * time.Millisecond)
return testutils.TryAgainErr
}
}
for name := range actual {
if _, ok := expectBranches[name]; name != gitstore.ALL_BRANCHES && !ok {
sklog.Errorf("Expected %s not to be present", name)
time.Sleep(10 * time.Millisecond)
return testutils.TryAgainErr
}
}
return nil
}))
// Assert that the branch heads are the same.
gotBranches, err := u.gs.GetBranches(ctx)
assert.NoError(u.t, err)
delete(gotBranches, gitstore.ALL_BRANCHES)
assert.Equal(u.t, len(expectBranches), len(gotBranches))
for name, head := range expectBranches {
assert.Equal(u.t, head, gotBranches[name].Head)
}
// Assert that all LongCommits are present and correct.
iCommits, err := u.gs.RangeByTime(ctx, vcsinfo.MinTime, vcsinfo.MaxTime, gitstore.ALL_BRANCHES)
assert.NoError(u.t, err)
hashes := make([]string, 0, len(iCommits))
for _, c := range iCommits {
hashes = append(hashes, c.Hash)
}
longCommits, err := u.gs.Get(ctx, hashes)
assert.NoError(u.t, err)
commits := make(map[string]*vcsinfo.LongCommit, len(hashes))
for _, c := range longCommits {
assert.NotNil(u.t, c)
commits[c.Hash] = c
}
for _, c := range u.graph.GetAll() {
deepequal.AssertDeepEqual(u.t, c.LongCommit, commits[c.Hash])
}
// Assert that the IndexCommits are correct for each branch.
for name := range expectBranches {
branchPtr := gotBranches[name]
branchCommits, err := u.graph.LogLinear("", name)
assert.NoError(u.t, err)
expectIndexCommits := make([]*vcsinfo.IndexCommit, 0, len(branchCommits))
for i := len(branchCommits) - 1; i >= 0; i-- {
c := branchCommits[i]
expectIndexCommits = append(expectIndexCommits, &vcsinfo.IndexCommit{
Hash: c.Hash,
Index: len(expectIndexCommits),
Timestamp: c.Timestamp.UTC(),
})
}
// RangeN.
gotIndexCommits, err := u.gs.RangeN(ctx, 0, branchPtr.Index+1, name)
assert.NoError(u.t, err)
deepequal.AssertDeepEqual(u.t, expectIndexCommits, gotIndexCommits)
// RangeByTime.
gotIndexCommits, err = u.gs.RangeByTime(ctx, vcsinfo.MinTime, vcsinfo.MaxTime, name)
assert.NoError(u.t, err)
deepequal.AssertDeepEqual(u.t, expectIndexCommits, gotIndexCommits)
}
}
// setupGitsync performs common setup for GitStore based Graphs.
func setupGitsync(t *testing.T) (context.Context, *git_testutils.GitBuilder, *repograph.Graph, repograph_shared_tests.RepoImplRefresher, func()) {
ctx, g, cleanup := repograph_shared_tests.CommonSetup(t)
wd, err := ioutil.TempDir("", "")
assert.NoError(t, err)
defer util.RemoveAll(wd)
_, _, gs := gitstore_testutils.SetupAndLoadBTGitStore(t, ctx, wd, g.RepoUrl(), true)
urlMock := mockhttpclient.NewURLMock()
mockRepo := gitiles_testutils.NewMockRepo(t, g.RepoUrl(), git.GitDir(g.Dir()), urlMock)
repo := gitiles.NewRepo(g.RepoUrl(), "", urlMock.Client())
gcsClient := test_gcsclient.NewMemoryClient("fake-bucket")
ri, err := newRepoImpl(ctx, gs, repo, gcsClient, "repo-ingestion")
assert.NoError(t, err)
ud := newGitsyncRefresher(t, ctx, gs, g, mockRepo)
graph, err := repograph.NewWithRepoImpl(ctx, ri)
assert.NoError(t, err)
ud.(*gitsyncRefresher).graph = graph
return ctx, g, graph, ud, cleanup
}
func TestGraphWellFormedGitSync(t *testing.T) {
unittest.LargeTest(t)
ctx, g, repo, ud, cleanup := setupGitsync(t)
defer cleanup()
repograph_shared_tests.TestGraphWellFormed(t, ctx, g, repo, ud)
}
func TestRecurseGitSync(t *testing.T) {
unittest.LargeTest(t)
ctx, g, repo, ud, cleanup := setupGitsync(t)
defer cleanup()
repograph_shared_tests.TestRecurse(t, ctx, g, repo, ud)
}
func TestRecurseAllBranchesGitSync(t *testing.T) {
unittest.LargeTest(t)
ctx, g, repo, ud, cleanup := setupGitsync(t)
defer cleanup()
repograph_shared_tests.TestRecurseAllBranches(t, ctx, g, repo, ud)
}
func TestUpdateHistoryChangedGitSync(t *testing.T) {
unittest.LargeTest(t)
ctx, g, repo, ud, cleanup := setupGitsync(t)
defer cleanup()
repograph_shared_tests.TestUpdateHistoryChanged(t, ctx, g, repo, ud)
}
func TestUpdateAndReturnCommitDiffsGitSync(t *testing.T) {
unittest.LargeTest(t)
ctx, g, repo, ud, cleanup := setupGitsync(t)
defer cleanup()
repograph_shared_tests.TestUpdateAndReturnCommitDiffs(t, ctx, g, repo, ud)
}
func TestRevListGitSync(t *testing.T) {
unittest.LargeTest(t)
ctx, g, repo, ud, cleanup := setupGitsync(t)
defer cleanup()
repograph_shared_tests.TestRevList(t, ctx, g, repo, ud)
}
func TestBranchMembershipGitSync(t *testing.T) {
unittest.LargeTest(t)
ctx, g, repo, ud, cleanup := setupGitsync(t)
defer cleanup()
repograph_shared_tests.TestBranchMembership(t, ctx, g, repo, ud)
}