blob: 44946dbc3c643155499150966d2eac8f178761c5 [file] [log] [blame]
package sqltracestore
import (
"context"
"fmt"
"strconv"
"strings"
"sync"
"github.com/jackc/pgx/v4"
"go.skia.org/infra/go/skerr"
"go.skia.org/infra/go/sklog"
"go.skia.org/infra/go/sql/pool"
"go.skia.org/infra/go/util"
"go.skia.org/infra/perf/go/tracestore"
)
type sqlstatement int
const (
insert sqlstatement = iota
read
readMultiple
getsourcefileid
getsourcefileidMultiple
)
// statements are already constructed SQL statements.
var sqlstatements = map[sqlstatement]string{
insert: `INSERT INTO
Metadata
VALUES ($1,$2)
ON CONFLICT (source_file_id) DO NOTHING`,
read: `SELECT links FROM Metadata WHERE source_file_id=$1`,
readMultiple: `SELECT source_file_id, links FROM Metadata WHERE source_file_id IN `,
getsourcefileid: `
SELECT
source_file_id
FROM
SourceFiles
WHERE
source_file=$1`,
getsourcefileidMultiple: `
SELECT
source_file, source_file_id
FROM
SourceFiles
WHERE
source_file IN `,
}
// SQLMetadataStore implements the MetadataStore interface.
type SQLMetadataStore struct {
// db is the SQL database instance.
db pool.Pool
}
// NewSQLMetadataStore returns a new instance of the SQLMetadataStore.
func NewSQLMetadataStore(db pool.Pool) *SQLMetadataStore {
return &SQLMetadataStore{
db: db,
}
}
// InsertMetadata inserts the metadata for the source file.
func (s *SQLMetadataStore) InsertMetadata(ctx context.Context, sourceFileName string, links map[string]string) error {
var sourceFileId int
row := s.db.QueryRow(ctx, sqlstatements[getsourcefileid], sourceFileName)
if err := row.Scan(&sourceFileId); err != nil {
if err == pgx.ErrNoRows {
return skerr.Wrapf(err, "Source file %s does not exist in the database.", sourceFileName)
}
return skerr.Wrap(err)
}
if _, err := s.db.Exec(ctx, sqlstatements[insert], sourceFileId, links); err != nil {
return skerr.Wrap(err)
}
return nil
}
// GetMetadata returns the metadata for the given source file.
func (s *SQLMetadataStore) GetMetadata(ctx context.Context, sourceFileName string) (map[string]string, error) {
var sourceFileId int
sourceFileRow := s.db.QueryRow(ctx, sqlstatements[getsourcefileid], sourceFileName)
if err := sourceFileRow.Scan(&sourceFileId); err != nil {
if err == pgx.ErrNoRows {
return nil, skerr.Wrapf(err, "Source file %s does not exist in the database.", sourceFileName)
}
return nil, skerr.Wrap(err)
}
row := s.db.QueryRow(ctx, sqlstatements[read], sourceFileId)
var links map[string]string
if err := row.Scan(&links); err != nil {
if err == pgx.ErrNoRows {
return nil, nil
}
return nil, skerr.Wrap(err)
}
return links, nil
}
// GetMetadataMultiple returns the metadata for the given list of source files.
// The return value is a map where the key is the source file name and value is the map of links.
func (s *SQLMetadataStore) GetMetadataMultiple(ctx context.Context, sourceFileNames []string) (map[string]map[string]string, error) {
fileLinksAggregate := map[string]map[string]string{}
mutex := sync.Mutex{}
err := util.ChunkIterParallelPool(ctx, len(sourceFileNames), 200, 50, func(ctx context.Context, startIdx, endIdx int) error {
sourceFileChunk := sourceFileNames[startIdx:endIdx]
var sb strings.Builder
for _, sourceFile := range sourceFileChunk {
sb.WriteString("'" + sourceFile + "', ")
}
argString := sb.String()
// Trim the last 2 chars (", ")
argString = argString[:len(argString)-2]
sourceFileRows, err := s.db.Query(ctx, sqlstatements[getsourcefileidMultiple]+fmt.Sprintf("(%s)", argString))
if err != nil {
if err == pgx.ErrNoRows {
return skerr.Wrapf(err, "Source files %s do not exist in the database.", sourceFileNames)
}
return skerr.Wrap(err)
}
sourceMap := map[int]string{}
sourceFileIds := []string{}
for sourceFileRows.Next() {
var sourceFileName string
var sourceFileId int
if err := sourceFileRows.Scan(&sourceFileName, &sourceFileId); err != nil {
return skerr.Wrapf(err, "Failed to scan source file row data.")
}
sourceMap[sourceFileId] = sourceFileName
sourceFileIds = append(sourceFileIds, strconv.Itoa(sourceFileId))
}
sql := sqlstatements[readMultiple] + fmt.Sprintf("(%s)", strings.Join(sourceFileIds, ","))
rows, err := s.db.Query(ctx, sql)
if err != nil {
if err == pgx.ErrNoRows {
return nil
}
return skerr.Wrap(err)
}
// Need to add a mutex to avoid concurrent map writes on fileLinksAggregate.
addMetadata := func(fileName string, links map[string]string) {
mutex.Lock()
defer mutex.Unlock()
fileLinksAggregate[fileName] = links
}
for rows.Next() {
var source_file_id int
var links map[string]string
if err := rows.Scan(&source_file_id, &links); err != nil {
return skerr.Wrapf(err, "Failed to scan links data.")
}
fileName := sourceMap[source_file_id]
addMetadata(fileName, links)
}
return nil
})
if err != nil {
return nil, err
}
return fileLinksAggregate, nil
}
func (s *SQLMetadataStore) GetMetadataForSourceFileIDs(ctx context.Context, sourceFileIDs []int64) (map[int64]map[string]string, error) {
if len(sourceFileIDs) == 0 {
sklog.Info("sourceFileIDs list is empty, returning")
return map[int64]map[string]string{}, nil
}
fileLinksAggregate := map[int64]map[string]string{}
mutex := sync.Mutex{}
err := util.ChunkIterParallelPool(ctx, len(sourceFileIDs), 200, 50, func(ctx context.Context, startIdx, endIdx int) error {
sourceFileIDChunk := sourceFileIDs[startIdx:endIdx]
sb := strings.Builder{}
for _, sourceFileID := range sourceFileIDChunk {
sb.WriteString(strconv.FormatInt(sourceFileID, 10) + ", ")
}
sql := sqlstatements[readMultiple] + fmt.Sprintf("(%s)", sb.String()[:len(sb.String())-2])
rows, err := s.db.Query(ctx, sql)
if err != nil {
if err == pgx.ErrNoRows {
return nil
}
return skerr.Wrap(err)
}
// Need to add a mutex to avoid concurrent map writes on fileLinksAggregate.
addMetadata := func(sourceFileId int64, links map[string]string) {
mutex.Lock()
defer mutex.Unlock()
fileLinksAggregate[sourceFileId] = links
}
for rows.Next() {
var source_file_id int64
var links map[string]string
if err := rows.Scan(&source_file_id, &links); err != nil {
return skerr.Wrapf(err, "Failed to scan links data.")
}
addMetadata(source_file_id, links)
}
return nil
})
if err != nil {
return nil, err
}
return fileLinksAggregate, nil
}
var _ tracestore.MetadataStore = (*SQLMetadataStore)(nil)