package ingestion
import (
"context"
"crypto/md5"
"fmt"
"io"
"regexp"
"sort"
"sync"
"testing"
"time"
"github.com/stretchr/testify/mock"
assert "github.com/stretchr/testify/require"
"go.skia.org/infra/go/config"
"go.skia.org/infra/go/eventbus"
mockeventbus "go.skia.org/infra/go/eventbus/mocks"
"go.skia.org/infra/go/ingestion/mocks"
"go.skia.org/infra/go/sharedconfig"
"go.skia.org/infra/go/testutils"
"go.skia.org/infra/go/testutils/unittest"
"go.skia.org/infra/go/vcsinfo"
mockvcs "go.skia.org/infra/go/vcsinfo/mocks"
)
const (
RFLOCATION_CONTENT = "result file content"
PROJECT_ID = "test-project-ingestion"
TEST_TOPIC = "test-topic-ingestion-testing"
TEST_SUBSCRIBER = "test-subscriber-ingestion-testing"
TEST_BUCKET_ID = "test-bucket"
)
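// TestPollingIngester runs the polling ingester against a mock VCS, source and
// processor and verifies that every collected result file is processed exactly
// once and that the files for the most recent NCommits commits are all picked up.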
func TestPollingIngester(t *testing.T) {
unittest.LargeTest(t)
eventBus := eventbus.New()
ctx := context.Background()
now := time.Now()
beginningOfTime := now.Add(-time.Hour * 24 * 10).Unix()
const totalCommits = 100
mis := &mocks.IngestionStore{}
defer mis.AssertExpectations(t)
mis.On("ContainsResultFileHash", mock.Anything, mock.Anything).Return(false, nil)
mis.On("SetResultFileHash", mock.Anything, mock.Anything).Return(nil)
// Instantiate mock VCS and the source.
vcs := getVCS(beginningOfTime, now.Unix(), totalCommits)
hashes := vcs.From(time.Unix(0, 0))
assert.Equal(t, totalCommits, len(hashes))
for _, h := range hashes {
assert.NotEqual(t, "", h)
}
sources := []Source{MockSource(t, TEST_BUCKET_ID, "root", vcs, eventBus)}
// Instantiate the mock processor.
collected := map[string]int{}
var mutex sync.Mutex
resultFiles := []ResultFileLocation{}
processFn := func(result ResultFileLocation) error {
mutex.Lock()
defer mutex.Unlock()
collected[result.Name()] += 1
resultFiles = append(resultFiles, result)
return nil
}
processor := MockProcessor(processFn)
// Instantiate the ingester config.
conf := &sharedconfig.IngesterConfig{
RunEvery: config.Duration{Duration: 5 * time.Second},
NCommits: totalCommits / 2,
MinDays: 3,
}
// Instantiate ingester and start it.
ingester, err := NewIngester("test-ingester", conf, vcs, sources, processor, mis, eventBus)
assert.NoError(t, err)
assert.NoError(t, ingester.Start(ctx))
// Clean up the ingester at the end.
defer testutils.AssertCloses(t, ingester)
assert.NoError(t, testutils.EventuallyConsistent(5*time.Second, func() error {
mutex.Lock()
colen := len(collected)
mutex.Unlock()
if colen >= (totalCommits / 2) {
return nil
}
return testutils.TryAgainErr
}))
for _, count := range collected {
assert.Equal(t, 1, count)
}
for _, result := range sources[0].(*mockSource).data[totalCommits/2:] {
_, ok := collected[result.Name()]
assert.True(t, ok)
}
}
// TestGetStartTimeOfInterestDays checks that we compute the time to start
// polling for commits properly in the case that the number of commits
// returned in the last 3 days exceeds the NCommits we want to scan.
func TestGetStartTimeOfInterestDays(t *testing.T) {
unittest.SmallTest(t)
// We have to provide NewIngester with a non-nil eventbus and ingestionstore.
meb := &mockeventbus.EventBus{}
mis := &mocks.IngestionStore{}
mvs := &mockvcs.VCS{}
defer meb.AssertExpectations(t)
defer mis.AssertExpectations(t)
defer mvs.AssertExpectations(t)
// arbitrary date
now := time.Date(2019, 8, 5, 11, 20, 0, 0, time.UTC)
threeDaysAgo := now.Add(-3 * 24 * time.Hour)
alphaTime := time.Date(2019, 8, 2, 17, 35, 0, 0, time.UTC)
hashes := []string{"alpha", "beta", "gamma", "delta", "epsilon"}
mvs.On("Update", anyCtx, true, false).Return(nil)
mvs.On("From", threeDaysAgo).Return(hashes)
mvs.On("Details", anyCtx, "alpha", false).Return(&vcsinfo.LongCommit{
// The function only cares about the timestamp
Timestamp: alphaTime,
}, nil)
conf := &sharedconfig.IngesterConfig{
NCommits: 2,
MinDays: 3,
}
i, err := NewIngester("test-ingester-1", conf, mvs, nil, nil, mis, meb)
assert.NoError(t, err)
defer testutils.AssertCloses(t, i)
ts, err := i.getStartTimeOfInterest(context.Background(), now)
assert.NoError(t, err)
assert.Equal(t, alphaTime.Unix(), ts)
}
// TestGetStartTimeOfInterestCommits checks that we compute the time to start
// polling for commits properly in the case that the number of commits
// returned in the last 3 days does not exceed the NCommits we want to scan.
func TestGetStartTimeOfInterestCommits(t *testing.T) {
unittest.SmallTest(t)
// We have to provide NewIngester with a non-nil eventbus and ingestionstore.
meb := &mockeventbus.EventBus{}
mis := &mocks.IngestionStore{}
mvs := &mockvcs.VCS{}
defer meb.AssertExpectations(t)
defer mis.AssertExpectations(t)
defer mvs.AssertExpectations(t)
// arbitrary date
now := time.Date(2019, 8, 5, 11, 20, 0, 0, time.UTC)
threeDaysAgo := now.Add(-3 * 24 * time.Hour)
sixDaysAgo := now.Add(-6 * 24 * time.Hour)
betaTime := time.Date(2019, 8, 1, 17, 35, 0, 0, time.UTC)
hashes := []string{"alpha", "beta", "gamma", "delta", "epsilon"}
mvs.On("Update", anyCtx, true, false).Return(nil)
mvs.On("From", threeDaysAgo).Return(hashes[3:])
mvs.On("From", sixDaysAgo).Return(hashes)
// Since querying further back returns 5 commits (more than NCommits), the
// algorithm trims the result to the most recent NCommits, so "beta" is the
// oldest commit of interest.
mvs.On("Details", anyCtx, "beta", false).Return(&vcsinfo.LongCommit{
// The function only cares about the timestamp
Timestamp: betaTime,
}, nil)
conf := &sharedconfig.IngesterConfig{
NCommits: 4,
MinDays: 3,
}
i, err := NewIngester("test-ingester-2", conf, mvs, nil, nil, mis, meb)
assert.NoError(t, err)
defer testutils.AssertCloses(t, i)
ts, err := i.getStartTimeOfInterest(context.Background(), now)
assert.NoError(t, err)
assert.Equal(t, betaTime.Unix(), ts)
}
// TestGetStartTimeOfInterestNotEnough makes sure we don't loop infinitely
// if there are not enough commits in the repo to satisfy NCommits.
func TestGetStartTimeOfInterestNotEnough(t *testing.T) {
unittest.SmallTest(t)
// We have to provide NewIngester with a non-nil eventbus and ingestionstore.
meb := &mockeventbus.EventBus{}
mis := &mocks.IngestionStore{}
mvs := &mockvcs.VCS{}
defer meb.AssertExpectations(t)
defer mis.AssertExpectations(t)
defer mvs.AssertExpectations(t)
// arbitrary date
now := time.Date(2019, 8, 5, 11, 20, 0, 0, time.UTC)
alphaTime := time.Date(2019, 8, 2, 17, 35, 0, 0, time.UTC)
hashes := []string{"alpha", "beta", "gamma", "delta", "epsilon"}
mvs.On("Update", anyCtx, true, false).Return(nil)
mvs.On("From", mock.AnythingOfType("time.Time")).Return(hashes)
mvs.On("Details", anyCtx, "alpha", false).Return(&vcsinfo.LongCommit{
// The function only cares about the timestamp
Timestamp: alphaTime,
}, nil)
conf := &sharedconfig.IngesterConfig{
NCommits: 100,
MinDays: 3,
}
i, err := NewIngester("test-ingester-3", conf, mvs, nil, nil, mis, meb)
assert.NoError(t, err)
defer testutils.AssertCloses(t, i)
ts, err := i.getStartTimeOfInterest(context.Background(), now)
assert.NoError(t, err)
assert.Equal(t, alphaTime.Unix(), ts)
}
// TODO(kjlubick): replace these with mockery-based mocks
// mock processor
type mockProcessor struct {
process func(ResultFileLocation) error
}
func MockProcessor(process func(ResultFileLocation) error) Processor {
return &mockProcessor{
process: process,
}
}
func (m *mockProcessor) Process(ctx context.Context, resultsFile ResultFileLocation) error {
return m.process(resultsFile)
}
type mockRFLocation struct {
path string
bucketID string
objectID string
md5 string
lastUpdated int64
}
func (m *mockRFLocation) Open() (io.ReadCloser, error) { return nil, nil }
func (m *mockRFLocation) Name() string { return m.path }
func (m *mockRFLocation) StorageIDs() (string, string) { return m.bucketID, m.objectID }
func (m *mockRFLocation) MD5() string { return m.md5 }
func (m *mockRFLocation) TimeStamp() int64 { return m.lastUpdated }
func (m *mockRFLocation) Content() []byte { return []byte(RFLOCATION_CONTENT) }
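// rfLocation returns a mock ResultFileLocation whose name is "bucketID/objectID"
// and whose MD5 is derived from that path.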
func rfLocation(timeStamp int64, bucketID, objectID string) ResultFileLocation {
path := bucketID + "/" + objectID
return &mockRFLocation{
bucketID: bucketID,
objectID: objectID,
path: path,
md5: fmt.Sprintf("%x", md5.Sum([]byte(path))),
lastUpdated: timeStamp,
}
}
// mock source
type mockSource struct {
data []ResultFileLocation
eventBus eventbus.EventBus
bucketID string
objectPrefix string
regExp *regexp.Regexp
}
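// MockSource returns a Source containing one result file per commit in vcs,
// named after the commit hash and timestamped with the commit time.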
func MockSource(t *testing.T, bucketID string, objectPrefix string, vcs vcsinfo.VCS, eventBus eventbus.EventBus) Source {
hashes := vcs.From(time.Unix(0, 0))
ret := make([]ResultFileLocation, 0, len(hashes))
for _, h := range hashes {
detail, err := vcs.Details(context.Background(), h, false)
assert.NoError(t, err)
ts := detail.Timestamp
objPrefix := fmt.Sprintf("%s/%d/%d/%d/%d/%d", objectPrefix, ts.Year(), ts.Month(), ts.Day(), ts.Hour(), ts.Minute())
objectID := fmt.Sprintf("%s/result-file-%s", objPrefix, h)
ret = append(ret, rfLocation(detail.Timestamp.Unix(), bucketID, objectID))
}
return &mockSource{
data: ret,
bucketID: bucketID,
objectPrefix: objectPrefix,
eventBus: eventBus,
}
}
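// Poll returns a channel that emits the result files whose timestamps fall
// within [startTime, endTime].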
func (m *mockSource) Poll(startTime, endTime int64) <-chan ResultFileLocation {
ch := make(chan ResultFileLocation)
go func() {
startIdx := sort.Search(len(m.data), func(i int) bool { return m.data[i].TimeStamp() >= startTime })
endIdx := startIdx
for ; (endIdx < len(m.data)) && (m.data[endIdx].TimeStamp() <= endTime); endIdx++ {
}
for _, entry := range m.data[startIdx:endIdx] {
ch <- entry
}
close(ch)
}()
return ch
}
func (m *mockSource) ID() string {
return "test-source"
}
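// SetEventChannel subscribes to storage events for the source's bucket and
// prefix and forwards each event as a ResultFileLocation on resultCh.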
func (m *mockSource) SetEventChannel(resultCh chan<- ResultFileLocation) error {
eventType, err := m.eventBus.RegisterStorageEvents(m.bucketID, m.objectPrefix, m.regExp, nil)
if err != nil {
return err
}
m.eventBus.SubscribeAsync(eventType, func(evData interface{}) {
file := evData.(*eventbus.StorageEvent)
resultCh <- rfLocation(file.TimeStamp, file.BucketID, file.ObjectID)
})
return nil
}
// getVCS returns a mock VCS with nCommits commits whose timestamps are spread
// evenly between start and end.
func getVCS(start, end int64, nCommits int) vcsinfo.VCS {
commits := make([]*vcsinfo.LongCommit, 0, nCommits)
inc := (end - start - 3600) / int64(nCommits)
t := start
for i := 0; i < nCommits; i++ {
commits = append(commits, &vcsinfo.LongCommit{
ShortCommit: &vcsinfo.ShortCommit{
Hash: fmt.Sprintf("hash-%d", i),
Subject: fmt.Sprintf("Commit #%d", i),
},
Timestamp: time.Unix(t, 0),
})
t += inc
}
return mockvcs.DeprecatedMockVCS(commits, nil, nil)
}
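// anyCtx matches the context.Background() values passed to the mocked VCS calls.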
var anyCtx = mock.AnythingOfType("*context.emptyCtx")