package specs

import (
	"bytes"
	"encoding/json"
	"fmt"
	"os"
	"path"
	"sort"
	"strings"
	"time"

	"go.skia.org/infra/go/cas/rbe"
	"go.skia.org/infra/go/cipd"
	"go.skia.org/infra/go/periodic"
	"go.skia.org/infra/go/skerr"
	"go.skia.org/infra/go/util"
	"go.skia.org/infra/task_scheduler/go/types"
)

const (
	// DEFAULT_TASK_SPEC_MAX_ATTEMPTS is the number of attempts used for a
	// TaskSpec whose MaxAttempts field is zero.
	DEFAULT_TASK_SPEC_MAX_ATTEMPTS = types.DEFAULT_MAX_TASK_ATTEMPTS

	// The default JobSpec.Priority, when unspecified or invalid.
	DEFAULT_JOB_SPEC_PRIORITY = 0.5

	// TASKS_CFG_FILE is the path of the tasks config file relative to the
	// root of a repo checkout; ReadTasksCfg and WriteTasksCfg join it onto
	// the repo directory they are given.
	TASKS_CFG_FILE = "infra/bots/tasks.json"

	// Triggering configuration for jobs. JobSpec.Trigger must be one of the
	// TRIGGER_* values below; JobSpec.Validate rejects anything else.

	// By default, all jobs trigger on any branch for which they are
	// defined.
	TRIGGER_ANY_BRANCH = ""
	// Run this job only on the "master" or "main" branch respectively, even
	// if it is defined on others. CD jobs must use one of these two triggers.
	TRIGGER_MASTER_ONLY = "master"
	TRIGGER_MAIN_ONLY   = "main"
	// Trigger this job every night.
	TRIGGER_NIGHTLY = periodic.TRIGGER_NIGHTLY
	// Don't trigger this job automatically. It will only be run when
	// explicitly triggered via a try job or a force trigger.
	TRIGGER_ON_DEMAND = "on demand"
	// Trigger this job weekly.
	TRIGGER_WEEKLY = periodic.TRIGGER_WEEKLY

	// VARIABLE_SYNTAX is the fmt format which turns a VARIABLE_* name into
	// the placeholder string used in task specs, e.g. "<(REPO)" for
	// VARIABLE_REPO.
	VARIABLE_SYNTAX = "<(%s)"

	// Names of the variables which may be substituted into task specs; the
	// corresponding placeholder strings are the PLACEHOLDER_* values.
	VARIABLE_BUILDBUCKET_BUILD_ID = "BUILDBUCKET_BUILD_ID"
	VARIABLE_CODEREVIEW_SERVER    = "CODEREVIEW_SERVER"
	VARIABLE_ISSUE                = "ISSUE"
	VARIABLE_ISSUE_INT            = "ISSUE_INT"
	VARIABLE_ISSUE_SHORT          = "ISSUE_SHORT"
	VARIABLE_PATCH_REF            = "PATCH_REF"
	VARIABLE_PATCH_REPO           = "PATCH_REPO"
	VARIABLE_PATCH_STORAGE        = "PATCH_STORAGE"
	VARIABLE_PATCHSET             = "PATCHSET"
	VARIABLE_PATCHSET_INT         = "PATCHSET_INT"
	VARIABLE_REPO                 = "REPO"
	VARIABLE_REVISION             = "REVISION"
	VARIABLE_TASK_ID              = "TASK_ID"
	VARIABLE_TASK_NAME            = "TASK_NAME"
)

