package ds_expstore

import (
	"context"
	"fmt"
	"sort"
	"strconv"
	"sync"

	"cloud.google.com/go/datastore"
	"go.skia.org/infra/go/ds"
	"go.skia.org/infra/go/eventbus"
	"go.skia.org/infra/go/skerr"
	"go.skia.org/infra/go/sklog"
	"go.skia.org/infra/go/util"
	"go.skia.org/infra/golden/go/dsutil"
	"go.skia.org/infra/golden/go/expstorage"
	"go.skia.org/infra/golden/go/types"
	"golang.org/x/sync/errgroup"
)

// DSExpStore implements the ExpectationsStore interface with the
// Google Cloud Datastore as its backend.
// Since the difference between storing expectations for the master branch and
// the tryjobs of a Gerrit CL is only the inclusion of an issue id, it also
// simultaneously supports storing expectations for Gerrit issues using
// the same interface and based on the same storage client.
//
// To separate concerns, expectation changes for the master branch are stored
// as ds.MASTER_EXP_CHANGE entities, while expectation changes for Gerrit
// issues are stored as ds.TRYJOB_EXP_CHANGE entities (see changeKind).
//
// Expectation changes are stored as timestamped instances of 'ExpChange'.
// The expectations belonging to a change, as well as the accumulated current
// expectations, are written through blobStore and referenced by their keys.
// NOTE(review): an earlier version of this comment described
// ds.MASTER_TEST_DIGEST_EXP / ds.TRYJOB_TEST_DIGEST_EXP entities and TDESlice
// child entities; nothing in this file references them, so that description
// looks outdated - confirm.
//
// We maintain one summary entity for each logical expectation store, i.e. one
// for the master and one for each Gerrit issue.
// The summary entities keep track of recently added expectation changes
// to provide a consistent listing.
//
// When expectations change, events of type EV_EXPSTORAGE_CHANGED and
// EV_TRYJOB_EXP_CHANGED are fired for the master branch and Gerrit issues
// respectively.
// Both events contain instances of EventExpectationChange as their payload.
type DSExpStore struct {
	// issueID is the id of the Gerrit issue and <0 for other expectations,
	// i.e. the master branch (the master store uses types.MasterBranch).
	issueID int64

	// client is the datastore client used for all entity reads and writes;
	// eventBus (may be nil) is where change events are published.
	client   *datastore.Client
	eventBus eventbus.EventBus

	// recentKeysList keeps track of recently added changes. This allows us to
	// provide a consistent listing of changes.
	recentKeysList *dsutil.RecentKeysList

	// summaryKey is the key of the summary entity which stores the keys of
	// recent changes. It is the key handed to recentKeysList.
	summaryKey *datastore.Key

	// expectationsKey is the key of the expectationsState entity that points
	// to the blob holding the current expectations. blobStore is used to
	// save, load and delete the expectation blobs.
	expectationsKey *datastore.Key
	blobStore       *dsutil.BlobStore

	// Use different entities depending on whether this manages expectations
	// for the master or a Gerrit issue.
	changeKind ds.Kind

	// eventExpChange keeps track of which event to fire when the expectations change.
	eventExpChange string

	// globalEvent keeps track of whether we want to send events only within
	// this instance or on a global event bus. It is passed through to
	// eventBus.Publish.
	globalEvent bool

	// lastTS and tsMutex ensure that we get distinct timestamps with ms granularity.
	lastTS  int64
	tsMutex sync.Mutex
}

