// Package sqlclstore contains a SQL implementation of a clstore.Store.
package sqlclstore

import (
	"context"

	"github.com/jackc/pgx/v4"
	"github.com/jackc/pgx/v4/pgxpool"
	"go.opencensus.io/trace"

	"go.skia.org/infra/go/skerr"
	"go.skia.org/infra/go/sklog"
	"go.skia.org/infra/golden/go/clstore"
	"go.skia.org/infra/golden/go/code_review"
	"go.skia.org/infra/golden/go/sql"
	"go.skia.org/infra/golden/go/sql/schema"
)

type StoreImpl struct {
	db       *pgxpool.Pool
	systemID string
}

// New returns a SQL-backed clstore.Store for the given system.
func New(db *pgxpool.Pool, systemID string) *StoreImpl {
	return &StoreImpl{
		db:       db,
		systemID: systemID,
	}
}

// We don't have an enterprise license and don't benefit from follower reads.
// https://www.cockroachlabs.com/docs/stable/follower-reads.html
// If we did, we would want the AS OF SYSTEM TIME to be 4.8 seconds (or 5 for ease).
// Setting a non-zero delay reduces some contention because the query doesn't have to
// be retried if data is written while the query is being executed (which is common for
// instances that stream in results).
const statementAll = `
SELECT changelist_id, status, owner_email, subject, last_ingested_data
FROM Changelists AS OF SYSTEM TIME '-0.1s'
WHERE system = $1 and last_ingested_data > $2
ORDER BY last_ingested_data DESC
LIMIT $3 OFFSET $4
`

const statementOpenOnly = `
SELECT changelist_id, status, owner_email, subject, last_ingested_data
FROM Changelists AS OF SYSTEM TIME '-0.1s'
WHERE system = $1 and last_ingested_data > $2 and status = 'open'
ORDER BY last_ingested_data DESC
LIMIT $3 OFFSET $4
`

// GetChangelists implements clstore.Store.
func (s *StoreImpl) GetChangelists(ctx context.Context, opts clstore.SearchOptions) ([]code_review.Changelist, int, error) {
	ctx, span := trace.StartSpan(ctx, "sqlclstore_GetChangelists")
	defer span.End()
	if opts.Limit <= 0 {
		return nil, 0, skerr.Fmt("must supply a limit")
	}
	if opts.StartIdx < 0 {
		return nil, 0, skerr.Fmt("start index must be positive")
	}
	statement := statementAll
	if opts.OpenCLsOnly {
		statement = statementOpenOnly
	}
	rows, err := s.db.Query(ctx, statement, s.systemID, opts.After, opts.Limit, opts.StartIdx)
	if err != nil {
		return nil, -1, skerr.Wrapf(err, "querying for options %s - %#v", s.systemID, opts)
	}
	defer rows.Close()
	var rv []code_review.Changelist
	for rows.Next() {
		var r schema.ChangelistRow
		err := rows.Scan(&r.ChangelistID, &r.Status, &r.OwnerEmail, &r.Subject, &r.LastIngestedData)
		if err != nil {
			return nil, -1, skerr.Wrapf(err, "Scanning data for changelists %s - %#v", s.systemID, opts)
		}
		rv = append(rv, code_review.Changelist{
			SystemID: sql.Unqualify(r.ChangelistID),
			Owner:    r.OwnerEmail,
			Status:   convertToStatusEnum(r.Status),
			Subject:  r.Subject,
			Updated:  r.LastIngestedData.UTC(),
		})
	}

	const totalStatement = `SELECT count(*) FROM Changelists WHERE system = $1`
	var total int
	countRow := s.db.QueryRow(ctx, totalStatement, s.systemID)
	if err := countRow.Scan(&total); err != nil {
		return nil, -1, skerr.Wrapf(err, "counting changelists")
	}
	return rv, total, nil
}

// GetChangelist implements clstore.Store.
func (s *StoreImpl) GetChangelist(ctx context.Context, id string) (code_review.Changelist, error) {
	qID := sql.Qualify(s.systemID, id)
	row := s.db.QueryRow(ctx, `
SELECT status, owner_email, subject, last_ingested_data FROM Changelists WHERE changelist_id = $1`, qID)
	var r schema.ChangelistRow
	err := row.Scan(&r.Status, &r.OwnerEmail, &r.Subject, &r.LastIngestedData)
	if err != nil {
		if err == pgx.ErrNoRows {
			return code_review.Changelist{}, clstore.ErrNotFound
		}
		return code_review.Changelist{}, skerr.Wrapf(err, "querying for id %s", qID)
	}
	return code_review.Changelist{
		SystemID: id,
		Owner:    r.OwnerEmail,
		Status:   convertToStatusEnum(r.Status),
		Subject:  r.Subject,
		Updated:  r.LastIngestedData.UTC(),
	}, nil
}

// PutChangelist implements clstore.Store.
func (s *StoreImpl) PutChangelist(ctx context.Context, cl code_review.Changelist) error {
	qID := sql.Qualify(s.systemID, cl.SystemID)
	const statement = `
UPSERT INTO Changelists (changelist_id, system, status, owner_email, subject, last_ingested_data)
VALUES ($1, $2, $3, $4, $5, $6)`
	_, err := s.db.Exec(ctx, statement, qID, s.systemID, convertFromStatusEnum(cl.Status), cl.Owner, cl.Subject, cl.Updated)
	if err != nil {
		return skerr.Wrapf(err, "Inserting CL %#v", cl)
	}
	return nil
}

