[gold] Use time in tryjob results

Bug: skia:10399
Change-Id: I7eec72ab18747d01df5ba072a8f61d0d1a660248
Reviewed-on: https://skia-review.googlesource.com/c/buildbot/+/297736
Reviewed-by: Leandro Lovisolo <lovisolo@google.com>
diff --git a/golden/go/ingestion_processors/tryjob_ingestion.go b/golden/go/ingestion_processors/tryjob_ingestion.go
index f4200f4..96bf9c4 100644
--- a/golden/go/ingestion_processors/tryjob_ingestion.go
+++ b/golden/go/ingestion_processors/tryjob_ingestion.go
@@ -276,7 +276,7 @@
 	if err := g.changeListStore.PutPatchSet(ctx, ps); err != nil {
 		return skerr.Wrapf(err, "could not store PS %s of CL %q to clstore", psID, clID)
 	}
-	err = g.tryJobStore.PutResults(ctx, combinedID, tjID, cisName, tjr)
+	err = g.tryJobStore.PutResults(ctx, combinedID, tjID, cisName, tjr, time.Now())
 	if err != nil {
 		return skerr.Wrapf(err, "putting %d results for CL %s, PS %d (%s), TJ %s, file %s", len(tjr), clID, psOrder, psID, tjID, rf.Name())
 	}
diff --git a/golden/go/ingestion_processors/tryjob_ingestion_test.go b/golden/go/ingestion_processors/tryjob_ingestion_test.go
index 05542cf..0c3bb70 100644
--- a/golden/go/ingestion_processors/tryjob_ingestion_test.go
+++ b/golden/go/ingestion_processors/tryjob_ingestion_test.go
@@ -102,7 +102,7 @@
 	mcls.On("PutPatchSet", testutils.AnyContext, makeGerritPatchSet()).Return(nil).Once()
 
 	mtjs.On("PutTryJob", testutils.AnyContext, gerritCombinedID, makeGerritBuildbucketTryJob()).Return(nil).Once()
