| /* |
| Package sqltracestore implements a tracestore.TraceStore on top of SQL. We'll |
look at the SQL schema used to explain how SQLTraceStore maps traces into an
| SQL database. |
| |
| We store the name of every source file that has been ingested in the SourceFiles |
| table so we can use the shorter 64 bit source_file_id in other tables. |
| |
| SourceFiles ( |
| source_file_id INT PRIMARY KEY DEFAULT unique_rowid(), |
| source_file TEXT UNIQUE NOT NULL |
| ) |
| |
| Each trace name, which is a structured key (See /infra/go/query) of the |
| form,key1=value1,key2=value2,..., is stored either as the md5 hash of the trace |
| name, i.e. trace_id = md5(trace_name) or as the series of key=value pairs that |
| make up the params of the key. |
| |
When we store the values of each trace in the TraceValues table, we use the
trace_id and the commit_number as the primary key. We store not only the
value but also the id of the source file that the value came from.
| |
| CREATE TABLE IF NOT EXISTS TraceValues ( |
| trace_id BYTES, |
| -- Id of the trace name from TraceIDS. |
| commit_number INT, |
| -- A types.CommitNumber. |
| val REAL, |
| -- The floating point measurement. |
| source_file_id INT, |
| -- Id of the source filename, from SourceFiles. |
| PRIMARY KEY (trace_id, commit_number) |
| ); |
| |
| Just using this table we can construct some useful queries. For example we can |
| count the number of traces in a single tile, in this case the 0th tile in a |
| system with a tileSize of 256: |
| |
| SELECT |
| COUNT(DISTINCT trace_id) |
| FROM |
| TraceValues |
| WHERE |
| commit_number >= 0 AND commit_number < 256; |
| |
| The Postings table is our inverted index for looking up which trace ids contain |
which key=value pairs. For a good introduction to postings and search, see
https://www.tbray.org/ongoing/When/200x/2003/06/18/HowSearchWorks.
| |
| Remember that each trace name is a structured key of the |
| form,arch=x86,config=8888,..., and that over time traces may come and go, i.e. |
| we may stop running a test, or start running new tests, so if we want to make |
| searching for traces efficient we need to be aware of how those trace ids change |
| over time. The answer is to break our store in Tiles, i.e. blocks of commits of |
| tileSize length, and then for each Tile we keep an inverted index of the trace |
| ids. |
| |
| In the table below we store a key_value which is the literal "key=value" part of |
| a trace name, along with the tile_number and the md5 trace_id. Note that |
| tile_number is just int(commitNumber/tileSize). |
| |
| CREATE TABLE IF NOT EXISTS Postings ( |
| -- A types.TileNumber. |
| tile_number INT, |
| -- A key value pair from a structured key, e.g. "config=8888". |
| key_value STRING NOT NULL, |
| -- md5(trace_name) |
| trace_id BYTES, |
| PRIMARY KEY (tile_number, key_value, trace_id) |
| ); |
| |
| Finally, to make it fast to turn UI queries into SQL queries we store the |
| ParamSet representing all the trace names in the Tile. |
| |
| CREATE TABLE IF NOT EXISTS ParamSets ( |
| tile_number INT, |
| param_key STRING, |
| param_value STRING, |
| PRIMARY KEY (tile_number, param_key, param_value), |
| INDEX (tile_number DESC), |
| ); |
| |
| So for example to build a ParamSet for a tile: |
| |
| SELECT |
| param_key, param_value |
| FROM |
| ParamSets |
| WHERE |
| tile_number=0; |
| |
| To find the most recent tile: |
| |
| SELECT |
| tile_number |
| FROM |
| ParamSets |
| ORDER BY |
| tile_number DESC LIMIT 1; |
| |
| To query for traces we first find the trace_ids of all the traces that would |
| match the given query on a tile. |
| |
| SELECT |
| encode(trace_id, 'hex') |
| FROM |
| Postings |
| WHERE |
| key_value IN ('config=8888', 'config=565') |
| AND tile_number = 0 |
| INTERSECT |
| SELECT |
| encode(trace_id, 'hex') |
| FROM |
| Postings |
| WHERE |
| key_value IN ('arch=x86', 'arch=risc-v') |
| AND tile_number = 0; |
| |
| Then once you have all the trace_ids, load the values from the TraceValues |
| table. |
| |
| SELECT |
| trace_id, |
| commit_number, |
| val |
| FROM |
| TraceValues |
| WHERE |
| tracevalues.commit_number >= 0 |
| AND tracevalues.commit_number < 256 |
| AND tracevalues.trace_id IN ( |
| '\xfe385b159ff55dca481069805e5ff050', |
| '\x277262a9236d571883d47dab102070bc' |
| ); |
| |
Look in migrations/cdb.sql for more examples of raw queries using a simple
| example dataset. |
| */ |
| package sqltracestore |
| |
| import ( |
| "bytes" |
| "context" |
| "crypto/md5" |
| "fmt" |
| "sort" |
| "strings" |
| "sync" |
| "text/template" |
| "time" |
| |
| lru "github.com/hashicorp/golang-lru" |
| "github.com/jackc/pgx/v4" |
| "go.opencensus.io/trace" |
| "go.skia.org/infra/go/cache" |
| "go.skia.org/infra/go/cache/local" |
| "go.skia.org/infra/go/cache/memcached" |
| "go.skia.org/infra/go/metrics2" |
| "go.skia.org/infra/go/now" |
| "go.skia.org/infra/go/paramtools" |
| "go.skia.org/infra/go/query" |
| "go.skia.org/infra/go/skerr" |
| "go.skia.org/infra/go/sklog" |
| "go.skia.org/infra/go/sql/pool" |
| "go.skia.org/infra/go/timer" |
| "go.skia.org/infra/go/util" |
| "go.skia.org/infra/go/vec32" |
| "go.skia.org/infra/perf/go/config" |
| "go.skia.org/infra/perf/go/git/provider" |
| "go.skia.org/infra/perf/go/tracecache" |
| "go.skia.org/infra/perf/go/tracestore" |
| "go.skia.org/infra/perf/go/types" |
| "golang.org/x/sync/errgroup" |
| ) |
| |
const (
	// cacheMetricsRefreshDuration controls how often we update the metrics for
	// in-memory caches.
	cacheMetricsRefreshDuration = 15 * time.Second

	// writeTraces*ChunkSize control how many items are grouped into a single
	// batched write for values, postings, and paramsets respectively. The
	// batching itself is done by the callers (not visible in this chunk).
	writeTracesValuesChunkSize    = 100
	writeTracesPostingsChunkSize  = 100
	writeTracesParamSetsChunkSize = 100

	// The number of parallel writes when writing postings data.
	writePostingsParallelPoolSize = 5

	// The number of parallel writes when writing traces data.
	writeTracesParallelPoolSize = 5

	// queryTracesChunkSize is the number of traces we try to read trace values for
	// at a time.
	queryTracesChunkSize = 10000

	// queryTraceParamsChunkSize is the number of traces we try to convert into
	// params in a single request.
	queryTraceParamsChunkSize = 2000

	// Number of parallel requests sent to the database when servicing a single
	// query. 30 matches the max number of cores we use on a clustering instance.
	poolSize = 30

	// queryTracesIDOnlyByIndexChannelSize is the buffer size of the channel
	// used to stream trace ids out of queries.
	queryTracesIDOnlyByIndexChannelSize = 10000

	// defaultCacheSize is the size of the in-memory LRU caches.
	defaultCacheSize = 40 * 1000 * 1000

	// orderedParamSetCacheSize is the max number of entries in the
	// tileNumber -> ParamSet LRU cache.
	orderedParamSetCacheSize = 100

	// orderedParamSetCacheTTL is how long a cached ParamSet stays valid before
	// it is re-read from the database.
	orderedParamSetCacheTTL = 5 * time.Minute

	// Keep this small. Queries that have a small number of matches will return
	// quickly and those are the important queries.
	countingQueryDuration = 5 * time.Second

	// Max number of trace_ids to add to a query to speed it up. This is a rough
	// guess, some testing should be done to validate the right size for this
	// const.
	countOptimizationThreshold = 10000

	// Max no of traceIds to store in a single cache entry.
	maxTraceIdsInCache = 200
)
| |
// orderedParamSetCacheEntry is the value stored in the orderedParamSetCache;
// it pairs a tile's ParamSet with the wall-clock time at which the cached
// copy should be considered stale.
type orderedParamSetCacheEntry struct {
	expires  time.Time                   // When this entry expires.
	paramSet paramtools.ReadOnlyParamSet // The cached ParamSet for the tile.
}
| |
// traceIDForSQL is the type of the IDs that are used in the SQL queries,
// they are hex encoded md5 hashes of a trace name, e.g. "\x00112233...".
// Note the \x prefix which tells the database that this is hex encoded.
type traceIDForSQL string

// badTraceIDFromSQL is the zero value used when no valid id is available.
var badTraceIDFromSQL traceIDForSQL = ""

// traceIDForSQLInBytes is the md5 hash of a trace name.
type traceIDForSQLInBytes [md5.Size]byte

// traceIDForSQLFromTraceName calculates the traceIDForSQL for the given trace
// name, e.g. "\x00112233...". The \x prefix tells the database that the rest
// of the string is hex encoded.
func traceIDForSQLFromTraceName(traceName string) traceIDForSQL {
	sum := md5.Sum([]byte(traceName))
	return traceIDForSQLFromTraceIDAsBytes(sum[:])
}

// traceIDForSQLInBytesFromTraceName returns the raw md5 hash of the trace name.
func traceIDForSQLInBytesFromTraceName(traceName string) traceIDForSQLInBytes {
	return md5.Sum([]byte(traceName))
}

// traceIDForSQLFromTraceIDAsBytes converts raw md5 bytes into the hex-encoded
// form used in SQL queries.
func traceIDForSQLFromTraceIDAsBytes(b []byte) traceIDForSQL {
	return traceIDForSQL("\\x" + fmt.Sprintf("%x", b))
}
| |
// sourceFileIDFromSQL is the type of the IDs that are used in the SQL database
// for source files.
type sourceFileIDFromSQL int64

// badSourceFileIDFromSQL is the sentinel value used when a valid source file
// id is not available.
const badSourceFileIDFromSQL sourceFileIDFromSQL = -1

// statement is an SQL statement or fragment of an SQL statement.
type statement int

