[gold] Speed up and fix CL indexing
The big changes here are:
- Avoid O(n^2) inside a loop (digest counter).
- Do not fail silently when the primary branch index is missing, and
make sure we finish indexing the primary branch before
computing CL indexes.
- Modify tests so we cover the CL diff code.
- Add the K8S_POD_NAME environment variable to populate podName in tracing.
Bug: skia:10582
Change-Id: I39b48fef72c1ccc200cd4a09e396debea989943c
Reviewed-on: https://skia-review.googlesource.com/c/buildbot/+/371966
Reviewed-by: Kevin Lubick <kjlubick@google.com>
diff --git a/golden/cmd/skiacorrectness/skiacorrectness.go b/golden/cmd/skiacorrectness/skiacorrectness.go
index 3ed17da..f36d702 100644
--- a/golden/cmd/skiacorrectness/skiacorrectness.go
+++ b/golden/cmd/skiacorrectness/skiacorrectness.go
@@ -615,7 +615,6 @@
sklog.Fatalf("Failed to create indexer: %s", err)
}
sklog.Infof("Indexer created.")
-
return ixr
}
diff --git a/golden/go/digest_counter/digest_counter.go b/golden/go/digest_counter/digest_counter.go
index 466bfaf..b8ed205 100644
--- a/golden/go/digest_counter/digest_counter.go
+++ b/golden/go/digest_counter/digest_counter.go
@@ -30,7 +30,7 @@
MaxDigestsByTest() map[types.TestName]types.DigestSet
// ByQuery returns a DigestCount of all the digests that match the given query in
- // the provided tile.
+ // the provided tile. Note that this will recompute the digests across the entire tile.
ByQuery(tile *tiling.Tile, query paramtools.ParamSet) DigestCount
}
diff --git a/golden/go/indexer/indexer.go b/golden/go/indexer/indexer.go
index 39ecc69..5a5b219 100644
--- a/golden/go/indexer/indexer.go
+++ b/golden/go/indexer/indexer.go
@@ -252,7 +252,7 @@
defer metrics2.FuncTimer().Stop()
// Retrieve Trace for the given traceID.
- trace, ok := idx.cpxTile.GetTile(types.IncludeIgnoredTraces).Traces[traceID]
+ tr, ok := idx.cpxTile.GetTile(types.IncludeIgnoredTraces).Traces[traceID]
if !ok {
return tiling.MissingDigest, nil
}
@@ -264,9 +264,9 @@
}
// Find and return the most recent positive digest in the Trace.
- for i := len(trace.Digests) - 1; i >= 0; i-- {
- digest := trace.Digests[i]
- if digest != tiling.MissingDigest && exps.Classification(trace.TestName(), digest) == expectations.Positive {
+ for i := len(tr.Digests) - 1; i >= 0; i-- {
+ digest := tr.Digests[i]
+ if digest != tiling.MissingDigest && exps.Classification(tr.TestName(), digest) == expectations.Positive {
return digest, nil
}
}
@@ -384,15 +384,11 @@
return nil
}
- // We can start indexing the Changelists right away since it only depends on the expectations
- // (and nothing from the master branch index).
- go util.RepeatCtx(ctx, interval, ix.calcChangelistIndices)
-
defer shared.NewMetricsTimer("initial_synchronous_index").Stop()
// Build the first index synchronously.
tileStream := tilesource.GetTileStreamNow(ix.TileSource, interval, "gold-indexer")
if err := ix.executePipeline(ctx, <-tileStream); err != nil {
- return err
+ return skerr.Wrap(err)
}
// When the master expectations change, update the blamer and its dependents. This channel
@@ -453,6 +449,10 @@
}
}()
+ // Start indexing the CLs now that the first index has been populated (we need the
+ // primary branch index to get the digests to diff against).
+ go util.RepeatCtx(ctx, interval, ix.calcChangelistIndices)
+
return nil
}
@@ -555,14 +555,14 @@
idx := state.(*SearchIndex)
for _, is := range types.IgnoreStates {
t := idx.cpxTile.GetTile(is)
- for id, trace := range t.Traces {
- if trace == nil {
+ for id, tr := range t.Traces {
+ if tr == nil {
sklog.Warningf("Unexpected nil trace id %s", id)
continue
}
tp := tiling.TracePair{
ID: id,
- Trace: trace,
+ Trace: tr,
}
// Pre-slice the data by IgnoreState, then by IgnoreState and Corpus, finally by all
// three of IgnoreState/Corpus/Test. We shouldn't allow queries by Corpus w/o specifying
@@ -575,14 +575,14 @@
ignoreAndCorpus := preSliceGroup{
IgnoreState: is,
- Corpus: trace.Corpus(),
+ Corpus: tr.Corpus(),
}
idx.preSliced[ignoreAndCorpus] = append(idx.preSliced[ignoreAndCorpus], &tp)
ignoreCorpusTest := preSliceGroup{
IgnoreState: is,
- Corpus: trace.Corpus(),
- Test: trace.TestName(),
+ Corpus: tr.Corpus(),
+ Test: tr.TestName(),
}
idx.preSliced[ignoreCorpusTest] = append(idx.preSliced[ignoreCorpusTest], &tp)
}
@@ -745,10 +745,10 @@
// Update the metric when we return (either from error or because we completed indexing).
defer metrics2.GetInt64Metric(indexedCLsMetric).Update(int64(ix.changelistIndices.ItemCount()))
// Make sure this doesn't take arbitrarily long.
- ctx, cancel := context.WithTimeout(ctx, 20*time.Minute)
+ ctx, cancel := context.WithTimeout(ctx, 10*time.Minute)
defer cancel()
now := time.Now()
- masterExp, err := ix.ExpectationsStore.Get(ctx)
+ primaryExp, err := ix.ExpectationsStore.Get(ctx)
if err != nil {
sklog.Errorf("Could not get expectations for changelist indices: %s", err)
return
@@ -784,7 +784,7 @@
if err != nil {
return skerr.Wrapf(err, "loading expectations for cl %s (%s)", cl.SystemID, system.ID)
}
- exps := expectations.Join(clExps, masterExp)
+ exps := expectations.Join(clExps, primaryExp)
clKey := fmt.Sprintf("%s_%s", system.ID, cl.SystemID)
clIdx, ok := ix.getCLIndex(clKey)
@@ -821,7 +821,7 @@
return skerr.Wrap(err)
}
untriagedResults, params := indexTryJobResults(ctx, existingUntriagedResults, xtjr, exps)
- if err := ix.sendCLWorkToDiffCalculators(ctx, xtjr, system.ID+"_"+cl.SystemID); err != nil {
+ if err := ix.sendCLWorkToDiffCalculators(ctx, primaryExp, xtjr, system.ID+"_"+cl.SystemID); err != nil {
return skerr.Wrap(err)
}
// Copy the existing ParamSet into the newly created one. It is important to copy it from
@@ -884,7 +884,7 @@
return combined, params
}
-func (ix *Indexer) sendCLWorkToDiffCalculators(ctx context.Context, xtjr []tjstore.TryJobResult, clID string) error {
+func (ix *Indexer) sendCLWorkToDiffCalculators(ctx context.Context, primaryExp expectations.Classifier, xtjr []tjstore.TryJobResult, clID string) error {
ctx, span := trace.StartSpan(ctx, "indexer_sendCLWorkToDiffCalculators")
defer span.End()
if len(xtjr) == 0 {
@@ -893,20 +893,48 @@
}
idx := ix.getIndex()
if idx == nil {
- return nil
+ // Should not happen because we compute the primary branch index synchronously
+ // before starting to index CLs.
+ return skerr.Fmt("Primary branch index not ready yet")
}
- exp, err := idx.expectationsStore.Get(ctx)
+ digestsPerGrouping, err := groupDataFromTryJobs(ctx, xtjr)
if err != nil {
return skerr.Wrap(err)
}
- _, spanTryjobData := trace.StartSpan(ctx, "getTryjobData")
- spanTryjobData.AddAttributes(trace.Int64Attribute("data_points", int64(len(xtjr))))
+ left, right, err := addDataFromPrimaryBranch(ctx, idx, digestsPerGrouping, primaryExp)
+ if err != nil {
+ return skerr.Wrap(err)
+ }
+
+ sklog.Infof("Sending diff messages for CL %s covering %d groupings to diffcalculator", clID, len(left))
+ for hg := range left {
+ grouping := paramtools.Params{
+ types.CorpusField: hg[0],
+ types.PrimaryKeyField: hg[1],
+ }
+ leftDigests := left[hg].Keys()
+ rightDigests := right[hg].Keys()
+ // This should be pretty fast because it's just sending off the work, not blocking until
+ // the work is calculated.
+ if err := ix.DiffWorkPublisher.CalculateDiffs(ctx, grouping, leftDigests, rightDigests); err != nil {
+ return skerr.Wrapf(err, "publishing diff calculation for CL %s - %d, %d digests in grouping %v", clID, len(leftDigests), len(rightDigests), grouping)
+ }
+ }
+ return nil
+}
+
+// groupDataFromTryJobs takes the data from the provided TryJobResults and groups the unique
+// digests by grouping.
+func groupDataFromTryJobs(ctx context.Context, xtjr []tjstore.TryJobResult) (map[hashableGrouping]types.DigestSet, error) {
+ ctx, span := trace.StartSpan(ctx, "groupDataFromTryJobs")
+ span.AddAttributes(trace.Int64Attribute("data_points", int64(len(xtjr))))
+ defer span.End()
// The left and right digests will be the data from these tryjobs as well as the non-ignored
// data on the primary branch for the corresponding groupings.
digestsPerGrouping := map[hashableGrouping]types.DigestSet{}
for _, tjr := range xtjr {
if err := ctx.Err(); err != nil {
- return skerr.Wrap(err)
+ return nil, skerr.Wrap(err)
}
traceKeys := paramtools.Params{}
traceKeys.Add(tjr.GroupParams, tjr.ResultParams)
@@ -920,45 +948,39 @@
}
digestsPerGrouping[grouping] = uniqueDigests
}
- spanTryjobData.End()
+ return digestsPerGrouping, nil
+}
- tile := idx.cpxTile.GetTile(types.ExcludeIgnoredTraces)
- _, primaryBranchData := trace.StartSpan(ctx, "getDataFromPrimarybranch")
- primaryBranchData.AddAttributes(trace.Int64Attribute("groupings", int64(len(digestsPerGrouping))))
+// addDataFromPrimaryBranch adds the triaged, not ignored digests from the primary branch to
+// the provided map and returns it as the first return value (the left digests). It adds those same
+// digests to a new map and returns it as the second return value (the right digests).
+func addDataFromPrimaryBranch(ctx context.Context, idx *SearchIndex, leftDigests map[hashableGrouping]types.DigestSet, exp expectations.Classifier) (map[hashableGrouping]types.DigestSet, map[hashableGrouping]types.DigestSet, error) {
+ ctx, span := trace.StartSpan(ctx, "addDataFromPrimaryBranch")
+ span.AddAttributes(trace.Int64Attribute("groupings", int64(len(leftDigests))))
+ defer span.End()
+ countByTest := idx.dCounters[types.ExcludeIgnoredTraces].ByTest()
+ rightDigests := make(map[hashableGrouping]types.DigestSet, len(leftDigests))
// Add the digests from the primary branch (using the index)
- for grouping := range digestsPerGrouping {
+ for grouping := range leftDigests {
if err := ctx.Err(); err != nil {
- return skerr.Wrap(err)
+ return nil, nil, skerr.Wrap(err)
}
- q := paramtools.ParamSet{
- types.CorpusField: []string{grouping[0]},
- types.PrimaryKeyField: []string{grouping[1]},
- }
- uniqueDigests := digestsPerGrouping[grouping]
- dc := idx.dCounters[types.ExcludeIgnoredTraces].ByQuery(tile, q)
- for digest := range dc {
+ allLeftDigests := leftDigests[grouping]
+ allRightDigests := types.DigestSet{}
+ // We assume that test names are unique across corpora. This may not be true in general,
+ // but it allows us to avoid iterating the tile again.
+ for digest := range countByTest[types.TestName(grouping[1])] {
if exp.Classification(types.TestName(grouping[1]), digest) != expectations.Untriaged {
- uniqueDigests[digest] = true
+ allLeftDigests[digest] = true
+ allRightDigests[digest] = true
}
}
- digestsPerGrouping[grouping] = uniqueDigests
+ // This won't make a new key in the map, so it should be safe to overwrite this key and not
+ // affect iteration.
+ leftDigests[grouping] = allLeftDigests
+ rightDigests[grouping] = allRightDigests
}
- primaryBranchData.End()
-
- sklog.Infof("Sending diff messages for CL %s covering %d groupings to diffcalculator", clID, len(digestsPerGrouping))
- for hg, ds := range digestsPerGrouping {
- grouping := paramtools.Params{
- types.CorpusField: hg[0],
- types.PrimaryKeyField: hg[1],
- }
- digests := ds.Keys()
- // This should be pretty fast because it's just sending off the work, not blocking until
- // the work is calculated.
- if err := ix.DiffWorkPublisher.CalculateDiffs(ctx, grouping, digests, digests); err != nil {
- return skerr.Wrapf(err, "publishing diff calculation for CL %s - %d digests in grouping %v", clID, len(digests), grouping)
- }
- }
- return nil
+ return leftDigests, rightDigests, nil
}
// getCLIndex is a helper that returns the appropriately typed element from changelistIndices.
diff --git a/golden/go/indexer/indexer_test.go b/golden/go/indexer/indexer_test.go
index ef1e9f8..1e1333a 100644
--- a/golden/go/indexer/indexer_test.go
+++ b/golden/go/indexer/indexer_test.go
@@ -19,6 +19,7 @@
"go.skia.org/infra/golden/go/clstore"
mock_clstore "go.skia.org/infra/golden/go/clstore/mocks"
"go.skia.org/infra/golden/go/code_review"
+ "go.skia.org/infra/golden/go/diff"
diff_mocks "go.skia.org/infra/golden/go/diff/mocks"
mock_diffstore "go.skia.org/infra/golden/go/diffstore/mocks"
"go.skia.org/infra/golden/go/digest_counter"
@@ -300,22 +301,27 @@
"day_of_week": "wednesday",
}
+ resultParams := paramtools.Params{ // all data points are for the same test.
+ types.CorpusField: "some_corpus",
+ types.PrimaryKeyField: string(data.AlphaTest),
+ }
+
mts.On("GetResults", testutils.AnyContext, firstCombinedID, time.Time{}).Return([]tjstore.TryJobResult{
{
- ResultParams: paramtools.Params{types.PrimaryKeyField: string(data.AlphaTest)},
+ ResultParams: resultParams,
GroupParams: androidGroup,
Options: firstOptionalGroup,
Digest: data.AlphaPositiveDigest,
// Other fields ignored
},
{
- ResultParams: paramtools.Params{types.PrimaryKeyField: string(data.AlphaTest)},
+ ResultParams: resultParams,
GroupParams: iosGroup,
Options: firstOptionalGroup,
Digest: data.AlphaNegativeDigest,
},
{
- ResultParams: paramtools.Params{types.PrimaryKeyField: string(data.AlphaTest)},
+ ResultParams: resultParams,
GroupParams: androidGroup,
Options: secondOptionalGroup,
Digest: data.AlphaUntriagedDigest,
@@ -323,26 +329,41 @@
}, nil)
mts.On("GetResults", testutils.AnyContext, secondCombinedID, time.Time{}).Return([]tjstore.TryJobResult{
{
- ResultParams: paramtools.Params{types.PrimaryKeyField: string(data.AlphaTest)},
+ ResultParams: resultParams,
GroupParams: androidGroup,
Options: firstOptionalGroup,
Digest: data.AlphaPositiveDigest,
},
{
- ResultParams: paramtools.Params{types.PrimaryKeyField: string(data.AlphaTest)},
+ ResultParams: resultParams,
GroupParams: iosGroup,
Options: firstOptionalGroup,
// Note, for this CL, this digest has not yet been triaged.
Digest: data.AlphaNegativeDigest,
},
{
- ResultParams: paramtools.Params{types.PrimaryKeyField: string(data.AlphaTest)},
+ ResultParams: resultParams,
GroupParams: androidGroup,
Options: firstOptionalGroup,
Digest: data.AlphaUntriagedDigest,
},
}, nil)
+ expectedGrouping := paramtools.Params{
+ types.CorpusField: "some_corpus",
+ types.PrimaryKeyField: string(data.AlphaTest),
+ }
+ leftDigestsMatcher := mock.MatchedBy(func(left []types.Digest) bool {
+ assert.ElementsMatch(t, []types.Digest{data.AlphaUntriagedDigest, data.AlphaPositiveDigest, data.AlphaNegativeDigest}, left)
+ return true
+ })
+ rightDigestsMatcher := mock.MatchedBy(func(right []types.Digest) bool {
+ assert.ElementsMatch(t, []types.Digest{data.AlphaPositiveDigest}, right)
+ return true
+ })
+ mdc := &diff_mocks.Calculator{}
+ mdc.On("CalculateDiffs", testutils.AnyContext, expectedGrouping, leftDigestsMatcher, rightDigestsMatcher).Return(nil)
+
ctx := context.Background()
ic := IndexerConfig{
ExpectationsStore: mes,
@@ -354,10 +375,12 @@
// URLTemplate and Client are unused here
},
},
+ DiffWorkPublisher: mdc,
}
ixr, err := New(ctx, ic, 0)
require.NoError(t, err)
ixr.changelistsReindexed.Reset()
+ ixr.lastMasterIndex = fakeSearchIndex()
ixr.calcChangelistIndices(ctx)
@@ -374,6 +397,7 @@
"model": []string{"crosshatch", "iphone3"},
"name": []string{"test_alpha"},
"os": []string{"Android", "iOS"},
+ "source_type": []string{"some_corpus"},
}, clIdx.ParamSet)
clIdx = ixr.GetIndexForCL(gerritCRS, secondCLID)
@@ -391,11 +415,27 @@
"model": []string{"crosshatch", "iphone3"},
"name": []string{"test_alpha"},
"os": []string{"Android", "iOS"},
+ "source_type": []string{"some_corpus"},
}, clIdx.ParamSet)
assert.Equal(t, int64(2), ixr.changelistsReindexed.Get())
}
+// noopPublisher returns a diff.Calculator that accepts any and all calls to CalculateDiffs.
+func noopPublisher() diff.Calculator {
+ mdc := diff_mocks.Calculator{}
+ mdc.On("CalculateDiffs", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
+ return &mdc
+}
+
+// fakeSearchIndex returns an arbitrary SearchIndex with enough data to compute CL diff messages.
+func fakeSearchIndex() *SearchIndex {
+ _, tile, _ := makeComplexTileWithCrosshatchIgnores()
+ return &SearchIndex{
+ dCounters: [2]digest_counter.DigestCounter{digest_counter.New(tile), digest_counter.New(tile)},
+ }
+}
+
func TestIndexer_CalcChangelistIndices_HasIndexForPreviousPS_Success(t *testing.T) {
unittest.SmallTest(t)
@@ -470,6 +510,7 @@
// URLTemplate and Client are unused here
},
},
+ DiffWorkPublisher: noopPublisher(),
}
ixr, err := New(ctx, ic, 0)
require.NoError(t, err)
@@ -506,6 +547,7 @@
ComputedTS: longAgo,
}
ixr.changelistIndices.Set("gerrit_111111", &previousIdx, 0)
+ ixr.lastMasterIndex = fakeSearchIndex()
ixr.calcChangelistIndices(ctx)
@@ -594,6 +636,7 @@
// URLTemplate and Client are unused here
},
},
+ DiffWorkPublisher: noopPublisher(),
}
ixr, err := New(ctx, ic, 0)
require.NoError(t, err)
@@ -624,7 +667,7 @@
ComputedTS: longAgo,
}
ixr.changelistIndices.Set("gerrit_111111", &previousIdx, 0)
-
+ ixr.lastMasterIndex = fakeSearchIndex()
ixr.calcChangelistIndices(ctx)
clIdx := ixr.GetIndexForCL(gerritCRS, clID)
diff --git a/golden/k8s-config-templates/gold-baselineserver-template.yaml b/golden/k8s-config-templates/gold-baselineserver-template.yaml
index ec3a47d..021270b 100644
--- a/golden/k8s-config-templates/gold-baselineserver-template.yaml
+++ b/golden/k8s-config-templates/gold-baselineserver-template.yaml
@@ -80,6 +80,10 @@
env:
- name: GOOGLE_APPLICATION_CREDENTIALS
value: /etc/gold-secrets/service-account.json
+ - name: K8S_POD_NAME
+ valueFrom:
+ fieldRef:
+ fieldPath: metadata.name
resources:
requests:
memory: "500Mi"
diff --git a/golden/k8s-config-templates/gold-diffserver-template.yaml b/golden/k8s-config-templates/gold-diffserver-template.yaml
index 2d4e3c2..df0e5b7 100644
--- a/golden/k8s-config-templates/gold-diffserver-template.yaml
+++ b/golden/k8s-config-templates/gold-diffserver-template.yaml
@@ -64,6 +64,10 @@
env:
- name: GOOGLE_APPLICATION_CREDENTIALS
value: /var/secrets/google/service-account.json
+ - name: K8S_POD_NAME
+ valueFrom:
+ fieldRef:
+ fieldPath: metadata.name
resources:
requests:
memory: "{{.K8S_MEMORY}}"
diff --git a/golden/k8s-config-templates/gold-ingestion-bt-template.yaml b/golden/k8s-config-templates/gold-ingestion-bt-template.yaml
index bb0c0c7..3b2cf73 100644
--- a/golden/k8s-config-templates/gold-ingestion-bt-template.yaml
+++ b/golden/k8s-config-templates/gold-ingestion-bt-template.yaml
@@ -71,6 +71,10 @@
env:
- name: GOOGLE_APPLICATION_CREDENTIALS
value: /var/secrets/google/service-account.json
+ - name: K8S_POD_NAME
+ valueFrom:
+ fieldRef:
+ fieldPath: metadata.name
resources:
requests:
memory: "4Gi"
diff --git a/golden/k8s-config-templates/gold-skiacorrectness-template.yaml b/golden/k8s-config-templates/gold-skiacorrectness-template.yaml
index 827c562..39aa5b5 100644
--- a/golden/k8s-config-templates/gold-skiacorrectness-template.yaml
+++ b/golden/k8s-config-templates/gold-skiacorrectness-template.yaml
@@ -65,6 +65,10 @@
env:
- name: GOOGLE_APPLICATION_CREDENTIALS
value: /var/secrets/google/service-account.json
+ - name: K8S_POD_NAME
+ valueFrom:
+ fieldRef:
+ fieldPath: metadata.name
resources:
requests:
memory: "{{.K8S_MEMORY}}"