-	mtjs.On("PutResults", testutils.AnyContext, gerritCombinedID, gerritTJID, buildbucketCIS, makeTryJobResults()).Return(nil).Once()
+	mtjs.On("PutResults", testutils.AnyContext, gerritCombinedID, gerritTJID, buildbucketCIS, makeTryJobResults(), anyTime).Return(nil).Once()
 
 	gtp := goldTryjobProcessor{
 		changeListStore: mcls,
@@ -143,7 +143,7 @@
 
 	combinedID := tjstore.CombinedPSID{CL: githubCLID, PS: githubPSID, CRS: "github"}
 	mtjs.On("PutTryJob", testutils.AnyContext, combinedID, makeGitHubCirrusTryJob()).Return(nil)
-	mtjs.On("PutResults", testutils.AnyContext, combinedID, githubTJID, cirrusCIS, makeGitHubTryJobResults()).Return(nil)
+	mtjs.On("PutResults", testutils.AnyContext, combinedID, githubTJID, cirrusCIS, makeGitHubTryJobResults(), anyTime).Return(nil)
 
 	gtp := goldTryjobProcessor{
 		changeListStore: mcls,
@@ -253,7 +253,7 @@
 	mcls.On("PutChangeList", testutils.AnyContext, clWithUpdatedTime(t, gerritCLID, gerritCLDate)).Return(nil)
 
 	mtjs.On("PutTryJob", testutils.AnyContext, gerritCombinedID, makeGerritBuildbucketTryJob()).Return(nil)
-	mtjs.On("PutResults", testutils.AnyContext, gerritCombinedID, gerritTJID, buildbucketCIS, makeTryJobResults()).Return(nil)
+	mtjs.On("PutResults", testutils.AnyContext, gerritCombinedID, gerritTJID, buildbucketCIS, makeTryJobResults(), anyTime).Return(nil)
 
 	gtp := goldTryjobProcessor{
 		changeListStore: mcls,
@@ -290,7 +290,7 @@
 	mcls.On("PutChangeList", testutils.AnyContext, clWithUpdatedTime(t, gerritCLID, gerritCLDate)).Return(nil)
 
 	mtjs.On("PutTryJob", testutils.AnyContext, gerritCombinedID, makeGerritBuildbucketTryJob()).Return(nil)
-	mtjs.On("PutResults", testutils.AnyContext, gerritCombinedID, gerritTJID, buildbucketCIS, makeTryJobResults()).Return(nil)
+	mtjs.On("PutResults", testutils.AnyContext, gerritCombinedID, gerritTJID, buildbucketCIS, makeTryJobResults(), anyTime).Return(nil)
 
 	gtp := goldTryjobProcessor{
 		changeListStore: mcls,
@@ -324,7 +324,7 @@
 	mcls.On("PutPatchSet", testutils.AnyContext, makeGerritPatchSet()).Return(nil)
 
 	mtjs.On("PutTryJob", testutils.AnyContext, gerritCombinedID, makeGerritBuildbucketTryJob()).Return(nil)
-	mtjs.On("PutResults", testutils.AnyContext, gerritCombinedID, gerritTJID, buildbucketCIS, makeTryJobResults()).Return(nil)
+	mtjs.On("PutResults", testutils.AnyContext, gerritCombinedID, gerritTJID, buildbucketCIS, makeTryJobResults(), anyTime).Return(nil)
 
 	gtp := goldTryjobProcessor{
 		changeListStore: mcls,
@@ -368,6 +368,8 @@
 	gerritCombinedID = tjstore.CombinedPSID{CL: gerritCLID, PS: gerritPSID, CRS: gerritCRS}
 
 	gerritCLDate = time.Date(2019, time.August, 19, 18, 17, 16, 0, time.UTC)
+
+	anyTime = mock.MatchedBy(func(time.Time) bool { return true })
 )
 
 // These are functions to avoid mutations causing issues for future tests/checks
diff --git a/golden/go/tjstore/fs_tjstore/FIRESTORE.md b/golden/go/tjstore/fs_tjstore/FIRESTORE.md
index e51029e..7489838 100644
--- a/golden/go/tjstore/fs_tjstore/FIRESTORE.md
+++ b/golden/go/tjstore/fs_tjstore/FIRESTORE.md
@@ -47,9 +47,15 @@
   - TryJobResult.OptionsHash
   - Params.Map
 
-Currently, all queries can be handled via Firestore's index merging
-<https://firebase.google.com/docs/firestore/query-data/index-overview#taking_advantage_of_index_merging>
-because we chain "==" filters together.
+We need the following complex indices:
+
+Collection ID              | Fields
+------------------------------------------------------------------
+tjstore_result             | clid: ASC crs: ASC psid: ASC digest: ASC
+tjstore_result             | clid: ASC crs: ASC psid: ASC ts: ASC
+
+The first is for fetching results in a sharded fashion. The second is for fetching results
+after a certain time.
 
 Usage
 -----
diff --git a/golden/go/tjstore/fs_tjstore/fs_tjstore.go b/golden/go/tjstore/fs_tjstore/fs_tjstore.go
index 414461e..3f9bf87 100644
--- a/golden/go/tjstore/fs_tjstore/fs_tjstore.go
+++ b/golden/go/tjstore/fs_tjstore/fs_tjstore.go
@@ -34,6 +34,7 @@
 	crsField          = "crs"
 	patchSetIDField   = "psid"
 	digestField       = "digest"
+	timestampField    = "ts"
 
 	maxReadAttempts  = 5
 	maxWriteAttempts = 5
@@ -92,6 +93,8 @@
 	ResultParams    map[string]string `firestore:"result_params"`
 	GroupParamsHash string            `firestore:"group_hash"`
 	OptionsHash     string            `firestore:"options_hash"`
+
+	CreatedTS time.Time `firestore:"ts"`
 }
 
 // paramEntry represents a paramTools.Params stored in FireStore
@@ -171,19 +174,31 @@
 	return xtj, nil
 }
 
-// GetResults implements the tjstore.Store interface. TODO(kjlubick) add support for updatedAfter
-// after the updated ingestion logic has baked in, filling the results with timestamps.
+// GetResults implements the tjstore.Store interface.
 func (s *StoreImpl) GetResults(ctx context.Context, psID tjstore.CombinedPSID, updatedAfter time.Time) ([]tjstore.TryJobResult, error) {
 	defer metrics2.FuncTimer().Stop()
 	q := s.client.Collection(tjResultCollection).Where(crsField, "==", psID.CRS).
 		Where(changeListIDField, "==", psID.CL).Where(patchSetIDField, "==", psID.PS)
 
-	shardResults := make([][]resultEntry, resultShards)
-	queries := fs_utils.ShardOnDigest(q, digestField, resultShards)
+	shards := resultShards
+	var queries []firestore.Query
+	if !updatedAfter.IsZero() {
+		// If we are including a time, we can't shard, since sharding uses inequalities and firestore
+		// won't let you do two inequalities on different fields. This may result in reduced
+		// performance, but in practice this shouldn't matter too much because we:
+		//   1) Only provide a time when doing a partial load of results while indexing changelists.
+		//   2) Index changelists in the background (and in parallel), not in a user-visible way.
+		shards = 1
+		queries = []firestore.Query{q.Where(timestampField, ">=", updatedAfter)}
+	} else {
+		queries = fs_utils.ShardOnDigest(q, digestField, shards)
+	}
+
+	shardResults := make([][]resultEntry, shards)
 
 	// maps hash -> params we need to fetch
 	// We will first add keys to this map, then go fetch the actual params
-	shardParams := make([]util.StringSet, resultShards)
+	shardParams := make([]util.StringSet, shards)
 
 	err := s.client.IterDocsInParallel(ctx, "GetResults", psID.Key(), queries, maxReadAttempts, maxOperationTime, func(i int, doc *firestore.DocumentSnapshot) error {
 		if doc == nil {
@@ -299,7 +314,7 @@
 // This would make a rollback difficult, so we opt to retry any failures multiple times.
 // We store maps first, so if we do fail, we can bail out w/o having written the
 // (incomplete) TryJobResults.  We take a similar approach in fs_expstore, which has been fine.
-func (s *StoreImpl) PutResults(ctx context.Context, psID tjstore.CombinedPSID, tjID, cisName string, r []tjstore.TryJobResult) error {
+func (s *StoreImpl) PutResults(ctx context.Context, psID tjstore.CombinedPSID, tjID, cisName string, r []tjstore.TryJobResult, ts time.Time) error {
 	if len(r) == 0 {
 		return nil
 	}
@@ -318,6 +333,8 @@
 
 			Digest:       tr.Digest,
 			ResultParams: tr.ResultParams,
+
+			CreatedTS: ts,
 		}
 		gh, err := hashParams(tr.GroupParams)
 		if err != nil {
diff --git a/golden/go/tjstore/fs_tjstore/fs_tjstore_test.go b/golden/go/tjstore/fs_tjstore/fs_tjstore_test.go
index fd15d15..ad18bb0 100644
--- a/golden/go/tjstore/fs_tjstore/fs_tjstore_test.go
+++ b/golden/go/tjstore/fs_tjstore/fs_tjstore_test.go
@@ -244,7 +244,7 @@
 		})
 	}
 
-	err := f.PutResults(ctx, psID, firstTJID, cis, xtr)
+	err := f.PutResults(ctx, psID, firstTJID, cis, xtr, time.Now())
 	assert.NoError(t, err)
 
 	gp = paramtools.Params{
@@ -273,7 +273,7 @@
 		Digest: fakeDigest("crust", 4),
 	})
 
-	err = f.PutResults(ctx, psID, secondTJID, cis, xtr)
+	err = f.PutResults(ctx, psID, secondTJID, cis, xtr, time.Now())
 	assert.NoError(t, err)
 
 	otherPSID := tjstore.CombinedPSID{
@@ -291,7 +291,7 @@
 			types.PrimaryKeyField: "test-4",
 		},
 		Digest: "abcdef",
-	}})
+	}}, time.Now())
 	assert.NoError(t, err)
 
 	xtr, err = f.GetResults(ctx, psID, time.Time{})
