| package status |
| |
| import ( |
| "context" |
| "sort" |
| "sync" |
| "time" |
| |
| "go.skia.org/infra/go/eventbus" |
| "go.skia.org/infra/go/metrics2" |
| "go.skia.org/infra/go/skerr" |
| "go.skia.org/infra/go/sklog" |
| "go.skia.org/infra/go/tiling" |
| "go.skia.org/infra/go/vcsinfo" |
| "go.skia.org/infra/golden/go/expstorage" |
| "go.skia.org/infra/golden/go/shared" |
| "go.skia.org/infra/golden/go/tilesource" |
| "go.skia.org/infra/golden/go/types" |
| "go.skia.org/infra/golden/go/types/expectations" |
| ) |
| |
// Metric names used by the StatusWatcher in this file.
const (
	// METRIC_TOTAL reports the total number of distinct (test, digest) pairs
	// counted across all corpora and all labels.
	METRIC_TOTAL = "gold_status_total_digests"

	// METRIC_ALL reports per-label counts across all corpora; the label is
	// carried in the "type" tag.
	METRIC_ALL = "gold_status_all"

	// METRIC_CORPUS reports per-corpus, per-label counts, tagged with "type"
	// and "corpus".
	METRIC_CORPUS = "gold_status_by_corpus"
)
| |
| // GUIStatus reflects the current rebaseline status. In particular whether |
| // HEAD is baselined and how many untriaged and negative digests there |
| // currently are. |
| type GUIStatus struct { |
| // Indicates whether current HEAD is ok. |
| OK bool `json:"ok"` |
| |
| FirstCommit *tiling.Commit `json:"firstCommit"` |
| |
| // Last commit currently know. |
| LastCommit *tiling.Commit `json:"lastCommit"` |
| |
| TotalCommits int `json:"totalCommits"` |
| FilledCommits int `json:"filledCommits"` |
| |
| // Status per corpus. |
| CorpStatus []*GUICorpusStatus `json:"corpStatus"` |
| } |
| |
// GUICorpusStatus is the triage status of a single corpus at HEAD, as shown
// in the frontend.
type GUICorpusStatus struct {
	// Name is the name of the corpus.
	Name string `json:"name"`

	// OK indicates whether this corpus is ok, i.e. none of the most recent
	// digests of its traces are untriaged.
	OK bool `json:"ok"`

	// MinCommitHash is the earliest commit hash considered HEAD (which is not
	// always the last commit).
	// NOTE(review): the StatusWatcher in this file never sets this field, so
	// it is serialized as "" — confirm whether any other code populates it.
	MinCommitHash string `json:"minCommitHash"`

	// UntriagedCount is the number of distinct untriaged (test, digest)
	// pairs in HEAD.
	UntriagedCount int `json:"untriagedCount"`

	// NegativeCount is the number of distinct negative (test, digest) pairs
	// in HEAD.
	NegativeCount int `json:"negativeCount"`
}
| |
| type CorpusStatusSorter []*GUICorpusStatus |
| |
| // Implement sort.Interface |
| func (c CorpusStatusSorter) Len() int { return len(c) } |
| func (c CorpusStatusSorter) Less(i, j int) bool { return c[i].Name < c[j].Name } |
| func (c CorpusStatusSorter) Swap(i, j int) { c[i], c[j] = c[j], c[i] } |
| |
| type StatusWatcherConfig struct { |
| EventBus eventbus.EventBus |
| ExpectationsStore expstorage.ExpectationsStore |
| TileSource tilesource.TileSource |
| VCS vcsinfo.VCS |
| } |
| |
| type StatusWatcher struct { |
| StatusWatcherConfig |
| current *GUIStatus |
| mutex sync.Mutex |
| |
| // Gauges to track overall digests with different labels. |
| allUntriagedGauge metrics2.Int64Metric |
| allPositiveGauge metrics2.Int64Metric |
| allNegativeGauge metrics2.Int64Metric |
| totalGauge metrics2.Int64Metric |
| |
| // Gauges to track counts of digests by corpus / label |
| corpusGauges map[string]map[expectations.Label]metrics2.Int64Metric |
| } |
| |
// New creates a StatusWatcher from swc. It blocks until the first tile has
// been received from swc.TileSource and an initial status has been computed.
// It then starts a background goroutine that recomputes the status whenever
// a new tile arrives or the expectations change. Any error from the initial
// computation is returned wrapped.
func New(ctx context.Context, swc StatusWatcherConfig) (*StatusWatcher, error) {
	// overall returns the cross-corpus gauge for a single label.
	overall := func(label expectations.Label) metrics2.Int64Metric {
		return metrics2.GetInt64Metric(METRIC_ALL, map[string]string{"type": label.String()})
	}

	sw := &StatusWatcher{
		StatusWatcherConfig: swc,
		allUntriagedGauge:   overall(expectations.Untriaged),
		allPositiveGauge:    overall(expectations.Positive),
		allNegativeGauge:    overall(expectations.Negative),
		totalGauge:          metrics2.GetInt64Metric(METRIC_TOTAL, nil),
		corpusGauges:        map[string]map[expectations.Label]metrics2.Int64Metric{},
	}

	if err := sw.calcAndWatchStatus(ctx); err != nil {
		return nil, skerr.Wrap(err)
	}
	return sw, nil
}
| |
| // GetStatus returns the current status, ready to be sent to the frontend. |
| func (s *StatusWatcher) GetStatus() *GUIStatus { |
| s.mutex.Lock() |
| defer s.mutex.Unlock() |
| return s.current |
| } |
| |
| // updateLastCommitAge calculates how old the last commit in Gold is and reports |
| // it to metrics, so we can alert on it. It computes the age in two ways - absolute |
| // age (wall_time) and time since a newer commit landed (with_new_commit). The latter |
| // is the preferred metric due to the lower false-positive chance, with the former |
| // being a good backup since it has a lower false-negative chance. |
| // updateLastCommitAge is thread-safe. |
| func (s *StatusWatcher) updateLastCommitAge() { |
| st := s.GetStatus() |
| if st == nil { |
| sklog.Warningf("GetStatus() was nil when computing metrics") |
| return |
| } |
| if st.LastCommit == nil { |
| sklog.Warningf("GetStatus() had nil LastCommit when computing metrics: %#v", st) |
| return |
| } |
| |
| lastCommitAge := metrics2.GetInt64Metric("gold_last_commit_age_s", map[string]string{ |
| "type": "wall_time", |
| }) |
| lastCommitUnix := st.LastCommit.CommitTime // already in seconds since epoch |
| lastCommitAge.Update(time.Now().Unix() - lastCommitUnix) |
| |
| if s.VCS == nil { |
| sklog.Warningf("skipping updateLastCommitAge because VCS not set up") |
| return |
| } |
| // Start looking one second after the commit we know about to avoid erroneously |
| // alerting when two commits land in the same second. |
| commitsFromLast := s.VCS.Range(time.Unix(st.LastCommit.CommitTime+1, 0), time.Now()) |
| uningestedCommitAgeMetric := metrics2.GetInt64Metric("gold_last_commit_age_s", map[string]string{ |
| "type": "with_new_commit", |
| }) |
| if len(commitsFromLast) == 0 { |
| uningestedCommitAgeMetric.Update(0) |
| } else { |
| oldestNoningestedCommit := commitsFromLast[0] |
| uningestedCommitAgeMetric.Update(time.Now().Unix() - oldestNoningestedCommit.Timestamp.Unix()) |
| } |
| } |
| |
| func (s *StatusWatcher) calcAndWatchStatus(ctx context.Context) error { |
| sklog.Infof("Starting status watcher") |
| expChanges := make(chan expstorage.Delta) |
| s.EventBus.SubscribeAsync(expstorage.ExpectationsChangedTopic, func(e interface{}) { |
| expChanges <- e.(*expstorage.EventExpectationChange).ExpectationDelta |
| }) |
| |
| tileStream := tilesource.GetTileStreamNow(s.TileSource, 2*time.Minute, "status-watcher") |
| sklog.Infof("Got tile stream for status watcher") |
| |
| lastCpxTile := <-tileStream |
| sklog.Infof("Received first tile for status watcher") |
| |
| if err := s.calcStatus(ctx, lastCpxTile); err != nil { |
| return skerr.Wrap(err) |
| } |
| sklog.Infof("Calculated first status") |
| |
| liveness := metrics2.NewLiveness("gold_status_monitoring") |
| go func() { |
| for { |
| select { |
| case cpxTile := <-tileStream: |
| if err := s.calcStatus(ctx, cpxTile); err != nil { |
| sklog.Errorf("Error calculating status: %s", err) |
| } else { |
| lastCpxTile = cpxTile |
| liveness.Reset() |
| } |
| case <-expChanges: |
| drainChangeChannel(expChanges) |
| if err := s.calcStatus(ctx, lastCpxTile); err != nil { |
| sklog.Errorf("Error calculating tile after expectation update: %s", err) |
| } |
| liveness.Reset() |
| } |
| } |
| }() |
| sklog.Infof("Done starting status watcher") |
| |
| return nil |
| } |
| |
| func (s *StatusWatcher) calcStatus(ctx context.Context, cpxTile types.ComplexTile) error { |
| defer s.updateLastCommitAge() |
| defer shared.NewMetricsTimer("calculate_status").Stop() |
| |
| okByCorpus := map[string]bool{} |
| exp, err := s.ExpectationsStore.Get(ctx) |
| if err != nil { |
| return skerr.Wrapf(err, "fetching expectations") |
| } |
| |
| // Gathers unique labels by corpus and label. |
| byCorpus := map[string]map[expectations.Label]map[string]bool{} |
| |
| // Iterate over the current traces |
| dataTile := cpxTile.GetTile(types.ExcludeIgnoredTraces) |
| if len(dataTile.Commits) == 0 { |
| sklog.Warningf("Empty tile, doing nothing") |
| return nil |
| } |
| tileLen := dataTile.LastCommitIndex() + 1 |
| for _, trace := range dataTile.Traces { |
| gTrace := trace.(*types.GoldenTrace) |
| |
| idx := tileLen - 1 |
| for (idx >= 0) && (gTrace.Digests[idx] == types.MISSING_DIGEST) { |
| idx-- |
| } |
| |
| // If this is an empty trace we ignore it for now. |
| if idx == -1 { |
| continue |
| } |
| |
| // If this corpus doesn't exist yet, we initialize it. |
| corpus := gTrace.Corpus() |
| if _, ok := byCorpus[corpus]; !ok { |
| okByCorpus[corpus] = true |
| byCorpus[corpus] = map[expectations.Label]map[string]bool{ |
| expectations.Positive: {}, |
| expectations.Negative: {}, |
| expectations.Untriaged: {}, |
| } |
| |
| if _, ok := s.corpusGauges[corpus]; !ok { |
| s.corpusGauges[corpus] = map[expectations.Label]metrics2.Int64Metric{ |
| expectations.Untriaged: metrics2.GetInt64Metric(METRIC_CORPUS, map[string]string{"type": expectations.Untriaged.String(), "corpus": corpus}), |
| expectations.Positive: metrics2.GetInt64Metric(METRIC_CORPUS, map[string]string{"type": expectations.Positive.String(), "corpus": corpus}), |
| expectations.Negative: metrics2.GetInt64Metric(METRIC_CORPUS, map[string]string{"type": expectations.Negative.String(), "corpus": corpus}), |
| } |
| } |
| } |
| |
| // Account for the corpus and testname. |
| digest := gTrace.Digests[idx] |
| testName := gTrace.TestName() |
| status := exp.Classification(testName, digest) |
| |
| okByCorpus[corpus] = okByCorpus[corpus] && |
| ((status == expectations.Positive) || (status == expectations.Negative)) |
| byCorpus[corpus][status][string(testName)+string(digest)] = true |
| } |
| |
| overallOk := true |
| allUntriagedCount := 0 |
| allPositiveCount := 0 |
| allNegativeCount := 0 |
| corpStatus := make([]*GUICorpusStatus, 0, len(byCorpus)) |
| for corpus := range byCorpus { |
| overallOk = overallOk && okByCorpus[corpus] |
| untriagedCount := len(byCorpus[corpus][expectations.Untriaged]) |
| positiveCount := len(byCorpus[corpus][expectations.Positive]) |
| negativeCount := len(byCorpus[corpus][expectations.Negative]) |
| corpStatus = append(corpStatus, &GUICorpusStatus{ |
| Name: corpus, |
| OK: okByCorpus[corpus], |
| UntriagedCount: untriagedCount, |
| NegativeCount: negativeCount, |
| }) |
| allUntriagedCount += untriagedCount |
| allNegativeCount += negativeCount |
| allPositiveCount += positiveCount |
| |
| s.corpusGauges[corpus][expectations.Positive].Update(int64(positiveCount)) |
| s.corpusGauges[corpus][expectations.Negative].Update(int64(negativeCount)) |
| s.corpusGauges[corpus][expectations.Untriaged].Update(int64(untriagedCount)) |
| } |
| s.allUntriagedGauge.Update(int64(allUntriagedCount)) |
| s.allPositiveGauge.Update(int64(allPositiveCount)) |
| s.allNegativeGauge.Update(int64(allNegativeCount)) |
| s.totalGauge.Update(int64(allUntriagedCount + allPositiveCount + allNegativeCount)) |
| |
| sort.Sort(CorpusStatusSorter(corpStatus)) |
| |
| allCommits := cpxTile.AllCommits() |
| result := &GUIStatus{ |
| OK: overallOk, |
| FirstCommit: allCommits[0], |
| LastCommit: allCommits[len(allCommits)-1], |
| TotalCommits: len(allCommits), |
| FilledCommits: cpxTile.FilledCommits(), |
| CorpStatus: corpStatus, |
| } |
| |
| // Swap out the current tile. |
| s.mutex.Lock() |
| defer s.mutex.Unlock() |
| s.current = result |
| |
| return nil |
| } |
| |
| // drainChangeChannel removes everything from the channel that's currently |
| // buffered or ready to be read. |
| func drainChangeChannel(ch <-chan expstorage.Delta) { |
| for { |
| select { |
| case <-ch: |
| default: |
| return |
| } |
| } |
| } |