blob: adc77e7dde602e33f102dde8b260c13acf454970 [file] [log] [blame]
package eventbus
import (
"sync"
"go.skia.org/infra/go/util"
)
// CallbackFn defines the signature of all callback functions.
type CallbackFn func(interface{})
// EventBus is a minimal eventbus that allows to subscribe to events
// and publish events.
type EventBus struct {
// Map of handlers keyed by topic. This is used to keep track of subscriptions.
handlers map[string]*topicHandler
// Used to protect handlers.
mutex sync.Mutex
}
// Internal struct to keep keep track of an event and it's handlers.
type topicHandler struct {
callbacks []CallbackFn
wg sync.WaitGroup
}
// SubTopicFilter is used to accept or reject messages from a topic for inclusion in a sub topic.
type SubTopicFilter func(eventData interface{}) bool
// subTopicEntry stores the information to create a topic by filtering a topic.
type subTopicEntry struct {
// topic to be filtered.
topic string
// filter function that accepts or rejects a message from the underlying topic.
filterFn SubTopicFilter
}
var (
// GlobalEvents stores a map[topic]LRUCodec of global events that should
// be available through this eventbus.
globalEvents map[string]util.LRUCodec = map[string]util.LRUCodec{}
// globalEventsMutex protects globalEvents.
globalEventsMutex sync.Mutex
// subTopics stores topics that we generate by filtering other topics.
subTopics map[string]*subTopicEntry = map[string]*subTopicEntry{}
// subTopicsMutex protects subTopics.
subTopicsMutex sync.Mutex
)
// RegisterGlobalEvent registers a global event to be handled by
// instances of EventBus. A global event is identified by a topic(string)
// and a codec that can translate between go data structures and raw
// bytes slices.
// Events need to be be registered before an instance of EventBus is
// created. Ideally in the "init" function of the package that defines the
// event. This is necessary, because global events are used accross
// applications and best shared via the shared packages.
func RegisterGlobalEvent(topic string, codec util.LRUCodec) {
globalEventsMutex.Lock()
defer globalEventsMutex.Unlock()
globalEvents[topic] = codec
}
// RegisterSubTopic creates an event topic that is derived from an existing topic by
// applying a filter. In the background it subscribes to 'topic'. If it receives an
// event for topic it invokes the filter function. If the filter function returns true
// it will emit an event for the sub topic.
func RegisterSubTopic(topic, subTopic string, filterFn SubTopicFilter) {
subTopicsMutex.Lock()
defer subTopicsMutex.Unlock()
subTopics[subTopic] = &subTopicEntry{
topic: topic,
filterFn: filterFn,
}
}
// New returns a new instance of EventBus
func New() *EventBus {
globalEventsMutex.Lock()
defer globalEventsMutex.Unlock()
ret := &EventBus{
handlers: map[string]*topicHandler{},
}
return ret
}
// Publish publishes events for the provided topic. arg is passed
// to all functions that have subscribed to this event. The type
// and value of arg is event dependent.
func (e *EventBus) Publish(topic string, arg interface{}) {
e.publishEvent(topic, arg)
}
func (e *EventBus) publishEvent(topic string, arg interface{}) {
e.mutex.Lock()
defer e.mutex.Unlock()
if th, ok := e.handlers[topic]; ok {
for _, callback := range th.callbacks {
th.wg.Add(1)
go func(callback CallbackFn) {
defer th.wg.Done()
callback(arg)
}(callback)
}
}
}
// SubscribeAsync subscribes to the given topic. When an event for topic
// is published the callback function will be called. All function calls
// are asynchronous, i.e. run in a separate goroutine.
func (e *EventBus) SubscribeAsync(topic string, callback CallbackFn) {
e.mutex.Lock()
defer e.mutex.Unlock()
if !e.subscribeToSubTopic(topic, callback) {
e.subscribeWithLock(topic, callback)
}
}
// subscribeWithLocks registers the given callback for the given topic. It assumes
// that the subscription process has been locked with e.mutex.
func (e *EventBus) subscribeWithLock(topic string, callback CallbackFn) {
if th, ok := e.handlers[topic]; ok {
th.callbacks = append(th.callbacks, callback)
} else {
e.handlers[topic] = &topicHandler{callbacks: []CallbackFn{callback}}
}
}
// Wait will block until the goroutines for a specific topic have finished.
func (e *EventBus) Wait(topic string) {
// Block briefly to find the handler struct for the topic.
e.mutex.Lock()
th, ok := e.handlers[topic]
e.mutex.Unlock()
if ok {
th.wg.Wait()
}
}
// subscribeToSubTopic returns true if the given 'subTopic' is indeed a registered sub topic.
// In that case it will register for the underlying topic to filter events. This results in
// a recursive call to SubscribeAsync.
func (e *EventBus) subscribeToSubTopic(subTopic string, callback CallbackFn) bool {
subTopicsMutex.Lock()
entry, ok := subTopics[subTopic]
subTopicsMutex.Unlock()
if ok {
e.subscribeWithLock(entry.topic, func(data interface{}) {
if entry.filterFn(data) {
callback(data)
}
})
}
return ok
}