// Package sqlregressionstore implements the regression.Store interface on an
// SQL database backend.
//
// To see the schema of the database used, see perf/sql/migrations.
package sqlregressionstore

import (
	"context"
	"encoding/json"

	"github.com/jackc/pgx/v4"
	"go.skia.org/infra/go/metrics2"
	"go.skia.org/infra/go/skerr"
	"go.skia.org/infra/go/sklog"
	"go.skia.org/infra/go/sql/pool"
	"go.skia.org/infra/perf/go/alerts"
	"go.skia.org/infra/perf/go/clustering2"
	"go.skia.org/infra/perf/go/regression"
	"go.skia.org/infra/perf/go/types"
	"go.skia.org/infra/perf/go/ui/frame"
)

// statement is an SQL statement identifier. Values of this type are the keys
// of the statements map, which holds the corresponding raw SQL text.
type statement int

const (
	// The identifiers for all the SQL statements used.

	// write stores a single regression for a commit number and alert id.
	write statement = iota
	// read loads the regression stored for a commit number and alert id.
	read
	// readRange loads all regressions within a range of commit numbers.
	readRange
	// batchReadMigration loads a limited batch of not-yet-migrated rows.
	batchReadMigration
	// markMigrated flags a row as migrated and records its regression id.
	markMigrated
)

// statements holds the raw SQL text for every statement identifier used by
// this package.
var statements = map[statement]string{
	// Insert the row, or overwrite it if it already exists. The migrated
	// column is always written as false.
	// Params: $1 commit_number, $2 alert_id, $3 regression (serialized JSON).
	write: `
		UPSERT INTO
			Regressions (commit_number, alert_id, regression, migrated)
		VALUES
			($1, $2, $3, false)
		`,
	// Select the serialized regression for one commit number and alert id.
	// Params: $1 commit_number, $2 alert_id.
	read: `
		SELECT
			regression
		FROM
			Regressions
		WHERE
			commit_number=$1 AND
			alert_id=$2`,
	// Select every row whose commit number is in the inclusive range
	// [$1, $2].
	readRange: `
		SELECT
			commit_number, alert_id, regression
		FROM
			Regressions
		WHERE
			commit_number >= $1
			AND commit_number <= $2
		`,
	// Select at most $1 rows that have not been marked as migrated, i.e.
	// where migrated is NULL or false.
	batchReadMigration: `
		SELECT
			commit_number, alert_id, regression, regression_id
		FROM
			Regressions
		WHERE
			migrated is NULL OR migrated=false
		LIMIT $1
		`,
	// Mark one row as migrated and store its regression id.
	// Params: $1 regression_id, $2 commit_number, $3 alert_id.
	markMigrated: `
		UPDATE
			Regressions
		SET
			migrated=true, regression_id=$1
		WHERE
			commit_number=$2 AND alert_id=$3
		`,
}

// SQLRegressionStore implements the regression.Store interface.
type SQLRegressionStore struct {
	// db is the underlying database.
	db                         pool.Pool
	// regressionFoundCounterLow is incremented on every call to SetLow.
	regressionFoundCounterLow  metrics2.Counter
	// regressionFoundCounterHigh is incremented on every call to SetHigh.
	regressionFoundCounterHigh metrics2.Counter
}

// New returns a new *SQLRegressionStore backed by db.
//
// All migrations are presumed to have been run against db before this function
// is called. The returned error is always nil.
func New(db pool.Pool) (*SQLRegressionStore, error) {
	// Both counters share one metric name and differ only in the "direction"
	// tag.
	const metricName = "perf_regression_store_found"

	store := &SQLRegressionStore{db: db}
	store.regressionFoundCounterLow = metrics2.GetCounter(metricName, map[string]string{"direction": "low"})
	store.regressionFoundCounterHigh = metrics2.GetCounter(metricName, map[string]string{"direction": "high"})
	return store, nil
}

// GetRegressionsBySubName is not implemented by this store: it ignores all of
// its arguments and always returns a nil slice and a nil error.
//
// Unimplemented: This function is implemented by regression2 store
func (s *SQLRegressionStore) GetRegressionsBySubName(ctx context.Context, sub_name string, limit int, offset int) ([]*regression.Regression, error) {
	return nil, nil
}

