/*
Package sqltracestore implements a tracestore.TraceStore on top of SQL.

We'll look that the SQL schema used to explain how SQLTraceStore maps
traces into an SQL database.

Each trace name, which is a structured key (See /infra/go/query) of the form
,key1=value1,key2=value2,..., is stored in the TraceIds table so we can use the
much shorter 64 bit trace_id in other tables.

	TraceIDs  (
		trace_id INTEGER PRIMARY KEY,
		trace_name TEXT UNIQUE NOT NULL
	)

Similarly we store the name of every source file that has been ingested in the
SourceFiles table so we can use the shorter 64 bit source_file_id in other
tables.

	SourceFiles (
		source_file_id INTEGER PRIMARY KEY,
		source_file TEXT UNIQUE NOT NULL
	)

We store the values of each trace in the TraceValues table, and use the trace_id
and the commit_number as the primary key. We also store not only the value but
the id of the source file that the value came from.

	TraceValues (
		trace_id INTEGER,
		commit_number INTEGER,
		val REAL,
		source_file_id INTEGER,
		PRIMARY KEY (trace_id, commit_number)
	)

Just using this table we can construct some useful queries. For example
we can count the number of traces in a single tile, in this case the
0th tile in a system with a tileSize of 256:

	SELECT
		COUNT(DISTINCT trace_id)
	FROM
		TraceValues
	WHERE
  		commit_number >= 0 AND commit_number < 256;

The Postings table is our inverted index for looking up which trace ids
contain which key=value pairs. For a good introduction to postings and search
https://www.tbray.org/ongoing/When/200x/2003/06/18/HowSearchWorks is a good
resource.

Remember that each trace name is a structured key of the form
,arch=x86,config=8888,..., and that over time traces may come and go, i.e. we
may stop running a test, or start running new tests, so if we want to make
searching for traces efficient we need to be aware of how those trace ids change
over time. The answer is to break our store in Tiles, i.e. blocks of commits of
tileSize length, and then for each Tile we keep an inverted index of the trace
ids. This allows us to not only construct fast queries, but to also do things
like build ParamSets, a collection of all the keys and all their values ever
seen for a particular Tile.

In the table below we store a key_value which is the literal "key=value" part of
a trace name, along with the tile_number and the 64 bit trace id. Note that
tile_number is just int(commitNumber/tileSize).

	Postings  (
		tile_number INTEGER,
		key_value text NOT NULL,
		trace_id INTEGER,
		PRIMARY KEY (tile_number, key_value, trace_id)
	)

So for example to build a ParamSet from Postings:

	SELECT DISTINCT
		key_value
	FROM
		Postings
	WHERE
		tile_number=0;

To find the most recent tile:

	SELECT
		tile_number
	FROM
		Postings
	ORDER BY
		tile_number DESC LIMIT 1;

And finally, to retrieve all the trace values that
would match a query, we first start with sub-queries for
each of the common keys in the query, which produce the
trace_ids, which are then ANDed across all the distinct
keys in the query. Finally that list is inner joined to the
TraceValues table to load up all the values.

	SELECT
		TraceIDs.trace_name, TraceValues.commit_number, TraceValues.val
	FROM
		TraceIDs
	INNER JOIN
		TraceValues
	ON
		TraceValues.trace_id = TraceIDs.trace_id
	WHERE
  		TraceValues.trace_id IN (
			SELECT trace_id FROM Postings
			WHERE key_value IN ("arch=x86", "arch=arm")
			AND tile_number=0
  		)
	  AND
	  	TraceValues.trace_id IN (
			SELECT trace_id FROM Postings
			WHERE key_value IN ("config=8888")
			AND tile_number=0
		  );

Look in migrations/test.sql for more example of raw queries using
a simple example dataset.
*/
package sqltracestore

import (
	"bytes"
	"context"
	"crypto/md5"
	"database/sql"
	"fmt"
	"sort"
	"strconv"
	"strings"
	"text/template"
	"time"

	"go.skia.org/infra/go/metrics2"
	"go.skia.org/infra/go/paramtools"
	"go.skia.org/infra/go/query"
	"go.skia.org/infra/go/skerr"
	"go.skia.org/infra/go/sklog"
	"go.skia.org/infra/go/timer"
	"go.skia.org/infra/go/util"
	"go.skia.org/infra/go/vec32"
	"go.skia.org/infra/perf/go/cache"
	"go.skia.org/infra/perf/go/cache/local"
	"go.skia.org/infra/perf/go/cache/memcached"
	"go.skia.org/infra/perf/go/config"
	perfsql "go.skia.org/infra/perf/go/sql"
	"go.skia.org/infra/perf/go/tracestore"
	"go.skia.org/infra/perf/go/tracestore/sqltracestore/engine"
	"go.skia.org/infra/perf/go/types"
)

