blob: fdbf8b7b2ba4f754f1b06aa6b627c5126c4f6e9a [file] [log] [blame]
package psrefresh
import (
"context"
"net/url"
"sync"
"time"
"go.skia.org/infra/go/metrics2"
"go.skia.org/infra/go/paramtools"
"go.skia.org/infra/go/query"
"go.skia.org/infra/go/skerr"
"go.skia.org/infra/go/sklog"
"go.skia.org/infra/perf/go/config"
"go.skia.org/infra/perf/go/dataframe"
"go.skia.org/infra/perf/go/types"
)
// OPSProvider allows access to OrdererParamSets. TraceStore implements this interface.
type OPSProvider interface {
GetLatestTile(context.Context) (types.TileNumber, error)
GetParamSet(ctx context.Context, tileNumber types.TileNumber) (paramtools.ReadOnlyParamSet, error)
}
// ParamSetRefresher provides an interface for accessing instance param sets.
type ParamSetRefresher interface {
// GetAll returns all the paramsets in the instance.
GetAll() paramtools.ReadOnlyParamSet
// GetParamSetForQuery returns the paramsets filtered for the provided query.
GetParamSetForQuery(ctx context.Context, query *query.Query, q url.Values) (int64, paramtools.ParamSet, error)
// Start kicks off the param set refresh routine.
Start(period time.Duration) error
}
// defaultParamSetRefresher keeps a fresh paramtools.ParamSet that represents all the
// traces stored in the two most recent tiles in the trace store.
type defaultParamSetRefresher struct {
traceStore OPSProvider
period time.Duration
numParamSets int
dfBuilder dataframe.DataFrameBuilder
qConfig config.QueryConfig
mutex sync.Mutex // protects ps.
ps paramtools.ReadOnlyParamSet
}
// NewDefaultParamSetRefresher builds a new *ParamSetRefresher.
func NewDefaultParamSetRefresher(traceStore OPSProvider, numParamSets int, dfBuilder dataframe.DataFrameBuilder, qconfig config.QueryConfig) *defaultParamSetRefresher {
return &defaultParamSetRefresher{
traceStore: traceStore,
numParamSets: numParamSets,
dfBuilder: dfBuilder,
qConfig: qconfig,
ps: paramtools.ReadOnlyParamSet{},
}
}
// Start actually starts the refreshing process.
//
// The 'period' is how often the paramset should be refreshed.
func (pf *defaultParamSetRefresher) Start(period time.Duration) error {
pf.period = period
sklog.Info("Refresher refreshing")
if err := pf.oneStep(); err != nil {
return skerr.Wrapf(err, "Failed to build the initial ParamSet")
}
go pf.refresh()
return nil
}
func (pf *defaultParamSetRefresher) oneStep() error {
ctx, cancel := context.WithTimeout(context.Background(), time.Minute*2)
defer cancel()
tileKey, err := pf.traceStore.GetLatestTile(ctx)
if err != nil {
return skerr.Wrapf(err, "Failed to get starting tile.")
}
ps := paramtools.NewParamSet()
first := true
for i := 0; i < pf.numParamSets; i++ {
ps1, err := pf.traceStore.GetParamSet(ctx, tileKey)
if err != nil {
if first {
// Only the failing on the first tile should be an error,
// previous tiles may be empty, or invalid.
return skerr.Wrapf(err, "Failed to get paramset from first tile.")
}
sklog.Warningf("Failed to get paramset from %d most recent tile: %s", i, err)
}
first = false
ps.AddParamSet(ps1)
tileKey = tileKey.Prev()
}
ps.Normalize()
pf.ps = ps.Freeze()
return nil
}
func (pf *defaultParamSetRefresher) refresh() {
stepFailures := metrics2.GetCounter("paramset_refresh_failures", nil)
for range time.Tick(pf.period) {
if err := pf.oneStep(); err != nil {
sklog.Errorf("Failed to refresh the ParamSet: %s", err)
stepFailures.Inc(1)
}
}
}
// GetAll returns the fresh paramset.
func (pf *defaultParamSetRefresher) GetAll() paramtools.ReadOnlyParamSet {
pf.mutex.Lock()
defer pf.mutex.Unlock()
return pf.ps
}
// GetParamSetForQuery returns the trace count, paramset for the given query.
func (pf *defaultParamSetRefresher) GetParamSetForQuery(ctx context.Context, query *query.Query, q url.Values) (int64, paramtools.ParamSet, error) {
return pf.dfBuilder.PreflightQuery(ctx, query, pf.GetAll())
}
// append the default values for parameters
func (pf *defaultParamSetRefresher) UpdateQueryValueWithDefaults(v url.Values) {
if len(pf.qConfig.DefaultParamSelections) > 0 {
for key, values := range pf.qConfig.DefaultParamSelections {
v[key] = values
}
}
}
// check whether value is part of the list validValues
func ShouldCacheValue(value string, validValues []string) bool {
for _, validValue := range validValues {
if value == validValue {
return true
}
}
return false
}
// Confirm we implement the interface.
var _ ParamSetRefresher = (*defaultParamSetRefresher)(nil)