Regression detection query optimization round 2
- In https://skia-review.googlesource.com/c/buildbot/+/817735 we separated the event-driven path from continuous mode. That change also updated the alert config's query to search only for the incoming trace, rather than the entire query in the config, when the config was running in Individual mode.
- Since an ingestion event typically carries far more traces than there are matching configs, merging the queries for multiple traces into the commonly matched config results in fewer db queries (see the sketch below).
- There is still a slim chance that many traces match a single alert config and the merged query becomes long, but that should be acceptable: the merged query is a subset of the original query in the config, so it should still be faster than running the entire original query.
- This change also reduces the number of queries issued from O(num_configs * num_traces) to O(num_configs).
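- Roughly, the new grouping and merged query look like the sketch below: a
  minimal, self-contained illustration with stand-in types (alertConfig,
  groupTracesByConfig and mergedQuery are hypothetical names). The real
  change keys configTracesMap on alerts.Alert and builds the override query
  from a paramtools.ParamSet via the URL provider.

      package main

      import (
          "fmt"
          "strings"
      )

      // alertConfig stands in for alerts.Alert with only the fields used here.
      type alertConfig struct {
          ID    string
          Query string
      }

      // configTracesMap mirrors the new map shape: one entry per matched
      // config, holding every incoming trace ID that matched it.
      type configTracesMap map[alertConfig][]string

      // groupTracesByConfig sketches matchingConfigsFromTraceIDs: it keys
      // the result on the config instead of on each individual trace ID.
      func groupTracesByConfig(traceIDs []string, configs []alertConfig,
          matches func(cfg alertConfig, traceID string) bool) configTracesMap {
          out := configTracesMap{}
          for _, cfg := range configs {
              for _, id := range traceIDs {
                  if matches(cfg, id) {
                      out[cfg] = append(out[cfg], id)
                  }
              }
          }
          return out
      }

      // mergedQuery sketches the Individual-mode override: all matched
      // trace IDs are folded into one query instead of one per trace.
      func mergedQuery(traceIDs []string) string {
          // Illustrative only; the real code merges trace keys into a
          // paramtools.ParamSet and asks the URL provider for a query string.
          return strings.Join(traceIDs, " OR ")
      }

      func main() {
          cfg := alertConfig{ID: "123", Query: "arch=x86"}
          traces := []string{",arch=x86,config=8888,", ",arch=x86,config=565,"}
          m := groupTracesByConfig(traces, []alertConfig{cfg},
              func(c alertConfig, id string) bool {
                  return strings.Contains(id, ",arch=x86,")
              })
          // One clustering run (and one query) per config, covering all of
          // its matched traces, instead of one run per (trace, config) pair.
          for c, ids := range m {
              fmt.Printf("config %s -> %d traces, override: %s\n",
                  c.ID, len(ids), mergedQuery(ids))
          }
      }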
Bug: b/326128508
Change-Id: Ie84631123c29439373e9fc502d46e213fed11fa5
Reviewed-on: https://skia-review.googlesource.com/c/buildbot/+/821645
Reviewed-by: Joe Gregorio <jcgregorio@google.com>
Commit-Queue: Ashwin Verleker <ashwinpv@google.com>
diff --git a/perf/go/regression/continuous/BUILD.bazel b/perf/go/regression/continuous/BUILD.bazel
index 0b67fc6..8f1447f 100644
--- a/perf/go/regression/continuous/BUILD.bazel
+++ b/perf/go/regression/continuous/BUILD.bazel
@@ -54,5 +54,6 @@
"//perf/go/ui/frame",
"@com_github_stretchr_testify//assert",
"@com_github_stretchr_testify//require",
+ "@org_golang_x_exp//slices",
],
)
diff --git a/perf/go/regression/continuous/continuous.go b/perf/go/regression/continuous/continuous.go
index 492daf5..610a863 100644
--- a/perf/go/regression/continuous/continuous.go
+++ b/perf/go/regression/continuous/continuous.go
@@ -43,6 +43,10 @@
checkIfRegressionIsDoneDuration = 100 * time.Millisecond
doNotOverrideQuery = ""
+
+ // Maximum number of matching traces for which a filtered query is still
+ // built when processing an alert config running in Individual mode.
+ maxTraceIdCountForIndividualQuery = 10000
)
// Continuous is used to run clustering on the last numCommits commits and
@@ -200,7 +204,9 @@
paramset paramtools.ReadOnlyParamSet
}
-type traceConfigsMap map[string][]*alerts.Alert
+// configTracesMap provides a map of all the matching traces for a given
+// alert config.
+type configTracesMap map[alerts.Alert][]string
// getPubSubSubscription returns a pubsub.Subscription or an error if the
// subscription can't be established.
@@ -219,8 +225,8 @@
return c.provider.GetAllAlertConfigs(timeoutCtx, false)
}
-func (c *Continuous) buildTraceConfigsMapChannelEventDriven(ctx context.Context) <-chan traceConfigsMap {
- ret := make(chan traceConfigsMap)
+func (c *Continuous) buildTraceConfigsMapChannelEventDriven(ctx context.Context) <-chan configTracesMap {
+ ret := make(chan configTracesMap)
sub, err := c.getPubSubSubscription()
if err != nil {
sklog.Errorf("Failed to create pubsub subscription, not doing event driven regression detection: %s", err)
@@ -287,7 +293,7 @@
return ret
}
-func (c *Continuous) getTraceIdConfigsForIngestEvent(ctx context.Context, ie *ingestevents.IngestEvent) (traceConfigsMap, error) {
+func (c *Continuous) getTraceIdConfigsForIngestEvent(ctx context.Context, ie *ingestevents.IngestEvent) (configTracesMap, error) {
// Filter all the configs down to just those that match
// the incoming traces.
configs, err := c.callProvider(ctx)
@@ -345,8 +351,8 @@
// Note that the Alerts returned may contain more restrictive Query values if
// the original Alert contains GroupBy parameters, while the original Alert
// remains unchanged.
-func matchingConfigsFromTraceIDs(traceIDs []string, configs []*alerts.Alert) map[string][]*alerts.Alert {
- matchingConfigs := map[string][]*alerts.Alert{}
+func matchingConfigsFromTraceIDs(traceIDs []string, configs []*alerts.Alert) configTracesMap {
+ matchingConfigs := map[alerts.Alert][]string{}
if len(traceIDs) == 0 {
return matchingConfigs
}
@@ -359,11 +365,6 @@
// If any traceID matches the query in the alert then it's an alert we should run.
for _, key := range traceIDs {
if q.Matches(key) {
- _, ok := matchingConfigs[key]
- if !ok {
- // Encountered this traceID for the first time.
- matchingConfigs[key] = []*alerts.Alert{}
- }
query, err := getConfigQueryForTrace(config, key)
if err != nil {
continue
@@ -371,7 +372,12 @@
configCopy := *config
configCopy.Query = query
- matchingConfigs[key] = append(matchingConfigs[key], &configCopy)
+ _, ok := matchingConfigs[configCopy]
+ if !ok {
+ // Encountered this config for the first time.
+ matchingConfigs[configCopy] = []string{}
+ }
+ matchingConfigs[configCopy] = append(matchingConfigs[configCopy], key)
}
}
}
@@ -420,29 +426,35 @@
// and a list of matching alert configs as the value. These are processed
// from the file that was just ingested and notification received over pubsub.
for traceConfigMap := range c.buildTraceConfigsMapChannelEventDriven(ctx) {
- for traceId, configs := range traceConfigMap {
- c.ProcessAlertConfigForTrace(ctx, traceId, configs)
+ for config, traces := range traceConfigMap {
+ c.ProcessAlertConfigForTraces(ctx, config, traces)
}
}
}
-// ProcessAlertConfigForTrace runs the alert config on a specific trace id
+// ProcessAlertConfigForTraces runs the alert config on the given trace ids.
-func (c *Continuous) ProcessAlertConfigForTrace(ctx context.Context, traceId string, configs []*alerts.Alert) {
- sklog.Infof("Clustering over %d configs for trace %s", traceId, len(configs))
- paramset := paramtools.NewParamSet()
- paramset.AddParamsFromKey(traceId)
+func (c *Continuous) ProcessAlertConfigForTraces(ctx context.Context, config alerts.Alert, traceIds []string) {
+ sklog.Infof("Clustering over %d traces for config %s", len(traceIds), config.IDAsString)
- for _, cfg := range configs {
- queryOverride := doNotOverrideQuery
- // If the alert specifies StepFitGrouping (i.e Individual instead of KMeans)
- // we need to only query the paramset of the incoming data point instead of
- // the entire query in the alert.
- if cfg.Algo == types.StepFitGrouping {
+ queryOverride := doNotOverrideQuery
+ // If the alert specifies StepFitGrouping (i.e. Individual instead of KMeans)
+ // we only need to query the paramsets of the incoming data points instead of
+ // the entire query in the alert.
+ if config.Algo == types.StepFitGrouping {
+ // If the number of trace ids for this alert config is above the limit,
+ // simply run the entire query in the alert config.
+ if len(traceIds) <= maxTraceIdCountForIndividualQuery {
+ paramset := paramtools.NewParamSet()
+
+ // Merge all the traceIds into a paramset
+ for _, traceId := range traceIds {
+ paramset.AddParamsFromKey(traceId)
+ }
queryOverride = c.urlProvider.GetQueryStringFromParameters(paramset)
}
-
- c.ProcessAlertConfig(ctx, cfg, queryOverride)
}
+
+ c.ProcessAlertConfig(ctx, &config, queryOverride)
}
// RunContinuousClustering runs the regression detection on a continuous basis.
diff --git a/perf/go/regression/continuous/continuous_test.go b/perf/go/regression/continuous/continuous_test.go
index 3bfd04f..4f0ec76 100644
--- a/perf/go/regression/continuous/continuous_test.go
+++ b/perf/go/regression/continuous/continuous_test.go
@@ -26,6 +26,7 @@
"go.skia.org/infra/perf/go/stepfit"
"go.skia.org/infra/perf/go/types"
"go.skia.org/infra/perf/go/ui/frame"
+ "golang.org/x/exp/slices"
)
func TestBuildConfigsAndParamSet(t *testing.T) {
@@ -123,6 +124,7 @@
func TestMatchingConfigsFromTraceIDs_GroupByMatchesTrace_ReturnsConfigWithRestrictedQuery(t *testing.T) {
config1 := alerts.NewConfig()
config1.Query = "arch=x86"
+ config1.SetIDFromInt64(123)
config1.GroupBy = "config"
traceIDs := []string{
",arch=x86,config=8888,",
@@ -131,10 +133,16 @@
matchingConfigs := matchingConfigsFromTraceIDs(traceIDs, []*alerts.Alert{config1})
require.Len(t, matchingConfigs, 1)
- configs := matchingConfigs[traceIDs[0]]
- require.Equal(t, "arch=x86&config=8888", configs[0].Query)
- _, err := url.ParseQuery(configs[0].Query)
- require.NoError(t, err)
+ for config, traces := range matchingConfigs {
+ assert.Equal(t, config1.IDAsString, config.IDAsString)
+ assert.Equal(t, config1.GroupBy, config.GroupBy)
+
+ // Ensure that the query is updated inside the alert config
+ assert.NotEqual(t, config1.Query, config.Query)
+ assert.Equal(t, "arch=x86&config=8888", config.Query)
+
+ assert.Equal(t, traceIDs[0], traces[0])
+ }
}
func TestMatchingConfigsFromTraceIDs_MultipleGroupByPartsMatchTrace_ReturnsConfigWithRestrictedQueryUsingAllMatchingGroupByKeys(t *testing.T) {
@@ -147,10 +155,11 @@
}
matchingConfigs := matchingConfigsFromTraceIDs(traceIDs, []*alerts.Alert{config})
require.Len(t, matchingConfigs, 1)
- configs := matchingConfigs[traceIDs[0]]
- require.Equal(t, "arch=x86&config=8888&device=Pixel4", configs[0].Query)
- _, err := url.ParseQuery(configs[0].Query)
- require.NoError(t, err)
+ for config := range matchingConfigs {
+ assert.Equal(t, "arch=x86&config=8888&device=Pixel4", config.Query)
+ _, err := url.ParseQuery(config.Query)
+ require.NoError(t, err)
+ }
}
type allMocks struct {
@@ -296,11 +305,14 @@
ie := &ingestevents.IngestEvent{
TraceIDs: []string{",id=trace1,", ",id=trace2,"},
}
- traceConfigsMap, err := c.getTraceIdConfigsForIngestEvent(ctx, ie)
+ configTracesMap, err := c.getTraceIdConfigsForIngestEvent(ctx, ie)
assert.Nil(t, err)
- assert.NotNil(t, traceConfigsMap)
- assert.Equal(t, allConfigs[0], traceConfigsMap[ie.TraceIDs[0]][0], "Expect the first trace to match first config.")
- assert.Nil(t, traceConfigsMap[ie.TraceIDs[1]], "No match expected for second trace.")
+ assert.NotNil(t, configTracesMap)
+ for _, traces := range configTracesMap {
+ assert.Equal(t, ie.TraceIDs[0], traces[0], "Expect the first config to match first trace.")
+ assert.False(t, slices.Contains(traces, ie.TraceIDs[1]), "No match expected for second trace.")
+ }
+
}
func TestTraceIdForIngestEvent_MultipleConfigs_Matching(t *testing.T) {
@@ -322,8 +334,8 @@
ie := &ingestevents.IngestEvent{
TraceIDs: []string{",id=trace3,"},
}
- traceConfigsMap, err := c.getTraceIdConfigsForIngestEvent(ctx, ie)
+ configTracesMap, err := c.getTraceIdConfigsForIngestEvent(ctx, ie)
assert.Nil(t, err)
- assert.NotNil(t, traceConfigsMap)
- assert.Equal(t, allConfigs, traceConfigsMap[ie.TraceIDs[0]])
+ assert.NotNil(t, configTracesMap)
+ assert.Equal(t, 2, len(configTracesMap))
}