blob: 331ae10dbf87f2132218c92826cf49aabd9d93dd [file] [log] [blame]
package tilesource
import (
"context"
"net/url"
"sync"
"time"
"go.skia.org/infra/golden/go/code_review"
"go.skia.org/infra/go/metrics2"
"go.skia.org/infra/go/paramtools"
"go.skia.org/infra/go/skerr"
"go.skia.org/infra/go/sklog"
"go.skia.org/infra/go/tiling"
"go.skia.org/infra/go/util"
"go.skia.org/infra/go/vcsinfo"
"go.skia.org/infra/golden/go/ignore"
"go.skia.org/infra/golden/go/tracestore"
"go.skia.org/infra/golden/go/types"
)
type TileSource interface {
// GetTile returns the most recently loaded Tile.
GetTile() types.ComplexTile
}
type CachedTileSourceConfig struct {
CLUpdater code_review.Updater
IgnoreStore ignore.Store
TraceStore tracestore.TraceStore
VCS vcsinfo.VCS
// optional. If specified, will only show the params that match this query. This is
// opt-in, to avoid leaking.
PubliclyViewableParams paramtools.ParamSet
// NCommits is the number of commits we should consider. If NCommits is
// 0 or smaller all commits in the last tile will be considered.
NCommits int
}
type CachedTileSourceImpl struct {
CachedTileSourceConfig
lastCpxTile types.ComplexTile
mutex sync.RWMutex
}
func New(c CachedTileSourceConfig) *CachedTileSourceImpl {
cti := &CachedTileSourceImpl{
CachedTileSourceConfig: c,
}
return cti
}
// StartUpdater loads the initial tile and starts a goroutine to update it at
// the specified interval. It returns an error if the initial load fails, but
// will only log errors that happen later.
func (s *CachedTileSourceImpl) StartUpdater(ctx context.Context, interval time.Duration) error {
if err := s.updateTile(ctx); err != nil {
return skerr.Wrapf(err, "failed initial tile update")
}
go util.RepeatCtx(interval, ctx, func(ctx context.Context) {
if err := s.updateTile(ctx); err != nil {
sklog.Errorf("Could not update tile: %s", err)
}
})
return nil
}
// GetTile implements the TileSource interface.
func (s *CachedTileSourceImpl) GetTile() types.ComplexTile {
s.mutex.RLock()
defer s.mutex.RUnlock()
return s.lastCpxTile
}
// updateTile fetches the latest tile and caches it. updateTile expects to be called from a
// single goroutine (see StartUpdater).
func (s *CachedTileSourceImpl) updateTile(ctx context.Context) error {
defer metrics2.FuncTimer().Stop()
if err := s.VCS.Update(ctx, true, false); err != nil {
return skerr.Wrapf(err, "updating VCS")
}
var prevCommit *tiling.Commit
if s.lastCpxTile != nil {
commits := s.lastCpxTile.AllCommits()
if len(commits) > 0 {
prevCommit = commits[len(commits)-1]
}
}
denseTile, allCommits, err := s.TraceStore.GetDenseTile(ctx, s.NCommits)
if err != nil {
return skerr.Wrapf(err, "fetching dense tile")
}
// Filter down to the publicly viewable ones
denseTile = s.filterTile(denseTile)
cpxTile := types.NewComplexTile(denseTile)
cpxTile.SetSparse(allCommits)
// Get the tile without the ignored traces and update the complex tile.
ignores, err := s.IgnoreStore.List(ctx)
if err != nil {
return skerr.Wrapf(err, "fetching ignore rules")
}
retIgnoredTile, ignoreRules, err := ignore.FilterIgnored(denseTile, ignores)
if err != nil {
return skerr.Wrapf(err, "applying ignore rules to tile")
}
cpxTile.SetIgnoreRules(retIgnoredTile, ignoreRules)
// check if all the expectations of all commits have been added to the tile.
err = s.checkForLandedChangeLists(ctx, prevCommit, allCommits)
if err != nil {
return skerr.Wrapf(err, "identifying CLs/CLExpectations that have landed")
}
s.mutex.Lock()
defer s.mutex.Unlock()
// Update the cached tile and return the result.
s.lastCpxTile = cpxTile
return nil
}
// filterTile creates a new tile from the given tile that contains
// only traces that match the publicly viewable params.
func (s *CachedTileSourceImpl) filterTile(tile *tiling.Tile) *tiling.Tile {
if len(s.PubliclyViewableParams) == 0 {
return tile
}
// filter tile.
ret := &tiling.Tile{
Traces: make(map[tiling.TraceID]tiling.Trace, len(tile.Traces)),
Commits: tile.Commits,
}
// Iterate over the tile and copy the publicly viewable traces over.
// Build the paramset in the process.
paramSet := paramtools.ParamSet{}
for traceID, trace := range tile.Traces {
if tiling.Matches(trace, url.Values(s.PubliclyViewableParams)) {
ret.Traces[traceID] = trace
paramSet.AddParams(trace.Params())
}
}
paramSet.Normalize()
ret.ParamSet = paramSet
sklog.Infof("After filtering %d original traces, %d are publicly viewable.", len(tile.Traces), len(ret.Traces))
return ret
}
// checkForLandedChangeLists checks all commits of the current tile whether
// the associated expectations have been added to the baseline of the master.
func (s *CachedTileSourceImpl) checkForLandedChangeLists(ctx context.Context, prev *tiling.Commit, commits []*tiling.Commit) error {
if s.CLUpdater == nil {
sklog.Infof("Not Updating clstore with landed CLs because no updater configured.")
return nil
}
if len(commits) == 0 {
sklog.Warningf("No commits in tile?")
return nil
}
if prev != nil {
// re-slice commits after prev so as to avoid doing redundant work.
lastIdx := 0
for i, c := range commits {
if prev.Hash == c.Hash {
lastIdx = i + 1
break
}
}
commits = commits[lastIdx:]
if len(commits) == 0 {
sklog.Infof("No new commits since last cycle")
return nil
}
}
hashes := make([]string, 0, len(commits))
for _, c := range commits {
hashes = append(hashes, c.Hash)
}
xc, err := s.VCS.DetailsMulti(ctx, hashes, false)
if err != nil {
return skerr.Wrapf(err, "fetching details of %d hashes starting at %s", len(hashes), hashes[0])
}
return skerr.Wrap(s.CLUpdater.UpdateChangeListsAsLanded(ctx, xc))
}