// Package process does the whole process of ingesting files into a trace store.
package process

import (
	"context"
	"sync"
	"time"

	"cloud.google.com/go/pubsub"
	"go.opencensus.io/trace"
	"go.skia.org/infra/go/metrics2"
	"go.skia.org/infra/go/paramtools"
	"go.skia.org/infra/go/query"
	"go.skia.org/infra/go/skerr"
	"go.skia.org/infra/go/sklog"
	"golang.org/x/oauth2/google"

	"go.skia.org/infra/perf/go/builders"
	"go.skia.org/infra/perf/go/config"
	"go.skia.org/infra/perf/go/file"
	"go.skia.org/infra/perf/go/git"
	"go.skia.org/infra/perf/go/ingest/parser"
	"go.skia.org/infra/perf/go/ingestevents"
	"go.skia.org/infra/perf/go/tracestore"
	"go.skia.org/infra/perf/go/tracing"
	"go.skia.org/infra/perf/go/types"
	"google.golang.org/api/option"
)

// writeRetries is the number of additional attempts processSingleFile makes
// at writing a file's traces to the trace store after the first attempt fails.
const writeRetries = 10

// sendPubSubEvent publishes an ingestevents.IngestEvent describing a single
// ingested file to the PubSub topic named topicName.
//
// The event carries the trace ids built from params, the given paramset and
// the filename. If topicName is empty nothing is sent and nil is returned.
// Params that can not be converted into a trace id are logged and left out of
// the event; they do not cause the call to fail.
//
// An error is returned if the event can not be encoded or if publishing fails.
func sendPubSubEvent(ctx context.Context, pubSubClient *pubsub.Client, topicName string, params []paramtools.Params, paramset paramtools.ReadOnlyParamSet, filename string) error {
	if topicName == "" {
		return nil
	}

	traceIDs := make([]string, 0, len(params))
	for _, p := range params {
		key, err := query.MakeKey(p)
		if err != nil {
			// Best effort: one bad set of params should not stop the event
			// for the rest of the file from being sent.
			sklog.Warningf("Skipping params with no valid trace id in %q: %s", filename, err)
			continue
		}
		traceIDs = append(traceIDs, key)
	}

	body, err := ingestevents.CreatePubSubBody(&ingestevents.IngestEvent{
		TraceIDs: traceIDs,
		ParamSet: paramset,
		Filename: filename,
	})
	if err != nil {
		return skerr.Wrapf(err, "Failed to encode PubSub body for topic: %q", topicName)
	}

	// A Topic that has been published to owns background goroutines, so stop
	// it once the publish below has completed instead of leaking one set of
	// goroutines per ingested file.
	topic := pubSubClient.Topic(topicName)
	defer topic.Stop()

	if _, err := topic.Publish(ctx, &pubsub.Message{Data: body}).Get(ctx); err != nil {
		return skerr.Wrap(err)
	}
	return nil
}

// workerInfo bundles the metrics counters and the dependencies that
// processSingleFile uses to ingest one incoming file.
type workerInfo struct {
	// Counters incremented by processSingleFile as a file moves through
	// ingestion.
	filesReceived        metrics2.Counter
	failedToParse        metrics2.Counter
	skipped              metrics2.Counter
	badGitHash           metrics2.Counter
	failedToWrite        metrics2.Counter
	successfulWrite      metrics2.Counter
	successfulWriteCount metrics2.Counter

	// dlEnabled is handed to nackMessageIfNecessary to decide whether the
	// message of a file that failed to ingest gets nacked.
	dlEnabled bool

	// Dependencies used to parse the file, look up its commit, store its
	// traces and announce the ingestion.
	p              *parser.Parser
	store          tracestore.TraceStore
	g              *git.Git
	pubSubClient   *pubsub.Client
	instanceConfig *config.InstanceConfig
}

// newWorker builds a *workerInfo, storing every argument in the field of the
// same name.
func newWorker(
	filesReceived metrics2.Counter,
	failedToParse metrics2.Counter,
	skipped metrics2.Counter,
	badGitHash metrics2.Counter,
	failedToWrite metrics2.Counter,
	successfulWrite metrics2.Counter,
	successfulWriteCount metrics2.Counter,
	dlEnabled bool,
	p *parser.Parser,
	store tracestore.TraceStore,
	g *git.Git,
	pubSubClient *pubsub.Client,
	instanceConfig *config.InstanceConfig,
) *workerInfo {
	w := new(workerInfo)

	// Metrics.
	w.filesReceived = filesReceived
	w.failedToParse = failedToParse
	w.skipped = skipped
	w.badGitHash = badGitHash
	w.failedToWrite = failedToWrite
	w.successfulWrite = successfulWrite
	w.successfulWriteCount = successfulWriteCount

	// Behavior switches and dependencies.
	w.dlEnabled = dlEnabled
	w.p = p
	w.store = store
	w.g = g
	w.pubSubClient = pubSubClient
	w.instanceConfig = instanceConfig

	return w
}

