[gold] Optimize new SQL search.

This adds some indexes to speed up parts of queries that
were slow (as well as turn down tracing on the frontend).

Bug: skia:10582
Change-Id: Idf5bf2ac47a36571048acec048cc30e246563b79
Reviewed-on: https://skia-review.googlesource.com/c/buildbot/+/404176
Commit-Queue: Kevin Lubick <kjlubick@google.com>
Reviewed-by: Leandro Lovisolo <lovisolo@google.com>
diff --git a/golden/cmd/gold_frontend/gold_frontend.go b/golden/cmd/gold_frontend/gold_frontend.go
index cee2333..ccf95ec 100644
--- a/golden/cmd/gold_frontend/gold_frontend.go
+++ b/golden/cmd/gold_frontend/gold_frontend.go
@@ -21,7 +21,6 @@
 	"cloud.google.com/go/pubsub"
 	"github.com/gorilla/mux"
 	"github.com/jackc/pgx/v4/pgxpool"
-	"go.opencensus.io/trace"
 	"go.skia.org/infra/go/paramtools"
 	"go.skia.org/infra/golden/go/types"
 	"golang.org/x/oauth2"
@@ -79,7 +78,7 @@
 	callbackPath = "/oauth2callback/"
 
 	// Arbitrarily picked.
