Add timeouts to contexts in alertstore and shortcutstore.

Bug: b/304528541
Change-Id: Ic52f75683a55f6d9a55eaaf6956d6186b4b9853c
Reviewed-on: https://skia-review.googlesource.com/c/buildbot/+/769219
Commit-Queue: Leandro Lovisolo <lovisolo@google.com>
Reviewed-by: Leandro Lovisolo <lovisolo@google.com>
Auto-Submit: Joe Gregorio <jcgregorio@google.com>
diff --git a/go/ctxutil/ctxutil.go b/go/ctxutil/ctxutil.go
index fe4ab49..35c596c 100644
--- a/go/ctxutil/ctxutil.go
+++ b/go/ctxutil/ctxutil.go
@@ -2,6 +2,7 @@
 
 import (
 	"context"
+	"time"
 
 	"go.skia.org/infra/go/skerr"
 	"go.skia.org/infra/go/sklog"
@@ -19,3 +20,11 @@
 		sklog.Errorf("ctx is missing deadline at %s", stack)
 	}
 }
+
+// WithContextTimeout calls `f` with a context that has a timeout, and ensures
+// that the cancel function gets called.
+func WithContextTimeout(ctx context.Context, timeout time.Duration, f func(ctx context.Context)) {
+	timeoutContext, cancel := context.WithTimeout(ctx, timeout)
+	defer cancel()
+	f(timeoutContext)
+}
diff --git a/perf/go/regression/continuous/BUILD.bazel b/perf/go/regression/continuous/BUILD.bazel
index 2882ab0..8730c9e 100644
--- a/perf/go/regression/continuous/BUILD.bazel
+++ b/perf/go/regression/continuous/BUILD.bazel
@@ -7,6 +7,7 @@
     importpath = "go.skia.org/infra/perf/go/regression/continuous",
     visibility = ["//visibility:public"],
     deps = [
+        "//go/ctxutil",
         "//go/metrics2",
         "//go/paramtools",
         "//go/pubsub/sub",
diff --git a/perf/go/regression/continuous/continuous.go b/perf/go/regression/continuous/continuous.go
index 155d0ce..c2a0f64 100644
--- a/perf/go/regression/continuous/continuous.go
+++ b/perf/go/regression/continuous/continuous.go
@@ -11,6 +11,7 @@
 	"time"
 
 	"cloud.google.com/go/pubsub"
+	"go.skia.org/infra/go/ctxutil"
 	"go.skia.org/infra/go/metrics2"
 	"go.skia.org/infra/go/paramtools"
 	"go.skia.org/infra/go/pubsub/sub"
@@ -208,6 +209,12 @@
 	return sub.New(ctx, c.flags.Local, c.instanceConfig.IngestionConfig.SourceConfig.Project, c.instanceConfig.IngestionConfig.FileIngestionTopicName, maxParallelReceives)
 }
 
+func (c *Continuous) callProvider(ctx context.Context) ([]*alerts.Alert, error) {
+	timeoutCtx, cancel := context.WithTimeout(ctx, config.QueryMaxRunTime)
+	defer cancel()
+	return c.provider(timeoutCtx)
+}
+
 // buildConfigAndParamsetChannel returns a channel that will feed the configs
 // and paramset that continuous regression detection should run over. In the
 // future when Continuous.eventDriven is true this will be driven by PubSub
@@ -261,7 +268,7 @@
 						sklog.Infof("IngestEvent received for : %q", ie.Filename)
 						// Filter all the configs down to just those that match
 						// the incoming traces.
-						configs, err := c.provider(ctx)
+						configs, err := c.callProvider(ctx)
 						if err != nil {
 							sklog.Errorf("Failed to get list of configs: %s", err)
 							// An error not related to the event, nack so we try again later.
@@ -297,7 +304,7 @@
 				sklog.Info("Channel context error %s", err)
 				return
 			}
-			configs, err := c.provider(ctx)
+			configs, err := c.callProvider(ctx)
 			if err != nil {
 				sklog.Errorf("Failed to get list of configs: %s", err)
 				time.Sleep(time.Minute)
@@ -417,7 +424,11 @@
 					sklog.Warningf("Alert failed smoketest: Alert contains invalid query: %q: %s", cfg.Query, err)
 					continue
 				}
-				matches, err := c.dfBuilder.NumMatches(context.Background(), q)
+
+				var matches int64
+				ctxutil.WithContextTimeout(ctx, config.QueryMaxRunTime, func(ctx context.Context) {
+					matches, err = c.dfBuilder.NumMatches(ctx, q)
+				})
 				if err != nil {
 					sklog.Warningf("Alert failed smoketest: %q Failed while trying generic query: %s", cfg.DisplayName, err)
 					continue
@@ -453,7 +464,12 @@
 				// traces they matched.
 				expandBaseRequest = regression.DoNotExpandBaseAlertByGroupBy
 			}
-			if err := regression.ProcessRegressions(ctx, req, clusterResponseProcessor, c.perfGit, c.shortcutStore, c.dfBuilder, c.paramsProvider(), expandBaseRequest, regression.ContinueOnError, c.instanceConfig.AnomalyConfig); err != nil {
+
+			var err error
+			ctxutil.WithContextTimeout(ctx, config.QueryMaxRunTime, func(ctx context.Context) {
+				err = regression.ProcessRegressions(ctx, req, clusterResponseProcessor, c.perfGit, c.shortcutStore, c.dfBuilder, c.paramsProvider(), expandBaseRequest, regression.ContinueOnError, c.instanceConfig.AnomalyConfig)
+			})
+			if err != nil {
 				sklog.Warningf("Failed regression detection: Query: %q Error: %s", req.Query, err)
 			}