var (
	// CIPD packages which may be used in tasks.
	// NOTE(review): the CIPD_PKGS_GIT_* and CIPD_PKGS_PYTHON_* values are
	// taken directly from the cipd.PkgsGit / cipd.PkgsPython maps rather than
	// copied, so they presumably share storage with those maps; avoid
	// modifying them in place.
	CIPD_PKGS_GIT_LINUX_AMD64   = cipd.PkgsGit[cipd.PlatformLinuxAmd64]
	CIPD_PKGS_GIT_MAC_AMD64     = cipd.PkgsGit[cipd.PlatformMacAmd64]
	CIPD_PKGS_GIT_WINDOWS_AMD64 = cipd.PkgsGit[cipd.PlatformWindowsAmd64]
	CIPD_PKGS_GOLDCTL           = []*CipdPackage{cipd.MustGetPackage("skia/tools/goldctl/${platform}")}
	CIPD_PKGS_ISOLATE           = []*CipdPackage{
		cipd.MustGetPackage("infra/tools/luci/isolate/${platform}"),
		cipd.MustGetPackage("infra/tools/luci/isolated/${platform}"),
	}
	CIPD_PKGS_PYTHON_LINUX_AMD64   = cipd.PkgsPython[cipd.PlatformLinuxAmd64]
	CIPD_PKGS_PYTHON_WINDOWS_AMD64 = cipd.PkgsPython[cipd.PlatformWindowsAmd64]

	// CIPD_PKGS_KITCHEN_LINUX_AMD64 holds the kitchen and luci-auth tools
	// followed by the linux-amd64 Python packages. The append starts from a
	// fresh slice literal, so it does not write into
	// CIPD_PKGS_PYTHON_LINUX_AMD64.
	CIPD_PKGS_KITCHEN_LINUX_AMD64 = append([]*CipdPackage{
		cipd.MustGetPackage("infra/tools/luci/kitchen/${platform}"),
		cipd.MustGetPackage("infra/tools/luci-auth/${platform}"),
	}, CIPD_PKGS_PYTHON_LINUX_AMD64...)

	// Variable placeholders; these are replaced with the actual value
	// at task triggering time. Each one is VARIABLE_SYNTAX applied to the
	// matching VARIABLE_* name (e.g. PLACEHOLDER_REPO is "<(REPO)"), except
	// PLACEHOLDER_ISOLATED_OUTDIR, which is the literal "${ISOLATED_OUTDIR}".
	PLACEHOLDER_BUILDBUCKET_BUILD_ID = fmt.Sprintf(VARIABLE_SYNTAX, VARIABLE_BUILDBUCKET_BUILD_ID)
	PLACEHOLDER_CODEREVIEW_SERVER    = fmt.Sprintf(VARIABLE_SYNTAX, VARIABLE_CODEREVIEW_SERVER)
	PLACEHOLDER_ISSUE                = fmt.Sprintf(VARIABLE_SYNTAX, VARIABLE_ISSUE)
	PLACEHOLDER_ISSUE_INT            = fmt.Sprintf(VARIABLE_SYNTAX, VARIABLE_ISSUE_INT)
	PLACEHOLDER_ISSUE_SHORT          = fmt.Sprintf(VARIABLE_SYNTAX, VARIABLE_ISSUE_SHORT)
	PLACEHOLDER_PATCH_REF            = fmt.Sprintf(VARIABLE_SYNTAX, VARIABLE_PATCH_REF)
	PLACEHOLDER_PATCH_REPO           = fmt.Sprintf(VARIABLE_SYNTAX, VARIABLE_PATCH_REPO)
	PLACEHOLDER_PATCH_STORAGE        = fmt.Sprintf(VARIABLE_SYNTAX, VARIABLE_PATCH_STORAGE)
	PLACEHOLDER_PATCHSET             = fmt.Sprintf(VARIABLE_SYNTAX, VARIABLE_PATCHSET)
	PLACEHOLDER_PATCHSET_INT         = fmt.Sprintf(VARIABLE_SYNTAX, VARIABLE_PATCHSET_INT)
	PLACEHOLDER_REPO                 = fmt.Sprintf(VARIABLE_SYNTAX, VARIABLE_REPO)
	PLACEHOLDER_REVISION             = fmt.Sprintf(VARIABLE_SYNTAX, VARIABLE_REVISION)
	PLACEHOLDER_TASK_ID              = fmt.Sprintf(VARIABLE_SYNTAX, VARIABLE_TASK_ID)
	PLACEHOLDER_TASK_NAME            = fmt.Sprintf(VARIABLE_SYNTAX, VARIABLE_TASK_NAME)
	PLACEHOLDER_ISOLATED_OUTDIR      = "${ISOLATED_OUTDIR}"

	// PERIODIC_TRIGGERS lists the time-based JobSpec.Trigger values (nightly
	// and weekly).
	PERIODIC_TRIGGERS = []string{TRIGGER_NIGHTLY, TRIGGER_WEEKLY}
)

// ErrorIsPermanent returns true if the given error cannot be recovered by
// retrying. In this case, we will never be able to process the TasksCfg,
// so we might as well cancel the jobs.
//
// The decision is made by searching the message of the error (after
// skerr.Unwrap) for known fragments. A nil error is never permanent.
// TODO(borenet): This should probably be split into two different
// ErrorIsPermanent functions, in the syncer, and specs packages.
func ErrorIsPermanent(err error) bool {
	if err == nil {
		return false
	}
	err = skerr.Unwrap(err)
	if err == nil {
		// Defensive: nothing left to inspect after unwrapping.
		return false
	}
	permanentFragments := []string{
		"error: Failed to merge in the changes.",
		"Failed to apply patch",
		// Produced by ParseTasksCfg when the file is not valid JSON.
		"Failed to read tasks cfg: could not parse file:",
		// Prefix used by most TasksCfg.Validate errors.
		"Invalid TasksCfg",
		"The \"gclient_gn_args_from\" value must be in recursedeps",
		// This repo was moved, so attempts to sync it will always fail.
		"https://skia.googlesource.com/third_party/libjpeg-turbo.git",
		"no such file or directory",
		"Not a valid object name",
	}
	// Compute the message once rather than once per fragment.
	msg := err.Error()
	for _, fragment := range permanentFragments {
		if strings.Contains(msg, fragment) {
			return true
		}
	}
	return false
}

