blob: ab8a7144a45343c6f1b81a7b106fb362fc5e9819 [file] [log] [blame]
// Package pivot provides the ability to pivot dataframes.
//
// That is, given a set of traces:
//
// types.TraceSet{
// ",arch=arm,config=8888,": types.Trace{1, 0, 0},
// ",arch=arm,config=565,": types.Trace{0, 2, 0},
// ",arch=arm,config=gles,": types.Trace{0, 0, 3},
// ",arch=intel,config=8888,": types.Trace{1, 2, 3},
// ",arch=intel,config=565,": types.Trace{1, 2, 3},
// ",arch=intel,config=gles,": types.Trace{1, 2, 3},
// }
//
// You may want to compare how 'arm' machines compare to 'intel' machines. If
// the traces were stored in a spreadsheet then answering that question would
// require a pivot table, where you would pivot over the 'arch' key. This is
// also similar to a GROUP BY operation in SQL, and for such queries you also
// need to supply the type of operation to apply to all the values that appear
// in each group.
//
// So if we created a pivot Request of the form:
//
// req := Request {
// GroupBy: []string{"arch"},
// Operation: Sum,
// }
//
// it would pivot those traces and return summary traces:
//
// types.TraceSet{
// ",arch=arm,": types.Trace{1, 2, 3},
// ",arch=intel,": types.Trace{3, 6, 9},
// }
//
// Note how the trace ids only contain keys that appear in the GroupBy list, as
// these new traces represent each group.
//
// The above set of generated traces could be plotted. But we may want to
// summarize the data further into a table, so we can optionally apply Summary
// operations that will be applied to the resulting traces:
//
// req := Request {
// GroupBy: []string{"arch"},
// Operation: Sum,
// Summary: []Operation{Avg},
// }
//
// Applied to the same traces above we now get:
//
// types.TraceSet{
// ",arch=arm,": types.Trace{2}, // (1+2+3)/3
// ",arch=intel,": types.Trace{6}, // (3+6+9)/3
// }
//
// Note that muliple Summary operations can be applied, and each one will
// generate its own column in the resulting TraceSet.
package pivot
import (
"context"
"go.skia.org/infra/go/calc"
"go.skia.org/infra/go/paramtools"
"go.skia.org/infra/go/query"
"go.skia.org/infra/go/skerr"
"go.skia.org/infra/go/vec32"
"go.skia.org/infra/perf/go/dataframe"
"go.skia.org/infra/perf/go/types"
)
// Operation that can be applied to pivot values.
type Operation string
// Operation constants.
const (
Sum Operation = "sum"
Avg Operation = "avg"
Geo Operation = "geo"
Std Operation = "std"
Count Operation = "count"
Min Operation = "min"
Max Operation = "max"
)
// AllOperations for exporting to TypeScript.
var AllOperations = []Operation{Sum, Avg, Geo, Std, Count, Min, Max}
// Request controls how a pivot is done.
type Request struct {
// Which keys to group by.
GroupBy []string `json:"group_by"`
// Operation to apply when grouping.
Operation Operation `json:"operation"`
// If Summary is the empty slice then the Summary is commits, i.e. a plot.
// otherwise produce one column for each Operation in Summary.
Summary []Operation `json:"summary"`
}
type groupByOperation func(types.TraceSet) types.Trace
type summaryOperation func([]float32) float32
// For each type of operation store both the group by and the summary
// operation functions.
type operationFunctions struct {
groupByOperation groupByOperation
summaryOperation summaryOperation
}
func stdDev(a []float32) float32 {
_, stddev, err := vec32.MeanAndStdDev(a)
if err != nil {
return vec32.MissingDataSentinel
}
return stddev
}
// opMap contains all the known operation implementations for both GroupBy and
// Summary operations. Keeping it in a table like this ensures that we always
// have both groupBy and summary functions available.
var opMap map[Operation]operationFunctions = map[Operation]operationFunctions{
Sum: {
groupByOperation: calc.SumFuncImpl,
summaryOperation: vec32.SumE,
},
Avg: {
groupByOperation: calc.AveFuncImpl,
summaryOperation: vec32.MeanE,
},
Geo: {
groupByOperation: calc.GeoFuncImpl,
summaryOperation: vec32.GeoE,
},
Std: {
groupByOperation: calc.StdDevFuncImpl,
summaryOperation: stdDev,
},
Count: {
groupByOperation: calc.CountFuncImpl,
summaryOperation: vec32.Count,
},
Min: {
groupByOperation: calc.MinFuncImpl,
summaryOperation: vec32.Min,
},
Max: {
groupByOperation: calc.MaxFuncImpl,
summaryOperation: vec32.Max,
},
}
// Valid returns an error if the Request is not valid.
func (o Request) Valid() error {
if len(o.GroupBy) == 0 {
return skerr.Fmt("at least one GroupBy value must be supplied.")
}
valid := false
for _, op := range AllOperations {
if op == o.Operation {
valid = true
break
}
}
if !valid {
return skerr.Fmt("invalid Operation value: %q", o.Operation)
}
valid = false
for _, incomingOp := range o.Summary {
for _, op := range AllOperations {
if op == incomingOp {
valid = true
break
}
}
if !valid {
return skerr.Fmt("invalid Summary value: %q", incomingOp)
}
}
return nil
}
// Returns nil if a groupBy key is missing from fullKey.
func groupKeyFromTraceKey(fullKeyAsParam paramtools.Params, groupBy []string) string {
ret := paramtools.Params{}
for _, group := range groupBy {
value, ok := fullKeyAsParam[group]
if !ok {
return ""
}
ret[group] = value
}
key, err := query.MakeKeyFast(ret)
if err != nil {
return ""
}
return key
}
// Pivot returns a new Dataframe with the pivot described in Request applied.
func Pivot(ctx context.Context, req Request, df *dataframe.DataFrame) (*dataframe.DataFrame, error) {
if err := req.Valid(); err != nil {
return nil, skerr.Wrap(err)
}
ret := dataframe.NewEmpty()
// Pre-populate groupedTraceSets with empty types.TraceSet{}s.
groupedTraceSets := map[string]types.TraceSet{}
cpCh, err := df.ParamSet.CartesianProduct(req.GroupBy)
if err != nil {
return nil, skerr.Wrap(err)
}
for p := range cpCh {
groupID, err := query.MakeKeyFast(p)
if err != nil {
continue
}
groupedTraceSets[groupID] = types.TraceSet{}
}
// Loop over all members of TraceSet and put them into groups.
for traceID, trace := range df.TraceSet {
p, err := query.ParseKeyFast(traceID)
if err != nil {
continue
}
groupKey := groupKeyFromTraceKey(p, req.GroupBy)
// If the trace doesn't fit in any group then ignore it.
if groupKey == "" {
continue
}
groupedTraceSets[groupKey][traceID] = trace
}
// Do the GroupBy Operation.
for groupID, traces := range groupedTraceSets {
if len(traces) == 0 {
continue
}
ret.TraceSet[groupID] = opMap[req.Operation].groupByOperation(traces)
if ctx.Err() != nil {
return nil, skerr.Wrap(ctx.Err())
}
}
ret.BuildParamSet()
// Return now if there aren't any Summary operations.
if len(req.Summary) == 0 {
// Use the original Header from the DataFrame.
ret.Header = df.Header
return ret, nil
}
// Make summary columns.
for groupKey, trace := range ret.TraceSet {
summaryValues := make(types.Trace, len(req.Summary))
for i, op := range req.Summary {
summaryValues[i] = opMap[op].summaryOperation(trace)
}
ret.TraceSet[groupKey] = summaryValues
if ctx.Err() != nil {
return nil, skerr.Wrap(ctx.Err())
}
}
// Adjust Header to match the Summary columns.
ret.Header = make([]*dataframe.ColumnHeader, len(req.Summary))
for i := 0; i < len(req.Summary); i++ {
ret.Header[i] = &dataframe.ColumnHeader{
Offset: types.CommitNumber(i),
}
}
return ret, nil
}