[gold] Speed up and fix CL indexing

The big changes here are:
 - Avoid O(n^2) inside a loop (digest counter).
 - Do not fail silently on primary branch index and make
   sure we finish indexing the primary branch before
   computing CL indexes.
 - Modify tests so we cover the CL diff code.
 - Add environment variable to populate podName in tracing.

Bug: skia:10582
Change-Id: I39b48fef72c1ccc200cd4a09e396debea989943c
Reviewed-on: https://skia-review.googlesource.com/c/buildbot/+/371966
Reviewed-by: Kevin Lubick <kjlubick@google.com>
diff --git a/golden/cmd/skiacorrectness/skiacorrectness.go b/golden/cmd/skiacorrectness/skiacorrectness.go
index 3ed17da..f36d702 100644
--- a/golden/cmd/skiacorrectness/skiacorrectness.go
+++ b/golden/cmd/skiacorrectness/skiacorrectness.go
@@ -615,7 +615,6 @@
 		sklog.Fatalf("Failed to create indexer: %s", err)
 	}
 	sklog.Infof("Indexer created.")
-
 	return ixr
 }
 
diff --git a/golden/go/digest_counter/digest_counter.go b/golden/go/digest_counter/digest_counter.go
index 466bfaf..b8ed205 100644
--- a/golden/go/digest_counter/digest_counter.go
+++ b/golden/go/digest_counter/digest_counter.go
@@ -30,7 +30,7 @@
 	MaxDigestsByTest() map[types.TestName]types.DigestSet
 
 	// ByQuery returns a DigestCount of all the digests that match the given query in
-	// the provided tile.
+	// the provided tile. Note that this will recompute the digests across the entire tile.
 	ByQuery(tile *tiling.Tile, query paramtools.ParamSet) DigestCount
 }
 
diff --git a/golden/go/indexer/indexer.go b/golden/go/indexer/indexer.go
index 39ecc69..5a5b219 100644
--- a/golden/go/indexer/indexer.go
+++ b/golden/go/indexer/indexer.go
@@ -252,7 +252,7 @@
 	defer metrics2.FuncTimer().Stop()
 
 	// Retrieve Trace for the given traceID.
-	trace, ok := idx.cpxTile.GetTile(types.IncludeIgnoredTraces).Traces[traceID]
+	tr, ok := idx.cpxTile.GetTile(types.IncludeIgnoredTraces).Traces[traceID]
 	if !ok {
 		return tiling.MissingDigest, nil
 	}
@@ -264,9 +264,9 @@
 	}
 
 	// Find and return the most recent positive digest in the Trace.
-	for i := len(trace.Digests) - 1; i >= 0; i-- {
-		digest := trace.Digests[i]
-		if digest != tiling.MissingDigest && exps.Classification(trace.TestName(), digest) == expectations.Positive {
+	for i := len(tr.Digests) - 1; i >= 0; i-- {
+		digest := tr.Digests[i]
+		if digest != tiling.MissingDigest && exps.Classification(tr.TestName(), digest) == expectations.Positive {
 			return digest, nil
 		}
 	}
@@ -384,15 +384,11 @@
 		return nil
 	}
 
-	// We can start indexing the Changelists right away since it only depends on the expectations
-	// (and nothing from the master branch index).
-	go util.RepeatCtx(ctx, interval, ix.calcChangelistIndices)
-
 	defer shared.NewMetricsTimer("initial_synchronous_index").Stop()
 	// Build the first index synchronously.
 	tileStream := tilesource.GetTileStreamNow(ix.TileSource, interval, "gold-indexer")
 	if err := ix.executePipeline(ctx, <-tileStream); err != nil {
-		return err
+		return skerr.Wrap(err)
 	}
 
 	// When the master expectations change, update the blamer and its dependents. This channel
@@ -453,6 +449,10 @@
 		}
 	}()
 
+	// Start indexing the CLs now that the first index has been populated (we need the
+	// primary branch index to get the digests to diff against).
+	go util.RepeatCtx(ctx, interval, ix.calcChangelistIndices)
+
 	return nil
 }
 
