blob: e2f1751e5b58e5e1a5d4de4ca550d28eb62ddd8e [file] [log] [blame]
// Package parser has funcs for parsing ingestion files.
package parser
import (
"bytes"
"context"
"errors"
"io"
"regexp"
"strconv"
"strings"
"go.opencensus.io/trace"
"go.skia.org/infra/go/metrics2"
"go.skia.org/infra/go/paramtools"
"go.skia.org/infra/go/query"
"go.skia.org/infra/go/skerr"
"go.skia.org/infra/go/sklog"
"go.skia.org/infra/go/util"
"go.skia.org/infra/perf/go/config"
"go.skia.org/infra/perf/go/file"
"go.skia.org/infra/perf/go/ingest/format"
"go.skia.org/infra/perf/go/types"
)
// ErrFileShouldBeSkipped is the sentinel error returned when a file should not
// be processed any further.
var ErrFileShouldBeSkipped = errors.New("File should be skipped.")
// Parser parses file.Files contents into a form suitable for writing to trace.Store.
//
// Instances are created with New.
type Parser struct {
// parseCounter is incremented once at the start of every Parse and
// ParseTryBot call.
parseCounter metrics2.Counter
// parseFailCounter is incremented when reading or parsing a file fails.
parseFailCounter metrics2.Counter
// branchNames is the set of branch names populated by New from the instance
// config. checkBranchName accepts every file when this set is empty.
branchNames map[string]bool
// invalidParamCharRegex is handed to query.ForceValidWithRegex when building
// params from version 1 format files.
invalidParamCharRegex *regexp.Regexp
}
// New builds a Parser from the given instance config.
//
// The names in instanceConfig.IngestionConfig.Branches become the set of
// branches consulted by checkBranchName. When
// instanceConfig.InvalidParamCharRegex is non-empty it is compiled and used in
// place of query.InvalidChar; a compilation failure is returned as a wrapped
// error and no Parser is created.
func New(instanceConfig *config.InstanceConfig) (parser *Parser, err error) {
	configuredBranches := instanceConfig.IngestionConfig.Branches

	paramCharRegex := query.InvalidChar
	if pattern := instanceConfig.InvalidParamCharRegex; pattern != "" {
		paramCharRegex, err = regexp.Compile(pattern)
		if err != nil {
			return nil, skerr.Wrap(err)
		}
	}

	created := &Parser{
		parseCounter:          metrics2.GetCounter("perf_ingest_parser_parse", nil),
		parseFailCounter:      metrics2.GetCounter("perf_ingest_parser_parse_failed", nil),
		branchNames:           map[string]bool{},
		invalidParamCharRegex: paramCharRegex,
	}
	for i := range configuredBranches {
		created.branchNames[configuredBranches[i]] = true
	}
	return created, nil
}
// buildInitialParams assembles the starting Params for one BenchResult.
//
// The result begins as a copy of b.Key, gains "test" and "config" entries,
// then has b.Options added. If result carries an "options" map, every
// string-valued entry in it is copied over as well, except keys starting with
// "GL_".
func buildInitialParams(testName, configName string, b *format.BenchData, result format.BenchResult) paramtools.Params {
	params := paramtools.Params(b.Key).Copy()
	params["test"] = testName
	params["config"] = configName
	params.Add(paramtools.Params(b.Options))

	// A missing "options" entry, or one that is not a map, contributes nothing.
	perResultOptions, isMap := result["options"].(map[string]interface{})
	if !isMap {
		return params
	}

	for name, raw := range perResultOptions {
		// Ignore the very long and not useful GL_ values, we can retrieve
		// them later via ptracestore.Details.
		if strings.HasPrefix(name, "GL_") {
			continue
		}
		// Only string values are usable as params; other types are dropped.
		if text, isString := raw.(string); isString {
			params[name] = text
		}
	}
	return params
}
// getParamsAndValuesFromLegacyFormat flattens a legacy BenchData into two
// parallel slices: the params identifying each trace and the single float
// value recorded for it.
//
// Every entry of a result other than "options" and "samples" becomes one
// trace whose "sub_result" param is the entry's key. Entries whose value is
// not a float64 are logged and skipped.
func getParamsAndValuesFromLegacyFormat(b *format.BenchData) ([]paramtools.Params, []float32) {
	allParams := []paramtools.Params{}
	allValues := []float32{}
	for testName, configs := range b.Results {
		for configName, result := range configs {
			traceParams := buildInitialParams(testName, configName, b, result)
			for subResult, raw := range result {
				switch subResult {
				case "options", "samples":
					// Not measurements; skip to the next entry of result.
					continue
				}
				traceParams["sub_result"] = subResult
				measurement, isFloat := raw.(float64)
				if !isFloat {
					sklog.Errorf("Found a non-float64 in %v", result)
					continue
				}
				traceParams = query.ForceValid(traceParams)
				// Copy because traceParams is reused for the next sub result.
				allParams = append(allParams, traceParams.Copy())
				allValues = append(allValues, float32(measurement))
			}
		}
	}
	return allParams, allValues
}
// Samples contain multiple runs of the same test, where Params describes the
// test.
type Samples struct {
// Params are the key/value pairs describing the test the samples belong to.
Params paramtools.Params
// Values holds one entry per recorded sample.
Values []float64
}
// SamplesSet maps trace names to Samples for that trace.
type SamplesSet map[string]Samples

