// Package parser has funcs for parsing ingestion files.
package parser

import (
	"bytes"
	"context"
	"errors"
	"io"
	"regexp"
	"strconv"
	"strings"

	"go.opencensus.io/trace"
	"go.skia.org/infra/go/metrics2"
	"go.skia.org/infra/go/paramtools"
	"go.skia.org/infra/go/query"
	"go.skia.org/infra/go/skerr"
	"go.skia.org/infra/go/sklog"
	"go.skia.org/infra/go/util"
	"go.skia.org/infra/perf/go/config"
	"go.skia.org/infra/perf/go/file"
	"go.skia.org/infra/perf/go/ingest/format"
	"go.skia.org/infra/perf/go/types"
)

var (
	// ErrFileShouldBeSkipped is returned if a file should be skipped.
	ErrFileShouldBeSkipped = errors.New("File should be skipped.")
)

// Parser parses file.Files contents into a form suitable for writing to trace.Store.
type Parser struct {
	parseCounter          metrics2.Counter
	parseFailCounter      metrics2.Counter
	branchNames           map[string]bool
	invalidParamCharRegex *regexp.Regexp
}

// New creates a new instance of Parser for the given branch names
// and invalid chars of parameter key/value
func New(instanceConfig *config.InstanceConfig) (parser *Parser, err error) {
	branches := instanceConfig.IngestionConfig.Branches

	invalidParamCharRegex := query.InvalidChar
	if instanceConfig.InvalidParamCharRegex != "" {
		invalidParamCharRegex, err = regexp.Compile(instanceConfig.InvalidParamCharRegex)
		if err != nil {
			return nil, skerr.Wrap(err)
		}
	}

	ret := &Parser{
		parseCounter:          metrics2.GetCounter("perf_ingest_parser_parse", nil),
		parseFailCounter:      metrics2.GetCounter("perf_ingest_parser_parse_failed", nil),
		branchNames:           map[string]bool{},
		invalidParamCharRegex: invalidParamCharRegex,
	}
	for _, branchName := range branches {
		ret.branchNames[branchName] = true
	}
	return ret, nil
}

// buildInitialParams returns a Params for the given BenchResult.
func buildInitialParams(testName, configName string, b *format.BenchData, result format.BenchResult) paramtools.Params {
	ret := paramtools.Params(b.Key).Copy()
	ret["test"] = testName
	ret["config"] = configName
	ret.Add(paramtools.Params(b.Options))
	// If there is an options map inside the result add it to the params.
	if resultOptions, ok := result["options"]; ok {
		if opts, ok := resultOptions.(map[string]interface{}); ok {
			for k, vi := range opts {
				// Ignore the very long and not useful GL_ values, we can retrieve
				// them later via ptracestore.Details.
				if strings.HasPrefix(k, "GL_") {
					continue
				}
				if s, ok := vi.(string); ok {
					ret[k] = s
				}
			}
		}
	}
	return ret
}

// getParamsAndValuesFromLegacyFormat returns two parallel slices, each slice
// contains the params and then the float for a single value of a trace.
func getParamsAndValuesFromLegacyFormat(b *format.BenchData) ([]paramtools.Params, []float32) {
	params := []paramtools.Params{}
	values := []float32{}
	for testName, allConfigs := range b.Results {
		for configName, result := range allConfigs {
			key := buildInitialParams(testName, configName, b, result)
			for k, vi := range result {
				if k == "options" || k == "samples" {
					continue
				}
				key["sub_result"] = k
				floatVal, ok := vi.(float64)
				if !ok {
					sklog.Errorf("Found a non-float64 in %v", result)
					continue
				}
				key = query.ForceValid(key)
				params = append(params, key.Copy())
				values = append(values, float32(floatVal))
			}
		}
	}
	return params, values
}

// Samples contain multiple runs of the same test, where Params describes the
// test.
type Samples struct {
	Params paramtools.Params
	Values []float64
}

// SamplesSet maps trace names to Samples for that trace.
type SamplesSet map[string]Samples

