Add RBEPerfLoader to perfresults.

RBEPerfLoader loads perf_results.json from multiple swarming tasks
output in CAS. It merges the results from the same benchmark runs in
multiple shards.

RBEPerfLoader uses grpc replayer to record and replay the gRPC traffic
and compresses the recording to save space. Common replay test utils
are moved to replay_test.go.

Bug: b/334901267
Change-Id: I06c595f0cd5567214c32dfdabdfc07397ef31bd8
Reviewed-on: https://skia-review.googlesource.com/c/buildbot/+/840963
Reviewed-by: Kevin Lubick <kjlubick@google.com>
Commit-Queue: Hao Wu <haowoo@google.com>
diff --git a/perf/go/perfresults/BUILD.bazel b/perf/go/perfresults/BUILD.bazel
index 68e3729..2b53a08 100644
--- a/perf/go/perfresults/BUILD.bazel
+++ b/perf/go/perfresults/BUILD.bazel
@@ -6,6 +6,7 @@
     srcs = [
         "buildbucket.go",
         "perf_results_parser.go",
+        "rbecas.go",
         "swarming.go",
     ],
     importpath = "go.skia.org/infra/perf/go/perfresults",
@@ -13,6 +14,9 @@
     deps = [
         "//go/httputils",
         "//go/skerr",
+        "@com_github_bazelbuild_remote_apis//build/bazel/remote/execution/v2:go_default_library",
+        "@com_github_bazelbuild_remote_apis_sdks//go/pkg/client",
+        "@com_github_bazelbuild_remote_apis_sdks//go/pkg/digest",
         "@org_chromium_go_luci//buildbucket/proto",
         "@org_chromium_go_luci//common/retry",
         "@org_chromium_go_luci//grpc/prpc",
@@ -27,15 +31,24 @@
     srcs = [
         "buildbucket_test.go",
         "perf_results_test.go",
+        "rbecas_test.go",
+        "replay_test.go",
         "swarming_test.go",
     ],
     data = glob(["testdata/**"]),
     embed = [":perfresults"],
     env_inherit = ["HOME"],
     deps = [
+        "@com_github_bazelbuild_remote_apis_sdks//go/pkg/client",
+        "@com_github_bazelbuild_remote_apis_sdks//go/pkg/digest",
         "@com_github_stretchr_testify//assert",
         "@com_github_stretchr_testify//require",
         "@com_google_cloud_go//httpreplay",
+        "@com_google_cloud_go//rpcreplay",
         "@org_chromium_go_luci//swarming/proto/api_v2",
+        "@org_golang_google_grpc//:go_default_library",
+        "@org_golang_google_grpc//credentials",
+        "@org_golang_google_grpc//credentials/oauth",
+        "@org_golang_x_oauth2//google",
     ],
 )
diff --git a/perf/go/perfresults/perf_results_parser.go b/perf/go/perfresults/perf_results_parser.go
index 95b04d8..27984a7 100644
--- a/perf/go/perfresults/perf_results_parser.go
+++ b/perf/go/perfresults/perf_results_parser.go
@@ -110,7 +110,7 @@
 		// If Name is not empty, it is a histogram
 		if entry.Name != "" {
 			entry.populateDiagnostics(md)
-			pr.Merge(entry.Histogram)
+			pr.merge(entry.Histogram)
 			continue
 		}
 		switch entry.Type {
@@ -153,7 +153,7 @@
 		// If Name is not empty, it is a histogram
 		if entry.Name != "" {
 			entry.populateDiagnostics(md)
-			pr.Merge(entry.Histogram)
+			pr.merge(entry.Histogram)
 			continue
 		}
 		switch entry.Type {
@@ -176,8 +176,18 @@
 	}
 }
 
