package expstorage

import (
	"database/sql"
	"fmt"
	"strconv"
	"strings"

	"go.skia.org/infra/go/database"
	"go.skia.org/infra/go/eventbus"
	"go.skia.org/infra/go/sklog"
	"go.skia.org/infra/go/timer"
	"go.skia.org/infra/go/util"
	"go.skia.org/infra/golden/go/types"
	"golang.org/x/sync/errgroup"
)

// insertChunkSize is the number of records to insert with a single insert statement. The value
// was determined to work via the test. The exact value is not important since all inserts
// end up in the same transaction.
const insertChunkSize = 1000

// chunkPlaceholder is the placeholder string needed to insert a complete chunk of records.
var chunkPlaceholders = strings.TrimRight(strings.Repeat("(?,?,?,?),", insertChunkSize), ",")

// Stores expectations in an SQL database without any caching.
type SQLExpectationsStore struct {
	vdb *database.VersionedDB
}

func NewSQLExpectationStore(vdb *database.VersionedDB) ExpectationsStore {
	return &SQLExpectationsStore{
		vdb: vdb,
	}
}

// See ExpectationsStore interface.
func (s *SQLExpectationsStore) Get() (exp types.Expectations, err error) {
	// Load the newest record from the database.
	const stmt = `SELECT t1.name, t1.digest, t1.label
	         FROM exp_test_change AS t1
	         JOIN (
	         	SELECT name, digest, MAX(changeid) as changeid
	         	FROM exp_test_change
	         	GROUP BY name, digest ) AS t2
				ON (t1.name = t2.name AND t1.digest = t2.digest AND t1.changeid = t2.changeid)
				WHERE t1.removed IS NULL`

	rows, err := s.vdb.DB.Query(stmt)
	if err != nil {
		return nil, err
	}
	defer util.Close(rows)

	result := types.TestExp{}
	for rows.Next() {
		var testName, digest, label string
		if err = rows.Scan(&testName, &digest, &label); err != nil {
			return nil, err
		}
		result.AddDigest(testName, digest, types.LabelFromString(label))
	}
	return types.NewExpectations(result), nil
}

// See ExpectationsStore interface.
func (s *SQLExpectationsStore) AddChange(changedTests types.TestExp, userId string) error {
	return s.AddChangeWithTimeStamp(changedTests, userId, 0, util.TimeStampMs())
}

// TODO(stephana): Remove the AddChangeWithTimeStamp if we remove the
// migration code that calls it.

// AddChangeWithTimeStamp adds changed tests to the database with the
// given time stamp. This is primarily for migration purposes.
func (s *SQLExpectationsStore) AddChangeWithTimeStamp(changedTests types.TestExp, userId string, undoID int64, timeStamp int64) (retErr error) {
	defer timer.New("adding exp change").Stop()

	// Count the number of values to add.
	changeCount := 0
	for _, digests := range changedTests {
		changeCount += len(digests)
	}

	const (
		insertChange = `INSERT INTO exp_change (userid, ts, undo_changeid) VALUES (?, ?, ?)`
		insertDigest = `INSERT INTO exp_test_change (changeid, name, digest, label) VALUES`
	)

	// start a transaction
	tx, err := s.vdb.DB.Begin()
	if err != nil {
		return err
	}

	defer func() { retErr = database.CommitOrRollback(tx, retErr) }()

	// create the change record
	result, err := tx.Exec(insertChange, userId, timeStamp, undoID)
	if err != nil {
		return err
	}
	changeId, err := result.LastInsertId()
	if err != nil {
		return err
	}

	// If there are not changed records then we stop here.
	if changeCount == 0 {
		return nil
	}

	// Assemble the INSERT values.
	chunks := [][]interface{}{}
	remainderValuesStr := ""
	current := make([]interface{}, 0, insertChunkSize)
	for testName, digests := range changedTests {
		for d, label := range digests {
			remainderValuesStr += "(?, ?, ?, ?),"
			current = append(current, changeId, testName, d, label.String())

			if (len(current) / 4) >= insertChunkSize {
				chunks = append(chunks, current)
				current = make([]interface{}, 0, insertChunkSize)
				remainderValuesStr = ""
			}
		}
	}
	remainderValuesStr = remainderValuesStr[:len(remainderValuesStr)-1]

	// Insert all the chunks
	if len(chunks) > 0 {
		if err := insertWithPrep(insertDigest+chunkPlaceholders, tx, chunks...); err != nil {
			return err
		}
	}

	// Insert the remainder.
	if len(current) > 0 {
		if err := insertWithPrep(insertDigest+remainderValuesStr, tx, current); err != nil {
			return err
		}
	}

	return nil
}

