blob: 84ea8af4a7cfe6d12a61c4c338d715108cc9e33c [file] [log] [blame] [edit]
package perfresults
import (
"bytes"
"context"
"strings"
"github.com/bazelbuild/remote-apis-sdks/go/pkg/client"
"github.com/bazelbuild/remote-apis-sdks/go/pkg/digest"
repb "github.com/bazelbuild/remote-apis/build/bazel/remote/execution/v2"
swarmingv2 "go.chromium.org/luci/swarming/proto/api_v2"
"go.skia.org/infra/go/skerr"
)
const (
rbeServiceAddress = "remotebuildexecution.googleapis.com:443"
perfJsonFilename = "perf_results.json"
)
// RBEPerfLoader wraps rbe.Client to provide convenient functions
type RBEPerfLoader struct {
*client.Client
}
func NewRBEPerfLoader(ctx context.Context, casInstance string) (*RBEPerfLoader, error) {
c, err := client.NewClient(ctx, casInstance, client.DialParams{
Service: rbeServiceAddress,
UseApplicationDefault: true,
})
if err != nil {
return nil, skerr.Wrapf(err, "unable to create new RBE client")
}
return &RBEPerfLoader{Client: c}, nil
}
// fetchPerfDigests returns the digests from the given CAS output of a swarming task
func (c RBEPerfLoader) fetchPerfDigests(ctx context.Context, cas *swarmingv2.CASReference) (map[string]digest.Digest, error) {
if c.InstanceName != cas.GetCasInstance() {
return nil, skerr.Fmt("cas ref is from a different instance (%s vs %s)", c.InstanceName, cas.GetCasInstance())
}
d, err := digest.New(cas.Digest.Hash, cas.Digest.SizeBytes)
if err != nil {
return nil, skerr.Wrap(err)
}
var rootDir repb.Directory
if _, err := c.ReadProto(ctx, d, &rootDir); err != nil {
return nil, skerr.Wrap(err)
}
dirs, err := c.GetDirectoryTree(ctx, d.ToProto())
if err != nil {
return nil, skerr.Wrapf(err, "unable to get dir tree for CAS (%s)", cas.String())
}
outputs, err := c.FlattenTree(&repb.Tree{
Root: &rootDir,
Children: dirs,
}, "")
if err != nil {
return nil, skerr.Wrapf(err, "unable to flatten tree for CAS (%s)", cas.String())
}
perfDigests := make(map[string]digest.Digest)
for path, output := range outputs {
if !strings.HasSuffix(path, perfJsonFilename) {
continue
}
parts := strings.Split(path, "/")
if len(parts) != 2 {
return nil, skerr.Fmt("perf file location (%s) is unexpected", path)
}
perfDigests[parts[0]] = output.Digest
}
return perfDigests, nil
}
// loadPerfResult expects the JSON content from the given digest and loads into PerfResults.
func (c RBEPerfLoader) loadPerfResult(ctx context.Context, digest digest.Digest) (*PerfResults, error) {
blob, _, err := c.ReadBlob(ctx, digest)
if err != nil {
return nil, skerr.Wrap(err)
}
return NewResults(bytes.NewBuffer(blob))
}
// LoadPerfResults loads all the perf_results.json from the list of CAS outputs of swarming tasks.
//
// The CAS output should point to the root folder of the swarming task.
func (c RBEPerfLoader) LoadPerfResults(ctx context.Context, cases ...*swarmingv2.CASReference) (map[string]*PerfResults, error) {
results := make(map[string]*PerfResults)
// This can be done in parallel, but the gain seems to be minimum and it increases complexity
// by lot. We keep it simple here unless we run into performance issues.
for _, cas := range cases {
digests, err := c.fetchPerfDigests(ctx, cas)
if err != nil {
return nil, skerr.Wrap(err)
}
for benchmark, digest := range digests {
pr, err := c.loadPerfResult(ctx, digest)
if err != nil {
return nil, skerr.Wrap(err)
}
if e, ok := results[benchmark]; ok {
e.MergeResults(pr)
} else {
results[benchmark] = pr
}
}
}
return results, nil
}