Make run_on_swarming_bots use the Swarming API

Change-Id: I57127d579095d7244516c2f6824c903234228090
Reviewed-on: https://skia-review.googlesource.com/c/buildbot/+/254417
Commit-Queue: Eric Boren <borenet@google.com>
Reviewed-by: Ravi Mistry <rmistry@google.com>
diff --git a/go/swarming/apiclient.go b/go/swarming/apiclient.go
index f2400c7..d839a8b 100644
--- a/go/swarming/apiclient.go
+++ b/go/swarming/apiclient.go
@@ -11,6 +11,7 @@
 	"time"
 
 	swarming "go.chromium.org/luci/common/api/swarming/swarming/v1"
+	"go.skia.org/infra/go/cipd"
 	"go.skia.org/infra/go/sklog"
 	"go.skia.org/infra/go/util"
 )
@@ -634,3 +635,18 @@
 func Completed(t *swarming.SwarmingRpcsTaskRequestMetadata) (time.Time, error) {
 	return ParseTimestamp(t.TaskResult.CompletedTs)
 }
+
+// ConvertCIPDInput converts a slice of cipd.Package to a SwarmingRpcsCipdInput.
+func ConvertCIPDInput(pkgs []*cipd.Package) *swarming.SwarmingRpcsCipdInput {
+	rv := &swarming.SwarmingRpcsCipdInput{
+		Packages: []*swarming.SwarmingRpcsCipdPackage{},
+	}
+	for _, pkg := range pkgs {
+		rv.Packages = append(rv.Packages, &swarming.SwarmingRpcsCipdPackage{
+			PackageName: pkg.Name,
+			Path:        pkg.Path,
+			Version:     pkg.Version,
+		})
+	}
+	return rv
+}
diff --git a/go/util/util.go b/go/util/util.go
index 402f04d..82fce05 100644
--- a/go/util/util.go
+++ b/go/util/util.go
@@ -1095,6 +1095,16 @@
 	})
 }
 