// insertPrep assumes a statement with placeholders and one or more sets of values to insert.
func insertWithPrep(insertStmt string, tx *sql.Tx, valsArr ...[]interface{}) error {
	prepStmt, err := tx.Prepare(insertStmt)
	if err != nil {
		return err
	}
	defer util.Close(prepStmt)

	for _, vals := range valsArr {
		_, err = prepStmt.Exec(vals...)
		if err != nil {
			return err
		}
	}
	return nil
}

// removeChange, see ExpectationsStore interface.
func (s *SQLExpectationsStore) removeChange(changedDigests types.TestExp) (retErr error) {
	defer timer.New("removing exp change").Stop()

	const markRemovedStmt = `UPDATE exp_test_change
	                         SET removed = IF(removed IS NULL, ?, removed)
	                         WHERE (name=?) AND (digest=?)`

	// start a transaction
	tx, err := s.vdb.DB.Begin()
	if err != nil {
		return err
	}

	defer func() { retErr = database.CommitOrRollback(tx, retErr) }()

	// Mark all the digests as removed.
	now := util.TimeStampMs()
	for testName, digests := range changedDigests {
		for digest := range digests {
			if _, err = tx.Exec(markRemovedStmt, now, testName, digest); err != nil {
				return err
			}
		}
	}

	return nil
}

// See ExpectationsStore interface.
func (s *SQLExpectationsStore) QueryLog(offset, size int, details bool) ([]*TriageLogEntry, int, error) {
	return s.queryChanges(offset, size, 0, details)
}

// getExpectationsAt returns the changes that are necessary to restore the values
// at the given triage change.
func (s *SQLExpectationsStore) getExpectationsAt(changeInfo *TriageLogEntry) (types.TestExp, error) {
	const stmtTmpl = `
		SELECT tc.name AS name, tc.digest AS digest, tc.label AS label
		FROM exp_change AS ec, exp_test_change AS tc
		WHERE ((tc.removed IS NULL) OR ((tc.removed IS NOT NULL) AND (tc.removed > ?))) AND
		      (ec.ts < ?) AND
		      (ec.id = tc.changeid) AND
					((tc.name, tc.digest) IN (%s))
		ORDER BY ec.ts ASC`

	if len(changeInfo.Details) == 0 {
		return types.TestExp{}, nil
	}

	// Extract the digests that we are interested in.
	ret := types.TestExp{}
	listArgs := []interface{}{changeInfo.TS, changeInfo.TS}
	placeHolders := []string{}
	for _, d := range changeInfo.Details {
		if _, ok := ret[d.TestName]; !ok {
			ret[d.TestName] = map[string]types.Label{}
		}
		ret[d.TestName][d.Digest] = types.UNTRIAGED
		listArgs = append(listArgs, d.TestName, d.Digest)
		placeHolders = append(placeHolders, "(?,?)")
	}

	// Add the necessary amount of placeholders to the SQL query.
	stmt := fmt.Sprintf(stmtTmpl, strings.Join(placeHolders, ","))

	// Fetch the records we are interested in.
	rows, err := s.vdb.DB.Query(stmt, listArgs...)
	if err != nil {
		return nil, err
	}
	defer util.Close(rows)

	var name, digest, label string
	for rows.Next() {
		if err = rows.Scan(&name, &digest, &label); err != nil {
			return nil, err
		}
		// We expect that there could be multiple results for the same name and
		// digest. They are sorted chronologically, so always overwrite earlier
		// results with later results.
		ret[name][digest] = types.LabelFromString(label)
	}

	return ret, nil
}

