[perf] Send pubsub events as ingestion completes.
Change-Id: Iba5f48f22c662c85478eb80d29d45ed06ec58dee
Reviewed-on: https://skia-review.googlesource.com/c/buildbot/+/279418
Reviewed-by: Joe Gregorio <jcgregorio@google.com>
Commit-Queue: Joe Gregorio <jcgregorio@google.com>
diff --git a/go/testutils/unittest/unittest.go b/go/testutils/unittest/unittest.go
index 54f0ac6..4326d35 100644
--- a/go/testutils/unittest/unittest.go
+++ b/go/testutils/unittest/unittest.go
@@ -123,6 +123,24 @@
}
}
+// RequiresPubSubEmulator is a function that documents a unittest requires the
+// PubSub Emulator and checks that the appropriate environment variable is set.
+func RequiresPubSubEmulator(t sktest.TestingT) {
+ s := os.Getenv("PUBSUB_EMULATOR_HOST")
+ if s == "" {
+ t.Fatal(`This test requires the PubSub emulator, which you can start with
+
+ docker run -ti -p 8010:8010 google/cloud-sdk:latest gcloud beta emulators pubsub start \
+ --project test-project --host-port 0.0.0.0:8010
+
+and then set the environment:
+
+ export PUBSUB_EMULATOR_HOST=localhost:8010
+
+`)
+ }
+}
+
// LinuxOnlyTest is a function which should be called at the beginning of a test
// which should only run on Linux.
func LinuxOnlyTest(t sktest.TestingT) {
diff --git a/perf/Makefile b/perf/Makefile
index ef4bf4e..c779546 100644
--- a/perf/Makefile
+++ b/perf/Makefile
@@ -70,6 +70,10 @@
docker run -ti -p 8000:8000 google/cloud-sdk:latest gcloud beta emulators bigtable start \
--project test-project --host-port 0.0.0.0:8000
+start_pubsub_emulator:
+ docker run -ti -p 8000:8000 google/cloud-sdk:latest gcloud beta emulators pubsub start \
+ --project test-project --host-port 0.0.0.0:8010
+
start_datastore_emulator:
docker run -ti -p 8008:8008 google/cloud-sdk:latest gcloud beta emulators datastore start --no-store-on-disk --project test-project --host-port 0.0.0.0:8008
diff --git a/perf/go/file/gcssource/gcssource_test.go b/perf/go/file/gcssource/gcssource_test.go
index 2183588..52e47d3 100644
--- a/perf/go/file/gcssource/gcssource_test.go
+++ b/perf/go/file/gcssource/gcssource_test.go
@@ -24,6 +24,7 @@
)
func setupPubSubClient(t *testing.T) (*pubsub.Client, *config.InstanceConfig) {
+ unittest.RequiresPubSubEmulator(t)
ctx := context.Background()
rand.Seed(time.Now().Unix())
diff --git a/perf/go/ingest/process/process.go b/perf/go/ingest/process/process.go
index a7f5a08..5c84171 100644
--- a/perf/go/ingest/process/process.go
+++ b/perf/go/ingest/process/process.go
@@ -5,16 +5,54 @@
"context"
"time"
+ "cloud.google.com/go/pubsub"
+ "go.skia.org/infra/go/auth"
"go.skia.org/infra/go/metrics2"
"go.skia.org/infra/go/paramtools"
+ "go.skia.org/infra/go/query"
"go.skia.org/infra/go/skerr"
"go.skia.org/infra/go/sklog"
"go.skia.org/infra/perf/go/builders"
"go.skia.org/infra/perf/go/config"
perfgit "go.skia.org/infra/perf/go/git"
"go.skia.org/infra/perf/go/ingest/parser"
+ "go.skia.org/infra/perf/go/ingestevents"
+ "google.golang.org/api/option"
)
+// sendPubSubEvent sends the unencoded params and paramset found in a single
+// ingested file to the PubSub topic specified in the selected Perf instances
+// configuration data.
+func sendPubSubEvent(pubSubClient *pubsub.Client, topicName string, params []paramtools.Params, paramset paramtools.ParamSet, filename string) error {
+ if topicName == "" {
+ return nil
+ }
+ traceIDs := make([]string, 0, len(params))
+ for _, p := range params {
+ key, err := query.MakeKey(p)
+ if err != nil {
+ continue
+ }
+ traceIDs = append(traceIDs, key)
+ }
+ ie := &ingestevents.IngestEvent{
+ TraceIDs: traceIDs,
+ ParamSet: paramset,
+ Filename: filename,
+ }
+ body, err := ingestevents.CreatePubSubBody(ie)
+ if err != nil {
+ return skerr.Wrapf(err, "Failed to encode PubSub body for topic: %q", topicName)
+ }
+ msg := &pubsub.Message{
+ Data: body,
+ }
+ ctx := context.Background()
+ _, err = pubSubClient.Topic(topicName).Publish(ctx, msg).Get(ctx)
+
+ return skerr.Wrap(err)
+}
+
// Start a single go routine to process incoming ingestion files and write
// the data they contain to a trace store.
//
@@ -28,6 +66,19 @@
failedToWrite := metrics2.GetCounter("perfserver_ingest_failed_to_write")
successfulWrite := metrics2.GetCounter("perfserver_ingest_successful_write")
+ var pubSubClient *pubsub.Client
+ if instanceConfig.IngestionConfig.FileIngestionTopicName != "" {
+ ts, err := auth.NewDefaultTokenSource(false, pubsub.ScopePubSub)
+ if err != nil {
+ sklog.Fatalf("Failed to create TokenSource: %s", err)
+ }
+
+ pubSubClient, err = pubsub.NewClient(ctx, instanceConfig.IngestionConfig.SourceConfig.Project, option.WithTokenSource(ts))
+ if err != nil {
+ sklog.Fatal(err)
+ }
+ }
+
// New file.Source.
source, err := builders.NewSourceFromConfig(ctx, instanceConfig, false)
ch, err := source.Start(ctx)
@@ -84,6 +135,12 @@
sklog.Error("Failed to write %v: %s", f, err)
}
successfulWrite.Inc(1)
+
+ if err := sendPubSubEvent(pubSubClient, instanceConfig.IngestionConfig.FileIngestionTopicName, params, ps, f.Name); err != nil {
+ sklog.Errorf("Failed to send pubsub event: %s", err)
+ } else {
+ sklog.Info("FileIngestionTopicName pubsub message sent.")
+ }
}
sklog.Infof("Exited while waiting on files. Should only happen on source_type=dir.")
return nil
diff --git a/perf/go/ingest/process/process_test.go b/perf/go/ingest/process/process_test.go
index 94e62fc..fb7e0d5 100644
--- a/perf/go/ingest/process/process_test.go
+++ b/perf/go/ingest/process/process_test.go
@@ -3,17 +3,58 @@
import (
"context"
+ "fmt"
"io/ioutil"
+ "math/rand"
"os"
+ "sync"
"testing"
+ "time"
+ "cloud.google.com/go/pubsub"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
+ "go.skia.org/infra/go/auth"
"go.skia.org/infra/go/metrics2"
+ "go.skia.org/infra/go/paramtools"
"go.skia.org/infra/go/testutils/unittest"
"go.skia.org/infra/perf/go/config"
+ "go.skia.org/infra/perf/go/ingestevents"
+ "google.golang.org/api/option"
)
+func setupPubSubClient(t *testing.T) (*pubsub.Client, *config.InstanceConfig) {
+ unittest.RequiresPubSubEmulator(t)
+ ctx := context.Background()
+
+ rand.Seed(time.Now().Unix())
+ instanceConfig := &config.InstanceConfig{
+ IngestionConfig: config.IngestionConfig{
+ SourceConfig: config.SourceConfig{
+ Project: "test-project",
+ },
+ FileIngestionTopicName: fmt.Sprintf("some-topic-%d", rand.Uint64()),
+ },
+ }
+
+ ts, err := auth.NewDefaultTokenSource(true, pubsub.ScopePubSub)
+ require.NoError(t, err)
+ pubsubClient, err := pubsub.NewClient(ctx, instanceConfig.IngestionConfig.SourceConfig.Project, option.WithTokenSource(ts))
+ require.NoError(t, err)
+
+ // Create the topic.
+ topic := pubsubClient.Topic(instanceConfig.IngestionConfig.FileIngestionTopicName)
+ ok, err := topic.Exists(ctx)
+ require.NoError(t, err)
+ if !ok {
+ topic, err = pubsubClient.CreateTopic(ctx, instanceConfig.IngestionConfig.FileIngestionTopicName)
+ }
+ topic.Stop()
+ assert.NoError(t, err)
+
+ return pubsubClient, instanceConfig
+}
+
func TestStart_IngestDemoRepoWithSQLite3TraceStore_Success(t *testing.T) {
unittest.LargeTest(t)
@@ -59,3 +100,57 @@
assert.Equal(t, int64(1), metrics2.GetCounter("perfserver_ingest_bad_githash").Get())
assert.Equal(t, int64(1), metrics2.GetCounter("perfserver_ingest_failed_to_parse").Get())
}
+
+func TestSendPubSubEvent_Success(t *testing.T) {
+ unittest.ManualTest(t)
+ client, instanceConfig := setupPubSubClient(t)
+ ctx := context.Background()
+
+ // Setup the data to send via pubsub.
+ params := []paramtools.Params{
+ {
+ "arch": "x86",
+ "config": "8888",
+ },
+ {
+ "arch": "arm",
+ "config": "565",
+ },
+ }
+ ps := paramtools.NewParamSet(params...)
+
+ // Create the subscription before the pubsub message is sent, otherwise the emulator won't deliver it.
+ topic := client.Topic(instanceConfig.IngestionConfig.FileIngestionTopicName)
+ // Create a subscription with the same name as the topic since the name is
+ // random and won't have a conflict.
+ sub, err := client.CreateSubscription(ctx, instanceConfig.IngestionConfig.FileIngestionTopicName, pubsub.SubscriptionConfig{
+ Topic: topic,
+ })
+ require.NoError(t, err)
+
+ // Setup to receive the pubsub message we will send.
+ cctx, cancel := context.WithCancel(ctx)
+ var wg sync.WaitGroup
+ wg.Add(1)
+ go func() {
+ err := sub.Receive(cctx, func(ctx context.Context, msg *pubsub.Message) {
+ ev, err := ingestevents.DecodePubSubBody(msg.Data)
+ require.NoError(t, err)
+ assert.Equal(t, "somefile.json", ev.Filename)
+ assert.Equal(t, ps, ev.ParamSet)
+ wg.Done()
+ })
+ require.NoError(t, err)
+ }()
+
+ // Now we can finally send the message.
+ err = sendPubSubEvent(client, instanceConfig.IngestionConfig.FileIngestionTopicName, params, ps, "somefile.json")
+ require.NoError(t, err)
+
+ // Wait for one message to be delivered.
+ wg.Wait()
+
+ // Stop the receiver.
+ cancel()
+ topic.Stop()
+}