// ExpChange is used to store an expectation change in the database. Each
// expectation change is an atomic change to expectations for an issue.
// The expectations that make up the change are stored in a blob which is
// referenced by ExpectationsBlob.
//
// Fields:
//   - ChangeID: datastore key of this change record.
//   - IssueID: issue ID of the store that wrote the record.
//   - UserID: ID of the user who made the change.
//   - TimeStamp: time of the change in milliseconds.
//   - Count: total number of digests contained in the change.
//   - UndoChangeID: ID of the change that this record undoes; 0 if this
//     record is not an undo.
//   - OK: false when the record is first written and set to true once the
//     change has been committed. Listings only include records with OK == true.
//   - ExpectationsBlob: key of the blob that holds the changed expectations.
type ExpChange struct {
	ChangeID         *datastore.Key `datastore:"__key__"`
	IssueID          int64
	UserID           string
	TimeStamp        int64 `datastore:",noindex"`
	Count            int64 `datastore:",noindex"`
	UndoChangeID     int64
	OK               bool
	ExpectationsBlob *datastore.Key `datastore:",noindex"`
}

// expectationsState stores the state of expectations for either master or a Gerrit issue.
// It is the entity stored under DSExpStore.expectationsKey. A nil ExpectationsBlob
// means that no expectations have been written yet.
type expectationsState struct {
	ExpectationsBlob *datastore.Key // key of the blob that stores the current expectations
}

// DeprecatedNew returns an ExpectationsStore implementation based on
// Cloud Datastore for the master branch. Stores for Gerrit issues are derived
// from the returned instance via ForIssue and share its datastore client,
// event bus and blob store.
//
// An error is returned if client is nil or if the current expectations cannot
// be loaded from the datastore.
func DeprecatedNew(client *datastore.Client, eventBus eventbus.EventBus) (*DSExpStore, error) {
	if client == nil {
		return nil, sklog.FmtErrorf("Received nil for datastore client.")
	}

	// Keys of the entities that hold the state of the master branch.
	recentKey := ds.NewKey(ds.HELPER_RECENT_KEYS)
	recentKey.Name = "expstorage-recent-keys-master"

	stateKey := ds.NewKey(ds.EXPECTATIONS_BLOB_ROOT)
	stateKey.Name = "expstorage-expectations-master"

	blobs := dsutil.NewBlobStore(client, ds.EXPECTATIONS_BLOB_ROOT, ds.EXPECTATIONS_BLOB)

	master := &DSExpStore{
		issueID:         types.MasterBranch,
		changeKind:      ds.MASTER_EXP_CHANGE,
		eventExpChange:  expstorage.EV_EXPSTORAGE_CHANGED,
		globalEvent:     true,
		client:          client,
		eventBus:        eventBus,
		summaryKey:      recentKey,
		expectationsKey: stateKey,
		recentKeysList:  dsutil.NewRecentKeysList(client, recentKey, dsutil.DefaultConsistencyDelta),
		blobStore:       blobs,
	}

	// Loading the expectations once doubles as a connectivity check for the
	// cloud datastore.
	if _, _, err := master.loadCurrentExpectations(nil); err != nil {
		return nil, skerr.Fmt("Error in test call to the cloud datastore: %s", err)
	}
	return master, nil
}

// ForIssue implements the ExpectationsStore interface.
//
// The returned store manages the expectations of the given Gerrit issue. It
// shares the datastore client, event bus and blob store of the receiver, but
// uses its own summary and expectations-state entities and publishes
// EV_TRYJOB_EXP_CHANGED events.
func (c *DSExpStore) ForIssue(issueID int64) expstorage.ExpectationsStore {
	recentKey := ds.NewKey(ds.HELPER_RECENT_KEYS)
	recentKey.Name = fmt.Sprintf("expstorage-issue-%d", issueID)

	stateKey := ds.NewKey(ds.EXPECTATIONS_BLOB_ROOT)
	stateKey.Name = fmt.Sprintf("expstorage-expectations-issue-%d", issueID)

	issueStore := &DSExpStore{
		issueID:         issueID,
		changeKind:      ds.TRYJOB_EXP_CHANGE,
		eventExpChange:  expstorage.EV_TRYJOB_EXP_CHANGED,
		globalEvent:     false,
		client:          c.client,
		eventBus:        c.eventBus,
		summaryKey:      recentKey,
		expectationsKey: stateKey,
		recentKeysList:  dsutil.NewRecentKeysList(c.client, recentKey, dsutil.DefaultConsistencyDelta),
		blobStore:       c.blobStore,
	}
	return issueStore
}

