[perf] Chunk writing ParamSets.
A single CT file can have over two million measurements.
Change-Id: I8c617881a540a3a7171558af3d44a670d7e32af7
Reviewed-on: https://skia-review.googlesource.com/c/buildbot/+/310536
Commit-Queue: Joe Gregorio <jcgregorio@google.com>
Reviewed-by: Kevin Lubick <kjlubick@google.com>
diff --git a/perf/go/tracestore/sqltracestore/sqltracestore.go b/perf/go/tracestore/sqltracestore/sqltracestore.go
index 954bb8a..a53f925 100644
--- a/perf/go/tracestore/sqltracestore/sqltracestore.go
+++ b/perf/go/tracestore/sqltracestore/sqltracestore.go
@@ -942,7 +942,7 @@
func (s *SQLTraceStore) WriteTraces(commitNumber types.CommitNumber, params []paramtools.Params, values []float32, ps paramtools.ParamSet, source string, _ time.Time) error {
defer timer.NewWithSummary("perfserver_sqltracestore_write_traces", s.writeTracesMetric).Stop()
- ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Minute)
+ ctx, cancel := context.WithTimeout(context.TODO(), 15*time.Minute)
defer cancel()
tileNumber := s.TileNumber(commitNumber)
@@ -965,19 +965,26 @@
}
if len(paramSetsContext) > 0 {
- var b bytes.Buffer
- if err := s.unpreparedStatements[insertIntoParamSets].Execute(&b, paramSetsContext); err != nil {
- return skerr.Wrapf(err, "failed to expand paramsets template")
- }
+ err := util.ChunkIter(len(paramSetsContext), writeTracesChunkSize, func(startIdx int, endIdx int) error {
+ chunk := paramSetsContext[startIdx:endIdx]
+ var b bytes.Buffer
+ if err := s.unpreparedStatements[insertIntoParamSets].Execute(&b, chunk); err != nil {
+ return skerr.Wrapf(err, "failed to expand paramsets template in slice [%d, %d]", startIdx, endIdx)
+ }
- sql := b.String()
+ sql := b.String()
- sklog.Infof("About to write %d paramset entries with sql of length %d", len(paramSetsContext), len(sql))
- if _, err := s.db.Exec(ctx, sql); err != nil {
- return skerr.Wrapf(err, "Executing: %q", b.String())
- }
- for _, ele := range paramSetsContext {
- s.cache.Add(ele.cacheKey)
+ sklog.Infof("About to write %d paramset entries with sql of length %d", endIdx-startIdx, len(sql))
+ if _, err := s.db.Exec(ctx, sql); err != nil {
+ return skerr.Wrapf(err, "Executing: %q", b.String())
+ }
+ for _, ele := range chunk {
+ s.cache.Add(ele.cacheKey)
+ }
+ return nil
+ })
+ if err != nil {
+ return err
}
}