Gold Search: Start reading from cache before checking materialized views.

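When a cache client is configured, search now serves the by blame data
(untriaged digests at head, per corpus) from the cache and falls back to
the direct SQL query on a cache miss. The cache_corpora and
caching_frequency_minutes options move from the periodic tasks config
into the shared instance config so that the frontend can read them too.
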
Change-Id: Iee6daedfe588859df4ae182a272fd4782eccaa0a
Reviewed-on: https://skia-review.googlesource.com/c/buildbot/+/913960
Commit-Queue: Ashwin Verleker <ashwinpv@google.com>
Reviewed-by: Hao Wu <haowoo@google.com>
diff --git a/golden/cmd/gold_frontend/gold_frontend.go b/golden/cmd/gold_frontend/gold_frontend.go
index b0862de..e077ab6 100644
--- a/golden/cmd/gold_frontend/gold_frontend.go
+++ b/golden/cmd/gold_frontend/gold_frontend.go
@@ -169,9 +169,19 @@
 	}
 
 	s2a := search.New(sqlDB, fsc.WindowSize)
+	cacheClient, err := fsc.GetCacheClient(ctx)
+	if err != nil {
+		// TODO(ashwinpv): Once we are fully onboarded, this error should cause a failure.
+		sklog.Warningf("Error while trying to create a new cache client: %v", err)
+	}
+	if cacheClient != nil {
+		sklog.Debugf("Enabling cache for search.")
+		s2a.EnableCache(cacheClient, fsc.CachingCorpora)
+	}
+
 	s2a.SetReviewSystemTemplates(templates)
 	sklog.Infof("SQL Search loaded with CRS templates %s", templates)
-	err := s2a.StartCacheProcess(ctx, 5*time.Minute, fsc.WindowSize)
+	err = s2a.StartCacheProcess(ctx, 5*time.Minute, fsc.WindowSize)
 	if err != nil {
 		sklog.Fatalf("Cannot load caches for search2 backend: %s", err)
 	}
diff --git a/golden/cmd/periodictasks/periodictasks.go b/golden/cmd/periodictasks/periodictasks.go
index f5e1640..74b06a9 100644
--- a/golden/cmd/periodictasks/periodictasks.go
+++ b/golden/cmd/periodictasks/periodictasks.go
@@ -83,12 +83,6 @@
 
 	// UpdateIgnorePeriod is how often we should try to apply the ignore rules to all traces.
 	UpdateIgnorePeriod config.Duration `json:"update_traces_ignore_period"` // TODO(kjlubick) change JSON
-
-	// List of corpora to be enabled for caching.
-	CachingCorpora []string `json:"cache_corpora" optional:"true"`
-
-	// Caching frequency in minutes.
-	CachingFrequencyMinutes int `json:"caching_frequency_minutes" optional:"true"`
 }
 
 type perfSummariesConfig struct {
diff --git a/golden/go/config/config.go b/golden/go/config/config.go
index 3aeb309..9a3c41a 100644
--- a/golden/go/config/config.go
+++ b/golden/go/config/config.go
@@ -87,6 +87,12 @@
 
 	// RedisConfig provides configuration for redis instance to be used for caching.
 	RedisConfig redis.RedisConfig `json:"redis_config" optional:"true"`
+
+	// List of corpora to be enabled for caching.
+	CachingCorpora []string `json:"cache_corpora" optional:"true"`
+
+	// Caching frequency in minutes.
+	CachingFrequencyMinutes int `json:"caching_frequency_minutes" optional:"true"`
 }
 
 // GetCacheClient returns a cache client based on the configuration.
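
For reference, the keys for these two fields come verbatim from the json tags above, and both are marked optional, so existing instance configs keep working unchanged. A hypothetical config fragment that opts in (the corpus names and frequency below are placeholders, not taken from any real instance config) would add:

    "cache_corpora": ["gm", "svg"],
    "caching_frequency_minutes": 60,
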
diff --git a/golden/go/search/BUILD.bazel b/golden/go/search/BUILD.bazel
index 138c97e..c17cf28 100644
--- a/golden/go/search/BUILD.bazel
+++ b/golden/go/search/BUILD.bazel
@@ -7,12 +7,14 @@
     importpath = "go.skia.org/infra/golden/go/search",
     visibility = ["//visibility:public"],
     deps = [
+        "//go/cache",
         "//go/paramtools",
         "//go/skerr",
         "//go/sklog",
         "//go/util",
         "//golden/go/expectations",
         "//golden/go/publicparams",
+        "//golden/go/search/caching",
         "//golden/go/search/query",
         "//golden/go/sql",
         "//golden/go/sql/schema",
diff --git a/golden/go/search/caching/BUILD.bazel b/golden/go/search/caching/BUILD.bazel
index 2018994..3c14e5e 100644
--- a/golden/go/search/caching/BUILD.bazel
+++ b/golden/go/search/caching/BUILD.bazel
@@ -25,6 +25,7 @@
     embed = [":caching"],
     deps = [
         "//go/cache/mock",
+        "//go/deepequal/assertdeep",
         "//golden/go/sql/datakitchensink",
         "//golden/go/sql/sqltest",
         "@com_github_jackc_pgx_v4//pgxpool",
diff --git a/golden/go/search/caching/byblame.go b/golden/go/search/caching/byblame.go
index 7d024c6..7ffde8f 100644
--- a/golden/go/search/caching/byblame.go
+++ b/golden/go/search/caching/byblame.go
@@ -5,7 +5,6 @@
 
 	"github.com/jackc/pgx/v4/pgxpool"
 	"go.skia.org/infra/go/skerr"
-	"go.skia.org/infra/golden/go/sql/schema"
 )
 
 // This query collects untriaged image digests within the specified commit window for the given
@@ -33,13 +32,6 @@
 	 UntriagedDigests.digest = UnignoredDataAtHead.digest`
 )
 
-// ByBlameData provides a struct to hold data for the entry in by blame cache.
-type ByBlameData struct {
-	TraceID    schema.TraceID     `json:"traceID"`
-	GroupingID schema.GroupingID  `json:"groupingID"`
-	Digest     schema.DigestBytes `json:"digest"`
-}
-
 // ByBlameDataProvider implements cacheDataProvider.
 type ByBlameDataProvider struct {
 	db           *pgxpool.Pool
@@ -55,27 +47,36 @@
 	}
 }
 
+// GetDataForCorpus returns the by blame data for the given corpus, reading it directly from the database.
+func (prov ByBlameDataProvider) GetDataForCorpus(ctx context.Context, corpus string) ([]ByBlameData, error) {
+	cacheData := []ByBlameData{}
+	rows, err := prov.db.Query(ctx, ByBlameQuery, prov.commitWindow, corpus)
+	if err != nil {
+		return nil, err
+	}
+	for rows.Next() {
+		byBlameData := ByBlameData{}
+		if err := rows.Scan(&byBlameData.TraceID, &byBlameData.GroupingID, &byBlameData.Digest); err != nil {
+			return nil, skerr.Wrap(err)
+		}
+		cacheData = append(cacheData, byBlameData)
+	}
+
+	return cacheData, nil
+}
+
 // GetCacheData implements cacheDataProvider.
 func (prov ByBlameDataProvider) GetCacheData(ctx context.Context) (map[string]string, error) {
 	cacheMap := map[string]string{}
 
 	// For each of the corpora, execute the sql query and add the results to the map.
 	for _, corpus := range prov.corpora {
-		key := ByBlameKey(corpus)
-		cacheData := []ByBlameData{}
-		rows, err := prov.db.Query(ctx, ByBlameQuery, prov.commitWindow, corpus)
+		cacheData, err := prov.GetDataForCorpus(ctx, corpus)
 		if err != nil {
 			return nil, err
 		}
-		for rows.Next() {
-			byBlameData := ByBlameData{}
-			if err := rows.Scan(&byBlameData.TraceID, &byBlameData.GroupingID, &byBlameData.Digest); err != nil {
-				return nil, skerr.Wrap(err)
-			}
-			cacheData = append(cacheData, byBlameData)
-		}
-
 		if len(cacheData) > 0 {
+			key := ByBlameKey(corpus)
 			cacheDataStr, err := toJSON(cacheData)
 			if err != nil {
 				return nil, skerr.Wrap(err)
diff --git a/golden/go/search/caching/common.go b/golden/go/search/caching/common.go
index d46bac9..6e04b2e 100644
--- a/golden/go/search/caching/common.go
+++ b/golden/go/search/caching/common.go
@@ -3,8 +3,17 @@
 import (
 	"context"
 	"encoding/json"
+
+	"go.skia.org/infra/golden/go/sql/schema"
 )
 
+// ByBlameData holds the data for a single entry in the by blame cache.
+type ByBlameData struct {
+	TraceID    schema.TraceID     `json:"traceID"`
+	GroupingID schema.GroupingID  `json:"groupingID"`
+	Digest     schema.DigestBytes `json:"digest"`
+}
+
 // cacheDataProvider provides an interface for getting cache data.
 type cacheDataProvider interface {
 	GetCacheData(ctx context.Context) (map[string]string, error)
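
A note on the cached representation: the ByBlameData fields are byte-slice-backed schema types (the tests below assign []byte literals to them), so encoding/json writes each field as a base64 string in the cached value. A small, self-contained sketch of that round trip, using plain []byte fields as stand-ins for the schema types:

package main

import (
	"encoding/json"
	"fmt"
)

// byBlameEntry mirrors caching.ByBlameData, with plain byte slices standing
// in for schema.TraceID, schema.GroupingID and schema.DigestBytes.
type byBlameEntry struct {
	TraceID    []byte `json:"traceID"`
	GroupingID []byte `json:"groupingID"`
	Digest     []byte `json:"digest"`
}

func main() {
	entries := []byBlameEntry{{
		TraceID:    []byte("trace1"),
		GroupingID: []byte("group1"),
		Digest:     []byte("d1"),
	}}

	// Byte slices are encoded as base64, e.g. "traceID":"dHJhY2Ux".
	b, _ := json.Marshal(entries)
	fmt.Println(string(b))

	// Decoding reverses it, which is what GetByBlameData relies on.
	var decoded []byBlameEntry
	_ = json.Unmarshal(b, &decoded)
	fmt.Println(string(decoded[0].TraceID)) // trace1
}
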
diff --git a/golden/go/search/caching/searchCache.go b/golden/go/search/caching/searchCache.go
index 06a4bea..f13dec4 100644
--- a/golden/go/search/caching/searchCache.go
+++ b/golden/go/search/caching/searchCache.go
@@ -2,18 +2,27 @@
 
 import (
 	"context"
+	"encoding/json"
 
 	"github.com/jackc/pgx/v4/pgxpool"
 	"go.skia.org/infra/go/cache"
 	"go.skia.org/infra/go/skerr"
 )
 
+// SearchCacheType denotes the type of data stored in the search cache.
+type SearchCacheType int
+
+const (
+	// ByBlame_Corpus denotes the cache type for the by blame data (untriaged digests at head) for a given corpus.
+	ByBlame_Corpus SearchCacheType = iota
+)
+
 // SearchCacheManager provides a struct to handle the cache operations for gold search.
 type SearchCacheManager struct {
-	cacheClient  cache.Cache
-	db           *pgxpool.Pool
-	corpora      []string
-	commitWindow int
+	cacheClient   cache.Cache
+	db            *pgxpool.Pool
+	corpora       []string
+	commitWindow  int
+	dataProviders map[SearchCacheType]cacheDataProvider
 }
 
 // New returns a new instance of the SearchCacheManager.
@@ -23,20 +32,15 @@
 		db:           db,
 		corpora:      corpora,
 		commitWindow: commitWindow,
-	}
-}
-
-// getCacheDataProviders returns a list of cacheDataProviders
-func (s SearchCacheManager) getCacheDataProviders() []cacheDataProvider {
-	return []cacheDataProvider{
-		NewByBlameDataProvider(s.db, s.corpora, s.commitWindow),
+		dataProviders: map[SearchCacheType]cacheDataProvider{
+			ByBlame_Corpus: NewByBlameDataProvider(db, corpora, commitWindow),
+		},
 	}
 }
 
 // RunCachePopulation gets the cache data from the providers and stores it in the cache instance.
 func (s SearchCacheManager) RunCachePopulation(ctx context.Context) error {
-	providers := s.getCacheDataProviders()
-	for _, prov := range providers {
+	for _, prov := range s.dataProviders {
 		data, err := prov.GetCacheData(ctx)
 		if err != nil {
 			return skerr.Wrapf(err, "Error while running cache population with provider %s", prov)
@@ -52,3 +56,22 @@
 
 	return nil
 }
+
+// GetByBlameData returns the by blame data for the given corpus from cache.
+func (s SearchCacheManager) GetByBlameData(ctx context.Context, corpus string) ([]ByBlameData, error) {
+	cacheKey := ByBlameKey(corpus)
+	data := []ByBlameData{}
+	jsonStr, err := s.cacheClient.GetValue(ctx, cacheKey)
+	if err != nil {
+		return data, skerr.Wrapf(err, "Error retrieving by blame data from cache for key %s (corpus %s)", cacheKey, corpus)
+	}
+
+	// Cache miss: fall back to querying the database directly for this corpus.
+	if jsonStr == "" {
+		provider := s.dataProviders[ByBlame_Corpus].(ByBlameDataProvider)
+		return provider.GetDataForCorpus(ctx, corpus)
+	}
+
+	err = json.Unmarshal([]byte(jsonStr), &data)
+	return data, err
+}
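
Taken together, a caller owns both sides of the cache: something periodic calls RunCachePopulation to refresh the stored values, while readers call GetByBlameData and fall back transparently to the per-corpus query on a miss. A rough sketch of that wiring (illustrative only, not the actual periodictasks or frontend code; the cache client and pgx pool are assumed to be constructed elsewhere, and the corpus, cadence and commit window are placeholders):

package sketch

import (
	"context"
	"time"

	"github.com/jackc/pgx/v4/pgxpool"
	"go.skia.org/infra/go/cache"
	"go.skia.org/infra/go/sklog"
	"go.skia.org/infra/golden/go/search/caching"
)

func runSearchCache(ctx context.Context, cacheClient cache.Cache, db *pgxpool.Pool) {
	const commitWindow = 256 // e.g. fsc.WindowSize
	mgr := caching.New(cacheClient, db, []string{"gm"}, commitWindow)

	// Writer side: repopulate every provider's entries on a fixed cadence
	// (CachingFrequencyMinutes in the instance config).
	go func() {
		for range time.Tick(30 * time.Minute) {
			if err := mgr.RunCachePopulation(ctx); err != nil {
				sklog.Errorf("Cache population failed: %s", err)
			}
		}
	}()

	// Reader side: a hit returns the decoded entries, a miss falls back to
	// ByBlameDataProvider.GetDataForCorpus, i.e. a direct database query.
	if data, err := mgr.GetByBlameData(ctx, "gm"); err == nil {
		sklog.Infof("%d by blame entries for corpus gm", len(data))
	}
}
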
diff --git a/golden/go/search/caching/searchCache_test.go b/golden/go/search/caching/searchCache_test.go
index bfd22fe..8ec971a 100644
--- a/golden/go/search/caching/searchCache_test.go
+++ b/golden/go/search/caching/searchCache_test.go
@@ -9,6 +9,7 @@
 	"github.com/stretchr/testify/mock"
 	"github.com/stretchr/testify/require"
 	mockCache "go.skia.org/infra/go/cache/mock"
+	"go.skia.org/infra/go/deepequal/assertdeep"
 	dks "go.skia.org/infra/golden/go/sql/datakitchensink"
 	"go.skia.org/infra/golden/go/sql/sqltest"
 )
@@ -42,3 +43,49 @@
 	err := searchCacheManager.RunCachePopulation(ctx)
 	assert.Nil(t, err)
 }
+
+func TestReadFromCache_CacheHit_Success(t *testing.T) {
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	db := useKitchenSinkData(ctx, t)
+	cacheClient := mockCache.NewCache(t)
+	corpus := dks.RoundCorpus
+	cacheResults := []ByBlameData{
+		{
+			TraceID:    []byte("trace1"),
+			GroupingID: []byte("group1"),
+			Digest:     []byte("d1"),
+		},
+		{
+			TraceID:    []byte("trace2"),
+			GroupingID: []byte("group2"),
+			Digest:     []byte("d2"),
+		},
+	}
+	cacheClient.On("GetValue", ctx, ByBlameKey(corpus)).Return(toJSON(cacheResults))
+
+	searchCacheManager := New(cacheClient, db, []string{corpus}, 5)
+	data, err := searchCacheManager.GetByBlameData(ctx, corpus)
+	assert.Nil(t, err)
+	assert.NotNil(t, data)
+	assertdeep.Equal(t, cacheResults, data)
+	cacheClient.AssertNumberOfCalls(t, "GetValue", 1)
+}
+
+func TestReadFromCache_CacheMiss_Success(t *testing.T) {
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	db := useKitchenSinkData(ctx, t)
+	cacheClient := mockCache.NewCache(t)
+	corpus := dks.RoundCorpus
+
+	cacheClient.On("GetValue", ctx, ByBlameKey(corpus)).Return("", nil)
+
+	searchCacheManager := New(cacheClient, db, []string{corpus}, 5)
+	data, err := searchCacheManager.GetByBlameData(ctx, corpus)
+	assert.Nil(t, err)
+	assert.NotNil(t, data)
+	// Even when there is a cache miss, we should have data because the manager falls back to the database query.
+	assert.True(t, len(data) > 0)
+	cacheClient.AssertNumberOfCalls(t, "GetValue", 1)
+}
diff --git a/golden/go/search/search.go b/golden/go/search/search.go
index c03f559..aff5010 100644
--- a/golden/go/search/search.go
+++ b/golden/go/search/search.go
@@ -21,12 +21,14 @@
 	"go.opencensus.io/trace"
 	"golang.org/x/sync/errgroup"
 
+	"go.skia.org/infra/go/cache"
 	"go.skia.org/infra/go/paramtools"
 	"go.skia.org/infra/go/skerr"
 	"go.skia.org/infra/go/sklog"
 	"go.skia.org/infra/go/util"
 	"go.skia.org/infra/golden/go/expectations"
 	"go.skia.org/infra/golden/go/publicparams"
+	"go.skia.org/infra/golden/go/search/caching"
 	"go.skia.org/infra/golden/go/search/query"
 	"go.skia.org/infra/golden/go/sql"
 	"go.skia.org/infra/golden/go/sql/schema"
@@ -204,6 +206,7 @@
 	paramsetCache        *ttlcache.Cache
 
 	materializedViews map[string]bool
+	cacheManager      *caching.SearchCacheManager
 }
 
 // New returns an implementation of API.
@@ -233,6 +236,11 @@
 	}
 }
 
+// EnableCache enables reading data from cache for search.
+func (s *Impl) EnableCache(cacheClient cache.Cache, cacheCorpora []string) {
+	s.cacheManager = caching.New(cacheClient, s.db, cacheCorpora, s.windowLength)
+}
+
 // SetReviewSystemTemplates sets the URL templates that are used to link to the code review system.
 // The Changelist ID will be substituted in using fmt.Sprintf and a %s placeholder.
 func (s *Impl) SetReviewSystemTemplates(m map[string]string) {
@@ -2819,6 +2827,31 @@
 func (s *Impl) getTracesWithUntriagedDigestsAtHead(ctx context.Context, corpus string) (map[groupingDigestKey][]schema.TraceID, error) {
 	ctx, span := trace.StartSpan(ctx, "getTracesWithUntriagedDigestsAtHead")
 	defer span.End()
+
+	// If caching is enabled, serve the by blame data via the cache (falling back to the database on a miss).
+	if s.cacheManager != nil {
+		sklog.Debugf("Search cache is enabled.")
+		byBlameData, err := s.cacheManager.GetByBlameData(ctx, corpus)
+		if err != nil {
+			sklog.Errorf("Error encountered when retrieving ByBlame data from cache: %v", err)
+			return nil, err
+		}
+
+		sklog.Debugf("Retrieved %d items from search cache for corpus %s", len(byBlameData), corpus)
+		rv := map[groupingDigestKey][]schema.TraceID{}
+		var key groupingDigestKey
+		groupingKey := key.groupingID[:]
+		digestKey := key.digest[:]
+		for _, data := range byBlameData {
+			copy(groupingKey, data.GroupingID)
+			copy(digestKey, data.Digest)
+			rv[key] = append(rv[key], data.TraceID)
+		}
+
+		return rv, nil
+	}
+
+	sklog.Debugf("Search cache is not enabled. Proceeding with regular search.")
 	statement := `WITH
 UntriagedDigests AS (
 	SELECT grouping_id, digest FROM Expectations
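
One non-obvious detail in the cache branch above: the loop reuses a single groupingDigestKey. Slicing its fixed-size array fields once gives writable views into the key's own storage, each copy() rewrites the key in place, and because Go copies map keys on insertion, every rv[key] entry keeps its own distinct key. A self-contained sketch of the same pattern with a stand-in key type (the real groupingDigestKey presumably wraps MD5-hash arrays from the schema package):

package main

import "fmt"

// blameKey stands in for groupingDigestKey: fixed-size byte arrays make the
// struct comparable, so it can be used directly as a map key.
type blameKey struct {
	groupingID [16]byte
	digest     [16]byte
}

func main() {
	// Two rows sharing a grouping but with different digests.
	rows := [][2][]byte{
		{[]byte("grouping-aaaa-01"), []byte("digest-bbbb-0001")},
		{[]byte("grouping-aaaa-01"), []byte("digest-bbbb-0002")},
	}

	rv := map[blameKey][]string{}
	var key blameKey
	// These slices alias key's backing arrays, so copy() below mutates key itself.
	groupingKey := key.groupingID[:]
	digestKey := key.digest[:]
	for i, row := range rows {
		copy(groupingKey, row[0])
		copy(digestKey, row[1])
		// Map insertion copies the key value, so reusing key across iterations is safe.
		rv[key] = append(rv[key], fmt.Sprintf("trace-%d", i))
	}
	fmt.Println(len(rv)) // 2: one entry per distinct (grouping, digest) pair
}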