+// CopyFile copies the given src file to dst.
+func CopyFile(src, dst string) error {
+	return WithReadFile(src, func(r io.Reader) error {
+		return WithWriteFile(dst, func(w io.Writer) error {
+			_, err := io.Copy(w, r)
+			return err
+		})
+	})
+}
+
 // IterTimeChunks calls the given function for each time chunk of the given
 // duration within the given time range.
 func IterTimeChunks(start, end time.Time, chunkSize time.Duration, fn func(time.Time, time.Time) error) error {
diff --git a/scripts/run_on_swarming_bots/run_on_swarming_bots.go b/scripts/run_on_swarming_bots/run_on_swarming_bots.go
index 245481f..d2a2657 100644
--- a/scripts/run_on_swarming_bots/run_on_swarming_bots.go
+++ b/scripts/run_on_swarming_bots/run_on_swarming_bots.go
@@ -4,6 +4,7 @@
 	"context"
 	"flag"
 	"fmt"
+	"io"
 	"io/ioutil"
 	"os"
 	"path"
@@ -13,12 +14,15 @@
 	"time"
 
 	"github.com/google/uuid"
+	swarming_api "go.chromium.org/luci/common/api/swarming/swarming/v1"
 	"go.skia.org/infra/go/auth"
+	"go.skia.org/infra/go/cipd"
 	"go.skia.org/infra/go/common"
 	"go.skia.org/infra/go/httputils"
 	"go.skia.org/infra/go/isolate"
 	"go.skia.org/infra/go/sklog"
 	"go.skia.org/infra/go/swarming"
+	"go.skia.org/infra/go/util"
 )
 
 /*
@@ -30,9 +34,6 @@
 	TMP_ISOLATE_FILE_NAME     = "script.isolate"
 	TMP_ISOLATE_FILE_CONTENTS = `{
   'variables': {
-    'command': [
-      'python', '-u', '%s',
-    ],
     'files': [
       '%s',
     ],
@@ -121,65 +122,106 @@
 		return
 	}
 
-	swarmClient, err := swarming.NewSwarmingClient(ctx, *workdir, swarmingServer, isolateServer, "")
-	if err != nil {
-		sklog.Fatal(err)
-	}
-
 	// Copy the script to the workdir.
-	dstScript := path.Join(*workdir, scriptName)
-	contents, err := ioutil.ReadFile(*script)
+	isolateDir, err := ioutil.TempDir(*workdir, "run_on_swarming_bots")
 	if err != nil {
 		sklog.Fatal(err)
 	}
-	if err := ioutil.WriteFile(dstScript, contents, 0644); err != nil {
+	defer util.RemoveAll(isolateDir)
+	dstScript := path.Join(isolateDir, scriptName)
+	if err := util.CopyFile(*script, dstScript); err != nil {
 		sklog.Fatal(err)
 	}
 
 	// Create an isolate file.
-	isolateFile := path.Join(*workdir, TMP_ISOLATE_FILE_NAME)
-	if err := ioutil.WriteFile(isolateFile, []byte(fmt.Sprintf(TMP_ISOLATE_FILE_CONTENTS, scriptName, scriptName)), 0644); err != nil {
+	isolateFile := path.Join(isolateDir, TMP_ISOLATE_FILE_NAME)
+	if err := util.WithWriteFile(isolateFile, func(w io.Writer) error {
+		_, err := w.Write([]byte(fmt.Sprintf(TMP_ISOLATE_FILE_CONTENTS, scriptName)))
+		return err
+	}); err != nil {
 		sklog.Fatal(err)
 	}
 
 	// Upload to isolate server.
-	isolated, err := swarmClient.CreateIsolatedGenJSON(isolateFile, *workdir, "linux", *taskName, map[string]string{}, []string{})
+	isolateClient, err := isolate.NewClient(*workdir, isolateServer)
 	if err != nil {
 		sklog.Fatal(err)
 	}
-	m, err := swarmClient.BatchArchiveTargets(ctx, []string{isolated}, 5*time.Minute)
+	isolateTask := &isolate.Task{
+		BaseDir:     isolateDir,
+		Blacklist:   isolate.DEFAULT_BLACKLIST,
+		IsolateFile: isolateFile,
+	}
+	hashes, _, err := isolateClient.IsolateTasks(ctx, []*isolate.Task{isolateTask})
 	if err != nil {
 		sklog.Fatal(err)
 	}
-	group := fmt.Sprintf("%s_%s", *taskName, uuid.New())
-	tags := map[string]string{
-		"group": group,
-	}
-
-	var wg sync.WaitGroup
 
 	// Trigger the task on each bot.
+	cmd := []string{"python", "-u", scriptName}
+	group := fmt.Sprintf("%s_%s", *taskName, uuid.New())
+	tags := []string{
+		fmt.Sprintf("group:%s", group),
+	}
+	var wg sync.WaitGroup
 	for _, bot := range bots {
 		if !matchesAny(bot.BotId, includeRegs) {
 			sklog.Debugf("Skipping %s because it isn't in the whitelist", bot.BotId)
 			continue
 		}
-		botDims := map[string][]string{}
-		for _, d := range bot.Dimensions {
-			botDims[d.Key] = d.Value
-		}
 		wg.Add(1)
-		go func(id string, botDims map[string][]string) {
+		go func(id string) {
 			defer wg.Done()
-			dims := map[string]string{
-				"pool": *pool,
-				"id":   id,
+			dims := []*swarming_api.SwarmingRpcsStringPair{
+				{
+					Key:   "pool",
+					Value: *pool,
+				},
+				{
+					Key:   "id",
+					Value: id,
+				},
 			}
 			sklog.Infof("Triggering on %s", id)
-			if _, err := swarmClient.TriggerSwarmingTasks(ctx, m, dims, tags, []string{}, swarming.HIGHEST_PRIORITY, 120*time.Minute, 120*time.Minute, 120*time.Minute, false, false, ""); err != nil {
+			req := &swarming_api.SwarmingRpcsNewTaskRequest{
+				ExpirationSecs: int64((120 * time.Minute).Seconds()),
+				Name:           *taskName,
+				Priority:       swarming.HIGHEST_PRIORITY,
+				Properties: &swarming_api.SwarmingRpcsTaskProperties{
+					Caches: []*swarming_api.SwarmingRpcsCacheEntry{
+						{
+							Name: "vpython",
+							Path: "cache/vpython",
+						},
+					},
+					CipdInput:  swarming.ConvertCIPDInput(cipd.PkgsPython),
+					Command:    cmd,
+					Dimensions: dims,
+					EnvPrefixes: []*swarming_api.SwarmingRpcsStringListPair{
+						{
+							Key:   "PATH",
+							Value: []string{"cipd_bin_packages", "cipd_bin_packages/bin"},
+						},
+						{
+							Key:   "VPYTHON_VIRTUALENV_ROOT",
+							Value: []string{"cache/vpython"},
+						},
+					},
+					ExecutionTimeoutSecs: int64((120 * time.Minute).Seconds()),
+					Idempotent:           false,
+					InputsRef: &swarming_api.SwarmingRpcsFilesRef{
+						Isolated:       hashes[0],
+						Isolatedserver: isolateServer,
+						Namespace:      isolate.DEFAULT_NAMESPACE,
+					},
+					IoTimeoutSecs: int64((120 * time.Minute).Seconds()),
+				},
+				Tags: tags,
+			}
+			if _, err := swarmApi.TriggerTask(req); err != nil {
 				sklog.Fatal(err)
 			}
-		}(bot.BotId, botDims)
+		}(bot.BotId)
 	}
 
 	wg.Wait()