blob: 0602ee521c9718c6e28049d91ff6246feb1e3a8b [file] [log] [blame] [edit]
// Package builders builds objects from config.InstanceConfig objects.
//
// These are functions separate from config.InstanceConfig so that we don't end
// up with cyclical import issues.
package builders
import (
"context"
"io/fs"
"sync"
"github.com/jackc/pgx/v4"
"github.com/jackc/pgx/v4/pgxpool"
_ "github.com/jackc/pgx/v4/stdlib" // pgx Go sql
"go.skia.org/infra/go/cache"
"go.skia.org/infra/go/deepequal/assertdeep"
"go.skia.org/infra/go/skerr"
"go.skia.org/infra/go/sklog"
"go.skia.org/infra/go/sql/pool"
"go.skia.org/infra/go/sql/pool/wrapper/timeout"
"go.skia.org/infra/go/sql/schema"
"go.skia.org/infra/perf/go/alerts"
"go.skia.org/infra/perf/go/alerts/sqlalertstore"
"go.skia.org/infra/perf/go/anomalygroup"
ag_store "go.skia.org/infra/perf/go/anomalygroup/sqlanomalygroupstore"
"go.skia.org/infra/perf/go/config"
"go.skia.org/infra/perf/go/culprit"
culprit_store "go.skia.org/infra/perf/go/culprit/sqlculpritstore"
"go.skia.org/infra/perf/go/favorites"
favorite_store "go.skia.org/infra/perf/go/favorites/sqlfavoritestore"
"go.skia.org/infra/perf/go/file"
"go.skia.org/infra/perf/go/file/dirsource"
"go.skia.org/infra/perf/go/file/gcssource"
"go.skia.org/infra/perf/go/filestore/gcs"
localfilestore "go.skia.org/infra/perf/go/filestore/local"
perfgit "go.skia.org/infra/perf/go/git"
"go.skia.org/infra/perf/go/graphsshortcut"
"go.skia.org/infra/perf/go/graphsshortcut/graphsshortcutstore"
"go.skia.org/infra/perf/go/regression"
"go.skia.org/infra/perf/go/regression/sqlregression2store"
"go.skia.org/infra/perf/go/regression/sqlregressionstore"
"go.skia.org/infra/perf/go/shortcut"
"go.skia.org/infra/perf/go/shortcut/sqlshortcutstore"
"go.skia.org/infra/perf/go/sql"
"go.skia.org/infra/perf/go/sql/expectedschema"
"go.skia.org/infra/perf/go/subscription"
subscription_store "go.skia.org/infra/perf/go/subscription/sqlsubscriptionstore"
"go.skia.org/infra/perf/go/tracestore"
"go.skia.org/infra/perf/go/tracestore/sqltracestore"
gcp_redis "cloud.google.com/go/redis/apiv1"
localCache "go.skia.org/infra/go/cache/local"
redisCache "go.skia.org/infra/go/cache/redis"
)
// pgxLogAdaptor forwards log messages produced by pgx to sklog. It is
// installed as the connection config's Logger in NewCockroachDBFromConfig.
type pgxLogAdaptor struct{}

