blob: 25a1dd6d65cb32eb5d6815492d82c78d778d6e6b [file] [log] [blame]
package main
import (
"bufio"
"context"
"encoding/json"
"flag"
"fmt"
"io"
"os"
"sync"
"time"
"cloud.google.com/go/pubsub"
"go.skia.org/infra/go/common"
"go.skia.org/infra/go/sklog"
"go.skia.org/infra/go/util"
)
var (
project = flag.String("project_id", "", "GCE Project ID")
topicName = flag.String("topic", "", "Pubsub topic name")
file = flag.String("file", "", "File from which to read/write pubsub messages.")
record = flag.Bool("record", false, "If true, record pubsub messages and write them to the given file.")
playback = flag.Bool("playback", false, "If true, read messages from the given file and send them via pubsub.")
waitForInput = flag.Bool("wait_for_input", false, "If true, wait for user input before playing back each message in playback mode.")
)
func main() {
// Initial setup.
common.Init()
if *project == "" {
sklog.Fatal("--project_id is required.")
}
if *topicName == "" {
sklog.Fatal("--topic is required.")
}
if *file == "" {
sklog.Fatal("--file is required.")
}
if *record == *playback {
sklog.Fatal("Exactly one of --playback or --record is required.")
}
// Setup pubsub.
ctx := context.Background()
client, err := pubsub.NewClient(ctx, *project)
if err != nil {
sklog.Fatal(err)
}
topic := client.Topic(*topicName)
if exists, err := topic.Exists(ctx); err != nil {
sklog.Fatal(err)
} else if !exists {
topic, err = client.CreateTopic(ctx, *topicName)
if err != nil {
sklog.Fatal(err)
}
}
// Perform the desired action.
if *record {
doRecord(ctx, client, *file)
} else {
doPlayback(ctx, client, *file)
}
}
// doRecord records pubsub messages until the process is interrupted.
func doRecord(ctx context.Context, client *pubsub.Client, file string) {
// Create a subscription.
hostname, err := os.Hostname()
if err != nil {
sklog.Fatal(err)
}
subName := fmt.Sprintf("pubsub_recorder_%s", hostname)
sub := client.Subscription(subName)
if exists, err := sub.Exists(ctx); err != nil {
sklog.Fatal(err)
} else if exists {
if err := sub.Delete(ctx); err != nil {
sklog.Fatal(err)
}
}
sub, err = client.CreateSubscription(ctx, subName, pubsub.SubscriptionConfig{
Topic: client.Topic(*topicName),
AckDeadline: 10 * time.Second,
})
if err != nil {
sklog.Fatal(err)
}
defer func() {
if err := sub.Delete(ctx); err != nil {
sklog.Fatalf("Failed to delete subscription: %s", err)
}
}()
// Receive messages one at a time and write them to the file. Use a
// mutex to serialize writes to the backing file.
sklog.Infof("Waiting to receive pubsub messages.")
msgs := []*pubsub.Message{}
var mtx sync.Mutex
if err := sub.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
mtx.Lock()
defer mtx.Unlock()
sklog.Infof("Processing pubsub message %s", msg.ID)
msgs = append(msgs, msg)
if err := util.WithWriteFile(file, func(w io.Writer) error {
return json.NewEncoder(w).Encode(msgs)
}); err != nil {
sklog.Fatal(err)
}
msg.Ack()
}); err != nil {
sklog.Fatal(err)
}
}
// doPlayback plays back all pubsub messages in the given file.
func doPlayback(ctx context.Context, client *pubsub.Client, file string) {
topic := client.Topic(*topicName)
defer topic.Stop()
b, err := os.ReadFile(file)
if err != nil {
sklog.Fatal(err)
}
var msgs []*pubsub.Message
if err := json.Unmarshal(b, &msgs); err != nil {
sklog.Fatal(err)
}
for _, msg := range msgs {
if *waitForInput {
sklog.Infof("Press enter key to send next message...")
if _, err := bufio.NewReader(os.Stdin).ReadBytes('\n'); err != nil {
sklog.Fatal(err)
}
}
sklog.Infof("Publishing message %s", msg.ID)
if _, err := topic.Publish(ctx, msg).Get(ctx); err != nil {
sklog.Fatal(err)
}
}
}