[gold] Use time in tryjob results
Bug: skia:10399
Change-Id: I7eec72ab18747d01df5ba072a8f61d0d1a660248
Reviewed-on: https://skia-review.googlesource.com/c/buildbot/+/297736
Reviewed-by: Leandro Lovisolo <lovisolo@google.com>
diff --git a/golden/go/ingestion_processors/tryjob_ingestion.go b/golden/go/ingestion_processors/tryjob_ingestion.go
index f4200f4..96bf9c4 100644
--- a/golden/go/ingestion_processors/tryjob_ingestion.go
+++ b/golden/go/ingestion_processors/tryjob_ingestion.go
@@ -276,7 +276,7 @@
if err := g.changeListStore.PutPatchSet(ctx, ps); err != nil {
return skerr.Wrapf(err, "could not store PS %s of CL %q to clstore", psID, clID)
}
- err = g.tryJobStore.PutResults(ctx, combinedID, tjID, cisName, tjr)
+ err = g.tryJobStore.PutResults(ctx, combinedID, tjID, cisName, tjr, time.Now())
if err != nil {
return skerr.Wrapf(err, "putting %d results for CL %s, PS %d (%s), TJ %s, file %s", len(tjr), clID, psOrder, psID, tjID, rf.Name())
}
diff --git a/golden/go/ingestion_processors/tryjob_ingestion_test.go b/golden/go/ingestion_processors/tryjob_ingestion_test.go
index 05542cf..0c3bb70 100644
--- a/golden/go/ingestion_processors/tryjob_ingestion_test.go
+++ b/golden/go/ingestion_processors/tryjob_ingestion_test.go
@@ -102,7 +102,7 @@
mcls.On("PutPatchSet", testutils.AnyContext, makeGerritPatchSet()).Return(nil).Once()
mtjs.On("PutTryJob", testutils.AnyContext, gerritCombinedID, makeGerritBuildbucketTryJob()).Return(nil).Once()
- mtjs.On("PutResults", testutils.AnyContext, gerritCombinedID, gerritTJID, buildbucketCIS, makeTryJobResults()).Return(nil).Once()
+ mtjs.On("PutResults", testutils.AnyContext, gerritCombinedID, gerritTJID, buildbucketCIS, makeTryJobResults(), anyTime).Return(nil).Once()
gtp := goldTryjobProcessor{
changeListStore: mcls,
@@ -143,7 +143,7 @@
combinedID := tjstore.CombinedPSID{CL: githubCLID, PS: githubPSID, CRS: "github"}
mtjs.On("PutTryJob", testutils.AnyContext, combinedID, makeGitHubCirrusTryJob()).Return(nil)
- mtjs.On("PutResults", testutils.AnyContext, combinedID, githubTJID, cirrusCIS, makeGitHubTryJobResults()).Return(nil)
+ mtjs.On("PutResults", testutils.AnyContext, combinedID, githubTJID, cirrusCIS, makeGitHubTryJobResults(), anyTime).Return(nil)
gtp := goldTryjobProcessor{
changeListStore: mcls,
@@ -253,7 +253,7 @@
mcls.On("PutChangeList", testutils.AnyContext, clWithUpdatedTime(t, gerritCLID, gerritCLDate)).Return(nil)
mtjs.On("PutTryJob", testutils.AnyContext, gerritCombinedID, makeGerritBuildbucketTryJob()).Return(nil)
- mtjs.On("PutResults", testutils.AnyContext, gerritCombinedID, gerritTJID, buildbucketCIS, makeTryJobResults()).Return(nil)
+ mtjs.On("PutResults", testutils.AnyContext, gerritCombinedID, gerritTJID, buildbucketCIS, makeTryJobResults(), anyTime).Return(nil)
gtp := goldTryjobProcessor{
changeListStore: mcls,
@@ -290,7 +290,7 @@
mcls.On("PutChangeList", testutils.AnyContext, clWithUpdatedTime(t, gerritCLID, gerritCLDate)).Return(nil)
mtjs.On("PutTryJob", testutils.AnyContext, gerritCombinedID, makeGerritBuildbucketTryJob()).Return(nil)
- mtjs.On("PutResults", testutils.AnyContext, gerritCombinedID, gerritTJID, buildbucketCIS, makeTryJobResults()).Return(nil)
+ mtjs.On("PutResults", testutils.AnyContext, gerritCombinedID, gerritTJID, buildbucketCIS, makeTryJobResults(), anyTime).Return(nil)
gtp := goldTryjobProcessor{
changeListStore: mcls,
@@ -324,7 +324,7 @@
mcls.On("PutPatchSet", testutils.AnyContext, makeGerritPatchSet()).Return(nil)
mtjs.On("PutTryJob", testutils.AnyContext, gerritCombinedID, makeGerritBuildbucketTryJob()).Return(nil)
- mtjs.On("PutResults", testutils.AnyContext, gerritCombinedID, gerritTJID, buildbucketCIS, makeTryJobResults()).Return(nil)
+ mtjs.On("PutResults", testutils.AnyContext, gerritCombinedID, gerritTJID, buildbucketCIS, makeTryJobResults(), anyTime).Return(nil)
gtp := goldTryjobProcessor{
changeListStore: mcls,
@@ -368,6 +368,8 @@
gerritCombinedID = tjstore.CombinedPSID{CL: gerritCLID, PS: gerritPSID, CRS: gerritCRS}
gerritCLDate = time.Date(2019, time.August, 19, 18, 17, 16, 0, time.UTC)
+
+ anyTime = mock.MatchedBy(func(time.Time) bool { return true })
)
// These are functions to avoid mutations causing issues for future tests/checks
diff --git a/golden/go/tjstore/fs_tjstore/FIRESTORE.md b/golden/go/tjstore/fs_tjstore/FIRESTORE.md
index e51029e..7489838 100644
--- a/golden/go/tjstore/fs_tjstore/FIRESTORE.md
+++ b/golden/go/tjstore/fs_tjstore/FIRESTORE.md
@@ -47,9 +47,15 @@
- TryJobResult.OptionsHash
- Params.Map
-Currently, all queries can be handled via Firestore's index merging
-<https://firebase.google.com/docs/firestore/query-data/index-overview#taking_advantage_of_index_merging>
-because we chain "==" filters together.
+We need the following complex indices:
+
+Collection ID | Fields
+------------------------------------------------------------------
+tjstore_result | clid: ASC crs: ASC psid: ASC digest: ASC
+tjstore_result | clid: ASC crs: ASC psid: ASC ts: ASC
+
+The first is for fetching results in a sharded fashion. The second is for fetching results
+after a certain time.
Usage
-----
diff --git a/golden/go/tjstore/fs_tjstore/fs_tjstore.go b/golden/go/tjstore/fs_tjstore/fs_tjstore.go
index 414461e..3f9bf87 100644
--- a/golden/go/tjstore/fs_tjstore/fs_tjstore.go
+++ b/golden/go/tjstore/fs_tjstore/fs_tjstore.go
@@ -34,6 +34,7 @@
crsField = "crs"
patchSetIDField = "psid"
digestField = "digest"
+ timestampField = "ts"
maxReadAttempts = 5
maxWriteAttempts = 5
@@ -92,6 +93,8 @@
ResultParams map[string]string `firestore:"result_params"`
GroupParamsHash string `firestore:"group_hash"`
OptionsHash string `firestore:"options_hash"`
+
+ CreatedTS time.Time `firestore:"ts"`
}
// paramEntry represents a paramTools.Params stored in FireStore
@@ -171,19 +174,31 @@
return xtj, nil
}
-// GetResults implements the tjstore.Store interface. TODO(kjlubick) add support for updatedAfter
-// after the updated ingestion logic has baked in, filling the results with timestamps.
+// GetResults implements the tjstore.Store interface.
func (s *StoreImpl) GetResults(ctx context.Context, psID tjstore.CombinedPSID, updatedAfter time.Time) ([]tjstore.TryJobResult, error) {
defer metrics2.FuncTimer().Stop()
q := s.client.Collection(tjResultCollection).Where(crsField, "==", psID.CRS).
Where(changeListIDField, "==", psID.CL).Where(patchSetIDField, "==", psID.PS)
- shardResults := make([][]resultEntry, resultShards)
- queries := fs_utils.ShardOnDigest(q, digestField, resultShards)
+ shards := resultShards
+ var queries []firestore.Query
+ if !updatedAfter.IsZero() {
+ // If we are including a time, we can't shard, since sharding uses inequalities and firestore
+ // won't let you do two inequalities on different fields. This may result in reduced
+ // performance, but in practice this shouldn't matter too much because we:
+ // 1) Only provide a time when doing a partial load of results while indexing changelists.
+ // 2) Index changelists in the background (and in parallel), not in a user-visible way.
+ shards = 1
+ queries = []firestore.Query{q.Where(timestampField, ">=", updatedAfter)}
+ } else {
+ queries = fs_utils.ShardOnDigest(q, digestField, shards)
+ }
+
+ shardResults := make([][]resultEntry, shards)
// maps hash -> params we need to fetch
// We will first add keys to this map, then go fetch the actual params
- shardParams := make([]util.StringSet, resultShards)
+ shardParams := make([]util.StringSet, shards)
err := s.client.IterDocsInParallel(ctx, "GetResults", psID.Key(), queries, maxReadAttempts, maxOperationTime, func(i int, doc *firestore.DocumentSnapshot) error {
if doc == nil {
@@ -299,7 +314,7 @@
// This would make a rollback difficult, so we opt to retry any failures multiple times.
// We store maps first, so if we do fail, we can bail out w/o having written the
// (incomplete) TryJobResults. We take a similar approach in fs_expstore, which has been fine.
-func (s *StoreImpl) PutResults(ctx context.Context, psID tjstore.CombinedPSID, tjID, cisName string, r []tjstore.TryJobResult) error {
+func (s *StoreImpl) PutResults(ctx context.Context, psID tjstore.CombinedPSID, tjID, cisName string, r []tjstore.TryJobResult, ts time.Time) error {
if len(r) == 0 {
return nil
}
@@ -318,6 +333,8 @@
Digest: tr.Digest,
ResultParams: tr.ResultParams,
+
+ CreatedTS: ts,
}
gh, err := hashParams(tr.GroupParams)
if err != nil {
diff --git a/golden/go/tjstore/fs_tjstore/fs_tjstore_test.go b/golden/go/tjstore/fs_tjstore/fs_tjstore_test.go
index fd15d15..ad18bb0 100644
--- a/golden/go/tjstore/fs_tjstore/fs_tjstore_test.go
+++ b/golden/go/tjstore/fs_tjstore/fs_tjstore_test.go
@@ -244,7 +244,7 @@
})
}
- err := f.PutResults(ctx, psID, firstTJID, cis, xtr)
+ err := f.PutResults(ctx, psID, firstTJID, cis, xtr, time.Now())
assert.NoError(t, err)
gp = paramtools.Params{
@@ -273,7 +273,7 @@
Digest: fakeDigest("crust", 4),
})
- err = f.PutResults(ctx, psID, secondTJID, cis, xtr)
+ err = f.PutResults(ctx, psID, secondTJID, cis, xtr, time.Now())
assert.NoError(t, err)
otherPSID := tjstore.CombinedPSID{
@@ -291,7 +291,7 @@
types.PrimaryKeyField: "test-4",
},
Digest: "abcdef",
- }})
+ }}, time.Now())
assert.NoError(t, err)
xtr, err = f.GetResults(ctx, psID, time.Time{})
@@ -315,6 +315,98 @@
assert.Equal(t, 5, crustCounts)
}
+func TestPutResultsGetResults_Timestamps(t *testing.T) {
+ unittest.LargeTest(t)
+ c, cleanup := ifirestore.NewClientForTesting(context.Background(), t)
+ defer cleanup()
+
+ f := New(c)
+ ctx := context.Background()
+ const cis = "cirrus"
+ const firstDigest = types.Digest("1111111111111111")
+ const secondDigest = types.Digest("2222222222222222")
+ const tryjobID = "987654"
+
+ beforeTime := time.Date(2020, time.June, 1, 0, 0, 0, 0, time.UTC)
+ firstTime := time.Date(2020, time.June, 1, 1, 1, 1, 0, time.UTC)
+ inbetweenTime := time.Date(2020, time.June, 2, 2, 2, 2, 0, time.UTC)
+ secondTime := time.Date(2020, time.June, 3, 3, 3, 3, 0, time.UTC)
+ afterTime := time.Date(2020, time.June, 4, 4, 4, 4, 0, time.UTC)
+
+ psID := tjstore.CombinedPSID{
+ CL: "1234",
+ CRS: "github",
+ PS: "abcd",
+ }
+
+ gp := paramtools.Params{
+ "os": "Android",
+ "model": "crustacean",
+ }
+ op := paramtools.Params{
+ "ext": "png",
+ }
+
+ firstBatch := []tjstore.TryJobResult{
+ {
+ GroupParams: gp,
+ Options: op,
+ ResultParams: paramtools.Params{
+ types.PrimaryKeyField: "test-1",
+ },
+ Digest: firstDigest,
+ },
+ }
+
+ err := f.PutResults(ctx, psID, tryjobID, cis, firstBatch, firstTime)
+ assert.NoError(t, err)
+
+ secondBatch := []tjstore.TryJobResult{
+ {
+ GroupParams: gp,
+ Options: op,
+ ResultParams: paramtools.Params{
+ types.PrimaryKeyField: "test-2",
+ },
+ Digest: secondDigest,
+ },
+ }
+
+ err = f.PutResults(ctx, psID, tryjobID, cis, secondBatch, secondTime)
+ assert.NoError(t, err)
+
+ // Empty time is all results
+ results, err := f.GetResults(ctx, psID, time.Time{})
+ require.NoError(t, err)
+ assert.Len(t, results, 2)
+
+ // This time should still cover both results.
+ results, err = f.GetResults(ctx, psID, beforeTime)
+ require.NoError(t, err)
+ assert.Len(t, results, 2)
+
+ // range is greater than or equal to the given time.
+ results, err = f.GetResults(ctx, psID, firstTime)
+ require.NoError(t, err)
+ assert.Len(t, results, 2)
+
+ // We should only see the second one
+ results, err = f.GetResults(ctx, psID, inbetweenTime)
+ require.NoError(t, err)
+ require.Len(t, results, 1)
+ assert.Equal(t, secondDigest, results[0].Digest)
+
+ // range is greater than or equal to the given time.
+ results, err = f.GetResults(ctx, psID, secondTime)
+ require.NoError(t, err)
+ require.Len(t, results, 1)
+ assert.Equal(t, secondDigest, results[0].Digest)
+
+ results, err = f.GetResults(ctx, psID, afterTime)
+ require.NoError(t, err)
+ assert.Empty(t, results)
+}
+
// TestPutGetResultsNoOptions makes sure that options (which are optional) can be omitted
// and everything still works
func TestPutGetResultsNoOptions(t *testing.T) {
@@ -349,7 +441,7 @@
},
}
- err := f.PutResults(ctx, psID, tryJobID, cis, xtr)
+ err := f.PutResults(ctx, psID, tryJobID, cis, xtr, time.Now())
assert.NoError(t, err)
xtr, err = f.GetResults(ctx, psID, time.Time{})
@@ -407,7 +499,7 @@
})
}
- err := f.PutResults(ctx, psID, tryJobID, cis, xtr)
+ err := f.PutResults(ctx, psID, tryJobID, cis, xtr, time.Now())
assert.NoError(t, err)
xtr, err = f.GetResults(ctx, psID, time.Time{})
diff --git a/golden/go/tjstore/mocks/Store.go b/golden/go/tjstore/mocks/Store.go
index 733d932..2f7b8fa 100644
--- a/golden/go/tjstore/mocks/Store.go
+++ b/golden/go/tjstore/mocks/Store.go
@@ -85,13 +85,13 @@
return r0, r1
}
-// PutResults provides a mock function with given fields: ctx, psID, tjID, cisName, r
-func (_m *Store) PutResults(ctx context.Context, psID tjstore.CombinedPSID, tjID string, cisName string, r []tjstore.TryJobResult) error {
- ret := _m.Called(ctx, psID, tjID, cisName, r)
+// PutResults provides a mock function with given fields: ctx, psID, tjID, cisName, r, ts
+func (_m *Store) PutResults(ctx context.Context, psID tjstore.CombinedPSID, tjID string, cisName string, r []tjstore.TryJobResult, ts time.Time) error {
+ ret := _m.Called(ctx, psID, tjID, cisName, r, ts)
var r0 error
- if rf, ok := ret.Get(0).(func(context.Context, tjstore.CombinedPSID, string, string, []tjstore.TryJobResult) error); ok {
- r0 = rf(ctx, psID, tjID, cisName, r)
+ if rf, ok := ret.Get(0).(func(context.Context, tjstore.CombinedPSID, string, string, []tjstore.TryJobResult, time.Time) error); ok {
+ r0 = rf(ctx, psID, tjID, cisName, r, ts)
} else {
r0 = ret.Error(0)
}
diff --git a/golden/go/tjstore/tjstore.go b/golden/go/tjstore/tjstore.go
index 4bfb532..74f8dde 100644
--- a/golden/go/tjstore/tjstore.go
+++ b/golden/go/tjstore/tjstore.go
@@ -28,7 +28,9 @@
GetTryJobs(ctx context.Context, psID CombinedPSID) ([]ci.TryJob, error)
// GetResults returns any TryJobResults for a given ChangeList and PatchSet.
- // The returned slice could be empty and is not sorted.
+ // The returned slice could be empty and is not sorted. If updatedAfter is not
+ // a zero time, it will be used to return the subset of results created on or after
+ // the given time.
GetResults(ctx context.Context, psID CombinedPSID, updatedAfter time.Time) ([]TryJobResult, error)
// PutTryJob stores the given TryJob, overwriting any values for
@@ -38,10 +40,11 @@
// PutResults stores the given TryJobResult. The TryJobResults will "belong"
// to the associated ChangeList and PatchSet. tjID is the SystemID of the TryJob which
- // produced the TryJobResults.
+ // produced the TryJobResults. ts is the timestamp that will be used to mark the creating
+ // time of these results (see GetResults).
// Of note, a typical Skia TryJob might have 5-10k TryJobResult objects.
// An error may mean partial success.
- PutResults(ctx context.Context, psID CombinedPSID, tjID, cisName string, r []TryJobResult) error
+ PutResults(ctx context.Context, psID CombinedPSID, tjID, cisName string, r []TryJobResult, ts time.Time) error
}
var ErrNotFound = errors.New("not found")