blob: 2e3ef843761167902a148d54aca453172f520c6f [file] [log] [blame]
// Package fs_clstore implements the clstore.Store interface with
// a FireStore backend.
package fs_clstore
import (
"context"
"fmt"
"time"
"cloud.google.com/go/firestore"
ifirestore "go.skia.org/infra/go/firestore"
"go.skia.org/infra/go/metrics2"
"go.skia.org/infra/go/skerr"
"go.skia.org/infra/golden/go/clstore"
"go.skia.org/infra/golden/go/code_review"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
const (
// These are the collections in Firestore.
changelistCollection = "clstore_changelist"
patchsetCollection = "clstore_patchset"
// These are the fields we query by
orderField = "order"
statusField = "status"
updatedField = "updated"
maxReadAttempts = 5
maxWriteAttempts = 5
maxOperationTime = time.Minute
)
// StoreImpl is the Firestore based implementation of clstore.
type StoreImpl struct {
client *ifirestore.Client
crsID string
}
// New returns a new StoreImpl. The crsID should be distinct enough to separate an internal CRS
// from an external one, if needed (e.g. "gerrit" vs "gerrit-internal")
func New(client *ifirestore.Client, crsID string) *StoreImpl {
return &StoreImpl{
client: client,
crsID: crsID,
}
}
// changelistEntry represents how a Changelist is stored in Firestore.
type changelistEntry struct {
SystemID string `firestore:"systemid"`
System string `firestore:"system"`
Owner string `firestore:"owner"`
Status code_review.CLStatus `firestore:"status"`
Subject string `firestore:"subject"`
Updated time.Time `firestore:"updated"`
}
// patchsetEntry represents how a Patchset is stored in Firestore.
type patchsetEntry struct {
SystemID string `firestore:"systemid"`
System string `firestore:"system"`
ChangelistID string `firestore:"changelistid"`
Order int `firestore:"order"`
GitHash string `firestore:"githash"`
CommentedOnCL bool `firestore:"did_comment"`
LastCheckedIfCommentNecessary time.Time `firestore:"last_checked_about_comment"`
}
// toPatchset converts the Firestore representation of a Patchset to a code_review.Patchset
func (p patchsetEntry) toPatchset() code_review.Patchset {
return code_review.Patchset{
SystemID: p.SystemID,
ChangelistID: p.ChangelistID,
Order: p.Order,
GitHash: p.GitHash,
LastCheckedIfCommentNecessary: p.LastCheckedIfCommentNecessary,
CommentedOnCL: p.CommentedOnCL,
}
}
// GetChangelist implements the clstore.Store interface.
func (s *StoreImpl) GetChangelist(ctx context.Context, id string) (code_review.Changelist, error) {
defer metrics2.FuncTimer().Stop()
fID := s.changelistFirestoreID(id)
doc, err := s.client.Collection(changelistCollection).Doc(fID).Get(ctx)
if err != nil {
if status.Code(err) == codes.NotFound {
return code_review.Changelist{}, clstore.ErrNotFound
}
return code_review.Changelist{}, skerr.Wrapf(err, "retrieving CL %s from Firestore", fID)
}
if doc == nil {
return code_review.Changelist{}, clstore.ErrNotFound
}
cle := changelistEntry{}
if err := doc.DataTo(&cle); err != nil {
id := doc.Ref.ID
return code_review.Changelist{}, skerr.Wrapf(err, "corrupt data in Firestore, could not unmarshal %s changelist with id %s", s.crsID, id)
}
cl := code_review.Changelist{
SystemID: cle.SystemID,
Owner: cle.Owner,
Status: cle.Status,
Subject: cle.Subject,
Updated: cle.Updated,
}
return cl, nil
}
// changelistFirestoreID returns the id for a given CL in a given CRS - this allows us to
// look up a document by id w/o having to perform a query.
func (s *StoreImpl) changelistFirestoreID(clID string) string {
return clID + "_" + s.crsID
}
// GetChangelists implements the clstore.Store interface.
func (s *StoreImpl) GetChangelists(ctx context.Context, opts clstore.SearchOptions) ([]code_review.Changelist, int, error) {
defer metrics2.FuncTimer().Stop()
if opts.Limit <= 0 {
return nil, 0, skerr.Fmt("must supply a limit")
}
q := s.client.Collection(changelistCollection).OrderBy(updatedField, firestore.Desc)
if !opts.After.IsZero() {
q = q.Where(updatedField, ">=", opts.After)
}
if opts.OpenCLsOnly {
q = q.Where(statusField, "==", code_review.Open)
}
q = q.Limit(opts.Limit).Offset(opts.StartIdx)
var xcl []code_review.Changelist
r := fmt.Sprintf("[%d:%d]", opts.StartIdx, opts.StartIdx+opts.Limit)
err := s.client.IterDocs(ctx, "GetChangelists", r, q, maxReadAttempts, maxOperationTime, func(doc *firestore.DocumentSnapshot) error {
if doc == nil {
return nil
}
entry := changelistEntry{}
if err := doc.DataTo(&entry); err != nil {
id := doc.Ref.ID
return skerr.Wrapf(err, "corrupt data in Firestore, could not unmarshal entry with id %s", id)
}
xcl = append(xcl, code_review.Changelist{
SystemID: entry.SystemID,
Updated: entry.Updated,
Subject: entry.Subject,
Status: entry.Status,
Owner: entry.Owner,
})
return nil
})
if err != nil {
return nil, -1, skerr.Wrapf(err, "fetching cls in range %s", r)
}
n := len(xcl)
if n == opts.Limit && n != 0 {
// We don't know how many there are and it might be too slow to count, so just give
// the "many" response.
n = clstore.CountMany
} else {
// We know exactly either 1) how many there are (if n > 0) or 2) an upper bound on how many
// there are (if n == 0)
n += opts.StartIdx
}
return xcl, n, nil
}
// GetPatchset implements the clstore.Store interface.
func (s *StoreImpl) GetPatchset(ctx context.Context, clID, psID string) (code_review.Patchset, error) {
defer metrics2.FuncTimer().Stop()
fID := s.changelistFirestoreID(clID)
doc, err := s.client.Collection(changelistCollection).Doc(fID).
Collection(patchsetCollection).Doc(psID).Get(ctx)
if err != nil {
if status.Code(err) == codes.NotFound {
return code_review.Patchset{}, clstore.ErrNotFound
}
return code_review.Patchset{}, skerr.Wrapf(err, "retrieving PS %s from Firestore", fID)
}
if doc == nil {
return code_review.Patchset{}, clstore.ErrNotFound
}
pse := patchsetEntry{}
if err := doc.DataTo(&pse); err != nil {
id := doc.Ref.ID
return code_review.Patchset{}, skerr.Wrapf(err, "corrupt data in Firestore, could not unmarshal %s patchset with id %s", s.crsID, id)
}
return pse.toPatchset(), nil
}
// GetPatchsetByOrder implements the clstore.Store interface.
func (s *StoreImpl) GetPatchsetByOrder(ctx context.Context, clID string, psOrder int) (code_review.Patchset, error) {
defer metrics2.FuncTimer().Stop()
fID := s.changelistFirestoreID(clID)
q := s.client.Collection(changelistCollection).Doc(fID).
Collection(patchsetCollection).Where(orderField, "==", psOrder)
ps := code_review.Patchset{}
found := false
msg := fmt.Sprintf("%s:%d", clID, psOrder)
err := s.client.IterDocs(ctx, "GetPatchsetByOrder", msg, q, maxReadAttempts, maxOperationTime, func(doc *firestore.DocumentSnapshot) error {
if doc == nil || found {
return nil
}
entry := patchsetEntry{}
if err := doc.DataTo(&entry); err != nil {
id := doc.Ref.ID
return skerr.Wrapf(err, "corrupt data in Firestore, could not unmarshal patchsetEntry with id %s", id)
}
ps = entry.toPatchset()
found = true
return nil
})
if err != nil {
return code_review.Patchset{}, skerr.Wrapf(err, "fetching patchsets for cl %s", clID)
}
if !found {
return code_review.Patchset{}, clstore.ErrNotFound
}
return ps, nil
}
// GetPatchsets implements the clstore.Store interface.
func (s *StoreImpl) GetPatchsets(ctx context.Context, clID string) ([]code_review.Patchset, error) {
defer metrics2.FuncTimer().Stop()
fID := s.changelistFirestoreID(clID)
q := s.client.Collection(changelistCollection).Doc(fID).
Collection(patchsetCollection).OrderBy(orderField, firestore.Asc)
var xps []code_review.Patchset
err := s.client.IterDocs(ctx, "GetPatchsets", clID, q, maxReadAttempts, maxOperationTime, func(doc *firestore.DocumentSnapshot) error {
if doc == nil {
return nil
}
entry := patchsetEntry{}
if err := doc.DataTo(&entry); err != nil {
id := doc.Ref.ID
return skerr.Wrapf(err, "corrupt data in Firestore, could not unmarshal entry with id %s", id)
}
xps = append(xps, entry.toPatchset())
return nil
})
if err != nil {
return nil, skerr.Wrapf(err, "fetching patchsets for cl %s", clID)
}
return xps, nil
}
// PutChangelist implements the clstore.Store interface.
func (s *StoreImpl) PutChangelist(ctx context.Context, cl code_review.Changelist) error {
defer metrics2.FuncTimer().Stop()
fID := s.changelistFirestoreID(cl.SystemID)
cd := s.client.Collection(changelistCollection).Doc(fID)
record := changelistEntry{
SystemID: cl.SystemID,
System: s.crsID,
Owner: cl.Owner,
Status: cl.Status,
Subject: cl.Subject,
Updated: cl.Updated,
}
_, err := s.client.Set(ctx, cd, record, maxWriteAttempts, maxOperationTime)
if err != nil {
return skerr.Wrapf(err, "could not write CL %v to clstore", cl)
}
return nil
}
// PutPatchset implements the clstore.Store interface.
func (s *StoreImpl) PutPatchset(ctx context.Context, ps code_review.Patchset) error {
defer metrics2.FuncTimer().Stop()
fID := s.changelistFirestoreID(ps.ChangelistID)
pd := s.client.Collection(changelistCollection).Doc(fID).
Collection(patchsetCollection).Doc(ps.SystemID)
record := patchsetEntry{
SystemID: ps.SystemID,
System: s.crsID,
ChangelistID: ps.ChangelistID,
Order: ps.Order,
GitHash: ps.GitHash,
LastCheckedIfCommentNecessary: ps.LastCheckedIfCommentNecessary,
CommentedOnCL: ps.CommentedOnCL,
}
_, err := s.client.Set(ctx, pd, record, maxWriteAttempts, maxOperationTime)
if err != nil {
return skerr.Wrapf(err, "could not write PS %v to clstore", ps)
}
return nil
}
// Make sure StoreImpl fulfills the clstore.Store interface.
var _ clstore.Store = (*StoreImpl)(nil)