Add timeouts to contexts in alertstore and shortcutstore.
Bug: b/304528541
Change-Id: Ic52f75683a55f6d9a55eaaf6956d6186b4b9853c
Reviewed-on: https://skia-review.googlesource.com/c/buildbot/+/769219
Commit-Queue: Leandro Lovisolo <lovisolo@google.com>
Reviewed-by: Leandro Lovisolo <lovisolo@google.com>
Auto-Submit: Joe Gregorio <jcgregorio@google.com>
diff --git a/go/ctxutil/ctxutil.go b/go/ctxutil/ctxutil.go
index fe4ab49..35c596c 100644
--- a/go/ctxutil/ctxutil.go
+++ b/go/ctxutil/ctxutil.go
@@ -2,6 +2,7 @@
import (
"context"
+ "time"
"go.skia.org/infra/go/skerr"
"go.skia.org/infra/go/sklog"
@@ -19,3 +20,11 @@
sklog.Errorf("ctx is missing deadline at %s", stack)
}
}
+
+// WithContextTimeout calls `f` with a context that has a timeout, and ensures
+// that the cancel function gets called.
+func WithContextTimeout(ctx context.Context, timeout time.Duration, f func(ctx context.Context)) {
+ timeoutContext, cancel := context.WithTimeout(ctx, timeout)
+ defer cancel()
+ f(timeoutContext)
+}
diff --git a/perf/go/regression/continuous/BUILD.bazel b/perf/go/regression/continuous/BUILD.bazel
index 2882ab0..8730c9e 100644
--- a/perf/go/regression/continuous/BUILD.bazel
+++ b/perf/go/regression/continuous/BUILD.bazel
@@ -7,6 +7,7 @@
importpath = "go.skia.org/infra/perf/go/regression/continuous",
visibility = ["//visibility:public"],
deps = [
+ "//go/ctxutil",
"//go/metrics2",
"//go/paramtools",
"//go/pubsub/sub",
diff --git a/perf/go/regression/continuous/continuous.go b/perf/go/regression/continuous/continuous.go
index 155d0ce..c2a0f64 100644
--- a/perf/go/regression/continuous/continuous.go
+++ b/perf/go/regression/continuous/continuous.go
@@ -11,6 +11,7 @@
"time"
"cloud.google.com/go/pubsub"
+ "go.skia.org/infra/go/ctxutil"
"go.skia.org/infra/go/metrics2"
"go.skia.org/infra/go/paramtools"
"go.skia.org/infra/go/pubsub/sub"
@@ -208,6 +209,12 @@
return sub.New(ctx, c.flags.Local, c.instanceConfig.IngestionConfig.SourceConfig.Project, c.instanceConfig.IngestionConfig.FileIngestionTopicName, maxParallelReceives)
}
+func (c *Continuous) callProvider(ctx context.Context) ([]*alerts.Alert, error) {
+ timeoutCtx, cancel := context.WithTimeout(ctx, config.QueryMaxRunTime)
+ defer cancel()
+ return c.provider(timeoutCtx)
+}
+
// buildConfigAndParamsetChannel returns a channel that will feed the configs
// and paramset that continuous regression detection should run over. In the
// future when Continuous.eventDriven is true this will be driven by PubSub
@@ -261,7 +268,7 @@
sklog.Infof("IngestEvent received for : %q", ie.Filename)
// Filter all the configs down to just those that match
// the incoming traces.
- configs, err := c.provider(ctx)
+ configs, err := c.callProvider(ctx)
if err != nil {
sklog.Errorf("Failed to get list of configs: %s", err)
// An error not related to the event, nack so we try again later.
@@ -297,7 +304,7 @@
sklog.Info("Channel context error %s", err)
return
}
- configs, err := c.provider(ctx)
+ configs, err := c.callProvider(ctx)
if err != nil {
sklog.Errorf("Failed to get list of configs: %s", err)
time.Sleep(time.Minute)
@@ -417,7 +424,11 @@
sklog.Warningf("Alert failed smoketest: Alert contains invalid query: %q: %s", cfg.Query, err)
continue
}
- matches, err := c.dfBuilder.NumMatches(context.Background(), q)
+
+ var matches int64
+ ctxutil.WithContextTimeout(ctx, config.QueryMaxRunTime, func(ctx context.Context) {
+ matches, err = c.dfBuilder.NumMatches(ctx, q)
+ })
if err != nil {
sklog.Warningf("Alert failed smoketest: %q Failed while trying generic query: %s", cfg.DisplayName, err)
continue
@@ -453,7 +464,12 @@
// traces they matched.
expandBaseRequest = regression.DoNotExpandBaseAlertByGroupBy
}
- if err := regression.ProcessRegressions(ctx, req, clusterResponseProcessor, c.perfGit, c.shortcutStore, c.dfBuilder, c.paramsProvider(), expandBaseRequest, regression.ContinueOnError, c.instanceConfig.AnomalyConfig); err != nil {
+
+ var err error
+ ctxutil.WithContextTimeout(ctx, config.QueryMaxRunTime, func(ctx context.Context) {
+ err = regression.ProcessRegressions(ctx, req, clusterResponseProcessor, c.perfGit, c.shortcutStore, c.dfBuilder, c.paramsProvider(), expandBaseRequest, regression.ContinueOnError, c.instanceConfig.AnomalyConfig)
+ })
+ if err != nil {
sklog.Warningf("Failed regression detection: Query: %q Error: %s", req.Query, err)
}