// blob: a110297592238057bb8df705c96215c50e6a6de0 [file] [log] [blame]
package ds_tryjobstore
import (
"context"
"fmt"
"reflect"
"sort"
"sync/atomic"
"cloud.google.com/go/datastore"
"go.skia.org/infra/go/ds"
"go.skia.org/infra/go/eventbus"
"go.skia.org/infra/go/skerr"
"go.skia.org/infra/go/sklog"
"go.skia.org/infra/go/util"
"go.skia.org/infra/golden/go/tryjobstore"
"go.skia.org/infra/golden/go/types"
"golang.org/x/sync/errgroup"
)
// batchSize caps how many entities are handled in a single datastore batch.
// It is deliberately lower than the limit of 500 and has empirically kept
// transactions small enough to stay below the 10MB limit.
const batchSize = 300
// DSTryjobStore implements the TryjobStore interface on top of cloud datastore.
type DSTryjobStore struct {
// client is the datastore client used for all reads, writes and transactions.
client *datastore.Client
// eventBus is where UpdateTryjob publishes EV_TRYJOB_UPDATED events.
eventBus eventbus.EventBus
}
// New returns a DSTryjobStore that stores its data through the given datastore
// client and publishes its events on the given event bus. Both arguments are
// required; an error is returned if either of them is nil.
func New(client *datastore.Client, eventBus eventbus.EventBus) (*DSTryjobStore, error) {
	switch {
	case client == nil:
		return nil, skerr.Fmt("received nil for datastore client")
	case eventBus == nil:
		return nil, skerr.Fmt("received nil for eventbus")
	}

	store := &DSTryjobStore{client: client, eventBus: eventBus}
	return store, nil
}
// ListIssues implements the TryjobStore interface.
//
// It returns the issues in the window [offset, offset+size) of all issues
// ordered by "-Updated", together with the total number of issues found.
// A negative offset or size is treated as 0 and a window that reaches past the
// end is truncated, so out-of-range arguments produce an empty (non-nil) page
// instead of a panic.
func (c *DSTryjobStore) ListIssues(offset, size int) ([]*tryjobstore.Issue, int, error) {
	ctx := context.TODO()

	// Fetch only the keys of all issues; the entities are loaded below for the
	// requested window only.
	query := ds.NewQuery(ds.ISSUE).KeysOnly().Order("-Updated")
	keys, err := c.client.GetAll(ctx, query, nil)
	if err != nil {
		return nil, 0, err
	}
	total := len(keys)

	// Clamp the window to [0, total]. Negative arguments would otherwise yield
	// an invalid slice expression.
	if offset < 0 {
		offset = 0
	}
	if size < 0 {
		size = 0
	}
	start := offset
	if start > total {
		start = total
	}
	// Compare against the remaining length rather than computing start+size
	// first, so that a very large size cannot overflow.
	end := total
	if size < total-start {
		end = start + size
	}

	targetKeys := keys[start:end]
	if len(targetKeys) == 0 {
		return []*tryjobstore.Issue{}, total, nil
	}

	// Fetch the entities.
	ret := make([]*tryjobstore.Issue, len(targetKeys))
	if err := c.client.GetMulti(ctx, targetKeys, ret); err != nil {
		return nil, 0, err
	}
	return ret, total, nil
}
// GetIssue implements the TryjobStore interface.
//
// It loads the issue with the given ID and returns (nil, nil) if no such issue
// exists. If loadTryjobs is true the de-duplicated tryjobs of the issue are
// loaded as well and attached to their patchsets; a tryjob that refers to a
// patchset the issue does not contain results in an error.
func (c *DSTryjobStore) GetIssue(issueID int64, loadTryjobs bool) (*tryjobstore.Issue, error) {
	issue := &tryjobstore.Issue{}
	issueKey := c.getIssueKey(issueID)

	found, err := c.getEntity(issueKey, issue, nil)
	if err != nil {
		return nil, sklog.FmtErrorf("Error in getEntity for %s: %s", issueKey, err)
	}
	if !found {
		return nil, nil
	}
	if !loadTryjobs {
		return issue, nil
	}

	_, byPatchset, err := c.getTryjobsForIssue(issueID, nil, false, true)
	if err != nil {
		return nil, sklog.FmtErrorf("Error getting tryjobs for issue: %s", err)
	}

	// Hang each group of tryjobs onto the patchset it belongs to.
	for psID, tjs := range byPatchset {
		patchset := issue.FindPatchset(psID)
		if patchset == nil {
			return nil, sklog.FmtErrorf("Unable to find patchset %d in issue %d:", psID, issue.ID)
		}
		patchset.Tryjobs = tjs
	}
	return issue, nil
}
// UpdateIssue implements the TryjobStore interface.
//
// The issue is written under the key derived from details.ID; details and
// updateFn are handed to updateEntity, which decides whether and what to write.
func (c *DSTryjobStore) UpdateIssue(details *tryjobstore.Issue, updateFn tryjobstore.NewValueFn) error {
	issueKey := c.getIssueKey(details.ID)
	if _, err := c.updateEntity(issueKey, details, nil, false, updateFn); err != nil {
		return err
	}
	return nil
}
// CommitIssueExp implements the TryjobStore interface.
//
// It runs commitFn and then marks the issue as committed. If the issue is
// already marked as committed commitFn is not called and nil is returned. An
// error is returned if the issue cannot be found, if commitFn fails or if the
// issue cannot be written.
func (c *DSTryjobStore) CommitIssueExp(issueID int64, commitFn func() error) error {
	// markCommitted is the function handed to RunInTransaction below.
	markCommitted := func(tx *datastore.Transaction) error {
		issueKey := c.getIssueKey(issueID)
		stored := &tryjobstore.Issue{}

		// NOTE(review): this lookup passes nil instead of tx, so the read itself
		// is not part of the transaction - confirm that this is intended.
		found, err := c.getEntity(issueKey, stored, nil)
		switch {
		case err != nil:
			return skerr.Fmt("Error in getEntity for %s: %s", issueKey, err)
		case !found:
			return skerr.Fmt("Unable to find issue %d.", issueID)
		case stored.Committed:
			// Nothing left to do for an issue that is already committed.
			return nil
		}

		// Commit the actual expectations via the caller supplied function.
		if commitErr := commitFn(); commitErr != nil {
			sklog.Warningf("Ran into error committing the expectations: %s", commitErr)
			return commitErr
		}

		// Record the commit; force the write regardless of timestamps.
		stored.Committed = true
		_, err = c.updateEntity(issueKey, stored, tx, true, nil)
		return err
	}

	if _, err := c.client.RunInTransaction(context.TODO(), markCommitted); err != nil {
		return err
	}
	return nil
}
// DeleteIssue implements the TryjobStore interface.
//
// The tryjobs of the issue (and their results) are removed first, then the
// issue entity itself is deleted.
func (c *DSTryjobStore) DeleteIssue(issueID int64) error {
	// Clear out whatever tryjobs still hang off this issue.
	if err := c.deleteTryjobsForIssue(issueID); err != nil {
		return skerr.Fmt("could not delete tryjobs for issue %d: %s", issueID, err)
	}

	// Finally remove the issue entity.
	return c.client.Delete(context.TODO(), c.getIssueKey(issueID))
}
// GetTryjob implements the TryjobStore interface.
//
// The tryjob is looked up by buildBucketID alone; issueID is not used for the
// lookup. (nil, nil) is returned if there is no such tryjob.
func (c *DSTryjobStore) GetTryjob(issueID, buildBucketID int64) (*tryjobstore.Tryjob, error) {
	tryjob := &tryjobstore.Tryjob{}
	err := c.client.Get(context.TODO(), c.getTryjobKey(buildBucketID), tryjob)
	switch {
	case err == nil:
		return tryjob, nil
	case err == datastore.ErrNoSuchEntity:
		// A missing tryjob is not an error.
		return nil, nil
	default:
		return nil, err
	}
}
// UpdateTryjob implements the TryjobStore interface.
//
// There are two modes:
//   - tryjob != nil: the given tryjob is written under tryjob.BuildBucketID;
//     the buildBucketID and newValFn arguments are ignored.
//   - tryjob == nil: the stored tryjob with the given buildBucketID is updated
//     via newValFn; both buildBucketID and newValFn must then be set.
//
// After updateEntity returns without error a copy of the value it returned is
// published as an EV_TRYJOB_UPDATED event. Note that this also happens when
// updateEntity decided that nothing had to be written.
func (c *DSTryjobStore) UpdateTryjob(buildBucketID int64, tryjob *tryjobstore.Tryjob, newValFn tryjobstore.NewValueFn) error {
	if tryjob != nil {
		// Direct write: make sure updateEntity does not take the newValFn path
		// and use the ID carried by the tryjob itself.
		newValFn = nil
		buildBucketID = tryjob.BuildBucketID
	} else {
		// Without data to write directly we need both the ID and the function
		// that derives the new value.
		if (buildBucketID == 0) || (newValFn == nil) {
			return skerr.Fmt("ID and newValFn cannot be nil when no tryjob is provided. Update not possible")
		}
		tryjob = &tryjobstore.Tryjob{}
	}

	written, err := c.updateEntity(c.getTryjobKey(buildBucketID), tryjob, nil, false, newValFn)
	if err != nil {
		return err
	}

	// Publish a copy so subscribers do not share the instance with the caller.
	snapshot := copyTryjob(written.(*tryjobstore.Tryjob))
	c.eventBus.Publish(tryjobstore.EV_TRYJOB_UPDATED, snapshot, true)
	return nil
}
// copyTryjob returns a pointer to a shallow copy of the given Tryjob.
func copyTryjob(t *tryjobstore.Tryjob) *tryjobstore.Tryjob {
	dup := *t
	return &dup
}
// GetTryjobs implements the TryjobStore interface.
//
// It returns the tryjobs of the given issue and patchsets as one flat slice
// sorted by builder name. If loadResults is true the second return value holds
// the results of each tryjob, otherwise it is nil.
func (c *DSTryjobStore) GetTryjobs(issueID int64, patchsetIDs []int64, filterDup bool, loadResults bool) ([]*tryjobstore.Tryjob, [][]*tryjobstore.TryjobResult, error) {
	keys, byPatchset, err := c.getTryjobsForIssue(issueID, patchsetIDs, false, filterDup)
	if err != nil {
		return nil, nil, skerr.Fmt("could not locate tryjobs for issue %d: %s", issueID, err)
	}

	// Collapse the per-patchset groups into a single slice.
	flat := make([]*tryjobstore.Tryjob, 0, len(keys))
	for _, group := range byPatchset {
		flat = append(flat, group...)
	}
	sort.Slice(flat, func(a, b int) bool {
		return flat[a].Builder < flat[b].Builder
	})

	if !loadResults {
		return flat, nil, nil
	}

	results, err := c.GetTryjobResults(flat)
	if err != nil {
		return nil, nil, skerr.Fmt("could not get tryjob results for issue %d: %s", issueID, err)
	}
	return flat, results, nil
}
// RunningTryjobs implements the TryjobStore interface.
//
// It returns all tryjobs whose Status is less than TRYJOB_COMPLETE. The
// returned slice is empty, not nil, when there are none.
func (c *DSTryjobStore) RunningTryjobs() ([]*tryjobstore.Tryjob, error) {
	running := []*tryjobstore.Tryjob{}
	incomplete := ds.NewQuery(ds.TRYJOB).Filter("Status <", int(tryjobstore.TRYJOB_COMPLETE))
	if _, err := c.client.GetAll(context.TODO(), incomplete, &running); err != nil {
		return nil, sklog.FmtErrorf("Error making GetAll call: %s", err)
	}
	return running, nil
}
// GetTryjobResults implements the TryjobStore interface.
//
// The i-th element of the returned slice holds the results of tryjobs[i].
func (c *DSTryjobStore) GetTryjobResults(tryjobs []*tryjobstore.Tryjob) ([][]*tryjobstore.TryjobResult, error) {
	// Results are looked up via the keys of the tryjobs.
	keys := make([]*datastore.Key, len(tryjobs))
	for i, tj := range tryjobs {
		keys[i] = tj.Key
	}

	_, results, err := c.getResultsForTryjobs(keys, false)
	if err != nil {
		return nil, err
	}
	return results, nil
}
// UpdateTryjobResult implements the TryjobStore interface.
//
// Every result must carry exactly one value for the primary key field in its
// Params and all (test, digest) pairs must be unique, otherwise an error is
// returned and nothing is written. The results are stored under newly created
// keys in batches of at most batchSize entities; the batches are written one
// after the other, so an error in a later batch leaves earlier batches stored.
func (c *DSTryjobStore) UpdateTryjobResult(results []*tryjobstore.TryjobResult) error {
	// testDigest identifies a result. Keeping the two parts in separate fields
	// (rather than concatenating them into one string) guarantees that distinct
	// pairs can never be mistaken for duplicates.
	type testDigest struct {
		test   string
		digest string
	}

	keys := make([]*datastore.Key, 0, len(results))
	seen := make(map[testDigest]struct{}, len(results))
	for _, result := range results {
		// Make sure that tests are not bunched together.
		if len(result.Params[types.PRIMARY_KEY_FIELD]) != 1 {
			return fmt.Errorf("Parameter value for primary key field '%s' must exactly contain one value. Found: %v", types.PRIMARY_KEY_FIELD, result.Params[types.PRIMARY_KEY_FIELD])
		}
		keys = append(keys, c.getTryjobResultKey())
		seen[testDigest{test: string(result.TestName), digest: string(result.Digest)}] = struct{}{}
	}

	// Fewer distinct pairs than results means at least one pair was repeated.
	if len(seen) != len(keys) {
		return fmt.Errorf("All (test,digest) pairs must be unique when adding tryjob results.")
	}

	ctx := context.TODO()
	for i := 0; i < len(keys); i += batchSize {
		endIdx := util.MinInt(i+batchSize, len(keys))
		if _, err := c.client.PutMulti(ctx, keys[i:endIdx], results[i:endIdx]); err != nil {
			return err
		}
	}
	return nil
}
// deleteTryjobsForIssue deletes all tryjob information for the given issue:
// first the results of all its tryjobs, then the tryjobs themselves. All
// deletions are issued in batches of at most batchSize keys so that a single
// call never exceeds the datastore limit on entities per request. The
// deletions are not wrapped in a transaction, so an error may leave part of
// the data deleted.
func (c *DSTryjobStore) deleteTryjobsForIssue(issueID int64) error {
	// Get all the tryjob keys.
	tryjobKeys, _, err := c.getTryjobsForIssue(issueID, nil, true, false)
	if err != nil {
		return fmt.Errorf("Error retrieving tryjob keys: %s", err)
	}

	ctx := context.TODO()

	// deleteBatched removes the given keys in chunks of at most batchSize.
	deleteBatched := func(keys []*datastore.Key) error {
		for i := 0; i < len(keys); i += batchSize {
			currBatch := keys[i:util.MinInt(i+batchSize, len(keys))]
			if err := c.client.DeleteMulti(ctx, currBatch); err != nil {
				return err
			}
		}
		return nil
	}

	// Delete all results of the tryjobs.
	tryjobResultKeys, _, err := c.getResultsForTryjobs(tryjobKeys, true)
	if err != nil {
		return err
	}
	for _, keys := range tryjobResultKeys {
		if err := deleteBatched(keys); err != nil {
			return fmt.Errorf("Error deleting tryjob results: %s", err)
		}
	}

	// Delete the tryjobs themselves. An issue can have more tryjobs than fit
	// into a single request, so this is batched just like the results above.
	if err := deleteBatched(tryjobKeys); err != nil {
		return fmt.Errorf("Error deleting %d tryjobs for issue %d: %s", len(tryjobKeys), issueID, err)
	}
	return nil
}
// getResultsForTryjobs returns the test results for the given tryjobs.
//
// One query per tryjob key is run concurrently, selecting the results whose
// BuildBucketID equals the ID of the key. The i-th entry of both returned
// slices belongs to tryjobKeys[i]. The first return value holds the keys of
// the results. The second holds the results themselves; it is nil if keysOnly
// is true.
func (c *DSTryjobStore) getResultsForTryjobs(tryjobKeys []*datastore.Key, keysOnly bool) ([][]*datastore.Key, [][]*tryjobstore.TryjobResult, error) {
	count := len(tryjobKeys)
	resultKeys := make([][]*datastore.Key, count)
	var results [][]*tryjobstore.TryjobResult
	if !keysOnly {
		results = make([][]*tryjobstore.TryjobResult, count)
	}

	// Get the keys and results. Each goroutine writes only to its own index of
	// the output slices.
	ctx := context.TODO()
	var egroup errgroup.Group
	for idx, key := range tryjobKeys {
		// Per-iteration copies for the goroutine started below.
		idx, key := idx, key
		egroup.Go(func() error {
			q := ds.NewQuery(ds.TRYJOB_RESULT).Filter("BuildBucketID =", key.ID)
			if keysOnly {
				q = q.KeysOnly()
			}

			fetched := []*tryjobstore.TryjobResult{}
			foundKeys, err := c.client.GetAll(ctx, q, &fetched)
			resultKeys[idx] = foundKeys
			if err != nil {
				return err
			}
			if !keysOnly {
				results[idx] = fetched
			}
			return nil
		})
	}

	if err := egroup.Wait(); err != nil {
		return nil, nil, fmt.Errorf("Error getting tryjob results: %s", err)
	}
	return resultKeys, results, nil
}
// getTryjobsForIssue is a utility function that retrieves the Tryjobs for a given
// issue and list of patchsets. If patchsetIDs is empty the tryjobs of all
// patchsets of the issue are retrieved; the same is true for any patchset ID
// that is not greater than 0. The patchsetIDs slice is not modified.
// If keysOnly is true only the keys of the Tryjobs will be returned and the
// second return value is nil. If filterDup is true duplicate Tryjobs (same
// builder) will be filtered out for each patchset, only keeping the newest.
// keysOnly and filterDup cannot both be true.
// The first return value is the unordered slice of keys of the Tryjobs.
// The second return value groups the tryjobs for each patchset as a map[patch_set_id][]*tryjobstore.Tryjob.
func (c *DSTryjobStore) getTryjobsForIssue(issueID int64, patchsetIDs []int64, keysOnly bool, filterDup bool) ([]*datastore.Key, map[int64][]*tryjobstore.Tryjob, error) {
	if keysOnly && filterDup {
		return nil, nil, sklog.FmtErrorf("filterDup cannot be true when keysOnly is true, since Tryjob is necessary for filtering")
	}

	if len(patchsetIDs) == 0 {
		// A single non-positive ID results in one query without patchset filter.
		patchsetIDs = []int64{-1}
	} else {
		// Sort a private copy so the caller's slice is left untouched.
		patchsetIDs = append([]int64(nil), patchsetIDs...)
		sort.Slice(patchsetIDs, func(i, j int) bool { return patchsetIDs[i] < patchsetIDs[j] })
	}

	n := len(patchsetIDs)
	keysArr := make([][]*datastore.Key, n)
	valsArr := make([][]*tryjobstore.Tryjob, n)
	resultSize := int32(0)

	// Run one query per patchset concurrently. Each goroutine writes only to
	// its own index of keysArr/valsArr; the total is accumulated atomically.
	var egroup errgroup.Group
	for idx, patchsetID := range patchsetIDs {
		func(idx int, patchsetID int64) {
			egroup.Go(func() error {
				query := ds.NewQuery(ds.TRYJOB).
					Filter("IssueID =", issueID)
				if patchsetID > 0 {
					query = query.Filter("PatchsetID =", patchsetID)
				}
				if keysOnly {
					query = query.KeysOnly()
				}

				var tryjobs []*tryjobstore.Tryjob
				keys, err := c.client.GetAll(context.TODO(), query, &tryjobs)
				if err != nil {
					return fmt.Errorf("Error making GetAll call: %s", err)
				}
				keysArr[idx] = keys
				valsArr[idx] = tryjobs
				atomic.AddInt32(&resultSize, int32(len(keys)))
				return nil
			})
		}(idx, patchsetID)
	}

	if err := egroup.Wait(); err != nil {
		return nil, nil, err
	}

	// Assemble the flat array of keys.
	retKeys := make([]*datastore.Key, 0, resultSize)
	for _, keys := range keysArr {
		retKeys = append(retKeys, keys...)
	}

	// If all we want are the keys we are done.
	if keysOnly {
		return retKeys, nil, nil
	}

	// Group the tryjobs by their patchsets.
	tryjobsMap := make(map[int64][]*tryjobstore.Tryjob)
	for _, tryjobs := range valsArr {
		for _, tj := range tryjobs {
			tryjobsMap[tj.PatchsetID] = append(tryjobsMap[tj.PatchsetID], tj)
		}
	}

	// Go through the patchsets and dedupe tryjobs for each.
	if filterDup {
		for patchsetID, tryjobs := range tryjobsMap {
			// Sort by when they were last updated, oldest first.
			sort.Slice(tryjobs, func(i, j int) bool { return tryjobs[i].Updated.Before(tryjobs[j].Updated) })

			// Walk from newest to oldest. The first tryjob seen for a builder is
			// its newest one; every later (older) one for that builder is removed.
			builders := util.StringSet{}
			for i := len(tryjobs) - 1; i >= 0; i-- {
				if builders[tryjobs[i].Builder] {
					// Remove element i in place and clear the vacated tail slot so
					// the removed tryjob is not kept alive by the backing array.
					copy(tryjobs[i:], tryjobs[i+1:])
					tryjobs[len(tryjobs)-1] = nil
					tryjobs = tryjobs[:len(tryjobs)-1]
				} else {
					builders[tryjobs[i].Builder] = true
				}
			}

			// NOTE: Store the new slice back into the tryjobs map since we might have removed some values.
			tryjobsMap[patchsetID] = tryjobs
		}
	}
	return retKeys, tryjobsMap, nil
}
// updateEntity writes an entity to the datastore under the given key.
//
// If newValFn is not nil the entity must already exist, otherwise an error is
// returned. newValFn is called with the stored entity; a non-nil return value
// is written to the datastore, a nil return value means nothing is written.
// If newValFn is nil, item is written if the entity does not exist yet, if
// force is true, or if item is more recent than the stored entity.
//
// If tx is nil the update runs in a transaction of its own, otherwise it is
// executed as part of tx. The returned value is the item as it stands after
// the update function has run: the value produced by newValFn if one was
// produced, the passed in item otherwise. It is returned alongside any error.
func (c *DSTryjobStore) updateEntity(key *datastore.Key, item newerInterface, tx *datastore.Transaction, force bool, newValFn tryjobstore.NewValueFn) (interface{}, error) {
	// applyUpdate performs the read-decide-write cycle within a transaction.
	applyUpdate := func(txn *datastore.Transaction) error {
		// Load the stored entity into a fresh instance of item's underlying type.
		curr := reflect.New(reflect.TypeOf(item).Elem()).Interface()
		found, err := c.getEntity(key, curr, txn)
		if err != nil {
			return err
		}

		switch {
		case newValFn != nil:
			// In-transaction update: derive the new value from the stored one.
			if !found {
				return sklog.FmtErrorf("Unable to find item %s for transactional update.", key)
			}
			newVal := newValFn(curr)
			if newVal == nil {
				// No update required. We are done.
				return nil
			}
			item = newVal.(newerInterface)
		case found && !force && !item.MoreRecentThan(curr):
			// The stored entity is at least as recent as item: nothing to do.
			return nil
		}

		_, err = txn.Put(key, item)
		return err
	}

	// Either join the caller's transaction or run one of our own.
	var err error
	if tx != nil {
		err = applyUpdate(tx)
	} else {
		_, err = c.client.RunInTransaction(context.TODO(), applyUpdate)
	}
	return item, err
}
// getEntity loads the entity stored under key into target, going through tx
// if tx is not nil and through the client otherwise. The boolean return value
// reports whether the entity exists; a missing entity is not an error.
func (c *DSTryjobStore) getEntity(key *datastore.Key, target interface{}, tx *datastore.Transaction) (bool, error) {
	var err error
	if tx != nil {
		err = tx.Get(key, target)
	} else {
		err = c.client.Get(context.TODO(), key, target)
	}

	switch {
	case err == nil:
		return true, nil
	case err == datastore.ErrNoSuchEntity:
		// Not found: report that via the flag instead of an error.
		return false, nil
	default:
		return false, err
	}
}
// getIssueKey builds the datastore key of kind ds.ISSUE whose ID is the given
// issue id.
func (c *DSTryjobStore) getIssueKey(id int64) *datastore.Key {
	issueKey := ds.NewKey(ds.ISSUE)
	issueKey.ID = id
	return issueKey
}
// getTryjobKey builds the datastore key of kind ds.TRYJOB whose ID is the
// given buildBucketID.
func (c *DSTryjobStore) getTryjobKey(buildBucketID int64) *datastore.Key {
	tryjobKey := ds.NewKey(ds.TRYJOB)
	tryjobKey.ID = buildBucketID
	return tryjobKey
}
// getTryjobResultKey returns a new key of kind ds.TRYJOB_RESULT. No ID is set
// on the key here.
func (c *DSTryjobStore) getTryjobResultKey() *datastore.Key {
	resultKey := ds.NewKey(ds.TRYJOB_RESULT)
	return resultKey
}
// Compile-time check that *DSTryjobStore fulfills the tryjobstore.TryjobStore
// interface.
var _ tryjobstore.TryjobStore = (*DSTryjobStore)(nil)