// Get implements the ExpectationsStore interface.
//
// It returns the current expectations of this store, loaded outside of any
// transaction.
func (c *DSExpStore) Get() (types.Expectations, error) {
	current, _, loadErr := c.loadCurrentExpectations(nil)
	if loadErr != nil {
		return nil, sklog.FmtErrorf("Error retrieving expectations: %s", loadErr)
	}
	return current, nil
}

// AddChange implements the ExpectationsStore interface.
//
// The change is recorded with a fresh unique timestamp, is not marked as an
// undo and is applied transactionally.
func (c *DSExpStore) AddChange(ctx context.Context, changes types.Expectations, userID string) error {
	ts := c.getUniqueTimeStampMs()
	if _, err := c.makeChange(ctx, changes, userID, ts, 0, true); err != nil {
		return err
	}
	return nil
}

// ImportChange bypasses the ExpectationStore interface to copy change records directly.
//
// The record is written with the caller-supplied timestamp, is not marked as
// an undo and is stored without running the transactional update. The key of
// the new change record is returned.
func (c *DSExpStore) ImportChange(ctx context.Context, changes types.Expectations, userID string, timeStamp int64) (*datastore.Key, error) {
	const (
		notAnUndo     = 0
		transactional = false
	)
	return c.makeChange(ctx, changes, userID, timeStamp, notAnUndo, transactional)
}

// QueryLog implements the ExpectationsStore interface.
//
// It returns one page of the triage log together with the total number of
// log entries. Entries follow the order of getExpChangeKeys, i.e. reverse
// chronological order.
//
//   - offset is the index of the first entry to return; negative values are
//     treated as 0 and values beyond the end yield an empty page.
//   - size is the maximum number of entries to return; values <= 0 mean
//     "all entries".
//   - details controls whether the individual test/digest/label triples of
//     every change are loaded (in parallel) and attached to the entries,
//     sorted by test name and then digest.
func (c *DSExpStore) QueryLog(ctx context.Context, offset, size int, details bool) ([]expstorage.TriageLogEntry, int, error) {
	allKeys, err := c.getExpChangeKeys(ctx, 0)
	if err != nil {
		return nil, 0, sklog.FmtErrorf("Error retrieving keys for expectation changes: %s", err)
	}
	total := len(allKeys)

	if offset < 0 {
		offset = 0
	}

	if size <= 0 {
		size = total
	}

	start := util.MinInt(offset, total)
	// Compare size against the number of remaining keys instead of computing
	// start+size up front: the sum can overflow for very large values of size
	// and would then produce an invalid (negative) slice bound.
	end := total
	if size < total-start {
		end = start + size
	}
	retKeys := allKeys[start:end]

	expChanges := make([]*ExpChange, len(retKeys))
	if err := c.client.GetMulti(ctx, retKeys, expChanges); err != nil {
		return nil, 0, sklog.FmtErrorf("Error retrieving expectation changes: %s", err)
	}

	ret := make([]expstorage.TriageLogEntry, 0, len(expChanges))
	for _, change := range expChanges {
		ret = append(ret, expstorage.TriageLogEntry{
			ID:          strconv.FormatInt(change.ChangeID.ID, 10),
			Name:        change.UserID,
			TS:          change.TimeStamp,
			ChangeCount: int(change.Count),
			Details:     nil,
		})
	}

	if !details {
		return ret, total, nil
	}

	// Fetch the details in parallel. Every goroutine writes to its own slot of
	// detailRecs, so no further synchronization is needed.
	detailRecs := make([][]expstorage.TriageDetail, len(expChanges))
	var egroup errgroup.Group
	for idx, expChange := range expChanges {
		idx, blobKey := idx, expChange.ExpectationsBlob
		egroup.Go(func() error {
			exp := types.Expectations{}
			// A change record without a blob is treated as an empty change,
			// consistent with getChanges.
			if blobKey != nil {
				if err := c.blobStore.Load(blobKey, &exp); err != nil {
					return err
				}
			}

			triageDetails := make([]expstorage.TriageDetail, 0, len(exp))
			for testName, digests := range exp {
				for digest, label := range digests {
					triageDetails = append(triageDetails, expstorage.TriageDetail{
						TestName: testName,
						Digest:   digest,
						Label:    label.String(),
					})
				}
			}

			// Map iteration order is random; sort for a stable result.
			sort.Slice(triageDetails, func(i, j int) bool {
				a, b := triageDetails[i], triageDetails[j]
				if a.TestName != b.TestName {
					return a.TestName < b.TestName
				}
				return a.Digest < b.Digest
			})
			detailRecs[idx] = triageDetails
			return nil
		})
	}

	// Wait for all loads to finish.
	if err := egroup.Wait(); err != nil {
		return nil, 0, err
	}

	// Fill in the details.
	for i := range ret {
		ret[i].Details = detailRecs[i]
	}

	return ret, total, nil
}

