// Package search2 encapsulates various queries we make against Gold's data. It is backed
// by the SQL database and aims to replace the current search package.
package search2
import (
"bytes"
"context"
"encoding/hex"
"fmt"
"sort"
"sync"
"time"
lru "github.com/hashicorp/golang-lru"
"github.com/jackc/pgtype"
"github.com/jackc/pgx/v4"
"github.com/jackc/pgx/v4/pgxpool"
"go.opencensus.io/trace"
"golang.org/x/sync/errgroup"
"go.skia.org/infra/go/paramtools"
"go.skia.org/infra/go/skerr"
"go.skia.org/infra/go/sklog"
"go.skia.org/infra/go/util"
"go.skia.org/infra/golden/go/expectations"
"go.skia.org/infra/golden/go/search"
"go.skia.org/infra/golden/go/search/common"
"go.skia.org/infra/golden/go/search/frontend"
"go.skia.org/infra/golden/go/search/query"
"go.skia.org/infra/golden/go/sql"
"go.skia.org/infra/golden/go/sql/schema"
"go.skia.org/infra/golden/go/tiling"
"go.skia.org/infra/golden/go/types"
web_frontend "go.skia.org/infra/golden/go/web/frontend"
)
type API interface {
// NewAndUntriagedSummaryForCL returns a summarized look at the new digests produced by a CL
// (that is, digests not currently on the primary branch for this grouping at all) as well as
// how many of the newly produced digests are currently untriaged.
NewAndUntriagedSummaryForCL(ctx context.Context, qCLID string) (NewAndUntriagedSummary, error)
// ChangelistLastUpdated returns the timestamp that the given CL was updated. It returns an
// error if the CL does not exist.
ChangelistLastUpdated(ctx context.Context, qCLID string) (time.Time, error)
// Search queries the current tile based on the parameters specified in
// the given *query.Search instance.
Search(context.Context, *query.Search) (*frontend.SearchResponse, error)
}
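// A minimal sketch of how a caller might wire this up (the window size, refresh cadence, and
// CL id below are illustrative; the real setup lives in the frontend server code):
//
//	api := search2.New(db, 100)
//	if err := api.StartCacheProcess(ctx, 5*time.Minute, 100); err != nil {
//		sklog.Fatalf("starting cache process: %s", err)
//	}
//	sum, err := api.NewAndUntriagedSummaryForCL(ctx, "gerrit_12345")
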
// NewAndUntriagedSummary is a summary of the results associated with a given CL. It focuses on
// the untriaged and new images produced.
type NewAndUntriagedSummary struct {
// ChangelistID is the nonqualified id of the CL.
ChangelistID string
// PatchsetSummaries is a summary for all Patchsets for which we have data.
PatchsetSummaries []PatchsetNewAndUntriagedSummary
// LastUpdated is the timestamp of the CL, which corresponds to the last datapoint for
// this CL.
LastUpdated time.Time
// Outdated is set to true if the value that was previously cached was out of date and is
// currently being recalculated. We do this to return something quickly to the user (even if
// some of the counts are slightly stale) rather than blocking until the fresh data is ready.
Outdated bool
}
// PatchsetNewAndUntriagedSummary is the summary for a specific PS. It focuses on the untriaged
// and new images produced.
type PatchsetNewAndUntriagedSummary struct {
// NewImages is the number of new images (digests) that were produced by this patchset by
// non-ignored traces and not seen on the primary branch.
NewImages int
// NewUntriagedImages is the number of NewImages which are still untriaged. It is less than or
// equal to NewImages.
NewUntriagedImages int
// TotalUntriagedImages is the number of images produced by this patchset by non-ignored traces
// that are untriaged. This includes images that are untriaged and observed on the primary
// branch (i.e. might not be the fault of this CL/PS). It is greater than or equal to
// NewUntriagedImages.
TotalUntriagedImages int
// PatchsetID is the nonqualified id of the patchset. This is usually a git hash.
PatchsetID string
// PatchsetOrder represents the chronological order the patchsets are in. It starts at 1.
PatchsetOrder int
}
const (
commitCacheSize = 5_000
optionsGroupingCacheSize = 50_000
traceCacheSize = 1_000_000
)
type Impl struct {
db *pgxpool.Pool
windowLength int
// Protects the caches
mutex sync.RWMutex
// This caches the digests seen per grouping on the primary branch.
digestsOnPrimary map[groupingDigestKey]struct{}
commitCache *lru.Cache
optionsGroupingCache *lru.Cache
traceCache *lru.Cache
}
// New returns an implementation of API.
func New(sqlDB *pgxpool.Pool, windowLength int) *Impl {
cc, err := lru.New(commitCacheSize)
if err != nil {
panic(err) // should only happen if commitCacheSize is negative.
}
gc, err := lru.New(optionsGroupingCacheSize)
if err != nil {
panic(err) // should only happen if optionsGroupingCacheSize is negative.
}
tc, err := lru.New(traceCacheSize)
if err != nil {
panic(err) // should only happen if traceCacheSize is negative.
}
return &Impl{
db: sqlDB,
windowLength: windowLength,
digestsOnPrimary: map[groupingDigestKey]struct{}{},
commitCache: cc,
optionsGroupingCache: gc,
traceCache: tc,
}
}
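// groupingDigestKey uses fixed-size MD5 hashes rather than byte slices so that the struct is
// comparable and can be used directly as a map key without extra allocations.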
type groupingDigestKey struct {
groupingID schema.MD5Hash
digest schema.MD5Hash
}
// StartCacheProcess loads the caches used for searching and starts a goroutine to keep those
// up to date.
func (s *Impl) StartCacheProcess(ctx context.Context, interval time.Duration, commitsWithData int) error {
if err := s.updateCaches(ctx, commitsWithData); err != nil {
return skerr.Wrapf(err, "setting up initial cache values")
}
go util.RepeatCtx(ctx, interval, func(ctx context.Context) {
err := s.updateCaches(ctx, commitsWithData)
if err != nil {
sklog.Errorf("Could not update caches: %s", err)
}
})
return nil
}
// updateCaches loads the digestsOnPrimary cache.
func (s *Impl) updateCaches(ctx context.Context, commitsWithData int) error {
ctx, span := trace.StartSpan(ctx, "search2_UpdateCaches", trace.WithSampler(trace.AlwaysSample()))
defer span.End()
tile, err := s.getStartingTile(ctx, commitsWithData)
if err != nil {
return skerr.Wrapf(err, "geting tile to search")
}
onPrimary, err := s.getDigestsOnPrimary(ctx, tile)
if err != nil {
return skerr.Wrapf(err, "getting digests on primary branch")
}
s.mutex.Lock()
s.digestsOnPrimary = onPrimary
s.mutex.Unlock()
sklog.Infof("Digests on Primary cache refreshed with %d entries", len(onPrimary))
return nil
}
// getStartingTile returns the ID of the tile at the beginning of the window of interest (so we
// get enough data to do our comparisons).
func (s *Impl) getStartingTile(ctx context.Context, commitsWithDataToSearch int) (schema.TileID, error) {
ctx, span := trace.StartSpan(ctx, "getStartingTile")
defer span.End()
if commitsWithDataToSearch <= 0 {
return 0, nil
}
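// The AS OF SYSTEM TIME clause reads slightly stale data (a CockroachDB feature), which keeps
// this periodic query from contending with concurrent writes.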
row := s.db.QueryRow(ctx, `SELECT tile_id FROM CommitsWithData
AS OF SYSTEM TIME '-0.1s'
ORDER BY commit_id DESC
LIMIT 1 OFFSET $1`, commitsWithDataToSearch)
var lc pgtype.Int4
if err := row.Scan(&lc); err != nil {
if err == pgx.ErrNoRows {
return 0, nil // not enough commits seen, so start at tile 0.
}
return 0, skerr.Wrapf(err, "getting latest commit")
}
if lc.Status == pgtype.Null {
// There are no commits with data, so start at tile 0.
return 0, nil
}
return schema.TileID(lc.Int), nil
}
// getDigestsOnPrimary returns a map of all distinct digests on the primary branch.
func (s *Impl) getDigestsOnPrimary(ctx context.Context, tile schema.TileID) (map[groupingDigestKey]struct{}, error) {
ctx, span := trace.StartSpan(ctx, "getDigestsOnPrimary")
defer span.End()
rows, err := s.db.Query(ctx, `
SELECT DISTINCT grouping_id, digest FROM
TiledTraceDigests WHERE tile_id >= $1`, tile)
if err != nil {
if err == pgx.ErrNoRows {
return map[groupingDigestKey]struct{}{}, nil
}
return nil, skerr.Wrap(err)
}
defer rows.Close()
rv := map[groupingDigestKey]struct{}{}
var digest schema.DigestBytes
var grouping schema.GroupingID
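// key is reused across loop iterations: keyGrouping and keyDigest below alias the arrays inside
// key, so copying the scanned bytes into them rewrites key in place; the map insert then stores
// key by value.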
var key groupingDigestKey
keyGrouping := key.groupingID[:]
keyDigest := key.digest[:]
for rows.Next() {
err := rows.Scan(&grouping, &digest)
if err != nil {
return nil, skerr.Wrap(err)
}
copy(keyGrouping, grouping)
copy(keyDigest, digest)
rv[key] = struct{}{}
}
return rv, nil
}
// NewAndUntriagedSummaryForCL queries all the patchsets in parallel (to keep the query less
// complex). If there are no patchsets for the provided CL, it returns an error.
func (s *Impl) NewAndUntriagedSummaryForCL(ctx context.Context, qCLID string) (NewAndUntriagedSummary, error) {
ctx, span := trace.StartSpan(ctx, "search2_NewAndUntriagedSummaryForCL")
defer span.End()
patchsets, err := s.getPatchsets(ctx, qCLID)
if err != nil {
return NewAndUntriagedSummary{}, skerr.Wrap(err)
}
if len(patchsets) == 0 {
return NewAndUntriagedSummary{}, skerr.Fmt("CL %q not found", qCLID)
}
eg, ctx := errgroup.WithContext(ctx)
rv := make([]PatchsetNewAndUntriagedSummary, len(patchsets))
for i, p := range patchsets {
idx, ps := i, p
eg.Go(func() error {
sum, err := s.getSummaryForPS(ctx, qCLID, ps.id)
if err != nil {
return skerr.Wrap(err)
}
sum.PatchsetID = sql.Unqualify(ps.id)
sum.PatchsetOrder = ps.order
rv[idx] = sum
return nil
})
}
var updatedTS time.Time
eg.Go(func() error {
var err error
updatedTS, err = s.ChangelistLastUpdated(ctx, qCLID)
return skerr.Wrap(err)
})
if err := eg.Wait(); err != nil {
return NewAndUntriagedSummary{}, skerr.Wrapf(err, "Getting counts for CL %q and %d PS", qCLID, len(patchsets))
}
return NewAndUntriagedSummary{
ChangelistID: sql.Unqualify(qCLID),
PatchsetSummaries: rv,
LastUpdated: updatedTS.UTC(),
}, nil
}
type psIDAndOrder struct {
id string
order int
}
// getPatchsets returns the qualified ids and orders of the patchsets sorted by ps_order.
func (s *Impl) getPatchsets(ctx context.Context, qualifiedID string) ([]psIDAndOrder, error) {
ctx, span := trace.StartSpan(ctx, "getPatchsets")
defer span.End()
rows, err := s.db.Query(ctx, `SELECT patchset_id, ps_order
FROM Patchsets WHERE changelist_id = $1 ORDER BY ps_order ASC`, qualifiedID)
if err != nil {
return nil, skerr.Wrapf(err, "getting summary for cl %q", qualifiedID)
}
defer rows.Close()
var rv []psIDAndOrder
for rows.Next() {
var row psIDAndOrder
if err := rows.Scan(&row.id, &row.order); err != nil {
return nil, skerr.Wrap(err)
}
rv = append(rv, row)
}
return rv, nil
}
// getSummaryForPS looks at all the data produced for a given PS and returns a summary of the
// newly produced digests and untriaged digests.
func (s *Impl) getSummaryForPS(ctx context.Context, clid, psID string) (PatchsetNewAndUntriagedSummary, error) {
ctx, span := trace.StartSpan(ctx, "getSummaryForPS")
defer span.End()
const statement = `
WITH
CLDigests AS (
SELECT secondary_branch_trace_id, digest, grouping_id
FROM SecondaryBranchValues
WHERE branch_name = $1 and version_name = $2
),
NonIgnoredCLDigests AS (
-- We only want to count a digest once per grouping, no matter how many times it shows up,
-- because the frontend UI groups those together (by trace).
SELECT DISTINCT digest, CLDigests.grouping_id
FROM CLDigests
JOIN Traces
ON secondary_branch_trace_id = trace_id
WHERE Traces.matches_any_ignore_rule = False
),
CLExpectations AS (
SELECT grouping_id, digest, label
FROM SecondaryBranchExpectations
WHERE branch_name = $1
),
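-- A CL-specific expectation takes precedence over the primary-branch expectation; digests
-- with neither are treated as untriaged ('u').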
LabeledDigests AS (
SELECT NonIgnoredCLDigests.grouping_id, NonIgnoredCLDigests.digest, COALESCE(CLExpectations.label, COALESCE(Expectations.label, 'u')) as label
FROM NonIgnoredCLDigests
LEFT JOIN Expectations
ON NonIgnoredCLDigests.grouping_id = Expectations.grouping_id AND
NonIgnoredCLDigests.digest = Expectations.digest
LEFT JOIN CLExpectations
ON NonIgnoredCLDigests.grouping_id = CLExpectations.grouping_id AND
NonIgnoredCLDigests.digest = CLExpectations.digest
)
SELECT * FROM LabeledDigests;`
rows, err := s.db.Query(ctx, statement, clid, psID)
if err != nil {
return PatchsetNewAndUntriagedSummary{}, skerr.Wrapf(err, "getting summary for ps %q in cl %q", psID, clid)
}
defer rows.Close()
s.mutex.RLock()
defer s.mutex.RUnlock()
var digest schema.DigestBytes
var grouping schema.GroupingID
var label schema.ExpectationLabel
var key groupingDigestKey
keyGrouping := key.groupingID[:]
keyDigest := key.digest[:]
var rv PatchsetNewAndUntriagedSummary
for rows.Next() {
if err := rows.Scan(&grouping, &digest, &label); err != nil {
return PatchsetNewAndUntriagedSummary{}, skerr.Wrap(err)
}
copy(keyGrouping, grouping)
copy(keyDigest, digest)
_, isExisting := s.digestsOnPrimary[key]
if !isExisting {
rv.NewImages++
}
if label == schema.LabelUntriaged {
rv.TotalUntriagedImages++
if !isExisting {
rv.NewUntriagedImages++
}
}
}
return rv, nil
}
// ChangelistLastUpdated implements the API interface.
func (s *Impl) ChangelistLastUpdated(ctx context.Context, qCLID string) (time.Time, error) {
ctx, span := trace.StartSpan(ctx, "search2_ChangelistLastUpdated")
defer span.End()
var updatedTS time.Time
row := s.db.QueryRow(ctx, `WITH
LastSeenData AS (
SELECT last_ingested_data as ts FROM Changelists
WHERE changelist_id = $1
),
LatestTriageAction AS (
SELECT triage_time as ts FROM ExpectationRecords
WHERE branch_name = $1
ORDER BY triage_time DESC LIMIT 1
)
SELECT ts FROM LastSeenData
UNION
SELECT ts FROM LatestTriageAction
ORDER BY ts DESC LIMIT 1
`, qCLID)
if err := row.Scan(&updatedTS); err != nil {
if err == pgx.ErrNoRows {
return time.Time{}, nil
}
return time.Time{}, skerr.Wrapf(err, "Getting last updated ts for cl %q", qCLID)
}
return updatedTS.UTC(), nil
}
// Search implements the SearchAPI interface.
func (s *Impl) Search(ctx context.Context, q *query.Search) (*frontend.SearchResponse, error) {
ctx, span := trace.StartSpan(ctx, "search2_Search")
defer span.End()
ctx = context.WithValue(ctx, queryKey, *q)
ctx, err := s.addCommitsData(ctx)
if err != nil {
return nil, skerr.Wrap(err)
}
commits, err := s.getCommits(ctx)
if err != nil {
return nil, skerr.Wrap(err)
}
// Find all digests and traces that match the given search criteria.
traceDigests, err := s.getMatchingDigestsAndTraces(ctx)
if err != nil {
return nil, skerr.Wrap(err)
}
// Lookup the closest diffs to the given digests. This returns a subset according to the
// limit and offset in the query.
closestDiffs, allClosestLabels, err := s.getClosestDiffs(ctx, traceDigests)
if err != nil {
return nil, skerr.Wrap(err)
}
// Go fetch history and paramset (within this grouping)
paramsetsByDigest, err := s.getParamsetsForDigests(ctx, closestDiffs)
if err != nil {
return nil, skerr.Wrap(err)
}
// Flesh out the trace history with enough data to draw the dots diagram on the frontend.
results, err := s.fillOutTraceHistory(ctx, closestDiffs)
if err != nil {
return nil, skerr.Wrap(err)
}
// Create the mapping that allows us to bulk triage all results (not just the ones shown).
bulkTriageData, err := s.convertBulkTriageData(ctx, allClosestLabels)
if err != nil {
return nil, skerr.Wrap(err)
}
// Fill in the paramsets of the reference images.
for _, sr := range results {
for _, srdd := range sr.RefDiffs {
if srdd != nil {
srdd.ParamSet = paramsetsByDigest[srdd.Digest]
}
}
}
return &frontend.SearchResponse{
Results: results,
Offset: q.Offset,
Size: len(allClosestLabels),
BulkTriageData: bulkTriageData,
Commits: commits,
}, nil
}
// To avoid piping a lot of info about the commits in the most recent window through all the
// functions in the search pipeline, we attach them as values to the context.
type searchContextKey string
const (
actualWindowLengthKey = searchContextKey("actualWindowLengthKey")
commitToIdxKey = searchContextKey("commitToIdxKey")
firstCommitIDKey = searchContextKey("firstCommitIDKey")
firstTileIDKey = searchContextKey("firstTileIDKey")
queryKey = searchContextKey("query")
)
func getFirstCommitID(ctx context.Context) schema.CommitID {
return ctx.Value(firstCommitIDKey).(schema.CommitID)
}
func getFirstTileID(ctx context.Context) schema.TileID {
return ctx.Value(firstTileIDKey).(schema.TileID)
}
func getCommitToIdxMap(ctx context.Context) map[schema.CommitID]int {
return ctx.Value(commitToIdxKey).(map[schema.CommitID]int)
}
func getActualWindowLength(ctx context.Context) int {
return ctx.Value(actualWindowLengthKey).(int)
}
func getQuery(ctx context.Context) query.Search {
return ctx.Value(queryKey).(query.Search)
}
// addCommitsData finds the current sliding window of data (the last N commits), adds the
// derived data to the given context, and returns that context.
func (s *Impl) addCommitsData(ctx context.Context) (context.Context, error) {
// Note: we use a separate variable for the span's context so the span data is not attached to
// the context we return.
sCtx, span := trace.StartSpan(ctx, "addCommitsData")
defer span.End()
const statement = `SELECT commit_id, tile_id FROM
CommitsWithData ORDER BY commit_id DESC LIMIT $1`
rows, err := s.db.Query(sCtx, statement, s.windowLength)
if err != nil {
return nil, skerr.Wrap(err)
}
defer rows.Close()
ids := make([]schema.CommitID, 0, s.windowLength)
var firstObservedTile schema.TileID
for rows.Next() {
var id schema.CommitID
if err := rows.Scan(&id, &firstObservedTile); err != nil {
return nil, skerr.Wrap(err)
}
ids = append(ids, id)
}
if len(ids) == 0 {
return nil, skerr.Fmt("No commits with data")
}
// ids is ordered from most recent commit to oldest commit at this point
ctx = context.WithValue(ctx, actualWindowLengthKey, len(ids))
ctx = context.WithValue(ctx, firstCommitIDKey, ids[len(ids)-1])
ctx = context.WithValue(ctx, firstTileIDKey, firstObservedTile)
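// Map each commit id to its index within the window, with the oldest commit at index 0.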
idToIndex := map[schema.CommitID]int{}
idx := 0
for i := len(ids) - 1; i >= 0; i-- {
idToIndex[ids[i]] = idx
idx++
}
ctx = context.WithValue(ctx, commitToIdxKey, idToIndex)
return ctx, nil
}
type stageOneResult struct {
traceID schema.TraceID
groupingID schema.GroupingID
digest schema.DigestBytes
}
// getMatchingDigestsAndTraces returns the tuples of digest+traceID that match the given query.
// The order of the result is arbitrary.
func (s *Impl) getMatchingDigestsAndTraces(ctx context.Context) ([]stageOneResult, error) {
ctx, span := trace.StartSpan(ctx, "getMatchingDigestsAndTraces")
defer span.End()
statement := `WITH
MatchingDigests AS (
SELECT grouping_id, digest FROM Expectations
WHERE label = ANY($1)
),`
tracesBlock, args := matchingTracesStatement(ctx)
statement += tracesBlock
statement += `
SELECT trace_id, MatchingDigests.grouping_id, MatchingTraces.digest FROM
MatchingDigests
JOIN
MatchingTraces ON MatchingDigests.grouping_id = MatchingTraces.grouping_id AND
MatchingDigests.digest = MatchingTraces.digest`
q := getQuery(ctx)
var triageStatuses []schema.ExpectationLabel
if q.IncludeUntriagedDigests {
triageStatuses = append(triageStatuses, schema.LabelUntriaged)
}
if q.IncludeNegativeDigests {
triageStatuses = append(triageStatuses, schema.LabelNegative)
}
if q.IncludePositiveDigests {
triageStatuses = append(triageStatuses, schema.LabelPositive)
}
arguments := append([]interface{}{triageStatuses}, args...)
rows, err := s.db.Query(ctx, statement, arguments...)
if err != nil {
return nil, skerr.Wrapf(err, "searching for query %v with args %v", q, arguments)
}
defer rows.Close()
var rv []stageOneResult
for rows.Next() {
var row stageOneResult
if err := rows.Scan(&row.traceID, &row.groupingID, &row.digest); err != nil {
return nil, skerr.Wrap(err)
}
rv = append(rv, row)
}
return rv, nil
}
type filterSets struct {
key string
values []string
}
// matchingTracesStatement returns a SQL snippet that includes a WITH table called MatchingTraces.
// This table will have rows containing trace_id, grouping_id, and digest of traces that match
// the given search criteria. The second return value is the list of arguments that need to be
// included in the query. This code knows to start using numbered parameters at 2.
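// Concretely, the returned args bind as $2 (first commit id in the window), $3 (allowed ignore
// statuses), and $4 (corpus); $1 is reserved for the triage statuses supplied by the caller.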
func matchingTracesStatement(ctx context.Context) (string, []interface{}) {
var keyFilters []filterSets
q := getQuery(ctx)
for key, values := range q.TraceValues {
if key == types.CorpusField {
continue
}
if key != sql.Sanitize(key) {
sklog.Infof("key %q did not pass sanitization", key)
continue
}
keyFilters = append(keyFilters, filterSets{key: key, values: values})
}
ignoreStatuses := []bool{false}
if q.IncludeIgnoredTraces {
ignoreStatuses = append(ignoreStatuses, true)
}
args := []interface{}{getFirstCommitID(ctx), ignoreStatuses}
if q.OnlyIncludeDigestsProducedAtHead {
if len(keyFilters) == 0 {
// Corpus is being used as a string
args = append(args, q.TraceValues[types.CorpusField][0])
return `
MatchingTraces AS (
SELECT trace_id, grouping_id, digest FROM ValuesAtHead
WHERE most_recent_commit_id >= $2 AND
matches_any_ignore_rule = ANY($3) AND
corpus = $4
)`, args
}
// Corpus is being used as a JSONB value here
args = append(args, `"`+q.TraceValues[types.CorpusField][0]+`"`)
return joinedTracesStatement(keyFilters) + `
MatchingTraces AS (
SELECT ValuesAtHead.trace_id, grouping_id, digest FROM ValuesAtHead
JOIN JoinedTraces ON ValuesAtHead.trace_id = JoinedTraces.trace_id
WHERE most_recent_commit_id >= $2 AND
matches_any_ignore_rule = ANY($3)
)`, args
} else {
if len(keyFilters) == 0 {
// Corpus is being used as a string
args = append(args, q.TraceValues[types.CorpusField][0])
return `
TracesOfInterest AS (
SELECT trace_id FROM ValuesAtHead
WHERE matches_any_ignore_rule = ANY($3) AND
corpus = $4
),
MatchingTraces AS (
SELECT DISTINCT TraceValues.trace_id, TraceValues.grouping_id, TraceValues.digest
FROM TraceValues
JOIN TracesOfInterest on TraceValues.trace_id = TracesOfInterest.trace_id
WHERE commit_id >= $2
)
`, args
}
// Corpus is being used as a JSONB value here
args = append(args, `"`+q.TraceValues[types.CorpusField][0]+`"`)
return joinedTracesStatement(keyFilters) + `
TracesOfInterest AS (
SELECT Traces.trace_id FROM Traces
JOIN JoinedTraces ON Traces.trace_id = JoinedTraces.trace_id
WHERE matches_any_ignore_rule = ANY($3)
),
MatchingTraces AS (
SELECT DISTINCT TraceValues.trace_id, TraceValues.grouping_id, TraceValues.digest
FROM TraceValues
JOIN TracesOfInterest on TraceValues.trace_id = TracesOfInterest.trace_id
WHERE commit_id >= $2
)`, args
}
}
// joinedTracesStatement returns a SQL snippet that includes a WITH table called JoinedTraces.
// This table contains just the trace_ids that match the given filters. filters is expected to
// have keys which passed sanitization (it will sanitize the values). The snippet will include
// other tables that will be unioned and intersected to create the appropriate rows. This is
// similar to the technique we use for ignore rules, chosen to maximize consistent performance
// by using the inverted key index. The keys and values are hardcoded into the string instead
// of being passed in as arguments because kjlubick@ was not able to use the placeholder values
// to compare JSONB types extracted from a JSONB object to a string while still using the indexes.
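// For example, a single filter {key: "os", values: ["Android", "iOS"]} yields (roughly):
//
//	U0 AS (
//		SELECT trace_id FROM Traces WHERE keys -> 'os' = '"Android"'
//		UNION
//		SELECT trace_id FROM Traces WHERE keys -> 'os' = '"iOS"'
//	),
//	JoinedTraces AS (
//		SELECT trace_id FROM U0
//		INTERSECT
//		SELECT trace_id FROM Traces where keys -> 'source_type' = $4
//	),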
func joinedTracesStatement(filters []filterSets) string {
statement := ""
for i, filter := range filters {
statement += fmt.Sprintf("U%d AS (\n", i)
for j, value := range filter.values {
if j != 0 {
statement += "\tUNION\n"
}
statement += fmt.Sprintf("\tSELECT trace_id FROM Traces WHERE keys -> '%s' = '%q'\n", filter.key, sql.Sanitize(value))
}
statement += "),\n"
}
statement += "JoinedTraces AS (\n"
for i := range filters {
statement += fmt.Sprintf("\tSELECT trace_id FROM U%d\n\tINTERSECT\n", i)
}
// Include a final intersect for the corpus. The calling logic will make sure a JSONB value
// (i.e. a quoted string) is in the arguments slice.
statement += "\tSELECT trace_id FROM Traces where keys -> 'source_type' = $4\n),\n"
return statement
}
type stageTwoResult struct {
leftDigest schema.DigestBytes
groupingID schema.GroupingID
rightDigests []schema.DigestBytes
traceIDs []schema.TraceID
closestDigest *frontend.SRDiffDigest // These won't have ParamSets yet
closestPositive *frontend.SRDiffDigest
closestNegative *frontend.SRDiffDigest
}
// getClosestDiffs returns information about the closest triaged digests for each result in the
// input. We are able to batch the queries by grouping and do so for better performance.
// While this returns a subset of data as defined by the query, it also returns sufficient
// information to bulk-triage all of the inputs.
func (s *Impl) getClosestDiffs(ctx context.Context, inputs []stageOneResult) ([]stageTwoResult, map[groupingDigestKey]expectations.Label, error) {
ctx, span := trace.StartSpan(ctx, "getClosestDiffs")
defer span.End()
byGrouping := map[schema.MD5Hash][]stageOneResult{}
byDigest := map[schema.MD5Hash]stageTwoResult{}
for _, input := range inputs {
gID := sql.AsMD5Hash(input.groupingID)
byGrouping[gID] = append(byGrouping[gID], input)
dID := sql.AsMD5Hash(input.digest)
bd := byDigest[dID]
bd.leftDigest = input.digest
bd.groupingID = input.groupingID
bd.traceIDs = append(bd.traceIDs, input.traceID)
byDigest[dID] = bd
}
for groupingID, inputs := range byGrouping {
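// DISTINCT ON (left_digest, label) keeps only the first row for each left_digest + label pair,
// i.e. the closest positive and closest negative digest per the ORDER BY below.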
const statement = `WITH
ObservedDigestsInTile AS (
SELECT digest FROM TiledTraceDigests
WHERE grouping_id = $2 and tile_id >= $3
),
PositiveOrNegativeDigests AS (
SELECT digest, label FROM Expectations
WHERE grouping_id = $2 AND (label = 'n' OR label = 'p')
),
ComparisonBetweenUntriagedAndObserved AS (
SELECT DiffMetrics.* FROM DiffMetrics
JOIN ObservedDigestsInTile ON DiffMetrics.right_digest = ObservedDigestsInTile.digest
WHERE left_digest = ANY($1)
)
-- This will return the right_digest with the smallest combined_metric for each left_digest + label
SELECT DISTINCT ON (left_digest, label)
label, left_digest, right_digest, num_pixels_diff, percent_pixels_diff, max_rgba_diffs,
combined_metric, dimensions_differ
FROM
ComparisonBetweenUntriagedAndObserved
JOIN PositiveOrNegativeDigests
ON ComparisonBetweenUntriagedAndObserved.right_digest = PositiveOrNegativeDigests.digest
ORDER BY left_digest, label, combined_metric ASC, max_channel_diff ASC, right_digest ASC
`
digests := make([][]byte, 0, len(inputs))
for _, input := range inputs {
digests = append(digests, input.digest)
}
rows, err := s.db.Query(ctx, statement, digests, groupingID[:], getFirstTileID(ctx))
if err != nil {
return nil, nil, skerr.Wrap(err)
}
var label schema.ExpectationLabel
var row schema.DiffMetricRow
for rows.Next() {
if err := rows.Scan(&label, &row.LeftDigest, &row.RightDigest, &row.NumPixelsDiff,
&row.PercentPixelsDiff, &row.MaxRGBADiffs, &row.CombinedMetric,
&row.DimensionsDiffer); err != nil {
rows.Close()
return nil, nil, skerr.Wrap(err)
}
srdd := &frontend.SRDiffDigest{
Digest: types.Digest(hex.EncodeToString(row.RightDigest)),
Status: label.ToExpectation(),
CombinedMetric: row.CombinedMetric,
DimDiffer: row.DimensionsDiffer,
MaxRGBADiffs: row.MaxRGBADiffs,
NumDiffPixels: row.NumPixelsDiff,
PixelDiffPercent: row.PercentPixelsDiff,
QueryMetric: row.CombinedMetric,
}
leftDigest := sql.AsMD5Hash(row.LeftDigest)
stageTwo := byDigest[leftDigest]
stageTwo.rightDigests = append(stageTwo.rightDigests, row.RightDigest)
if label == schema.LabelPositive {
stageTwo.closestPositive = srdd
} else {
stageTwo.closestNegative = srdd
}
if stageTwo.closestNegative != nil && stageTwo.closestPositive != nil {
if stageTwo.closestPositive.CombinedMetric < stageTwo.closestNegative.CombinedMetric {
stageTwo.closestDigest = stageTwo.closestPositive
} else {
stageTwo.closestDigest = stageTwo.closestNegative
}
} else {
// there is only one type of diff, so it defaults to the closest.
stageTwo.closestDigest = srdd
}
byDigest[leftDigest] = stageTwo
}
rows.Close()
}
q := getQuery(ctx)
bulkTriageData := map[groupingDigestKey]expectations.Label{}
results := make([]stageTwoResult, 0, len(byDigest))
for _, s2 := range byDigest {
// Filter out any results without a closest triaged digest (if that option is selected).
if q.MustIncludeReferenceFilter && s2.closestDigest == nil {
continue
}
if s2.closestDigest != nil {
// Apply RGBA Filter here - if the closest digest isn't within range, we remove it.
maxDiff := util.MaxInt(s2.closestDigest.MaxRGBADiffs[:]...)
if maxDiff < q.RGBAMinFilter || maxDiff > q.RGBAMaxFilter {
continue
}
closestLabel := s2.closestDigest.Status
key := groupingDigestKey{groupingID: sql.AsMD5Hash(s2.groupingID), digest: sql.AsMD5Hash(s2.leftDigest)}
bulkTriageData[key] = closestLabel
}
results = append(results, s2)
}
if q.Offset >= len(results) {
return nil, bulkTriageData, nil
}
sortAsc := q.Sort == query.SortAscending
sort.Slice(results, func(i, j int) bool {
if results[i].closestDigest == nil {
return true // sort results with no reference image to the top
}
if results[j].closestDigest == nil {
return false
}
if results[i].closestDigest.CombinedMetric == results[j].closestDigest.CombinedMetric {
// Tiebreak using digest in ascending order.
return bytes.Compare(results[i].leftDigest, results[j].leftDigest) < 0
}
if sortAsc {
return results[i].closestDigest.CombinedMetric < results[j].closestDigest.CombinedMetric
}
return results[i].closestDigest.CombinedMetric > results[j].closestDigest.CombinedMetric
})
if q.Limit <= 0 {
return results, bulkTriageData, nil
}
end := util.MinInt(len(results), q.Offset+q.Limit)
return results[q.Offset:end], bulkTriageData, nil
}
// getParamsetsForDigests fetches all the traces that produced the digests in the data and
// the keys of those traces as ParamSets. The ParamSets are keyed by the digest those traces
// produced in the given window (or the tiles that approximate it).
func (s *Impl) getParamsetsForDigests(ctx context.Context, inputs []stageTwoResult) (map[types.Digest]paramtools.ParamSet, error) {
ctx, span := trace.StartSpan(ctx, "getParamsetsForDigests")
defer span.End()
var rightDigests []schema.DigestBytes
for _, input := range inputs {
rightDigests = append(rightDigests, input.rightDigests...)
}
span.AddAttributes(trace.Int64Attribute("num_digests", int64(len(rightDigests))))
digestToTraces, err := s.addRightTraces(ctx, rightDigests)
if err != nil {
return nil, skerr.Wrap(err)
}
traceToOptions, err := s.getOptionsForTraces(ctx, digestToTraces)
if err != nil {
return nil, skerr.Wrap(err)
}
rv, err := s.expandTracesIntoParamsets(ctx, digestToTraces, traceToOptions)
if err != nil {
return nil, skerr.Wrap(err)
}
return rv, nil
}
// addRightTraces finds the traces that draw the given digests. We do not need to consider the
// ignore rules or other search constraints because those only apply to the search results
// (that is, the left side).
func (s *Impl) addRightTraces(ctx context.Context, digests []schema.DigestBytes) (map[types.Digest][]schema.TraceID, error) {
ctx, span := trace.StartSpan(ctx, "addRightTraces")
defer span.End()
span.AddAttributes(trace.Int64Attribute("num_right_digests", int64(len(digests))))
const statement = `SELECT DISTINCT encode(digest, 'hex') as digest, trace_id
FROM TiledTraceDigests WHERE digest = ANY($1) AND tile_id >= $2
`
rows, err := s.db.Query(ctx, statement, digests, getFirstTileID(ctx))
if err != nil {
return nil, skerr.Wrap(err)
}
digestToTraces := map[types.Digest][]schema.TraceID{}
defer rows.Close()
for rows.Next() {
var digest types.Digest
var traceID schema.TraceID
if err := rows.Scan(&digest, &traceID); err != nil {
return nil, skerr.Wrap(err)
}
digestToTraces[digest] = append(digestToTraces[digest], traceID)
}
return digestToTraces, nil
}
// getOptionsForTraces returns the most recent option map for each given trace.
func (s *Impl) getOptionsForTraces(ctx context.Context, digestToTraces map[types.Digest][]schema.TraceID) (map[schema.MD5Hash]paramtools.Params, error) {
ctx, span := trace.StartSpan(ctx, "getOptionsForTraces")
defer span.End()
byTrace := map[schema.MD5Hash]paramtools.Params{}
placeHolder := paramtools.Params{}
var traceKey schema.MD5Hash
for _, traces := range digestToTraces {
for _, traceID := range traces {
copy(traceKey[:], traceID)
byTrace[traceKey] = placeHolder
}
}
// we now have a set of the traces we need to look up
traceIDs := make([]schema.TraceID, 0, len(byTrace))
for trID := range byTrace {
traceIDs = append(traceIDs, sql.FromMD5Hash(trID))
}
const statement = `SELECT trace_id, options_id FROM ValuesAtHead
WHERE trace_id = ANY($1)`
rows, err := s.db.Query(ctx, statement, traceIDs)
if err != nil {
return nil, skerr.Wrap(err)
}
defer rows.Close()
var traceID schema.TraceID
var optionsID schema.OptionsID
for rows.Next() {
if err := rows.Scan(&traceID, &optionsID); err != nil {
return nil, skerr.Wrap(err)
}
opts, err := s.expandOptionsToParams(ctx, optionsID)
if err != nil {
return nil, skerr.Wrap(err)
}
copy(traceKey[:], traceID)
byTrace[traceKey] = opts
}
return byTrace, nil
}
// expandOptionsToParams returns the params that correspond to a given optionsID. The returned
// params should not be mutated, as they are not copied (for performance reasons).
func (s *Impl) expandOptionsToParams(ctx context.Context, optionsID schema.OptionsID) (paramtools.Params, error) {
ctx, span := trace.StartSpan(ctx, "expandOptionsToParams")
defer span.End()
if keys, ok := s.optionsGroupingCache.Get(string(optionsID)); ok {
return keys.(paramtools.Params), nil
}
// cache miss
const statement = `SELECT keys FROM Options WHERE options_id = $1`
row := s.db.QueryRow(ctx, statement, optionsID)
var keys paramtools.Params
if err := row.Scan(&keys); err != nil {
return nil, skerr.Wrap(err)
}
s.optionsGroupingCache.Add(string(optionsID), keys)
return keys, nil
}
// expandTracesIntoParamsets effectively returns a map detailing "who drew a given digest?". This
// is done by looking up the keys associated with each trace and combining them.
func (s *Impl) expandTracesIntoParamsets(ctx context.Context, toLookUp map[types.Digest][]schema.TraceID, traceToOptions map[schema.MD5Hash]paramtools.Params) (map[types.Digest]paramtools.ParamSet, error) {
ctx, span := trace.StartSpan(ctx, "expandTracesIntoParamsets")
defer span.End()
span.AddAttributes(trace.Int64Attribute("num_trace_sets", int64(len(toLookUp))))
rv := map[types.Digest]paramtools.ParamSet{}
for digest, traces := range toLookUp {
paramset, err := s.lookupOrLoadParamSetFromCache(ctx, traces)
if err != nil {
return nil, skerr.Wrap(err)
}
var traceKey schema.MD5Hash
for _, traceID := range traces {
copy(traceKey[:], traceID)
ps := traceToOptions[traceKey]
paramset.AddParams(ps)
}
paramset.Normalize()
rv[digest] = paramset
}
return rv, nil
}
// lookupOrLoadParamSetFromCache takes a slice of traces and returns a ParamSet combining all their
// keys. It will use the traceCache or query the DB and fill the cache on a cache miss.
func (s *Impl) lookupOrLoadParamSetFromCache(ctx context.Context, traces []schema.TraceID) (paramtools.ParamSet, error) {
ctx, span := trace.StartSpan(ctx, "lookupOrLoadParamSetFromCache")
defer span.End()
paramset := paramtools.ParamSet{}
var cacheMisses []schema.TraceID
for _, traceID := range traces {
if keys, ok := s.traceCache.Get(string(traceID)); ok {
paramset.AddParams(keys.(paramtools.Params))
} else {
cacheMisses = append(cacheMisses, traceID)
}
}
if len(cacheMisses) > 0 {
const statement = `SELECT trace_id, keys FROM Traces WHERE trace_id = ANY($1)`
rows, err := s.db.Query(ctx, statement, cacheMisses)
if err != nil {
return nil, skerr.Wrap(err)
}
defer rows.Close()
var traceID schema.TraceID
for rows.Next() {
var keys paramtools.Params
if err := rows.Scan(&traceID, &keys); err != nil {
return nil, skerr.Wrap(err)
}
s.traceCache.Add(string(traceID), keys)
paramset.AddParams(keys)
}
}
return paramset, nil
}
// expandTraceToParams will return a trace's keys from the cache. On a cache miss, it will look
// up the trace from the DB and add it to the cache.
func (s *Impl) expandTraceToParams(ctx context.Context, traceID schema.TraceID) (paramtools.Params, error) {
ctx, span := trace.StartSpan(ctx, "expandTraceToParams")
defer span.End()
if keys, ok := s.traceCache.Get(string(traceID)); ok {
return keys.(paramtools.Params).Copy(), nil // Return a copy to protect cached value
}
// cache miss
const statement = `SELECT keys FROM Traces WHERE trace_id = $1`
row := s.db.QueryRow(ctx, statement, traceID)
var keys paramtools.Params
if err := row.Scan(&keys); err != nil {
return nil, skerr.Wrap(err)
}
s.traceCache.Add(string(traceID), keys)
return keys.Copy(), nil // Return a copy to protect cached value
}
// fillOutTraceHistory returns a slice of SearchResults that are mostly filled in, particularly
// including the history of the traces for each result.
func (s *Impl) fillOutTraceHistory(ctx context.Context, inputs []stageTwoResult) ([]*frontend.SearchResult, error) {
ctx, span := trace.StartSpan(ctx, "fillOutTraceHistory")
span.AddAttributes(trace.Int64Attribute("results", int64(len(inputs))))
defer span.End()
rv := make([]*frontend.SearchResult, len(inputs))
for i, input := range inputs {
sr := &frontend.SearchResult{
Digest: types.Digest(hex.EncodeToString(input.leftDigest)),
RefDiffs: map[common.RefClosest]*frontend.SRDiffDigest{
common.PositiveRef: input.closestPositive,
common.NegativeRef: input.closestNegative,
},
}
if input.closestDigest != nil && input.closestDigest.Status == expectations.Positive {
sr.ClosestRef = common.PositiveRef
} else if input.closestDigest != nil && input.closestDigest.Status == expectations.Negative {
sr.ClosestRef = common.NegativeRef
}
tg, err := s.traceGroupForTraces(ctx, input.traceIDs, sr.Digest)
if err != nil {
return nil, skerr.Wrap(err)
}
if err := s.fillInExpectations(ctx, &tg, input.groupingID); err != nil {
return nil, skerr.Wrap(err)
}
if err := s.fillInTraceParams(ctx, &tg); err != nil {
return nil, skerr.Wrap(err)
}
sr.TraceGroup = tg
if len(tg.Digests) > 0 {
// The first digest in the trace group is this digest.
sr.Status = tg.Digests[0].Status
} else {
// We assume untriaged if the digest is not in the window.
sr.Status = expectations.Untriaged
}
if len(tg.Traces) > 0 {
// Grab the test name from the first trace, since we know all the traces are of
// the same grouping, which includes test name.
sr.Test = types.TestName(tg.Traces[0].Params[types.PrimaryKeyField])
}
leftPS := paramtools.ParamSet{}
for _, tr := range tg.Traces {
leftPS.AddParams(tr.Params)
}
leftPS.Normalize()
sr.ParamSet = leftPS
rv[i] = sr
}
return rv, nil
}
type traceDigestCommit struct {
traceID schema.TraceID
commitID schema.CommitID
digest types.Digest
optionsID schema.OptionsID
}
// traceGroupForTraces gets all the history for a slice of traces within the given window and
// turns it into a format that the frontend can render.
func (s *Impl) traceGroupForTraces(ctx context.Context, traceIDs []schema.TraceID, primary types.Digest) (frontend.TraceGroup, error) {
ctx, span := trace.StartSpan(ctx, "traceGroupForTraces")
span.AddAttributes(trace.Int64Attribute("num_traces", int64(len(traceIDs))))
defer span.End()
const statement = `SELECT trace_id, commit_id, encode(digest, 'hex'), options_id
FROM TraceValues WHERE trace_id = ANY($1) AND commit_id >= $2
ORDER BY trace_id, commit_id`
rows, err := s.db.Query(ctx, statement, traceIDs, getFirstCommitID(ctx))
if err != nil {
return frontend.TraceGroup{}, skerr.Wrap(err)
}
defer rows.Close()
var dataPoints []traceDigestCommit
for rows.Next() {
var row traceDigestCommit
if err := rows.Scan(&row.traceID, &row.commitID, &row.digest, &row.optionsID); err != nil {
return frontend.TraceGroup{}, skerr.Wrap(err)
}
dataPoints = append(dataPoints, row)
}
return makeTraceGroup(ctx, dataPoints, primary)
}
// fillInExpectations looks up all the expectations for the digests included in the given
// TraceGroup and updates the passed in TraceGroup directly.
func (s *Impl) fillInExpectations(ctx context.Context, tg *frontend.TraceGroup, groupingID schema.GroupingID) error {
ctx, span := trace.StartSpan(ctx, "fillInExpectations")
defer span.End()
arguments := make([]interface{}, 0, len(tg.Digests))
for _, digestStatus := range tg.Digests {
dBytes, err := sql.DigestToBytes(digestStatus.Digest)
if err != nil {
sklog.Warningf("invalid digest: %s", digestStatus.Digest)
continue
}
arguments = append(arguments, dBytes)
}
const statement = `SELECT encode(digest, 'hex'), label FROM Expectations
WHERE grouping_id = $1 and digest = ANY($2)`
rows, err := s.db.Query(ctx, statement, groupingID, arguments)
if err != nil {
return skerr.Wrap(err)
}
defer rows.Close()
for rows.Next() {
var digest types.Digest
var label schema.ExpectationLabel
if err := rows.Scan(&digest, &label); err != nil {
return skerr.Wrap(err)
}
for i, ds := range tg.Digests {
if ds.Digest == digest {
tg.Digests[i].Status = label.ToExpectation()
}
}
}
return nil
}
// fillInTraceParams looks up the keys (params) for each trace and fills them in on the passed in
// TraceGroup.
func (s *Impl) fillInTraceParams(ctx context.Context, tg *frontend.TraceGroup) error {
ctx, span := trace.StartSpan(ctx, "fillInTraceParams")
defer span.End()
for i, tr := range tg.Traces {
traceID, err := hex.DecodeString(string(tr.ID))
if err != nil {
return skerr.Wrapf(err, "invalid trace id %q", tr.ID)
}
ps, err := s.expandTraceToParams(ctx, traceID)
if err != nil {
return skerr.Wrap(err)
}
if tr.RawTrace.OptionsID != nil {
opts, err := s.expandOptionsToParams(ctx, tr.RawTrace.OptionsID)
if err != nil {
return skerr.Wrap(err)
}
ps.Add(opts)
}
tg.Traces[i].Params = ps
tg.Traces[i].RawTrace = nil // Done with this temporary data.
}
return nil
}
// convertBulkTriageData converts the passed in map into the version usable by the frontend.
func (s *Impl) convertBulkTriageData(ctx context.Context, data map[groupingDigestKey]expectations.Label) (web_frontend.TriageRequestData, error) {
ctx, span := trace.StartSpan(ctx, "convertBulkTriageData")
defer span.End()
rv := map[types.TestName]map[types.Digest]expectations.Label{}
for key, label := range data {
var groupingKeys paramtools.Params
if gk, ok := s.optionsGroupingCache.Get(key.groupingID); ok {
groupingKeys = gk.(paramtools.Params)
} else {
const statement = `SELECT keys FROM Groupings WHERE grouping_id = $1`
row := s.db.QueryRow(ctx, statement, key.groupingID[:])
if err := row.Scan(&groupingKeys); err != nil {
return nil, skerr.Wrap(err)
}
s.optionsGroupingCache.Add(key.groupingID, groupingKeys)
}
testName := types.TestName(groupingKeys[types.PrimaryKeyField])
digest := types.Digest(hex.EncodeToString(key.digest[:]))
if byTest, ok := rv[testName]; ok {
byTest[digest] = label
} else {
rv[testName] = map[types.Digest]expectations.Label{digest: label}
}
}
return rv, nil
}
// getCommits returns the front-end friendly version of the commits within the searched window.
func (s *Impl) getCommits(ctx context.Context) ([]web_frontend.Commit, error) {
ctx, span := trace.StartSpan(ctx, "getCommits")
defer span.End()
rv := make([]web_frontend.Commit, getActualWindowLength(ctx))
commitIDs := getCommitToIdxMap(ctx)
for commitID, idx := range commitIDs {
var commit web_frontend.Commit
if c, ok := s.commitCache.Get(commitID); ok {
commit = c.(web_frontend.Commit)
} else {
// TODO(kjlubick) will need to handle non-git repos too
const statement = `SELECT git_hash, commit_time, author_email, subject
FROM GitCommits WHERE commit_id = $1`
row := s.db.QueryRow(ctx, statement, commitID)
var dbRow schema.GitCommitRow
if err := row.Scan(&dbRow.GitHash, &dbRow.CommitTime, &dbRow.AuthorEmail, &dbRow.Subject); err != nil {
return nil, skerr.Wrap(err)
}
commit = web_frontend.Commit{
CommitTime: dbRow.CommitTime.UTC().Unix(),
Hash: dbRow.GitHash,
Author: dbRow.AuthorEmail,
Subject: dbRow.Subject,
}
s.commitCache.Add(commitID, commit)
}
rv[idx] = commit
}
return rv, nil
}
// makeTraceGroup converts all the trace+digest+commit triples into a TraceGroup. On the frontend,
// we only show the top 9 digests before fading them to grey - this handles that logic.
func makeTraceGroup(ctx context.Context, data []traceDigestCommit, primary types.Digest) (frontend.TraceGroup, error) {
ctx, span := trace.StartSpan(ctx, "makeTraceGroup")
defer span.End()
tg := frontend.TraceGroup{}
if len(data) == 0 {
return tg, nil
}
indexMap := getCommitToIdxMap(ctx)
currentTrace := frontend.Trace{
ID: tiling.TraceID(hex.EncodeToString(data[0].traceID)),
DigestIndices: emptyIndices(getActualWindowLength(ctx)),
RawTrace: tiling.NewEmptyTrace(getActualWindowLength(ctx), nil, nil),
}
tg.Traces = append(tg.Traces, currentTrace)
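// Note: currentTrace is appended by value, but its RawTrace pointer and DigestIndices slice are
// shared with the copy in tg.Traces, so the per-datapoint writes below remain visible there.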
for _, dp := range data {
tID := tiling.TraceID(hex.EncodeToString(dp.traceID))
if currentTrace.ID != tID {
currentTrace = frontend.Trace{
ID: tID,
DigestIndices: emptyIndices(getActualWindowLength(ctx)),
RawTrace: tiling.NewEmptyTrace(getActualWindowLength(ctx), nil, nil),
}
tg.Traces = append(tg.Traces, currentTrace)
}
idx, ok := indexMap[dp.commitID]
if !ok {
continue
}
currentTrace.RawTrace.Digests[idx] = dp.digest
// We want to report the latest options, so always update this
currentTrace.RawTrace.OptionsID = dp.optionsID
}
// Find the most recent / important digests and assign them an index. Everything else will
// be given the sentinel value.
digestIndices, totalDigests := search.ComputeDigestIndices(&tg, primary)
tg.TotalDigests = totalDigests
tg.Digests = make([]frontend.DigestStatus, len(digestIndices))
for digest, idx := range digestIndices {
tg.Digests[idx] = frontend.DigestStatus{
Digest: digest,
}
}
for _, tr := range tg.Traces {
for j, digest := range tr.RawTrace.Digests {
if digest == tiling.MissingDigest {
continue // There is already the missing index there.
}
idx, ok := digestIndices[digest]
if ok {
tr.DigestIndices[j] = idx
} else {
// Fold everything else into the last digest index (grey on the frontend).
tr.DigestIndices[j] = search.MaxDistinctDigestsToPresent - 1
}
}
}
return tg, nil
}
// emptyIndices returns an array of the given length with placeholder values for "missing data".
func emptyIndices(length int) []int {
rv := make([]int, length)
for i := range rv {
rv[i] = search.MissingDigestIndex
}
return rv
}
// Make sure Impl implements the API interface.
var _ API = (*Impl)(nil)