[gold] Extend timeout and add tracing to CL indexing.

I noticed some CLs (https://gold.skia.org/search?issue=341725&crs=gerrit&use_sql=true
and https://gold.skia.org/search?crs=gerrit&issue=341725&use_sql=true)
did not have their image diffs compared.

Looking in the logs, I saw CL indexing had failed. This should get some
more insight into why.

Bug: skia:10582
Change-Id: I39d184ccb2250ac31df35fc830ebd685f0e32516
Reviewed-on: https://skia-review.googlesource.com/c/buildbot/+/371378
Reviewed-by: Kevin Lubick <kjlubick@google.com>
Commit-Queue: Kevin Lubick <kjlubick@google.com>
diff --git a/golden/go/indexer/indexer.go b/golden/go/indexer/indexer.go
index 0933606..721d198 100644
--- a/golden/go/indexer/indexer.go
+++ b/golden/go/indexer/indexer.go
@@ -9,15 +9,15 @@
 	"time"
 
 	ttlcache "github.com/patrickmn/go-cache"
-	"go.skia.org/infra/go/util"
-	"go.skia.org/infra/golden/go/clstore"
-	"go.skia.org/infra/golden/go/tjstore"
+	"go.opencensus.io/trace"
 
 	"go.skia.org/infra/go/metrics2"
 	"go.skia.org/infra/go/paramtools"
 	"go.skia.org/infra/go/skerr"
 	"go.skia.org/infra/go/sklog"
+	"go.skia.org/infra/go/util"
 	"go.skia.org/infra/golden/go/blame"
+	"go.skia.org/infra/golden/go/clstore"
 	"go.skia.org/infra/golden/go/diff"
 	"go.skia.org/infra/golden/go/digest_counter"
 	"go.skia.org/infra/golden/go/digesttools"
@@ -29,6 +29,7 @@
 	"go.skia.org/infra/golden/go/summary"
 	"go.skia.org/infra/golden/go/tilesource"
 	"go.skia.org/infra/golden/go/tiling"
+	"go.skia.org/infra/golden/go/tjstore"
 	"go.skia.org/infra/golden/go/types"
 	"go.skia.org/infra/golden/go/warmer"
 )
@@ -739,11 +740,12 @@
 // calcChangelistIndices goes through all open changelists within a given window and computes
 // an index of them (e.g. the untriaged digests).
 func (ix *Indexer) calcChangelistIndices(ctx context.Context) {
+	ctx, span := trace.StartSpan(ctx, "indexer_calcChangelistIndices")
+	defer span.End()
 	// Update the metric when we return (either from error or because we completed indexing).
 	defer metrics2.GetInt64Metric(indexedCLsMetric).Update(int64(ix.changelistIndices.ItemCount()))
-	defer shared.NewMetricsTimer("indexer_calculate_changelist_indices").Stop()
 	// Make sure this doesn't take arbitrarily long.
-	ctx, cancel := context.WithTimeout(ctx, 10*time.Minute)
+	ctx, cancel := context.WithTimeout(ctx, 20*time.Minute)
 	defer cancel()
 	now := time.Now()
 	masterExp, err := ix.ExpectationsStore.Get(ctx)
@@ -777,13 +779,14 @@
 					sklog.Errorf("Changelist indexing timed out (%v)", err)
 					return nil
 				}
-
+				ctx, expSpan := trace.StartSpan(ctx, "indexer_getCLExpectations")
 				issueExpStore := ix.ExpectationsStore.ForChangelist(cl.SystemID, system.ID)
 				clExps, err := issueExpStore.Get(ctx)
 				if err != nil {
 					return skerr.Wrapf(err, "loading expectations for cl %s (%s)", cl.SystemID, system.ID)
 				}
 				exps := expectations.Join(clExps, masterExp)
+				expSpan.End()
 
 				clKey := fmt.Sprintf("%s_%s", system.ID, cl.SystemID)
 				clIdx, ok := ix.getCLIndex(clKey)
@@ -819,8 +822,8 @@
 					if err != nil {
 						return skerr.Wrap(err)
 					}
