// Package dfiter efficiently creates dataframes used in regression detection.
package dfiter

import (
	"context"
	"errors"
	"fmt"
	"net/url"
	"time"

	"go.opencensus.io/trace"
	"go.skia.org/infra/go/metrics2"
	"go.skia.org/infra/go/now"
	"go.skia.org/infra/go/query"
	"go.skia.org/infra/go/skerr"
	"go.skia.org/infra/go/sklog"
	"go.skia.org/infra/perf/go/alerts"
	"go.skia.org/infra/perf/go/config"
	"go.skia.org/infra/perf/go/dataframe"
	perfgit "go.skia.org/infra/perf/go/git"
	"go.skia.org/infra/perf/go/progress"
	"go.skia.org/infra/perf/go/types"
)

// ErrInsufficientData is returned when building a DataFrameIterator if all the
// queries completed successfully, but they did not return enough data points
// to continue.
var ErrInsufficientData = errors.New("insufficient data")

// DataFrameIterator is an iterator that produces DataFrames.
//
// Typical usage:
//
//	for it.Next() {
//	  df, err := it.Value(ctx)
//	  // Do something with df.
//	}
type DataFrameIterator interface {
	// Next reports whether another DataFrame is available from Value.
	Next() bool

	// Value returns a DataFrame, or an error if one could not be produced.
	// As the usage example above shows, it is meant to be called once per
	// successful call to Next.
	Value(ctx context.Context) (*dataframe.DataFrame, error)
}

// dataframeSlicer implements DataFrameIterator by slicing sub-dataframes from
// a larger dataframe.
//
// Each call to Value slices starting at offset and, on success, moves offset
// forward by one, so consecutive sub-dataframes start one position apart.
type dataframeSlicer struct {
	df     *dataframe.DataFrame // Source dataframe that sub-dataframes are sliced from.
	size   int                  // Length passed to df.Slice for every sub-dataframe.
	offset int                  // Start position passed to df.Slice for the next sub-dataframe.
}

// Next implements DataFrameIterator.
//
// It reports true while a full window of d.size entries starting at d.offset
// still fits inside the source dataframe's Header.
func (d *dataframeSlicer) Next() bool {
	windowEnd := d.offset + d.size
	return len(d.df.Header) >= windowEnd
}

// Value implements DataFrameIterator.
//
// It slices d.size entries out of the source dataframe starting at d.offset.
// The offset is advanced by one only when the slice succeeds; on failure the
// error from Slice is returned unchanged and the iterator position is left
// untouched.
func (d *dataframeSlicer) Value(ctx context.Context) (*dataframe.DataFrame, error) {
	window, err := d.df.Slice(d.offset, d.size)
	if err == nil {
		d.offset++
		return window, nil
	}
	return nil, err
}