// processSingleFile parses a single incoming file and writes the data it
// contains to the trace store.
//
// Every failure is handled in place: it is logged, counted in the matching
// metric where one exists, and passed to nackMessageIfNecessary. On a
// successful write the file's PubSub message, if any, is acked. The returned
// error is therefore always nil.
func (w *workerInfo) processSingleFile(f file.File) error {
	ctx, span := trace.StartSpan(context.Background(), "ingest.parser.processSingleFile")
	defer span.End()

	sklog.Infof("Ingest received: %v", f)
	w.filesReceived.Inc(1)

	// Parse the file.
	params, values, gitHash, err := w.p.Parse(ctx, f)
	if err != nil {
		if err == parser.ErrFileShouldBeSkipped {
			sklog.Debugf("File should be skipped %v: %s", f, err)
			w.skipped.Inc(1)
		} else {
			sklog.Errorf("Failed to parse %v: %s", f, err)
			w.failedToParse.Inc(1)
		}
		nackMessageIfNecessary(w.dlEnabled, f)
		return nil
	}

	sklog.Info("Lookup CommitNumber")

	// The git hash is missing from the ingested file.
	if len(gitHash) == 0 {
		sklog.Errorf("Unable to handle empty git hash.")
		nackMessageIfNecessary(w.dlEnabled, f)
		return nil
	}

	commitNumberFromFile := types.CommitNumber(0)
	if w.g.RepoSuppliedCommitNumber() {
		commitNumberFromFile, err = w.p.ParseCommitNumberFromGitHash(gitHash)
		if err != nil {
			sklog.Errorf("Unable to convert githash to integer commit number %q: %s", gitHash, err)
			nackMessageIfNecessary(w.dlEnabled, f)
			return nil
		}
	}

	// Convert gitHash or check the existence of a commitNumber.
	commitNumber, err := w.g.GetCommitNumber(ctx, gitHash, commitNumberFromFile)
	if err != nil {
		// The commit may simply not have been seen yet, so refresh the repo
		// and look it up one more time before giving up.
		if updateErr := w.g.Update(ctx); updateErr != nil {
			sklog.Errorf("Failed to Update: %s", updateErr)
		}
		commitNumber, err = w.g.GetCommitNumber(ctx, gitHash, commitNumberFromFile)
		if err != nil {
			w.badGitHash.Inc(1)
			sklog.Errorf("Failed to find commit number %v: %s", f, err)
			nackMessageIfNecessary(w.dlEnabled, f)
			return nil
		}
	}

	sklog.Info("Build ParamSet")
	// Build paramset from params.
	ps := paramtools.NewParamSet()
	for _, p := range params {
		ps.AddParams(p)
	}
	ps.Normalize()

	sklog.Info("WriteTraces")
	// Write data to the trace store: one initial attempt followed by up to
	// writeRetries retries. The error lives outside the loop so that the
	// last failure can be reported below.
	var writeErr error
	for attempt := 0; attempt <= writeRetries; attempt++ {
		writeErr = w.store.WriteTraces(ctx, commitNumber, params, values, ps, f.Name, time.Now())
		if writeErr == nil {
			break
		}
	}
	if writeErr != nil {
		w.failedToWrite.Inc(1)
		sklog.Errorf("Failed to write after %d retries %q: %s", writeRetries, f.Name, writeErr)
		nackMessageIfNecessary(w.dlEnabled, f)
	} else {
		if f.PubSubMsg != nil {
			f.PubSubMsg.Ack()
			sklog.Debugf("Message acked: %v", f.PubSubMsg)
		}
		w.successfulWrite.Inc(1)
		w.successfulWriteCount.Inc(int64(len(params)))
	}

	// NOTE(review): the ingestion event is published even when the write
	// above failed - confirm that this is intended.
	if err := sendPubSubEvent(ctx, w.pubSubClient, w.instanceConfig.IngestionConfig.FileIngestionTopicName, params, ps.Freeze(), f.Name); err != nil {
		sklog.Errorf("Failed to send pubsub event: %s", err)
	} else {
		sklog.Info("FileIngestionTopicName pubsub message sent.")
	}
	return nil
}