// All the different statements we need. Each statement will appear either in
// the templates map (templated SQL expanded via text/template) or in the
// statements map (plain SQL with positional placeholders).
const (
	insertIntoSourceFiles statement = iota
	insertIntoTraceValues
	insertIntoTraceValues2
	insertIntoPostings
	insertIntoParamSets
	getSourceFileID
	getLatestTile
	paramSetForTile
	getSource
	getSources
	traceCount
	queryTraceIDs
	queryTraceIDsByKeyValue
	readTraces
	getLastNSources
	getTraceIDsBySource
	countMatchingTraces
	restrictClause
	deleteCommit
	countCommitInCommitNumberRange
	getCommitsFromCommitNumberRange
)
| |
// templates holds the text/template sources for the statements whose SQL must
// be built dynamically (variable numbers of VALUES rows or IN-list entries).
// Each template is expanded with the matching *Context struct defined below.
//
// NOTE(review): values are interpolated directly into the SQL text rather
// than bound as parameters; this presumes the context values (md5 hex ids,
// numbers, key/value strings) are trusted or already sanitized — confirm at
// the call sites.
var templates = map[statement]string{
	// Batch upsert of trace values keyed by (trace_id, commit_number).
	insertIntoTraceValues: `INSERT INTO
            TraceValues (trace_id, commit_number, val, source_file_id)
        VALUES
        {{ range $index, $element := . -}}
            {{ if $index }},{{end}}
            (
                '{{ $element.MD5HexTraceID }}', {{ $element.CommitNumber }}, {{ $element.Val }}, {{ $element.SourceFileID }}
            )
        {{ end }}
        ON CONFLICT (trace_id, commit_number) DO UPDATE
        SET trace_id=EXCLUDED.trace_id, commit_number=EXCLUDED.commit_number, val=EXCLUDED.val, source_file_id=EXCLUDED.source_file_id
        `,
	// Same as insertIntoTraceValues but for the wider TraceValues2 table that
	// also stores the individual key parts (benchmark, bot, test, subtests).
	insertIntoTraceValues2: `INSERT INTO
            TraceValues2 (trace_id, commit_number, val, source_file_id, benchmark, bot, test, subtest_1, subtest_2, subtest_3)
        VALUES
        {{ range $index, $element := . -}}
            {{ if $index }},{{end}}
            (
                '{{ $element.MD5HexTraceID }}', {{ $element.CommitNumber }}, {{ $element.Val }}, {{ $element.SourceFileID }},
                '{{ $element.Benchmark }}', '{{ $element.Bot }}', '{{ $element.Test }}', '{{ $element.Subtest_1 }}',
                '{{ $element.Subtest_2 }}', '{{ $element.Subtest_3 }}'
            )
        {{ end }}
        ON CONFLICT (trace_id, commit_number) DO UPDATE
        SET trace_id=EXCLUDED.trace_id, commit_number=EXCLUDED.commit_number, val=EXCLUDED.val, source_file_id=EXCLUDED.source_file_id,
        benchmark=EXCLUDED.benchmark, bot=EXCLUDED.bot, test=EXCLUDED.test, subtest_1=EXCLUDED.subtest_1, subtest_2=EXCLUDED.subtest_2, subtest_3=EXCLUDED.subtest_3
        `,
	// All trace ids in a tile matching any of the given key=value pairs,
	// optionally restricted to a precomputed id set via RestrictClause.
	queryTraceIDs: `
        {{ $key := .Key }}
        SELECT
            trace_id
        FROM
            Postings
        WHERE
            tile_number = {{ .TileNumber }}
            AND key_value IN
            (
                {{ range $index, $value := .Values -}}
                    {{ if $index }},{{end}}
                    '{{ $key }}={{ $value }}'
                {{ end }}
            )
        {{ .RestrictClause }}
        ORDER BY trace_id`,
	// Like queryTraceIDs but never restricted to an id set.
	queryTraceIDsByKeyValue: `
        {{ $key := .Key }}
        SELECT
            trace_id
        FROM
            Postings
        WHERE
            tile_number = {{ .TileNumber }}
            AND key_value IN
            (
                {{ range $index, $value := .Values -}}
                    {{ if $index }},{{end}}
                    '{{ $key }}={{ $value }}'
                {{ end }}
            )
        ORDER BY trace_id`,
	// Values for the listed traces over an inclusive commit range.
	readTraces: `
        SELECT
            trace_id,
            commit_number,
            val
        FROM
            TraceValues
        WHERE
            commit_number >= {{ .BeginCommitNumber }}
            AND commit_number <= {{ .EndCommitNumber }}
            AND trace_id IN
            (
                {{ range $index, $trace_id := .TraceIDs -}}
                    {{ if $index }},{{end}}
                    '{{ $trace_id }}'
                {{ end }}
            )
        `,
	// Source file name for one (trace, commit) pair.
	getSource: `
        SELECT
            SourceFiles.source_file
        FROM
            TraceValues
        INNER JOIN SourceFiles ON SourceFiles.source_file_id = TraceValues.source_file_id
        WHERE
            TraceValues.trace_id = '{{ .MD5HexTraceID }}'
            AND TraceValues.commit_number = {{ .CommitNumber }}`,
	// Source file names for one trace over several commits; the caller
	// appends the "(n1,n2,...)" IN-list of commit numbers (see GetSources).
	getSources: `
        SELECT
            TraceValues.commit_number, SourceFiles.source_file
        FROM
            TraceValues
        INNER JOIN SourceFiles ON SourceFiles.source_file_id = TraceValues.source_file_id
        WHERE
            TraceValues.trace_id = '{{ .MD5HexTraceID }}'
            AND TraceValues.commit_number IN `,
	// Batch insert into the inverted index; duplicates are ignored.
	insertIntoPostings: `
        INSERT INTO
            Postings (tile_number, key_value, trace_id)
        VALUES
            {{ range $index, $element := . -}}
                {{ if $index }},{{end}}
                ( {{ $element.TileNumber }}, '{{ $element.Key }}={{ $element.Value }}', '{{ $element.MD5HexTraceID }}' )
            {{ end }}
        ON CONFLICT (tile_number, key_value, trace_id) DO NOTHING`,
	// Batch insert of per-tile ParamSet entries; duplicates are ignored.
	insertIntoParamSets: `
        INSERT INTO
            ParamSets (tile_number, param_key, param_value)
        VALUES
            {{ range $index, $element := . -}}
                {{ if $index }},{{end}}
                ( {{ $element.TileNumber }}, '{{ $element.Key }}', '{{ $element.Value }}' )
            {{ end }}
        ON CONFLICT (tile_number, param_key, param_value)
        DO NOTHING`,
	// All key/value pairs for a tile, used to build its ParamSet.
	paramSetForTile: `
        SELECT
           param_key, param_value
        FROM
            ParamSets
        WHERE
            tile_number = {{ .TileNumber }}`,
	// Count of postings rows matching the key=value pairs, capped at
	// CountOptimizationThreshold so the count itself stays cheap.
	countMatchingTraces: `
        {{ $key := .Key }}
        SELECT
            count(*)
        FROM (
            SELECT
                *
            FROM
                Postings
            WHERE
                tile_number = {{ .TileNumber }}
                AND key_value IN
                (
                    {{ range $index, $value := .Values -}}
                        {{ if $index }},{{end}}
                        '{{ $key }}={{ $value }}'
                    {{ end }}
                )
            LIMIT {{ .CountOptimizationThreshold }}
        ) AS temp`,
	// Fragment appended to queryTraceIDs to restrict results to known ids.
	// NOTE(review): "trace_ID" relies on SQL identifiers being
	// case-insensitive; confirm it matches the column name on the target DB.
	restrictClause: `
        AND trace_ID IN
        ({{ range $index, $value := .Values -}}
            {{ if $index }},{{end}}
            '{{ $value }}'
        {{ end }})`,
}
| |
// insertIntoTraceValuesContext is the context for the insertIntoTraceValues
// template; one instance describes one VALUES row.
type insertIntoTraceValuesContext struct {
	// The MD5 sum of the trace name as a hex string, i.e.
	// "\xfe385b159ff55dca481069805e5ff050". Note the leading \x which
	// the database will use to know the string is in hex.
	MD5HexTraceID traceIDForSQL

	// CommitNumber is the commit the value was measured at.
	CommitNumber types.CommitNumber
	// Val is the measured value.
	Val float32
	// SourceFileID is the id of the ingested file the value came from.
	SourceFileID sourceFileIDFromSQL
}

// insertIntoTraceValuesContext2 is the context for the insertIntoTraceValues2
// template; like insertIntoTraceValuesContext but with the individual key
// parts stored as separate columns.
type insertIntoTraceValuesContext2 struct {
	// The MD5 sum of the trace name as a hex string, i.e.
	// "\xfe385b159ff55dca481069805e5ff050". Note the leading \x which
	// the database will use to know the string is in hex.
	MD5HexTraceID traceIDForSQL

	CommitNumber types.CommitNumber
	Val          float32
	SourceFileID sourceFileIDFromSQL
	// Benchmark through Subtest_3 are the individual components of the
	// structured key, stored denormalized in TraceValues2.
	Benchmark string
	Bot       string
	Test      string
	Subtest_1 string
	Subtest_2 string
	Subtest_3 string
}

// replaceTraceNamesContext is the context for the replaceTraceNames template.
type replaceTraceNamesContext struct {
	// The trace's Params serialized as JSON.
	JSONParams string

	// The MD5 sum of the trace name as a hex string, i.e.
	// "\xfe385b159ff55dca481069805e5ff050". Note the leading \x which
	// the database will use to know the string is in hex.
	MD5HexTraceID traceIDForSQL
}
| |
// queryPlanContext is used in queryTracesContext; it holds one key and the
// values it may take in a query plan.
type queryPlanContext struct {
	Key    string
	Values []string
}

// queryTraceIDsContext is the context for the queryTraceIDs template.
type queryTraceIDsContext struct {
	TileNumber types.TileNumber
	Key        string
	Values     []string
	// AsOf is unused by the current template text; presumably reserved for
	// AS OF SYSTEM TIME queries — confirm before removing.
	AsOf           string
	RestrictClause string
}

// queryTraceIDsByKeyValueContext is the context for the
// queryTraceIDsByKeyValue template.
type queryTraceIDsByKeyValueContext struct {
	TileNumber types.TileNumber
	Key        string
	Values     []string
	AsOf       string
}

// readTracesContext is the context for the readTraces template. The commit
// range is inclusive on both ends.
type readTracesContext struct {
	BeginCommitNumber types.CommitNumber
	EndCommitNumber   types.CommitNumber
	TraceIDs          []traceIDForSQL
	AsOf              string
}

// getSourceContext is the context for the getSource template.
type getSourceContext struct {
	CommitNumber types.CommitNumber

	// The MD5 sum of the trace name as a hex string, i.e.
	// "\xfe385b159ff55dca481069805e5ff050". Note the leading \x which
	// the database will use to know the string is in hex.
	MD5HexTraceID traceIDForSQL
}

// getSourcesContext is the context for the getSources template.
type getSourcesContext struct {
	// The MD5 sum of the trace name as a hex string, i.e.
	// "\xfe385b159ff55dca481069805e5ff050". Note the leading \x which
	// query will use to know the string is in hex.
	MD5HexTraceID traceIDForSQL
}
| |
// insertIntoPostingsContext is the context for the insertIntoPostings
// template; one instance describes one row of the inverted index.
type insertIntoPostingsContext struct {
	TileNumber types.TileNumber

	// Key is a Params key.
	Key string

	// Value is the value for the Params key above.
	Value string

	// The MD5 sum of the trace name as a hex string, i.e.
	// "\xfe385b159ff55dca481069805e5ff050". Note the leading \x which
	// the database will use to know the string is in hex.
	MD5HexTraceID traceIDForSQL

	// cacheKey is the key for this entry in the local LRU cache. It is not used
	// as part of the SQL template.
	cacheKey string
}