@@ -555,14 +555,14 @@
 	idx := state.(*SearchIndex)
 	for _, is := range types.IgnoreStates {
 		t := idx.cpxTile.GetTile(is)
-		for id, trace := range t.Traces {
-			if trace == nil {
+		for id, tr := range t.Traces {
+			if tr == nil {
 				sklog.Warningf("Unexpected nil trace id %s", id)
 				continue
 			}
 			tp := tiling.TracePair{
 				ID:    id,
-				Trace: trace,
+				Trace: tr,
 			}
 			// Pre-slice the data by IgnoreState, then by IgnoreState and Corpus, finally by all
 			// three of IgnoreState/Corpus/Test. We shouldn't allow queries by Corpus w/o specifying
@@ -575,14 +575,14 @@
 
 			ignoreAndCorpus := preSliceGroup{
 				IgnoreState: is,
-				Corpus:      trace.Corpus(),
+				Corpus:      tr.Corpus(),
 			}
 			idx.preSliced[ignoreAndCorpus] = append(idx.preSliced[ignoreAndCorpus], &tp)
 
 			ignoreCorpusTest := preSliceGroup{
 				IgnoreState: is,
-				Corpus:      trace.Corpus(),
-				Test:        trace.TestName(),
+				Corpus:      tr.Corpus(),
+				Test:        tr.TestName(),
 			}
 			idx.preSliced[ignoreCorpusTest] = append(idx.preSliced[ignoreCorpusTest], &tp)
 		}
@@ -745,10 +745,10 @@
 	// Update the metric when we return (either from error or because we completed indexing).
 	defer metrics2.GetInt64Metric(indexedCLsMetric).Update(int64(ix.changelistIndices.ItemCount()))
 	// Make sure this doesn't take arbitrarily long.
-	ctx, cancel := context.WithTimeout(ctx, 20*time.Minute)
+	ctx, cancel := context.WithTimeout(ctx, 10*time.Minute)
 	defer cancel()
 	now := time.Now()
-	masterExp, err := ix.ExpectationsStore.Get(ctx)
+	primaryExp, err := ix.ExpectationsStore.Get(ctx)
 	if err != nil {
 		sklog.Errorf("Could not get expectations for changelist indices: %s", err)
 		return
@@ -784,7 +784,7 @@
 				if err != nil {
 					return skerr.Wrapf(err, "loading expectations for cl %s (%s)", cl.SystemID, system.ID)
 				}
-				exps := expectations.Join(clExps, masterExp)
+				exps := expectations.Join(clExps, primaryExp)
 
 				clKey := fmt.Sprintf("%s_%s", system.ID, cl.SystemID)
 				clIdx, ok := ix.getCLIndex(clKey)
@@ -821,7 +821,7 @@
 						return skerr.Wrap(err)
 					}
 					untriagedResults, params := indexTryJobResults(ctx, existingUntriagedResults, xtjr, exps)
-					if err := ix.sendCLWorkToDiffCalculators(ctx, xtjr, system.ID+"_"+cl.SystemID); err != nil {
+					if err := ix.sendCLWorkToDiffCalculators(ctx, primaryExp, xtjr, system.ID+"_"+cl.SystemID); err != nil {
 						return skerr.Wrap(err)
 					}
 					// Copy the existing ParamSet into the newly created one. It is important to copy it from
@@ -884,7 +884,7 @@
 	return combined, params
 }
 
-func (ix *Indexer) sendCLWorkToDiffCalculators(ctx context.Context, xtjr []tjstore.TryJobResult, clID string) error {
+func (ix *Indexer) sendCLWorkToDiffCalculators(ctx context.Context, primaryExp expectations.Classifier, xtjr []tjstore.TryJobResult, clID string) error {
 	ctx, span := trace.StartSpan(ctx, "indexer_sendCLWorkToDiffCalculators")
 	defer span.End()
 	if len(xtjr) == 0 {
@@ -893,20 +893,48 @@
 	}
 	idx := ix.getIndex()
 	if idx == nil {
-		return nil
+		// Should not happen because we compute the primary branch index synchronously
+		// before starting to index CLs.
+		return skerr.Fmt("Primary branch index not ready yet")
 	}
-	exp, err := idx.expectationsStore.Get(ctx)
+	digestsPerGrouping, err := groupDataFromTryJobs(ctx, xtjr)
 	if err != nil {
 		return skerr.Wrap(err)
 	}
-	_, spanTryjobData := trace.StartSpan(ctx, "getTryjobData")
-	spanTryjobData.AddAttributes(trace.Int64Attribute("data_points", int64(len(xtjr))))
+	left, right, err := addDataFromPrimaryBranch(ctx, idx, digestsPerGrouping, primaryExp)
+	if err != nil {
+		return skerr.Wrap(err)
+	}
+
+	sklog.Infof("Sending diff messages for CL %s covering %d groupings to diffcalculator", clID, len(left))
+	for hg := range left {
+		grouping := paramtools.Params{
+			types.CorpusField:     hg[0],
+			types.PrimaryKeyField: hg[1],
+		}
+		leftDigests := left[hg].Keys()
+		rightDigests := right[hg].Keys()
+		// This should be pretty fast because it's just sending off the work, not blocking until
+		// the work is calculated.
+		if err := ix.DiffWorkPublisher.CalculateDiffs(ctx, grouping, leftDigests, rightDigests); err != nil {
+			return skerr.Wrapf(err, "publishing diff calculation for CL %s - %d, %d digests in grouping %v", clID, len(leftDigests), len(rightDigests), grouping)
+		}
+	}
+	return nil
+}
+
+// groupDataFromTryJobs takes the data from the provided TryJobResults and groups the unique
+// digests by grouping.
+func groupDataFromTryJobs(ctx context.Context, xtjr []tjstore.TryJobResult) (map[hashableGrouping]types.DigestSet, error) {
+	ctx, span := trace.StartSpan(ctx, "groupDataFromTryJobs")
+	span.AddAttributes(trace.Int64Attribute("data_points", int64(len(xtjr))))
+	defer span.End()
 	// The left and right digests will be the data from these tryjobs as well as the non-ignored
 	// data on the primary branch for the corresponding groupings.
 	digestsPerGrouping := map[hashableGrouping]types.DigestSet{}
 	for _, tjr := range xtjr {
 		if err := ctx.Err(); err != nil {
-			return skerr.Wrap(err)
+			return nil, skerr.Wrap(err)
 		}
 		traceKeys := paramtools.Params{}
 		traceKeys.Add(tjr.GroupParams, tjr.ResultParams)
@@ -920,45 +948,39 @@
 		}
 		digestsPerGrouping[grouping] = uniqueDigests
 	}
-	spanTryjobData.End()
+	return digestsPerGrouping, nil
+}
 
-	tile := idx.cpxTile.GetTile(types.ExcludeIgnoredTraces)
-	_, primaryBranchData := trace.StartSpan(ctx, "getDataFromPrimarybranch")
-	primaryBranchData.AddAttributes(trace.Int64Attribute("groupings", int64(len(digestsPerGrouping))))
+// addDataFromPrimaryBranch adds the triaged, not ignored digests from the primary branch to
+// the provided map and returns it as the first return value (the left digests). It adds those same
+// digests to a new map and returns it as the second return value (the right digests).
+func addDataFromPrimaryBranch(ctx context.Context, idx *SearchIndex, leftDigests map[hashableGrouping]types.DigestSet, exp expectations.Classifier) (map[hashableGrouping]types.DigestSet, map[hashableGrouping]types.DigestSet, error) {
+	ctx, span := trace.StartSpan(ctx, "addDataFromPrimaryBranch")
+	span.AddAttributes(trace.Int64Attribute("groupings", int64(len(leftDigests))))
+	defer span.End()
+	countByTest := idx.dCounters[types.ExcludeIgnoredTraces].ByTest()
+	rightDigests := make(map[hashableGrouping]types.DigestSet, len(leftDigests))
 	// Add the digests from the primary branch (using the index)
-	for grouping := range digestsPerGrouping {
+	for grouping := range leftDigests {
 		if err := ctx.Err(); err != nil {
-			return skerr.Wrap(err)
+			return nil, nil, skerr.Wrap(err)
 		}
-		q := paramtools.ParamSet{
-			types.CorpusField:     []string{grouping[0]},
-			types.PrimaryKeyField: []string{grouping[1]},
-		}
-		uniqueDigests := digestsPerGrouping[grouping]
-		dc := idx.dCounters[types.ExcludeIgnoredTraces].ByQuery(tile, q)
-		for digest := range dc {
+		allLeftDigests := leftDigests[grouping]
+		allRightDigests := types.DigestSet{}
+		// We assume that test names are unique across corpora. This may not be true in general,
+		// but it allows us to avoid iterating the tile again.
+		for digest := range countByTest[types.TestName(grouping[1])] {
 			if exp.Classification(types.TestName(grouping[1]), digest) != expectations.Untriaged {
-				uniqueDigests[digest] = true
+				allLeftDigests[digest] = true
+				allRightDigests[digest] = true
 			}
 		}
-		digestsPerGrouping[grouping] = uniqueDigests
+		// This won't make a new key in the map, so it should be safe overwrite this key and not
+		// affect iteration.
+		leftDigests[grouping] = allLeftDigests
+		rightDigests[grouping] = allRightDigests
 	}
-	primaryBranchData.End()
-
-	sklog.Infof("Sending diff messages for CL %s covering %d groupings to diffcalculator", clID, len(digestsPerGrouping))
-	for hg, ds := range digestsPerGrouping {
-		grouping := paramtools.Params{
-			types.CorpusField:     hg[0],
-			types.PrimaryKeyField: hg[1],
-		}
-		digests := ds.Keys()
-		// This should be pretty fast because it's just sending off the work, not blocking until
-		// the work is calculated.
-		if err := ix.DiffWorkPublisher.CalculateDiffs(ctx, grouping, digests, digests); err != nil {
-			return skerr.Wrapf(err, "publishing diff calculation for CL %s - %d digests in grouping %v", clID, len(digests), grouping)
-		}
-	}
-	return nil
+	return leftDigests, rightDigests, nil
 }
 
 // getCLIndex is a helper that returns the appropriately typed element from changelistIndices.
diff --git a/golden/go/indexer/indexer_test.go b/golden/go/indexer/indexer_test.go
index ef1e9f8..1e1333a 100644
--- a/golden/go/indexer/indexer_test.go
+++ b/golden/go/indexer/indexer_test.go
@@ -19,6 +19,7 @@
 	"go.skia.org/infra/golden/go/clstore"
 	mock_clstore "go.skia.org/infra/golden/go/clstore/mocks"
 	"go.skia.org/infra/golden/go/code_review"
+	"go.skia.org/infra/golden/go/diff"
 	diff_mocks "go.skia.org/infra/golden/go/diff/mocks"
 	mock_diffstore "go.skia.org/infra/golden/go/diffstore/mocks"
 	"go.skia.org/infra/golden/go/digest_counter"
@@ -300,22 +301,27 @@
 		"day_of_week": "wednesday",
 	}
 
+	resultParams := paramtools.Params{ // all data points are for the same test.
+		types.CorpusField:     "some_corpus",
+		types.PrimaryKeyField: string(data.AlphaTest),
+	}
+
 	mts.On("GetResults", testutils.AnyContext, firstCombinedID, time.Time{}).Return([]tjstore.TryJobResult{
 		{
-			ResultParams: paramtools.Params{types.PrimaryKeyField: string(data.AlphaTest)},
+			ResultParams: resultParams,
 			GroupParams:  androidGroup,
 			Options:      firstOptionalGroup,
 			Digest:       data.AlphaPositiveDigest,
 			// Other fields ignored
 		},
 		{
-			ResultParams: paramtools.Params{types.PrimaryKeyField: string(data.AlphaTest)},
+			ResultParams: resultParams,
 			GroupParams:  iosGroup,
 			Options:      firstOptionalGroup,
 			Digest:       data.AlphaNegativeDigest,
 		},
 		{
-			ResultParams: paramtools.Params{types.PrimaryKeyField: string(data.AlphaTest)},
+			ResultParams: resultParams,
 			GroupParams:  androidGroup,
 			Options:      secondOptionalGroup,
 			Digest:       data.AlphaUntriagedDigest,
@@ -323,26 +329,41 @@
 	}, nil)
 	mts.On("GetResults", testutils.AnyContext, secondCombinedID, time.Time{}).Return([]tjstore.TryJobResult{
 		{
-			ResultParams: paramtools.Params{types.PrimaryKeyField: string(data.AlphaTest)},
+			ResultParams: resultParams,
 			GroupParams:  androidGroup,
 			Options:      firstOptionalGroup,
 			Digest:       data.AlphaPositiveDigest,
 		},
 		{
-			ResultParams: paramtools.Params{types.PrimaryKeyField: string(data.AlphaTest)},
+			ResultParams: resultParams,
 			GroupParams:  iosGroup,
 			Options:      firstOptionalGroup,
 			// Note, for this CL, this digest has not yet been triaged.
 			Digest: data.AlphaNegativeDigest,
 		},
 		{
-			ResultParams: paramtools.Params{types.PrimaryKeyField: string(data.AlphaTest)},
+			ResultParams: resultParams,
 			GroupParams:  androidGroup,
 			Options:      firstOptionalGroup,
 			Digest:       data.AlphaUntriagedDigest,
 		},
 	}, nil)
 
+	expectedGrouping := paramtools.Params{
+		types.CorpusField:     "some_corpus",
+		types.PrimaryKeyField: string(data.AlphaTest),
+	}
+	leftDigestsMatcher := mock.MatchedBy(func(left []types.Digest) bool {
+		assert.ElementsMatch(t, []types.Digest{data.AlphaUntriagedDigest, data.AlphaPositiveDigest, data.AlphaNegativeDigest}, left)
+		return true
+	})
+	rightDigestsMatcher := mock.MatchedBy(func(right []types.Digest) bool {
+		assert.ElementsMatch(t, []types.Digest{data.AlphaPositiveDigest}, right)
+		return true
+	})
+	mdc := &diff_mocks.Calculator{}
+	mdc.On("CalculateDiffs", testutils.AnyContext, expectedGrouping, leftDigestsMatcher, rightDigestsMatcher).Return(nil)
+
 	ctx := context.Background()
 	ic := IndexerConfig{
 		ExpectationsStore: mes,
@@ -354,10 +375,12 @@
 				// URLTemplate and Client are unused here
 			},
 		},
+		DiffWorkPublisher: mdc,
 	}
 	ixr, err := New(ctx, ic, 0)
 	require.NoError(t, err)
 	ixr.changelistsReindexed.Reset()
+	ixr.lastMasterIndex = fakeSearchIndex()
 
 	ixr.calcChangelistIndices(ctx)
 
@@ -374,6 +397,7 @@
 		"model":       []string{"crosshatch", "iphone3"},
 		"name":        []string{"test_alpha"},
 		"os":          []string{"Android", "iOS"},
+		"source_type": []string{"some_corpus"},
 	}, clIdx.ParamSet)
 
 	clIdx = ixr.GetIndexForCL(gerritCRS, secondCLID)
@@ -391,11 +415,27 @@
 		"model":       []string{"crosshatch", "iphone3"},
 		"name":        []string{"test_alpha"},
 		"os":          []string{"Android", "iOS"},
+		"source_type": []string{"some_corpus"},
 	}, clIdx.ParamSet)
 
 	assert.Equal(t, int64(2), ixr.changelistsReindexed.Get())
 }
 
