blob: f959af25ff2f77037072116629ba7e9c1a9498e2 [file] [log] [blame]
// Package dsregressionstore implements regression.Store using Google Cloud Datastore.
package dsregressionstore
import (
"context"
"encoding/json"
"fmt"
"sync"
"cloud.google.com/go/datastore"
"go.skia.org/infra/go/ds"
"go.skia.org/infra/go/skerr"
"go.skia.org/infra/perf/go/cid"
"go.skia.org/infra/perf/go/clustering2"
"go.skia.org/infra/perf/go/dataframe"
"go.skia.org/infra/perf/go/regression"
"go.skia.org/infra/perf/go/types"
"google.golang.org/api/iterator"
)
// dsEntry is used for storing regression.Regressions in a RegressionStore.
type dsEntry struct {
TS int64
Triaged bool
Body string `datastore:",noindex"`
}
// RegressionStoreDS implements RegressionStore using Google Cloud Datastore.
type RegressionStoreDS struct {
lookup regression.DetailLookup
mutex sync.Mutex
}
// NewRegressionStoreDS returns a new RegressionStoreDS.
func NewRegressionStoreDS(lookup regression.DetailLookup) *RegressionStoreDS {
return &RegressionStoreDS{
lookup: lookup,
}
}
// loadFromDS loads regression.Regressions stored for the given commit from Cloud Datastore.
func (s *RegressionStoreDS) loadFromDS(tx *datastore.Transaction, cid *cid.CommitDetail) (*regression.AllRegressionsForCommit, error) {
key := ds.NewKey(ds.REGRESSION)
key.Name = cid.ID()
entry := &dsEntry{}
if err := tx.Get(key, entry); err != nil {
return nil, err
}
ret := regression.New()
if err := json.Unmarshal([]byte(entry.Body), ret); err != nil {
return nil, fmt.Errorf("Failed to decode JSON body: %s", err)
}
return ret, nil
}
// storeToDS stores regression.Regressions for the given commit in Cloud Datastore.
func (s *RegressionStoreDS) storeToDS(tx *datastore.Transaction, cid *cid.CommitDetail, r *regression.AllRegressionsForCommit) error {
body, err := r.JSON()
if err != nil {
return fmt.Errorf("Failed to encode regression.Regressions to JSON: %s", err)
}
if len(body) > 1024*1024 {
return fmt.Errorf("regression.Regressions is too large, >1MB.")
}
entry := &dsEntry{
Body: string(body),
Triaged: r.Triaged(),
TS: cid.Timestamp,
}
key := ds.NewKey(ds.REGRESSION)
key.Name = cid.ID()
_, err = tx.Put(key, entry)
if err != nil {
return fmt.Errorf("Failed to write to database: %s", err)
}
return nil
}
// Range implements the RegressionStore interface.
func (s *RegressionStoreDS) Range(ctx context.Context, begin, end types.CommitNumber) (map[types.CommitNumber]*regression.AllRegressionsForCommit, error) {
beginDetail, err := s.lookup(ctx, &cid.CommitID{Offset: begin})
if err != nil {
return nil, skerr.Wrapf(err, "Failed for begin=%d", begin)
}
endDetail, err := s.lookup(ctx, &cid.CommitID{Offset: end})
if err != nil {
return nil, skerr.Wrapf(err, "Failed for end=%d", end)
}
ret := map[types.CommitNumber]*regression.AllRegressionsForCommit{}
q := ds.NewQuery(ds.REGRESSION).Filter("TS >=", beginDetail.Timestamp).Filter("TS <=", endDetail.Timestamp)
it := ds.DS.Run(ctx, q)
for {
entry := &dsEntry{}
key, err := it.Next(entry)
if err == iterator.Done {
break
} else if err != nil {
return nil, fmt.Errorf("Failed to read from database: %s", err)
}
reg := regression.New()
if err := json.Unmarshal([]byte(entry.Body), reg); err != nil {
return nil, fmt.Errorf("Failed to decode JSON body: %s", err)
}
// We use a form of the types.CommitNumber serialized as "master-%06d"
// as the key, so we need to convert that back to a types.CommitNumber.
c, err := cid.FromID(key.Name)
if err != nil {
return nil, fmt.Errorf("Failed to turn commit id into a CommitID: %s", err)
}
ret[types.CommitNumber(c.Offset)] = reg
}
return ret, nil
}
// SetHigh implements the RegressionStore interface.
func (s *RegressionStoreDS) SetHigh(ctx context.Context, cid *cid.CommitDetail, alertID string, df *dataframe.FrameResponse, high *clustering2.ClusterSummary) (bool, error) {
isNew := false
_, err := ds.DS.RunInTransaction(ctx, func(tx *datastore.Transaction) error {
r, err := s.loadFromDS(tx, cid)
if err == datastore.ErrNoSuchEntity {
r = regression.New()
} else if err != nil {
return err
}
isNew = r.SetHigh(alertID, df, high)
return s.storeToDS(tx, cid, r)
})
return isNew, err
}
// SetLow implements the RegressionStore interface.
func (s *RegressionStoreDS) SetLow(ctx context.Context, cid *cid.CommitDetail, alertID string, df *dataframe.FrameResponse, low *clustering2.ClusterSummary) (bool, error) {
isNew := false
_, err := ds.DS.RunInTransaction(ctx, func(tx *datastore.Transaction) error {
r, err := s.loadFromDS(tx, cid)
if err == datastore.ErrNoSuchEntity {
r = regression.New()
} else if err != nil {
return err
}
isNew = r.SetLow(alertID, df, low)
return s.storeToDS(tx, cid, r)
})
return isNew, err
}
// TriageLow implements the RegressionStore interface.
func (s *RegressionStoreDS) TriageLow(ctx context.Context, cid *cid.CommitDetail, alertID string, tr regression.TriageStatus) error {
_, err := ds.DS.RunInTransaction(ctx, func(tx *datastore.Transaction) error {
r, err := s.loadFromDS(tx, cid)
if err != nil {
return fmt.Errorf("Failed to load regression.Regressions: %s", err)
}
if err = r.TriageLow(alertID, tr); err != nil {
return fmt.Errorf("Failed to update regression.Regressions: %s", err)
}
return s.storeToDS(tx, cid, r)
})
return err
}
// TriageHigh implements the RegressionStore interface.
func (s *RegressionStoreDS) TriageHigh(ctx context.Context, cid *cid.CommitDetail, alertID string, tr regression.TriageStatus) error {
_, err := ds.DS.RunInTransaction(ctx, func(tx *datastore.Transaction) error {
r, err := s.loadFromDS(tx, cid)
if err != nil {
return fmt.Errorf("Failed to load regression.Regressions: %s", err)
}
if err = r.TriageHigh(alertID, tr); err != nil {
return fmt.Errorf("Failed to update regression.Regressions: %s", err)
}
return s.storeToDS(tx, cid, r)
})
return err
}
// Write implements the RegressionStore interface.
func (s *RegressionStoreDS) Write(ctx context.Context, regressions map[types.CommitNumber]*regression.AllRegressionsForCommit) error {
for commitNumber, reg := range regressions {
c := cid.CommitIDFromCommitNumber(commitNumber)
commitDetail, err := s.lookup(ctx, c)
if err != nil {
return fmt.Errorf("Could not find details for cid %v: %s", c, err)
}
_, err = ds.DS.RunInTransaction(context.TODO(), func(tx *datastore.Transaction) error {
return s.storeToDS(tx, commitDetail, reg)
})
if err != nil {
return fmt.Errorf("Could not store regressions for cid %v: %s", c, err)
}
}
return nil
}
// Confirm the RegressionStoreDS implements the RegressionStore interface.
var _ regression.Store = (*RegressionStoreDS)(nil)