// NewDataFrameIterator builds a DataFrameIterator over the data matching
// queryAsString for the given domain and alert.
//
// The mode is selected by domain.Offset:
//
//   - domain.Offset == 0: one source dataframe of domain.N points ending at
//     domain.End is requested, and the iterator yields successive windows of
//     2*alert.Radius+1 points sliced from it. If anomalyConfig.SettlingTime is
//     non-zero, domain.End is first pulled back so that it is no later than
//     "now minus the settling time".
//   - domain.Offset != 0: a source dataframe of exactly 2*alert.Radius+1 points
//     ending at commit domain.Offset+alert.Radius is requested, so that the
//     iterator yields a single dataframe around the specified commit.
//
// regressionStateCallback may be nil; when it is non-nil it receives a
// human-readable message describing each failure. ErrInsufficientData is
// returned when the queries succeed but fewer than 2*alert.Radius+1 points
// come back.
func NewDataFrameIterator(
	ctx context.Context,
	progress progress.Progress,
	dfBuilder dataframe.DataFrameBuilder,
	perfGit perfgit.Git,
	regressionStateCallback types.ProgressCallback,
	queryAsString string,
	domain types.Domain,
	alert *alerts.Alert,
	anomalyConfig config.AnomalyConfig,
) (DataFrameIterator, error) {
	ctx, span := trace.StartSpan(ctx, "dfiter.NewDataFrameIterator")
	defer span.End()

	// report forwards a status message to the caller, if a callback was given.
	report := func(msg string) {
		if regressionStateCallback != nil {
			regressionStateCallback(msg)
		}
	}

	// GroupBy means the Alert's own query is not necessarily the one to run;
	// the sub-query to use arrives as queryAsString.
	values, err := url.ParseQuery(queryAsString)
	if err != nil {
		return nil, skerr.Wrap(err)
	}
	parsedQuery, err := query.New(values)
	if err != nil {
		return nil, skerr.Wrap(err)
	}

	var source *dataframe.DataFrame
	if domain.Offset != 0 {
		// Single-dataframe mode: make the source dataframe exactly as long as
		// the slicer window (2*Radius+1) so the slicer yields just one value.
		n := int32(2*alert.Radius + 1)

		// The source dataframe has to end at the commit Offset+Radius. For
		// instance, looking at commit 21 with a Radius of 3 means asking for
		// an end point of 24:
		//
		//    [ 18, 19, 20, 21, 22, 23, 24]
		//
		// which is the 2*n+1 points types.OriginalStep wants. The remaining
		// step finding algorithms drop the final point to get:
		//
		//    [ 18, 19, 20, 21, 22, 23 ]
		//
		// This dance exists only to keep the detection algorithms consistent;
		// eventually types.OriginalStep should operate on a dataframe of
		// length 2*n like the others.
		lastCommit := types.CommitNumber(int(domain.Offset) + alert.Radius)
		commit, lookupErr := perfGit.CommitFromCommitNumber(ctx, lastCommit)
		if lookupErr != nil {
			report(fmt.Sprintf("Not a valid commit number %d. Make sure you choose a commit old enough to have %d commits before it and %d commits after it.", lastCommit, alert.Radius, alert.Radius-1))
			return nil, skerr.Wrapf(lookupErr, "Failed to look up CommitNumber of a single cluster request.")
		}

		source, err = dfBuilder.NewNFromQuery(ctx, time.Unix(commit.Timestamp, 0), parsedQuery, n, progress)
		if err != nil {
			report("Failed querying the data due to an internal error.")
			return nil, skerr.Wrapf(err, "Failed to build dataframe iterator source dataframe.")
		}
	} else {
		// Sliding-window mode. When a settling time is configured, never look
		// at points newer than "now - SettlingTime".
		if anomalyConfig.SettlingTime != 0 {
			cutoff := now.Now(ctx).Add(-1 * time.Duration(anomalyConfig.SettlingTime))
			if cutoff.Before(domain.End) {
				domain.End = cutoff
			}
		}

		source, err = dfBuilder.NewNFromQuery(ctx, domain.End, parsedQuery, domain.N, progress)
		if err != nil {
			report("Failed querying the data due to an internal error.")
			return nil, skerr.Wrapf(err, "Failed to build dataframe iterator source dataframe")
		}
	}

	// Every window handed out by the iterator spans Radius points on either
	// side of a centre point.
	windowSize := 2*alert.Radius + 1
	if got := len(source.Header); got < windowSize {
		report(fmt.Sprintf("Query didn't return enough data points: Got %d. Want %d.", got, windowSize))
		sklog.Infof("Query didn't return enough data points: Got %d. Want %d.", got, windowSize)
		return nil, ErrInsufficientData
	}

	// Count the floating point values that were just queried from the
	// database. Because the size of a float is known, this gives a rough
	// estimate of the MB/s of regression detection.
	floats := int64(len(source.Header) * len(source.TraceSet))
	metrics2.GetCounter("perf_regression_detection_floats").Inc(floats)

	return &dataframeSlicer{
		df:     source,
		size:   windowSize,
		offset: 0,
	}, nil
}