// defaultCacheSize is the size of the in-memory LRU cache if no size was
// specified in the config file.
const defaultCacheSize = 20 * 1000 * 1000

// Ingest instances have around 8 cores and many ingested files have ~10K
// values, so pick a batch size that allows roughly one request per core.
const traceValuesInsertBatchSize = 1000

// traceIDFromSQL is the type of the IDs that are used in the SQL database for
// traces.
type traceIDFromSQL int64

const badTraceIDFromSQL traceIDFromSQL = -1

// sourceFileIDFromSQL is the type of the IDs that are used in the SQL database
// for source files.
type sourceFileIDFromSQL int64

const badSourceFileIDFromSQL sourceFileIDFromSQL = -1

// statement is an SQL statement or fragment of an SQL statement.
type statement int

// All the different statements we need.
const (
	insertIntoSourceFiles statement = iota
	getSourceFileID
	insertIntoTraceIDs
	getTraceID
	insertIntoPostings
	replaceTraceValues
	countIndices
	getLatestTile
	paramSetForTile
	getSource
	traceCount
)

type statements map[statement]string

type templates map[statement]string

var templatesByDialect = map[perfsql.Dialect]templates{
	perfsql.CockroachDBDialect: {
		insertIntoPostings: `
		UPSERT INTO
			Postings (tile_number, key_value, trace_id)
		VALUES
			{{ range $index, $element :=  . -}}
				{{ if $index }},{{end}}({{ $element.TileNumber }}, '{{ $element.KeyValue }}', {{ $element.TraceID }})
	  		{{ end }}`,
		replaceTraceValues: `
		UPSERT INTO
			TraceValues (trace_id, commit_number, val, source_file_id)
		VALUES
		{{ range $index, $element :=  . -}}
			{{ if $index }},{{end}}({{ $element.TraceID }}, {{ $element.CommitNumber }}, {{ $element.Val }}, {{ $element.SourceFileID }})
		{{ end }}`,
	},
}

// insertIntoPostingsValue represents the values used in the insertIntoPostings template.
type insertIntoPostingsValue struct {
	TileNumber types.TileNumber
	KeyValue   string
	TraceID    traceIDFromSQL
}

// replaceTraceValue is used in the replaceTraceValues template.
type replaceTraceValue struct {
	TraceID      traceIDFromSQL
	CommitNumber types.CommitNumber
	Val          float32
	SourceFileID sourceFileIDFromSQL
}

var statementsByDialect = map[perfsql.Dialect]statements{
	perfsql.CockroachDBDialect: {
		insertIntoSourceFiles: `
		INSERT INTO
			SourceFiles (source_file)
		VALUES
			($1)
		ON CONFLICT
		DO NOTHING`,
		getSourceFileID: `
		SELECT
			source_file_id
		FROM
			SourceFiles
		WHERE
			source_file=$1`,
		insertIntoTraceIDs: `
		INSERT INTO
			TraceIDs (trace_name)
		VALUES
			($1)
		ON CONFLICT
		DO NOTHING`,
		getTraceID: `
		SELECT
			trace_id
		FROM
			TraceIDs
		WHERE
			trace_name=$1`,
		insertIntoPostings: `
		INSERT INTO
			Postings (tile_number, key_value, trace_id)
		VALUES
			($1, $2, $3)
		ON CONFLICT
		DO NOTHING`,
		countIndices: `
		SELECT
			COUNT(*)
		FROM
			Postings
		WHERE
			tile_number=$1`,
		getLatestTile: `
		SELECT
			tile_number
		FROM
			Postings
		ORDER BY
			tile_number DESC
		LIMIT 1`,
		paramSetForTile: `
		SELECT DISTINCT
			key_value
		FROM
			Postings
		WHERE
			tile_number=$1`,
		getSource: `
		SELECT
			SourceFiles.source_file
		FROM
			TraceIDs
		INNER JOIN
			TraceValues ON TraceValues.trace_id = TraceIDs.trace_id
		INNER JOIN
			SourceFiles ON SourceFiles.source_file_id = TraceValues.source_file_id
		WHERE
			TraceIDs.trace_name=$1 AND TraceValues.commit_number=$2`,
		traceCount: `
		SELECT
			COUNT(DISTINCT trace_id)
		FROM
			TraceValues
		WHERE
		  commit_number >= $1 AND commit_number <= $2`,
	},
}

