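// Package clustering2 provides a way to trigger and manage clustering runs
// over Perf trace data.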
package clustering2
import (
	"crypto/md5"
	"errors"
	"fmt"
	"math"
	"net/url"
	"sync"
	"time"

	"go.skia.org/infra/go/git/gitinfo"
	"go.skia.org/infra/go/query"
	"go.skia.org/infra/go/sklog"
	"go.skia.org/infra/go/vcsinfo"
	"go.skia.org/infra/go/vec32"
	"go.skia.org/infra/perf/go/cid"
	"go.skia.org/infra/perf/go/config"
	"go.skia.org/infra/perf/go/dataframe"
	"go.skia.org/infra/perf/go/ptracestore"
)
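// ProcessState is the state of a ClusterRequestProcess.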
type ProcessState string
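// Valid values for ProcessState.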
const (
PROCESS_RUNNING ProcessState = "Running"
PROCESS_SUCCESS ProcessState = "Success"
PROCESS_ERROR ProcessState = "Error"
)
const (
// MAX_FINISHED_PROCESS_AGE is the amount of time to keep a finished
// ClusterRequestProcess around before deleting it.
MAX_FINISHED_PROCESS_AGE = time.Minute
// The following limits are just to prevent excessively large or long-running
// clusterings from being triggered.
// MAX_K is the largest K used for clustering.
MAX_K = 100
// MAX_RADIUS is the maximum number of points on either side of a commit
// that will be included in clustering.
MAX_RADIUS = 25
	// SPARSE_BLOCK_SEARCH_MULT: when searching for commits that have data in
	// a sparse data set, we'll request data in chunks of this many commits
	// per point we are looking for.
SPARSE_BLOCK_SEARCH_MULT = 100
)
var (
errorNotFound = errors.New("Process not found.")
)
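// ClusterAlgo is the algorithm to use when clustering traces.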
type ClusterAlgo string
// ClusterAlgo constants.
//
// Update algo-select-sk if this enum is changed.
const (
KMEANS_ALGO ClusterAlgo = "kmeans" // Cluster traces using k-means clustering on their shapes.
	STEPFIT_ALGO ClusterAlgo = "stepfit" // Look at each trace individually and determine if it steps up or down.
)
var (
allClusterAlgos = []ClusterAlgo{KMEANS_ALGO, STEPFIT_ALGO}
)
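// ToClusterAlgo converts the string s to a ClusterAlgo, returning an error if
// it is not a valid ClusterAlgo value.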
func ToClusterAlgo(s string) (ClusterAlgo, error) {
ret := ClusterAlgo(s)
for _, c := range allClusterAlgos {
if c == ret {
return ret, nil
}
}
return ret, fmt.Errorf("%q is not a valid ClusterAlgo, must be a value in %v", s, allClusterAlgos)
}
// ClusterRequest is all the info needed to start a clustering run.
type ClusterRequest struct {
	Source      string      `json:"source"`      // The source of the commits to cluster over.
	Offset      int         `json:"offset"`      // The offset of the target commit.
	Radius      int         `json:"radius"`      // The number of commits on either side of the target to include.
	Query       string      `json:"query"`       // A URL-encoded query that selects the traces to cluster.
	K           int         `json:"k"`           // The K to use for k-means clustering. If 0 then a value is calculated from the number of traces.
	TZ          string      `json:"tz"`          // The timezone to use when building the FrameResponse.
	Algo        ClusterAlgo `json:"algo"`        // The clustering algorithm to use. Defaults to KMEANS_ALGO if unset.
	Interesting float32     `json:"interesting"` // The threshold for a cluster to be considered interesting. If 0 then a default is used.
	Sparse      bool        `json:"sparse"`      // If true then data may not be available for every commit.
}
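// Id returns a unique identifier for this ClusterRequest, the MD5 hash of the
// request's printed representation.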
func (c *ClusterRequest) Id() string {
return fmt.Sprintf("%x", md5.Sum([]byte(fmt.Sprintf("%#v", *c))))
}
// ClusterResponse is the response from running clustering over a ClusterRequest.
type ClusterResponse struct {
Summary *ClusterSummaries `json:"summary"`
Frame *dataframe.FrameResponse `json:"frame"`
}
// ClusterRequestProcess handles the processing of a single ClusterRequest.
type ClusterRequestProcess struct {
	// These members are read-only and should not be modified.
request *ClusterRequest
git *gitinfo.GitInfo
cidl *cid.CommitIDLookup
// mutex protects access to the remaining struct members.
mutex sync.RWMutex
response *ClusterResponse // The response when the clustering is complete.
lastUpdate time.Time // The last time this process was updated.
state ProcessState // The current state of the process.
message string // Describes the current state of the process.
cids []*cid.CommitID // The cids to run the clustering over. Calculated in calcCids.
}
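// newProcess creates a new ClusterRequestProcess in the Running state.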
func newProcess(req *ClusterRequest, git *gitinfo.GitInfo, cidl *cid.CommitIDLookup) *ClusterRequestProcess {
return &ClusterRequestProcess{
request: req,
git: git,
cidl: cidl,
lastUpdate: time.Now(),
state: PROCESS_RUNNING,
message: "Running",
}
}
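// newRunningProcess creates a new ClusterRequestProcess and starts it running
// in its own goroutine.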
func newRunningProcess(req *ClusterRequest, git *gitinfo.GitInfo, cidl *cid.CommitIDLookup) *ClusterRequestProcess {
ret := newProcess(req, git, cidl)
go ret.Run()
return ret
}
// RunningClusterRequests keeps track of all the ClusterRequestProcesses.
//
// Once a ClusterRequestProcess is complete, the results will be kept in memory
// for MAX_FINISHED_PROCESS_AGE before being deleted.
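//
// A minimal usage sketch, assuming git and cidl have already been constructed
// elsewhere (the 50.0 interestingness threshold here is illustrative):
//
//	rcr := NewRunningClusterRequests(git, cidl, 50.0)
//	id := rcr.Add(req)
//	if state, _, err := rcr.Status(id); err == nil && state == PROCESS_SUCCESS {
//		resp, _ := rcr.Response(id)
//		// Use resp.Summary and resp.Frame.
//	}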
type RunningClusterRequests struct {
git *gitinfo.GitInfo
cidl *cid.CommitIDLookup
defaultInteresting float32 // The threshold to control if a cluster is considered interesting.
mutex sync.Mutex
// inProcess maps a ClusterRequest.Id() of the request to the ClusterRequestProcess
// handling that request.
inProcess map[string]*ClusterRequestProcess
}
// NewRunningClusterRequests returns a new RunningClusterRequests.
func NewRunningClusterRequests(git *gitinfo.GitInfo, cidl *cid.CommitIDLookup, interesting float32) *RunningClusterRequests {
fr := &RunningClusterRequests{
git: git,
cidl: cidl,
inProcess: map[string]*ClusterRequestProcess{},
defaultInteresting: interesting,
}
go fr.background()
return fr
}
// step does a single step in cleaning up old ClusterRequestProcesses.
func (fr *RunningClusterRequests) step() {
fr.mutex.Lock()
defer fr.mutex.Unlock()
now := time.Now()
for k, v := range fr.inProcess {
v.mutex.Lock()
if now.Sub(v.lastUpdate) > MAX_FINISHED_PROCESS_AGE {
delete(fr.inProcess, k)
}
v.mutex.Unlock()
}
}
// background periodically cleans up old ClusterRequestProcesses.
func (fr *RunningClusterRequests) background() {
fr.step()
for range time.Tick(time.Minute) {
fr.step()
}
}
// Add starts a new running ClusterRequestProcess and returns
// the ID of the process to be used in calls to Status() and
// Response().
func (fr *RunningClusterRequests) Add(req *ClusterRequest) string {
fr.mutex.Lock()
defer fr.mutex.Unlock()
if req.Interesting == 0 {
req.Interesting = fr.defaultInteresting
}
id := req.Id()
if p, ok := fr.inProcess[id]; ok {
state, _, _ := p.Status()
if state != PROCESS_RUNNING {
delete(fr.inProcess, id)
}
}
if _, ok := fr.inProcess[id]; !ok {
fr.inProcess[id] = newRunningProcess(req, fr.git, fr.cidl)
}
return id
}
// Status returns the ProcessState and the message of the
// ClusterRequestProcess with the given id.
func (fr *RunningClusterRequests) Status(id string) (ProcessState, string, error) {
fr.mutex.Lock()
defer fr.mutex.Unlock()
if p, ok := fr.inProcess[id]; !ok {
return PROCESS_ERROR, "Not Found", errorNotFound
} else {
return p.Status()
}
}
// Response returns the ClusterResponse of the completed ClusterRequestProcess.
func (fr *RunningClusterRequests) Response(id string) (*ClusterResponse, error) {
fr.mutex.Lock()
defer fr.mutex.Unlock()
if p, ok := fr.inProcess[id]; !ok {
return nil, errorNotFound
} else {
return p.Response(), nil
}
}
// reportError records the reason a ClusterRequestProcess failed.
func (p *ClusterRequestProcess) reportError(err error, message string) {
p.mutex.Lock()
defer p.mutex.Unlock()
sklog.Warningf("ClusterRequest failed: %#v %s: %s", *(p.request), message, err)
p.message = message
p.state = PROCESS_ERROR
p.lastUpdate = time.Now()
}
// progress records the progress of a ClusterRequestProcess.
func (p *ClusterRequestProcess) progress(step, totalSteps int) {
p.mutex.Lock()
defer p.mutex.Unlock()
p.message = fmt.Sprintf("Querying: %d%%", int(float32(100.0)*float32(step)/float32(totalSteps)))
p.lastUpdate = time.Now()
}
// clusterProgress records the progress of a ClusterRequestProcess.
func (p *ClusterRequestProcess) clusterProgress(totalError float64) {
p.mutex.Lock()
defer p.mutex.Unlock()
p.message = fmt.Sprintf("Clustering Total Error: %0.2f", totalError)
p.lastUpdate = time.Now()
}
// Response returns the ClusterResponse of the completed ClusterRequestProcess.
func (p *ClusterRequestProcess) Response() *ClusterResponse {
p.mutex.Lock()
defer p.mutex.Unlock()
return p.response
}
// Status returns the ProcessState and the message of the
// ClusterRequestProcess.
func (p *ClusterRequestProcess) Status() (ProcessState, string, error) {
p.mutex.Lock()
defer p.mutex.Unlock()
return p.state, p.message, nil
}
// missing returns true if >50% of the trace is vec32.MISSING_DATA_SENTINEL.
func missing(tr ptracestore.Trace) bool {
count := 0
for _, x := range tr {
if x == vec32.MISSING_DATA_SENTINEL {
count++
}
}
return (100*count)/len(tr) > 50
}
// tooMuchMissingData returns true if a trace has too many
// MISSING_DATA_SENTINEL values.
//
// The criterion is >50% missing data on either side of the target
// commit, which sits at the center of the trace.
func tooMuchMissingData(tr ptracestore.Trace) bool {
if len(tr) < 3 {
return false
}
n := len(tr) / 2
if tr[n] == vec32.MISSING_DATA_SENTINEL {
return true
}
return missing(tr[:n]) || missing(tr[len(tr)-n:])
}
// CidsWithDataInRange is passed to calcCids, and returns all
// the commit ids in [begin, end) that have data.
type CidsWithDataInRange func(begin, end int) ([]*cid.CommitID, error)
// cidsWithData returns the commit ids in the dataframe that have non-missing
// data in at least one trace.
func cidsWithData(df *dataframe.DataFrame) []*cid.CommitID {
ret := []*cid.CommitID{}
for i, h := range df.Header {
for _, tr := range df.TraceSet {
if tr[i] != vec32.MISSING_DATA_SENTINEL {
ret = append(ret, &cid.CommitID{
Source: h.Source,
Offset: int(h.Offset),
})
break
}
}
}
return ret
}
// calcCids returns a slice of CommitID's that clustering should be run over.
func calcCids(request *ClusterRequest, v vcsinfo.VCS, cidsWithDataInRange CidsWithDataInRange) ([]*cid.CommitID, error) {
cids := []*cid.CommitID{}
if request.Sparse {
// Sparse means data might not be available for every commit, so we need to scan
// the data and gather up +/- Radius commits from the target commit that actually
// do have data.
		// Start by checking the center point as a quick exit strategy.
withData, err := cidsWithDataInRange(request.Offset, request.Offset+1)
if err != nil {
return nil, err
}
if len(withData) == 0 {
return nil, fmt.Errorf("No data at the target commit id.")
}
cids = append(cids, withData...)
// Then check from the target forward in time.
lastCommit := v.LastNIndex(1)
lastIndex := lastCommit[0].Index
finalIndex := request.Offset + 1 + SPARSE_BLOCK_SEARCH_MULT*request.Radius
if finalIndex > lastIndex {
finalIndex = lastIndex
}
withData, err = cidsWithDataInRange(request.Offset+1, finalIndex)
if err != nil {
return nil, err
}
if len(withData) < request.Radius {
return nil, fmt.Errorf("Not enough sparse data after the target commit.")
}
cids = append(cids, withData[:request.Radius]...)
// Finally check backward in time.
startIndex := request.Offset - SPARSE_BLOCK_SEARCH_MULT*request.Radius
withData, err = cidsWithDataInRange(startIndex, request.Offset)
if err != nil {
return nil, err
}
if len(withData) < request.Radius {
return nil, fmt.Errorf("Not enough sparse data before the target commit.")
}
withData = withData[len(withData)-request.Radius:]
cids = append(withData, cids...)
} else {
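		// Dense data, so every commit in [Offset-Radius, Offset+Radius] is
		// expected to have data; take them directly.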
if request.Radius <= 0 {
request.Radius = 1
}
if request.Radius > MAX_RADIUS {
request.Radius = MAX_RADIUS
}
for i := request.Offset - request.Radius; i <= request.Offset+request.Radius; i++ {
cids = append(cids, &cid.CommitID{
Source: request.Source,
Offset: i,
})
}
}
return cids, nil
}
// Run does the work in a ClusterRequestProcess. It does not return until all the
// work is done or the request has failed. It should be run as a goroutine.
func (p *ClusterRequestProcess) Run() {
if p.request.Algo == "" {
p.request.Algo = KMEANS_ALGO
}
parsedQuery, err := url.ParseQuery(p.request.Query)
if err != nil {
p.reportError(err, "Invalid URL query.")
return
}
q, err := query.New(parsedQuery)
if err != nil {
p.reportError(err, "Invalid URL query.")
return
}
	// cidsWithDataInRange is a closure passed to calcCids; it returns the
	// CommitIDs in the given range of offsets that have data in at least
	// one trace that matches the current query.
cidsWithDataInRange := func(begin, end int) ([]*cid.CommitID, error) {
c := []*cid.CommitID{}
for i := begin; i < end; i++ {
c = append(c, &cid.CommitID{
Source: p.request.Source,
Offset: i,
})
}
df, err := dataframe.NewFromCommitIDsAndQuery(c, p.cidl, ptracestore.Default, q, nil)
if err != nil {
return nil, fmt.Errorf("Failed to load data searching for commit ids: %s", err)
}
return cidsWithData(df), nil
}
p.cids, err = calcCids(p.request, p.git, cidsWithDataInRange)
if err != nil {
p.reportError(err, "Could not calculate the commits to run a cluster over.")
return
}
df, err := dataframe.NewFromCommitIDsAndQuery(p.cids, p.cidl, ptracestore.Default, q, p.progress)
if err != nil {
p.reportError(err, "Invalid range of commits.")
return
}
before := len(df.TraceSet)
	// Filter out traces with insufficient data, i.e. we need 50% or more data
// on either side of the target commit.
df.FilterOut(tooMuchMissingData)
after := len(df.TraceSet)
sklog.Infof("Filtered Traces: %d %d %d", before, after, before-after)
k := p.request.K
if k <= 0 || k > MAX_K {
n := len(df.TraceSet)
// We want K to be around 50 when n = 30000, which has been determined via
		// trial and error to be a good value for the Perf data we are working with. We
// want K to decrease from there as n gets smaller, but don't want K to go
// below 10, so we use a simple linear relation:
//
// k = 40/30000 * n + 10
//
k = int(math.Floor((40.0/30000.0)*float64(n) + 10))
}
sklog.Infof("Clustering with K=%d", k)
summary, err := CalculateClusterSummaries(df, k, config.MIN_STDDEV, p.clusterProgress, p.request.Interesting, p.request.Algo)
if err != nil {
p.reportError(err, "Invalid clustering.")
return
}
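	// Clear the TraceSet so that the FrameResponse built below does not
	// include the raw trace data.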
df.TraceSet = ptracestore.TraceSet{}
frame, err := dataframe.ResponseFromDataFrame(df, p.git, false, p.request.TZ)
if err != nil {
p.reportError(err, "Failed to convert DataFrame to FrameResponse.")
return
}
p.mutex.Lock()
defer p.mutex.Unlock()
p.state = PROCESS_SUCCESS
p.message = ""
p.response = &ClusterResponse{
Summary: summary,
Frame: frame,
}
}