[perf] Add trace store NumMatches().

Also clean up signature of trace store PreflightQuery().

Change-Id: I79093c3f32a387abaf7960edc0221863473ccdaa
Reviewed-on: https://skia-review.googlesource.com/c/buildbot/+/338236
Reviewed-by: Joe Gregorio <jcgregorio@google.com>
Commit-Queue: Joe Gregorio <jcgregorio@google.com>
diff --git a/perf/go/dataframe/dataframe.go b/perf/go/dataframe/dataframe.go
index 1f44f10..c327f7d 100644
--- a/perf/go/dataframe/dataframe.go
+++ b/perf/go/dataframe/dataframe.go
@@ -46,9 +46,15 @@
 	// points ending at the given 'end' time for the given keys.
 	NewNFromKeys(ctx context.Context, end time.Time, keys []string, n int32, progress progress.Progress) (*DataFrame, error)
 
+	// NumMatches returns the number of traces that will match the query.
+	NumMatches(ctx context.Context, q *query.Query) (int64, error)
+
 	// PreflightQuery returns the number of traces that will match the query and
-	// the ParamSet of all the matching traces.
-	PreflightQuery(ctx context.Context, end time.Time, q *query.Query) (int64, paramtools.ReadOnlyParamSet, error)
+	// a refined ParamSet to use for further queries. The referenceParamSet
+	// should be a master ParamSet that includes all the Params that could
+	// appear in a query, for example the ParamSet managed by
+	// ParamSetRefresher.
+	PreflightQuery(ctx context.Context, q *query.Query, referenceParamSet paramtools.ReadOnlyParamSet) (int64, paramtools.ParamSet, error)
 }
 
 // ColumnHeader describes each column in a DataFrame.
diff --git a/perf/go/dataframe/mocks/DataFrameBuilder.go b/perf/go/dataframe/mocks/DataFrameBuilder.go
index 1da5a37..e3ae030 100644
--- a/perf/go/dataframe/mocks/DataFrameBuilder.go
+++ b/perf/go/dataframe/mocks/DataFrameBuilder.go
@@ -114,29 +114,50 @@
 	return r0, r1
 }
 
