|  | package specs | 
|  |  | 
|  | import ( | 
|  | "bytes" | 
|  | "encoding/json" | 
|  | "fmt" | 
|  | "os" | 
|  | "path" | 
|  | "strings" | 
|  | "time" | 
|  |  | 
|  | "go.skia.org/infra/go/cas/rbe" | 
|  | "go.skia.org/infra/go/cipd" | 
|  | "go.skia.org/infra/go/periodic" | 
|  | "go.skia.org/infra/go/skerr" | 
|  | "go.skia.org/infra/go/util" | 
|  | "go.skia.org/infra/task_scheduler/go/types" | 
|  | ) | 
|  |  | 
|  | const ( | 
|  | DEFAULT_TASK_SPEC_MAX_ATTEMPTS = types.DEFAULT_MAX_TASK_ATTEMPTS | 
|  |  | 
|  | // The default JobSpec.Priority, when unspecified or invalid. | 
|  | DEFAULT_JOB_SPEC_PRIORITY = 0.5 | 
|  |  | 
|  | TASKS_CFG_FILE = "infra/bots/tasks.json" | 
|  |  | 
|  | // Triggering configuration for jobs. | 
|  |  | 
|  | // By default, all jobs trigger on any branch for which they are | 
|  | // defined. | 
|  | TRIGGER_ANY_BRANCH = "" | 
|  | // Run this job on the main branch only, even if it is defined on others. | 
|  | TRIGGER_MASTER_ONLY = "master" | 
|  | TRIGGER_MAIN_ONLY   = "main" | 
|  | // Trigger this job every night. | 
|  | TRIGGER_NIGHTLY = periodic.TRIGGER_NIGHTLY | 
|  | // Don't trigger this job automatically. It will only be run when | 
|  | // explicitly triggered via a try job or a force trigger. | 
|  | TRIGGER_ON_DEMAND = "on demand" | 
|  | // Trigger this job weekly. | 
|  | TRIGGER_WEEKLY = periodic.TRIGGER_WEEKLY | 
|  |  | 
|  | VARIABLE_SYNTAX = "<(%s)" | 
|  |  | 
|  | VARIABLE_BUILDBUCKET_BUILD_ID = "BUILDBUCKET_BUILD_ID" | 
|  | VARIABLE_CODEREVIEW_SERVER    = "CODEREVIEW_SERVER" | 
|  | VARIABLE_ISSUE                = "ISSUE" | 
|  | VARIABLE_ISSUE_INT            = "ISSUE_INT" | 
|  | VARIABLE_ISSUE_SHORT          = "ISSUE_SHORT" | 
|  | VARIABLE_PATCH_REF            = "PATCH_REF" | 
|  | VARIABLE_PATCH_REPO           = "PATCH_REPO" | 
|  | VARIABLE_PATCH_STORAGE        = "PATCH_STORAGE" | 
|  | VARIABLE_PATCHSET             = "PATCHSET" | 
|  | VARIABLE_PATCHSET_INT         = "PATCHSET_INT" | 
|  | VARIABLE_REPO                 = "REPO" | 
|  | VARIABLE_REVISION             = "REVISION" | 
|  | VARIABLE_TASK_ID              = "TASK_ID" | 
|  | VARIABLE_TASK_NAME            = "TASK_NAME" | 
|  | ) | 
|  |  | 
|  | var ( | 
|  | // CIPD packages which may be used in tasks. | 
|  | CIPD_PKGS_GIT_LINUX_AMD64   = cipd.PkgsGit[cipd.PlatformLinuxAmd64] | 
|  | CIPD_PKGS_GIT_MAC_AMD64     = cipd.PkgsGit[cipd.PlatformMacAmd64] | 
|  | CIPD_PKGS_GIT_WINDOWS_AMD64 = cipd.PkgsGit[cipd.PlatformWindowsAmd64] | 
|  | CIPD_PKGS_GOLDCTL           = []*CipdPackage{cipd.MustGetPackage("skia/tools/goldctl/${platform}")} | 
|  | CIPD_PKGS_ISOLATE           = []*CipdPackage{ | 
|  | cipd.MustGetPackage("infra/tools/luci/isolate/${platform}"), | 
|  | } | 
|  | CIPD_PKGS_PYTHON_LINUX_AMD64   = cipd.PkgsPython[cipd.PlatformLinuxAmd64] | 
|  | CIPD_PKGS_PYTHON_WINDOWS_AMD64 = cipd.PkgsPython[cipd.PlatformWindowsAmd64] | 
|  |  | 
|  | CIPD_PKGS_KITCHEN_LINUX_AMD64 = append([]*CipdPackage{ | 
|  | cipd.MustGetPackage("infra/tools/luci/kitchen/${platform}"), | 
|  | cipd.MustGetPackage("infra/tools/luci-auth/${platform}"), | 
|  | }, CIPD_PKGS_PYTHON_LINUX_AMD64...) | 
|  |  | 
|  | // Variable placeholders; these are replaced with the actual value | 
|  | // at task triggering time. | 
|  | PLACEHOLDER_BUILDBUCKET_BUILD_ID = fmt.Sprintf(VARIABLE_SYNTAX, VARIABLE_BUILDBUCKET_BUILD_ID) | 
|  | PLACEHOLDER_CODEREVIEW_SERVER    = fmt.Sprintf(VARIABLE_SYNTAX, VARIABLE_CODEREVIEW_SERVER) | 
|  | PLACEHOLDER_ISSUE                = fmt.Sprintf(VARIABLE_SYNTAX, VARIABLE_ISSUE) | 
|  | PLACEHOLDER_ISSUE_INT            = fmt.Sprintf(VARIABLE_SYNTAX, VARIABLE_ISSUE_INT) | 
|  | PLACEHOLDER_ISSUE_SHORT          = fmt.Sprintf(VARIABLE_SYNTAX, VARIABLE_ISSUE_SHORT) | 
|  | PLACEHOLDER_PATCH_REF            = fmt.Sprintf(VARIABLE_SYNTAX, VARIABLE_PATCH_REF) | 
|  | PLACEHOLDER_PATCH_REPO           = fmt.Sprintf(VARIABLE_SYNTAX, VARIABLE_PATCH_REPO) | 
|  | PLACEHOLDER_PATCH_STORAGE        = fmt.Sprintf(VARIABLE_SYNTAX, VARIABLE_PATCH_STORAGE) | 
|  | PLACEHOLDER_PATCHSET             = fmt.Sprintf(VARIABLE_SYNTAX, VARIABLE_PATCHSET) | 
|  | PLACEHOLDER_PATCHSET_INT         = fmt.Sprintf(VARIABLE_SYNTAX, VARIABLE_PATCHSET_INT) | 
|  | PLACEHOLDER_REPO                 = fmt.Sprintf(VARIABLE_SYNTAX, VARIABLE_REPO) | 
|  | PLACEHOLDER_REVISION             = fmt.Sprintf(VARIABLE_SYNTAX, VARIABLE_REVISION) | 
|  | PLACEHOLDER_TASK_ID              = fmt.Sprintf(VARIABLE_SYNTAX, VARIABLE_TASK_ID) | 
|  | PLACEHOLDER_TASK_NAME            = fmt.Sprintf(VARIABLE_SYNTAX, VARIABLE_TASK_NAME) | 
|  | PLACEHOLDER_ISOLATED_OUTDIR      = "${ISOLATED_OUTDIR}" | 
|  |  | 
|  | PERIODIC_TRIGGERS = []string{TRIGGER_NIGHTLY, TRIGGER_WEEKLY} | 
|  | ) | 
|  |  | 
|  | // ErrorIsPermanent returns true if the given error cannot be recovered by | 
|  | // retrying. In this case, we will never be able to process the TasksCfg, | 
|  | // so we might as well cancel the jobs. | 
|  | // TODO(borenet): This should probably be split into two different | 
|  | // ErrorIsPermanent functions, in the syncer, and specs packages. | 
|  | func ErrorIsPermanent(err error) bool { | 
|  | err = skerr.Unwrap(err) | 
|  | return (strings.Contains(err.Error(), "error: Failed to merge in the changes.") || | 
|  | strings.Contains(err.Error(), "Failed to apply patch") || | 
|  | strings.Contains(err.Error(), "Failed to read tasks cfg: could not parse file:") || | 
|  | strings.Contains(err.Error(), "Invalid TasksCfg") || | 
|  | strings.Contains(err.Error(), "The \"gclient_gn_args_from\" value must be in recursedeps") || | 
|  | // This repo was moved, so attempts to sync it will always fail. | 
|  | strings.Contains(err.Error(), "https://skia.googlesource.com/third_party/libjpeg-turbo.git") || | 
|  | strings.Contains(err.Error(), "no such file or directory") || | 
|  | strings.Contains(err.Error(), "Not a valid object name")) | 
|  | } | 
|  |  | 
|  | // ParseTasksCfg parses the given task cfg file contents and returns the config. | 
|  | func ParseTasksCfg(contents string) (*TasksCfg, error) { | 
|  | var rv TasksCfg | 
|  | if err := json.Unmarshal([]byte(contents), &rv); err != nil { | 
|  | return nil, fmt.Errorf("Failed to read tasks cfg: could not parse file: %s\nContents:\n%s", err, string(contents)) | 
|  | } | 
|  | if err := rv.Validate(); err != nil { | 
|  | return nil, err | 
|  | } | 
|  |  | 
|  | return &rv, nil | 
|  | } | 
|  |  | 
|  | // EncoderTasksCfg writes the TasksCfg to a byte slice. | 
|  | func EncodeTasksCfg(cfg *TasksCfg) ([]byte, error) { | 
|  | // Encode the JSON config. | 
|  | enc, err := json.MarshalIndent(cfg, "", "  ") | 
|  | if err != nil { | 
|  | return nil, err | 
|  | } | 
|  | // The json package escapes HTML characters, which makes our output | 
|  | // much less readable. Replace the escape characters with the real | 
|  | // character. | 
|  | enc = bytes.Replace(enc, []byte("\\u003c"), []byte("<"), -1) | 
|  |  | 
|  | // Add a newline to the end of the file. Most text editors add one, so | 
|  | // adding one here enables manual editing of the file, even though we'd | 
|  | // rather that not happen. | 
|  | enc = append(enc, []byte("\n")...) | 
|  | return enc, nil | 
|  | } | 
|  |  | 
|  | // ReadTasksCfg reads the task cfg file from the given dir and returns it. | 
|  | func ReadTasksCfg(repoDir string) (*TasksCfg, error) { | 
|  | contents, err := os.ReadFile(path.Join(repoDir, TASKS_CFG_FILE)) | 
|  | if err != nil { | 
|  | // A nonexistent tasks.json file is valid; return an empty config. | 
|  | if os.IsNotExist(err) { | 
|  | return &TasksCfg{}, nil | 
|  | } | 
|  | return nil, fmt.Errorf("Failed to read tasks cfg: could not read file: %s", err) | 
|  | } | 
|  | return ParseTasksCfg(string(contents)) | 
|  | } | 
|  |  | 
|  | // WriteTasksCfg writes the task cfg to the given repo. | 
|  | func WriteTasksCfg(cfg *TasksCfg, repoDir string) error { | 
|  | enc, err := EncodeTasksCfg(cfg) | 
|  | if err != nil { | 
|  | return err | 
|  | } | 
|  | return os.WriteFile(path.Join(repoDir, TASKS_CFG_FILE), enc, os.ModePerm) | 
|  | } | 
|  |  | 
|  | // CommitQueueJobConfig describes how a job should run on the Commit Queue. | 
|  | type CommitQueueJobConfig struct { | 
|  | // Run on the Commit Queue only if the change contains modifications to the | 
|  | // following location regexes. | 
|  | LocationRegexes []string `json:"location_regexes,omitempty"` | 
|  | // If this flag is true then the job is marked as being experimental. It will | 
|  | // be triggered on all CLs but their outcome will not affect the Commit Queue. | 
|  | // i.e. the experimental job could fail but if all other non-experimental job | 
|  | // have succeeded then the Commit Queue will succeed. | 
|  | Experimental bool `json:"experimental,omitempty"` | 
|  | } | 
|  |  | 
|  | // Copy returns a deep copy of the CommitQueueJobConfig. | 
|  | func (c *CommitQueueJobConfig) Copy() *CommitQueueJobConfig { | 
|  | locationRegexes := util.CopyStringSlice(c.LocationRegexes) | 
|  | return &CommitQueueJobConfig{ | 
|  | LocationRegexes: locationRegexes, | 
|  | Experimental:    c.Experimental, | 
|  | } | 
|  | } | 
|  |  | 
|  | // TasksCfg is a struct which describes all Swarming tasks for a repo at a | 
|  | // particular commit. | 
|  | type TasksCfg struct { | 
|  | // Jobs is a map whose keys are JobSpec names and values are JobSpecs | 
|  | // which describe sets of tasks to run. | 
|  | Jobs map[string]*JobSpec `json:"jobs"` | 
|  |  | 
|  | // Tasks is a map whose keys are TaskSpec names and values are TaskSpecs | 
|  | // detailing the Swarming tasks which may be run. | 
|  | Tasks map[string]*TaskSpec `json:"tasks"` | 
|  |  | 
|  | // CasSpecs is a map of named specifications for content-addressed inputs to | 
|  | // tasks. | 
|  | CasSpecs map[string]*CasSpec `json:"casSpecs,omitempty"` | 
|  |  | 
|  | // CommitQueue is a map whose keys are JobSpec names and values are | 
|  | // CommitQueueJobConfig. All specified jobs will run on the Commit Queue. | 
|  | CommitQueue map[string]*CommitQueueJobConfig `json:"commit_queue,omitempty"` | 
|  | } | 
|  |  | 
|  | // Copy returns a deep copy of the TasksCfg. | 
|  | func (c *TasksCfg) Copy() *TasksCfg { | 
|  | jobs := make(map[string]*JobSpec, len(c.Jobs)) | 
|  | for name, job := range c.Jobs { | 
|  | jobs[name] = job.Copy() | 
|  | } | 
|  | tasks := make(map[string]*TaskSpec, len(c.Tasks)) | 
|  | for name, task := range c.Tasks { | 
|  | tasks[name] = task.Copy() | 
|  | } | 
|  | var casSpecs map[string]*CasSpec | 
|  | if c.CasSpecs != nil { | 
|  | casSpecs = make(map[string]*CasSpec, len(c.CasSpecs)) | 
|  | for name, spec := range c.CasSpecs { | 
|  | casSpecs[name] = spec.Copy() | 
|  | } | 
|  | } | 
|  | var commitQueue map[string]*CommitQueueJobConfig | 
|  | if len(c.CommitQueue) > 0 { | 
|  | commitQueue = make(map[string]*CommitQueueJobConfig, len(c.CommitQueue)) | 
|  | for k, v := range c.CommitQueue { | 
|  | commitQueue[k] = v.Copy() | 
|  | } | 
|  | } | 
|  | return &TasksCfg{ | 
|  | Jobs:        jobs, | 
|  | Tasks:       tasks, | 
|  | CasSpecs:    casSpecs, | 
|  | CommitQueue: commitQueue, | 
|  | } | 
|  | } | 
|  |  | 
|  | // Validate returns an error if the TasksCfg is not valid. | 
|  | func (c *TasksCfg) Validate() error { | 
|  | // Validate all tasks. | 
|  | for name, t := range c.Tasks { | 
|  | if err := t.Validate(c); err != nil { | 
|  | return skerr.Fmt("Invalid TasksCfg: %s", err) | 
|  | } | 
|  |  | 
|  | // Ensure that the CAS inputs to the task exist. | 
|  | if t.CasSpec == "" { | 
|  | return skerr.Fmt("Invalid TasksCfg: Task %q has no CasSpec.", name) | 
|  | } | 
|  | if _, ok := c.CasSpecs[t.CasSpec]; !ok { | 
|  | return skerr.Fmt("Invalid TasksCfg: Task %q references non-existent CasSpec %q", name, t.CasSpec) | 
|  | } | 
|  | } | 
|  |  | 
|  | // Validate all jobs. | 
|  | for _, j := range c.Jobs { | 
|  | if err := j.Validate(); err != nil { | 
|  | return skerr.Fmt("Invalid TasksCfg: %s", err) | 
|  | } | 
|  | } | 
|  | // Ensure that the DAG is valid. | 
|  | if err := findCycles(c.Tasks, c.Jobs); err != nil { | 
|  | return skerr.Fmt("Invalid TasksCfg: %s", err) | 
|  | } | 
|  |  | 
|  | // Ensure that CQ job names are valid. | 
|  | for cqJob := range c.CommitQueue { | 
|  | if _, ok := c.Jobs[cqJob]; !ok { | 
|  | return skerr.Fmt("Unknown job %q in CQ config", cqJob) | 
|  | } | 
|  | } | 
|  |  | 
|  | return nil | 
|  | } | 
|  |  | 
|  | // TaskSpec is a struct which describes a Swarming task to run. | 
|  | // Be sure to add any new fields to the Copy() method. | 
|  | type TaskSpec struct { | 
|  | // Caches are named Swarming caches which should be used for this task. | 
|  | Caches []*Cache `json:"caches,omitempty"` | 
|  |  | 
|  | // CasSpec references a named input to the task from content-addressed | 
|  | // storage. | 
|  | CasSpec string `json:"casSpec,omitempty"` | 
|  |  | 
|  | // CipdPackages are CIPD packages which should be installed for the task. | 
|  | CipdPackages []*CipdPackage `json:"cipd_packages,omitempty"` | 
|  |  | 
|  | // Command is the command to run in the Swarming task. | 
|  | Command []string `json:"command,omitempty"` | 
|  |  | 
|  | // Dependencies are names of other TaskSpecs for tasks which need to run | 
|  | // before this task. | 
|  | Dependencies []string `json:"dependencies,omitempty"` | 
|  |  | 
|  | // Dimensions are Swarming bot dimensions which describe the type of bot | 
|  | // which may run this task. | 
|  | Dimensions []string `json:"dimensions"` | 
|  |  | 
|  | // Environment is a set of environment variables needed by the task. | 
|  | Environment map[string]string `json:"environment,omitempty"` | 
|  |  | 
|  | // EnvPrefixes are prefixes to add to environment variables for the task, | 
|  | // for example, adding directories to PATH. Keys are environment variable | 
|  | // names and values are multiple values to add for the variable. | 
|  | EnvPrefixes map[string][]string `json:"env_prefixes,omitempty"` | 
|  |  | 
|  | // ExecutionTimeout is the maximum amount of time the task is allowed | 
|  | // to take. | 
|  | ExecutionTimeout time.Duration `json:"execution_timeout_ns,omitempty"` | 
|  |  | 
|  | // Expiration is how long the task may remain in the pending state | 
|  | // before it is abandoned. | 
|  | Expiration time.Duration `json:"expiration_ns,omitempty"` | 
|  |  | 
|  | // ExtraArgs are extra command-line arguments to pass to the task. | 
|  | // DEPRECATED: These are now ignored. | 
|  | ExtraArgs []string `json:"extra_args,omitempty"` | 
|  |  | 
|  | // ExtraTags are extra tags to add to the Swarming task. | 
|  | ExtraTags map[string]string `json:"extra_tags,omitempty"` | 
|  |  | 
|  | // Idempotent indicates that triggering this task with the same | 
|  | // parameters as previously triggered has no side effect and thus the | 
|  | // task may be de-duplicated. | 
|  | Idempotent bool `json:"idempotent,omitempty"` | 
|  |  | 
|  | // IoTimeout is the maximum amount of time which the task may take to | 
|  | // communicate with the server. | 
|  | IoTimeout time.Duration `json:"io_timeout_ns,omitempty"` | 
|  |  | 
|  | // MaxAttempts is the maximum number of attempts for this TaskSpec. If | 
|  | // zero, DEFAULT_TASK_SPEC_MAX_ATTEMPTS is used. | 
|  | MaxAttempts int `json:"max_attempts,omitempty"` | 
|  |  | 
|  | // Outputs are files and/or directories to use as outputs for the task. | 
|  | // Paths are relative to the task workdir. No error occurs if any of | 
|  | // these is missing. | 
|  | Outputs []string `json:"outputs,omitempty"` | 
|  |  | 
|  | // This field is ignored. | 
|  | Priority float64 `json:"priority,omitempty"` | 
|  |  | 
|  | // ServiceAccount indicates the Swarming service account to use for the | 
|  | // task. If not specified, we will attempt to choose a suitable default. | 
|  | ServiceAccount string `json:"service_account,omitempty"` | 
|  |  | 
|  | // TaskExecutor specifies what type of task executor should handle the task. | 
|  | TaskExecutor string `json:"task_executor,omitempty"` | 
|  | } | 
|  |  | 
|  | // Validate ensures that the TaskSpec is defined properly. | 
|  | func (t *TaskSpec) Validate(cfg *TasksCfg) error { | 
|  | // Ensure that CIPD packages are specified properly. | 
|  | for _, p := range t.CipdPackages { | 
|  | if p.Name == "" || p.Path == "" { | 
|  | return fmt.Errorf("CIPD packages must have a name, path, and version.") | 
|  | } | 
|  | } | 
|  |  | 
|  | if len(t.Dimensions) == 0 { | 
|  | return fmt.Errorf("Task must have dimensions") | 
|  | } | 
|  |  | 
|  | // Ensure that the dimensions are specified properly. | 
|  | for _, d := range t.Dimensions { | 
|  | split := strings.SplitN(d, ":", 2) | 
|  | if len(split) != 2 { | 
|  | return fmt.Errorf("Dimension %q does not contain a colon!", d) | 
|  | } | 
|  | } | 
|  |  | 
|  | if !util.In(t.TaskExecutor, types.ValidTaskExecutors) { | 
|  | return fmt.Errorf("Invalid task executor %q; must be one of: %v", t.TaskExecutor, types.ValidTaskExecutors) | 
|  | } | 
|  |  | 
|  | return nil | 
|  | } | 
|  |  | 
|  | // Copy returns a copy of the TaskSpec. | 
|  | func (t *TaskSpec) Copy() *TaskSpec { | 
|  | var caches []*Cache | 
|  | if len(t.Caches) > 0 { | 
|  | cachesDup := make([]Cache, len(t.Caches)) | 
|  | caches = make([]*Cache, 0, len(t.Caches)) | 
|  | for i, c := range t.Caches { | 
|  | cachesDup[i] = *c | 
|  | caches = append(caches, &cachesDup[i]) | 
|  | } | 
|  | } | 
|  | var cipdPackages []*CipdPackage | 
|  | if len(t.CipdPackages) > 0 { | 
|  | cipdPackages = make([]*CipdPackage, 0, len(t.CipdPackages)) | 
|  | pkgs := make([]CipdPackage, len(t.CipdPackages)) | 
|  | for i, p := range t.CipdPackages { | 
|  | pkgs[i] = *p | 
|  | cipdPackages = append(cipdPackages, &pkgs[i]) | 
|  | } | 
|  | } | 
|  | cmd := util.CopyStringSlice(t.Command) | 
|  | deps := util.CopyStringSlice(t.Dependencies) | 
|  | dims := util.CopyStringSlice(t.Dimensions) | 
|  | environment := util.CopyStringMap(t.Environment) | 
|  | var envPrefixes map[string][]string | 
|  | if len(t.EnvPrefixes) > 0 { | 
|  | envPrefixes = make(map[string][]string, len(t.EnvPrefixes)) | 
|  | for k, v := range t.EnvPrefixes { | 
|  | envPrefixes[k] = util.CopyStringSlice(v) | 
|  | } | 
|  | } | 
|  | extraArgs := util.CopyStringSlice(t.ExtraArgs) | 
|  | extraTags := util.CopyStringMap(t.ExtraTags) | 
|  | outputs := util.CopyStringSlice(t.Outputs) | 
|  | return &TaskSpec{ | 
|  | Caches:           caches, | 
|  | CasSpec:          t.CasSpec, | 
|  | CipdPackages:     cipdPackages, | 
|  | Command:          cmd, | 
|  | Dependencies:     deps, | 
|  | Dimensions:       dims, | 
|  | Environment:      environment, | 
|  | EnvPrefixes:      envPrefixes, | 
|  | ExecutionTimeout: t.ExecutionTimeout, | 
|  | Expiration:       t.Expiration, | 
|  | ExtraArgs:        extraArgs, | 
|  | ExtraTags:        extraTags, | 
|  | Idempotent:       t.Idempotent, | 
|  | IoTimeout:        t.IoTimeout, | 
|  | MaxAttempts:      t.MaxAttempts, | 
|  | Outputs:          outputs, | 
|  | Priority:         t.Priority, | 
|  | ServiceAccount:   t.ServiceAccount, | 
|  | TaskExecutor:     t.TaskExecutor, | 
|  | } | 
|  | } | 
|  |  | 
|  | // Cache is a struct representing a named cache which is used by a task. | 
|  | type Cache struct { | 
|  | Name string `json:"name"` | 
|  | Path string `json:"path"` | 
|  | } | 
|  |  | 
|  | // CipdPackage is a struct representing a CIPD package which needs to be | 
|  | // installed on a bot for a particular task. | 
|  | // TODO(borenet): Are there any downsides to using an alias rather than a new | 
|  | // type here? | 
|  | type CipdPackage = cipd.Package | 
|  |  | 
|  | // JobSpec is a struct which describes a set of TaskSpecs to run as part of a | 
|  | // larger effort. | 
|  | type JobSpec struct { | 
|  | // Priority indicates the relative priority of the job, with 0 < p <= 1, | 
|  | // where higher values result in scheduling the job's tasks sooner. If | 
|  | // unspecified or outside this range, DEFAULT_JOB_SPEC_PRIORITY is used. | 
|  | // Each task derives its priority from the set of jobs that depend upon | 
|  | // it. A task's priority is | 
|  | //   1 - (1-<job1 priority>)(1-<job2 priority>)...(1-<jobN priority>) | 
|  | // A task at HEAD with a priority of 1 and a blamelist of 1 commit has | 
|  | // approximately the same score as a task at HEAD with a priority of 0.57 | 
|  | // and a blamelist of 2 commits. | 
|  | // A backfill task with a priority of 1 that bisects a blamelist of 2 | 
|  | // commits has the same score as another backfill task at the same | 
|  | // commit with a priority of 0.4 that bisects a blamelist of 4 commits. | 
|  | Priority float64 `json:"priority,omitempty"` | 
|  | // The names of TaskSpecs that are direct dependencies of this JobSpec. | 
|  | TaskSpecs []string `json:"tasks"` | 
|  | // One of the TRIGGER_* constants; see documentation above. | 
|  | Trigger string `json:"trigger,omitempty"` | 
|  | } | 
|  |  | 
|  | // Validate returns an error if the JobSpec is not valid. | 
|  | func (j *JobSpec) Validate() error { | 
|  | // We can't validate j.TaskSpecs here because we don't know which are | 
|  | // defined.  Therefore, that check needs to occur at a higher level. | 
|  |  | 
|  | switch j.Trigger { | 
|  | case TRIGGER_ANY_BRANCH, TRIGGER_MASTER_ONLY, TRIGGER_MAIN_ONLY, | 
|  | TRIGGER_NIGHTLY, TRIGGER_ON_DEMAND, TRIGGER_WEEKLY: | 
|  | break | 
|  | default: | 
|  | return fmt.Errorf("Invalid job trigger %q", j.Trigger) | 
|  | } | 
|  | return nil | 
|  | } | 
|  |  | 
|  | // Copy returns a copy of the JobSpec. | 
|  | func (j *JobSpec) Copy() *JobSpec { | 
|  | var taskSpecs []string | 
|  | if j.TaskSpecs != nil { | 
|  | taskSpecs = make([]string, len(j.TaskSpecs)) | 
|  | copy(taskSpecs, j.TaskSpecs) | 
|  | } | 
|  | return &JobSpec{ | 
|  | Priority:  j.Priority, | 
|  | TaskSpecs: taskSpecs, | 
|  | Trigger:   j.Trigger, | 
|  | } | 
|  | } | 
|  |  | 
|  | // GetTaskSpecDAG returns a map describing all of the dependencies of the | 
|  | // JobSpec. Its keys are TaskSpec names and values are TaskSpec names upon | 
|  | // which the keys depend. | 
|  | func (j *JobSpec) GetTaskSpecDAG(cfg *TasksCfg) (map[string][]string, error) { | 
|  | rv := map[string][]string{} | 
|  | var visit func(string) error | 
|  | visit = func(name string) error { | 
|  | if _, ok := rv[name]; ok { | 
|  | return nil | 
|  | } | 
|  | spec, ok := cfg.Tasks[name] | 
|  | if !ok { | 
|  | return fmt.Errorf("No such task: %s", name) | 
|  | } | 
|  | deps := util.CopyStringSlice(spec.Dependencies) | 
|  | if deps == nil { | 
|  | deps = []string{} | 
|  | } | 
|  | rv[name] = deps | 
|  | for _, t := range deps { | 
|  | if err := visit(t); err != nil { | 
|  | return err | 
|  | } | 
|  | } | 
|  | return nil | 
|  | } | 
|  |  | 
|  | for _, t := range j.TaskSpecs { | 
|  | if err := visit(t); err != nil { | 
|  | return nil, err | 
|  | } | 
|  | } | 
|  | return rv, nil | 
|  | } | 
|  |  | 
|  | // findCycles searches for cyclical dependencies in the task specs and returns | 
|  | // an error if any are found. Also ensures that all task specs are reachable | 
|  | // from at least one job spec and that all jobs specs' dependencies are valid. | 
|  | func findCycles(tasks map[string]*TaskSpec, jobs map[string]*JobSpec) error { | 
|  | // Create vertex objects with metadata for the depth-first search. | 
|  | type vertex struct { | 
|  | active  bool | 
|  | name    string | 
|  | ts      *TaskSpec | 
|  | visited bool | 
|  | } | 
|  | vertices := make(map[string]*vertex, len(tasks)) | 
|  | for name, t := range tasks { | 
|  | vertices[name] = &vertex{ | 
|  | active:  false, | 
|  | name:    name, | 
|  | ts:      t, | 
|  | visited: false, | 
|  | } | 
|  | } | 
|  |  | 
|  | // visit performs a depth-first search of the graph, starting at v. | 
|  | var visit func(*vertex) error | 
|  | visit = func(v *vertex) error { | 
|  | v.active = true | 
|  | v.visited = true | 
|  | for _, dep := range v.ts.Dependencies { | 
|  | e := vertices[dep] | 
|  | if e == nil { | 
|  | return fmt.Errorf("Task %q has unknown task %q as a dependency.", v.name, dep) | 
|  | } | 
|  | if !e.visited { | 
|  | if err := visit(e); err != nil { | 
|  | return err | 
|  | } | 
|  | } else if e.active { | 
|  | return fmt.Errorf("Found a circular dependency involving %q and %q", v.name, e.name) | 
|  | } | 
|  | } | 
|  | v.active = false | 
|  | return nil | 
|  | } | 
|  |  | 
|  | // Perform a DFS, starting at each of the jobs' dependencies. | 
|  | for jobName, j := range jobs { | 
|  | for _, d := range j.TaskSpecs { | 
|  | v, ok := vertices[d] | 
|  | if !ok { | 
|  | return fmt.Errorf("Job %q has unknown task %q as a dependency.", jobName, d) | 
|  | } | 
|  | if !v.visited { | 
|  | if err := visit(v); err != nil { | 
|  | return err | 
|  | } | 
|  | } | 
|  | } | 
|  | } | 
|  |  | 
|  | // If any vertices have not been visited, then there are tasks which | 
|  | // no job has as a dependency. Report an error. | 
|  | for _, v := range vertices { | 
|  | if !v.visited { | 
|  | return fmt.Errorf("Task %q is not reachable by any Job!", v.name) | 
|  | } | 
|  | } | 
|  | return nil | 
|  | } | 
|  |  | 
|  | // CasSpec describes a set of task inputs in content-addressed storage. | 
|  | type CasSpec struct { | 
|  | Root     string   `json:"root,omitempty"` | 
|  | Paths    []string `json:"paths,omitempty"` | 
|  | Excludes []string `json:"excludes,omitempty"` | 
|  | Digest   string   `json:"digest,omitempty"` | 
|  | } | 
|  |  | 
|  | // Copy returns a deep copy of the CasSpec. | 
|  | func (s *CasSpec) Copy() *CasSpec { | 
|  | return &CasSpec{ | 
|  | Root:     s.Root, | 
|  | Paths:    util.CopyStringSlice(s.Paths), | 
|  | Excludes: util.CopyStringSlice(s.Excludes), | 
|  | Digest:   s.Digest, | 
|  | } | 
|  | } | 
|  |  | 
|  | // Validate returns an error if the CasSpec is invalid. | 
|  | func (s *CasSpec) Validate() error { | 
|  | if s.Root == "" && len(s.Paths) == 0 { | 
|  | if _, _, err := rbe.StringToDigest(s.Digest); err != nil { | 
|  | return skerr.Wrap(err) | 
|  | } | 
|  | return nil | 
|  | } | 
|  | if (s.Root != "") != (len(s.Paths) > 0) { | 
|  | return skerr.Fmt("Root and Paths must be specified together.") | 
|  | } | 
|  | return nil | 
|  | } | 
|  |  | 
|  | func Python3LinuxAMD64CIPDPackages() []*cipd.Package { | 
|  | var python3Pkgs []*cipd.Package | 
|  | for _, p := range cipd.PkgsPython[cipd.PlatformLinuxAmd64] { | 
|  | if strings.HasPrefix(p.Version, "version:2@2.7") { | 
|  | continue | 
|  | } | 
|  | python3Pkgs = append(python3Pkgs, p) | 
|  | } | 
|  | return python3Pkgs | 
|  | } | 
|  |  | 
|  | func Python3WindowsAMD64CIPDPackages() []*cipd.Package { | 
|  | var python3Pkgs []*cipd.Package | 
|  | for _, p := range cipd.PkgsPython[cipd.PlatformWindowsAmd64] { | 
|  | if strings.HasPrefix(p.Version, "version:2@2.7") { | 
|  | continue | 
|  | } | 
|  | python3Pkgs = append(python3Pkgs, p) | 
|  | } | 
|  | return python3Pkgs | 
|  | } |