+// noopPublisher returns a DiffPublisher that accepts any and all calls to CalculateDiffs.
+func noopPublisher() diff.Calculator {
+	mdc := diff_mocks.Calculator{}
+	mdc.On("CalculateDiffs", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
+	return &mdc
+}
+
+// fakeSearchIndex returns an arbitrary SearchIndex with enough data to compute CL diff messages.
+func fakeSearchIndex() *SearchIndex {
+	_, tile, _ := makeComplexTileWithCrosshatchIgnores()
+	return &SearchIndex{
+		dCounters: [2]digest_counter.DigestCounter{digest_counter.New(tile), digest_counter.New(tile)},
+	}
+}
+
 func TestIndexer_CalcChangelistIndices_HasIndexForPreviousPS_Success(t *testing.T) {
 	unittest.SmallTest(t)
 
@@ -470,6 +510,7 @@
 				// URLTemplate and Client are unused here
 			},
 		},
+		DiffWorkPublisher: noopPublisher(),
 	}
 	ixr, err := New(ctx, ic, 0)
 	require.NoError(t, err)
@@ -506,6 +547,7 @@
 		ComputedTS: longAgo,
 	}
 	ixr.changelistIndices.Set("gerrit_111111", &previousIdx, 0)
+	ixr.lastMasterIndex = fakeSearchIndex()
 
 	ixr.calcChangelistIndices(ctx)
 