// ParseTasksCfg decodes the JSON contents of a tasks cfg file, validates the
// decoded config with TasksCfg.Validate, and returns it. A JSON decoding
// failure is reported with the "Failed to read tasks cfg: could not parse
// file:" prefix (which ErrorIsPermanent treats as permanent) followed by the
// offending contents; validation errors are returned unchanged.
func ParseTasksCfg(contents string) (*TasksCfg, error) {
	cfg := new(TasksCfg)
	if err := json.Unmarshal([]byte(contents), cfg); err != nil {
		return nil, fmt.Errorf("Failed to read tasks cfg: could not parse file: %s\nContents:\n%s", err, contents)
	}
	if err := cfg.Validate(); err != nil {
		return nil, err
	}
	return cfg, nil
}

// EncodeTasksCfg encodes the TasksCfg as two-space-indented JSON suitable for
// writing to TASKS_CFG_FILE. '<' characters are written literally rather
// than as "\u003c", and the output ends with a trailing newline.
func EncodeTasksCfg(cfg *TasksCfg) ([]byte, error) {
	// Encode the JSON config.
	enc, err := json.MarshalIndent(cfg, "", "  ")
	if err != nil {
		return nil, err
	}
	// The json package escapes HTML characters, which makes our output
	// much less readable (placeholders such as "<(REPO)" would appear as
	// "\u003c(REPO)"). Restore the '<' character. '>' and '&' are left
	// escaped, as before.
	enc = unescapeJSONLessThan(enc)

	// Add a newline to the end of the file. Most text editors add one, so
	// adding one here enables manual editing of the file, even though we'd
	// rather that not happen.
	enc = append(enc, '\n')
	return enc, nil
}

// unescapeJSONLessThan returns enc with every `\u003c` escape sequence
// replaced by a literal '<'. enc must be output of encoding/json, in which
// every backslash begins an escape sequence. Escape sequences are therefore
// consumed as whole units, so an escaped backslash followed by the literal
// text "u003c" (encoded as `\\u003c`) is left intact instead of being
// corrupted into the invalid escape `\<`.
func unescapeJSONLessThan(enc []byte) []byte {
	esc := []byte(`\u003c`)
	if !bytes.Contains(enc, esc) {
		return enc
	}
	out := make([]byte, 0, len(enc))
	for i := 0; i < len(enc); {
		switch {
		case enc[i] != '\\':
			out = append(out, enc[i])
			i++
		case bytes.HasPrefix(enc[i:], esc):
			out = append(out, '<')
			i += len(esc)
		default:
			// Copy the backslash together with the character it escapes so
			// that the escaped character is never mistaken for the start of
			// another escape sequence.
			end := i + 2
			if end > len(enc) {
				end = len(enc)
			}
			out = append(out, enc[i:end]...)
			i = end
		}
	}
	return out
}

// ReadTasksCfg loads TASKS_CFG_FILE from the checkout rooted at repoDir and
// parses it with ParseTasksCfg. A repo which has no such file is treated as
// having an empty config; any other read failure is returned as an error.
func ReadTasksCfg(repoDir string) (*TasksCfg, error) {
	cfgPath := path.Join(repoDir, TASKS_CFG_FILE)
	contents, err := os.ReadFile(cfgPath)
	switch {
	case err == nil:
		return ParseTasksCfg(string(contents))
	case os.IsNotExist(err):
		// Having no tasks.json at all is legitimate: the repo defines no tasks.
		return &TasksCfg{}, nil
	default:
		return nil, fmt.Errorf("Failed to read tasks cfg: could not read file: %s", err)
	}
}

// WriteTasksCfg encodes cfg with EncodeTasksCfg and writes it to
// TASKS_CFG_FILE within repoDir. The parent directory must already exist;
// it is not created.
func WriteTasksCfg(cfg *TasksCfg, repoDir string) error {
	enc, err := EncodeTasksCfg(cfg)
	if err != nil {
		return err
	}
	// The config is plain data, so create it as a regular non-executable
	// file (0644) rather than with os.ModePerm (0777). os.WriteFile applies
	// the mode only when it creates the file; an existing file keeps its
	// permissions.
	return os.WriteFile(path.Join(repoDir, TASKS_CFG_FILE), enc, 0644)
}

