blob: eaecd6c04d49aa5211ff964fdf6647fa300b51d7 [file] [log] [blame]
// Package search2 encapsulates various queries we make against Gold's data. It is backed
// by the SQL database and aims to replace the current search package.
package search2
import (
"context"
"time"
"github.com/jackc/pgx/v4/pgxpool"
"go.opencensus.io/trace"
"golang.org/x/sync/errgroup"
"go.skia.org/infra/go/skerr"
"go.skia.org/infra/golden/go/sql"
)
type API interface {
// NewAndUntriagedSummaryForCL returns a summarized look at the new digests produced by a CL
// (that is, digests not currently on the primary branch for this grouping at all) as well as
// how many of the newly produced digests are currently untriaged.
NewAndUntriagedSummaryForCL(ctx context.Context, crs, clID string) (NewAndUntriagedSummary, error)
ChangelistLastUpdated(ctx context.Context, crs, clID string) (time.Time, error)
}
// NewAndUntriagedSummary is a summary of the results associated with a given CL. It focuses on
// the untriaged and new images produced.
type NewAndUntriagedSummary struct {
// ChangelistID is the nonqualified id of the CL.
ChangelistID string
// PatchsetSummaries is a summary for all Patchsets for which we have data.
PatchsetSummaries []PatchsetNewAndUntriagedSummary
// LastUpdated returns the timestamp of the CL, which corresponds to the last datapoint for
// this CL.
LastUpdated time.Time
}
// PatchsetNewAndUntriagedSummary is the summary for a specific PS. It focuses on the untriaged
// and new images produced.
type PatchsetNewAndUntriagedSummary struct {
// NewImages is the number of new images (digests) that were produced by this patchset by
// non-ignored traces and not seen on the primary branch.
NewImages int
// NewUntriagedImages is the number of NewImages which are still untriaged. It is less than or
// equal to NewImages.
NewUntriagedImages int
// TotalUntriagedImages is the number of images produced by this patchset by non-ignored traces
// that are untriaged. This includes images that are untriaged and observed on the primary
// branch (i.e. might not be the fault of this CL/PS). It is greater than or equal to
// NewUntriagedImages.
TotalUntriagedImages int
// PatchsetID is the nonqualified id of the patchset. This is usually a git hash.
PatchsetID string
// PatchsetOrder is represents the chronological order the patchsets are in. It starts at 1.
PatchsetOrder int
}
type Impl struct {
db *pgxpool.Pool
}
// New returns an implementation of API.
func New(sqlDB *pgxpool.Pool) *Impl {
return &Impl{db: sqlDB}
}
// NewAndUntriagedSummaryForCL queries all the patchsets in parallel (to keep the query less
// complex). If there are no patchsets for the provided CL, it returns an error.
func (s *Impl) NewAndUntriagedSummaryForCL(ctx context.Context, crs, clID string) (NewAndUntriagedSummary, error) {
ctx, span := trace.StartSpan(ctx, "search2_NewAndUntriagedSummaryForCL")
defer span.End()
qCLID := sql.Qualify(crs, clID)
patchsets, err := s.getPatchsets(ctx, qCLID)
if err != nil {
return NewAndUntriagedSummary{}, skerr.Wrap(err)
}
if len(patchsets) == 0 {
return NewAndUntriagedSummary{}, skerr.Fmt("CL %q not found", qCLID)
}
eg, ctx := errgroup.WithContext(ctx)
rv := make([]PatchsetNewAndUntriagedSummary, len(patchsets))
for i, p := range patchsets {
idx, ps := i, p
eg.Go(func() error {
sum, err := s.getSummaryForPS(ctx, qCLID, ps.id)
if err != nil {
return skerr.Wrap(err)
}
sum.PatchsetID = sql.Unqualify(ps.id)
sum.PatchsetOrder = ps.order
rv[idx] = sum
return nil
})
}
var updatedTS time.Time
eg.Go(func() error {
row := s.db.QueryRow(ctx, `SELECT last_ingested_data
FROM Changelists WHERE changelist_id = $1`, qCLID)
return skerr.Wrap(row.Scan(&updatedTS))
})
if err := eg.Wait(); err != nil {
return NewAndUntriagedSummary{}, skerr.Wrapf(err, "Getting counts for CL %q and %d PS", qCLID, len(patchsets))
}
return NewAndUntriagedSummary{
ChangelistID: clID,
PatchsetSummaries: rv,
LastUpdated: updatedTS.UTC(),
}, nil
}
type psIDAndOrder struct {
id string
order int
}
// getPatchsets returns the qualified ids and orders of the patchsets sorted by ps_order.
func (s *Impl) getPatchsets(ctx context.Context, qualifiedID string) ([]psIDAndOrder, error) {
ctx, span := trace.StartSpan(ctx, "getPatchsets")
defer span.End()
rows, err := s.db.Query(ctx, `SELECT patchset_id, ps_order
FROM Patchsets WHERE changelist_id = $1 ORDER BY ps_order ASC`, qualifiedID)
if err != nil {
return nil, skerr.Wrapf(err, "getting summary for cl %q", qualifiedID)
}
defer rows.Close()
var rv []psIDAndOrder
for rows.Next() {
var row psIDAndOrder
if err := rows.Scan(&row.id, &row.order); err != nil {
return nil, skerr.Wrap(err)
}
rv = append(rv, row)
}
return rv, nil
}
// getSummaryForPS looks at all the data produced for a given PS and returns the a summary of the
// newly produced digests and untriaged digests.
func (s *Impl) getSummaryForPS(ctx context.Context, clid, psID string) (PatchsetNewAndUntriagedSummary, error) {
ctx, span := trace.StartSpan(ctx, "getSummaryForPS")
defer span.End()
const statement = `
WITH
CLDigests AS (
SELECT secondary_branch_trace_id, digest, grouping_id
FROM SecondaryBranchValues
WHERE branch_name = $1 and version_name = $2
),
NonIgnoredCLDigests AS (
-- We only want to count a digest once per grouping, no matter how many times it shows up
-- because group those together (by trace) in the frontend UI.
SELECT DISTINCT digest, CLDigests.grouping_id
FROM CLDigests
JOIN Traces
ON secondary_branch_trace_id = trace_id
WHERE Traces.matches_any_ignore_rule = False
),
CLExpectations AS (
SELECT grouping_id, digest, label
FROM SecondaryBranchExpectations
WHERE branch_name = $1
),
NewDigests AS (
SELECT NonIgnoredCLDigests.digest, NonIgnoredCLDigests.grouping_id
FROM NonIgnoredCLDigests
LEFT JOIN TiledTraceDigests
ON NonIgnoredCLDigests.grouping_id = TiledTraceDigests.grouping_id AND
NonIgnoredCLDigests.digest = TiledTraceDigests.digest
WHERE TiledTraceDigests.tile_id IS NULL
),
LabeledNewDigests AS (
SELECT COALESCE(CLExpectations.label, 'u') as label
FROM NewDigests
LEFT JOIN CLExpectations
ON NewDigests.grouping_id = CLExpectations.grouping_id AND
NewDigests.digest = CLExpectations.digest
),
LabeledDigests AS (
SELECT COALESCE(CLExpectations.label, COALESCE(Expectations.label, 'u')) as label
FROM NonIgnoredCLDigests
LEFT JOIN Expectations
ON NonIgnoredCLDigests.grouping_id = Expectations.grouping_id AND
NonIgnoredCLDigests.digest = Expectations.digest
LEFT JOIN CLExpectations
ON NonIgnoredCLDigests.grouping_id = CLExpectations.grouping_id AND
NonIgnoredCLDigests.digest = CLExpectations.digest
)
SELECT count(*) as "num_digests", '"new"' as "name"
FROM NewDigests
UNION
SELECT count(*) as "num_digests", '"new_unt"' as "name"
FROM LabeledNewDigests
WHERE label = 'u'
UNION
SELECT count(*) as "num_digests", '"unt"' as "name"
FROM LabeledDigests
WHERE label = 'u'`
rows, err := s.db.Query(ctx, statement, clid, psID)
if err != nil {
return PatchsetNewAndUntriagedSummary{}, skerr.Wrapf(err, "getting summary for ps %q in cl %q", psID, clid)
}
defer rows.Close()
rows.Next()
var ignore string // to get the union to work, I need to label the values (otherwise they get
// deduplicated), but I don't actually need to read this in, since it's always in the provided
// order.
rv := PatchsetNewAndUntriagedSummary{}
if err := rows.Scan(&rv.NewImages, &ignore); err != nil {
return PatchsetNewAndUntriagedSummary{}, skerr.Wrap(err)
}
rows.Next()
if err := rows.Scan(&rv.NewUntriagedImages, &ignore); err != nil {
return PatchsetNewAndUntriagedSummary{}, skerr.Wrap(err)
}
rows.Next()
if err := rows.Scan(&rv.TotalUntriagedImages, &ignore); err != nil {
return PatchsetNewAndUntriagedSummary{}, skerr.Wrap(err)
}
return rv, nil
}
// ChangelistLastUpdated implements the API interface.
func (s *Impl) ChangelistLastUpdated(ctx context.Context, crs, clID string) (time.Time, error) {
ctx, span := trace.StartSpan(ctx, "search2_ChangelistLastUpdated")
defer span.End()
qCLID := sql.Qualify(crs, clID)
var updatedTS time.Time
row := s.db.QueryRow(ctx, `SELECT last_ingested_data
FROM Changelists AS OF SYSTEM TIME '-0.1s' WHERE changelist_id = $1`, qCLID)
if err := row.Scan(&updatedTS); err != nil {
return time.Time{}, skerr.Wrapf(err, "Getting last updated ts for cl %q", qCLID)
}
return updatedTS.UTC(), nil
}
// Make sure Impl implements the API interface.
var _ API = (*Impl)(nil)