| // Package indexer continuously creates an index of the test results |
| // as the tiles, expectations, and ignores change. |
| package indexer |
| |
| import ( |
| "context" |
| "fmt" |
| "sync" |
| "time" |
| |
| ttlcache "github.com/patrickmn/go-cache" |
| "go.opencensus.io/trace" |
| |
| "go.skia.org/infra/go/metrics2" |
| "go.skia.org/infra/go/paramtools" |
| "go.skia.org/infra/go/skerr" |
| "go.skia.org/infra/go/sklog" |
| "go.skia.org/infra/go/util" |
| "go.skia.org/infra/golden/go/blame" |
| "go.skia.org/infra/golden/go/clstore" |
| "go.skia.org/infra/golden/go/digest_counter" |
| "go.skia.org/infra/golden/go/expectations" |
| "go.skia.org/infra/golden/go/paramsets" |
| "go.skia.org/infra/golden/go/pdag" |
| "go.skia.org/infra/golden/go/storage" |
| "go.skia.org/infra/golden/go/summary" |
| "go.skia.org/infra/golden/go/tilesource" |
| "go.skia.org/infra/golden/go/tiling" |
| "go.skia.org/infra/golden/go/tjstore" |
| "go.skia.org/infra/golden/go/types" |
| ) |
| |
const (
	// knownHashesMetric tracks the number of digests that do not have to be uploaded by bots.
	knownHashesMetric = "known_digests"
	// indexedCLsMetric tracks the number of changelists we currently have indexed.
	indexedCLsMetric = "gold_indexed_changelists"

	// reindexedCLsMetric counts how many times a changelist index was recomputed.
	reindexedCLsMetric = "gold_indexer_changelists_reindexed"
)
| |
// SearchIndex contains everything that is necessary to search
// our current knowledge about test results. It should be
// considered as immutable. Whenever the underlying data changes,
// a new index is calculated via a pdag.
type SearchIndex struct {
	searchIndexConfig
	// The indices of these arrays are the int values of types.IgnoreState
	dCounters         [2]digest_counter.DigestCounter
	summaries         [2]countsAndBlames
	paramsetSummaries [2]paramsets.ParamSummary
	// preSliced caches the tile's traces grouped by ignore state, corpus and
	// test name so queries need not scan the full tile (see preSliceData).
	preSliced map[preSliceGroup][]*tiling.TracePair

	// cpxTile is the tile this index was computed from.
	cpxTile tiling.ComplexTile
	// blamer is derived from the tile and expectations (see calcBlame).
	blamer blame.Blamer

	// This is set by the indexing pipeline when we just want to update
	// individual tests that have changed.
	testNames types.TestNameSet
}
| |
// preSliceGroup is the key for the pre-sliced trace map. Zero-valued fields
// act as wildcards, e.g. a key with only IgnoreState set maps to all traces
// for that ignore state regardless of corpus or test (see preSliceData).
type preSliceGroup struct {
	IgnoreState types.IgnoreState
	Corpus      string
	Test        types.TestName
}
| |
// countsAndBlames makes the type declaration of SearchIndex a little nicer to read.
type countsAndBlames []*summary.TriageStatus
| |
// searchIndexConfig holds the dependencies shared by all SearchIndex
// instances produced by the same Indexer.
type searchIndexConfig struct {
	expectationsStore expectations.Store
	// gcsClient may be nil, in which case the known-hashes list is not written
	// (see writeKnownHashesList).
	gcsClient storage.GCSClient
}
| |
| // newSearchIndex creates a new instance of SearchIndex. It is not intended to |
| // be used outside of this package. SearchIndex instances are created by the |
| // Indexer and retrieved via GetIndex(). |
| func newSearchIndex(sic searchIndexConfig, cpxTile tiling.ComplexTile) *SearchIndex { |
| return &SearchIndex{ |
| searchIndexConfig: sic, |
| // The indices of these slices are the int values of types.IgnoreState |
| dCounters: [2]digest_counter.DigestCounter{}, |
| summaries: [2]countsAndBlames{}, |
| paramsetSummaries: [2]paramsets.ParamSummary{}, |
| preSliced: map[preSliceGroup][]*tiling.TracePair{}, |
| cpxTile: cpxTile, |
| } |
| } |
| |
| // SearchIndexForTesting returns filled in search index to be used when testing. Note that the |
| // indices of the arrays are the int values of types.IgnoreState |
| func SearchIndexForTesting(cpxTile tiling.ComplexTile, dc [2]digest_counter.DigestCounter, pm [2]paramsets.ParamSummary, exp expectations.Store, b blame.Blamer) (*SearchIndex, error) { |
| s := &SearchIndex{ |
| searchIndexConfig: searchIndexConfig{ |
| expectationsStore: exp, |
| }, |
| dCounters: dc, |
| summaries: [2]countsAndBlames{}, |
| paramsetSummaries: pm, |
| preSliced: map[preSliceGroup][]*tiling.TracePair{}, |
| blamer: b, |
| cpxTile: cpxTile, |
| } |
| // Providing the context.Background here is ok because this function is only going to be |
| // called from tests. |
| return s, preSliceData(context.Background(), s) |
| } |
| |
// Tile implements the IndexSearcher interface. It returns the ComplexTile
// this index was built from.
func (idx *SearchIndex) Tile() tiling.ComplexTile {
	return idx.cpxTile
}
| |
// GetIgnoreMatcher implements the IndexSearcher interface. It returns the
// matcher for the ignore rules that were applied to the tile.
func (idx *SearchIndex) GetIgnoreMatcher() paramtools.ParamMatcher {
	return idx.cpxTile.IgnoreRules()
}
| |
// DigestCountsByTest implements the IndexSearcher interface. It returns the
// digest counts grouped by test name for the given ignore state.
func (idx *SearchIndex) DigestCountsByTest(is types.IgnoreState) map[types.TestName]digest_counter.DigestCount {
	return idx.dCounters[is].ByTest()
}
| |
// MaxDigestsByTest implements the IndexSearcher interface.
func (idx *SearchIndex) MaxDigestsByTest(is types.IgnoreState) map[types.TestName]types.DigestSet {
	return idx.dCounters[is].MaxDigestsByTest()
}
| |
// DigestCountsByTrace implements the IndexSearcher interface. It returns the
// digest counts grouped by trace ID for the given ignore state.
func (idx *SearchIndex) DigestCountsByTrace(is types.IgnoreState) map[tiling.TraceID]digest_counter.DigestCount {
	return idx.dCounters[is].ByTrace()
}
| |
| // DigestCountsByQuery implements the IndexSearcher interface. |
| func (idx *SearchIndex) DigestCountsByQuery(query paramtools.ParamSet, is types.IgnoreState) digest_counter.DigestCount { |
| return idx.dCounters[is].ByQuery(idx.cpxTile.GetTile(is), query) |
| } |
| |
// GetSummaries implements the IndexSearcher interface. It returns the
// precomputed triage summaries for the given ignore state.
func (idx *SearchIndex) GetSummaries(is types.IgnoreState) []*summary.TriageStatus {
	return idx.summaries[is]
}
| |
| // SummarizeByGrouping implements the IndexSearcher interface. |
| func (idx *SearchIndex) SummarizeByGrouping(ctx context.Context, corpus string, query paramtools.ParamSet, is types.IgnoreState, head bool) ([]*summary.TriageStatus, error) { |
| exp, err := idx.expectationsStore.Get(ctx) |
| if err != nil { |
| return nil, skerr.Wrap(err) |
| } |
| |
| // The summaries are broken down by grouping (currently corpus and test name). Conveniently, |
| // we already have the traces broken down by those areas, and summaries are independent, so we |
| // can calculate them in parallel. |
| type groupedTracePairs []*tiling.TracePair |
| var groups []groupedTracePairs |
| for g, traces := range idx.preSliced { |
| if g.IgnoreState == is && g.Corpus == corpus && g.Test != "" { |
| groups = append(groups, traces) |
| } |
| } |
| rv := make([]*summary.TriageStatus, len(groups)) |
| wg := sync.WaitGroup{} |
| for i, g := range groups { |
| wg.Add(1) |
| go func(slice int, gtp groupedTracePairs) { |
| defer wg.Done() |
| d := summary.Data{ |
| Traces: gtp, |
| // These are all thread-safe, so they can be shared. |
| Expectations: exp, |
| ByTrace: idx.dCounters[is].ByTrace(), |
| Blamer: idx.blamer, |
| } |
| ts := d.Calculate(nil, query, head) |
| if len(ts) > 1 { |
| // this should never happen, as we'd only get multiple if there were multiple |
| // tests in the pre-sliced data (e.g. our pre-slicing code is bugged). |
| sklog.Warningf("Summary Calculation should always be length 1, but wasn't %#v", ts) |
| return |
| } else if len(ts) == 0 { |
| // This will happen if query removes all of the traces belonging to this test. |
| // It results in a nil in the return value; if that is a problem we can either |
| // fill in a zeroish value or a TriageStatus with Test/Corpus filled and 0 in the |
| // counts. |
| return |
| } |
| rv[slice] = ts[0] |
| }(i, g) |
| } |
| wg.Wait() |
| return rv, nil |
| } |
| |
// GetParamsetSummary implements the IndexSearcher interface. It returns the
// ParamSet observed for a given test/digest pair in the given ignore state.
func (idx *SearchIndex) GetParamsetSummary(test types.TestName, digest types.Digest, is types.IgnoreState) paramtools.ParamSet {
	return idx.paramsetSummaries[is].Get(test, digest)
}
| |
// GetParamsetSummaryByTest implements the IndexSearcher interface.
func (idx *SearchIndex) GetParamsetSummaryByTest(is types.IgnoreState) map[types.TestName]map[types.Digest]paramtools.ParamSet {
	return idx.paramsetSummaries[is].GetByTest()
}
| |
// GetBlame implements the IndexSearcher interface. It returns the blame
// distribution for the given test/digest, or an empty distribution if the
// blamer has not been computed yet.
func (idx *SearchIndex) GetBlame(test types.TestName, digest types.Digest, commits []tiling.Commit) blame.BlameDistribution {
	if idx.blamer == nil {
		// should never happen - indexer should have this initialized
		// before the web server starts serving requests.
		return blame.BlameDistribution{}
	}
	return idx.blamer.GetBlame(test, digest, commits)
}
| |
| // SlicedTraces returns a slice of TracePairs that match the query and the ignore state. |
| // This is meant to be a superset of traces, as only the corpus and testname from the query are |
| // used for this pre-filter step. |
| func (idx *SearchIndex) SlicedTraces(is types.IgnoreState, query map[string][]string) []*tiling.TracePair { |
| if len(query[types.CorpusField]) == 0 { |
| return idx.preSliced[preSliceGroup{ |
| IgnoreState: is, |
| }] |
| } |
| var rv []*tiling.TracePair |
| for _, corpus := range query[types.CorpusField] { |
| if len(query[types.PrimaryKeyField]) == 0 { |
| rv = append(rv, idx.preSliced[preSliceGroup{ |
| IgnoreState: is, |
| Corpus: corpus, |
| }]...) |
| } else { |
| for _, tn := range query[types.PrimaryKeyField] { |
| rv = append(rv, idx.preSliced[preSliceGroup{ |
| IgnoreState: is, |
| Corpus: corpus, |
| Test: types.TestName(tn), |
| }]...) |
| } |
| } |
| } |
| return rv |
| } |
| |
| // MostRecentPositiveDigest implements the IndexSearcher interface. |
| func (idx *SearchIndex) MostRecentPositiveDigest(ctx context.Context, traceID tiling.TraceID) (types.Digest, error) { |
| defer metrics2.FuncTimer().Stop() |
| |
| // Retrieve Trace for the given traceID. |
| tr, ok := idx.cpxTile.GetTile(types.IncludeIgnoredTraces).Traces[traceID] |
| if !ok { |
| return tiling.MissingDigest, nil |
| } |
| |
| // Retrieve expectations. |
| exps, err := idx.expectationsStore.Get(ctx) |
| if err != nil { |
| return "", skerr.Wrapf(err, "retrieving expectations (traceID=%q)", traceID) |
| } |
| |
| // Find and return the most recent positive digest in the Trace. |
| for i := len(tr.Digests) - 1; i >= 0; i-- { |
| digest := tr.Digests[i] |
| if digest != tiling.MissingDigest && exps.Classification(tr.TestName(), digest) == expectations.Positive { |
| return digest, nil |
| } |
| } |
| return tiling.MissingDigest, nil |
| } |
| |
// IndexerConfig bundles the dependencies needed to construct an Indexer.
type IndexerConfig struct {
	// ExpChangeListener is used to subscribe to expectation-change events.
	ExpChangeListener expectations.ChangeEventRegisterer
	ExpectationsStore expectations.Store
	// GCSClient is used to write the known-digests list; may be nil.
	GCSClient storage.GCSClient
	ReviewSystems []clstore.ReviewSystem
	TileSource tilesource.TileSource
	TryJobStore tjstore.Store
}
| |
// Indexer is the type that continuously processes data as the underlying
// data change. It uses a DAG that encodes the dependencies of the
// different components of an index and creates a processing pipeline on top
// of it.
type Indexer struct {
	IndexerConfig

	// pipeline is the root node; triggering it recomputes the whole index.
	pipeline *pdag.Node
	// indexTestsNode recomputes only the expectation-dependent parts.
	indexTestsNode *pdag.Node
	// lastMasterIndex is the most recently completed index, guarded by
	// masterIndexMutex.
	lastMasterIndex  *SearchIndex
	masterIndexMutex sync.RWMutex

	// changelistIndices caches *ChangelistIndex values keyed by "crs_clID".
	changelistIndices *ttlcache.Cache

	changelistsReindexed metrics2.Counter
}
| |
// New returns a new IndexSource instance. It synchronously indexes the initially
// available tile. If the indexing fails an error is returned.
// The provided interval defines how often the index should be refreshed.
func New(ctx context.Context, ic IndexerConfig, interval time.Duration) (*Indexer, error) {
	ret := &Indexer{
		IndexerConfig:        ic,
		changelistIndices:    ttlcache.New(changelistCacheExpirationDuration, changelistCacheExpirationDuration),
		changelistsReindexed: metrics2.GetCounter(reindexedCLsMetric),
	}

	// Set up the processing pipeline.
	root := pdag.NewNodeWithParents(pdag.NoOp)

	// At the top level, Add the DigestCounters...
	countsNodeInclude := root.Child(calcDigestCountsInclude)
	// These are run in parallel because they can take tens of seconds
	// in large repos like Skia.
	countsNodeExclude := root.Child(calcDigestCountsExclude)

	preSliceNode := root.Child(preSliceData)

	// Node that triggers blame and writing baselines.
	// This is used to trigger when expectations change.
	// We don't need to re-calculate DigestCounts if the
	// expectations change because the DigestCounts don't care about
	// the expectations, only on the tile.
	indexTestsNode := root.Child(pdag.NoOp)

	// ... and invoke the Blamer to calculate the blames.
	blamerNode := indexTestsNode.Child(calcBlame)

	// Parameters depend on DigestCounter.
	paramsNodeInclude := pdag.NewNodeWithParents(calcParamsetsInclude, countsNodeInclude)
	// These are run in parallel because they can take tens of seconds
	// in large repos like Skia.
	paramsNodeExclude := pdag.NewNodeWithParents(calcParamsetsExclude, countsNodeExclude)

	// Write known hashes after ignores are computed. DigestCount is a
	// convenient way to get all the hashes, so that's what this node uses.
	writeHashes := countsNodeInclude.Child(writeKnownHashesList)

	// Summaries depend on DigestCounter and Blamer.
	summariesNode := pdag.NewNodeWithParents(calcSummaries, countsNodeInclude, countsNodeExclude, blamerNode, preSliceNode)

	// Set the result on the Indexer instance, once summaries, parameters and writing
	// the hash files is done. The sink node is not stored; it only exists to
	// sequence setIndex after its parents.
	pdag.NewNodeWithParents(ret.setIndex, summariesNode, paramsNodeInclude, paramsNodeExclude, writeHashes)

	ret.pipeline = root
	ret.indexTestsNode = indexTestsNode

	// Process the first tile and start the indexing process.
	return ret, ret.start(ctx, interval)
}
| |
// GetIndex implements the IndexSource interface. It returns the most recently
// completed index.
func (ix *Indexer) GetIndex() IndexSearcher {
	return ix.getIndex()
}
| |
// getIndex is like GetIndex but returns the bare struct, for
// internal package use. It may return nil before the first pipeline run
// completes.
func (ix *Indexer) getIndex() *SearchIndex {
	ix.masterIndexMutex.RLock()
	defer ix.masterIndexMutex.RUnlock()
	return ix.lastMasterIndex
}
| |
// start builds the initial index and starts the background
// process to continuously build indices. An interval of 0 disables the
// background process entirely (useful for some test setups).
func (ix *Indexer) start(ctx context.Context, interval time.Duration) error {
	if interval == 0 {
		sklog.Warning("Not starting indexer because duration was 0")
		return nil
	}
	ctx, span := trace.StartSpan(ctx, "initial_synchronous_index")
	defer span.End()
	// Build the first index synchronously.
	tileStream := tilesource.GetTileStreamNow(ix.TileSource, interval, "gold-indexer")
	if err := ix.executePipeline(ctx, <-tileStream); err != nil {
		return skerr.Wrap(err)
	}

	// When the master expectations change, update the blamer and its dependents. This channel
	// will usually be empty, except when triaging happens. We set the size to be big enough to
	// handle a large bulk triage, if needed.
	expCh := make(chan expectations.ID, 100000)
	ix.ExpChangeListener.ListenForChange(func(e expectations.ID) {
		// Schedule the list of test names to be recalculated.
		expCh <- e
	})

	// Keep building indices for different types of events. This is the central
	// event loop of the indexer.
	go func() {
		var cpxTile tiling.ComplexTile
		for {
			if err := ctx.Err(); err != nil {
				sklog.Warningf("Stopping indexer - context error: %s", err)
				return
			}
			var testChanges []expectations.ID

			// See if there is a tile or changed tests. Block until one arrives.
			cpxTile = nil
			select {
			// Catch a new tile.
			case cpxTile = <-tileStream:
				sklog.Infof("Indexer saw a new tile")

			// Catch any test changes.
			case tn := <-expCh:
				testChanges = append(testChanges, tn)
				sklog.Infof("Indexer saw some tests change")
			}

			// Drain all input channels, effectively bunching signals together that arrive in short
			// succession. The non-blocking default exits as soon as expCh is empty.
			DrainLoop:
			for {
				select {
				case tn := <-expCh:
					testChanges = append(testChanges, tn)
				default:
					break DrainLoop
				}
			}

			// If there is a tile, re-index everything and forget the
			// individual tests that changed (a full rebuild subsumes them).
			if cpxTile != nil {
				if err := ix.executePipeline(ctx, cpxTile); err != nil {
					sklog.Errorf("Unable to index tile: %s", err)
				}
			} else if len(testChanges) > 0 {
				// Only index the tests that have changed.
				ix.indexTests(ctx, testChanges)
			}
		}
	}()

	// Start indexing the CLs now that the first index has been populated (we need the
	// primary branch index to get the digests to diff against).
	go util.RepeatCtx(ctx, interval, ix.calcChangelistIndices)

	return nil
}
| |
| // executePipeline runs the given tile through the the indexing pipeline. |
| // pipeline.Trigger blocks until everything is done, so this function will as well. |
| func (ix *Indexer) executePipeline(ctx context.Context, cpxTile tiling.ComplexTile) error { |
| ctx, span := trace.StartSpan(ctx, "indexer_execute_pipeline") |
| defer span.End() |
| // Create a new index from the given tile. |
| sic := searchIndexConfig{ |
| expectationsStore: ix.ExpectationsStore, |
| gcsClient: ix.GCSClient, |
| } |
| return ix.pipeline.Trigger(ctx, newSearchIndex(sic, cpxTile)) |
| } |
| |
// indexTests creates an updated index by indexing the given list of expectation changes.
func (ix *Indexer) indexTests(ctx context.Context, testChanges []expectations.ID) {
	ctx, span := trace.StartSpan(ctx, "index_tests")
	defer span.End()
	// Get all the test names that had expectations changed.
	testNames := types.TestNameSet{}
	for _, d := range testChanges {
		testNames[d.Grouping] = true
	}
	if len(testNames) == 0 {
		return
	}

	sklog.Infof("Going to re-index %d tests", len(testNames))

	newIdx := ix.cloneLastIndex()
	// Set the testNames such that we only recompute those tests.
	newIdx.testNames = testNames
	// Triggering from indexTestsNode only re-runs the expectation-dependent
	// parts of the pipeline (blame, summaries, setIndex).
	if err := ix.indexTestsNode.Trigger(ctx, newIdx); err != nil {
		sklog.Errorf("Error indexing tests: %v \n\n Got error: %s", testNames.Keys(), err)
	}
}
| |
// cloneLastIndex returns a copy of the most recent index, sharing the parts
// that are independent of expectations and resetting the parts that must be
// recomputed when expectations change.
func (ix *Indexer) cloneLastIndex() *SearchIndex {
	lastIdx := ix.getIndex()
	sic := searchIndexConfig{
		expectationsStore: ix.ExpectationsStore,
		gcsClient:         ix.GCSClient,
	}
	return &SearchIndex{
		searchIndexConfig: sic,
		cpxTile:           lastIdx.cpxTile,
		dCounters:         lastIdx.dCounters,         // stay the same even if expectations change.
		paramsetSummaries: lastIdx.paramsetSummaries, // stay the same even if expectations change.
		preSliced:         lastIdx.preSliced,         // stay the same even if expectations change.

		summaries: [2]countsAndBlames{
			// the objects inside the summaries are immutable, but may be replaced if expectations
			// are recalculated for a subset of tests.
			lastIdx.summaries[types.ExcludeIgnoredTraces],
			lastIdx.summaries[types.IncludeIgnoredTraces],
		},

		blamer: nil, // This will need to be recomputed if expectations change.

		// Force testNames to be empty, just to be sure we re-compute everything by default
		testNames: nil,
	}
}
| |
| // setIndex sets the lastMasterIndex value at the very end of the pipeline. |
| func (ix *Indexer) setIndex(_ context.Context, state interface{}) error { |
| newIndex := state.(*SearchIndex) |
| ix.masterIndexMutex.Lock() |
| defer ix.masterIndexMutex.Unlock() |
| ix.lastMasterIndex = newIndex |
| return nil |
| } |
| |
| // calcDigestCountsInclude is the pipeline function to calculate DigestCounts from |
| // the full tile (not applying ignore rules) |
| func calcDigestCountsInclude(_ context.Context, state interface{}) error { |
| idx := state.(*SearchIndex) |
| is := types.IncludeIgnoredTraces |
| idx.dCounters[is] = digest_counter.New(idx.cpxTile.GetTile(is)) |
| return nil |
| } |
| |
| // calcDigestCountsExclude is the pipeline function to calculate DigestCounts from |
| // the partial tile (applying ignore rules). |
| func calcDigestCountsExclude(_ context.Context, state interface{}) error { |
| idx := state.(*SearchIndex) |
| is := types.ExcludeIgnoredTraces |
| idx.dCounters[is] = digest_counter.New(idx.cpxTile.GetTile(is)) |
| return nil |
| } |
| |
| // preSliceData is the pipeline function to pre-slice our traces. Currently, we pre-slice by |
| // corpus name and then by test name because this breaks our traces up into groups of <1000. |
| func preSliceData(_ context.Context, state interface{}) error { |
| idx := state.(*SearchIndex) |
| for _, is := range types.IgnoreStates { |
| t := idx.cpxTile.GetTile(is) |
| for id, tr := range t.Traces { |
| if tr == nil { |
| sklog.Warningf("Unexpected nil trace id %s", id) |
| continue |
| } |
| tp := tiling.TracePair{ |
| ID: id, |
| Trace: tr, |
| } |
| // Pre-slice the data by IgnoreState, then by IgnoreState and Corpus, finally by all |
| // three of IgnoreState/Corpus/Test. We shouldn't allow queries by Corpus w/o specifying |
| // IgnoreState, nor should we allow queries by TestName w/o specifying a Corpus or |
| // IgnoreState. |
| ignoreOnly := preSliceGroup{ |
| IgnoreState: is, |
| } |
| idx.preSliced[ignoreOnly] = append(idx.preSliced[ignoreOnly], &tp) |
| |
| ignoreAndCorpus := preSliceGroup{ |
| IgnoreState: is, |
| Corpus: tr.Corpus(), |
| } |
| idx.preSliced[ignoreAndCorpus] = append(idx.preSliced[ignoreAndCorpus], &tp) |
| |
| ignoreCorpusTest := preSliceGroup{ |
| IgnoreState: is, |
| Corpus: tr.Corpus(), |
| Test: tr.TestName(), |
| } |
| idx.preSliced[ignoreCorpusTest] = append(idx.preSliced[ignoreCorpusTest], &tp) |
| } |
| } |
| return nil |
| } |
| |
// calcSummaries is the pipeline function to calculate the summaries. If
// idx.testNames is set (partial re-index), only those tests are recomputed
// and merged into the previous results.
func calcSummaries(ctx context.Context, state interface{}) error {
	idx := state.(*SearchIndex)
	exp, err := idx.expectationsStore.Get(ctx)
	if err != nil {
		return skerr.Wrap(err)
	}
	for _, is := range types.IgnoreStates {
		d := summary.Data{
			// A nil query means all pre-sliced traces for this ignore state.
			Traces:       idx.SlicedTraces(is, nil),
			Expectations: exp,
			ByTrace:      idx.dCounters[is].ByTrace(),
			Blamer:       idx.blamer,
		}
		sum := d.Calculate(idx.testNames, nil, true)
		// If we have recalculated only a subset of tests, we want to keep the results from
		// the previous scans and overwrite what we have just recomputed.
		if len(idx.testNames) > 0 && len(idx.summaries[is]) > 0 {
			idx.summaries[is] = summary.MergeSorted(idx.summaries[is], sum)
		} else {
			idx.summaries[is] = sum
		}
	}
	return nil
}
| |
| // calcParamsetsInclude is the pipeline function to calculate the parameters from |
| // the full tile (not applying ignore rules) |
| func calcParamsetsInclude(_ context.Context, state interface{}) error { |
| idx := state.(*SearchIndex) |
| is := types.IncludeIgnoredTraces |
| idx.paramsetSummaries[is] = paramsets.NewParamSummary(idx.cpxTile.GetTile(is), idx.dCounters[is]) |
| return nil |
| } |
| |
| // calcParamsetsExclude is the pipeline function to calculate the parameters from |
| // the partial tile (applying ignore rules) |
| func calcParamsetsExclude(_ context.Context, state interface{}) error { |
| idx := state.(*SearchIndex) |
| is := types.ExcludeIgnoredTraces |
| idx.paramsetSummaries[is] = paramsets.NewParamSummary(idx.cpxTile.GetTile(is), idx.dCounters[is]) |
| return nil |
| } |
| |
| // calcBlame is the pipeline function to calculate the blame. |
| func calcBlame(ctx context.Context, state interface{}) error { |
| idx := state.(*SearchIndex) |
| exp, err := idx.expectationsStore.Get(ctx) |
| if err != nil { |
| return skerr.Wrapf(err, "fetching expectations needed to calculate blame") |
| } |
| b, err := blame.New(idx.cpxTile.GetTile(types.ExcludeIgnoredTraces), exp) |
| if err != nil { |
| idx.blamer = nil |
| return skerr.Wrapf(err, "calculating blame") |
| } |
| idx.blamer = b |
| return nil |
| } |
| |
// writeKnownHashesList is the pipeline function that asynchronously writes the
// list of all digests observed in the full tile to GCS. It is a no-op when no
// GCS client is configured, and always returns nil because the upload happens
// in a background goroutine.
func writeKnownHashesList(ctx context.Context, state interface{}) error {
	idx := state.(*SearchIndex)

	// Only write the hash file if a storage client is available.
	if idx.gcsClient == nil {
		return nil
	}

	// Trigger writing the hashes list.
	go func() {
		// Make sure this doesn't hang indefinitely. 2 minutes was chosen as a time that's plenty
		// long to make sure it completes (usually takes only a few seconds).
		ctx, cancel := context.WithTimeout(ctx, 2*time.Minute)
		defer cancel()

		byTest := idx.DigestCountsByTest(types.IncludeIgnoredTraces)
		// Collect all hashes in the tile.
		// TODO(kjlubick) Do we need to check that these images have actually been properly uploaded?
		// For clients using goldctl, it doesn't matter since goldctl will re-upload images that
		// aren't already in GCS. For clients that use known hashes to avoid writing more to disk
		// than they need to (e.g. Skia), this may be important.
		hashes := types.DigestSet{}
		for _, test := range byTest {
			for k := range test {
				hashes[k] = true
			}
		}

		// Keep track of the number of known hashes since this directly affects how
		// many images the bots have to upload.
		metrics2.GetInt64Metric(knownHashesMetric).Update(int64(len(hashes)))
		if err := idx.gcsClient.WriteKnownDigests(ctx, hashes.Keys()); err != nil {
			sklog.Errorf("Error writing known digests list: %s", err)
		}
		sklog.Infof("Finished writing %d known hashes", len(hashes))
	}()
	return nil
}
| |
const (
	// maxAgeOfOpenCLsToIndex is the maximum time between now and a CL's last updated time that we
	// will still index.
	maxAgeOfOpenCLsToIndex = 20 * 24 * time.Hour
	// We only keep around open CLs in the index. When a CL is closed, we don't update the indices
	// any more. These entries will expire and be removed from the cache after
	// changelistCacheExpirationDuration time has passed.
	changelistCacheExpirationDuration = 10 * 24 * time.Hour
	// maxCLsToIndex is the maximum number of CLs we query each loop to index them. Hopefully this
	// limit isn't reached regularly.
	maxCLsToIndex = 2000
)
| |
// calcChangelistIndices goes through all open changelists within a given window and computes
// an index of them (e.g. the untriaged digests). Indices are computed incrementally where
// possible (same patchset as last run) and stored in the TTL cache changelistIndices.
func (ix *Indexer) calcChangelistIndices(ctx context.Context) {
	ctx, span := trace.StartSpan(ctx, "indexer_calcChangelistIndices")
	defer span.End()
	// Update the metric when we return (either from error or because we completed indexing).
	defer metrics2.GetInt64Metric(indexedCLsMetric).Update(int64(ix.changelistIndices.ItemCount()))
	// Make sure this doesn't take arbitrarily long.
	ctx, cancel := context.WithTimeout(ctx, 10*time.Minute)
	defer cancel()
	now := time.Now()
	primaryExp, err := ix.ExpectationsStore.Get(ctx)
	if err != nil {
		sklog.Errorf("Could not get expectations for changelist indices: %s", err)
		return
	}

	for _, system := range ix.ReviewSystems {
		// An arbitrary cut off to the amount of recent, open CLs we try to index.
		recent := now.Add(-maxAgeOfOpenCLsToIndex)
		xcl, _, err := system.Store.GetChangelists(ctx, clstore.SearchOptions{
			StartIdx:    0,
			Limit:       maxCLsToIndex,
			OpenCLsOnly: true,
			After:       recent,
		})
		if err != nil {
			sklog.Errorf("Could not get recent changelists: %s", err)
			return
		}

		sklog.Infof("Indexing %d CLs", len(xcl))

		const numChunks = 8 // arbitrarily picked, could likely be tuned based on contention of
		// changelistCache
		chunkSize := (len(xcl) / numChunks) + 1 // add one to avoid integer truncation.
		err = util.ChunkIterParallel(ctx, len(xcl), chunkSize, func(ctx context.Context, startIdx int, endIdx int) error {
			for _, cl := range xcl[startIdx:endIdx] {
				if err := ctx.Err(); err != nil {
					// Returning nil (not the error) lets other chunks finish cleanly.
					sklog.Errorf("Changelist indexing timed out (%v)", err)
					return nil
				}
				issueExpStore := ix.ExpectationsStore.ForChangelist(cl.SystemID, system.ID)
				clExps, err := issueExpStore.Get(ctx)
				if err != nil {
					return skerr.Wrapf(err, "loading expectations for cl %s (%s)", cl.SystemID, system.ID)
				}
				// CL-specific expectations override/extend the primary branch ones.
				exps := expectations.Join(clExps, primaryExp)

				clKey := fmt.Sprintf("%s_%s", system.ID, cl.SystemID)
				clIdx, ok := ix.getCLIndex(clKey)
				// Ingestion should update this timestamp when it has uploaded a new file belonging to this
				// changelist. We add a bit of a buffer period to avoid potential issues with a file being
				// uploaded at the exact same time we create an index (skbug.com/10265).
				updatedWithGracePeriod := cl.Updated.Add(30 * time.Second)
				if !ok || clIdx.ComputedTS.Before(updatedWithGracePeriod) {
					ix.changelistsReindexed.Inc(1)
					// Compute it from scratch and store it to the index.
					xps, err := system.Store.GetPatchsets(ctx, cl.SystemID)
					if err != nil {
						return skerr.Wrap(err)
					}
					if len(xps) == 0 {
						// A CL with no patchsets has nothing to index.
						continue
					}
					latestPS := xps[len(xps)-1]
					psID := tjstore.CombinedPSID{
						CL:  cl.SystemID,
						CRS: system.ID,
						PS:  latestPS.SystemID,
					}
					afterTime := time.Time{}
					var existingUntriagedResults []tjstore.TryJobResult
					// Test to see if we can do an incremental index (just for results that were uploaded
					// for this patchset since the last time we indexed).
					if ok && clIdx.LatestPatchset.PS == latestPS.SystemID {
						afterTime = clIdx.ComputedTS
						existingUntriagedResults = clIdx.UntriagedResults
					}
					xtjr, err := ix.TryJobStore.GetResults(ctx, psID, afterTime)
					if err != nil {
						return skerr.Wrap(err)
					}
					untriagedResults, params := indexTryJobResults(ctx, existingUntriagedResults, xtjr, exps)
					// Copy the existing ParamSet into the newly created one. It is important to copy it from
					// old into new (and not new into old), so we don't cause a race condition on the cached
					// ParamSet by writing to it while GetIndexForCL is reading from it.
					params.AddParamSet(clIdx.ParamSet)
					clIdx.ParamSet = params
					clIdx.LatestPatchset = psID
					clIdx.UntriagedResults = untriagedResults
					clIdx.ComputedTS = now
				}
				ix.changelistIndices.Set(clKey, &clIdx, ttlcache.DefaultExpiration)
			}
			return nil
		})
		if err != nil {
			sklog.Errorf("Error indexing changelists from CRS %s: %s", system.ID, err)
		}
	}
}
| |
| // indexTryJobResults goes through all the TryJobResults and returns results useful for indexing. |
| // Concretely, these results are a slice with just the untriaged results and a ParamSet with the |
| // observed params. |
| func indexTryJobResults(ctx context.Context, existing, newResults []tjstore.TryJobResult, exps expectations.Classifier) ([]tjstore.TryJobResult, paramtools.ParamSet) { |
| ctx, span := trace.StartSpan(ctx, "indexer_indexTryJobResults") |
| defer span.End() |
| params := paramtools.ParamSet{} |
| var newlyUntriagedResults []tjstore.TryJobResult |
| for _, tjr := range newResults { |
| params.AddParams(tjr.GroupParams) |
| params.AddParams(tjr.ResultParams) |
| params.AddParams(tjr.Options) |
| tn := types.TestName(tjr.ResultParams[types.PrimaryKeyField]) |
| if exps.Classification(tn, tjr.Digest) == expectations.Untriaged { |
| // If the same digest somehow shows up twice (maybe because of how we |
| alreadyInList := false |
| for _, existingResult := range existing { |
| if existingResult.Digest == tjr.Digest && existingResult.ResultParams[types.PrimaryKeyField] == tjr.ResultParams[types.PrimaryKeyField] { |
| alreadyInList = true |
| break |
| } |
| } |
| if !alreadyInList { |
| newlyUntriagedResults = append(newlyUntriagedResults, tjr) |
| } |
| } |
| } |
| if len(newlyUntriagedResults) == 0 { |
| return existing, params |
| } |
| |
| if len(existing) == 0 { |
| return newlyUntriagedResults, params |
| } |
| // make a copy of the slice, so as not to confuse the existing index. |
| combined := make([]tjstore.TryJobResult, 0, len(existing)+len(newlyUntriagedResults)) |
| combined = append(combined, existing...) |
| combined = append(combined, newlyUntriagedResults...) |
| return combined, params |
| } |
| |
| // getCLIndex is a helper that returns the appropriately typed element from changelistIndices. |
| // We return a struct and not a pointer so that we can update the index w/o having to need a mutex. |
| func (ix *Indexer) getCLIndex(key string) (ChangelistIndex, bool) { |
| clIdx, ok := ix.changelistIndices.Get(key) |
| if !ok || clIdx == nil { |
| return ChangelistIndex{}, false |
| } |
| return *clIdx.(*ChangelistIndex), true |
| } |
| |
| // GetIndexForCL implements the IndexSource interface. |
| func (ix *Indexer) GetIndexForCL(crs, clID string) *ChangelistIndex { |
| key := fmt.Sprintf("%s_%s", crs, clID) |
| clIdx, ok := ix.getCLIndex(key) |
| if !ok { |
| return nil |
| } |
| // Return a copy to prevent clients from messing with the cached version. |
| return clIdx.Copy() |
| } |
| |
// Make sure SearchIndex fulfills the IndexSearcher interface.
var _ IndexSearcher = (*SearchIndex)(nil)

// Make sure Indexer fulfills the IndexSource interface.
var _ IndexSource = (*Indexer)(nil)