package main

import (
	"bytes"
	"context"
	"encoding/gob"
	"fmt"
	"sort"
	"sync"
	"time"

	"go.skia.org/infra/go/httputils"
	"go.skia.org/infra/go/metrics2"
	"go.skia.org/infra/go/metrics2/events"
	"go.skia.org/infra/go/sklog"
	"go.skia.org/infra/go/util"
	"go.skia.org/infra/task_scheduler/go/db"
	"go.skia.org/infra/task_scheduler/go/db/remote_db"
	"go.skia.org/infra/task_scheduler/go/flakes"
)

const (
	TASK_STREAM = "task_metrics"
)

// taskEventDB implements the events.EventDB interface.
type taskEventDB struct {
	cached []*events.Event
	db     db.TaskReader
	em     *events.EventMetrics
	// Do not lock mtx when calling methods on em. Otherwise deadlock can occur.
	// E.g. (now fixed):
	// 1. Thread 1: EventManager.updateMetrics locks EventMetrics.mtx
	// 2. Thread 2: taskEventDB.update locks taskEventDB.mtx
	// 3. Thread 1: EventManager.updateMetrics calls taskEventDB.Range, which waits
	//    to lock taskEventDB.mtx
	// 4. Thread 2: taskEventDB.update calls EventMetrics.AggregateMetric (by way
	//    of addTaskAggregates and EventStream.AggregateMetric), which waits to lock
	//    EventMetrics.mtx
	mtx sync.Mutex
}

// See docs for events.EventDB interface.
func (t *taskEventDB) Append(string, []byte) error {
	return fmt.Errorf("taskEventDB is read-only!")
}

// See docs for events.EventDB interface.
func (t *taskEventDB) Close() error {
	return nil
}

// See docs for events.EventDB interface.
func (t *taskEventDB) Insert(*events.Event) error {
	return fmt.Errorf("taskEventDB is read-only!")
}

// See docs for events.EventDB interface.
func (t *taskEventDB) Range(stream string, start, end time.Time) ([]*events.Event, error) {
	t.mtx.Lock()
	defer t.mtx.Unlock()

	n := len(t.cached)
	if n == 0 {
		return []*events.Event{}, nil
	}

	first := sort.Search(n, func(i int) bool {
		return !t.cached[i].Timestamp.Before(start)
	})

	last := first + sort.Search(n-first, func(i int) bool {
		return !t.cached[i+first].Timestamp.Before(end)
	})

	rv := make([]*events.Event, last-first, last-first)
	copy(rv[:], t.cached[first:last])
	return rv, nil
}

// update updates the cached tasks in the taskEventDB. Only a single thread may
// call this method, but it can be called concurrently with other methods.
func (t *taskEventDB) update() error {
	defer metrics2.FuncTimer().Stop()
	now := time.Now()
	longestPeriod := TIME_PERIODS[len(TIME_PERIODS)-1]
	tasks, err := t.db.GetTasksFromDateRange(now.Add(-longestPeriod), now, "")
	if err != nil {
		return fmt.Errorf("Failed to load tasks from %s to %s: %s", now.Add(-longestPeriod), now, err)
	}
	sklog.Debugf("taskEventDB.update: Processing %d tasks for time range %s to %s.", len(tasks), now.Add(-longestPeriod), now)
	cached := make([]*events.Event, 0, len(tasks))
	for _, task := range tasks {
		if !task.Done() {
			continue
		}
		var buf bytes.Buffer
		if err := gob.NewEncoder(&buf).Encode(task); err != nil {
			return fmt.Errorf("Failed to encode %#v to GOB: %s", task, err)
		}
		ev := &events.Event{
			Stream:    TASK_STREAM,
			Timestamp: task.Created,
			Data:      buf.Bytes(),
		}
		cached = append(cached, ev)
	}
	t.mtx.Lock()
	defer t.mtx.Unlock()
	t.cached = cached
	return nil
}

// computeTaskFlakeRate is an events.DynamicAggregateFn that returns metrics for Task flake rate, given
// a slice of Events created by taskEventDB.update. The first return value will contain the tags
// "task_name" (db.Task.Name) and "metric" (one of "failure-rate", "mishap-rate"),
// and the second return value will be the corresponding ratio of failed/mishap Task to all
// completed Tasks. Returns an error if Event.Data can't be GOB-decoded as a db.Task.
func computeTaskFlakeRate(ev []*events.Event) ([]map[string]string, []float64, error) {
	if len(ev) > 0 {
		// ev should be ordered by timestamp
		sklog.Debugf("Calculating flake-rate for %d tasks since %s.", len(ev), ev[0].Timestamp)
	}
	type taskSum struct {
		flakes int
		count  int
	}
	byTask := map[string]*taskSum{}
	tasks := make([]*db.Task, 0, len(ev))
	for _, e := range ev {
		var task db.Task
		if err := gob.NewDecoder(bytes.NewReader(e.Data)).Decode(&task); err != nil {
			return nil, nil, err
		}
		tasks = append(tasks, &task)
		entry, ok := byTask[task.Name]
		if !ok {
			entry = &taskSum{}
			byTask[task.Name] = entry
		}
		entry.count++
	}
	flaky := flakes.FindFlakes(tasks)
	for _, task := range flaky {
		byTask[task.Name].flakes++
	}
	rvTags := make([]map[string]string, 0, len(byTask)*2)
	rvVals := make([]float64, 0, len(byTask)*2)
	add := func(taskName, metric string, value float64) {
		rvTags = append(rvTags, map[string]string{
			"task_name": taskName,
			"metric":    metric,
		})
		rvVals = append(rvVals, value)
	}
	for taskName, taskSum := range byTask {
		if taskSum.count == 0 {
			continue
		}
		add(taskName, "flake-rate", float64(taskSum.flakes)/float64(taskSum.count))
	}
	return rvTags, rvVals, nil
}

// addTaskAggregates adds aggregation functions for job events to the EventStream.
func addTaskAggregates(s *events.EventStream) error {
	for _, period := range TIME_PERIODS {
		// Flake rate.
		if err := s.DynamicMetric(nil, period, computeTaskFlakeRate); err != nil {
			return err
		}
	}
	return nil
}

// StartTaskMetrics starts a goroutine which ingests metrics data based on Tasks.
func StartTaskMetrics(taskSchedulerDbUrl string, ctx context.Context) error {
	db, err := remote_db.NewClient(taskSchedulerDbUrl, httputils.NewTimeoutClient())
	if err != nil {
		return err
	}
	edb := &taskEventDB{
		cached: []*events.Event{},
		db:     db,
	}
	em, err := events.NewEventMetrics(edb, "task_metrics")
	if err != nil {
		return err
	}
	edb.em = em

	s := em.GetEventStream(TASK_STREAM)
	if err := addTaskAggregates(s); err != nil {
		return err
	}

	lv := metrics2.NewLiveness("last_successful_task_metrics_update")
	go util.RepeatCtx(5*time.Minute, ctx, func() {
		if err := edb.update(); err != nil {
			sklog.Errorf("Failed to update task data: %s", err)
		} else {
			lv.Reset()
		}
	})
	em.Start(ctx)
	return nil
}
