Fully enable in memory trace params in sqltracestore plus some bug fixes.
InMemoryTraceParams
- if no data present then return empty instead of erroring out.
- Add support for negative queries (eg: bot=!win-10-perf)
- Also update tests to refresh the inmemorytraceparams when they write any data.
Change-Id: Id23d18d04e0e74cc8a2d5b2eacbddfa20461a3f3
Reviewed-on: https://skia-review.googlesource.com/c/buildbot/+/1057564
Commit-Queue: Ashwin Verleker <ashwinpv@google.com>
Reviewed-by: Tony Seaward <seawardt@google.com>
diff --git a/perf/go/builders/builders.go b/perf/go/builders/builders.go
index 4f0a91d..55f717e 100644
--- a/perf/go/builders/builders.go
+++ b/perf/go/builders/builders.go
@@ -176,11 +176,9 @@
return nil, err
}
var inMemoryTraceParams *sqltracestore.InMemoryTraceParams = nil
- if instanceConfig.Experiments.InMemoryTraceParams {
- inMemoryTraceParams, err = sqltracestore.NewInMemoryTraceParams(ctx, db, 12*60*60 /*12hr*/)
- if err != nil {
- return nil, skerr.Wrap(err)
- }
+ inMemoryTraceParams, err = sqltracestore.NewInMemoryTraceParams(ctx, db, 12*60*60 /*12hr*/)
+ if err != nil {
+ return nil, skerr.Wrap(err)
}
return sqltracestore.New(db, instanceConfig.DataStoreConfig, traceParamStore, inMemoryTraceParams)
}
diff --git a/perf/go/dfbuilder/dfbuilder_test.go b/perf/go/dfbuilder/dfbuilder_test.go
index 8b323e1..c02267c 100644
--- a/perf/go/dfbuilder/dfbuilder_test.go
+++ b/perf/go/dfbuilder/dfbuilder_test.go
@@ -38,15 +38,17 @@
}
)
-func getSqlTraceStore(t *testing.T, db pool.Pool, cfg config.DataStoreConfig) *sqltracestore.SQLTraceStore {
+func getSqlTraceStore(t *testing.T, db pool.Pool, cfg config.DataStoreConfig) (*sqltracestore.SQLTraceStore, *sqltracestore.InMemoryTraceParams) {
traceParamStore := sqltracestore.NewTraceParamStore(db)
- store, err := sqltracestore.New(db, cfg, traceParamStore, nil)
+ inMemoryTraceParams, err := sqltracestore.NewInMemoryTraceParams(context.Background(), db, 1)
+ assert.NoError(t, err)
+ store, err := sqltracestore.New(db, cfg, traceParamStore, inMemoryTraceParams)
require.NoError(t, err)
- return store
+ return store, inMemoryTraceParams
}
func TestBuildTraceMapper(t *testing.T) {
db := sqltest.NewSpannerDBForTests(t, "dfbuilder")
- store := getSqlTraceStore(t, db, cfg.DataStoreConfig)
+ store, _ := getSqlTraceStore(t, db, cfg.DataStoreConfig)
tileMap := sliceOfTileNumbersFromCommits([]types.CommitNumber{0, 1, 255, 256, 257}, store)
expected := []types.TileNumber{0, 1}
assert.Equal(t, expected, tileMap)
@@ -57,7 +59,8 @@
}
// The keys of values are structured keys, not encoded keys.
-func addValuesAtIndex(store tracestore.TraceStore, index types.CommitNumber, keyValues map[string]float32, filename string, ts time.Time) error {
+func addValuesAtIndex(store tracestore.TraceStore, inMemoryTraceParams *sqltracestore.InMemoryTraceParams, index types.CommitNumber, keyValues map[string]float32, filename string, ts time.Time) error {
+ ctx := context.Background()
ps := paramtools.ParamSet{}
params := []paramtools.Params{}
values := []float32{}
@@ -70,7 +73,11 @@
params = append(params, p)
values = append(values, v)
}
- return store.WriteTraces(context.Background(), index, params, values, ps, filename, ts)
+ err := store.WriteTraces(ctx, index, params, values, ps, filename, ts)
+ if err != nil {
+ return err
+ }
+ return inMemoryTraceParams.Refresh(ctx)
}
func TestBuildNew(t *testing.T) {
@@ -82,24 +89,24 @@
instanceConfig.DataStoreConfig.TileSize = 6
- store := getSqlTraceStore(t, db, instanceConfig.DataStoreConfig)
+ store, inMemoryTraceParams := getSqlTraceStore(t, db, instanceConfig.DataStoreConfig)
builder := NewDataFrameBuilderFromTraceStore(g, store, nil, 2, doNotFilterParentTraces, instanceConfig.QueryConfig.CommitChunkSize, instanceConfig.QueryConfig.MaxEmptyTilesForQuery)
// Add some points to the first and second tile.
- err = addValuesAtIndex(store, 0, map[string]float32{
+ err = addValuesAtIndex(store, inMemoryTraceParams, 0, map[string]float32{
",arch=x86,config=8888,": 1.2,
",arch=x86,config=565,": 2.1,
",arch=arm,config=8888,": 100.5,
}, "gs://foo.json", time.Now())
assert.NoError(t, err)
- err = addValuesAtIndex(store, 1, map[string]float32{
+ err = addValuesAtIndex(store, inMemoryTraceParams, 1, map[string]float32{
",arch=x86,config=8888,": 1.3,
",arch=x86,config=565,": 2.2,
",arch=arm,config=8888,": 100.6,
}, "gs://foo.json", time.Now())
assert.NoError(t, err)
- err = addValuesAtIndex(store, 7, map[string]float32{
+ err = addValuesAtIndex(store, inMemoryTraceParams, 7, map[string]float32{
",arch=x86,config=8888,": 1.0,
",arch=x86,config=565,": 2.5,
",arch=arm,config=8888,": 101.1,
@@ -195,7 +202,7 @@
assert.Len(t, df.Header, 0)
// Add a value that only appears in one of the tiles.
- err = addValuesAtIndex(store, 7, map[string]float32{
+ err = addValuesAtIndex(store, inMemoryTraceParams, 7, map[string]float32{
",config=8888,model=Pixel,": 3.0,
}, "gs://foo.json", time.Now())
assert.NoError(t, err)
@@ -247,12 +254,12 @@
instanceConfig.DataStoreConfig.TileSize = 6
- store := getSqlTraceStore(t, db, instanceConfig.DataStoreConfig)
+ store, inMemoryTraceParams := getSqlTraceStore(t, db, instanceConfig.DataStoreConfig)
builder := NewDataFrameBuilderFromTraceStore(g, store, nil, 2, doNotFilterParentTraces, instanceConfig.QueryConfig.CommitChunkSize, instanceConfig.QueryConfig.MaxEmptyTilesForQuery)
// Add some points to the first tile.
- err = addValuesAtIndex(store, 0, map[string]float32{
+ err = addValuesAtIndex(store, inMemoryTraceParams, 0, map[string]float32{
",arch=x86,config=8888,": 1.2,
",arch=x86,config=565,": 2.1,
",arch=arm,config=8888,": 100.5,
@@ -273,12 +280,12 @@
instanceConfig.DataStoreConfig.TileSize = 6
- store := getSqlTraceStore(t, db, instanceConfig.DataStoreConfig)
+ store, inMemoryTraceParams := getSqlTraceStore(t, db, instanceConfig.DataStoreConfig)
builder := NewDataFrameBuilderFromTraceStore(g, store, nil, 2, doNotFilterParentTraces, instanceConfig.QueryConfig.CommitChunkSize, instanceConfig.QueryConfig.MaxEmptyTilesForQuery)
// Add some points to the first tile.
- err = addValuesAtIndex(store, 0, map[string]float32{
+ err = addValuesAtIndex(store, inMemoryTraceParams, 0, map[string]float32{
",arch=x86,config=8888,": 1.2,
",arch=x86,config=565,": 2.1,
",arch=arm,config=8888,": 100.5,
@@ -315,12 +322,12 @@
instanceConfig.DataStoreConfig.TileSize = 6
- store := getSqlTraceStore(t, db, instanceConfig.DataStoreConfig)
+ store, inMemoryTraceParams := getSqlTraceStore(t, db, instanceConfig.DataStoreConfig)
builder := NewDataFrameBuilderFromTraceStore(g, store, nil, 2, doNotFilterParentTraces, instanceConfig.QueryConfig.CommitChunkSize, instanceConfig.QueryConfig.MaxEmptyTilesForQuery)
// Add some points to the first tile.
- err = addValuesAtIndex(store, 0, map[string]float32{
+ err = addValuesAtIndex(store, inMemoryTraceParams, 0, map[string]float32{
",arch=x86,config=8888,": 1.2,
",arch=x86,config=565,": 2.1,
",arch=arm,config=8888,": 100.5,
@@ -328,7 +335,7 @@
assert.NoError(t, err)
// Add some points to the second tile.
- err = addValuesAtIndex(store, types.CommitNumber(instanceConfig.DataStoreConfig.TileSize), map[string]float32{
+ err = addValuesAtIndex(store, inMemoryTraceParams, types.CommitNumber(instanceConfig.DataStoreConfig.TileSize), map[string]float32{
",arch=riscv,config=8888,": 1.2,
}, "gs://foo.json", time.Now())
assert.NoError(t, err)
@@ -347,7 +354,7 @@
}
count, ps, err := builder.PreflightQuery(ctx, q, referenceParamSet)
require.NoError(t, err)
- assert.Equal(t, int64(2), count)
+ assert.Equal(t, int64(3), count)
expectedParamSet := paramtools.ParamSet{
"arch": {"arm", "riscv", "x86"},
@@ -363,7 +370,7 @@
instanceConfig.DataStoreConfig.TileSize = 6
- store := getSqlTraceStore(t, db, instanceConfig.DataStoreConfig)
+ store, _ := getSqlTraceStore(t, db, instanceConfig.DataStoreConfig)
builder := NewDataFrameBuilderFromTraceStore(g, store, nil, 2, doNotFilterParentTraces, instanceConfig.QueryConfig.CommitChunkSize, instanceConfig.QueryConfig.MaxEmptyTilesForQuery)
q, err := query.NewFromString("")
@@ -379,12 +386,12 @@
instanceConfig.DataStoreConfig.TileSize = 6
- store := getSqlTraceStore(t, db, instanceConfig.DataStoreConfig)
+ store, inMemoryTraceParams := getSqlTraceStore(t, db, instanceConfig.DataStoreConfig)
builder := NewDataFrameBuilderFromTraceStore(g, store, nil, 2, doNotFilterParentTraces, instanceConfig.QueryConfig.CommitChunkSize, instanceConfig.QueryConfig.MaxEmptyTilesForQuery)
// Add some points to the first tile.
- err = addValuesAtIndex(store, 0, map[string]float32{
+ err = addValuesAtIndex(store, inMemoryTraceParams, 0, map[string]float32{
",arch=x86,config=8888,": 1.2,
",arch=x86,config=565,": 2.1,
",arch=arm,config=8888,": 100.5,
@@ -407,12 +414,12 @@
instanceConfig.DataStoreConfig.TileSize = 6
- store := getSqlTraceStore(t, db, instanceConfig.DataStoreConfig)
+ store, inMemoryTraceParams := getSqlTraceStore(t, db, instanceConfig.DataStoreConfig)
builder := NewDataFrameBuilderFromTraceStore(g, store, nil, 2, doNotFilterParentTraces, instanceConfig.QueryConfig.CommitChunkSize, instanceConfig.QueryConfig.MaxEmptyTilesForQuery)
// Add some points to the latest tile.
- err = addValuesAtIndex(store, types.CommitNumber(instanceConfig.DataStoreConfig.TileSize+1), map[string]float32{
+ err = addValuesAtIndex(store, inMemoryTraceParams, types.CommitNumber(instanceConfig.DataStoreConfig.TileSize+1), map[string]float32{
",arch=x86,config=8888,": 1.2,
",arch=x86,config=565,": 2.1,
",arch=arm,config=8888,": 100.5,
@@ -420,7 +427,7 @@
assert.NoError(t, err)
// Add some points to the previous tile.
- err = addValuesAtIndex(store, 1, map[string]float32{
+ err = addValuesAtIndex(store, inMemoryTraceParams, 1, map[string]float32{
",arch=x86,config=8888,": 1.2,
",arch=riscv,config=8888,": 2.1,
",arch=arm,config=8888,": 100.5,
@@ -444,7 +451,7 @@
instanceConfig.DataStoreConfig.TileSize = 6
- store := getSqlTraceStore(t, db, instanceConfig.DataStoreConfig)
+ store, inMemoryTraceParams := getSqlTraceStore(t, db, instanceConfig.DataStoreConfig)
cache, err := local.New(10)
require.NoError(t, err)
@@ -453,7 +460,7 @@
builder := NewDataFrameBuilderFromTraceStore(g, store, traceCache, 2, doNotFilterParentTraces, instanceConfig.QueryConfig.CommitChunkSize, instanceConfig.QueryConfig.MaxEmptyTilesForQuery)
// Add some points to the first tile.
- err = addValuesAtIndex(store, 0, map[string]float32{
+ err = addValuesAtIndex(store, inMemoryTraceParams, 0, map[string]float32{
",arch=x86,config=8888,": 1.2,
",arch=x86,config=565,": 2.1,
",arch=arm,config=8888,": 100.5,
diff --git a/perf/go/dfiter/dfiter_test.go b/perf/go/dfiter/dfiter_test.go
index b81ad97..aa9fd9c 100644
--- a/perf/go/dfiter/dfiter_test.go
+++ b/perf/go/dfiter/dfiter_test.go
@@ -28,7 +28,8 @@
defaultAnomalyConfig = config.AnomalyConfig{}
)
-func addValuesAtIndex(store tracestore.TraceStore, index types.CommitNumber, keyValues map[string]float32, filename string, ts time.Time) error {
+func addValuesAtIndex(store tracestore.TraceStore, inMemoryTraceParams *sqltracestore.InMemoryTraceParams, index types.CommitNumber, keyValues map[string]float32, filename string, ts time.Time) error {
+ ctx := context.Background()
ps := paramtools.ParamSet{}
params := []paramtools.Params{}
values := []float32{}
@@ -41,7 +42,11 @@
params = append(params, p)
values = append(values, v)
}
- return store.WriteTraces(context.Background(), index, params, values, ps, filename, ts)
+ err := store.WriteTraces(ctx, index, params, values, ps, filename, ts)
+ if err != nil {
+ return err
+ }
+ return inMemoryTraceParams.Refresh(ctx)
}
func newForTest(t *testing.T) (context.Context, dataframe.DataFrameBuilder, perfgit.Git, time.Time) {
@@ -55,25 +60,27 @@
}
traceParamStore := sqltracestore.NewTraceParamStore(db)
- store, err := sqltracestore.New(db, cfg, traceParamStore, nil)
+ inMemoryTraceParams, err := sqltracestore.NewInMemoryTraceParams(ctx, db, 12*60*60)
+ assert.NoError(t, err)
+ store, err := sqltracestore.New(db, cfg, traceParamStore, inMemoryTraceParams)
require.NoError(t, err)
ts := gittest.StartTime
// Add some points to the first and second tile.
- err = addValuesAtIndex(store, 0, map[string]float32{
+ err = addValuesAtIndex(store, inMemoryTraceParams, 0, map[string]float32{
",arch=x86,config=8888,": 1.2,
",arch=x86,config=565,": 2.1,
",arch=arm,config=8888,": 100.5,
}, "gs://foo.json", ts)
assert.NoError(t, err)
- err = addValuesAtIndex(store, 1, map[string]float32{
+ err = addValuesAtIndex(store, inMemoryTraceParams, 1, map[string]float32{
",arch=x86,config=8888,": 1.3,
",arch=x86,config=565,": 2.2,
",arch=arm,config=8888,": 100.6,
}, "gs://foo.json", ts.Add(time.Minute))
assert.NoError(t, err)
- err = addValuesAtIndex(store, 7, map[string]float32{
+ err = addValuesAtIndex(store, inMemoryTraceParams, 7, map[string]float32{
",arch=x86,config=8888,": 1.7,
",arch=x86,config=565,": 2.5,
",arch=arm,config=8888,": 101.1,
@@ -81,7 +88,7 @@
assert.NoError(t, err)
lastTimeStamp := ts.Add(8 * time.Minute)
- err = addValuesAtIndex(store, 8, map[string]float32{
+ err = addValuesAtIndex(store, inMemoryTraceParams, 8, map[string]float32{
",arch=x86,config=8888,": 1.8,
",arch=x86,config=565,": 2.6,
",arch=arm,config=8888,": 101.2,
diff --git a/perf/go/tracestore/sqltracestore/inmemorytraceparams.go b/perf/go/tracestore/sqltracestore/inmemorytraceparams.go
index 06e421a..a56ca73 100644
--- a/perf/go/tracestore/sqltracestore/inmemorytraceparams.go
+++ b/perf/go/tracestore/sqltracestore/inmemorytraceparams.go
@@ -5,6 +5,7 @@
"sync"
"time"
+ "github.com/jackc/pgx/v4"
"go.opencensus.io/trace"
"go.skia.org/infra/go/paramtools"
"go.skia.org/infra/go/query"
@@ -41,7 +42,7 @@
dataLock sync.RWMutex
}
-func (tp *InMemoryTraceParams) refresh(ctx context.Context) error {
+func (tp *InMemoryTraceParams) Refresh(ctx context.Context) error {
ctx, span := trace.StartSpan(ctx, "InMemoryTraceParams.refresh")
defer span.End()
traceparams := [][]int32{}
@@ -63,6 +64,9 @@
`
tileNumber := types.BadTileNumber
if err := tp.db.QueryRow(ctx, getLatestTile).Scan(&tileNumber); err != nil {
+ if err == pgx.ErrNoRows {
+ return nil
+ }
return skerr.Wrap(err)
}
@@ -77,6 +81,9 @@
`
paramsRows, err := tp.db.Query(ctx, paramSetForTile, tileNumber)
if err != nil {
+ if err == pgx.ErrNoRows {
+ return nil
+ }
return skerr.Wrap(err)
}
var pCount int32 = 0
@@ -94,6 +101,9 @@
// Get traceparams row data
rows, err := tp.db.Query(ctx, "SELECT trace_id, params FROM traceparams;")
if err != nil {
+ if err == pgx.ErrNoRows {
+ return nil
+ }
return skerr.Wrap(err)
}
defer rows.Close()
@@ -140,7 +150,7 @@
func (tp *InMemoryTraceParams) startRefresher(ctx context.Context) error {
// Initialize
- err := tp.refresh(ctx)
+ err := tp.Refresh(ctx)
if err != nil {
return err
}
@@ -150,7 +160,7 @@
// Periodically run it based on the specified duration.
refreshDuration := time.Second * time.Duration(tp.refreshIntervalInSeconds)
for range time.Tick(refreshDuration) {
- err := tp.refresh(ctx)
+ err := tp.Refresh(ctx)
if err != nil {
sklog.Errorf("Error updating alert configurations. %s", err)
}
@@ -187,6 +197,9 @@
defer close(outParams)
tp.dataLock.RLock()
defer tp.dataLock.RUnlock()
+ if len(tp.traceparams) == 0 {
+ return
+ }
numTraceparams := len(tp.traceparams[0])
const kChunkSize int = 2000
const kTraceIdQueryPoolSize int = 30
@@ -225,6 +238,15 @@
traceids[nextTraceidCount] = traceIdIdx
nextTraceidCount++
}
+ } else if queryParam.IsNegative {
+ // Keep traceid if it does not match encoded param value
+ for _, eParamValue := range eParamValues {
+ if column[traceIdIdx] != eParamValue {
+ traceids[nextTraceidCount] = traceIdIdx
+ nextTraceidCount++
+ }
+ }
+
} else {
// Keep traceid if it matches any encoded param value
for _, eParamValue := range eParamValues {
diff --git a/perf/go/tracestore/sqltracestore/sqltracestore.go b/perf/go/tracestore/sqltracestore/sqltracestore.go
index a3450e2..1fc9a13 100644
--- a/perf/go/tracestore/sqltracestore/sqltracestore.go
+++ b/perf/go/tracestore/sqltracestore/sqltracestore.go
@@ -747,6 +747,10 @@
return nil, skerr.Wrap(err)
}
+ if inMemoryTraceParams == nil {
+ return nil, skerr.Fmt("inMemoryTraceParams cannot be nil")
+ }
+
ret := &SQLTraceStore{
db: db,
inMemoryTraceParams: inMemoryTraceParams,
@@ -1334,196 +1338,10 @@
return outParams, skerr.Fmt("Can't run QueryTracesIDOnlyByIndex for the empty query.")
}
- if s.inMemoryTraceParams != nil {
- s.inMemoryTraceParams.QueryTraceIDs(ctx, tileNumber, q, outParams)
- return outParams, nil
- }
-
- ps, err := s.GetParamSet(ctx, tileNumber)
- if err != nil {
- close(outParams)
- return outParams, skerr.Wrap(err)
- }
-
- plan, err := q.QueryPlan(ps)
- if err != nil {
- // Not an error, we just won't match anything in this tile.
- //
- // The plan may be invalid because it is querying with keys or values
- // that don't appear in a tile, which means the query won't work on this
- // tile, but it may still work on other tiles, so we just don't return
- // any results for this tile.
- close(outParams)
- return outParams, nil
- }
- if len(plan) == 0 {
- // We won't match anything in this tile.
- sklog.Info("QueryPlan returns an empty list")
- close(outParams)
- return outParams, nil
- }
-
- // Sanitize our inputs.
- if err := query.ValidateParamSet(plan); err != nil {
- return nil, skerr.Wrapf(err, "invalid query %#v", *q)
- }
-
- // Do a quick pre-flight to find if we can add a "AND trace_id IN (...)"
- // clause to all query parts to speed them up.
- traceIDRestriction, skipKey, planDisposition := s.restrictByCounting(ctx, tileNumber, plan)
- if planDisposition == skippable {
- // We know this query won't match any traces in this tile.
- sklog.Info("restrictByCounting returns an skippable planDisposition")
- close(outParams)
- return outParams, nil
- }
-
- optimizeSQLTraceStore := config.Config != nil && config.Config.OptimizeSQLTraceStore
-
- // This query is done in two parts because the CDB query planner seems to
- // pick a really bad plan a large percentage of the time.
-
- // First find the encoded trace ids that match the query. Break apart the
- // QueryPlan and do each group of OR's as individual queries to the
- // database, but then stream the results and do the ANDs here on the server.
- // That's because CDB complains about the amount of RAM that doing the AND
- // can require. For example, the query 'source_type=svg&sub_result=min_ms'
- // requires merging two lists that are both over 200k.
- unionChannels := []<-chan traceIDForSQL{}
- i := 0
- for key, values := range plan {
- // If we are using a restrict clause then all the trace_ids for that key
- // are included all the other queries, so we can skip querying on that
- // key directly.
- if key == skipKey {
- continue
- }
-
- // Expand the template for the SQL.
- var b bytes.Buffer
- // Query trace ids through index by_key_value if the traceIDRestriction is empty,
- // otherwise, query trace ids through the primary key, which will reduce the SQL query statement time.
- if len(traceIDRestriction) == 0 {
- context := queryTraceIDsByKeyValueContext{
- TileNumber: tileNumber,
- Key: key,
- Values: values,
- AsOf: "",
- }
- if err := s.unpreparedStatements[queryTraceIDsByKeyValue].Execute(&b, context); err != nil {
- return nil, skerr.Wrapf(err, "failed to expand queryTraceIDsByKeyValue template")
- }
- } else {
- context := queryTraceIDsContext{
- TileNumber: tileNumber,
- Key: key,
- Values: values,
- AsOf: "",
- RestrictClause: traceIDRestriction,
- }
- if err := s.unpreparedStatements[queryTraceIDs].Execute(&b, context); err != nil {
- return nil, skerr.Wrapf(err, "failed to expand queryTraceIDs template")
- }
- }
- sql := b.String()
- ch := make(chan traceIDForSQL)
- if optimizeSQLTraceStore {
- ch = make(chan traceIDForSQL, queryTracesIDOnlyByIndexChannelSize)
- }
- unionChannels = append(unionChannels, ch)
-
- go func(ch chan traceIDForSQL, sql string) {
- _, span := trace.StartSpan(ctx, "sqltracestore.QueryTracesIDOnly.PerKeyWorker")
- defer span.End()
-
- defer close(ch)
-
- queryCtx, querySpan := trace.StartSpan(ctx, "sqltracestore.QueryTracesIDOnly.ExecuteSQLQuery")
- rows, err := s.db.Query(queryCtx, sql)
- querySpan.End()
- if err != nil {
- sklog.Infof("Error querying traceIds: %v", err)
- return
- }
- for rows.Next() {
- var traceIDAsBytes []byte
- if err := rows.Scan(&traceIDAsBytes); err != nil {
- sklog.Errorf("Failed to scan traceIDAsBytes: %s", skerr.Wrap(err))
- return
- }
- if err := rows.Err(); err != nil {
- if err == pgx.ErrNoRows {
- return
- }
- sklog.Errorf("Failed while reading traceIDAsBytes: %s", skerr.Wrap(err))
- return
- }
- ch <- traceIDForSQLFromTraceIDAsBytes(traceIDAsBytes)
- }
- }(ch, sql)
- i++
- }
-
- // Now AND together the results of all the unionChannels.
- traceIDsCh := newIntersect(ctx, unionChannels)
-
- // traceIDsCh supplies the relevant trace ids matching the query.
- // Now let's collect the unique traceIds from the channel and then get the params
- // for those traces.
- traceIdsMap := map[traceIDForSQL]bool{}
- uniqueTraceIds := []string{}
- for hexEncodedTraceID := range traceIDsCh {
- if _, ok := traceIdsMap[hexEncodedTraceID]; !ok {
- uniqueTraceIds = append(uniqueTraceIds, string(hexEncodedTraceID))
- }
- traceIdsMap[hexEncodedTraceID] = true
- }
- // Populate the outParams channel with the params for the traceIds.
- err = s.populateParamsForTraces(ctx, uniqueTraceIds, outParams)
- if err != nil {
- sklog.Errorf("Error converting traceIds to params: %v", err)
- return outParams, err
- }
-
+ s.inMemoryTraceParams.QueryTraceIDs(ctx, tileNumber, q, outParams)
return outParams, nil
}
-// populateParamsForTraces reads the params for the hex encoded traceIds and posts them on
-// the outParams channel.
-func (s *SQLTraceStore) populateParamsForTraces(ctx context.Context, traceIds []string, outParams chan paramtools.Params) error {
- // The goroutine below handles reading of the trace params in chunks.
- // We run this in a separate thread so that the upstream code that reads the outParams channel
- // can start reading this information without having to wait for all the traceIds to be completely read.
- go func() {
- ctx, span := trace.StartSpan(ctx, "sqltracestore.ReadTraceParams")
- defer span.End()
-
- // Close the channel when this goroutine completes signalling end of data.
- defer close(outParams)
-
- span.AddAttributes(trace.Int64Attribute("trace_count", int64(len(traceIds))))
- err := util.ChunkIterParallelPool(ctx, len(traceIds), queryTraceParamsChunkSize, poolSize, func(ctx context.Context, startIdx, endIdx int) error {
- traceIdChunk := traceIds[startIdx:endIdx]
- params, err := s.traceParamStore.ReadParams(ctx, traceIdChunk)
- if err != nil {
- sklog.Errorf("Error reading params:%v", err)
- return err
- }
-
- // Report the params for the current chunk.
- for _, param := range params {
- outParams <- param
- }
-
- return nil
- })
- if err != nil {
- sklog.Errorf("Error retrieving trace ids: %v", err)
- }
- }()
- return nil
-}
-
// ReadTraces implements the tracestore.TraceStore interface.
func (s *SQLTraceStore) ReadTraces(ctx context.Context, tileNumber types.TileNumber, traceNames []string) (types.TraceSet, []provider.Commit, error) {
ctx, span := trace.StartSpan(ctx, "sqltracestore.ReadTraces")
diff --git a/perf/go/tracestore/sqltracestore/sqltracestore_test.go b/perf/go/tracestore/sqltracestore/sqltracestore_test.go
index 2184c3c..f87c437 100644
--- a/perf/go/tracestore/sqltracestore/sqltracestore_test.go
+++ b/perf/go/tracestore/sqltracestore/sqltracestore_test.go
@@ -50,12 +50,15 @@
ctx := context.Background()
db := sqltest.NewSpannerDBForTests(t, "tracestore")
traceParamStore := NewTraceParamStore(db)
-
- store, err := New(db, cfg, traceParamStore, nil)
+ inMemoryTraceParams, err := NewInMemoryTraceParams(ctx, db, 12*60*60)
+ require.NoError(t, err)
+ store, err := New(db, cfg, traceParamStore, inMemoryTraceParams)
require.NoError(t, err)
if populateTraces {
populatedTestDB(t, ctx, store)
+ err := inMemoryTraceParams.Refresh(ctx)
+ require.NoError(t, err)
}
return ctx, store
@@ -67,10 +70,14 @@
require.NoError(t, err)
traceParamStore := NewTraceParamStore(db)
- store, err := New(db, cfg, traceParamStore, nil)
+ inMemoryTraceParams, err := NewInMemoryTraceParams(ctx, db, 12*60*60)
+ require.NoError(t, err)
+ store, err := New(db, cfg, traceParamStore, inMemoryTraceParams)
require.NoError(t, err)
populatedTestDB(t, ctx, store)
+ err = inMemoryTraceParams.Refresh(ctx)
+ require.NoError(t, err)
return ctx, store
}
@@ -292,17 +299,6 @@
return ret
}
-func TestQueryTracesIDOnly_EmptyTileReturnsEmptyParamset(t *testing.T) {
- ctx, s := commonTestSetup(t, true)
-
- // Query that matches one trace.
- q, err := query.NewFromString("config=565")
- assert.NoError(t, err)
- ch, err := s.QueryTracesIDOnly(ctx, 5, q)
- require.NoError(t, err)
- assert.Empty(t, paramSetFromParamsChan(ch))
-}
-
func TestQueryTracesIDOnly_MatchesOneTrace(t *testing.T) {
ctx, s := commonTestSetup(t, true)
@@ -318,24 +314,6 @@
assert.Equal(t, expected, paramSetFromParamsChan(ch))
}
-func TestQueryTracesIDOnly_QueryThatTriggersUserOfARestrictClause_Success(t *testing.T) {
- ctx, s := commonTestSetup(t, true)
- s.queryUsesRestrictClause.Reset()
-
- // "config=565" Matches one trace. "arch=x86" Matches two traces. So the
- // query will use a restrict clause to speed up the query.
- q, err := query.NewFromString("arch=x86&config=565")
- require.NoError(t, err)
- ch, err := s.QueryTracesIDOnly(ctx, 0, q)
- require.NoError(t, err)
- expected := paramtools.ParamSet{
- "arch": []string{"x86"},
- "config": []string{"565"},
- }
- assert.Equal(t, expected, paramSetFromParamsChan(ch))
- assert.Equal(t, int64(1), s.queryUsesRestrictClause.Get())
-}
-
func TestQueryTracesIDOnly_MatchesTwoTraces(t *testing.T) {
ctx, s := commonTestSetup(t, true)