blob: 8a605766d147573a52f6fd0c43128f1b82f110b6 [file] [log] [blame]
// Package fscommentstore contains a Firestore-based implementation of comment.Store.
package fscommentstore
import (
"context"
"sort"
"sync"
"time"
"cloud.google.com/go/firestore"
ifirestore "go.skia.org/infra/go/firestore"
"go.skia.org/infra/go/paramtools"
"go.skia.org/infra/go/skerr"
"go.skia.org/infra/go/sklog"
"go.skia.org/infra/golden/go/comment"
"go.skia.org/infra/golden/go/comment/trace"
"go.skia.org/infra/golden/go/fs_utils"
)
const (
// These are the collections in Firestore.
traceCommentCollection = "commentstore_tracecomments"
maxReadAttempts = 5
maxWriteAttempts = 5
maxOperationTime = time.Minute
)
// StoreImpl is the Firestore-based implementation of comment.Store. It uses query snapshots to
// synchronize a local commentCache with the data in Firestore, thus it does not need to poll. We
// don't do any searches on our data, so keeping all the trace comments in RAM (backed by Firestore)
// is a fine solution for the sake of performance (and avoiding unnecessary Firestore traffic).
type StoreImpl struct {
client *ifirestore.Client
cacheMutex sync.RWMutex
commentCache map[trace.ID]trace.Comment
}
// commentEntry represents how an trace.Comment is stored in Firestore.
type commentEntry struct {
CreatedBy string `firestore:"createdby"`
UpdatedBy string `firestore:"updatedby"`
CreatedTS time.Time `firestore:"createdts"`
UpdatedTS time.Time `firestore:"updatedts"`
QueryToMatch paramtools.ParamSet `firestore:"query"`
Comment string `firestore:"comment"`
}
// toTraceComment converts a Firestore commentEntry into a trace.Comment. The QueryToMatch field
// will be a normalized copy of the one from the passed in entry (to avoid a race condition between
// a potentially cached commentEntry and the returned value).
func toTraceComment(id trace.ID, r commentEntry) trace.Comment {
q := r.QueryToMatch.Copy()
q.Normalize()
return trace.Comment{
ID: id,
CreatedBy: r.CreatedBy,
UpdatedBy: r.UpdatedBy,
CreatedTS: r.CreatedTS,
UpdatedTS: r.UpdatedTS,
Comment: r.Comment,
QueryToMatch: q,
}
}
// toEntry converts a trace.Comment into a Firestore commentEntry. The QueryToMatch field will be
// a normalized copy of the one from the passed in comment (to avoid a race condition between
// a potentially cached commentEntry and the passed in value).
func toEntry(r trace.Comment) commentEntry {
q := r.QueryToMatch.Copy()
q.Normalize()
return commentEntry{
CreatedBy: r.CreatedBy,
UpdatedBy: r.UpdatedBy,
CreatedTS: r.CreatedTS,
UpdatedTS: r.UpdatedTS,
Comment: r.Comment,
QueryToMatch: q,
}
}
// New returns a new StoreImpl.
func New(ctx context.Context, client *ifirestore.Client) *StoreImpl {
s := &StoreImpl{
client: client,
commentCache: map[trace.ID]trace.Comment{},
}
s.startQueryIterator(ctx)
return s
}
// startQueryIterator sets up the listener to the Query Snapshots which keep the local commentCache of
// comments in sync with those in Firestore.
func (s *StoreImpl) startQueryIterator(ctx context.Context) {
go func() {
// We don't ever expect there to be a lot (>10,000) of trace comments, so a single,
// un-sharded snapshot should suffice to read in all the stored rules. If it doesn't, look
// at how fs_expectationstore does it.
snapFactory := func() *firestore.QuerySnapshotIterator {
return s.client.Collection(traceCommentCollection).Snapshots(ctx)
}
err := fs_utils.ListenAndRecover(ctx, nil, snapFactory, s.updateCacheWithEntriesFrom)
if err != nil {
sklog.Errorf("Unrecoverable error: %s", err)
}
}()
}
// updateCacheWithEntriesFrom loops through all the changes in the given snapshot and updates
// the commentCache with those new values (or deletes the old ones).
func (s *StoreImpl) updateCacheWithEntriesFrom(_ context.Context, qs *firestore.QuerySnapshot) error {
s.cacheMutex.Lock()
defer s.cacheMutex.Unlock()
for _, dc := range qs.Changes {
id := trace.ID(dc.Doc.Ref.ID)
if dc.Kind == firestore.DocumentRemoved {
delete(s.commentCache, id)
continue
}
entry := commentEntry{}
if err := dc.Doc.DataTo(&entry); err != nil {
sklog.Errorf("corrupt data in firestore, could not unmarshal commentEntry with id %s", id)
continue
}
s.commentCache[id] = toTraceComment(id, entry)
}
return nil
}
// CreateComment implements the comment.Store interface.
func (s *StoreImpl) CreateComment(ctx context.Context, c trace.Comment) (trace.ID, error) {
doc := s.client.Collection(traceCommentCollection).NewDoc()
if _, err := s.client.Set(ctx, doc, toEntry(c), maxWriteAttempts, maxOperationTime); err != nil {
return "", skerr.Wrapf(err, "storing new trace comment to Firestore (%#v)", c)
}
c.ID = trace.ID(doc.ID)
// Make a copy of the map to make sure our cache doesn't accidentally share it with the
// passed-in version.
c.QueryToMatch = c.QueryToMatch.Copy()
// Update the cache (this helps avoid flakes on unit tests).
s.cacheMutex.Lock()
defer s.cacheMutex.Unlock()
s.commentCache[c.ID] = c
return c.ID, nil
}
// ListComments implements the comment.Store interface. It returns the local commentCache of trace
// comments, never re-fetching from Firestore because the query snapshots should keep the local
// commentCache up to date, give or take a few seconds.
func (s *StoreImpl) ListComments(_ context.Context) ([]trace.Comment, error) {
s.cacheMutex.RLock()
defer s.cacheMutex.RUnlock()
rv := make([]trace.Comment, 0, len(s.commentCache))
for _, c := range s.commentCache {
// Return a copy of the QueryToMatch map, so subsequent mutations do not impact the
// cached version.
c.QueryToMatch = c.QueryToMatch.Copy()
rv = append(rv, c)
}
sort.Slice(rv, func(i, j int) bool {
return rv[i].UpdatedTS.Before(rv[j].UpdatedTS)
})
return rv, nil
}
// UpdateComment implements the comment.Store interface.
func (s *StoreImpl) UpdateComment(ctx context.Context, comment trace.Comment) error {
if comment.ID == "" {
return skerr.Fmt("ID for trace comment cannot be empty")
}
doc := s.client.Collection(traceCommentCollection).Doc(string(comment.ID))
dSnap, err := s.client.Get(ctx, doc, maxReadAttempts, maxOperationTime)
if err != nil {
return skerr.Wrapf(err, "getting trace comment %s before updating", comment.ID)
}
var oldComment commentEntry
if err := dSnap.DataTo(&oldComment); err != nil {
return skerr.Wrapf(err, "corrupt data in firestore for id %s", comment.ID)
}
updatedComment := toEntry(comment)
updatedComment.CreatedBy = oldComment.CreatedBy
updatedComment.CreatedTS = oldComment.CreatedTS
if _, err := s.client.Set(ctx, doc, updatedComment, maxWriteAttempts, maxOperationTime); err != nil {
return skerr.Wrapf(err, "updating trace comment %s", comment.ID)
}
// Update the cache (this helps avoid flakes on unit tests).
s.cacheMutex.Lock()
defer s.cacheMutex.Unlock()
s.commentCache[comment.ID] = toTraceComment(comment.ID, updatedComment)
return nil
}
// DeleteComment implements the comment.Store interface.
func (s *StoreImpl) DeleteComment(ctx context.Context, id trace.ID) error {
if id == "" {
return skerr.Fmt("ID for trace comment cannot be empty")
}
toDelete := s.client.Collection(traceCommentCollection).Doc(string(id))
if _, err := s.client.Delete(ctx, toDelete, maxWriteAttempts, maxOperationTime); err != nil {
return skerr.Wrapf(err, "deleting trace comment with id %s", id)
}
// Update the cache (this helps avoid flakes on unit tests).
s.cacheMutex.Lock()
defer s.cacheMutex.Unlock()
delete(s.commentCache, id)
return nil
}
// Make sure Store fulfills the comment.Store interface
var _ comment.Store = (*StoreImpl)(nil)