[gold] Make indexer use timestamp on tryjob results.

Bug: skia:10399
Change-Id: I89f87ce19002e75fee80d6034feef736c3152423
Reviewed-on: https://skia-review.googlesource.com/c/buildbot/+/297600
Reviewed-by: Leandro Lovisolo <lovisolo@google.com>
diff --git a/golden/go/indexer/indexer.go b/golden/go/indexer/indexer.go
index 653f2c0..4c921a8 100644
--- a/golden/go/indexer/indexer.go
+++ b/golden/go/indexer/indexer.go
@@ -749,7 +749,7 @@
 	}
 
 	// An arbitrary cut off to the amount of recent, open CLs we try to index.
-	recent := time.Now().Add(-maxAgeOfOpenCLsToIndex)
+	recent := now.Add(-maxAgeOfOpenCLsToIndex)
 	xcl, _, err := ix.CLStore.GetChangeLists(ctx, clstore.SearchOptions{
 		StartIdx:    0,
 		Limit:       maxCLsToIndex,
@@ -803,11 +803,19 @@
 					CRS: crs,
 					PS:  latestPS.SystemID,
 				}
-				xtjr, err := ix.TryJobStore.GetResults(ctx, psID)
+				afterTime := time.Time{}
+				var existingUntriagedResults []tjstore.TryJobResult
+				// Test to see if we can do an incremental index (just for results that were uploaded
+				// for this patchset since the last time we indexed).
+				if ok && clIdx.LatestPatchSet.PS == latestPS.SystemID {
+					afterTime = clIdx.ComputedTS
+					existingUntriagedResults = clIdx.UntriagedResults
+				}
+				xtjr, err := ix.TryJobStore.GetResults(ctx, psID, afterTime)
 				if err != nil {
 					return skerr.Wrap(err)
 				}
-				untriagedResults, params := indexTryJobResults(xtjr, exps)
+				untriagedResults, params := indexTryJobResults(existingUntriagedResults, xtjr, exps)
 				// Copy the existing ParamSet into the newly created one. It is important to copy it from
 				// old into new (and not new into old), so we don't cause a race condition on the cached
 				// ParamSet by writing to it while GetIndexForCL is reading from it.
@@ -829,19 +837,40 @@
 // indexTryJobResults goes through all the TryJobResults and returns results useful for indexing.
 // Concretely, these results are a slice with just the untriaged results and a ParamSet with the
 // observed params.
-func indexTryJobResults(xtjr []tjstore.TryJobResult, exps expectations.Classifier) ([]tjstore.TryJobResult, paramtools.ParamSet) {
-	var untriagedResults []tjstore.TryJobResult
+func indexTryJobResults(existing, newResults []tjstore.TryJobResult, exps expectations.Classifier) ([]tjstore.TryJobResult, paramtools.ParamSet) {
 	params := paramtools.ParamSet{}
-	for _, tjr := range xtjr {
+	var newlyUntriagedResults []tjstore.TryJobResult
+	for _, tjr := range newResults {
 		params.AddParams(tjr.GroupParams)
 		params.AddParams(tjr.ResultParams)
 		params.AddParams(tjr.Options)
 		tn := types.TestName(tjr.ResultParams[types.PrimaryKeyField])
 		if exps.Classification(tn, tjr.Digest) == expectations.Untriaged {
-			untriagedResults = append(untriagedResults, tjr)
+			// Skip results whose test name and digest are already in the existing list to avoid duplicates.
+			alreadyInList := false
+			for _, existingResult := range existing {
+				if existingResult.Digest == tjr.Digest && existingResult.ResultParams[types.PrimaryKeyField] == tjr.ResultParams[types.PrimaryKeyField] {
+					alreadyInList = true
+					break
+				}
+			}
+			if !alreadyInList {
+				newlyUntriagedResults = append(newlyUntriagedResults, tjr)
+			}
 		}
 	}
-	return untriagedResults, params
+	if len(newlyUntriagedResults) == 0 {
+		return existing, params
+	}
+
+	if len(existing) == 0 {
+		return newlyUntriagedResults, params
+	}
+	// make a copy of the slice, so as not to confuse the existing index.
+	combined := make([]tjstore.TryJobResult, 0, len(existing)+len(newlyUntriagedResults))
+	combined = append(combined, existing...)
+	combined = append(combined, newlyUntriagedResults...)
+	return combined, params
 }
 
 // getCLIndex is a helper that returns the appropriately typed element from changeListIndices.
diff --git a/golden/go/indexer/indexer_test.go b/golden/go/indexer/indexer_test.go
index 9cf29b2..598d283 100644
--- a/golden/go/indexer/indexer_test.go
+++ b/golden/go/indexer/indexer_test.go
@@ -272,7 +272,7 @@
 		"day_of_week": "wednesday",
 	}
 
-	mts.On("GetResults", testutils.AnyContext, firstCombinedID).Return([]tjstore.TryJobResult{
+	mts.On("GetResults", testutils.AnyContext, firstCombinedID, time.Time{}).Return([]tjstore.TryJobResult{
 		{
 			ResultParams: paramtools.Params{types.PrimaryKeyField: string(data.AlphaTest)},
 			GroupParams:  androidGroup,
@@ -293,7 +293,7 @@
 			Digest:       data.AlphaUntriagedDigest,
 		},
 	}, nil)
-	mts.On("GetResults", testutils.AnyContext, secondCombinedID).Return([]tjstore.TryJobResult{
+	mts.On("GetResults", testutils.AnyContext, secondCombinedID, time.Time{}).Return([]tjstore.TryJobResult{
 		{
 			ResultParams: paramtools.Params{types.PrimaryKeyField: string(data.AlphaTest)},
 			GroupParams:  androidGroup,
@@ -361,7 +361,7 @@
 	assert.Equal(t, int64(2), ixr.changeListsReindexed.Get())
 }
 
-func TestIndexer_CalcChangeListIndices_HasPreviousIndex_Success(t *testing.T) {
+func TestIndexer_CalcChangeListIndices_HasIndexForPreviousPS_Success(t *testing.T) {
 	unittest.SmallTest(t)
 
 	const crs = "gerrit"
@@ -406,7 +406,7 @@
 		"model": "crosshatch",
 	}
 
-	mts.On("GetResults", testutils.AnyContext, secondPatchSetCombinedID).Return([]tjstore.TryJobResult{
+	mts.On("GetResults", testutils.AnyContext, secondPatchSetCombinedID, time.Time{}).Return([]tjstore.TryJobResult{
 		{
 			ResultParams: paramtools.Params{types.PrimaryKeyField: string(data.AlphaTest)},
 			GroupParams:  androidGroup,
@@ -485,6 +485,121 @@
 	assert.Equal(t, int64(1), ixr.changeListsReindexed.Get())
 }
 
+func TestIndexer_CalcChangeListIndices_HasIndexForCurrentPS_IncrementalUpdateSuccess(t *testing.T) {
+	unittest.SmallTest(t)
+
+	const crs = "gerrit"
+	const clID = "111111"
+	const firstPatchSet = "firstPS"
+	const firstUntriagedDigest = types.Digest("11111111111111111111")
+	const secondUntriagedDigest = types.Digest("22222222222222222222")
+	const thirdUntriagedDigest = types.Digest("33333333333333333333")
+
+	firstPatchSetCombinedID := tjstore.CombinedPSID{CL: clID, CRS: crs, PS: firstPatchSet}
+
+	longAgo := time.Date(2020, time.April, 15, 15, 15, 0, 0, time.UTC)
+	recently := time.Date(2020, time.May, 5, 12, 12, 0, 0, time.UTC)
+
+	mcs := &mock_clstore.Store{}
+	mes := &mock_expectations.Store{}
+	mts := &mock_tjstore.Store{}
+
+	// All digests are untriaged
+	masterExp := expectations.Expectations{}
+
+	// The CL has no additional expectations.
+	mes.On("Get", testutils.AnyContext).Return(&masterExp, nil)
+	loadChangeListExpectations(mes, crs, map[string]*expectations.Expectations{
+		clID: {},
+	})
+
+	mcs.On("GetChangeLists", testutils.AnyContext, mock.Anything).Return([]code_review.ChangeList{
+		{
+			SystemID: clID,
+			Updated:  recently,
+		},
+	}, 0, nil)
+
+	mcs.On("GetPatchSets", testutils.AnyContext, clID).Return([]code_review.PatchSet{
+		{SystemID: firstPatchSet}, // all other fields ignored from patch set.
+	}, nil)
+	mcs.On("System").Return(crs)
+
+	androidGroup := paramtools.Params{
+		"os":    "Android",
+		"model": "crosshatch",
+	}
+
+	// Note that this time is based on the previous indexed time.
+	mts.On("GetResults", testutils.AnyContext, firstPatchSetCombinedID, longAgo).Return([]tjstore.TryJobResult{
+		{
+			ResultParams: paramtools.Params{types.PrimaryKeyField: string(data.AlphaTest)},
+			GroupParams:  androidGroup,
+			Digest:       secondUntriagedDigest,
+		},
+		{
+			ResultParams: paramtools.Params{types.PrimaryKeyField: string(data.AlphaTest)},
+			GroupParams:  androidGroup,
+			Digest:       thirdUntriagedDigest,
+		},
+	}, nil)
+
+	ctx := context.Background()
+	ic := IndexerConfig{
+		CLStore:           mcs,
+		ExpectationsStore: mes,
+		TryJobStore:       mts,
+	}
+	ixr, err := New(ctx, ic, 0)
+	require.NoError(t, err)
+	ixr.changeListsReindexed.Reset()
+
+	// The scenario here is that the first index for this patchset identified two untriaged digests.
+	// Later, two more results were uploaded: one repeats an already-indexed digest and one is new.
+	previousIdx := ChangeListIndex{
+		LatestPatchSet: firstPatchSetCombinedID,
+		UntriagedResults: []tjstore.TryJobResult{
+			{
+				ResultParams: paramtools.Params{types.PrimaryKeyField: string(data.AlphaTest)},
+				Digest:       firstUntriagedDigest,
+				// Other fields ignored
+			},
+			{
+				ResultParams: paramtools.Params{types.PrimaryKeyField: string(data.AlphaTest)},
+				Digest:       secondUntriagedDigest,
+			},
+		},
+		ParamSet: paramtools.ParamSet{
+			// This ParamSet is purposely incomplete (i.e. no data.AlphaTest) to make sure new data is
+			// merged in correctly.
+			types.PrimaryKeyField: []string{"this_test_was_here_before"},
+			"os":                  []string{"Android", "iOS"},
+			"model":               []string{"bluefish", "redfish"},
+		},
+		ComputedTS: longAgo,
+	}
+	ixr.changeListIndices.Set("gerrit_111111", &previousIdx, 0)
+
+	ixr.calcChangeListIndices(ctx)
+
+	clIdx := ixr.GetIndexForCL(crs, clID)
+	assert.NotNil(t, clIdx)
+	assert.Equal(t, firstPatchSetCombinedID, clIdx.LatestPatchSet)
+	assert.True(t, clIdx.ComputedTS.After(longAgo)) // should be updated
+	assert.Len(t, clIdx.UntriagedResults, 3)
+	assert.Equal(t, firstUntriagedDigest, clIdx.UntriagedResults[0].Digest)
+	assert.Equal(t, secondUntriagedDigest, clIdx.UntriagedResults[1].Digest)
+	assert.Equal(t, thirdUntriagedDigest, clIdx.UntriagedResults[2].Digest)
+	require.NotNil(t, clIdx.ParamSet)
+	clIdx.ParamSet.Normalize()
+	assert.Equal(t, paramtools.ParamSet{
+		"model": []string{"bluefish", "crosshatch", "redfish"},
+		"name":  []string{"test_alpha", "this_test_was_here_before"},
+		"os":    []string{"Android", "iOS"},
+	}, clIdx.ParamSet)
+	assert.Equal(t, int64(1), ixr.changeListsReindexed.Get())
+}
+
 func TestIndexer_CalcChangeListIndices_PreviousIndexDoesNotNeedUpdating_Success(t *testing.T) {
 	unittest.SmallTest(t)
 
diff --git a/golden/go/search/search.go b/golden/go/search/search.go
index 4b4e278..f219b16 100644
--- a/golden/go/search/search.go
+++ b/golden/go/search/search.go
@@ -589,7 +589,7 @@
 	if xtr, ok := s.storeCache.Get(key); ok {
 		return xtr.([]tjstore.TryJobResult), nil
 	}
-	xtr, err := s.tryJobStore.GetResults(ctx, id)
+	xtr, err := s.tryJobStore.GetResults(ctx, id, time.Time{})
 	if err != nil {
 		return nil, skerr.Wrap(err)
 	}
diff --git a/golden/go/search/search_test.go b/golden/go/search/search_test.go
index 17c6228..cfe5a6f 100644
--- a/golden/go/search/search_test.go
+++ b/golden/go/search/search_test.go
@@ -758,7 +758,7 @@
 		"ext": "png",
 	}
 
-	mtjs.On("GetResults", testutils.AnyContext, expectedID).Return([]tjstore.TryJobResult{
+	mtjs.On("GetResults", testutils.AnyContext, expectedID, anyTime).Return([]tjstore.TryJobResult{
 		{
 			GroupParams: anglerGroup,
 			Options:     options,
@@ -1260,7 +1260,7 @@
 
 	// Return 4 results, 2 that match on digest and test, 1 that matches only on digest and 1 that
 	// matches only on test.
-	mts.On("GetResults", testutils.AnyContext, tjstore.CombinedPSID{CRS: testCRS, CL: testCLID, PS: latestPSID}).Return([]tjstore.TryJobResult{
+	mts.On("GetResults", testutils.AnyContext, tjstore.CombinedPSID{CRS: testCRS, CL: testCLID, PS: latestPSID}, anyTime).Return([]tjstore.TryJobResult{
 		{
 			Digest:       digestWeWantDetailsAbout,
 			ResultParams: paramtools.Params{types.PrimaryKeyField: string(testWeWantDetailsAbout)},
@@ -1347,7 +1347,7 @@
 
 	// Return 2 results that match on digest and test, 1 of which has an OS that is not on the
 	// publicly viewable list and should be filtered.
-	mts.On("GetResults", testutils.AnyContext, tjstore.CombinedPSID{CRS: testCRS, CL: testCLID, PS: latestPSID}).Return([]tjstore.TryJobResult{
+	mts.On("GetResults", testutils.AnyContext, tjstore.CombinedPSID{CRS: testCRS, CL: testCLID, PS: latestPSID}, anyTime).Return([]tjstore.TryJobResult{
 		{
 			Digest:       digestWeWantDetailsAbout,
 			ResultParams: paramtools.Params{types.PrimaryKeyField: string(testWeWantDetailsAbout)},
@@ -1566,7 +1566,7 @@
 	options := map[string]string{
 		"ext": "png",
 	}
-	mtjs.On("GetResults", testutils.AnyContext, expectedID).Return([]tjstore.TryJobResult{
+	mtjs.On("GetResults", testutils.AnyContext, expectedID, anyTime).Return([]tjstore.TryJobResult{
 		{
 			GroupParams: anglerGroup,
 			Options:     options,
@@ -1666,7 +1666,7 @@
 	options := map[string]string{
 		"ext": "png",
 	}
-	mtjs.On("GetResults", testutils.AnyContext, expectedID).Return([]tjstore.TryJobResult{
+	mtjs.On("GetResults", testutils.AnyContext, expectedID, anyTime).Return([]tjstore.TryJobResult{
 		{
 			// This trace is "flaky", so should not be reported. Specifically, AnglerAlphaTraceID
 			// has 2 digests in it on master, AlphaNegativeDigest and AlphaPositiveDigest.
@@ -2352,6 +2352,8 @@
 	alphaPositiveTriageTS = time.Date(2020, time.March, 1, 2, 3, 4, 0, time.UTC)
 	alphaNegativeTriageTS = time.Date(2020, time.March, 4, 2, 3, 4, 0, time.UTC)
 	betaPositiveTriageTS  = time.Date(2020, time.March, 7, 2, 3, 4, 0, time.UTC)
+
+	anyTime = mock.MatchedBy(func(time.Time) bool { return true })
 )
 
 func makeThreeDevicesExpectationStore() *mock_expectations.Store {
diff --git a/golden/go/tjstore/fs_tjstore/fs_tjstore.go b/golden/go/tjstore/fs_tjstore/fs_tjstore.go
index f184ee1..414461e 100644
--- a/golden/go/tjstore/fs_tjstore/fs_tjstore.go
+++ b/golden/go/tjstore/fs_tjstore/fs_tjstore.go
@@ -171,8 +171,9 @@
 	return xtj, nil
 }
 
-// GetResults implements the tjstore.Store interface.
-func (s *StoreImpl) GetResults(ctx context.Context, psID tjstore.CombinedPSID) ([]tjstore.TryJobResult, error) {
+// GetResults implements the tjstore.Store interface. TODO(kjlubick) Add support for updatedAfter
+// once the updated ingestion logic has baked in and is filling the results with timestamps.
+func (s *StoreImpl) GetResults(ctx context.Context, psID tjstore.CombinedPSID, updatedAfter time.Time) ([]tjstore.TryJobResult, error) {
 	defer metrics2.FuncTimer().Stop()
 	q := s.client.Collection(tjResultCollection).Where(crsField, "==", psID.CRS).
 		Where(changeListIDField, "==", psID.CL).Where(patchSetIDField, "==", psID.PS)
diff --git a/golden/go/tjstore/fs_tjstore/fs_tjstore_test.go b/golden/go/tjstore/fs_tjstore/fs_tjstore_test.go
index 5218893..fd15d15 100644
--- a/golden/go/tjstore/fs_tjstore/fs_tjstore_test.go
+++ b/golden/go/tjstore/fs_tjstore/fs_tjstore_test.go
@@ -294,7 +294,7 @@
 	}})
 	assert.NoError(t, err)
 
-	xtr, err = f.GetResults(ctx, psID)
+	xtr, err = f.GetResults(ctx, psID, time.Time{})
 	assert.NoError(t, err)
 	assert.Len(t, xtr, 10)
 
@@ -352,7 +352,7 @@
 	err := f.PutResults(ctx, psID, tryJobID, cis, xtr)
 	assert.NoError(t, err)
 
-	xtr, err = f.GetResults(ctx, psID)
+	xtr, err = f.GetResults(ctx, psID, time.Time{})
 	assert.NoError(t, err)
 	assert.Len(t, xtr, 1)
 	assert.Equal(t, tjstore.TryJobResult{
@@ -410,7 +410,7 @@
 	err := f.PutResults(ctx, psID, tryJobID, cis, xtr)
 	assert.NoError(t, err)
 
-	xtr, err = f.GetResults(ctx, psID)
+	xtr, err = f.GetResults(ctx, psID, time.Time{})
 	assert.NoError(t, err)
 	assert.Len(t, xtr, N)
 
diff --git a/golden/go/tjstore/mocks/Store.go b/golden/go/tjstore/mocks/Store.go
index 17a4626..733d932 100644
--- a/golden/go/tjstore/mocks/Store.go
+++ b/golden/go/tjstore/mocks/Store.go
@@ -8,6 +8,8 @@
 	mock "github.com/stretchr/testify/mock"
 	continuous_integration "go.skia.org/infra/golden/go/continuous_integration"
 
+	time "time"
+
 	tjstore "go.skia.org/infra/golden/go/tjstore"
 )
 
@@ -16,13 +18,13 @@
 	mock.Mock
 }
 
-// GetResults provides a mock function with given fields: ctx, psID
-func (_m *Store) GetResults(ctx context.Context, psID tjstore.CombinedPSID) ([]tjstore.TryJobResult, error) {
-	ret := _m.Called(ctx, psID)
+// GetResults provides a mock function with given fields: ctx, psID, updatedAfter
+func (_m *Store) GetResults(ctx context.Context, psID tjstore.CombinedPSID, updatedAfter time.Time) ([]tjstore.TryJobResult, error) {
+	ret := _m.Called(ctx, psID, updatedAfter)
 
 	var r0 []tjstore.TryJobResult
-	if rf, ok := ret.Get(0).(func(context.Context, tjstore.CombinedPSID) []tjstore.TryJobResult); ok {
-		r0 = rf(ctx, psID)
+	if rf, ok := ret.Get(0).(func(context.Context, tjstore.CombinedPSID, time.Time) []tjstore.TryJobResult); ok {
+		r0 = rf(ctx, psID, updatedAfter)
 	} else {
 		if ret.Get(0) != nil {
 			r0 = ret.Get(0).([]tjstore.TryJobResult)
@@ -30,8 +32,8 @@
 	}
 
 	var r1 error
-	if rf, ok := ret.Get(1).(func(context.Context, tjstore.CombinedPSID) error); ok {
-		r1 = rf(ctx, psID)
+	if rf, ok := ret.Get(1).(func(context.Context, tjstore.CombinedPSID, time.Time) error); ok {
+		r1 = rf(ctx, psID, updatedAfter)
 	} else {
 		r1 = ret.Error(1)
 	}
diff --git a/golden/go/tjstore/tjstore.go b/golden/go/tjstore/tjstore.go
index 2332400..4bfb532 100644
--- a/golden/go/tjstore/tjstore.go
+++ b/golden/go/tjstore/tjstore.go
@@ -6,6 +6,7 @@
 	"context"
 	"errors"
 	"fmt"
+	"time"
 
 	"go.skia.org/infra/go/paramtools"
 	ci "go.skia.org/infra/golden/go/continuous_integration"
@@ -28,7 +29,7 @@
 
 	// GetResults returns any TryJobResults for a given ChangeList and PatchSet.
 	// The returned slice could be empty and is not sorted.
-	GetResults(ctx context.Context, psID CombinedPSID) ([]TryJobResult, error)
+	GetResults(ctx context.Context, psID CombinedPSID, updatedAfter time.Time) ([]TryJobResult, error)
 
 	// PutTryJob stores the given TryJob, overwriting any values for
 	// that TryJob if they already existed. The TryJob will "belong" to the