// Package fs_expstore implements an ExpectationsStore based on Firestore. See FIRESTORE.md
// for the schema and design rationale.
package fs_expstore
import (
"context"
"errors"
"fmt"
"strconv"
"strings"
"sync"
"time"
"cloud.google.com/go/firestore"
"github.com/cenkalti/backoff"
"go.skia.org/infra/go/eventbus"
ifirestore "go.skia.org/infra/go/firestore"
"go.skia.org/infra/go/metrics2"
"go.skia.org/infra/go/skerr"
"go.skia.org/infra/go/sklog"
"go.skia.org/infra/golden/go/expstorage"
"go.skia.org/infra/golden/go/fs_utils"
"go.skia.org/infra/golden/go/shared"
"go.skia.org/infra/golden/go/types"
"golang.org/x/sync/errgroup"
)
// AccessMode indicates if this ExpectationsStore can update existing Expectations
// in the backing store or if it can only read them.
type AccessMode int
const (
// ReadOnly means Expectations may be fetched but not modified.
ReadOnly AccessMode = iota
// ReadWrite allows Expectations to be both read and updated.
ReadWrite
)
var (
// ReadOnlyErr is returned when an attempt is made to modify Expectations while the Store is in ReadOnly mode.
ReadOnlyErr = errors.New("expectationStore is in read-only mode")
)
const (
// These are the collections in Firestore.
expectationsCollection = "expstore_expectations"
triageRecordsCollection = "expstore_triage_records"
triageChangesCollection = "expstore_triage_changes"
// Fields in the Collections we query by.
committedField = "committed"
digestField = "digest"
groupingField = "grouping"
issueField = "issue"
recordIDField = "record_id"
tsField = "ts"
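// maxOperationTime bounds how long any single Firestore operation (including its retries) may take.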
maxOperationTime = 2 * time.Minute
// masterShards was determined empirically on a data set of about 550k expectationEntry documents.
// 1 shard -> ???
// 10 shards -> 215s
// 100 shards -> 21s
// 256 shards -> 10s
// 512 shards -> 9s
// 4096 shards -> 9s
masterShards = 512
// There will not be very many entries on issues, relative to the MasterBranch, so
// we can get away with many fewer shards to avoid the overhead of so many
// simultaneous queries.
issueShards = 4
// snapshotShards was determined empirically on a data set of about 550k expectationEntry documents.
// The more shards here, the more overhead and contention with the masterShards,
// so we aim for the sweet spot, erring on the side of too few shards.
// Times are for the New() function (i.e. initial fetch)
// 1 shard -> ???
// 8 shards -> 49s
// 16 shards -> 25s
// 32 shards -> 17s
// 64 shards -> 15s
// 96 shards -> ???
// 128 shards -> ???
// 512 shards -> ???
snapshotShards = 32
)
// Store implements expstorage.ExpectationsStore backed by
// Firestore. It has a write-through caching mechanism.
type Store struct {
client *ifirestore.Client
mode AccessMode
issue int64 // Gerrit or GitHub issue, or MasterBranch
// eventBus allows this Store to communicate with the outside world when
// expectations change.
eventBus eventbus.EventBus
// globalEvent keeps track of whether we want to send events within this instance
// or on the global eventbus.
globalEvent bool
// eventExpChange keeps track of which event to fire when the expectations change.
// This will be for either the MasterExpectations or for an IssueExpectations.
eventExpChange string
// cacheMutex protects the write-through cache object.
cacheMutex sync.RWMutex
cache types.Expectations
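// masterQuerySnapshots is only populated in ReadOnly mode; it holds one QuerySnapshotIterator
// per snapshot shard, which listenToQuerySnapshots consumes to keep the cache fresh.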
masterQuerySnapshots []*firestore.QuerySnapshotIterator
}
// expectationEntry is the document type stored in the expectationsCollection.
type expectationEntry struct {
Grouping types.TestName `firestore:"grouping"`
Digest types.Digest `firestore:"digest"`
Label types.Label `firestore:"label"`
Updated time.Time `firestore:"updated"`
Issue int64 `firestore:"issue"`
}
// ID returns the deterministic ID that lets us update existing entries.
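// For example, a Grouping of "some_test" and a Digest of "abcd1234" produce the ID "some_test|abcd1234".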
func (e *expectationEntry) ID() string {
s := string(e.Grouping) + "|" + string(e.Digest)
// Firestore gets cranky if there are '/' characters in document IDs, so replace them.
return strings.Replace(s, "/", "-", -1)
}
// triageRecord is the document type stored in the triageRecordsCollection.
type triageRecord struct {
UserName string `firestore:"user"`
TS time.Time `firestore:"ts"`
Issue int64 `firestore:"issue"`
Changes int `firestore:"changes"`
Committed bool `firestore:"committed"`
}
// triageChanges is the document type stored in the triageChangesCollection.
type triageChanges struct {
RecordID string `firestore:"record_id"`
Grouping types.TestName `firestore:"grouping"`
Digest types.Digest `firestore:"digest"`
LabelBefore types.Label `firestore:"before"`
LabelAfter types.Label `firestore:"after"`
}
// New returns a new Store using the given firestore client. The Store will track
// MasterBranch; see ForIssue() for getting Stores that track ChangeLists.
// The passed in context is used for the QuerySnapshots (in ReadOnly mode).
func New(ctx context.Context, client *ifirestore.Client, eventBus eventbus.EventBus, mode AccessMode) (*Store, error) {
defer metrics2.FuncTimer().Stop()
defer shared.NewMetricsTimer("expstore_init").Stop()
f := &Store{
client: client,
eventBus: eventBus,
eventExpChange: expstorage.EV_EXPSTORAGE_CHANGED,
globalEvent: true,
issue: types.MasterBranch,
mode: mode,
}
if mode == ReadOnly {
// Start the query snapshots (their initial results are discarded; see initQuerySnapshot).
err := f.initQuerySnapshot(ctx)
if err != nil {
return nil, skerr.Wrapf(err, "could not get initial query snapshot")
}
}
// pre-load the cache. This simplifies the mutex handling in Get().
_, err := f.getMasterExpectations()
if err != nil {
return nil, skerr.Wrapf(err, "could not perform initial get")
}
if mode == ReadOnly {
// Start several goroutines to listen to the snapshots created earlier.
// If there were any new entries added after the snapshots were created, but before
// we did a full fetch, we'll see them on the first pass through.
f.listenToQuerySnapshots()
}
return f, nil
}
// ForIssue implements the ExpectationsStore interface.
func (f *Store) ForIssue(id int64) expstorage.ExpectationsStore {
if types.IsMasterBranch(id) {
// It is invalid to re-request the master branch
return nil
}
return &Store{
client: f.client,
eventBus: f.eventBus,
eventExpChange: expstorage.EV_TRYJOB_EXP_CHANGED,
globalEvent: false,
issue: id,
mode: f.mode,
}
}
// Get implements the ExpectationsStore interface.
func (f *Store) Get() (types.Expectations, error) {
if f.issue == types.MasterBranch {
defer metrics2.NewTimer("gold_get_expectations", map[string]string{"master_branch": "true"}).Stop()
f.cacheMutex.RLock()
defer f.cacheMutex.RUnlock()
return f.getMasterExpectations()
}
defer metrics2.NewTimer("gold_get_expectations", map[string]string{"master_branch": "false"}).Stop()
return f.getIssueExpectations()
}
// getMasterExpectations returns an Expectations object which is safe to mutate
// based on the current state. The caller is expected to hold cacheMutex as needed.
func (f *Store) getMasterExpectations() (types.Expectations, error) {
if f.cache == nil {
c, err := f.loadExpectationsSharded(types.MasterBranch, masterShards)
if err != nil {
return nil, skerr.Fmt("could not load master expectations from firestore: %s", err)
}
f.cache = c
}
return f.cache.DeepCopy(), nil
}
// initQuerySnapshot creates many firestore.QuerySnapshotIterator objects based on a shard of
// all expectations and does the first Next() on them (which will try to return all data
// in those shards). We ignore that data, as a future call will update the cached expectations.
// It stores these iterators in f.masterQuerySnapshots. Without sharding, the first Next()
// times out when there are many expectations, because it returns all documents currently
// matching the query.
// TODO(kjlubick): if this works well for the ReadOnly case, could we maybe do something similar
// for the read-write case? I don't think we'd ever have two writers, but it would open us up
// to that.
func (f *Store) initQuerySnapshot(ctx context.Context) error {
q := f.client.Collection(expectationsCollection).Where(issueField, "==", types.MasterBranch)
queries := fs_utils.ShardQueryOnDigest(q, digestField, snapshotShards)
f.masterQuerySnapshots = make([]*firestore.QuerySnapshotIterator, snapshotShards)
var eg errgroup.Group
for shard, q := range queries {
func(shard int, q firestore.Query) {
eg.Go(func() error {
snap := q.Snapshots(ctx)
_, err := snap.Next()
if err != nil {
return skerr.Wrapf(err, "getting initial snapshot data")
}
f.masterQuerySnapshots[shard] = snap
return nil
})
}(shard, q)
}
return eg.Wait()
}
// listenToQuerySnapshots takes the f.masterQuerySnapshots from earlier and spins up N
// goroutines that listen to those snapshots. If they see new triages (i.e. expectationEntry documents),
// they update the f.cache (which is protected by cacheMutex).
func (f *Store) listenToQuerySnapshots() {
for i := 0; i < snapshotShards; i++ {
go func(shard int) {
for {
qs, err := f.masterQuerySnapshots[shard].Next()
if err != nil {
sklog.Errorf("reading query snapshot %d: %s", shard, err)
// sleep and try again
time.Sleep(5 * time.Second)
continue
}
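// Collect the entries seen in this snapshot into a delta, then merge it into the cache under the write lock.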
e := types.Expectations{}
for _, dc := range qs.Changes {
if dc.Kind == firestore.DocumentRemoved {
continue // There will likely never be DocumentRemoved events
}
entry := expectationEntry{}
if err := dc.Doc.DataTo(&entry); err != nil {
id := dc.Doc.Ref.ID
sklog.Errorf("corrupt data in firestore, could not unmarshal entry with id %s", id)
continue
}
sklog.Debugf("Query Snapshot saw: %s %s %s\n", entry.Grouping, entry.Digest, entry.Label.String())
e.AddDigest(entry.Grouping, entry.Digest, entry.Label)
}
func() {
f.cacheMutex.Lock()
defer f.cacheMutex.Unlock()
f.cache.MergeExpectations(e)
}()
}
}(i)
}
}
// getIssueExpectations returns an Expectations object, safe to mutate, that contains
// all issue-specific Expectations.
// It fetches everything from firestore every time, as there could be multiple
// readers and writers and thus caching isn't safe.
func (f *Store) getIssueExpectations() (types.Expectations, error) {
issueExp, err := f.loadExpectationsSharded(f.issue, issueShards)
if err != nil {
return nil, skerr.Fmt("could not load expectations delta for issue %d from firestore: %s", f.issue, err)
}
return issueExp, nil
}
// loadExpectationsSharded returns an Expectations object from the expectationsCollection,
// with all Expectations belonging to the passed in issue (can be MasterBranch).
func (f *Store) loadExpectationsSharded(issue int64, shards int) (types.Expectations, error) {
defer metrics2.FuncTimer().Stop()
q := f.client.Collection(expectationsCollection).Where(issueField, "==", issue)
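// Each shard accumulates its results into its own Expectations; they are merged into one below.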
es := make([]types.Expectations, shards)
queries := fs_utils.ShardQueryOnDigest(q, digestField, shards)
maxRetries := 3
err := f.client.IterDocsInParallel("loadExpectations", strconv.FormatInt(issue, 10), queries, maxRetries, maxOperationTime, func(i int, doc *firestore.DocumentSnapshot) error {
if doc == nil {
return nil
}
entry := expectationEntry{}
if err := doc.DataTo(&entry); err != nil {
id := doc.Ref.ID
return skerr.Wrapf(err, "corrupt data in firestore, could not unmarshal entry with id %s", id)
}
if es[i] == nil {
es[i] = types.Expectations{}
}
es[i].AddDigest(entry.Grouping, entry.Digest, entry.Label)
return nil
})
if err != nil {
return nil, skerr.Wrapf(err, "fetching expectations for ChangeList %d", issue)
}
e := types.Expectations{}
for _, ne := range es {
e.MergeExpectations(ne)
}
return e, nil
}
// AddChange implements the ExpectationsStore interface.
func (f *Store) AddChange(ctx context.Context, newExp types.Expectations, userID string) error {
defer metrics2.FuncTimer().Stop()
if f.mode == ReadOnly {
return ReadOnlyErr
}
// Create the entries that we want to write (using the previous values)
now, entries, changes := func() (time.Time, []expectationEntry, []triageChanges) {
f.cacheMutex.Lock()
defer f.cacheMutex.Unlock()
now := time.Now()
entries, changes := f.flatten(now, newExp)
// Write the changes to the local cache. We do this first so we can release
// the mutex as soon as possible.
if f.cache == nil {
f.cache = newExp.DeepCopy()
} else {
f.cache.MergeExpectations(newExp)
}
return now, entries, changes
}()
if f.eventBus != nil {
f.eventBus.Publish(f.eventExpChange, &expstorage.EventExpectationChange{
TestChanges: newExp,
IssueID: f.issue,
}, f.globalEvent)
}
// Nothing to add
if len(entries) == 0 {
return nil
}
// Firestore can do up to 500 writes at once; we have 2 writes per entry, plus 1 for the triageRecord.
batchSize := (500 / 2) - 1
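// (500/2)-1 = 249 entries per batch, i.e. 498 writes, leaving room for the triageRecord written with the first batch.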
b := f.client.Batch()
// First write the triage record, with Committed being false (i.e. in progress)
tr := f.client.Collection(triageRecordsCollection).NewDoc()
record := triageRecord{
UserName: userID,
TS: now,
Issue: f.issue,
Changes: len(entries),
Committed: false,
}
b.Set(tr, record)
// In batches, add ExpectationEntry and TriageChange Documents
for i := 0; i < len(entries); i += batchSize {
stop := i + batchSize
if stop > len(entries) {
stop = len(entries)
}
sklog.Debugf("Storing new expectations [%d, %d]", i, stop)
for idx, entry := range entries[i:stop] {
e := f.client.Collection(expectationsCollection).Doc(entry.ID())
b.Set(e, entry)
tc := f.client.Collection(triageChangesCollection).NewDoc()
// Index into the full changes slice; idx alone would restart at 0 for each batch.
change := changes[i+idx]
change.RecordID = tr.ID
b.Set(tc, change)
}
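// Commit this batch with exponential backoff so transient Firestore errors don't abort the whole write.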
exp := &backoff.ExponentialBackOff{
InitialInterval: time.Second,
RandomizationFactor: 0.5,
Multiplier: 2,
MaxInterval: maxOperationTime / 4,
MaxElapsedTime: maxOperationTime,
Clock: backoff.SystemClock,
}
o := func() error {
_, err := b.Commit(ctx)
return err
}
if err := backoff.Retry(o, exp); err != nil {
// We really hope this doesn't happen, as it may leave the data in a partially
// broken state.
return skerr.Wrapf(err, "writing entries with retry [%d, %d]", i, stop)
}
// Go on to the next batch, if needed.
if stop < len(entries) {
b = f.client.Batch()
}
}
// We have completed this potentially long write, so mark the triage record as committed.
update := map[string]interface{}{
"committed": true,
}
_, err := f.client.Set(tr, update, 10, maxOperationTime, firestore.MergeAll)
return err
}
// flatten creates the data for the Documents to be written for a given Expectations delta.
// It requires that the f.cache is safe to read (i.e. the mutex is held), because
// it needs to determine the previous values.
func (f *Store) flatten(now time.Time, newExp types.Expectations) ([]expectationEntry, []triageChanges) {
var entries []expectationEntry
var changes []triageChanges
for testName, digestMap := range newExp {
for digest, label := range digestMap {
entries = append(entries, expectationEntry{
Grouping: testName,
Digest: digest,
Label: label,
Updated: now,
Issue: f.issue,
})
changes = append(changes, triageChanges{
// RecordID will be filled out later
Grouping: testName,
Digest: digest,
LabelBefore: f.cache.Classification(testName, digest),
LabelAfter: label,
})
}
}
return entries, changes
}
// QueryLog implements the ExpectationsStore interface.
func (f *Store) QueryLog(ctx context.Context, offset, size int, details bool) ([]expstorage.TriageLogEntry, int, error) {
if offset < 0 || size <= 0 {
return nil, 0, skerr.Fmt("offset: %d and size: %d must be positive", offset, size)
}
tags := map[string]string{
"with_details": "false",
}
if details {
tags["with_details"] = "true"
}
defer metrics2.NewTimer("gold_query_log", tags).Stop()
// Fetch the records, which have everything except the details.
q := f.client.Collection(triageRecordsCollection).OrderBy(tsField, firestore.Desc).Offset(offset).Limit(size)
q = q.Where(issueField, "==", f.issue).Where(committedField, "==", true)
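// Note: combining the equality filters on issue and committed with the OrderBy on ts likely requires a composite Firestore index.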
var rv []expstorage.TriageLogEntry
d := fmt.Sprintf("offset: %d, size %d", offset, size)
err := f.client.IterDocs("query_log", d, q, 3, maxOperationTime, func(doc *firestore.DocumentSnapshot) error {
if doc == nil {
return nil
}
tr := triageRecord{}
if err := doc.DataTo(&tr); err != nil {
id := doc.Ref.ID
return skerr.Wrapf(err, "corrupt data in firestore, could not unmarshal triage record with id %s", id)
}
rv = append(rv, expstorage.TriageLogEntry{
ID: doc.Ref.ID,
Name: tr.UserName,
TS: tr.TS.Unix() * 1000,
ChangeCount: tr.Changes,
})
return nil
})
if err != nil {
return nil, 0, skerr.Wrapf(err, "could not request triage records [%d: %d]", offset, size)
}
if len(rv) == 0 || !details {
return rv, len(rv), nil
}
// Make a query for each of the records to fetch the changes belonging to that record.
qs := make([]firestore.Query, 0, len(rv))
for _, r := range rv {
q := f.client.Collection(triageChangesCollection).Where(recordIDField, "==", r.ID)
// Sort them by grouping, then Digest for determinism
q = q.OrderBy(groupingField, firestore.Asc).OrderBy(digestField, firestore.Asc)
qs = append(qs, q)
}
// Then fire them all off in parallel.
err = f.client.IterDocsInParallel("query_log_details", d, qs, 3, maxOperationTime, func(i int, doc *firestore.DocumentSnapshot) error {
if doc == nil {
return nil
}
tc := triageChanges{}
if err := doc.DataTo(&tc); err != nil {
id := doc.Ref.ID
return skerr.Wrapf(err, "corrupt data in firestore, could not unmarshal triage changes with id %s", id)
}
rv[i].Details = append(rv[i].Details, expstorage.TriageDetail{
TestName: tc.Grouping,
Digest: tc.Digest,
Label: tc.LabelAfter.String(),
})
return nil
})
if err != nil {
return nil, 0, skerr.Wrapf(err, "could not query details")
}
return rv, len(rv), nil
}
// UndoChange implements the ExpectationsStore interface.
func (f *Store) UndoChange(ctx context.Context, changeID, userID string) (types.Expectations, error) {
defer metrics2.FuncTimer().Stop()
if f.mode == ReadOnly {
return nil, ReadOnlyErr
}
// Verify the original change id exists.
dr := f.client.Collection(triageRecordsCollection).Doc(changeID)
doc, err := f.client.Get(dr, 3, maxOperationTime)
if err != nil || !doc.Exists() {
return nil, skerr.Wrapf(err, "could not find change to undo with id %s", changeID)
}
q := f.client.Collection(triageChangesCollection).Where(recordIDField, "==", changeID)
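// Rebuild the inverse delta by taking each change's LabelBefore; AddChange below re-applies those prior labels.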
delta := types.Expectations{}
err = f.client.IterDocs("undo_query", changeID, q, 3, maxOperationTime, func(doc *firestore.DocumentSnapshot) error {
if doc == nil {
return nil
}
tc := triageChanges{}
if err := doc.DataTo(&tc); err != nil {
id := doc.Ref.ID
return skerr.Wrapf(err, "corrupt data in firestore, could not unmarshal triage changes with id %s", id)
}
delta.AddDigest(tc.Grouping, tc.Digest, tc.LabelBefore)
return nil
})
if err != nil {
return nil, skerr.Wrapf(err, "could not get delta to undo %s", changeID)
}
if err = f.AddChange(ctx, delta, userID); err != nil {
return nil, skerr.Wrapf(err, "could not apply delta to undo %s", changeID)
}
return delta, nil
}
// Make sure Store fulfills the ExpectationsStore interface
var _ expstorage.ExpectationsStore = (*Store)(nil)