blob: 69dbd51511bb04aaa5b88164ba1af1941e26cfa5 [file] [log] [blame]
// Package dryrun allows testing an Alert and seeing the regressions it would find.
package dryrun
import (
"context"
"crypto/md5"
"encoding/json"
"fmt"
"net/http"
"sort"
"sync"
"time"
"github.com/gorilla/mux"
"go.skia.org/infra/go/httputils"
"go.skia.org/infra/go/metrics2"
"go.skia.org/infra/go/sklog"
"go.skia.org/infra/go/vcsinfo"
"go.skia.org/infra/perf/go/alerts"
"go.skia.org/infra/perf/go/cid"
"go.skia.org/infra/perf/go/dataframe"
"go.skia.org/infra/perf/go/regression"
)
// CLEANUP_DURATION is both how often finished dry runs are swept and how long
// a finished dry run is kept before it becomes eligible for removal.
const CLEANUP_DURATION = 5 * time.Minute
// Domain is the time domain over which to look for commits.
//
// Generated by the <domain-picker-sk> element.
//
// NOTE(review): in the code visible in this file only End and NumCommits are
// read (by StartHandler); Begin and RequestType are decoded from the request
// but not otherwise consulted here.
type Domain struct {
	Begin int `json:"begin"` // Beginning of time range in Unix timestamp seconds.
	End int `json:"end"` // End of time range in Unix timestamp seconds.
	NumCommits int32 `json:"num_commits"` // If RequestType is REQUEST_COMPACT, then the number of commits to show before End, and Begin is ignored.
	RequestType dataframe.RequestType `json:"request_type"`
}
// StartRequest is the data POSTed to StartHandler.
//
// The whole struct is formatted and hashed by Id(), so the request contents
// determine which in-flight dry run the request maps to.
type StartRequest struct {
	Config alerts.Config `json:"config"` // The Alert to dry run; StartHandler rejects an empty Query or a Config that fails Validate().
	Domain Domain `json:"domain"` // The range of commits to search; see Domain.
}
// Id returns the key under which this StartRequest is stored in
// Requests.inFlight.
//
// The key is the hex encoded MD5 digest of the request's Go-syntax (%#v)
// representation. MD5 is used here only as a fingerprint, not for security.
func (s *StartRequest) Id() string {
	repr := fmt.Sprintf("%#v", *s)
	digest := md5.Sum([]byte(repr))
	return fmt.Sprintf("%x", digest)
}
// StartResponse is the JSON response sent from StartHandler.
type StartResponse struct {
	ID string `json:"id"` // The StartRequest.Id() of the request; StatusHandler looks dry runs up by this value.
}
// Running is the data stored for each running dryrun.
//
// A Running is created by StartHandler, updated by the goroutine that
// StartHandler starts, and read by StatusHandler.
type Running struct {
	// mutex is held by StartHandler, the dry run goroutine and StatusHandler
	// while they read or write the fields below.
	mutex sync.Mutex
	// whenFinished is set to time.Now() when the dry run completes; cleanerStep
	// compares it against its cutoff to decide when to drop the entry.
	whenFinished time.Time
	Finished bool `json:"finished"` // True if the dry run is complete.
	Message string `json:"message"` // Human readable string describing the dry run state.
	Regressions map[string]*regression.Regression `json:"regressions"` // All the regressions found so far.
}
// Requests handles HTTP requests for doing dryruns.
//
// It keeps every started dry run in inFlight, keyed by StartRequest.Id(),
// until the background cleaner removes it.
type Requests struct {
	// cidl is used by StatusHandler to look up commit details and is passed
	// to the regression package when a dry run executes.
	cidl *cid.CommitIDLookup
	// dfBuilder is passed through to regression.RegressionsForAlert.
	dfBuilder dataframe.DataFrameBuilder
	// vcs is passed through to regression.RegressionsForAlert.
	vcs vcsinfo.VCS
	// paramsProvider is called once per dry run to obtain the paramset.
	paramsProvider regression.ParamsetProvider // TODO build the paramset from dfBuilder.
	// mutex is held while inFlight is read or modified.
	mutex sync.Mutex
	// inFlight maps StartRequest.Id() to the state of that dry run.
	inFlight map[string]*Running
}
// New returns a Requests that runs dry runs using the given commit lookup,
// DataFrameBuilder, paramset provider and VCS.
//
// As a side effect it starts a goroutine that periodically removes old
// finished dry runs; that goroutine is never stopped.
func New(cidl *cid.CommitIDLookup, dfBuilder dataframe.DataFrameBuilder, paramsProvider regression.ParamsetProvider, vcs vcsinfo.VCS) *Requests {
	d := new(Requests)
	d.cidl = cidl
	d.dfBuilder = dfBuilder
	d.paramsProvider = paramsProvider
	d.vcs = vcs
	d.inFlight = map[string]*Running{}

	// Sweep finished dry runs in the background.
	go d.cleaner()

	return d
}
// cleanerStep does a single step of cleaner().
//
// It removes every dry run that finished more than CLEANUP_DURATION ago and
// then reports the number of remaining entries via the "dryrun_inflight"
// metric. Unfinished dry runs are never removed.
func (d *Requests) cleanerStep() {
	cutoff := time.Now().Add(-CLEANUP_DURATION)

	d.mutex.Lock()
	defer d.mutex.Unlock()

	for id, running := range d.inFlight {
		// Finished and whenFinished are written by the dry run goroutine while
		// it holds running.mutex, so they must be read under the same mutex to
		// avoid a data race. The lock order (d.mutex, then running.mutex)
		// matches the handlers.
		running.mutex.Lock()
		expired := running.Finished && running.whenFinished.Before(cutoff)
		running.mutex.Unlock()

		if expired {
			// Deleting the current key while ranging over a map is safe in Go.
			delete(d.inFlight, id)
		}
	}
	metrics2.GetInt64Metric("dryrun_inflight", nil).Update(int64(len(d.inFlight)))
}
// cleaner removes old dry runs from inFlight.
//
// It runs cleanerStep once every CLEANUP_DURATION and does not return, so it
// is meant to be run in its own goroutine.
func (d *Requests) cleaner() {
	ticker := time.NewTicker(CLEANUP_DURATION)
	defer ticker.Stop()
	for range ticker.C {
		d.cleanerStep()
	}
}
// StartHandler starts a dry run for the StartRequest in the POST body and
// responds with a StartResponse containing the id to poll via StatusHandler.
//
// If an unfinished dry run for an identical request already exists it is
// reused; a finished one is replaced by a fresh run. Malformed or invalid
// requests are reported as 400 Bad Request since they are client errors.
func (d *Requests) StartHandler(w http.ResponseWriter, r *http.Request) {
	w.Header().Set("Content-Type", "application/json")

	var req StartRequest
	if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
		httputils.ReportError(w, err, "Could not decode POST body.", http.StatusBadRequest)
		return
	}
	if req.Config.Query == "" {
		httputils.ReportError(w, fmt.Errorf("Query was empty."), "A Query is required.", http.StatusBadRequest)
		return
	}
	if err := req.Config.Validate(); err != nil {
		httputils.ReportError(w, err, "Invalid Alert config.", http.StatusBadRequest)
		return
	}

	id := req.Id()
	// All locking happens inside startIfNotRunning so that no mutex is held
	// while the response is written to a potentially slow client.
	d.startIfNotRunning(id, &req)

	resp := StartResponse{
		ID: id,
	}
	if err := json.NewEncoder(w).Encode(resp); err != nil {
		sklog.Errorf("Failed to encode start response: %s", err)
	}
}