func (s *SQLExpectationsStore) queryChanges(offset, size int, changeID int64, details bool) ([]*TriageLogEntry, int, error) {
	const (
		stmtDetails = `SELECT ec.id, tc.name, tc.digest, tc.label
					  FROM exp_change AS ec, exp_test_change AS tc
					  WHERE (ec.id=tc.changeid) AND (ec.id IN (%s))
					  ORDER BY ec.id DESC, tc.name ASC, tc.digest ASC`

		stmtTotal = `SELECT count(*) FROM exp_change`

		stmtListTmpl = `SELECT ec.id, ec.userid, ec.ts, (IFNULL( COUNT( tc.changeid ) , 0 )) AS detailsCount, undo_changeid
					  FROM %s AS ec
						LEFT OUTER JOIN exp_test_change AS tc
							ON ec.id=tc.changeid
					  GROUP BY ec.id ORDER BY ec.ts DESC
					  LIMIT ?, ?`

		nestedQuery = `(SELECT * FROM exp_change WHERE id=?)`
	)

	// Adjust the query based on whether we are interested in finding a specific item.
	var stmtList string
	listArgs := []interface{}{offset, size}
	if changeID > 0 {
		stmtList = fmt.Sprintf(stmtListTmpl, nestedQuery)
		listArgs = append([]interface{}{changeID}, listArgs...)

	} else {
		stmtList = fmt.Sprintf(stmtListTmpl, "exp_change")
	}

	// Get the total number of records.
	row := s.vdb.DB.QueryRow(stmtTotal)
	var total int
	if err := row.Scan(&total); err != nil {
		return nil, 0, err
	}

	if total == 0 {
		return []*TriageLogEntry{}, 0, nil
	}

	// Fetch the records we are interested in.
	rows, err := s.vdb.DB.Query(stmtList, listArgs...)
	if err != nil {
		return nil, 0, err
	}
	defer util.Close(rows)

	var ids []interface{}
	var placeHolders []string

	if details {
		ids = make([]interface{}, 0, size)
		placeHolders = make([]string, 0, size)
	}

	idToIdxMap := map[string]int{}
	result := make([]*TriageLogEntry, 0, size)
	for rows.Next() {
		entry := &TriageLogEntry{}
		if err = rows.Scan(&entry.ID, &entry.Name, &entry.TS, &entry.ChangeCount, &entry.UndoChangeID); err != nil {
			return nil, 0, err
		}

		result = append(result, entry)
		if details {
			idToIdxMap[entry.ID] = len(result) - 1
			ids = append(ids, entry.ID)
			placeHolders = append(placeHolders, "?")
		}
	}

	if details && len(result) > 0 {
		stmt := fmt.Sprintf(stmtDetails, strings.Join(placeHolders, ","))
		rows, err := s.vdb.DB.Query(stmt, ids...)
		if err != nil {
			return nil, 0, err
		}

		var recID int64
		for rows.Next() {
			detail := &TriageDetail{}
			if err = rows.Scan(&recID, &detail.TestName, &detail.Digest, &detail.Label); err != nil {
				return nil, 0, err
			}

			idx := idToIdxMap[strconv.FormatInt(recID, 10)]
			if result[idx].Details == nil {
				result[idx].Details = make([]*TriageDetail, 0, result[idx].ChangeCount)
			}
			result[idx].Details = append(result[idx].Details, detail)
		}
	}

	return result, total, nil
}

// See  ExpectationsStore interface.
func (s *SQLExpectationsStore) UndoChange(changeID int64, userID string) (types.TestExp, error) {
	changeInfo, err := s.loadChangeEntry(changeID)
	if err != nil {
		return nil, err
	}

	// TODO(stephana): Enable undo and redo for undos.There is a small different
	// between a redo and an undo in that the undo restores the state before the
	// undo, while the redo replays the changes that were added in the original
	// change.

	// Refuse to undo a change that is the result of on undo.
	if changeInfo.UndoChangeID != 0 {
		return nil, fmt.Errorf("Unable to undo change %d which was created as an undo of change %d.", changeID, changeInfo.UndoChangeID)
	}

	// Get the expectations of tests of interest at that time.
	changes, err := s.getExpectationsAt(changeInfo)
	if err != nil {
		return nil, err
	}

	return changes, s.AddChangeWithTimeStamp(changes, userID, changeID, util.TimeStampMs())
}

// Clear implements the ExpectationsStore interface.
func (s *SQLExpectationsStore) Clear() error {
	const stmt_ec = `DELETE FROM exp_change`
	const stmt_etc = `DELETE FROM exp_test_change`

	var egroup errgroup.Group
	egroup.Go(func() error {
		_, err := s.vdb.DB.Exec(stmt_ec)
		return err
	})

	egroup.Go(func() error {
		_, err := s.vdb.DB.Exec(stmt_etc)
		return err
	})

	return egroup.Wait()
}

// Loads a single change entry with all details from the DB.
func (s *SQLExpectationsStore) loadChangeEntry(changeID int64) (*TriageLogEntry, error) {
	changeInfo, _, err := s.queryChanges(0, 5, changeID, true)
	if err != nil {
		return nil, fmt.Errorf("Unable to retrieve triage information: %s", err)
	}

	if len(changeInfo) != 1 {
		return nil, fmt.Errorf("Triage information for change id %d should only be one record.", changeID)
	}

	return changeInfo[0], nil
}

// Wraps around an ExpectationsStore and caches the expectations using
// MemExpecationsStore.
type CachingExpectationStore struct {
	store    ExpectationsStore
	cache    ExpectationsStore
	eventBus eventbus.EventBus
	refresh  bool
}

