blob: 7728dd29f856655828ac5f5a044fb9c512e671a4 [file] [log] [blame]
package recent_rolls
import (
"bytes"
"context"
"encoding/gob"
"fmt"
"sync"
"time"
"cloud.google.com/go/datastore"
"go.skia.org/infra/go/autoroll"
"go.skia.org/infra/go/ds"
"go.skia.org/infra/go/skerr"
"go.skia.org/infra/go/sklog"
"go.skia.org/infra/go/util"
"google.golang.org/api/iterator"
)
const (
// Number of rolls to return from GetRecentRolls().
RecentRollsLength = 10
// loadRollsPageSize is the maximum number of rolls retrieved in a single
// call to LoadRolls().
loadRollsPageSize = 25
)
type DB interface {
// Put inserts a roll into the RollsDB.
Put(ctx context.Context, roller string, roll *autoroll.AutoRollIssue) error
// Get retrieves a roll from the RollsDB.
Get(ctx context.Context, roller string, issue int64) (*autoroll.AutoRollIssue, error)
// GetRolls loads rolls from the database. Returns the rolls and a cursor
// which may be used to retrieve more rolls.
GetRolls(ctx context.Context, roller string, cursor string) ([]*autoroll.AutoRollIssue, string, error)
}
// Fake ancestor we supply for all ModeChanges, to force consistency.
// We lose some performance this way but it keeps our tests from
// flaking.
func fakeAncestor() *datastore.Key {
rv := ds.NewKey(ds.KIND_AUTOROLL_ROLL_ANCESTOR)
rv.ID = 13 // Bogus ID.
return rv
}
// DsRoll is a struct used for storing autoroll.AutoRollIssue objects in
// datastore. The AutoRollIssue is gob-serialized before and after inserting
// to simplify the interactions with datastore.
type DsRoll struct {
// Data is the gob-serialized AutoRollIssue.
Data []byte `datastore:"data,noindex"`
// Name of the roller.
Roller string `datastore:"roller"`
// RollerCreated is synthesized from the roller name and the creation
// time of the roll. This helps to keep the overall index well-
// distributed.
RollerCreated string `datastore:"rollerCreated"`
// RollerIssue is synthesized from the roller name and the issue ID of
// the roll. This helps to keep the overall index well-distributed. This
// field is used as the ID in datastore.
RollerIssue string `datastore:"rollerIssue"`
}
// RecentRolls is a struct used for storing and retrieving recent DEPS rolls.
type RecentRolls struct {
db DB
lastSuccessfulRollTime time.Time
numFailedrolls int
recent []*autoroll.AutoRollIssue
roller string
mtx sync.RWMutex
}
// NewRecentRolls returns a new RecentRolls instance.
func NewRecentRolls(ctx context.Context, db DB, roller string) (*RecentRolls, error) {
recentRolls := &RecentRolls{
roller: roller,
db: db,
}
if err := recentRolls.refreshRecentRolls(ctx); err != nil {
return nil, err
}
return recentRolls, nil
}
// Add adds a DEPS roll to the recent rolls list.
func (r *RecentRolls) Add(ctx context.Context, roll *autoroll.AutoRollIssue) error {
if err := roll.Validate(); err != nil {
return err
}
// Warn if we already have an active roll.
currentRoll := r.currentRoll()
if currentRoll != nil {
sklog.Warningf("There is already an active roll (%d), but another is being added (%d)", currentRoll.Issue, roll.Issue)
}
// Warn if the new roll is already closed.
if roll.Closed {
sklog.Warningf("Inserting a new roll which is already closed.")
}
if err := r.db.Put(ctx, r.roller, roll); err != nil {
return err
}
return r.refreshRecentRolls(ctx)
}
// Update updates the given DEPS roll in the recent rolls list.
func (r *RecentRolls) Update(ctx context.Context, roll *autoroll.AutoRollIssue) error {
// TODO(borenet): It would be better to pass in a function to perform
// the desired modifications on the AutoRollIssue inside of the
// transaction.
if err := roll.Validate(); err != nil {
return err
}
if err := r.db.Put(ctx, r.roller, roll); err != nil {
return err
}
return r.refreshRecentRolls(ctx)
}
// Get returns the given roll.
func (r *RecentRolls) Get(ctx context.Context, issue int64) (*autoroll.AutoRollIssue, error) {
r.mtx.RLock()
defer r.mtx.RUnlock()
for _, roll := range r.recent {
if roll.Issue == issue {
return roll.Copy(), nil
}
}
// Fall back to retrieving from datastore.
return r.db.Get(ctx, r.roller, issue)
}
// GetRecentRolls returns a copy of the recent rolls list.
func (r *RecentRolls) GetRecentRolls() []*autoroll.AutoRollIssue {
r.mtx.RLock()
defer r.mtx.RUnlock()
recent := make([]*autoroll.AutoRollIssue, 0, len(r.recent))
for _, r := range r.recent {
elem := new(autoroll.AutoRollIssue)
*elem = *r
recent = append(recent, elem)
}
return recent
}
// currentRoll returns the currently-active DEPS roll, or nil if none exists.
// Does not copy the roll. Expects that the caller holds a lock.
func (r *RecentRolls) currentRoll() *autoroll.AutoRollIssue {
if len(r.recent) == 0 {
return nil
}
if r.recent[0].Closed {
return nil
}
return r.recent[0]
}
// CurrentRoll returns a copy of the currently-active DEPS roll, or nil if none
// exists.
func (r *RecentRolls) CurrentRoll() *autoroll.AutoRollIssue {
r.mtx.RLock()
defer r.mtx.RUnlock()
current := r.currentRoll()
if current != nil {
rv := new(autoroll.AutoRollIssue)
*rv = *current
return rv
}
return nil
}
// LastRoll returns a copy of the last DEPS roll, if one exists, and nil
// otherwise.
func (r *RecentRolls) LastRoll() *autoroll.AutoRollIssue {
r.mtx.RLock()
defer r.mtx.RUnlock()
if len(r.recent) > 0 && r.recent[0].Closed {
rv := new(autoroll.AutoRollIssue)
*rv = *r.recent[0]
return rv
} else if len(r.recent) > 1 {
rv := new(autoroll.AutoRollIssue)
*rv = *r.recent[1]
return rv
}
return nil
}
// LastSuccessfulRollTime returns the timestamp of the last successful roll.
func (r *RecentRolls) LastSuccessfulRollTime() time.Time {
r.mtx.RLock()
defer r.mtx.RUnlock()
return r.lastSuccessfulRollTime
}
// NumFailedRolls returns the number of failed rolls since the last successful
// roll.
func (r *RecentRolls) NumFailedRolls() int {
r.mtx.RLock()
defer r.mtx.RUnlock()
return r.numFailedrolls
}
// refreshRecentRolls refreshes the list of recent DEPS rolls. Assumes the
// caller holds a write lock.
func (r *RecentRolls) refreshRecentRolls(ctx context.Context) error {
// Load rolls until we have enough to satisfy RecentRollsLength and to
// determine the number of failed rolls and timestamp of the last successful
// roll.
var history, rolls []*autoroll.AutoRollIssue
var cursor string
foundSuccessfulRoll := false
lastSuccessfulRollTime := time.Time{}
numFailedrolls := 0
for {
var err error
rolls, cursor, err = r.db.GetRolls(ctx, r.roller, cursor)
if err != nil {
return err
}
history = append(history, rolls...)
if !foundSuccessfulRoll {
for _, roll := range rolls {
if roll.Succeeded() {
foundSuccessfulRoll = true
lastSuccessfulRollTime = roll.Modified
break
} else if roll.Failed() {
numFailedrolls++
}
}
}
if len(history) >= RecentRollsLength && foundSuccessfulRoll {
break
}
if cursor == "" || len(rolls) == 0 {
break
}
}
historyLen := len(history)
if historyLen > RecentRollsLength {
historyLen = RecentRollsLength
}
r.mtx.Lock()
defer r.mtx.Unlock()
r.recent = history[:historyLen]
r.lastSuccessfulRollTime = lastSuccessfulRollTime
r.numFailedrolls = numFailedrolls
return nil
}
// DatastoreRollsDB implements RollsDB using Datastore.
type DatastoreRollsDB struct{}
// NewDatastoreRollsDB returns a RollsDB instance which uses Datastore.
func NewDatastoreRollsDB(ctx context.Context) *DatastoreRollsDB {
return &DatastoreRollsDB{}
}
// Get implements RollsDB.
func (d *DatastoreRollsDB) Get(ctx context.Context, roller string, issue int64) (*autoroll.AutoRollIssue, error) {
query := ds.NewQuery(ds.KIND_AUTOROLL_ROLL).Ancestor(fakeAncestor()).Filter("rollerIssue =", fmt.Sprintf("%s_%d", roller, issue))
var results []*DsRoll
if _, err := ds.DS.GetAll(ctx, query, &results); err != nil {
return nil, err
}
if len(results) == 0 {
return nil, fmt.Errorf("Could not find issue %d", issue)
} else if len(results) == 1 {
var rv autoroll.AutoRollIssue
if err := gob.NewDecoder(bytes.NewReader(results[0].Data)).Decode(&rv); err != nil {
return nil, skerr.Wrap(err)
}
return &rv, nil
} else {
return nil, fmt.Errorf("Found more than one issue matching %d", issue)
}
}
// Put implements RollsDB.
func (d *DatastoreRollsDB) Put(ctx context.Context, roller string, roll *autoroll.AutoRollIssue) error {
var buf bytes.Buffer
if err := gob.NewEncoder(&buf).Encode(roll); err != nil {
return fmt.Errorf("Failed to encode roll: %s", err)
}
obj := &DsRoll{
Data: buf.Bytes(),
Roller: roller,
RollerCreated: fmt.Sprintf("%s_%s", roller, roll.Created.UTC().Format(util.RFC3339NanoZeroPad)),
RollerIssue: fmt.Sprintf("%s_%d", roller, roll.Issue),
}
key := ds.NewKey(ds.KIND_AUTOROLL_ROLL)
key.Name = obj.RollerIssue
key.Parent = fakeAncestor()
_, err := ds.DS.RunInTransaction(ctx, func(tx *datastore.Transaction) error {
_, err := tx.Put(key, obj)
return err
})
if err != nil {
return fmt.Errorf("Failed to insert roll: %s", err)
}
return nil
}
// LoadRolls implements RollsDB.
func (d *DatastoreRollsDB) GetRolls(ctx context.Context, roller, cursor string) ([]*autoroll.AutoRollIssue, string, error) {
query := ds.NewQuery(ds.KIND_AUTOROLL_ROLL).Ancestor(fakeAncestor()).Filter("roller =", roller).Order("-rollerCreated").Limit(loadRollsPageSize)
if cursor != "" {
c, err := datastore.DecodeCursor(cursor)
if err != nil {
return nil, "", skerr.Wrap(err)
}
query = query.Start(c)
}
it := ds.DS.Run(ctx, query)
rv := make([]*autoroll.AutoRollIssue, 0, loadRollsPageSize)
var env DsRoll
for i := 0; i < loadRollsPageSize; i++ {
_, err := it.Next(&env)
if err == iterator.Done {
break
} else if err != nil {
return nil, "", skerr.Wrap(err)
}
roll := new(autoroll.AutoRollIssue)
if err := gob.NewDecoder(bytes.NewReader(env.Data)).Decode(roll); err != nil {
return nil, "", fmt.Errorf("Failed to decode roll: %s", err)
}
rv = append(rv, roll)
}
// Note: Unfortunately, datastore doesn't provide any indication that we've
// reached the end of the results for a query, aside from returning fewer
// results than the provided limit. This means that the client may have to
// perform a call which returns zero results before it's clear that they've
// retrieved all of the results.
rvCursor := ""
if len(rv) == loadRollsPageSize {
c, err := it.Cursor()
if err != nil {
return nil, "", skerr.Wrap(err)
}
rvCursor = c.String()
}
return rv, rvCursor, nil
}
var _ DB = &DatastoreRollsDB{}