// UndoChange implements the ExpectationsStore interface.
//
// It reverts the change identified by changeIDStr by adding a new change that
// sets every test/digest pair touched by the original change back to the label
// it had immediately before that change. The new change records the ID of the
// undone change in UndoChangeID.
//
// On success the expectations written by the undo are returned. An error (and
// nil expectations) is returned if
//   - changeIDStr is not a positive integer,
//   - no valid change with that ID exists in this store,
//   - the change is itself an undo of another change, or
//   - reading from or writing to the datastore fails.
func (c *DSExpStore) UndoChange(ctx context.Context, changeIDStr, userID string) (types.Expectations, error) {
	changeID, err := strconv.ParseInt(changeIDStr, 10, 64)
	// Make sure the entity is valid.
	if err != nil || changeID <= 0 {
		return nil, sklog.FmtErrorf("Change with id %s does not exist.", changeIDStr)
	}

	// Fetch the change record of the change we want to undo.
	expChange := &ExpChange{}
	expChangeKey := ds.NewKey(c.changeKind)
	expChangeKey.ID = changeID
	if err := c.client.Get(ctx, expChangeKey, expChange); err != nil {
		if err == datastore.ErrNoSuchEntity {
			return nil, sklog.FmtErrorf("Change with id %d does not exist.", changeID)
		}
		return nil, sklog.FmtErrorf("Error retrieving change %d: %s", expChangeKey.ID, err)
	}

	// Apply the same validity criteria that getExpChangeKeys uses when listing
	// changes: a record only counts once its OK flag has been set, and records
	// of the tryjob kind are shared by all issues, so the record must belong to
	// the issue managed by this store.
	if !expChange.OK || (!types.IsMasterBranch(c.issueID) && expChange.IssueID != c.issueID) {
		return nil, sklog.FmtErrorf("Change with id %d does not exist.", changeID)
	}

	// A change that is itself an undo cannot be undone. Check this before
	// loading the blob so we don't fetch data that is never used.
	if expChange.UndoChangeID != 0 {
		return nil, fmt.Errorf("Unable to undo change %d which was created as an undo of change %d.", changeID, expChange.UndoChangeID)
	}

	// Fetch the actual changes.
	undoChanges := types.Expectations{}
	if err := c.blobStore.Load(expChange.ExpectationsBlob, &undoChanges); err != nil {
		return nil, sklog.FmtErrorf("Error retrieving expectations blob: %s", err)
	}

	// Retrieve the keys of all changes prior to the one we want to undo to
	// build the expectations at the time of the original change.
	prevChangeKeys, err := c.getExpChangeKeys(ctx, changeID)
	if err != nil {
		return nil, sklog.FmtErrorf("Error retrieving keys for expectation changes: %s", err)
	}

	// Build the expectations at that point.
	prevExp, err := c.calcExpectations(ctx, prevChangeKeys)
	if err != nil {
		return nil, sklog.FmtErrorf("Unable to get expectations for undo: %s", err)
	}

	// For every test/digest pair of the original change look up the label it
	// had before. Pairs that did not exist before get the zero value of the
	// label type (indexing a missing map entry yields the zero value).
	changes := make(types.Expectations, len(undoChanges))
	for testName, digests := range undoChanges {
		changes[testName] = make(types.TestClassification, len(digests))
		for digest := range digests {
			changes[testName][digest] = prevExp[testName][digest]
		}
	}

	if _, err := c.makeChange(ctx, changes, userID, c.getUniqueTimeStampMs(), changeID, true); err != nil {
		// Do not hand out expectations that were never stored.
		return nil, err
	}
	return changes, nil
}

