blob: 310feafa29087b81000d7e66c2b3c7389bd5b95c [file] [log] [blame]
// Package dataframe provides DataFrame which is a TraceSet with a calculated
// ParamSet and associated commit info.
package dataframe
import (
"fmt"
"sort"
"time"
"go.skia.org/infra/go/paramtools"
"go.skia.org/infra/go/query"
"go.skia.org/infra/go/timer"
"go.skia.org/infra/go/vcsinfo"
"go.skia.org/infra/perf/go/cid"
"go.skia.org/infra/perf/go/ptracestore"
)
const (
	// MAX_SAMPLE_SIZE is the commit limit handed to DownSample when a
	// DataFrame is built over a time range with downsampling enabled.
	MAX_SAMPLE_SIZE = 256

	// DEFAULT_NUM_COMMITS is the number of commits in the DataFrame returned
	// from New().
	DEFAULT_NUM_COMMITS = 50
)
// ColumnHeader describes each column in a DataFrame.
//
// Source and Offset identify the commit a column refers to. Columns built
// from repository history in this file use the source "master"; columns
// built from a commit lookup copy the looked-up source.
type ColumnHeader struct {
// Source names where the commit comes from, e.g. "master".
Source string `json:"source"`
// Offset is the index of the commit within its Source.
Offset int64 `json:"offset"`
Timestamp int64 `json:"timestamp"` // In seconds from the Unix epoch.
}
// DataFrame stores Perf measurements in a table where each row is a Trace
// indexed by a structured key (see go/query), and each column is described by
// a ColumnHeader, which could be a commit or a trybot patch level.
//
// Skip is the number of commits skipped when the commit range was
// downsampled to fit within MAX_SAMPLE_SIZE commits. If Skip is zero then no
// commits were skipped.
//
// The name DataFrame was gratuitously borrowed from R.
type DataFrame struct {
// TraceSet holds the rows of the table, keyed by structured trace key.
TraceSet ptracestore.TraceSet `json:"traceset"`
// Header describes the columns of the table.
// NOTE(review): presumably Header[i] describes the i-th value of every
// trace in TraceSet (both are built from the same commit list) — confirm.
Header []*ColumnHeader `json:"header"`
// ParamSet summarizes the keys of TraceSet; BuildParamSet recomputes it.
ParamSet paramtools.ParamSet `json:"paramset"`
// Skip is the number of commits dropped by downsampling (0 if none).
Skip int `json:"skip"`
}
// BuildParamSet recomputes d.ParamSet from scratch using the keys of
// d.TraceSet, replacing whatever ParamSet was there before. The list of
// values recorded for each parameter is sorted.
func (d *DataFrame) BuildParamSet() {
	ps := paramtools.ParamSet{}
	for traceKey := range d.TraceSet {
		ps.AddParamsFromKey(traceKey)
	}
	for name := range ps {
		sort.Strings(ps[name])
	}
	d.ParamSet = ps
}
// TraceFilter reports whether trace 'tr' should be removed from a DataFrame.
// FilterOut deletes every trace for which the filter returns true and keeps
// the rest.
type TraceFilter func(tr ptracestore.Trace) bool
// FilterOut deletes from d.TraceSet every trace for which 'f' returns true.
//
// Once filtering is done, d.ParamSet is rebuilt so that it reflects only the
// traces that survived.
func (d *DataFrame) FilterOut(f TraceFilter) {
	// Removing the current key while ranging over a map is permitted by the
	// Go spec; the loop only ever deletes the entry it is visiting.
	for key := range d.TraceSet {
		if !f(d.TraceSet[key]) {
			continue
		}
		delete(d.TraceSet, key)
	}
	d.BuildParamSet()
}
// rangeImpl converts the given vcsinfo.IndexCommits into the two parallel
// slices needed by DataFrame (ColumnHeaders) and ptracestore.PTraceStore
// (cid.CommitIDs): element i of each slice describes resp[i]. Every entry is
// attributed to the "master" source.
//
// The value for 'skip', the number of commits skipped, is passed through
// unchanged as the third return value.
//
// The returned slices are never nil, even for an empty 'resp', so a Header
// built from them JSON-encodes as [] rather than null.
func rangeImpl(resp []*vcsinfo.IndexCommit, skip int) ([]*ColumnHeader, []*cid.CommitID, int) {
	const masterSource = "master"
	// Each slice gets exactly one entry per commit, so allocate once up front
	// instead of growing through repeated appends.
	headers := make([]*ColumnHeader, 0, len(resp))
	commits := make([]*cid.CommitID, 0, len(resp))
	for _, r := range resp {
		commits = append(commits, &cid.CommitID{
			Offset: r.Index,
			Source: masterSource,
		})
		headers = append(headers, &ColumnHeader{
			Source:    masterSource,
			Offset:    int64(r.Index),
			Timestamp: r.Timestamp.Unix(),
		})
	}
	return headers, commits, skip
}
// lastN builds the ColumnHeader and cid.CommitID slices (used by DataFrame
// and ptracestore.PTraceStore respectively) for the n commits returned by
// vcs.LastNIndex.
//
// Nothing is skipped here, so the returned 'skip' is always 0.
func lastN(vcs vcsinfo.VCS, n int) ([]*ColumnHeader, []*cid.CommitID, int) {
	const noSkip = 0
	recent := vcs.LastNIndex(n)
	return rangeImpl(recent, noSkip)
}
// getRange returns the slices of ColumnHeader and cid.CommitID that are
// needed by DataFrame and ptracestore.PTraceStore, respectively. The slices
// are for the commits that vcs.Range reports for the time range [begin, end).
//
// If 'downsample' is true then the commits are passed through DownSample, which
// limits their number to MAX_SAMPLE_SIZE.
//
// The value for 'skip', the number of commits skipped, is also returned; it is
// 0 when 'downsample' is false.
func getRange(vcs vcsinfo.VCS, begin, end time.Time, downsample bool) ([]*ColumnHeader, []*cid.CommitID, int) {
	commits := vcs.Range(begin, end)
	skip := 0
	if downsample {
		// Downsample the commits already fetched rather than querying vcs a
		// second time. The repeated query was wasted work and could see a
		// different set of commits if the repo changed in between.
		commits, skip = DownSample(commits, MAX_SAMPLE_SIZE)
	}
	return rangeImpl(commits, skip)
}
// _new asks 'store' for the traces whose keys satisfy 'matches' at the given
// commitIDs. It wraps them in a DataFrame with the supplied column headers
// and skip count, and derives the ParamSet from the fetched traces.
// 'progress' is handed to store.Match unchanged.
//
// A non-nil error is returned if store.Match fails.
func _new(colHeaders []*ColumnHeader, commitIDs []*cid.CommitID, matches ptracestore.KeyMatches, store ptracestore.PTraceStore, progress ptracestore.Progress, skip int) (*DataFrame, error) {
	defer timer.New("_new time").Stop()

	traceSet, err := store.Match(commitIDs, matches, progress)
	if err != nil {
		return nil, fmt.Errorf("DataFrame failed to query for all traces: %s", err)
	}

	df := &DataFrame{
		TraceSet: traceSet,
		Header:   colHeaders,
		Skip:     skip,
	}
	// BuildParamSet assigns ParamSet wholesale, so no placeholder is needed.
	df.BuildParamSet()
	return df, nil
}
// New returns a DataFrame populated with the traces from 'store' for the
// most recent DEFAULT_NUM_COMMITS (50) commits in 'vcs'. It returns a
// non-nil error if the traces could not be retrieved.
func New(vcs vcsinfo.VCS, store ptracestore.PTraceStore, progress ptracestore.Progress) (*DataFrame, error) {
	numCommits := DEFAULT_NUM_COMMITS
	return NewN(vcs, store, progress, numCommits)
}
// NewN returns a DataFrame populated with the traces from 'store' for the
// last n commits in 'vcs'. It returns a non-nil error if the traces could
// not be retrieved. No trace keys are filtered out.
func NewN(vcs vcsinfo.VCS, store ptracestore.PTraceStore, progress ptracestore.Progress, n int) (*DataFrame, error) {
	headers, ids, skip := lastN(vcs, n)
	matchAll := func(string) bool { return true }
	return _new(headers, ids, matchAll, store, progress, skip)
}
// NewFromQueryAndRange returns a DataFrame of the traces whose keys match
// query 'q' over the commits in the time range [begin, end). The range is
// downsampled so the number of commits is limited to MAX_SAMPLE_SIZE.
// It returns a non-nil error if the traces can't be retrieved. 'progress' is
// passed through to the trace store query.
func NewFromQueryAndRange(vcs vcsinfo.VCS, store ptracestore.PTraceStore, begin, end time.Time, q *query.Query, progress ptracestore.Progress) (*DataFrame, error) {
	defer timer.New("NewFromQueryAndRange time").Stop()
	const downsample = true
	headers, ids, skip := getRange(vcs, begin, end, downsample)
	return _new(headers, ids, q.Matches, store, progress, skip)
}
// NewFromKeysAndRange returns a populated DataFrame of the traces whose keys
// appear in 'keys', over the commits in the time range [begin, end). The
// range is downsampled so the number of commits is limited to
// MAX_SAMPLE_SIZE. 'progress' is passed through to the trace store query.
//
// The 'keys' slice is only read: it is neither reordered nor retained.
func NewFromKeysAndRange(vcs vcsinfo.VCS, keys []string, store ptracestore.PTraceStore, begin, end time.Time, progress ptracestore.Progress) (*DataFrame, error) {
	defer timer.New("NewFromKeysAndRange time").Stop()
	colHeaders, commitIDs, skip := getRange(vcs, begin, end, true)

	// Build a private lookup set instead of sorting 'keys' in place. Sorting
	// silently reordered the caller's slice, and a set also gives O(1)
	// membership tests for every key the store checks.
	keySet := make(map[string]struct{}, len(keys))
	for _, k := range keys {
		keySet[k] = struct{}{}
	}
	matches := func(key string) bool {
		_, ok := keySet[key]
		return ok
	}
	return _new(colHeaders, commitIDs, matches, store, progress, skip)
}
// NewFromCommitIDsAndQuery returns a populated DataFrame of the traces that
// match the query 'q' at the given set of commits 'cids'.
//
// The column headers come from the commit details that 'cidl' looks up for
// 'cids'. No downsampling is done, so the result's Skip is always 0.
// 'progress' is passed through to the trace store query.
//
// A non-nil error is returned if the commit lookup or the trace query fails.
func NewFromCommitIDsAndQuery(cids []*cid.CommitID, cidl *cid.CommitIDLookup, store ptracestore.PTraceStore, q *query.Query, progress ptracestore.Progress) (*DataFrame, error) {
	details, err := cidl.Lookup(cids)
	if err != nil {
		return nil, fmt.Errorf("Failed to look up CommitIDs: %s", err)
	}
	// One header per looked-up commit; allocate the slice once.
	colHeaders := make([]*ColumnHeader, 0, len(details))
	for _, d := range details {
		colHeaders = append(colHeaders, &ColumnHeader{
			Source:    d.Source,
			Offset:    int64(d.Offset),
			Timestamp: d.Timestamp,
		})
	}
	return _new(colHeaders, cids, q.Matches, store, progress, 0)
}
// NewEmpty returns a DataFrame with no traces, no columns and no params.
// Every collection field is set to a non-nil empty value and Skip is 0.
func NewEmpty() *DataFrame {
	df := new(DataFrame)
	df.TraceSet = ptracestore.TraceSet{}
	df.Header = []*ColumnHeader{}
	df.ParamSet = paramtools.ParamSet{}
	return df
}
// NewHeaderOnly returns a DataFrame whose Header describes the commits in the
// time range [begin, end), but which has no traces and an empty ParamSet.
// Only 'vcs' is consulted; no trace store is queried.
//
// If 'downsample' is true then the number of commits returned is limited
// to MAX_SAMPLE_SIZE, and Skip records how many commits were skipped.
func NewHeaderOnly(vcs vcsinfo.VCS, begin, end time.Time, downsample bool) *DataFrame {
	defer timer.New("NewHeaderOnly time").Stop()
	colHeaders, _, skip := getRange(vcs, begin, end, downsample)
	return &DataFrame{
		TraceSet: ptracestore.TraceSet{},
		Header:   colHeaders,
		ParamSet: paramtools.ParamSet{},
		Skip:     skip,
	}
}