// Package tracedb provides a datastore for efficiently storing and retrieving traces.
package db

import (
	"fmt"
	"sync"
	"time"

	"github.com/golang/groupcache/lru"
	"go.skia.org/infra/go/metrics2"
	"go.skia.org/infra/go/sklog"
	"go.skia.org/infra/go/tiling"
	"go.skia.org/infra/go/trace/service"
	"golang.org/x/net/context"
	"google.golang.org/grpc"
)

// CommitID represents the time of a particular commit, where a commit could either be
// a real commit into the repo, or an event like running a trybot.
type CommitID struct {
	Timestamp int64  `json:"ts"`
	ID        string `json:"id"`     // Normally a git hash, but could also be Rietveld patch id.
	Source    string `json:"source"` // The branch name, e.g. "master", or the Reitveld issue id.
}

func (c CommitID) String() string {
	return fmt.Sprintf("%s:%s:%d", c.ID, c.Source, c.Timestamp)
}

// Entry holds the params and a value for single measurement.
type Entry struct {
	Params map[string]string

	// Value is the value of the measurement.
	//
	// It should be the digest string converted to a []byte, or a float64
	// converted to a little endian []byte. I.e. tiling.Trace.SetAt
	// should be able to consume this value.
	Value []byte
}

// DB represents the interface to any datastore for perf and gold results.
type DB interface {
	// Add new information to the datastore.
	//
	// The values parameter maps a trace id to an Entry.
	//
	// Note that only allowing adding data for a single commit at a time
	// should work well with ingestion while still breaking up writes into
	// shorter actions.
	Add(commitID *CommitID, values map[string]*Entry) error

	// Remove all info for the given commit.
	Remove(commitID *CommitID) error

	// List returns all the CommitID's between begin and end.
	List(begin, end time.Time) ([]*CommitID, error)

	// Create a Tile for the given commit ids. Will build the Tile using the
	// commits in the order they are provided.
	//
	// Note that the Commits in the Tile will only contain the commit id and
	// the timestamp, the Author will not be populated.
	//
	// The Tile's Scale and TileIndex will be set to 0.
	//
	// The md5 hashes for each commitid are also returned.
	TileFromCommits(commitIDs []*CommitID) (*tiling.Tile, []string, error)

	// ListMD5 returns the md5 hashes of the data stored for each commitid.
	ListMD5(commitIDs []*CommitID) ([]string, error)

	// Close the datastore.
	Close() error
}

const (
	// MAX_ID_CACHED is the size of the LRU cache TsDB.cache.
	MAX_ID_CACHED = 1000000

	// CHUNK_SIZE is the maximum number of values that are added to the datastore
	// in any one gRPC call.
	CHUNK_SIZE = 5000

	// MAX_MESSAGE_SIZE is the maximum grpc message size.
	MAX_MESSAGE_SIZE = 1024 * 1024 * 1024
)

// TsDB is an implementation of DB that stores traces in traceservice.
type TsDB struct {
	// ts is the client for the traceservice.
	traceService traceservice.TraceServiceClient

	// conn is the underlying connection for ts.
	conn *grpc.ClientConn

	// tb is a TraceBuilder for the type of Tile we're managing, i.e. perf or gold.
	// It is used to build Trace's of the right type and size when building Tiles.
	traceBuilder tiling.TraceBuilder

	// mutex protects access to the caches.
	mutex sync.Mutex

	// cache is an LRU cache, records if a given trace has its params stored
	// during a previous Add(), keyed by traceid and maps to a bool
	cache *lru.Cache

	// paramsCache is a cache of params retrieved from tracedb, keyed by traceid.
	paramsCache map[string]map[string]string

	// id64Cache is a cache of traceids retrieved from tracedb, keyed by trace64id.
	id64Cache map[uint64]string

	// ctx is the gRPC context.
	ctx context.Context

	// clearMutex is a mutex to protect clearing of the caches. TileFromCommits
	// will get read locks, and a go routine that periodically checks the cache
	// sizes will gain a write lock. That way concurrent TileFromCommits calls
	// will proceed, but none will be running when the caches are potentially
	// cleared.
	clearMutex sync.RWMutex
}

