// tilebuilder is an application that periodically loads new data from BigQuery
// for quick and easy consumption by the skiaperf UI.
//
// The algorithm
// -------------
//
// Start at level 0.
// Find the index of the last tile (0000.gob, 0001.gob, etc)
// nextTile = (last tile)+1 or 0 if no tiles were found.
// Find the last commit time from the last tile file, or use BEGINNING_OF_TIME if there were no tile files.
// Get a list of all commits >= the last commit time (exclude the last git hash).
//
// Start loading data from BigQuery, and every time you accumulate 32 (config.TILE_SIZE) points write out a new Tile file
// and increment nextTile.
//
// If you get to the end and have <= 32 points left, write it out as nextTile.gob; we will pick it up
// and try to fill it out the next time through the loop.
//
// Do the following for each level, starting with 1, and incrementing until no new tiles are generated for a level.
// {{
// Find the index of the last tile (0000.gob, 0001.gob, etc)
// nextTile = (last tile)+1 or 0 if no tiles were found.
//
// Loop over the 4 subtiles that should make up this new tile and sub-sample to 32 points.
// Make sure to pick non-missing points if possible. Also make sure to rollup commit messages
// for all four of the points that get consolidated.
//
// Write out as a new tile.
//
// Keep looping until you run out of tiles at the lower level.
// }}
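//
// For example, at scale 1 a single tile's 32 points summarize the commit
// range of 4 scale-0 tiles (4*32 = 128 commits), keeping roughly every 4th
// point and preferring non-missing points.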
//
package main

import (
"bytes"
"flag"
"fmt"
"net"
"net/http"
"sort"
"text/template"
"time"
)
import (
"code.google.com/p/goauth2/compute/serviceaccount"
"code.google.com/p/google-api-go-client/bigquery/v2"
"github.com/golang/glog"
"github.com/rcrowley/go-metrics"
)
import (
// TODO(jcgregorio) Move to skia.googlesource.com/...git... or something.
"auth"
"bqutil"
"config"
"db"
"filetilestore"
"types"
)
// flags
var (
tileDir = flag.String("tile_dir", "/tmp/tileStore", "What directory to look for tiles in.")
doOauth = flag.Bool("oauth", true, "Run through the OAuth 2.0 flow on startup, otherwise use a GCE service account.")
)
var (
// traceQueryTemplate is the BigQuery query for trace data, expanded with a TraceQuery.
traceQueryTemplate *template.Template
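// lastTileUpdate, timeSinceLastUpdate, and updateLatency track, per dataset,
// when tiles were last rebuilt and how long each rebuild took.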
lastTileUpdate = map[string]time.Time{}
timeSinceLastUpdate = map[string]metrics.Gauge{}
updateLatency = map[string]metrics.Timer{}
)
const (
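// SAMPLE_PERIOD is how often tiles are rebuilt from BigQuery.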
SAMPLE_PERIOD = 30 * time.Minute
)
// TraceQuery is the data to pass into traceQueryTemplate when expanding the template.
type TraceQuery struct {
TablePrefix string
Date string
DatasetPredicates string
GitHash string
Timestamp int64
}
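
// Init registers the per-dataset metrics, parses the BigQuery trace query
// template, and starts the runtime and Graphite metrics reporters.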
func Init() {
// Initialize the metrics.
for _, datasetName := range config.ALL_DATASET_NAMES {
name := string(datasetName)
lastTileUpdate[name] = time.Now()
timeSinceLastUpdate[name] = metrics.NewRegisteredGauge(fmt.Sprintf("build.%s.time_since_last_update", name), metrics.DefaultRegistry)
updateLatency[name] = metrics.NewRegisteredTimer(fmt.Sprintf("build.%s.latency", name), metrics.DefaultRegistry)
}
// Keep the timeSince* metrics up to date.
go func() {
for range time.Tick(time.Minute) {
for _, datasetName := range config.ALL_DATASET_NAMES {
name := string(datasetName)
timeSinceLastUpdate[name].Update(int64(time.Since(lastTileUpdate[name]).Seconds()))
}
}
}()
traceQueryTemplate = template.Must(template.New("traceQueryTemplate").Parse(`
SELECT
*
FROM
{{.TablePrefix}}{{.Date}}
WHERE
isTrybot=false
AND (gitHash = "{{.GitHash}}")
{{.DatasetPredicates}}
AND
timestamp >= {{.Timestamp}}
ORDER BY
key DESC,
timestamp DESC;
`))
metrics.RegisterRuntimeMemStats(metrics.DefaultRegistry)
go metrics.CaptureRuntimeMemStats(metrics.DefaultRegistry, 1*time.Minute)
addr, err := net.ResolveTCPAddr("tcp", "skia-monitoring-b:2003")
if err != nil {
glog.Errorf("Failed to resolve the Graphite metrics address: %s", err)
} else {
go metrics.Graphite(metrics.DefaultRegistry, 1*time.Minute, "tilepipeline", addr)
}
}
// startConditions returns the time from which queries should be made, the index
// of the next tile that needs to be written, and an error if any occurred.
func startConditions(store types.TileStore) (config.QuerySince, int, error) {
// startTime is the lower time bound for queries.
startTime := config.BEGINNING_OF_TIME
// nextTile is the index of the next tile to write.
nextTile := 0
tile, err := store.Get(0, -1)
if err != nil {
return startTime, 0, fmt.Errorf("Failed to read tile looking for start conditions: %s", err)
}
if tile != nil {
// We are always overwriting the last tile until it is full and we start on the next tile.
nextTile = tile.TileIndex
if tile.TileIndex > 0 {
tile, err := store.Get(0, tile.TileIndex-1)
if err != nil {
return startTime, 0, fmt.Errorf("Failed to read previous tile looking for start conditions: %s", err)
}
// Start querying from the timestamp of the last commit in the last full tile.
startTime = config.NewQuerySince(time.Unix(tile.Commits[len(tile.Commits)-1].CommitTime, 0))
glog.Infof("Picking from range: First: %v", time.Unix(tile.Commits[0].CommitTime, 0))
glog.Infof("Picking from range: Last: %v", time.Unix(tile.Commits[len(tile.Commits)-1].CommitTime, 0))
}
}
return startTime, nextTile, nil
}
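
// tablePrefixFromDatasetName returns the BigQuery table name prefix for the
// given dataset, defaulting to the SKP tables.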
func tablePrefixFromDatasetName(name config.DatasetName) string {
switch name {
case config.DATASET_SKP:
return "perf_skps_v2.skpbench"
case config.DATASET_MICRO:
return "perf_bench_v2.microbench"
}
return "perf_skps_v2.skpbench"
}
type CommitSlice []*types.Commit
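
// CommitSliceSortable implements sort.Interface to order Commits by
// CommitTime, oldest first.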
type CommitSliceSortable []*types.Commit
func (p CommitSliceSortable) Len() int { return len(p) }
func (p CommitSliceSortable) Less(i, j int) bool { return p[i].CommitTime < p[j].CommitTime }
func (p CommitSliceSortable) Swap(i, j int) { p[i], p[j] = p[j], p[i] }
// gitCommits returns all the Commits that have associated test data, going
// from now back to 'startTime'.
func gitCommits(service *bigquery.Service, datasetName config.DatasetName, startTime config.QuerySince) (map[string][]string, []*types.Commit, error) {
dateMap := make(map[string][]string)
allCommits := make([]*types.Commit, 0)
commitHashMap := make(map[string]bool)
commitHistory, err := db.ReadCommitsFromDB()
if err != nil {
return nil, nil, fmt.Errorf("gitCommits: Did not get the commit history from the database: %s", err)
}
queryTemplate := `
SELECT
gitHash, FIRST(timestamp) as timestamp, FIRST(gitNumber) as gitNumber
FROM
%s%s
GROUP BY
gitHash
ORDER BY
timestamp DESC;
`
// Loop over table going backward until we hit startTime.
glog.Info("gitCommits: starting.")
// historyIdx keeps the current index of commitHistory.
historyIdx := 0
totalCommits := 0
for dates := types.NewDateIter(); dates.Next() && (startTime.Date() <= dates.Date()); {
query := fmt.Sprintf(queryTemplate, tablePrefixFromDatasetName(datasetName), dates.Date())
iter, err := bqutil.NewRowIter(service, query)
if err != nil {
glog.Warningln("Tried to query a table that didn't exist", dates.Date(), err)
continue
}
gitHashesForDay := []string{}
for iter.Next() {
c := types.NewCommit()
if err := iter.Decode(c); err != nil {
return nil, allCommits, fmt.Errorf("Failed reading hashes from BigQuery: %s", err)
}
gitHashesForDay = append(gitHashesForDay, c.Hash)
// Scan commitHistory and populate commit info if available.
for ; historyIdx < len(commitHistory) && commitHistory[historyIdx].CommitTime >= c.CommitTime; historyIdx++ {
if commitHistory[historyIdx].Hash == c.Hash {
*c = *commitHistory[historyIdx]
} else if len(allCommits) > 0 {
// Append to the previous commit's TailCommits.
tailCommits := &allCommits[len(allCommits)-1].TailCommits
*tailCommits = append(*tailCommits, commitHistory[historyIdx])
// TODO(jcgregorio) Truncate the commit messages that go into the tail.
}
}
totalCommits++
// Data may show up for a commit across more than one day, so track whether a
// commit has already been added and only add it if new.
if _, ok := commitHashMap[c.Hash]; !ok {
commitHashMap[c.Hash] = true
allCommits = append(allCommits, c)
}
}
dateMap[dates.Date()] = gitHashesForDay
glog.Infof("Finding hashes with data, finished day %s, total commits so far %d", dates.Date(), totalCommits)
}
sort.Sort(CommitSliceSortable(allCommits))
for _, c := range allCommits {
glog.Infof("gitCommits: allcommits: %s %d\n", c.Hash, c.CommitTime)
}
return dateMap, allCommits, nil
}
// populateParamSet populates tile.ParamSet with the set of all possible
// values for each of the 'params' in the tile's Traces.
func populateParamSet(tile *types.Tile) {
// First pull the data out into a map of sets.
type ChoiceSet map[string]bool
c := make(map[string]ChoiceSet)
for _, t := range tile.Traces {
for k, v := range t.Params {
if set, ok := c[k]; !ok {
c[k] = make(map[string]bool)
c[k][v] = true
} else {
set[v] = true
}
}
}
// Now flatten the sets into []string and populate ParamsSet with that.
for k, v := range c {
allOptions := []string{}
for option := range v {
allOptions = append(allOptions, option)
}
tile.ParamSet[k] = allOptions
}
}
// populateTraces reads the data from BigQuery and populates the Traces.
//
// dates maps table date suffixes to the git hashes that have data on each of
// those days.
func populateTraces(tile *types.Tile, service *bigquery.Service, datasetName config.DatasetName, dates map[string][]string) error {
// Keep a map of key to Trace.
allTraces := map[string]*types.Trace{}
numSamples := len(tile.Commits)
earliestTimestamp := tile.Commits[0].CommitTime
// A mapping of Git hashes to where they appear in the Commits array, also
// the index at which a measurement gets stored in the Values array.
hashToIndex := make(map[string]int)
for i, commit := range tile.Commits {
hashToIndex[commit.Hash] = i
}
datasetPredicates := `
AND (
params.measurementType="gpu" OR
params.measurementType="wall"
)
`
if datasetName == config.DATASET_MICRO {
datasetPredicates = ""
}
// Query each table one day at a time. This protects us from schema changes
// and from trying to pull too much data from BigQuery at one time.
for date, gitHashes := range dates {
for _, gitHash := range gitHashes {
traceQueryParams := TraceQuery{
TablePrefix: tablePrefixFromDatasetName(datasetName),
Date: date,
DatasetPredicates: datasetPredicates,
GitHash: gitHash,
Timestamp: earliestTimestamp,
}
query := &bytes.Buffer{}
err := traceQueryTemplate.Execute(query, traceQueryParams)
if err != nil {
return fmt.Errorf("Failed to construct a query: %s", err)
}
glog.Infof("Query: %q", query)
iter, err := bqutil.NewRowIter(service, query.String())
if err != nil {
return fmt.Errorf("Failed to query data from BigQuery: %s", err)
}
var trace *types.Trace
currentKey := ""
for iter.Next() {
m := &struct {
Value float64 `bq:"value"`
Key string `bq:"key"`
Hash string `bq:"gitHash"`
}{}
if err := iter.Decode(m); err != nil {
return fmt.Errorf("Failed to decode Measurement from BigQuery: %s", err)
}
if m.Key != currentKey {
currentKey = m.Key
// If we haven't seen this key before, create a new Trace for it and store
// the Trace in allTraces.
if _, ok := allTraces[currentKey]; !ok {
trace = types.NewTrace(numSamples)
trace.Params = iter.DecodeParams()
trace.Key = currentKey
allTraces[currentKey] = trace
} else {
trace = allTraces[currentKey]
}
}
if index, ok := hashToIndex[m.Hash]; ok {
trace.Values[index] = m.Value
}
}
}
glog.Infof("Loading data, finished day %s", date)
}
// Flatten allTraces into Traces.
for _, trace := range allTraces {
tile.Traces = append(tile.Traces, trace)
}
return nil
}
// buildTile builds a Tile for the given set of commits.
//
// dates is the full set of days and hashes that appear in each day.
func buildTile(service *bigquery.Service, datasetName config.DatasetName, dates map[string][]string, commits []*types.Commit) (*types.Tile, error) {
tile := types.NewTile()
tile.Commits = commits
// We need to filter down 'dates' to just include the hashes that appear in 'commits'.
// First build a map of the hashes in 'commits'.
commitHashes := map[string]bool{}
for _, c := range commits {
commitHashes[c.Hash] = true
}
filteredDates := map[string][]string{}
for date, gitHashes := range dates {
filteredHashes := []string{}
for _, h := range gitHashes {
if _, ok := commitHashes[h]; ok {
filteredHashes = append(filteredHashes, h)
}
}
if len(filteredHashes) > 0 {
filteredDates[date] = filteredHashes
}
}
glog.Infof("Building tile from: %#v\n", filteredDates)
if err := populateTraces(tile, service, datasetName, filteredDates); err != nil {
return nil, fmt.Errorf("Failed to read traces from BigQuery: %s", err)
}
glog.Info("Successfully read traces from BigQuery")
populateParamSet(tile)
return tile, nil
}
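
// updateAllTileSets rebuilds the scale 0 tiles for every dataset, resuming at
// the last incomplete tile and writing a new tile for each TILE_SIZE batch of
// commits.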
func updateAllTileSets(service *bigquery.Service) {
glog.Infof("Starting to update all tile sets.")
for _, datasetName := range config.ALL_DATASET_NAMES {
glog.Infof("Starting to update tileset %s.", string(datasetName))
begin := time.Now()
store := filetilestore.NewFileTileStore(*tileDir, string(datasetName))
startTime, nextTile, err := startConditions(store)
if err != nil {
glog.Errorf("Failed to compute start conditions for dataset %s: %s", string(datasetName), err)
continue
}
glog.Infoln("Found startTime", startTime.SqlTsColumn(), "nextTile", nextTile)
glog.Infoln("Getting commits")
dates, commits, err := gitCommits(service, datasetName, startTime)
if err != nil {
glog.Errorf("Failed to read commits for dataset %s: %s", string(datasetName), err)
continue
}
glog.Infoln("Found commits", commits)
glog.Infof("Found %d new commits across %d days for dataset %s", len(commits), len(dates), string(datasetName))
for i := 0; i < len(commits); i += config.TILE_SIZE {
end := i + config.TILE_SIZE
if end > len(commits) {
end = len(commits)
}
tile, err := buildTile(service, datasetName, dates, commits[i:end])
if err != nil {
glog.Errorf("Failed to build tile: %d scale: 0: %s\n", nextTile, err)
break
}
tile.Scale = 0
tile.TileIndex = nextTile
if err := store.Put(0, nextTile, tile); err != nil {
glog.Errorf("Failed to write tile %d for dataset %s: %s", nextTile, string(datasetName), err)
break
}
glog.Infof("Wrote tile: %d scale: %d\n", tile.TileIndex, tile.Scale)
nextTile++
}
// TODO(jcgregorio) Now write out new tiles for scales 1,2,etc. Also make
// sure to merge intermediate commits, but summarize the commit message.
name := string(datasetName)
lastTileUpdate[name] = time.Now()
d := time.Since(begin)
updateLatency[name].Update(d)
glog.Infof("Finished loading Tile data for dataset %s from BigQuery in %f s", name, d.Seconds())
}
}
func main() {
flag.Parse()
Init()
db.Init()
var err error
var client *http.Client
if *doOauth {
client, err = auth.RunFlow()
if err != nil {
glog.Fatalf("Failed to auth: %s", err)
}
} else {
client, err = serviceaccount.NewClient(nil)
if err != nil {
glog.Fatalf("Failed to auth using a service account: %s", err)
}
}
service, err := bigquery.New(client)
if err != nil {
glog.Fatalf("Failed to create a new BigQuery service object: %s", err)
}
updateAllTileSets(service)
for range time.Tick(SAMPLE_PERIOD) {
updateAllTileSets(service)
}
}