blob: 813565d29c26bb0e408dbcdc567d84b1a66502ca [file] [log] [blame]
package read_values
import (
"context"
"fmt"
apipb "go.chromium.org/luci/swarming/proto/api_v2"
"go.skia.org/infra/cabe/go/backends"
"go.skia.org/infra/go/skerr"
"go.skia.org/infra/go/sklog"
"go.skia.org/infra/perf/go/perfresults"
rbeclient "github.com/bazelbuild/remote-apis-sdks/go/pkg/client"
)
// IsSupportedAggregation checks if the aggregation method
// is supported by read_values. If not, return false.
// Empty string is supported and means that no data will be aggregated.
func IsSupportedAggregation(aggregationMethod string) bool {
if aggregationMethod == "" {
return true
}
if _, ok := perfresults.AggregationMapping[aggregationMethod]; ok {
return true
}
return false
}
// CASProvider provides API to fetch perf results from a given CAS digest.
type CASProvider interface {
Fetch(context.Context, *apipb.CASReference) (map[string]perfresults.PerfResults, error)
}
// rbeProvider implements CASProvider to fetch perf results from RBE backend.
type rbeProvider struct {
*rbeclient.Client
}
func (r *rbeProvider) Fetch(ctx context.Context, digest *apipb.CASReference) (map[string]perfresults.PerfResults, error) {
path := fmt.Sprintf("%s/%d", digest.Digest.Hash, digest.Digest.SizeBytes)
return backends.FetchBenchmarkJSON(ctx, r.Client, path)
}
type perfCASClient struct {
provider CASProvider
}
// DialRBECAS dials an RBE CAS client given a swarming instance.
// Pinpoint uses 3 swarming instances to store CAS results
// https://skia.googlesource.com/buildbot/+/5291743c698e/cabe/go/backends/rbecas.go#19
func DialRBECAS(ctx context.Context, instance string) (*perfCASClient, error) {
clients, err := backends.DialRBECAS(ctx)
if err != nil {
sklog.Errorf("Failed to dial RBE CAS client due to error: %v", err)
return nil, err
}
if client, ok := clients[instance]; ok {
return &perfCASClient{
provider: &rbeProvider{
Client: client,
},
}, nil
}
return nil, fmt.Errorf("swarming instance %s is not within the set of allowed instances", instance)
}
// ReadValuesByChart reads Pinpoint results for the benchmark and chart from a list of CAS digests.
// ReadValuesByChart will also apply data aggregations if there are any.
//
// Example Usage:
//
// ctx := context.Background()
// client, err := DialRBECAS(ctx)
// values := client.ReadValuesByChart(ctx, benchmark, chart, digests, nil)
//
// TODO(sunxiaodi@): Migrate CABE backends into pinpoint/go/backends/
func (c *perfCASClient) ReadValuesByChart(ctx context.Context, benchmark string, chart string, digests []*apipb.CASReference, agg string) ([]float64, error) {
aggMethod, ok := perfresults.AggregationMapping[agg]
if !ok && agg != "" {
return nil, skerr.Fmt("unsupported aggregation method (%s).", agg)
}
var values []float64
for _, digest := range digests {
res, err := c.provider.Fetch(ctx, digest)
if err != nil {
return nil, skerr.Wrapf(err, "could not fetch results from CAS (%v)", digest)
}
// res should be map[string]*PerfResults, right now we work around with the pointer.
pr := res[benchmark]
samples := (&pr).GetSampleValues(chart)
if aggMethod != nil && samples != nil {
values = append(values, aggMethod(perfresults.Histogram{SampleValues: samples}))
} else {
values = append(values, samples...)
}
}
return values, nil
}
func (c *perfCASClient) ReadValuesForAllCharts(ctx context.Context, benchmark string, digests []*apipb.CASReference, agg string) (map[string][]float64, error) {
aggMethod, ok := perfresults.AggregationMapping[agg]
if !ok && agg != "" {
return nil, skerr.Fmt("unsupported aggregation method (%s).", agg)
}
valuesByChart := map[string][]float64{}
// a digest is a CAS output from one swarming task
for _, digest := range digests {
res, err := c.provider.Fetch(ctx, digest)
if err != nil {
return nil, skerr.Wrapf(err, "could not fetch results from CAS (%v)", digest)
}
pr := res[benchmark]
for k, sv := range pr.Histograms {
var values []float64
if aggMethod != nil && sv.SampleValues != nil {
values = []float64{aggMethod(perfresults.Histogram{SampleValues: sv.SampleValues})}
} else {
values = sv.SampleValues
}
valuesByChart[k.ChartName] = append(valuesByChart[k.ChartName], values...)
}
}
return valuesByChart, nil
}