-// PreflightQuery provides a mock function with given fields: ctx, end, q
-func (_m *DataFrameBuilder) PreflightQuery(ctx context.Context, end time.Time, q *query.Query) (int64, paramtools.ReadOnlyParamSet, error) {
-	ret := _m.Called(ctx, end, q)
+// NumMatches provides a mock function with given fields: ctx, q
+func (_m *DataFrameBuilder) NumMatches(ctx context.Context, q *query.Query) (int64, error) {
+	ret := _m.Called(ctx, q)
 
 	var r0 int64
-	if rf, ok := ret.Get(0).(func(context.Context, time.Time, *query.Query) int64); ok {
-		r0 = rf(ctx, end, q)
+	if rf, ok := ret.Get(0).(func(context.Context, *query.Query) int64); ok {
+		r0 = rf(ctx, q)
 	} else {
 		r0 = ret.Get(0).(int64)
 	}
 
-	var r1 paramtools.ReadOnlyParamSet
-	if rf, ok := ret.Get(1).(func(context.Context, time.Time, *query.Query) paramtools.ReadOnlyParamSet); ok {
-		r1 = rf(ctx, end, q)
+	var r1 error
+	if rf, ok := ret.Get(1).(func(context.Context, *query.Query) error); ok {
+		r1 = rf(ctx, q)
+	} else {
+		r1 = ret.Error(1)
+	}
+
+	return r0, r1
+}
+
+// PreflightQuery provides a mock function with given fields: ctx, q, referenceParamSet
+func (_m *DataFrameBuilder) PreflightQuery(ctx context.Context, q *query.Query, referenceParamSet paramtools.ReadOnlyParamSet) (int64, paramtools.ParamSet, error) {
+	ret := _m.Called(ctx, q, referenceParamSet)
+
+	var r0 int64
+	if rf, ok := ret.Get(0).(func(context.Context, *query.Query, paramtools.ReadOnlyParamSet) int64); ok {
+		r0 = rf(ctx, q, referenceParamSet)
+	} else {
+		r0 = ret.Get(0).(int64)
+	}
+
+	var r1 paramtools.ParamSet
+	if rf, ok := ret.Get(1).(func(context.Context, *query.Query, paramtools.ReadOnlyParamSet) paramtools.ParamSet); ok {
+		r1 = rf(ctx, q, referenceParamSet)
 	} else {
 		if ret.Get(1) != nil {
-			r1 = ret.Get(1).(paramtools.ReadOnlyParamSet)
+			r1 = ret.Get(1).(paramtools.ParamSet)
 		}
 	}
 
 	var r2 error
-	if rf, ok := ret.Get(2).(func(context.Context, time.Time, *query.Query) error); ok {
-		r2 = rf(ctx, end, q)
+	if rf, ok := ret.Get(2).(func(context.Context, *query.Query, paramtools.ReadOnlyParamSet) error); ok {
+		r2 = rf(ctx, q, referenceParamSet)
 	} else {
 		r2 = ret.Error(2)
 	}
diff --git a/perf/go/dfbuilder/dfbuilder.go b/perf/go/dfbuilder/dfbuilder.go
index ab6a43b..72c15da 100644
--- a/perf/go/dfbuilder/dfbuilder.go
+++ b/perf/go/dfbuilder/dfbuilder.go
@@ -513,15 +513,14 @@
 	return ret, nil
 }
 
-// See DataFrameBuilder.
-func (b *builder) PreflightQuery(ctx context.Context, end time.Time, q *query.Query) (int64, paramtools.ReadOnlyParamSet, error) {
+// PreflightQuery implements dataframe.DataFrameBuilder.
+func (b *builder) PreflightQuery(ctx context.Context, q *query.Query, referenceParamSet paramtools.ReadOnlyParamSet) (int64, paramtools.ParamSet, error) {
 	ctx, span := trace.StartSpan(ctx, "dfbuiler.PreflightQuery")
 	defer span.End()
 
 	defer timer.NewWithSummary("perfserver_dfbuilder_PreflightQuery", b.preflightQueryTimer).Stop()
 
 	var count int64
-	largestParamSet := paramtools.ReadOnlyParamSet{}
 
 	tileNumber, err := b.store.GetLatestTile(ctx)
 	if err != nil {
@@ -529,40 +528,13 @@
 	}
 
 	if q.Empty() {
-		// If the query is empty then we have a shortcut for building the
-		// ParamSet by just using the OPS. In that case we only need to count
-		// encodedKeys to get the count.
-		for i := 0; i < 2; i++ {
-			fullParamSet, err := b.store.GetParamSet(ctx, tileNumber)
-			if err != nil {
-				return -1, nil, err
-			}
-			largestParamSet = fullParamSet
-			tileNumber = tileNumber.Prev()
-			if tileNumber == types.BadTileNumber {
-				break
-			}
-		}
-
-		count, err = b.store.TraceCount(ctx, tileNumber)
-		if err != nil {
-			return -1, nil, err
-		}
-		return count, largestParamSet, nil
-
+		return -1, nil, skerr.Fmt("Can not pre-flight an empty query")
 	}
 
 	// Since the query isn't empty we'll have to run a partial query
 	// to build the ParamSet. Do so over the two most recent tiles.
 	ps := paramtools.NewParamSet()
 
-	// Record the OPS for the first tile.
-	psOne, err := b.store.GetParamSet(ctx, tileNumber)
-	if err != nil {
-		return -1, nil, err
-	}
-
-	largestParamSet = psOne
 	// Count the matches and sum the params in the first tile.
 	out, err := b.store.QueryTracesIDOnly(ctx, tileNumber, q)
 	if err != nil {
@@ -578,12 +550,6 @@
 	// Now move to the previous tile.
 	tileNumber = tileNumber.Prev()
 	if tileNumber != types.BadTileNumber {
-		// Record the OPS for the second tile.
-		psTwo, err := b.store.GetParamSet(ctx, tileNumber)
-		if err != nil {
-			return -1, nil, err
-		}
-
 		// Count the matches and sum the params in the second tile.
 		out, err = b.store.QueryTracesIDOnly(ctx, tileNumber, q)
 		if err != nil {
@@ -598,10 +564,6 @@
 		if tileTwoCount > count {
 			count = tileTwoCount
 		}
-		// Use the larger of the two OPSs to work with.
-		if psTwo.Size() > ps.Size() {
-			largestParamSet = psTwo
-		}
 	}
 
 	// Now we have the ParamSet that corresponds to the query, but for each
@@ -613,11 +575,56 @@
 		return -1, nil, err
 	}
 	for key := range queryPlan {
-		queryPlan[key] = ps[key]
+		ps[key] = referenceParamSet[key]
 	}
-	queryPlan.Normalize()
+	ps.Normalize()
 
-	return count, queryPlan.Freeze(), nil
+	return count, ps, nil
+}
+
+// NumMatches implements dataframe.DataFrameBuilder.
+func (b *builder) NumMatches(ctx context.Context, q *query.Query) (int64, error) {
+	ctx, span := trace.StartSpan(ctx, "dfbuiler.NumMatches")
+	defer span.End()
+	defer timer.NewWithSummary("perfserver_dfbuilder_NumMatches", b.preflightQueryTimer).Stop()
+
+	var count int64
+
+	tileNumber, err := b.store.GetLatestTile(ctx)
+	if err != nil {
+		return -1, err
+	}
+
+	// Count the matches in the first tile.
+	out, err := b.store.QueryTracesIDOnly(ctx, tileNumber, q)
+	if err != nil {
+		return -1, fmt.Errorf("Failed to query traces: %s", err)
+	}
+	var tileOneCount int64
+	for range out {
+		tileOneCount++
+	}
+	count = tileOneCount
+
+	// Now move to the previous tile.
+	tileNumber = tileNumber.Prev()
+	if tileNumber != types.BadTileNumber {
+		// Count the matches in the second tile.
+		out, err = b.store.QueryTracesIDOnly(ctx, tileNumber, q)
+		if err != nil {
+			return -1, fmt.Errorf("Failed to query traces: %s", err)
+		}
+		var tileTwoCount int64
+		for range out {
+			tileTwoCount++
+		}
+		// Use the larger of the two counts as our result.
+		if tileTwoCount > count {
+			count = tileTwoCount
+		}
+	}
+
+	return count, nil
 }
 
 // Validate that *builder faithfully implements the DataFrameBuidler interface.
diff --git a/perf/go/dfbuilder/dfbuilder_test.go b/perf/go/dfbuilder/dfbuilder_test.go
index 8350ae1..9380399 100644
--- a/perf/go/dfbuilder/dfbuilder_test.go
+++ b/perf/go/dfbuilder/dfbuilder_test.go
@@ -241,3 +241,230 @@
 	assert.Empty(t, columnHeaders)
 	assert.Empty(t, commitNumbers)
 }
+
+func TestPreflightQuery_EmptyQuery_ReturnsError(t *testing.T) {
+	unittest.LargeTest(t)
+	ctx := context.Background()
+
+	ctx, db, _, _, instanceConfig, _, cleanup := gittest.NewForTest(t)
+	defer cleanup()
+	g, err := perfgit.New(ctx, true, db, instanceConfig)
+	require.NoError(t, err)
+
+	instanceConfig.DataStoreConfig.TileSize = 6
+
+	store, err := sqltracestore.New(db, instanceConfig.DataStoreConfig)
+	require.NoError(t, err)
+
+	builder := NewDataFrameBuilderFromTraceStore(g, store)
+
+	// Add some points to the first tile.
+	err = addValuesAtIndex(store, 0, map[string]float32{
+		",arch=x86,config=8888,": 1.2,
+		",arch=x86,config=565,":  2.1,
+		",arch=arm,config=8888,": 100.5,
+	}, "gs://foo.json", time.Now())
+	assert.NoError(t, err)
+
+	q, err := query.NewFromString("")
+	require.NoError(t, err)
+	_, _, err = builder.PreflightQuery(ctx, q, paramtools.NewReadOnlyParamSet())
+	require.Error(t, err)
+	assert.Contains(t, err.Error(), "Can not pre-flight an empty query")
+}
+
+func TestPreflightQuery_NonEmptyQuery_Success(t *testing.T) {
+	unittest.LargeTest(t)
+	ctx := context.Background()
+
+	ctx, db, _, _, instanceConfig, _, cleanup := gittest.NewForTest(t)
+	defer cleanup()
+	g, err := perfgit.New(ctx, true, db, instanceConfig)
+	require.NoError(t, err)
+
+	instanceConfig.DataStoreConfig.TileSize = 6
+
+	store, err := sqltracestore.New(db, instanceConfig.DataStoreConfig)
+	require.NoError(t, err)
+
+	builder := NewDataFrameBuilderFromTraceStore(g, store)
+
+	// Add some points to the first tile.
+	err = addValuesAtIndex(store, 0, map[string]float32{
+		",arch=x86,config=8888,": 1.2,
+		",arch=x86,config=565,":  2.1,
+		",arch=arm,config=8888,": 100.5,
+	}, "gs://foo.json", time.Now())
+	assert.NoError(t, err)
+
+	// Create a query that will match two of the points.
+	q, err := query.NewFromString("config=8888")
+	require.NoError(t, err)
+
+	// The reference ParamSet contains values that should not appear in the
+	// returned ParamSet, and some that get retained.
+	referenceParamSet := paramtools.ReadOnlyParamSet{
+		"arch":   {"x86", "arm", "should-disappear"},
+		"config": {"565", "8888", "should-be-retained"},
+		// 'should-be-retained' is retained because 'config' is in the query and
+		// so all 'config' values should be returned in the ParamSet.
+	}
+	count, ps, err := builder.PreflightQuery(ctx, q, referenceParamSet)
+	require.NoError(t, err)
+	assert.Equal(t, int64(2), count)
+
+	expectedParamSet := paramtools.ParamSet{
+		"arch":   {"arm", "x86"},
+		"config": {"565", "8888", "should-be-retained"},
+	}
+	assert.Equal(t, expectedParamSet, ps)
+}
+
+func TestPreflightQuery_TilesContainDifferentNumberOfMatches_ReturnedParamSetReflectsBothTiles(t *testing.T) {
+	unittest.LargeTest(t)
+	ctx := context.Background()
+
+	ctx, db, _, _, instanceConfig, _, cleanup := gittest.NewForTest(t)
+	defer cleanup()
+	g, err := perfgit.New(ctx, true, db, instanceConfig)
+	require.NoError(t, err)
+
+	instanceConfig.DataStoreConfig.TileSize = 6
+
+	store, err := sqltracestore.New(db, instanceConfig.DataStoreConfig)
+	require.NoError(t, err)
+
+	builder := NewDataFrameBuilderFromTraceStore(g, store)
+
+	// Add some points to the first tile.
+	err = addValuesAtIndex(store, 0, map[string]float32{
+		",arch=x86,config=8888,": 1.2,
+		",arch=x86,config=565,":  2.1,
+		",arch=arm,config=8888,": 100.5,
+	}, "gs://foo.json", time.Now())
+	assert.NoError(t, err)
+
+	// Add some points to the second tile.
+	err = addValuesAtIndex(store, types.CommitNumber(instanceConfig.DataStoreConfig.TileSize), map[string]float32{
+		",arch=riscv,config=8888,": 1.2,
+	}, "gs://foo.json", time.Now())
+	assert.NoError(t, err)
+
+	// Create a query that will match two of the points in tile 0 and one of the points in tile 1.
+	q, err := query.NewFromString("config=8888")
+	require.NoError(t, err)
+
+	// The reference ParamSet contains values that should not appear in the
+	// returned ParamSet, and some that get retained.
+	referenceParamSet := paramtools.ReadOnlyParamSet{
+		"arch":   {"x86", "arm", "should-disappear"},
+		"config": {"565", "8888", "should-be-retained"},
+		// 'should-be-retained' is retained because 'config' is in the query and
+		// so all 'config' values should be returned in the ParamSet.
+	}
+	count, ps, err := builder.PreflightQuery(ctx, q, referenceParamSet)
+	require.NoError(t, err)
+	assert.Equal(t, int64(2), count)
+
+	expectedParamSet := paramtools.ParamSet{
+		"arch":   {"arm", "riscv", "x86"},
+		"config": {"565", "8888", "should-be-retained"},
+	}
+	assert.Equal(t, expectedParamSet, ps)
+}
+
+func TestNumMatches_EmptyQuery_ReturnsError(t *testing.T) {
+	unittest.LargeTest(t)
+	ctx := context.Background()
+
+	ctx, db, _, _, instanceConfig, _, cleanup := gittest.NewForTest(t)
+	defer cleanup()
+	g, err := perfgit.New(ctx, true, db, instanceConfig)
+	require.NoError(t, err)
+
+	instanceConfig.DataStoreConfig.TileSize = 6
+
+	store, err := sqltracestore.New(db, instanceConfig.DataStoreConfig)
+	require.NoError(t, err)
+
+	builder := NewDataFrameBuilderFromTraceStore(g, store)
+	q, err := query.NewFromString("")
+	require.NoError(t, err)
+	_, err = builder.NumMatches(ctx, q)
+	require.Error(t, err)
+}
+
+func TestNumMatches_NonEmptyQuery_Success(t *testing.T) {
+	unittest.LargeTest(t)
+	ctx := context.Background()
+
+	ctx, db, _, _, instanceConfig, _, cleanup := gittest.NewForTest(t)
+	defer cleanup()
+	g, err := perfgit.New(ctx, true, db, instanceConfig)
+	require.NoError(t, err)
+
+	instanceConfig.DataStoreConfig.TileSize = 6
+
+	store, err := sqltracestore.New(db, instanceConfig.DataStoreConfig)
+	require.NoError(t, err)
+
+	builder := NewDataFrameBuilderFromTraceStore(g, store)
+
+	// Add some points to the first tile.
+	err = addValuesAtIndex(store, 0, map[string]float32{
+		",arch=x86,config=8888,": 1.2,
+		",arch=x86,config=565,":  2.1,
+		",arch=arm,config=8888,": 100.5,
+	}, "gs://foo.json", time.Now())
+	assert.NoError(t, err)
+
+	// Create a query that will match two of the points.
+	q, err := query.NewFromString("config=8888")
+	require.NoError(t, err)
+
+	count, err := builder.NumMatches(ctx, q)
+	require.NoError(t, err)
+	assert.Equal(t, int64(2), count)
+}
+
+func TestNumMatches_TilesContainDifferentNumberOfMatches_TheLargerOfTheTwoCountsIsReturned(t *testing.T) {
+	unittest.LargeTest(t)
+	ctx := context.Background()
+
+	ctx, db, _, _, instanceConfig, _, cleanup := gittest.NewForTest(t)
+	defer cleanup()
+	g, err := perfgit.New(ctx, true, db, instanceConfig)
+	require.NoError(t, err)
+
+	instanceConfig.DataStoreConfig.TileSize = 6
+
+	store, err := sqltracestore.New(db, instanceConfig.DataStoreConfig)
+	require.NoError(t, err)
+
+	builder := NewDataFrameBuilderFromTraceStore(g, store)
+
+	// Add some points to the latest tile.
+	err = addValuesAtIndex(store, types.CommitNumber(instanceConfig.DataStoreConfig.TileSize+1), map[string]float32{
+		",arch=x86,config=8888,": 1.2,
+		",arch=x86,config=565,":  2.1,
+		",arch=arm,config=8888,": 100.5,
+	}, "gs://foo.json", time.Now())
+	assert.NoError(t, err)
+
+	// Add some points to the previous tile.
+	err = addValuesAtIndex(store, 1, map[string]float32{
+		",arch=x86,config=8888,":   1.2,
+		",arch=riscv,config=8888,": 2.1,
+		",arch=arm,config=8888,":   100.5,
+	}, "gs://foo.json", time.Now())
+	assert.NoError(t, err)
+
+	// Create a query that will match two of the points in tile 1, but three
+	// points in tile 0.
+	q, err := query.NewFromString("config=8888")
+	require.NoError(t, err)
+
+	count, err := builder.NumMatches(ctx, q)
+	require.NoError(t, err)
+	assert.Equal(t, int64(3), count)
+}
diff --git a/perf/go/frontend/frontend.go b/perf/go/frontend/frontend.go
index b8d2369..40684bf 100644
--- a/perf/go/frontend/frontend.go
+++ b/perf/go/frontend/frontend.go
@@ -633,19 +633,19 @@
 		return
 	}
 	resp := CountHandlerResponse{}
+	fullPS := f.paramsetRefresher.Get()
 	if cr.Q == "" {
-		ps := f.paramsetRefresher.Get()
 		resp.Count = 0
-		resp.Paramset = ps
+		resp.Paramset = fullPS
 	} else {
-		count, ps, err := f.dfBuilder.PreflightQuery(r.Context(), time.Now(), q)
+		count, ps, err := f.dfBuilder.PreflightQuery(r.Context(), q, fullPS)
 		if err != nil {
 			httputils.ReportError(w, err, "Failed to Preflight the query, too many key-value pairs selected. Limit is 200.", http.StatusBadRequest)
 			return
 		}
 
 		resp.Count = int(count)
-		resp.Paramset = ps
+		resp.Paramset = ps.Freeze()
 	}
 	if err := json.NewEncoder(w).Encode(resp); err != nil {
 		sklog.Errorf("Failed to encode paramset: %s", err)
diff --git a/perf/go/regression/continuous/continuous.go b/perf/go/regression/continuous/continuous.go
index 2e61b7d..3efebfe 100644
--- a/perf/go/regression/continuous/continuous.go
+++ b/perf/go/regression/continuous/continuous.go
@@ -383,7 +383,7 @@
 					sklog.Warningf("Alert failed smoketest: Alert contains invalid query: %q: %s", cfg.Query, err)
 					continue
 				}
-				matches, _, err := c.dfBuilder.PreflightQuery(context.Background(), time.Time{}, q)
+				matches, err := c.dfBuilder.NumMatches(context.Background(), q)
 				if err != nil {
 					sklog.Warningf("Alert failed smoketest: %q Failed while trying generic query: %s", cfg.DisplayName, err)
 					continue
diff --git a/perf/go/tracestore/sqltracestore/sqltracestore.go b/perf/go/tracestore/sqltracestore/sqltracestore.go
index 17ec178..2126b37 100644
--- a/perf/go/tracestore/sqltracestore/sqltracestore.go
+++ b/perf/go/tracestore/sqltracestore/sqltracestore.go
@@ -296,7 +296,7 @@
         SELECT
             trace_id
         FROM
-            Postings
+            Postings@primary
             {{ .AsOf }}
         WHERE
             tile_number = {{ .TileNumber }}