// Add merges every entry of 'in' into s.
//
// Values for a trace already present in s are appended to the existing
// Values. A trace that is new to s is stored with a copy of the incoming
// Params and its own Values slice.
func (s SamplesSet) Add(in SamplesSet) {
	for traceID, incoming := range in {
		if current, present := s[traceID]; present {
			current.Values = append(current.Values, incoming.Values...)
			s[traceID] = current
			continue
		}
		s[traceID] = Samples{
			Params: incoming.Params.Copy(),
			Values: append([]float64{}, incoming.Values...),
		}
	}
}
// GetSamplesFromLegacyFormat returns a map from trace id to the slice of
// samples for that test.
//
// Only results that carry a "samples" entry holding a list are considered;
// each contributes a single trace whose "sub_result" param is "min_ms".
// Results with no "samples" entry, with a "samples" entry that is not a list
// (for example null), or whose params cannot be turned into a trace id are
// skipped. List elements that are not float64 are dropped.
func GetSamplesFromLegacyFormat(b *format.BenchData) SamplesSet {
	ret := SamplesSet{}
	for testName, allConfigs := range b.Results {
		for configName, result := range allConfigs {
			// The comma-ok assertion covers both a missing key and a value of
			// the wrong type, so malformed input is skipped rather than
			// panicking the ingester.
			iSlice, ok := result["samples"].([]interface{})
			if !ok {
				continue
			}

			params := buildInitialParams(testName, configName, b, result)
			// We only collect samples for min_ms.
			params["sub_result"] = "min_ms"
			params = query.ForceValid(params)
			traceID, err := query.MakeKeyFast(params)
			if err != nil {
				// No usable trace id can be built for these params.
				continue
			}

			values := make([]float64, 0, len(iSlice))
			for _, ix := range iSlice {
				if x, ok := ix.(float64); ok {
					values = append(values, x)
				}
			}
			ret[traceID] = Samples{
				Params: params,
				Values: values,
			}
		}
	}
	return ret
}
// getParamsAndValuesFromVersion1Format flattens a version 1 format.Format into
// two parallel slices: the params identifying each trace and the measurement
// recorded for it.
//
// Each result starts from a copy of f.Key with the result's own Key added. A
// result with no Measurements yields one trace using result.Measurement.
// Otherwise every individual measurement yields its own trace, with the
// Measurements map key set to that measurement's Value. All params are passed
// through query.ForceValidWithRegex with invalidParamCharRegex.
func getParamsAndValuesFromVersion1Format(f format.Format, invalidParamCharRegex *regexp.Regexp) ([]paramtools.Params, []float32) {
	commonParams := paramtools.Params(f.Key)
	allParams := []paramtools.Params{}
	allValues := []float32{}

	// record appends one sanitized trace to both parallel slices.
	record := func(p paramtools.Params, value float32) {
		allParams = append(allParams, query.ForceValidWithRegex(p, invalidParamCharRegex))
		allValues = append(allValues, value)
	}

	for _, result := range f.Results {
		base := commonParams.Copy()
		base.Add(result.Key)

		if len(result.Measurements) == 0 {
			record(base, result.Measurement)
			continue
		}
		for name, group := range result.Measurements {
			for _, single := range group {
				perMeasurement := base.Copy()
				perMeasurement[name] = single.Value
				record(perMeasurement, single.Measurement)
			}
		}
	}
	return allParams, allValues
}
// checkBranchName reports the branch named in 'params' and whether the file
// should continue to be processed.
//
// When the Parser has no configured branch names, or 'params' has no key named
// "branch", the file is always processed and the returned branch name is "".
// Otherwise the file is processed only if its branch is one of the configured
// branch names.
func (p *Parser) checkBranchName(params map[string]string) (string, bool) {
	name, hasBranch := params["branch"]
	if len(p.branchNames) == 0 || !hasBranch {
		return "", true
	}
	return name, p.branchNames[name]
}
// extractFromLegacyFile parses r as a legacy format file.
//
// On success it returns the parallel params and values slices built by
// getParamsAndValuesFromLegacyFormat, followed by the file's Hash and Key. A
// parse failure is returned unchanged. The filename argument is not used.
func (p *Parser) extractFromLegacyFile(r io.Reader, filename string) ([]paramtools.Params, []float32, string, map[string]string, error) {
	legacy, parseErr := format.ParseLegacyFormat(r)
	if parseErr != nil {
		return nil, nil, "", nil, parseErr
	}
	traceParams, traceValues := getParamsAndValuesFromLegacyFormat(legacy)
	return traceParams, traceValues, legacy.Hash, legacy.Key, nil
}
// extractFromVersion1File parses r as a version 1 format file.
//
// On success it returns the parallel params and values slices built by
// getParamsAndValuesFromVersion1Format, followed by the file's GitHash and
// Key. A parse failure is logged as a warning that includes filename, and the
// error is returned unchanged.
func (p *Parser) extractFromVersion1File(r io.Reader, filename string) ([]paramtools.Params, []float32, string, map[string]string, error) {
	parsed, parseErr := format.Parse(r)
	if parseErr != nil {
		sklog.Warningf("Failed to parse the version one file: %s, got error: %s", filename, parseErr)
		return nil, nil, "", nil, parseErr
	}
	traceParams, traceValues := getParamsAndValuesFromVersion1Format(parsed, p.invalidParamCharRegex)
	return traceParams, traceValues, parsed.GitHash, parsed.Key, nil
}
// Parse the given file.File contents.
//
// Returns two parallel slices, each slice contains the params and then the
// float for a single value of a trace, followed by the git hash recorded in
// the file.
//
// The file is first parsed as the version 1 format; if that fails the legacy
// format is tried on the same bytes.
//
// The returned error will be ErrFileShouldBeSkipped if the file should not be
// processed any further, which happens when the file's branch is not one of
// the configured branches or when the file contains no data.
//
// The File.Contents will be closed when this func returns.
func (p *Parser) Parse(ctx context.Context, file file.File) ([]paramtools.Params, []float32, string, error) {
	_, span := trace.StartSpan(ctx, "ingest.parser.Parse")
	defer span.End()
	defer util.Close(file.Contents)

	p.parseCounter.Inc(1)

	// Read the whole content into bytes.Reader since we may take more than one
	// pass at the data.
	sklog.Infof("About to read.")
	b, err := io.ReadAll(file.Contents)
	sklog.Infof("Finished readall.")
	if err != nil {
		p.parseFailCounter.Inc(1)
		return nil, nil, "", skerr.Wrap(err)
	}
	r := bytes.NewReader(b)

	// Expect the file to be in format.FileFormat.
	sklog.Info("About to extract")
	params, values, hash, commonKeys, err := p.extractFromVersion1File(r, file.Name)
	if err != nil {
		// Fallback to the legacy format, rewinding so it sees the whole file.
		if _, seekErr := r.Seek(0, io.SeekStart); seekErr != nil {
			// Count this as a parse failure, matching ParseTryBot.
			p.parseFailCounter.Inc(1)
			return nil, nil, "", skerr.Wrap(seekErr)
		}
		sklog.Info("About to extract from legacy.")
		params, values, hash, commonKeys, err = p.extractFromLegacyFile(r, file.Name)
	}
	if err != nil {
		// A deliberate skip is not a failure; errors.Is also recognizes the
		// sentinel when it has been wrapped.
		if !errors.Is(err, ErrFileShouldBeSkipped) {
			p.parseFailCounter.Inc(1)
		}
		return nil, nil, "", err
	}

	branch, ok := p.checkBranchName(commonKeys)
	if !ok {
		return nil, nil, "", ErrFileShouldBeSkipped
	}
	if len(params) == 0 {
		metrics2.GetCounter("perf_ingest_parser_no_data_in_file", map[string]string{"branch": branch}).Inc(1)
		sklog.Infof("No data in: %q", file.Name)
		return nil, nil, "", ErrFileShouldBeSkipped
	}
	return params, values, hash, nil
}
// ParseTryBot extracts the issue and patch identifiers from the file.File.
//
// The file is first parsed as the version 1 format; if that fails the legacy
// format is tried on the same bytes.
//
// The issue and patch values are returned as strings. If either can be further
// parsed as integers that will be done at a higher level.
//
// The File.Contents will be closed when this func returns.
func (p *Parser) ParseTryBot(file file.File) (types.CL, string, error) {
	defer util.Close(file.Contents)

	p.parseCounter.Inc(1)

	// Buffer everything up front since we may take more than one pass at the
	// data.
	contents, readErr := io.ReadAll(file.Contents)
	if readErr != nil {
		p.parseFailCounter.Inc(1)
		return "", "", skerr.Wrap(readErr)
	}
	reader := bytes.NewReader(contents)

	if parsed, parseErr := format.Parse(reader); parseErr == nil {
		return parsed.Issue, parsed.Patchset, nil
	}

	// Fallback to legacy format, rewinding so it sees the whole file.
	if _, seekErr := reader.Seek(0, io.SeekStart); seekErr != nil {
		p.parseFailCounter.Inc(1)
		return "", "", skerr.Wrap(seekErr)
	}
	legacy, legacyErr := format.ParseLegacyFormat(reader)
	if legacyErr != nil {
		p.parseFailCounter.Inc(1)
		return "", "", skerr.Wrap(legacyErr)
	}
	return types.CL(legacy.Issue), legacy.PatchSet, nil
}
// ParseCommitNumberFromGitHash parses the integer commit number out of a git
// hash string.
//
// For example: "git_hash": "CP:727901", the commit number will be 727901.
//
// The string must contain exactly one "CP:" separator, and the text after it
// must be accepted by strconv.Atoi; otherwise types.BadCommitNumber and an
// error are returned. Any text before the separator is ignored.
func (p *Parser) ParseCommitNumberFromGitHash(gitHash string) (types.CommitNumber, error) {
	pieces := strings.Split(gitHash, "CP:")
	if len(pieces) != 2 {
		return types.BadCommitNumber, skerr.Fmt("Failed to parse commit number string from git hash: %q", gitHash)
	}

	number, convErr := strconv.Atoi(pieces[1])
	if convErr != nil {
		return types.BadCommitNumber, skerr.Wrapf(convErr, "Failed to parse commit number integer from git hash: %q", gitHash)
	}
	return types.CommitNumber(number), nil
}