package sqlregression2store

import (
	"bytes"
	"context"
	"fmt"
	"strings"
	"text/template"
	"time"

	"github.com/google/uuid"
	"github.com/jackc/pgx/v4"
	"go.skia.org/infra/go/metrics2"
	"go.skia.org/infra/go/skerr"
	"go.skia.org/infra/go/sklog"
	"go.skia.org/infra/go/sql/pool"
	"go.skia.org/infra/go/sql/sqlutil"
	"go.skia.org/infra/go/vec32"
	"go.skia.org/infra/perf/go/alerts"
	"go.skia.org/infra/perf/go/clustering2"
	"go.skia.org/infra/perf/go/regression"
	"go.skia.org/infra/perf/go/sql"
	"go.skia.org/infra/perf/go/stepfit"
	"go.skia.org/infra/perf/go/types"
	"go.skia.org/infra/perf/go/ui/frame"
)

// SQLRegressionStore implements the regression.Store interface.
type SQLRegression2Store struct {
	// db is the underlying database.
	db                         pool.Pool
	statements                 map[statementFormat]string
	alertConfigProvider        alerts.ConfigProvider
	regressionFoundCounterLow  metrics2.Counter
	regressionFoundCounterHigh metrics2.Counter
}

// statementFormat is an SQL statementFormat identifier.
type statementFormat int

const (
	// The identifiers for all the SQL statements used.
	write statementFormat = iota
	readCompat
	readOldest
	readRange
	readByIDs
	readBySubName
	deleteByCommit
)

// statementContext provides a struct to expand sql statement templates.
type statementContext struct {
	Columns            string
	ValuesPlaceholders string
}

// statementFormats holds all the raw SQL statement templates.
var statementFormats = map[statementFormat]string{
	// readCompat is the query to read the data similar to regressions table
	// (using commit and alert ids). This allows us to keep it compatible until
	// we are fully migrated to the new schema.
	readCompat: `
		SELECT
			{{ .Columns }}
		FROM
			Regressions2
		WHERE
			commit_number=$1 AND alert_id=$2
		`,
	readOldest: `
		SELECT
			commit_number
		FROM
			Regressions2
		ORDER BY
			commit_number ASC
		LIMIT 1
		`,
	readRange: `
		SELECT
			{{ .Columns }}
		FROM
			Regressions2
		WHERE
			commit_number >= $1
			AND commit_number <= $2
		`,
	write: `
		INSERT INTO
			Regressions2 ({{ .Columns }})
		VALUES
			{{ .ValuesPlaceholders }}
		ON CONFLICT (id) DO UPDATE
        SET median_before=EXCLUDED.median_before, median_after=EXCLUDED.median_after, frame=EXCLUDED.frame,
		triage_status=EXCLUDED.triage_status, triage_message=EXCLUDED.triage_message, alert_id=EXCLUDED.alert_id,
		cluster_summary=EXCLUDED.cluster_summary, cluster_type=EXCLUDED.cluster_type,
		commit_number=EXCLUDED.commit_number, creation_time=EXCLUDED.creation_time, id=EXCLUDED.id,
		is_improvement=EXCLUDED.is_improvement, prev_commit_number=EXCLUDED.prev_commit_number
		`,
	readByIDs: `
		SELECT
			{{ .Columns }}
		FROM
			Regressions2
		WHERE
			id IN (%s)
		`,
	readBySubName: `
		SELECT
			r.id, commit_number, prev_commit_number, alert_id, creation_time, median_before, median_after, is_improvement, cluster_type, cluster_summary, frame, triage_status, triage_message
		FROM
			Regressions2 r
		INNER JOIN
			Alerts a ON r.alert_id=a.id
		WHERE
			a.sub_name = $1
		LIMIT
			$2
		OFFSET
			$3
		`,
	deleteByCommit: `
		DELETE
		FROM
			Regressions2
		WHERE
			commit_number=$1
		`,
}

