package worker
import (
"bytes"
"context"
"encoding/hex"
"encoding/json"
"fmt"
"image"
"image/png"
"time"
lru "github.com/hashicorp/golang-lru"
"github.com/jackc/pgtype"
"github.com/jackc/pgx/v4"
"github.com/jackc/pgx/v4/pgxpool"
ttlcache "github.com/patrickmn/go-cache"
"go.opencensus.io/trace"
"go.skia.org/infra/go/metrics2"
"go.skia.org/infra/go/now"
"go.skia.org/infra/go/paramtools"
"go.skia.org/infra/go/skerr"
"go.skia.org/infra/go/sklog"
"go.skia.org/infra/go/sql/sqlutil"
"go.skia.org/infra/go/util"
"go.skia.org/infra/golden/go/diff"
"go.skia.org/infra/golden/go/sql"
"go.skia.org/infra/golden/go/sql/schema"
"go.skia.org/infra/golden/go/types"
)
const (
// For tests with fewer than the computeTotalGridCutoff number of digests, we calculate all
// diffs for the test. Above this number, we try to be more clever about calculating a subset
// of useful diffs without doing too much work for tests with very flaky traces.
// This number was chosen based on experimentation with the Skia instance (see also
// getCommonAndRecentDigests).
computeTotalGridCutoff = 300
// Downloading and decoding images appears to be a bottleneck for diffing. We spin up a small
// cache for each diff message to help retain those images while we calculate the diffs.
// This number was chosen to be above the number of digests returned by
// getCommonAndRecentDigests, because downloading and decoding images is the most
// computationally expensive part of the whole process.
decodedImageCacheSize = 1000
// In an effort to prevent spamming the ProblemImages database, we skip known bad images for
// a period of time. This time is controlled by badImageCooldown and a TTL cache.
badImageCooldown = time.Minute
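// diffingRoutines is the number of goroutines used to compute diffs in parallel.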
diffingRoutines = 4
// This batch size corresponds to tens of seconds worth of computation. If we are
// interrupted, we hope not to lose more than this amount of work.
reportingBatchSize = 25
)
// ImageSource is an abstraction for loading images. Whether images are stored in GCS, on a
// file system, or elsewhere, they should be provided via this mechanism.
type ImageSource interface {
// GetImage returns the raw bytes of an image with the corresponding Digest.
GetImage(ctx context.Context, digest types.Digest) ([]byte, error)
}
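// As an illustration of the contract, a minimal in-memory ImageSource could look like the
// hypothetical sketch below. Production deployments supply a real implementation (e.g. one
// backed by GCS); the type and error text here are made up for illustration only.
//
//	type memImageSource struct {
//		images map[types.Digest][]byte
//	}
//
//	func (m *memImageSource) GetImage(_ context.Context, digest types.Digest) ([]byte, error) {
//		if b, ok := m.images[digest]; ok {
//			return b, nil
//		}
//		return nil, skerr.Fmt("unknown digest %s", digest)
//	}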
type WorkerImpl struct {
db *pgxpool.Pool
imageSource ImageSource
badDigestsCache *ttlcache.Cache
windowSize int
inputDigestsSummary metrics2.Float64SummaryMetric
digestsOfInterestSummary metrics2.Float64SummaryMetric
metricsCalculatedCounter metrics2.Counter
}
// New returns a diff worker which uses the provided ImageSource.
func New(db *pgxpool.Pool, src ImageSource, windowSize int) *WorkerImpl {
return &WorkerImpl{
db: db,
imageSource: src,
windowSize: windowSize,
badDigestsCache: ttlcache.New(badImageCooldown, 2*badImageCooldown),
metricsCalculatedCounter: metrics2.GetCounter("diffcalculator_metricscalculated"),
inputDigestsSummary: metrics2.GetFloat64SummaryMetric("diffcalculator_inputdigests"),
digestsOfInterestSummary: metrics2.GetFloat64SummaryMetric("diffcalculator_digestsofinterest"),
}
}
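// Wiring the worker up might look like the following sketch (the pool, image source, and
// window size here are hypothetical placeholders):
//
//	w := New(pool, gcsImageSource, 256 /* windowSize */)
//	grouping := paramtools.Params{
//		types.CorpusField:     "gm",
//		types.PrimaryKeyField: "my_test",
//	}
//	if err := w.CalculateDiffs(ctx, grouping, nil); err != nil {
//		sklog.Fatalf("calculating diffs: %s", err)
//	}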
// CalculateDiffs calculates the diffs for the given grouping. It computes all of the diffs
// if there are only "a few" digests; otherwise, it computes a subset of them, taking into
// account recency and triage status.
func (w *WorkerImpl) CalculateDiffs(ctx context.Context, grouping paramtools.Params, additional []types.Digest) error {
ctx, span := trace.StartSpan(ctx, "worker2_CalculateDiffs")
if span.IsRecordingEvents() {
addMetadata(span, grouping, len(additional))
}
defer span.End()
startingTile, endingTile, err := w.getTileBounds(ctx)
if err != nil {
return skerr.Wrapf(err, "get starting tile")
}
allDigests, err := w.getAllExisting(ctx, startingTile, endingTile, grouping)
if err != nil {
return skerr.Wrap(err)
}
total := len(allDigests) + len(additional)
w.inputDigestsSummary.Observe(float64(total))
inputDigests := convertToDigestBytes(additional)
if total > computeTotalGridCutoff {
// If there are too many digests, we perform a somewhat expensive operation of looking at
// the digests produced by all traces to find a smaller subset of images that we should
// use to compute diffs for. We don't want to do this all the time because we expect
// a small percentage of groupings (i.e. tests) to have many digests.
return skerr.Wrap(w.calculateDiffSubset(ctx, grouping, inputDigests, startingTile))
}
allDigests = append(allDigests, inputDigests...)
return skerr.Wrap(w.calculateAllDiffs(ctx, allDigests))
}
// addMetadata adds some attributes to the span so we can tell how much work it was doing
// when we look at the traces and the performance.
func addMetadata(span *trace.Span, grouping paramtools.Params, leftDigestCount int) {
groupingStr, _ := json.Marshal(grouping)
span.AddAttributes(
trace.StringAttribute("grouping", string(groupingStr)),
trace.Int64Attribute("additional_digests", int64(leftDigestCount)))
}
// convertToDigestBytes converts a slice of hex-encoded digests to bytes (the native type in
// the SQL DB). If any are invalid, they are ignored.
func convertToDigestBytes(digests []types.Digest) []schema.DigestBytes {
rv := make([]schema.DigestBytes, 0, len(digests))
for _, d := range digests {
b, err := sql.DigestToBytes(d)
if err != nil {
sklog.Warningf("Invalid digest seen %q: %s", d, err)
continue
}
rv = append(rv, b)
}
return rv
}
// getTileBounds returns the tile id corresponding to the commit at the beginning of the
// window, as well as the most recent tile id (so we get enough data to do our comparisons).
func (w *WorkerImpl) getTileBounds(ctx context.Context) (schema.TileID, schema.TileID, error) {
if w.windowSize <= 0 {
return 0, 0, nil
}
const statement = `WITH
RecentCommits AS (
SELECT tile_id, commit_id FROM CommitsWithData
AS OF SYSTEM TIME '-0.1s'
ORDER BY commit_id DESC LIMIT $1
)
SELECT MIN(tile_id), MAX(tile_id) FROM RecentCommits
AS OF SYSTEM TIME '-0.1s'
`
row := w.db.QueryRow(ctx, statement, w.windowSize)
var lc pgtype.Int4
var mc pgtype.Int4
if err := row.Scan(&lc, &mc); err != nil {
if err == pgx.ErrNoRows {
return 0, 0, nil // not enough commits seen, so start at tile 0.
}
return 0, 0, skerr.Wrapf(err, "getting latest commit")
}
if lc.Status == pgtype.Null || mc.Status == pgtype.Null {
// There are no commits with data, so start at tile 0.
return 0, 0, nil
}
return schema.TileID(lc.Int), schema.TileID(mc.Int), nil
}
// getAllExisting returns the unique digests that were seen on the primary branch for a given
// grouping within the given range of tiles.
func (w *WorkerImpl) getAllExisting(ctx context.Context, beginTile, endTile schema.TileID, grouping paramtools.Params) ([]schema.DigestBytes, error) {
if w.windowSize <= 0 {
return nil, nil
}
ctx, span := trace.StartSpan(ctx, "getAllExisting")
defer span.End()
_, groupingID := sql.SerializeMap(grouping)
tracesForGroup, err := w.getTracesForGroup(ctx, groupingID)
if err != nil {
return nil, skerr.Wrap(err)
}
tilesInRange := make([]schema.TileID, 0, endTile-beginTile+1)
for i := beginTile; i <= endTile; i++ {
tilesInRange = append(tilesInRange, i)
}
// We tried doing "fetch traces for group" and "fetch TiledTraceDigests" all in one query.
// However, that took a long while in the large Skia repo because the index was not used
// effectively. Our index is on tile_id + trace_id, but CockroachDB was having to
// effectively fetch all rows that came after the starting tile id, even though it
// only needed < 0.1% of them (assuming 1000 groupings). The smarter way to do this is to
// specify exactly which tiles we need (not just "greater than this tile") and exactly which
// traces we need. This lets CockroachDB fetch only the portions of the index it needs to.
// We saw this be about 10x more effective than the previous query with a join.
const statement = `
SELECT DISTINCT digest FROM TiledTraceDigests
AS OF SYSTEM TIME '-0.1s'
WHERE tile_id = ANY($1) AND trace_id = ANY($2)`
rows, err := w.db.Query(ctx, statement, tilesInRange, tracesForGroup)
if err != nil {
return nil, skerr.Wrapf(err, "fetching digests")
}
defer rows.Close()
var rv []schema.DigestBytes
for rows.Next() {
var d schema.DigestBytes
if err := rows.Scan(&d); err != nil {
return nil, skerr.Wrap(err)
}
rv = append(rv, d)
}
return rv, nil
}
// getTracesForGroup returns all the traces that are a part of the specified grouping.
func (w *WorkerImpl) getTracesForGroup(ctx context.Context, id schema.GroupingID) ([]schema.TraceID, error) {
ctx, span := trace.StartSpan(ctx, "getTracesForGroup")
defer span.End()
const statement = `SELECT trace_id FROM Traces
AS OF SYSTEM TIME '-0.1s'
WHERE grouping_id = $1`
rows, err := w.db.Query(ctx, statement, id)
if err != nil {
return nil, skerr.Wrapf(err, "fetching trace ids")
}
defer rows.Close()
var rv []schema.TraceID
for rows.Next() {
var t schema.TraceID
if err := rows.Scan(&t); err != nil {
return nil, skerr.Wrap(err)
}
rv = append(rv, t)
}
return rv, nil
}
// calculateAllDiffs calculates all diffs between each digest in the slice and all other digests.
// If there are duplicates in the given slice, they will be removed and not double-calculated.
func (w *WorkerImpl) calculateAllDiffs(ctx context.Context, digests []schema.DigestBytes) error {
if len(digests) == 0 {
return nil
}
ctx, span := trace.StartSpan(ctx, "calculateAllDiffs")
defer span.End()
span.AddAttributes(trace.Int64Attribute("num_digests", int64(len(digests))))
missingWork, err := w.getMissingDiffs(ctx, digests)
if err != nil {
return skerr.Wrap(err)
}
if len(missingWork) == 0 {
sklog.Infof("All diffs are already calculated")
return nil
}
span.AddAttributes(trace.Int64Attribute("num_diffs", int64(len(missingWork))))
if err := w.computeDiffsInParallel(ctx, missingWork); err != nil {
return skerr.Wrapf(err, "calculating %d diffs for %d digests", len(missingWork), len(digests))
}
return nil
}
// getMissingDiffs enumerates the half-square of diffs, in which each digest is compared to
// every other digest. It then returns those digestPairs whose diffs have not already been
// calculated.
func (w *WorkerImpl) getMissingDiffs(ctx context.Context, digests []schema.DigestBytes) ([]digestPair, error) {
ctx, span := trace.StartSpan(ctx, "getMissingDiffs")
span.AddAttributes(trace.Int64Attribute("num_digests", int64(len(digests))))
defer span.End()
possibleWork := map[digestPair]bool{}
for i := range digests {
left := types.Digest(hex.EncodeToString(digests[i]))
for j := i + 1; j < len(digests); j++ {
right := types.Digest(hex.EncodeToString(digests[j]))
if left == right {
continue
}
dp := newDigestPair(left, right)
possibleWork[dp] = true
}
}
span.AddAttributes(trace.Int64Attribute("possible_work_size", int64(len(possibleWork))))
const statement = `SELECT DISTINCT encode(left_digest, 'hex'), encode(right_digest, 'hex')
FROM DiffMetrics AS OF SYSTEM TIME '-0.1s'
WHERE left_digest = ANY($1) AND right_digest = ANY($1)`
rows, err := w.db.Query(ctx, statement, digests)
if err != nil {
return nil, skerr.Wrap(err)
}
defer rows.Close()
for rows.Next() {
if err := ctx.Err(); err != nil {
return nil, skerr.Wrapf(err, "context error")
}
var left, right types.Digest
if err := rows.Scan(&left, &right); err != nil {
return nil, skerr.Wrap(err)
}
dp := newDigestPair(left, right)
possibleWork[dp] = false
}
var toCalculate []digestPair
for dp, needsComputing := range possibleWork {
if needsComputing {
toCalculate = append(toCalculate, dp)
}
}
return toCalculate, nil
}
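// calculateDiffSubset computes diffs for a subset of digests of interest for the grouping:
// the provided digests, plus all triaged digests seen since the starting tile and the most
// common and most recent digests on the grouping's traces.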
func (w *WorkerImpl) calculateDiffSubset(ctx context.Context, grouping paramtools.Params, digests []schema.DigestBytes, startingTile schema.TileID) error {
ctx, span := trace.StartSpan(ctx, "calculateDiffSubset")
defer span.End()
span.AddAttributes(trace.Int64Attribute("starting_digests", int64(len(digests))))
allTriaged, err := w.getTriagedDigests(ctx, grouping, startingTile)
if err != nil {
return skerr.Wrap(err)
}
span.AddAttributes(trace.Int64Attribute("triaged_digests", int64(len(allTriaged))))
commonAndRecentDigests, err := w.getCommonAndRecentDigests(ctx, grouping)
if err != nil {
return skerr.Wrap(err)
}
span.AddAttributes(trace.Int64Attribute("common_and_recent_digests", int64(len(commonAndRecentDigests))))
digests = append(digests, allTriaged...)
digests = append(digests, commonAndRecentDigests...)
// Note: the digests slice could have duplicates, but that won't inflate the work we need to
// compute.
w.digestsOfInterestSummary.Observe(float64(len(digests)))
sklog.Infof("Got around %d digests of interest for grouping %#v", len(digests), grouping)
return skerr.Wrapf(w.calculateAllDiffs(ctx, digests), "calculating diffs for %d digests in grouping %#v", len(digests), grouping)
}
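// computeDiffsInParallel splits the given work across diffingRoutines goroutines. Each
// goroutine computes its diffs and writes the resulting metrics to the SQL database in
// batches of roughly reportingBatchSize.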
func (w *WorkerImpl) computeDiffsInParallel(ctx context.Context, work []digestPair) error {
ctx, span := trace.StartSpan(ctx, "computeDiffsInParallel")
span.AddAttributes(trace.Int64Attribute("num_diffs", int64(len(work))))
defer span.End()
sklog.Infof("Calculating %d new diffs", len(work))
imgCache, err := lru.New(decodedImageCacheSize)
if err != nil {
return skerr.Wrap(err)
}
ctx = addImgCache(ctx, imgCache)
defer func() {
imgCache.Purge() // Make it easier to GC anything left in the cache.
}()
chunkSize := len(work)/diffingRoutines + 1 // add 1 so integer division cannot produce a chunk size of 0
err = util.ChunkIterParallel(ctx, len(work), chunkSize, func(ctx context.Context, startIdx int, endIdx int) error {
batch := work[startIdx:endIdx]
metricsBuffer := make([]schema.DiffMetricRow, 0, reportingBatchSize)
for _, pair := range batch {
if err := ctx.Err(); err != nil {
return skerr.Wrapf(err, "context error")
}
// If either image is known to be bad, skip this work
_, bad1 := w.badDigestsCache.Get(string(pair.left))
_, bad2 := w.badDigestsCache.Get(string(pair.right))
if bad1 || bad2 {
continue
}
nm, iErr := w.diff(ctx, pair.left, pair.right)
// If there is an error diffing, it is because we couldn't download or decode
// one of the images. If so, we skip that entry and report it, before moving on.
if iErr != nil {
if err := ctx.Err(); err != nil {
return skerr.Wrap(err)
}
// Add the bad image to the ttlcache so we skip it for a short while.
w.badDigestsCache.Set(string(iErr.digest), true, ttlcache.DefaultExpiration)
if err := w.reportProblemImage(ctx, iErr); err != nil {
return skerr.Wrap(err)
}
continue
}
metricsBuffer = append(metricsBuffer, nm)
if len(metricsBuffer) >= reportingBatchSize { // flush at the batch size, staying within the buffer's capacity
if err := w.writeMetrics(ctx, metricsBuffer); err != nil {
return skerr.Wrap(err)
}
w.metricsCalculatedCounter.Inc(int64(len(metricsBuffer)))
metricsBuffer = metricsBuffer[:0] // reset buffer
}
}
if err := w.writeMetrics(ctx, metricsBuffer); err != nil {
return skerr.Wrap(err)
}
w.metricsCalculatedCounter.Inc(int64(len(metricsBuffer)))
return nil
})
return skerr.Wrap(err)
}
// diff calculates the difference between the two images with the provided digests and returns
// it in a format that can be inserted into the SQL database. If there is an error downloading
// or decoding a digest, an error is returned along with the problematic digest.
func (w *WorkerImpl) diff(ctx context.Context, left, right types.Digest) (schema.DiffMetricRow, *imgError) {
ctx, span := trace.StartSpan(ctx, "diff")
defer span.End()
lb, err := sql.DigestToBytes(left)
if err != nil {
return schema.DiffMetricRow{}, &imgError{digest: left, err: skerr.Wrap(err)}
}
rb, err := sql.DigestToBytes(right)
if err != nil {
return schema.DiffMetricRow{}, &imgError{digest: right, err: skerr.Wrap(err)}
}
leftImg, err := w.getDecodedImage(ctx, left)
if err != nil {
return schema.DiffMetricRow{}, &imgError{digest: left, err: skerr.Wrap(err)}
}
rightImg, err := w.getDecodedImage(ctx, right)
if err != nil {
return schema.DiffMetricRow{}, &imgError{digest: right, err: skerr.Wrap(err)}
}
m := diff.ComputeDiffMetrics(leftImg, rightImg)
return schema.DiffMetricRow{
LeftDigest: lb,
RightDigest: rb,
NumPixelsDiff: m.NumDiffPixels,
PercentPixelsDiff: m.PixelDiffPercent,
MaxRGBADiffs: m.MaxRGBADiffs,
MaxChannelDiff: max(m.MaxRGBADiffs),
CombinedMetric: m.CombinedMetric,
DimensionsDiffer: m.DimDiffer,
Timestamp: now.Now(ctx),
}, nil
}
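// max returns the largest of the four channel diffs.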
func max(diffs [4]int) int {
m := diffs[0]
for _, d := range diffs {
if d > m {
m = d
}
}
return m
}
// getDecodedImage retrieves and decodes the given image. If the image is cached, this
// function will return the cached version. We choose to cache the decoded image (and not
// just the downloaded image) because the decoding tends to take 3-5x longer than downloading.
func (w *WorkerImpl) getDecodedImage(ctx context.Context, digest types.Digest) (*image.NRGBA, error) {
ctx, span := trace.StartSpan(ctx, "getDecodedImage")
defer span.End()
cache := getImgCache(ctx)
if cache != nil {
if cachedImg, ok := cache.Get(string(digest)); ok {
return cachedImg.(*image.NRGBA), nil
}
}
b, err := w.imageSource.GetImage(ctx, digest)
if err != nil {
return nil, skerr.Wrapf(err, "getting image with digest %s", digest)
}
img, err := decode(ctx, b)
if err != nil {
return nil, skerr.Wrap(err)
}
// In memory, the image takes up 4 bytes per pixel.
s := img.Bounds().Size()
sizeInBytes := int64(s.X * s.Y * 4)
span.AddAttributes(trace.Int64Attribute("size_in_bytes", sizeInBytes))
if cache != nil {
cache.Add(string(digest), img)
}
return img, nil
}
// writeMetrics writes two copies of the provided metrics (one for left-right and one for
// right-left) to the SQL database.
func (w *WorkerImpl) writeMetrics(ctx context.Context, metrics []schema.DiffMetricRow) error {
if len(metrics) == 0 {
return nil
}
ctx, span := trace.StartSpan(ctx, "writeMetrics")
defer span.End()
const baseStatement = `UPSERT INTO DiffMetrics
(left_digest, right_digest, num_pixels_diff, percent_pixels_diff, max_rgba_diffs,
max_channel_diff, combined_metric, dimensions_differ, ts) VALUES `
const valuesPerRow = 9
arguments := make([]interface{}, 0, len(metrics)*valuesPerRow*2)
count := 0
for _, r := range metrics {
count += 2
rgba := make([]int, 4)
copy(rgba, r.MaxRGBADiffs[:])
arguments = append(arguments, r.LeftDigest, r.RightDigest, r.NumPixelsDiff, r.PercentPixelsDiff, rgba,
r.MaxChannelDiff, r.CombinedMetric, r.DimensionsDiffer, r.Timestamp)
arguments = append(arguments, r.RightDigest, r.LeftDigest, r.NumPixelsDiff, r.PercentPixelsDiff, rgba,
r.MaxChannelDiff, r.CombinedMetric, r.DimensionsDiffer, r.Timestamp)
}
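// ValuesPlaceholders(9, count) expands to one parenthesized group of nine placeholders per
// row, e.g. "($1,$2,...,$9),($10,$11,...,$18),...", matching the arguments appended above.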
vp := sqlutil.ValuesPlaceholders(valuesPerRow, count)
_, err := w.db.Exec(ctx, baseStatement+vp, arguments...)
if err != nil {
return skerr.Wrapf(err, "writing %d metrics to SQL", len(metrics))
}
return nil
}
// reportProblemImage creates or updates a row in the ProblemImages table for the given digest.
func (w *WorkerImpl) reportProblemImage(ctx context.Context, imgErr *imgError) error {
ctx, span := trace.StartSpan(ctx, "reportProblemImage")
defer span.End()
sklog.Warningf("Reporting problem with image %s: %s", imgErr.digest, imgErr.err)
const statement = `
INSERT INTO ProblemImages (digest, num_errors, latest_error, error_ts)
VALUES ($1, $2, $3, $4)
ON CONFLICT (digest)
DO UPDATE SET (num_errors, latest_error, error_ts) =
(ProblemImages.num_errors + 1, $3, $4)`
_, err := w.db.Exec(ctx, statement, imgErr.digest, 1, imgErr.err.Error(), now.Now(ctx))
if err != nil {
return skerr.Wrapf(err, "writing to ProblemImages")
}
return nil
}
// getTriagedDigests returns all triaged digests (positive and negative) for the given grouping
// seen in the given tile or later.
func (w *WorkerImpl) getTriagedDigests(ctx context.Context, grouping paramtools.Params, startingTile schema.TileID) ([]schema.DigestBytes, error) {
ctx, span := trace.StartSpan(ctx, "getTriagedDigests")
defer span.End()
const statement = `WITH
TracesMatchingGrouping AS (
SELECT trace_id FROM Traces WHERE grouping_id = $1
),
RecentDigestsMatchingGrouping AS (
SELECT DISTINCT digest FROM TiledTraceDigests
JOIN TracesMatchingGrouping on TiledTraceDigests.trace_id = TracesMatchingGrouping.trace_id
WHERE TiledTraceDigests.tile_id >= $2
)
SELECT Expectations.digest FROM Expectations
JOIN RecentDigestsMatchingGrouping ON RecentDigestsMatchingGrouping.digest = Expectations.digest
AND Expectations.grouping_id = $1
WHERE Expectations.label = 'p' OR Expectations.label = 'n'`
_, groupingID := sql.SerializeMap(grouping)
rows, err := w.db.Query(ctx, statement, groupingID, startingTile)
if err != nil {
return nil, skerr.Wrap(err)
}
defer rows.Close()
var rv []schema.DigestBytes
for rows.Next() {
var b schema.DigestBytes
if err := rows.Scan(&b); err != nil {
return nil, skerr.Wrap(err)
}
rv = append(rv, b)
}
return rv, nil
}
// getCommonAndRecentDigests returns a set of digests seen on traces for the given grouping that
// are either commonly seen in the current window or recently seen. This approach prevents us from
// calculating too many diffs that "aren't worth it", commonly when there are several flaky traces
// in a grouping.
func (w *WorkerImpl) getCommonAndRecentDigests(ctx context.Context, grouping paramtools.Params) ([]schema.DigestBytes, error) {
ctx, span := trace.StartSpan(ctx, "getCommonAndRecentDigests")
defer span.End()
// Find the first commit in the target range of commits.
row := w.db.QueryRow(ctx, `WITH
RecentCommits AS (
SELECT commit_id FROM CommitsWithData
ORDER BY commit_id DESC LIMIT $1
)
SELECT commit_id FROM RecentCommits
ORDER BY commit_id ASC LIMIT 1`, w.windowSize)
var firstCommitID string
if err := row.Scan(&firstCommitID); err != nil {
if err == pgx.ErrNoRows {
return nil, nil
}
return nil, skerr.Wrap(err)
}
// Find all the traces for the target grouping.
traceIDs, err := w.findTraceIDsForGrouping(ctx, grouping)
if err != nil {
return nil, skerr.Wrap(err)
}
// We used to join the Traces and TraceValues tables, but that turned out to be slow. Filtering
// TraceValues using a list of trace IDs can be faster when the list of trace IDs is large.
// For example, lovisolo@ observed that for a grouping with ~1000 traces, the below query takes
// ~40s when joining the two tables, or ~6s when using "WHERE trace_id IN (id1, id2, ...)".
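// For example, with three trace IDs the generated clause reads (sizes hypothetical):
//   WHERE trace_id IN (($1),($2),($3)) AND commit_id >= $4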
traceIDPlaceholders := sqlutil.ValuesPlaceholders(1 /* =valuesPerRow */, len(traceIDs))
statement := `WITH
DigestsCountsAndMostRecent AS (
SELECT digest, count(*) as occurrences, max(commit_id) as recent FROM TraceValues
WHERE trace_id IN (` + traceIDPlaceholders + `) AND commit_id >= ` + fmt.Sprintf("$%d", len(traceIDs)+1) + `
GROUP BY digest
),
MostCommon AS (
-- The number 50 here was chosen after some experimentation with the Skia instance.
SELECT digest FROM DigestsCountsAndMostRecent ORDER BY occurrences DESC, recent DESC LIMIT 50
),
MostRecent AS (
-- The number 300 here was chosen after some experimentation with the Skia instance. At the
-- time of writing, a typical grouping had ~1000 traces associated with it. A small percentage
-- of these traces might be flaky. 300 seems like a good balance between making sure we can
-- compute some digests for each trace, without losing to the quadratic cost of computing too
-- many digests.
SELECT digest FROM DigestsCountsAndMostRecent ORDER BY recent DESC, occurrences DESC LIMIT 300
)
-- The UNION operator will remove any duplicate digests
SELECT * FROM MostCommon
UNION
SELECT * FROM MostRecent`
// Build a list of arguments with the IDs of all traces of interest and the firstCommitID.
args := make([]interface{}, 0, len(traceIDs)+1)
for _, traceID := range traceIDs {
args = append(args, traceID)
}
args = append(args, firstCommitID)
// Execute the statement and return the result.
rows, err := w.db.Query(ctx, statement, args...)
if err != nil {
return nil, skerr.Wrap(err)
}
defer rows.Close()
var rv []schema.DigestBytes
for rows.Next() {
var b schema.DigestBytes
if err := rows.Scan(&b); err != nil {
return nil, skerr.Wrap(err)
}
rv = append(rv, b)
}
return rv, nil
}
// findTraceIDsForGrouping returns the trace IDs corresponding to the given grouping.
func (w *WorkerImpl) findTraceIDsForGrouping(ctx context.Context, grouping paramtools.Params) ([]schema.TraceID, error) {
ctx, span := trace.StartSpan(ctx, "findTraceIDsForGrouping")
defer span.End()
_, groupingID := sql.SerializeMap(grouping)
rows, err := w.db.Query(ctx, "SELECT trace_id FROM Traces WHERE grouping_id = $1", groupingID)
if err != nil {
return nil, skerr.Wrap(err)
}
defer rows.Close()
var traceIDs []schema.TraceID
for rows.Next() {
var tid schema.TraceID
if err := rows.Scan(&tid); err != nil {
return nil, skerr.Wrap(err)
}
traceIDs = append(traceIDs, tid)
}
return traceIDs, nil
}
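// digestPair represents a single pairing of digests whose diff we may need to compute.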
type digestPair struct {
left types.Digest
right types.Digest
}
// newDigestPair returns a digestPair in a "canonical" order, such that left < right. This
// avoids effective duplicates (since comparing left to right is the same as comparing right
// to left).
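// For example, newDigestPair("bbb...", "aaa...") and newDigestPair("aaa...", "bbb...") both
// yield digestPair{left: "aaa...", right: "bbb..."} (digest values shortened for illustration).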
func newDigestPair(one, two types.Digest) digestPair {
if one < two {
return digestPair{left: one, right: two}
}
return digestPair{left: two, right: one}
}
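// imgError pairs an error with the digest of the image that caused it.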
type imgError struct {
err error
digest types.Digest
}
type contextType string
const imgCacheContextKey contextType = "imgCache"
// addImgCache adds a cache of decoded images to the context, so we can use it in leaf
// functions more easily.
func addImgCache(ctx context.Context, cache *lru.Cache) context.Context {
return context.WithValue(ctx, imgCacheContextKey, cache)
}
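// getImgCache returns the decoded-image cache from the context, or nil if none was attached.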
func getImgCache(ctx context.Context) *lru.Cache {
c, ok := ctx.Value(imgCacheContextKey).(*lru.Cache)
if !ok {
return nil
}
return c
}
// decode decodes the provided bytes as a PNG and returns the decoded image as an NRGBA image.
func decode(ctx context.Context, b []byte) (*image.NRGBA, error) {
ctx, span := trace.StartSpan(ctx, "decode")
defer span.End()
im, err := png.Decode(bytes.NewReader(b))
if err != nil {
return nil, skerr.Wrap(err)
}
return diff.GetNRGBA(im), nil
}
// Make sure WorkerImpl fulfills the diff.Calculator interface.
var _ diff.Calculator = (*WorkerImpl)(nil)