blob: 5ea9f94eadb4706dc75e49861792e54db3e9b12a [file] [log] [blame] [edit]
// The gitilesfollower executable monitors the repo we are tracking using gitiles. It fills in
// the GitCommits table, specifically making a mapping between a GitHash and CommitID. The CommitID
// is based on the "index" of the commit (i.e. how many commits since the initial commit).
//
// This will be used by all clients that have their tests in the same repo as the code under test.
// Clients with more complex repo structures will need to have an alternate way of linking
// commit_id to git_hash.
package main
import (
"context"
"flag"
"fmt"
"net/http"
"regexp"
"strconv"
"time"
"github.com/cockroachdb/cockroach-go/v2/crdb/crdbpgx"
"github.com/google/uuid"
"github.com/jackc/pgx/v4"
"github.com/jackc/pgx/v4/pgxpool"
"go.opencensus.io/trace"
"golang.org/x/oauth2/google"
"go.skia.org/infra/go/common"
"go.skia.org/infra/go/gitiles"
"go.skia.org/infra/go/httputils"
"go.skia.org/infra/go/skerr"
"go.skia.org/infra/go/sklog"
"go.skia.org/infra/go/sql/sqlutil"
"go.skia.org/infra/go/util"
"go.skia.org/infra/go/vcsinfo"
"go.skia.org/infra/golden/go/config"
"go.skia.org/infra/golden/go/sql"
"go.skia.org/infra/golden/go/sql/schema"
"go.skia.org/infra/golden/go/tracing"
)
// contextKey is a dedicated type for keys stored on a context.Context.
// See advice in https://golang.org/pkg/context/#WithValue
type contextKey string

const (
	// maxSQLConnections is the cap placed on the SQL connection pool. The number is arbitrary.
	maxSQLConnections = 4

	// initialID is the commit ID handed to the initial commit. Every later commit gets the next
	// integer. Starting well above zero leaves room to assign IDs to older commits without going
	// negative, which would break the sort order once the IDs are turned into strings.
	initialID = 1_000_000_000

	// overrideLatestCommitKey, when present on a context, supplies the "latest commit" hash
	// directly so that gitiles (whose answer changes over time) is not queried. Used by tests.
	overrideLatestCommitKey contextKey = "override_latest_commit"
)
// repoFollowerConfig is the full configuration for the gitiles follower. It embeds the settings
// shared by all services of an instance (config.Common) and adds the settings that are specific
// to this service.
type repoFollowerConfig struct {
config.Common
// InitialCommit is the git hash used as the starting point when there are no existing commits
// in the DB. It is counted like a "commit zero", which we actually assign to commit 1 billion
// (see initialID) so that, in case we need to go back in time, we can sort our commit_ids
// without resorting to negative numbers.
InitialCommit string `json:"initial_commit"`
// ReposToMonitorCLs is a list of all repos that we need to monitor for commits that could
// correspond to CLs. When those CLs land, we need to merge in the expectations associated
// with that CL to the primary branch. If it is empty, no landed-CL monitoring is done.
ReposToMonitorCLs []monitorConfig `json:"repos_to_monitor_cls"`
// PollPeriod is how often we should poll the source of truth. The same period is used both for
// following new commits and for checking for landed CLs.
PollPeriod config.Duration `json:"poll_period"`
}
// monitorConfig houses the data we need to track a repo and determine which CLs a landed commit
// corresponds to.
type monitorConfig struct {
// RepoURL is the url that will be polled via gitiles. It is also the key under which progress
// for this repo is stored in the TrackingCommits table.
RepoURL string `json:"repo_url"`
// ExtractionTechnique codifies the methods for linking (via a commit message/body) to a CL.
ExtractionTechnique extractionTechnique `json:"extraction_technique"`
// InitialCommit is the git hash that we will use as the starting point if there is no
// monitoring progress stored in the DB. It should be before/at the point where we migrated
// expectations.
InitialCommit string `json:"initial_commit"`
// SystemName is the abbreviation that is given to a given CodeReviewSystem. It is combined with
// the extracted CL ID to form the qualified CL ID used in the DB.
SystemName string `json:"system_name"`
// LegacyUpdaterInUse indicates the status of the CLs should not be changed because the source
// of truth for expectations is still Firestore, which is controlled by gold_frontend.
// This should be able to be removed after the SQL migration is complete.
LegacyUpdaterInUse bool `json:"legacy_updater_in_use"`
// branch is the git branch of the repo that we will poll for the latest commit. It is
// unexported (and so not read from the JSON config); checkForLanded fills it in from the
// instance-wide GitRepoBranch setting.
branch string
}
// extractionTechnique names a strategy for finding the CL that a landed commit corresponds to.
type extractionTechnique string

