[perf] Send pubsub events as ingestion completes.

Change-Id: Iba5f48f22c662c85478eb80d29d45ed06ec58dee
Reviewed-on: https://skia-review.googlesource.com/c/buildbot/+/279418
Reviewed-by: Joe Gregorio <jcgregorio@google.com>
Commit-Queue: Joe Gregorio <jcgregorio@google.com>
diff --git a/go/testutils/unittest/unittest.go b/go/testutils/unittest/unittest.go
index 54f0ac6..4326d35 100644
--- a/go/testutils/unittest/unittest.go
+++ b/go/testutils/unittest/unittest.go
@@ -123,6 +123,24 @@
 	}
 }
 
+// RequiresPubSubEmulator is a function that documents a unittest requires the
+// PubSub Emulator and checks that the appropriate environment variable is set.
+func RequiresPubSubEmulator(t sktest.TestingT) {
+	s := os.Getenv("PUBSUB_EMULATOR_HOST")
+	if s == "" {
+		t.Fatal(`This test requires the PubSub emulator, which you can start with
+
+    docker run -ti -p 8010:8010 google/cloud-sdk:latest gcloud beta emulators pubsub start \
+		--project test-project --host-port 0.0.0.0:8010
+
+and then set the environment:
+
+    export PUBSUB_EMULATOR_HOST=localhost:8010
+
+`)
+	}
+}
+
 // LinuxOnlyTest is a function which should be called at the beginning of a test
 // which should only run on Linux.
 func LinuxOnlyTest(t sktest.TestingT) {
diff --git a/perf/Makefile b/perf/Makefile
index ef4bf4e..c779546 100644
--- a/perf/Makefile
+++ b/perf/Makefile
@@ -70,6 +70,10 @@
 	docker run -ti -p 8000:8000 google/cloud-sdk:latest gcloud beta emulators bigtable start \
 		  --project test-project --host-port 0.0.0.0:8000
 
+start_pubsub_emulator:
+	docker run -ti -p 8000:8000 google/cloud-sdk:latest gcloud beta emulators pubsub start \
+		  --project test-project --host-port 0.0.0.0:8010
+
 start_datastore_emulator:
 	 docker run -ti -p 8008:8008 google/cloud-sdk:latest gcloud beta emulators datastore start --no-store-on-disk --project test-project --host-port 0.0.0.0:8008
 
diff --git a/perf/go/file/gcssource/gcssource_test.go b/perf/go/file/gcssource/gcssource_test.go
index 2183588..52e47d3 100644
--- a/perf/go/file/gcssource/gcssource_test.go
+++ b/perf/go/file/gcssource/gcssource_test.go
@@ -24,6 +24,7 @@
 )
 
 func setupPubSubClient(t *testing.T) (*pubsub.Client, *config.InstanceConfig) {
+	unittest.RequiresPubSubEmulator(t)
 	ctx := context.Background()
 
 	rand.Seed(time.Now().Unix())
diff --git a/perf/go/ingest/process/process.go b/perf/go/ingest/process/process.go
index a7f5a08..5c84171 100644
--- a/perf/go/ingest/process/process.go
+++ b/perf/go/ingest/process/process.go
@@ -5,16 +5,54 @@
 	"context"
 	"time"
 
+	"cloud.google.com/go/pubsub"
+	"go.skia.org/infra/go/auth"
 	"go.skia.org/infra/go/metrics2"
 	"go.skia.org/infra/go/paramtools"
+	"go.skia.org/infra/go/query"
 	"go.skia.org/infra/go/skerr"
 	"go.skia.org/infra/go/sklog"
 	"go.skia.org/infra/perf/go/builders"
 	"go.skia.org/infra/perf/go/config"
 	perfgit "go.skia.org/infra/perf/go/git"
 	"go.skia.org/infra/perf/go/ingest/parser"
+	"go.skia.org/infra/perf/go/ingestevents"
+	"google.golang.org/api/option"
 )
 
+// sendPubSubEvent sends the unencoded params and paramset found in a single
+// ingested file to the PubSub topic specified in the selected Perf instances
+// configuration data.
+func sendPubSubEvent(pubSubClient *pubsub.Client, topicName string, params []paramtools.Params, paramset paramtools.ParamSet, filename string) error {
+	if topicName == "" {
+		return nil
+	}
+	traceIDs := make([]string, 0, len(params))
+	for _, p := range params {
+		key, err := query.MakeKey(p)
+		if err != nil {
+			continue
+		}
+		traceIDs = append(traceIDs, key)
+	}
+	ie := &ingestevents.IngestEvent{
+		TraceIDs: traceIDs,
+		ParamSet: paramset,
+		Filename: filename,
+	}
+	body, err := ingestevents.CreatePubSubBody(ie)
+	if err != nil {
+		return skerr.Wrapf(err, "Failed to encode PubSub body for topic: %q", topicName)
+	}
+	msg := &pubsub.Message{
+		Data: body,
+	}
+	ctx := context.Background()
+	_, err = pubSubClient.Topic(topicName).Publish(ctx, msg).Get(ctx)
+
+	return skerr.Wrap(err)
+}
+
 // Start a single go routine to process incoming ingestion files and write
 // the data they contain to a trace store.
 //
@@ -28,6 +66,19 @@
 	failedToWrite := metrics2.GetCounter("perfserver_ingest_failed_to_write")
 	successfulWrite := metrics2.GetCounter("perfserver_ingest_successful_write")
 