// Clear implements the ExpectationsStore interface.
//
// It removes everything this store has written: the summary entity, the
// expectations-state entity, all valid expectation change records and all
// blobs referenced by them. The deletions run concurrently; the first error
// encountered is returned.
func (c *DSExpStore) Clear(ctx context.Context) error {
	changeKeys, err := c.getExpChangeKeys(ctx, 0)
	if err != nil {
		return sklog.FmtErrorf("Error retrieving keys for expectation changes: %s", err)
	}

	_, state, err := c.loadCurrentExpectations(nil)
	if err != nil {
		return sklog.FmtErrorf("Error loading current expectations: %s", err)
	}

	// Collect the keys of all blobs that exist: the one holding the current
	// expectations plus one per change record.
	blobKeys := make([]*datastore.Key, 0, len(changeKeys)+1)
	if state.ExpectationsBlob != nil {
		blobKeys = append(blobKeys, state.ExpectationsBlob)
	}
	for _, changeKey := range changeKeys {
		var record ExpChange
		if err := c.client.Get(ctx, changeKey, &record); err != nil {
			return sklog.FmtErrorf("Error retrieving expectations change %d: %s", changeKey.ID, err)
		}
		if record.ExpectationsBlob != nil {
			blobKeys = append(blobKeys, record.ExpectationsBlob)
		}
	}

	// The entities to delete: both bookkeeping entities and every change record.
	entityKeys := make([]*datastore.Key, 0, len(changeKeys)+2)
	entityKeys = append(entityKeys, c.summaryKey, c.expectationsKey)
	entityKeys = append(entityKeys, changeKeys...)

	var eg errgroup.Group

	// Delete the expectations blobs.
	for _, blobKey := range blobKeys {
		blobKey := blobKey
		eg.Go(func() error { return c.blobStore.Delete(blobKey) })
	}

	// Delete the entities, 500 at a time, which is the limit for cloud
	// datastore.
	for _, batch := range dsutil.Batch(entityKeys, 500) {
		batch := batch
		eg.Go(func() error { return c.client.DeleteMulti(ctx, batch) })
	}

	// Wait until it's all done.
	return eg.Wait()
}

// calcExpectations calculates the expectations by accumulating the expectation changes
// referenced by the given list of keys. keys are assumed to be sorted in
// reverse chronological order.
//
// The changes are loaded concurrently (at most 10000 loads are in flight at
// any time) and then merged from the last key to the first, so that changes
// earlier in keys override changes later in keys.
func (c *DSExpStore) calcExpectations(ctx context.Context, keys []*datastore.Key) (types.Expectations, error) {
	// slots bounds the number of goroutines that are running at once.
	slots := make(chan bool, 10000)
	loaded := make([]types.Expectations, len(keys))

	var eg errgroup.Group
	for i, k := range keys {
		i, k := i, k
		slots <- true
		eg.Go(func() error {
			defer func() { <-slots }()

			var err error
			loaded[i], err = c.getChanges(ctx, k)
			return err
		})
	}
	if err := eg.Wait(); err != nil {
		return nil, err
	}

	// Apply the changes starting with the last entry of keys.
	merged := types.Expectations{}
	for n := len(loaded); n > 0; n-- {
		merged.MergeExpectations(loaded[n-1])
	}
	return merged, nil
}

