When the parent workflow is cancelled, run cleanup steps to cancel the running swarming and buildbucket tasks

Based on the Temporal cancellation document: https://docs.temporal.io/dev-guide/go/cancellation
The child workflow option "ParentClosePolicy: enums.PARENT_CLOSE_POLICY_REQUEST_CANCEL" sends a cancellation request to the child workflows when the parent workflow is cancelled.

When the child workflow context is cancelled, ctx.Err() returns ErrCanceled, which triggers the CleanupXXXActivity.

Tested in the dev environment: the child workflows can be cancelled, and the swarming/build tasks can be cancelled.

Bug: b/336632955
Change-Id: I3146ea345c30b27396efaed65f9ccb68c5aeaaf2
Reviewed-on: https://skia-review.googlesource.com/c/buildbot/+/846996
Reviewed-by: Jeff Yoon <jeffyoon@google.com>
Commit-Queue: Alex Sun <sunpeng@google.com>
diff --git a/pinpoint/go/run_benchmark/run_benchmark.go b/pinpoint/go/run_benchmark/run_benchmark.go
index 7225d16..9aa9d6f 100644
--- a/pinpoint/go/run_benchmark/run_benchmark.go
+++ b/pinpoint/go/run_benchmark/run_benchmark.go
@@ -101,3 +101,13 @@
 
 	return resp, nil
 }
+
+// Cancel asks the swarming client sc to cancel the task taskID; a CancelTasks error is returned wrapped with the task ID.
+func Cancel(ctx context.Context, sc backends.SwarmingClient, taskID string) error {
+	err := sc.CancelTasks(ctx, []string{taskID})
+	if err != nil {
+		return skerr.Wrapf(err, "benchmark task %v cancellation failed", taskID)
+	}
+
+	return nil
+}
diff --git a/pinpoint/go/workflows/internal/BUILD.bazel b/pinpoint/go/workflows/internal/BUILD.bazel
index 44a3c2c..856451a 100644
--- a/pinpoint/go/workflows/internal/BUILD.bazel
+++ b/pinpoint/go/workflows/internal/BUILD.bazel
@@ -33,6 +33,7 @@
         "//pinpoint/proto/v1:proto",
         "//temporal/go/common",
         "@com_github_google_uuid//:uuid",
+        "@io_temporal_go_api//enums/v1:enums",
         "@io_temporal_go_sdk//activity",
         "@io_temporal_go_sdk//temporal",
         "@io_temporal_go_sdk//workflow",
diff --git a/pinpoint/go/workflows/internal/build_chrome.go b/pinpoint/go/workflows/internal/build_chrome.go
index c03cea5..652be09 100644
--- a/pinpoint/go/workflows/internal/build_chrome.go
+++ b/pinpoint/go/workflows/internal/build_chrome.go
@@ -2,6 +2,7 @@
 
 import (
 	"context"
+	"errors"
 	"fmt"
 	"time"
 
@@ -25,12 +26,28 @@
 
 	bca := &BuildChromeActivity{}
 	var buildID int64
+	var status buildbucketpb.Status
+	defer func() {
+		// ErrCanceled is the error returned by Context.Err when the context is canceled
+		// This logic ensures cleanup only happens if there is a Cancellation error
+		if !errors.Is(ctx.Err(), workflow.ErrCanceled) {
+			return
+		}
+		// For the Workflow to execute an Activity after it receives a Cancellation Request,
+		// it has to get a new disconnected context.
+		newCtx, _ := workflow.NewDisconnectedContext(ctx)
+
+		err := workflow.ExecuteActivity(newCtx, bca.CleanupBuildActivity, buildID, status).Get(ctx, nil)
+		if err != nil {
+			logger.Error("CleanupBuildActivity failed", err)
+		}
+	}()
+
 	if err := workflow.ExecuteActivity(ctx, bca.SearchOrBuildActivity, params).Get(ctx, &buildID); err != nil {
 		logger.Error("Failed to wait for SearchOrBuildActivity:", err)
 		return nil, err
 	}
 
-	var status buildbucketpb.Status
 	if err := workflow.ExecuteActivity(ctx, bca.WaitBuildCompletionActivity, buildID).Get(ctx, &status); err != nil {
 		logger.Error("Failed to wait for WaitBuildCompletionActivity:", err)
 		return nil, err
@@ -128,3 +145,25 @@
 	}
 	return cas, nil
 }