// Range implements the regression.Store interface.
//
// It returns every stored regression whose commit number lies in the inclusive
// range [begin, end], grouped by commit number and, within each commit, keyed
// by the alert id converted to a string. An empty, non-nil map is returned if
// no rows match. Any query, scan, decode or row-iteration failure is returned
// as an error together with a nil map.
func (s *SQLRegressionStore) Range(ctx context.Context, begin, end types.CommitNumber) (map[types.CommitNumber]*regression.AllRegressionsForCommit, error) {
	ret := map[types.CommitNumber]*regression.AllRegressionsForCommit{}
	rows, err := s.db.Query(ctx, statements[readRange], begin, end)
	if err != nil {
		return nil, skerr.Wrapf(err, "Failed to read regressions in range: %d %d", begin, end)
	}
	// Make sure the rows (and the connection they hold) are released on every
	// exit path, including the early error returns inside the loop.
	defer rows.Close()

	for rows.Next() {
		var commitID types.CommitNumber
		var alertID int64
		var jsonRegression string
		if err := rows.Scan(&commitID, &alertID, &jsonRegression); err != nil {
			return nil, skerr.Wrapf(err, "Failed to read single regression in range: %d %d", begin, end)
		}
		var r regression.Regression
		if err := json.Unmarshal([]byte(jsonRegression), &r); err != nil {
			return nil, skerr.Wrapf(err, "Failed to decode a single regression in range: %d %d", begin, end)
		}
		allForCommit, ok := ret[commitID]
		if !ok {
			allForCommit = regression.New()
			ret[commitID] = allForCommit
		}
		allForCommit.ByAlertID[alerts.IDToString(alertID)] = &r
	}
	// rows.Next() returns false both at the end of the result set and on
	// failure, so the iteration error must be checked explicitly to avoid
	// returning a silently truncated result.
	if err := rows.Err(); err != nil {
		return nil, skerr.Wrapf(err, "Failed while iterating over regressions in range: %d %d", begin, end)
	}
	return ret, nil
}

// SetHigh implements the regression.Store interface.
//
// It stores high as the High cluster summary of the regression at
// commitNumber and alertID, creating the regression if it cannot be read. A
// HighStatus of regression.None is promoted to regression.Untriaged. The
// returned bool is true if the regression had no Frame and df was stored as
// its Frame. The "high" found-counter is incremented regardless of whether the
// update succeeded.
func (s *SQLRegressionStore) SetHigh(ctx context.Context, commitNumber types.CommitNumber, alertID string, df *frame.FrameResponse, high *clustering2.ClusterSummary) (bool, error) {
	frameWasSet := false
	apply := func(r *regression.Regression) {
		if r.Frame == nil {
			r.Frame = df
			frameWasSet = true
		}
		r.High = high
		if r.HighStatus.Status == regression.None {
			r.HighStatus.Status = regression.Untriaged
		}
	}

	// mustExist is false, so a missing row starts from a default Regression.
	err := s.readModifyWrite(ctx, commitNumber, alertID, false, apply)
	s.regressionFoundCounterHigh.Inc(1)
	return frameWasSet, err
}

// SetLow implements the regression.Store interface.
//
// It stores low as the Low cluster summary of the regression at commitNumber
// and alertID, creating the regression if it cannot be read. A LowStatus of
// regression.None is promoted to regression.Untriaged. The returned bool is
// true if the regression had no Frame and df was stored as its Frame. The
// "low" found-counter is incremented regardless of whether the update
// succeeded.
func (s *SQLRegressionStore) SetLow(ctx context.Context, commitNumber types.CommitNumber, alertID string, df *frame.FrameResponse, low *clustering2.ClusterSummary) (bool, error) {
	frameWasSet := false
	apply := func(r *regression.Regression) {
		if r.Frame == nil {
			r.Frame = df
			frameWasSet = true
		}
		r.Low = low
		if r.LowStatus.Status == regression.None {
			r.LowStatus.Status = regression.Untriaged
		}
	}

	// mustExist is false, so a missing row starts from a default Regression.
	err := s.readModifyWrite(ctx, commitNumber, alertID, false, apply)
	s.regressionFoundCounterLow.Inc(1)
	return frameWasSet, err
}

// TriageLow implements the regression.Store interface.
//
// It replaces the LowStatus of the regression stored at commitNumber and
// alertID with tr. The regression must already exist; otherwise an error is
// returned and nothing is written.
func (s *SQLRegressionStore) TriageLow(ctx context.Context, commitNumber types.CommitNumber, alertID string, tr regression.TriageStatus) error {
	setStatus := func(r *regression.Regression) {
		r.LowStatus = tr
	}
	const mustExist = true
	return s.readModifyWrite(ctx, commitNumber, alertID, mustExist, setStatus)
}

// TriageHigh implements the regression.Store interface.
//
// It replaces the HighStatus of the regression stored at commitNumber and
// alertID with tr. The regression must already exist; otherwise an error is
// returned and nothing is written.
func (s *SQLRegressionStore) TriageHigh(ctx context.Context, commitNumber types.CommitNumber, alertID string, tr regression.TriageStatus) error {
	setStatus := func(r *regression.Regression) {
		r.HighStatus = tr
	}
	const mustExist = true
	return s.readModifyWrite(ctx, commitNumber, alertID, mustExist, setStatus)
}

