Make run_on_swarming_bots use the Swarming API
Change-Id: I57127d579095d7244516c2f6824c903234228090
Reviewed-on: https://skia-review.googlesource.com/c/buildbot/+/254417
Commit-Queue: Eric Boren <borenet@google.com>
Reviewed-by: Ravi Mistry <rmistry@google.com>
diff --git a/go/swarming/apiclient.go b/go/swarming/apiclient.go
index f2400c7..d839a8b 100644
--- a/go/swarming/apiclient.go
+++ b/go/swarming/apiclient.go
@@ -11,6 +11,7 @@
"time"
swarming "go.chromium.org/luci/common/api/swarming/swarming/v1"
+ "go.skia.org/infra/go/cipd"
"go.skia.org/infra/go/sklog"
"go.skia.org/infra/go/util"
)
@@ -634,3 +635,18 @@
func Completed(t *swarming.SwarmingRpcsTaskRequestMetadata) (time.Time, error) {
return ParseTimestamp(t.TaskResult.CompletedTs)
}
+
+// ConvertCIPDInput converts a slice of cipd.Package to a SwarmingRpcsCipdInput.
+func ConvertCIPDInput(pkgs []*cipd.Package) *swarming.SwarmingRpcsCipdInput {
+ rv := &swarming.SwarmingRpcsCipdInput{
+ Packages: []*swarming.SwarmingRpcsCipdPackage{},
+ }
+ for _, pkg := range pkgs {
+ rv.Packages = append(rv.Packages, &swarming.SwarmingRpcsCipdPackage{
+ PackageName: pkg.Name,
+ Path: pkg.Path,
+ Version: pkg.Version,
+ })
+ }
+ return rv
+}
diff --git a/go/util/util.go b/go/util/util.go
index 402f04d..82fce05 100644
--- a/go/util/util.go
+++ b/go/util/util.go
@@ -1095,6 +1095,16 @@
})
}
+// CopyFile copies the given src file to dst.
+func CopyFile(src, dst string) error {
+ return WithReadFile(src, func(r io.Reader) error {
+ return WithWriteFile(dst, func(w io.Writer) error {
+ _, err := io.Copy(w, r)
+ return err
+ })
+ })
+}
+
// IterTimeChunks calls the given function for each time chunk of the given
// duration within the given time range.
func IterTimeChunks(start, end time.Time, chunkSize time.Duration, fn func(time.Time, time.Time) error) error {
diff --git a/scripts/run_on_swarming_bots/run_on_swarming_bots.go b/scripts/run_on_swarming_bots/run_on_swarming_bots.go
index 245481f..d2a2657 100644
--- a/scripts/run_on_swarming_bots/run_on_swarming_bots.go
+++ b/scripts/run_on_swarming_bots/run_on_swarming_bots.go
@@ -4,6 +4,7 @@
"context"
"flag"
"fmt"
+ "io"
"io/ioutil"
"os"
"path"
@@ -13,12 +14,15 @@
"time"
"github.com/google/uuid"
+ swarming_api "go.chromium.org/luci/common/api/swarming/swarming/v1"
"go.skia.org/infra/go/auth"
+ "go.skia.org/infra/go/cipd"
"go.skia.org/infra/go/common"
"go.skia.org/infra/go/httputils"
"go.skia.org/infra/go/isolate"
"go.skia.org/infra/go/sklog"
"go.skia.org/infra/go/swarming"
+ "go.skia.org/infra/go/util"
)
/*
@@ -30,9 +34,6 @@
TMP_ISOLATE_FILE_NAME = "script.isolate"
TMP_ISOLATE_FILE_CONTENTS = `{
'variables': {
- 'command': [
- 'python', '-u', '%s',
- ],
'files': [
'%s',
],
@@ -121,65 +122,106 @@
return
}
- swarmClient, err := swarming.NewSwarmingClient(ctx, *workdir, swarmingServer, isolateServer, "")
- if err != nil {
- sklog.Fatal(err)
- }
-
// Copy the script to the workdir.
- dstScript := path.Join(*workdir, scriptName)
- contents, err := ioutil.ReadFile(*script)
+ isolateDir, err := ioutil.TempDir(*workdir, "run_on_swarming_bots")
if err != nil {
sklog.Fatal(err)
}
- if err := ioutil.WriteFile(dstScript, contents, 0644); err != nil {
+ defer util.RemoveAll(isolateDir)
+ dstScript := path.Join(isolateDir, scriptName)
+ if err := util.CopyFile(*script, dstScript); err != nil {
sklog.Fatal(err)
}
// Create an isolate file.
- isolateFile := path.Join(*workdir, TMP_ISOLATE_FILE_NAME)
- if err := ioutil.WriteFile(isolateFile, []byte(fmt.Sprintf(TMP_ISOLATE_FILE_CONTENTS, scriptName, scriptName)), 0644); err != nil {
+ isolateFile := path.Join(isolateDir, TMP_ISOLATE_FILE_NAME)
+ if err := util.WithWriteFile(isolateFile, func(w io.Writer) error {
+ _, err := w.Write([]byte(fmt.Sprintf(TMP_ISOLATE_FILE_CONTENTS, scriptName)))
+ return err
+ }); err != nil {
sklog.Fatal(err)
}
// Upload to isolate server.
- isolated, err := swarmClient.CreateIsolatedGenJSON(isolateFile, *workdir, "linux", *taskName, map[string]string{}, []string{})
+ isolateClient, err := isolate.NewClient(*workdir, isolateServer)
if err != nil {
sklog.Fatal(err)
}
- m, err := swarmClient.BatchArchiveTargets(ctx, []string{isolated}, 5*time.Minute)
+ isolateTask := &isolate.Task{
+ BaseDir: isolateDir,
+ Blacklist: isolate.DEFAULT_BLACKLIST,
+ IsolateFile: isolateFile,
+ }
+ hashes, _, err := isolateClient.IsolateTasks(ctx, []*isolate.Task{isolateTask})
if err != nil {
sklog.Fatal(err)
}
- group := fmt.Sprintf("%s_%s", *taskName, uuid.New())
- tags := map[string]string{
- "group": group,
- }
-
- var wg sync.WaitGroup
// Trigger the task on each bot.
+ cmd := []string{"python", "-u", scriptName}
+ group := fmt.Sprintf("%s_%s", *taskName, uuid.New())
+ tags := []string{
+ fmt.Sprintf("group:%s", group),
+ }
+ var wg sync.WaitGroup
for _, bot := range bots {
if !matchesAny(bot.BotId, includeRegs) {
sklog.Debugf("Skipping %s because it isn't in the whitelist", bot.BotId)
continue
}
- botDims := map[string][]string{}
- for _, d := range bot.Dimensions {
- botDims[d.Key] = d.Value
- }
wg.Add(1)
- go func(id string, botDims map[string][]string) {
+ go func(id string) {
defer wg.Done()
- dims := map[string]string{
- "pool": *pool,
- "id": id,
+ dims := []*swarming_api.SwarmingRpcsStringPair{
+ {
+ Key: "pool",
+ Value: *pool,
+ },
+ {
+ Key: "id",
+ Value: id,
+ },
}
sklog.Infof("Triggering on %s", id)
- if _, err := swarmClient.TriggerSwarmingTasks(ctx, m, dims, tags, []string{}, swarming.HIGHEST_PRIORITY, 120*time.Minute, 120*time.Minute, 120*time.Minute, false, false, ""); err != nil {
+ req := &swarming_api.SwarmingRpcsNewTaskRequest{
+ ExpirationSecs: int64((120 * time.Minute).Seconds()),
+ Name: *taskName,
+ Priority: swarming.HIGHEST_PRIORITY,
+ Properties: &swarming_api.SwarmingRpcsTaskProperties{
+ Caches: []*swarming_api.SwarmingRpcsCacheEntry{
+ {
+ Name: "vpython",
+ Path: "cache/vpython",
+ },
+ },
+ CipdInput: swarming.ConvertCIPDInput(cipd.PkgsPython),
+ Command: cmd,
+ Dimensions: dims,
+ EnvPrefixes: []*swarming_api.SwarmingRpcsStringListPair{
+ {
+ Key: "PATH",
+ Value: []string{"cipd_bin_packages", "cipd_bin_packages/bin"},
+ },
+ {
+ Key: "VPYTHON_VIRTUALENV_ROOT",
+ Value: []string{"cache/vpython"},
+ },
+ },
+ ExecutionTimeoutSecs: int64((120 * time.Minute).Seconds()),
+ Idempotent: false,
+ InputsRef: &swarming_api.SwarmingRpcsFilesRef{
+ Isolated: hashes[0],
+ Isolatedserver: isolateServer,
+ Namespace: isolate.DEFAULT_NAMESPACE,
+ },
+ IoTimeoutSecs: int64((120 * time.Minute).Seconds()),
+ },
+ Tags: tags,
+ }
+ if _, err := swarmApi.TriggerTask(req); err != nil {
sklog.Fatal(err)
}
- }(bot.BotId, botDims)
+ }(bot.BotId)
}
wg.Wait()