// ds is a package for using Google Cloud Datastore.
package ds

import (
	"context"
	"fmt"
	"os"
	"time"

	"cloud.google.com/go/datastore"
	"go.skia.org/infra/go/auth"
	"go.skia.org/infra/go/skerr"
	"go.skia.org/infra/go/sklog"
	"go.skia.org/infra/go/util"
	"golang.org/x/sync/errgroup"
	"google.golang.org/api/iterator"
	"google.golang.org/api/option"
)

// Global constants.
const (
	// Maximum number of entities which may be inserted or deleted at once.
	MAX_MODIFICATIONS = 500
)

type Kind string

// Below are all the Kinds used in all applications. New Kinds should be listed
// here, and they should all have unique values, because when defining indexes
// for Cloud Datastore the index config is per project, not pre-namespace.
// Remember to add them to KindsToBackup if they are to be backed up, and push
// a new version of /ds/go/datastore_backup.
const (
	// Predict
	FAILURES     Kind = "Failures"
	FLAKY_RANGES Kind = "FlakyRanges"

	// Perf
	SHORTCUT   Kind = "Shortcut"
	REGRESSION Kind = "Regression"
	ALERT      Kind = "Alert"

	// Android Compile
	COMPILE_TASK              Kind = "CompileTask"
	ANDROID_COMPILE_INSTANCES Kind = "AndroidCompileInstances"

	// Leasing
	TASK Kind = "Task"

	// CT
	CAPTURE_SKPS_TASKS              Kind = "CaptureSkpsTasks"
	CHROMIUM_ANALYSIS_TASKS         Kind = "ChromiumAnalysisTasks"
	CHROMIUM_BUILD_TASKS            Kind = "ChromiumBuildTasks"
	CHROMIUM_PERF_TASKS             Kind = "ChromiumPerfTasks"
	LUA_SCRIPT_TASKS                Kind = "LuaScriptTasks"
	METRICS_ANALYSIS_TASKS          Kind = "MetricsAnalysisTasks"
	PIXEL_DIFF_TASKS                Kind = "PixelDiffTasks"
	RECREATE_PAGESETS_TASKS         Kind = "RecreatePageSetsTasks"
	RECREATE_WEBPAGE_ARCHIVES_TASKS Kind = "RecreateWebpageArchivesTasks"
	CLUSTER_TELEMETRY_IDS           Kind = "ClusterTelemetryIDs"

	// Autoroll
	KIND_AUTOROLL_MODE                Kind = "AutorollMode"
	KIND_AUTOROLL_MODE_ANCESTOR       Kind = "AutorollModeAncestor" // Fake; used to force strong consistency for testing's sake.
	KIND_AUTOROLL_ROLL                Kind = "AutorollRoll"
	KIND_AUTOROLL_ROLL_ANCESTOR       Kind = "AutorollRollAncestor" // Fake; used to force strong consistency for testing's sake.
	KIND_AUTOROLL_STATUS              Kind = "AutorollStatus"
	KIND_AUTOROLL_STATUS_ANCESTOR     Kind = "AutorollStatusAncestor" // Fake; used to force strong consistency for testing's sake.
	KIND_AUTOROLL_STRATEGY            Kind = "AutorollStrategy"
	KIND_AUTOROLL_STRATEGY_ANCESTOR   Kind = "AutorollStrategyAncestor" // Fake; used to force strong consistency for testing's sake.
	KIND_AUTOROLL_UNTHROTTLE          Kind = "AutorollUnthrottle"
	KIND_AUTOROLL_UNTHROTTLE_ANCESTOR Kind = "AutorollUnthrottleAncestor" // Fake; used to force strong consistency for testing's sake.

	// AlertManager
	INCIDENT_AM               Kind = "IncidentAm"
	INCIDENT_ACTIVE_PARENT_AM Kind = "IncidentActiveParentAm"
	SILENCE_ACTIVE_PARENT_AM  Kind = "SilenceActiveParentAm"
	SILENCE_AM                Kind = "SilenceAm"
	REMINDER_AM               Kind = "ReminderAm"
)