+
+// CleanupBuildActivity cancels build buildID through build_chrome's CancelBuild; it is a no-op unless buildID is non-zero and status is SCHEDULED or STARTED.
+func (bca *BuildChromeActivity) CleanupBuildActivity(ctx context.Context, buildID int64, status buildbucketpb.Status) error {
+	if buildID == 0 || !(status == buildbucketpb.Status_SCHEDULED || status == buildbucketpb.Status_STARTED) {
+		return nil // NOTE(review): the workflow's status is presumably still unset if cancellation arrives before WaitBuildCompletionActivity returns; confirm such builds are not skipped here.
+	}
+
+	logger := activity.GetLogger(ctx)
+	bc, err := build_chrome.New(ctx)
+	if err != nil {
+		logger.Error("Failed to new build_chrome:", err)
+		return err
+	}
+
+	activity.RecordHeartbeat(ctx, "cancelling the build.")
+	err = bc.CancelBuild(ctx, buildID, "Pinpoint job cancelled")
+	if err != nil {
+		logger.Error("Failed to cancel build:", err)
+		return err
+	}
+	return nil
+}
diff --git a/pinpoint/go/workflows/internal/options.go b/pinpoint/go/workflows/internal/options.go
index 599f5f2..0914da8 100644
--- a/pinpoint/go/workflows/internal/options.go
+++ b/pinpoint/go/workflows/internal/options.go
@@ -4,6 +4,7 @@
 	"time"
 
 	"go.skia.org/infra/pinpoint/go/run_benchmark"
+	"go.temporal.io/api/enums/v1"
 	"go.temporal.io/sdk/temporal"
 	"go.temporal.io/sdk/workflow"
 )
@@ -48,6 +49,8 @@
 		RetryPolicy: &temporal.RetryPolicy{
 			MaximumAttempts: 4,
 		},
+		// When the parent workflow is cancelled, a cancellation request is sent to the child workflow.
+		ParentClosePolicy: enums.PARENT_CLOSE_POLICY_REQUEST_CANCEL,
 	}
 
 	// Acitivity option for Building Chrome.
diff --git a/pinpoint/go/workflows/internal/run_benchmark.go b/pinpoint/go/workflows/internal/run_benchmark.go
index d5823b9..5d08907 100644
--- a/pinpoint/go/workflows/internal/run_benchmark.go
+++ b/pinpoint/go/workflows/internal/run_benchmark.go
@@ -2,6 +2,7 @@
 
 import (
 	"context"
+	"errors"
 	"fmt"
 	"time"
 
@@ -53,6 +54,23 @@
 
 	var rba RunBenchmarkActivity
 	var taskID string
+	var state run_benchmark.State
+	defer func() {
+		// ErrCanceled is the error returned by Context.Err when the context is canceled
+		// This logic ensures cleanup only happens if there is a Cancellation error
+		if !errors.Is(ctx.Err(), workflow.ErrCanceled) {
+			return
+		}
+		// For the Workflow to execute an Activity after it receives a Cancellation Request,
+		// it has to get a new disconnected context.
+		newCtx, _ := workflow.NewDisconnectedContext(ctx)
+
+		err := workflow.ExecuteActivity(newCtx, rba.CleanupBenchmarkRunActivity, taskID, state).Get(ctx, nil)
+		if err != nil {
+			logger.Error("CleanupBenchmarkRunActivity failed", err)
+		}
+	}()
+
 	if err := workflow.ExecuteActivity(ctx, rba.ScheduleTaskActivity, p).Get(ctx, &taskID); err != nil {
 		logger.Error("Failed to schedule task:", err)
 		return nil, skerr.Wrap(err)
@@ -62,7 +80,6 @@
 	// because swarming tasks can be pending for hours while swarming tasks
 	// generally finish in ~10 min
 	// TODO(sunxiaodi@): handle NO_RESOURCE retry case in WaitTaskPendingActivity
-	var state run_benchmark.State
 	if err := workflow.ExecuteActivity(pendingCtx, rba.WaitTaskPendingActivity, taskID).Get(pendingCtx, &state); err != nil {
 		logger.Error("Failed to poll pending task ID:", err)
 		return nil, skerr.Wrap(err)
@@ -200,3 +217,23 @@
 
 	return cas, nil
 }
+
+// CleanupBenchmarkRunActivity cancels swarming task taskID via run_benchmark.Cancel; it is a no-op when taskID is empty or state.IsTaskFinished() is true.
+func (rba *RunBenchmarkActivity) CleanupBenchmarkRunActivity(ctx context.Context, taskID string, state run_benchmark.State) error {
+	if len(taskID) == 0 || state.IsTaskFinished() {
+		return nil
+	}
+
+	logger := activity.GetLogger(ctx)
+	sc, err := backends.NewSwarmingClient(ctx, backends.DefaultSwarmingServiceAddress)
+	if err != nil {
+		logger.Error("Failed to connect to swarming client:", err)
+		return skerr.Wrap(err)
+	}
+
+	err = run_benchmark.Cancel(ctx, sc, taskID)
+	if err != nil {
+		return err // run_benchmark.Cancel already wraps the error with the task ID
+	}
+	return nil
+}