// GetPatchsets implements clstore.Store.
func (s *StoreImpl) GetPatchsets(ctx context.Context, clID string) ([]code_review.Patchset, error) {
	qID := sql.Qualify(s.systemID, clID)
	rows, err := s.db.Query(ctx, `
SELECT patchset_id, ps_order, git_hash, commented_on_cl, last_checked_if_comment_necessary
FROM Patchsets WHERE changelist_id = $1 ORDER BY ps_order ASC`, qID)
	if err != nil {
		return nil, skerr.Wrapf(err, "querying for cl %s", qID)
	}
	defer rows.Close()
	var rv []code_review.Patchset
	for rows.Next() {
		var r schema.PatchsetRow
		err := rows.Scan(&r.PatchsetID, &r.Order, &r.GitHash, &r.CommentedOnCL, &r.LastCheckedIfCommentNecessary)
		if err != nil {
			return nil, skerr.Wrapf(err, "Scanning data for cl %s", qID)
		}
		rv = append(rv, code_review.Patchset{
			SystemID:                      sql.Unqualify(r.PatchsetID),
			ChangelistID:                  clID,
			Order:                         r.Order,
			GitHash:                       r.GitHash,
			CommentedOnCL:                 r.CommentedOnCL,
			LastCheckedIfCommentNecessary: r.LastCheckedIfCommentNecessary.UTC(),
		})
	}
	return rv, nil
}

// GetPatchset implements clstore.Store.
func (s *StoreImpl) GetPatchset(ctx context.Context, _, psID string) (code_review.Patchset, error) {
	qID := sql.Qualify(s.systemID, psID)
	row := s.db.QueryRow(ctx, `
SELECT changelist_id, ps_order, git_hash, commented_on_cl, last_checked_if_comment_necessary
FROM Patchsets WHERE patchset_id = $1`, qID)
	var r schema.PatchsetRow
	err := row.Scan(&r.ChangelistID, &r.Order, &r.GitHash, &r.CommentedOnCL, &r.LastCheckedIfCommentNecessary)
	if err != nil {
		if err == pgx.ErrNoRows {
			return code_review.Patchset{}, clstore.ErrNotFound
		}
		return code_review.Patchset{}, skerr.Wrapf(err, "querying for id %s", qID)
	}
	return code_review.Patchset{
		SystemID:                      psID,
		ChangelistID:                  sql.Unqualify(r.ChangelistID),
		Order:                         r.Order,
		GitHash:                       r.GitHash,
		CommentedOnCL:                 r.CommentedOnCL,
		LastCheckedIfCommentNecessary: r.LastCheckedIfCommentNecessary.UTC(),
	}, nil
}

// GetPatchsetByOrder implements clstore.Store.
func (s *StoreImpl) GetPatchsetByOrder(ctx context.Context, clID string, psOrder int) (code_review.Patchset, error) {
	qID := sql.Qualify(s.systemID, clID)
	row := s.db.QueryRow(ctx, `
SELECT patchset_id, git_hash, commented_on_cl, last_checked_if_comment_necessary
FROM Patchsets WHERE changelist_id = $1 AND ps_order = $2`, qID, psOrder)
	var r schema.PatchsetRow
	err := row.Scan(&r.PatchsetID, &r.GitHash, &r.CommentedOnCL, &r.LastCheckedIfCommentNecessary)
	if err != nil {
		if err == pgx.ErrNoRows {
			return code_review.Patchset{}, clstore.ErrNotFound
		}
		return code_review.Patchset{}, skerr.Wrapf(err, "querying for order %s-%d", qID, psOrder)
	}
	return code_review.Patchset{
		SystemID:                      sql.Unqualify(r.PatchsetID),
		ChangelistID:                  clID,
		Order:                         psOrder,
		GitHash:                       r.GitHash,
		CommentedOnCL:                 r.CommentedOnCL,
		LastCheckedIfCommentNecessary: r.LastCheckedIfCommentNecessary.UTC(),
	}, nil
}

// PutPatchset implements clstore.Store. Note that due to foreign key constraints, it will fail
// if the Changelist does not already exist.
func (s *StoreImpl) PutPatchset(ctx context.Context, ps code_review.Patchset) error {
	psID := sql.Qualify(s.systemID, ps.SystemID)
	clID := sql.Qualify(s.systemID, ps.ChangelistID)
	const statement = `
UPSERT INTO Patchsets (patchset_id, system, changelist_id, ps_order, git_hash,
  commented_on_cl, last_checked_if_comment_necessary)
VALUES ($1, $2, $3, $4, $5, $6, $7)`
	_, err := s.db.Exec(ctx, statement, psID, s.systemID, clID, ps.Order, ps.GitHash,
		ps.CommentedOnCL, ps.LastCheckedIfCommentNecessary)
	if err != nil {
		return skerr.Wrapf(err, "Inserting PS %#v", ps)
	}
	return nil
}

func convertToStatusEnum(status schema.ChangelistStatus) code_review.CLStatus {
	switch status {
	case schema.StatusAbandoned:
		return code_review.Abandoned
	case schema.StatusOpen:
		return code_review.Open
	case schema.StatusLanded:
		return code_review.Landed
	}
	sklog.Warningf("Unknown status: %s", status)
	return code_review.Abandoned
}

func convertFromStatusEnum(status code_review.CLStatus) schema.ChangelistStatus {
	switch status {
	case code_review.Abandoned:
		return schema.StatusAbandoned
	case code_review.Open:
		return schema.StatusOpen
	case code_review.Landed:
		return schema.StatusLanded
	}
	sklog.Warningf("Unknown status: %d", status)
	return schema.StatusAbandoned
}

// Make sure StoreImpl fulfills the clstore.Store interface.
var _ clstore.Store = (*StoreImpl)(nil)
