package fs_expectationstore
import (
"context"
"errors"
"fmt"
"math"
"sort"
"strings"
"sync"
"time"
"cloud.google.com/go/firestore"
"go.opencensus.io/trace"
"golang.org/x/sync/errgroup"
ifirestore "go.skia.org/infra/go/firestore"
"go.skia.org/infra/go/metrics2"
"go.skia.org/infra/go/skerr"
"go.skia.org/infra/go/sklog"
"go.skia.org/infra/golden/go/expectations"
"go.skia.org/infra/golden/go/fs_utils"
"go.skia.org/infra/golden/go/types"
)
const (
partitions = "expstore_partitions_v3"
expectationEntries = "entries"
recordEntries = "triage_records"
changeEntries = "triage_changes"
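	// Together these collection names form document paths like
	// expstore_partitions_v3/<partition>/entries/<entry id>, .../triage_records/<record id>, and
	// .../triage_changes/<change id> (see the *Collection helpers at the bottom of this file).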
masterPartition = "master"
digestField = "digest"
needsGCField = "needs_gc"
committedField = "committed"
tsField = "ts"
groupingField = "grouping"
recordIDField = "record_id"
lastUsedField = "last_used"
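	// beginningOfTime and endOfTime cover the full range of commit indices; every triage written
	// by this Store currently spans this whole range because per-range expectations are not yet
	// supported (see the TODO in AddChange).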
beginningOfTime = 0
endOfTime = math.MaxInt32
// These parameters were chosen somewhat arbitrarily to make sure that if Firestore takes a while,
// we try our best (but not too hard) to make reads or writes happen.
maxOperationTime = 2 * time.Minute
maxRetries = 3
	// The number of shards was determined experimentally based on 500k and then 100k expectation
	// entries. 100k is about what we expect on the master partition. A CL partition typically has
	// on the order of 100 entries, so we can get away with fewer shards there.
masterPartitionShards = 16
clPartitionShards = 2
)
// Store implements expectations.Store backed by Firestore. It keeps a local cache of the
// expectations to reduce load on Firestore.
type Store struct {
client *ifirestore.Client
mode AccessMode
	// Sharding our loading of expectations can drastically improve throughput. The number of shards
	// can differ depending on the approximate size of the partition. We generally assume that
	// the number of expectations on the master partition is much larger than on any other
	// partition.
numShards int
// CL expectations are kept apart from those on master by dividing how we store expectations
// into partitions. Partitions look like "master" or "gerrit_123456".
partition string
// notifier allows this Store to communicate with the outside world when
// expectations change.
notifier expectations.ChangeNotifier
// entryCache is an in-memory representation of the expectations in Firestore.
entryCache map[expectations.ID]expectationEntry
entryCacheMutex sync.RWMutex
// returnCache allows us to cache the return value for Get() if there haven't been any changes
// to the expectations since the previous call.
returnCache *expectations.Expectations
	// returnCacheMutex guards returnCache itself (i.e. setting it to nil); the
	// expectations.Expectations value is thread-safe for setting/updating.
returnCacheMutex sync.Mutex
// hasSnapshotsRunning tracks if the snapshots were started due to a call from Initialize().
hasSnapshotsRunning bool
// now allows the mocking of time in tests.
now func() time.Time
}
// AccessMode indicates if this ExpectationsStore can update existing Expectations
// in the backing store or if it can only read them.
type AccessMode int
const (
ReadOnly AccessMode = iota
ReadWrite
)
var (
	// ReadOnlyErr is returned if a mutating action is called on a Store that is not
	// configured for writes.
ReadOnlyErr = errors.New("expectationStore is in read-only mode")
)
// expectationEntry is the document type stored in the expectationsCollection.
type expectationEntry struct {
Grouping types.TestName `firestore:"grouping"`
Digest types.Digest `firestore:"digest"`
Updated time.Time `firestore:"updated"`
LastUsed time.Time `firestore:"last_used"`
// This is sorted by FirstIndex and should have no duplicate ranges for FirstIndex and LastIndex.
// That is, we should not have two ranges that both cover [3, 9].
Ranges []triageRange `firestore:"ranges"`
NeedsGC bool `firestore:"needs_gc"`
}
// ID returns the deterministic ID that lets us update existing entries.
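// For example, grouping "my_test" and digest "abc123" produce the ID "my_test|abc123".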
func (e *expectationEntry) ID() string {
s := string(e.Grouping) + "|" + string(e.Digest)
	// Firestore does not allow "/" in document IDs, so replace it.
return strings.Replace(s, "/", "-", -1)
}
// entryID turns an expectations.ID into the id string Firestore expects for an entry.
func entryID(id expectations.ID) string {
return (&expectationEntry{
Grouping: id.Grouping,
Digest: id.Digest,
}).ID()
}
// expectationChange represents the changing of a single expectation entry.
type expectationChange struct {
// RecordID refers to a document in the records collection.
RecordID string `firestore:"record_id"`
Grouping types.TestName `firestore:"grouping"`
Digest types.Digest `firestore:"digest"`
AffectedRange triageRange `firestore:"affected_range"`
LabelBefore expectations.LabelInt `firestore:"label_before"`
}
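// triageRange represents a label applied to a digest over a range of commit indices. At present,
// every range written by this Store spans [beginningOfTime, endOfTime].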
type triageRange struct {
FirstIndex int `firestore:"first_index"`
LastIndex int `firestore:"last_index"`
Label expectations.LabelInt `firestore:"label"`
}
// triageRecord represents a group of changes made in a single triage action by a user.
type triageRecord struct {
UserName string `firestore:"user"`
TS time.Time `firestore:"ts"`
Changes int `firestore:"changes"`
Committed bool `firestore:"committed"`
}
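// New returns a Store for the master partition backed by the given Firestore client. Long-lived
// instances should typically also call Initialize so the in-RAM cache tracks Firestore.
// A minimal usage sketch (assumes fsClient is an already-constructed *ifirestore.Client and
// notifier is an expectations.ChangeNotifier; error handling elided):
//
//	store := New(fsClient, notifier, ReadWrite)
//	if err := store.Initialize(ctx); err != nil {
//		// handle the error
//	}
//	exp, _ := store.Get(ctx)
//	clStore := store.ForChangelist("123456", "gerrit")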
func New(client *ifirestore.Client, cn expectations.ChangeNotifier, mode AccessMode) *Store {
return &Store{
client: client,
notifier: cn,
partition: masterPartition,
numShards: masterPartitionShards,
mode: mode,
entryCache: map[expectations.ID]expectationEntry{},
now: time.Now,
}
}
// Initialize starts several goroutines that monitor Firestore QuerySnapshots, watching for
// changes to the expectations and keeping the cache fresh. It also loads the initial set of
// expectation entries into the local RAM cache. It should be called on long-lived instances
// which need the in-memory cache to stay synchronized with the underlying Firestore collection.
func (s *Store) Initialize(ctx context.Context) error {
// Make the initial query of all expectations currently in the store, sharded so as to improve
// performance.
queries := fs_utils.ShardOnDigest(s.expectationsCollection(), digestField, s.numShards)
expectationSnapshots := make([]*firestore.QuerySnapshotIterator, s.numShards)
entriesByShard := make([][]expectationEntry, s.numShards)
var eg errgroup.Group
for shard, q := range queries {
func(shard int, q firestore.Query) {
eg.Go(func() error {
snap := q.Snapshots(ctx)
qs, err := snap.Next()
if err != nil {
return skerr.Wrapf(err, "getting initial snapshot data shard[%d]", shard)
}
entriesByShard[shard] = extractExpectationEntries(qs)
expectationSnapshots[shard] = snap
return nil
})
}(shard, q)
}
err := eg.Wait()
if err != nil {
return skerr.Wrap(err)
}
	// Now we fill the RAM cache with all of the fetched expectations.
func() {
s.entryCacheMutex.Lock()
defer s.entryCacheMutex.Unlock()
for _, entries := range entriesByShard {
for _, e := range entries {
				// Due to how we shard our queries, entries from one shard shouldn't overwrite
				// entries from another.
id := expectations.ID{
Grouping: e.Grouping,
Digest: e.Digest,
}
s.entryCache[id] = e
}
}
}()
// Re-using those shards from earlier, we start goroutines to wait for any new expectation
// entries to be seen or changes to existing ones. When those happen, we update the cache.
// TODO(kjlubick) add an alert for this metric if it becomes non-zero.
stoppedShardsMetric := metrics2.GetCounter("stopped_expstore_shards")
stoppedShardsMetric.Reset()
for i := 0; i < s.numShards; i++ {
go func(shard int) {
snapFactory := func() *firestore.QuerySnapshotIterator {
queries := fs_utils.ShardOnDigest(s.expectationsCollection(), digestField, s.numShards)
return queries[shard].Snapshots(ctx)
}
			// Reuse the initial snapshots so we don't have to re-load all the data (of which
			// there could be a lot).
err := fs_utils.ListenAndRecover(ctx, expectationSnapshots[shard], snapFactory, s.updateCacheAndNotify)
if err != nil {
sklog.Errorf("Unrecoverable error: %s", err)
}
stoppedShardsMetric.Inc(1)
}(i)
}
s.hasSnapshotsRunning = true
return nil
}
// updateCacheAndNotify updates the cached expectations with the given snapshot and then sends
// notifications to the notifier about the updates.
func (s *Store) updateCacheAndNotify(_ context.Context, qs *firestore.QuerySnapshot) error {
entries := extractExpectationEntries(qs)
toNotify := func() []expectations.ID {
var toNotify []expectations.ID
// We need to be careful to Unlock entryCacheMutex before trying to Lock returnCacheMutex,
// otherwise we can deadlock with Get() or GetCopy().
s.entryCacheMutex.Lock()
defer s.entryCacheMutex.Unlock()
for _, newEntry := range entries {
id := expectations.ID{
Grouping: newEntry.Grouping,
Digest: newEntry.Digest,
}
existing, ok := s.entryCache[id]
			// We always overwrite the cached version with the incoming entry.
s.entryCache[id] = newEntry
if ok {
// We get notifications when UpdateLastUsed updates timestamps. These don't require an
// event to be published since they do not affect the labels. The best way to check if
// something material changed is to compare the Updated timestamp.
if existing.Updated.Equal(newEntry.Updated) {
continue
}
}
toNotify = append(toNotify, id)
}
return toNotify
}()
if len(toNotify) > 0 {
		// There was at least one material change to the expectations. Purge the cached
		// version that we return from Get.
func() {
s.returnCacheMutex.Lock()
defer s.returnCacheMutex.Unlock()
s.returnCache = nil
}()
}
if s.notifier != nil {
for _, id := range toNotify {
s.notifier.NotifyChange(id)
}
}
return nil
}
// extractExpectationEntries retrieves all expectation entries from a given QuerySnapshot, logging
// any errors (which should be exceedingly rare).
func extractExpectationEntries(qs *firestore.QuerySnapshot) []expectationEntry {
var entries []expectationEntry
for _, dc := range qs.Changes {
if dc.Kind == firestore.DocumentRemoved {
// TODO(kjlubick): It would probably be good to return a slice of expectation.IDs that
// can get removed from the cache.
continue
}
entry := expectationEntry{}
if err := dc.Doc.DataTo(&entry); err != nil {
id := dc.Doc.Ref.ID
sklog.Errorf("corrupt data in firestore, could not unmarshal expectationEntry with id %s", id)
continue
}
entries = append(entries, entry)
}
return entries
}
// ForChangelist implements the ExpectationsStore interface.
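// For example, ForChangelist("123456", "gerrit") returns a Store that reads and writes the
// "gerrit_123456" partition. It returns nil if either argument is empty.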
func (s *Store) ForChangelist(id, crs string) expectations.Store {
if id == "" || crs == "" {
// These must both be specified
return nil
}
return &Store{
client: s.client,
numShards: clPartitionShards,
notifier: nil, // we do not need to notify when Changelist expectations change.
partition: crs + "_" + id,
		mode: s.mode,
		// Initialize the cache so callers that later call Initialize do not write to a nil map.
		entryCache: map[expectations.ID]expectationEntry{},
		now:        s.now,
}
}
// AddChange implements the ExpectationsStore interface.
func (s *Store) AddChange(ctx context.Context, delta []expectations.Delta, userID string) error {
defer metrics2.FuncTimer().Stop()
if s.mode == ReadOnly {
return ReadOnlyErr
}
// Create the entries that we want to write (using the previous values)
now := s.now()
// TODO(kjlubick) If we support ranges, these constants will need to be changed.
entries, changes, err := s.makeEntriesAndChanges(ctx, now, delta, beginningOfTime, endOfTime)
if err != nil {
return skerr.Wrapf(err, "preparing %d entries before storing", len(delta))
}
// Nothing to add
if len(entries) == 0 {
return nil
}
	// Firestore can do up to 500 writes at once; we have 2 writes per entry, plus 1 triageRecord.
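	// For example, with a 500-document limit that works out to (500/2)-1 = 249 entries per
	// batch, leaving room for the triage record added to the batch below.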
const batchSize = (ifirestore.MAX_TRANSACTION_DOCS / 2) - 1
b := s.client.Batch()
// First write the triage record, with Committed being false (i.e. in progress)
tr := s.recordsCollection().NewDoc()
record := triageRecord{
UserName: userID,
TS: now,
Changes: len(entries),
Committed: false,
}
b.Set(tr, record)
s.client.CountWriteQueryAndRows(s.expectationsCollection().Path, len(entries))
s.client.CountWriteQueryAndRows(s.changesCollection().Path, len(entries))
err = s.client.BatchWrite(ctx, len(entries), batchSize, maxOperationTime, b, func(b *firestore.WriteBatch, i int) error {
entry := entries[i]
e := s.expectationsCollection().Doc(entry.ID())
b.Set(e, entry)
tc := s.changesCollection().NewDoc()
change := changes[i]
change.RecordID = tr.ID
b.Set(tc, change)
return nil
})
if err != nil {
// We really hope this doesn't fail, because it could lead to a large batch triage that
// is partially applied.
return skerr.Wrap(err)
}
	// This potentially long write has succeeded, so mark the record as committed.
update := map[string]interface{}{
committedField: true,
}
_, err = s.client.Set(ctx, tr, update, 10, maxOperationTime, firestore.MergeAll)
return err
}
func (s *Store) makeEntriesAndChanges(ctx context.Context, now time.Time, delta []expectations.Delta, firstIdx, lastIdx int) ([]expectationEntry, []expectationChange, error) {
if err := s.updateEntryCacheIfNeeded(ctx); err != nil {
return nil, nil, skerr.Wrap(err)
}
s.entryCacheMutex.RLock()
defer s.entryCacheMutex.RUnlock()
var entries []expectationEntry
var changes []expectationChange
for _, d := range delta {
// Fetch what was there before - mainly to get the previous Ranges
entry := s.entryCache[expectations.ID{
Grouping: d.Grouping,
Digest: d.Digest,
}]
// These do nothing if the entry was already cached.
entry.Grouping = d.Grouping
entry.Digest = d.Digest
// We intentionally do not set LastUsed here. It will be the zero value for time when
// created, and we don't want to update LastUsed simply by triaging it.
// Update Updated and Ranges with the new data.
entry.Updated = now
newRange := triageRange{
FirstIndex: firstIdx,
LastIndex: lastIdx,
Label: expectations.LabelIntFromString(d.Label),
}
previousLabel := expectations.UntriagedInt
replacedRange := false
// TODO(kjlubick): if needed, this could be a binary search, but since there will be < 20
// ranges for almost all entries, it probably doesn't matter.
for i, r := range entry.Ranges {
if r.FirstIndex == firstIdx && r.LastIndex == lastIdx {
replacedRange = true
previousLabel = r.Label
entry.Ranges[i] = newRange
break
}
}
if !replacedRange {
entry.Ranges = append(entry.Ranges, newRange)
sort.Slice(entry.Ranges, func(i, j int) bool {
return entry.Ranges[i].FirstIndex < entry.Ranges[j].FirstIndex
})
}
entries = append(entries, entry)
changes = append(changes, expectationChange{
// RecordID will be filled out later
Grouping: d.Grouping,
Digest: d.Digest,
AffectedRange: newRange,
LabelBefore: previousLabel,
})
}
return entries, changes, nil
}
func (s *Store) updateEntryCacheIfNeeded(ctx context.Context) error {
if s.hasSnapshotsRunning {
return nil
}
// loadExpectations has the side-effect of updating s.entryCache, which is required if snapshots
// are not running (which normally update the cache).
_, err := s.loadExpectations(ctx)
return skerr.Wrap(err)
}
// Get implements the expectations.Store interface. If the RAM cache of expectation entries has been
// created, Get may return a cached value when no changes have happened since the previous call to
// Get. Otherwise, a new return value will be created, from the RAM cache if available, or from
// Firestore if not.
func (s *Store) Get(ctx context.Context) (expectations.ReadOnly, error) {
if s.hasSnapshotsRunning {
_, span := trace.StartSpan(ctx, "fsexpstore_getFromSnapshots")
defer span.End()
// If the snapshots are running, we first check to see if we have a fresh Expectations.
s.returnCacheMutex.Lock()
defer s.returnCacheMutex.Unlock()
if s.returnCache != nil {
return s.returnCache, nil
}
		// At this point, we do not have a fresh expectations.Expectations (something has changed
// since the last time we made it), so we assemble a new one from our in-RAM cache of
// expectation entries (which we assume to be up to date courtesy of our running snapshots).
e := s.assembleExpectations() // RLock entryCacheMutex
s.returnCache = e
return e, nil
}
// If the snapshots are not loaded, we assume we do not have a RAM cache and load the
// expectations from Firestore.
return s.loadExpectations(ctx)
}
// GetCopy implements the expectations.Store interface.
func (s *Store) GetCopy(ctx context.Context) (*expectations.Expectations, error) {
if s.hasSnapshotsRunning {
// If the snapshots are running, we first check to see if we have a fresh Expectations.
s.returnCacheMutex.Lock()
defer s.returnCacheMutex.Unlock()
if s.returnCache != nil {
// The cache is fresh, so return a copy (so clients can mutate it if they need to).
return s.returnCache.DeepCopy(), nil
}
		// At this point, we do not have a fresh expectations.Expectations (something has changed
// since the last time we made it), so we assemble a new one from our in-RAM cache of
// expectation entries (which we assume to be up to date courtesy of our running snapshots).
return s.assembleExpectations(), nil // RLock entryCacheMutex
}
// If the snapshots are not loaded, we assume we do not have a RAM cache and load the
// expectations from Firestore.
return s.loadExpectations(ctx)
}
// loadExpectations fetches the expectations from Firestore and returns them. It shards the query
// to expedite the process. The fetched expectationEntry will be cached in the s.entryCache, under
// the assumption that loadExpectations will only be called for setups that do not have the
// snapshot queries, and the entryCache is used to create the expectationChanges (for undoing).
func (s *Store) loadExpectations(ctx context.Context) (*expectations.Expectations, error) {
ctx, span := trace.StartSpan(ctx, "fsexpstore_loadExpectations")
defer span.End()
es := make([][]expectationEntry, s.numShards)
queries := fs_utils.ShardOnDigest(s.expectationsCollection(), digestField, s.numShards)
err := s.client.IterDocsInParallel(ctx, "loadExpectations", s.partition, queries, maxRetries, maxOperationTime, func(i int, doc *firestore.DocumentSnapshot) error {
if doc == nil {
return nil
}
entry := expectationEntry{}
if err := doc.DataTo(&entry); err != nil {
id := doc.Ref.ID
return skerr.Wrapf(err, "corrupt data in firestore, could not unmarshal expectationEntry with id %s", id)
}
if len(entry.Ranges) == 0 {
// This should never happen, but we'll ignore these malformed entries if they do.
return nil
}
es[i] = append(es[i], entry)
return nil
})
if err != nil {
return nil, skerr.Wrapf(err, "fetching expectations for partition %s", s.partition)
}
e := expectations.Expectations{}
toCache := map[expectations.ID]expectationEntry{}
for _, entries := range es {
for _, entry := range entries {
// TODO(kjlubick) If we decide to handle ranges of expectations, Get will need to take a
// parameter indicating the commit index for which we should return valid ranges.
e.Set(entry.Grouping, entry.Digest, entry.Ranges[0].Label.String())
toCache[expectations.ID{
Grouping: entry.Grouping,
Digest: entry.Digest,
}] = entry
}
}
s.entryCacheMutex.Lock()
defer s.entryCacheMutex.Unlock()
s.entryCache = toCache
return &e, nil
}
// assembleExpectations creates an Expectations from the entryCache. It will copy any data it needs,
// so the return value can be mutated freely.
func (s *Store) assembleExpectations() *expectations.Expectations {
s.entryCacheMutex.RLock()
defer s.entryCacheMutex.RUnlock()
e := &expectations.Expectations{}
for id, entry := range s.entryCache {
if len(entry.Ranges) == 0 {
sklog.Warningf("ignoring invalid entry for id %s", id)
continue
}
// TODO(kjlubick) If we decide to handle ranges of expectations, Get will need to take a
// parameter indicating the commit index for which we should return valid ranges.
e.Set(entry.Grouping, entry.Digest, entry.Ranges[0].Label.String())
}
return e
}
// QueryLog implements the expectations.Store interface
func (s *Store) QueryLog(ctx context.Context, offset, size int, details bool) ([]expectations.TriageLogEntry, int, error) {
if offset < 0 || size <= 0 {
return nil, -1, skerr.Fmt("offset: %d and size: %d must be positive", offset, size)
}
defer metrics2.FuncTimer().Stop()
// Fetch the records, which have everything except the details.
q := s.recordsCollection().OrderBy(tsField, firestore.Desc).Offset(offset).Limit(size)
q = q.Where(committedField, "==", true)
var rv []expectations.TriageLogEntry
d := fmt.Sprintf("offset: %d, size %d", offset, size)
err := s.client.IterDocs(ctx, "query_log", d, q, maxRetries, maxOperationTime, func(doc *firestore.DocumentSnapshot) error {
if doc == nil {
return nil
}
tr := triageRecord{}
if err := doc.DataTo(&tr); err != nil {
id := doc.Ref.ID
return skerr.Wrapf(err, "corrupt data in firestore, could not unmarshal triageRecord with id %s", id)
}
rv = append(rv, expectations.TriageLogEntry{
ID: doc.Ref.ID,
User: tr.UserName,
TS: tr.TS,
ChangeCount: tr.Changes,
})
return nil
})
if err != nil {
return nil, -1, skerr.Wrapf(err, "could not request triage records [%d: %d]", offset, size)
}
	// n is (our best estimate of) the total number of records.
n := len(rv)
if n == size && n != 0 {
// We don't know how many there are and it might be too slow to count, so just give
// the "many" response.
n = expectations.CountMany
} else {
// We know exactly either 1) how many there are (if n > 0) or 2) an upper bound on how many
// there are (if n == 0)
n += offset
}
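	// For example, with offset 20 and size 10: a full page of 10 results reports
	// expectations.CountMany, while a page of 3 results reports a total of 23.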
if len(rv) == 0 || !details {
return rv, n, nil
}
// Make a query for each of the records to fetch the changes belonging to that record.
qs := make([]firestore.Query, 0, len(rv))
for _, r := range rv {
q := s.changesCollection().Where(recordIDField, "==", r.ID)
		// These records are shown to a human. To prevent UI slowness or other problems when a
		// record has a very large number of changes (e.g. migrations), we limit what we display
		// to 1000. If the record gets undone, all of the changes will still be applied, since
		// undo does its own query.
q = q.Limit(1000)
qs = append(qs, q)
}
// Then fire them all off in parallel.
err = s.client.IterDocsInParallel(ctx, "query_log_details", d, qs, maxRetries, maxOperationTime, func(i int, doc *firestore.DocumentSnapshot) error {
if doc == nil {
return nil
}
tc := expectationChange{}
if err := doc.DataTo(&tc); err != nil {
id := doc.Ref.ID
return skerr.Wrapf(err, "corrupt data in firestore, could not unmarshal triageChanges with id %s", id)
}
rv[i].Details = append(rv[i].Details, expectations.Delta{
Grouping: tc.Grouping,
Digest: tc.Digest,
// TODO(kjlubick) If we expose ranges, we should include FirstIndex/LastIndex here.
Label: tc.AffectedRange.Label.String(),
})
return nil
})
if err != nil {
return nil, -1, skerr.Wrapf(err, "could not query details")
}
for _, r := range rv {
sort.Slice(r.Details, func(i, j int) bool {
// Sort them by grouping, then Digest for determinism
a, b := r.Details[i], r.Details[j]
if a.Grouping == b.Grouping {
return a.Digest < b.Digest
}
return a.Grouping < b.Grouping
})
}
return rv, n, nil
}
// UndoChange implements the expectations.Store interface.
func (s *Store) UndoChange(ctx context.Context, changeID, userID string) error {
if s.mode == ReadOnly {
return ReadOnlyErr
}
defer metrics2.FuncTimer().Stop()
// Verify the original change id exists.
dr := s.recordsCollection().Doc(changeID)
doc, err := s.client.Get(ctx, dr, 3, maxOperationTime)
if err != nil || !doc.Exists() {
return skerr.Wrapf(err, "could not find change to undo with id %s", changeID)
}
q := s.changesCollection().Where(recordIDField, "==", changeID)
var delta []expectations.Delta
err = s.client.IterDocs(ctx, "undo_query", changeID, q, 3, maxOperationTime, func(doc *firestore.DocumentSnapshot) error {
if doc == nil {
return nil
}
tc := expectationChange{}
if err := doc.DataTo(&tc); err != nil {
id := doc.Ref.ID
return skerr.Wrapf(err, "corrupt data in firestore, could not unmarshal triageChanges with id %s", id)
}
delta = append(delta, expectations.Delta{
Grouping: tc.Grouping,
Digest: tc.Digest,
// TODO(kjlubick): if we support ranges, we will want to add them here.
Label: tc.LabelBefore.String(),
})
return nil
})
if err != nil {
return skerr.Wrapf(err, "could not get delta to undo %s", changeID)
}
if err = s.AddChange(ctx, delta, userID); err != nil {
return skerr.Wrapf(err, "could not apply delta to undo %s", changeID)
}
return nil
}
// GetTriageHistory implements the expectations.Store interface.
func (s *Store) GetTriageHistory(ctx context.Context, grouping types.TestName, digest types.Digest) ([]expectations.TriageHistory, error) {
defer metrics2.FuncTimer().Stop()
q := s.changesCollection().Where(groupingField, "==", grouping).Where(digestField, "==", digest)
entryID := fmt.Sprintf("%s-%s", grouping, digest)
var recordsToFetch []*firestore.DocumentRef
err := s.client.IterDocs(ctx, "triage_history", entryID, q, 3, maxOperationTime, func(doc *firestore.DocumentSnapshot) error {
if doc == nil {
return nil
}
tc := expectationChange{}
if err := doc.DataTo(&tc); err != nil {
id := doc.Ref.ID
return skerr.Wrapf(err, "corrupt data in firestore, could not unmarshal triage change with id %s", id)
}
recordsToFetch = append(recordsToFetch, s.recordsCollection().Doc(tc.RecordID))
return nil
})
if err != nil {
return nil, skerr.Wrapf(err, "getting history for %s", entryID)
}
if len(recordsToFetch) == 0 {
return nil, nil
}
s.client.CountReadQueryAndRows(s.recordsCollection().Path, len(recordsToFetch))
records, err := s.client.GetAll(ctx, recordsToFetch)
if err != nil {
return nil, skerr.Wrapf(err, "fetching %d records belonging to %s", len(recordsToFetch), entryID)
}
var rv []expectations.TriageHistory
for _, doc := range records {
if doc == nil {
continue
}
tr := triageRecord{}
if err := doc.DataTo(&tr); err != nil {
id := doc.Ref.ID
return nil, skerr.Wrapf(err, "corrupt data in firestore, could not unmarshal triage record with id %s", id)
}
rv = append(rv, expectations.TriageHistory{
User: tr.UserName,
TS: tr.TS,
})
}
sort.Slice(rv, func(i, j int) bool {
return rv[i].TS.After(rv[j].TS)
})
return rv, nil
}
// UpdateLastUsed implements the expectations.GarbageCollector interface.
func (s *Store) UpdateLastUsed(ctx context.Context, ids []expectations.ID, now time.Time) error {
if s.partition != masterPartition {
return skerr.Fmt("Cannot call UpdateLastUsed except on the master partition")
}
if len(ids) == 0 {
return nil
}
s.client.CountWriteQueryAndRows(s.expectationsCollection().Path, len(ids))
const batchSize = ifirestore.MAX_TRANSACTION_DOCS
err := s.client.BatchWrite(ctx, len(ids), batchSize, maxOperationTime, nil, func(b *firestore.WriteBatch, i int) error {
id := ids[i]
e := s.expectationsCollection().Doc(entryID(id))
b.Update(e, []firestore.Update{{Path: lastUsedField, Value: now}})
return nil
})
if err != nil {
// If this fails, it's not a huge concern unless failures happen multiple days in a row.
return skerr.Wrapf(err, "batch updating firestore")
}
return nil
}
// MarkUnusedEntriesForGC implements the expectations.GarbageCollector interface.
func (s *Store) MarkUnusedEntriesForGC(ctx context.Context, label expectations.LabelInt, ts time.Time) (int, error) {
if s.partition != masterPartition {
return 0, skerr.Fmt("Cannot call UpdateLastUsed except on the master partition")
}
q := s.expectationsCollection().Where(lastUsedField, "<", ts)
var toGC []*firestore.DocumentRef
// Use IterDocs instead of q.Documents(ctx).GetAll because this might be a very large query
// and we want to use the retry/restart logic of IterDocs to get them all.
err := s.client.IterDocs(ctx, "mark_expectations_for_GC", string(label.String()), q, 3, 10*time.Minute, func(doc *firestore.DocumentSnapshot) error {
if doc == nil {
return nil
}
er := expectationEntry{}
if err := doc.DataTo(&er); err != nil {
id := doc.Ref.ID
return skerr.Wrapf(err, "corrupt data in firestore, could not unmarshal expectation entry with id %s", id)
}
// We can't have multiple inequality filters for multiple properties on a single query, so
// we have to apply these conditions after the fact.
if er.Updated.After(ts) || er.NeedsGC {
return nil
}
// TODO(kjlubick) if we implement ranges, the API will need to include a commit index for which
// the provided label applies. This should typically be for the latest commit.
if len(er.Ranges) != 1 {
sklog.Debugf("Found expectationEntry with unexpected amount of ranges: %s-%s had %d", er.Grouping, er.Digest, len(er.Ranges))
return nil
}
latestRange := er.Ranges[0]
if latestRange.Label != label {
return nil
}
toGC = append(toGC, doc.Ref)
return nil
})
if err != nil {
return 0, skerr.Wrapf(err, "fetching expectations to mark for GC")
}
// TODO(kjlubick) make this use BatchWrite
for _, doc := range toGC {
update := map[string]interface{}{
needsGCField: true,
}
_, err := s.client.Set(ctx, doc, update, maxRetries, maxOperationTime, firestore.MergeAll)
if err != nil {
return 0, skerr.Wrapf(err, "marking entry %s for GC", doc.ID)
}
}
return len(toGC), nil
}
// GarbageCollect implements the expectations.GarbageCollector interface.
func (s *Store) GarbageCollect(ctx context.Context) (int, error) {
q := s.expectationsCollection().Where(needsGCField, "==", true)
var toDelete []*firestore.DocumentRef
// Use IterDocs instead of q.Documents(ctx).GetAll because this might be a very large query
// and we want to use the retry/restart logic of IterDocs to get them all.
err := s.client.IterDocs(ctx, "gc_expectations", "", q, 3, 10*time.Minute, func(doc *firestore.DocumentSnapshot) error {
if doc == nil || doc.Ref == nil {
return nil
}
toDelete = append(toDelete, doc.Ref)
return nil
})
if err != nil {
return 0, skerr.Wrapf(err, "fetching expectations to gc")
}
s.client.CountWriteQueryAndRows(s.expectationsCollection().Path, len(toDelete))
const batchSize = ifirestore.MAX_TRANSACTION_DOCS
err = s.client.BatchWrite(ctx, len(toDelete), batchSize, maxOperationTime, nil, func(b *firestore.WriteBatch, i int) error {
doc := toDelete[i]
b.Delete(doc)
return nil
})
if err != nil {
return 0, skerr.Wrapf(err, "garbage collecting expectations from firestore")
}
return len(toDelete), nil
}
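// A sketch of a typical garbage-collection cycle on the master partition (the ids, label,
// cutoff, and cadence are chosen by the caller):
//
//	_ = store.UpdateLastUsed(ctx, idsStillInUse, now)
//	marked, _ := store.MarkUnusedEntriesForGC(ctx, label, now.Add(-30*24*time.Hour))
//	deleted, _ := store.GarbageCollect(ctx)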
func (s *Store) expectationsCollection() *firestore.CollectionRef {
return s.client.Collection(partitions).Doc(s.partition).Collection(expectationEntries)
}
func (s *Store) recordsCollection() *firestore.CollectionRef {
return s.client.Collection(partitions).Doc(s.partition).Collection(recordEntries)
}
func (s *Store) changesCollection() *firestore.CollectionRef {
return s.client.Collection(partitions).Doc(s.partition).Collection(changeEntries)
}
// Make sure Store fulfills the expectations.Store interface
var _ expectations.Store = (*Store)(nil)
// Make sure Store fulfills the expectations.GarbageCollector interface
var _ expectations.GarbageCollector = (*Store)(nil)