// Write implements the regression.Store interface.
//
// Every regression in regressions is written individually, keyed by its commit
// number and alert id string. Writing stops at the first failure and that
// error is returned; regressions written before the failure are not undone.
func (s *SQLRegressionStore) Write(ctx context.Context, regressions map[types.CommitNumber]*regression.AllRegressionsForCommit) error {
	for commit, all := range regressions {
		for alertKey, reg := range all.ByAlertID {
			err := s.write(ctx, commit, alertKey, reg)
			if err != nil {
				return err
			}
		}
	}
	return nil
}

// write serializes r as JSON and stores it in the database under the given
// commitNumber and alert id, replacing any regression already stored there.
//
// An error is returned if alertIDString is the bad-alert-id sentinel, if r
// cannot be serialized, or if the database write fails.
func (s *SQLRegressionStore) write(ctx context.Context, commitNumber types.CommitNumber, alertIDString string, r *regression.Regression) error {
	if alertIDString == alerts.BadAlertIDAsAsString {
		return skerr.Fmt("Failed to convert alertIDString %q to an int.", alertIDString)
	}
	alertID := alerts.IDAsStringToInt(alertIDString)

	encoded, err := json.Marshal(r)
	if err != nil {
		return skerr.Wrapf(err, "Failed to serialize regression for alertID: %d  commitNumber=%d", alertID, commitNumber)
	}

	_, err = s.db.Exec(ctx, statements[write], commitNumber, alertID, string(encoded))
	if err != nil {
		return skerr.Wrapf(err, "Failed to write regression for alertID: %d  commitNumber=%d", alertID, commitNumber)
	}
	return nil
}

// read loads and decodes the Regression stored in the database at the given
// commitNumber and alert id. This func is only used in tests.
//
// An error is returned if alertIDString is the bad-alert-id sentinel, if the
// row cannot be read, or if the stored JSON cannot be decoded.
func (s *SQLRegressionStore) read(ctx context.Context, commitNumber types.CommitNumber, alertIDString string) (*regression.Regression, error) {
	if alertIDString == alerts.BadAlertIDAsAsString {
		return nil, skerr.Fmt("Failed to convert alertIDString %q to an int.", alertIDString)
	}
	alertID := alerts.IDAsStringToInt(alertIDString)

	var encoded string
	row := s.db.QueryRow(ctx, statements[read], commitNumber, alertID)
	if err := row.Scan(&encoded); err != nil {
		return nil, skerr.Wrapf(err, "Failed to read regression for alertID: %d commitNumber=%d", alertID, commitNumber)
	}

	// Decode on top of a default Regression rather than a zero value.
	ret := regression.NewRegression()
	err := json.Unmarshal([]byte(encoded), ret)
	if err != nil {
		return nil, skerr.Wrapf(err, "Failed to deserialize regression for alertID: %d commitNumber=%d", alertID, commitNumber)
	}
	return ret, nil
}

// readModifyWrite reads the Regression at the given commitNumber and alert id
// and then calls the given callback, giving the caller a chance to modify the
// struct, before writing it back to the database. The read and the write are
// done inside a single transaction.
//
// If mustExist is true then the read must be successful; if it is not, the
// transaction is rolled back, cb is never called and an error is returned.
// If mustExist is false, a failed read is ignored and a new default Regression
// is passed to cb and stored back to the database afterwards.
//
// A stored value that fails to deserialize is only logged as a warning; cb is
// still called with whatever json.Unmarshal left in the Regression.
//
// An error is also returned if alertIDString is the bad-alert-id sentinel, if
// the transaction cannot be started, if the modified Regression cannot be
// serialized or written, or if the commit fails.
func (s *SQLRegressionStore) readModifyWrite(ctx context.Context, commitNumber types.CommitNumber, alertIDString string, mustExist bool, cb func(r *regression.Regression)) error {
	if alertIDString == alerts.BadAlertIDAsAsString {
		return skerr.Fmt("Failed to convert alertIDString %q to an int.", alertIDString)
	}
	alertID := alerts.IDAsStringToInt(alertIDString)

	// Do everything in a transaction so we don't have any lost updates.
	tx, err := s.db.Begin(ctx)
	if err != nil {
		return skerr.Wrapf(err, "Can't start transaction")
	}

	// rollback abandons the transaction. A rollback failure is only logged so
	// that the error which triggered the rollback is the one the caller sees.
	rollback := func() {
		if err := tx.Rollback(ctx); err != nil {
			sklog.Errorf("Failed on rollback: %s", err)
		}
	}

	r := regression.NewRegression()
	// NOTE(review): presumably NewRegression assigns an Id that has no meaning
	// for this store, hence the reset — confirm against regression.NewRegression.
	r.Id = ""

	// Read the regression from the database. If any part of that fails then
	// just use the default regression we've already constructed, unless the
	// caller requires the regression to exist.
	var jsonString string
	err = tx.QueryRow(ctx, statements[read], commitNumber, alertID).Scan(&jsonString)
	switch {
	case err == nil:
		if err := json.Unmarshal([]byte(jsonString), r); err != nil {
			sklog.Warningf("Failed to deserialize the JSON Regression: %s", err)
		}
	case mustExist:
		rollback()
		return skerr.Wrapf(err, "Regression doesn't exist.")
	}

	cb(r)

	b, err := json.Marshal(r)
	if err != nil {
		rollback()
		return skerr.Wrapf(err, "Failed to serialize regression for alertID: %d  commitNumber=%d", alertID, commitNumber)
	}
	if _, err := tx.Exec(ctx, statements[write], commitNumber, alertID, string(b)); err != nil {
		rollback()
		return skerr.Wrapf(err, "Failed to write regression for alertID: %d  commitNumber=%d", alertID, commitNumber)
	}

	// Wrap the commit error like every other failure in this function so the
	// caller can tell which regression was affected.
	if err := tx.Commit(ctx); err != nil {
		return skerr.Wrapf(err, "Failed to commit regression for alertID: %d  commitNumber=%d", alertID, commitNumber)
	}
	return nil
}

