// Package ptracestore is a database for Perf data.
| package ptracestore |
| |
| import ( |
| "bytes" |
| "encoding/binary" |
| "errors" |
| "fmt" |
| "os" |
| "path/filepath" |
| "sync" |
| "time" |
| |
| "github.com/boltdb/bolt" |
| "github.com/golang/groupcache/lru" |
| "go.skia.org/infra/go/sklog" |
| "go.skia.org/infra/go/timer" |
| "go.skia.org/infra/go/util" |
| "go.skia.org/infra/go/vec32" |
| "go.skia.org/infra/perf/go/cid" |
| "go.skia.org/infra/perf/go/constants" |
| ) |
| |
// MAX_CACHED_TILES is the capacity of each of the two LRU caches of open
// tiles kept by a BoltTraceStore.
const MAX_CACHED_TILES = 20

// Names of the BoltDB buckets stored inside every tile.
const (
	// TRACE_VALUES_BUCKET_NAME holds, per trace id, the appended encoded
	// traceValue records.
	TRACE_VALUES_BUCKET_NAME = "traces"
	// TRACE_SOURCES_BUCKET_NAME holds, per trace id, the appended encoded
	// sourceValue records.
	TRACE_SOURCES_BUCKET_NAME = "sources"
	// SOURCE_LIST_BUCKET_NAME maps an encoded source index to the name of the
	// source file.
	SOURCE_LIST_BUCKET_NAME = "sourceList"
)
| |
// tileNotExist is the sentinel error getBoltDB returns when it is called with
// readonly set and no file exists yet for the requested tile.
var tileNotExist = errors.New("Tile does not exist.")
| |
| // Trace is just a slice of float32s. |
| type Trace []float32 |
| |
| // NewTrace returns a Trace of length 'traceLen' initialized to vec32.MISSING_DATA_SENTINEL. |
| func NewTrace(traceLen int) Trace { |
| ret := make([]float32, traceLen) |
| for i := range ret { |
| ret[i] = vec32.MISSING_DATA_SENTINEL |
| } |
| return ret |
| } |
| |
| // TraceSet is a set of Trace's, keyed by trace id. |
| type TraceSet map[string]Trace |
| |
| // Progress is a func that is called as Match works, passing in the steps |
| // completed and the total number of steps, where a 'step' is applying a query |
| // to a single tile. |
| type Progress func(step, totalSteps int) |
| |
| // KeyMatches is a func that returns true if a key matches some criteria. |
| // Passed to Match(). |
| type KeyMatches func(key string) bool |
| |
| // PTraceStore is an interface for storing Perf data. |
| // |
| // PTraceStore doesn't know anything about git hashes or Rietveld issue IDs, |
| // that will be handled at a level above this. |
| // |
| // TODO(jcgregorio) How to list all the Sources? |
| type PTraceStore interface { |
| // Add new values to the datastore at the given commitID. |
| // |
| // values - A map from the trace id to a float32 value. |
| // sourceFile - The full path of the file where this information came from, |
| // usually the Google Storage URL. |
| Add(commitID *cid.CommitID, values map[string]float32, sourceFile string) error |
| |
| // Retrieve the source and value for a given measurement in a given trace, |
| // and a non-nil error if no such point was found. |
| Details(commitID *cid.CommitID, traceID string) (string, float32, error) |
| |
| // Match returns TraceSet that match the given Query and slice of cid.CommitIDs. |
| // |
| // The 'progess' callback will be called as each Tile is processed. |
| // |
| // The returned TraceSet will contain a slice of Trace, and that list will be |
| // empty if there are no matches. |
| Match(commitIDs []*cid.CommitID, matches KeyMatches, progress Progress) (TraceSet, error) |
| } |
| |
| // BoltTraceStore is an implementation of PTraceStore that uses BoltDB. |
| type BoltTraceStore struct { |
| // mutex protects access to cache. |
| mutex sync.Mutex |
| |
| // cache is a cache of opened tiles. |
| cache *lru.Cache |
| |
| // queryCache is a cache of opened tiles used for queries, i.e. for Match and |
| // Details calls. |
| queryCache *lru.Cache |
| |
| // dir is the directory where tiles are stored. |
| dir string |
| } |
| |
| // cacheEntry is what's stored in the BoltTraceStore cache. |
| // |
| // The cacheEntry holds a bolt.DB and a WaitGroup. Every time the cacheEntry is |
| // requested from getBoltDB the wg is incremented. If the entry gets evicted |
| // from the cache we wait for the 'wg' to reach zero before closing the BoltDB, |
| // since there may be transactions still running against the DB. |
| type cacheEntry struct { |
| db *bolt.DB |
| wg sync.WaitGroup |
| } |
| |
| func (c *cacheEntry) Done() { |
| c.wg.Done() |
| } |
| |
| // closer is a callback we pass to the lru cache to close bolt.DBs once they've |
| // been evicted from the cache. |
| // |
| // The call to entry.wg.Wait() is safe because the BoltTraceStore cache is |
| // protected by a mutex, and lru.Cache is single threaded, so no adds can |
| // happen concurrently. Only during an add will an entry be evicted from the |
| // cache, and during that eviction we can wait for the wg to decrease to 0, |
| // which is guaranteed since the cache is locked. |
| func closer(key lru.Key, value interface{}) { |
| if entry, ok := value.(*cacheEntry); ok { |
| sklog.Infof("Waiting: %v", key) |
| entry.wg.Wait() |
| sklog.Infof("Closing: %v", key) |
| util.Close(entry.db) |
| } else { |
| sklog.Errorf("Found a non-bolt.DB in the cache at key %q", key) |
| } |
| } |
| |
| // New creates a new BoltTraceStore that stores tiles in the given directory. |
| func New(dir string) (*BoltTraceStore, error) { |
| cache := lru.New(MAX_CACHED_TILES) |
| cache.OnEvicted = closer |
| |
| queryCache := lru.New(MAX_CACHED_TILES) |
| queryCache.OnEvicted = closer |
| |
| if err := os.MkdirAll(dir, 0755); err != nil { |
| return nil, fmt.Errorf("Failed to create %q for ptracestore: %s", dir, err) |
| } |
| |
| return &BoltTraceStore{ |
| dir: dir, |
| cache: cache, |
| queryCache: queryCache, |
| }, nil |
| } |
| |
// Record types that are encoded with encoding/binary (little-endian) and
// appended to a trace's entry in a tile. The field order defines the on-disk
// encoding and must not change.
type (
	// traceValue is a record in the values bucket: the commit's index within
	// the tile, followed by the value at that commit.
	traceValue struct {
		Index int64
		Value float32
	}

	// sourceValue is a record in the sources bucket: the commit's index
	// within the tile, followed by the index of the source file in the
	// source list bucket.
	sourceValue struct {
		Index  int64
		Source uint64
	}
)
| |
| // getBoltDB returns a new/existing bolt.DB. Already opened db's are cached. |
| // |
| // If 'readonly' is true then getBoltDB will fail with a tileNotExist error |
| // instead of creating a new DB at that location. |
| // |
| // Calls must call Done() on the returned cacheEntry when they are done using it. |
| func (b *BoltTraceStore) getBoltDB(commitID *cid.CommitID, readonly bool) (*cacheEntry, error) { |
| b.mutex.Lock() |
| defer b.mutex.Unlock() |
| name := commitID.Filename() |
| // Look for tile in the cache. |
| if ientry, ok := b.cache.Get(name); ok { |
| if entry, ok := ientry.(*cacheEntry); ok { |
| entry.wg.Add(1) |
| return entry, nil |
| } |
| } |
| |
| // Look for tile in the queryCache. |
| if ientry, ok := b.queryCache.Get(name); ok { |
| if entry, ok := ientry.(*cacheEntry); ok { |
| entry.wg.Add(1) |
| return entry, nil |
| } |
| } |
| |
| filename := filepath.Join(b.dir, commitID.Filename()) |
| if _, err := os.Stat(filename); os.IsNotExist(err) && readonly { |
| return nil, tileNotExist |
| } |
| db, err := bolt.Open(filename, 0600, &bolt.Options{Timeout: 5 * time.Second}) |
| if err != nil { |
| return nil, fmt.Errorf("Unable to open %q: %s", filename, err) |
| } |
| entry := &cacheEntry{ |
| db: db, |
| wg: sync.WaitGroup{}, |
| } |
| entry.wg.Add(1) |
| |
| if readonly { |
| b.queryCache.Add(name, entry) |
| } else { |
| b.cache.Add(name, entry) |
| } |
| return entry, nil |
| } |
| |
// uint64ToBytes returns the 8-byte little-endian encoding of u. It is used to
// build the keys of the source list bucket.
func uint64ToBytes(u uint64) []byte {
	var buf [8]byte
	binary.LittleEndian.PutUint64(buf[:], u)
	return buf[:]
}
| |
// serialize returns the little-endian encoding of i as produced by
// binary.Write. It fails if i is not a fixed-size value.
func serialize(i interface{}) ([]byte, error) {
	var buf bytes.Buffer
	if err := binary.Write(&buf, binary.LittleEndian, i); err != nil {
		return nil, fmt.Errorf("binary.Write of value failed: %s", err)
	}
	return buf.Bytes(), nil
}
| |
| func (b *BoltTraceStore) Add(commitID *cid.CommitID, values map[string]float32, sourceFile string) error { |
| sklog.Infof("Ingesting source file: %q", sourceFile) |
| index := commitID.Offset % constants.COMMITS_PER_TILE |
| entry, err := b.getBoltDB(commitID, false) |
| if err != nil { |
| return fmt.Errorf("Unable to open datastore: %s", err) |
| } |
| defer entry.Done() |
| |
| var lastSourceIndex uint64 |
| // Add the source and get its index. |
| addSource := func(tx *bolt.Tx) error { |
| t, err := tx.CreateBucketIfNotExists([]byte(SOURCE_LIST_BUCKET_NAME)) |
| if err != nil { |
| return fmt.Errorf("Failed to get bucket: %s", err) |
| } |
| lastSourceIndex, err = t.NextSequence() |
| if err != nil { |
| return fmt.Errorf("Failed to get source index: %s", err) |
| } |
| sklog.Infof("lastSourceIndex: %d", lastSourceIndex) |
| |
| // Write the source. |
| if err := t.Put(uint64ToBytes(lastSourceIndex), []byte(sourceFile)); err != nil { |
| return fmt.Errorf("Failed to write the source file: %s", err) |
| } |
| return nil |
| } |
| |
| if err := entry.db.Update(addSource); err != nil { |
| return fmt.Errorf("Error while writing source list: %s", err) |
| } |
| |
| // Now that we have lastSourceIndex we can add the trace values. |
| addValues := func(tx *bolt.Tx) error { |
| t, err := tx.CreateBucketIfNotExists([]byte(TRACE_VALUES_BUCKET_NAME)) |
| if err != nil { |
| return fmt.Errorf("Failed to get bucket: %s", err) |
| } |
| s, err := tx.CreateBucketIfNotExists([]byte(TRACE_SOURCES_BUCKET_NAME)) |
| if err != nil { |
| return fmt.Errorf("Failed to get bucket: %s", err) |
| } |
| |
| // Add values and source index. |
| for traceID, value := range values { |
| // Write the value. |
| valueBytes, err := serialize(traceValue{ |
| Index: int64(index), |
| Value: value, |
| }) |
| if err != nil { |
| return err |
| } |
| // Append the serialized traceValue to the current trace value. |
| if err := t.Put([]byte(traceID), append(t.Get([]byte(traceID)), valueBytes...)); err != nil { |
| return fmt.Errorf("bucket.Put() of value failed: %s", err) |
| } |
| |
| // Write the source. |
| sourceBytes, err := serialize(sourceValue{ |
| Index: int64(index), |
| Source: lastSourceIndex, |
| }) |
| if err != nil { |
| return err |
| } |
| // Append the serialized sourceValue to the current trace value. |
| if err := s.Put([]byte(traceID), append(s.Get([]byte(traceID)), sourceBytes...)); err != nil { |
| return fmt.Errorf("bucket.Put() of source failed: %s", err) |
| } |
| } |
| return nil |
| } |
| |
| if err := entry.db.Update(addValues); err != nil { |
| return fmt.Errorf("Error while writing values: %s", err) |
| } |
| |
| return nil |
| } |
| |
| func (b *BoltTraceStore) Details(commitID *cid.CommitID, traceID string) (string, float32, error) { |
| entry, err := b.getBoltDB(commitID, true) |
| if err != nil { |
| return "", 0, fmt.Errorf("Unable to open datastore: %s", err) |
| } |
| defer entry.Done() |
| |
| localIndex := int64(commitID.Offset % constants.COMMITS_PER_TILE) |
| var sourceRet string |
| var valueRet float32 |
| |
| get := func(tx *bolt.Tx) error { |
| sl := tx.Bucket([]byte(SOURCE_LIST_BUCKET_NAME)) |
| if sl == nil { |
| return fmt.Errorf("Failed to get bucket: %s", SOURCE_LIST_BUCKET_NAME) |
| } |
| v := tx.Bucket([]byte(TRACE_VALUES_BUCKET_NAME)) |
| if v == nil { |
| return fmt.Errorf("Failed to get bucket: %s", TRACE_VALUES_BUCKET_NAME) |
| } |
| s := tx.Bucket([]byte(TRACE_SOURCES_BUCKET_NAME)) |
| if s == nil { |
| return fmt.Errorf("Failed to get bucket: %s", TRACE_SOURCES_BUCKET_NAME) |
| } |
| |
| // Read the value. |
| rawValues := v.Get([]byte(traceID)) |
| if rawValues == nil { |
| rawValues = []byte{} |
| } |
| rawValues = dup(rawValues) |
| buf := bytes.NewBuffer(rawValues) |
| value := traceValue{ |
| Index: -1, |
| } |
| for { |
| err := binary.Read(buf, binary.LittleEndian, &value) |
| if err != nil { |
| break |
| } |
| if value.Index == localIndex { |
| valueRet = value.Value |
| // Don't break, we want the last value for index. |
| } |
| } |
| if value.Index == -1 { |
| return fmt.Errorf("Value not found: %q in %q", traceID, commitID.Filename()) |
| } |
| |
| // Read the source. |
| rawSource := s.Get([]byte(traceID)) |
| if rawSource == nil { |
| return fmt.Errorf("Source not found.") |
| } |
| rawSource = dup(rawSource) |
| buf = bytes.NewBuffer(rawSource) |
| source := sourceValue{ |
| Index: -1, |
| } |
| var sourceIndex uint64 |
| for { |
| err := binary.Read(buf, binary.LittleEndian, &source) |
| if err != nil { |
| sklog.Infof("Failed binary.Read: %s", err) |
| break |
| } |
| if source.Index == localIndex { |
| sourceIndex = source.Source |
| // Don't break, we want the last value for index. |
| } |
| } |
| if value.Index == -1 { |
| return fmt.Errorf("Source not found: %q in %q", traceID, commitID.Filename()) |
| } |
| |
| // Read the sourceFullname. |
| sourceRet = string(sl.Get(uint64ToBytes(sourceIndex))) |
| |
| return nil |
| } |
| |
| if err := entry.db.View(get); err != nil { |
| return "", 0, fmt.Errorf("Error while reading value: %s", err) |
| } |
| |
| return sourceRet, valueRet, nil |
| } |
| |
| type tileMap struct { |
| commitID *cid.CommitID |
| idxmap map[int]int |
| } |
| |
| // buildMapper transforms the slice of commitIDs passed to Match into a mapping |
| // from the location of the commit in the DB to the index for that commit in |
| // the Trace's returned from Match. I.e. it maps tiles to a map that says where |
| // each value stored in the tile trace needs to be copied into the destination |
| // Trace. |
| // |
| // For example, if given: |
| // |
| // commitIDs := []*cid.CommitID{ |
| // &cid.CommitID{ |
| // Source: "master", |
| // Offset: 49, |
| // }, |
| // &cid.CommitID{ |
| // Source: "master", |
| // Offset: 50, |
| // }, |
| // &cid.CommitID{ |
| // Source: "master", |
| // Offset: 51, |
| // }, |
| // } |
| // |
| // This will return the following, presuming a tile size of 50: |
| // |
| // map[string]*tileMap{ |
| // "master-000000.bdb": &tileMap{ |
| // commitID: &cid.CommitID{ |
| // Source: "master", |
| // Offset: 49, |
| // }, |
| // idxmap: map[int]int{ |
| // 49: 0, |
| // }, |
| // }, |
| // "master-000001.bdb": &tileMap{ |
| // commitID: &cid.CommitID{ |
| // Source: "master", |
| // Offset: 50, |
| // }, |
| // idxmap: map[int]int{ |
| // 0: 1, |
| // 1: 2, |
| // }, |
| // }, |
| // } |
| // |
| // The returned map is used when loading traces out of tiles. |
| func buildMapper(commitIDs []*cid.CommitID) map[string]*tileMap { |
| mapper := map[string]*tileMap{} |
| for targetIndex, commitID := range commitIDs { |
| if tm, ok := mapper[commitID.Filename()]; !ok { |
| mapper[commitID.Filename()] = &tileMap{ |
| commitID: commitID, |
| idxmap: map[int]int{commitID.Offset % constants.COMMITS_PER_TILE: targetIndex}, |
| } |
| } else { |
| tm.idxmap[commitID.Offset%constants.COMMITS_PER_TILE] = targetIndex |
| } |
| } |
| return mapper |
| } |
| |
// dup returns a freshly allocated copy of b. The result is never nil, even
// when b is.
//
// It exists because byte slices returned by BoltDB are only valid for the
// life of the transaction that produced them.
func dup(b []byte) []byte {
	out := make([]byte, 0, len(b))
	return append(out, b...)
}
| |
| // loadMatches loads values into 'traceSet' that match the 'matches' from the |
| // tile in the BoltDB 'db'. Only values at the offsets in 'idxmap' are |
| // actually loaded, and 'idxmap' determines where they are stored in the Trace. |
| func loadMatches(entry *cacheEntry, idxmap map[int]int, matches KeyMatches, traceSet TraceSet, traceLen int) error { |
| defer timer.New("loadMatches time").Stop() |
| defer entry.Done() |
| |
| get := func(tx *bolt.Tx) error { |
| defer timer.New("loadMatches TX time").Stop() |
| bucket := tx.Bucket([]byte(TRACE_VALUES_BUCKET_NAME)) |
| if bucket == nil { |
| // If the bucket doesn't exist then we've never written to this tile, it's not an error, |
| // it just means it has no data. |
| return nil |
| } |
| v := bucket.Cursor() |
| value := traceValue{} |
| // Loop over the entire bucket. |
| for btraceid, rawValues := v.First(); btraceid != nil; btraceid, rawValues = v.Next() { |
| // Does the trace id match the query? |
| if !matches(string(btraceid)) { |
| continue |
| } |
| // Get the trace. |
| trace := traceSet[string(btraceid)] |
| if trace == nil { |
| // Don't make the copy until we know we are going to need it. |
| traceid := string(dup(btraceid)) |
| traceSet[traceid] = NewTrace(traceLen) |
| trace = traceSet[traceid] |
| } |
| |
| // Decode all the [index, float32] pairs stored for the trace. |
| buf := bytes.NewBuffer(rawValues) |
| for { |
| if err := binary.Read(buf, binary.LittleEndian, &value); err != nil { |
| break |
| } |
| // Store the value in trace if the index appears in idxmap. |
| if offset, ok := idxmap[int(value.Index)]; ok { |
| trace[offset] = value.Value |
| // Don't break, we want the last value for index. |
| } |
| } |
| } |
| return nil |
| } |
| |
| return entry.db.View(get) |
| } |
| |
| func (b *BoltTraceStore) Match(commitIDs []*cid.CommitID, matches KeyMatches, progress Progress) (TraceSet, error) { |
| ret := TraceSet{} |
| mapper := buildMapper(commitIDs) |
| i := 0 |
| for _, tm := range mapper { |
| i++ |
| if progress != nil { |
| progress(i, len(mapper)) |
| } |
| entry, err := b.getBoltDB(tm.commitID, true) |
| if err == tileNotExist { |
| sklog.Infof("Skipped non-existent db: %s", tm.commitID.Filename()) |
| continue |
| } |
| if err != nil { |
| return nil, fmt.Errorf("Failed to open tile from %s: %s", tm.commitID.Filename(), err) |
| } |
| // loadMatches calls entry.Done(). |
| if err := loadMatches(entry, tm.idxmap, matches, ret, len(commitIDs)); err != nil { |
| return nil, fmt.Errorf("Failed to load traces from %s: %s", tm.commitID.Filename(), err) |
| } |
| } |
| if progress != nil { |
| progress(len(mapper), len(mapper)) |
| } |
| return ret, nil |
| } |
| |
| var Default *BoltTraceStore |
| |
| func Init(dir string) { |
| if Default != nil { |
| sklog.Fatalf("ptracestore should only be initialized once.") |
| } |
| var err error |
| Default, err = New(dir) |
| if err != nil { |
| sklog.Fatalf("ptracestore failed to init: %s", err) |
| } |
| } |
| |
| // Ensure that *BoltTraceStore implements PTraceStore. |
| var _ PTraceStore = &BoltTraceStore{} |