package sqlwrapped

import (
	"context"
	"fmt"

	"github.com/cockroachdb/cockroach-go/v2/crdb/crdbpgx"
	"github.com/google/uuid"
	"github.com/jackc/pgx/v4"
	"github.com/jackc/pgx/v4/pgxpool"
	"go.opencensus.io/trace"

	"go.skia.org/infra/go/now"
	"go.skia.org/infra/go/skerr"
	"go.skia.org/infra/go/sklog"
	"go.skia.org/infra/golden/go/expectations"
	"go.skia.org/infra/golden/go/sql"
	"go.skia.org/infra/golden/go/sql/schema"
	"go.skia.org/infra/golden/go/types"
)

// Impl wraps a Firestore-backed expectations.Store with a layer that writes changes to a SQL
// database but returns everything from the Firestore DB. This will remain in place until the
// existing search implementation can be replaced, because the Firestore expectation
// QuerySnapshots are tightly interwoven with the current searching code.
type Impl struct {
	LegacyStore expectations.Store
	sqlDB       *pgxpool.Pool
	branch      string
}
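
// New returns an Impl that writes triage changes to both the given legacy store and the given
// SQL database. A minimal usage sketch (the variable names are illustrative; a Firestore-backed
// store and an open pgxpool.Pool are assumed to exist elsewhere):
//
//	wrapped := sqlwrapped.New(legacyStore, sqlPool)
//	err := wrapped.AddChange(ctx, deltas, "user@example.com")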
func New(store expectations.Store, db *pgxpool.Pool) *Impl {
	return &Impl{
		LegacyStore: store,
		sqlDB:       db,
	}
}

// Get returns the result from the wrapped Store.
func (i *Impl) Get(ctx context.Context) (expectations.ReadOnly, error) {
	return i.LegacyStore.Get(ctx)
}

// GetCopy returns the result from the wrapped Store.
func (i *Impl) GetCopy(ctx context.Context) (*expectations.Expectations, error) {
	return i.LegacyStore.GetCopy(ctx)
}

// AddChange first adds the change to the Firestore database - if that succeeds, it writes the
// corresponding values to the SQL DB. Because the incoming deltas only have the test name, it
// needs to look up the groupings associated with those tests. It writes the expectations to the
// SQL DB in a single transaction, so as to avoid partial commits.
func (i *Impl) AddChange(ctx context.Context, changes []expectations.Delta, userID string) error {
	ctx, span := trace.StartSpan(ctx, "sqlwrapped_AddChange", trace.WithSampler(trace.AlwaysSample()))
	span.AddAttributes(trace.Int64Attribute("num_total_changes", int64(len(changes))))
	defer span.End()
	if err := i.LegacyStore.AddChange(ctx, changes, userID); err != nil {
		return skerr.Wrap(err)
	}
	deltas, err := i.resolveGroupings(ctx, changes)
	if err != nil {
		return skerr.Wrapf(err, "getting groupings for %d changes", len(changes))
	}
	if len(deltas) == 0 {
		return nil
	}
	span.AddAttributes(trace.Int64Attribute("num_changes", int64(len(deltas))))
	err = crdbpgx.ExecuteTx(ctx, i.sqlDB, pgx.TxOptions{}, func(tx pgx.Tx) error {
		expID, err := writeRecord(ctx, tx, userID, len(deltas), i.branch)
		if err != nil {
			return err
		}
		err = fillPreviousLabel(ctx, tx, deltas, expID)
		if err != nil {
			return err
		}
		err = writeDeltas(ctx, tx, deltas)
		if err != nil {
			return err
		}
		if i.branch == "" {
			return writeExpectations(ctx, tx, deltas)
		}
		return writeSecondaryExpectations(ctx, tx, deltas, i.branch)
	})
	if err != nil {
		return skerr.Wrapf(err, "writing %d expectations from %s", len(changes), userID)
	}
	return nil
}

// writeRecord writes a new ExpectationRecord to the DB.
func writeRecord(ctx context.Context, tx pgx.Tx, userID string, numChanges int, branch string) (uuid.UUID, error) {
	ctx, span := trace.StartSpan(ctx, "writeRecord")
	defer span.End()
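	// A nil branch pointer stores a NULL branch_name, which marks the record as belonging to
	// the primary branch.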
	var br *string
	if branch != "" {
		br = &branch
	}
	const statement = `INSERT INTO ExpectationRecords
	(user_name, triage_time, num_changes, branch_name) VALUES ($1, $2, $3, $4) RETURNING expectation_record_id`
	row := tx.QueryRow(ctx, statement, userID, now.Now(ctx), numChanges, br)
	var recordUUID uuid.UUID
	err := row.Scan(&recordUUID)
	if err != nil {
		return uuid.UUID{}, skerr.Wrapf(err, "getting new UUID")
	}
	return recordUUID, nil
}

// resolveGroupings creates the initial ExpectationDeltaRows by looking up the groupings based on
// the test names. If a test name could belong to more than one grouping, it is unspecified which
// one will be returned.
func (i *Impl) resolveGroupings(ctx context.Context, changes []expectations.Delta) ([]schema.ExpectationDeltaRow, error) {
	ctx, span := trace.StartSpan(ctx, "resolveGroupings")
	defer span.End()
	uniquetests := map[types.TestName]schema.GroupingID{}
	for _, c := range changes {
		uniquetests[c.Grouping] = nil
	}
	const statement = `
SELECT keys ->> 'name', grouping_id FROM Groupings WHERE keys ->> 'name' IN `
	arguments := make([]interface{}, 0, len(uniquetests))
	for tn := range uniquetests {
		arguments = append(arguments, tn)
	}
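	// ValuesPlaceholders is assumed here to lay out numRows tuples of valuesPerRow placeholders
	// each (as its use in writeDeltas below suggests), so with one "row" of len(uniquetests)
	// values it yields a single tuple such as ($1, $2, $3), completing the IN clause.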
	vp := sql.ValuesPlaceholders(len(uniquetests), 1)
	rows, err := i.sqlDB.Query(ctx, statement+vp, arguments...)
	if err != nil {
		return nil, skerr.Wrapf(err, "getting groupings for %s", arguments)
	}
	defer rows.Close()
	for rows.Next() {
		var tn types.TestName
		var gID schema.GroupingID
		if err := rows.Scan(&tn, &gID); err != nil {
			return nil, skerr.Wrap(err)
		}
		uniquetests[tn] = gID
	}
	var rv []schema.ExpectationDeltaRow
	for _, c := range changes {
		d, err := sql.DigestToBytes(c.Digest)
		if err != nil {
			return nil, skerr.Wrapf(err, "invalid digest %q", c.Digest)
		}
		gID, ok := uniquetests[c.Grouping]
		if !ok || gID == nil {
			sklog.Warningf("Cannot find grouping for test %s", c.Grouping)
			continue
		}
		rv = append(rv, schema.ExpectationDeltaRow{
			GroupingID: gID,
			Digest:     d,
			LabelAfter: convertLabel(c.Label),
		})
	}
	return rv, nil
}
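
// expectationKey is usable as a map key: GroupingID and DigestBytes are byte slices, which
// cannot be map keys in Go, so they are converted to fixed-size MD5Hash arrays via
// sql.AsMD5Hash.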
type expectationKey struct {
	groupingID schema.MD5Hash
	digest     schema.MD5Hash
}

// fillPreviousLabel looks up all the expectations for the partially filled-out deltas passed in
// and updates those in-place.
func fillPreviousLabel(ctx context.Context, tx pgx.Tx, deltas []schema.ExpectationDeltaRow, expID uuid.UUID) error {
	ctx, span := trace.StartSpan(ctx, "fillPreviousLabel")
	defer span.End()
	toUpdate := map[expectationKey]*schema.ExpectationDeltaRow{}
	for i := range deltas {
		deltas[i].ExpectationRecordID = expID      // fill in expectation record while we are here
		deltas[i].LabelBefore = schema.LabelUntriaged // default to untriaged
		toUpdate[expectationKey{
			groupingID: sql.AsMD5Hash(deltas[i].GroupingID),
			digest:     sql.AsMD5Hash(deltas[i].Digest),
		}] = &deltas[i]
	}
	statement := `SELECT grouping_id, digest, label FROM Expectations WHERE `
	// We should be safe from injection attacks because we are hex encoding known valid byte arrays.
	// I couldn't find a better way to match multiple composite keys using our usual techniques
	// involving placeholders.
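	// For two deltas, for example, the appended clause has the form
	// (grouping_id = x'...' AND digest = x'...') OR (grouping_id = x'...' AND digest = x'...').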
	for i, d := range deltas {
		if i != 0 {
			statement += " OR "
		}
		statement += fmt.Sprintf(`(grouping_id = x'%x' AND digest = x'%x')`, d.GroupingID, d.Digest)
	}
	rows, err := tx.Query(ctx, statement)
	if err != nil {
		return err // don't wrap, could be retried
	}
	defer rows.Close()
	for rows.Next() {
		var gID schema.GroupingID
		var d schema.DigestBytes
		var label schema.ExpectationLabel
		if err := rows.Scan(&gID, &d, &label); err != nil {
			return skerr.Wrap(err) // probably not retryable
		}
		ek := expectationKey{
			groupingID: sql.AsMD5Hash(gID),
			digest:     sql.AsMD5Hash(d),
		}
		row := toUpdate[ek]
		if row == nil {
			sklog.Warningf("Unmatched row with grouping %x and digest %x", gID, d)
			continue // should never happen
		}
		row.LabelBefore = label
	}
	return nil
}

// writeDeltas writes the given rows to the SQL DB.
func writeDeltas(ctx context.Context, tx pgx.Tx, deltas []schema.ExpectationDeltaRow) error {
	ctx, span := trace.StartSpan(ctx, "writeDeltas")
	defer span.End()
	const statement = `INSERT INTO ExpectationDeltas
	(expectation_record_id, grouping_id, digest, label_before, label_after) VALUES `
	const valuesPerRow = 5
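	// With len(deltas) rows of 5 values each, vp is expected to look like
	// ($1,$2,$3,$4,$5),($6,$7,$8,$9,$10),... - one tuple per delta.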
	vp := sql.ValuesPlaceholders(valuesPerRow, len(deltas))
	arguments := make([]interface{}, 0, len(deltas)*valuesPerRow)
	for _, d := range deltas {
		arguments = append(arguments, d.ExpectationRecordID, d.GroupingID, d.Digest, d.LabelBefore, d.LabelAfter)
	}
	_, err := tx.Exec(ctx, statement+vp, arguments...)
	return err // don't wrap, could be retryable
}

// writeExpectations writes expectations based on the passed in deltas to the DB.
func writeExpectations(ctx context.Context, tx pgx.Tx, deltas []schema.ExpectationDeltaRow) error {
	ctx, span := trace.StartSpan(ctx, "writeExpectations")
	defer span.End()
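	// UPSERT is CockroachDB syntax: on conflict with the table's primary key it overwrites the
	// existing row, so re-triaging a digest replaces its current label.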
	const statement = `UPSERT INTO Expectations
	(grouping_id, digest, label, expectation_record_id) VALUES `
	const valuesPerRow = 4
	vp := sql.ValuesPlaceholders(valuesPerRow, len(deltas))
	arguments := make([]interface{}, 0, len(deltas)*valuesPerRow)
	for _, d := range deltas {
		arguments = append(arguments, d.GroupingID, d.Digest, d.LabelAfter, d.ExpectationRecordID)
	}
	_, err := tx.Exec(ctx, statement+vp, arguments...)
	return err // don't wrap, could be retryable
}

// writeSecondaryExpectations writes expectations based on the passed in deltas to the appropriate
// table for the secondary branch.
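// It differs from writeExpectations only in the target table and the extra branch_name column.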
func writeSecondaryExpectations(ctx context.Context, tx pgx.Tx, deltas []schema.ExpectationDeltaRow, branch string) error {
	ctx, span := trace.StartSpan(ctx, "writeSecondaryExpectations")
	defer span.End()
	const statement = `UPSERT INTO SecondaryBranchExpectations
	(branch_name, grouping_id, digest, label, expectation_record_id) VALUES `
	const valuesPerRow = 5
	vp := sql.ValuesPlaceholders(valuesPerRow, len(deltas))
	arguments := make([]interface{}, 0, len(deltas)*valuesPerRow)
	for _, d := range deltas {
		arguments = append(arguments, branch, d.GroupingID, d.Digest, d.LabelAfter, d.ExpectationRecordID)
	}
	_, err := tx.Exec(ctx, statement+vp, arguments...)
	return err // don't wrap, could be retryable
}

// QueryLog returns the result from the underlying Firestore DB.
func (i *Impl) QueryLog(ctx context.Context, offset, n int, details bool) ([]expectations.TriageLogEntry, int, error) {
	return i.LegacyStore.QueryLog(ctx, offset, n, details)
}

// UndoChange undoes the change in the Firestore DB and writes a row to a table in the SQL DB so
// the undo can be applied manually. This is necessary because the ID types of Firestore and the
// Expectations table are incompatible. Undos happen rarely (~1/week across all instances), so
// this should not amount to much manual work when the Firestore DB is removed.
func (i *Impl) UndoChange(ctx context.Context, changeID, userID string) error {
	err := i.LegacyStore.UndoChange(ctx, changeID, userID)
	if err != nil {
		return skerr.Wrap(err)
	}
	const statement = `INSERT INTO DeprecatedExpectationUndos (expectation_id, user_id, ts)
	VALUES ($1, $2, $3)`
	_, err = i.sqlDB.Exec(ctx, statement, changeID, userID, now.Now(ctx))
	if err != nil {
		return skerr.Wrap(err)
	}
	return nil
}

// ForChangelist returns the underlying Firestore-backed store wrapped so that it, too, writes
// to the SQL DB.
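// The branch name stored in SQL is the CRS and CL id joined with an underscore,
// e.g. "gerrit_12345" (illustrative values).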
func (i *Impl) ForChangelist(id, crs string) expectations.Store {
	return &Impl{
		sqlDB:       i.sqlDB,
		LegacyStore: i.LegacyStore.ForChangelist(id, crs),
		branch:      fmt.Sprintf("%s_%s", crs, id),
	}
}

// GetTriageHistory returns the result from the underlying Firestore DB.
func (i *Impl) GetTriageHistory(ctx context.Context, grouping types.TestName, digest types.Digest) ([]expectations.TriageHistory, error) {
	return i.LegacyStore.GetTriageHistory(ctx, grouping, digest)
}
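
// convertLabel maps a legacy expectations.Label onto the corresponding schema.ExpectationLabel,
// defaulting to untriaged (with a warning) for unknown values.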
func convertLabel(label expectations.Label) schema.ExpectationLabel {
	switch label {
	case expectations.Positive:
		return schema.LabelPositive
	case expectations.Negative:
		return schema.LabelNegative
	case expectations.Untriaged:
		return schema.LabelUntriaged
	}
	sklog.Warningf("Invalid label %q", label)
	return schema.LabelUntriaged
}

// Make sure Impl fulfills the expectations.Store interface.
var _ expectations.Store = (*Impl)(nil)