// Add all the Samples from 'in'.
func (s SamplesSet) Add(in SamplesSet) {
	for key, samples := range in {
		existingSamples, ok := s[key]
		if !ok {
			existingSamples = Samples{
				Params: samples.Params.Copy(),
				Values: []float64{},
			}
		}
		existingSamples.Values = append(existingSamples.Values, samples.Values...)
		s[key] = existingSamples
	}
}

// GetSamplesFromLegacyFormat returns a map from trace id to the slice of
// samples for that test.
func GetSamplesFromLegacyFormat(b *format.BenchData) SamplesSet {
	ret := SamplesSet{}
	for testName, allConfigs := range b.Results {
		for configName, result := range allConfigs {
			params := buildInitialParams(testName, configName, b, result)
			iSamples, ok := result["samples"]
			if !ok {
				continue
			}
			// We only collect samples for min_ms.
			params["sub_result"] = "min_ms"

			params = query.ForceValid(params)
			traceID, err := query.MakeKeyFast(params)
			if err != nil {
				continue
			}
			iSlice := iSamples.([]interface{})
			values := make([]float64, 0, len(iSlice))
			for _, ix := range iSlice {
				x, ok := ix.(float64)
				if !ok {
					continue
				}
				values = append(values, x)
			}
			ret[traceID] = Samples{
				Params: params,
				Values: values,
			}
		}
	}
	return ret
}

// getParamsAndValuesFromVersion1Format returns two parallel slices, each slice contains
// the params and then the float for a single value of a trace.
func getParamsAndValuesFromVersion1Format(f format.Format, invalidParamCharRegex *regexp.Regexp) ([]paramtools.Params, []float32) {
	paramSlice := []paramtools.Params{}
	keyParams := paramtools.Params(f.Key)
	measurementSlice := []float32{}
	for _, result := range f.Results {
		p := keyParams.Copy()
		p.Add(result.Key)
		if len(result.Measurements) == 0 {
			paramSlice = append(paramSlice, query.ForceValidWithRegex(p, invalidParamCharRegex))
			measurementSlice = append(measurementSlice, result.Measurement)
		} else {
			for key, measurements := range result.Measurements {
				for _, measurement := range measurements {
					singleParam := p.Copy()
					singleParam[key] = measurement.Value
					paramSlice = append(paramSlice, query.ForceValidWithRegex(singleParam, invalidParamCharRegex))
					measurementSlice = append(measurementSlice, measurement.Measurement)
				}
			}

		}
	}
	return paramSlice, measurementSlice
}

// checkBranchName returns the branch name and true if the file should continue
// to be processed. Note that if the 'params' don't contain a key named 'branch'
// then the file should be processed, in which case the returned branch name is
// "".
func (p *Parser) checkBranchName(params map[string]string) (string, bool) {
	if len(p.branchNames) == 0 {
		return "", true
	}
	branch, ok := params["branch"]
	if ok {
		return branch, p.branchNames[branch]
	}
	return "", true
}

func (p *Parser) extractFromLegacyFile(r io.Reader, filename string) ([]paramtools.Params, []float32, string, map[string]string, error) {
	benchData, err := format.ParseLegacyFormat(r)
	if err != nil {
		return nil, nil, "", nil, err
	}
	params, values := getParamsAndValuesFromLegacyFormat(benchData)
	return params, values, benchData.Hash, benchData.Key, nil
}

func (p *Parser) extractFromVersion1File(r io.Reader, filename string) ([]paramtools.Params, []float32, string, map[string]string, error) {
	f, err := format.Parse(r)
	if err != nil {
		sklog.Warningf("Failed to parse the version one file: %s, got error: %s", filename, err)
		return nil, nil, "", nil, err
	}
	params, values := getParamsAndValuesFromVersion1Format(f, p.invalidParamCharRegex)
	return params, values, f.GitHash, f.Key, nil
}

