// Copyright (c) 2014 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be found
// in the LICENSE file.
package main
import (
"encoding/json"
"fmt"
"io"
"net/http"
"os/exec"
"reflect"
"strconv"
"strings"
"time"
)
import (
"code.google.com/p/goauth2/compute/serviceaccount"
"code.google.com/p/goauth2/oauth"
"code.google.com/p/google-api-go-client/bigquery/v2"
"github.com/oxtoacart/webbrowser"
)
const (
// JSON doesn't support NaN or +/- Inf, so we need a valid float
// to signal missing data that also has a compact JSON representation.
MISSING_DATA_SENTINEL = 1e100
)
// Shouldn't need auth when running from GCE, but will need it for local dev.
// TODO(jcgregorio) Move to reading this from client_secrets.json and void these keys at that point.
var config = &oauth.Config{
ClientId: "470362608618-nlbqngfl87f4b3mhqqe9ojgaoe11vrld.apps.googleusercontent.com",
ClientSecret: "J4YCkfMXFJISGyuBuVEiH60T",
Scope: bigquery.BigqueryScope,
AuthURL: "https://accounts.google.com/o/oauth2/auth",
TokenURL: "https://accounts.google.com/o/oauth2/token",
RedirectURL: "urn:ietf:wg:oauth:2.0:oob",
TokenCache: oauth.CacheFile("bqtoken.data"),
}
// runFlow runs through a 3LO OAuth 2.0 flow to get credentials for BigQuery.
func runFlow(config *oauth.Config) (*http.Client, error) {
transport := &oauth.Transport{Config: config}
if _, err := config.TokenCache.Token(); err != nil {
url := config.AuthCodeURL("")
fmt.Printf(`Your browser has been opened to visit:
%s
Enter the verification code:`, url)
webbrowser.Open(url)
var code string
fmt.Scan(&code)
if _, err := transport.Exchange(code); err != nil {
return nil, err
}
}
return transport.Client(), nil
}
// Trace represents all the values of a single measurement over time.
type Trace struct {
Key string `json:"key"`
Values []float64 `json:"values"`
Params map[string]string `json:"params"`
Trybot bool `json:"trybot"`
}
// NewTrace allocates a new Trace set up for the given number of samples.
//
// The Trace Values are pre-filled in with the missing data sentinel since not
// all tests will be run on all commits.
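//
// For example:
//
//   t := NewTrace(2)
//   // t.Values == []float64{MISSING_DATA_SENTINEL, MISSING_DATA_SENTINEL}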
func NewTrace(numSamples int) *Trace {
t := &Trace{
Values: make([]float64, numSamples),
Params: make(map[string]string),
Trybot: false,
}
for i := range t.Values {
t.Values[i] = MISSING_DATA_SENTINEL
}
return t
}
// Annotations for commits.
//
// Will map to the table of annotation notes in MySQL. See DESIGN.md
// for the MySQL schema.
type Annotation struct {
ID int `json:"id"`
Notes string `json:"notes"`
Author string `json:"author"`
Type int `json:"type"`
}
// Commit is information about each Git commit.
type Commit struct {
CommitTime time.Time `json:"commit_time"`
Hash string `json:"hash"`
GitNumber int `json:"git_number"`
CommitMessage string `json:"commit_msg"`
Annotations []Annotation `json:"annotations,omitempty"`
}
// Choices is a list of possible values for a param. See AllData.
type Choices []string
// AllData is the top level struct we return via JSON to the UI.
//
// The Commits array has the same length as the Values array in each of the
// Traces.
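//
// A sketch of the serialized JSON shape (values illustrative):
//
//   {
//     "traces":    [{"key": "...", "values": [1.5, 1e100], "params": {"gpu": "GTX660"}, "trybot": false}],
//     "param_set": {"gpu": ["GTX660", "GTX550Ti"]},
//     "commits":   [{"commit_time": "...", "hash": "...", "git_number": 0, "commit_msg": "..."}]
//   }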
type AllData struct {
Traces []Trace `json:"traces"`
ParamSet map[string]Choices `json:"param_set"`
Commits []Commit `json:"commits"`
}
// gitCommitsWithTestData returns the list of commits that have perf data
// associated with them.
//
// Not all commits will have perf data; the builders don't necessarily run for
// each commit.
func gitCommitsWithTestData(service *bigquery.Service) (map[string]bool, error) {
query := `
SELECT
gitHash
FROM
(TABLE_DATE_RANGE(perf_skps_v2.skpbench,
DATE_ADD(CURRENT_TIMESTAMP(),
-2,
'DAY'),
CURRENT_TIMESTAMP()))
GROUP BY
gitHash;
`
iter, err := NewRowIter(service, query)
if err != nil {
return nil, fmt.Errorf("Failed to query for the Git hashes used: %s", err)
}
hashes := make(map[string]bool)
for iter.Next() {
h := &struct {
Hash string `bq:"gitHash"`
}{}
err := iter.Decode(h)
if err != nil {
return nil, fmt.Errorf("Failed reading hashes from BigQuery: %s", err)
}
hashes[h.Hash] = true
}
return hashes, nil
}
// GitHash represents information on a single Git commit.
type GitHash struct {
Hash string
TimeStamp time.Time
}
// readCommitsFromGit reads the commit history from a Git repository.
func readCommitsFromGit(dir string) ([]GitHash, error) {
cmd := exec.Command("git", "log", "--format=%H%x20%ci")
cmd.Dir = dir
b, err := cmd.Output()
if err != nil {
return nil, fmt.Errorf("Failed to run Git: %s", err)
}
lines := strings.Split(string(b), "\n")
hashes := make([]GitHash, 0, len(lines))
for _, line := range lines {
parts := strings.SplitN(line, " ", 2)
if len(parts) == 2 {
t, err := time.Parse("2006-01-02 15:04:05 -0700", parts[1])
if err != nil {
return nil, fmt.Errorf("Failed parsing Git log timestamp: %s", err)
}
hashes = append(hashes, GitHash{Hash: parts[0], TimeStamp: t})
}
}
return hashes, nil
}
// RowIter is a utility for reading data from a BigQuery query response.
//
// RowIter will iterate over all the results, even if they span more than one
// page of results. It automatically uses page tokens to iterate over all the
// pages to retrieve all results.
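//
// A typical use, mirroring populateTraces below, where Measurement is any
// struct with `bq` field tags:
//
//   iter, err := NewRowIter(service, query)
//   if err != nil {
//     return err
//   }
//   for iter.Next() {
//     m := &Measurement{}
//     if err := iter.Decode(m); err != nil {
//       return err
//     }
//     // Use m here.
//   }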
type RowIter struct {
response *bigquery.GetQueryResultsResponse
jobId string
service *bigquery.Service
nextPageToken string
row int
}
// poll waits until the query job is complete and then stores the current page
// of results in r.response.
func (r *RowIter) poll() error {
var queryResponse *bigquery.GetQueryResultsResponse
for {
var err error
queryCall := r.service.Jobs.GetQueryResults("google.com:chrome-skia", r.jobId)
if r.nextPageToken != "" {
queryCall.PageToken(r.nextPageToken)
}
queryResponse, err = queryCall.Do()
if err != nil {
return err
}
if queryResponse.JobComplete {
break
}
time.Sleep(time.Second)
}
r.nextPageToken = queryResponse.PageToken
r.response = queryResponse
return nil
}
// NewRowIter starts a query and returns a RowIter for iterating through the
// results.
func NewRowIter(service *bigquery.Service, query string) (*RowIter, error) {
job := &bigquery.Job{
Configuration: &bigquery.JobConfiguration{
Query: &bigquery.JobConfigurationQuery{
Query: query,
},
},
}
jobResponse, err := service.Jobs.Insert("google.com:chrome-skia", job).Do()
if err != nil {
return nil, err
}
r := &RowIter{
jobId: jobResponse.JobReference.JobId,
service: service,
row: -1, // Start at -1 so the first call to Next() puts us at the 0th Row.
}
return r, r.poll()
}
// Next moves to the next row in the response and returns true as long as data
// is available, returning false when the end of the results is reached.
//
// Calling Next() the first time actually points the iterator at the first row,
// which makes it possible to use Next in a for loop:
//
// for iter.Next() { ... }
//
func (r *RowIter) Next() bool {
r.row++
if r.row >= len(r.response.Rows) {
if r.nextPageToken != "" {
// If polling for the next page fails, there is nothing more to iterate over.
if err := r.poll(); err != nil {
return false
}
r.row = 0
return len(r.response.Rows) > 0
} else {
return false
}
}
return true
}
// DecodeParams pulls all the values in the params record out as a map[string]string.
//
// The schema for each table has a nested record called 'params' that contains
// various axes along which queries could be built, such as the gpu the test was
// run against. Pull out the entire record as a generic map[string]string.
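//
// For example, a row with columns params_gpu and params_os (values
// illustrative) decodes to:
//
//   map[string]string{"gpu": "GTX660", "os": "Ubuntu12"}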
func (r *RowIter) DecodeParams() map[string]string {
row := r.response.Rows[r.row]
schema := r.response.Schema
params := map[string]string{}
for i, cell := range row.F {
if cell.V != nil {
name := schema.Fields[i].Name
if strings.HasPrefix(name, "params_") {
params[strings.TrimPrefix(name, "params_")] = cell.V.(string)
}
}
}
return params
}
// Decode uses struct tags to decode a single row into a struct.
//
// For example, given a struct:
//
// type A struct {
// Name string `bq:"name"`
// Value float64 `bq:"measurement"`
// }
//
// And a BigQuery table that contained two columns named "name" and
// "measurement". Then calling Decode as follows would parse the column values
// for "name" and "measurement" and place them in the Name and Value fields
// respectively.
//
// a = &A{}
// iter.Decode(a)
//
// Implementation Details:
//
// If a tag names a column that doesn't exist, the field is merely ignored,
// i.e. it is left unchanged from when it was passed into Decode.
//
// Not all columns need to be tagged in the struct.
//
// The decoder doesn't handle nested structs, only the top level fields are decoded.
//
// The decoder only handles struct fields of type string, int, int32, int64,
// float32 and float64.
func (r *RowIter) Decode(s interface{}) error {
row := r.response.Rows[r.row]
schema := r.response.Schema
// Collapse the data in the row into a map[string]string.
rowMap := map[string]string{}
for i, cell := range row.F {
if cell.V != nil {
rowMap[schema.Fields[i].Name] = cell.V.(string)
}
}
// Then iterate over the fields of 's' and set them from the row data.
sv := reflect.ValueOf(s).Elem()
st := sv.Type()
for i := 0; i < sv.NumField(); i++ {
columnName := st.Field(i).Tag.Get("bq")
if columnValue, ok := rowMap[columnName]; ok {
switch sv.Field(i).Kind() {
case reflect.String:
sv.Field(i).SetString(columnValue)
case reflect.Float32, reflect.Float64:
f, err := strconv.ParseFloat(columnValue, 64)
if err != nil {
return err
}
sv.Field(i).SetFloat(f)
case reflect.Int, reflect.Int32, reflect.Int64:
parsedInt, err := strconv.ParseInt(columnValue, 10, 64)
if err != nil {
return err
}
sv.Field(i).SetInt(parsedInt)
default:
return fmt.Errorf("can't decode into field of type: %s", sv.Field(i).Kind())
}
}
}
return nil
}
// populateTraces reads the measurement data from BigQuery and populates the Traces.
func populateTraces(service *bigquery.Service, all *AllData, hashToIndex map[string]int, numSamples int) error {
type Measurement struct {
Value float64 `bq:"value"`
Key string `bq:"key"`
Hash string `bq:"gitHash"`
}
// Now query the actual samples.
query := `
SELECT
*
FROM
(TABLE_DATE_RANGE(perf_skps_v2.skpbench,
DATE_ADD(CURRENT_TIMESTAMP(),
-2,
'DAY'),
CURRENT_TIMESTAMP()))
WHERE
params.benchName="tabl_worldjournal.skp"
OR
params.benchName="desk_amazon.skp"
ORDER BY
key DESC,
timestamp DESC;
`
iter, err := NewRowIter(service, query)
if err != nil {
return fmt.Errorf("Failed to query data from BigQuery: %s", err)
}
var trace *Trace
currentKey := ""
for iter.Next() {
m := &Measurement{}
if err := iter.Decode(m); err != nil {
return fmt.Errorf("Failed to decode Measurement from BigQuery: %s", err)
}
if m.Key != currentKey {
if trace != nil {
all.Traces = append(all.Traces, *trace)
}
currentKey = m.Key
trace = NewTrace(numSamples)
trace.Params = iter.DecodeParams()
trace.Key = m.Key
}
if index, ok := hashToIndex[m.Hash]; ok {
trace.Values[index] = m.Value
}
}
// The query may return no rows at all, in which case trace is still nil.
if trace != nil {
all.Traces = append(all.Traces, *trace)
}
return nil
}
// Data is the full set of traces for the last N days all parsed into structs.
type Data struct {
all *AllData
}
// AsJSON serializes the data as JSON.
func (d *Data) AsJSON(w io.Writer) error {
// TODO(jcgregorio) Keep a cache of the gzipped JSON around and serve that as long as it's fresh.
return json.NewEncoder(w).Encode(d.all)
}
// populateParamSet returns the set of all possible values for all the 'params'
// in AllData.
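//
// For example (param values illustrative), traces with Params of
// {"gpu": "GTX660"} and {"gpu": "GTX550Ti", "os": "Ubuntu12"} yield:
//
//   all.ParamSet == map[string]Choices{
//     "gpu": {"GTX660", "GTX550Ti"}, // order not guaranteed
//     "os":  {"Ubuntu12"},
//   }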
func populateParamSet(all *AllData) {
// First pull the data out into a map of sets.
type ChoiceSet map[string]bool
c := make(map[string]ChoiceSet)
for _, t := range all.Traces {
for k, v := range t.Params {
if set, ok := c[k]; !ok {
c[k] = make(map[string]bool)
c[k][v] = true
} else {
set[v] = true
}
}
}
// Now flatten the sets into []string and populate all.ParamSet with that.
for k, v := range c {
allOptions := []string{}
for option := range v {
allOptions = append(allOptions, option)
}
all.ParamSet[k] = allOptions
}
}
// NewData loads the data the first time and then starts a goroutine to
// periodically refresh the data.
//
// TODO(jcgregorio) Actually do the bit where we start a goroutine.
func NewData(doOauth bool, gitRepoDir string) (*Data, error) {
var err error
var client *http.Client
if doOauth {
client, err = runFlow(config)
if err != nil {
return nil, fmt.Errorf("Failed to auth: %s", err)
}
} else {
client, err = serviceaccount.NewClient(nil)
if err != nil {
return nil, fmt.Errorf("Failed to auth using a service account: %s", err)
}
}
service, err := bigquery.New(client)
if err != nil {
return nil, fmt.Errorf("Failed to create a new BigQuery service object: %s", err)
}
// First query and get the list of hashes we are interested in and use that
// and the git log results to fill in the Commits.
allGitHashes, err := readCommitsFromGit(gitRepoDir)
if err != nil {
return nil, fmt.Errorf("Failed to read hashes from Git log: %s", err)
}
hashesTested, err := gitCommitsWithTestData(service)
if err != nil {
return nil, fmt.Errorf("Failed to read hashes from BigQuery: %s", err)
}
// Walk the git log from oldest to newest, keeping only the commits that have perf data.
commits := make([]Commit, 0, len(hashesTested))
for i := len(allGitHashes) - 1; i >= 0; i-- {
h := allGitHashes[i]
if _, ok := hashesTested[h.Hash]; ok {
commits = append(commits, Commit{Hash: h.Hash, CommitTime: h.TimeStamp})
}
}
// The number of samples that appear in each trace.
numSamples := len(commits)
// A mapping of Git hashes to where they appear in the Commits array, also the index
// at which a measurement gets stored in the Values array.
hashToIndex := make(map[string]int)
for i, commit := range commits {
hashToIndex[commit.Hash] = i
}
all := &AllData{
Traces: make([]Trace, 0),
ParamSet: make(map[string]Choices),
Commits: commits,
}
if err := populateTraces(service, all, hashToIndex, numSamples); err != nil {
// Fail fast, monit will restart us if we fail for some reason.
panic(err)
}
populateParamSet(all)
return &Data{all: all}, nil
}