[gold] Start implementing Firestore ExpectationsStore

This is only the add and get parts, in an effort to avoid
monolithic CLs.

Bug: skia:9090
Change-Id: I939b4c30e9844458e79fc20f1c60dbfba46ca4cf
Reviewed-on: https://skia-review.googlesource.com/c/buildbot/+/217616
Commit-Queue: Kevin Lubick <kjlubick@google.com>
Reviewed-by: Joe Gregorio <jcgregorio@google.com>
diff --git a/go/testutils/unittest/unittest.go b/go/testutils/unittest/unittest.go
index 408fe90..5c980a7 100644
--- a/go/testutils/unittest/unittest.go
+++ b/go/testutils/unittest/unittest.go
@@ -107,6 +107,8 @@
 	}
 }
 
+// RequiresBigTableEmulator is a function that documents a unittest requires the
+// BigTable Emulator and checks that the appropriate environment variable is set.
 func RequiresBigTableEmulator(t sktest.TestingT) {
 	s := os.Getenv("BIGTABLE_EMULATOR_HOST")
 	if s == "" {
@@ -117,3 +119,25 @@
 `)
 	}
 }
+
+// RequiresFirestoreEmulator is a function that documents a unittest requires the
+// Firestore Emulator and checks that the appropriate environment variable is set.
+func RequiresFirestoreEmulator(t sktest.TestingT) {
+	s := os.Getenv("FIRESTORE_EMULATOR_HOST")
+	if s == "" {
+		t.Fatal(`This test requires the Firestore emulator, which requires some manual setup:
+gcloud beta emulators firestore start
+# The above will install the emulator and fail with an error like:
+#   [firestore] Error trying to exec /path/to/cloud-firestore-emulator.jar
+# See b/134379774
+chmod +x /path/to/cloud-firestore-emulator.jar
+# The default params try to use IPv6, which doesn't work great for our clients, so
+# we need to start it manually like:
+/path/to/cloud-firestore-emulator.jar --host=localhost --port=8151
+
+# Once the emulator is running, we need to run the following in the terminal
+# that we are running the tests in:
+export FIRESTORE_EMULATOR_HOST=localhost:8151
+`)
+	}
+}
diff --git a/golden/go/expstorage/fs_expstore/FIRESTORE.md b/golden/go/expstorage/fs_expstore/FIRESTORE.md
index a94ce43..2218bb7 100644
--- a/golden/go/expstorage/fs_expstore/FIRESTORE.md
+++ b/golden/go/expstorage/fs_expstore/FIRESTORE.md
@@ -36,7 +36,7 @@
 Underneath that parent Document, we will create a three Collections:
 `expectations`, `triage_records`, and `triage_changes`.
 
-In the `expectations` Collection, we will store many ExpectationEntry Documents with
+In the `expectations` Collection, we will store many `expectationEntry` Documents with
 the following schema:
 
 	Grouping       string    # starting as the TestName
@@ -45,15 +45,18 @@
 	Updated        time.Time
 	Issue          int64     # 0 for master branch, nonzero for CLs
 
-The `triage_records` Collection will have TriageRecords Documents:
+The `expectationEntry` will have an ID of `[grouping]|[digest]`, allowing updates.
+
+The `triage_records` Collection will have `triageRecords` Documents:
 
 	ID           string    # autogenerated
 	UserName     string
 	TS           time.Time
 	Issue        int64
+	Committed    bool      # if writing has completed (e.g. large triage)
 	Changes      int       # how many records match in triage_changes Collection
 
-The `triage_changes` Collection will have TriageChanges Documents:
+The `triage_changes` Collection will have `triageChanges` Documents:
 
 	RecordID       string # From the triage_records table
 	Grouping       string
@@ -75,7 +78,7 @@
 Usage
 -----
 
-To create the MasterExpectations map (at startup), we simply query all ExpectationEntry
+To create the MasterExpectations map (at startup), we simply query all `expectationEntry`
 Documents with Issue==0 and assemble them together. The implementation will have an Expectations
 map in RAM that acts as a write-through cache.
 
diff --git a/golden/go/expstorage/fs_expstore/fs_expstore.go b/golden/go/expstorage/fs_expstore/fs_expstore.go
new file mode 100644
index 0000000..904d3eb
--- /dev/null
+++ b/golden/go/expstorage/fs_expstore/fs_expstore.go
@@ -0,0 +1,260 @@
+// Implements an ExpectationsStore based on Firestore. See FIRESTORE.md for the schema
+// and design rationale.
+package fs_expstore
+
+import (
+	"context"
+	"errors"
+	"sync"
+	"time"
+
+	"cloud.google.com/go/firestore"
+	"github.com/cenkalti/backoff"
+	ifirestore "go.skia.org/infra/go/firestore"
+	"go.skia.org/infra/go/skerr"
+	"go.skia.org/infra/golden/go/types"
+)
+
+// AccessMode indicates if this ExpectationsStore can update existing Expectations
+// in the backing store or if if can only read them.
+type AccessMode int
+
+const (
+	ReadOnly AccessMode = iota
+	ReadWrite
+)
+
+const (
+	MasterBranch = int64(0)
+)
+
+var (
+	ReadOnlyErr = errors.New("expectationStore is in read-only mode")
+)
+
+const (
+	// Should be used to create the firestore.NewClient that is passed into New.
+	ExpectationStoreCollection = "expstore"
+
+	// These are the collections in Firestore.
+	expectationsCollection  = "expectations"
+	triageRecordsCollection = "triage_records"
+	triageChangesCollection = "triage_changes"
+
+	// Columns in the Collections we query by.
+	issueCol = "issue"
+
+	maxOperationTime = 2 * time.Minute
+)
+
+// Store implements expstorage.ExpectationsStore backed by
+// Firestore. It has a write-through caching mechanism.
+type Store struct {
+	client *ifirestore.Client
+	mode   AccessMode
+	issue  int64 // Gerrit or GitHub issue, or MasterBranch
+
+	// cacheMutex protects the write-through cache object.
+	cacheMutex sync.RWMutex
+	cache      types.Expectations
+}
+
+// expectationEntry is the document type stored in the expectationsCollection.
+type expectationEntry struct {
+	Grouping types.TestName `firestore:"grouping"`
+	Digest   types.Digest   `firestore:"digest"`
+	Label    types.Label    `firestore:"label"`
+	Updated  time.Time      `firestore:"updated"`
+	Issue    int64          `firestore:"issue"`
+}
+
+// ID returns the deterministic ID that lets us update existing entries.
+func (e *expectationEntry) ID() string {
+	return string(e.Grouping) + "|" + string(e.Digest)
+}
+
+// triageRecord is the document type stored in the triageRecordsCollection.
+type triageRecord struct {
+	UserName  string    `firestore:"user"`
+	TS        time.Time `firestore:"ts"`
+	Issue     int64     `firestore:"issue"`
+	Changes   int       `firestore:"changes"`
+	Committed bool      `firestore:"committed"`
+}
+
+// triageChanges is the document type stored in the triageChangesCollection.
+type triageChanges struct {
+	RecordID    string         `firestore:"record_id"`
+	Grouping    types.TestName `firestore:"grouping"`
+	Digest      types.Digest   `firestore:"digest"`
+	LabelBefore types.Label    `firestore:"before"`
+	LabelAfter  types.Label    `firestore:"after"`
+}
+
+// New returns a new Store using the given firestore client. The issue param is used
+// to indicate if this Store is configured to read/write the baselines for a given CL
+// or if it is on MasterBranch.
+func New(client *ifirestore.Client, issue int64, mode AccessMode) *Store {
+	return &Store{
+		client: client,
+		issue:  issue,
+		mode:   mode,
+	}
+}
+
+// Get implements the ExpectationsStore interface.
+func (f *Store) Get() (types.Expectations, error) {
+	f.cacheMutex.RLock()
+	defer f.cacheMutex.RUnlock()
+	if f.cache == nil {
+		c, err := f.loadExpectations()
+		if err != nil {
+			return nil, skerr.Fmt("could not load expectations from firestore: %s", err)
+		}
+		f.cache = c
+	}
+	return f.cache.DeepCopy(), nil
+}
+
+// loadExpectations reads the entire Expectations from the expectationsCollection,
+// matching the configured branch.
+func (f *Store) loadExpectations() (types.Expectations, error) {
+	e := types.Expectations{}
+	q := f.client.Collection(expectationsCollection).Where(issueCol, "==", MasterBranch)
+	maxRetries := 3
+	err := f.client.IterDocs("loadExpectations", "", q, maxRetries, maxOperationTime, func(doc *firestore.DocumentSnapshot) error {
+		if doc == nil {
+			return nil
+		}
+		entry := expectationEntry{}
+		if err := doc.DataTo(&entry); err != nil {
+			id := doc.Ref.ID
+			return skerr.Fmt("corrupt data in firestore, could not unmarshal entry with id %s: %s", id, err)
+		}
+		e.AddDigest(entry.Grouping, entry.Digest, entry.Label)
+		return nil
+	})
+	return e, err
+}
+
+// AddChange implements the ExpectationsStore interface.
+func (f *Store) AddChange(ctx context.Context, newExp types.Expectations, userId string) error {
+	if f.mode == ReadOnly {
+		return ReadOnlyErr
+	}
+	// Create the entries that we want to write (using the previous values)
+	now, entries, changes := func() (time.Time, []expectationEntry, []triageChanges) {
+		f.cacheMutex.Lock()
+		defer f.cacheMutex.Unlock()
+		now := time.Now()
+		entries, changes := f.flatten(now, newExp)
+
+		// Write the changes to the locale cache. We do this first so we can free up
+		// the read mutex as soon as possible.
+		if f.cache == nil {
+			f.cache = newExp.DeepCopy()
+		} else {
+			f.cache.MergeExpectations(newExp)
+		}
+		return now, entries, changes
+	}()
+
+	// Nothing to add
+	if len(entries) == 0 {
+		return nil
+	}
+
+	// firestore can do up to 500 writes at once, we have 2 writes per entry, plus 1 triageRecord
+	batchSize := (500 / 2) - 1
+
+	b := f.client.Batch()
+
+	// First write the triage record, with Committed being false (i.e. in progress)
+	tr := f.client.Collection(triageRecordsCollection).NewDoc()
+	record := triageRecord{
+		UserName:  userId,
+		TS:        now,
+		Issue:     f.issue,
+		Changes:   len(entries),
+		Committed: false,
+	}
+	b.Set(tr, record)
+
+	// In batches, add ExpectationEntry and TriageChange Documents
+	for i := 0; i < len(entries); i += batchSize {
+		stop := i + batchSize
+		if stop > len(entries) {
+			stop = len(entries)
+		}
+
+		for idx, entry := range entries[i:stop] {
+			e := f.client.Collection(expectationsCollection).Doc(entry.ID())
+			b.Set(e, entry)
+
+			tc := f.client.Collection(triageChangesCollection).NewDoc()
+			change := changes[idx]
+			change.RecordID = tr.ID
+			b.Set(tc, change)
+		}
+
+		exp := &backoff.ExponentialBackOff{
+			InitialInterval:     time.Second,
+			RandomizationFactor: 0.5,
+			Multiplier:          2,
+			MaxInterval:         maxOperationTime / 4,
+			MaxElapsedTime:      maxOperationTime,
+			Clock:               backoff.SystemClock,
+		}
+
+		o := func() error {
+			_, err := b.Commit(ctx)
+			return err
+		}
+
+		if err := backoff.Retry(o, exp); err != nil {
+			// We really hope this doesn't happen, as it may leave the data in a partially
+			// broken state.
+			return skerr.Fmt("problem writing entries with retry [%d, %d]: %s", i, stop, err)
+		}
+		// Go on to the next batch, if needed.
+		if stop < len(entries) {
+			b = f.client.Batch()
+		}
+	}
+
+	// We have succeeded this potentially long write, so mark it completed.
+	update := map[string]interface{}{
+		"committed": true,
+	}
+	_, err := f.client.Set(tr, update, 10, maxOperationTime, firestore.MergeAll)
+	return err
+}
+
+// flatten creates the data for the Documents to be written for a given Expectations delta.
+// It requires that the f.cache is safe to read (i.e. the mutex is held), because
+// it needs to determine the previous values.
+func (f *Store) flatten(now time.Time, newExp types.Expectations) ([]expectationEntry, []triageChanges) {
+	var entries []expectationEntry
+	var changes []triageChanges
+
+	for testName, digestMap := range newExp {
+		for digest, label := range digestMap {
+			entries = append(entries, expectationEntry{
+				Grouping: testName,
+				Digest:   digest,
+				Label:    label,
+				Updated:  now,
+				Issue:    f.issue,
+			})
+
+			changes = append(changes, triageChanges{
+				// RecordID will be filled out later
+				Grouping:    testName,
+				Digest:      digest,
+				LabelBefore: f.cache.Classification(testName, digest),
+				LabelAfter:  label,
+			})
+		}
+	}
+	return entries, changes
+}
diff --git a/golden/go/expstorage/fs_expstore/fs_expstore_test.go b/golden/go/expstorage/fs_expstore/fs_expstore_test.go
new file mode 100644
index 0000000..58c3426
--- /dev/null
+++ b/golden/go/expstorage/fs_expstore/fs_expstore_test.go
@@ -0,0 +1,261 @@
+package fs_expstore
+
+import (
+	"context"
+	"fmt"
+	"sync"
+	"testing"
+
+	"github.com/google/uuid"
+	assert "github.com/stretchr/testify/require"
+	"go.skia.org/infra/go/firestore"
+	"go.skia.org/infra/go/testutils/unittest"
+	data "go.skia.org/infra/golden/go/testutils/data_three_devices"
+	"go.skia.org/infra/golden/go/types"
+)
+
+// TODO(kjlubick): These tests are marked as manual because the
+// Firestore Emulator is not yet on the bots, due to some more complicated
+// setup (e.g. chmod)
+
+// TestGetExpectations writes some changes and then reads back the
+// aggregated results.
+func TestGetExpectations(t *testing.T) {
+	unittest.ManualTest(t)
+	unittest.RequiresFirestoreEmulator(t)
+
+	c := getTestFirestoreInstance(t)
+
+	f := New(c, MasterBranch, ReadWrite)
+
+	// Brand new instance should have no expectations
+	e, err := f.Get()
+	assert.NoError(t, err)
+	assert.Equal(t, types.Expectations{}, e)
+
+	ctx := context.Background()
+	err = f.AddChange(ctx, types.Expectations{
+		data.AlphaTest: {
+			data.AlphaUntriaged1Digest: types.POSITIVE,
+			data.AlphaGood1Digest:      types.POSITIVE,
+		},
+	}, userOne)
+	assert.NoError(t, err)
+
+	err = f.AddChange(ctx, types.Expectations{
+		data.AlphaTest: {
+			data.AlphaBad1Digest:       types.NEGATIVE,
+			data.AlphaUntriaged1Digest: types.UNTRIAGED, // overwrites previous
+		},
+		data.BetaTest: {
+			data.BetaGood1Digest: types.POSITIVE,
+		},
+	}, userTwo)
+	assert.NoError(t, err)
+
+	e, err = f.Get()
+	assert.NoError(t, err)
+	assert.Equal(t, types.Expectations{
+		data.AlphaTest: {
+			data.AlphaGood1Digest:      types.POSITIVE,
+			data.AlphaBad1Digest:       types.NEGATIVE,
+			data.AlphaUntriaged1Digest: types.UNTRIAGED,
+		},
+		data.BetaTest: {
+			data.BetaGood1Digest: types.POSITIVE,
+		},
+	}, e)
+
+	// Make sure that if we create a new view, we can read the results
+	// from the table to make the expectations
+	fr := New(c, MasterBranch, ReadOnly)
+	e, err = fr.Get()
+	assert.NoError(t, err)
+	assert.Equal(t, types.Expectations{
+		data.AlphaTest: {
+			data.AlphaGood1Digest:      types.POSITIVE,
+			data.AlphaBad1Digest:       types.NEGATIVE,
+			data.AlphaUntriaged1Digest: types.UNTRIAGED,
+		},
+		data.BetaTest: {
+			data.BetaGood1Digest: types.POSITIVE,
+		},
+	}, e)
+}
+
+// TestGetExpectationsRace writes a bunch of data from many go routines
+// in an effort to catch any race conditions in the caching layer.
+func TestGetExpectationsRace(t *testing.T) {
+	unittest.ManualTest(t)
+	unittest.RequiresFirestoreEmulator(t)
+
+	c := getTestFirestoreInstance(t)
+
+	f := New(c, MasterBranch, ReadWrite)
+
+	type entry struct {
+		Grouping types.TestName
+		Digest   types.Digest
+		Label    types.Label
+	}
+
+	entries := []entry{
+		{
+			Grouping: data.AlphaTest,
+			Digest:   data.AlphaUntriaged1Digest,
+			Label:    types.UNTRIAGED,
+		},
+		{
+			Grouping: data.AlphaTest,
+			Digest:   data.AlphaBad1Digest,
+			Label:    types.NEGATIVE,
+		},
+		{
+			Grouping: data.AlphaTest,
+			Digest:   data.AlphaGood1Digest,
+			Label:    types.POSITIVE,
+		},
+		{
+			Grouping: data.BetaTest,
+			Digest:   data.BetaGood1Digest,
+			Label:    types.POSITIVE,
+		},
+		{
+			Grouping: data.BetaTest,
+			Digest:   data.BetaUntriaged1Digest,
+			Label:    types.UNTRIAGED,
+		},
+	}
+
+	ctx := context.Background()
+	wg := sync.WaitGroup{}
+
+	for i := 0; i < 50; i++ {
+		wg.Add(1)
+		go func(i int) {
+			defer wg.Done()
+			e := entries[i%len(entries)]
+			err := f.AddChange(ctx, types.Expectations{
+				e.Grouping: {
+					e.Digest: e.Label,
+				},
+			}, userOne)
+			assert.NoError(t, err)
+		}(i)
+
+		// Make sure we can read and write w/o races
+		if i%5 == 0 {
+			_, err := f.Get()
+			assert.NoError(t, err)
+		}
+	}
+
+	wg.Wait()
+
+	e, err := f.Get()
+	assert.NoError(t, err)
+	assert.Equal(t, types.Expectations{
+		data.AlphaTest: {
+			data.AlphaGood1Digest:      types.POSITIVE,
+			data.AlphaBad1Digest:       types.NEGATIVE,
+			data.AlphaUntriaged1Digest: types.UNTRIAGED,
+		},
+		data.BetaTest: {
+			data.BetaGood1Digest:      types.POSITIVE,
+			data.BetaUntriaged1Digest: types.UNTRIAGED,
+		},
+	}, e)
+}
+
+// TestGetExpectationsBig writes 32^2=1024 entries
+// to test the batch writing.
+func TestGetExpectationsBig(t *testing.T) {
+	unittest.ManualTest(t)
+	unittest.RequiresFirestoreEmulator(t)
+
+	c := getTestFirestoreInstance(t)
+
+	f := New(c, MasterBranch, ReadWrite)
+
+	type entry struct {
+		Grouping types.TestName
+		Digest   types.Digest
+		Label    types.Label
+	}
+
+	// Write the expectations in two, non-overlapping blocks.
+	exp1 := makeBigExpectations(0, 16)
+	exp2 := makeBigExpectations(16, 32)
+
+	expected := exp1.DeepCopy()
+	expected.MergeExpectations(exp2)
+
+	ctx := context.Background()
+	wg := sync.WaitGroup{}
+
+	// Write them concurrently to test for races.
+	wg.Add(2)
+	go func() {
+		defer wg.Done()
+		err := f.AddChange(ctx, exp1, userOne)
+		assert.NoError(t, err)
+	}()
+	go func() {
+		defer wg.Done()
+		err := f.AddChange(ctx, exp2, userTwo)
+		assert.NoError(t, err)
+	}()
+	wg.Wait()
+
+	e, err := f.Get()
+	assert.NoError(t, err)
+	assert.Equal(t, expected, e)
+
+	// Make sure that if we create a new view, we can read the results
+	// from the table to make the expectations
+	fr := New(c, MasterBranch, ReadOnly)
+	e, err = fr.Get()
+	assert.NoError(t, err)
+	assert.Equal(t, expected, e)
+}
+
+// TestReadOnly ensures a read-only instance fails to write data.
+func TestReadOnly(t *testing.T) {
+	unittest.SmallTest(t)
+
+	f := New(nil, MasterBranch, ReadOnly)
+
+	err := f.AddChange(context.Background(), types.Expectations{
+		data.AlphaTest: {
+			data.AlphaGood1Digest: types.POSITIVE,
+		},
+	}, userOne)
+	assert.Error(t, err)
+	assert.Contains(t, err.Error(), "read-only")
+}
+
+// Creates an empty firestore instance. The emulator keeps the tables in ram, but
+// by appending a random nonce, we can be assured the collection we get is empty.
+func getTestFirestoreInstance(t *testing.T) *firestore.Client {
+	randInstance := uuid.New().String()
+	c, err := firestore.NewClient(context.Background(), "should-use-emulator", "gold-test", ExpectationStoreCollection+randInstance, nil)
+	assert.NoError(t, err)
+	return c
+}
+
+// makeBigExpectations makes n tests named from start to end that each have 32 digests.
+func makeBigExpectations(start, end int) types.Expectations {
+	e := types.Expectations{}
+	for i := start; i < end; i++ {
+		for j := 0; j < 32; j++ {
+			e.AddDigest(types.TestName(fmt.Sprintf("test-%03d", i)),
+				types.Digest(fmt.Sprintf("digest-%03d", j)), types.POSITIVE)
+		}
+	}
+	return e
+}
+
+const (
+	userOne = "userOne@example.com"
+	userTwo = "userTwo@example.com"
+)
diff --git a/golden/go/expstorage/types.go b/golden/go/expstorage/types.go
index 5ff8007..739a6d9 100644
--- a/golden/go/expstorage/types.go
+++ b/golden/go/expstorage/types.go
@@ -32,10 +32,14 @@
 type ExpectationsStore interface {
 	// Get the current classifications for image digests. The keys of the
 	// expectations map are the test names.
-	Get() (exp types.Expectations, err error)
+	Get() (types.Expectations, error)
 
 	// AddChange writes the given classified digests to the database and records the
 	// user that made the change.
+	// TODO(kjlubick): This interface leads to a potential race condition if two
+	// users on the front-end click Positive and Negative for the same testname/digest.
+	//  A less racy interface would take an "old value"/"new value" so that if the
+	// old value didn't match, we could reject the change.
 	AddChange(ctx context.Context, changes types.Expectations, userId string) error
 
 	// QueryLog allows to paginate through the changes in the expectations.
diff --git a/golden/go/types/expectations.go b/golden/go/types/expectations.go
index 1353d22..7659417 100644
--- a/golden/go/types/expectations.go
+++ b/golden/go/types/expectations.go
@@ -43,14 +43,7 @@
 			e[testName] = map[Digest]Label{}
 		}
 		for digest, label := range digests {
-			// UNTRIAGED is the default value, so if the passed in version
-			// is explicitly setting a label to UNTRIAGED, we delete what
-			// was already there.
-			if label == UNTRIAGED {
-				delete(e[testName], digest)
-			} else {
-				e[testName][digest] = label
-			}
+			e[testName][digest] = label
 		}
 		// In case we had only assigned UNTRIAGED values
 		if len(e[testName]) == 0 {