// Namespaces that are used in production, and thus might be backed up.
const (
	// Perf
	PERF_NS                = "perf"
	PERF_ANDROID_NS        = "perf-android"
	PERF_ANDROID_X_NS      = "perf-android-x"
	PERF_ANDROID_MASTER_NS = "perf-androidmaster"
	PERF_CT_NS             = "perf-ct"
	PERF_FLUTTER_NS        = "perf-flutter"

	// Android Compile
	ANDROID_COMPILE_NS = "android-compile"

	// Leasing
	LEASING_SERVER_NS = "leasing-server"

	// CT
	CT_NS = "cluster-telemetry"

	// Autoroll
	AUTOROLL_NS          = "autoroll"
	AUTOROLL_INTERNAL_NS = "autoroll-internal"

	// AlertManager
	ALERT_MANAGER_NS = "alert-manager"
)

var (
	// KindsToBackup is a map from namespace to the list of Kinds to backup.
	// If this value is changed then remember to push a new version of /ds/go/datastore_backup.
	//
	// Note that we try to backup all kinds and all namespaces for every project
	// even if that app isn't running there, which has a better failure mode of
	// possibly backing up too much data rather than too little.
	KindsToBackup = map[string][]Kind{
		AUTOROLL_NS:            {KIND_AUTOROLL_MODE, KIND_AUTOROLL_MODE_ANCESTOR, KIND_AUTOROLL_ROLL, KIND_AUTOROLL_ROLL_ANCESTOR, KIND_AUTOROLL_STATUS, KIND_AUTOROLL_STATUS_ANCESTOR, KIND_AUTOROLL_STRATEGY, KIND_AUTOROLL_STRATEGY_ANCESTOR, KIND_AUTOROLL_UNTHROTTLE, KIND_AUTOROLL_UNTHROTTLE_ANCESTOR},
		AUTOROLL_INTERNAL_NS:   {KIND_AUTOROLL_MODE, KIND_AUTOROLL_MODE_ANCESTOR, KIND_AUTOROLL_ROLL, KIND_AUTOROLL_ROLL_ANCESTOR, KIND_AUTOROLL_STATUS, KIND_AUTOROLL_STATUS_ANCESTOR, KIND_AUTOROLL_STRATEGY, KIND_AUTOROLL_STRATEGY_ANCESTOR, KIND_AUTOROLL_UNTHROTTLE, KIND_AUTOROLL_UNTHROTTLE_ANCESTOR},
		PERF_NS:                {ALERT},
		PERF_ANDROID_NS:        {ALERT},
		PERF_ANDROID_X_NS:      {ALERT},
		PERF_ANDROID_MASTER_NS: {ALERT},
		PERF_CT_NS:             {ALERT},
		PERF_FLUTTER_NS:        {ALERT},
		ANDROID_COMPILE_NS:     {COMPILE_TASK, ANDROID_COMPILE_INSTANCES},
		LEASING_SERVER_NS:      {TASK},
		CT_NS:                  {CAPTURE_SKPS_TASKS, CHROMIUM_ANALYSIS_TASKS, CHROMIUM_BUILD_TASKS, CHROMIUM_PERF_TASKS, LUA_SCRIPT_TASKS, METRICS_ANALYSIS_TASKS, PIXEL_DIFF_TASKS, RECREATE_PAGESETS_TASKS, RECREATE_WEBPAGE_ARCHIVES_TASKS, CLUSTER_TELEMETRY_IDS},
		ALERT_MANAGER_NS:       {INCIDENT_AM, INCIDENT_ACTIVE_PARENT_AM, SILENCE_AM, SILENCE_ACTIVE_PARENT_AM, REMINDER_AM},
	}
)

var (
	// DS is the Cloud Datastore client. Valid after Init() has been called.
	DS *datastore.Client

	// Namespace is the datastore namespace that data will be stored in. Valid after Init() has been called.
	Namespace string
)

// InitWithOpt the Cloud Datastore Client (DS).
//
// project - The project name, i.e. "google.com:skia-buildbots".
// ns      - The datastore namespace to store data into.
// opt     - Options to pass to the client.
func InitWithOpt(project string, ns string, opts ...option.ClientOption) error {
	if ns == "" {
		return skerr.Fmt("Datastore namespace cannot be empty.")
	}

	Namespace = ns
	var err error
	DS, err = datastore.NewClient(context.Background(), project, opts...)
	if err != nil {
		return skerr.Fmt("Failed to initialize Cloud Datastore: %s", err)
	}
	return nil
}

