| package main |
| |
| import ( |
| "bytes" |
| "context" |
| "flag" |
| "io/ioutil" |
| "net/http" |
| "strings" |
| "time" |
| |
| "github.com/cockroachdb/cockroach-go/v2/crdb/crdbpgx" |
| "github.com/jackc/pgtype" |
| "github.com/jackc/pgx/v4" |
| "github.com/jackc/pgx/v4/pgxpool" |
| "go.opencensus.io/trace" |
| "golang.org/x/oauth2" |
| "golang.org/x/oauth2/google" |
| |
| "go.skia.org/infra/go/auth" |
| "go.skia.org/infra/go/common" |
| "go.skia.org/infra/go/gerrit" |
| "go.skia.org/infra/go/httputils" |
| "go.skia.org/infra/go/metrics2" |
| "go.skia.org/infra/go/now" |
| "go.skia.org/infra/go/skerr" |
| "go.skia.org/infra/go/sklog" |
| "go.skia.org/infra/go/util" |
| "go.skia.org/infra/golden/go/code_review" |
| "go.skia.org/infra/golden/go/code_review/commenter" |
| "go.skia.org/infra/golden/go/code_review/gerrit_crs" |
| "go.skia.org/infra/golden/go/code_review/github_crs" |
| "go.skia.org/infra/golden/go/config" |
| "go.skia.org/infra/golden/go/ignore/sqlignorestore" |
| "go.skia.org/infra/golden/go/sql" |
| "go.skia.org/infra/golden/go/sql/schema" |
| "go.skia.org/infra/golden/go/storage" |
| "go.skia.org/infra/golden/go/tracing" |
| "go.skia.org/infra/golden/go/types" |
| ) |
| |
const (
	// maxSQLConnections caps the size of the SQL connection pool.
	// Arbitrary number; this service does little concurrent DB work.
	maxSQLConnections = 4

	// clScanRange is how far back to look for recently updated changelists
	// on the first scan after startup (see diffWorkGatherer.mostRecentCLScan).
	clScanRange = 5 * 24 * time.Hour
)
| |
| // TODO(kjlubick) Add a task to check for abandoned CLs. |
| |
// periodicTasksConfig is the JSON5-loadable configuration for the periodic
// tasks server. It embeds the instance-wide common configuration.
type periodicTasksConfig struct {
	config.Common

	// ChangelistDiffPeriod is how often to look at recently updated CLs and tabulate the diffs
	// for the digests produced.
	// The diffs are not calculated in this service, but the tasks are generated here and
	// processed in the diffcalculator process.
	ChangelistDiffPeriod config.Duration `json:"changelist_diff_period"`

	// CLCommentTemplate is a string with placeholders for generating a comment message. See
	// commenter.commentTemplateContext for the exact fields.
	CLCommentTemplate string `json:"cl_comment_template" optional:"true"`

	// CommentOnCLsPeriod, if positive, is how often to check recent CLs and Patchsets for
	// untriaged digests and comment on them if appropriate.
	CommentOnCLsPeriod config.Duration `json:"comment_on_cls_period" optional:"true"`

	// PrimaryBranchDiffPeriod is how often to look at the most recent window of commits and
	// tabulate diffs between all groupings based on the digests produced on the primary branch.
	// The diffs are not calculated in this service, but sent via Pub/Sub to the appropriate workers.
	PrimaryBranchDiffPeriod config.Duration `json:"primary_branch_diff_period"`

	// UpdateIgnorePeriod is how often we should try to apply the ignore rules to all traces.
	UpdateIgnorePeriod config.Duration `json:"update_traces_ignore_period"` // TODO(kjlubick) change JSON
}
| |
// main wires up configuration, logging, tracing, the SQL connection pool, and
// then starts each periodic background task before blocking on the health
// endpoint's HTTP server. The startup order matters: flags must be parsed
// before config is loaded, and logging/tracing must be initialized before any
// task goroutines start.
func main() {
	// Command line flags.
	var (
		commonInstanceConfig = flag.String("common_instance_config", "", "Path to the json5 file containing the configuration that needs to be the same across all services for a given instance.")
		thisConfig           = flag.String("config", "", "Path to the json5 file containing the configuration specific to the periodic tasks server.")
		hang                 = flag.Bool("hang", false, "Stop and do nothing after reading the flags. Good for debugging containers.")
	)

	// Parse the options. So we can configure logging.
	flag.Parse()

	if *hang {
		sklog.Info("Hanging")
		// Block forever; useful for exec-ing into a container for debugging.
		select {}
	}

	var ptc periodicTasksConfig
	if err := config.LoadFromJSON5(&ptc, commonInstanceConfig, thisConfig); err != nil {
		sklog.Fatalf("Reading config: %s", err)
	}
	sklog.Infof("Loaded config %#v", ptc)

	// Set up the logging options.
	logOpts := []common.Opt{
		common.PrometheusOpt(&ptc.PromPort),
	}

	// Default to sampling 1% of traces unless the config overrides it.
	tp := 0.01
	if ptc.TracingProportion > 0 {
		tp = ptc.TracingProportion
	}
	common.InitWithMust("periodictasks", logOpts...)
	if err := tracing.Initialize(tp, ptc.SQLDatabaseName); err != nil {
		sklog.Fatalf("Could not set up tracing: %s", err)
	}

	ctx := context.Background()
	db := mustInitSQLDatabase(ctx, ptc)

	startUpdateTracesIgnoreStatus(ctx, db, ptc)

	startCommentOnCLs(ctx, db, ptc)

	// The gatherer is shared by the primary-branch and CL diff-work tasks.
	// mostRecentCLScan starts clScanRange in the past so the first CL scan
	// picks up several days of recently-updated CLs.
	gatherer := &diffWorkGatherer{
		db:               db,
		windowSize:       ptc.WindowSize,
		mostRecentCLScan: now.Now(ctx).Add(-clScanRange),
	}
	startPrimaryBranchDiffWork(ctx, gatherer, ptc)
	startChangelistsDiffWork(ctx, gatherer, ptc)
	startDiffWorkMetrics(ctx, db)
	startBackupStatusCheck(ctx, db, ptc)
	startKnownDigestsSync(ctx, db, ptc)

	sklog.Infof("periodic tasks have been started")
	// Serve only the health check; this process has no other HTTP API.
	http.HandleFunc("/healthz", httputils.ReadyHandleFunc)
	sklog.Fatal(http.ListenAndServe(ptc.ReadyPort, nil))
}
| |
| func mustInitSQLDatabase(ctx context.Context, ptc periodicTasksConfig) *pgxpool.Pool { |
| if ptc.SQLDatabaseName == "" { |
| sklog.Fatalf("Must have SQL Database Information") |
| } |
| url := sql.GetConnectionURL(ptc.SQLConnection, ptc.SQLDatabaseName) |
| conf, err := pgxpool.ParseConfig(url) |
| if err != nil { |
| sklog.Fatalf("error getting postgres config %s: %s", url, err) |
| } |
| |
| conf.MaxConns = maxSQLConnections |
| db, err := pgxpool.ConnectConfig(ctx, conf) |
| if err != nil { |
| sklog.Fatalf("error connecting to the database: %s", err) |
| } |
| sklog.Infof("Connected to SQL database %s", ptc.SQLDatabaseName) |
| return db |
| } |
| |
| func startUpdateTracesIgnoreStatus(ctx context.Context, db *pgxpool.Pool, ptc periodicTasksConfig) { |
| liveness := metrics2.NewLiveness("periodic_tasks", map[string]string{ |
| "task": "updateTracesIgnoreStatus", |
| }) |
| go util.RepeatCtx(ctx, ptc.UpdateIgnorePeriod.Duration, func(ctx context.Context) { |
| sklog.Infof("Updating traces and values at head with ignore status") |
| ctx, span := trace.StartSpan(ctx, "periodic_updateTracesIgnoreStatus") |
| defer span.End() |
| if err := sqlignorestore.UpdateIgnoredTraces(ctx, db); err != nil { |
| sklog.Errorf("Error while updating traces ignore status: %s", err) |
| return // return so the liveness is not updated |
| } |
| liveness.Reset() |
| sklog.Infof("Done with updateTracesIgnoreStatus") |
| }) |
| } |
| |
| func startCommentOnCLs(ctx context.Context, db *pgxpool.Pool, ptc periodicTasksConfig) { |
| if ptc.CommentOnCLsPeriod.Duration <= 0 { |
| sklog.Infof("Not commenting on CLs because duration was zero.") |
| return |
| } |
| systems := mustInitializeSystems(ctx, ptc) |
| cmntr, err := commenter.New(db, systems, ptc.CLCommentTemplate, ptc.SiteURL, ptc.WindowSize) |
| if err != nil { |
| sklog.Fatalf("Could not initialize commenting: %s", err) |
| } |
| liveness := metrics2.NewLiveness("periodic_tasks", map[string]string{ |
| "task": "commentOnCLs", |
| }) |
| go util.RepeatCtx(ctx, ptc.CommentOnCLsPeriod.Duration, func(ctx context.Context) { |
| sklog.Infof("Checking CLs for untriaged results and commenting if necessary") |
| ctx, span := trace.StartSpan(ctx, "periodic_commentOnCLsWithUntriagedDigests") |
| defer span.End() |
| if err := cmntr.CommentOnChangelistsWithUntriagedDigests(ctx); err != nil { |
| sklog.Errorf("Error while commenting on CLs: %s", err) |
| return // return so the liveness is not updated |
| } |
| liveness.Reset() |
| sklog.Infof("Done checking on CLs to comment") |
| }) |
| } |
| |
| // mustInitializeSystems creates code_review.Clients and returns them wrapped as a ReviewSystem. |
| // It panics if any part of configuration fails. |
| func mustInitializeSystems(ctx context.Context, ptc periodicTasksConfig) []commenter.ReviewSystem { |
| tokenSource, err := google.DefaultTokenSource(ctx, auth.ScopeGerrit) |
| if err != nil { |
| sklog.Fatalf("Failed to authenticate service account: %s", err) |
| } |
| gerritHTTPClient := httputils.DefaultClientConfig().WithTokenSource(tokenSource).Client() |
| rv := make([]commenter.ReviewSystem, 0, len(ptc.CodeReviewSystems)) |
| for _, cfg := range ptc.CodeReviewSystems { |
| var crs code_review.Client |
| if cfg.Flavor == "gerrit" { |
| if cfg.GerritURL == "" { |
| sklog.Fatal("You must specify gerrit_url") |
| } |
| gerritClient, err := gerrit.NewGerrit(cfg.GerritURL, gerritHTTPClient) |
| if err != nil { |
| sklog.Fatalf("Could not create gerrit client for %s", cfg.GerritURL) |
| } |
| crs = gerrit_crs.New(gerritClient) |
| } else if cfg.Flavor == "github" { |
| if cfg.GitHubRepo == "" || cfg.GitHubCredPath == "" { |
| sklog.Fatal("You must specify github_repo and github_cred_path") |
| } |
| gBody, err := ioutil.ReadFile(cfg.GitHubCredPath) |
| if err != nil { |
| sklog.Fatalf("Couldn't find githubToken in %s: %s", cfg.GitHubCredPath, err) |
| } |
| gToken := strings.TrimSpace(string(gBody)) |
| githubTS := oauth2.StaticTokenSource(&oauth2.Token{AccessToken: gToken}) |
| c := httputils.DefaultClientConfig().With2xxOnly().WithTokenSource(githubTS).Client() |
| crs = github_crs.New(c, cfg.GitHubRepo) |
| } else { |
| sklog.Fatalf("CRS flavor %s not supported.", cfg.Flavor) |
| } |
| rv = append(rv, commenter.ReviewSystem{ |
| ID: cfg.ID, |
| Client: crs, |
| }) |
| } |
| return rv |
| } |
| |
// diffWorkGatherer generates rows of diff-calculation work in the SQL DB for
// the diffcalculator process to consume. It is shared by the primary-branch
// and changelist periodic tasks.
type diffWorkGatherer struct {
	// db is the SQL connection pool used for all queries and inserts.
	db *pgxpool.Pool
	// windowSize is the number of recent commits comprising the sliding window.
	windowSize int

	// mostRecentCLScan is the timestamp of the last completed CL scan; only
	// CLs with data ingested after this are re-examined each cycle.
	mostRecentCLScan time.Time
}
| |
| // startPrimaryBranchDiffWork starts the process that periodically creates rows in the SQL DB for |
| // diff workers to calculate diffs for images on the primary branch. |
| func startPrimaryBranchDiffWork(ctx context.Context, gatherer *diffWorkGatherer, ptc periodicTasksConfig) { |
| liveness := metrics2.NewLiveness("periodic_tasks", map[string]string{ |
| "task": "calculatePrimaryBranchDiffWork", |
| }) |
| go util.RepeatCtx(ctx, ptc.PrimaryBranchDiffPeriod.Duration, func(ctx context.Context) { |
| sklog.Infof("Calculating diffs for images seen recently") |
| ctx, span := trace.StartSpan(ctx, "periodic_PrimaryBranchDiffWork") |
| defer span.End() |
| if err := gatherer.gatherFromPrimaryBranch(ctx); err != nil { |
| sklog.Errorf("Error while gathering diff work on primary branch: %s", err) |
| return // return so the liveness is not updated |
| } |
| liveness.Reset() |
| sklog.Infof("Done with sending diffs from primary branch") |
| }) |
| } |
| |
| // gatherFromPrimaryBranch finds all groupings that have recent data on the primary branch and |
| // makes sure a row exists in the SQL DB for each of them. |
| func (g *diffWorkGatherer) gatherFromPrimaryBranch(ctx context.Context) error { |
| ctx, span := trace.StartSpan(ctx, "gatherFromPrimaryBranch") |
| defer span.End() |
| // Doing the get/join/insert all in 1 transaction did not work when there are many groupings |
| // and many diffcalculator processes - too much contention. |
| groupingsInWindow, err := g.getDistinctGroupingsInWindow(ctx) |
| if err != nil { |
| return skerr.Wrap(err) |
| } |
| alreadyProcessedGroupings, err := g.getGroupingsBeingProcessed(ctx) |
| if err != nil { |
| return skerr.Wrap(err) |
| } |
| apg := map[string]bool{} |
| for _, g := range alreadyProcessedGroupings { |
| apg[string(g)] = true |
| } |
| // TODO(kjlubick) periodically remove groupings that are not at HEAD anymore. |
| var newGroupings []schema.GroupingID |
| for _, g := range groupingsInWindow { |
| if !apg[string(g)] { |
| newGroupings = append(newGroupings, g) |
| } |
| } |
| sklog.Infof("There are currently %d groupings in the window and %d groupings being processed for diffs. This cycle, there were %d new groupings detected.", |
| len(groupingsInWindow), len(alreadyProcessedGroupings), len(newGroupings)) |
| |
| if len(newGroupings) == 0 { |
| return nil |
| } |
| |
| return skerr.Wrap(g.addNewGroupingsForProcessing(ctx, newGroupings)) |
| } |
| |
| // getDistinctGroupingsInWindow returns the distinct grouping ids seen within the current window. |
| func (g *diffWorkGatherer) getDistinctGroupingsInWindow(ctx context.Context) ([]schema.GroupingID, error) { |
| ctx, span := trace.StartSpan(ctx, "getDistinctGroupingsInWindow") |
| defer span.End() |
| |
| const statement = `WITH |
| RecentCommits AS ( |
| SELECT commit_id FROM CommitsWithData |
| ORDER BY commit_id DESC LIMIT $1 |
| ), |
| FirstCommitInWindow AS ( |
| SELECT commit_id FROM RecentCommits |
| ORDER BY commit_id ASC LIMIT 1 |
| ) |
| SELECT DISTINCT grouping_id FROM ValuesAtHead |
| JOIN FirstCommitInWindow ON ValuesAtHead.most_recent_commit_id >= FirstCommitInWindow.commit_id` |
| |
| rows, err := g.db.Query(ctx, statement, g.windowSize) |
| if err != nil { |
| return nil, skerr.Wrap(err) |
| } |
| defer rows.Close() |
| var rv []schema.GroupingID |
| for rows.Next() { |
| var id schema.GroupingID |
| if err := rows.Scan(&id); err != nil { |
| return nil, skerr.Wrap(err) |
| } |
| rv = append(rv, id) |
| } |
| return rv, nil |
| } |
| |
| // getGroupingsBeingProcessed returns all groupings that we are currently computing diffs for. |
| func (g *diffWorkGatherer) getGroupingsBeingProcessed(ctx context.Context) ([]schema.GroupingID, error) { |
| ctx, span := trace.StartSpan(ctx, "getGroupingsBeingProcessed") |
| defer span.End() |
| |
| const statement = `SELECT DISTINCT grouping_id FROM PrimaryBranchDiffCalculationWork |
| AS OF SYSTEM TIME '-0.1s'` |
| |
| rows, err := g.db.Query(ctx, statement) |
| if err != nil { |
| return nil, skerr.Wrap(err) |
| } |
| defer rows.Close() |
| var rv []schema.GroupingID |
| for rows.Next() { |
| var id schema.GroupingID |
| if err := rows.Scan(&id); err != nil { |
| return nil, skerr.Wrap(err) |
| } |
| rv = append(rv, id) |
| } |
| return rv, nil |
| } |
| |
| // addNewGroupingsForProcessing updates the PrimaryBranchDiffCalculationWork table with the newly |
| // provided groupings, such that we will start computing diffs for them. This table is potentially |
| // under a lot of contention. We try to write some sentinel values, but if there are already values |
| // there, we will bail out. |
| func (g *diffWorkGatherer) addNewGroupingsForProcessing(ctx context.Context, groupings []schema.GroupingID) error { |
| ctx, span := trace.StartSpan(ctx, "addNewGroupingsForProcessing") |
| defer span.End() |
| span.AddAttributes(trace.Int64Attribute("num_groupings", int64(len(groupings)))) |
| statement := `INSERT INTO PrimaryBranchDiffCalculationWork (grouping_id, last_calculated_ts, calculation_lease_ends) VALUES` |
| const valuesPerRow = 3 |
| vp := sql.ValuesPlaceholders(valuesPerRow, len(groupings)) |
| statement = statement + vp + ` ON CONFLICT DO NOTHING` |
| args := make([]interface{}, 0, valuesPerRow*len(groupings)) |
| // This time will make sure we compute diffs for this soon. |
| beginningOfTime := time.Date(1970, time.January, 1, 0, 0, 0, 0, time.UTC) |
| for _, g := range groupings { |
| args = append(args, g, beginningOfTime, beginningOfTime) |
| } |
| |
| err := crdbpgx.ExecuteTx(ctx, g.db, pgx.TxOptions{}, func(tx pgx.Tx) error { |
| _, err := g.db.Exec(ctx, statement, args...) |
| return err // may be retried |
| }) |
| return skerr.Wrap(err) |
| } |
| |
| // startChangelistsDiffWork starts the process that periodically creates rows in the SQL DB for |
| ///diff workers to calculate diffs for images produced by CLs. |
| func startChangelistsDiffWork(ctx context.Context, gatherer *diffWorkGatherer, ptc periodicTasksConfig) { |
| liveness := metrics2.NewLiveness("periodic_tasks", map[string]string{ |
| "task": "calculateChangelistsDiffWork", |
| }) |
| go util.RepeatCtx(ctx, ptc.PrimaryBranchDiffPeriod.Duration, func(ctx context.Context) { |
| sklog.Infof("Calculating diffs for images produced on CLs") |
| ctx, span := trace.StartSpan(ctx, "periodic_ChangelistsDiffWork") |
| defer span.End() |
| if err := gatherer.gatherFromChangelists(ctx); err != nil { |
| sklog.Errorf("Error while gathering diff work on CLs: %s", err) |
| return // return so the liveness is not updated |
| } |
| liveness.Reset() |
| sklog.Infof("Done with sending diffs from CLs") |
| }) |
| } |
| |
| // gatherFromChangelists scans all recently updated CLs and creates a row in the SQL DB for each |
| // grouping and CL that saw images not already on the primary branch. |
| func (g *diffWorkGatherer) gatherFromChangelists(ctx context.Context) error { |
| ctx, span := trace.StartSpan(ctx, "gatherFromChangelists") |
| defer span.End() |
| firstTileIDInWindow, err := g.getFirstTileIDInWindow(ctx) |
| if err != nil { |
| return skerr.Wrap(err) |
| } |
| updatedTS := now.Now(ctx) |
| // Check all changelists updated since last cycle (or 5 days, initially). |
| clIDs, err := g.getRecentlyUpdatedChangelists(ctx) |
| if err != nil { |
| return skerr.Wrap(err) |
| } |
| span.AddAttributes(trace.Int64Attribute("num_cls", int64(len(clIDs)))) |
| if len(clIDs) != 0 { |
| for _, cl := range clIDs { |
| sklog.Debugf("Creating diff work for CL %s", cl) |
| if err := g.createDiffRowsForCL(ctx, firstTileIDInWindow, cl); err != nil { |
| return skerr.Wrap(err) |
| } |
| } |
| } |
| g.mostRecentCLScan = updatedTS |
| if err := g.deleteOldCLDiffWork(ctx); err != nil { |
| return skerr.Wrap(err) |
| } |
| return nil |
| } |
| |
| // getFirstTileIDInWindow returns the first tile in the current sliding window of commits. |
| func (g *diffWorkGatherer) getFirstTileIDInWindow(ctx context.Context) (schema.TileID, error) { |
| ctx, span := trace.StartSpan(ctx, "getFirstTileIDInWindow") |
| defer span.End() |
| const statement = `WITH |
| RecentCommits AS ( |
| SELECT tile_id, commit_id FROM CommitsWithData |
| ORDER BY commit_id DESC LIMIT $1 |
| ) |
| SELECT tile_id FROM RecentCommits ORDER BY commit_id ASC LIMIT 1 |
| ` |
| row := g.db.QueryRow(ctx, statement, g.windowSize) |
| var id schema.TileID |
| if err := row.Scan(&id); err != nil { |
| return -1, skerr.Wrap(err) |
| } |
| return id, nil |
| } |
| |
| // getRecentlyUpdatedChangelists returns the qualified IDs of all CLs that were updated after |
| // the most recent CL scan. |
| func (g *diffWorkGatherer) getRecentlyUpdatedChangelists(ctx context.Context) ([]string, error) { |
| ctx, span := trace.StartSpan(ctx, "getRecentlyUpdatedChangelists") |
| defer span.End() |
| const statement = ` |
| SELECT changelist_id FROM Changelists WHERE last_ingested_data >= $1 |
| ` |
| rows, err := g.db.Query(ctx, statement, g.mostRecentCLScan) |
| if err != nil { |
| return nil, skerr.Wrap(err) |
| } |
| defer rows.Close() |
| var ids []string |
| for rows.Next() { |
| var id string |
| if err := rows.Scan(&id); err != nil { |
| return nil, skerr.Wrap(err) |
| } |
| ids = append(ids, id) |
| } |
| return ids, nil |
| } |
| |
| // createDiffRowsForCL finds all data produced by this CL and compares it against the data on the |
| // primary branch. If any digest is not already on the primary branch (in the sliding window), it |
| // will be gathered up into one row in the SQL DB for diff calculation |
| func (g *diffWorkGatherer) createDiffRowsForCL(ctx context.Context, startingTile schema.TileID, cl string) error { |
| ctx, span := trace.StartSpan(ctx, "createDiffRowsForCL") |
| defer span.End() |
| span.AddAttributes(trace.StringAttribute("CL", cl)) |
| row := g.db.QueryRow(ctx, `SELECT last_ingested_data FROM ChangeLists WHERE changelist_id = $1`, cl) |
| var updatedTS time.Time |
| if err := row.Scan(&updatedTS); err != nil { |
| return skerr.Wrap(err) |
| } |
| updatedTS = updatedTS.UTC() |
| |
| const statement = `WITH |
| DataForCL AS ( |
| SELECT DISTINCT grouping_id, digest FROM SecondaryBranchValues |
| WHERE branch_name = $1 |
| ), |
| DigestsNotOnPrimaryBranch AS ( |
| -- We do a left join and check for null to pull only those digests that are not already |
| -- on the primary branch. Those new digests we have to include in our diff calculations. |
| SELECT DISTINCT DataForCL.grouping_id, DataForCL.digest FROM DataForCL |
| LEFT JOIN TiledTraceDigests ON DataForCL.grouping_id = TiledTraceDigests.grouping_id |
| AND DataForCL.digest = TiledTraceDigests.digest |
| AND TiledTraceDigests.tile_id >= $2 |
| WHERE TiledTraceDigests.digest IS NULL |
| ) |
| SELECT Groupings.grouping_id, encode(DigestsNotOnPrimaryBranch.digest, 'hex') FROM DigestsNotOnPrimaryBranch |
| JOIN Groupings ON DigestsNotOnPrimaryBranch.grouping_id = Groupings.grouping_id |
| ORDER BY 1, 2 |
| ` |
| rows, err := g.db.Query(ctx, statement, cl, startingTile) |
| if err != nil { |
| return skerr.Wrap(err) |
| } |
| defer rows.Close() |
| var newDigests []types.Digest |
| var currentGroupingID schema.GroupingID |
| var workRows []schema.SecondaryBranchDiffCalculationRow |
| for rows.Next() { |
| var digest types.Digest |
| var groupingID schema.GroupingID |
| if err := rows.Scan(&groupingID, &digest); err != nil { |
| return skerr.Wrap(err) |
| } |
| if currentGroupingID == nil { |
| currentGroupingID = groupingID |
| } |
| if bytes.Equal(currentGroupingID, groupingID) { |
| newDigests = append(newDigests, digest) |
| } else { |
| workRows = append(workRows, schema.SecondaryBranchDiffCalculationRow{ |
| BranchName: cl, |
| GroupingID: currentGroupingID, |
| LastUpdated: updatedTS, |
| DigestsNotOnPrimary: newDigests, |
| }) |
| currentGroupingID = groupingID |
| // Reset newDigests to be empty and then start adding the new digests to it. |
| newDigests = []types.Digest{digest} |
| } |
| } |
| rows.Close() |
| if currentGroupingID == nil { |
| return nil // nothing to report |
| } |
| workRows = append(workRows, schema.SecondaryBranchDiffCalculationRow{ |
| BranchName: cl, |
| GroupingID: currentGroupingID, |
| LastUpdated: updatedTS, |
| DigestsNotOnPrimary: newDigests, |
| }) |
| |
| insertStatement := `INSERT INTO SecondaryBranchDiffCalculationWork |
| (branch_name, grouping_id, last_updated_ts, digests, last_calculated_ts, calculation_lease_ends) VALUES ` |
| const valuesPerRow = 6 |
| vp := sql.ValuesPlaceholders(valuesPerRow, len(workRows)) |
| insertStatement += vp |
| insertStatement += ` ON CONFLICT (branch_name, grouping_id) |
| DO UPDATE SET (last_updated_ts, digests) = (excluded.last_updated_ts, excluded.digests);` |
| |
| arguments := make([]interface{}, 0, valuesPerRow*len(workRows)) |
| epoch := time.Date(1970, time.January, 1, 0, 0, 0, 0, time.UTC) |
| for _, wr := range workRows { |
| arguments = append(arguments, wr.BranchName, wr.GroupingID, wr.LastUpdated, wr.DigestsNotOnPrimary, |
| epoch, epoch) |
| } |
| err = crdbpgx.ExecuteTx(ctx, g.db, pgx.TxOptions{}, func(tx pgx.Tx) error { |
| _, err := g.db.Exec(ctx, insertStatement, arguments...) |
| return err // may be retried |
| }) |
| return skerr.Wrap(err) |
| } |
| |
| // deleteOldCLDiffWork deletes rows in the SQL DB that are "too old", that is, they belong to CLs |
| // that are several days old, beyond the clScanRange. This helps keep the number of rows down |
| // in that Table. |
| func (g *diffWorkGatherer) deleteOldCLDiffWork(ctx context.Context) error { |
| ctx, span := trace.StartSpan(ctx, "createDiffRowsForCL") |
| defer span.End() |
| |
| const statement = `DELETE FROM SecondaryBranchDiffCalculationWork WHERE last_updated_ts < $1` |
| err := crdbpgx.ExecuteTx(ctx, g.db, pgx.TxOptions{}, func(tx pgx.Tx) error { |
| cutoff := now.Now(ctx).Add(-clScanRange) |
| _, err := g.db.Exec(ctx, statement, cutoff) |
| return err // may be retried |
| }) |
| return skerr.Wrap(err) |
| } |
| |
// startDiffWorkMetrics continuously reports how much uncalculated work there is
// (queue size and staleness) for the diffcalculator processes, polling the two
// work tables once per minute until ctx is cancelled.
func startDiffWorkMetrics(ctx context.Context, db *pgxpool.Pool) {
	go func() {
		const queueSize = "diffcalculator_workqueuesize"
		const queueFreshness = "diffcalculator_workqueuefreshness"

		for range time.Tick(time.Minute) {
			// Stop polling once the context is cancelled.
			if err := ctx.Err(); err != nil {
				return
			}
			// These time values come from diffcalculator
			const primarySizeStatement = `SELECT COUNT(*) FROM PrimaryBranchDiffCalculationWork
WHERE (now() - last_calculated_ts) > '1m' AND (now() - calculation_lease_ends) > '10m'`
			row := db.QueryRow(ctx, primarySizeStatement)
			var primarySize int64
			if err := row.Scan(&primarySize); err != nil {
				sklog.Warningf("Could not compute queue size for primary branch: %s", err)
				// -1 is the sentinel for "could not compute" in the metric.
				primarySize = -1
			}
			metrics2.GetInt64Metric(queueSize, map[string]string{"branch": "primary"}).Update(primarySize)

			const secondaryBranchStatement = `SELECT COUNT(*) FROM SecondaryBranchDiffCalculationWork
WHERE last_updated_ts > last_calculated_ts AND (now() - calculation_lease_ends) > '10m'`
			row = db.QueryRow(ctx, secondaryBranchStatement)
			var secondarySize int64
			if err := row.Scan(&secondarySize); err != nil {
				sklog.Warningf("Could not compute queue size for secondary branch: %s", err)
				secondarySize = -1
			}
			metrics2.GetInt64Metric(queueSize, map[string]string{"branch": "secondary"}).Update(secondarySize)

			// Freshness (avg/max age of last calculation) for the primary branch.
			// NOTE(review): no freshness metric is reported for the secondary
			// branch even though queueFreshness is tagged by branch — confirm
			// whether that omission is intentional.
			const primaryFreshnessStatement = `SELECT AVG(now() - last_calculated_ts), MAX(now() - last_calculated_ts)
FROM PrimaryBranchDiffCalculationWork`
			row = db.QueryRow(ctx, primaryFreshnessStatement)
			var primaryAvgFreshness time.Duration
			var primaryMaxFreshness time.Duration
			if err := row.Scan(&primaryAvgFreshness, &primaryMaxFreshness); err != nil {
				sklog.Warningf("Could not compute diffwork freshness for primary branch: %s", err)
				primaryAvgFreshness = -1
				primaryMaxFreshness = -1
			}
			// Report freshness in whole seconds.
			metrics2.GetInt64Metric(queueFreshness, map[string]string{"branch": "primary", "stat": "avg"}).
				Update(int64(primaryAvgFreshness / time.Second))
			metrics2.GetInt64Metric(queueFreshness, map[string]string{"branch": "primary", "stat": "max"}).
				Update(int64(primaryMaxFreshness / time.Second))

		}
	}()
}
| |
| // startBackupStatusCheck repeatedly scans the Backup Schedules to see their status. If there are |
| // not 3 schedules, each with a success, this raises an error. The schedules are created via |
| // //golden/cmd/sqlinit. If the tables change, those will need to be re-created. |
| // https://www.cockroachlabs.com/docs/stable/create-schedule-for-backup.html |
| func startBackupStatusCheck(ctx context.Context, db *pgxpool.Pool, ptc periodicTasksConfig) { |
| go func() { |
| const backupError = "periodictasks_backup_error" |
| backupMetric := metrics2.GetInt64Metric(backupError, map[string]string{"database": ptc.SQLDatabaseName}) |
| backupMetric.Update(0) |
| |
| for range time.Tick(time.Hour) { |
| if err := ctx.Err(); err != nil { |
| return |
| } |
| statement := `SELECT id, label, state FROM [SHOW SCHEDULES] WHERE label LIKE '` + |
| ptc.SQLDatabaseName + `\_%'` |
| rows, err := db.Query(ctx, statement) |
| if err != nil { |
| sklog.Errorf("Could not check backup schedules: %s", err) |
| backupMetric.Update(1) |
| return |
| } |
| |
| hadFailure := false |
| totalBackups := 0 |
| for rows.Next() { |
| var id int64 |
| var label string |
| var state pgtype.Text |
| if err := rows.Scan(&id, &label, &state); err != nil { |
| sklog.Errorf("Could not scan backup results: %s", err) |
| backupMetric.Update(1) |
| return |
| } |
| totalBackups++ |
| // Example errors: |
| // reschedule: failed to create job for schedule 623934079889145857: err=executing schedule 623934079889145857: failed to resolve targets specified in the BACKUP stmt: table "crostastdev.commits" does not exist |
| // reschedule: failed to create job for schedule 623934084168056833: err=executing schedule 623934084168056833: Get https://storage.googleapis.com/skia-gold-sql-backups/crostastdev/monthly/2021/09/07-042100.00/BACKUP_MANIFEST: oauth2: cannot fetch token: 400 Bad Request |
| if strings.Contains(state.String, "reschedule") || |
| strings.Contains(state.String, "failed") || |
| strings.Contains(state.String, "err") { |
| hadFailure = true |
| sklog.Errorf("Backup Error for %s (%d) - %s", label, id, state.String) |
| } |
| } |
| rows.Close() |
| if totalBackups != 3 { |
| sklog.Errorf("Expected to see 3 backup schedules (daily, weekly, monthly), but instead saw %d", hadFailure) |
| hadFailure = true |
| } |
| if hadFailure { |
| backupMetric.Update(1) |
| } else { |
| backupMetric.Update(0) |
| sklog.Infof("All backups are performing as expected") |
| } |
| } |
| }() |
| } |
| |
| // startKnownDigestsSync regularly syncs all the known digests to the KnownHashesGCSPath, which |
| // can be used by clients (we know it is used by Skia) to make tests not have to decoded and output |
| // images that match the given hash. This optimization becomes important when tests are putting out |
| // many many images. |
| func startKnownDigestsSync(ctx context.Context, db *pgxpool.Pool, ptc periodicTasksConfig) { |
| liveness := metrics2.NewLiveness("periodic_tasks", map[string]string{ |
| "task": "syncKnownDigests", |
| }) |
| |
| storageClient, err := storage.NewGCSClient(ctx, nil, storage.GCSClientOptions{ |
| Bucket: "skia-infra-testdata", |
| KnownHashesGCSPath: ptc.KnownHashesGCSPath, |
| }) |
| if err != nil { |
| sklog.Errorf("Could not start syncing known digests: %s", err) |
| return |
| } |
| |
| go util.RepeatCtx(ctx, 20*time.Minute, func(ctx context.Context) { |
| sklog.Infof("Syncing all recently seen digests to %s", ptc.KnownHashesGCSPath) |
| ctx, span := trace.StartSpan(ctx, "periodic_SyncKnownDigests") |
| defer span.End() |
| |
| // We grab digests from twice our window length to be overly thorough to avoid excess |
| // uploads from clients who use this. |
| digests, err := getAllRecentDigests(ctx, db, ptc.WindowSize*2) |
| if err != nil { |
| sklog.Errorf("Error getting recent digests: %s", err) |
| return |
| } |
| |
| if err := storageClient.WriteKnownDigests(ctx, digests); err != nil { |
| sklog.Errorf("Error writing recent digests: %s", err) |
| return |
| } |
| liveness.Reset() |
| sklog.Infof("Done syncing recently seen digests") |
| }) |
| } |
| |
| // getAllRecentDigests returns all the digests seen on the primary branch in the provided window |
| // of commits. If needed, this could combine the digests with the unique digests seen from recent |
| // Tryjob results. |
| func getAllRecentDigests(ctx context.Context, db *pgxpool.Pool, numCommits int) ([]types.Digest, error) { |
| ctx, span := trace.StartSpan(ctx, "getAllRecentDigests") |
| defer span.End() |
| |
| const statement = ` |
| WITH |
| RecentCommits AS ( |
| SELECT tile_id, commit_id FROM CommitsWithData |
| AS OF SYSTEM TIME '-0.1s' |
| ORDER BY commit_id DESC LIMIT $1 |
| ), |
| OldestTileInWindow AS ( |
| SELECT MIN(tile_id) as tile_id FROM RecentCommits |
| AS OF SYSTEM TIME '-0.1s' |
| ) |
| SELECT DISTINCT encode(digest, 'hex') FROM TiledTraceDigests |
| JOIN OldestTileInWindow ON TiledTraceDigests.tile_id >= OldestTileInWindow.tile_id |
| AS OF SYSTEM TIME '-0.1s' |
| ORDER BY 1 |
| ` |
| |
| rows, err := db.Query(ctx, statement, numCommits) |
| if err != nil { |
| return nil, skerr.Wrap(err) |
| } |
| defer rows.Close() |
| |
| var rv []types.Digest |
| for rows.Next() { |
| var d types.Digest |
| if err := rows.Scan(&d); err != nil { |
| return nil, skerr.Wrap(err) |
| } |
| rv = append(rv, d) |
| } |
| return rv, nil |
| } |