Update the QueryTraces function

- Since the tracenames are already available in one go after QueryTracesIDOnly, we can simply take all the trace names in a slice and then chunk the slice directly instead of pushing it into another channel.
- This simplifies the code and also makes it slightly faster.

Change-Id: I84020255ee1dca54106c884b590d97998f1b76e5
Reviewed-on: https://skia-review.googlesource.com/c/buildbot/+/1056339
Commit-Queue: Ashwin Verleker <ashwinpv@google.com>
Reviewed-by: Tony Seaward <seawardt@google.com>
diff --git a/perf/go/tracestore/sqltracestore/BUILD.bazel b/perf/go/tracestore/sqltracestore/BUILD.bazel
index bfa296d..92118b8 100644
--- a/perf/go/tracestore/sqltracestore/BUILD.bazel
+++ b/perf/go/tracestore/sqltracestore/BUILD.bazel
@@ -34,7 +34,6 @@
         "@com_github_hashicorp_golang_lru//:golang-lru",
         "@com_github_jackc_pgx_v4//:pgx",
         "@io_opencensus_go//trace",
-        "@org_golang_x_sync//errgroup",
     ],
 )
 
@@ -49,25 +48,20 @@
     ],
     embed = [":sqltracestore"],
     deps = [
-        "//go/cache/local",
-        "//go/cache/mock",
         "//go/deepequal/assertdeep",
         "//go/now",
         "//go/paramtools",
         "//go/query",
         "//go/sql/pool",
-        "//go/testutils",
         "//go/vec32",
         "//perf/go/config",
         "//perf/go/git",
         "//perf/go/git/gittest",
         "//perf/go/git/provider",
         "//perf/go/sql/sqltest",
-        "//perf/go/tracecache",
         "//perf/go/tracestore",
         "//perf/go/types",
         "@com_github_stretchr_testify//assert",
-        "@com_github_stretchr_testify//mock",
         "@com_github_stretchr_testify//require",
     ],
 )
diff --git a/perf/go/tracestore/sqltracestore/sqltracestore.go b/perf/go/tracestore/sqltracestore/sqltracestore.go
index 76b20a9..cf357cf 100644
--- a/perf/go/tracestore/sqltracestore/sqltracestore.go
+++ b/perf/go/tracestore/sqltracestore/sqltracestore.go
@@ -172,7 +172,6 @@
 	"go.skia.org/infra/perf/go/tracecache"
 	"go.skia.org/infra/perf/go/tracestore"
 	"go.skia.org/infra/perf/go/types"
