blob: 6c48069980a20073cc2cea63ad3172edc13cfd1f [file] [log] [blame]
package gevent
import (
"os"
"sort"
"testing"
"time"
assert "github.com/stretchr/testify/require"
"go.skia.org/infra/go/common"
"go.skia.org/infra/go/testutils"
"go.skia.org/infra/go/util"
)
const LOCAL_TOPIC = "testing-local-topic"
const SUBSCRIBER_1 = "buildbot-1"
const SUBSCRIBER_2 = "buildbot-2"
// Test structure that is send as the payload on the event channels.
type testType struct {
ID int
Value string
TimeStamp uint64
}
func TestEventBus(t *testing.T) {
testutils.MediumTest(t)
if os.Getenv("PUBSUB_EMULATOR_HOST") == "" {
t.Skip(`Skipping tests that require a local Cloud PubSub emulator.
Set the environment: $(gcloud beta emulators pubsub env-init)
Run the emulator: gcloud beta emulators pubsub start`)
}
testCodec := util.JSONCodec(&testType{})
RegisterCodec("channel1", testCodec)
RegisterCodec("channel2", testCodec)
eventBus, err := New(common.PROJECT_ID, LOCAL_TOPIC, SUBSCRIBER_1)
assert.NoError(t, err)
eventBusTwo, err := New(common.PROJECT_ID, LOCAL_TOPIC, SUBSCRIBER_2)
assert.NoError(t, err)
ch := make(chan int, 5)
eventBus.SubscribeAsync("channel1", func(e interface{}) {
ch <- e.(*testType).ID
})
eventBus.SubscribeAsync("channel2", func(e interface{}) {
ch <- e.(*testType).ID
})
eventBus.SubscribeAsync("channel2", func(e interface{}) {
ch <- e.(*testType).ID
})
now := uint64(time.Now().UnixNano()) / uint64(time.Millisecond)
eventBusTwo.Publish("channel1", &testType{
ID: 1,
Value: "value 1",
TimeStamp: now,
}, true)
eventBusTwo.Publish("channel2", &testType{
ID: 2,
Value: "value 2",
TimeStamp: now + 10,
}, true)
// Give the messages 10 seconds to process.
startTime := time.Now()
for {
time.Sleep(time.Second)
if time.Now().Sub(startTime) > (time.Second * 10) {
assert.FailNow(t, "Timeout: did not receive messages in time")
}
if len(ch) == 3 {
break
}
}
assert.Equal(t, 3, len(ch))
vals := []int{<-ch, <-ch, <-ch}
sort.Ints(vals)
assert.Equal(t, []int{1, 2, 2}, vals)
}