Add bisectRunTracker to be able to serialize CommitRangeTracker.

bisectRunTracker maps indexes to the actual BisectRun such that the
indices can be used for CommitRangeTracker and be able to serialize.
Note workflow.Future is a runtime feature that can't be serialized.

The underlying BisectRun needs one lookup and it will be shared
between each comparisons. This prepares the future work to run
bisection comparisons in parellel.

Bug: b/326352379
Change-Id: I89899099f2ee4acc701b38f24e85e12bde8341d1
Reviewed-on: https://skia-review.googlesource.com/c/buildbot/+/830617
Reviewed-by: Kevin Lubick <kjlubick@google.com>
Commit-Queue: Hao Wu <haowoo@google.com>
diff --git a/pinpoint/go/workflows/internal/bisect.go b/pinpoint/go/workflows/internal/bisect.go
index 11dd69b..e1ad6d3 100644
--- a/pinpoint/go/workflows/internal/bisect.go
+++ b/pinpoint/go/workflows/internal/bisect.go
@@ -23,18 +23,74 @@
 	return benchmarkRunIterations[len(benchmarkRunIterations)-1]
 }
 
-// CommitRangeTracker stores a commit range as [Lower, Higher]
-// and tracks the expected sample size required for comparison.
+// bisectRunTracker stores all the running bisect runs.
+//
+// It keeps track of all the runs by indexes. The BisectRun will be updated from different
+// future fulfillment. One can wait for a bisect run that's already triggered by a different
+// bisection. This usually happens when the mid commit is computed and the result will be
+// used for comparisions from both sides. This can also happen when one comparision requires
+// more runs, and part of them is already triggered by another comparision.
+type BisectRunIndex int
+type bisectRunTracker struct {
+	runs []*BisectRun
+}
+
+func (t *bisectRunTracker) newRun(cc *midpoint.CombinedCommit) (BisectRunIndex, *BisectRun) {
+	r := newBisectRun(cc)
+	t.runs = append(t.runs, r)
+	return BisectRunIndex(len(t.runs) - 1), r
+}
+
+func (t bisectRunTracker) get(r BisectRunIndex) *BisectRun {
+	if r < 0 || int(r) >= len(t.runs) {
+		return nil
+	}
+	return t.runs[int(r)]
+}
+
+// CommitRangeTracker stores a commit range as [Lower, Higher] and expected sample size.
+//
+// It stores bisect run as indexes as it needs to be serialized. The indexes
+// are stable within the workflow thru bisectRunTracker.
 type CommitRangeTracker struct {
-	Lower              *BisectRun
-	Higher             *BisectRun
+	Lower              BisectRunIndex
+	Higher             BisectRunIndex
 	ExpectedSampleSize int32
 }
 
