| package ingestion_processors |
| |
| import ( |
| "context" |
| "crypto/md5" |
| "errors" |
| "strconv" |
| "time" |
| |
| "github.com/cockroachdb/cockroach-go/v2/crdb/crdbpgx" |
| lru "github.com/hashicorp/golang-lru" |
| "github.com/jackc/pgx/v4" |
| "github.com/jackc/pgx/v4/pgxpool" |
| "go.opencensus.io/trace" |
| "golang.org/x/sync/errgroup" |
| |
| "go.skia.org/infra/go/metrics2" |
| "go.skia.org/infra/go/now" |
| "go.skia.org/infra/go/paramtools" |
| "go.skia.org/infra/go/skerr" |
| "go.skia.org/infra/go/sklog" |
| "go.skia.org/infra/go/sql/sqlutil" |
| "go.skia.org/infra/go/util" |
| "go.skia.org/infra/golden/go/ingestion" |
| "go.skia.org/infra/golden/go/jsonio" |
| "go.skia.org/infra/golden/go/sql" |
| "go.skia.org/infra/golden/go/sql/schema" |
| "go.skia.org/infra/golden/go/types" |
| ) |
| |
| const ( |
| // SQLPrimaryBranch indicates that the primary branch ingester uses the SQL backend. |
| SQLPrimaryBranch = "sql_primary" |
| |
| sqlTileWidthConfig = "TileWidth" |
| |
| commitsCacheSize = 1000 |
| expectationsCacheSize = 10_000_000 |
| optionsGroupingCacheSize = 1_000_000 |
| paramsCacheSize = 100_000 |
| traceCacheSize = 10_000_000 |
| |
| cacheSizeMetric = "gold_ingestion_cache_sizes" |
| ) |
| |
| type sqlPrimaryIngester struct { |
| db *pgxpool.Pool |
| source ingestion.Source |
| tileWidth int |
| |
| commitsCache *lru.Cache |
| expectationsCache *lru.Cache |
| optionGroupingCache *lru.Cache |
| paramsCache *lru.Cache |
| traceCache *lru.Cache |
| |
| filesProcessed metrics2.Counter |
| filesSuccess metrics2.Counter |
| resultsIngested metrics2.Counter |
| } |
| |
| // PrimaryBranchSQL creates and returns a Processor that writes to the SQL backend. |
| // It panics if the configuration is invalid. |
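| // |
| // A minimal construction sketch (hypothetical values; src may be any |
| // ingestion.Source and pool any *pgxpool.Pool): |
| // |
| //    cfg := map[string]string{"TileWidth": "100"} |
| //    processor := PrimaryBranchSQL(src, cfg, pool) |
| //    processor.MonitorCacheMetrics(ctx) // optional: report cache sizes |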
| func PrimaryBranchSQL(src ingestion.Source, configParams map[string]string, db *pgxpool.Pool) *sqlPrimaryIngester { |
| tw := configParams[sqlTileWidthConfig] |
| tileWidth := 10 // default when TileWidth is not configured |
| if tw != "" { |
| var err error |
| tileWidth, err = strconv.Atoi(tw) |
| if err != nil { |
| panic(skerr.Wrapf(err, "Invalid TileWidth %q", tw)) |
| } |
| } |
| // lru.New returns an error only for a non-positive size, so panicking is safe |
| // here: all of the sizes are positive compile-time constants. |
| mustNewCache := func(size int) *lru.Cache { |
| cache, err := lru.New(size) |
| if err != nil { |
| panic(err) |
| } |
| return cache |
| } |
| |
| return &sqlPrimaryIngester{ |
| db: db, |
| source: src, |
| tileWidth: tileWidth, |
| commitsCache: mustNewCache(commitsCacheSize), |
| expectationsCache: mustNewCache(expectationsCacheSize), |
| optionGroupingCache: mustNewCache(optionsGroupingCacheSize), |
| paramsCache: mustNewCache(paramsCacheSize), |
| traceCache: mustNewCache(traceCacheSize), |
| |
| filesProcessed: metrics2.GetCounter("gold_primarysqlingestion_files_processed"), |
| filesSuccess: metrics2.GetCounter("gold_primarysqlingestion_files_success"), |
| resultsIngested: metrics2.GetCounter("gold_primarysqlingestion_results_ingested"), |
| } |
| } |
| |
| // HandlesFile returns true if the underlying source handles the given file. |
| func (s *sqlPrimaryIngester) HandlesFile(name string) bool { |
| return s.source.HandlesFile(name) |
| } |
| |
| // Process takes the content of the given file and writes it to the various SQL tables required |
| // by the schema. |
| // If there is a SQL error, we return ingestion.ErrRetryable but do NOT roll back the data. During |
| // exploratory design, it was observed that putting all the data from a single file into one |
| // transaction ground the whole ingestion process to a halt, because many big transactions |
| // conflicted with each other. Instead, the data is written to the DB in an order that minimizes |
| // short-term errors if the write partially succeeds, and we rely on retrying the ingestion of |
| // files to deal with SQL errors. |
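| // |
| // A sketch of how a caller might react to that error (the requeueFile helper is |
| // hypothetical, not part of this package): |
| // |
| //    if err := processor.Process(ctx, file); errors.Is(err, ingestion.ErrRetryable) { |
| //        requeueFile(file) // safe to retry; the writes above are idempotent upserts |
| //    } |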
| func (s *sqlPrimaryIngester) Process(ctx context.Context, fileName string) error { |
| s.filesProcessed.Inc(1) |
| ctx, cancel := context.WithTimeout(ctx, 2*time.Minute) |
| defer cancel() |
| ctx, span := trace.StartSpan(ctx, "ingestion_SQLPrimaryBranchProcess") |
| defer span.End() |
| r, err := s.source.GetReader(ctx, fileName) |
| if err != nil { |
| return skerr.Wrap(err) |
| } |
| gr, err := processGoldResults(ctx, r) |
| if err != nil { |
| return skerr.Wrapf(err, "could not process file %s from source %s", fileName, s.source) |
| } |
| if len(gr.Results) == 0 { |
| sklog.Infof("file %s had no results", fileName) |
| return nil |
| } |
| span.AddAttributes(trace.Int64Attribute("num_results", int64(len(gr.Results)))) |
| sklog.Infof("Ingesting %d results from file %s", len(gr.Results), fileName) |
| |
| commitID, tileID, err := s.getCommitAndTileID(ctx, gr) |
| if err != nil { |
| return skerr.Wrapf(err, "identifying commit id for file %s", fileName) |
| } |
| sourceFileID := md5.Sum([]byte(fileName)) |
| |
| if err := s.writeData(ctx, gr, commitID, tileID, sourceFileID[:]); err != nil { |
| sklog.Errorf("Error writing data for file %s: %s", fileName, err) |
| return ingestion.ErrRetryable |
| } |
| |
| if err := s.upsertSourceFile(ctx, sourceFileID[:], fileName); err != nil { |
| sklog.Errorf("Error writing to SourceFiles for file %s: %s", fileName, err) |
| return ingestion.ErrRetryable |
| } |
| s.filesSuccess.Inc(1) |
| s.resultsIngested.Inc(int64(len(gr.Results))) |
| return nil |
| } |
| |
| type commitCacheEntry struct { |
| commitID schema.CommitID |
| tileID schema.TileID |
| } |
| |
| // getCommitAndTileID gets the commit id and corresponding tile id from the information provided |
| // in the given jsonio. Currently, this looks up the GitHash to determine the commit_id |
| // (i.e. a sequence number), but this could be made more flexible (e.g. to support multiple repos). |
| // The tileID is derived from the existing tileIDs of surrounding commits. |
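| // |
| // Illustrative examples of the two accepted input shapes (see jsonio.GoldResults |
| // for the exact fields): |
| // |
| //    GitHash: "aaaa111..."                          -> commit_id looked up in GitCommits |
| //    CommitID: "R100-5", CommitMetadata: "gs://..." -> commit_id used as-is; metadata stored |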
| func (s *sqlPrimaryIngester) getCommitAndTileID(ctx context.Context, gr *jsonio.GoldResults) (schema.CommitID, schema.TileID, error) { |
| ctx, span := trace.StartSpan(ctx, "getCommitAndTileID") |
| defer span.End() |
| var commitID schema.CommitID |
| var key string |
| if gr.GitHash != "" { |
| key = gr.GitHash |
| if c, ok := s.commitsCache.Get(key); ok { |
| cce, ok := c.(commitCacheEntry) |
| if ok { |
| return cce.commitID, cce.tileID, nil |
| } |
| sklog.Warningf("Corrupt entry in commits cache: %#v", c) |
| s.commitsCache.Remove(key) |
| } |
| // Cache miss: go to the DB. We can't assume it's in the CommitsWithData table yet. |
| row := s.db.QueryRow(ctx, `SELECT commit_id FROM GitCommits WHERE git_hash = $1`, gr.GitHash) |
| if err := row.Scan(&commitID); err != nil { |
| return "", 0, skerr.Wrapf(err, "looking up git_hash = %q", gr.GitHash) |
| } |
| } else if gr.CommitID != "" && gr.CommitMetadata != "" { |
| key = gr.CommitID |
| if _, ok := s.commitsCache.Get(key); !ok { |
| // On a cache miss, make sure we store it to the table. |
| const statement = `INSERT INTO MetadataCommits (commit_id, commit_metadata) |
| VALUES ($1, $2) ON CONFLICT DO NOTHING` |
| _, err := s.db.Exec(ctx, statement, gr.CommitID, gr.CommitMetadata) |
| if err != nil { |
| return "", 0, skerr.Wrapf(err, "storing metadata commit %q : %q", gr.CommitID, gr.CommitMetadata) |
| } |
| } |
| commitID = schema.CommitID(gr.CommitID) |
| } else { |
| // This should never happen, assuming jsonio validation passed upstream. |
| return "", 0, skerr.Fmt("missing GitHash and CommitID/CommitMetadata") |
| } |
| |
| tileID, err := s.getAndWriteTileIDForCommit(ctx, commitID) |
| if err != nil { |
| return "", 0, skerr.Wrapf(err, "computing tile id for %s", commitID) |
| } |
| s.updateCommitCache(gr, key, commitID, tileID) |
| return commitID, tileID, nil |
| } |
| |
| // getAndWriteTileIDForCommit determines the tileID that should be used for this commit and writes |
| // it to the DB using a transaction. We use a transaction here (but not in general) because this |
| // should be a relatively fast read/write sequence that happens somewhat infrequently (i.e. when a |
| // new commit has data created for it and is not yet cached). |
| func (s *sqlPrimaryIngester) getAndWriteTileIDForCommit(ctx context.Context, targetCommit schema.CommitID) (schema.TileID, error) { |
| ctx, span := trace.StartSpan(ctx, "getAndWriteTileIDForCommit") |
| defer span.End() |
| var returnTileID schema.TileID |
| err := crdbpgx.ExecuteTx(ctx, s.db, pgx.TxOptions{}, func(tx pgx.Tx) error { |
| tileID, err := s.getTileIDForCommit(ctx, tx, targetCommit) |
| if err != nil { |
| return err // Don't wrap - crdbpgx might retry |
| } |
| // Rows in this table are immutable once written |
| _, err = tx.Exec(ctx, ` |
| INSERT INTO CommitsWithData (commit_id, tile_id) VALUES ($1, $2) |
| ON CONFLICT DO NOTHING`, targetCommit, tileID) |
| if err != nil { |
| return err // Don't wrap - crdbpgx might retry |
| } |
| returnTileID = tileID |
| return nil |
| }) |
| if err != nil { |
| return 0, skerr.Wrap(err) |
| } |
| return returnTileID, nil |
| } |
| |
| // getTileIDForCommit determines the tileID for this commit by looking at surrounding commits |
| // (if any) for clues. If we have no commits before us, we are at tile 0. If we have no commits |
| // after us, we are on the latest tile, or if that is full, we start the next tile. If there are |
| // commits on either side of us, we use the tile for the commit after us. |
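| // |
| // A worked example, assuming tileWidth = 3 and CommitsWithData containing |
| // (commit 1, tile 0), (commit 2, tile 0), (commit 3, tile 0), (commit 7, tile 1): |
| // |
| //    commit 0 -> tile 0 (no commits with data before it) |
| //    commit 5 -> tile 1 (commits on both sides; use the tile of commit 7) |
| //    commit 9 -> tile 1 (nothing after it; tile 1 holds fewer than 3 commits) |
| //    commit 9 -> tile 2 instead, once tile 1 already holds 3 commits |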
| func (s *sqlPrimaryIngester) getTileIDForCommit(ctx context.Context, tx pgx.Tx, targetCommit schema.CommitID) (schema.TileID, error) { |
| ctx, span := trace.StartSpan(ctx, "getTileIDForCommit") |
| defer span.End() |
| row := tx.QueryRow(ctx, `SELECT commit_id, tile_id FROM CommitsWithData WHERE |
| commit_id <= $1 ORDER BY commit_id DESC LIMIT 1`, targetCommit) |
| var cID schema.CommitID |
| var beforeTileID schema.TileID |
| if err := row.Scan(&cID, &beforeTileID); err == pgx.ErrNoRows { |
| // If there is no data before this commit, we are, by definition, at tile 0 |
| return 0, nil |
| } else if err != nil { |
| return 0, err // Don't wrap - crdbpgx might retry |
| } |
| if cID == targetCommit { |
| return beforeTileID, nil // already been computed |
| } |
| |
| row = tx.QueryRow(ctx, `SELECT tile_id FROM CommitsWithData WHERE |
| commit_id >= $1 ORDER BY commit_id ASC LIMIT 1`, targetCommit) |
| var afterTileID schema.TileID |
| if err := row.Scan(&afterTileID); err == pgx.ErrNoRows { |
| // If there is no data after this commit, we are at the highest tile id. |
| row := tx.QueryRow(ctx, `SELECT count(*) FROM CommitsWithData WHERE |
| tile_id = $1`, beforeTileID) |
| var count int |
| if err := row.Scan(&count); err != nil { |
| return 0, err // Don't wrap - crdbpgx might retry |
| } |
| // If the latest tile has already reached our tile width, start the next tile. |
| if count >= s.tileWidth { |
| return beforeTileID + 1, nil |
| } |
| return beforeTileID, nil |
| } else if err != nil { |
| return 0, err // Don't wrap - crdbpgx might retry |
| } |
| // We have CommitsWithData both before and after the targetCommit. We always return the tile of |
| // the commit after ours, because that tile is less likely to be full. |
| return afterTileID, nil |
| } |
| |
| // updateCommitCache updates the local cache of "CommitsWithData" if necessary (so we know we do |
| // not have to write to the table the next time). |
| func (s *sqlPrimaryIngester) updateCommitCache(gr *jsonio.GoldResults, key string, id schema.CommitID, tileID schema.TileID) { |
| s.commitsCache.Add(key, commitCacheEntry{ |
| commitID: id, |
| tileID: tileID, |
| }) |
| } |
| |
| // upsertSourceFile creates a row in SourceFiles for the given file or updates the existing row's |
| // last_ingested timestamp. The time can be overridden via the context. |
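| // |
| // A sketch of overriding the ingestion time, e.g. from a test (assumes the |
| // ContextKey exported by go.skia.org/infra/go/now): |
| // |
| //    fakeTime := time.Date(2021, time.March, 14, 0, 0, 0, 0, time.UTC) |
| //    ctx = context.WithValue(ctx, now.ContextKey, fakeTime) |
| //    // now.Now(ctx) below now returns fakeTime instead of the wall clock. |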
| func (s *sqlPrimaryIngester) upsertSourceFile(ctx context.Context, srcID schema.SourceFileID, fileName string) error { |
| ctx, span := trace.StartSpan(ctx, "upsertSourceFile") |
| defer span.End() |
| const statement = `UPSERT INTO SourceFiles (source_file_id, source_file, last_ingested) |
| VALUES ($1, $2, $3)` |
| err := crdbpgx.ExecuteTx(ctx, s.db, pgx.TxOptions{}, func(tx pgx.Tx) error { |
| _, err := tx.Exec(ctx, statement, srcID, fileName, now.Now(ctx)) |
| return err // Don't wrap - crdbpgx might retry |
| }) |
| return skerr.Wrap(err) |
| } |
| |
| // writeData writes all the data from the processed JSON file, associating it with the given |
| // commitID, tileID, and sourceFile. This has to write to several tables in accordance with the |
| // schema/design. It makes use of caches where possible to avoid writing to tables with immutable |
| // data that we know is there already (e.g. a previous write succeeded). |
| func (s *sqlPrimaryIngester) writeData(ctx context.Context, gr *jsonio.GoldResults, commitID schema.CommitID, tileID schema.TileID, srcID schema.SourceFileID) error { |
| ctx, span := trace.StartSpan(ctx, "writeData") |
| span.AddAttributes(trace.Int64Attribute("results", int64(len(gr.Results)))) |
| defer span.End() |
| |
| var groupingsToCreate []schema.GroupingRow |
| var optionsToCreate []schema.OptionsRow |
| var tracesToCreate []schema.TraceRow |
| var traceValuesToUpdate []schema.TraceValueRow |
| var valuesAtHeadToUpdate []schema.ValueAtHeadRow |
| |
| newCacheEntries := map[string]bool{} |
| paramset := paramtools.ParamSet{} // All params for this set of data points |
| for _, result := range gr.Results { |
| keys, options := paramsAndOptions(gr, result) |
| if err := shouldIngest(keys, options); err != nil { |
| sklog.Infof("Not ingesting a result: %s", err) |
| continue |
| } |
| digestBytes, err := sql.DigestToBytes(result.Digest) |
| if err != nil { |
| sklog.Errorf("Invalid digest %s: %s", result.Digest, err) |
| continue |
| } |
| _, traceID := sql.SerializeMap(keys) |
| paramset.AddParams(keys) |
| |
| _, optionsID := sql.SerializeMap(options) |
| // We explicitly do not add options to paramset, but may store them to a different |
| // table in the future. |
| |
| grouping := groupingFor(keys) |
| _, groupingID := sql.SerializeMap(grouping) |
| |
| if h := string(optionsID); !s.optionGroupingCache.Contains(h) && !newCacheEntries[h] { |
| optionsToCreate = append(optionsToCreate, schema.OptionsRow{ |
| OptionsID: optionsID, |
| Keys: options, |
| }) |
| newCacheEntries[h] = true |
| } |
| |
| if h := string(groupingID); !s.optionGroupingCache.Contains(h) && !newCacheEntries[h] { |
| groupingsToCreate = append(groupingsToCreate, schema.GroupingRow{ |
| GroupingID: groupingID, |
| Keys: grouping, |
| }) |
| newCacheEntries[h] = true |
| } |
| |
| th := string(traceID) |
| if newCacheEntries[th] { |
| continue // already seen data for this trace |
| } |
| newCacheEntries[th] = true |
| if !s.traceCache.Contains(th) { |
| tracesToCreate = append(tracesToCreate, schema.TraceRow{ |
| TraceID: traceID, |
| GroupingID: groupingID, |
| Keys: keys, |
| MatchesAnyIgnoreRule: schema.NBNull, |
| }) |
| } |
| valuesAtHeadToUpdate = append(valuesAtHeadToUpdate, schema.ValueAtHeadRow{ |
| TraceID: traceID, |
| MostRecentCommitID: commitID, |
| Digest: digestBytes, |
| OptionsID: optionsID, |
| GroupingID: groupingID, |
| Keys: keys, |
| MatchesAnyIgnoreRule: schema.NBNull, |
| }) |
| traceValuesToUpdate = append(traceValuesToUpdate, schema.TraceValueRow{ |
| Shard: sql.ComputeTraceValueShard(traceID), |
| TraceID: traceID, |
| CommitID: commitID, |
| Digest: digestBytes, |
| GroupingID: groupingID, |
| OptionsID: optionsID, |
| SourceFileID: srcID, |
| }) |
| } |
| |
| eg, ctx := errgroup.WithContext(ctx) |
| eg.Go(func() error { |
| return skerr.Wrap(batchCreateGroupings(ctx, s.db, groupingsToCreate, s.optionGroupingCache)) |
| }) |
| eg.Go(func() error { |
| return skerr.Wrap(batchCreateOptions(ctx, s.db, optionsToCreate, s.optionGroupingCache)) |
| }) |
| eg.Go(func() error { |
| return skerr.Wrap(batchCreateTraces(ctx, s.db, tracesToCreate, s.traceCache)) |
| }) |
| eg.Go(func() error { |
| return skerr.Wrap(s.batchCreateUntriagedExpectations(ctx, traceValuesToUpdate)) |
| }) |
| eg.Go(func() error { |
| return skerr.Wrap(s.batchUpdateTraceValues(ctx, traceValuesToUpdate)) |
| }) |
| eg.Go(func() error { |
| return skerr.Wrap(s.batchUpdateValuesAtHead(ctx, valuesAtHeadToUpdate)) |
| }) |
| eg.Go(func() error { |
| return skerr.Wrap(s.batchCreatePrimaryBranchParams(ctx, paramset, tileID)) |
| }) |
| eg.Go(func() error { |
| return skerr.Wrap(s.batchCreateTiledTraceDigests(ctx, traceValuesToUpdate, tileID)) |
| }) |
| return skerr.Wrap(eg.Wait()) |
| } |
| |
| // batchCreateGroupings writes the given grouping rows to the Groupings table if they aren't |
| // already there (they are immutable once written). It updates the cache after a successful write. |
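| // |
| // For illustration, a chunk of two rows expands to a statement of this shape |
| // (placeholder layout per sqlutil.ValuesPlaceholders): |
| // |
| //    INSERT INTO Groupings (grouping_id, keys) VALUES ($1, $2), ($3, $4) ON CONFLICT DO NOTHING; |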
| func batchCreateGroupings(ctx context.Context, db crdbpgx.Conn, rows []schema.GroupingRow, cache *lru.Cache) error { |
| if len(rows) == 0 { |
| return nil |
| } |
| ctx, span := trace.StartSpan(ctx, "batchCreateGroupings") |
| span.AddAttributes(trace.Int64Attribute("groupings", int64(len(rows)))) |
| defer span.End() |
| const chunkSize = 200 // Arbitrarily picked |
| err := util.ChunkIter(len(rows), chunkSize, func(startIdx int, endIdx int) error { |
| if err := ctx.Err(); err != nil { |
| return err |
| } |
| batch := rows[startIdx:endIdx] |
| if len(batch) == 0 { |
| return nil |
| } |
| statement := `INSERT INTO Groupings (grouping_id, keys) VALUES ` |
| const valuesPerRow = 2 |
| statement += sqlutil.ValuesPlaceholders(valuesPerRow, len(batch)) |
| arguments := make([]interface{}, 0, valuesPerRow*len(batch)) |
| for _, row := range batch { |
| arguments = append(arguments, row.GroupingID, row.Keys) |
| } |
| // ON CONFLICT DO NOTHING because if the rows already exist, then the data we are writing |
| // is immutable. |
| statement += ` ON CONFLICT DO NOTHING;` |
| |
| err := crdbpgx.ExecuteTx(ctx, db, pgx.TxOptions{}, func(tx pgx.Tx) error { |
| _, err := tx.Exec(ctx, statement, arguments...) |
| return err // Don't wrap - crdbpgx might retry |
| }) |
| return skerr.Wrap(err) |
| }) |
| if err != nil { |
| return skerr.Wrapf(err, "storing %d groupings", len(rows)) |
| } |
| // We've successfully written them to the DB, so add them to the cache. |
| for _, r := range rows { |
| cache.Add(string(r.GroupingID), struct{}{}) |
| } |
| return nil |
| } |
| |
| // batchCreateOptions writes the given options rows to the Options table if they aren't |
| // already there (they are immutable once written). It updates the cache after a successful write. |
| func batchCreateOptions(ctx context.Context, db crdbpgx.Conn, rows []schema.OptionsRow, cache *lru.Cache) error { |
| if len(rows) == 0 { |
| return nil |
| } |
| ctx, span := trace.StartSpan(ctx, "batchCreateOptions") |
| span.AddAttributes(trace.Int64Attribute("options", int64(len(rows)))) |
| defer span.End() |
| const chunkSize = 200 // Arbitrarily picked |
| err := util.ChunkIter(len(rows), chunkSize, func(startIdx int, endIdx int) error { |
| if err := ctx.Err(); err != nil { |
| return err |
| } |
| batch := rows[startIdx:endIdx] |
| if len(batch) == 0 { |
| return nil |
| } |
| statement := `INSERT INTO Options (options_id, keys) VALUES ` |
| const valuesPerRow = 2 |
| statement += sqlutil.ValuesPlaceholders(valuesPerRow, len(batch)) |
| arguments := make([]interface{}, 0, valuesPerRow*len(batch)) |
| for _, row := range batch { |
| arguments = append(arguments, row.OptionsID, row.Keys) |
| } |
| // ON CONFLICT DO NOTHING because if the rows already exist, then the data we are writing |
| // is immutable. |
| statement += ` ON CONFLICT DO NOTHING;` |
| |
| err := crdbpgx.ExecuteTx(ctx, db, pgx.TxOptions{}, func(tx pgx.Tx) error { |
| _, err := tx.Exec(ctx, statement, arguments...) |
| return err // Don't wrap - crdbpgx might retry |
| }) |
| return skerr.Wrap(err) |
| }) |
| if err != nil { |
| return skerr.Wrapf(err, "storing %d options", len(rows)) |
| } |
| // We've successfully written them to the DB, so add them to the cache. |
| for _, r := range rows { |
| cache.Add(string(r.OptionsID), struct{}{}) |
| } |
| return nil |
| } |
| |
| // batchCreateTraces writes the given trace rows to the Traces table if they aren't |
| // already there. The values we write are immutable once written. It updates the cache after a |
| // successful write. |
| func batchCreateTraces(ctx context.Context, db crdbpgx.Conn, rows []schema.TraceRow, cache *lru.Cache) error { |
| if len(rows) == 0 { |
| return nil |
| } |
| ctx, span := trace.StartSpan(ctx, "batchCreateTraces") |
| span.AddAttributes(trace.Int64Attribute("traces", int64(len(rows)))) |
| defer span.End() |
| const chunkSize = 200 // Arbitrarily picked |
| err := util.ChunkIter(len(rows), chunkSize, func(startIdx int, endIdx int) error { |
| if err := ctx.Err(); err != nil { |
| return err |
| } |
| batch := rows[startIdx:endIdx] |
| if len(batch) == 0 { |
| return nil |
| } |
| statement := `INSERT INTO Traces (trace_id, grouping_id, keys) VALUES ` |
| const valuesPerRow = 3 |
| statement += sqlutil.ValuesPlaceholders(valuesPerRow, len(batch)) |
| arguments := make([]interface{}, 0, valuesPerRow*len(batch)) |
| for _, row := range batch { |
| arguments = append(arguments, row.TraceID, row.GroupingID, row.Keys) |
| } |
| // ON CONFLICT DO NOTHING because if the rows already exist, then the data we are writing |
| // is immutable (we aren't writing to matches_any_ignore_rule). |
| statement += ` ON CONFLICT DO NOTHING;` |
| |
| err := crdbpgx.ExecuteTx(ctx, db, pgx.TxOptions{}, func(tx pgx.Tx) error { |
| _, err := tx.Exec(ctx, statement, arguments...) |
| return err // Don't wrap - crdbpgx might retry |
| }) |
| return skerr.Wrap(err) |
| }) |
| if err != nil { |
| return skerr.Wrapf(err, "storing %d traces", len(rows)) |
| } |
| // We've successfully written them to the DB, so add them to the cache. |
| for _, r := range rows { |
| cache.Add(string(r.TraceID), struct{}{}) |
| } |
| return nil |
| } |
| |
| // batchCreateUntriagedExpectations creates a set of grouping+digest from all the provided values. |
| // It then ensures these exist in the Expectations table, storing the "untriaged" label if they |
| // are not there already. As per the schema/design this allows us to query for untriaged digests |
| // more easily than doing a big scan of TraceValues. It updates the cache on a successful write. |
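| // |
| // For illustration, a result for grouping G with digest D yields at most one new row: |
| // |
| //    Expectations(grouping_id: G, digest: D, label: untriaged) |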
| func (s *sqlPrimaryIngester) batchCreateUntriagedExpectations(ctx context.Context, values []schema.TraceValueRow) error { |
| if len(values) == 0 { |
| return nil |
| } |
| ctx, span := trace.StartSpan(ctx, "batchCreateUntriagedExpectations") |
| span.AddAttributes(trace.Int64Attribute("trace_values", int64(len(values)))) |
| defer span.End() |
| |
| // Create ExpectationRows defaulting to untriaged for every data point in values. |
| // We deduplicate the rows because it is a SQL error to insert the same row more |
| // than once in a single statement. |
| newExpectations := map[string]bool{} |
| var expectationRows []schema.ExpectationRow |
| for _, valueRow := range values { |
| newKey := string(valueRow.GroupingID) + string(valueRow.Digest) |
| if newExpectations[newKey] || s.expectationsCache.Contains(newKey) { |
| continue // Already queued in this batch or already known to be in the DB. |
| } |
| expectationRows = append(expectationRows, schema.ExpectationRow{ |
| GroupingID: valueRow.GroupingID, |
| Digest: valueRow.Digest, |
| Label: schema.LabelUntriaged, |
| }) |
| newExpectations[newKey] = true |
| } |
| if err := s.batchCreateExpectations(ctx, expectationRows); err != nil { |
| return skerr.Wrap(err) |
| } |
| // We've successfully written them to the DB, so add them to the cache. |
| for key := range newExpectations { |
| s.expectationsCache.Add(key, struct{}{}) |
| } |
| return nil |
| } |
| |
| // batchCreateExpectations actually writes the provided expectation rows to the database. If any |
| // rows are already there, we do not overwrite them because the digests might already have been |
| // triaged. |
| func (s *sqlPrimaryIngester) batchCreateExpectations(ctx context.Context, rows []schema.ExpectationRow) error { |
| if len(rows) == 0 { |
| return nil |
| } |
| ctx, span := trace.StartSpan(ctx, "batchCreateExpectations") |
| span.AddAttributes(trace.Int64Attribute("expectations", int64(len(rows)))) |
| defer span.End() |
| const chunkSize = 200 // Arbitrarily picked |
| err := util.ChunkIter(len(rows), chunkSize, func(startIdx int, endIdx int) error { |
| if err := ctx.Err(); err != nil { |
| return err |
| } |
| batch := rows[startIdx:endIdx] |
| if len(batch) == 0 { |
| return nil |
| } |
| statement := `INSERT INTO Expectations (grouping_id, digest, label) VALUES ` |
| const valuesPerRow = 3 |
| statement += sqlutil.ValuesPlaceholders(valuesPerRow, len(batch)) |
| arguments := make([]interface{}, 0, valuesPerRow*len(batch)) |
| for _, row := range batch { |
| arguments = append(arguments, row.GroupingID, row.Digest, row.Label) |
| } |
| // ON CONFLICT DO NOTHING because if the rows already exist, then we already have an |
| // expectation, either previously automatically created or changed by a user. In either |
| // case, we don't want to overwrite it. |
| statement += ` ON CONFLICT DO NOTHING;` |
| |
| err := crdbpgx.ExecuteTx(ctx, s.db, pgx.TxOptions{}, func(tx pgx.Tx) error { |
| _, err := tx.Exec(ctx, statement, arguments...) |
| return err // Don't wrap - crdbpgx might retry |
| }) |
| return skerr.Wrap(err) |
| }) |
| if err != nil { |
| return skerr.Wrapf(err, "storing %d expectations", len(rows)) |
| } |
| return nil |
| } |
| |
| // batchUpdateTraceValues stores all the given rows. |
| func (s *sqlPrimaryIngester) batchUpdateTraceValues(ctx context.Context, rows []schema.TraceValueRow) error { |
| if len(rows) == 0 { |
| return nil |
| } |
| ctx, span := trace.StartSpan(ctx, "batchUpdateTraceValues") |
| span.AddAttributes(trace.Int64Attribute("values", int64(len(rows)))) |
| defer span.End() |
| const chunkSize = 200 // Arbitrarily picked |
| err := util.ChunkIter(len(rows), chunkSize, func(startIdx int, endIdx int) error { |
| if err := ctx.Err(); err != nil { |
| return err |
| } |
| batch := rows[startIdx:endIdx] |
| if len(batch) == 0 { |
| return nil |
| } |
| const statement = `UPSERT INTO TraceValues (shard, trace_id, commit_id, digest, |
| grouping_id, options_id, source_file_id) VALUES ` |
| const valuesPerRow = 7 |
| arguments := make([]interface{}, 0, valuesPerRow*len(batch)) |
| for _, row := range batch { |
| arguments = append(arguments, row.Shard, row.TraceID, row.CommitID, row.Digest, |
| row.GroupingID, row.OptionsID, row.SourceFileID) |
| } |
| vp := sqlutil.ValuesPlaceholders(valuesPerRow, len(batch)) |
| err := crdbpgx.ExecuteTx(ctx, s.db, pgx.TxOptions{}, func(tx pgx.Tx) error { |
| _, err := tx.Exec(ctx, statement+vp, arguments...) |
| return err // Don't wrap - crdbpgx might retry |
| }) |
| return skerr.Wrap(err) |
| }) |
| if err != nil { |
| return skerr.Wrapf(err, "storing %d trace values", len(rows)) |
| } |
| return nil |
| } |
| |
| // batchUpdateValuesAtHead stores all the given rows; an existing row is only overwritten if the |
| // given most_recent_commit_id is at or after the one already in the DB. |
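| // |
| // A worked example: if the DB already has (trace T, most_recent_commit_id 90) and a |
| // file for commit 80 is ingested out of order, the conditional upsert leaves the row |
| // at commit 90; data for commit 95 would overwrite it. |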
| func (s *sqlPrimaryIngester) batchUpdateValuesAtHead(ctx context.Context, rows []schema.ValueAtHeadRow) error { |
| if len(rows) == 0 { |
| return nil |
| } |
| ctx, span := trace.StartSpan(ctx, "batchUpdateValuesAtHead") |
| span.AddAttributes(trace.Int64Attribute("values", int64(len(rows)))) |
| defer span.End() |
| const chunkSize = 50 // Arbitrarily picked (smaller because more likely to contend) |
| err := util.ChunkIter(len(rows), chunkSize, func(startIdx int, endIdx int) error { |
| if err := ctx.Err(); err != nil { |
| return err |
| } |
| batch := rows[startIdx:endIdx] |
| if len(batch) == 0 { |
| return nil |
| } |
| statement := `INSERT INTO ValuesAtHead (trace_id, most_recent_commit_id, digest, |
| options_id, grouping_id, keys) VALUES ` |
| const valuesPerRow = 6 |
| statement += sqlutil.ValuesPlaceholders(valuesPerRow, len(batch)) |
| arguments := make([]interface{}, 0, valuesPerRow*len(batch)) |
| for _, row := range batch { |
| arguments = append(arguments, row.TraceID, row.MostRecentCommitID, row.Digest, |
| row.OptionsID, row.GroupingID, row.Keys) |
| } |
| // If the row already exists, we update these three fields, but only if the incoming |
| // most_recent_commit_id is at or after the stored one. |
| statement += ` |
| ON CONFLICT (trace_id) |
| DO UPDATE SET (most_recent_commit_id, digest, options_id) = |
| (excluded.most_recent_commit_id, excluded.digest, excluded.options_id) |
| WHERE excluded.most_recent_commit_id >= ValuesAtHead.most_recent_commit_id` |
| err := crdbpgx.ExecuteTx(ctx, s.db, pgx.TxOptions{}, func(tx pgx.Tx) error { |
| _, err := tx.Exec(ctx, statement, arguments...) |
| return err // Don't wrap - crdbpgx might retry |
| }) |
| return skerr.Wrap(err) |
| }) |
| if err != nil { |
| return skerr.Wrapf(err, "updating %d values at head", len(rows)) |
| } |
| return nil |
| } |
| |
| // batchCreatePrimaryBranchParams turns the given paramset into tile-key-value tuples and stores |
| // them to the DB. It updates the cache on success. |
| func (s *sqlPrimaryIngester) batchCreatePrimaryBranchParams(ctx context.Context, paramset paramtools.ParamSet, tile schema.TileID) error { |
| ctx, span := trace.StartSpan(ctx, "batchCreatePrimaryBranchParams") |
| defer span.End() |
| var rows []schema.PrimaryBranchParamRow |
| for key, values := range paramset { |
| for _, value := range values { |
| pr := schema.PrimaryBranchParamRow{ |
| TileID: tile, |
| Key: key, |
| Value: value, |
| } |
| if s.paramsCache.Contains(pr) { |
| continue // don't need to store it again. |
| } |
| rows = append(rows, pr) |
| } |
| } |
| |
| if len(rows) == 0 { |
| return nil |
| } |
| span.AddAttributes(trace.Int64Attribute("key_value_pairs", int64(len(rows)))) |
| |
| const chunkSize = 200 // Arbitrarily picked |
| err := util.ChunkIter(len(rows), chunkSize, func(startIdx int, endIdx int) error { |
| if err := ctx.Err(); err != nil { |
| return err |
| } |
| batch := rows[startIdx:endIdx] |
| if len(batch) == 0 { |
| return nil |
| } |
| statement := `INSERT INTO PrimaryBranchParams (tile_id, key, value) VALUES ` |
| const valuesPerRow = 3 |
| statement += sqlutil.ValuesPlaceholders(valuesPerRow, len(batch)) |
| arguments := make([]interface{}, 0, valuesPerRow*len(batch)) |
| for _, row := range batch { |
| arguments = append(arguments, row.TileID, row.Key, row.Value) |
| } |
| // ON CONFLICT DO NOTHING because if the rows already exist, the data is immutable. |
| statement += ` ON CONFLICT DO NOTHING;` |
| |
| err := crdbpgx.ExecuteTx(ctx, s.db, pgx.TxOptions{}, func(tx pgx.Tx) error { |
| _, err := tx.Exec(ctx, statement, arguments...) |
| return err // Don't wrap - crdbpgx might retry |
| }) |
| return skerr.Wrap(err) |
| }) |
| if err != nil { |
| return skerr.Wrapf(err, "storing %d primary branch params", len(rows)) |
| } |
| |
| for _, r := range rows { |
| s.paramsCache.Add(r, struct{}{}) |
| } |
| return nil |
| } |
| |
| // batchCreateTiledTraceDigests creates trace-tile-digest tuples and stores them to the DB. |
| func (s *sqlPrimaryIngester) batchCreateTiledTraceDigests(ctx context.Context, values []schema.TraceValueRow, tileID schema.TileID) error { |
| if len(values) == 0 { |
| return nil |
| } |
| // TODO(kjlubick) perhaps have a cache for these as well? |
| ctx, span := trace.StartSpan(ctx, "batchCreateTiledTraceDigests") |
| defer span.End() |
| |
| span.AddAttributes(trace.Int64Attribute("values", int64(len(values)))) |
| |
| const chunkSize = 200 // Arbitrarily picked |
| err := util.ChunkIter(len(values), chunkSize, func(startIdx int, endIdx int) error { |
| if err := ctx.Err(); err != nil { |
| return err |
| } |
| batch := values[startIdx:endIdx] |
| if len(batch) == 0 { |
| return nil |
| } |
| statement := `INSERT INTO TiledTraceDigests (trace_id, tile_id, digest, grouping_id) VALUES ` |
| const valuesPerRow = 4 |
| statement += sqlutil.ValuesPlaceholders(valuesPerRow, len(batch)) |
| arguments := make([]interface{}, 0, valuesPerRow*len(batch)) |
| for _, row := range batch { |
| arguments = append(arguments, row.TraceID, tileID, row.Digest, row.GroupingID) |
| } |
| // On conflict, only update the row if the grouping_id differs. We don't expect the |
| // grouping_id to differ often, only when a grouping definition is changed. |
| statement += ` ON CONFLICT (trace_id, tile_id, digest) |
| DO UPDATE SET grouping_id = excluded.grouping_id |
| WHERE TiledTraceDigests.grouping_id != excluded.grouping_id;` |
| |
| err := crdbpgx.ExecuteTx(ctx, s.db, pgx.TxOptions{}, func(tx pgx.Tx) error { |
| _, err := tx.Exec(ctx, statement, arguments...) |
| return err // Don't wrap - crdbpgx might retry |
| }) |
| return skerr.Wrap(err) |
| }) |
| if err != nil { |
| return skerr.Wrapf(err, "storing %d Tiled Trace Digest rows", len(values)) |
| } |
| return nil |
| } |
| |
| // MonitorCacheMetrics starts a goroutine to report the cache sizes every minute. |
| func (s *sqlPrimaryIngester) MonitorCacheMetrics(ctx context.Context) { |
| commitSize := metrics2.GetInt64Metric(cacheSizeMetric, map[string]string{"cache_name": "commits"}) |
| expectationSize := metrics2.GetInt64Metric(cacheSizeMetric, map[string]string{"cache_name": "expectations"}) |
| optionGroupingSize := metrics2.GetInt64Metric(cacheSizeMetric, map[string]string{"cache_name": "optionGrouping"}) |
| paramsSize := metrics2.GetInt64Metric(cacheSizeMetric, map[string]string{"cache_name": "params"}) |
| traceSize := metrics2.GetInt64Metric(cacheSizeMetric, map[string]string{"cache_name": "trace"}) |
| go util.RepeatCtx(ctx, time.Minute, func(ctx context.Context) { |
| commitSize.Update(int64(s.commitsCache.Len())) |
| expectationSize.Update(int64(s.expectationsCache.Len())) |
| optionGroupingSize.Update(int64(s.optionGroupingCache.Len())) |
| paramsSize.Update(int64(s.paramsCache.Len())) |
| traceSize.Update(int64(s.traceCache.Len())) |
| }) |
| } |
| |
| func groupingFor(keys map[string]string) map[string]string { |
| // This could one day be configurable per corpus or something. |
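| // For example, with the usual field values ("source_type", "name"), the keys |
| //    {"source_type": "gm", "name": "circle", "os": "Android"} |
| // produce the grouping |
| //    {"source_type": "gm", "name": "circle"}. |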
| return map[string]string{ |
| types.CorpusField: keys[types.CorpusField], |
| types.PrimaryKeyField: keys[types.PrimaryKeyField], |
| } |
| } |
| |
| // paramsAndOptions creates the params and options maps from a given file and entry. |
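| // Keys on the individual result take precedence over the file-level keys, e.g. |
| // (illustrative values): |
| // |
| //    gr.Key = {"os": "Android"}; r.Key = {"os": "Android12", "name": "circle"} |
| //    -> params = {"os": "Android12", "name": "circle"}, options = r.Options |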
| func paramsAndOptions(gr *jsonio.GoldResults, r jsonio.Result) (map[string]string, map[string]string) { |
| params := make(map[string]string, len(gr.Key)+len(r.Key)) |
| for k, v := range gr.Key { |
| params[k] = v |
| } |
| for k, v := range r.Key { |
| params[k] = v |
| } |
| return params, r.Options |
| } |
| |
| // shouldIngest returns a descriptive error if we should ignore an entry |
| // with these params/options. |
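| // |
| // Illustrative outcomes: |
| // |
| //    options {"ext": "pdf"}                -> error (non-png entry) |
| //    options with no "ext" key             -> nil (legacy data implied png) |
| //    params missing types.PrimaryKeyField  -> error (missing test name) |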
| func shouldIngest(params, options map[string]string) error { |
| // Ignore anything that is not a png. In the early days (pre-2015), ext was omitted |
| // but implied to be "png". Thus if ext is not provided, it will be ingested. |
| // New entries (created by goldctl) will always have ext set. |
| if ext, ok := options["ext"]; ok && (ext != "png") { |
| return errors.New("ignoring non-png entry") |
| } |
| |
| // Make sure the test name meets basic requirements. |
| testName := params[types.PrimaryKeyField] |
| |
| // Ignore results that don't have a test given and log an error since that |
| // should not happen. But we want to keep other results in the same input file. |
| if testName == "" { |
| return errors.New("missing test name") |
| } |
| |
| // Make sure the test name does not exceed the allowed length. |
| if len(testName) > types.MaximumNameLength { |
| return skerr.Fmt("received test name which is longer than the allowed %d bytes: %s", types.MaximumNameLength, testName) |
| } |
| |
| return nil |
| } |
| |
| // Make sure sqlPrimaryIngester implements the ingestion.Processor interface. |
| var _ ingestion.Processor = (*sqlPrimaryIngester)(nil) |