// NewTraceServiceDB creates a new DB that stores the data in the BoltDB backed
// gRPC accessible traceservice.
func NewTraceServiceDB(conn *grpc.ClientConn, traceBuilder tiling.TraceBuilder) (*TsDB, error) {
	ret := &TsDB{
		conn:         conn,
		traceService: traceservice.NewTraceServiceClient(conn),
		traceBuilder: traceBuilder,
		cache:        lru.New(MAX_ID_CACHED),
		paramsCache:  map[string]map[string]string{},
		id64Cache:    map[uint64]string{},
		ctx:          context.Background(),
	}

	// This ping causes the client to try and reach the backend. If the backend
	// is down, it will keep trying until it's up.
	if err := ret.ping(); err != nil {
		return nil, err
	}

	// Liveness metric.
	go func() {
		liveness := metrics2.NewLiveness("ping", map[string]string{"module": "tracedb"})
		for range time.Tick(time.Minute) {
			if ret.ping() == nil {
				liveness.Reset()
			}
		}
	}()

	// Keep the caches sizes in check.
	go func() {
		for range time.Tick(15 * time.Minute) {
			ret.clearMutex.Lock()
			if len(ret.paramsCache) > MAX_ID_CACHED {
				ret.paramsCache = map[string]map[string]string{}
				sklog.Warning("Had to clear paramsCache, this is unexpected. MAX_ID_CACHED too small?")
			}
			if len(ret.id64Cache) > MAX_ID_CACHED {
				ret.id64Cache = map[uint64]string{}
				sklog.Warning("Had to clear id64Cache, this is unexpected. MAX_ID_CACHED too small?")
			}
			ret.clearMutex.Unlock()
		}
	}()
	return ret, nil
}

func (ts *TsDB) ping() error {
	_, err := ts.traceService.Ping(ts.ctx, &traceservice.Empty{})
	return err
}

// addChunk adds a set of entries to the datastore at the given CommitID.
func (ts *TsDB) addChunk(ctx context.Context, cid *traceservice.CommitID, chunk map[string]*Entry) error {
	if len(chunk) == 0 {
		return nil
	}
	addReq := &traceservice.AddRequest{
		Commitid: cid,
		Values:   []*traceservice.ValuePair{},
	}
	addParamsRequest := &traceservice.AddParamsRequest{
		Params: []*traceservice.ParamsPair{},
	}
	for traceid, entry := range chunk {
		// Check that all the traceids have their Params.
		ts.mutex.Lock()
		if _, ok := ts.cache.Get(traceid); !ok {
			addParamsRequest.Params = append(addParamsRequest.Params, &traceservice.ParamsPair{
				Key:    traceid,
				Params: entry.Params,
			})
			ts.cache.Add(traceid, true)
		}
		ts.mutex.Unlock()
		addReq.Values = append(addReq.Values, &traceservice.ValuePair{
			Key:   traceid,
			Value: entry.Value,
		})
	}
	if len(addParamsRequest.Params) > 0 {
		// TODO(stephana): We need to fix the call to AddParams. If it fails the
		// the DB ends up in an inconsistent state and traceService.GetParams
		// for the failing traceID will cause a panic.

		if _, err := ts.traceService.AddParams(ctx, addParamsRequest); err != nil {
			return fmt.Errorf("Failed to add params: %s", err)
		}
	}
	if _, err := ts.traceService.Add(ctx, addReq); err != nil {
		return fmt.Errorf("Failed to add values: %s", err)
	}
	return nil
}

// tsCommitID converts a db.CommitID to traceservice.CommitID.
func tsCommitID(c *CommitID) *traceservice.CommitID {
	return &traceservice.CommitID{
		Timestamp: c.Timestamp,
		Id:        c.ID,
		Source:    c.Source,
	}
}

// dbCommitID converts a traceservice.CommitID to db.CommitID.
func dbCommitID(c *traceservice.CommitID) *CommitID {
	return &CommitID{
		ID:        c.Id,
		Source:    c.Source,
		Timestamp: c.Timestamp,
	}
}

// Add implements DB.Add().
func (ts *TsDB) Add(commitID *CommitID, values map[string]*Entry) error {
	ctx := context.Background()
	cid := tsCommitID(commitID)
	// Break the values into chunks of CHUNK_SIZE or less and then process each slice.
	// This will keep the total request size down.
	chunks := []map[string]*Entry{}
	chunk := map[string]*Entry{}
	n := 0
	for k, v := range values {
		chunk[k] = v
		if n >= CHUNK_SIZE {
			n = 0
			chunks = append(chunks, chunk)
			chunk = map[string]*Entry{}
		}
		n++
	}
	chunks = append(chunks, chunk)

	for i, chunk := range chunks {
		if err := ts.addChunk(ctx, cid, chunk); err != nil {
			return fmt.Errorf("Failed to add chunk %d: %s", i, err)
		}
	}

	return nil
}

