blob: 975fc79f620214a1853b97cbabbd23fa23856e3c [file] [log] [blame]
// Package commenter contains an implementation of the code_review.ChangelistCommenter interface.
// It should be CRS-agnostic.
package commenter
import (
"bytes"
"context"
"text/template"
"time"
"github.com/jackc/pgx/v4/pgxpool"
"go.opencensus.io/trace"
"golang.org/x/sync/errgroup"
"go.skia.org/infra/go/metrics2"
"go.skia.org/infra/go/now"
"go.skia.org/infra/go/skerr"
"go.skia.org/infra/go/sklog"
"go.skia.org/infra/golden/go/code_review"
"go.skia.org/infra/golden/go/sql"
"go.skia.org/infra/golden/go/sql/schema"
)
const (
numRecentOpenCLsMetric = "gold_num_recent_open_cls"
)
type ReviewSystem struct {
ID string // e.g. "gerrit", "gerrit-internal"
Client code_review.Client
}
type Impl struct {
db *pgxpool.Pool
instanceURL string
messageTemplate *template.Template
systems []ReviewSystem
lastCheck time.Time
commitsInWindow int
}
func New(db *pgxpool.Pool, systems []ReviewSystem, messageTemplate, instanceURL string, windowSize int) (*Impl, error) {
templ, err := template.New("message").Parse(messageTemplate)
if err != nil && messageTemplate != "" {
return nil, skerr.Wrapf(err, "Message template %q", messageTemplate)
}
return &Impl{
db: db,
instanceURL: instanceURL,
messageTemplate: templ,
systems: systems,
commitsInWindow: windowSize,
}, nil
}
// CommentOnChangelistsWithUntriagedDigests implements the code_review.ChangelistCommenter
// interface.
func (i *Impl) CommentOnChangelistsWithUntriagedDigests(ctx context.Context) error {
ctx, span := trace.StartSpan(ctx, "commenter_CommentOnChangelistsWithUntriagedDigests")
defer span.End()
if i.lastCheck.IsZero() {
// Default to checking all CLs that had data ingested within the last day.
i.lastCheck = now.Now(ctx).Add(-1 * 24 * time.Hour)
}
lastCheckUpdate := now.Now(ctx)
patchsets, err := i.getNewestPatchsets(ctx)
if err != nil {
return skerr.Wrap(err)
}
if len(patchsets) == 0 {
i.lastCheck = lastCheckUpdate
sklog.Infof("No patchsets had seen updated since last check.")
return nil
}
err = i.addNewDigestCounts(ctx, patchsets)
if err != nil {
return skerr.Wrap(err)
}
for _, ps := range patchsets {
if ps.numNewDigests > 0 {
if err := i.commentOn(ctx, *ps); err != nil {
sklog.Warningf("Could not comment on CL and PS %#v: %s", ps, err)
// Continue anyway - don't let one problematic CL stop the rest.
}
}
}
// actually check them for untriaged digests
i.lastCheck = lastCheckUpdate
return nil
}
type patchsetInfo struct {
system string
changelistID string // qualified id
patchsetID string // qualified id
order int
numNewDigests int // an approximate count
}
// getNewestPatchsets returns the newest patchset for each open CL that had new data since the
// last time we checked. It filters out any patchsets that we've already commented on.
func (i *Impl) getNewestPatchsets(ctx context.Context) ([]*patchsetInfo, error) {
ctx, span := trace.StartSpan(ctx, "getNewestPatchsets")
defer span.End()
// Select the most recent patchset with data from all changelists that had newly triaged data
// since the last time we checked.
const statement = `WITH
ChangelistsWithNewData AS (
SELECT changelist_id FROM Changelists
WHERE status = 'open' and last_ingested_data > $1
)
SELECT DISTINCT ON (system, changelist_id)
Patchsets.system, Patchsets.changelist_id, patchset_id, ps_order, commented_on_cl FROM Patchsets
JOIN ChangelistsWithNewData on Patchsets.changelist_id = ChangelistsWithNewData.changelist_id
ORDER BY system, changelist_id, ps_order DESC
`
rows, err := i.db.Query(ctx, statement, i.lastCheck)
if err != nil {
return nil, skerr.Wrap(err)
}
defer rows.Close()
var rv []*patchsetInfo
openCLs := 0
for rows.Next() {
var row patchsetInfo
var commentAlready bool
if err := rows.Scan(&row.system, &row.changelistID, &row.patchsetID, &row.order, &commentAlready); err != nil {
return nil, skerr.Wrap(err)
}
openCLs++
if commentAlready {
// We don't bother with CLs for which we have already commented on the most recent PS.
continue
}
rv = append(rv, &row)
}
metrics2.GetInt64Metric(numRecentOpenCLsMetric, nil).Update(int64(openCLs))
return rv, nil
}
// addNewDigestCounts counts how many new images were produced for each of the patchsets. "New"
// means not seen in the current commit window.
func (i *Impl) addNewDigestCounts(ctx context.Context, patchsets []*patchsetInfo) error {
ctx, span := trace.StartSpan(ctx, "addNewDigestCounts")
defer span.End()
// We get the digests as their own step because we need to look up our secondary branch values
// using pairs of (branch_name, version_name), which is a bit awkward to write in pure SQL.
digestsOnPrimary, err := i.getDigestsOnPrimary(ctx)
if err != nil {
return skerr.Wrap(err)
}
eg, eCtx := errgroup.WithContext(ctx)
for idx := range patchsets {
ps := patchsets[idx]
eg.Go(func() error {
const statement = `SELECT DISTINCT digest
FROM SecondaryBranchValues WHERE branch_name = $1 AND version_name = $2`
rows, err := i.db.Query(eCtx, statement, ps.changelistID, ps.patchsetID)
if err != nil {
return skerr.Wrapf(err, "patchset %#v", *ps)
}
defer rows.Close()
newDigests := 0
var digestBytes schema.DigestBytes
var digestKey schema.MD5Hash
digest := digestKey[:]
for rows.Next() {
if err := rows.Scan(&digestBytes); err != nil {
return skerr.Wrap(err)
}
copy(digest, digestBytes)
if _, ok := digestsOnPrimary[digestKey]; !ok {
newDigests++
}
}
ps.numNewDigests = newDigests
return nil
})
}
return skerr.Wrap(eg.Wait())
}
// getDigestsOnPrimary returns a set of all digests currently on the primary branch.
func (i *Impl) getDigestsOnPrimary(ctx context.Context) (map[schema.MD5Hash]struct{}, error) {
ctx, span := trace.StartSpan(ctx, "getDigestsOnPrimary")
defer span.End()
const statement = `WITH
BeginningOfWindow AS (
SELECT tile_id FROM (
SELECT commit_id, tile_id FROM CommitsWithData
ORDER BY commit_id DESC
LIMIT $1
) ORDER BY commit_id ASC LIMIT 1
)
SELECT DISTINCT digest FROM TiledTraceDigests
JOIN BeginningOfWindow ON TiledTraceDigests.tile_id >= BeginningOfWindow.tile_id
`
rows, err := i.db.Query(ctx, statement, i.commitsInWindow)
if err != nil {
return nil, skerr.Wrap(err)
}
defer rows.Close()
rv := map[schema.MD5Hash]struct{}{}
var seen struct{}
var digestBytes schema.DigestBytes
var digestKey schema.MD5Hash
digest := digestKey[:]
for rows.Next() {
if err := rows.Scan(&digestBytes); err != nil {
return nil, skerr.Wrap(err)
}
copy(digest, digestBytes)
rv[digestKey] = seen
}
return rv, nil
}
// commentOn either comments on the given CL/PS that there are untriaged digests on it or
// logs if this commenter is configured to not actually comment.
func (i *Impl) commentOn(ctx context.Context, ps patchsetInfo) error {
clID := sql.Unqualify(ps.changelistID)
msg, err := i.untriagedMessage(commentTemplateContext{
CRS: ps.system,
ChangelistID: clID,
PatchsetOrder: ps.order,
NumNewDigests: ps.numNewDigests,
})
if err != nil {
return skerr.Wrap(err)
}
var client code_review.Client
for _, c := range i.systems {
if c.ID == ps.system {
client = c.Client
}
}
if client == nil {
sklog.Errorf("Could not make comment for system %s - not configured", ps.system)
return nil
}
if cl, err := client.GetChangelist(ctx, clID); err != nil {
if err == code_review.ErrNotFound {
sklog.Infof("CL %s might have been deleted", clID)
return nil
}
return skerr.Wrap(err)
} else {
if cl.Status != code_review.Open {
sklog.Infof("CL %s was not open - %v", clID, cl.Status)
return nil
}
}
sklog.Infof("Commenting on CL %s PS %d about newly produced images", clID, ps.order)
if err := client.CommentOn(ctx, clID, msg); err != nil {
return skerr.Wrapf(err, "commenting on %s CL %s", ps.system, clID)
}
const statement = `UPDATE Patchsets SET commented_on_cl = TRUE WHERE patchset_id = $1`
_, err = i.db.Exec(ctx, statement, ps.patchsetID)
if err != nil {
return skerr.Wrap(err)
}
return nil
}
// commentTemplateContext contains the fields that can be substituted into
type commentTemplateContext struct {
ChangelistID string
CRS string
InstanceURL string
NumNewDigests int
PatchsetOrder int
}
// untriagedMessage returns a message about untriaged images on the given CL/PS.
func (i *Impl) untriagedMessage(c commentTemplateContext) (string, error) {
c.InstanceURL = i.instanceURL
var b bytes.Buffer
if err := i.messageTemplate.Execute(&b, c); err != nil {
return "", skerr.Wrapf(err, "With template context %#v", c)
}
return b.String(), nil
}
// Make sure Impl fulfills the code_review.ChangelistCommenter interface.
var _ code_review.ChangelistCommenter = (*Impl)(nil)