package swarming_metrics

import (
	"context"
	"fmt"
	"io/ioutil"
	"testing"
	"time"

	"github.com/stretchr/testify/require"
	swarming_api "go.chromium.org/luci/common/api/swarming/swarming/v1"
	bt_testutil "go.skia.org/infra/go/bt/testutil"
	"go.skia.org/infra/go/common"
	"go.skia.org/infra/go/metrics2"
	"go.skia.org/infra/go/metrics2/events"
	"go.skia.org/infra/go/swarming"
	"go.skia.org/infra/go/taskname"
	"go.skia.org/infra/go/testutils"
	"go.skia.org/infra/go/testutils/unittest"
	"go.skia.org/infra/go/util"
	"go.skia.org/infra/perf/go/ingest/format"
	"go.skia.org/infra/perf/go/perfclient"
)

// makeTask builds a fake Swarming task for use with the mock API client.
//
// Every entry in dims becomes both a request dimension and a "key:value" tag;
// every entry in extraTags becomes a tag only. Because both come from map
// iteration, the order of dimensions and tags is unspecified.
//
// The returned task is in state TASK_STATE_COMPLETED; callers that need a
// different state overwrite TaskResult.State. When completed is zero (per
// util.TimeIsZero) the reported Duration is 0. Note that CompletedTs is still
// filled in from completed in that case.
//
// Duration and the overhead values are converted to whole seconds, so any
// fractional second is truncated. The isolated download/upload stats always
// report 50000000 and 70000000 cold bytes respectively.
func makeTask(id, name string, created, started, completed time.Time, dims map[string]string, extraTags map[string]string, botOverhead, downloadOverhead, uploadOverhead time.Duration) *swarming_api.SwarmingRpcsTaskRequestMetadata {
	dimensions := make([]*swarming_api.SwarmingRpcsStringPair, 0, len(dims))
	// Tags are produced from both dims and extraTags, so size for both.
	tags := make([]string, 0, len(dims)+len(extraTags))
	for k, v := range dims {
		dimensions = append(dimensions, &swarming_api.SwarmingRpcsStringPair{
			Key:   k,
			Value: v,
		})
		tags = append(tags, fmt.Sprintf("%s:%s", k, v))
	}
	for k, v := range extraTags {
		tags = append(tags, fmt.Sprintf("%s:%s", k, v))
	}
	duration := 0.0
	if !util.TimeIsZero(completed) {
		// Integer division of Durations: truncates to whole seconds.
		duration = float64(completed.Sub(started) / time.Second)
	}
	return &swarming_api.SwarmingRpcsTaskRequestMetadata{
		Request: &swarming_api.SwarmingRpcsTaskRequest{
			CreatedTs: created.UTC().Format(swarming.TIMESTAMP_FORMAT),
			Tags:      tags,
			Name:      name,
			TaskSlices: []*swarming_api.SwarmingRpcsTaskSlice{
				{
					Properties: &swarming_api.SwarmingRpcsTaskProperties{
						Dimensions: dimensions,
					},
				},
			},
		},
		TaskId: id,
		TaskResult: &swarming_api.SwarmingRpcsTaskResult{
			CreatedTs:   created.UTC().Format(swarming.TIMESTAMP_FORMAT),
			CompletedTs: completed.UTC().Format(swarming.TIMESTAMP_FORMAT),
			DedupedFrom: "",
			Duration:    duration,
			Name:        name,
			PerformanceStats: &swarming_api.SwarmingRpcsPerformanceStats{
				BotOverhead: float64(botOverhead / time.Second),
				IsolatedDownload: &swarming_api.SwarmingRpcsOperationStats{
					Duration:            float64(downloadOverhead / time.Second),
					TotalBytesItemsCold: 50000000.0,
				},
				IsolatedUpload: &swarming_api.SwarmingRpcsOperationStats{
					Duration:            float64(uploadOverhead / time.Second),
					TotalBytesItemsCold: 70000000.0,
				},
			},
			StartedTs: started.UTC().Format(swarming.TIMESTAMP_FORMAT),
			State:     swarming.TASK_STATE_COMPLETED,
			TaskId:    id,
		},
	}
}

