blob: cfdd491c4377741c3103d6c1c17a1e3ec0720837 [file] [log] [blame]
/*
Package sqltracestore implements a tracestore.TraceStore on top of SQL.
We'll look that the SQL schema used to explain how SQLTraceStore maps
traces into an SQL database.
Each trace name, which is a structured key (See /infra/go/query) of the form
,key1=value1,key2=value2,..., is stored in the TraceIds table so we can use the
much shorter 64 bit trace_id in other tables.
TraceIDs (
trace_id INTEGER PRIMARY KEY,
trace_name TEXT UNIQUE NOT NULL
)
Similarly we store the name of every source file that has been ingested in the
SourceFiles table so we can use the shorter 64 bit source_file_id in other
tables.
SourceFiles (
source_file_id INTEGER PRIMARY KEY,
source_file TEXT UNIQUE NOT NULL
)
We store the values of each trace in the TraceValues table, and use the trace_id
and the commit_number as the primary key. We also store not only the value but
the id of the source file that the value came from.
TraceValues (
trace_id INTEGER,
commit_number INTEGER,
val REAL,
source_file_id INTEGER,
PRIMARY KEY (trace_id, commit_number)
)
Just using this table we can construct some useful queries. For example
we can count the number of traces in a single tile, in this case the
0th tile in a system with a tileSize of 256:
SELECT
COUNT(DISTINCT trace_id)
FROM
TraceValues
WHERE
commit_number >= 0 AND commit_number < 256;
The Postings table is our inverted index for looking up which trace ids
contain which key=value pairs. For a good introduction to postings and search
https://www.tbray.org/ongoing/When/200x/2003/06/18/HowSearchWorks is a good
resource.
Remember that each trace name is a structured key of the form
,arch=x86,config=8888,..., and that over time traces may come and go, i.e. we
may stop running a test, or start running new tests, so if we want to make
searching for traces efficient we need to be aware of how those trace ids change
over time. The answer is to break our store in Tiles, i.e. blocks of commits of
tileSize length, and then for each Tile we keep an inverted index of the trace
ids. This allows us to not only construct fast queries, but to also do things
like build ParamSets, a collection of all the keys and all their values ever
seen for a particular Tile.
In the table below we store a key_value which is the literal "key=value" part of
a trace name, along with the tile_number and the 64 bit trace id. Note that
tile_number is just int(commitNumber/tileSize).
Postings (
tile_number INTEGER,
key_value text NOT NULL,
trace_id INTEGER,
PRIMARY KEY (tile_number, key_value, trace_id)
)
So for example to build a ParamSet from Postings:
SELECT DISTINCT
key_value
FROM
Postings
WHERE
tile_number=0;
To find the most recent tile:
SELECT
tile_number
FROM
Postings
ORDER BY
tile_number DESC LIMIT 1;
And finally, to retrieve all the trace values that
would match a query, we first start with sub-queries for
each of the common keys in the query, which produce the
trace_ids, which are then ANDed across all the distinct
keys in the query. Finally that list is inner joined to the
TraceValues table to load up all the values.
SELECT
TraceIDs.trace_name, TraceValues.commit_number, TraceValues.val
FROM
TraceIDs
INNER JOIN
TraceValues
ON
TraceValues.trace_id = TraceIDs.trace_id
WHERE
TraceValues.trace_id IN (
SELECT trace_id FROM Postings
WHERE key_value IN ("arch=x86", "arch=arm")
AND tile_number=0
)
AND
TraceValues.trace_id IN (
SELECT trace_id FROM Postings
WHERE key_value IN ("config=8888")
AND tile_number=0
);
Look in migrations/test.sql for more example of raw queries using
a simple example dataset.
*/
package sqltracestore
import (
"bytes"
"context"
"database/sql"
"fmt"
"sort"
"strconv"
"strings"
"text/template"
"time"
lru "github.com/hashicorp/golang-lru"
"go.skia.org/infra/go/paramtools"
"go.skia.org/infra/go/query"
"go.skia.org/infra/go/skerr"
"go.skia.org/infra/go/sklog"
"go.skia.org/infra/go/vec32"
perfsql "go.skia.org/infra/perf/go/sql"
"go.skia.org/infra/perf/go/tracestore"
"go.skia.org/infra/perf/go/tracestore/sqltracestore/engine"
"go.skia.org/infra/perf/go/types"
)
// cacheSize is the size of the LRU cache.
//
// TODO(jcgregorio) Move to config.InstanceConfig since this should be tweaked
// per instance.
const cacheSize = 10 * 1000 * 1000
// statement is an SQL statement or fragment of an SQL statement.
type statement int
// All the different statements we need.
const (
insertIntoSourceFiles statement = iota
getSourceFileID
insertIntoTraceIDs
getTraceID
insertIntoPostings
replaceTraceValues
countIndices
getLatestTile
paramSetForTile
getSource
traceCount
)
type statements map[statement]string
type templates map[statement]string
var templatesByDialect = map[perfsql.Dialect]templates{
perfsql.CockroachDBDialect: {
insertIntoPostings: `
UPSERT INTO
Postings (tile_number, key_value, trace_id)
VALUES
{{ range $index, $element := . -}}
{{ if $index }},{{end}}({{ $element.TileNumber }}, '{{ $element.KeyValue }}', {{ $element.TraceID }})
{{ end }}`,
},
}
// insertIntoPostingsValue represents the values used in the insertIntoPostings template.
type insertIntoPostingsValue struct {
TileNumber types.TileNumber
KeyValue string
TraceID int64
}
var statementsByDialect = map[perfsql.Dialect]statements{
perfsql.CockroachDBDialect: {
insertIntoSourceFiles: `
INSERT INTO
SourceFiles (source_file)
VALUES
($1)
ON CONFLICT
DO NOTHING`,
getSourceFileID: `
SELECT
source_file_id
FROM
SourceFiles
WHERE
source_file=$1`,
insertIntoTraceIDs: `
INSERT INTO
TraceIDs (trace_name)
VALUES
($1)
ON CONFLICT
DO NOTHING`,
getTraceID: `
SELECT
trace_id
FROM
TraceIDs
WHERE
trace_name=$1`,
insertIntoPostings: `
INSERT INTO
Postings (tile_number, key_value, trace_id)
VALUES
($1, $2, $3)
ON CONFLICT
DO NOTHING`,
replaceTraceValues: `
UPSERT INTO
TraceValues (trace_id, commit_number, val, source_file_id)
VALUES
($1, $2, $3, $4)`,
countIndices: `
SELECT
COUNT(*)
FROM
Postings
WHERE
tile_number=$1`,
getLatestTile: `
SELECT
tile_number
FROM
Postings
ORDER BY
tile_number DESC
LIMIT 1`,
paramSetForTile: `
SELECT DISTINCT
key_value
FROM
Postings
WHERE
tile_number=$1`,
getSource: `
SELECT
SourceFiles.source_file
FROM
TraceIDs
INNER JOIN
TraceValues ON TraceValues.trace_id = TraceIDs.trace_id
INNER JOIN
SourceFiles ON SourceFiles.source_file_id = TraceValues.source_file_id
WHERE
TraceIDs.trace_name=$1 AND TraceValues.commit_number=$2`,
traceCount: `
SELECT
COUNT(DISTINCT trace_id)
FROM
TraceValues
WHERE
commit_number >= $1 AND commit_number <= $2`,
},
}
// SQLTraceStore implements tracestore.TraceStore backed onto an SQL database.
type SQLTraceStore struct {
// db is the SQL database instance.
db *sql.DB
// preparedStatements are all the prepared SQL statements.
preparedStatements map[statement]*sql.Stmt
// unpreparedStatements are parsed templates that can be used to construct SQL statements.
unpreparedStatements map[statement]*template.Template
// cache is an LRU cache used to store the ids (int64) of trace names (string).
cache *lru.Cache
// tileSize is the number of commits per Tile.
tileSize int32
}
// New returns a new *SQLTraceStore.
//
// We presume all migrations have been run against db before this function is
// called.
func New(db *sql.DB, dialect perfsql.Dialect, tileSize int32) (*SQLTraceStore, error) {
preparedStatements := map[statement]*sql.Stmt{}
for key, statement := range statementsByDialect[dialect] {
prepared, err := db.Prepare(statement)
if err != nil {
return nil, skerr.Wrapf(err, "preparing statement %v, %q", key, statement)
}
preparedStatements[key] = prepared
}
unpreparedStatements := map[statement]*template.Template{}
for key, tmpl := range templatesByDialect[dialect] {
t, err := template.New("").Parse(tmpl)
if err != nil {
return nil, skerr.Wrapf(err, "parsing template %v, %q", key, tmpl)
}
unpreparedStatements[key] = t
}
cache, err := lru.New(cacheSize)
if err != nil {
return nil, skerr.Wrap(err)
}
return &SQLTraceStore{
db: db,
preparedStatements: preparedStatements,
unpreparedStatements: unpreparedStatements,
cache: cache,
tileSize: tileSize,
}, nil
}
// CommitNumberOfTileStart implements the tracestore.TraceStore interface.
func (s *SQLTraceStore) CommitNumberOfTileStart(commitNumber types.CommitNumber) types.CommitNumber {
tileNumber := types.TileNumberFromCommitNumber(commitNumber, s.tileSize)
beginCommit, _ := types.TileCommitRangeForTileNumber(tileNumber, s.tileSize)
return beginCommit
}
// CountIndices implements the tracestore.TraceStore interface.
func (s *SQLTraceStore) CountIndices(ctx context.Context, tileNumber types.TileNumber) (int64, error) {
var ret int64
if err := s.preparedStatements[countIndices].QueryRowContext(ctx, tileNumber).Scan(&ret); err != nil {
return 0, skerr.Wrap(err)
}
return ret, nil
}
// GetLatestTile implements the tracestore.TraceStore interface.
func (s *SQLTraceStore) GetLatestTile() (types.TileNumber, error) {
var tileNumber int64
if err := s.preparedStatements[getLatestTile].QueryRowContext(context.TODO()).Scan(&tileNumber); err != nil {
return types.BadTileNumber, skerr.Wrap(err)
}
return types.TileNumber(tileNumber), nil
}
func (s *SQLTraceStore) paramSetForTile(tileNumber types.TileNumber) (paramtools.ParamSet, error) {
rows, err := s.preparedStatements[paramSetForTile].QueryContext(context.TODO(), tileNumber)
if err != nil {
return nil, skerr.Wrapf(err, "tileNumer=%d", tileNumber)
}
ret := paramtools.NewParamSet()
for rows.Next() {
var keyValue string
if err := rows.Scan(&keyValue); err != nil {
return nil, skerr.Wrapf(err, "tileNumer=%d", tileNumber)
}
parts := strings.Split(keyValue, "=")
if len(parts) != 2 {
return nil, skerr.Fmt("Invalid key=value form: %q", keyValue)
}
ret.AddParams(paramtools.Params{parts[0]: parts[1]})
}
if err := rows.Err(); err != nil {
return nil, skerr.Wrapf(err, "tileNumer=%d", tileNumber)
}
ret.Normalize()
return ret, nil
}
// GetOrderedParamSet implements the tracestore.TraceStore interface.
func (s *SQLTraceStore) GetOrderedParamSet(ctx context.Context, tileNumber types.TileNumber) (*paramtools.OrderedParamSet, error) {
ps, err := s.paramSetForTile(tileNumber)
if err != nil {
return nil, skerr.Wrap(err)
}
ret := paramtools.NewOrderedParamSet()
ret.Update(ps)
sort.Strings(ret.KeyOrder)
return ret, nil
}
// GetSource implements the tracestore.TraceStore interface.
func (s *SQLTraceStore) GetSource(ctx context.Context, commitNumber types.CommitNumber, traceName string) (string, error) {
var filename string
if err := s.preparedStatements[getSource].QueryRowContext(ctx, traceName, commitNumber).Scan(&filename); err != nil {
return "", skerr.Wrapf(err, "commitNumber=%d traceName=%q", commitNumber, traceName)
}
return filename, nil
}
// OffsetFromIndex implements the tracestore.TraceStore interface.
func (s *SQLTraceStore) OffsetFromIndex(commitNumber types.CommitNumber) int32 {
return int32(commitNumber) % s.tileSize
}
// QueryTracesByIndex implements the tracestore.TraceStore interface.
func (s *SQLTraceStore) QueryTracesByIndex(ctx context.Context, tileNumber types.TileNumber, q *query.Query) (types.TraceSet, error) {
ops, err := s.GetOrderedParamSet(ctx, tileNumber)
if err != nil {
return nil, skerr.Wrapf(err, "Failed to get OPS.")
}
plan, err := q.QueryPlan(ops)
if err != nil {
// Not an error, we just won't match anything in this tile.
//
// The plan may be invalid because it is querying with keys or values
// that don't appear in a tile, which means they query won't work on
// this tile, but it may still work on other tiles, so we just don't
// return any results for this tile.
return nil, nil
}
if len(plan) == 0 {
// We won't match anything in this tile.
return nil, nil
}
// Sanitize our inputs.
if err := query.ValidateParamSet(plan); err != nil {
return nil, skerr.Wrapf(err, "invalid query %#v", *q)
}
tileNumberString := strconv.FormatInt(int64(tileNumber), 10)
beginCommit, endCommit := types.TileCommitRangeForTileNumber(tileNumber, s.tileSize)
stmt := fmt.Sprintf(`
SELECT
TraceIDs.trace_name, TraceValues.commit_number, TraceValues.val
FROM
TraceIDs
INNER JOIN
TraceValues
ON
TraceValues.trace_id = TraceIDs.trace_id
WHERE
TraceValues.commit_number >= %d
AND TraceValues.commit_number <= %d
AND `, beginCommit, endCommit)
// TODO(jcgregorio) Break out into own function.
whereClauses := make([]string, len(plan))
clauseIndex := 0
for key, values := range plan {
inClause := make([]string, len(values))
for i, value := range values {
inClause[i] = fmt.Sprintf("'%s=%s'", key, value)
}
whereClauses[clauseIndex] = `
TraceValues.trace_id IN (
SELECT trace_id FROM Postings WHERE key_value IN (` +
strings.Join(inClause, ",") +
`) AND tile_number=` + tileNumberString + `
)`
clauseIndex++
}
fullStatement := stmt + strings.Join(whereClauses, " AND ")
rows, err := s.db.QueryContext(context.TODO(), fullStatement)
if err != nil {
sklog.Debugf("QueryTracesByIndex: fullStatement: %q", fullStatement)
return nil, skerr.Wrap(err)
}
ret := types.TraceSet{}
for rows.Next() {
var traceName string
var commitNumber int64
var val float64
if err := rows.Scan(&traceName, &commitNumber, &val); err != nil {
return nil, skerr.Wrap(err)
}
if _, ok := ret[traceName]; ok {
ret[traceName][commitNumber%int64(s.tileSize)] = float32(val)
} else {
// TODO(jcgregorio) Replace this vec32.New() with a
// https://golang.org/pkg/sync/#Pool since this is our most used/reused
// type of memory.
ret[traceName] = vec32.New(int(s.tileSize))
ret[traceName][commitNumber%int64(s.tileSize)] = float32(val)
}
}
if err := rows.Err(); err != nil {
return nil, skerr.Wrap(err)
}
return ret, nil
}
// QueryTracesIDOnlyByIndex implements the tracestore.TraceStore interface.
func (s *SQLTraceStore) QueryTracesIDOnlyByIndex(ctx context.Context, tileNumber types.TileNumber, q *query.Query) (<-chan paramtools.Params, error) {
outParams := make(chan paramtools.Params, engine.QueryEngineChannelSize)
if q.Empty() {
close(outParams)
return outParams, skerr.Fmt("Can't run QueryTracesIDOnlyByIndex for the empty query.")
}
ops, err := s.GetOrderedParamSet(ctx, tileNumber)
if err != nil {
close(outParams)
return outParams, skerr.Wrap(err)
}
plan, err := q.QueryPlan(ops)
if err != nil {
// Not an error, we just won't match anything in this tile.
//
// The plan may be invalid because it is querying with keys or values
// that don't appear in a tile, which means they query won't work on
// this tile, but it may still work on other tiles, so we just don't
// return any results for this tile.
close(outParams)
return outParams, nil
}
if len(plan) == 0 {
// We won't match anything in this tile.
close(outParams)
return outParams, nil
}
// Sanitize our inputs.
if err := query.ValidateParamSet(plan); err != nil {
close(outParams)
return outParams, skerr.Wrapf(err, "invalid query %#v", *q)
}
tileNumberString := strconv.FormatInt(int64(tileNumber), 10)
beginCommit, endCommit := types.TileCommitRangeForTileNumber(tileNumber, s.tileSize)
stmt := fmt.Sprintf(`
SELECT DISTINCT
TraceIDs.trace_name
FROM
TraceIDs
INNER JOIN
TraceValues
ON
TraceValues.trace_id = TraceIDs.trace_id
WHERE
TraceValues.commit_number >= %d
AND TraceValues.commit_number <= %d
AND `, beginCommit, endCommit)
// TODO(jcgregorio) Break out into own function.
whereClauses := make([]string, len(plan))
clauseIndex := 0
for key, values := range plan {
inClause := make([]string, len(values))
for i, value := range values {
inClause[i] = fmt.Sprintf("'%s=%s'", key, value)
}
whereClauses[clauseIndex] = `
TraceValues.trace_id IN (
SELECT trace_id FROM Postings WHERE key_value IN (` +
strings.Join(inClause, ",") +
`) AND tile_number=` + tileNumberString + `
)`
clauseIndex++
}
fullStatement := stmt + strings.Join(whereClauses, " AND ")
rows, err := s.db.QueryContext(context.TODO(), fullStatement)
if err != nil {
sklog.Debugf("QueryTracesIDOnlyByIndex: fullStatement: %q", fullStatement)
close(outParams)
return outParams, skerr.Wrap(err)
}
go func() {
defer close(outParams)
for rows.Next() {
var traceName string
if err := rows.Scan(&traceName); err != nil {
sklog.Errorf("Failed to scan traceName: %s", skerr.Wrap(err))
return
}
p, err := query.ParseKey(traceName)
if err != nil {
sklog.Errorf("Failed to parse traceName: %s", skerr.Wrap(err))
return
}
outParams <- p
}
if err := rows.Err(); err != nil {
sklog.Errorf("Failed while reading traceNames: %s", skerr.Wrap(err))
return
}
}()
return outParams, nil
}
// ReadTraces implements the tracestore.TraceStore interface.
func (s *SQLTraceStore) ReadTraces(tileNumber types.TileNumber, keys []string) (types.TraceSet, error) {
// TODO(jcgregorio) Should be broken into batches so we don't exceed the SQL
// engine limit on query sizes.
ret := types.TraceSet{}
for _, key := range keys {
if !query.ValidateKey(key) {
return nil, skerr.Fmt("Invalid key stored in shortcut: %q", key)
}
// TODO(jcgregorio) Replace this vec32.New() with a
// https://golang.org/pkg/sync/#Pool since this is our most used/reused
// type of memory.
ret[key] = vec32.New(int(s.tileSize))
}
// Get the traceIDs for the given keys.
beginCommit, endCommit := types.TileCommitRangeForTileNumber(tileNumber, s.tileSize)
stmt := fmt.Sprintf(`
SELECT
TraceIDs.trace_name, TraceValues.commit_number, TraceValues.val
FROM
TraceIDs
INNER JOIN
TraceValues ON TraceValues.trace_id = TraceIDs.trace_id
WHERE
TraceValues.commit_number >= %d
AND TraceValues.commit_number <= %d
AND TraceIDs.trace_name IN (`, beginCommit, endCommit)
inClause := make([]string, len(keys))
for i, key := range keys {
singleQuoted := "'" + key + "'"
inClause[i] = singleQuoted
}
fullStatement := stmt + strings.Join(inClause, ",") + ")"
rows, err := s.db.QueryContext(context.TODO(), fullStatement)
if err != nil {
sklog.Debugf("ReadTraces: fullStatement: %q", fullStatement)
return nil, skerr.Wrap(err)
}
for rows.Next() {
var traceName string
var commitNumber int64
var val float64
if err := rows.Scan(&traceName, &commitNumber, &val); err != nil {
return nil, skerr.Wrap(err)
}
ret[traceName][commitNumber%int64(s.tileSize)] = float32(val)
}
if err := rows.Err(); err != nil {
return nil, skerr.Wrapf(err, "tileNumber=%d", tileNumber)
}
return ret, nil
}
// TileNumber implements the tracestore.TraceStore interface.
func (s *SQLTraceStore) TileNumber(commitNumber types.CommitNumber) types.TileNumber {
return types.TileNumberFromCommitNumber(commitNumber, s.tileSize)
}
// TileSize implements the tracestore.TraceStore interface.
func (s *SQLTraceStore) TileSize() int32 {
return s.tileSize
}
// TraceCount implements the tracestore.TraceStore interface.
func (s *SQLTraceStore) TraceCount(ctx context.Context, tileNumber types.TileNumber) (int64, error) {
var ret int64
beginCommit, endCommit := types.TileCommitRangeForTileNumber(tileNumber, s.tileSize)
err := s.preparedStatements[traceCount].QueryRowContext(context.TODO(), beginCommit, endCommit).Scan(&ret)
return ret, skerr.Wrap(err)
}
// WriteIndices implements the tracestore.TraceStore interface.
func (s *SQLTraceStore) WriteIndices(ctx context.Context, tileNumber types.TileNumber) error {
// TODO(jcgregorio) This func should be removed from the interface since it only applied to BigTableTraceStore.
return nil
}
// updateSourceFile writes the filename into the SourceFiles table and returns
// the int64 id of that filename.
func (s *SQLTraceStore) updateSourceFile(filename string) (int64, error) {
ret := int64(-1)
_, err := s.preparedStatements[insertIntoSourceFiles].ExecContext(context.TODO(), filename)
if err != nil {
return ret, skerr.Wrap(err)
}
err = s.preparedStatements[getSourceFileID].QueryRowContext(context.TODO(), filename).Scan(&ret)
if err != nil {
return ret, skerr.Wrap(err)
}
return ret, nil
}
// updatePostings writes all the entries into our inverted index in Postings for
// the given traceID and tileNumber. The Params given are from the parse trace
// name.
func (s *SQLTraceStore) updatePostings(p paramtools.Params, tileNumber types.TileNumber, traceID int64) error {
// Clean the data to avoid SQL injection attacks.
p = query.ForceValid(p)
// Prepare the data for the SQL template.
values := make([]insertIntoPostingsValue, 0, len(p))
for k, v := range p {
keyValue := fmt.Sprintf("%s=%s", k, v)
values = append(values, insertIntoPostingsValue{
TileNumber: tileNumber,
KeyValue: keyValue,
TraceID: traceID,
})
}
// Expand the template.
var b bytes.Buffer
if err := s.unpreparedStatements[insertIntoPostings].Execute(&b, values); err != nil {
return skerr.Wrapf(err, "Failed to expand insertIntoPostings template with: %v", values)
}
_, err := s.db.ExecContext(context.TODO(), b.String())
if err != nil {
return skerr.Wrap(err)
}
return nil
}
// writeTraceIDAndPostings writes the trace name into the TraceIDs table and returns the
// int64 id of that trace name. This operation will happen repeatedly as data is
// ingested so we cache the results in the LRU cache.
func (s *SQLTraceStore) writeTraceIDAndPostings(traceNameAsParams paramtools.Params, tileNumber types.TileNumber) (int64, error) {
ret := int64(-1)
traceName, err := query.MakeKey(traceNameAsParams)
if err != nil {
return ret, skerr.Wrap(err)
}
// Get an int64 trace id for the traceName.
if iret, ok := s.cache.Get(traceName); ok {
ret = iret.(int64)
} else {
_, err := s.preparedStatements[insertIntoTraceIDs].ExecContext(context.TODO(), traceName)
if err != nil {
return ret, skerr.Wrapf(err, "traceName=%q", traceName)
}
err = s.preparedStatements[getTraceID].QueryRowContext(context.TODO(), traceName).Scan(&ret)
if err != nil {
return ret, skerr.Wrapf(err, "traceName=%q", traceName)
}
s.cache.Add(traceName, ret)
}
// Update postings.
if err := s.updatePostings(traceNameAsParams, tileNumber, ret); err != nil {
return ret, skerr.Wrapf(err, "traceName=%q", traceName)
}
return ret, nil
}
// updateTraceValues writes a single entry in to the TraceValues table.
func (s *SQLTraceStore) updateTraceValues(traceID int64, commitNumber types.CommitNumber, x float32, sourceID int64) error {
_, err := s.preparedStatements[replaceTraceValues].ExecContext(context.TODO(), traceID, commitNumber, x, sourceID)
return skerr.Wrap(err)
}
// WriteTraces implements the tracestore.TraceStore interface.
func (s *SQLTraceStore) WriteTraces(commitNumber types.CommitNumber, params []paramtools.Params, values []float32, _ paramtools.ParamSet, source string, _ time.Time) error {
tileNumber := types.TileNumberFromCommitNumber(commitNumber, s.tileSize)
// Get the row id for the source file.
sourceID, err := s.updateSourceFile(source)
if err != nil {
return skerr.Wrap(err)
}
// Get trace ids for each trace and add trace ids to the index/postings.
// We populate the traceIDs slice whose values are 1:1 with the values and
// params slices.
traceIDs := make([]int64, len(params))
for i, p := range params {
traceID, err := s.writeTraceIDAndPostings(p, tileNumber)
if err != nil {
return skerr.Wrap(err)
}
traceIDs[i] = traceID
}
// Now add each trace value.
for i, x := range values {
if err := s.updateTraceValues(traceIDs[i], commitNumber, x, sourceID); err != nil {
return skerr.Wrap(err)
}
}
return nil
}
// Confirm that *SQLTraceStore fulfills the tracestore.TraceStore interface.
var _ tracestore.TraceStore = (*SQLTraceStore)(nil)