const (
	// ReviewedLine means the CL ID is taken from a Reviewed-on line in the commit message.
	ReviewedLine extractionTechnique = "ReviewedLine"
	// FromSubject means the CL ID is taken from the commit title, where it is expected to appear
	// as a "(#1234)" suffix.
	FromSubject extractionTechnique = "FromSubject"
)
// main loads the configuration, connects to the SQL database, performs one synchronous pass of
// both pollers (each of which then keeps running in its own background goroutine) and finally
// serves the /healthz endpoint until the process exits.
func main() {
	// Command line flags.
	commonInstanceConfig := flag.String("common_instance_config", "", "Path to the json5 file containing the configuration that needs to be the same across all services for a given instance.")
	thisConfig := flag.String("config", "", "Path to the json5 file containing the configuration specific to gitiles follower server.")
	hang := flag.Bool("hang", false, "Stop and do nothing after reading the flags. Good for debugging containers.")
	// The flags must be parsed before anything else so logging can be configured.
	flag.Parse()

	if *hang {
		sklog.Info("Hanging")
		select {} // Block forever.
	}

	var rfc repoFollowerConfig
	if err := config.LoadFromJSON5(&rfc, commonInstanceConfig, thisConfig); err != nil {
		sklog.Fatalf("Reading config: %s", err)
	}
	sklog.Infof("Loaded config %#v", rfc)

	common.InitWithMust("gitilesfollower", common.PrometheusOpt(&rfc.PromPort))
	if err := tracing.Initialize(1, rfc.SQLDatabaseName); err != nil {
		sklog.Fatalf("Could not set up tracing: %s", err)
	}

	ctx := context.Background()
	db := mustInitSQLDatabase(ctx, rfc)

	tokenSource, err := google.DefaultTokenSource(ctx)
	if err != nil {
		sklog.Fatalf("Problem setting up default token source: %s", err)
	}
	httpClient := httputils.DefaultClientConfig().WithTokenSource(tokenSource).Client()
	primaryRepo := gitiles.NewRepo(rfc.GitRepoURL, httpClient)

	// Each of the next two calls does one pass synchronously and then leaves a goroutine
	// running in the background to repeat it every poll period.
	if err := pollRepo(ctx, db, primaryRepo, rfc); err != nil {
		sklog.Fatalf("Could not do initial update: %s", err)
	}
	if err := checkForLanded(ctx, db, httpClient, rfc); err != nil {
		sklog.Fatalf("Could not do initial scan of landed CLs: %s", err)
	}
	sklog.Infof("Initial update complete")

	http.HandleFunc("/healthz", httputils.ReadyHandleFunc)
	sklog.Fatal(http.ListenAndServe(rfc.ReadyPort, nil))
}
// mustInitSQLDatabase opens a connection pool to the SQL database named in the config, limited
// to maxSQLConnections connections. Any problem (missing database name, unparsable connection
// URL, failure to connect) is fatal.
func mustInitSQLDatabase(ctx context.Context, fcc repoFollowerConfig) *pgxpool.Pool {
	dbName := fcc.SQLDatabaseName
	if dbName == "" {
		sklog.Fatalf("Must have SQL Database Information")
	}

	connURL := sql.GetConnectionURL(fcc.SQLConnection, dbName)
	poolCfg, err := pgxpool.ParseConfig(connURL)
	if err != nil {
		sklog.Fatalf("error getting postgres config %s: %s", connURL, err)
	}
	poolCfg.MaxConns = maxSQLConnections

	pool, err := pgxpool.ConnectConfig(ctx, poolCfg)
	if err != nil {
		sklog.Fatalf("error connecting to the database: %s", err)
	}
	sklog.Infof("Connected to SQL database %s", dbName)
	return pool
}
// pollRepo runs one updateCycle synchronously and, if that succeeds, launches a goroutine that
// repeats the cycle once per rfc.PollPeriod until ctx is done. Only a failure of the first cycle
// is returned; failures of later cycles are logged and polling continues.
func pollRepo(ctx context.Context, db *pgxpool.Pool, client *gitiles.Repo, rfc repoFollowerConfig) error {
	sklog.Infof("Doing initial update")
	if err := updateCycle(ctx, db, client, rfc); err != nil {
		return skerr.Wrap(err)
	}

	go func() {
		period := rfc.PollPeriod.Duration
		ticker := time.NewTicker(period)
		defer ticker.Stop()
		sklog.Infof("Polling every %s", period)
		for {
			select {
			case <-ticker.C:
				if err := updateCycle(ctx, db, client, rfc); err != nil {
					sklog.Errorf("Error on this cycle for talking to %s: %s", rfc.GitRepoURL, err)
				}
			case <-ctx.Done():
				sklog.Errorf("Stopping polling due to context error: %s", ctx.Err())
				return
			}
		}
	}()
	return nil
}
// GitilesLogger is a subset of the gitiles client library that we need. This allows us to mock
// it out during tests.
type GitilesLogger interface {
// Log returns the commits matching logExpr. In this file it is called with a branch name and
// gitiles.LogLimit(1) to find the most recent commit on that branch.
Log(ctx context.Context, logExpr string, opts ...gitiles.LogOption) ([]*vcsinfo.LongCommit, error)
// LogFirstParent returns the commits between from and to. Callers in this file treat the
// result as ordered newest first and reverse it before use.
LogFirstParent(ctx context.Context, from, to string, opts ...gitiles.LogOption) ([]*vcsinfo.LongCommit, error)
}
// updateCycle compares the newest commit on the configured branch (from gitiles) with the newest
// commit already recorded in the DB. When they differ, every commit between the two is fetched
// from gitiles and written to the GitCommits table. If the DB has no commits yet, the configured
// InitialCommit (with ID initialID) is used as the starting point.
func updateCycle(ctx context.Context, db *pgxpool.Pool, client GitilesLogger, rfc repoFollowerConfig) error {
	ctx, span := trace.StartSpan(ctx, "gitilesfollower_updateCycle")
	defer span.End()

	tipHash, err := getLatestCommitFromRepo(ctx, client, rfc.GitRepoBranch)
	if err != nil {
		return skerr.Wrap(err)
	}
	lastHash, lastID, err := getPreviousCommitFromDB(ctx, db)
	if err != nil {
		return skerr.Wrapf(err, "getting recent commits from DB")
	}

	switch lastHash {
	case tipHash:
		sklog.Infof("no updates - latest seen commit %s", lastHash)
		return nil
	case "":
		// Nothing stored yet; begin from the configured starting point.
		lastHash, lastID = rfc.InitialCommit, initialID
	}

	sklog.Infof("Getting git history from %s to %s", lastHash, tipHash)
	newCommits, err := client.LogFirstParent(ctx, lastHash, tipHash)
	if err != nil {
		return skerr.Wrapf(err, "getting backlog of commits from %s..%s", lastHash, tipHash)
	}
	// LogFirstParent hands back newest-first and ignores gitiles.LogReverse(), so flip the
	// slice ourselves to get oldest-first.
	reverse(newCommits)
	sklog.Infof("Got %d commits to store", len(newCommits))

	if err := storeCommits(ctx, db, lastID, newCommits); err != nil {
		return skerr.Wrapf(err, "storing %d commits to GitCommits table", len(newCommits))
	}
	return nil
}
// reverse flips the order of the given slice in place.
func reverse(commits []*vcsinfo.LongCommit) {
	for lo, hi := 0, len(commits)-1; lo < hi; lo, hi = lo+1, hi-1 {
		commits[lo], commits[hi] = commits[hi], commits[lo]
	}
}
// getLatestCommitFromRepo reports the git hash of the newest commit on the given branch, as seen
// by gitiles. When the context carries a value under overrideLatestCommitKey, that value is
// returned and gitiles is not contacted.
func getLatestCommitFromRepo(ctx context.Context, client GitilesLogger, branch string) (string, error) {
	override := ctx.Value(overrideLatestCommitKey)
	if override != nil {
		return override.(string), nil
	}

	ctx, span := trace.StartSpan(ctx, "gitilesfollower_getLatestCommitFromRepo")
	defer span.End()

	// Only the single newest commit is needed.
	newest, err := client.Log(ctx, branch, gitiles.LogLimit(1))
	if err != nil {
		return "", skerr.Wrapf(err, "getting last commit")
	}
	if len(newest) == 0 {
		return "", skerr.Fmt("No commits returned")
	}
	return newest[0].Hash, nil
}
// getPreviousCommitFromDB looks up the most recently stored commit and returns its git_hash and
// commit_id. "Most recent" means the row whose commit_id sorts last lexicographically. The
// commit_id is converted to an integer because the IDs of new commits are computed by adding to
// it. If the GitCommits table is empty, an empty hash, a zero ID and a nil error are returned.
//
// Like Perf, we only look at the newest commit in the DB and the newest commit on the tree, which
// keeps Gold resilient to merged/changed history (e.g. go/skia-infra-pm-007).
func getPreviousCommitFromDB(ctx context.Context, db *pgxpool.Pool) (string, int64, error) {
	ctx, span := trace.StartSpan(ctx, "gitilesfollower_getPreviousCommitFromDB")
	defer span.End()

	const newestCommitQuery = `SELECT git_hash, commit_id FROM GitCommits
ORDER BY commit_id DESC LIMIT 1`
	var gitHash, rawID string
	err := db.QueryRow(ctx, newestCommitQuery).Scan(&gitHash, &rawID)
	switch {
	case err == pgx.ErrNoRows:
		return "", 0, nil // No data in GitCommits
	case err != nil:
		return "", 0, skerr.Wrap(err)
	}

	commitID, err := strconv.ParseInt(rawID, 10, 64)
	if err != nil {
		return "", 0, skerr.Wrapf(err, "It is assumed that the commit ids for this type of repo tracking are ints: %q", rawID)
	}
	return gitHash, commitID, nil
}
// storeCommits upserts the given commits into the GitCommits table. Commit IDs are handed out
// sequentially, starting at lastCommitID+1, and are stored as zero-padded 12 digit strings. The
// commits slice must be ordered oldest first (the opposite of how gitiles returns it).
func storeCommits(ctx context.Context, db *pgxpool.Pool, lastCommitID int64, commits []*vcsinfo.LongCommit) error {
	ctx, span := trace.StartSpan(ctx, "gitilesfollower_storeCommits")
	defer span.End()

	const (
		// Batching mostly matters on the initial load, where it keeps a single statement
		// under the 65k limit of placeholder indexes.
		batchSize    = 1000
		valuesPerRow = 5
		statement    = `UPSERT INTO GitCommits (git_hash, commit_id, commit_time, author_email, subject) VALUES `
	)

	// nextID carries over from one batch to the next.
	nextID := lastCommitID + 1
	writeBatch := func(startIdx int, endIdx int) error {
		batch := commits[startIdx:endIdx]
		args := make([]interface{}, 0, valuesPerRow*len(batch))
		for i := range batch {
			c := batch[i]
			args = append(args, c.Hash, fmt.Sprintf("%012d", nextID), c.Timestamp, c.Author, c.Subject)
			nextID++
		}
		query := statement + sqlutil.ValuesPlaceholders(valuesPerRow, len(batch))
		_, err := db.Exec(ctx, query, args...)
		if err != nil {
			return skerr.Wrap(err)
		}
		return nil
	}
	return skerr.Wrap(util.ChunkIter(len(commits), batchSize, writeBatch))
}
// checkForLanded will check all recent commits in rfc.ReposToMonitorCLs for any references to CLs
// that have landed. Then, it starts a goroutine to continue this periodically (once per
// rfc.PollPeriod) until ctx is done.
//
// Only an error from the initial pass is returned; errors in later passes are logged and the
// remaining repos are still checked. If no repos are configured, this is a no-op.
func checkForLanded(ctx context.Context, db *pgxpool.Pool, client *http.Client, rfc repoFollowerConfig) error {
	if len(rfc.ReposToMonitorCLs) == 0 {
		sklog.Infof("No repos to monitor landed CLs")
		return nil
	}
	sklog.Infof("Doing initial check for landed CLs")

	// monitoredRepo pairs a gitiles client with the fully populated config for that repo.
	type monitoredRepo struct {
		cfg    monitorConfig
		client *gitiles.Repo
	}
	// Build our own copies of the configs with the branch filled in. The same copies are used
	// for the initial pass and for every periodic pass, so the branch is always set (and the
	// caller's slice is left untouched).
	repos := make([]monitoredRepo, 0, len(rfc.ReposToMonitorCLs))
	for _, cfg := range rfc.ReposToMonitorCLs {
		cfg.branch = rfc.GitRepoBranch
		repos = append(repos, monitoredRepo{
			cfg:    cfg,
			client: gitiles.NewRepo(cfg.RepoURL, client),
		})
	}

	for _, r := range repos {
		if err := checkForLandedCycle(ctx, db, r.client, r.cfg); err != nil {
			return skerr.Wrap(err)
		}
	}

	go func() {
		ct := time.NewTicker(rfc.PollPeriod.Duration)
		defer ct.Stop()
		sklog.Infof("Checking for landed CLs every %s", rfc.PollPeriod.Duration)
		for {
			select {
			case <-ctx.Done():
				sklog.Errorf("Stopping landed check due to context error: %s", ctx.Err())
				return
			case <-ct.C:
				for _, r := range repos {
					if err := checkForLandedCycle(ctx, db, r.client, r.cfg); err != nil {
						// Keep going so one failing repo does not block the others.
						sklog.Errorf("Error checking for landed commits with configuration %+v: %s", r.cfg, err)
					}
				}
				sklog.Infof("Checked %d repos for landed CLs", len(repos))
			}
		}
	}()
	return nil
}
// checkForLandedCycle looks for commits in the given repo that are newer than the last one we
// processed (or newer than m.InitialCommit if no progress is stored). For each such commit, oldest
// first, it extracts a CL ID using the configured technique; when one is found, the expectations
// of that CL are migrated to the primary branch and, unless the legacy updater is in use, the CL
// is marked as "landed". After all commits are handled, the newest hash is recorded in
// TrackingCommits so the next cycle starts from there.
func checkForLandedCycle(ctx context.Context, db *pgxpool.Pool, client GitilesLogger, m monitorConfig) error {
	ctx, span := trace.StartSpan(ctx, "gitilesfollower_checkForLandedCycle")
	span.AddAttributes(trace.StringAttribute("repo", m.RepoURL))
	defer span.End()

	tipHash, err := getLatestCommitFromRepo(ctx, client, m.branch)
	if err != nil {
		return skerr.Wrap(err)
	}
	lastHash, err := getPreviouslyLandedCommit(ctx, db, m.RepoURL)
	if err != nil {
		return skerr.Wrapf(err, "getting recently landed commit from DB for repo %s", m.RepoURL)
	}

	switch lastHash {
	case tipHash:
		sklog.Infof("no updates - latest seen commit %s", lastHash)
		return nil
	case "":
		// No progress stored for this repo yet.
		lastHash = m.InitialCommit
	}

	sklog.Infof("Getting git history from %s to %s", lastHash, tipHash)
	landed, err := client.LogFirstParent(ctx, lastHash, tipHash)
	if err != nil {
		return skerr.Wrapf(err, "getting backlog of commits from %s..%s", lastHash, tipHash)
	}
	if len(landed) == 0 {
		sklog.Warningf("No commits between %s and %s", lastHash, tipHash)
		return nil
	}
	// LogFirstParent hands back newest-first and ignores gitiles.LogReverse(); process the
	// commits oldest-first instead.
	reverse(landed)
	sklog.Infof("Found %d commits to check for a CL", len(landed))

	markLanded := !m.LegacyUpdaterInUse
	for _, commit := range landed {
		clID := ""
		if m.ExtractionTechnique == ReviewedLine {
			clID = extractReviewedLine(commit.Body)
		} else if m.ExtractionTechnique == FromSubject {
			clID = extractFromSubject(commit.Subject)
		}
		if clID == "" {
			sklog.Infof("No CL detected for %#v", commit)
			continue
		}
		err := migrateExpectationsToPrimaryBranch(ctx, db, m.SystemName, clID, commit.Timestamp, markLanded)
		if err != nil {
			return skerr.Wrapf(err, "migrating cl %s-%s", m.SystemName, clID)
		}
		sklog.Infof("Commit %s landed at %s", commit.Hash[:12], commit.Timestamp)
	}

	// Remember how far we got.
	_, err = db.Exec(ctx, `UPSERT INTO TrackingCommits (repo, last_git_hash) VALUES ($1, $2)`, m.RepoURL, tipHash)
	return skerr.Wrap(err)
}
// getPreviouslyLandedCommit fetches, for the given repo, the git hash of the last commit that was
// checked for a CL. An empty string (with a nil error) means nothing is stored for that repo.
func getPreviouslyLandedCommit(ctx context.Context, db *pgxpool.Pool, repoURL string) (string, error) {
	ctx, span := trace.StartSpan(ctx, "getPreviouslyLandedCommit")
	defer span.End()

	var lastHash string
	err := db.QueryRow(ctx, `SELECT last_git_hash FROM TrackingCommits WHERE repo = $1`, repoURL).Scan(&lastHash)
	switch {
	case err == pgx.ErrNoRows:
		return "", nil // No data in TrackingCommits
	case err != nil:
		return "", skerr.Wrap(err)
	}
	return lastHash, nil
}
// reviewedLineRegex matches a whole line of the form "Reviewed-on: <anything>/<clID>", where the
// CL ID is the run of non-space characters after the last slash on that line.
var reviewedLineRegex = regexp.MustCompile(`(^|\n)Reviewed-on: .+/(?P<clID>\S+?)($|\n)`)

