blob: 262cdac5bcc21352014b4c8456ba83ddb987d357 [file] [log] [blame]
package ingestion
import (
"crypto/md5"
"fmt"
"io"
"path/filepath"
"sort"
"sync"
"testing"
"time"
assert "github.com/stretchr/testify/require"
"go.skia.org/infra/go/fileutil"
"go.skia.org/infra/go/sharedconfig"
"go.skia.org/infra/go/testutils"
"go.skia.org/infra/go/util"
"go.skia.org/infra/go/vcsinfo"
)
// Fixtures shared by the tests in this file.
const (
	// LOCAL_STATUS_DIR is the base path from which tests derive their
	// ingestion status directory (a test-specific suffix is appended).
	LOCAL_STATUS_DIR = "./ingestion_status"

	// RFLOCATION_CONTENT is the fixed payload returned by
	// mockRFLocation.Content.
	RFLOCATION_CONTENT = "result file content"
)
// TestPollingIngester runs the shared ingester scenario (testIngester) using
// a status directory reserved for the polling variant of the test.
func TestPollingIngester(t *testing.T) {
	testutils.MediumTest(t)

	pollingStatusDir := LOCAL_STATUS_DIR + "-polling"
	testIngester(t, pollingStatusDir)
}
// testIngester exercises an ingester end to end against mock collaborators.
//
// It builds a mock VCS with 100 commits spread over the last ten days, a mock
// source exposing one result file per commit, and a mock processor that
// records every file it is handed. The ingester is configured with
// NCommits = 50, and the test then verifies that:
//   - exactly 50 distinct result files were processed, each exactly once,
//   - those are the files belonging to the second half of the source data,
//   - every processed file exists under the ingester's result file directory.
//
// statusDir is the status directory handed to the ingester; it is removed
// when the test returns.
func testIngester(t *testing.T, statusDir string) {
	defer util.RemoveAll(statusDir)

	const (
		totalCommits = 100
		wantResults  = totalCommits / 2
		maxWait      = 2 * time.Second
		pollInterval = 100 * time.Millisecond
	)

	now := time.Now()
	beginningOfTime := now.Add(-time.Hour * 24 * 10).Unix()

	// Instantiate mock VCS and the source.
	vcs := getVCS(beginningOfTime, now.Unix(), totalCommits)
	hashes := vcs.From(time.Unix(0, 0))
	assert.Equal(t, totalCommits, len(hashes))
	for _, h := range hashes {
		assert.NotEqual(t, "", h)
	}
	sources := []Source{MockSource(t, vcs)}

	// Instantiate the mock processor. The recorded state is shared between
	// this goroutine and whatever goroutine(s) the ingester uses to invoke
	// processFn, so every access goes through mutex.
	var mutex sync.Mutex
	collected := map[string]int{}
	resultFiles := []ResultFileLocation{}
	processFn := func(result ResultFileLocation) error {
		mutex.Lock()
		defer mutex.Unlock()
		collected[result.Name()]++
		resultFiles = append(resultFiles, result)
		return nil
	}
	finishFn := func() error { return nil }
	processor := MockProcessor(processFn, finishFn)

	// Instantiate ingesterConf
	conf := &sharedconfig.IngesterConfig{
		RunEvery:   sharedconfig.ConfDuration{Duration: 1 * time.Second},
		NCommits:   wantResults,
		MinDays:    3,
		StatusDir:  statusDir,
		LocalCache: true,
	}

	// Instantiate ingester and start it.
	ingester, err := NewIngester("test-ingester", conf, vcs, sources, processor)
	assert.NoError(t, err)
	ingester.Start()

	// Wait until we have collected the desired result, but no more than
	// maxWait.
	startTime := time.Now()
	for {
		mutex.Lock()
		colen := len(collected)
		mutex.Unlock()
		if colen >= wantResults || time.Since(startTime) > maxWait {
			break
		}
		time.Sleep(pollInterval)
	}

	// The ingester is still running at this point, so processFn may be
	// called again at any time. Take a consistent snapshot under the lock
	// and run all assertions against the copies instead of reading the
	// shared map and slice unsynchronized.
	mutex.Lock()
	seen := make(map[string]int, len(collected))
	for name, count := range collected {
		seen[name] = count
	}
	processed := append([]ResultFileLocation(nil), resultFiles...)
	mutex.Unlock()

	assert.Equal(t, wantResults, len(seen))
	for _, count := range seen {
		assert.Equal(t, 1, count)
	}
	for _, result := range sources[0].(*mockSource).data[wantResults:] {
		_, ok := seen[result.Name()]
		assert.True(t, ok)
	}

	// Make sure that all the files were written to disk.
	ingester.syncFileWrite()
	for _, result := range processed {
		fPath := filepath.Join(ingester.resultFilesDir, result.Name())
		assert.True(t, fileutil.FileExists(fPath), fmt.Sprintf("File: %s does not exist", fPath))
	}
}
// mockProcessor is a test double for Processor: each method simply forwards
// to the function value supplied at construction time.
type mockProcessor struct {
	process func(ResultFileLocation) error // invoked by Process
	finish  func() error                   // invoked by BatchFinished
}

// MockProcessor returns a Processor whose Process method delegates to process
// and whose BatchFinished method delegates to finish.
func MockProcessor(process func(ResultFileLocation) error, finish func() error) Processor {
	p := new(mockProcessor)
	p.process = process
	p.finish = finish
	return p
}

// Process hands resultsFile to the configured process callback and returns
// whatever error the callback reports.
func (p *mockProcessor) Process(resultsFile ResultFileLocation) error {
	err := p.process(resultsFile)
	return err
}

