blob: 5a315d2209e90c4f1448b3a9dac6160ee9b9d5fc [file] [log] [blame]
package catapult
import (
"errors"
"go.skia.org/infra/go/skerr"
"go.skia.org/infra/pinpoint/go/midpoint"
"go.skia.org/infra/pinpoint/go/workflows"
pinpoint_proto "go.skia.org/infra/pinpoint/proto/v1"
"go.temporal.io/sdk/workflow"
)
// CulpritFinderWorkflow confirms if an anomaly is a real regression, finds culprits for the
// regression and then verifies the culprit is real.
// This workflow is also known as the sandwich verification workflow.
//
// The workflow runs up to three stages, each as child workflows:
//  1. A pairwise run compares the start and end commits of the request. If the result is not
//     significant, the workflow returns with RegressionVerified set to false.
//  2. A bisection runs over the same commit range. If it reports no culprits, the workflow
//     returns with RegressionVerified set to true and no culprits.
//  3. Every culprit from the bisection is passed to verifyCulprits, and the culprits it
//     returns are reported together with RegressionVerified set to true.
//
// An error from any stage is wrapped and returned with a nil execution.
//
// TODO(b/322202740): Move this workflow out of the catapult folder and into the internal folder
// prior to deprecating the catapult directory.
func CulpritFinderWorkflow(ctx workflow.Context, cfp *workflows.CulpritFinderParams) (*pinpoint_proto.CulpritFinderExecution, error) {
	ctx = workflow.WithChildOptions(ctx, childWorkflowOptions)
	ctx = workflow.WithActivityOptions(ctx, regularActivityOptions)

	// Stage 1: confirm the regression between the start and end commits.
	pp := workflows.PairwiseParams{
		Request: &pinpoint_proto.SchedulePairwiseRequest{
			StartCommit: &pinpoint_proto.CombinedCommit{
				Main: midpoint.NewChromiumCommit(cfp.Request.StartGitHash),
			},
			EndCommit: &pinpoint_proto.CombinedCommit{
				Main: midpoint.NewChromiumCommit(cfp.Request.EndGitHash),
			},
			Configuration:        cfp.Request.Configuration,
			Benchmark:            cfp.Request.Benchmark,
			Story:                cfp.Request.Story,
			Chart:                cfp.Request.Chart,
			Statistic:            cfp.Request.Statistic,
			InitialAttemptCount:  "30",
			ImprovementDirection: cfp.Request.ImprovementDirection,
		},
	}
	var pe *pinpoint_proto.PairwiseExecution
	if err := workflow.ExecuteChildWorkflow(ctx, workflows.PairwiseWorkflow, pp).Get(ctx, &pe); err != nil {
		return nil, skerr.Wrap(err)
	}
	// no regression found, no bug creation necessary
	if !pe.Significant {
		return &pinpoint_proto.CulpritFinderExecution{
			RegressionVerified: false,
		}, nil
	}

	// Stage 2: bisect the same range to find culprit candidates.
	bp := &workflows.BisectParams{
		Request: &pinpoint_proto.ScheduleBisectRequest{
			ComparisonMode:       "performance",
			StartGitHash:         cfp.Request.StartGitHash,
			EndGitHash:           cfp.Request.EndGitHash,
			Configuration:        cfp.Request.Configuration,
			Benchmark:            cfp.Request.Benchmark,
			Story:                cfp.Request.Story,
			Chart:                cfp.Request.Chart,
			AggregationMethod:    cfp.Request.Statistic,
			ComparisonMagnitude:  cfp.Request.ComparisonMagnitude,
			InitialAttemptCount:  "20",
			ImprovementDirection: cfp.Request.ImprovementDirection,
		},
	}
	var be *pinpoint_proto.BisectExecution
	if err := workflow.ExecuteChildWorkflow(ctx, workflows.CatapultBisect, bp).Get(ctx, &be); err != nil {
		return nil, skerr.Wrap(err)
	}
	// no culprits found, is there a bug?
	// TODO(b/340235131): call culprit processing (if necessary)
	// len of a nil slice is 0, so this covers both the nil and the empty case.
	if len(be.Culprits) == 0 {
		return &pinpoint_proto.CulpritFinderExecution{
			RegressionVerified: true,
		}, nil
	}

	// Stage 3: verify each culprit candidate.
	verifiedCulprits, err := verifyCulprits(ctx, be, cfp)
	if err != nil {
		return nil, skerr.Wrap(err)
	}
	// TODO(b/340235131): call culprit processing
	return &pinpoint_proto.CulpritFinderExecution{
		RegressionVerified: true,
		Culprits:           verifiedCulprits,
	}, nil
}
// verifyCulprits runs one culprit verification per culprit in be.Culprits and returns the
// culprits reported by those verifications.
//
// A workflow coroutine is started for each culprit. Each coroutine calls
// runCulpritVerification and sends either the resulting execution or the error to a buffered
// channel sized to hold one entry per culprit. Once every coroutine has finished:
//   - if any errors were collected, they are joined and returned and no culprits are returned;
//   - otherwise the Culprit of every execution that has one is collected and returned.
//
// The returned slice is non-nil even when no culprit was verified.
func verifyCulprits(ctx workflow.Context, be *pinpoint_proto.BisectExecution, cfp *workflows.CulpritFinderParams) ([]*pinpoint_proto.CombinedCommit, error) {
	numCulprits := len(be.Culprits)
	rc := workflow.NewBufferedChannel(ctx, numCulprits)
	ec := workflow.NewBufferedChannel(ctx, numCulprits)
	wg := workflow.NewWaitGroup(ctx)
	wg.Add(numCulprits)
	for _, culprit := range be.Culprits {
		// Per-iteration copy so each coroutine verifies its own culprit; with pre-1.22 loop
		// semantics the closure would otherwise share one variable across all iterations.
		culprit := culprit
		workflow.Go(ctx, func(gCtx workflow.Context) {
			defer wg.Done()
			exec, err := runCulpritVerification(gCtx, culprit, cfp)
			if err != nil {
				ec.Send(gCtx, err)
				return
			}
			rc.Send(gCtx, exec)
		})
	}
	wg.Wait(ctx)
	// Every coroutine has finished sending, so close both channels before draining them.
	rc.Close()
	ec.Close()

	if errs := fetchAllFromChannel[error](ctx, ec); len(errs) != 0 {
		return nil, skerr.Wrapf(errors.Join(errs...), "terminal errors found")
	}

	culpritVerifyExecs := fetchAllFromChannel[*pinpoint_proto.PairwiseExecution](ctx, rc)
	verifiedCulprits := make([]*pinpoint_proto.CombinedCommit, 0, len(culpritVerifyExecs))
	for _, exec := range culpritVerifyExecs {
		// Skip a nil execution instead of dereferencing it; it carries no culprit.
		if exec != nil && exec.Culprit != nil {
			verifiedCulprits = append(verifiedCulprits, exec.Culprit)
		}
	}
	return verifiedCulprits, nil
}
// runCulpritVerification triggers a culprit verification workflow and returns the result.
//
// It runs the pairwise workflow as a child workflow with CulpritVerify set, comparing the
// commit at cfp.Request.StartGitHash against culprit, and reuses the configuration,
// benchmark, story, chart, statistic and improvement direction from cfp.Request.
//
// It returns the pairwise execution on success. If the child workflow fails, the error is
// wrapped with skerr.Wrap, like the other child workflow calls in this file, and the
// execution is nil.
func runCulpritVerification(ctx workflow.Context, culprit *pinpoint_proto.CombinedCommit, cfp *workflows.CulpritFinderParams) (*pinpoint_proto.PairwiseExecution, error) {
	pp := workflows.PairwiseParams{
		Request: &pinpoint_proto.SchedulePairwiseRequest{
			// TODO(b/340220164): compare against the previous commit instead of the commit
			// at the start of the regression range (requires changes to bisectExecution)
			StartCommit: &pinpoint_proto.CombinedCommit{
				Main: midpoint.NewChromiumCommit(cfp.Request.StartGitHash),
			},
			EndCommit:            culprit,
			Configuration:        cfp.Request.Configuration,
			Benchmark:            cfp.Request.Benchmark,
			Story:                cfp.Request.Story,
			Chart:                cfp.Request.Chart,
			Statistic:            cfp.Request.Statistic,
			InitialAttemptCount:  "30",
			ImprovementDirection: cfp.Request.ImprovementDirection,
		},
		CulpritVerify: true,
	}
	var pe *pinpoint_proto.PairwiseExecution
	if err := workflow.ExecuteChildWorkflow(ctx, workflows.PairwiseWorkflow, pp).Get(ctx, &pe); err != nil {
		return nil, skerr.Wrap(err)
	}
	return pe, nil
}
// fetchAllFromChannel receives the values currently buffered in rc and returns them in the
// order they were received.
//
// The number of values to read is taken from rc.Len() once, before the first receive. Each
// value is received into a variable of type T. The returned slice is non-nil and has exactly
// that many elements.
func fetchAllFromChannel[T any](ctx workflow.Context, rc workflow.ReceiveChannel) []T {
	pending := rc.Len()
	items := make([]T, 0, pending)
	for ; pending > 0; pending-- {
		var item T
		rc.Receive(ctx, &item)
		items = append(items, item)
	}
	return items
}