blob: 1cf9d8c00036d7cf6f1d113d77e6eda870088490 [file] [log] [blame]
package ds_expstore
import (
assert ""
ds_testutil ""
// testKinds holds the datastore kinds used by the tests in this file: initDS
// always initializes them, and testCloudExpstoreClear checks that no entities
// of any of these kinds remain after a store has been cleared.
var testKinds = []ds.Kind{
// TestMasterCloudExpectationsStore runs the shared expectation-store checks
// against the first store returned by New, followed by the Clear check.
func TestMasterCloudExpectationsStore(t *testing.T) {
// Initialize the test datastore; the returned function is deferred so it runs
// when the test returns.
cleanup := initDS(t)
defer cleanup()
// Test the DS backed store for master.
masterEventBus := eventbus.New()
// The second value returned by New is not needed here; it is used as a
// per-issue store factory in TestIssueCloudExpectationsStore.
cloudStore, _, err := New(ds.DS, masterEventBus)
assert.NoError(t, err)
// An issue ID of 0 means the event callback in testExpectationStore does not
// filter events by issue (its filter only applies when issueID > 0).
testExpectationStore(t, cloudStore, masterEventBus, 0, expstorage.EV_EXPSTORAGE_CHANGED)
testCloudExpstoreClear(t, cloudStore)
// testCloudExpstoreClear calls Clear on the given store and then waits until
// the datastore reports zero entities for every kind in testKinds.
func testCloudExpstoreClear(t *testing.T, cloudStore expstorage.ExpectationsStore) {
// Make sure the clear works.
ctx := context.Background()
assert.NoError(t, cloudStore.Clear(ctx))
// Re-run the count check for up to five seconds. Returning TryAgainErr signals
// that entities are still present; the exact retry behavior is that of
// testutils.EventuallyConsistent, which is defined elsewhere.
assert.NoError(t, testutils.EventuallyConsistent(5*time.Second, func() error {
for _, kind := range testKinds {
// Keys-only query: only the number of entities matters, not their contents.
count, err := ds.DS.Count(ctx, ds.NewQuery(kind).KeysOnly())
assert.NoError(t, err)
if count > 0 {
return testutils.TryAgainErr
return nil
// TestIssueCloudExpectationsStore runs the shared expectation-store checks
// against a store created for a single issue, followed by the Clear check.
func TestIssueCloudExpectationsStore(t *testing.T) {
// Initialize the test datastore; the returned function is deferred so it runs
// when the test returns.
cleanup := initDS(t)
defer cleanup()
// Test the expectation store for an individual issue.
masterEventBus := eventbus.New()
// Only the second value returned by New is used here: a factory that is called
// with an issue ID to obtain the store under test.
_, issueStoreFactory, err := New(ds.DS, masterEventBus)
assert.NoError(t, err)
issueID := int64(1234567)
issueStore := issueStoreFactory(issueID)
// Passing a positive issue ID activates the per-issue event filter in
// testExpectationStore.
testExpectationStore(t, issueStore, masterEventBus, issueID, expstorage.EV_TRYJOB_EXP_CHANGED)
testCloudExpstoreClear(t, issueStore)
// initDS initializes the datastore for testing. The kinds in testKinds are
// always included; any additional kinds passed by the caller are appended
// after them. The result is the function returned by
// ds_testutil.InitDatastore, which the tests in this file defer.
func initDS(t *testing.T, kinds ...ds.Kind) func() {
// Build a fresh slice so that appending never writes into testKinds itself.
initKinds := []ds.Kind{}
initKinds = append(initKinds, testKinds...)
initKinds = append(initKinds, kinds...)
return ds_testutil.InitDatastore(t, initKinds...)
// testExpectationStore tests against the expectation store interface. It adds
// three changes (two with content, one empty), checks the events delivered on
// eventBus when one is supplied, verifies the triage log after each step, and
// finally undoes changes and checks the resulting expectations and log.
//
// eventType is the topic the event callback subscribes to. When issueID > 0
// the callback singles out events whose IssueID differs from issueID
// (presumably to skip them - the branch body is not visible in this view).
func testExpectationStore(t *testing.T, store expstorage.ExpectationsStore, eventBus eventbus.EventBus, issueID int64, eventType string) {
// Get the initial log size. This is necessary because we
// call this function multiple times with the same underlying
// ExpectationStore.
ctx := context.Background()
initialLogRecs, initialLogTotal, err := store.QueryLog(ctx, 0, 100, true)
assert.NoError(t, err)
initialLogRecsLen := len(initialLogRecs)
// Request expectations and make sure they are empty.
emptyExp, err := store.Get()
assert.NoError(t, err)
assert.Empty(t, emptyExp)
// If we have an event bus then keep gathering events.
// The channel is buffered (capacity 3) so the callback can hand over the
// names from up to three events before anything is read from it.
callbackCh := make(chan types.TestNameSlice, 3)
if eventBus != nil {
eventBus.SubscribeAsync(eventType, func(e interface{}) {
// Unchecked type assertion: an event of any other type on this topic panics.
evData := e.(*expstorage.EventExpectationChange)
if (issueID > 0) && (evData.IssueID != issueID) {
// Collect the names of all tests touched by this change event.
// NOTE(review): if TestChanges is a map, its iteration order is unspecified,
// yet the assertions below compare against {TEST_1, TEST_2} in that order -
// confirm the names are sorted before being sent.
testNames := make(types.TestNameSlice, 0, len(evData.TestChanges))
for testName := range evData.TestChanges {
testNames = append(testNames, testName)
callbackCh <- testNames
// Test names used throughout the rest of this function.
TEST_1, TEST_2 := types.TestName("test1"), types.TestName("test2")
// digests
DIGEST_11, DIGEST_12 := types.Digest("d11"), types.Digest("d12")
DIGEST_21, DIGEST_22 := types.Digest("d21"), types.Digest("d22")
// First change. logEntry_1 lists the triage details the log is expected to
// hold for it: two digests per test, one positive and one negative.
expChange_1 := types.Expectations{
TEST_1: {
TEST_2: {
logEntry_1 := []*expstorage.TriageDetail{
{TestName: TEST_1, Digest: DIGEST_11, Label: "positive"},
{TestName: TEST_1, Digest: DIGEST_12, Label: "negative"},
{TestName: TEST_2, Digest: DIGEST_21, Label: "positive"},
{TestName: TEST_2, Digest: DIGEST_22, Label: "negative"},
assert.NoError(t, store.AddChange(ctx, expChange_1, "user-0"))
// Exactly one event is expected for the change, naming both tests.
if eventBus != nil {
found := waitForChanLen(t, callbackCh, 1)
assert.Equal(t, types.TestNameSlice{TEST_1, TEST_2}, found[0])
// TODO(kjlubick): assert something with foundExps
foundExps, err := store.Get()
assert.NoError(t, err)
assert.Equal(t, expChange_1, foundExps)
checkLogEntry(t, store, expChange_1)
// Update digests.
// logEntry_2 lists the expected log details for the second change: one digest
// per test is relabeled.
expChange_2 := types.Expectations{
TEST_1: {
TEST_2: {
logEntry_2 := []*expstorage.TriageDetail{
{TestName: TEST_1, Digest: DIGEST_11, Label: "negative"},
{TestName: TEST_2, Digest: DIGEST_22, Label: "untriaged"},
assert.NoError(t, store.AddChange(ctx, expChange_2, "user-1"))
if eventBus != nil {
found := waitForChanLen(t, callbackCh, 1)
assert.Equal(t, types.TestNameSlice{TEST_1, TEST_2}, found[0])
// The stored expectations must now carry the updated labels.
foundTestExp, err := store.Get()
assert.NoError(t, err)
assert.Equal(t, types.NEGATIVE, foundTestExp[TEST_1][DIGEST_11])
assert.Equal(t, types.UNTRIAGED, foundTestExp[TEST_2][DIGEST_22])
checkLogEntry(t, store, expChange_2)
// Send empty changes to test the event bus.
emptyChanges := types.Expectations{}
assert.NoError(t, store.AddChange(ctx, emptyChanges, "user-2"))
// An event is still expected for the empty change, carrying no test names.
if eventBus != nil {
found := waitForChanLen(t, callbackCh, 1)
assert.Empty(t, found[0])
checkLogEntry(t, store, emptyChanges)
// The result of this Get is only checked for an error.
foundExps, err = store.Get()
assert.NoError(t, err)
// Make sure we added the correct number of triage log entries.
// Three AddChange calls were made above, so three new records are expected.
addedRecs := 3
logEntries, total, err := store.QueryLog(ctx, 0, 5, true)
assert.NoError(t, err)
assert.Equal(t, addedRecs+initialLogTotal, total)
// At most 5 records were requested, so the returned count is capped at 5.
assert.Equal(t, util.MinInt(addedRecs+initialLogRecsLen, 5), len(logEntries))
// The log is read newest-first here: index 0 is the empty change, index 1 the
// second change and index 2 the first change.
lastRec := logEntries[0]
secondToLastRec := logEntries[1]
assert.Equal(t, 0, len(logEntries[0].Details))
assert.Equal(t, logEntry_2, logEntries[1].Details)
assert.Equal(t, logEntry_1, logEntries[2].Details)
// Querying with a second argument of 100 is expected to return no records
// while still reporting the same total.
logEntries, total, err = store.QueryLog(ctx, 100, 5, true)
assert.NoError(t, err)
assert.Equal(t, addedRecs+initialLogTotal, total)
assert.Equal(t, 0, len(logEntries))
// Undo the latest version and make sure the corresponding record is correct.
changes, err := store.UndoChange(ctx, parseID(t, lastRec.ID), "user-1")
assert.NoError(t, err)
checkLogEntry(t, store, changes)
// Then undo the second change as well.
changes, err = store.UndoChange(ctx, parseID(t, secondToLastRec.ID), "user-1")
assert.NoError(t, err)
checkLogEntry(t, store, changes)
// Each undo is expected to add its own record to the log.
addedRecs += 2
logEntries, total, err = store.QueryLog(ctx, 0, 2, true)
assert.NoError(t, err)
assert.Equal(t, addedRecs+initialLogTotal, total)
// Newest first: index 0 is the undo of the second change (two details),
// index 1 is the undo of the empty change (no details).
assert.Equal(t, 0, len(logEntries[1].Details))
assert.Equal(t, 2, len(logEntries[0].Details))
// After undoing the second change, every digest it touched must be present
// again with the label it had in the first change.
foundTestExp, err = store.Get()
assert.NoError(t, err)
for testName, digests := range expChange_2 {
for d := range digests {
_, ok := foundTestExp[testName][d]
assert.True(t, ok)
assert.Equal(t, expChange_1[testName][d].String(), foundTestExp[testName][d].String())
// Make sure undoing the previous undo causes an error.
// Only the record ID is needed, so the last argument to QueryLog is false
// (presumably it controls whether details are loaded - TODO confirm).
logEntries, _, err = store.QueryLog(ctx, 0, 1, false)
assert.NoError(t, err)
assert.Equal(t, 1, len(logEntries))
_, err = store.UndoChange(ctx, parseID(t, logEntries[0].ID), "user-1")
assert.NotNil(t, err)
// waitForChanLen removes 'targetLen' elements from the channel and returns them.
// If the given number of items are not returned within one second the test fails.
func waitForChanLen(t *testing.T, ch chan types.TestNameSlice, targetLen int) []types.TestNameSlice {
ret := make([]types.TestNameSlice, 0, targetLen)
// Each invocation of the callback receives one element and returns TryAgainErr
// until 'targetLen' elements have been collected in ret.
assert.NoError(t, testutils.EventuallyConsistent(time.Second, func() error {
// NOTE(review): no default case is visible in this select, so the receive
// blocks until an element arrives - confirm the one-second limit can still
// take effect when the channel stays empty.
select {
case ele := <-ch:
ret = append(ret, ele)
if len(ret) != targetLen {
return testutils.TryAgainErr
return nil
return ret
// parseID converts idStr, a base-10 string such as the ID field of a log
// record, to an int64. A parse failure is reported through assert.NoError.
func parseID(t *testing.T, idStr string) int64 {
ret, err := strconv.ParseInt(idStr, 10, 64)
assert.NoError(t, err)
return ret
// checkLogEntry fetches a single record from the store's log and asserts that
// its details correspond one-to-one with 'changes': the same number of
// (test, digest) pairs, every logged pair present in 'changes', and every
// logged label equal to the string form of the expected value.
func checkLogEntry(t *testing.T, store expstorage.ExpectationsStore, changes types.Expectations) {
// Ask for one record starting at position 0; callers invoke this right after
// a change, so this is presumably the most recent record - TODO confirm the
// ordering guaranteed by QueryLog.
logEntries, _, err := store.QueryLog(context.Background(), 0, 1, true)
assert.NoError(t, err)
assert.Equal(t, 1, len(logEntries))
// Count the total number of digests across all tests in 'changes'.
counter := 0
for _, digests := range changes {
counter += len(digests)
assert.Equal(t, counter, len(logEntries[0].Details))
// Equal counts plus membership of every logged detail establishes the
// correspondence, provided the log holds no duplicate (test, digest) pairs.
for _, d := range logEntries[0].Details {
_, ok := changes[d.TestName][d.Digest]
assert.True(t, ok)
assert.Equal(t, changes[d.TestName][d.Digest].String(), d.Label)