// CommitQueueJobConfig describes how a job should run on the Commit Queue.
// It is stored as a value of TasksCfg.CommitQueue, keyed by job name.
type CommitQueueJobConfig struct {
	// Run on the Commit Queue only if the change contains modifications to the
	// following location regexes.
	// NOTE(review): presumably an empty list means "run on every CL";
	// confirm against the CQ config generator.
	LocationRegexes []string `json:"location_regexes,omitempty"`
	// If this flag is true then the job is marked as being experimental. It will
	// be triggered on all CLs but their outcome will not affect the Commit Queue,
	// i.e. the experimental job could fail, but if all other non-experimental
	// jobs have succeeded then the Commit Queue will succeed.
	Experimental bool `json:"experimental,omitempty"`
}

// Copy returns a deep copy of the CommitQueueJobConfig. LocationRegexes is
// copied with util.CopyStringSlice, so changing the result's slice does not
// affect c.
func (c *CommitQueueJobConfig) Copy() *CommitQueueJobConfig {
	dup := &CommitQueueJobConfig{Experimental: c.Experimental}
	dup.LocationRegexes = util.CopyStringSlice(c.LocationRegexes)
	return dup
}

// TasksCfg is a struct which describes all Swarming tasks for a repo at a
// particular commit. It is stored as JSON in TASKS_CFG_FILE; see
// ParseTasksCfg and EncodeTasksCfg.
type TasksCfg struct {
	// Jobs is a map whose keys are JobSpec names and values are JobSpecs
	// which describe sets of tasks to run.
	Jobs map[string]*JobSpec `json:"jobs"`

	// Tasks is a map whose keys are TaskSpec names and values are TaskSpecs
	// detailing the Swarming tasks which may be run. Validate requires every
	// task to be reachable from at least one job.
	Tasks map[string]*TaskSpec `json:"tasks"`

	// CasSpecs is a map of named specifications for content-addressed inputs to
	// tasks. Every TaskSpec.CasSpec must name an entry in this map.
	CasSpecs map[string]*CasSpec `json:"casSpecs,omitempty"`

	// CommitQueue is a map whose keys are JobSpec names and values are
	// CommitQueueJobConfig. All specified jobs will run on the Commit Queue.
	// Validate requires every key to name a job in Jobs.
	CommitQueue map[string]*CommitQueueJobConfig `json:"commit_queue,omitempty"`
}

// Copy returns a deep copy of the TasksCfg; every entry is duplicated with
// its own Copy method. In the result, Jobs and Tasks are always non-nil maps,
// CasSpecs is nil exactly when c.CasSpecs is nil, and CommitQueue is nil
// whenever c.CommitQueue is empty.
func (c *TasksCfg) Copy() *TasksCfg {
	rv := &TasksCfg{
		Jobs:  make(map[string]*JobSpec, len(c.Jobs)),
		Tasks: make(map[string]*TaskSpec, len(c.Tasks)),
	}
	for jobName, jobSpec := range c.Jobs {
		rv.Jobs[jobName] = jobSpec.Copy()
	}
	for taskName, taskSpec := range c.Tasks {
		rv.Tasks[taskName] = taskSpec.Copy()
	}
	if c.CasSpecs != nil {
		rv.CasSpecs = make(map[string]*CasSpec, len(c.CasSpecs))
		for specName, casSpec := range c.CasSpecs {
			rv.CasSpecs[specName] = casSpec.Copy()
		}
	}
	if len(c.CommitQueue) != 0 {
		rv.CommitQueue = make(map[string]*CommitQueueJobConfig, len(c.CommitQueue))
		for jobName, cqCfg := range c.CommitQueue {
			rv.CommitQueue[jobName] = cqCfg.Copy()
		}
	}
	return rv
}

