blob: 966295d179e55e289a4ce44c4780da9cfa1ab6e8 [file] [log] [blame]
package main
import (
"bytes"
"context"
"encoding/json"
"flag"
"fmt"
"net/http"
"os"
"strings"
"time"
gstorage "cloud.google.com/go/storage"
"github.com/cockroachdb/cockroach-go/v2/crdb/crdbpgx"
"github.com/jackc/pgtype"
"github.com/jackc/pgx/v4"
"github.com/jackc/pgx/v4/pgxpool"
"go.opencensus.io/trace"
"go.skia.org/infra/go/auth"
"go.skia.org/infra/go/cache"
"go.skia.org/infra/go/common"
"go.skia.org/infra/go/gcs"
"go.skia.org/infra/go/gcs/gcsclient"
"go.skia.org/infra/go/gerrit"
"go.skia.org/infra/go/httputils"
"go.skia.org/infra/go/metrics2"
"go.skia.org/infra/go/now"
"go.skia.org/infra/go/skerr"
"go.skia.org/infra/go/sklog"
"go.skia.org/infra/go/sql/sqlutil"
"go.skia.org/infra/go/util"
"go.skia.org/infra/golden/go/code_review"
"go.skia.org/infra/golden/go/code_review/commenter"
"go.skia.org/infra/golden/go/code_review/gerrit_crs"
"go.skia.org/infra/golden/go/code_review/github_crs"
"go.skia.org/infra/golden/go/config"
"go.skia.org/infra/golden/go/ignore/sqlignorestore"
searchCache "go.skia.org/infra/golden/go/search/caching"
"go.skia.org/infra/golden/go/sql"
"go.skia.org/infra/golden/go/sql/schema"
"go.skia.org/infra/golden/go/storage"
"go.skia.org/infra/golden/go/tracing"
"go.skia.org/infra/golden/go/types"
"go.skia.org/infra/golden/go/validation"
"go.skia.org/infra/golden/go/validation/data_manager"
"go.skia.org/infra/perf/go/ingest/format"
"golang.org/x/oauth2"
"golang.org/x/oauth2/google"
)
const (
// maxSQLConnections is the maximum number of connections in the SQL connection pool
// created by mustInitSQLDatabase. The value is an arbitrary number.
maxSQLConnections = 4
// clScanRange is how far back the very first changelist scan looks, and also the age
// after which rows of changelist diff work are deleted (see deleteOldCLDiffWork).
clScanRange = 5 * 24 * time.Hour
)
// TODO(kjlubick) Add a task to check for abandoned CLs.

