blob: 46be5d2770ca5c2c4278492e46cd1364dd7b2a6f [file] [log] [blame]
package pubsub
import (
"context"
"fmt"
"sync"
"cloud.google.com/go/pubsub"
"go.skia.org/infra/go/cleanup"
"go.skia.org/infra/go/gitstore/bt_gitstore"
"go.skia.org/infra/go/skerr"
"go.skia.org/infra/go/sklog"
"golang.org/x/oauth2"
"google.golang.org/api/option"
)
const (
// Auth scope required to use this package.
AUTH_SCOPE = pubsub.ScopePubSub
// Template used for building pubsub topic names.
TOPIC_TMPL = "gitstore-%s-%s-%d"
)
// topicName returns the pubsub topic name for the given BT instance and repo.
func topicName(btInstance, btTable string, repoID int64) string {
return fmt.Sprintf(TOPIC_TMPL, btInstance, btTable, repoID)
}
// client is a struct used for common setup between Publisher and Subscriber.
type client struct {
client *pubsub.Client
topic *pubsub.Topic
}
// newClient returns a client instance, creating the PubSub topic if requested.
func newClient(ctx context.Context, btConf *bt_gitstore.BTConfig, repoID int64, ts oauth2.TokenSource, createTopic bool) (*client, error) {
c, err := pubsub.NewClient(ctx, btConf.ProjectID, option.WithTokenSource(ts))
if err != nil {
return nil, skerr.Wrapf(err, "failed to create PubSub client for project %s", btConf.ProjectID)
}
t := c.Topic(topicName(btConf.InstanceID, btConf.TableID, repoID))
exists, err := t.Exists(ctx)
if err != nil {
return nil, skerr.Wrapf(err, "failed to check existence of PubSub topic %q", t.ID())
}
if !exists {
if !createTopic {
return nil, skerr.Fmt("PubSub topic %q does not exist; verify that the requested repo is being ingested: %d", t.ID(), repoID)
}
if _, err := c.CreateTopic(ctx, t.ID()); err != nil {
return nil, skerr.Wrapf(err, "failed to create PubSub topic %q for %d", t.ID(), repoID)
}
}
return &client{
client: c,
topic: t,
}, nil
}
// Publisher is a struct used for publishing pubsub messages for a GitStore.
type Publisher struct {
*client
queued sync.WaitGroup
}
// NewPublisher returns a Publisher instance associated with the given GitStore.
func NewPublisher(ctx context.Context, btConf *bt_gitstore.BTConfig, repoID int64, ts oauth2.TokenSource) (*Publisher, error) {
client, err := newClient(ctx, btConf, repoID, ts, true)
if err != nil {
return nil, skerr.Wrapf(err, "Failed to create GitStore PubSub publisher")
}
p := &Publisher{
client: client,
}
cleanup.AtExit(func() {
p.Wait()
})
return p, nil
}
// Publish a pubsub message with the given updated branch heads. Typically, only
// the branch heads which have changed should be included. The message is sent
// asynchronously.
func (p *Publisher) Publish(ctx context.Context, branches map[string]string) {
res := p.topic.Publish(ctx, &pubsub.Message{
// TODO(borenet): Is it valid to add arbitrary data to this
// field? The docs do not indicate otherwise, and this is more
// convenient than having to encode/decode the map ourselves.
Attributes: branches,
})
p.queued.Add(1)
go func() {
defer p.queued.Done()
if _, err := res.Get(ctx); err != nil {
sklog.Errorf("Failed to send pubsub message: %s", err)
}
}()
}
// Wait for all pubsub messages to be sent.
func (p *Publisher) Wait() {
sklog.Info("Waiting for pubsub messages to be sent...")
p.queued.Wait()
sklog.Info("All pubsub messages have been sent.")
}
// NewSubscriber creates a pubsub subscription associated with the given
// GitStore and calls the given function whenever a message is received. The
// parameters to the callback function are the message itself and the branch
// heads as of the time that the message was sent, with names as keys and commit
// hashes as values. The callback function is responsible for calling Ack() or
// Nack() on the message.
func NewSubscriber(ctx context.Context, btConf *bt_gitstore.BTConfig, subscriberID string, repoID int64, ts oauth2.TokenSource, callback func(*pubsub.Message, map[string]string)) error {
c, err := newClient(ctx, btConf, repoID, ts, false)
if err != nil {
return skerr.Wrapf(err, "Failed to create GitStore PubSub subscriber")
}
sub := c.client.Subscription(c.topic.ID() + "_" + subscriberID)
exists, err := sub.Exists(ctx)
if err != nil {
return skerr.Wrapf(err, "Failed to check existence of PubSub subscription %q", sub.ID())
}
if !exists {
_, err := c.client.CreateSubscription(ctx, sub.ID(), pubsub.SubscriptionConfig{
Topic: c.topic,
})
if err != nil {
return skerr.Wrapf(err, "Failed to create PubSub subscription %q", sub.ID())
}
}
go func() {
if err := sub.Receive(ctx, func(ctx context.Context, m *pubsub.Message) {
select {
case <-ctx.Done():
sklog.Warning("Received pubsub message but the context has been canceled.")
m.Nack()
default:
callback(m, m.Attributes)
}
}); err != nil {
sklog.Errorf("Pubsub subscription (ID %q) receive failed: %s", sub.ID(), err)
}
}()
return nil
}