// Validate returns an error if the TasksCfg is not valid. It checks, in
// order, that:
//   - every TaskSpec is valid and names an existing CasSpec;
//   - every JobSpec is valid;
//   - the dependency graph has no cycles, no unknown tasks, and no tasks
//     which are unreachable from every job (see findCycles);
//   - every CommitQueue key names an existing job;
//   - no task is needed by both a CD and a non-CD job;
//   - every CD job triggers only on the main or master branch.
//
// Where this method itself iterates over tasks or jobs it does so in sorted
// name order, so for those checks the reported error does not depend on map
// iteration order.
func (c *TasksCfg) Validate() error {
	// Sort names up front to make tests consistent.
	taskNames := make([]string, 0, len(c.Tasks))
	for name := range c.Tasks {
		taskNames = append(taskNames, name)
	}
	sort.Strings(taskNames)
	jobNames := make([]string, 0, len(c.Jobs))
	for name := range c.Jobs {
		jobNames = append(jobNames, name)
	}
	sort.Strings(jobNames)

	// Validate all tasks.
	for _, name := range taskNames {
		t := c.Tasks[name]
		if err := t.Validate(c); err != nil {
			return skerr.Fmt("Invalid TasksCfg: %s", err)
		}

		// Ensure that the CAS inputs to the task exist.
		if t.CasSpec == "" {
			return skerr.Fmt("Invalid TasksCfg: Task %q has no CasSpec.", name)
		}
		if _, ok := c.CasSpecs[t.CasSpec]; !ok {
			return skerr.Fmt("Invalid TasksCfg: Task %q references non-existent CasSpec %q", name, t.CasSpec)
		}
	}

	// Validate all jobs.
	for _, name := range jobNames {
		if err := c.Jobs[name].Validate(); err != nil {
			return skerr.Fmt("Invalid TasksCfg: %s", err)
		}
	}

	// Ensure that the DAG is valid. After this, every dependency name refers
	// to an existing task and there are no cycles, which the CD check below
	// relies on.
	if err := findCycles(c.Tasks, c.Jobs); err != nil {
		return skerr.Fmt("Invalid TasksCfg: %s", err)
	}

	// Ensure that CQ job names are valid.
	for cqJob := range c.CommitQueue {
		if _, ok := c.Jobs[cqJob]; !ok {
			return skerr.Fmt("Unknown job %q in CQ config", cqJob)
		}
	}

	// CD and non-CD jobs may not share any tasks. isCD records, for each task
	// visited so far, whether it was reached from a CD job.
	isCD := map[string]bool{}
	var checkTaskAndJobCD func(string, string) error
	checkTaskAndJobCD = func(taskName, jobName string) error {
		job := c.Jobs[jobName]
		if taskIsCD, ok := isCD[taskName]; ok {
			if taskIsCD != job.IsCD {
				return skerr.Fmt("Mixing CD and non-CD tasks: task %q wanted by job %q", taskName, jobName)
			}
			// This task and all of its dependencies were already checked with
			// the same CD-ness, so walking them again cannot find a conflict.
			// Skipping them avoids re-walking shared subgraphs once per path,
			// which can grow exponentially.
			return nil
		}
		isCD[taskName] = job.IsCD
		for _, depName := range c.Tasks[taskName].Dependencies {
			if err := checkTaskAndJobCD(depName, jobName); err != nil {
				return skerr.Wrap(err)
			}
		}
		return nil
	}
	for _, jobName := range jobNames {
		for _, taskName := range c.Jobs[jobName].TaskSpecs {
			if err := checkTaskAndJobCD(taskName, jobName); err != nil {
				return skerr.Wrap(err)
			}
		}
	}

	// CD jobs may only trigger on the main or master branch.
	for _, jobName := range jobNames {
		job := c.Jobs[jobName]
		if job.IsCD && job.Trigger != TRIGGER_MAIN_ONLY && job.Trigger != TRIGGER_MASTER_ONLY {
			return skerr.Fmt("CD job %q must trigger on main/master branch only", jobName)
		}
	}

	return nil
}

// TaskSpec is a struct which describes a Swarming task to run.
// Be sure to add any new fields to the Copy() method.
type TaskSpec struct {
	// Caches are named Swarming caches which should be used for this task.
	Caches []*Cache `json:"caches,omitempty"`

	// CasSpec references a named input to the task from content-addressed
	// storage. It is required and must name an entry in TasksCfg.CasSpecs;
	// TasksCfg.Validate rejects the config otherwise.
	CasSpec string `json:"casSpec,omitempty"`

	// CipdPackages are CIPD packages which should be installed for the task.
	// Each must have a non-empty Name and Path (see Validate).
	CipdPackages []*CipdPackage `json:"cipd_packages,omitempty"`

	// Command is the command to run in the Swarming task.
	Command []string `json:"command,omitempty"`

	// Dependencies are names of other TaskSpecs for tasks which need to run
	// before this task. Each must name a task in TasksCfg.Tasks, and the
	// dependencies may not form a cycle (checked by TasksCfg.Validate).
	Dependencies []string `json:"dependencies,omitempty"`

	// Dimensions are Swarming bot dimensions which describe the type of bot
	// which may run this task. At least one is required, and each must
	// contain a colon separating the key from the value (see Validate).
	Dimensions []string `json:"dimensions"`

	// Environment is a set of environment variables needed by the task.
	Environment map[string]string `json:"environment,omitempty"`

	// EnvPrefixes are prefixes to add to environment variables for the task,
	// for example, adding directories to PATH. Keys are environment variable
	// names and values are multiple values to add for the variable.
	EnvPrefixes map[string][]string `json:"env_prefixes,omitempty"`

	// ExecutionTimeout is the maximum amount of time the task is allowed
	// to take. Like the other time.Duration fields here, it is encoded in
	// JSON as an integer number of nanoseconds.
	ExecutionTimeout time.Duration `json:"execution_timeout_ns,omitempty"`

	// Expiration is how long the task may remain in the pending state
	// before it is abandoned.
	Expiration time.Duration `json:"expiration_ns,omitempty"`

	// ExtraArgs are extra command-line arguments to pass to the task.
	ExtraArgs []string `json:"extra_args,omitempty"`

	// ExtraTags are extra tags to add to the Swarming task.
	ExtraTags map[string]string `json:"extra_tags,omitempty"`

	// Idempotent indicates that triggering this task with the same
	// parameters as previously triggered has no side effect and thus the
	// task may be de-duplicated.
	Idempotent bool `json:"idempotent,omitempty"`

	// IoTimeout is the maximum amount of time which the task may take to
	// communicate with the server.
	IoTimeout time.Duration `json:"io_timeout_ns,omitempty"`

	// MaxAttempts is the maximum number of attempts for this TaskSpec. If
	// zero, DEFAULT_TASK_SPEC_MAX_ATTEMPTS is used.
	MaxAttempts int `json:"max_attempts,omitempty"`

	// Outputs are files and/or directories to use as outputs for the task.
	// Paths are relative to the task workdir. No error occurs if any of
	// these is missing.
	Outputs []string `json:"outputs,omitempty"`

	// This field is ignored. A task's priority is instead derived from the
	// JobSpecs which depend on it; see JobSpec.Priority.
	Priority float64 `json:"priority,omitempty"`

	// ServiceAccount indicates the Swarming service account to use for the
	// task. If not specified, we will attempt to choose a suitable default.
	ServiceAccount string `json:"service_account,omitempty"`

	// TaskExecutor specifies what type of task executor should handle the task.
	// It must be one of types.ValidTaskExecutors (see Validate).
	TaskExecutor string `json:"task_executor,omitempty"`
}

