[gold] Start implementing Firestore ExpectationsStore
This is only the add and get parts, in an effort to avoid
monolithic CLs.
Bug: skia:9090
Change-Id: I939b4c30e9844458e79fc20f1c60dbfba46ca4cf
Reviewed-on: https://skia-review.googlesource.com/c/buildbot/+/217616
Commit-Queue: Kevin Lubick <kjlubick@google.com>
Reviewed-by: Joe Gregorio <jcgregorio@google.com>
diff --git a/go/testutils/unittest/unittest.go b/go/testutils/unittest/unittest.go
index 408fe90..5c980a7 100644
--- a/go/testutils/unittest/unittest.go
+++ b/go/testutils/unittest/unittest.go
@@ -107,6 +107,8 @@
}
}
+// RequiresBigTableEmulator documents that a unittest requires the BigTable
+// Emulator and checks that the appropriate environment variable is set.
func RequiresBigTableEmulator(t sktest.TestingT) {
s := os.Getenv("BIGTABLE_EMULATOR_HOST")
if s == "" {
@@ -117,3 +119,25 @@
`)
}
}
+
+// RequiresFirestoreEmulator marks a unittest as depending on the Firestore
+// Emulator. When FIRESTORE_EMULATOR_HOST is unset or empty, the test is failed
+// via t.Fatal with instructions for starting the emulator by hand.
+func RequiresFirestoreEmulator(t sktest.TestingT) {
+	if host := os.Getenv("FIRESTORE_EMULATOR_HOST"); host != "" {
+		// An emulator address is configured; nothing else is verified here.
+		return
+	}
+	t.Fatal(`This test requires the Firestore emulator, which requires some manual setup:
+gcloud beta emulators firestore start
+# The above will install the emulator and fail with an error like:
+# [firestore] Error trying to exec /path/to/cloud-firestore-emulator.jar
+# See b/134379774
+chmod +x /path/to/cloud-firestore-emulator.jar
+# The default params try to use IPv6, which doesn't work great for our clients, so
+# we need to start it manually like:
+/path/to/cloud-firestore-emulator.jar --host=localhost --port=8151
+
+# Once the emulator is running, we need to run the following in the terminal
+# that we are running the tests in:
+export FIRESTORE_EMULATOR_HOST=localhost:8151
+`)
+}
diff --git a/golden/go/expstorage/fs_expstore/FIRESTORE.md b/golden/go/expstorage/fs_expstore/FIRESTORE.md
index a94ce43..2218bb7 100644
--- a/golden/go/expstorage/fs_expstore/FIRESTORE.md
+++ b/golden/go/expstorage/fs_expstore/FIRESTORE.md
@@ -36,7 +36,7 @@
Underneath that parent Document, we will create a three Collections:
`expectations`, `triage_records`, and `triage_changes`.
-In the `expectations` Collection, we will store many ExpectationEntry Documents with
+In the `expectations` Collection, we will store many `expectationEntry` Documents with
the following schema:
Grouping string # starting as the TestName
@@ -45,15 +45,18 @@
Updated time.Time
Issue int64 # 0 for master branch, nonzero for CLs
-The `triage_records` Collection will have TriageRecords Documents:
+The `expectationEntry` will have an ID of `[grouping]|[digest]`, allowing updates.
+
+The `triage_records` Collection will have `triageRecord` Documents:
ID string # autogenerated
UserName string
TS time.Time
Issue int64
+ Committed bool # if writing has completed (e.g. large triage)
Changes int # how many records match in triage_changes Collection
-The `triage_changes` Collection will have TriageChanges Documents:
+The `triage_changes` Collection will have `triageChanges` Documents:
RecordID string # From the triage_records table
Grouping string
@@ -75,7 +78,7 @@
Usage
-----
-To create the MasterExpectations map (at startup), we simply query all ExpectationEntry
+To create the MasterExpectations map (at startup), we simply query all `expectationEntry`
Documents with Issue==0 and assemble them together. The implementation will have an Expectations
map in RAM that acts as a write-through cache.
diff --git a/golden/go/expstorage/fs_expstore/fs_expstore.go b/golden/go/expstorage/fs_expstore/fs_expstore.go
new file mode 100644
index 0000000..904d3eb
--- /dev/null
+++ b/golden/go/expstorage/fs_expstore/fs_expstore.go
@@ -0,0 +1,260 @@
+// Implements an ExpectationsStore based on Firestore. See FIRESTORE.md for the schema
+// and design rationale.
+package fs_expstore
+
+import (
+ "context"
+ "errors"
+ "sync"
+ "time"
+
+ "cloud.google.com/go/firestore"
+ "github.com/cenkalti/backoff"
+ ifirestore "go.skia.org/infra/go/firestore"
+ "go.skia.org/infra/go/skerr"
+ "go.skia.org/infra/golden/go/types"
+)
+
+// AccessMode indicates if this ExpectationsStore can update existing Expectations
+// in the backing store or if it can only read them.
+type AccessMode int
+
+const (
+ // ReadOnly makes AddChange fail with ReadOnlyErr instead of writing.
+ ReadOnly AccessMode = iota
+ // ReadWrite allows AddChange to write to the cache and to Firestore.
+ ReadWrite
+)
+
+const (
+ // MasterBranch is the issue value used for the master branch, i.e. when the
+ // Store is not tied to a Gerrit or GitHub issue.
+ MasterBranch = int64(0)
+)
+
+var (
+ // ReadOnlyErr is returned by AddChange when the Store's mode is ReadOnly.
+ ReadOnlyErr = errors.New("expectationStore is in read-only mode")
+)
+
+const (
+ // ExpectationStoreCollection should be used when creating the firestore client
+ // (via firestore.NewClient) that is passed into New.
+ ExpectationStoreCollection = "expstore"
+
+ // These are the collections in Firestore.
+ expectationsCollection = "expectations"
+ triageRecordsCollection = "triage_records"
+ triageChangesCollection = "triage_changes"
+
+ // Columns in the Collections we query by.
+ issueCol = "issue"
+
+ // maxOperationTime is the time budget handed to each Firestore read/Set call and
+ // used as the total retry window when committing a batch of writes.
+ maxOperationTime = 2 * time.Minute
+)
+
+// Store implements expstorage.ExpectationsStore backed by
+// Firestore. It has a write-through caching mechanism.
+type Store struct {
+ // client is the Firestore client used for all reads and writes.
+ client *ifirestore.Client
+ // mode decides whether AddChange is allowed to write.
+ mode AccessMode
+ issue int64 // Gerrit or GitHub issue, or MasterBranch
+
+ // cacheMutex protects the write-through cache object.
+ cacheMutex sync.RWMutex
+ // cache is nil until it is populated by Get (loaded from Firestore) or
+ // seeded by AddChange.
+ cache types.Expectations
+}
+
+// expectationEntry is the document type stored in the expectationsCollection.
+// Each entry records the label of one digest within one grouping.
+type expectationEntry struct {
+ Grouping types.TestName `firestore:"grouping"`
+ Digest types.Digest `firestore:"digest"`
+ Label types.Label `firestore:"label"`
+ // Updated is the time at which the AddChange call that wrote this entry began.
+ Updated time.Time `firestore:"updated"`
+ // Issue is the issue of the Store that wrote the entry (MasterBranch for master).
+ Issue int64 `firestore:"issue"`
+}
+
+// ID returns the deterministic ID that lets us update existing entries. It has
+// the form "[grouping]|[digest]".
+// NOTE(review): the ID does not include Issue, so entries written for different
+// issues with the same grouping and digest map to the same document - confirm
+// this is intended.
+func (e *expectationEntry) ID() string {
+ return string(e.Grouping) + "|" + string(e.Digest)
+}
+
+// triageRecord is the document type stored in the triageRecordsCollection.
+// One record is written per AddChange call that has at least one entry.
+type triageRecord struct {
+ // UserName is the user that made the change.
+ UserName string `firestore:"user"`
+ // TS is the time at which the change was made.
+ TS time.Time `firestore:"ts"`
+ Issue int64 `firestore:"issue"`
+ // Changes is the number of entries written as part of this record.
+ Changes int `firestore:"changes"`
+ // Committed starts out false and is set to true once all batches of the
+ // change have been written.
+ Committed bool `firestore:"committed"`
+}
+
+// triageChanges is the document type stored in the triageChangesCollection.
+// It captures the before/after label of a single digest for one triageRecord.
+type triageChanges struct {
+ // RecordID is the ID of the triageRecord document this change belongs to.
+ RecordID string `firestore:"record_id"`
+ Grouping types.TestName `firestore:"grouping"`
+ Digest types.Digest `firestore:"digest"`
+ // LabelBefore is the label the cache held for the digest before the change.
+ LabelBefore types.Label `firestore:"before"`
+ // LabelAfter is the label that the change assigns to the digest.
+ LabelAfter types.Label `firestore:"after"`
+}
+
+// New builds a Store on top of the given firestore client. The issue argument
+// selects which baselines the Store reads and writes: those of a given CL, or
+// those of the master branch when it is MasterBranch. The mode argument decides
+// whether writes are permitted. The cache starts out empty (nil).
+func New(client *ifirestore.Client, issue int64, mode AccessMode) *Store {
+	s := new(Store)
+	s.client = client
+	s.issue = issue
+	s.mode = mode
+	return s
+}
+
+// Get implements the ExpectationsStore interface. It returns a deep copy of the
+// cached Expectations, loading them from Firestore first if the cache has not
+// been populated yet. An error is returned only if that load fails.
+func (f *Store) Get() (types.Expectations, error) {
+	// Fast path: the cache is already populated, so a read lock is sufficient.
+	f.cacheMutex.RLock()
+	if f.cache != nil {
+		defer f.cacheMutex.RUnlock()
+		return f.cache.DeepCopy(), nil
+	}
+	f.cacheMutex.RUnlock()
+
+	// Slow path: populating the cache writes to f.cache, which must happen under
+	// the write lock. Doing it under RLock would let concurrent readers race.
+	f.cacheMutex.Lock()
+	defer f.cacheMutex.Unlock()
+	// Re-check, because another goroutine may have filled the cache between the
+	// RUnlock above and acquiring the write lock.
+	if f.cache == nil {
+		c, err := f.loadExpectations()
+		if err != nil {
+			return nil, skerr.Fmt("could not load expectations from firestore: %s", err)
+		}
+		f.cache = c
+	}
+	return f.cache.DeepCopy(), nil
+}
+
+// loadExpectations reads the entire Expectations from the expectationsCollection,
+// matching the configured branch.
+func (f *Store) loadExpectations() (types.Expectations, error) {
+ e := types.Expectations{}
+ q := f.client.Collection(expectationsCollection).Where(issueCol, "==", MasterBranch)
+ maxRetries := 3
+ err := f.client.IterDocs("loadExpectations", "", q, maxRetries, maxOperationTime, func(doc *firestore.DocumentSnapshot) error {
+ if doc == nil {
+ return nil
+ }
+ entry := expectationEntry{}
+ if err := doc.DataTo(&entry); err != nil {
+ id := doc.Ref.ID
+ return skerr.Fmt("corrupt data in firestore, could not unmarshal entry with id %s: %s", id, err)
+ }
+ e.AddDigest(entry.Grouping, entry.Digest, entry.Label)
+ return nil
+ })
+ return e, err
+}
+
+// AddChange implements the ExpectationsStore interface.
+//
+// It merges newExp into the in-memory cache and then writes, in batches, one
+// expectationEntry and one triageChanges Document per digest in newExp, all tied
+// to a single triageRecord. The triageRecord is written with Committed=false in
+// the first batch and flipped to true once every batch has been written.
+//
+// It returns ReadOnlyErr if the Store is in ReadOnly mode, nil if newExp has no
+// entries, and otherwise any error from writing to Firestore. The cache is
+// updated before the Firestore writes, so it is not rolled back if they fail.
+func (f *Store) AddChange(ctx context.Context, newExp types.Expectations, userId string) error {
+	if f.mode == ReadOnly {
+		return ReadOnlyErr
+	}
+	// Create the entries that we want to write (using the previous values)
+	now, entries, changes := func() (time.Time, []expectationEntry, []triageChanges) {
+		f.cacheMutex.Lock()
+		defer f.cacheMutex.Unlock()
+		now := time.Now()
+		entries, changes := f.flatten(now, newExp)
+
+		// Write the changes to the local cache. We do this first so we can free up
+		// the mutex as soon as possible.
+		// NOTE(review): if the cache was never loaded, it is seeded with only
+		// newExp, so a later Get will not read existing entries from Firestore -
+		// confirm this is intended.
+		if f.cache == nil {
+			f.cache = newExp.DeepCopy()
+		} else {
+			f.cache.MergeExpectations(newExp)
+		}
+		return now, entries, changes
+	}()
+
+	// Nothing to add
+	if len(entries) == 0 {
+		return nil
+	}
+
+	// firestore can do up to 500 writes at once, we have 2 writes per entry, plus 1 triageRecord
+	batchSize := (500 / 2) - 1
+
+	b := f.client.Batch()
+
+	// First write the triage record, with Committed being false (i.e. in progress)
+	tr := f.client.Collection(triageRecordsCollection).NewDoc()
+	record := triageRecord{
+		UserName:  userId,
+		TS:        now,
+		Issue:     f.issue,
+		Changes:   len(entries),
+		Committed: false,
+	}
+	b.Set(tr, record)
+
+	// In batches, add ExpectationEntry and TriageChange Documents
+	for i := 0; i < len(entries); i += batchSize {
+		stop := i + batchSize
+		if stop > len(entries) {
+			stop = len(entries)
+		}
+
+		// entries and changes are parallel slices, so they must be indexed with the
+		// same absolute index; a batch-relative index would pair every batch after
+		// the first with the changes belonging to the first batch.
+		for idx := i; idx < stop; idx++ {
+			entry := entries[idx]
+			e := f.client.Collection(expectationsCollection).Doc(entry.ID())
+			b.Set(e, entry)
+
+			tc := f.client.Collection(triageChangesCollection).NewDoc()
+			change := changes[idx]
+			change.RecordID = tr.ID
+			b.Set(tc, change)
+		}
+
+		exp := &backoff.ExponentialBackOff{
+			InitialInterval:     time.Second,
+			RandomizationFactor: 0.5,
+			Multiplier:          2,
+			MaxInterval:         maxOperationTime / 4,
+			MaxElapsedTime:      maxOperationTime,
+			Clock:               backoff.SystemClock,
+		}
+
+		o := func() error {
+			_, err := b.Commit(ctx)
+			return err
+		}
+
+		if err := backoff.Retry(o, exp); err != nil {
+			// We really hope this doesn't happen, as it may leave the data in a partially
+			// broken state.
+			return skerr.Fmt("problem writing entries with retry [%d, %d]: %s", i, stop, err)
+		}
+		// Go on to the next batch, if needed.
+		if stop < len(entries) {
+			b = f.client.Batch()
+		}
+	}
+
+	// We have succeeded this potentially long write, so mark it completed.
+	update := map[string]interface{}{
+		"committed": true,
+	}
+	_, err := f.client.Set(tr, update, 10, maxOperationTime, firestore.MergeAll)
+	return err
+}
+
+// flatten turns an Expectations delta into the Documents that need to be written:
+// one expectationEntry and one triageChanges per (test, digest) pair, appended in
+// lockstep so the two returned slices line up index by index.
+// The caller must hold the cache mutex, since f.cache is consulted to find the
+// label each digest had before this change.
+func (f *Store) flatten(now time.Time, newExp types.Expectations) ([]expectationEntry, []triageChanges) {
+	var (
+		entries []expectationEntry
+		changes []triageChanges
+	)
+
+	for grouping, labels := range newExp {
+		for d, after := range labels {
+			entry := expectationEntry{
+				Grouping: grouping,
+				Digest:   d,
+				Label:    after,
+				Updated:  now,
+				Issue:    f.issue,
+			}
+			// RecordID is deliberately left blank; it is filled in once the
+			// triage record has been created.
+			change := triageChanges{
+				Grouping:    grouping,
+				Digest:      d,
+				LabelBefore: f.cache.Classification(grouping, d),
+				LabelAfter:  after,
+			}
+			entries = append(entries, entry)
+			changes = append(changes, change)
+		}
+	}
+	return entries, changes
+}
diff --git a/golden/go/expstorage/fs_expstore/fs_expstore_test.go b/golden/go/expstorage/fs_expstore/fs_expstore_test.go
new file mode 100644
index 0000000..58c3426
--- /dev/null
+++ b/golden/go/expstorage/fs_expstore/fs_expstore_test.go
@@ -0,0 +1,261 @@
+package fs_expstore
+
+import (
+ "context"
+ "fmt"
+ "sync"
+ "testing"
+
+ "github.com/google/uuid"
+ assert "github.com/stretchr/testify/require"
+ "go.skia.org/infra/go/firestore"
+ "go.skia.org/infra/go/testutils/unittest"
+ data "go.skia.org/infra/golden/go/testutils/data_three_devices"
+ "go.skia.org/infra/golden/go/types"
+)
+
+// TODO(kjlubick): These tests are marked as manual because the
+// Firestore Emulator is not yet on the bots, due to some more complicated
+// setup (e.g. chmod)
+
+// TestGetExpectations applies two changes, the second partly overwriting the
+// first, and checks that both the writing Store and a freshly created read-only
+// Store report the aggregated result.
+func TestGetExpectations(t *testing.T) {
+	unittest.ManualTest(t)
+	unittest.RequiresFirestoreEmulator(t)
+
+	client := getTestFirestoreInstance(t)
+	store := New(client, MasterBranch, ReadWrite)
+
+	// Brand new instance should have no expectations
+	got, err := store.Get()
+	assert.NoError(t, err)
+	assert.Equal(t, types.Expectations{}, got)
+
+	ctx := context.Background()
+	firstChange := types.Expectations{
+		data.AlphaTest: {
+			data.AlphaUntriaged1Digest: types.POSITIVE,
+			data.AlphaGood1Digest:      types.POSITIVE,
+		},
+	}
+	err = store.AddChange(ctx, firstChange, userOne)
+	assert.NoError(t, err)
+
+	secondChange := types.Expectations{
+		data.AlphaTest: {
+			data.AlphaBad1Digest:       types.NEGATIVE,
+			data.AlphaUntriaged1Digest: types.UNTRIAGED, // overwrites previous
+		},
+		data.BetaTest: {
+			data.BetaGood1Digest: types.POSITIVE,
+		},
+	}
+	err = store.AddChange(ctx, secondChange, userTwo)
+	assert.NoError(t, err)
+
+	// The combined view that every reader should observe from here on.
+	want := types.Expectations{
+		data.AlphaTest: {
+			data.AlphaGood1Digest:      types.POSITIVE,
+			data.AlphaBad1Digest:       types.NEGATIVE,
+			data.AlphaUntriaged1Digest: types.UNTRIAGED,
+		},
+		data.BetaTest: {
+			data.BetaGood1Digest: types.POSITIVE,
+		},
+	}
+
+	got, err = store.Get()
+	assert.NoError(t, err)
+	assert.Equal(t, want, got)
+
+	// Make sure that if we create a new view, we can read the results
+	// from the table to make the expectations
+	reader := New(client, MasterBranch, ReadOnly)
+	got, err = reader.Get()
+	assert.NoError(t, err)
+	assert.Equal(t, want, got)
+}
+
+// TestGetExpectationsRace writes a bunch of data from many go routines
+// in an effort to catch any race conditions in the caching layer.
+// Errors from the writer goroutines are collected and checked on the test
+// goroutine, because the assertions used here stop the test via FailNow, which
+// must only be called from the goroutine running the test.
+func TestGetExpectationsRace(t *testing.T) {
+	unittest.ManualTest(t)
+	unittest.RequiresFirestoreEmulator(t)
+
+	c := getTestFirestoreInstance(t)
+
+	f := New(c, MasterBranch, ReadWrite)
+
+	type entry struct {
+		Grouping types.TestName
+		Digest   types.Digest
+		Label    types.Label
+	}
+
+	entries := []entry{
+		{
+			Grouping: data.AlphaTest,
+			Digest:   data.AlphaUntriaged1Digest,
+			Label:    types.UNTRIAGED,
+		},
+		{
+			Grouping: data.AlphaTest,
+			Digest:   data.AlphaBad1Digest,
+			Label:    types.NEGATIVE,
+		},
+		{
+			Grouping: data.AlphaTest,
+			Digest:   data.AlphaGood1Digest,
+			Label:    types.POSITIVE,
+		},
+		{
+			Grouping: data.BetaTest,
+			Digest:   data.BetaGood1Digest,
+			Label:    types.POSITIVE,
+		},
+		{
+			Grouping: data.BetaTest,
+			Digest:   data.BetaUntriaged1Digest,
+			Label:    types.UNTRIAGED,
+		},
+	}
+
+	const numWrites = 50
+
+	ctx := context.Background()
+	wg := sync.WaitGroup{}
+	// Buffered so that no writer blocks, even if the test goroutine exits early.
+	errs := make(chan error, numWrites)
+
+	for i := 0; i < numWrites; i++ {
+		wg.Add(1)
+		go func(i int) {
+			defer wg.Done()
+			e := entries[i%len(entries)]
+			errs <- f.AddChange(ctx, types.Expectations{
+				e.Grouping: {
+					e.Digest: e.Label,
+				},
+			}, userOne)
+		}(i)
+
+		// Make sure we can read and write w/o races
+		if i%5 == 0 {
+			_, err := f.Get()
+			assert.NoError(t, err)
+		}
+	}
+
+	wg.Wait()
+	close(errs)
+	for err := range errs {
+		assert.NoError(t, err)
+	}
+
+	e, err := f.Get()
+	assert.NoError(t, err)
+	assert.Equal(t, types.Expectations{
+		data.AlphaTest: {
+			data.AlphaGood1Digest:      types.POSITIVE,
+			data.AlphaBad1Digest:       types.NEGATIVE,
+			data.AlphaUntriaged1Digest: types.UNTRIAGED,
+		},
+		data.BetaTest: {
+			data.BetaGood1Digest:      types.POSITIVE,
+			data.BetaUntriaged1Digest: types.UNTRIAGED,
+		},
+	}, e)
+}
+
+// TestGetExpectationsBig writes 32^2=1024 entries
+// to test the batch writing.
+func TestGetExpectationsBig(t *testing.T) {
+ unittest.ManualTest(t)
+ unittest.RequiresFirestoreEmulator(t)
+
+ c := getTestFirestoreInstance(t)
+
+ f := New(c, MasterBranch, ReadWrite)
+
+ type entry struct {
+ Grouping types.TestName
+ Digest types.Digest
+ Label types.Label
+ }
+
+ // Write the expectations in two, non-overlapping blocks.
+ exp1 := makeBigExpectations(0, 16)
+ exp2 := makeBigExpectations(16, 32)
+
+ expected := exp1.DeepCopy()
+ expected.MergeExpectations(exp2)
+
+ ctx := context.Background()
+ wg := sync.WaitGroup{}
+
+ // Write them concurrently to test for races.
+ wg.Add(2)
+ go func() {
+ defer wg.Done()
+ err := f.AddChange(ctx, exp1, userOne)
+ assert.NoError(t, err)
+ }()
+ go func() {
+ defer wg.Done()
+ err := f.AddChange(ctx, exp2, userTwo)
+ assert.NoError(t, err)
+ }()
+ wg.Wait()
+
+ e, err := f.Get()
+ assert.NoError(t, err)
+ assert.Equal(t, expected, e)
+
+ // Make sure that if we create a new view, we can read the results
+ // from the table to make the expectations
+ fr := New(c, MasterBranch, ReadOnly)
+ e, err = fr.Get()
+ assert.NoError(t, err)
+ assert.Equal(t, expected, e)
+}
+
+// TestReadOnly checks that a Store created in ReadOnly mode refuses to write.
+// No firestore client is needed because the write is rejected up front.
+func TestReadOnly(t *testing.T) {
+	unittest.SmallTest(t)
+
+	store := New(nil, MasterBranch, ReadOnly)
+
+	change := types.Expectations{
+		data.AlphaTest: {
+			data.AlphaGood1Digest: types.POSITIVE,
+		},
+	}
+	err := store.AddChange(context.Background(), change, userOne)
+	assert.Error(t, err)
+	assert.Contains(t, err.Error(), "read-only")
+}
+
+// getTestFirestoreInstance returns a client for an empty firestore instance.
+// The emulator keeps the tables in ram, so a random nonce is appended to the
+// instance name to be sure that the collection we get has nothing in it yet.
+func getTestFirestoreInstance(t *testing.T) *firestore.Client {
+	nonce := uuid.New().String()
+	instance := ExpectationStoreCollection + nonce
+	client, err := firestore.NewClient(context.Background(), "should-use-emulator", "gold-test", instance, nil)
+	assert.NoError(t, err)
+	return client
+}
+
+// makeBigExpectations makes n tests named from start to end that each have 32 digests.
+func makeBigExpectations(start, end int) types.Expectations {
+ e := types.Expectations{}
+ for i := start; i < end; i++ {
+ for j := 0; j < 32; j++ {
+ e.AddDigest(types.TestName(fmt.Sprintf("test-%03d", i)),
+ types.Digest(fmt.Sprintf("digest-%03d", j)), types.POSITIVE)
+ }
+ }
+ return e
+}
+
+// Users passed as the userId argument of AddChange in these tests.
+const (
+ userOne = "userOne@example.com"
+ userTwo = "userTwo@example.com"
+)
diff --git a/golden/go/expstorage/types.go b/golden/go/expstorage/types.go
index 5ff8007..739a6d9 100644
--- a/golden/go/expstorage/types.go
+++ b/golden/go/expstorage/types.go
@@ -32,10 +32,14 @@
type ExpectationsStore interface {
// Get the current classifications for image digests. The keys of the
// expectations map are the test names.
- Get() (exp types.Expectations, err error)
+ Get() (types.Expectations, error)
// AddChange writes the given classified digests to the database and records the
// user that made the change.
+ // TODO(kjlubick): This interface leads to a potential race condition if two
+ // users on the front-end click Positive and Negative for the same testname/digest.
+ // A less racy interface would take an "old value"/"new value" so that if the
+ // old value didn't match, we could reject the change.
AddChange(ctx context.Context, changes types.Expectations, userId string) error
// QueryLog allows to paginate through the changes in the expectations.
diff --git a/golden/go/types/expectations.go b/golden/go/types/expectations.go
index 1353d22..7659417 100644
--- a/golden/go/types/expectations.go
+++ b/golden/go/types/expectations.go
@@ -43,14 +43,7 @@
e[testName] = map[Digest]Label{}
}
for digest, label := range digests {
- // UNTRIAGED is the default value, so if the passed in version
- // is explicitly setting a label to UNTRIAGED, we delete what
- // was already there.
- if label == UNTRIAGED {
- delete(e[testName], digest)
- } else {
- e[testName][digest] = label
- }
+ e[testName][digest] = label
}
// In case we had only assigned UNTRIAGED values
if len(e[testName]) == 0 {