// GetRegressionsToMigrate returns a set of regressions which are available to
// be migrated.
//
// At most batchSize rows that are not yet marked as migrated are read. Each
// returned Regression is decoded from the stored JSON and then has its AlertId
// and CommitNumber set from the row's columns; its Id is set from the
// regression_id column when that column is not NULL. An empty, non-nil slice
// is returned if there is nothing to migrate. Any query, scan, decode or
// row-iteration failure is returned as an error together with a nil slice.
func (s *SQLRegressionStore) GetRegressionsToMigrate(ctx context.Context, batchSize int) ([]*regression.Regression, error) {
	rows, err := s.db.Query(ctx, statements[batchReadMigration], batchSize)
	if err != nil {
		return nil, skerr.Wrapf(err, "Failed to read regressions for migration")
	}
	// Make sure the rows (and the connection they hold) are released on every
	// exit path, including the early error returns inside the loop.
	defer rows.Close()

	regressions := []*regression.Regression{}
	for rows.Next() {
		var commitID types.CommitNumber
		var alertID int64
		var jsonRegression string
		// regression_id may be NULL, so scan it into a pointer.
		var regressionID *string
		if err := rows.Scan(&commitID, &alertID, &jsonRegression, &regressionID); err != nil {
			return nil, skerr.Wrapf(err, "Failed to read single regression")
		}
		var r regression.Regression
		if err := json.Unmarshal([]byte(jsonRegression), &r); err != nil {
			return nil, skerr.Wrapf(err, "Failed to decode a single regression for commit %d, alert %d", commitID, alertID)
		}
		r.AlertId = alertID
		r.CommitNumber = commitID
		if regressionID != nil {
			r.Id = *regressionID
		}

		regressions = append(regressions, &r)
	}
	// rows.Next() returns false both at the end of the result set and on
	// failure, so the iteration error must be checked explicitly to avoid
	// returning a silently truncated batch.
	if err := rows.Err(); err != nil {
		return nil, skerr.Wrapf(err, "Failed while iterating over regressions for migration")
	}

	return regressions, nil
}

// MarkMigrated marks a specific row in the regressions table as migrated.
//
// The row identified by commitNumber and alertID has its migrated flag set and
// regressionId recorded as its regression id. The update is executed on the
// caller-supplied transaction tx; this function neither commits it nor rolls
// it back.
func (s *SQLRegressionStore) MarkMigrated(ctx context.Context, regressionId string, commitNumber types.CommitNumber, alertID int64, tx pgx.Tx) error {
	_, err := tx.Exec(ctx, statements[markMigrated], regressionId, commitNumber, alertID)
	if err == nil {
		return nil
	}
	return skerr.Wrapf(err, "Failed to mark regression migrated for alertID: %d  commitNumber=%d", alertID, commitNumber)
}

// GetByIDs is not implemented as the old regression schema does not have an
// id. It ignores its arguments and always returns a nil slice and a non-nil
// error.
func (s *SQLRegressionStore) GetByIDs(ctx context.Context, ids []string) ([]*regression.Regression, error) {
	return nil, skerr.Fmt("GetByIDs are not implemented in old version of regression store.")
}

// Confirm at compile time that *SQLRegressionStore implements
// regression.Store; the build fails if a method of the interface is missing.
var _ regression.Store = (*SQLRegressionStore)(nil)