// TestLoadSwarmingTasks checks that loadSwarmingTasks inserts finished tasks
// into the event DB. It also checks that the ID of a still-running task is
// returned for a later revisit, and that this task is inserted once a
// subsequent load finds it finished.
func TestLoadSwarmingTasks(t *testing.T) {
	unittest.LargeTest(t)

	wd, err := ioutil.TempDir("", "")
	require.NoError(t, err)
	defer testutils.RemoveAll(t, wd)

	// Fake some tasks in Swarming.
	swarm := swarming.NewMockApiClient()
	defer swarm.AssertExpectations(t)
	pc := perfclient.NewMockPerfClient()
	defer pc.AssertExpectations(t)
	mp := taskname.NewMockTaskNameParser()
	defer mp.AssertExpectations(t)
	now := time.Now()
	lastLoad := now.Add(-time.Hour)

	// Created, started and completed times; all fall within [lastLoad, now].
	cr := now.Add(-30 * time.Minute)
	st := now.Add(-29 * time.Minute)
	co := now.Add(-15 * time.Minute)

	d := map[string]string{
		"os":   "Ubuntu",
		"pool": "Skia",
	}

	// t1 is finished; t2 is still running (zero completion time).
	t1 := makeTask("1", "my-task", cr, st, co, d, nil, 0.0, 0.0, 0.0)
	t2 := makeTask("2", "my-task", cr.Add(time.Second), st, time.Time{}, d, nil, 0.0, 0.0, 0.0)
	t2.TaskResult.State = swarming.TASK_STATE_RUNNING
	swarm.On("ListTasks", lastLoad, now, []string{"pool:Skia"}, "").Return([]*swarming_api.SwarmingRpcsTaskRequestMetadata{t1, t2}, nil)

	btProject, btInstance, cleanup := bt_testutil.SetupBigTable(t, events.BT_TABLE, events.BT_COLUMN_FAMILY)
	defer cleanup()
	edb, err := events.NewBTEventDB(context.Background(), btProject, btInstance, nil)
	require.NoError(t, err)

	// Load Swarming tasks.
	revisit := []string{}
	revisit, err = loadSwarmingTasks(swarm, "Skia", edb, pc, mp, lastLoad, now, revisit)
	require.NoError(t, err)

	// Ensure that we inserted the finished task and added the running one
	// (t2) to the revisit list.
	require.Equal(t, []string{"2"}, revisit)

	// assertCount retries via testutils.EventuallyConsistent (up to 5s) until
	// the pool's event stream holds exactly `expect` events in [from, to].
	assertCount := func(from, to time.Time, expect int) {
		require.NoError(t, testutils.EventuallyConsistent(5*time.Second, func() error {
			ev, err := edb.Range(streamForPool("Skia"), from, to)
			require.NoError(t, err)
			if len(ev) != expect {
				return testutils.TryAgainErr
			}
			return nil
		}))
	}
	assertCount(lastLoad, now, 1)

	// datahopper will follow up on the revisit list (which is t2's id).
	// t2 is a pointer, so the mutations below are visible to this mock.
	swarm.On("GetTaskMetadata", "2").Return(t2, nil)

	// The second task is finished.
	t2.TaskResult.State = swarming.TASK_STATE_COMPLETED
	t2.TaskResult.CompletedTs = now.Add(5 * time.Minute).UTC().Format(swarming.TIMESTAMP_FORMAT)

	lastLoad = now
	now = now.Add(10 * time.Minute)

	// This is empty because datahopper will pull in the task data from revisit.
	swarm.On("ListTasks", lastLoad, now, []string{"pool:Skia"}, "").Return([]*swarming_api.SwarmingRpcsTaskRequestMetadata{}, nil)

	// Load Swarming tasks again.
	revisit, err = loadSwarmingTasks(swarm, "Skia", edb, pc, mp, lastLoad, now, revisit)
	require.NoError(t, err)

	// Ensure that we loaded details for the unfinished task from the last
	// attempt and that nothing remains to revisit.
	require.Empty(t, revisit)
	assertCount(now.Add(-time.Hour), now, 2)
}