// Validate checks the TaskSpec on its own:
//   - every CIPD package must have a non-empty Name and Path;
//   - at least one dimension is required, and each must contain a colon;
//   - TaskExecutor must be one of types.ValidTaskExecutors.
//
// cfg is currently unused. References to CasSpecs and to other tasks are
// checked by TasksCfg.Validate instead.
// NOTE(review): the CIPD error message mentions a version, but the Version
// field is not checked here.
func (t *TaskSpec) Validate(cfg *TasksCfg) error {
	for _, pkg := range t.CipdPackages {
		if pkg.Name != "" && pkg.Path != "" {
			continue
		}
		return fmt.Errorf("CIPD packages must have a name, path, and version.")
	}

	if len(t.Dimensions) == 0 {
		return fmt.Errorf("Task must have dimensions")
	}
	for _, dim := range t.Dimensions {
		// Dimensions take the form "key:value".
		if !strings.Contains(dim, ":") {
			return fmt.Errorf("Dimension %q does not contain a colon!", dim)
		}
	}

	if util.In(t.TaskExecutor, types.ValidTaskExecutors) {
		return nil
	}
	return fmt.Errorf("Invalid task executor %q; must be one of: %v", t.TaskExecutor, types.ValidTaskExecutors)
}

// Copy returns a deep copy of the TaskSpec. A struct copy duplicates every
// scalar field. Then every slice and map is replaced with a fresh copy, and
// each *Cache and *CipdPackage is shallow-copied into a new value. Empty
// Caches, CipdPackages and EnvPrefixes come back as nil. Any new slice, map
// or pointer field added to TaskSpec must be deep-copied explicitly here.
func (t *TaskSpec) Copy() *TaskSpec {
	rv := *t

	rv.Caches = nil
	if len(t.Caches) > 0 {
		rv.Caches = make([]*Cache, len(t.Caches))
		for i, cache := range t.Caches {
			dup := *cache
			rv.Caches[i] = &dup
		}
	}

	rv.CipdPackages = nil
	if len(t.CipdPackages) > 0 {
		rv.CipdPackages = make([]*CipdPackage, len(t.CipdPackages))
		for i, pkg := range t.CipdPackages {
			dup := *pkg
			rv.CipdPackages[i] = &dup
		}
	}

	rv.EnvPrefixes = nil
	if len(t.EnvPrefixes) > 0 {
		rv.EnvPrefixes = make(map[string][]string, len(t.EnvPrefixes))
		for name, prefixes := range t.EnvPrefixes {
			rv.EnvPrefixes[name] = util.CopyStringSlice(prefixes)
		}
	}

	rv.Command = util.CopyStringSlice(t.Command)
	rv.Dependencies = util.CopyStringSlice(t.Dependencies)
	rv.Dimensions = util.CopyStringSlice(t.Dimensions)
	rv.Environment = util.CopyStringMap(t.Environment)
	rv.ExtraArgs = util.CopyStringSlice(t.ExtraArgs)
	rv.ExtraTags = util.CopyStringMap(t.ExtraTags)
	rv.Outputs = util.CopyStringSlice(t.Outputs)
	return &rv
}

// Cache is a struct representing a named cache which is used by a task.
// Name identifies the Swarming named cache and Path is where it is made
// available to the task.
// NOTE(review): presumably Path is relative to the task's working
// directory — confirm against the Swarming named-cache semantics.
type Cache struct {
	Name string `json:"name"`
	Path string `json:"path"`
}

