blob: ca5bc54a80e24560b2e5a2172a07456f370c0cf1 [file] [log] [blame]
// Package boltutil provides higher level primitives to work with bolt DB.
package boltutil
import (
"bytes"
"fmt"
"strings"
"github.com/boltdb/bolt"
"go.skia.org/infra/go/util"
)
const (
// Maximum length of the value for an index. This results from us storing the
// length in a two byte uint.
MAX_INDEX_VAL_LEN = 65535
// META_DATA_BUCKET_PREFIX is the prefix of the bucket name used for meta data.
META_DATA_BUCKET_PREFIX = "_meta_"
// META_DATA_KEY is the key under which meta data are stored.
META_DATA_KEY = "meta"
// BUILD_INDEX_BATCH_SIZE is the batchsize used when building new indices.
BUILD_INDEX_BATCH_SIZE = 100
)
// Record is the interface that has to be implemented by a client to store
// instance of a type in a IndexedBucket instance. It allows to extract relevant
// key information about each record.
type Record interface {
// Key returns the primary key of the record. The return value does not have
// to be a human readable string it can be any sequence of bytes. The
// returned string has to uniquely identify this instance.
Key() string
// IndexValues returns the values of the type for the attributes we want to
// index. The return value maps from the index names to the list of values for
// that index (= secondary key values): map[index_name][]{val_1, val_2, val_n}.
// Any key in the returned map must also be given in Indices field of the
// Config instance passed to NewStore(...).
// Note: Individual values for each index cannot be longer than 65535.
IndexValues() map[string][]string
}
// IndexedBucket embeds a bolt database and adds higher level functions to add and maintain
// indices. Indices allow to read records by keys other than the primary key.
// It aims at following the boltdb patterns and allows to still use the
// underlying bolt DB directly if necessary.
type IndexedBucket struct {
// BoltDB instance used by this store.
DB *bolt.DB
// indices is the set of indices this IndexedBucket maintains.
indices util.StringSet
// mainBucket is the name of the bucket where the records are stored.
mainBucket []byte
// codec provides functions to serialize and deserialize records.
codec util.LRUCodec
}
// Config contains the configuration values to set up a IndexedBucket instance.
type Config struct {
// DB is the boltdb instance to store the indexed bucket.
DB *bolt.DB
// Name is the primary name of the database.
Name string
// Indices is the list of indices that should be created for every record stored.
Indices []string
// Codec is used to serialize and deserialize records. It has to consume
// and produce instances that implement the Record interface.
Codec util.LRUCodec
}
// NewIndexedBucket returns a new instance of IndexedBucket. Since it uses an existing
// BoltDB instance, it is up to the caller to close it.
func NewIndexedBucket(config *Config) (*IndexedBucket, error) {
// Make sure there is not collision with any internal buckets, i.e. for meta data.
if config.Name == "" || strings.HasPrefix(config.Name, "_") {
return nil, fmt.Errorf("Bucket name cannot be empty or start with %q", "_")
}
ret := &IndexedBucket{
DB: config.DB,
indices: util.NewStringSet(config.Indices),
mainBucket: []byte(config.Name),
codec: config.Codec,
}
if err := ret.initBuckets(); err != nil {
return nil, err
}
return ret, nil
}
// Read reads the records identified by keys. The returned slice has the
// exact same number of elements as keys. If a record could not be found the
// corresponding entry in the return value will be nil.
func (ix *IndexedBucket) Read(keys []string) ([]Record, error) {
return ix.readRecs(keys, nil)
}
// Same as Read but use an existing transaction.
func (ix *IndexedBucket) ReadTx(tx *bolt.Tx, keys []string) ([]Record, error) {
return ix.readRecs(keys, tx)
}
// ReadIndex returns the record keys for the given index name and list of index values.
// The return value maps from index values (secondary key values) to a list of primary keys.
func (ix *IndexedBucket) ReadIndex(idx string, keys []string) (map[string][]string, error) {
return ix.ReadIndexTx(nil, idx, keys)
}
// ReadIndexTx does the same as ReadIndex but uses an existing transaction.
func (ix *IndexedBucket) ReadIndexTx(tx *bolt.Tx, idx string, keys []string) (map[string][]string, error) {
if _, ok := ix.indices[idx]; !ok {
return nil, fmt.Errorf("Invalid index: %q", idx)
}
ret := make(map[string][]string, len(keys))
viewFn := func(tx *bolt.Tx) error {
bucket := tx.Bucket([]byte(idx))
for _, key := range keys {
ret[key] = ix.indexLookup(bucket, key)
}
return nil
}
var err error
if tx != nil {
err = viewFn(tx)
} else {
err = ix.DB.View(viewFn)
}
return ret, err
}
// ReadRaw reads the bytes for a given key. If the key does not exist
// the returned byte slice is nil.
func (ix *IndexedBucket) ReadRaw(key string) ([]byte, error) {
// Key cannot be empty.
if key == "" {
return nil, nil
}
var ret []byte = nil
viewFn := func(tx *bolt.Tx) error {
bucket := tx.Bucket(ix.mainBucket)
contentBytes := bucket.Get([]byte(key))
if contentBytes != nil {
// If we have data make a copy.
ret = append(ret, contentBytes...)
}
return nil
}
err := ix.DB.View(viewFn)
return ret, err
}
// WriteFn is the type of the callback function that can be handed to the
// Write method to update existing records before they are overwritten with a
// new instance. The passed records are the current records in the database
// (before new values are written). The WriteFn can then modify records in place
// and they will be written to disk. To prevent a record from being written back
// into the database it should set the entry to nil.
type WriteFn func(*bolt.Tx, []Record) error
// Update allows to update records in the database by following these steps:
// - read existing records from the database, identified by the Key() value
// of inputRecs.
// - call the writeFn to allow the client to update the records. If it does
// not want a record to be updated it will set its entry to nil.
func (ix *IndexedBucket) Update(inputRecs []Record, writeFn WriteFn) error {
return ix.writeRecs(inputRecs, writeFn, nil)
}
// UpdateTx is the same as Update but re-uses an existing transaction.
func (ix *IndexedBucket) UpdateTx(tx *bolt.Tx, inputRecs []Record, writeFn WriteFn) error {
return ix.writeRecs(inputRecs, writeFn, tx)
}
func (ix *IndexedBucket) Insert(recs []Record) error {
return ix.writeRecs(recs, nil, nil)
}
// writeRecs writes given records with the given transaction or by creating a new transaction.
// See Write(...) for details how it works.
func (ix *IndexedBucket) writeRecs(inputRecs []Record, writeFn WriteFn, tx *bolt.Tx) error {
if len(inputRecs) == 0 {
return nil
}
// Extract the keys and make sure they are unique and not empty.
uniqueKeys := make(util.StringSet, len(inputRecs))
keys := make([]string, len(inputRecs), len(inputRecs))
for i, rec := range inputRecs {
uniqueKeys[rec.Key()] = true
keys[i] = rec.Key()
}
if (len(uniqueKeys) != len(inputRecs)) || uniqueKeys[""] {
return fmt.Errorf("Keys of input records have to be unique and cannot be empty.")
}
updateFn := func(tx *bolt.Tx) error {
foundRecs, err := ix.readRecs(keys, tx)
if err != nil {
return err
}
// Capture the index values before they are changed.
origIndexState, err := ix.getIndexState(foundRecs)
if err != nil {
return err
}
writeRecs := inputRecs
if writeFn != nil {
if err := writeFn(tx, foundRecs); err != nil {
return err
}
writeRecs = foundRecs
// Make sure the keys are still in the same order as before.
for i, writeRec := range writeRecs {
if (writeRec != nil) && (inputRecs[i].Key() != writeRec.Key()) {
return fmt.Errorf("Order of keys in the input to Update needs to match the order of keys after call to write function.")
}
}
}
// TODO(stephana): Make this parallel since encoding can be CPU intensive.
// Get the main bucket and write the entries.
bucket := tx.Bucket(ix.mainBucket)
for _, rec := range writeRecs {
if rec != nil {
contentBytes, err := ix.codec.Encode(rec)
if err != nil {
return err
}
if err := bucket.Put([]byte(rec.Key()), contentBytes); err != nil {
return err
}
}
}
if err := ix.updateIndices(tx, writeRecs, origIndexState, ix.indices); err != nil {
return err
}
return nil
}
// If we were given a transaction then use it otherwise create a new one.
if tx != nil {
return updateFn(tx)
}
return ix.DB.Update(updateFn)
}
// Delete deletes the records identified by keys from the database and updates
// the indices accordingly.
func (ix *IndexedBucket) Delete(keys []string) error {
return ix.deleteRecs(keys, nil)
}
// DeleteTx does the same as Delete but uses an existing transaction.
func (ix *IndexedBucket) DeleteTx(tx *bolt.Tx, keys []string) error {
return ix.deleteRecs(keys, tx)
}
// ReIndex rebuilds the indices that were specified when the instance was created.
// This will block until all indices are rebuild.
func (ix *IndexedBucket) ReIndex() error {
updateFn := func(tx *bolt.Tx) error {
// Build all indices. This will remove the indices before bulding if they exist.
return ix.buildIndices(tx, ix.indices)
}
return ix.DB.Update(updateFn)
}
// deleteRecs deletes the records identified by keys. It uses tx if it's not
// nil or creates a new transaction otherwise.
func (ix *IndexedBucket) deleteRecs(keys []string, tx *bolt.Tx) error {
updateFn := func(tx *bolt.Tx) error {
entries, err := ix.readRecs(keys, tx)
if err != nil {
return err
}
// TODO(stephana): Combine the above readRecs step with the delete
// step to avoid having to traverse the keyspace twice. Use a cursor.
// Get the main bucket and delete the records from it.
bucket := tx.Bucket(ix.mainBucket)
for _, key := range keys {
if err := bucket.Delete([]byte(key)); err != nil {
return err
}
}
if err := ix.deleteIndices(tx, entries); err != nil {
return err
}
return nil
}
if tx != nil {
return updateFn(tx)
}
return ix.DB.Update(updateFn)
}
// TODO(stephana): Revisit the signature of List and how it is implemented
// based on the use cases.
// List returns a list of records sorted in ascending order by the primary
// key starting with record at offset. If size <= 0 all records will be
// returned.
// Note: This can be take a long time for large values of offset and/or size.
// Results might not be consistent across multiple calls since they might
// be interleaved with write operations.
func (ix *IndexedBucket) List(offset, size int) ([]Record, int, error) {
total := 0
var ret []Record = nil
viewFn := func(tx *bolt.Tx) error {
bucket := tx.Bucket(ix.mainBucket)
nEntries := bucket.Stats().KeyN
// If size <= 0 then we want all entries.
if size <= 0 {
size = nEntries
}
resultSize := util.MaxInt(0, util.MinInt(nEntries-offset, size))
if resultSize == 0 {
ret = []Record{}
return nil
}
cursor := bucket.Cursor()
_, val := cursor.First()
// TODO(stephana): This is definitely inefficient, but there is no
// option in boltdb to address a specific element by index. If this is
// too slow we can add an additional pseudo index for issue ids.
// Skip the first entries if offset > 0.
for i := 0; i < offset; i++ {
_, val = cursor.Next()
}
allEntries := make([]Record, resultSize, resultSize)
var err error
var iRec interface{}
// Note: We are guaranteed to have resultSize records in the database.
for i := 0; i < resultSize; i++ {
if iRec, err = ix.codec.Decode(val); err != nil {
return err
}
allEntries[i] = iRec.(Record)
_, val = cursor.Next()
}
// Success assign the results now.
ret = allEntries
total = nEntries
return nil
}
err := ix.DB.View(viewFn)
return ret, total, err
}
// readRecs reads the records identified by keys. It returns an array
// of records with the same length as the keys array. If a record cannot be
// found the corresponding value is nil.
func (ix *IndexedBucket) readRecs(keys []string, tx *bolt.Tx) ([]Record, error) {
var ret []Record = nil
viewFn := func(tx *bolt.Tx) error {
bucket := tx.Bucket(ix.mainBucket)
ret = make([]Record, len(keys), len(keys))
// TODO(stephana): Make this parallel since decoding can be CPU intensive.
for i, key := range keys {
contentBytes := bucket.Get([]byte(key))
if contentBytes != nil {
rec, err := ix.codec.Decode(contentBytes)
if err != nil {
return err
}
ret[i] = rec.(Record)
}
}
return nil
}
// if a transaction was supplied then use it.
var err error
if tx != nil {
err = viewFn(tx)
} else {
err = ix.DB.View(viewFn)
}
return ret, err
}
// indexLookup returns the primary keys for the given (bucket,key) pair.
// The bucket is assumed to contain an index.
func (ix *IndexedBucket) indexLookup(bucket *bolt.Bucket, key string) []string {
searchPrefix := []byte(key)
searchPrefixLen := len(searchPrefix)
ret := []string{}
cursor := bucket.Cursor()
for k, _ := cursor.Seek(searchPrefix); bytes.HasPrefix(k, searchPrefix); k, _ = cursor.Next() {
keyPrefixLen, val := fromIndexEntry(k)
// Make sure the index value matches the target exactly.
if keyPrefixLen != searchPrefixLen {
continue
}
ret = append(ret, val)
}
return ret
}
// getIndexState captures the index values for the records in the given list.
// It returns an array where each element maps to the corresponding element
// in the input array.
func (ix *IndexedBucket) getIndexState(recs []Record) ([]map[string][]string, error) {
ret := make([]map[string][]string, len(recs), len(recs))
var err error
for i, rec := range recs {
if rec != nil {
if ret[i], err = ix.getIndexValues(rec, ix.indices); err != nil {
return nil, err
}
}
}
return ret, nil
}
// getIndexValues extracts the index values from rec and returns a deep copy.
func (ix *IndexedBucket) getIndexValues(rec Record, targetIndices util.StringSet) (map[string][]string, error) {
iv := rec.IndexValues()
ret := make(map[string][]string, len(iv))
for idxName, values := range iv {
if _, ok := ix.indices[idxName]; !ok {
return nil, fmt.Errorf("Unknown index name: %s. All indices returned by IndexValues must be listed in initial Config.", idxName)
}
// Make sure the index values do not exceed the maximum length.
for _, val := range values {
if len(val) > MAX_INDEX_VAL_LEN {
return nil, fmt.Errorf("Value for index %s too long. Cannot exceed %d.", idxName, MAX_INDEX_VAL_LEN)
}
}
// Only consider the indices in targetIndices.
if !targetIndices[idxName] {
continue
}
ret[idxName] = append([]string(nil), values...)
}
return ret, nil
}
// toIndexEntry concatenates the index value (the value for which we wish to search)
// and parent id (primary key) and the length of the index value. This allows
// to separate them without inserting any kind of separator. By appending the
// length at the end, we are still able to use the result in a prefix scan.
func toIndexEntry(indexVal string, parentID string) string {
len1, len2 := len(indexVal), len(parentID)
ret := make([]byte, len1+len2+2, len1+len2+2)
copy(ret, indexVal)
copy(ret[len1:], parentID)
ret[len1+len2] = byte(len1 >> 8)
ret[len1+len2+1] = byte(len1)
return string(ret)
}
// fromIndexEntry splits a byte slice generated by the toIndexEntry and returns
// the primary key value and the length of the index value. We ignore the
// prefixed index value under the assumption that this entry has already been
// chosen because of having a specific prefix.
func fromIndexEntry(entry []byte) (int, string) {
first := len(entry) - 2
keyLen := (int(entry[first]) << 8) | int(entry[first+1])
ret := append([]byte(nil), entry[keyLen:first]...)
return keyLen, string(ret)
}
// updateIndices updates the indices the given records
func (ix *IndexedBucket) updateIndices(tx *bolt.Tx, recs []Record, baseState []map[string][]string, indices util.StringSet) error {
addChanges := make(map[string][]string, len(indices))
delChanges := make(map[string][]string, len(indices))
var indexVals map[string][]string
var err error
for i, rec := range recs {
// rec != nil indicates that this record will change in the database and
// we need to update the index.
if rec != nil {
parentId := rec.Key()
if indexVals, err = ix.getIndexValues(rec, indices); err != nil {
return err
}
for idxName, indexVals := range indexVals {
newVals := util.NewStringSet(indexVals)
oldVals := util.NewStringSet(baseState[i][idxName])
for indexVal := range newVals.Complement(oldVals) {
addChanges[idxName] = append(addChanges[idxName], toIndexEntry(indexVal, parentId))
}
for indexVal := range oldVals.Complement(newVals) {
delChanges[idxName] = append(delChanges[idxName], toIndexEntry(indexVal, parentId))
}
}
}
}
// Delete all entries that need to be deleted.
if err := writeIndexChanges(tx, delChanges, deleteIndexOp); err != nil {
return err
}
// Add all the entries that need to be added.
return writeIndexChanges(tx, addChanges, addIndexOp)
}
// deleteIndices deletes the indices associated with the the given records.
func (ix *IndexedBucket) deleteIndices(tx *bolt.Tx, entries []Record) error {
changes := make(map[string][]string, len(ix.indices))
var idxValsMap map[string][]string
var err error
for _, entry := range entries {
if entry != nil {
parentId := entry.Key()
if idxValsMap, err = ix.getIndexValues(entry, ix.indices); err != nil {
return err
}
for idxName, indexVals := range idxValsMap {
for _, indexVal := range indexVals {
changes[idxName] = append(changes[idxName], toIndexEntry(indexVal, parentId))
}
}
}
}
return writeIndexChanges(tx, changes, deleteIndexOp)
}
// metaData captures the meta data that are stored for each IndexedBucket.
type metaData struct {
// Version is for future use when the bucket layout changes.
Version int
// Indices contains the indices currently maintained for a bucket.
Indices []string
}
// initBuckets makes sure all needed buckets exist and the configured indices
// are correct in the database. If indices were added or removed it will
// update the database accordingly.
func (ix *IndexedBucket) initBuckets() error {
return ix.DB.Update(func(tx *bolt.Tx) error {
// Create the main bucket if it does not exist.
if _, err := tx.CreateBucketIfNotExists(ix.mainBucket); err != nil {
return err
}
var removeIndices util.StringSet = nil
var addIndices util.StringSet = nil
// Get the metadata bucket and load the meta data.
metaBucketName := []byte(META_DATA_BUCKET_PREFIX + string(ix.mainBucket))
codec := util.JSONCodec(&metaData{})
bucket := tx.Bucket(metaBucketName)
var err error
// If the bucket doesn't exist or is empty, we create all indices.
if bucket == nil {
if bucket, err = tx.CreateBucket(metaBucketName); err != nil {
return err
}
addIndices = ix.indices
} else if metaBytes := bucket.Get([]byte(META_DATA_KEY)); metaBytes == nil {
addIndices = ix.indices
} else {
// deserialize the meta data.
iMeta, err := codec.Decode(metaBytes)
if err != nil {
return err
}
metaRec := iMeta.(*metaData)
// Find the indices that need to be added or removed.
existingIndices := util.NewStringSet(metaRec.Indices)
addIndices = ix.indices.Complement(existingIndices)
removeIndices = existingIndices.Complement(ix.indices)
}
// Delete all unnecessary indices.
for idxName := range removeIndices {
if err := tx.DeleteBucket([]byte(idxName)); err != nil {
return err
}
}
// Add the necessary indices.
if len(addIndices) > 0 {
if err := ix.buildIndices(tx, addIndices); err != nil {
return err
}
}
// Write the meta data.
metaRec := &metaData{Version: 1, Indices: ix.indices.Keys()}
metaBytes, err := codec.Encode(metaRec)
if err != nil {
return err
}
return bucket.Put([]byte(META_DATA_KEY), metaBytes)
})
}
// buildIndices iterates over all records and creates the provided indices.
func (ix *IndexedBucket) buildIndices(tx *bolt.Tx, indices util.StringSet) error {
// Clear the indices first to be in a well defined state.
for idxName := range indices {
bucket := tx.Bucket([]byte(idxName))
if bucket != nil {
if err := tx.DeleteBucket([]byte(idxName)); err != nil {
return err
}
}
if _, err := tx.CreateBucket([]byte(idxName)); err != nil {
return err
}
}
// iterate over the entire database and build the indices.
recBatch := make([]Record, 0, BUILD_INDEX_BATCH_SIZE)
emptyBaseState := make([]map[string][]string, BUILD_INDEX_BATCH_SIZE, BUILD_INDEX_BATCH_SIZE)
c := tx.Bucket(ix.mainBucket).Cursor()
for k, v := c.First(); k != nil; k, v = c.Next() {
rec, err := ix.codec.Decode(v)
if err != nil {
return fmt.Errorf("Error while (re)building indices, while reading record with key %q: %s", string(k), err)
}
recBatch = append(recBatch, rec.(Record))
if len(recBatch) >= BUILD_INDEX_BATCH_SIZE {
if err := ix.updateIndices(tx, recBatch, emptyBaseState[:len(recBatch)], indices); err != nil {
return err
}
// Note: This might prevent some instances of Record from being garbage collected
// right away which is accpetable in this case.
recBatch = recBatch[:0]
}
}
// Process any records that haven't been indexed yet.
if len(recBatch) > 0 {
if err := ix.updateIndices(tx, recBatch, emptyBaseState[:len(recBatch)], indices); err != nil {
return err
}
}
return nil
}
// indexOp is a helper type that allows to define different operations for writeIndexChanges.
type indexOp func(*bolt.Bucket, []byte) error
// deleteIndexOp is the operation used to delete from the indices.
func deleteIndexOp(bucket *bolt.Bucket, k []byte) error {
return bucket.Delete(k)
}
// addIndexOp is the operation used to add to an index.
func addIndexOp(bucket *bolt.Bucket, k []byte) error {
return bucket.Put(k, []byte{})
}
// writeIndexChanges writes changes to indices.
// changes map[indexName][]indexValues
func writeIndexChanges(tx *bolt.Tx, changes map[string][]string, op indexOp) error {
for idxName, indexEntries := range changes {
bucket := tx.Bucket([]byte(idxName))
for _, entry := range indexEntries {
if err := op(bucket, []byte(entry)); err != nil {
return err
}
}
}
return nil
}