// Package databuilder provides a tool for generating test data in a way that is easy for
// a human to update and understand.
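//
// A minimal usage sketch (the commit IDs, digests, emails, file names, and key names below are
// placeholder values, not real data):
//
//	b := databuilder.TablesBuilder{TileWidth: 100}
//	b.CommitsWithData().
//		Insert("0000000100", "author@example.com", "first commit", "2021-01-01T01:01:01Z").
//		Insert("0000000101", "author@example.com", "second commit", "2021-01-02T01:01:01Z")
//	b.SetDigests(map[rune]types.Digest{
//		'A': "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
//		'B': "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb",
//	})
//	b.SetGroupingKeys("source_type", "name")
//	b.AddTracesWithCommonKeys(paramtools.Params{"os": "Android"}).
//		History("AB", "A-").
//		Keys([]paramtools.Params{
//			{"source_type": "corpus_one", "name": "test_one"},
//			{"source_type": "corpus_one", "name": "test_two"},
//		}).
//		OptionsAll(paramtools.Params{"ext": "png"}).
//		IngestedFrom([]string{"gold_1.json", "gold_2.json"}, []string{"2021-01-01T01:01:05Z", "2021-01-02T01:01:05Z"})
//	tables := b.Build() // schema.Tables ready to be inserted into a test database.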
package databuilder
import (
"bytes"
"crypto/md5"
"crypto/sha1"
"encoding/hex"
"fmt"
"image"
"image/png"
"io"
"io/ioutil"
"path/filepath"
"sort"
"time"
"github.com/google/uuid"
"go.skia.org/infra/go/paramtools"
"go.skia.org/infra/go/skerr"
"go.skia.org/infra/go/util"
"go.skia.org/infra/golden/go/diff"
"go.skia.org/infra/golden/go/sql"
"go.skia.org/infra/golden/go/sql/schema"
"go.skia.org/infra/golden/go/types"
)
// TablesBuilder has methods on it for generating trace data and other related data in a way
// that can be easily turned into SQL table rows.
type TablesBuilder struct {
// TileWidth is the number of commits with data that should be grouped together in a tile.
// In production, tiles will default to 100 dense commits wide.
TileWidth int
changelistBuilders []*ChangelistBuilder
commitsWithData *CommitBuilder
commitsWithNoData *GitCommitBuilder
diffMetrics []schema.DiffMetricRow
expectationBuilders []*ExpectationsBuilder
groupingKeys []string
ignoreRules []schema.IgnoreRuleRow
runeToDigest map[rune]schema.DigestBytes
traceBuilders []*TraceBuilder
}
// CommitsWithData returns a new CommitBuilder to which the trace data will be connected.
// It panics if called more than once.
func (b *TablesBuilder) CommitsWithData() *CommitBuilder {
if b.commitsWithData != nil {
logAndPanic("Cannot call CommitsWithData() more than once.")
}
b.commitsWithData = &CommitBuilder{}
return b.commitsWithData
}
// CommitsWithNoData returns a new GitCommitBuilder that will fill in the GitCommits table.
// It panics if called more than once.
func (b *TablesBuilder) CommitsWithNoData() *GitCommitBuilder {
if b.commitsWithNoData != nil {
logAndPanic("Cannot call CommitsWithNoData() more than once.")
}
b.commitsWithNoData = &GitCommitBuilder{}
return b.commitsWithNoData
}
// SetDigests loads a mapping of runes to the digests that they represent. This allows a trace
// history to be specified with a string of characters. It panics if a rune or digest is invalid,
// or if it is called more than once.
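// A small sketch of a mapping (the digest values are placeholders):
//
//	b.SetDigests(map[rune]types.Digest{
//		'A': "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
//		'B': "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb",
//	})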
func (b *TablesBuilder) SetDigests(runeToDigest map[rune]types.Digest) {
if b.runeToDigest != nil {
logAndPanic("Cannot call SetDigests() more than once.")
}
m := make(map[rune]schema.DigestBytes, len(runeToDigest))
for symbol, digest := range runeToDigest {
if symbol == '-' {
logAndPanic("Cannot map something to -")
}
d, err := sql.DigestToBytes(digest)
if err != nil {
logAndPanic("Invalid digest %q: %s", digest, err)
}
m[symbol] = d
}
b.runeToDigest = m
}
// SetGroupingKeys specifies which keys from a Trace's params will be used to define the grouping.
// It panics if called more than once.
func (b *TablesBuilder) SetGroupingKeys(fields ...string) {
if b.groupingKeys != nil {
logAndPanic("Cannot call SetGroupingKeys() more than once.")
}
b.groupingKeys = fields
}
// AddTracesWithCommonKeys returns a new TraceBuilder for building a set of related traces. This can
// be called more than once - all data will be combined at the end. It panics if any of its
// prerequisites have not been called.
func (b *TablesBuilder) AddTracesWithCommonKeys(params paramtools.Params) *TraceBuilder {
if b.commitsWithData == nil {
logAndPanic("Must add commits before traces")
}
if len(b.commitsWithData.commitRows) == 0 {
logAndPanic("Must specify at least one commit")
}
if len(b.groupingKeys) == 0 {
logAndPanic("Must add grouping keys before traces")
}
if len(b.runeToDigest) == 0 {
logAndPanic("Must add digests before traces")
}
tb := &TraceBuilder{
commits: b.commitsWithData.commitRows,
commonKeys: params,
symbolsToDigest: b.runeToDigest,
groupingKeys: b.groupingKeys,
}
b.traceBuilders = append(b.traceBuilders, tb)
return tb
}
// AddTriageEvent returns a builder for a series of triage events on the primary branch. These
// events will be attributed to the given user and timestamp.
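// A sketch of a typical call chain (the user, timestamp, key names, and digest are placeholders):
//
//	b.AddTriageEvent("user@example.com", "2021-01-02T03:04:05Z").
//		ExpectationsForGrouping(paramtools.Params{"source_type": "corpus_one", "name": "test_one"}).
//		Positive("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa")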
func (b *TablesBuilder) AddTriageEvent(user, triageTime string) *ExpectationsBuilder {
if len(b.groupingKeys) == 0 {
logAndPanic("Must add grouping keys before expectations")
}
ts, err := time.Parse(time.RFC3339, triageTime)
if err != nil {
logAndPanic("Invalid triage time %q: %s", triageTime, err)
}
eb := &ExpectationsBuilder{
groupingKeys: b.groupingKeys,
record: &schema.ExpectationRecordRow{
ExpectationRecordID: uuid.New(),
BranchName: nil,
UserName: user,
TriageTime: ts,
},
}
b.expectationBuilders = append(b.expectationBuilders, eb)
return eb
}
// finalizeExpectations finds the final state of the expectations for each digest+grouping pair.
// This includes identifying untriaged digests. It panics if any of the ExpectationDeltas have
// incorrect preconditions.
func (b *TablesBuilder) finalizeExpectations() []*schema.ExpectationRow {
var expectations []*schema.ExpectationRow
find := func(g schema.GroupingID, d schema.DigestBytes) *schema.ExpectationRow {
for _, e := range expectations {
if bytes.Equal(e.GroupingID, g) && bytes.Equal(e.Digest, d) {
return e
}
}
return nil
}
for _, eb := range b.expectationBuilders {
eb.record.NumChanges = len(eb.deltas)
for _, ed := range eb.deltas {
existingRecord := find(ed.GroupingID, ed.Digest)
if existingRecord == nil {
expectations = append(expectations, &schema.ExpectationRow{
GroupingID: ed.GroupingID,
Digest: ed.Digest,
Label: ed.LabelAfter,
ExpectationRecordID: &ed.ExpectationRecordID,
})
continue
}
if existingRecord.Label != ed.LabelBefore {
logAndPanic("Expectation Delta precondition is incorrect. Label before was %s: %#v", existingRecord.Label, ed)
}
existingRecord.Label = ed.LabelAfter
existingRecord.ExpectationRecordID = &ed.ExpectationRecordID
}
}
// Fill in untriaged values.
for _, tb := range b.traceBuilders {
for _, trace := range tb.traceValues {
for _, tv := range trace {
if tv == nil {
continue
}
existingRecord := find(tv.GroupingID, tv.Digest)
if existingRecord == nil {
expectations = append(expectations, &schema.ExpectationRow{
GroupingID: tv.GroupingID,
Digest: tv.Digest,
Label: schema.LabelUntriaged,
})
}
}
}
}
return expectations
}
// ComputeDiffMetricsFromImages generates all the diff metrics from the observed images/digests
// and from the trace history. It reads the images from disk in order to compute the diffs.
// It is expected that the images are in the provided directory, each named [digest].png.
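// A sketch of a call, assuming the directory "testdata" holds a PNG for every digest used in the
// trace data (the directory name and timestamp are placeholders):
//
//	b.ComputeDiffMetricsFromImages("testdata", "2021-01-05T00:00:00Z")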
func (b *TablesBuilder) ComputeDiffMetricsFromImages(imgDir string, nowStr string) {
if len(b.diffMetrics) != 0 {
logAndPanic("Must call ComputeDiffMetricsFromImages only once")
}
if len(b.runeToDigest) == 0 {
logAndPanic("Must call ComputeDiffMetricsFromImages after SetDigests")
}
if len(b.traceBuilders) == 0 {
logAndPanic("Must call ComputeDiffMetricsFromImages after inserting trace data")
}
now, err := time.Parse(time.RFC3339, nowStr)
if err != nil {
logAndPanic("Invalid time for now: %s", err)
}
_, err = ioutil.ReadDir(imgDir)
if err != nil {
logAndPanic("Error reading directory %q: %s", imgDir, err)
}
images := make(map[types.Digest]*image.NRGBA, len(b.runeToDigest))
for _, db := range b.runeToDigest {
d := types.Digest(hex.EncodeToString(db))
images[d] = openNRGBAFromDisk(imgDir, d)
}
// Maps groupingID to digests that appear in that grouping across all the trace data.
toCompute := map[schema.MD5Hash]types.DigestSet{}
for _, tb := range b.traceBuilders {
for _, trace := range tb.traceValues {
for _, tv := range trace {
if tv != nil {
groupingID := sql.AsMD5Hash(tv.GroupingID)
if _, ok := toCompute[groupingID]; !ok {
toCompute[groupingID] = types.DigestSet{}
}
d := types.Digest(hex.EncodeToString(tv.Digest))
toCompute[groupingID][d] = true
}
}
}
}
// Add all the data from the CLs to the respective groupings.
for _, clb := range b.changelistBuilders {
for _, psb := range clb.patchsets {
for _, ps := range psb.dataPoints {
for _, dp := range ps {
if dp != nil {
groupingID := sql.AsMD5Hash(dp.GroupingID)
if _, ok := toCompute[groupingID]; !ok {
toCompute[groupingID] = types.DigestSet{}
}
d := types.Digest(hex.EncodeToString(dp.Digest))
toCompute[groupingID][d] = true
}
}
}
}
}
duplicates := map[string]bool{}
// For each grouping, compare each digest to every other digest and create the metric rows
// for that.
for _, xd := range toCompute {
digests := xd.Keys()
sort.Sort(digests)
for leftIdx, leftDigest := range digests {
leftImg := images[leftDigest]
for rightIdx := leftIdx + 1; rightIdx < len(digests); rightIdx++ {
rightDigest := digests[rightIdx]
rightImg := images[rightDigest]
key1 := string(leftDigest) + string(rightDigest)
key2 := string(rightDigest) + string(leftDigest)
if duplicates[key1] || duplicates[key2] {
continue
}
duplicates[key1] = true
duplicates[key2] = true
dm := diff.ComputeDiffMetrics(leftImg, rightImg)
if dm.NumDiffPixels == 0 {
logAndPanic("%s and %s aren't different", leftDigest, rightDigest)
}
ld, err := sql.DigestToBytes(leftDigest)
if err != nil {
logAndPanic(err.Error())
}
rd, err := sql.DigestToBytes(rightDigest)
if err != nil {
logAndPanic(err.Error())
}
b.diffMetrics = append(b.diffMetrics, schema.DiffMetricRow{
LeftDigest: ld,
RightDigest: rd,
NumPixelsDiff: dm.NumDiffPixels,
PercentPixelsDiff: dm.PixelDiffPercent,
MaxRGBADiffs: dm.MaxRGBADiffs,
MaxChannelDiff: max(dm.MaxRGBADiffs),
CombinedMetric: dm.CombinedMetric,
DimensionsDiffer: dm.DimDiffer,
Timestamp: now,
})
// And in the other order of left-right
b.diffMetrics = append(b.diffMetrics, schema.DiffMetricRow{
LeftDigest: rd,
RightDigest: ld,
NumPixelsDiff: dm.NumDiffPixels,
PercentPixelsDiff: dm.PixelDiffPercent,
MaxRGBADiffs: dm.MaxRGBADiffs,
MaxChannelDiff: max(dm.MaxRGBADiffs),
CombinedMetric: dm.CombinedMetric,
DimensionsDiffer: dm.DimDiffer,
Timestamp: now,
})
}
}
}
}
func max(d [4]int) int {
m := -1
for _, k := range d {
if k > m {
m = k
}
}
return m
}
func openNRGBAFromDisk(basePath string, digest types.Digest) *image.NRGBA {
var img *image.NRGBA
path := filepath.Join(basePath, string(digest)+".png")
err := util.WithReadFile(path, func(r io.Reader) error {
im, err := png.Decode(r)
if err != nil {
return skerr.Wrapf(err, "decoding %s", path)
}
img = diff.GetNRGBA(im)
return nil
})
if err != nil {
logAndPanic(err.Error())
}
return img
}
// AddIgnoreRule adds an ignore rule with the given information. It will be applied to traces when
// the table rows are generated in Build().
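// A sketch of a call (the emails, expiration timestamp, and query values are placeholders):
//
//	b.AddIgnoreRule("creator@example.com", "updater@example.com", "2030-12-30T15:00:00Z",
//		"skip flaky device", paramtools.ParamSet{"device": []string{"sailfish"}})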
func (b *TablesBuilder) AddIgnoreRule(created, updated, updateTS, note string, query paramtools.ParamSet) uuid.UUID {
ts, err := time.Parse(time.RFC3339, updateTS)
if err != nil {
logAndPanic("invalid time %q: %s", updateTS, err)
}
if len(query) == 0 {
logAndPanic("Cannot use empty rule")
}
id := uuid.New()
b.ignoreRules = append(b.ignoreRules, schema.IgnoreRuleRow{
IgnoreRuleID: id,
CreatorEmail: created,
UpdatedEmail: updated,
Expires: ts,
Note: note,
Query: query.FrozenCopy(), // make a copy to ensure immutability
})
return id
}
func qualify(system, id string) string {
return system + "_" + id
}
// AddChangelist returns a builder for data belonging to a changelist.
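// A sketch of building changelist data (the IDs, emails, digest, and key names are placeholders,
// and schema.StatusOpen is assumed here as the name of the open-CL status constant):
//
//	cl := b.AddChangelist("123456", "gerrit", "owner@example.com", "Fix a thing", schema.StatusOpen)
//	cl.AddPatchset("7890", "ffff111122223333444455556666777788889999", 1).
//		DataWithCommonKeys(paramtools.Params{"os": "Android"}).
//		Digests("cccccccccccccccccccccccccccccccc").
//		Keys([]paramtools.Params{{"source_type": "corpus_one", "name": "test_one"}}).
//		OptionsAll(paramtools.Params{"ext": "png"}).
//		FromTryjob("tryjob1", "buildbucket", "Test-Android-Build", "tryjob1.json", "2021-01-03T03:04:05Z")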
func (b *TablesBuilder) AddChangelist(id, crs, owner, subject string, status schema.ChangelistStatus) *ChangelistBuilder {
if len(b.groupingKeys) == 0 {
logAndPanic("Must add grouping keys before traces")
}
cb := &ChangelistBuilder{
changelist: schema.ChangelistRow{
ChangelistID: qualify(crs, id),
System: crs,
Status: status,
OwnerEmail: owner,
Subject: subject,
},
groupingKeys: b.groupingKeys,
}
b.changelistBuilders = append(b.changelistBuilders, cb)
return cb
}
// Build should be called when all the data has been loaded in for a given setup. It will generate
// the SQL rows as represented in a schema.Tables. If any validation steps fail, it will panic.
func (b *TablesBuilder) Build() schema.Tables {
if b.TileWidth == 0 {
b.TileWidth = 100 // default
}
var tables schema.Tables
tables.CommitsWithData = b.commitsWithData.commitRows
applyTilingToCommits(tables.CommitsWithData, b.TileWidth)
tables.GitCommits = assembleGitCommits(b.commitsWithData, b.commitsWithNoData)
valuesAtHead := map[schema.MD5Hash]*schema.ValueAtHeadRow{}
for _, traceBuilder := range b.traceBuilders {
// Add unique rows from the tables gathered by tracebuilders.
for _, opt := range traceBuilder.options {
tables.Options = addOptionIfUnique(tables.Options, opt)
}
for _, g := range traceBuilder.groupings {
tables.Groupings = addGroupingIfUnique(tables.Groupings, g)
}
for _, sf := range traceBuilder.sourceFiles {
tables.SourceFiles = addSourceFileIfUnique(tables.SourceFiles, sf)
}
for _, t := range traceBuilder.traces {
var matchesAnyIgnoreRule schema.NullableBool
// prevent accidental duplicates on primary branch.
tables.Traces, matchesAnyIgnoreRule = b.addTrace(tables.Traces, t, false)
valuesAtHead[sql.AsMD5Hash(t.TraceID)] = &schema.ValueAtHeadRow{
TraceID: t.TraceID,
GroupingID: t.GroupingID,
Corpus: t.Corpus,
Keys: t.Keys,
MatchesAnyIgnoreRule: matchesAnyIgnoreRule,
}
}
for _, xtv := range traceBuilder.traceValues {
for _, tv := range xtv {
if tv != nil {
if tv.TraceID == nil || tv.GroupingID == nil {
panic("Incomplete data - you must call Keys()")
}
if tv.OptionsID == nil {
panic("Incomplete data - you must call Options*()")
}
if tv.SourceFileID == nil {
panic("Incomplete data - you must call IngestedFrom()")
}
tables.TraceValues = append(tables.TraceValues, *tv)
vHead := valuesAtHead[sql.AsMD5Hash(tv.TraceID)]
vHead.Digest = tv.Digest
vHead.MostRecentCommitID = tv.CommitID
vHead.OptionsID = tv.OptionsID
}
}
}
}
tables.TiledTraceDigests = b.computeTiledTraceDigests(tables.CommitsWithData)
tables.PrimaryBranchParams = b.computePrimaryBranchParams(tables.CommitsWithData)
exp := b.finalizeExpectations()
for _, e := range exp {
tables.Expectations = append(tables.Expectations, *e)
}
for _, eb := range b.expectationBuilders {
tables.ExpectationRecords = append(tables.ExpectationRecords, *eb.record)
tables.ExpectationDeltas = append(tables.ExpectationDeltas, eb.deltas...)
}
for _, cl := range b.changelistBuilders {
for _, ps := range cl.patchsets {
for _, opt := range ps.options {
tables.Options = addOptionIfUnique(tables.Options, opt)
}
for _, g := range ps.groupings {
tables.Groupings = addGroupingIfUnique(tables.Groupings, g)
}
for _, sf := range ps.sourceFiles {
tables.SourceFiles = addSourceFileIfUnique(tables.SourceFiles, sf)
}
for _, t := range ps.traces {
// duplicates allowed for different changelists.
tables.Traces, _ = b.addTrace(tables.Traces, t, true)
}
for _, tj := range ps.tryjobs {
tables.Tryjobs = append(tables.Tryjobs, tj)
if cl.changelist.LastIngestedData.Before(tj.LastIngestedData) {
cl.changelist.LastIngestedData = tj.LastIngestedData
}
}
for _, xdp := range ps.dataPoints {
for _, dp := range xdp {
tables.SecondaryBranchValues = append(tables.SecondaryBranchValues, *dp)
}
}
tables.Patchsets = append(tables.Patchsets, ps.patchset)
}
tables.Changelists = append(tables.Changelists, cl.changelist)
var clExpectations []schema.SecondaryBranchExpectationRow
find := func(g schema.GroupingID, d schema.DigestBytes) *schema.SecondaryBranchExpectationRow {
for i, e := range clExpectations {
if bytes.Equal(e.GroupingID, g) && bytes.Equal(e.Digest, d) {
return &clExpectations[i]
}
}
return nil
}
for _, clExpBuilder := range cl.expectationBuilders {
record := *clExpBuilder.record
record.NumChanges = len(clExpBuilder.deltas)
tables.ExpectationRecords = append(tables.ExpectationRecords, record)
tables.ExpectationDeltas = append(tables.ExpectationDeltas, clExpBuilder.deltas...)
for _, delta := range clExpBuilder.deltas {
existingRecord := find(delta.GroupingID, delta.Digest)
if existingRecord == nil {
clExpectations = append(clExpectations, schema.SecondaryBranchExpectationRow{
BranchName: *record.BranchName,
GroupingID: delta.GroupingID,
Digest: delta.Digest,
Label: delta.LabelAfter,
ExpectationRecordID: record.ExpectationRecordID,
})
continue
}
if existingRecord.Label != delta.LabelBefore {
logAndPanic("Expectation Delta precondition is incorrect. Label before was %s: %#v", existingRecord.Label, delta)
}
existingRecord.Label = delta.LabelAfter
existingRecord.ExpectationRecordID = delta.ExpectationRecordID
}
}
tables.SecondaryBranchExpectations = append(tables.SecondaryBranchExpectations, clExpectations...)
}
tables.SecondaryBranchParams = b.computeSecondaryBranchParams()
tables.DiffMetrics = b.diffMetrics
for _, atHead := range valuesAtHead {
tables.ValuesAtHead = append(tables.ValuesAtHead, *atHead)
}
tables.IgnoreRules = b.ignoreRules
return tables
}
func assembleGitCommits(withData *CommitBuilder, withNoData *GitCommitBuilder) []schema.GitCommitRow {
gitCommits := withData.gitRows
if withNoData != nil {
gitCommits = append(gitCommits, withNoData.gitRows...)
}
sort.Slice(gitCommits, func(i, j int) bool {
return gitCommits[i].CommitTime.Before(gitCommits[j].CommitTime)
})
return gitCommits
}
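// applyTilingToCommits assigns a TileID to each commit with data, in CommitID order, with
// tileWidth commits per tile. For example, with tileWidth 2, five commits land in tiles
// 0, 0, 1, 1, 2.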
func applyTilingToCommits(commits []schema.CommitWithDataRow, tileWidth int) {
// We sort the commits by CommitID in lexicographical order. By definition of CommitID, this is
// the order they happen in.
sort.Slice(commits, func(i, j int) bool {
return commits[i].CommitID < commits[j].CommitID
})
// We start with tile 0 and keep filling up that tile until we roll over to the next one.
currTile := 0
itemsInTile := 0
for i := range commits {
commits[i].TileID = schema.TileID(currTile)
itemsInTile++
if itemsInTile >= tileWidth {
currTile++
itemsInTile = 0
}
}
}
func addOptionIfUnique(existing []schema.OptionsRow, opt schema.OptionsRow) []schema.OptionsRow {
for _, existingOpt := range existing {
if bytes.Equal(opt.OptionsID, existingOpt.OptionsID) {
return existing
}
}
return append(existing, opt)
}
func addGroupingIfUnique(existing []schema.GroupingRow, g schema.GroupingRow) []schema.GroupingRow {
for _, existingG := range existing {
if bytes.Equal(g.GroupingID, existingG.GroupingID) {
return existing
}
}
return append(existing, g)
}
func addSourceFileIfUnique(existing []schema.SourceFileRow, sf schema.SourceFileRow) []schema.SourceFileRow {
for _, existingSF := range existing {
if bytes.Equal(sf.SourceFileID, existingSF.SourceFileID) {
return existing
}
}
return append(existing, sf)
}
func (b *TablesBuilder) addTrace(existing []schema.TraceRow, t schema.TraceRow, allowDuplicates bool) ([]schema.TraceRow, schema.NullableBool) {
for _, existingT := range existing {
if bytes.Equal(t.TraceID, existingT.TraceID) {
if allowDuplicates {
// When adding traces from patchsets, it is expected that there will be duplicate
// traces because patchsets can build on existing traces or make new ones.
return existing, existingT.MatchesAnyIgnoreRule
}
// Having a duplicate trace means that there are duplicate TraceValues entries
// and that is not intended.
logAndPanic("Duplicate trace found: %v", t.Keys)
}
}
ignored := false
for _, ir := range b.ignoreRules {
if ir.Query.MatchesParams(t.Keys) {
ignored = true
break
}
}
matches := schema.NBNull
if len(b.ignoreRules) > 0 {
if ignored {
matches = schema.NBTrue
} else {
matches = schema.NBFalse
}
}
t.MatchesAnyIgnoreRule = matches
return append(existing, t), matches
}
type tiledTraceDigest struct {
tileID schema.TileID
traceID schema.MD5Hash
digest schema.MD5Hash
groupingID schema.MD5Hash
}
func (b *TablesBuilder) computeTiledTraceDigests(commits []schema.CommitWithDataRow) []schema.TiledTraceDigestRow {
seenRows := map[tiledTraceDigest]bool{}
for _, builder := range b.traceBuilders {
for _, xtv := range builder.traceValues {
for _, tv := range xtv {
if tv == nil {
continue
}
tiledID := getTileID(tv.CommitID, commits)
seenRows[tiledTraceDigest{
tileID: tiledID,
traceID: sql.AsMD5Hash(tv.TraceID),
digest: sql.AsMD5Hash(tv.Digest),
groupingID: sql.AsMD5Hash(tv.GroupingID),
}] = true
}
}
}
var rv []schema.TiledTraceDigestRow
for row := range seenRows {
tID := make(schema.TraceID, len(schema.MD5Hash{}))
db := make(schema.DigestBytes, len(schema.MD5Hash{}))
gID := make(schema.GroupingID, len(schema.MD5Hash{}))
copy(tID, row.traceID[:])
copy(db, row.digest[:])
copy(gID, row.groupingID[:])
rv = append(rv, schema.TiledTraceDigestRow{
TileID: row.tileID,
TraceID: tID,
Digest: db,
GroupingID: gID,
})
}
return rv
}
func getTileID(id schema.CommitID, commits []schema.CommitWithDataRow) schema.TileID {
for _, c := range commits {
if c.CommitID == id {
return c.TileID
}
}
panic("Could not find tile for commit " + id)
}
// computePrimaryBranchParams goes through all trace data and returns the PrimaryBranchParamRow
// with the appropriately tiled key/value pairs that showed up in the trace keys and params.
func (b *TablesBuilder) computePrimaryBranchParams(commits []schema.CommitWithDataRow) []schema.PrimaryBranchParamRow {
seenRows := map[schema.PrimaryBranchParamRow]bool{}
for _, builder := range b.traceBuilders {
findTraceKeys := func(traceID schema.TraceID) paramtools.Params {
for _, tr := range builder.traces {
if bytes.Equal(tr.TraceID, traceID) {
return tr.Keys.Copy() // Copy to ensure immutability
}
}
logAndPanic("missing trace id %x", traceID)
return nil
}
findOptions := func(optID schema.OptionsID) paramtools.Params {
for _, opt := range builder.options {
if bytes.Equal(opt.OptionsID, optID) {
return opt.Keys.Copy() // Copy to ensure immutability
}
}
logAndPanic("missing options id %x", optID)
return nil
}
for _, xtv := range builder.traceValues {
for _, tv := range xtv {
if tv == nil {
continue
}
tiledID := getTileID(tv.CommitID, commits)
keys := findTraceKeys(tv.TraceID)
for k, v := range keys {
seenRows[schema.PrimaryBranchParamRow{
TileID: tiledID,
Key: k,
Value: v,
}] = true
}
options := findOptions(tv.OptionsID)
for k, v := range options {
seenRows[schema.PrimaryBranchParamRow{
TileID: tiledID,
Key: k,
Value: v,
}] = true
}
}
}
}
var rv []schema.PrimaryBranchParamRow
for row := range seenRows {
rv = append(rv, row)
}
return rv
}
// computeSecondaryBranchParams goes through every PS of every built CL and creates a row for each
// key/value pair from the traces and the options and returns it (without duplicates). This assumes
// that traces are only made on a patchset that has some data associated with it. It is simpler
// than computePrimaryBranchParams because we don't need to worry about tiling.
func (b *TablesBuilder) computeSecondaryBranchParams() []schema.SecondaryBranchParamRow {
seenRows := map[schema.SecondaryBranchParamRow]bool{}
for _, cl := range b.changelistBuilders {
for _, ps := range cl.patchsets {
for _, tr := range ps.traces {
for k, v := range tr.Keys {
seenRows[schema.SecondaryBranchParamRow{
BranchName: ps.patchset.ChangelistID,
VersionName: ps.patchset.PatchsetID,
Key: k,
Value: v,
}] = true
}
}
for _, opt := range ps.options {
for k, v := range opt.Keys {
seenRows[schema.SecondaryBranchParamRow{
BranchName: ps.patchset.ChangelistID,
VersionName: ps.patchset.PatchsetID,
Key: k,
Value: v,
}] = true
}
}
}
}
var rv []schema.SecondaryBranchParamRow
for row := range seenRows {
rv = append(rv, row)
}
return rv
}
// CommitBuilder has methods for easily building commit history. All methods are chainable.
type CommitBuilder struct {
commitRows []schema.CommitWithDataRow
gitRows []schema.GitCommitRow
}
// Insert adds a commit with the given data. It panics if the commitTime is not formatted to
// RFC3339.
func (b *CommitBuilder) Insert(commitID schema.CommitID, author, subject, commitTime string) *CommitBuilder {
h := sha1.Sum([]byte(commitID))
gitHash := hex.EncodeToString(h[:])
ct, err := time.Parse(time.RFC3339, commitTime)
if err != nil {
logAndPanic("Invalid time %q: %s", commitTime, err)
}
b.commitRows = append(b.commitRows, schema.CommitWithDataRow{
CommitID: commitID,
// tiling will be computed in Build.
})
b.gitRows = append(b.gitRows, schema.GitCommitRow{
GitHash: gitHash,
CommitID: commitID,
CommitTime: ct,
AuthorEmail: author,
Subject: subject,
})
return b
}
// GitCommitBuilder has methods for building rows in the GitCommits table.
type GitCommitBuilder struct {
gitRows []schema.GitCommitRow
}
// Insert adds a commit with the given data. It panics if the commitTime is not formatted to
// RFC3339 or if the gitHash is invalid.
func (b *GitCommitBuilder) Insert(commitID, gitHash, author, subject, commitTime string) *GitCommitBuilder {
if len(gitHash) != 40 {
panic("invalid git hash length; must be 40 chars")
}
ct, err := time.Parse(time.RFC3339, commitTime)
if err != nil {
logAndPanic("Invalid time %q: %s", commitTime, err)
}
b.gitRows = append(b.gitRows, schema.GitCommitRow{
GitHash: gitHash,
CommitID: schema.CommitID(commitID),
CommitTime: ct,
AuthorEmail: author,
Subject: subject,
})
return b
}
// TraceBuilder has methods for easily building trace data. All methods are chainable.
type TraceBuilder struct {
// inputs needed upon creation
commits []schema.CommitWithDataRow
commonKeys paramtools.Params
groupingKeys []string
symbolsToDigest map[rune]schema.DigestBytes
// built as a result of calling the methods below
groupings []schema.GroupingRow
options []schema.OptionsRow
sourceFiles []schema.SourceFileRow
traceValues [][]*schema.TraceValueRow // each row is one trace's data points
traces []schema.TraceRow
}
// History takes in a slice of strings, with each string representing the history of a trace. Each
// string must have a number of symbols equal to the number of commits. A dash '-' means no data
// at that commit; any other symbol must match a rune provided in the previous call to SetDigests().
// If any data is invalid or missing, this method panics.
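// For example, with three commits and digests mapped for 'A' and 'B' via SetDigests:
//
//	builder.History(
//		"AAB", // first trace: digest A at the first two commits, digest B at the third
//		"-A-", // second trace: data only at the middle commit
//	)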
func (b *TraceBuilder) History(traceHistories ...string) *TraceBuilder {
if len(b.traceValues) > 0 {
logAndPanic("History must be called only once.")
}
// traceValues will end up with one row per trace, each with len(commits) entries. Some entries
// may be nil to represent "no data" and will be stripped out later.
for _, th := range traceHistories {
if len(th) != len(b.commits) {
logAndPanic("history %q is of invalid length: expected %d", th, len(b.commits))
}
traceValues := make([]*schema.TraceValueRow, len(b.commits))
for i, symbol := range th {
if symbol == '-' {
continue
}
digest, ok := b.symbolsToDigest[symbol]
if !ok {
logAndPanic("Unknown symbol in trace history %s", string(symbol))
}
traceValues[i] = &schema.TraceValueRow{
CommitID: b.commits[i].CommitID,
Digest: digest,
}
}
b.traceValues = append(b.traceValues, traceValues)
}
return b
}
// Keys specifies the params for each trace. It must be called after History() and the keys param
// must have the same number of elements as the call to History() had. The nth element here
// represents the nth trace history. This method panics if any trace would end up being identical
// to another, if any trace lacks the grouping data, or if it is called with incorrect parameters
// or at the wrong time in the building chain.
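// For example, matching a two-trace call to History() (the key names are placeholders):
//
//	builder.Keys([]paramtools.Params{
//		{"source_type": "corpus_one", "name": "test_one"},
//		{"source_type": "corpus_one", "name": "test_two"},
//	})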
func (b *TraceBuilder) Keys(keys []paramtools.Params) *TraceBuilder {
if len(b.traceValues) == 0 {
logAndPanic("Keys must be called after history loaded")
}
if len(b.traces) > 0 {
logAndPanic("Keys must be called only once")
}
if len(keys) != len(b.traceValues) {
logAndPanic("Expected one set of keys for each trace")
}
// We now have enough data to make all the traces.
seenTraces := map[string]bool{} // maps serialized keys to whether we have seen them.
for i, traceParams := range keys {
traceParams.Add(b.commonKeys)
grouping := make(paramtools.Params, len(keys))
for _, gk := range b.groupingKeys {
val, ok := traceParams[gk]
if !ok {
logAndPanic("Missing grouping key %q from %v", gk, traceParams)
}
grouping[gk] = val
}
_, groupingID := sql.SerializeMap(grouping)
traceJSON, traceID := sql.SerializeMap(traceParams)
if seenTraces[traceJSON] {
logAndPanic("Found identical trace %s", traceJSON)
}
seenTraces[traceJSON] = true
for _, tv := range b.traceValues[i] {
if tv != nil {
tv.GroupingID = groupingID
tv.TraceID = traceID
tv.Shard = sql.ComputeTraceValueShard(traceID)
}
}
b.groupings = append(b.groupings, schema.GroupingRow{
GroupingID: groupingID,
Keys: grouping,
})
b.traces = append(b.traces, schema.TraceRow{
TraceID: traceID,
Corpus: traceParams[types.CorpusField],
GroupingID: groupingID,
Keys: traceParams.Copy(), // make a copy to ensure immutability
MatchesAnyIgnoreRule: schema.NBNull,
})
}
return b
}
// OptionsAll applies the given options to all data points provided in History.
func (b *TraceBuilder) OptionsAll(opts paramtools.Params) *TraceBuilder {
xopts := make([]paramtools.Params, len(b.traceValues))
for i := range xopts {
xopts[i] = opts
}
return b.OptionsPerTrace(xopts)
}
// OptionsPerTrace applies the given optional params to the traces created in History. The number
// of options is expected to match the number of traces. It panics if called more than once or
// at the wrong time.
func (b *TraceBuilder) OptionsPerTrace(xopts []paramtools.Params) *TraceBuilder {
if len(xopts) != len(b.traceValues) {
logAndPanic("Must have one set of options per trace")
}
optsPerTrace := make([][]paramtools.Params, len(b.traceValues))
for i, optForThisTrace := range xopts {
optsPerTrace[i] = make([]paramtools.Params, len(b.commits))
for j := range b.commits {
optsPerTrace[i][j] = optForThisTrace
}
}
return b.OptionsPerPoint(optsPerTrace)
}
// OptionsPerPoint applies the given optional params to each data point in the provided traces.
// The number of rows should equal the number of traces and the number of cols should equal the
// number of commits.
func (b *TraceBuilder) OptionsPerPoint(optsByTrace [][]paramtools.Params) *TraceBuilder {
if len(b.traceValues) == 0 {
logAndPanic("Options* must be called after history loaded")
}
if len(b.options) > 0 {
logAndPanic("Must call Options* only once")
}
if len(optsByTrace) != len(b.traceValues) {
logAndPanic("Must have the number of rows match the number of traces")
}
if len(optsByTrace[0]) != len(b.commits) {
logAndPanic("Must have the number of cols match the number of commits")
}
for r := range optsByTrace {
for c := range optsByTrace[r] {
tv := b.traceValues[r][c]
if tv == nil {
continue
}
_, optionsID := sql.SerializeMap(optsByTrace[r][c])
b.options = append(b.options, schema.OptionsRow{
OptionsID: optionsID,
Keys: optsByTrace[r][c].Copy(), // make a copy to ensure immutability
})
tv.OptionsID = optionsID
}
}
return b
}
// IngestedFrom applies the given list of files and ingested times to the provided data.
// The number of filenames and ingestedDates is expected to match the number of commits; if there
// is no data at a commit, both entries may be the empty string. It panics if any inputs
// are invalid.
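// For example, with two commits where only the first has data (file name and date are placeholders):
//
//	builder.IngestedFrom(
//		[]string{"gold_results_1.json", ""},
//		[]string{"2021-01-01T01:01:05Z", ""},
//	)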
func (b *TraceBuilder) IngestedFrom(filenames, ingestedDates []string) *TraceBuilder {
if len(b.traceValues) == 0 {
logAndPanic("IngestedFrom must be called after history")
}
if len(b.sourceFiles) > 0 {
logAndPanic("Must call IngestedFrom only once")
}
if len(filenames) != len(b.commits) {
logAndPanic("Expected %d files", len(b.commits))
}
if len(ingestedDates) != len(b.commits) {
logAndPanic("Expected %d dates", len(b.commits))
}
for i := range filenames {
name, ingestedDate := filenames[i], ingestedDates[i]
if name == "" && ingestedDate == "" {
continue // not used by any traces
}
if name == "" || ingestedDate == "" {
logAndPanic("name and date must either both be set or both be empty")
}
h := md5.Sum([]byte(name))
sourceID := h[:]
d, err := time.Parse(time.RFC3339, ingestedDate)
if err != nil {
logAndPanic("Invalid date format %q: %s", ingestedDate, err)
}
b.sourceFiles = append(b.sourceFiles, schema.SourceFileRow{
SourceFileID: sourceID,
SourceFile: name,
LastIngested: d,
})
// apply it to the ith value of every trace.
for _, traceRows := range b.traceValues {
if traceRows[i] != nil {
traceRows[i].SourceFileID = sourceID
}
}
}
return b
}
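// ExpectationsBuilder has methods for building a single triage record and its deltas. All methods
// are chainable.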
type ExpectationsBuilder struct {
currentGroupingID schema.GroupingID
deltas []schema.ExpectationDeltaRow
groupingKeys []string
record *schema.ExpectationRecordRow
}
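// ExpectationsForGrouping sets the grouping to which subsequent calls to Positive, Negative, and
// Triage apply. It panics if the given keys are missing any of the grouping keys.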
func (b *ExpectationsBuilder) ExpectationsForGrouping(keys paramtools.Params) *ExpectationsBuilder {
for _, key := range b.groupingKeys {
if _, ok := keys[key]; !ok {
logAndPanic("Grouping is missing key %q", key)
}
}
_, b.currentGroupingID = sql.SerializeMap(keys)
return b
}
// Positive marks the given digest as positive for the current grouping. It assumes that the
// previous triage state was untriaged (as this is quite common for test data).
func (b *ExpectationsBuilder) Positive(d types.Digest) *ExpectationsBuilder {
return b.Triage(d, schema.LabelUntriaged, schema.LabelPositive)
}
// Negative marks the given digest as negative for the current grouping. It assumes that the
// previous triage state was untriaged (as this is quite common for test data).
func (b *ExpectationsBuilder) Negative(d types.Digest) *ExpectationsBuilder {
return b.Triage(d, schema.LabelUntriaged, schema.LabelNegative)
}
// Triage allows an event to be added that has some custom setup, e.g. changing a positively
// triaged digest to be negatively triaged.
func (b *ExpectationsBuilder) Triage(d types.Digest, before, after schema.ExpectationLabel) *ExpectationsBuilder {
if b.currentGroupingID == nil {
logAndPanic("You must call ExpectationsForGrouping first")
}
db, err := sql.DigestToBytes(d)
if err != nil {
logAndPanic("Invalid digest %q: %s", d, err)
}
b.deltas = append(b.deltas, schema.ExpectationDeltaRow{
ExpectationRecordID: b.record.ExpectationRecordID,
GroupingID: b.currentGroupingID,
Digest: db,
LabelBefore: before,
LabelAfter: after,
})
return b
}
func logAndPanic(msg string, args ...interface{}) {
panic(fmt.Sprintf(msg, args...))
}
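// ChangelistBuilder has methods for building the data associated with a single changelist, such as
// its patchsets and triage events.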
type ChangelistBuilder struct {
changelist schema.ChangelistRow
expectationBuilders []*ExpectationsBuilder
groupingKeys []string
patchsets []*PatchsetBuilder
}
// AddPatchset returns a builder for data associated with the given patchset.
func (b *ChangelistBuilder) AddPatchset(psID, gitHash string, order int) *PatchsetBuilder {
pb := &PatchsetBuilder{
patchset: schema.PatchsetRow{
PatchsetID: qualify(b.changelist.System, psID),
System: b.changelist.System,
ChangelistID: b.changelist.ChangelistID,
Order: order,
GitHash: gitHash,
},
groupingKeys: b.groupingKeys,
}
b.patchsets = append(b.patchsets, pb)
return pb
}
// AddTriageEvent returns a builder for a series of triage events on the current CL. These
// events will be attributed to the given user and timestamp.
func (b *ChangelistBuilder) AddTriageEvent(user, triageTime string) *ExpectationsBuilder {
if len(b.groupingKeys) == 0 {
logAndPanic("Must add grouping keys before expectations")
}
ts, err := time.Parse(time.RFC3339, triageTime)
if err != nil {
logAndPanic("Invalid triage time %q: %s", triageTime, err)
}
eb := &ExpectationsBuilder{
groupingKeys: b.groupingKeys,
record: &schema.ExpectationRecordRow{
ExpectationRecordID: uuid.New(),
BranchName: &b.changelist.ChangelistID,
UserName: user,
TriageTime: ts,
},
}
b.expectationBuilders = append(b.expectationBuilders, eb)
return eb
}
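// PatchsetBuilder has methods for building the data produced for a single patchset, such as
// tryjob results and the traces and options attached to them.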
type PatchsetBuilder struct {
commonKeys paramtools.Params
dataPoints [][]*schema.SecondaryBranchValueRow
groupingKeys []string
groupings []schema.GroupingRow
options []schema.OptionsRow
patchset schema.PatchsetRow
sourceFiles []schema.SourceFileRow
traces []schema.TraceRow
tryjobs []schema.TryjobRow
}
// DataWithCommonKeys sets the common trace keys that will be applied to data added by subsequent
// calls to Digests and Keys.
func (b *PatchsetBuilder) DataWithCommonKeys(keys paramtools.Params) *PatchsetBuilder {
b.commonKeys = keys
return b
}
// Digests adds some data to this patchset. Each digest represents a single data point on a
// single trace.
func (b *PatchsetBuilder) Digests(digests ...types.Digest) *PatchsetBuilder {
if len(digests) == 0 {
panic("Cannot add empty data")
}
newData := make([]*schema.SecondaryBranchValueRow, 0, len(digests))
for _, d := range digests {
db, err := sql.DigestToBytes(d)
if err != nil {
logAndPanic("Invalid digest %q: %s", d, err)
}
newData = append(newData, &schema.SecondaryBranchValueRow{
BranchName: b.patchset.ChangelistID,
VersionName: b.patchset.PatchsetID,
Digest: db,
})
}
b.dataPoints = append(b.dataPoints, newData)
return b
}
// Keys applies the given keys to the previous set of data added with Digests(), including any
// previously set common keys. The length of keys should match the earlier call to Digests().
func (b *PatchsetBuilder) Keys(keys []paramtools.Params) *PatchsetBuilder {
if len(b.dataPoints) == 0 {
logAndPanic("Must call Digests before Keys()")
}
lastData := b.dataPoints[len(b.dataPoints)-1]
if len(lastData) != len(keys) {
logAndPanic("Expected %d keys", len(lastData))
}
for i, traceParams := range keys {
traceParams.Add(b.commonKeys)
grouping := make(paramtools.Params, len(keys))
for _, gk := range b.groupingKeys {
val, ok := traceParams[gk]
if !ok {
logAndPanic("Missing grouping key %q from %v", gk, traceParams)
}
grouping[gk] = val
}
_, groupingID := sql.SerializeMap(grouping)
_, traceID := sql.SerializeMap(traceParams)
lastData[i].GroupingID = groupingID
lastData[i].TraceID = traceID
b.groupings = append(b.groupings, schema.GroupingRow{
GroupingID: groupingID,
Keys: grouping,
})
b.traces = append(b.traces, schema.TraceRow{
TraceID: traceID,
Corpus: traceParams[types.CorpusField],
GroupingID: groupingID,
Keys: traceParams.Copy(), // make a copy to ensure immutability
MatchesAnyIgnoreRule: schema.NBNull,
})
}
return b
}
// OptionsAll applies the given options to the entire previous set of data added with Digests().
func (b *PatchsetBuilder) OptionsAll(opts paramtools.Params) *PatchsetBuilder {
if len(b.dataPoints) == 0 {
logAndPanic("OptionsAll must be called after history loaded")
}
lastData := b.dataPoints[len(b.dataPoints)-1]
xopts := make([]paramtools.Params, len(lastData))
for i := range xopts {
xopts[i] = opts
}
return b.OptionsPerPoint(xopts)
}
// OptionsPerPoint applies the given options to the previous set of data added with Digests(). The
// number of options should match the earlier call to Digests().
func (b *PatchsetBuilder) OptionsPerPoint(xopts []paramtools.Params) *PatchsetBuilder {
if len(b.dataPoints) == 0 {
logAndPanic("OptionsPerPoint must be called after history loaded")
}
lastData := b.dataPoints[len(b.dataPoints)-1]
if len(lastData) != len(xopts) {
logAndPanic("Expected %d options", len(lastData))
}
for i, opts := range xopts {
_, optionsID := sql.SerializeMap(opts)
b.options = append(b.options, schema.OptionsRow{
OptionsID: optionsID,
Keys: opts.Copy(), // make a copy to ensure immutability
})
lastData[i].OptionsID = optionsID
}
return b
}
// FromTryjob assigns all data previously added with Digests() to be from the given tryjob.
func (b *PatchsetBuilder) FromTryjob(id, cis, name, file, ingestedTS string) *PatchsetBuilder {
if len(b.dataPoints) == 0 {
logAndPanic("FromTryjob must be called after history loaded")
}
updated, err := time.Parse(time.RFC3339, ingestedTS)
if err != nil {
logAndPanic("invalid timestamp %q: %s", ingestedTS, err)
}
qID := qualify(cis, id)
b.tryjobs = append(b.tryjobs, schema.TryjobRow{
TryjobID: qID,
System: cis,
ChangelistID: b.patchset.ChangelistID,
PatchsetID: b.patchset.PatchsetID,
DisplayName: name,
LastIngestedData: updated,
})
h := md5.Sum([]byte(file))
sourceID := h[:]
b.sourceFiles = append(b.sourceFiles, schema.SourceFileRow{
SourceFileID: sourceID,
SourceFile: file,
LastIngested: updated,
})
lastData := b.dataPoints[len(b.dataPoints)-1]
for _, d := range lastData {
d.TryjobID = qID
d.SourceFileID = sourceID
}
return b
}