blob: 1d3b2677469bc8e07c198cbd59ae998cf6445445 [file] [log] [blame]
package dataframe
import (
	"context"
	"crypto/md5"
	"errors"
	"fmt"
	"net/url"
	"sort"
	"strconv"
	"strings"
	"sync"
	"time"

	// NOTE(review): the project-local imports used below (calc, gitinfo, human,
	// paramtools, query, shortcut2, sklog, types, vec32) are missing from this
	// listing; restore them here with their real import paths.
)
// ProcessState is the state reported for a FrameRequestProcess.
type ProcessState string

// RequestType selects how a FrameRequest picks its commits: by time range
// (Begin..End) or by a number of commits before End.
type RequestType int

// Possible values of ProcessState.
const (
	PROCESS_RUNNING ProcessState = "Running"
	PROCESS_SUCCESS ProcessState = "Success"
	PROCESS_ERROR   ProcessState = "Error"
)

const (
	// Values for FrameRequest.RequestType.
	//
	// NOTE(review): the names come from their uses in this file, but the
	// numeric values are assumed (they are part of the JSON wire format) --
	// confirm against the canonical source.
	REQUEST_TIME_RANGE RequestType = 0
	REQUEST_COMPACT    RequestType = 1
)

const (
	// MAX_FINISHED_PROCESS_AGE is the amount of time to keep a finished
	// FrameRequestProcess around before deleting it.
	//
	// NOTE(review): the value is assumed -- confirm against the canonical
	// source.
	MAX_FINISHED_PROCESS_AGE = 10 * time.Minute
)

