| package ds_expstore |
| |
| import ( |
| "context" |
| "fmt" |
| "sort" |
| "strconv" |
| "sync" |
| |
| "cloud.google.com/go/datastore" |
| "go.skia.org/infra/go/ds" |
| "go.skia.org/infra/go/eventbus" |
| "go.skia.org/infra/go/sklog" |
| "go.skia.org/infra/go/util" |
| "go.skia.org/infra/golden/go/dsutil" |
| "go.skia.org/infra/golden/go/expstorage" |
| "go.skia.org/infra/golden/go/types" |
| "golang.org/x/sync/errgroup" |
| ) |
| |
| // DSExpStore implements the ExpectationsStore interface with the |
| // Google Cloud Datastore as its backend. |
| // Since the difference between storing expectations for the master branch and |
| // the tryjobs of a Gerrit CL is only the inclusion of an issue id, it also |
| // simultaneously supports storing expectations for Gerrit issues using |
| // the same interface and based on the same storage client. |
| // |
| // To separate concerns, we store overall expectations (i.e. for the master branch) |
| // in the ds.MASTER_EXP_CHANGE and ds.MASTER_TEST_DIGEST_EXP entities. |
| // Expectations for Gerrit issues are stored in the ds.TRYJOB_EXP_CHANGE and |
| // ds.TRYJOB_TEST_DIGEST_EXP entities. |
| // |
| // We use instances of TDESlice to record both, expectations and expectation changes. |
| // These are usually stored as child entities. |
| // |
| // Expectation changes are stored as immutable, timestamped instances of |
| // 'ExpChange', which then act as parents to instances of TDESlice. |
| // |
| // Expectations are stored as children of summary entities. We maintain one |
| // summary entity for each logical expectation store, i.e. one for the master |
| // and one for each Gerrit issue. |
| // The summary entities also keep track of recently added expectation changes |
| // to provide a consistent listing. |
| // |
| // When expectation change, events of type EV_EXPSTORAGE_CHANGED and |
| // EV_TRYJOB_EXP_CHANGED are fired for the master branch and Gerrit issues |
| // respectively. |
| // Both events contain instances of EventExpectationChange as their payload. |
| // |
| type DSExpStore struct { |
| // issueID is the id of the Gerrit issue and <0 for other expectations, |
| // i.e. the master branch |
| issueID int64 |
| |
| client *datastore.Client |
| eventBus eventbus.EventBus |
| |
| // recentKeysList keeps track of recently added changes. This allows to |
| // provide a consistent listing of changes. |
| recentKeysList *dsutil.RecentKeysList |
| |
| // summaryKey is the key of the summary entity which stores the keys of |
| // recent changes and acts as the parent entity for overall expectations. |
| summaryKey *datastore.Key |
| |
| expectationsKey *datastore.Key |
| blobStore *dsutil.BlobStore |
| |
| // Use different entities depending on whether this manages expectations |
| // for the master or a Gerrit issue |
| changeKind ds.Kind |
| |
| // eventExpChange keeps track of which event to fire when the expectations change. |
| eventExpChange string |
| |
| // globalEvent keeps track whether we want to send a events within this instance |
| // or on a global event bus. |
| globalEvent bool |
| |
| // lastTS and tsMutex ensure that we get distinct timestamps with ms granularity |
| lastTS int64 |
| tsMutex sync.Mutex |
| } |
| |
| // ExpChange is used to store an expectation change in the database. Each |
| // expectation change is an atomic change to expectations for an issue. |
| // The actual expectations are captured in instances of TestDigestExp. |
| type ExpChange struct { |
| ChangeID *datastore.Key `datastore:"__key__"` |
| IssueID int64 |
| UserID string |
| TimeStamp int64 `datastore:",noindex"` |
| Count int64 `datastore:",noindex"` |
| UndoChangeID int64 |
| OK bool |
| ExpectationsBlob *datastore.Key `datastore:",noindex"` |
| } |
| |
| // expectationsState stores the state of expectations for either master or a Gerrit issue. |
| type expectationsState struct { |
| ExpectationsBlob *datastore.Key // key of the blob that stores expectations |
| } |
| |
| // DeprecatedNew returns an ExpectationsStore implementation based on |
| // Cloud Datastore for the master branch and a factory to create ExpectationsStore |
| // instances for Gerrit issues. The factory uses the same datastore client as the |
| // master store. |
| func DeprecatedNew(client *datastore.Client, eventBus eventbus.EventBus) (*DSExpStore, error) { |
| if client == nil { |
| return nil, sklog.FmtErrorf("Received nil for datastore client.") |
| } |
| |
| // Create the instance for the master and set the target entities for the |
| // master branch. |
| summaryKey := ds.NewKey(ds.HELPER_RECENT_KEYS) |
| summaryKey.Name = "expstorage-recent-keys-master" |
| expectationsKey := ds.NewKey(ds.EXPECTATIONS_BLOB_ROOT) |
| expectationsKey.Name = "expstorage-expectations-master" |
| blobStore := dsutil.NewBlobStore(client, ds.EXPECTATIONS_BLOB_ROOT, ds.EXPECTATIONS_BLOB) |
| |
| store := &DSExpStore{ |
| issueID: expstorage.MasterBranch, |
| changeKind: ds.MASTER_EXP_CHANGE, |
| eventExpChange: expstorage.EV_EXPSTORAGE_CHANGED, |
| globalEvent: true, |
| client: client, |
| eventBus: eventBus, |
| summaryKey: summaryKey, |
| expectationsKey: expectationsKey, |
| recentKeysList: dsutil.NewRecentKeysList(client, summaryKey, dsutil.DefaultConsistencyDelta), |
| blobStore: blobStore, |
| } |
| |
| // Check the connection to the cloud datastore and if we could load the |
| // expectations successfully. |
| _, _, err := store.loadCurrentExpectations(nil) |
| if err != nil { |
| return nil, sklog.FmtErrorf("Error in test call to the cloud datastore: %s", err) |
| } |
| return store, nil |
| } |
| |
| // ForIssue implements the ExpectationsStore interface. |
| func (c *DSExpStore) ForIssue(issueID int64) expstorage.ExpectationsStore { |
| summaryKey := ds.NewKey(ds.HELPER_RECENT_KEYS) |
| summaryKey.Name = fmt.Sprintf("expstorage-issue-%d", issueID) |
| expectationsKey := ds.NewKey(ds.EXPECTATIONS_BLOB_ROOT) |
| expectationsKey.Name = fmt.Sprintf("expstorage-expectations-issue-%d", issueID) |
| return &DSExpStore{ |
| issueID: issueID, |
| changeKind: ds.TRYJOB_EXP_CHANGE, |
| eventExpChange: expstorage.EV_TRYJOB_EXP_CHANGED, |
| globalEvent: false, |
| client: c.client, |
| eventBus: c.eventBus, |
| summaryKey: summaryKey, |
| expectationsKey: expectationsKey, |
| recentKeysList: dsutil.NewRecentKeysList(c.client, summaryKey, dsutil.DefaultConsistencyDelta), |
| blobStore: c.blobStore, |
| } |
| } |
| |
| // Get implements the ExpectationsStore interface. |
| func (c *DSExpStore) Get() (types.Expectations, error) { |
| expectations, _, err := c.loadCurrentExpectations(nil) |
| if err != nil { |
| return nil, sklog.FmtErrorf("Error retrieving expectations: %s", err) |
| } |
| return expectations, nil |
| } |
| |
| // AddChange implements the ExpectationsStore interface. |
| func (c *DSExpStore) AddChange(ctx context.Context, changes types.Expectations, userID string) error { |
| _, err := c.makeChange(ctx, changes, userID, c.getUniqueTimeStampMs(), 0, true) |
| return err |
| } |
| |
| // ImportChange bypasses the ExpectationStore interface to copy change records directly. |
| func (c *DSExpStore) ImportChange(ctx context.Context, changes types.Expectations, userID string, timeStamp int64) (*datastore.Key, error) { |
| return c.makeChange(ctx, changes, userID, timeStamp, 0, false) |
| } |
| |
| // QueryLog implements the ExpectationsStore interface. |
| func (c *DSExpStore) QueryLog(ctx context.Context, offset, size int, details bool) ([]expstorage.TriageLogEntry, int, error) { |
| allKeys, err := c.getExpChangeKeys(ctx, 0) |
| if err != nil { |
| return nil, 0, sklog.FmtErrorf("Error retrieving keys for expectation changes: %s", err) |
| } |
| |
| if offset < 0 { |
| offset = 0 |
| } |
| |
| if size <= 0 { |
| size = len(allKeys) |
| } |
| |
| start := util.MinInt(offset, len(allKeys)) |
| end := util.MinInt(start+size, len(allKeys)) |
| retKeys := allKeys[start:end] |
| |
| ret := make([]expstorage.TriageLogEntry, 0, len(retKeys)) |
| expChanges := make([]*ExpChange, len(retKeys)) |
| if err := c.client.GetMulti(ctx, retKeys, expChanges); err != nil { |
| return nil, 0, sklog.FmtErrorf("Error retrieving expectation changes: %s", err) |
| } |
| |
| for _, change := range expChanges { |
| ret = append(ret, expstorage.TriageLogEntry{ |
| ID: strconv.FormatInt(change.ChangeID.ID, 10), |
| Name: change.UserID, |
| TS: change.TimeStamp, |
| ChangeCount: int(change.Count), |
| Details: nil, |
| }) |
| } |
| |
| // If we want details fetch them in parallel. |
| var egroup errgroup.Group |
| var detailRecs [][]expstorage.TriageDetail |
| if details { |
| detailRecs = make([][]expstorage.TriageDetail, len(retKeys)) |
| for idx, expChange := range expChanges { |
| func(idx int, blobKey *datastore.Key) { |
| egroup.Go(func() error { |
| exp := types.Expectations{} |
| if err := c.blobStore.Load(blobKey, &exp); err != nil { |
| return err |
| } |
| |
| triageDetails := make([]expstorage.TriageDetail, 0, len(exp)) |
| for testName, digests := range exp { |
| for digest, label := range digests { |
| triageDetails = append(triageDetails, expstorage.TriageDetail{ |
| TestName: testName, |
| Digest: digest, |
| Label: label.String(), |
| }) |
| } |
| } |
| |
| sort.Slice(triageDetails, func(i, j int) bool { |
| return (triageDetails[i].TestName < triageDetails[j].TestName) || |
| ((triageDetails[i].TestName == triageDetails[j].TestName) && |
| (triageDetails[i].Digest < triageDetails[j].Digest)) |
| }) |
| detailRecs[idx] = triageDetails |
| return nil |
| }) |
| }(idx, expChange.ExpectationsBlob) |
| } |
| } |
| |
| // Wait for all queries to finish. |
| if err := egroup.Wait(); err != nil { |
| return nil, 0, err |
| } |
| |
| // Fill in the details. |
| if details { |
| for i := range ret { |
| ret[i].Details = detailRecs[i] |
| } |
| } |
| |
| return ret, len(allKeys), nil |
| } |
| |
| // UndoChange implements the ExpectationsStore interface. |
| func (c *DSExpStore) UndoChange(ctx context.Context, changeIDStr, userID string) (types.Expectations, error) { |
| changeID, err := strconv.ParseInt(changeIDStr, 10, 64) |
| // Make sure the entity is valid. |
| if err != nil || changeID <= 0 { |
| return nil, sklog.FmtErrorf("Change with id %s does not exist.", changeIDStr) |
| } |
| |
| // Fetch the change record of the change we want to undo. |
| expChange := &ExpChange{} |
| expChangeKey := ds.NewKey(c.changeKind) |
| expChangeKey.ID = changeID |
| if err := c.client.Get(ctx, expChangeKey, expChange); err != nil { |
| if err == datastore.ErrNoSuchEntity { |
| return nil, sklog.FmtErrorf("Change with id %d does not exist.", changeID) |
| } |
| return nil, sklog.FmtErrorf("Error retrieving change %d: %s", expChangeKey.ID, err) |
| } |
| |
| // Fetch the actual changes. |
| undoChanges := types.Expectations{} |
| if err := c.blobStore.Load(expChange.ExpectationsBlob, &undoChanges); err != nil { |
| return nil, sklog.FmtErrorf("Error retrieving expectations blob: %s", err) |
| } |
| |
| // If this has been undone already, then don't do it. |
| if expChange.UndoChangeID != 0 { |
| return nil, fmt.Errorf("Unable to undo change %d which was created as an undo of change %d.", changeID, expChange.UndoChangeID) |
| } |
| |
| // Retrieve the keys of all changes prior to the one we want to undo to |
| // build the expectations at the time of the original change |
| prevChangeKeys, err := c.getExpChangeKeys(ctx, changeID) |
| if err != nil { |
| return nil, sklog.FmtErrorf("Error retrieving keys for expectation changes: %s", err) |
| } |
| |
| // Build the expectations at that point. |
| prevExp, err := c.calcExpectations(ctx, prevChangeKeys) |
| if err != nil { |
| return nil, sklog.FmtErrorf("Unable to get expectations for undo: %s", err) |
| } |
| |
| changes := types.Expectations{} |
| for testName, digests := range undoChanges { |
| changes[testName] = make(types.TestClassification, len(digests)) |
| for digest := range digests { |
| changes[testName][digest] = prevExp[testName][digest] |
| } |
| } |
| |
| _, err = c.makeChange(ctx, changes, userID, c.getUniqueTimeStampMs(), changeID, true) |
| return changes, err |
| } |
| |
| // Clear implements the ExpectationsStore interface. |
| func (c *DSExpStore) Clear(ctx context.Context) error { |
| delKeys := []*datastore.Key{c.summaryKey, c.expectationsKey} |
| |
| allExpChangeKeys, err := c.getExpChangeKeys(ctx, 0) |
| if err != nil { |
| return sklog.FmtErrorf("Error retrieving keys for expectation changes: %s", err) |
| } |
| delBlobKeys := make([]*datastore.Key, 0, len(allExpChangeKeys)+1) |
| |
| _, state, err := c.loadCurrentExpectations(nil) |
| if err != nil { |
| return sklog.FmtErrorf("Error loading current expectations: %s", err) |
| } |
| delBlobKeys = append(delBlobKeys, state.ExpectationsBlob) |
| |
| // Extract the keys of the blobs storing the expectations. |
| for _, key := range allExpChangeKeys { |
| expChange := &ExpChange{} |
| if err := c.client.Get(ctx, key, expChange); err != nil { |
| return sklog.FmtErrorf("Error retrieving expectations change %d: %s", key.ID, err) |
| } |
| delBlobKeys = append(delBlobKeys, expChange.ExpectationsBlob) |
| } |
| |
| // Add the expectation change keys to the keys that need to be deleted. |
| delKeys = append(delKeys, allExpChangeKeys...) |
| |
| var egroup errgroup.Group |
| |
| // Delete the expectations blobs. |
| for _, key := range delBlobKeys { |
| if key != nil { |
| func(key *datastore.Key) { |
| egroup.Go(func() error { return c.blobStore.Delete(key) }) |
| }(key) |
| } |
| } |
| |
| // Delete all keys we have accumulated. 500 at a time which the limit for |
| // cloud datastore. |
| for _, batch := range dsutil.Batch(delKeys, 500) { |
| func(batch []*datastore.Key) { |
| egroup.Go(func() error { return c.client.DeleteMulti(ctx, batch) }) |
| }(batch) |
| } |
| |
| // Wait until it's all done. |
| return egroup.Wait() |
| } |
| |
| // calcExpectations calculates the expectations by accumulating the expectation changes |
| // referenced by the given list of keys. keys are assumed to be sorted in |
| // reverse chronological order. |
| func (c *DSExpStore) calcExpectations(ctx context.Context, keys []*datastore.Key) (types.Expectations, error) { |
| concurrent := make(chan bool, 10000) |
| changes := make([]types.Expectations, len(keys)) |
| var egroup errgroup.Group |
| |
| for idx, key := range keys { |
| concurrent <- true |
| func(idx int, key *datastore.Key) { |
| egroup.Go(func() error { |
| defer func() { |
| <-concurrent |
| }() |
| |
| exps, err := c.getChanges(ctx, key) |
| changes[idx] = exps |
| return err |
| }) |
| }(idx, key) |
| } |
| if err := egroup.Wait(); err != nil { |
| return nil, err |
| } |
| |
| ret := types.Expectations{} |
| for i := len(changes) - 1; i >= 0; i-- { |
| ret.MergeExpectations(changes[i]) |
| } |
| return ret, nil |
| } |
| |
| // makeChange updates the expectations by adding a new change record to the datastore. |
| // timeStampMs is the timestamp of this change. |
| // If undoChangeId is larger than 0 then it will be recorded in the change record |
| // since this is an undo of an earlier change. |
| // If transactional is true it the change will be added in a transaction. |
| // This should only be false when we import existing data. |
| func (c *DSExpStore) makeChange(ctx context.Context, changes types.Expectations, userId string, timeStampMs int64, undoChangeID int64, transactional bool) (changeKey *datastore.Key, err error) { |
| // Get the total count of changes so we can include it in the change record. |
| count := 0 |
| for _, digests := range changes { |
| count += len(digests) |
| } |
| |
| // Write the expectation changes. |
| blobKey, err := c.blobStore.Save(changes) |
| if err != nil { |
| return nil, sklog.FmtErrorf("Saving changes to blob failed: %s", err) |
| } |
| |
| // If we have an error it means the transaction below failed and we want |
| // to delete the part that was created outside of the transaction. |
| purgeKeys := []*datastore.Key(nil) |
| actions := dsutil.TxActions{} |
| actions.AddRollbackFn(func() error { return c.blobStore.Delete(blobKey) }) |
| actions.AddRollbackFn(func() error { return c.client.DeleteMulti(ctx, purgeKeys) }) |
| defer func() { actions.Run(err) }() |
| |
| // Add a new change record with the OK flag set to false. This |
| // allows us to create change records outside of the transaction and |
| // potentially in parallel without the write limits of doing it in a |
| // transaction. The change record is not valid (= included in |
| // searches until the OK flag is set to true inside the transaction below). |
| changeKey = dsutil.TimeSortableKey(c.changeKind, timeStampMs) |
| expChange := &ExpChange{ |
| IssueID: c.issueID, |
| UserID: userId, |
| UndoChangeID: undoChangeID, |
| TimeStamp: timeStampMs, |
| OK: false, |
| ExpectationsBlob: blobKey, |
| Count: int64(count), |
| } |
| if changeKey, err = c.client.Put(ctx, changeKey, expChange); err != nil { |
| return nil, sklog.FmtErrorf("Error writing change record: %s", err) |
| } |
| purgeKeys = append(purgeKeys, changeKey) |
| |
| updateFn := func(tx *datastore.Transaction) error { |
| // Start transaction to: |
| // - store the key of the new change record to deal with eventual consistency |
| // - add the change to the summary |
| // - mark the change as valid. |
| |
| // Update the recent changes so we get full consistency on queries. |
| if err := c.recentKeysList.Add(tx, changeKey); err != nil { |
| return err |
| } |
| |
| // Update the overall expectations |
| if err := c.updateCurrentExpectations(tx, changes, false, &actions); err != nil { |
| return err |
| } |
| |
| // Mark the expectation change as valid. |
| expChange.OK = true |
| _, err := tx.Put(changeKey, expChange) |
| return err |
| } |
| |
| // Run the relevant updates in a transaction. |
| if transactional { |
| if _, err = c.client.RunInTransaction(ctx, updateFn); err != nil { |
| return nil, sklog.FmtErrorf("Error updating expectations and recentKeysList for change %d: %s", changeKey.ID, err) |
| } |
| } else { |
| expChange.OK = true |
| if _, err = c.client.Mutate(ctx, datastore.NewUpdate(changeKey, expChange)); err != nil { |
| return nil, sklog.FmtErrorf("Error commiting the expectation change: %s", err) |
| } |
| } |
| |
| if c.eventBus != nil { |
| c.eventBus.Publish(c.eventExpChange, &expstorage.EventExpectationChange{ |
| TestChanges: changes, |
| IssueID: c.issueID, |
| }, c.globalEvent) |
| } |
| return changeKey, nil |
| } |
| |
| // updateCurrentExpectations updates the current overall expectations with the changes |
| // provided. The expectations are the sum of all change records in the database. |
| // We continuously keep track of that sum as new change records are added. |
| func (c *DSExpStore) updateCurrentExpectations(tx *datastore.Transaction, changes types.Expectations, overwrite bool, actions *dsutil.TxActions) (err error) { |
| currentExp, expState, err := c.loadCurrentExpectations(tx) |
| if err != nil { |
| return sklog.FmtErrorf("Error loading current expectations: %s", err) |
| } |
| oldExpsBlob := expState.ExpectationsBlob |
| |
| if overwrite || (currentExp == nil) { |
| currentExp = changes.DeepCopy() |
| } else { |
| currentExp.MergeExpectations(changes) |
| } |
| |
| // Create a new entry for the expectations |
| newBlobKey, err := c.blobStore.Save(currentExp) |
| if err != nil { |
| return sklog.FmtErrorf("Error writing new expectations: %s", err) |
| } |
| |
| // delete the new blob if we fail |
| delNewBlobFn := func() error { |
| if err := c.blobStore.Delete(newBlobKey); err != nil { |
| return sklog.FmtErrorf("Error deleting new expectations blob: %s", err) |
| } |
| return nil |
| } |
| |
| // either at the very end of this function or as part of the transaction |
| if tx == nil { |
| defer func() { |
| if err != nil { |
| util.LogErr(delNewBlobFn()) |
| } |
| }() |
| } else { |
| actions.AddRollbackFn(delNewBlobFn) |
| } |
| |
| // Write the new key to our expectation state |
| expState.ExpectationsBlob = newBlobKey |
| |
| putFn := dsutil.PutFn(c.client, tx) |
| if err = putFn(c.expectationsKey, expState); err != nil { |
| return sklog.FmtErrorf("Error writing new expectations blob: %s", err) |
| } |
| |
| // If there is not old blob to be deleted we are done |
| if oldExpsBlob == nil { |
| return nil |
| } |
| |
| // Remove the old blob either right away or after the transaction succeeds |
| delOldBlob := func() error { |
| if err := c.blobStore.Delete(oldExpsBlob); err != nil { |
| return sklog.FmtErrorf("Error removing old expectations blob: %s", err) |
| } |
| return nil |
| } |
| if tx == nil { |
| if err := delOldBlob(); err != nil { |
| sklog.Errorf("Error deleting old blob data: %s", err) |
| } |
| return nil |
| } |
| actions.AddCommitFn(delOldBlob) |
| return nil |
| } |
| |
| // getExpChangeKeys returns the keys of all expectation changes for the given issue |
| // in reverse chronological order. If beforeID is larger than 0 it is assumed to be |
| // an ID that was created via TimeSortableKey and we only want to retrieve keys that are |
| // older than the time stamp encoded in beforeID. |
| // The time is extracted with the GetTimeFromID function. |
| func (c *DSExpStore) getExpChangeKeys(ctx context.Context, beforeID int64) ([]*datastore.Key, error) { |
| // Query all changes |
| var egroup errgroup.Group |
| var queryKeys []*datastore.Key |
| egroup.Go(func() error { |
| q := ds.NewQuery(c.changeKind). |
| Filter("OK =", true). |
| KeysOnly() |
| |
| if c.issueID > 0 { |
| q = q.Filter("IssueID =", c.issueID) |
| } |
| |
| var err error |
| queryKeys, err = c.client.GetAll(ctx, q, nil) |
| return err |
| }) |
| |
| // Load the recent added changes. |
| var recently *dsutil.Recently |
| egroup.Go(func() error { |
| var err error |
| // Get the recently changed keys. Note: these are added/removed in a |
| // transaction so we are guaranteed they their OK value is true. |
| recently, err = c.recentKeysList.GetRecent() |
| return err |
| }) |
| |
| if err := egroup.Wait(); err != nil { |
| return nil, sklog.FmtErrorf("Error retrieving keys of expectation changes: %s", err) |
| } |
| |
| // Combine the recent keys with the result of the query for a consistent list |
| // of the keys. ret will be sorted. |
| ret := recently.Combine(queryKeys) |
| |
| // Remove all keys that are newer than the target key |
| if beforeID > 0 { |
| // Find keys that are strictly older than the given ID. |
| beforeTS := dsutil.GetTimeFromID(beforeID) |
| idx := sort.Search(len(ret), func(i int) bool { |
| return dsutil.GetTimeFromID(ret[i].ID) < beforeTS |
| }) |
| ret = ret[idx:] |
| } |
| |
| return ret, nil |
| } |
| |
| // loadCurrentExpectations loads the current expectations for this expectation |
| // store (either for the master branch or for a Gerrit issue). If no expectations |
| // have been set it will return non-nil values and no error. |
| func (c *DSExpStore) loadCurrentExpectations(tx *datastore.Transaction) (types.Expectations, *expectationsState, error) { |
| getFn := dsutil.GetFn(c.client, tx) |
| exp := types.Expectations{} |
| expState := &expectationsState{} |
| if err := getFn(c.expectationsKey, expState); err != nil && err != datastore.ErrNoSuchEntity { |
| return nil, nil, err |
| } |
| |
| var err error |
| if expState.ExpectationsBlob != nil { |
| if err = c.blobStore.Load(expState.ExpectationsBlob, &exp); err != nil { |
| return nil, nil, err |
| } |
| } |
| |
| return exp, expState, err |
| } |
| |
| // getChanges loads the changes for the given expectations change key. |
| func (c *DSExpStore) getChanges(ctx context.Context, expChangeKey *datastore.Key) (types.Expectations, error) { |
| expChange := &ExpChange{} |
| if err := c.client.Get(ctx, expChangeKey, expChange); err != nil { |
| return nil, err |
| } |
| |
| ret := types.Expectations{} |
| if expChange.ExpectationsBlob != nil { |
| if err := c.blobStore.Load(expChange.ExpectationsBlob, &ret); err != nil { |
| return nil, sklog.FmtErrorf("Unable to load expectations blob: %s", err) |
| } |
| } |
| return ret, nil |
| } |
| |
| // getUniqueTimeStampMs returns a unique time in milliseconds |
| func (c *DSExpStore) getUniqueTimeStampMs() int64 { |
| c.tsMutex.Lock() |
| defer c.tsMutex.Unlock() |
| ts := util.TimeStampMs() |
| if ts <= c.lastTS { |
| ts = c.lastTS + 1 |
| } |
| c.lastTS = ts |
| return ts |
| } |
| |
| // Make sure DSExpStore fulfills the ExpectationsStore interface |
| var _ expstorage.ExpectationsStore = (*DSExpStore)(nil) |