blob: 1a36452ddfa34cb77fa5eacdd4bf9365bc7be011 [file] [log] [blame]
package main
import (
"context"
"fmt"
"os"
"path"
"sync"
"time"
"go.skia.org/infra/go/common"
"go.skia.org/infra/go/git"
"go.skia.org/infra/go/sklog"
"go.skia.org/infra/go/git/repograph"
"go.skia.org/infra/go/util"
"go.skia.org/infra/task_scheduler/go/db"
"go.skia.org/infra/task_scheduler/go/specs"
)
// tasksPerCommitCache is a struct used for caching the number of task specs
// for various commits.
type tasksPerCommitCache struct {
cached map[db.RepoState]int
mtx sync.Mutex
period time.Duration
repos repograph.Map
tcc *specs.TaskCfgCache
}
// newTasksPerCommitCache returns a tasksPerCommitCache instance.
func newTasksPerCommitCache(workdir string, repoUrls []string, period time.Duration, ctx context.Context) (*tasksPerCommitCache, error) {
wd := path.Join(workdir, "tasksPerCommitCache")
if _, err := os.Stat(wd); err != nil {
if os.IsNotExist(err) {
if err := os.Mkdir(wd, os.ModePerm); err != nil {
return nil, err
}
} else {
return nil, fmt.Errorf("There is a problem with the workdir: %s", err)
}
}
repos, err := repograph.NewMap(repoUrls, wd)
if err != nil {
return nil, err
}
depotTools, err := git.NewCheckout(common.REPO_DEPOT_TOOLS, wd)
if err != nil {
return nil, err
}
if err := depotTools.Update(); err != nil {
return nil, err
}
gitCache := path.Join(wd, "cache")
tcc, err := specs.NewTaskCfgCache(repos, depotTools.Dir(), gitCache, 3)
if err != nil {
return nil, err
}
c := &tasksPerCommitCache{
cached: map[db.RepoState]int{},
period: period,
repos: repos,
tcc: tcc,
}
go util.RepeatCtx(time.Minute, ctx, func() {
if err := c.update(); err != nil {
sklog.Errorf("Failed to update tasksPerCommitCache: %s", err)
}
})
return c, nil
}
// Get returns the number of tasks expected to run at the given commit.
func (c *tasksPerCommitCache) Get(rs db.RepoState) (int, error) {
c.mtx.Lock()
defer c.mtx.Unlock()
if _, ok := c.cached[rs]; !ok {
// Find the number of TaskSpecs expected to run at this commit.
cfg, err := c.tcc.ReadTasksCfg(rs)
if err != nil {
return 0, err
}
tasksForCommit := make(map[string]bool, len(cfg.Tasks))
var recurse func(string)
recurse = func(taskSpec string) {
if tasksForCommit[taskSpec] {
return
}
tasksForCommit[taskSpec] = true
for _, d := range cfg.Tasks[taskSpec].Dependencies {
recurse(d)
}
}
for _, job := range cfg.Jobs {
if job.Trigger == "" {
for _, taskSpec := range job.TaskSpecs {
recurse(taskSpec)
}
}
}
c.cached[rs] = len(tasksForCommit)
}
return c.cached[rs], nil
}
// update pulls down new commits and evicts old entries from the cache.
func (c *tasksPerCommitCache) update() error {
if err := c.repos.Update(); err != nil {
return err
}
c.mtx.Lock()
defer c.mtx.Unlock()
start := time.Now().Add(-c.period)
for rs := range c.cached {
repo, ok := c.repos[rs.Repo]
if !ok {
return fmt.Errorf("No such repo: %s", rs.Repo)
}
commit := repo.Get(rs.Revision)
if commit == nil {
return fmt.Errorf("No such commit: %s in repo %s", rs.Revision, rs.Repo)
}
if commit.Timestamp.Before(start) {
delete(c.cached, rs)
}
}
return nil
}