blob: 4f02878a4e8eee16f904513750aac7521e97dcce [file] [log] [blame]
package web
import (
lru ""
ttlcache ""
search_query ""
const (
// pageSize is the default page size used for pagination.
pageSize = 20
// maxPageSize is the maximum page size used for pagination.
maxPageSize = 100
// These params limit how anonymous (not logged-in) users can hit various endpoints.
// We have two buckets of requests - cheap and expensive. Expensive stuff hits a database
// or similar, where as cheap stuff is cached. These limits are shared by *all* endpoints
// in a given bucket. See for more.
maxAnonQPSExpensive = rate.Limit(0.01)
maxAnonBurstExpensive = 50
maxAnonQPSCheap = rate.Limit(5.0)
maxAnonBurstCheap = 50
// Special settings for RPCs serving the gerrit plugin. See for more.
maxAnonQPSGerritPlugin = rate.Limit(200.0)
maxAnonBurstGerritPlugin = 1000
changelistSummaryCacheSize = 10000
// RPCCallCounterMetric is the metric that should be used when counting how many times a given
// RPC route is called from clients.
RPCCallCounterMetric = "gold_rpc_call_counter"
baselineCachePrimaryBranchEntryTTL = 10 * time.Second
baselineCacheSecondaryBranchEntryTTL = time.Minute
baselineCacheCleanupInterval = 10 * time.Minute
// validateFields selects which HandlersConfig fields NewHandlers requires to be set.
type validateFields int

const (
	// FullFrontEnd means all fields should be set
	FullFrontEnd validateFields = iota
	// BaselineSubset means just the fields needed for BaselineV2Response Server should be set.
	BaselineSubset
)
// HandlersConfig holds the environment needed by the various http handler functions.
type HandlersConfig struct {
DB *pgxpool.Pool
GCSClient storage.GCSClient
IgnoreStore ignore.Store
ReviewSystems []clstore.ReviewSystem
Search2API search.API
WindowSize int
GroupingParamKeysByCorpus map[string][]string
// Handlers represents all the handlers (e.g. JSON endpoints) of Gold.
// It should be created by clients using NewHandlers.
type Handlers struct {
anonymousExpensiveQuota *rate.Limiter
anonymousCheapQuota *rate.Limiter
anonymousGerritQuota *rate.Limiter
clSummaryCache *lru.Cache
baselineCache *ttlcache.Cache
statusCache frontend.GUIStatus
statusCacheMutex sync.RWMutex
ignoredTracesCache []ignoredTrace
ignoredTracesCacheMutex sync.RWMutex
knownHashesMutex sync.RWMutex
knownHashesCache string
alogin alogin.Login
// NewHandlers returns a new instance of Handlers.
func NewHandlers(conf HandlersConfig, val validateFields, alogin alogin.Login) (*Handlers, error) {
// These fields are required by all types.
if conf.DB == nil {
return nil, skerr.Fmt("Baseliner cannot be nil")
if conf.GCSClient == nil {
return nil, skerr.Fmt("GCSClient cannot be nil")
if val == FullFrontEnd {
if conf.IgnoreStore == nil {
return nil, skerr.Fmt("IgnoreStore cannot be nil")
if conf.Search2API == nil {
return nil, skerr.Fmt("Search2API cannot be nil")
clcache, err := lru.New(changelistSummaryCacheSize)
if err != nil {
return nil, skerr.Wrap(err)
return &Handlers{
HandlersConfig: conf,
anonymousExpensiveQuota: rate.NewLimiter(maxAnonQPSExpensive, maxAnonBurstExpensive),
anonymousCheapQuota: rate.NewLimiter(maxAnonQPSCheap, maxAnonBurstCheap),
anonymousGerritQuota: rate.NewLimiter(maxAnonQPSGerritPlugin, maxAnonBurstGerritPlugin),
clSummaryCache: clcache,
baselineCache: ttlcache.New(baselineCachePrimaryBranchEntryTTL, baselineCacheCleanupInterval),
alogin: alogin,
}, nil
// limitForAnonUsers blocks using the configured rate.Limiter for expensive queries.
func (wh *Handlers) limitForAnonUsers(r *http.Request) error {
if wh.alogin.LoggedInAs(r) != alogin.NotLoggedIn {
return nil
return wh.anonymousExpensiveQuota.Wait(r.Context())
// cheapLimitForAnonUsers blocks using the configured rate.Limiter for cheap queries.
func (wh *Handlers) cheapLimitForAnonUsers(r *http.Request) error {
if wh.alogin.LoggedInAs(r) != alogin.NotLoggedIn {
return nil
return wh.anonymousCheapQuota.Wait(r.Context())
// cheapLimitForGerritPlugin blocks using the configured rate.Limiter for queries for the
// Gerrit Plugin.
func (wh *Handlers) cheapLimitForGerritPlugin(r *http.Request) error {
if wh.alogin.LoggedInAs(r) != alogin.NotLoggedIn {
return nil
return wh.anonymousGerritQuota.Wait(r.Context())
// ByBlameHandler takes the response from the SQL backend's GetBlamesForUntriagedDigests and
// converts it into the same format that the legacy version (v1) produced.
func (wh *Handlers) ByBlameHandler(w http.ResponseWriter, r *http.Request) {
defer metrics2.FuncTimer().Stop()
if err := wh.limitForAnonUsers(r); err != nil {
httputils.ReportError(w, err, "Try again later", http.StatusInternalServerError)
ctx, span := trace.StartSpan(r.Context(), "web_ByBlameHandler", trace.WithSampler(trace.AlwaysSample()))
defer span.End()
// Extract the corpus from the query parameters.
corpus := ""
if v := r.FormValue("query"); v != "" {
if qp, err := url.ParseQuery(v); err != nil {
httputils.ReportError(w, err, "invalid input", http.StatusBadRequest)
} else if corpus = qp.Get(types.CorpusField); corpus == "" {
// If no corpus specified report an error.
http.Error(w, "did not receive value for corpus", http.StatusBadRequest)
} else {
// If no corpus specified report an error.
http.Error(w, "did not receive value for search query", http.StatusBadRequest)
summary, err := wh.Search2API.GetBlamesForUntriagedDigests(ctx, corpus)
if err != nil {
httputils.ReportError(w, err, "Could not compute blames", http.StatusInternalServerError)
result := frontend.ByBlameResponse{}
for _, sr := range summary.Ranges {
entry := frontend.ByBlameEntry{
GroupID: sr.CommitRange,
NDigests: sr.TotalUntriagedDigests,
NTests: len(sr.AffectedGroupings),
Commits: sr.Commits,
var groupings []frontend.TestRollup
for _, gr := range sr.AffectedGroupings {
groupings = append(groupings, frontend.TestRollup{
Grouping: gr.Grouping,
Num: gr.UntriagedDigests,
SampleDigest: gr.SampleDigest,
entry.AffectedTests = groupings
result.Data = append(result.Data, entry)
sendJSONResponse(w, result)
// ChangelistsHandler returns the list of code_review.Changelists that have
// uploaded results to Gold (via TryJobs).
func (wh *Handlers) ChangelistsHandler(w http.ResponseWriter, r *http.Request) {
defer metrics2.FuncTimer().Stop()
ctx, span := trace.StartSpan(r.Context(), "web_ChangelistsHandler", trace.WithSampler(trace.AlwaysSample()))
defer span.End()
if err := wh.cheapLimitForAnonUsers(r); err != nil {
httputils.ReportError(w, err, "Try again later", http.StatusInternalServerError)
values := r.URL.Query()
offset, size, err := httputils.PaginationParams(values, 0, pageSize, maxPageSize)
if err != nil {
httputils.ReportError(w, err, "Invalid pagination params.", http.StatusInternalServerError)
_, activeOnly := values["active"]
cls, pagination, err := wh.getIngestedChangelists2(ctx, offset, size, activeOnly)
if err != nil {
httputils.ReportError(w, err, "Retrieving changelists results failed.", http.StatusInternalServerError)
response := frontend.ChangelistsResponse{
Changelists: cls,
ResponsePagination: pagination,
sendJSONResponse(w, response)
func (wh *Handlers) getIngestedChangelists2(ctx context.Context, offset, size int, activeOnly bool) ([]frontend.Changelist, httputils.ResponsePagination, error) {
ctx, span := trace.StartSpan(ctx, "web_getIngestedChangelists2")
defer span.End()
statement := `SELECT changelist_id, system, status, owner_email, subject, last_ingested_data
FROM Changelists AS OF SYSTEM TIME '-0.1s'`
if activeOnly {
statement += " WHERE status = 'open'"
} else {
// This lets us use the same statusIngestedIndex
statement += " WHERE status = ANY('open', 'landed', 'abandoned')"
statement += ` ORDER BY last_ingested_data DESC OFFSET $1 LIMIT $2`
rows, err := wh.DB.Query(ctx, statement, offset, size)
if err != nil {
return nil, httputils.ResponsePagination{}, skerr.Wrap(err)
defer rows.Close()
var rv []frontend.Changelist
for rows.Next() {
var cl frontend.Changelist
var qCLID string
if err := rows.Scan(&qCLID, &cl.System, &cl.Status, &cl.Owner, &cl.Subject, &cl.Updated); err != nil {
return nil, httputils.ResponsePagination{}, skerr.Wrap(err)
cl.Updated = cl.Updated.UTC()
cl.SystemID = sql.Unqualify(qCLID)
urlTempl := ""
for _, system := range wh.ReviewSystems {
if system.ID == cl.System {
urlTempl = system.URLTemplate
cl.URL = strings.Replace(urlTempl, "%s", cl.SystemID, 1)
rv = append(rv, cl)
pagination := httputils.ResponsePagination{
Offset: offset,
Size: size,
Total: clstore.CountMany, // exact count not important for most day-to-day work.
return rv, pagination, nil
// A list of CI systems we support. So far, the mapping of task ID to link is project agnostic. If
// that stops being the case, then we'll need to supply this mapping on a per-instance basis.
//
// Keys are CI system names; values are URL templates in which "%s" is replaced by the task ID.
// A system missing from this map is treated as unrecognized by getPatchsetsAndTryjobs.
// NOTE(review): every template is the empty string here, which produces empty tryjob URLs -
// confirm the intended URL templates.
var cisTemplates = map[string]string{
	"cirrus":               "",
	"buildbucket":          "",
	"buildbucket-internal": "",
}
// PatchsetsAndTryjobsForCL2 returns a summary of the data we have collected
// for a given Changelist, specifically any TryJobs that have uploaded data
// to Gold belonging to various patchsets in it.
func (wh *Handlers) PatchsetsAndTryjobsForCL2(w http.ResponseWriter, r *http.Request) {
ctx, span := trace.StartSpan(r.Context(), "web_PatchsetsAndTryjobsForCL2", trace.WithSampler(trace.AlwaysSample()))
defer span.End()
if err := wh.cheapLimitForAnonUsers(r); err != nil {
httputils.ReportError(w, err, "Try again later", http.StatusInternalServerError)
clID := chi.URLParam(r, "id")
if clID == "" {
http.Error(w, "Must specify 'id' of Changelist.", http.StatusBadRequest)
crs := chi.URLParam(r, "system")
if crs == "" {
http.Error(w, "Must specify 'system' of Changelist.", http.StatusBadRequest)
rv, err := wh.getPatchsetsAndTryjobs(ctx, crs, clID)
if err != nil {
httputils.ReportError(w, err, "could not retrieve data for the specified CL.", http.StatusInternalServerError)
sendJSONResponse(w, rv)
// getPatchsetsAndTryjobs returns a summary of the patchsets and tryjobs that belong to a given
// CL.
func (wh *Handlers) getPatchsetsAndTryjobs(ctx context.Context, crs, clID string) (frontend.ChangelistSummary, error) {
ctx, span := trace.StartSpan(ctx, "getPatchsetsAndTryjobs")
defer span.End()
system, ok := wh.getCodeReviewSystem(crs)
if !ok {
return frontend.ChangelistSummary{}, skerr.Fmt("Invalid Code Review System %q", crs)
qCLID := sql.Qualify(crs, clID)
row := wh.DB.QueryRow(ctx, `SELECT status, owner_email, subject, last_ingested_data FROM Changelists
WHERE changelist_id = $1`, qCLID)
var cl frontend.Changelist
if err := row.Scan(&cl.Status, &cl.Owner, &cl.Subject, &cl.Updated); err != nil {
return frontend.ChangelistSummary{}, skerr.Wrapf(err, "checking if CL %q exists", qCLID)
cl.Updated = cl.Updated.UTC()
cl.SystemID = clID
cl.System = crs
cl.URL = strings.Replace(system.URLTemplate, "%s", cl.SystemID, 1)
rv := frontend.ChangelistSummary{CL: cl}
const statement = `SELECT Patchsets.patchset_id, Patchsets.ps_order,
tryjob_id, display_name, Tryjobs.last_ingested_data, Tryjobs.system FROM
Tryjobs JOIN Patchsets ON Tryjobs.patchset_id = Patchsets.patchset_id
WHERE Tryjobs.changelist_id = $1
ORDER BY Patchsets.patchset_id
rows, err := wh.DB.Query(ctx, statement, qCLID)
if err != nil {
return frontend.ChangelistSummary{}, skerr.Wrap(err)
defer rows.Close()
var patchsets []*frontend.Patchset
var currentPS *frontend.Patchset
for rows.Next() {
var psID string
var order int
var tj frontend.TryJob
if err := rows.Scan(&psID, &order, &tj.SystemID, &tj.DisplayName, &tj.Updated, &tj.System); err != nil {
return frontend.ChangelistSummary{}, skerr.Wrap(err)
tj.Updated = tj.Updated.UTC()
urlTempl, ok := cisTemplates[tj.System]
if !ok {
return frontend.ChangelistSummary{}, skerr.Fmt("Unrecognized CIS system: %q", tj.System)
tj.URL = strings.Replace(urlTempl, "%s", sql.Unqualify(tj.SystemID), 1)
if currentPS == nil || currentPS.SystemID != psID {
currentPS = &frontend.Patchset{
SystemID: psID,
Order: order,
patchsets = append(patchsets, currentPS)
currentPS.TryJobs = append(currentPS.TryJobs, tj)
rv.Patchsets = make([]frontend.Patchset, 0, len(patchsets)) // ensure non-nil slice
for _, ps := range patchsets {
rv.Patchsets = append(rv.Patchsets, *ps)
rv.NumTotalPatchsets = len(rv.Patchsets)
sort.Slice(rv.Patchsets, func(i, j int) bool {
return rv.Patchsets[i].Order > rv.Patchsets[j].Order
return rv, nil
// SearchHandler searches the data in the new SQL backend. It times out after 3 minutes, to prevent
// outstanding requests from growing unbounded.
func (wh *Handlers) SearchHandler(w http.ResponseWriter, r *http.Request) {
defer metrics2.FuncTimer().Stop()
if err := wh.limitForAnonUsers(r); err != nil {
httputils.ReportError(w, err, "Try again later", http.StatusInternalServerError)
q, ok := parseSearchQuery(w, r)
if !ok {
ctx, cancel := context.WithTimeout(r.Context(), 3*time.Minute)
defer cancel()
ctx, span := trace.StartSpan(ctx, "web_SearchHandler", trace.WithSampler(trace.AlwaysSample()))
defer span.End()
searchResponse, err := wh.Search2API.Search(ctx, q)
if err != nil {
httputils.ReportError(w, err, "Search for digests failed in the SQL backend.", http.StatusInternalServerError)
sendJSONResponse(w, searchResponse)
// parseSearchQuery extracts the search query from request.
func parseSearchQuery(w http.ResponseWriter, r *http.Request) (*search_query.Search, bool) {
q := search_query.Search{Limit: 50}
if err := search_query.ParseSearch(r, &q); err != nil {
httputils.ReportError(w, err, "Search for digests failed.", http.StatusInternalServerError)
return nil, false
// Currently, the frontend includes the corpus as a right trace value. That's really a no-op
// because that info (and the test name) are specified in the grouping. As such, we delete
// those so they don't cause us to go into a slow path accounting for keys when we do not
// need to.
// TODO(kjlubick) Make the frontend not supply these.
delete(q.RightTraceValues, types.CorpusField)
delete(q.RightTraceValues, types.PrimaryKeyField)
return &q, true
// DetailsHandler returns the details about a single digest.
func (wh *Handlers) DetailsHandler(w http.ResponseWriter, r *http.Request) {
ctx, span := trace.StartSpan(r.Context(), "web_DetailsHandler", trace.WithSampler(trace.AlwaysSample()))
defer span.End()
if err := wh.cheapLimitForAnonUsers(r); err != nil {
httputils.ReportError(w, err, "Try again later", http.StatusInternalServerError)
req := frontend.DetailsRequest{}
if err := parseJSON(r, &req); err != nil {
httputils.ReportError(w, err, "Failed to parse JSON request.", http.StatusBadRequest)
sklog.Infof("Details request: %#v", req)
if len(req.Grouping) == 0 {
http.Error(w, "Grouping cannot be empty.", http.StatusBadRequest)
if !validation.IsValidDigest(string(req.Digest)) {
http.Error(w, "Invalid digest.", http.StatusBadRequest)
if req.CodeReviewSystem != "" && req.ChangelistID != "" {
if _, ok := wh.getCodeReviewSystem(req.CodeReviewSystem); !ok {
http.Error(w, "Invalid code review system.", http.StatusBadRequest)
ret, err := wh.Search2API.GetDigestDetails(ctx, req.Grouping, types.Digest(req.Digest), req.ChangelistID, req.CodeReviewSystem)
if err != nil {
httputils.ReportError(w, err, "Unable to get digest details.", http.StatusInternalServerError)
sendJSONResponse(w, ret)
// getGroupingForTest acts as a bridge for RPCs that only take in a test name, when they should
// be taking in a grouping. It looks up the grouping by test name and returns it.
// TODO(kjlubick) Migrate all RPCs and remove this function.
func (wh *Handlers) getGroupingForTest(ctx context.Context, testName string) (paramtools.Params, error) {
ctx, span := trace.StartSpan(ctx, "getGroupingForTest")
defer span.End()
const statement = `SELECT keys FROM Groupings WHERE keys->'name' = $1 LIMIT 1`
// Need to wrap testName with quotes to make it "valid JSON", so we can use the inverted index
// on keys.
row := wh.DB.QueryRow(ctx, statement, `"`+testName+`"`)
var ps paramtools.Params
if err := row.Scan(&ps); err != nil {
return nil, skerr.Wrapf(err, "looking up grouping for test name %q", testName)
return ps, nil
// DiffHandler compares two digests and returns that information along with triage data.
func (wh *Handlers) DiffHandler(w http.ResponseWriter, r *http.Request) {
ctx, span := trace.StartSpan(r.Context(), "web_DiffHandler", trace.WithSampler(trace.AlwaysSample()))
defer span.End()
req := frontend.DiffRequest{}
if err := parseJSON(r, &req); err != nil {
httputils.ReportError(w, err, "Failed to parse JSON request.", http.StatusBadRequest)
sklog.Infof("Diff request: %#v", req)
if len(req.Grouping) == 0 {
http.Error(w, "Grouping cannot be empty.", http.StatusBadRequest)
if !validation.IsValidDigest(string(req.LeftDigest)) {
http.Error(w, "Invalid left digest.", http.StatusBadRequest)
if !validation.IsValidDigest(string(req.RightDigest)) {
http.Error(w, "Invalid right digest.", http.StatusBadRequest)
if req.CodeReviewSystem != "" && req.ChangelistID != "" {
if _, ok := wh.getCodeReviewSystem(req.CodeReviewSystem); !ok {
http.Error(w, "Invalid code review system.", http.StatusBadRequest)
ret, err := wh.Search2API.GetDigestsDiff(ctx, req.Grouping, req.LeftDigest, req.RightDigest, req.ChangelistID, req.CodeReviewSystem)
if err != nil {
httputils.ReportError(w, err, "Unable to get diff for digests.", http.StatusInternalServerError)
sendJSONResponse(w, ret)
// ListIgnoreRules2 returns the current ignore rules in JSON format and the counts of
// how many traces they affect.
func (wh *Handlers) ListIgnoreRules2(w http.ResponseWriter, r *http.Request) {
defer metrics2.FuncTimer().Stop()
ctx, span := trace.StartSpan(r.Context(), "web_ListIgnoreRules2", trace.WithSampler(trace.AlwaysSample()))
defer span.End()
if err := wh.limitForAnonUsers(r); err != nil {
httputils.ReportError(w, err, "Try again later", http.StatusInternalServerError)
ignores, err := wh.getIgnores2(ctx)
if err != nil {
httputils.ReportError(w, err, "Failed to retrieve ignore rules, there may be none.", http.StatusInternalServerError)
response := frontend.IgnoresResponse{
Rules: ignores,
sendJSONResponse(w, response)
// getIgnores2 fetches all ignore rules and converts them into the frontend format. It will add the
// trace counts for each rule.
func (wh *Handlers) getIgnores2(ctx context.Context) ([]frontend.IgnoreRule, error) {
ctx, span := trace.StartSpan(ctx, "getIgnores2")
defer span.End()
rules, err := wh.IgnoreStore.List(ctx)
if err != nil {
return nil, skerr.Wrapf(err, "fetching ignores from store")
ret := make([]frontend.IgnoreRule, 0, len(rules))
for _, r := range rules {
fr, err := frontend.ConvertIgnoreRule(r)
if err != nil {
return nil, skerr.Wrap(err)
ret = append(ret, fr)
// addIgnoreCounts updates the values of ret directly
if err := wh.addIgnoreCounts2(ctx, ret); err != nil {
return nil, skerr.Wrapf(err, "adding ignore counts to %d rules", len(ret))
return ret, nil
// addIgnoreCounts2 fetches all ignored traces from the SQL DB and then goes through all the ignore
// rules and figures out which rules applied to each of those traces. This allows us to count how
// many traces each rule affects and how many are exclusively impacted by a given rule.
func (wh *Handlers) addIgnoreCounts2(ctx context.Context, rules []frontend.IgnoreRule) error {
ctx, span := trace.StartSpan(ctx, "addIgnoreCounts2")
defer span.End()
type counts struct {
Count int
UntriagedCount int
ExclusiveCount int
ExclusiveUntriagedCount int
ruleCounts := make([]counts, len(rules))
defer wh.ignoredTracesCacheMutex.RUnlock()
for _, tr := range wh.ignoredTracesCache {
idxMatched, untIdxMatched := -1, -1
numMatched, untMatched := 0, 0
for i, r := range rules {
if paramtools.ParamSet(r.ParsedQuery).MatchesParams(tr.Keys) {
idxMatched = i
// Check to see if the digest is untriaged at head
if tr.Label == expectations.Untriaged {
untIdxMatched = i
// Check for any exclusive matches
if numMatched == 1 {
if untMatched == 1 {
for i := range rules {
(&rules[i]).Count += ruleCounts[i].Count
(&rules[i]).UntriagedCount += ruleCounts[i].UntriagedCount
(&rules[i]).ExclusiveCount += ruleCounts[i].ExclusiveCount
(&rules[i]).ExclusiveUntriagedCount += ruleCounts[i].ExclusiveUntriagedCount
return nil
// UpdateIgnoreRule updates an existing ignores rule.
func (wh *Handlers) UpdateIgnoreRule(w http.ResponseWriter, r *http.Request) {
ctx, span := trace.StartSpan(r.Context(), "web_UpdateIgnoreRule", trace.WithSampler(trace.AlwaysSample()))
defer span.End()
user := wh.alogin.LoggedInAs(r)
if user == "" {
http.Error(w, "You must be logged in to update an ignore rule.", http.StatusUnauthorized)
if !wh.alogin.HasRole(r, roles.Editor) {
http.Error(w, "You must be logged in as an editor to change ignore rules", http.StatusUnauthorized)
id := chi.URLParam(r, "id")
if id == "" {
http.Error(w, "ID must be non-empty.", http.StatusBadRequest)
expiresInterval, irb, err := getValidatedIgnoreRule(r)
if err != nil {
httputils.ReportError(w, err, "invalid ignore rule input", http.StatusBadRequest)
ts := now.Now(ctx)
ignoreRule := ignore.NewRule(user.String(), ts.Add(expiresInterval), irb.Filter, irb.Note)
ignoreRule.ID = id
if err := wh.IgnoreStore.Update(ctx, ignoreRule); err != nil {
httputils.ReportError(w, err, "Unable to update ignore rule", http.StatusInternalServerError)
sklog.Infof("Successfully updated ignore with id %s", id)
sendJSONResponse(w, map[string]string{"updated": "true"})
// getValidatedIgnoreRule parses the JSON from the given request into an IgnoreRuleBody. As a
// convenience, the duration as a time.Duration is returned.
func getValidatedIgnoreRule(r *http.Request) (time.Duration, frontend.IgnoreRuleBody, error) {
irb := frontend.IgnoreRuleBody{}
if err := parseJSON(r, &irb); err != nil {
return 0, irb, skerr.Wrapf(err, "reading request JSON")
if irb.Filter == "" {
return 0, irb, skerr.Fmt("must supply a filter")
// If a user accidentally includes a huge amount of text, we'd like to catch that here.
if len(irb.Filter) >= 10*1024 {
return 0, irb, skerr.Fmt("Filter must be < 10 KB")
if len(irb.Note) >= 1024 {
return 0, irb, skerr.Fmt("Note must be < 1 KB")
d, err := human.ParseDuration(irb.Duration)
if err != nil {
return 0, irb, skerr.Wrapf(err, "invalid duration")
return d, irb, nil
// DeleteIgnoreRule deletes an existing ignores rule.
func (wh *Handlers) DeleteIgnoreRule(w http.ResponseWriter, r *http.Request) {
user := wh.alogin.LoggedInAs(r)
if user == alogin.NotLoggedIn {
http.Error(w, "You must be logged in to delete an ignore rule", http.StatusUnauthorized)
if !wh.alogin.HasRole(r, roles.Editor) {
http.Error(w, "You must be logged in as an editor to change ignore rules", http.StatusUnauthorized)
ctx, span := trace.StartSpan(r.Context(), "web_DeleteIgnoreRule", trace.WithSampler(trace.AlwaysSample()))
defer span.End()
id := chi.URLParam(r, "id")
if id == "" {
http.Error(w, "ID must be non-empty.", http.StatusBadRequest)
if err := wh.IgnoreStore.Delete(ctx, id); err != nil {
httputils.ReportError(w, err, "Unable to delete ignore rule", http.StatusInternalServerError)
sklog.Infof("Successfully deleted ignore with id %s", id)
sendJSONResponse(w, map[string]string{"deleted": "true"})
// AddIgnoreRule is for adding a new ignore rule.
func (wh *Handlers) AddIgnoreRule(w http.ResponseWriter, r *http.Request) {
user := wh.alogin.LoggedInAs(r)
if user == alogin.NotLoggedIn {
http.Error(w, "You must be logged in to add an ignore rule", http.StatusUnauthorized)
if !wh.alogin.HasRole(r, roles.Editor) {
http.Error(w, "You must be logged in as an editor to add ignore rules", http.StatusUnauthorized)
ctx, span := trace.StartSpan(r.Context(), "web_AddIgnoreRule", trace.WithSampler(trace.AlwaysSample()))
defer span.End()
expiresInterval, irb, err := getValidatedIgnoreRule(r)
if err != nil {
httputils.ReportError(w, err, "invalid ignore rule input", http.StatusBadRequest)
ts := now.Now(ctx)
ignoreRule := ignore.NewRule(user.String(), ts.Add(expiresInterval), irb.Filter, irb.Note)
if err := wh.IgnoreStore.Create(ctx, ignoreRule); err != nil {
httputils.ReportError(w, err, "Failed to create ignore rule", http.StatusInternalServerError)
sklog.Infof("Successfully added ignore from %s", user)
sendJSONResponse(w, map[string]string{"added": "true"})
// TriageHandlerV2 handles a request to change the triage status of one or more
// digests of one test.
// It accepts a POST'd JSON serialization of TriageRequest and updates
// the expectations.
// TODO(kjlubick) In V3, this should take groupings, not test names. Additionally, to avoid race
// conditions where users triage the same thing at the same time, the request should include
// before and after. Finally, to avoid confusion on CLs, we should fail to apply changes
// on closed CLs (
func (wh *Handlers) TriageHandlerV2(w http.ResponseWriter, r *http.Request) {
ctx, span := trace.StartSpan(r.Context(), "web_TriageHandlerV2", trace.WithSampler(trace.AlwaysSample()))
defer span.End()
user := wh.alogin.LoggedInAs(r)
if user == alogin.NotLoggedIn {
http.Error(w, "You must be logged in to triage.", http.StatusUnauthorized)
if !wh.alogin.HasRole(r, roles.Editor) {
http.Error(w, "You must be logged in as an editor to change expectations", http.StatusUnauthorized)
req := frontend.TriageRequestV2{}
if err := parseJSON(r, &req); err != nil {
httputils.ReportError(w, err, "Failed to parse JSON request.", http.StatusBadRequest)
sklog.Infof("Triage v2 request: %#v", req)
if err := wh.triage2(ctx, user.String(), req); err != nil {
httputils.ReportError(w, err, "Could not triage", http.StatusInternalServerError)
// Nothing to return, so just set 200
func (wh *Handlers) triage2(ctx context.Context, userID string, req frontend.TriageRequestV2) error {
ctx, span := trace.StartSpan(ctx, "triage2")
defer span.End()
branch := ""
if req.ChangelistID != "" && req.CodeReviewSystem != "" {
branch = sql.Qualify(req.CodeReviewSystem, req.ChangelistID)
// If set, use the image matching algorithm's name as the author of this change.
if req.ImageMatchingAlgorithm != "" {
userID = req.ImageMatchingAlgorithm
allDeltas, err := wh.convertToDeltas(ctx, req)
if err != nil {
return skerr.Wrapf(err, "getting groupings")
if len(allDeltas) == 0 {
return nil
span.AddAttributes(trace.Int64Attribute("num_changes", int64(len(allDeltas))))
// If this number is too big, the query can take a long time to land (many retries) and in
// extreme cases, exceed the number of parameters a SQL query can support.
const maxTriageBatchSize = 1000
return util.ChunkIter(len(allDeltas), maxTriageBatchSize, func(startIdx int, endIdx int) error {
deltas := allDeltas[startIdx:endIdx]
err = crdbpgx.ExecuteTx(ctx, wh.DB, pgx.TxOptions{}, func(tx pgx.Tx) error {
newRecordID, err := writeRecord(ctx, tx, userID, len(deltas), branch)
if err != nil {
return err
err = fillPreviousLabel(ctx, tx, deltas, newRecordID)
if err != nil {
return err
err = writeDeltas(ctx, tx, deltas)
if err != nil {
return err
if branch == "" {
return applyDeltasToPrimary(ctx, tx, deltas)
return applyDeltasToBranch(ctx, tx, deltas, branch)
if err != nil {
return skerr.Wrapf(err, "writing %d expectations from %s to branch %q", len(deltas), userID, branch)
return nil
// convertToDeltas converts in triage request (a map) into a slice of deltas. These deltas are
// partially filled out, with only the
func (wh *Handlers) convertToDeltas(ctx context.Context, req frontend.TriageRequestV2) ([]schema.ExpectationDeltaRow, error) {
rv := make([]schema.ExpectationDeltaRow, 0, len(req.TestDigestStatus))
for test, digests := range req.TestDigestStatus {
for d, label := range digests {
if label == "" {
// Empty string means the frontend didn't have a closest digest to use when making a
// "bulk triage to the closest digest" request. It's easier to catch this on the
// server side than make the JS check for empty string and mutate the POST body.
if !expectations.ValidLabel(label) {
return nil, skerr.Fmt("invalid label %q in triage request", label)
labelAfter := schema.FromExpectationLabel(label)
grouping, err := wh.getGroupingForTest(ctx, string(test))
if err != nil {
return nil, skerr.Wrap(err)
_, groupingID := sql.SerializeMap(grouping)
digestBytes, err := sql.DigestToBytes(d)
if err != nil {
return nil, skerr.Wrap(err)
rv = append(rv, schema.ExpectationDeltaRow{
GroupingID: groupingID,
Digest: digestBytes,
LabelAfter: labelAfter,
return rv, nil
// fillPreviousLabel looks up all the expectations for the partially filled-out deltas passed in
// and updates those in-place. It only pulls labels from the primary branch, as this is not meant
// for long term use (see notes for getting to V3 triage).
func fillPreviousLabel(ctx context.Context, tx pgx.Tx, deltas []schema.ExpectationDeltaRow, newRecordID uuid.UUID) error {
ctx, span := trace.StartSpan(ctx, "fillPreviousLabel")
defer span.End()
type expectationKey struct {
groupingID schema.MD5Hash
digest schema.MD5Hash
toUpdate := map[expectationKey]*schema.ExpectationDeltaRow{}
for i := range deltas {
deltas[i].ExpectationRecordID = newRecordID
deltas[i].LabelBefore = schema.LabelUntriaged
groupingID: sql.AsMD5Hash(deltas[i].GroupingID),
digest: sql.AsMD5Hash(deltas[i].Digest),
}] = &deltas[i]
statement := `SELECT grouping_id, digest, label FROM Expectations WHERE `
// We should be safe from injection attacks because we are hex encoding known valid byte arrays.
// I couldn't find a better way to match multiple composite keys using our usual techniques
// involving placeholders.
for i, d := range deltas {
if i != 0 {
statement += " OR "
statement += fmt.Sprintf(`(grouping_id = x'%x' AND digest = x'%x')`, d.GroupingID, d.Digest)
rows, err := tx.Query(ctx, statement)
if err != nil {
return err // don't wrap, could be retried
defer rows.Close()
for rows.Next() {
var gID schema.GroupingID
var d schema.DigestBytes
var label schema.ExpectationLabel
if err := rows.Scan(&gID, &d, &label); err != nil {
return skerr.Wrap(err) // probably not retryable
ek := expectationKey{
groupingID: sql.AsMD5Hash(gID),
digest: sql.AsMD5Hash(d),
row := toUpdate[ek]
if row == nil {
sklog.Warningf("Unmatched row with grouping %x and digest %x", gID, d)
continue // should never happen
row.LabelBefore = label
return nil
// TriageHandlerV3 handles a request to change the triage status of one or more digests.
func (wh *Handlers) TriageHandlerV3(w http.ResponseWriter, r *http.Request) {
ctx, span := trace.StartSpan(r.Context(), "web_TriageHandlerV3", trace.WithSampler(trace.AlwaysSample()))
defer span.End()
user := wh.alogin.LoggedInAs(r)
if user == alogin.NotLoggedIn {
http.Error(w, "You must be logged in to triage.", http.StatusUnauthorized)
if !wh.alogin.HasRole(r, roles.Editor) {
http.Error(w, "You must be logged in as an editor to change expectations", http.StatusUnauthorized)
req := frontend.TriageRequestV3{}
if err := parseJSON(r, &req); err != nil {
httputils.ReportError(w, err, "Failed to parse JSON request.", http.StatusBadRequest)
sklog.Infof("Triage v3 request: %#v", req)
res, err := wh.triage3(ctx, user.String(), req)
if err != nil {
httputils.ReportError(w, err, "Could not triage", http.StatusInternalServerError)
sendJSONResponse(w, res)
// triage3 applies the expectation changes in req on behalf of userID and reports the outcome.
//
// When both req.ChangelistID and req.CodeReviewSystem are set, the changes target that
// changelist's branch and are rejected with an error unless the changelist's status is open.
// Otherwise the changes target the primary branch (branch == "").
// NOTE(review): a request with only one of ChangelistID/CodeReviewSystem set falls through to the
// primary branch - confirm that this is intended.
//
// The deltas are processed in chunks of at most maxTriageBatchSize, each chunk in its own
// transaction. If a delta's LabelBefore does not match the stored label, the returned response
// has status frontend.TriageResponseStatusConflict and the error is nil. Any other failure is
// returned as a wrapped error with an empty response.
func (wh *Handlers) triage3(ctx context.Context, userID string, req frontend.TriageRequestV3) (frontend.TriageResponse, error) {
ctx, span := trace.StartSpan(ctx, "triage3")
defer span.End()
// An empty branch denotes the primary branch.
branch := ""
if req.ChangelistID != "" && req.CodeReviewSystem != "" {
branch = sql.Qualify(req.CodeReviewSystem, req.ChangelistID)
// We disallow changes on closed CLs to avoid confusion.
const statement = "SELECT status FROM Changelists WHERE changelist_id = $1"
row := wh.DB.QueryRow(ctx, statement, branch)
var cl schema.ChangelistRow
if err := row.Scan(&cl.Status); err != nil {
return frontend.TriageResponse{}, skerr.Wrapf(err, "querying status of changelist (changelist ID %q, CRS %q)", req.ChangelistID, req.CodeReviewSystem)
if cl.Status != schema.StatusOpen {
return frontend.TriageResponse{}, skerr.Fmt("triaging digests from non-open changelists is not allowed (changelist ID %q, CRS %q, status %q)", req.ChangelistID, req.CodeReviewSystem, cl.Status)
// If set, use the image matching algorithm's name as the author of this change.
if req.ImageMatchingAlgorithm != "" {
userID = req.ImageMatchingAlgorithm
allDeltas, err := convertTriageDeltasToExpectationDeltaRows(req.Deltas)
if err != nil {
return frontend.TriageResponse{}, skerr.Wrapf(err, "converting TriageDeltas to ExpectationDeltaRows")
// Nothing to change: report success without touching the database.
if len(allDeltas) == 0 {
return frontend.TriageResponse{Status: frontend.TriageResponseStatusOK}, nil
span.AddAttributes(trace.Int64Attribute("num_changes", int64(len(allDeltas))))
// If this number is too big, the query can take a long time to land (many retries) and in
// extreme cases, exceed the number of parameters a SQL query can support.
const maxTriageBatchSize = 1000
// Each chunk gets its own transaction and its own expectation record.
// NOTE(review): presumably chunks committed before a failing chunk stay committed - verify
// against util.ChunkIter.
err = util.ChunkIter(len(allDeltas), maxTriageBatchSize, func(startIdx int, endIdx int) error {
deltas := allDeltas[startIdx:endIdx]
return crdbpgx.ExecuteTx(ctx, wh.DB, pgx.TxOptions{}, func(tx pgx.Tx) error {
if err := verifyExpectationDeltaRowsLabelBefore(ctx, tx, deltas, branch); err != nil {
// Could be a triageConflictError if any of the LabelBefore fields do not match
// their expected value. This error is handled outside of the transaction.
return err
newRecordID, err := writeRecord(ctx, tx, userID, len(deltas), branch)
if err != nil {
return err
// Tie every delta in this chunk to the record that was just written.
for i := range deltas {
deltas[i].ExpectationRecordID = newRecordID
err = writeDeltas(ctx, tx, deltas)
if err != nil {
return err
if branch == "" {
return applyDeltasToPrimary(ctx, tx, deltas)
return applyDeltasToBranch(ctx, tx, deltas, branch)
if err != nil {
// If any of the deltas' LabelBefore do not match the corresponding entries in the
// Expectations or SecondaryBranchExpectations tables, we send a meaningful error response
// to the frontend so that we can properly report the triage conflict in the UI.
var tce *triageConflictError
if errors.As(err, &tce) {
// The conflict response carries the grouping looked up from the conflicting grouping ID.
grouping, err := wh.lookupGrouping(ctx, tce.GroupingID)
if err != nil {
return frontend.TriageResponse{}, skerr.Wrap(err)
return frontend.TriageResponse{
Status: frontend.TriageResponseStatusConflict,
Conflict: frontend.TriageConflict{
Grouping: grouping,
Digest: types.Digest(hex.EncodeToString(tce.Digest)),
ExpectedLabelBefore: tce.ExpectedLabelBefore.ToExpectation(),
ActualLabelBefore: tce.ActualLabelBefore.ToExpectation(),
}, nil
return frontend.TriageResponse{}, skerr.Wrapf(err, "writing %d expectations from %s to branch %q", len(allDeltas), userID, branch)
return frontend.TriageResponse{Status: frontend.TriageResponseStatusOK}, nil
// convertTriageDeltasToExpectationDeltaRows converts frontend.TriageDelta structs to
// schema.ExpectationDeltaRow structs.
func convertTriageDeltasToExpectationDeltaRows(deltas []frontend.TriageDelta) ([]schema.ExpectationDeltaRow, error) {
rv := make([]schema.ExpectationDeltaRow, 0, len(deltas))
for _, delta := range deltas {
if !expectations.ValidLabel(delta.LabelBefore) {
return nil, skerr.Fmt("invalid LabelBefore %q in triage request", delta.LabelBefore)
if !expectations.ValidLabel(delta.LabelAfter) {
return nil, skerr.Fmt("invalid LabelAfter %q in triage request", delta.LabelAfter)
labelBefore := schema.FromExpectationLabel(delta.LabelBefore)
labelAfter := schema.FromExpectationLabel(delta.LabelAfter)
_, groupingID := sql.SerializeMap(delta.Grouping)
digestBytes, err := sql.DigestToBytes(delta.Digest)
if err != nil {
return nil, skerr.Wrap(err)
rv = append(rv, schema.ExpectationDeltaRow{
GroupingID: groupingID,
Digest: digestBytes,
LabelBefore: labelBefore,
LabelAfter: labelAfter,
return rv, nil
// triageConflictError is an error returned by the verifyExpectationDeltaRowsLabelBefore method. It
// contains the necessary information to construct a meaningful error response to return to the
// frontend.
type triageConflictError struct {
// GroupingID and Digest identify the expectation whose label did not match.
GroupingID schema.GroupingID
Digest schema.DigestBytes
// ExpectedLabelBefore is the label currently stored for the expectation, or untriaged when no
// stored expectation exists.
ExpectedLabelBefore schema.ExpectationLabel
// ActualLabelBefore is the LabelBefore carried by the delta row that failed verification.
ActualLabelBefore schema.ExpectationLabel
func (e *triageConflictError) Error() string {
return fmt.Sprintf("expected LabelBefore for grouping %x and digest %x to be %s, was %s", e.GroupingID, e.Digest, e.ExpectedLabelBefore, e.ActualLabelBefore)
// groupingIDAndDigest is a (grouping ID, digest) pair. Both parts are stored as schema.MD5Hash
// values so that the pair can be used as a map key.
type groupingIDAndDigest struct {
groupingID schema.MD5Hash
digest schema.MD5Hash
// verifyExpectationDeltaRowsLabelBefore verifies that the LabelBefore column of each passed in
// schema.ExpectationDeltaRow matches the Label column of the corresponding entry in the
// Expectations or SecondaryBranchExpectations table. If no entry is found, we check that the
// LabelBefore is untriaged. This function prevents race conditions where multiple Gold users might
// attempt to triage the same digest.
// If branchName is empty, we only check against the Expectations table.
// If branchName is not empty (e.g. when triaging digests from a CL), we first check the
// SecondaryBranchExpectations table, and if there is no corresponding entry, we check against the
// Expectations table.
// If the LabelBefore of a schema.ExpectationDeltaRow does not match the expected label, a
// triageConflictError is returned.
func verifyExpectationDeltaRowsLabelBefore(ctx context.Context, tx pgx.Tx, deltaRows []schema.ExpectationDeltaRow, branchName string) error {
ctx, span := trace.StartSpan(ctx, "verifyExpectationDeltaRowsLabelBefore")
defer span.End()
// Put the deltaRows in a map keyed by grouping ID and digest for easier querying.
deltaRowsMap := map[groupingIDAndDigest]*schema.ExpectationDeltaRow{}
for i := range deltaRows {
key := groupingIDAndDigest{
groupingID: sql.AsMD5Hash(deltaRows[i].GroupingID),
digest: sql.AsMD5Hash(deltaRows[i].Digest),
deltaRowsMap[key] = &deltaRows[i]
// Check the deltaRows' LabelBefore columns against the corresponding table.
var (
verifiedDeltaRows map[groupingIDAndDigest]bool
err error
if branchName == "" {
verifiedDeltaRows, err = verifyPrimaryBranchLabelBefore(ctx, tx, deltaRowsMap)
} else {
verifiedDeltaRows, err = verifySecondaryBranchLabelBefore(ctx, tx, branchName, deltaRowsMap)
if err != nil {
return err // Don't wrap - crdbpgx might retry
// If any of the deltaRows did not have a matching entry in the Expectations or
// SecondaryBranchExpectations tables, check that their LabelBefore columns are set to
// "untriaged".
for key, deltaRow := range deltaRowsMap {
if !verifiedDeltaRows[key] && deltaRow.LabelBefore != schema.LabelUntriaged {
return &triageConflictError{
GroupingID: deltaRow.GroupingID,
Digest: deltaRow.Digest,
ExpectedLabelBefore: schema.LabelUntriaged,
ActualLabelBefore: deltaRow.LabelBefore,
return nil
// makeGroupingAndDigestWhereClause builds the part of a "WHERE" clause that filters by grouping ID
// and digest. It returns the SQL clause and a list of parameter values.
func makeGroupingAndDigestWhereClause(deltaRows map[groupingIDAndDigest]*schema.ExpectationDeltaRow, startingPlaceholderNum int) (string, []interface{}) {
var parts []string
args := make([]interface{}, 0, 2*len(deltaRows))
placeholderNum := startingPlaceholderNum
for _, deltaRow := range deltaRows {
parts = append(parts, fmt.Sprintf("(grouping_id = $%d AND digest = $%d)", placeholderNum, placeholderNum+1))
args = append(args, deltaRow.GroupingID, deltaRow.Digest)
placeholderNum += 2
sort.Strings(parts) // Make the query string deterministic for easier debugging.
return strings.Join(parts, " OR "), args
// verifyPrimaryBranchLabelBefore verifies that the LabelBefore of each given ExpectationDeltaRow
// matches the label of the corresponding row in the Expectations table, if any. If the labels
// do not match, it returns a triageConflictError.
// It returns a set with one (grouping ID, digest) pair for each ExpectationDeltaRow it was able to
// verify, i.e. those with a corresponding row in the Expectations table.
func verifyPrimaryBranchLabelBefore(ctx context.Context, tx pgx.Tx, deltaRows map[groupingIDAndDigest]*schema.ExpectationDeltaRow) (map[groupingIDAndDigest]bool, error) {
whereClause, whereArgs := makeGroupingAndDigestWhereClause(deltaRows, 1)
statement := "SELECT grouping_id, digest, label FROM Expectations WHERE " + whereClause
rows, err := tx.Query(ctx, statement, whereArgs...)
if err != nil {
return nil, err // Don't wrap - crdbpgx might retry
defer rows.Close()
// Check that the LabelBefore of each ExpectationDeltaRow matches the label of the
// corresponding row in the Expectations table, if any.
verifiedDeltaRows := map[groupingIDAndDigest]bool{}
for rows.Next() {
var groupingID schema.GroupingID
var digest schema.DigestBytes
var label schema.ExpectationLabel
if err := rows.Scan(&groupingID, &digest, &label); err != nil {
return nil, err
key := groupingIDAndDigest{
groupingID: sql.AsMD5Hash(groupingID),
digest: sql.AsMD5Hash(digest),
deltaRow := deltaRows[key]
if deltaRow == nil {
sklog.Warningf("Unmatched row with grouping %x and digest %x.", groupingID, digest)
continue // Should never happen.
if label != deltaRow.LabelBefore {
return nil, &triageConflictError{
GroupingID: groupingID,
Digest: digest,
ExpectedLabelBefore: label,
ActualLabelBefore: deltaRow.LabelBefore,
verifiedDeltaRows[key] = true
return verifiedDeltaRows, nil
// verifySecondaryBranchLabelBefore verifies that the LabelBefore of each given ExpectationDeltaRow
// matches the label of the corresponding row in the SecondaryBranchExpectations table. If there is
// no such row, it does the same against the corresponding row in the Expectations table, if any.
// If the LabelBefore does not match the label of the corresponding row in either table, it returns
// a triageConflictError.
// It returns a set with one (grouping ID, digest) pair for each ExpectationDeltaRow it was able to
// verify, i.e. those with a corresponding row in the SecondaryBranchExpectations or Expectations
// table.
// Errors from the database are returned unwrapped so that crdbpgx can inspect them for retries.
func verifySecondaryBranchLabelBefore(ctx context.Context, tx pgx.Tx, branchName string, deltaRows map[groupingIDAndDigest]*schema.ExpectationDeltaRow) (map[groupingIDAndDigest]bool, error) {
// Gather the relevant labels from the Expectations table.
primaryBranchLabels := map[groupingIDAndDigest]schema.ExpectationLabel{}
// This query has no other parameters, so the placeholders start at $1.
whereClause, whereArgs := makeGroupingAndDigestWhereClause(deltaRows, 1)
statement := "SELECT grouping_id, digest, label FROM Expectations WHERE " + whereClause
rows, err := tx.Query(ctx, statement, whereArgs...)
if err != nil {
return nil, err // Don't wrap - crdbpgx might retry
defer rows.Close()
// NOTE(review): rows.Err() is not consulted after this loop - confirm whether a read failure
// part-way through should abort the verification.
for rows.Next() {
var groupingID schema.GroupingID
var digest schema.DigestBytes
var label schema.ExpectationLabel
if err := rows.Scan(&groupingID, &digest, &label); err != nil {
return nil, err
groupingID: sql.AsMD5Hash(groupingID),
digest: sql.AsMD5Hash(digest),
}] = label
// Gather the relevant labels from the SecondaryBranchExpectations table.
secondaryBranchLabels := map[groupingIDAndDigest]schema.ExpectationLabel{}
// $1 is reserved for the branch name, so the grouping/digest placeholders start at $2.
whereClause, whereArgs = makeGroupingAndDigestWhereClause(deltaRows, 2)
statement = `
SELECT grouping_id,
FROM SecondaryBranchExpectations
WHERE branch_name = $1 AND (` + whereClause + ")"
rows, err = tx.Query(ctx, statement, append([]interface{}{branchName}, whereArgs...)...)
if err != nil {
return nil, err // Don't wrap - crdbpgx might retry
defer rows.Close()
for rows.Next() {
var groupingID schema.GroupingID
var digest schema.DigestBytes
var label schema.ExpectationLabel
if err := rows.Scan(&groupingID, &digest, &label); err != nil {
return nil, err
groupingID: sql.AsMD5Hash(groupingID),
digest: sql.AsMD5Hash(digest),
}] = label
// Check that the LabelBefore of each ExpectationDeltaRow matches the label of the
// corresponding row in the SecondaryBranchExpectations or Expectations table, if any.
verifiedDeltaRows := map[groupingIDAndDigest]bool{}
for key, deltaRow := range deltaRows {
// The secondary branch label wins; the primary branch label is only a fallback.
label, ok := secondaryBranchLabels[key]
if !ok {
label, ok = primaryBranchLabels[key]
if ok {
if label != deltaRow.LabelBefore {
return nil, &triageConflictError{
GroupingID: deltaRow.GroupingID,
Digest: deltaRow.Digest,
ExpectedLabelBefore: label,
ActualLabelBefore: deltaRow.LabelBefore,
verifiedDeltaRows[key] = true
return verifiedDeltaRows, nil
// StatusHandler returns information about the most recently ingested data and the triage status
// of the various corpora.
func (wh *Handlers) StatusHandler(w http.ResponseWriter, r *http.Request) {
_, span := trace.StartSpan(r.Context(), "web_StatusHandler")
defer span.End()
defer wh.statusCacheMutex.RUnlock()
// This should be an incredibly cheap call and therefore does not count against any quota.
sendJSONResponse(w, wh.statusCache)
// GroupingsHandler returns a map from corpus name to the list of keys that comprise the corpus
// grouping.
// This method returns the union between the following two sets of corpus/grouping pairs:
// - Those defined in the Gold instance's JSON5 configuration.
// - A set constructed by getting a list of corpora from the status cache, and by assigning them
// the default (source_type, name) grouping (that is, corpus name and test name).
// If a corpus appears on both sets, the grouping from the JSON5 configuration takes precedence.
// This gives us flexibility in case we want to support groupings other than (source_type, name) in
// the future.
// For large Gold instances (e.g. Skia, Chrome) it is important to provide a dictionary of grouping
// param keys by corpus in its JSON5 config because:
// - The status cache is periodically populated by a goroutine that runs a slow SQL query (~13
// minutes in the case of the Skia instance).
// - Upon launching an instance, the status cache remains empty for several minutes until said
// goroutine finishes running the aforementioned slow SQL query for the first time.
// - During that time, this RPC (/json/v1/groupings) returns an empty dictionary if the JSON5
// config does not include a dictionary of grouping param keys by corpus.
// - The "goldctl imgtest add" command hits this RPC to validate that the test being added
// includes all the params required by its corpus' grouping.
// - If the RPC returns an empty map, goldctl reports "grouping params for corpus X are unknown",
// which causes spurious test failures in the associated CI system.
// Some possible alternatives:
// - Write a fast SQL query specifically for /json/v1/groupings, but that's probably hard with
// the current schema. It might require factoring the corpora out into their own table.
// - Delay starting the webserver until the status cache is populated, but that would be at the
// expense of a much longer startup time for large instances.
func (wh *Handlers) GroupingsHandler(w http.ResponseWriter, r *http.Request) {
_, span := trace.StartSpan(r.Context(), "web_GroupingsHandler")
defer span.End()
// We will read the grouping param keys by corpus from the status cache. This should be an
// incredibly cheap call and therefore does not count against any quota.
defer wh.statusCacheMutex.RUnlock()
res := frontend.GroupingsResponse{
GroupingParamKeysByCorpus: map[string][]string{},
if len(wh.GroupingParamKeysByCorpus) != 0 {
res.GroupingParamKeysByCorpus = wh.GroupingParamKeysByCorpus
for _, cs := range wh.statusCache.CorpStatus {
corpus := cs.Name
if _, ok := res.GroupingParamKeysByCorpus[corpus]; ok {
// JSON5 config file takes precedence.
res.GroupingParamKeysByCorpus[corpus] = []string{
// Sorted lexicographically.
sendJSONResponse(w, res)
// ClusterDiffRequest contains the options that the frontend provides to the clusterdiff RPC.
type ClusterDiffRequest struct {
// Corpus is read from the "source_type" form value.
Corpus string
// Filters holds the keys and values parsed from the URL-encoded "query" form value.
Filters paramtools.ParamSet
// The Include* flags are true only when the "pos", "neg" and "unt" form values, respectively,
// are exactly "true".
IncludePositiveDigests bool
IncludeNegativeDigests bool
IncludeUntriagedDigests bool
// TODO(kjlubick) the frontend does not support these yet.
// They are read from the "cl_id", "crs" and "ps_id" form values, respectively.
ChangelistID string
CodeReviewSystemID string
PatchsetID string
func parseClusterDiffQuery(r *http.Request) (ClusterDiffRequest, error) {
if err := r.ParseForm(); err != nil {
return ClusterDiffRequest{}, skerr.Wrap(err)
var rv ClusterDiffRequest
// TODO(kjlubick) rename this field on the UI side
if corpus := r.FormValue("source_type"); corpus == "" {
return ClusterDiffRequest{}, skerr.Fmt("Must include corpus")
} else {
rv.Corpus = corpus
if q := r.FormValue("query"); q == "" {
return ClusterDiffRequest{}, skerr.Fmt("Must include query")
} else {
filters, err := url.ParseQuery(q)
if err != nil {
return ClusterDiffRequest{}, skerr.Wrapf(err, "invalid query %q", q)
rv.Filters = paramtools.ParamSet(filters)
rv.IncludePositiveDigests = r.FormValue("pos") == "true"
rv.IncludeNegativeDigests = r.FormValue("neg") == "true"
rv.IncludeUntriagedDigests = r.FormValue("unt") == "true"
rv.CodeReviewSystemID = r.FormValue("crs")
rv.ChangelistID = r.FormValue("cl_id")
rv.PatchsetID = r.FormValue("ps_id")
return rv, nil
// ClusterDiffHandler computes the diffs between all digests that match the filters and
// returns them in a way that is convenient for rendering via d3.js
func (wh *Handlers) ClusterDiffHandler(w http.ResponseWriter, r *http.Request) {
ctx, span := trace.StartSpan(r.Context(), "web_ClusterDiffHandler", trace.WithSampler(trace.AlwaysSample()))
defer span.End()
if err := wh.limitForAnonUsers(r); err != nil {
httputils.ReportError(w, err, "Try again later", http.StatusInternalServerError)
q, err := parseClusterDiffQuery(r)
if err != nil {
httputils.ReportError(w, err, "Invalid requrest", http.StatusBadRequest)
testNames, ok := q.Filters[types.PrimaryKeyField]
if !ok || len(testNames) == 0 {
http.Error(w, "Must include test name", http.StatusBadRequest)
leftGrouping := paramtools.Params{
types.CorpusField: q.Corpus,
types.PrimaryKeyField: testNames[0],
delete(q.Filters, types.PrimaryKeyField)
clusterOpts := search.ClusterOptions{
Grouping: leftGrouping,
Filters: q.Filters,
IncludePositiveDigests: q.IncludePositiveDigests,
IncludeNegativeDigests: q.IncludeNegativeDigests,
IncludeUntriagedDigests: q.IncludeUntriagedDigests,
CodeReviewSystem: q.CodeReviewSystemID,
ChangelistID: q.ChangelistID,
PatchsetID: q.PatchsetID,
clusterResp, err := wh.Search2API.GetCluster(ctx, clusterOpts)
if err != nil {
httputils.ReportError(w, err, "Unable to compute cluster.", http.StatusInternalServerError)
sendJSONResponse(w, clusterResp)
// ListTestsHandler returns all the tests in the given corpus and a count of how many digests
// have been seen for that.
func (wh *Handlers) ListTestsHandler(w http.ResponseWriter, r *http.Request) {
defer metrics2.FuncTimer().Stop()
ctx, span := trace.StartSpan(r.Context(), "web_ListTestsHandler", trace.WithSampler(trace.AlwaysSample()))
defer span.End()
if err := wh.limitForAnonUsers(r); err != nil {
httputils.ReportError(w, err, "Try again later", http.StatusInternalServerError)
// Inputs: (head, ignored, corpus, keys)
q, err := frontend.ParseListTestsQuery(r)
if err != nil {
httputils.ReportError(w, err, "Failed to parse form data.", http.StatusBadRequest)
counts, err := wh.Search2API.CountDigestsByTest(ctx, q)
if err != nil {
httputils.ReportError(w, err, "Could not compute query.", http.StatusInternalServerError)
sendJSONResponse(w, counts)
// TriageLogHandler returns what has been triaged recently.
func (wh *Handlers) TriageLogHandler(w http.ResponseWriter, r *http.Request) {
defer metrics2.FuncTimer().Stop()
ctx, span := trace.StartSpan(r.Context(), "web_TriageLogHandler", trace.WithSampler(trace.AlwaysSample()))
defer span.End()
if err := wh.cheapLimitForAnonUsers(r); err != nil {
httputils.ReportError(w, err, "Try again later", http.StatusInternalServerError)
// Get the pagination params.
q := r.URL.Query()
offset, size, err := httputils.PaginationParams(q, 0, pageSize, maxPageSize)
if err != nil {
httputils.ReportError(w, err, "Invalid Pagination params", http.StatusBadRequest)
clID := q.Get("changelist_id")
crs := q.Get("crs")
if clID != "" {
if _, ok := wh.getCodeReviewSystem(crs); !ok {
http.Error(w, "Invalid Code Review System; did you include crs?", http.StatusBadRequest)
} else {
crs = ""
logEntries, total, err := wh.getTriageLog(ctx, crs, clID, offset, size)
if err != nil {
httputils.ReportError(w, err, "Unable to retrieve triage logs", http.StatusInternalServerError)
response := frontend.TriageLogResponse{
Entries: logEntries,
ResponsePagination: httputils.ResponsePagination{
Offset: offset,
Size: size,
Total: total,
sendJSONResponse(w, response)
// getTriageLog returns the specified entries and the total count of expectation records.
// An empty crs selects the primary branch (records whose branch_name is NULL); otherwise the
// records of the branch named by qualifying clid with crs are selected. When there are no records
// at all, an empty non-nil slice is returned.
// The query arguments are always offset and size, followed by the qualified branch name when crs
// is set.
// NOTE(review): the span is named "getTriageLog2" rather than "getTriageLog" - confirm whether
// that is deliberate.
func (wh *Handlers) getTriageLog(ctx context.Context, crs, clid string, offset, size int) ([]frontend.TriageLogEntry, int, error) {
ctx, span := trace.StartSpan(ctx, "getTriageLog2")
defer span.End()
total, err := wh.getTotalTriageRecords(ctx, crs, clid)
if err != nil {
return nil, 0, skerr.Wrap(err)
if total == 0 {
return []frontend.TriageLogEntry{}, 0, nil // We don't want null in our JSON response.
// Default to the primary branch, which is associated with branch_name (i.e. CL) as NULL.
branchStatement := "WHERE branch_name IS NULL"
if crs != "" {
branchStatement = "WHERE branch_name = $3"
statement := `WITH
RecentRecords AS (
SELECT expectation_record_id, user_name, triage_time
FROM ExpectationRecords ` + branchStatement + `
ORDER BY triage_time DESC, expectation_record_id
SELECT RecentRecords.*, Groupings.keys, digest, label_before, label_after
FROM RecentRecords
JOIN ExpectationDeltas ON RecentRecords.expectation_record_id = ExpectationDeltas.expectation_record_id
JOIN Groupings ON ExpectationDeltas.grouping_id = Groupings.grouping_id
ORDER BY triage_time DESC, expectation_record_id, digest
args := []interface{}{offset, size}
if crs != "" {
args = append(args, sql.Qualify(crs, clid))
rows, err := wh.DB.Query(ctx, statement, args...)
if err != nil {
return nil, 0, skerr.Wrap(err)
defer rows.Close()
// Each result row is one delta. Consecutive rows that share a record ID are folded into a single
// log entry, which relies on the ordering requested by the query.
var currentEntry *frontend.TriageLogEntry
var rv []frontend.TriageLogEntry
// NOTE(review): rows.Err() is not consulted after this loop - confirm whether a read failure
// part-way through should be reported instead of returning a partial log.
for rows.Next() {
var record schema.ExpectationRecordRow
var delta schema.ExpectationDeltaRow
var grouping paramtools.Params
if err := rows.Scan(&record.ExpectationRecordID, &record.UserName, &record.TriageTime,
&grouping, &delta.Digest, &delta.LabelBefore, &delta.LabelAfter); err != nil {
return nil, 0, skerr.Wrap(err)
// A new record ID starts a new log entry.
if currentEntry == nil || currentEntry.ID != record.ExpectationRecordID.String() {
rv = append(rv, frontend.TriageLogEntry{
ID: record.ExpectationRecordID.String(),
User: record.UserName,
// Multiply by 1000 to convert seconds to milliseconds
TS: record.TriageTime.UTC().Unix() * 1000,
// Re-point at the freshly appended element; append may have moved the backing array.
currentEntry = &rv[len(rv)-1]
currentEntry.Details = append(currentEntry.Details, frontend.TriageDelta{
Grouping: grouping,
Digest: types.Digest(hex.EncodeToString(delta.Digest)),
LabelBefore: delta.LabelBefore.ToExpectation(),
LabelAfter: delta.LabelAfter.ToExpectation(),
return rv, total, nil
// getTotalTriageRecords returns the total number of triage records for the CL (or the primary
// branch)
func (wh *Handlers) getTotalTriageRecords(ctx context.Context, crs, clid string) (int, error) {
ctx, span := trace.StartSpan(ctx, "getTotalTriageRecords")
defer span.End()
branchStatement := "WHERE branch_name IS NULL"
if crs != "" {
branchStatement = "WHERE branch_name = $1"
statement := `SELECT COUNT(*) FROM ExpectationRecords ` + branchStatement
var args []interface{}
if crs != "" {
args = append(args, sql.Qualify(crs, clid))
row := wh.DB.QueryRow(ctx, statement, args...)
var count int
if err := row.Scan(&count); err != nil {
return 0, skerr.Wrap(err)
return count, nil
// TriageUndoHandler performs an "undo" for a given id. This id corresponds to the record id of the
// set of changes in the DB.
// If successful it returns the same result as a call to TriageLogHandler to reflect the changes.
func (wh *Handlers) TriageUndoHandler(w http.ResponseWriter, r *http.Request) {
ctx, span := trace.StartSpan(r.Context(), "web_TriageUndoHandler", trace.WithSampler(trace.AlwaysSample()))
defer span.End()
// Get the user and make sure they are logged in.
user := wh.alogin.LoggedInAs(r)
if user == alogin.NotLoggedIn {
http.Error(w, "You must be logged in to change expectations", http.StatusUnauthorized)
if !wh.alogin.HasRole(r, roles.Editor) {
http.Error(w, "You must be logged in as an editor to change expectations", http.StatusUnauthorized)
// Extract the id to undo.
changeID := r.URL.Query().Get("id")
// Do the undo procedure.
if err := wh.undoExpectationChanges(ctx, changeID, user.String()); err != nil {
httputils.ReportError(w, err, "Unable to undo.", http.StatusInternalServerError)
// Send the same response as a query for the first page.
wh.TriageLogHandler(w, r)
// undoExpectationChanges will look up all ExpectationDeltas associated with the record that has
// the given ID. It will set the current expectations for those digests/groupings to be the
// label_before value. This will all be done in a transaction.
func (wh *Handlers) undoExpectationChanges(ctx context.Context, recordID, userID string) error {
ctx, span := trace.StartSpan(ctx, "undoExpectationChanges")
defer span.End()
err := crdbpgx.ExecuteTx(ctx, wh.DB, pgx.TxOptions{}, func(tx pgx.Tx) error {
deltas, err := getDeltasForRecord(ctx, tx, recordID)
if err != nil {
return err // Don't wrap - crdbpgx might retry
if len(deltas) == 0 {
return skerr.Fmt("no expectation deltas found for record %s", recordID)
branchNameRow := tx.QueryRow(ctx, `SELECT branch_name FROM ExpectationRecords WHERE expectation_record_id = $1`, recordID)
var branchOfOriginal pgtype.Text
if err := branchNameRow.Scan(&branchOfOriginal); err != nil {
return err
newRecordID, err := writeRecord(ctx, tx, userID, len(deltas), branchOfOriginal.String)
if err != nil {
return err
invertedDeltas := invertDeltas(deltas, newRecordID)
if err := writeDeltas(ctx, tx, invertedDeltas); err != nil {
return err
if branchOfOriginal.Status != pgtype.Present {
err = applyDeltasToPrimary(ctx, tx, invertedDeltas)
} else {
err = applyDeltasToBranch(ctx, tx, invertedDeltas, branchOfOriginal.String)
return err
if err != nil {
return skerr.Wrap(err)
return nil
// writeRecord writes a new ExpectationRecord to the DB.
func writeRecord(ctx context.Context, tx pgx.Tx, userID string, numChanges int, branch string) (uuid.UUID, error) {
ctx, span := trace.StartSpan(ctx, "writeRecord")
defer span.End()
var br *string
if branch != "" {
br = &branch
const statement = `INSERT INTO ExpectationRecords
(user_name, triage_time, num_changes, branch_name) VALUES ($1, $2, $3, $4) RETURNING expectation_record_id`
row := tx.QueryRow(ctx, statement, userID, now.Now(ctx), numChanges, br)
var recordUUID uuid.UUID
err := row.Scan(&recordUUID)
if err != nil {
return uuid.UUID{}, err
return recordUUID, nil
// invertDeltas returns a slice of deltas corresponding to the same grouping+digest pairs as the
// original slice, but with inverted before/after labels and a new record ID.
func invertDeltas(deltas []schema.ExpectationDeltaRow, newRecordID uuid.UUID) []schema.ExpectationDeltaRow {
var rv []schema.ExpectationDeltaRow
for _, d := range deltas {
rv = append(rv, schema.ExpectationDeltaRow{
ExpectationRecordID: newRecordID,
GroupingID: d.GroupingID,
Digest: d.Digest,
LabelBefore: d.LabelAfter, // Intentionally flipped around
LabelAfter: d.LabelBefore,
return rv
// getDeltasForRecord returns all ExpectationDeltaRows for the given record ID.
func getDeltasForRecord(ctx context.Context, tx pgx.Tx, recordID string) ([]schema.ExpectationDeltaRow, error) {
ctx, span := trace.StartSpan(ctx, "getDeltasForRecord")
defer span.End()
const statement = `SELECT grouping_id, digest, label_before, label_after
FROM ExpectationDeltas WHERE expectation_record_id = $1`
rows, err := tx.Query(ctx, statement, recordID)
if err != nil {
return nil, err // Don't wrap - crdbpgx might retry
defer rows.Close()
var deltas []schema.ExpectationDeltaRow
for rows.Next() {
var row schema.ExpectationDeltaRow
if err := rows.Scan(&row.GroupingID, &row.Digest, &row.LabelBefore, &row.LabelAfter); err != nil {
return nil, skerr.Wrap(err) // probably not retriable
deltas = append(deltas, row)
return deltas, nil
// writeDeltas writes the given rows to the SQL DB.
func writeDeltas(ctx context.Context, tx pgx.Tx, deltas []schema.ExpectationDeltaRow) error {
ctx, span := trace.StartSpan(ctx, "writeDeltas")
defer span.End()
const statement = `INSERT INTO ExpectationDeltas
(expectation_record_id, grouping_id, digest, label_before, label_after) VALUES `
const valuesPerRow = 5
vp := sqlutil.ValuesPlaceholders(valuesPerRow, len(deltas))
arguments := make([]interface{}, 0, len(deltas)*valuesPerRow)
for _, d := range deltas {
arguments = append(arguments, d.ExpectationRecordID, d.GroupingID, d.Digest, d.LabelBefore, d.LabelAfter)
_, err := tx.Exec(ctx, statement+vp, arguments...)
return err // don't wrap, could be retryable
// applyDeltasToPrimary applies the given deltas to the primary branch expectations.
func applyDeltasToPrimary(ctx context.Context, tx pgx.Tx, deltas []schema.ExpectationDeltaRow) error {
ctx, span := trace.StartSpan(ctx, "applyDeltasToPrimary")
defer span.End()
const statement = `UPSERT INTO Expectations
(grouping_id, digest, label, expectation_record_id) VALUES `
const valuesPerRow = 4
vp := sqlutil.ValuesPlaceholders(valuesPerRow, len(deltas))
arguments := make([]interface{}, 0, len(deltas)*valuesPerRow)
for _, d := range deltas {
arguments = append(arguments, d.GroupingID, d.Digest, d.LabelAfter, d.ExpectationRecordID)
_, err := tx.Exec(ctx, statement+vp, arguments...)
return err // don't wrap, could be retryable
// applyDeltasToBranch applies the given deltas to the given branch (i.e. CL).
func applyDeltasToBranch(ctx context.Context, tx pgx.Tx, deltas []schema.ExpectationDeltaRow, branch string) error {
ctx, span := trace.StartSpan(ctx, "applyInvertedDeltasToBranch")
defer span.End()
const statement = `UPSERT INTO SecondaryBranchExpectations
(branch_name, grouping_id, digest, label, expectation_record_id) VALUES `
const valuesPerRow = 5
vp := sqlutil.ValuesPlaceholders(valuesPerRow, len(deltas))
arguments := make([]interface{}, 0, len(deltas)*valuesPerRow)
for _, d := range deltas {
arguments = append(arguments, branch, d.GroupingID, d.Digest, d.LabelAfter, d.ExpectationRecordID)
_, err := tx.Exec(ctx, statement+vp, arguments...)
return err // don't wrap, could be retryable
// ParamsHandler returns all Params that could be searched over. It uses the SQL Backend and
// returns *only* the keys, not the options.
func (wh *Handlers) ParamsHandler(w http.ResponseWriter, r *http.Request) {
if err := wh.cheapLimitForAnonUsers(r); err != nil {
httputils.ReportError(w, err, "Try again later", http.StatusInternalServerError)
ctx, span := trace.StartSpan(r.Context(), "web_ParamsHandler", trace.WithSampler(trace.AlwaysSample()))
defer span.End()
if err := r.ParseForm(); err != nil {
httputils.ReportError(w, err, "Invalid form headers", http.StatusBadRequest)
clID := r.Form.Get("changelist_id")
crs := r.Form.Get("crs")
if clID == "" {
ps, err := wh.Search2API.GetPrimaryBranchParamset(ctx)
if err != nil {
httputils.ReportError(w, err, "Could not get paramset for primary branch", http.StatusInternalServerError)
sendJSONResponse(w, ps)
if _, ok := wh.getCodeReviewSystem(crs); !ok {
http.Error(w, "Invalid Code Review System; did you include crs?", http.StatusBadRequest)
ps, err := wh.Search2API.GetChangelistParamset(ctx, crs, clID)
if err != nil {
httputils.ReportError(w, err, "Could not get paramset for given CL", http.StatusInternalServerError)
sendJSONResponse(w, ps)
// CommitsHandler returns the last n commits with data that make up the sliding window.
func (wh *Handlers) CommitsHandler(w http.ResponseWriter, r *http.Request) {
if err := wh.cheapLimitForAnonUsers(r); err != nil {
httputils.ReportError(w, err, "Try again later", http.StatusInternalServerError)
ctx, span := trace.StartSpan(r.Context(), "web_CommitsHandler", trace.WithSampler(trace.AlwaysSample()))
defer span.End()
commits, err := wh.Search2API.GetCommitsInWindow(ctx)
if err != nil {
httputils.ReportError(w, err, "Could not get commits", http.StatusInternalServerError)
sendJSONResponse(w, commits)
// KnownHashesHandler returns known hashes that have been written to GCS in the background
// Each line contains a single digest for an image. Bots will then only upload images which
// have a hash not found on this list, avoiding significant amounts of unnecessary uploads.
func (wh *Handlers) KnownHashesHandler(w http.ResponseWriter, r *http.Request) {
// No limit for anon users - this is an endpoint backed up by baseline servers, and
// should be able to handle a large load.
_, span := trace.StartSpan(r.Context(), "web_TextKnownHashesProxy")
defer span.End()
w.Header().Set("Content-Type", "text/plain")
defer wh.knownHashesMutex.RUnlock()
if _, err := w.Write([]byte(wh.knownHashesCache)); err != nil {
sklog.Errorf("Failed to write the known hashes", err)
// BaselineHandlerV2 returns a JSON representation of the baseline, optionally including
// the baseline for a changelist (issue). It can respond to requests like these:
// /json/expectations
// /json/expectations?issue=123456
// The "issue" parameter indicates the changelist ID for which we would like to
// retrieve the baseline. In that case the returned expectations will be a blend of
// the master baseline and the baseline defined for the changelist (usually
// based on tryjob results).
//
// When "issue" is provided, the "crs" query parameter must match a configured code review
// system. When "issue" is empty, crs is reset to the empty string before the baseline is
// fetched, so a stray "crs" parameter has no effect.
func (wh *Handlers) BaselineHandlerV2(w http.ResponseWriter, r *http.Request) {
ctx, span := trace.StartSpan(r.Context(), "frontend_BaselineHandlerV2")
defer span.End()
// No limit for anon users - this is an endpoint backed up by baseline servers, and
// should be able to handle a large load.
q := r.URL.Query()
clID := q.Get("issue")
crs := q.Get("crs")
if clID != "" {
if _, ok := wh.getCodeReviewSystem(crs); !ok {
http.Error(w, "Invalid CRS provided.", http.StatusBadRequest)
} else {
crs = ""
bl, err := wh.fetchBaseline(ctx, crs, clID)
if err != nil {
httputils.ReportError(w, err, "Fetching baseline failed.", http.StatusInternalServerError)
sendJSONResponse(w, bl)
// fetchBaseline returns an object that contains all the positive and negatively triaged digests
// for either the primary branch or the primary branch and the CL. As per usual, the triage status
// on a CL overrides the triage status on the primary branch.
//
// Results are served from wh.baselineCache when present. The cache key is "primary" when clID
// is empty and "<crs>_<clID>" otherwise. Freshly computed responses are stored with
// baselineCachePrimaryBranchEntryTTL or baselineCacheSecondaryBranchEntryTTL respectively.
// The primary-branch-only query is chosen when crs is empty; otherwise the CL expectations
// for the branch sql.Qualify(crs, clID) are joined in.
func (wh *Handlers) fetchBaseline(ctx context.Context, crs, clID string) (frontend.BaselineV2Response, error) {
ctx, span := trace.StartSpan(ctx, "fetchBaseline")
defer span.End()
// Assume a cache miss; the attribute is set again with true on a cache hit below.
span.AddAttributes(trace.BoolAttribute("fromCache", false))
// Return the baseline from the cache if possible.
baselineCacheKey := "primary"
if clID != "" {
baselineCacheKey = fmt.Sprintf("%s_%s", crs, clID)
if val, ok := wh.baselineCache.Get(baselineCacheKey); ok {
res := val.(frontend.BaselineV2Response)
trace.BoolAttribute("fromCache", true),
trace.Int64Attribute("numExpectationsReturned", int64(len(res.Expectations))))
return res, nil
statement := `WITH
PrimaryBranchExps AS (
SELECT grouping_id, digest, label FROM Expectations
WHERE label = 'n' OR label = 'p'
var args []interface{}
if crs == "" {
span.AddAttributes(trace.StringAttribute("type", "primary"))
statement += `
SELECT Groupings.keys ->> 'name', encode(digest, 'hex'), label FROM PrimaryBranchExps
JOIN Groupings ON PrimaryBranchExps.grouping_id = Groupings.grouping_id
} else {
trace.StringAttribute("type", "changelist"),
trace.StringAttribute("crs", crs),
trace.StringAttribute("clID", clID))
qCLID := sql.Qualify(crs, clID)
statement += `,
CLExps AS (
SELECT grouping_id, digest, label FROM SecondaryBranchExpectations
WHERE branch_name = $1
JoinedExps AS (
SELECT COALESCE(CLExps.grouping_id, PrimaryBranchExps.grouping_id) as grouping_id,
COALESCE(CLExps.digest, PrimaryBranchExps.digest) as digest,
COALESCE(CLExps.label, PrimaryBranchExps.label, 'u') as label
CLExps.grouping_id = PrimaryBranchExps.grouping_id
AND CLExps.digest = PrimaryBranchExps.digest
SELECT Groupings.keys ->> 'name', encode(digest, 'hex'), label FROM JoinedExps
JOIN Groupings ON JoinedExps.grouping_id = Groupings.grouping_id
WHERE label = 'n' OR label = 'p'`
args = append(args, qCLID)
rows, err := wh.DB.Query(ctx, statement, args...)
if err != nil {
return frontend.BaselineV2Response{}, skerr.Wrap(err)
defer rows.Close()
baseline := expectations.Baseline{}
for rows.Next() {
var testName types.TestName
var digest types.Digest
var label schema.ExpectationLabel
if err := rows.Scan(&testName, &digest, &label); err != nil {
return frontend.BaselineV2Response{}, skerr.Wrap(err)
byDigest, ok := baseline[testName]
if !ok {
byDigest = map[types.Digest]expectations.Label{}
baseline[testName] = byDigest
byDigest[digest] = label.ToExpectation()
response := frontend.BaselineV2Response{
CodeReviewSystem: crs,
ChangelistID: clID,
Expectations: baseline,
span.AddAttributes(trace.Int64Attribute("numExpectationsReturned", int64(len(response.Expectations))))
// Cache the computed baseline.
baselineCacheEntryTTL := baselineCachePrimaryBranchEntryTTL
if clID != "" {
baselineCacheEntryTTL = baselineCacheSecondaryBranchEntryTTL
wh.baselineCache.Set(baselineCacheKey, response, baselineCacheEntryTTL)
return response, nil
// DigestListHandler returns a list of digests for a given test. This is used by goldctl's
// local diff tech.
//
// The required form value "grouping" is itself a URL-encoded query string (e.g. key=value
// pairs joined by '&'); it is decoded with url.ParseQuery and turned into the grouping Params
// passed to Search2API.GetDigestsForGrouping. The result is sent as JSON.
func (wh *Handlers) DigestListHandler(w http.ResponseWriter, r *http.Request) {
// Anonymous users are subject to the shared "cheap" rate limit.
if err := wh.cheapLimitForAnonUsers(r); err != nil {
httputils.ReportError(w, err, "Try again later", http.StatusInternalServerError)
ctx, span := trace.StartSpan(r.Context(), "web_DigestListHandler")
defer span.End()
if err := r.ParseForm(); err != nil {
httputils.ReportError(w, err, "Failed to parse form values", http.StatusInternalServerError)
encodedGrouping := r.Form.Get("grouping")
if encodedGrouping == "" {
http.Error(w, "You must include 'grouping'", http.StatusBadRequest)
groupingSet, err := url.ParseQuery(encodedGrouping)
if err != nil {
httputils.ReportError(w, skerr.Wrapf(err, "bad grouping %s", encodedGrouping), "Invalid grouping", http.StatusBadRequest)
// Flatten the multi-valued query form into Params; only the first value of a key is used.
grouping := make(paramtools.Params, len(groupingSet))
for key, values := range groupingSet {
if len(values) == 0 {
grouping[key] = values[0]
// If needed, we could add a TTL cache here.
out, err := wh.Search2API.GetDigestsForGrouping(ctx, grouping)
if err != nil {
httputils.ReportError(w, err, "Could not retrieve digests", http.StatusInternalServerError)
sendJSONResponse(w, out)
// Whoami returns the email address of the user or service account used to authenticate the
// request. For debugging purposes only.
//
// The JSON response has two keys: "whoami" (the string form of the logged-in identity as
// reported by wh.alogin) and "roles" (the roles wh.alogin reports for the request).
func (wh *Handlers) Whoami(w http.ResponseWriter, r *http.Request) {
// Anonymous users are subject to the shared "cheap" rate limit.
if err := wh.cheapLimitForAnonUsers(r); err != nil {
httputils.ReportError(w, err, "Try again later", http.StatusInternalServerError)
_, span := trace.StartSpan(r.Context(), "web_Whoami")
defer span.End()
user := wh.alogin.LoggedInAs(r)
sendJSONResponse(w, map[string]interface{}{
"whoami": user.String(),
"roles": wh.alogin.Roles(r),
// LatestPositiveDigestHandler returns the most recent positive digest for the given trace.
// Starting at the tip of tree, it will skip over any missing data, untriaged digests or digests
// triaged negative until it finds a positive digest.
//
// The trace is identified by the "traceID" URL parameter, which must be a non-empty
// hex-encoded string. The digest is returned in a frontend.MostRecentPositiveDigestResponse
// encoded as JSON.
func (wh *Handlers) LatestPositiveDigestHandler(w http.ResponseWriter, r *http.Request) {
ctx, span := trace.StartSpan(r.Context(), "web_LatestPositiveDigestHandler")
defer span.End()
// Anonymous users are subject to the shared "cheap" rate limit.
if err := wh.cheapLimitForAnonUsers(r); err != nil {
httputils.ReportError(w, err, "Try again later", http.StatusInternalServerError)
tID := chi.URLParam(r, "traceID")
if tID == "" {
http.Error(w, "Must specify traceID.", http.StatusBadRequest)
// The trace ID arrives hex-encoded; the database lookup takes the raw bytes.
traceID, err := hex.DecodeString(tID)
if err != nil {
httputils.ReportError(w, err, "Invalid traceID - must be an MD5 hash", http.StatusBadRequest)
digest, err := wh.getLatestPositiveDigest(ctx, traceID)
if err != nil {
httputils.ReportError(w, err, "Could not complete query.", http.StatusInternalServerError)
sendJSONResponse(w, frontend.MostRecentPositiveDigestResponse{Digest: digest})
// getLatestPositiveDigest queries the SQL database for a digest of the given trace that is
// triaged positive (label 'p'). Only the 1000 most recent TraceValues rows of the trace
// (ordered by commit_id descending) are considered. The digest is returned hex-encoded.
// If the query yields no rows, an empty digest and a nil error are returned.
func (wh *Handlers) getLatestPositiveDigest(ctx context.Context, traceID schema.TraceID) (types.Digest, error) {
ctx, span := trace.StartSpan(ctx, "getLatestPositiveDigest")
defer span.End()
const statement = `WITH
RecentDigests AS (
SELECT digest, commit_id, grouping_id FROM TraceValues WHERE trace_id = $1
ORDER BY commit_id DESC LIMIT 1000 -- arbitrary limit
SELECT encode(RecentDigests.digest, 'hex') FROM RecentDigests
JOIN Expectations ON Expectations.grouping_id = RecentDigests.grouping_id AND
Expectations.digest = RecentDigests.digest
WHERE label = 'p'
row := wh.DB.QueryRow(ctx, statement, traceID)
var digest types.Digest
if err := row.Scan(&digest); err != nil {
if err == pgx.ErrNoRows {
return "", nil
return "", skerr.Wrap(err)
return digest, nil
// ChangelistSearchRedirect redirects the user to a search page showing the search results
// for a given CL. It will do a (hopefully) quick scan of the untriaged digests - if it finds some,
// it will include the corpus containing some of those untriaged digests in the search query so the
// user will see results (instead of getting directed to a corpus with no results).
func (wh *Handlers) ChangelistSearchRedirect(w http.ResponseWriter, r *http.Request) {
ctx, span := trace.StartSpan(r.Context(), "web_ChangelistSearchRedirect")
defer span.End()
if err := wh.cheapLimitForAnonUsers(r); err != nil {
httputils.ReportError(w, err, "Try again later", http.StatusInternalServerError)
crs := chi.URLParam(r, "system")
if crs == "" {
http.Error(w, "Must specify 'system' of Changelist.", http.StatusBadRequest)
clID := chi.URLParam(r, "id")
if clID == "" {
http.Error(w, "Must specify 'id' of Changelist.", http.StatusBadRequest)
if _, ok := wh.getCodeReviewSystem(crs); !ok {
http.Error(w, "Invalid Code Review System", http.StatusBadRequest)
// This allows users to link to something like:
// And the page loads showing the all the results.
extraQueryParam := ""
if strings.Contains(clID, "?") {
s := strings.Split(clID, "?")
clID, extraQueryParam = s[0], s[1]
} else if strings.Contains(clID, "&") {
s := strings.Split(clID, "&")
clID, extraQueryParam = s[0], s[1]
qualifiedPSID, psOrder, err := wh.getLatestPatchset(ctx, crs, clID)
if err != nil {
httputils.ReportError(w, err, "Could not find latest patchset", http.StatusNotFound)
// TODO(kjlubick) when we change the patchsets arg to not be a list of orders, we should
// update it here too (probably specify the ps id).
baseURL := fmt.Sprintf("/search?issue=%s&crs=%s&patchsets=%d", clID, crs, psOrder)
if extraQueryParam != "" {
baseURL += "&" + extraQueryParam
corporaWithUntriagedUnignoredDigests, err := wh.getActionableDigests(ctx, crs, clID, qualifiedPSID)
if err != nil {
sklog.Errorf("Error getting digests for CL %s from CRS %s with PS %s: %s", clID, crs, qualifiedPSID, err)
http.Redirect(w, r, baseURL, http.StatusTemporaryRedirect)
if len(corporaWithUntriagedUnignoredDigests) == 0 {
http.Redirect(w, r, baseURL, http.StatusTemporaryRedirect)
http.Redirect(w, r, baseURL+"&corpus="+corporaWithUntriagedUnignoredDigests[0].Corpus, http.StatusTemporaryRedirect)
// getLatestPatchset returns the latest patchset for a given CL. It goes off of created_ts, due
// to the fact that (for GitHub) rebases can happen and potentially cause ps_order to be off.
//
// The CL is looked up by sql.Qualify(crs, clID). The return values are the patchset_id column
// (as stored in the Patchsets table) and its ps_order. Any scan error, including the absence
// of a matching row, is returned wrapped.
func (wh *Handlers) getLatestPatchset(ctx context.Context, crs, clID string) (string, int, error) {
ctx, span := trace.StartSpan(ctx, "getLatestPatchset")
defer span.End()
const statement = `SELECT patchset_id, ps_order FROM Patchsets
WHERE changelist_id = $1
ORDER BY created_ts DESC, ps_order DESC
row := wh.DB.QueryRow(ctx, statement, sql.Qualify(crs, clID))
var qualifiedID string
var order int
if err := row.Scan(&qualifiedID, &order); err != nil {
return "", 0, skerr.Wrap(err)
return qualifiedID, order, nil
// corpusAndCount pairs a corpus name with a count of digests; it is the row type produced by
// getActionableDigests.
type corpusAndCount struct {
// Corpus is scanned from the 'source_type' key of a grouping.
Corpus string
// Count is the number of digests counted for that corpus.
Count int
// getActionableDigests returns a list of corpus and the number of untriaged, not-ignored digests
// that have been seen in the data for the given PS. We choose *not* to strip out digests that
// are already on the primary branch because that additional join makes this query too slow.
// As is, it can take 3-4 seconds on a large instance like Skia. The return value will be sorted
// by count, with the corpus name being the tie-breaker.
//
// crs and clID are combined with sql.Qualify to form the branch name ($1); qPSID is passed
// through unchanged as the version name ($2), so it is expected to already be qualified.
// A nil slice is returned when no rows match.
func (wh *Handlers) getActionableDigests(ctx context.Context, crs, clID, qPSID string) ([]corpusAndCount, error) {
ctx, span := trace.StartSpan(ctx, "getActionableDigests")
defer span.End()
const statement = `WITH
DataFromCL AS (
SELECT secondary_branch_trace_id, SecondaryBranchValues.grouping_id, digest
FROM SecondaryBranchValues WHERE branch_name = $1 AND version_name = $2
ExpectationsForCL AS (
SELECT grouping_id, digest, label
FROM SecondaryBranchExpectations
WHERE branch_name = $1
JoinedExpectations AS (
SELECT COALESCE(ExpectationsForCL.grouping_id, Expectations.grouping_id) AS grouping_id,
COALESCE(ExpectationsForCL.digest, Expectations.digest) AS digest,
COALESCE(ExpectationsForCL.label, Expectations.label, 'u') AS label
FROM ExpectationsForCL FULL OUTER JOIN Expectations ON
ExpectationsForCL.grouping_id = Expectations.grouping_id
AND ExpectationsForCL.digest = Expectations.digest
UntriagedData AS (
SELECT secondary_branch_trace_id, DataFromCL.grouping_id, DataFromCL.digest FROM DataFromCL
LEFT JOIN JoinedExpectations ON DataFromCL.grouping_id = JoinedExpectations.grouping_id
AND DataFromCL.digest = JoinedExpectations.digest
WHERE label = 'u' OR label IS NULL
UnignoredUntriagedData AS (
SELECT DISTINCT UntriagedData.grouping_id, digest FROM UntriagedData
JOIN Traces ON UntriagedData.secondary_branch_trace_id = Traces.trace_id
AND matches_any_ignore_rule = FALSE
SELECT keys->>'source_type', COUNT(*) FROM Groupings JOIN UnignoredUntriagedData
ON Groupings.grouping_id = UnignoredUntriagedData.grouping_id
rows, err := wh.DB.Query(ctx, statement, sql.Qualify(crs, clID), qPSID)
if err != nil {
return nil, skerr.Wrap(err)
defer rows.Close()
var rv []corpusAndCount
for rows.Next() {
var c corpusAndCount
if err := rows.Scan(&c.Corpus, &c.Count); err != nil {
return nil, skerr.Wrap(err)
rv = append(rv, c)
return rv, nil
// getCodeReviewSystem looks through the configured wh.ReviewSystems for one whose ID equals
// crs. It returns that system and true if one was found, or the zero-value ReviewSystem and
// false otherwise.
func (wh *Handlers) getCodeReviewSystem(crs string) (clstore.ReviewSystem, bool) {
var system clstore.ReviewSystem
found := false
for _, rs := range wh.ReviewSystems {
if rs.ID == crs {
system = rs
found = true
return system, found
const (
// validDigestLength is the number of characters in a hex-encoded MD5 digest (two hex
// characters per byte).
validDigestLength = 2 * md5.Size
// dotPNG is the file extension that image requests must end with.
dotPNG = ".png"
// ImageHandler returns either a single image or a diff between two images identified by their
// respective digests.
//
// The last path element of the request URL must end in ".png". With the extension removed it
// is either one digest (validDigestLength characters), in which case that image is served, or
// two digests separated by a single character (validDigestLength*2+1 characters), in which
// case the diff of the two images is served.
func (wh *Handlers) ImageHandler(w http.ResponseWriter, r *http.Request) {
ctx, span := trace.StartSpan(r.Context(), "web_ImageHandler")
defer span.End()
// No rate limit, as this should be quite fast.
_, imgFile := path.Split(r.URL.Path)
// Get the file that was requested and verify that it has the .png extension.
if !strings.HasSuffix(imgFile, dotPNG) {
// Trim the image extension to get the image or diff ID.
imgID := imgFile[:len(imgFile)-len(dotPNG)]
// Cache images for 12 hours.
w.Header().Set("Cache-Control", "public, max-age=43200")
if len(imgID) == validDigestLength {
// Single image request: the ID is exactly one digest.
wh.serveImageWithDigest(ctx, w, types.Digest(imgID))
} else if len(imgID) == validDigestLength*2+1 {
// Diff request: the ID is the left digest, a dash, and the right digest.
left := types.Digest(imgID[:validDigestLength])
// + 1 for the dash
right := types.Digest(imgID[validDigestLength+1:])
wh.serveImageDiff(ctx, w, left, right)
} else {
// serveImageWithDigest downloads the image from GCS and returns it. If there is an error, a 404
// or 500 error is returned, as appropriate.
//
// The bytes obtained from wh.GCSClient.GetImage are written to w unmodified. A failure to
// fetch the image is logged as a warning; a failure to write it is reported as an internal
// server error.
func (wh *Handlers) serveImageWithDigest(ctx context.Context, w http.ResponseWriter, digest types.Digest) {
ctx, span := trace.StartSpan(ctx, "serveImageWithDigest")
defer span.End()
// Go's image package has no color profile support and we convert to 8-bit NRGBA to diff,
// but our source images may have embedded color profiles and be up to 16-bit. So we must
// at least take care to serve the original .pngs unaltered.
b, err := wh.GCSClient.GetImage(ctx, digest)
if err != nil {
sklog.Warningf("Could not get image with digest %s: %s", digest, err)
if _, err := w.Write(b); err != nil {
httputils.ReportError(w, err, "Could not load image. Try again later.", http.StatusInternalServerError)
// serveImageDiff downloads the left and right images, computes the diff between them, encodes
// the diff as a PNG image and writes it to the provided ResponseWriter. If there is an error, it
// returns a 404 or 500 error as appropriate.
//
// The two images are fetched and decoded concurrently in an errgroup; each goroutine writes
// only its own variable (leftImg or rightImg), and both are read only after eg.Wait returns.
func (wh *Handlers) serveImageDiff(ctx context.Context, w http.ResponseWriter, left types.Digest, right types.Digest) {
ctx, span := trace.StartSpan(ctx, "serveImageDiff")
defer span.End()
// TODO(lovisolo): Diff in NRGBA64?
// TODO(lovisolo): Make sure each pair of images is in the same color space before diffing?
// (They probably are today but it'd be a good correctness check to make sure.)
eg, eCtx := errgroup.WithContext(ctx)
var leftImg *image.NRGBA
var rightImg *image.NRGBA
// Fetch and decode the left image.
eg.Go(func() error {
b, err := wh.GCSClient.GetImage(eCtx, left)
if err != nil {
return skerr.Wrap(err)
leftImg, err = decode(b)
return skerr.Wrap(err)
// Fetch and decode the right image.
eg.Go(func() error {
b, err := wh.GCSClient.GetImage(eCtx, right)
if err != nil {
return skerr.Wrap(err)
rightImg, err = decode(b)
return skerr.Wrap(err)
if err := eg.Wait(); err != nil {
sklog.Warningf("Could not get diff for images %q and %q: %s", left, right, err)
// Compute the diff image.
_, diffImg := diff.PixelDiff(leftImg, rightImg)
// Write output image to the http.ResponseWriter. Content-Type is set automatically
// based on the first 512 bytes of written data. See docs for ResponseWriter.Write()
// for details.
// The encoding step below does not take color profiles into account. This is fine since
// both the left and right images used to compute the diff are in the same color space,
// and also because the resulting diff image is just a visual approximation of the
// differences between the left and right images.
if err := encodeImg(w, diffImg); err != nil {
httputils.ReportError(w, err, "could not serve diff image", http.StatusInternalServerError)
// decode decodes the provided bytes as a PNG and returns them as an *image.NRGBA.
// A PNG decoding failure is returned wrapped, with a nil image. The conversion to NRGBA is
// delegated to diff.GetNRGBA.
func decode(b []byte) (*image.NRGBA, error) {
im, err := png.Decode(bytes.NewReader(b))
if err != nil {
return nil, skerr.Wrap(err)
return diff.GetNRGBA(im), nil
// noCacheNotFound disables caching and returns a 404.
// The Cache-Control header is set before the 404 is written so that it is part of the
// response. The request argument to http.NotFound is passed as nil.
func noCacheNotFound(w http.ResponseWriter) {
w.Header().Set("Cache-Control", "no-cache, no-store, must-revalidate")
http.NotFound(w, nil)
// ChangelistSummaryHandler returns a summary of the new and untriaged digests produced by this
// CL across all Patchsets.
//
// The CL is identified by the "id" and "system" URL parameters; the system must match a
// configured code review system. The summary comes from getCLSummary2 and is converted to
// the V1 frontend response before being sent as JSON.
func (wh *Handlers) ChangelistSummaryHandler(w http.ResponseWriter, r *http.Request) {
ctx, span := trace.StartSpan(r.Context(), "web_ChangelistSummaryHandler", trace.WithSampler(trace.AlwaysSample()))
defer span.End()
// This endpoint uses the rate limit dedicated to the Gerrit plugin.
if err := wh.cheapLimitForGerritPlugin(r); err != nil {
httputils.ReportError(w, err, "Try again later", http.StatusInternalServerError)
clID := chi.URLParam(r, "id")
if clID == "" {
http.Error(w, "Must specify 'id' of Changelist.", http.StatusBadRequest)
crs := chi.URLParam(r, "system")
if crs == "" {
http.Error(w, "Must specify 'system' of Changelist.", http.StatusBadRequest)
system, ok := wh.getCodeReviewSystem(crs)
if !ok {
http.Error(w, "Invalid Code Review System", http.StatusBadRequest)
// The summary lookup is keyed by the qualified CL id.
qCLID := sql.Qualify(system.ID, clID)
sum, err := wh.getCLSummary2(ctx, qCLID)
if err != nil {
httputils.ReportError(w, err, "Could not get summary", http.StatusInternalServerError)
rv := convertChangelistSummaryResponseV1(sum)
sendJSONResponse(w, rv)
// getCLSummary2 fetches, caches, and returns the summary for a given CL. If the result has already
// been cached, it will return that cached value with a flag if the value is still up to date or
// not. If the cached data is stale, it will spawn a goroutine to update the cached value.
func (wh *Handlers) getCLSummary2(ctx context.Context, qCLID string) (search.NewAndUntriagedSummary, error) {
ts, err := wh.Search2API.ChangelistLastUpdated(ctx, qCLID)
if err != nil {
return search.NewAndUntriagedSummary{}, skerr.Wrap(err)
if ts.IsZero() { // A Zero time means we have no data for this CL.
return search.NewAndUntriagedSummary{}, nil
cached, ok := wh.clSummaryCache.Get(qCLID)
if ok {
sum, ok := cached.(search.NewAndUntriagedSummary)
if ok {
if ts.Before(sum.LastUpdated) || sum.LastUpdated.Equal(ts) {
sum.Outdated = false
return sum, nil
// Result is stale. Start a goroutine to fetch it again.
done := make(chan struct{})
go func() {
// We intentionally use context.Background() and not the request's context because
// if we return a result, we want the fetching in the background to continue so
// if/when the client tries again, we can serve that updated result.
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()
newValue, err := wh.Search2API.NewAndUntriagedSummaryForCL(ctx, qCLID)
if err != nil {
sklog.Warningf("Could not fetch out of date summary for cl %s in background: %s", qCLID, err)
wh.clSummaryCache.Add(qCLID, newValue)
done <- struct{}{}
// Wait up to 500ms to return the latest value quickly if available
timer := time.NewTimer(500 * time.Millisecond)
defer timer.Stop()
select {
case <-done:
case <-timer.C:
cached, ok = wh.clSummaryCache.Get(qCLID)
if ok {
if possiblyUpdated, ok := cached.(search.NewAndUntriagedSummary); ok {
if ts.Before(possiblyUpdated.LastUpdated) || possiblyUpdated.LastUpdated.Equal(ts) {
// We were able to fetch new data quickly, so return it now.
possiblyUpdated.Outdated = false
return possiblyUpdated, nil
// The cached data is still stale or invalid, so return what we have marked as outdated.
sum.Outdated = true
return sum, nil
// Invalid or missing cache entry. We must fetch because we have nothing to give the user.
sum, err := wh.Search2API.NewAndUntriagedSummaryForCL(ctx, qCLID)
if err != nil {
return search.NewAndUntriagedSummary{}, skerr.Wrap(err)
wh.clSummaryCache.Add(qCLID, sum)
return sum, nil
// convertChangelistSummaryResponseV1 converts the search2 version of a Changelist summary into
// the version expected by the frontend.
//
// Each patchset summary is copied field by field, and the resulting list is ordered by
// PatchsetOrder descending. ChangelistID and Outdated are carried over unchanged.
func convertChangelistSummaryResponseV1(summary search.NewAndUntriagedSummary) frontend.ChangelistSummaryResponseV1 {
// Pre-size the output: exactly one entry is produced per patchset summary.
xps := make([]frontend.PatchsetNewAndUntriagedSummaryV1, 0, len(summary.PatchsetSummaries))
for _, ps := range summary.PatchsetSummaries {
xps = append(xps, frontend.PatchsetNewAndUntriagedSummaryV1{
NewImages: ps.NewImages,
NewUntriagedImages: ps.NewUntriagedImages,
TotalUntriagedImages: ps.TotalUntriagedImages,
PatchsetID: ps.PatchsetID,
PatchsetOrder: ps.PatchsetOrder,
// It is convenient for the UI to have these sorted with the latest patchset first.
sort.Slice(xps, func(i, j int) bool {
return xps[i].PatchsetOrder > xps[j].PatchsetOrder
return frontend.ChangelistSummaryResponseV1{
ChangelistID: summary.ChangelistID,
PatchsetSummaries: xps,
Outdated: summary.Outdated,
// StartCacheWarming starts warming the caches for data we want to serve quickly. It starts
// goroutines that will run in the background (until the provided context is cancelled).
// Callers should therefore pass a context whose lifetime matches how long the caches should
// be kept warm.
func (wh *Handlers) StartCacheWarming(ctx context.Context) {
// startCLCacheProcess starts a go routine to warm the CL Summary cache. This way, most
// summaries are responsive, even on big instances.
//
// Once a minute it queries for open CLs with newly ingested data and for CLs with triage
// activity since lastCheck, then calls getCLSummary2 for each of them (which populates the
// cache). Errors for individual CLs are logged and otherwise ignored.
func (wh *Handlers) startCLCacheProcess(ctx context.Context) {
// We warm every CL that was open and produced data or saw triage activity in the last 5 days.
// After the first cycle, we will incrementally update the cache.
lastCheck := now.Now(ctx).Add(-5 * 24 * time.Hour)
go util.RepeatCtx(ctx, time.Minute, func(ctx context.Context) {
ctx, span := trace.StartSpan(ctx, "web_warmCLCacheCycle", trace.WithSampler(trace.AlwaysSample()))
defer span.End()
// Capture the time before querying so that changes landing during this cycle are picked
// up by the next one.
newTS := now.Now(ctx)
rows, err := wh.DB.Query(ctx, `WITH
ChangelistsWithNewData AS (
SELECT changelist_id FROM Changelists
WHERE status = 'open' and last_ingested_data > $1
ChangelistsWithTriageActivity AS (
SELECT DISTINCT branch_name AS changelist_id FROM ExpectationRecords
WHERE branch_name IS NOT NULL AND triage_time > $1
SELECT changelist_id FROM ChangelistsWithNewData
SELECT changelist_id FROM ChangelistsWithTriageActivity
`, lastCheck)
if err != nil {
if err == pgx.ErrNoRows {
sklog.Infof("No CLS updated since %s", lastCheck)
lastCheck = newTS
sklog.Errorf("Could not fetch updated CLs to warm cache: %s", err)
defer rows.Close()
var qualifiedIDS []string
for rows.Next() {
var qID string
if err := rows.Scan(&qID); err != nil {
sklog.Errorf("Could not scan: %s", err)
qualifiedIDS = append(qualifiedIDS, qID)
sklog.Infof("Warming cache for %d CLs", len(qualifiedIDS))
span.AddAttributes(trace.Int64Attribute("num_cls", int64(len(qualifiedIDS))))
// warm cache 3 at a time. This number of goroutines was chosen arbitrarily.
// A chunk size of len/3+1 yields at most three chunks.
_ = util.ChunkIterParallel(ctx, len(qualifiedIDS), len(qualifiedIDS)/3+1, func(ctx context.Context, startIdx int, endIdx int) error {
if err := ctx.Err(); err != nil {
return nil
for _, qCLID := range qualifiedIDS[startIdx:endIdx] {
_, err := wh.getCLSummary2(ctx, qCLID)
if err != nil {
sklog.Warningf("Ignoring error while warming CL Cache for %s: %s", qCLID, err)
return nil
lastCheck = newTS
sklog.Infof("Done warming cache")
// startStatusCacheProcess will compute the GUI Status on a timer and save it to the cache.
// The computation runs once a minute in a background goroutine until ctx is cancelled. A
// failed computation is logged. wh.statusCache is written under wh.statusCacheMutex.
func (wh *Handlers) startStatusCacheProcess(ctx context.Context) {
go util.RepeatCtx(ctx, time.Minute, func(ctx context.Context) {
ctx, span := trace.StartSpan(ctx, "web_warmStatusCacheCycle", trace.WithSampler(trace.AlwaysSample()))
defer span.End()
gs, err := wh.Search2API.ComputeGUIStatus(ctx)
if err != nil {
sklog.Errorf("Could not compute GUI Status: %s", err)
defer wh.statusCacheMutex.Unlock()
wh.statusCache = gs
// StartKnownHashesCacheProcess will fetch the known hashes on a timer and save it to the cache.
// Once a minute, in a background goroutine, the known digests are loaded from GCS into a
// buffer. A failed load is logged. wh.knownHashesCache is written under
// wh.knownHashesMutex; it is the value served by KnownHashesHandler.
func (wh *Handlers) StartKnownHashesCacheProcess(ctx context.Context) {
go util.RepeatCtx(ctx, time.Minute, func(ctx context.Context) {
ctx, span := trace.StartSpan(ctx, "web_warmKnownHashesCacheCycle", trace.WithSampler(trace.AlwaysSample()))
defer span.End()
var buf bytes.Buffer
if err := wh.GCSClient.LoadKnownDigests(ctx, &buf); err != nil {
sklog.Errorf("Could not fetch known digests: %s", err)
defer wh.knownHashesMutex.Unlock()
wh.knownHashesCache = buf.String()
// ignoredTrace is one entry of the ignored traces cache filled by updateIgnoredTracesCache.
type ignoredTrace struct {
// Keys are the trace keys scanned from the ValuesAtHead query.
Keys paramtools.Params
// Label is the expectation label of that row, converted via ToExpectation.
Label expectations.Label
// startIgnoredTraceCacheProcess will periodically update the cache of ignored traces.
// The update runs once a minute in a background goroutine until ctx is cancelled; failures
// are logged.
func (wh *Handlers) startIgnoredTraceCacheProcess(ctx context.Context) {
go util.RepeatCtx(ctx, time.Minute, func(ctx context.Context) {
ctx, span := trace.StartSpan(ctx, "web_warmIgnoredTraceCacheCycle", trace.WithSampler(trace.AlwaysSample()))
defer span.End()
if err := wh.updateIgnoredTracesCache(ctx); err != nil {
sklog.Errorf("Could not get all ignored traces: %s", err)
// updateIgnoredTracesCache fetches all ignored traces that have recent data, along with the
// triage status of the digest at ToT, and stores them in wh.ignoredTracesCache (written under
// wh.ignoredTracesCacheMutex). "Recent" means within the last wh.WindowSize commits with data.
// It returns an error if the query or a row scan fails.
func (wh *Handlers) updateIgnoredTracesCache(ctx context.Context) error {
ctx, span := trace.StartSpan(ctx, "updateIgnoredTracesCache")
defer span.End()
const statement = `WITH
RecentCommits AS (
SELECT commit_id FROM CommitsWithData
ORDER BY commit_id DESC LIMIT $1
OldestCommitInWindow AS (
SELECT commit_id FROM RecentCommits
ORDER BY commit_id ASC LIMIT 1
SELECT keys, label FROM ValuesAtHead
JOIN OldestCommitInWindow ON ValuesAtHead.most_recent_commit_id >= OldestCommitInWindow.commit_id
AND matches_any_ignore_rule = TRUE
JOIN Expectations ON ValuesAtHead.grouping_id = Expectations.grouping_id
AND ValuesAtHead.digest = Expectations.digest
rows, err := wh.DB.Query(ctx, statement, wh.WindowSize)
if err != nil {
return skerr.Wrap(err)
defer rows.Close()
var ignoredTraces []ignoredTrace
for rows.Next() {
var ps paramtools.Params
var label schema.ExpectationLabel
if err := rows.Scan(&ps, &label); err != nil {
return skerr.Wrap(err)
ignoredTraces = append(ignoredTraces, ignoredTrace{
Keys: ps,
Label: label.ToExpectation(),
defer wh.ignoredTracesCacheMutex.Unlock()
wh.ignoredTracesCache = ignoredTraces
return nil
// PositiveDigestsByGroupingIDHandler returns all positively triaged digests seen in the sliding
// window for a given grouping, split up by trace.
//
// The grouping is identified by the "groupingID" URL parameter, which must be a hex-encoded
// MD5 hash (2*md5.Size characters). The JSON response also echoes the grouping ID and
// the keys of that grouping.
func (wh *Handlers) PositiveDigestsByGroupingIDHandler(w http.ResponseWriter, r *http.Request) {
ctx, span := trace.StartSpan(r.Context(), "web_PositiveDigestsByGroupingIDHandler", trace.WithSampler(trace.AlwaysSample()))
defer span.End()
// Anonymous users are subject to the shared "cheap" rate limit.
if err := wh.cheapLimitForAnonUsers(r); err != nil {
httputils.ReportError(w, err, "Try again later", http.StatusInternalServerError)
gID := chi.URLParam(r, "groupingID")
if len(gID) != 2*md5.Size {
http.Error(w, "Must specify 'groupingID', which is a hex-encoded MD5 hash of the JSON encoded group keys (e.g. source_type and name)", http.StatusBadRequest)
groupingID, err := hex.DecodeString(gID)
if err != nil {
httputils.ReportError(w, err, "Invalid 'groupingID', which is a hex-encoded MD5 hash of the JSON encoded group keys (e.g. source_type and name)", http.StatusBadRequest)
// Resolve the grouping first so that unknown IDs are rejected before the heavier queries.
groupingKeys, err := wh.lookupGrouping(ctx, groupingID)
if err != nil {
httputils.ReportError(w, err, "Unknown groupingID", http.StatusBadRequest)
beginTile, endTile, err := wh.getTilesInWindow(ctx)
if err != nil {
httputils.ReportError(w, err, "Error while finding commits with data", http.StatusInternalServerError)
resp, err := wh.getPositiveDigests(ctx, beginTile, endTile, groupingID)
if err != nil {
httputils.ReportError(w, err, "Error while finding positive traces for grouping", http.StatusInternalServerError)
resp.GroupingID = gID
resp.GroupingKeys = groupingKeys
sendJSONResponse(w, resp)
// lookupGrouping returns the keys associated with the provided grouping id.
// The keys are read from the Groupings table. Any scan error (most likely because no row
// matched the id) is returned wrapped, with nil keys.
func (wh *Handlers) lookupGrouping(ctx context.Context, id schema.GroupingID) (paramtools.Params, error) {
ctx, span := trace.StartSpan(ctx, "lookupGrouping")
defer span.End()
row := wh.DB.QueryRow(ctx, `SELECT keys FROM Groupings WHERE grouping_id = $1`, id)
var keys paramtools.Params
err := row.Scan(&keys)
if err != nil {
return nil, skerr.Wrap(err) // likely the grouping was not found
return keys, nil
// getTilesInWindow returns the start and end tile of the given window.
// The window is the wh.WindowSize most recent commits with data; the returned values are the
// minimum and maximum tile_id among those commits. If there are no such commits (no rows, or
// NULL aggregates), both values are 0 and the error is nil.
func (wh *Handlers) getTilesInWindow(ctx context.Context) (schema.TileID, schema.TileID, error) {
ctx, span := trace.StartSpan(ctx, "getTilesInWindow")
defer span.End()
const statement = `WITH
RecentCommits AS (
SELECT tile_id, commit_id FROM CommitsWithData
ORDER BY commit_id DESC LIMIT $1
SELECT MIN(tile_id), MAX(tile_id) FROM RecentCommits
row := wh.DB.QueryRow(ctx, statement, wh.WindowSize)
var lc pgtype.Int4
var mc pgtype.Int4
if err := row.Scan(&lc, &mc); err != nil {
if err == pgx.ErrNoRows {
return 0, 0, nil // not enough commits seen, so start at tile 0.
return 0, 0, skerr.Wrapf(err, "getting latest commit")
if lc.Status == pgtype.Null || mc.Status == pgtype.Null {
// There are no commits with data, so start at tile 0.
return 0, 0, nil
return schema.TileID(lc.Int), schema.TileID(mc.Int), nil
// getPositiveDigests returns all digests which are triaged as positive in the given tiles that
// belong to the provided grouping. These digests are split according to the traces that made them.
//
// beginTile and endTile are inclusive bounds. Trace IDs and digests in the response are
// hex-encoded. GroupingID and GroupingKeys of the response are not set here.
func (wh *Handlers) getPositiveDigests(ctx context.Context, beginTile, endTile schema.TileID, groupingID schema.GroupingID) (frontend.PositiveDigestsByGroupingIDResponse, error) {
ctx, span := trace.StartSpan(ctx, "getPositiveDigests")
defer span.End()
tracesForGroup, err := wh.getTracesForGroup(ctx, groupingID)
if err != nil {
return frontend.PositiveDigestsByGroupingIDResponse{}, skerr.Wrap(err)
// Expand the inclusive tile range into an explicit list for the ANY($1) clause.
tilesInRange := make([]schema.TileID, 0, endTile-beginTile+1)
for i := beginTile; i <= endTile; i++ {
tilesInRange = append(tilesInRange, i)
// Querying traces, and then digests is much faster because the indexes can be used more
// efficiently. kjlubick@ tried specifying INNER LOOKUP JOIN but that didn't work on v20.2.7
const statement = `
DigestsOfInterest AS (
SELECT DISTINCT digest, trace_id FROM TiledTraceDigests
WHERE tile_id = ANY($1) AND trace_id = ANY($2)
SELECT encode(DigestsOfInterest.trace_id, 'hex'), encode(DigestsOfInterest.digest, 'hex') FROM DigestsOfInterest
JOIN Expectations ON grouping_id = $3 AND label = 'p' AND
Expectations.digest = DigestsOfInterest.digest
ORDER BY 1, 2`
rows, err := wh.DB.Query(ctx, statement, tilesInRange, tracesForGroup, groupingID)
if err != nil {
return frontend.PositiveDigestsByGroupingIDResponse{}, skerr.Wrapf(err, "fetching digests")
defer rows.Close()
// Group the (trace, digest) rows by trace.
traceToDigests := make(map[string][]types.Digest, len(tracesForGroup))
for rows.Next() {
var d types.Digest
var t string
if err := rows.Scan(&t, &d); err != nil {
return frontend.PositiveDigestsByGroupingIDResponse{}, skerr.Wrap(err)
traceToDigests[t] = append(traceToDigests[t], d)
rv := frontend.PositiveDigestsByGroupingIDResponse{}
for traceID, digests := range traceToDigests {
rv.Traces = append(rv.Traces, frontend.PositiveDigestsTraceInfo{
TraceID: traceID,
PositiveDigests: digests,
// Sort by trace ID for determinism - the digests should already be sorted
// because of the SQL query.
sort.Slice(rv.Traces, func(i, j int) bool {
return rv.Traces[i].TraceID < rv.Traces[j].TraceID
return rv, nil
// getTracesForGroup returns all the traces that are a part of the specified grouping.
// The trace IDs are read from the Traces table. A nil slice is returned when the grouping has
// no traces; query and scan errors are returned wrapped.
func (wh *Handlers) getTracesForGroup(ctx context.Context, id schema.GroupingID) ([]schema.TraceID, error) {
ctx, span := trace.StartSpan(ctx, "getTracesForGroup")
defer span.End()
const statement = `SELECT trace_id FROM Traces
WHERE grouping_id = $1`
rows, err := wh.DB.Query(ctx, statement, id)
if err != nil {
return nil, skerr.Wrapf(err, "fetching trace ids")
defer rows.Close()
var rv []schema.TraceID
for rows.Next() {
var t schema.TraceID
if err := rows.Scan(&t); err != nil {
return nil, skerr.Wrap(err)
rv = append(rv, t)
return rv, nil