|  | package firestore | 
|  |  | 
|  | /* | 
|  | This package provides convenience functions for interacting with Cloud Firestore. | 
|  | */ | 
|  |  | 
|  | import ( | 
|  | "context" | 
|  | "crypto/rand" | 
|  | "errors" | 
|  | "fmt" | 
|  | "os" | 
|  | "runtime" | 
|  | "sort" | 
|  | "strings" | 
|  | "sync" | 
|  | "time" | 
|  |  | 
|  | "cloud.google.com/go/firestore" | 
|  | "github.com/cenkalti/backoff" | 
|  | "github.com/google/uuid" | 
|  | "go.skia.org/infra/go/metrics2" | 
|  | "go.skia.org/infra/go/skerr" | 
|  | "go.skia.org/infra/go/sklog" | 
|  | "go.skia.org/infra/go/sktest" | 
|  | "go.skia.org/infra/go/util" | 
|  | "golang.org/x/oauth2" | 
|  | "google.golang.org/api/iterator" | 
|  | "google.golang.org/api/option" | 
|  | "google.golang.org/grpc/codes" | 
|  | "google.golang.org/grpc/status" | 
|  | ) | 
|  |  | 
|  | const ( | 
|  | // Firestore has a timestamp resolution of one microsecond. | 
|  | TS_RESOLUTION = time.Microsecond | 
|  |  | 
|  | // List all apps here as constants. | 
|  | APP_TASK_SCHEDULER = "task-scheduler" | 
|  |  | 
|  | // Base wait time between attempts. | 
|  | BACKOFF_WAIT = 5 * time.Second | 
|  |  | 
|  | // Project ID. At the moment only the skia-firestore project has | 
|  | // Firestore enabled. | 
|  | FIRESTORE_PROJECT = "skia-firestore" | 
|  |  | 
|  | // List all instances here as constants. | 
|  | INSTANCE_PROD = "prod" | 
|  | INSTANCE_TEST = "test" | 
|  |  | 
|  | // Maximum number of docs in a single transaction. | 
|  | MAX_TRANSACTION_DOCS = 500 | 
|  |  | 
|  | // IterDocs won't iterate for longer than this amount of time at once, | 
|  | // otherwise we risk server timeouts. Instead, it will stop and resume | 
|  | // iteration. | 
|  | MAX_ITER_TIME = 50 * time.Second | 
|  |  | 
|  | // Length of IDs returned by AlphaNumID. | 
|  | ID_LEN = 20 | 
|  |  | 
|  | opTypeRead  = "read" | 
|  | opTypeWrite = "write" | 
|  |  | 
|  | opCountRows    = "rows" | 
|  | opCountQueries = "queries" | 
|  |  | 
|  | alphaNum = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789" | 
|  |  | 
|  | EmulatorEnvVar = "FIRESTORE_EMULATOR_HOST" | 
|  | ) | 
|  |  | 
|  | var ( | 
|  | // We will retry requests which result in these errors. | 
|  | retryErrors = []codes.Code{ | 
|  | codes.DeadlineExceeded, | 
|  | codes.ResourceExhausted, | 
|  | codes.Aborted, | 
|  | codes.Internal, | 
|  | codes.Unavailable, | 
|  | codes.Canceled, | 
|  | } | 
|  |  | 
|  | // errIterTooLong is a special error used in conjunction with | 
|  | // MAX_ITER_TIME and IterDocs to prevent running into server timeouts | 
|  | // when iterating a large number of entries. | 
|  | errIterTooLong = errors.New("iterated too long") | 
|  | ) | 
|  |  | 
|  | // FixTimestamp adjusts the given timestamp for storage in Firestore. Firestore | 
|  | // only supports microsecond precision, and we always want to store UTC. | 
|  | func FixTimestamp(t time.Time) time.Time { | 
|  | return t.UTC().Truncate(TS_RESOLUTION) | 
|  | } | 
|  |  | 
|  | // AlphaNumID generates a fixed-length alphanumeric document ID using | 
|  | // crypto/rand. Panics if crypto/rand fails to generate random bytes, eg. it | 
|  | // cannot read from /dev/urandom. | 
|  | // | 
|  | // Motivation: the Go client library for Firestore generates URL-safe base64- | 
|  | // encoded IDs, whic may not be desirable for all use cases (eg. passing an ID | 
|  | // which may contain a '-' on the command line would require some escaping). | 
|  | func AlphaNumID() string { | 
|  | bytes := make([]byte, ID_LEN) | 
|  | if _, err := rand.Read(bytes); err != nil { | 
|  | panic(fmt.Sprintf("crypto/rand.Read error: %v", err)) | 
|  | } | 
|  | for idx := range bytes { | 
|  | bytes[idx] = alphaNum[bytes[idx]%byte(len(alphaNum))] | 
|  | } | 
|  | return string(bytes) | 
|  | } | 
|  |  | 
|  | // DocumentRefSlice is a slice of DocumentRefs, used for sorting. | 
|  | type DocumentRefSlice []*firestore.DocumentRef | 
|  |  | 
|  | func (s DocumentRefSlice) Len() int           { return len(s) } | 
|  | func (s DocumentRefSlice) Less(i, j int) bool { return s[i].Path < s[j].Path } | 
|  | func (s DocumentRefSlice) Swap(i, j int)      { s[i], s[j] = s[j], s[i] } | 
|  |  | 
|  | // Client is a Cloud Firestore client which enforces separation of app/instance | 
|  | // data via separate collections and documents. All references to collections | 
|  | // and documents are automatically prefixed with the app name as the top-level | 
|  | // collection and instance name as the parent document. | 
|  | type Client struct { | 
|  | *firestore.Client | 
|  | ParentDoc *firestore.DocumentRef | 
|  |  | 
|  | activeOps      map[int64]string | 
|  | activeOpsCount metrics2.Int64Metric | 
|  | activeOpsId    int64 // Incremented every time we run a transaction. | 
|  | activeOpsMtx   sync.RWMutex | 
|  |  | 
|  | // counters is a cache of Metrics2.Counters to the number of operations and queries for reads | 
|  | // and writes. We need to cache it here because multiple calls to the same GetCounter() would | 
|  | // give different pointers to the same underlying int, which is undesirable. | 
|  | counters     map[counterKey]metrics2.Counter | 
|  | countersMtx  sync.Mutex | 
|  | errorMetrics map[string]metrics2.Counter | 
|  | metricTags   map[string]string | 
|  | } | 
|  |  | 
|  | // counterKey is the key to the cache map of metrics counters. | 
|  | type counterKey struct { | 
|  | Operation string | 
|  | Count     string | 
|  | Path      string | 
|  | FileName  string | 
|  | FuncName  string | 
|  | } | 
|  |  | 
|  | // NewClient returns a Cloud Firestore client which enforces separation of app/ | 
|  | // instance data via separate collections and documents. All references to | 
|  | // collections and documents are automatically prefixed with the app name as the | 
|  | // top-level collection and instance name as the parent document. | 
|  | func NewClient(ctx context.Context, project, app, instance string, ts oauth2.TokenSource) (*Client, error) { | 
|  | if project == "" { | 
|  | return nil, errors.New("Project name is required.") | 
|  | } | 
|  | if app == "" { | 
|  | return nil, errors.New("App name is required.") | 
|  | } | 
|  | if instance == "" { | 
|  | return nil, errors.New("Instance name is required.") | 
|  | } | 
|  | client, err := firestore.NewClient(ctx, project, option.WithTokenSource(ts)) | 
|  | if err != nil { | 
|  | return nil, err | 
|  | } | 
|  | metricTags := map[string]string{ | 
|  | "project":  project, | 
|  | "app":      app, | 
|  | "instance": instance, | 
|  | } | 
|  | errorMetrics := make(map[string]metrics2.Counter, len(retryErrors)) | 
|  | for _, code := range retryErrors { | 
|  | errorMetrics[code.String()] = metrics2.GetCounter("firestore_retryable_errors", metricTags, map[string]string{ | 
|  | "error": code.String(), | 
|  | }) | 
|  | } | 
|  |  | 
|  | c := &Client{ | 
|  | Client:         client, | 
|  | ParentDoc:      client.Collection(app).Doc(instance), | 
|  | activeOps:      map[int64]string{}, | 
|  | activeOpsCount: metrics2.GetInt64Metric("firestore_ops_active", metricTags), | 
|  | counters:       map[counterKey]metrics2.Counter{}, | 
|  | errorMetrics:   errorMetrics, | 
|  | metricTags:     metricTags, | 
|  | } | 
|  | go util.RepeatCtx(ctx, time.Minute, func(ctx context.Context) { | 
|  | c.activeOpsMtx.RLock() | 
|  | ids := make([]int64, 0, len(c.activeOps)) | 
|  | for id := range c.activeOps { | 
|  | ids = append(ids, id) | 
|  | } | 
|  | sort.Sort(util.Int64Slice(ids)) | 
|  | ops := strings.Builder{} | 
|  | for _, id := range ids { | 
|  | _, _ = fmt.Fprintf(&ops, "\n%d\t%s", id, c.activeOps[id]) | 
|  | } | 
|  | c.activeOpsMtx.RUnlock() | 
|  | sklog.Debugf("Active operations (%d): %s", len(ids), ops.String()) | 
|  | }) | 
|  | return c, nil | 
|  | } | 
|  |  | 
|  | // NewClientForTesting returns a Client and ensures that it will connect to the | 
|  | // Firestore emulator. The Client's instance name will be randomized to ensure | 
|  | // concurrent tests don't interfere with each other. It also returns a | 
|  | // CleanupFunc that closes the Client. | 
|  | // | 
|  | // This function doesn't call unittest.RequiresFirestoreEmulator(t). See | 
|  | // //go/firestore/testutils for a version of NewClientForTesting that does. | 
|  | func NewClientForTesting(ctx context.Context, t sktest.TestingT) (*Client, util.CleanupFunc) { | 
|  | project := "test-project" | 
|  | app := "NewClientForTesting" | 
|  | instance := fmt.Sprintf("test-%s", uuid.New()) | 
|  | ctx, cancel := context.WithCancel(ctx) | 
|  | c, err := NewClient(ctx, project, app, instance, nil) | 
|  | if err != nil { | 
|  | cancel() | 
|  | t.Fatalf("Error creating test firestore.Client: %s", err) | 
|  | return nil, nil | 
|  | } | 
|  | return c, func() { | 
|  | if err := c.Close(); err != nil { | 
|  | t.Fatalf("Error closing test firestore.Client: %s", err) | 
|  | } | 
|  | cancel() | 
|  | } | 
|  | } | 
|  |  | 
|  | // recordOp adds a transaction to the active transactions map. Returns | 
|  | // a func which should be deferred until the transaction is finished. | 
|  | func (c *Client) recordOp(opName, detail string) func() { | 
|  | t := metrics2.NewTimer("firestore_ops", c.metricTags, map[string]string{ | 
|  | "op": opName, | 
|  | }) | 
|  | c.activeOpsMtx.Lock() | 
|  | defer c.activeOpsMtx.Unlock() | 
|  | id := c.activeOpsId | 
|  | c.activeOps[id] = opName + ": " + detail | 
|  | c.activeOpsId++ | 
|  | c.activeOpsCount.Update(int64(len(c.activeOps))) | 
|  | return func() { | 
|  | c.activeOpsMtx.Lock() | 
|  | defer c.activeOpsMtx.Unlock() | 
|  | delete(c.activeOps, id) | 
|  | c.activeOpsCount.Update(int64(len(c.activeOps))) | 
|  | t.Stop() | 
|  | } | 
|  | } | 
|  |  | 
|  | // getCounterHelper returns a read/write row or query metric for the given path. | 
|  | // The caller should hold c.countersMtx. | 
|  | func (c *Client) getCounterHelper(op, count, path, file, fnName string) metrics2.Counter { | 
|  | key := counterKey{ | 
|  | Operation: op, | 
|  | Count:     count, | 
|  | Path:      path, | 
|  | FileName:  file, | 
|  | FuncName:  fnName, | 
|  | } | 
|  | counter, ok := c.counters[key] | 
|  | if !ok { | 
|  | counter = metrics2.GetCounter("firestore_ops_count", c.metricTags, map[string]string{ | 
|  | "op":    op, | 
|  | "count": count, | 
|  | "path":  path, | 
|  | "file":  file, | 
|  | "func":  fnName, | 
|  | }) | 
|  | c.counters[key] = counter | 
|  | } | 
|  | return counter | 
|  | } | 
|  |  | 
|  | // getCounters returns a read/write row and query metric for the given path. | 
|  | func (c *Client) getCounters(op, path, file, fnName string) (metrics2.Counter, metrics2.Counter) { | 
|  | path = strings.TrimPrefix(path, c.ParentDoc.Path) | 
|  | path = strings.TrimPrefix(path, "/") | 
|  | path = strings.Split(path, "/")[0] | 
|  | c.countersMtx.Lock() | 
|  | defer c.countersMtx.Unlock() | 
|  | return c.getCounterHelper(op, opCountQueries, path, file, fnName), c.getCounterHelper(op, opCountRows, path, file, fnName) | 
|  | } | 
|  |  | 
|  | // countReadRows increments the "rows read" metric counter for a path by the given amount. | 
|  | func (c *Client) countReadRows(path, file, fnName string, count int) { | 
|  | _, rows := c.getCounters(opTypeRead, path, file, fnName) | 
|  | rows.Inc(int64(count)) | 
|  | } | 
|  |  | 
|  | // countReadQuery increments the "read queries" metric counter for a path by one. | 
|  | func (c *Client) countReadQuery(path, file, fnName string) { | 
|  | queries, _ := c.getCounters(opTypeRead, path, file, fnName) | 
|  | queries.Inc(1) | 
|  | } | 
|  |  | 
|  | // CountReadQueryAndRows increments the metric counters for a path. The "read queries" metric will | 
|  | // be incremented by one and the "rows read" by the amount given. This should be done when a client | 
|  | // does some amount of reading from firestore w/o using the helpers in this package (e.g. GetAll). | 
|  | func (c *Client) CountReadQueryAndRows(path string, rowCount int) { | 
|  | file, fnName := callerHelper() | 
|  | c.countReadQueryAndRows(path, file, fnName, rowCount) | 
|  | } | 
|  |  | 
|  | // countReadQueryAndRows is the private version of CountReadQueryAndRows; it takes a file, fnName | 
|  | // that is passed down from the entry-level API call. | 
|  | func (c *Client) countReadQueryAndRows(path, file, fnName string, rowCount int) { | 
|  | queries, rows := c.getCounters(opTypeRead, path, file, fnName) | 
|  | queries.Inc(1) | 
|  | rows.Inc(int64(rowCount)) | 
|  | } | 
|  |  | 
|  | // CountWriteQueryAndRows increments the metric counters for a path. The "write queries" metric will | 
|  | // be incremented by one and the "rows written" by the amount given. This should be done when a | 
|  | // client does some amount of writing to firestore w/o using the helpers in this package (or uses | 
|  | // BatchWrite). | 
|  | func (c *Client) CountWriteQueryAndRows(path string, rowCount int) { | 
|  | file, fnName := callerHelper() | 
|  | c.countWriteQueryAndRows(path, file, fnName, rowCount) | 
|  | } | 
|  |  | 
|  | // countWriteQueryAndRows is the private version of CountWriteQueryAndRows; it takes a file, fnName | 
|  | // that is passed down from the entry-level API call. | 
|  | func (c *Client) countWriteQueryAndRows(path, file, fnName string, rowCount int) { | 
|  | queries, rows := c.getCounters(opTypeWrite, path, file, fnName) | 
|  | queries.Inc(1) | 
|  | rows.Inc(int64(rowCount)) | 
|  | } | 
|  |  | 
|  | // See documentation for firestore.Client. | 
|  | func (c *Client) Collection(path string) *firestore.CollectionRef { | 
|  | return c.ParentDoc.Collection(path) | 
|  | } | 
|  |  | 
|  | // See documentation for firestore.Client. | 
|  | func (c *Client) Collections(ctx context.Context) *firestore.CollectionIterator { | 
|  | return c.ParentDoc.Collections(ctx) | 
|  | } | 
|  |  | 
|  | // See documentation for firestore.Client. | 
|  | func (c *Client) Doc(path string) *firestore.DocumentRef { | 
|  | split := strings.Split(path, "/") | 
|  | if len(split) < 2 { | 
|  | return nil | 
|  | } | 
|  | return c.ParentDoc.Collection(split[0]).Doc(strings.Join(split[1:], "/")) | 
|  | } | 
|  |  | 
|  | // withTimeout runs the given function with the given timeout. | 
|  | func withTimeout(ctx context.Context, timeout time.Duration, fn func(context.Context) error) error { | 
|  | ctx, cancel := context.WithTimeout(ctx, timeout) | 
|  | defer cancel() | 
|  | return fn(ctx) | 
|  | } | 
|  |  | 
|  | // withTimeoutAndRetries runs the given function with the given timeout and a | 
|  | // maximum of the given number of attempts. The timeout is applied for each | 
|  | // attempt. | 
|  | func (c *Client) withTimeoutAndRetries(ctx context.Context, attempts int, timeout time.Duration, fn func(context.Context) error) error { | 
|  | var err error | 
|  | for i := 0; i < attempts; i++ { | 
|  | // Do not retry on e.g. a cancelled context. | 
|  | if ctx.Err() != nil { | 
|  | return ctx.Err() | 
|  | } | 
|  |  | 
|  | err = withTimeout(ctx, timeout, fn) | 
|  | unwrapped := skerr.Unwrap(err) | 
|  | if err == nil { | 
|  | return nil | 
|  | } else if ctx.Err() != nil { | 
|  | // Do not retry if the passed-in context is expired. | 
|  | return ctx.Err() | 
|  | } else if st, ok := status.FromError(unwrapped); ok { | 
|  | // Retry if we encountered a retryable error code. | 
|  | code := st.Code() | 
|  | retry := false | 
|  | for _, retryCode := range retryErrors { | 
|  | if code == retryCode { | 
|  | retry = true | 
|  | c.errorMetrics[code.String()].Inc(1) | 
|  | break | 
|  | } | 
|  | } | 
|  | if !retry { | 
|  | return err | 
|  | } | 
|  | } else if unwrapped != context.DeadlineExceeded { | 
|  | // withTimeout uses context.WithDeadline to implement | 
|  | // timeouts, therefore we may have received the | 
|  | // DeadlineExceeded error from that context, while the | 
|  | // passed-in parent context is still valid. We want to | 
|  | // retry in that case, otherwise we stop here. | 
|  | return err | 
|  | } | 
|  | wait := BACKOFF_WAIT * time.Duration(2^i) | 
|  | sklog.Errorf("Encountered Firestore error; retrying in %s: %s", wait, err) | 
|  | time.Sleep(wait) | 
|  | } | 
|  | // Note that we could collect the errors using multierror, but that | 
|  | // would break some behavior which relies on pointer equality | 
|  | // (eg. err == ErrConcurrentUpdate). | 
|  | return err | 
|  | } | 
|  |  | 
|  | // Get retrieves the given document, using the given timeout and maximum number | 
|  | // of attempts. Returns (nil, nil) if the document does not exist. Uses the | 
|  | // given maximum number of attempts and the given per-attempt timeout. | 
|  | func (c *Client) Get(ctx context.Context, ref *firestore.DocumentRef, attempts int, timeout time.Duration) (*firestore.DocumentSnapshot, error) { | 
|  | file, fnName := callerHelper() | 
|  | defer c.recordOp("Get", ref.Path)() | 
|  | var doc *firestore.DocumentSnapshot | 
|  | err := c.withTimeoutAndRetries(ctx, attempts, timeout, func(ctx context.Context) error { | 
|  | c.countReadQueryAndRows(ref.Path, file, fnName, 1) | 
|  | got, err := ref.Get(ctx) | 
|  | if err == nil { | 
|  | doc = got | 
|  | } | 
|  | return err | 
|  | }) | 
|  | return doc, err | 
|  | } | 
|  |  | 
|  | // iterDocsInner is a helper function used by IterDocs which facilitates testing. | 
|  | func (c *Client) iterDocsInner(ctx context.Context, query firestore.Query, attempts int, timeout time.Duration, file, fnName string, callback func(*firestore.DocumentSnapshot) error, ranTooLong func(time.Time) bool) (int, error) { | 
|  | numRestarts := 0 | 
|  | var lastSeen *firestore.DocumentSnapshot | 
|  | for { | 
|  | started := time.Now() | 
|  | err := c.withTimeoutAndRetries(ctx, attempts, timeout, func(ctx context.Context) error { | 
|  | q := query | 
|  | if lastSeen != nil { | 
|  | q = q.StartAfter(lastSeen) | 
|  | } | 
|  | it := q.Documents(ctx) | 
|  | defer it.Stop() | 
|  | first := true | 
|  | for { | 
|  | doc, err := it.Next() | 
|  | if err == iterator.Done { | 
|  | break | 
|  | } else if err != nil { | 
|  | return err | 
|  | } | 
|  | // Query doesn't have a path associated with it, but we'd like to | 
|  | // record metrics. Use the path of the parent of the first found doc. | 
|  | if first { | 
|  | c.countReadQueryAndRows(doc.Ref.Parent.Path, file, fnName, 1) | 
|  | first = false | 
|  | } else { | 
|  | c.countReadRows(doc.Ref.Parent.Path, file, fnName, 1) | 
|  | } | 
|  | if err := callback(doc); err != nil { | 
|  | return err | 
|  | } | 
|  | lastSeen = doc | 
|  | if ranTooLong(started) { | 
|  | sklog.Debugf("Iterated for longer than %s; pausing to avoid timeouts.", MAX_ITER_TIME) | 
|  | return errIterTooLong | 
|  | } | 
|  | } | 
|  | return nil | 
|  | }) | 
|  | if err == nil { | 
|  | return numRestarts, nil | 
|  | } else if err != errIterTooLong { | 
|  | return numRestarts, err | 
|  | } | 
|  | numRestarts++ | 
|  | sklog.Debugf("Resuming iteration after %s", lastSeen.Ref.Path) | 
|  | } | 
|  | } | 
|  |  | 
|  | // IterDocs is a convenience function which executes the given query and calls | 
|  | // the given callback function for each document. Uses the given maximum number | 
|  | // of attempts and the given per-attempt timeout. IterDocs automatically stops | 
|  | // iterating after enough time has passed and re-issues the query, continuing | 
|  | // where it left off. This is to avoid server-side timeouts resulting from | 
|  | // iterating a large number of results. Note that this behavior may result in | 
|  | // individual results coming from inconsistent snapshots. | 
|  | func (c *Client) IterDocs(ctx context.Context, name, detail string, query firestore.Query, attempts int, timeout time.Duration, callback func(*firestore.DocumentSnapshot) error) error { | 
|  | file, fnName := callerHelper() | 
|  | return c.iterDocs(ctx, name, detail, file, fnName, query, attempts, timeout, callback) | 
|  | } | 
|  |  | 
|  | // iterDocs is the private version of IterDocs; it takes a file, fnName that is passed down from | 
|  | // the entry-level API call. | 
|  | func (c *Client) iterDocs(ctx context.Context, name, detail, file, fnName string, query firestore.Query, attempts int, timeout time.Duration, callback func(*firestore.DocumentSnapshot) error) error { | 
|  | defer c.recordOp(name, detail)() | 
|  | _, err := c.iterDocsInner(ctx, query, attempts, timeout, file, fnName, callback, func(started time.Time) bool { | 
|  | return time.Now().Sub(started) > MAX_ITER_TIME | 
|  | }) | 
|  | return err | 
|  | } | 
|  |  | 
|  | // IterDocsInParallel is a convenience function which executes the given queries | 
|  | // in multiple goroutines and calls the given callback function for each | 
|  | // document. Uses the maximum number of attempts and the given per-attempt | 
|  | // timeout for each goroutine. Each callback includes the goroutine index. | 
|  | // IterDocsInParallel automatically stops iterating after enough time has passed | 
|  | // and re-issues the query, continuing where it left off. This is to avoid | 
|  | // server-side timeouts resulting from iterating a large number of results. Note | 
|  | // that this behavior may result in individual results coming from inconsistent | 
|  | // snapshots. | 
|  | func (c *Client) IterDocsInParallel(ctx context.Context, name, detail string, queries []firestore.Query, attempts int, timeout time.Duration, callback func(int, *firestore.DocumentSnapshot) error) error { | 
|  | file, fnName := callerHelper() | 
|  | var wg sync.WaitGroup | 
|  | errs := make([]error, len(queries)) | 
|  | for idx, query := range queries { | 
|  | wg.Add(1) | 
|  | go func(idx int, query firestore.Query) { | 
|  | defer wg.Done() | 
|  | errs[idx] = c.iterDocs(ctx, name, fmt.Sprintf("%s (shard %d)", detail, idx), file, fnName, query, attempts, timeout, func(doc *firestore.DocumentSnapshot) error { | 
|  | return callback(idx, doc) | 
|  | }) | 
|  | }(idx, query) | 
|  | } | 
|  | wg.Wait() | 
|  | for _, err := range errs { | 
|  | if err != nil { | 
|  | return err | 
|  | } | 
|  | } | 
|  | return nil | 
|  | } | 
|  |  | 
|  | // RunTransaction runs the given function in a transaction. Uses the given | 
|  | // maximum number of attempts and the given per-attempt timeout. | 
|  | func (c *Client) RunTransaction(ctx context.Context, name, detail string, attempts int, timeout time.Duration, fn func(context.Context, *firestore.Transaction) error) error { | 
|  | defer c.recordOp(name, detail)() | 
|  | return c.withTimeoutAndRetries(ctx, attempts, timeout, func(ctx context.Context) error { | 
|  | return c.Client.RunTransaction(ctx, fn) | 
|  | }) | 
|  | } | 
|  |  | 
|  | // See documentation for firestore.DocumentRef.Create(). Uses the given maximum | 
|  | // number of attempts and the given per-attempt timeout. | 
|  | func (c *Client) Create(ctx context.Context, ref *firestore.DocumentRef, data interface{}, attempts int, timeout time.Duration) (*firestore.WriteResult, error) { | 
|  | file, fnName := callerHelper() | 
|  | defer c.recordOp("Create", ref.Path)() | 
|  | var wr *firestore.WriteResult | 
|  | err := c.withTimeoutAndRetries(ctx, attempts, timeout, func(ctx context.Context) error { | 
|  | c.countWriteQueryAndRows(ref.Path, file, fnName, 1) | 
|  | var err error | 
|  | wr, err = ref.Create(ctx, data) | 
|  | return err | 
|  | }) | 
|  | return wr, err | 
|  | } | 
|  |  | 
|  | // See documentation for firestore.DocumentRef.Set(). Uses the given maximum | 
|  | // number of attempts and the given per-attempt timeout. | 
|  | func (c *Client) Set(ctx context.Context, ref *firestore.DocumentRef, data interface{}, attempts int, timeout time.Duration, opts ...firestore.SetOption) (*firestore.WriteResult, error) { | 
|  | file, fnName := callerHelper() | 
|  | defer c.recordOp("Set", ref.Path)() | 
|  | var wr *firestore.WriteResult | 
|  | err := c.withTimeoutAndRetries(ctx, attempts, timeout, func(ctx context.Context) error { | 
|  | c.countWriteQueryAndRows(ref.Path, file, fnName, 1) | 
|  | var err error | 
|  | wr, err = ref.Set(ctx, data, opts...) | 
|  | return err | 
|  | }) | 
|  | return wr, err | 
|  | } | 
|  |  | 
|  | // See documentation for firestore.DocumentRef.Update(). Uses the given maximum | 
|  | // number of attempts and the given per-attempt timeout. | 
|  | func (c *Client) Update(ctx context.Context, ref *firestore.DocumentRef, attempts int, timeout time.Duration, updates []firestore.Update, preconds ...firestore.Precondition) (*firestore.WriteResult, error) { | 
|  | file, fnName := callerHelper() | 
|  | defer c.recordOp("Update", ref.Path)() | 
|  | var wr *firestore.WriteResult | 
|  | err := c.withTimeoutAndRetries(ctx, attempts, timeout, func(ctx context.Context) error { | 
|  | c.countWriteQueryAndRows(ref.Path, file, fnName, 1) | 
|  | var err error | 
|  | wr, err = ref.Update(ctx, updates, preconds...) | 
|  | return err | 
|  | }) | 
|  | return wr, err | 
|  | } | 
|  |  | 
|  | // See documentation for firestore.DocumentRef.Delete(). Uses the given maximum | 
|  | // number of attempts and the given per-attempt timeout. | 
|  | func (c *Client) Delete(ctx context.Context, ref *firestore.DocumentRef, attempts int, timeout time.Duration, preconds ...firestore.Precondition) (*firestore.WriteResult, error) { | 
|  | file, fnName := callerHelper() | 
|  | return c.delete(ctx, ref, attempts, timeout, file, fnName, preconds...) | 
|  | } | 
|  |  | 
|  | // delete is the private version of Delete; it takes a file, fnName that is passed down from | 
|  | // the entry-level API call. | 
|  | func (c *Client) delete(ctx context.Context, ref *firestore.DocumentRef, attempts int, timeout time.Duration, file, fnName string, preconds ...firestore.Precondition) (*firestore.WriteResult, error) { | 
|  | defer c.recordOp("Delete", ref.Path)() | 
|  | var wr *firestore.WriteResult | 
|  | err := c.withTimeoutAndRetries(ctx, attempts, timeout, func(ctx context.Context) error { | 
|  | c.countWriteQueryAndRows(ref.Path, file, fnName, 1) | 
|  | var err error | 
|  | wr, err = ref.Delete(ctx, preconds...) | 
|  | return err | 
|  | }) | 
|  | return wr, err | 
|  | } | 
|  |  | 
|  | // RecurseDocs runs the given func for every descendent of the given document. | 
|  | // This includes missing documents, ie. those which do not exist but have sub- | 
|  | // documents. The func is run for leaf documents before their parents. This | 
|  | // function does nothing to account for documents which may be added or modified | 
|  | // while it is running. | 
|  | func (c *Client) RecurseDocs(ctx context.Context, name string, ref *firestore.DocumentRef, attempts int, timeout time.Duration, fn func(*firestore.DocumentRef) error) error { | 
|  | file, fnName := callerHelper() | 
|  | defer c.recordOp(name, ref.Path)() | 
|  | return c.recurseDocs(ctx, ref, attempts, timeout, file, fnName, fn) | 
|  | } | 
|  |  | 
|  | // recurseDocs is a recursive helper function used by RecurseDocs. | 
|  | func (c *Client) recurseDocs(ctx context.Context, ref *firestore.DocumentRef, attempts int, timeout time.Duration, file, fnName string, fn func(*firestore.DocumentRef) error) error { | 
|  | // The Firestore emulator does not correctly handle subcollection queries. | 
|  | EnsureNotEmulator() | 
|  | // TODO(borenet): Should we pause and resume like we do in IterDocs? | 
|  | colls := map[string]*firestore.CollectionRef{} | 
|  | if err := c.withTimeoutAndRetries(ctx, attempts, timeout, func(ctx context.Context) error { | 
|  | c.countReadQuery(ref.Path, file, fnName) | 
|  | it := ref.Collections(ctx) | 
|  | for { | 
|  | coll, err := it.Next() | 
|  | if err == iterator.Done { | 
|  | break | 
|  | } else if err != nil { | 
|  | return err | 
|  | } | 
|  | c.countReadRows(ref.Path, file, fnName, 1) | 
|  | colls[coll.Path] = coll | 
|  | } | 
|  | return nil | 
|  | }); err != nil { | 
|  | return err | 
|  | } | 
|  | for _, coll := range colls { | 
|  | if err := c.withTimeoutAndRetries(ctx, attempts, timeout, func(ctx context.Context) error { | 
|  | c.countReadQuery(ref.Path, file, fnName) | 
|  | it := coll.DocumentRefs(ctx) | 
|  | for { | 
|  | doc, err := it.Next() | 
|  | if err == iterator.Done { | 
|  | break | 
|  | } else if err != nil { | 
|  | return err | 
|  | } | 
|  | c.countReadRows(ref.Path, file, fnName, 1) | 
|  | if err := c.recurseDocs(ctx, doc, attempts, timeout, file, fnName, fn); err != nil { | 
|  | return err | 
|  | } | 
|  | } | 
|  | return nil | 
|  | }); err != nil { | 
|  | return err | 
|  | } | 
|  | } | 
|  | return fn(ref) | 
|  | } | 
|  |  | 
|  | // GetAllDescendantDocuments returns a slice of DocumentRefs for every | 
|  | // descendant of the given Document. This includes missing documents, ie. those | 
|  | // which do not exist but have sub-documents. This function does nothing to | 
|  | // account for documents which may be added or modified while it is running. | 
|  | func (c *Client) GetAllDescendantDocuments(ctx context.Context, ref *firestore.DocumentRef, attempts int, timeout time.Duration) ([]*firestore.DocumentRef, error) { | 
|  | file, fnName := callerHelper() | 
|  | defer c.recordOp("GetAllDescendantDocuments", ref.Path)() | 
|  | rv := []*firestore.DocumentRef{} | 
|  | if err := c.recurseDocs(ctx, ref, attempts, timeout, file, fnName, func(doc *firestore.DocumentRef) error { | 
|  | // Don't include the passed-in doc. | 
|  | if doc.Path != ref.Path { | 
|  | rv = append(rv, doc) | 
|  | } | 
|  | return nil | 
|  | }); err != nil { | 
|  | return nil, err | 
|  | } | 
|  | sort.Sort(DocumentRefSlice(rv)) | 
|  | return rv, nil | 
|  | } | 
|  |  | 
|  | // RecursiveDelete deletes the given document and all of its descendant | 
|  | // documents and collections. The given maximum number of attempts and the given | 
|  | // per-attempt timeout apply for each delete operation, as opposed to the whole | 
|  | // series of operations. This function does nothing to account for documents | 
|  | // which may be added or modified while it is running. | 
|  | func (c *Client) RecursiveDelete(ctx context.Context, ref *firestore.DocumentRef, attempts int, timeout time.Duration) error { | 
|  | file, fnName := callerHelper() | 
|  | defer c.recordOp("RecursiveDelete", ref.Path)() | 
|  | return c.recurseDocs(ctx, ref, attempts, timeout, file, fnName, func(ref *firestore.DocumentRef) error { | 
|  | _, err := c.delete(ctx, ref, attempts, timeout, file, fnName) | 
|  | return err | 
|  | }) | 
|  | } | 
|  |  | 
|  | // EnsureNotEmulator will panic if it detects the Firestore Emulator is configured. | 
|  | // Trying to authenticate to the emulator results in errors like: | 
|  | // "Failed to initialize Cloud Datastore: dialing: options.WithoutAuthentication | 
|  | // is incompatible with any option that provides credentials" | 
|  | func EnsureNotEmulator() { | 
|  | if os.Getenv(EmulatorEnvVar) != "" { | 
|  | panic("Firestore Emulator detected. Be sure to unset the following environment variable: " + EmulatorEnvVar) | 
|  | } | 
|  | } | 
|  |  | 
|  | // QuerySnapshotChannel is a helper for firestore.QuerySnapshotIterator which | 
|  | // passes each QuerySnapshot along the returned channel. QuerySnapshotChannel | 
|  | // returns the channel immediately but spins up a goroutine which runs | 
|  | // indefinitely or until an error occurs, in which case the channel is closed | 
|  | // and an error is logged. | 
|  | // | 
|  | // A couple of notes: | 
|  | // | 
|  | //  1. QuerySnapshotIterator immediately produces a QuerySnapshot containing all | 
|  | //     of the current results for the query, then blocks until those results | 
|  | //     change. QuerySnapshot.Changes contains the changes since the last snapshot | 
|  | //     (and will therefore be empty on the first snapshot), while its Documents | 
|  | //     field is an iterator which will obtain all of the updated results. | 
|  | //  2. If the consumer of the QuerySnapshotChannel is slower than the | 
|  | //     QuerySnapshotIterator, the most recent snapshot will get stuck waiting to | 
|  | //     be passed along the channel and thus may be out of date by the time the | 
|  | //     consumer sees it. If your consumer is slow, consider adding a buffered | 
|  | //     channel as an intermediate, or a goroutine to collect batches of snapshots | 
|  | //     to be processed. | 
|  | func QuerySnapshotChannel(ctx context.Context, q firestore.Query) <-chan *firestore.QuerySnapshot { | 
|  | ch := make(chan *firestore.QuerySnapshot) | 
|  | go func() { | 
|  | iter := q.Snapshots(ctx) | 
|  | for { | 
|  | qsnap, err := iter.Next() | 
|  | if err != nil { | 
|  | if ctx.Err() == context.Canceled { | 
|  | sklog.Warningf("Context canceled; closing QuerySnapshotChannel: %s", err) | 
|  | } else if st, ok := status.FromError(err); ok && st.Code() == codes.Canceled { | 
|  | sklog.Warningf("Context canceled; closing QuerySnapshotChannel: %s", err) | 
|  | } else { | 
|  | sklog.Errorf("QuerySnapshotIterator returned error; closing QuerySnapshotChannel: %s", err) | 
|  | } | 
|  | iter.Stop() | 
|  | close(ch) | 
|  | return | 
|  | } | 
|  | ch <- qsnap | 
|  | } | 
|  | }() | 
|  | return ch | 
|  | } | 
|  |  | 
|  | // BatchWrite executes breaks a large amount of writes into batches of the given size and commits | 
|  | // them, using retries that backoff exponentially up to the maxWriteTime. If a single batch fails, | 
|  | // even with backoff, an error is returned, without attempting any further batches and without | 
|  | // rolling back the previous successful batches (at present, rollback of previous batches is | 
|  | // impossible to do correctly in the general case). Callers should also call CountWriteQueryAndRows | 
|  | // to update counts of the affected collection(s). | 
|  | func (c *Client) BatchWrite(ctx context.Context, total, batchSize int, maxWriteTime time.Duration, startBatch *firestore.WriteBatch, fn func(b *firestore.WriteBatch, i int) error) error { | 
|  | if batchSize > MAX_TRANSACTION_DOCS { | 
|  | return skerr.Fmt("Batch size %d exceeds the Firestore maximum of %d", batchSize, MAX_TRANSACTION_DOCS) | 
|  | } | 
|  | if startBatch == nil && total == 0 { | 
|  | // Unless the user has given us a startBatch, having 0 elements to write is a no-op. | 
|  | // Otherwise, we want to iterate over our 0 elements and then Commit() the startBatch | 
|  | // using the exponential backoff. | 
|  | return nil | 
|  | } | 
|  | b := startBatch | 
|  | if b == nil { | 
|  | b = c.Batch() | 
|  | } | 
|  | return util.ChunkIter(total, batchSize, func(start, stop int) error { | 
|  | for i := start; i < stop; i++ { | 
|  | if err := ctx.Err(); err != nil { | 
|  | // context has been cancelled or otherwise ended - stop work. | 
|  | return err | 
|  | } | 
|  | if err := fn(b, i); err != nil { | 
|  | return err | 
|  | } | 
|  | } | 
|  |  | 
|  | exp := &backoff.ExponentialBackOff{ | 
|  | InitialInterval:     time.Second, | 
|  | RandomizationFactor: 0.5, | 
|  | Multiplier:          2, | 
|  | MaxInterval:         maxWriteTime / 4, | 
|  | MaxElapsedTime:      maxWriteTime, | 
|  | Clock:               backoff.SystemClock, | 
|  | } | 
|  |  | 
|  | o := func() error { | 
|  | // If the context is bad, retrying won't help, so just end. | 
|  | if err := ctx.Err(); err != nil { | 
|  | return backoff.Permanent(err) | 
|  | } | 
|  | _, err := b.Commit(ctx) | 
|  | return err | 
|  | } | 
|  |  | 
|  | if err := backoff.Retry(o, exp); err != nil { | 
|  | // We really hope this doesn't happen, as it may leave the data in a partially | 
|  | // broken state. | 
|  | return skerr.Wrapf(err, "committing batch [%d, %d], even with exponential retry", start, stop) | 
|  | } | 
|  | // Go on to the next batch, if needed. | 
|  | if stop < total { | 
|  | b = c.Batch() | 
|  | } | 
|  | return nil | 
|  | }) | 
|  | } | 
|  |  | 
|  | // callerHelper returns the file and function name of the caller's caller. This should only be used | 
|  | // in functions that are in the public API (so that the returned function/file are outside of this | 
|  | // package). | 
|  | func callerHelper() (string, string) { | 
|  | file, fnName := "", "" | 
|  | if pc, f, _, ok := runtime.Caller(2); ok { | 
|  | file = f | 
|  | fn := runtime.FuncForPC(pc) | 
|  | if fn != nil { | 
|  | fnName = fn.Name() | 
|  | } | 
|  | } | 
|  | return file, fnName | 
|  | } |