// startIfNotRunning registers and launches a dry run for req under id, unless
// an unfinished dry run with that id is already in flight.
func (d *Requests) startIfNotRunning(id string, req *StartRequest) {
	d.mutex.Lock()
	defer d.mutex.Unlock()

	if prev, ok := d.inFlight[id]; ok {
		prev.mutex.Lock()
		finished := prev.Finished
		prev.mutex.Unlock()
		if !finished {
			// Same request is still being processed; reuse it.
			return
		}
		// A finished run is overwritten below so the caller gets fresh results.
	}

	running := &Running{
		Finished:    false,
		Message:     "Starting dry run.",
		Regressions: map[string]*regression.Regression{},
	}
	d.inFlight[id] = running
	go d.runDryRun(req, running)
}

// runDryRun executes the dry run described by req, recording progress and the
// regressions found in running, and marks running as finished when done.
func (d *Requests) runDryRun(req *StartRequest, running *Running) {
	ctx := context.Background()

	// cb is passed each batch of cluster responses as they are produced.
	cb := func(queryRequest *regression.ClusterRequest, clusterResponse []*regression.ClusterResponse) {
		running.mutex.Lock()
		defer running.mutex.Unlock()

		// Convert each response to a Regression and merge it into
		// running.Regressions, keyed by commit id.
		for _, cr := range clusterResponse {
			c, reg, err := regression.RegressionFromClusterResponse(ctx, cr, &req.Config, d.cidl)
			if err != nil {
				running.Message = "Failed to convert to Regression, some data may be missing."
				sklog.Errorf("Failed to convert to Regression: %s", err)
				return
			}
			commitID := c.ID()
			running.Message = fmt.Sprintf("Commit: %s", commitID)

			// We might not have found any regressions.
			if reg.Low == nil && reg.High == nil {
				continue
			}
			if origReg, ok := running.Regressions[commitID]; ok {
				running.Regressions[commitID] = origReg.Merge(reg)
			} else {
				running.Regressions[commitID] = reg
			}
		}
	}

	end := time.Unix(int64(req.Domain.End), 0)
	regression.RegressionsForAlert(ctx, &req.Config, d.paramsProvider(), cb, int(req.Domain.NumCommits), end, d.vcs, d.cidl, d.dfBuilder, nil)

	running.mutex.Lock()
	defer running.mutex.Unlock()
	running.Finished = true
	running.whenFinished = time.Now()
	running.Message = "Dry run complete."
}
// RegressionRow is a Regression found for a specific commit.
//
// StatusHandler builds one row per commit returned by the commit lookup.
type RegressionRow struct {
	CID *cid.CommitDetail `json:"cid"` // Details of the commit, as returned by CommitIDLookup.Lookup.
	Regression *regression.Regression `json:"regression"` // The regression recorded for that commit's id in Running.Regressions.
}
// Status is the JSON response sent from StatusHandler.
//
// Finished and Message are copied from the Running entry for the dry run.
type Status struct {
	Finished bool `json:"finished"` // True if the dry run is complete.
	Message string `json:"message"` // Human readable string describing the dry run state.
	Regressions []*RegressionRow `json:"regressions"` // The regressions found so far; always a non-nil slice, so it encodes as [] when empty.
}
// StatusHandler reports the current state of the dry run whose id is given by
// the "id" route variable, as a JSON encoded Status.
//
// An unknown or already cleaned up id is reported as 404 Not Found. Failures
// to parse stored commit ids or to look up their details are reported as 500.
func (d *Requests) StatusHandler(w http.ResponseWriter, r *http.Request) {
	w.Header().Set("Content-Type", "application/json")
	id := mux.Vars(r)["id"]

	// Grab the running dryrun. d.mutex only protects the inFlight map, so it
	// is released before the commit lookup and response encoding below; holding
	// it any longer would stall every other handler and the cleaner.
	d.mutex.Lock()
	running, ok := d.inFlight[id]
	d.mutex.Unlock()
	if !ok {
		httputils.ReportError(w, fmt.Errorf("Invalid id: %q", id), "Invalid or expired dry run.", http.StatusNotFound)
		return
	}

	// Convert the Running.Regressions into a properly formed Status response.
	// running.mutex stays held until the response is encoded because the dry
	// run goroutine may still be updating the regressions.
	running.mutex.Lock()
	defer running.mutex.Unlock()

	// Sort the commit ids so the lookup request is deterministic.
	keys := make([]string, 0, len(running.Regressions))
	for key := range running.Regressions {
		keys = append(keys, key)
	}
	sort.Strings(keys)

	cids := make([]*cid.CommitID, 0, len(keys))
	for _, key := range keys {
		commitId, err := cid.FromID(key)
		if err != nil {
			httputils.ReportError(w, err, "Failed to parse commit id.", http.StatusInternalServerError)
			return
		}
		cids = append(cids, commitId)
	}

	cidd, err := d.cidl.Lookup(r.Context(), cids)
	if err != nil {
		httputils.ReportError(w, err, "Failed to find commit ids.", http.StatusInternalServerError)
		return
	}

	status := &Status{
		Finished: running.Finished,
		Message:  running.Message,
		// Kept non-nil so an empty result encodes as [] rather than null.
		Regressions: make([]*RegressionRow, 0, len(cidd)),
	}
	for _, details := range cidd {
		status.Regressions = append(status.Regressions, &RegressionRow{
			CID:        details,
			Regression: running.Regressions[details.ID()],
		})
	}
	if err := json.NewEncoder(w).Encode(status); err != nil {
		sklog.Errorf("Failed to encode dry run status: %s", err)
	}
}