| package main |
| |
/*
Job metrics: avg-duration, failure-rate and mishap-rate metrics aggregated
over recently finished Jobs.
*/
| |
| import ( |
| "bytes" |
| "context" |
| "encoding/gob" |
| "fmt" |
| "sync" |
| "time" |
| |
| "go.skia.org/infra/go/metrics2" |
| "go.skia.org/infra/go/metrics2/events" |
| "go.skia.org/infra/go/sklog" |
| "go.skia.org/infra/go/util" |
| "go.skia.org/infra/task_scheduler/go/db" |
| "go.skia.org/infra/task_scheduler/go/db/remote_db" |
| ) |
| |
// TIME_PERIODS lists the windows over which job metrics are aggregated.
// jobEventDB.update loads jobs covering the last entry, so the slice must be
// non-empty and sorted in increasing order.
var TIME_PERIODS = []time.Duration{
	24 * time.Hour,
	7 * 24 * time.Hour,
}
| |
// jobEventDB implements the events.EventDB interface as a read-only,
// in-memory view of finished Jobs. update() periodically rebuilds the cache
// from db, and Range serves events out of it; Append and Insert always fail.
type jobEventDB struct {
	// cached maps a stream (job) name to the events built from that job's
	// finished Jobs. Replaced wholesale by update() and read by Range; both
	// access it while holding mtx.
	cached map[string][]*events.Event
	// db is the source of Jobs that update() reads.
	db db.JobReader
	// em is the EventMetrics instance created over this jobEventDB; it is
	// assigned after construction (see StartJobMetrics).
	em *events.EventMetrics
	// metrics records the job names whose aggregates have already been
	// registered with em, so each name is registered only once.
	metrics map[string]bool // Only update() may read/write this map.
	// mtx guards cached.
	// Do not lock mtx when calling methods on em. Otherwise deadlock can occur.
	// E.g. (now fixed):
	// 1. Thread 1: EventManager.updateMetrics locks EventMetrics.mtx
	// 2. Thread 2: jobEventDB.update locks jobEventDB.mtx
	// 3. Thread 1: EventManager.updateMetrics calls jobEventDB.Range, which waits
	// to lock jobEventDB.mtx
	// 4. Thread 2: jobEventDB.update calls EventMetrics.AggregateMetric (by way
	// of addAggregates and EventStream.AggregateMetric), which waits to lock
	// EventMetrics.mtx
	mtx sync.Mutex
}
| |
| // See docs for events.EventDB interface. |
| func (j *jobEventDB) Append(string, []byte) error { |
| return fmt.Errorf("jobEventDB is read-only!") |
| } |
| |
| // See docs for events.EventDB interface. |
| func (j *jobEventDB) Close() error { |
| return nil |
| } |
| |
| // See docs for events.EventDB interface. |
| func (j *jobEventDB) Insert(*events.Event) error { |
| return fmt.Errorf("jobEventDB is read-only!") |
| } |
| |
| // See docs for events.EventDB interface. |
| func (j *jobEventDB) Range(stream string, start, end time.Time) ([]*events.Event, error) { |
| j.mtx.Lock() |
| defer j.mtx.Unlock() |
| rv := make([]*events.Event, 0, len(j.cached[stream])) |
| for _, ev := range j.cached[stream] { |
| if !start.After(ev.Timestamp) && ev.Timestamp.Before(end) { |
| rv = append(rv, ev) |
| } |
| } |
| return rv, nil |
| } |
| |
| // update updates the cached jobs in the jobEventDB. Only a single thread may |
| // call this method, but it can be called concurrently with other methods. |
| func (j *jobEventDB) update() error { |
| defer metrics2.FuncTimer().Stop() |
| now := time.Now() |
| longestPeriod := TIME_PERIODS[len(TIME_PERIODS)-1] |
| jobs, err := j.db.GetJobsFromDateRange(now.Add(-longestPeriod), now) |
| if err != nil { |
| return fmt.Errorf("Failed to load jobs from %s to %s: %s", now.Add(-longestPeriod), now, err) |
| } |
| sklog.Debugf("jobEventDB.update: Processing %d jobs for time range %s to %s.", len(jobs), now.Add(-longestPeriod), now) |
| cached := map[string][]*events.Event{} |
| for _, job := range jobs { |
| if !job.Done() { |
| continue |
| } |
| var buf bytes.Buffer |
| if err := gob.NewEncoder(&buf).Encode(job); err != nil { |
| return fmt.Errorf("Failed to encode %#v to GOB: %s", job, err) |
| } |
| ev := &events.Event{ |
| Stream: job.Name, |
| Timestamp: job.Created, |
| Data: buf.Bytes(), |
| } |
| cached[job.Name] = append(cached[job.Name], ev) |
| |
| // TODO(borenet): Need to think about what happens when jobs are |
| // removed or renamed. As written, we'll continue to report |
| // metrics on defunct jobs until datahopper is restarted. There |
| // isn't currently a way to remove metrics from EventMetrics. If |
| // we added that, we could diff the jobs before and after and |
| // remove metrics for those we don't see in the window. |
| if !j.metrics[job.Name] { |
| s := j.em.GetEventStream(job.Name) |
| if err := addAggregates(s); err != nil { |
| return err |
| } |
| j.metrics[job.Name] = true |
| } |
| } |
| j.mtx.Lock() |
| defer j.mtx.Unlock() |
| j.cached = cached |
| return nil |
| } |
| |
| // addAggregates adds aggregation functions for job events to the EventStream. |
| func addAggregates(s *events.EventStream) error { |
| for _, period := range TIME_PERIODS { |
| // Average Job duration. |
| if err := s.AggregateMetric(map[string]string{"metric": "avg-duration"}, period, func(ev []*events.Event) (float64, error) { |
| if len(ev) > 0 { |
| // ev should be ordered by timestamp |
| sklog.Debugf("Calculating avg-duration for %s for %d jobs since %s.", ev[0].Stream, len(ev), ev[0].Timestamp) |
| } |
| count := 0 |
| total := time.Duration(0) |
| for _, e := range ev { |
| var job db.Job |
| if err := gob.NewDecoder(bytes.NewBuffer(e.Data)).Decode(&job); err != nil { |
| return 0.0, err |
| } |
| if !(job.Status == db.JOB_STATUS_SUCCESS || job.Status == db.JOB_STATUS_FAILURE) { |
| continue |
| } |
| count++ |
| total += job.Finished.Sub(job.Created) |
| } |
| if count == 0 { |
| return 0.0, nil |
| } |
| return float64(total) / float64(count), nil |
| }); err != nil { |
| return err |
| } |
| |
| // Job failure rate. |
| if err := s.AggregateMetric(map[string]string{"metric": "failure-rate"}, period, func(ev []*events.Event) (float64, error) { |
| if len(ev) > 0 { |
| // ev should be ordered by timestamp |
| sklog.Debugf("Calculating failure-rate for %s for %d jobs since %s.", ev[0].Stream, len(ev), ev[0].Timestamp) |
| } |
| count := 0 |
| fails := 0 |
| for _, e := range ev { |
| var job db.Job |
| if err := gob.NewDecoder(bytes.NewBuffer(e.Data)).Decode(&job); err != nil { |
| return 0.0, err |
| } |
| count++ |
| if job.Status == db.JOB_STATUS_FAILURE { |
| fails++ |
| } |
| } |
| if count == 0 { |
| return 0.0, nil |
| } |
| return float64(fails) / float64(count), nil |
| }); err != nil { |
| return err |
| } |
| |
| // Job mishap rate. |
| if err := s.AggregateMetric(map[string]string{"metric": "mishap-rate"}, period, func(ev []*events.Event) (float64, error) { |
| if len(ev) > 0 { |
| // ev should be ordered by timestamp |
| sklog.Debugf("Calculating mishap-rate for %s for %d jobs since %s.", ev[0].Stream, len(ev), ev[0].Timestamp) |
| } |
| count := 0 |
| mishap := 0 |
| for _, e := range ev { |
| var job db.Job |
| if err := gob.NewDecoder(bytes.NewBuffer(e.Data)).Decode(&job); err != nil { |
| return 0.0, err |
| } |
| count++ |
| if job.Status == db.JOB_STATUS_MISHAP { |
| mishap++ |
| } |
| } |
| if count == 0 { |
| return 0.0, nil |
| } |
| return float64(mishap) / float64(count), nil |
| }); err != nil { |
| return err |
| } |
| } |
| return nil |
| } |
| |
| // StartJobMetrics starts a goroutine which ingests metrics data based on Jobs. |
| func StartJobMetrics(taskSchedulerDbUrl string, ctx context.Context) error { |
| db, err := remote_db.NewClient(taskSchedulerDbUrl) |
| if err != nil { |
| return err |
| } |
| edb := &jobEventDB{ |
| cached: map[string][]*events.Event{}, |
| db: db, |
| metrics: map[string]bool{}, |
| } |
| em, err := events.NewEventMetrics(edb, "job_metrics") |
| if err != nil { |
| return err |
| } |
| edb.em = em |
| lv := metrics2.NewLiveness("last_successful_job_metrics_update") |
| go util.RepeatCtx(5*time.Minute, ctx, func() { |
| if err := edb.update(); err != nil { |
| sklog.Errorf("Failed to update job data: %s", err) |
| } else { |
| lv.Reset() |
| } |
| }) |
| em.Start(ctx) |
| return nil |
| } |