-func newTrackerWithHashes(lowerHash, higherHash string, expectedSize int32) *CommitRangeTracker {
-	return &CommitRangeTracker{
-		Lower:              newBisectRun(midpoint.NewCombinedCommit(midpoint.NewChromiumCommit(lowerHash))),
-		Higher:             newBisectRun(midpoint.NewCombinedCommit(midpoint.NewChromiumCommit(higherHash))),
+func (t CommitRangeTracker) CloneWithHigher(higher BisectRunIndex) CommitRangeTracker {
+	return CommitRangeTracker{
+		Lower:              t.Lower,
+		Higher:             higher,
+		ExpectedSampleSize: t.ExpectedSampleSize,
+	}
+}
+
+func (t CommitRangeTracker) CloneWithLower(lower BisectRunIndex) CommitRangeTracker {
+	return CommitRangeTracker{
+		Lower:              lower,
+		Higher:             t.Higher,
+		ExpectedSampleSize: t.ExpectedSampleSize,
+	}
+}
+
+func (t CommitRangeTracker) CloneWithExpectedSampleSize(expectedSampleSize int32) CommitRangeTracker {
+	return CommitRangeTracker{
+		Lower:              t.Lower,
+		Higher:             t.Higher,
+		ExpectedSampleSize: expectedSampleSize,
+	}
+}
+
+// newTrackerWithHashes returns a CommitRangeTracker containing two new BisectRun.
+//
+// This is a helper function to make the code slightly cleaner, this function will eventually retire.
+func (t *bisectRunTracker) newTrackerWithHashes(lowerHash, higherHash string, expectedSize int32) CommitRangeTracker {
+	lower, _ := t.newRun(midpoint.NewCombinedCommit(midpoint.NewChromiumCommit(lowerHash)))
+	higher, _ := t.newRun(midpoint.NewCombinedCommit(midpoint.NewChromiumCommit(higherHash)))
+	return CommitRangeTracker{
+		Lower:              lower,
+		Higher:             higher,
 		ExpectedSampleSize: expectedSize,
 	}
 }
@@ -52,13 +108,13 @@
 // FindMidCommitActivity is an Activity that finds the middle point of two commits.
 //
 // TODO(b/326352320): Move this into its own file.
-func FindMidCommitActivity(ctx context.Context, cr *CommitRangeTracker) (*midpoint.CombinedCommit, error) {
+func FindMidCommitActivity(ctx context.Context, lower, higher *midpoint.CombinedCommit) (*midpoint.CombinedCommit, error) {
 	httpClientTokenSource, err := google.DefaultTokenSource(ctx, auth.ScopeReadOnly)
 	if err != nil {
 		return nil, skerr.Wrapf(err, "Problem setting up default token source")
 	}
 	c := httputils.DefaultClientConfig().WithTokenSource(httpClientTokenSource).With2xxOnly().Client()
-	m, err := midpoint.New(ctx, c).FindMidCombinedCommit(ctx, cr.Lower.Commit, cr.Higher.Commit)
+	m, err := midpoint.New(ctx, c).FindMidCombinedCommit(ctx, lower, higher)
 	if err != nil {
 		return nil, skerr.Wrap(err)
 	}
@@ -153,31 +209,49 @@
 		}
 	}
 
-	commitStack := stack.New[*CommitRangeTracker]()
-	commitStack.Push(newTrackerWithHashes(p.Request.StartGitHash, p.Request.EndGitHash, minSampleSize))
+	// schedulePairRuns is a helper function to schedule new benchmark runs from two BisectRun.
+	// It captures common local variable and attempts to make the code cleaner in the for-loop below.
+	schedulePairRuns := func(lower, higher *BisectRun) (workflow.ChildWorkflowFuture, workflow.ChildWorkflowFuture, error) {
+		expected := nextRunSize(lower, higher, minSampleSize)
+		lf, err := lower.scheduleRuns(ctx, jobID, *p, expected-lower.totalRuns())
+		if err != nil {
+			logger.Warn("Failed to schedule more runs.", "commit", *lower.Commit, "error", err)
+			return nil, nil, skerr.Wrap(err)
+		}
+
+		hf, err := higher.scheduleRuns(ctx, jobID, *p, expected-higher.totalRuns())
+		if err != nil {
+			logger.Warn("Failed to schedule more runs.", "commit", *higher.Commit, "error", err)
+			return nil, nil, err
+		}
+
+		return lf, hf, nil
+	}
+
+	tracker := bisectRunTracker{}
+	commitStack := stack.New[CommitRangeTracker]()
+	commitStack.Push(tracker.newTrackerWithHashes(p.Request.StartGitHash, p.Request.EndGitHash, minSampleSize))
 
 	// TODO(b/322203189): Store and order the new commits so that the data can be relayed
 	// to the UI
 	for commitStack.Size() > 0 {
 		cr := commitStack.Pop()
-		lf, err := cr.Lower.scheduleRuns(ctx, jobID, *p, cr.ExpectedSampleSize-cr.Lower.totalRuns())
-		if err != nil {
-			return nil, skerr.Wrap(err)
-		}
-		hf, err := cr.Higher.scheduleRuns(ctx, jobID, *p, cr.ExpectedSampleSize-cr.Higher.totalRuns())
+
+		lower, higher := tracker.get(cr.Lower), tracker.get(cr.Higher)
+		lf, hf, err := schedulePairRuns(lower, higher)
 		if err != nil {
 			return nil, skerr.Wrap(err)
 		}
 
-		if err := cr.Lower.updateRuns(ctx, lf); err != nil {
+		if err := lower.updateRuns(ctx, lf); err != nil {
 			return nil, skerr.Wrap(err)
 		}
 
-		if err := cr.Higher.updateRuns(ctx, hf); err != nil {
+		if err := higher.updateRuns(ctx, hf); err != nil {
 			return nil, skerr.Wrap(err)
 		}
 
-		result, err := compareRuns(ctx, &cr.Higher.CommitRun, &cr.Lower.CommitRun, p.Request.Chart, magnitude)
+		result, err := compareRuns(ctx, &higher.CommitRun, &lower.CommitRun, p.Request.Chart, magnitude)
 		if err != nil {
 			return nil, skerr.Wrap(err)
 		}
@@ -188,48 +262,37 @@
 			// verdict. Running more samples is too expensive. Instead, assume the two samples
 			// are the statistically similar.
 			// assumes that cr.Lower and cr.Higher will have the same number of runs
-			if len(cr.Lower.Runs) < int(getMaxSampleSize()) {
-				commitStack.Push(&CommitRangeTracker{
-					Lower:              cr.Lower,
-					Higher:             cr.Higher,
-					ExpectedSampleSize: nextRunSize(cr.Lower, cr.Higher, minSampleSize),
-				})
-			} else {
+			if len(lower.Runs) >= int(getMaxSampleSize()) {
 				// TODO(haowoo@): add metric to measure this occurrence
-				logger.Warn("reached unknown verdict with p-value %d and sample size of %d", result.PValue, len(cr.Lower.Runs))
+				logger.Warn("reached unknown verdict with p-value %d and sample size of %d", result.PValue, len(lower.Runs))
+				break
 			}
+			commitStack.Push(cr.CloneWithExpectedSampleSize(nextRunSize(lower, higher, minSampleSize)))
 
 		case compare.Different:
 			// TODO(b/326352320): If the middle point has a different repo, it means that it looks into
 			//	the autoroll and there are changes in DEPS. We need to construct a CombinedCommit so it
 			//	can currently build with modified deps.
 			var mid *midpoint.CombinedCommit
-			if err := workflow.ExecuteActivity(ctx, FindMidCommitActivity, cr).Get(ctx, &mid); err != nil {
+			if err := workflow.ExecuteActivity(ctx, FindMidCommitActivity, lower.Commit, higher.Commit).Get(ctx, &mid); err != nil {
 				return nil, skerr.Wrap(err)
 			}
 
 			// TODO(b/326352319): Update protos so that pb.BisectionExecution can track multiple culprits.
-			if mid.Key() == cr.Lower.Commit.Key() {
+			if mid.Key() == lower.Commit.Key() {
 				// TODO(b/329502712): Append additional info to bisectionExecution
 				// such as p-values, average difference
-				e.Culprits = append(e.Culprits, cr.Higher.Commit.GetMainGitHash())
+				e.Culprits = append(e.Culprits, higher.Commit.GetMainGitHash())
 				break
 			}
-			midRun := newBisectRun(mid)
+
+			midIdx, midRun := tracker.newRun(mid)
 
 			// Both higher and lower should contain the same number runs so we would expect the same
 			// number of runs for both sides (lower, mid) and (mid, higher)
-			sampleSize := nextRunSize(cr.Lower, midRun, minSampleSize)
-			commitStack.Push(&CommitRangeTracker{
-				Lower:              cr.Lower,
-				Higher:             midRun,
-				ExpectedSampleSize: sampleSize,
-			})
-			commitStack.Push(&CommitRangeTracker{
-				Lower:              midRun,
-				Higher:             cr.Higher,
-				ExpectedSampleSize: sampleSize,
-			})
+			cr = cr.CloneWithExpectedSampleSize(nextRunSize(lower, midRun, minSampleSize))
+			commitStack.Push(cr.CloneWithHigher(midIdx))
+			commitStack.Push(cr.CloneWithLower(midIdx))
 		}
 	}
 	return e, nil
diff --git a/pinpoint/go/workflows/internal/bisect_test.go b/pinpoint/go/workflows/internal/bisect_test.go
index c59eb51..2df24b7 100644
--- a/pinpoint/go/workflows/internal/bisect_test.go
+++ b/pinpoint/go/workflows/internal/bisect_test.go
@@ -6,6 +6,7 @@
 	"github.com/stretchr/testify/mock"
 	"github.com/stretchr/testify/require"
 	"go.skia.org/infra/pinpoint/go/compare"
+	"go.skia.org/infra/pinpoint/go/midpoint"
 	"go.skia.org/infra/pinpoint/go/workflows"
 	"go.temporal.io/sdk/testsuite"
 	"go.temporal.io/sdk/workflow"
@@ -48,3 +49,34 @@
 	require.Empty(t, be.Culprits)
 	env.AssertExpectations(t)
 }
+
+func TestBisectRunTracker_NewIdx_ReturnSameRun(t *testing.T) {
+	tracker := bisectRunTracker{}
+	idx, run := tracker.newRun(&midpoint.CombinedCommit{})
+	require.Same(t, run, tracker.get(idx), "should be exact same addresses")
+}
+
+func TestBisectRunTracker_TwoRuns_ReturnDiffIndex(t *testing.T) {
+	tracker := bisectRunTracker{}
+	idx1, run1 := tracker.newRun(&midpoint.CombinedCommit{})
+	idx2, run2 := tracker.newRun(&midpoint.CombinedCommit{})
+	require.NotEqualValues(t, idx1, idx2)
+	require.NotSame(t, run1, run2, "pointers should be different")
+	require.NotSame(t, tracker.get(idx1), tracker.get(idx2), "pointers should be different")
+}
+
+func TestBisectRunTracker_NonExistIndex_ReturnNil(t *testing.T) {
+	nonExist := BisectRunIndex(1000)
+	tracker := bisectRunTracker{}
+	require.Nil(t, tracker.get(nonExist))
+	_, _ = tracker.newRun(&midpoint.CombinedCommit{})
+	require.Nil(t, tracker.get(nonExist))
+}
+
+func TestBisectRunTracker_ManyRuns_ReturnIndex(t *testing.T) {
+	tracker := bisectRunTracker{}
+	for i := 0; i < 100; i++ {
+		idx, run := tracker.newRun(&midpoint.CombinedCommit{})
+		require.Same(t, run, tracker.get(idx), "should be exact same addresses")
+	}
+}