// Log handles a single pgx log message at the given level. data holds optional
// key/value pairs and may be nil.
//
// Only warnings and errors are forwarded, and both are emitted at sklog's
// warning level. Every other level produces no output.
func (pgxLogAdaptor) Log(ctx context.Context, level pgx.LogLevel, msg string, data map[string]interface{}) {
	switch level {
	case pgx.LogLevelWarn, pgx.LogLevelError:
		sklog.Warningf("pgx - %s %v", msg, data)
	default:
		// Trace, debug, info and none are dropped.
	}
}
// maxPoolConnections is the MaxConns our pgxPool will maintain.
//
// It is assigned to the pgxpool config's MaxConns field in
// NewCockroachDBFromConfig before the pool is connected.
//
// TODO(jcgregorio) This is a guess of a good number, once later CLs land I can
// experiment with how this affects performance.
const maxPoolConnections = 300
var (
	// singletonPoolMutex serializes calls to NewCockroachDBFromConfig so that
	// singletonPool is only ever populated once.
	singletonPoolMutex sync.Mutex

	// singletonPool is the one and only pool.Pool an application should have.
	// It is set by NewCockroachDBFromConfig and handed back by later calls.
	singletonPool pool.Pool
)
// NewCockroachDBFromConfig opens an existing CockroachDB database.
//
// The returned pool is a process-wide singleton: the first successful call
// connects using instanceConfig.DataStoreConfig.ConnectionString and caches the
// pool, and every later call returns that cached pool without looking at its
// arguments (including checkSchema).
//
// If checkSchema is true the schema of the live database is compared against
// the expected schema and an error is returned when they differ. A pool that
// fails this check is closed and is NOT cached, so a later call retries
// instead of silently handing back an unverified pool.
//
// No migrations are applied automatically, they must be applied by the
// 'migrate' command line application. See COCKROACHDB.md for more details.
func NewCockroachDBFromConfig(ctx context.Context, instanceConfig *config.InstanceConfig, checkSchema bool) (pool.Pool, error) {
	singletonPoolMutex.Lock()
	defer singletonPoolMutex.Unlock()

	if singletonPool != nil {
		return singletonPool, nil
	}

	cfg, err := pgxpool.ParseConfig(instanceConfig.DataStoreConfig.ConnectionString)
	if err != nil {
		// NOTE(review): the connection string is echoed in this error and may
		// contain credentials - confirm that is acceptable.
		return nil, skerr.Wrapf(err, "Failed to parse database config: %q", instanceConfig.DataStoreConfig.ConnectionString)
	}

	sklog.Infof("%#v", *cfg)
	cfg.MaxConns = maxPoolConnections
	cfg.ConnConfig.Logger = pgxLogAdaptor{}
	rawPool, err := pgxpool.ConnectConfig(ctx, cfg)
	if err != nil {
		return nil, skerr.Wrap(err)
	}

	// Wrap the db pool in a timeout wrapper which checks that every context
	// has a timeout.
	var db pool.Pool = timeout.New(rawPool)

	if checkSchema {
		if err := confirmExpectedSchema(ctx, db); err != nil {
			// Do not leak the connections of a pool we are not going to keep.
			rawPool.Close()
			return nil, err
		}
	}

	// Only publish the pool once it is known to be usable.
	singletonPool = db
	return singletonPool, nil
}

