blob: a84c115952edd13ad4fb9d66b1312a9fec878b68 [file] [log] [blame]
package dfiter
import (
"context"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.skia.org/infra/go/now"
"go.skia.org/infra/go/paramtools"
"go.skia.org/infra/go/query"
"go.skia.org/infra/perf/go/alerts"
"go.skia.org/infra/perf/go/config"
"go.skia.org/infra/perf/go/dataframe"
"go.skia.org/infra/perf/go/dfbuilder"
perfgit "go.skia.org/infra/perf/go/git"
"go.skia.org/infra/perf/go/git/gittest"
"go.skia.org/infra/perf/go/progress"
"go.skia.org/infra/perf/go/tracestore"
"go.skia.org/infra/perf/go/tracestore/sqltracestore"
"go.skia.org/infra/perf/go/types"
)
const testTileSize = 6
var (
defaultAnomalyConfig = config.AnomalyConfig{}
)
func addValuesAtIndex(store tracestore.TraceStore, index types.CommitNumber, keyValues map[string]float32, filename string, ts time.Time) error {
ps := paramtools.ParamSet{}
params := []paramtools.Params{}
values := []float32{}
for k, v := range keyValues {
p, err := query.ParseKey(k)
if err != nil {
return err
}
ps.AddParams(p)
params = append(params, p)
values = append(values, v)
}
return store.WriteTraces(context.Background(), index, params, values, ps, filename, ts)
}
func newForTest(t *testing.T) (context.Context, dataframe.DataFrameBuilder, perfgit.Git, time.Time) {
ctx, db, _, _, _, instanceConfig := gittest.NewForTest(t)
g, err := perfgit.New(ctx, true, db, instanceConfig)
require.NoError(t, err)
cfg := config.DataStoreConfig{
TileSize: testTileSize,
}
store, err := sqltracestore.New(db, cfg)
require.NoError(t, err)
ts := gittest.StartTime
// Add some points to the first and second tile.
err = addValuesAtIndex(store, 0, map[string]float32{
",arch=x86,config=8888,": 1.2,
",arch=x86,config=565,": 2.1,
",arch=arm,config=8888,": 100.5,
}, "gs://foo.json", ts)
assert.NoError(t, err)
err = addValuesAtIndex(store, 1, map[string]float32{
",arch=x86,config=8888,": 1.3,
",arch=x86,config=565,": 2.2,
",arch=arm,config=8888,": 100.6,
}, "gs://foo.json", ts.Add(time.Minute))
assert.NoError(t, err)
err = addValuesAtIndex(store, 7, map[string]float32{
",arch=x86,config=8888,": 1.7,
",arch=x86,config=565,": 2.5,
",arch=arm,config=8888,": 101.1,
}, "gs://foo.json", ts.Add(7*time.Minute))
assert.NoError(t, err)
lastTimeStamp := ts.Add(8 * time.Minute)
err = addValuesAtIndex(store, 8, map[string]float32{
",arch=x86,config=8888,": 1.8,
",arch=x86,config=565,": 2.6,
",arch=arm,config=8888,": 101.2,
}, "gs://foo.json", lastTimeStamp)
assert.NoError(t, err)
instanceConfig.DataStoreConfig.TileSize = testTileSize
require.NoError(t, err)
dfb := dfbuilder.NewDataFrameBuilderFromTraceStore(g, store, 2, false)
return ctx, dfb, g, lastTimeStamp
}
func TestNewDataFrameIterator_MultipleDataframes_SingleFrameOfLengthThree(t *testing.T) {
ctx, dfb, g, _ := newForTest(t)
// This is a MultipleDataframes request because Domain.Offset = 0.
// This request should return two frames since we only have data at four
// commits in the entire store, and NewDataFrameIterator only produces dense
// dataframes.
alert := &alerts.Alert{
Radius: 1,
}
domain := types.Domain{
End: gittest.StartTime.Add(8 * time.Minute), // Some time after the last commit.
N: 10,
Offset: 0,
}
query := "arch=x86"
iter, err := NewDataFrameIterator(ctx, progress.New(), dfb, g, nil, query, domain, alert, defaultAnomalyConfig)
require.NoError(t, err)
require.True(t, iter.Next())
df, err := iter.Value(ctx)
require.NoError(t, err)
assert.Equal(t, types.TraceSet{
",arch=x86,config=565,": types.Trace{2.1, 2.2, 2.5},
",arch=x86,config=8888,": types.Trace{1.2, 1.3, 1.7},
}, df.TraceSet)
require.True(t, iter.Next())
df, err = iter.Value(ctx)
require.NoError(t, err)
assert.Equal(t, types.TraceSet{
",arch=x86,config=565,": types.Trace{2.2, 2.5, 2.6},
",arch=x86,config=8888,": types.Trace{1.3, 1.7, 1.8},
}, df.TraceSet)
require.False(t, iter.Next())
}
func TestNewDataFrameIterator_MultipleDataframes_TwoFramesOfLengthTwo(t *testing.T) {
ctx, dfb, g, _ := newForTest(t)
// This is a MultipleDataframes request because Domain.Offset = 0.
// This request should only return two frames of length one since we only
// have data at four commits in the entire store, and only three of them
// come before the given domain.End time. NewDataFrameIterator only produces
// dense dataframes and an Alert.Radius of 0 means the dataframe will have a
// length of 1.
alert := &alerts.Alert{
Radius: 0,
}
domain := types.Domain{
End: gittest.StartTime.Add(5 * time.Minute),
N: 2,
Offset: 0,
}
query := "arch=x86"
iter, err := NewDataFrameIterator(ctx, progress.New(), dfb, g, nil, query, domain, alert, defaultAnomalyConfig)
require.NoError(t, err)
require.True(t, iter.Next())
df, err := iter.Value(ctx)
require.NoError(t, err)
assert.Equal(t, types.TraceSet{
",arch=x86,config=565,": types.Trace{2.1},
",arch=x86,config=8888,": types.Trace{1.2},
}, df.TraceSet)
require.True(t, iter.Next())
df, err = iter.Value(ctx)
require.NoError(t, err)
assert.Equal(t, types.TraceSet{
",arch=x86,config=565,": types.Trace{2.2},
",arch=x86,config=8888,": types.Trace{1.3},
}, df.TraceSet)
require.False(t, iter.Next())
}
// An instance of progressCapture is used to capture Progress messages for
// testing.
type progressCapture struct {
message string
}
// callback implements types.ProgressCallback.
func (p *progressCapture) callback(message string) {
p.message = message
}
func TestNewDataFrameIterator_InsufficientData_ReturnsError(t *testing.T) {
ctx, dfb, g, _ := newForTest(t)
// This is a MultipleDataframes request because Domain.Offset = 0.
// This request should only return an error because we ask for 11 points
// (radius 5), and we only have 5 points in the database.
alert := &alerts.Alert{
Radius: 5,
}
domain := types.Domain{
End: gittest.StartTime.Add(5 * time.Minute),
N: 2,
Offset: 0,
}
query := "arch=x86"
pc := &progressCapture{}
_, err := NewDataFrameIterator(ctx, progress.New(), dfb, g, pc.callback, query, domain, alert, defaultAnomalyConfig)
require.Error(t, err)
require.Equal(t, "Query didn't return enough data points: Got 2. Want 11.", pc.message)
}
func TestNewDataFrameIterator_ExactDataframeRequest_ErrIfWeSearchAfterLastCommit(t *testing.T) {
ctx, dfb, g, _ := newForTest(t)
// This is an ExactDataframeRequest because Offset != 0.
// This request should error because we start at commit 10 which doesn't
// exist.
alert := &alerts.Alert{
Radius: 1,
}
domain := types.Domain{
N: 2,
Offset: 30,
}
q := "arch=x86"
pc := &progressCapture{}
_, err := NewDataFrameIterator(ctx, progress.New(), dfb, g, pc.callback, q, domain, alert, defaultAnomalyConfig)
require.Contains(t, err.Error(), "Failed to look up CommitNumber")
require.Equal(t, "Not a valid commit number 31. Make sure you choose a commit old enough to have 1 commits before it and 0 commits after it.", pc.message)
}
func TestNewDataFrameIterator_ExactDataframeRequest_Success(t *testing.T) {
ctx, dfb, g, _ := newForTest(t)
// This is an ExactDataframeRequest because Offset != 0.
alert := &alerts.Alert{
Radius: 1,
}
domain := types.Domain{
N: 2,
Offset: 6, // Start at 6 with a radius of 1 to get the commit at 7.
}
q := "arch=x86"
iter, err := NewDataFrameIterator(ctx, progress.New(), dfb, g, nil, q, domain, alert, defaultAnomalyConfig)
require.NoError(t, err)
require.True(t, iter.Next())
df, err := iter.Value(ctx)
require.NoError(t, err)
assert.Equal(t, 3, len(df.Header))
assert.Equal(t, types.CommitNumber(0), df.Header[0].Offset)
assert.Equal(t, types.CommitNumber(1), df.Header[1].Offset)
assert.Equal(t, types.CommitNumber(7), df.Header[2].Offset)
}
func TestNewDataFrameIterator_ExactDataframeRequest_ErrIfWeSearchBeforeFirstCommit(t *testing.T) {
ctx, dfb, g, _ := newForTest(t)
// This is an ExactDataframeRequest because Offset != 0.
// This request should error because we start at commit -5 which doesn't
// exist.
alert := &alerts.Alert{
Radius: 1,
}
domain := types.Domain{
N: 2,
Offset: -5,
}
q := "arch=x86"
pc := &progressCapture{}
_, err := NewDataFrameIterator(ctx, progress.New(), dfb, g, pc.callback, q, domain, alert, defaultAnomalyConfig)
require.Contains(t, err.Error(), "Failed to look up CommitNumber")
require.Equal(t, "Not a valid commit number -4. Make sure you choose a commit old enough to have 1 commits before it and 0 commits after it.", pc.message)
}
func TestNewDataFrameIterator_MultipleDataframes_ErrIfWeSearchBeforeFirstCommit(t *testing.T) {
ctx, dfb, g, _ := newForTest(t)
// This is a MultipleDataframes request because Domain.Offset = 0.
// This request should error because we start at a commit time before the
// first commit in the repo.
alert := &alerts.Alert{
Radius: 1,
}
domain := types.Domain{
End: gittest.StartTime.Add(-1 * time.Minute),
N: 2,
Offset: 0,
}
q := "arch=x86"
pc := &progressCapture{}
_, err := NewDataFrameIterator(ctx, progress.New(), dfb, g, pc.callback, q, domain, alert, defaultAnomalyConfig)
require.Contains(t, err.Error(), "Failed to build dataframe iterator")
require.Equal(t, "Failed querying the data due to an internal error.", pc.message)
}
func TestNewDataFrameIterator_MultipleDataframesWithSettlingTime_OneFramesOfLengthThree(t *testing.T) {
_, dfb, g, lastTimeStamp := newForTest(t)
// This is a MultipleDataframes request because Domain.Offset = 0.
// This request should only return one frame of length three since we only
// have data at four commits in the entire store, and one of them comes
// outside of the settling time. An Alert.Radius of 1 means the dataframe
// will have a length of 3.
alert := &alerts.Alert{
Radius: 1,
}
domain := types.Domain{
End: gittest.StartTime.Add(8 * time.Minute),
N: 4,
Offset: 0,
}
query := "arch=x86"
anomalyConfig := config.AnomalyConfig{
SettlingTime: config.DurationAsString(30 * time.Second),
}
ctx := now.TimeTravelingContext(lastTimeStamp)
iter, err := NewDataFrameIterator(ctx, progress.New(), dfb, g, nil, query, domain, alert, anomalyConfig)
require.NoError(t, err)
require.True(t, iter.Next())
df, err := iter.Value(ctx)
require.NoError(t, err)
assert.Equal(t, types.TraceSet{
",arch=x86,config=565,": types.Trace{2.1, 2.2, 2.5},
",arch=x86,config=8888,": types.Trace{1.2, 1.3, 1.7},
}, df.TraceSet)
// Only one trace returned.
require.False(t, iter.Next())
}