blob: 9dfbc1cd55bcf96df15fcd71a79e067330a80985 [file] [log] [blame]
package regression
import (
"context"
"fmt"
"net/url"
"go.skia.org/infra/go/query"
"go.skia.org/infra/go/vcsinfo"
"go.skia.org/infra/go/vec32"
"go.skia.org/infra/perf/go/cid"
"go.skia.org/infra/perf/go/dataframe"
"go.skia.org/infra/perf/go/types"
)
// DataFrameIterator is an iterator that produces DataFrames.
//
// Call Next before each call to Value and stop once Next returns false:
//
//	for it.Next() {
//		df, err := it.Value(ctx)
//		// Do something with df.
//	}
type DataFrameIterator interface {
// Next reports whether another DataFrame can be retrieved with Value.
Next() bool
// Value returns the current DataFrame, or an error if it could not be
// produced.
Value(ctx context.Context) (*dataframe.DataFrame, error)
}
// dataframeSlicer implements DataFrameIterator by slicing sub-dataframes from
// a larger dataframe.
type dataframeSlicer struct {
// df is the full dataframe that sub-dataframes are sliced from.
df *dataframe.DataFrame
// size is the length of each slice requested from df.
size int
// offset is the position in df at which the next slice starts.
offset int
}
// Next reports whether a full window of d.size header entries is still
// available starting at d.offset. See DataFrameIterator.
func (d *dataframeSlicer) Next() bool {
	windowEnd := d.offset + d.size
	return windowEnd <= len(d.df.Header)
}
// Value slices d.size entries out of d.df starting at d.offset and, when the
// slice succeeds, moves the offset forward by one. See DataFrameIterator.
func (d *dataframeSlicer) Value(ctx context.Context) (*dataframe.DataFrame, error) {
	window, err := d.df.Slice(d.offset, d.size)
	if err != nil {
		// The offset is only advanced after a successful slice.
		return nil, err
	}
	d.offset++
	return window, nil
}
// NewDataFrameIterator returns a DataFrameIterator that produces a set of
// dataframes for the given ClusterRequest.
//
// The request's Query is parsed and passed, together with End and N, to
// dfBuilder.NewNFromQuery to build one dataframe. The returned iterator slices
// that dataframe into windows of 2*Radius+1 entries, starting at offset 0.
//
// An error is returned if the query cannot be parsed or the dataframe cannot
// be built.
func NewDataFrameIterator(ctx context.Context, progress types.Progress, req *ClusterRequest, dfBuilder dataframe.DataFrameBuilder) (DataFrameIterator, error) {
	values, err := url.ParseQuery(req.Query)
	if err != nil {
		return nil, err
	}
	parsed, err := query.New(values)
	if err != nil {
		return nil, err
	}
	full, err := dfBuilder.NewNFromQuery(ctx, req.End, parsed, req.N, progress)
	if err != nil {
		return nil, fmt.Errorf("Failed to build dataframe iterator: %s", err)
	}
	// Each window holds the center commit plus Radius commits on either side.
	windowSize := 2*req.Radius + 1
	slicer := &dataframeSlicer{
		df:   full,
		size: windowSize,
	}
	return slicer, nil
}
// singleIterator is a DataFrameIterator that produces a single DataFrame.
//
// This code is transitional to move regression detection to using
// DataFrameIterators for all regression detection.
type singleIterator struct {
// started records whether Next has already been called; Next returns true
// only while it is still false.
started bool
// NOTE(review): q, begin and end are not read or written by any code visible
// in this file; confirm whether they are still needed.
q *query.Query
begin int
end int
// progress is handed to dfBuilder when Value loads the final dataframe.
progress types.Progress
// cidl is passed through to dfBuilder.NewFromCommitIDsAndQuery by Value.
cidl *cid.CommitIDLookup
// v is passed to calcCids by Value.
v vcsinfo.VCS
// request supplies the Query and Source read by Value and is forwarded to
// calcCids.
request *ClusterRequest
// dfBuilder builds the dataframes that Value probes and returns.
dfBuilder dataframe.DataFrameBuilder
}
// Next returns true on its first call and false on every call after that,
// since this iterator only ever yields one DataFrame. See DataFrameIterator.
func (s *singleIterator) Next() bool {
	if s.started {
		return false
	}
	s.started = true
	return true
}
// Value builds the single DataFrame for s.request. See DataFrameIterator.
//
// It parses the request's query, asks calcCids which commits to cluster over,
// and then loads the dataframe for exactly those commits. Errors from any of
// those steps are returned wrapped with a description of the failing step.
func (s *singleIterator) Value(ctx context.Context) (*dataframe.DataFrame, error) {
	values, err := url.ParseQuery(s.request.Query)
	if err != nil {
		return nil, fmt.Errorf("Invalid URL query: %s", err)
	}
	q, err := query.New(values)
	if err != nil {
		return nil, fmt.Errorf("Invalid URL query: %s", err)
	}

	// probe is the CidsWithDataInRange callback given to calcCids: it loads a
	// dataframe for every offset in [begin, end) and keeps only the commits
	// that cidsWithData reports as having data. No progress callback is used
	// for these exploratory loads.
	probe := func(begin, end int) ([]*cid.CommitID, error) {
		candidates := []*cid.CommitID{}
		for offset := begin; offset < end; offset++ {
			candidates = append(candidates, &cid.CommitID{
				Source: s.request.Source,
				Offset: offset,
			})
		}
		probed, err := s.dfBuilder.NewFromCommitIDsAndQuery(ctx, candidates, s.cidl, q, nil)
		if err != nil {
			return nil, fmt.Errorf("Failed to load data searching for commit ids: %s", err)
		}
		return cidsWithData(probed), nil
	}

	selected, err := calcCids(s.request, s.v, probe)
	if err != nil {
		return nil, fmt.Errorf("Could not calculate the commits to run a cluster over: %s", err)
	}

	result, err := s.dfBuilder.NewFromCommitIDsAndQuery(ctx, selected, s.cidl, q, s.progress)
	if err != nil {
		return nil, fmt.Errorf("Invalid range of commits: %s", err)
	}
	return result, nil
}
// NewSingleDataFrameIterator creates a singleIterator instance that has not
// yet been started, wired up with the given progress callback, commit lookup,
// VCS, request and dataframe builder.
func NewSingleDataFrameIterator(progress types.Progress, cidl *cid.CommitIDLookup, v vcsinfo.VCS, request *ClusterRequest, dfBuilder dataframe.DataFrameBuilder) *singleIterator {
	// The zero value already has started == false.
	it := &singleIterator{}
	it.progress = progress
	it.cidl = cidl
	it.v = v
	it.request = request
	it.dfBuilder = dfBuilder
	return it
}
// CidsWithDataInRange is the callback type passed to calcCids. Given a range
// of commit offsets it returns all the commit ids in [begin, end) that have
// data, or an error if that could not be determined.
type CidsWithDataInRange func(begin, end int) ([]*cid.CommitID, error)
// cidsWithData returns the commit ids in the dataframe that have non-missing
// data in at least one trace.
//
// The result follows the order of df.Header and is an empty, non-nil slice
// when no header entry has data.
func cidsWithData(df *dataframe.DataFrame) []*cid.CommitID {
	found := []*cid.CommitID{}
	for col, header := range df.Header {
		// A column counts as populated as soon as any one trace has a real
		// value in it.
		populated := false
		for _, trace := range df.TraceSet {
			if trace[col] != vec32.MISSING_DATA_SENTINEL {
				populated = true
				break
			}
		}
		if !populated {
			continue
		}
		found = append(found, &cid.CommitID{
			Source: header.Source,
			Offset: int(header.Offset),
		})
	}
	return found
}
// calcCids returns a slice of CommitID's that clustering should be run over.
//
// When request.Sparse is set, data might not be available for every commit, so
// cidsWithDataInRange is used to find the target commit plus request.Radius
// commits with data on either side of it. Each side is searched over a block
// of SPARSE_BLOCK_SEARCH_MULT*Radius commits. An error is returned if the
// target commit has no data, if either side has fewer than Radius commits with
// data, if Radius is negative, if the VCS reports no commits, or if
// cidsWithDataInRange itself fails.
//
// When request.Sparse is not set, request.Radius is first clamped in place to
// the range [1, MAX_RADIUS] and the result is every commit offset in
// [Offset-Radius, Offset+Radius], all with request.Source as their source.
//
// The returned slice is ordered from the earliest commit to the latest.
func calcCids(request *ClusterRequest, v vcsinfo.VCS, cidsWithDataInRange CidsWithDataInRange) ([]*cid.CommitID, error) {
	cids := []*cid.CommitID{}
	if request.Sparse {
		// Sparse means data might not be available for every commit, so we need to scan
		// the data and gather up +/- Radius commits from the target commit that actually
		// do have data.

		// Start by checking center point as a quick exit strategy.
		withData, err := cidsWithDataInRange(request.Offset, request.Offset+1)
		if err != nil {
			return nil, err
		}
		if len(withData) == 0 {
			return nil, fmt.Errorf("No data at the target commit id.")
		}
		cids = append(cids, withData...)

		// A negative radius would pass the length checks below and then panic
		// in the slice expressions, so reject it explicitly.
		if request.Radius < 0 {
			return nil, fmt.Errorf("Invalid radius: %d", request.Radius)
		}

		// Then check from the target forward in time.
		lastCommit := v.LastNIndex(1)
		if len(lastCommit) == 0 {
			// Indexing an empty result would panic.
			return nil, fmt.Errorf("No commits found in the repo.")
		}
		lastIndex := lastCommit[0].Index
		// NOTE(review): the search range is [begin, end), so after clamping the
		// commit at lastIndex itself is never examined; confirm this is intended.
		finalIndex := request.Offset + 1 + SPARSE_BLOCK_SEARCH_MULT*request.Radius
		if finalIndex > lastIndex {
			finalIndex = lastIndex
		}
		withData, err = cidsWithDataInRange(request.Offset+1, finalIndex)
		if err != nil {
			return nil, err
		}
		if len(withData) < request.Radius {
			return nil, fmt.Errorf("Not enough sparse data after the target commit.")
		}
		// Keep only the Radius commits closest to the target.
		cids = append(cids, withData[:request.Radius]...)

		// Finally check backward in time.
		backward := request.Radius
		startIndex := request.Offset - SPARSE_BLOCK_SEARCH_MULT*backward
		if startIndex < 0 {
			// Don't ask for commits at negative offsets when the target is close
			// to the start of history.
			startIndex = 0
		}
		withData, err = cidsWithDataInRange(startIndex, request.Offset)
		if err != nil {
			return nil, err
		}
		if len(withData) < backward {
			return nil, fmt.Errorf("Not enough sparse data before the target commit.")
		}
		// Keep only the Radius commits closest to the target and put them in
		// front so the result stays in commit order.
		withData = withData[len(withData)-backward:]
		cids = append(withData, cids...)
	} else {
		// Clamp the radius into [1, MAX_RADIUS]; note this updates the request.
		if request.Radius <= 0 {
			request.Radius = 1
		}
		if request.Radius > MAX_RADIUS {
			request.Radius = MAX_RADIUS
		}
		from := request.Offset - request.Radius
		to := request.Offset + request.Radius
		for i := from; i <= to; i++ {
			cids = append(cids, &cid.CommitID{
				Source: request.Source,
				Offset: i,
			})
		}
	}
	return cids, nil
}