Update util.RepeatCtx to pass the Context to the func

This allows the func to decide whether to respect Context.Done(). The same
change is made to cleanup.Repeat, so funcs which should not be interrupted
(e.g. to avoid partially-inserted jobs or interrupted syncs) can explicitly
ignore the passed-in Context and use context.Background() instead.
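
For illustration, a minimal sketch of the two call patterns this change
enables (doWork and the one-minute interval are hypothetical; the
util/cleanup/sklog imports follow the packages touched below):

    package main

    import (
        "context"
        "time"

        "go.skia.org/infra/go/cleanup"
        "go.skia.org/infra/go/sklog"
        "go.skia.org/infra/go/util"
    )

    // doWork is a hypothetical task used only for illustration.
    func doWork(ctx context.Context) error { return ctx.Err() }

    func main() {
        ctx := context.Background()

        // Respect cancellation: use the ctx passed to the func.
        go util.RepeatCtx(time.Minute, ctx, func(ctx context.Context) {
            if err := doWork(ctx); err != nil {
                sklog.Error(err)
            }
        })

        // Ignore cancellation: shadow ctx so in-flight work can finish
        // even after the outer context is canceled.
        cleanup.Repeat(time.Minute, func(_ context.Context) {
            ctx := context.Background()
            if err := doWork(ctx); err != nil {
                sklog.Error(err)
            }
        }, nil)
    }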

Change-Id: I36eb2dbb465b2d27e37507b563c40af1db5e2bbb
Reviewed-on: https://skia-review.googlesource.com/c/buildbot/+/233960
Reviewed-by: Ravi Mistry <rmistry@google.com>
Reviewed-by: Ben Wagner aka dogben <benjaminwagner@google.com>
Commit-Queue: Eric Boren <borenet@google.com>
diff --git a/android_compile/go/android_compile_be/checkouts.go b/android_compile/go/android_compile_be/checkouts.go
index 70cd5b1..5dfa994 100644
--- a/android_compile/go/android_compile_be/checkouts.go
+++ b/android_compile/go/android_compile_be/checkouts.go
@@ -136,7 +136,7 @@
 		}
 	}
 	// Update mirror here and then periodically.
-	cleanup.Repeat(repoUpdateDuration, func() {
+	cleanup.Repeat(repoUpdateDuration, func(ctx context.Context) {
 		UpdateMirror(ctx)
 	}, nil)
 
diff --git a/android_compile/go/android_compile_be/main.go b/android_compile/go/android_compile_be/main.go
index 1bc0b98..8da0589 100644
--- a/android_compile/go/android_compile_be/main.go
+++ b/android_compile/go/android_compile_be/main.go
@@ -375,7 +375,7 @@
 
 	// Start listener for when the mirror should be force synced.
 	// Update mirror here and then periodically.
-	cleanup.Repeat(time.Minute, func() {
+	cleanup.Repeat(time.Minute, func(ctx context.Context) {
 		// Check the datastore and if it is true then Update the mirror!
 		forceMirror, err := ac_util.GetForceMirrorUpdateBool(ctx, hostname)
 		if err != nil {
diff --git a/android_compile/go/android_compile_fe/main.go b/android_compile/go/android_compile_fe/main.go
index a22bd92..1e021cf 100644
--- a/android_compile/go/android_compile_fe/main.go
+++ b/android_compile/go/android_compile_fe/main.go
@@ -165,7 +165,7 @@
 	}
 
 	// Start updater for the queue length metrics.
-	cleanup.Repeat(time.Minute, func() {
+	cleanup.Repeat(time.Minute, func(ctx context.Context) {
 		unownedPendingTasks, ownedPendingTasks, err := util.GetPendingCompileTasks("" /* ownedByInstance */)
 		if err != nil {
 			sklog.Errorf("Failed to get unowned/owned compile tasks: %s", err)
diff --git a/autoroll/go/autoroll-fe/main.go b/autoroll/go/autoroll-fe/main.go
index 42d9ee3..6cef645 100644
--- a/autoroll/go/autoroll-fe/main.go
+++ b/autoroll/go/autoroll-fe/main.go
@@ -475,7 +475,7 @@
 		if err != nil {
 			sklog.Fatal(err)
 		}
-		go util.RepeatCtx(10*time.Second, ctx, func() {
+		go util.RepeatCtx(10*time.Second, ctx, func(ctx context.Context) {
 			if err := arbMode.Update(ctx); err != nil {
 				sklog.Error(err)
 			}
@@ -484,7 +484,7 @@
 		if err != nil {
 			sklog.Fatal(err)
 		}
-		go util.RepeatCtx(10*time.Second, ctx, func() {
+		go util.RepeatCtx(10*time.Second, ctx, func(ctx context.Context) {
 			if err := arbStatus.Update(ctx); err != nil {
 				sklog.Error(err)
 			}
@@ -493,7 +493,7 @@
 		if err != nil {
 			sklog.Fatal(err)
 		}
-		go util.RepeatCtx(10*time.Second, ctx, func() {
+		go util.RepeatCtx(10*time.Second, ctx, func(ctx context.Context) {
 			if err := arbStrategy.Update(ctx); err != nil {
 				sklog.Error(err)
 			}
diff --git a/autoroll/go/autoroll-google3/google3.go b/autoroll/go/autoroll-google3/google3.go
index 8f83302..491c3d5 100644
--- a/autoroll/go/autoroll-google3/google3.go
+++ b/autoroll/go/autoroll-google3/google3.go
@@ -77,7 +77,7 @@
 
 // Start ensures DBs are closed when ctx is canceled.
 func (a *AutoRoller) Start(ctx context.Context, tickFrequency, repoFrequency time.Duration) {
-	go cleanup.Repeat(repoFrequency, func() {
+	go cleanup.Repeat(repoFrequency, func(ctx context.Context) {
 		util.LogErr(a.UpdateStatus(ctx, "", true))
 	}, nil)
 }
diff --git a/autoroll/go/repo_manager/repo_manager.go b/autoroll/go/repo_manager/repo_manager.go
index 8380faf..478da33 100644
--- a/autoroll/go/repo_manager/repo_manager.go
+++ b/autoroll/go/repo_manager/repo_manager.go
@@ -83,8 +83,13 @@
 func Start(ctx context.Context, r RepoManager, frequency time.Duration) {
 	sklog.Infof("Starting repo_manager")
 	lv := metrics2.NewLiveness("last_successful_repo_manager_update")
-	cleanup.Repeat(frequency, func() {
+	cleanup.Repeat(frequency, func(_ context.Context) {
 		sklog.Infof("Running repo_manager update.")
+		// Explicitly ignore the passed-in context; this allows us to
+		// continue updating the RepoManager even if the context is
+		// canceled, which helps to prevent errors due to interrupted
+		// syncs, etc.
+		ctx := context.Background()
 		if err := r.Update(ctx); err != nil {
 			sklog.Errorf("Failed to update repo manager: %s", err)
 		} else {
diff --git a/autoroll/go/roller/autoroller.go b/autoroll/go/roller/autoroller.go
index dac0432..e107d4e 100644
--- a/autoroll/go/roller/autoroller.go
+++ b/autoroll/go/roller/autoroller.go
@@ -286,7 +286,11 @@
 	sklog.Infof("Starting autoroller.")
 	repo_manager.Start(ctx, r.rm, repoFrequency)
 	lv := metrics2.NewLiveness("last_successful_autoroll_tick", map[string]string{"roller": r.roller})
-	cleanup.Repeat(tickFrequency, func() {
+	cleanup.Repeat(tickFrequency, func(_ context.Context) {
+		// Explicitly ignore the passed-in context; this allows us to
+		// continue running even if the context is canceled, which helps
+		// to prevent errors due to interrupted syncs, etc.
+		ctx := context.Background()
 		if err := r.Tick(ctx); err != nil {
 			// Hack: we frequently get failures from GoB which trigger error-rate alerts.
 			// These alerts are noise and sometimes hide real failures. If the error is
@@ -303,7 +307,7 @@
 	}, nil)
 
 	// Update the current sheriff in a loop.
-	cleanup.Repeat(30*time.Minute, func() {
+	cleanup.Repeat(30*time.Minute, func(ctx context.Context) {
 		emails, err := getSheriff(r.cfg.ParentName, r.cfg.ChildName, r.cfg.RollerName, r.cfg.Sheriff, r.cfg.SheriffBackup)
 		if err != nil {
 			sklog.Errorf("Failed to retrieve current sheriff: %s", err)
@@ -323,7 +327,12 @@
 	// Handle requests for manual rolls.
 	if r.cfg.SupportsManualRolls {
 		lvManualRolls := metrics2.NewLiveness("last_successful_manual_roll_check", map[string]string{"roller": r.roller})
-		cleanup.Repeat(time.Minute, func() {
+		cleanup.Repeat(time.Minute, func(_ context.Context) {
+			// Explicitly ignore the passed-in context; this allows
+			// us to continue handling manual rolls even if the
+			// context is canceled, which helps to prevent errors
+			// due to interrupted syncs, etc.
+			ctx := context.Background()
 			if err := r.handleManualRolls(ctx); err != nil {
 				sklog.Error(err)
 			} else {
diff --git a/datahopper/go/bot_metrics/bot_metrics.go b/datahopper/go/bot_metrics/bot_metrics.go
index f5a3099..3178384 100644
--- a/datahopper/go/bot_metrics/bot_metrics.go
+++ b/datahopper/go/bot_metrics/bot_metrics.go
@@ -426,7 +426,7 @@
 	if err != nil {
 		return fmt.Errorf("Failed to get timestamp of last successful ingestion: %s", err)
 	}
-	go util.RepeatCtx(10*time.Minute, ctx, func() {
+	go util.RepeatCtx(10*time.Minute, ctx, func(ctx context.Context) {
 		now := time.Now()
 		if err := cycle(ctx, taskDb, repos, tcc, edb, em, lastFinished, now); err != nil {
 			sklog.Errorf("Failed to obtain avg time to X%% bot coverage metrics: %s", err)
diff --git a/datahopper/go/datahopper/firestore_backup_metrics.go b/datahopper/go/datahopper/firestore_backup_metrics.go
index 6dd2b21..6d2a811 100644
--- a/datahopper/go/datahopper/firestore_backup_metrics.go
+++ b/datahopper/go/datahopper/firestore_backup_metrics.go
@@ -95,7 +95,7 @@
 	httpClient := httputils.DefaultClientConfig().WithTokenSource(tokenSource).With2xxOnly().Client()
 	lvMetrics := metrics2.NewLiveness("last_successful_firestore_backup_metrics_update")
 	lvBackup := metrics2.NewLiveness("last_successful_firestore_backup")
-	go util.RepeatCtx(5*time.Minute, ctx, func() {
+	go util.RepeatCtx(5*time.Minute, ctx, func(ctx context.Context) {
 		if lastBackupTime, err := getFirestoreLastBackupCompleted(ctx, httpClient); err != nil {
 			sklog.Errorf("Failed to update firestore backup metrics: %s", err)
 		} else {
diff --git a/datahopper/go/datahopper/jobs.go b/datahopper/go/datahopper/jobs.go
index 4919530..a4f12ab 100644
--- a/datahopper/go/datahopper/jobs.go
+++ b/datahopper/go/datahopper/jobs.go
@@ -305,7 +305,7 @@
 	om.start(ctx)
 
 	lv := metrics2.NewLiveness("last_successful_job_metrics_update")
-	go util.RepeatCtx(5*time.Minute, ctx, func() {
+	go util.RepeatCtx(5*time.Minute, ctx, func(ctx context.Context) {
 		if err := edb.update(); err != nil {
 			sklog.Errorf("Failed to update job data: %s", err)
 		} else {
@@ -362,7 +362,7 @@
 
 func (m *overdueJobMetrics) start(ctx context.Context) {
 	lvOverdueMetrics := metrics2.NewLiveness("last_successful_overdue_metrics_update")
-	go util.RepeatCtx(5*time.Second, ctx, func() {
+	go util.RepeatCtx(5*time.Second, ctx, func(ctx context.Context) {
 		if err := m.updateOverdueJobSpecMetrics(ctx, time.Now()); err != nil {
 			sklog.Errorf("Failed to update metrics for overdue job specs: %s", err)
 		} else {
diff --git a/datahopper/go/datahopper/main.go b/datahopper/go/datahopper/main.go
index 5db8804..597acb3 100644
--- a/datahopper/go/datahopper/main.go
+++ b/datahopper/go/datahopper/main.go
@@ -109,7 +109,7 @@
 		sklog.Fatal(err)
 	}
 	lvRepos := metrics2.NewLiveness("datahopper_repo_update")
-	go util.RepeatCtx(time.Minute, ctx, func() {
+	go util.RepeatCtx(time.Minute, ctx, func(ctx context.Context) {
 		if err := repos.Update(ctx); err != nil {
 			sklog.Errorf("Failed to update repos: %s", err)
 		} else {
@@ -122,7 +122,7 @@
 	if err != nil {
 		sklog.Fatalf("Failed to create TaskCfgCache: %s", err)
 	}
-	go util.RepeatCtx(30*time.Minute, ctx, func() {
+	go util.RepeatCtx(30*time.Minute, ctx, func(ctx context.Context) {
 		if err := tcc.Cleanup(ctx, OVERDUE_JOB_METRICS_PERIOD); err != nil {
 			sklog.Errorf("Failed to cleanup TaskCfgCache: %s", err)
 		}
diff --git a/datahopper/go/datahopper/tasks.go b/datahopper/go/datahopper/tasks.go
index f0892ce..f2dbc23 100644
--- a/datahopper/go/datahopper/tasks.go
+++ b/datahopper/go/datahopper/tasks.go
@@ -190,7 +190,7 @@
 	}
 
 	lv := metrics2.NewLiveness("last_successful_task_metrics_update")
-	go util.RepeatCtx(5*time.Minute, ctx, func() {
+	go util.RepeatCtx(5*time.Minute, ctx, func(ctx context.Context) {
 		if err := edb.update(); err != nil {
 			sklog.Errorf("Failed to update task data: %s", err)
 		} else {
diff --git a/datahopper/go/supported_branches/supported_branches.go b/datahopper/go/supported_branches/supported_branches.go
index e2fc15c..30fed7a 100644
--- a/datahopper/go/supported_branches/supported_branches.go
+++ b/datahopper/go/supported_branches/supported_branches.go
@@ -228,7 +228,7 @@
 	}
 	lv := metrics2.NewLiveness("last_successful_supported_branches_update")
 	oldMetrics := map[metrics2.Int64Metric]struct{}{}
-	go util.RepeatCtx(5*time.Minute, ctx, func() {
+	go util.RepeatCtx(5*time.Minute, ctx, func(ctx context.Context) {
 		newMetrics, err := cycle(repos, oldMetrics, swarm, pools)
 		if err == nil {
 			lv.Reset()
diff --git a/datahopper/go/swarming_metrics/bots.go b/datahopper/go/swarming_metrics/bots.go
index 8b13c2c..4b7fd6a 100644
--- a/datahopper/go/swarming_metrics/bots.go
+++ b/datahopper/go/swarming_metrics/bots.go
@@ -261,7 +261,7 @@
 			"pool":   pool,
 		})
 		oldMetrics := map[metrics2.Int64Metric]struct{}{}
-		go util.RepeatCtx(2*time.Minute, ctx, func() {
+		go util.RepeatCtx(2*time.Minute, ctx, func(ctx context.Context) {
 			newMetrics, err := reportBotMetrics(time.Now(), client, metricsClient, pool, swarmingServer)
 			if err != nil {
 				sklog.Error(err)
diff --git a/datahopper/go/swarming_metrics/tasks.go b/datahopper/go/swarming_metrics/tasks.go
index 3aa0a6f..b372cba 100644
--- a/datahopper/go/swarming_metrics/tasks.go
+++ b/datahopper/go/swarming_metrics/tasks.go
@@ -475,7 +475,7 @@
 	})
 	lastLoad := time.Now().Add(-2 * time.Minute)
 	revisitTasks := []string{}
-	go util.RepeatCtx(10*time.Minute, ctx, func() {
+	go util.RepeatCtx(10*time.Minute, ctx, func(ctx context.Context) {
 		now := time.Now()
 		revisit, err := loadSwarmingTasks(swarm, pool, edb, perfClient, tnp, lastLoad, now, revisitTasks)
 		if err != nil {
diff --git a/gitsync/go/gitsync/watcher.go b/gitsync/go/gitsync/watcher.go
index ca9c17a..cb8b5ab 100644
--- a/gitsync/go/gitsync/watcher.go
+++ b/gitsync/go/gitsync/watcher.go
@@ -60,7 +60,7 @@
 // defined by 'interval'.
 func (r *RepoWatcher) Start(ctx context.Context, interval time.Duration) {
 	lvGitSync := metrics2.NewLiveness("last_successful_git_sync", map[string]string{"repo": r.repoURL})
-	go util.RepeatCtx(interval, ctx, func() {
+	go util.RepeatCtx(interval, ctx, func(ctx context.Context) {
 		// Catch any panic and log relevant information to find the root cause.
 		defer func() {
 			if err := recover(); err != nil {
diff --git a/go/cleanup/cleanup.go b/go/cleanup/cleanup.go
index 9a42dd9..1fe470b 100644
--- a/go/cleanup/cleanup.go
+++ b/go/cleanup/cleanup.go
@@ -46,9 +46,9 @@
 }
 
 // Repeat runs the tick function immediately and on the given timer. When
-// Cancel() is called, the optional cleanup function is run after waiting for
-// the tick function to finish.
-func Repeat(tickFrequency time.Duration, tick, cleanup func()) {
+// Cancel() is called, Repeat waits for any in-progress tick to finish (the tick
+// func may or may not respect ctx.Done()), then runs the optional cleanup func.
+func Repeat(tickFrequency time.Duration, tick func(context.Context), cleanup func()) {
 	wg.Add(1)
 	go func() {
 		defer wg.Done()
diff --git a/go/cleanup/cleanup_test.go b/go/cleanup/cleanup_test.go
index 337c4d3..2ae5166 100644
--- a/go/cleanup/cleanup_test.go
+++ b/go/cleanup/cleanup_test.go
@@ -1,6 +1,7 @@
 package cleanup
 
 import (
+	"context"
 	"testing"
 	"time"
 
@@ -17,7 +18,7 @@
 	// expected.
 	count := 0
 	cleanup := false
-	Repeat(interval, func() {
+	Repeat(interval, func(_ context.Context) {
 		count++
 		assert.False(t, cleanup)
 	}, func() {
@@ -41,7 +42,7 @@
 	}
 	for i := 0; i < n; i++ {
 		idx := i
-		Repeat(interval, func() {
+		Repeat(interval, func(_ context.Context) {
 			counts[idx]++
 			assert.False(t, cleanups[idx])
 		}, func() {
diff --git a/go/firestore/firestore.go b/go/firestore/firestore.go
index 3e57214..6685d28 100644
--- a/go/firestore/firestore.go
+++ b/go/firestore/firestore.go
@@ -161,7 +161,7 @@
 		errorMetrics:   errorMetrics,
 		metricTags:     metricTags,
 	}
-	go util.RepeatCtx(time.Minute, ctx, func() {
+	go util.RepeatCtx(time.Minute, ctx, func(ctx context.Context) {
 		c.activeOpsMtx.RLock()
 		ids := make([]int64, 0, len(c.activeOps))
 		for id := range c.activeOps {
diff --git a/go/metrics2/events/events.go b/go/metrics2/events/events.go
index 74f53ad..ab29e1d 100644
--- a/go/metrics2/events/events.go
+++ b/go/metrics2/events/events.go
@@ -120,7 +120,7 @@
 	lv := metrics2.NewLiveness("last_successful_event_metrics_update", map[string]string{
 		"measurement": m.measurement,
 	})
-	go util.RepeatCtx(time.Minute, ctx, func() {
+	go util.RepeatCtx(time.Minute, ctx, func(ctx context.Context) {
 		if err := m.updateMetrics(time.Now()); err != nil {
 			sklog.Errorf("Failed to update event metrics: %s", err)
 		} else {
diff --git a/go/metrics2/liveness.go b/go/metrics2/liveness.go
index 9d63030..e5c1e2a 100644
--- a/go/metrics2/liveness.go
+++ b/go/metrics2/liveness.go
@@ -46,7 +46,7 @@
 		m:                    c.GetInt64Metric(measurement, tags),
 		mtx:                  sync.Mutex{},
 	}
-	go util.RepeatCtx(LIVENESS_REPORT_FREQUENCY, ctx, l.update)
+	go util.RepeatCtx(LIVENESS_REPORT_FREQUENCY, ctx, func(_ context.Context) { l.update() })
 	return l
 }
 
diff --git a/go/util/util.go b/go/util/util.go
index 7da15e6..13b7301 100644
--- a/go/util/util.go
+++ b/go/util/util.go
@@ -755,18 +755,18 @@
 
 // RepeatCtx calls the provided function 'fn' immediately and then in intervals
 // defined by 'interval'. If the given context is canceled, the iteration stops.
-func RepeatCtx(interval time.Duration, ctx context.Context, fn func()) {
+func RepeatCtx(interval time.Duration, ctx context.Context, fn func(ctx context.Context)) {
 	ticker := time.NewTicker(interval)
 	done := ctx.Done()
 	defer ticker.Stop()
-	fn()
+	fn(ctx)
 MainLoop:
 	for {
 		select {
 		case <-done:
 			break MainLoop
 		case <-ticker.C:
-			fn()
+			fn(ctx)
 		}
 	}
 }
diff --git a/go/vcsinfo/bt_vcs/bt_vcs.go b/go/vcsinfo/bt_vcs/bt_vcs.go
index 0902ef4..5156c5f 100644
--- a/go/vcsinfo/bt_vcs/bt_vcs.go
+++ b/go/vcsinfo/bt_vcs/bt_vcs.go
@@ -456,7 +456,7 @@
 
 	// Keep track of commits.
 	var prevCommits []*vcsinfo.IndexCommit
-	go util.RepeatCtx(defaultWatchInterval, ctx, func() {
+	go util.RepeatCtx(defaultWatchInterval, ctx, func(ctx context.Context) {
 		allBranches, err := b.gitStore.GetBranches(ctx)
 		if err != nil {
 			sklog.Errorf("Error retrieving branches: %s", err)
diff --git a/golden/go/tilesource/tilesource.go b/golden/go/tilesource/tilesource.go
index 7eac6dd..c320957 100644
--- a/golden/go/tilesource/tilesource.go
+++ b/golden/go/tilesource/tilesource.go
@@ -66,7 +66,7 @@
 	if err := s.updateTile(ctx); err != nil {
 		return skerr.Wrapf(err, "failed initial tile update")
 	}
-	go util.RepeatCtx(interval, ctx, func() {
+	go util.RepeatCtx(interval, ctx, func(ctx context.Context) {
 		if err := s.updateTile(ctx); err != nil {
 			sklog.Errorf("Could not update tile: %s", err)
 		}
diff --git a/k8s_checker/go/k8s_checker/main.go b/k8s_checker/go/k8s_checker/main.go
index 64713f3..8bcdb88 100644
--- a/k8s_checker/go/k8s_checker/main.go
+++ b/k8s_checker/go/k8s_checker/main.go
@@ -372,7 +372,7 @@
 
 	livenessDirtyConfigs := metrics2.NewLiveness(LIVENESS_DIRTY_CONFIGS_METRIC)
 	oldMetricsDirtyConfigs := map[metrics2.Int64Metric]struct{}{}
-	go util.RepeatCtx(*dirtyConfigChecksPeriod, ctx, func() {
+	go util.RepeatCtx(*dirtyConfigChecksPeriod, ctx, func(ctx context.Context) {
 		newMetrics, err := checkForDirtyConfigs(ctx, oldMetricsDirtyConfigs)
 		if err != nil {
 			sklog.Errorf("Error when checking for dirty configs: %s", err)
@@ -384,7 +384,7 @@
 
 	livenessPodStatus := metrics2.NewLiveness(LIVENESS_POD_STATUS_METRIC)
 	oldMetricsPodStatus := map[metrics2.Int64Metric]struct{}{}
-	go util.RepeatCtx(*podStatusMetricsPeriod, ctx, func() {
+	go util.RepeatCtx(*podStatusMetricsPeriod, ctx, func(ctx context.Context) {
 		newMetrics, err := updatePodStatusMetrics(ctx, oldMetricsPodStatus)
 		if err != nil {
 			sklog.Errorf("Error when checking pod statuses: %s", err)
diff --git a/skolo/go/skmetadata/skmetadata.go b/skolo/go/skmetadata/skmetadata.go
index 06f6388..6b8bb62 100644
--- a/skolo/go/skmetadata/skmetadata.go
+++ b/skolo/go/skmetadata/skmetadata.go
@@ -135,7 +135,7 @@
 	// get_oauth2_token runs every 45 minutes, and the tokens are valid for
 	// 60 minutes. Reloading the token every 10 minutes ensures that our
 	// token is always valid.
-	util.RepeatCtx(10*time.Minute, ctx, func() {
+	util.RepeatCtx(10*time.Minute, ctx, func(ctx context.Context) {
 		if err := t.Update(); err != nil {
 			sklog.Errorf("Failed to update ServiceAccountToken from file: %s", err)
 		}
diff --git a/status/go/capacity/capacity.go b/status/go/capacity/capacity.go
index 4c4c0ac..9f36a05 100644
--- a/status/go/capacity/capacity.go
+++ b/status/go/capacity/capacity.go
@@ -317,7 +317,7 @@
 // given interval of time.  Any errors are logged, but the loop is not broken.
 func (c *CapacityClient) StartLoading(ctx context.Context, interval time.Duration) {
 	go func() {
-		util.RepeatCtx(interval, ctx, func() {
+		util.RepeatCtx(interval, ctx, func(ctx context.Context) {
 			if err := c.QueryAll(ctx); err != nil {
 				sklog.Errorf("There was a problem counting capacity stats")
 			}
diff --git a/status/go/incremental/incremental.go b/status/go/incremental/incremental.go
index d7d2ec0..b157d7e 100644
--- a/status/go/incremental/incremental.go
+++ b/status/go/incremental/incremental.go
@@ -272,7 +272,7 @@
 func (c *IncrementalCache) UpdateLoop(frequency time.Duration, ctx context.Context) {
 	lv := metrics2.NewLiveness("last_successful_incremental_cache_update")
 	lastReset := time.Now()
-	go util.RepeatCtx(frequency, ctx, func() {
+	go util.RepeatCtx(frequency, ctx, func(ctx context.Context) {
 		reset := false
 		now := time.Now()
 		if now.Sub(lastReset) > 24*time.Hour {
diff --git a/status/go/lkgr/lkgr.go b/status/go/lkgr/lkgr.go
index 4149571..3669edb 100644
--- a/status/go/lkgr/lkgr.go
+++ b/status/go/lkgr/lkgr.go
@@ -57,7 +57,7 @@
 // Start updating LKGR in a loop.
 func (r *LKGR) UpdateLoop(freq time.Duration, ctx context.Context) {
 	lv := metrics2.NewLiveness("last_successful_lkgr_update")
-	go util.RepeatCtx(freq, ctx, func() {
+	go util.RepeatCtx(freq, ctx, func(ctx context.Context) {
 		if err := r.Update(ctx); err != nil {
 			sklog.Errorf("Failed to update LKGR: %s", err)
 		} else {
diff --git a/status/go/status/main.go b/status/go/status/main.go
index ec7685e..2cd0067 100644
--- a/status/go/status/main.go
+++ b/status/go/status/main.go
@@ -814,7 +814,7 @@
 		sklog.Fatalf("Failed to create TaskCache: %s", err)
 	}
 	lvTaskCache := metrics2.NewLiveness("status_task_cache")
-	go util.RepeatCtx(60*time.Second, ctx, func() {
+	go util.RepeatCtx(60*time.Second, ctx, func(ctx context.Context) {
 		if err := tCache.Update(); err != nil {
 			sklog.Errorf("Failed to update TaskCache: %s", err)
 		} else {
@@ -856,7 +856,7 @@
 	if err := updateAutorollStatus(ctx); err != nil {
 		sklog.Fatal(err)
 	}
-	go util.RepeatCtx(60*time.Second, ctx, func() {
+	go util.RepeatCtx(60*time.Second, ctx, func(ctx context.Context) {
 		if err := updateAutorollStatus(ctx); err != nil {
 			sklog.Errorf("Failed to update autoroll status: %s", err)
 		}
diff --git a/status/go/status/tasks_per_commit.go b/status/go/status/tasks_per_commit.go
index 8566973..3301968 100644
--- a/status/go/status/tasks_per_commit.go
+++ b/status/go/status/tasks_per_commit.go
@@ -36,7 +36,7 @@
 		repos:  repos,
 		tcc:    tcc,
 	}
-	go util.RepeatCtx(time.Minute, ctx, func() {
+	go util.RepeatCtx(time.Minute, ctx, func(ctx context.Context) {
 		if err := c.update(ctx); err != nil {
 			sklog.Errorf("Failed to update tasksPerCommitCache: %s", err)
 		}
diff --git a/task_scheduler/go/scheduling/task_scheduler.go b/task_scheduler/go/scheduling/task_scheduler.go
index 3a60d0e..0d26653 100644
--- a/task_scheduler/go/scheduling/task_scheduler.go
+++ b/task_scheduler/go/scheduling/task_scheduler.go
@@ -18,6 +18,7 @@
 	multierror "github.com/hashicorp/go-multierror"
 	swarming_api "go.chromium.org/luci/common/api/swarming/swarming/v1"
 	"go.chromium.org/luci/common/isolated"
+	"go.skia.org/infra/go/cleanup"
 	"go.skia.org/infra/go/common"
 	"go.skia.org/infra/go/gcs"
 	"go.skia.org/infra/go/gerrit"
@@ -245,7 +246,12 @@
 		s.tryjobs.Start(ctx)
 	}
 	lvScheduling := metrics2.NewLiveness("last_successful_task_scheduling")
-	go util.RepeatCtx(5*time.Second, ctx, func() {
+	cleanup.Repeat(5*time.Second, func(_ context.Context) {
+		// Explicitly ignore the passed-in context; this allows us to
+		// finish the current scheduling cycle even if the context is
+		// canceled, which helps prevent "orphaned" tasks which were
+		// triggered on Swarming but were not inserted into the DB.
+		ctx := context.Background()
 		sklog.Infof("Running beforeMainLoop()")
 		beforeMainLoop()
 		sklog.Infof("beforeMainLoop() finished.")
@@ -254,9 +260,9 @@
 		} else {
 			lvScheduling.Reset()
 		}
-	})
+	}, nil)
 	lvUpdateUnfinishedTasks := metrics2.NewLiveness("last_successful_tasks_update")
-	go util.RepeatCtx(5*time.Minute, ctx, func() {
+	go util.RepeatCtx(5*time.Minute, ctx, func(ctx context.Context) {
 		if err := s.updateUnfinishedTasks(); err != nil {
 			sklog.Errorf("Failed to run periodic tasks update: %s", err)
 		} else {
@@ -264,13 +270,17 @@
 		}
 	})
 	lvUpdateRepos := metrics2.NewLiveness("last_successful_repo_update")
-	go util.RepeatCtx(10*time.Second, ctx, func() {
+	cleanup.Repeat(10*time.Second, func(_ context.Context) {
+		// Explicitly ignore the passed-in context; this allows us to
+		// continue running even if the context is canceled, which helps
+		// to prevent partial insertions of new jobs.
+		ctx := context.Background()
 		if err := s.updateRepos(ctx); err != nil {
 			sklog.Errorf("Failed to update repos: %s", err)
 		} else {
 			lvUpdateRepos.Reset()
 		}
-	})
+	}, nil)
 }
 
 // initCaches ensures that all of the RepoStates we care about are present
diff --git a/task_scheduler/go/tryjobs/tryjobs.go b/task_scheduler/go/tryjobs/tryjobs.go
index 85497f2..72b6d34 100644
--- a/task_scheduler/go/tryjobs/tryjobs.go
+++ b/task_scheduler/go/tryjobs/tryjobs.go
@@ -11,11 +11,11 @@
 
 	buildbucket_api "go.chromium.org/luci/common/api/buildbucket/buildbucket/v1"
 	"go.skia.org/infra/go/buildbucket"
+	"go.skia.org/infra/go/cleanup"
 	"go.skia.org/infra/go/gerrit"
 	"go.skia.org/infra/go/git/repograph"
 	"go.skia.org/infra/go/metrics2"
 	"go.skia.org/infra/go/sklog"
-	"go.skia.org/infra/go/util"
 	"go.skia.org/infra/task_scheduler/go/cacher"
 	"go.skia.org/infra/task_scheduler/go/db"
 	"go.skia.org/infra/task_scheduler/go/db/cache"
@@ -105,16 +105,27 @@
 // Start initiates the TryJobIntegrator's heartbeat and polling loops. If the
 // given Context is canceled, the loops stop.
 func (t *TryJobIntegrator) Start(ctx context.Context) {
-	go util.RepeatCtx(UPDATE_INTERVAL, ctx, func() {
+	cleanup.Repeat(UPDATE_INTERVAL, func(_ context.Context) {
+		// Explicitly ignore the passed-in context; this allows us to
+		// finish sending heartbeats and updating finished jobs in the
+		// DB even if the context is canceled, which helps to prevent
+		// inconsistencies between Buildbucket and the Task Scheduler
+		// DB.
 		if err := t.updateJobs(time.Now()); err != nil {
 			sklog.Error(err)
 		}
-	})
-	go util.RepeatCtx(POLL_INTERVAL, ctx, func() {
+	}, nil)
+	cleanup.Repeat(POLL_INTERVAL, func(_ context.Context) {
+		// Explicitly ignore the passed-in context; this allows us to
+		// finish leasing jobs from Buildbucket and inserting them into
+		// the DB even if the context is canceled, which helps to
+		// prevent inconsistencies between Buildbucket and the Task
+		// Scheduler DB.
+		ctx := context.Background()
 		if err := t.Poll(ctx); err != nil {
 			sklog.Errorf("Failed to poll for new try jobs: %s", err)
 		}
-	})
+	}, nil)
 }
 
 // getActiveTryJobs returns the active (not yet marked as finished in