// New returns a new instance of SQLRegression2Store
func New(db pool.Pool, alertConfigProvider alerts.ConfigProvider) (*SQLRegression2Store, error) {
	templates := map[statementFormat]string{}
	context := statementContext{
		Columns:            strings.Join(sql.Regressions2, ","),
		ValuesPlaceholders: sqlutil.ValuesPlaceholders(len(sql.Regressions2), 1),
	}
	for key, tmpl := range statementFormats {
		t, err := template.New("").Parse(tmpl)
		if err != nil {
			return nil, skerr.Wrapf(err, "Error parsing template %v, %q", key, tmpl)
		}
		// Expand the template for the SQL.
		var b bytes.Buffer
		if err := t.Execute(&b, context); err != nil {
			return nil, skerr.Wrapf(err, "Failed to execute template %v", key)
		}
		templates[key] = b.String()
	}
	return &SQLRegression2Store{
		db:                         db,
		statements:                 templates,
		alertConfigProvider:        alertConfigProvider,
		regressionFoundCounterLow:  metrics2.GetCounter("perf_regression2_store_found", map[string]string{"direction": "low"}),
		regressionFoundCounterHigh: metrics2.GetCounter("perf_regression2_store_found", map[string]string{"direction": "high"}),
	}, nil
}

// Range implements the regression.Store interface.
func (s *SQLRegression2Store) Range(ctx context.Context, begin, end types.CommitNumber) (map[types.CommitNumber]*regression.AllRegressionsForCommit, error) {
	ret := map[types.CommitNumber]*regression.AllRegressionsForCommit{}
	rows, err := s.db.Query(ctx, s.statements[readRange], begin, end)
	if err != nil {
		return nil, skerr.Wrapf(err, "Failed to read regressions in range: %d %d", begin, end)
	}
	for rows.Next() {
		r, err := convertRowToRegression(rows)
		if err != nil {
			return nil, err
		}
		allForCommit, ok := ret[r.CommitNumber]
		if !ok {
			allForCommit = regression.New()
		}
		alertIDString := alerts.IDToString(r.AlertId)
		existingRegressionForAlertId := allForCommit.ByAlertID[alertIDString]
		// If there are existing regressions for the commit-alert tuple,
		// merge it into the same regression object for backward compatibility.
		if existingRegressionForAlertId != nil {
			if existingRegressionForAlertId.High != nil && r.Low != nil {
				existingRegressionForAlertId.Low = r.Low
				existingRegressionForAlertId.LowStatus = r.LowStatus
			} else {
				existingRegressionForAlertId.High = r.High
				existingRegressionForAlertId.HighStatus = r.HighStatus
			}
		} else {
			allForCommit.ByAlertID[alertIDString] = r
		}
		ret[r.CommitNumber] = allForCommit
	}
	return ret, nil
}

// SetHigh implements the regression.Store interface.
func (s *SQLRegression2Store) SetHigh(ctx context.Context, commitNumber types.CommitNumber, alertID string, df *frame.FrameResponse, high *clustering2.ClusterSummary) (bool, string, error) {
	ret := false
	regressionID, err := s.updateBasedOnAlertAlgo(ctx, commitNumber, alertID, df, false, func(r *regression.Regression) {
		if r.Frame == nil {
			r.Frame = df
			ret = true
		}
		r.High = high
		if r.HighStatus.Status == regression.None {
			r.HighStatus.Status = regression.Untriaged
		}
		populateRegression2Fields(r)
	})
	s.regressionFoundCounterHigh.Inc(1)
	return ret, regressionID, err
}

// SetLow implements the regression.Store interface.
func (s *SQLRegression2Store) SetLow(ctx context.Context, commitNumber types.CommitNumber, alertID string, df *frame.FrameResponse, low *clustering2.ClusterSummary) (bool, string, error) {
	ret := false
	regressionID, err := s.updateBasedOnAlertAlgo(ctx, commitNumber, alertID, df, false /* mustExist*/, func(r *regression.Regression) {
		if r.Frame == nil {
			r.Frame = df
			ret = true
		}
		r.Low = low
		if r.LowStatus.Status == regression.None {
			r.LowStatus.Status = regression.Untriaged
		}
		populateRegression2Fields(r)
	})
	s.regressionFoundCounterLow.Inc(1)
	return ret, regressionID, err
}

// TriageLow implements the regression.Store interface.
func (s *SQLRegression2Store) TriageLow(ctx context.Context, commitNumber types.CommitNumber, alertID string, tr regression.TriageStatus) error {
	// TODO(ashwinpv): This code will update all regressions with the <commit_id, alert_id> pair.
	// Once we move all the data to the new db, this will need to be updated to take in a specific
	// regression id and update only that.
	_, err := s.readModifyWriteCompat(ctx, commitNumber, alertID, true, func(r *regression.Regression) bool {
		r.LowStatus = tr
		return true
	})
	return err
}

