Gold Search: Start reading from cache before checking materialized views.
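
When a cache client is configured, gold_frontend now enables the cache on the
search backend, and getTracesWithUntriagedDigestsAtHead serves the by-blame
data through SearchCacheManager.GetByBlameData. On a cache miss, GetByBlameData
falls back to querying the database directly, so callers see the same results
whether or not the cache is warm.

A minimal sketch of the new wiring (simplified from gold_frontend.go in this
change; error handling omitted):

    s2a := search.New(sqlDB, fsc.WindowSize)
    cacheClient, _ := fsc.GetCacheClient(ctx)
    if cacheClient != nil {
        // Untriaged-by-blame reads for these corpora now hit the cache first.
        s2a.EnableCache(cacheClient, fsc.CachingCorpora)
    }
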
Change-Id: Iee6daedfe588859df4ae182a272fd4782eccaa0a
Reviewed-on: https://skia-review.googlesource.com/c/buildbot/+/913960
Commit-Queue: Ashwin Verleker <ashwinpv@google.com>
Reviewed-by: Hao Wu <haowoo@google.com>
diff --git a/golden/cmd/gold_frontend/gold_frontend.go b/golden/cmd/gold_frontend/gold_frontend.go
index b0862de..e077ab6 100644
--- a/golden/cmd/gold_frontend/gold_frontend.go
+++ b/golden/cmd/gold_frontend/gold_frontend.go
@@ -169,9 +169,19 @@
}
s2a := search.New(sqlDB, fsc.WindowSize)
+ cacheClient, err := fsc.GetCacheClient(ctx)
+ if err != nil {
+ // TODO(ashwinpv): Once we are fully onboarded, this error should cause a failure.
+ sklog.Warningf("Error while trying to create a new cache client: %v", err)
+ }
+ if cacheClient != nil {
+ sklog.Debugf("Enabling cache for search.")
+ s2a.EnableCache(cacheClient, fsc.CachingCorpora)
+ }
+
s2a.SetReviewSystemTemplates(templates)
sklog.Infof("SQL Search loaded with CRS templates %s", templates)
- err := s2a.StartCacheProcess(ctx, 5*time.Minute, fsc.WindowSize)
+ err = s2a.StartCacheProcess(ctx, 5*time.Minute, fsc.WindowSize)
if err != nil {
sklog.Fatalf("Cannot load caches for search2 backend: %s", err)
}
diff --git a/golden/cmd/periodictasks/periodictasks.go b/golden/cmd/periodictasks/periodictasks.go
index f5e1640..74b06a9 100644
--- a/golden/cmd/periodictasks/periodictasks.go
+++ b/golden/cmd/periodictasks/periodictasks.go
@@ -83,12 +83,6 @@
// UpdateIgnorePeriod is how often we should try to apply the ignore rules to all traces.
UpdateIgnorePeriod config.Duration `json:"update_traces_ignore_period"` // TODO(kjlubick) change JSON
-
- // List of corpora to be enabled for caching.
- CachingCorpora []string `json:"cache_corpora" optional:"true"`
-
- // Caching frequency in minutes.
- CachingFrequencyMinutes int `json:"caching_frequency_minutes" optional:"true"`
}
type perfSummariesConfig struct {
diff --git a/golden/go/config/config.go b/golden/go/config/config.go
index 3aeb309..9a3c41a 100644
--- a/golden/go/config/config.go
+++ b/golden/go/config/config.go
@@ -87,6 +87,12 @@
// RedisConfig provides configuration for redis instance to be used for caching.
RedisConfig redis.RedisConfig `json:"redis_config" optional:"true"`
+
+ // List of corpora to be enabled for caching.
+ CachingCorpora []string `json:"cache_corpora" optional:"true"`
+
+ // Caching frequency in minutes.
+ CachingFrequencyMinutes int `json:"caching_frequency_minutes" optional:"true"`
}
// GetCacheClient returns a cache client based on the configuration.
diff --git a/golden/go/search/BUILD.bazel b/golden/go/search/BUILD.bazel
index 138c97e..c17cf28 100644
--- a/golden/go/search/BUILD.bazel
+++ b/golden/go/search/BUILD.bazel
@@ -7,12 +7,14 @@
importpath = "go.skia.org/infra/golden/go/search",
visibility = ["//visibility:public"],
deps = [
+ "//go/cache",
"//go/paramtools",
"//go/skerr",
"//go/sklog",
"//go/util",
"//golden/go/expectations",
"//golden/go/publicparams",
+ "//golden/go/search/caching",
"//golden/go/search/query",
"//golden/go/sql",
"//golden/go/sql/schema",
diff --git a/golden/go/search/caching/BUILD.bazel b/golden/go/search/caching/BUILD.bazel
index 2018994..3c14e5e 100644
--- a/golden/go/search/caching/BUILD.bazel
+++ b/golden/go/search/caching/BUILD.bazel
@@ -25,6 +25,7 @@
embed = [":caching"],
deps = [
"//go/cache/mock",
+ "//go/deepequal/assertdeep",
"//golden/go/sql/datakitchensink",
"//golden/go/sql/sqltest",
"@com_github_jackc_pgx_v4//pgxpool",
diff --git a/golden/go/search/caching/byblame.go b/golden/go/search/caching/byblame.go
index 7d024c6..7ffde8f 100644
--- a/golden/go/search/caching/byblame.go
+++ b/golden/go/search/caching/byblame.go
@@ -5,7 +5,6 @@
"github.com/jackc/pgx/v4/pgxpool"
"go.skia.org/infra/go/skerr"
- "go.skia.org/infra/golden/go/sql/schema"
)
// This query collects untriaged image digests within the specified commit window for the given
@@ -33,13 +32,6 @@
UntriagedDigests.digest = UnignoredDataAtHead.digest`
)
-// ByBlameData provides a struct to hold data for the entry in by blame cache.
-type ByBlameData struct {
- TraceID schema.TraceID `json:"traceID"`
- GroupingID schema.GroupingID `json:"groupingID"`
- Digest schema.DigestBytes `json:"digest"`
-}
-
// ByBlameDataProvider implements cacheDataProvider.
type ByBlameDataProvider struct {
db *pgxpool.Pool
@@ -55,27 +47,36 @@
}
}
+// GetDataForCorpus returns the by-blame data for the given corpus by querying the database directly.
+func (prov ByBlameDataProvider) GetDataForCorpus(ctx context.Context, corpus string) ([]ByBlameData, error) {
+ cacheData := []ByBlameData{}
+ rows, err := prov.db.Query(ctx, ByBlameQuery, prov.commitWindow, corpus)
+ if err != nil {
+ return nil, err
+ }
+ for rows.Next() {
+ byBlameData := ByBlameData{}
+ if err := rows.Scan(&byBlameData.TraceID, &byBlameData.GroupingID, &byBlameData.Digest); err != nil {
+ return nil, skerr.Wrap(err)
+ }
+ cacheData = append(cacheData, byBlameData)
+ }
+ if err := rows.Err(); err != nil {
+ return nil, skerr.Wrap(err)
+ }
+
+ return cacheData, nil
+}
+
// GetCacheData implements cacheDataProvider.
func (prov ByBlameDataProvider) GetCacheData(ctx context.Context) (map[string]string, error) {
cacheMap := map[string]string{}
// For each of the corpora, execute the sql query and add the results to the map.
for _, corpus := range prov.corpora {
- key := ByBlameKey(corpus)
- cacheData := []ByBlameData{}
- rows, err := prov.db.Query(ctx, ByBlameQuery, prov.commitWindow, corpus)
+ cacheData, err := prov.GetDataForCorpus(ctx, corpus)
if err != nil {
return nil, err
}
- for rows.Next() {
- byBlameData := ByBlameData{}
- if err := rows.Scan(&byBlameData.TraceID, &byBlameData.GroupingID, &byBlameData.Digest); err != nil {
- return nil, skerr.Wrap(err)
- }
- cacheData = append(cacheData, byBlameData)
- }
-
if len(cacheData) > 0 {
+ key := ByBlameKey(corpus)
cacheDataStr, err := toJSON(cacheData)
if err != nil {
return nil, skerr.Wrap(err)
diff --git a/golden/go/search/caching/common.go b/golden/go/search/caching/common.go
index d46bac9..6e04b2e 100644
--- a/golden/go/search/caching/common.go
+++ b/golden/go/search/caching/common.go
@@ -3,8 +3,17 @@
import (
"context"
"encoding/json"
+
+ "go.skia.org/infra/golden/go/sql/schema"
)
+// ByBlameData holds the data for a single entry in the by-blame cache.
+type ByBlameData struct {
+ TraceID schema.TraceID `json:"traceID"`
+ GroupingID schema.GroupingID `json:"groupingID"`
+ Digest schema.DigestBytes `json:"digest"`
+}
+
// cacheDataProvider provides an interface for getting cache data.
type cacheDataProvider interface {
GetCacheData(ctx context.Context) (map[string]string, error)
diff --git a/golden/go/search/caching/searchCache.go b/golden/go/search/caching/searchCache.go
index 06a4bea..f13dec4 100644
--- a/golden/go/search/caching/searchCache.go
+++ b/golden/go/search/caching/searchCache.go
@@ -2,18 +2,27 @@
import (
"context"
+ "encoding/json"
"github.com/jackc/pgx/v4/pgxpool"
"go.skia.org/infra/go/cache"
"go.skia.org/infra/go/skerr"
)
+type SearchCacheType int
+
+const (
+ // ByBlame_Corpus denotes the cache type for untriaged images, grouped by commit, for a given corpus.
+ ByBlame_Corpus SearchCacheType = iota
+)
+
// SearchCacheManager provides a struct to handle the cache operations for gold search.
type SearchCacheManager struct {
- cacheClient cache.Cache
- db *pgxpool.Pool
- corpora []string
- commitWindow int
+ cacheClient cache.Cache
+ db *pgxpool.Pool
+ corpora []string
+ commitWindow int
+ dataProviders map[SearchCacheType]cacheDataProvider
}
// New returns a new instance of the SearchCacheManager.
@@ -23,20 +32,15 @@
db: db,
corpora: corpora,
commitWindow: commitWindow,
- }
-}
-
-// getCacheDataProviders returns a list of cacheDataProviders
-func (s SearchCacheManager) getCacheDataProviders() []cacheDataProvider {
- return []cacheDataProvider{
- NewByBlameDataProvider(s.db, s.corpora, s.commitWindow),
+ dataProviders: map[SearchCacheType]cacheDataProvider{
+ ByBlame_Corpus: NewByBlameDataProvider(db, corpora, commitWindow),
+ },
}
}
// RunCachePopulation gets the cache data from the providers and stores it in the cache instance.
func (s SearchCacheManager) RunCachePopulation(ctx context.Context) error {
- providers := s.getCacheDataProviders()
- for _, prov := range providers {
+ for _, prov := range s.dataProviders {
data, err := prov.GetCacheData(ctx)
if err != nil {
return skerr.Wrapf(err, "Error while running cache population with provider %s", prov)
@@ -52,3 +56,22 @@
return nil
}
+
+// GetByBlameData returns the by-blame data for the given corpus, reading it from the cache and falling back to a direct database query on a cache miss.
+func (s SearchCacheManager) GetByBlameData(ctx context.Context, corpus string) ([]ByBlameData, error) {
+ cacheKey := ByBlameKey(corpus)
+ data := []ByBlameData{}
+ jsonStr, err := s.cacheClient.GetValue(ctx, cacheKey)
+ if err != nil {
+ return data, skerr.Wrapf(err, "Error retrieving by blame data from cache for key %s corpus %s", cacheKey, corpus)
+ }
+
+ // An empty string indicates a cache miss, so fall back to querying the database directly.
+ if jsonStr == "" {
+ provider := s.dataProviders[ByBlame_Corpus].(ByBlameDataProvider)
+ return provider.GetDataForCorpus(ctx, corpus)
+ }
+
+ err = json.Unmarshal([]byte(jsonStr), &data)
+ return data, err
+}
diff --git a/golden/go/search/caching/searchCache_test.go b/golden/go/search/caching/searchCache_test.go
index bfd22fe..8ec971a 100644
--- a/golden/go/search/caching/searchCache_test.go
+++ b/golden/go/search/caching/searchCache_test.go
@@ -9,6 +9,7 @@
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
mockCache "go.skia.org/infra/go/cache/mock"
+ "go.skia.org/infra/go/deepequal/assertdeep"
dks "go.skia.org/infra/golden/go/sql/datakitchensink"
"go.skia.org/infra/golden/go/sql/sqltest"
)
@@ -42,3 +43,49 @@
err := searchCacheManager.RunCachePopulation(ctx)
assert.Nil(t, err)
}
+
+func TestReadFromCache_CacheHit_Success(t *testing.T) {
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+ db := useKitchenSinkData(ctx, t)
+ cacheClient := mockCache.NewCache(t)
+ corpus := dks.RoundCorpus
+ cacheResults := []ByBlameData{
+ {
+ TraceID: []byte("trace1"),
+ GroupingID: []byte("group1"),
+ Digest: []byte("d1"),
+ },
+ {
+ TraceID: []byte("trace2"),
+ GroupingID: []byte("group2"),
+ Digest: []byte("d2"),
+ },
+ }
+ cacheClient.On("GetValue", ctx, ByBlameKey(corpus)).Return(toJSON(cacheResults))
+
+ searchCacheManager := New(cacheClient, db, []string{corpus}, 5)
+ data, err := searchCacheManager.GetByBlameData(ctx, corpus)
+ assert.Nil(t, err)
+ assert.NotNil(t, data)
+ assertdeep.Equal(t, cacheResults, data)
+ cacheClient.AssertNumberOfCalls(t, "GetValue", 1)
+}
+
+func TestReadFromCache_CacheMiss_Success(t *testing.T) {
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+ db := useKitchenSinkData(ctx, t)
+ cacheClient := mockCache.NewCache(t)
+ corpus := dks.RoundCorpus
+
+ cacheClient.On("GetValue", ctx, ByBlameKey(corpus)).Return("", nil)
+
+ searchCacheManager := New(cacheClient, db, []string{corpus}, 5)
+ data, err := searchCacheManager.GetByBlameData(ctx, corpus)
+ assert.Nil(t, err)
+ assert.NotNil(t, data)
+ // Even on a cache miss we should get data, since the manager falls back to a direct db query.
+ assert.True(t, len(data) > 0)
+ cacheClient.AssertNumberOfCalls(t, "GetValue", 1)
+}
diff --git a/golden/go/search/search.go b/golden/go/search/search.go
index c03f559..aff5010 100644
--- a/golden/go/search/search.go
+++ b/golden/go/search/search.go
@@ -21,12 +21,14 @@
"go.opencensus.io/trace"
"golang.org/x/sync/errgroup"
+ "go.skia.org/infra/go/cache"
"go.skia.org/infra/go/paramtools"
"go.skia.org/infra/go/skerr"
"go.skia.org/infra/go/sklog"
"go.skia.org/infra/go/util"
"go.skia.org/infra/golden/go/expectations"
"go.skia.org/infra/golden/go/publicparams"
+ "go.skia.org/infra/golden/go/search/caching"
"go.skia.org/infra/golden/go/search/query"
"go.skia.org/infra/golden/go/sql"
"go.skia.org/infra/golden/go/sql/schema"
@@ -204,6 +206,7 @@
paramsetCache *ttlcache.Cache
materializedViews map[string]bool
+ cacheManager *caching.SearchCacheManager
}
// New returns an implementation of API.
@@ -233,6 +236,11 @@
}
}
+// EnableCache enables reading search data from the given cache for the specified corpora.
+func (s *Impl) EnableCache(cacheClient cache.Cache, cacheCorpora []string) {
+ s.cacheManager = caching.New(cacheClient, s.db, cacheCorpora, s.windowLength)
+}
+
// SetReviewSystemTemplates sets the URL templates that are used to link to the code review system.
// The Changelist ID will be substituted in using fmt.Sprintf and a %s placeholder.
func (s *Impl) SetReviewSystemTemplates(m map[string]string) {
@@ -2819,6 +2827,31 @@
func (s *Impl) getTracesWithUntriagedDigestsAtHead(ctx context.Context, corpus string) (map[groupingDigestKey][]schema.TraceID, error) {
ctx, span := trace.StartSpan(ctx, "getTracesWithUntriagedDigestsAtHead")
defer span.End()
+
+ // Caching is enabled, so serve the by-blame data from the cache instead of querying the database here.
+ if s.cacheManager != nil {
+ sklog.Debugf("Search cache is enabled.")
+ byBlameData, err := s.cacheManager.GetByBlameData(ctx, corpus)
+ if err != nil {
+ sklog.Errorf("Error encountered when retrieving ByBlame data from cache: %v", err)
+ return nil, err
+ }
+
+ sklog.Debugf("Retrieved %d items from search cache for corpus %s", len(byBlameData), corpus)
+ rv := map[groupingDigestKey][]schema.TraceID{}
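+ // groupingKey and digestKey are slices that share key's backing arrays, so
+ // each copy() below updates key in place before it is used as the map key.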
+ var key groupingDigestKey
+ groupingKey := key.groupingID[:]
+ digestKey := key.digest[:]
+ for _, data := range byBlameData {
+ copy(groupingKey, data.GroupingID)
+ copy(digestKey, data.Digest)
+ rv[key] = append(rv[key], data.TraceID)
+ }
+
+ return rv, nil
+ }
+
+ sklog.Debugf("Search cache is not enabled. Proceeding with regular search.")
statement := `WITH
UntriagedDigests AS (
SELECT grouping_id, digest FROM Expectations