@@ -594,6 +636,7 @@
 				// URLTemplate and Client are unused here
 			},
 		},
+		DiffWorkPublisher: noopPublisher(),
 	}
 	ixr, err := New(ctx, ic, 0)
 	require.NoError(t, err)
@@ -624,7 +667,7 @@
 		ComputedTS: longAgo,
 	}
 	ixr.changelistIndices.Set("gerrit_111111", &previousIdx, 0)
-
+	ixr.lastMasterIndex = fakeSearchIndex()
 	ixr.calcChangelistIndices(ctx)
 
 	clIdx := ixr.GetIndexForCL(gerritCRS, clID)
diff --git a/golden/k8s-config-templates/gold-baselineserver-template.yaml b/golden/k8s-config-templates/gold-baselineserver-template.yaml
index ec3a47d..021270b 100644
--- a/golden/k8s-config-templates/gold-baselineserver-template.yaml
+++ b/golden/k8s-config-templates/gold-baselineserver-template.yaml
@@ -80,6 +80,10 @@
           env:
             - name: GOOGLE_APPLICATION_CREDENTIALS
               value: /etc/gold-secrets/service-account.json
+            - name: K8S_POD_NAME
+              valueFrom:
+                fieldRef:
+                  fieldPath: metadata.name
           resources:
             requests:
               memory: "500Mi"
