[pinpoint] Add bot Id to dimensions for pairwise execution

Mentioning botId in the dimensions is required to make sure that
the benchmark is run on a specific device

Bug: b/321304277
Change-Id: I4eec79dbf62a9e8e1f9f97e0c8bd0e4153c58522
Reviewed-on: https://skia-review.googlesource.com/c/buildbot/+/834716
Reviewed-by: Leina Sun <sunxiaodi@google.com>
Commit-Queue: Vidit Chitkara <viditchitkara@google.com>
diff --git a/pinpoint/go/run_benchmark/run_benchmark.go b/pinpoint/go/run_benchmark/run_benchmark.go
index 0dbd30b..4a40caf 100644
--- a/pinpoint/go/run_benchmark/run_benchmark.go
+++ b/pinpoint/go/run_benchmark/run_benchmark.go
@@ -75,7 +75,7 @@
 }
 
 // Run schedules a swarming task to run the RunBenchmarkRequest.
-func Run(ctx context.Context, sc backends.SwarmingClient, commit, bot, benchmark, story, storyTag string, jobID string, buildArtifact *spb.SwarmingRpcsCASReference, iter int) ([]*spb.SwarmingRpcsTaskRequestMetadata, error) {
+func Run(ctx context.Context, sc backends.SwarmingClient, commit, bot, benchmark, story, storyTag string, jobID string, buildArtifact *spb.SwarmingRpcsCASReference, iter int, dimensions []map[string]string) ([]*spb.SwarmingRpcsTaskRequestMetadata, error) {
 	botConfig, err := bot_configs.GetBotConfig(bot, false)
 	if err != nil {
 		return nil, skerr.Wrapf(err, "Failed to create benchmark test object")
@@ -86,7 +86,11 @@
 		return nil, skerr.Wrapf(err, "Failed to prepare benchmark test for execution")
 	}
 
-	swarmingRequest := createSwarmingRequest(jobID, bt.GetCommand(), buildArtifact, botConfig.Dimensions)
+	dims := botConfig.Dimensions
+	if dimensions != nil {
+		dims = append(dims, dimensions...)
+	}
+	swarmingRequest := createSwarmingRequest(jobID, bt.GetCommand(), buildArtifact, dims)
 
 	resp := make([]*spb.SwarmingRpcsTaskRequestMetadata, 0)
 	for i := 0; i < iter; i++ {
diff --git a/pinpoint/go/run_benchmark/run_benchmark_test.go b/pinpoint/go/run_benchmark/run_benchmark_test.go
index 0beb76a..0aa6d1a 100644
--- a/pinpoint/go/run_benchmark/run_benchmark_test.go
+++ b/pinpoint/go/run_benchmark/run_benchmark_test.go
@@ -51,7 +51,7 @@
 		Return(&swarmingV1.SwarmingRpcsTaskRequestMetadata{
 			TaskId: "123",
 		}, nil).Once()
-	taskIds, err := Run(ctx, sc, c, "android-pixel2_webview-perf", "performance_browser_tests", "story", "all", fakeID, buildArtifact, 1)
+	taskIds, err := Run(ctx, sc, c, "android-pixel2_webview-perf", "performance_browser_tests", "story", "all", fakeID, buildArtifact, 1, nil)
 	assert.NoError(t, err)
 	assert.Equal(t, 1, len(taskIds))
 	assert.Equal(t, "123", taskIds[0].TaskId)
diff --git a/pinpoint/go/workflows/internal/commits_runner.go b/pinpoint/go/workflows/internal/commits_runner.go
index 35befe7..632b692 100644
--- a/pinpoint/go/workflows/internal/commits_runner.go
+++ b/pinpoint/go/workflows/internal/commits_runner.go
@@ -121,17 +121,21 @@
 	return runs
 }
 
-func runBenchmark(ctx workflow.Context, cc *midpoint.CombinedCommit, cas *swarmingV1.SwarmingRpcsCASReference, scrp *SingleCommitRunnerParams) (*workflows.TestRun, error) {
+func runBenchmark(ctx workflow.Context, cc *midpoint.CombinedCommit, cas *swarmingV1.SwarmingRpcsCASReference, scrp *SingleCommitRunnerParams, dimensions []map[string]string, iteration int64) (*workflows.TestRun, error) {
 	var tr *workflows.TestRun
-	if err := workflow.ExecuteChildWorkflow(ctx, workflows.RunBenchmark, &RunBenchmarkParams{
-		JobID:     scrp.PinpointJobID,
-		Commit:    cc,
-		BotConfig: scrp.BotConfig,
-		BuildCAS:  cas,
-		Benchmark: scrp.Benchmark,
-		Story:     scrp.Story,
-		StoryTags: scrp.StoryTags,
-	}).Get(ctx, &tr); err != nil {
+	rbp := &RunBenchmarkParams{
+		JobID:        scrp.PinpointJobID,
+		Commit:       cc,
+		BotConfig:    scrp.BotConfig,
+		BuildCAS:     cas,
+		Benchmark:    scrp.Benchmark,
+		Story:        scrp.Story,
+		StoryTags:    scrp.StoryTags,
+		Dimensions:   dimensions,
+		IterationIdx: iteration,
+	}
+
+	if err := workflow.ExecuteChildWorkflow(ctx, workflows.RunBenchmark, rbp).Get(ctx, &tr); err != nil {
 		return nil, err
 	}
 
@@ -175,10 +179,14 @@
 	ctx = workflow.WithChildOptions(ctx, runBenchmarkWorkflowOptions)
 	ctx = workflow.WithActivityOptions(ctx, regularActivityOptions)
 	for i := 0; i < int(sc.Iterations); i++ {
+		// We need to make a copy of i since the following is a closure. By making a
+		// copy every closure will point to it's own copy of i rather than pointing to
+		// the same variable.
+		iteration := int64(i)
 		workflow.Go(ctx, func(gCtx workflow.Context) {
 			defer wg.Done()
 
-			tr, err := runBenchmark(gCtx, sc.CombinedCommit, b.CAS, sc)
+			tr, err := runBenchmark(gCtx, sc.CombinedCommit, b.CAS, sc, nil, iteration)
 
 			if err != nil {
 				ec.Send(gCtx, err)
diff --git a/pinpoint/go/workflows/internal/pairwise_runner.go b/pinpoint/go/workflows/internal/pairwise_runner.go
index 645dfaa..8c600d4 100644
--- a/pinpoint/go/workflows/internal/pairwise_runner.go
+++ b/pinpoint/go/workflows/internal/pairwise_runner.go
@@ -35,7 +35,7 @@
 //
 // The function makes a swarming API call internally to fetch the desired bots. If successful, a slice
 // of bot ids is returned
-func FindAvailableBotsActivity(ctx context.Context, botConfig string) ([]string, error) {
+func FindAvailableBotsActivity(ctx context.Context, botConfig string, seed int64) ([]string, error) {
 	sc, err := backends.NewSwarmingClient(ctx, backends.DefaultSwarmingServiceAddress)
 	if err != nil {
 		return nil, skerr.Wrapf(err, "Failed to initialize swarming client")
@@ -51,6 +51,13 @@
 		botIds[i] = b.BotId
 	}
 
+	// The list of bot ids is randomized to make sure that the tasks
+	// do not everytime pick the same set of bots and leave the remaining
+	// unused almost the entire time.
+	rand.New(rand.NewSource(seed)).Shuffle(len(botIds), func(i, j int) {
+		botIds[i], botIds[j] = botIds[j], botIds[i]
+	})
+
 	return botIds, nil
 }
 
@@ -83,7 +90,7 @@
 	ctx = workflow.WithChildOptions(ctx, runBenchmarkWorkflowOptions)
 
 	var botIds []string
-	if err := workflow.ExecuteActivity(ctx, FindAvailableBotsActivity, pc.BotConfig).Get(ctx, &botIds); err != nil {
+	if err := workflow.ExecuteActivity(ctx, FindAvailableBotsActivity, pc.BotConfig, pc.Seed).Get(ctx, &botIds); err != nil {
 		return nil, err
 	}
 
@@ -95,17 +102,22 @@
 
 	// TODO(b/332391612): viditchitkara@ Build chrome for leftBuild and rightBuild in parallel
 	// to save time.
-	leftBuild, err := buildChrome(ctx, pc.PinpointJobID, pc.BotConfig, pc.Benchmark, pc.LeftBuild.Commit)
-	if err != nil {
-		return nil, skerr.Wrapf(err, "unable to build chrome for commit %s", pc.LeftBuild.Commit.Main.GitHash)
-	}
-	pc.LeftBuild = *leftBuild
 
-	rightBuild, err := buildChrome(ctx, pc.PinpointJobID, pc.BotConfig, pc.Benchmark, pc.RightBuild.Commit)
-	if err != nil {
-		return nil, skerr.Wrapf(err, "unable to build chrome for commit %s", pc.RightBuild.Commit.Main.GitHash)
+	if pc.LeftBuild.CAS == nil {
+		leftBuild, err := buildChrome(ctx, pc.PinpointJobID, pc.BotConfig, pc.Benchmark, pc.LeftBuild.Commit)
+		if err != nil {
+			return nil, skerr.Wrapf(err, "unable to build chrome for commit %s", pc.LeftBuild.Commit.Main.GitHash)
+		}
+		pc.LeftBuild = *leftBuild
 	}
-	pc.RightBuild = *rightBuild
+
+	if pc.RightBuild.CAS == nil {
+		rightBuild, err := buildChrome(ctx, pc.PinpointJobID, pc.BotConfig, pc.Benchmark, pc.RightBuild.Commit)
+		if err != nil {
+			return nil, skerr.Wrapf(err, "unable to build chrome for commit %s", pc.RightBuild.Commit.Main.GitHash)
+		}
+		pc.RightBuild = *rightBuild
+	}
 
 	pairs := generatePairIndices(pc.Seed, int(pc.Iterations))
 	runs := []struct {
@@ -130,12 +142,22 @@
 	orders := [][2]int{{0, 1}, {1, 0}}
 	for i := 0; i < int(pc.Iterations); i++ {
 		pairIdx := pairs[i]
+		botDimension := []map[string]string{
+			{
+				"key":   "id",
+				"value": botIds[i%len(botIds)],
+			},
+		}
+
+		// We need to make a copy of i since the following is a closure. By making a
+		// copy every closure will point to it's own copy of i rather than pointing to
+		// the same variable.
+		iteration := int64(i)
 		workflow.Go(ctx, func(gCtx workflow.Context) {
 			defer wg.Done()
 
 			for _, idx := range orders[pairIdx] {
-				// TODO(viditchitkara@): append bot id to the dimension so they only run on the given bot.
-				tr, err := runBenchmark(gCtx, runs[idx].cc, runs[idx].cas, &pc.SingleCommitRunnerParams)
+				tr, err := runBenchmark(gCtx, runs[idx].cc, runs[idx].cas, &pc.SingleCommitRunnerParams, botDimension, iteration)
 				if err != nil {
 					ec.Send(gCtx, err)
 					continue
diff --git a/pinpoint/go/workflows/internal/run_benchmark.go b/pinpoint/go/workflows/internal/run_benchmark.go
index 61ef1ae..e5b8e92 100644
--- a/pinpoint/go/workflows/internal/run_benchmark.go
+++ b/pinpoint/go/workflows/internal/run_benchmark.go
@@ -32,6 +32,12 @@
 	Story string
 	// story tags for the test
 	StoryTags string
+	// additional dimensions for bot selection
+	Dimensions []map[string]string
+	// iteration for the benchmark run. A few workflows have multiple iterations of
+	// benchmark runs and this param comes in handy to get additional info of a specific run.
+	// This is for debugging/informational purposes only.
+	IterationIdx int64
 }
 
 // RunBenchmarkActivity wraps RunBenchmarkWorkflow in Activities
@@ -97,7 +103,7 @@
 		return "", skerr.Wrap(err)
 	}
 
-	taskIds, err := run_benchmark.Run(ctx, sc, rbp.Commit.GetMainGitHash(), rbp.BotConfig, rbp.Benchmark, rbp.Story, rbp.StoryTags, rbp.JobID, rbp.BuildCAS, 1)
+	taskIds, err := run_benchmark.Run(ctx, sc, rbp.Commit.GetMainGitHash(), rbp.BotConfig, rbp.Benchmark, rbp.Story, rbp.StoryTags, rbp.JobID, rbp.BuildCAS, 1, rbp.Dimensions)
 	if err != nil {
 		return "", err
 	}
diff --git a/pinpoint/go/workflows/sample/main.go b/pinpoint/go/workflows/sample/main.go
index dafcdb8..7a74947 100644
--- a/pinpoint/go/workflows/sample/main.go
+++ b/pinpoint/go/workflows/sample/main.go
@@ -29,6 +29,7 @@
 	commit                  = flag.String("commit", "611b5a084486cd6d99a0dad63f34e320a2ebc2b3", "Git commit hash to build Chrome.")
 	triggerBisectFlag       = flag.Bool("bisect", false, "toggle true to trigger bisect workflow")
 	triggerSingleCommitFlag = flag.Bool("single-commit", false, "toggle true to trigger single commit runner workflow")
+	triggerPairwiseFlag     = flag.Bool("pairwise", false, "toggle true to trigger pairwise commit runner workflow")
 )
 
 func defaultWorkflowOptions() client.StartWorkflowOptions {
@@ -77,6 +78,62 @@
 	return be
 }
 
+func triggerPairwiseRunner(c client.Client) *internal.PairwiseRun {
+	ctx := context.Background()
+	// based off of https://pinpoint-dot-chromeperf.appspot.com/job/179a34b2be0000
+	p := &internal.PairwiseCommitsRunnerParams{
+		SingleCommitRunnerParams: internal.SingleCommitRunnerParams{
+			PinpointJobID:     "179a34b2be0000",
+			BotConfig:         "android-pixel4-perf",
+			Benchmark:         "blink_perf.bindings",
+			Story:             "gc-mini-tree.html",
+			Chart:             "gc-mini-tree",
+			AggregationMethod: "mean",
+			CombinedCommit:    midpoint.NewCombinedCommit(&pb.Commit{GitHash: *commit}),
+			Iterations:        6,
+		},
+		Seed: 54321,
+		LeftBuild: workflows.Build{
+			BuildChromeParams: workflows.BuildChromeParams{
+				Commit: midpoint.NewCombinedCommit(&pb.Commit{GitHash: "573a50658f4301465569c3faf00a145093a1fe9b"}), // 1284448
+			},
+			CAS: &swarmingV1.SwarmingRpcsCASReference{
+				CasInstance: "projects/chrome-swarming/instances/default_instance",
+				Digest: &swarmingV1.SwarmingRpcsDigest{
+					Hash:      "062ccf0a30a362d8e4df3c9b82172a78e3d62c2990eb30927f5863a6b08e80bb",
+					SizeBytes: 810,
+				},
+			},
+		},
+		RightBuild: workflows.Build{
+			BuildChromeParams: workflows.BuildChromeParams{
+				Commit: midpoint.NewCombinedCommit(&pb.Commit{GitHash: "a633e198b79b2e0c83c72a3006cdffe642871e22"}), // 1284449
+			},
+			CAS: &swarmingV1.SwarmingRpcsCASReference{
+				CasInstance: "projects/chrome-swarming/instances/default_instance",
+				Digest: &swarmingV1.SwarmingRpcsDigest{
+					Hash:      "51845150f953c33ee4c0900589ba916ca28b7896806460aa8935c0de2b209db6",
+					SizeBytes: 810,
+				},
+			},
+		},
+	}
+
+	var pr *internal.PairwiseRun
+	we, err := c.ExecuteWorkflow(ctx, defaultWorkflowOptions(), workflows.PairwiseCommitsRunner, p)
+	if err != nil {
+		sklog.Fatalf("Unable to execute workflow: %v", err)
+		return nil
+	}
+	sklog.Infof("Started workflow.. WorkflowID: %v RunID: %v", we.GetID(), we.GetRunID())
+
+	if err := we.Get(ctx, &pr); err != nil {
+		sklog.Fatalf("Unable to get result: %v", err)
+		return nil
+	}
+	return pr
+}
+
 func triggerSingleCommitRunner(c client.Client) *internal.CommitRun {
 	ctx := context.Background()
 	p := &internal.SingleCommitRunnerParams{
@@ -159,4 +216,8 @@
 		result := triggerSingleCommitRunner(c)
 		sklog.Infof("Workflow result: %v", spew.Sdump(result))
 	}
+	if *triggerPairwiseFlag {
+		result := triggerPairwiseRunner(c)
+		sklog.Infof("Workflow result: %v", spew.Sdump(result))
+	}
 }