[perf] Ingesters should only process one pubsub event at a time.

Also add more logging to ingestion process.

Change-Id: I56e3f191a994128d775ead80b6070c3940b5f077
Reviewed-on: https://skia-review.googlesource.com/c/buildbot/+/303918
Reviewed-by: Joe Gregorio <jcgregorio@google.com>
Commit-Queue: Joe Gregorio <jcgregorio@google.com>
diff --git a/perf/go/file/gcssource/gcssource.go b/perf/go/file/gcssource/gcssource.go
index 5278a69..09adeaa 100644
--- a/perf/go/file/gcssource/gcssource.go
+++ b/perf/go/file/gcssource/gcssource.go
@@ -24,7 +24,7 @@
 const (
 	// maxParallelReceives is the number of Go routines we want to run.
 	// Determined experimentally.
-	maxParallelReceives = 10
+	maxParallelReceives = 1
 
 	// subscriptionSuffix is the name we append to a topic name to build a
 	// subscription name.
@@ -73,7 +73,6 @@
 	if err != nil {
 		return nil, skerr.Wrap(err)
 	}
-
 	client := httputils.DefaultClientConfig().WithTokenSource(ts).WithoutRetries().Client()
 	gcsClient, err := storage.NewClient(ctx, option.WithHTTPClient(client))
 	if err != nil {
diff --git a/perf/go/ingest/parser/parser.go b/perf/go/ingest/parser/parser.go
index 58f4584..3f397e6 100644
--- a/perf/go/ingest/parser/parser.go
+++ b/perf/go/ingest/parser/parser.go
@@ -165,19 +165,23 @@
 
 	// Read the whole content into bytes.Reader since we may take more than one
 	// pass at the data.
+	sklog.Infof("About to read.")
 	b, err := ioutil.ReadAll(file.Contents)
+	sklog.Infof("Finished readall.")
 	if err != nil {
 		return nil, nil, "", skerr.Wrap(err)
 	}
 	r := bytes.NewReader(b)
 
 	// Expect the file to be in format.FileFormat.
+	sklog.Info("About to extract")
 	params, values, hash, commonKeys, err := p.extractFromVersion1File(r, file.Name)
 	if err != nil {
 		// Fallback to the legacy format.
 		if _, err := r.Seek(0, io.SeekStart); err != nil {
 			return nil, nil, "", skerr.Wrap(err)
 		}
+		sklog.Info("About to extract from legacy.")
 		params, values, hash, commonKeys, err = p.extractFromLegacyFile(r, file.Name)
 	}
 	if err != nil && err != ErrFileShouldBeSkipped {
diff --git a/perf/go/ingest/process/process.go b/perf/go/ingest/process/process.go
index 9eefdbd..0085a5d 100644
--- a/perf/go/ingest/process/process.go
+++ b/perf/go/ingest/process/process.go
@@ -117,6 +117,7 @@
 
 		// Parse the file.
 		params, values, gitHash, err := p.Parse(f)
+		sklog.Infof("Parse error: %s", err)
 		if err != nil {
 			if err == parser.ErrFileShouldBeSkipped {
 				skipped.Inc(1)
@@ -127,6 +128,7 @@
 			continue
 		}
 
+		sklog.Info("Lookup CommitNumber")
 		// Convert gitHash to commitNumber.
 		commitNumber, err := g.CommitNumberFromGitHash(ctx, gitHash)
 		if err != nil {
@@ -142,12 +144,14 @@
 			}
 		}
 
+		sklog.Info("Build ParamSet")
 		// Build paramset from params.
 		ps := paramtools.NewParamSet()
 		for _, p := range params {
 			ps.AddParams(p)
 		}
 
+		sklog.Info("WriteTraces")
 		// Write data to the trace store.
 		if err := store.WriteTraces(commitNumber, params, values, ps, f.Name, time.Now()); err != nil {
 			failedToWrite.Inc(1)