// Remove implements DB.Remove().
func (ts *TsDB) Remove(commitID *CommitID) error {
	removeRequest := &traceservice.RemoveRequest{
		Commitid: tsCommitID(commitID),
	}
	_, err := ts.traceService.Remove(context.Background(), removeRequest)
	return err
}

// List implements DB.List().
func (ts *TsDB) List(begin, end time.Time) ([]*CommitID, error) {
	listReq := &traceservice.ListRequest{
		Begin: begin.Unix(),
		End:   end.Unix(),
	}
	listResp, err := ts.traceService.List(context.Background(), listReq)
	if err != nil {
		return nil, fmt.Errorf("List request failed: %s", err)
	}
	// Copy the data from the ListResponse to a slice of CommitIDs.
	ret := []*CommitID{}
	for _, c := range listResp.Commitids {
		ret = append(ret, dbCommitID(c))
	}
	return ret, nil
}

// TileFromCommits implements DB.TileFromCommits().
func (ts *TsDB) TileFromCommits(commitIDs []*CommitID) (*tiling.Tile, []string, error) {
	ts.clearMutex.RLock()
	ts.clearMutex.RUnlock()
	ctx := context.Background()

	// Build the Tile.
	tile := tiling.NewTile()
	n := len(commitIDs)
	tile.Commits = make([]*tiling.Commit, n, n)
	hash := make([]string, n)

	// Populate the Tile's commits.
	for i, cid := range commitIDs {
		tile.Commits[i] = &tiling.Commit{
			Hash:       cid.ID,
			CommitTime: cid.Timestamp,
		}
	}

	// tileMutex protects access to the Tile. Note that this only means the Tile,
	// while writing values into a Trace that already exists and is the right
	// size is Go routine safe.
	var tileMutex sync.Mutex

	errCh := make(chan error, len(commitIDs))

	// Fill in the data for each commit in it's own Go routine.
	var wg sync.WaitGroup
	for i, cid := range commitIDs {
		wg.Add(1)
		go func(i int, cid *CommitID) {
			defer wg.Done()
			// Load the values for the commit.
			getValuesRequest := &traceservice.GetValuesRequest{
				Commitid: tsCommitID(cid),
			}
			getRawValues, err := ts.traceService.GetValuesRaw(ctx, getValuesRequest)
			if err != nil {
				errCh <- fmt.Errorf("Failed to get values for %d %#v: %s", i, *cid, err)
				return
			}
			// Convert raw response into values.
			ci, err := traceservice.NewCommitInfo(getRawValues.Value)
			if err != nil {
				errCh <- fmt.Errorf("Failed to convert values for %d %#v: %s", i, *cid, err)
				return
			}
			// Now make sure we have all the traceids for the trace64ids in ci.
			missingKeys64 := []uint64{}
			ts.mutex.Lock()
			for id64 := range ci.Values {
				if _, ok := ts.id64Cache[id64]; !ok {
					missingKeys64 = append(missingKeys64, id64)
				}
			}
			ts.mutex.Unlock()
			if len(missingKeys64) > 0 {
				traceidsRequest := &traceservice.GetTraceIDsRequest{
					Id: missingKeys64,
				}
				traceids, err := ts.traceService.GetTraceIDs(ctx, traceidsRequest)
				if err != nil {
					errCh <- fmt.Errorf("Failed to get traceids for trace64ids for %d %#v: %s", i, *cid, err)
					return
				}
				ts.mutex.Lock()
				for _, tid := range traceids.Ids {
					ts.id64Cache[tid.Id64] = tid.Id
				}
				ts.mutex.Unlock()
			}

			ts.mutex.Lock()
			for id64, rawValue := range ci.Values {
				if rawValue == nil {
					sklog.Errorf("Got a nil rawValue in response: %s", err)
					continue
				}
				traceid := ts.id64Cache[id64]
				tileMutex.Lock()
				tr, ok := tile.Traces[traceid]
				if !ok || tr == nil {
					tile.Traces[traceid] = ts.traceBuilder(n)
					tr = tile.Traces[traceid]
				}
				tileMutex.Unlock()
				if tr == nil {
					sklog.Errorf("Trace was still nil for key: %v", traceid)
					continue
				}
				if err := tr.SetAt(i, rawValue); err != nil {
					errCh <- fmt.Errorf("Unable to convert trace value %d %#v: %s", i, *cid, err)
					return
				}
			}
			// Fill in the commits hash.
			hash[i] = getRawValues.Md5
			ts.mutex.Unlock()
		}(i, cid)
	}
	wg.Wait()

	// See if any Go routine generated an error.
	select {
	case err, ok := <-errCh:
		if ok {
			return nil, nil, fmt.Errorf("Failed to load trace data: %s", err)
		}
	default:
	}

	sklog.Infof("Finished loading values. Starting to load Params.")

	// Now load the params for the traces.
	traceids := []string{}
	ts.mutex.Lock()
	for k := range tile.Traces {
		// Only load params for traces not already in the cache.
		if _, ok := ts.paramsCache[k]; !ok {
			traceids = append(traceids, k)
		}
	}
	ts.mutex.Unlock()

	// Break the loading of params into chunks and make those requests concurrently.
	// The params are just loaded into the paramsCache.
	tracechunks := [][]string{}
	for len(traceids) > 0 {
		if len(traceids) > CHUNK_SIZE {
			tracechunks = append(tracechunks, traceids[:CHUNK_SIZE])
			traceids = traceids[CHUNK_SIZE:]
		} else {
			tracechunks = append(tracechunks, traceids)
			traceids = []string{}
		}
	}

	errCh = make(chan error, len(tracechunks))
	for _, chunk := range tracechunks {
		wg.Add(1)
		go func(chunk []string) {
			defer wg.Done()
			req := &traceservice.GetParamsRequest{
				Traceids: chunk,
			}
			resp, err := ts.traceService.GetParams(ctx, req)
			if err != nil {
				errCh <- fmt.Errorf("Failed to load params: %s", err)
				return
			}
			for _, param := range resp.Params {
				ts.mutex.Lock()
				ts.paramsCache[param.Key] = param.Params
				ts.mutex.Unlock()
			}
		}(chunk)
	}
	wg.Wait()

	// See if any Go routine generated an error.
	select {
	case err, ok := <-errCh:
		if ok {
			return nil, nil, fmt.Errorf("Failed to load params: %s", err)
		}
	default:
	}

	// Add all params from the cache.
	ts.mutex.Lock()
	for k, tr := range tile.Traces {
		p := tr.Params()
		for pk, pv := range ts.paramsCache[k] {
			p[pk] = pv
		}
	}
	ts.mutex.Unlock()

	// Rebuild the ParamSet.
	sklog.Infof("Finished loading params. Starting to rebuild ParamSet.")
	tiling.GetParamSet(tile.Traces, tile.ParamSet)
	return tile, hash, nil
}

