blob: a35e10f2f59107441505828d787390094e23fea3 [file] [log] [blame]
package sqltjstore
import (
"context"
"encoding/hex"
"strings"
"time"
"github.com/jackc/pgtype"
"github.com/jackc/pgx/v4"
"github.com/jackc/pgx/v4/pgxpool"
"go.opencensus.io/trace"
"go.skia.org/infra/go/skerr"
ci "go.skia.org/infra/golden/go/continuous_integration"
"go.skia.org/infra/golden/go/sql"
"go.skia.org/infra/golden/go/sql/schema"
"go.skia.org/infra/golden/go/tjstore"
"go.skia.org/infra/golden/go/types"
)
type StoreImpl struct {
db *pgxpool.Pool
}
// New returns a SQL-backed tjstore.Store.
func New(db *pgxpool.Pool) *StoreImpl {
return &StoreImpl{
db: db,
}
}
// GetTryJobs implements the tjstore.Store interface.
func (s *StoreImpl) GetTryJobs(ctx context.Context, cID tjstore.CombinedPSID) ([]ci.TryJob, error) {
clID := sql.Qualify(cID.CRS, cID.CL)
psID := sql.Qualify(cID.CRS, cID.PS)
rows, err := s.db.Query(ctx, `
SELECT tryjob_id, system, display_name, last_ingested_data FROM Tryjobs
WHERE changelist_id = $1 AND patchset_id = $2
ORDER by display_name`, clID, psID)
if err != nil {
return nil, skerr.Wrapf(err, "fetching tryjobs for %#v", cID)
}
defer rows.Close()
var rv []ci.TryJob
for rows.Next() {
var row schema.TryjobRow
err := rows.Scan(&row.TryjobID, &row.System, &row.DisplayName, &row.LastIngestedData)
if err != nil {
return nil, skerr.Wrapf(err, "when fetching tryjobs for %#v", cID)
}
rv = append(rv, ci.TryJob{
SystemID: sql.Unqualify(row.TryjobID),
System: row.System,
DisplayName: row.DisplayName,
Updated: row.LastIngestedData.UTC(),
})
}
return rv, nil
}
// GetTryJob implements the tjstore.Store interface.
func (s *StoreImpl) GetTryJob(ctx context.Context, id, cisName string) (ci.TryJob, error) {
qID := sql.Qualify(cisName, id)
row := s.db.QueryRow(ctx, `
SELECT display_name, last_ingested_data FROM Tryjobs WHERE tryjob_id = $1`, qID)
var r schema.TryjobRow
err := row.Scan(&r.DisplayName, &r.LastIngestedData)
if err != nil {
if err == pgx.ErrNoRows {
return ci.TryJob{}, tjstore.ErrNotFound
}
return ci.TryJob{}, skerr.Wrapf(err, "querying for id %s", qID)
}
return ci.TryJob{
SystemID: id,
System: cisName,
DisplayName: r.DisplayName,
Updated: r.LastIngestedData.UTC(),
}, nil
}
// A possible optimization for RAM usage / network would be to request the option ids only
// and then run a followup request to fetch those and re-use the maps. This is the simplest
// possible query that might work.
const resultNoTime = `SELECT Traces.keys, digest, Options.keys, SecondaryBranchValues.tryjob_id FROM
SecondaryBranchValues JOIN Traces
ON SecondaryBranchValues.secondary_branch_trace_id = Traces.trace_id
JOIN Options
ON SecondaryBranchValues.options_id = Options.options_id
WHERE branch_name = $1 AND version_name = $2`
const resultWithTime = `
SELECT Traces.keys, digest, Options.keys, SecondaryBranchValues.tryjob_id FROM
SecondaryBranchValues JOIN Traces
ON SecondaryBranchValues.secondary_branch_trace_id = Traces.trace_id
JOIN Options
ON SecondaryBranchValues.options_id = Options.options_id
JOIN Tryjobs
ON SecondaryBranchValues.tryjob_id = Tryjobs.tryjob_id
WHERE branch_name = $1 AND version_name = $2 and last_ingested_data > $3`
// GetResults implements the tjstore.Store interface. Of note, it always returns a nil GroupParams
// because the way the data is stored, there is no way to know which params were ingested together.
func (s *StoreImpl) GetResults(ctx context.Context, cID tjstore.CombinedPSID, updatedAfter time.Time) ([]tjstore.TryJobResult, error) {
ctx, span := trace.StartSpan(ctx, "sqltjstore_GetResults")
defer span.End()
clID := sql.Qualify(cID.CRS, cID.CL)
psID := sql.Qualify(cID.CRS, cID.PS)
statement := resultNoTime
arguments := []interface{}{clID, psID}
if !updatedAfter.IsZero() {
statement = resultWithTime
arguments = append(arguments, updatedAfter)
}
rows, err := s.db.Query(ctx, statement, arguments...)
if err != nil {
return nil, skerr.Wrapf(err, "getting values for tryjobs on %#v", cID)
}
defer rows.Close()
var rv []tjstore.TryJobResult
for rows.Next() {
var digestBytes schema.DigestBytes
var result tjstore.TryJobResult
var qualifiedTryjobID pgtype.Text
err := rows.Scan(&result.ResultParams, &digestBytes, &result.Options, &qualifiedTryjobID)
if err != nil {
return nil, skerr.Wrapf(err, "scanning values for tryjobs %#v", cID)
}
result.Digest = types.Digest(hex.EncodeToString(digestBytes))
if qualifiedTryjobID.Status == pgtype.Present {
parts := strings.SplitN(qualifiedTryjobID.String, "_", 2)
result.System = parts[0]
result.TryjobID = parts[1]
}
rv = append(rv, result)
}
return rv, nil
}
// Make sure StoreImpl fulfills the tjstore.Store interface.
var _ tjstore.Store = (*StoreImpl)(nil)