blob: 8c7edb7963005b5b2ffb05aac0157ea40d49b631 [file] [log] [blame]
package diffstore
import (
const (
	// METRICSDB_NAME is the name of the boltdb caching diff metrics. It is the
	// base of the database file name (METRICSDB_NAME + ".db") and the name of
	// the bucket that listIDs scans.
	METRICSDB_NAME = "diffstore_metrics"

	// METRICS_DIGEST_INDEX is the name of the secondary index that maps each
	// digest to the IDs of the metric records whose diff ID contains it.
	METRICS_DIGEST_INDEX = "metric_digest_index"
)

var (
	// metricsRecIndices lists the secondary indices maintained for metricsRec
	// records; metricsRec.IndexValues must provide a value for each of them.
	metricsRecIndices = []string{METRICS_DIGEST_INDEX}
)
// metricsStore stores diff metrics on disk.
type metricsStore struct {
// store stores the diff metrics in a boltdb database.
store *boltutil.IndexedBucket
// codec is used to encode/decode the DiffMetrics field of a metricsRec struct
codec util.LRUCodec
// TODO(stephana): Remove mapper field once we don't need the legacy code anymore.
// mapper is an instance of DiffStoreMapper.
mapper DiffStoreMapper
// factory acts as the codec for metrics and is used to create instances of metricsRec.
factory *metricsRecFactory
// metricsRec is the stored record for a single diff-metrics entry. It
// implements the boltutil.Record interface.
type metricsRec struct {
	// ID is the diff ID. It is the record's primary key and is split into its
	// two digests to build the digest index.
	ID string `json:"id"`

	// DiffMetrics holds the diff metrics serialized by metricsStore.codec.
	DiffMetrics []byte

	// splitFn splits ID into its two digests. It is unexported, so the JSON
	// codec never serializes it; metricsRecFactory injects it whenever a
	// record is created or decoded.
	splitFn func(string) (string, string)
}

// Key returns the record's primary key (its ID), as required by the
// boltutil.Record interface.
func (m *metricsRec) Key() string {
	return m.ID
}
// IndexValues see the boltutil.Record interface.
func (m *metricsRec) IndexValues() map[string][]string {
d1, d2 := m.splitFn(m.ID)
return map[string][]string{METRICS_DIGEST_INDEX: {d1, d2}}
// metricsRecFactory creates instances of metricsRec and also acts as the
// codec to serialize and deserialize instances of metricsRec. In both
// cases it injects the split function that is configurable.
type metricsRecFactory struct {
util.LRUCodec // underlying codec to (de)serialize metricsRecs.
splitFn func(string) (string, string) // split function injected into metricsRec instances.
// newRec creates a new instance of metricsRec injecting the split function.
func (m *metricsRecFactory) newRec(id string, diffMetrics []byte) *metricsRec {
return &metricsRec{
ID: id,
DiffMetrics: diffMetrics,
splitFn: m.splitFn,
// Decode overrides the Decode function in LRUCodec.
func (m *metricsRecFactory) Decode(data []byte) (interface{}, error) {
ret, err := m.LRUCodec.Decode(data)
if err != nil {
return nil, err
ret.(*metricsRec).splitFn = m.splitFn
return ret, nil
// newMetricsStore returns a new instance of metricsStore.
//
// It opens the boltdb file METRICSDB_NAME + ".db" in baseDir and wraps it in an
// indexed bucket of metricsRec records. mapper splits diff IDs into the digests
// used by the digest index; codec (de)serializes the diff-metrics payload
// stored inside each record.
func newMetricsStore(baseDir string, mapper DiffStoreMapper, codec util.LRUCodec) (*metricsStore, error) {
db, err := openBoltDB(baseDir, METRICSDB_NAME+".db")
if err != nil {
return nil, err
// instantiate metricsRecFactory which acts as codec and factory for metricsRec instances.
factoryCodec := &metricsRecFactory{
LRUCodec: util.JSONCodec(&metricsRec{}),
// The split function is injected into every metricsRec so IndexValues can
// derive both digests from a record's ID.
splitFn: func(toSplit string) (string, string) {
a, b := mapper.SplitDiffID(toSplit)
return string(a), string(b)
// NOTE(review): listIDs scans a bucket named METRICSDB_NAME, but this config
// (as shown) sets no bucket name. Lines are evidently missing from this copy
// (e.g. all closing braces), so confirm the config fields against upstream.
config := &boltutil.Config{
DB: db,
Indices: metricsRecIndices,
Codec: factoryCodec,
store, err := boltutil.NewIndexedBucket(config)
if err != nil {
// NOTE(review): db is not closed on this path, so a failed setup leaves the
// database open. Consider closing it before returning.
return nil, err
return &metricsStore{
store: store,
codec: codec,
mapper: mapper,
factory: factoryCodec,
}, nil
// loadDiffMetrics loads the diff metrics stored under id from disk.
//
// It returns (nil, nil) when the read yields a nil record for id. Records that
// look like legacy entries (an empty DiffMetrics payload, or an id containing
// ":") are first passed to fixLegacyRecord; if that recovers metrics, they are
// returned directly. Otherwise the stored DiffMetrics bytes are decoded with
// m.codec.
func (m *metricsStore) loadDiffMetrics(id string) (interface{}, error) {
// NOTE(review): the right-hand side of this assignment is truncated in this
// copy. The call that reads the record for id is missing and the ')' is
// unbalanced; restore it from upstream.
recs, err :=[]string{id})
if err != nil {
return nil, err
// NOTE(review): recs[0] is indexed without a length check. Presumably the read
// returns exactly one entry per requested key (nil when absent); confirm.
if recs[0] == nil {
return nil, nil
// TODO(stephana): Remove the database guard below when we don't need it anymore.
// Get the record and check whether it is a legacy entry.
rec := recs[0].(*metricsRec)
if (len(rec.DiffMetrics) == 0) || strings.Contains(id, ":") {
if diffMetrics := m.fixLegacyRecord(id, rec.DiffMetrics); diffMetrics != nil {
return diffMetrics, nil
// Deserialize the byte array representing the DiffMetrics.
// NOTE(review): if fixLegacyRecord returned nil for an empty payload, this
// decodes an empty byte slice, presumably producing a codec error rather than
// a clearer "legacy conversion failed" error; confirm intended. This line also
// re-asserts recs[0] instead of reusing rec.
diffMetrics, err := m.codec.Decode(recs[0].(*metricsRec).DiffMetrics)
if err != nil {
return nil, err
return diffMetrics, nil
// saveDiffMetrics stores diff metrics to disk.
//
// It serializes diffMetrics with m.codec and wraps the bytes in a metricsRec
// keyed by id. An empty encoding is rejected with an error.
func (m *metricsStore) saveDiffMetrics(id string, diffMetrics interface{}) error {
// Serialize the diffMetrics.
bytes, err := m.codec.Encode(diffMetrics)
if err != nil {
return err
// Presumably refused because loadDiffMetrics treats an empty DiffMetrics
// payload as a legacy record.
if len(bytes) == 0 {
return fmt.Errorf("Got empty string for encoded diff metric.")
rec := m.factory.newRec(id, bytes)
// NOTE(review): the statement that writes rec to m.store and returns its error
// is missing from this copy. As shown, rec is unused and the function has no
// final return; restore it from upstream.
// purgeDiffMetrics removes all diff metrics based on specific digests.
//
// Records are looked up through the METRICS_DIGEST_INDEX index inside
// updateFn, which operates on a *bolt.Tx, and the collected IDs are then
// deleted.
func (m *metricsStore) purgeDiffMetrics(digests types.DigestSlice) error {
updateFn := func(tx *bolt.Tx) error {
// NOTE(review): truncated call. Presumably this reads METRICS_DIGEST_INDEX for
// the given digests within tx, yielding digest -> record IDs. Restore from
// upstream.
metricIDMap, err :=, METRICS_DIGEST_INDEX, asStrings(digests))
if err != nil {
return err
// Collect the matched IDs into a set, presumably so a record indexed under
// two purged digests is deleted only once.
metricIds := util.StringSet{}
// NOTE(review): the loop body that adds ids to metricIds is missing in this copy.
for _, ids := range metricIDMap {
// NOTE(review): truncated call. Presumably this deletes metricIds.Keys() within tx.
return, metricIds.Keys())
// NOTE(review): the statement that runs updateFn in a transaction and returns
// its error is missing from this copy.
// legacyMetricsRec is the old format of the metrics records. In that format,
// the diff metrics were stored as structured JSON rather than as codec-encoded
// bytes. Used by the db guard in fixLegacyRecord.
type legacyMetricsRec struct {
ID string `json:"id"`
// NOTE(review): fixLegacyRecord reads legRec.DiffMetrics and assigns it to a
// *diff.DiffMetrics, but that field (and the closing brace) is missing from
// this copy. Restore it from upstream, including its JSON tag.
// fixLegacyRecord recovers the diff metrics for the record stored under id,
// converting it from the legacy format if necessary. It returns nil if the
// metrics cannot be recovered; errors are logged, not returned.
//
// recBytes is the DiffMetrics payload of the stored metricsRec. If it is
// non-empty, it is decoded with m.codec. Otherwise the raw stored record is
// parsed as a legacyMetricsRec. The recovered metrics are then written back in
// a background goroutine under a regenerated diff ID, and the old record is
// deleted if its ID changed. The closing braces are missing from this copy, so
// confirm that this write-back runs on both paths.
//
// NOTE(review): the returned *diff.DiffMetrics is the same value the
// background goroutine encodes. A caller that mutates it could race with that
// write.
func (m *metricsStore) fixLegacyRecord(id string, recBytes []byte) *diff.DiffMetrics {
var newRec *diff.DiffMetrics = nil
// If we have data bytes then we just have to deserialize.
if len(recBytes) > 0 {
diffMetrics, err := m.codec.Decode(recBytes)
if err != nil {
sklog.Errorf("Error decoding diffMetrics rec: %s", err)
return nil
// NOTE(review): unchecked type assertion. It panics if m.codec yields anything
// other than *diff.DiffMetrics.
newRec = diffMetrics.(*diff.DiffMetrics)
// If we don't have a record yet, try to parse it from the raw stored record.
if newRec == nil {
// Read the raw bytes of the record from the db.
// NOTE(review): the right-hand side of this assignment is missing in this
// copy (presumably a raw read of the record stored under id). Restore from
// upstream.
contentBytes, err :=
if err != nil {
sklog.Errorf("Error reading raw legacy record: %s", err)
return nil
legRec := legacyMetricsRec{}
if err := json.Unmarshal(contentBytes, &legRec); err != nil {
sklog.Errorf("Unable to decode legacy error: %s", err)
return nil
// Use simple heuristic to figure whether we have a record: a valid legacy
// record carries exactly four MaxRGBADiffs entries.
// NOTE(review): legRec.DiffMetrics is a pointer (it is assigned to newRec
// below). If the JSON had no metrics object, it is nil and this expression
// panics instead of returning nil; confirm.
if len(legRec.DiffMetrics.MaxRGBADiffs) != 4 {
sklog.Errorf("Did not get a valid legacy diff metrics record.")
return nil
newRec = legRec.DiffMetrics
// Regenerate the diffID to filter out the old format.
newID := m.mapper.DiffID(m.mapper.SplitDiffID(id))
// Write the new record to the database in the background. The goroutine is
// fire-and-forget; its outcome is only visible through the logs.
go func() {
defer func() {
if r := recover(); r != nil {
sklog.Errorf("Recovered panic for id(%s): %s", id, r)
// NOTE(review): as visible here, a failed save is only logged and execution
// continues to the delete below. That could remove the legacy record without a
// replacement; confirm whether a return is intended after this error.
if err := m.saveDiffMetrics(newID, newRec); err != nil {
sklog.Errorf("Error writing legacy record to DB: %s", err)
if newID != id {
// Remove the old record, since that's the only way to change a key.
// NOTE(review): truncated call. Presumably this deletes the record keyed by id;
// restore from upstream.
if err :=[]string{id}); err != nil {
sklog.Errorf("Error deleting legacy record %s: %s", id, err)
sklog.Infof("Legacy database record (%s -> %s) written to the database.", id, newID)
return newRec
// convertDatabaseFromLegacy iterates over the entire database in the background
// and loads every entry, implicitly forcing a conversion to the new serialization format.
//
// The function returns immediately. The goroutine it starts has no
// cancellation and no completion signal other than its log messages.
//
// NOTE(review): each converted legacy record spawns a further background
// goroutine in fixLegacyRecord. On a large database this can mean many
// concurrent writers; consider bounding it.
func (m *metricsStore) convertDatabaseFromLegacy() {
go func() {
// A panic in the loop (e.g. from fixLegacyRecord) is logged here, but it
// aborts the remaining conversion and the re-index.
defer func() {
if r := recover(); r != nil {
sklog.Errorf("Recovered panic: %s", r)
ids, err := m.listIDs()
if err != nil {
sklog.Errorf("Unable to get the database ids. Got error: %s", err)
sklog.Infof("Processing %d diffmetric records.", len(ids))
for _, id := range ids {
// The call to loadDiffMetrics will also convert a legacy record to the new record if necessary.
// Per-record failures are logged and the loop continues.
if _, err := m.loadDiffMetrics(id); err != nil {
sklog.Errorf("Error trying to convert legacy record: %s", err)
// Note: this reports the number of IDs visited, including ones that failed.
sklog.Infof("Legacy conversion: Loaded %d records.", len(ids))
// NOTE(review): truncated call. Presumably this re-indexes m.store after the
// conversion; restore from upstream.
if err :=; err != nil {
sklog.Errorf("Error re-indexing data store: %s", err)
sklog.Infof("Legacy conversion completed.")
// listIDs returns a slice with all the ids in the database.
//
// It collects every key of the boltdb bucket named METRICSDB_NAME and returns
// an empty, non-nil slice if that bucket does not exist. This assumes the
// indexed bucket stores its records under that name; confirm.
func (m *metricsStore) listIDs() ([]string, error) {
ret := []string{}
viewFn := func(tx *bolt.Tx) error {
b := tx.Bucket([]byte(METRICSDB_NAME))
if b == nil {
return nil
// Pre-size the result using the bucket's key count.
// NOTE(review): Bucket.Stats may itself walk the whole bucket. Confirm that the
// extra pass is worth it just for pre-sizing.
ret = make([]string, 0, b.Stats().KeyN)
c := b.Cursor()
for k, _ := c.First(); k != nil; k, _ = c.Next() {
// Cursor keys are only valid for the life of the transaction, so they must be
// copied out. Note that string(k) alone already copies, so the inner append is
// a redundant second copy.
ret = append(ret, string(append([]byte(nil), k...)))
return nil
// NOTE(review): truncated call. Presumably this runs viewFn in a read-only
// transaction on the database; restore from upstream.
if err :=; err != nil {
return nil, err
return ret, nil