blob: 4231f0e2f7423c2bfedad4f842872cc321de1996 [file] [log] [blame]
package gevent
import (
"context"
"fmt"
"sync"
"cloud.google.com/go/pubsub"
"google.golang.org/api/option"
"go.skia.org/infra/go/eventbus"
"go.skia.org/infra/go/sklog"
"go.skia.org/infra/go/util"
)
// codecMap holds codecs for the different event channels. Values are added
// via the RegisterCodec function.
var codecMap = sync.Map{}
// RegisterCodec defines a codec for the given event channel.
func RegisterCodec(channel string, codec util.LRUCodec) {
codecMap.Store(channel, codec)
}
// DistEventBus implements the eventbus.EventBus interfase on top of Cloud PubSub.
type distEventBus struct {
localEventBus eventbus.EventBus
client *pubsub.Client
clientID string
topic *pubsub.Topic
sub *pubsub.Subscription
wrapperCodec util.LRUCodec
}
// channelWrapper wraps each message to do channel multiplexing on top of a
// Cloud PubSub topic.
type channelWrapper struct {
Sender string `json:"sender"` // id of the sending node.
Channel string `json:"eventType"` // event channel of this message.
Data []byte `json:"data"` // payload encoded with the user supplied codec.
}
// New returns an instance of eventbus.EventBus that is a node in a distributed
// eventbus.
// Each instance is a node in a distributed event bus that allows to send events
// on an arbitrary number of channels.
// - projectID is the id of the GCP project where the PubSub topic should live.
// - topicName is the topic to use. It is assume that all message on this topic
// are messages of the
// event bus.
// - subscriberName is an id that uniquely identifies this node within the
// event bus network.
// - opts are the options used to create an authenticated PubSub client.
func New(projectID, topicName, subscriberName string, opts ...option.ClientOption) (eventbus.EventBus, error) {
ret := &distEventBus{
localEventBus: eventbus.New(),
wrapperCodec: util.JSONCodec(&channelWrapper{}),
}
// Create the client.
var err error
opts = append(opts, option.WithScopes(pubsub.ScopePubSub))
ret.client, err = pubsub.NewClient(context.Background(), projectID, opts...)
if err != nil {
return nil, sklog.FmtErrorf("Error creating pubsub client: %s", err)
}
// Set up the pubsub client, topic and subscription.
if err := ret.setupTopicSub(topicName, subscriberName); err != nil {
return nil, err
}
// Start the receiver.
ret.startReceiver()
return ret, nil
}
// Publish implements the eventbus.EventBus interface.
func (d *distEventBus) Publish(channel string, arg interface{}, globally bool) {
if globally {
// publish to pubsub in the background.
go func() {
codecInstance, ok := codecMap.Load(channel)
if !ok {
sklog.Errorf("Unable to publish on channel '%s'. No codec defined.", channel)
return
}
msg, err := d.encodeMsg(channel, arg, codecInstance.(util.LRUCodec))
if err != nil {
sklog.Errorf("Error encoding outgoing message: %s", err)
return
}
ctx := context.Background()
pubResult := d.topic.Publish(ctx, msg)
if _, err = pubResult.Get(ctx); err != nil {
sklog.Errorf("Error publishing message: %s", err)
return
}
}()
}
// Publish the event locally.
d.localEventBus.Publish(channel, arg, false)
}
// SubscribeAsync implements the eventbus.EventBus interface.
func (d *distEventBus) SubscribeAsync(eventType string, callback eventbus.CallbackFn) {
d.localEventBus.SubscribeAsync(eventType, callback)
}
// setupTopicSub sets up the topic and subscription.
func (d *distEventBus) setupTopicSub(topicName, subscriberName string) error {
ctx := context.Background()
// Create the topic if it doesn't exist yet.
d.topic = d.client.Topic(topicName)
if exists, err := d.topic.Exists(ctx); err != nil {
return err
} else if !exists {
if d.topic, err = d.client.CreateTopic(ctx, topicName); err != nil {
return fmt.Errorf("Error creating pubsub topic '%s': %s", topicName, err)
}
}
// Create the subscription if it doesn't exist.
subName := fmt.Sprintf("%s+%s", subscriberName, topicName)
d.sub = d.client.Subscription(subName)
if exists, err := d.sub.Exists(ctx); err != nil {
return fmt.Errorf("Error checking existence of pubsub subscription '%s': %s", subName, err)
} else if !exists {
d.sub, err = d.client.CreateSubscription(ctx, subName, pubsub.SubscriptionConfig{
Topic: d.topic,
})
if err != nil {
return fmt.Errorf("Error creating pubsub subscription '%s': %s", subName, err)
}
}
// Make the subscription also the id of this client.
d.clientID = subName
return nil
}
// startReceiver start a goroutine that processes incoming pubsub messages
// and fires events on this node.
func (d *distEventBus) startReceiver() {
go func() {
ctx := context.Background()
for {
err := d.sub.Receive(ctx, d.processReceivedMsg)
if err != nil {
sklog.Errorf("Error receiving message: %s", err)
continue
}
}
}()
}
// processReceivedMsg handles each pubsub message that arrives. It unwrapps the
// enclosed channelWrapper and dispatches the event in this process unless the
// received message was sent by this node.
func (d *distEventBus) processReceivedMsg(ctx context.Context, msg *pubsub.Message) {
defer msg.Ack()
wrapper, data, err := d.decodeMsg(msg)
if err != nil {
sklog.Errorf("Error decoding message: %s", err)
return
}
// Publish the event locally if it hasn't been sent by this instance.
if wrapper.Sender != d.clientID {
d.localEventBus.Publish(wrapper.Channel, data, true)
}
}
// decodeMsg unwraps the channelWrapper instance contained in the pubsub message
// and returns the deserialized payload as an instance of interface{}.
func (d *distEventBus) decodeMsg(msg *pubsub.Message) (*channelWrapper, interface{}, error) {
// Unwrap the payload if this was wrapped in a channel wrapper.
var wrapper *channelWrapper = nil
payload := msg.Data
var codec util.LRUCodec = nil
if d.wrapperCodec != nil {
tempWrapper, err := d.wrapperCodec.Decode(payload)
if err != nil {
return nil, nil, fmt.Errorf("Error decoding message wrapper: %s", err)
}
wrapper = tempWrapper.(*channelWrapper)
payload = wrapper.Data
codecInst, ok := codecMap.Load(wrapper.Channel)
if !ok {
return nil, nil, fmt.Errorf("Unable to decode message for channel '%s'. No codec registered.", wrapper.Channel)
}
codec = codecInst.(util.LRUCodec)
}
// Deserialize the payload.
data, err := codec.Decode(payload)
if err != nil {
return nil, nil, fmt.Errorf("Unable to decode payload of pubsub event: %s", err)
}
return wrapper, data, nil
}
// encodeMsg wraps the given payload into an instance of channelWrapper and
// creates the necessary pubsub message to send it to the cloud.
func (d *distEventBus) encodeMsg(channel string, data interface{}, codec util.LRUCodec) (*pubsub.Message, error) {
payload, err := codec.Encode(data)
if err != nil {
return nil, err
}
if d.wrapperCodec != nil {
wrapper := &channelWrapper{
Sender: d.sub.ID(),
Channel: channel,
Data: payload,
}
var err error
if payload, err = d.wrapperCodec.Encode(wrapper); err != nil {
return nil, err
}
}
return &pubsub.Message{
Data: payload,
}, nil
}