// SQLTraceStore implements tracestore.TraceStore backed onto an SQL database.
type SQLTraceStore struct {
	// db is the SQL database instance.
	db *sql.DB

	// preparedStatements are all the prepared SQL statements.
	preparedStatements map[statement]*sql.Stmt

	// unpreparedStatements are parsed templates that can be used to construct SQL statements.
	unpreparedStatements map[statement]*template.Template

	// cache is an LRU cache used to store the ids (traceIDFromSQL) of hashed trace names (string):
	//    "md5(traceName)" -> traceIDFromSQL
	// and if a traces postings have been written for a tile (bool).
	//    "traceID-tileNumber" -> bool
	//
	// For traceNames the md5 sum is used to keep the cache keys shorter, thus preserving
	// memory and also allowing the use of memcached which restricts keys to 250 chars.
	cache cache.Cache

	// tileSize is the number of commits per Tile.
	tileSize int32

	// metrics
	writeTracesMetric                              metrics2.Float64SummaryMetric
	writeTracesMetricSQL                           metrics2.Float64SummaryMetric
	updateTraceValuesMetric                        metrics2.Float64SummaryMetric
	buildTracesContextMetric                       metrics2.Float64SummaryMetric
	writeTraceIDAndPostingsCacheHitMetric          metrics2.Counter
	writeTraceIDAndPostingsCacheMissMetric         metrics2.Counter
	writeTraceIDAndPostingsPostingsCacheHitMetric  metrics2.Counter
	writeTraceIDAndPostingsPostingsCacheMissMetric metrics2.Counter
}

// New returns a new *SQLTraceStore.
//
// We presume all migrations have been run against db before this function is
// called.
func New(db *sql.DB, dialect perfsql.Dialect, datastoreConfig config.DataStoreConfig) (*SQLTraceStore, error) {
	preparedStatements := map[statement]*sql.Stmt{}
	for key, statement := range statementsByDialect[dialect] {
		prepared, err := db.Prepare(statement)
		if err != nil {
			return nil, skerr.Wrapf(err, "preparing statement %v, %q", key, statement)
		}
		preparedStatements[key] = prepared
	}

	unpreparedStatements := map[statement]*template.Template{}
	for key, tmpl := range templatesByDialect[dialect] {
		t, err := template.New("").Parse(tmpl)
		if err != nil {
			return nil, skerr.Wrapf(err, "parsing template %v, %q", key, tmpl)
		}
		unpreparedStatements[key] = t
	}

	var memoryCacheSize = datastoreConfig.Cache.Size
	if memoryCacheSize == 0 {
		memoryCacheSize = defaultCacheSize
	}

	var err error
	var cache cache.Cache
	if len(datastoreConfig.Cache.MemcachedServers) > 0 && datastoreConfig.Cache.Namespace != "" {
		cache, err = memcached.New(datastoreConfig.Cache.MemcachedServers, datastoreConfig.Cache.Namespace)
		if err != nil {
			// Fall back to in-memory cache.
			cache, err = local.New(memoryCacheSize)
			if err != nil {
				return nil, skerr.Wrap(err)
			}

		}
	} else {
		cache, err = local.New(memoryCacheSize)
		if err != nil {
			return nil, skerr.Wrap(err)
		}
	}

	return &SQLTraceStore{
		db:                                     db,
		preparedStatements:                     preparedStatements,
		unpreparedStatements:                   unpreparedStatements,
		cache:                                  cache,
		tileSize:                               datastoreConfig.TileSize,
		writeTracesMetric:                      metrics2.GetFloat64SummaryMetric("perfserver_sqltracestore_writeTraces"),
		writeTracesMetricSQL:                   metrics2.GetFloat64SummaryMetric("perfserver_sqltracestore_writeTracesSQL"),
		updateTraceValuesMetric:                metrics2.GetFloat64SummaryMetric("perfserver_sqltracestore_updateTraceValues"),
		buildTracesContextMetric:               metrics2.GetFloat64SummaryMetric("perfserver_sqltracestore_buildTracesContext"),
		writeTraceIDAndPostingsCacheHitMetric:  metrics2.GetCounter("perfserver_sqltracestore_writeTraceIDAndPostings_cache_hit"),
		writeTraceIDAndPostingsCacheMissMetric: metrics2.GetCounter("perfserver_sqltracestore_writeTraceIDAndPostings_cache_miss"),
		writeTraceIDAndPostingsPostingsCacheHitMetric:  metrics2.GetCounter("perfserver_sqltracestore_writeTraceIDAndPostings_postings_cache_hit"),
		writeTraceIDAndPostingsPostingsCacheMissMetric: metrics2.GetCounter("perfserver_sqltracestore_writeTraceIDAndPostings_postings_cache_miss"),
	}, nil
}

