| package main |
| |
| import ( |
| "bytes" |
| "context" |
| "encoding/gob" |
| "fmt" |
| "sort" |
| "sync" |
| "time" |
| |
| "go.skia.org/infra/go/metrics2" |
| "go.skia.org/infra/go/metrics2/events" |
| "go.skia.org/infra/go/sklog" |
| "go.skia.org/infra/go/util" |
| "go.skia.org/infra/task_scheduler/go/db" |
| "go.skia.org/infra/task_scheduler/go/flakes" |
| "go.skia.org/infra/task_scheduler/go/types" |
| ) |
| |
| const ( |
| TASK_STREAM = "task_metrics" |
| ) |
| |
| // taskEventDB implements the events.EventDB interface. |
| type taskEventDB struct { |
| cached []*events.Event |
| db db.TaskReader |
| em *events.EventMetrics |
| // Do not lock mtx when calling methods on em. Otherwise deadlock can occur. |
| // E.g. (now fixed): |
| // 1. Thread 1: EventManager.updateMetrics locks EventMetrics.mtx |
| // 2. Thread 2: taskEventDB.update locks taskEventDB.mtx |
| // 3. Thread 1: EventManager.updateMetrics calls taskEventDB.Range, which waits |
| // to lock taskEventDB.mtx |
| // 4. Thread 2: taskEventDB.update calls EventMetrics.AggregateMetric (by way |
| // of addTaskAggregates and EventStream.AggregateMetric), which waits to lock |
| // EventMetrics.mtx |
| mtx sync.Mutex |
| } |
| |
| // See docs for events.EventDB interface. |
| func (t *taskEventDB) Append(string, []byte) error { |
| return fmt.Errorf("taskEventDB is read-only!") |
| } |
| |
| // See docs for events.EventDB interface. |
| func (t *taskEventDB) Close() error { |
| return nil |
| } |
| |
| // See docs for events.EventDB interface. |
| func (t *taskEventDB) Insert(*events.Event) error { |
| return fmt.Errorf("taskEventDB is read-only!") |
| } |
| |
| // See docs for events.EventDB interface. |
| func (t *taskEventDB) Range(stream string, start, end time.Time) ([]*events.Event, error) { |
| t.mtx.Lock() |
| defer t.mtx.Unlock() |
| |
| n := len(t.cached) |
| if n == 0 { |
| return []*events.Event{}, nil |
| } |
| |
| first := sort.Search(n, func(i int) bool { |
| return !t.cached[i].Timestamp.Before(start) |
| }) |
| |
| last := first + sort.Search(n-first, func(i int) bool { |
| return !t.cached[i+first].Timestamp.Before(end) |
| }) |
| |
| rv := make([]*events.Event, last-first, last-first) |
| copy(rv[:], t.cached[first:last]) |
| return rv, nil |
| } |
| |
| // update updates the cached tasks in the taskEventDB. Only a single thread may |
| // call this method, but it can be called concurrently with other methods. |
| func (t *taskEventDB) update() error { |
| defer metrics2.FuncTimer().Stop() |
| now := time.Now() |
| longestPeriod := TIME_PERIODS[len(TIME_PERIODS)-1] |
| tasks, err := t.db.GetTasksFromDateRange(now.Add(-longestPeriod), now, "") |
| if err != nil { |
| return fmt.Errorf("Failed to load tasks from %s to %s: %s", now.Add(-longestPeriod), now, err) |
| } |
| sklog.Debugf("taskEventDB.update: Processing %d tasks for time range %s to %s.", len(tasks), now.Add(-longestPeriod), now) |
| cached := make([]*events.Event, 0, len(tasks)) |
| for _, task := range tasks { |
| if !task.Done() { |
| continue |
| } |
| var buf bytes.Buffer |
| if err := gob.NewEncoder(&buf).Encode(task); err != nil { |
| return fmt.Errorf("Failed to encode %#v to GOB: %s", task, err) |
| } |
| ev := &events.Event{ |
| Stream: TASK_STREAM, |
| Timestamp: task.Created, |
| Data: buf.Bytes(), |
| } |
| cached = append(cached, ev) |
| } |
| t.mtx.Lock() |
| defer t.mtx.Unlock() |
| t.cached = cached |
| return nil |
| } |
| |
| // computeTaskFlakeRate is an events.DynamicAggregateFn that returns metrics for Task flake rate, given |
| // a slice of Events created by taskEventDB.update. The first return value will contain the tags |
| // "task_name" (types.Task.Name) and "metric" (one of "failure-rate", "mishap-rate"), |
| // and the second return value will be the corresponding ratio of failed/mishap Task to all |
| // completed Tasks. Returns an error if Event.Data can't be GOB-decoded as a types.Task. |
| func computeTaskFlakeRate(ev []*events.Event) ([]map[string]string, []float64, error) { |
| if len(ev) > 0 { |
| // ev should be ordered by timestamp |
| sklog.Debugf("Calculating flake-rate for %d tasks since %s.", len(ev), ev[0].Timestamp) |
| } |
| type taskSum struct { |
| flakes int |
| count int |
| } |
| byTask := map[string]*taskSum{} |
| tasks := make([]*types.Task, 0, len(ev)) |
| for _, e := range ev { |
| var task types.Task |
| if err := gob.NewDecoder(bytes.NewReader(e.Data)).Decode(&task); err != nil { |
| return nil, nil, err |
| } |
| tasks = append(tasks, &task) |
| entry, ok := byTask[task.Name] |
| if !ok { |
| entry = &taskSum{} |
| byTask[task.Name] = entry |
| } |
| entry.count++ |
| } |
| flaky := flakes.FindFlakes(tasks) |
| for _, task := range flaky { |
| byTask[task.Name].flakes++ |
| } |
| rvTags := make([]map[string]string, 0, len(byTask)*2) |
| rvVals := make([]float64, 0, len(byTask)*2) |
| add := func(taskName, metric string, value float64) { |
| rvTags = append(rvTags, map[string]string{ |
| "task_name": taskName, |
| "metric": metric, |
| }) |
| rvVals = append(rvVals, value) |
| } |
| for taskName, taskSum := range byTask { |
| if taskSum.count == 0 { |
| continue |
| } |
| add(taskName, "flake-rate", float64(taskSum.flakes)/float64(taskSum.count)) |
| } |
| return rvTags, rvVals, nil |
| } |
| |
| // addTaskAggregates adds aggregation functions for job events to the EventStream. |
| func addTaskAggregates(s *events.EventStream, instance string) error { |
| for _, period := range TIME_PERIODS { |
| // Flake rate. |
| if err := s.DynamicMetric(map[string]string{"instance": instance}, period, computeTaskFlakeRate); err != nil { |
| return err |
| } |
| } |
| return nil |
| } |
| |
| // StartTaskMetrics starts a goroutine which ingests metrics data based on Tasks. |
| func StartTaskMetrics(ctx context.Context, taskDb db.TaskReader, instance string) error { |
| edb := &taskEventDB{ |
| cached: []*events.Event{}, |
| db: taskDb, |
| } |
| em, err := events.NewEventMetrics(edb, "task_metrics") |
| if err != nil { |
| return err |
| } |
| edb.em = em |
| |
| s := em.GetEventStream(TASK_STREAM) |
| if err := addTaskAggregates(s, instance); err != nil { |
| return err |
| } |
| |
| lv := metrics2.NewLiveness("last_successful_task_metrics_update") |
| go util.RepeatCtx(5*time.Minute, ctx, func() { |
| if err := edb.update(); err != nil { |
| sklog.Errorf("Failed to update task data: %s", err) |
| } else { |
| lv.Reset() |
| } |
| }) |
| em.Start(ctx) |
| return nil |
| } |