blob: 9db33bcfe475852445aeafec1860ebd75ce8c62f [file] [log] [blame]
// Package pubsubsource implements source.Source using Google Cloud PubSub.
package pubsubsource
import (
"context"
"encoding/json"
"fmt"
"math/rand"
"testing"
"time"
"cloud.google.com/go/pubsub"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.skia.org/infra/go/auth"
"go.skia.org/infra/go/testutils/unittest"
"go.skia.org/infra/machine/go/machine"
"go.skia.org/infra/machine/go/machineserver/config"
"google.golang.org/api/option"
)
func setupPubSubClient(t *testing.T) (context.Context, *pubsub.Client, config.InstanceConfig) {
ctx := context.Background()
rand.Seed(time.Now().Unix())
instanceConfig := config.InstanceConfig{
Source: config.Source{
Project: "test-project",
Topic: fmt.Sprintf("events-%d", rand.Int63()),
},
}
ts, err := auth.NewDefaultTokenSource(true, pubsub.ScopePubSub)
require.NoError(t, err)
pubsubClient, err := pubsub.NewClient(ctx, instanceConfig.Source.Project, option.WithTokenSource(ts))
require.NoError(t, err)
// Create the topic.
topic := pubsubClient.Topic(instanceConfig.Source.Topic)
ok, err := topic.Exists(ctx)
require.NoError(t, err)
if !ok {
topic, err = pubsubClient.CreateTopic(ctx, instanceConfig.Source.Topic)
}
topic.Stop()
assert.NoError(t, err)
return ctx, pubsubClient, instanceConfig
}
func sendPubSubMessages(ctx context.Context, t *testing.T, pubsubClient *pubsub.Client, instanceConfig config.InstanceConfig) {
topic := pubsubClient.Topic(instanceConfig.Source.Topic)
msg := &pubsub.Message{
Data: []byte("This isn't valid JSON."),
}
res := topic.Publish(ctx, msg)
// Wait for the message to be sent.
_, err := res.Get(ctx)
require.NoError(t, err)
// Now publish a good message.
b, err := json.Marshal(machine.Event{
Host: machine.Host{
Name: "skia-rpi2-rack4-shelf1-001",
},
})
require.NoError(t, err)
msg = &pubsub.Message{
Data: b,
}
res = topic.Publish(ctx, msg)
// Wait for the message to be sent.
_, err = res.Get(ctx)
require.NoError(t, err)
topic.Stop()
}
func TestStart(t *testing.T) {
unittest.ManualTest(t)
unittest.RequiresPubSubEmulator(t)
ctx, pubsubClient, instanceConfig := setupPubSubClient(t)
ctx, cancel := context.WithCancel(ctx)
// Create source and call Start.
source, err := New(ctx, true, instanceConfig)
require.NoError(t, err)
ch, err := source.Start(ctx)
require.NoError(t, err)
sendPubSubMessages(ctx, t, pubsubClient, instanceConfig)
// Load the one file sendPubSubMessages should have sent.
event := <-ch
// Now cancel the context and wait for channel to close.
cancel()
for range ch {
}
assert.Equal(t, "skia-rpi2-rack4-shelf1-001", event.Host.Name)
assert.Equal(t, int64(2), source.eventsReceivedCounter.Get())
assert.Equal(t, int64(1), source.eventsFailedToParseCounter.Get())
assert.NoError(t, pubsubClient.Close())
}
func TestStart_SecondCallToStartFails(t *testing.T) {
unittest.ManualTest(t)
unittest.RequiresPubSubEmulator(t)
ctx, pubsubClient, instanceConfig := setupPubSubClient(t)
// Create source and call Start.
source, err := New(ctx, true, instanceConfig)
require.NoError(t, err)
_, err = source.Start(ctx)
require.NoError(t, err)
_, err = source.Start(ctx)
require.Error(t, err)
assert.NoError(t, pubsubClient.Close())
}