// CommitNumberOfTileStart implements the tracestore.TraceStore interface.
func (s *SQLTraceStore) CommitNumberOfTileStart(commitNumber types.CommitNumber) types.CommitNumber {
	tileNumber := types.TileNumberFromCommitNumber(commitNumber, s.tileSize)
	beginCommit, _ := types.TileCommitRangeForTileNumber(tileNumber, s.tileSize)
	return beginCommit
}

// CountIndices implements the tracestore.TraceStore interface.
func (s *SQLTraceStore) CountIndices(ctx context.Context, tileNumber types.TileNumber) (int64, error) {
	var ret int64
	if err := s.preparedStatements[countIndices].QueryRowContext(ctx, tileNumber).Scan(&ret); err != nil {
		return 0, skerr.Wrap(err)
	}
	return ret, nil
}

// GetLatestTile implements the tracestore.TraceStore interface.
func (s *SQLTraceStore) GetLatestTile() (types.TileNumber, error) {
	tileNumber := types.BadTileNumber
	if err := s.preparedStatements[getLatestTile].QueryRowContext(context.TODO()).Scan(&tileNumber); err != nil {
		return tileNumber, skerr.Wrap(err)
	}
	return tileNumber, nil
}

func (s *SQLTraceStore) paramSetForTile(tileNumber types.TileNumber) (paramtools.ParamSet, error) {
	rows, err := s.preparedStatements[paramSetForTile].QueryContext(context.TODO(), tileNumber)
	if err != nil {
		return nil, skerr.Wrapf(err, "tileNumer=%d", tileNumber)
	}
	ret := paramtools.NewParamSet()
	for rows.Next() {
		var keyValue string
		if err := rows.Scan(&keyValue); err != nil {
			return nil, skerr.Wrapf(err, "tileNumer=%d", tileNumber)
		}
		parts := strings.Split(keyValue, "=")
		if len(parts) != 2 {
			return nil, skerr.Fmt("Invalid key=value form: %q", keyValue)
		}
		ret.AddParams(paramtools.Params{parts[0]: parts[1]})
	}
	if err := rows.Err(); err != nil {
		return nil, skerr.Wrapf(err, "tileNumer=%d", tileNumber)
	}
	ret.Normalize()
	return ret, nil
}

// GetOrderedParamSet implements the tracestore.TraceStore interface.
func (s *SQLTraceStore) GetOrderedParamSet(ctx context.Context, tileNumber types.TileNumber) (*paramtools.OrderedParamSet, error) {
	ps, err := s.paramSetForTile(tileNumber)
	if err != nil {
		return nil, skerr.Wrap(err)
	}
	ret := paramtools.NewOrderedParamSet()
	ret.Update(ps)
	sort.Strings(ret.KeyOrder)
	return ret, nil
}

// GetSource implements the tracestore.TraceStore interface.
func (s *SQLTraceStore) GetSource(ctx context.Context, commitNumber types.CommitNumber, traceName string) (string, error) {
	var filename string
	if err := s.preparedStatements[getSource].QueryRowContext(ctx, traceName, commitNumber).Scan(&filename); err != nil {
		return "", skerr.Wrapf(err, "commitNumber=%d traceName=%q", commitNumber, traceName)
	}
	return filename, nil
}

// OffsetFromCommitNumber implements the tracestore.TraceStore interface.
func (s *SQLTraceStore) OffsetFromCommitNumber(commitNumber types.CommitNumber) int32 {
	return int32(commitNumber) % s.tileSize
}