// BatchFinished calls the configured finish callback and returns its error.
func (p *mockProcessor) BatchFinished() error {
	err := p.finish()
	return err
}
// mockRFLocation is an in-memory stand-in for a result file. It is handed out
// as a ResultFileLocation by rfLocation.
type mockRFLocation struct {
	path        string // value reported by Name
	md5         string // value reported by MD5
	lastUpdated int64  // value reported by TimeStamp
}

// Open is a stub: it returns a nil reader and a nil error.
func (loc *mockRFLocation) Open() (io.ReadCloser, error) {
	return nil, nil
}

// Name returns the stored path.
func (loc *mockRFLocation) Name() string {
	return loc.path
}

// MD5 returns the stored md5 string.
func (loc *mockRFLocation) MD5() string {
	return loc.md5
}

// TimeStamp returns the stored last-updated value.
func (loc *mockRFLocation) TimeStamp() int64 {
	return loc.lastUpdated
}

// Content returns the same fixed payload (RFLOCATION_CONTENT) for every
// location.
func (loc *mockRFLocation) Content() []byte {
	payload := []byte(RFLOCATION_CONTENT)
	return payload
}
// rfLocation builds a mock result file location for the file fname at time t.
//
// The location's name has the form
// "root/<year>/<month>/<day>/<hour>/<minute>/<fname>", its MD5 is the
// hex-encoded MD5 digest of that name, and its timestamp is t in Unix seconds.
func rfLocation(t time.Time, fname string) ResultFileLocation {
	loc := &mockRFLocation{lastUpdated: t.Unix()}
	loc.path = fmt.Sprintf(
		"root/%d/%d/%d/%d/%d/%s",
		t.Year(), t.Month(), t.Day(), t.Hour(), t.Minute(), fname,
	)
	digest := md5.Sum([]byte(loc.path))
	loc.md5 = fmt.Sprintf("%x", digest)
	return loc
}
// mockSource is a Source test double backed by a fixed, in-memory list of
// result file locations.
type mockSource struct {
	data []ResultFileLocation
}

// MockSource returns a Source holding one result file per commit known to
// vcs. Each file is named "result-file-<hash>" and is stamped with the
// timestamp of its commit; the files are kept in the order vcs.From returns
// the hashes. A failure to look up commit details fails the test t.
func MockSource(t *testing.T, vcs vcsinfo.VCS) Source {
	hashes := vcs.From(time.Unix(0, 0))
	files := make([]ResultFileLocation, len(hashes))
	for i, hash := range hashes {
		detail, err := vcs.Details(hash, true)
		assert.NoError(t, err)
		fname := fmt.Sprintf("result-file-%s", hash)
		files[i] = rfLocation(detail.Timestamp, fname)
	}
	return &mockSource{data: files}
}
// Poll returns the stored result files whose timestamps lie in the closed
// interval [startTime, endTime]. The first match is located with a binary
// search, so the data is expected to be ordered by ascending timestamp; the
// end of the range is then found by scanning forward. The returned slice
// shares its backing array with the source's data. The error is always nil.
func (m *mockSource) Poll(startTime, endTime int64) ([]ResultFileLocation, error) {
	total := len(m.data)
	first := sort.Search(total, func(i int) bool {
		return m.data[i].TimeStamp() >= startTime
	})

	last := first
	for last < total && m.data[last].TimeStamp() <= endTime {
		last++
	}
	return m.data[first:last], nil
}
// ID returns the fixed identifier "test-source" for this mock source.
//
// It uses a pointer receiver so that all methods of mockSource share the same
// receiver kind (Poll is declared on *mockSource as well); the mock is only
// ever handed out as a *mockSource.
func (m *mockSource) ID() string {
	return "test-source"
}
// getVCS returns a mock VCS containing nCommits synthetic commits.
//
// Commit i has hash "hash-<i>" and subject "Commit #<i>". Timestamps begin at
// start (Unix seconds) and advance by a constant step of
// (end - start - 3600) / nCommits seconds per commit, so for a non-negative
// step every commit lies at least 3600 seconds before end.
func getVCS(start, end int64, nCommits int) vcsinfo.VCS {
	commits := make([]*vcsinfo.LongCommit, 0, nCommits)
	step := (end - start - 3600) / int64(nCommits)
	for i := 0; i < nCommits; i++ {
		short := &vcsinfo.ShortCommit{
			Hash:    fmt.Sprintf("hash-%d", i),
			Subject: fmt.Sprintf("Commit #%d", i),
		}
		// Derive the timestamp from the index rather than keeping a running
		// total; both yield start + i*step.
		ts := start + int64(i)*step
		commits = append(commits, &vcsinfo.LongCommit{
			ShortCommit: short,
			Timestamp:   time.Unix(ts, 0),
		})
	}
	return MockVCS(commits)
}
// TestRflQueue checks that pushing two consecutive batches onto an rflQueue
// yields all pushed locations in order, and that clear leaves the queue empty.
func TestRflQueue(t *testing.T) {
	testutils.SmallTest(t)

	names := []string{"1", "2", "3", "4", "5"}
	locs := make([]ResultFileLocation, 0, len(names))
	for _, name := range names {
		locs = append(locs, rfLocation(time.Now(), name))
	}

	// Push the locations in two batches split at index 3.
	const split = 3
	queue := rflQueue([]ResultFileLocation{})
	queue.push(locs[:split])
	queue.push(locs[split:])

	queued := []ResultFileLocation(queue)
	assert.Equal(t, locs, queued)

	queue.clear()
	assert.Equal(t, 0, len(queue))
}