// CipdPackage is a struct representing a CIPD package which needs to be
// installed on a bot for a particular task. It is a type alias, not a new
// type, so CipdPackage and cipd.Package values are fully interchangeable.
// TODO(borenet): Are there any downsides to using an alias rather than a new
// type here?
type CipdPackage = cipd.Package

// JobSpec is a struct which describes a set of TaskSpecs to run as part of a
// larger effort.
type JobSpec struct {
	// IsCD indicates whether this job is a Continuous Deployment pipeline. If
	// true, this job is not allowed to be triggered as a try job, no backfills
	// of tasks are run (ie. only the newest Task Candidate runs), and its tasks
	// run on a special pool of machines which are dedicated only to CD tasks.
	// TasksCfg.Validate additionally requires CD jobs to use TRIGGER_MAIN_ONLY
	// or TRIGGER_MASTER_ONLY and forbids them from sharing any task with a
	// non-CD job.
	IsCD bool `json:"is_cd,omitempty"`
	// Priority indicates the relative priority of the job, with 0 < p <= 1,
	// where higher values result in scheduling the job's tasks sooner. If
	// unspecified or outside this range, DEFAULT_JOB_SPEC_PRIORITY is used.
	// Each task derives its priority from the set of jobs that depend upon
	// it. A task's priority is
	//   1 - (1-<job1 priority>)(1-<job2 priority>)...(1-<jobN priority>)
	// A task at HEAD with a priority of 1 and a blamelist of 1 commit has
	// approximately the same score as a task at HEAD with a priority of 0.57
	// and a blamelist of 2 commits.
	// A backfill task with a priority of 1 that bisects a blamelist of 2
	// commits has the same score as another backfill task at the same
	// commit with a priority of 0.4 that bisects a blamelist of 4 commits.
	Priority float64 `json:"priority,omitempty"`
	// The names of TaskSpecs that are direct dependencies of this JobSpec.
	// Each must name a task in TasksCfg.Tasks (checked by TasksCfg.Validate).
	TaskSpecs []string `json:"tasks"`
	// One of the TRIGGER_* constants; see documentation above.
	Trigger string `json:"trigger,omitempty"`
}

// Validate returns an error if the JobSpec is not valid. Currently that means
// its Trigger is not one of the TRIGGER_* constants.
func (j *JobSpec) Validate() error {
	// TaskSpecs is deliberately not checked. Whether each name refers to a
	// defined TaskSpec depends on the enclosing TasksCfg, so that check is
	// done at a higher level (findCycles, called from TasksCfg.Validate).
	switch j.Trigger {
	case TRIGGER_ANY_BRANCH,
		TRIGGER_MASTER_ONLY,
		TRIGGER_MAIN_ONLY,
		TRIGGER_NIGHTLY,
		TRIGGER_ON_DEMAND,
		TRIGGER_WEEKLY:
		return nil
	default:
		return fmt.Errorf("Invalid job trigger %q", j.Trigger)
	}
}

// Copy returns a copy of the JobSpec whose TaskSpecs slice is independent of
// j's. A nil TaskSpecs stays nil; an empty one stays empty but non-nil.
func (j *JobSpec) Copy() *JobSpec {
	dup := &JobSpec{
		IsCD:     j.IsCD,
		Priority: j.Priority,
		Trigger:  j.Trigger,
	}
	if j.TaskSpecs != nil {
		dup.TaskSpecs = append(make([]string, 0, len(j.TaskSpecs)), j.TaskSpecs...)
	}
	return dup
}

// GetTaskSpecDAG returns the transitive dependency graph of the JobSpec
// within cfg. Each key is the name of a TaskSpec reachable from j.TaskSpecs.
// Its value is a copy of that TaskSpec's Dependencies, never nil (an empty
// slice when it has none). An error is returned if any reachable name is
// missing from cfg.Tasks.
func (j *JobSpec) GetTaskSpecDAG(cfg *TasksCfg) (map[string][]string, error) {
	dag := map[string][]string{}

	var add func(string) error
	add = func(taskName string) error {
		// Each task is recorded before its dependencies are walked, so it is
		// processed only once and a cycle cannot recurse forever.
		if _, seen := dag[taskName]; seen {
			return nil
		}
		spec, found := cfg.Tasks[taskName]
		if !found {
			return fmt.Errorf("No such task: %s", taskName)
		}
		deps := util.CopyStringSlice(spec.Dependencies)
		if deps == nil {
			deps = []string{}
		}
		dag[taskName] = deps
		for _, dep := range deps {
			if err := add(dep); err != nil {
				return err
			}
		}
		return nil
	}

	for _, root := range j.TaskSpecs {
		if err := add(root); err != nil {
			return nil, err
		}
	}
	return dag, nil
}

