blob: 2183588842f2eb4de60af548cfb8fe3b28c0ec14 [file] [log] [blame]
package gcssource
import (
"context"
"encoding/json"
"io/ioutil"
"math/rand"
"testing"
"time"
"cloud.google.com/go/pubsub"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.skia.org/infra/go/auth"
"go.skia.org/infra/go/testutils/unittest"
"go.skia.org/infra/perf/go/config"
"go.skia.org/infra/perf/go/file"
"google.golang.org/api/option"
)
const (
// testFile is a real file that we can read.
testFile = "gs://skia-infra/testdata/perf-files-gcssource-test.json"
)
func setupPubSubClient(t *testing.T) (*pubsub.Client, *config.InstanceConfig) {
ctx := context.Background()
rand.Seed(time.Now().Unix())
instanceConfig := &config.InstanceConfig{
IngestionConfig: config.IngestionConfig{
SourceConfig: config.SourceConfig{
Project: "skia-public",
Topic: "perf-file-source-test",
Sources: []string{"gs://skia-infra/testdata/"},
},
},
}
ts, err := auth.NewDefaultTokenSource(true, pubsub.ScopePubSub)
require.NoError(t, err)
pubsubClient, err := pubsub.NewClient(ctx, instanceConfig.IngestionConfig.SourceConfig.Project, option.WithTokenSource(ts))
require.NoError(t, err)
// Create the topic.
topic := pubsubClient.Topic(instanceConfig.IngestionConfig.SourceConfig.Topic)
ok, err := topic.Exists(ctx)
require.NoError(t, err)
if !ok {
topic, err = pubsubClient.CreateTopic(ctx, instanceConfig.IngestionConfig.SourceConfig.Topic)
}
topic.Stop()
assert.NoError(t, err)
return pubsubClient, instanceConfig
}
func sendPubSubMessages(ctx context.Context, t *testing.T, pubsubClient *pubsub.Client, instanceConfig *config.InstanceConfig) {
topic := pubsubClient.Topic(instanceConfig.IngestionConfig.SourceConfig.Topic)
// Publish two messages that looks like the arrival of a file, the first
// doesn't match Sources and so shouldn't get through.
b, err := json.Marshal(pubSubEvent{
Bucket: "skia-infra",
Name: "some-random-location/somefile.json",
})
require.NoError(t, err)
msg := &pubsub.Message{
Data: b,
}
res := topic.Publish(ctx, msg)
// Wait for the message to be sent.
_, err = res.Get(ctx)
require.NoError(t, err)
// Now publish a good message.
b, err = json.Marshal(pubSubEvent{
Bucket: "skia-infra",
Name: "testdata/perf-files-gcssource-test.json",
})
require.NoError(t, err)
msg = &pubsub.Message{
Data: b,
}
res = topic.Publish(ctx, msg)
// Wait for the message to be sent.
_, err = res.Get(ctx)
require.NoError(t, err)
}
func TestStart_ReceiveOneFileFilterOneFile(t *testing.T) {
unittest.ManualTest(t)
ctx := context.Background()
// Set up test.
pubsubClient, instanceConfig := setupPubSubClient(t)
// Send two events, but only one that is valid.
sendPubSubMessages(ctx, t, pubsubClient, instanceConfig)
// Create source and call Start.
source, err := New(ctx, instanceConfig, true)
require.NoError(t, err)
ch, err := source.Start(ctx)
require.NoError(t, err)
// Load the one file sendPubSubMessages should have sent.
file := <-ch
assert.Equal(t, testFile, file.Name)
b, err := ioutil.ReadAll(file.Contents)
assert.NoError(t, err)
assert.NoError(t, file.Contents.Close())
assert.Equal(t, "{\n \"status\": \"Success\"\n}\n", string(b))
}
func TestStart_SecondCallToStartFails(t *testing.T) {
unittest.ManualTest(t)
ctx := context.Background()
// Set up test.
_, instanceConfig := setupPubSubClient(t)
// Create source and call Start.
source, err := New(ctx, instanceConfig, true)
require.NoError(t, err)
_, err = source.Start(ctx)
require.NoError(t, err)
// A second call to Start should fail.
_, err = source.Start(ctx)
require.Error(t, err)
}
func TestReceiveSingleEvent_Success(t *testing.T) {
unittest.ManualTest(t)
ctx := context.Background()
// Set up test.
_, instanceConfig := setupPubSubClient(t)
// Create source.
source, err := New(ctx, instanceConfig, true)
// Swap out channel for one we control.
ch := make(chan file.File, 1)
source.fileChannel = ch
require.NoError(t, err)
// Create an encoded message that points to a real GCS file.
b, err := json.Marshal(pubSubEvent{
Bucket: "skia-infra",
Name: "testdata/perf-files-gcssource-test.json",
})
require.NoError(t, err)
msg := &pubsub.Message{
Data: b,
}
assert.True(t, source.receiveSingleEvent(ctx, msg))
// Confirm the correct file.File comes out of the channel.
file := <-ch
assert.Equal(t, testFile, file.Name)
b, err = ioutil.ReadAll(file.Contents)
assert.NoError(t, err)
assert.NoError(t, file.Contents.Close())
assert.Equal(t, "{\n \"status\": \"Success\"\n}\n", string(b))
}
func TestReceiveSingleEvent_FileDoesntExist(t *testing.T) {
unittest.ManualTest(t)
ctx := context.Background()
// Set up test.
_, instanceConfig := setupPubSubClient(t)
// Create source.
source, err := New(ctx, instanceConfig, true)
require.NoError(t, err)
// Create an encoded message that points to a non-existent GCS file.
b, err := json.Marshal(pubSubEvent{
Bucket: "skia-infra",
Name: "testdata/some-file-that-doesnt-exist.json",
})
require.NoError(t, err)
msg := &pubsub.Message{
Data: b,
}
assert.False(t, source.receiveSingleEvent(ctx, msg))
}
func TestReceiveSingleEvent_InvalidJSONInMessage(t *testing.T) {
unittest.ManualTest(t)
ctx := context.Background()
// Set up test.
_, instanceConfig := setupPubSubClient(t)
// Create source.
source, err := New(ctx, instanceConfig, true)
require.NoError(t, err)
// Send a message that isn't valid JSON.
msg := &pubsub.Message{
Data: []byte("this isn't valid json"),
}
assert.True(t, source.receiveSingleEvent(ctx, msg))
}