blob: ead49bf6c47abe564a51f9c08b2d1c31f2f683d6 [file] [log] [blame]
package types
import (
"fmt"
"time"
"go.skia.org/infra/go/sklog"
)
const (
// JOB_STATUS_REQUESTED indicates that we are aware of the Job but are not
// yet ready to start running Tasks for it.
JOB_STATUS_REQUESTED JobStatus = "REQUESTED"
// JOB_STATUS_IN_PROGRESS indicates that one or more of the Job's
// Task dependencies has not yet been satisfied. It is the empty string,
// so the zero value of JobStatus is JOB_STATUS_IN_PROGRESS.
JOB_STATUS_IN_PROGRESS JobStatus = ""
// JOB_STATUS_SUCCESS indicates that all of the Job's Task dependencies
// completed successfully.
JOB_STATUS_SUCCESS JobStatus = "SUCCESS"
// JOB_STATUS_FAILURE indicates that one or more of the Job's Task
// dependencies failed.
JOB_STATUS_FAILURE JobStatus = "FAILURE"
// JOB_STATUS_MISHAP indicates that one or more of the Job's Task
// dependencies exited early with an error, died while in progress, was
// manually canceled, expired while waiting on the queue, or timed out
// before completing.
JOB_STATUS_MISHAP JobStatus = "MISHAP"
// JOB_STATUS_CANCELED indicates that the Job has been canceled.
JOB_STATUS_CANCELED JobStatus = "CANCELED"
// JOB_URL_TMPL is a fmt-style template for Job URLs. Job.URL fills in the
// task scheduler host and the Job ID, in that order.
JOB_URL_TMPL = "%s/job/%s"
// DEFAULT_MAX_TASK_ATTEMPTS is the maximum number of attempts we'll
// make of each TaskSpec in a Job. Job.DeriveStatus falls back to this
// value when a Task's MaxAttempts is zero.
DEFAULT_MAX_TASK_ATTEMPTS = 2
)
var (
// JOB_STATUS_BADNESS ranks each JobStatus by severity; a higher number
// means a worse status. JobStatus.WorseThan compares statuses using these
// ranks.
JOB_STATUS_BADNESS = map[JobStatus]int{
JOB_STATUS_SUCCESS: 0,
JOB_STATUS_IN_PROGRESS: 1,
JOB_STATUS_REQUESTED: 2,
JOB_STATUS_CANCELED: 3,
JOB_STATUS_FAILURE: 4,
JOB_STATUS_MISHAP: 5,
}
// VALID_JOB_STATUSES lists the JobStatus values which are considered
// valid.
VALID_JOB_STATUSES = []JobStatus{
JOB_STATUS_REQUESTED,
JOB_STATUS_IN_PROGRESS,
JOB_STATUS_SUCCESS,
JOB_STATUS_FAILURE,
JOB_STATUS_MISHAP,
JOB_STATUS_CANCELED,
}
)
// JobStatus represents the current status of a Job. Per Job.Done, a Job is
// finished once its status is anything other than JOB_STATUS_IN_PROGRESS or
// JOB_STATUS_REQUESTED; we do not retry Jobs, only their component Tasks.
type JobStatus string
// WorseThan reports whether s ranks strictly worse than other according to
// JOB_STATUS_BADNESS. A status which is missing from that table gets rank
// zero, the same rank as JOB_STATUS_SUCCESS.
func (s JobStatus) WorseThan(other JobStatus) bool {
	mine, theirs := JOB_STATUS_BADNESS[s], JOB_STATUS_BADNESS[other]
	return theirs < mine
}
// WorseJobStatus picks whichever of a and b is worse, as judged by
// JobStatus.WorseThan. When neither is worse than the other, b is returned.
func WorseJobStatus(a, b JobStatus) JobStatus {
	if !a.WorseThan(b) {
		return b
	}
	return a
}
// JobStatusFromTaskStatus translates a TaskStatus into the corresponding
// JobStatus. Success, failure and mishap map to their Job counterparts; every
// other TaskStatus maps to JOB_STATUS_IN_PROGRESS.
func JobStatusFromTaskStatus(s TaskStatus) JobStatus {
	if s == TASK_STATUS_SUCCESS {
		return JOB_STATUS_SUCCESS
	}
	if s == TASK_STATUS_FAILURE {
		return JOB_STATUS_FAILURE
	}
	if s == TASK_STATUS_MISHAP {
		return JOB_STATUS_MISHAP
	}
	return JOB_STATUS_IN_PROGRESS
}
// Job represents a set of Tasks which are executed as part of a larger effort.
//
// Job is stored as a GOB, so changes must maintain backwards compatibility.
// See gob package documentation for details, but generally:
// - Ensure new fields can be initialized with their zero value.
// - Do not change the type of any existing field.
// - Leave removed fields commented out to ensure the field name is not
// reused.
// - Add any new fields to the Copy() method.
type Job struct {
// BuildbucketBuildId is the ID of the Buildbucket build with which this
// Job is associated, if one exists.
BuildbucketBuildId int64 `json:"buildbucketBuildId"`
// BuildbucketLeaseKey is the lease key for running a Buildbucket build.
// TODO(borenet): Maybe this doesn't belong in the DB.
BuildbucketLeaseKey int64 `json:"buildbucketLeaseKey"`
// BuildbucketPubSubTopic is the pub/sub topic to which we'll publish
// messages to update the build.
BuildbucketPubSubTopic string `json:"buildbucketPubSubTopic"`
// BuildbucketToken is used to authenticate requests to update the
// Buildbucket build.
BuildbucketToken string `json:"buildbucketToken"`
// Created is the creation timestamp. This property should never change
// for a given Job instance. JobSlice sorts Jobs by this field.
Created time.Time `json:"created"`
// DbModified is the time of the last successful call to JobDB.PutJob/s
// for this Job, or zero if the job is new.
DbModified time.Time `json:"dbModified"`
// Dependencies maps out the DAG of TaskSpec names upon which this Job
// depends. Keys are TaskSpec names and values are slices of TaskSpec
// names indicating which TaskSpecs that TaskSpec depends on. This
// property should never change for a given Job instance.
Dependencies map[string][]string `json:"dependencies"`
// Finished is the time at which all of the Job's dependencies finished,
// successfully or not.
Finished time.Time `json:"finished"`
// Id is a unique identifier for the Job. This property should never
// change for a given Job instance, after its initial insertion into the
// DB.
Id string `json:"id"`
// IsForce indicates whether this is a manually-triggered Job, as
// opposed to a normally scheduled one, or a try job. MakeTaskKey records
// the Job's Id in the TaskKey of forced Jobs.
IsForce bool `json:"isForce"`
// Name is a human-friendly descriptive name for the Job. All Jobs
// generated from the same JobSpec have the same name. This property
// should never change for a given Job instance.
Name string `json:"name"`
// Priority is an indicator of the relative priority of this Job.
Priority float64 `json:"priority"`
// RepoState is the current state of the repository for this Job. It is
// embedded, so its fields are promoted onto Job.
RepoState
// Requested is the time at which this Job was requested. This is a
// commit timestamp, tryjob request creation timestamp, time at which
// the server received a force trigger job request, etc.
Requested time.Time `json:"requested"`
// Status is the current Job status, default JOB_STATUS_IN_PROGRESS.
Status JobStatus `json:"status"`
// StatusDetails provides additional details for the status of the Job,
// including reasons it might have failed. This may be truncated due to
// database storage limitations, so it should not include, for example, full
// logs.
StatusDetails string `json:"statusDetails"`
// Tasks are the Task instances which satisfied the dependencies of
// the Job. Keys are TaskSpec names and values are slices of TaskSummary
// instances describing the Tasks. A TaskSpec may have more than one
// TaskSummary when failed Tasks have been retried.
Tasks map[string][]*TaskSummary `json:"tasks"`
}
// Copy returns a new Job holding the same data as j. The Dependencies and
// Tasks maps and the slices they contain are freshly allocated, each
// TaskSummary is duplicated via its own Copy method, and RepoState is
// duplicated via RepoState.Copy. Nil maps stay nil in the copy.
func (j *Job) Copy() *Job {
	dup := &Job{
		BuildbucketBuildId:     j.BuildbucketBuildId,
		BuildbucketLeaseKey:    j.BuildbucketLeaseKey,
		BuildbucketPubSubTopic: j.BuildbucketPubSubTopic,
		BuildbucketToken:       j.BuildbucketToken,
		Created:                j.Created,
		DbModified:             j.DbModified,
		Finished:               j.Finished,
		Id:                     j.Id,
		IsForce:                j.IsForce,
		Name:                   j.Name,
		Priority:               j.Priority,
		RepoState:              j.RepoState.Copy(),
		Requested:              j.Requested,
		Status:                 j.Status,
		StatusDetails:          j.StatusDetails,
	}

	if j.Dependencies != nil {
		dup.Dependencies = make(map[string][]string, len(j.Dependencies))
		for spec, needs := range j.Dependencies {
			// Always allocate, so that even a nil slice becomes an empty,
			// non-nil slice in the copy.
			dup.Dependencies[spec] = append(make([]string, 0, len(needs)), needs...)
		}
	}

	if j.Tasks != nil {
		dup.Tasks = make(map[string][]*TaskSummary, len(j.Tasks))
		for spec, summaries := range j.Tasks {
			copied := make([]*TaskSummary, len(summaries))
			for i, summary := range summaries {
				copied[i] = summary.Copy()
			}
			dup.Tasks[spec] = copied
		}
	}

	return dup
}
// Done reports whether the Job has finished, i.e. its Status is neither
// JOB_STATUS_IN_PROGRESS nor JOB_STATUS_REQUESTED.
func (j *Job) Done() bool {
	switch j.Status {
	case JOB_STATUS_IN_PROGRESS, JOB_STATUS_REQUESTED:
		return false
	default:
		return true
	}
}
// MakeTaskKey builds the TaskKey for the Task with the given name within this
// Job. The key carries a copy of the Job's RepoState. ForcedJobId is set to
// the Job's Id only when the Job is a forced Job.
func (j *Job) MakeTaskKey(taskName string) TaskKey {
	key := TaskKey{
		Name:      taskName,
		RepoState: j.RepoState.Copy(),
	}
	if !j.IsForce {
		return key
	}
	key.ForcedJobId = j.Id
	return key
}
// URL formats the Job's URL on the given task scheduler host by substituting
// the host and the Job's Id into JOB_URL_TMPL.
func (j *Job) URL(taskSchedulerHost string) string {
	host, id := taskSchedulerHost, j.Id
	return fmt.Sprintf(JOB_URL_TMPL, host, id)
}
// TraverseDependencies traces the dependency graph of the Job, calling the
// given function once for each task spec. The function is only called on a
// task spec after it has been called for all of that task spec's
// dependencies. Task specs which only appear as a dependency of another task
// spec (and not as a key of j.Dependencies) are visited too.
//
// If the passed-in function returns an error, iteration stops and
// TraverseDependencies returns the same error. If the dependency graph
// contains a cycle (including a task spec which depends on itself), iteration
// stops and an error describing the offending edge is returned, rather than
// recursing without bound.
//
// The order in which unrelated task specs are visited is not specified, since
// it follows Go's map iteration order.
func (j *Job) TraverseDependencies(fn func(string) error) error {
	// Traversal state of a task spec. The zero value, unvisited, is what the
	// map yields for task specs we have not seen yet.
	const (
		unvisited = iota
		visiting
		visited
	)
	state := make(map[string]int, len(j.Dependencies))

	var visit func(string) error
	visit = func(name string) error {
		// Mark the task spec as being on the current DFS path so that an
		// edge leading back to it can be recognized as a cycle.
		state[name] = visiting
		for _, dep := range j.Dependencies[name] {
			switch state[dep] {
			case visited:
				// Already handled via another path; nothing to do.
			case visiting:
				return fmt.Errorf("dependency cycle detected: %q depends on %q", name, dep)
			default:
				if err := visit(dep); err != nil {
					return err
				}
			}
		}
		state[name] = visited
		return fn(name)
	}

	for name := range j.Dependencies {
		if state[name] == unvisited {
			if err := visit(name); err != nil {
				return err
			}
		}
	}
	return nil
}
// DeriveStatus computes the JobStatus implied by the Tasks recorded for each
// task spec in the Job's dependency graph. It does not modify the Job.
//
// A Job whose Status is JOB_STATUS_REQUESTED stays requested, and a Job with
// no Tasks at all is in progress. Otherwise the result is the worst status
// found across all task specs, where each task spec contributes the best
// status among its attempts, or "in progress" if it has no attempts yet or
// has failed but may still be retried.
func (j *Job) DeriveStatus() JobStatus {
	switch {
	case j.Status == JOB_STATUS_REQUESTED:
		return JOB_STATUS_REQUESTED
	case len(j.Tasks) == 0:
		return JOB_STATUS_IN_PROGRESS
	}

	overall := JOB_STATUS_SUCCESS
	perSpec := func(name string) error {
		attempts := j.Tasks[name]
		if len(attempts) == 0 {
			// Nothing has run for this task spec yet.
			overall = WorseJobStatus(overall, JOB_STATUS_IN_PROGRESS)
			return nil
		}

		// The first attempt decides how many attempts are allowed; zero
		// means "use the default".
		limit := attempts[0].MaxAttempts
		if limit == 0 {
			limit = DEFAULT_MAX_TASK_ATTEMPTS
		}

		// A task spec may have several Tasks because failed Tasks get
		// retried. Only the best attempt matters: a retry which succeeded
		// supersedes earlier failures.
		best := JOB_STATUS_MISHAP
		for _, attempt := range attempts {
			if candidate := JobStatusFromTaskStatus(attempt.Status); best.WorseThan(candidate) {
				best = candidate
			}
		}

		// A failed or mishap result is not final while retry attempts
		// remain; until then the task spec counts as in progress.
		outcome := best
		if best != JOB_STATUS_SUCCESS && best != JOB_STATUS_IN_PROGRESS && len(attempts) < limit {
			outcome = JOB_STATUS_IN_PROGRESS
		}
		overall = WorseJobStatus(overall, outcome)
		return nil
	}

	if err := j.TraverseDependencies(perSpec); err != nil {
		// The callback above never returns an error, so reaching this branch
		// is unexpected. Log it and report the Job as still in progress.
		sklog.Errorf("Got error traversing Job dependencies: %s", err)
		return JOB_STATUS_IN_PROGRESS
	}
	return overall
}
// JobSlice implements sort.Interface, ordering Jobs by ascending Created
// timestamp. To sort jobs []*Job, use sort.Sort(JobSlice(jobs)).
type JobSlice []*Job

// Len returns the number of Jobs in the slice.
func (s JobSlice) Len() int {
	return len(s)
}

// Less reports whether the Job at index i was created before the Job at
// index j.
func (s JobSlice) Less(i, j int) bool {
	first, second := s[i], s[j]
	return first.Created.Before(second.Created)
}

// Swap exchanges the Jobs at indexes i and j.
func (s JobSlice) Swap(i, j int) {
	tmp := s[i]
	s[i] = s[j]
	s[j] = tmp
}