// makeChange updates the expectations by adding a new change record to the datastore.
// timeStampMs is the timestamp of this change.
// If undoChangeID is larger than 0 then it will be recorded in the change record
// since this is an undo of an earlier change.
// If transactional is true the change will be added in a transaction, which also
// registers the change with recentKeysList and updates the current expectations.
// If transactional is false only the change record itself is written and marked as
// valid; recentKeysList and the current expectations are not touched.
// This should only be false when we import existing data.
//
// On success the key of the new change record is returned and, if an event bus
// is configured, an EventExpectationChange is published.
// On failure a nil key and an error are returned; the deferred actions.Run call
// receives that error via the named result.
func (c *DSExpStore) makeChange(ctx context.Context, changes types.Expectations, userId string, timeStampMs int64, undoChangeID int64, transactional bool) (changeKey *datastore.Key, err error) {
	// Get the total count of changes so we can include it in the change record.
	count := 0
	for _, digests := range changes {
		count += len(digests)
	}

	// Write the expectation changes.
	blobKey, err := c.blobStore.Save(changes)
	if err != nil {
		return nil, sklog.FmtErrorf("Saving changes to blob failed: %s", err)
	}

	// If we have an error it means the transaction below failed and we want
	// to delete the part that was created outside of the transaction.
	// The second rollback function captures the purgeKeys variable, so keys
	// appended further down are visible to it when it eventually runs.
	// NOTE(review): presumably TxActions.Run executes the rollback functions
	// when err != nil and the commit functions otherwise - confirm in dsutil.
	purgeKeys := []*datastore.Key(nil)
	actions := dsutil.TxActions{}
	actions.AddRollbackFn(func() error { return c.blobStore.Delete(blobKey) })
	actions.AddRollbackFn(func() error { return c.client.DeleteMulti(ctx, purgeKeys) })
	defer func() { actions.Run(err) }()

	// Add a new change record with the OK flag set to false. This
	// allows us to create change records outside of the transaction and
	// potentially in parallel without the write limits of doing it in a
	// transaction. The change record is not valid (= not included in
	// searches) until the OK flag is set to true inside the transaction below.
	changeKey = dsutil.TimeSortableKey(c.changeKind, timeStampMs)
	expChange := &ExpChange{
		IssueID:          c.issueID,
		UserID:           userId,
		UndoChangeID:     undoChangeID,
		TimeStamp:        timeStampMs,
		OK:               false,
		ExpectationsBlob: blobKey,
		Count:            int64(count),
	}
	if changeKey, err = c.client.Put(ctx, changeKey, expChange); err != nil {
		return nil, sklog.FmtErrorf("Error writing change record: %s", err)
	}
	// From here on the change record has to be removed if anything fails.
	purgeKeys = append(purgeKeys, changeKey)

	updateFn := func(tx *datastore.Transaction) error {
		// Start transaction to:
		//  - store the key of the new change record to deal with eventual consistency
		//  - add the change to the summary
		//  - mark the change as valid.

		// Update the recent changes so we get full consistency on queries.
		if err := c.recentKeysList.Add(tx, changeKey); err != nil {
			return err
		}

		// Update the overall expectations. This may register additional
		// rollback/commit functions with actions.
		if err := c.updateCurrentExpectations(tx, changes, false, &actions); err != nil {
			return err
		}

		// Mark the expectation change as valid.
		expChange.OK = true
		_, err := tx.Put(changeKey, expChange)
		return err
	}

	// Run the relevant updates in a transaction.
	if transactional {
		if _, err = c.client.RunInTransaction(ctx, updateFn); err != nil {
			return nil, sklog.FmtErrorf("Error updating expectations and recentKeysList for change %d: %s", changeKey.ID, err)
		}
	} else {
		// Import path: only flip the OK flag of the record written above.
		expChange.OK = true
		if _, err = c.client.Mutate(ctx, datastore.NewUpdate(changeKey, expChange)); err != nil {
			return nil, sklog.FmtErrorf("Error commiting the expectation change: %s", err)
		}
	}

	// Notify listeners about the change. The event bus is optional.
	if c.eventBus != nil {
		c.eventBus.Publish(c.eventExpChange, &expstorage.EventExpectationChange{
			TestChanges: changes,
			IssueID:     c.issueID,
		}, c.globalEvent)
	}
	return changeKey, nil
}

