blob: 1d1572f4c8819a5aa8dbda9d2b149caef45019a8 [file] [log] [blame]
package specs
import (
"bytes"
"encoding/gob"
"encoding/json"
"fmt"
"io/ioutil"
"os"
"path"
"strings"
"sync"
"time"
"go.skia.org/infra/go/exec"
"go.skia.org/infra/go/git"
"go.skia.org/infra/go/git/repograph"
"go.skia.org/infra/go/util"
"go.skia.org/infra/task_scheduler/go/db"
)
const (
	// ISSUE_SHORT_LENGTH is the number of trailing characters of an issue
	// ID used for the "short" form of the issue (e.g. Gerrit ref prefixes).
	ISSUE_SHORT_LENGTH = 2
	// DEFAULT_TASK_SPEC_MAX_ATTEMPTS is used when a TaskSpec's MaxAttempts
	// is zero.
	DEFAULT_TASK_SPEC_MAX_ATTEMPTS = db.DEFAULT_MAX_TASK_ATTEMPTS
	// DEFAULT_NUM_WORKERS is presumably the default worker-pool size passed
	// to NewTaskCfgCache by callers — confirm at call sites.
	DEFAULT_NUM_WORKERS = 10
	// TASKS_CFG_FILE is the repo-relative path of the tasks config file.
	TASKS_CFG_FILE = "infra/bots/tasks.json"
	// VARIABLE_SYNTAX is the printf format used to render a variable name
	// into its placeholder form, e.g. "<(REVISION)".
	VARIABLE_SYNTAX = "<(%s)"
	// Names of the variables which may be substituted into task specs.
	VARIABLE_CODEREVIEW_SERVER = "CODEREVIEW_SERVER"
	VARIABLE_ISSUE = "ISSUE"
	VARIABLE_ISSUE_SHORT = "ISSUE_SHORT"
	VARIABLE_PATCH_REPO = "PATCH_REPO"
	VARIABLE_PATCH_STORAGE = "PATCH_STORAGE"
	VARIABLE_PATCHSET = "PATCHSET"
	VARIABLE_REPO = "REPO"
	VARIABLE_REVISION = "REVISION"
	VARIABLE_TASK_NAME = "TASK_NAME"
)
var (
	// Placeholder strings for each variable, in VARIABLE_SYNTAX form. These
	// are the literal tokens which appear in task specs and are substituted
	// at task-creation time.
	PLACEHOLDER_CODEREVIEW_SERVER = fmt.Sprintf(VARIABLE_SYNTAX, VARIABLE_CODEREVIEW_SERVER)
	PLACEHOLDER_ISSUE = fmt.Sprintf(VARIABLE_SYNTAX, VARIABLE_ISSUE)
	PLACEHOLDER_ISSUE_SHORT = fmt.Sprintf(VARIABLE_SYNTAX, VARIABLE_ISSUE_SHORT)
	PLACEHOLDER_PATCH_REPO = fmt.Sprintf(VARIABLE_SYNTAX, VARIABLE_PATCH_REPO)
	PLACEHOLDER_PATCH_STORAGE = fmt.Sprintf(VARIABLE_SYNTAX, VARIABLE_PATCH_STORAGE)
	PLACEHOLDER_PATCHSET = fmt.Sprintf(VARIABLE_SYNTAX, VARIABLE_PATCHSET)
	PLACEHOLDER_REPO = fmt.Sprintf(VARIABLE_SYNTAX, VARIABLE_REPO)
	PLACEHOLDER_REVISION = fmt.Sprintf(VARIABLE_SYNTAX, VARIABLE_REVISION)
	PLACEHOLDER_TASK_NAME = fmt.Sprintf(VARIABLE_SYNTAX, VARIABLE_TASK_NAME)
	// PLACEHOLDER_ISOLATED_OUTDIR uses Swarming's own syntax and is expanded
	// by Swarming rather than by this package.
	PLACEHOLDER_ISOLATED_OUTDIR = "${ISOLATED_OUTDIR}"
)
// ParseTasksCfg parses the given task cfg file contents and returns the config.
// ParseTasksCfg parses the given task cfg file contents, validates the
// resulting config, and returns it.
func ParseTasksCfg(contents string) (*TasksCfg, error) {
	var rv TasksCfg
	if err := json.Unmarshal([]byte(contents), &rv); err != nil {
		// contents is already a string; the previous string(contents)
		// conversion was redundant.
		return nil, fmt.Errorf("Failed to read tasks cfg: could not parse file: %s\nContents:\n%s", err, contents)
	}
	if err := rv.Validate(); err != nil {
		return nil, err
	}
	return &rv, nil
}
// EncodeTasksCfg serializes the TasksCfg to a byte slice of indented JSON,
// unescaping '<' (so placeholder variables stay readable) and appending a
// trailing newline.
func EncodeTasksCfg(cfg *TasksCfg) ([]byte, error) {
	// Encode the JSON config.
	enc, err := json.MarshalIndent(cfg, "", "  ")
	if err != nil {
		return nil, err
	}
	// The json package escapes HTML characters, which makes our output
	// much less readable. Replace the escape characters with the real
	// character.
	enc = bytes.Replace(enc, []byte("\\u003c"), []byte("<"), -1)
	// Add a newline to the end of the file. Most text editors add one, so
	// adding one here enables manual editing of the file, even though we'd
	// rather that not happen.
	enc = append(enc, []byte("\n")...)
	return enc, nil
}
// ReadTasksCfg loads and parses the tasks config file from the given repo
// checkout directory.
func ReadTasksCfg(repoDir string) (*TasksCfg, error) {
	cfgPath := path.Join(repoDir, TASKS_CFG_FILE)
	contents, err := ioutil.ReadFile(cfgPath)
	if err != nil {
		return nil, fmt.Errorf("Failed to read tasks cfg: could not read file: %s", err)
	}
	return ParseTasksCfg(string(contents))
}
// WriteTasksCfg serializes the given config and writes it to the tasks config
// file within the given repo checkout directory.
func WriteTasksCfg(cfg *TasksCfg, repoDir string) error {
	enc, err := EncodeTasksCfg(cfg)
	if err != nil {
		return err
	}
	cfgPath := path.Join(repoDir, TASKS_CFG_FILE)
	return ioutil.WriteFile(cfgPath, enc, os.ModePerm)
}
// TasksCfg is a struct which describes all Swarming tasks for a repo at a
// particular commit.
type TasksCfg struct {
	// Jobs is a map whose keys are JobSpec names and values are JobSpecs
	// which describe sets of tasks to run.
	Jobs map[string]*JobSpec `json:"jobs"`

	// Tasks is a map whose keys are TaskSpec names and values are TaskSpecs
	// detailing the Swarming tasks which may be run. Jobs reference tasks
	// (and tasks reference each other) by these names.
	Tasks map[string]*TaskSpec `json:"tasks"`
}
// Validate returns an error if the TasksCfg is not valid: every TaskSpec must
// itself be valid, and the task/job dependency graph must be acyclic with all
// tasks reachable.
func (c *TasksCfg) Validate() error {
	for _, task := range c.Tasks {
		if err := task.Validate(c); err != nil {
			return err
		}
	}
	return findCycles(c.Tasks, c.Jobs)
}
// TaskSpec is a struct which describes a Swarming task to run.
// Be sure to add any new fields to the Copy() method.
type TaskSpec struct {
	// CipdPackages are CIPD packages which should be installed for the task.
	CipdPackages []*CipdPackage `json:"cipd_packages,omitempty"`

	// Dependencies are names of other TaskSpecs for tasks which need to run
	// before this task.
	Dependencies []string `json:"dependencies,omitempty"`

	// Dimensions are Swarming bot dimensions which describe the type of bot
	// which may run this task. Each entry is a "key:value" pair.
	Dimensions []string `json:"dimensions"`

	// Environment is a set of environment variables needed by the task.
	Environment map[string]string `json:"environment,omitempty"`

	// ExecutionTimeout is the maximum amount of time the task is allowed
	// to take.
	ExecutionTimeout time.Duration `json:"execution_timeout_ns,omitempty"`

	// Expiration is how long the task may remain in the pending state
	// before it is abandoned.
	Expiration time.Duration `json:"expiration_ns,omitempty"`

	// ExtraArgs are extra command-line arguments to pass to the task.
	ExtraArgs []string `json:"extra_args,omitempty"`

	// IoTimeout is the maximum amount of time which the task may take to
	// communicate with the server.
	IoTimeout time.Duration `json:"io_timeout_ns,omitempty"`

	// Isolate is the name of the isolate file used by this task. Required.
	Isolate string `json:"isolate"`

	// MaxAttempts is the maximum number of attempts for this TaskSpec. If
	// zero, DEFAULT_TASK_SPEC_MAX_ATTEMPTS is used.
	MaxAttempts int `json:"max_attempts,omitempty"`

	// Priority indicates the relative priority of the task, with 0 < p <= 1.
	Priority float64 `json:"priority"`
}
// Validate ensures that the TaskSpec is defined properly: every CIPD package
// is fully specified, every dimension is a "key:value" pair, and an isolate
// file is given.
func (t *TaskSpec) Validate(cfg *TasksCfg) error {
	// Ensure that CIPD packages are specified properly. The error message
	// has always promised that a version is required, but the version was
	// previously not checked; enforce it here.
	for _, p := range t.CipdPackages {
		if p.Name == "" || p.Path == "" || p.Version == "" {
			return fmt.Errorf("CIPD packages must have a name, path, and version.")
		}
	}
	// Ensure that the dimensions are specified properly.
	for _, d := range t.Dimensions {
		split := strings.SplitN(d, ":", 2)
		if len(split) != 2 {
			return fmt.Errorf("Dimension %q does not contain a colon!", d)
		}
	}
	// Isolate file is required.
	if t.Isolate == "" {
		return fmt.Errorf("Isolate file is required.")
	}
	return nil
}
// Copy returns a deep copy of the TaskSpec.
func (t *TaskSpec) Copy() *TaskSpec {
	rv := &TaskSpec{
		Dependencies:     util.CopyStringSlice(t.Dependencies),
		Dimensions:       util.CopyStringSlice(t.Dimensions),
		Environment:      util.CopyStringMap(t.Environment),
		ExecutionTimeout: t.ExecutionTimeout,
		Expiration:       t.Expiration,
		ExtraArgs:        util.CopyStringSlice(t.ExtraArgs),
		IoTimeout:        t.IoTimeout,
		Isolate:          t.Isolate,
		MaxAttempts:      t.MaxAttempts,
		Priority:         t.Priority,
	}
	if len(t.CipdPackages) > 0 {
		// Deep-copy the packages: the values live in one backing slice and
		// the returned slice holds pointers into it.
		pkgValues := make([]CipdPackage, len(t.CipdPackages))
		pkgPtrs := make([]*CipdPackage, len(t.CipdPackages))
		for i, pkg := range t.CipdPackages {
			pkgValues[i] = *pkg
			pkgPtrs[i] = &pkgValues[i]
		}
		rv.CipdPackages = pkgPtrs
	}
	return rv
}
// CipdPackage is a struct representing a CIPD package which needs to be
// installed on a bot for a particular task.
type CipdPackage struct {
	// Name is the CIPD package name.
	Name string `json:"name"`
	// Path is the directory, relative to the task's root, where the package
	// should be installed.
	Path string `json:"path"`
	// Version is the CIPD package version/instance to install.
	Version string `json:"version"`
}
// JobSpec is a struct which describes a set of TaskSpecs to run as part of a
// larger effort.
type JobSpec struct {
	// Priority indicates the relative priority of the job.
	Priority float64 `json:"priority"`
	// TaskSpecs are the names of the TaskSpecs which this job runs.
	TaskSpecs []string `json:"tasks"`
	// Trigger optionally names a triggering condition for the job.
	Trigger string `json:"trigger,omitempty"`
}
// Copy returns a deep copy of the JobSpec.
//
// Uses util.CopyStringSlice for the TaskSpecs slice, consistent with
// TaskSpec.Copy; the semantics (nil in, nil out; otherwise a fresh copy) match
// the previous hand-rolled copy.
func (j *JobSpec) Copy() *JobSpec {
	return &JobSpec{
		Priority:  j.Priority,
		TaskSpecs: util.CopyStringSlice(j.TaskSpecs),
		Trigger:   j.Trigger,
	}
}
// GetTaskSpecDAG returns a map describing all of the dependencies of the
// JobSpec. Its keys are TaskSpec names and values are TaskSpec names upon
// which the keys depend. Returns an error if any referenced task does not
// exist in the config.
func (j *JobSpec) GetTaskSpecDAG(cfg *TasksCfg) (map[string][]string, error) {
	dag := map[string][]string{}
	// addToDAG recursively records the dependency list for the named task
	// and each of its transitive dependencies.
	var addToDAG func(string) error
	addToDAG = func(name string) error {
		if _, done := dag[name]; done {
			return nil
		}
		spec, ok := cfg.Tasks[name]
		if !ok {
			return fmt.Errorf("No such task: %s", name)
		}
		deps := util.CopyStringSlice(spec.Dependencies)
		if deps == nil {
			deps = []string{}
		}
		dag[name] = deps
		for _, dep := range deps {
			if err := addToDAG(dep); err != nil {
				return err
			}
		}
		return nil
	}
	for _, name := range j.TaskSpecs {
		if err := addToDAG(name); err != nil {
			return nil, err
		}
	}
	return dag, nil
}
// TaskCfgCache is a struct used for caching tasks cfg files. The user should
// periodically call Cleanup() to remove old entries.
type TaskCfgCache struct {
	// cache is protected by mtx.
	cache map[db.RepoState]*cacheEntry
	// depotToolsDir is the location of a depot_tools checkout, used by
	// bot_update-based syncs.
	depotToolsDir string
	// file is the path of the on-disk gob serialization of this cache.
	file string
	mtx sync.RWMutex
	// addedTasksCache is protected by mtx.
	addedTasksCache map[db.RepoState]util.StringSet
	// recentCommits, recentJobSpecs, and recentTaskSpecs map names/hashes to
	// the most recent commit timestamp at which they were seen; all three
	// are protected by recentMtx.
	recentCommits map[string]time.Time
	recentJobSpecs map[string]time.Time
	recentMtx sync.RWMutex
	recentTaskSpecs map[string]time.Time
	repos repograph.Map
	// queue feeds work to the fixed pool of worker goroutines started by
	// NewTaskCfgCache; each function receives its worker's ID.
	queue chan func(int)
	workdir string
}
// gobTaskCfgCache is a struct used for (de)serializing TaskCfgCache instances.
// It mirrors the cacheable fields of TaskCfgCache with exported names so that
// encoding/gob can access them.
type gobTaskCfgCache struct {
	AddedTasksCache map[db.RepoState]util.StringSet
	Cache map[db.RepoState]*cacheEntry
	RecentCommits map[string]time.Time
	RecentJobSpecs map[string]time.Time
	RecentTaskSpecs map[string]time.Time
}
// NewTaskCfgCache returns a TaskCfgCache instance. Any previously-persisted
// cache state is loaded from a gob file under workdir; otherwise the cache
// starts empty. numWorkers goroutines are started to serve TempGitRepo
// requests; they run until Close is called.
func NewTaskCfgCache(repos repograph.Map, depotToolsDir, workdir string, numWorkers int) (*TaskCfgCache, error) {
	cacheFile := path.Join(workdir, "taskCfgCache.gob")
	c := &TaskCfgCache{
		depotToolsDir: depotToolsDir,
		file:          cacheFile,
		queue:         make(chan func(int)),
		repos:         repos,
		workdir:       workdir,
	}
	f, err := os.Open(cacheFile)
	switch {
	case err == nil:
		// A persisted cache exists; load it.
		var stored gobTaskCfgCache
		decodeErr := gob.NewDecoder(f).Decode(&stored)
		util.Close(f)
		if decodeErr != nil {
			return nil, decodeErr
		}
		c.addedTasksCache = stored.AddedTasksCache
		c.cache = stored.Cache
		c.recentCommits = stored.RecentCommits
		c.recentJobSpecs = stored.RecentJobSpecs
		c.recentTaskSpecs = stored.RecentTaskSpecs
		// Restore the entries' back-pointers, which are not serialized.
		for _, entry := range c.cache {
			entry.c = c
		}
	case os.IsNotExist(err):
		// No cache file yet; start fresh.
		c.cache = map[db.RepoState]*cacheEntry{}
		c.addedTasksCache = map[db.RepoState]util.StringSet{}
		c.recentCommits = map[string]time.Time{}
		c.recentJobSpecs = map[string]time.Time{}
		c.recentTaskSpecs = map[string]time.Time{}
	default:
		return nil, fmt.Errorf("Failed to read cache file: %s", err)
	}
	// Start the worker pool which serves TempGitRepo requests.
	for workerID := 0; workerID < numWorkers; workerID++ {
		go func(id int) {
			for job := range c.queue {
				job(id)
			}
		}(workerID)
	}
	return c, nil
}
// Close frees up resources used by the TaskCfgCache: closing the work queue
// causes the worker goroutines started by NewTaskCfgCache to exit. TempGitRepo
// must not be called after Close.
func (c *TaskCfgCache) Close() error {
	close(c.queue)
	return nil
}
// cacheEntry holds the (possibly not-yet-loaded) TasksCfg for a single
// RepoState. Exported fields are persisted via gob; unexported fields are
// reconstructed on load.
type cacheEntry struct {
	// c points back to the owning TaskCfgCache; restored after gob decode.
	c *TaskCfgCache
	// Cfg is the cached config, or nil if not yet read.
	Cfg *TasksCfg
	// Err caches a permanent failure message (see Get) so the sync is not
	// retried.
	Err string
	mtx sync.Mutex
	// Rs identifies the repo + revision (+ patch) this entry describes.
	Rs db.RepoState
}
// Get returns the TasksCfg for this entry's RepoState, syncing a checkout and
// reading the file on first use. Results — including certain permanent errors
// — are cached. On a successful read, the recent-commit/spec bookkeeping is
// updated and the whole cache is persisted to disk.
func (e *cacheEntry) Get() (*TasksCfg, error) {
	e.mtx.Lock()
	defer e.mtx.Unlock()
	// Fast path: config already loaded.
	if e.Cfg != nil {
		return e.Cfg, nil
	}
	// A previously-cached permanent failure; return it without retrying.
	if e.Err != "" {
		return nil, fmt.Errorf(e.Err)
	}
	// We haven't seen this RepoState before, or it's scrolled out of our
	// window. Read it.
	// Point the upstream to a local source of truth to eliminate network
	// latency.
	r, ok := e.c.repos[e.Rs.Repo]
	if !ok {
		return nil, fmt.Errorf("Unknown repo %q", e.Rs.Repo)
	}
	var cfg *TasksCfg
	if err := e.c.TempGitRepo(e.Rs, e.Rs.IsTryJob(), func(checkout *git.TempCheckout) error {
		var err error
		cfg, err = ReadTasksCfg(checkout.Dir())
		if err != nil {
			// The tasks.cfg file may not exist for a particular commit.
			if strings.Contains(err.Error(), "does not exist in") || strings.Contains(err.Error(), "exists on disk, but not in") || strings.Contains(err.Error(), "no such file or directory") {
				// In this case, use an empty config.
				cfg = &TasksCfg{
					Tasks: map[string]*TaskSpec{},
				}
			} else {
				return err
			}
		}
		return nil
	}); err != nil {
		// A merge failure is permanent for this RepoState; cache the error
		// message so we don't re-sync on every call.
		if strings.Contains(err.Error(), "error: Failed to merge in the changes.") {
			e.Err = err.Error()
		}
		return nil, err
	}
	e.Cfg = cfg
	// Write the commit and task specs into the recent lists.
	// TODO(borenet): The below should probably go elsewhere.
	// NOTE(review): c.recentMtx is taken before c.mtx here; callers must not
	// acquire them in the opposite order — confirm lock ordering elsewhere.
	e.c.recentMtx.Lock()
	defer e.c.recentMtx.Unlock()
	d := r.Get(e.Rs.Revision)
	if d == nil {
		return nil, fmt.Errorf("Unknown revision %s in %s", e.Rs.Revision, e.Rs.Repo)
	}
	ts := d.Timestamp
	// Track the newest timestamp seen for this commit and for each task/job
	// spec name.
	if ts.After(e.c.recentCommits[e.Rs.Revision]) {
		e.c.recentCommits[e.Rs.Revision] = ts
	}
	for name := range cfg.Tasks {
		if ts.After(e.c.recentTaskSpecs[name]) {
			e.c.recentTaskSpecs[name] = ts
		}
	}
	for name := range cfg.Jobs {
		if ts.After(e.c.recentJobSpecs[name]) {
			e.c.recentJobSpecs[name] = ts
		}
	}
	// Persist the updated cache; write() requires both locks held.
	e.c.mtx.Lock()
	defer e.c.mtx.Unlock()
	return cfg, e.c.write()
}
// getEntry returns the cacheEntry for the given RepoState, creating and
// registering a new empty entry if necessary. Assumes the caller holds c.mtx.
func (c *TaskCfgCache) getEntry(rs db.RepoState) *cacheEntry {
	if entry, ok := c.cache[rs]; ok {
		return entry
	}
	entry := &cacheEntry{
		c:  c,
		Rs: rs,
	}
	c.cache[rs] = entry
	return entry
}
// ReadTasksCfg reads the task cfg file from the given RepoState and returns it.
// Stores a cache of already-read task cfg files. Syncs the repo and reads the
// file if needed. Note that c.mtx is released before entry.Get() so that slow
// syncs for different RepoStates don't serialize on the cache lock.
func (c *TaskCfgCache) ReadTasksCfg(rs db.RepoState) (*TasksCfg, error) {
	c.mtx.Lock()
	entry := c.getEntry(rs)
	c.mtx.Unlock()
	return entry.Get()
}
// GetTaskSpecsForRepoStates returns a set of TaskSpecs for each of the
// given set of RepoStates, keyed by RepoState and TaskSpec name. The entries
// are loaded concurrently; if any load fails, all errors are combined and
// returned.
func (c *TaskCfgCache) GetTaskSpecsForRepoStates(rs []db.RepoState) (map[db.RepoState]map[string]*TaskSpec, error) {
	// Grab (or create) all of the cache entries under the lock, then release
	// it before doing any slow loading.
	c.mtx.Lock()
	entries := make(map[db.RepoState]*cacheEntry, len(rs))
	for _, state := range rs {
		entries[state] = c.getEntry(state)
	}
	c.mtx.Unlock()

	rv := make(map[db.RepoState]map[string]*TaskSpec, len(rs))
	errs := []error{}
	var mu sync.Mutex
	var wg sync.WaitGroup
	for state, entry := range entries {
		wg.Add(1)
		go func(state db.RepoState, entry *cacheEntry) {
			defer wg.Done()
			cfg, err := entry.Get()
			if err != nil {
				mu.Lock()
				defer mu.Unlock()
				errs = append(errs, err)
				return
			}
			// Copy the task specs so callers can't mutate the cache.
			copies := make(map[string]*TaskSpec, len(cfg.Tasks))
			for name, task := range cfg.Tasks {
				copies[name] = task.Copy()
			}
			mu.Lock()
			defer mu.Unlock()
			rv[state] = copies
		}(state, entry)
	}
	wg.Wait()
	if len(errs) > 0 {
		return nil, fmt.Errorf("Errors loading task cfgs: %v", errs)
	}
	return rv, nil
}
// GetTaskSpec returns a copy of the TaskSpec at the given RepoState, or an
// error if no such TaskSpec exists.
func (c *TaskCfgCache) GetTaskSpec(rs db.RepoState, name string) (*TaskSpec, error) {
	cfg, err := c.ReadTasksCfg(rs)
	if err != nil {
		return nil, err
	}
	spec, ok := cfg.Tasks[name]
	if !ok {
		return nil, fmt.Errorf("No such task spec: %s @ %s", name, rs)
	}
	// Return a copy so the caller can't mutate the cached config.
	return spec.Copy(), nil
}
// GetAddedTaskSpecsForRepoStates returns a mapping from each input RepoState to
// the set of task names that were added at that RepoState (i.e. present there
// but absent from every parent). Results are cached per RepoState.
func (c *TaskCfgCache) GetAddedTaskSpecsForRepoStates(rss []db.RepoState) (map[db.RepoState]util.StringSet, error) {
	rv := make(map[db.RepoState]util.StringSet, len(rss))
	// todoParents collects the RepoStates in rss that are not in
	// c.addedTasksCache. We also save the RepoStates' parents so we don't
	// have to recompute them later.
	todoParents := make(map[db.RepoState][]db.RepoState, 0)
	// allTodoRs collects the RepoStates for which we need to look up
	// TaskSpecs.
	allTodoRs := []db.RepoState{}
	// Check the cache under the read lock; anything missing goes on the
	// to-do lists. The closure scopes the RLock so it is released before the
	// (slow) task spec loading below.
	if err := func() error {
		c.mtx.RLock()
		defer c.mtx.RUnlock()
		for _, rs := range rss {
			val, ok := c.addedTasksCache[rs]
			if ok {
				rv[rs] = val.Copy()
			} else {
				allTodoRs = append(allTodoRs, rs)
				parents, err := rs.Parents(c.repos)
				if err != nil {
					return err
				}
				allTodoRs = append(allTodoRs, parents...)
				todoParents[rs] = parents
			}
		}
		return nil
	}(); err != nil {
		return nil, err
	}
	// Everything was cached; done.
	if len(todoParents) == 0 {
		return rv, nil
	}
	taskSpecs, err := c.GetTaskSpecsForRepoStates(allTodoRs)
	if err != nil {
		return nil, err
	}
	c.mtx.Lock()
	defer c.mtx.Unlock()
	for cur, parents := range todoParents {
		addedTasks := util.NewStringSet()
		for task := range taskSpecs[cur] {
			// If this revision has no parents, the task spec is added by this
			// revision.
			addedByCur := len(parents) == 0
			for _, parent := range parents {
				if _, ok := taskSpecs[parent][task]; !ok {
					// If missing in parent, the task spec is added by this revision.
					addedByCur = true
					break
				}
			}
			if addedByCur {
				addedTasks[task] = true
			}
		}
		// Cache a copy so callers mutating rv don't corrupt the cache.
		c.addedTasksCache[cur] = addedTasks.Copy()
		rv[cur] = addedTasks
	}
	return rv, nil
}
// GetJobSpec returns a copy of the JobSpec at the given RepoState, or an error
// if no such JobSpec exists.
func (c *TaskCfgCache) GetJobSpec(rs db.RepoState, name string) (*JobSpec, error) {
	cfg, err := c.ReadTasksCfg(rs)
	if err != nil {
		return nil, err
	}
	spec, ok := cfg.Jobs[name]
	if !ok {
		return nil, fmt.Errorf("No such job spec: %s @ %s", name, rs)
	}
	// Return a copy so the caller can't mutate the cached config.
	return spec.Copy(), nil
}
// MakeJob is a helper function which retrieves the given JobSpec at the given
// RepoState and uses it to create a Job instance.
func (c *TaskCfgCache) MakeJob(rs db.RepoState, name string) (*db.Job, error) {
	cfg, err := c.ReadTasksCfg(rs)
	if err != nil {
		return nil, err
	}
	jobSpec, ok := cfg.Jobs[name]
	if !ok {
		return nil, fmt.Errorf("No such job: %s", name)
	}
	// Resolve the full task dependency DAG for the job.
	deps, err := jobSpec.GetTaskSpecDAG(cfg)
	if err != nil {
		return nil, err
	}
	job := &db.Job{
		Created:      time.Now(),
		Dependencies: deps,
		Name:         name,
		Priority:     jobSpec.Priority,
		RepoState:    rs,
		Tasks:        map[string][]*db.TaskSummary{},
	}
	return job, nil
}
// Cleanup removes cache entries which are outside of our scheduling window,
// i.e. whose commits are older than the given period (or can no longer be
// resolved), then persists the pruned cache to disk.
func (c *TaskCfgCache) Cleanup(period time.Duration) error {
	c.mtx.Lock()
	defer c.mtx.Unlock()
	cutoff := time.Now().Add(-period)
	// Prune the per-RepoState caches.
	for rs := range c.cache {
		commit, err := rs.GetCommit(c.repos)
		if err != nil || commit.Timestamp.Before(cutoff) {
			delete(c.cache, rs)
		}
	}
	for rs := range c.addedTasksCache {
		commit, err := rs.GetCommit(c.repos)
		if err != nil || commit.Timestamp.Before(cutoff) {
			delete(c.addedTasksCache, rs)
		}
	}
	// Prune the recent-names bookkeeping maps, which all have the same
	// shape and cutoff rule.
	c.recentMtx.Lock()
	defer c.recentMtx.Unlock()
	for _, recent := range []map[string]time.Time{c.recentCommits, c.recentTaskSpecs, c.recentJobSpecs} {
		for k, ts := range recent {
			if ts.Before(cutoff) {
				delete(recent, k)
			}
		}
	}
	return c.write()
}
// write persists the TaskCfgCache to its gob file. Assumes the caller holds
// both c.mtx and c.recentMtx.
func (c *TaskCfgCache) write() error {
	if err := os.MkdirAll(path.Dir(c.file), os.ModePerm); err != nil {
		return err
	}
	f, err := os.Create(c.file)
	if err != nil {
		return fmt.Errorf("Failed to create TaskCfgCache file: %s", err)
	}
	// Serialize via the exported mirror struct.
	state := gobTaskCfgCache{
		AddedTasksCache: c.addedTasksCache,
		Cache:           c.cache,
		RecentCommits:   c.recentCommits,
		RecentJobSpecs:  c.recentJobSpecs,
		RecentTaskSpecs: c.recentTaskSpecs,
	}
	if err := gob.NewEncoder(f).Encode(&state); err != nil {
		util.Close(f)
		return fmt.Errorf("Failed to encode TaskCfgCache: %s", err)
	}
	return f.Close()
}
func stringMapKeys(m map[string]time.Time) []string {
rv := make([]string, 0, len(m))
for k := range m {
rv = append(rv, k)
}
return rv
}
// RecentSpecsAndCommits returns lists of recent job and task spec names and
// commit hashes, in that order. The lists are unsorted (map iteration order).
func (c *TaskCfgCache) RecentSpecsAndCommits() ([]string, []string, []string) {
	c.recentMtx.RLock()
	defer c.recentMtx.RUnlock()
	return stringMapKeys(c.recentJobSpecs), stringMapKeys(c.recentTaskSpecs), stringMapKeys(c.recentCommits)
}
// findCycles searches for cyclical dependencies in the task specs and returns
// an error if any are found. Also ensures that all task specs are reachable
// from at least one job spec and that all jobs specs' dependencies are valid.
func findCycles(tasks map[string]*TaskSpec, jobs map[string]*JobSpec) error {
	// Per-task bookkeeping for the depth-first search.
	type node struct {
		inProgress bool // on the current DFS path
		name       string
		spec       *TaskSpec
		done bool // reached by some DFS
	}
	graph := make(map[string]*node, len(tasks))
	for name, spec := range tasks {
		graph[name] = &node{
			name: name,
			spec: spec,
		}
	}
	// dfs walks the dependency graph starting at n; a dependency that is
	// both visited and still on the current path indicates a cycle.
	var dfs func(*node) error
	dfs = func(n *node) error {
		n.inProgress = true
		n.done = true
		for _, depName := range n.spec.Dependencies {
			dep, ok := graph[depName]
			if !ok {
				return fmt.Errorf("Task %q has unknown task %q as a dependency.", n.name, depName)
			}
			if !dep.done {
				if err := dfs(dep); err != nil {
					return err
				}
			} else if dep.inProgress {
				return fmt.Errorf("Found a circular dependency involving %q and %q", n.name, dep.name)
			}
		}
		n.inProgress = false
		return nil
	}
	// Perform a DFS, starting at each of the jobs' dependencies.
	for jobName, job := range jobs {
		for _, taskName := range job.TaskSpecs {
			start, ok := graph[taskName]
			if !ok {
				return fmt.Errorf("Job %q has unknown task %q as a dependency.", jobName, taskName)
			}
			if !start.done {
				if err := dfs(start); err != nil {
					return err
				}
			}
		}
	}
	// Any node never reached belongs to no job; report it.
	for _, n := range graph {
		if !n.done {
			return fmt.Errorf("Task %q is not reachable by any Job!", n.name)
		}
	}
	return nil
}
// TempGitRepo creates a git repository in a temporary directory, gets it into
// the given RepoState, and runs the given function inside the repo dir.
//
// This method uses a worker pool; if all workers are busy, it will block until
// one is free.
func (c *TaskCfgCache) TempGitRepo(rs db.RepoState, botUpdate bool, fn func(*git.TempCheckout) error) error {
	// Submit the work to the queue and block on its result; the workers
	// started in NewTaskCfgCache execute queued functions one at a time.
	rvErr := make(chan error)
	c.queue <- func(workerId int) {
		var gr *git.TempCheckout
		var err error
		if botUpdate {
			// Sync via bot_update, which also applies patches for try jobs.
			tmp, err2 := ioutil.TempDir("", "")
			if err2 != nil {
				rvErr <- err2
				return
			}
			defer util.RemoveAll(tmp)
			// Each worker gets its own git cache dir to avoid contention.
			cacheDir := path.Join(c.workdir, "cache", fmt.Sprintf("%d", workerId))
			gr, err = tempGitRepoBotUpdate(rs, c.depotToolsDir, cacheDir, tmp)
		} else {
			// Plain checkout from the local repograph copy (no patches).
			repo, ok := c.repos[rs.Repo]
			if !ok {
				rvErr <- fmt.Errorf("Unknown repo: %s", rs.Repo)
				return
			}
			gr, err = tempGitRepo(repo.Repo(), rs)
		}
		if err != nil {
			rvErr <- err
			return
		}
		// The checkout is deleted when fn returns.
		defer gr.Delete()
		rvErr <- fn(gr)
	}
	return <-rvErr
}
// tempGitRepo creates a temporary checkout of the given repo at the revision
// described by rs and returns it. It must not be used for try jobs, since it
// does not apply patches.
func tempGitRepo(repo *git.Repo, rs db.RepoState) (rv *git.TempCheckout, rvErr error) {
	if rs.IsTryJob() {
		return nil, fmt.Errorf("specs.tempGitRepo does not apply patches, and should not be called for try jobs.")
	}
	checkout, err := repo.TempCheckout()
	if err != nil {
		return nil, err
	}
	// Clean up the checkout if anything below fails.
	defer func() {
		if rvErr != nil {
			checkout.Delete()
		}
	}()
	// Check out the correct commit.
	if _, err := checkout.Git("checkout", rs.Revision); err != nil {
		return nil, err
	}
	return checkout, nil
}
// tempGitRepoBotUpdate creates a git repository in a temporary directory via
// depot_tools' bot_update (applying any try-job patch), gets it into the given
// RepoState, and returns its location.
func tempGitRepoBotUpdate(rs db.RepoState, depotToolsDir, gitCacheDir, tmp string) (*git.TempCheckout, error) {
	// Run bot_update to obtain a checkout of the repo and its DEPS.
	botUpdatePath := path.Join(depotToolsDir, "recipes", "recipe_modules", "bot_update", "resources", "bot_update.py")
	projectName := strings.TrimSuffix(path.Base(rs.Repo), ".git")
	spec := fmt.Sprintf("cache_dir = '%s'\nsolutions = [{'deps_file': '.DEPS.git', 'managed': False, 'name': '%s', 'url': '%s'}]", gitCacheDir, projectName, rs.Repo)
	revMap := map[string]string{
		projectName: "got_revision",
	}
	revisionMappingFile := path.Join(tmp, "revision_mapping")
	revMapBytes, err := json.Marshal(revMap)
	if err != nil {
		return nil, err
	}
	if err := ioutil.WriteFile(revisionMappingFile, revMapBytes, os.ModePerm); err != nil {
		return nil, err
	}
	// For try jobs, the patch may come from a different repo.
	patchRepo := rs.Repo
	patchRepoName := projectName
	if rs.PatchRepo != "" {
		patchRepo = rs.PatchRepo
		patchRepoName = strings.TrimSuffix(path.Base(rs.PatchRepo), ".git")
	}
	outputJson := path.Join(tmp, "output_json")
	cmd := []string{
		"python", "-u", botUpdatePath,
		"--spec", spec,
		"--patch_root", patchRepoName,
		"--revision_mapping_file", revisionMappingFile,
		"--git-cache-dir", gitCacheDir,
		"--output_json", outputJson,
		"--revision", fmt.Sprintf("%s@%s", projectName, rs.Revision),
	}
	if rs.IsTryJob() {
		if strings.Contains(rs.Server, "codereview.chromium") {
			// Rietveld-style patches are identified by issue + patchset.
			cmd = append(cmd, []string{
				"--issue", rs.Issue,
				"--patchset", rs.Patchset,
			}...)
		} else {
			// Gerrit refs look like refs/changes/<last 2 digits>/<issue>/<patchset>.
			// Guard the slice: the previous unconditional
			// rs.Issue[len(rs.Issue)-2:] panics for issue IDs shorter than
			// ISSUE_SHORT_LENGTH characters.
			issueShort := rs.Issue
			if len(issueShort) > ISSUE_SHORT_LENGTH {
				issueShort = issueShort[len(issueShort)-ISSUE_SHORT_LENGTH:]
			}
			gerritRef := fmt.Sprintf("refs/changes/%s/%s/%s", issueShort, rs.Issue, rs.Patchset)
			cmd = append(cmd, []string{
				"--gerrit_repo", patchRepo,
				"--gerrit_ref", gerritRef,
			}...)
		}
	}
	if _, err := exec.RunCommand(&exec.Command{
		Name: cmd[0],
		Args: cmd[1:],
		Dir:  tmp,
		Env: []string{
			fmt.Sprintf("PATH=%s:%s", depotToolsDir, os.Getenv("PATH")),
		},
		InheritEnv: true,
	}); err != nil {
		return nil, err
	}
	// bot_update points the upstream to a local cache. Point back to the
	// "real" upstream, in case the caller cares about the remote URL. Note
	// that this doesn't change the remote URLs for the DEPS.
	co := &git.TempCheckout{
		GitDir: git.GitDir(path.Join(tmp, projectName)),
	}
	if _, err := co.Git("remote", "set-url", "origin", rs.Repo); err != nil {
		return nil, err
	}
	// Self-check: verify the checkout ended up at the requested revision.
	head, err := co.RevParse("HEAD")
	if err != nil {
		return nil, err
	}
	if head != rs.Revision {
		return nil, fmt.Errorf("TempGitRepo ended up at the wrong revision. Wanted %q but got %q", rs.Revision, head)
	}
	return co, nil
}