blob: e6f48e48227dd9217acdf9bf8571580f7cd7f96a [file] [log] [blame]
// Package tracedb provides a datastore for efficiently storing and retrieving traces.
package db
import (
// CommitID represents the time of a particular commit, where a commit could either be
// a real commit into the repo, or an event like running a trybot.
type CommitID struct {
Timestamp int64 `json:"ts"`
ID string `json:"id"` // Normally a git hash, but could also be Rietveld patch id.
Source string `json:"source"` // The branch name, e.g. "master", or the Reitveld issue id.
func (c CommitID) String() string {
return fmt.Sprintf("%s:%s:%d", c.ID, c.Source, c.Timestamp)
// Entry holds the params and a value for single measurement.
type Entry struct {
Params map[string]string
// Value is the value of the measurement.
// It should be the digest string converted to a []byte, or a float64
// converted to a little endian []byte. I.e. tiling.Trace.SetAt
// should be able to consume this value.
Value []byte
// DB represents the interface to any datastore for perf and gold results.
type DB interface {
// Add new information to the datastore.
// The values parameter maps a trace id to an Entry.
// Note that only allowing adding data for a single commit at a time
// should work well with ingestion while still breaking up writes into
// shorter actions.
Add(commitID *CommitID, values map[string]*Entry) error
// Remove all info for the given commit.
Remove(commitID *CommitID) error
// List returns all the CommitID's between begin and end.
List(begin, end time.Time) ([]*CommitID, error)
// Create a Tile for the given commit ids. Will build the Tile using the
// commits in the order they are provided.
// Note that the Commits in the Tile will only contain the commit id and
// the timestamp, the Author will not be populated.
// The Tile's Scale and TileIndex will be set to 0.
// The md5 hashes for each commitid are also returned.
TileFromCommits(commitIDs []*CommitID) (*tiling.Tile, []string, error)
// ListMD5 returns the md5 hashes of the data stored for each commitid.
ListMD5(commitIDs []*CommitID) ([]string, error)
// Close the datastore.
Close() error
const (
// MAX_ID_CACHED is the size of the LRU cache TsDB.cache.
MAX_ID_CACHED = 1000000
// CHUNK_SIZE is the maximum number of values that are added to the datastore
// in any one gRPC call.
// MAX_MESSAGE_SIZE is the maximum grpc message size.
MAX_MESSAGE_SIZE = 1024 * 1024 * 1024
// TsDB is an implementation of DB that stores traces in traceservice.
type TsDB struct {
// ts is the client for the traceservice.
traceService traceservice.TraceServiceClient
// conn is the underlying connection for ts.
conn *grpc.ClientConn
// tb is a TraceBuilder for the type of Tile we're managing, i.e. perf or gold.
// It is used to build Trace's of the right type and size when building Tiles.
traceBuilder tiling.TraceBuilder
// mutex protects access to the caches.
mutex sync.Mutex
// cache is an LRU cache, records if a given trace has its params stored
// during a previous Add(), keyed by traceid and maps to a bool
cache *lru.Cache
// paramsCache is a cache of params retrieved from tracedb, keyed by traceid.
paramsCache map[string]map[string]string
// id64Cache is a cache of traceids retrieved from tracedb, keyed by trace64id.
id64Cache map[uint64]string
// ctx is the gRPC context.
ctx context.Context
// clearMutex is a mutex to protect clearing of the caches. TileFromCommits
// will get read locks, and a go routine that periodically checks the cache
// sizes will gain a write lock. That way concurrent TileFromCommits calls
// will proceed, but none will be running when the caches are potentially
// cleared.
clearMutex sync.RWMutex
// NewTraceServiceDB creates a new DB that stores the data in the BoltDB backed
// gRPC accessible traceservice.
func NewTraceServiceDB(conn *grpc.ClientConn, traceBuilder tiling.TraceBuilder) (*TsDB, error) {
ret := &TsDB{
conn: conn,
traceService: traceservice.NewTraceServiceClient(conn),
traceBuilder: traceBuilder,
cache: lru.New(MAX_ID_CACHED),
paramsCache: map[string]map[string]string{},
id64Cache: map[uint64]string{},
ctx: context.Background(),
// This ping causes the client to try and reach the backend. If the backend
// is down, it will keep trying until it's up.
if err :=; err != nil {
return nil, err
// Liveness metric.
go func() {
liveness := metrics2.NewLiveness("ping", map[string]string{"module": "tracedb"})
for range time.Tick(time.Minute) {
if == nil {
// Keep the caches sizes in check.
go func() {
for range time.Tick(15 * time.Minute) {
if len(ret.paramsCache) > MAX_ID_CACHED {
ret.paramsCache = map[string]map[string]string{}
sklog.Warning("Had to clear paramsCache, this is unexpected. MAX_ID_CACHED too small?")
if len(ret.id64Cache) > MAX_ID_CACHED {
ret.id64Cache = map[uint64]string{}
sklog.Warning("Had to clear id64Cache, this is unexpected. MAX_ID_CACHED too small?")
return ret, nil
func (ts *TsDB) ping() error {
_, err := ts.traceService.Ping(ts.ctx, &traceservice.Empty{})
return err
// addChunk adds a set of entries to the datastore at the given CommitID.
func (ts *TsDB) addChunk(ctx context.Context, cid *traceservice.CommitID, chunk map[string]*Entry) error {
if len(chunk) == 0 {
return nil
addReq := &traceservice.AddRequest{
Commitid: cid,
Values: []*traceservice.ValuePair{},
addParamsRequest := &traceservice.AddParamsRequest{
Params: []*traceservice.ParamsPair{},
for traceid, entry := range chunk {
// Check that all the traceids have their Params.
if _, ok := ts.cache.Get(traceid); !ok {
addParamsRequest.Params = append(addParamsRequest.Params, &traceservice.ParamsPair{
Key: traceid,
Params: entry.Params,
ts.cache.Add(traceid, true)
addReq.Values = append(addReq.Values, &traceservice.ValuePair{
Key: traceid,
Value: entry.Value,
if len(addParamsRequest.Params) > 0 {
// TODO(stephana): We need to fix the call to AddParams. If it fails the
// the DB ends up in an inconsistent state and traceService.GetParams
// for the failing traceID will cause a panic.
if _, err := ts.traceService.AddParams(ctx, addParamsRequest); err != nil {
return fmt.Errorf("Failed to add params: %s", err)
if _, err := ts.traceService.Add(ctx, addReq); err != nil {
return fmt.Errorf("Failed to add values: %s", err)
return nil
// tsCommitID converts a db.CommitID to traceservice.CommitID.
func tsCommitID(c *CommitID) *traceservice.CommitID {
return &traceservice.CommitID{
Timestamp: c.Timestamp,
Id: c.ID,
Source: c.Source,
// dbCommitID converts a traceservice.CommitID to db.CommitID.
func dbCommitID(c *traceservice.CommitID) *CommitID {
return &CommitID{
ID: c.Id,
Source: c.Source,
Timestamp: c.Timestamp,
// Add implements DB.Add().
func (ts *TsDB) Add(commitID *CommitID, values map[string]*Entry) error {
ctx := context.Background()
cid := tsCommitID(commitID)
// Break the values into chunks of CHUNK_SIZE or less and then process each slice.
// This will keep the total request size down.
chunks := []map[string]*Entry{}
chunk := map[string]*Entry{}
n := 0
for k, v := range values {
chunk[k] = v
if n >= CHUNK_SIZE {
n = 0
chunks = append(chunks, chunk)
chunk = map[string]*Entry{}
chunks = append(chunks, chunk)
for i, chunk := range chunks {
if err := ts.addChunk(ctx, cid, chunk); err != nil {
return fmt.Errorf("Failed to add chunk %d: %s", i, err)
return nil
// Remove implements DB.Remove().
func (ts *TsDB) Remove(commitID *CommitID) error {
removeRequest := &traceservice.RemoveRequest{
Commitid: tsCommitID(commitID),
_, err := ts.traceService.Remove(context.Background(), removeRequest)
return err
// List implements DB.List().
func (ts *TsDB) List(begin, end time.Time) ([]*CommitID, error) {
listReq := &traceservice.ListRequest{
Begin: begin.Unix(),
End: end.Unix(),
listResp, err := ts.traceService.List(context.Background(), listReq)
if err != nil {
return nil, fmt.Errorf("List request failed: %s", err)
// Copy the data from the ListResponse to a slice of CommitIDs.
ret := []*CommitID{}
for _, c := range listResp.Commitids {
ret = append(ret, dbCommitID(c))
return ret, nil
// TileFromCommits implements DB.TileFromCommits().
func (ts *TsDB) TileFromCommits(commitIDs []*CommitID) (*tiling.Tile, []string, error) {
ctx := context.Background()
// Build the Tile.
tile := tiling.NewTile()
n := len(commitIDs)
tile.Commits = make([]*tiling.Commit, n, n)
hash := make([]string, n)
// Populate the Tile's commits.
for i, cid := range commitIDs {
tile.Commits[i] = &tiling.Commit{
Hash: cid.ID,
CommitTime: cid.Timestamp,
// tileMutex protects access to the Tile. Note that this only means the Tile,
// while writing values into a Trace that already exists and is the right
// size is Go routine safe.
var tileMutex sync.Mutex
errCh := make(chan error, len(commitIDs))
// Fill in the data for each commit in it's own Go routine.
var wg sync.WaitGroup
for i, cid := range commitIDs {
go func(i int, cid *CommitID) {
defer wg.Done()
// Load the values for the commit.
getValuesRequest := &traceservice.GetValuesRequest{
Commitid: tsCommitID(cid),
getRawValues, err := ts.traceService.GetValuesRaw(ctx, getValuesRequest)
if err != nil {
errCh <- fmt.Errorf("Failed to get values for %d %#v: %s", i, *cid, err)
// Convert raw response into values.
ci, err := traceservice.NewCommitInfo(getRawValues.Value)
if err != nil {
errCh <- fmt.Errorf("Failed to convert values for %d %#v: %s", i, *cid, err)
// Now make sure we have all the traceids for the trace64ids in ci.
missingKeys64 := []uint64{}
for id64 := range ci.Values {
if _, ok := ts.id64Cache[id64]; !ok {
missingKeys64 = append(missingKeys64, id64)
if len(missingKeys64) > 0 {
traceidsRequest := &traceservice.GetTraceIDsRequest{
Id: missingKeys64,
traceids, err := ts.traceService.GetTraceIDs(ctx, traceidsRequest)
if err != nil {
errCh <- fmt.Errorf("Failed to get traceids for trace64ids for %d %#v: %s", i, *cid, err)
for _, tid := range traceids.Ids {
ts.id64Cache[tid.Id64] = tid.Id
for id64, rawValue := range ci.Values {
if rawValue == nil {
sklog.Errorf("Got a nil rawValue in response: %s", err)
traceid := ts.id64Cache[id64]
tr, ok := tile.Traces[traceid]
if !ok || tr == nil {
tile.Traces[traceid] = ts.traceBuilder(n)
tr = tile.Traces[traceid]
if tr == nil {
sklog.Errorf("Trace was still nil for key: %v", traceid)
if err := tr.SetAt(i, rawValue); err != nil {
errCh <- fmt.Errorf("Unable to convert trace value %d %#v: %s", i, *cid, err)
// Fill in the commits hash.
hash[i] = getRawValues.Md5
}(i, cid)
// See if any Go routine generated an error.
select {
case err, ok := <-errCh:
if ok {
return nil, nil, fmt.Errorf("Failed to load trace data: %s", err)
sklog.Infof("Finished loading values. Starting to load Params.")
// Now load the params for the traces.
traceids := []string{}
for k := range tile.Traces {
// Only load params for traces not already in the cache.
if _, ok := ts.paramsCache[k]; !ok {
traceids = append(traceids, k)
// Break the loading of params into chunks and make those requests concurrently.
// The params are just loaded into the paramsCache.
tracechunks := [][]string{}
for len(traceids) > 0 {
if len(traceids) > CHUNK_SIZE {
tracechunks = append(tracechunks, traceids[:CHUNK_SIZE])
traceids = traceids[CHUNK_SIZE:]
} else {
tracechunks = append(tracechunks, traceids)
traceids = []string{}
errCh = make(chan error, len(tracechunks))
for _, chunk := range tracechunks {
go func(chunk []string) {
defer wg.Done()
req := &traceservice.GetParamsRequest{
Traceids: chunk,
resp, err := ts.traceService.GetParams(ctx, req)
if err != nil {
errCh <- fmt.Errorf("Failed to load params: %s", err)
for _, param := range resp.Params {
ts.paramsCache[param.Key] = param.Params
// See if any Go routine generated an error.
select {
case err, ok := <-errCh:
if ok {
return nil, nil, fmt.Errorf("Failed to load params: %s", err)
// Add all params from the cache.
for k, tr := range tile.Traces {
p := tr.Params()
for pk, pv := range ts.paramsCache[k] {
p[pk] = pv
// Rebuild the ParamSet.
sklog.Infof("Finished loading params. Starting to rebuild ParamSet.")
tiling.GetParamSet(tile.Traces, tile.ParamSet)
return tile, hash, nil
// ListMD5 returns the md5 hashes of the data stored for each commitid.
func (ts *TsDB) ListMD5(commitIDs []*CommitID) ([]string, error) {
ctx := context.Background()
req := &traceservice.ListMD5Request{
Commitid: make([]*traceservice.CommitID, len(commitIDs)),
for i, cid := range commitIDs {
req.Commitid[i] = tsCommitID(cid)
resp, err := ts.traceService.ListMD5(ctx, req)
if err != nil {
return nil, fmt.Errorf("Failed to load hashes: %s", err)
ret := make([]string, len(commitIDs))
for i, ci5 := range resp.Commitmd5 {
ret[i] = ci5.Md5
return ret, nil
// Close the underlying connection to the datastore.
func (ts *TsDB) Close() error {
return ts.conn.Close()
// NewTraceServiceDBFromAddress is given the address of the traceService
// implementation and returns an instance of the trace.DB
// (the higher level wrapper on top of trace service).
func NewTraceServiceDBFromAddress(traceServiceAddr string, traceBuilder tiling.TraceBuilder) (DB, error) {
if traceServiceAddr == "" {
return nil, fmt.Errorf("Did not get address for trace services.")
conn, err := grpc.Dial(traceServiceAddr, grpc.WithInsecure(), grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(MAX_MESSAGE_SIZE), grpc.MaxCallSendMsgSize(MAX_MESSAGE_SIZE)))
if err != nil {
return nil, fmt.Errorf("Unable to connnect to trace service at %s. Got error: %s", traceServiceAddr, err)
return NewTraceServiceDB(conn, traceBuilder)