// confirmExpectedSchema returns an error if the schema of the database behind
// db cannot be read or differs from the schema loaded by expectedschema.Load.
func confirmExpectedSchema(ctx context.Context, db pool.Pool) error {
	expectedSchema, err := expectedschema.Load()
	if err != nil {
		return skerr.Wrap(err)
	}
	actual, err := schema.GetDescription(ctx, db, sql.Tables{})
	if err != nil {
		return skerr.Wrap(err)
	}
	if diff := assertdeep.Diff(expectedSchema, *actual); diff != "" {
		return skerr.Fmt("Schema needs to be updated: %s.", diff)
	}
	return nil
}
// NewPerfGitFromConfig returns a new perfgit.Git for the given instanceConfig.
//
// A non-empty connection string and a CockroachDB datastore type are required;
// anything else yields an error. The database is opened with schema checking
// enabled.
//
// The instance created does not poll by default, callers need to call
// StartBackgroundPolling().
func NewPerfGitFromConfig(ctx context.Context, local bool, instanceConfig *config.InstanceConfig) (perfgit.Git, error) {
	if instanceConfig.DataStoreConfig.ConnectionString == "" {
		return nil, skerr.Fmt("A connection_string must always be supplied.")
	}

	if storeType := instanceConfig.DataStoreConfig.DataStoreType; storeType != config.CockroachDBDataStoreType {
		return nil, skerr.Fmt("Unknown datastore_type: %q", storeType)
	}

	// Now create the appropriate db.
	pgPool, err := NewCockroachDBFromConfig(ctx, instanceConfig, true)
	if err != nil {
		return nil, skerr.Wrap(err)
	}

	gitImpl, err := perfgit.New(ctx, local, pgPool, instanceConfig)
	if err != nil {
		return nil, skerr.Wrap(err)
	}
	return gitImpl, nil
}
// NewTraceStoreFromConfig builds a tracestore.TraceStore backed by the
// database described in instanceConfig.
//
// Only the CockroachDB datastore type is supported; any other type results in
// an error. The database is opened with schema checking enabled.
//
// If local is true then we aren't running in production. The flag is not
// consulted by this function.
func NewTraceStoreFromConfig(ctx context.Context, local bool, instanceConfig *config.InstanceConfig) (tracestore.TraceStore, error) {
	if storeType := instanceConfig.DataStoreConfig.DataStoreType; storeType != config.CockroachDBDataStoreType {
		return nil, skerr.Fmt("Unknown datastore type: %q", storeType)
	}

	pgPool, err := NewCockroachDBFromConfig(ctx, instanceConfig, true)
	if err != nil {
		return nil, skerr.Wrap(err)
	}
	return sqltracestore.New(pgPool, instanceConfig.DataStoreConfig)
}
// NewAlertStoreFromConfig builds an alerts.Store backed by the database
// described in instanceConfig.
//
// Only the CockroachDB datastore type is supported; any other type results in
// an error. The local flag is not consulted by this function.
func NewAlertStoreFromConfig(ctx context.Context, local bool, instanceConfig *config.InstanceConfig) (alerts.Store, error) {
	if storeType := instanceConfig.DataStoreConfig.DataStoreType; storeType != config.CockroachDBDataStoreType {
		return nil, skerr.Fmt("Unknown datastore type: %q", storeType)
	}

	pgPool, err := NewCockroachDBFromConfig(ctx, instanceConfig, true)
	if err != nil {
		return nil, skerr.Wrap(err)
	}
	return sqlalertstore.New(pgPool)
}
// NewRegressionStoreFromConfig builds a regression.Store backed by the
// database described in instanceConfig.
//
// When instanceConfig.UseRegression2 is set the sqlregression2store
// implementation is returned and is given alertsConfigProvider; otherwise the
// sqlregressionstore implementation is returned. Only the CockroachDB
// datastore type is supported; any other type results in an error.
//
// If local is true then we aren't running in production. The flag is not
// consulted by this function.
func NewRegressionStoreFromConfig(ctx context.Context, local bool, instanceConfig *config.InstanceConfig, alertsConfigProvider alerts.ConfigProvider) (regression.Store, error) {
	if storeType := instanceConfig.DataStoreConfig.DataStoreType; storeType != config.CockroachDBDataStoreType {
		return nil, skerr.Fmt("Unknown datastore type: %q", storeType)
	}

	pgPool, err := NewCockroachDBFromConfig(ctx, instanceConfig, true)
	if err != nil {
		return nil, skerr.Wrap(err)
	}
	if !instanceConfig.UseRegression2 {
		return sqlregressionstore.New(pgPool)
	}
	return sqlregression2store.New(pgPool, alertsConfigProvider)
}
// NewShortcutStoreFromConfig builds a shortcut.Store backed by the database
// described in instanceConfig.
//
// Only the CockroachDB datastore type is supported; any other type results in
// an error. The local flag is not consulted by this function.
func NewShortcutStoreFromConfig(ctx context.Context, local bool, instanceConfig *config.InstanceConfig) (shortcut.Store, error) {
	if storeType := instanceConfig.DataStoreConfig.DataStoreType; storeType != config.CockroachDBDataStoreType {
		return nil, skerr.Fmt("Unknown datastore type: %q", storeType)
	}

	pgPool, err := NewCockroachDBFromConfig(ctx, instanceConfig, true)
	if err != nil {
		return nil, skerr.Wrap(err)
	}
	return sqlshortcutstore.New(pgPool)
}
// NewGraphsShortcutStoreFromConfig builds a graphsshortcut.Store backed by the
// database described in instanceConfig.
//
// Only the CockroachDB datastore type is supported; any other type results in
// an error. The local flag is not consulted by this function.
func NewGraphsShortcutStoreFromConfig(ctx context.Context, local bool, instanceConfig *config.InstanceConfig) (graphsshortcut.Store, error) {
	if storeType := instanceConfig.DataStoreConfig.DataStoreType; storeType != config.CockroachDBDataStoreType {
		return nil, skerr.Fmt("Unknown datastore type: %q", storeType)
	}

	pgPool, err := NewCockroachDBFromConfig(ctx, instanceConfig, true)
	if err != nil {
		return nil, skerr.Wrap(err)
	}
	return graphsshortcutstore.New(pgPool)
}
// NewSourceFromConfig builds the file.Source selected by the ingestion
// source_type in instanceConfig.
//
// A 'gcs' source is created via gcssource.New. A 'dir' source is created via
// dirsource.New and requires exactly one entry in 'sources'. Any other source
// type results in an error.
//
// If local is true then we aren't running in production.
func NewSourceFromConfig(ctx context.Context, instanceConfig *config.InstanceConfig, local bool) (file.Source, error) {
	sourceType := instanceConfig.IngestionConfig.SourceConfig.SourceType

	if sourceType == config.GCSSourceType {
		return gcssource.New(ctx, instanceConfig, local)
	}
	if sourceType != config.DirSourceType {
		return nil, skerr.Fmt("Unknown source_type: %q", sourceType)
	}

	dirs := instanceConfig.IngestionConfig.SourceConfig.Sources
	if count := len(dirs); count != 1 {
		return nil, skerr.Fmt("For a source_type of 'dir' there must be a single entry for 'sources', found %d.", count)
	}
	return dirsource.New(dirs[0])
}
// NewIngestedFSFromConfig creates a new fs.FS from the InstanceConfig which
// provides access to ingested files.
//
// For a source_type of 'dir' the filesystem is rooted at the first entry of
// 'sources'; an error is returned if 'sources' is empty. Every other source
// type, including 'gcs', is served from Google Cloud Storage.
//
// If local is true then we aren't running in production.
func NewIngestedFSFromConfig(ctx context.Context, cfg *config.InstanceConfig, local bool) (fs.FS, error) {
	if cfg.IngestionConfig.SourceConfig.SourceType == config.DirSourceType {
		sources := cfg.IngestionConfig.SourceConfig.Sources
		// Guard the index below: an empty list would otherwise panic.
		if len(sources) == 0 {
			return nil, skerr.Fmt("For a source_type of 'dir' there must be an entry for 'sources', found none.")
		}
		return localfilestore.New(sources[0])
	}

	// We currently default to Google Cloud Storage, but Config options could be
	// added to use other systems, such as S3.
	return gcs.New(ctx, local)
}
// NewAnomalyGroupStoreFromConfig builds an anomalygroup.Store, which provides
// access to the anomalygroup data, from the database described in
// instanceConfig.
//
// Only the CockroachDB datastore type is supported; any other type results in
// an error.
func NewAnomalyGroupStoreFromConfig(ctx context.Context, instanceConfig *config.InstanceConfig) (anomalygroup.Store, error) {
	if storeType := instanceConfig.DataStoreConfig.DataStoreType; storeType != config.CockroachDBDataStoreType {
		return nil, skerr.Fmt("Unknown datastore type: %q", storeType)
	}

	pgPool, err := NewCockroachDBFromConfig(ctx, instanceConfig, true)
	if err != nil {
		return nil, skerr.Wrap(err)
	}
	return ag_store.New(pgPool)
}
// NewCulpritStoreFromConfig builds a culprit.Store, which provides access to
// the culprit data, from the database described in instanceConfig.
//
// Only the CockroachDB datastore type is supported; any other type results in
// an error.
func NewCulpritStoreFromConfig(ctx context.Context, instanceConfig *config.InstanceConfig) (culprit.Store, error) {
	if storeType := instanceConfig.DataStoreConfig.DataStoreType; storeType != config.CockroachDBDataStoreType {
		return nil, skerr.Fmt("Unknown datastore type: %q", storeType)
	}

	pgPool, err := NewCockroachDBFromConfig(ctx, instanceConfig, true)
	if err != nil {
		return nil, skerr.Wrap(err)
	}
	return culprit_store.New(pgPool)
}
// NewSubscriptionStoreFromConfig builds a subscription.Store, which provides
// access to the subscription data, from the database described in
// instanceConfig.
//
// Only the CockroachDB datastore type is supported; any other type results in
// an error.
func NewSubscriptionStoreFromConfig(ctx context.Context, instanceConfig *config.InstanceConfig) (subscription.Store, error) {
	if storeType := instanceConfig.DataStoreConfig.DataStoreType; storeType != config.CockroachDBDataStoreType {
		return nil, skerr.Fmt("Unknown datastore type: %q", storeType)
	}

	pgPool, err := NewCockroachDBFromConfig(ctx, instanceConfig, true)
	if err != nil {
		return nil, skerr.Wrap(err)
	}
	return subscription_store.New(pgPool)
}
// NewFavoriteStoreFromConfig builds a favorites.Store, which provides access
// to the favorite data, from the database described in instanceConfig.
//
// Only the CockroachDB datastore type is supported; any other type results in
// an error.
func NewFavoriteStoreFromConfig(ctx context.Context, instanceConfig *config.InstanceConfig) (favorites.Store, error) {
	if storeType := instanceConfig.DataStoreConfig.DataStoreType; storeType != config.CockroachDBDataStoreType {
		return nil, skerr.Fmt("Unknown datastore type: %q", storeType)
	}

	pgPool, err := NewCockroachDBFromConfig(ctx, instanceConfig, true)
	if err != nil {
		return nil, skerr.Wrap(err)
	}
	return favorite_store.New(pgPool), nil
}
// GetCacheFromConfig returns a cache.Cache instance based on the given
// configuration.
//
// The implementation is selected by instanceConfig.QueryConfig.CacheConfig.Type:
// a Redis cache is built from a new Google Cloud Redis client and
// QueryConfig.RedisConfig, and a local cache is built via localCache.New(100).
//
// An error is returned, rather than terminating the process, if the cache
// type is not recognized or if the selected cache cannot be constructed.
// Callers must check the returned error.
func GetCacheFromConfig(ctx context.Context, instanceConfig config.InstanceConfig) (cache.Cache, error) {
	switch cacheType := instanceConfig.QueryConfig.CacheConfig.Type; cacheType {
	case config.RedisCache:
		redisConfig := instanceConfig.QueryConfig.RedisConfig
		gcpClient, err := gcp_redis.NewCloudRedisClient(ctx)
		if err != nil {
			return nil, skerr.Wrapf(err, "Cannot create Redis client for Google Cloud")
		}
		redis, err := redisCache.NewRedisCache(ctx, gcpClient, &redisConfig)
		if err != nil {
			return nil, skerr.Wrapf(err, "Error creating new redis cache")
		}
		return redis, nil
	case config.LocalCache:
		local, err := localCache.New(100)
		if err != nil {
			return nil, skerr.Wrapf(err, "Error creating new local cache")
		}
		return local, nil
	default:
		return nil, skerr.Fmt("Invalid cache type %s specified in config", cacheType)
	}
}