package internal

import (
	"errors"
	"fmt"
	"time"

	"github.com/google/uuid"
	"go.skia.org/infra/go/skerr"
	"go.skia.org/infra/pinpoint/go/compare"
	"go.skia.org/infra/pinpoint/go/midpoint"
	"go.skia.org/infra/pinpoint/go/workflows"
	"go.skia.org/infra/temporal/go/common"
	"go.temporal.io/sdk/workflow"

	pinpoint_proto "go.skia.org/infra/pinpoint/proto/v1"
	timestamppb "google.golang.org/protobuf/types/known/timestamppb"
)
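
// benchmarkRunIterations is the ascending schedule of per-commit benchmark sample sizes.
// Each retry of an inconclusive comparison is expected to grow the sample size along this
// schedule (see nextRunSize), up to the cap returned by getMaxSampleSize.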
var benchmarkRunIterations = [...]int32{10, 20, 40, 80, 160}
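
// getMaxSampleSize returns the largest number of benchmark runs allowed per commit,
// i.e. the last entry of benchmarkRunIterations.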
func getMaxSampleSize() int32 {
	return benchmarkRunIterations[len(benchmarkRunIterations)-1]
}
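
// BisectRunIndex is a stable index of a BisectRun within bisectRunTracker. Indexes are
// used instead of pointers so that CommitRangeTracker can be serialized.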
type BisectRunIndex int

// bisectRunTracker stores all the running bisect runs.
//
// It keeps track of all the runs by index. A BisectRun is updated as different futures
// are fulfilled. One can wait for a bisect run that was already triggered by a different
// bisection. This usually happens when the mid commit is computed and its result is used
// for comparisons from both sides. It can also happen when one comparison requires more
// runs, some of which were already triggered by another comparison.
type bisectRunTracker struct {
	runs []*BisectRun
}

func (t *bisectRunTracker) newRun(cc *midpoint.CombinedCommit) (BisectRunIndex, *BisectRun) {
	r := newBisectRun(cc)
	t.runs = append(t.runs, r)
	return BisectRunIndex(len(t.runs) - 1), r
}

func (t bisectRunTracker) get(r BisectRunIndex) *BisectRun {
	if r < 0 || int(r) >= len(t.runs) {
		return nil
	}
	return t.runs[int(r)]
}

// CommitRangeTracker stores a commit range as [Lower, Higher].
//
// It stores bisect runs as indexes because it needs to be serialized. The indexes
// are stable within the workflow through bisectRunTracker.
type CommitRangeTracker struct {
	Lower  BisectRunIndex
	Higher BisectRunIndex
}

// CloneWithHigher clones itself with the overridden higher index.
func (t CommitRangeTracker) CloneWithHigher(higher BisectRunIndex) CommitRangeTracker {
	return CommitRangeTracker{
		Lower:  t.Lower,
		Higher: higher,
	}
}

// CloneWithLower clones itself with the overridden lower index.
func (t CommitRangeTracker) CloneWithLower(lower BisectRunIndex) CommitRangeTracker {
	return CommitRangeTracker{
		Lower:  lower,
		Higher: t.Higher,
	}
}
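
// For illustration: when BisectWorkflow below finds a midpoint for the range tracked by
// cr, it splits the range into two halves by cloning the tracker twice, e.g.
//
//	midIdx, midRun := tracker.newRun(mid)
//	left := cr.CloneWithHigher(midIdx)  // [cr.Lower, midIdx]
//	right := cr.CloneWithLower(midIdx)  // [midIdx, cr.Higher]
//
// and sends both halves back into the comparisons channel. The left/right variables are
// only for illustration; the workflow sends the clones directly.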

func newRunnerParams(jobID string, p workflows.BisectParams, it int32, cc *midpoint.CombinedCommit, finishedIteration int32) *SingleCommitRunnerParams {
	return &SingleCommitRunnerParams{
		CombinedCommit:    cc,
		PinpointJobID:     jobID,
		BotConfig:         p.Request.Configuration,
		Benchmark:         p.Request.Benchmark,
		Story:             p.Request.Story,
		Chart:             p.Request.Chart,
		AggregationMethod: p.Request.AggregationMethod,
		Iterations:        it,
		FinishedIteration: finishedIteration,
		BotIds:            p.BotIds,
	}
}

// BisectExecution is a mirror of pinpoint_proto.BisectExecution, with additional raw data.
//
// Embedding pinpoint_proto.BisectExecution here fails to store CommitPairValues and
// BisectRuns, which are used to curate the information for Catapult Pinpoint, so its
// fields are mirrored instead.
// TODO(b/322203189) - This is a temporary solution for backwards compatibility with the
// Catapult UI and should be removed when the catapult package is deprecated.
type BisectExecution struct {
	JobId       string
	Culprits    []*pinpoint_proto.CombinedCommit
	CreateTime  *timestamppb.Timestamp
	Comparisons []*CombinedResults
	RunData     []*BisectRun
}

// BisectWorkflow is a Workflow definition that takes a range of git hashes and finds the
// culprit commit(s) by recursively splitting the range at midpoints and comparing
// benchmark runs on both sides.
func BisectWorkflow(ctx workflow.Context, p *workflows.BisectParams) (be *BisectExecution, wkErr error) {
	ctx = workflow.WithChildOptions(ctx, childWorkflowOptions)
	ctx = workflow.WithActivityOptions(ctx, regularActivityOptions)
	ctx = workflow.WithLocalActivityOptions(ctx, localActivityOptions)

	logger := workflow.GetLogger(ctx)

	jobID := uuid.New().String()
	if p.JobID != "" {
		jobID = p.JobID
	}
	be = &BisectExecution{
		JobId:       jobID,
		Culprits:    []*pinpoint_proto.CombinedCommit{},
		CreateTime:  timestamppb.Now(),
		Comparisons: []*CombinedResults{},
	}

	mh := workflow.GetMetricsHandler(ctx).WithTags(map[string]string{
		"job_id":    jobID,
		"user":      p.Request.User,
		"benchmark": p.Request.Benchmark,
		"config":    p.Request.Configuration,
		"story":     p.Request.Story,
	})
	mh.Counter("bisect_start_count").Inc(1)
	wkStartTime := time.Now().UnixNano()
	defer func() {
		duration := time.Now().UnixNano() - wkStartTime
		mh.Timer("bisect_duration").Record(time.Duration(duration))
		mh.Counter("bisect_complete_count").Inc(1)

		if wkErr != nil {
			mh.Counter("bisect_err_count").Inc(1)
		}
		if errors.Is(ctx.Err(), workflow.ErrCanceled) || errors.Is(ctx.Err(), workflow.ErrDeadlineExceeded) {
			mh.Counter("bisect_timeout_count").Inc(1)
		}

		if be != nil && len(be.Culprits) > 0 {
			mh.Counter("bisect_found_culprit_count").Inc(1)
		}
	}()

	// Find the available bot list.
	if err := workflow.ExecuteActivity(ctx, FindAvailableBotsActivity, p.Request.Configuration, time.Now().UnixNano()).Get(ctx, &p.BotIds); err != nil {
		return nil, skerr.Wrapf(err, "failed to find available bots")
	}

	magnitude := p.GetMagnitude()
	improvementDir := p.GetImprovementDirection()

	// minSampleSize is the minimum number of benchmark runs for each attempt.
	// The default is 10.
	minSampleSize := p.GetInitialAttempt()
	if minSampleSize < benchmarkRunIterations[0] {
		logger.Warn(fmt.Sprintf("Initial attempt count %d is less than the default %d. Setting to default.", minSampleSize, benchmarkRunIterations[0]))
		minSampleSize = benchmarkRunIterations[0]
	}

	// schedulePairRuns is a helper function that schedules new benchmark runs for two BisectRuns.
	// It captures common local variables and keeps the for-loop below cleaner.
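	// nextRunSize is defined elsewhere in this package; it presumably returns the target
	// total number of runs each side should have for the next comparison attempt, so
	// expected-totalRuns() below is the number of additional runs to schedule.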
	schedulePairRuns := func(lower, higher *BisectRun) (workflow.ChildWorkflowFuture, workflow.ChildWorkflowFuture, error) {
		expected := nextRunSize(lower, higher, minSampleSize)
		lf, err := lower.scheduleRuns(ctx, jobID, *p, expected-lower.totalRuns())
		if err != nil {
			logger.Warn("Failed to schedule more runs.", "commit", lower.Build.Commit, "error", err)
			return nil, nil, skerr.Wrap(err)
		}

		hf, err := higher.scheduleRuns(ctx, jobID, *p, expected-higher.totalRuns())
		if err != nil {
			logger.Warn("Failed to schedule more runs.", "commit", higher.Build.Commit, "error", err)
			return nil, nil, skerr.Wrap(err)
		}

		return lf, hf, nil
	}

	// The buffer size should be big enough to hold the incoming comparisons.
	// A comparison is usually handled right away in the next Select().
	// comparisons.Send happens in a goroutine because receiving happens in the same
	// thread as Select(), so the send needs to be non-blocking.
	comparisons := workflow.NewBufferedChannel(ctx, 100)
	tracker := bisectRunTracker{}

	// pendings keeps track of all the ongoing benchmark runs and comparisons.
	// It counts the number of messages in the comparisons channel plus the number of
	// futures that are still in flight. Because new comparisons and new futures are
	// generated as the bisection proceeds, this number is dynamic and has to be updated
	// as the workflow runs.
	// The counter is incremented in two cases:
	//  1) a new future is added to the selector, so we need to wait for its fulfillment;
	//  2) a new comparison is sent to the channel, so we need to wait for it to be processed.
	// The counter is decremented when either of the above is processed.
	pendings := 0
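	// For example, sending the initial CommitRangeTracker below makes pendings 1; handling
	// that comparison either resolves the range (pendings drops back to 0) or schedules new
	// futures and comparisons, each of which increments pendings again.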

	// selector implements the concurrent bisections in a non-blocking manner.
	// Each single commit runner is tracked by a future, and when the runner completes it
	// inserts a message into the channel to be processed. The channel receives messages
	// telling the workflow which commit range to bisect next. selector keeps processing
	// messages and futures until pendings drops to 0, at which point there are no further
	// messages or futures to wait for.
	// selector's callbacks run in the same thread, so we don't need to worry about race
	// conditions here.
	selector := workflow.NewSelector(ctx)

	// TODO(b/326352379): The errors are not handled here because the bisections run concurrently.
	// Each error may interrupt the others or may let the rest continue.
	selector.AddReceive(comparisons, func(c workflow.ReceiveChannel, more bool) {
		for {
			// cr needs to be created on every iteration of the for-loop because the code
			// below captures this variable in goroutines.
			var cr CommitRangeTracker
			if !c.ReceiveAsync(&cr) {
				break
			}
			pendings--
			lower, higher := tracker.get(cr.Lower), tracker.get(cr.Higher)
			compareResult, err := compareRuns(ctx, lower, higher, p.Request.Chart, magnitude, improvementDir)
			// If the compare fails, we continue to bisect the remaining ranges.
			// TODO(sunxiaodi@): Revisit compare runs error handling. compare.ComparePerformance
			// and compare.CompareFunctional should not return errors but are written to return
			// them. GetAllValues also does not return an error, which means no errors are
			// passed around these functions.
			if err != nil {
				logger.Warn(fmt.Sprintf("Failed to compare runs: %v", err))
				continue
			}
			be.Comparisons = append(be.Comparisons, compareResult)

			result := compareResult.Result
			switch result.Verdict {
			case compare.Unknown:
				// Only schedule more runs if fewer than getMaxSampleSize() have run. At
				// normalized magnitudes < 0.4, it is possible to reach the max sample size
				// and still get an unknown verdict. Running more samples is too expensive,
				// so instead assume the two samples are statistically similar.
				// This assumes that cr.Lower and cr.Higher have the same number of runs.
				if len(lower.Runs) >= int(getMaxSampleSize()) {
					// TODO(haowoo@): add metric to measure this occurrence
					logger.Warn(fmt.Sprintf("Reached unknown verdict with p-value %f and sample size of %d.", result.PValue, len(lower.Runs)))
					break
				}

				lf, hf, err := schedulePairRuns(lower, higher)
				if err != nil {
					logger.Warn(fmt.Sprintf("Failed to schedule more runs (%v)", err))
					break
				}

				pendings++
				futures := append(lower.totalPendings(), higher.totalPendings()...)
				selector.AddFuture(common.NewFutureWithFutures(ctx, futures...), func(f workflow.Future) {
					pendings--
					err := lower.updateRuns(ctx, lf)
					if err != nil {
						logger.Warn(fmt.Sprintf("Failed to update lower runs for (%v): %v", lower.Build.Commit, err))
					}

					err = higher.updateRuns(ctx, hf)
					if err != nil {
						logger.Warn(fmt.Sprintf("Failed to update higher runs for (%v): %v", higher.Build.Commit, err))
					}
					workflow.Go(ctx, func(gCtx workflow.Context) {
						comparisons.Send(gCtx, cr)
					})
					pendings++
				})

			case compare.Different:
				var mid *midpoint.CombinedCommit
				if err := workflow.ExecuteActivity(ctx, FindMidCommitActivity, lower.Build.Commit, higher.Build.Commit).Get(ctx, &mid); err != nil {
					logger.Warn(fmt.Sprintf("Failed to find middle commit: %v", err))
					break
				}

				var equal bool
				if err := workflow.ExecuteActivity(ctx, CheckCombinedCommitEqualActivity, lower.Build.Commit, mid).Get(ctx, &equal); err != nil {
					logger.Warn("Failed to determine equality between two combined commits")
					break
				}
				if equal {
					// TODO(b/329502712): Append additional info to bisectionExecution
					// such as p-values, average difference
					be.Culprits = append(be.Culprits, (*pinpoint_proto.CombinedCommit)(higher.Build.Commit))
					break
				}

				midRunIdx, midRun := tracker.newRun(mid)
				mf, err := midRun.scheduleRuns(ctx, be.JobId, *p, nextRunSize(lower, midRun, minSampleSize))
				if err != nil {
					logger.Warn(fmt.Sprintf("Failed to schedule more runs for (%v): %v", mid, err))
					break
				}

				pendings++
				selector.AddFuture(mf, func(f workflow.Future) {
					pendings--
					err := midRun.updateRuns(ctx, mf)
					if err != nil {
						logger.Warn(fmt.Sprintf("Failed to update runs for (%v): %v", mid, err))
						return
					}
					workflow.Go(ctx, func(gCtx workflow.Context) {
						comparisons.Send(gCtx, cr.CloneWithHigher(midRunIdx))
						comparisons.Send(gCtx, cr.CloneWithLower(midRunIdx))
					})
					pendings += 2
				})
			}
		}
	})

	// Schedule the first pair and wait for both to finish before continuing.
	lowerIdx, lower := tracker.newRun(midpoint.NewCombinedCommit(midpoint.NewChromiumCommit(p.Request.StartGitHash)))
	higherIdx, higher := tracker.newRun(midpoint.NewCombinedCommit(midpoint.NewChromiumCommit(p.Request.EndGitHash)))
	lf, hf, err := schedulePairRuns(lower, higher)
	if err != nil {
		// If we are able to schedule runs at the beginning, we are less likely to fail in the middle.
		return nil, skerr.Wrapf(err, "failed to schedule initial runs")
	}

	if err := lower.updateRuns(ctx, lf); err != nil {
		return nil, skerr.Wrap(err)
	}
	if err := higher.updateRuns(ctx, hf); err != nil {
		return nil, skerr.Wrap(err)
	}

	// Send the first pair to the channel to process.
	pendings++
	comparisons.SendAsync(CommitRangeTracker{Lower: lowerIdx, Higher: higherIdx})

	// TODO(b/322203189): Store and order the new commits so that the data can be relayed
	// to the UI.
	for pendings > 0 {
		selector.Select(ctx)
	}

	be.RunData = make([]*BisectRun, len(tracker.runs))
	copy(be.RunData, tracker.runs)

	return be, nil
}