blob: 51f66a6a28a5d2cf84864145ae95b735cd6c8ffe [file] [log] [blame]
// Package dfloader implements results.Loader using a DataFrameBuilder.
package dfloader
import (
"context"
"fmt"
"sort"
"time"
"go.opencensus.io/trace"
"go.skia.org/infra/go/paramtools"
"go.skia.org/infra/go/query"
"go.skia.org/infra/go/skerr"
"go.skia.org/infra/go/sklog"
"go.skia.org/infra/go/vec32"
"go.skia.org/infra/perf/go/dataframe"
perfgit "go.skia.org/infra/perf/go/git"
"go.skia.org/infra/perf/go/progress"
"go.skia.org/infra/perf/go/trybot/results"
"go.skia.org/infra/perf/go/trybot/store"
"go.skia.org/infra/perf/go/types"
)
// TraceHistorySize is the number of points we load for each trace.
const TraceHistorySize = 20
// ErrQueryMustNotBeEmpty is returned if an empty query is passed in the TryBotRequest.
var ErrQueryMustNotBeEmpty = fmt.Errorf("Query must not be empty.")
// Loader implements results.Loader.
type Loader struct {
dfb dataframe.DataFrameBuilder
store store.TryBotStore
git perfgit.Git
}
// New returns a new Loader instance.
func New(dfb dataframe.DataFrameBuilder, store store.TryBotStore, git perfgit.Git) Loader {
return Loader{
dfb: dfb,
store: store,
git: git,
}
}
// sortableTryBotResults sorts a slice of results.TryBotResult in StdDevRatio
// descending so we list regressions first and performance improvements last.
type sortableTryBotResults []results.TryBotResult
func (p sortableTryBotResults) Len() int { return len(p) }
func (p sortableTryBotResults) Less(i, j int) bool { return p[i].StdDevRatio > p[j].StdDevRatio }
func (p sortableTryBotResults) Swap(i, j int) { p[i], p[j] = p[j], p[i] }
// Load implements the results.Loader interface.
func (l Loader) Load(ctx context.Context, request results.TryBotRequest, progress progress.Progress) (results.TryBotResponse, error) {
ctx, span := trace.StartSpan(ctx, "dfloader.Load")
defer span.End()
timestamp := time.Now()
if request.Kind == results.Commit {
commit, err := l.git.CommitFromCommitNumber(ctx, request.CommitNumber)
if err != nil {
return results.TryBotResponse{}, skerr.Wrap(err)
}
timestamp = time.Unix(commit.Timestamp, 0)
}
q, err := query.NewFromString(request.Query)
if err != nil {
return results.TryBotResponse{}, skerr.Wrap(err)
}
if request.Kind == results.Commit && q.Empty() {
return results.TryBotResponse{}, ErrQueryMustNotBeEmpty
}
var df *dataframe.DataFrame
rebuildParamSet := false
// TODO(jcgregorio) What we really need for queries below is a new call into
// TraceStore that retrieves the last N commits along with their offsets
// ending at a given commit. Note that the N values can come from different
// commits from trace to trace. These should be run as individual LIMIT N
// queries across the TraceValues table.
if request.Kind == results.Commit {
// Always pull in TraceHistorySize+1 trace values. The TraceHistorySize
// represents the history of the trace, and the TraceHistorySize+1 point
// represents either the commit under inspection or a placeholder for the
// trybot value, which lets us avoid a second memory allocation, which we'd
// get if we had only queried for TraceHistorySize values.
df, err = l.dfb.NewNFromQuery(ctx, timestamp, q, TraceHistorySize+1, progress)
if err != nil {
return results.TryBotResponse{}, skerr.Wrap(err)
}
} else {
// Load the trybot results.
storeResults, err := l.store.Get(ctx, request.CL, request.PatchNumber)
if err != nil {
return results.TryBotResponse{}, skerr.Wrap(err)
}
traceNames := make([]string, 0, len(storeResults))
for _, results := range storeResults {
traceNames = append(traceNames, results.TraceName)
}
// Query for all traces that match up with the trybot results.
df, err = l.dfb.NewNFromKeys(ctx, timestamp, traceNames, TraceHistorySize+1, progress)
if err != nil {
return results.TryBotResponse{}, skerr.Wrap(err)
}
// Replace the last value in each trace with the trybot result.
for _, results := range storeResults {
values, ok := df.TraceSet[results.TraceName]
if !ok {
delete(df.TraceSet, results.TraceName)
// At this point the df.ParamSet is no longer valid and we should rebuild it.
rebuildParamSet = true
continue
}
values[len(values)-1] = results.Value
}
}
ret := results.TryBotResponse{}
ret.Header = df.Header
if request.Kind == results.TryBot && len(ret.Header) > 0 {
ret.Header[len(ret.Header)-1].Offset = types.BadCommitNumber
}
ret.ParamSet = df.ParamSet
res := make([]results.TryBotResult, 0, len(df.TraceSet))
// Loop over all the traces and parse the key into params and pass the
// values to vec32.StdDevRatio.
for traceName, values := range df.TraceSet {
params, err := query.ParseKey(traceName)
if err != nil {
sklog.Errorf("Failed to parse %q: %s", traceName, err)
rebuildParamSet = true
continue
}
stddevRatio, median, lower, upper, err := vec32.StdDevRatio(values)
if err != nil {
rebuildParamSet = true
continue
}
res = append(res, results.TryBotResult{
Params: params,
Median: median,
Lower: lower,
Upper: upper,
StdDevRatio: stddevRatio,
Values: values,
})
}
sort.Sort(sortableTryBotResults(res))
ret.Results = res
if rebuildParamSet {
ps := paramtools.NewParamSet()
for _, res := range ret.Results {
ps.AddParams(res.Params)
}
ps.Normalize()
ret.ParamSet = ps.Freeze()
}
return ret, nil
}
// Assert that we fulfill the interface.
var _ results.Loader = (*Loader)(nil)