// insertIntoParamSetsContext is the context for the insertIntoParamSets template.
type insertIntoParamSetsContext struct {
	TileNumber types.TileNumber
	Key        string
	Value      string

	// cacheKey is the key for this entry in the local LRU cache. It is not used
	// as part of the SQL template.
	cacheKey string
}

// paramSetForTileContext is the context for the paramSetForTile template.
type paramSetForTileContext struct {
	TileNumber types.TileNumber
	AsOf       string
}

// countMatchingTracesContext is the context for the countMatchingTraces
// template.
type countMatchingTracesContext struct {
	TileNumber                 types.TileNumber
	Key                        string
	Values                     []string
	AsOf                       string
	CountOptimizationThreshold int64
}

// restrictClauseContext is the context for the restrictClause template.
type restrictClauseContext struct {
	Key    string
	Values []traceIDForSQL
}
| |
| var statements = map[statement]string{ |
| insertIntoSourceFiles: ` |
| INSERT INTO |
| SourceFiles (source_file) |
| VALUES |
| ($1) |
| ON CONFLICT (source_file_id) |
| DO NOTHING`, |
| getSourceFileID: ` |
| SELECT |
| source_file_id |
| FROM |
| SourceFiles |
| WHERE |
| source_file=$1`, |
| getLatestTile: ` |
| SELECT |
| tile_number |
| FROM |
| ParamSets |
| ORDER BY |
| tile_number DESC |
| LIMIT |
| 1;`, |
| traceCount: ` |
| SELECT |
| COUNT(DISTINCT trace_id) |
| FROM |
| Postings |
| WHERE |
| tile_number = $1`, |
| getLastNSources: ` |
| SELECT |
| SourceFiles.source_file, TraceValues.commit_number |
| FROM |
| TraceValues |
| INNER JOIN |
| SourceFiles |
| ON |
| TraceValues.source_file_id = SourceFiles.source_file_id |
| WHERE |
| TraceValues.trace_id=$1 |
| ORDER BY |
| TraceValues.commit_number DESC |
| LIMIT |
| $2`, |
| getTraceIDsBySource: ` |
| SELECT |
| Postings.key_value, Postings.trace_id |
| FROM |
| SourceFiles |
| INNER JOIN |
| TraceValues |
| ON |
| TraceValues.source_file_id = SourceFiles.source_file_id |
| INNER JOIN |
| Postings |
| ON |
| TraceValues.trace_id = Postings.trace_id |
| WHERE |
| SourceFiles.source_file = $1 |
| AND |
| Postings.tile_number= $2 |
| ORDER BY |
| Postings.trace_id`, |
| countCommitInCommitNumberRange: ` |
| SELECT |
| count(*) |
| FROM |
| Commits |
| WHERE |
| commit_number >= $1 |
| AND commit_number <= $2`, |
| getCommitsFromCommitNumberRange: ` |
| SELECT |
| commit_number, git_hash, commit_time, author, subject |
| FROM |
| Commits |
| WHERE |
| commit_number >= $1 |
| AND commit_number <= $2 |
| ORDER BY |
| commit_number ASC |
| `, |
| deleteCommit: ` |
| DELETE FROM |
| Commits |
| WHERE |
| commit_number = $1 |
| `, |
| } |
| |
// timeProvider is a function that returns the current time.
// NOTE(review): unused in this chunk — presumably a seam for tests; confirm
// its consumers elsewhere in the file.
type timeProvider func() time.Time