// TestMetrics loads a single finished Swarming task and checks that each
// aggregated metric reported under MEASUREMENT_SWARMING_TASKS for the 24h
// period has the expected value.
func TestMetrics(t *testing.T) {
	unittest.LargeTest(t)

	wd, err := ioutil.TempDir("", "")
	require.NoError(t, err)
	defer testutils.RemoveAll(t, wd)

	// Fake a task in Swarming.
	swarm := swarming.NewMockApiClient()
	defer swarm.AssertExpectations(t)
	pc := perfclient.NewMockPerfClient()
	defer pc.AssertExpectations(t)
	mp := taskname.NewMockTaskNameParser()
	defer mp.AssertExpectations(t)
	// This must be the real current time: metrics are aggregated over a
	// trailing period (24h, see the "period" tag below), so a task
	// timestamped outside that window would not be counted.
	now := time.Now()
	lastLoad := now.Add(-time.Hour)

	cr := now.Add(-30 * time.Minute)
	st := now.Add(-29 * time.Minute)
	co := now.Add(-15 * time.Minute)

	d := map[string]string{
		"os":   "Ubuntu",
		"pool": "Skia",
	}

	// Performance stats are expressed in seconds.
	t1 := makeTask("1", "my-task", cr, st, co, d, nil, 0.0, 0.0, 0.0)
	t1.TaskResult.PerformanceStats.BotOverhead = 21
	t1.TaskResult.PerformanceStats.IsolatedUpload.Duration = 13
	t1.TaskResult.PerformanceStats.IsolatedDownload.Duration = 7
	swarm.On("ListTasks", lastLoad, now, []string{"pool:Skia"}, "").Return([]*swarming_api.SwarmingRpcsTaskRequestMetadata{t1}, nil)

	// Setup the metrics.
	btProject, btInstance, cleanup := bt_testutil.SetupBigTable(t, events.BT_TABLE, events.BT_COLUMN_FAMILY)
	defer cleanup()
	edb, em, err := setupMetrics(context.Background(), btProject, btInstance, "Skia", nil)
	require.NoError(t, err)

	// Load the Swarming task, ensure that it got inserted.
	revisit := []string{}
	revisit, err = loadSwarmingTasks(swarm, "Skia", edb, pc, mp, lastLoad, now, revisit)
	require.NoError(t, err)
	require.Equal(t, 0, len(revisit))
	// Poll as TestLoadSwarmingTasks does, so the insert is visible before
	// UpdateMetrics reads from the same DB.
	require.NoError(t, testutils.EventuallyConsistent(5*time.Second, func() error {
		ev, err := edb.Range(streamForPool("Skia"), lastLoad, now)
		require.NoError(t, err)
		if len(ev) != 1 {
			return testutils.TryAgainErr
		}
		return nil
	}))

	// Forcibly update metrics.
	require.NoError(t, em.UpdateMetrics())

	// Ensure that each of the aggregation functions gets us the correct
	// values.

	checkMetricVal := func(metric string, expect float64) {
		tags := map[string]string{
			"metric":    metric,
			"os":        "Ubuntu",
			"period":    "24h0m0s",
			"pool":      "Skia",
			"stream":    streamForPool("Skia"),
			"task_name": "my-task",
		}
		// Every included dimension appears as a tag; ones the task lacks
		// are expected to be present with an empty value.
		for k := range includeDimensions {
			if _, ok := tags[k]; !ok {
				tags[k] = ""
			}
		}
		mx := metrics2.GetFloat64Metric(MEASUREMENT_SWARMING_TASKS, tags)
		require.NotNil(t, mx)
		require.Equal(t, expect, mx.Get())
	}

	// Time-based metrics are expected in milliseconds; the overheads above
	// were set in seconds (e.g. 21s -> 21000ms).
	checkMetricVal("duration", float64(co.Sub(st)/time.Millisecond))
	checkMetricVal("pending-time", float64(st.Sub(cr)/time.Millisecond))
	checkMetricVal("overhead-bot", 21000.0)
	checkMetricVal("overhead-upload", 13000.0)
	checkMetricVal("overhead-download", 7000.0)
	// Byte counts come straight from makeTask's fixed cold-byte stats.
	checkMetricVal("cas-cache-miss-download", 50000000.0)
	checkMetricVal("cas-cache-miss-upload", 70000000.0)
}