-					untriagedResults, params := indexTryJobResults(existingUntriagedResults, xtjr, exps)
-					if err := ix.sendCLWorkToDiffCalculators(ctx, xtjr); err != nil {
+					untriagedResults, params := indexTryJobResults(ctx, existingUntriagedResults, xtjr, exps)
+					if err := ix.sendCLWorkToDiffCalculators(ctx, xtjr, system.ID+"_"+cl.SystemID); err != nil {
 						return skerr.Wrap(err)
 					}
 					// Copy the existing ParamSet into the newly created one. It is important to copy it from
@@ -845,7 +848,9 @@
 // indexTryJobResults goes through all the TryJobResults and returns results useful for indexing.
 // Concretely, these results are a slice with just the untriaged results and a ParamSet with the
 // observed params.
-func indexTryJobResults(existing, newResults []tjstore.TryJobResult, exps expectations.Classifier) ([]tjstore.TryJobResult, paramtools.ParamSet) {
+func indexTryJobResults(ctx context.Context, existing, newResults []tjstore.TryJobResult, exps expectations.Classifier) ([]tjstore.TryJobResult, paramtools.ParamSet) {
+	ctx, span := trace.StartSpan(ctx, "indexer_indexTryJobResults")
+	defer span.End()
 	params := paramtools.ParamSet{}
 	var newlyUntriagedResults []tjstore.TryJobResult
 	for _, tjr := range newResults {
@@ -881,7 +886,13 @@
 	return combined, params
 }
 
-func (ix *Indexer) sendCLWorkToDiffCalculators(ctx context.Context, xtjr []tjstore.TryJobResult) error {
+func (ix *Indexer) sendCLWorkToDiffCalculators(ctx context.Context, xtjr []tjstore.TryJobResult, clID string) error {
+	ctx, span := trace.StartSpan(ctx, "indexer_sendCLWorkToDiffCalculators")
+	defer span.End()
+	if len(xtjr) == 0 {
+		sklog.Infof("No new Tryjob results for CL %s", clID)
+		return nil
+	}
 	idx := ix.getIndex()
 	if idx == nil {
 		return nil
@@ -924,7 +935,7 @@
 		digestsPerGrouping[grouping] = uniqueDigests
 	}
 
-	sklog.Infof("Sending CL-based message covering %d groupings to diffcalculator", len(digestsPerGrouping))
+	sklog.Infof("Sending diff messages for CL %s covering %d groupings to diffcalculator", clID, len(digestsPerGrouping))
 	for hg, ds := range digestsPerGrouping {
 		grouping := paramtools.Params{
 			types.CorpusField:     hg[0],
@@ -934,7 +945,7 @@
 		// This should be pretty fast because it's just sending off the work, not blocking until
 		// the work is calculated.
 		if err := ix.DiffWorkPublisher.CalculateDiffs(ctx, grouping, digests, digests); err != nil {
-			return skerr.Wrapf(err, "publishing diff calculation of %d CL digests in grouping %v", len(digests), grouping)
+			return skerr.Wrapf(err, "publishing diff calculation for CL %s - %d digests in grouping %v", clID, len(digests), grouping)
 		}
 	}
 	return nil
diff --git a/golden/go/tjstore/sqltjstore/sqltjstore.go b/golden/go/tjstore/sqltjstore/sqltjstore.go
index 3e42ad7..3d85990 100644
--- a/golden/go/tjstore/sqltjstore/sqltjstore.go
+++ b/golden/go/tjstore/sqltjstore/sqltjstore.go
@@ -11,6 +11,7 @@
 	"github.com/jackc/pgtype"
 	"github.com/jackc/pgx/v4"
 	"github.com/jackc/pgx/v4/pgxpool"
+	"go.opencensus.io/trace"
 
 	"go.skia.org/infra/go/paramtools"
 	"go.skia.org/infra/go/skerr"
@@ -141,6 +142,8 @@
 // GetResults implements the tjstore.Store interface. Of note, it always returns a nil GroupParams
 // because the way the data is stored, there is no way to know which params were ingested together.
 func (s *StoreImpl) GetResults(ctx context.Context, cID tjstore.CombinedPSID, updatedAfter time.Time) ([]tjstore.TryJobResult, error) {
+	ctx, span := trace.StartSpan(ctx, "sqltjstore_GetResults")
+	defer span.End()
 	clID := sql.Qualify(cID.CRS, cID.CL)
 	psID := sql.Qualify(cID.CRS, cID.PS)