[cabe] cli: refactor analyze.go and check.go code into common.go

Change-Id: I077ba335e94a01713f20482b5442b263aebfc31c
Reviewed-on: https://skia-review.googlesource.com/c/buildbot/+/703506
Commit-Queue: Sean McCullough <seanmccullough@google.com>
Reviewed-by: Yuan Huang <yuanhuang@google.com>
diff --git a/cabe/go/cmd/cabe/cli/BUILD.bazel b/cabe/go/cmd/cabe/cli/BUILD.bazel
index f674656..76d23f7 100644
--- a/cabe/go/cmd/cabe/cli/BUILD.bazel
+++ b/cabe/go/cmd/cabe/cli/BUILD.bazel
@@ -16,6 +16,8 @@
         "//cabe/go/perfresults",
         "//cabe/go/replaybackends",
         "//go/sklog",
+        "//go/swarming",
+        "@com_github_bazelbuild_remote_apis_sdks//go/pkg/client:go_default_library",
         "@com_github_olekukonko_tablewriter//:tablewriter",
         "@com_github_urfave_cli_v2//:cli",
         "@org_chromium_go_luci//common/api/swarming/swarming/v1:swarming",
diff --git a/cabe/go/cmd/cabe/cli/analyze.go b/cabe/go/cmd/cabe/cli/analyze.go
index 145575f..0c0cdaa 100644
--- a/cabe/go/cmd/cabe/cli/analyze.go
+++ b/cabe/go/cmd/cabe/cli/analyze.go
@@ -1,17 +1,11 @@
 package cli
 
 import (
-	"context"
 	"fmt"
 	"os"
 	"strings"
-	"time"
 
-	"go.chromium.org/luci/common/api/swarming/swarming/v1"
 	"go.skia.org/infra/cabe/go/analyzer"
-	"go.skia.org/infra/cabe/go/backends"
-	"go.skia.org/infra/cabe/go/perfresults"
-	"go.skia.org/infra/cabe/go/replaybackends"
 	"go.skia.org/infra/go/sklog"
 
 	"github.com/olekukonko/tablewriter"
@@ -33,57 +27,20 @@
 		Usage:       "cabe analyze -- --pinpoint-job <pinpoint-job>",
 		Flags:       cmd.flags(),
 		Action:      cmd.action,
+		After:       cmd.cleanup,
 	}
 }
 
 // action runs the analyzer process locally.
 func (cmd *analyzeCmd) action(cliCtx *cli.Context) error {
 	ctx := cliCtx.Context
-
-	rbeClients, err := backends.DialRBECAS(ctx)
-	if err != nil {
-		sklog.Fatalf("dialing RBE-CAS backends: %v", err)
+	if err := cmd.dialBackends(ctx); err != nil {
 		return err
 	}
 
-	swarmingClient, err := backends.DialSwarming(ctx)
-	if err != nil {
-		sklog.Fatalf("dialing swarming: %v", err)
-		return err
-	}
-
-	var casResultReader = func(c context.Context, casInstance, digest string) (map[string]perfresults.PerfResults, error) {
-		rbeClient := rbeClients[casInstance]
-		return backends.FetchBenchmarkJSON(ctx, rbeClient, digest)
-	}
-
-	var swarmingTaskReader = func(ctx context.Context, pinpointJobID string) ([]*swarming.SwarmingRpcsTaskRequestMetadata, error) {
-		tasksResp, err := swarmingClient.ListTasks(ctx, time.Now().AddDate(0, 0, -56), time.Now(), []string{pinpointSwarmingTagName + ":" + pinpointJobID}, "")
-		if err != nil {
-			sklog.Fatalf("list task results: %v", err)
-			return nil, err
-		}
-		return tasksResp, nil
-	}
-
-	if cmd.replayFromZip != "" {
-		replayBackends := replaybackends.FromZipFile(cmd.replayFromZip, "blank")
-		casResultReader = replayBackends.CASResultReader
-		swarmingTaskReader = replayBackends.SwarmingTaskReader
-	} else if cmd.recordToZip != "" {
-		replayBackends := replaybackends.ToZipFile(cmd.recordToZip, rbeClients, swarmingClient)
-		defer func() {
-			if err := replayBackends.Close(); err != nil {
-				sklog.Fatalf("closing replay backends: %v", err)
-			}
-		}()
-		casResultReader = replayBackends.CASResultReader
-		swarmingTaskReader = replayBackends.SwarmingTaskReader
-	}
-
 	var analyzerOpts = []analyzer.Options{
-		analyzer.WithCASResultReader(casResultReader),
-		analyzer.WithSwarmingTaskReader(swarmingTaskReader),
+		analyzer.WithCASResultReader(cmd.casResultReader),
+		analyzer.WithSwarmingTaskReader(cmd.swarmingTaskReader),
 	}
 
 	a := analyzer.New(cmd.pinpointJobID, analyzerOpts...)
diff --git a/cabe/go/cmd/cabe/cli/check.go b/cabe/go/cmd/cabe/cli/check.go
index 28fdaff..2a294c7 100644
--- a/cabe/go/cmd/cabe/cli/check.go
+++ b/cabe/go/cmd/cabe/cli/check.go
@@ -1,18 +1,12 @@
 package cli
 
 import (
-	"context"
 	"fmt"
-	"time"
 
 	"github.com/urfave/cli/v2"
 	"google.golang.org/protobuf/encoding/prototext"
 
-	"go.chromium.org/luci/common/api/swarming/swarming/v1"
 	"go.skia.org/infra/cabe/go/analyzer"
-	"go.skia.org/infra/cabe/go/backends"
-	"go.skia.org/infra/cabe/go/perfresults"
-	"go.skia.org/infra/cabe/go/replaybackends"
 	"go.skia.org/infra/go/sklog"
 )
 
@@ -31,57 +25,20 @@
 		Usage:       "cabe check --pinpoint-job <pinpoint-job>",
 		Flags:       cmd.flags(),
 		Action:      cmd.action,
+		After:       cmd.cleanup,
 	}
 }
 
 // action runs diagnostic checks on an experiment.
 func (cmd *checkCmd) action(cliCtx *cli.Context) error {
 	ctx := cliCtx.Context
-
-	rbeClients, err := backends.DialRBECAS(ctx)
-	if err != nil {
-		sklog.Fatalf("dialing RBE-CAS backends: %v", err)
+	if err := cmd.dialBackends(ctx); err != nil {
 		return err
 	}
 
-	swarmingClient, err := backends.DialSwarming(ctx)
-	if err != nil {
-		sklog.Fatalf("dialing swarming: %v", err)
-		return err
-	}
-
-	var casResultReader = func(c context.Context, casInstance, digest string) (map[string]perfresults.PerfResults, error) {
-		rbeClient := rbeClients[casInstance]
-		return backends.FetchBenchmarkJSON(ctx, rbeClient, digest)
-	}
-
-	var swarmingTaskReader = func(ctx context.Context, pinpointJobID string) ([]*swarming.SwarmingRpcsTaskRequestMetadata, error) {
-		tasksResp, err := swarmingClient.ListTasks(ctx, time.Now().AddDate(0, 0, -56), time.Now(), []string{pinpointSwarmingTagName + ":" + pinpointJobID}, "")
-		if err != nil {
-			sklog.Fatalf("list task results: %v", err)
-			return nil, err
-		}
-		return tasksResp, nil
-	}
-
-	if cmd.replayFromZip != "" {
-		replayBackends := replaybackends.FromZipFile(cmd.replayFromZip, "blank")
-		casResultReader = replayBackends.CASResultReader
-		swarmingTaskReader = replayBackends.SwarmingTaskReader
-	} else if cmd.recordToZip != "" {
-		replayBackends := replaybackends.ToZipFile(cmd.recordToZip, rbeClients, swarmingClient)
-		defer func() {
-			if err := replayBackends.Close(); err != nil {
-				sklog.Fatalf("closing replay backends: %v", err)
-			}
-		}()
-		casResultReader = replayBackends.CASResultReader
-		swarmingTaskReader = replayBackends.SwarmingTaskReader
-	}
-
 	var analyzerOpts = []analyzer.Options{
-		analyzer.WithCASResultReader(casResultReader),
-		analyzer.WithSwarmingTaskReader(swarmingTaskReader),
+		analyzer.WithCASResultReader(cmd.casResultReader),
+		analyzer.WithSwarmingTaskReader(cmd.swarmingTaskReader),
 	}
 
 	a := analyzer.New(cmd.pinpointJobID, analyzerOpts...)
diff --git a/cabe/go/cmd/cabe/cli/common.go b/cabe/go/cmd/cabe/cli/common.go
index 674eb3e..eef8fdd 100644
--- a/cabe/go/cmd/cabe/cli/common.go
+++ b/cabe/go/cmd/cabe/cli/common.go
@@ -1,13 +1,24 @@
 package cli
 
 import (
+	"context"
 	"fmt"
+	"time"
 
+	rbeclient "github.com/bazelbuild/remote-apis-sdks/go/pkg/client"
 	"github.com/urfave/cli/v2"
+	swarmingapi "go.chromium.org/luci/common/api/swarming/swarming/v1"
+
+	"go.skia.org/infra/cabe/go/backends"
+	"go.skia.org/infra/cabe/go/perfresults"
+	"go.skia.org/infra/cabe/go/replaybackends"
+	"go.skia.org/infra/go/sklog"
+	"go.skia.org/infra/go/swarming"
 )
 
 const (
 	pinpointSwarmingTagName = "pinpoint_job_id"
+	rbeCASTTLDays           = 56
 )
 
 // flag names
@@ -21,6 +32,62 @@
 	pinpointJobID string
 	recordToZip   string
 	replayFromZip string
+
+	replayBackends *replaybackends.ReplayBackends
+
+	swarmingClient swarming.ApiClient
+	rbeClients     map[string]*rbeclient.Client
+
+	swarmingTaskReader backends.SwarmingTaskReader
+	casResultReader    backends.CASResultReader
+}
+
+func (a *commonCmd) readCASResultFromRBEAPI(ctx context.Context, instance, digest string) (map[string]perfresults.PerfResults, error) {
+	rbeClient, ok := a.rbeClients[instance]
+	if !ok {
+		return nil, fmt.Errorf("no RBE client for instance %s", instance)
+	}
+
+	return backends.FetchBenchmarkJSON(ctx, rbeClient, digest)
+}
+
+func (a *commonCmd) readSwarmingTasksFromAPI(ctx context.Context, pinpointJobID string) ([]*swarmingapi.SwarmingRpcsTaskRequestMetadata, error) {
+	tasksResp, err := a.swarmingClient.ListTasks(ctx, time.Now().AddDate(0, 0, -rbeCASTTLDays), time.Now(), []string{"pinpoint_job_id:" + pinpointJobID}, "")
+	if err != nil {
+		sklog.Fatalf("list task results: %v", err)
+		return nil, err
+	}
+	return tasksResp, nil
+}
+
+func (cmd *commonCmd) dialBackends(ctx context.Context) error {
+	rbeClients, err := backends.DialRBECAS(ctx)
+	if err != nil {
+		sklog.Fatalf("dialing RBE-CAS backends: %v", err)
+		return err
+	}
+	cmd.rbeClients = rbeClients
+
+	swarmingClient, err := backends.DialSwarming(ctx)
+	if err != nil {
+		sklog.Fatalf("dialing swarming: %v", err)
+		return err
+	}
+	cmd.swarmingClient = swarmingClient
+
+	cmd.swarmingTaskReader = cmd.readSwarmingTasksFromAPI
+	cmd.casResultReader = cmd.readCASResultFromRBEAPI
+
+	if cmd.replayFromZip != "" {
+		cmd.replayBackends = replaybackends.FromZipFile(cmd.replayFromZip, "blank")
+		cmd.casResultReader = cmd.replayBackends.CASResultReader
+		cmd.swarmingTaskReader = cmd.replayBackends.SwarmingTaskReader
+	} else if cmd.recordToZip != "" {
+		cmd.replayBackends = replaybackends.ToZipFile(cmd.recordToZip, rbeClients, swarmingClient)
+		cmd.casResultReader = cmd.replayBackends.CASResultReader
+		cmd.swarmingTaskReader = cmd.replayBackends.SwarmingTaskReader
+	}
+	return nil
 }
 
 func (cmd *commonCmd) flags() []cli.Flag {
@@ -56,3 +123,10 @@
 	}
 	return []cli.Flag{pinpointJobIDFlag, replayFromZipFlag, recordToZipFlag}
 }
