[gold] Optimize new SQL search.
This adds some indexes to speed up parts of queries that
were slow (as well as turn down tracing on the frontend).
Bug: skia:10582
Change-Id: Idf5bf2ac47a36571048acec048cc30e246563b79
Reviewed-on: https://skia-review.googlesource.com/c/buildbot/+/404176
Commit-Queue: Kevin Lubick <kjlubick@google.com>
Reviewed-by: Leandro Lovisolo <lovisolo@google.com>
diff --git a/golden/cmd/gold_frontend/gold_frontend.go b/golden/cmd/gold_frontend/gold_frontend.go
index cee2333..ccf95ec 100644
--- a/golden/cmd/gold_frontend/gold_frontend.go
+++ b/golden/cmd/gold_frontend/gold_frontend.go
@@ -21,7 +21,6 @@
"cloud.google.com/go/pubsub"
"github.com/gorilla/mux"
"github.com/jackc/pgx/v4/pgxpool"
- "go.opencensus.io/trace"
"go.skia.org/infra/go/paramtools"
"go.skia.org/infra/golden/go/types"
"golang.org/x/oauth2"
@@ -79,7 +78,7 @@
callbackPath = "/oauth2callback/"
// Arbitrarily picked.
- maxSQLConnections = 20
+ maxSQLConnections = 32
)
type frontendServerConfig struct {
@@ -183,8 +182,7 @@
// Speculative memory usage fix? https://github.com/googleapis/google-cloud-go/issues/375
grpc.EnableTracing = false
- // Record the traces of all spans, since we expect web requests to be somewhat rare.
- if err := tracing.Initialize(1.0); err != nil {
+ if err := tracing.Initialize(0.01); err != nil {
sklog.Fatalf("Could not initialize tracing: %s", err)
}
@@ -590,8 +588,6 @@
// CalculateDiffs publishes a WorkerMessage to the configured PubSub topic so that a worker
// (see diffcalculator) can pick it up and calculate the diffs.
func (p *pubsubDiffPublisher) CalculateDiffs(ctx context.Context, grouping paramtools.Params, left, right []types.Digest) error {
- ctx, span := trace.StartSpan(ctx, "PublishDiffMessage")
- defer span.End()
body, err := json.Marshal(diff.WorkerMessage{
Version: diff.WorkerMessageVersion,
Grouping: grouping,
diff --git a/golden/go/search2/search2.go b/golden/go/search2/search2.go
index 9045a92..4716ee9 100644
--- a/golden/go/search2/search2.go
+++ b/golden/go/search2/search2.go
@@ -609,12 +609,12 @@
if q.IncludeIgnoredTraces {
ignoreStatuses = append(ignoreStatuses, true)
}
- args := []interface{}{getFirstCommitID(ctx), ignoreStatuses}
+ var args []interface{}
if q.OnlyIncludeDigestsProducedAtHead {
if len(keyFilters) == 0 {
// Corpus is being used as a string
- args = append(args, q.TraceValues[types.CorpusField][0])
+ args = append(args, getFirstCommitID(ctx), ignoreStatuses, q.TraceValues[types.CorpusField][0])
return `
MatchingTraces AS (
SELECT trace_id, grouping_id, digest FROM ValuesAtHead
@@ -624,7 +624,7 @@
)`, args
}
// Corpus is being used as a JSONB value here
- args = append(args, `"`+q.TraceValues[types.CorpusField][0]+`"`)
+ args = append(args, getFirstCommitID(ctx), ignoreStatuses, `"`+q.TraceValues[types.CorpusField][0]+`"`)
return joinedTracesStatement(keyFilters) + `
MatchingTraces AS (
SELECT ValuesAtHead.trace_id, grouping_id, digest FROM ValuesAtHead
@@ -635,34 +635,35 @@
} else {
if len(keyFilters) == 0 {
// Corpus is being used as a string
- args = append(args, q.TraceValues[types.CorpusField][0])
+ args = append(args, getFirstCommitID(ctx), ignoreStatuses, q.TraceValues[types.CorpusField][0], getFirstTileID(ctx))
return `
TracesOfInterest AS (
- SELECT trace_id FROM ValuesAtHead
+ SELECT trace_id, grouping_id FROM ValuesAtHead
WHERE matches_any_ignore_rule = ANY($3) AND
+ most_recent_commit_id >= $2 AND
corpus = $4
),
MatchingTraces AS (
- SELECT DISTINCT TraceValues.trace_id, TraceValues.grouping_id, TraceValues.digest
- FROM TraceValues
- JOIN TracesOfInterest on TraceValues.trace_id = TracesOfInterest.trace_id
- WHERE commit_id >= $2
+ SELECT DISTINCT TiledTraceDigests.trace_id, TracesOfInterest.grouping_id, TiledTraceDigests.digest
+ FROM TiledTraceDigests
+ JOIN TracesOfInterest on TracesOfInterest.trace_id = TiledTraceDigests.trace_id
+ WHERE tile_id >= $5
)
`, args
}
// Corpus is being used as a JSONB value here
- args = append(args, `"`+q.TraceValues[types.CorpusField][0]+`"`)
+ args = append(args, getFirstTileID(ctx), ignoreStatuses, `"`+q.TraceValues[types.CorpusField][0]+`"`)
return joinedTracesStatement(keyFilters) + `
TracesOfInterest AS (
- SELECT Traces.trace_id FROM Traces
+ SELECT Traces.trace_id, grouping_id FROM Traces
JOIN JoinedTraces ON Traces.trace_id = JoinedTraces.trace_id
WHERE matches_any_ignore_rule = ANY($3)
),
MatchingTraces AS (
- SELECT DISTINCT TraceValues.trace_id, TraceValues.grouping_id, TraceValues.digest
- FROM TraceValues
- JOIN TracesOfInterest on TraceValues.trace_id = TracesOfInterest.trace_id
- WHERE commit_id >= $2
+ SELECT DISTINCT TiledTraceDigests.trace_id, TracesOfInterest.grouping_id, TiledTraceDigests.digest
+ FROM TiledTraceDigests
+ JOIN TracesOfInterest on TracesOfInterest.trace_id = TiledTraceDigests.trace_id
+ WHERE tile_id >= $2
)`, args
}
}
@@ -671,7 +672,7 @@
// This table contains just the trace_ids that match the given filters. filters is expected to
// have keys which passed sanitization (it will sanitize the values). The snippet will include
// other tables that will be unioned and intersected to create the appropriate rows. This is
-// similar to the technique we use for ingore rules, chosen to maximize consistent performance
+// similar to the technique we use for ignore rules, chosen to maximize consistent performance
// by using the inverted key index. The keys and values are hardcoded into the string instead
// of being passed in as arguments because kjlubick@ was not able to use the placeholder values
//to compare JSONB types removed from a JSONB object to a string while still using the indexes.
@@ -1048,52 +1049,62 @@
ctx, span := trace.StartSpan(ctx, "fillOutTraceHistory")
span.AddAttributes(trace.Int64Attribute("results", int64(len(inputs))))
defer span.End()
- rv := make([]*frontend.SearchResult, len(inputs))
- for i, input := range inputs {
- sr := &frontend.SearchResult{
- Digest: types.Digest(hex.EncodeToString(input.leftDigest)),
- RefDiffs: map[common.RefClosest]*frontend.SRDiffDigest{
- common.PositiveRef: input.closestPositive,
- common.NegativeRef: input.closestNegative,
- },
- }
- if input.closestDigest != nil && input.closestDigest.Status == expectations.Positive {
- sr.ClosestRef = common.PositiveRef
- } else if input.closestDigest != nil && input.closestDigest.Status == expectations.Negative {
- sr.ClosestRef = common.NegativeRef
- }
- tg, err := s.traceGroupForTraces(ctx, input.traceIDs, sr.Digest)
- if err != nil {
- return nil, skerr.Wrap(err)
- }
- if err := s.fillInExpectations(ctx, &tg, input.groupingID); err != nil {
- return nil, skerr.Wrap(err)
- }
- if err := s.fillInTraceParams(ctx, &tg); err != nil {
- return nil, skerr.Wrap(err)
- }
- sr.TraceGroup = tg
- if len(tg.Digests) > 0 {
- // The first digest in the trace group is this digest.
- sr.Status = tg.Digests[0].Status
- } else {
- // We assume untriaged if digest is not in the window.
- sr.Status = expectations.Untriaged
- }
- if len(tg.Traces) > 0 {
- // Grab the test name from the first trace, since we know all the traces are of
- // the same grouping, which includes test name.
- sr.Test = types.TestName(tg.Traces[0].Params[types.PrimaryKeyField])
- }
- leftPS := paramtools.ParamSet{}
- for _, tr := range tg.Traces {
- leftPS.AddParams(tr.Params)
- }
- leftPS.Normalize()
- sr.ParamSet = leftPS
- rv[i] = sr
+ // Fill out these histories in parallel. We avoid race conditions by writing to a prescribed
+ // index in the results slice.
+ results := make([]*frontend.SearchResult, len(inputs))
+ eg, eCtx := errgroup.WithContext(ctx)
+ for i, j := range inputs {
+ idx, input := i, j
+ eg.Go(func() error {
+ sr := &frontend.SearchResult{
+ Digest: types.Digest(hex.EncodeToString(input.leftDigest)),
+ RefDiffs: map[common.RefClosest]*frontend.SRDiffDigest{
+ common.PositiveRef: input.closestPositive,
+ common.NegativeRef: input.closestNegative,
+ },
+ }
+ if input.closestDigest != nil && input.closestDigest.Status == expectations.Positive {
+ sr.ClosestRef = common.PositiveRef
+ } else if input.closestDigest != nil && input.closestDigest.Status == expectations.Negative {
+ sr.ClosestRef = common.NegativeRef
+ }
+ tg, err := s.traceGroupForTraces(eCtx, input.traceIDs, sr.Digest)
+ if err != nil {
+ return skerr.Wrap(err)
+ }
+ if err := s.fillInExpectations(eCtx, &tg, input.groupingID); err != nil {
+ return skerr.Wrap(err)
+ }
+ if err := s.fillInTraceParams(eCtx, &tg); err != nil {
+ return skerr.Wrap(err)
+ }
+ sr.TraceGroup = tg
+ if len(tg.Digests) > 0 {
+ // The first digest in the trace group is this digest.
+ sr.Status = tg.Digests[0].Status
+ } else {
+ // We assume untriaged if digest is not in the window.
+ sr.Status = expectations.Untriaged
+ }
+ if len(tg.Traces) > 0 {
+ // Grab the test name from the first trace, since we know all the traces are of
+ // the same grouping, which includes test name.
+ sr.Test = types.TestName(tg.Traces[0].Params[types.PrimaryKeyField])
+ }
+ leftPS := paramtools.ParamSet{}
+ for _, tr := range tg.Traces {
+ leftPS.AddParams(tr.Params)
+ }
+ leftPS.Normalize()
+ sr.ParamSet = leftPS
+ results[idx] = sr
+ return nil
+ })
}
- return rv, nil
+ if err := eg.Wait(); err != nil {
+ return nil, skerr.Wrap(err)
+ }
+ return results, nil
}
type traceDigestCommit struct {
diff --git a/golden/go/sql/schema/sql.go b/golden/go/sql/schema/sql.go
index fac2526..7e37793 100644
--- a/golden/go/sql/schema/sql.go
+++ b/golden/go/sql/schema/sql.go
@@ -49,7 +49,8 @@
digest BYTES,
label CHAR NOT NULL,
expectation_record_id UUID,
- PRIMARY KEY (grouping_id, digest)
+ PRIMARY KEY (grouping_id, digest),
+ INDEX label_idx (label)
);
CREATE TABLE IF NOT EXISTS GitCommits (
git_hash STRING PRIMARY KEY,
@@ -138,7 +139,8 @@
digest BYTES NOT NULL,
grouping_id BYTES NOT NULL,
PRIMARY KEY (trace_id, tile_id, digest),
- INDEX grouping_digest_idx (grouping_id, digest)
+ INDEX grouping_digest_idx (grouping_id, digest),
+ INDEX tile_trace_idx (tile_id, trace_id)
);
CREATE TABLE IF NOT EXISTS TrackingCommits (
repo STRING PRIMARY KEY,
diff --git a/golden/go/sql/schema/tables.go b/golden/go/sql/schema/tables.go
index a5d44ec..49fd412 100644
--- a/golden/go/sql/schema/tables.go
+++ b/golden/go/sql/schema/tables.go
@@ -474,6 +474,7 @@
// ExpectationRecordID corresponds to most recent ExpectationRecordRow that set the given label.
ExpectationRecordID *uuid.UUID `sql:"expectation_record_id UUID"`
primaryKey struct{} `sql:"PRIMARY KEY (grouping_id, digest)"`
+ labelIndex struct{} `sql:"INDEX label_idx (label)"`
}
// ToSQLRow implements the sqltest.SQLExporter interface.
@@ -657,6 +658,7 @@
// This index makes it easier to answer the question "What digests are being produced by
// a given grouping on the primary branch).
groupingDigestIndex struct{} `sql:"INDEX grouping_digest_idx (grouping_id, digest)"`
+ tileTraceIndex struct{} `sql:"INDEX tile_trace_idx (tile_id, trace_id)"`
}
// ToSQLRow implements the sqltest.SQLExporter interface.