// ListMD5 returns the md5 hashes of the data stored for each commitid.
func (ts *TsDB) ListMD5(commitIDs []*CommitID) ([]string, error) {
	ctx := context.Background()
	req := &traceservice.ListMD5Request{
		Commitid: make([]*traceservice.CommitID, len(commitIDs)),
	}
	for i, cid := range commitIDs {
		req.Commitid[i] = tsCommitID(cid)
	}
	resp, err := ts.traceService.ListMD5(ctx, req)
	if err != nil {
		return nil, fmt.Errorf("Failed to load hashes: %s", err)
	}
	ret := make([]string, len(commitIDs))
	for i, ci5 := range resp.Commitmd5 {
		ret[i] = ci5.Md5
	}
	return ret, nil
}

// Close the underlying connection to the datastore.
func (ts *TsDB) Close() error {
	return ts.conn.Close()
}

// NewTraceServiceDBFromAddress is given the address of the traceService
// implementation and returns an instance of the trace.DB
// (the higher level wrapper on top of trace service).
func NewTraceServiceDBFromAddress(traceServiceAddr string, traceBuilder tiling.TraceBuilder) (DB, error) {
	if traceServiceAddr == "" {
		return nil, fmt.Errorf("Did not get address for trace services.")
	}

	conn, err := grpc.Dial(traceServiceAddr, grpc.WithInsecure(), grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(MAX_MESSAGE_SIZE), grpc.MaxCallSendMsgSize(MAX_MESSAGE_SIZE)))
	if err != nil {
		return nil, fmt.Errorf("Unable to connnect to trace service at %s. Got error: %s", traceServiceAddr, err)
	}

	return NewTraceServiceDB(conn, traceBuilder)
}
