[gold] Make indexer use timestamp on tryjob results.
Bug: skia:10399
Change-Id: I89f87ce19002e75fee80d6034feef736c3152423
Reviewed-on: https://skia-review.googlesource.com/c/buildbot/+/297600
Reviewed-by: Leandro Lovisolo <lovisolo@google.com>
diff --git a/golden/go/indexer/indexer.go b/golden/go/indexer/indexer.go
index 653f2c0..4c921a8 100644
--- a/golden/go/indexer/indexer.go
+++ b/golden/go/indexer/indexer.go
@@ -749,7 +749,7 @@
}
// An arbitrary cut off to the amount of recent, open CLs we try to index.
- recent := time.Now().Add(-maxAgeOfOpenCLsToIndex)
+ recent := now.Add(-maxAgeOfOpenCLsToIndex)
xcl, _, err := ix.CLStore.GetChangeLists(ctx, clstore.SearchOptions{
StartIdx: 0,
Limit: maxCLsToIndex,
@@ -803,11 +803,19 @@
CRS: crs,
PS: latestPS.SystemID,
}
- xtjr, err := ix.TryJobStore.GetResults(ctx, psID)
+ afterTime := time.Time{}
+ var existingUntriagedResults []tjstore.TryJobResult
+ // Test to see if we can do an incremental index (just for results that were uploaded
+ // for this patchset since the last time we indexed).
+ if ok && clIdx.LatestPatchSet.PS == latestPS.SystemID {
+ afterTime = clIdx.ComputedTS
+ existingUntriagedResults = clIdx.UntriagedResults
+ }
+ xtjr, err := ix.TryJobStore.GetResults(ctx, psID, afterTime)
if err != nil {
return skerr.Wrap(err)
}
- untriagedResults, params := indexTryJobResults(xtjr, exps)
+ untriagedResults, params := indexTryJobResults(existingUntriagedResults, xtjr, exps)
// Copy the existing ParamSet into the newly created one. It is important to copy it from
// old into new (and not new into old), so we don't cause a race condition on the cached
// ParamSet by writing to it while GetIndexForCL is reading from it.
@@ -829,19 +837,40 @@
// indexTryJobResults goes through all the TryJobResults and returns results useful for indexing.
// Concretely, these results are a slice with just the untriaged results and a ParamSet with the
// observed params.
-func indexTryJobResults(xtjr []tjstore.TryJobResult, exps expectations.Classifier) ([]tjstore.TryJobResult, paramtools.ParamSet) {
- var untriagedResults []tjstore.TryJobResult
+func indexTryJobResults(existing, newResults []tjstore.TryJobResult, exps expectations.Classifier) ([]tjstore.TryJobResult, paramtools.ParamSet) {
params := paramtools.ParamSet{}
- for _, tjr := range xtjr {
+ var newlyUntriagedResults []tjstore.TryJobResult
+ for _, tjr := range newResults {
params.AddParams(tjr.GroupParams)
params.AddParams(tjr.ResultParams)
params.AddParams(tjr.Options)
tn := types.TestName(tjr.ResultParams[types.PrimaryKeyField])
if exps.Classification(tn, tjr.Digest) == expectations.Untriaged {
- untriagedResults = append(untriagedResults, tjr)
+ // If the same digest somehow shows up twice (maybe because of how we
+ alreadyInList := false
+ for _, existingResult := range existing {
+ if existingResult.Digest == tjr.Digest && existingResult.ResultParams[types.PrimaryKeyField] == tjr.ResultParams[types.PrimaryKeyField] {
+ alreadyInList = true
+ break
+ }
+ }
+ if !alreadyInList {
+ newlyUntriagedResults = append(newlyUntriagedResults, tjr)
+ }
}
}
- return untriagedResults, params
+ if len(newlyUntriagedResults) == 0 {
+ return existing, params
+ }
+
+ if len(existing) == 0 {
+ return newlyUntriagedResults, params
+ }
+ // make a copy of the slice, so as not to confuse the existing index.
+ combined := make([]tjstore.TryJobResult, 0, len(existing)+len(newlyUntriagedResults))
+ combined = append(combined, existing...)
+ combined = append(combined, newlyUntriagedResults...)
+ return combined, params
}
// getCLIndex is a helper that returns the appropriately typed element from changeListIndices.
diff --git a/golden/go/indexer/indexer_test.go b/golden/go/indexer/indexer_test.go
index 9cf29b2..598d283 100644
--- a/golden/go/indexer/indexer_test.go
+++ b/golden/go/indexer/indexer_test.go
@@ -272,7 +272,7 @@
"day_of_week": "wednesday",
}
- mts.On("GetResults", testutils.AnyContext, firstCombinedID).Return([]tjstore.TryJobResult{
+ mts.On("GetResults", testutils.AnyContext, firstCombinedID, time.Time{}).Return([]tjstore.TryJobResult{
{
ResultParams: paramtools.Params{types.PrimaryKeyField: string(data.AlphaTest)},
GroupParams: androidGroup,
@@ -293,7 +293,7 @@
Digest: data.AlphaUntriagedDigest,
},
}, nil)
- mts.On("GetResults", testutils.AnyContext, secondCombinedID).Return([]tjstore.TryJobResult{
+ mts.On("GetResults", testutils.AnyContext, secondCombinedID, time.Time{}).Return([]tjstore.TryJobResult{
{
ResultParams: paramtools.Params{types.PrimaryKeyField: string(data.AlphaTest)},
GroupParams: androidGroup,
@@ -361,7 +361,7 @@
assert.Equal(t, int64(2), ixr.changeListsReindexed.Get())
}
-func TestIndexer_CalcChangeListIndices_HasPreviousIndex_Success(t *testing.T) {
+func TestIndexer_CalcChangeListIndices_HasIndexForPreviousPS_Success(t *testing.T) {
unittest.SmallTest(t)
const crs = "gerrit"
@@ -406,7 +406,7 @@
"model": "crosshatch",
}
- mts.On("GetResults", testutils.AnyContext, secondPatchSetCombinedID).Return([]tjstore.TryJobResult{
+ mts.On("GetResults", testutils.AnyContext, secondPatchSetCombinedID, time.Time{}).Return([]tjstore.TryJobResult{
{
ResultParams: paramtools.Params{types.PrimaryKeyField: string(data.AlphaTest)},
GroupParams: androidGroup,
@@ -485,6 +485,121 @@
assert.Equal(t, int64(1), ixr.changeListsReindexed.Get())
}
+func TestIndexer_CalcChangeListIndices_HasIndexForCurrentPS_IncrementalUpdateSuccess(t *testing.T) {
+ unittest.SmallTest(t)
+
+ const crs = "gerrit"
+ const clID = "111111"
+ const firstPatchSet = "firstPS"
+ const firstUntriagedDigest = types.Digest("11111111111111111111")
+ const secondUntriagedDigest = types.Digest("22222222222222222222")
+ const thirdUntriagedDigest = types.Digest("33333333333333333333")
+
+ firstPatchSetCombinedID := tjstore.CombinedPSID{CL: clID, CRS: crs, PS: firstPatchSet}
+
+ longAgo := time.Date(2020, time.April, 15, 15, 15, 0, 0, time.UTC)
+ recently := time.Date(2020, time.May, 5, 12, 12, 0, 0, time.UTC)
+
+ mcs := &mock_clstore.Store{}
+ mes := &mock_expectations.Store{}
+ mts := &mock_tjstore.Store{}
+
+ // All digests are untriaged
+ masterExp := expectations.Expectations{}
+
+ // The CL has no additional expectations.
+ mes.On("Get", testutils.AnyContext).Return(&masterExp, nil)
+ loadChangeListExpectations(mes, crs, map[string]*expectations.Expectations{
+ clID: {},
+ })
+
+ mcs.On("GetChangeLists", testutils.AnyContext, mock.Anything).Return([]code_review.ChangeList{
+ {
+ SystemID: clID,
+ Updated: recently,
+ },
+ }, 0, nil)
+
+ mcs.On("GetPatchSets", testutils.AnyContext, clID).Return([]code_review.PatchSet{
+ {SystemID: firstPatchSet}, // all other fields ignored from patch set.
+ }, nil)
+ mcs.On("System").Return(crs)
+
+ androidGroup := paramtools.Params{
+ "os": "Android",
+ "model": "crosshatch",
+ }
+
+ // Note that this time is based on the previous indexed time.
+ mts.On("GetResults", testutils.AnyContext, firstPatchSetCombinedID, longAgo).Return([]tjstore.TryJobResult{
+ {
+ ResultParams: paramtools.Params{types.PrimaryKeyField: string(data.AlphaTest)},
+ GroupParams: androidGroup,
+ Digest: secondUntriagedDigest,
+ },
+ {
+ ResultParams: paramtools.Params{types.PrimaryKeyField: string(data.AlphaTest)},
+ GroupParams: androidGroup,
+ Digest: thirdUntriagedDigest,
+ },
+ }, nil)
+
+ ctx := context.Background()
+ ic := IndexerConfig{
+ CLStore: mcs,
+ ExpectationsStore: mes,
+ TryJobStore: mts,
+ }
+ ixr, err := New(ctx, ic, 0)
+ require.NoError(t, err)
+ ixr.changeListsReindexed.Reset()
+
+ // The scenario here is that the first index for this patchset identified two untriaged digests.
+ // Later, .
+ previousIdx := ChangeListIndex{
+ LatestPatchSet: firstPatchSetCombinedID,
+ UntriagedResults: []tjstore.TryJobResult{
+ {
+ ResultParams: paramtools.Params{types.PrimaryKeyField: string(data.AlphaTest)},
+ Digest: firstUntriagedDigest,
+ // Other fields ignored
+ },
+ {
+ ResultParams: paramtools.Params{types.PrimaryKeyField: string(data.AlphaTest)},
+ Digest: secondUntriagedDigest,
+ },
+ },
+ ParamSet: paramtools.ParamSet{
+ // This ParamSet is purposely incomplete (i.e. no data.AlphaTest) to make sure new data is
+ // merged in correctly.
+ types.PrimaryKeyField: []string{"this_test_was_here_before"},
+ "os": []string{"Android", "iOS"},
+ "model": []string{"bluefish", "redfish"},
+ },
+ ComputedTS: longAgo,
+ }
+ ixr.changeListIndices.Set("gerrit_111111", &previousIdx, 0)
+
+ ixr.calcChangeListIndices(ctx)
+
+ clIdx := ixr.GetIndexForCL(crs, clID)
+ assert.NotNil(t, clIdx)
+ assert.Equal(t, firstPatchSetCombinedID, clIdx.LatestPatchSet)
+ assert.True(t, clIdx.ComputedTS.After(longAgo)) // should be updated
+ assert.Len(t, clIdx.UntriagedResults, 3)
+ assert.Equal(t, firstUntriagedDigest, clIdx.UntriagedResults[0].Digest)
+ assert.Equal(t, secondUntriagedDigest, clIdx.UntriagedResults[1].Digest)
+ assert.Equal(t, thirdUntriagedDigest, clIdx.UntriagedResults[2].Digest)
+ require.NotNil(t, clIdx.ParamSet)
+ clIdx.ParamSet.Normalize()
+ assert.Equal(t, paramtools.ParamSet{
+ "model": []string{"bluefish", "crosshatch", "redfish"},
+ "name": []string{"test_alpha", "this_test_was_here_before"},
+ "os": []string{"Android", "iOS"},
+ }, clIdx.ParamSet)
+ assert.Equal(t, int64(1), ixr.changeListsReindexed.Get())
+}
+
func TestIndexer_CalcChangeListIndices_PreviousIndexDoesNotNeedUpdating_Success(t *testing.T) {
unittest.SmallTest(t)
diff --git a/golden/go/search/search.go b/golden/go/search/search.go
index 4b4e278..f219b16 100644
--- a/golden/go/search/search.go
+++ b/golden/go/search/search.go
@@ -589,7 +589,7 @@
if xtr, ok := s.storeCache.Get(key); ok {
return xtr.([]tjstore.TryJobResult), nil
}
- xtr, err := s.tryJobStore.GetResults(ctx, id)
+ xtr, err := s.tryJobStore.GetResults(ctx, id, time.Time{})
if err != nil {
return nil, skerr.Wrap(err)
}
diff --git a/golden/go/search/search_test.go b/golden/go/search/search_test.go
index 17c6228..cfe5a6f 100644
--- a/golden/go/search/search_test.go
+++ b/golden/go/search/search_test.go
@@ -758,7 +758,7 @@
"ext": "png",
}
- mtjs.On("GetResults", testutils.AnyContext, expectedID).Return([]tjstore.TryJobResult{
+ mtjs.On("GetResults", testutils.AnyContext, expectedID, anyTime).Return([]tjstore.TryJobResult{
{
GroupParams: anglerGroup,
Options: options,
@@ -1260,7 +1260,7 @@
// Return 4 results, 2 that match on digest and test, 1 that matches only on digest and 1 that
// matches only on test.
- mts.On("GetResults", testutils.AnyContext, tjstore.CombinedPSID{CRS: testCRS, CL: testCLID, PS: latestPSID}).Return([]tjstore.TryJobResult{
+ mts.On("GetResults", testutils.AnyContext, tjstore.CombinedPSID{CRS: testCRS, CL: testCLID, PS: latestPSID}, anyTime).Return([]tjstore.TryJobResult{
{
Digest: digestWeWantDetailsAbout,
ResultParams: paramtools.Params{types.PrimaryKeyField: string(testWeWantDetailsAbout)},
@@ -1347,7 +1347,7 @@
// Return 2 results that match on digest and test, 1 of which has an OS that is not on the
// publicly viewable list and should be filtered.
- mts.On("GetResults", testutils.AnyContext, tjstore.CombinedPSID{CRS: testCRS, CL: testCLID, PS: latestPSID}).Return([]tjstore.TryJobResult{
+ mts.On("GetResults", testutils.AnyContext, tjstore.CombinedPSID{CRS: testCRS, CL: testCLID, PS: latestPSID}, anyTime).Return([]tjstore.TryJobResult{
{
Digest: digestWeWantDetailsAbout,
ResultParams: paramtools.Params{types.PrimaryKeyField: string(testWeWantDetailsAbout)},
@@ -1566,7 +1566,7 @@
options := map[string]string{
"ext": "png",
}
- mtjs.On("GetResults", testutils.AnyContext, expectedID).Return([]tjstore.TryJobResult{
+ mtjs.On("GetResults", testutils.AnyContext, expectedID, anyTime).Return([]tjstore.TryJobResult{
{
GroupParams: anglerGroup,
Options: options,
@@ -1666,7 +1666,7 @@
options := map[string]string{
"ext": "png",
}
- mtjs.On("GetResults", testutils.AnyContext, expectedID).Return([]tjstore.TryJobResult{
+ mtjs.On("GetResults", testutils.AnyContext, expectedID, anyTime).Return([]tjstore.TryJobResult{
{
// This trace is "flaky", so should not be reported. Specifically, AnglerAlphaTraceID
// has 2 digests in it on master, AlphaNegativeDigest and AlphaPositiveDigest.
@@ -2352,6 +2352,8 @@
alphaPositiveTriageTS = time.Date(2020, time.March, 1, 2, 3, 4, 0, time.UTC)
alphaNegativeTriageTS = time.Date(2020, time.March, 4, 2, 3, 4, 0, time.UTC)
betaPositiveTriageTS = time.Date(2020, time.March, 7, 2, 3, 4, 0, time.UTC)
+
+ anyTime = mock.MatchedBy(func(time.Time) bool { return true })
)
func makeThreeDevicesExpectationStore() *mock_expectations.Store {
diff --git a/golden/go/tjstore/fs_tjstore/fs_tjstore.go b/golden/go/tjstore/fs_tjstore/fs_tjstore.go
index f184ee1..414461e 100644
--- a/golden/go/tjstore/fs_tjstore/fs_tjstore.go
+++ b/golden/go/tjstore/fs_tjstore/fs_tjstore.go
@@ -171,8 +171,9 @@
return xtj, nil
}
-// GetResults implements the tjstore.Store interface.
-func (s *StoreImpl) GetResults(ctx context.Context, psID tjstore.CombinedPSID) ([]tjstore.TryJobResult, error) {
+// GetResults implements the tjstore.Store interface. TODO(kjlubick) add support for updatedAfter
+// after the updated ingestion logic has baked in, filling the results with timestamps.
+func (s *StoreImpl) GetResults(ctx context.Context, psID tjstore.CombinedPSID, updatedAfter time.Time) ([]tjstore.TryJobResult, error) {
defer metrics2.FuncTimer().Stop()
q := s.client.Collection(tjResultCollection).Where(crsField, "==", psID.CRS).
Where(changeListIDField, "==", psID.CL).Where(patchSetIDField, "==", psID.PS)
diff --git a/golden/go/tjstore/fs_tjstore/fs_tjstore_test.go b/golden/go/tjstore/fs_tjstore/fs_tjstore_test.go
index 5218893..fd15d15 100644
--- a/golden/go/tjstore/fs_tjstore/fs_tjstore_test.go
+++ b/golden/go/tjstore/fs_tjstore/fs_tjstore_test.go
@@ -294,7 +294,7 @@
}})
assert.NoError(t, err)
- xtr, err = f.GetResults(ctx, psID)
+ xtr, err = f.GetResults(ctx, psID, time.Time{})
assert.NoError(t, err)
assert.Len(t, xtr, 10)
@@ -352,7 +352,7 @@
err := f.PutResults(ctx, psID, tryJobID, cis, xtr)
assert.NoError(t, err)
- xtr, err = f.GetResults(ctx, psID)
+ xtr, err = f.GetResults(ctx, psID, time.Time{})
assert.NoError(t, err)
assert.Len(t, xtr, 1)
assert.Equal(t, tjstore.TryJobResult{
@@ -410,7 +410,7 @@
err := f.PutResults(ctx, psID, tryJobID, cis, xtr)
assert.NoError(t, err)
- xtr, err = f.GetResults(ctx, psID)
+ xtr, err = f.GetResults(ctx, psID, time.Time{})
assert.NoError(t, err)
assert.Len(t, xtr, N)
diff --git a/golden/go/tjstore/mocks/Store.go b/golden/go/tjstore/mocks/Store.go
index 17a4626..733d932 100644
--- a/golden/go/tjstore/mocks/Store.go
+++ b/golden/go/tjstore/mocks/Store.go
@@ -8,6 +8,8 @@
mock "github.com/stretchr/testify/mock"
continuous_integration "go.skia.org/infra/golden/go/continuous_integration"
+ time "time"
+
tjstore "go.skia.org/infra/golden/go/tjstore"
)
@@ -16,13 +18,13 @@
mock.Mock
}
-// GetResults provides a mock function with given fields: ctx, psID
-func (_m *Store) GetResults(ctx context.Context, psID tjstore.CombinedPSID) ([]tjstore.TryJobResult, error) {
- ret := _m.Called(ctx, psID)
+// GetResults provides a mock function with given fields: ctx, psID, updatedAfter
+func (_m *Store) GetResults(ctx context.Context, psID tjstore.CombinedPSID, updatedAfter time.Time) ([]tjstore.TryJobResult, error) {
+ ret := _m.Called(ctx, psID, updatedAfter)
var r0 []tjstore.TryJobResult
- if rf, ok := ret.Get(0).(func(context.Context, tjstore.CombinedPSID) []tjstore.TryJobResult); ok {
- r0 = rf(ctx, psID)
+ if rf, ok := ret.Get(0).(func(context.Context, tjstore.CombinedPSID, time.Time) []tjstore.TryJobResult); ok {
+ r0 = rf(ctx, psID, updatedAfter)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).([]tjstore.TryJobResult)
@@ -30,8 +32,8 @@
}
var r1 error
- if rf, ok := ret.Get(1).(func(context.Context, tjstore.CombinedPSID) error); ok {
- r1 = rf(ctx, psID)
+ if rf, ok := ret.Get(1).(func(context.Context, tjstore.CombinedPSID, time.Time) error); ok {
+ r1 = rf(ctx, psID, updatedAfter)
} else {
r1 = ret.Error(1)
}
diff --git a/golden/go/tjstore/tjstore.go b/golden/go/tjstore/tjstore.go
index 2332400..4bfb532 100644
--- a/golden/go/tjstore/tjstore.go
+++ b/golden/go/tjstore/tjstore.go
@@ -6,6 +6,7 @@
"context"
"errors"
"fmt"
+ "time"
"go.skia.org/infra/go/paramtools"
ci "go.skia.org/infra/golden/go/continuous_integration"
@@ -28,7 +29,7 @@
// GetResults returns any TryJobResults for a given ChangeList and PatchSet.
// The returned slice could be empty and is not sorted.
- GetResults(ctx context.Context, psID CombinedPSID) ([]TryJobResult, error)
+ GetResults(ctx context.Context, psID CombinedPSID, updatedAfter time.Time) ([]TryJobResult, error)
// PutTryJob stores the given TryJob, overwriting any values for
// that TryJob if they already existed. The TryJob will "belong" to the