// Init the Cloud Datastore Client (DS).
//
// project - The project name, i.e. "google.com:skia-buildbots".
// ns      - The datastore namespace to store data into.
func Init(project string, ns string) error {
	tok, err := auth.NewDefaultJWTServiceAccountTokenSource("https://www.googleapis.com/auth/datastore")
	if err != nil {
		return err
	}
	return InitWithOpt(project, ns, option.WithTokenSource(tok))
}

// InitForTesting is an init to call when running tests. It doesn't do any
// auth as it is expecting to run against the Cloud Datastore Emulator.
// See https://cloud.google.com/datastore/docs/tools/datastore-emulator
//
// project - The project name, i.e. "google.com:skia-buildbots".
// ns      - The datastore namespace to store data into.
func InitForTesting(project string, ns string, kinds ...Kind) error {
	Namespace = ns
	var err error
	DS, err = datastore.NewClient(context.Background(), project)
	if err != nil {
		return fmt.Errorf("Failed to initialize Cloud Datastore: %s", err)
	}
	return nil
}

// DeleteAll removes all entities of the given kind. If wait is true it waits
// until an eventually consistent query of the Kind returns a count of 0.
// Upon success the number of deleted entities is returned.
//
// Note: This is a very expensive operation if there are many entities of this
// kind and should be run as an 'offline' task.
func DeleteAll(client *datastore.Client, kind Kind, wait bool) (int, error) {
	const (
		// keyPageSize is the number of keys we retrieve at once
		keyPageSize = 10000

		// At most 500 keys can be deleted at once (Cloud Datastore limitation)
		deletePageSize = 500
	)

	sliceIter := newKeySliceIterator(client, kind, keyPageSize)
	slice, done, err := sliceIter.next()
	ctx := context.TODO()
	keySlices := [][]*datastore.Key{}

	totalKeyCount := 0
	for !done && (err == nil) {
		keySlices = append(keySlices, slice)
		totalKeyCount += len(slice)
		slice, done, err = sliceIter.next()
		sklog.Infof("Loaded %s %d keys %d", kind, len(slice), totalKeyCount)
	}
	if err != nil {
		return 0, err
	}

	// Delete all slices in parallel.
	var egroup errgroup.Group
	for _, slice := range keySlices {
		func(slice []*datastore.Key) {
			egroup.Go(func() error {
				for len(slice) > 0 {
					targetSlice := slice[:util.MinInt(deletePageSize, len(slice))]
					if err := client.DeleteMulti(ctx, targetSlice); err != nil {
						return err
					}
					slice = slice[len(targetSlice):]
				}
				return nil
			})
		}(slice)
	}
	if err := egroup.Wait(); err != nil {
		return 0, skerr.Fmt("Error deleting entities: %s", err)
	}

	// If we need to wait loop until the entity count goes to zero.
	if wait {
		found := 1
		for found > 0 {
			if found, err = client.Count(context.TODO(), NewQuery(kind)); err != nil {
				return 0, err
			}
			// Sleep proportional to the number of found keys, but no more than 10 seconds.
			sleepTimeMs := util.MinInt64(int64(found)*10, 10000)
			time.Sleep(time.Duration(sleepTimeMs) * time.Millisecond)
		}
	}
	return totalKeyCount, nil
}

// MigrateData copies all entries of the specified kind from the source datastore client
// to the destination datastore client.
func MigrateData(ctx context.Context, srcClient, dstClient *datastore.Client, kind Kind, createNewKey bool) error {
	q := NewQuery(kind)
	for t := srcClient.Run(ctx, q); ; {
		pl := &datastore.PropertyList{}
		key, err := t.Next(pl)
		if err == iterator.Done {
			break
		}
		if err != nil {
			return fmt.Errorf("Error getting the next entry from the source client: %s", err)
		}

		if createNewKey {
			key = NewKey(kind)
		}
		_, err = dstClient.Put(ctx, key, pl)
		sklog.Debugf("%s: %s", kind, key)
		if err != nil {
			return fmt.Errorf("Error putting %s into the destination client: %s", key, err)
		}
	}
	return nil
}

