package main

import (
	"context"
	"flag"
	"fmt"
	"os"
	"path/filepath"
	"sync"
	"time"

	swarmingapi "go.chromium.org/luci/common/api/swarming/swarming/v1"
	"go.skia.org/infra/go/common"
	"go.skia.org/infra/go/httputils"
	"go.skia.org/infra/go/sklog"
	"go.skia.org/infra/go/swarming"
	"go.skia.org/infra/go/util"
	"golang.org/x/oauth2/google"
)

/*
	List/Cancel/Retry all swarming tasks that match the specified tags.
*/

const (
	LIST_CMD         = "list"
	CANCEL_CMD       = "cancel"
	RETRY_CMD        = "retry"
	STDOUT_CMD       = "stdout"
	WORKER_POOL_SIZE = 20
	// Number of tasks to display before displaying confirmation of whether to
	// proceed.
	CONFIRMATION_DISPLAY_LIMIT = 5
)

var (
	cmd      = flag.String("cmd", "", "Which swarming operation to use. Eg: list, cancel, retry, stdout.")
	tags     = common.NewMultiStringFlag("tag", nil, "Colon-separated key/value pair, eg: \"runid:testing\" Tags with which to find matching tasks. Can specify multiple times.")
	pool     = flag.String("pool", "", "Which Swarming pool to use.")
	workdir  = flag.String("workdir", ".", "Working directory. Optional, but recommended not to use CWD.")
	state    = flag.String("state", "PENDING", "State the matching tasks should be in. Possible values are: ALL, BOT_DIED, CANCELED, CLIENT_ERROR, COMPLETED, COMPLETED_FAILURE, COMPLETED_SUCCESS, DEDUPED, EXPIRED, PENDING, PENDING_RUNNING, RUNNING, TIMED_OUT")
	internal = flag.Bool("internal", false, "Run against internal swarming instance.")

	supportedCmds = map[string]bool{
		LIST_CMD:   true,
		CANCEL_CMD: true,
		RETRY_CMD:  true,
		STDOUT_CMD: true,
	}
)

func main() {
	// Setup, parse args.
	common.Init()

	ctx := context.Background()

	if _, ok := supportedCmds[*cmd]; !ok {
		sklog.Fatalf("--cmd must be one of %v", supportedCmds)
	}
	if len(*tags) == 0 {
		sklog.Fatal("Atleast one --tag is required.")
	}
	if *pool == "" {
		sklog.Fatal("--pool is required.")
	}

	var err error
	*workdir, err = filepath.Abs(*workdir)
	if err != nil {
		sklog.Fatal(err)
	}

	// Authenticated HTTP client.
	ts, err := google.DefaultTokenSource(ctx, swarming.AUTH_SCOPE)
	if err != nil {
		sklog.Fatal(err)
	}
	httpClient := httputils.DefaultClientConfig().WithTokenSource(ts).With2xxOnly().Client()

	// Swarming API client.
	swarmingServer := swarming.SWARMING_SERVER
	if *internal {
		swarmingServer = swarming.SWARMING_SERVER_PRIVATE
	}
	swarmApi, err := swarming.NewApiClient(httpClient, swarmingServer)
	if err != nil {
		sklog.Fatal(err)
	}

	// Obtain the list of tasks.
	tagsWithPool := []string{fmt.Sprintf("pool:%s", *pool)}
	for _, t := range *tags {
		tagsWithPool = append(tagsWithPool, t)
	}
	tasks, err := swarmApi.ListTasks(ctx, time.Time{}, time.Time{}, tagsWithPool, *state)
	if err != nil {
		sklog.Fatal(err)
	}
	if len(tasks) == 0 {
		sklog.Info("Found no matching tasks.")
		return
	}

	if *cmd == LIST_CMD {
		sklog.Infof("Found %d tasks...", len(tasks))
		for _, t := range tasks {
			sklog.Infof("  %s", getTaskStr(t))
		}
		sklog.Infof("Listed %d tasks.", len(tasks))

	} else if *cmd == CANCEL_CMD {
		resp, err := displayTasksAndConfirm(tasks, "canceling")
		if err != nil {
			sklog.Fatal(err)
		}
		if resp {
			sklog.Infof("Starting cancelation of %d tasks...", len(tasks))
			tasksChannel := getClosedTasksChannel(tasks, false /* dedupNames */)
			var wg sync.WaitGroup
			// Loop through workers in the worker pool.
			for i := 0; i < WORKER_POOL_SIZE; i++ {
				// Increment the WaitGroup counter.
				wg.Add(1)
				// Create and run a goroutine closure that cancels tasks.
				go func() {
					// Decrement the WaitGroup counter when the goroutine completes.
					defer wg.Done()

					for t := range tasksChannel {
						if err := swarmApi.CancelTask(ctx, t.TaskId, false /* killRunning */); err != nil {
							sklog.Errorf("Could not delete %s: %s", getTaskStr(t), err)
							continue
						}
						sklog.Infof("Deleted  %s", getTaskStr(t))
					}
				}()
			}
			// Wait for all spawned goroutines to complete
			wg.Wait()

			sklog.Infof("Cancelled %d tasks.", len(tasks))
		}

	} else if *cmd == RETRY_CMD {
		resp, err := displayTasksAndConfirm(tasks, "retrying")
		if err != nil {
			sklog.Fatal(err)
		}
		if resp {
			sklog.Infof("Starting retries of %d tasks...", len(tasks))
			tasksChannel := getClosedTasksChannel(tasks, true /* dedupNames */)

			var wg sync.WaitGroup
			// Loop through workers in the worker pool.
			for i := 0; i < WORKER_POOL_SIZE; i++ {
				// Increment the WaitGroup counter.
				wg.Add(1)
				// Create and run a goroutine closure that retries tasks.
				go func() {
					// Decrement the WaitGroup counter when the goroutine completes.
					defer wg.Done()

					for t := range tasksChannel {
						if _, err := swarmApi.RetryTask(ctx, t); err != nil {
							sklog.Errorf("Could not retry %s: %s", getTaskStr(t), err)
							continue
						}
						sklog.Infof("Retried  %s", getTaskStr(t))
					}
				}()
			}
			// Wait for all spawned goroutines to complete
			wg.Wait()

			sklog.Infof("Retried %d tasks.", len(tasks))
		}
	} else if *cmd == STDOUT_CMD {
		resp, err := displayTasksAndConfirm(tasks, "downloading stdout")
		if err != nil {
			sklog.Fatal(err)
		}
		if resp {
			sklog.Infof("Starting downloading from %d tasks...", len(tasks))
			tasksChannel := getClosedTasksChannel(tasks, false /* dedupNames */)
			outDir := filepath.Join(*workdir, "stdout")
			if err := os.MkdirAll(outDir, 0755); err != nil {
				sklog.Fatal(err)
			}

			var wg sync.WaitGroup
			// Loop through workers in the worker pool.
			for i := 0; i < WORKER_POOL_SIZE; i++ {
				// Increment the WaitGroup counter.
				wg.Add(1)
				// Create and run a goroutine closure that cancels tasks.
				go func() {
					// Decrement the WaitGroup counter when the goroutine completes.
					defer wg.Done()

					for t := range tasksChannel {
						stdout, err := swarmApi.GetStdoutOfTask(ctx, t.TaskId)
						if err != nil {
							sklog.Errorf("Could not download from %s: %s", getTaskStr(t), err)
							continue
						}
						destFile := filepath.Join(outDir, fmt.Sprintf("%s-%s.txt", t.Request.Name, t.TaskId))
						if err := os.WriteFile(destFile, []byte(stdout.Output), 0644); err != nil {
							sklog.Errorf("Could not write log to %s: %s", destFile, err)
							continue
						}
						sklog.Infof("Downloaded from %s", getTaskStr(t))
					}
				}()
			}
			// Wait for all spawned goroutines to complete
			wg.Wait()

			sklog.Infof("Downloaded stdout from %d tasks into %s.", len(tasks), outDir)
		}
	}
}

