blob: 8bc7c4fd4c3dd5463b0874447a660b9a4e8423b0 [file] [log] [blame]
package expstorage
import (
"database/sql"
"fmt"
"strconv"
"strings"
"go.skia.org/infra/go/database"
"go.skia.org/infra/go/eventbus"
"go.skia.org/infra/go/sklog"
"go.skia.org/infra/go/timer"
"go.skia.org/infra/go/util"
"go.skia.org/infra/golden/go/types"
"golang.org/x/sync/errgroup"
)
// insertChunkSize is the number of records to insert with a single insert statement. The value
// was determined to work via the test. The exact value is not important since all inserts
// end up in the same transaction.
const insertChunkSize = 1000
// chunkPlaceholder is the placeholder string needed to insert a complete chunk of records.
var chunkPlaceholders = strings.TrimRight(strings.Repeat("(?,?,?,?),", insertChunkSize), ",")
// Stores expectations in an SQL database without any caching.
type SQLExpectationsStore struct {
vdb *database.VersionedDB
}
func NewSQLExpectationStore(vdb *database.VersionedDB) ExpectationsStore {
return &SQLExpectationsStore{
vdb: vdb,
}
}
// See ExpectationsStore interface.
func (s *SQLExpectationsStore) Get() (exp types.Expectations, err error) {
// Load the newest record from the database.
const stmt = `SELECT t1.name, t1.digest, t1.label
FROM exp_test_change AS t1
JOIN (
SELECT name, digest, MAX(changeid) as changeid
FROM exp_test_change
GROUP BY name, digest ) AS t2
ON (t1.name = t2.name AND t1.digest = t2.digest AND t1.changeid = t2.changeid)
WHERE t1.removed IS NULL`
rows, err := s.vdb.DB.Query(stmt)
if err != nil {
return nil, err
}
defer util.Close(rows)
result := types.TestExp{}
for rows.Next() {
var testName, digest, label string
if err = rows.Scan(&testName, &digest, &label); err != nil {
return nil, err
}
result.AddDigest(testName, digest, types.LabelFromString(label))
}
return types.NewExpectations(result), nil
}
// See ExpectationsStore interface.
func (s *SQLExpectationsStore) AddChange(changedTests types.TestExp, userId string) error {
return s.AddChangeWithTimeStamp(changedTests, userId, 0, util.TimeStampMs())
}
// TODO(stephana): Remove the AddChangeWithTimeStamp if we remove the
// migration code that calls it.
// AddChangeWithTimeStamp adds changed tests to the database with the
// given time stamp. This is primarily for migration purposes.
func (s *SQLExpectationsStore) AddChangeWithTimeStamp(changedTests types.TestExp, userId string, undoID int64, timeStamp int64) (retErr error) {
defer timer.New("adding exp change").Stop()
// Count the number of values to add.
changeCount := 0
for _, digests := range changedTests {
changeCount += len(digests)
}
const (
insertChange = `INSERT INTO exp_change (userid, ts, undo_changeid) VALUES (?, ?, ?)`
insertDigest = `INSERT INTO exp_test_change (changeid, name, digest, label) VALUES`
)
// start a transaction
tx, err := s.vdb.DB.Begin()
if err != nil {
return err
}
defer func() { retErr = database.CommitOrRollback(tx, retErr) }()
// create the change record
result, err := tx.Exec(insertChange, userId, timeStamp, undoID)
if err != nil {
return err
}
changeId, err := result.LastInsertId()
if err != nil {
return err
}
// If there are not changed records then we stop here.
if changeCount == 0 {
return nil
}
// Assemble the INSERT values.
chunks := [][]interface{}{}
remainderValuesStr := ""
current := make([]interface{}, 0, insertChunkSize)
for testName, digests := range changedTests {
for d, label := range digests {
remainderValuesStr += "(?, ?, ?, ?),"
current = append(current, changeId, testName, d, label.String())
if (len(current) / 4) >= insertChunkSize {
chunks = append(chunks, current)
current = make([]interface{}, 0, insertChunkSize)
remainderValuesStr = ""
}
}
}
remainderValuesStr = remainderValuesStr[:len(remainderValuesStr)-1]
// Insert all the chunks
if len(chunks) > 0 {
if err := insertWithPrep(insertDigest+chunkPlaceholders, tx, chunks...); err != nil {
return err
}
}
// Insert the remainder.
if len(current) > 0 {
if err := insertWithPrep(insertDigest+remainderValuesStr, tx, current); err != nil {
return err
}
}
return nil
}
// insertPrep assumes a statement with placeholders and one or more sets of values to insert.
func insertWithPrep(insertStmt string, tx *sql.Tx, valsArr ...[]interface{}) error {
prepStmt, err := tx.Prepare(insertStmt)
if err != nil {
return err
}
defer util.Close(prepStmt)
for _, vals := range valsArr {
_, err = prepStmt.Exec(vals...)
if err != nil {
return err
}
}
return nil
}
// removeChange, see ExpectationsStore interface.
func (s *SQLExpectationsStore) removeChange(changedDigests types.TestExp) (retErr error) {
defer timer.New("removing exp change").Stop()
const markRemovedStmt = `UPDATE exp_test_change
SET removed = IF(removed IS NULL, ?, removed)
WHERE (name=?) AND (digest=?)`
// start a transaction
tx, err := s.vdb.DB.Begin()
if err != nil {
return err
}
defer func() { retErr = database.CommitOrRollback(tx, retErr) }()
// Mark all the digests as removed.
now := util.TimeStampMs()
for testName, digests := range changedDigests {
for digest := range digests {
if _, err = tx.Exec(markRemovedStmt, now, testName, digest); err != nil {
return err
}
}
}
return nil
}
// See ExpectationsStore interface.
func (s *SQLExpectationsStore) QueryLog(offset, size int, details bool) ([]*TriageLogEntry, int, error) {
return s.queryChanges(offset, size, 0, details)
}
// getExpectationsAt returns the changes that are necessary to restore the values
// at the given triage change.
func (s *SQLExpectationsStore) getExpectationsAt(changeInfo *TriageLogEntry) (types.TestExp, error) {
const stmtTmpl = `
SELECT tc.name AS name, tc.digest AS digest, tc.label AS label
FROM exp_change AS ec, exp_test_change AS tc
WHERE ((tc.removed IS NULL) OR ((tc.removed IS NOT NULL) AND (tc.removed > ?))) AND
(ec.ts < ?) AND
(ec.id = tc.changeid) AND
((tc.name, tc.digest) IN (%s))
ORDER BY ec.ts ASC`
if len(changeInfo.Details) == 0 {
return types.TestExp{}, nil
}
// Extract the digests that we are interested in.
ret := types.TestExp{}
listArgs := []interface{}{changeInfo.TS, changeInfo.TS}
placeHolders := []string{}
for _, d := range changeInfo.Details {
if _, ok := ret[d.TestName]; !ok {
ret[d.TestName] = map[string]types.Label{}
}
ret[d.TestName][d.Digest] = types.UNTRIAGED
listArgs = append(listArgs, d.TestName, d.Digest)
placeHolders = append(placeHolders, "(?,?)")
}
// Add the necessary amount of placeholders to the SQL query.
stmt := fmt.Sprintf(stmtTmpl, strings.Join(placeHolders, ","))
// Fetch the records we are interested in.
rows, err := s.vdb.DB.Query(stmt, listArgs...)
if err != nil {
return nil, err
}
defer util.Close(rows)
var name, digest, label string
for rows.Next() {
if err = rows.Scan(&name, &digest, &label); err != nil {
return nil, err
}
// We expect that there could be multiple results for the same name and
// digest. They are sorted chronologically, so always overwrite earlier
// results with later results.
ret[name][digest] = types.LabelFromString(label)
}
return ret, nil
}
func (s *SQLExpectationsStore) queryChanges(offset, size int, changeID int64, details bool) ([]*TriageLogEntry, int, error) {
const (
stmtDetails = `SELECT ec.id, tc.name, tc.digest, tc.label
FROM exp_change AS ec, exp_test_change AS tc
WHERE (ec.id=tc.changeid) AND (ec.id IN (%s))
ORDER BY ec.id DESC, tc.name ASC, tc.digest ASC`
stmtTotal = `SELECT count(*) FROM exp_change`
stmtListTmpl = `SELECT ec.id, ec.userid, ec.ts, (IFNULL( COUNT( tc.changeid ) , 0 )) AS detailsCount, undo_changeid
FROM %s AS ec
LEFT OUTER JOIN exp_test_change AS tc
ON ec.id=tc.changeid
GROUP BY ec.id ORDER BY ec.ts DESC
LIMIT ?, ?`
nestedQuery = `(SELECT * FROM exp_change WHERE id=?)`
)
// Adjust the query based on whether we are interested in finding a specific item.
var stmtList string
listArgs := []interface{}{offset, size}
if changeID > 0 {
stmtList = fmt.Sprintf(stmtListTmpl, nestedQuery)
listArgs = append([]interface{}{changeID}, listArgs...)
} else {
stmtList = fmt.Sprintf(stmtListTmpl, "exp_change")
}
// Get the total number of records.
row := s.vdb.DB.QueryRow(stmtTotal)
var total int
if err := row.Scan(&total); err != nil {
return nil, 0, err
}
if total == 0 {
return []*TriageLogEntry{}, 0, nil
}
// Fetch the records we are interested in.
rows, err := s.vdb.DB.Query(stmtList, listArgs...)
if err != nil {
return nil, 0, err
}
defer util.Close(rows)
var ids []interface{}
var placeHolders []string
if details {
ids = make([]interface{}, 0, size)
placeHolders = make([]string, 0, size)
}
idToIdxMap := map[string]int{}
result := make([]*TriageLogEntry, 0, size)
for rows.Next() {
entry := &TriageLogEntry{}
if err = rows.Scan(&entry.ID, &entry.Name, &entry.TS, &entry.ChangeCount, &entry.UndoChangeID); err != nil {
return nil, 0, err
}
result = append(result, entry)
if details {
idToIdxMap[entry.ID] = len(result) - 1
ids = append(ids, entry.ID)
placeHolders = append(placeHolders, "?")
}
}
if details && len(result) > 0 {
stmt := fmt.Sprintf(stmtDetails, strings.Join(placeHolders, ","))
rows, err := s.vdb.DB.Query(stmt, ids...)
if err != nil {
return nil, 0, err
}
var recID int64
for rows.Next() {
detail := &TriageDetail{}
if err = rows.Scan(&recID, &detail.TestName, &detail.Digest, &detail.Label); err != nil {
return nil, 0, err
}
idx := idToIdxMap[strconv.FormatInt(recID, 10)]
if result[idx].Details == nil {
result[idx].Details = make([]*TriageDetail, 0, result[idx].ChangeCount)
}
result[idx].Details = append(result[idx].Details, detail)
}
}
return result, total, nil
}
// See ExpectationsStore interface.
func (s *SQLExpectationsStore) UndoChange(changeID int64, userID string) (types.TestExp, error) {
changeInfo, err := s.loadChangeEntry(changeID)
if err != nil {
return nil, err
}
// TODO(stephana): Enable undo and redo for undos.There is a small different
// between a redo and an undo in that the undo restores the state before the
// undo, while the redo replays the changes that were added in the original
// change.
// Refuse to undo a change that is the result of on undo.
if changeInfo.UndoChangeID != 0 {
return nil, fmt.Errorf("Unable to undo change %d which was created as an undo of change %d.", changeID, changeInfo.UndoChangeID)
}
// Get the expectations of tests of interest at that time.
changes, err := s.getExpectationsAt(changeInfo)
if err != nil {
return nil, err
}
return changes, s.AddChangeWithTimeStamp(changes, userID, changeID, util.TimeStampMs())
}
// Clear implements the ExpectationsStore interface.
func (s *SQLExpectationsStore) Clear() error {
const stmt_ec = `DELETE FROM exp_change`
const stmt_etc = `DELETE FROM exp_test_change`
var egroup errgroup.Group
egroup.Go(func() error {
_, err := s.vdb.DB.Exec(stmt_ec)
return err
})
egroup.Go(func() error {
_, err := s.vdb.DB.Exec(stmt_etc)
return err
})
return egroup.Wait()
}
// Loads a single change entry with all details from the DB.
func (s *SQLExpectationsStore) loadChangeEntry(changeID int64) (*TriageLogEntry, error) {
changeInfo, _, err := s.queryChanges(0, 5, changeID, true)
if err != nil {
return nil, fmt.Errorf("Unable to retrieve triage information: %s", err)
}
if len(changeInfo) != 1 {
return nil, fmt.Errorf("Triage information for change id %d should only be one record.", changeID)
}
return changeInfo[0], nil
}
// Wraps around an ExpectationsStore and caches the expectations using
// MemExpecationsStore.
type CachingExpectationStore struct {
store ExpectationsStore
cache ExpectationsStore
eventBus eventbus.EventBus
refresh bool
}
func NewCachingExpectationStore(store ExpectationsStore, eventBus eventbus.EventBus) ExpectationsStore {
ret := &CachingExpectationStore{
store: store,
cache: NewMemExpectationsStore(nil),
eventBus: eventBus,
refresh: true,
}
// Prime the cache upon creation.
// We ignore any error returned here for a simplified function signature and
// also because any ExpectationsStore implementation (i.e. CloudExpectationsStore)
// will do some connection checks before being passed to this instance.
_, _ = ret.Get()
// Register the events to update the cache.
ret.eventBus.SubscribeAsync(EV_EXPSTORAGE_CHANGED, ret.addChangeToCache)
return ret
}
// See ExpectationsStore interface.
func (c *CachingExpectationStore) Get() (exp types.Expectations, err error) {
if c.refresh {
c.refresh = false
tempExp, err := c.store.Get()
if err != nil {
return nil, err
}
if err = c.cache.AddChange(tempExp.TestExp().DeepCopy(), ""); err != nil {
return nil, err
}
}
return c.cache.Get()
}
// See ExpectationsStore interface.
func (c *CachingExpectationStore) AddChange(changedTests types.TestExp, userId string) error {
if err := c.store.AddChange(changedTests, userId); err != nil {
return err
}
// Fire an event that will trigger the addition to the cache and wait for it to complete.
// This is necessary because events that change the cache could also come from the distributed
// eventbus.
waitCh := make(chan bool)
c.eventBus.Publish(EV_EXPSTORAGE_CHANGED, evExpChange(changedTests, masterIssueID, waitCh), true)
<-waitCh
return nil
}
// addChangeToCache updates the cache and fires the change event.
func (c *CachingExpectationStore) addChangeToCache(evtChangedTests interface{}) {
evtData := evtChangedTests.(*EventExpectationChange)
changedTests := evtData.TestChanges
// Split the changes into removal and addition.
forRemoval := make(types.TestExp, len(changedTests))
forAddition := make(types.TestExp, len(changedTests))
for test, digests := range changedTests {
for digest, label := range digests {
if label == types.UNTRIAGED {
if foundTest, ok := forRemoval[test]; ok {
foundTest[digest] = label
} else {
forRemoval[test] = types.TestClassification{digest: label}
}
} else {
if foundTest, ok := forAddition[test]; ok {
foundTest[digest] = label
} else {
forAddition[test] = types.TestClassification{digest: label}
}
}
}
}
if len(forRemoval) > 0 {
if err := c.cache.removeChange(forRemoval); err != nil {
sklog.Errorf("Error removing changed expectations to cache: %s", err)
}
}
if len(forAddition) > 0 {
if err := c.cache.AddChange(forAddition, ""); err != nil {
sklog.Errorf("Error adding changed expectations to cache: %s", err)
}
}
if evtData.waitCh != nil {
evtData.waitCh <- true
}
sklog.Infof("Expectations change has been added to the cache.")
}
// removeChange implements the ExpectationsStore interface.
func (c *CachingExpectationStore) removeChange(changedDigests types.TestExp) error {
if err := c.store.removeChange(changedDigests); err != nil {
return err
}
// Fire an event that will trigger the addition to the cache.
waitCh := make(chan bool)
c.eventBus.Publish(EV_EXPSTORAGE_CHANGED, evExpChange(changedDigests, masterIssueID, waitCh), true)
<-waitCh
return nil
}
// See ExpectationsStore interface.
func (c *CachingExpectationStore) QueryLog(offset, size int, details bool) ([]*TriageLogEntry, int, error) {
return c.store.QueryLog(offset, size, details)
}
// See ExpectationsStore interface.
func (c *CachingExpectationStore) UndoChange(changeID int64, userID string) (types.TestExp, error) {
changedTests, err := c.store.UndoChange(changeID, userID)
if err != nil {
return nil, err
}
// Fire an event that will trigger the addition to the cache.
waitCh := make(chan bool)
c.eventBus.Publish(EV_EXPSTORAGE_CHANGED, evExpChange(changedTests, masterIssueID, waitCh), true)
<-waitCh
return changedTests, nil
}
// Clear implements the ExpectationsStore interface.
func (c *CachingExpectationStore) Clear() error {
if err := c.store.Clear(); err != nil {
return err
}
return c.cache.Clear()
}