blob: d17563325eb16bf2862802b0c280ae33959b487f [file] [log] [blame]
package types
import (
"fmt"
"reflect"
"sort"
"strconv"
"strings"
"time"
"unicode/utf8"
"go.skia.org/infra/go/common"
"go.skia.org/infra/go/gitiles"
"go.skia.org/infra/go/sklog"
"go.skia.org/infra/go/util"
)
// Swarming tags added by Task Scheduler (see TagsForTask). Tags whose key
// starts with SWARMING_TAG_DIMENSION_PREFIX mirror the task's dimensions.
const (
	SWARMING_TAG_ATTEMPT          = "sk_attempt"
	SWARMING_TAG_DIMENSION_PREFIX = "sk_dim_"
	SWARMING_TAG_FORCED_JOB_ID    = "sk_forced_job_id"
	SWARMING_TAG_ID               = "sk_id"
	SWARMING_TAG_ISSUE            = "sk_issue"
	SWARMING_TAG_LUCI_PROJECT     = "luci_project"
	SWARMING_TAG_MILO_HOST        = "milo_host"
	SWARMING_TAG_NAME             = "sk_name"
	SWARMING_TAG_PARENT_TASK_ID   = "sk_parent_task_id"
	SWARMING_TAG_PATCHSET         = "sk_patchset"
	SWARMING_TAG_REPO             = "sk_repo"
	SWARMING_TAG_RETRY_OF         = "sk_retry_of"
	SWARMING_TAG_REVISION         = "sk_revision"
	SWARMING_TAG_SERVER           = "sk_issue_server"

	// These two tags allow the Swarming UI to point to the GoB repo.
	SWARMING_TAG_SOURCE_REVISION = "source_revision"
	SWARMING_TAG_SOURCE_REPO     = "source_repo"
)

// MILO_HOST is a URL template which TagsForTask stores verbatim, with its "%s"
// placeholder unexpanded, as the value of the SWARMING_TAG_MILO_HOST tag.
const MILO_HOST = "https://ci.chromium.org/raw/build/%s"

// Types of task executors.
const (
	// TaskExecutor_UseDefault is the empty string; presumably it means
	// "use DefaultTaskExecutor".
	TaskExecutor_UseDefault = ""
	TaskExecutor_Swarming   = "swarming"

	// DefaultTaskExecutor is the task executor used by default.
	DefaultTaskExecutor = TaskExecutor_Swarming
)
// ValidTaskExecutors lists the known task executor values, including
// TaskExecutor_UseDefault (the empty string).
var ValidTaskExecutors = []string{
	TaskExecutor_UseDefault,
	TaskExecutor_Swarming,
}
// TaskStatus is the lifecycle state of a Task.
type TaskStatus string