// QueryTracesByIndex implements the tracestore.TraceStore interface.
func (s *SQLTraceStore) QueryTracesByIndex(ctx context.Context, tileNumber types.TileNumber, q *query.Query) (types.TraceSet, error) {
	ops, err := s.GetOrderedParamSet(ctx, tileNumber)
	if err != nil {
		return nil, skerr.Wrapf(err, "Failed to get OPS.")
	}

	plan, err := q.QueryPlan(ops)
	if err != nil {
		// Not an error, we just won't match anything in this tile.
		//
		// The plan may be invalid because it is querying with keys or values
		// that don't appear in a tile, which means they query won't work on
		// this tile, but it may still work on other tiles, so we just don't
		// return any results for this tile.
		return nil, nil
	}
	if len(plan) == 0 {
		// We won't match anything in this tile.
		return nil, nil
	}
	// Sanitize our inputs.
	if err := query.ValidateParamSet(plan); err != nil {
		return nil, skerr.Wrapf(err, "invalid query %#v", *q)
	}
	tileNumberString := strconv.FormatInt(int64(tileNumber), 10)
	beginCommit, endCommit := types.TileCommitRangeForTileNumber(tileNumber, s.tileSize)

	stmt := fmt.Sprintf(`
SELECT
	TraceIDs.trace_name, TraceValues.commit_number, TraceValues.val
FROM
	TraceIDs
INNER JOIN
	TraceValues
ON
	TraceValues.trace_id = TraceIDs.trace_id
WHERE
	TraceValues.commit_number >= %d
	AND TraceValues.commit_number <= %d
	AND `, beginCommit, endCommit)

	// TODO(jcgregorio) Break out into own function.
	whereClauses := make([]string, len(plan))
	clauseIndex := 0
	for key, values := range plan {
		inClause := make([]string, len(values))
		for i, value := range values {
			inClause[i] = fmt.Sprintf("'%s=%s'", key, value)
		}
		whereClauses[clauseIndex] = `
	TraceValues.trace_id IN (
		SELECT trace_id FROM Postings WHERE key_value IN (` +
			strings.Join(inClause, ",") +
			`) AND tile_number=` + tileNumberString + `
		)`
		clauseIndex++
	}
	fullStatement := stmt + strings.Join(whereClauses, " AND ")

	rows, err := s.db.QueryContext(context.TODO(), fullStatement)
	if err != nil {
		sklog.Debugf("QueryTracesByIndex: fullStatement: %q", fullStatement)
		return nil, skerr.Wrap(err)
	}

	ret := types.TraceSet{}

	for rows.Next() {
		var traceName string
		var commitNumber types.CommitNumber
		var val float64
		if err := rows.Scan(&traceName, &commitNumber, &val); err != nil {
			return nil, skerr.Wrap(err)
		}
		if _, ok := ret[traceName]; ok {
			ret[traceName][s.OffsetFromCommitNumber(commitNumber)] = float32(val)
		} else {
			// TODO(jcgregorio) Replace this vec32.New() with a
			// https://golang.org/pkg/sync/#Pool since this is our most used/reused
			// type of memory.
			ret[traceName] = vec32.New(int(s.tileSize))
			ret[traceName][s.OffsetFromCommitNumber(commitNumber)] = float32(val)
		}
	}
	if err := rows.Err(); err != nil {
		return nil, skerr.Wrap(err)
	}

	return ret, nil
}

