blob: cdc304fc7760df2331612f3838c2e563480050fa [file] [log] [blame]
package dfiter
import (
"context"
"fmt"
"sync"
"time"
"go.opencensus.io/trace"
"go.skia.org/infra/go/query"
"go.skia.org/infra/go/sklog"
"go.skia.org/infra/perf/go/dataframe"
"go.skia.org/infra/perf/go/progress"
"golang.org/x/sync/singleflight"
)
// DfProvider is a struct used to manage a cache of dataframes.
type DfProvider struct {
dfCache map[string]*dataframe.DataFrame
mutex sync.RWMutex
group singleflight.Group
}
// NewDfProvider returns a new instance of the DfProvider.
func NewDfProvider() *DfProvider {
return &DfProvider{
dfCache: map[string]*dataframe.DataFrame{},
}
}
// GetDataFrame returns a dataframe instance for the provided query and time range.
func (d *DfProvider) GetDataFrame(ctx context.Context, dfBuilder dataframe.DataFrameBuilder, query *query.Query, end time.Time, n int32, progress progress.Progress) (*dataframe.DataFrame, error) {
ctx, span := trace.StartSpan(ctx, "dfiterProvider.GetDataFrame")
defer span.End()
key := key(query, end, n)
df := d.readFromCache(key)
if df != nil {
sklog.Infof("Dataframe found in cache for key: %s", key)
return df, nil
}
// The single flight group manages multiple threads processing keys in parallel.
// The group ensures that only one thread is executing per key. If a thread is
// already running on a key, the other thread waits and the same result is returned.
v, err, _ := d.group.Do(key, func() (interface{}, error) {
df, err := dfBuilder.NewNFromQuery(ctx, end, query, n, progress)
if err != nil {
return nil, err
}
d.addToCache(key, df)
return df, nil
})
if err != nil {
return nil, err
}
return v.(*dataframe.DataFrame), nil
}
// readFromCache reads the given key from the cache.
func (d *DfProvider) readFromCache(key string) *dataframe.DataFrame {
d.mutex.RLock()
defer d.mutex.RUnlock()
if df, ok := d.dfCache[key]; ok {
return df
}
return nil
}
// addToCache adds the given key and dataframe pair to the cache.
func (d *DfProvider) addToCache(key string, df *dataframe.DataFrame) {
d.mutex.Lock()
defer d.mutex.Unlock()
d.dfCache[key] = df
}
// key generates a key for the cache.
func key(query *query.Query, end time.Time, n int32) string {
return fmt.Sprintf("%s_%s_%d", query.KeyValueString(), end.String(), n)
}