blob: 7e79a6a7d5ffba9a3b820ee14d2fee5241aa5396 [file] [log] [blame]
package diffstore
import (
"sync"
"github.com/boltdb/bolt"
"go.skia.org/infra/go/boltutil"
"go.skia.org/infra/go/util"
"go.skia.org/infra/golden/go/diff"
)
const (
// FAILUREDB_NAME is the name of the boltdb storing diff failures.
FAILUREDB_NAME = "diffstore_failures"
)
// digestFailureRec is a wrapper around diff.DigestFailure to implement the boltutil.Record interface.
type digestFailureRec struct {
*diff.DigestFailure
}
// Key see boltutil.Record interface.
func (d *digestFailureRec) Key() string {
return d.Digest
}
// IndexValues see boltutil.Record interface. No index at this point.
func (d *digestFailureRec) IndexValues() map[string][]string {
return nil
}
// failureStore persists DigestFailures in boltDB database. It assumes that the
// number of unavailable digests is small and it keeps a copy of the entire list
// in memory at all time.
type failureStore struct {
// store stores the digests that have failed to load.
store *boltutil.IndexedBucket
// cachedFailures caches all failures for fast lookup.
cachedFailures map[string]*diff.DigestFailure
// dbMutex protects changes to the database.
dbMutex sync.Mutex
// cacheMutex protects cachedFailures.
cacheMutex sync.RWMutex
}
// newFailureStore returns a new instance of failureStore that opens a database in the given directory.
func newFailureStore(baseDir string) (*failureStore, error) {
db, err := openBoltDB(baseDir, FAILUREDB_NAME+".db")
if err != nil {
return nil, err
}
config := &boltutil.Config{
DB: db,
Name: FAILUREDB_NAME,
Indices: []string{},
Codec: util.JSONCodec(digestFailureRec{}),
}
store, err := boltutil.NewIndexedBucket(config)
if err != nil {
return nil, err
}
ret := &failureStore{
store: store,
}
if err := ret.loadDigestFailures(); err != nil {
return nil, err
}
return ret, nil
}
// unavailableDigests returns the current list of unavailable digests for fast lookup.
func (f *failureStore) unavailableDigests() map[string]*diff.DigestFailure {
f.cacheMutex.RLock()
defer f.cacheMutex.RUnlock()
return f.cachedFailures
}
// addDigestFailureIfNew adds a digest failure to the database only if the
// there is no failure recorded for the given digest.
func (f *failureStore) addDigestFailureIfNew(failure *diff.DigestFailure) error {
unavailable := f.unavailableDigests()
if _, ok := unavailable[failure.Digest]; !ok {
return f.addDigestFailure(failure)
}
return nil
}
// addDigestFailure adds a digest failure to the database or updates an
// existing failure.
func (f *failureStore) addDigestFailure(failure *diff.DigestFailure) error {
f.dbMutex.Lock()
defer f.dbMutex.Unlock()
inputRecs := []boltutil.Record{&digestFailureRec{DigestFailure: failure}}
updateFn := func(tx *bolt.Tx, result []boltutil.Record) error {
if result[0] != nil {
if result[0].(*digestFailureRec).TS >= failure.TS {
result[0] = nil
return nil
}
}
result[0] = inputRecs[0]
return nil
}
if err := f.store.Update(inputRecs, updateFn); err != nil {
return err
}
// Load the new failures into the cache.
return f.loadDigestFailures()
}
// purgeDigestFailures removes the failures identified by digests from the database.
func (f *failureStore) purgeDigestFailures(digests []string) error {
f.dbMutex.Lock()
defer f.dbMutex.Unlock()
targets := make([]string, 0, len(digests))
unavailable := f.unavailableDigests()
for _, d := range digests {
if _, ok := unavailable[d]; ok {
targets = append(targets, d)
}
}
if len(targets) == 0 {
return nil
}
if err := f.store.Delete(digests); err != nil {
return err
}
return f.loadDigestFailures()
}
// loadDigestFailures loads all digest failures into memory and updates the current cache.
func (f *failureStore) loadDigestFailures() error {
allFailures, _, err := f.store.List(0, -1)
if err != nil {
return err
}
ret := make(map[string]*diff.DigestFailure, len(allFailures))
for _, rec := range allFailures {
failure := rec.(*digestFailureRec)
ret[failure.Digest] = failure.DigestFailure
}
f.cacheMutex.Lock()
defer f.cacheMutex.Unlock()
f.cachedFailures = ret
return nil
}