| package dfbuilder |
| |
| import ( |
| "context" |
| "fmt" |
| "sync" |
| "time" |
| |
| "go.opencensus.io/trace" |
| "go.skia.org/infra/go/metrics2" |
| "go.skia.org/infra/go/paramtools" |
| "go.skia.org/infra/go/query" |
| "go.skia.org/infra/go/skerr" |
| "go.skia.org/infra/go/sklog" |
| "go.skia.org/infra/go/timer" |
| "go.skia.org/infra/go/vec32" |
| "go.skia.org/infra/perf/go/dataframe" |
| perfgit "go.skia.org/infra/perf/go/git" |
| "go.skia.org/infra/perf/go/progress" |
| "go.skia.org/infra/perf/go/tracefilter" |
| "go.skia.org/infra/perf/go/tracesetbuilder" |
| "go.skia.org/infra/perf/go/tracestore" |
| "go.skia.org/infra/perf/go/types" |
| "golang.org/x/sync/errgroup" |
| ) |
| |
// Filtering selects whether parent traces are removed from the results
// produced by the builder: true means filter them out, false means keep them.
type Filtering bool
| |
| const ( |
| // newNMaxSearch is the minimum number of queries to perform that returned |
| // no data before giving up. |
| // TODO(jcgregorio) Make this either a flag or config value. |
| newNMaxSearch = 40 |
| |
| // It is possible for some ParamSet tiles to have "bad" data, for example, a |
| // tremendous amount of garbage data from a bad ingestion process. This |
| // timeout guards against that eventuality by limiting how long Queries get |
| // to run on each tile. |
| // |
| // Note that this value is much shorter than the default timeout for all SQL |
| // requests because during regression detection we can issue many Queries and |
| // as each one hits that bad ParamSet tile we can clog the backend with these |
| // long running queries. |
| singleTileQueryTimeout = time.Minute |
| |
| // Filter parent traces |
| doFilterParentTraces Filtering = true |
| |
| // Do not filter parent traces |
| doNotFilterParentTraces Filtering = false |
| ) |
| |
| // builder implements DataFrameBuilder using TraceStore. |
| type builder struct { |
| git perfgit.Git |
| store tracestore.TraceStore |
| tileSize int32 |
| numPreflightTiles int |
| filterParentTraces Filtering |
| mux *sync.Mutex |
| |
| newTimer metrics2.Float64SummaryMetric |
| newByTileTimer metrics2.Float64SummaryMetric |
| newFromQueryAndRangeTimer metrics2.Float64SummaryMetric |
| newFromKeysAndRangeTimer metrics2.Float64SummaryMetric |
| newFromCommitIDsAndQueryTimer metrics2.Float64SummaryMetric |
| newNFromQueryTimer metrics2.Float64SummaryMetric |
| newNFromKeysTimer metrics2.Float64SummaryMetric |
| preflightQueryTimer metrics2.Float64SummaryMetric |
| } |
| |
| // NewDataFrameBuilderFromTraceStore builds a DataFrameBuilder. |
| func NewDataFrameBuilderFromTraceStore(git perfgit.Git, store tracestore.TraceStore, numPreflightTiles int, filterParentTraces Filtering) dataframe.DataFrameBuilder { |
| return &builder{ |
| git: git, |
| store: store, |
| numPreflightTiles: numPreflightTiles, |
| tileSize: store.TileSize(), |
| filterParentTraces: filterParentTraces, |
| mux: &sync.Mutex{}, |
| newTimer: metrics2.GetFloat64SummaryMetric("perfserver_dfbuilder_new"), |
| newByTileTimer: metrics2.GetFloat64SummaryMetric("perfserver_dfbuilder_newByTile"), |
| newFromQueryAndRangeTimer: metrics2.GetFloat64SummaryMetric("perfserver_dfbuilder_newFromQueryAndRange"), |
| newFromKeysAndRangeTimer: metrics2.GetFloat64SummaryMetric("perfserver_dfbuilder_newFromKeysAndRange"), |
| newFromCommitIDsAndQueryTimer: metrics2.GetFloat64SummaryMetric("perfserver_dfbuilder_newFromCommitIDsAndQuery"), |
| newNFromQueryTimer: metrics2.GetFloat64SummaryMetric("perfserver_dfbuilder_newNFromQuery"), |
| newNFromKeysTimer: metrics2.GetFloat64SummaryMetric("perfserver_dfbuilder_newNFromKeys"), |
| preflightQueryTimer: metrics2.GetFloat64SummaryMetric("perfserver_dfbuilder_preflightQuery"), |
| } |
| } |
| |
| // fromIndexRange returns the headers and indices for all the commits |
| // between beginIndex and endIndex inclusive. |
| func fromIndexRange(ctx context.Context, git perfgit.Git, beginIndex, endIndex types.CommitNumber) ([]*dataframe.ColumnHeader, []types.CommitNumber, int, error) { |
| ctx, span := trace.StartSpan(ctx, "dfbuilder.fromIndexRange") |
| defer span.End() |
| |
| commits, err := git.CommitSliceFromCommitNumberRange(ctx, beginIndex, endIndex) |
| if err != nil { |
| return nil, nil, 0, skerr.Wrapf(err, "Failed to get headers and commit numbers from time range.") |
| } |
| colHeader := make([]*dataframe.ColumnHeader, len(commits), len(commits)) |
| commitNumbers := make([]types.CommitNumber, len(commits), len(commits)) |
| for i, commit := range commits { |
| colHeader[i] = &dataframe.ColumnHeader{ |
| Offset: commit.CommitNumber, |
| Timestamp: dataframe.TimestampSeconds(commit.Timestamp), |
| } |
| commitNumbers[i] = commit.CommitNumber |
| } |
| return colHeader, commitNumbers, 0, nil |
| } |
| |
| // tileMapOffsetToIndex maps the offset of each point in a tile to the index it |
| // should appear in the resulting Trace. |
| type tileMapOffsetToIndex map[types.TileNumber]map[int32]int32 |
| |
| // sliceOfTileNumbersFromCommits returns a slice of types.TileNumber that contains every Tile that needs |
| // to be queried based on the all the commits in indices. |
| func sliceOfTileNumbersFromCommits(indices []types.CommitNumber, store tracestore.TraceStore) []types.TileNumber { |
| ret := []types.TileNumber{} |
| if len(indices) == 0 { |
| return ret |
| } |
| begin := store.TileNumber(indices[0]) |
| end := store.TileNumber(indices[len(indices)-1]) |
| for i := begin; i <= end; i++ { |
| ret = append(ret, i) |
| } |
| return ret |
| } |
| |
| // new builds a DataFrame for the given columns and populates it with traces that match the given query. |
| // |
| // The progress callback is triggered once for every tile. |
| func (b *builder) new(ctx context.Context, colHeaders []*dataframe.ColumnHeader, indices []types.CommitNumber, q *query.Query, progress progress.Progress, skip int) (*dataframe.DataFrame, error) { |
| ctx, span := trace.StartSpan(ctx, "dfbuilder.new") |
| defer span.End() |
| |
| // TODO tickle progress as each Go routine completes. |
| defer timer.NewWithSummary("perfserver_dfbuilder_new", b.newTimer).Stop() |
| // Determine which tiles we are querying over, and how each tile maps into our results. |
| mapper := sliceOfTileNumbersFromCommits(indices, b.store) |
| |
| commitNumberToOutputIndex := map[types.CommitNumber]int32{} |
| for i, c := range indices { |
| commitNumberToOutputIndex[c] = int32(i) |
| } |
| |
| traceSetBuilder := tracesetbuilder.New(len(indices)) |
| defer traceSetBuilder.Close() |
| |
| var mutex sync.Mutex // mutex protects tilesCompleted. |
| tilesCompleted := 0 |
| triggerProgress := func() { |
| mutex.Lock() |
| defer mutex.Unlock() |
| tilesCompleted++ |
| progress.Message("Tiles", fmt.Sprintf("%d/%d", tilesCompleted, len(mapper))) |
| } |
| |
| var g errgroup.Group |
| // For each tile. |
| for _, tileNumber := range mapper { |
| tileNumber := tileNumber |
| // TODO(jcgregorio) If we query across a large number of tiles N then this will spawn N*8 Go routines |
| // all hitting the backend at the same time. Maybe we need a worker pool if this becomes a problem. |
| g.Go(func() error { |
| defer timer.NewWithSummary("perfserver_dfbuilder_new_by_tile", b.newByTileTimer).Stop() |
| |
| // Query for matching traces in the given tile. |
| queryContext, cancel := context.WithTimeout(ctx, singleTileQueryTimeout) |
| defer cancel() |
| traces, commits, err := b.store.QueryTraces(queryContext, tileNumber, q) |
| if err != nil { |
| return err |
| } |
| |
| traceSetBuilder.Add(commitNumberToOutputIndex, commits, traces) |
| triggerProgress() |
| return nil |
| }) |
| } |
| if err := g.Wait(); err != nil { |
| span.SetStatus(trace.Status{ |
| Code: trace.StatusCodeInternal, |
| Message: err.Error(), |
| }) |
| |
| return nil, fmt.Errorf("Failed while querying: %s", err) |
| } |
| traceSet, paramSet := traceSetBuilder.Build(ctx) |
| d := &dataframe.DataFrame{ |
| TraceSet: traceSet, |
| Header: colHeaders, |
| ParamSet: paramSet, |
| Skip: skip, |
| } |
| return d.Compress(), nil |
| } |
| |
| // See DataFrameBuilder. |
| func (b *builder) NewFromQueryAndRange(ctx context.Context, begin, end time.Time, q *query.Query, downsample bool, progress progress.Progress) (*dataframe.DataFrame, error) { |
| ctx, span := trace.StartSpan(ctx, "dfbuilder.NewFromQueryAndRange") |
| defer span.End() |
| |
| defer timer.NewWithSummary("perfserver_dfbuilder_NewFromQueryAndRange", b.newFromQueryAndRangeTimer).Stop() |
| |
| colHeaders, indices, skip, err := dataframe.FromTimeRange(ctx, b.git, begin, end, downsample) |
| if err != nil { |
| return nil, skerr.Wrap(err) |
| } |
| return b.new(ctx, colHeaders, indices, q, progress, skip) |
| } |
| |
| // See DataFrameBuilder. |
| func (b *builder) NewFromKeysAndRange(ctx context.Context, keys []string, begin, end time.Time, downsample bool, progress progress.Progress) (*dataframe.DataFrame, error) { |
| ctx, span := trace.StartSpan(ctx, "dfbuilder.NewFromKeysAndRange") |
| defer span.End() |
| |
| // TODO tickle progress as each Go routine completes. |
| defer timer.NewWithSummary("perfserver_dfbuilder_NewFromKeysAndRange", b.newFromKeysAndRangeTimer).Stop() |
| colHeaders, indices, skip, err := dataframe.FromTimeRange(ctx, b.git, begin, end, downsample) |
| if err != nil { |
| return nil, skerr.Wrap(err) |
| } |
| |
| // Determine which tiles we are querying over, and how each tile maps into our results. |
| // In this case we don't need this, instead a mapping from CommitNumber to Index would be easier. |
| mapper := sliceOfTileNumbersFromCommits(indices, b.store) |
| |
| commitNumberToOutputIndex := map[types.CommitNumber]int32{} |
| for i, c := range indices { |
| commitNumberToOutputIndex[c] = int32(i) |
| } |
| |
| var mutex sync.Mutex // mutex protects traceSet and paramSet. |
| traceSet := types.TraceSet{} |
| paramSet := paramtools.ParamSet{} |
| stepsCompleted := 0 |
| // triggerProgress must only be called when the caller has mutex locked. |
| triggerProgress := func() { |
| stepsCompleted += 1 |
| progress.Message("Tiles", fmt.Sprintf("%d/%d", stepsCompleted, len(mapper))) |
| } |
| |
| var g errgroup.Group |
| // For each tile. |
| for _, tileNumber := range mapper { |
| tileKey := tileNumber |
| // traceMap := traceMap |
| g.Go(func() error { |
| // Read the traces for the given keys. |
| traces, commits, err := b.store.ReadTraces(ctx, tileKey, keys) |
| if err != nil { |
| return err |
| } |
| mutex.Lock() |
| defer mutex.Unlock() |
| // For each trace, convert the encodedKey to a structured key |
| // and copy the trace values into their final destination. |
| for key, tileTrace := range traces { |
| trace, ok := traceSet[key] |
| if !ok { |
| trace = types.NewTrace(len(indices)) |
| } |
| for i, c := range commits { |
| // dstIndex := traceMap[b.store.OffsetFromCommitNumber(c.CommitNumber)] |
| dstIndex := commitNumberToOutputIndex[c.CommitNumber] |
| trace[dstIndex] = tileTrace[i] |
| } |
| /* |
| for srcIndex, dstIndex := range traceMap { |
| trace[dstIndex] = tileTrace[srcIndex] |
| } |
| */ |
| traceSet[key] = trace |
| p, err := query.ParseKey(key) |
| if err != nil { |
| continue |
| } |
| paramSet.AddParams(p) |
| } |
| return nil |
| }) |
| } |
| if err := g.Wait(); err != nil { |
| return nil, fmt.Errorf("Failed while querying: %s", err) |
| } |
| d := &dataframe.DataFrame{ |
| TraceSet: traceSet, |
| Header: colHeaders, |
| ParamSet: paramSet.Freeze(), |
| Skip: skip, |
| } |
| triggerProgress() |
| return d.Compress(), nil |
| } |
| |
| // findIndexForTime finds the index of the closest commit <= 'end'. |
| // |
| // Pass in zero time, i.e. time.Time{} to indicate to just get the most recent commit. |
| func (b *builder) findIndexForTime(ctx context.Context, end time.Time) (types.CommitNumber, error) { |
| ctx, span := trace.StartSpan(ctx, "dfbuilder.findIndexForTime") |
| defer span.End() |
| |
| return b.git.CommitNumberFromTime(ctx, end) |
| } |
| |
| // See DataFrameBuilder. |
| func (b *builder) NewNFromQuery(ctx context.Context, end time.Time, q *query.Query, n int32, progress progress.Progress) (*dataframe.DataFrame, error) { |
| ctx, span := trace.StartSpan(ctx, "dfbuilder.NewNFromQuery") |
| defer span.End() |
| |
| defer timer.NewWithSummary("perfserver_dfbuilder_NewNFromQuery", b.newNFromQueryTimer).Stop() |
| |
| sklog.Infof("Querying to: %v", end) |
| |
| ret := dataframe.NewEmpty() |
| ps := paramtools.NewParamSet() |
| var total int32 // total number of commits we've added to ret so far. |
| steps := 1 // Total number of times we've gone through the loop below, used in the progress() callback. |
| numStepsNoData := 0 |
| |
| endIndex, err := b.findIndexForTime(ctx, end) |
| if err != nil { |
| return nil, fmt.Errorf("Failed to find end index: %s", err) |
| } |
| |
| // beginIndex is the index of the first commit in the tile that endIndex is |
| // in. We are OK if beginIndex == endIndex because fromIndexRange returns |
| // headers from begin to end *inclusive*. |
| beginIndex := b.store.CommitNumberOfTileStart(endIndex) |
| |
| sklog.Infof("BeginIndex: %d EndIndex: %d", beginIndex, endIndex) |
| for total < n { |
| // Query for traces. |
| headers, indices, skip, err := fromIndexRange(ctx, b.git, beginIndex, endIndex) |
| if err != nil { |
| return nil, fmt.Errorf("Failed building index range: %s", err) |
| } |
| |
| df, err := b.new(ctx, headers, indices, q, progress, skip) |
| if err != nil { |
| return nil, fmt.Errorf("Failed while querying: %s", err) |
| } |
| |
| nonMissing := 0 |
| // Total up the number of data points we have for each commit. |
| counts := make([]int, len(df.Header)) |
| for _, tr := range df.TraceSet { |
| for i, x := range tr { |
| if x != vec32.MissingDataSentinel { |
| counts[i] += 1 |
| nonMissing += 1 |
| } |
| } |
| } |
| // If there are no matches then we might be done. |
| if nonMissing == 0 { |
| numStepsNoData += 1 |
| } |
| if numStepsNoData > newNMaxSearch { |
| sklog.Infof("Failed querying: %s", q) |
| break |
| } |
| |
| ps.AddParamSet(df.ParamSet) |
| |
| // For each commit that has data, copy the data from df into ret. |
| // Move backwards down the trace since we are building the result from 'end' backwards. |
| for i := len(counts) - 1; i >= 0; i-- { |
| if counts[i] > 0 { |
| ret.Header = append([]*dataframe.ColumnHeader{df.Header[i]}, ret.Header...) |
| for key, sourceTrace := range df.TraceSet { |
| if _, ok := ret.TraceSet[key]; !ok { |
| ret.TraceSet[key] = vec32.New(int(n)) |
| } |
| ret.TraceSet[key][n-1-total] = sourceTrace[i] |
| } |
| total += 1 |
| |
| // If we've added enough commits to ret then we are done. |
| if total == n { |
| break |
| } |
| } |
| } |
| sklog.Infof("Total: %d Steps: %d NumStepsNoData: %d Query: %v", total, steps, numStepsNoData, q.String()) |
| |
| if total == n { |
| break |
| } |
| |
| progress.Message("Tiles", fmt.Sprintf("Tiles searched: %d. Found %d/%d points.", steps, total, n)) |
| steps += 1 |
| |
| // Now step back a full tile. |
| |
| // At this point we know beginIndex points to the 0th column in a tile, |
| // so endIndex is easy to calculate. |
| endIndex = beginIndex - 1 |
| if endIndex < 0 { |
| break |
| } |
| beginIndex = b.store.CommitNumberOfTileStart(endIndex) |
| if beginIndex < 0 { |
| beginIndex = 0 |
| } |
| } |
| ps.Normalize() |
| ret.ParamSet = ps.Freeze() |
| if b.filterParentTraces == doFilterParentTraces { |
| ret.TraceSet = filterParentTraces(ret.TraceSet) |
| } |
| |
| if trimIndex := n - total; trimIndex > 0 { |
| // Trim down the traces so they are the same length as ret.Header. |
| for key, tr := range ret.TraceSet { |
| if len(tr) > int(trimIndex) { |
| ret.TraceSet[key] = tr[trimIndex:] |
| } |
| } |
| } |
| |
| return ret, nil |
| } |
| |
| // See DataFrameBuilder. |
| func (b *builder) NewNFromKeys(ctx context.Context, end time.Time, keys []string, n int32, progress progress.Progress) (*dataframe.DataFrame, error) { |
| ctx, span := trace.StartSpan(ctx, "dfbuilder.NewNFromKeys") |
| defer span.End() |
| |
| defer timer.NewWithSummary("perfserver_dfbuilder_NewNFromKeys", b.newNFromKeysTimer).Stop() |
| |
| endIndex, err := b.findIndexForTime(ctx, end) |
| if err != nil { |
| return nil, fmt.Errorf("Failed to find end index: %s", err) |
| } |
| beginIndex := endIndex.Add(-(b.tileSize - 1)) |
| if beginIndex < 0 { |
| beginIndex = 0 |
| } |
| |
| ret := dataframe.NewEmpty() |
| ps := paramtools.NewParamSet() |
| var total int32 // total number of commits we've added to ret so far. |
| steps := 1 // Total number of times we've gone through the loop below, used in the progress() callback. |
| numStepsNoData := 0 |
| |
| for total < n { |
| headers, indices, skip, err := fromIndexRange(ctx, b.git, beginIndex, endIndex) |
| if err != nil { |
| return nil, fmt.Errorf("Failed building index range: %s", err) |
| } |
| |
| // Determine which tiles we are querying over, and how each tile maps into our results. |
| mapper := sliceOfTileNumbersFromCommits(indices, b.store) |
| |
| commitNumberToOutputIndex := map[types.CommitNumber]int32{} |
| for i, c := range indices { |
| commitNumberToOutputIndex[c] = int32(i) |
| } |
| |
| traceSet := types.TraceSet{} |
| for _, tileNumber := range mapper { |
| // Read the traces for the given keys. |
| traces, commits, err := b.store.ReadTraces(ctx, tileNumber, keys) |
| if err != nil { |
| return nil, err |
| } |
| // For each trace, convert the encodedKey to a structured key |
| // and copy the trace values into their final destination. |
| for key, tileTrace := range traces { |
| trace, ok := traceSet[key] |
| if !ok { |
| trace = types.NewTrace(len(indices)) |
| } |
| for i, c := range commits { |
| // dstIndex := traceMap[b.store.OffsetFromCommitNumber(c.CommitNumber)] |
| dstIndex := commitNumberToOutputIndex[c.CommitNumber] |
| trace[dstIndex] = tileTrace[i] |
| } |
| /* |
| for srcIndex, dstIndex := range traceMap { |
| // What to we do with commits here? |
| trace[dstIndex] = tileTrace[srcIndex] |
| } |
| */ |
| traceSet[key] = trace |
| } |
| } |
| df := &dataframe.DataFrame{ |
| TraceSet: traceSet, |
| Header: headers, |
| ParamSet: paramtools.NewReadOnlyParamSet(), |
| Skip: skip, |
| } |
| df.BuildParamSet() |
| |
| nonMissing := 0 |
| // Total up the number of data points we have for each commit. |
| counts := make([]int, len(df.Header)) |
| for _, tr := range df.TraceSet { |
| for i, x := range tr { |
| if x != vec32.MissingDataSentinel { |
| counts[i] += 1 |
| nonMissing += 1 |
| } |
| } |
| } |
| // If there are no matches then we might be done. |
| if nonMissing == 0 { |
| numStepsNoData += 1 |
| } |
| if numStepsNoData > newNMaxSearch { |
| break |
| } |
| |
| ps.AddParamSet(df.ParamSet) |
| |
| // For each commit that has data, copy the data from df into ret. |
| // Move backwards down the trace since we are building the result from 'end' backwards. |
| for i := len(counts) - 1; i >= 0; i-- { |
| if counts[i] > 0 { |
| ret.Header = append([]*dataframe.ColumnHeader{df.Header[i]}, ret.Header...) |
| for key, sourceTrace := range df.TraceSet { |
| if _, ok := ret.TraceSet[key]; !ok { |
| ret.TraceSet[key] = vec32.New(int(n)) |
| } |
| ret.TraceSet[key][n-1-total] = sourceTrace[i] |
| } |
| total += 1 |
| |
| // If we've added enough commits to ret then we are done. |
| if total == n { |
| break |
| } |
| } |
| } |
| |
| sklog.Infof("Total: %d Steps: %d NumStepsNoData: %d", total, steps, numStepsNoData) |
| |
| if total == n { |
| break |
| } |
| |
| progress.Message("Tiles", fmt.Sprintf("Tiles searched: %d. Found %d/%d points.", steps, total, n)) |
| steps += 1 |
| |
| endIndex = endIndex.Add(-b.tileSize) |
| beginIndex = endIndex.Add(-b.tileSize) |
| if endIndex < 0 { |
| break |
| } |
| if beginIndex < 0 { |
| beginIndex = 0 |
| } |
| } |
| ps.Normalize() |
| ret.ParamSet = ps.Freeze() |
| if b.filterParentTraces { |
| ret.TraceSet = filterParentTraces(ret.TraceSet) |
| } |
| if total < n { |
| // Trim down the traces so they are the same length as ret.Header. |
| for key, tr := range ret.TraceSet { |
| ret.TraceSet[key] = tr[n-total:] |
| } |
| } |
| |
| return ret, nil |
| } |
| |
| // PreflightQuery implements dataframe.DataFrameBuilder. |
| func (b *builder) PreflightQuery(ctx context.Context, q *query.Query, referenceParamSet paramtools.ReadOnlyParamSet) (int64, paramtools.ParamSet, error) { |
| ctx, span := trace.StartSpan(ctx, "dfbuilder.PreflightQuery") |
| defer span.End() |
| |
| defer timer.NewWithSummary("perfserver_dfbuilder_PreflightQuery", b.preflightQueryTimer).Stop() |
| |
| var count int64 |
| |
| timeBeforeGetLatestTile := time.Now() |
| tileNumber, err := b.store.GetLatestTile(ctx) |
| if err != nil { |
| return -1, nil, err |
| } |
| |
| if q.Empty() { |
| return -1, nil, skerr.Fmt("Can not pre-flight an empty query") |
| } |
| duration := time.Now().Sub(timeBeforeGetLatestTile) |
| sklog.Debugf("Time spent to get latest tile is %d ms", int64(duration/time.Millisecond)) |
| |
| // Since the query isn't empty we'll have to run a partial query |
| // to build the ParamSet. Do so over the two most recent tiles. |
| ps := paramtools.NewParamSet() |
| |
| queryContext, cancel := context.WithTimeout(ctx, time.Duration(b.numPreflightTiles)*singleTileQueryTimeout) |
| defer cancel() |
| |
| timeBeforeQueryTraces := time.Now() |
| // Query traces in parallel to speed it up. |
| var wg sync.WaitGroup |
| doAddParams := func(p paramtools.Params) { |
| b.mux.Lock() |
| defer b.mux.Unlock() |
| ps.AddParams(p) |
| } |
| doUpdateCount := func(tileOneCount int64) { |
| b.mux.Lock() |
| defer b.mux.Unlock() |
| if tileOneCount > count { |
| count = tileOneCount |
| } |
| } |
| var queryTraceError error |
| for i := 0; i < b.numPreflightTiles; i++ { |
| wg.Add(1) |
| go func(iterateTileNumber types.TileNumber) { |
| defer wg.Done() |
| |
| // Count the matches and sum the params in the tile. |
| out, err := b.store.QueryTracesIDOnly(queryContext, iterateTileNumber, q) |
| if err != nil { |
| queryTraceError = err |
| sklog.Errorf("failed to query traces at tile %d with error: %s", iterateTileNumber, err) |
| return |
| } |
| var tileOneCount int64 |
| for p := range out { |
| tileOneCount++ |
| doAddParams(p) |
| } |
| doUpdateCount(tileOneCount) |
| }(tileNumber) |
| |
| // Now move to the previous tile. |
| tileNumber = tileNumber.Prev() |
| if tileNumber == types.BadTileNumber { |
| break |
| } |
| } |
| wg.Wait() |
| duration = time.Now().Sub(timeBeforeQueryTraces) |
| sklog.Debugf("Time spent to query traces is %d ms", int64(duration/time.Millisecond)) |
| if queryTraceError != nil { |
| return -1, nil, fmt.Errorf("failed to query traces: %s", queryTraceError) |
| } |
| |
| // Now we have the ParamSet that corresponds to the query, but for each |
| // key in the query we need to go back and put in all the values that |
| // appear for that key since the user can make more selections in that |
| // key. |
| timeBeforeQueryPlan := time.Now() |
| queryPlan, err := q.QueryPlan(ps.Freeze()) |
| duration = time.Now().Sub(timeBeforeQueryPlan) |
| sklog.Debugf("Time spent to query plan is %d ms", int64(duration/time.Millisecond)) |
| if err != nil { |
| return -1, nil, err |
| } |
| for key := range queryPlan { |
| ps[key] = referenceParamSet[key] |
| } |
| timeBeforeNormalize := time.Now() |
| ps.Normalize() |
| duration = time.Now().Sub(timeBeforeNormalize) |
| sklog.Debugf("Time spent to normalize param set is %d ms", int64(duration/time.Millisecond)) |
| |
| return count, ps, nil |
| } |
| |
| // NumMatches implements dataframe.DataFrameBuilder. |
| func (b *builder) NumMatches(ctx context.Context, q *query.Query) (int64, error) { |
| ctx, span := trace.StartSpan(ctx, "dfbuilder.NumMatches") |
| defer span.End() |
| defer timer.NewWithSummary("perfserver_dfbuilder_NumMatches", b.preflightQueryTimer).Stop() |
| |
| var count int64 |
| |
| tileNumber, err := b.store.GetLatestTile(ctx) |
| if err != nil { |
| return -1, skerr.Wrap(err) |
| } |
| |
| queryContext, cancel := context.WithTimeout(ctx, 2*singleTileQueryTimeout) |
| defer cancel() |
| |
| // Count the matches in the first tile. |
| out, err := b.store.QueryTracesIDOnly(queryContext, tileNumber, q) |
| if err != nil { |
| return -1, skerr.Wrapf(err, "Failed to query traces.") |
| } |
| var tileOneCount int64 |
| for range out { |
| tileOneCount++ |
| } |
| count = tileOneCount |
| |
| // Now move to the previous tile. |
| tileNumber = tileNumber.Prev() |
| if tileNumber != types.BadTileNumber { |
| // Count the matches in the second tile. |
| out, err = b.store.QueryTracesIDOnly(queryContext, tileNumber, q) |
| if err != nil { |
| return -1, fmt.Errorf("Failed to query traces: %s", err) |
| } |
| var tileTwoCount int64 |
| for range out { |
| tileTwoCount++ |
| } |
| // Use the larger of the two counts as our result. |
| if tileTwoCount > count { |
| count = tileTwoCount |
| } |
| } |
| |
| return count, nil |
| } |
| |
| // Filters out parent traces if there are child traces present. |
| // Eg: if there are two traces |
| // T1 = master=m1,bot=b1,test=t1,subtest_1=s1 |
| // T2 = master=m1,bot=b1,test=t1 |
| // we will filter out T2 and only keep T1 since T2 is a parent |
| // of T1 |
| func filterParentTraces(traceSet types.TraceSet) types.TraceSet { |
| traceFilter := tracefilter.NewTraceFilter() |
| paramSetKeys := []string{"master", "bot", "benchmark", "test", "subtest_1", "subtest_2", "subtest_3", "subtest_4", "subtest_5"} |
| for key := range traceSet { |
| params, err := query.ParseKey(key) |
| if err != nil { |
| sklog.Errorf("Error parsing key: %s", err) |
| } |
| |
| // Now lets get the path in the order of the keys |
| // specified in paramSetKeys |
| path := []string{} |
| for _, paramKey := range paramSetKeys { |
| paramValue, ok := params[paramKey] |
| if ok { |
| path = append(path, paramValue) |
| } |
| } |
| |
| traceFilter.AddPath(path, key) |
| } |
| |
| filteredKeys := traceFilter.GetLeafNodeTraceKeys() |
| filteredTraceSet := types.TraceSet{} |
| for _, filteredKey := range filteredKeys { |
| filteredTraceSet[filteredKey] = traceSet[filteredKey] |
| } |
| |
| sklog.Infof("Filtered trace set length: %d, original trace set length: %d", len(filteredTraceSet), len(traceSet)) |
| return filteredTraceSet |
| } |
| |
| // Validate that *builder faithfully implements the DataFrameBuidler interface. |
| var _ dataframe.DataFrameBuilder = (*builder)(nil) |