[gold] Extend timeout and add tracing to CL indexing.
I noticed some CLs (https://gold.skia.org/search?issue=341725&crs=gerrit&use_sql=true
and https://gold.skia.org/search?crs=gerrit&issue=341725&use_sql=true)
did not have their image diffs compared.
Looking in the logs, I saw CL indexing had failed. This should get some
more insight into why.
Bug: skia:10582
Change-Id: I39d184ccb2250ac31df35fc830ebd685f0e32516
Reviewed-on: https://skia-review.googlesource.com/c/buildbot/+/371378
Reviewed-by: Kevin Lubick <kjlubick@google.com>
Commit-Queue: Kevin Lubick <kjlubick@google.com>
diff --git a/golden/go/indexer/indexer.go b/golden/go/indexer/indexer.go
index 0933606..721d198 100644
--- a/golden/go/indexer/indexer.go
+++ b/golden/go/indexer/indexer.go
@@ -9,15 +9,15 @@
"time"
ttlcache "github.com/patrickmn/go-cache"
- "go.skia.org/infra/go/util"
- "go.skia.org/infra/golden/go/clstore"
- "go.skia.org/infra/golden/go/tjstore"
+ "go.opencensus.io/trace"
"go.skia.org/infra/go/metrics2"
"go.skia.org/infra/go/paramtools"
"go.skia.org/infra/go/skerr"
"go.skia.org/infra/go/sklog"
+ "go.skia.org/infra/go/util"
"go.skia.org/infra/golden/go/blame"
+ "go.skia.org/infra/golden/go/clstore"
"go.skia.org/infra/golden/go/diff"
"go.skia.org/infra/golden/go/digest_counter"
"go.skia.org/infra/golden/go/digesttools"
@@ -29,6 +29,7 @@
"go.skia.org/infra/golden/go/summary"
"go.skia.org/infra/golden/go/tilesource"
"go.skia.org/infra/golden/go/tiling"
+ "go.skia.org/infra/golden/go/tjstore"
"go.skia.org/infra/golden/go/types"
"go.skia.org/infra/golden/go/warmer"
)
@@ -739,11 +740,12 @@
// calcChangelistIndices goes through all open changelists within a given window and computes
// an index of them (e.g. the untriaged digests).
func (ix *Indexer) calcChangelistIndices(ctx context.Context) {
+ ctx, span := trace.StartSpan(ctx, "indexer_calcChangelistIndices")
+ defer span.End()
// Update the metric when we return (either from error or because we completed indexing).
defer metrics2.GetInt64Metric(indexedCLsMetric).Update(int64(ix.changelistIndices.ItemCount()))
- defer shared.NewMetricsTimer("indexer_calculate_changelist_indices").Stop()
// Make sure this doesn't take arbitrarily long.
- ctx, cancel := context.WithTimeout(ctx, 10*time.Minute)
+ ctx, cancel := context.WithTimeout(ctx, 20*time.Minute)
defer cancel()
now := time.Now()
masterExp, err := ix.ExpectationsStore.Get(ctx)
@@ -777,13 +779,14 @@
sklog.Errorf("Changelist indexing timed out (%v)", err)
return nil
}
-
+ ctx, expSpan := trace.StartSpan(ctx, "indexer_getCLExpectations")
issueExpStore := ix.ExpectationsStore.ForChangelist(cl.SystemID, system.ID)
clExps, err := issueExpStore.Get(ctx)
if err != nil {
return skerr.Wrapf(err, "loading expectations for cl %s (%s)", cl.SystemID, system.ID)
}
exps := expectations.Join(clExps, masterExp)
+ expSpan.End()
clKey := fmt.Sprintf("%s_%s", system.ID, cl.SystemID)
clIdx, ok := ix.getCLIndex(clKey)
@@ -819,8 +822,8 @@
if err != nil {
return skerr.Wrap(err)
}
- untriagedResults, params := indexTryJobResults(existingUntriagedResults, xtjr, exps)
- if err := ix.sendCLWorkToDiffCalculators(ctx, xtjr); err != nil {
+ untriagedResults, params := indexTryJobResults(ctx, existingUntriagedResults, xtjr, exps)
+ if err := ix.sendCLWorkToDiffCalculators(ctx, xtjr, system.ID+"_"+cl.SystemID); err != nil {
return skerr.Wrap(err)
}
// Copy the existing ParamSet into the newly created one. It is important to copy it from
@@ -845,7 +848,9 @@
// indexTryJobResults goes through all the TryJobResults and returns results useful for indexing.
// Concretely, these results are a slice with just the untriaged results and a ParamSet with the
// observed params.
-func indexTryJobResults(existing, newResults []tjstore.TryJobResult, exps expectations.Classifier) ([]tjstore.TryJobResult, paramtools.ParamSet) {
+func indexTryJobResults(ctx context.Context, existing, newResults []tjstore.TryJobResult, exps expectations.Classifier) ([]tjstore.TryJobResult, paramtools.ParamSet) {
+ ctx, span := trace.StartSpan(ctx, "indexer_indexTryJobResults")
+ defer span.End()
params := paramtools.ParamSet{}
var newlyUntriagedResults []tjstore.TryJobResult
for _, tjr := range newResults {
@@ -881,7 +886,13 @@
return combined, params
}
-func (ix *Indexer) sendCLWorkToDiffCalculators(ctx context.Context, xtjr []tjstore.TryJobResult) error {
+func (ix *Indexer) sendCLWorkToDiffCalculators(ctx context.Context, xtjr []tjstore.TryJobResult, clID string) error {
+ ctx, span := trace.StartSpan(ctx, "indexer_sendCLWorkToDiffCalculators")
+ defer span.End()
+ if len(xtjr) == 0 {
+ sklog.Infof("No new Tryjob results for CL %s", clID)
+ return nil
+ }
idx := ix.getIndex()
if idx == nil {
return nil
@@ -924,7 +935,7 @@
digestsPerGrouping[grouping] = uniqueDigests
}
- sklog.Infof("Sending CL-based message covering %d groupings to diffcalculator", len(digestsPerGrouping))
+ sklog.Infof("Sending diff messages for CL %s covering %d groupings to diffcalculator", clID, len(digestsPerGrouping))
for hg, ds := range digestsPerGrouping {
grouping := paramtools.Params{
types.CorpusField: hg[0],
@@ -934,7 +945,7 @@
// This should be pretty fast because it's just sending off the work, not blocking until
// the work is calculated.
if err := ix.DiffWorkPublisher.CalculateDiffs(ctx, grouping, digests, digests); err != nil {
- return skerr.Wrapf(err, "publishing diff calculation of %d CL digests in grouping %v", len(digests), grouping)
+ return skerr.Wrapf(err, "publishing diff calculation for CL %s - %d digests in grouping %v", clID, len(digests), grouping)
}
}
return nil
diff --git a/golden/go/tjstore/sqltjstore/sqltjstore.go b/golden/go/tjstore/sqltjstore/sqltjstore.go
index 3e42ad7..3d85990 100644
--- a/golden/go/tjstore/sqltjstore/sqltjstore.go
+++ b/golden/go/tjstore/sqltjstore/sqltjstore.go
@@ -11,6 +11,7 @@
"github.com/jackc/pgtype"
"github.com/jackc/pgx/v4"
"github.com/jackc/pgx/v4/pgxpool"
+ "go.opencensus.io/trace"
"go.skia.org/infra/go/paramtools"
"go.skia.org/infra/go/skerr"
@@ -141,6 +142,8 @@
// GetResults implements the tjstore.Store interface. Of note, it always returns a nil GroupParams
// because the way the data is stored, there is no way to know which params were ingested together.
func (s *StoreImpl) GetResults(ctx context.Context, cID tjstore.CombinedPSID, updatedAfter time.Time) ([]tjstore.TryJobResult, error) {
+ ctx, span := trace.StartSpan(ctx, "sqltjstore_GetResults")
+ defer span.End()
clID := sql.Qualify(cID.CRS, cID.CL)
psID := sql.Qualify(cID.CRS, cID.PS)