[gold] Fix traces and make PubSub publication non-blocking.
Not waiting for each PubSub publish to complete should speed up CL
indexing again, and the reworked spans should make the trace data sensible.
Bug: skia:10582
Change-Id: I418350032fc84c420496326daedce8234254e655
Reviewed-on: https://skia-review.googlesource.com/c/buildbot/+/371516
Reviewed-by: Kevin Lubick <kjlubick@google.com>
diff --git a/golden/cmd/skiacorrectness/skiacorrectness.go b/golden/cmd/skiacorrectness/skiacorrectness.go
index 2046b2e..3ed17da 100644
--- a/golden/cmd/skiacorrectness/skiacorrectness.go
+++ b/golden/cmd/skiacorrectness/skiacorrectness.go
@@ -21,6 +21,7 @@
"cloud.google.com/go/pubsub"
"github.com/gorilla/mux"
"github.com/jackc/pgx/v4/pgxpool"
+ "go.opencensus.io/trace"
"go.skia.org/infra/go/paramtools"
"go.skia.org/infra/golden/go/types"
"golang.org/x/oauth2"
@@ -626,6 +627,8 @@
// CalculateDiffs publishes a WorkerMessage to the configured PubSub topic so that a worker
// (see diffcalculator) can pick it up and calculate the diffs.
func (p *pubsubDiffPublisher) CalculateDiffs(ctx context.Context, grouping paramtools.Params, left, right []types.Digest) error {
+ ctx, span := trace.StartSpan(ctx, "PublishDiffMessage")
+ defer span.End()
body, err := json.Marshal(diff.WorkerMessage{
Version: diff.WorkerMessageVersion,
Grouping: grouping,
@@ -635,14 +638,10 @@
if err != nil {
return skerr.Wrap(err) // should never happen because JSON input is well-formed.
}
- pr := p.client.Topic(p.topic).Publish(ctx, &pubsub.Message{
+ p.client.Topic(p.topic).Publish(ctx, &pubsub.Message{
Data: body,
})
- // Blocks until message actual sent
- _, err = pr.Get(ctx)
- if err != nil {
- return skerr.Wrap(err)
- }
+ // Intentionally do not wait for the publish result (this speeds up throughput), so publish errors are not returned to the caller.
return nil
}
diff --git a/golden/go/clstore/sqlclstore/sqlclstore.go b/golden/go/clstore/sqlclstore/sqlclstore.go
index 8d2e0eb..5f04cd6 100644
--- a/golden/go/clstore/sqlclstore/sqlclstore.go
+++ b/golden/go/clstore/sqlclstore/sqlclstore.go
@@ -6,8 +6,8 @@
"github.com/jackc/pgx/v4"
"github.com/jackc/pgx/v4/pgxpool"
+ "go.opencensus.io/trace"
- "go.skia.org/infra/go/metrics2"
"go.skia.org/infra/go/skerr"
"go.skia.org/infra/go/sklog"
"go.skia.org/infra/golden/go/clstore"
@@ -53,7 +53,8 @@
// GetChangelists implements clstore.Store.
func (s *StoreImpl) GetChangelists(ctx context.Context, opts clstore.SearchOptions) ([]code_review.Changelist, int, error) {
- defer metrics2.FuncTimer().Stop()
+ ctx, span := trace.StartSpan(ctx, "sqlclstore_GetChangelists")
+ defer span.End()
if opts.Limit <= 0 {
return nil, 0, skerr.Fmt("must supply a limit")
}
diff --git a/golden/go/expectations/fs_expectationstore/fs_expstore.go b/golden/go/expectations/fs_expectationstore/fs_expstore.go
index a4c1273..91b32cf 100644
--- a/golden/go/expectations/fs_expectationstore/fs_expstore.go
+++ b/golden/go/expectations/fs_expectationstore/fs_expstore.go
@@ -11,6 +11,7 @@
"time"
"cloud.google.com/go/firestore"
+ "go.opencensus.io/trace"
"golang.org/x/sync/errgroup"
ifirestore "go.skia.org/infra/go/firestore"
@@ -459,6 +460,8 @@
// Firestore if not.
func (s *Store) Get(ctx context.Context) (expectations.ReadOnly, error) {
if s.hasSnapshotsRunning {
+ _, span := trace.StartSpan(ctx, "fsexpstore_getFromSnapshots")
+ defer span.End()
// If the snapshots are running, we first check to see if we have a fresh Expectations.
s.returnCacheMutex.Lock()
defer s.returnCacheMutex.Unlock()
@@ -503,7 +506,8 @@
// the assumption that loadExpectations will only be called for setups that do not have the
// snapshot queries, and the entryCache is used to create the expectationChanges (for undoing).
func (s *Store) loadExpectations(ctx context.Context) (*expectations.Expectations, error) {
- defer metrics2.FuncTimer().Stop()
+ ctx, span := trace.StartSpan(ctx, "fsexpstore_loadExpectations")
+ defer span.End()
es := make([][]expectationEntry, s.numShards)
queries := fs_utils.ShardOnDigest(s.expectationsCollection(), digestField, s.numShards)
diff --git a/golden/go/indexer/indexer.go b/golden/go/indexer/indexer.go
index 721d198..39ecc69 100644
--- a/golden/go/indexer/indexer.go
+++ b/golden/go/indexer/indexer.go
@@ -779,14 +779,12 @@
sklog.Errorf("Changelist indexing timed out (%v)", err)
return nil
}
- ctx, expSpan := trace.StartSpan(ctx, "indexer_getCLExpectations")
issueExpStore := ix.ExpectationsStore.ForChangelist(cl.SystemID, system.ID)
clExps, err := issueExpStore.Get(ctx)
if err != nil {
return skerr.Wrapf(err, "loading expectations for cl %s (%s)", cl.SystemID, system.ID)
}
exps := expectations.Join(clExps, masterExp)
- expSpan.End()
clKey := fmt.Sprintf("%s_%s", system.ID, cl.SystemID)
clIdx, ok := ix.getCLIndex(clKey)
@@ -901,10 +899,15 @@
if err != nil {
return skerr.Wrap(err)
}
+ _, spanTryjobData := trace.StartSpan(ctx, "getTryjobData")
+ spanTryjobData.AddAttributes(trace.Int64Attribute("data_points", int64(len(xtjr))))
// The left and right digests will be the data from these tryjobs as well as the non-ignored
// data on the primary branch for the corresponding groupings.
digestsPerGrouping := map[hashableGrouping]types.DigestSet{}
for _, tjr := range xtjr {
+ if err := ctx.Err(); err != nil {
+ return skerr.Wrap(err)
+ }
traceKeys := paramtools.Params{}
traceKeys.Add(tjr.GroupParams, tjr.ResultParams)
grouping := getHashableGrouping(traceKeys)
@@ -917,10 +920,16 @@
}
digestsPerGrouping[grouping] = uniqueDigests
}
+ spanTryjobData.End()
tile := idx.cpxTile.GetTile(types.ExcludeIgnoredTraces)
+ _, primaryBranchData := trace.StartSpan(ctx, "getDataFromPrimarybranch")
+ primaryBranchData.AddAttributes(trace.Int64Attribute("groupings", int64(len(digestsPerGrouping))))
// Add the digests from the primary branch (using the index)
for grouping := range digestsPerGrouping {
+ if err := ctx.Err(); err != nil {
+ return skerr.Wrap(err)
+ }
q := paramtools.ParamSet{
types.CorpusField: []string{grouping[0]},
types.PrimaryKeyField: []string{grouping[1]},
@@ -934,6 +943,7 @@
}
digestsPerGrouping[grouping] = uniqueDigests
}
+ primaryBranchData.End()
sklog.Infof("Sending diff messages for CL %s covering %d groupings to diffcalculator", clID, len(digestsPerGrouping))
for hg, ds := range digestsPerGrouping {