blob: c03c9b1451f71d6cf036f17b420fc61f6891c5da [file] [log] [blame]
/*
Package sqltracestore implements a tracestore.TraceStore on top of SQL.
We'll look that the SQL schema used to explain how SQLTraceStore maps
traces into an SQL database.
Each trace name, which is a structured key (See /infra/go/query) of the form
,key1=value1,key2=value2,..., is stored in the TraceIds table so we can use the
much shorter 64 bit trace_id in other tables.
TraceIDs (
trace_id INTEGER PRIMARY KEY,
trace_name TEXT UNIQUE NOT NULL
)
Similarly we store the name of every source file that has been ingested in the
SourceFiles table so we can use the shorter 64 bit source_file_id in other
tables.
SourceFiles (
source_file_id INTEGER PRIMARY KEY,
source_file TEXT UNIQUE NOT NULL
)
We store the values of each trace in the TraceValues table, and use the trace_id
and the commit_number as the primary key. We also store not only the value but
the id of the source file that the value came from.
TraceValues (
trace_id INTEGER,
commit_number INTEGER,
val REAL,
source_file_id INTEGER,
PRIMARY KEY (trace_id, commit_number)
)
Just using this table we can construct some useful queries. For example
we can count the number of traces in a single tile, in this case the
0th tile in a system with a tileSize of 256:
SELECT
COUNT(DISTINCT trace_id)
FROM
TraceValues
WHERE
commit_number >= 0 AND commit_number < 256;
The Postings table is our inverted index for looking up which trace ids
contain which key=value pairs. For a good introduction to postings and search
https://www.tbray.org/ongoing/When/200x/2003/06/18/HowSearchWorks is a good
resource.
Remember that each trace name is a structured key of the form
,arch=x86,config=8888,..., and that over time traces may come and go, i.e. we
may stop running a test, or start running new tests, so if we want to make
searching for traces efficient we need to be aware of how those trace ids change
over time. The answer is to break our store in Tiles, i.e. blocks of commits of
tileSize length, and then for each Tile we keep an inverted index of the trace
ids. This allows us to not only construct fast queries, but to also do things
like build ParamSets, a collection of all the keys and all their values ever
seen for a particular Tile.
In the table below we store a key_value which is the literal "key=value" part of
a trace name, along with the tile_number and the 64 bit trace id. Note that
tile_number is just int(commitNumber/tileSize).
Postings (
tile_number INTEGER,
key_value text NOT NULL,
trace_id INTEGER,
PRIMARY KEY (tile_number, key_value, trace_id)
)
So for example to build a ParamSet from Postings:
SELECT DISTINCT
key_value
FROM
Postings
WHERE
tile_number=0;
To find the most recent tile:
SELECT
tile_number
FROM
Postings
ORDER BY
tile_number DESC LIMIT 1;
And finally, to retrieve all the trace values that
would match a query, we first start with sub-queries for
each of the common keys in the query, which produce the
trace_ids, which are then ANDed across all the distinct
keys in the query. Finally that list is inner joined to the
TraceValues table to load up all the values.
SELECT
TraceIDs.trace_name, TraceValues.commit_number, TraceValues.val
FROM
TraceIDs
INNER JOIN
TraceValues
ON
TraceValues.trace_id = TraceIDs.trace_id
WHERE
TraceValues.trace_id IN (
SELECT trace_id FROM Postings
WHERE key_value IN ("arch=x86", "arch=arm")
AND tile_number=0
)
AND
TraceValues.trace_id IN (
SELECT trace_id FROM Postings
WHERE key_value IN ("config=8888")
AND tile_number=0
);
Look in migrations/test.sql for more example of raw queries using
a simple example dataset.
*/
package sqltracestore
import (
"bytes"
"context"
"crypto/md5"
"database/sql"
"fmt"
"sort"
"strconv"
"strings"
"text/template"
"time"
"go.skia.org/infra/go/metrics2"
"go.skia.org/infra/go/paramtools"
"go.skia.org/infra/go/query"
"go.skia.org/infra/go/skerr"
"go.skia.org/infra/go/sklog"
"go.skia.org/infra/go/timer"
"go.skia.org/infra/go/util"
"go.skia.org/infra/go/vec32"
"go.skia.org/infra/perf/go/cache"
"go.skia.org/infra/perf/go/cache/local"
perfsql "go.skia.org/infra/perf/go/sql"
"go.skia.org/infra/perf/go/tracestore"
"go.skia.org/infra/perf/go/tracestore/sqltracestore/engine"
"go.skia.org/infra/perf/go/types"
)
// cacheSize is the size of the LRU cache.
//
// TODO(jcgregorio) Move to config.InstanceConfig since this should be tweaked
// per instance.
const cacheSize = 20 * 1000 * 1000
// Ingest instances have around 8 cores and many ingested files have ~10K
// values, so pick a batch size that allows roughly one request per core.
const traceValuesInsertBatchSize = 1000
// traceIDFromSQL is the type of the IDs that are used in the SQL database for
// traces.
type traceIDFromSQL int64
const badTraceIDFromSQL traceIDFromSQL = -1
// sourceFileIDFromSQL is the type of the IDs that are used in the SQL database
// for source files.
type sourceFileIDFromSQL int64
const badSourceFileIDFromSQL sourceFileIDFromSQL = -1
// statement is an SQL statement or fragment of an SQL statement.
type statement int
// All the different statements we need.
const (
insertIntoSourceFiles statement = iota
getSourceFileID
insertIntoTraceIDs
getTraceID
insertIntoPostings
replaceTraceValues
countIndices
getLatestTile
paramSetForTile
getSource
traceCount
)
type statements map[statement]string
type templates map[statement]string
var templatesByDialect = map[perfsql.Dialect]templates{
perfsql.CockroachDBDialect: {
insertIntoPostings: `
UPSERT INTO
Postings (tile_number, key_value, trace_id)
VALUES
{{ range $index, $element := . -}}
{{ if $index }},{{end}}({{ $element.TileNumber }}, '{{ $element.KeyValue }}', {{ $element.TraceID }})
{{ end }}`,
replaceTraceValues: `
UPSERT INTO
TraceValues (trace_id, commit_number, val, source_file_id)
VALUES
{{ range $index, $element := . -}}
{{ if $index }},{{end}}({{ $element.TraceID }}, {{ $element.CommitNumber }}, {{ $element.Val }}, {{ $element.SourceFileID }})
{{ end }}`,
},
}
// insertIntoPostingsValue represents the values used in the insertIntoPostings template.
type insertIntoPostingsValue struct {
TileNumber types.TileNumber
KeyValue string
TraceID traceIDFromSQL
}
// replaceTraceValue is used in the replaceTraceValues template.
type replaceTraceValue struct {
TraceID traceIDFromSQL
CommitNumber types.CommitNumber
Val float32
SourceFileID sourceFileIDFromSQL
}
var statementsByDialect = map[perfsql.Dialect]statements{
perfsql.CockroachDBDialect: {
insertIntoSourceFiles: `
INSERT INTO
SourceFiles (source_file)
VALUES
($1)
ON CONFLICT
DO NOTHING`,
getSourceFileID: `
SELECT
source_file_id
FROM
SourceFiles
WHERE
source_file=$1`,
insertIntoTraceIDs: `
INSERT INTO
TraceIDs (trace_name)
VALUES
($1)
ON CONFLICT
DO NOTHING`,
getTraceID: `
SELECT
trace_id
FROM
TraceIDs
WHERE
trace_name=$1`,
insertIntoPostings: `
INSERT INTO
Postings (tile_number, key_value, trace_id)
VALUES
($1, $2, $3)
ON CONFLICT
DO NOTHING`,
countIndices: `
SELECT
COUNT(*)
FROM
Postings
WHERE
tile_number=$1`,
getLatestTile: `
SELECT
tile_number
FROM
Postings
ORDER BY
tile_number DESC
LIMIT 1`,
paramSetForTile: `
SELECT DISTINCT
key_value
FROM
Postings
WHERE
tile_number=$1`,
getSource: `
SELECT
SourceFiles.source_file
FROM
TraceIDs
INNER JOIN
TraceValues ON TraceValues.trace_id = TraceIDs.trace_id
INNER JOIN
SourceFiles ON SourceFiles.source_file_id = TraceValues.source_file_id
WHERE
TraceIDs.trace_name=$1 AND TraceValues.commit_number=$2`,
traceCount: `
SELECT
COUNT(DISTINCT trace_id)
FROM
TraceValues
WHERE
commit_number >= $1 AND commit_number <= $2`,
},
}
// SQLTraceStore implements tracestore.TraceStore backed onto an SQL database.
type SQLTraceStore struct {
// db is the SQL database instance.
db *sql.DB
// preparedStatements are all the prepared SQL statements.
preparedStatements map[statement]*sql.Stmt
// unpreparedStatements are parsed templates that can be used to construct SQL statements.
unpreparedStatements map[statement]*template.Template
// cache is an LRU cache used to store the ids (traceIDFromSQL) of hashed trace names (string):
// "md5(traceName)" -> traceIDFromSQL
// and if a traces postings have been written for a tile (bool).
// "traceID-tileNumber" -> bool
//
// For traceNames the md5 sum is used to keep the cache keys shorter, thus preserving
// memory and also allowing the use of memcached which restricts keys to 250 chars.
cache cache.Cache
// tileSize is the number of commits per Tile.
tileSize int32
// metrics
writeTracesMetric metrics2.Float64SummaryMetric
updateTraceValuesMetric metrics2.Float64SummaryMetric
buildTracesContextMetric metrics2.Float64SummaryMetric
writeTraceIDAndPostingsMetric metrics2.Float64SummaryMetric
writeTraceIDAndPostingsCacheHitMetric metrics2.Counter
writeTraceIDAndPostingsCacheMissMetric metrics2.Counter
writeTraceIDAndPostingsPostingsCacheHitMetric metrics2.Counter
writeTraceIDAndPostingsPostingsCacheMissMetric metrics2.Counter
}
// New returns a new *SQLTraceStore.
//
// We presume all migrations have been run against db before this function is
// called.
func New(db *sql.DB, dialect perfsql.Dialect, tileSize int32) (*SQLTraceStore, error) {
preparedStatements := map[statement]*sql.Stmt{}
for key, statement := range statementsByDialect[dialect] {
prepared, err := db.Prepare(statement)
if err != nil {
return nil, skerr.Wrapf(err, "preparing statement %v, %q", key, statement)
}
preparedStatements[key] = prepared
}
unpreparedStatements := map[statement]*template.Template{}
for key, tmpl := range templatesByDialect[dialect] {
t, err := template.New("").Parse(tmpl)
if err != nil {
return nil, skerr.Wrapf(err, "parsing template %v, %q", key, tmpl)
}
unpreparedStatements[key] = t
}
cache, err := local.New(cacheSize)
if err != nil {
return nil, skerr.Wrap(err)
}
return &SQLTraceStore{
db: db,
preparedStatements: preparedStatements,
unpreparedStatements: unpreparedStatements,
cache: cache,
tileSize: tileSize,
writeTracesMetric: metrics2.GetFloat64SummaryMetric("perfserver_sqltracestore_writeTraces"),
updateTraceValuesMetric: metrics2.GetFloat64SummaryMetric("perfserver_sqltracestore_updateTraceValues"),
buildTracesContextMetric: metrics2.GetFloat64SummaryMetric("perfserver_sqltracestore_buildTracesContext"),
writeTraceIDAndPostingsMetric: metrics2.GetFloat64SummaryMetric("perfserver_sqltracestore_writeTraceIDAndPostings"),
writeTraceIDAndPostingsCacheHitMetric: metrics2.GetCounter("perfserver_sqltracestore_writeTraceIDAndPostings_cache_hit"),
writeTraceIDAndPostingsCacheMissMetric: metrics2.GetCounter("perfserver_sqltracestore_writeTraceIDAndPostings_cache_miss"),
writeTraceIDAndPostingsPostingsCacheHitMetric: metrics2.GetCounter("perfserver_sqltracestore_writeTraceIDAndPostings_postings_cache_hit"),
writeTraceIDAndPostingsPostingsCacheMissMetric: metrics2.GetCounter("perfserver_sqltracestore_writeTraceIDAndPostings_postings_cache_miss"),
}, nil
}
// CommitNumberOfTileStart implements the tracestore.TraceStore interface.
func (s *SQLTraceStore) CommitNumberOfTileStart(commitNumber types.CommitNumber) types.CommitNumber {
tileNumber := types.TileNumberFromCommitNumber(commitNumber, s.tileSize)
beginCommit, _ := types.TileCommitRangeForTileNumber(tileNumber, s.tileSize)
return beginCommit
}
// CountIndices implements the tracestore.TraceStore interface.
func (s *SQLTraceStore) CountIndices(ctx context.Context, tileNumber types.TileNumber) (int64, error) {
var ret int64
if err := s.preparedStatements[countIndices].QueryRowContext(ctx, tileNumber).Scan(&ret); err != nil {
return 0, skerr.Wrap(err)
}
return ret, nil
}
// GetLatestTile implements the tracestore.TraceStore interface.
func (s *SQLTraceStore) GetLatestTile() (types.TileNumber, error) {
tileNumber := types.BadTileNumber
if err := s.preparedStatements[getLatestTile].QueryRowContext(context.TODO()).Scan(&tileNumber); err != nil {
return tileNumber, skerr.Wrap(err)
}
return tileNumber, nil
}
func (s *SQLTraceStore) paramSetForTile(tileNumber types.TileNumber) (paramtools.ParamSet, error) {
rows, err := s.preparedStatements[paramSetForTile].QueryContext(context.TODO(), tileNumber)
if err != nil {
return nil, skerr.Wrapf(err, "tileNumer=%d", tileNumber)
}
ret := paramtools.NewParamSet()
for rows.Next() {
var keyValue string
if err := rows.Scan(&keyValue); err != nil {
return nil, skerr.Wrapf(err, "tileNumer=%d", tileNumber)
}
parts := strings.Split(keyValue, "=")
if len(parts) != 2 {
return nil, skerr.Fmt("Invalid key=value form: %q", keyValue)
}
ret.AddParams(paramtools.Params{parts[0]: parts[1]})
}
if err := rows.Err(); err != nil {
return nil, skerr.Wrapf(err, "tileNumer=%d", tileNumber)
}
ret.Normalize()
return ret, nil
}
// GetOrderedParamSet implements the tracestore.TraceStore interface.
func (s *SQLTraceStore) GetOrderedParamSet(ctx context.Context, tileNumber types.TileNumber) (*paramtools.OrderedParamSet, error) {
ps, err := s.paramSetForTile(tileNumber)
if err != nil {
return nil, skerr.Wrap(err)
}
ret := paramtools.NewOrderedParamSet()
ret.Update(ps)
sort.Strings(ret.KeyOrder)
return ret, nil
}
// GetSource implements the tracestore.TraceStore interface.
func (s *SQLTraceStore) GetSource(ctx context.Context, commitNumber types.CommitNumber, traceName string) (string, error) {
var filename string
if err := s.preparedStatements[getSource].QueryRowContext(ctx, traceName, commitNumber).Scan(&filename); err != nil {
return "", skerr.Wrapf(err, "commitNumber=%d traceName=%q", commitNumber, traceName)
}
return filename, nil
}
// OffsetFromCommitNumber implements the tracestore.TraceStore interface.
func (s *SQLTraceStore) OffsetFromCommitNumber(commitNumber types.CommitNumber) int32 {
return int32(commitNumber) % s.tileSize
}
// QueryTracesByIndex implements the tracestore.TraceStore interface.
func (s *SQLTraceStore) QueryTracesByIndex(ctx context.Context, tileNumber types.TileNumber, q *query.Query) (types.TraceSet, error) {
ops, err := s.GetOrderedParamSet(ctx, tileNumber)
if err != nil {
return nil, skerr.Wrapf(err, "Failed to get OPS.")
}
plan, err := q.QueryPlan(ops)
if err != nil {
// Not an error, we just won't match anything in this tile.
//
// The plan may be invalid because it is querying with keys or values
// that don't appear in a tile, which means they query won't work on
// this tile, but it may still work on other tiles, so we just don't
// return any results for this tile.
return nil, nil
}
if len(plan) == 0 {
// We won't match anything in this tile.
return nil, nil
}
// Sanitize our inputs.
if err := query.ValidateParamSet(plan); err != nil {
return nil, skerr.Wrapf(err, "invalid query %#v", *q)
}
tileNumberString := strconv.FormatInt(int64(tileNumber), 10)
beginCommit, endCommit := types.TileCommitRangeForTileNumber(tileNumber, s.tileSize)
stmt := fmt.Sprintf(`
SELECT
TraceIDs.trace_name, TraceValues.commit_number, TraceValues.val
FROM
TraceIDs
INNER JOIN
TraceValues
ON
TraceValues.trace_id = TraceIDs.trace_id
WHERE
TraceValues.commit_number >= %d
AND TraceValues.commit_number <= %d
AND `, beginCommit, endCommit)
// TODO(jcgregorio) Break out into own function.
whereClauses := make([]string, len(plan))
clauseIndex := 0
for key, values := range plan {
inClause := make([]string, len(values))
for i, value := range values {
inClause[i] = fmt.Sprintf("'%s=%s'", key, value)
}
whereClauses[clauseIndex] = `
TraceValues.trace_id IN (
SELECT trace_id FROM Postings WHERE key_value IN (` +
strings.Join(inClause, ",") +
`) AND tile_number=` + tileNumberString + `
)`
clauseIndex++
}
fullStatement := stmt + strings.Join(whereClauses, " AND ")
rows, err := s.db.QueryContext(context.TODO(), fullStatement)
if err != nil {
sklog.Debugf("QueryTracesByIndex: fullStatement: %q", fullStatement)
return nil, skerr.Wrap(err)
}
ret := types.TraceSet{}
for rows.Next() {
var traceName string
var commitNumber types.CommitNumber
var val float64
if err := rows.Scan(&traceName, &commitNumber, &val); err != nil {
return nil, skerr.Wrap(err)
}
if _, ok := ret[traceName]; ok {
ret[traceName][s.OffsetFromCommitNumber(commitNumber)] = float32(val)
} else {
// TODO(jcgregorio) Replace this vec32.New() with a
// https://golang.org/pkg/sync/#Pool since this is our most used/reused
// type of memory.
ret[traceName] = vec32.New(int(s.tileSize))
ret[traceName][s.OffsetFromCommitNumber(commitNumber)] = float32(val)
}
}
if err := rows.Err(); err != nil {
return nil, skerr.Wrap(err)
}
return ret, nil
}
// QueryTracesIDOnlyByIndex implements the tracestore.TraceStore interface.
func (s *SQLTraceStore) QueryTracesIDOnlyByIndex(ctx context.Context, tileNumber types.TileNumber, q *query.Query) (<-chan paramtools.Params, error) {
outParams := make(chan paramtools.Params, engine.QueryEngineChannelSize)
if q.Empty() {
close(outParams)
return outParams, skerr.Fmt("Can't run QueryTracesIDOnlyByIndex for the empty query.")
}
ops, err := s.GetOrderedParamSet(ctx, tileNumber)
if err != nil {
close(outParams)
return outParams, skerr.Wrap(err)
}
plan, err := q.QueryPlan(ops)
if err != nil {
// Not an error, we just won't match anything in this tile.
//
// The plan may be invalid because it is querying with keys or values
// that don't appear in a tile, which means they query won't work on
// this tile, but it may still work on other tiles, so we just don't
// return any results for this tile.
close(outParams)
return outParams, nil
}
if len(plan) == 0 {
// We won't match anything in this tile.
close(outParams)
return outParams, nil
}
// Sanitize our inputs.
if err := query.ValidateParamSet(plan); err != nil {
close(outParams)
return outParams, skerr.Wrapf(err, "invalid query %#v", *q)
}
tileNumberString := strconv.FormatInt(int64(tileNumber), 10)
beginCommit, endCommit := types.TileCommitRangeForTileNumber(tileNumber, s.tileSize)
stmt := fmt.Sprintf(`
SELECT DISTINCT
TraceIDs.trace_name
FROM
TraceIDs
INNER JOIN
TraceValues
ON
TraceValues.trace_id = TraceIDs.trace_id
WHERE
TraceValues.commit_number >= %d
AND TraceValues.commit_number <= %d
AND `, beginCommit, endCommit)
// TODO(jcgregorio) Break out into own function.
// TODO(jcgregorio) Convert to use text/templates.
whereClauses := make([]string, len(plan))
clauseIndex := 0
for key, values := range plan {
inClause := make([]string, len(values))
for i, value := range values {
inClause[i] = fmt.Sprintf("'%s=%s'", key, value)
}
whereClauses[clauseIndex] = `
TraceValues.trace_id IN (
SELECT trace_id FROM Postings WHERE key_value IN (` +
strings.Join(inClause, ",") +
`) AND tile_number=` + tileNumberString + `
)`
clauseIndex++
}
fullStatement := stmt + strings.Join(whereClauses, " AND ")
rows, err := s.db.QueryContext(context.TODO(), fullStatement)
if err != nil {
sklog.Debugf("QueryTracesIDOnlyByIndex: fullStatement: %q", fullStatement)
close(outParams)
return outParams, skerr.Wrap(err)
}
go func() {
defer close(outParams)
for rows.Next() {
var traceName string
if err := rows.Scan(&traceName); err != nil {
sklog.Errorf("Failed to scan traceName: %s", skerr.Wrap(err))
return
}
p, err := query.ParseKey(traceName)
if err != nil {
sklog.Errorf("Failed to parse traceName: %s", skerr.Wrap(err))
return
}
outParams <- p
}
if err := rows.Err(); err != nil {
sklog.Errorf("Failed while reading traceNames: %s", skerr.Wrap(err))
return
}
}()
return outParams, nil
}
// ReadTraces implements the tracestore.TraceStore interface.
func (s *SQLTraceStore) ReadTraces(tileNumber types.TileNumber, keys []string) (types.TraceSet, error) {
// TODO(jcgregorio) Should be broken into batches so we don't exceed the SQL
// engine limit on query sizes.
ret := types.TraceSet{}
for _, key := range keys {
if !query.ValidateKey(key) {
return nil, skerr.Fmt("Invalid key stored in shortcut: %q", key)
}
// TODO(jcgregorio) Replace this vec32.New() with a
// https://golang.org/pkg/sync/#Pool since this is our most used/reused
// type of memory.
ret[key] = vec32.New(int(s.tileSize))
}
// Get the traceIDs for the given keys.
beginCommit, endCommit := types.TileCommitRangeForTileNumber(tileNumber, s.tileSize)
stmt := fmt.Sprintf(`
SELECT
TraceIDs.trace_name, TraceValues.commit_number, TraceValues.val
FROM
TraceIDs
INNER JOIN
TraceValues ON TraceValues.trace_id = TraceIDs.trace_id
WHERE
TraceValues.commit_number >= %d
AND TraceValues.commit_number <= %d
AND TraceIDs.trace_name IN (`, beginCommit, endCommit)
inClause := make([]string, len(keys))
for i, key := range keys {
singleQuoted := "'" + key + "'"
inClause[i] = singleQuoted
}
fullStatement := stmt + strings.Join(inClause, ",") + ")"
rows, err := s.db.QueryContext(context.TODO(), fullStatement)
if err != nil {
sklog.Debugf("ReadTraces: fullStatement: %q", fullStatement)
return nil, skerr.Wrap(err)
}
for rows.Next() {
var traceName string
var commitNumber types.CommitNumber
var val float64
if err := rows.Scan(&traceName, &commitNumber, &val); err != nil {
return nil, skerr.Wrap(err)
}
ret[traceName][s.OffsetFromCommitNumber(commitNumber)] = float32(val)
}
if err := rows.Err(); err != nil {
return nil, skerr.Wrapf(err, "tileNumber=%d", tileNumber)
}
return ret, nil
}
// TileNumber implements the tracestore.TraceStore interface.
func (s *SQLTraceStore) TileNumber(commitNumber types.CommitNumber) types.TileNumber {
return types.TileNumberFromCommitNumber(commitNumber, s.tileSize)
}
// TileSize implements the tracestore.TraceStore interface.
func (s *SQLTraceStore) TileSize() int32 {
return s.tileSize
}
// TraceCount implements the tracestore.TraceStore interface.
func (s *SQLTraceStore) TraceCount(ctx context.Context, tileNumber types.TileNumber) (int64, error) {
var ret int64
beginCommit, endCommit := types.TileCommitRangeForTileNumber(tileNumber, s.tileSize)
err := s.preparedStatements[traceCount].QueryRowContext(context.TODO(), beginCommit, endCommit).Scan(&ret)
return ret, skerr.Wrap(err)
}
// WriteIndices implements the tracestore.TraceStore interface.
func (s *SQLTraceStore) WriteIndices(ctx context.Context, tileNumber types.TileNumber) error {
// TODO(jcgregorio) This func should be removed from the interface since it only applied to BigTableTraceStore.
return nil
}
// updateSourceFile writes the filename into the SourceFiles table and returns
// the sourceFileIDFromSQL of that filename.
func (s *SQLTraceStore) updateSourceFile(filename string) (sourceFileIDFromSQL, error) {
ret := badSourceFileIDFromSQL
_, err := s.preparedStatements[insertIntoSourceFiles].ExecContext(context.TODO(), filename)
if err != nil {
return ret, skerr.Wrap(err)
}
err = s.preparedStatements[getSourceFileID].QueryRowContext(context.TODO(), filename).Scan(&ret)
if err != nil {
return ret, skerr.Wrap(err)
}
return ret, nil
}
// updatePostings writes all the entries into our inverted index in Postings for
// the given traceID and tileNumber. The Params given are from the parse trace
// name.
func (s *SQLTraceStore) updatePostings(p paramtools.Params, tileNumber types.TileNumber, traceID traceIDFromSQL) error {
// Clean the data to avoid SQL injection attacks.
p = query.ForceValid(p)
// Prepare the data for the SQL template.
values := make([]insertIntoPostingsValue, 0, len(p))
for k, v := range p {
keyValue := fmt.Sprintf("%s=%s", k, v)
values = append(values, insertIntoPostingsValue{
TileNumber: tileNumber,
KeyValue: keyValue,
TraceID: traceID,
})
}
// Expand the template.
var b bytes.Buffer
if err := s.unpreparedStatements[insertIntoPostings].Execute(&b, values); err != nil {
return skerr.Wrapf(err, "Failed to expand insertIntoPostings template with: %v", values)
}
_, err := s.db.ExecContext(context.TODO(), b.String())
if err != nil {
return skerr.Wrap(err)
}
return nil
}
func getPostingsCacheEntryKey(traceID traceIDFromSQL, tileNumber types.TileNumber) string {
return fmt.Sprintf("%d-%d", traceID, tileNumber)
}
func getHashedTraceName(traceName string) string {
return fmt.Sprintf("%x", md5.Sum([]byte(traceName)))
}
// writeTraceIDAndPostings writes the trace name into the TraceIDs table and
// returns the traceIDFromSQL of that trace name. This operation will happen
// repeatedly as data is ingested so we cache the results in the LRU cache.
func (s *SQLTraceStore) writeTraceIDAndPostings(traceNameAsParams paramtools.Params, tileNumber types.TileNumber) (traceIDFromSQL, error) {
defer timer.NewWithSummary("perfserver_sqltracestore_writeTraceIDAndPostings", s.writeTraceIDAndPostingsMetric).Stop()
traceID := badTraceIDFromSQL
traceName, err := query.MakeKey(traceNameAsParams)
if err != nil {
return traceID, skerr.Wrap(err)
}
hashedTraceName := getHashedTraceName(traceName)
// Get a traceIDFromSQL for the traceName.
if iret, ok := s.cache.Get(hashedTraceName); ok {
traceID = iret.(traceIDFromSQL)
s.writeTraceIDAndPostingsCacheHitMetric.Inc(1)
} else {
s.writeTraceIDAndPostingsCacheMissMetric.Inc(1)
_, err := s.preparedStatements[insertIntoTraceIDs].ExecContext(context.TODO(), traceName)
if err != nil {
return traceID, skerr.Wrapf(err, "traceName=%q", traceName)
}
err = s.preparedStatements[getTraceID].QueryRowContext(context.TODO(), traceName).Scan(&traceID)
if err != nil {
return traceID, skerr.Wrapf(err, "traceName=%q", traceName)
}
s.cache.Add(hashedTraceName, traceID)
}
postingsCacheEntryKey := getPostingsCacheEntryKey(traceID, tileNumber)
if !s.cache.Exists(postingsCacheEntryKey) {
s.writeTraceIDAndPostingsPostingsCacheMissMetric.Inc(1)
// TODO(jcgregorio) Do a query on the database to see if the postings are there.
// Update postings.
if err := s.updatePostings(traceNameAsParams, tileNumber, traceID); err != nil {
return traceID, skerr.Wrapf(err, "traceName=%q tileNumber=%d traceID=%d", traceName, tileNumber, traceID)
}
s.cache.Add(postingsCacheEntryKey, true)
} else {
s.writeTraceIDAndPostingsPostingsCacheHitMetric.Inc(1)
}
return traceID, nil
}
// updateTraceValues writes the given slice of replaceTraceValues into the store.
func (s *SQLTraceStore) updateTraceValues(templateContext []replaceTraceValue) error {
defer timer.NewWithSummary("perfserver_sqltracestore_updateTraceValues", s.updateTraceValuesMetric).Stop()
var b bytes.Buffer
if err := s.unpreparedStatements[replaceTraceValues].Execute(&b, templateContext); err != nil {
return skerr.Wrapf(err, "failed to expand template")
}
_, err := s.db.ExecContext(context.TODO(), b.String())
return skerr.Wrap(err)
}
// WriteTraces implements the tracestore.TraceStore interface.
func (s *SQLTraceStore) WriteTraces(commitNumber types.CommitNumber, params []paramtools.Params, values []float32, _ paramtools.ParamSet, source string, _ time.Time) error {
defer timer.NewWithSummary("perfserver_sqltracestore_writeTraces", s.writeTracesMetric).Stop()
tileNumber := types.TileNumberFromCommitNumber(commitNumber, s.tileSize)
// Get the row id for the source file.
sourceID, err := s.updateSourceFile(source)
if err != nil {
return skerr.Wrap(err)
}
t := timer.NewWithSummary("perfserver_sqltracestore_buildTracesContext", s.buildTracesContextMetric)
// Build the 'context' which will be used to populate the SQL template.
templateContext := make([]replaceTraceValue, 0, len(params))
for i, p := range params {
traceID, err := s.writeTraceIDAndPostings(p, tileNumber)
if err != nil {
t.Stop()
return skerr.Wrap(err)
}
templateContext = append(templateContext, replaceTraceValue{
TraceID: traceID,
CommitNumber: commitNumber,
Val: values[i],
SourceFileID: sourceID,
})
}
t.Stop()
err = util.ChunkIterParallel(context.TODO(), len(templateContext), traceValuesInsertBatchSize, func(ctx context.Context, startIdx int, endIdx int) error {
if err := s.updateTraceValues(templateContext[startIdx:endIdx]); err != nil {
return skerr.Wrapf(err, "failed inserting subSlice: [%d:%d]", startIdx, endIdx)
}
return nil
})
return err
}
// Confirm that *SQLTraceStore fulfills the tracestore.TraceStore interface.
var _ tracestore.TraceStore = (*SQLTraceStore)(nil)