// See README.md
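//
// Example invocation (illustrative; see README.md for authoritative usage):
//
//	go run . --prefix=gs://skia-perf/nano-json-v1/2021/05/23/02/ --out=ratios.csv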
package main

import (
	"context"
	"encoding/csv"
	"flag"
	"fmt"
	"io"
	"net/url"
	"os"
	"sort"
	"strings"
	"sync"
	"time"

"cloud.google.com/go/storage"
"github.com/aclements/go-moremath/stats"
"go.skia.org/infra/go/common"
"go.skia.org/infra/go/query"
"go.skia.org/infra/go/skerr"
"go.skia.org/infra/go/sklog"
"go.skia.org/infra/go/util"
"go.skia.org/infra/perf/go/ingest/format"
"go.skia.org/infra/perf/go/ingest/parser"
"golang.org/x/sync/errgroup"
"google.golang.org/api/iterator"
"google.golang.org/api/option"
)

const (
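	// workerPoolSize is the number of goroutines used to fetch and parse files concurrently.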
workerPoolSize = 64
)

// flags
var (
prefix = flag.String("prefix", "", "GCS location to search for files. E.g. gs://skia-perf/nano-json-v1/2021/05/23/02/. If not supplied then all the files from yesterday are queried.")
out = flag.String("out", "", "Output filename. If not supplied then CSV file is written to stdout.")
filter = flag.String("filter", "source_type=skp&sub_result=min_ms", "A query to filter the traces.")
	top = flag.Int("top", 100, "The number of top (highest ratio) rows to report. Set to -1 to report all of them.")
)

// sampleInfo is the information we calculate for each test run.
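// The ratio of median to min is a rough noisiness signal: a value near 1.0
// means the samples cluster tightly around the minimum; larger values mean
// more spread.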
type sampleInfo struct {
traceid string
median float64
min float64
ratio float64 // ratio = median/min.
}

// sampleInfoSlice is a utility type for sorting slices of sampleInfo by
// descending ratio.
type sampleInfoSlice []sampleInfo
func (p sampleInfoSlice) Len() int { return len(p) }
func (p sampleInfoSlice) Less(i, j int) bool { return p[i].ratio > p[j].ratio }
func (p sampleInfoSlice) Swap(i, j int) { p[i], p[j] = p[j], p[i] }

func main() {
ctx, bucket, objectPrefix, traceFilter, w := initialize()
defer util.Close(w)
filenames, err := filenamesFromBucketAndObjectPrefix(ctx, bucket, objectPrefix)
if err != nil {
sklog.Fatal(err)
}
samples, err := samplesFromFilenames(ctx, bucket, traceFilter, filenames)
if err != nil {
sklog.Fatal(err)
}
sort.Sort(sampleInfoSlice(samples))
if err := writeCSV(samples, *top, w); err != nil {
sklog.Fatal(err)
}
}

// initialize parses all the flags and constructs the objects we need from them.
func initialize() (context.Context, *storage.BucketHandle, string, *query.Query, io.WriteCloser) {
common.Init()
ctx := context.Background()
// Determine bucket and object prefix.
if *prefix == "" {
*prefix = "gs://skia-perf/nano-json-v1/" + time.Now().Add(-24*time.Hour).Format("2006/01/02/")
}
sklog.Infof("Reading JSON files from: %q", *prefix)
u, err := url.Parse(*prefix)
if err != nil {
		sklog.Fatalf("Failed to parse the prefix %q: %s", *prefix, err)
}
bucketName := u.Host
	objectPrefix := strings.TrimPrefix(u.Path, "/")
sklog.Infof("Reading JSON files from bucket: %q path: %q", bucketName, objectPrefix)
// Construct traceFilter.
values, err := url.ParseQuery(*filter)
if err != nil {
		sklog.Fatalf("Failed to parse filter %q: %s", *filter, err)
}
traceFilter, err := query.New(values)
if err != nil {
		sklog.Fatalf("Failed to build traceFilter from filter %q: %s", *filter, err)
}
// Create GCS client.
gcsClient, err := storage.NewClient(ctx, option.WithScopes(storage.ScopeReadOnly))
if err != nil {
		sklog.Fatalf("Failed to create GCS client: %s", err)
}
bucket := gcsClient.Bucket(bucketName)
// Determine where to write the results.
var w io.WriteCloser = os.Stdout
if *out != "" {
w, err = os.Create(*out)
if err != nil {
			sklog.Fatalf("Failed to create %q: %s", *out, err)
}
}
return ctx, bucket, objectPrefix, traceFilter, w
}

func filenamesFromBucketAndObjectPrefix(ctx context.Context, bucket *storage.BucketHandle, objectPrefix string) ([]string, error) {
	// Populate filenames with the names of all files in GCS that match the object prefix.
filenames := []string{}
q := &storage.Query{
Prefix: objectPrefix,
}
if err := q.SetAttrSelection([]string{"Name"}); err != nil { // Only return the Name.
return nil, skerr.Wrapf(err, "Failed to reduce attributes.")
}
it := bucket.Objects(ctx, q)
for {
attrs, err := it.Next()
if err == iterator.Done {
break
}
		if err != nil {
			return nil, skerr.Wrapf(err, "Failed iterating names of files in bucket")
		}
filenames = append(filenames, attrs.Name)
}
return filenames, nil
}

func samplesFromFilenames(ctx context.Context, bucket *storage.BucketHandle, traceFilter *query.Query, filenames []string) ([]sampleInfo, error) {
// Protected by mutex.
samples := []sampleInfo{}
	// Protects samples.
var mutex sync.Mutex
// gcsFilenameChannel is used to distribute work to the workers.
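	// It is buffered to len(filenames) so the feeding loop below never blocks.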
gcsFilenameChannel := make(chan string, len(filenames))
g, ctx := errgroup.WithContext(ctx)
for i := 0; i < workerPoolSize; i++ {
g.Go(func() error {
for filename := range gcsFilenameChannel {
info, err := traceInfoFromFilename(ctx, bucket, traceFilter, filename)
if err != nil {
return skerr.Wrap(err)
}
mutex.Lock()
samples = append(samples, info...)
mutex.Unlock()
}
return nil
})
}
// Feed the workers.
for _, filename := range filenames {
gcsFilenameChannel <- filename
}
close(gcsFilenameChannel)
	if err := g.Wait(); err != nil {
		return nil, skerr.Wrap(err)
	}
return samples, nil
}

// traceInfoFromFilename returns a slice of sampleInfo extracted from a single
// JSON file.
func traceInfoFromFilename(ctx context.Context, bucket *storage.BucketHandle, traceFilter *query.Query, filename string) ([]sampleInfo, error) {
sklog.Info(filename)
r, err := bucket.Object(filename).NewReader(ctx)
if err != nil {
return nil, skerr.Wrapf(err, "Failed to open GCS file: %q", filename)
}
defer util.Close(r)
benchData, err := format.ParseLegacyFormat(r)
if err != nil {
return nil, skerr.Wrapf(err, "Failed to parse GCS file: %q", filename)
}
ret := []sampleInfo{}
for traceid, samples := range parser.GetSamplesFromLegacyFormat(benchData) {
// Filter for the types of traces we are interested in.
if !traceFilter.Matches(traceid) {
continue
}
		if len(samples.Values) == 0 {
			continue // Guard against empty sample sets; Values[0] below would panic.
		}
		sort.Float64s(samples.Values) // Sort so we can find the min.
		// The values are now sorted, so stats.Sample can skip re-sorting.
		values := stats.Sample{Xs: samples.Values, Sorted: true}
		median := values.Quantile(0.5)
		min := samples.Values[0]
		ratio := median / min
ret = append(ret, sampleInfo{
traceid: traceid,
median: median,
min: min,
ratio: ratio,
})
}
return ret, nil
}

// writeCSV writes samples to w as CSV, truncating to 'top' rows if top > 0.
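//
// The first row is the header:
//
//	traceid,min,median,ratio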
func writeCSV(samples []sampleInfo, top int, w io.Writer) error {
cw := csv.NewWriter(w)
if err := cw.Write([]string{"traceid", "min", "median", "ratio"}); err != nil {
return skerr.Wrapf(err, "Failed to write header.")
}
count := len(samples)
if top > 0 && top < count {
samples = samples[:top]
}
for _, info := range samples {
if err := cw.Write([]string{
info.traceid,
fmt.Sprintf("%f", info.min),
fmt.Sprintf("%f", info.median),
fmt.Sprintf("%f", info.ratio),
}); err != nil {
return skerr.Wrapf(err, "Failed to write row.")
}
}
cw.Flush()
return nil
}