| // Package dfiter efficiently creates dataframes used in regression detection. |
| package dfiter |
| |
| import ( |
| "context" |
| "errors" |
| "fmt" |
| "net/url" |
| "time" |
| |
| "go.opencensus.io/trace" |
| "go.skia.org/infra/go/metrics2" |
| "go.skia.org/infra/go/now" |
| "go.skia.org/infra/go/query" |
| "go.skia.org/infra/go/skerr" |
| "go.skia.org/infra/go/sklog" |
| "go.skia.org/infra/perf/go/alerts" |
| "go.skia.org/infra/perf/go/config" |
| "go.skia.org/infra/perf/go/dataframe" |
| perfgit "go.skia.org/infra/perf/go/git" |
| "go.skia.org/infra/perf/go/progress" |
| "go.skia.org/infra/perf/go/types" |
| ) |
| |
// ErrInsufficientData is returned by the DataFrameIterator if all the queries
// completed successfully, but there wasn't enough data to continue.
var ErrInsufficientData = errors.New("insufficient data")
| |
// DataFrameIterator is an iterator that produces DataFrames.
//
// for it.Next() {
//   df, err := it.Value(ctx)
//   // Do something with df.
// }
type DataFrameIterator interface {
	// Next returns true if there is another DataFrame available via Value.
	Next() bool

	// Value returns the next DataFrame, or an error if it could not be
	// produced. Only call Value after Next has returned true.
	Value(ctx context.Context) (*dataframe.DataFrame, error)
}
| |
// dataframeSlicer implements DataFrameIterator by slicing sub-dataframes from
// a larger dataframe.
type dataframeSlicer struct {
	// df is the full source dataframe that sub-dataframes are sliced from.
	df *dataframe.DataFrame

	// size is the number of points (columns in df.Header) in each slice.
	size int

	// offset is the index into df.Header where the next slice starts; it
	// advances by one on each call to Value.
	offset int
}
| |
| // See DataFrameIterator. |
| func (d *dataframeSlicer) Next() bool { |
| return d.offset+d.size <= len(d.df.Header) |
| } |
| |
| // See DataFrameIterator. |
| func (d *dataframeSlicer) Value(ctx context.Context) (*dataframe.DataFrame, error) { |
| // Slice off a sub-dataframe from d.df. |
| df, err := d.df.Slice(d.offset, d.size) |
| if err != nil { |
| return nil, err |
| } |
| d.offset += 1 |
| return df, nil |
| } |
| |
// NewDataFrameIterator returns a DataFrameIterator that produces a set of
// dataframes for the given query, domain, and alert.
//
// If domain.Offset is non-zero then we want the iterator to return a single
// dataframe of alert.Radius around the specified commit. Otherwise it returns a
// series of dataframes of size 2*alert.Radius+1 sliced from a single dataframe
// of size domain.N.
//
// regressionStateCallback may be nil; when non-nil it is invoked with a
// human-readable message on each failure path so progress can be surfaced
// to the user.
//
// Returns ErrInsufficientData if the query succeeded but produced fewer than
// 2*alert.Radius+1 points, and a wrapped error for query/lookup failures.
func NewDataFrameIterator(
	ctx context.Context,
	progress progress.Progress,
	dfBuilder dataframe.DataFrameBuilder,
	perfGit perfgit.Git,
	regressionStateCallback types.ProgressCallback,
	queryAsString string,
	domain types.Domain,
	alert *alerts.Alert,
	anomalyConfig config.AnomalyConfig,
) (DataFrameIterator, error) {
	ctx, span := trace.StartSpan(ctx, "dfiter.NewDataFrameIterator")
	defer span.End()

	// Because of GroupBy the Alert query isn't the one we use, instead a
	// sub-query is passed in.
	u, err := url.ParseQuery(queryAsString)
	if err != nil {
		return nil, skerr.Wrap(err)
	}
	q, err := query.New(u)
	if err != nil {
		return nil, skerr.Wrap(err)
	}
	var df *dataframe.DataFrame
	if domain.Offset == 0 {
		// If the anomaly config specifies a settling time, pull the end of the
		// queried range back so very recent (still "settling") data points are
		// excluded.
		if anomalyConfig.SettlingTime != 0 {
			currentTime := now.Now(ctx)
			// NOTE(review): assumes SettlingTime converts directly to a
			// time.Duration (i.e. is expressed in nanosecond units or is
			// already a Duration) — confirm against config.AnomalyConfig.
			latestAllowedPoints := currentTime.Add(-1 * time.Duration(anomalyConfig.SettlingTime))
			if latestAllowedPoints.Before(domain.End) {
				domain.End = latestAllowedPoints
			}
		}

		df, err = dfBuilder.NewNFromQuery(ctx, domain.End, q, domain.N, progress)
		if err != nil {
			if regressionStateCallback != nil {
				regressionStateCallback("Failed querying the data due to an internal error.")
			}
			return nil, skerr.Wrapf(err, "Failed to build dataframe iterator source dataframe")
		}
	} else {
		// We can get an iterator that returns just a single dataframe by making
		// sure that the size of the origin dataframe is the same size as the
		// slicer size, so we set them both to 2*Radius+1.
		n := int32(2*alert.Radius + 1)
		// Need to find an End time, which is the commit time of the commit at
		// Offset+Radius.
		//
		// That is, for example, we are looking at commit 21 with a Radius of 3
		// to request an endpoint of 24:
		//
		//    [ 18, 19, 20, 21, 22, 23, 24]
		//
		// That way we have the right number of points for types.OriginalStep
		// (2*n+1), and by chopping down the length of the result by 1 we can
		// get a dataframe of the right length for the rest of the step finding
		// algorithms, i.e.:
		//
		//    [ 18, 19, 20, 21, 22, 23 ]
		//
		// All of these contortions are to keep the detection algorithms
		// consistent. Eventually types.OriginalStep should be changed to work
		// on a dataframe of length 2*n like all the rest.
		endCommit := types.CommitNumber(int(domain.Offset) + alert.Radius)
		commit, err := perfGit.CommitFromCommitNumber(ctx, endCommit)
		if err != nil {
			if regressionStateCallback != nil {
				regressionStateCallback(fmt.Sprintf("Not a valid commit number %d. Make sure you choose a commit old enough to have %d commits before it and %d commits after it.", endCommit, alert.Radius, alert.Radius-1))
			}

			return nil, skerr.Wrapf(err, "Failed to look up CommitNumber of a single cluster request.")
		}
		df, err = dfBuilder.NewNFromQuery(ctx, time.Unix(commit.Timestamp, 0), q, n, progress)
		if err != nil {
			if regressionStateCallback != nil {
				regressionStateCallback("Failed querying the data due to an internal error.")
			}
			return nil, skerr.Wrapf(err, "Failed to build dataframe iterator source dataframe.")
		}
	}
	// Regardless of which branch produced df, it must contain at least one
	// full window of 2*Radius+1 points for the slicer to yield anything.
	if len(df.Header) < int(2*alert.Radius+1) {
		if regressionStateCallback != nil {
			regressionStateCallback(fmt.Sprintf("Query didn't return enough data points: Got %d. Want %d.", len(df.Header), 2*alert.Radius+1))
		}
		sklog.Infof("Query didn't return enough data points: Got %d. Want %d.", len(df.Header), 2*alert.Radius+1)
		return nil, ErrInsufficientData
	}
	// Record the total number of floating point values that were just queried
	// from the database. Since we know the size of a float we can use this to
	// roughly estimate the MB/s of regression detection.
	metrics2.GetCounter("perf_regression_detection_floats").Inc(int64(len(df.Header) * len(df.TraceSet)))
	return &dataframeSlicer{
		df:     df,
		size:   2*alert.Radius + 1,
		offset: 0,
	}, nil
}