[gold] Make CLSummaries aware of triage activity

We now correctly identify stale cache entries by looking
at both the last ingested Changelist data AND any triage
actions taken by a user.

We also take triage actions into account when warming
the caches.
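
Conceptually, a CL's "last updated" timestamp is now the
later of its last ingested data and its most recent triage
action. A minimal Go sketch of that rule (lastActivity is
an illustrative helper name; the production code computes
this in a single SQL query in ChangelistLastUpdated):

    package main

    import (
        "fmt"
        "time"
    )

    // lastActivity returns the later of the last ingested data and the
    // most recent triage action; staleness checks compare against this.
    func lastActivity(lastIngested, lastTriaged time.Time) time.Time {
        if lastTriaged.After(lastIngested) {
            return lastTriaged
        }
        return lastIngested
    }

    func main() {
        ingested := time.Date(2020, time.December, 10, 4, 5, 6, 0, time.UTC)
        triaged := time.Date(2020, time.December, 10, 5, 0, 2, 0, time.UTC)
        fmt.Println(lastActivity(ingested, triaged)) // the triage time wins here
    }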

This also changes how we handle stale cache entries: we
try to fetch the latest data, but if it is not available
in time (500ms), we return the stale data flagged as
such, so the client knows to retry later.

We went with this short timeout to balance keeping the
RPC responsive with serving fresh data when it can be
fetched quickly.
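
As a rough sketch of that pattern (simplified, with a
hypothetical fetchWithStaleFallback helper; the real
handler is getCLSummary2 in web.go):

    package main

    import (
        "context"
        "fmt"
        "time"
    )

    // fetchWithStaleFallback kicks off a refresh and waits up to 500ms
    // for it. If the refresh finishes in time, the fresh value is
    // returned; otherwise the stale value is returned with outdated set
    // to true so the client knows to retry later.
    func fetchWithStaleFallback(stale string, refresh func(context.Context) (string, error)) (string, bool) {
        done := make(chan string, 1) // buffered so the goroutine never blocks on send
        go func() {
            // Background context: the refresh keeps going even if we
            // stop waiting for it, so a later retry can be served.
            ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
            defer cancel()
            if v, err := refresh(ctx); err == nil {
                done <- v
            }
        }()
        select {
        case fresh := <-done:
            return fresh, false
        case <-time.After(500 * time.Millisecond):
            return stale, true
        }
    }

    func main() {
        v, outdated := fetchWithStaleFallback("stale summary", func(ctx context.Context) (string, error) {
            time.Sleep(2 * time.Second) // simulate a slow recomputation
            return "fresh summary", nil
        })
        fmt.Println(v, outdated) // prints: stale summary true
    }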

Bug: skia:11917
Change-Id: I88c95ad6819fb81feeec914862fb79acf2b693be
Reviewed-on: https://skia-review.googlesource.com/c/buildbot/+/401923
Reviewed-by: Leandro Lovisolo <lovisolo@google.com>
diff --git a/golden/go/search2/frontend/frontend.go b/golden/go/search2/frontend/frontend.go
index 5aab789..2935d53 100644
--- a/golden/go/search2/frontend/frontend.go
+++ b/golden/go/search2/frontend/frontend.go
@@ -13,6 +13,9 @@
 	ChangelistID string `json:"changelist_id"`
 	// PatchsetSummaries is a summary for all Patchsets for which we have data.
 	PatchsetSummaries []PatchsetNewAndUntriagedSummaryV1 `json:"patchsets"`
+	// Outdated will be true if this is a stale cached entry. Clients are free to try again later
+	// for the latest results.
+	Outdated bool `json:"outdated"`
 }
 
 // PatchsetNewAndUntriagedSummaryV1 is the summary for a specific PS. It focuses on the untriaged
@@ -55,5 +58,6 @@
 	return ChangelistSummaryResponseV1{
 		ChangelistID:      summary.ChangelistID,
 		PatchsetSummaries: xps,
+		Outdated:          summary.Outdated,
 	}
 }
diff --git a/golden/go/search2/search2.go b/golden/go/search2/search2.go
index 7d2c0c9..a6c59f8 100644
--- a/golden/go/search2/search2.go
+++ b/golden/go/search2/search2.go
@@ -53,6 +53,10 @@
 	// LastUpdated returns the timestamp of the CL, which corresponds to the last datapoint for
 	// this CL.
 	LastUpdated time.Time
+	// Outdated is set to true if the value that was previously cached was out of date and is
+	// currently being recalculated. We do this to return something quickly to the user (even if
+	// that something is slightly out of date).
+	Outdated bool
 }
 
 // PatchsetNewAndUntriagedSummary is the summary for a specific PS. It focuses on the untriaged
@@ -247,9 +251,9 @@
 	}
 	var updatedTS time.Time
 	eg.Go(func() error {
-		row := s.db.QueryRow(ctx, `SELECT last_ingested_data
-FROM Changelists WHERE changelist_id = $1`, qCLID)
-		return skerr.Wrap(row.Scan(&updatedTS))
+		var err error
+		updatedTS, err = s.ChangelistLastUpdated(ctx, qCLID)
+		return skerr.Wrap(err)
 	})
 	if err := eg.Wait(); err != nil {
 		return NewAndUntriagedSummary{}, skerr.Wrapf(err, "Getting counts for CL %q and %d PS", qCLID, len(patchsets))
@@ -365,8 +369,21 @@
 	ctx, span := trace.StartSpan(ctx, "search2_ChangelistLastUpdated")
 	defer span.End()
 	var updatedTS time.Time
-	row := s.db.QueryRow(ctx, `SELECT last_ingested_data
-FROM Changelists AS OF SYSTEM TIME '-0.1s' WHERE changelist_id = $1`, qCLID)
+	row := s.db.QueryRow(ctx, `WITH
+LastSeenData AS (
+	SELECT last_ingested_data as ts FROM Changelists
+	WHERE changelist_id = $1
+),
+LatestTriageAction AS (
+	SELECT triage_time as ts FROM ExpectationRecords
+	WHERE branch_name = $1
+	ORDER BY triage_time DESC LIMIT 1
+)
+SELECT ts FROM LastSeenData
+UNION
+SELECT ts FROM LatestTriageAction
+ORDER BY ts DESC LIMIT 1
+`, qCLID)
 	if err := row.Scan(&updatedTS); err != nil {
 		return time.Time{}, skerr.Wrapf(err, "Getting last updated ts for cl %q", qCLID)
 	}
diff --git a/golden/go/search2/search2_test.go b/golden/go/search2/search2_test.go
index d51b944..6575755 100644
--- a/golden/go/search2/search2_test.go
+++ b/golden/go/search2/search2_test.go
@@ -23,8 +23,10 @@
 	web_frontend "go.skia.org/infra/golden/go/web/frontend"
 )
 
-var changelistTSForIOS = time.Date(2020, time.December, 10, 4, 5, 6, 0, time.UTC)
-var changelistTSForNewTests = time.Date(2020, time.December, 12, 9, 20, 33, 0, time.UTC)
+// These are the later of the last ingested data time and the last triage action time for the
+// given CL.
+var changelistTSForIOS = time.Date(2020, time.December, 10, 5, 0, 2, 0, time.UTC)
+var changelistTSForNewTests = time.Date(2020, time.December, 12, 9, 31, 32, 0, time.UTC)
 
 func TestNewAndUntriagedSummaryForCL_OnePatchset_Success(t *testing.T) {
 	unittest.LargeTest(t)
@@ -67,7 +69,6 @@
 	require.NoError(t, s.StartCacheProcess(ctx, time.Minute, 100))
 	rv, err := s.NewAndUntriagedSummaryForCL(ctx, sql.Qualify(dks.GerritInternalCRS, dks.ChangelistIDThatAddsNewTests))
 	require.NoError(t, err)
-
 	assert.Equal(t, NewAndUntriagedSummary{
 		ChangelistID: dks.ChangelistIDThatAddsNewTests,
 		// Should be sorted by PatchsetOrder
@@ -396,10 +397,10 @@
 			{dks.ColorModeKey: dks.RGBColorMode, types.CorpusField: dks.CornersCorpus, types.PrimaryKeyField: dks.TriangleTest},
 			{dks.ColorModeKey: dks.RGBColorMode, types.CorpusField: dks.RoundCorpus, types.PrimaryKeyField: dks.CircleTest},
 		}).OptionsAll(paramtools.Params{"ext": "png"}).
-		FromTryjob("tryjob 3", dks.BuildBucketCIS, "My-Test", "whatever", "2021-04-01T02:03:04Z")
+		FromTryjob("tryjob 3", dks.BuildBucketCIS, "My-Test", "whatever", "2021-04-05T02:03:04Z")
 
-	// The digest was triaged negative after data from the third PS was ingested, but it should
-	// retroactively apply to all PS in the CL because we don't care about time.
+	// The digest was triaged negative after data from PS 2 but before PS 3 was ingested. We want
+	// to see that the latest data impacts the timestamp.
 	cl.AddTriageEvent(dks.UserFour, "2021-04-03T00:00:00Z").
 		ExpectationsForGrouping(paramtools.Params{
 			types.CorpusField: dks.RoundCorpus, types.PrimaryKeyField: dks.CircleTest}).
@@ -434,7 +435,7 @@
 			PatchsetID:           ps3ID,
 			PatchsetOrder:        7,
 		}},
-		LastUpdated: time.Date(2021, time.April, 1, 2, 3, 4, 0, time.UTC),
+		LastUpdated: time.Date(2021, time.April, 5, 2, 3, 4, 0, time.UTC),
 	}, rv)
 }
 
diff --git a/golden/go/sql/schema/sql.go b/golden/go/sql/schema/sql.go
index c435f4b..86bdab7 100644
--- a/golden/go/sql/schema/sql.go
+++ b/golden/go/sql/schema/sql.go
@@ -41,7 +41,8 @@
   branch_name STRING,
   user_name STRING NOT NULL,
   triage_time TIMESTAMP WITH TIME ZONE NOT NULL,
-  num_changes INT4 NOT NULL
+  num_changes INT4 NOT NULL,
+  INDEX branch_ts_idx (branch_name, triage_time)
 );
 CREATE TABLE IF NOT EXISTS Expectations (
   grouping_id BYTES,
diff --git a/golden/go/sql/schema/tables.go b/golden/go/sql/schema/tables.go
index 47c165a..c3e20ca 100644
--- a/golden/go/sql/schema/tables.go
+++ b/golden/go/sql/schema/tables.go
@@ -400,7 +400,8 @@
 	TriageTime time.Time `sql:"triage_time TIMESTAMP WITH TIME ZONE NOT NULL"`
 	// NumChanges is how many digests were affected. It corresponds to the number of
 	// ExpectationDelta rows have this record as their parent. It is a denormalized field.
-	NumChanges int `sql:"num_changes INT4 NOT NULL"`
+	NumChanges         int      `sql:"num_changes INT4 NOT NULL"`
+	branchTriagedIndex struct{} `sql:"INDEX branch_ts_idx (branch_name, triage_time)"`
 }
 
 // ToSQLRow implements the sqltest.SQLExporter interface.
diff --git a/golden/go/web/web.go b/golden/go/web/web.go
index d3e9bb9..7d54a90 100644
--- a/golden/go/web/web.go
+++ b/golden/go/web/web.go
@@ -1880,7 +1880,8 @@
 }
 
 // getCLSummary2 fetches, caches, and returns the summary for a given CL. If the result has already
-// been cached and the cached value is still valid, it will return that cached value.
+// been cached, it will return that cached value along with a flag indicating whether it is still
+// up to date. If the cached data is stale, it will spawn a goroutine to update the cached value.
 func (wh *Handlers) getCLSummary2(ctx context.Context, qCLID string) (search2.NewAndUntriagedSummary, error) {
 	ts, err := wh.Search2API.ChangelistLastUpdated(ctx, qCLID)
 	if err != nil {
@@ -1892,12 +1893,48 @@
 		sum, ok := cached.(search2.NewAndUntriagedSummary)
 		if ok {
 			if ts.Before(sum.LastUpdated) || sum.LastUpdated.Equal(ts) {
+				sum.Outdated = false
 				return sum, nil
 			}
+			// Result is stale. Start a goroutine to fetch it again.
+			done := make(chan struct{}, 1) // buffered so the send below cannot block if we time out
+			go func() {
+				// We intentionally use context.Background() and not the request's context: even if
+				// we return a stale result below, we want this fetch to keep running in the
+				// background so that a later retry from the client can be served the fresh result.
+				ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
+				defer cancel()
+				newValue, err := wh.Search2API.NewAndUntriagedSummaryForCL(ctx, qCLID)
+				if err != nil {
+					sklog.Warningf("Could not fetch out of date summary for cl %s in background: %s", qCLID, err)
+					return
+				}
+				wh.clSummaryCache.Add(qCLID, newValue)
+				done <- struct{}{}
+			}()
+			// Wait up to 500ms to return the latest value quickly if available
+			timer := time.NewTimer(500 * time.Millisecond)
+			defer timer.Stop()
+			select {
+			case <-done:
+			case <-timer.C:
+			}
+			cached, ok = wh.clSummaryCache.Get(qCLID)
+			if ok {
+				if possiblyUpdated, ok := cached.(search2.NewAndUntriagedSummary); ok {
+					if ts.Before(possiblyUpdated.LastUpdated) || possiblyUpdated.LastUpdated.Equal(ts) {
+						// We were able to fetch new data quickly, so return it now.
+						possiblyUpdated.Outdated = false
+						return possiblyUpdated, nil
+					}
+				}
+			}
+			// The cached data is still stale or invalid, so return what we have, marked as outdated.
+			sum.Outdated = true
+			return sum, nil
 		}
-		// Invalid or old cache entry - must overwrite.
 	}
-
+	// Invalid or missing cache entry. We must fetch because we have nothing to give the user.
 	sum, err := wh.Search2API.NewAndUntriagedSummaryForCL(ctx, qCLID)
 	if err != nil {
 		return search2.NewAndUntriagedSummary{}, skerr.Wrap(err)
@@ -1909,17 +1946,26 @@
 // StartCacheWarming starts a go routine to warm the CL Summary cache. This way, most summaries are
 // responsive, even on big instances.
 func (wh *Handlers) StartCacheWarming(ctx context.Context) {
-	// We warm every CL that was open and produced data in the last 5 days. After the first cycle,
-	// we will incrementally update the cache.
+	// We warm every CL that was open and produced data or saw triage activity in the last 5 days.
+	// After the first cycle, we will incrementally update the cache.
 	lastCheck := now.Now(ctx).Add(-5 * 24 * time.Hour)
 	go util.RepeatCtx(ctx, time.Minute, func(ctx context.Context) {
 		ctx, span := trace.StartSpan(ctx, "web_warmCacheCycle", trace.WithSampler(trace.AlwaysSample()))
 		defer span.End()
 		newTS := now.Now(ctx)
-		rows, err := wh.DB.Query(ctx, `
-SELECT changelist_id FROM Changelists AS OF SYSTEM TIME '-0.1s'
-WHERE status = 'open' and last_ingested_data > $1
-ORDER BY last_ingested_data DESC`, lastCheck)
+		rows, err := wh.DB.Query(ctx, `WITH
+ChangelistsWithNewData AS (
+	SELECT changelist_id FROM Changelists
+	WHERE status = 'open' and last_ingested_data > $1
+),
+ChangelistsWithTriageActivity AS (
+	SELECT DISTINCT branch_name AS changelist_id FROM ExpectationRecords
+	WHERE branch_name IS NOT NULL AND triage_time > $1
+)
+SELECT changelist_id FROM ChangelistsWithNewData
+UNION
+SELECT changelist_id FROM ChangelistsWithTriageActivity
+`, lastCheck)
 		if err != nil {
 			if err == pgx.ErrNoRows {
 				sklog.Infof("No CLS updated since %s", lastCheck)
@@ -1940,7 +1986,7 @@
 		}
 		sklog.Infof("Warming cache for %d CLs", len(qualifiedIDS))
 		span.AddAttributes(trace.Int64Attribute("num_cls", int64(len(qualifiedIDS))))
-		// warm cache 3 at a time.
+		// warm cache 3 at a time. This number of goroutines was chosen arbitrarily.
 		_ = util.ChunkIterParallel(ctx, len(qualifiedIDS), len(qualifiedIDS)/3+1, func(ctx context.Context, startIdx int, endIdx int) error {
 			if err := ctx.Err(); err != nil {
 				return nil
diff --git a/golden/go/web/web_test.go b/golden/go/web/web_test.go
index 472c4cd..7d03712 100644
--- a/golden/go/web/web_test.go
+++ b/golden/go/web/web_test.go
@@ -2883,11 +2883,11 @@
 	})
 	wh.ChangelistSummaryHandler(w, r)
 	// Note this JSON had the patchsets sorted so the latest one is first.
-	const expectedJSON = `{"changelist_id":"my_cl","patchsets":[{"new_images":5,"new_untriaged_images":6,"total_untriaged_images":7,"patchset_id":"patchset8","patchset_order":8},{"new_images":1,"new_untriaged_images":2,"total_untriaged_images":3,"patchset_id":"patchset1","patchset_order":1}]}`
+	const expectedJSON = `{"changelist_id":"my_cl","patchsets":[{"new_images":5,"new_untriaged_images":6,"total_untriaged_images":7,"patchset_id":"patchset8","patchset_order":8},{"new_images":1,"new_untriaged_images":2,"total_untriaged_images":3,"patchset_id":"patchset1","patchset_order":1}],"outdated":false}`
 	assertJSONResponseWas(t, http.StatusOK, expectedJSON, w)
 }
 
-func TestChangelistSummaryHandler_ValidInput_CacheUsed(t *testing.T) {
+func TestChangelistSummaryHandler_CachedValueStaleButUpdatesQuickly_ReturnsFreshResult(t *testing.T) {
 	unittest.SmallTest(t)
 
 	ms := &mock_search2.API{}
@@ -2945,7 +2945,75 @@
 			continue
 		}
 		// Note this JSON had the patchsets sorted so the latest one is first.
-		const expectedJSON = `{"changelist_id":"my_cl","patchsets":[{"new_images":5,"new_untriaged_images":6,"total_untriaged_images":7,"patchset_id":"patchset8","patchset_order":8},{"new_images":1,"new_untriaged_images":2,"total_untriaged_images":3,"patchset_id":"patchset1","patchset_order":1}]}`
+		const expectedJSON = `{"changelist_id":"my_cl","patchsets":[{"new_images":5,"new_untriaged_images":6,"total_untriaged_images":7,"patchset_id":"patchset8","patchset_order":8},{"new_images":1,"new_untriaged_images":2,"total_untriaged_images":3,"patchset_id":"patchset1","patchset_order":1}],"outdated":false}`
+		assertJSONResponseWas(t, http.StatusOK, expectedJSON, w)
+	}
+	ms.AssertExpectations(t)
+}
+
+func TestChangelistSummaryHandler_CachedValueStaleUpdatesSlowly_ReturnsStaleResult(t *testing.T) {
+	unittest.SmallTest(t)
+
+	ms := &mock_search2.API{}
+	// First call should have just one PS.
+	ms.On("NewAndUntriagedSummaryForCL", testutils.AnyContext, "my-system_my_cl").Return(search2.NewAndUntriagedSummary{
+		ChangelistID: "my_cl",
+		PatchsetSummaries: []search2.PatchsetNewAndUntriagedSummary{{
+			NewImages:            1,
+			NewUntriagedImages:   2,
+			TotalUntriagedImages: 3,
+			PatchsetID:           "patchset1",
+			PatchsetOrder:        1,
+		}},
+		LastUpdated: time.Date(2021, time.March, 1, 1, 1, 1, 0, time.UTC),
+	}, nil).Once()
+	// Second call should have two PS and the latest timestamp.
+	ms.On("NewAndUntriagedSummaryForCL", testutils.AnyContext, "my-system_my_cl").Return(func(context.Context, string) search2.NewAndUntriagedSummary {
+		// This is longer than the time we wait before giving up and returning stale results.
+		time.Sleep(2 * time.Second)
+		return search2.NewAndUntriagedSummary{
+			ChangelistID: "my_cl",
+			PatchsetSummaries: []search2.PatchsetNewAndUntriagedSummary{{
+				NewImages:            1,
+				NewUntriagedImages:   2,
+				TotalUntriagedImages: 3,
+				PatchsetID:           "patchset1",
+				PatchsetOrder:        1,
+			}, {
+				NewImages:            5,
+				NewUntriagedImages:   6,
+				TotalUntriagedImages: 7,
+				PatchsetID:           "patchset8",
+				PatchsetOrder:        8,
+			}},
+			LastUpdated: time.Date(2021, time.April, 1, 1, 1, 1, 0, time.UTC),
+		}
+	}, nil).Once()
+	ms.On("ChangelistLastUpdated", testutils.AnyContext, "my-system_my_cl").Return(time.Date(2021, time.April, 1, 1, 1, 1, 0, time.UTC), nil)
+
+	wh := initCaches(Handlers{
+		HandlersConfig: HandlersConfig{
+			Search2API: ms,
+			ReviewSystems: []clstore.ReviewSystem{{
+				ID: "my-system",
+			}},
+		},
+		anonymousGerritQuota: rate.NewLimiter(rate.Inf, 1),
+	})
+
+	for i := 0; i < 2; i++ {
+		w := httptest.NewRecorder()
+		r := httptest.NewRequest(http.MethodGet, requestURL, nil)
+		r = mux.SetURLVars(r, map[string]string{
+			"id":     "my_cl",
+			"system": "my-system",
+		})
+		wh.ChangelistSummaryHandler(w, r)
+		if i == 0 {
+			continue
+		}
+		// Note this JSON is the first result, marked as outdated so the client knows to retry.
+		const expectedJSON = `{"changelist_id":"my_cl","patchsets":[{"new_images":1,"new_untriaged_images":2,"total_untriaged_images":3,"patchset_id":"patchset1","patchset_order":1}],"outdated":true}`
 		assertJSONResponseWas(t, http.StatusOK, expectedJSON, w)
 	}
 	ms.AssertExpectations(t)
@@ -3048,7 +3116,6 @@
 	defer cancel()
 	db := sqltest.NewCockroachDBForTestsWithProductionSchema(ctx, t)
 	require.NoError(t, sqltest.BulkInsertDataTables(ctx, db, datakitchensink.Build()))
-	waitForSystemTime()
 
 	wh := initCaches(Handlers{
 		HandlersConfig: HandlersConfig{