package main
/*
	Job metrics for the Task Scheduler.
*/
import (
	"bytes"
	"context"
	"encoding/gob"
	"fmt"
	"sort"
	"sync"
	"time"

	"go.skia.org/infra/go/httputils"
	"go.skia.org/infra/go/metrics2"
	"go.skia.org/infra/go/metrics2/events"
	"go.skia.org/infra/go/sklog"
	"go.skia.org/infra/go/util"
	"go.skia.org/infra/task_scheduler/go/db"
	"go.skia.org/infra/task_scheduler/go/db/remote_db"
)
var (
// Time periods over which to compute metrics. Assumed to be sorted in
// increasing order.
TIME_PERIODS = []time.Duration{24 * time.Hour, 7 * 24 * time.Hour}
)
const (
JOB_STREAM = "job_metrics"
)
// jobTypeString is an enum of the types of Jobs that computeAvgJobDuration will aggregate separately.
// The string value is the same as the job_type tag value returned by computeAvgJobDuration.
type jobTypeString string
const (
NORMAL jobTypeString = "normal"
FORCED jobTypeString = "forced"
TRYJOB jobTypeString = "tryjob"
)
// jobEventDB implements the events.EventDB interface.
type jobEventDB struct {
cached []*events.Event
db db.JobReader
em *events.EventMetrics
// Do not lock mtx when calling methods on em. Otherwise deadlock can occur.
// E.g. (now fixed):
// 1. Thread 1: EventManager.updateMetrics locks EventMetrics.mtx
// 2. Thread 2: jobEventDB.update locks jobEventDB.mtx
// 3. Thread 1: EventManager.updateMetrics calls jobEventDB.Range, which waits
// to lock jobEventDB.mtx
// 4. Thread 2: jobEventDB.update calls EventMetrics.AggregateMetric (by way
// of addJobAggregates and EventStream.AggregateMetric), which waits to lock
// EventMetrics.mtx
mtx sync.Mutex
}
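// Compile-time check that *jobEventDB implements events.EventDB; this is
// guaranteed to hold because StartJobMetrics passes a *jobEventDB to
// events.NewEventMetrics.
var _ events.EventDB = (*jobEventDB)(nil)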
// See docs for events.EventDB interface.
func (j *jobEventDB) Append(string, []byte) error {
return fmt.Errorf("jobEventDB is read-only!")
}
// See docs for events.EventDB interface.
func (j *jobEventDB) Close() error {
return nil
}
// See docs for events.EventDB interface.
func (j *jobEventDB) Insert(*events.Event) error {
return fmt.Errorf("jobEventDB is read-only!")
}
// See docs for events.EventDB interface.
func (j *jobEventDB) Range(stream string, start, end time.Time) ([]*events.Event, error) {
j.mtx.Lock()
defer j.mtx.Unlock()
n := len(j.cached)
if n == 0 {
return []*events.Event{}, nil
}
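	// Binary-search for the half-open window [first, last) of cached events
	// with start <= Timestamp < end. This assumes j.cached is sorted by
	// Timestamp, i.e. that update() receives jobs from GetJobsFromDateRange in
	// Created order.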
first := sort.Search(n, func(i int) bool {
return !j.cached[i].Timestamp.Before(start)
})
last := first + sort.Search(n-first, func(i int) bool {
return !j.cached[i+first].Timestamp.Before(end)
})
	rv := make([]*events.Event, last-first)
	copy(rv, j.cached[first:last])
return rv, nil
}
// update updates the cached jobs in the jobEventDB. Only a single goroutine may
// call this method at a time, but it may run concurrently with the other
// methods.
func (j *jobEventDB) update() error {
defer metrics2.FuncTimer().Stop()
now := time.Now()
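	// TIME_PERIODS is sorted in increasing order, so the last entry covers all
	// of the shorter periods with a single DB query.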
longestPeriod := TIME_PERIODS[len(TIME_PERIODS)-1]
jobs, err := j.db.GetJobsFromDateRange(now.Add(-longestPeriod), now)
if err != nil {
return fmt.Errorf("Failed to load jobs from %s to %s: %s", now.Add(-longestPeriod), now, err)
}
sklog.Debugf("jobEventDB.update: Processing %d jobs for time range %s to %s.", len(jobs), now.Add(-longestPeriod), now)
cached := make([]*events.Event, 0, len(jobs))
for _, job := range jobs {
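		// Skip jobs that have not yet finished; only completed jobs feed the
		// duration and failure/mishap-rate metrics.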
if !job.Done() {
continue
}
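		// GOB-encode the Job so that the aggregation functions below can
		// decode Event.Data back into a db.Job.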
var buf bytes.Buffer
if err := gob.NewEncoder(&buf).Encode(job); err != nil {
return fmt.Errorf("Failed to encode %#v to GOB: %s", job, err)
}
ev := &events.Event{
Stream: JOB_STREAM,
Timestamp: job.Created,
Data: buf.Bytes(),
}
cached = append(cached, ev)
}
j.mtx.Lock()
defer j.mtx.Unlock()
j.cached = cached
return nil
}
// computeAvgJobDuration is an events.DynamicAggregateFn that returns metrics for average Job duration
// for Jobs with status SUCCESS or FAILURE, given a slice of Events created by jobEventDB.update.
// The first return value will contain the tags "job_name" (db.Job.Name) and "job_type" (one of
// "normal", "tryjob", "forced"), and the second return value will be the corresponding average Job
// duration. Returns an error if Event.Data can't be GOB-decoded as a db.Job.
func computeAvgJobDuration(ev []*events.Event) ([]map[string]string, []float64, error) {
if len(ev) > 0 {
// ev should be ordered by timestamp
sklog.Debugf("Calculating avg-duration for %d jobs since %s.", len(ev), ev[0].Timestamp)
}
type sum struct {
count int
total time.Duration
}
type jobSums struct {
normal sum
tryjob sum
forced sum
}
byJob := map[string]*jobSums{}
for _, e := range ev {
var job db.Job
if err := gob.NewDecoder(bytes.NewReader(e.Data)).Decode(&job); err != nil {
return nil, nil, err
}
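		// Only successful and failed jobs contribute to average duration; jobs
		// with other statuses (e.g. mishap) are skipped.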
if !(job.Status == db.JOB_STATUS_SUCCESS || job.Status == db.JOB_STATUS_FAILURE) {
continue
}
entry, ok := byJob[job.Name]
if !ok {
entry = &jobSums{}
byJob[job.Name] = entry
}
var jobSum *sum
if job.IsForce {
jobSum = &entry.forced
} else if job.IsTryJob() {
jobSum = &entry.tryjob
} else {
jobSum = &entry.normal
}
jobSum.count++
jobSum.total += job.Finished.Sub(job.Created)
}
rvTags := make([]map[string]string, 0, len(byJob)*3)
rvVals := make([]float64, 0, len(byJob)*3)
add := func(jobName string, jobType jobTypeString, jobSum sum) {
if jobSum.count == 0 {
return
}
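		// jobSum.total is a time.Duration, so value is the average duration in
		// nanoseconds.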
value := float64(jobSum.total) / float64(jobSum.count)
rvTags = append(rvTags, map[string]string{
"job_name": jobName,
"job_type": string(jobType),
})
rvVals = append(rvVals, value)
}
for jobName, jobSums := range byJob {
add(jobName, NORMAL, jobSums.normal)
add(jobName, TRYJOB, jobSums.tryjob)
add(jobName, FORCED, jobSums.forced)
}
return rvTags, rvVals, nil
}
// computeJobFailureMishapRate is an events.DynamicAggregateFn that returns metrics for Job failure rate and
// mishap rate, given a slice of Events created by jobEventDB.update. The first return value will
// contain the tags "job_name" (db.Job.Name) and "metric" (one of "failure-rate", "mishap-rate"),
// and the second return value will be the corresponding ratio of failed/mishap Jobs to all
// completed Jobs. Returns an error if Event.Data can't be GOB-decoded as a db.Job.
func computeJobFailureMishapRate(ev []*events.Event) ([]map[string]string, []float64, error) {
if len(ev) > 0 {
// ev should be ordered by timestamp
sklog.Debugf("Calculating failure-rate for %d jobs since %s.", len(ev), ev[0].Timestamp)
}
type jobSum struct {
fails int
mishaps int
count int
}
byJob := map[string]*jobSum{}
for _, e := range ev {
var job db.Job
if err := gob.NewDecoder(bytes.NewReader(e.Data)).Decode(&job); err != nil {
return nil, nil, err
}
entry, ok := byJob[job.Name]
if !ok {
entry = &jobSum{}
byJob[job.Name] = entry
}
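		// Every decoded job counts toward the denominator. update() only
		// caches jobs that are Done, so these are rates over all completed
		// jobs.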
entry.count++
if job.Status == db.JOB_STATUS_FAILURE {
entry.fails++
} else if job.Status == db.JOB_STATUS_MISHAP {
entry.mishaps++
}
}
rvTags := make([]map[string]string, 0, len(byJob)*2)
rvVals := make([]float64, 0, len(byJob)*2)
add := func(jobName, metric string, value float64) {
rvTags = append(rvTags, map[string]string{
"job_name": jobName,
"job_type": "",
"metric": metric,
})
rvVals = append(rvVals, value)
}
for jobName, jobSum := range byJob {
if jobSum.count == 0 {
continue
}
add(jobName, "failure-rate", float64(jobSum.fails)/float64(jobSum.count))
add(jobName, "mishap-rate", float64(jobSum.mishaps)/float64(jobSum.count))
}
return rvTags, rvVals, nil
}
// addJobAggregates adds aggregation functions for job events to the EventStream.
func addJobAggregates(s *events.EventStream) error {
for _, period := range TIME_PERIODS {
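		// Average job duration.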
if err := s.DynamicMetric(map[string]string{"metric": "avg-duration"}, period, computeAvgJobDuration); err != nil {
return err
}
// Job failure/mishap rate.
if err := s.DynamicMetric(nil, period, computeJobFailureMishapRate); err != nil {
return err
}
}
return nil
}
// StartJobMetrics starts a goroutine which ingests metrics data based on Jobs.
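//
// A hypothetical invocation from this binary's main() (the URL is
// illustrative only):
//
//	if err := StartJobMetrics("https://task-scheduler.example.com/db/", ctx); err != nil {
//		sklog.Fatal(err)
//	}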
func StartJobMetrics(taskSchedulerDbUrl string, ctx context.Context) error {
	jobDb, err := remote_db.NewClient(taskSchedulerDbUrl, httputils.NewTimeoutClient())
	if err != nil {
		return err
	}
	edb := &jobEventDB{
		cached: []*events.Event{},
		db:     jobDb,
	}
em, err := events.NewEventMetrics(edb, "job_metrics")
if err != nil {
return err
}
edb.em = em
s := em.GetEventStream(JOB_STREAM)
if err := addJobAggregates(s); err != nil {
return err
}
lv := metrics2.NewLiveness("last_successful_job_metrics_update")
go util.RepeatCtx(5*time.Minute, ctx, func() {
if err := edb.update(); err != nil {
sklog.Errorf("Failed to update job data: %s", err)
} else {
lv.Reset()
}
})
em.Start(ctx)
return nil
}