blob: f01271ecb7a9d5576cf3ec9717ec33d357699570 [file] [log] [blame]
package service
import (
"context"
"fmt"
"net/url"
"sort"
"strings"
"time"
"go.skia.org/infra/go/luciconfig"
"go.skia.org/infra/go/skerr"
"go.skia.org/infra/go/sklog"
"go.skia.org/infra/perf/go/alerts"
pb "go.skia.org/infra/perf/go/sheriffconfig/proto/v1"
"go.skia.org/infra/perf/go/sheriffconfig/validate"
"go.skia.org/infra/perf/go/subscription"
subscription_pb "go.skia.org/infra/perf/go/subscription/proto/v1"
"go.skia.org/infra/perf/go/types"
"google.golang.org/protobuf/encoding/prototext"
)
// Custom default values for Alert and Subscription parameters.
const (
defaultBugPriority = 2
defaultBugSeverity = 2
defaultRadius = 1
defaultStepUpOnly = false
defaultMinimumNum = 1
defaultSparse = false
defaultK = 0
defaultGroupBy = ""
)
var directionMap = map[string]alerts.Direction{
"BOTH": alerts.BOTH,
"UP": alerts.UP,
"DOWN": alerts.DOWN,
}
var clusterAlgoMap = map[string]types.RegressionDetectionGrouping{
"STEPFIT": types.StepFitGrouping,
"KMEANS": types.KMeansGrouping,
}
var stepAlgoMap = map[string]types.StepDetection{
"ORIGINAL_STEP": types.OriginalStep,
"ABSOLUTE_STEP": types.AbsoluteStep,
"CONST_STEP": types.Const,
"PERCENT_STEP": types.PercentStep,
"COHEN_STEP": types.CohenStep,
"MANN_WHITNEY_U": types.MannWhitneyU,
}
var actionMap = map[string]types.AlertAction{
"NOACTION": types.NoAction,
"TRIAGE": types.FileIssue,
"BISECT": types.Bisection,
}
// Function to address validation requests.
// Simply return the validation error, or nil if there's none.
func ValidateContent(content string) error {
config, err := validate.DeserializeProto(content)
if err != nil {
return skerr.Wrap(err)
}
err = validate.ValidateConfig(config)
if err != nil {
return skerr.Wrap(err)
}
return nil
}
type sheriffconfigService struct {
subscriptionStore subscription.Store
alertStore alerts.Store
luciconfigApiClient luciconfig.ApiClient
}
// Create new SheriffConfig service.
func New(ctx context.Context,
subscriptionStore subscription.Store,
alertStore alerts.Store,
luciconfigApiClient luciconfig.ApiClient) (*sheriffconfigService, error) {
if luciconfigApiClient == nil {
var err error
luciconfigApiClient, err = luciconfig.NewApiClient(ctx, false)
if err != nil {
return nil, skerr.Fmt("Failed to create new LUCI Config client: %s.", err)
}
}
return &sheriffconfigService{
subscriptionStore: subscriptionStore,
alertStore: alertStore,
luciconfigApiClient: luciconfigApiClient,
}, nil
}
// Fetches specified path config from LUCI Config, transforms it and stores it in the CockroachDB
// in Subscription and Alert tables.
func (s *sheriffconfigService) ImportSheriffConfig(ctx context.Context, path string) error {
configs, err := s.luciconfigApiClient.GetProjectConfigs(ctx, path)
if err != nil {
return skerr.Wrap(err)
}
if len(configs) == 0 {
return skerr.Fmt("Couldn't find any configs under path: %s,", path)
}
saveRequests := []*alerts.SaveRequest{}
subscriptions := []*subscription_pb.Subscription{}
for _, config := range configs {
ss, srs, err := s.processConfig(ctx, config)
if err != nil {
return skerr.Wrap(err)
}
subscriptions = append(subscriptions, ss...)
saveRequests = append(saveRequests, srs...)
}
if len(subscriptions) != 0 {
if err := s.subscriptionStore.InsertSubscriptions(ctx, subscriptions); err != nil {
return skerr.Wrap(err)
}
}
if len(saveRequests) != 0 {
if err := s.alertStore.ReplaceAll(ctx, saveRequests); err != nil {
return skerr.Wrap(err)
}
}
return nil
}
// processConfig handles validation and transformation of a single config.
func (s *sheriffconfigService) processConfig(ctx context.Context, config *luciconfig.ProjectConfig) ([]*subscription_pb.Subscription, []*alerts.SaveRequest, error) {
// Validate and deserialize config content
sheriffconfig := &pb.SheriffConfig{}
err := prototext.Unmarshal([]byte(config.Content), sheriffconfig)
if err != nil {
return nil, nil, skerr.Fmt("Failed to unmarshal prototext: %s", err)
}
if err := validate.ValidateConfig(sheriffconfig); err != nil {
return nil, nil, skerr.Wrap(err)
}
subscriptionEntities := []*subscription_pb.Subscription{}
saveRequests := []*alerts.SaveRequest{}
// Prepare subscription and alert data
for _, subscription := range sheriffconfig.Subscriptions {
subscriptionEntity := makeSubscriptionEntity(subscription, config.Revision)
// We check if the entity already exists in the DB. If it is, there's no need to re-insert it.
// We only want to update the DB when there's an actual revision change in the configurations.
sub, err := s.subscriptionStore.GetSubscription(ctx, subscriptionEntity.Name, subscriptionEntity.Revision)
if err != nil {
return nil, nil, skerr.Wrap(err)
}
if sub == nil {
subscriptionEntities = append(subscriptionEntities, subscriptionEntity)
subSaveRequests, err := makeSaveRequests(subscription, config.Revision)
if err != nil {
return nil, nil, skerr.Wrap(err)
}
saveRequests = append(saveRequests, subSaveRequests...)
}
}
return subscriptionEntities, saveRequests, nil
}
// makeSubscriptionEntity creates subscription entitiy to be inserted into DB based on Sheriff Config protos.
func makeSubscriptionEntity(subscription *pb.Subscription, revision string) *subscription_pb.Subscription {
subscriptionEntity := &subscription_pb.Subscription{
Name: subscription.Name,
ContactEmail: subscription.ContactEmail,
BugLabels: subscription.BugLabels,
Hotlists: subscription.HotlistLabels,
BugComponent: subscription.BugComponent,
BugPriority: getPriorityFromProto(subscription.BugPriority),
BugSeverity: getSeverityFromProto(subscription.BugSeverity),
BugCcEmails: subscription.BugCcEmails,
Revision: revision,
}
return subscriptionEntity
}
// makeSaveRequests creates SaveRequest objects for a given subscription to be inserted into Alerts DB table.
func makeSaveRequests(subscription *pb.Subscription, revision string) ([]*alerts.SaveRequest, error) {
saveRequests := []*alerts.SaveRequest{}
for _, anomalyConfig := range subscription.AnomalyConfigs {
for _, match := range anomalyConfig.Rules.Match {
query, err := buildQueryFromRules(match, anomalyConfig.Rules.Exclude)
if err != nil {
return nil, skerr.Wrap(err)
}
cfg := createAlert(query, anomalyConfig, subscription, revision)
saveRequest := &alerts.SaveRequest{
Cfg: cfg,
SubKey: &alerts.SubKey{
SubName: subscription.Name,
SubRevision: revision,
},
}
saveRequests = append(saveRequests, saveRequest)
}
}
return saveRequests, nil
}
// createAlert creates Alert object.
func createAlert(query string, anomalyConfig *pb.AnomalyConfig, subscription *pb.Subscription, revision string) *alerts.Alert {
// Apply defaults
radius := defaultRadius
if anomalyConfig.Radius != nil {
radius = int(*anomalyConfig.Radius)
}
minimumNum := defaultMinimumNum
if anomalyConfig.MinimumNum != nil {
minimumNum = int(*anomalyConfig.MinimumNum)
}
sparse := defaultSparse
if anomalyConfig.Sparse != nil {
sparse = *anomalyConfig.Sparse
}
k := defaultK
if anomalyConfig.K != nil {
k = int(*anomalyConfig.K)
}
groupBy := defaultGroupBy
if anomalyConfig.GroupBy != nil {
groupBy = *anomalyConfig.GroupBy
}
cfg := &alerts.Alert{
IDAsString: "-1",
DisplayName: query,
Query: query,
Alert: subscription.ContactEmail,
Interesting: anomalyConfig.Threshold,
Algo: clusterAlgoMap[anomalyConfig.Algo.String()],
Step: stepAlgoMap[anomalyConfig.Step.String()],
StateAsString: alerts.ACTIVE,
Owner: subscription.ContactEmail,
StepUpOnly: defaultStepUpOnly,
DirectionAsString: directionMap[anomalyConfig.Direction.String()],
Radius: int(radius),
K: k,
GroupBy: groupBy,
Sparse: sparse,
MinimumNum: minimumNum,
Action: actionMap[anomalyConfig.Action.String()],
SubscriptionName: subscription.Name,
SubscriptionRevision: revision,
}
return cfg
}
// buildQueryFromRules creates query based on Sheriff Config rules.
func buildQueryFromRules(match string, excludes []string) (string, error) {
var queryParts []string
matchQuery, err := url.ParseQuery(match)
if err != nil {
return "", skerr.Wrap(err)
}
for key, values := range matchQuery {
for _, value := range values {
queryParts = append(queryParts, fmt.Sprintf("%s=%s", key, value))
}
}
for _, exclude := range excludes {
excludeQuery, err := url.ParseQuery(exclude)
if err != nil {
return "", skerr.Wrap(err)
}
for key, values := range excludeQuery {
// Use values[0] as there should only be 1 valid key value pair in an exclude pattern.
queryParts = append(queryParts, fmt.Sprintf("%s=!%s", key, values[0]))
}
}
sort.Strings(queryParts)
return strings.Join(queryParts, "&"), nil
}
func getPriorityFromProto(pri pb.Subscription_Priority) int32 {
if int32(pri) == 0 {
return defaultBugPriority
}
return int32(pri) - 1
}
func getSeverityFromProto(sev pb.Subscription_Severity) int32 {
if int32(sev) == 0 {
return defaultBugSeverity
}
return int32(sev) - 1
}
func (s *sheriffconfigService) StartImportRoutine(period time.Duration) {
go func() {
for range time.Tick(period) {
s.ImportSheriffConfigOnce()
}
}()
}
func (s *sheriffconfigService) ImportSheriffConfigOnce() {
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()
sklog.Infof("Importing sheriff configs.")
if err := s.ImportSheriffConfig(ctx, "skia-sheriff-configs.cfg"); err != nil {
sklog.Errorf("Failed to import configs: %s", err)
}
}