[gold] Fix traces and make PubSub publication non-blocking.

I think this should speed up CL indexing again and make the
trace data sensible.

Bug: skia:10582
Change-Id: I418350032fc84c420496326daedce8234254e655
Reviewed-on: https://skia-review.googlesource.com/c/buildbot/+/371516
Reviewed-by: Kevin Lubick <kjlubick@google.com>
diff --git a/golden/cmd/skiacorrectness/skiacorrectness.go b/golden/cmd/skiacorrectness/skiacorrectness.go
index 2046b2e..3ed17da 100644
--- a/golden/cmd/skiacorrectness/skiacorrectness.go
+++ b/golden/cmd/skiacorrectness/skiacorrectness.go
@@ -21,6 +21,7 @@
 	"cloud.google.com/go/pubsub"
 	"github.com/gorilla/mux"
 	"github.com/jackc/pgx/v4/pgxpool"
+	"go.opencensus.io/trace"
 	"go.skia.org/infra/go/paramtools"
 	"go.skia.org/infra/golden/go/types"
 	"golang.org/x/oauth2"
@@ -626,6 +627,8 @@
 // CalculateDiffs publishes a WorkerMessage to the configured PubSub topic so that a worker
 // (see diffcalculator) can pick it up and calculate the diffs.
 func (p *pubsubDiffPublisher) CalculateDiffs(ctx context.Context, grouping paramtools.Params, left, right []types.Digest) error {
+	ctx, span := trace.StartSpan(ctx, "PublishDiffMessage")
+	defer span.End()
 	body, err := json.Marshal(diff.WorkerMessage{
 		Version:         diff.WorkerMessageVersion,
 		Grouping:        grouping,
@@ -635,14 +638,10 @@
 	if err != nil {
 		return skerr.Wrap(err) // should never happen because JSON input is well-formed.
 	}
-	pr := p.client.Topic(p.topic).Publish(ctx, &pubsub.Message{
+	p.client.Topic(p.topic).Publish(ctx, &pubsub.Message{
 		Data: body,
 	})
-	// Blocks until message actual sent
-	_, err = pr.Get(ctx)
-	if err != nil {
-		return skerr.Wrap(err)
-	}
+	// Don't block until message is sent to speed up throughput.
 	return nil
 }
 
diff --git a/golden/go/clstore/sqlclstore/sqlclstore.go b/golden/go/clstore/sqlclstore/sqlclstore.go
index 8d2e0eb..5f04cd6 100644
--- a/golden/go/clstore/sqlclstore/sqlclstore.go
+++ b/golden/go/clstore/sqlclstore/sqlclstore.go
@@ -6,8 +6,8 @@
 
 	"github.com/jackc/pgx/v4"
 	"github.com/jackc/pgx/v4/pgxpool"
+	"go.opencensus.io/trace"
 
-	"go.skia.org/infra/go/metrics2"
 	"go.skia.org/infra/go/skerr"
 	"go.skia.org/infra/go/sklog"
 	"go.skia.org/infra/golden/go/clstore"
@@ -53,7 +53,8 @@
 
 // GetChangelists implements clstore.Store.
 func (s *StoreImpl) GetChangelists(ctx context.Context, opts clstore.SearchOptions) ([]code_review.Changelist, int, error) {
-	defer metrics2.FuncTimer().Stop()
+	ctx, span := trace.StartSpan(ctx, "sqlclstore_GetChangelists")
+	defer span.End()
 	if opts.Limit <= 0 {
 		return nil, 0, skerr.Fmt("must supply a limit")
 	}
diff --git a/golden/go/expectations/fs_expectationstore/fs_expstore.go b/golden/go/expectations/fs_expectationstore/fs_expstore.go
index a4c1273..91b32cf 100644
--- a/golden/go/expectations/fs_expectationstore/fs_expstore.go
+++ b/golden/go/expectations/fs_expectationstore/fs_expstore.go
@@ -11,6 +11,7 @@
 	"time"
 
 	"cloud.google.com/go/firestore"
+	"go.opencensus.io/trace"
 	"golang.org/x/sync/errgroup"
 
 	ifirestore "go.skia.org/infra/go/firestore"
@@ -459,6 +460,8 @@
 // Firestore if not.
 func (s *Store) Get(ctx context.Context) (expectations.ReadOnly, error) {
 	if s.hasSnapshotsRunning {
+		_, span := trace.StartSpan(ctx, "fsexpstore_getFromSnapshots")
+		defer span.End()
 		// If the snapshots are running, we first check to see if we have a fresh Expectations.
 		s.returnCacheMutex.Lock()
 		defer s.returnCacheMutex.Unlock()
@@ -503,7 +506,8 @@
 // the assumption that loadExpectations will only be called for setups that do not have the
 // snapshot queries, and the entryCache is used to create the expectationChanges (for undoing).
 func (s *Store) loadExpectations(ctx context.Context) (*expectations.Expectations, error) {
-	defer metrics2.FuncTimer().Stop()
+	ctx, span := trace.StartSpan(ctx, "fsexpstore_loadExpectations")
+	defer span.End()
 	es := make([][]expectationEntry, s.numShards)
 	queries := fs_utils.ShardOnDigest(s.expectationsCollection(), digestField, s.numShards)
 
diff --git a/golden/go/indexer/indexer.go b/golden/go/indexer/indexer.go
index 721d198..39ecc69 100644
--- a/golden/go/indexer/indexer.go
+++ b/golden/go/indexer/indexer.go
@@ -779,14 +779,12 @@
 					sklog.Errorf("Changelist indexing timed out (%v)", err)
 					return nil
 				}
-				ctx, expSpan := trace.StartSpan(ctx, "indexer_getCLExpectations")
 				issueExpStore := ix.ExpectationsStore.ForChangelist(cl.SystemID, system.ID)
 				clExps, err := issueExpStore.Get(ctx)
 				if err != nil {
 					return skerr.Wrapf(err, "loading expectations for cl %s (%s)", cl.SystemID, system.ID)
 				}
 				exps := expectations.Join(clExps, masterExp)
-				expSpan.End()
 
 				clKey := fmt.Sprintf("%s_%s", system.ID, cl.SystemID)
 				clIdx, ok := ix.getCLIndex(clKey)
@@ -901,10 +899,15 @@
 	if err != nil {
 		return skerr.Wrap(err)
 	}
+	_, spanTryjobData := trace.StartSpan(ctx, "getTryjobData")
+	spanTryjobData.AddAttributes(trace.Int64Attribute("data_points", int64(len(xtjr))))
 	// The left and right digests will be the data from these tryjobs as well as the non-ignored
 	// data on the primary branch for the corresponding groupings.
 	digestsPerGrouping := map[hashableGrouping]types.DigestSet{}
 	for _, tjr := range xtjr {
+		if err := ctx.Err(); err != nil {
+			return skerr.Wrap(err)
+		}
 		traceKeys := paramtools.Params{}
 		traceKeys.Add(tjr.GroupParams, tjr.ResultParams)
 		grouping := getHashableGrouping(traceKeys)
@@ -917,10 +920,16 @@
 		}
 		digestsPerGrouping[grouping] = uniqueDigests
 	}
+	spanTryjobData.End()
 
 	tile := idx.cpxTile.GetTile(types.ExcludeIgnoredTraces)
+	_, primaryBranchData := trace.StartSpan(ctx, "getDataFromPrimarybranch")
+	primaryBranchData.AddAttributes(trace.Int64Attribute("groupings", int64(len(digestsPerGrouping))))
 	// Add the digests from the primary branch (using the index)
 	for grouping := range digestsPerGrouping {
+		if err := ctx.Err(); err != nil {
+			return skerr.Wrap(err)
+		}
 		q := paramtools.ParamSet{
 			types.CorpusField:     []string{grouping[0]},
 			types.PrimaryKeyField: []string{grouping[1]},
@@ -934,6 +943,7 @@
 		}
 		digestsPerGrouping[grouping] = uniqueDigests
 	}
+	primaryBranchData.End()
 
 	sklog.Infof("Sending diff messages for CL %s covering %d groupings to diffcalculator", clID, len(digestsPerGrouping))
 	for hg, ds := range digestsPerGrouping {