blob: 904d3eb17722f7ba2b924764211e09cf1461b9a9 [file] [log] [blame]
// Package fs_expstore implements an ExpectationsStore based on Firestore. See FIRESTORE.md
// for the schema and design rationale.
package fs_expstore
import (
"context"
"errors"
"sync"
"time"
"cloud.google.com/go/firestore"
"github.com/cenkalti/backoff"
ifirestore "go.skia.org/infra/go/firestore"
"go.skia.org/infra/go/skerr"
"go.skia.org/infra/golden/go/types"
)
// AccessMode indicates if this ExpectationsStore can update existing Expectations
// in the backing store or if it can only read them.
type AccessMode int
const (
// ReadOnly is the zero value of AccessMode. AddChange returns ReadOnlyErr for a
// Store in this mode.
ReadOnly AccessMode = iota
// ReadWrite is the mode in which AddChange is allowed to write.
ReadWrite
)
const (
// MasterBranch is the issue value for a Store that is not tied to a Gerrit or
// GitHub issue (see the issue parameter of New).
MasterBranch = int64(0)
)
var (
// ReadOnlyErr is returned by AddChange when the Store is in ReadOnly mode.
ReadOnlyErr = errors.New("expectationStore is in read-only mode")
)
const (
// ExpectationStoreCollection should be used to create the firestore.NewClient that is
// passed into New.
ExpectationStoreCollection = "expstore"
// These are the collections in Firestore.
// expectationsCollection holds expectationEntry documents.
expectationsCollection = "expectations"
// triageRecordsCollection holds triageRecord documents.
triageRecordsCollection = "triage_records"
// triageChangesCollection holds triageChanges documents.
triageChangesCollection = "triage_changes"
// Columns in the Collections we query by.
// issueCol matches the firestore tag of expectationEntry.Issue.
issueCol = "issue"
// maxOperationTime is passed to the Firestore helpers IterDocs and Set, and it is the
// MaxElapsedTime of the backoff used when committing batches in AddChange.
maxOperationTime = 2 * time.Minute
)
// Store implements expstorage.ExpectationsStore backed by
// Firestore. It has a write-through caching mechanism.
type Store struct {
// client is the Firestore client used for all reads and writes.
client *ifirestore.Client
// mode controls whether AddChange may write; see AccessMode.
mode AccessMode
issue int64 // Gerrit or GitHub issue, or MasterBranch
// cacheMutex protects the write-through cache object.
cacheMutex sync.RWMutex
// cache is nil until it is first populated, either by Get (from Firestore) or by
// AddChange (from the change being added).
cache types.Expectations
}
// expectationEntry is the document type stored in the expectationsCollection.
type expectationEntry struct {
// Grouping and Digest together form the document ID (see ID).
Grouping types.TestName `firestore:"grouping"`
Digest types.Digest `firestore:"digest"`
// Label is the label assigned to Digest within Grouping.
Label types.Label `firestore:"label"`
// Updated is the time captured by AddChange when this entry was created.
Updated time.Time `firestore:"updated"`
// Issue is the issue of the Store that wrote this entry. It is the field that
// issueCol refers to in queries.
Issue int64 `firestore:"issue"`
}
// ID returns the deterministic document ID of this entry, "<grouping>|<digest>".
// Because the ID depends only on those two fields, writing an entry for the same
// grouping and digest updates the existing document.
func (e *expectationEntry) ID() string {
	grouping, digest := string(e.Grouping), string(e.Digest)
	return grouping + "|" + digest
}
// triageRecord is the document type stored in the triageRecordsCollection.
// AddChange writes one record for every non-empty change.
type triageRecord struct {
// UserName is the userId passed to AddChange.
UserName string `firestore:"user"`
// TS is the time captured by AddChange when it started processing the change.
TS time.Time `firestore:"ts"`
// Issue is the issue of the Store that wrote the record.
Issue int64 `firestore:"issue"`
// Changes is the number of expectation entries belonging to this record.
Changes int `firestore:"changes"`
// Committed is written as false together with the first batch and is set to true
// once all batches have been committed.
Committed bool `firestore:"committed"`
}
// triageChanges is the document type stored in the triageChangesCollection. One
// document is created for every (grouping, digest) pair of a change passed to AddChange.
type triageChanges struct {
// RecordID is the document ID of the triageRecord this change belongs to.
RecordID string `firestore:"record_id"`
Grouping types.TestName `firestore:"grouping"`
Digest types.Digest `firestore:"digest"`
// LabelBefore is the label found in the Store's cache before the change was applied.
LabelBefore types.Label `firestore:"before"`
// LabelAfter is the label set by the change.
LabelAfter types.Label `firestore:"after"`
}
// New creates a Store that talks to Firestore through the given client. issue selects
// whose baselines the Store reads/writes: pass the id of a CL, or MasterBranch for the
// master branch. mode decides whether AddChange is permitted to write. The cache starts
// out empty (nil).
func New(client *ifirestore.Client, issue int64, mode AccessMode) *Store {
	s := new(Store)
	s.client = client
	s.issue = issue
	s.mode = mode
	return s
}
// Get implements the ExpectationsStore interface.
//
// It returns a deep copy of the cached Expectations, so callers may modify the result
// freely. If the cache has not been populated yet, it is loaded from Firestore first;
// an error is returned (and the cache stays unset) if that load fails.
func (f *Store) Get() (types.Expectations, error) {
	// Fast path: the cache is already populated, so a shared lock is sufficient.
	f.cacheMutex.RLock()
	if f.cache != nil {
		defer f.cacheMutex.RUnlock()
		return f.cache.DeepCopy(), nil
	}
	f.cacheMutex.RUnlock()

	// Slow path: populating the cache is a write to f.cache, so it must happen under
	// the exclusive lock (writing while only holding RLock is a data race).
	f.cacheMutex.Lock()
	defer f.cacheMutex.Unlock()
	// Re-check, because another goroutine may have filled the cache between the
	// RUnlock above and acquiring the exclusive lock.
	if f.cache == nil {
		c, err := f.loadExpectations()
		if err != nil {
			return nil, skerr.Fmt("could not load expectations from firestore: %s", err)
		}
		f.cache = c
	}
	return f.cache.DeepCopy(), nil
}
// loadExpectations reads all entries from the expectationsCollection whose issue matches
// the issue this Store was configured with (MasterBranch or a CL) and assembles them
// into an Expectations map.
//
// It returns a nil map together with the error if the query fails or a document
// cannot be decoded into an expectationEntry.
func (f *Store) loadExpectations() (types.Expectations, error) {
	const maxRetries = 3

	e := types.Expectations{}
	// Filter by the configured issue rather than a hard-coded MasterBranch; entries
	// are written with f.issue (see flatten), so this is what makes a Store created
	// for a CL read back its own entries.
	q := f.client.Collection(expectationsCollection).Where(issueCol, "==", f.issue)
	err := f.client.IterDocs("loadExpectations", "", q, maxRetries, maxOperationTime, func(doc *firestore.DocumentSnapshot) error {
		if doc == nil {
			return nil
		}
		entry := expectationEntry{}
		if err := doc.DataTo(&entry); err != nil {
			id := doc.Ref.ID
			return skerr.Fmt("corrupt data in firestore, could not unmarshal entry with id %s: %s", id, err)
		}
		e.AddDigest(entry.Grouping, entry.Digest, entry.Label)
		return nil
	})
	if err != nil {
		// Do not hand back a partially filled map alongside an error.
		return nil, err
	}
	return e, nil
}
// AddChange implements the ExpectationsStore interface.
//
// It merges newExp into the local cache and then persists it to Firestore: one
// triageRecord for the whole change, plus one expectationEntry and one triageChanges
// document per (grouping, digest) pair. The documents are written in batches that
// respect Firestore's limit of 500 writes per batch; each batch is committed with
// exponential backoff. Once every batch has been committed, the triageRecord is
// marked as committed.
//
// It returns ReadOnlyErr if the Store is in ReadOnly mode, nil if newExp contains no
// entries, and otherwise the error (if any) of the Firestore writes. If a batch fails
// after earlier batches succeeded, the data in Firestore is left partially written and
// the triageRecord stays uncommitted.
func (f *Store) AddChange(ctx context.Context, newExp types.Expectations, userId string) error {
	if f.mode == ReadOnly {
		return ReadOnlyErr
	}

	// Create the entries that we want to write (using the previous values).
	// NOTE(review): if the cache has not been loaded yet (f.cache == nil), the
	// "before" labels are computed against an empty cache and the cache is seeded
	// with newExp only, so a later Get will not load from Firestore. Confirm that
	// callers always call Get before AddChange.
	now, entries, changes := func() (time.Time, []expectationEntry, []triageChanges) {
		f.cacheMutex.Lock()
		defer f.cacheMutex.Unlock()
		now := time.Now()
		entries, changes := f.flatten(now, newExp)

		// Write the changes to the local cache. We do this first so we can release
		// the cache lock as soon as possible, i.e. before the slow Firestore writes.
		if f.cache == nil {
			f.cache = newExp.DeepCopy()
		} else {
			f.cache.MergeExpectations(newExp)
		}
		return now, entries, changes
	}()

	// Nothing to add
	if len(entries) == 0 {
		return nil
	}

	// firestore can do up to 500 writes at once, we have 2 writes per entry, plus 1 triageRecord
	batchSize := (500 / 2) - 1

	b := f.client.Batch()

	// First write the triage record, with Committed being false (i.e. in progress)
	tr := f.client.Collection(triageRecordsCollection).NewDoc()
	record := triageRecord{
		UserName:  userId,
		TS:        now,
		Issue:     f.issue,
		Changes:   len(entries),
		Committed: false,
	}
	b.Set(tr, record)

	// In batches, add ExpectationEntry and TriageChange Documents
	for i := 0; i < len(entries); i += batchSize {
		stop := i + batchSize
		if stop > len(entries) {
			stop = len(entries)
		}

		// entries and changes are parallel slices, so both must be addressed with
		// the absolute index j (not an index relative to the current batch).
		for j := i; j < stop; j++ {
			entry := entries[j]
			e := f.client.Collection(expectationsCollection).Doc(entry.ID())
			b.Set(e, entry)

			tc := f.client.Collection(triageChangesCollection).NewDoc()
			change := changes[j]
			change.RecordID = tr.ID
			b.Set(tc, change)
		}

		exp := &backoff.ExponentialBackOff{
			InitialInterval:     time.Second,
			RandomizationFactor: 0.5,
			Multiplier:          2,
			MaxInterval:         maxOperationTime / 4,
			MaxElapsedTime:      maxOperationTime,
			Clock:               backoff.SystemClock,
		}

		o := func() error {
			_, err := b.Commit(ctx)
			return err
		}

		// Tie the retries to ctx so that a cancelled context stops the retry loop
		// instead of retrying until MaxElapsedTime has passed.
		if err := backoff.Retry(o, backoff.WithContext(exp, ctx)); err != nil {
			// We really hope this doesn't happen, as it may leave the data in a partially
			// broken state.
			return skerr.Fmt("problem writing entries with retry [%d, %d]: %s", i, stop, err)
		}

		// Go on to the next batch, if needed.
		if stop < len(entries) {
			b = f.client.Batch()
		}
	}

	// We have succeeded this potentially long write, so mark it completed.
	update := map[string]interface{}{
		"committed": true,
	}
	_, err := f.client.Set(tr, update, 10, maxOperationTime, firestore.MergeAll)
	return err
}
// flatten turns an Expectations delta into the two parallel lists of documents that
// have to be written for it: element k of the first result is the expectationEntry and
// element k of the second result is the triageChanges for the same (grouping, digest).
//
// The caller must hold cacheMutex (or otherwise guarantee f.cache is safe to read),
// since the previous label of every digest is looked up in f.cache. now is used as
// the Updated timestamp of every entry. Both results are nil if newExp is empty.
func (f *Store) flatten(now time.Time, newExp types.Expectations) ([]expectationEntry, []triageChanges) {
	var (
		docs   []expectationEntry
		deltas []triageChanges
	)
	for grouping, labels := range newExp {
		for d, after := range labels {
			doc := expectationEntry{
				Grouping: grouping,
				Digest:   d,
				Label:    after,
				Updated:  now,
				Issue:    f.issue,
			}
			docs = append(docs, doc)

			// RecordID is intentionally left empty; the caller fills it in once the
			// ID of the triage record is known.
			delta := triageChanges{
				Grouping:    grouping,
				Digest:      d,
				LabelBefore: f.cache.Classification(grouping, d),
				LabelAfter:  after,
			}
			deltas = append(deltas, delta)
		}
	}
	return docs, deltas
}