| // Package search encapsulates various queries we make against Gold's data. It is backed
| // by the SQL database and supersedes the legacy (non-SQL) search implementation.
| package search
| |
| import ( |
| "bytes" |
| "context" |
| "encoding/hex" |
| "fmt" |
| "sort" |
| "strconv" |
| "strings" |
| "sync" |
| "time" |
| |
| lru "github.com/hashicorp/golang-lru" |
| "github.com/jackc/pgtype" |
| "github.com/jackc/pgx/v4" |
| "github.com/jackc/pgx/v4/pgxpool" |
| ttlcache "github.com/patrickmn/go-cache" |
| "go.opencensus.io/trace" |
| "golang.org/x/sync/errgroup" |
| |
| "go.skia.org/infra/go/cache" |
| "go.skia.org/infra/go/paramtools" |
| "go.skia.org/infra/go/skerr" |
| "go.skia.org/infra/go/sklog" |
| "go.skia.org/infra/go/util" |
| "go.skia.org/infra/golden/go/config" |
| "go.skia.org/infra/golden/go/expectations" |
| "go.skia.org/infra/golden/go/publicparams" |
| "go.skia.org/infra/golden/go/search/caching" |
| "go.skia.org/infra/golden/go/search/common" |
| "go.skia.org/infra/golden/go/search/query" |
| "go.skia.org/infra/golden/go/sql" |
| "go.skia.org/infra/golden/go/sql/schema" |
| "go.skia.org/infra/golden/go/tiling" |
| "go.skia.org/infra/golden/go/types" |
| "go.skia.org/infra/golden/go/web/frontend" |
| ) |
| |
| type API interface { |
| // NewAndUntriagedSummaryForCL returns a summarized look at the new digests produced by a CL |
| // (that is, digests not currently on the primary branch for this grouping at all) as well as |
| // how many of the newly produced digests are currently untriaged. |
| NewAndUntriagedSummaryForCL(ctx context.Context, qCLID string) (NewAndUntriagedSummary, error) |
| |
| // ChangelistLastUpdated returns the timestamp that the given CL was updated. It returns an |
| // error if the CL does not exist. |
| ChangelistLastUpdated(ctx context.Context, qCLID string) (time.Time, error) |
| |
| // Search queries the current tile based on the parameters specified in the given
| // *query.Search instance.
| Search(context.Context, *query.Search) (*frontend.SearchResponse, error) |
| |
| // GetPrimaryBranchParamset returns all params that are on the most recent few tiles. If
| // this is a public view, it will only return the params on the traces which match the
| // publicly visible rules.
| GetPrimaryBranchParamset(ctx context.Context) (paramtools.ReadOnlyParamSet, error) |
| |
| // GetChangelistParamset returns all params that were produced by the given CL. If
| // this is a public view, it will only return the params on the traces which match the
| // publicly visible rules.
| GetChangelistParamset(ctx context.Context, crs, clID string) (paramtools.ReadOnlyParamSet, error) |
| |
| // GetBlamesForUntriagedDigests finds all untriaged digests at head and then tries to determine |
| // which commits first introduced those untriaged digests. It returns a list of commits or |
| // commit ranges that are believed to have caused those untriaged digests. |
| GetBlamesForUntriagedDigests(ctx context.Context, corpus string) (BlameSummaryV1, error) |
| |
| // GetCluster returns all digests from the traces matching the various filters compared to |
| // all other digests in that set, so they can be drawn as a 2d cluster. This helps visualize |
| // patterns in the images, which can identify errors in triaging, among other things. |
| GetCluster(ctx context.Context, opts ClusterOptions) (frontend.ClusterDiffResult, error) |
| |
| // GetCommitsInWindow returns the commits in the configured window. |
| GetCommitsInWindow(ctx context.Context) ([]frontend.Commit, error) |
| |
| // GetDigestsForGrouping returns all digests that were produced in a given grouping in the most |
| // recent window of data. |
| GetDigestsForGrouping(ctx context.Context, grouping paramtools.Params) (frontend.DigestListResponse, error) |
| |
| // GetDigestDetails returns information about the given digest as produced on the given |
| // grouping. If the CL and CRS are provided, it will include information specific to that CL. |
| GetDigestDetails(ctx context.Context, grouping paramtools.Params, digest types.Digest, clID, crs string) (frontend.DigestDetails, error) |
| |
| // GetDigestsDiff returns comparison and triage information about the left and right digest. |
| GetDigestsDiff(ctx context.Context, grouping paramtools.Params, left, right types.Digest, clID, crs string) (frontend.DigestComparison, error) |
| |
| // CountDigestsByTest summarizes the counts of digests according to some limited filtering |
| // and breaks it down by test. |
| CountDigestsByTest(ctx context.Context, q frontend.ListTestsQuery) (frontend.ListTestsResponse, error) |
| |
| // ComputeGUIStatus looks at all visible traces at head and returns a summary of how many are |
| // untriaged for each corpus, as well as the most recent commit for which we have data. |
| ComputeGUIStatus(ctx context.Context) (frontend.GUIStatus, error) |
| } |
| |
| // NewAndUntriagedSummary is a summary of the results associated with a given CL. It focuses on |
| // the untriaged and new images produced. |
| type NewAndUntriagedSummary struct { |
| // ChangelistID is the unqualified id of the CL.
| ChangelistID string |
| // PatchsetSummaries contains a summary for each patchset for which we have data.
| PatchsetSummaries []PatchsetNewAndUntriagedSummary |
| // LastUpdated returns the timestamp of the CL, which corresponds to the last datapoint for |
| // this CL. |
| LastUpdated time.Time |
| // Outdated is set to true if the value that was previously cached was out of date and is
| // currently being recalculated. We do this to return something quickly to the user (even
| // if the returned data is slightly stale) rather than blocking on the recomputation.
| Outdated bool
| } |
| |
| // PatchsetNewAndUntriagedSummary is the summary for a specific PS. It focuses on the untriaged |
| // and new images produced. |
| type PatchsetNewAndUntriagedSummary struct { |
| // NewImages is the number of new images (digests) that were produced by this patchset by |
| // non-ignored traces and not seen on the primary branch. |
| NewImages int |
| // NewUntriagedImages is the number of NewImages which are still untriaged. It is less than or |
| // equal to NewImages. |
| NewUntriagedImages int |
| // TotalUntriagedImages is the number of images produced by this patchset by non-ignored traces |
| // that are untriaged. This includes images that are untriaged and observed on the primary |
| // branch (i.e. might not be the fault of this CL/PS). It is greater than or equal to |
| // NewUntriagedImages. |
| TotalUntriagedImages int |
| // PatchsetID is the unqualified id of the patchset. This is usually a git hash.
| PatchsetID string |
| // PatchsetOrder represents the chronological order the patchsets are in. It starts at 1.
| PatchsetOrder int |
| } |
| |
| type BlameSummaryV1 struct { |
| Ranges []BlameEntry |
| } |
| |
| // BlameEntry represents a commit or range of commits that is responsible for some amount of |
| // untriaged digests. It allows us to identify potentially problematic commits and coordinate with |
| // the authors as necessary. |
| type BlameEntry struct { |
| // CommitRange is either a single commit id or two commit ids separated by a colon indicating |
| // a range. This string can be used as the "blame id" in the search. |
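| // For example, a hypothetical "c001" for a single commit, or "c001:c004" for a range.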
| CommitRange string |
| // TotalUntriagedDigests is the number of digests that are believed to be first untriaged |
| // in this commit range. |
| TotalUntriagedDigests int |
| // AffectedGroupings summarizes the untriaged digests affected in the commit range.
| AffectedGroupings []*AffectedGrouping |
| // Commits is one or two commits corresponding to the CommitRange. |
| Commits []frontend.Commit |
| } |
| |
| type AffectedGrouping struct { |
| Grouping paramtools.Params |
| UntriagedDigests int |
| SampleDigest types.Digest |
| |
| // groupingID is used as an intermediate step in combineIntoRanges, and to search by blame ID. |
| groupingID schema.MD5Hash |
| |
| // traceIDsAndDigests is used to search by blame ID. |
| traceIDsAndDigests []traceIDAndDigest |
| } |
| |
| type traceIDAndDigest struct { |
| id schema.TraceID |
| digest schema.DigestBytes |
| } |
| |
| type ClusterOptions struct { |
| Grouping paramtools.Params |
| Filters paramtools.ParamSet |
| IncludePositiveDigests bool |
| IncludeNegativeDigests bool |
| IncludeUntriagedDigests bool |
| CodeReviewSystem string |
| ChangelistID string |
| PatchsetID string |
| } |
| |
| const ( |
| commitCacheSize = 5_000 |
| optionsGroupingCacheSize = 50_000 |
| traceCacheSize = 1_000_000 |
| ) |
| |
| type Impl struct { |
| db *pgxpool.Pool |
| windowLength int |
| // Lets us create links from CL data to the Code Review System that produced it. |
| reviewSystemMapping map[string]string |
| |
| // mutex protects the caches, e.g. digestsOnPrimary and publiclyVisibleTraces |
| mutex sync.RWMutex |
| // This caches the digests seen per grouping on the primary branch. |
| digestsOnPrimary map[groupingDigestKey]struct{} |
| // This caches the trace ids that are publicly visible. |
| publiclyVisibleTraces map[schema.MD5Hash]struct{} |
| // This caches the corpora names that are publicly visible. |
| publiclyVisibleCorpora map[string]struct{} |
| isPublicView bool |
| |
| commitCache *lru.Cache |
| optionsGroupingCache *lru.Cache |
| traceCache *lru.Cache |
| paramsetCache *ttlcache.Cache |
| |
| materializedViews map[string]bool |
| cacheManager *caching.SearchCacheManager |
| dbType config.DatabaseType |
| } |
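| 
| // Impl is intended to satisfy the API interface (see New); this compile-time assertion
| // enforces that.
| var _ API = (*Impl)(nil)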
| |
| // New returns an implementation of API. |
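| //
| // A typical construction looks like this sketch (pool, cacheClient, and the corpora list
| // are placeholders for illustration):
| //
| //	s := search.New(pool, 256, cacheClient, []string{"gm"})
| //	s.SetDatabaseType(config.CockroachDB)
| //	if err := s.StartCacheProcess(ctx, 5*time.Minute, 256); err != nil {
| //		sklog.Fatalf("starting cache process: %s", err)
| //	}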
| func New(sqlDB *pgxpool.Pool, windowLength int, cacheClient cache.Cache, cacheCorpora []string) *Impl {
| cc, err := lru.New(commitCacheSize) |
| if err != nil { |
| panic(err) // should only happen if commitCacheSize is negative. |
| } |
| gc, err := lru.New(optionsGroupingCacheSize) |
| if err != nil { |
| panic(err) // should only happen if optionsGroupingCacheSize is negative. |
| } |
| tc, err := lru.New(traceCacheSize) |
| if err != nil { |
| panic(err) // should only happen if traceCacheSize is negative. |
| } |
| pc := ttlcache.New(time.Minute, 10*time.Minute) |
| return &Impl{ |
| db: sqlDB, |
| windowLength: windowLength, |
| digestsOnPrimary: map[groupingDigestKey]struct{}{}, |
| commitCache: cc, |
| optionsGroupingCache: gc, |
| traceCache: tc, |
| paramsetCache: pc, |
| reviewSystemMapping: map[string]string{}, |
| cacheManager: caching.New(cacheClient, sqlDB, cacheCorpora, windowLength),
| } |
| } |
| |
| // SetDatabaseType sets the database type for the current configuration. |
| func (s *Impl) SetDatabaseType(dbType config.DatabaseType) { |
| s.dbType = dbType |
| } |
| |
| // SetReviewSystemTemplates sets the URL templates that are used to link to the code review system. |
| // The Changelist ID will be substituted in using fmt.Sprintf and a %s placeholder. |
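| //
| // For example (a hypothetical mapping):
| //
| //	s.SetReviewSystemTemplates(map[string]string{
| //		"gerrit": "https://example-review.googlesource.com/%s",
| //	})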
| func (s *Impl) SetReviewSystemTemplates(m map[string]string) { |
| s.reviewSystemMapping = m |
| } |
| |
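| // groupingDigestKey uniquely identifies a digest within a grouping. It uses fixed-size
| // MD5 hashes rather than byte slices so that the struct is comparable and can be used as
| // a map key.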
| type groupingDigestKey struct { |
| groupingID schema.MD5Hash |
| digest schema.MD5Hash |
| } |
| |
| // StartCacheProcess loads the caches used for searching and starts a goroutine to keep those |
| // up to date. |
| func (s *Impl) StartCacheProcess(ctx context.Context, interval time.Duration, commitsWithData int) error { |
| if err := s.updateCaches(ctx, commitsWithData); err != nil { |
| return skerr.Wrapf(err, "setting up initial cache values") |
| } |
| go util.RepeatCtx(ctx, interval, func(ctx context.Context) { |
| err := s.updateCaches(ctx, commitsWithData) |
| if err != nil { |
| sklog.Errorf("Could not update caches: %s", err) |
| } |
| }) |
| return nil |
| } |
| |
| // updateCaches loads the digestsOnPrimary cache. |
| func (s *Impl) updateCaches(ctx context.Context, commitsWithData int) error { |
| ctx, span := trace.StartSpan(ctx, "search2_UpdateCaches", trace.WithSampler(trace.AlwaysSample())) |
| defer span.End() |
| tile, err := s.getStartingTile(ctx, commitsWithData) |
| if err != nil { |
| return skerr.Wrapf(err, "geting tile to search") |
| } |
| onPrimary, err := s.getDigestsOnPrimary(ctx, tile) |
| if err != nil { |
| return skerr.Wrapf(err, "getting digests on primary branch") |
| } |
| s.mutex.Lock() |
| s.digestsOnPrimary = onPrimary |
| s.mutex.Unlock() |
| sklog.Infof("Digests on Primary cache refreshed with %d entries", len(onPrimary)) |
| return nil |
| } |
| |
| // getStartingTile returns the ID of the tile at which the window of interest begins (so we
| // get enough data to do our comparisons).
| func (s *Impl) getStartingTile(ctx context.Context, commitsWithDataToSearch int) (schema.TileID, error) { |
| ctx, span := trace.StartSpan(ctx, "getStartingTile") |
| defer span.End() |
| if commitsWithDataToSearch <= 0 { |
| return 0, nil |
| } |
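| // Fetch the tile of the commit that sits commitsWithDataToSearch commits back from the most
| // recent commit with data; searching from that tile onward covers at least that many commits.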
| row := s.db.QueryRow(ctx, `SELECT tile_id FROM CommitsWithData |
| ORDER BY commit_id DESC |
| LIMIT 1 OFFSET $1`, commitsWithDataToSearch) |
| var tileID int |
| if s.dbType == config.CockroachDB { |
| var lc pgtype.Int4 |
| if err := row.Scan(&lc); err != nil { |
| if err == pgx.ErrNoRows { |
| return 0, nil // not enough commits seen, so start at tile 0. |
| } |
| return 0, skerr.Wrapf(err, "getting latest commit") |
| } |
| if lc.Status == pgtype.Null { |
| // There are no commits with data, so start at tile 0. |
| return 0, nil |
| } |
| tileID = int(lc.Int) |
| |
| } else { |
| var lc pgtype.Int8 |
| if err := row.Scan(&lc); err != nil { |
| if err == pgx.ErrNoRows { |
| return 0, nil // not enough commits seen, so start at tile 0. |
| } |
| return 0, skerr.Wrapf(err, "getting latest commit") |
| } |
| if lc.Status == pgtype.Null { |
| // There are no commits with data, so start at tile 0. |
| return 0, nil |
| } |
| tileID = int(lc.Int) |
| } |
| return schema.TileID(tileID), nil |
| } |
| |
| // getDigestsOnPrimary returns a set of all distinct grouping+digest pairs on the primary branch.
| func (s *Impl) getDigestsOnPrimary(ctx context.Context, tile schema.TileID) (map[groupingDigestKey]struct{}, error) { |
| ctx, span := trace.StartSpan(ctx, "getDigestsOnPrimary") |
| defer span.End() |
| rows, err := s.db.Query(ctx, ` |
| SELECT DISTINCT grouping_id, digest FROM |
| TiledTraceDigests WHERE tile_id >= $1`, tile) |
| if err != nil { |
| if err == pgx.ErrNoRows { |
| return map[groupingDigestKey]struct{}{}, nil |
| } |
| return nil, skerr.Wrap(err) |
| } |
| defer rows.Close() |
| rv := map[groupingDigestKey]struct{}{} |
| var digest schema.DigestBytes |
| var grouping schema.GroupingID |
| var key groupingDigestKey |
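| // keyGrouping and keyDigest are slices that alias the fixed-size arrays inside key, so the
| // copy calls in the loop below write into key directly; this lets us reuse one struct value
| // per row instead of rebuilding the map key from scratch each time.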
| keyGrouping := key.groupingID[:] |
| keyDigest := key.digest[:] |
| for rows.Next() { |
| err := rows.Scan(&grouping, &digest) |
| if err != nil { |
| return nil, skerr.Wrap(err) |
| } |
| copy(keyGrouping, grouping) |
| copy(keyDigest, digest) |
| rv[key] = struct{}{} |
| } |
| return rv, nil |
| } |
| |
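| // Materialized view names have the form "mv_<corpus>_<viewName>", e.g. "mv_gm_traces" for a
| // hypothetical "gm" corpus (see createUnignoredRecentTracesView and createByBlameView).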
| const ( |
| unignoredRecentTracesView = "traces" |
| byBlameView = "byblame" |
| ) |
| |
| // StartMaterializedViews creates materialized views for non-ignored traces belonging to the |
| // given corpora. It starts a goroutine to keep these up to date. |
| func (s *Impl) StartMaterializedViews(ctx context.Context, corpora []string, updateInterval time.Duration) error { |
| _, span := trace.StartSpan(ctx, "StartMaterializedViews", trace.WithSampler(trace.AlwaysSample())) |
| defer span.End() |
| if len(corpora) == 0 { |
| sklog.Infof("No materialized views configured") |
| return nil |
| } |
| sklog.Infof("Initializing materialized views") |
| eg, eCtx := errgroup.WithContext(ctx) |
| var mutex sync.Mutex |
| s.materializedViews = map[string]bool{} |
| for _, c := range corpora { |
| corpus := c |
| eg.Go(func() error { |
| mvName, err := s.createUnignoredRecentTracesView(eCtx, corpus) |
| if err != nil { |
| return skerr.Wrap(err) |
| } |
| mutex.Lock() |
| defer mutex.Unlock() |
| s.materializedViews[mvName] = true |
| return nil |
| }) |
| eg.Go(func() error { |
| mvName, err := s.createByBlameView(eCtx, corpus) |
| if err != nil { |
| return skerr.Wrap(err) |
| } |
| mutex.Lock() |
| defer mutex.Unlock() |
| s.materializedViews[mvName] = true |
| return nil |
| }) |
| } |
| if err := eg.Wait(); err != nil { |
| return skerr.Wrapf(err, "initializing materialized views %q", corpora) |
| } |
| |
| sklog.Infof("Initialized %d materialized views", len(s.materializedViews)) |
| go util.RepeatCtx(ctx, updateInterval, func(ctx context.Context) { |
| eg, eCtx := errgroup.WithContext(ctx) |
| for v := range s.materializedViews { |
| view := v |
| eg.Go(func() error { |
| statement := `REFRESH MATERIALIZED VIEW ` + view |
| _, err := s.db.Exec(eCtx, statement) |
| return skerr.Wrapf(err, "updating %s", view) |
| }) |
| } |
| if err := eg.Wait(); err != nil { |
| sklog.Warningf("Could not refresh material views: %s", err) |
| } |
| }) |
| return nil |
| } |
| |
| func (s *Impl) createUnignoredRecentTracesView(ctx context.Context, corpus string) (string, error) { |
| mvName := "mv_" + corpus + "_" + unignoredRecentTracesView |
| statement := "CREATE MATERIALIZED VIEW IF NOT EXISTS " + mvName |
| statement += ` |
| AS WITH |
| BeginningOfWindow AS ( |
| SELECT commit_id FROM CommitsWithData |
| ORDER BY commit_id DESC |
| OFFSET ` + strconv.Itoa(s.windowLength-1) + ` LIMIT 1 |
| ) |
| SELECT trace_id, grouping_id, digest FROM ValuesAtHead |
| JOIN BeginningOfWindow ON ValuesAtHead.most_recent_commit_id >= BeginningOfWindow.commit_id |
| WHERE corpus = '` + corpus + `' AND matches_any_ignore_rule = FALSE |
| ` |
| _, err := s.db.Exec(ctx, statement) |
| return mvName, skerr.Wrap(err) |
| } |
| |
| func (s *Impl) createByBlameView(ctx context.Context, corpus string) (string, error) { |
| mvName := "mv_" + corpus + "_" + byBlameView |
| statement := "CREATE MATERIALIZED VIEW IF NOT EXISTS " + mvName |
| statement += ` |
| AS WITH |
| BeginningOfWindow AS ( |
| SELECT commit_id FROM CommitsWithData |
| ORDER BY commit_id DESC |
| OFFSET ` + strconv.Itoa(s.windowLength-1) + ` LIMIT 1 |
| ), |
| UntriagedDigests AS ( |
| SELECT grouping_id, digest FROM Expectations |
| WHERE label = 'u' |
| ), |
| UnignoredDataAtHead AS ( |
| SELECT trace_id, grouping_id, digest FROM ValuesAtHead |
| JOIN BeginningOfWindow ON ValuesAtHead.most_recent_commit_id >= BeginningOfWindow.commit_id |
| WHERE matches_any_ignore_rule = FALSE AND corpus = '` + corpus + `' |
| ) |
| SELECT UnignoredDataAtHead.trace_id, UnignoredDataAtHead.grouping_id, UnignoredDataAtHead.digest FROM |
| UntriagedDigests |
| JOIN UnignoredDataAtHead ON UntriagedDigests.grouping_id = UnignoredDataAtHead.grouping_id AND |
| UntriagedDigests.digest = UnignoredDataAtHead.digest |
| ` |
| _, err := s.db.Exec(ctx, statement) |
| return mvName, skerr.Wrap(err) |
| } |
| |
| // getMaterializedView returns the name of the materialized view if it has been created, or
| // the empty string if there is no such view.
| func (s *Impl) getMaterializedView(viewName, corpus string) string { |
| if len(s.materializedViews) == 0 { |
| return "" |
| } |
| mv := "mv_" + corpus + "_" + viewName |
| if s.materializedViews[mv] { |
| return mv |
| } |
| return "" |
| } |
| |
| // StartApplyingPublicParams loads the cached set of traces which are publicly visible and then |
| // starts a goroutine to update this cache as per the provided interval. |
| func (s *Impl) StartApplyingPublicParams(ctx context.Context, matcher publicparams.Matcher, interval time.Duration) error { |
| _, span := trace.StartSpan(ctx, "StartApplyingPublicParams", trace.WithSampler(trace.AlwaysSample())) |
| defer span.End() |
| s.isPublicView = true |
| |
| cycle := func(ctx context.Context) error { |
| rows, err := s.db.Query(ctx, `SELECT trace_id, keys FROM Traces AS OF SYSTEM TIME '-0.1s'`) |
| if err != nil { |
| return skerr.Wrap(err) |
| } |
| publiclyVisibleCorpora := map[string]struct{}{} |
| publiclyVisibleTraces := map[schema.MD5Hash]struct{}{} |
| var yes struct{} |
| var traceKey schema.MD5Hash |
| defer rows.Close() |
| for rows.Next() { |
| var traceID schema.TraceID |
| var keys paramtools.Params |
| if err := rows.Scan(&traceID, &keys); err != nil { |
| return skerr.Wrap(err) |
| } |
| if matcher.Matches(keys) { |
| copy(traceKey[:], traceID) |
| publiclyVisibleTraces[traceKey] = yes |
| publiclyVisibleCorpora[keys[types.CorpusField]] = yes |
| } |
| } |
| s.mutex.Lock() |
| defer s.mutex.Unlock() |
| s.publiclyVisibleTraces = publiclyVisibleTraces |
| s.publiclyVisibleCorpora = publiclyVisibleCorpora |
| return nil |
| } |
| if err := cycle(ctx); err != nil { |
| return skerr.Wrapf(err, "initializing cache of visible traces") |
| } |
| sklog.Infof("Successfully initialized visible trace cache.") |
| |
| go util.RepeatCtx(ctx, interval, func(ctx context.Context) { |
| err := cycle(ctx) |
| if err != nil { |
| sklog.Warningf("Could not update map of public traces: %s", err) |
| } |
| }) |
| return nil |
| } |
| |
| // NewAndUntriagedSummaryForCL queries all the patchsets in parallel (to keep the query less |
| // complex). If there are no patchsets for the provided CL, it returns an error. |
| func (s *Impl) NewAndUntriagedSummaryForCL(ctx context.Context, qCLID string) (NewAndUntriagedSummary, error) { |
| ctx, span := trace.StartSpan(ctx, "search2_NewAndUntriagedSummaryForCL") |
| defer span.End() |
| |
| patchsets, err := s.getPatchsets(ctx, qCLID) |
| if err != nil { |
| return NewAndUntriagedSummary{}, skerr.Wrap(err) |
| } |
| if len(patchsets) == 0 { |
| return NewAndUntriagedSummary{}, skerr.Fmt("CL %q not found", qCLID) |
| } |
| |
| eg, ctx := errgroup.WithContext(ctx) |
| rv := make([]PatchsetNewAndUntriagedSummary, len(patchsets)) |
| for i, p := range patchsets { |
| idx, ps := i, p |
| eg.Go(func() error { |
| sum, err := s.getSummaryForPS(ctx, qCLID, ps.id) |
| if err != nil { |
| return skerr.Wrap(err) |
| } |
| sum.PatchsetID = sql.Unqualify(ps.id) |
| sum.PatchsetOrder = ps.order |
| rv[idx] = sum |
| return nil |
| }) |
| } |
| var updatedTS time.Time |
| eg.Go(func() error { |
| var err error |
| updatedTS, err = s.ChangelistLastUpdated(ctx, qCLID) |
| return skerr.Wrap(err) |
| }) |
| if err := eg.Wait(); err != nil { |
| return NewAndUntriagedSummary{}, skerr.Wrapf(err, "Getting counts for CL %q and %d PS", qCLID, len(patchsets)) |
| } |
| return NewAndUntriagedSummary{ |
| ChangelistID: sql.Unqualify(qCLID), |
| PatchsetSummaries: rv, |
| LastUpdated: updatedTS.UTC(), |
| }, nil |
| } |
| |
| type psIDAndOrder struct { |
| id string |
| order int |
| } |
| |
| // getPatchsets returns the qualified ids and orders of the patchsets sorted by ps_order. |
| func (s *Impl) getPatchsets(ctx context.Context, qualifiedID string) ([]psIDAndOrder, error) { |
| ctx, span := trace.StartSpan(ctx, "getPatchsets") |
| defer span.End() |
| rows, err := s.db.Query(ctx, `SELECT patchset_id, ps_order |
| FROM Patchsets WHERE changelist_id = $1 ORDER BY ps_order ASC`, qualifiedID) |
| if err != nil { |
| return nil, skerr.Wrapf(err, "getting summary for cl %q", qualifiedID) |
| } |
| defer rows.Close() |
| var rv []psIDAndOrder |
| for rows.Next() { |
| var row psIDAndOrder |
| if err := rows.Scan(&row.id, &row.order); err != nil { |
| return nil, skerr.Wrap(err) |
| } |
| rv = append(rv, row) |
| } |
| return rv, nil |
| } |
| |
| // getSummaryForPS looks at all the data produced for a given PS and returns a summary of
| // the newly produced digests and untriaged digests.
| func (s *Impl) getSummaryForPS(ctx context.Context, clid, psID string) (PatchsetNewAndUntriagedSummary, error) { |
| ctx, span := trace.StartSpan(ctx, "getSummaryForPS") |
| defer span.End() |
| const statement = `WITH |
| CLDigests AS ( |
| SELECT secondary_branch_trace_id, digest, grouping_id |
| FROM SecondaryBranchValues |
| WHERE branch_name = $1 and version_name = $2 |
| ), |
| NonIgnoredCLDigests AS ( |
| -- We only want to count a digest once per grouping, no matter how many times it shows up,
| -- because we group those together (by trace) in the frontend UI.
| SELECT DISTINCT digest, CLDigests.grouping_id |
| FROM CLDigests |
| JOIN Traces |
| ON secondary_branch_trace_id = trace_id |
| WHERE Traces.matches_any_ignore_rule = False |
| ), |
| CLExpectations AS ( |
| SELECT grouping_id, digest, label |
| FROM SecondaryBranchExpectations |
| WHERE branch_name = $1 |
| ), |
| LabeledDigests AS ( |
| SELECT NonIgnoredCLDigests.grouping_id, NonIgnoredCLDigests.digest, COALESCE(CLExpectations.label, COALESCE(Expectations.label, 'u')) as label |
| FROM NonIgnoredCLDigests |
| LEFT JOIN Expectations |
| ON NonIgnoredCLDigests.grouping_id = Expectations.grouping_id AND |
| NonIgnoredCLDigests.digest = Expectations.digest |
| LEFT JOIN CLExpectations |
| ON NonIgnoredCLDigests.grouping_id = CLExpectations.grouping_id AND |
| NonIgnoredCLDigests.digest = CLExpectations.digest |
| ) |
| SELECT * FROM LabeledDigests;` |
| |
| rows, err := s.db.Query(ctx, statement, clid, psID) |
| if err != nil { |
| return PatchsetNewAndUntriagedSummary{}, skerr.Wrapf(err, "getting summary for ps %q in cl %q", psID, clid) |
| } |
| defer rows.Close() |
| s.mutex.RLock() |
| defer s.mutex.RUnlock() |
| var digest schema.DigestBytes |
| var grouping schema.GroupingID |
| var label schema.ExpectationLabel |
| var key groupingDigestKey |
| keyGrouping := key.groupingID[:] |
| keyDigest := key.digest[:] |
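| // keyGrouping and keyDigest alias key's backing arrays, so the copy calls in the scan loop
| // below update key in place (the same trick used in getDigestsOnPrimary).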
| var rv PatchsetNewAndUntriagedSummary |
| |
| for rows.Next() { |
| if err := rows.Scan(&grouping, &digest, &label); err != nil { |
| return PatchsetNewAndUntriagedSummary{}, skerr.Wrap(err) |
| } |
| copy(keyGrouping, grouping) |
| copy(keyDigest, digest) |
| _, isExisting := s.digestsOnPrimary[key] |
| if !isExisting { |
| rv.NewImages++ |
| } |
| if label == schema.LabelUntriaged { |
| rv.TotalUntriagedImages++ |
| if !isExisting { |
| rv.NewUntriagedImages++ |
| } |
| } |
| } |
| return rv, nil |
| } |
| |
| // ChangelistLastUpdated implements the API interface. |
| func (s *Impl) ChangelistLastUpdated(ctx context.Context, qCLID string) (time.Time, error) { |
| ctx, span := trace.StartSpan(ctx, "search2_ChangelistLastUpdated") |
| defer span.End() |
| var updatedTS time.Time |
| row := s.db.QueryRow(ctx, `WITH |
| LastSeenData AS ( |
| SELECT last_ingested_data as ts FROM Changelists |
| WHERE changelist_id = $1 |
| ), |
| LatestTriageAction AS ( |
| SELECT triage_time as ts FROM ExpectationRecords |
| WHERE branch_name = $1 |
| ORDER BY triage_time DESC LIMIT 1 |
| ) |
| SELECT ts FROM LastSeenData |
| UNION |
| SELECT ts FROM LatestTriageAction |
| ORDER BY ts DESC LIMIT 1 |
| `, qCLID) |
| if err := row.Scan(&updatedTS); err != nil { |
| if err == pgx.ErrNoRows { |
| return time.Time{}, nil |
| } |
| return time.Time{}, skerr.Wrapf(err, "Getting last updated ts for cl %q", qCLID) |
| } |
| return updatedTS.UTC(), nil |
| } |
| |
| // Search implements the API interface.
| func (s *Impl) Search(ctx context.Context, q *query.Search) (*frontend.SearchResponse, error) { |
| ctx, span := trace.StartSpan(ctx, "search2_Search") |
| defer span.End() |
| |
| ctx = context.WithValue(ctx, common.QueryKey, *q) |
| ctx, err := s.addCommitsData(ctx) |
| if err != nil { |
| return nil, skerr.Wrap(err) |
| } |
| if q.ChangelistID != "" { |
| if q.CodeReviewSystemID == "" { |
| return nil, skerr.Fmt("Code Review System (crs) must be specified") |
| } |
| return s.searchCLData(ctx) |
| } |
| |
| commits, err := s.getCommits(ctx) |
| if err != nil { |
| return nil, skerr.Wrap(err) |
| } |
| |
| // Find all digests and traces that match the given search criteria. |
| // This will be filtered according to the publiclyAllowedParams as well. |
| traceDigests, err := s.getMatchingDigestsAndTraces(ctx) |
| if err != nil { |
| return nil, skerr.Wrap(err) |
| } |
| if len(traceDigests) == 0 { |
| return &frontend.SearchResponse{ |
| Commits: commits, |
| }, nil |
| } |
| // Lookup the closest diffs to the given digests. This returns a subset according to the |
| // limit and offset in the query. |
| closestDiffs, extendedBulkTriageDeltaInfos, err := s.getClosestDiffs(ctx, traceDigests) |
| if err != nil { |
| return nil, skerr.Wrap(err) |
| } |
| // Go fetch history and paramset (within this grouping, and respecting publiclyAllowedParams). |
| paramsetsByDigest, err := s.getParamsetsForRightSide(ctx, closestDiffs) |
| if err != nil { |
| return nil, skerr.Wrap(err) |
| } |
| // Flesh out the trace history with enough data to draw the dots diagram on the frontend. |
| results, err := s.fillOutTraceHistory(ctx, closestDiffs) |
| if err != nil { |
| return nil, skerr.Wrap(err) |
| } |
| // Populate the LabelBefore fields of the extendedBulkTriageDeltaInfos with expectations from |
| // the primary branch. |
| if err := s.populateLabelBefore(ctx, extendedBulkTriageDeltaInfos); err != nil { |
| return nil, skerr.Wrap(err) |
| } |
| |
| // Fill in the paramsets of the reference images. |
| for _, sr := range results { |
| for _, srdd := range sr.RefDiffs { |
| if srdd != nil { |
| srdd.ParamSet = paramsetsByDigest[srdd.Digest] |
| } |
| } |
| } |
| |
| // Populate the optionsIDs fields of each extendedBulkTriageDeltaInfo. |
| if err := s.populateExtendedBulkTriageDeltaInfosOptionsIDs(ctx, extendedBulkTriageDeltaInfos); err != nil { |
| return nil, skerr.Wrap(err) |
| } |
| |
| bulkTriageDeltaInfos, err := s.prepareExtendedBulkTriageDeltaInfosForFrontend(ctx, extendedBulkTriageDeltaInfos) |
| if err != nil { |
| return nil, skerr.Wrap(err) |
| } |
| |
| return &frontend.SearchResponse{ |
| Results: results, |
| Offset: q.Offset, |
| Size: len(extendedBulkTriageDeltaInfos), |
| BulkTriageDeltaInfos: bulkTriageDeltaInfos, |
| Commits: commits, |
| }, nil |
| } |
| |
| // addCommitsData finds the current sliding window of data (the last N commits), adds the
| // derived data to the given context, and returns that context.
| func (s *Impl) addCommitsData(ctx context.Context) (context.Context, error) { |
| return common.AddCommitsData(ctx, s.db, s.windowLength) |
| } |
| |
| type digestWithTraceAndGrouping struct { |
| traceID schema.TraceID |
| groupingID schema.GroupingID |
| digest schema.DigestBytes |
| // optionsID will be set for CL data only; for primary data we have to look it up from a |
| // different table and the options could change over time. |
| optionsID schema.OptionsID |
| } |
| |
| // getMatchingDigestsAndTraces returns the tuples of digest+traceID that match the given query. |
| // The order of the result is arbitrary. |
| func (s *Impl) getMatchingDigestsAndTraces(ctx context.Context) ([]digestWithTraceAndGrouping, error) { |
| ctx, span := trace.StartSpan(ctx, "getMatchingDigestsAndTraces") |
| defer span.End() |
| q := common.GetQuery(ctx) |
| corpus := q.TraceValues[types.CorpusField][0] |
| if corpus == "" { |
| return nil, skerr.Fmt("must specify corpus in search of left side.") |
| } |
| if q.BlameGroupID != "" { |
| results, err := s.getTracesForBlame(ctx, corpus, q.BlameGroupID) |
| if err != nil { |
| return nil, skerr.Wrapf(err, "searching for blame %q in corpus %q", q.BlameGroupID, corpus) |
| } |
| return results, nil |
| } |
| statement := `WITH |
| MatchingDigests AS ( |
| SELECT grouping_id, digest FROM Expectations |
| WHERE label = ANY($1) |
| ),` |
| tracesBlock, args := s.matchingTracesStatement(ctx) |
| statement += tracesBlock |
| statement += ` |
| SELECT trace_id, MatchingDigests.grouping_id, MatchingTraces.digest FROM |
| MatchingDigests |
| JOIN |
| MatchingTraces ON MatchingDigests.grouping_id = MatchingTraces.grouping_id AND |
| MatchingDigests.digest = MatchingTraces.digest` |
| |
| var triageStatuses []schema.ExpectationLabel |
| if q.IncludeUntriagedDigests { |
| triageStatuses = append(triageStatuses, schema.LabelUntriaged) |
| } |
| if q.IncludeNegativeDigests { |
| triageStatuses = append(triageStatuses, schema.LabelNegative) |
| } |
| if q.IncludePositiveDigests { |
| triageStatuses = append(triageStatuses, schema.LabelPositive) |
| } |
| arguments := append([]interface{}{triageStatuses}, args...) |
| |
| rows, err := s.db.Query(ctx, statement, arguments...) |
| if err != nil { |
| return nil, skerr.Wrapf(err, "searching for query %v with args %v", q, arguments) |
| } |
| defer rows.Close() |
| var rv []digestWithTraceAndGrouping |
| s.mutex.RLock() |
| defer s.mutex.RUnlock() |
| var traceKey schema.MD5Hash |
| for rows.Next() { |
| var row digestWithTraceAndGrouping |
| if err := rows.Scan(&row.traceID, &row.groupingID, &row.digest); err != nil { |
| return nil, skerr.Wrap(err) |
| } |
| if s.publiclyVisibleTraces != nil { |
| copy(traceKey[:], row.traceID) |
| if _, ok := s.publiclyVisibleTraces[traceKey]; !ok { |
| continue |
| } |
| } |
| rv = append(rv, row) |
| } |
| return rv, nil |
| } |
| |
| // getTracesForBlame returns the traces that match the given blameID. It mirrors the behavior of |
| // method GetBlamesForUntriagedDigests. See function combineIntoRanges for details. |
| func (s *Impl) getTracesForBlame(ctx context.Context, corpus string, blameID string) ([]digestWithTraceAndGrouping, error) { |
| ctx, span := trace.StartSpan(ctx, "getTracesForBlame") |
| defer span.End() |
| // Find untriaged digests at head and the traces that produced them. |
| tracesByDigest, err := s.getTracesWithUntriagedDigestsAtHead(ctx, corpus) |
| if err != nil { |
| return nil, skerr.Wrap(err) |
| } |
| if s.isPublicView { |
| tracesByDigest = s.applyPublicFilter(ctx, tracesByDigest) |
| } |
| var traces []schema.TraceID |
| for _, xt := range tracesByDigest { |
| traces = append(traces, xt...) |
| } |
| if len(traces) == 0 { |
| return nil, nil // No data, we can stop here |
| } |
| ctx, err = s.addCommitsData(ctx) |
| if err != nil { |
| return nil, skerr.Wrap(err) |
| } |
| // Return the trace histories for those traces, as well as a mapping of the unique |
| // digest+grouping pairs in order to get expectations. |
| histories, _, err := s.getHistoriesForTraces(ctx, tracesByDigest) |
| if err != nil { |
| return nil, skerr.Wrap(err) |
| } |
| // Expand grouping_ids into full params. |
| groupings, err := s.expandGroupings(ctx, tracesByDigest) |
| if err != nil { |
| return nil, skerr.Wrap(err) |
| } |
| commits, err := s.getCommits(ctx) |
| if err != nil { |
| return nil, skerr.Wrap(err) |
| } |
| // Look at trace histories and identify ranges of commits that caused us to go from drawing |
| // triaged digests to untriaged digests. |
| ranges := combineIntoRanges(ctx, histories, groupings, commits) |
| |
| var rv []digestWithTraceAndGrouping |
| for _, r := range ranges { |
| if r.CommitRange == blameID { |
| for _, ag := range r.AffectedGroupings { |
| for _, traceIDAndDigest := range ag.traceIDsAndDigests { |
| rv = append(rv, digestWithTraceAndGrouping{ |
| traceID: traceIDAndDigest.id, |
| groupingID: ag.groupingID[:], |
| digest: traceIDAndDigest.digest, |
| }) |
| } |
| } |
| } |
| } |
| return rv, nil |
| } |
| |
| type filterSets struct { |
| key string |
| values []string |
| } |
| |
| // matchingTracesStatement returns a SQL snippet that includes a WITH table called MatchingTraces. |
| // This table will have rows containing trace_id, grouping_id, and digest of traces that match |
| // the given search criteria. The second parameter is the arguments that need to be included |
| // in the query. This code knows to start using numbered parameters at 2. |
| func (s *Impl) matchingTracesStatement(ctx context.Context) (string, []interface{}) { |
| var keyFilters []filterSets |
| q := common.GetQuery(ctx) |
| for key, values := range q.TraceValues { |
| if key == types.CorpusField { |
| continue |
| } |
| if key != sql.Sanitize(key) { |
| sklog.Infof("key %q did not pass sanitization", key) |
| continue |
| } |
| keyFilters = append(keyFilters, filterSets{key: key, values: values}) |
| } |
| ignoreStatuses := []bool{false} |
| if q.IncludeIgnoredTraces { |
| ignoreStatuses = append(ignoreStatuses, true) |
| } |
| corpus := sql.Sanitize(q.TraceValues[types.CorpusField][0]) |
| materializedView := s.getMaterializedView(unignoredRecentTracesView, corpus) |
| if q.OnlyIncludeDigestsProducedAtHead { |
| if len(keyFilters) == 0 { |
| if materializedView != "" && !q.IncludeIgnoredTraces { |
| return "MatchingTraces AS (SELECT * FROM " + materializedView + ")", nil |
| } |
| // Corpus is being used as a string |
| args := []interface{}{common.GetFirstCommitID(ctx), ignoreStatuses, corpus} |
| return ` |
| MatchingTraces AS ( |
| SELECT trace_id, grouping_id, digest FROM ValuesAtHead |
| WHERE most_recent_commit_id >= $2 AND |
| matches_any_ignore_rule = ANY($3) AND |
| corpus = $4 |
| )`, args |
| } |
| if materializedView != "" && !q.IncludeIgnoredTraces { |
| return joinedTracesStatement(keyFilters, corpus) + ` |
| MatchingTraces AS ( |
| SELECT JoinedTraces.trace_id, grouping_id, digest FROM ` + materializedView + ` |
| JOIN JoinedTraces ON JoinedTraces.trace_id = ` + materializedView + `.trace_id |
| )`, nil |
| } |
| // Corpus is being used as a JSONB value here |
| args := []interface{}{common.GetFirstCommitID(ctx), ignoreStatuses} |
| return joinedTracesStatement(keyFilters, corpus) + ` |
| MatchingTraces AS ( |
| SELECT ValuesAtHead.trace_id, grouping_id, digest FROM ValuesAtHead |
| JOIN JoinedTraces ON ValuesAtHead.trace_id = JoinedTraces.trace_id |
| WHERE most_recent_commit_id >= $2 AND |
| matches_any_ignore_rule = ANY($3) |
| )`, args |
| } else { |
| if len(keyFilters) == 0 { |
| if materializedView != "" && !q.IncludeIgnoredTraces { |
| args := []interface{}{common.GetFirstTileID(ctx)} |
| return `MatchingTraces AS ( |
| SELECT DISTINCT TiledTraceDigests.trace_id, TiledTraceDigests.grouping_id, TiledTraceDigests.digest |
| FROM TiledTraceDigests |
| JOIN ` + materializedView + ` ON ` + materializedView + `.trace_id = TiledTraceDigests.trace_id |
| WHERE tile_id >= $2 |
| )`, args |
| } |
| // Corpus is being used as a string |
| args := []interface{}{common.GetFirstCommitID(ctx), ignoreStatuses, corpus, common.GetFirstTileID(ctx)} |
| return ` |
| TracesOfInterest AS ( |
| SELECT trace_id, grouping_id FROM ValuesAtHead |
| WHERE matches_any_ignore_rule = ANY($3) AND |
| most_recent_commit_id >= $2 AND |
| corpus = $4 |
| ), |
| MatchingTraces AS ( |
| SELECT DISTINCT TiledTraceDigests.trace_id, TracesOfInterest.grouping_id, TiledTraceDigests.digest |
| FROM TiledTraceDigests |
| JOIN TracesOfInterest ON TracesOfInterest.trace_id = TiledTraceDigests.trace_id |
| WHERE tile_id >= $5 |
| ) |
| `, args |
| } |
| if materializedView != "" && !q.IncludeIgnoredTraces { |
| args := []interface{}{common.GetFirstTileID(ctx)} |
| return joinedTracesStatement(keyFilters, corpus) + ` |
| TracesOfInterest AS ( |
| SELECT JoinedTraces.trace_id, grouping_id FROM ` + materializedView + ` |
| JOIN JoinedTraces ON JoinedTraces.trace_id = ` + materializedView + `.trace_id |
| ), |
| MatchingTraces AS ( |
| SELECT DISTINCT TiledTraceDigests.trace_id, TracesOfInterest.grouping_id, TiledTraceDigests.digest |
| FROM TiledTraceDigests |
| JOIN TracesOfInterest on TracesOfInterest.trace_id = TiledTraceDigests.trace_id |
| WHERE tile_id >= $2 |
| )`, args |
| } |
| // Corpus is being used as a JSONB value here |
| args := []interface{}{common.GetFirstTileID(ctx), ignoreStatuses} |
| return joinedTracesStatement(keyFilters, corpus) + ` |
| TracesOfInterest AS ( |
| SELECT Traces.trace_id, grouping_id FROM Traces |
| JOIN JoinedTraces ON Traces.trace_id = JoinedTraces.trace_id |
| WHERE matches_any_ignore_rule = ANY($3) |
| ), |
| MatchingTraces AS ( |
| SELECT DISTINCT TiledTraceDigests.trace_id, TracesOfInterest.grouping_id, TiledTraceDigests.digest |
| FROM TiledTraceDigests |
| JOIN TracesOfInterest on TracesOfInterest.trace_id = TiledTraceDigests.trace_id |
| WHERE tile_id >= $2 |
| )`, args |
| } |
| } |
| |
| // joinedTracesStatement returns a SQL snippet that includes a WITH table called JoinedTraces. |
| // This table contains just the trace_ids that match the given filters. filters is expected to |
| // have keys which passed sanitization (it will sanitize the values). The snippet will include |
| // other tables that will be unioned and intersected to create the appropriate rows. This is |
| // similar to the technique we use for ignore rules, chosen to maximize consistent performance |
| // by using the inverted key index. The keys and values are hardcoded into the string instead |
| // of being passed in as arguments because kjlubick@ was not able to use the placeholder values |
| // to compare JSONB types removed from a JSONB object to a string while still using the indexes. |
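| //
| // For example, filters for a hypothetical key "os" with values "Android" and "iOS", plus a
| // corpus of "gm", produce (roughly):
| //
| //	U0 AS (
| //		SELECT trace_id FROM Traces WHERE keys -> 'os' = '"Android"'
| //		UNION
| //		SELECT trace_id FROM Traces WHERE keys -> 'os' = '"iOS"'
| //	),
| //	JoinedTraces AS (
| //		SELECT trace_id FROM U0
| //		INTERSECT
| //		SELECT trace_id FROM Traces where keys -> 'source_type' = '"gm"'
| //	),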
| func joinedTracesStatement(filters []filterSets, corpus string) string { |
| statement := "" |
| for i, filter := range filters { |
| statement += fmt.Sprintf("U%d AS (\n", i) |
| for j, value := range filter.values { |
| if j != 0 { |
| statement += "\tUNION\n" |
| } |
| statement += fmt.Sprintf("\tSELECT trace_id FROM Traces WHERE keys -> '%s' = '%q'\n", filter.key, sql.Sanitize(value)) |
| } |
| statement += "),\n" |
| } |
| statement += "JoinedTraces AS (\n" |
| for i := range filters { |
| statement += fmt.Sprintf("\tSELECT trace_id FROM U%d\n\tINTERSECT\n", i) |
| } |
| // Include a final intersect for the corpus, which is stored as a JSONB value (i.e. a
| // quoted string) and is inlined directly into the statement below.
| statement += "\tSELECT trace_id FROM Traces where keys -> 'source_type' = '\"" + corpus + "\"'\n),\n" |
| return statement |
| } |
| |
| type digestAndClosestDiffs struct { |
| leftDigest schema.DigestBytes |
| groupingID schema.GroupingID |
| rightDigests []schema.DigestBytes |
| traceIDs []schema.TraceID |
| optionsIDs []schema.OptionsID // will be set for CL data only |
| closestDigest *frontend.SRDiffDigest // These won't have ParamSets yet |
| closestPositive *frontend.SRDiffDigest |
| closestNegative *frontend.SRDiffDigest |
| } |
| |
| // extendedBulkTriageDeltaInfo extends the frontend.BulkTriageDeltaInfo struct with the information |
| // needed to populate the LabelBefore field in a separate SQL query. |
| type extendedBulkTriageDeltaInfo struct { |
| frontend.BulkTriageDeltaInfo |
| |
| traceIDs []schema.TraceID |
| groupingID schema.GroupingID |
| digest schema.DigestBytes |
| optionsIDs []schema.OptionsID // Will be set for CL data only. |
| } |
| |
| // getClosestDiffs returns information about the closest triaged digests for each result in the |
| // input. We are able to batch the queries by grouping and do so for better performance. |
| // While this returns a subset of data as defined by the query, it also returns sufficient |
| // information to bulk-triage all of the inputs. Note that this function does not populate the |
| // LabelBefore fields of the returned extendedBulkTriageDeltaInfo structs; these need to be |
| // populated by the caller. |
| func (s *Impl) getClosestDiffs(ctx context.Context, inputs []digestWithTraceAndGrouping) ([]digestAndClosestDiffs, []extendedBulkTriageDeltaInfo, error) { |
| ctx, span := trace.StartSpan(ctx, "getClosestDiffs") |
| defer span.End() |
| byGrouping := map[schema.MD5Hash][]digestWithTraceAndGrouping{} |
| // Even if two groupings draw the same digest, we want those as two different results because |
| // they could be triaged differently. |
| byDigestAndGrouping := map[groupingDigestKey]digestAndClosestDiffs{} |
| var mutex sync.Mutex |
| for _, input := range inputs { |
| gID := sql.AsMD5Hash(input.groupingID) |
| byGrouping[gID] = append(byGrouping[gID], input) |
| key := groupingDigestKey{ |
| digest: sql.AsMD5Hash(input.digest), |
| groupingID: sql.AsMD5Hash(input.groupingID), |
| } |
| bd := byDigestAndGrouping[key] |
| bd.leftDigest = input.digest |
| bd.groupingID = input.groupingID |
| bd.traceIDs = append(bd.traceIDs, input.traceID) |
| if input.optionsID != nil { |
| bd.optionsIDs = append(bd.optionsIDs, input.optionsID) |
| } |
| byDigestAndGrouping[key] = bd |
| } |
| |
| // Look up the diffs in parallel by grouping, as we only want to compare the images to other |
| // images produced by traces in the same grouping. |
| eg, eCtx := errgroup.WithContext(ctx) |
| for g, i := range byGrouping { |
| groupingID, inputs := g, i |
| eg.Go(func() error { |
| // Aggregate and deduplicate digests from the stage one results. |
| digests := make([]schema.DigestBytes, 0, len(inputs)) |
| duplicates := map[schema.MD5Hash]bool{} |
| var key schema.MD5Hash |
| for _, input := range inputs { |
| copy(key[:], input.digest) |
| if duplicates[key] { |
| continue |
| } |
| duplicates[key] = true |
| digests = append(digests, input.digest) |
| } |
| resultsByDigest, err := s.getDiffsForGrouping(eCtx, groupingID, digests) |
| if err != nil { |
| return skerr.Wrap(err) |
| } |
| // Combine those results into our search results. |
| mutex.Lock() |
| defer mutex.Unlock() |
| for key, diffs := range resultsByDigest { |
| // combine this map with the other |
| digestAndClosestDiffs := byDigestAndGrouping[key] |
| for _, srdd := range diffs { |
| digestBytes, err := sql.DigestToBytes(srdd.Digest) |
| if err != nil { |
| return skerr.Wrap(err) |
| } |
| digestAndClosestDiffs.rightDigests = append(digestAndClosestDiffs.rightDigests, digestBytes) |
| if srdd.Status == expectations.Positive { |
| digestAndClosestDiffs.closestPositive = srdd |
| } else { |
| digestAndClosestDiffs.closestNegative = srdd |
| } |
| if digestAndClosestDiffs.closestNegative != nil && digestAndClosestDiffs.closestPositive != nil { |
| if digestAndClosestDiffs.closestPositive.CombinedMetric < digestAndClosestDiffs.closestNegative.CombinedMetric { |
| digestAndClosestDiffs.closestDigest = digestAndClosestDiffs.closestPositive |
| } else { |
| digestAndClosestDiffs.closestDigest = digestAndClosestDiffs.closestNegative |
| } |
| } else { |
| // there is only one type of diff, so it defaults to the closest. |
| digestAndClosestDiffs.closestDigest = srdd |
| } |
| } |
| byDigestAndGrouping[key] = digestAndClosestDiffs |
| } |
| return nil |
| }) |
| } |
| if err := eg.Wait(); err != nil { |
| return nil, nil, skerr.Wrap(err) |
| } |
| |
| q := common.GetQuery(ctx) |
| var extendedBulkTriageDeltaInfos []extendedBulkTriageDeltaInfo |
| results := make([]digestAndClosestDiffs, 0, len(byDigestAndGrouping)) |
| for _, s2 := range byDigestAndGrouping { |
| // Filter out any results without a closest triaged digest (if that option is selected). |
| if q.MustIncludeReferenceFilter && s2.closestDigest == nil { |
| continue |
| } |
| grouping, err := s.expandGrouping(ctx, sql.AsMD5Hash(s2.groupingID)) |
| if err != nil { |
| return nil, nil, skerr.Wrap(err) |
| } |
| triageDeltaInfo := extendedBulkTriageDeltaInfo{ |
| // We do not populate the LabelBefore field, as that is the caller's responsibility. |
| // However, we will populate the ClosestDiffLabel field if there is a closest digest. |
| BulkTriageDeltaInfo: frontend.BulkTriageDeltaInfo{ |
| Grouping: grouping, |
| Digest: types.Digest(hex.EncodeToString(s2.leftDigest)), |
| }, |
| traceIDs: s2.traceIDs, |
| groupingID: s2.groupingID, |
| digest: s2.leftDigest, |
| optionsIDs: s2.optionsIDs, |
| } |
| if s2.closestDigest != nil { |
| // Apply RGBA Filter here - if the closest digest isn't within range, we remove it. |
| maxDiff := util.MaxInt(s2.closestDigest.MaxRGBADiffs[:]...) |
| if maxDiff < q.RGBAMinFilter || maxDiff > q.RGBAMaxFilter { |
| continue |
| } |
| closestLabel := s2.closestDigest.Status |
| triageDeltaInfo.ClosestDiffLabel = frontend.ClosestDiffLabel(closestLabel) |
| } else { |
| triageDeltaInfo.ClosestDiffLabel = frontend.ClosestDiffLabelNone |
| } |
| results = append(results, s2) |
| extendedBulkTriageDeltaInfos = append(extendedBulkTriageDeltaInfos, triageDeltaInfo) |
| } |
| // Sort for determinism. |
| sort.Slice(extendedBulkTriageDeltaInfos, func(i, j int) bool { |
| groupIDComparison := bytes.Compare(extendedBulkTriageDeltaInfos[i].groupingID, extendedBulkTriageDeltaInfos[j].groupingID) |
| return groupIDComparison < 0 || (groupIDComparison == 0 && extendedBulkTriageDeltaInfos[i].Digest < extendedBulkTriageDeltaInfos[j].Digest) |
| }) |
| if q.Offset >= len(results) { |
| return nil, extendedBulkTriageDeltaInfos, nil |
| } |
| sortAsc := q.Sort == query.SortAscending |
| sort.Slice(results, func(i, j int) bool { |
| if results[i].closestDigest == nil && results[j].closestDigest != nil { |
| return true // sort results with no reference image to the top |
| } |
| if results[i].closestDigest != nil && results[j].closestDigest == nil { |
| return false |
| } |
| if (results[i].closestDigest == nil && results[j].closestDigest == nil) || |
| results[i].closestDigest.CombinedMetric == results[j].closestDigest.CombinedMetric { |
| // Tiebreak using digest in ascending order, followed by groupingID. |
| c := bytes.Compare(results[i].leftDigest, results[j].leftDigest) |
| if c != 0 { |
| return c < 0 |
| } |
| return bytes.Compare(results[i].groupingID, results[j].groupingID) < 0 |
| } |
| if sortAsc { |
| return results[i].closestDigest.CombinedMetric < results[j].closestDigest.CombinedMetric |
| } |
| return results[i].closestDigest.CombinedMetric > results[j].closestDigest.CombinedMetric |
| }) |
| |
| if q.Limit <= 0 { |
| for i := range extendedBulkTriageDeltaInfos { |
| extendedBulkTriageDeltaInfos[i].InCurrentSearchResultsPage = true |
| } |
| return results, extendedBulkTriageDeltaInfos, nil |
| } |
| end := util.MinInt(len(results), q.Offset+q.Limit) |
| for i := q.Offset; i < end; i++ { |
| extendedBulkTriageDeltaInfos[i].InCurrentSearchResultsPage = true |
| } |
| return results[q.Offset:end], extendedBulkTriageDeltaInfos, nil |
| } |
| |
| // getDiffsForGrouping returns the closest positive and negative diffs for the provided digests |
| // in the given grouping. |
| func (s *Impl) getDiffsForGrouping(ctx context.Context, groupingID schema.MD5Hash, leftDigests []schema.DigestBytes) (map[groupingDigestKey][]*frontend.SRDiffDigest, error) { |
| ctx, span := trace.StartSpan(ctx, "getDiffsForGrouping") |
| defer span.End() |
| rtv := common.GetQuery(ctx).RightTraceValues |
| digestsInGrouping, err := s.getDigestsForGrouping(ctx, groupingID[:], rtv) |
| if err != nil { |
| return nil, skerr.Wrap(err) |
| } |
| statement := ` |
| WITH |
| PositiveOrNegativeDigests AS ( |
| SELECT digest, label FROM Expectations |
| WHERE grouping_id = $1 AND (label = 'n' OR label = 'p') |
| ), |
| ComparisonBetweenUntriagedAndObserved AS ( |
| SELECT DiffMetrics.* FROM DiffMetrics |
| WHERE left_digest = ANY($2) AND right_digest = ANY($3) |
| ) |
| -- This will return the right_digest with the smallest combined_metric for each left_digest + label |
| SELECT DISTINCT ON (left_digest, label) |
| label, left_digest, right_digest, num_pixels_diff, percent_pixels_diff, max_rgba_diffs, |
| combined_metric, dimensions_differ |
| FROM |
| ComparisonBetweenUntriagedAndObserved |
| JOIN PositiveOrNegativeDigests |
| ON ComparisonBetweenUntriagedAndObserved.right_digest = PositiveOrNegativeDigests.digest |
| ORDER BY left_digest, label, combined_metric ASC, max_channel_diff ASC, right_digest ASC |
| ` |
| |
| rows, err := s.db.Query(ctx, statement, groupingID[:], leftDigests, digestsInGrouping) |
| if err != nil { |
| return nil, skerr.Wrap(err) |
| } |
| defer rows.Close() |
| results := map[groupingDigestKey][]*frontend.SRDiffDigest{} |
| var label schema.ExpectationLabel |
| var row schema.DiffMetricRow |
| for rows.Next() { |
| if err := rows.Scan(&label, &row.LeftDigest, &row.RightDigest, &row.NumPixelsDiff, |
| &row.PercentPixelsDiff, &row.MaxRGBADiffs, &row.CombinedMetric, |
| &row.DimensionsDiffer); err != nil { |
| rows.Close() |
| return nil, skerr.Wrap(err) |
| } |
| srdd := &frontend.SRDiffDigest{ |
| Digest: types.Digest(hex.EncodeToString(row.RightDigest)), |
| Status: label.ToExpectation(), |
| CombinedMetric: row.CombinedMetric, |
| DimDiffer: row.DimensionsDiffer, |
| MaxRGBADiffs: row.MaxRGBADiffs, |
| NumDiffPixels: row.NumPixelsDiff, |
| PixelDiffPercent: row.PercentPixelsDiff, |
| QueryMetric: row.CombinedMetric, |
| } |
| key := groupingDigestKey{ |
| digest: sql.AsMD5Hash(row.LeftDigest), |
| groupingID: groupingID, |
| } |
| results[key] = append(results[key], srdd) |
| } |
| return results, nil |
| } |
| |
| // getDigestsForGrouping returns the digests that were produced in the given range by any traces |
| // which belong to the grouping and match the provided paramset (if provided). It returns digests |
| // from traces regardless of the traces' ignore statuses. As per usual with a ParamSet, we use |
| // a union on values associated with a given key and an intersect across multiple keys. |
| func (s *Impl) getDigestsForGrouping(ctx context.Context, groupingID schema.GroupingID, traceKeys paramtools.ParamSet) ([]schema.DigestBytes, error) { |
| ctx, span := trace.StartSpan(ctx, "getDigestsForGrouping") |
| defer span.End() |
| tracesForGroup, err := s.getTracesForGroup(ctx, groupingID, traceKeys) |
| if err != nil { |
| return nil, skerr.Wrap(err) |
| } |
| beginTile, endTile := common.GetFirstTileID(ctx), common.GetLastTileID(ctx) |
| tilesInRange := make([]schema.TileID, 0, endTile-beginTile+1) |
| for i := beginTile; i <= endTile; i++ { |
| tilesInRange = append(tilesInRange, i) |
| } |
| // See diff/worker for explanation of this faster query. |
| const statement = ` |
| SELECT DISTINCT digest FROM TiledTraceDigests |
| AS OF SYSTEM TIME '-0.1s' |
| WHERE tile_id = ANY($1) AND trace_id = ANY($2)` |
| |
| rows, err := s.db.Query(ctx, statement, tilesInRange, tracesForGroup) |
| if err != nil { |
| return nil, skerr.Wrapf(err, "fetching digests") |
| } |
| defer rows.Close() |
| var rv []schema.DigestBytes |
| for rows.Next() { |
| var d schema.DigestBytes |
| if err := rows.Scan(&d); err != nil { |
| return nil, skerr.Wrap(err) |
| } |
| rv = append(rv, d) |
| } |
| return rv, nil |
| } |
| |
| // getTracesForGroup returns all traces that match the given groupingID and the provided key/values |
| // in traceKeys, if any. It may return an error if traceKeys contains invalid characters that we |
| // cannot safely turn into a SQL query. |
| func (s *Impl) getTracesForGroup(ctx context.Context, groupingID schema.GroupingID, traceKeys paramtools.ParamSet) ([]schema.TraceID, error) { |
| ctx, span := trace.StartSpan(ctx, "getTracesForGroup") |
| span.AddAttributes(trace.Int64Attribute("num trace keys", int64(len(traceKeys)))) |
| defer span.End() |
| |
| statement, err := observedDigestsStatement(traceKeys) |
| if err != nil { |
| return nil, skerr.Wrapf(err, "Could not make valid query for %#v", traceKeys) |
| } |
| |
| rows, err := s.db.Query(ctx, statement, groupingID) |
| if err != nil { |
| return nil, skerr.Wrapf(err, "fetching trace ids") |
| } |
| defer rows.Close() |
| var rv []schema.TraceID |
| for rows.Next() { |
| var t schema.TraceID |
| if err := rows.Scan(&t); err != nil { |
| return nil, skerr.Wrap(err) |
| } |
| rv = append(rv, t) |
| } |
| return rv, nil |
| } |
| |
| // observedDigestsStatement returns a SQL query that returns all trace ids, regardless of ignore |
| // status, that matches the given paramset and belongs to the given grouping. We put the keys and |
| // values directly into the query because the pgx driver does not handle the placeholders well |
| // when used like key -> $2; it has a hard time deducing the appropriate types. |
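| //
| // For example, a paramset with a hypothetical key "name" and value "square" produces
| // (roughly):
| //
| //	WITH
| //	U0 AS (
| //		SELECT trace_id FROM Traces WHERE keys -> 'name' = '"square"'
| //	)
| //	SELECT trace_id FROM U0
| //	INTERSECT
| //	SELECT trace_id FROM Traces WHERE grouping_id = $1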
| func observedDigestsStatement(ps paramtools.ParamSet) (string, error) { |
| if len(ps) == 0 { |
| return `SELECT trace_id FROM Traces |
| AS OF SYSTEM TIME '-0.1s' |
| WHERE grouping_id = $1`, nil |
| } |
| statement := "WITH\n" |
| unionIndex := 0 |
| keys := make([]string, 0, len(ps)) |
| for key := range ps { |
| keys = append(keys, key) |
| } |
| sort.Strings(keys) // sort for determinism |
| for _, key := range keys { |
| if key != sql.Sanitize(key) { |
| return "", skerr.Fmt("Invalid query key %q", key) |
| } |
| if unionIndex > 0 { |
| statement += ",\n" |
| } |
| statement += fmt.Sprintf("U%d AS (\n", unionIndex) |
| for j, value := range ps[key] { |
| if value != sql.Sanitize(value) { |
| return "", skerr.Fmt("Invalid query value %q", value) |
| } |
| if j != 0 { |
| statement += "\tUNION\n" |
| } |
| // It is important to use -> and not ->> to correctly make use of the keys index.
| statement += fmt.Sprintf("\tSELECT trace_id FROM Traces WHERE keys -> '%s' = '%q'\n", key, value) |
| } |
| statement += ")" |
| unionIndex++ |
| } |
| statement += "\n" |
| for i := 0; i < unionIndex; i++ { |
| statement += fmt.Sprintf("SELECT trace_id FROM U%d\nINTERSECT\n", i) |
| } |
| statement += `SELECT trace_id FROM Traces WHERE grouping_id = $1` |
| return statement, nil |
| } |
| |
| // getParamsetsForRightSide fetches all the traces that produced the digests in the data and |
| // the keys for these traces as ParamSets. The ParamSets are mapped according to the digest |
| // which produced them in the given window (or the tiles that approximate this). |
| func (s *Impl) getParamsetsForRightSide(ctx context.Context, inputs []digestAndClosestDiffs) (map[types.Digest]paramtools.ParamSet, error) { |
| ctx, span := trace.StartSpan(ctx, "getParamsetsForRightSide") |
| defer span.End() |
| digestToTraces, err := s.addRightTraces(ctx, inputs) |
| if err != nil { |
| return nil, skerr.Wrap(err) |
| } |
| traceToOptions, err := s.getOptionsForTraces(ctx, digestToTraces) |
| if err != nil { |
| return nil, skerr.Wrap(err) |
| } |
| rv, err := s.expandTracesIntoParamsets(ctx, digestToTraces, traceToOptions) |
| if err != nil { |
| return nil, skerr.Wrap(err) |
| } |
| return rv, nil |
| } |
| |
| // addRightTraces finds the traces that draw the given digests. We do not need to consider the |
| // ignore rules or other search constraints because those only apply to the search results |
| // (that is, the left side). |
| func (s *Impl) addRightTraces(ctx context.Context, inputs []digestAndClosestDiffs) (map[types.Digest][]schema.TraceID, error) { |
| ctx, span := trace.StartSpan(ctx, "addRightTraces") |
| defer span.End() |
| |
| digestToTraces := map[types.Digest][]schema.TraceID{} |
| var mutex sync.Mutex |
| eg, eCtx := errgroup.WithContext(ctx) |
| totalDigests := 0 |
| for _, input := range inputs { |
| groupingID := input.groupingID |
| rightDigests := input.rightDigests |
| totalDigests += len(input.rightDigests) |
| eg.Go(func() error { |
| const statement = `SELECT DISTINCT encode(digest, 'hex') AS digest, trace_id |
| FROM TiledTraceDigests@grouping_digest_idx |
| WHERE digest = ANY($1) AND grouping_id = $2 AND tile_id >= $3 |
| ` |
| rows, err := s.db.Query(eCtx, statement, rightDigests, groupingID, common.GetFirstTileID(ctx)) |
| if err != nil { |
| return skerr.Wrap(err) |
| } |
| defer rows.Close() |
| mutex.Lock() |
| defer mutex.Unlock() |
| s.mutex.RLock() |
| defer s.mutex.RUnlock() |
| var traceKey schema.MD5Hash |
| for rows.Next() { |
| var digest types.Digest |
| var traceID schema.TraceID |
| if err := rows.Scan(&digest, &traceID); err != nil { |
| return skerr.Wrap(err) |
| } |
| if s.publiclyVisibleTraces != nil { |
| copy(traceKey[:], traceID) |
| if _, ok := s.publiclyVisibleTraces[traceKey]; !ok { |
| continue |
| } |
| } |
| digestToTraces[digest] = append(digestToTraces[digest], traceID) |
| } |
| return nil |
| }) |
| } |
| span.AddAttributes(trace.Int64Attribute("num_right_digests", int64(totalDigests))) |
| if err := eg.Wait(); err != nil { |
| return nil, skerr.Wrap(err) |
| } |
| |
| return digestToTraces, nil |
| } |
| |
| // getOptionsForTraces returns the most recent option map for each given trace. |
| func (s *Impl) getOptionsForTraces(ctx context.Context, digestToTraces map[types.Digest][]schema.TraceID) (map[schema.MD5Hash]paramtools.Params, error) { |
| ctx, span := trace.StartSpan(ctx, "getOptionsForTraces") |
| defer span.End() |
| byTrace := map[schema.MD5Hash]paramtools.Params{} |
| placeHolder := paramtools.Params{} |
| var traceKey schema.MD5Hash |
| for _, traces := range digestToTraces { |
| for _, traceID := range traces { |
| copy(traceKey[:], traceID) |
| byTrace[traceKey] = placeHolder |
| } |
| } |
| // We now have the set of traces we need to look up.
| traceIDs := make([]schema.TraceID, 0, len(byTrace)) |
| for trID := range byTrace { |
| traceIDs = append(traceIDs, sql.FromMD5Hash(trID)) |
| } |
| const statement = `SELECT trace_id, options_id FROM ValuesAtHead |
| WHERE trace_id = ANY($1)` |
| rows, err := s.db.Query(ctx, statement, traceIDs) |
| if err != nil { |
| return nil, skerr.Wrap(err) |
| } |
| defer rows.Close() |
| var traceID schema.TraceID |
| var optionsID schema.OptionsID |
| for rows.Next() { |
| if err := rows.Scan(&traceID, &optionsID); err != nil { |
| return nil, skerr.Wrap(err) |
| } |
| opts, err := s.expandOptionsToParams(ctx, optionsID) |
| if err != nil { |
| return nil, skerr.Wrap(err) |
| } |
| copy(traceKey[:], traceID) |
| byTrace[traceKey] = opts |
| } |
| return byTrace, nil |
| } |
| |
| // expandOptionsToParams returns the params that correspond to a given optionsID. The returned
| // params must not be mutated, because the map is shared with the cache (it is not copied, for
| // performance reasons).
| func (s *Impl) expandOptionsToParams(ctx context.Context, optionsID schema.OptionsID) (paramtools.Params, error) { |
| ctx, span := trace.StartSpan(ctx, "expandOptionsToParams") |
| defer span.End() |
| if keys, ok := s.optionsGroupingCache.Get(string(optionsID)); ok { |
| return keys.(paramtools.Params), nil |
| } |
| // cache miss |
| const statement = `SELECT keys FROM Options WHERE options_id = $1` |
| row := s.db.QueryRow(ctx, statement, optionsID) |
| var keys paramtools.Params |
| if err := row.Scan(&keys); err != nil { |
| return nil, skerr.Wrap(err) |
| } |
| s.optionsGroupingCache.Add(string(optionsID), keys) |
| return keys, nil |
| } |
| |
| // expandTracesIntoParamsets effectively returns a map detailing "who drew a given digest?". This |
| // is done by looking up the keys associated with each trace and combining them. |
| func (s *Impl) expandTracesIntoParamsets(ctx context.Context, toLookUp map[types.Digest][]schema.TraceID, traceToOptions map[schema.MD5Hash]paramtools.Params) (map[types.Digest]paramtools.ParamSet, error) { |
| ctx, span := trace.StartSpan(ctx, "expandTracesIntoParamsets") |
| defer span.End() |
| span.AddAttributes(trace.Int64Attribute("num_trace_sets", int64(len(toLookUp)))) |
| rv := map[types.Digest]paramtools.ParamSet{} |
| for digest, traces := range toLookUp { |
| paramset, err := s.lookupOrLoadParamSetFromCache(ctx, traces) |
| if err != nil { |
| return nil, skerr.Wrap(err) |
| } |
| var traceKey schema.MD5Hash |
| for _, traceID := range traces { |
| copy(traceKey[:], traceID) |
| ps := traceToOptions[traceKey] |
| paramset.AddParams(ps) |
| } |
| paramset.Normalize() |
| rv[digest] = paramset |
| } |
| return rv, nil |
| } |
| |
| // lookupOrLoadParamSetFromCache takes a slice of traces and returns a ParamSet combining all their |
| // keys. It will use the traceCache or query the DB and fill the cache on a cache miss. |
| func (s *Impl) lookupOrLoadParamSetFromCache(ctx context.Context, traces []schema.TraceID) (paramtools.ParamSet, error) { |
| ctx, span := trace.StartSpan(ctx, "lookupOrLoadParamSetFromCache") |
| defer span.End() |
| paramset := paramtools.ParamSet{} |
| var cacheMisses []schema.TraceID |
| for _, traceID := range traces { |
| if keys, ok := s.traceCache.Get(string(traceID)); ok { |
| paramset.AddParams(keys.(paramtools.Params)) |
| } else { |
| cacheMisses = append(cacheMisses, traceID) |
| } |
| } |
| if len(cacheMisses) > 0 { |
| const statement = `SELECT trace_id, keys FROM Traces WHERE trace_id = ANY($1)` |
| rows, err := s.db.Query(ctx, statement, cacheMisses) |
| if err != nil { |
| return nil, skerr.Wrap(err) |
| } |
| defer rows.Close() |
| var traceID schema.TraceID |
| for rows.Next() { |
| var keys paramtools.Params |
| if err := rows.Scan(&traceID, &keys); err != nil { |
| return nil, skerr.Wrap(err) |
| } |
| s.traceCache.Add(string(traceID), keys) |
| paramset.AddParams(keys) |
| } |
| } |
| return paramset, nil |
| } |
| |
| // expandTraceToParams will return a trace's keys from the cache. On a cache miss, it will look
| // up the trace in the DB and add it to the cache.
| func (s *Impl) expandTraceToParams(ctx context.Context, traceID schema.TraceID) (paramtools.Params, error) { |
| ctx, span := trace.StartSpan(ctx, "expandTraceToParams") |
| defer span.End() |
| if keys, ok := s.traceCache.Get(string(traceID)); ok { |
| return keys.(paramtools.Params).Copy(), nil // Return a copy to protect cached value |
| } |
| // cache miss |
| const statement = `SELECT keys FROM Traces WHERE trace_id = $1` |
| row := s.db.QueryRow(ctx, statement, traceID) |
| var keys paramtools.Params |
| if err := row.Scan(&keys); err != nil { |
| if err == pgx.ErrNoRows { |
| return nil, nil |
| } |
| return nil, skerr.Wrap(err) |
| } |
| s.traceCache.Add(string(traceID), keys) |
| return keys.Copy(), nil // Return a copy to protect cached value |
| } |
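| 
| // Note the contrast with expandOptionsToParams above: that helper returns the cached map
| // directly (for performance) and relies on callers not to mutate it, whereas
| // expandTraceToParams returns a copy because callers such as fillInTraceParams add the
| // options into the returned Params.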
| |
| // fillOutTraceHistory returns a slice of SearchResults that are mostly filled in, particularly |
| // including the history of the traces for each result. |
| func (s *Impl) fillOutTraceHistory(ctx context.Context, inputs []digestAndClosestDiffs) ([]*frontend.SearchResult, error) { |
| ctx, span := trace.StartSpan(ctx, "fillOutTraceHistory") |
| span.AddAttributes(trace.Int64Attribute("results", int64(len(inputs)))) |
| defer span.End() |
| // Fill out these histories in parallel. We avoid race conditions by writing to a prescribed |
| // index in the results slice. |
| results := make([]*frontend.SearchResult, len(inputs)) |
| eg, eCtx := errgroup.WithContext(ctx) |
| for i, j := range inputs { |
| idx, input := i, j |
| eg.Go(func() error { |
| sr := &frontend.SearchResult{ |
| Digest: types.Digest(hex.EncodeToString(input.leftDigest)), |
| RefDiffs: map[frontend.RefClosest]*frontend.SRDiffDigest{ |
| frontend.PositiveRef: input.closestPositive, |
| frontend.NegativeRef: input.closestNegative, |
| }, |
| } |
| if input.closestDigest != nil && input.closestDigest.Status == expectations.Positive { |
| sr.ClosestRef = frontend.PositiveRef |
| } else if input.closestDigest != nil && input.closestDigest.Status == expectations.Negative { |
| sr.ClosestRef = frontend.NegativeRef |
| } |
| tg, err := s.traceGroupForTraces(eCtx, input.traceIDs, input.optionsIDs, sr.Digest) |
| if err != nil { |
| return skerr.Wrap(err) |
| } |
| if err := s.fillInExpectations(eCtx, &tg, input.groupingID); err != nil { |
| return skerr.Wrap(err) |
| } |
| th, err := s.getTriageHistory(eCtx, input.groupingID, input.leftDigest) |
| if err != nil { |
| return skerr.Wrap(err) |
| } |
| sr.TriageHistory = th |
| if err := s.fillInTraceParams(eCtx, &tg); err != nil { |
| return skerr.Wrap(err) |
| } |
| sr.TraceGroup = tg |
| if len(tg.Digests) > 0 { |
| // The first digest in the trace group is this digest. |
| sr.Status = tg.Digests[0].Status |
| } else { |
| // We assume untriaged if the digest is not in the window.
| sr.Status = expectations.Untriaged |
| } |
| if len(tg.Traces) > 0 { |
| // Grab the test name from the first trace, since we know all the traces are of |
| // the same grouping, which includes test name. |
| sr.Test = types.TestName(tg.Traces[0].Params[types.PrimaryKeyField]) |
| } |
| leftPS := paramtools.ParamSet{} |
| for _, tr := range tg.Traces { |
| leftPS.AddParams(tr.Params) |
| } |
| leftPS.Normalize() |
| sr.ParamSet = leftPS |
| results[idx] = sr |
| return nil |
| }) |
| } |
| if err := eg.Wait(); err != nil { |
| return nil, skerr.Wrap(err) |
| } |
| return results, nil |
| } |
| |
| type traceDigestCommit struct { |
| commitID schema.CommitID |
| digest types.Digest |
| optionsID schema.OptionsID |
| } |
| |
| // traceGroupForTraces gets all the history for a slice of traces within the given window and |
| // turns it into a format that the frontend can render. If latestOptions is provided, it is assumed |
| // to be a parallel slice to traceIDs - those options will be used as the options for each trace |
| // instead of the ones that were searched. |
| func (s *Impl) traceGroupForTraces(ctx context.Context, traceIDs []schema.TraceID, latestOptions []schema.OptionsID, primary types.Digest) (frontend.TraceGroup, error) { |
| ctx, span := trace.StartSpan(ctx, "traceGroupForTraces") |
| span.AddAttributes(trace.Int64Attribute("num_traces", int64(len(traceIDs)))) |
| defer span.End() |
| |
| traceData := make(map[schema.MD5Hash][]traceDigestCommit, len(traceIDs)) |
| // Make sure there's an entry for each trace. That way, even if the trace is not seen on |
| // the primary branch (e.g. newly added in a CL), we can show something for it. |
| for i := range traceIDs { |
| key := sql.AsMD5Hash(traceIDs[i]) |
| traceData[key] = nil |
| if latestOptions != nil { |
| traceData[key] = append(traceData[key], traceDigestCommit{optionsID: latestOptions[i]}) |
| } |
| } |
| const statement = `SELECT trace_id, commit_id, encode(digest, 'hex'), options_id |
| FROM TraceValues WHERE trace_id = ANY($1) AND commit_id >= $2 |
| ORDER BY trace_id, commit_id` |
| rows, err := s.db.Query(ctx, statement, traceIDs, common.GetFirstCommitID(ctx)) |
| if err != nil { |
| return frontend.TraceGroup{}, skerr.Wrap(err) |
| } |
| defer rows.Close() |
| var key schema.MD5Hash |
| var traceID schema.TraceID |
| for rows.Next() { |
| var row traceDigestCommit |
| if err := rows.Scan(&traceID, &row.commitID, &row.digest, &row.optionsID); err != nil { |
| return frontend.TraceGroup{}, skerr.Wrap(err) |
| } |
| copy(key[:], traceID) |
| traceData[key] = append(traceData[key], row) |
| } |
| return makeTraceGroup(ctx, traceData, primary) |
| } |
| |
| // fillInExpectations looks up all the expectations for the digests included in the given |
| // TraceGroup and updates the passed in TraceGroup directly. |
| func (s *Impl) fillInExpectations(ctx context.Context, tg *frontend.TraceGroup, groupingID schema.GroupingID) error { |
| ctx, span := trace.StartSpan(ctx, "fillInExpectations") |
| defer span.End() |
| digests := make([]interface{}, 0, len(tg.Digests)) |
| for _, digestStatus := range tg.Digests { |
| dBytes, err := sql.DigestToBytes(digestStatus.Digest) |
| if err != nil { |
| sklog.Warningf("invalid digest: %s", digestStatus.Digest) |
| continue |
| } |
| digests = append(digests, dBytes) |
| } |
| arguments := []interface{}{groupingID, digests} |
| statement := ` |
| SELECT encode(digest, 'hex'), label FROM Expectations |
| WHERE grouping_id = $1 and digest = ANY($2)` |
| if qCLID := common.GetQualifiedCL(ctx); qCLID != "" { |
| // We use a full outer join to make sure we have the triage status from both tables |
| // (with the CL expectations winning if triaged in both places) |
| statement = `WITH |
| CLExpectations AS ( |
| SELECT digest, label |
| FROM SecondaryBranchExpectations |
| WHERE branch_name = $3 AND grouping_id = $1 AND digest = ANY($2) |
| ), |
| PrimaryExpectations AS ( |
| SELECT digest, label FROM Expectations |
| WHERE grouping_id = $1 AND digest = ANY($2) |
| ) |
| SELECT encode(COALESCE(CLExpectations.digest, PrimaryExpectations.digest), 'hex'), |
| COALESCE(CLExpectations.label, COALESCE(PrimaryExpectations.label, 'u')) FROM |
| CLExpectations FULL OUTER JOIN PrimaryExpectations ON |
| CLExpectations.digest = PrimaryExpectations.digest` |
| arguments = append(arguments, qCLID) |
| } |
| rows, err := s.db.Query(ctx, statement, arguments...) |
| if err != nil { |
| return skerr.Wrap(err) |
| } |
| defer rows.Close() |
| for rows.Next() { |
| var digest types.Digest |
| var label schema.ExpectationLabel |
| if err := rows.Scan(&digest, &label); err != nil { |
| return skerr.Wrap(err) |
| } |
| for i, ds := range tg.Digests { |
| if ds.Digest == digest { |
| tg.Digests[i].Status = label.ToExpectation() |
| } |
| } |
| } |
| return nil |
| } |
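| 
| // For illustration, the precedence encoded by the COALESCEs above (hypothetical labels):
| //
| //    primary = 'n', CL = 'p'  => 'p' (the CL expectation wins)
| //    primary = 'n', CL = none => 'n'
| //    primary = none, CL = none => 'u' (untriaged)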
| |
| func (s *Impl) getTriageHistory(ctx context.Context, groupingID schema.GroupingID, digest schema.DigestBytes) ([]frontend.TriageHistory, error) { |
| ctx, span := trace.StartSpan(ctx, "getTriageHistory") |
| defer span.End() |
| |
| qCLID := common.GetQualifiedCL(ctx) |
| const statement = `WITH |
| PrimaryRecord AS ( |
| SELECT expectation_record_id FROM Expectations |
| WHERE grouping_id = $1 AND digest = $2 |
| ), |
| SecondaryRecord AS ( |
| SELECT expectation_record_id FROM SecondaryBranchExpectations |
| WHERE grouping_id = $1 AND digest = $2 AND branch_name = $3 |
| ), |
| CombinedRecords AS ( |
| SELECT expectation_record_id FROM PrimaryRecord |
| UNION |
| SELECT expectation_record_id FROM SecondaryRecord |
| ) |
| SELECT user_name, triage_time FROM ExpectationRecords |
| JOIN CombinedRecords ON ExpectationRecords.expectation_record_id = CombinedRecords.expectation_record_id |
| ORDER BY triage_time DESC LIMIT 1` |
| |
| row := s.db.QueryRow(ctx, statement, groupingID, digest, qCLID) |
| var user string |
| var ts time.Time |
| if err := row.Scan(&user, &ts); err != nil { |
| if err == pgx.ErrNoRows { |
| return nil, nil |
| } |
| return nil, skerr.Wrap(err) |
| } |
| return []frontend.TriageHistory{{ |
| User: user, |
| TS: ts.UTC(), |
| }}, nil |
| } |
| |
| // fillInTraceParams looks up the keys (params) for each trace and fills them in on the passed in |
| // TraceGroup. |
| func (s *Impl) fillInTraceParams(ctx context.Context, tg *frontend.TraceGroup) error { |
| ctx, span := trace.StartSpan(ctx, "fillInTraceParams") |
| defer span.End() |
| for i, tr := range tg.Traces { |
| traceID, err := hex.DecodeString(string(tr.ID)) |
| if err != nil { |
| return skerr.Wrapf(err, "invalid trace id %q", tr.ID) |
| } |
| ps, err := s.expandTraceToParams(ctx, traceID) |
| if err != nil { |
| return skerr.Wrap(err) |
| } |
| if tr.RawTrace.OptionsID != nil { |
| opts, err := s.expandOptionsToParams(ctx, tr.RawTrace.OptionsID) |
| if err != nil { |
| return skerr.Wrap(err) |
| } |
| ps.Add(opts) |
| } |
| tg.Traces[i].Params = ps |
| tg.Traces[i].RawTrace = nil // Done with this temporary data. |
| } |
| return nil |
| } |
| |
| // makeGroupingAndDigestWhereClause builds the part of a "WHERE" clause that filters by grouping ID |
| // and digest. It returns the SQL clause and a list of parameter values. |
| func makeGroupingAndDigestWhereClause(triageDeltaInfos []extendedBulkTriageDeltaInfo, startingPlaceholderNum int) (string, []interface{}) { |
| var parts []string |
| args := make([]interface{}, 0, 2*len(triageDeltaInfos)) |
| placeholderNum := startingPlaceholderNum |
| for _, bulkTriageDeltaInfo := range triageDeltaInfos { |
| parts = append(parts, fmt.Sprintf("(grouping_id = $%d AND digest = $%d)", placeholderNum, placeholderNum+1)) |
| args = append(args, bulkTriageDeltaInfo.groupingID, bulkTriageDeltaInfo.digest) |
| placeholderNum += 2 |
| } |
| sort.Strings(parts) // Make the query string deterministic for easier debugging. |
| return strings.Join(parts, " OR "), args |
| } |
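| 
| // For illustration (hypothetical inputs): with two triageDeltaInfos and
| // startingPlaceholderNum = 3, the returned clause is
| //
| //    (grouping_id = $3 AND digest = $4) OR (grouping_id = $5 AND digest = $6)
| //
| // and args holds the four grouping/digest values in placeholder order.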
| |
| // findPrimaryBranchLabels returns the primary branch labels for the digests corresponding to the |
| // passed in extendedBulkTriageDeltaInfo structs. |
| func (s *Impl) findPrimaryBranchLabels(ctx context.Context, triageDeltaInfos []extendedBulkTriageDeltaInfo) (map[groupingDigestKey]schema.ExpectationLabel, error) { |
| labels := map[groupingDigestKey]schema.ExpectationLabel{} |
| if len(triageDeltaInfos) == 0 { |
| return labels, nil |
| } |
| whereClause, whereArgs := makeGroupingAndDigestWhereClause(triageDeltaInfos, 1) |
| statement := "SELECT grouping_id, digest, label FROM Expectations WHERE " + whereClause |
| rows, err := s.db.Query(ctx, statement, whereArgs...) |
| if err != nil { |
| sklog.Warningf("Error for triage delta infos %+v", triageDeltaInfos) |
| return nil, skerr.Wrapf(err, "with statement %s", statement) |
| } |
| defer rows.Close() |
| for rows.Next() { |
| var groupingID schema.GroupingID |
| var digest schema.DigestBytes |
| var label schema.ExpectationLabel |
| if err := rows.Scan(&groupingID, &digest, &label); err != nil { |
| return nil, skerr.Wrap(err) |
| } |
| labels[groupingDigestKey{ |
| groupingID: sql.AsMD5Hash(groupingID), |
| digest: sql.AsMD5Hash(digest), |
| }] = label |
| } |
| return labels, nil |
| } |
| |
| // findSecondaryBranchLabels returns the secondary branch labels for the digests corresponding
| // to the passed in extendedBulkTriageDeltaInfo structs.
| func (s *Impl) findSecondaryBranchLabels(ctx context.Context, triageDeltaInfos []extendedBulkTriageDeltaInfo) (map[groupingDigestKey]schema.ExpectationLabel, error) { |
| labels := map[groupingDigestKey]schema.ExpectationLabel{} |
| if len(triageDeltaInfos) == 0 { |
| return labels, nil |
| } |
| whereClause, whereArgs := makeGroupingAndDigestWhereClause(triageDeltaInfos, 2) |
| statement := ` |
| SELECT grouping_id, |
| digest, |
| label |
| FROM SecondaryBranchExpectations |
| WHERE branch_name = $1 AND (` + whereClause + ")" |
| rows, err := s.db.Query(ctx, statement, append([]interface{}{common.GetQualifiedCL(ctx)}, whereArgs...)...) |
| if err != nil { |
| return nil, skerr.Wrap(err) |
| } |
| defer rows.Close() |
| for rows.Next() { |
| var groupingID schema.GroupingID |
| var digest schema.DigestBytes |
| var label schema.ExpectationLabel |
| if err := rows.Scan(&groupingID, &digest, &label); err != nil { |
| return nil, skerr.Wrap(err) |
| } |
| labels[groupingDigestKey{ |
| groupingID: sql.AsMD5Hash(groupingID), |
| digest: sql.AsMD5Hash(digest), |
| }] = label |
| } |
| return labels, nil |
| } |
| |
| // populateLabelBefore populates the LabelBefore field of each passed in |
| // extendedBulkTriageDeltaInfo with expectations for the primary branch. |
| // |
| // It mirrors the verifyPrimaryBranchLabelBefore function in web.go |
| // (https://skia.googlesource.com/buildbot/+/refs/heads/main/golden/go/web/web.go#1178). |
| func (s *Impl) populateLabelBefore(ctx context.Context, triageDeltaInfos []extendedBulkTriageDeltaInfo) error { |
| // Gather the relevant labels from the Expectations table. |
| primaryBranchLabels, err := s.findPrimaryBranchLabels(ctx, triageDeltaInfos) |
| if err != nil { |
| return skerr.Wrap(err) |
| } |
| |
| // Place extendedBulkTriageDeltaInfos in a map for faster querying. |
| byKey := map[groupingDigestKey]*extendedBulkTriageDeltaInfo{} |
| for i := range triageDeltaInfos { |
| byKey[groupingDigestKey{ |
| groupingID: sql.AsMD5Hash(triageDeltaInfos[i].groupingID), |
| digest: sql.AsMD5Hash(triageDeltaInfos[i].digest), |
| }] = &triageDeltaInfos[i] |
| } |
| |
| for key, triageDeltaInfo := range byKey { |
| label, ok := primaryBranchLabels[key] |
| if !ok { |
| label = schema.LabelUntriaged |
| } |
| triageDeltaInfo.LabelBefore = label.ToExpectation() |
| } |
| |
| return nil |
| } |
| |
| // populateLabelBeforeForCL populates the LabelBefore field of each passed in |
| // extendedBulkTriageDeltaInfo with expectations for a CL. |
| // |
| // It mirrors the verifySecondaryBranchLabelBefore function in web.go |
| // (https://skia.googlesource.com/buildbot/+/refs/heads/main/golden/go/web/web.go#1231). |
| func (s *Impl) populateLabelBeforeForCL(ctx context.Context, triageDeltaInfos []extendedBulkTriageDeltaInfo) error { |
| // Gather the relevant labels from the Expectations table. |
| primaryBranchLabels, err := s.findPrimaryBranchLabels(ctx, triageDeltaInfos) |
| if err != nil { |
| return skerr.Wrap(err) |
| } |
| |
| // Gather the relevant labels from the SecondaryBranchExpectations table. |
| secondaryBranchLabels, err := s.findSecondaryBranchLabels(ctx, triageDeltaInfos) |
| if err != nil { |
| return skerr.Wrap(err) |
| } |
| |
| // Place extendedBulkTriageDeltaInfos in a map for faster querying. |
| byKey := map[groupingDigestKey]*extendedBulkTriageDeltaInfo{} |
| for i := range triageDeltaInfos { |
| byKey[groupingDigestKey{ |
| groupingID: sql.AsMD5Hash(triageDeltaInfos[i].groupingID), |
| digest: sql.AsMD5Hash(triageDeltaInfos[i].digest), |
| }] = &triageDeltaInfos[i] |
| } |
| |
| for key, triageDeltaInfo := range byKey { |
| label, ok := secondaryBranchLabels[key] |
| if !ok { |
| label, ok = primaryBranchLabels[key] |
| } |
| if !ok { |
| label = schema.LabelUntriaged |
| } |
| triageDeltaInfo.LabelBefore = label.ToExpectation() |
| } |
| |
| return nil |
| } |
| |
| type traceDigestKey struct { |
| traceID schema.MD5Hash |
| digest schema.MD5Hash |
| } |
| |
| // populateExtendedBulkTriageDeltaInfosOptionsIDs populates the optionsIDs field of the given |
| // extendedBulkTriageDeltaInfos. |
| func (s *Impl) populateExtendedBulkTriageDeltaInfosOptionsIDs(ctx context.Context, triageDeltaInfos []extendedBulkTriageDeltaInfo) error { |
| // Map triageDeltaInfos by trace ID and digest for faster querying, and gather all trace IDs. |
| var allTraceIDs []schema.TraceID |
| triageDeltaInfosByTraceAndDigest := map[traceDigestKey]*extendedBulkTriageDeltaInfo{} |
| for i := range triageDeltaInfos { |
| allTraceIDs = append(allTraceIDs, triageDeltaInfos[i].traceIDs...) |
| for _, traceID := range triageDeltaInfos[i].traceIDs { |
| key := traceDigestKey{ |
| traceID: sql.AsMD5Hash(traceID), |
| digest: sql.AsMD5Hash(triageDeltaInfos[i].digest), |
| } |
| triageDeltaInfosByTraceAndDigest[key] = &triageDeltaInfos[i] |
| } |
| } |
| |
| const statement = `SELECT trace_id, digest, options_id FROM TraceValues |
| WHERE trace_id = ANY($1) AND commit_id >= $2` |
| rows, err := s.db.Query(ctx, statement, allTraceIDs, common.GetFirstCommitID(ctx)) |
| if err != nil { |
| return skerr.Wrap(err) |
| } |
| defer rows.Close() |
| |
| var traceID schema.TraceID |
| var digest schema.DigestBytes |
| var optionsID schema.OptionsID |
| for rows.Next() { |
| if err := rows.Scan(&traceID, &digest, &optionsID); err != nil { |
| return skerr.Wrap(err) |
| } |
| key := traceDigestKey{ |
| traceID: sql.AsMD5Hash(traceID), |
| digest: sql.AsMD5Hash(digest), |
| } |
| if triageDeltaInfo, ok := triageDeltaInfosByTraceAndDigest[key]; ok { |
| triageDeltaInfo.optionsIDs = append(triageDeltaInfo.optionsIDs, optionsID) |
| } |
| } |
| return nil |
| } |
| |
| // prepareExtendedBulkTriageDeltaInfosForFrontend turns extendedBulkTriageDeltaInfo structs into |
| // frontend.BulkTriageDeltaInfo structs, and filters out those with disallow_triaging=true. |
| func (s *Impl) prepareExtendedBulkTriageDeltaInfosForFrontend(ctx context.Context, extendedBulkTriageDeltaInfos []extendedBulkTriageDeltaInfo) ([]frontend.BulkTriageDeltaInfo, error) { |
| // The frontend expects a non-null array. |
| bulkTriageDeltaInfos := []frontend.BulkTriageDeltaInfo{} |
| for _, triageDeltaInfo := range extendedBulkTriageDeltaInfos { |
| disallowTriaging := false |
| for _, optionsID := range triageDeltaInfo.optionsIDs { |
| options, err := s.expandOptionsToParams(ctx, optionsID) |
| if err != nil { |
| return nil, skerr.Wrap(err) |
| } |
| if options["disallow_triaging"] == "true" { |
| disallowTriaging = true |
| break |
| } |
| } |
| if !disallowTriaging { |
| bulkTriageDeltaInfos = append(bulkTriageDeltaInfos, triageDeltaInfo.BulkTriageDeltaInfo) |
| } |
| } |
| return bulkTriageDeltaInfos, nil |
| } |
| |
| // expandGrouping returns the params associated with the grouping id. It will use the cache; on
| // a cache miss, it will look the params up in the DB, add them to the cache, and return them.
| func (s *Impl) expandGrouping(ctx context.Context, groupingID schema.MD5Hash) (paramtools.Params, error) { |
| var groupingKeys paramtools.Params |
| if gk, ok := s.optionsGroupingCache.Get(groupingID); ok { |
| return gk.(paramtools.Params), nil |
| } else { |
| const statement = `SELECT keys FROM Groupings WHERE grouping_id = $1` |
| row := s.db.QueryRow(ctx, statement, groupingID[:]) |
| if err := row.Scan(&groupingKeys); err != nil { |
| return nil, skerr.Wrap(err) |
| } |
| s.optionsGroupingCache.Add(groupingID, groupingKeys) |
| } |
| return groupingKeys, nil |
| } |
| |
| // getCommits returns the front-end friendly version of the commits within the searched window. |
| func (s *Impl) getCommits(ctx context.Context) ([]frontend.Commit, error) { |
| ctx, span := trace.StartSpan(ctx, "getCommits") |
| defer span.End() |
| rv := make([]frontend.Commit, common.GetActualWindowLength(ctx)) |
| commitIDs := common.GetCommitToIdxMap(ctx) |
| for commitID, idx := range commitIDs { |
| var commit frontend.Commit |
| if c, ok := s.commitCache.Get(commitID); ok { |
| commit = c.(frontend.Commit) |
| } else { |
| if isStandardGitCommitID(commitID) { |
| const statement = `SELECT git_hash, commit_time, author_email, subject |
| FROM GitCommits WHERE commit_id = $1` |
| row := s.db.QueryRow(ctx, statement, commitID) |
| var dbRow schema.GitCommitRow |
| if err := row.Scan(&dbRow.GitHash, &dbRow.CommitTime, &dbRow.AuthorEmail, &dbRow.Subject); err != nil { |
| return nil, skerr.Wrap(err) |
| } |
| commit = frontend.Commit{ |
| CommitTime: dbRow.CommitTime.UTC().Unix(), |
| ID: string(commitID), |
| Hash: dbRow.GitHash, |
| Author: dbRow.AuthorEmail, |
| Subject: dbRow.Subject, |
| } |
| s.commitCache.Add(commitID, commit) |
| } else { |
| commit = frontend.Commit{ |
| ID: string(commitID), |
| } |
| s.commitCache.Add(commitID, commit) |
| } |
| } |
| rv[idx] = commit |
| } |
| return rv, nil |
| } |
| |
| // isStandardGitCommitID detects our standard commit ids for git repos (monotonically increasing
| // integers). It returns false for instances that use a different commit id scheme.
| func isStandardGitCommitID(id schema.CommitID) bool { |
| _, err := strconv.ParseInt(string(id), 10, 64) |
| return err == nil |
| } |
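| 
| // For example, isStandardGitCommitID("0000001234") returns true, while an id such as
| // "R99-1234.5.6" (a hypothetical non-git scheme) returns false.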
| |
| // searchCLData returns the search response for the given CL's data (or an error if no such data
| // exists). It reuses much of the same pipeline structure as the normal search, with a few key
| // differences. It appends the CL's data to the end of all traces, pretending as if the CL were
| // to land and the new data would be drawn at ToT (this can be confusing for CLs which already
| // landed).
| func (s *Impl) searchCLData(ctx context.Context) (*frontend.SearchResponse, error) { |
| ctx, span := trace.StartSpan(ctx, "searchCLData") |
| defer span.End() |
| var err error |
| ctx, err = s.addCLData(ctx) |
| if err != nil { |
| return nil, skerr.Wrap(err) |
| } |
| |
| commits, err := s.getCommits(ctx) |
| if err != nil { |
| return nil, skerr.Wrap(err) |
| } |
| if commits, err = s.addCLCommit(ctx, commits); err != nil { |
| return nil, skerr.Wrap(err) |
| } |
| |
| // Find all digests and traces that match the given search criteria. |
| // This will be filtered according to the publiclyAllowedParams as well. |
| traceDigests, err := s.getMatchingDigestsAndTracesForCL(ctx) |
| if err != nil { |
| return nil, skerr.Wrap(err) |
| } |
| // Lookup the closest diffs on the primary branch to the given digests. This returns a subset |
| // according to the limit and offset in the query. |
| // TODO(kjlubick) perhaps we want to include the digests produced by this CL/PS as well? |
| closestDiffs, extendedBulkTriageDeltaInfos, err := s.getClosestDiffs(ctx, traceDigests) |
| if err != nil { |
| return nil, skerr.Wrap(err) |
| } |
| // Go fetch history and paramset (within this grouping, and respecting publiclyAllowedParams). |
| paramsetsByDigest, err := s.getParamsetsForRightSide(ctx, closestDiffs) |
| if err != nil { |
| return nil, skerr.Wrap(err) |
| } |
| // Flesh out the trace history with enough data to draw the dots diagram on the frontend. |
| results, err := s.fillOutTraceHistory(ctx, closestDiffs) |
| if err != nil { |
| return nil, skerr.Wrap(err) |
| } |
| // Populate the LabelBefore fields of the extendedBulkTriageDeltaInfos with expectations from |
| // the CL. |
| if err := s.populateLabelBeforeForCL(ctx, extendedBulkTriageDeltaInfos); err != nil { |
| return nil, skerr.Wrap(err) |
| } |
| |
| // Fill in the paramsets of the reference images. |
| for _, sr := range results { |
| for _, srdd := range sr.RefDiffs { |
| if srdd != nil { |
| srdd.ParamSet = paramsetsByDigest[srdd.Digest] |
| } |
| } |
| } |
| |
| bulkTriageDeltaInfos, err := s.prepareExtendedBulkTriageDeltaInfosForFrontend(ctx, extendedBulkTriageDeltaInfos) |
| if err != nil { |
| return nil, skerr.Wrap(err) |
| } |
| |
| return &frontend.SearchResponse{ |
| Results: results, |
| Offset: common.GetQuery(ctx).Offset, |
| Size: len(extendedBulkTriageDeltaInfos), |
| BulkTriageDeltaInfos: bulkTriageDeltaInfos, |
| Commits: commits, |
| }, nil |
| } |
| |
| // addCLData returns a context with some CL-specific data added as values. If the data cannot be
| // verified, an error is returned.
| func (s *Impl) addCLData(ctx context.Context) (context.Context, error) { |
| ctx, span := trace.StartSpan(ctx, "addCLData") |
| defer span.End() |
| q := common.GetQuery(ctx) |
| qCLID := sql.Qualify(q.CodeReviewSystemID, q.ChangelistID) |
| const statement = `SELECT patchset_id FROM Patchsets WHERE |
| changelist_id = $1 AND system = $2 AND ps_order = $3` |
| row := s.db.QueryRow(ctx, statement, qCLID, q.CodeReviewSystemID, q.Patchsets[0]) |
| var qPSID string |
| if err := row.Scan(&qPSID); err != nil { |
| if err == pgx.ErrNoRows { |
| return nil, skerr.Fmt("CL %q has no PS with order %d: %#v", qCLID, q.Patchsets[0], q) |
| } |
| return nil, skerr.Wrap(err) |
| } |
| ctx = context.WithValue(ctx, common.QualifiedCLIDKey, qCLID) |
| ctx = context.WithValue(ctx, common.QualifiedPSIDKey, qPSID) |
| return ctx, nil |
| } |
| |
| // addCLCommit adds a fake commit to the end of the trace data to represent the data for this CL. |
| func (s *Impl) addCLCommit(ctx context.Context, commits []frontend.Commit) ([]frontend.Commit, error) { |
| ctx, span := trace.StartSpan(ctx, "addCLCommit") |
| defer span.End() |
| q := common.GetQuery(ctx) |
| |
| urlTemplate, ok := s.reviewSystemMapping[q.CodeReviewSystemID] |
| if !ok { |
| return nil, skerr.Fmt("unknown CRS %s", q.CodeReviewSystemID) |
| } |
| |
| qCLID := sql.Qualify(q.CodeReviewSystemID, q.ChangelistID) |
| const statement = `SELECT owner_email, subject, last_ingested_data FROM Changelists |
| WHERE changelist_id = $1` |
| row := s.db.QueryRow(ctx, statement, qCLID) |
| var cl schema.ChangelistRow |
| if err := row.Scan(&cl.OwnerEmail, &cl.Subject, &cl.LastIngestedData); err != nil { |
| return nil, skerr.Wrap(err) |
| } |
| return append(commits, frontend.Commit{ |
| CommitTime: cl.LastIngestedData.UTC().Unix(), |
| Hash: q.ChangelistID, |
| Author: cl.OwnerEmail, |
| Subject: cl.Subject, |
| ChangelistURL: fmt.Sprintf(urlTemplate, q.ChangelistID), |
| }), nil |
| } |
| |
| // getMatchingDigestsAndTracesForCL returns all data produced at the specified Changelist and
| // Patchset that matches the provided query. One key difference from searching on the primary
| // branch is that the "AtHead" option is meaningless. Another is that we can filter out data
| // already seen on the primary branch; we keep that set of digests in memory (digestsOnPrimary)
| // because checking it there is much faster than querying the DB live (even with good indexing).
| func (s *Impl) getMatchingDigestsAndTracesForCL(ctx context.Context) ([]digestWithTraceAndGrouping, error) { |
| ctx, span := trace.StartSpan(ctx, "getMatchingDigestsAndTracesForCL") |
| defer span.End() |
| q := common.GetQuery(ctx) |
| statement := `WITH |
| CLDigests AS ( |
| SELECT secondary_branch_trace_id, digest, grouping_id, options_id |
| FROM SecondaryBranchValues |
| WHERE branch_name = $1 and version_name = $2 |
| ),` |
| mt, err := matchingCLTracesStatement(q.TraceValues, q.IncludeIgnoredTraces) |
| if err != nil { |
| return nil, skerr.Wrap(err) |
| } |
| statement += mt |
| statement += ` |
| MatchingCLDigests AS ( |
| SELECT trace_id, digest, grouping_id, options_id |
| FROM CLDigests JOIN MatchingTraces |
| ON CLDigests.secondary_branch_trace_id = MatchingTraces.trace_id |
| ), |
| CLExpectations AS ( |
| SELECT grouping_id, digest, label |
| FROM SecondaryBranchExpectations |
| WHERE branch_name = $1 |
| ) |
| SELECT trace_id, MatchingCLDigests.grouping_id, MatchingCLDigests.digest, options_id |
| FROM MatchingCLDigests |
| LEFT JOIN Expectations |
| ON MatchingCLDigests.grouping_id = Expectations.grouping_id AND |
| MatchingCLDigests.digest = Expectations.digest |
| LEFT JOIN CLExpectations |
| ON MatchingCLDigests.grouping_id = CLExpectations.grouping_id AND |
| MatchingCLDigests.digest = CLExpectations.digest |
| WHERE COALESCE(CLExpectations.label, COALESCE(Expectations.label, 'u')) = ANY($3) |
| ` |
| |
| var triageStatuses []schema.ExpectationLabel |
| if q.IncludeUntriagedDigests { |
| triageStatuses = append(triageStatuses, schema.LabelUntriaged) |
| } |
| if q.IncludeNegativeDigests { |
| triageStatuses = append(triageStatuses, schema.LabelNegative) |
| } |
| if q.IncludePositiveDigests { |
| triageStatuses = append(triageStatuses, schema.LabelPositive) |
| } |
| |
| rows, err := s.db.Query(ctx, statement, common.GetQualifiedCL(ctx), common.GetQualifiedPS(ctx), triageStatuses) |
| if err != nil { |
| return nil, skerr.Wrap(err) |
| } |
| defer rows.Close() |
| var rv []digestWithTraceAndGrouping |
| s.mutex.RLock() |
| defer s.mutex.RUnlock() |
| var traceKey schema.MD5Hash |
| var key groupingDigestKey |
| keyGrouping := key.groupingID[:] |
| keyDigest := key.digest[:] |
| for rows.Next() { |
| var row digestWithTraceAndGrouping |
| if err := rows.Scan(&row.traceID, &row.groupingID, &row.digest, &row.optionsID); err != nil { |
| return nil, skerr.Wrap(err) |
| } |
| if s.publiclyVisibleTraces != nil { |
| copy(traceKey[:], row.traceID) |
| if _, ok := s.publiclyVisibleTraces[traceKey]; !ok { |
| continue |
| } |
| } |
| if !q.IncludeDigestsProducedOnMaster { |
| copy(keyGrouping, row.groupingID) |
| copy(keyDigest, row.digest) |
| if _, existsOnPrimary := s.digestsOnPrimary[key]; existsOnPrimary { |
| continue |
| } |
| } |
| rv = append(rv, row) |
| } |
| return rv, nil |
| } |
| |
| // matchingCLTracesStatement returns a set of CTEs, ending with MatchingTraces, that selects the
| // traces matching the given paramset within the paramset's corpus, optionally including traces
| // that match an ignore rule. It returns an error if the paramset lacks a corpus or contains
| // characters that we cannot safely turn into a SQL query.
| func matchingCLTracesStatement(ps paramtools.ParamSet, includeIgnored bool) (string, error) { |
| corpora := ps[types.CorpusField] |
| if len(corpora) == 0 { |
| return "", skerr.Fmt("Corpus must be specified: %v", ps) |
| } |
| corpus := corpora[0] |
| if corpus != sql.Sanitize(corpus) { |
| return "", skerr.Fmt("Invalid corpus: %q", corpus) |
| } |
| corpusStatement := `SELECT trace_id FROM Traces WHERE corpus = '` + corpus + `' AND (matches_any_ignore_rule ` |
| if includeIgnored { |
| corpusStatement += "IS NOT NULL)" |
| } else { |
| corpusStatement += "= FALSE OR matches_any_ignore_rule is NULL)" |
| } |
| if len(ps) == 1 { |
| return "MatchingTraces AS (\n\t" + corpusStatement + "\n),", nil |
| } |
| statement := "" |
| unionIndex := 0 |
| keys := make([]string, 0, len(ps)) |
| for key := range ps { |
| keys = append(keys, key) |
| } |
| sort.Strings(keys) // sort for determinism |
| for _, key := range keys { |
| if key == types.CorpusField { |
| continue |
| } |
| if key != sql.Sanitize(key) { |
| return "", skerr.Fmt("Invalid query key %q", key) |
| } |
| statement += fmt.Sprintf("U%d AS (\n", unionIndex) |
| for j, value := range ps[key] { |
| if value != sql.Sanitize(value) { |
| return "", skerr.Fmt("Invalid query value %q", value) |
| } |
| if j != 0 { |
| statement += "\tUNION\n" |
| } |
| statement += fmt.Sprintf("\tSELECT trace_id FROM Traces WHERE keys -> '%s' = '%q'\n", key, value) |
| } |
| statement += "),\n" |
| unionIndex++ |
| } |
| statement += "MatchingTraces AS (\n" |
| for i := 0; i < unionIndex; i++ { |
| statement += fmt.Sprintf("\tSELECT trace_id FROM U%d\n\tINTERSECT\n", i) |
| } |
| // Include a final intersect for the corpus |
| statement += "\t" + corpusStatement + "\n),\n" |
| return statement, nil |
| } |
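| 
| // For illustration only (hypothetical corpus and keys): given includeIgnored = false and the
| // ParamSet {"source_type": ["gm"], "os": ["Android"]}, the returned CTEs have this shape
| // (note the trailing comma, since they are spliced into a larger WITH statement):
| //
| //    U0 AS (
| //        SELECT trace_id FROM Traces WHERE keys -> 'os' = '"Android"'
| //    ),
| //    MatchingTraces AS (
| //        SELECT trace_id FROM U0
| //        INTERSECT
| //        SELECT trace_id FROM Traces WHERE corpus = 'gm' AND (matches_any_ignore_rule = FALSE OR matches_any_ignore_rule is NULL)
| //    ),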
| |
| // makeTraceGroup converts all the trace+digest+commit triples into a TraceGroup. On the frontend, |
| // we only show the top 9 digests before fading them to grey - this handles that logic. |
| // It is assumed that the slices in the data map are in ascending order of commits. |
| func makeTraceGroup(ctx context.Context, data map[schema.MD5Hash][]traceDigestCommit, primary types.Digest) (frontend.TraceGroup, error) { |
| ctx, span := trace.StartSpan(ctx, "makeTraceGroup") |
| defer span.End() |
| isCL := common.GetQualifiedCL(ctx) != "" |
| tg := frontend.TraceGroup{} |
| if len(data) == 0 { |
| return tg, nil |
| } |
| traceLength := common.GetActualWindowLength(ctx) |
| if isCL { |
| traceLength++ // We will append the current data to the end. |
| } |
| indexMap := common.GetCommitToIdxMap(ctx) |
| for trID, points := range data { |
| currentTrace := frontend.Trace{ |
| ID: tiling.TraceID(hex.EncodeToString(trID[:])), |
| DigestIndices: emptyIndices(traceLength), |
| RawTrace: tiling.NewEmptyTrace(traceLength, nil, nil), |
| } |
| for _, dp := range points { |
| if dp.optionsID != nil { |
| // We want to report the latest options, so always update this if non-nil. |
| currentTrace.RawTrace.OptionsID = dp.optionsID |
| } |
| idx, ok := indexMap[dp.commitID] |
| if !ok { |
| continue |
| } |
| currentTrace.RawTrace.Digests[idx] = dp.digest |
| } |
| tg.Traces = append(tg.Traces, currentTrace) |
| } |
| // Sort traces by ID for determinism |
| sort.Slice(tg.Traces, func(i, j int) bool { |
| return tg.Traces[i].ID < tg.Traces[j].ID |
| }) |
| |
| // Find the most recent / important digests and assign them an index. Everything else will |
| // be given the sentinel value. |
| digestIndices, totalDigests := computeDigestIndices(&tg, primary) |
| tg.TotalDigests = totalDigests |
| |
| tg.Digests = make([]frontend.DigestStatus, len(digestIndices)) |
| for digest, idx := range digestIndices { |
| tg.Digests[idx] = frontend.DigestStatus{ |
| Digest: digest, |
| Status: expectations.Untriaged, |
| } |
| } |
| |
| for _, tr := range tg.Traces { |
| for j, digest := range tr.RawTrace.Digests { |
| if j == len(tr.RawTrace.Digests)-1 && isCL { |
| // Put the CL Data (the primary digest here, aka index 0) as happening most |
| // recently at head. |
| tr.DigestIndices[j] = 0 |
| continue |
| } |
| if digest == tiling.MissingDigest { |
| continue // There is already the missing index there. |
| } |
| idx, ok := digestIndices[digest] |
| if ok { |
| tr.DigestIndices[j] = idx |
| } else { |
| // Fold everything else into the last digest index (grey on the frontend). |
| tr.DigestIndices[j] = maxDistinctDigestsToPresent - 1 |
| } |
| } |
| } |
| return tg, nil |
| } |
| |
| // emptyIndices returns an array of the given length with placeholder values for "missing data". |
| func emptyIndices(length int) []int { |
| rv := make([]int, length) |
| for i := range rv { |
| rv[i] = missingDigestIndex |
| } |
| return rv |
| } |
| |
| // GetPrimaryBranchParamset returns a possibly cached ParamSet of all visible traces over the last |
| // N tiles that correspond to the windowLength. |
| func (s *Impl) GetPrimaryBranchParamset(ctx context.Context) (paramtools.ReadOnlyParamSet, error) { |
| ctx, span := trace.StartSpan(ctx, "search2_GetPrimaryBranchParamset") |
| defer span.End() |
| const primaryBranchKey = "primary_branch" |
| if val, ok := s.paramsetCache.Get(primaryBranchKey); ok { |
| return val.(paramtools.ReadOnlyParamSet), nil |
| } |
| ctx, err := s.addCommitsData(ctx) |
| if err != nil { |
| return nil, skerr.Wrap(err) |
| } |
| if s.isPublicView { |
| roPS, err := s.getPublicParamsetForPrimaryBranch(ctx) |
| if err != nil { |
| return nil, skerr.Wrap(err) |
| } |
| s.paramsetCache.Set(primaryBranchKey, roPS, 5*time.Minute) |
| return roPS, nil |
| } |
| |
| const statement = `SELECT DISTINCT key, value FROM PrimaryBranchParams |
| WHERE tile_id >= $1` |
| rows, err := s.db.Query(ctx, statement, common.GetFirstTileID(ctx)) |
| if err != nil { |
| return nil, skerr.Wrap(err) |
| } |
| defer rows.Close() |
| ps := paramtools.ParamSet{} |
| var key string |
| var value string |
| |
| for rows.Next() { |
| if err := rows.Scan(&key, &value); err != nil { |
| return nil, skerr.Wrap(err) |
| } |
| ps[key] = append(ps[key], value) // We rely on the SQL query to deduplicate values |
| } |
| ps.Normalize() |
| roPS := paramtools.ReadOnlyParamSet(ps) |
| s.paramsetCache.Set(primaryBranchKey, roPS, 5*time.Minute) |
| return roPS, nil |
| } |
| |
| func (s *Impl) getPublicParamsetForPrimaryBranch(ctx context.Context) (paramtools.ReadOnlyParamSet, error) { |
| ctx, span := trace.StartSpan(ctx, "getPublicParamsetForPrimaryBranch") |
| defer span.End() |
| |
| const statement = `SELECT trace_id, keys FROM ValuesAtHead WHERE most_recent_commit_id >= $1` |
| rows, err := s.db.Query(ctx, statement, common.GetFirstCommitID(ctx)) |
| if err != nil { |
| return nil, skerr.Wrap(err) |
| } |
| defer rows.Close() |
| combinedParamset := paramtools.ParamSet{} |
| var ps paramtools.Params |
| var traceID schema.TraceID |
| var traceKey schema.MD5Hash |
| |
| s.mutex.RLock() |
| for rows.Next() { |
| if err := rows.Scan(&traceID, &ps); err != nil { |
| s.mutex.RUnlock() |
| return nil, skerr.Wrap(err) |
| } |
| copy(traceKey[:], traceID) |
| if _, ok := s.publiclyVisibleTraces[traceKey]; ok { |
| combinedParamset.AddParams(ps) |
| } |
| } |
| s.mutex.RUnlock() |
| combinedParamset.Normalize() |
| return paramtools.ReadOnlyParamSet(combinedParamset), nil |
| } |
| |
| // GetChangelistParamset returns a possibly cached ParamSet of all visible traces seen in the |
| // given changelist. It returns an error if no data has been seen for the given CL. |
| func (s *Impl) GetChangelistParamset(ctx context.Context, crs, clID string) (paramtools.ReadOnlyParamSet, error) { |
| ctx, span := trace.StartSpan(ctx, "search2_GetChangelistParamset") |
| defer span.End() |
| qCLID := sql.Qualify(crs, clID) |
| if val, ok := s.paramsetCache.Get(qCLID); ok { |
| return val.(paramtools.ReadOnlyParamSet), nil |
| } |
| if s.isPublicView { |
| roPS, err := s.getPublicParamsetForCL(ctx, qCLID) |
| if err != nil { |
| return nil, skerr.Wrap(err) |
| } |
| s.paramsetCache.Set(qCLID, roPS, 5*time.Minute) |
| return roPS, nil |
| } |
| const statement = `SELECT DISTINCT key, value FROM SecondaryBranchParams |
| WHERE branch_name = $1` |
| rows, err := s.db.Query(ctx, statement, qCLID) |
| if err != nil { |
| return nil, skerr.Wrap(err) |
| } |
| defer rows.Close() |
| ps := paramtools.ParamSet{} |
| var key string |
| var value string |
| |
| for rows.Next() { |
| if err := rows.Scan(&key, &value); err != nil { |
| return nil, skerr.Wrap(err) |
| } |
| ps[key] = append(ps[key], value) // We rely on the SQL query to deduplicate values |
| } |
| if len(ps) == 0 { |
| return nil, skerr.Fmt("Could not find params for CL %q in system %q", clID, crs) |
| } |
| ps.Normalize() |
| roPS := paramtools.ReadOnlyParamSet(ps) |
| s.paramsetCache.Set(qCLID, roPS, 5*time.Minute) |
| return roPS, nil |
| } |
| |
| func (s *Impl) getPublicParamsetForCL(ctx context.Context, qualifiedCLID string) (paramtools.ReadOnlyParamSet, error) { |
| ctx, span := trace.StartSpan(ctx, "getPublicParamsetForCL") |
| defer span.End() |
| |
| const statement = `SELECT secondary_branch_trace_id FROM SecondaryBranchValues |
| WHERE branch_name = $1 |
| ` |
| rows, err := s.db.Query(ctx, statement, qualifiedCLID) |
| if err != nil { |
| return nil, skerr.Wrap(err) |
| } |
| defer rows.Close() |
| var traceID schema.TraceID |
| var traceKey schema.MD5Hash |
| var traceIDsToExpand []schema.TraceID |
| s.mutex.RLock() |
| for rows.Next() { |
| if err := rows.Scan(&traceID); err != nil { |
| s.mutex.RUnlock() |
| return nil, skerr.Wrap(err) |
| } |
| copy(traceKey[:], traceID) |
| if _, ok := s.publiclyVisibleTraces[traceKey]; ok { |
| traceIDsToExpand = append(traceIDsToExpand, traceID) |
| } |
| } |
| s.mutex.RUnlock() |
| rows.Close() |
| |
| combinedParamset := paramtools.ParamSet{} |
| for _, traceID := range traceIDsToExpand { |
| ps, err := s.expandTraceToParams(ctx, traceID) |
| if err != nil { |
| return nil, skerr.Wrap(err) |
| } |
| combinedParamset.AddParams(ps) |
| } |
| |
| combinedParamset.Normalize() |
| return paramtools.ReadOnlyParamSet(combinedParamset), nil |
| } |
| |
| // GetBlamesForUntriagedDigests implements the API interface. |
| func (s *Impl) GetBlamesForUntriagedDigests(ctx context.Context, corpus string) (BlameSummaryV1, error) { |
| ctx, span := trace.StartSpan(ctx, "search2_GetBlamesForUntriagedDigests") |
| defer span.End() |
| |
| ctx, err := s.addCommitsData(ctx) |
| if err != nil { |
| return BlameSummaryV1{}, skerr.Wrap(err) |
| } |
| // Find untriaged digests at head and the traces that produced them. |
| tracesByDigest, err := s.getTracesWithUntriagedDigestsAtHead(ctx, corpus) |
| if err != nil { |
| return BlameSummaryV1{}, skerr.Wrap(err) |
| } |
| if s.isPublicView { |
| tracesByDigest = s.applyPublicFilter(ctx, tracesByDigest) |
| } |
| if len(tracesByDigest) == 0 { |
| return BlameSummaryV1{}, nil // No data, we can stop here |
| } |
| // Return the trace histories for those traces, as well as a mapping of the unique |
| // digest+grouping pairs in order to get expectations. |
| histories, _, err := s.getHistoriesForTraces(ctx, tracesByDigest) |
| if err != nil { |
| return BlameSummaryV1{}, skerr.Wrap(err) |
| } |
| // Expand grouping_ids into full params |
| groupings, err := s.expandGroupings(ctx, tracesByDigest) |
| if err != nil { |
| return BlameSummaryV1{}, skerr.Wrap(err) |
| } |
| commits, err := s.getCommits(ctx) |
| if err != nil { |
| return BlameSummaryV1{}, skerr.Wrap(err) |
| } |
| // Look at trace histories and identify ranges of commits that caused us to go from drawing |
| // triaged digests to untriaged digests. |
| ranges := combineIntoRanges(ctx, histories, groupings, commits) |
| return BlameSummaryV1{ |
| Ranges: ranges, |
| }, nil |
| } |
| |
| type traceData []types.Digest |
| |
| // getTracesWithUntriagedDigestsAtHead identifies all untriaged digests being produced at head |
| // within the current window and returns all traces responsible for that behavior, clustered by the |
| // digest+grouping at head. This clustering allows us to better identify the commit(s) that caused |
| // the change, even with sparse data. |
| func (s *Impl) getTracesWithUntriagedDigestsAtHead(ctx context.Context, corpus string) (map[groupingDigestKey][]schema.TraceID, error) { |
| ctx, span := trace.StartSpan(ctx, "getTracesWithUntriagedDigestsAtHead") |
| defer span.End() |
| |
| byBlameData, err := s.cacheManager.GetByBlameData(ctx, string(common.GetFirstCommitID(ctx)), corpus) |
| if err != nil { |
| sklog.Errorf("Error encountered when retrieving ByBlame data from cache: %v", err) |
| return nil, skerr.Wrap(err)
| } |
| |
| sklog.Debugf("Retrieved %d items from search cache for corpus %s", len(byBlameData), corpus) |
| rv := map[groupingDigestKey][]schema.TraceID{} |
| var key groupingDigestKey |
| groupingKey := key.groupingID[:] |
| digestKey := key.digest[:] |
| for _, data := range byBlameData { |
| copy(groupingKey, data.GroupingID) |
| copy(digestKey, data.Digest) |
| rv[key] = append(rv[key], data.TraceID) |
| } |
| |
| return rv, nil |
| } |
| |
| // applyPublicFilter filters the traces according to the publicly visible traces map. |
| func (s *Impl) applyPublicFilter(ctx context.Context, data map[groupingDigestKey][]schema.TraceID) map[groupingDigestKey][]schema.TraceID { |
| ctx, span := trace.StartSpan(ctx, "applyPublicFilter") |
| defer span.End() |
| filtered := make(map[groupingDigestKey][]schema.TraceID, len(data)) |
| s.mutex.RLock() |
| defer s.mutex.RUnlock() |
| var traceKey schema.MD5Hash |
| traceID := traceKey[:] |
| for gdk, traces := range data { |
| for _, tr := range traces { |
| copy(traceID, tr) |
| if _, ok := s.publiclyVisibleTraces[traceKey]; ok { |
| filtered[gdk] = append(filtered[gdk], tr) |
| } |
| } |
| } |
| return filtered |
| } |
| |
| // untriagedDigestAtHead represents a single untriaged digest in a particular grouping observed
| // at head. It includes the histories of all traces that are currently drawing that digest.
| type untriagedDigestAtHead struct { |
| atHead groupingDigestKey |
| traces []traceIDAndData |
| } |
| |
| type traceIDAndData struct { |
| id schema.TraceID |
| data traceData |
| } |
| |
| // getHistoriesForTraces looks up the commits in the current window (aka the trace history) for all |
| // traces in the provided map. It returns them associated with the digest+grouping that was |
| // produced at head, as well as a map corresponding to all unique digests seen in these histories |
| // (to look up expectations). |
| func (s *Impl) getHistoriesForTraces(ctx context.Context, traces map[groupingDigestKey][]schema.TraceID) ([]untriagedDigestAtHead, map[groupingDigestKey]bool, error) { |
| ctx, span := trace.StartSpan(ctx, "getHistoriesForTraces") |
| defer span.End() |
| tracesToDigest := map[schema.MD5Hash]groupingDigestKey{} |
| var tracesToLookup []schema.TraceID |
| for gdk, traceIDs := range traces { |
| for _, traceID := range traceIDs { |
| tracesToDigest[sql.AsMD5Hash(traceID)] = gdk |
| tracesToLookup = append(tracesToLookup, traceID) |
| } |
| } |
| span.AddAttributes(trace.Int64Attribute("num_traces", int64(len(tracesToLookup)))) |
| const statement = `SELECT trace_id, commit_id, digest FROM TraceValues |
| WHERE commit_id >= $1 and trace_id = ANY($2) |
| ORDER BY trace_id` |
| rows, err := s.db.Query(ctx, statement, common.GetFirstCommitID(ctx), tracesToLookup) |
| if err != nil { |
| return nil, nil, skerr.Wrap(err) |
| } |
| defer rows.Close() |
| traceLength := common.GetActualWindowLength(ctx) |
| commitIdxMap := common.GetCommitToIdxMap(ctx) |
| tracesByDigest := make(map[groupingDigestKey][]traceIDAndData, len(traces)) |
| uniqueDigestsByGrouping := map[groupingDigestKey]bool{} |
| var currentTraceID schema.TraceID |
| var currentTraceData traceData |
| var key schema.MD5Hash |
| for rows.Next() { |
| var traceID schema.TraceID |
| var commitID schema.CommitID |
| var digest schema.DigestBytes |
| if err := rows.Scan(&traceID, &commitID, &digest); err != nil { |
| return nil, nil, skerr.Wrap(err) |
| } |
| copy(key[:], traceID) |
| gdk := tracesToDigest[key] |
| // Note that we've seen this digest on this grouping so we can look up the expectations. |
| uniqueDigestsByGrouping[groupingDigestKey{ |
| groupingID: gdk.groupingID, |
| digest: sql.AsMD5Hash(digest), |
| }] = true |
| if !bytes.Equal(traceID, currentTraceID) || currentTraceData == nil { |
| currentTraceID = traceID |
| // Make a new slice of digests (traceData) and associate it with the correct
| // grouping+digest.
| currentTraceData = make(traceData, traceLength) |
| tracesByDigest[gdk] = append(tracesByDigest[gdk], traceIDAndData{ |
| id: currentTraceID, |
| data: currentTraceData, |
| }) |
| } |
| idx, ok := commitIdxMap[commitID] |
| if !ok { |
| continue // commit is out of range or too new |
| } |
| currentTraceData[idx] = types.Digest(hex.EncodeToString(digest)) |
| } |
| // Flatten the map into a sorted slice for determinism |
| var rv []untriagedDigestAtHead |
| for key, traces := range tracesByDigest { |
| rv = append(rv, untriagedDigestAtHead{ |
| atHead: key, |
| traces: traces, |
| }) |
| } |
| sort.Slice(rv, func(i, j int) bool { |
| // The less function must define a strict ordering, so equal digests must report false.
| return bytes.Compare(rv[i].atHead.digest[:], rv[j].atHead.digest[:]) < 0
| }) |
| return rv, uniqueDigestsByGrouping, nil |
| } |
| |
| // expectationKey represents a digest+grouping in a way that is easier to look up from the data |
| // in a trace history (e.g. a hex-encoded digest). |
| type expectationKey struct { |
| groupingID schema.MD5Hash |
| digest types.Digest |
| } |
| |
| // getExpectations looks up the expectations for the given pairs of digests+grouping. |
| func (s *Impl) getExpectations(ctx context.Context, digests map[groupingDigestKey]bool) (map[expectationKey]expectations.Label, error) { |
| ctx, span := trace.StartSpan(ctx, "getExpectations") |
| defer span.End() |
| byGrouping := map[schema.MD5Hash][]schema.DigestBytes{} |
| for gdk := range digests { |
| byGrouping[gdk.groupingID] = append(byGrouping[gdk.groupingID], sql.FromMD5Hash(gdk.digest)) |
| } |
| |
| exp := map[expectationKey]expectations.Label{} |
| var mutex sync.Mutex |
| eg, eCtx := errgroup.WithContext(ctx) |
| for g, d := range byGrouping { |
| groupingKey, digests := g, d |
| eg.Go(func() error { |
| const statement = `SELECT encode(digest, 'hex'), label FROM Expectations |
| WHERE grouping_id = $1 and digest = ANY($2)` |
| rows, err := s.db.Query(eCtx, statement, groupingKey[:], digests) |
| if err != nil { |
| return skerr.Wrap(err) |
| } |
| defer rows.Close() |
| mutex.Lock() |
| defer mutex.Unlock() |
| var digest types.Digest |
| var label schema.ExpectationLabel |
| for rows.Next() { |
| if err := rows.Scan(&digest, &label); err != nil { |
| return skerr.Wrap(err) |
| } |
| exp[expectationKey{ |
| groupingID: groupingKey, |
| digest: digest, |
| }] = label.ToExpectation() |
| } |
| return nil |
| }) |
| } |
| if err := eg.Wait(); err != nil { |
| return nil, skerr.Wrap(err) |
| } |
| return exp, nil |
| } |
| |
| // expandGroupings returns a map of schema.GroupingIDs (as md5 hashes) to their related params. |
| func (s *Impl) expandGroupings(ctx context.Context, groupings map[groupingDigestKey][]schema.TraceID) (map[schema.MD5Hash]paramtools.Params, error) { |
| ctx, span := trace.StartSpan(ctx, "expandGroupings")
| defer span.End() |
| |
| rv := map[schema.MD5Hash]paramtools.Params{} |
| for gdk := range groupings { |
| if _, ok := rv[gdk.groupingID]; ok { |
| continue |
| } |
| grouping, err := s.expandGrouping(ctx, gdk.groupingID) |
| if err != nil { |
| return nil, skerr.Wrap(err) |
| } |
| rv[gdk.groupingID] = grouping |
| } |
| return rv, nil |
| } |
| |
| // combineIntoRanges looks at the histories for all the traces provided starting at the earliest |
| // (head) and working backwards. It looks for the change from drawing the untriaged digests at head |
| // to drawing a different digest, and tries to identify which commits caused that. There could be |
| // multiple commits in the window that have affected different tests, so this algorithm combines |
| // ranges and returns them as a slice, with the commits that produced the most untriaged digests |
| // coming first. It is recommended to look at the tests for this function to see some examples. |
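| //
| // For illustration (hypothetical data, not from the tests): given a single trace with history
| // "AA--cc" (oldest to newest) whose digest at head "c" is untriaged, the computed blame range
| // spans commit indices 2 through 4, i.e. the two commits with missing data plus the first
| // commit that produced "c".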
| func combineIntoRanges(ctx context.Context, digests []untriagedDigestAtHead, groupings map[schema.MD5Hash]paramtools.Params, commits []frontend.Commit) []BlameEntry { |
| ctx, span := trace.StartSpan(ctx, "combineIntoRanges") |
| defer span.End() |
| |
| type tracesByBlameRange map[string][]schema.TraceID |
| |
| entriesByRange := map[string]BlameEntry{} |
| for _, data := range digests { |
| key := data.atHead |
| // The digest in key represents an untriaged digest seen at head. |
| // traces represents all the traces that are drawing that untriaged digest at head. |
| // We would like to identify the narrowest range in which this change could have happened.
| latestUntriagedDigest := types.Digest(hex.EncodeToString(sql.FromMD5Hash(key.digest))) |
| // Last commit before the untriaged digest at head was first produced. |
| blameStartIdx := -1 |
| // First commit to produce the untriaged digest at head. |
| blameEndIdx := len(commits) |
| // (trace ID, digest) pairs for all traces that are drawing the untriaged digest at head. |
| var traceIDsAndDigests []traceIDAndDigest |
| |
| for _, tr := range data.traces { |
| traceIDsAndDigests = append(traceIDsAndDigests, traceIDAndDigest{ |
| id: tr.id, |
| digest: key.digest[:], |
| }) |
| |
| // Identify the range at which the latest untriaged digest first occurred. For example, |
| // the range at which the latest untriaged digest "c" first occurred in trace |
| // "AAA-b--cc-" is 5:7. |
| latestUntriagedDigestFound := false |
| latestUntriagedDigestEarliestOccurrenceStartIdx := -1 |
| latestUntriagedDigestEarliestOccurrenceEndIdx := len(commits) |
| |
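| // Scan the trace from the most recent commit backwards: first skip trailing missing
| // digests until the digest at head is found, then extend the occurrence range through
| // consecutive matches and gaps, stopping at the first different digest.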
| for i := len(tr.data) - 1; i >= 0; i-- { |
| digest := tr.data[i] |
| if !latestUntriagedDigestFound { |
| if digest == tiling.MissingDigest { |
| continue |
| } else if digest == latestUntriagedDigest { |
| latestUntriagedDigestFound = true |
| } else { |
| break |
| } |
| } |
| if digest == latestUntriagedDigest { |
| latestUntriagedDigestEarliestOccurrenceStartIdx = i |
| latestUntriagedDigestEarliestOccurrenceEndIdx = i |
| } else if digest == tiling.MissingDigest { |
| latestUntriagedDigestEarliestOccurrenceStartIdx = i |
| } else { |
| break |
| } |
| } |
| |
| // If the current blame range and the range at which the latest untriaged digest first
| // occurred are disjoint, use the earliest of the two as the new blame range.
| // |
| // Example traces: |
| // |
| // AA--cccccc |
| // BABABB--cc |
| // |
| // In this example, the second trace is very flaky, and the untriaged digest is not |
| // produced until several commits after the offending commit landed. The resulting |
| // ranges are 2:4 and 6:8. |
| // |
| // We use the earliest range as the new blame range (2:4 in the above example) as that |
| // is where the offending commit is likely found. |
| disjointRanges := blameEndIdx < latestUntriagedDigestEarliestOccurrenceStartIdx || |
| latestUntriagedDigestEarliestOccurrenceEndIdx < blameStartIdx+1 |
| if disjointRanges { |
| if latestUntriagedDigestEarliestOccurrenceEndIdx < blameStartIdx+1 { |
| // Update blame range to equal the earliest of the two ranges. |
| blameStartIdx = latestUntriagedDigestEarliestOccurrenceStartIdx - 1 |
| blameEndIdx = latestUntriagedDigestEarliestOccurrenceEndIdx |
| } else { |
| // Nothing to do, as the current blame range is the earliest of the two ranges. |
| } |
| } else { |
| if blameStartIdx < latestUntriagedDigestEarliestOccurrenceStartIdx-1 { |
| blameStartIdx = latestUntriagedDigestEarliestOccurrenceStartIdx - 1 |
| } |
| if blameEndIdx > latestUntriagedDigestEarliestOccurrenceEndIdx { |
| blameEndIdx = latestUntriagedDigestEarliestOccurrenceEndIdx |
| } |
| } |
| } |
| |
| // blameStartIdx is now either -1 (for beginning of tile) or the index of the last known |
| // digest that was different from the untriaged digest at head. |
| if blameStartIdx == -1 && blameEndIdx == len(commits) { |
| continue // We didn't find any untriaged digests on this trace |
| } |
| // We have now identified a blame range that has accounted for one additional untriaged
| // digest at head (and possibly others before that). |
| blameRange, blameCommits := getRangeAndBlame(commits, blameStartIdx, blameEndIdx) |
| entry, ok := entriesByRange[blameRange] |
| if !ok { |
| entry.CommitRange = blameRange |
| entry.Commits = blameCommits |
| } |
| entry.TotalUntriagedDigests++ |
| // Find the grouping associated with this digest if it already is in the list. |
| found := false |
| for _, ag := range entry.AffectedGroupings { |
| if ag.groupingID == key.groupingID { |
| found = true |
| ag.UntriagedDigests++ |
| ag.traceIDsAndDigests = append(ag.traceIDsAndDigests, traceIDsAndDigests...) |
| break |
| } |
| } |
| if !found { |
| entry.AffectedGroupings = append(entry.AffectedGroupings, &AffectedGrouping{ |
| Grouping: groupings[key.groupingID], |
| UntriagedDigests: 1, |
| SampleDigest: types.Digest(hex.EncodeToString(key.digest[:])), |
| groupingID: key.groupingID, |
| traceIDsAndDigests: traceIDsAndDigests, |
| }) |
| } |
| entriesByRange[blameRange] = entry |
| } |
| // Sort data so the "biggest changes" come first (and perform other cleanups) |
| blameEntries := make([]BlameEntry, 0, len(entriesByRange)) |
| for _, entry := range entriesByRange { |
| sort.Slice(entry.AffectedGroupings, func(i, j int) bool { |
| if entry.AffectedGroupings[i].UntriagedDigests == entry.AffectedGroupings[j].UntriagedDigests { |
| // Tiebreak on sample digest |
| return entry.AffectedGroupings[i].SampleDigest < entry.AffectedGroupings[j].SampleDigest |
| } |
| // Otherwise, put the grouping with the most digests first |
| return entry.AffectedGroupings[i].UntriagedDigests > entry.AffectedGroupings[j].UntriagedDigests |
| }) |
| for _, ag := range entry.AffectedGroupings { |
| // Sort the traceIDsAndDigests to ensure a deterministic response. |
| sort.Slice(ag.traceIDsAndDigests, func(i, j int) bool {
| traceComparison := bytes.Compare(ag.traceIDsAndDigests[i].id, ag.traceIDsAndDigests[j].id)
| if traceComparison != 0 {
| return traceComparison < 0
| }
| // Tiebreak on the digest.
| return bytes.Compare(ag.traceIDsAndDigests[i].digest, ag.traceIDsAndDigests[j].digest) < 0
| })
| } |
| blameEntries = append(blameEntries, entry) |
| } |
| // Sort so those ranges with more untriaged digests come first. |
| sort.Slice(blameEntries, func(i, j int) bool { |
| if blameEntries[i].TotalUntriagedDigests == blameEntries[j].TotalUntriagedDigests { |
| // tie break on the commit range |
| return blameEntries[i].CommitRange < blameEntries[j].CommitRange |
| } |
| return blameEntries[i].TotalUntriagedDigests > blameEntries[j].TotalUntriagedDigests |
| }) |
| return blameEntries |
| } |
| |
| // getRangeAndBlame returns a range identifier (either a single commit id or a start and end |
| // commit id separated by a colon) and the corresponding web commit objects. |
| func getRangeAndBlame(commits []frontend.Commit, startIndex, endIndex int) (string, []frontend.Commit) { |
| endCommit := commits[endIndex] |
| // If the indexes are within 1 (or rarely, equal), we have pinned the range down to one commit. |
| // If startIndex is -1, then we have no data all the way to the beginning of the window - this |
| // commonly happens when a new test is added. |
| if (endIndex-startIndex) == 1 || startIndex == endIndex || startIndex == -1 { |
| return endCommit.ID, []frontend.Commit{endCommit} |
| } |
| // Add 1 because startIndex is the last known "good" index, and we want our blamelist to only |
| // encompass "bad" commits. |
| startCommit := commits[startIndex+1] |
| return fmt.Sprintf("%s:%s", startCommit.ID, endCommit.ID), commits[startIndex+1 : endIndex+1] |
| } |
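|
| // For illustration (hypothetical commit IDs): with five commits whose IDs are
| // "001" through "005", getRangeAndBlame(commits, 0, 3) returns ("002:004", commits[1:4]),
| // while getRangeAndBlame(commits, 2, 3) pins the blame down to a single commit and
| // returns ("004", commits[3:4]).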
| |
| // GetCluster implements the API interface. |
| // TODO(kjlubick) Handle CL data (frontend currently does not). |
| func (s *Impl) GetCluster(ctx context.Context, opts ClusterOptions) (frontend.ClusterDiffResult, error) { |
| ctx, span := trace.StartSpan(ctx, "search2_GetCluster") |
| defer span.End() |
| |
| ctx, err := s.addCommitsData(ctx) |
| if err != nil { |
| return frontend.ClusterDiffResult{}, skerr.Wrap(err) |
| } |
| // Fetch all digests and traces that match the options. If this is a public view, those traces |
| // will be filtered. |
| digestsAndTraces, err := s.getDigestsAndTracesForCluster(ctx, opts) |
| if err != nil { |
| return frontend.ClusterDiffResult{}, skerr.Wrap(err) |
| } |
| nodes, links, err := s.getLinks(ctx, digestsAndTraces) |
| if err != nil { |
| return frontend.ClusterDiffResult{}, skerr.Wrap(err) |
| } |
| byDigest, combined, err := s.getParamsetsForCluster(ctx, digestsAndTraces) |
| if err != nil { |
| return frontend.ClusterDiffResult{}, skerr.Wrap(err) |
| } |
| |
| return frontend.ClusterDiffResult{ |
| Nodes: nodes, |
| Links: links, |
| Test: types.TestName(opts.Grouping[types.PrimaryKeyField]), |
| ParamsetByDigest: byDigest, |
| ParamsetsUnion: combined, |
| }, nil |
| } |
| |
| type digestClusterInfo struct { |
| label schema.ExpectationLabel |
| traceIDs []schema.TraceID |
| optionsIDs []schema.OptionsID |
| } |
| |
| // getDigestsAndTracesForCluster returns the digests, traces, and options *at head* that match the |
| // given cluster options. |
| func (s *Impl) getDigestsAndTracesForCluster(ctx context.Context, opts ClusterOptions) (map[schema.MD5Hash]*digestClusterInfo, error) { |
| ctx, span := trace.StartSpan(ctx, "getDigestsAndTracesForCluster") |
| defer span.End() |
| statement := "WITH " |
| dataOfInterest, err := clusterDataOfInterestStatement(opts) |
| if err != nil { |
| return nil, skerr.Wrap(err) |
| } |
| statement += dataOfInterest + ` |
| SELECT DataOfInterest.*, label |
| FROM DataOfInterest JOIN Expectations |
| ON Expectations.grouping_id = $1 and DataOfInterest.digest = Expectations.digest |
| WHERE label = ANY($3) |
| ` |
| var triageStatuses []schema.ExpectationLabel |
| if opts.IncludeUntriagedDigests { |
| triageStatuses = append(triageStatuses, schema.LabelUntriaged) |
| } |
| if opts.IncludeNegativeDigests { |
| triageStatuses = append(triageStatuses, schema.LabelNegative) |
| } |
| if opts.IncludePositiveDigests { |
| triageStatuses = append(triageStatuses, schema.LabelPositive) |
| } |
| if len(triageStatuses) == 0 { |
| return nil, nil // If no triage status is set, there can be no results. |
| } |
| |
| _, groupingID := sql.SerializeMap(opts.Grouping) |
| rows, err := s.db.Query(ctx, statement, groupingID, common.GetFirstCommitID(ctx), triageStatuses) |
| if err != nil { |
| return nil, skerr.Wrap(err) |
| } |
| defer rows.Close() |
| var digestKey schema.MD5Hash |
| var digest schema.DigestBytes |
| rv := map[schema.MD5Hash]*digestClusterInfo{} |
| for rows.Next() { |
| var traceID schema.TraceID |
| var optionsID schema.OptionsID |
| var label schema.ExpectationLabel |
| if err := rows.Scan(&traceID, &optionsID, &digest, &label); err != nil { |
| return nil, skerr.Wrap(err) |
| } |
| copy(digestKey[:], digest) |
| info := rv[digestKey] |
| if info == nil { |
| info = &digestClusterInfo{ |
| label: label, |
| } |
| rv[digestKey] = info |
| } |
| info.traceIDs = append(info.traceIDs, traceID) |
| info.optionsIDs = append(info.optionsIDs, optionsID) |
| } |
| if s.isPublicView { |
| s.mutex.RLock() |
| defer s.mutex.RUnlock() |
| for _, info := range rv { |
| var filteredTraceIDs []schema.TraceID |
| var filteredOptionIDs []schema.OptionsID |
| var traceKey schema.MD5Hash |
| tr := traceKey[:] |
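| // tr aliases traceKey's backing array, so each copy below updates traceKey in place
| // for the map lookup.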
| for i := range info.traceIDs { |
| copy(tr, info.traceIDs[i]) |
| if _, ok := s.publiclyVisibleTraces[traceKey]; ok { |
| filteredTraceIDs = append(filteredTraceIDs, info.traceIDs[i]) |
| filteredOptionIDs = append(filteredOptionIDs, info.optionsIDs[i]) |
| } |
| } |
| info.traceIDs = filteredTraceIDs |
| info.optionsIDs = filteredOptionIDs |
| } |
| } |
| return rv, nil |
| } |
| |
| // clusterDataOfInterestStatement returns a CTE named DataOfInterest that contains the
| // trace_id, options_id, and digest from the ValuesAtHead table for the traces that match
| // the given options. It makes use of the $1 placeholder for grouping_id and $2 for
| // most_recent_commit_id.
| func clusterDataOfInterestStatement(opts ClusterOptions) (string, error) { |
| if len(opts.Filters) == 0 { |
| return ` |
| DataOfInterest AS ( |
| SELECT trace_id, options_id, digest FROM ValuesAtHead |
| WHERE grouping_id = $1 AND matches_any_ignore_rule = FALSE AND most_recent_commit_id >= $2 |
| )`, nil |
| } |
| keys := make([]string, 0, len(opts.Filters)) |
| for key := range opts.Filters { |
| keys = append(keys, key) |
| } |
| sort.Strings(keys) // sort for determinism |
| |
| statement := "\n" |
| unionIndex := 0 |
| for _, key := range keys { |
| if key != sql.Sanitize(key) { |
| return "", skerr.Fmt("Invalid query key %q", key) |
| } |
| statement += fmt.Sprintf("U%d AS (\n", unionIndex) |
| for j, value := range opts.Filters[key] { |
| if value != sql.Sanitize(value) { |
| return "", skerr.Fmt("Invalid query value %q", value) |
| } |
| if j != 0 { |
| statement += "\tUNION\n" |
| } |
| statement += fmt.Sprintf("\tSELECT trace_id FROM Traces WHERE keys -> '%s' = '%q'\n", key, value) |
| } |
| statement += "),\n" |
| unionIndex++ |
| } |
| statement += "TracesOfInterest AS (\n" |
| for i := 0; i < unionIndex; i++ { |
| statement += fmt.Sprintf("\tSELECT trace_id FROM U%d\n\tINTERSECT\n", i) |
| } |
| // Include a final intersect for the grouping (and to exclude ignored traces).
| statement += ` SELECT trace_id FROM Traces WHERE grouping_id = $1 AND matches_any_ignore_rule = FALSE |
| ), |
| DataOfInterest AS ( |
| SELECT ValuesAtHead.trace_id, options_id, digest FROM ValuesAtHead |
| JOIN TracesOfInterest ON ValuesAtHead.trace_id = TracesOfInterest.trace_id |
| WHERE most_recent_commit_id >= $2 |
| )` |
| return statement, nil |
| } |
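|
| // For illustration (hypothetical filter, not from the tests): with
| // opts.Filters = {"os": ["Android", "iOS"]}, the generated statement is roughly:
| //
| // U0 AS (
| // SELECT trace_id FROM Traces WHERE keys -> 'os' = '"Android"'
| // UNION
| // SELECT trace_id FROM Traces WHERE keys -> 'os' = '"iOS"'
| // ),
| // TracesOfInterest AS (
| // SELECT trace_id FROM U0
| // INTERSECT
| // SELECT trace_id FROM Traces WHERE grouping_id = $1 AND matches_any_ignore_rule = FALSE
| // ),
| // DataOfInterest AS ( ... )
| //
| // The %q verb wraps each value in double quotes, which is how JSONB string values compare.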
| |
| // getLinks returns the nodes and links that correspond to the digests and how each compares to |
| // the other digests. |
| func (s *Impl) getLinks(ctx context.Context, digests map[schema.MD5Hash]*digestClusterInfo) ([]frontend.Node, []frontend.Link, error) { |
| ctx, span := trace.StartSpan(ctx, "getLinks")
| defer span.End() |
| |
| var digestsToLookup []schema.DigestBytes
| nodes := make([]frontend.Node, 0, len(digests))
| for digest, info := range digests { |
| if len(info.traceIDs) > 0 { |
| digestsToLookup = append(digestsToLookup, sql.FromMD5Hash(digest)) |
| nodes = append(nodes, frontend.Node{ |
| Digest: types.Digest(hex.EncodeToString(digest[:])), |
| Status: info.label.ToExpectation(), |
| }) |
| } |
| } |
| if len(digestsToLookup) == 0 { |
| return nil, nil, nil |
| } |
| // Sort the nodes by digest for determinism.
| sort.Slice(nodes, func(i, j int) bool { |
| return nodes[i].Digest < nodes[j].Digest |
| }) |
| // Make a map so we can easily go from digest to index as we go through our results. |
| digestToIndex := make(map[types.Digest]int, len(digestsToLookup)) |
| for i, n := range nodes { |
| digestToIndex[n.Digest] = i |
| } |
| |
| span.AddAttributes(trace.Int64Attribute("num_digests", int64(len(digestsToLookup)))) |
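| // AS OF SYSTEM TIME '-0.1s' performs a slightly stale historical read, which reduces
| // contention with concurrent writes to the DiffMetrics table.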
| const statement = `SELECT encode(left_digest, 'hex'), encode(right_digest, 'hex'), percent_pixels_diff |
| FROM DiffMetrics AS OF SYSTEM TIME '-0.1s' |
| WHERE left_digest = ANY($1) AND right_digest = ANY($1) AND left_digest < right_digest |
| ORDER BY 1, 2` |
| rows, err := s.db.Query(ctx, statement, digestsToLookup) |
| if err != nil { |
| return nil, nil, skerr.Wrap(err) |
| } |
| defer rows.Close() |
| var leftDigest types.Digest |
| var rightDigest types.Digest |
| links := make([]frontend.Link, 0, len(nodes)*(len(nodes)-1)/2) |
| for rows.Next() { |
| var link frontend.Link |
| if err := rows.Scan(&leftDigest, &rightDigest, &link.Distance); err != nil { |
| return nil, nil, skerr.Wrap(err) |
| } |
| link.LeftIndex = digestToIndex[leftDigest] |
| link.RightIndex = digestToIndex[rightDigest] |
| links = append(links, link) |
| } |
| return nodes, links, nil |
| } |
| |
| // getParamsetsForCluster looks up all the params for the given traces and options and returns |
| // them grouped by digest and in totality. |
| func (s *Impl) getParamsetsForCluster(ctx context.Context, digests map[schema.MD5Hash]*digestClusterInfo) (map[types.Digest]paramtools.ParamSet, paramtools.ParamSet, error) { |
| ctx, span := trace.StartSpan(ctx, "getParamsetsForCluster") |
| defer span.End() |
| if len(digests) == 0 { |
| return nil, nil, nil |
| } |
| byDigest := make(map[types.Digest]paramtools.ParamSet, len(digests)) |
| combined := paramtools.ParamSet{} |
| for d, info := range digests { |
| digest := types.Digest(hex.EncodeToString(d[:])) |
| thisDigestsParamset, ok := byDigest[digest] |
| if !ok { |
| thisDigestsParamset = paramtools.ParamSet{} |
| byDigest[digest] = thisDigestsParamset |
| } |
| for _, traceID := range info.traceIDs { |
| p, err := s.expandTraceToParams(ctx, traceID) |
| if err != nil { |
| return nil, nil, skerr.Wrap(err) |
| } |
| thisDigestsParamset.AddParams(p) |
| combined.AddParams(p) |
| } |
| for _, optID := range info.optionsIDs { |
| p, err := s.expandOptionsToParams(ctx, optID) |
| if err != nil { |
| return nil, nil, skerr.Wrap(err) |
| } |
| thisDigestsParamset.AddParams(p) |
| combined.AddParams(p) |
| } |
| } |
| // Normalize the paramsets for determinism |
| for _, ps := range byDigest { |
| ps.Normalize() |
| } |
| combined.Normalize() |
| return byDigest, combined, nil |
| } |
| |
| // GetCommitsInWindow implements the API interface.
| func (s *Impl) GetCommitsInWindow(ctx context.Context) ([]frontend.Commit, error) {
| ctx, span := trace.StartSpan(ctx, "search2_GetCommitsInWindow")
| defer span.End() |
| // TODO(kjlubick) will need to handle non-git repos as well. |
| const statement = `WITH |
| RecentCommits AS ( |
| SELECT commit_id FROM CommitsWithData |
| AS OF SYSTEM TIME '-0.1s' |
| ORDER BY commit_id DESC LIMIT $1 |
| ) |
| SELECT git_hash, GitCommits.commit_id, commit_time, author_email, subject FROM GitCommits |
| JOIN RecentCommits ON GitCommits.commit_id = RecentCommits.commit_id |
| AS OF SYSTEM TIME '-0.1s' |
| ORDER BY commit_id ASC` |
| rows, err := s.db.Query(ctx, statement, s.windowLength) |
| if err != nil { |
| return nil, skerr.Wrap(err) |
| } |
| defer rows.Close() |
| var rv []frontend.Commit |
| for rows.Next() { |
| var ts time.Time |
| var commit frontend.Commit |
| if err := rows.Scan(&commit.Hash, &commit.ID, &ts, &commit.Author, &commit.Subject); err != nil { |
| return nil, skerr.Wrap(err) |
| } |
| commit.CommitTime = ts.UTC().Unix() |
| rv = append(rv, commit) |
| } |
| return rv, nil |
| } |
| |
| // GetDigestsForGrouping implements the API interface. |
| func (s *Impl) GetDigestsForGrouping(ctx context.Context, grouping paramtools.Params) (frontend.DigestListResponse, error) { |
| ctx, span := trace.StartSpan(ctx, "search2_GetDigestsForGrouping") |
| defer span.End() |
| _, groupingID := sql.SerializeMap(grouping) |
| const statement = `WITH |
| RecentCommits AS ( |
| SELECT tile_id FROM CommitsWithData |
| ORDER BY commit_id DESC LIMIT $1 |
| ), |
| FirstTileInWindow AS ( |
| SELECT tile_id FROM RecentCommits |
| ORDER BY tile_id ASC LIMIT 1 |
| ) |
| SELECT DISTINCT encode(digest, 'hex') FROM TiledTraceDigests JOIN |
| FirstTileInWindow ON TiledTraceDigests.tile_id >= FirstTileInWindow.tile_id AND |
| TiledTraceDigests.grouping_id = $2 |
| ORDER BY 1` |
| |
| rows, err := s.db.Query(ctx, statement, s.windowLength, groupingID) |
| if err != nil { |
| return frontend.DigestListResponse{}, skerr.Wrapf(err, "getting digests for grouping %x - %#v", groupingID, grouping) |
| } |
| defer rows.Close() |
| var resp frontend.DigestListResponse |
| for rows.Next() { |
| var digest types.Digest |
| if err := rows.Scan(&digest); err != nil { |
| return frontend.DigestListResponse{}, skerr.Wrap(err) |
| } |
| resp.Digests = append(resp.Digests, digest) |
| } |
| return resp, nil |
| } |
| |
| // GetDigestDetails is very similar to the Search() function, but it only has one digest, so |
| // there is only one result. |
| func (s *Impl) GetDigestDetails(ctx context.Context, grouping paramtools.Params, digest types.Digest, clID, crs string) (frontend.DigestDetails, error) { |
| ctx, span := trace.StartSpan(ctx, "search2_GetDigestDetails") |
| defer span.End() |
| |
| ctx, err := s.addCommitsData(ctx) |
| if err != nil { |
| return frontend.DigestDetails{}, skerr.Wrap(err) |
| } |
| commits, err := s.getCommits(ctx) |
| if err != nil { |
| return frontend.DigestDetails{}, skerr.Wrap(err) |
| } |
| // Fill in a few values to allow us to use the same methods as Search. |
| ctx = context.WithValue(ctx, common.QueryKey, query.Search{ |
| CodeReviewSystemID: crs, ChangelistID: clID, |
| Offset: 0, Limit: 1, RGBAMaxFilter: 255, |
| }) |
| |
| var digestsWithTraces []digestWithTraceAndGrouping
| if clID == "" { |
| digestsWithTraces, err = s.getTracesForGroupingAndDigest(ctx, grouping, digest)
| if err != nil { |
| return frontend.DigestDetails{}, skerr.Wrap(err) |
| } |
| } else { |
| if crs == "" { |
| return frontend.DigestDetails{}, skerr.Fmt("Code Review System (crs) must be specified") |
| } |
| ctx = context.WithValue(ctx, common.QualifiedCLIDKey, sql.Qualify(crs, clID)) |
| digestsWithTraces, err = s.getTracesFromCLThatProduced(ctx, grouping, digest)
| if err != nil { |
| return frontend.DigestDetails{}, skerr.Wrap(err) |
| } |
| if commits, err = s.addCLCommit(ctx, commits); err != nil { |
| return frontend.DigestDetails{}, skerr.Wrap(err) |
| } |
| } |
| |
| if s.isPublicView { |
| digestsWithTraces = s.applyPublicFilterToDigestWithTraceAndGrouping(digestsWithTraces)
| } |
| |
| // Lookup the closest diffs to the given digests. This returns a subset according to the |
| // limit and offset in the query. |
| digestAndClosestDiffs, _, err := s.getClosestDiffs(ctx, digestsWithTraces)
| if err != nil { |
| return frontend.DigestDetails{}, skerr.Wrap(err) |
| } |
| if len(digestAndClosestDiffs) == 0 { |
| return frontend.DigestDetails{}, skerr.Fmt("No results found") |
| } |
| // Go fetch history and paramset (within this grouping, and respecting publiclyAllowedParams). |
| paramsetsByDigest, err := s.getParamsetsForRightSide(ctx, digestAndClosestDiffs) |
| if err != nil { |
| return frontend.DigestDetails{}, skerr.Wrap(err) |
| } |
| // Flesh out the trace history with enough data to draw the dots diagram on the frontend. |
| // The returned slice should have length 1 because we had only one digest and one
| // entry in digestAndClosestDiffs.
| resultSlice, err := s.fillOutTraceHistory(ctx, digestAndClosestDiffs) |
| if err != nil { |
| return frontend.DigestDetails{}, skerr.Wrap(err) |
| } |
| |
| result := *resultSlice[0] |
| // Fill in the paramsets of the reference images. |
| for _, srdd := range result.RefDiffs { |
| if srdd != nil { |
| srdd.ParamSet = paramsetsByDigest[srdd.Digest] |
| } |
| } |
| |
| // Make sure the Test is set, even if the digest wasn't seen in the current window.
| // The frontend relies on this field to be able to triage the results. |
| result.Test = types.TestName(grouping[types.PrimaryKeyField]) |
| return frontend.DigestDetails{ |
| Commits: commits, |
| Result: result, |
| }, nil |
| } |
| |
| // applyPublicFilterToDigestWithTraceAndGrouping filters out any digestWithTraceAndGrouping for |
| // traces that are not publicly visible. |
| func (s *Impl) applyPublicFilterToDigestWithTraceAndGrouping(results []digestWithTraceAndGrouping) []digestWithTraceAndGrouping { |
| s.mutex.RLock() |
| defer s.mutex.RUnlock() |
| filteredResults := make([]digestWithTraceAndGrouping, 0, len(results)) |
| var traceKey schema.MD5Hash |
| for _, result := range results { |
| copy(traceKey[:], result.traceID) |
| if _, ok := s.publiclyVisibleTraces[traceKey]; ok { |
| filteredResults = append(filteredResults, result) |
| } |
| } |
| return filteredResults |
| } |
| |
| // getTracesForGroupingAndDigest finds the traces on the primary branch which produced the given
| // digest in the provided grouping. If no such trace exists (recently), a single result will be
| // added with no trace, to allow the UI to at least show the image and possible diff data.
| func (s *Impl) getTracesForGroupingAndDigest(ctx context.Context, grouping paramtools.Params, digest types.Digest) ([]digestWithTraceAndGrouping, error) { |
| ctx, span := trace.StartSpan(ctx, "getTracesForGroupingAndDigest") |
| defer span.End() |
| |
| _, groupingID := sql.SerializeMap(grouping) |
| digestBytes, err := sql.DigestToBytes(digest) |
| if err != nil { |
| return nil, skerr.Wrap(err) |
| } |
| |
| const statement = `SELECT trace_id FROM TiledTraceDigests@grouping_digest_idx |
| WHERE tile_id >= $1 AND grouping_id = $2 AND digest = $3 |
| ` |
| rows, err := s.db.Query(ctx, statement, common.GetFirstTileID(ctx), groupingID, digestBytes) |
| if err != nil { |
| return nil, skerr.Wrap(err) |
| } |
| defer rows.Close() |
| var results []digestWithTraceAndGrouping |
| for rows.Next() { |
| var traceID schema.TraceID |
| if err := rows.Scan(&traceID); err != nil { |
| return nil, skerr.Wrap(err) |
| } |
| results = append(results, digestWithTraceAndGrouping{ |
| traceID: traceID, |
| groupingID: groupingID, |
| digest: digestBytes, |
| }) |
| } |
| if len(results) == 0 { |
| // Add in a result that has at least the digest and groupingID. This can be helpful
| // when an image was produced a long time ago, but not in recent traces. It allows the UI
| // to at least show the image and some diff information. |
| results = append(results, digestWithTraceAndGrouping{ |
| groupingID: groupingID, |
| digest: digestBytes, |
| }) |
| } |
| return results, nil |
| } |
| |
| // getTracesFromCLThatProduced returns a digestWithTraceAndGrouping for all traces that produced |
| // the provided digest in the given grouping on the most recent PS for the given CL. |
| func (s *Impl) getTracesFromCLThatProduced(ctx context.Context, grouping paramtools.Params, digest types.Digest) ([]digestWithTraceAndGrouping, error) { |
| ctx, span := trace.StartSpan(ctx, "getTracesFromCLThatProduced") |
| defer span.End() |
| |
| _, groupingID := sql.SerializeMap(grouping) |
| digestBytes, err := sql.DigestToBytes(digest) |
| if err != nil { |
| return nil, skerr.Wrap(err) |
| } |
| |
| const statement = `WITH |
| MostRecentPS AS ( |
| SELECT patchset_id FROM Patchsets WHERE changelist_id = $1 |
| ORDER BY created_ts DESC, ps_order DESC |
| LIMIT 1 |
| ) |
| SELECT secondary_branch_trace_id, options_id FROM SecondaryBranchValues |
| JOIN MostRecentPS ON SecondaryBranchValues.version_name = MostRecentPS.patchset_id |
| WHERE branch_name = $1 AND grouping_id = $2 AND digest = $3` |
| rows, err := s.db.Query(ctx, statement, common.GetQualifiedCL(ctx), groupingID, digestBytes) |
| if err != nil { |
| return nil, skerr.Wrap(err) |
| } |
| defer rows.Close() |
| var results []digestWithTraceAndGrouping |
| for rows.Next() { |
| var traceID schema.TraceID |
| var optionsID schema.OptionsID |
| if err := rows.Scan(&traceID, &optionsID); err != nil { |
| return nil, skerr.Wrap(err) |
| } |
| results = append(results, digestWithTraceAndGrouping{ |
| traceID: traceID, |
| groupingID: groupingID, |
| digest: digestBytes, |
| optionsID: optionsID, |
| }) |
| } |
| return results, nil |
| } |
| |
| // GetDigestsDiff implements the API interface. |
| func (s *Impl) GetDigestsDiff(ctx context.Context, grouping paramtools.Params, left, right types.Digest, clID, crs string) (frontend.DigestComparison, error) { |
| ctx, span := trace.StartSpan(ctx, "web_GetDigestsDiff") |
| defer span.End() |
| _, groupingID := sql.SerializeMap(grouping) |
| leftBytes, err := sql.DigestToBytes(left) |
| if err != nil { |
| return frontend.DigestComparison{}, skerr.Wrap(err) |
| } |
| rightBytes, err := sql.DigestToBytes(right) |
| if err != nil { |
| return frontend.DigestComparison{}, skerr.Wrap(err) |
| } |
| |
| metrics, err := s.getDiffBetween(ctx, leftBytes, rightBytes) |
| if err != nil { |
| return frontend.DigestComparison{}, skerr.Wrapf(err, "missing diff information for %s-%s", left, right) |
| } |
| |
| leftLabel, err := s.getExpectationsForDigest(ctx, groupingID, leftBytes, crs, clID) |
| if err != nil { |
| return frontend.DigestComparison{}, skerr.Wrap(err) |
| } |
| rightLabel, err := s.getExpectationsForDigest(ctx, groupingID, rightBytes, crs, clID) |
| if err != nil { |
| return frontend.DigestComparison{}, skerr.Wrap(err) |
| } |
| |
| leftParamset, err := s.getParamsetsForTracesProducing(ctx, groupingID, leftBytes, crs, clID) |
| if err != nil { |
| return frontend.DigestComparison{}, skerr.Wrap(err) |
| } |
| rightParamset, err := s.getParamsetsForTracesProducing(ctx, groupingID, rightBytes, crs, clID) |
| if err != nil { |
| return frontend.DigestComparison{}, skerr.Wrap(err) |
| } |
| |
| metrics.Digest = right |
| metrics.Status = rightLabel |
| metrics.ParamSet = rightParamset |
| return frontend.DigestComparison{ |
| Left: frontend.LeftDiffInfo{ |
| Test: types.TestName(grouping[types.PrimaryKeyField]), |
| Digest: left, |
| Status: leftLabel, |
| TriageHistory: nil, // TODO(kjlubick) |
| ParamSet: leftParamset, |
| }, |
| Right: metrics, |
| }, nil |
| }
| |
| // getDiffBetween returns the diff metrics for the given digests. |
| func (s *Impl) getDiffBetween(ctx context.Context, left, right schema.DigestBytes) (frontend.SRDiffDigest, error) { |
| ctx, span := trace.StartSpan(ctx, "getDiffBetween") |
| defer span.End() |
| const statement = `SELECT num_pixels_diff, percent_pixels_diff, max_rgba_diffs, |
| combined_metric, dimensions_differ |
| FROM DiffMetrics WHERE left_digest = $1 and right_digest = $2 LIMIT 1` |
| row := s.db.QueryRow(ctx, statement, left, right) |
| var rv frontend.SRDiffDigest |
| if err := row.Scan(&rv.NumDiffPixels, &rv.PixelDiffPercent, &rv.MaxRGBADiffs, |
| &rv.CombinedMetric, &rv.DimDiffer); err != nil { |
| return frontend.SRDiffDigest{}, skerr.Wrap(err) |
| } |
| return rv, nil |
| } |
| |
| // getExpectationsForDigest returns the expectations for the given digest and grouping pair. It |
| // assumes that the given digest is valid. If the provided digest was triaged on the given CL (and |
| // the CL is valid), the expectation from that CL takes precedence over the expectation from the |
| // primary branch. If no expectation is found, we assume that 1) the digest was ingested from a CL, |
| // 2) the digest was never seen before, and 3) the digest has not yet been triaged; therefore we |
| // return expectations.Untriaged. It is the caller's responsibility to detect whether the digest is |
| // valid in this case, if applicable (e.g. by inspecting the SecondaryBranchValues table). |
| // |
| // Defaulting to expectations.Untriaged is consistent with the way CL ingestion works: we do not |
| // populate the SecondaryBranchExpectations table with untriaged entries during ingestion. |
| func (s *Impl) getExpectationsForDigest(ctx context.Context, groupingID schema.GroupingID, digest schema.DigestBytes, crs, clID string) (expectations.Label, error) { |
| ctx, span := trace.StartSpan(ctx, "getExpectationsForDigest") |
| defer span.End() |
| |
| const statement = `WITH |
| -- If the CRS and CLID are blank here, this will be empty, but the COALESCE will fix things up. |
| ExpectationsFromCL AS ( |
| SELECT label, digest FROM SecondaryBranchExpectations |
| WHERE grouping_id = $1 |
| AND digest = $2 |
| AND branch_name = $3 |
| LIMIT 1 |
| ), |
| ExpectationsFromPrimary AS ( |
| SELECT label, digest FROM Expectations |
| WHERE grouping_id = $1 |
| AND digest = $2 |
| LIMIT 1 |
| ), |
| Digest AS ( |
| SELECT $2 AS digest |
| ) |
| SELECT COALESCE(ExpectationsFromCL.label, ExpectationsFromPrimary.label, 'u') AS label |
| -- This ensures that the query result is never empty, which is necessary to generate the |
| -- potentially NULL expectations passed to COALESCE. |
| FROM Digest |
| FULL OUTER JOIN ExpectationsFromCL USING (digest) |
| FULL OUTER JOIN ExpectationsFromPrimary USING (digest) |
| ` |
| |
| qCLID := sql.Qualify(crs, clID) |
| row := s.db.QueryRow(ctx, statement, groupingID, digest, qCLID) |
| var label schema.ExpectationLabel |
| if err := row.Scan(&label); err != nil { |
| if err == pgx.ErrNoRows { |
| return "", skerr.Wrapf(err, "while querying expectations for %x on primary branch or cl %s for grouping %x: query returned 0 rows, which should never happen (this is a bug)", digest, qCLID, groupingID) |
| } |
| return "", skerr.Wrap(err) |
| } |
| return label.ToExpectation(), nil |
| } |
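|
| // For illustration (hypothetical data): if a digest was triaged positive on the CL but
| // negative on the primary branch, the CL's label wins and positive is returned; if it was
| // triaged only on the primary branch, that label is returned; if it appears in neither
| // table, the COALESCE falls through to 'u' (untriaged).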
| |
| // getParamsetsForTracesProducing returns the paramset of the traces on the primary branch which
| // produced the digest at the given grouping. If a valid CRS and CLID are provided, the traces
| // from that CL will also be included.
| func (s *Impl) getParamsetsForTracesProducing(ctx context.Context, groupingID schema.GroupingID, digest schema.DigestBytes, crs, clID string) (paramtools.ParamSet, error) { |
| ctx, span := trace.StartSpan(ctx, "getParamsetsForTracesProducing") |
| defer span.End() |
| |
| const statement = `WITH |
| RecentCommits AS ( |
| SELECT commit_id, tile_id FROM CommitsWithData |
| ORDER BY commit_id DESC LIMIT $1 |
| ), |
| OldestTileInWindow AS ( |
| SELECT tile_id FROM RecentCommits |
| ORDER BY commit_id ASC LIMIT 1 |
| ), |
| TracesThatProducedDigestOnPrimary AS ( |
| SELECT DISTINCT trace_id FROM TiledTraceDigests |
| JOIN OldestTileInWindow ON TiledTraceDigests.tile_id >= OldestTileInWindow.tile_id AND |
| TiledTraceDigests.grouping_id = $2 AND TiledTraceDigests.digest = $3 |
| ), |
| TracesAndOptionsFromPrimary AS ( |
| SELECT TracesThatProducedDigestOnPrimary.trace_id, ValuesAtHead.options_id |
| FROM TracesThatProducedDigestOnPrimary JOIN ValuesAtHead |
| ON TracesThatProducedDigestOnPrimary.trace_id = ValuesAtHead.trace_id |
| ), |
| -- If the crs and clID are empty or do not exist, this will be the empty set. |
| TracesAndOptionsThatProducedDigestOnCL AS ( |
| SELECT DISTINCT secondary_branch_trace_id AS trace_id, options_id FROM SecondaryBranchValues WHERE |
| grouping_id = $2 AND digest = $3 AND branch_name = $4 |
| ) |
| SELECT trace_id, options_id FROM TracesAndOptionsFromPrimary |
| UNION |
| SELECT trace_id, options_id FROM TracesAndOptionsThatProducedDigestOnCL |
| ` |
| rows, err := s.db.Query(ctx, statement, s.windowLength, groupingID, digest, sql.Qualify(crs, clID)) |
| if err != nil { |
| return nil, skerr.Wrap(err) |
| } |
| defer rows.Close() |
| var traces []schema.TraceID |
| var opts []schema.OptionsID |
| |
| for rows.Next() { |
| var trID schema.TraceID |
| var optID schema.OptionsID |
| if err := rows.Scan(&trID, &optID); err != nil { |
| return nil, skerr.Wrap(err) |
| } |
| // trace ids should be unique due to the query we made |
| traces = append(traces, trID) |
| // There are generally few options, so a linear search for duplicates is sufficient and
| // avoids a lot of cache lookups.
| existsAlready := false |
| for _, o := range opts { |
| if bytes.Equal(o, optID) { |
| existsAlready = true |
| break |
| } |
| } |
| if !existsAlready { |
| opts = append(opts, optID) |
| } |
| } |
| |
| paramset, err := s.lookupOrLoadParamSetFromCache(ctx, traces) |
| if err != nil { |
| return nil, skerr.Wrap(err) |
| } |
| for _, o := range opts { |
| ps, err := s.expandOptionsToParams(ctx, o) |
| if err != nil { |
| return nil, skerr.Wrap(err) |
| } |
| paramset.AddParams(ps) |
| } |
| paramset.Normalize() |
| return paramset, nil |
| } |
| |
| // CountDigestsByTest counts only the digests at head that match the given query. |
| func (s *Impl) CountDigestsByTest(ctx context.Context, q frontend.ListTestsQuery) (frontend.ListTestsResponse, error) { |
| ctx, span := trace.StartSpan(ctx, "search2_CountDigestsByTest")
| defer span.End() |
| |
| statement := `WITH |
| CommitsInWindow AS ( |
| SELECT commit_id, tile_id FROM CommitsWithData |
| ORDER BY commit_id DESC LIMIT $1 |
| ), |
| OldestCommitInWindow AS ( |
| SELECT commit_id, tile_id FROM CommitsInWindow |
| ORDER BY commit_id ASC LIMIT 1 |
| ),` |
| digestsStatement, digestsArgs, err := digestCountTracesStatement(q) |
| if err != nil { |
| return frontend.ListTestsResponse{}, skerr.Wrap(err) |
| } |
| statement += digestsStatement |
| statement += `DigestsWithLabels AS ( |
| SELECT Groupings.grouping_id, Groupings.keys AS grouping, label, DigestsOfInterest.digest |
| FROM DigestsOfInterest |
| JOIN Expectations ON DigestsOfInterest.grouping_id = Expectations.grouping_id |
| AND DigestsOfInterest.digest = Expectations.digest |
| JOIN Groupings ON DigestsOfInterest.grouping_id = Groupings.grouping_id |
| ) |
| SELECT encode(grouping_id, 'hex'), grouping, label, COUNT(digest) FROM DigestsWithLabels |
| GROUP BY grouping_id, grouping, label ORDER BY grouping->>'name'` |
| |
| arguments := []interface{}{s.windowLength} |
| arguments = append(arguments, digestsArgs...) |
| sklog.Infof("query: %#v", q) |
| sklog.Infof("statement: %s", statement) |
| sklog.Infof("arguments: %s", arguments) |
| rows, err := s.db.Query(ctx, statement, arguments...) |
| if err != nil { |
| return frontend.ListTestsResponse{}, skerr.Wrap(err) |
| } |
| defer rows.Close() |
| var currentSummary *frontend.TestSummary |
| var currentSummaryGroupingID string |
| var summaries []*frontend.TestSummary |
| for rows.Next() { |
| var groupingID string |
| var grouping paramtools.Params |
| var label schema.ExpectationLabel |
| var count int |
| if err := rows.Scan(&groupingID, &grouping, &label, &count); err != nil { |
| return frontend.ListTestsResponse{}, skerr.Wrap(err) |
| } |
| if currentSummary == nil || currentSummaryGroupingID != groupingID { |
| currentSummary = &frontend.TestSummary{Grouping: grouping} |
| currentSummaryGroupingID = groupingID |
| summaries = append(summaries, currentSummary) |
| } |
| if label == schema.LabelNegative { |
| currentSummary.NegativeDigests = count |
| } else if label == schema.LabelPositive { |
| currentSummary.PositiveDigests = count |
| } else { |
| currentSummary.UntriagedDigests = count |
| } |
| } |
| |
| withTotals := make([]frontend.TestSummary, 0, len(summaries)) |
| for _, s := range summaries { |
| s.TotalDigests = s.UntriagedDigests + s.PositiveDigests + s.NegativeDigests |
| withTotals = append(withTotals, *s) |
| } |
| return frontend.ListTestsResponse{Tests: withTotals}, nil |
| } |
| |
| // digestCountTracesStatement returns a statement and arguments that will return all tests,
| // digests, and their grouping ids. The results will be in a CTE called DigestsOfInterest.
| func digestCountTracesStatement(q frontend.ListTestsQuery) (string, []interface{}, error) { |
| arguments := []interface{}{q.Corpus} |
| statement := `DigestsOfInterest AS ( |
| SELECT DISTINCT keys->>'name' AS test_name, digest, grouping_id FROM ValuesAtHead |
| JOIN OldestCommitInWindow ON ValuesAtHead.most_recent_commit_id >= OldestCommitInWindow.commit_id |
| WHERE corpus = $2` |
| if q.IgnoreState == types.ExcludeIgnoredTraces { |
| statement += ` AND matches_any_ignore_rule = FALSE` |
| } |
| if len(q.TraceValues) > 0 { |
| jObj := map[string]string{} |
| for key, values := range q.TraceValues { |
| if len(values) != 1 { |
| return "", nil, skerr.Fmt("not implemented: we only support one value per key") |
| } |
| jObj[key] = values[0] |
| } |
| statement += ` AND keys @> $3` |
| arguments = append(arguments, jObj) |
| } |
| statement += ")," |
| return statement, arguments, nil |
| } |
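|
| // For illustration (hypothetical query, not from the tests): for q.Corpus = "gm",
| // q.IgnoreState = types.ExcludeIgnoredTraces, and q.TraceValues = {"os": ["Android"]},
| // the generated CTE is roughly:
| //
| // DigestsOfInterest AS (
| // SELECT DISTINCT keys->>'name' AS test_name, digest, grouping_id FROM ValuesAtHead
| // JOIN OldestCommitInWindow ON ValuesAtHead.most_recent_commit_id >= OldestCommitInWindow.commit_id
| // WHERE corpus = $2 AND matches_any_ignore_rule = FALSE AND keys @> $3
| // ),
| //
| // with arguments ["gm", {"os": "Android"}]; the caller prepends the window length as $1.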
| |
| // ComputeGUIStatus implements the API interface. It has special logic for public views vs
| // the normal views to avoid leaking non-public data.
| func (s *Impl) ComputeGUIStatus(ctx context.Context) (frontend.GUIStatus, error) { |
| ctx, span := trace.StartSpan(ctx, "search2_ComputeGUIStatus") |
| defer span.End() |
| |
| var gs frontend.GUIStatus |
| row := s.db.QueryRow(ctx, `SELECT commit_id FROM CommitsWithData |
| ORDER BY CommitsWithData.commit_id DESC LIMIT 1`) |
| if err := row.Scan(&gs.LastCommit.ID); err != nil { |
| return frontend.GUIStatus{}, skerr.Wrap(err) |
| } |
| |
| row = s.db.QueryRow(ctx, `SELECT git_hash, commit_time, author_email, subject |
| FROM GitCommits WHERE GitCommits.commit_id = $1`, gs.LastCommit.ID) |
| var ts time.Time |
| if err := row.Scan(&gs.LastCommit.Hash, &ts, &gs.LastCommit.Author, &gs.LastCommit.Subject); err != nil { |
| // TODO(kjlubick) make this work for MetadataCommits |
| sklog.Infof("Error getting git info for commit_id %s - %s", gs.LastCommit.ID, err) |
| } else { |
| gs.LastCommit.CommitTime = ts.UTC().Unix() |
| } |
| |
| if s.isPublicView { |
| xcs, err := s.getPublicViewCorporaStatuses(ctx) |
| if err != nil { |
| return frontend.GUIStatus{}, skerr.Wrap(err) |
| } |
| gs.CorpStatus = xcs |
| return gs, nil |
| } |
| xcs, err := s.getCorporaStatuses(ctx) |
| if err != nil { |
| return frontend.GUIStatus{}, skerr.Wrap(err) |
| } |
| gs.CorpStatus = xcs |
| return gs, nil |
| } |
| |
| // getCorporaStatuses counts the untriaged digests for all corpora. |
| func (s *Impl) getCorporaStatuses(ctx context.Context) ([]frontend.GUICorpusStatus, error) { |
| ctx, span := trace.StartSpan(ctx, "getCorporaStatuses") |
| defer span.End() |
| const statement = `WITH |
| CommitsInWindow AS ( |
| SELECT commit_id FROM CommitsWithData |
| ORDER BY commit_id DESC LIMIT $1 |
| ), |
| OldestCommitInWindow AS ( |
| SELECT commit_id FROM CommitsInWindow |
| ORDER BY commit_id ASC LIMIT 1 |
| ), |
| DistinctNotIgnoredDigests AS ( |
| SELECT DISTINCT corpus, digest, grouping_id FROM ValuesAtHead |
| JOIN OldestCommitInWindow ON ValuesAtHead.most_recent_commit_id >= OldestCommitInWindow.commit_id |
| WHERE matches_any_ignore_rule = FALSE |
| ), |
| CorporaWithAtLeastOneUntriaged AS (
| SELECT corpus, COUNT(DistinctNotIgnoredDigests.digest) AS num_untriaged FROM DistinctNotIgnoredDigests |
| JOIN Expectations ON DistinctNotIgnoredDigests.grouping_id = Expectations.grouping_id AND |
| DistinctNotIgnoredDigests.digest = Expectations.digest AND label = 'u' |
| GROUP BY corpus |
| ), |
| AllCorpora AS ( |
| -- Corpora with no untriaged digests will not show up in CorporaWithAtLeastOneUntriaged.
| -- We still want to include them in our status, so we do a separate query and union it in. |
| SELECT DISTINCT corpus, 0 AS num_untriaged FROM ValuesAtHead |
| JOIN OldestCommitInWindow ON ValuesAtHead.most_recent_commit_id >= OldestCommitInWindow.commit_id |
| ) |
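| -- Taking max() per corpus selects the true count for corpora that appear in both
| -- branches of the UNION (0 from AllCorpora, N from CorporaWithAtLeastOneUntriaged).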
| SELECT corpus, max(num_untriaged) FROM ( |
| SELECT corpus, num_untriaged FROM AllCorpora |
| UNION |
| SELECT corpus, num_untriaged FROM CorporaWithAtLeastOneUntriaged
| ) AS all_corpora GROUP BY corpus` |
| |
| rows, err := s.db.Query(ctx, statement, s.windowLength) |
| if err != nil { |
| return nil, skerr.Wrap(err) |
| } |
| defer rows.Close() |
| var rv []frontend.GUICorpusStatus |
| for rows.Next() { |
| var cs frontend.GUICorpusStatus |
| if err := rows.Scan(&cs.Name, &cs.UntriagedCount); err != nil { |
| return nil, skerr.Wrap(err) |
| } |
| rv = append(rv, cs) |
| } |
| |
| sort.Slice(rv, func(i, j int) bool { |
| return rv[i].Name < rv[j].Name |
| }) |
| return rv, nil |
| } |
| |
| // getPublicViewCorporaStatuses counts the untriaged digests belonging to only those traces which |
| // match the public view matcher. It filters the traces using the cached publiclyVisibleTraces. |
| func (s *Impl) getPublicViewCorporaStatuses(ctx context.Context) ([]frontend.GUICorpusStatus, error) { |
| ctx, span := trace.StartSpan(ctx, "getPublicViewCorporaStatuses")
| defer span.End() |
| const statement = `WITH |
| CommitsInWindow AS ( |
| SELECT commit_id FROM CommitsWithData |
| ORDER BY commit_id DESC LIMIT $1 |
| ), |
| OldestCommitInWindow AS ( |
| SELECT commit_id FROM CommitsInWindow |
| ORDER BY commit_id ASC LIMIT 1 |
| ), |
| NotIgnoredDigests AS ( |
| SELECT trace_id, corpus, digest, grouping_id FROM ValuesAtHead |
| JOIN OldestCommitInWindow ON ValuesAtHead.most_recent_commit_id >= OldestCommitInWindow.commit_id |
| WHERE matches_any_ignore_rule = FALSE AND corpus = ANY($2) |
| ) |
| SELECT trace_id, corpus FROM NotIgnoredDigests |
| JOIN Expectations ON NotIgnoredDigests.grouping_id = Expectations.grouping_id AND |
| NotIgnoredDigests.digest = Expectations.digest AND label = 'u' |
| ` |
| |
| s.mutex.RLock() |
| defer s.mutex.RUnlock() |
| corpusCount := map[string]int{} |
| var corporaArgs []string |
| for corpus := range s.publiclyVisibleCorpora { |
| corpusCount[corpus] = 0 // make sure we include all corpora, even those with 0 untriaged. |
| corporaArgs = append(corporaArgs, corpus) |
| } |
| |
| rows, err := s.db.Query(ctx, statement, s.windowLength, corporaArgs) |
| if err != nil { |
| return nil, skerr.Wrap(err) |
| } |
| defer rows.Close() |
| |
| var traceKey schema.MD5Hash |
| for rows.Next() { |
| var tr schema.TraceID |
| var corpus string |
| if err := rows.Scan(&tr, &corpus); err != nil { |
| return nil, skerr.Wrap(err) |
| } |
| copy(traceKey[:], tr) |
| if _, ok := s.publiclyVisibleTraces[traceKey]; ok { |
| corpusCount[corpus]++ |
| } |
| } |
| |
| var rv []frontend.GUICorpusStatus |
| for corpus, count := range corpusCount { |
| rv = append(rv, frontend.GUICorpusStatus{ |
| Name: corpus, |
| UntriagedCount: count, |
| }) |
| } |
| |
| sort.Slice(rv, func(i, j int) bool { |
| return rv[i].Name < rv[j].Name |
| }) |
| |
| return rv, nil |
| } |
| |
| type digestCountAndLastSeen struct { |
| digest types.Digest |
| // count is how many times a digest has been seen in a TraceGroup. |
| count int |
| // lastSeenIndex refers to the commit index at which this digest was most recently seen. That
| // is, a higher number means it was seen more recently. This digest might have been seen much
| // earlier than this index, but only the latest occurrence affects this value.
| lastSeenIndex int |
| } |
| |
| const ( |
| // maxDistinctDigestsToPresent is the maximum number of digests we want to show |
| // in a dotted line of traces. We assume that showing more digests yields |
| // no additional information, because the trace is likely to be flaky. |
| maxDistinctDigestsToPresent = 9 |
| |
| // 0 is always the primary digest, no matter where (or if) it appears in the trace. |
| primaryDigestIndex = 0 |
| |
| // The frontend knows to handle -1 specially and show no dot. |
| missingDigestIndex = -1 |
| |
| mostRecentNDigests = 3 |
| ) |
| |
| // computeDigestIndices assigns distinct digests an index (up to maxDistinctDigestsToPresent).
| // This index maps to a color of dot on the frontend when representing traces. The indices are
| // assigned to some of the most recent digests and some of the most common digests. All digests
| // not in this map will be grouped under the highest index (represented by a grey color on the
| // frontend). This hybrid approach was adopted in an effort to minimize the "interesting"
| // digests that are globbed together under the grey color, which is harder to inspect from the
| // frontend. See skbug.com/10387 for more context.
| func computeDigestIndices(traceGroup *frontend.TraceGroup, primary types.Digest) (map[types.Digest]int, int) { |
| // digestStats is a slice that has one entry per unique digest. This could be a map, but
| // we are going to sort it later, so it's cleaner to just use a slice initially, especially
| // when the vast majority (99.9% of Skia's data) of our traces have fewer than 30 unique
| // digests. The worst case would be a few hundred unique digests, for which O(n) lookup isn't
| // terrible.
| digestStats := make([]digestCountAndLastSeen, 0, 5) |
| // Populate digestStats, iterating over the digests from all traces from oldest to newest. |
| // By construction, all traces in the TraceGroup will have the same length. |
| traceLength := len(traceGroup.Traces[0].RawTrace.Digests) |
| sawPrimary := false |
| for idx := 0; idx < traceLength; idx++ { |
| for _, tr := range traceGroup.Traces { |
| digest := tr.RawTrace.Digests[idx] |
| // Don't bother counting up data for missing digests. |
| if digest == tiling.MissingDigest { |
| continue |
| } |
| if digest == primary { |
| sawPrimary = true |
| } |
| // Go look up the entry for this digest. The sentinel value -1 will tell us if we haven't |
| // seen one and need to add one. |
| dsIdxToUpdate := -1 |
| for i, ds := range digestStats { |
| if ds.digest == digest { |
| dsIdxToUpdate = i |
| break |
| } |
| } |
| if dsIdxToUpdate == -1 { |
| dsIdxToUpdate = len(digestStats) |
| digestStats = append(digestStats, digestCountAndLastSeen{ |
| digest: digest, |
| }) |
| } |
| digestStats[dsIdxToUpdate].count++ |
| digestStats[dsIdxToUpdate].lastSeenIndex = idx |
| } |
| } |
| |
| // Sort in order of highest last seen index, with tiebreaks being higher count and then |
| // lexicographically by digest. |
| sort.Slice(digestStats, func(i, j int) bool { |
| statsA, statsB := digestStats[i], digestStats[j] |
| if statsA.lastSeenIndex != statsB.lastSeenIndex { |
| return statsA.lastSeenIndex > statsB.lastSeenIndex |
| } |
| if statsA.count != statsB.count { |
| return statsA.count > statsB.count |
| } |
| return statsA.digest < statsB.digest |
| }) |
| |
| // Assign the primary digest the primaryDigestIndex. |
| digestIndices := make(map[types.Digest]int, maxDistinctDigestsToPresent) |
| digestIndices[primary] = primaryDigestIndex |
| // Go through the slice until we have either added the n most recent digests or have run out |
| // of unique digests. We are careful not to add a digest we've already added (e.g. the primary |
| // digest). We start with the most recent digests to preserve a little bit of backwards |
| // compatibility with the assigned colors (e.g. developers are used to green and orange being the |
| // more recent digests). |
| digestIndex := 1 |
| for i := 0; i < len(digestStats) && len(digestIndices) < 1+mostRecentNDigests; i++ { |
| ds := digestStats[i] |
| if _, ok := digestIndices[ds.digest]; ok { |
| continue |
| } |
| digestIndices[ds.digest] = digestIndex |
| digestIndex++ |
| } |
| |
| // Re-sort the slice in order of highest count, with tiebreaks being a higher last seen index |
| // and then lexicographically by digest. |
| sort.Slice(digestStats, func(i, j int) bool { |
| statsA, statsB := digestStats[i], digestStats[j] |
| if statsA.count != statsB.count { |
| return statsA.count > statsB.count |
| } |
| if statsA.lastSeenIndex != statsB.lastSeenIndex { |
| return statsA.lastSeenIndex > statsB.lastSeenIndex |
| } |
| return statsA.digest < statsB.digest |
| }) |
| |
| // Assign the rest of the indices in order of most common digests. |
| for i := 0; i < len(digestStats) && len(digestIndices) < maxDistinctDigestsToPresent; i++ { |
| ds := digestStats[i] |
| if _, ok := digestIndices[ds.digest]; ok { |
| continue |
| } |
| digestIndices[ds.digest] = digestIndex |
| digestIndex++ |
| } |
| totalDigests := len(digestStats) |
| if !sawPrimary { |
| totalDigests++ |
| } |
| return digestIndices, totalDigests |
| } |
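|
| // For illustration (hypothetical digests, not from the tests): with primary digest "P" and a
| // single trace whose history is "D C B A A" (oldest to newest), the assigned indices are
| // P:0 (the primary), then A:1, B:2, C:3 (the three most recent), then D:4 (filled in by
| // count), and the returned total is 5 because "P" never appeared in the trace.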
| |
| // Make sure Impl implements the API interface. |
| var _ API = (*Impl)(nil) |