// TriageHigh implements the regression.Store interface.
func (s *SQLRegression2Store) TriageHigh(ctx context.Context, commitNumber types.CommitNumber, alertID string, tr regression.TriageStatus) error {
	// TODO(ashwinpv): This code will update all regressions with the <commit_id, alert_id> pair.
	// Once we move all the data to the new db, this will need to be updated to take in a specific
	// regression id and update only that.
	_, err := s.readModifyWriteCompat(ctx, commitNumber, alertID, true, func(r *regression.Regression) bool {
		r.HighStatus = tr
		return true
	})
	return err
}

// No Op for SQLRegression2Store.
func (s *SQLRegression2Store) GetRegression(ctx context.Context, commitNumber types.CommitNumber, alertID string) (*regression.Regression, error) {
	return nil, nil
}

// GetOldestCommit implements regression.Store interface
func (s *SQLRegression2Store) GetOldestCommit(ctx context.Context) (*types.CommitNumber, error) {
	var num int
	if err := s.db.QueryRow(ctx, statementFormats[readOldest]).Scan(&num); err != nil {
		return nil, skerr.Wrapf(err, "Failed to fetch oldest commit.")
	}
	commitNumber := types.CommitNumber(num)
	return &commitNumber, nil
}

// Write implements the regression.Store interface.
func (s *SQLRegression2Store) Write(ctx context.Context, regressions map[types.CommitNumber]*regression.AllRegressionsForCommit) error {
	for _, allRegressionsForCommit := range regressions {
		for _, reg := range allRegressionsForCommit.ByAlertID {
			if err := s.writeSingleRegression(ctx, reg, nil); err != nil {
				return err
			}
		}
	}
	return nil
}

// Given the subscription name GetRegressionsBySubName gets all the regressions against
// the specified subscription. The response will be paginated according to the provided
// limit and offset.
func (s *SQLRegression2Store) GetRegressionsBySubName(ctx context.Context, sub_name string, limit int, offset int) ([]*regression.Regression, error) {
	statement := s.statements[readBySubName]
	rows, err := s.db.Query(ctx, statement, sub_name, limit, offset)
	if err != nil {
		return nil, skerr.Wrapf(err, "failed to get regressions. Query: %s", statement)
	}

	regressions := []*regression.Regression{}
	for rows.Next() {
		r, err := convertRowToRegression(rows)
		if err != nil {
			return nil, skerr.Wrapf(err, "failed to convert row to regression.")
		}
		regressions = append(regressions, r)
	}

	return regressions, nil
}

// Get a list of regressions given a list of regression ids.
func (s *SQLRegression2Store) GetByIDs(ctx context.Context, ids []string) ([]*regression.Regression, error) {
	statement := s.statements[readByIDs]
	query := fmt.Sprintf(statement, quotedSlice(ids))
	rows, err := s.db.Query(ctx, query)
	if err != nil {
		return nil, skerr.Wrapf(err, "failed to get regressions by id list. Query: %s", query)
	}

	var regressions []*regression.Regression
	for rows.Next() {
		r, err := convertRowToRegression(rows)
		if err != nil {
			return nil, skerr.Wrapf(err, "failed to convert row to regression.")
		}
		regressions = append(regressions, r)
	}

	return regressions, nil
}

// convertRowToRegression converts the content of the row retrieved from the database
// into a regression object. This will return an error if either there is no data
// in the row, or if the data is invalid (eg: failed data conversion).
func convertRowToRegression(rows pgx.Row) (*regression.Regression, error) {
	r := regression.NewRegression()

	// Once we are fully migrated to regression2 schema, the variables below
	// will be directly read into the regression.Regression object.
	var clusterType regression.ClusterType
	var clusterSummary clustering2.ClusterSummary
	var triageStatus string
	var triageMessage string
	err := rows.Scan(&r.Id, &r.CommitNumber, &r.PrevCommitNumber, &r.AlertId, &r.CreationTime, &r.MedianBefore, &r.MedianAfter, &r.IsImprovement, &clusterType, &clusterSummary, &r.Frame, &triageStatus, &triageMessage)
	if err != nil {
		return nil, err
	}

	r.ClusterType = string(clusterType)
	switch clusterType {
	case regression.HighClusterType:
		r.High = &clusterSummary
		r.HighStatus = regression.TriageStatus{
			Status:  regression.Status(triageStatus),
			Message: triageMessage,
		}
	case regression.LowClusterType:
		r.Low = &clusterSummary
		r.LowStatus = regression.TriageStatus{
			Status:  regression.Status(triageStatus),
			Message: triageMessage,
		}
	default:
		// Do nothing
	}

	return r, nil
}

