blob: 98ff9172b158f9d1bfd94f1fd25f661489b8783a [file] [log] [blame]
// Package common has functions useful across its peer modules.
package common
import (
"context"
"cloud.google.com/go/pubsub"
"go.skia.org/infra/go/auth"
"go.skia.org/infra/go/skerr"
"go.skia.org/infra/machine/go/machineserver/config"
"google.golang.org/api/option"
)
// NewPubSubClient creates a new pubsub client from the given config and also
// creates the associated topic specified in the instance config.
func NewPubSubClient(ctx context.Context, local bool, instanceConfig config.InstanceConfig) (*pubsub.Client, *pubsub.Topic, error) {
ts, err := auth.NewDefaultTokenSource(local, pubsub.ScopePubSub)
if err != nil {
return nil, nil, skerr.Wrapf(err, "Failed to create token source.")
}
pubsubClient, err := pubsub.NewClient(ctx, instanceConfig.Source.Project, option.WithTokenSource(ts))
if err != nil {
return nil, nil, skerr.Wrapf(err, "Failed to create PubSub client for project %s", instanceConfig.Source.Project)
}
topic := pubsubClient.Topic(instanceConfig.Source.Topic)
exists, err := topic.Exists(ctx)
if err != nil {
return nil, nil, skerr.Wrapf(err, "Failed to check existence of PubSub topic %q", topic.ID())
}
if !exists {
if _, err := pubsubClient.CreateTopic(ctx, topic.ID()); err != nil {
return nil, nil, skerr.Wrapf(err, "Failed to create PubSub topic %q", topic.ID())
}
}
return pubsubClient, topic, nil
}