// worker ingests files that arrive on the given 'ch' channel.
//
// It returns when ch is closed, when ctx is found to be done after a file has
// been received, when processing a file returns an error, or when the parser
// can not be created. wg.Done is called exactly once on every one of those
// paths.
func worker(ctx context.Context, wg *sync.WaitGroup, g *git.Git, store tracestore.TraceStore, ch <-chan file.File, pubSubClient *pubsub.Client, instanceConfig *config.InstanceConfig) {
	// Deferred so that no exit path can forget to release the WaitGroup.
	defer wg.Done()

	// Metrics.
	filesReceived := metrics2.GetCounter("perfserver_ingest_files_received")
	failedToParse := metrics2.GetCounter("perfserver_ingest_failed_to_parse")
	skipped := metrics2.GetCounter("perfserver_ingest_skipped")
	badGitHash := metrics2.GetCounter("perfserver_ingest_bad_githash")
	failedToWrite := metrics2.GetCounter("perfserver_ingest_failed_to_write")
	successfulWrite := metrics2.GetCounter("perfserver_ingest_successful_write")
	successfulWriteCount := metrics2.GetCounter("perfserver_ingest_num_points_written")
	dlEnabled := config.IsDeadLetterCollectionEnabled(instanceConfig)

	// New Parser.
	p, err := parser.New(instanceConfig)
	if err != nil {
		sklog.Errorf("Ingestion worker failed to create parser: %s", err)
		return
	}

	w := newWorker(filesReceived, failedToParse, skipped, badGitHash, failedToWrite, successfulWrite, successfulWriteCount, dlEnabled, p, store, g, pubSubClient, instanceConfig)

	for f := range ch {
		if err := ctx.Err(); err != nil {
			sklog.Error(err)
			return
		}
		if err := w.processSingleFile(f); err != nil {
			sklog.Errorf("Ingestion worker stopping after failing to process %q: %s", f.Name, err)
			return
		}
	}
}

// Start launches numParallelIngesters worker goroutines that process incoming
// ingestion files and write the data they contain to a trace store, then
// blocks until all of the workers have exited.
//
// A PubSub client is only created when
// instanceConfig.IngestionConfig.FileIngestionTopicName is set; it is closed
// again before Start returns.
//
// Except for file.Sources of type "dir" this function should never return
// except on error. Every setup failure is returned to the caller as an error.
func Start(ctx context.Context, local bool, numParallelIngesters int, instanceConfig *config.InstanceConfig) error {
	if err := tracing.Init(local, instanceConfig); err != nil {
		return skerr.Wrapf(err, "Failed to start tracing")
	}

	var pubSubClient *pubsub.Client
	if instanceConfig.IngestionConfig.FileIngestionTopicName != "" {
		ts, err := google.DefaultTokenSource(ctx, pubsub.ScopePubSub)
		if err != nil {
			return skerr.Wrapf(err, "Failed to create TokenSource")
		}

		pubSubClient, err = pubsub.NewClient(ctx, instanceConfig.IngestionConfig.SourceConfig.Project, option.WithTokenSource(ts))
		if err != nil {
			return skerr.Wrapf(err, "Failed to create PubSub client")
		}
		// Release the client's connections once all workers are done, or if
		// a later setup step fails.
		defer func() {
			if err := pubSubClient.Close(); err != nil {
				sklog.Warningf("Failed to close PubSub client: %s", err)
			}
		}()
	}

	// New file.Source.
	source, err := builders.NewSourceFromConfig(ctx, instanceConfig, local)
	if err != nil {
		return skerr.Wrap(err)
	}
	ch, err := source.Start(ctx)
	if err != nil {
		return skerr.Wrap(err)
	}

	// New TraceStore.
	store, err := builders.NewTraceStoreFromConfig(ctx, local, instanceConfig)
	if err != nil {
		return skerr.Wrap(err)
	}

	// New perfgit.Git.
	sklog.Infof("Cloning repo %q into %q", instanceConfig.GitRepoConfig.URL, instanceConfig.GitRepoConfig.Dir)
	g, err := builders.NewPerfGitFromConfig(ctx, local, instanceConfig)
	if err != nil {
		return skerr.Wrap(err)
	}
	// Polling isn't needed because we call update on the repo if we find a git hash we don't recognize.
	// g.StartBackgroundPolling(ctx, gitRefreshDuration)

	sklog.Info("Waiting on files to process.")

	// All workers read from the same channel; wait for every one of them to
	// exit before returning.
	var wg sync.WaitGroup
	for i := 0; i < numParallelIngesters; i++ {
		wg.Add(1)
		go worker(ctx, &wg, g, store, ch, pubSubClient, instanceConfig)
	}
	wg.Wait()

	sklog.Infof("Exited while waiting on files. Should only happen on source_type=dir.")
	return nil
}

// nackMessageIfNecessary nacks the PubSub message attached to f when dlEnabled
// is true.
//
// It does nothing when dlEnabled is false or when f carries no PubSub message,
// which mirrors the nil check done before acking a message on the success
// path.
func nackMessageIfNecessary(dlEnabled bool, f file.File) {
	if !dlEnabled || f.PubSubMsg == nil {
		return
	}
	// This message will be available to the ingestor immediately.
	f.PubSubMsg.Nack()
	sklog.Debugf("Message nacked during message process: %v", f.PubSubMsg)
}