-	maxSQLConnections = 20
+	maxSQLConnections = 32
 )
 
 type frontendServerConfig struct {
@@ -183,8 +182,7 @@
 	// Speculative memory usage fix? https://github.com/googleapis/google-cloud-go/issues/375
 	grpc.EnableTracing = false
 
-	// Record the traces of all spans, since we expect web requests to be somewhat rare.
-	if err := tracing.Initialize(1.0); err != nil {
+	if err := tracing.Initialize(0.01); err != nil {
 		sklog.Fatalf("Could not initialize tracing: %s", err)
 	}
 
@@ -590,8 +588,6 @@
 // CalculateDiffs publishes a WorkerMessage to the configured PubSub topic so that a worker
 // (see diffcalculator) can pick it up and calculate the diffs.
 func (p *pubsubDiffPublisher) CalculateDiffs(ctx context.Context, grouping paramtools.Params, left, right []types.Digest) error {
-	ctx, span := trace.StartSpan(ctx, "PublishDiffMessage")
-	defer span.End()
 	body, err := json.Marshal(diff.WorkerMessage{
 		Version:         diff.WorkerMessageVersion,
 		Grouping:        grouping,
diff --git a/golden/go/search2/search2.go b/golden/go/search2/search2.go
index 9045a92..4716ee9 100644
--- a/golden/go/search2/search2.go
+++ b/golden/go/search2/search2.go
@@ -609,12 +609,12 @@
 	if q.IncludeIgnoredTraces {
 		ignoreStatuses = append(ignoreStatuses, true)
 	}
-	args := []interface{}{getFirstCommitID(ctx), ignoreStatuses}
+	var args []interface{}
 
 	if q.OnlyIncludeDigestsProducedAtHead {
 		if len(keyFilters) == 0 {
 			// Corpus is being used as a string
-			args = append(args, q.TraceValues[types.CorpusField][0])
+			args = append(args, getFirstCommitID(ctx), ignoreStatuses, q.TraceValues[types.CorpusField][0])
 			return `
 MatchingTraces AS (
     SELECT trace_id, grouping_id, digest FROM ValuesAtHead
@@ -624,7 +624,7 @@
 )`, args
 		}
 		// Corpus is being used as a JSONB value here
-		args = append(args, `"`+q.TraceValues[types.CorpusField][0]+`"`)
+		args = append(args, getFirstCommitID(ctx), ignoreStatuses, `"`+q.TraceValues[types.CorpusField][0]+`"`)
 		return joinedTracesStatement(keyFilters) + `
 MatchingTraces AS (
     SELECT ValuesAtHead.trace_id, grouping_id, digest FROM ValuesAtHead
@@ -635,34 +635,35 @@
 	} else {
 		if len(keyFilters) == 0 {
 			// Corpus is being used as a string
-			args = append(args, q.TraceValues[types.CorpusField][0])
+			args = append(args, getFirstCommitID(ctx), ignoreStatuses, q.TraceValues[types.CorpusField][0], getFirstTileID(ctx))
 			return `
 TracesOfInterest AS (
-    SELECT trace_id FROM ValuesAtHead
+    SELECT trace_id, grouping_id FROM ValuesAtHead
 	WHERE matches_any_ignore_rule = ANY($3) AND
+		most_recent_commit_id >= $2 AND
     	corpus = $4
 ),
 MatchingTraces AS (
-	SELECT DISTINCT TraceValues.trace_id, TraceValues.grouping_id, TraceValues.digest
-	FROM TraceValues
-	JOIN TracesOfInterest on TraceValues.trace_id = TracesOfInterest.trace_id
-	WHERE commit_id >= $2
+	SELECT DISTINCT TiledTraceDigests.trace_id, TracesOfInterest.grouping_id, TiledTraceDigests.digest
+	FROM TiledTraceDigests
+	JOIN TracesOfInterest on TracesOfInterest.trace_id = TiledTraceDigests.trace_id
+	WHERE tile_id >= $5
 )
 `, args
 		}
 		// Corpus is being used as a JSONB value here
-		args = append(args, `"`+q.TraceValues[types.CorpusField][0]+`"`)
+		args = append(args, getFirstTileID(ctx), ignoreStatuses, `"`+q.TraceValues[types.CorpusField][0]+`"`)
 		return joinedTracesStatement(keyFilters) + `
 TracesOfInterest AS (
-    SELECT Traces.trace_id FROM Traces
+    SELECT Traces.trace_id, grouping_id FROM Traces
 	JOIN JoinedTraces ON Traces.trace_id = JoinedTraces.trace_id
 	WHERE matches_any_ignore_rule = ANY($3)
 ),
 MatchingTraces AS (
-	SELECT DISTINCT TraceValues.trace_id, TraceValues.grouping_id, TraceValues.digest
-	FROM TraceValues
-	JOIN TracesOfInterest on TraceValues.trace_id = TracesOfInterest.trace_id
-	WHERE commit_id >= $2
+	SELECT DISTINCT TiledTraceDigests.trace_id, TracesOfInterest.grouping_id, TiledTraceDigests.digest
+	FROM TiledTraceDigests
+	JOIN TracesOfInterest on TracesOfInterest.trace_id = TiledTraceDigests.trace_id
+	WHERE tile_id >= $2
 )`, args
 	}
 }
@@ -671,7 +672,7 @@
 // This table contains just the trace_ids that match the given filters. filters is expected to
 // have keys which passed sanitization (it will sanitize the values). The snippet will include
 // other tables that will be unioned and intersected to create the appropriate rows. This is
-// similar to the technique we use for ingore rules, chosen to maximize consistent performance
+// similar to the technique we use for ignore rules, chosen to maximize consistent performance
 // by using the inverted key index. The keys and values are hardcoded into the string instead
 // of being passed in as arguments because kjlubick@ was not able to use the placeholder values
 //to compare JSONB types removed from a JSONB object to a string while still using the indexes.
@@ -1048,52 +1049,62 @@
 	ctx, span := trace.StartSpan(ctx, "fillOutTraceHistory")
 	span.AddAttributes(trace.Int64Attribute("results", int64(len(inputs))))
 	defer span.End()
-	rv := make([]*frontend.SearchResult, len(inputs))
-	for i, input := range inputs {
-		sr := &frontend.SearchResult{
-			Digest: types.Digest(hex.EncodeToString(input.leftDigest)),
-			RefDiffs: map[common.RefClosest]*frontend.SRDiffDigest{
-				common.PositiveRef: input.closestPositive,
-				common.NegativeRef: input.closestNegative,
-			},
-		}
-		if input.closestDigest != nil && input.closestDigest.Status == expectations.Positive {
-			sr.ClosestRef = common.PositiveRef
-		} else if input.closestDigest != nil && input.closestDigest.Status == expectations.Negative {
-			sr.ClosestRef = common.NegativeRef
-		}
-		tg, err := s.traceGroupForTraces(ctx, input.traceIDs, sr.Digest)
-		if err != nil {
-			return nil, skerr.Wrap(err)
-		}
-		if err := s.fillInExpectations(ctx, &tg, input.groupingID); err != nil {
-			return nil, skerr.Wrap(err)
-		}
-		if err := s.fillInTraceParams(ctx, &tg); err != nil {
-			return nil, skerr.Wrap(err)
-		}
-		sr.TraceGroup = tg
-		if len(tg.Digests) > 0 {
-			// The first digest in the trace group is this digest.
-			sr.Status = tg.Digests[0].Status
-		} else {
-			// We assume untriaged if digest is not in the window.
-			sr.Status = expectations.Untriaged
-		}
-		if len(tg.Traces) > 0 {
-			// Grab the test name from the first trace, since we know all the traces are of
-			// the same grouping, which includes test name.
-			sr.Test = types.TestName(tg.Traces[0].Params[types.PrimaryKeyField])
-		}
-		leftPS := paramtools.ParamSet{}
-		for _, tr := range tg.Traces {
-			leftPS.AddParams(tr.Params)
-		}
-		leftPS.Normalize()
-		sr.ParamSet = leftPS
-		rv[i] = sr
+	// Fill out these histories in parallel. We avoid race conditions by writing to a prescribed
+	// index in the results slice.
+	results := make([]*frontend.SearchResult, len(inputs))
+	eg, eCtx := errgroup.WithContext(ctx)
+	for i, j := range inputs {
+		idx, input := i, j
+		eg.Go(func() error {
+			sr := &frontend.SearchResult{
+				Digest: types.Digest(hex.EncodeToString(input.leftDigest)),
+				RefDiffs: map[common.RefClosest]*frontend.SRDiffDigest{
+					common.PositiveRef: input.closestPositive,
+					common.NegativeRef: input.closestNegative,
+				},
+			}
+			if input.closestDigest != nil && input.closestDigest.Status == expectations.Positive {
+				sr.ClosestRef = common.PositiveRef
+			} else if input.closestDigest != nil && input.closestDigest.Status == expectations.Negative {
+				sr.ClosestRef = common.NegativeRef
+			}
+			tg, err := s.traceGroupForTraces(eCtx, input.traceIDs, sr.Digest)
+			if err != nil {
+				return skerr.Wrap(err)
+			}
+			if err := s.fillInExpectations(eCtx, &tg, input.groupingID); err != nil {
+				return skerr.Wrap(err)
+			}
+			if err := s.fillInTraceParams(eCtx, &tg); err != nil {
+				return skerr.Wrap(err)
+			}
+			sr.TraceGroup = tg
+			if len(tg.Digests) > 0 {
+				// The first digest in the trace group is this digest.
+				sr.Status = tg.Digests[0].Status
+			} else {
+				// We assume untriaged if digest is not in the window.
+				sr.Status = expectations.Untriaged
+			}
+			if len(tg.Traces) > 0 {
+				// Grab the test name from the first trace, since we know all the traces are of
+				// the same grouping, which includes test name.
+				sr.Test = types.TestName(tg.Traces[0].Params[types.PrimaryKeyField])
+			}
+			leftPS := paramtools.ParamSet{}
+			for _, tr := range tg.Traces {
+				leftPS.AddParams(tr.Params)
+			}
+			leftPS.Normalize()
+			sr.ParamSet = leftPS
+			results[idx] = sr
+			return nil
+		})
 	}
-	return rv, nil
+	if err := eg.Wait(); err != nil {
+		return nil, skerr.Wrap(err)
+	}
+	return results, nil
 }
 
 type traceDigestCommit struct {
diff --git a/golden/go/sql/schema/sql.go b/golden/go/sql/schema/sql.go
index fac2526..7e37793 100644
--- a/golden/go/sql/schema/sql.go
+++ b/golden/go/sql/schema/sql.go
@@ -49,7 +49,8 @@
   digest BYTES,
   label CHAR NOT NULL,
   expectation_record_id UUID,
-  PRIMARY KEY (grouping_id, digest)
+  PRIMARY KEY (grouping_id, digest),
+  INDEX label_idx (label)
 );
 CREATE TABLE IF NOT EXISTS GitCommits (
   git_hash STRING PRIMARY KEY,
@@ -138,7 +139,8 @@
   digest BYTES NOT NULL,
   grouping_id BYTES NOT NULL,
   PRIMARY KEY (trace_id, tile_id, digest),
-  INDEX grouping_digest_idx (grouping_id, digest)
+  INDEX grouping_digest_idx (grouping_id, digest),
+  INDEX tile_trace_idx (tile_id, trace_id)
 );
 CREATE TABLE IF NOT EXISTS TrackingCommits (
   repo STRING PRIMARY KEY,
diff --git a/golden/go/sql/schema/tables.go b/golden/go/sql/schema/tables.go
index a5d44ec..49fd412 100644
--- a/golden/go/sql/schema/tables.go
+++ b/golden/go/sql/schema/tables.go
@@ -474,6 +474,7 @@
 	// ExpectationRecordID corresponds to most recent ExpectationRecordRow that set the given label.
 	ExpectationRecordID *uuid.UUID `sql:"expectation_record_id UUID"`
 	primaryKey          struct{}   `sql:"PRIMARY KEY (grouping_id, digest)"`
+	labelIndex          struct{}   `sql:"INDEX label_idx (label)"`
 }
 
 // ToSQLRow implements the sqltest.SQLExporter interface.
@@ -657,6 +658,7 @@
 	// This index makes it easier to answer the question "What digests are being produced by
 	// a given grouping on the primary branch).
 	groupingDigestIndex struct{} `sql:"INDEX grouping_digest_idx (grouping_id, digest)"`
+	tileTraceIndex      struct{} `sql:"INDEX tile_trace_idx (tile_id, trace_id)"`
 }
 
 // ToSQLRow implements the sqltest.SQLExporter interface.