// Possible values of TaskStatus.
const (
	// TASK_STATUS_PENDING means the task has not started. It is the empty
	// string so that it is the zero value of TaskStatus.
	TASK_STATUS_PENDING TaskStatus = ""

	// TASK_STATUS_RUNNING means the task is in progress.
	TASK_STATUS_RUNNING TaskStatus = "RUNNING"

	// TASK_STATUS_SUCCESS means the task completed successfully.
	TASK_STATUS_SUCCESS TaskStatus = "SUCCESS"

	// TASK_STATUS_FAILURE means the task completed with failures.
	TASK_STATUS_FAILURE TaskStatus = "FAILURE"

	// TASK_STATUS_MISHAP means the task exited early with an error, died while
	// in progress, was manually canceled, expired while waiting on the queue,
	// or timed out before completing.
	TASK_STATUS_MISHAP TaskStatus = "MISHAP"
)
// TaskKey is a struct used for identifying a Task instance. Note that more
// than one Task may have the same TaskKey, eg. in the case of retries.
type TaskKey struct {
// RepoState holds the repo and revision the task runs at and, for try jobs,
// the issue, patchset, and server.
RepoState
// Name is the name of the task (presumably the TaskSpec name). A TaskKey is
// only Valid when Name is non-empty.
Name string `json:"name"`
// ForcedJobId is the ID of the forced Job this task was created for, or
// empty. A non-empty value means the task should not be de-duplicated; see
// IsForceRun.
ForcedJobId string `json:"forcedJobId"`
}
// Copy returns a copy of the TaskKey. The embedded RepoState is duplicated via
// RepoState.Copy; the string fields are immutable and copied by value.
func (k TaskKey) Copy() TaskKey {
	// k is already a copy because of the value receiver, so only RepoState
	// needs to be replaced with its own Copy.
	k.RepoState = k.RepoState.Copy()
	return k
}
// Valid reports whether the TaskKey is valid: its RepoState must be valid and
// its Name must be non-empty.
func (k TaskKey) Valid() bool {
	if !k.RepoState.Valid() {
		return false
	}
	return k.Name != ""
}
// IsForceRun reports whether this Task belongs to a forced Job (ForcedJobId is
// set). Such tasks should not be de-duplicated.
func (k TaskKey) IsForceRun() bool {
	return len(k.ForcedJobId) > 0
}
// Task describes a Swarming task generated from a TaskSpec, or a "fake" task
// that can not be executed on Swarming, but can be added to the DB and
// displayed as if it were a real Task. Fake tasks are identified by an empty
// SwarmingTaskId (see Fake).
//
// Task is stored as a GOB, so changes must maintain backwards compatibility.
// See gob package documentation for details, but generally:
// - Ensure new fields can be initialized with their zero value.
// - Do not change the type of any existing field.
// - Leave removed fields commented out to ensure the field name is not
// reused.
// - Add any new fields to the Copy() method. UpdateFromTaskResult calls
// sklog.Fatalf if a Copy is not reflect.DeepEqual to its original.
type Task struct {
// Attempt is the attempt number of this task, starting with zero.
Attempt int `json:"attempt"`
// Commits are the commits which were tested in this Task. The list may
// change due to backfilling/bisecting.
Commits []string `json:"commits"`
// Created is the creation timestamp.
Created time.Time `json:"created"`
// DbModified is the time of the last successful call to TaskDB.PutTask/s for this
// Task, or zero if the task is new. It is not related to the ModifiedTs time
// of the associated Swarming task.
DbModified time.Time `json:"dbModified"`
// Finished is the time the task stopped running or expired from the queue, or
// zero if the task is pending or running.
Finished time.Time `json:"finished"`
// Id is a generated unique identifier for this Task instance. Must be
// URL-safe.
Id string `json:"id"`
// IsolatedOutput is the isolated hash of any outputs produced by this Task.
// Filled in when the task is completed. This field will not be set if the
// Task does not correspond to a Swarming task.
IsolatedOutput string `json:"isolatedOutput"`
// Jobs are the IDs of all Jobs which utilized this Task.
Jobs []string `json:"jobs"`
// MaxAttempts is the maximum number of attempts for this TaskSpec. It may be
// zero for old tasks where it was never initialized (see Validate).
MaxAttempts int `json:"max_attempts"`
// ParentTaskIds are IDs of tasks which satisfied this task's dependencies.
// UpdateFromTaskResult stores them in sorted order.
ParentTaskIds []string `json:"parentTaskIds"`
// Properties contains key-value pairs from external sources. Both key and
// value must be UTF-8 strings. Prefer a JavaScript identifier for key. Use
// base64 encoding for binary data.
Properties map[string]string `json:"properties"`
// RetryOf is the ID of the task which this task is a retry of, if any.
RetryOf string `json:"retryOf"`
// Started is the time the task started running, or zero if the task is
// pending, or the same as Finished if the task never ran.
Started time.Time `json:"started"`
// Status is the current task status, default TASK_STATUS_PENDING. See also
// Done and Success.
Status TaskStatus `json:"status"`
// SwarmingBotId is the ID of the Swarming bot that ran this task. This
// field will not be set if the Task does not correspond to a Swarming task or
// if the task is still pending.
SwarmingBotId string `json:"swarmingBotId"`
// SwarmingTaskId is the Swarming task ID. This field will not be set if the
// Task does not correspond to a Swarming task.
SwarmingTaskId string `json:"swarmingTaskId"`
// TaskExecutor is the task executor used to run this Task; presumably one of
// ValidTaskExecutors, with "" meaning DefaultTaskExecutor.
TaskExecutor string `json:"taskExecutor"`
// TaskKey is a struct which describes aspects of the Task related
// to the current state of the repo when it ran, and about the Task
// itself.
TaskKey
}
// UpdateFromTaskResult sets or initializes orig from data in res. Returns true
// if any changes were made to orig. On error, orig is left unmodified. res is
// never modified.
//
// Returns ErrUnknownId if orig.SwarmingTaskId does not match res.ID.
//
// The identity fields ForcedJobId, Id, Name, Repo, RetryOf, Revision, Attempt,
// Issue, Patchset, and Server are checked against the corresponding Swarming
// tags in res (SWARMING_TAG_FORCED_JOB_ID, SWARMING_TAG_ID, ...). If a field is
// empty, it is set from its tag. If it is non-empty and the tag is present with
// a different value, an error is returned. Created is handled the same way
// against res.Created.
//
// Always sets Status, IsolatedOutput, SwarmingBotId, and ParentTaskIds (sorted)
// from res. Started and Finished are only updated when res has non-zero values.
// If the task is done but never started, Started is set to Finished.
func (orig *Task) UpdateFromTaskResult(res *TaskResult) (bool, error) {
	if res == nil {
		return false, fmt.Errorf("Missing TaskResult.")
	}
	// Swarming TaskId.
	if orig.SwarmingTaskId != res.ID {
		return false, ErrUnknownId
	}
	// All edits go into a copy, so that orig is untouched on error. The final
	// DeepEqual change detection relies on Copy reproducing every field.
	updated := orig.Copy()
	if !reflect.DeepEqual(orig, updated) {
		sklog.Fatalf("Task.Copy is broken; original and copy differ:\n%#v\n%#v", orig, updated)
	}

	// checkOrSetFromTag fills *field from the single-valued tag tagName if
	// *field is empty, or returns an error if the tag disagrees with it. A
	// missing tag is not an error.
	checkOrSetFromTag := func(tagName string, field *string, fieldName string) error {
		tagValue, ok := res.Tags[tagName]
		if !ok {
			return nil
		}
		if len(tagValue) != 1 {
			return fmt.Errorf("Expected a single value for tag key %q", tagName)
		}
		if *field == "" {
			*field = tagValue[0]
		} else if *field != tagValue[0] {
			return fmt.Errorf("%s does not match for task %s. Was %s, now %s. %v %v", fieldName, orig.Id, *field, tagValue[0], orig, res)
		}
		return nil
	}
	type tagField struct {
		tag   string
		field *string
		name  string
	}
	checkAll := func(fields []tagField) error {
		for _, f := range fields {
			if err := checkOrSetFromTag(f.tag, f.field, f.name); err != nil {
				return err
			}
		}
		return nil
	}

	// "Identity" fields stored in tags.
	if err := checkAll([]tagField{
		{SWARMING_TAG_FORCED_JOB_ID, &updated.ForcedJobId, "ForcedJobId"},
		{SWARMING_TAG_ID, &updated.Id, "Id"},
		{SWARMING_TAG_NAME, &updated.Name, "Name"},
		{SWARMING_TAG_REPO, &updated.Repo, "Repo"},
		{SWARMING_TAG_RETRY_OF, &updated.RetryOf, "RetryOf"},
		{SWARMING_TAG_REVISION, &updated.Revision, "Revision"},
	}); err != nil {
		return false, err
	}

	// Attempt is compared through its decimal string form. That string is
	// never empty, so checkOrSetFromTag rejects any mismatch between
	// orig.Attempt and the tag. No separate equality check is needed.
	attempt := strconv.Itoa(updated.Attempt)
	if err := checkOrSetFromTag(SWARMING_TAG_ATTEMPT, &attempt, "Attempt"); err != nil {
		return false, err
	}
	attemptInt, err := strconv.ParseInt(attempt, 10, 32)
	if err != nil {
		return false, fmt.Errorf("Failed to ParseInt: %s", err)
	}
	updated.Attempt = int(attemptInt)

	// Optional try job tags.
	if err := checkAll([]tagField{
		{SWARMING_TAG_ISSUE, &updated.Issue, "Issue"},
		{SWARMING_TAG_PATCHSET, &updated.Patchset, "Patchset"},
		{SWARMING_TAG_SERVER, &updated.Server, "Server"},
	}); err != nil {
		return false, err
	}

	// Set ParentTaskIds. Copy the tag values before sorting. Sorting in place
	// would reorder the caller's res.Tags slice and leave the Task aliasing
	// memory owned by the TaskResult. CopyStringSlice must preserve nil (Task.Copy
	// relies on it for the DeepEqual sanity check above), so a missing tag
	// still yields nil.
	parentTaskIds := util.CopyStringSlice(res.Tags[SWARMING_TAG_PARENT_TASK_ID])
	sort.Strings(parentTaskIds)
	updated.ParentTaskIds = parentTaskIds

	// CreatedTs should always be present.
	if util.TimeIsZero(updated.Created) {
		updated.Created = res.Created
	} else if !updated.Created.Equal(res.Created) {
		return false, fmt.Errorf("Creation time has changed for task %s. Was %s, now %s. %v", orig.Id, orig.Created, res.Created, orig)
	}

	// Fields that always mirror the Swarming result.
	updated.Status = res.Status
	updated.IsolatedOutput = res.CasOutput
	updated.SwarmingBotId = res.MachineID

	// Timestamps: only overwrite with non-zero values from res.
	if !util.TimeIsZero(res.Started) {
		updated.Started = res.Started
	}
	if !util.TimeIsZero(res.Finished) {
		updated.Finished = res.Finished
	}
	// A task that finished without ever running is treated as having started
	// when it finished.
	if updated.Done() && util.TimeIsZero(updated.Started) {
		updated.Started = updated.Finished
	}

	// TODO(benjaminwagner): SwarmingRpcsTaskResult has a ModifiedTs field that we
	// could use to detect modifications. Unfortunately, it seems that while the
	// task is running, ModifiedTs gets updated every 30 seconds, regardless of
	// whether any other data actually changed. Maybe we could still use it for
	// pending or completed tasks.
	if reflect.DeepEqual(orig, updated) {
		return false, nil
	}
	*orig = *updated
	return true, nil
}
// Done reports whether the task has reached a final state, ie. its Status is
// neither TASK_STATUS_PENDING nor TASK_STATUS_RUNNING.
func (t *Task) Done() bool {
	switch t.Status {
	case TASK_STATUS_PENDING, TASK_STATUS_RUNNING:
		return false
	default:
		return true
	}
}
// Fake reports whether this Task has no corresponding Swarming task, which is
// signalled by an empty SwarmingTaskId.
func (t *Task) Fake() bool {
	return len(t.SwarmingTaskId) == 0
}
// Success reports whether the task's Status is TASK_STATUS_SUCCESS.
func (t *Task) Success() bool {
	succeeded := t.Status == TASK_STATUS_SUCCESS
	return succeeded
}
// Copy returns a copy of the Task. Slice and map fields are duplicated with
// util.CopyStringSlice / util.CopyStringMap, and the embedded TaskKey with
// TaskKey.Copy. Every field of Task must be listed here, because
// UpdateFromTaskResult requires the copy to be reflect.DeepEqual to t.
func (t *Task) Copy() *Task {
	dup := &Task{
		// Value-typed fields.
		Attempt:        t.Attempt,
		Created:        t.Created,
		DbModified:     t.DbModified,
		Finished:       t.Finished,
		Id:             t.Id,
		IsolatedOutput: t.IsolatedOutput,
		MaxAttempts:    t.MaxAttempts,
		RetryOf:        t.RetryOf,
		Started:        t.Started,
		Status:         t.Status,
		SwarmingBotId:  t.SwarmingBotId,
		SwarmingTaskId: t.SwarmingTaskId,
		TaskExecutor:   t.TaskExecutor,
	}
	// Reference-typed fields, duplicated so the copy does not share storage.
	dup.Commits = util.CopyStringSlice(t.Commits)
	dup.Jobs = util.CopyStringSlice(t.Jobs)
	dup.ParentTaskIds = util.CopyStringSlice(t.ParentTaskIds)
	dup.Properties = util.CopyStringMap(t.Properties)
	dup.TaskKey = t.TaskKey.Copy()
	return dup
}
// Validate returns an error describing the first problem found with the task,
// or nil if it is valid. In order, it checks that:
//   - the embedded TaskKey is valid;
//   - a fake task carries no Swarming info (IsolatedOutput, SwarmingBotId);
//   - every property key and value is valid UTF-8;
//   - Attempt is consistent with MaxAttempts and is non-negative.
func (task *Task) Validate() error {
	if !task.TaskKey.Valid() {
		return fmt.Errorf("TaskKey is not valid.")
	}
	hasSwarmingInfo := task.IsolatedOutput != "" || task.SwarmingBotId != "" || task.SwarmingTaskId != ""
	if task.Fake() && hasSwarmingInfo {
		return fmt.Errorf("Can not specify Swarming info for a fake task.")
	}
	for k, v := range task.Properties {
		if !utf8.ValidString(k) {
			return fmt.Errorf("Invalid property key -- must be valid UTF8: %q", k)
		}
		if !utf8.ValidString(v) {
			return fmt.Errorf("Invalid property value -- must be valid UTF8 or base64-encoded: %q", v)
		}
	}
	switch {
	case task.MaxAttempts == 0 && task.Attempt != 0:
		// Old tasks may have an uninitialized MaxAttempts. That is only
		// acceptable if Attempt is uninitialized too.
		return fmt.Errorf("Task MaxAttempts is not initialized but Attempt is initialized to %d. %+v", task.Attempt, task)
	case task.MaxAttempts != 0 && task.Attempt >= task.MaxAttempts:
		return fmt.Errorf("Task Attempt %d not less than MaxAttempts %d. %+v", task.Attempt, task.MaxAttempts, task)
	case task.Attempt < 0:
		return fmt.Errorf("Task Attempt is negative!! %+v", task)
	}
	return nil
}
// Valid reports whether Validate finds no problems with the task. It
// deliberately hides the embedded TaskKey.Valid, which checks only the key.
func (task *Task) Valid() bool {
	if err := task.Validate(); err != nil {
		return false
	}
	return true
}
// TaskSummary is a subset of the information found in a Task. Each field has
// the same meaning as the identically named field of Task; see
// Task.MakeTaskSummary.
type TaskSummary struct {
// Attempt is the attempt number, starting with zero.
Attempt int `json:"attempt"`
// Id is the unique identifier of the summarized Task.
Id string `json:"id"`
// MaxAttempts is the maximum number of attempts; may be zero for old tasks.
MaxAttempts int `json:"max_attempts"`
// Status is the task's current status.
Status TaskStatus `json:"status"`
// SwarmingTaskId is the Swarming task ID, or empty for fake tasks.
SwarmingTaskId string `json:"swarmingTaskId"`
}
// MakeTaskSummary returns a new TaskSummary populated from the corresponding
// fields of t.
func (t *Task) MakeTaskSummary() *TaskSummary {
	summary := new(TaskSummary)
	summary.Attempt = t.Attempt
	summary.Id = t.Id
	summary.MaxAttempts = t.MaxAttempts
	summary.Status = t.Status
	summary.SwarmingTaskId = t.SwarmingTaskId
	return summary
}
// Copy returns a copy of the TaskSummary.
func (t *TaskSummary) Copy() *TaskSummary {
	// Every field is an int or a string type, so a plain struct copy shares
	// no mutable state with t.
	dup := *t
	return &dup
}
// TaskSlice implements sort.Interface, ordering tasks by creation time (oldest
// first). To sort tasks []*Task, use sort.Sort(TaskSlice(tasks)).
type TaskSlice []*Task