// TestPerfUpload checks which tasks loadSwarmingTasks pushes to Perf, and
// with what data.
//
// The first load lists four tasks:
//   - t1 and t4 are completed and are pushed.
//   - t2 is still running and is queued for revisit.
//   - t3 has state BOT_DIED and is not pushed.
//
// The second load revisits t2 after it has finished with a failure and
// pushes it too.
func TestPerfUpload(t *testing.T) {
	unittest.LargeTest(t)

	wd, err := ioutil.TempDir("", "")
	require.NoError(t, err)
	defer testutils.RemoveAll(t, wd)

	// Fake some tasks in Swarming.
	swarm := swarming.NewMockApiClient()
	defer swarm.AssertExpectations(t)
	pc := perfclient.NewMockPerfClient()
	defer pc.AssertExpectations(t)
	mp := taskname.NewMockTaskNameParser()
	defer mp.AssertExpectations(t)
	now := time.Now()
	lastLoad := now.Add(-time.Hour)

	cr := now.Add(-30 * time.Minute)
	st := now.Add(-29 * time.Minute)
	co := now.Add(-15 * time.Minute)

	d := map[string]string{
		"os":   "Ubuntu",
		"pool": "Skia",
	}

	// t1's Swarming name carries a "(retry)" suffix, but its sk_name tag is
	// "Test-MyOS"; the expected Perf push below uses "Test-MyOS".
	t1 := makeTask("1", "Test-MyOS (retry)", cr, st, co, d, map[string]string{
		"sk_revision": "firstRevision",
		"sk_name":     "Test-MyOS",
		"sk_repo":     common.REPO_SKIA,
	}, 17*time.Second, 5*time.Second, 4*time.Second)
	t2 := makeTask("2", "Perf-MyOS", cr.Add(time.Minute), st, time.Time{}, d, map[string]string{
		"sk_revision": "secondRevision",
		"sk_name":     "Perf-MyOS",
		"sk_repo":     common.REPO_SKIA,
	}, 37*time.Second, 23*time.Second, 4*time.Second)
	t2.TaskResult.State = swarming.TASK_STATE_RUNNING
	t3 := makeTask("3", "my-task", cr.Add(2*time.Second), st, now.Add(-time.Minute), d, nil, 47*time.Second, 3*time.Second, 34*time.Second)
	t3.TaskResult.State = swarming.TASK_STATE_BOT_DIED
	// t4 is a tryjob (issue/patchset tags), so its push carries Issue,
	// PatchSet and PatchStorage.
	t4 := makeTask("4", "Test-MyOS", cr, st, co, d, map[string]string{
		"sk_revision":     "firstRevision",
		"sk_name":         "Test-MyOS",
		"sk_repo":         common.REPO_SKIA,
		"sk_issue":        "12345",
		"sk_patchset":     "6",
		"sk_issue_server": "https://skia-review.googlesource.com",
	}, 31*time.Second, 7*time.Second, 3*time.Second)

	swarm.On("ListTasks", lastLoad, now, []string{"pool:Skia"}, "").Return([]*swarming_api.SwarmingRpcsTaskRequestMetadata{t1, t2, t3, t4}, nil)

	btProject, btInstance, cleanup := bt_testutil.SetupBigTable(t, events.BT_TABLE, events.BT_COLUMN_FAMILY)
	defer cleanup()
	edb, err := events.NewBTEventDB(context.Background(), btProject, btInstance, nil)
	require.NoError(t, err)

	mp.On("ParseTaskName", "Test-MyOS").Return(map[string]string{
		"os":   "MyOS",
		"role": "Test",
	}, nil)

	// Expected values:
	//   total_s = task duration + bot overhead;
	//   cas_overhead_s = download + upload overhead;
	//   all_overhead_s = bot overhead.
	pc.On("PushToPerf", now, "Test-MyOS", "task_duration", format.BenchData{
		Hash: "firstRevision",
		Key: map[string]string{
			"os":      "MyOS",
			"role":    "Test",
			"failure": "false",
		},
		Results: map[string]format.BenchResults{
			"Test-MyOS": {
				"task_duration": {
					"total_s":        float64((14*time.Minute + 17*time.Second) / time.Second),
					"task_step_s":    float64(14 * time.Minute / time.Second),
					"cas_overhead_s": 9.0,
					"all_overhead_s": 17.0,
				},
			},
		},
		Source: "datahopper",
	}).Return(nil)
	pc.On("PushToPerf", now, "Test-MyOS", "task_duration", format.BenchData{
		Hash:     "firstRevision",
		Issue:    "12345",
		PatchSet: "6",
		Key: map[string]string{
			"os":      "MyOS",
			"role":    "Test",
			"failure": "false",
		},
		Results: map[string]format.BenchResults{
			"Test-MyOS": {
				"task_duration": {
					"total_s":        float64((14*time.Minute + 31*time.Second) / time.Second),
					"task_step_s":    float64(14 * time.Minute / time.Second),
					"cas_overhead_s": 10.0,
					"all_overhead_s": 31.0,
				},
			},
		},
		Source:       "datahopper",
		PatchStorage: "gerrit",
	}).Return(nil)

	// Load Swarming tasks.
	revisit := []string{}
	revisit, err = loadSwarmingTasks(swarm, "Skia", edb, pc, mp, lastLoad, now, revisit)
	require.NoError(t, err)

	// Only t1 and t4 were pushed; the running t2 is queued for revisit.
	pc.AssertNumberOfCalls(t, "PushToPerf", 2)
	require.Equal(t, []string{"2"}, revisit)

	// The second task is finished.
	t2.TaskResult.State = swarming.TASK_STATE_COMPLETED
	t2.TaskResult.CompletedTs = now.Add(5 * time.Minute).UTC().Format(swarming.TIMESTAMP_FORMAT)
	t2.TaskResult.Duration = float64(33 * time.Minute / time.Second)
	t2.TaskResult.Failure = true

	lastLoad = now
	now = now.Add(10 * time.Minute)
	// This is empty because datahopper will pull in the task data from revisit.
	swarm.On("ListTasks", lastLoad, now, []string{"pool:Skia"}, "").Return([]*swarming_api.SwarmingRpcsTaskRequestMetadata{}, nil)

	// datahopper will follow up on the revisit list (which is t2's id).
	swarm.On("GetTaskMetadata", "2").Return(t2, nil)

	mp.On("ParseTaskName", "Perf-MyOS").Return(map[string]string{
		"os":   "MyOS",
		"role": "Perf",
	}, nil)

	pc.On("PushToPerf", now, "Perf-MyOS", "task_duration", format.BenchData{
		Hash: "secondRevision",
		Key: map[string]string{
			"os":      "MyOS",
			"role":    "Perf",
			"failure": "true",
		},
		Results: map[string]format.BenchResults{
			"Perf-MyOS": {
				"task_duration": {
					"total_s":        float64((33*time.Minute + 37*time.Second) / time.Second),
					"task_step_s":    float64(33 * time.Minute / time.Second),
					"cas_overhead_s": 27.0,
					"all_overhead_s": 37.0,
				},
			},
		},
		Source: "datahopper",
	}).Return(nil)

	// Load Swarming tasks again.
	revisit, err = loadSwarmingTasks(swarm, "Skia", edb, pc, mp, lastLoad, now, revisit)
	require.NoError(t, err)

	// t2 has now been pushed and nothing remains to revisit.
	pc.AssertNumberOfCalls(t, "PushToPerf", 3)
	require.Empty(t, revisit)
}