// Parse the given file.File contents.
//
// Returns two parallel slices, each slice contains the params and then the
// float for a single value of a trace.
//
// The returned error will be ErrFileShouldBeSkipped if the file should not be
// processed any further.
//
// The File.Contents will be closed when this func returns.
func (p *Parser) Parse(ctx context.Context, file file.File) ([]paramtools.Params, []float32, string, error) {
	_, span := trace.StartSpan(ctx, "ingest.parser.Parse")
	defer span.End()

	defer util.Close(file.Contents)
	p.parseCounter.Inc(1)

	// Read the whole content into bytes.Reader since we may take more than one
	// pass at the data.
	sklog.Infof("About to read.")
	b, err := io.ReadAll(file.Contents)
	sklog.Infof("Finished readall.")
	if err != nil {
		p.parseFailCounter.Inc(1)
		return nil, nil, "", skerr.Wrap(err)
	}
	r := bytes.NewReader(b)

	// Expect the file to be in format.FileFormat.
	sklog.Info("About to extract")
	params, values, hash, commonKeys, err := p.extractFromVersion1File(r, file.Name)
	if err != nil {
		// Fallback to the legacy format.
		if _, err := r.Seek(0, io.SeekStart); err != nil {
			return nil, nil, "", skerr.Wrap(err)
		}
		sklog.Info("About to extract from legacy.")
		params, values, hash, commonKeys, err = p.extractFromLegacyFile(r, file.Name)
	}
	if err != nil && err != ErrFileShouldBeSkipped {
		p.parseFailCounter.Inc(1)
	}
	if err != nil {
		return nil, nil, "", err
	}
	branch, ok := p.checkBranchName(commonKeys)
	if !ok {
		return nil, nil, "", ErrFileShouldBeSkipped
	}
	if len(params) == 0 {
		metrics2.GetCounter("perf_ingest_parser_no_data_in_file", map[string]string{"branch": branch}).Inc(1)
		sklog.Infof("No data in: %q", file.Name)
		return nil, nil, "", ErrFileShouldBeSkipped
	}
	return params, values, hash, nil
}

// ParseTryBot extracts the issue and patch identifiers from the file.File.
//
// The issue and patch values are returned as strings. If either can be further
// parsed as integers that will be done at a higher level.
func (p *Parser) ParseTryBot(file file.File) (types.CL, string, error) {
	defer util.Close(file.Contents)
	p.parseCounter.Inc(1)

	// Read the whole content into bytes.Reader since we may take more than one
	// pass at the data.
	b, err := io.ReadAll(file.Contents)
	if err != nil {
		p.parseFailCounter.Inc(1)
		return "", "", skerr.Wrap(err)
	}
	r := bytes.NewReader(b)

	parsed, err := format.Parse(r)
	if err != nil {
		// Fallback to legacy format.
		if _, err := r.Seek(0, io.SeekStart); err != nil {
			p.parseFailCounter.Inc(1)
			return "", "", skerr.Wrap(err)
		}
		benchData, err := format.ParseLegacyFormat(r)
		if err != nil {
			p.parseFailCounter.Inc(1)
			return "", "", skerr.Wrap(err)
		}
		return types.CL(benchData.Issue), benchData.PatchSet, nil
	}
	return parsed.Issue, parsed.Patchset, nil

}

// ParseCommitNumberFromGitHash parse commit number from git hash.
// this method will be used to get integer commit number from string git hash.
// For example: "git_hash": "CP:727901", the commit number will be 727901
func (p *Parser) ParseCommitNumberFromGitHash(gitHash string) (types.CommitNumber, error) {
	gitHashContent := strings.SplitN(gitHash, "CP:", -1)

	if len(gitHashContent) != 2 {
		return types.BadCommitNumber, skerr.Fmt("Failed to parse commit number string from git hash: %q", gitHash)
	}

	commitNumber, err := strconv.Atoi(gitHashContent[1])
	if err != nil {
		return types.BadCommitNumber, skerr.Wrapf(err, "Failed to parse commit number integer from git hash: %q", gitHash)
	}

	return types.CommitNumber(commitNumber), nil
}
