blob: b3ebdacc727fabe93c980fc6ec2f93687666da23 [file] [log] [blame]
package sink
import (
"context"
"encoding/json"
"fmt"
"math/rand"
"testing"
"time"
"cloud.google.com/go/pubsub"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.skia.org/infra/go/auth"
"go.skia.org/infra/go/testutils/unittest"
"go.skia.org/infra/machine/go/machine"
"go.skia.org/infra/machine/go/machineserver/config"
"google.golang.org/api/option"
)
func setupPubSubClient(t *testing.T) (context.Context, *pubsub.Client, *pubsub.Subscription, config.InstanceConfig) {
ctx := context.Background()
rand.Seed(time.Now().Unix())
instanceConfig := config.InstanceConfig{
Source: config.Source{
Project: "test-project",
Topic: fmt.Sprintf("sink-%d", rand.Int63()),
},
}
ts, err := auth.NewDefaultTokenSource(true, pubsub.ScopePubSub)
require.NoError(t, err)
pubsubClient, err := pubsub.NewClient(ctx, instanceConfig.Source.Project, option.WithTokenSource(ts))
require.NoError(t, err)
// Create the topic.
topic := pubsubClient.Topic(instanceConfig.Source.Topic)
ok, err := topic.Exists(ctx)
require.NoError(t, err)
if !ok {
topic, err = pubsubClient.CreateTopic(ctx, instanceConfig.Source.Topic)
}
topic.Stop()
assert.NoError(t, err)
sub, err := pubsubClient.CreateSubscription(ctx, instanceConfig.Source.Topic, pubsub.SubscriptionConfig{
Topic: topic,
})
require.NoError(t, err)
return ctx, pubsubClient, sub, instanceConfig
}
func TestSink(t *testing.T) {
unittest.ManualTest(t)
unittest.RequiresPubSubEmulator(t)
ctx, _, sub, instanceConfig := setupPubSubClient(t)
// Create new sink.
s, err := New(ctx, true, instanceConfig)
require.NoError(t, err)
// Create event to send.
event := machine.Event{
EventType: machine.EventTypeRawState,
Android: machine.Android{
GetProp: `
[ro.product.manufacturer]: [asus]
[ro.product.model]: [Nexus 7]
[ro.product.name]: [razor]
`,
},
Host: machine.Host{
Name: "my-machine-id",
},
}
// Send the event.
err = s.Send(ctx, event)
require.NoError(t, err)
// Confirm that the event was sent correctly.
called := false
cancellableCtx, cancel := context.WithCancel(ctx)
err = sub.Receive(cancellableCtx, func(ctx context.Context, m *pubsub.Message) {
// cancel so sub.Receive returns.
cancel()
called = true
m.Ack()
var receivedEvent machine.Event
err := json.Unmarshal(m.Data, &receivedEvent)
require.NoError(t, err)
assert.Equal(t, receivedEvent, event)
})
require.NoError(t, err)
assert.True(t, called)
}