// QueryTracesIDOnlyByIndex implements the tracestore.TraceStore interface.
func (s *SQLTraceStore) QueryTracesIDOnlyByIndex(ctx context.Context, tileNumber types.TileNumber, q *query.Query) (<-chan paramtools.Params, error) {
	outParams := make(chan paramtools.Params, engine.QueryEngineChannelSize)
	if q.Empty() {
		close(outParams)
		return outParams, skerr.Fmt("Can't run QueryTracesIDOnlyByIndex for the empty query.")
	}

	ops, err := s.GetOrderedParamSet(ctx, tileNumber)
	if err != nil {
		close(outParams)
		return outParams, skerr.Wrap(err)
	}

	plan, err := q.QueryPlan(ops)
	if err != nil {
		// Not an error, we just won't match anything in this tile.
		//
		// The plan may be invalid because it is querying with keys or values
		// that don't appear in a tile, which means they query won't work on
		// this tile, but it may still work on other tiles, so we just don't
		// return any results for this tile.
		close(outParams)
		return outParams, nil
	}
	if len(plan) == 0 {
		// We won't match anything in this tile.
		close(outParams)
		return outParams, nil
	}
	// Sanitize our inputs.
	if err := query.ValidateParamSet(plan); err != nil {
		close(outParams)
		return outParams, skerr.Wrapf(err, "invalid query %#v", *q)
	}

	tileNumberString := strconv.FormatInt(int64(tileNumber), 10)
	beginCommit, endCommit := types.TileCommitRangeForTileNumber(tileNumber, s.tileSize)

	stmt := fmt.Sprintf(`
SELECT DISTINCT
	TraceIDs.trace_name
FROM
	TraceIDs
INNER JOIN
	TraceValues
ON
	TraceValues.trace_id = TraceIDs.trace_id
WHERE
	TraceValues.commit_number >= %d
	AND TraceValues.commit_number <= %d
	AND `, beginCommit, endCommit)

	// TODO(jcgregorio) Break out into own function.
	// TODO(jcgregorio) Convert to use text/templates.
	whereClauses := make([]string, len(plan))
	clauseIndex := 0
	for key, values := range plan {
		inClause := make([]string, len(values))
		for i, value := range values {
			inClause[i] = fmt.Sprintf("'%s=%s'", key, value)
		}
		whereClauses[clauseIndex] = `
		TraceValues.trace_id IN (
			SELECT trace_id FROM Postings WHERE key_value IN (` +
			strings.Join(inClause, ",") +
			`) AND tile_number=` + tileNumberString + `
			)`
		clauseIndex++
	}
	fullStatement := stmt + strings.Join(whereClauses, " AND ")

	rows, err := s.db.QueryContext(context.TODO(), fullStatement)
	if err != nil {
		sklog.Debugf("QueryTracesIDOnlyByIndex: fullStatement: %q", fullStatement)
		close(outParams)
		return outParams, skerr.Wrap(err)
	}

	go func() {
		defer close(outParams)

		for rows.Next() {
			var traceName string
			if err := rows.Scan(&traceName); err != nil {
				sklog.Errorf("Failed to scan traceName: %s", skerr.Wrap(err))
				return
			}
			p, err := query.ParseKey(traceName)
			if err != nil {
				sklog.Errorf("Failed to parse traceName: %s", skerr.Wrap(err))
				return
			}
			outParams <- p
		}
		if err := rows.Err(); err != nil {
			sklog.Errorf("Failed while reading traceNames: %s", skerr.Wrap(err))
			return
		}
	}()

	return outParams, nil
}

// ReadTraces implements the tracestore.TraceStore interface.
func (s *SQLTraceStore) ReadTraces(tileNumber types.TileNumber, keys []string) (types.TraceSet, error) {
	// TODO(jcgregorio) Should be broken into batches so we don't exceed the SQL
	// engine limit on query sizes.
	ret := types.TraceSet{}
	for _, key := range keys {
		if !query.ValidateKey(key) {
			return nil, skerr.Fmt("Invalid key stored in shortcut: %q", key)
		}

		// TODO(jcgregorio) Replace this vec32.New() with a
		// https://golang.org/pkg/sync/#Pool since this is our most used/reused
		// type of memory.
		ret[key] = vec32.New(int(s.tileSize))
	}

	// Get the traceIDs for the given keys.
	beginCommit, endCommit := types.TileCommitRangeForTileNumber(tileNumber, s.tileSize)
	stmt := fmt.Sprintf(`
SELECT
	TraceIDs.trace_name, TraceValues.commit_number, TraceValues.val
FROM
    TraceIDs
INNER JOIN
	TraceValues ON TraceValues.trace_id = TraceIDs.trace_id
WHERE
	TraceValues.commit_number >= %d
	AND TraceValues.commit_number <= %d
	AND TraceIDs.trace_name IN (`, beginCommit, endCommit)
	inClause := make([]string, len(keys))
	for i, key := range keys {
		singleQuoted := "'" + key + "'"
		inClause[i] = singleQuoted
	}
	fullStatement := stmt + strings.Join(inClause, ",") + ")"
	rows, err := s.db.QueryContext(context.TODO(), fullStatement)
	if err != nil {
		sklog.Debugf("ReadTraces: fullStatement: %q", fullStatement)
		return nil, skerr.Wrap(err)
	}
	for rows.Next() {
		var traceName string
		var commitNumber types.CommitNumber
		var val float64
		if err := rows.Scan(&traceName, &commitNumber, &val); err != nil {
			return nil, skerr.Wrap(err)
		}
		ret[traceName][s.OffsetFromCommitNumber(commitNumber)] = float32(val)
	}
	if err := rows.Err(); err != nil {
		return nil, skerr.Wrapf(err, "tileNumber=%d", tileNumber)
	}

	return ret, nil
}

