blob: ef3723afbf6edde2e6eef10b24085e8689a40418 [file] [log] [blame]
// Package pubsub contains utilities for working with Cloud PubSub.
package pubsub
import (
"context"
"os"
"time"
"cloud.google.com/go/iam"
"cloud.google.com/go/pubsub"
"go.skia.org/infra/go/emulators"
"go.skia.org/infra/go/skerr"
"google.golang.org/api/option"
)
// ScopePubSub aliases the same constant from the pubsub package to prevent
// callers from needing to import both packages.
const ScopePubSub = pubsub.ScopePubSub
// EnsureNotEmulator panics if the PubSub emulator environment variable is set.
func EnsureNotEmulator() {
if os.Getenv(string(emulators.PubSub)) != "" {
panic("PubSub Emulator detected. Be sure to unset the following environment variable: " + emulators.GetEmulatorHostEnvVarName(emulators.PubSub))
}
}
// Client represents a pubsub.Client in an interface which can be mocked for
// testing.
type Client interface {
Close() error
CreateSubscription(ctx context.Context, id string, cfg pubsub.SubscriptionConfig) (Subscription, error)
CreateTopic(ctx context.Context, topicID string) (Topic, error)
CreateTopicWithConfig(ctx context.Context, topicID string, tc *pubsub.TopicConfig) (Topic, error)
DetachSubscription(ctx context.Context, sub string) (*pubsub.DetachSubscriptionResult, error)
Project() string
Snapshot(id string) Snapshot
Snapshots(ctx context.Context) *pubsub.SnapshotConfigIterator
Subscription(id string) Subscription
SubscriptionInProject(id, projectID string) Subscription
Subscriptions(ctx context.Context) *pubsub.SubscriptionIterator
Topic(id string) Topic
TopicInProject(id, projectID string) Topic
Topics(ctx context.Context) *pubsub.TopicIterator
}
// Snapshot represents a pubsub.Snapshot in an interface which can be mocked for
// testing.
type Snapshot interface {
Delete(ctx context.Context) error
ID() string
SetLabels(ctx context.Context, label map[string]string) (*pubsub.SnapshotConfig, error)
}
// Subscription represents a pubsub.Subscription in an interface which can be
// mocked for testing.
type Subscription interface {
Config(ctx context.Context) (pubsub.SubscriptionConfig, error)
CreateSnapshot(ctx context.Context, name string) (*pubsub.SnapshotConfig, error)
Delete(ctx context.Context) error
Exists(ctx context.Context) (bool, error)
IAM() *iam.Handle
ID() string
Receive(ctx context.Context, f func(context.Context, *pubsub.Message)) error
SeekToSnapshot(ctx context.Context, snap Snapshot) error
SeekToTime(ctx context.Context, t time.Time) error
String() string
Update(ctx context.Context, cfg pubsub.SubscriptionConfigToUpdate) (pubsub.SubscriptionConfig, error)
}
// Topic represents a pubsub.Topic in an interface which can be mocked for
// testing.
type Topic interface {
Config(ctx context.Context) (pubsub.TopicConfig, error)
Delete(ctx context.Context) error
Exists(ctx context.Context) (bool, error)
Flush()
IAM() *iam.Handle
ID() string
Publish(ctx context.Context, msg *pubsub.Message) PublishResult
ResumePublish(orderingKey string)
Stop()
String() string
Subscriptions(ctx context.Context) *pubsub.SubscriptionIterator
Update(ctx context.Context, cfg pubsub.TopicConfigToUpdate) (pubsub.TopicConfig, error)
}
// PublishResult represents a pubsub.PublishResult in an interface which can be
// mocked for testing.
type PublishResult interface {
Get(ctx context.Context) (serverID string, err error)
Ready() <-chan struct{}
}
// WrappedClient implements Client by wrapping a pubsub.Client.
type WrappedClient struct {
*pubsub.Client
}
// NewClient creates a new WrappedClient.
func NewClient(ctx context.Context, projectID string, opts ...option.ClientOption) (*WrappedClient, error) {
client, err := pubsub.NewClient(ctx, projectID, opts...)
if err != nil {
return nil, skerr.Wrap(err)
}
return &WrappedClient{client}, nil
}
// NewClientWithConfig creates a new WrappedClient with the given config..
func NewClientWithConfig(ctx context.Context, projectID string, config *pubsub.ClientConfig, opts ...option.ClientOption) (*WrappedClient, error) {
client, err := pubsub.NewClientWithConfig(ctx, projectID, config, opts...)
if err != nil {
return nil, skerr.Wrap(err)
}
return &WrappedClient{client}, nil
}
// CreateSubscription implements Client.
func (w *WrappedClient) CreateSubscription(ctx context.Context, id string, cfg pubsub.SubscriptionConfig) (Subscription, error) {
sub, err := w.Client.CreateSubscription(ctx, id, cfg)
if err != nil {
return nil, skerr.Wrap(err)
}
return &WrappedSubscription{sub}, nil
}
// CreateTopic implements Client.
func (w *WrappedClient) CreateTopic(ctx context.Context, topicID string) (Topic, error) {
topic, err := w.Client.CreateTopic(ctx, topicID)
if err != nil {
return nil, skerr.Wrap(err)
}
return &WrappedTopic{topic}, nil
}
// CreateTopicWithConfig implements Client.
func (w *WrappedClient) CreateTopicWithConfig(ctx context.Context, topicID string, tc *pubsub.TopicConfig) (Topic, error) {
topic, err := w.Client.CreateTopicWithConfig(ctx, topicID, tc)
if err != nil {
return nil, skerr.Wrap(err)
}
return &WrappedTopic{topic}, nil
}
// Snapshot implements Client.
func (w *WrappedClient) Snapshot(id string) Snapshot {
return w.Client.Snapshot(id)
}
// Subscription implements Client.
func (w *WrappedClient) Subscription(id string) Subscription {
return &WrappedSubscription{w.Client.Subscription(id)}
}
// SubscriptionInProject implements Client.
func (w *WrappedClient) SubscriptionInProject(id, projectID string) Subscription {
return &WrappedSubscription{w.Client.SubscriptionInProject(id, projectID)}
}
// Topic implements Client.
func (w *WrappedClient) Topic(id string) Topic {
return &WrappedTopic{w.Client.Topic(id)}
}
// TopicInProject implements Client.
func (w *WrappedClient) TopicInProject(id, projectID string) Topic {
return &WrappedTopic{w.Client.TopicInProject(id, projectID)}
}
// WrappedSubscription implements Subscription by wrapping a
// pubsub.Subscription.
type WrappedSubscription struct {
*pubsub.Subscription
}
// SeekToSnapshot implements Client.
func (w *WrappedSubscription) SeekToSnapshot(ctx context.Context, snap Snapshot) error {
wrapped, ok := snap.(*pubsub.Snapshot)
if !ok {
return skerr.Fmt("expected a pubsub.Snapshot")
}
return w.Subscription.SeekToSnapshot(ctx, wrapped)
}
// WrappedTopic implements Topic by wrapping a pubsub.Topic.
type WrappedTopic struct {
*pubsub.Topic
}
// Publish implements Client.
func (w *WrappedTopic) Publish(ctx context.Context, msg *pubsub.Message) PublishResult {
return w.Topic.Publish(ctx, msg)
}
// Assert that we implement the interfaces.
var _ Client = &WrappedClient{}
var _ Subscription = &WrappedSubscription{}
var _ Topic = &WrappedTopic{}