// writeSingleRegression writes the regression.Regression object to the database.
// If the tx is specified, the write occurs within the transaction.
func (s *SQLRegression2Store) writeSingleRegression(ctx context.Context, r *regression.Regression, tx pgx.Tx) error {
	clusterType, clusterSummary, triage := r.GetClusterTypeAndSummaryAndTriageStatus()
	r.CreationTime = time.Now()
	var err error
	if tx == nil {
		_, err = s.db.Exec(ctx, s.statements[write], r.Id, r.CommitNumber, r.PrevCommitNumber, r.AlertId, r.CreationTime, r.MedianBefore, r.MedianAfter, r.IsImprovement, clusterType, clusterSummary, r.Frame, triage.Status, triage.Message)
	} else {
		_, err = tx.Exec(ctx, s.statements[write], r.Id, r.CommitNumber, r.PrevCommitNumber, r.AlertId, r.CreationTime, r.MedianBefore, r.MedianAfter, r.IsImprovement, clusterType, clusterSummary, r.Frame, triage.Status, triage.Message)
	}
	if err != nil {
		return skerr.Wrapf(err, "Failed to write single regression with id %s", r.Id)
	}
	return nil
}

// updateBasedOnAlertAlgo updates the regression based on the Algo specified in the
// alert config. This is to handle the difference in creating/updating regressions in
// KMeans v/s Individual mode.
// TODO(ashwinpv): Once we are fully on to the regression2 schema, move this logic out
// of the Store (since ideally store should only care about reading and writing data instead
// of the feature semantics)
func (s *SQLRegression2Store) updateBasedOnAlertAlgo(ctx context.Context, commitNumber types.CommitNumber, alertID string, df *frame.FrameResponse, mustExist bool, updateFunc func(r *regression.Regression)) (string, error) {
	// If KMeans the expectation is that as we get more incoming data,
	// the regression becomes more accurate. This means we need to check
	// if there is a regression for the same <commit_id, alert_id> pair
	// and update it.
	var regressionID string
	var err error
	alertConfig, err := s.alertConfigProvider.GetAlertConfig(alerts.IDAsStringToInt(alertID))
	if err != nil {
		return "", err
	}
	if alertConfig.Algo == types.KMeansGrouping {
		regressionID, err = s.readModifyWriteCompat(ctx, commitNumber, alertID, mustExist /* mustExist*/, func(r *regression.Regression) bool {
			updateFunc(r)
			return true
		})
	} else {
		regressionID, err = s.readModifyWriteCompat(ctx, commitNumber, alertID, mustExist /* mustExist*/, func(r *regression.Regression) bool {
			if r.Frame != nil {
				// Do not update existing regressions when the algo is stepfit.
				return false
			}

			// At this point r is a newly created regression. This should get updated in the database.
			updateFunc(r)
			return true
		})
	}

	if err != nil {
		sklog.Errorf("Error while updating database %s", err)
		return "", err
	}

	return regressionID, nil
}

// readModifyWriteCompat reads the Regression at the given commitNumber and alert id
// and then calls the given callback, giving the caller a chance to modify the
// struct, before writing it back to the database.
//
// If mustExist is true then the read must be successful, otherwise a new
// default Regression will be used and stored back to the database after the
// callback is called.
func (s *SQLRegression2Store) readModifyWriteCompat(ctx context.Context, commitNumber types.CommitNumber, alertIDString string, mustExist bool, cb func(r *regression.Regression) bool) (string, error) {
	alertID := alerts.IDAsStringToInt(alertIDString)
	if alertID == alerts.BadAlertID {
		return "", skerr.Fmt("Failed to convert alertIDString %q to an int.", alertIDString)
	}

	// Do everything in a transaction so we don't have any lost updates.
	tx, err := s.db.Begin(ctx)
	if err != nil {
		return "", skerr.Wrapf(err, "Can't start transaction")
	}

	var r *regression.Regression

	rows, err := tx.Query(ctx, s.statements[readCompat], commitNumber, alertID)
	if err != nil {
		rollbackTransaction(ctx, tx)
		return "", err
	}

	regressionsToWrite := []*regression.Regression{}
	existingRows := false
	for rows.Next() {
		existingRows = true
		r, err = convertRowToRegression(rows)
		if err != nil {
			if mustExist {
				var errorMsg string
				if err == pgx.ErrNoRows {
					errorMsg = "Regression does not exist"
				} else {
					errorMsg = "Failed reading regression data."
				}
				rollbackTransaction(ctx, tx)
				return "", skerr.Wrapf(err, "%s", errorMsg)
			}
		}

		shouldUpdate := cb(r)

		if shouldUpdate {
			regressionsToWrite = append(regressionsToWrite, r)
		}
	}

	if !existingRows {
		r = regression.NewRegression()
		r.AlertId = alertID
		r.CommitNumber = commitNumber
		r.CreationTime = time.Now().UTC()
		shouldUpdate := cb(r)

		if shouldUpdate {
			regressionsToWrite = append(regressionsToWrite, r)
		}
	}

	// Update all the regressions marked for writing into the db.
	for _, reg := range regressionsToWrite {
		if err = s.writeSingleRegression(ctx, reg, tx); err != nil {
			rollbackTransaction(ctx, tx)
			return "", skerr.Wrapf(err, "Failed to write regression for alertID: %d  commitNumber=%d", alertID, commitNumber)
		}
	}
	return r.Id, tx.Commit(ctx)
}

