// blob: 379381d83fb331525bd52023dc5573e53f6584f7 (code-viewer header: [file] [log] [blame])

// Package fs_clstore implements the clstore.Store interface with
// a Firestore backend.
package fs_clstore
import (
"context"
"fmt"
"time"
"cloud.google.com/go/firestore"
ifirestore "go.skia.org/infra/go/firestore"
"go.skia.org/infra/go/metrics2"
"go.skia.org/infra/go/skerr"
"go.skia.org/infra/golden/go/clstore"
"go.skia.org/infra/golden/go/code_review"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
// Names of the Firestore collections used by this package.
const (
	// changelistCollection is the top-level collection; it holds one document per CL.
	changelistCollection = "clstore_changelist"
	// patchsetCollection is nested under a CL document and holds that CL's PatchSets.
	patchsetCollection = "clstore_patchset"
)

// Names of the document fields that queries filter or sort on. They must match
// the `firestore:"..."` tags of the entry structs.
const (
	orderField   = "order"
	statusField  = "status"
	updatedField = "updated"
)

// Retry and timeout limits passed to the Firestore client helpers.
const (
	// maxReadAttempts is the number of attempts given to read (IterDocs) calls.
	maxReadAttempts = 5
	// maxWriteAttempts is the number of attempts given to write (Set) calls.
	maxWriteAttempts = 5
	// maxOperationTime is the timeout handed to both read and write helpers.
	maxOperationTime = time.Minute
)
// StoreImpl is a clstore.Store whose data lives in Firestore.
type StoreImpl struct {
	// client is the Firestore client wrapper used for every read and write.
	client *ifirestore.Client
	// crsID names the code review system this store serves. It is appended to
	// CL document ids and written to the "system" field of stored entries.
	crsID string
}
// New builds a StoreImpl that talks to Firestore through client.
//
// crsID identifies the code review system the store belongs to. Pick values
// that keep an internal CRS apart from an external one when both are in use
// (e.g. "gerrit" vs "gerrit-internal").
func New(client *ifirestore.Client, crsID string) *StoreImpl {
	store := new(StoreImpl)
	store.client = client
	store.crsID = crsID
	return store
}
// changeListEntry is the Firestore document layout of a ChangeList. The struct
// tags are the document field names, so they must stay in sync with the query
// field constants.
type changeListEntry struct {
	// SystemID is the CL's id as provided by the caller (code_review.ChangeList.SystemID).
	SystemID string `firestore:"systemid"`
	// System is the id of the CRS that owns the CL; writers set it to StoreImpl.crsID.
	System string `firestore:"system"`
	// Owner, Status, Subject and Updated mirror the same-named ChangeList fields.
	Owner   string               `firestore:"owner"`
	Status  code_review.CLStatus `firestore:"status"`
	Subject string               `firestore:"subject"`
	Updated time.Time            `firestore:"updated"`
}
// patchSetEntry is the Firestore document layout of a PatchSet. The struct tags
// are the document field names.
type patchSetEntry struct {
	// SystemID is the PatchSet's id; it is also used as the document id.
	SystemID string `firestore:"systemid"`
	// System is the id of the CRS that owns the PatchSet; writers set it to StoreImpl.crsID.
	System string `firestore:"system"`
	// ChangeListID is the (unsuffixed) id of the CL this PatchSet belongs to.
	ChangeListID string `firestore:"changelistid"`
	// Order is the value lookups by order filter on and listings sort by.
	Order int `firestore:"order"`
	// GitHash mirrors code_review.PatchSet.GitHash.
	GitHash string `firestore:"githash"`
	// CommentedOnCL and LastCheckedIfCommentNecessary mirror the same-named
	// PatchSet fields.
	CommentedOnCL                 bool      `firestore:"did_comment"`
	LastCheckedIfCommentNecessary time.Time `firestore:"last_checked_about_comment"`
}
// toPatchSet builds the code_review.PatchSet equivalent of this Firestore
// entry. The System field is storage-only and is not copied over.
func (p patchSetEntry) toPatchSet() code_review.PatchSet {
	var ps code_review.PatchSet
	ps.SystemID = p.SystemID
	ps.ChangeListID = p.ChangeListID
	ps.Order = p.Order
	ps.GitHash = p.GitHash
	ps.CommentedOnCL = p.CommentedOnCL
	ps.LastCheckedIfCommentNecessary = p.LastCheckedIfCommentNecessary
	return ps
}
// GetChangeList implements the clstore.Store interface.
//
// The CL is fetched directly by its CRS-qualified document id, so no query is
// needed. It returns clstore.ErrNotFound when Firestore has no such document,
// and a wrapped error for any other fetch or unmarshal failure.
func (s *StoreImpl) GetChangeList(ctx context.Context, id string) (code_review.ChangeList, error) {
	defer metrics2.FuncTimer().Stop()

	var none code_review.ChangeList
	docID := s.changeListFirestoreID(id)
	snap, err := s.client.Collection(changelistCollection).Doc(docID).Get(ctx)
	switch {
	case err != nil && status.Code(err) == codes.NotFound:
		// A missing document is an expected outcome, not a failure.
		return none, clstore.ErrNotFound
	case err != nil:
		return none, skerr.Wrapf(err, "retrieving CL %s from Firestore", docID)
	case snap == nil:
		return none, clstore.ErrNotFound
	}

	var stored changeListEntry
	if err := snap.DataTo(&stored); err != nil {
		return none, skerr.Wrapf(err, "corrupt data in Firestore, could not unmarshal %s changelist with id %s", s.crsID, snap.Ref.ID)
	}
	return code_review.ChangeList{
		SystemID: stored.SystemID,
		Owner:    stored.Owner,
		Status:   stored.Status,
		Subject:  stored.Subject,
		Updated:  stored.Updated,
	}, nil
}
// changeListFirestoreID derives the Firestore document id of a CL by joining
// the CL's id and this store's CRS id with an underscore. Having a
// deterministic id lets callers fetch the document directly instead of
// running a query.
func (s *StoreImpl) changeListFirestoreID(clID string) string {
	return fmt.Sprintf("%s_%s", clID, s.crsID)
}
// GetChangeLists implements the clstore.Store interface.
//
// It returns up to opts.Limit CLs belonging to this store's CRS, most recently
// updated first, skipping the first opts.StartIdx results. opts.After (when
// non-zero) restricts results to CLs updated at or after that time and
// opts.OpenCLsOnly restricts them to open CLs.
//
// The int result is clstore.CountMany when a full page was returned (more may
// exist); otherwise it is opts.StartIdx plus the number of CLs returned, which
// is exact when at least one CL came back and an upper bound when none did.
// An error is returned if opts.Limit is not positive or the query fails.
func (s *StoreImpl) GetChangeLists(ctx context.Context, opts clstore.SearchOptions) ([]code_review.ChangeList, int, error) {
	defer metrics2.FuncTimer().Stop()
	if opts.Limit <= 0 {
		return nil, 0, skerr.Fmt("must supply a limit")
	}

	// systemField is the stored name of changeListEntry.System, which
	// PutChangeList always sets to the writing store's crsID.
	const systemField = "system"

	// Every CRS writes into the same collection (only the document id carries
	// the CRS suffix), so the query must be narrowed to this store's CRS or it
	// would also return CLs owned by other systems.
	// NOTE(review): this query shape may need a matching composite index in
	// Firestore - confirm the index exists before deploying.
	q := s.client.Collection(changelistCollection).
		Where(systemField, "==", s.crsID).
		OrderBy(updatedField, firestore.Desc)
	if !opts.After.IsZero() {
		q = q.Where(updatedField, ">=", opts.After)
	}
	if opts.OpenCLsOnly {
		q = q.Where(statusField, "==", code_review.Open)
	}
	q = q.Limit(opts.Limit).Offset(opts.StartIdx)

	var xcl []code_review.ChangeList
	r := fmt.Sprintf("[%d:%d]", opts.StartIdx, opts.StartIdx+opts.Limit)
	err := s.client.IterDocs(ctx, "GetChangeLists", r, q, maxReadAttempts, maxOperationTime, func(doc *firestore.DocumentSnapshot) error {
		if doc == nil {
			return nil
		}
		entry := changeListEntry{}
		if err := doc.DataTo(&entry); err != nil {
			return skerr.Wrapf(err, "corrupt data in Firestore, could not unmarshal entry with id %s", doc.Ref.ID)
		}
		xcl = append(xcl, code_review.ChangeList{
			SystemID: entry.SystemID,
			Updated:  entry.Updated,
			Subject:  entry.Subject,
			Status:   entry.Status,
			Owner:    entry.Owner,
		})
		return nil
	})
	if err != nil {
		return nil, -1, skerr.Wrapf(err, "fetching cls in range %s", r)
	}

	// opts.Limit is known to be positive here, so a full page is never empty.
	if len(xcl) == opts.Limit {
		// We don't know how many there are and it might be too slow to count,
		// so just give the "many" response.
		return xcl, clstore.CountMany, nil
	}
	// A short page means the results end here: the total is exact when the
	// page is non-empty and an upper bound when it is empty.
	return xcl, opts.StartIdx + len(xcl), nil
}
// GetPatchSet implements the clstore.Store interface.
//
// The PatchSet document is fetched directly by id from the patchset
// sub-collection of the CL's document. It returns clstore.ErrNotFound when
// Firestore has no such document, and a wrapped error for any other fetch or
// unmarshal failure.
func (s *StoreImpl) GetPatchSet(ctx context.Context, clID, psID string) (code_review.PatchSet, error) {
	defer metrics2.FuncTimer().Stop()
	fID := s.changeListFirestoreID(clID)
	doc, err := s.client.Collection(changelistCollection).Doc(fID).
		Collection(patchsetCollection).Doc(psID).Get(ctx)
	if err != nil {
		if status.Code(err) == codes.NotFound {
			return code_review.PatchSet{}, clstore.ErrNotFound
		}
		// Name both ids: the PatchSet that was requested and the CL document
		// it was looked up under.
		return code_review.PatchSet{}, skerr.Wrapf(err, "retrieving PS %s of CL %s from Firestore", psID, fID)
	}
	if doc == nil {
		return code_review.PatchSet{}, clstore.ErrNotFound
	}
	pse := patchSetEntry{}
	if err := doc.DataTo(&pse); err != nil {
		return code_review.PatchSet{}, skerr.Wrapf(err, "corrupt data in Firestore, could not unmarshal %s patchset with id %s", s.crsID, doc.Ref.ID)
	}
	return pse.toPatchSet(), nil
}
// GetPatchSetByOrder implements the clstore.Store interface.
//
// It queries the CL's patchset sub-collection for an entry whose order equals
// psOrder and returns the first match. It returns clstore.ErrNotFound when
// nothing matches, and a wrapped error when the query or unmarshalling fails.
func (s *StoreImpl) GetPatchSetByOrder(ctx context.Context, clID string, psOrder int) (code_review.PatchSet, error) {
	defer metrics2.FuncTimer().Stop()
	fID := s.changeListFirestoreID(clID)
	// Only the first match is ever used, so cap the query at one document
	// rather than reading and discarding any additional matches.
	q := s.client.Collection(changelistCollection).Doc(fID).
		Collection(patchsetCollection).Where(orderField, "==", psOrder).Limit(1)
	ps := code_review.PatchSet{}
	found := false
	msg := fmt.Sprintf("%s:%d", clID, psOrder)
	err := s.client.IterDocs(ctx, "GetPatchSetByOrder", msg, q, maxReadAttempts, maxOperationTime, func(doc *firestore.DocumentSnapshot) error {
		// Keep the first decoded entry even if the callback fires again.
		if doc == nil || found {
			return nil
		}
		entry := patchSetEntry{}
		if err := doc.DataTo(&entry); err != nil {
			return skerr.Wrapf(err, "corrupt data in Firestore, could not unmarshal patchsetEntry with id %s", doc.Ref.ID)
		}
		ps = entry.toPatchSet()
		found = true
		return nil
	})
	if err != nil {
		return code_review.PatchSet{}, skerr.Wrapf(err, "fetching patchsets for cl %s", clID)
	}
	if !found {
		return code_review.PatchSet{}, clstore.ErrNotFound
	}
	return ps, nil
}
// GetPatchSets implements the clstore.Store interface.
//
// It returns every PatchSet stored under the given CL, sorted by ascending
// order. The result is nil when the CL has no stored PatchSets; a wrapped
// error is returned when the query or unmarshalling fails.
func (s *StoreImpl) GetPatchSets(ctx context.Context, clID string) ([]code_review.PatchSet, error) {
	defer metrics2.FuncTimer().Stop()

	clDoc := s.client.Collection(changelistCollection).Doc(s.changeListFirestoreID(clID))
	byOrder := clDoc.Collection(patchsetCollection).OrderBy(orderField, firestore.Asc)

	var patchSets []code_review.PatchSet
	// collect decodes one document and appends it to patchSets.
	collect := func(snap *firestore.DocumentSnapshot) error {
		if snap == nil {
			return nil
		}
		var stored patchSetEntry
		if err := snap.DataTo(&stored); err != nil {
			return skerr.Wrapf(err, "corrupt data in Firestore, could not unmarshal entry with id %s", snap.Ref.ID)
		}
		patchSets = append(patchSets, stored.toPatchSet())
		return nil
	}

	if err := s.client.IterDocs(ctx, "GetPatchSets", clID, byOrder, maxReadAttempts, maxOperationTime, collect); err != nil {
		return nil, skerr.Wrapf(err, "fetching patchsets for cl %s", clID)
	}
	return patchSets, nil
}
// PutChangeList implements the clstore.Store interface.
//
// The CL is written to the document whose id is derived from cl.SystemID and
// this store's CRS id; the CRS id is also stored in the entry's System field.
// A wrapped error is returned if the write fails.
func (s *StoreImpl) PutChangeList(ctx context.Context, cl code_review.ChangeList) error {
	defer metrics2.FuncTimer().Stop()

	docRef := s.client.Collection(changelistCollection).Doc(s.changeListFirestoreID(cl.SystemID))

	var entry changeListEntry
	entry.SystemID = cl.SystemID
	entry.System = s.crsID
	entry.Owner = cl.Owner
	entry.Status = cl.Status
	entry.Subject = cl.Subject
	entry.Updated = cl.Updated

	if _, err := s.client.Set(ctx, docRef, entry, maxWriteAttempts, maxOperationTime); err != nil {
		return skerr.Wrapf(err, "could not write CL %v to clstore", cl)
	}
	return nil
}
// PutPatchSet implements the clstore.Store interface.
//
// The PatchSet is written to the patchset sub-collection of its CL's document,
// using ps.SystemID as the document id; this store's CRS id is stored in the
// entry's System field. A wrapped error is returned if the write fails.
func (s *StoreImpl) PutPatchSet(ctx context.Context, ps code_review.PatchSet) error {
	defer metrics2.FuncTimer().Stop()

	clDoc := s.client.Collection(changelistCollection).Doc(s.changeListFirestoreID(ps.ChangeListID))
	docRef := clDoc.Collection(patchsetCollection).Doc(ps.SystemID)

	var entry patchSetEntry
	entry.SystemID = ps.SystemID
	entry.System = s.crsID
	entry.ChangeListID = ps.ChangeListID
	entry.Order = ps.Order
	entry.GitHash = ps.GitHash
	entry.LastCheckedIfCommentNecessary = ps.LastCheckedIfCommentNecessary
	entry.CommentedOnCL = ps.CommentedOnCL

	if _, err := s.client.Set(ctx, docRef, entry, maxWriteAttempts, maxOperationTime); err != nil {
		return skerr.Wrapf(err, "could not write PS %v to clstore", ps)
	}
	return nil
}
// Compile-time check that *StoreImpl satisfies the clstore.Store interface;
// the build fails here if a method is missing or has the wrong signature.
var _ clstore.Store = (*StoreImpl)(nil)