-// Merge takes the given histogram and merges sample values.
-func (pr *PerfResults) Merge(other Histogram) {
+// MergeResults merges the given PerfResults histograms.
+//
+// This is used to merge all the results from multiple shards running from the same commit,
+// assuming all the rest metadata are the same.
+func (pr *PerfResults) MergeResults(other *PerfResults) {
+	for _, hist := range other.Histograms {
+		pr.merge(hist)
+	}
+}
+
+// merge takes the given histogram and merges sample values.
+func (pr *PerfResults) merge(other Histogram) {
 	if h, ok := pr.Histograms[other.Name]; ok {
 		other.SampleValues = append(h.SampleValues, other.SampleValues...)
 	}
diff --git a/perf/go/perfresults/rbecas.go b/perf/go/perfresults/rbecas.go
new file mode 100644
index 0000000..84ea8af
--- /dev/null
+++ b/perf/go/perfresults/rbecas.go
@@ -0,0 +1,117 @@
+package perfresults
+
+import (
+	"bytes"
+	"context"
+	"strings"
+
+	"github.com/bazelbuild/remote-apis-sdks/go/pkg/client"
+	"github.com/bazelbuild/remote-apis-sdks/go/pkg/digest"
+	repb "github.com/bazelbuild/remote-apis/build/bazel/remote/execution/v2"
+	swarmingv2 "go.chromium.org/luci/swarming/proto/api_v2"
+
+	"go.skia.org/infra/go/skerr"
+)
+
+const (
+	rbeServiceAddress = "remotebuildexecution.googleapis.com:443"
+	perfJsonFilename  = "perf_results.json"
+)
+
+// RBEPerfLoader wraps rbe.Client to provide convenient functions
+type RBEPerfLoader struct {
+	*client.Client
+}
+
+func NewRBEPerfLoader(ctx context.Context, casInstance string) (*RBEPerfLoader, error) {
+	c, err := client.NewClient(ctx, casInstance, client.DialParams{
+		Service:               rbeServiceAddress,
+		UseApplicationDefault: true,
+	})
+	if err != nil {
+		return nil, skerr.Wrapf(err, "unable to create new RBE client")
+	}
+	return &RBEPerfLoader{Client: c}, nil
+}
+
+// fetchPerfDigests returns the digests from the given CAS output of a swarming task
+func (c RBEPerfLoader) fetchPerfDigests(ctx context.Context, cas *swarmingv2.CASReference) (map[string]digest.Digest, error) {
+	if c.InstanceName != cas.GetCasInstance() {
+		return nil, skerr.Fmt("cas ref is from a different instance (%s vs %s)", c.InstanceName, cas.GetCasInstance())
+	}
+
+	d, err := digest.New(cas.Digest.Hash, cas.Digest.SizeBytes)
+	if err != nil {
+		return nil, skerr.Wrap(err)
+	}
+	var rootDir repb.Directory
+	if _, err := c.ReadProto(ctx, d, &rootDir); err != nil {
+		return nil, skerr.Wrap(err)
+	}
+
+	dirs, err := c.GetDirectoryTree(ctx, d.ToProto())
+	if err != nil {
+		return nil, skerr.Wrapf(err, "unable to get dir tree for CAS (%s)", cas.String())
+	}
+
+	outputs, err := c.FlattenTree(&repb.Tree{
+		Root:     &rootDir,
+		Children: dirs,
+	}, "")
+	if err != nil {
+		return nil, skerr.Wrapf(err, "unable to flatten tree for CAS (%s)", cas.String())
+	}
+
+	perfDigests := make(map[string]digest.Digest)
+	for path, output := range outputs {
+		if !strings.HasSuffix(path, perfJsonFilename) {
+			continue
+		}
+		parts := strings.Split(path, "/")
+		if len(parts) != 2 {
+			return nil, skerr.Fmt("perf file location (%s) is unexpected", path)
+		}
+		perfDigests[parts[0]] = output.Digest
+	}
+	return perfDigests, nil
+}
+
+// loadPerfResult expects the JSON content from the given digest and loads into PerfResults.
+func (c RBEPerfLoader) loadPerfResult(ctx context.Context, digest digest.Digest) (*PerfResults, error) {
+	blob, _, err := c.ReadBlob(ctx, digest)
+	if err != nil {
+		return nil, skerr.Wrap(err)
+	}
+	return NewResults(bytes.NewBuffer(blob))
+}
+
+// LoadPerfResults loads all the perf_results.json from the list of CAS outputs of swarming tasks.
+//
+// The CAS output should point to the root folder of the swarming task.
+func (c RBEPerfLoader) LoadPerfResults(ctx context.Context, cases ...*swarmingv2.CASReference) (map[string]*PerfResults, error) {
+	results := make(map[string]*PerfResults)
+
+	// This can be done in parallel, but the gain seems to be minimum and it increases complexity
+	// by lot. We keep it simple here unless we run into performance issues.
+	for _, cas := range cases {
+		digests, err := c.fetchPerfDigests(ctx, cas)
+		if err != nil {
+			return nil, skerr.Wrap(err)
+		}
+
+		for benchmark, digest := range digests {
+			pr, err := c.loadPerfResult(ctx, digest)
+			if err != nil {
+				return nil, skerr.Wrap(err)
+			}
+
+			if e, ok := results[benchmark]; ok {
+				e.MergeResults(pr)
+			} else {
+				results[benchmark] = pr
+			}
+		}
+	}
+
+	return results, nil
+}
diff --git a/perf/go/perfresults/rbecas_test.go b/perf/go/perfresults/rbecas_test.go
new file mode 100644
index 0000000..d879af8
--- /dev/null
+++ b/perf/go/perfresults/rbecas_test.go
@@ -0,0 +1,107 @@
+package perfresults
+
+import (
+	"context"
+	"testing"
+
+	"github.com/bazelbuild/remote-apis-sdks/go/pkg/digest"
+	"github.com/stretchr/testify/assert"
+)
+
+func Test_RBE_LoadPerfResults_ReturnPerfResults(t *testing.T) {
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	c := newRBEReplay(t, ctx, "chrome-swarming", "RBE_LoadPerfResults_ReturnPerfResults")
+
+	// CAS Output from: https://chrome-swarming.appspot.com/task?id=68f6c580c2e5d711
+	cas := makeCAS("d127f8323a5016001b6d44bdc784a41aacb982909f721878589258d3dfc30616", 752)
+
+	// Load from the same output twice so they can be merged
+	pr, err := c.LoadPerfResults(ctx, cas, cas)
+	assert.NoError(t, err)
+	assert.Contains(t, pr, "speedometer3")
+
+	assert.Len(t, pr["speedometer3"].Histograms, 21)
+	assert.Len(t, pr["speedometer3"].GetSampleValues("Charts-chartjs"), 20, "concating two samples should add together")
+}
+
+func Test_RBE_FetchPerfDigests_ReturnListDigests(t *testing.T) {
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	c := newRBEReplay(t, ctx, "chrome-swarming", "RBE_FetchPerfDigests_ReturnListDigests")
+
+	cas := makeCAS("d127f8323a5016001b6d44bdc784a41aacb982909f721878589258d3dfc30616", 752)
+	digests, err := c.fetchPerfDigests(ctx, cas)
+	assert.NoError(t, err)
+
+	expected := map[string]digest.Digest{
+		"rendering.mobile":            {Hash: "06d37aeeb0d7a2d1040082b2cf17c6caf9d2d8deae8b69e4bcdc58d6f6647be4", Size: 1985714},
+		"speedometer2-predictable":    {Hash: "258720d2a653d825d92aff2c1085abe54f270ebce6072d72057b24534a569c88", Size: 26728},
+		"system_health.common_mobile": {Hash: "aede2a355ae63200cd11e1791226dc8919d517158d0a623323655e6e75327690", Size: 1271968},
+		"speedometer2":                {Hash: "69034a473e9e1a845b2bdb46e0ce660a822ba49f890ca89d5088f97b198291b4", Size: 26553},
+		"speedometer3":                {Hash: "0b20a718825dc5805bb3b4d8b2cff4633a900e7cb3a8050164fcac59ceb2c58a", Size: 30817},
+		"speedometer3-predictable":    {Hash: "cd3b62c0c75a5690b434bcea50bec753a948a172d6ea27a7eb0c27fa8b3c0326", Size: 31124},
+	}
+
+	assert.Subset(t, digests, expected)
+}
+
+func Test_RBE_FetchPerfDigests_Empty(t *testing.T) {
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	c := newRBEReplay(t, ctx, "chrome-swarming", "RBE_FetchPerfDigests_Empty")
+
+	cas := makeCAS("d8a9ce0076c037b00dc8f75261db1c6811e0d54a39ee243370af1b8df05264f6", 435)
+	digests, err := c.fetchPerfDigests(ctx, cas)
+	assert.NoError(t, err)
+	assert.Len(t, digests, 0)
+}
+
+func Test_RBE_FetchPerfDigests_InvalidPath(t *testing.T) {
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	c := newRBEReplay(t, ctx, "chrome-swarming", "RBE_FetchPerfDigests_InvalidPath")
+
+	cas := makeCAS("ec9d563cd54d4915acf6f894207355af52d6f850ae9734e9274c058b99cb15f7", 1362)
+	_, err := c.fetchPerfDigests(ctx, cas)
+	assert.ErrorContains(t, err, "perf file location")
+}
+
+func Test_RBE_LoadPerfResult_ReturnValid(t *testing.T) {
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	c := newRBEReplay(t, ctx, "chrome-swarming", "RBE_LoadPerfResult_ReturnValid")
+
+	cas := makeCAS("d127f8323a5016001b6d44bdc784a41aacb982909f721878589258d3dfc30616", 752)
+	digests, err := c.fetchPerfDigests(ctx, cas)
+	assert.NoError(t, err)
+
+	pr, err := c.loadPerfResult(ctx, digests["speedometer2"])
+	assert.NoError(t, err)
+	assert.Len(t, pr.Histograms, 18)
+
+	pr, err = c.loadPerfResult(ctx, digests["speedometer3"])
+	assert.NoError(t, err)
+	assert.Len(t, pr.Histograms, 21)
+}
+
+func Test_RBE_LoadPerfResult_InvalidDigest(t *testing.T) {
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	c := newRBEReplay(t, ctx, "chrome-swarming", "RBE_LoadPerfResult_InvalidDigest")
+
+	check_error := func(d digest.Digest) {
+		pr, err := c.loadPerfResult(ctx, d)
+		assert.Error(t, err)
+		assert.Nil(t, pr)
+	}
+
+	// it should load a directory but fail at parsing
+	check_error(digest.Digest{Hash: "d127f8323a5016001b6d44bdc784a41aacb982909f721878589258d3dfc30616", Size: 752})
+
+	// invalid json file content
+	check_error(digest.Digest{Hash: "94f74174df883c2b1e30e26bfe765b9e15fa39473db34f882275b67fa89a579a", Size: 6504})
+
+	// invalid digest
+	check_error(digest.Digest{Hash: "94f74174df883c2b1e30e26bfe765b9e15fa39473db34f882275b67fa89a579b", Size: 6504})
+}
diff --git a/perf/go/perfresults/replay_test.go b/perf/go/perfresults/replay_test.go
new file mode 100644
index 0000000..050b9f7
--- /dev/null
+++ b/perf/go/perfresults/replay_test.go
@@ -0,0 +1,105 @@
+package perfresults
+
+import (
+	"compress/gzip"
+	"context"
+	"crypto/tls"
+	"flag"
+	"net/http"
+	"os"
+	"path"
+	"testing"
+
+	"cloud.google.com/go/httpreplay"
+	"cloud.google.com/go/rpcreplay"
+	"github.com/bazelbuild/remote-apis-sdks/go/pkg/client"
+	"github.com/stretchr/testify/require"
+	"golang.org/x/oauth2/google"
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/credentials"
+	"google.golang.org/grpc/credentials/oauth"
+)
+
+var recordPath = flag.String("record_path", "", "The path the replayer writes real backend responses.")
+
+func setupReplay(t *testing.T, replayName string) *http.Client {
+	// if the recordPath is not given, then we will replay from the testdata;
+	// otherwise we will record the traffic and save to the given path;
+	if *recordPath == "" {
+		replayFile := path.Join("testdata", replayName)
+		hr, err := httpreplay.NewReplayer(replayFile)
+		require.NoError(t, err)
+
+		ctx, cancel := context.WithCancel(context.Background())
+		c, err := hr.Client(ctx)
+		require.NoError(t, err)
+		t.Cleanup(func() {
+			hr.Close()
+			cancel()
+		})
+		return c
+	}
+
+	replayFile := path.Join(*recordPath, replayName)
+	hr, err := httpreplay.NewRecorder(replayFile, nil)
+	require.NoError(t, err)
+
+	ctx, cancel := context.WithCancel(context.Background())
+	c, err := hr.Client(ctx)
+	require.NoError(t, err)
+	t.Cleanup(func() {
+		hr.Close()
+		cancel()
+	})
+	return c
+}
+
+func newRBEReplay(t *testing.T, ctx context.Context, casInstance string, replayName string) *RBEPerfLoader {
+	casInstance = "projects/" + casInstance + "/instances/default_instance"
+
+	ts, err := google.DefaultTokenSource(ctx)
+	require.NoError(t, err)
+
+	opts := []grpc.DialOption{
+		grpc.WithPerRPCCredentials(oauth.TokenSource{TokenSource: ts}),
+		grpc.WithTransportCredentials(credentials.NewTLS(&tls.Config{})),
+	}
+
+	if *recordPath == "" {
+		replayFile := path.Join("testdata", replayName)
+		f, err := os.Open(replayFile + ".rpc")
+		require.NoError(t, err)
+		gr, err := gzip.NewReader(f)
+		require.NoError(t, err)
+
+		rr, err := rpcreplay.NewReplayerReader(gr)
+		require.NoError(t, err)
+		opts = append(opts, rr.DialOptions()...)
+		t.Cleanup(func() {
+			rr.Close()
+			gr.Close()
+			f.Close()
+		})
+	} else {
+		replayFile := path.Join(*recordPath, replayName)
+		f, err := os.Create(replayFile + ".rpc")
+		require.NoError(t, err)
+		gw := gzip.NewWriter(f)
+
+		rr, err := rpcreplay.NewRecorderWriter(gw, nil)
+		require.NoError(t, err)
+		opts = append(opts, rr.DialOptions()...)
+
+		t.Cleanup(func() {
+			rr.Close()
+			gw.Close()
+			f.Close()
+		})
+	}
+
+	conn, err := grpc.Dial(rbeServiceAddress, opts...)
+	require.NoError(t, err)
+	c, err := client.NewClientFromConnection(ctx, casInstance, conn, conn)
+	require.NoError(t, err)
+	return &RBEPerfLoader{Client: c}
+}
diff --git a/perf/go/perfresults/swarming_test.go b/perf/go/perfresults/swarming_test.go
index 0d54cdf..6bad8eb 100644
--- a/perf/go/perfresults/swarming_test.go
+++ b/perf/go/perfresults/swarming_test.go
@@ -2,51 +2,13 @@
 
 import (
 	"context"
-	"flag"
-	"net/http"
-	"path"
 	"testing"
 
-	"cloud.google.com/go/httpreplay"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 	swarmingv2 "go.chromium.org/luci/swarming/proto/api_v2"
 )
 
-var recordPath = flag.String("record_path", "", "The path the replayer writes real backend responses.")
-
-func setupReplay(t *testing.T, replayName string) *http.Client {
-	// if the recordPath is not given, then we will replay from the testdata;
-	// otherwise we will record the traffic and save to the given path;
-	if *recordPath == "" {
-		replayFile := path.Join("testdata", replayName)
-		hr, err := httpreplay.NewReplayer(replayFile)
-		require.NoError(t, err)
-
-		ctx, cancel := context.WithCancel(context.Background())
-		c, err := hr.Client(ctx)
-		require.NoError(t, err)
-		t.Cleanup(func() {
-			hr.Close()
-			cancel()
-		})
-		return c
-	}
-
-	replayFile := path.Join(*recordPath, replayName)
-	hr, err := httpreplay.NewRecorder(replayFile, nil)
-	require.NoError(t, err)
-
-	ctx, cancel := context.WithCancel(context.Background())
-	c, err := hr.Client(ctx)
-	require.NoError(t, err)
-	t.Cleanup(func() {
-		hr.Close()
-		cancel()
-	})
-	return c
-}
-
 func makeCAS(hash string, size int64) *swarmingv2.CASReference {
 	return &swarmingv2.CASReference{
 		Digest: &swarmingv2.Digest{
diff --git a/perf/go/perfresults/testdata/RBE_FetchPerfDigests_Empty.rpc b/perf/go/perfresults/testdata/RBE_FetchPerfDigests_Empty.rpc
new file mode 100644
index 0000000..1645f0f
--- /dev/null
+++ b/perf/go/perfresults/testdata/RBE_FetchPerfDigests_Empty.rpc
Binary files differ
diff --git a/perf/go/perfresults/testdata/RBE_FetchPerfDigests_InvalidPath.rpc b/perf/go/perfresults/testdata/RBE_FetchPerfDigests_InvalidPath.rpc
new file mode 100644
index 0000000..fe409e0
--- /dev/null
+++ b/perf/go/perfresults/testdata/RBE_FetchPerfDigests_InvalidPath.rpc
Binary files differ
diff --git a/perf/go/perfresults/testdata/RBE_FetchPerfDigests_ReturnListDigests.rpc b/perf/go/perfresults/testdata/RBE_FetchPerfDigests_ReturnListDigests.rpc
new file mode 100644
index 0000000..fca2548
--- /dev/null
+++ b/perf/go/perfresults/testdata/RBE_FetchPerfDigests_ReturnListDigests.rpc
Binary files differ
diff --git a/perf/go/perfresults/testdata/RBE_LoadPerfResult_InvalidDigest.rpc b/perf/go/perfresults/testdata/RBE_LoadPerfResult_InvalidDigest.rpc
new file mode 100644
index 0000000..d0f9d0c
--- /dev/null
+++ b/perf/go/perfresults/testdata/RBE_LoadPerfResult_InvalidDigest.rpc
Binary files differ
diff --git a/perf/go/perfresults/testdata/RBE_LoadPerfResult_ReturnValid.rpc b/perf/go/perfresults/testdata/RBE_LoadPerfResult_ReturnValid.rpc
new file mode 100644
index 0000000..0940db7
--- /dev/null
+++ b/perf/go/perfresults/testdata/RBE_LoadPerfResult_ReturnValid.rpc
Binary files differ
diff --git a/perf/go/perfresults/testdata/RBE_LoadPerfResults_ReturnPerfResults.rpc b/perf/go/perfresults/testdata/RBE_LoadPerfResults_ReturnPerfResults.rpc
new file mode 100644
index 0000000..f50f180
--- /dev/null
+++ b/perf/go/perfresults/testdata/RBE_LoadPerfResults_ReturnPerfResults.rpc
Binary files differ