| package service |
| |
| import ( |
| "context" |
| "fmt" |
| "sort" |
| "strings" |
| |
| tpr_client "go.temporal.io/sdk/client" |
| |
| "go.skia.org/infra/go/paramtools" |
| "go.skia.org/infra/go/query" |
| "go.skia.org/infra/go/skerr" |
| "go.skia.org/infra/go/sklog" |
| "go.skia.org/infra/perf/go/anomalygroup" |
| ag "go.skia.org/infra/perf/go/anomalygroup/proto/v1" |
| "go.skia.org/infra/perf/go/backend/shared" |
| reg "go.skia.org/infra/perf/go/regression" |
| "google.golang.org/grpc" |
| ) |
| |
| // anomalygroupService implements AnomalyGroupService |
| type anomalygroupService struct { |
| ag.UnimplementedAnomalyGroupServiceServer |
| anomalygroupStore anomalygroup.Store |
| regressionStore reg.Store |
| temporalClient tpr_client.Client |
| } |
| |
| // New returns a new instance of anomalygroupService. |
| func New(anomalygroupStore anomalygroup.Store, regressionStore reg.Store, temporalClient tpr_client.Client) *anomalygroupService { |
| return &anomalygroupService{ |
| anomalygroupStore: anomalygroupStore, |
| regressionStore: regressionStore, |
| temporalClient: temporalClient, |
| } |
| } |
| |
| // RegisterGrpc implements backend.BackendService |
| func (s *anomalygroupService) RegisterGrpc(server *grpc.Server) { |
| ag.RegisterAnomalyGroupServiceServer(server, s) |
| } |
| |
| // GetAuthorizationPolicy implements backend.BackendService |
| func (s *anomalygroupService) GetAuthorizationPolicy() shared.AuthorizationPolicy { |
| return shared.AuthorizationPolicy{ |
| AllowUnauthenticated: true, |
| } |
| } |
| |
| // GetServiceDescriptor implements backend.BackendService |
| func (s *anomalygroupService) GetServiceDescriptor() grpc.ServiceDesc { |
| return ag.AnomalyGroupService_ServiceDesc |
| } |
| |
| // created a new group given a set of properties. |
| func (s *anomalygroupService) CreateNewAnomalyGroup( |
| ctx context.Context, |
| req *ag.CreateNewAnomalyGroupRequest) (*ag.CreateNewAnomalyGroupResponse, error) { |
| |
| new_group_id, err := s.anomalygroupStore.Create( |
| ctx, |
| req.SubscriptionName, |
| req.SubscriptionRevision, |
| req.Domain, |
| req.Benchmark, |
| req.StartCommit, |
| req.EndCommit, |
| req.Action.String()) |
| if err != nil { |
| return nil, fmt.Errorf( |
| "error when calling CreateNewAnomalyGroup. Params: %s", req) |
| } |
| return &ag.CreateNewAnomalyGroupResponse{ |
| AnomalyGroupId: new_group_id, |
| }, nil |
| } |
| |
| // Given a group id, return the group. |
| func (s *anomalygroupService) LoadAnomalyGroupByID( |
| ctx context.Context, |
| req *ag.LoadAnomalyGroupByIDRequest) (*ag.LoadAnomalyGroupByIDResponse, error) { |
| anomaly_group, err := s.anomalygroupStore.LoadById(ctx, req.AnomalyGroupId) |
| if err != nil { |
| return nil, fmt.Errorf( |
| "error when calling LoadAnomalyGroupByID: %s. Params: %s", err.Error(), req) |
| } |
| return &ag.LoadAnomalyGroupByIDResponse{ |
| AnomalyGroup: anomaly_group, |
| }, nil |
| } |
| |
| // Given one of the following value, update the group |
| // - bisection id |
| // - reported issue id |
| // - new anomaly id |
| // - new culprit ids |
| func (s *anomalygroupService) UpdateAnomalyGroup( |
| ctx context.Context, |
| req *ag.UpdateAnomalyGroupRequest) (*ag.UpdateAnomalyGroupResponse, error) { |
| if req.BisectionId != "" { |
| if err := s.anomalygroupStore.UpdateBisectID( |
| ctx, req.AnomalyGroupId, req.BisectionId); err != nil { |
| return nil, fmt.Errorf( |
| "error updating the bisection id %s for anomaly group %s", |
| req.BisectionId, req.AnomalyGroupId) |
| } |
| } else if req.IssueId != "" { |
| if err := s.anomalygroupStore.UpdateReportedIssueID( |
| ctx, req.AnomalyGroupId, req.IssueId); err != nil { |
| return nil, fmt.Errorf( |
| "error updating the reported issue id %s for anomaly group %s", |
| req.IssueId, req.AnomalyGroupId) |
| } |
| } else if req.AnomalyId != "" { |
| if err := s.anomalygroupStore.AddAnomalyID( |
| ctx, req.AnomalyGroupId, req.AnomalyId); err != nil { |
| return nil, fmt.Errorf( |
| "error adding the anomaly id %s to anomaly group %s", |
| req.AnomalyId, req.AnomalyGroupId) |
| } |
| } else if len(req.CulpritIds) > 0 { |
| if err := s.anomalygroupStore.AddCulpritIDs( |
| ctx, req.AnomalyGroupId, req.CulpritIds); err != nil { |
| return nil, fmt.Errorf( |
| "error adding the culprit ids %s to anomaly group %s", |
| req.CulpritIds, req.AnomalyGroupId) |
| } |
| } |
| return &ag.UpdateAnomalyGroupResponse{}, nil |
| } |
| |
| // Give a set of grouping criteria, return the existing groups. |
| func (s *anomalygroupService) FindExistingGroups( |
| ctx context.Context, |
| req *ag.FindExistingGroupsRequest) (*ag.FindExistingGroupsResponse, error) { |
| test_path_pieces := strings.Split(req.TestPath, "/") |
| if len(test_path_pieces) < 5 { |
| return nil, fmt.Errorf("invalid fromat of test path: %s", req.TestPath) |
| } |
| domain_name := test_path_pieces[0] |
| benchmark_name := test_path_pieces[2] |
| anomaly_groups, err := s.anomalygroupStore.FindExistingGroup(ctx, |
| req.SubscriptionName, req.SubscriptionRevision, domain_name, |
| benchmark_name, req.StartCommit, req.EndCommit, req.Action.String()) |
| if err != nil { |
| return nil, fmt.Errorf("failed on finding existing groups. Request: %s", req) |
| } |
| return &ag.FindExistingGroupsResponse{ |
| AnomalyGroups: anomaly_groups, |
| }, nil |
| } |
| |
| // Given the group id and a limit value N, return the top N regressions from |
| // the group's anomalies, sorted by the percentage changed on median. |
| func (s *anomalygroupService) FindTopAnomalies( |
| ctx context.Context, |
| req *ag.FindTopAnomaliesRequest) (*ag.FindTopAnomaliesResponse, error) { |
| group_id := req.AnomalyGroupId |
| anomaly_group, err := s.anomalygroupStore.LoadById(ctx, group_id) |
| if err != nil { |
| return nil, skerr.Wrapf(err, "failed to load anomaly group: %s", group_id) |
| } |
| |
| anomalies, err := s.regressionStore.GetByIDs(ctx, anomaly_group.AnomalyIds) |
| if err != nil { |
| return nil, skerr.Wrapf(err, "failed to load regressions from group: %s", group_id) |
| } |
| |
| sort.Slice(anomalies, func(i, j int) bool { |
| // sort anomalies by the percentage changed from median_before to median_after |
| diff_i := (anomalies[i].MedianAfter - anomalies[i].MedianBefore) / anomalies[i].MedianBefore |
| diff_j := (anomalies[j].MedianAfter - anomalies[j].MedianBefore) / anomalies[j].MedianBefore |
| return diff_i > diff_j |
| }) |
| |
| var count int |
| if req.Limit == 0 { |
| count = len(anomalies) |
| } else { |
| count = int(req.Limit) |
| } |
| |
| top_regressions := []*ag.Anomaly{} |
| // loop over the top 'count' regressions. |
| for i := 0; i < count; i++ { |
| anomaly := anomalies[i] |
| paramset := anomaly.Frame.DataFrame.ParamSet |
| |
| if !isParamSetValid(paramset) { |
| // Debug logs on b/357629365: can we use traceset to replace paramset? |
| for key := range anomaly.Frame.DataFrame.TraceSet { |
| paramset_from_key, err := query.ParseKey(key) |
| if err != nil { |
| sklog.Debugf("[AG][InvalidParamset] Failed to parse trace set key: %s", key) |
| } else { |
| sklog.Debugf("[AG][InvalidParamset] Paramset parsed from trace set: %s", paramset_from_key) |
| } |
| } |
| |
| return nil, skerr.Fmt("invalid paramset %s for chromeperf", paramset) |
| } |
| |
| // find the last available parameters of the subtest_x series. |
| subtest_keys := []string{"subtest_3", "subtest_2", "subtest_1"} |
| story := []string{} |
| for _, key := range subtest_keys { |
| ok := false |
| if story, ok = paramset[key]; ok { |
| break |
| } |
| } |
| |
| // TODO(wenbinzhang): put the list of parameters in config file. |
| paramset_map := map[string]string{ |
| "bot": paramset["bot"][0], |
| "benchmark": paramset["benchmark"][0], |
| "story": story[0], |
| "measurement": paramset["test"][0], |
| "stat": paramset["stat"][0], |
| } |
| |
| top_regressions = append(top_regressions, &ag.Anomaly{ |
| StartCommit: int64(anomaly.PrevCommitNumber), |
| EndCommit: int64(anomaly.CommitNumber), |
| Paramset: paramset_map, |
| ImprovementDirection: paramset["improvement_direction"][0], |
| }) |
| } |
| |
| return &ag.FindTopAnomaliesResponse{ |
| Anomalies: top_regressions, |
| }, nil |
| } |
| |
| func isParamSetValid(paramset paramtools.ReadOnlyParamSet) bool { |
| requiredKeys := []string{"bot", "benchmark", "test", "stat", "subtest_1"} |
| for _, key := range requiredKeys { |
| value, ok := paramset[key] |
| if !ok { |
| return false |
| } |
| if len(value) > 1 { |
| return false |
| } |
| } |
| |
| return true |
| } |