| // Package sqlclstore contains a SQL implementation of a clstore.Store. |
| package sqlclstore |
| |
| import ( |
| "context" |
| |
| "github.com/jackc/pgx/v4" |
| "github.com/jackc/pgx/v4/pgxpool" |
| "go.opencensus.io/trace" |
| |
| "go.skia.org/infra/go/skerr" |
| "go.skia.org/infra/go/sklog" |
| "go.skia.org/infra/golden/go/clstore" |
| "go.skia.org/infra/golden/go/code_review" |
| "go.skia.org/infra/golden/go/sql" |
| "go.skia.org/infra/golden/go/sql/schema" |
| ) |
| |
| type StoreImpl struct { |
| db *pgxpool.Pool |
| systemID string |
| } |
| |
| // New returns a SQL-backed clstore.Store for the given system. |
| func New(db *pgxpool.Pool, systemID string) *StoreImpl { |
| return &StoreImpl{ |
| db: db, |
| systemID: systemID, |
| } |
| } |
| |
| // We don't have an enterprise license and don't benefit from follower reads. |
| // https://www.cockroachlabs.com/docs/stable/follower-reads.html |
| // If we did, we would want the AS OF SYSTEM TIME to be 4.8 seconds (or 5 for ease). |
| // Setting a non-zero delay reduces some contention because the query doesn't have to |
| // be retried if data is written while the query is being executed (which is common for |
| // instances that stream in results). |
| const statementAll = ` |
| SELECT changelist_id, status, owner_email, subject, last_ingested_data |
| FROM Changelists AS OF SYSTEM TIME '-0.1s' |
| WHERE system = $1 and last_ingested_data > $2 |
| ORDER BY last_ingested_data DESC |
| LIMIT $3 OFFSET $4 |
| ` |
| |
| const statementOpenOnly = ` |
| SELECT changelist_id, status, owner_email, subject, last_ingested_data |
| FROM Changelists AS OF SYSTEM TIME '-0.1s' |
| WHERE system = $1 and last_ingested_data > $2 and status = 'open' |
| ORDER BY last_ingested_data DESC |
| LIMIT $3 OFFSET $4 |
| ` |
| |
| // GetChangelists implements clstore.Store. |
| func (s *StoreImpl) GetChangelists(ctx context.Context, opts clstore.SearchOptions) ([]code_review.Changelist, int, error) { |
| ctx, span := trace.StartSpan(ctx, "sqlclstore_GetChangelists") |
| defer span.End() |
| if opts.Limit <= 0 { |
| return nil, 0, skerr.Fmt("must supply a limit") |
| } |
| if opts.StartIdx < 0 { |
| return nil, 0, skerr.Fmt("start index must be positive") |
| } |
| statement := statementAll |
| if opts.OpenCLsOnly { |
| statement = statementOpenOnly |
| } |
| rows, err := s.db.Query(ctx, statement, s.systemID, opts.After, opts.Limit, opts.StartIdx) |
| if err != nil { |
| return nil, -1, skerr.Wrapf(err, "querying for options %s - %#v", s.systemID, opts) |
| } |
| defer rows.Close() |
| var rv []code_review.Changelist |
| for rows.Next() { |
| var r schema.ChangelistRow |
| err := rows.Scan(&r.ChangelistID, &r.Status, &r.OwnerEmail, &r.Subject, &r.LastIngestedData) |
| if err != nil { |
| return nil, -1, skerr.Wrapf(err, "Scanning data for changelists %s - %#v", s.systemID, opts) |
| } |
| rv = append(rv, code_review.Changelist{ |
| SystemID: sql.Unqualify(r.ChangelistID), |
| Owner: r.OwnerEmail, |
| Status: convertToStatusEnum(r.Status), |
| Subject: r.Subject, |
| Updated: r.LastIngestedData.UTC(), |
| }) |
| } |
| |
| const totalStatement = `SELECT count(*) FROM Changelists WHERE system = $1` |
| var total int |
| countRow := s.db.QueryRow(ctx, totalStatement, s.systemID) |
| if err := countRow.Scan(&total); err != nil { |
| return nil, -1, skerr.Wrapf(err, "counting changelists") |
| } |
| return rv, total, nil |
| } |
| |
| // GetChangelist implements clstore.Store. |
| func (s *StoreImpl) GetChangelist(ctx context.Context, id string) (code_review.Changelist, error) { |
| qID := sql.Qualify(s.systemID, id) |
| row := s.db.QueryRow(ctx, ` |
| SELECT status, owner_email, subject, last_ingested_data FROM Changelists WHERE changelist_id = $1`, qID) |
| var r schema.ChangelistRow |
| err := row.Scan(&r.Status, &r.OwnerEmail, &r.Subject, &r.LastIngestedData) |
| if err != nil { |
| if err == pgx.ErrNoRows { |
| return code_review.Changelist{}, clstore.ErrNotFound |
| } |
| return code_review.Changelist{}, skerr.Wrapf(err, "querying for id %s", qID) |
| } |
| return code_review.Changelist{ |
| SystemID: id, |
| Owner: r.OwnerEmail, |
| Status: convertToStatusEnum(r.Status), |
| Subject: r.Subject, |
| Updated: r.LastIngestedData.UTC(), |
| }, nil |
| } |
| |
| // PutChangelist implements clstore.Store. |
| func (s *StoreImpl) PutChangelist(ctx context.Context, cl code_review.Changelist) error { |
| qID := sql.Qualify(s.systemID, cl.SystemID) |
| const statement = ` |
| UPSERT INTO Changelists (changelist_id, system, status, owner_email, subject, last_ingested_data) |
| VALUES ($1, $2, $3, $4, $5, $6)` |
| _, err := s.db.Exec(ctx, statement, qID, s.systemID, convertFromStatusEnum(cl.Status), cl.Owner, cl.Subject, cl.Updated) |
| if err != nil { |
| return skerr.Wrapf(err, "Inserting CL %#v", cl) |
| } |
| return nil |
| } |
| |
| // GetPatchsets implements clstore.Store. |
| func (s *StoreImpl) GetPatchsets(ctx context.Context, clID string) ([]code_review.Patchset, error) { |
| qID := sql.Qualify(s.systemID, clID) |
| rows, err := s.db.Query(ctx, ` |
| SELECT patchset_id, ps_order, git_hash, commented_on_cl, last_checked_if_comment_necessary |
| FROM Patchsets WHERE changelist_id = $1 ORDER BY ps_order ASC`, qID) |
| if err != nil { |
| return nil, skerr.Wrapf(err, "querying for cl %s", qID) |
| } |
| defer rows.Close() |
| var rv []code_review.Patchset |
| for rows.Next() { |
| var r schema.PatchsetRow |
| err := rows.Scan(&r.PatchsetID, &r.Order, &r.GitHash, &r.CommentedOnCL, &r.LastCheckedIfCommentNecessary) |
| if err != nil { |
| return nil, skerr.Wrapf(err, "Scanning data for cl %s", qID) |
| } |
| rv = append(rv, code_review.Patchset{ |
| SystemID: sql.Unqualify(r.PatchsetID), |
| ChangelistID: clID, |
| Order: r.Order, |
| GitHash: r.GitHash, |
| CommentedOnCL: r.CommentedOnCL, |
| LastCheckedIfCommentNecessary: r.LastCheckedIfCommentNecessary.UTC(), |
| }) |
| } |
| return rv, nil |
| } |
| |
| // GetPatchset implements clstore.Store. |
| func (s *StoreImpl) GetPatchset(ctx context.Context, _, psID string) (code_review.Patchset, error) { |
| qID := sql.Qualify(s.systemID, psID) |
| row := s.db.QueryRow(ctx, ` |
| SELECT changelist_id, ps_order, git_hash, commented_on_cl, last_checked_if_comment_necessary |
| FROM Patchsets WHERE patchset_id = $1`, qID) |
| var r schema.PatchsetRow |
| err := row.Scan(&r.ChangelistID, &r.Order, &r.GitHash, &r.CommentedOnCL, &r.LastCheckedIfCommentNecessary) |
| if err != nil { |
| if err == pgx.ErrNoRows { |
| return code_review.Patchset{}, clstore.ErrNotFound |
| } |
| return code_review.Patchset{}, skerr.Wrapf(err, "querying for id %s", qID) |
| } |
| return code_review.Patchset{ |
| SystemID: psID, |
| ChangelistID: sql.Unqualify(r.ChangelistID), |
| Order: r.Order, |
| GitHash: r.GitHash, |
| CommentedOnCL: r.CommentedOnCL, |
| LastCheckedIfCommentNecessary: r.LastCheckedIfCommentNecessary.UTC(), |
| }, nil |
| } |
| |
| // GetPatchsetByOrder implements clstore.Store. |
| func (s *StoreImpl) GetPatchsetByOrder(ctx context.Context, clID string, psOrder int) (code_review.Patchset, error) { |
| qID := sql.Qualify(s.systemID, clID) |
| row := s.db.QueryRow(ctx, ` |
| SELECT patchset_id, git_hash, commented_on_cl, last_checked_if_comment_necessary |
| FROM Patchsets WHERE changelist_id = $1 AND ps_order = $2`, qID, psOrder) |
| var r schema.PatchsetRow |
| err := row.Scan(&r.PatchsetID, &r.GitHash, &r.CommentedOnCL, &r.LastCheckedIfCommentNecessary) |
| if err != nil { |
| if err == pgx.ErrNoRows { |
| return code_review.Patchset{}, clstore.ErrNotFound |
| } |
| return code_review.Patchset{}, skerr.Wrapf(err, "querying for order %s-%d", qID, psOrder) |
| } |
| return code_review.Patchset{ |
| SystemID: sql.Unqualify(r.PatchsetID), |
| ChangelistID: clID, |
| Order: psOrder, |
| GitHash: r.GitHash, |
| CommentedOnCL: r.CommentedOnCL, |
| LastCheckedIfCommentNecessary: r.LastCheckedIfCommentNecessary.UTC(), |
| }, nil |
| } |
| |
| // PutPatchset implements clstore.Store. Note that due to foreign key constraints, it will fail |
| // if the Changelist does not already exist. |
| func (s *StoreImpl) PutPatchset(ctx context.Context, ps code_review.Patchset) error { |
| psID := sql.Qualify(s.systemID, ps.SystemID) |
| clID := sql.Qualify(s.systemID, ps.ChangelistID) |
| const statement = ` |
| UPSERT INTO Patchsets (patchset_id, system, changelist_id, ps_order, git_hash, |
| commented_on_cl, last_checked_if_comment_necessary) |
| VALUES ($1, $2, $3, $4, $5, $6, $7)` |
| _, err := s.db.Exec(ctx, statement, psID, s.systemID, clID, ps.Order, ps.GitHash, |
| ps.CommentedOnCL, ps.LastCheckedIfCommentNecessary) |
| if err != nil { |
| return skerr.Wrapf(err, "Inserting PS %#v", ps) |
| } |
| return nil |
| } |
| |
| func convertToStatusEnum(status schema.ChangelistStatus) code_review.CLStatus { |
| switch status { |
| case schema.StatusAbandoned: |
| return code_review.Abandoned |
| case schema.StatusOpen: |
| return code_review.Open |
| case schema.StatusLanded: |
| return code_review.Landed |
| } |
| sklog.Warningf("Unknown status: %s", status) |
| return code_review.Abandoned |
| } |
| |
| func convertFromStatusEnum(status code_review.CLStatus) schema.ChangelistStatus { |
| switch status { |
| case code_review.Abandoned: |
| return schema.StatusAbandoned |
| case code_review.Open: |
| return schema.StatusOpen |
| case code_review.Landed: |
| return schema.StatusLanded |
| } |
| sklog.Warningf("Unknown status: %d", status) |
| return schema.StatusAbandoned |
| } |
| |
| // Make sure StoreImpl fulfills the clstore.Store interface. |
| var _ clstore.Store = (*StoreImpl)(nil) |