[perf] Insert values in batches.
Change-Id: Ic18f4650cdc009277f47542be60505548734be91
Reviewed-on: https://skia-review.googlesource.com/c/buildbot/+/305617
Commit-Queue: Joe Gregorio <jcgregorio@google.com>
Reviewed-by: Kevin Lubick <kjlubick@google.com>
diff --git a/perf/go/tracestore/sqltracestore/sqltracestore.go b/perf/go/tracestore/sqltracestore/sqltracestore.go
index cfdd491..cafd347 100644
--- a/perf/go/tracestore/sqltracestore/sqltracestore.go
+++ b/perf/go/tracestore/sqltracestore/sqltracestore.go
@@ -138,6 +138,7 @@
"go.skia.org/infra/go/query"
"go.skia.org/infra/go/skerr"
"go.skia.org/infra/go/sklog"
+ "go.skia.org/infra/go/util"
"go.skia.org/infra/go/vec32"
perfsql "go.skia.org/infra/perf/go/sql"
"go.skia.org/infra/perf/go/tracestore"
@@ -151,6 +152,10 @@
// per instance.
const cacheSize = 10 * 1000 * 1000
+// Ingest instances have around 8 cores and many ingested files have ~10K values, so
+// pick a batch size that allows roughly one request per core.
+const traceValuesInsertBatchSize = 1000
+
// statement is an SQL statement or fragment of an SQL statement.
type statement int
@@ -182,6 +187,13 @@
{{ range $index, $element := . -}}
{{ if $index }},{{end}}({{ $element.TileNumber }}, '{{ $element.KeyValue }}', {{ $element.TraceID }})
{{ end }}`,
+ replaceTraceValues: `
+ UPSERT INTO
+ TraceValues (trace_id, commit_number, val, source_file_id)
+ VALUES
+ {{ range $index, $element := . -}}
+ {{ if $index }},{{end}}({{ $element.TraceID }}, {{ $element.CommitNumber }}, {{ $element.Val }}, {{ $element.SourceFileID }})
+ {{ end }}`,
},
}
@@ -192,6 +204,14 @@
TraceID int64
}
+// replaceTraceValue is used in the replaceTraceValues template.
+type replaceTraceValue struct {
+ TraceID int64
+ CommitNumber types.CommitNumber
+ Val float32
+ SourceFileID int64
+}
+
var statementsByDialect = map[perfsql.Dialect]statements{
perfsql.CockroachDBDialect: {
insertIntoSourceFiles: `
@@ -229,11 +249,6 @@
($1, $2, $3)
ON CONFLICT
DO NOTHING`,
- replaceTraceValues: `
- UPSERT INTO
- TraceValues (trace_id, commit_number, val, source_file_id)
- VALUES
- ($1, $2, $3, $4)`,
countIndices: `
SELECT
COUNT(*)
@@ -764,9 +779,13 @@
return ret, nil
}
-// updateTraceValues writes a single entry in to the TraceValues table.
-func (s *SQLTraceStore) updateTraceValues(traceID int64, commitNumber types.CommitNumber, x float32, sourceID int64) error {
- _, err := s.preparedStatements[replaceTraceValues].ExecContext(context.TODO(), traceID, commitNumber, x, sourceID)
+// updateTraceValues writes the given slice of replaceTraceValues into the store.
+func (s *SQLTraceStore) updateTraceValues(templateContext []replaceTraceValue) error {
+ var b bytes.Buffer
+ if err := s.unpreparedStatements[replaceTraceValues].Execute(&b, templateContext); err != nil {
+ return skerr.Wrapf(err, "failed to expand template")
+ }
+ _, err := s.db.ExecContext(context.TODO(), b.String())
return skerr.Wrap(err)
}
@@ -779,25 +798,29 @@
return skerr.Wrap(err)
}
- // Get trace ids for each trace and add trace ids to the index/postings.
- // We populate the traceIDs slice whose values are 1:1 with the values and
- // params slices.
- traceIDs := make([]int64, len(params))
+ // Build the context for the SQL template.
+ templateContext := make([]replaceTraceValue, 0, len(params))
for i, p := range params {
traceID, err := s.writeTraceIDAndPostings(p, tileNumber)
if err != nil {
return skerr.Wrap(err)
}
- traceIDs[i] = traceID
+ templateContext = append(templateContext, replaceTraceValue{
+ TraceID: traceID,
+ CommitNumber: commitNumber,
+ Val: values[i],
+ SourceFileID: sourceID,
+ })
}
- // Now add each trace value.
- for i, x := range values {
- if err := s.updateTraceValues(traceIDs[i], commitNumber, x, sourceID); err != nil {
- return skerr.Wrap(err)
+ err = util.ChunkIterParallel(context.TODO(), len(templateContext), traceValuesInsertBatchSize, func(ctx context.Context, startIdx int, endIdx int) error {
+ if err := s.updateTraceValues(templateContext[startIdx:endIdx]); err != nil {
+ return skerr.Wrapf(err, "failed inserting subSlice: [%d:%d]", startIdx, endIdx)
}
- }
- return nil
+ return nil
+ })
+
+ return err
}
// Confirm that *SQLTraceStore fulfills the tracestore.TraceStore interface.
diff --git a/perf/go/tracestore/sqltracestore/sqltracestore_test.go b/perf/go/tracestore/sqltracestore/sqltracestore_test.go
index c1e3ccd..4758b17 100644
--- a/perf/go/tracestore/sqltracestore/sqltracestore_test.go
+++ b/perf/go/tracestore/sqltracestore/sqltracestore_test.go
@@ -2,6 +2,7 @@
import (
"context"
+ "fmt"
"testing"
"time"
@@ -81,6 +82,57 @@
assert.Equal(t, traceID, traceID2)
}
+func testWriteTraces_MultipleBatches_Success(t *testing.T, s *SQLTraceStore) {
+ ctx := context.Background()
+
+ const commitNumber = types.CommitNumber(1)
+
+ // Add enough values to force it to be done in batches.
+ const testLength = 2*traceValuesInsertBatchSize + 1
+
+ const tileNumber = types.TileNumber(0)
+
+ traceNames := make([]paramtools.Params, 0, testLength)
+ values := make([]float32, 0, testLength)
+
+ for i := 0; i < testLength; i++ {
+ traceNames = append(traceNames, paramtools.Params{
+ "traceid": fmt.Sprintf("%d", i),
+ "config": "8888",
+ })
+ values = append(values, float32(i))
+ }
+ err := s.WriteTraces(
+ commitNumber,
+ traceNames,
+ values,
+ paramtools.ParamSet{}, // ParamSet is empty because WriteTraces doesn't use it in this impl.
+ "gs://not-tested-as-part-of-this-test.json",
+ time.Time{}) // time is unused in this impl of TraceStore.
+ require.NoError(t, err)
+
+ // Confirm all traces were written.
+ q, err := query.NewFromString("config=8888")
+ require.NoError(t, err)
+ ts, err := s.QueryTracesByIndex(ctx, tileNumber, q)
+ assert.NoError(t, err)
+ assert.Len(t, ts, testLength)
+
+ // Spot test some values.
+ q, err = query.NewFromString("config=8888&traceid=0")
+ require.NoError(t, err)
+ ts, err = s.QueryTracesByIndex(ctx, tileNumber, q)
+ assert.NoError(t, err)
+
+ assert.Equal(t, float32(0), ts[",config=8888,traceid=0,"][s.OffsetFromIndex(commitNumber)])
+
+ q, err = query.NewFromString(fmt.Sprintf("config=8888&traceid=%d", testLength-1))
+ require.NoError(t, err)
+ ts, err = s.QueryTracesByIndex(ctx, tileNumber, q)
+ assert.NoError(t, err)
+ assert.Equal(t, float32(testLength-1), ts[fmt.Sprintf(",config=8888,traceid=%d,", testLength-1)][s.OffsetFromIndex(commitNumber)])
+}
+
func testReadTraces(t *testing.T, s *SQLTraceStore) {
populatedTestDB(t, s)
@@ -474,6 +526,7 @@
"testCountIndices_Empty": testCountIndices_Empty,
"testGetSource_Empty": testGetSource_Empty,
"testReadTraces": testReadTraces,
+ "testWriteTraces_MultipleBatches_Success": testWriteTraces_MultipleBatches_Success,
"testReadTraces_InvalidKey": testReadTraces_InvalidKey,
"testReadTraces_NoResults": testReadTraces_NoResults,
"testReadTraces_EmptyTileReturnsNoData": testReadTraces_EmptyTileReturnsNoData,