// findCycles validates the dependency graph formed by tasks and jobs. It
// returns an error if:
//   - a job or task depends on an unknown task;
//   - the task dependencies contain a cycle;
//   - some task is not reachable from any job.
//
// It runs a depth-first search rooted at each job's direct dependencies. A
// dependency that is still on the current DFS path indicates a cycle.
func findCycles(tasks map[string]*TaskSpec, jobs map[string]*JobSpec) error {
	// DFS state per task name. The zero value (unvisited) covers tasks that
	// have not been seen yet.
	const (
		unvisited = iota
		onPath
		done
	)
	state := make(map[string]int, len(tasks))

	var dfs func(string) error
	dfs = func(name string) error {
		state[name] = onPath
		for _, dep := range tasks[name].Dependencies {
			if _, known := tasks[dep]; !known {
				return fmt.Errorf("Task %q has unknown task %q as a dependency.", name, dep)
			}
			switch state[dep] {
			case unvisited:
				if err := dfs(dep); err != nil {
					return err
				}
			case onPath:
				return fmt.Errorf("Found a circular dependency involving %q and %q", name, dep)
			}
		}
		state[name] = done
		return nil
	}

	// Perform a DFS, starting at each of the jobs' dependencies.
	for jobName, job := range jobs {
		for _, root := range job.TaskSpecs {
			if _, known := tasks[root]; !known {
				return fmt.Errorf("Job %q has unknown task %q as a dependency.", jobName, root)
			}
			if state[root] == unvisited {
				if err := dfs(root); err != nil {
					return err
				}
			}
		}
	}

	// Any task still unvisited is not a dependency of any job.
	for name := range tasks {
		if state[name] == unvisited {
			return fmt.Errorf("Task %q is not reachable by any Job!", name)
		}
	}
	return nil
}

// CasSpec describes a set of task inputs in content-addressed storage.
// Validate accepts two forms:
//   - Root and Paths are both set. Digest is then not checked.
//   - Neither Root nor Paths is set, and Digest holds a digest string which
//     rbe.StringToDigest accepts.
//
// NOTE(review): presumably Paths are relative to Root and Excludes are
// patterns for files to omit from the upload — confirm with the uploader.
type CasSpec struct {
	Root     string   `json:"root,omitempty"`
	Paths    []string `json:"paths,omitempty"`
	Excludes []string `json:"excludes,omitempty"`
	Digest   string   `json:"digest,omitempty"`
}

// Copy returns a deep copy of the CasSpec. The string fields are copied by
// value. Paths and Excludes are copied with util.CopyStringSlice, so the
// result does not share those slices with s.
func (s *CasSpec) Copy() *CasSpec {
	dup := *s
	dup.Paths = util.CopyStringSlice(s.Paths)
	dup.Excludes = util.CopyStringSlice(s.Excludes)
	return &dup
}

// Validate returns an error if the CasSpec is invalid. Root and Paths must be
// given together. When neither is given, Digest must parse with
// rbe.StringToDigest. When both are given, Digest is not examined.
func (s *CasSpec) Validate() error {
	hasRoot := s.Root != ""
	hasPaths := len(s.Paths) > 0
	switch {
	case !hasRoot && !hasPaths:
		// Digest-only form.
		if _, _, err := rbe.StringToDigest(s.Digest); err != nil {
			return skerr.Wrap(err)
		}
		return nil
	case hasRoot != hasPaths:
		return skerr.Fmt("Root and Paths must be specified together.")
	default:
		return nil
	}
}

// Python3LinuxAMD64CIPDPackages returns the linux-amd64 Python CIPD packages,
// excluding any whose version starts with "version:2@2.7" (presumably the
// Python 2.7 interpreter). The result is a new slice.
func Python3LinuxAMD64CIPDPackages() []*cipd.Package {
	return withoutPython2Pkgs(cipd.PkgsPython[cipd.PlatformLinuxAmd64])
}

// Python3WindowsAMD64CIPDPackages returns the windows-amd64 Python CIPD
// packages, excluding any whose version starts with "version:2@2.7"
// (presumably the Python 2.7 interpreter). The result is a new slice.
func Python3WindowsAMD64CIPDPackages() []*cipd.Package {
	return withoutPython2Pkgs(cipd.PkgsPython[cipd.PlatformWindowsAmd64])
}

// withoutPython2Pkgs returns a new slice holding, in order, the packages
// from pkgs whose Version does not start with "version:2@2.7". pkgs itself
// is not modified. The result is nil if no package remains.
func withoutPython2Pkgs(pkgs []*cipd.Package) []*cipd.Package {
	var python3Pkgs []*cipd.Package
	for _, p := range pkgs {
		if !strings.HasPrefix(p.Version, "version:2@2.7") {
			python3Pkgs = append(python3Pkgs, p)
		}
	}
	return python3Pkgs
}
