blob: 3a1e72b423be24bd091ab4580eb533891de625e8 [file] [log] [blame]
package gcs_baseliner
import (
lru ""
// TODO(stephana): Tune issueCacheSize by either finding a good value that works across
// instances or making it configurable.
const (
	// issueCacheSize is the number of entries in the LRU cache that holds per-issue baselines.
	issueCacheSize = 10000

	// baselineExpirationTime is how long the baselines should be cached for reading.
	baselineExpirationTime = time.Minute
)
// BaselinerImpl is a helper type that provides functions to write baselines (expectations) to
// GCS and retrieve them. Other packages use it to continuously write expectations to GCS
// as they become available.
type BaselinerImpl struct {
gStorageClient storage.GCSClient
expectationsStore expstorage.ExpectationsStore
tryjobStore tryjobstore.TryjobStore
vcs vcsinfo.VCS
// mutex protects lastWrittenBaselines, baselineCache and currentTile
mutex sync.RWMutex
// lastWrittenBaselines maps[commit_hash]MD5_sum_of_baseline to keep track whether a baseline for
// a specific commit has been written already and whether we need to write it again (different MD5)
lastWrittenBaselines map[string]string
// baselineCache caches the baselines of all commits of the current tile.
// Maps commit hash to *baseline.Baseline
baselineCache *cache.Cache
// currTileInfo is the latest tileInfo we have.
currTileInfo baseline.TileInfo
// issueBaselineCache caches baselines for issue by mapping from issueID to baseline.
issueBaselineCache *lru.Cache
// New creates a new instance of baseliner.Baseliner that interacts with baselines in GCS.
func New(gStorageClient storage.GCSClient, expectationsStore expstorage.ExpectationsStore, tryjobStore tryjobstore.TryjobStore, vcs vcsinfo.VCS) (*BaselinerImpl, error) {
c, err := lru.New(issueCacheSize)
if err != nil {
return nil, skerr.Fmt("Error allocating cache: %s", err)
return &BaselinerImpl{
gStorageClient: gStorageClient,
expectationsStore: expectationsStore,
tryjobStore: tryjobStore,
vcs: vcs,
issueBaselineCache: c,
lastWrittenBaselines: map[string]string{},
baselineCache: cache.New(baselineExpirationTime, baselineExpirationTime),
}, nil
// CanWriteBaseline implements the baseline.Baseliner interface.
func (b *BaselinerImpl) CanWriteBaseline() bool {
return (b.gStorageClient != nil) && (b.gStorageClient.Options().BaselineGSPath != "")
// PushMasterBaselines implements the baseline.Baseliner interface.
func (b *BaselinerImpl) PushMasterBaselines(tileInfo baseline.TileInfo, targetHash string) (*baseline.Baseline, error) {
defer shared.NewMetricsTimer("push_master_baselines").Stop()
defer b.mutex.Unlock()
if tileInfo == nil {
tileInfo = b.currTileInfo
if tileInfo == nil {
return nil, skerr.Fmt("Received nil tile and no previous tile defined")
if !b.CanWriteBaseline() {
return nil, skerr.Fmt("Trying to write baseline while GCS path is not configured.")
// Calculate the baselines for the master tile.
exps, err := b.expectationsStore.Get()
if err != nil {
return nil, skerr.Fmt("Unable to retrieve expectations: %s", err)
// Remove negatives/untriaged entries
exps = exps.AsBaseline()
// Make sure we have all commits, not just the ones that are in the tile. Currently a tile is
// fetched in intervals. New commits might have arrived since the last tile was read. Below we
// extrapolate the baselines of the new commits to be identical to the last commit in the tile.
// As new data arrive in the next tile, we update the baselines for these commits.
tileCommits := tileInfo.AllCommits()
if len(tileCommits) == 0 {
sklog.Warningf("tile is empty, doing nothing")
return nil, nil
extraCommits, err := b.getCommitsSince(tileCommits[len(tileCommits)-1])
if err != nil {
return nil, skerr.Fmt("error getting commits since %v: %s", tileCommits[len(tileCommits)-1], err)
commits := append(extraCommits, tileCommits...)
// Get the current list of files that have been written.
lastWritten := b.lastWrittenBaselines
// Write the ones to disk that have not been written
// Maps commit hash -> md5 of expectations
written := map[string]string{}
var wMutex sync.Mutex
var egroup errgroup.Group
md5Sum, err := util.MD5Sum(exps)
if err != nil {
return nil, skerr.Wrapf(err, "calculating md5 hash of expectations")
var ret *baseline.Baseline
for _, commit := range commits {
// If we have written this baseline before, we mark it as written and process the next one.
if old, ok := lastWritten[commit.Hash]; ok && md5Sum == old {
written[commit.Hash] = md5Sum
func(commit *tiling.Commit) {
bLine := baseline.EmptyBaseline()
bLine.Expectations = exps
bLine.MD5 = md5Sum
bLine.Issue = types.MasterBranch
if commit.Hash == targetHash {
ret = bLine
b.baselineCache.Set(commit.Hash, bLine, cache.DefaultExpiration)
egroup.Go(func() error {
// Write the baseline to GCS.
_, err := b.gStorageClient.WriteBaseline(bLine, commit.Hash)
if err != nil {
return skerr.Fmt("Error writing baseline to GCS: %s", err)
defer wMutex.Unlock()
written[commit.Hash] = bLine.MD5
return nil
if err := egroup.Wait(); err != nil {
return nil, skerr.Fmt("Problem writing per-commit baselines to GCS: %s", err)
b.currTileInfo = tileInfo
b.lastWrittenBaselines = written
return ret, nil
// PushIssueBaseline implements the baseline.Baseliner interface.
func (b *BaselinerImpl) PushIssueBaseline(issueID int64, tileInfo baseline.TileInfo, dCounter digest_counter.DigestCounter) error {
issueExpStore := b.expectationsStore.ForIssue(issueID)
exp, err := issueExpStore.Get()
if err != nil {
return skerr.Fmt("Unable to get issue expectations: %s", err)
tryjobs, tryjobResults, err := b.tryjobStore.GetTryjobs(issueID, nil, true, true)
if err != nil {
return skerr.Fmt("Unable to get TryjobResults")
base, err := baseline.GetBaselineForIssue(issueID, tryjobs, tryjobResults, exp, tileInfo.AllCommits())
if err != nil {
return skerr.Fmt("Error calculating issue baseline: %s", err)
// Add it to the cache.
_ = b.issueBaselineCache.Add(issueID, base)
if !b.CanWriteBaseline() {
return skerr.Fmt("Trying to write baseline while GCS path is not configured.")
// Write the baseline to GCS.
outputPath, err := b.gStorageClient.WriteBaseline(base, "")
if err != nil {
return skerr.Fmt("Error writing baseline to GCS: %s", err)
sklog.Infof("Baseline for issue %d written to %s.", issueID, outputPath)
return nil
// FetchBaseline implements the baseline.Baseliner interface.
func (b *BaselinerImpl) FetchBaseline(commitHash string, issueID int64, issueOnly bool) (*baseline.Baseline, error) {
isIssue := !types.IsMasterBranch(issueID)
var masterBaseline *baseline.Baseline
var issueBaseline *baseline.Baseline
var egroup errgroup.Group
// Retrieve the baseline on master.
egroup.Go(func() error {
var err error
masterBaseline, err = b.getMasterExpectations(commitHash)
if err != nil {
return skerr.Fmt("Could not get master baseline: %s", err)
return nil
if isIssue {
egroup.Go(func() error {
val, ok := b.issueBaselineCache.Get(issueID)
if ok {
issueBaseline = val.(*baseline.Baseline)
return nil
var err error
issueBaseline, err = b.gStorageClient.ReadBaseline("", issueID)
if err != nil {
return skerr.Fmt("Could not get baseline for issue %d: %s", issueID, err)
// If no baseline was found. We place an empty one.
if issueBaseline == nil {
issueBaseline = baseline.EmptyBaseline()
return nil
if err := egroup.Wait(); err != nil {
return nil, skerr.Fmt("Could not fetch baselines: %s", err)
if isIssue {
if issueOnly {
// Only return the portion of the baseline that would be contributed by the issue
issueBaseline.Issue = issueID
masterBaseline = issueBaseline
} else {
// Clone the retrieved baseline before we inject the issue information.
masterBaseline = masterBaseline.Copy()
masterBaseline.Issue = issueID
return masterBaseline, nil
// getCommitSince returns all the commits have been added to the repo since the given commit.
// The returned instances of tiling.Commit do not contain a valid Author field.
func (b *BaselinerImpl) getCommitsSince(firstCommit *tiling.Commit) ([]*tiling.Commit, error) {
defer shared.NewMetricsTimer("baseliner_get_commits_since").Stop()
// If there is an underlying gitstore retrieve it, otherwise this function becomes a no-op.
gitStoreBased, ok := b.vcs.(gitstore.GitStoreBased)
if !ok {
return []*tiling.Commit{}, nil
gitStore := gitStoreBased.GetGitStore()
ctx := context.TODO()
startTime := time.Unix(firstCommit.CommitTime, 0)
endTime := startTime.Add(time.Second)
branch := b.vcs.GetBranch()
commits, err := gitStore.RangeByTime(ctx, startTime, endTime, branch)
if err != nil {
return nil, err
if len(commits) == 0 {
return nil, skerr.Fmt("No commits found while querying for commit %s", firstCommit.Hash)
var target *vcsinfo.IndexCommit
for _, c := range commits {
if c.Hash == firstCommit.Hash {
target = c
if target == nil {
return nil, skerr.Fmt("Commit %s not found in gitstore", firstCommit.Hash)
// Fetch all commits after the first one which we already have.
if commits, err = gitStore.RangeN(ctx, target.Index, int(math.MaxInt32), branch); err != nil {
return nil, err
ret := make([]*tiling.Commit, len(commits))
for idx, c := range commits {
// Note: For the task at hand we don't need to populate the Author field of tiling.Commit.
ret[idx] = &tiling.Commit{
Hash: c.Hash,
CommitTime: c.Timestamp.Unix(),
return ret[1:], nil
func (b *BaselinerImpl) getMasterExpectations(commitHash string) (*baseline.Baseline, error) {
rv := func() *baseline.Baseline {
defer b.mutex.RUnlock()
tileInfo := b.currTileInfo
// If no commit hash was given use current HEAD.
if commitHash == "" {
// If we have no tile yet, we cannot get the HEAD of it.
if tileInfo == nil {
return baseline.EmptyBaseline()
// Get the last commit that has data.
allCommits := tileInfo.AllCommits()
commitHash = allCommits[len(allCommits)-1].Hash
if base, ok := b.baselineCache.Get(commitHash); ok {
return base.(*baseline.Baseline).Copy()
return nil
if rv != nil {
return rv, nil
// We did not find it in the cache so lets load it from GCS.
ret, err := b.gStorageClient.ReadBaseline(commitHash, types.MasterBranch)
if err != nil {
return nil, err
// Look up the commit to see if it's valid.
if ret == nil {
// Load the commit and determine if it's on the current branch.
details, err := b.vcs.Details(context.TODO(), commitHash, true)
if err != nil {
return nil, err
// Get the branch we are tracking and make sure that the commit is in that branch.
branch := b.vcs.GetBranch()
if !details.Branches[branch] {
return nil, skerr.Fmt("Commit %s is not in branch %s", commitHash, branch)
// Make sure all expecations are up to date.
if ret, err = b.PushMasterBaselines(nil, commitHash); err != nil {
return nil, err
// Since we fetched from GCS - go ahead and store to cache.
defer b.mutex.Unlock()
b.baselineCache.Set(commitHash, ret, cache.DefaultExpiration)
return ret, nil
// Compile-time checks that *BaselinerImpl satisfies both Baseliner interfaces.
var (
	_ baseline.BaselineWriter  = (*BaselinerImpl)(nil)
	_ baseline.BaselineFetcher = (*BaselinerImpl)(nil)
)