// SQLTraceStore implements tracestore.TraceStore backed onto an SQL database.
type SQLTraceStore struct {
	// db is the SQL database instance.
	db pool.Pool

	// inMemoryTraceParams serves trace params from memory; its exact role is
	// defined elsewhere in this package.
	inMemoryTraceParams *InMemoryTraceParams

	// unpreparedStatements are parsed templates that can be used to construct SQL statements.
	unpreparedStatements map[statement]*template.Template

	// statements are already constructed SQL statements.
	statements map[statement]string

	// cache maps md5(trace_name)+tile_number -> true if the trace_name has
	// already been written to the Postings table, and
	// (tile_number, paramKey, paramValue) -> true if the param has
	// been written to the ParamSets tables. Used to skip redundant writes.
	cache cache.Cache

	// orderedParamSetCache is a cache for OrderedParamSets that have a TTL. The
	// cache maps tileNumber -> orderedParamSetCacheEntry.
	orderedParamSetCache *lru.Cache

	// tileSize is the number of commits per Tile.
	tileSize int32

	// traceParamStore stores the params for each trace id.
	traceParamStore tracestore.TraceParamStore

	// metrics
	writeTracesMetric                      metrics2.Float64SummaryMetric
	writeTracesMetricSQL                   metrics2.Float64SummaryMetric
	buildTracesContextsMetric              metrics2.Float64SummaryMetric
	cacheMissMetric                        metrics2.Counter
	orderedParamSetsCacheMissMetric        metrics2.Counter
	queryUsesRestrictClause                metrics2.Counter
	queryRestrictionMinKeyInPlan           metrics2.Float64SummaryMetric
	orderedParamSetCacheLen                metrics2.Int64Metric
	commitSliceFromCommitNumberRangeCalled metrics2.Counter
}
| |
| // New returns a new *SQLTraceStore. |
| // |
| // We presume all migrations have been run against db before this function is |
| // called. |
| func New(db pool.Pool, datastoreConfig config.DataStoreConfig, traceParamStore tracestore.TraceParamStore, |
| inMemoryTraceParams *InMemoryTraceParams) (*SQLTraceStore, error) { |
| unpreparedStatements := map[statement]*template.Template{} |
| queryTemplates := templates |
| for key, tmpl := range queryTemplates { |
| t, err := template.New("").Parse(tmpl) |
| if err != nil { |
| return nil, skerr.Wrapf(err, "parsing template %v, %q", key, tmpl) |
| } |
| unpreparedStatements[key] = t |
| } |
| |
| var cache cache.Cache |
| var err error |
| if datastoreConfig.CacheConfig != nil && len(datastoreConfig.CacheConfig.MemcachedServers) > 0 { |
| cache, err = memcached.New(datastoreConfig.CacheConfig.MemcachedServers, datastoreConfig.CacheConfig.Namespace) |
| } else { |
| cache, err = local.New(defaultCacheSize) |
| } |
| if err != nil { |
| return nil, skerr.Wrapf(err, "failed to build cache.") |
| } |
| |
| paramSetCache, err := lru.New(orderedParamSetCacheSize) |
| if err != nil { |
| return nil, skerr.Wrap(err) |
| } |
| |
| ret := &SQLTraceStore{ |
| db: db, |
| inMemoryTraceParams: inMemoryTraceParams, |
| unpreparedStatements: unpreparedStatements, |
| statements: statements, |
| tileSize: datastoreConfig.TileSize, |
| cache: cache, |
| orderedParamSetCache: paramSetCache, |
| traceParamStore: traceParamStore, |
| writeTracesMetric: metrics2.GetFloat64SummaryMetric("perfserver_sqltracestore_write_traces"), |
| writeTracesMetricSQL: metrics2.GetFloat64SummaryMetric("perfserver_sqltracestore_write_traces_sql"), |
| buildTracesContextsMetric: metrics2.GetFloat64SummaryMetric("perfserver_sqltracestore_build_traces_context"), |
| cacheMissMetric: metrics2.GetCounter("perfserver_sqltracestore_cache_miss"), |
| queryUsesRestrictClause: metrics2.GetCounter("perfserver_sqltracestore_restrict_clause_used"), |
| orderedParamSetsCacheMissMetric: metrics2.GetCounter("perfserver_sqltracestore_ordered_paramsets_cache_miss"), |
| queryRestrictionMinKeyInPlan: metrics2.GetFloat64SummaryMetric("perfserver_sqltracestore_min_key_in_plan"), |
| orderedParamSetCacheLen: metrics2.GetInt64Metric("perfserver_sqltracestore_ordered_paramset_cache_len"), |
| commitSliceFromCommitNumberRangeCalled: metrics2.GetCounter("perfserver_sqltracestore_commit_slice_from_commit_number_range_called"), |
| } |
| |
| return ret, nil |
| } |
| |
| // StartBackgroundMetricsGathering runs continuously in the background and gathers |
| // metrics related to param sets in the database. |
| func (s *SQLTraceStore) StartBackgroundMetricsGathering() { |
| for range time.Tick(cacheMetricsRefreshDuration) { |
| s.orderedParamSetCacheLen.Update(int64(s.orderedParamSetCache.Len())) |
| ctx, cancel := context.WithTimeout(context.Background(), time.Minute) |
| tileNumber, err := s.GetLatestTile(ctx) |
| if err != nil { |
| sklog.Errorf("Failed to load latest tile when calculating metrics: %s", err) |
| cancel() |
| continue |
| } |
| s.updateParamSetMetricsForTile(ctx, tileNumber) |
| s.updateParamSetMetricsForTile(ctx, tileNumber-1) |
| cancel() |
| } |
| } |
| |
| func (s *SQLTraceStore) updateParamSetMetricsForTile(ctx context.Context, tileNumber types.TileNumber) { |
| ps, err := s.GetParamSet(ctx, tileNumber) |
| if err != nil { |
| sklog.Errorf("Failed to load ParamSet when calculating metrics: %s") |
| return |
| } |
| metrics2.GetInt64Metric("perfserver_sqltracestore_paramset_size", map[string]string{"tileNumber": fmt.Sprintf("%d", tileNumber)}).Update(int64(ps.Size())) |
| } |
| |
| // CommitNumberOfTileStart implements the tracestore.TraceStore interface. |
| func (s *SQLTraceStore) CommitNumberOfTileStart(commitNumber types.CommitNumber) types.CommitNumber { |
| tileNumber := types.TileNumberFromCommitNumber(commitNumber, s.tileSize) |
| beginCommit, _ := types.TileCommitRangeForTileNumber(tileNumber, s.tileSize) |
| return beginCommit |
| } |
| |
| // GetLatestTile implements the tracestore.TraceStore interface. |
| func (s *SQLTraceStore) GetLatestTile(ctx context.Context) (types.TileNumber, error) { |
| ctx, span := trace.StartSpan(ctx, "sqltracestore.GetLatestTile") |
| defer span.End() |
| |
| tileNumber := types.BadTileNumber |
| if err := s.db.QueryRow(ctx, s.statements[getLatestTile]).Scan(&tileNumber); err != nil { |
| return types.BadTileNumber, skerr.Wrap(err) |
| } |
| return tileNumber, nil |
| } |
| |
// paramSetForTile reads the full ParamSet for the given tile from the
// ParamSets table, normalized and frozen read-only.
func (s *SQLTraceStore) paramSetForTile(ctx context.Context, tileNumber types.TileNumber) (paramtools.ReadOnlyParamSet, error) {
	// NOTE(review): span name duplicates the one used in GetParamSet;
	// presumably it should be "sqltracestore.paramSetForTile" — confirm.
	ctx, span := trace.StartSpan(ctx, "sqltracestore.GetParamSet")
	defer span.End()

	defer timer.New(fmt.Sprintf("paramSetForTile-%d", tileNumber)).Stop()

	// Template context; note the local name shadows the context package, which
	// is safe here because ctx was already captured above.
	context := paramSetForTileContext{
		TileNumber: tileNumber,
		AsOf:       "",
	}

	// Expand the template for the SQL.
	var b bytes.Buffer
	if err := s.unpreparedStatements[paramSetForTile].Execute(&b, context); err != nil {
		return nil, skerr.Wrapf(err, "failed to expand paramSetForTile template")
	}
	sql := b.String()

	rows, err := s.db.Query(ctx, sql)
	if err != nil {
		return nil, skerr.Wrapf(err, "Failed querying - tileNumber=%d", tileNumber)
	}
	// Accumulate every (key, value) row into the ParamSet.
	ps := paramtools.NewParamSet()
	for rows.Next() {
		var key string
		var value string
		if err := rows.Scan(&key, &value); err != nil {
			return nil, skerr.Wrapf(err, "Failed scanning row - tileNumber=%d", tileNumber)
		}
		// This is safe because the paramsets table enforces uniqueness already
		ps[key] = append(ps[key], value)
	}
	ps.Normalize()
	ret := ps.Freeze()
	// NOTE(review): err here is still the Query error, which was already
	// checked above, so this comparison is always false; an empty result
	// simply yields an empty ParamSet via the loop.
	if err == pgx.ErrNoRows {
		return ret, nil
	}
	if err := rows.Err(); err != nil {
		return nil, skerr.Wrapf(err, "Other failure - tileNumber=%d", tileNumber)
	}

	return ret, nil
}
| |
// ClearOrderedParamSetCache is only used for tests. It drops every cached
// tileNumber -> ParamSet entry so the next GetParamSet call hits the database.
func (s *SQLTraceStore) ClearOrderedParamSetCache() {
	s.orderedParamSetCache.Purge()
}
| |
| // GetParamSet implements the tracestore.TraceStore interface. |
| func (s *SQLTraceStore) GetParamSet(ctx context.Context, tileNumber types.TileNumber) (paramtools.ReadOnlyParamSet, error) { |
| defer timer.New("GetParamSet").Stop() |
| ctx, span := trace.StartSpan(ctx, "sqltracestore.GetParamSet") |
| defer span.End() |
| |
| now := now.Now(ctx) |
| iEntry, ok := s.orderedParamSetCache.Get(tileNumber) |
| if ok { |
| if entry, ok := iEntry.(orderedParamSetCacheEntry); ok && entry.expires.After(now) { |
| return entry.paramSet, nil |
| } |
| _ = s.orderedParamSetCache.Remove(tileNumber) |
| } |
| ps, err := s.paramSetForTile(ctx, tileNumber) |
| if err != nil { |
| return nil, skerr.Wrap(err) |
| } |
| |
| _ = s.orderedParamSetCache.Add(tileNumber, orderedParamSetCacheEntry{ |
| expires: now.Add(orderedParamSetCacheTTL), |
| paramSet: ps, |
| }) |
| |
| return ps, nil |
| } |
| |
| // GetSource implements the tracestore.TraceStore interface. |
| func (s *SQLTraceStore) GetSource(ctx context.Context, commitNumber types.CommitNumber, traceName string) (string, error) { |
| var filename string |
| traceID := traceIDForSQLFromTraceName(traceName) |
| |
| sourceContext := getSourceContext{ |
| MD5HexTraceID: traceID, |
| CommitNumber: commitNumber, |
| } |
| |
| var b bytes.Buffer |
| if err := s.unpreparedStatements[getSource].Execute(&b, sourceContext); err != nil { |
| return "", skerr.Wrapf(err, "failed to expand get source template") |
| } |
| sql := b.String() |
| |
| if err := s.db.QueryRow(ctx, sql).Scan(&filename); err != nil { |
| return "", skerr.Wrapf(err, "commitNumber=%d traceName=%q traceID=%q", commitNumber, traceName, traceID) |
| } |
| return filename, nil |
| } |
| |
| // GetSourceIds returns the source ids for the given traces and commits. |
| // The returned object is a map where the key is the name of the trace and value is a map of commit number to the source file name. |
| func (s *SQLTraceStore) GetSourceIds(ctx context.Context, commitNumbers []types.CommitNumber, traceNames []string) (map[string]map[types.CommitNumber]string, error) { |
| sourceInfo := map[string]map[types.CommitNumber]string{} |
| for _, traceName := range traceNames { |
| sourcesForTrace, err := s.GetSources(ctx, traceName, commitNumbers) |
| if err != nil { |
| return nil, err |
| } |
| sourceInfo[traceName] = sourcesForTrace |
| } |
| |
| return sourceInfo, nil |
| } |
| |
| // GetSources returns the source files for a given trace and list of commits. |
| func (s *SQLTraceStore) GetSources(ctx context.Context, traceName string, commits []types.CommitNumber) (map[types.CommitNumber]string, error) { |
| traceID := traceIDForSQLFromTraceName(traceName) |
| |
| sourceContext := getSourcesContext{ |
| MD5HexTraceID: traceID, |
| } |
| |
| var b bytes.Buffer |
| if err := s.unpreparedStatements[getSources].Execute(&b, sourceContext); err != nil { |
| return nil, skerr.Wrapf(err, "failed to expand get source template") |
| } |
| sql := b.String() |
| |
| var sb strings.Builder |
| for _, commit := range commits { |
| sb.WriteString(fmt.Sprintf("%d,", commit)) |
| } |
| commitString := sb.String() |
| sql = sql + fmt.Sprintf("(%s)", commitString[:len(commitString)-1]) |
| rows, err := s.db.Query(ctx, sql) |
| if err != nil { |
| return nil, skerr.Wrapf(err, "commitNumber=%v traceName=%q traceID=%q", commits, traceName, traceID) |
| } |
| |
| sourceData := map[types.CommitNumber]string{} |
| for rows.Next() { |
| var commitNumber types.CommitNumber |
| var sourceFile string |
| if err := rows.Scan(&commitNumber, &sourceFile); err != nil { |
| return nil, err |
| } |
| sourceData[commitNumber] = sourceFile |
| } |
| |
| return sourceData, nil |
| } |
| |
| // GetLastNSources implements the tracestore.TraceStore interface. |
| func (s *SQLTraceStore) GetLastNSources(ctx context.Context, traceID string, n int) ([]tracestore.Source, error) { |
| ctx, span := trace.StartSpan(ctx, "sqltracestore.GetLastNSources") |
| defer span.End() |
| |
| traceIDAsBytes := traceIDForSQLInBytesFromTraceName(traceID) |
| rows, err := s.db.Query(ctx, s.statements[getLastNSources], traceIDAsBytes[:], n) |
| if err != nil { |
| return nil, skerr.Wrapf(err, "Failed for traceID=%q and n=%d", traceID, n) |
| } |
| |
| ret := []tracestore.Source{} |
| for rows.Next() { |
| var filename string |
| var commitNumber types.CommitNumber |
| if err := rows.Scan(&filename, &commitNumber); err != nil { |
| return nil, skerr.Wrapf(err, "Failed scanning for traceID=%q and n=%d", traceID, n) |
| } |
| ret = append(ret, tracestore.Source{ |
| Filename: filename, |
| CommitNumber: commitNumber, |
| }) |
| } |
| if err := rows.Err(); err != nil { |
| return nil, skerr.Wrap(err) |
| } |
| return ret, nil |
| } |
| |
// GetTraceIDsBySource implements the tracestore.TraceStore interface.
//
// It returns the full trace names (structured keys) of every trace whose
// values in the given tile came from the given source file. The trace names
// are reassembled from their individual key=value Postings rows.
//
// NOTE(review): the grouping logic below assumes the getTraceIDsBySource
// statement returns rows ordered (at least grouped) by trace_id — rows for
// one trace interleaved with another would produce corrupt keys. Confirm the
// ORDER BY in the statement definition.
func (s *SQLTraceStore) GetTraceIDsBySource(ctx context.Context, sourceFilename string, tileNumber types.TileNumber) ([]string, error) {
	ctx, span := trace.StartSpan(ctx, "sqltracestore.GetTraceIDsBySource")
	defer span.End()

	rows, err := s.db.Query(ctx, s.statements[getTraceIDsBySource], sourceFilename, tileNumber)
	if err != nil {
		return nil, skerr.Wrapf(err, "Failed for sourceFilename=%q and tileNumber=%d", sourceFilename, tileNumber)
	}

	// We queried the Postings table, build up each traceid from all the
	// key=value pairs returned.
	var currentTraceIDAsBytes []byte
	p := paramtools.Params{}
	ret := []string{}
	for rows.Next() {
		var keyValue string
		var traceIDAsBytes []byte
		if err := rows.Scan(&keyValue, &traceIDAsBytes); err != nil {
			return nil, skerr.Wrapf(err, "Failed scanning for sourceFilename=%q and tileNumber=%d", sourceFilename, tileNumber)
		}
		// If we hit a new trace_id then we have a complete traceID.
		if !bytes.Equal(currentTraceIDAsBytes, traceIDAsBytes) {
			if currentTraceIDAsBytes == nil {
				// This is the first time going through this loop.
				// Allocate once; subsequent ids are copied into this buffer
				// (all trace ids are the same md5 length).
				currentTraceIDAsBytes = make([]byte, len(traceIDAsBytes))
			} else {
				// Since traceIDAsBytes changed we are done building up the
				// params for the traceID, so convert the params into a string.
				traceID, err := query.MakeKey(p)
				if err != nil {
					return nil, skerr.Wrap(err)
				}
				ret = append(ret, traceID)
			}
			// Start accumulating params for the new trace id.
			p = paramtools.Params{}
			copy(currentTraceIDAsBytes, traceIDAsBytes)
		}

		// Add to the current Params.
		parts := strings.SplitN(keyValue, "=", 2)
		if len(parts) != 2 {
			sklog.Warningf("Found invalid key=value pair in Postings: %q", keyValue)
			continue
		}
		p[parts[0]] = parts[1]

	}
	if err := rows.Err(); err != nil {
		return nil, skerr.Wrap(err)
	}

	// Make sure to get the last trace id.
	if currentTraceIDAsBytes != nil {
		traceID, err := query.MakeKey(p)
		if err != nil {
			return nil, skerr.Wrap(err)
		}

		ret = append(ret, traceID)
	}

	return ret, nil
}
| |
| // countCommitInCommitNumberRange counts the number of commits in a given commit number range. |
| func (s *SQLTraceStore) countCommitInCommitNumberRange(ctx context.Context, begin, end types.CommitNumber) (int, error) { |
| ctx, span := trace.StartSpan(ctx, "sqltracestore.countCommitInCommitNumberRange") |
| defer span.End() |
| |
| var count int |
| if err := s.db.QueryRow(ctx, s.statements[countCommitInCommitNumberRange], begin, end).Scan(&count); err != nil { |
| return 0, skerr.Wrap(err) |
| } |
| return count, nil |
| } |
| |
| // OffsetFromCommitNumber implements the tracestore.TraceStore interface. |
| func (s *SQLTraceStore) OffsetFromCommitNumber(commitNumber types.CommitNumber) int32 { |
| return int32(commitNumber) % s.tileSize |
| } |
| |
// QueryTraces implements the tracestore.TraceStore interface.
//
// It returns the traces in the given tile that match the query, along with
// the commits spanning the tile. Matching trace ids are obtained either from
// the supplied trace cache or from the database; ids found via the database
// fallback are written back into the cache by the goroutine below.
func (s *SQLTraceStore) QueryTraces(ctx context.Context, tileNumber types.TileNumber, q *query.Query, traceCache *tracecache.TraceCache) (types.TraceSet, []provider.Commit, error) {
	ctx, span := trace.StartSpan(ctx, "sqltracestore.QueryTraces")
	defer span.End()

	traceNames := make(chan string, queryTracesIDOnlyByIndexChannelSize)

	var pChan <-chan paramtools.Params
	var err error
	// cacheTraceIds records whether the ids we end up with should be written
	// back into the cache: true only when the cache exists but its lookup
	// failed and we fell back to the database.
	cacheTraceIds := false
	if traceCache != nil {
		sklog.Infof("Trace cache is enabled.")
		pChan, err = s.getTraceIdChannelFromCache(ctx, traceCache, tileNumber, q)
		if err != nil {
			// If there is an error getting data from cache, log it and fall back to the regular db search.
			sklog.Infof("Error retrieving trace id params from cache %v. Falling back to db search.", err)
			pChan, err = s.QueryTracesIDOnly(ctx, tileNumber, q)
			cacheTraceIds = true
		}
	} else {
		pChan, err = s.QueryTracesIDOnly(ctx, tileNumber, q)
	}

	if err != nil {
		return nil, nil, skerr.Wrapf(err, "Failed to get list of traceIDs matching query.")
	}

	// Start a Go routine that converts Params into a trace name and then feeds
	// those trace names into the traceNames channel.
	go func() {
		defer timer.New("QueryTracesIDOnly - Complete").Stop()
		traceIdsToCache := []paramtools.Params{}
		for p := range pChan {
			if cacheTraceIds {
				traceIdsToCache = append(traceIdsToCache, p)
			}
			traceName, err := query.MakeKey(p)
			if err != nil {
				// Skip unbuildable keys; the remaining traces are still returned.
				sklog.Warningf("Invalid trace name found in query response: %s", err)
				continue
			}
			traceNames <- traceName
		}
		// Closing traceNames signals readTracesByChannelForCommitRange that
		// all trace names have been delivered.
		close(traceNames)
		// Only cache result sets of a bounded size to keep cache entries small.
		if cacheTraceIds && len(traceIdsToCache) > 0 && len(traceIdsToCache) <= maxTraceIdsInCache {
			sklog.Infof("Adding %d traceIds to the cache for query %v", len(traceIdsToCache), q)
			err := traceCache.CacheTraceIds(ctx, tileNumber, q, traceIdsToCache)
			if err != nil {
				// Log the error and continue.
				sklog.Errorf("Error adding traceIds to the cache for query %v: %v", q, err)
			}
		}
	}()

	beginCommit, endCommit := types.TileCommitRangeForTileNumber(tileNumber, s.tileSize)
	return s.readTracesByChannelForCommitRange(ctx, traceNames, beginCommit, endCommit)
}
| |
// planCount is used in restrictByCounting to find how many traces match each
// part of the query plan.
type planCount struct {
	key    string   // The key of this part of the query plan.
	values []string // The values that key may take on.
	count  int64    // The number of traces in the tile matching key=values.
}