var (
	// errorNotFound is returned when no process exists for a given id.
	errorNotFound = errors.New("Process not found.")
)
// FrameRequest is used to deserialize JSON frame requests.
type FrameRequest struct {
Begin int `json:"begin"` // Beginning of time range in Unix timestamp seconds.
End int `json:"end"` // End of time range in Unix timestamp seconds.
Formulas []string `json:"formulas"` // The Formulae to evaluate.
Queries []string `json:"queries"` // The queries to perform encoded as a URL query.
Hidden []string `json:"hidden"` // The ids of traces to remove from the response.
Keys string `json:"keys"` // The id of a list of keys stored via shortcut2.
TZ string `json:"tz"` // The timezone the request is from.
NumCommits int32 `json:"num_commits"` // If RequestType is REQUEST_COMPACT, then the number of commits to show before End, and Begin is ignored.
RequestType RequestType `json:"request_type"`
func (f *FrameRequest) Id() string {
return fmt.Sprintf("%x", md5.Sum([]byte(fmt.Sprintf("%#v", *f))))
// FrameResponse is serialized to JSON as the response to frame requests.
type FrameResponse struct {
DataFrame *DataFrame `json:"dataframe"`
Ticks []interface{} `json:"ticks"`
Skps []int `json:"skps"`
Msg string `json:"msg"`
// FrameRequestProcess keeps track of a running Go routine that's
// processing a FrameRequest to build a FrameResponse.
type FrameRequestProcess struct {
// request is read-only, it should not be modified.
request *FrameRequest
// git is for Git info. The value of the 'git' variable should not be
// changed, but git is Go routine safe.
git *gitinfo.GitInfo
// dfBuilder builds DataFrame's.
dfBuilder DataFrameBuilder
ctx context.Context
mutex sync.RWMutex // Protects access to the remaining struct members.
response *FrameResponse
lastUpdate time.Time // The last time this process was updated.
state ProcessState // The current state of the process.
message string // A longer message if the process fails.
search int // The current search (either Formula or Query) being processed.
totalSearches int // The total number of Formulas and Queries in the FrameRequest.
percent float32 // The percentage of the searches complete [0.0-1.0].
func (fr *RunningFrameRequests) newProcess(ctx context.Context, req *FrameRequest) *FrameRequestProcess {
numKeys := 0
if req.Keys != "" {
numKeys = 1
ret := &FrameRequestProcess{
git: fr.git,
request: req,
lastUpdate: time.Now(),
totalSearches: len(req.Formulas) + len(req.Queries) + numKeys,
dfBuilder: fr.dfBuilder,
ctx: ctx,
go ret.Run(ctx)
return ret
// RunningFrameRequests keeps track of all the FrameRequestProcess's.
// Once a FrameRequestProcess is complete the results will be kept in memory
// for MAX_FINISHED_PROCESS_AGE before being deleted.
type RunningFrameRequests struct {
mutex sync.Mutex
git *gitinfo.GitInfo
dfBuilder DataFrameBuilder
// inProcess maps a FrameRequest.Id() of the request to the FrameRequestProcess
// handling that request.
inProcess map[string]*FrameRequestProcess
func NewRunningFrameRequests(git *gitinfo.GitInfo, dfBuilder DataFrameBuilder) *RunningFrameRequests {
fr := &RunningFrameRequests{
git: git,
dfBuilder: dfBuilder,
inProcess: map[string]*FrameRequestProcess{},
go fr.background()
return fr
// step does a single step in cleaning up old FrameRequestProcess's.
func (fr *RunningFrameRequests) step() {
defer fr.mutex.Unlock()
now := time.Now()
for k, v := range fr.inProcess {
if now.Sub(v.lastUpdate) > MAX_FINISHED_PROCESS_AGE {
delete(fr.inProcess, k)
// background periodically cleans up old FrameRequestProcess's.
func (fr *RunningFrameRequests) background() {
for range time.Tick(time.Minute) {
// Add starts a new running FrameRequestProcess and returns
// the ID of the process to be used in calls to Status() and
// Response().
func (fr *RunningFrameRequests) Add(ctx context.Context, req *FrameRequest) string {
defer fr.mutex.Unlock()
id := req.Id()
if _, ok := fr.inProcess[id]; !ok {
fr.inProcess[id] = fr.newProcess(ctx, req)
return id
// Status returns the ProcessingState, the message, and the percent complete of
// a FrameRequestProcess of the given 'id'.
func (fr *RunningFrameRequests) Status(id string) (ProcessState, string, float32, error) {
defer fr.mutex.Unlock()
if p, ok := fr.inProcess[id]; !ok {
return PROCESS_ERROR, "", 0.0, errorNotFound
} else {
return p.Status()
// Response returns the FrameResponse of the completed FrameRequestProcess.
func (fr *RunningFrameRequests) Response(id string) (*FrameResponse, error) {
defer fr.mutex.Unlock()
if p, ok := fr.inProcess[id]; !ok {
return nil, errorNotFound
} else {
return p.Response(), nil
// reportError records the reason a FrameRequestProcess failed.
func (p *FrameRequestProcess) reportError(err error, message string) {
defer p.mutex.Unlock()
sklog.Errorf("FrameRequest failed: %#v %s: %s", *(p.request), message, err)
p.message = message
p.lastUpdate = time.Now()
// progress records the progress of a FrameRequestProcess.
func (p *FrameRequestProcess) progress(step, totalSteps int) {
defer p.mutex.Unlock()
p.percent = (float32( + (float32(step) / float32(totalSteps))) / float32(p.totalSearches)
p.lastUpdate = time.Now()
// searchInc records the progress of a FrameRequestProcess as it completes each
// Query or Formula.
func (p *FrameRequestProcess) searchInc() {
defer p.mutex.Unlock() += 1
// Response returns the FrameResponse of the completed FrameRequestProcess.
func (p *FrameRequestProcess) Response() *FrameResponse {
defer p.mutex.Unlock()
return p.response
// Status returns the ProcessingState, the message, and the percent complete of
// a FrameRequestProcess of the given 'id'.
func (p *FrameRequestProcess) Status() (ProcessState, string, float32, error) {
defer p.mutex.Unlock()
return p.state, p.message, p.percent, nil
// Run does the work in a FrameRequestProcess. It does not return until all the
// work is done or the request failed. Should be run as a Go routine.
func (p *FrameRequestProcess) Run(ctx context.Context) {
begin := time.Unix(int64(p.request.Begin), 0)
end := time.Unix(int64(p.request.End), 0)
// Results from all the queries and calcs will be accumulated in this dataframe.
df := NewEmpty()
// Queries.
for _, q := range p.request.Queries {
newDF, err := p.doSearch(q, begin, end)
if err != nil {
p.reportError(err, "Failed to complete query.")
df = Join(df, newDF)
// Formulas.
for _, formula := range p.request.Formulas {
newDF, err := p.doCalc(formula, begin, end)
if err != nil {
p.reportError(err, "Failed to complete query.")
df = Join(df, newDF)
// Keys
if p.request.Keys != "" {
newDF, err := p.doKeys(p.request.Keys, begin, end)
if err != nil {
p.reportError(err, "Failed to complete query.")
df = Join(df, newDF)
// Filter out "Hidden" traces.
for _, key := range p.request.Hidden {
delete(df.TraceSet, key)
if len(df.Header) == 0 {
df = NewHeaderOnly(p.git, begin, end, true)
resp, err := ResponseFromDataFrame(p.ctx, df, p.git, true, p.request.TZ)
if err != nil {
p.reportError(err, "Failed to get ticks or skps.")
defer p.mutex.Unlock()
p.response = resp
// getCommitTimesForFile returns a slice of Unix timestamps in seconds that are
// the times that the given file changed in git between the given 'begin' and
// 'end' hashes (inclusive).
func getCommitTimesForFile(ctx context.Context, begin, end string, filename string, git *gitinfo.GitInfo) []int64 {
ret := []int64{}
// Now query for all the changes to the skp version over the given range of commits.
log, err := git.LogFine(ctx, begin+"^", end, "--format=format:%ct", "--", filename)
if err != nil {
sklog.Errorf("Could not get skp log for %s..%s -- %q: %s", begin, end, filename, err)
return ret
// Parse.
for _, s := range strings.Split(log, "\n") {
i, err := strconv.ParseInt(s, 10, 64)
if err != nil {
ret = append(ret, int64(i))
return ret
// getSkps returns the indices where the SKPs have been updated given
// the ColumnHeaders.
func getSkps(ctx context.Context, headers []*ColumnHeader, git *gitinfo.GitInfo) ([]int, error) {
// We have Offsets, which need to be converted to git hashes.
ci, err := git.ByIndex(ctx, int(headers[0].Offset))
if err != nil {
return nil, fmt.Errorf("Could not find commit for index %d: %s", headers[0].Offset, err)
begin := ci.Hash
ci, err = git.ByIndex(ctx, int(headers[len(headers)-1].Offset))
if err != nil {
return nil, fmt.Errorf("Could not find commit for index %d: %s", headers[len(headers)-1].Offset, err)
end := ci.Hash
// Now query for all the changes to the skp version over the given range of commits.
ts := getCommitTimesForFile(ctx, begin, end, "infra/bots/assets/skp/VERSION", git)
// Add in the changes to the old skp version over the given range of commits.
ts = append(ts, getCommitTimesForFile(ctx, begin, end, "SKP_VERSION", git)...)
// Sort because they are in reverse order.
// Now flag all the columns where the skp changes.
ret := []int{}
for i, h := range headers {
if len(ts) == 0 {
if h.Timestamp >= ts[0] {
ret = append(ret, i)
ts = ts[1:]
if len(ts) == 0 {
// Coalesce all skp updates for a col into a single index.
for len(ts) > 0 && h.Timestamp >= ts[0] {
ts = ts[1:]
return ret, nil
// ResponseFromDataFrame fills out the rest of a FrameResponse for the given DataFrame.
// If truncate is true then the number of traces returned is limited.
// tz is the timezone, and can be the empty string if the default (Eastern) timezone is acceptable.
func ResponseFromDataFrame(ctx context.Context, df *DataFrame, git *gitinfo.GitInfo, truncate bool, tz string) (*FrameResponse, error) {
if len(df.Header) == 0 {
return nil, fmt.Errorf("No commits matched that time range.")
// Calculate the human ticks based on the column headers.
ts := []int64{}
for _, c := range df.Header {
ts = append(ts, c.Timestamp)
ticks := human.FlotTickMarks(ts, tz)
// Determine where SKP changes occurred.
skps, err := getSkps(ctx, df.Header, git)
if err != nil {
return nil, fmt.Errorf("Failed to load skps: %s", err)
// Truncate the result if it's too large.
msg := ""
if truncate && len(df.TraceSet) > MAX_TRACES_IN_RESPONSE {
msg = fmt.Sprintf("Response too large, the number of traces returned has been truncated from %d to %d.", len(df.TraceSet), MAX_TRACES_IN_RESPONSE)
keys := []string{}
for k := range df.TraceSet {
keys = append(keys, k)
newTraceSet := types.TraceSet{}
for _, key := range keys {
newTraceSet[key] = df.TraceSet[key]
df.TraceSet = newTraceSet
return &FrameResponse{
DataFrame: df,
Ticks: ticks,
Skps: skps,
Msg: msg,
}, nil
// doSearch applies the given query and returns a dataframe that matches the
// given time range [begin, end) in a DataFrame.
func (p *FrameRequestProcess) doSearch(queryStr string, begin, end time.Time) (*DataFrame, error) {
urlValues, err := url.ParseQuery(queryStr)
if err != nil {
return nil, fmt.Errorf("Failed to parse query: %s", err)
q, err := query.New(urlValues)
if err != nil {
return nil, fmt.Errorf("Invalid Query: %s", err)
if p.request.RequestType == REQUEST_TIME_RANGE {
return p.dfBuilder.NewFromQueryAndRange(begin, end, q, true, p.progress)
} else {
return p.dfBuilder.NewNFromQuery(p.ctx, end, q, p.request.NumCommits, p.progress)
// doKeys returns a DataFrame that matches the given set of keys given
// the time range [begin, end).
func (p *FrameRequestProcess) doKeys(keyID string, begin, end time.Time) (*DataFrame, error) {
keys, err := shortcut2.Get(keyID)
if err != nil {
return nil, fmt.Errorf("Failed to find that set of keys %q: %s", keyID, err)
if p.request.RequestType == REQUEST_TIME_RANGE {
return p.dfBuilder.NewFromKeysAndRange(keys.Keys, begin, end, true, p.progress)
} else {
return p.dfBuilder.NewNFromKeys(p.ctx, end, keys.Keys, p.request.NumCommits, p.progress)
// doCalc applies the given formula and returns a dataframe that matches the
// given time range [begin, end) in a DataFrame.
func (p *FrameRequestProcess) doCalc(formula string, begin, end time.Time) (*DataFrame, error) {
// During the calculation 'rowsFromQuery' will be called to load up data, we
// will capture the dataframe that's created at that time. We only really
// need df.Headers so it doesn't matter if the calculation has multiple calls
// to filter(), we can just use the last one returned.
var df *DataFrame
rowsFromQuery := func(s string) (calc.Rows, error) {
urlValues, err := url.ParseQuery(s)
if err != nil {
return nil, err
q, err := query.New(urlValues)
if err != nil {
return nil, err
if p.request.RequestType == REQUEST_TIME_RANGE {
df, err = p.dfBuilder.NewFromQueryAndRange(begin, end, q, true, p.progress)
} else {
df, err = p.dfBuilder.NewNFromQuery(p.ctx, end, q, p.request.NumCommits, p.progress)
if err != nil {
return nil, err
// DataFrames are float32, but calc does its work in float64.
rows := calc.Rows{}
for k, v := range df.TraceSet {
rows[k] = vec32.Dup(v)
return rows, nil
rowsFromShortcut := func(s string) (calc.Rows, error) {
keys, err := shortcut2.Get(s)
if err != nil {
return nil, err
if p.request.RequestType == REQUEST_TIME_RANGE {
df, err = p.dfBuilder.NewFromKeysAndRange(keys.Keys, begin, end, true, p.progress)
} else {
df, err = p.dfBuilder.NewNFromKeys(p.ctx, end, keys.Keys, p.request.NumCommits, p.progress)
if err != nil {
return nil, err
// DataFrames are float32, but calc does its work in float64.
rows := calc.Rows{}
for k, v := range df.TraceSet {
rows[k] = vec32.Dup(v)
return rows, nil
ctx := calc.NewContext(rowsFromQuery, rowsFromShortcut)
rows, err := ctx.Eval(formula)
if err != nil {
return nil, fmt.Errorf("Calculation failed: %s", err)
// Convert the Rows from float64 to float32 for DataFrame.
ts := types.TraceSet{}
for k, v := range rows {
ts[k] = v
df.TraceSet = ts
// Clear the paramset since we are returning calculated values.
df.ParamSet = paramtools.ParamSet{}
return df, nil