@@ -315,6 +315,98 @@
 	assert.Equal(t, 5, crustCounts)
 }
 
+func TestPutResultsGetResults_Timestamps(t *testing.T) {
+	unittest.LargeTest(t)
+	c, cleanup := ifirestore.NewClientForTesting(context.Background(), t)
+	defer cleanup()
+
+	f := New(c)
+	ctx := context.Background()
+	const cis = "cirrus"
+	const firstDigest = types.Digest("1111111111111111")
+	const secondDigest = types.Digest("2222222222222222")
+	const tryjobID = "987654"
+
+	beforeTime := time.Date(2020, time.June, 1, 0, 0, 0, 0, time.UTC)
+	firstTime := time.Date(2020, time.June, 1, 1, 1, 1, 0, time.UTC)
+	inbetweenTime := time.Date(2020, time.June, 2, 2, 2, 2, 0, time.UTC)
+	secondTime := time.Date(2020, time.June, 3, 3, 3, 3, 0, time.UTC)
+	afterTime := time.Date(2020, time.June, 4, 4, 4, 4, 0, time.UTC)
+
+	psID := tjstore.CombinedPSID{
+		CL:  "1234",
+		CRS: "github",
+		PS:  "abcd",
+	}
+
+	gp := paramtools.Params{
+		"os":    "Android",
+		"model": "crustacean",
+	}
+	op := paramtools.Params{
+		"ext": "png",
+	}
+
+	firstBatch := []tjstore.TryJobResult{
+		{
+			GroupParams: gp,
+			Options:     op,
+			ResultParams: paramtools.Params{
+				types.PrimaryKeyField: "test-1",
+			},
+			Digest: firstDigest,
+		},
+	}
+
+	err := f.PutResults(ctx, psID, tryjobID, cis, firstBatch, firstTime)
+	assert.NoError(t, err)
+
+	secondBatch := []tjstore.TryJobResult{
+		{
+			GroupParams: gp,
+			Options:     op,
+			ResultParams: paramtools.Params{
+				types.PrimaryKeyField: "test-2",
+			},
+			Digest: secondDigest,
+		},
+	}
+
+	err = f.PutResults(ctx, psID, tryjobID, cis, secondBatch, secondTime)
+	assert.NoError(t, err)
+
+	// Empty time is all results
+	results, err := f.GetResults(ctx, psID, time.Time{})
+	require.NoError(t, err)
+	assert.Len(t, results, 2)
+
+	// This time should still cover both results.
+	results, err = f.GetResults(ctx, psID, beforeTime)
+	require.NoError(t, err)
+	assert.Len(t, results, 2)
+
+	// range is greater than or equal to the given time.
+	results, err = f.GetResults(ctx, psID, firstTime)
+	require.NoError(t, err)
+	assert.Len(t, results, 2)
+
+	// We should only see the second one
+	results, err = f.GetResults(ctx, psID, inbetweenTime)
+	require.NoError(t, err)
+	require.Len(t, results, 1)
+	assert.Equal(t, secondDigest, results[0].Digest)
+
+	// range is greater than or equal to the given time.
+	results, err = f.GetResults(ctx, psID, secondTime)
+	require.NoError(t, err)
+	require.Len(t, results, 1)
+	assert.Equal(t, secondDigest, results[0].Digest)
+
+	results, err = f.GetResults(ctx, psID, afterTime)
+	require.NoError(t, err)
+	assert.Empty(t, results)
+}
+
 // TestPutGetResultsNoOptions makes sure that options (which are optional) can be omitted
 // and everything still works
 func TestPutGetResultsNoOptions(t *testing.T) {
@@ -349,7 +441,7 @@
 		},
 	}
 
