[perf] Insert values in batches.

Change-Id: Ic18f4650cdc009277f47542be60505548734be91
Reviewed-on: https://skia-review.googlesource.com/c/buildbot/+/305617
Commit-Queue: Joe Gregorio <jcgregorio@google.com>
Reviewed-by: Kevin Lubick <kjlubick@google.com>
diff --git a/perf/go/tracestore/sqltracestore/sqltracestore.go b/perf/go/tracestore/sqltracestore/sqltracestore.go
index cfdd491..cafd347 100644
--- a/perf/go/tracestore/sqltracestore/sqltracestore.go
+++ b/perf/go/tracestore/sqltracestore/sqltracestore.go
@@ -138,6 +138,7 @@
 	"go.skia.org/infra/go/query"
 	"go.skia.org/infra/go/skerr"
 	"go.skia.org/infra/go/sklog"
+	"go.skia.org/infra/go/util"
 	"go.skia.org/infra/go/vec32"
 	perfsql "go.skia.org/infra/perf/go/sql"
 	"go.skia.org/infra/perf/go/tracestore"
@@ -151,6 +152,10 @@
 // per instance.
 const cacheSize = 10 * 1000 * 1000
 
+// Ingest instances have around 8 cores and many ingested files have ~10K values, so
+// pick a batch size that allows roughly one request per core.
+const traceValuesInsertBatchSize = 1000
+
 // statement is an SQL statement or fragment of an SQL statement.
 type statement int
 
@@ -182,6 +187,13 @@
 			{{ range $index, $element :=  . -}}
 				{{ if $index }},{{end}}({{ $element.TileNumber }}, '{{ $element.KeyValue }}', {{ $element.TraceID }})
 	  		{{ end }}`,
+		replaceTraceValues: `
+		UPSERT INTO
+			TraceValues (trace_id, commit_number, val, source_file_id)
+		VALUES
+		{{ range $index, $element :=  . -}}
+			{{ if $index }},{{end}}({{ $element.TraceID }}, {{ $element.CommitNumber }}, {{ $element.Val }}, {{ $element.SourceFileID }})
+		{{ end }}`,
 	},
 }
 
@@ -192,6 +204,14 @@
 	TraceID    int64
 }
 
+// replaceTraceValue is used in the replaceTraceValues template.
+type replaceTraceValue struct {
+	TraceID      int64
+	CommitNumber types.CommitNumber
+	Val          float32
+	SourceFileID int64
+}
+
 var statementsByDialect = map[perfsql.Dialect]statements{
 	perfsql.CockroachDBDialect: {
 		insertIntoSourceFiles: `
@@ -229,11 +249,6 @@
 			($1, $2, $3)
 		ON CONFLICT
 		DO NOTHING`,
-		replaceTraceValues: `
-		UPSERT INTO
-			TraceValues (trace_id, commit_number, val, source_file_id)
-		VALUES
-			($1, $2, $3, $4)`,
 		countIndices: `
 		SELECT
 			COUNT(*)
@@ -764,9 +779,13 @@
 	return ret, nil
 }
 
-// updateTraceValues writes a single entry in to the TraceValues table.
-func (s *SQLTraceStore) updateTraceValues(traceID int64, commitNumber types.CommitNumber, x float32, sourceID int64) error {
-	_, err := s.preparedStatements[replaceTraceValues].ExecContext(context.TODO(), traceID, commitNumber, x, sourceID)
+// updateTraceValues writes the given slice of replaceTraceValues into the store.
+func (s *SQLTraceStore) updateTraceValues(templateContext []replaceTraceValue) error {
+	var b bytes.Buffer
+	if err := s.unpreparedStatements[replaceTraceValues].Execute(&b, templateContext); err != nil {
+		return skerr.Wrapf(err, "failed to expand template")
+	}
+	_, err := s.db.ExecContext(context.TODO(), b.String())
 	return skerr.Wrap(err)
 }
 
@@ -779,25 +798,29 @@
 		return skerr.Wrap(err)
 	}
 
-	// Get trace ids for each trace and add trace ids to the index/postings.
-	// We populate the traceIDs slice whose values are 1:1 with the values and
-	// params slices.
-	traceIDs := make([]int64, len(params))
+	// Build the context for the SQL template.
+	templateContext := make([]replaceTraceValue, 0, len(params))
 	for i, p := range params {
 		traceID, err := s.writeTraceIDAndPostings(p, tileNumber)
 		if err != nil {
 			return skerr.Wrap(err)
 		}
-		traceIDs[i] = traceID
+		templateContext = append(templateContext, replaceTraceValue{
+			TraceID:      traceID,
+			CommitNumber: commitNumber,
+			Val:          values[i],
+			SourceFileID: sourceID,
+		})
 	}
 