func displayTasksAndConfirm(tasks []*swarmingapi.SwarmingRpcsTaskRequestMetadata, verb string) (bool, error) {
	sklog.Infof("Found %d tasks. Displaying the first %d:", len(tasks), CONFIRMATION_DISPLAY_LIMIT)
	for _, t := range tasks[:util.MinInt(CONFIRMATION_DISPLAY_LIMIT, len(tasks))] {
		sklog.Infof("  %s", getTaskStr(t))
	}
	sklog.Infof("Would you like to proceed with %s %d tasks? ('y' or 'n')", verb, len(tasks))
	return askForConfirmation()
}

func askForConfirmation() (bool, error) {
	var response string
	if _, err := fmt.Scanln(&response); err != nil {
		return false, err
	}
	if response == "y" {
		return true, nil
	} else if response == "n" {
		return false, nil
	} else {
		sklog.Info("Please type 'y' or 'n' and then press enter:")
		return askForConfirmation()
	}
}

func getClosedTasksChannel(tasks []*swarmingapi.SwarmingRpcsTaskRequestMetadata, dedupNames bool) chan *swarmingapi.SwarmingRpcsTaskRequestMetadata {
	// Create channel that contains specified tasks. This channel will
	// be consumed by the worker pool.
	tasksChannel := make(chan *swarmingapi.SwarmingRpcsTaskRequestMetadata, len(tasks))
	// Dictionary that will be used for deduping by names if dedupNames is true.
	taskNames := map[string]bool{}

	for _, t := range tasks {
		if dedupNames {
			if _, ok := taskNames[t.Request.Name]; ok {
				// Already encountered this task. Continue.
				sklog.Errorf("Found another %s: %s", t.Request.Name, t.TaskId)
				continue
			}
		}
		tasksChannel <- t
		taskNames[t.Request.Name] = true
	}
	close(tasksChannel)
	return tasksChannel
}

func getTaskStr(t *swarmingapi.SwarmingRpcsTaskRequestMetadata) string {
	return fmt.Sprintf("Task id: %s  Created: %s  Name: %s", t.TaskId, t.Request.CreatedTs, t.Request.Name)
}