+	var pubSubClient *pubsub.Client
+	if instanceConfig.IngestionConfig.FileIngestionTopicName != "" {
+		ts, err := auth.NewDefaultTokenSource(false, pubsub.ScopePubSub)
+		if err != nil {
+			sklog.Fatalf("Failed to create TokenSource: %s", err)
+		}
+
+		pubSubClient, err = pubsub.NewClient(ctx, instanceConfig.IngestionConfig.SourceConfig.Project, option.WithTokenSource(ts))
+		if err != nil {
+			sklog.Fatal(err)
+		}
+	}
+
 	// New file.Source.
 	source, err := builders.NewSourceFromConfig(ctx, instanceConfig, false)
 	ch, err := source.Start(ctx)
@@ -84,6 +135,12 @@
 			sklog.Error("Failed to write %v: %s", f, err)
 		}
 		successfulWrite.Inc(1)
+
+		if err := sendPubSubEvent(pubSubClient, instanceConfig.IngestionConfig.FileIngestionTopicName, params, ps, f.Name); err != nil {
+			sklog.Errorf("Failed to send pubsub event: %s", err)
+		} else {
+			sklog.Info("FileIngestionTopicName pubsub message sent.")
+		}
 	}
 	sklog.Infof("Exited while waiting on files. Should only happen on source_type=dir.")
 	return nil
diff --git a/perf/go/ingest/process/process_test.go b/perf/go/ingest/process/process_test.go
index 94e62fc..fb7e0d5 100644
--- a/perf/go/ingest/process/process_test.go
+++ b/perf/go/ingest/process/process_test.go
@@ -3,17 +3,58 @@
 
 import (
 	"context"
+	"fmt"
 	"io/ioutil"
+	"math/rand"
 	"os"
+	"sync"
 	"testing"
+	"time"
 
+	"cloud.google.com/go/pubsub"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
+	"go.skia.org/infra/go/auth"
 	"go.skia.org/infra/go/metrics2"
+	"go.skia.org/infra/go/paramtools"
 	"go.skia.org/infra/go/testutils/unittest"
 	"go.skia.org/infra/perf/go/config"
+	"go.skia.org/infra/perf/go/ingestevents"
+	"google.golang.org/api/option"
 )
 
+func setupPubSubClient(t *testing.T) (*pubsub.Client, *config.InstanceConfig) {
+	unittest.RequiresPubSubEmulator(t)
+	ctx := context.Background()
+
+	rand.Seed(time.Now().Unix())
+	instanceConfig := &config.InstanceConfig{
+		IngestionConfig: config.IngestionConfig{
+			SourceConfig: config.SourceConfig{
+				Project: "test-project",
+			},
+			FileIngestionTopicName: fmt.Sprintf("some-topic-%d", rand.Uint64()),
+		},
+	}
+
+	ts, err := auth.NewDefaultTokenSource(true, pubsub.ScopePubSub)
+	require.NoError(t, err)
+	pubsubClient, err := pubsub.NewClient(ctx, instanceConfig.IngestionConfig.SourceConfig.Project, option.WithTokenSource(ts))
+	require.NoError(t, err)
+
+	// Create the topic.
+	topic := pubsubClient.Topic(instanceConfig.IngestionConfig.FileIngestionTopicName)
+	ok, err := topic.Exists(ctx)
+	require.NoError(t, err)
+	if !ok {
+		topic, err = pubsubClient.CreateTopic(ctx, instanceConfig.IngestionConfig.FileIngestionTopicName)
+	}
+	topic.Stop()
+	assert.NoError(t, err)
+
+	return pubsubClient, instanceConfig
+}
+
 func TestStart_IngestDemoRepoWithSQLite3TraceStore_Success(t *testing.T) {
 	unittest.LargeTest(t)
 
@@ -59,3 +100,57 @@
 	assert.Equal(t, int64(1), metrics2.GetCounter("perfserver_ingest_bad_githash").Get())
 	assert.Equal(t, int64(1), metrics2.GetCounter("perfserver_ingest_failed_to_parse").Get())
 }
+
+func TestSendPubSubEvent_Success(t *testing.T) {
+	unittest.ManualTest(t)
+	client, instanceConfig := setupPubSubClient(t)
+	ctx := context.Background()
+
+	// Setup the data to send via pubsub.
+	params := []paramtools.Params{
+		{
+			"arch":   "x86",
+			"config": "8888",
+		},
+		{
+			"arch":   "arm",
+			"config": "565",
+		},
+	}
+	ps := paramtools.NewParamSet(params...)
+
+	// Create the subscription before the pubsub message is sent, otherwise the emulator won't deliver it.
+	topic := client.Topic(instanceConfig.IngestionConfig.FileIngestionTopicName)
+	// Create a subscription with the same name as the topic since the name is
+	// random and won't have a conflict.
+	sub, err := client.CreateSubscription(ctx, instanceConfig.IngestionConfig.FileIngestionTopicName, pubsub.SubscriptionConfig{
+		Topic: topic,
+	})
+	require.NoError(t, err)
+
+	// Setup to receive the pubsub message we will send.
+	cctx, cancel := context.WithCancel(ctx)
+	var wg sync.WaitGroup
+	wg.Add(1)
+	go func() {
+		err := sub.Receive(cctx, func(ctx context.Context, msg *pubsub.Message) {
+			ev, err := ingestevents.DecodePubSubBody(msg.Data)
+			require.NoError(t, err)
+			assert.Equal(t, "somefile.json", ev.Filename)
+			assert.Equal(t, ps, ev.ParamSet)
+			wg.Done()
+		})
+		require.NoError(t, err)
+	}()
+
+	// Now we can finally send the message.
+	err = sendPubSubEvent(client, instanceConfig.IngestionConfig.FileIngestionTopicName, params, ps, "somefile.json")
+	require.NoError(t, err)
+
+	// Wait for one message to be delivered.
+	wg.Wait()
+
+	// Stop the receiver.
+	cancel()
+	topic.Stop()
+}