// blob: 809fc0180fc779edcb5a32b83556d6cc10567fbb [file] [log] [blame]
package eventbus
import (
"sort"
"sync"
"testing"
"go.skia.org/infra/go/testutils"
assert "github.com/stretchr/testify/require"
)
// Package-level constants for the event bus tests.
// NOTE(review): neither constant is referenced in the visible part of this
// file; confirm they are still needed.
const (
	// LOCAL_TOPIC is a topic name string.
	LOCAL_TOPIC = "local-topic"

	// SYNC_MSG is an untyped integer constant with the value -1.
	SYNC_MSG = -1
)
// testType is a small ID/value record. atomicMap stores pointers to values of
// this type.
type testType struct {
	// ID is the integer identifier of the record.
	ID int
	// Value is the string payload of the record.
	Value string
}
// init is currently a no-op: its body is empty.
// NOTE(review): looks like a leftover hook; confirm it can be removed.
func init() {
}
// TestEventBus registers one handler on "topic1" and two handlers on
// "topic2", publishes a single event on each topic, waits for both topics and
// then checks the values the handlers pushed onto a shared channel.
func TestEventBus(t *testing.T) {
	testutils.MediumTest(t)

	const (
		firstTopic  = "topic1"
		secondTopic = "topic2"
		wantEvents  = 3
	)

	bus := New()

	// The buffer (5) is larger than the number of sends (3), so no handler
	// blocks on the channel.
	results := make(chan int, 5)

	// firstTopic handler ignores its payload; the two secondTopic handlers
	// report payload+1 and payload respectively.
	bus.SubscribeAsync(firstTopic, func(interface{}) { results <- 1 })
	bus.SubscribeAsync(secondTopic, func(payload interface{}) { results <- payload.(int) + 1 })
	bus.SubscribeAsync(secondTopic, func(payload interface{}) { results <- payload.(int) })

	bus.Publish(firstTopic, nil)
	bus.Publish(secondTopic, 2)
	bus.Wait(firstTopic)
	bus.Wait(secondTopic)

	// Exactly one value per handler must be queued before draining.
	assert.Equal(t, wantEvents, len(results))

	got := make([]int, 0, wantEvents)
	for len(got) < wantEvents {
		got = append(got, <-results)
	}

	// Sort so the comparison does not depend on the order the values arrived.
	sort.Ints(got)
	assert.Equal(t, []int{1, 2, 3}, got)
}
// TestSubTopics registers "evenNumbers" as a sub topic of "allNumbers" with a
// filter that accepts only even ints, publishes the integers 0..199 on
// "allNumbers" and checks that the subscriber of each topic received the
// expected set of values.
func TestSubTopics(t *testing.T) {
	testutils.MediumTest(t)
	testutils.SkipIfShort(t)

	const (
		numEvents = 200
		allTopic  = "allNumbers"
		evenTopic = "evenNumbers"
	)

	// The filter rejects anything that is not an int, then keeps even values.
	RegisterSubTopic(allTopic, evenTopic, func(data interface{}) bool {
		n, isInt := data.(int)
		return isInt && n%2 == 0
	})

	bus := New()

	// Both channels are buffered well beyond the number of published events.
	allReceived := make(chan int, numEvents*3)
	evenReceived := make(chan int, numEvents*3)
	bus.SubscribeAsync(allTopic, func(payload interface{}) { allReceived <- payload.(int) })
	bus.SubscribeAsync(evenTopic, func(payload interface{}) { evenReceived <- payload.(int) })

	// Publish every number on the parent topic only, recording what each
	// subscriber is expected to see (in ascending order).
	wantAll := make([]int, 0, numEvents)
	wantEven := make([]int, 0, numEvents/2)
	for n := 0; n < numEvents; n++ {
		bus.Publish(allTopic, n)
		wantAll = append(wantAll, n)
		if n%2 == 0 {
			wantEven = append(wantEven, n)
		}
	}

	bus.Wait(allTopic)

	// Closing lets compChan range over the channels until they are drained.
	close(allReceived)
	close(evenReceived)

	assert.Equal(t, numEvents, len(allReceived))
	compChan(t, wantAll, allReceived)
	compChan(t, wantEven, evenReceived)
}
// compChan reads ch until it is closed, sorts the received values in
// ascending order and asserts that they equal exp. Only the received values
// are sorted, so exp must already be in ascending order. The caller has to
// close ch, otherwise this function blocks.
func compChan(t assert.TestingT, exp []int, ch <-chan int) {
	// Always a non-nil slice, so an empty exp compares equal to an empty
	// result.
	received := make([]int, 0, len(exp))
	for {
		v, open := <-ch
		if !open {
			break
		}
		received = append(received, v)
	}

	sort.Ints(received)
	assert.Equal(t, exp, received)
}
// atomicMap bundles a map from int to *testType and a ready flag with the
// mutex that Add, Len, setReady and isReady acquire before touching either
// one. It contains a sync.Mutex and must therefore be used through a pointer.
type atomicMap struct {
	// m holds the entries written by Add and counted by Len.
	m map[int]*testType
	// mutex is held by Add, Len, setReady and isReady while they run.
	mutex sync.Mutex
	// ready is set by setReady and read by isReady.
	ready bool
}
// newAtomicMap returns a pointer to an atomicMap with an empty, non-nil map
// and the ready flag unset.
func newAtomicMap() *atomicMap {
	am := new(atomicMap) // ready starts out as false (zero value)
	am.m = make(map[int]*testType)
	return am
}
// Add stores v under key k while holding the mutex, replacing any entry that
// already exists for k.
func (a *atomicMap) Add(k int, v *testType) {
	a.mutex.Lock()
	// Deferred so the mutex is released even if the map write panics
	// (e.g. on a nil map).
	defer a.mutex.Unlock()

	a.m[k] = v
}
// Len returns the number of entries in the map, read while holding the mutex.
func (a *atomicMap) Len() int {
	a.mutex.Lock()
	size := len(a.m)
	a.mutex.Unlock()
	return size
}
// setReady sets the ready flag to true while holding the mutex.
func (a *atomicMap) setReady() {
	a.mutex.Lock()
	a.ready = true
	a.mutex.Unlock()
}
// isReady returns the current value of the ready flag, read while holding the
// mutex.
func (a *atomicMap) isReady() bool {
	a.mutex.Lock()
	ready := a.ready
	a.mutex.Unlock()
	return ready
}