// Len returns the number of tasks.
func (s TaskSlice) Len() int {
	return len(s)
}

// Less reports whether task i was created strictly before task j.
func (s TaskSlice) Less(i, j int) bool {
	a, b := s[i], s[j]
	return a.Created.Before(b.Created)
}

// Swap exchanges tasks i and j.
func (s TaskSlice) Swap(i, j int) {
	tmp := s[i]
	s[i] = s[j]
	s[j] = tmp
}
// TagsForTask returns the Swarming tags which should be set for a Task, as
// "key:value" strings sorted lexicographically so that identical inputs always
// produce identical output.
//
// extraTags are included, but the Task Scheduler's own tags (SWARMING_TAG_*)
// overwrite them on key collision. Each dimension k=v is added as the tag
// SWARMING_TAG_DIMENSION_PREFIX+k, unless a tag with that key already exists.
// In that case a warning is logged and the existing tag is kept. One
// SWARMING_TAG_PARENT_TASK_ID tag is emitted per entry of parentTaskIds. The
// try-job tags (server, issue, patchset) are only set when rs.IsTryJob().
func TagsForTask(name, id string, attempt int, rs RepoState, retryOf string, dimensions map[string]string, forcedJobId string, parentTaskIds []string, extraTags map[string]string) []string {
	tags := util.CopyStringMap(extraTags)
	if tags == nil {
		tags = make(map[string]string, 16)
	}
	tags[SWARMING_TAG_ATTEMPT] = strconv.Itoa(attempt)
	tags[SWARMING_TAG_FORCED_JOB_ID] = forcedJobId
	tags[SWARMING_TAG_NAME] = name
	tags[SWARMING_TAG_ID] = id
	tags[SWARMING_TAG_LUCI_PROJECT] = common.REPO_PROJECT_MAPPING[rs.Repo]
	// The Milo host and source repo values are stored as URL templates that
	// still contain a "%s" placeholder.
	tags[SWARMING_TAG_MILO_HOST] = MILO_HOST
	tags[SWARMING_TAG_REPO] = rs.Repo
	tags[SWARMING_TAG_RETRY_OF] = retryOf
	tags[SWARMING_TAG_REVISION] = rs.Revision
	tags[SWARMING_TAG_SOURCE_REVISION] = rs.Revision
	tags[SWARMING_TAG_SOURCE_REPO] = fmt.Sprintf(gitiles.CommitURL, rs.Repo, "%s")
	if rs.IsTryJob() {
		tags[SWARMING_TAG_SERVER] = rs.Server
		tags[SWARMING_TAG_ISSUE] = rs.Issue
		tags[SWARMING_TAG_PATCHSET] = rs.Patchset
	}
	for k, v := range dimensions {
		key := SWARMING_TAG_DIMENSION_PREFIX + k
		if _, exists := tags[key]; exists {
			sklog.Warningf("Duplicate dimension/tag %q.", k)
			continue
		}
		tags[key] = v
	}
	tagsList := make([]string, 0, len(tags)+len(parentTaskIds))
	for k, v := range tags {
		tagsList = append(tagsList, fmt.Sprintf("%s:%s", k, v))
	}
	// Use a distinct loop variable so the id parameter is not shadowed.
	for _, parentId := range parentTaskIds {
		tagsList = append(tagsList, fmt.Sprintf("%s:%s", SWARMING_TAG_PARENT_TASK_ID, parentId))
	}
	// Map iteration order is randomized; sort for deterministic output.
	sort.Strings(tagsList)
	return tagsList
}
// DimensionsFromTags reconstructs "key:value" dimension strings from tags whose
// key starts with SWARMING_TAG_DIMENSION_PREFIX (the form written by
// TagsForTask). The prefix is stripped from the key, and one entry is emitted
// per tag value. Tags without the prefix are ignored. The order of the result
// is unspecified.
func DimensionsFromTags(tags map[string][]string) []string {
	dims := make([]string, 0, len(tags))
	for tagKey, tagValues := range tags {
		if !strings.HasPrefix(tagKey, SWARMING_TAG_DIMENSION_PREFIX) {
			continue
		}
		dimName := strings.TrimPrefix(tagKey, SWARMING_TAG_DIMENSION_PREFIX)
		for _, v := range tagValues {
			dims = append(dims, dimName+":"+v)
		}
	}
	return dims
}