-	"golang.org/x/sync/errgroup"
 )
 
 const (
@@ -1377,7 +1376,7 @@
 // It works by reading in a number of traceNames into a chunk and then passing
 // that chunk of trace names to a worker pool that reads all the trace values
 // for the given trace names.
-func (s *SQLTraceStore) readTracesByChannelForCommitRange(ctx context.Context, traceNames <-chan string, beginCommit types.CommitNumber, endCommit types.CommitNumber) (types.TraceSet, []provider.Commit, map[string]*types.TraceSourceInfo, error) {
+func (s *SQLTraceStore) readTracesByChannelForCommitRange(ctx context.Context, traceNamesChan <-chan string, beginCommit types.CommitNumber, endCommit types.CommitNumber) (types.TraceSet, []provider.Commit, map[string]*types.TraceSourceInfo, error) {
 	ctx, span := trace.StartSpan(ctx, "sqltracestore.readTracesByChannelForCommitRange")
 	defer span.End()
 
@@ -1387,7 +1386,7 @@
 	// Validate the begin and end commit numbers.
 	if beginCommit > endCommit {
 		// Empty the traceNames channel.
-		for range traceNames {
+		for range <-traceNamesChan {
 		}
 		return nil, nil, nil, skerr.Fmt("Invalid commit range, [%d, %d] should be [%d, %d]", beginCommit, endCommit, endCommit, beginCommit)
 	}
@@ -1404,65 +1403,45 @@
 
 	// Protects traceNameMap and ret.
 	var mutex sync.Mutex
-
-	// chunkChannel is used to distribute work to the workers.
-	chunkChannel := make(chan []traceIDForSQL, queryTracesIDOnlyByIndexChannelSize)
-
-	// Start the workers that do the actual querying when given chunks of trace ids.
-	g, ctx := errgroup.WithContext(ctx)
-	sourceFileMap := map[string]*types.TraceSourceInfo{}
-
-	for i := 0; i < poolSize; i++ {
-		g.Go(func() error {
-			ctx, span := trace.StartSpan(ctx, "sqltracestore.ReadTraces.Worker")
-			defer span.End()
-
-			for chunk := range chunkChannel {
-				if err := s.readTracesChunk(ctx, beginCommit, endCommit, commits, chunk, &mutex, traceNameMap, &ret, sourceFileMap); err != nil {
-					return skerr.Wrap(err)
-				}
-			}
-			return nil
-		})
+	traceNames := []string{}
+	var traceIDsForQuery []traceIDForSQL
+	for traceName := range traceNamesChan {
+		traceNames = append(traceNames, traceName)
+		traceIDBytes := traceIDForSQLInBytesFromTraceName(traceName)
+		traceNameMap[traceIDBytes] = traceName
+		traceIDsForQuery = append(traceIDsForQuery, traceIDForSQLFromTraceName(traceName))
 	}
 
-	// Now break up the incoming trace ids into chuck for the workers.
-	currentChunk := []traceIDForSQL{}
-	for key := range traceNames {
-		if !query.IsValid(key) {
-			sklog.Errorf("Invalid key: %q", key)
+	if len(traceNames) == 0 {
+		return types.TraceSet{}, commits, nil, nil
+	}
+
+	for _, name := range traceNames {
+		if !query.IsValid(name) {
+			sklog.Errorf("Invalid trace name: %q", name)
 			continue
 		}
-
 		mutex.Lock()
-		// Make space in ret for the values.
-		ret[key] = vec32.New(len(commits))
-
-		// Update the map from the full name of the trace and id in traceIDForSQLInBytes form.
-		traceNameMap[traceIDForSQLInBytesFromTraceName(key)] = key
+		ret[name] = vec32.New(len(commits))
 		mutex.Unlock()
+	}
 
-		trID := traceIDForSQLFromTraceName(key)
-		currentChunk = append(currentChunk, trID)
-		if len(currentChunk) >= queryTracesChunkSize {
-			chunkChannel <- currentChunk
-			currentChunk = []traceIDForSQL{}
+	sourceFileMap := map[string]*types.TraceSourceInfo{}
+
+	// Iterate over the traceIDs in chunks and query the database in parallel.
+	err = util.ChunkIterParallelPool(ctx, len(traceIDsForQuery), 5, 10, func(ctx context.Context, startIdx, endIdx int) error {
+		chunk := traceIDsForQuery[startIdx:endIdx]
+		if err := s.readTracesChunk(ctx, beginCommit, endCommit, commits, chunk, &mutex, traceNameMap, &ret, sourceFileMap); err != nil {
+			return skerr.Wrap(err)
 		}
-	}
-	// Now handle any remaining values in the currentChunk.
-	if len(currentChunk) >= 0 {
-		chunkChannel <- currentChunk
-	}
-	close(chunkChannel)
+		return nil
+	})
 
-	if err := g.Wait(); err != nil {
+	if err != nil {
 		span.SetStatus(trace.Status{
 			Code:    trace.StatusCodeInternal,
 			Message: err.Error(),
 		})
-		// Empty the traceNames channel.
-		for range traceNames {
-		}
 		return nil, nil, nil, skerr.Wrap(err)
 	}
 
diff --git a/perf/go/tracestore/sqltracestore/sqltracestore_test.go b/perf/go/tracestore/sqltracestore/sqltracestore_test.go
index 3f96e5f..b9934e5 100644
--- a/perf/go/tracestore/sqltracestore/sqltracestore_test.go
+++ b/perf/go/tracestore/sqltracestore/sqltracestore_test.go
@@ -2,26 +2,20 @@
 
 import (
 	"context"
-	"encoding/json"
 	"testing"
 	"time"
 
 	"github.com/stretchr/testify/assert"
-	"github.com/stretchr/testify/mock"
 	"github.com/stretchr/testify/require"
-	"go.skia.org/infra/go/cache/local"
-	mockCache "go.skia.org/infra/go/cache/mock"
 	"go.skia.org/infra/go/now"
 	"go.skia.org/infra/go/paramtools"
 	"go.skia.org/infra/go/query"
-	"go.skia.org/infra/go/testutils"
 	"go.skia.org/infra/go/vec32"
 	"go.skia.org/infra/perf/go/config"
 	"go.skia.org/infra/perf/go/git"
 	"go.skia.org/infra/perf/go/git/gittest"
 	"go.skia.org/infra/perf/go/git/provider"
 	"go.skia.org/infra/perf/go/sql/sqltest"
-	"go.skia.org/infra/perf/go/tracecache"
 	"go.skia.org/infra/perf/go/tracestore"
 	"go.skia.org/infra/perf/go/types"
 )
@@ -410,83 +404,6 @@
 	assert.Empty(t, ts)
 }
 
-func TestQueryTraces_WithTraceCache_Success(t *testing.T) {
-	ctx, s := commonTestSetupWithCommits(t)
-
-	// Query that matches two traces.
-	q, err := query.NewFromString("arch=x86")
-	assert.NoError(t, err)
-	cache := mockCache.NewCache(t)
-	traceCache := tracecache.New(cache)
-
-	// Configure the cache to return the below params.
-	params := []paramtools.Params{
-		paramtools.NewParams(",arch=x86,config=565,"),
-		paramtools.NewParams(",arch=x86,config=8888,"),
-	}
-	b, err := json.Marshal(params)
-	assert.NoError(t, err)
-	cache.On("GetValue", testutils.AnyContext, mock.Anything).Return(string(b), nil)
-
-	ts, commits, _, err := s.QueryTraces(ctx, 0, q, traceCache)
-	assert.NoError(t, err)
-	cache.AssertExpectations(t)
-	assertCommitNumbersMatch(t, commits, []types.CommitNumber{0, 1, 2, 3, 4, 5, 6, 7})
-	assert.Equal(t, ts, types.TraceSet{
-		",arch=x86,config=565,":  {e, 2.3, e, 3.3, e, e, e, e},
-		",arch=x86,config=8888,": {e, 1.5, e, 2.5, e, e, e, e},
-	})
-}
-
-func TestQueryTraces_WithTraceCache_Miss_Success(t *testing.T) {
-	ctx, s := commonTestSetupWithCommits(t)
-
-	// Query that matches two traces.
-	q, err := query.NewFromString("arch=x86")
-	assert.NoError(t, err)
-	cache := mockCache.NewCache(t)
-	traceCache := tracecache.New(cache)
-	// Make the cache return nil.
-	cache.On("GetValue", testutils.AnyContext, mock.Anything).Return("", nil)
-	cache.On("SetValue", testutils.AnyContext, mock.Anything, mock.Anything).Return(nil)
-
-	ts, commits, _, err := s.QueryTraces(ctx, 0, q, traceCache)
-	assert.NoError(t, err)
-	// Makes sure the cache GetValue function was invoked.
-	cache.AssertExpectations(t)
-	assertCommitNumbersMatch(t, commits, []types.CommitNumber{0, 1, 2, 3, 4, 5, 6, 7})
-	assert.Equal(t, ts, types.TraceSet{
-		",arch=x86,config=565,":  {e, 2.3, e, 3.3, e, e, e, e},
-		",arch=x86,config=8888,": {e, 1.5, e, 2.5, e, e, e, e},
-	})
-}
-
-func TestQueryTraces_WithTraceCache_Local_Success(t *testing.T) {
-	ctx, s := commonTestSetupWithCommits(t)
-
-	// Query that matches two traces.
-	q, err := query.NewFromString("arch=x86")
-	assert.NoError(t, err)
-
-	// Let's use an actual local cache
-	cache, err := local.New(10)
-	assert.NoError(t, err)
-	traceCache := tracecache.New(cache)
-
-	ts, commits, _, err := s.QueryTraces(ctx, 0, q, traceCache)
-	assert.NoError(t, err)
-	assertCommitNumbersMatch(t, commits, []types.CommitNumber{0, 1, 2, 3, 4, 5, 6, 7})
-	assert.Equal(t, ts, types.TraceSet{
-		",arch=x86,config=565,":  {e, 2.3, e, 3.3, e, e, e, e},
-		",arch=x86,config=8888,": {e, 1.5, e, 2.5, e, e, e, e},
-	})
-	// Let's verify that the local cache was populated.
-	traceIds, err := traceCache.GetTraceIds(ctx, 0, q)
-	assert.NoError(t, err)
-	assert.NotNil(t, traceIds)
-	assert.Equal(t, 2, len(traceIds))
-}
-
 func TestTraceCount(t *testing.T) {
 	ctx, s := commonTestSetup(t, true)