// Package search2 encapsulates various queries we make against Gold's data. It is backed
// by the SQL database and aims to replace the current search package.
package search2
import (
"context"
"sync"
"time"
"github.com/jackc/pgtype"
"github.com/jackc/pgx/v4"
"github.com/jackc/pgx/v4/pgxpool"
"go.opencensus.io/trace"
"golang.org/x/sync/errgroup"
"go.skia.org/infra/go/skerr"
"go.skia.org/infra/go/sklog"
"go.skia.org/infra/go/util"
"go.skia.org/infra/golden/go/sql"
"go.skia.org/infra/golden/go/sql/schema"
)
// API encapsulates the searches that are supported by this package.
type API interface {
// NewAndUntriagedSummaryForCL returns a summarized look at the new digests produced by a CL
// (that is, digests not currently on the primary branch for this grouping at all) as well as
// how many of the newly produced digests are currently untriaged.
NewAndUntriagedSummaryForCL(ctx context.Context, qCLID string) (NewAndUntriagedSummary, error)
// ChangelistLastUpdated returns the timestamp that the given CL was updated. It returns an
// error if the CL does not exist.
ChangelistLastUpdated(ctx context.Context, qCLID string) (time.Time, error)
}
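// A minimal usage sketch (the HTTP handler shown is hypothetical and not part of this
// package):
//
//   func summaryHandler(w http.ResponseWriter, r *http.Request, api search2.API) {
//       qCLID := r.URL.Query().Get("cl") // assumed to be a qualified CL ID
//       sum, err := api.NewAndUntriagedSummaryForCL(r.Context(), qCLID)
//       if err != nil {
//           http.Error(w, "CL not found", http.StatusNotFound)
//           return
//       }
//       _ = json.NewEncoder(w).Encode(sum)
//   }
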
// NewAndUntriagedSummary is a summary of the results associated with a given CL. It focuses on
// the untriaged and new images produced.
type NewAndUntriagedSummary struct {
// ChangelistID is the nonqualified id of the CL.
ChangelistID string
// PatchsetSummaries is a summary for all Patchsets for which we have data.
PatchsetSummaries []PatchsetNewAndUntriagedSummary
// LastUpdated is the timestamp of the CL, which corresponds to the last datapoint for
// this CL.
LastUpdated time.Time
}
// PatchsetNewAndUntriagedSummary is the summary for a specific PS. It focuses on the untriaged
// and new images produced.
type PatchsetNewAndUntriagedSummary struct {
// NewImages is the number of new images (digests) that were produced by this patchset by
// non-ignored traces and not seen on the primary branch.
NewImages int
// NewUntriagedImages is the number of NewImages which are still untriaged. It is less than or
// equal to NewImages.
NewUntriagedImages int
// TotalUntriagedImages is the number of images produced by this patchset by non-ignored traces
// that are untriaged. This includes images that are untriaged and observed on the primary
// branch (i.e. might not be the fault of this CL/PS). It is greater than or equal to
// NewUntriagedImages.
TotalUntriagedImages int
// PatchsetID is the nonqualified id of the patchset. This is usually a git hash.
PatchsetID string
// PatchsetOrder represents the chronological order the patchsets are in. It starts at 1.
PatchsetOrder int
}
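// For intuition, a hypothetical patchset: its non-ignored traces produce 10 distinct digests,
// 5 of which have never been seen on the primary branch; 3 of those 5 are untriaged, and 1
// already-known digest is also untriaged. Then NewImages = 5, NewUntriagedImages = 3, and
// TotalUntriagedImages = 4.
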
// Impl is an implementation of API that is backed by the SQL database.
type Impl struct {
db *pgxpool.Pool
// Protects the caches
mutex sync.RWMutex
// This caches the digests seen per grouping on the primary branch.
digestsOnPrimary map[groupingDigestKey]struct{}
}
// New returns an implementation of API.
func New(sqlDB *pgxpool.Pool) *Impl {
return &Impl{
db: sqlDB,
digestsOnPrimary: map[groupingDigestKey]struct{}{},
}
}
type groupingDigestKey struct {
groupingID schema.MD5Hash
digest schema.MD5Hash
}
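// groupingDigestKey uses fixed-size arrays rather than slices so that it is comparable and
// can be used directly as a map key. A minimal sketch of building one by hand (assuming
// schema.MD5Hash is a 16-byte array, as its use with copy below suggests; the variable names
// are illustrative):
//
//   var key groupingDigestKey
//   copy(key.groupingID[:], groupingIDBytes) // MD5 hash of the grouping keys
//   copy(key.digest[:], digestBytes)         // MD5 hash of the image digest
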
// StartCacheProcess loads the caches used for searching and starts a goroutine to keep those
// up to date.
func (s *Impl) StartCacheProcess(ctx context.Context, interval time.Duration, commitsWithData int) error {
if err := s.updateCaches(ctx, commitsWithData); err != nil {
return skerr.Wrapf(err, "setting up initial cache values")
}
go util.RepeatCtx(ctx, interval, func(ctx context.Context) {
err := s.updateCaches(ctx, commitsWithData)
if err != nil {
sklog.Errorf("Could not update caches: %s", err)
}
})
return nil
}
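// A minimal sketch of wiring this up at server startup (the connection string and tuning
// values here are illustrative, not prescribed by this package):
//
//   db, err := pgxpool.Connect(ctx, "postgresql://root@localhost:26234/defaultdb")
//   if err != nil {
//       sklog.Fatalf("connecting to DB: %s", err)
//   }
//   api := search2.New(db)
//   // Refresh the digestsOnPrimary cache every 5 minutes, using the most recent
//   // 200 commits with data.
//   if err := api.StartCacheProcess(ctx, 5*time.Minute, 200); err != nil {
//       sklog.Fatalf("starting cache process: %s", err)
//   }
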
// updateCaches loads the digestsOnPrimary cache.
func (s *Impl) updateCaches(ctx context.Context, commitsWithData int) error {
ctx, span := trace.StartSpan(ctx, "search2_UpdateCaches", trace.WithSampler(trace.AlwaysSample()))
defer span.End()
tile, err := s.getStartingTile(ctx, commitsWithData)
if err != nil {
return skerr.Wrapf(err, "getting tile to search")
}
onPrimary, err := s.getDigestsOnPrimary(ctx, tile)
if err != nil {
return skerr.Wrapf(err, "getting digests on primary branch")
}
s.mutex.Lock()
s.digestsOnPrimary = onPrimary
s.mutex.Unlock()
sklog.Infof("Digests on Primary cache refreshed with %d entries", len(onPrimary))
return nil
}
// getStartingTile returns the ID of the tile containing the oldest commit we should search
// over (so we get enough data to do our comparisons).
func (s *Impl) getStartingTile(ctx context.Context, commitsWithDataToSearch int) (schema.TileID, error) {
ctx, span := trace.StartSpan(ctx, "getStartingTile")
defer span.End()
if commitsWithDataToSearch <= 0 {
return 0, nil
}
row := s.db.QueryRow(ctx, `SELECT tile_id FROM CommitsWithData
AS OF SYSTEM TIME '-0.1s'
ORDER BY commit_id DESC
LIMIT 1 OFFSET $1`, commitsWithDataToSearch)
var lc pgtype.Int4
if err := row.Scan(&lc); err != nil {
if err == pgx.ErrNoRows {
return 0, nil // not enough commits seen, so start at tile 0.
}
return 0, skerr.Wrapf(err, "getting latest commit")
}
if lc.Status == pgtype.Null {
// There are no commits with data, so start at tile 0.
return 0, nil
}
return schema.TileID(lc.Int), nil
}
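// For example, with commitsWithDataToSearch = 100, the query in getStartingTile skips the 100
// newest commits with data and returns the tile containing the next-oldest one, so searching
// tiles at or after the returned ID covers at least 100 commits with data.
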
// getDigestsOnPrimary returns a map of all distinct digests on the primary branch.
func (s *Impl) getDigestsOnPrimary(ctx context.Context, tile schema.TileID) (map[groupingDigestKey]struct{}, error) {
ctx, span := trace.StartSpan(ctx, "getDigestsOnPrimary")
defer span.End()
rows, err := s.db.Query(ctx, `
SELECT DISTINCT grouping_id, digest FROM
TiledTraceDigests WHERE tile_id >= $1`, tile)
if err != nil {
if err == pgx.ErrNoRows {
return map[groupingDigestKey]struct{}{}, nil
}
return nil, skerr.Wrap(err)
}
defer rows.Close()
rv := map[groupingDigestKey]struct{}{}
var digest schema.DigestBytes
var grouping schema.GroupingID
var key groupingDigestKey
// keyGrouping and keyDigest alias the arrays inside key, so the copy calls in the loop below
// write into key directly. Storing key in the map copies the value, which lets us safely
// reuse the same key variable for every row.
keyGrouping := key.groupingID[:]
keyDigest := key.digest[:]
for rows.Next() {
err := rows.Scan(&grouping, &digest)
if err != nil {
return nil, skerr.Wrap(err)
}
copy(keyGrouping, grouping)
copy(keyDigest, digest)
rv[key] = struct{}{}
}
return rv, nil
}
// NewAndUntriagedSummaryForCL computes the summary for each of the CL's patchsets in parallel
// (to keep each individual query less complex). If there are no patchsets for the provided
// CL, it returns an error.
func (s *Impl) NewAndUntriagedSummaryForCL(ctx context.Context, qCLID string) (NewAndUntriagedSummary, error) {
ctx, span := trace.StartSpan(ctx, "search2_NewAndUntriagedSummaryForCL")
defer span.End()
patchsets, err := s.getPatchsets(ctx, qCLID)
if err != nil {
return NewAndUntriagedSummary{}, skerr.Wrap(err)
}
if len(patchsets) == 0 {
return NewAndUntriagedSummary{}, skerr.Fmt("CL %q not found", qCLID)
}
eg, ctx := errgroup.WithContext(ctx)
rv := make([]PatchsetNewAndUntriagedSummary, len(patchsets))
for i, p := range patchsets {
idx, ps := i, p
eg.Go(func() error {
sum, err := s.getSummaryForPS(ctx, qCLID, ps.id)
if err != nil {
return skerr.Wrap(err)
}
sum.PatchsetID = sql.Unqualify(ps.id)
sum.PatchsetOrder = ps.order
rv[idx] = sum
return nil
})
}
var updatedTS time.Time
eg.Go(func() error {
row := s.db.QueryRow(ctx, `SELECT last_ingested_data
FROM Changelists WHERE changelist_id = $1`, qCLID)
return skerr.Wrap(row.Scan(&updatedTS))
})
if err := eg.Wait(); err != nil {
return NewAndUntriagedSummary{}, skerr.Wrapf(err, "Getting counts for CL %q and %d PS", qCLID, len(patchsets))
}
return NewAndUntriagedSummary{
ChangelistID: sql.Unqualify(qCLID),
PatchsetSummaries: rv,
LastUpdated: updatedTS.UTC(),
}, nil
}
type psIDAndOrder struct {
id string
order int
}
// getPatchsets returns the qualified ids and orders of the patchsets sorted by ps_order.
func (s *Impl) getPatchsets(ctx context.Context, qualifiedID string) ([]psIDAndOrder, error) {
ctx, span := trace.StartSpan(ctx, "getPatchsets")
defer span.End()
rows, err := s.db.Query(ctx, `SELECT patchset_id, ps_order
FROM Patchsets WHERE changelist_id = $1 ORDER BY ps_order ASC`, qualifiedID)
if err != nil {
return nil, skerr.Wrapf(err, "getting summary for cl %q", qualifiedID)
}
defer rows.Close()
var rv []psIDAndOrder
for rows.Next() {
var row psIDAndOrder
if err := rows.Scan(&row.id, &row.order); err != nil {
return nil, skerr.Wrap(err)
}
rv = append(rv, row)
}
return rv, nil
}
// getSummaryForPS looks at all the data produced for a given PS and returns a summary of the
// newly produced digests and untriaged digests.
func (s *Impl) getSummaryForPS(ctx context.Context, clid, psID string) (PatchsetNewAndUntriagedSummary, error) {
ctx, span := trace.StartSpan(ctx, "getSummaryForPS")
defer span.End()
const statement = `
WITH
CLDigests AS (
SELECT secondary_branch_trace_id, digest, grouping_id
FROM SecondaryBranchValues
WHERE branch_name = $1 and version_name = $2
),
NonIgnoredCLDigests AS (
-- We only want to count a digest once per grouping, no matter how many times it shows up,
-- because the frontend UI groups those appearances together (by trace).
SELECT DISTINCT digest, CLDigests.grouping_id
FROM CLDigests
JOIN Traces
ON secondary_branch_trace_id = trace_id
WHERE Traces.matches_any_ignore_rule = False
),
CLExpectations AS (
SELECT grouping_id, digest, label
FROM SecondaryBranchExpectations
WHERE branch_name = $1
),
LabeledDigests AS (
SELECT NonIgnoredCLDigests.grouping_id, NonIgnoredCLDigests.digest,
COALESCE(CLExpectations.label, Expectations.label, 'u') AS label
FROM NonIgnoredCLDigests
LEFT JOIN Expectations
ON NonIgnoredCLDigests.grouping_id = Expectations.grouping_id AND
NonIgnoredCLDigests.digest = Expectations.digest
LEFT JOIN CLExpectations
ON NonIgnoredCLDigests.grouping_id = CLExpectations.grouping_id AND
NonIgnoredCLDigests.digest = CLExpectations.digest
)
SELECT * FROM LabeledDigests;`
rows, err := s.db.Query(ctx, statement, clid, psID)
if err != nil {
return PatchsetNewAndUntriagedSummary{}, skerr.Wrapf(err, "getting summary for ps %q in cl %q", psID, clid)
}
defer rows.Close()
s.mutex.RLock()
defer s.mutex.RUnlock()
var digest schema.DigestBytes
var grouping schema.GroupingID
var label schema.ExpectationLabel
var key groupingDigestKey
// As in getDigestsOnPrimary, these slices alias the arrays inside key so the copy calls
// below fill it in without extra allocations.
keyGrouping := key.groupingID[:]
keyDigest := key.digest[:]
var rv PatchsetNewAndUntriagedSummary
for rows.Next() {
if err := rows.Scan(&grouping, &digest, &label); err != nil {
return PatchsetNewAndUntriagedSummary{}, skerr.Wrap(err)
}
copy(keyGrouping, grouping)
copy(keyDigest, digest)
_, isExisting := s.digestsOnPrimary[key]
if !isExisting {
rv.NewImages++
}
if label == schema.LabelUntriaged {
rv.TotalUntriagedImages++
if !isExisting {
rv.NewUntriagedImages++
}
}
}
return rv, nil
}
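// To illustrate the classification above with a hypothetical row: a (grouping, digest) pair
// that is absent from digestsOnPrimary and labeled 'u' increments NewImages,
// NewUntriagedImages, and TotalUntriagedImages; the same pair already seen on the primary
// branch would only increment TotalUntriagedImages.
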
// ChangelistLastUpdated implements the API interface.
func (s *Impl) ChangelistLastUpdated(ctx context.Context, qCLID string) (time.Time, error) {
ctx, span := trace.StartSpan(ctx, "search2_ChangelistLastUpdated")
defer span.End()
var updatedTS time.Time
row := s.db.QueryRow(ctx, `SELECT last_ingested_data
FROM Changelists AS OF SYSTEM TIME '-0.1s' WHERE changelist_id = $1`, qCLID)
if err := row.Scan(&updatedTS); err != nil {
return time.Time{}, skerr.Wrapf(err, "Getting last updated ts for cl %q", qCLID)
}
return updatedTS.UTC(), nil
}
// Make sure Impl implements the API interface.
var _ API = (*Impl)(nil)