blob: 27d7ee13572ba6a8497f94d88f62bb7c23cb45f9 [file] [log] [blame]
package tilesource
import (
"context"
"sync"
"time"
"go.skia.org/infra/golden/go/code_review"
"go.skia.org/infra/golden/go/publicparams"
"go.skia.org/infra/go/metrics2"
"go.skia.org/infra/go/paramtools"
"go.skia.org/infra/go/skerr"
"go.skia.org/infra/go/sklog"
"go.skia.org/infra/go/util"
"go.skia.org/infra/go/vcsinfo"
"go.skia.org/infra/golden/go/ignore"
"go.skia.org/infra/golden/go/tiling"
"go.skia.org/infra/golden/go/tracestore"
)
const (
emptyCommitsAtHeadMetric = "gold_empty_commits_at_head"
filledTracesAtHeadMetric = "gold_filled_traces_at_head"
)
type TileSource interface {
// GetTile returns the most recently loaded Tile.
GetTile() tiling.ComplexTile
}
type CachedTileSourceConfig struct {
CLUpdater code_review.ChangelistLandedUpdater
IgnoreStore ignore.Store
TraceStore tracestore.TraceStore
VCS vcsinfo.VCS
// optional. If specified, will only show the params that match this query. This is
// opt-in, to avoid leaking.
PubliclyViewableParams publicparams.Matcher
// NCommits is the number of commits we should consider. If NCommits is
// 0 or smaller all commits in the last tile will be considered.
NCommits int
}
type CachedTileSourceImpl struct {
CachedTileSourceConfig
lastCpxTile tiling.ComplexTile
mutex sync.RWMutex
}
func New(c CachedTileSourceConfig) *CachedTileSourceImpl {
cti := &CachedTileSourceImpl{
CachedTileSourceConfig: c,
}
return cti
}
// StartUpdater loads the initial tile and starts a goroutine to update it at
// the specified interval. It returns an error if the initial load fails, but
// will only log errors that happen later.
func (s *CachedTileSourceImpl) StartUpdater(ctx context.Context, interval time.Duration) error {
if err := s.updateTile(ctx); err != nil {
return skerr.Wrapf(err, "failed initial tile update")
}
go util.RepeatCtx(ctx, interval, func(ctx context.Context) {
if err := s.updateTile(ctx); err != nil {
sklog.Errorf("Could not update tile: %s", err)
}
})
return nil
}
// GetTile implements the TileSource interface.
func (s *CachedTileSourceImpl) GetTile() tiling.ComplexTile {
s.mutex.RLock()
defer s.mutex.RUnlock()
return s.lastCpxTile
}
// updateTile fetches the latest tile and caches it. updateTile expects to be called from a
// single goroutine (see StartUpdater).
func (s *CachedTileSourceImpl) updateTile(ctx context.Context) error {
defer metrics2.FuncTimer().Stop()
if err := s.VCS.Update(ctx, true, false); err != nil {
return skerr.Wrapf(err, "updating VCS")
}
var prevCommit tiling.Commit
if s.lastCpxTile != nil {
commits := s.lastCpxTile.AllCommits()
if len(commits) > 0 {
prevCommit = commits[len(commits)-1]
}
}
denseTile, allCommits, err := s.TraceStore.GetDenseTile(ctx, s.NCommits)
if err != nil {
return skerr.Wrapf(err, "fetching dense tile")
}
// Filter down to the publicly viewable ones
denseTile = s.filterTile(denseTile)
// Now that we have filtered the public list, compute metrics. We don't care about ignores,
// since that's something the user is in charge of, whereas the public list is something
// that's part of the Gold config.
computeMetricsOnTile(denseTile, allCommits)
cpxTile := tiling.NewComplexTile(denseTile)
cpxTile.SetSparse(allCommits)
// Get the tile without the ignored traces and update the complex tile.
ignores, err := s.IgnoreStore.List(ctx)
if err != nil {
return skerr.Wrapf(err, "fetching ignore rules")
}
retIgnoredTile, ignoreRules, err := ignore.FilterIgnored(denseTile, ignores)
if err != nil {
return skerr.Wrapf(err, "applying ignore rules to tile")
}
cpxTile.SetIgnoreRules(retIgnoredTile, ignoreRules)
// check if all the expectations of all commits have been added to the tile.
err = s.checkForLandedChangelists(ctx, prevCommit, allCommits)
if err != nil {
return skerr.Wrapf(err, "identifying CLs/CLExpectations that have landed")
}
s.mutex.Lock()
defer s.mutex.Unlock()
// Update the cached tile and return the result.
s.lastCpxTile = cpxTile
return nil
}
// filterTile creates a new tile from the given tile that contains
// only traces that match the publicly viewable params.
func (s *CachedTileSourceImpl) filterTile(tile *tiling.Tile) *tiling.Tile {
if s.PubliclyViewableParams == nil {
return tile
}
// filter tile.
ret := &tiling.Tile{
Traces: make(map[tiling.TraceID]*tiling.Trace, len(tile.Traces)),
Commits: tile.Commits,
}
// Iterate over the tile and copy the publicly viewable traces over.
// Build the paramset in the process.
paramSet := paramtools.ParamSet{}
for traceID, trace := range tile.Traces {
ko := trace.KeysAndOptions()
if s.PubliclyViewableParams.Matches(ko) {
ret.Traces[traceID] = trace
paramSet.AddParams(ko)
}
}
paramSet.Normalize()
ret.ParamSet = paramSet
sklog.Infof("After filtering %d original traces, %d are publicly viewable.", len(tile.Traces), len(ret.Traces))
return ret
}
// computeMetricsOnTile calculates a few metrics related to the contents of the tile.
func computeMetricsOnTile(denseTile *tiling.Tile, allCommits []tiling.Commit) {
tracesWithData := int64(0)
for _, trace := range denseTile.Traces {
if !trace.IsMissing(trace.Len() - 1) {
tracesWithData++
}
}
metrics2.GetInt64Metric(filledTracesAtHeadMetric).Update(tracesWithData)
emptyCommitsAtHead := 0
if len(denseTile.Commits) == 0 {
metrics2.GetInt64Metric(emptyCommitsAtHeadMetric).Update(int64(len(allCommits)))
return
}
lastCommitWithData := denseTile.Commits[len(denseTile.Commits)-1].Hash
// Start at the end of all of the commits and walk backwards until we hit the last commit
// with data.
for ; emptyCommitsAtHead < len(allCommits); emptyCommitsAtHead++ {
if allCommits[len(allCommits)-1-emptyCommitsAtHead].Hash == lastCommitWithData {
break
}
}
metrics2.GetInt64Metric(emptyCommitsAtHeadMetric).Update(int64(emptyCommitsAtHead))
}
// checkForLandedChangelists checks all commits of the current tile whether
// the associated expectations have been added to the baseline of the master.
func (s *CachedTileSourceImpl) checkForLandedChangelists(ctx context.Context, prev tiling.Commit, commits []tiling.Commit) error {
if s.CLUpdater == nil {
sklog.Infof("Not Updating clstore with landed CLs because no updater configured.")
return nil
}
if len(commits) == 0 {
sklog.Warningf("No commits in tile?")
return nil
}
if !prev.CommitTime.IsZero() {
// re-slice commits after prev so as to avoid doing redundant work.
lastIdx := 0
for i, c := range commits {
if prev.Hash == c.Hash {
lastIdx = i + 1
break
}
}
commits = commits[lastIdx:]
if len(commits) == 0 {
sklog.Infof("No new commits since last cycle")
return nil
}
}
hashes := make([]string, 0, len(commits))
for _, c := range commits {
hashes = append(hashes, c.Hash)
}
xc, err := s.VCS.DetailsMulti(ctx, hashes, false)
if err != nil {
return skerr.Wrapf(err, "fetching details of %d hashes starting at %s", len(hashes), hashes[0])
}
return skerr.Wrap(s.CLUpdater.UpdateChangelistsAsLanded(ctx, xc))
}