[cabe] cli: refactor analyze.go and check.go code into common.go
Change-Id: I077ba335e94a01713f20482b5442b263aebfc31c
Reviewed-on: https://skia-review.googlesource.com/c/buildbot/+/703506
Commit-Queue: Sean McCullough <seanmccullough@google.com>
Reviewed-by: Yuan Huang <yuanhuang@google.com>
diff --git a/cabe/go/cmd/cabe/cli/BUILD.bazel b/cabe/go/cmd/cabe/cli/BUILD.bazel
index f674656..76d23f7 100644
--- a/cabe/go/cmd/cabe/cli/BUILD.bazel
+++ b/cabe/go/cmd/cabe/cli/BUILD.bazel
@@ -16,6 +16,8 @@
"//cabe/go/perfresults",
"//cabe/go/replaybackends",
"//go/sklog",
+ "//go/swarming",
+ "@com_github_bazelbuild_remote_apis_sdks//go/pkg/client:go_default_library",
"@com_github_olekukonko_tablewriter//:tablewriter",
"@com_github_urfave_cli_v2//:cli",
"@org_chromium_go_luci//common/api/swarming/swarming/v1:swarming",
diff --git a/cabe/go/cmd/cabe/cli/analyze.go b/cabe/go/cmd/cabe/cli/analyze.go
index 145575f..0c0cdaa 100644
--- a/cabe/go/cmd/cabe/cli/analyze.go
+++ b/cabe/go/cmd/cabe/cli/analyze.go
@@ -1,17 +1,11 @@
package cli
import (
- "context"
"fmt"
"os"
"strings"
- "time"
- "go.chromium.org/luci/common/api/swarming/swarming/v1"
"go.skia.org/infra/cabe/go/analyzer"
- "go.skia.org/infra/cabe/go/backends"
- "go.skia.org/infra/cabe/go/perfresults"
- "go.skia.org/infra/cabe/go/replaybackends"
"go.skia.org/infra/go/sklog"
"github.com/olekukonko/tablewriter"
@@ -33,57 +27,20 @@
Usage: "cabe analyze -- --pinpoint-job <pinpoint-job>",
Flags: cmd.flags(),
Action: cmd.action,
+ After: cmd.cleanup,
}
}
// action runs the analyzer process locally.
func (cmd *analyzeCmd) action(cliCtx *cli.Context) error {
ctx := cliCtx.Context
-
- rbeClients, err := backends.DialRBECAS(ctx)
- if err != nil {
- sklog.Fatalf("dialing RBE-CAS backends: %v", err)
+ if err := cmd.dialBackends(ctx); err != nil {
return err
}
- swarmingClient, err := backends.DialSwarming(ctx)
- if err != nil {
- sklog.Fatalf("dialing swarming: %v", err)
- return err
- }
-
- var casResultReader = func(c context.Context, casInstance, digest string) (map[string]perfresults.PerfResults, error) {
- rbeClient := rbeClients[casInstance]
- return backends.FetchBenchmarkJSON(ctx, rbeClient, digest)
- }
-
- var swarmingTaskReader = func(ctx context.Context, pinpointJobID string) ([]*swarming.SwarmingRpcsTaskRequestMetadata, error) {
- tasksResp, err := swarmingClient.ListTasks(ctx, time.Now().AddDate(0, 0, -56), time.Now(), []string{pinpointSwarmingTagName + ":" + pinpointJobID}, "")
- if err != nil {
- sklog.Fatalf("list task results: %v", err)
- return nil, err
- }
- return tasksResp, nil
- }
-
- if cmd.replayFromZip != "" {
- replayBackends := replaybackends.FromZipFile(cmd.replayFromZip, "blank")
- casResultReader = replayBackends.CASResultReader
- swarmingTaskReader = replayBackends.SwarmingTaskReader
- } else if cmd.recordToZip != "" {
- replayBackends := replaybackends.ToZipFile(cmd.recordToZip, rbeClients, swarmingClient)
- defer func() {
- if err := replayBackends.Close(); err != nil {
- sklog.Fatalf("closing replay backends: %v", err)
- }
- }()
- casResultReader = replayBackends.CASResultReader
- swarmingTaskReader = replayBackends.SwarmingTaskReader
- }
-
var analyzerOpts = []analyzer.Options{
- analyzer.WithCASResultReader(casResultReader),
- analyzer.WithSwarmingTaskReader(swarmingTaskReader),
+ analyzer.WithCASResultReader(cmd.casResultReader),
+ analyzer.WithSwarmingTaskReader(cmd.swarmingTaskReader),
}
a := analyzer.New(cmd.pinpointJobID, analyzerOpts...)
diff --git a/cabe/go/cmd/cabe/cli/check.go b/cabe/go/cmd/cabe/cli/check.go
index 28fdaff..2a294c7 100644
--- a/cabe/go/cmd/cabe/cli/check.go
+++ b/cabe/go/cmd/cabe/cli/check.go
@@ -1,18 +1,12 @@
package cli
import (
- "context"
"fmt"
- "time"
"github.com/urfave/cli/v2"
"google.golang.org/protobuf/encoding/prototext"
- "go.chromium.org/luci/common/api/swarming/swarming/v1"
"go.skia.org/infra/cabe/go/analyzer"
- "go.skia.org/infra/cabe/go/backends"
- "go.skia.org/infra/cabe/go/perfresults"
- "go.skia.org/infra/cabe/go/replaybackends"
"go.skia.org/infra/go/sklog"
)
@@ -31,57 +25,20 @@
Usage: "cabe check --pinpoint-job <pinpoint-job>",
Flags: cmd.flags(),
Action: cmd.action,
+ After: cmd.cleanup,
}
}
// action runs diagnostic checks on an experiment.
func (cmd *checkCmd) action(cliCtx *cli.Context) error {
ctx := cliCtx.Context
-
- rbeClients, err := backends.DialRBECAS(ctx)
- if err != nil {
- sklog.Fatalf("dialing RBE-CAS backends: %v", err)
+ if err := cmd.dialBackends(ctx); err != nil {
return err
}
- swarmingClient, err := backends.DialSwarming(ctx)
- if err != nil {
- sklog.Fatalf("dialing swarming: %v", err)
- return err
- }
-
- var casResultReader = func(c context.Context, casInstance, digest string) (map[string]perfresults.PerfResults, error) {
- rbeClient := rbeClients[casInstance]
- return backends.FetchBenchmarkJSON(ctx, rbeClient, digest)
- }
-
- var swarmingTaskReader = func(ctx context.Context, pinpointJobID string) ([]*swarming.SwarmingRpcsTaskRequestMetadata, error) {
- tasksResp, err := swarmingClient.ListTasks(ctx, time.Now().AddDate(0, 0, -56), time.Now(), []string{pinpointSwarmingTagName + ":" + pinpointJobID}, "")
- if err != nil {
- sklog.Fatalf("list task results: %v", err)
- return nil, err
- }
- return tasksResp, nil
- }
-
- if cmd.replayFromZip != "" {
- replayBackends := replaybackends.FromZipFile(cmd.replayFromZip, "blank")
- casResultReader = replayBackends.CASResultReader
- swarmingTaskReader = replayBackends.SwarmingTaskReader
- } else if cmd.recordToZip != "" {
- replayBackends := replaybackends.ToZipFile(cmd.recordToZip, rbeClients, swarmingClient)
- defer func() {
- if err := replayBackends.Close(); err != nil {
- sklog.Fatalf("closing replay backends: %v", err)
- }
- }()
- casResultReader = replayBackends.CASResultReader
- swarmingTaskReader = replayBackends.SwarmingTaskReader
- }
-
var analyzerOpts = []analyzer.Options{
- analyzer.WithCASResultReader(casResultReader),
- analyzer.WithSwarmingTaskReader(swarmingTaskReader),
+ analyzer.WithCASResultReader(cmd.casResultReader),
+ analyzer.WithSwarmingTaskReader(cmd.swarmingTaskReader),
}
a := analyzer.New(cmd.pinpointJobID, analyzerOpts...)
diff --git a/cabe/go/cmd/cabe/cli/common.go b/cabe/go/cmd/cabe/cli/common.go
index 674eb3e..eef8fdd 100644
--- a/cabe/go/cmd/cabe/cli/common.go
+++ b/cabe/go/cmd/cabe/cli/common.go
@@ -1,13 +1,24 @@
package cli
import (
+ "context"
"fmt"
+ "time"
+ rbeclient "github.com/bazelbuild/remote-apis-sdks/go/pkg/client"
"github.com/urfave/cli/v2"
+ swarmingapi "go.chromium.org/luci/common/api/swarming/swarming/v1"
+
+ "go.skia.org/infra/cabe/go/backends"
+ "go.skia.org/infra/cabe/go/perfresults"
+ "go.skia.org/infra/cabe/go/replaybackends"
+ "go.skia.org/infra/go/sklog"
+ "go.skia.org/infra/go/swarming"
)
const (
pinpointSwarmingTagName = "pinpoint_job_id"
+ rbeCASTTLDays = 56
)
// flag names
@@ -21,6 +32,62 @@
pinpointJobID string
recordToZip string
replayFromZip string
+
+ replayBackends *replaybackends.ReplayBackends
+
+ swarmingClient swarming.ApiClient
+ rbeClients map[string]*rbeclient.Client
+
+ swarmingTaskReader backends.SwarmingTaskReader
+ casResultReader backends.CASResultReader
+}
+
+func (a *commonCmd) readCASResultFromRBEAPI(ctx context.Context, instance, digest string) (map[string]perfresults.PerfResults, error) {
+ rbeClient, ok := a.rbeClients[instance]
+ if !ok {
+ return nil, fmt.Errorf("no RBE client for instance %s", instance)
+ }
+
+ return backends.FetchBenchmarkJSON(ctx, rbeClient, digest)
+}
+
+func (a *commonCmd) readSwarmingTasksFromAPI(ctx context.Context, pinpointJobID string) ([]*swarmingapi.SwarmingRpcsTaskRequestMetadata, error) {
+ tasksResp, err := a.swarmingClient.ListTasks(ctx, time.Now().AddDate(0, 0, -rbeCASTTLDays), time.Now(), []string{"pinpoint_job_id:" + pinpointJobID}, "")
+ if err != nil {
+ sklog.Fatalf("list task results: %v", err)
+ return nil, err
+ }
+ return tasksResp, nil
+}
+
+func (cmd *commonCmd) dialBackends(ctx context.Context) error {
+ rbeClients, err := backends.DialRBECAS(ctx)
+ if err != nil {
+ sklog.Fatalf("dialing RBE-CAS backends: %v", err)
+ return err
+ }
+ cmd.rbeClients = rbeClients
+
+ swarmingClient, err := backends.DialSwarming(ctx)
+ if err != nil {
+ sklog.Fatalf("dialing swarming: %v", err)
+ return err
+ }
+ cmd.swarmingClient = swarmingClient
+
+ cmd.swarmingTaskReader = cmd.readSwarmingTasksFromAPI
+ cmd.casResultReader = cmd.readCASResultFromRBEAPI
+
+ if cmd.replayFromZip != "" {
+ cmd.replayBackends = replaybackends.FromZipFile(cmd.replayFromZip, "blank")
+ cmd.casResultReader = cmd.replayBackends.CASResultReader
+ cmd.swarmingTaskReader = cmd.replayBackends.SwarmingTaskReader
+ } else if cmd.recordToZip != "" {
+ cmd.replayBackends = replaybackends.ToZipFile(cmd.recordToZip, rbeClients, swarmingClient)
+ cmd.casResultReader = cmd.replayBackends.CASResultReader
+ cmd.swarmingTaskReader = cmd.replayBackends.SwarmingTaskReader
+ }
+ return nil
}
func (cmd *commonCmd) flags() []cli.Flag {
@@ -56,3 +123,10 @@
}
return []cli.Flag{pinpointJobIDFlag, replayFromZipFlag, recordToZipFlag}
}
+
+func (cmd *commonCmd) cleanup(cliCtx *cli.Context) error {
+ if cmd.replayBackends != nil {
+ return cmd.replayBackends.Close()
+ }
+ return nil
+}
diff --git a/cabe/go/replaybackends/replaybackends.go b/cabe/go/replaybackends/replaybackends.go
index 91d1889..897918f 100644
--- a/cabe/go/replaybackends/replaybackends.go
+++ b/cabe/go/replaybackends/replaybackends.go
@@ -156,10 +156,10 @@
ret.SwarmingTaskReader = func(ctx context.Context, pinpointJobID string) ([]*swarmingapi.SwarmingRpcsTaskRequestMetadata, error) {
var start, end time.Time
end = time.Now()
- start = time.Now().Add(-time.Hour * 24 * 14) // past two weeks
+ start = time.Now().Add(-time.Hour * 24 * 52) // past 52 days
state := "" // any state
- sklog.Infof("getting task metadata from swarming service")
+ sklog.Infof("getting task metadata from swarming service, pinpoint_job_id: %v", pinpointJobID)
rmd, err := swarmingClient.ListTasks(ctx, start, end, []string{"pinpoint_job_id:" + pinpointJobID}, state)
if err != nil {
sklog.Errorf("getting swarming tasks: %v", err)
@@ -215,7 +215,9 @@
func (r *ReplayBackends) Close() error {
sklog.Infof("closing replay backends")
if r.zipWriter == nil {
- return fmt.Errorf("cannot close replay backends without a writer")
+ sklog.Infof("no zipWriter to close")
+
+ return nil
}
if err := r.zipWriter.Close(); err != nil {
return err