-	err := f.PutResults(ctx, psID, tryJobID, cis, xtr)
+	err := f.PutResults(ctx, psID, tryJobID, cis, xtr, time.Now())
 	assert.NoError(t, err)
 
 	xtr, err = f.GetResults(ctx, psID, time.Time{})
@@ -407,7 +499,7 @@
 		})
 	}
 
-	err := f.PutResults(ctx, psID, tryJobID, cis, xtr)
+	err := f.PutResults(ctx, psID, tryJobID, cis, xtr, time.Now())
 	assert.NoError(t, err)
 
 	xtr, err = f.GetResults(ctx, psID, time.Time{})
diff --git a/golden/go/tjstore/mocks/Store.go b/golden/go/tjstore/mocks/Store.go
index 733d932..2f7b8fa 100644
--- a/golden/go/tjstore/mocks/Store.go
+++ b/golden/go/tjstore/mocks/Store.go
@@ -85,13 +85,13 @@
 	return r0, r1
 }
 
-// PutResults provides a mock function with given fields: ctx, psID, tjID, cisName, r
-func (_m *Store) PutResults(ctx context.Context, psID tjstore.CombinedPSID, tjID string, cisName string, r []tjstore.TryJobResult) error {
-	ret := _m.Called(ctx, psID, tjID, cisName, r)
+// PutResults provides a mock function with given fields: ctx, psID, tjID, cisName, r, ts
+func (_m *Store) PutResults(ctx context.Context, psID tjstore.CombinedPSID, tjID string, cisName string, r []tjstore.TryJobResult, ts time.Time) error {
+	ret := _m.Called(ctx, psID, tjID, cisName, r, ts)
 
 	var r0 error
-	if rf, ok := ret.Get(0).(func(context.Context, tjstore.CombinedPSID, string, string, []tjstore.TryJobResult) error); ok {
-		r0 = rf(ctx, psID, tjID, cisName, r)
+	if rf, ok := ret.Get(0).(func(context.Context, tjstore.CombinedPSID, string, string, []tjstore.TryJobResult, time.Time) error); ok {
+		r0 = rf(ctx, psID, tjID, cisName, r, ts)
 	} else {
 		r0 = ret.Error(0)
 	}
diff --git a/golden/go/tjstore/tjstore.go b/golden/go/tjstore/tjstore.go
index 4bfb532..74f8dde 100644
--- a/golden/go/tjstore/tjstore.go
+++ b/golden/go/tjstore/tjstore.go
@@ -28,7 +28,9 @@
 	GetTryJobs(ctx context.Context, psID CombinedPSID) ([]ci.TryJob, error)
 
 	// GetResults returns any TryJobResults for a given ChangeList and PatchSet.
-	// The returned slice could be empty and is not sorted.
+	// The returned slice could be empty and is not sorted. If updatedAfter is not
+	// a zero time, it will be used to return the subset of results created on or after
+	// the given time.
 	GetResults(ctx context.Context, psID CombinedPSID, updatedAfter time.Time) ([]TryJobResult, error)
 
 	// PutTryJob stores the given TryJob, overwriting any values for
@@ -38,10 +40,11 @@
 
 	// PutResults stores the given TryJobResult. The TryJobResults will "belong"
 	// to the associated ChangeList and PatchSet. tjID is the SystemID of the TryJob which
-	// produced the TryJobResults.
+	// produced the TryJobResults. ts is the timestamp that will be used to mark the creating
+	// time of these results (see GetResults).
 	// Of note, a typical Skia TryJob might have 5-10k TryJobResult objects.
 	// An error may mean partial success.
-	PutResults(ctx context.Context, psID CombinedPSID, tjID, cisName string, r []TryJobResult) error
+	PutResults(ctx context.Context, psID CombinedPSID, tjID, cisName string, r []TryJobResult, ts time.Time) error
 }
 
 var ErrNotFound = errors.New("not found")