package gevent

import (
	"regexp"
	"sort"
	"testing"
	"time"

	assert "github.com/stretchr/testify/require"
	"go.skia.org/infra/go/eventbus"
	"go.skia.org/infra/go/testutils"
	"go.skia.org/infra/go/util"
)

const (
	PROJECT_ID             = "test-project"
	LOCAL_TOPIC            = "testing-local-topic"
	SUBSCRIBER_1           = "buildbot-1"
	SUBSCRIBER_2           = "buildbot-2"
	SUBSCRIBER_STORAGE_EVT = "buildbot-storage-evt"

	// TEST_BUCKET is not actually accessed, it's just used to test synthetic storate events.
	TEST_BUCKET = "skia-not-existing-gm"
	TEST_PREFIX = "dm-json-v1"
)

// Test structure that is send as the payload on the event channels.
type testType struct {
	ID        int
	Value     string
	TimeStamp uint64
}

func TestEventBus(t *testing.T) {
	testutils.LargeTest(t)

	testCodec := util.JSONCodec(&testType{})
	RegisterCodec("channel1", testCodec)
	RegisterCodec("channel2", testCodec)

	eventBus, err := New(PROJECT_ID, LOCAL_TOPIC, SUBSCRIBER_1)
	assert.NoError(t, err)

	eventBusTwo, err := New(PROJECT_ID, LOCAL_TOPIC, SUBSCRIBER_2)
	assert.NoError(t, err)

	ch := make(chan int, 5)
	eventBus.SubscribeAsync("channel1", func(e interface{}) {
		ch <- e.(*testType).ID
	})

	eventBus.SubscribeAsync("channel2", func(e interface{}) {
		ch <- e.(*testType).ID
	})

	eventBus.SubscribeAsync("channel2", func(e interface{}) {
		ch <- e.(*testType).ID
	})

	now := uint64(time.Now().UnixNano()) / uint64(time.Millisecond)
	eventBusTwo.Publish("channel1", &testType{
		ID:        1,
		Value:     "value 1",
		TimeStamp: now,
	}, true)
	eventBusTwo.Publish("channel2", &testType{
		ID:        2,
		Value:     "value 2",
		TimeStamp: now + 10,
	}, true)

	// Give the messages 10 seconds to process.
	startTime := time.Now()
	for {
		time.Sleep(time.Second)
		if time.Now().Sub(startTime) > (time.Second * 10) {
			assert.FailNow(t, "Timeout: did not receive messages in time")
		}
		if len(ch) == 3 {
			break
		}
	}
	assert.Equal(t, 3, len(ch))
	vals := []int{<-ch, <-ch, <-ch}
	sort.Ints(vals)
	assert.Equal(t, []int{1, 2, 2}, vals)
}

func TestSynStorageEvents(t *testing.T) {
	testutils.LargeTest(t)

	eventBus, err := New(PROJECT_ID, LOCAL_TOPIC, SUBSCRIBER_STORAGE_EVT)
	assert.NoError(t, err)

	// Disable actual subscription to the bucket. It's not possible to test right now, but
	// if the subscription fails or doesn't work we will know immediately when deploying.
	eventBus.(*distEventBus).disableGCSSubscriptions = true

	targetFileRegExp := regexp.MustCompile(`.*\.json`)
	storageEvtChan, err := eventBus.RegisterStorageEvents(TEST_BUCKET, TEST_PREFIX, targetFileRegExp, nil)
	assert.NoError(t, err)

	evtCh := make(chan interface{}, 1)
	eventBus.SubscribeAsync(storageEvtChan, func(evt interface{}) {
		evtCh <- evt
	})

	now := util.TimeStamp(time.Microsecond)
	testObjID := TEST_PREFIX + "/2018/11/01/15/89468e1cc434e93baeed282fd0c250b1d963c017/linux_xfa_rel/1541086007/pixel/dm.json"
	evt := eventbus.NewStorageEvent(TEST_BUCKET, testObjID, now, "5bf5542e57a662120b400c4cff7e9c40")
	eventBus.PublishStorageEvent(evt)

	assert.NoError(t, testutils.EventuallyConsistent(50*time.Millisecond, func() error {
		select {
		case evt := <-evtCh:
			sEvt := evt.(*eventbus.StorageEvent)
			assert.Equal(t, TEST_BUCKET, sEvt.BucketID)
			assert.Equal(t, testObjID, sEvt.ObjectID)
			assert.Equal(t, now, sEvt.TimeStamp)
			return nil
		default:
			return testutils.TryAgainErr
		}
	}))
}
