[perf] Don't apply migrations on startup.
There were way too many "Error: can't acquire lock. At migrations.go:22" errors on startup.
Also:
- Add a cache for OrderedParamSets.
- Add a Tiles table to speed up building ParamSets.
- Add LOOKUP to many "INNER JOIN"s to speed them up.
Change-Id: I1c22b07f3f315d9bd9276edd02c243e8000c18cb
Reviewed-on: https://skia-review.googlesource.com/c/buildbot/+/307956
Commit-Queue: Joe Gregorio <jcgregorio@google.com>
Auto-Submit: Joe Gregorio <jcgregorio@google.com>
Reviewed-by: Kevin Lubick <kjlubick@google.com>
diff --git a/perf/COCKROACHDB.md b/perf/COCKROACHDB.md
index 6e8cb3a..96722ca 100644
--- a/perf/COCKROACHDB.md
+++ b/perf/COCKROACHDB.md
@@ -43,9 +43,7 @@
## Migrations
-In theory the migrations library will take care of schema migrations on startup, but
-in reality, if running more than one instance, contention for the lock can arise. The
-migrations can be applied from the desktop by using the migrations command line app, which
+Migrations can be applied from the desktop by using the migrations command line app, which
can be installed by:
go get -tags 'cockroachdb' github.com/golang-migrate/migrate/cmd/migrate
diff --git a/perf/go/alerts/sqlalertstore/sqlalertstore_test.go b/perf/go/alerts/sqlalertstore/sqlalertstore_test.go
index 452d602..b220b92 100644
--- a/perf/go/alerts/sqlalertstore/sqlalertstore_test.go
+++ b/perf/go/alerts/sqlalertstore/sqlalertstore_test.go
@@ -14,7 +14,7 @@
for name, subTest := range alertstest.SubTests {
t.Run(name, func(t *testing.T) {
- db, cleanup := sqltest.NewCockroachDBForTests(t, "alertstore", sqltest.ApplyMigrations)
+ db, cleanup := sqltest.NewCockroachDBForTests(t, "alertstore")
// If this test timeouts then comment out the cleanup(), as it may hide the
// actual errors.
defer cleanup()
diff --git a/perf/go/builders/builders.go b/perf/go/builders/builders.go
index 30d538a..f25d6bd 100644
--- a/perf/go/builders/builders.go
+++ b/perf/go/builders/builders.go
@@ -6,7 +6,6 @@
import (
"context"
- "database/sql"
"strings"
"cloud.google.com/go/bigtable"
@@ -32,42 +31,17 @@
"go.skia.org/infra/perf/go/shortcut"
"go.skia.org/infra/perf/go/shortcut/dsshortcutstore"
"go.skia.org/infra/perf/go/shortcut/sqlshortcutstore"
- "go.skia.org/infra/perf/go/sql/migrations"
- "go.skia.org/infra/perf/go/sql/migrations/cockroachdb"
"go.skia.org/infra/perf/go/tracestore"
"go.skia.org/infra/perf/go/tracestore/btts"
"go.skia.org/infra/perf/go/tracestore/sqltracestore"
"google.golang.org/api/option"
)
-// newCockroachDBFromConfig opens an existing CockroachDB database with all
-// migrations applied.
+// newCockroachDBFromConfig opens an existing CockroachDB database.
+//
+// No migrations are applied automatically; they must be applied by the
+// 'migrate' command line application. See COCKROACHDB.md for more details.
func newCockroachDBFromConfig(ctx context.Context, instanceConfig *config.InstanceConfig) (*pgxpool.Pool, error) {
- // Note that the migrationsConnection is different from the sql.Open
- // connection string since migrations know about CockroachDB, but we use the
- // Postgres driver for the database/sql connection since there's no native
- // CockroachDB golang driver, and the suggested SQL drive for CockroachDB is
- // the Postgres driver since that's the underlying communication protocol it
- // uses.
- migrationsConnection := strings.Replace(instanceConfig.DataStoreConfig.ConnectionString, "postgresql://", "cockroach://", 1)
-
- db, err := sql.Open("pgx", instanceConfig.DataStoreConfig.ConnectionString)
- if err != nil {
- return nil, skerr.Wrap(err)
- }
- cockroachdbMigrations, err := cockroachdb.New()
- if err != nil {
- return nil, skerr.Wrap(err)
- }
- err = migrations.Up(cockroachdbMigrations, migrationsConnection)
- if err != nil {
- return nil, skerr.Wrap(err)
- }
- if err := db.Close(); err != nil {
- return nil, skerr.Wrap(err)
- }
- sklog.Infof("Finished applying migrations.")
-
return pgxpool.Connect(ctx, instanceConfig.DataStoreConfig.ConnectionString)
}
diff --git a/perf/go/builders/builders_test.go b/perf/go/builders/builders_test.go
index 38732ee..b3df33a 100644
--- a/perf/go/builders/builders_test.go
+++ b/perf/go/builders/builders_test.go
@@ -78,7 +78,7 @@
connectionString := fmt.Sprintf("postgresql://root@%s/%s?sslmode=disable", perfsql.GetCockroachDBEmulatorHost(), databaseName)
- _, cleanup := sqltest.NewCockroachDBForTests(t, databaseName, sqltest.DoNotApplyMigrations)
+ _, cleanup := sqltest.NewCockroachDBForTests(t, databaseName)
instanceConfig := &config.InstanceConfig{
DataStoreConfig: config.DataStoreConfig{
diff --git a/perf/go/dfbuilder/dfbuilder.go b/perf/go/dfbuilder/dfbuilder.go
index 41638f7..3eecc32 100644
--- a/perf/go/dfbuilder/dfbuilder.go
+++ b/perf/go/dfbuilder/dfbuilder.go
@@ -523,12 +523,13 @@
return -1, nil, err
}
+ now := time.Now()
if q.Empty() {
// If the query is empty then we have a shortcut for building the
// ParamSet by just using the OPS. In that case we only need to count
// encodedKeys to get the count.
for i := 0; i < 2; i++ {
- ops, err := b.store.GetOrderedParamSet(ctx, tileNumber)
+ ops, err := b.store.GetOrderedParamSet(ctx, tileNumber, now)
if err != nil {
return -1, nil, err
}
@@ -550,7 +551,7 @@
var ops *paramtools.OrderedParamSet
// Record the OPS for the first tile.
- opsOne, err := b.store.GetOrderedParamSet(ctx, tileNumber)
+ opsOne, err := b.store.GetOrderedParamSet(ctx, tileNumber, now)
if err != nil {
return -1, nil, err
}
@@ -572,7 +573,7 @@
tileNumber = tileNumber.Prev()
if tileNumber != types.BadTileNumber {
// Record the OPS for the second tile.
- opsTwo, err := b.store.GetOrderedParamSet(ctx, tileNumber)
+ opsTwo, err := b.store.GetOrderedParamSet(ctx, tileNumber, now)
if err != nil {
return -1, nil, err
}
diff --git a/perf/go/dfbuilder/dfbuilder_test.go b/perf/go/dfbuilder/dfbuilder_test.go
index 839fae9..b62fbbd 100644
--- a/perf/go/dfbuilder/dfbuilder_test.go
+++ b/perf/go/dfbuilder/dfbuilder_test.go
@@ -39,7 +39,7 @@
unittest.LargeTest(t)
dbName := fmt.Sprintf("dfbuilder%d", rand.Uint32())
- db, cleanup := sqltest.NewCockroachDBForTests(t, dbName, sqltest.ApplyMigrations)
+ db, cleanup := sqltest.NewCockroachDBForTests(t, dbName)
defer cleanup()
store, err := sqltracestore.New(db, cfg.DataStoreConfig)
@@ -194,6 +194,7 @@
",config=8888,model=Pixel,": 3.0,
}, "gs://foo.json", time.Now())
assert.NoError(t, err)
+ store.ClearOrderedParamSetCache()
// This query will only encode for one tile and should still succeed.
q, err = query.New(url.Values{"model": []string{"Pixel"}})
diff --git a/perf/go/git/gittest/gittest.go b/perf/go/git/gittest/gittest.go
index 3e27737..8f4e488 100644
--- a/perf/go/git/gittest/gittest.go
+++ b/perf/go/git/gittest/gittest.go
@@ -55,7 +55,7 @@
dbName := fmt.Sprintf("git%d", rand.Uint64())
// Init our sql database.
- db, sqlCleanup := sqltest.NewCockroachDBForTests(t, dbName, sqltest.ApplyMigrations)
+ db, sqlCleanup := sqltest.NewCockroachDBForTests(t, dbName)
// Get tmp dir to use for repo checkout.
tmpDir, err := ioutil.TempDir("", "git")
diff --git a/perf/go/ingest/process/process_test.go b/perf/go/ingest/process/process_test.go
index 545ff0f..eee5752 100644
--- a/perf/go/ingest/process/process_test.go
+++ b/perf/go/ingest/process/process_test.go
@@ -61,7 +61,7 @@
func TestStart_IngestDemoRepoWithCockroachDBTraceStore_Success(t *testing.T) {
unittest.ManualTest(t)
- _, cleanup := sqltest.NewCockroachDBForTests(t, CockroachDatabaseName, sqltest.ApplyMigrations)
+ _, cleanup := sqltest.NewCockroachDBForTests(t, CockroachDatabaseName)
defer cleanup()
// Get tmp dir to use for repo checkout.
diff --git a/perf/go/psrefresh/mocks/OPSProvider.go b/perf/go/psrefresh/mocks/OPSProvider.go
index 21a805c..de68976 100644
--- a/perf/go/psrefresh/mocks/OPSProvider.go
+++ b/perf/go/psrefresh/mocks/OPSProvider.go
@@ -8,6 +8,8 @@
mock "github.com/stretchr/testify/mock"
paramtools "go.skia.org/infra/go/paramtools"
+ time "time"
+
types "go.skia.org/infra/perf/go/types"
)
@@ -37,13 +39,13 @@
return r0, r1
}
-// GetOrderedParamSet provides a mock function with given fields: ctx, tileNumber
-func (_m *OPSProvider) GetOrderedParamSet(ctx context.Context, tileNumber types.TileNumber) (*paramtools.OrderedParamSet, error) {
- ret := _m.Called(ctx, tileNumber)
+// GetOrderedParamSet provides a mock function with given fields: ctx, tileNumber, now
+func (_m *OPSProvider) GetOrderedParamSet(ctx context.Context, tileNumber types.TileNumber, now time.Time) (*paramtools.OrderedParamSet, error) {
+ ret := _m.Called(ctx, tileNumber, now)
var r0 *paramtools.OrderedParamSet
- if rf, ok := ret.Get(0).(func(context.Context, types.TileNumber) *paramtools.OrderedParamSet); ok {
- r0 = rf(ctx, tileNumber)
+ if rf, ok := ret.Get(0).(func(context.Context, types.TileNumber, time.Time) *paramtools.OrderedParamSet); ok {
+ r0 = rf(ctx, tileNumber, now)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*paramtools.OrderedParamSet)
@@ -51,8 +53,8 @@
}
var r1 error
- if rf, ok := ret.Get(1).(func(context.Context, types.TileNumber) error); ok {
- r1 = rf(ctx, tileNumber)
+ if rf, ok := ret.Get(1).(func(context.Context, types.TileNumber, time.Time) error); ok {
+ r1 = rf(ctx, tileNumber, now)
} else {
r1 = ret.Error(1)
}
diff --git a/perf/go/psrefresh/psrefresh.go b/perf/go/psrefresh/psrefresh.go
index a675b10..2af8798 100644
--- a/perf/go/psrefresh/psrefresh.go
+++ b/perf/go/psrefresh/psrefresh.go
@@ -15,7 +15,7 @@
// OPSProvider allows access to OrdererParamSets. btts.BigTableTraceStore implements this interface.
type OPSProvider interface {
GetLatestTile() (types.TileNumber, error)
- GetOrderedParamSet(ctx context.Context, tileNumber types.TileNumber) (*paramtools.OrderedParamSet, error)
+ GetOrderedParamSet(ctx context.Context, tileNumber types.TileNumber, now time.Time) (*paramtools.OrderedParamSet, error)
}
// ParamSetRefresher keeps a fresh paramtools.ParamSet that represents all
@@ -52,18 +52,19 @@
func (pf *ParamSetRefresher) oneStep() error {
ctx := context.Background()
+ now := time.Now()
tileKey, err := pf.traceStore.GetLatestTile()
if err != nil {
return skerr.Wrapf(err, "Failed to get starting tile.")
}
- ops, err := pf.traceStore.GetOrderedParamSet(ctx, tileKey)
+ ops, err := pf.traceStore.GetOrderedParamSet(ctx, tileKey, now)
if err != nil {
return skerr.Wrapf(err, "Failed to paramset from latest tile.")
}
ps := ops.ParamSet
tileKey = tileKey.Prev()
- ops2, err := pf.traceStore.GetOrderedParamSet(ctx, tileKey)
+ ops2, err := pf.traceStore.GetOrderedParamSet(ctx, tileKey, now)
if err != nil {
return skerr.Wrapf(err, "Failed to paramset from second to latest tile.")
}
diff --git a/perf/go/psrefresh/psrefresh_test.go b/perf/go/psrefresh/psrefresh_test.go
index 7d3974f..ca53857 100644
--- a/perf/go/psrefresh/psrefresh_test.go
+++ b/perf/go/psrefresh/psrefresh_test.go
@@ -29,8 +29,8 @@
ps2.Update(paramtools.ParamSet{
"config": []string{"8888", "565", "gles"},
})
- op.On("GetOrderedParamSet", mock.Anything, tileNumber).Return(ps1, nil)
- op.On("GetOrderedParamSet", mock.Anything, tileNumber2).Return(ps2, nil)
+ op.On("GetOrderedParamSet", mock.Anything, tileNumber, mock.Anything).Return(ps1, nil)
+ op.On("GetOrderedParamSet", mock.Anything, tileNumber2, mock.Anything).Return(ps2, nil)
pf := NewParamSetRefresher(op)
err := pf.Start(time.Minute)
diff --git a/perf/go/regression/dfiter_test.go b/perf/go/regression/dfiter_test.go
index a4ca774..61be47a 100644
--- a/perf/go/regression/dfiter_test.go
+++ b/perf/go/regression/dfiter_test.go
@@ -45,7 +45,7 @@
type cleanupFunc func()
func newForTest(t *testing.T) (context.Context, dataframe.DataFrameBuilder, *perfgit.Git, cleanupFunc) {
- db, dbCleanup := sqltest.NewCockroachDBForTests(t, CockroachDatabaseName, sqltest.ApplyMigrations)
+ db, dbCleanup := sqltest.NewCockroachDBForTests(t, CockroachDatabaseName)
cfg := config.DataStoreConfig{
TileSize: testTileSize,
diff --git a/perf/go/regression/sqlregressionstore/sqlregressionstore_test.go b/perf/go/regression/sqlregressionstore/sqlregressionstore_test.go
index 1246441..0c7cc37 100644
--- a/perf/go/regression/sqlregressionstore/sqlregressionstore_test.go
+++ b/perf/go/regression/sqlregressionstore/sqlregressionstore_test.go
@@ -18,7 +18,7 @@
// Common regressiontest tests.
for name, subTest := range regressiontest.SubTests {
t.Run(name, func(t *testing.T) {
- db, cleanup := sqltest.NewCockroachDBForTests(t, "regstore", sqltest.ApplyMigrations)
+ db, cleanup := sqltest.NewCockroachDBForTests(t, "regstore")
// If this test times out then comment out the cleanup(), as it may hide the
// actual errors.
defer cleanup()
@@ -32,7 +32,7 @@
// SQLRegressionStore specific tests.
for name, subTest := range subTests {
t.Run(name, func(t *testing.T) {
- db, cleanup := sqltest.NewCockroachDBForTests(t, "regstore", sqltest.ApplyMigrations)
+ db, cleanup := sqltest.NewCockroachDBForTests(t, "regstore")
// If this test times out then comment out the cleanup(), as it may hide the
// actual errors.
defer cleanup()
diff --git a/perf/go/shortcut/sqlshortcutstore/sqlshortcutstore_test.go b/perf/go/shortcut/sqlshortcutstore/sqlshortcutstore_test.go
index 8130be6..c53b3a8 100644
--- a/perf/go/shortcut/sqlshortcutstore/sqlshortcutstore_test.go
+++ b/perf/go/shortcut/sqlshortcutstore/sqlshortcutstore_test.go
@@ -14,7 +14,7 @@
for name, subTest := range shortcuttest.SubTests {
t.Run(name, func(t *testing.T) {
- db, cleanup := sqltest.NewCockroachDBForTests(t, "shortcutstore", sqltest.ApplyMigrations)
+ db, cleanup := sqltest.NewCockroachDBForTests(t, "shortcutstore")
defer cleanup()
store, err := New(db)
require.NoError(t, err)
diff --git a/perf/go/sql/sqltest/sqltest.go b/perf/go/sql/sqltest/sqltest.go
index 46db2a5..95cb1d8 100644
--- a/perf/go/sql/sqltest/sqltest.go
+++ b/perf/go/sql/sqltest/sqltest.go
@@ -18,17 +18,6 @@
// database resources.
type Cleanup func()
-// ApplyMigrationsOption indicates if migrations should be applied to an SQL database.
-type ApplyMigrationsOption bool
-
-const (
- // ApplyMigrations is used if migrations at to be applied.
- ApplyMigrations ApplyMigrationsOption = true
-
- // DoNotApplyMigrations is used if migrations should not be applied.
- DoNotApplyMigrations ApplyMigrationsOption = false
-)
-
// NewCockroachDBForTests creates a new temporary CockroachDB database with all
// migrations applied for testing. It also returns a function to call to clean
// up the database after the tests have completed.
@@ -39,7 +28,7 @@
// test.
//
// If migrations to are be applied then set applyMigrations to true.
-func NewCockroachDBForTests(t *testing.T, databaseName string, applyMigrations ApplyMigrationsOption) (*pgxpool.Pool, Cleanup) {
+func NewCockroachDBForTests(t *testing.T, databaseName string) (*pgxpool.Pool, Cleanup) {
// Note that the migrationsConnection is different from the sql.Open
// connection string since migrations know about CockroachDB, but we use the
// Postgres driver for the database/sql connection since there's no native
@@ -60,10 +49,8 @@
cockroachdbMigrations, err := cockroachdb.New()
require.NoError(t, err)
- if applyMigrations {
- err = migrations.Up(cockroachdbMigrations, migrationsConnection)
- require.NoError(t, err)
- }
+ err = migrations.Up(cockroachdbMigrations, migrationsConnection)
+ require.NoError(t, err)
ctx := context.Background()
conn, err := pgxpool.Connect(ctx, connectionString)
diff --git a/perf/go/tracestore/btts/btts.go b/perf/go/tracestore/btts/btts.go
index 8c4eb98..d2f7dab 100644
--- a/perf/go/tracestore/btts/btts.go
+++ b/perf/go/tracestore/btts/btts.go
@@ -345,7 +345,7 @@
}
// GetOrderedParamSet implements the tracestore.TraceStore interface.
-func (b *BigTableTraceStore) GetOrderedParamSet(ctx context.Context, tileNumber types.TileNumber) (*paramtools.OrderedParamSet, error) {
+func (b *BigTableTraceStore) GetOrderedParamSet(ctx context.Context, tileNumber types.TileNumber, _ time.Time) (*paramtools.OrderedParamSet, error) {
tileKey := TileKeyFromTileNumber(tileNumber)
ctx, span := trace.StartSpan(ctx, "BigTableTraceStore.GetOrderedParamSet")
defer span.End()
@@ -565,7 +565,7 @@
func (b *BigTableTraceStore) ReadTraces(tileNumber types.TileNumber, keys []string) (types.TraceSet, error) {
tileKey := TileKeyFromTileNumber(tileNumber)
// First encode all the keys by the OrderedParamSet of the given tile.
- ops, err := b.GetOrderedParamSet(context.TODO(), tileNumber)
+ ops, err := b.GetOrderedParamSet(context.TODO(), tileNumber, time.Now())
if err != nil {
return nil, fmt.Errorf("Failed to get OPS: %s", err)
}
@@ -611,7 +611,7 @@
func (b *BigTableTraceStore) regexpFromQuery(ctx context.Context, tileNumber types.TileNumber, q *query.Query) (*regexp.Regexp, error) {
// Get the OPS, which we need to encode the query, and decode the traceids of the results.
- ops, err := b.GetOrderedParamSet(ctx, tileNumber)
+ ops, err := b.GetOrderedParamSet(ctx, tileNumber, time.Now())
if err != nil {
return nil, err
}
@@ -647,7 +647,7 @@
ctx, span := trace.StartSpan(ctx, "BigTableTraceStore.QueryTraces")
defer span.End()
- ops, err := b.GetOrderedParamSet(ctx, tileNumber)
+ ops, err := b.GetOrderedParamSet(ctx, tileNumber, time.Now())
if err != nil {
return nil, fmt.Errorf("Failed to get OPS: %s", err)
}
@@ -709,7 +709,7 @@
defer span.End()
outParams := make(chan paramtools.Params, engine.QUERY_ENGINE_CHANNEL_SIZE)
- ops, err := b.GetOrderedParamSet(ctx, tileNumber)
+ ops, err := b.GetOrderedParamSet(ctx, tileNumber, time.Now())
if err != nil {
return nil, skerr.Wrapf(err, "Failed to get OPS")
}
@@ -771,7 +771,7 @@
ctx, span := trace.StartSpan(ctx, "BigTableTraceStore.QueryTracesByIndex")
defer span.End()
- ops, err := b.GetOrderedParamSet(ctx, tileNumber)
+ ops, err := b.GetOrderedParamSet(ctx, tileNumber, time.Now())
if err != nil {
return nil, fmt.Errorf("Failed to get OPS: %s", err)
}
@@ -897,7 +897,7 @@
ctx, span := trace.StartSpan(ctx, "BigTableTraceStore.allTraces")
defer span.End()
- ops, err := b.GetOrderedParamSet(ctx, tileNumber)
+ ops, err := b.GetOrderedParamSet(ctx, tileNumber, time.Now())
if err != nil {
return nil, fmt.Errorf("Failed to get OPS: %s", err)
}
@@ -1023,7 +1023,7 @@
tileKey := b.tileKey(commitNumber)
tileNumber := types.TileNumberFromCommitNumber(commitNumber, b.tileSize)
offset := b.OffsetFromCommitNumber(commitNumber)
- ops, err := b.GetOrderedParamSet(ctx, tileNumber)
+ ops, err := b.GetOrderedParamSet(ctx, tileNumber, time.Now())
if err != nil {
return "", fmt.Errorf("Failed to load OrderedParamSet for tile: %s", err)
}
diff --git a/perf/go/tracestore/btts/btts_test.go b/perf/go/tracestore/btts/btts_test.go
index ca970e0..e165d64 100644
--- a/perf/go/tracestore/btts/btts_test.go
+++ b/perf/go/tracestore/btts/btts_test.go
@@ -186,7 +186,7 @@
assert.NoError(t, err)
tileNumber := types.TileNumber(1)
- ops, err := b.GetOrderedParamSet(ctx, tileNumber)
+ ops, err := b.GetOrderedParamSet(ctx, tileNumber, time.Now())
assert.NoError(t, err)
assertIndices(t, ops, b, nil, "Start empty")
@@ -210,7 +210,7 @@
err = b.WriteTraces(257, expectedParams, values, paramset, "gs://some/test/location", now)
assert.NoError(t, err)
- ops, err = b.GetOrderedParamSet(ctx, tileNumber)
+ ops, err = b.GetOrderedParamSet(ctx, tileNumber, time.Now())
assert.NoError(t, err)
count := assertIndices(t, ops, b, expectedParams, "First write")
assert.Equal(t, 8, count)
@@ -272,7 +272,7 @@
}
err = b.WriteTraces(257, overWriteParams, values, paramset, "gs://some/other/test/location", now)
assert.NoError(t, err)
- ops, err = b.GetOrderedParamSet(ctx, tileNumber)
+ ops, err = b.GetOrderedParamSet(ctx, tileNumber, time.Now())
assert.NoError(t, err)
count = assertIndices(t, ops, b, expectedParams, "Overwrite")
assert.Equal(t, 8, count)
@@ -337,7 +337,7 @@
// Add new trace to expectations.
expectedParams = append(expectedParams, writeParams[0])
- ops, err = b.GetOrderedParamSet(ctx, tileNumber)
+ ops, err = b.GetOrderedParamSet(ctx, tileNumber, time.Now())
assert.NoError(t, err)
count = assertIndices(t, ops, b, expectedParams, "Add new trace")
assert.Equal(t, 10, count)
diff --git a/perf/go/tracestore/btts/execute_test.go b/perf/go/tracestore/btts/execute_test.go
index 4508d8e..e130c3f 100644
--- a/perf/go/tracestore/btts/execute_test.go
+++ b/perf/go/tracestore/btts/execute_test.go
@@ -65,7 +65,7 @@
err = b.WriteTraces(257, params, values, paramset, "gs://some/test/location", now)
assert.NoError(t, err)
- ops, err := b.GetOrderedParamSet(ctx, tileNumber)
+ ops, err := b.GetOrderedParamSet(ctx, tileNumber, time.Now())
assert.NoError(t, err)
// Now that the Tile is populated construct an encoded key=value pair to
@@ -134,7 +134,7 @@
err = b.WriteTraces(257, params, values, paramset, "gs://some/test/location", now)
assert.NoError(t, err)
- ops, err := b.GetOrderedParamSet(ctx, tileNumber)
+ ops, err := b.GetOrderedParamSet(ctx, tileNumber, time.Now())
assert.NoError(t, err)
// Now that the Tile is populated construct an encoded paramset to use as a
diff --git a/perf/go/tracestore/btts/paramindex_test.go b/perf/go/tracestore/btts/paramindex_test.go
index b6d5d9d..cf8cb96 100644
--- a/perf/go/tracestore/btts/paramindex_test.go
+++ b/perf/go/tracestore/btts/paramindex_test.go
@@ -47,7 +47,7 @@
err = b.WriteTraces(257, params, values, paramset, "gs://some/test/location", now)
assert.NoError(t, err)
- ops, err := b.GetOrderedParamSet(ctx, tileNumber)
+ ops, err := b.GetOrderedParamSet(ctx, tileNumber, time.Now())
assert.NoError(t, err)
// Now that the Tile is populated construct an encoded key=value pair to
@@ -111,7 +111,7 @@
err = b.WriteTraces(257, params, values, paramset, "gs://some/test/location", now)
assert.NoError(t, err)
- ops, err := b.GetOrderedParamSet(ctx, tileNumber)
+ ops, err := b.GetOrderedParamSet(ctx, tileNumber, time.Now())
assert.NoError(t, err)
// Pick out an encoded key=value pair that corresponds to a know unencoded
diff --git a/perf/go/tracestore/sqltracestore/sqltracestore.go b/perf/go/tracestore/sqltracestore/sqltracestore.go
index 39452c9..387ec72 100644
--- a/perf/go/tracestore/sqltracestore/sqltracestore.go
+++ b/perf/go/tracestore/sqltracestore/sqltracestore.go
@@ -65,7 +65,7 @@
DISTINCT TraceNames.params
FROM
TraceNames
- INNER JOIN TraceValues2 ON TraceNames.trace_id = TraceValues2.trace_id
+ INNER LOOKUP JOIN TraceValues2 ON TraceNames.trace_id = TraceValues2.trace_id
WHERE
TraceValues2.commit_number >= 0
AND TraceValues2.commit_number < 512;
@@ -80,7 +80,7 @@
TraceValues2.val
FROM
TraceNames
- INNER JOIN TraceValues2 ON TraceValues2.trace_id = TraceNames.trace_id
+ INNER LOOKUP JOIN TraceValues2 ON TraceValues2.trace_id = TraceNames.trace_id
WHERE
TraceNames.params ->> 'arch' IN ('x86')
AND TraceNames.params ->> 'config' IN ('565', '8888')
@@ -102,6 +102,7 @@
"text/template"
"time"
+ lru "github.com/hashicorp/golang-lru"
"github.com/jackc/pgx/v4"
"github.com/jackc/pgx/v4/pgxpool"
"go.skia.org/infra/go/metrics2"
@@ -126,6 +127,15 @@
// specified in the config file.
const defaultCacheSize = 20 * 1000 * 1000
+const orderedParamSetCacheSize = 100
+
+const orderedParamSetCacheTTL = 5 * time.Minute
+
+type orderedParamSetCacheEntry struct {
+ expires time.Time // When this entry expires.
+ orderedParamSet *paramtools.OrderedParamSet
+}
+
// traceIDForSQL is the type of the IDs that are used in the SQL queries,
// they are hex encoded md5 hashes of a trace name, e.g. "\x00112233...".
// Note the \x prefix which tells CockroachDB that this is hex encoded.
@@ -164,6 +174,7 @@
queryTraces
queryTracesIDOnly
readTraces
+ insertIntoTiles
)
var templates = map[statement]string{
@@ -198,7 +209,7 @@
TraceValues2.val
FROM
TraceNames
- INNER JOIN TraceValues2 ON TraceValues2.trace_id = TraceNames.trace_id
+ INNER LOOKUP JOIN TraceValues2 ON TraceValues2.trace_id = TraceNames.trace_id
WHERE
TraceValues2.commit_number >= {{ .BeginCommitNumber }}
AND TraceValues2.commit_number < {{ .EndCommitNumber }}
@@ -216,7 +227,7 @@
TraceNames.params
FROM
TraceNames
- INNER JOIN TraceValues2 ON TraceValues2.trace_id = TraceNames.trace_id
+ INNER LOOKUP JOIN TraceValues2 ON TraceValues2.trace_id = TraceNames.trace_id
WHERE
TraceValues2.commit_number >= {{ .BeginCommitNumber }}
AND TraceValues2.commit_number < {{ .EndCommitNumber }}
@@ -236,7 +247,7 @@
TraceValues2.val
FROM
TraceNames
- INNER JOIN TraceValues2 ON TraceValues2.trace_id = TraceNames.trace_id
+ INNER LOOKUP JOIN TraceValues2 ON TraceValues2.trace_id = TraceNames.trace_id
WHERE
TraceValues2.commit_number >= {{ .BeginCommitNumber }}
AND TraceValues2.commit_number < {{ .EndCommitNumber }}
@@ -257,6 +268,16 @@
WHERE
TraceNames.trace_id = '{{ .MD5HexTraceID }}'
AND TraceValues2.commit_number = {{ .CommitNumber }}`,
+ insertIntoTiles: `
+ INSERT INTO
+ Tiles (tile_number, trace_id)
+ VALUES
+ {{ range $index, $element := . -}}
+ {{ if $index }},{{end}}
+ ( {{ $element.TileNumber }}, '{{ $element.MD5HexTraceID }}' )
+ {{ end }}
+ ON CONFLICT
+ DO NOTHING`,
}
// replaceTraceValuesContext is the context for the replaceTraceValues template.
@@ -306,6 +327,16 @@
MD5HexTraceID traceIDForSQL
}
+// insertIntoTilesContext is the context for the insertIntoTiles template.
+type insertIntoTilesContext struct {
+ TileNumber types.TileNumber
+
+ // The MD5 sum of the trace name as a hex string, i.e.
+ // "\xfe385b159ff55dca481069805e5ff050". Note the leading \x which
+ // CockroachDB will use to know the string is in hex.
+ MD5HexTraceID traceIDForSQL
+}
+
var statements = map[statement]string{
insertIntoSourceFiles: `
INSERT INTO
@@ -332,13 +363,12 @@
1;`,
paramSetForTile: `
SELECT
- DISTINCT TraceNames.params
+ TraceNames.params
FROM
TraceNames
- INNER JOIN TraceValues2 ON TraceNames.trace_id = TraceValues2.trace_id
+ INNER LOOKUP JOIN Tiles ON TraceNames.trace_id = Tiles.trace_id
WHERE
- TraceValues2.commit_number >= $1
- AND TraceValues2.commit_number < $2;`,
+ Tiles.tile_number = $1`,
traceCount: `
SELECT
COUNT(DISTINCT trace_id)
@@ -356,8 +386,17 @@
// unpreparedStatements are parsed templates that can be used to construct SQL statements.
unpreparedStatements map[statement]*template.Template
+ // A cache from md5(trace_name) -> true if the trace_name has already been
+ // written to the TraceNames table.
+ //
+ // And from md5(trace_name)+tile_number -> true if the trace_name has
+ // already been written to the Tiles table.
cache cache.Cache
+ // orderedParamSetCache is a cache for OrderedParamSets that have a TTL. The
+ // cache maps tileNumber -> orderedParamSetCacheEntry.
+ orderedParamSetCache *lru.Cache
+
// tileSize is the number of commits per Tile.
tileSize int32
@@ -385,12 +424,17 @@
if err != nil {
return nil, skerr.Wrap(err)
}
+ paramSetCache, err := lru.New(orderedParamSetCacheSize)
+ if err != nil {
+ return nil, skerr.Wrap(err)
+ }
return &SQLTraceStore{
db: db,
unpreparedStatements: unpreparedStatements,
tileSize: datastoreConfig.TileSize,
cache: cache,
+ orderedParamSetCache: paramSetCache,
writeTracesMetric: metrics2.GetFloat64SummaryMetric("perfserver_sqltracestore_writeTraces"),
writeTracesMetricSQL: metrics2.GetFloat64SummaryMetric("perfserver_sqltracestore_writeTracesSQL"),
buildTracesContextsMetric: metrics2.GetFloat64SummaryMetric("perfserver_sqltracestore_buildTracesContext"),
@@ -421,13 +465,9 @@
}
func (s *SQLTraceStore) paramSetForTile(tileNumber types.TileNumber) (paramtools.ParamSet, error) {
- // Convert the tile number into a range of commits, since we don't store data by
- // tile anymore.
- beginCommit, endCommit := types.TileCommitRangeForTileNumber(tileNumber, s.tileSize)
- rows, err := s.db.Query(context.TODO(), statements[paramSetForTile], beginCommit, endCommit)
+ rows, err := s.db.Query(context.TODO(), statements[paramSetForTile], tileNumber)
if err != nil {
- fmt.Printf("%q %d %d", statements[paramSetForTile], beginCommit, endCommit)
return nil, skerr.Wrapf(err, "Failed querying - tileNumber=%d", tileNumber)
}
ret := paramtools.NewParamSet()
@@ -452,8 +492,22 @@
return ret, nil
}
+// ClearOrderedParamSetCache is only used for tests.
+func (s *SQLTraceStore) ClearOrderedParamSetCache() {
+ s.orderedParamSetCache.Purge()
+}
+
// GetOrderedParamSet implements the tracestore.TraceStore interface.
-func (s *SQLTraceStore) GetOrderedParamSet(ctx context.Context, tileNumber types.TileNumber) (*paramtools.OrderedParamSet, error) {
+func (s *SQLTraceStore) GetOrderedParamSet(ctx context.Context, tileNumber types.TileNumber, now time.Time) (*paramtools.OrderedParamSet, error) {
+
+ iEntry, ok := s.orderedParamSetCache.Get(tileNumber)
+ if ok {
+ entry := iEntry.(orderedParamSetCacheEntry)
+ if entry.expires.After(now) {
+ return entry.orderedParamSet, nil
+ }
+ _ = s.orderedParamSetCache.Remove(tileNumber)
+ }
ps, err := s.paramSetForTile(tileNumber)
if err != nil {
return nil, skerr.Wrap(err)
@@ -461,6 +515,12 @@
ret := paramtools.NewOrderedParamSet()
ret.Update(ps)
sort.Strings(ret.KeyOrder)
+
+ _ = s.orderedParamSetCache.Add(tileNumber, orderedParamSetCacheEntry{
+ expires: now.Add(orderedParamSetCacheTTL),
+ orderedParamSet: ret,
+ })
+
return ret, nil
}
@@ -493,7 +553,7 @@
// QueryTracesByIndex implements the tracestore.TraceStore interface.
func (s *SQLTraceStore) QueryTracesByIndex(ctx context.Context, tileNumber types.TileNumber, q *query.Query) (types.TraceSet, error) {
- ops, err := s.GetOrderedParamSet(ctx, tileNumber)
+ ops, err := s.GetOrderedParamSet(ctx, tileNumber, time.Now())
if err != nil {
return nil, skerr.Wrapf(err, "Failed to get OPS.")
}
@@ -583,7 +643,7 @@
return outParams, skerr.Fmt("Can't run QueryTracesIDOnlyByIndex for the empty query.")
}
- ops, err := s.GetOrderedParamSet(ctx, tileNumber)
+ ops, err := s.GetOrderedParamSet(ctx, tileNumber, time.Now())
if err != nil {
close(outParams)
return outParams, skerr.Wrap(err)
@@ -765,9 +825,16 @@
return ret, nil
}
+func cacheKeyForTraceIDAndTile(traceID traceIDForSQL, tileNumber types.TileNumber) string {
+ return fmt.Sprintf("%s-%d", traceID, tileNumber)
+}
+
// WriteTraces implements the tracestore.TraceStore interface.
func (s *SQLTraceStore) WriteTraces(commitNumber types.CommitNumber, params []paramtools.Params, values []float32, _ paramtools.ParamSet, source string, _ time.Time) error {
defer timer.NewWithSummary("perfserver_sqltracestore_writeTraces", s.writeTracesMetric).Stop()
+
+ tileNumber := s.TileNumber(commitNumber)
+
// Get the row id for the source file.
sourceID, err := s.updateSourceFile(source)
if err != nil {
@@ -778,6 +845,7 @@
// Build the 'context' which will be used to populate the SQL template.
namesTemplateContext := make([]replaceTraceNamesContext, 0, len(params))
valuesTemplateContext := make([]replaceTraceValuesContext, 0, len(params))
+ tilesTemplateContext := make([]insertIntoTilesContext, 0, len(params))
for i, p := range params {
traceName, err := query.MakeKey(p)
@@ -802,6 +870,12 @@
JSONParams: string(jsonParams),
})
}
+ if !s.cache.Exists(cacheKeyForTraceIDAndTile(traceID, tileNumber)) {
+ tilesTemplateContext = append(tilesTemplateContext, insertIntoTilesContext{
+ MD5HexTraceID: traceID,
+ TileNumber: tileNumber,
+ })
+ }
}
t.Stop()
@@ -812,8 +886,7 @@
defer cancel()
if len(namesTemplateContext) > 0 {
-
- err = util.ChunkIter(len(namesTemplateContext), 100, func(startIdx int, endIdx int) error {
+ err := util.ChunkIter(len(namesTemplateContext), 100, func(startIdx int, endIdx int) error {
var b bytes.Buffer
if err := s.unpreparedStatements[replaceTraceNames].Execute(&b, namesTemplateContext[startIdx:endIdx]); err != nil {
return skerr.Wrapf(err, "failed to expand trace names template on slice [%d, %d]", startIdx, endIdx)
@@ -836,6 +909,30 @@
}
}
+ // Record which traces appear in this tile. tilesTemplateContext only holds the
+ // (trace, tile) pairs that were not already present in s.cache.
+ if len(tilesTemplateContext) > 0 {
+ err := util.ChunkIter(len(tilesTemplateContext), 100, func(startIdx int, endIdx int) error {
+ var b bytes.Buffer
+ if err := s.unpreparedStatements[insertIntoTiles].Execute(&b, tilesTemplateContext[startIdx:endIdx]); err != nil {
+ return skerr.Wrapf(err, "failed to expand tiles template on slice [%d, %d]", startIdx, endIdx)
+ }
+ sql := b.String()
+
+ // Report the size of this chunk rather than len(params): only uncached pairs are written here.
+ sklog.Infof("About to write %d tiles with sql of length %d", endIdx-startIdx, len(sql))
+ if _, err := s.db.Exec(ctx, sql); err != nil {
+ return skerr.Wrapf(err, "Executing: %q", sql)
+ }
+ return nil
+ })
+
+ if err != nil {
+ return err
+ }
+
+ // Only mark the pairs as known once every chunk has been written successfully.
+ for _, entry := range tilesTemplateContext {
+ s.cache.Add(cacheKeyForTraceIDAndTile(entry.MD5HexTraceID, tileNumber), "1")
+ }
+ }
+
sklog.Infof("About to format %d trace values", len(params))
err = util.ChunkIter(len(valuesTemplateContext), writeTracesChunkSize, func(startIdx int, endIdx int) error {
@@ -858,7 +955,7 @@
sklog.Info("Finished writing trace values.")
- return err
+ return nil
}
// Confirm that *SQLTraceStore fulfills the tracestore.TraceStore interface.
diff --git a/perf/go/tracestore/sqltracestore/sqltracestore_test.go b/perf/go/tracestore/sqltracestore/sqltracestore_test.go
index c4604ad..db48faa 100644
--- a/perf/go/tracestore/sqltracestore/sqltracestore_test.go
+++ b/perf/go/tracestore/sqltracestore/sqltracestore_test.go
@@ -37,7 +37,7 @@
func commonTestSetup(t *testing.T, populateTraces bool) (context.Context, *SQLTraceStore, sqltest.Cleanup) {
unittest.LargeTest(t)
ctx := context.Background()
- db, cleanup := sqltest.NewCockroachDBForTests(t, fmt.Sprintf("tracestore%d", rand.Int63()), sqltest.ApplyMigrations)
+ db, cleanup := sqltest.NewCockroachDBForTests(t, fmt.Sprintf("tracestore%d", rand.Int63()))
store, err := New(db, cfg)
require.NoError(t, err)
@@ -336,7 +336,10 @@
ctx, s, cleanup := commonTestSetup(t, true)
defer cleanup()
- ops, err := s.GetOrderedParamSet(ctx, 1)
+ tileNumber := types.TileNumber(1)
+ assert.False(t, s.orderedParamSetCache.Contains(tileNumber))
+
+ ops, err := s.GetOrderedParamSet(ctx, tileNumber, time.Now())
assert.NoError(t, err)
expected := paramtools.ParamSet{
"arch": []string{"x86"},
@@ -344,6 +347,51 @@
}
assert.Equal(t, expected, ops.ParamSet)
assert.Equal(t, []string{"arch", "config"}, ops.KeyOrder)
+
+ assert.True(t, s.orderedParamSetCache.Contains(tileNumber))
+}
+
+// TestGetOrderedParamSet_ParamSetCacheIsClearedAfterTTL confirms that a cached
+// OrderedParamSet is returned for a query made at the current time, and that a
+// query made past orderedParamSetCacheTTL picks up newly written traces.
+func TestGetOrderedParamSet_ParamSetCacheIsClearedAfterTTL(t *testing.T) {
+ ctx, s, cleanup := commonTestSetup(t, true)
+ defer cleanup()
+
+ tileNumber := types.TileNumber(0)
+ assert.False(t, s.orderedParamSetCache.Contains(tileNumber))
+
+ ops, err := s.GetOrderedParamSet(ctx, tileNumber, time.Now())
+ assert.NoError(t, err)
+ expected := paramtools.ParamSet{
+ "arch": []string{"x86"},
+ "config": []string{"565", "8888"},
+ }
+ assert.Equal(t, expected, ops.ParamSet)
+ assert.Equal(t, []string{"arch", "config"}, ops.KeyOrder)
+ assert.True(t, s.orderedParamSetCache.Contains(tileNumber))
+
+ // Add new points that will expand the ParamSet.
+ traceNames := []paramtools.Params{
+ {"config": "8888", "arch": "risc-v"},
+ {"config": "565", "arch": "risc-v"},
+ }
+ err = s.WriteTraces(types.CommitNumber(1), traceNames,
+ []float32{1.5, 2.3},
+ paramtools.ParamSet{}, // ParamSet is empty because WriteTraces doesn't use it in this impl.
+ "gs://perf-bucket/2020/02/08/11/testdata.json",
+ time.Time{}) // time is unused in this impl of TraceStore.
+ // Stop here if the write failed; otherwise the assertions below would
+ // report a misleading ParamSet mismatch instead of the real error.
+ require.NoError(t, err)
+
+ // The cached version should be returned.
+ ops, err = s.GetOrderedParamSet(ctx, tileNumber, time.Now())
+ assert.NoError(t, err)
+ assert.Equal(t, expected, ops.ParamSet)
+
+ // But if we query past the TTL we should get an updated OPS.
+ updatedExpected := paramtools.ParamSet{
+ "arch": []string{"risc-v", "x86"},
+ "config": []string{"565", "8888"},
+ }
+ ops, err = s.GetOrderedParamSet(ctx, tileNumber, time.Now().Add(orderedParamSetCacheTTL*2))
+ assert.NoError(t, err)
+ assert.Equal(t, updatedExpected, ops.ParamSet)
}
func TestGetOrderedParamSet_Empty(t *testing.T) {
@@ -351,7 +399,7 @@
defer cleanup()
// Test the empty case where there is no data in datastore.
- ops, err := s.GetOrderedParamSet(ctx, 1)
+ ops, err := s.GetOrderedParamSet(ctx, 1, time.Now())
assert.NoError(t, err)
assert.Equal(t, paramtools.ParamSet{}, ops.ParamSet)
}
diff --git a/perf/go/tracestore/tracestore.go b/perf/go/tracestore/tracestore.go
index 4e082b1..ddc1225 100644
--- a/perf/go/tracestore/tracestore.go
+++ b/perf/go/tracestore/tracestore.go
@@ -25,7 +25,7 @@
GetLatestTile() (types.TileNumber, error)
// GetOrderedParamSet returns the OPS for the given tile.
- GetOrderedParamSet(ctx context.Context, tileNumber types.TileNumber) (*paramtools.OrderedParamSet, error)
+ GetOrderedParamSet(ctx context.Context, tileNumber types.TileNumber, now time.Time) (*paramtools.OrderedParamSet, error)
// GetSource returns the full URL of the file that contained the point at
// 'index' of trace 'traceId'.
diff --git a/perf/migrations/cdb.sql b/perf/migrations/cdb.sql
index abbe312..1a652db 100644
--- a/perf/migrations/cdb.sql
+++ b/perf/migrations/cdb.sql
@@ -86,24 +86,24 @@
params ->> 'arch' IN ('x86', 'arm')
AND params ->> 'config' IN ('8888');
--- ParamSet for two Tiles
+-- ParamSet for a tile.
SELECT
DISTINCT TraceNames.params
FROM
- TraceNames
- INNER JOIN TraceValues2 ON TraceNames.trace_id = TraceValues2.trace_id
+ TraceNames INNER LOOKUP
+ JOIN Tiles ON TraceNames.trace_id = Tiles.trace_id
WHERE
- TraceValues2.commit_number >= 0
- AND TraceValues2.commit_number < 512;
+ Tiles.tile_number = 2
+LIMIT
+ 10;
--- Count the number traces that are in a single tile.
+-- Count traces per tile.
SELECT
- COUNT(DISTINCT trace_id)
+ COUNT(trace_id)
FROM
- TraceValues2
+ Tiles
WHERE
- commit_number > 0
- AND commit_number < 256;
+ tile_number = 3;
-- Most recent commit.
SELECT
@@ -133,7 +133,7 @@
TraceNames
INNER JOIN TraceValues2 ON TraceNames.trace_id = TraceValues2.trace_id
WHERE
- TraceNames.params ->> 'arch' IN ('riscv')
+ TraceNames.params -> 'arch' IN ('"riscv"' :: JSONB)
AND TraceValues2.commit_number >= 256
AND TraceValues2.commit_number < 512;
@@ -148,5 +148,34 @@
WHERE
TraceValues2.commit_number >= 0
AND TraceValues2.commit_number < 255
- AND TraceNames.params ->> 'arch' IN ('x86')
- AND TraceNames.params ->> 'config' IN ('565', '8888');
\ No newline at end of file
+ AND TraceNames.params -> 'arch' IN ('"x86"' :: JSONB)
+ AND TraceNames.params -> 'config' IN ('"565"' :: JSONB, '"8888"' :: JSONB);
+
+-- This is fast with PRIMARY KEY (trace_id, commit_number)
+SELECT
+ tracenames.params,
+ tracevalues2.commit_number,
+ tracevalues2.val
+FROM
+ TraceValues2 INNER LOOKUP
+ JOIN TraceNames ON tracevalues2.trace_id = tracenames.trace_id
+WHERE
+ tracevalues2.commit_number >= 47920
+ AND tracevalues2.commit_number < 49950
+ AND tracevalues2.trace_id IN (
+ SELECT
+ DISTINCT trace_id
+ FROM
+ tracenames
+ WHERE
+ params -> 'name' = '"AndroidCodec_01_original.jpg_SampleSize2"' :: JSONB
+ );
+
+-- Populate the Tiles table from TraceValues2 if ingestion hasn't filled it in.
+-- The tile number is the integer quotient of commit_number by the tile size
+-- (256 here), not the remainder.
+INSERT INTO
+ Tiles (tile_number, trace_id)
+SELECT
+ DISTINCT div(commit_number, 256),
+ trace_id
+FROM
+ tracevalues2 ON CONFLICT DO NOTHING;
\ No newline at end of file
diff --git a/perf/migrations/cockroachdb/0002_create_initial_tables.up.sql b/perf/migrations/cockroachdb/0002_create_initial_tables.up.sql
index 30452d6..46aec6c 100644
--- a/perf/migrations/cockroachdb/0002_create_initial_tables.up.sql
+++ b/perf/migrations/cockroachdb/0002_create_initial_tables.up.sql
@@ -17,5 +17,13 @@
val REAL,
-- Id of the source filename, from SourceFiles.
source_file_id INT,
- PRIMARY KEY (commit_number, trace_id)
+ PRIMARY KEY (trace_id, commit_number)
+);
+
+CREATE TABLE IF NOT EXISTS Tiles (
+ -- Id of the trace name from TraceNames.
+ trace_id BYTES,
+ -- The number of the tile that the trace_id appears in.
+ tile_number INT,
+ PRIMARY KEY (trace_id, tile_number)
);
\ No newline at end of file