// extractReviewedLine finds the first line beginning with "Reviewed-on" in the commit body and
// returns the CL id from it (the characters following the last slash). If there is no such line,
// the empty string is returned.
func extractReviewedLine(clBody string) string {
	// Group 1 is the line start, group 2 is the CL ID, group 3 is the line end.
	const clIDGroup = 2
	groups := reviewedLineRegex.FindStringSubmatch(clBody)
	if groups == nil {
		return ""
	}
	return groups[clIDGroup]
}
// prSuffix captures the pull request number that is assumed to sit at the very end of a commit
// Subject/Title, e.g. "Turn off docs upload temporarily (#44365) (#44413)" refers to PR 44413.
var prSuffix = regexp.MustCompile(`.+\(#(?P<id>\d+)\)\s*$`)

// extractFromSubject returns the CL (aka Pull Request) number that is appended to the given
// commit subject, or the empty string if the subject does not end with one.
func extractFromSubject(subject string) string {
	groups := prSuffix.FindStringSubmatch(subject)
	if len(groups) == 0 {
		return ""
	}
	// groups[0] is the entire match; groups[1] is the captured number.
	return groups[1]
}
// migrateExpectationsToPrimaryBranch gathers every expectation that was added for the given CL,
// condenses them into one record per user who triaged digests on that CL, and writes those
// records (with their expectations) to the primary branch. All of the records carry landedTS, the
// timestamp of the commit that landed. Afterwards, if setLanded is true, the CL's status is
// changed to "landed"; a CL that is not present in the Changelists table is not an error.
func migrateExpectationsToPrimaryBranch(ctx context.Context, db *pgxpool.Pool, crs, clID string, landedTS time.Time, setLanded bool) error {
	ctx, span := trace.StartSpan(ctx, "migrateExpectationsToPrimaryBranch")
	defer span.End()

	qID := sql.Qualify(crs, clID)
	changes, err := getExpectationChangesForCL(ctx, db, qID)
	if err != nil {
		return skerr.Wrap(err)
	}
	sklog.Infof("CL %s %s had %d expectations, which will be applied at %s", crs, clID, len(changes), landedTS)
	if err := storeChangesAsRecordDeltasExpectations(ctx, db, changes, landedTS); err != nil {
		return skerr.Wrap(err)
	}

	if !setLanded {
		sklog.Infof("Not marking CL %s %s as landed [legacy mode in use] ", crs, clID)
		return nil
	}

	// RETURNING lets us tell whether a row was updated; no row at all is tolerated.
	var updatedID string
	err = db.QueryRow(ctx, `UPDATE Changelists SET status = 'landed' WHERE changelist_id = $1 RETURNING changelist_id`, qID).Scan(&updatedID)
	if err != nil && err != pgx.ErrNoRows {
		return skerr.Wrapf(err, "Updating cl %s to be landed", qID)
	}
	return nil
}
// groupingDigest identifies a digest within a grouping. Both parts are fixed-size hashes, so the
// struct can be used as a map key (see getExpectationChangesForCL).
type groupingDigest struct {
grouping schema.MD5Hash
digest schema.MD5Hash
}
// finalState is the condensed outcome of all the triage actions on one groupingDigest for a CL.
type finalState struct {
// labelBefore is the label the digest had before the first triage action seen for it.
labelBefore schema.ExpectationLabel
// labelAfter is the label set by the last triage action seen for it.
labelAfter schema.ExpectationLabel
// userWhoTriagedLast is the user behind that last triage action.
userWhoTriagedLast string
}
// getExpectationChangesForCL gets all the expectations for the given CL and arranges them in
// temporal order. It de-duplicates any entries (e.g. triaging a digest to positive, then to
// negative, then to positive would be condensed to a single "triage to positive" action). Entries
// are "blamed" to the user who last touched the digest+grouping pair.
//
// The returned map has one entry per digest+grouping pair that was triaged on the CL. An error is
// returned if the query fails, a row cannot be scanned, or iterating over the rows is cut short.
func getExpectationChangesForCL(ctx context.Context, db *pgxpool.Pool, qualifiedCLID string) (map[groupingDigest]finalState, error) {
	ctx, span := trace.StartSpan(ctx, "getExpectationChangesForCL")
	defer span.End()
	rows, err := db.Query(ctx, `
SELECT user_name, grouping_id, digest, label_before, label_after
FROM ExpectationRecords JOIN ExpectationDeltas
ON ExpectationRecords.expectation_record_id = ExpectationDeltas.expectation_record_id
WHERE branch_name = $1
ORDER BY triage_time ASC`, qualifiedCLID)
	if err != nil {
		return nil, skerr.Wrapf(err, "Getting deltas and records for CL %s", qualifiedCLID)
	}
	defer rows.Close()
	// By using a map, we can deduplicate rows and return an object that represents the final
	// state of all the triage logic.
	rv := map[groupingDigest]finalState{}
	for rows.Next() {
		var user string
		var grouping schema.GroupingID
		var digest schema.DigestBytes
		var before schema.ExpectationLabel
		var after schema.ExpectationLabel
		if err := rows.Scan(&user, &grouping, &digest, &before, &after); err != nil {
			return nil, skerr.Wrap(err)
		}
		key := groupingDigest{
			grouping: sql.AsMD5Hash(grouping),
			digest:   sql.AsMD5Hash(digest),
		}
		fs, ok := rv[key]
		if !ok {
			// only update the label before on the first time we see a triage for a grouping.
			fs.labelBefore = before
		}
		// Rows arrive oldest first, so the last one seen for a key wins.
		fs.labelAfter = after
		fs.userWhoTriagedLast = user
		rv[key] = fs
	}
	// rows.Next() also returns false when iteration fails part way through. Without this check
	// we would return (and later migrate) an incomplete set of expectations.
	if err := rows.Err(); err != nil {
		return nil, skerr.Wrapf(err, "Reading deltas and records for CL %s", qualifiedCLID)
	}
	return rv, nil
}
// storeChangesAsRecordDeltasExpectations converts the given map of final triage states into
// ExpectationDeltas, grouped by the user who last triaged each entry. For every such user it
// writes one ExpectationRecords row (timestamped with ts) and then stores that user's deltas and
// the resulting expectations under that record. Entries whose label did not change are dropped.
func storeChangesAsRecordDeltasExpectations(ctx context.Context, db *pgxpool.Pool, changes map[groupingDigest]finalState, ts time.Time) error {
	ctx, span := trace.StartSpan(ctx, "storeChangesAsRecordDeltasExpectations")
	defer span.End()
	if len(changes) == 0 {
		return nil
	}

	// One triage record is made for each user who triaged data on this CL; together the
	// records represent the final state.
	deltasByUser := map[string][]schema.ExpectationDeltaRow{}
	for key, state := range changes {
		if state.labelBefore != state.labelAfter {
			who := state.userWhoTriagedLast
			deltasByUser[who] = append(deltasByUser[who], schema.ExpectationDeltaRow{
				GroupingID:  sql.FromMD5Hash(key.grouping),
				Digest:      sql.FromMD5Hash(key.digest),
				LabelBefore: state.labelBefore,
				LabelAfter:  state.labelAfter,
			})
		}
		// Otherwise this was a "no-op" triage: triaged one way and then undone.
	}

	for user, deltas := range deltasByUser {
		numChanges := len(deltas)
		if numChanges == 0 {
			continue
		}
		recordID := uuid.New()
		// Write the record for this user
		writeRecord := func(tx pgx.Tx) error {
			_, err := tx.Exec(ctx, `
INSERT INTO ExpectationRecords (expectation_record_id, user_name, triage_time, num_changes)
VALUES ($1, $2, $3, $4)`, recordID, user, ts, numChanges)
			return err // Don't wrap - crdbpgx might retry
		}
		if err := crdbpgx.ExecuteTx(ctx, db, pgx.TxOptions{}, writeRecord); err != nil {
			return skerr.Wrapf(err, "storing record")
		}
		if err := bulkWriteDeltas(ctx, db, recordID, deltas); err != nil {
			return skerr.Wrapf(err, "storing deltas")
		}
		if err := bulkWriteExpectations(ctx, db, recordID, deltas); err != nil {
			return skerr.Wrapf(err, "storing expectations")
		}
	}
	return nil
}
// bulkWriteDeltas inserts the given deltas into the ExpectationDeltas table in batches, each
// batch in its own transaction. Every row is attributed to the provided record id.
func bulkWriteDeltas(ctx context.Context, db *pgxpool.Pool, recordID uuid.UUID, deltas []schema.ExpectationDeltaRow) error {
	ctx, span := trace.StartSpan(ctx, "bulkWriteDeltas")
	defer span.End()

	const (
		chunkSize    = 200 // Arbitrarily picked
		valuesPerRow = 5
		insertPrefix = `INSERT INTO ExpectationDeltas (expectation_record_id, grouping_id, digest,
label_before, label_after) VALUES `
	)

	writeBatch := func(startIdx int, endIdx int) error {
		// Stop early if the context was cancelled between batches.
		if ctxErr := ctx.Err(); ctxErr != nil {
			return ctxErr
		}
		batch := deltas[startIdx:endIdx]
		if len(batch) == 0 {
			return nil
		}
		query := insertPrefix + sqlutil.ValuesPlaceholders(valuesPerRow, len(batch))
		args := make([]interface{}, 0, valuesPerRow*len(batch))
		for i := range batch {
			d := batch[i]
			args = append(args, recordID, d.GroupingID, d.Digest, d.LabelBefore, d.LabelAfter)
		}
		txErr := crdbpgx.ExecuteTx(ctx, db, pgx.TxOptions{}, func(tx pgx.Tx) error {
			_, err := tx.Exec(ctx, query, args...)
			return err // Don't wrap - crdbpgx might retry
		})
		return skerr.Wrap(txErr)
	}

	if err := util.ChunkIter(len(deltas), chunkSize, writeBatch); err != nil {
		return skerr.Wrapf(err, "storing %d expectation delta rows", len(deltas))
	}
	return nil
}
// bulkWriteExpectations upserts one Expectations row per delta (using the delta's LabelAfter as
// the label) in batches, each batch in its own transaction. Every row is attributed to the
// provided record id.
func bulkWriteExpectations(ctx context.Context, db *pgxpool.Pool, recordID uuid.UUID, deltas []schema.ExpectationDeltaRow) error {
	ctx, span := trace.StartSpan(ctx, "bulkWriteExpectations")
	defer span.End()

	const (
		chunkSize    = 200 // Arbitrarily picked
		valuesPerRow = 4
		upsertPrefix = `UPSERT INTO Expectations (grouping_id, digest, label, expectation_record_id) VALUES `
	)

	writeBatch := func(startIdx int, endIdx int) error {
		// Stop early if the context was cancelled between batches.
		if ctxErr := ctx.Err(); ctxErr != nil {
			return ctxErr
		}
		batch := deltas[startIdx:endIdx]
		if len(batch) == 0 {
			return nil
		}
		query := upsertPrefix + sqlutil.ValuesPlaceholders(valuesPerRow, len(batch))
		args := make([]interface{}, 0, valuesPerRow*len(batch))
		for i := range batch {
			d := batch[i]
			args = append(args, d.GroupingID, d.Digest, d.LabelAfter, recordID)
		}
		txErr := crdbpgx.ExecuteTx(ctx, db, pgx.TxOptions{}, func(tx pgx.Tx) error {
			_, err := tx.Exec(ctx, query, args...)
			return err // Don't wrap - crdbpgx might retry
		})
		return skerr.Wrap(txErr)
	}

	if err := util.ChunkIter(len(deltas), chunkSize, writeBatch); err != nil {
		return skerr.Wrapf(err, "storing %d expectation rows", len(deltas))
	}
	return nil
}