// planCountSlice is a slice of planCounts, that is sortable, since we want to
// find the key in the plan with the smallest number of matches. Ties on count
// are broken by key so the ordering is deterministic.
type planCountSlice []*planCount

func (p planCountSlice) Len() int { return len(p) }
func (p planCountSlice) Less(i, j int) bool {
	if p[i].count == p[j].count {
		// Break ties lexicographically by key for a deterministic sort order.
		return p[i].key < p[j].key
	}
	return p[i].count < p[j].count
}
func (p planCountSlice) Swap(i, j int) { p[i], p[j] = p[j], p[i] }
| |
// planDisposition encodes the disposition of the plan, i.e. is it still worth
// running, or can it be skipped.
type planDisposition int

const (
	// skippable means some part of the plan matched zero traces, so the full
	// query can never match anything and should not be run.
	skippable planDisposition = iota

	// runnable means the plan is still worth executing.
	runnable
)
| |
| // restrictByCounting analyzes the query plan buy running each part of the plan |
| // under a count(*) query, and then returing the key of the part of the query |
| // with the smallest number of matches. |
| // |
| // An AND clause to be appended to a WHERE clause is returned that contains all |
| // the IDs of the part of the plan with the smallest number of matches, along |
| // with the name of the key that had the smallest number of matches, and the |
| // disposition of the plan. |
| // |
| // If the count queries take too long, or all the keys return too many matches, |
| // then both the returned clause and key name will be the empty string. |
| func (s *SQLTraceStore) restrictByCounting(ctx context.Context, tileNumber types.TileNumber, plan paramtools.ParamSet) (string, string, planDisposition) { |
| ctx, span := trace.StartSpan(ctx, "sqltracestore.restrictByCounting") |
| defer span.End() |
| |
| if len(plan) < 2 { |
| return "", "", runnable |
| } |
| |
| ctx, cancel := context.WithTimeout(ctx, countingQueryDuration) |
| defer cancel() |
| |
| // mutex protects planCounts and planDisposition. |
| var mutex sync.Mutex |
| // TODO(jcgregorio) To speed this up even more we should have an LRU cache |
| // with a timeout that caches (key,values,tileNumber) -> count. The value |
| // shouldn't change much and we don't need an exact count, only an |
| // approximation. |
| planCounts := make([]*planCount, 0, len(plan)) |
| planDisposition := runnable |
| |
| // For each key in the plan run a separate Go routine that counts how many |
| // traces in the tile match that query, storing the results for each key in |
| // planCounts. |
| var wg sync.WaitGroup |
| for key, values := range plan { |
| wg.Add(1) |
| go func(key string, values []string) { |
| defer wg.Done() |
| context := countMatchingTracesContext{ |
| TileNumber: tileNumber, |
| Key: key, |
| Values: values, |
| AsOf: "", |
| CountOptimizationThreshold: countOptimizationThreshold, |
| } |
| |
| // Expand the template for the SQL. |
| var b bytes.Buffer |
| if err := s.unpreparedStatements[countMatchingTraces].Execute(&b, context); err != nil { |
| sklog.Warningf("failed to expand countMatchingTraces template: %s", err) |
| return |
| } |
| sql := b.String() |
| row := s.db.QueryRow(ctx, sql) |
| var count int64 |
| if err := row.Scan(&count); err != nil { |
| sklog.Warningf("failed to retrieve count in countMatchingTraces: %s", err) |
| return |
| } |
| if count == 0 { |
| // There are no traces that match this part of the query, so we |
| // know no traces will match the entire query, so set our |
| // disposition to 'skippable' so the full query is never run. |
| mutex.Lock() |
| defer mutex.Unlock() |
| planDisposition = skippable |
| return |
| } |
| mutex.Lock() |
| defer mutex.Unlock() |
| planCounts = append(planCounts, &planCount{ |
| key: key, |
| values: values, |
| count: count, |
| }) |
| }(key, values) |
| } |
| wg.Wait() |
| if planDisposition == skippable { |
| return "", "", skippable |
| } |
| if len(planCounts) == 0 { |
| return "", "", runnable |
| } |
| |
| sort.Sort(planCountSlice(planCounts)) |
| |
| // optimal is the key with the smallest number of matches to the plan. |
| optimal := planCounts[0] |
| |
| s.queryRestrictionMinKeyInPlan.Observe(float64(optimal.count)) |
| span.AddAttributes(trace.Float64Attribute("minKeyInPlan", float64(optimal.count))) |
| |
| // We want to avoid create too large of an "AND IN ()" clause, so if there |
| // are too many matches for the optimal key then just skip the restrict |
| // clause completely. |
| if optimal.count >= countOptimizationThreshold { |
| return "", "", runnable |
| } |
| |
| // Now that we know the key in the plan with the smallest number of matching |
| // trace_ids, we can go back to the database and query for all those |
| // matching trace_ids. |
| context := queryTraceIDsByKeyValueContext{ |
| TileNumber: tileNumber, |
| Key: optimal.key, |
| Values: optimal.values, |
| AsOf: "", |
| } |
| |
| // Expand the template for the SQL. |
| var b bytes.Buffer |
| if err := s.unpreparedStatements[queryTraceIDsByKeyValue].Execute(&b, context); err != nil { |
| sklog.Warningf("Failed to expand queryTraceIDsByKeyValue template: %s", err) |
| return "", "", runnable |
| |
| } |
| sql := b.String() |
| rows, err := s.db.Query(ctx, sql) |
| if err != nil { |
| return "", "", runnable |
| } |
| |
| ids := make([]traceIDForSQL, 0, optimal.count) |
| for rows.Next() { |
| var traceIDAsBytes []byte |
| if err := rows.Scan(&traceIDAsBytes); err != nil { |
| sklog.Errorf("Failed to scan traceIDAsBytes: %s", skerr.Wrap(err)) |
| return "", "", runnable |
| } |
| if err := rows.Err(); err != nil { |
| if err == pgx.ErrNoRows { |
| return "", "", runnable |
| } |
| sklog.Errorf("Failed while reading traceIDAsBytes: %s", skerr.Wrap(err)) |
| return "", "", runnable |
| } |
| ids = append(ids, traceIDForSQLFromTraceIDAsBytes(traceIDAsBytes)) |
| } |
| |
| // Now format the matching trace_ids for the optimal key into an "AND |
| // trace_id IN (...)" clause to speed up all the other queries in the plan. |
| b.Reset() |
| err = s.unpreparedStatements[restrictClause].Execute(&b, restrictClauseContext{ |
| Key: optimal.key, |
| Values: ids, |
| }) |
| if err != nil { |
| sklog.Errorf("Failed to expand the restrictClause template: %s", err) |
| return "", "", runnable |
| } |
| s.queryUsesRestrictClause.Inc(1) |
| return b.String(), optimal.key, runnable |
| } |
| |
| // QueryTracesIDOnly implements the tracestore.TraceStore interface. |
| func (s *SQLTraceStore) QueryTracesIDOnly(ctx context.Context, tileNumber types.TileNumber, q *query.Query) (<-chan paramtools.Params, error) { |
| ctx, span := trace.StartSpan(ctx, "sqltracestore.QueryTracesIDOnly") |
| defer span.End() |
| |
| defer timer.New("QueryTracesIDOnlyByIndex").Stop() |
| outParams := make(chan paramtools.Params, queryTracesIDOnlyByIndexChannelSize) |
| if q.Empty() { |
| close(outParams) |
| return outParams, skerr.Fmt("Can't run QueryTracesIDOnlyByIndex for the empty query.") |
| } |
| |
| if s.inMemoryTraceParams != nil { |
| s.inMemoryTraceParams.QueryTraceIDs(ctx, tileNumber, q, outParams) |
| return outParams, nil |
| } |
| |
| ps, err := s.GetParamSet(ctx, tileNumber) |
| if err != nil { |
| close(outParams) |
| return outParams, skerr.Wrap(err) |
| } |
| |
| plan, err := q.QueryPlan(ps) |
| if err != nil { |
| // Not an error, we just won't match anything in this tile. |
| // |
| // The plan may be invalid because it is querying with keys or values |
| // that don't appear in a tile, which means the query won't work on this |
| // tile, but it may still work on other tiles, so we just don't return |
| // any results for this tile. |
| close(outParams) |
| return outParams, nil |
| } |
| if len(plan) == 0 { |
| // We won't match anything in this tile. |
| sklog.Info("QueryPlan returns an empty list") |
| close(outParams) |
| return outParams, nil |
| } |
| |
| // Sanitize our inputs. |
| if err := query.ValidateParamSet(plan); err != nil { |
| return nil, skerr.Wrapf(err, "invalid query %#v", *q) |
| } |
| |
| // Do a quick pre-flight to find if we can add a "AND trace_id IN (...)" |
| // clause to all query parts to speed them up. |
| traceIDRestriction, skipKey, planDisposition := s.restrictByCounting(ctx, tileNumber, plan) |
| if planDisposition == skippable { |
| // We know this query won't match any traces in this tile. |
| sklog.Info("restrictByCounting returns an skippable planDisposition") |
| close(outParams) |
| return outParams, nil |
| } |
| |
| optimizeSQLTraceStore := config.Config != nil && config.Config.OptimizeSQLTraceStore |
| |
| // This query is done in two parts because the CDB query planner seems to |
| // pick a really bad plan a large percentage of the time. |
| |
| // First find the encoded trace ids that match the query. Break apart the |
| // QueryPlan and do each group of OR's as individual queries to the |
| // database, but then stream the results and do the ANDs here on the server. |
| // That's because CDB complains about the amount of RAM that doing the AND |
| // can require. For example, the query 'source_type=svg&sub_result=min_ms' |
| // requires merging two lists that are both over 200k. |
| unionChannels := []<-chan traceIDForSQL{} |
| i := 0 |
| for key, values := range plan { |
| // If we are using a restrict clause then all the trace_ids for that key |
| // are included all the other queries, so we can skip querying on that |
| // key directly. |
| if key == skipKey { |
| continue |
| } |
| |
| // Expand the template for the SQL. |
| var b bytes.Buffer |
| // Query trace ids through index by_key_value if the traceIDRestriction is empty, |
| // otherwise, query trace ids through the primary key, which will reduce the SQL query statement time. |
| if len(traceIDRestriction) == 0 { |
| context := queryTraceIDsByKeyValueContext{ |
| TileNumber: tileNumber, |
| Key: key, |
| Values: values, |
| AsOf: "", |
| } |
| if err := s.unpreparedStatements[queryTraceIDsByKeyValue].Execute(&b, context); err != nil { |
| return nil, skerr.Wrapf(err, "failed to expand queryTraceIDsByKeyValue template") |
| } |
| } else { |
| context := queryTraceIDsContext{ |
| TileNumber: tileNumber, |
| Key: key, |
| Values: values, |
| AsOf: "", |
| RestrictClause: traceIDRestriction, |
| } |
| if err := s.unpreparedStatements[queryTraceIDs].Execute(&b, context); err != nil { |
| return nil, skerr.Wrapf(err, "failed to expand queryTraceIDs template") |
| } |
| } |
| sql := b.String() |
| ch := make(chan traceIDForSQL) |
| if optimizeSQLTraceStore { |
| ch = make(chan traceIDForSQL, queryTracesIDOnlyByIndexChannelSize) |
| } |
| unionChannels = append(unionChannels, ch) |
| |
| go func(ch chan traceIDForSQL, sql string) { |
| _, span := trace.StartSpan(ctx, "sqltracestore.QueryTracesIDOnly.PerKeyWorker") |
| defer span.End() |
| |
| defer close(ch) |
| |
| queryCtx, querySpan := trace.StartSpan(ctx, "sqltracestore.QueryTracesIDOnly.ExecuteSQLQuery") |
| rows, err := s.db.Query(queryCtx, sql) |
| querySpan.End() |
| if err != nil { |
| sklog.Infof("Error querying traceIds: %v", err) |
| return |
| } |
| for rows.Next() { |
| var traceIDAsBytes []byte |
| if err := rows.Scan(&traceIDAsBytes); err != nil { |
| sklog.Errorf("Failed to scan traceIDAsBytes: %s", skerr.Wrap(err)) |
| return |
| } |
| if err := rows.Err(); err != nil { |
| if err == pgx.ErrNoRows { |
| return |
| } |
| sklog.Errorf("Failed while reading traceIDAsBytes: %s", skerr.Wrap(err)) |
| return |
| } |
| ch <- traceIDForSQLFromTraceIDAsBytes(traceIDAsBytes) |
| } |
| }(ch, sql) |
| i++ |
| } |
| |
| // Now AND together the results of all the unionChannels. |
| traceIDsCh := newIntersect(ctx, unionChannels) |
| |
| // traceIDsCh supplies the relevant trace ids matching the query. |
| // Now let's collect the unique traceIds from the channel and then get the params |
| // for those traces. |
| traceIdsMap := map[traceIDForSQL]bool{} |
| uniqueTraceIds := []string{} |
| for hexEncodedTraceID := range traceIDsCh { |
| if _, ok := traceIdsMap[hexEncodedTraceID]; !ok { |
| uniqueTraceIds = append(uniqueTraceIds, string(hexEncodedTraceID)) |
| } |
| traceIdsMap[hexEncodedTraceID] = true |
| } |
| // Populate the outParams channel with the params for the traceIds. |
| err = s.populateParamsForTraces(ctx, uniqueTraceIds, outParams) |
| if err != nil { |
| sklog.Errorf("Error converting traceIds to params: %v", err) |
| return outParams, err |
| } |
| |
| return outParams, nil |
| } |
| |
// populateParamsForTraces reads the params for the hex encoded traceIds and posts them on
// the outParams channel.
//
// The read happens asynchronously: this function always returns a nil error
// immediately, while a goroutine streams the params onto outParams and closes
// the channel once every chunk has been processed. Errors encountered while
// reading are logged, not returned, so a consumer of outParams may simply see
// fewer params than there were trace ids.
func (s *SQLTraceStore) populateParamsForTraces(ctx context.Context, traceIds []string, outParams chan paramtools.Params) error {
	// The goroutine below handles reading of the trace params in chunks.
	// We run this in a separate thread so that the upstream code that reads the outParams channel
	// can start reading this information without having to wait for all the traceIds to be completely read.
	go func() {
		ctx, span := trace.StartSpan(ctx, "sqltracestore.ReadTraceParams")
		defer span.End()

		// Close the channel when this goroutine completes signalling end of data.
		defer close(outParams)

		span.AddAttributes(trace.Int64Attribute("trace_count", int64(len(traceIds))))
		// Note: chunks run on a parallel pool, so multiple chunks may send on
		// outParams concurrently; channel sends are safe under concurrency.
		err := util.ChunkIterParallelPool(ctx, len(traceIds), queryTraceParamsChunkSize, poolSize, func(ctx context.Context, startIdx, endIdx int) error {
			traceIdChunk := traceIds[startIdx:endIdx]
			params, err := s.traceParamStore.ReadParams(ctx, traceIdChunk)
			if err != nil {
				sklog.Errorf("Error reading params:%v", err)
				return err
			}

			// Report the params for the current chunk.
			for _, param := range params {
				outParams <- param
			}

			return nil
		})
		if err != nil {
			sklog.Errorf("Error retrieving trace ids: %v", err)
		}
	}()
	return nil
}
| |
| // ReadTraces implements the tracestore.TraceStore interface. |
| func (s *SQLTraceStore) ReadTraces(ctx context.Context, tileNumber types.TileNumber, traceNames []string) (types.TraceSet, []provider.Commit, error) { |
| ctx, span := trace.StartSpan(ctx, "sqltracestore.ReadTraces") |
| defer span.End() |
| |
| defer timer.New("ReadTraces").Stop() |
| |
| beginCommit, endCommit := types.TileCommitRangeForTileNumber(tileNumber, s.tileSize) |
| return s.ReadTracesForCommitRange(ctx, traceNames, beginCommit, endCommit) |
| } |
| |
| // ReadTracesForCommitRange implements the tracestore.TraceStore interface. |
| func (s *SQLTraceStore) ReadTracesForCommitRange(ctx context.Context, traceNames []string, beginCommit types.CommitNumber, endCommit types.CommitNumber) (types.TraceSet, []provider.Commit, error) { |
| ctx, span := trace.StartSpan(ctx, "sqltracestore.ReadTracesForCommitRange") |
| defer span.End() |
| |
| defer timer.New("ReadTraces").Stop() |
| |
| traceNamesChannel := make(chan string, len(traceNames)) |
| |
| for _, traceName := range traceNames { |
| traceNamesChannel <- traceName |
| } |
| close(traceNamesChannel) |
| |
| return s.readTracesByChannelForCommitRange(ctx, traceNamesChannel, beginCommit, endCommit) |
| } |
| |
| // readTracesByChannelForCommitRange reads the traceNames from a channel so we |
| // don't have to wait for the full list of trace ids to be ready first. |
| // |
| // It works by reading in a number of traceNames into a chunk and then passing |
| // that chunk of trace names to a worker pool that reads all the trace values |
| // for the given trace names. |
| func (s *SQLTraceStore) readTracesByChannelForCommitRange(ctx context.Context, traceNames <-chan string, beginCommit types.CommitNumber, endCommit types.CommitNumber) (types.TraceSet, []provider.Commit, error) { |
| ctx, span := trace.StartSpan(ctx, "sqltracestore.readTracesByChannelForCommitRange") |
| defer span.End() |
| |
| // The return value, protected by mutex. |
| ret := types.TraceSet{} |
| |
| // Validate the begin and end commit numbers. |
| if beginCommit > endCommit { |
| // Empty the traceNames channel. |
| for range traceNames { |
| } |
| return nil, nil, skerr.Fmt("Invalid commit range, [%d, %d] should be [%d, %d]", beginCommit, endCommit, endCommit, beginCommit) |
| } |
| |
| commits, err := s.commitSliceFromCommitNumberRange(ctx, beginCommit, endCommit) |
| if err != nil { |
| return nil, nil, skerr.Fmt("Cannot count commit within the commit range, [%d, %d]", beginCommit, endCommit) |
| } |
| |
| // Map from the [md5.Size]byte representation of a trace id to the trace name. |
| // |
| // Protected by mutex. |
| traceNameMap := map[traceIDForSQLInBytes]string{} |
| |
| // Protects traceNameMap and ret. |
| var mutex sync.Mutex |
| |
| // chunkChannel is used to distribute work to the workers. |
| chunkChannel := make(chan []traceIDForSQL, queryTracesIDOnlyByIndexChannelSize) |
| |
| // Start the workers that do the actual querying when given chunks of trace ids. |
| g, ctx := errgroup.WithContext(ctx) |
| for i := 0; i < poolSize; i++ { |
| g.Go(func() error { |
| ctx, span := trace.StartSpan(ctx, "sqltracestore.ReadTraces.Worker") |
| defer span.End() |
| |
| for chunk := range chunkChannel { |
| if err := s.readTracesChunk(ctx, beginCommit, endCommit, commits, chunk, &mutex, traceNameMap, &ret); err != nil { |
| return skerr.Wrap(err) |
| } |
| } |
| return nil |
| }) |
| } |
| |
| // Now break up the incoming trace ids into chuck for the workers. |
| currentChunk := []traceIDForSQL{} |
| for key := range traceNames { |
| if !query.IsValid(key) { |
| sklog.Errorf("Invalid key: %q", key) |
| continue |
| } |
| |
| mutex.Lock() |
| // Make space in ret for the values. |
| ret[key] = vec32.New(len(commits)) |
| |
| // Update the map from the full name of the trace and id in traceIDForSQLInBytes form. |
| traceNameMap[traceIDForSQLInBytesFromTraceName(key)] = key |
| mutex.Unlock() |
| |
| trID := traceIDForSQLFromTraceName(key) |
| currentChunk = append(currentChunk, trID) |
| if len(currentChunk) >= queryTracesChunkSize { |
| chunkChannel <- currentChunk |
| currentChunk = []traceIDForSQL{} |
| } |
| } |
| // Now handle any remaining values in the currentChunk. |
| if len(currentChunk) >= 0 { |
| chunkChannel <- currentChunk |
| } |
| close(chunkChannel) |
| |
| if err := g.Wait(); err != nil { |
| span.SetStatus(trace.Status{ |
| Code: trace.StatusCodeInternal, |
| Message: err.Error(), |
| }) |
| // Empty the traceNames channel. |
| for range traceNames { |
| } |
| return nil, nil, skerr.Wrap(err) |
| } |
| |
| return ret, commits, nil |
| } |
| |
| // readTracesChunk updates the passed in TraceSet with all the values loaded for |
| // the given slice of trace ids. |
| // |
| // The mutex protects 'ret' and 'traceNameMap'. |
| func (s *SQLTraceStore) readTracesChunk(ctx context.Context, beginCommit types.CommitNumber, endCommit types.CommitNumber, commits []provider.Commit, chunk []traceIDForSQL, mutex *sync.Mutex, traceNameMap map[traceIDForSQLInBytes]string, ret *types.TraceSet) error { |
| if len(chunk) == 0 { |
| return nil |
| } |
| ctx, span := trace.StartSpan(ctx, "sqltracestore.ReadTraces.Chunk") |
| span.AddAttributes(trace.Int64Attribute("chunk_length", int64(len(chunk)))) |
| defer span.End() |
| // Populate the context for the SQL template. |
| readTracesContext := readTracesContext{ |
| BeginCommitNumber: beginCommit, |
| EndCommitNumber: endCommit, |
| TraceIDs: chunk, |
| AsOf: "", |
| } |
| |
| // Expand the template for the SQL. |
| var b bytes.Buffer |
| if err := s.unpreparedStatements[readTraces].Execute(&b, readTracesContext); err != nil { |
| return skerr.Wrapf(err, "failed to expand readTraces template") |
| } |
| |
| sql := b.String() |
| // Execute the query. |
| queryCtx, querySpan := trace.StartSpan(ctx, "sqltracestore.ReadTraces.Chunk.ExecuteSQLQuery") |
| rows, err := s.db.Query(queryCtx, sql) |
| querySpan.End() |
| if err != nil { |
| return skerr.Wrapf(err, "SQL: %q", sql) |
| } |
| |
| // Create a local map to store results from this chunk. This avoids |
| // holding the main lock while iterating over every row. |
| localTraces := types.TraceSet{} |
| var traceIDArray traceIDForSQLInBytes |
| commitToIndexMap := map[types.CommitNumber]int{} |
| for i, commit := range commits { |
| commitToIndexMap[commit.CommitNumber] = i |
| } |
| |
| for rows.Next() { |
| var traceIDInBytes []byte |
| var commitNumber types.CommitNumber |
| var val float64 |
| if err := rows.Scan(&traceIDInBytes, &commitNumber, &val); err != nil { |
| return skerr.Wrap(err) |
| } |
| |
| // pgx can't Scan into an array, but Go can't use a slice as a map key, so |
| // we Scan into a byte slice and then copy into a byte array to use |
| // as the index into the map. |
| copy(traceIDArray[:], traceIDInBytes) |
| |
| // Note: We read traceNameMap without a lock. This is safe because the map is |
| // fully populated before the goroutines are dispatched and is not written to after. |
| traceName := traceNameMap[traceIDArray] |
| |
| if localTraces[traceName] == nil { |
| localTraces[traceName] = vec32.New(len(commits)) |
| } |
| localTraces[traceName][commitToIndexMap[commitNumber]] = float32(val) |
| } |
| if err := rows.Err(); err != nil { |
| return skerr.Wrap(err) |
| } |
| |
| mutex.Lock() |
| defer mutex.Unlock() |
| |
| // Merge the locally collected results into the final shared result map. |
| for traceName, localValues := range localTraces { |
| // The slice in the final 'ret' map was already created before this goroutine started. |
| // We just need to carefully copy the values we found into it. |
| for i, v := range localValues { |
| if v != vec32.MissingDataSentinel { |
| (*ret)[traceName][i] = v |
| } |
| } |
| } |
| |
| return nil |
| } |
| |
| // TileNumber implements the tracestore.TraceStore interface. |
| func (s *SQLTraceStore) TileNumber(commitNumber types.CommitNumber) types.TileNumber { |
| return types.TileNumberFromCommitNumber(commitNumber, s.tileSize) |
| } |
| |
// TileSize implements the tracestore.TraceStore interface.
//
// It returns the number of commits that make up a single tile.
func (s *SQLTraceStore) TileSize() int32 {
	return s.tileSize
}
| |
| // TraceCount implements the tracestore.TraceStore interface. |
| func (s *SQLTraceStore) TraceCount(ctx context.Context, tileNumber types.TileNumber) (int64, error) { |
| ctx, span := trace.StartSpan(ctx, "sqltracestore.TraceCount") |
| defer span.End() |
| |
| var ret int64 |
| err := s.db.QueryRow(ctx, s.statements[traceCount], tileNumber).Scan(&ret) |
| span.AddAttributes(trace.Int64Attribute("count", ret)) |
| return ret, skerr.Wrap(err) |
| } |
| |
| // updateSourceFile writes the filename into the SourceFiles table and returns |
| // the sourceFileIDFromSQL of that filename. |
| func (s *SQLTraceStore) updateSourceFile(ctx context.Context, filename string) (sourceFileIDFromSQL, error) { |
| ctx, span := trace.StartSpan(ctx, "sqltracestore.updateSourceFile") |
| defer span.End() |
| |
| ret := badSourceFileIDFromSQL |
| // We want to ensure that there is only one entry per source file. |
| // We do that by checking for existence of a row first and do the insert |
| // only if there are no rows. |
| err := s.db.QueryRow(ctx, s.statements[getSourceFileID], filename).Scan(&ret) |
| if err == pgx.ErrNoRows { |
| _, err = s.db.Exec(ctx, s.statements[insertIntoSourceFiles], filename) |
| |
| if err != nil { |
| return ret, skerr.Wrap(err) |
| } |
| |
| // We can potentially get rid of this read by returning the id in the |
| // insert statement above. |
| err = s.db.QueryRow(ctx, s.statements[getSourceFileID], filename).Scan(&ret) |
| if err != nil { |
| return ret, skerr.Wrap(err) |
| } |
| } else if err != nil { |
| return ret, skerr.Wrap(err) |
| } |
| |
| return ret, nil |
| } |
| |
| func cacheKeyForPostings(tileNumber types.TileNumber, traceID traceIDForSQL) string { |
| return fmt.Sprintf("%d-%s", tileNumber, traceID) |
| } |
| |
| func cacheKeyForParamSets(tileNumber types.TileNumber, paramKey, paramValue string) string { |
| return fmt.Sprintf("%d-%q-%q", tileNumber, paramKey, paramValue) |
| } |
| |
| // WriteTraces implements the tracestore.TraceStore interface. |
// WriteTraces implements the tracestore.TraceStore interface. It writes the
// given values for the given trace params at commitNumber: ParamSet entries
// for the tile, postings for any traces not yet seen in the tile, the trace
// values themselves, and the trace params via s.traceParamStore. params[i]
// corresponds to values[i]; ps is the ParamSet covering all of params. The
// trailing time.Time parameter is unused.
func (s *SQLTraceStore) WriteTraces(ctx context.Context, commitNumber types.CommitNumber, params []paramtools.Params, values []float32, ps paramtools.ParamSet, source string, _ time.Time) error {
	ctx, span := trace.StartSpan(ctx, "sqltracestore.WriteTraces")
	defer span.End()

	defer timer.NewWithSummary("perfserver_sqltracestore_write_traces", s.writeTracesMetric).Stop()

	// Large ingestions can take a long time; bound the total work with a
	// generous timeout.
	ctx, cancel := context.WithTimeout(ctx, 60*time.Minute)
	defer cancel()

	tileNumber := s.TileNumber(commitNumber)

	// Write ParamSet.
	//
	// Collect only the (key, value) pairs for this tile that the cache does
	// not already record as written, so repeat ingestions skip the SQL.
	paramSetsContext := []insertIntoParamSetsContext{}
	for paramKey, paramValues := range ps {
		for _, paramValue := range paramValues {
			cacheKey := cacheKeyForParamSets(tileNumber, paramKey, paramValue)
			if !s.cache.Exists(cacheKey) {
				s.cacheMissMetric.Inc(1)
				paramSetsContext = append(paramSetsContext, insertIntoParamSetsContext{
					TileNumber: tileNumber,
					Key:        paramKey,
					Value:      paramValue,
					cacheKey:   cacheKey,
				})
			}
		}
	}

	if len(paramSetsContext) > 0 {
		// Chunk the inserts so no single generated SQL statement grows
		// unbounded.
		err := util.ChunkIter(len(paramSetsContext), writeTracesParamSetsChunkSize, func(startIdx int, endIdx int) error {
			ctx, span := trace.StartSpan(ctx, "sqltracestore.WriteTraces.writeParamSetsChunk")
			defer span.End()

			chunk := paramSetsContext[startIdx:endIdx]
			// Expand the SQL template for this chunk.
			var b bytes.Buffer
			if err := s.unpreparedStatements[insertIntoParamSets].Execute(&b, chunk); err != nil {
				return skerr.Wrapf(err, "failed to expand paramsets template in slice [%d, %d]", startIdx, endIdx)
			}

			sql := b.String()

			sklog.Infof("About to write %d paramset entries with sql of length %d", endIdx-startIdx, len(sql))
			if _, err := s.db.Exec(ctx, sql); err != nil {
				return skerr.Wrapf(err, "Executing: %q", b.String())
			}
			// Only mark entries as written after the insert succeeds.
			for _, ele := range chunk {
				s.cache.Add(ele.cacheKey)
			}
			return nil
		})
		if err != nil {
			return err
		}
	}

	// Write the source file entry and the id.
	sourceID, err := s.updateSourceFile(ctx, source)
	if err != nil {
		return skerr.Wrap(err)
	}

	// Build the 'context's which will be used to populate the SQL templates for
	// the TraceValues and Postings tables.
	t := timer.NewWithSummary("perfserver_sqltracestore_build_traces_contexts", s.buildTracesContextsMetric)
	valuesTemplateContext := make([]insertIntoTraceValuesContext, 0, len(params))
	postingsTemplateContext := []insertIntoPostingsContext{} // We have no idea how long this will be.

	traceParams := map[string]paramtools.Params{}
	for i, p := range params {
		traceName, err := query.MakeKey(p)
		if err != nil {
			// Skip params that do not form a valid structured key.
			sklog.Errorf("Somehow still invalid: %v", p)
			continue
		}
		traceID := traceIDForSQLFromTraceName(traceName)
		traceParams[string(traceID)] = p
		valuesTemplateContext = append(valuesTemplateContext, insertIntoTraceValuesContext{
			MD5HexTraceID: traceID,
			CommitNumber:  commitNumber,
			Val:           values[i],
			SourceFileID:  sourceID,
		})

		// Only emit postings rows for traces the cache has not yet seen for
		// this tile.
		cacheKey := cacheKeyForPostings(tileNumber, traceID)
		if !s.cache.Exists(cacheKey) {
			s.cacheMissMetric.Inc(1)
			for paramKey, paramValue := range p {
				postingsTemplateContext = append(postingsTemplateContext, insertIntoPostingsContext{
					TileNumber:    tileNumber,
					Key:           paramKey,
					Value:         paramValue,
					MD5HexTraceID: traceID,
					cacheKey:      cacheKey,
				})
			}
		}
	}
	t.Stop()

	// Now that the contexts are built, execute the SQL in batches.
	defer timer.NewWithSummary("perfserver_sqltracestore_write_traces_sql_insert", s.writeTracesMetricSQL).Stop()
	sklog.Infof("About to format %d postings names", len(params))

	if len(postingsTemplateContext) > 0 {
		var err error
		// Write the postings chunks in parallel.
		err = util.ChunkIterParallelPool(ctx, len(postingsTemplateContext), writeTracesPostingsChunkSize, writePostingsParallelPoolSize, func(ctx context.Context, startIdx int, endIdx int) error {
			ctx, span := trace.StartSpan(ctx, "sqltracestore.WriteTraces.writePostingsChunkParallel")
			defer span.End()

			var b bytes.Buffer
			if err := s.unpreparedStatements[insertIntoPostings].Execute(&b, postingsTemplateContext[startIdx:endIdx]); err != nil {
				return skerr.Wrapf(err, "failed to expand postings template on slice [%d, %d]", startIdx, endIdx)
			}
			sql := b.String()

			if _, err := s.db.Exec(ctx, sql); err != nil {
				return skerr.Wrapf(err, "Executing: %q", b.String())
			}
			return nil
		})

		if err != nil {
			return err
		}

		// Update the cache only after every chunk was written successfully.
		for _, entry := range postingsTemplateContext {
			s.cache.Add(entry.cacheKey)
		}
	}

	sklog.Infof("Writing %d trace params entries", len(traceParams))
	traceParamsError := s.traceParamStore.WriteTraceParams(ctx, traceParams)
	if traceParamsError != nil {
		// Log and ignore this error while we release and test this feature.
		// TODO(ashwinpv): Return the error once we have fully tested.
		sklog.Infof("Error writing trace params: %v", traceParamsError)
	}
	sklog.Infof("About to format %d trace values", len(valuesTemplateContext))

	// Write the trace value chunks in parallel.
	err = util.ChunkIterParallelPool(ctx, len(valuesTemplateContext), writeTracesValuesChunkSize, writeTracesParallelPoolSize, func(ctx context.Context, startIdx int, endIdx int) error {
		ctx, span := trace.StartSpan(ctx, "sqltracestore.WriteTraces.writeTraceValuesChunkParallel")
		defer span.End()

		var b bytes.Buffer
		if err := s.unpreparedStatements[insertIntoTraceValues].Execute(&b, valuesTemplateContext[startIdx:endIdx]); err != nil {
			return skerr.Wrapf(err, "failed to expand trace values template")
		}

		sql := b.String()
		if _, err := s.db.Exec(ctx, sql); err != nil {
			return skerr.Wrapf(err, "Executing: %q", sql)
		}
		return nil
	})

	if err != nil {
		return err
	}

	sklog.Info("Finished writing trace values.")

	return nil
}
| |
| func (s *SQLTraceStore) WriteTraces2(ctx context.Context, commitNumber types.CommitNumber, params []paramtools.Params, values []float32, ps paramtools.ParamSet, source string, _ time.Time) error { |
| ctx, span := trace.StartSpan(ctx, "sqltracestore.WriteTraces2") |
| defer span.End() |
| |
| defer timer.NewWithSummary("perfserver_sqltracestore_write_traces2", s.writeTracesMetric).Stop() |
| |
| ctx, cancel := context.WithTimeout(ctx, 60*time.Minute) |
| defer cancel() |
| |
| // Write the source file entry and the id. |
| sourceID, err := s.updateSourceFile(ctx, source) |
| if err != nil { |
| return skerr.Wrap(err) |
| } |
| |
| // Build the 'context's which will be used to populate the SQL templates for |
| // the TraceValues table. |
| t := timer.NewWithSummary("perfserver_sqltracestore_build_traces_contexts2", s.buildTracesContextsMetric) |
| valuesTemplateContext2 := make([]insertIntoTraceValuesContext2, 0, len(params)) |
| |
| for i, p := range params { |
| traceName, err := query.MakeKey(p) |
| if err != nil { |
| sklog.Errorf("Somehow still invalid: %v", p) |
| continue |
| } |
| traceID := traceIDForSQLFromTraceName(traceName) |
| insertContext := insertIntoTraceValuesContext2{ |
| MD5HexTraceID: traceID, |
| CommitNumber: commitNumber, |
| Val: values[i], |
| SourceFileID: sourceID, |
| } |
| if v, ok := p["benchmark"]; ok { |
| insertContext.Benchmark = v |
| } |
| if v, ok := p["bot"]; ok { |
| insertContext.Bot = v |
| } |
| if v, ok := p["test"]; ok { |
| insertContext.Test = v |
| } |
| if v, ok := p["subtest_1"]; ok { |
| insertContext.Subtest_1 = v |
| } |
| if v, ok := p["subtest_2"]; ok { |
| insertContext.Subtest_2 = v |
| } |
| if v, ok := p["subtest_3"]; ok { |
| insertContext.Subtest_3 = v |
| } |
| valuesTemplateContext2 = append(valuesTemplateContext2, insertContext) |
| } |
| t.Stop() |
| |
| // Now that the contexts are built, execute the SQL in batches. |
| defer timer.NewWithSummary("perfserver_sqltracestore_write_traces2_sql_insert", s.writeTracesMetricSQL).Stop() |
| |
| sklog.Infof("About to format %d trace values 2", len(valuesTemplateContext2)) |
| |
| err = util.ChunkIterParallelPool(ctx, len(valuesTemplateContext2), writeTracesValuesChunkSize, writeTracesParallelPoolSize, func(ctx context.Context, startIdx int, endIdx int) error { |
| ctx, span := trace.StartSpan(ctx, "sqltracestore.WriteTraces2.writeTraceValuesChunkParallel") |
| defer span.End() |
| |
| var b bytes.Buffer |
| if err := s.unpreparedStatements[insertIntoTraceValues2].Execute(&b, valuesTemplateContext2[startIdx:endIdx]); err != nil { |
| return skerr.Wrapf(err, "failed to expand trace values2 template") |
| } |
| |
| sql := b.String() |
| if _, err := s.db.Exec(ctx, sql); err != nil { |
| return skerr.Wrapf(err, "Executing: %q", sql) |
| } |
| return nil |
| }) |
| |
| if err != nil { |
| return err |
| } |
| |
| sklog.Info("Finished writing trace values 2.") |
| |
| return nil |
| } |
| |
| // commitSliceFromCommitNumberRange returns a slice of Commits that fall in the range |
| // [begin, end], i.e inclusive of both begin and end. |
| func (s *SQLTraceStore) commitSliceFromCommitNumberRange(ctx context.Context, begin, end types.CommitNumber) ([]provider.Commit, error) { |
| ctx, span := trace.StartSpan(ctx, "sqltracestore.commitSliceFromCommitNumberRange") |
| defer span.End() |
| |
| s.commitSliceFromCommitNumberRangeCalled.Inc(1) |
| rows, err := s.db.Query(ctx, s.statements[getCommitsFromCommitNumberRange], begin, end) |
| if err != nil { |
| return nil, skerr.Wrapf(err, "Failed to query for commit slice in range %v-%v", begin, end) |
| } |
| defer rows.Close() |
| ret := []provider.Commit{} |
| for rows.Next() { |
| var c provider.Commit |
| if err := rows.Scan(&c.CommitNumber, &c.GitHash, &c.Timestamp, &c.Author, &c.Subject); err != nil { |
| return nil, skerr.Wrapf(err, "Failed to read row in range %v-%v", begin, end) |
| } |
| ret = append(ret, c) |
| } |
| return ret, nil |
| } |
| |
| // deleteCommit delete a commit from Commits table. |
| // this method is for testing only. |
| func (s *SQLTraceStore) deleteCommit(ctx context.Context, commitNumber types.CommitNumber) error { |
| commandTag, err := s.db.Exec(ctx, s.statements[deleteCommit], commitNumber) |
| if err != nil { |
| return skerr.Wrapf(err, "Failed to delete the commit %v", commitNumber) |
| } |
| if commandTag.RowsAffected() != 1 { |
| return skerr.Fmt("Failed to delete the commit %v", commitNumber) |
| } |
| return nil |
| } |
| |
| // getTraceIdChannelFromCache returns a params channel containing the trace params retrieved from the cache. |
| func (s *SQLTraceStore) getTraceIdChannelFromCache(ctx context.Context, traceCache *tracecache.TraceCache, tileNumber types.TileNumber, query *query.Query) (<-chan paramtools.Params, error) { |
| traceIdsFromCache, err := traceCache.GetTraceIds(ctx, tileNumber, query) |
| if err != nil { |
| return nil, err |
| } |
| |
| traceIdsChannel := make(chan paramtools.Params, queryTracesIDOnlyByIndexChannelSize) |
| if traceIdsFromCache != nil { |
| sklog.Infof("Retrieved %d trace ids from cache for tile %d and query %v", len(traceIdsFromCache), tileNumber, query) |
| go func() { |
| for _, traceId := range traceIdsFromCache { |
| traceIdsChannel <- traceId |
| } |
| close(traceIdsChannel) |
| }() |
| return traceIdsChannel, nil |
| } else { |
| // Make sure the channel is closed in the case where there is a cache miss. |
| close(traceIdsChannel) |
| return nil, skerr.Fmt("No traceIds found in the cache.") |
| } |
| } |
| |
// Confirm that *SQLTraceStore fulfills the tracestore.TraceStore interface.
// This compile-time assertion fails the build if the interface ever changes
// without a matching update here.
var _ tracestore.TraceStore = (*SQLTraceStore)(nil)