blob: aaa731dc4de4eafcec4d101e9a8e89f8223d9943 [file] [log] [blame]
package bolt_failurestore
import (
"path/filepath"
"sync"
"github.com/boltdb/bolt"
"go.skia.org/infra/go/boltutil"
"go.skia.org/infra/go/util"
"go.skia.org/infra/golden/go/diff"
"go.skia.org/infra/golden/go/diffstore/common"
"go.skia.org/infra/golden/go/diffstore/failurestore"
"go.skia.org/infra/golden/go/types"
)
const (
// FAILUREDB_NAME is the name of the boltdb storing diff failures.
FAILUREDB_NAME = "diffstore_failures"
)
// digestFailureRec is a wrapper around diff.DigestFailure to implement the boltutil.Record interface.
type digestFailureRec struct {
*diff.DigestFailure
}
// Key see boltutil.Record interface.
func (d *digestFailureRec) Key() string {
return string(d.Digest)
}
// IndexValues see boltutil.Record interface. No index at this point.
func (d *digestFailureRec) IndexValues() map[string][]string {
return nil
}
// BoltImpl persists DigestFailures in boltDB database. It assumes that the
// number of unavailable digests is small and it keeps a copy of the entire list
// in memory at all time.
type BoltImpl struct {
// store stores the digests that have failed to load.
store *boltutil.IndexedBucket
// cachedFailures caches all failures for fast lookup.
cachedFailures map[types.Digest]*diff.DigestFailure
// dbMutex protects changes to the database.
dbMutex sync.Mutex
// cacheMutex protects cachedFailures.
cacheMutex sync.RWMutex
}
// New returns a new instance of BoltImpl that opens a database in the given directory.
func New(baseDir string) (*BoltImpl, error) {
baseDir = filepath.Join(baseDir, FAILUREDB_NAME)
db, err := common.OpenBoltDB(baseDir, FAILUREDB_NAME+".db")
if err != nil {
return nil, err
}
config := &boltutil.Config{
DB: db,
Name: FAILUREDB_NAME,
Indices: []string{},
Codec: util.JSONCodec(digestFailureRec{}),
}
store, err := boltutil.NewIndexedBucket(config)
if err != nil {
return nil, err
}
ret := &BoltImpl{
store: store,
}
if err := ret.loadDigestFailures(); err != nil {
return nil, err
}
return ret, nil
}
// UnavailableDigests returns the current list of unavailable digests for fast lookup.
func (f *BoltImpl) UnavailableDigests() map[types.Digest]*diff.DigestFailure {
f.cacheMutex.RLock()
defer f.cacheMutex.RUnlock()
return f.cachedFailures
}
// AddDigestFailureIfNew adds a digest failure to the database only if the
// there is no failure recorded for the given digest.
func (f *BoltImpl) AddDigestFailureIfNew(failure *diff.DigestFailure) error {
unavailable := f.UnavailableDigests()
if _, ok := unavailable[failure.Digest]; !ok {
return f.AddDigestFailure(failure)
}
return nil
}
// AddDigestFailure adds a digest failure to the database or updates an
// existing failure.
func (f *BoltImpl) AddDigestFailure(failure *diff.DigestFailure) error {
f.dbMutex.Lock()
defer f.dbMutex.Unlock()
inputRecs := []boltutil.Record{&digestFailureRec{DigestFailure: failure}}
updateFn := func(tx *bolt.Tx, result []boltutil.Record) error {
if result[0] != nil {
if result[0].(*digestFailureRec).TS >= failure.TS {
result[0] = nil
return nil
}
}
result[0] = inputRecs[0]
return nil
}
if err := f.store.Update(inputRecs, updateFn); err != nil {
return err
}
// Load the new failures into the cache.
return f.loadDigestFailures()
}
// PurgeDigestFailures removes the failures identified by digests from the database.
func (f *BoltImpl) PurgeDigestFailures(digests types.DigestSlice) error {
f.dbMutex.Lock()
defer f.dbMutex.Unlock()
targets := make([]string, 0, len(digests))
unavailable := f.UnavailableDigests()
for _, d := range digests {
if _, ok := unavailable[d]; ok {
targets = append(targets, string(d))
}
}
if len(targets) == 0 {
return nil
}
if err := f.store.Delete(targets); err != nil {
return err
}
return f.loadDigestFailures()
}
// loadDigestFailures loads all digest failures into memory and updates the current cache.
func (f *BoltImpl) loadDigestFailures() error {
allFailures, _, err := f.store.List(0, -1)
if err != nil {
return err
}
ret := make(map[types.Digest]*diff.DigestFailure, len(allFailures))
for _, rec := range allFailures {
failure := rec.(*digestFailureRec)
ret[failure.Digest] = failure.DigestFailure
}
f.cacheMutex.Lock()
defer f.cacheMutex.Unlock()
f.cachedFailures = ret
return nil
}
// Make sure BoltImpl fulfills the FailureStore interface
var _ failurestore.FailureStore = (*BoltImpl)(nil)