// periodicTasksConfig is the configuration of the periodic tasks server. It is populated in
// main from two json5 files: the instance-wide common config and the config specific to this
// service.
type periodicTasksConfig struct {
// Common is the embedded shared configuration. This file reads fields such as
// SQLDatabaseName, WindowSize, SiteURL and ReadyPort through it.
config.Common
// ChangelistDiffPeriod is how often to look at recently updated CLs and tabulate the diffs
// for the digests produced.
// The diffs are not calculated in this service, but the tasks are generated here and
// processed in the diffcalculator process.
ChangelistDiffPeriod config.Duration `json:"changelist_diff_period"`
// CLCommentTemplate is a string with placeholders for generating a comment message. See
// commenter.commentTemplateContext for the exact fields.
CLCommentTemplate string `json:"cl_comment_template" optional:"true"`
// CommentOnCLsPeriod, if positive, is how often to check recent CLs and Patchsets for
// untriaged digests and comment on them if appropriate. If it is zero or negative, no
// commenting is done (see startCommentOnCLs).
CommentOnCLsPeriod config.Duration `json:"comment_on_cls_period" optional:"true"`
// PerfSummaries configures summary data (e.g. triage status, ignore count) that is fed into
// a GCS bucket which an instance of Perf can ingest from. If nil, no summaries are produced.
PerfSummaries *perfSummariesConfig `json:"perf_summaries" optional:"true"`
// PrimaryBranchDiffPeriod is how often to look at the most recent window of commits and
// tabulate diffs between all groupings based on the digests produced on the primary branch.
// The diffs are not calculated in this service, but sent via Pub/Sub to the appropriate workers.
PrimaryBranchDiffPeriod config.Duration `json:"primary_branch_diff_period"`
// UpdateIgnorePeriod is how often we should try to apply the ignore rules to all traces.
UpdateIgnorePeriod config.Duration `json:"update_traces_ignore_period"` // TODO(kjlubick) change JSON
// ExpirationMonitorBatchSize is the number of rows to update expiry at a time.
ExpirationMonitorBatchSize int `json:"expiration_monitor_batch_size" optional:"true"`
// ExpirationMonitorFrequencyInHours is the running frequency in hours for the expiration monitor.
ExpirationMonitorFrequencyInHours int `json:"expiration_monitor_frequency_hours" optional:"true"`
}
// perfSummariesConfig configures the task that summarizes Gold traces and uploads the summaries
// to a GCS bucket for Perf (see startPerfSummarization and summarizeTraces).
type perfSummariesConfig struct {
// AgeOutCommits is the number of most recent commits with data that bound the
// summarization window. It must be positive.
AgeOutCommits int `json:"age_out_commits"`
// CorporaToSummarize lists the corpora to produce summaries for. At least one is required.
CorporaToSummarize []string `json:"corpora_to_summarize"`
// GCSBucket is the bucket the summaries are uploaded to.
GCSBucket string `json:"perf_gcs_bucket"`
// KeysToSummarize lists the trace keys whose value combinations are summarized. At least one
// is required.
KeysToSummarize []string `json:"keys_to_summarize"`
// Period is how often the summarization is repeated.
Period config.Duration `json:"period"`
// ValuesToIgnore is passed along with the keys and corpora when computing which tuples to
// summarize.
ValuesToIgnore []string `json:"values_to_ignore"`
}
// main reads the flags and configuration, connects to the SQL database, kicks off every
// periodic task and finally blocks while serving the /healthz endpoint.
func main() {
	// Command line flags.
	commonInstanceConfig := flag.String("common_instance_config", "", "Path to the json5 file containing the configuration that needs to be the same across all services for a given instance.")
	thisConfig := flag.String("config", "", "Path to the json5 file containing the configuration specific to the periodic tasks server.")
	hang := flag.Bool("hang", false, "Stop and do nothing after reading the flags. Good for debugging containers.")
	local := flag.Bool("local", false, "Set to true if running locally.")

	// The flags must be parsed before anything else is configured.
	flag.Parse()

	if *hang {
		sklog.Info("Hanging")
		select {}
	}

	var ptc periodicTasksConfig
	if err := config.LoadFromJSON5(&ptc, commonInstanceConfig, thisConfig); err != nil {
		sklog.Fatalf("Reading config: %s", err)
	}
	sklog.Infof("Loaded config %#v", ptc)

	// Trace 1% of requests unless the config asks for a different, positive proportion.
	tracingProportion := 0.01
	if ptc.TracingProportion > 0 {
		tracingProportion = ptc.TracingProportion
	}

	common.InitWithMust("periodictasks", common.PrometheusOpt(&ptc.PromPort))

	if err := tracing.Initialize(tracingProportion, ptc.SQLDatabaseName); err != nil {
		sklog.Fatalf("Could not set up tracing: %s", err)
	}

	ctx := context.Background()
	db := mustInitSQLDatabase(ctx, ptc)

	startUpdateTracesIgnoreStatus(ctx, db, ptc)
	startCommentOnCLs(ctx, db, ptc)

	// A single gatherer is shared by the primary branch and the changelist diff work tasks.
	gatherer := &diffWorkGatherer{
		db:               db,
		windowSize:       ptc.WindowSize,
		mostRecentCLScan: now.Now(ctx).Add(-clScanRange),
	}
	startPrimaryBranchDiffWork(ctx, gatherer, ptc)
	startChangelistsDiffWork(ctx, gatherer, ptc)
	startDiffWorkMetrics(ctx, db)
	startBackupStatusCheck(ctx, db, ptc)

	// The known digests are not synced when running locally.
	if !*local {
		startKnownDigestsSync(ctx, db, ptc)
	}

	if ptc.PerfSummaries != nil {
		startPerfSummarization(ctx, db, ptc.PerfSummaries)
	}

	sklog.Infof("Starting cache population tasks.")
	runCachingTasks(ctx, ptc, db)

	sklog.Infof("Starting expiration monitoring tasks.")
	runExpiryMonitoringTasks(ctx, db, ptc)

	sklog.Infof("periodic tasks have been started")
	http.HandleFunc("/healthz", httputils.ReadyHandleFunc)
	serveErr := http.ListenAndServe(ptc.ReadyPort, nil)
	sklog.Fatal(serveErr)
}
// mustInitSQLDatabase connects to the SQL database named in the config and returns a connection
// pool limited to maxSQLConnections connections. Any failure is reported via sklog.Fatalf.
func mustInitSQLDatabase(ctx context.Context, ptc periodicTasksConfig) *pgxpool.Pool {
	dbName := ptc.SQLDatabaseName
	if dbName == "" {
		sklog.Fatalf("Must have SQL Database Information")
	}

	connURL := sql.GetConnectionURL(ptc.SQLConnection, dbName)
	poolConfig, err := pgxpool.ParseConfig(connURL)
	if err != nil {
		sklog.Fatalf("error getting postgres config %s: %s", connURL, err)
	}
	poolConfig.MaxConns = maxSQLConnections

	pool, err := pgxpool.ConnectConfig(ctx, poolConfig)
	if err != nil {
		sklog.Fatalf("error connecting to the database: %s", err)
	}

	sklog.Infof("Connected to SQL database %s", dbName)
	return pool
}
// startUpdateTracesIgnoreStatus starts a goroutine that, every UpdateIgnorePeriod, applies the
// ignore rules to the traces via sqlignorestore.UpdateIgnoredTraces. The liveness metric is only
// reset after a successful run.
func startUpdateTracesIgnoreStatus(ctx context.Context, db *pgxpool.Pool, ptc periodicTasksConfig) {
	tags := map[string]string{"task": "updateTracesIgnoreStatus"}
	lastSuccess := metrics2.NewLiveness("periodic_tasks", tags)

	updateOnce := func(ctx context.Context) {
		sklog.Infof("Updating traces and values at head with ignore status")
		ctx, span := trace.StartSpan(ctx, "periodic_updateTracesIgnoreStatus")
		defer span.End()

		err := sqlignorestore.UpdateIgnoredTraces(ctx, db)
		if err != nil {
			sklog.Errorf("Error while updating traces ignore status: %s", err)
			// Bail out before touching the liveness so a failing task is noticed.
			return
		}
		lastSuccess.Reset()
		sklog.Infof("Done with updateTracesIgnoreStatus")
	}

	go util.RepeatCtx(ctx, ptc.UpdateIgnorePeriod.Duration, updateOnce)
}
// startCommentOnCLs starts a goroutine that, every CommentOnCLsPeriod, comments on changelists
// that have untriaged digests. It does nothing if the configured period is not positive. The
// liveness metric is only reset after a successful run.
func startCommentOnCLs(ctx context.Context, db *pgxpool.Pool, ptc periodicTasksConfig) {
	period := ptc.CommentOnCLsPeriod.Duration
	if period <= 0 {
		sklog.Infof("Not commenting on CLs because duration was zero.")
		return
	}

	reviewSystems := mustInitializeSystems(ctx, ptc)
	clCommenter, err := commenter.New(db, reviewSystems, ptc.CLCommentTemplate, ptc.SiteURL, ptc.WindowSize)
	if err != nil {
		sklog.Fatalf("Could not initialize commenting: %s", err)
	}

	tags := map[string]string{"task": "commentOnCLs"}
	lastSuccess := metrics2.NewLiveness("periodic_tasks", tags)

	commentOnce := func(ctx context.Context) {
		sklog.Infof("Checking CLs for untriaged results and commenting if necessary")
		ctx, span := trace.StartSpan(ctx, "periodic_commentOnCLsWithUntriagedDigests")
		defer span.End()

		err := clCommenter.CommentOnChangelistsWithUntriagedDigests(ctx)
		if err != nil {
			sklog.Errorf("Error while commenting on CLs: %s", err)
			// Bail out before touching the liveness so a failing task is noticed.
			return
		}
		lastSuccess.Reset()
		sklog.Infof("Done checking on CLs to comment")
	}

	go util.RepeatCtx(ctx, period, commentOnce)
}
// mustInitializeSystems creates a code_review.Client for every configured code review system
// and returns them wrapped as commenter.ReviewSystem values, in configuration order.
// Any configuration or authentication problem is reported via sklog.Fatal/Fatalf.
func mustInitializeSystems(ctx context.Context, ptc periodicTasksConfig) []commenter.ReviewSystem {
	tokenSource, err := google.DefaultTokenSource(ctx, auth.ScopeGerrit)
	if err != nil {
		sklog.Fatalf("Failed to authenticate service account: %s", err)
	}
	// One authenticated HTTP client is shared by all Gerrit-flavored systems.
	gerritHTTPClient := httputils.DefaultClientConfig().WithTokenSource(tokenSource).Client()

	rv := make([]commenter.ReviewSystem, 0, len(ptc.CodeReviewSystems))
	for _, cfg := range ptc.CodeReviewSystems {
		var crs code_review.Client
		switch cfg.Flavor {
		case "gerrit":
			if cfg.GerritURL == "" {
				sklog.Fatal("You must specify gerrit_url")
			}
			gerritClient, err := gerrit.NewGerrit(cfg.GerritURL, gerritHTTPClient)
			if err != nil {
				// Include the underlying error; without it the failure cannot be diagnosed.
				sklog.Fatalf("Could not create gerrit client for %s: %s", cfg.GerritURL, err)
			}
			crs = gerrit_crs.New(gerritClient)
		case "github":
			if cfg.GitHubRepo == "" || cfg.GitHubCredPath == "" {
				sklog.Fatal("You must specify github_repo and github_cred_path")
			}
			gBody, err := os.ReadFile(cfg.GitHubCredPath)
			if err != nil {
				sklog.Fatalf("Couldn't find githubToken in %s: %s", cfg.GitHubCredPath, err)
			}
			// The credentials file holds just the token; surrounding whitespace is dropped.
			gToken := strings.TrimSpace(string(gBody))
			githubTS := oauth2.StaticTokenSource(&oauth2.Token{AccessToken: gToken})
			c := httputils.DefaultClientConfig().With2xxOnly().WithTokenSource(githubTS).Client()
			crs = github_crs.New(c, cfg.GitHubRepo)
		default:
			sklog.Fatalf("CRS flavor %s not supported.", cfg.Flavor)
		}
		rv = append(rv, commenter.ReviewSystem{
			ID:     cfg.ID,
			Client: crs,
		})
	}
	return rv
}
// diffWorkGatherer creates the rows in the SQL DB that describe which diffs need to be
// calculated, both for the primary branch and for changelists.
type diffWorkGatherer struct {
// db is the connection pool used for every query the gatherer issues.
db *pgxpool.Pool
// windowSize is the number of most recent commits with data that make up the window.
windowSize int
// mostRecentCLScan is the lower bound on last_ingested_data used to find recently updated
// CLs. gatherFromChangelists moves it forward once rows for all found CLs were created.
mostRecentCLScan time.Time
}
// startPrimaryBranchDiffWork launches a goroutine that, every PrimaryBranchDiffPeriod, makes sure
// the SQL DB has rows telling the diff workers which primary branch groupings to compute diffs
// for. The liveness metric is only reset after a successful run.
func startPrimaryBranchDiffWork(ctx context.Context, gatherer *diffWorkGatherer, ptc periodicTasksConfig) {
	tags := map[string]string{"task": "calculatePrimaryBranchDiffWork"}
	lastSuccess := metrics2.NewLiveness("periodic_tasks", tags)

	gatherOnce := func(ctx context.Context) {
		sklog.Infof("Calculating diffs for images seen recently")
		ctx, span := trace.StartSpan(ctx, "periodic_PrimaryBranchDiffWork")
		defer span.End()

		err := gatherer.gatherFromPrimaryBranch(ctx)
		if err != nil {
			sklog.Errorf("Error while gathering diff work on primary branch: %s", err)
			// Bail out before touching the liveness so a failing task is noticed.
			return
		}
		lastSuccess.Reset()
		sklog.Infof("Done with sending diffs from primary branch")
	}

	go util.RepeatCtx(ctx, ptc.PrimaryBranchDiffPeriod.Duration, gatherOnce)
}
// gatherFromPrimaryBranch looks up every grouping with recent data on the primary branch and
// adds a row to the SQL DB for each one that is not yet being processed for diffs.
func (g *diffWorkGatherer) gatherFromPrimaryBranch(ctx context.Context) error {
	ctx, span := trace.StartSpan(ctx, "gatherFromPrimaryBranch")
	defer span.End()

	// The get/join/insert is deliberately not done in a single transaction: with many groupings
	// and many diffcalculator processes that caused too much contention.
	inWindow, err := g.getDistinctGroupingsInWindow(ctx)
	if err != nil {
		return skerr.Wrap(err)
	}
	beingProcessed, err := g.getGroupingsBeingProcessed(ctx)
	if err != nil {
		return skerr.Wrap(err)
	}

	known := make(map[string]struct{}, len(beingProcessed))
	for _, id := range beingProcessed {
		known[string(id)] = struct{}{}
	}

	// TODO(kjlubick) periodically remove groupings that are not at HEAD anymore.
	var toAdd []schema.GroupingID
	for _, id := range inWindow {
		if _, seen := known[string(id)]; seen {
			continue
		}
		toAdd = append(toAdd, id)
	}

	sklog.Infof("There are currently %d groupings in the window and %d groupings being processed for diffs. This cycle, there were %d new groupings detected.",
		len(inWindow), len(beingProcessed), len(toAdd))

	if len(toAdd) > 0 {
		return skerr.Wrap(g.addNewGroupingsForProcessing(ctx, toAdd))
	}
	return nil
}
// getDistinctGroupingsInWindow returns the distinct grouping ids seen within the current window,
// that is, those in ValuesAtHead whose most recent commit is at or after the oldest of the
// windowSize most recent commits with data. It returns an error if the query fails or if the
// results could not be read completely.
func (g *diffWorkGatherer) getDistinctGroupingsInWindow(ctx context.Context) ([]schema.GroupingID, error) {
	ctx, span := trace.StartSpan(ctx, "getDistinctGroupingsInWindow")
	defer span.End()
	const statement = `WITH
RecentCommits AS (
SELECT commit_id FROM CommitsWithData
ORDER BY commit_id DESC LIMIT $1
),
FirstCommitInWindow AS (
SELECT commit_id FROM RecentCommits
ORDER BY commit_id ASC LIMIT 1
)
SELECT DISTINCT grouping_id FROM ValuesAtHead
JOIN FirstCommitInWindow ON ValuesAtHead.most_recent_commit_id >= FirstCommitInWindow.commit_id`
	rows, err := g.db.Query(ctx, statement, g.windowSize)
	if err != nil {
		return nil, skerr.Wrap(err)
	}
	defer rows.Close()
	var rv []schema.GroupingID
	for rows.Next() {
		var id schema.GroupingID
		if err := rows.Scan(&id); err != nil {
			return nil, skerr.Wrap(err)
		}
		rv = append(rv, id)
	}
	// Next() also returns false when reading fails part way through; without this check a
	// truncated list would be returned as if it were complete.
	if err := rows.Err(); err != nil {
		return nil, skerr.Wrap(err)
	}
	return rv, nil
}
// getGroupingsBeingProcessed returns all groupings that we are currently computing diffs for,
// i.e. the distinct grouping ids in PrimaryBranchDiffCalculationWork. The query reads slightly
// stale data (AS OF SYSTEM TIME '-0.1s'). It returns an error if the query fails or if the
// results could not be read completely.
func (g *diffWorkGatherer) getGroupingsBeingProcessed(ctx context.Context) ([]schema.GroupingID, error) {
	ctx, span := trace.StartSpan(ctx, "getGroupingsBeingProcessed")
	defer span.End()
	const statement = `SELECT DISTINCT grouping_id FROM PrimaryBranchDiffCalculationWork
AS OF SYSTEM TIME '-0.1s'`
	rows, err := g.db.Query(ctx, statement)
	if err != nil {
		return nil, skerr.Wrap(err)
	}
	defer rows.Close()
	var rv []schema.GroupingID
	for rows.Next() {
		var id schema.GroupingID
		if err := rows.Scan(&id); err != nil {
			return nil, skerr.Wrap(err)
		}
		rv = append(rv, id)
	}
	// Next() also returns false when reading fails part way through. A truncated list here
	// would make already-processed groupings look new to the caller.
	if err := rows.Err(); err != nil {
		return nil, skerr.Wrap(err)
	}
	return rv, nil
}
// addNewGroupingsForProcessing updates the PrimaryBranchDiffCalculationWork table with the newly
// provided groupings, such that we will start computing diffs for them. This table is potentially
// under a lot of contention. We try to write some sentinel values, but if there are already values
// there, we will bail out (ON CONFLICT DO NOTHING). It is a no-op if groupings is empty.
func (g *diffWorkGatherer) addNewGroupingsForProcessing(ctx context.Context, groupings []schema.GroupingID) error {
	ctx, span := trace.StartSpan(ctx, "addNewGroupingsForProcessing")
	defer span.End()
	span.AddAttributes(trace.Int64Attribute("num_groupings", int64(len(groupings))))
	if len(groupings) == 0 {
		// An INSERT without any VALUES would not be a valid statement.
		return nil
	}
	statement := `INSERT INTO PrimaryBranchDiffCalculationWork (grouping_id, last_calculated_ts, calculation_lease_ends) VALUES`
	const valuesPerRow = 3
	vp := sqlutil.ValuesPlaceholders(valuesPerRow, len(groupings))
	statement = statement + vp + ` ON CONFLICT DO NOTHING`
	args := make([]interface{}, 0, valuesPerRow*len(groupings))
	// This time will make sure we compute diffs for this soon.
	beginningOfTime := time.Date(1970, time.January, 1, 0, 0, 0, 0, time.UTC)
	for _, id := range groupings {
		args = append(args, id, beginningOfTime, beginningOfTime)
	}
	err := crdbpgx.ExecuteTx(ctx, g.db, pgx.TxOptions{}, func(tx pgx.Tx) error {
		// The statement has to run on tx. Running it on the pool would execute it outside of
		// the transaction that ExecuteTx commits and retries, on a second pooled connection.
		_, err := tx.Exec(ctx, statement, args...)
		return err // may be retried
	})
	return skerr.Wrap(err)
}
// startChangelistsDiffWork launches a goroutine that, every ChangelistDiffPeriod, creates rows in
// the SQL DB telling the diff workers which images produced by CLs to compute diffs for. The
// liveness metric is only reset after a successful run.
func startChangelistsDiffWork(ctx context.Context, gatherer *diffWorkGatherer, ptc periodicTasksConfig) {
	tags := map[string]string{"task": "calculateChangelistsDiffWork"}
	lastSuccess := metrics2.NewLiveness("periodic_tasks", tags)

	gatherOnce := func(ctx context.Context) {
		sklog.Infof("Calculating diffs for images produced on CLs")
		ctx, span := trace.StartSpan(ctx, "periodic_ChangelistsDiffWork")
		defer span.End()

		err := gatherer.gatherFromChangelists(ctx)
		if err != nil {
			sklog.Errorf("Error while gathering diff work on CLs: %s", err)
			// Bail out before touching the liveness so a failing task is noticed.
			return
		}
		lastSuccess.Reset()
		sklog.Infof("Done with sending diffs from CLs")
	}

	go util.RepeatCtx(ctx, ptc.ChangelistDiffPeriod.Duration, gatherOnce)
}
// gatherFromChangelists goes over every CL updated since the previous scan and creates rows in
// the SQL DB for the groupings of those CLs that produced images not already on the primary
// branch. Afterwards it deletes diff work rows that have become too old.
func (g *diffWorkGatherer) gatherFromChangelists(ctx context.Context) error {
	ctx, span := trace.StartSpan(ctx, "gatherFromChangelists")
	defer span.End()

	windowStart, err := g.getFirstTileIDInWindow(ctx)
	if err != nil {
		return skerr.Wrap(err)
	}

	// The timestamp is taken before querying so CLs updated during this scan are seen again
	// next cycle.
	scanStarted := now.Now(ctx)

	// Check all changelists updated since last cycle (or 5 days, initially).
	changelists, err := g.getRecentlyUpdatedChangelists(ctx)
	if err != nil {
		return skerr.Wrap(err)
	}
	span.AddAttributes(trace.Int64Attribute("num_cls", int64(len(changelists))))

	for _, clID := range changelists {
		sklog.Debugf("Creating diff work for CL %s", clID)
		if err := g.createDiffRowsForCL(ctx, windowStart, clID); err != nil {
			return skerr.Wrap(err)
		}
	}

	// Only advance the scan time once every CL has been handled successfully.
	g.mostRecentCLScan = scanStarted

	err = g.deleteOldCLDiffWork(ctx)
	if err != nil {
		return skerr.Wrap(err)
	}
	return nil
}
// getFirstTileIDInWindow returns the tile id of the oldest commit among the windowSize most
// recent commits with data. On failure it returns -1 and the error.
func (g *diffWorkGatherer) getFirstTileIDInWindow(ctx context.Context) (schema.TileID, error) {
	ctx, span := trace.StartSpan(ctx, "getFirstTileIDInWindow")
	defer span.End()

	const statement = `WITH
RecentCommits AS (
SELECT tile_id, commit_id FROM CommitsWithData
ORDER BY commit_id DESC LIMIT $1
)
SELECT tile_id FROM RecentCommits ORDER BY commit_id ASC LIMIT 1
`
	var firstTile schema.TileID
	err := g.db.QueryRow(ctx, statement, g.windowSize).Scan(&firstTile)
	if err != nil {
		return -1, skerr.Wrap(err)
	}
	return firstTile, nil
}
// getRecentlyUpdatedChangelists returns the qualified IDs of all CLs that ingested data at or
// after the most recent CL scan. It returns an error if the query fails or if the results could
// not be read completely.
func (g *diffWorkGatherer) getRecentlyUpdatedChangelists(ctx context.Context) ([]string, error) {
	ctx, span := trace.StartSpan(ctx, "getRecentlyUpdatedChangelists")
	defer span.End()
	const statement = `
SELECT changelist_id FROM Changelists WHERE last_ingested_data >= $1
`
	rows, err := g.db.Query(ctx, statement, g.mostRecentCLScan)
	if err != nil {
		return nil, skerr.Wrap(err)
	}
	defer rows.Close()
	var ids []string
	for rows.Next() {
		var id string
		if err := rows.Scan(&id); err != nil {
			return nil, skerr.Wrap(err)
		}
		ids = append(ids, id)
	}
	// Next() also returns false when reading fails part way through. The caller advances its
	// scan time on success, so a silently truncated list would skip CLs for good.
	if err := rows.Err(); err != nil {
		return nil, skerr.Wrap(err)
	}
	return ids, nil
}
// createDiffRowsForCL finds all data produced by this CL and compares it against the data on the
// primary branch. If any digest is not already on the primary branch (in the sliding window that
// begins at startingTile), it will be gathered up into one row per grouping in the SQL DB for
// diff calculation. Existing rows for the same CL and grouping have their timestamp and digests
// replaced. Nothing is written if the CL has no such digests.
func (g *diffWorkGatherer) createDiffRowsForCL(ctx context.Context, startingTile schema.TileID, cl string) error {
	ctx, span := trace.StartSpan(ctx, "createDiffRowsForCL")
	defer span.End()
	span.AddAttributes(trace.StringAttribute("CL", cl))
	row := g.db.QueryRow(ctx, `SELECT last_ingested_data FROM ChangeLists WHERE changelist_id = $1`, cl)
	var updatedTS time.Time
	if err := row.Scan(&updatedTS); err != nil {
		return skerr.Wrap(err)
	}
	updatedTS = updatedTS.UTC()
	const statement = `WITH
DataForCL AS (
SELECT DISTINCT grouping_id, digest FROM SecondaryBranchValues
WHERE branch_name = $1
),
DigestsNotOnPrimaryBranch AS (
-- We do a left join and check for null to pull only those digests that are not already
-- on the primary branch. Those new digests we have to include in our diff calculations.
SELECT DISTINCT DataForCL.grouping_id, DataForCL.digest FROM DataForCL
LEFT JOIN TiledTraceDigests ON DataForCL.grouping_id = TiledTraceDigests.grouping_id
AND DataForCL.digest = TiledTraceDigests.digest
AND TiledTraceDigests.tile_id >= $2
WHERE TiledTraceDigests.digest IS NULL
)
SELECT Groupings.grouping_id, encode(DigestsNotOnPrimaryBranch.digest, 'hex') FROM DigestsNotOnPrimaryBranch
JOIN Groupings ON DigestsNotOnPrimaryBranch.grouping_id = Groupings.grouping_id
ORDER BY 1, 2
`
	rows, err := g.db.Query(ctx, statement, cl, startingTile)
	if err != nil {
		return skerr.Wrap(err)
	}
	defer rows.Close()
	// The results are ordered by grouping, so all digests of one grouping arrive consecutively
	// and can be appended to the most recently started work row.
	var workRows []schema.SecondaryBranchDiffCalculationRow
	for rows.Next() {
		var digest types.Digest
		var groupingID schema.GroupingID
		if err := rows.Scan(&groupingID, &digest); err != nil {
			return skerr.Wrap(err)
		}
		if n := len(workRows); n > 0 && bytes.Equal(workRows[n-1].GroupingID, groupingID) {
			workRows[n-1].DigestsNotOnPrimary = append(workRows[n-1].DigestsNotOnPrimary, digest)
			continue
		}
		workRows = append(workRows, schema.SecondaryBranchDiffCalculationRow{
			BranchName:          cl,
			GroupingID:          groupingID,
			LastUpdated:         updatedTS,
			DigestsNotOnPrimary: []types.Digest{digest},
		})
	}
	// Close right away so the connection is free again before the insert below needs one.
	rows.Close()
	// Next() also returns false when reading fails part way through; do not store partial work.
	if err := rows.Err(); err != nil {
		return skerr.Wrap(err)
	}
	if len(workRows) == 0 {
		return nil // nothing to report
	}
	insertStatement := `INSERT INTO SecondaryBranchDiffCalculationWork
(branch_name, grouping_id, last_updated_ts, digests, last_calculated_ts, calculation_lease_ends) VALUES `
	const valuesPerRow = 6
	vp := sqlutil.ValuesPlaceholders(valuesPerRow, len(workRows))
	insertStatement += vp
	insertStatement += ` ON CONFLICT (branch_name, grouping_id)
DO UPDATE SET (last_updated_ts, digests) = (excluded.last_updated_ts, excluded.digests);`
	arguments := make([]interface{}, 0, valuesPerRow*len(workRows))
	// New rows start out as "never calculated, lease already expired".
	epoch := time.Date(1970, time.January, 1, 0, 0, 0, 0, time.UTC)
	for _, wr := range workRows {
		arguments = append(arguments, wr.BranchName, wr.GroupingID, wr.LastUpdated, wr.DigestsNotOnPrimary,
			epoch, epoch)
	}
	err = crdbpgx.ExecuteTx(ctx, g.db, pgx.TxOptions{}, func(tx pgx.Tx) error {
		// The statement has to run on tx. Running it on the pool would execute it outside of
		// the transaction that ExecuteTx commits and retries, on a second pooled connection.
		_, err := tx.Exec(ctx, insertStatement, arguments...)
		return err // may be retried
	})
	return skerr.Wrap(err)
}
// deleteOldCLDiffWork deletes rows in the SQL DB that are "too old", that is, they belong to CLs
// that are several days old, beyond the clScanRange. This helps keep the number of rows down
// in that Table.
func (g *diffWorkGatherer) deleteOldCLDiffWork(ctx context.Context) error {
	// The span is named after this function so traces are not mixed up with createDiffRowsForCL.
	ctx, span := trace.StartSpan(ctx, "deleteOldCLDiffWork")
	defer span.End()
	const statement = `DELETE FROM SecondaryBranchDiffCalculationWork WHERE last_updated_ts < $1`
	err := crdbpgx.ExecuteTx(ctx, g.db, pgx.TxOptions{}, func(tx pgx.Tx) error {
		// The cutoff is computed per attempt, so a retried transaction uses a fresh time.
		cutoff := now.Now(ctx).Add(-clScanRange)
		// The statement has to run on tx. Running it on the pool would execute it outside of
		// the transaction that ExecuteTx commits and retries, on a second pooled connection.
		_, err := tx.Exec(ctx, statement, cutoff)
		return err // may be retried
	})
	return skerr.Wrap(err)
}
// startDiffWorkMetrics starts a goroutine that reports, once a minute, how much uncalculated
// diff work there is (queue size for the primary and secondary branches) and how fresh the
// primary branch calculations are (average and maximum age in seconds). A value of -1 is reported
// for any metric that could not be computed. The goroutine ends when ctx is done.
func startDiffWorkMetrics(ctx context.Context, db *pgxpool.Pool) {
	const queueSize = "diffcalculator_workqueuesize"
	const queueFreshness = "diffcalculator_workqueuefreshness"
	// unknownFreshness is a whole second on purpose: the freshness is divided by time.Second
	// before being reported, which would truncate a plain -1 (one nanosecond) to 0.
	const unknownFreshness = -1 * time.Second

	// These time values come from diffcalculator
	const primarySizeStatement = `SELECT COUNT(*) FROM PrimaryBranchDiffCalculationWork
WHERE (now() - last_calculated_ts) > '1m' AND (now() - calculation_lease_ends) > '10m'`
	const secondaryBranchStatement = `SELECT COUNT(*) FROM SecondaryBranchDiffCalculationWork
WHERE last_updated_ts > last_calculated_ts AND (now() - calculation_lease_ends) > '10m'`
	const primaryFreshnessStatement = `SELECT AVG(now() - last_calculated_ts), MAX(now() - last_calculated_ts)
FROM PrimaryBranchDiffCalculationWork`

	// report computes and publishes all the metrics once.
	report := func() {
		var primarySize int64
		if err := db.QueryRow(ctx, primarySizeStatement).Scan(&primarySize); err != nil {
			sklog.Warningf("Could not compute queue size for primary branch: %s", err)
			primarySize = -1
		}
		metrics2.GetInt64Metric(queueSize, map[string]string{"branch": "primary"}).Update(primarySize)

		var secondarySize int64
		if err := db.QueryRow(ctx, secondaryBranchStatement).Scan(&secondarySize); err != nil {
			sklog.Warningf("Could not compute queue size for secondary branch: %s", err)
			secondarySize = -1
		}
		metrics2.GetInt64Metric(queueSize, map[string]string{"branch": "secondary"}).Update(secondarySize)

		var primaryAvgFreshness time.Duration
		var primaryMaxFreshness time.Duration
		if err := db.QueryRow(ctx, primaryFreshnessStatement).Scan(&primaryAvgFreshness, &primaryMaxFreshness); err != nil {
			sklog.Warningf("Could not compute diffwork freshness for primary branch: %s", err)
			primaryAvgFreshness = unknownFreshness
			primaryMaxFreshness = unknownFreshness
		}
		metrics2.GetInt64Metric(queueFreshness, map[string]string{"branch": "primary", "stat": "avg"}).
			Update(int64(primaryAvgFreshness / time.Second))
		metrics2.GetInt64Metric(queueFreshness, map[string]string{"branch": "primary", "stat": "max"}).
			Update(int64(primaryMaxFreshness / time.Second))
	}

	go func() {
		ticker := time.NewTicker(time.Minute)
		defer ticker.Stop()
		for {
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
				// Both channels may be ready at once; never report after cancellation.
				if ctx.Err() != nil {
					return
				}
				report()
			}
		}
	}()
}
// startBackupStatusCheck starts a goroutine that scans the Backup Schedules once an hour to see
// their status. If there are not 3 schedules, each without a failure, it logs an error and sets
// the periodictasks_backup_error metric to 1; otherwise the metric is 0. The schedules are
// created via //golden/cmd/sqlinit. If the tables change, those will need to be re-created.
// https://www.cockroachlabs.com/docs/stable/create-schedule-for-backup.html
//
// If the schedules cannot be read at all, the metric is set to 1 and the checking stops.
// NOTE(review): stopping for good after one unreadable check is the pre-existing behavior;
// confirm that it is intended rather than trying again on the next tick.
func startBackupStatusCheck(ctx context.Context, db *pgxpool.Pool, ptc periodicTasksConfig) {
	const backupError = "periodictasks_backup_error"
	// One schedule each for the daily, weekly and monthly backups.
	const expectedSchedules = 3

	// checkSchedules looks at all schedules of this database once. readable is false if the
	// schedules could not be queried or read; healthy is only meaningful if readable is true.
	checkSchedules := func() (healthy, readable bool) {
		statement := `SELECT id, label, state FROM [SHOW SCHEDULES] WHERE label LIKE '` +
			ptc.SQLDatabaseName + `\_%'`
		rows, err := db.Query(ctx, statement)
		if err != nil {
			sklog.Errorf("Could not check backup schedules: %s", err)
			return false, false
		}
		// Make sure the connection goes back to the pool on every path out of here.
		defer rows.Close()

		hadFailure := false
		totalBackups := 0
		for rows.Next() {
			var id int64
			var label string
			var state pgtype.Text
			if err := rows.Scan(&id, &label, &state); err != nil {
				sklog.Errorf("Could not scan backup results: %s", err)
				return false, false
			}
			totalBackups++
			// Example errors:
			// reschedule: failed to create job for schedule 623934079889145857: err=executing schedule 623934079889145857: failed to resolve targets specified in the BACKUP stmt: table "crostastdev.commits" does not exist
			// reschedule: failed to create job for schedule 623934084168056833: err=executing schedule 623934084168056833: Get https://storage.googleapis.com/skia-gold-sql-backups/crostastdev/monthly/2021/09/07-042100.00/BACKUP_MANIFEST: oauth2: cannot fetch token: 400 Bad Request
			if strings.Contains(state.String, "reschedule") ||
				strings.Contains(state.String, "failed") ||
				strings.Contains(state.String, "err") {
				hadFailure = true
				sklog.Errorf("Backup Error for %s (%d) - %s", label, id, state.String)
			}
		}
		// Next() also returns false when reading fails part way through.
		if err := rows.Err(); err != nil {
			sklog.Errorf("Could not scan backup results: %s", err)
			return false, false
		}
		if totalBackups != expectedSchedules {
			sklog.Errorf("Expected to see 3 backup schedules (daily, weekly, monthly), but instead saw %d", totalBackups)
			hadFailure = true
		}
		return !hadFailure, true
	}

	go func() {
		backupMetric := metrics2.GetInt64Metric(backupError, map[string]string{"database": ptc.SQLDatabaseName})
		backupMetric.Update(0)

		ticker := time.NewTicker(time.Hour)
		defer ticker.Stop()
		for {
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
			}
			// Both channels may be ready at once; never check after cancellation.
			if ctx.Err() != nil {
				return
			}

			healthy, readable := checkSchedules()
			if !readable {
				backupMetric.Update(1)
				return
			}
			if !healthy {
				backupMetric.Update(1)
				continue
			}
			backupMetric.Update(0)
			sklog.Infof("All backups are performing as expected")
		}
	}()
}
// startKnownDigestsSync starts a goroutine that, every 20 minutes, writes all recently seen
// digests to the KnownHashesGCSPath. Clients (we know Skia does this) can use that file to avoid
// decoding and outputting images whose hash is already known, an optimization that becomes
// important when tests are putting out many many images. If the GCS client cannot be created,
// an error is logged and nothing is started.
func startKnownDigestsSync(ctx context.Context, db *pgxpool.Pool, ptc periodicTasksConfig) {
	tags := map[string]string{"task": "syncKnownDigests"}
	lastSuccess := metrics2.NewLiveness("periodic_tasks", tags)

	gcsOptions := storage.GCSClientOptions{
		Bucket:             ptc.GCSBucket,
		KnownHashesGCSPath: ptc.KnownHashesGCSPath,
	}
	storageClient, err := storage.NewGCSClient(ctx, nil, gcsOptions)
	if err != nil {
		sklog.Errorf("Could not start syncing known digests: %s", err)
		return
	}

	syncOnce := func(ctx context.Context) {
		sklog.Infof("Syncing all recently seen digests to %s", ptc.KnownHashesGCSPath)
		ctx, span := trace.StartSpan(ctx, "periodic_SyncKnownDigests")
		defer span.End()

		// Twice the window length is used on purpose: being overly thorough here avoids
		// excess uploads from the clients that use this file.
		recentDigests, err := getAllRecentDigests(ctx, db, ptc.WindowSize*2)
		if err != nil {
			sklog.Errorf("Error getting recent digests: %s", err)
			return
		}

		err = storageClient.WriteKnownDigests(ctx, recentDigests)
		if err != nil {
			sklog.Errorf("Error writing recent digests: %s", err)
			return
		}

		lastSuccess.Reset()
		sklog.Infof("Done syncing recently seen digests")
	}

	go util.RepeatCtx(ctx, 20*time.Minute, syncOnce)
}
// getAllRecentDigests returns all the digests seen on the primary branch in the provided window
// of commits, hex encoded and sorted. If needed, this could combine the digests with the unique
// digests seen from recent Tryjob results. It returns an error if the query fails or if the
// results could not be read completely.
func getAllRecentDigests(ctx context.Context, db *pgxpool.Pool, numCommits int) ([]types.Digest, error) {
	ctx, span := trace.StartSpan(ctx, "getAllRecentDigests")
	defer span.End()
	const statement = `
WITH
RecentCommits AS (
SELECT tile_id, commit_id FROM CommitsWithData
AS OF SYSTEM TIME '-0.1s'
ORDER BY commit_id DESC LIMIT $1
),
OldestTileInWindow AS (
SELECT MIN(tile_id) as tile_id FROM RecentCommits
AS OF SYSTEM TIME '-0.1s'
)
SELECT DISTINCT encode(digest, 'hex') FROM TiledTraceDigests
JOIN OldestTileInWindow ON TiledTraceDigests.tile_id >= OldestTileInWindow.tile_id
AS OF SYSTEM TIME '-0.1s'
ORDER BY 1
`
	rows, err := db.Query(ctx, statement, numCommits)
	if err != nil {
		return nil, skerr.Wrap(err)
	}
	defer rows.Close()
	var rv []types.Digest
	for rows.Next() {
		var d types.Digest
		if err := rows.Scan(&d); err != nil {
			return nil, skerr.Wrap(err)
		}
		rv = append(rv, d)
	}
	// Next() also returns false when reading fails part way through. A truncated list would be
	// written out as the complete set of known digests.
	if err := rows.Err(); err != nil {
		return nil, skerr.Wrap(err)
	}
	return rv, nil
}
// startPerfSummarization kicks off the summarization of gold traces for Perf. The config must be
// non-nil; the function panics if age_out_commits is not positive, if no key or no corpus is
// given, or if the storage client cannot be created. The summarization runs in its own
// goroutine and is repeated at the configured period.
func startPerfSummarization(ctx context.Context, db *pgxpool.Pool, sCfg *perfSummariesConfig) {
	sklog.Infof("Perf summary config %+v", *sCfg)

	switch {
	case sCfg.AgeOutCommits <= 0:
		panic("Must have a positive, non-zero age_out_commits")
	case len(sCfg.KeysToSummarize) == 0:
		panic("Must specify at least one key")
	case len(sCfg.CorporaToSummarize) == 0:
		panic("Must specify at least one corpus")
	}

	tags := map[string]string{"task": "PerfSummarization"}
	lastSuccess := metrics2.NewLiveness("periodic_tasks", tags)

	rawClient, err := gstorage.NewClient(ctx)
	if err != nil {
		panic("Could not make google storage client " + err.Error())
	}
	perfBucket := gcsclient.New(rawClient, sCfg.GCSBucket)

	summarizeOnce := func(ctx context.Context) {
		sklog.Infof("Tabulating all perf data for keys %s and corpora %s", sCfg.KeysToSummarize, sCfg.CorporaToSummarize)
		err := summarizeTraces(ctx, db, sCfg, perfBucket)
		if err != nil {
			sklog.Errorf("Could not summarize traces using config %+v: %s", sCfg, err)
			return
		}
		lastSuccess.Reset()
		sklog.Infof("Done tabulating all perf data")
	}

	go util.RepeatCtx(ctx, sCfg.Period.Duration, summarizeOnce)
}
// summarizeTraces goes over every tuple of keys matching the configuration (without any of the
// ignored values) and counts how many traces are triaged to each of the three states and how
// many are ignored. Each tuple's data is uploaded to Perf's GCS bucket by itself, i.e. in a
// streaming fashion and not as one big blob. Tuples without any data are skipped. The whole
// process could take a while, as the summarization may involve full table scans. The first
// error encountered aborts the run.
func summarizeTraces(ctx context.Context, db *pgxpool.Pool, cfg *perfSummariesConfig, client gcs.GCSClient) error {
	oldestCommitID, latestCommitID, err := getWindowCommitBounds(ctx, db, cfg.AgeOutCommits)
	if err != nil {
		return skerr.Wrap(err)
	}

	tuples, err := getTuplesOfKeysToQuery(ctx, db, cfg.KeysToSummarize, cfg.ValuesToIgnore, cfg.CorporaToSummarize)
	if err != nil {
		return skerr.Wrap(err)
	}

	// Remembers the git hash of each commit id, so every commit is looked up at most once.
	hashCache := map[schema.CommitID]string{}

	for _, tuple := range tuples {
		perfData, err := getTriageStatus(ctx, db, tuple, oldestCommitID)
		if err != nil {
			return skerr.Wrap(err)
		}
		perfData.IgnoredTraces, err = getIgnoredCount(ctx, db, tuple, oldestCommitID)
		if err != nil {
			return skerr.Wrap(err)
		}

		// Traces without actual data (e.g. too old or only on CLs) leave nothing to upload
		// to Perf.
		hasData := perfData.NegativeTraces != 0 || perfData.UntriagedTraces != 0 ||
			perfData.PositiveTraces != 0 || perfData.IgnoredTraces != 0
		if !hasData {
			sklog.Infof("No data for tuple %v; consider ignoring it", tuple)
			continue
		}

		if perfData.CommitID == "" {
			// All traces for this set of keys may be ignored. In that case we pretend the data
			// belongs to the latest commit.
			perfData.CommitID = latestCommitID
		}

		// Perf only speaks git hashes, so the commit id has to be translated.
		gitHash, cached := hashCache[perfData.CommitID]
		if !cached {
			gitHash, err = getCommitHashForID(ctx, db, perfData.CommitID)
			if err != nil {
				return skerr.Wrap(err)
			}
			hashCache[perfData.CommitID] = gitHash
		}
		perfData.GitHash = gitHash

		if err := uploadDataToPerf(ctx, tuple, perfData, client); err != nil {
			return skerr.Wrap(err)
		}
	}
	return nil
}
// getWindowCommitBounds returns, in that order, the oldest and the newest commit ID among the
// windowSize most recent entries of CommitsWithData.
func getWindowCommitBounds(ctx context.Context, db *pgxpool.Pool, windowSize int) (schema.CommitID, schema.CommitID, error) {
	ctx, span := trace.StartSpan(ctx, "getWindowCommitBounds")
	defer span.End()
	const statement = `
WITH
RecentCommits AS (
SELECT commit_id FROM CommitsWithData
ORDER BY commit_id DESC LIMIT $1
)
SELECT MIN(commit_id), MAX(commit_id) FROM RecentCommits
`
	var oldest, newest schema.CommitID
	err := db.QueryRow(ctx, statement, windowSize).Scan(&oldest, &newest)
	if err != nil {
		return "", "", skerr.Wrap(err)
	}
	return oldest, newest, nil
}
// pair is one trace key together with a single value observed for that key.
type pair struct {
// Key is the name of the trace key (e.g. one of the configured keys to summarize).
Key string
// Value is the value associated with Key.
Value string
}
// summaryTuple identifies one group of traces to summarize: a corpus plus a concrete value for
// each of the keys being summarized.
type summaryTuple struct {
// Corpus is the value of the source_type key shared by the traces in this group.
Corpus string
// KeyValues holds one key/value pair per summarized key, in the order the keys were supplied
// when the tuple was built.
KeyValues []pair
}
// getTuplesOfKeysToQuery finds all combinations of the non-null values associated with the provided
// keys for the provided corpora that exist in the Traces table (that is, have seen at least one
// data point at some point in Gold's history). Any combination that has a value in the
// ignoreValues slice is dropped (e.g. ignoring very old hardware that we haven't tested on in a
// long time could help speed up this process by skipping those models/gpus). Combinations in which
// any of the keys is NULL are dropped as well.
//
// The returned tuples are ordered by corpus and then by each key's value, in the order the keys
// were given. An error is returned if the query fails, a row cannot be scanned, or iterating over
// the result set ends early.
func getTuplesOfKeysToQuery(ctx context.Context, db *pgxpool.Pool, keys, ignoreValues, corpora []string) ([]summaryTuple, error) {
	ctx, span := trace.StartSpan(ctx, "getTuplesOfKeysToQuery")
	defer span.End()
	// Build a statement like:
	//   SELECT DISTINCT keys->>'source_type',keys->>'os',keys->>'model'
	//   FROM TRACES WHERE keys->>'source_type' IN ($1, $2) ORDER BY 1,2,3
	var sb strings.Builder
	sb.WriteString(`SELECT DISTINCT keys->>'source_type'`)
	for _, key := range keys {
		// These keys are provided by the maintainer of Gold, not arbitrary input. Thus, we do not
		// need to pass them in as a prepared statement (which is a bit trickier to get right).
		fmt.Fprintf(&sb, ",keys->>'%s'", sql.Sanitize(key))
	}
	sb.WriteString(" FROM TRACES WHERE keys->>'source_type' IN ")
	sb.WriteString(sqlutil.ValuesPlaceholders(len(corpora), 1))
	sb.WriteString(" ORDER BY 1")
	for i := range keys {
		// Column 1 is the corpus, so the i-th key lives in column i+2.
		fmt.Fprintf(&sb, ",%d", i+2)
	}
	statement := sb.String()

	args := make([]interface{}, 0, len(corpora))
	for _, corpus := range corpora {
		args = append(args, corpus)
	}
	rows, err := db.Query(ctx, statement, args...)
	if err != nil {
		return nil, skerr.Wrapf(err, "Using statement:\n%s", statement)
	}
	defer rows.Close()

	var rv []summaryTuple
nextRow:
	for rows.Next() {
		var corpus string
		// We have a variable number of columns being returned. Thus, we need to make a slice of
		// that length, using pgtype.Text because the columns can be null ...
		values := make([]pgtype.Text, len(keys))
		// ... and then put pointers to those types into a destination slice...
		dest := make([]interface{}, 0, len(keys)+1)
		dest = append(dest, &corpus)
		for i := range values {
			dest = append(dest, &values[i])
		}
		// ... so we can pass that in as variadic arguments to Scan.
		if err := rows.Scan(dest...); err != nil {
			return nil, skerr.Wrap(err)
		}
		tuple := summaryTuple{Corpus: corpus}
		for i := range keys {
			v := values[i]
			if v.Status == pgtype.Null {
				continue nextRow
			}
			// This is easiest to handle here and not in the SQL statement; Otherwise the SQL
			// statement gets more unruly than it already is.
			if util.In(v.String, ignoreValues) {
				continue nextRow
			}
			tuple.KeyValues = append(tuple.KeyValues, pair{Key: keys[i], Value: v.String})
		}
		rv = append(rv, tuple)
	}
	// rows.Next() returns false both at the end of the data and on failure; without this check a
	// failure part-way through would silently yield a truncated list of tuples.
	if err := rows.Err(); err != nil {
		return nil, skerr.Wrapf(err, "Iterating over results of statement:\n%s", statement)
	}
	return rv, nil
}
// summaryData is the data that will be uploaded to perf. Each integer represents a count of traces
// that produced positive, negative, or untriaged digests. Ignored traces do not have their digests
// included in the triage count (and are typically untriaged anyway), so those are a separate count.
// If multiple traces produce the same output digest, they will be counted independently, since we
// are counting "traces that produced positive digests" not "number of positive digests produced".
type summaryData struct {
// PositiveTraces is the number of non-ignored traces whose digest is triaged positive.
PositiveTraces int
// NegativeTraces is the number of non-ignored traces whose digest is triaged negative.
NegativeTraces int
// UntriagedTraces is the number of non-ignored traces whose digest is untriaged.
UntriagedTraces int
// IgnoredTraces is the number of traces that match at least one ignore rule.
IgnoredTraces int
// CommitID is the commit the counts are attributed to. It is empty until filled in by the
// triage query or, failing that, by the caller.
CommitID schema.CommitID
// GitHash is the git hash corresponding to CommitID; it is what gets reported to Perf.
GitHash string
}
// getTriageStatus takes a given tuple of keys and corpus and returns how many traces are triaged
// positive, negative, or not at all. This data is at head unless the "latest" commit for that
// trace is older than oldestCommitID. Traces matching an ignore rule are excluded. If two traces
// produce the same digest, those will be counted individually as 2, not combined as 1.
//
// It also returns (in CommitID) the newest commit ID at which any of the counted traces produced
// data, simplifying data collection by assuming all data matching this tuple was produced at the
// same commit. CommitID is left empty when no rows are returned. IgnoredTraces and GitHash are
// not populated by this function.
//
// An error is returned if the query fails, a row cannot be scanned, or iterating over the result
// set ends early.
func getTriageStatus(ctx context.Context, db *pgxpool.Pool, tuple summaryTuple, oldestCommitID schema.CommitID) (summaryData, error) {
	ctx, span := trace.StartSpan(ctx, "getTriageStatus")
	defer span.End()
	// joinedTracesStatement leaves its subquery open, so the first thing appended closes it.
	statement := "WITH\n" + joinedTracesStatement(tuple)
	statement += `
),
TracesGroupingDigests AS (
SELECT JoinedTraces.trace_id, grouping_id, digest, most_recent_commit_id
FROM JoinedTraces
JOIN ValuesAtHead on JoinedTraces.trace_id = ValuesAtHead.trace_id
WHERE most_recent_commit_id >= $1 and matches_any_ignore_rule = False and corpus = $2
)
SELECT label, COUNT(*), MAX(most_recent_commit_id) FROM TracesGroupingDigests
JOIN
Expectations ON TracesGroupingDigests.grouping_id = Expectations.grouping_id AND
TracesGroupingDigests.digest = Expectations.digest
GROUP BY label`
	rows, err := db.Query(ctx, statement, oldestCommitID, tuple.Corpus)
	if err != nil {
		return summaryData{}, skerr.Wrapf(err, "Error running statement:\n%s", statement)
	}
	defer rows.Close()
	var rv summaryData
	for rows.Next() {
		var label schema.ExpectationLabel
		var count int
		var mostRecentCommitID schema.CommitID
		if err := rows.Scan(&label, &count, &mostRecentCommitID); err != nil {
			return summaryData{}, skerr.Wrap(err)
		}
		switch label {
		case schema.LabelPositive:
			rv.PositiveTraces = count
		case schema.LabelNegative:
			rv.NegativeTraces = count
		case schema.LabelUntriaged:
			rv.UntriagedTraces = count
		}
		// There is one row per label and their order is unspecified, so keep the maximum rather
		// than whichever row happened to come last. Commit IDs are compared as strings here,
		// mirroring the MAX() in the query.
		// NOTE(review): assumes Go's byte-wise string order matches the database's ordering of
		// commit_id - confirm.
		if mostRecentCommitID > rv.CommitID {
			rv.CommitID = mostRecentCommitID
		}
	}
	// rows.Next() also returns false on failure; surface that instead of returning partial counts.
	if err := rows.Err(); err != nil {
		return summaryData{}, skerr.Wrapf(err, "Error iterating over results of statement:\n%s", statement)
	}
	return rv, nil
}
// joinedTracesStatement builds the opening of a common table expression named JoinedTraces that
// selects the ids of all traces matching every key-value pair of the tuple as well as its corpus.
// One SELECT is emitted per pair and the SELECTs are combined with INTERSECT; experimental testing
// showed this to be much faster than the JSON @> syntax, probably due to better use of the keys
// index. The closing parenthesis is deliberately omitted so that callers can append an additional
// clause before closing the expression themselves.
func joinedTracesStatement(tuple summaryTuple) string {
	// Each value is rendered with %q so that it is compared as a double-quoted JSON string.
	const selectClause = "\nSELECT trace_id FROM Traces WHERE keys -> '%s' = '%q'"

	var sb strings.Builder
	sb.WriteString("JoinedTraces AS (")
	for i := range tuple.KeyValues {
		p := tuple.KeyValues[i]
		sb.WriteString(fmt.Sprintf(selectClause, sql.Sanitize(p.Key), sql.Sanitize(p.Value)))
		sb.WriteString("\n\tINTERSECT")
	}
	// The corpus restriction always comes last, so there is no trailing INTERSECT.
	sb.WriteString(fmt.Sprintf(selectClause, types.CorpusField, sql.Sanitize(tuple.Corpus)))
	return sb.String()
}
// getIgnoredCount counts how many traces matching the tuple produced data more recently than
// oldestCommitID and match one or more ignore rules. It returns an error (which includes the
// generated SQL, to ease debugging of the dynamically built statement) if the query fails.
func getIgnoredCount(ctx context.Context, db *pgxpool.Pool, tuple summaryTuple, oldestCommitID schema.CommitID) (int, error) {
	ctx, span := trace.StartSpan(ctx, "getIgnoredCount")
	defer span.End()
	// joinedTracesStatement leaves its subquery open; narrow it to ignored traces, then close it.
	//
	// NOTE(review): the bound below is a strict '>' whereas getTriageStatus uses '>=' for the same
	// oldestCommitID, so an ignored trace whose newest data is exactly at oldestCommitID is not
	// counted here - confirm whether that asymmetry is intended.
	statement := "WITH\n" + joinedTracesStatement(tuple)
	statement += `
INTERSECT
SELECT trace_id FROM Traces where matches_any_ignore_rule = true
)
SELECT count(*) FROM JoinedTraces
JOIN ValuesAtHead ON JoinedTraces.trace_id = ValuesAtHead.trace_id
WHERE most_recent_commit_id > $1`
	row := db.QueryRow(ctx, statement, oldestCommitID)
	var count int
	if err := row.Scan(&count); err != nil {
		return 0, skerr.Wrapf(err, "Error running statement:\n%s", statement)
	}
	return count, nil
}
// getCommitHashForID returns the git hash stored in GitCommits for the given commit id. An error
// is returned if the lookup fails, including when no such row exists.
func getCommitHashForID(ctx context.Context, db *pgxpool.Pool, id schema.CommitID) (string, error) {
	ctx, span := trace.StartSpan(ctx, "getCommitHashForID")
	defer span.End()
	var hash string
	err := db.QueryRow(ctx, `SELECT git_hash FROM GitCommits WHERE commit_id = $1`, id).Scan(&hash)
	if err != nil {
		return "", skerr.Wrap(err)
	}
	return hash, nil
}
// uploadDataToPerf serializes the tabulated summaryData for one tuple into the JSON format
// expected by Perf and writes it to the Perf GCS bucket via client. The file records the four
// trace counts (positive, negative, untriaged, ignored) under the tuple's corpus and key-value
// pairs, attributed to data.GitHash.
func uploadDataToPerf(ctx context.Context, tuple summaryTuple, data summaryData, client gcs.GCSClient) error {
	params := map[string]string{types.CorpusField: tuple.Corpus}
	nameParts := []string{tuple.Corpus}
	for _, kv := range tuple.KeyValues {
		nameParts = append(nameParts, kv.Value)
		params[kv.Key] = kv.Value
	}
	folder := strings.Join(nameParts, "-")

	// The data should be easy to find yet unlikely to collide with other uploads: the folder is
	// derived from the current time and the tuple's values, and the file is named after the
	// current time in Unix nanoseconds.
	ts := now.Now(ctx)
	perfPath := fmt.Sprintf("gold-summary-v1/%d/%d/%d/%d/%s/%d.json",
		ts.Year(), ts.Month(), ts.Day(), ts.Hour(), folder, ts.UnixNano())

	// traceCount builds one Perf result holding a count of traces.
	traceCount := func(name string, n int) format.Result {
		return format.Result{
			Key:         map[string]string{"count": name, "unit": "traces"},
			Measurement: float32(n),
		}
	}
	payload := format.Format{
		Version: 1,
		GitHash: data.GitHash,
		Key:     params,
		Results: []format.Result{
			traceCount("gold_triaged_positive", data.PositiveTraces),
			traceCount("gold_triaged_negative", data.NegativeTraces),
			traceCount("gold_untriaged", data.UntriagedTraces),
			traceCount("gold_ignored", data.IgnoredTraces),
		},
	}
	body, err := json.MarshalIndent(payload, "", "\t")
	if err != nil {
		return skerr.Wrap(err)
	}

	writeOpts := gcs.FileWriteOptions{ContentType: "application/json"}
	if err := client.SetFileContents(ctx, perfPath, writeOpts, body); err != nil {
		return skerr.Wrap(err)
	}
	sklog.Infof("Uploaded summary to perf %s", perfPath)
	return nil
}
// runCachingTasks starts a background goroutine that repopulates the search cache every
// ptc.CachingFrequencyMinutes minutes until ctx is done, resetting the "populateCache" liveness
// metric after each run. The process is terminated if the cache client cannot be created or is
// not configured.
func runCachingTasks(ctx context.Context, ptc periodicTasksConfig, db *pgxpool.Pool) {
	cacheClient, err := ptc.GetCacheClient(ctx)
	switch {
	case err != nil:
		sklog.Fatalf("Error creating a new cache instance: %v", err)
	case cacheClient == nil:
		sklog.Fatalf("Cache is not configured correctly for this instance.")
	}

	populateLiveness := metrics2.NewLiveness("periodic_tasks", map[string]string{
		"task": "populateCache",
	})
	period := time.Minute * time.Duration(ptc.CachingFrequencyMinutes)
	go util.RepeatCtx(ctx, period, func(ctx context.Context) {
		populateSearchCache(ctx, ptc, db, cacheClient)
		populateLiveness.Reset()
	})
}
// populateSearchCache runs one round of cache population for the search functionality, using the
// corpora and window size from ptc. A failure terminates the process.
func populateSearchCache(ctx context.Context, ptc periodicTasksConfig, db *pgxpool.Pool, cache cache.Cache) {
	manager := searchCache.New(cache, db, ptc.CachingCorpora, ptc.WindowSize)
	if err := manager.RunCachePopulation(ctx); err != nil {
		sklog.Fatalf("Error running cache population: %v", err)
	}
}
// runExpiryMonitoringTasks starts a background goroutine that updates the expiry of triaged
// expectations every ptc.ExpirationMonitorFrequencyInHours hours until ctx is done. It does
// nothing (other than log) unless both ptc.ExpirationMonitorBatchSize and
// ptc.ExpirationMonitorFrequencyInHours are positive.
//
// The "updateExpiry" liveness metric is reset only after a successful update, so that repeated
// failures show up as a stale liveness instead of being masked.
func runExpiryMonitoringTasks(ctx context.Context, db *pgxpool.Pool, ptc periodicTasksConfig) {
	configured := ptc.ExpirationMonitorBatchSize > 0 && ptc.ExpirationMonitorFrequencyInHours > 0
	if !configured {
		sklog.Info("Expiration monitoring is not configured for this instance.")
		return
	}

	liveness := metrics2.NewLiveness("periodic_tasks", map[string]string{
		"task": "updateExpiry",
	})
	expiryDataManager := data_manager.NewExpiryDataManager(db, ptc.ExpirationMonitorBatchSize)
	expirationMonitor := validation.NewExpirationMonitor(expiryDataManager)
	period := time.Hour * time.Duration(ptc.ExpirationMonitorFrequencyInHours)
	go util.RepeatCtx(ctx, period, func(ctx context.Context) {
		if err := expirationMonitor.UpdateTriagedExpectationsExpiry(ctx); err != nil {
			sklog.Errorf("Error updating data expiration: %v", err)
			// Do not report liveness for a failed run.
			return
		}
		liveness.Reset()
	})
}