Update util.RepeatCtx and cleanup.Repeat to pass the Context to the func
This allows the func to decide whether or not to respect Context.Done().
Change-Id: I36eb2dbb465b2d27e37507b563c40af1db5e2bbb
Reviewed-on: https://skia-review.googlesource.com/c/buildbot/+/233960
Reviewed-by: Ravi Mistry <rmistry@google.com>
Reviewed-by: Ben Wagner aka dogben <benjaminwagner@google.com>
Commit-Queue: Eric Boren <borenet@google.com>
diff --git a/android_compile/go/android_compile_be/checkouts.go b/android_compile/go/android_compile_be/checkouts.go
index 70cd5b1..5dfa994 100644
--- a/android_compile/go/android_compile_be/checkouts.go
+++ b/android_compile/go/android_compile_be/checkouts.go
@@ -136,7 +136,7 @@
}
}
// Update mirror here and then periodically.
- cleanup.Repeat(repoUpdateDuration, func() {
+ cleanup.Repeat(repoUpdateDuration, func(ctx context.Context) {
UpdateMirror(ctx)
}, nil)
diff --git a/android_compile/go/android_compile_be/main.go b/android_compile/go/android_compile_be/main.go
index 1bc0b98..8da0589 100644
--- a/android_compile/go/android_compile_be/main.go
+++ b/android_compile/go/android_compile_be/main.go
@@ -375,7 +375,7 @@
// Start listener for when the mirror should be force synced.
// Update mirror here and then periodically.
- cleanup.Repeat(time.Minute, func() {
+ cleanup.Repeat(time.Minute, func(ctx context.Context) {
// Check the datastore and if it is true then Update the mirror!
forceMirror, err := ac_util.GetForceMirrorUpdateBool(ctx, hostname)
if err != nil {
diff --git a/android_compile/go/android_compile_fe/main.go b/android_compile/go/android_compile_fe/main.go
index a22bd92..1e021cf 100644
--- a/android_compile/go/android_compile_fe/main.go
+++ b/android_compile/go/android_compile_fe/main.go
@@ -165,7 +165,7 @@
}
// Start updater for the queue length metrics.
- cleanup.Repeat(time.Minute, func() {
+ cleanup.Repeat(time.Minute, func(ctx context.Context) {
unownedPendingTasks, ownedPendingTasks, err := util.GetPendingCompileTasks("" /* ownedByInstance */)
if err != nil {
sklog.Errorf("Failed to get unowned/owned compile tasks: %s", err)
diff --git a/autoroll/go/autoroll-fe/main.go b/autoroll/go/autoroll-fe/main.go
index 42d9ee3..6cef645 100644
--- a/autoroll/go/autoroll-fe/main.go
+++ b/autoroll/go/autoroll-fe/main.go
@@ -475,7 +475,7 @@
if err != nil {
sklog.Fatal(err)
}
- go util.RepeatCtx(10*time.Second, ctx, func() {
+ go util.RepeatCtx(10*time.Second, ctx, func(ctx context.Context) {
if err := arbMode.Update(ctx); err != nil {
sklog.Error(err)
}
@@ -484,7 +484,7 @@
if err != nil {
sklog.Fatal(err)
}
- go util.RepeatCtx(10*time.Second, ctx, func() {
+ go util.RepeatCtx(10*time.Second, ctx, func(ctx context.Context) {
if err := arbStatus.Update(ctx); err != nil {
sklog.Error(err)
}
@@ -493,7 +493,7 @@
if err != nil {
sklog.Fatal(err)
}
- go util.RepeatCtx(10*time.Second, ctx, func() {
+ go util.RepeatCtx(10*time.Second, ctx, func(ctx context.Context) {
if err := arbStrategy.Update(ctx); err != nil {
sklog.Error(err)
}
diff --git a/autoroll/go/autoroll-google3/google3.go b/autoroll/go/autoroll-google3/google3.go
index 8f83302..491c3d5 100644
--- a/autoroll/go/autoroll-google3/google3.go
+++ b/autoroll/go/autoroll-google3/google3.go
@@ -77,7 +77,7 @@
// Start ensures DBs are closed when ctx is canceled.
func (a *AutoRoller) Start(ctx context.Context, tickFrequency, repoFrequency time.Duration) {
- go cleanup.Repeat(repoFrequency, func() {
+ go cleanup.Repeat(repoFrequency, func(ctx context.Context) {
util.LogErr(a.UpdateStatus(ctx, "", true))
}, nil)
}
diff --git a/autoroll/go/repo_manager/repo_manager.go b/autoroll/go/repo_manager/repo_manager.go
index 8380faf..478da33 100644
--- a/autoroll/go/repo_manager/repo_manager.go
+++ b/autoroll/go/repo_manager/repo_manager.go
@@ -83,8 +83,13 @@
func Start(ctx context.Context, r RepoManager, frequency time.Duration) {
sklog.Infof("Starting repo_manager")
lv := metrics2.NewLiveness("last_successful_repo_manager_update")
- cleanup.Repeat(frequency, func() {
+ cleanup.Repeat(frequency, func(_ context.Context) {
sklog.Infof("Running repo_manager update.")
+ // Explicitly ignore the passed-in context; this allows us to
+ // continue updating the RepoManager even if the context is
+ // canceled, which helps to prevent errors due to interrupted
+ // syncs, etc.
+ ctx := context.Background()
if err := r.Update(ctx); err != nil {
sklog.Errorf("Failed to update repo manager: %s", err)
} else {
diff --git a/autoroll/go/roller/autoroller.go b/autoroll/go/roller/autoroller.go
index dac0432..e107d4e 100644
--- a/autoroll/go/roller/autoroller.go
+++ b/autoroll/go/roller/autoroller.go
@@ -286,7 +286,11 @@
sklog.Infof("Starting autoroller.")
repo_manager.Start(ctx, r.rm, repoFrequency)
lv := metrics2.NewLiveness("last_successful_autoroll_tick", map[string]string{"roller": r.roller})
- cleanup.Repeat(tickFrequency, func() {
+ cleanup.Repeat(tickFrequency, func(_ context.Context) {
+ // Explicitly ignore the passed-in context; this allows us to
+ // continue running even if the context is canceled, which helps
+ // to prevent errors due to interrupted syncs, etc.
+ ctx := context.Background()
if err := r.Tick(ctx); err != nil {
// Hack: we frequently get failures from GoB which trigger error-rate alerts.
// These alerts are noise and sometimes hide real failures. If the error is
@@ -303,7 +307,7 @@
}, nil)
// Update the current sheriff in a loop.
- cleanup.Repeat(30*time.Minute, func() {
+ cleanup.Repeat(30*time.Minute, func(ctx context.Context) {
emails, err := getSheriff(r.cfg.ParentName, r.cfg.ChildName, r.cfg.RollerName, r.cfg.Sheriff, r.cfg.SheriffBackup)
if err != nil {
sklog.Errorf("Failed to retrieve current sheriff: %s", err)
@@ -323,7 +327,12 @@
// Handle requests for manual rolls.
if r.cfg.SupportsManualRolls {
lvManualRolls := metrics2.NewLiveness("last_successful_manual_roll_check", map[string]string{"roller": r.roller})
- cleanup.Repeat(time.Minute, func() {
+ cleanup.Repeat(time.Minute, func(_ context.Context) {
+ // Explicitly ignore the passed-in context; this allows
+ // us to continue handling manual rolls even if the
+ // context is canceled, which helps to prevent errors
+ // due to interrupted syncs, etc.
+ ctx := context.Background()
if err := r.handleManualRolls(ctx); err != nil {
sklog.Error(err)
} else {
diff --git a/datahopper/go/bot_metrics/bot_metrics.go b/datahopper/go/bot_metrics/bot_metrics.go
index f5a3099..3178384 100644
--- a/datahopper/go/bot_metrics/bot_metrics.go
+++ b/datahopper/go/bot_metrics/bot_metrics.go
@@ -426,7 +426,7 @@
if err != nil {
return fmt.Errorf("Failed to get timestamp of last successful ingestion: %s", err)
}
- go util.RepeatCtx(10*time.Minute, ctx, func() {
+ go util.RepeatCtx(10*time.Minute, ctx, func(ctx context.Context) {
now := time.Now()
if err := cycle(ctx, taskDb, repos, tcc, edb, em, lastFinished, now); err != nil {
sklog.Errorf("Failed to obtain avg time to X%% bot coverage metrics: %s", err)
diff --git a/datahopper/go/datahopper/firestore_backup_metrics.go b/datahopper/go/datahopper/firestore_backup_metrics.go
index 6dd2b21..6d2a811 100644
--- a/datahopper/go/datahopper/firestore_backup_metrics.go
+++ b/datahopper/go/datahopper/firestore_backup_metrics.go
@@ -95,7 +95,7 @@
httpClient := httputils.DefaultClientConfig().WithTokenSource(tokenSource).With2xxOnly().Client()
lvMetrics := metrics2.NewLiveness("last_successful_firestore_backup_metrics_update")
lvBackup := metrics2.NewLiveness("last_successful_firestore_backup")
- go util.RepeatCtx(5*time.Minute, ctx, func() {
+ go util.RepeatCtx(5*time.Minute, ctx, func(ctx context.Context) {
if lastBackupTime, err := getFirestoreLastBackupCompleted(ctx, httpClient); err != nil {
sklog.Errorf("Failed to update firestore backup metrics: %s", err)
} else {
diff --git a/datahopper/go/datahopper/jobs.go b/datahopper/go/datahopper/jobs.go
index 4919530..a4f12ab 100644
--- a/datahopper/go/datahopper/jobs.go
+++ b/datahopper/go/datahopper/jobs.go
@@ -305,7 +305,7 @@
om.start(ctx)
lv := metrics2.NewLiveness("last_successful_job_metrics_update")
- go util.RepeatCtx(5*time.Minute, ctx, func() {
+ go util.RepeatCtx(5*time.Minute, ctx, func(ctx context.Context) {
if err := edb.update(); err != nil {
sklog.Errorf("Failed to update job data: %s", err)
} else {
@@ -362,7 +362,7 @@
func (m *overdueJobMetrics) start(ctx context.Context) {
lvOverdueMetrics := metrics2.NewLiveness("last_successful_overdue_metrics_update")
- go util.RepeatCtx(5*time.Second, ctx, func() {
+ go util.RepeatCtx(5*time.Second, ctx, func(ctx context.Context) {
if err := m.updateOverdueJobSpecMetrics(ctx, time.Now()); err != nil {
sklog.Errorf("Failed to update metrics for overdue job specs: %s", err)
} else {
diff --git a/datahopper/go/datahopper/main.go b/datahopper/go/datahopper/main.go
index 5db8804..597acb3 100644
--- a/datahopper/go/datahopper/main.go
+++ b/datahopper/go/datahopper/main.go
@@ -109,7 +109,7 @@
sklog.Fatal(err)
}
lvRepos := metrics2.NewLiveness("datahopper_repo_update")
- go util.RepeatCtx(time.Minute, ctx, func() {
+ go util.RepeatCtx(time.Minute, ctx, func(ctx context.Context) {
if err := repos.Update(ctx); err != nil {
sklog.Errorf("Failed to update repos: %s", err)
} else {
@@ -122,7 +122,7 @@
if err != nil {
sklog.Fatalf("Failed to create TaskCfgCache: %s", err)
}
- go util.RepeatCtx(30*time.Minute, ctx, func() {
+ go util.RepeatCtx(30*time.Minute, ctx, func(ctx context.Context) {
if err := tcc.Cleanup(ctx, OVERDUE_JOB_METRICS_PERIOD); err != nil {
sklog.Errorf("Failed to cleanup TaskCfgCache: %s", err)
}
diff --git a/datahopper/go/datahopper/tasks.go b/datahopper/go/datahopper/tasks.go
index f0892ce..f2dbc23 100644
--- a/datahopper/go/datahopper/tasks.go
+++ b/datahopper/go/datahopper/tasks.go
@@ -190,7 +190,7 @@
}
lv := metrics2.NewLiveness("last_successful_task_metrics_update")
- go util.RepeatCtx(5*time.Minute, ctx, func() {
+ go util.RepeatCtx(5*time.Minute, ctx, func(ctx context.Context) {
if err := edb.update(); err != nil {
sklog.Errorf("Failed to update task data: %s", err)
} else {
diff --git a/datahopper/go/supported_branches/supported_branches.go b/datahopper/go/supported_branches/supported_branches.go
index e2fc15c..30fed7a 100644
--- a/datahopper/go/supported_branches/supported_branches.go
+++ b/datahopper/go/supported_branches/supported_branches.go
@@ -228,7 +228,7 @@
}
lv := metrics2.NewLiveness("last_successful_supported_branches_update")
oldMetrics := map[metrics2.Int64Metric]struct{}{}
- go util.RepeatCtx(5*time.Minute, ctx, func() {
+ go util.RepeatCtx(5*time.Minute, ctx, func(ctx context.Context) {
newMetrics, err := cycle(repos, oldMetrics, swarm, pools)
if err == nil {
lv.Reset()
diff --git a/datahopper/go/swarming_metrics/bots.go b/datahopper/go/swarming_metrics/bots.go
index 8b13c2c..4b7fd6a 100644
--- a/datahopper/go/swarming_metrics/bots.go
+++ b/datahopper/go/swarming_metrics/bots.go
@@ -261,7 +261,7 @@
"pool": pool,
})
oldMetrics := map[metrics2.Int64Metric]struct{}{}
- go util.RepeatCtx(2*time.Minute, ctx, func() {
+ go util.RepeatCtx(2*time.Minute, ctx, func(ctx context.Context) {
newMetrics, err := reportBotMetrics(time.Now(), client, metricsClient, pool, swarmingServer)
if err != nil {
sklog.Error(err)
diff --git a/datahopper/go/swarming_metrics/tasks.go b/datahopper/go/swarming_metrics/tasks.go
index 3aa0a6f..b372cba 100644
--- a/datahopper/go/swarming_metrics/tasks.go
+++ b/datahopper/go/swarming_metrics/tasks.go
@@ -475,7 +475,7 @@
})
lastLoad := time.Now().Add(-2 * time.Minute)
revisitTasks := []string{}
- go util.RepeatCtx(10*time.Minute, ctx, func() {
+ go util.RepeatCtx(10*time.Minute, ctx, func(ctx context.Context) {
now := time.Now()
revisit, err := loadSwarmingTasks(swarm, pool, edb, perfClient, tnp, lastLoad, now, revisitTasks)
if err != nil {
diff --git a/gitsync/go/gitsync/watcher.go b/gitsync/go/gitsync/watcher.go
index ca9c17a..cb8b5ab 100644
--- a/gitsync/go/gitsync/watcher.go
+++ b/gitsync/go/gitsync/watcher.go
@@ -60,7 +60,7 @@
// defined by 'interval'.
func (r *RepoWatcher) Start(ctx context.Context, interval time.Duration) {
lvGitSync := metrics2.NewLiveness("last_successful_git_sync", map[string]string{"repo": r.repoURL})
- go util.RepeatCtx(interval, ctx, func() {
+ go util.RepeatCtx(interval, ctx, func(ctx context.Context) {
// Catch any panic and log relevant information to find the root cause.
defer func() {
if err := recover(); err != nil {
diff --git a/go/cleanup/cleanup.go b/go/cleanup/cleanup.go
index 9a42dd9..1fe470b 100644
--- a/go/cleanup/cleanup.go
+++ b/go/cleanup/cleanup.go
@@ -46,9 +46,9 @@
}
// Repeat runs the tick function immediately and on the given timer. When
-// Cancel() is called, the optional cleanup function is run after waiting for
-// the tick function to finish.
-func Repeat(tickFrequency time.Duration, tick, cleanup func()) {
+// Cancel() is called, Repeat waits for any active tick() to finish (tick may
+// or may not respect ctx.Done()) before running the optional cleanup func.
+func Repeat(tickFrequency time.Duration, tick func(context.Context), cleanup func()) {
wg.Add(1)
go func() {
defer wg.Done()
diff --git a/go/cleanup/cleanup_test.go b/go/cleanup/cleanup_test.go
index 337c4d3..2ae5166 100644
--- a/go/cleanup/cleanup_test.go
+++ b/go/cleanup/cleanup_test.go
@@ -1,6 +1,7 @@
package cleanup
import (
+ "context"
"testing"
"time"
@@ -17,7 +18,7 @@
// expected.
count := 0
cleanup := false
- Repeat(interval, func() {
+ Repeat(interval, func(_ context.Context) {
count++
assert.False(t, cleanup)
}, func() {
@@ -41,7 +42,7 @@
}
for i := 0; i < n; i++ {
idx := i
- Repeat(interval, func() {
+ Repeat(interval, func(_ context.Context) {
counts[idx]++
assert.False(t, cleanups[idx])
}, func() {
diff --git a/go/firestore/firestore.go b/go/firestore/firestore.go
index 3e57214..6685d28 100644
--- a/go/firestore/firestore.go
+++ b/go/firestore/firestore.go
@@ -161,7 +161,7 @@
errorMetrics: errorMetrics,
metricTags: metricTags,
}
- go util.RepeatCtx(time.Minute, ctx, func() {
+ go util.RepeatCtx(time.Minute, ctx, func(ctx context.Context) {
c.activeOpsMtx.RLock()
ids := make([]int64, 0, len(c.activeOps))
for id := range c.activeOps {
diff --git a/go/metrics2/events/events.go b/go/metrics2/events/events.go
index 74f53ad..ab29e1d 100644
--- a/go/metrics2/events/events.go
+++ b/go/metrics2/events/events.go
@@ -120,7 +120,7 @@
lv := metrics2.NewLiveness("last_successful_event_metrics_update", map[string]string{
"measurement": m.measurement,
})
- go util.RepeatCtx(time.Minute, ctx, func() {
+ go util.RepeatCtx(time.Minute, ctx, func(ctx context.Context) {
if err := m.updateMetrics(time.Now()); err != nil {
sklog.Errorf("Failed to update event metrics: %s", err)
} else {
diff --git a/go/metrics2/liveness.go b/go/metrics2/liveness.go
index 9d63030..e5c1e2a 100644
--- a/go/metrics2/liveness.go
+++ b/go/metrics2/liveness.go
@@ -46,7 +46,7 @@
m: c.GetInt64Metric(measurement, tags),
mtx: sync.Mutex{},
}
- go util.RepeatCtx(LIVENESS_REPORT_FREQUENCY, ctx, l.update)
+ go util.RepeatCtx(LIVENESS_REPORT_FREQUENCY, ctx, func(_ context.Context) { l.update() })
return l
}
diff --git a/go/util/util.go b/go/util/util.go
index 7da15e6..13b7301 100644
--- a/go/util/util.go
+++ b/go/util/util.go
@@ -755,18 +755,18 @@
// RepeatCtx calls the provided function 'fn' immediately and then in intervals
// defined by 'interval'. If the given context is canceled, the iteration stops.
-func RepeatCtx(interval time.Duration, ctx context.Context, fn func()) {
+func RepeatCtx(interval time.Duration, ctx context.Context, fn func(ctx context.Context)) {
ticker := time.NewTicker(interval)
done := ctx.Done()
defer ticker.Stop()
- fn()
+ fn(ctx)
MainLoop:
for {
select {
case <-done:
break MainLoop
case <-ticker.C:
- fn()
+ fn(ctx)
}
}
}
diff --git a/go/vcsinfo/bt_vcs/bt_vcs.go b/go/vcsinfo/bt_vcs/bt_vcs.go
index 0902ef4..5156c5f 100644
--- a/go/vcsinfo/bt_vcs/bt_vcs.go
+++ b/go/vcsinfo/bt_vcs/bt_vcs.go
@@ -456,7 +456,7 @@
// Keep track of commits.
var prevCommits []*vcsinfo.IndexCommit
- go util.RepeatCtx(defaultWatchInterval, ctx, func() {
+ go util.RepeatCtx(defaultWatchInterval, ctx, func(ctx context.Context) {
allBranches, err := b.gitStore.GetBranches(ctx)
if err != nil {
sklog.Errorf("Error retrieving branches: %s", err)
diff --git a/golden/go/tilesource/tilesource.go b/golden/go/tilesource/tilesource.go
index 7eac6dd..c320957 100644
--- a/golden/go/tilesource/tilesource.go
+++ b/golden/go/tilesource/tilesource.go
@@ -66,7 +66,7 @@
if err := s.updateTile(ctx); err != nil {
return skerr.Wrapf(err, "failed initial tile update")
}
- go util.RepeatCtx(interval, ctx, func() {
+ go util.RepeatCtx(interval, ctx, func(ctx context.Context) {
if err := s.updateTile(ctx); err != nil {
sklog.Errorf("Could not update tile: %s", err)
}
diff --git a/k8s_checker/go/k8s_checker/main.go b/k8s_checker/go/k8s_checker/main.go
index 64713f3..8bcdb88 100644
--- a/k8s_checker/go/k8s_checker/main.go
+++ b/k8s_checker/go/k8s_checker/main.go
@@ -372,7 +372,7 @@
livenessDirtyConfigs := metrics2.NewLiveness(LIVENESS_DIRTY_CONFIGS_METRIC)
oldMetricsDirtyConfigs := map[metrics2.Int64Metric]struct{}{}
- go util.RepeatCtx(*dirtyConfigChecksPeriod, ctx, func() {
+ go util.RepeatCtx(*dirtyConfigChecksPeriod, ctx, func(ctx context.Context) {
newMetrics, err := checkForDirtyConfigs(ctx, oldMetricsDirtyConfigs)
if err != nil {
sklog.Errorf("Error when checking for dirty configs: %s", err)
@@ -384,7 +384,7 @@
livenessPodStatus := metrics2.NewLiveness(LIVENESS_POD_STATUS_METRIC)
oldMetricsPodStatus := map[metrics2.Int64Metric]struct{}{}
- go util.RepeatCtx(*podStatusMetricsPeriod, ctx, func() {
+ go util.RepeatCtx(*podStatusMetricsPeriod, ctx, func(ctx context.Context) {
newMetrics, err := updatePodStatusMetrics(ctx, oldMetricsPodStatus)
if err != nil {
sklog.Errorf("Error when checking pod statuses: %s", err)
diff --git a/skolo/go/skmetadata/skmetadata.go b/skolo/go/skmetadata/skmetadata.go
index 06f6388..6b8bb62 100644
--- a/skolo/go/skmetadata/skmetadata.go
+++ b/skolo/go/skmetadata/skmetadata.go
@@ -135,7 +135,7 @@
// get_oauth2_token runs every 45 minutes, and the tokens are valid for
// 60 minutes. Reloading the token every 10 minutes ensures that our
// token is always valid.
- util.RepeatCtx(10*time.Minute, ctx, func() {
+ util.RepeatCtx(10*time.Minute, ctx, func(ctx context.Context) {
if err := t.Update(); err != nil {
sklog.Errorf("Failed to update ServiceAccountToken from file: %s", err)
}
diff --git a/status/go/capacity/capacity.go b/status/go/capacity/capacity.go
index 4c4c0ac..9f36a05 100644
--- a/status/go/capacity/capacity.go
+++ b/status/go/capacity/capacity.go
@@ -317,7 +317,7 @@
// given interval of time. Any errors are logged, but the loop is not broken.
func (c *CapacityClient) StartLoading(ctx context.Context, interval time.Duration) {
go func() {
- util.RepeatCtx(interval, ctx, func() {
+ util.RepeatCtx(interval, ctx, func(ctx context.Context) {
if err := c.QueryAll(ctx); err != nil {
sklog.Errorf("There was a problem counting capacity stats")
}
diff --git a/status/go/incremental/incremental.go b/status/go/incremental/incremental.go
index d7d2ec0..b157d7e 100644
--- a/status/go/incremental/incremental.go
+++ b/status/go/incremental/incremental.go
@@ -272,7 +272,7 @@
func (c *IncrementalCache) UpdateLoop(frequency time.Duration, ctx context.Context) {
lv := metrics2.NewLiveness("last_successful_incremental_cache_update")
lastReset := time.Now()
- go util.RepeatCtx(frequency, ctx, func() {
+ go util.RepeatCtx(frequency, ctx, func(ctx context.Context) {
reset := false
now := time.Now()
if now.Sub(lastReset) > 24*time.Hour {
diff --git a/status/go/lkgr/lkgr.go b/status/go/lkgr/lkgr.go
index 4149571..3669edb 100644
--- a/status/go/lkgr/lkgr.go
+++ b/status/go/lkgr/lkgr.go
@@ -57,7 +57,7 @@
// Start updating LKGR in a loop.
func (r *LKGR) UpdateLoop(freq time.Duration, ctx context.Context) {
lv := metrics2.NewLiveness("last_successful_lkgr_update")
- go util.RepeatCtx(freq, ctx, func() {
+ go util.RepeatCtx(freq, ctx, func(ctx context.Context) {
if err := r.Update(ctx); err != nil {
sklog.Errorf("Failed to update LKGR: %s", err)
} else {
diff --git a/status/go/status/main.go b/status/go/status/main.go
index ec7685e..2cd0067 100644
--- a/status/go/status/main.go
+++ b/status/go/status/main.go
@@ -814,7 +814,7 @@
sklog.Fatalf("Failed to create TaskCache: %s", err)
}
lvTaskCache := metrics2.NewLiveness("status_task_cache")
- go util.RepeatCtx(60*time.Second, ctx, func() {
+ go util.RepeatCtx(60*time.Second, ctx, func(ctx context.Context) {
if err := tCache.Update(); err != nil {
sklog.Errorf("Failed to update TaskCache: %s", err)
} else {
@@ -856,7 +856,7 @@
if err := updateAutorollStatus(ctx); err != nil {
sklog.Fatal(err)
}
- go util.RepeatCtx(60*time.Second, ctx, func() {
+ go util.RepeatCtx(60*time.Second, ctx, func(ctx context.Context) {
if err := updateAutorollStatus(ctx); err != nil {
sklog.Errorf("Failed to update autoroll status: %s", err)
}
diff --git a/status/go/status/tasks_per_commit.go b/status/go/status/tasks_per_commit.go
index 8566973..3301968 100644
--- a/status/go/status/tasks_per_commit.go
+++ b/status/go/status/tasks_per_commit.go
@@ -36,7 +36,7 @@
repos: repos,
tcc: tcc,
}
- go util.RepeatCtx(time.Minute, ctx, func() {
+ go util.RepeatCtx(time.Minute, ctx, func(ctx context.Context) {
if err := c.update(ctx); err != nil {
sklog.Errorf("Failed to update tasksPerCommitCache: %s", err)
}
diff --git a/task_scheduler/go/scheduling/task_scheduler.go b/task_scheduler/go/scheduling/task_scheduler.go
index 3a60d0e..0d26653 100644
--- a/task_scheduler/go/scheduling/task_scheduler.go
+++ b/task_scheduler/go/scheduling/task_scheduler.go
@@ -18,6 +18,7 @@
multierror "github.com/hashicorp/go-multierror"
swarming_api "go.chromium.org/luci/common/api/swarming/swarming/v1"
"go.chromium.org/luci/common/isolated"
+ "go.skia.org/infra/go/cleanup"
"go.skia.org/infra/go/common"
"go.skia.org/infra/go/gcs"
"go.skia.org/infra/go/gerrit"
@@ -245,7 +246,12 @@
s.tryjobs.Start(ctx)
}
lvScheduling := metrics2.NewLiveness("last_successful_task_scheduling")
- go util.RepeatCtx(5*time.Second, ctx, func() {
+ cleanup.Repeat(5*time.Second, func(_ context.Context) {
+ // Explicitly ignore the passed-in context; this allows us to
+ // finish the current scheduling cycle even if the context is
+ // canceled, which helps prevent "orphaned" tasks which were
+ // triggered on Swarming but were not inserted into the DB.
+ ctx := context.Background()
sklog.Infof("Running beforeMainLoop()")
beforeMainLoop()
sklog.Infof("beforeMainLoop() finished.")
@@ -254,9 +260,9 @@
} else {
lvScheduling.Reset()
}
- })
+ }, nil)
lvUpdateUnfinishedTasks := metrics2.NewLiveness("last_successful_tasks_update")
- go util.RepeatCtx(5*time.Minute, ctx, func() {
+ go util.RepeatCtx(5*time.Minute, ctx, func(ctx context.Context) {
if err := s.updateUnfinishedTasks(); err != nil {
sklog.Errorf("Failed to run periodic tasks update: %s", err)
} else {
@@ -264,13 +270,17 @@
}
})
lvUpdateRepos := metrics2.NewLiveness("last_successful_repo_update")
- go util.RepeatCtx(10*time.Second, ctx, func() {
+ cleanup.Repeat(10*time.Second, func(_ context.Context) {
+ // Explicitly ignore the passed-in context; this allows us to
+ // continue running even if the context is canceled, which helps
+ // to prevent partial insertions of new jobs.
+ ctx := context.Background()
if err := s.updateRepos(ctx); err != nil {
sklog.Errorf("Failed to update repos: %s", err)
} else {
lvUpdateRepos.Reset()
}
- })
+ }, nil)
}
// initCaches ensures that all of the RepoStates we care about are present
diff --git a/task_scheduler/go/tryjobs/tryjobs.go b/task_scheduler/go/tryjobs/tryjobs.go
index 85497f2..72b6d34 100644
--- a/task_scheduler/go/tryjobs/tryjobs.go
+++ b/task_scheduler/go/tryjobs/tryjobs.go
@@ -11,11 +11,11 @@
buildbucket_api "go.chromium.org/luci/common/api/buildbucket/buildbucket/v1"
"go.skia.org/infra/go/buildbucket"
+ "go.skia.org/infra/go/cleanup"
"go.skia.org/infra/go/gerrit"
"go.skia.org/infra/go/git/repograph"
"go.skia.org/infra/go/metrics2"
"go.skia.org/infra/go/sklog"
- "go.skia.org/infra/go/util"
"go.skia.org/infra/task_scheduler/go/cacher"
"go.skia.org/infra/task_scheduler/go/db"
"go.skia.org/infra/task_scheduler/go/db/cache"
@@ -105,16 +105,28 @@
-// Start initiates the TryJobIntegrator's heatbeat and polling loops. If the
-// given Context is canceled, the loops stop.
+// Start initiates the TryJobIntegrator's heartbeat and polling loops. The
+// loops deliberately ignore cancelation of the given Context so that
+// in-flight Buildbucket and DB updates can finish cleanly.
func (t *TryJobIntegrator) Start(ctx context.Context) {
- go util.RepeatCtx(UPDATE_INTERVAL, ctx, func() {
+ cleanup.Repeat(UPDATE_INTERVAL, func(_ context.Context) {
+ // Explicitly ignore the passed-in context; this allows us to
+ // finish sending heartbeats and updating finished jobs in the
+ // DB even if the context is canceled, which helps to prevent
+ // inconsistencies between Buildbucket and the Task Scheduler
+ // DB.
if err := t.updateJobs(time.Now()); err != nil {
sklog.Error(err)
}
- })
- go util.RepeatCtx(POLL_INTERVAL, ctx, func() {
+ }, nil)
+ cleanup.Repeat(POLL_INTERVAL, func(_ context.Context) {
+ // Explicitly ignore the passed-in context; this allows us to
+ // finish leasing jobs from Buildbucket and inserting them into
+ // the DB even if the context is canceled, which helps to
+ // prevent inconsistencies between Buildbucket and the Task
+ // Scheduler DB.
+ ctx := context.Background()
if err := t.Poll(ctx); err != nil {
sklog.Errorf("Failed to poll for new try jobs: %s", err)
}
- })
+ }, nil)
}
// getActiveTryJobs returns the active (not yet marked as finished in