+
+func (cmd *commonCmd) cleanup(cliCtx *cli.Context) error {
+	if cmd.replayBackends != nil {
+		return cmd.replayBackends.Close()
+	}
+	return nil
+}
diff --git a/cabe/go/replaybackends/replaybackends.go b/cabe/go/replaybackends/replaybackends.go
index 91d1889..897918f 100644
--- a/cabe/go/replaybackends/replaybackends.go
+++ b/cabe/go/replaybackends/replaybackends.go
@@ -156,10 +156,10 @@
 	ret.SwarmingTaskReader = func(ctx context.Context, pinpointJobID string) ([]*swarmingapi.SwarmingRpcsTaskRequestMetadata, error) {
 		var start, end time.Time
 		end = time.Now()
-		start = time.Now().Add(-time.Hour * 24 * 14) // past two weeks
+		start = time.Now().Add(-time.Hour * 24 * 52) // past 52 days
 		state := ""                                  // any state
 
-		sklog.Infof("getting task metadata from swarming service")
+		sklog.Infof("getting task metadata from swarming service, pinpoint_job_id: %v", pinpointJobID)
 		rmd, err := swarmingClient.ListTasks(ctx, start, end, []string{"pinpoint_job_id:" + pinpointJobID}, state)
 		if err != nil {
 			sklog.Errorf("getting swarming tasks: %v", err)
@@ -215,7 +215,9 @@
 func (r *ReplayBackends) Close() error {
 	sklog.Infof("closing replay backends")
 	if r.zipWriter == nil {
-		return fmt.Errorf("cannot close replay backends without a writer")
+		sklog.Infof("no zipWriter to close")
+
+		return nil
 	}
 	if err := r.zipWriter.Close(); err != nil {
 		return err