-	// Now add each trace value.
-	for i, x := range values {
-		if err := s.updateTraceValues(traceIDs[i], commitNumber, x, sourceID); err != nil {
-			return skerr.Wrap(err)
+	err = util.ChunkIterParallel(context.TODO(), len(templateContext), traceValuesInsertBatchSize, func(ctx context.Context, startIdx int, endIdx int) error {
+		if err := s.updateTraceValues(templateContext[startIdx:endIdx]); err != nil {
+			return skerr.Wrapf(err, "failed inserting subSlice: [%d:%d]", startIdx, endIdx)
 		}
-	}
-	return nil
+		return nil
+	})
+
+	return err
 }
 
 // Confirm that *SQLTraceStore fulfills the tracestore.TraceStore interface.
diff --git a/perf/go/tracestore/sqltracestore/sqltracestore_test.go b/perf/go/tracestore/sqltracestore/sqltracestore_test.go
index c1e3ccd..4758b17 100644
--- a/perf/go/tracestore/sqltracestore/sqltracestore_test.go
+++ b/perf/go/tracestore/sqltracestore/sqltracestore_test.go
@@ -2,6 +2,7 @@
 
 import (
 	"context"
+	"fmt"
 	"testing"
 	"time"
 
@@ -81,6 +82,57 @@
 	assert.Equal(t, traceID, traceID2)
 }
 
+func testWriteTraces_MultipleBatches_Success(t *testing.T, s *SQLTraceStore) {
+	ctx := context.Background()
+
+	const commitNumber = types.CommitNumber(1)
+
+	// Add enough values to force it to be done in batches.
+	const testLength = 2*traceValuesInsertBatchSize + 1
+
+	const tileNumber = types.TileNumber(0)
+
+	traceNames := make([]paramtools.Params, 0, testLength)
+	values := make([]float32, 0, testLength)
+
+	for i := 0; i < testLength; i++ {
+		traceNames = append(traceNames, paramtools.Params{
+			"traceid": fmt.Sprintf("%d", i),
+			"config":  "8888",
+		})
+		values = append(values, float32(i))
+	}
+	err := s.WriteTraces(
+		commitNumber,
+		traceNames,
+		values,
+		paramtools.ParamSet{}, // ParamSet is empty because WriteTraces doesn't use it in this impl.
+		"gs://not-tested-as-part-of-this-test.json",
+		time.Time{}) // time is unused in this impl of TraceStore.
+	require.NoError(t, err)
+
+	// Confirm all traces were written.
+	q, err := query.NewFromString("config=8888")
+	require.NoError(t, err)
+	ts, err := s.QueryTracesByIndex(ctx, tileNumber, q)
+	assert.NoError(t, err)
+	assert.Len(t, ts, testLength)
+
+	// Spot test some values.
+	q, err = query.NewFromString("config=8888&traceid=0")
+	require.NoError(t, err)
+	ts, err = s.QueryTracesByIndex(ctx, tileNumber, q)
+	assert.NoError(t, err)
+
+	assert.Equal(t, float32(0), ts[",config=8888,traceid=0,"][s.OffsetFromIndex(commitNumber)])
+
+	q, err = query.NewFromString(fmt.Sprintf("config=8888&traceid=%d", testLength-1))
+	require.NoError(t, err)
+	ts, err = s.QueryTracesByIndex(ctx, tileNumber, q)
+	assert.NoError(t, err)
+	assert.Equal(t, float32(testLength-1), ts[fmt.Sprintf(",config=8888,traceid=%d,", testLength-1)][s.OffsetFromIndex(commitNumber)])
+}
+
 func testReadTraces(t *testing.T, s *SQLTraceStore) {
 	populatedTestDB(t, s)
 
@@ -474,6 +526,7 @@
 	"testCountIndices_Empty":                                          testCountIndices_Empty,
 	"testGetSource_Empty":                                             testGetSource_Empty,
 	"testReadTraces":                                                  testReadTraces,
+	"testWriteTraces_MultipleBatches_Success":                         testWriteTraces_MultipleBatches_Success,
 	"testReadTraces_InvalidKey":                                       testReadTraces_InvalidKey,
 	"testReadTraces_NoResults":                                        testReadTraces_NoResults,
 	"testReadTraces_EmptyTileReturnsNoData":                           testReadTraces_EmptyTileReturnsNoData,