package scheduling

import (
"bytes"
"encoding/base64"
"encoding/gob"
"fmt"
"path"
"sort"
"strconv"
"strings"
swarming_api "go.chromium.org/luci/common/api/swarming/swarming/v1"
"go.skia.org/infra/go/common"
"go.skia.org/infra/go/isolate"
"go.skia.org/infra/go/swarming"
"go.skia.org/infra/go/util"
"go.skia.org/infra/task_scheduler/go/db/cache"
"go.skia.org/infra/task_scheduler/go/specs"
"go.skia.org/infra/task_scheduler/go/types"
)
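
// jobSet returns a set of the given Jobs.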
func jobSet(jobs ...*types.Job) map[*types.Job]struct{} {
rv := make(map[*types.Job]struct{}, len(jobs))
for _, j := range jobs {
rv[j] = struct{}{}
}
return rv
}

// taskCandidate is a struct used for determining which tasks to schedule.
type taskCandidate struct {
Attempt int `json:"attempt"`
// NB: Because multiple Jobs may share a Task, the BuildbucketBuildId
// could be inherited from any matching Job. Therefore, this should be
// used for non-critical, informational purposes only.
BuildbucketBuildId int64 `json:"buildbucketBuildId"`
Commits []string `json:"commits"`
IsolatedInput string `json:"isolatedInput"`
IsolatedHashes []string `json:"isolatedHashes"`
Jobs map[*types.Job]struct{} `json:"jobs"`
ParentTaskIds []string `json:"parentTaskIds"`
RetryOf string `json:"retryOf"`
Score float64 `json:"score"`
StealingFromId string `json:"stealingFromId"`
types.TaskKey
TaskSpec *specs.TaskSpec `json:"taskSpec"`
}

// Copy returns a copy of the taskCandidate.
func (c *taskCandidate) Copy() *taskCandidate {
jobs := make(map[*types.Job]struct{}, len(c.Jobs))
	for j := range c.Jobs {
jobs[j] = struct{}{}
}
return &taskCandidate{
Attempt: c.Attempt,
BuildbucketBuildId: c.BuildbucketBuildId,
Commits: util.CopyStringSlice(c.Commits),
IsolatedInput: c.IsolatedInput,
IsolatedHashes: util.CopyStringSlice(c.IsolatedHashes),
Jobs: jobs,
ParentTaskIds: util.CopyStringSlice(c.ParentTaskIds),
RetryOf: c.RetryOf,
Score: c.Score,
StealingFromId: c.StealingFromId,
TaskKey: c.TaskKey.Copy(),
TaskSpec: c.TaskSpec.Copy(),
}
}

// MakeId generates a string ID for the taskCandidate by gob- and
// base64-encoding its TaskKey; parseId performs the inverse.
func (c *taskCandidate) MakeId() string {
var buf bytes.Buffer
if err := gob.NewEncoder(&buf).Encode(&c.TaskKey); err != nil {
panic(fmt.Sprintf("Failed to GOB encode TaskKey: %s", err))
}
b64 := base64.StdEncoding.EncodeToString(buf.Bytes())
return fmt.Sprintf("taskCandidate|%s", b64)
}

// parseId extracts the types.TaskKey from an ID generated by MakeId.
func parseId(id string) (types.TaskKey, error) {
var rv types.TaskKey
split := strings.Split(id, "|")
if len(split) != 2 {
return rv, fmt.Errorf("Invalid ID, not enough parts: %q", id)
}
if split[0] != "taskCandidate" {
return rv, fmt.Errorf("Invalid ID, no 'taskCandidate' prefix: %q", id)
}
b, err := base64.StdEncoding.DecodeString(split[1])
if err != nil {
return rv, fmt.Errorf("Failed to base64 decode ID: %s", err)
}
if err := gob.NewDecoder(bytes.NewBuffer(b)).Decode(&rv); err != nil {
return rv, fmt.Errorf("Failed to GOB decode ID: %s", err)
}
return rv, nil
}

// MakeTask instantiates a types.Task from the taskCandidate.
func (c *taskCandidate) MakeTask() *types.Task {
commits := make([]string, len(c.Commits))
copy(commits, c.Commits)
jobs := make([]string, 0, len(c.Jobs))
for j := range c.Jobs {
jobs = append(jobs, j.Id)
}
sort.Strings(jobs)
parentTaskIds := make([]string, len(c.ParentTaskIds))
copy(parentTaskIds, c.ParentTaskIds)
maxAttempts := c.TaskSpec.MaxAttempts
if maxAttempts == 0 {
maxAttempts = specs.DEFAULT_TASK_SPEC_MAX_ATTEMPTS
}
return &types.Task{
Attempt: c.Attempt,
Commits: commits,
Id: "", // Filled in when the task is inserted into the DB.
Jobs: jobs,
MaxAttempts: maxAttempts,
ParentTaskIds: parentTaskIds,
RetryOf: c.RetryOf,
TaskKey: c.TaskKey.Copy(),
}
}

// MakeIsolateTask creates an isolate.Task from this taskCandidate.
func (c *taskCandidate) MakeIsolateTask(infraBotsDir, baseDir string) *isolate.Task {
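	// Default to Linux unless an "os" dimension specifies otherwise.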
os := "linux"
for _, d := range c.TaskSpec.Dimensions {
if strings.HasPrefix(d, "os:") {
os = d[len("os:"):]
break
}
}
return &isolate.Task{
BaseDir: baseDir,
Blacklist: isolate.DEFAULT_BLACKLIST,
Deps: c.IsolatedHashes,
IsolateFile: path.Join(infraBotsDir, c.TaskSpec.Isolate),
OsType: os,
}
}

// getPatchStorage returns "gerrit" if a code review server URL is set, or "" if not.
func getPatchStorage(server string) string {
if server == "" {
return ""
}
return "gerrit"
}

// replaceVars replaces variable names with their values in a given string.
func replaceVars(c *taskCandidate, s, taskId string) string {
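	// Shorten the issue ID to at most its last ISSUE_SHORT_LENGTH characters.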
issueShort := ""
if len(c.Issue) < types.ISSUE_SHORT_LENGTH {
issueShort = c.Issue
} else {
issueShort = c.Issue[len(c.Issue)-types.ISSUE_SHORT_LENGTH:]
}
replacements := map[string]string{
specs.VARIABLE_BUILDBUCKET_BUILD_ID: strconv.FormatInt(c.BuildbucketBuildId, 10),
specs.VARIABLE_CODEREVIEW_SERVER: c.Server,
specs.VARIABLE_ISSUE: c.Issue,
specs.VARIABLE_ISSUE_SHORT: issueShort,
specs.VARIABLE_PATCH_REF: c.RepoState.GetPatchRef(),
specs.VARIABLE_PATCH_REPO: c.PatchRepo,
specs.VARIABLE_PATCH_STORAGE: getPatchStorage(c.Server),
specs.VARIABLE_PATCHSET: c.Patchset,
specs.VARIABLE_REPO: c.Repo,
specs.VARIABLE_REVISION: c.Revision,
specs.VARIABLE_TASK_ID: taskId,
specs.VARIABLE_TASK_NAME: c.Name,
}
for k, v := range replacements {
s = strings.Replace(s, fmt.Sprintf(specs.VARIABLE_SYNTAX, k), v, -1)
}
return s
}

// MakeTaskRequest creates a SwarmingRpcsNewTaskRequest object from the taskCandidate.
func (c *taskCandidate) MakeTaskRequest(id, isolateServer, pubSubTopic string) (*swarming_api.SwarmingRpcsNewTaskRequest, error) {
var caches []*swarming_api.SwarmingRpcsCacheEntry
if len(c.TaskSpec.Caches) > 0 {
caches = make([]*swarming_api.SwarmingRpcsCacheEntry, 0, len(c.TaskSpec.Caches))
for _, cache := range c.TaskSpec.Caches {
caches = append(caches, &swarming_api.SwarmingRpcsCacheEntry{
Name: cache.Name,
Path: cache.Path,
})
}
}
var cipdInput *swarming_api.SwarmingRpcsCipdInput
if len(c.TaskSpec.CipdPackages) > 0 {
cipdInput = &swarming_api.SwarmingRpcsCipdInput{
Packages: make([]*swarming_api.SwarmingRpcsCipdPackage, 0, len(c.TaskSpec.CipdPackages)),
}
for _, p := range c.TaskSpec.CipdPackages {
cipdInput.Packages = append(cipdInput.Packages, &swarming_api.SwarmingRpcsCipdPackage{
PackageName: p.Name,
Path: p.Path,
Version: p.Version,
})
}
}
dims := make([]*swarming_api.SwarmingRpcsStringPair, 0, len(c.TaskSpec.Dimensions))
dimsMap := make(map[string]string, len(c.TaskSpec.Dimensions))
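	// Each dimension is assumed to be of the form "key:value"; SplitN allows
	// the value itself to contain ":".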
for _, d := range c.TaskSpec.Dimensions {
split := strings.SplitN(d, ":", 2)
key := split[0]
val := split[1]
dims = append(dims, &swarming_api.SwarmingRpcsStringPair{
Key: key,
Value: val,
})
dimsMap[key] = val
}
var env []*swarming_api.SwarmingRpcsStringPair
if len(c.TaskSpec.Environment) > 0 {
env = make([]*swarming_api.SwarmingRpcsStringPair, 0, len(c.TaskSpec.Environment))
for k, v := range c.TaskSpec.Environment {
env = append(env, &swarming_api.SwarmingRpcsStringPair{
Key: k,
Value: v,
})
}
}
var envPrefixes []*swarming_api.SwarmingRpcsStringListPair
if len(c.TaskSpec.EnvPrefixes) > 0 {
envPrefixes = make([]*swarming_api.SwarmingRpcsStringListPair, 0, len(c.TaskSpec.EnvPrefixes))
for k, v := range c.TaskSpec.EnvPrefixes {
envPrefixes = append(envPrefixes, &swarming_api.SwarmingRpcsStringListPair{
Key: k,
Value: util.CopyStringSlice(v),
})
}
}
cmd := make([]string, 0, len(c.TaskSpec.Command))
for _, arg := range c.TaskSpec.Command {
cmd = append(cmd, replaceVars(c, arg, id))
}
extraArgs := make([]string, 0, len(c.TaskSpec.ExtraArgs))
for _, arg := range c.TaskSpec.ExtraArgs {
extraArgs = append(extraArgs, replaceVars(c, arg, id))
}
extraTags := make(map[string]string, len(c.TaskSpec.ExtraTags))
for k, v := range c.TaskSpec.ExtraTags {
extraTags[k] = replaceVars(c, v, id)
}
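	// Fall back to the recommended Swarming values for any timeouts left
	// unset in the TaskSpec.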
	expirationSecs := int64(c.TaskSpec.Expiration.Seconds())
	if expirationSecs == 0 {
		expirationSecs = int64(swarming.RECOMMENDED_EXPIRATION.Seconds())
	}
	executionTimeoutSecs := int64(c.TaskSpec.ExecutionTimeout.Seconds())
	if executionTimeoutSecs == 0 {
		executionTimeoutSecs = int64(swarming.RECOMMENDED_HARD_TIMEOUT.Seconds())
	}
	ioTimeoutSecs := int64(c.TaskSpec.IoTimeout.Seconds())
	if ioTimeoutSecs == 0 {
		ioTimeoutSecs = int64(swarming.RECOMMENDED_IO_TIMEOUT.Seconds())
	}
outputs := util.CopyStringSlice(c.TaskSpec.Outputs)
return &swarming_api.SwarmingRpcsNewTaskRequest{
ExpirationSecs: expirationSecs,
Name: c.Name,
Priority: swarming.RECOMMENDED_PRIORITY,
Properties: &swarming_api.SwarmingRpcsTaskProperties{
Caches: caches,
CipdInput: cipdInput,
Command: cmd,
Dimensions: dims,
Env: env,
EnvPrefixes: envPrefixes,
ExecutionTimeoutSecs: executionTimeoutSecs,
ExtraArgs: extraArgs,
Idempotent: false,
InputsRef: &swarming_api.SwarmingRpcsFilesRef{
Isolated: c.IsolatedInput,
Isolatedserver: isolateServer,
Namespace: isolate.DEFAULT_NAMESPACE,
},
IoTimeoutSecs: ioTimeoutSecs,
Outputs: outputs,
},
PubsubTopic: fmt.Sprintf(swarming.PUBSUB_FULLY_QUALIFIED_TOPIC_TMPL, common.PROJECT_ID, pubSubTopic),
PubsubUserdata: id,
ServiceAccount: c.TaskSpec.ServiceAccount,
Tags: types.TagsForTask(c.Name, id, c.Attempt, c.RepoState, c.RetryOf, dimsMap, c.ForcedJobId, c.ParentTaskIds, extraTags),
User: "skiabot@google.com",
}, nil
}

// allDepsMet determines whether all dependencies for the given task candidate
// have been satisfied, and if so, returns a map whose keys are task IDs and
// whose values are those tasks' isolated outputs.
func (c *taskCandidate) allDepsMet(cache cache.TaskCache) (bool, map[string]string, error) {
rv := make(map[string]string, len(c.TaskSpec.Dependencies))
for _, depName := range c.TaskSpec.Dependencies {
key := c.TaskKey.Copy()
key.Name = depName
byKey, err := cache.GetTasksByKey(&key)
if err != nil {
return false, nil, err
}
ok := false
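		// The dependency is met if any task with the matching key completed
		// successfully and produced an isolated output.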
for _, t := range byKey {
if t.Done() && t.Success() && t.IsolatedOutput != "" {
rv[t.Id] = t.IsolatedOutput
ok = true
break
}
}
if !ok {
return false, nil, nil
}
}
return true, rv, nil
}

// taskCandidateSlice implements sort.Interface for sorting a slice of
// taskCandidates in decreasing order by Score.
type taskCandidateSlice []*taskCandidate

func (s taskCandidateSlice) Len() int { return len(s) }

func (s taskCandidateSlice) Swap(i, j int) {
s[i], s[j] = s[j], s[i]
}

func (s taskCandidateSlice) Less(i, j int) bool {
return s[i].Score > s[j].Score // candidates sort in decreasing order.
}
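
// Note: sorting is done via sort.Sort, e.g. (illustrative; candidates is any
// []*taskCandidate):
//
//	sort.Sort(taskCandidateSlice(candidates))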