[perf] Ingesters should only process one pubsub event at a time.
Also add more logging to ingestion process.
Change-Id: I56e3f191a994128d775ead80b6070c3940b5f077
Reviewed-on: https://skia-review.googlesource.com/c/buildbot/+/303918
Reviewed-by: Joe Gregorio <jcgregorio@google.com>
Commit-Queue: Joe Gregorio <jcgregorio@google.com>
diff --git a/perf/go/file/gcssource/gcssource.go b/perf/go/file/gcssource/gcssource.go
index 5278a69..09adeaa 100644
--- a/perf/go/file/gcssource/gcssource.go
+++ b/perf/go/file/gcssource/gcssource.go
@@ -24,7 +24,7 @@
const (
// maxParallelReceives is the number of Go routines we want to run.
// Determined experimentally.
- maxParallelReceives = 10
+ maxParallelReceives = 1
// subscriptionSuffix is the name we append to a topic name to build a
// subscription name.
@@ -73,7 +73,6 @@
if err != nil {
return nil, skerr.Wrap(err)
}
-
client := httputils.DefaultClientConfig().WithTokenSource(ts).WithoutRetries().Client()
gcsClient, err := storage.NewClient(ctx, option.WithHTTPClient(client))
if err != nil {
diff --git a/perf/go/ingest/parser/parser.go b/perf/go/ingest/parser/parser.go
index 58f4584..3f397e6 100644
--- a/perf/go/ingest/parser/parser.go
+++ b/perf/go/ingest/parser/parser.go
@@ -165,19 +165,23 @@
// Read the whole content into bytes.Reader since we may take more than one
// pass at the data.
+ sklog.Infof("About to read.")
b, err := ioutil.ReadAll(file.Contents)
+ sklog.Infof("Finished readall.")
if err != nil {
return nil, nil, "", skerr.Wrap(err)
}
r := bytes.NewReader(b)
// Expect the file to be in format.FileFormat.
+ sklog.Info("About to extract")
params, values, hash, commonKeys, err := p.extractFromVersion1File(r, file.Name)
if err != nil {
// Fallback to the legacy format.
if _, err := r.Seek(0, io.SeekStart); err != nil {
return nil, nil, "", skerr.Wrap(err)
}
+ sklog.Info("About to extract from legacy.")
params, values, hash, commonKeys, err = p.extractFromLegacyFile(r, file.Name)
}
if err != nil && err != ErrFileShouldBeSkipped {
diff --git a/perf/go/ingest/process/process.go b/perf/go/ingest/process/process.go
index 9eefdbd..0085a5d 100644
--- a/perf/go/ingest/process/process.go
+++ b/perf/go/ingest/process/process.go
@@ -117,6 +117,7 @@
// Parse the file.
params, values, gitHash, err := p.Parse(f)
+ sklog.Infof("Parse error: %s", err)
if err != nil {
if err == parser.ErrFileShouldBeSkipped {
skipped.Inc(1)
@@ -127,6 +128,7 @@
continue
}
+ sklog.Info("Lookup CommitNumber")
// Convert gitHash to commitNumber.
commitNumber, err := g.CommitNumberFromGitHash(ctx, gitHash)
if err != nil {
@@ -142,12 +144,14 @@
}
}
+ sklog.Info("Build ParamSet")
// Build paramset from params.
ps := paramtools.NewParamSet()
for _, p := range params {
ps.AddParams(p)
}
+ sklog.Info("WriteTraces")
// Write data to the trace store.
if err := store.WriteTraces(commitNumber, params, values, ps, f.Name, time.Now()); err != nil {
failedToWrite.Inc(1)