[gold] worker2.go: Make getCommonAndRecentDigests() faster.

The main query in this function makes an expensive join between the Traces and TraceValues tables. This CL breaks that join into two stand-alone queries, one for fetching trace IDs from the Traces table, and another one that filters TraceValues with a "WHERE trace_id IN (id1, id2, ...)" clause. This optimization should getCommonAndRecentDigests() 4x faster.

Investigation: http://go/slow-gold-diffs-2022q4#bookmark=id.2um37jdpl0hl.

Bug: skia:13306
Change-Id: Ia291beb36ef70073bbf01cd48fef3d619f085212
Reviewed-on: https://skia-review.googlesource.com/c/buildbot/+/605896
Reviewed-by: Kevin Lubick <kjlubick@google.com>
Commit-Queue: Leandro Lovisolo <lovisolo@google.com>
diff --git a/golden/go/diff/worker/worker2.go b/golden/go/diff/worker/worker2.go
index 77cc68e..dfed2f6 100644
--- a/golden/go/diff/worker/worker2.go
+++ b/golden/go/diff/worker/worker2.go
@@ -5,6 +5,7 @@
 	"context"
 	"encoding/hex"
 	"encoding/json"
+	"fmt"
 	"image"
 	"image/png"
 	"time"
@@ -573,6 +574,7 @@
 	ctx, span := trace.StartSpan(ctx, "getCommonAndRecentDigests")
 	defer span.End()
 
+	// Find the first commit in the target range of commits.
 	row := w.db.QueryRow(ctx, `WITH
 RecentCommits AS (
 	SELECT commit_id FROM CommitsWithData
@@ -588,14 +590,22 @@
 		return nil, skerr.Wrap(err)
 	}
 
-	const statement = `WITH
-TracesOfInterest AS (
-  SELECT trace_id FROM Traces WHERE grouping_id = $1
-),
+	// Find all the traces for the target grouping.
+	traceIDs, err := w.findTraceIDsForGrouping(ctx, grouping)
+	if err != nil {
+		return nil, skerr.Wrap(err)
+	}
+
+	// We used to join the Traces and TraceValues tables, but that turned out to be slow. Filtering
+	// TraceValues using a list of trace IDs can be faster when the list of trace IDs is large.
+	// For example, lovisolo@ observed that for a grouping with ~1000 traces, the below query takes
+	// ~40s when joining the two tables, or ~6s when using "WHERE trace_id IN (id1, id2, ...)".
+	traceIDPlaceholders := sql.ValuesPlaceholders(1 /* =valuesPerRow */, len(traceIDs))
+
+	statement := `WITH
 DigestsCountsAndMostRecent AS (
   SELECT digest, count(*) as occurrences, max(commit_id) as recent FROM TraceValues
-  JOIN TracesOfInterest ON TraceValues.trace_id = TracesOfInterest.trace_id
-  WHERE commit_id >= $2
+  WHERE trace_id IN (` + traceIDPlaceholders + `) AND commit_id >= ` + fmt.Sprintf("$%d", len(traceIDs)+1) + `
   GROUP BY digest
 ),
 MostCommon AS (
@@ -615,8 +625,15 @@
 UNION
 SELECT * FROM MostRecent`
 
-	_, groupingID := sql.SerializeMap(grouping)
-	rows, err := w.db.Query(ctx, statement, groupingID, firstCommitID)
+	// Build a list of arguments with the IDs of all traces of interest and the firstCommitID.
+	args := make([]interface{}, 0, len(traceIDs)+1)
+	for _, traceID := range traceIDs {
+		args = append(args, traceID)
+	}
+	args = append(args, firstCommitID)
+
+	// Execute the statement and return the result.
+	rows, err := w.db.Query(ctx, statement, args...)
 	if err != nil {
 		return nil, skerr.Wrap(err)
 	}
@@ -632,6 +649,29 @@
 	return rv, nil
 }
 
+// findTraceIDsForGrouping returns the trace IDs corresponding to the given grouping.
+func (w *WorkerImpl) findTraceIDsForGrouping(ctx context.Context, grouping paramtools.Params) ([]schema.TraceID, error) {
+	ctx, span := trace.StartSpan(ctx, "findTraceIDsForGrouping")
+	defer span.End()
+
+	_, groupingID := sql.SerializeMap(grouping)
+	rows, err := w.db.Query(ctx, "SELECT trace_id FROM Traces WHERE grouping_id = $1", groupingID)
+	if err != nil {
+		return nil, skerr.Wrap(err)
+	}
+	defer rows.Close()
+
+	var traceIDs []schema.TraceID
+	for rows.Next() {
+		var tid schema.TraceID
+		if err := rows.Scan(&tid); err != nil {
+			return nil, skerr.Wrap(err)
+		}
+		traceIDs = append(traceIDs, tid)
+	}
+	return traceIDs, nil
+}
+
 type digestPair struct {
 	left  types.Digest
 	right types.Digest