func rollbackTransaction(ctx context.Context, tx pgx.Tx) {
	if err := tx.Rollback(ctx); err != nil {
		sklog.Errorf("Failed on rollback: %s", err)
	}
}

// WriteRegression writes a single regression object into the table and returns the Id of the written row.
func (s *SQLRegression2Store) WriteRegression(ctx context.Context, regression *regression.Regression, tx pgx.Tx) (string, error) {
	// If the regression has both high and low specified, we need to create two separate regression
	// entries into the regression2 table.
	if regression.High != nil && regression.Low != nil {
		highRegression := *regression
		lowRegression := *regression
		highRegression.Low = nil
		lowRegression.High = nil
		populateRegression2Fields(&highRegression)
		err := s.writeSingleRegression(ctx, &highRegression, tx)
		if err == nil {
			populateRegression2Fields(&lowRegression)
			err = s.writeSingleRegression(ctx, &lowRegression, tx)
		}
		return highRegression.Id, err
	} else {
		populateRegression2Fields(regression)
		err := s.writeSingleRegression(ctx, regression, tx)
		return regression.Id, err
	}
}

// populateRegression2Fields populates the fields in the regression object
// which are specific to the regression2 schema.
func populateRegression2Fields(regression *regression.Regression) {
	if regression.Id == "" {
		regression.Id = uuid.NewString()
	}

	_, clusterSummary, _ := regression.GetClusterTypeAndSummaryAndTriageStatus()

	regression.CreationTime = clusterSummary.Timestamp

	// Find the index of the commit where the regression was detected.
	regressionPointIndex := clusterSummary.StepFit.TurningPoint

	prevCommitNumber := regression.Frame.DataFrame.Header[regressionPointIndex-1].Offset
	regression.PrevCommitNumber = prevCommitNumber

	medianBefore, _, _, _ := vec32.TwoSidedStdDev(clusterSummary.Centroid[:regressionPointIndex])
	regression.MedianBefore = medianBefore
	medianAfter, _, _, _ := vec32.TwoSidedStdDev(clusterSummary.Centroid[regressionPointIndex:])
	regression.MedianAfter = medianAfter

	regression.IsImprovement = isRegressionImprovement(regression.Frame.DataFrame.ParamSet, clusterSummary.StepFit.Status)
}

// isRegressionImprovement returns true if the metric has moved towards the improvement direction.
func isRegressionImprovement(paramset map[string][]string, stepFitStatus stepfit.StepFitStatus) bool {
	if _, ok := paramset["improvement_direction"]; ok {
		improvementDirection := paramset["improvement_direction"]
		return improvementDirection[0] == "down" && stepFitStatus == stepfit.LOW || improvementDirection[0] == "up" && stepFitStatus == stepfit.HIGH
	}

	return false
}

// DeleteByCommit implements the regression.Store interface. Deletes a regression via commit number.
func (s *SQLRegression2Store) DeleteByCommit(ctx context.Context, num types.CommitNumber, tx pgx.Tx) error {
	var err error
	if tx == nil {
		_, err = s.db.Exec(ctx, statementFormats[deleteByCommit], num)
	} else {
		_, err = tx.Exec(ctx, statementFormats[deleteByCommit], num)
	}

	return err
}

// Confirm that SQLRegressionStore implements regression.Store.
var _ regression.Store = (*SQLRegression2Store)(nil)

// Takes a string array as input, and returns a comma joined string where each element
// is single quoted.
func quotedSlice(a []string) string {
	q := make([]string, len(a))
	for i, s := range a {
		q[i] = fmt.Sprintf("'%s'", s)
	}
	return strings.Join(q, ", ")
}