func NewCachingExpectationStore(store ExpectationsStore, eventBus eventbus.EventBus) ExpectationsStore {
	ret := &CachingExpectationStore{
		store:    store,
		cache:    NewMemExpectationsStore(nil),
		eventBus: eventBus,
		refresh:  true,
	}

	// Prime the cache upon creation.
	// We ignore any error returned here for a simplified function signature and
	// also because any ExpectationsStore implementation (i.e. CloudExpectationsStore)
	// will do some connection checks before being passed to this instance.
	_, _ = ret.Get()

	// Register the events to update the cache.
	ret.eventBus.SubscribeAsync(EV_EXPSTORAGE_CHANGED, ret.addChangeToCache)

	return ret
}

// See ExpectationsStore interface.
func (c *CachingExpectationStore) Get() (exp types.Expectations, err error) {
	if c.refresh {
		c.refresh = false
		tempExp, err := c.store.Get()
		if err != nil {
			return nil, err
		}

		if err = c.cache.AddChange(tempExp.TestExp().DeepCopy(), ""); err != nil {
			return nil, err
		}
	}
	return c.cache.Get()
}

// See ExpectationsStore interface.
func (c *CachingExpectationStore) AddChange(changedTests types.TestExp, userId string) error {
	if err := c.store.AddChange(changedTests, userId); err != nil {
		return err
	}
	// Fire an event that will trigger the addition to the cache and wait for it to complete.
	// This is necessary because events that change the cache could also come from the distributed
	// eventbus.
	waitCh := make(chan bool)
	c.eventBus.Publish(EV_EXPSTORAGE_CHANGED, evExpChange(changedTests, masterIssueID, waitCh), true)
	<-waitCh
	return nil
}

// addChangeToCache updates the cache and fires the change event.
func (c *CachingExpectationStore) addChangeToCache(evtChangedTests interface{}) {
	evtData := evtChangedTests.(*EventExpectationChange)
	changedTests := evtData.TestChanges

	// Split the changes into removal and addition.
	forRemoval := make(types.TestExp, len(changedTests))
	forAddition := make(types.TestExp, len(changedTests))
	for test, digests := range changedTests {
		for digest, label := range digests {
			if label == types.UNTRIAGED {
				if foundTest, ok := forRemoval[test]; ok {
					foundTest[digest] = label
				} else {
					forRemoval[test] = types.TestClassification{digest: label}
				}
			} else {
				if foundTest, ok := forAddition[test]; ok {
					foundTest[digest] = label
				} else {
					forAddition[test] = types.TestClassification{digest: label}
				}
			}
		}
	}

	if len(forRemoval) > 0 {
		if err := c.cache.removeChange(forRemoval); err != nil {
			sklog.Errorf("Error removing changed expectations to cache: %s", err)
		}
	}

	if len(forAddition) > 0 {
		if err := c.cache.AddChange(forAddition, ""); err != nil {
			sklog.Errorf("Error adding changed expectations to cache: %s", err)
		}
	}
	if evtData.waitCh != nil {
		evtData.waitCh <- true
	}
	sklog.Infof("Expectations change has been added to the cache.")
}

// removeChange implements the ExpectationsStore interface.
func (c *CachingExpectationStore) removeChange(changedDigests types.TestExp) error {
	if err := c.store.removeChange(changedDigests); err != nil {
		return err
	}

	// Fire an event that will trigger the addition to the cache.
	waitCh := make(chan bool)
	c.eventBus.Publish(EV_EXPSTORAGE_CHANGED, evExpChange(changedDigests, masterIssueID, waitCh), true)
	<-waitCh
	return nil
}

// See ExpectationsStore interface.
func (c *CachingExpectationStore) QueryLog(offset, size int, details bool) ([]*TriageLogEntry, int, error) {
	return c.store.QueryLog(offset, size, details)
}

// See  ExpectationsStore interface.
func (c *CachingExpectationStore) UndoChange(changeID int64, userID string) (types.TestExp, error) {
	changedTests, err := c.store.UndoChange(changeID, userID)
	if err != nil {
		return nil, err
	}

	// Fire an event that will trigger the addition to the cache.
	waitCh := make(chan bool)
	c.eventBus.Publish(EV_EXPSTORAGE_CHANGED, evExpChange(changedTests, masterIssueID, waitCh), true)
	<-waitCh
	return changedTests, nil
}

// Clear implements the ExpectationsStore interface.
func (c *CachingExpectationStore) Clear() error {
	if err := c.store.Clear(); err != nil {
		return err
	}
	return c.cache.Clear()
}