// TileNumber implements the tracestore.TraceStore interface.
func (s *SQLTraceStore) TileNumber(commitNumber types.CommitNumber) types.TileNumber {
	return types.TileNumberFromCommitNumber(commitNumber, s.tileSize)
}

// TileSize implements the tracestore.TraceStore interface.
func (s *SQLTraceStore) TileSize() int32 {
	return s.tileSize
}

// TraceCount implements the tracestore.TraceStore interface.
func (s *SQLTraceStore) TraceCount(ctx context.Context, tileNumber types.TileNumber) (int64, error) {
	var ret int64
	beginCommit, endCommit := types.TileCommitRangeForTileNumber(tileNumber, s.tileSize)
	err := s.preparedStatements[traceCount].QueryRowContext(context.TODO(), beginCommit, endCommit).Scan(&ret)
	return ret, skerr.Wrap(err)
}

// WriteIndices implements the tracestore.TraceStore interface.
func (s *SQLTraceStore) WriteIndices(ctx context.Context, tileNumber types.TileNumber) error {
	// TODO(jcgregorio) This func should be removed from the interface since it only applied to BigTableTraceStore.
	return nil
}

// updateSourceFile writes the filename into the SourceFiles table and returns
// the sourceFileIDFromSQL of that filename.
func (s *SQLTraceStore) updateSourceFile(filename string) (sourceFileIDFromSQL, error) {
	ret := badSourceFileIDFromSQL
	_, err := s.preparedStatements[insertIntoSourceFiles].ExecContext(context.TODO(), filename)
	if err != nil {
		return ret, skerr.Wrap(err)
	}
	err = s.preparedStatements[getSourceFileID].QueryRowContext(context.TODO(), filename).Scan(&ret)
	if err != nil {
		return ret, skerr.Wrap(err)
	}

	return ret, nil
}

// updatePostings writes all the entries into our inverted index in Postings for
// the given traceID and tileNumber. The Params given are from the parse trace
// name.
func (s *SQLTraceStore) updatePostings(p paramtools.Params, tileNumber types.TileNumber, traceID traceIDFromSQL) error {
	// Clean the data to avoid SQL injection attacks.
	p = query.ForceValid(p)

	// Prepare the data for the SQL template.
	values := make([]insertIntoPostingsValue, 0, len(p))
	for k, v := range p {
		keyValue := fmt.Sprintf("%s=%s", k, v)
		values = append(values, insertIntoPostingsValue{
			TileNumber: tileNumber,
			KeyValue:   keyValue,
			TraceID:    traceID,
		})
	}

	// Expand the template.
	var b bytes.Buffer
	if err := s.unpreparedStatements[insertIntoPostings].Execute(&b, values); err != nil {
		return skerr.Wrapf(err, "Failed to expand insertIntoPostings template with: %v", values)
	}

	_, err := s.db.ExecContext(context.TODO(), b.String())
	if err != nil {
		return skerr.Wrap(err)
	}

	return nil
}

func getPostingsCacheEntryKey(traceID traceIDFromSQL, tileNumber types.TileNumber) string {
	return fmt.Sprintf("%d-%d", traceID, tileNumber)
}

func getHashedTraceName(traceName string) string {
	return fmt.Sprintf("%x", md5.Sum([]byte(traceName)))
}

func (s *SQLTraceStore) getTraceIDFromCache(hashedTraceName string) (traceIDFromSQL, bool) {
	if traceIDString, ok := s.cache.Get(hashedTraceName); ok {
		traceIDInt64, err := strconv.ParseInt(traceIDString, 10, 64)
		if err != nil {
			return badTraceIDFromSQL, false
		}
		return traceIDFromSQL(traceIDInt64), true
	}
	return badTraceIDFromSQL, false
}

// postingsCacheValue is what we write to the cache when postings exist for the
// given key. The actual value doesn't matter since we only check for existence
// when reading the cache.
const postingsCacheValue = "1"