// Creates a new indeterminate key of the given kind.
func NewKey(kind Kind) *datastore.Key {
	return &datastore.Key{
		Kind:      string(kind),
		Namespace: Namespace,
	}
}

func NewKeyWithParent(kind Kind, parent *datastore.Key) *datastore.Key {
	ret := NewKey(kind)
	ret.Parent = parent
	return ret
}

// Creates a new query of the given kind with the right namespace.
func NewQuery(kind Kind) *datastore.Query {
	return datastore.NewQuery(string(kind)).Namespace(Namespace)
}

// IterKeysItem is the item returned by the IterKeys function via a channel.
type IterKeysItem struct {
	Keys []*datastore.Key
	Err  error
}

// IterKeys iterates all keys of the specified kind in slices of pageSize length.
func IterKeys(client *datastore.Client, kind Kind, pageSize int) (<-chan *IterKeysItem, error) {
	sliceIter := newKeySliceIterator(client, kind, pageSize)
	keySlice, done, err := sliceIter.next()
	if err != nil {
		return nil, err
	}

	retCh := make(chan *IterKeysItem)
	go func() {
		defer close(retCh)

		keyCount := 0
		for !done {
			keyCount += len(keySlice)
			retCh <- &IterKeysItem{
				Keys: keySlice,
				Err:  err,
			}
			// Get the next slice of keys.
			keySlice, done, err = sliceIter.next()
		}
	}()
	return retCh, nil
}

// keySliceIterator allows to iterate over the keys of a specific entity
// in slices of fixed size.
type keySliceIterator struct {
	client    *datastore.Client
	kind      Kind
	pageSize  int
	orderedBy []string
	cursorStr string
	done      bool
}

// newKeySliceIterator returns a new keySliceIterator instance for the given kind.
// 'pageSize' defines the size of slices that are returned by the next method.
// 'orderedBy' allows to sort the slices with the same operators as datastore.Query.
func newKeySliceIterator(client *datastore.Client, kind Kind, pageSize int, orderedBy ...string) *keySliceIterator {
	return &keySliceIterator{
		client:    client,
		kind:      kind,
		pageSize:  pageSize,
		orderedBy: orderedBy,
		cursorStr: "",
	}
}

// next returns the next slice of keys of the iterator. If the returned bool is
// true no more keys are available.
func (k *keySliceIterator) next() ([]*datastore.Key, bool, error) {
	// Once we have reached the end, don't run the query again.
	if k.done {
		return []*datastore.Key{}, true, nil
	}

	query := NewQuery(k.kind).KeysOnly().Limit(k.pageSize)
	for _, ob := range k.orderedBy {
		query = query.Order(ob)
	}

	if k.cursorStr != "" {
		cursor, err := datastore.DecodeCursor(k.cursorStr)
		if err != nil {
			return nil, false, skerr.Fmt("Bad cursor %s: %s", k.cursorStr, err)
		}
		query = query.Start(cursor)
	}

	it := k.client.Run(context.TODO(), query)
	var err error
	var key *datastore.Key
	retKeys := make([]*datastore.Key, 0, k.pageSize)

	for {
		if key, err = it.Next(nil); err != nil {
			break
		}
		retKeys = append(retKeys, key)
	}

	if err != iterator.Done {
		return nil, false, skerr.Fmt("Error retrieving keys: %s", err)
	}

	// Get the string for the next page.
	cursor, err := it.Cursor()
	if err != nil {
		return nil, false, skerr.Fmt("Error retrieving next cursor: %s", err)
	}

	// Check if the string representation of the cursor has changed.
	newCursorStr := cursor.String()
	k.done = (k.cursorStr == newCursorStr)
	k.cursorStr = newCursorStr

	// We are not officially done while we have results to return.
	return retKeys, !(len(retKeys) > 0), nil
}

// EnsureNotEmulator will panic if it detects the Datastore Emulator is configured.
func EnsureNotEmulator() {
	s := os.Getenv("DATASTORE_EMULATOR_HOST")
	if s != "" {
		panic(`Datastore Emulator detected. Be sure to unset the following environment variables:
DATASTORE_EMULATOR_HOST
DATASTORE_EMULATOR_HOST_PATH
DATASTORE_HOST
`)
	}
}