// updateCurrentExpectations updates the current overall expectations with the changes
// provided. The expectations are the sum of all change records in the database.
// We continuously keep track of that sum as new change records are added.
//
// The merged expectations are written to a new blob and the expectationsState
// entity is updated to point to it; the blob that was referenced before is
// deleted afterwards.
//
//   - tx may be nil. If it is non-nil, the state entity is read and written
//     through the transaction and the blob clean-up is deferred to actions.
//   - overwrite replaces the current expectations with a copy of changes
//     instead of merging changes into them.
//   - actions collects the clean-up work when tx is non-nil: deleting the new
//     blob is registered as a rollback function and deleting the old blob as a
//     commit function. It is only dereferenced when tx is non-nil.
func (c *DSExpStore) updateCurrentExpectations(tx *datastore.Transaction, changes types.Expectations, overwrite bool, actions *dsutil.TxActions) (err error) {
	currentExp, expState, err := c.loadCurrentExpectations(tx)
	if err != nil {
		return sklog.FmtErrorf("Error loading current expectations: %s", err)
	}
	// Remember the blob that is currently referenced so it can be removed
	// once the state points to the new blob.
	oldExpsBlob := expState.ExpectationsBlob

	if overwrite || (currentExp == nil) {
		currentExp = changes.DeepCopy()
	} else {
		currentExp.MergeExpectations(changes)
	}

	// Create a new entry for the expectations
	newBlobKey, err := c.blobStore.Save(currentExp)
	if err != nil {
		return sklog.FmtErrorf("Error writing new expectations: %s", err)
	}

	// delete the new blob if we fail
	delNewBlobFn := func() error {
		if err := c.blobStore.Delete(newBlobKey); err != nil {
			return sklog.FmtErrorf("Error deleting new expectations blob: %s", err)
		}
		return nil
	}

	// either at the very end of this function or as part of the transaction
	if tx == nil {
		// Without a transaction the named result err decides whether the new
		// blob has to be removed again.
		defer func() {
			if err != nil {
				util.LogErr(delNewBlobFn())
			}
		}()
	} else {
		actions.AddRollbackFn(delNewBlobFn)
	}

	// Write the new key to our expectation state
	expState.ExpectationsBlob = newBlobKey

	putFn := dsutil.PutFn(c.client, tx)
	if err = putFn(c.expectationsKey, expState); err != nil {
		return sklog.FmtErrorf("Error writing new expectations blob: %s", err)
	}

	// If there is no old blob to be deleted we are done
	if oldExpsBlob == nil {
		return nil
	}

	// Remove the old blob either right away or after the transaction succeeds
	delOldBlob := func() error {
		if err := c.blobStore.Delete(oldExpsBlob); err != nil {
			return sklog.FmtErrorf("Error removing old expectations blob: %s", err)
		}
		return nil
	}
	if tx == nil {
		// Failing to remove the old blob is only logged; the update itself
		// has already succeeded at this point.
		if err := delOldBlob(); err != nil {
			sklog.Errorf("Error deleting old blob data: %s", err)
		}
		return nil
	}
	actions.AddCommitFn(delOldBlob)
	return nil
}

