Update the QueryTraces function - Since all the trace names are already available at once after QueryTracesIDOnly, we can simply collect them into a slice and then chunk that slice directly instead of pushing them into another channel. - This simplifies the code and also makes it slightly faster. Change-Id: I84020255ee1dca54106c884b590d97998f1b76e5 Reviewed-on: https://skia-review.googlesource.com/c/buildbot/+/1056339 Commit-Queue: Ashwin Verleker <ashwinpv@google.com> Reviewed-by: Tony Seaward <seawardt@google.com>
diff --git a/perf/go/tracestore/sqltracestore/BUILD.bazel b/perf/go/tracestore/sqltracestore/BUILD.bazel index bfa296d..92118b8 100644 --- a/perf/go/tracestore/sqltracestore/BUILD.bazel +++ b/perf/go/tracestore/sqltracestore/BUILD.bazel
@@ -34,7 +34,6 @@ "@com_github_hashicorp_golang_lru//:golang-lru", "@com_github_jackc_pgx_v4//:pgx", "@io_opencensus_go//trace", - "@org_golang_x_sync//errgroup", ], ) @@ -49,25 +48,20 @@ ], embed = [":sqltracestore"], deps = [ - "//go/cache/local", - "//go/cache/mock", "//go/deepequal/assertdeep", "//go/now", "//go/paramtools", "//go/query", "//go/sql/pool", - "//go/testutils", "//go/vec32", "//perf/go/config", "//perf/go/git", "//perf/go/git/gittest", "//perf/go/git/provider", "//perf/go/sql/sqltest", - "//perf/go/tracecache", "//perf/go/tracestore", "//perf/go/types", "@com_github_stretchr_testify//assert", - "@com_github_stretchr_testify//mock", "@com_github_stretchr_testify//require", ], )
diff --git a/perf/go/tracestore/sqltracestore/sqltracestore.go b/perf/go/tracestore/sqltracestore/sqltracestore.go index 76b20a9..cf357cf 100644 --- a/perf/go/tracestore/sqltracestore/sqltracestore.go +++ b/perf/go/tracestore/sqltracestore/sqltracestore.go
@@ -172,7 +172,6 @@ "go.skia.org/infra/perf/go/tracecache" "go.skia.org/infra/perf/go/tracestore" "go.skia.org/infra/perf/go/types" - "golang.org/x/sync/errgroup" ) const ( @@ -1377,7 +1376,7 @@ // It works by reading in a number of traceNames into a chunk and then passing // that chunk of trace names to a worker pool that reads all the trace values // for the given trace names. -func (s *SQLTraceStore) readTracesByChannelForCommitRange(ctx context.Context, traceNames <-chan string, beginCommit types.CommitNumber, endCommit types.CommitNumber) (types.TraceSet, []provider.Commit, map[string]*types.TraceSourceInfo, error) { +func (s *SQLTraceStore) readTracesByChannelForCommitRange(ctx context.Context, traceNamesChan <-chan string, beginCommit types.CommitNumber, endCommit types.CommitNumber) (types.TraceSet, []provider.Commit, map[string]*types.TraceSourceInfo, error) { ctx, span := trace.StartSpan(ctx, "sqltracestore.readTracesByChannelForCommitRange") defer span.End() @@ -1387,7 +1386,7 @@ // Validate the begin and end commit numbers. if beginCommit > endCommit { // Empty the traceNames channel. - for range traceNames { + for range <-traceNamesChan { } return nil, nil, nil, skerr.Fmt("Invalid commit range, [%d, %d] should be [%d, %d]", beginCommit, endCommit, endCommit, beginCommit) } @@ -1404,65 +1403,45 @@ // Protects traceNameMap and ret. var mutex sync.Mutex - - // chunkChannel is used to distribute work to the workers. - chunkChannel := make(chan []traceIDForSQL, queryTracesIDOnlyByIndexChannelSize) - - // Start the workers that do the actual querying when given chunks of trace ids. 
- g, ctx := errgroup.WithContext(ctx) - sourceFileMap := map[string]*types.TraceSourceInfo{} - - for i := 0; i < poolSize; i++ { - g.Go(func() error { - ctx, span := trace.StartSpan(ctx, "sqltracestore.ReadTraces.Worker") - defer span.End() - - for chunk := range chunkChannel { - if err := s.readTracesChunk(ctx, beginCommit, endCommit, commits, chunk, &mutex, traceNameMap, &ret, sourceFileMap); err != nil { - return skerr.Wrap(err) - } - } - return nil - }) + traceNames := []string{} + var traceIDsForQuery []traceIDForSQL + for traceName := range traceNamesChan { + traceNames = append(traceNames, traceName) + traceIDBytes := traceIDForSQLInBytesFromTraceName(traceName) + traceNameMap[traceIDBytes] = traceName + traceIDsForQuery = append(traceIDsForQuery, traceIDForSQLFromTraceName(traceName)) } - // Now break up the incoming trace ids into chuck for the workers. - currentChunk := []traceIDForSQL{} - for key := range traceNames { - if !query.IsValid(key) { - sklog.Errorf("Invalid key: %q", key) + if len(traceNames) == 0 { + return types.TraceSet{}, commits, nil, nil + } + + for _, name := range traceNames { + if !query.IsValid(name) { + sklog.Errorf("Invalid trace name: %q", name) continue } - mutex.Lock() - // Make space in ret for the values. - ret[key] = vec32.New(len(commits)) - - // Update the map from the full name of the trace and id in traceIDForSQLInBytes form. - traceNameMap[traceIDForSQLInBytesFromTraceName(key)] = key + ret[name] = vec32.New(len(commits)) mutex.Unlock() + } - trID := traceIDForSQLFromTraceName(key) - currentChunk = append(currentChunk, trID) - if len(currentChunk) >= queryTracesChunkSize { - chunkChannel <- currentChunk - currentChunk = []traceIDForSQL{} + sourceFileMap := map[string]*types.TraceSourceInfo{} + + // Iterate over the traceIDs in chunks and query the database in parallel. 
+ err = util.ChunkIterParallelPool(ctx, len(traceIDsForQuery), 5, 10, func(ctx context.Context, startIdx, endIdx int) error { + chunk := traceIDsForQuery[startIdx:endIdx] + if err := s.readTracesChunk(ctx, beginCommit, endCommit, commits, chunk, &mutex, traceNameMap, &ret, sourceFileMap); err != nil { + return skerr.Wrap(err) } - } - // Now handle any remaining values in the currentChunk. - if len(currentChunk) >= 0 { - chunkChannel <- currentChunk - } - close(chunkChannel) + return nil + }) - if err := g.Wait(); err != nil { + if err != nil { span.SetStatus(trace.Status{ Code: trace.StatusCodeInternal, Message: err.Error(), }) - // Empty the traceNames channel. - for range traceNames { - } return nil, nil, nil, skerr.Wrap(err) }
diff --git a/perf/go/tracestore/sqltracestore/sqltracestore_test.go b/perf/go/tracestore/sqltracestore/sqltracestore_test.go index 3f96e5f..b9934e5 100644 --- a/perf/go/tracestore/sqltracestore/sqltracestore_test.go +++ b/perf/go/tracestore/sqltracestore/sqltracestore_test.go
@@ -2,26 +2,20 @@ import ( "context" - "encoding/json" "testing" "time" "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" - "go.skia.org/infra/go/cache/local" - mockCache "go.skia.org/infra/go/cache/mock" "go.skia.org/infra/go/now" "go.skia.org/infra/go/paramtools" "go.skia.org/infra/go/query" - "go.skia.org/infra/go/testutils" "go.skia.org/infra/go/vec32" "go.skia.org/infra/perf/go/config" "go.skia.org/infra/perf/go/git" "go.skia.org/infra/perf/go/git/gittest" "go.skia.org/infra/perf/go/git/provider" "go.skia.org/infra/perf/go/sql/sqltest" - "go.skia.org/infra/perf/go/tracecache" "go.skia.org/infra/perf/go/tracestore" "go.skia.org/infra/perf/go/types" ) @@ -410,83 +404,6 @@ assert.Empty(t, ts) } -func TestQueryTraces_WithTraceCache_Success(t *testing.T) { - ctx, s := commonTestSetupWithCommits(t) - - // Query that matches two traces. - q, err := query.NewFromString("arch=x86") - assert.NoError(t, err) - cache := mockCache.NewCache(t) - traceCache := tracecache.New(cache) - - // Configure the cache to return the below params. - params := []paramtools.Params{ - paramtools.NewParams(",arch=x86,config=565,"), - paramtools.NewParams(",arch=x86,config=8888,"), - } - b, err := json.Marshal(params) - assert.NoError(t, err) - cache.On("GetValue", testutils.AnyContext, mock.Anything).Return(string(b), nil) - - ts, commits, _, err := s.QueryTraces(ctx, 0, q, traceCache) - assert.NoError(t, err) - cache.AssertExpectations(t) - assertCommitNumbersMatch(t, commits, []types.CommitNumber{0, 1, 2, 3, 4, 5, 6, 7}) - assert.Equal(t, ts, types.TraceSet{ - ",arch=x86,config=565,": {e, 2.3, e, 3.3, e, e, e, e}, - ",arch=x86,config=8888,": {e, 1.5, e, 2.5, e, e, e, e}, - }) -} - -func TestQueryTraces_WithTraceCache_Miss_Success(t *testing.T) { - ctx, s := commonTestSetupWithCommits(t) - - // Query that matches two traces. 
- q, err := query.NewFromString("arch=x86") - assert.NoError(t, err) - cache := mockCache.NewCache(t) - traceCache := tracecache.New(cache) - // Make the cache return nil. - cache.On("GetValue", testutils.AnyContext, mock.Anything).Return("", nil) - cache.On("SetValue", testutils.AnyContext, mock.Anything, mock.Anything).Return(nil) - - ts, commits, _, err := s.QueryTraces(ctx, 0, q, traceCache) - assert.NoError(t, err) - // Makes sure the cache GetValue function was invoked. - cache.AssertExpectations(t) - assertCommitNumbersMatch(t, commits, []types.CommitNumber{0, 1, 2, 3, 4, 5, 6, 7}) - assert.Equal(t, ts, types.TraceSet{ - ",arch=x86,config=565,": {e, 2.3, e, 3.3, e, e, e, e}, - ",arch=x86,config=8888,": {e, 1.5, e, 2.5, e, e, e, e}, - }) -} - -func TestQueryTraces_WithTraceCache_Local_Success(t *testing.T) { - ctx, s := commonTestSetupWithCommits(t) - - // Query that matches two traces. - q, err := query.NewFromString("arch=x86") - assert.NoError(t, err) - - // Let's use an actual local cache - cache, err := local.New(10) - assert.NoError(t, err) - traceCache := tracecache.New(cache) - - ts, commits, _, err := s.QueryTraces(ctx, 0, q, traceCache) - assert.NoError(t, err) - assertCommitNumbersMatch(t, commits, []types.CommitNumber{0, 1, 2, 3, 4, 5, 6, 7}) - assert.Equal(t, ts, types.TraceSet{ - ",arch=x86,config=565,": {e, 2.3, e, 3.3, e, e, e, e}, - ",arch=x86,config=8888,": {e, 1.5, e, 2.5, e, e, e, e}, - }) - // Let's verify that the local cache was populated. - traceIds, err := traceCache.GetTraceIds(ctx, 0, q) - assert.NoError(t, err) - assert.NotNil(t, traceIds) - assert.Equal(t, 2, len(traceIds)) -} - func TestTraceCount(t *testing.T) { ctx, s := commonTestSetup(t, true)