// writeTraceIDAndPostings writes the trace name into the TraceIDs table and
// returns the traceIDFromSQL of that trace name. This operation will happen
// repeatedly as data is ingested so we cache the results in the LRU cache.
func (s *SQLTraceStore) writeTraceIDAndPostings(traceNameAsParams paramtools.Params, tileNumber types.TileNumber) (traceIDFromSQL, error) {
	traceID := badTraceIDFromSQL

	traceName, err := query.MakeKey(traceNameAsParams)
	if err != nil {
		return traceID, skerr.Wrap(err)
	}
	hashedTraceName := getHashedTraceName(traceName)

	// Get a traceIDFromSQL for the traceName.
	var ok bool
	if traceID, ok = s.getTraceIDFromCache(hashedTraceName); ok {
		s.writeTraceIDAndPostingsCacheHitMetric.Inc(1)
	} else {
		s.writeTraceIDAndPostingsCacheMissMetric.Inc(1)
		_, err := s.preparedStatements[insertIntoTraceIDs].ExecContext(context.TODO(), traceName)
		if err != nil {
			return traceID, skerr.Wrapf(err, "traceName=%q", traceName)
		}
		err = s.preparedStatements[getTraceID].QueryRowContext(context.TODO(), traceName).Scan(&traceID)
		if err != nil {
			return traceID, skerr.Wrapf(err, "traceName=%q", traceName)
		}
		s.cache.Add(hashedTraceName, fmt.Sprintf("%d", traceID))
	}
	postingsCacheEntryKey := getPostingsCacheEntryKey(traceID, tileNumber)
	if !s.cache.Exists(postingsCacheEntryKey) {
		s.writeTraceIDAndPostingsPostingsCacheMissMetric.Inc(1)
		// TODO(jcgregorio) Do a query on the database to see if the postings are there.

		// Update postings.
		if err := s.updatePostings(traceNameAsParams, tileNumber, traceID); err != nil {
			return traceID, skerr.Wrapf(err, "traceName=%q tileNumber=%d traceID=%d", traceName, tileNumber, traceID)
		}
		s.cache.Add(postingsCacheEntryKey, postingsCacheValue)
	} else {
		s.writeTraceIDAndPostingsPostingsCacheHitMetric.Inc(1)
	}

	return traceID, nil
}

// updateTraceValues writes the given slice of replaceTraceValues into the store.
func (s *SQLTraceStore) updateTraceValues(templateContext []replaceTraceValue) error {
	defer timer.NewWithSummary("perfserver_sqltracestore_updateTraceValues", s.updateTraceValuesMetric).Stop()
	var b bytes.Buffer
	if err := s.unpreparedStatements[replaceTraceValues].Execute(&b, templateContext); err != nil {
		return skerr.Wrapf(err, "failed to expand template")
	}
	_, err := s.db.ExecContext(context.TODO(), b.String())
	return skerr.Wrap(err)
}

// WriteTraces implements the tracestore.TraceStore interface.
func (s *SQLTraceStore) WriteTraces(commitNumber types.CommitNumber, params []paramtools.Params, values []float32, _ paramtools.ParamSet, source string, _ time.Time) error {
	defer timer.NewWithSummary("perfserver_sqltracestore_writeTraces", s.writeTracesMetric).Stop()
	tileNumber := types.TileNumberFromCommitNumber(commitNumber, s.tileSize)
	// Get the row id for the source file.
	sourceID, err := s.updateSourceFile(source)
	if err != nil {
		return skerr.Wrap(err)
	}

	t := timer.NewWithSummary("perfserver_sqltracestore_buildTracesContext", s.buildTracesContextMetric)
	// Build the 'context' which will be used to populate the SQL template.
	templateContext := make([]replaceTraceValue, 0, len(params))
	for i, p := range params {
		traceID, err := s.writeTraceIDAndPostings(p, tileNumber)
		if err != nil {
			t.Stop()
			return skerr.Wrap(err)
		}
		templateContext = append(templateContext, replaceTraceValue{
			TraceID:      traceID,
			CommitNumber: commitNumber,
			Val:          values[i],
			SourceFileID: sourceID,
		})
	}
	t.Stop()

	defer timer.NewWithSummary("perfserver_sqltracestore_writeTraces_sql", s.writeTracesMetricSQL).Stop()
	err = util.ChunkIterParallel(context.TODO(), len(templateContext), traceValuesInsertBatchSize, func(ctx context.Context, startIdx int, endIdx int) error {
		if err := s.updateTraceValues(templateContext[startIdx:endIdx]); err != nil {
			return skerr.Wrapf(err, "failed inserting subSlice: [%d:%d]", startIdx, endIdx)
		}
		return nil
	})

	return err
}

// Confirm that *SQLTraceStore fulfills the tracestore.TraceStore interface.
var _ tracestore.TraceStore = (*SQLTraceStore)(nil)