// getExpChangeKeys returns the keys of all expectation changes for the given issue
// in reverse chronological order. If beforeID is larger than 0 it is assumed to be
// an ID that was created via TimeSortableKey and we only want to retrieve keys that are
// older than the time stamp encoded in beforeID.
// The time is extracted with the GetTimeFromID function.
//
// The result combines a datastore query for valid change records (OK == true)
// with the list of recently added keys; both are fetched concurrently.
func (c *DSExpStore) getExpChangeKeys(ctx context.Context, beforeID int64) ([]*datastore.Key, error) {
	var eg errgroup.Group

	// Query the keys of all valid changes.
	var storedKeys []*datastore.Key
	eg.Go(func() error {
		query := ds.NewQuery(c.changeKind).
			Filter("OK =", true).
			KeysOnly()

		// Only issue stores need to restrict the query to their own issue.
		if !types.IsMasterBranch(c.issueID) {
			query = query.Filter("IssueID =", c.issueID)
		}

		keys, err := c.client.GetAll(ctx, query, nil)
		storedKeys = keys
		return err
	})

	// Load the recently added changes. Note: these are added/removed in a
	// transaction so we are guaranteed that their OK value is true.
	var recent *dsutil.Recently
	eg.Go(func() error {
		r, err := c.recentKeysList.GetRecent()
		recent = r
		return err
	})

	if err := eg.Wait(); err != nil {
		return nil, sklog.FmtErrorf("Error retrieving keys of expectation changes: %s", err)
	}

	// Combine the recent keys with the result of the query for a consistent
	// list of the keys. The combined list is sorted.
	combined := recent.Combine(storedKeys)
	if beforeID <= 0 {
		return combined, nil
	}

	// Drop the leading keys that are not strictly older than beforeID.
	cutoff := dsutil.GetTimeFromID(beforeID)
	firstOlder := sort.Search(len(combined), func(i int) bool {
		return dsutil.GetTimeFromID(combined[i].ID) < cutoff
	})
	return combined[firstOlder:], nil
}

// loadCurrentExpectations loads the current expectations for this expectation
// store (either for the master branch or for a Gerrit issue). If no expectations
// have been set it will return non-nil values and no error.
//
// If tx is non-nil the state entity is read through the transaction. On error
// both the expectations and the state are nil.
func (c *DSExpStore) loadCurrentExpectations(tx *datastore.Transaction) (types.Expectations, *expectationsState, error) {
	state := &expectationsState{}
	load := dsutil.GetFn(c.client, tx)
	// A missing state entity simply means nothing has been stored yet.
	if err := load(c.expectationsKey, state); err != nil && err != datastore.ErrNoSuchEntity {
		return nil, nil, err
	}

	current := types.Expectations{}
	if state.ExpectationsBlob == nil {
		return current, state, nil
	}

	if err := c.blobStore.Load(state.ExpectationsBlob, &current); err != nil {
		return nil, nil, err
	}
	return current, state, nil
}

// getChanges loads the changes for the given expectations change key.
// A change record that does not reference a blob yields empty, non-nil
// expectations.
func (c *DSExpStore) getChanges(ctx context.Context, expChangeKey *datastore.Key) (types.Expectations, error) {
	var record ExpChange
	if err := c.client.Get(ctx, expChangeKey, &record); err != nil {
		return nil, err
	}

	changes := types.Expectations{}
	if record.ExpectationsBlob == nil {
		return changes, nil
	}

	if err := c.blobStore.Load(record.ExpectationsBlob, &changes); err != nil {
		return nil, sklog.FmtErrorf("Unable to load expectations blob: %s", err)
	}
	return changes, nil
}

// getUniqueTimeStampMs returns a time in milliseconds that is strictly larger
// than any value previously returned by this instance. If the current time
// has not advanced past the last returned value, the last value plus one is
// used instead.
func (c *DSExpStore) getUniqueTimeStampMs() int64 {
	c.tsMutex.Lock()
	defer c.tsMutex.Unlock()

	if now := util.TimeStampMs(); now > c.lastTS {
		c.lastTS = now
	} else {
		c.lastTS++
	}
	return c.lastTS
}

// Compile-time check that *DSExpStore fulfills the ExpectationsStore interface.
var _ expstorage.ExpectationsStore = (*DSExpStore)(nil)