diff --git a/golden/k8s-config-templates/gold-diffserver-template.yaml b/golden/k8s-config-templates/gold-diffserver-template.yaml
index 2d4e3c2..df0e5b7 100644
--- a/golden/k8s-config-templates/gold-diffserver-template.yaml
+++ b/golden/k8s-config-templates/gold-diffserver-template.yaml
@@ -64,6 +64,10 @@
           env:
             - name: GOOGLE_APPLICATION_CREDENTIALS
               value: /var/secrets/google/service-account.json
+            - name: K8S_POD_NAME
+              valueFrom:
+                fieldRef:
+                  fieldPath: metadata.name
           resources:
             requests:
               memory: "{{.K8S_MEMORY}}"
diff --git a/golden/k8s-config-templates/gold-ingestion-bt-template.yaml b/golden/k8s-config-templates/gold-ingestion-bt-template.yaml
index bb0c0c7..3b2cf73 100644
--- a/golden/k8s-config-templates/gold-ingestion-bt-template.yaml
+++ b/golden/k8s-config-templates/gold-ingestion-bt-template.yaml
@@ -71,6 +71,10 @@
           env:
             - name: GOOGLE_APPLICATION_CREDENTIALS
               value: /var/secrets/google/service-account.json
+            - name: K8S_POD_NAME
+              valueFrom:
+                fieldRef:
+                  fieldPath: metadata.name
           resources:
             requests:
               memory: "4Gi"
diff --git a/golden/k8s-config-templates/gold-skiacorrectness-template.yaml b/golden/k8s-config-templates/gold-skiacorrectness-template.yaml
index 827c562..39aa5b5 100644
--- a/golden/k8s-config-templates/gold-skiacorrectness-template.yaml
+++ b/golden/k8s-config-templates/gold-skiacorrectness-template.yaml
@@ -65,6 +65,10 @@
           env:
             - name: GOOGLE_APPLICATION_CREDENTIALS
               value: /var/secrets/google/service-account.json
+            - name: K8S_POD_NAME
+              valueFrom:
+                fieldRef:
+                  fieldPath: metadata.name
           resources:
             requests:
               memory: "{{.K8S_MEMORY}}"