// Package replaybackends provides in-memory implementations of backend dependencies
// for testing. These implementations replay backend responses recorded during calls
// to live production services.
//
// To update the files containing replay data for cabe unit tests, see the
// instructions at go/cabe-skia-assets.
package replaybackends
import (
	"archive/zip"
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"io"
	"os"
	"path"
	"strings"
	"sync"
	"time"

	rbeclient "github.com/bazelbuild/remote-apis-sdks/go/pkg/client"
	swarmingapi "go.chromium.org/luci/common/api/swarming/swarming/v1"

	"go.skia.org/infra/cabe/go/backends"
	"go.skia.org/infra/cabe/go/perfresults"
	"go.skia.org/infra/go/sklog"
	"go.skia.org/infra/go/swarming"
)
const (
swarmingTasksFileName = "swarming-tasks.json"
casDirName = "cas"
)
// ReplayBackends implements the backend interfaces required for testing package etl and rpcservice.
type ReplayBackends struct {
	// ParsedPerfResults maps CAS digest hashes to the perf results parsed from
	// the replay archive.
	ParsedPerfResults map[string]perfresults.PerfResults
	// ParsedSwarmingTasks holds the swarming task request metadata parsed from
	// the replay archive.
	ParsedSwarmingTasks []*swarmingapi.SwarmingRpcsTaskRequestMetadata
	// rbeClients maps CAS instance names to RBE clients, used when recording.
	rbeClients map[string]*rbeclient.Client
	// CASResultReader replays (or records) CAS blob reads.
	CASResultReader backends.CASResultReader
	// SwarmingTaskReader replays (or records) swarming task list calls.
	SwarmingTaskReader backends.SwarmingTaskReader
replayZipFile string
// zipBuf is an in-memory buffer for data to be written out to a zip file.
zipBuf *bytes.Buffer
// muZip protects zipWriter
muZip sync.Mutex
zipWriter *zip.Writer
}
// FromZipFile opens a zip archive of pre-recorded backend responses and uses it to
// populate a [*ReplayBackends] instance suitable for testing.
//
// Replay zip files have the following internal directory structure:
//
//	./swarming-tasks.json - json serialized array of [swarmingapi.SwarmingRpcsTaskRequestMetadata]
//	./cas/<digest hash>   - (multiple) the individual swarming task measurement output files
//
// benchmarkName is typically something determined by the thing *executing* the benchmarks, not
// by the benchmark code itself. Thus, when reconstructing the artifacts for a benchmark run we
// need to have that name provided a priori since it can't be determined automatically from task
// output files alone.
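//
// A minimal replay sketch (the archive path, benchmark name, job ID, and digest
// here are hypothetical):
//
//	replay := replaybackends.FromZipFile("testdata/pinpoint-job.zip", "speedometer2")
//	tasks, _ := replay.SwarmingTaskReader(ctx, "pinpoint-job-id")
//	results, _ := replay.CASResultReader(ctx, "cas-instance", "<digest hash>/<size>")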
func FromZipFile(replayZipFile string, benchmarkName string) *ReplayBackends {
archive, err := zip.OpenReader(replayZipFile)
if err != nil {
panic(err)
}
defer archive.Close()
ret := &ReplayBackends{
replayZipFile: replayZipFile,
}
ret.ParsedPerfResults = make(map[string]perfresults.PerfResults)
for _, file := range archive.File {
dirName, fileName := path.Split(file.Name)
if fileName == swarmingTasksFileName {
fileReader, err := file.Open()
if err != nil {
panic(err)
}
defer fileReader.Close()
tasksBytes := make([]byte, file.UncompressedSize64)
if _, err = io.ReadFull(fileReader, tasksBytes); err != nil {
panic(err)
}
			if err = json.Unmarshal(tasksBytes, &ret.ParsedSwarmingTasks); err != nil {
panic(err)
}
}
if dirName == casDirName+"/" {
fileReader, err := file.Open()
if err != nil {
panic(err)
}
defer fileReader.Close()
fileBytes := make([]byte, file.UncompressedSize64)
if _, err = io.ReadFull(fileReader, fileBytes); err != nil {
panic(err)
}
res := perfresults.PerfResults{}
// if a task had no CAS output (e.g. task failed entirely) then the json can be zero length.
if len(fileBytes) > 0 {
// But if there are bytes, they need to actually parse or it's fatal for the test setup.
				if err := json.Unmarshal(fileBytes, &res); err != nil {
					sklog.Errorf("failed to parse %q (%d bytes): %v", file.Name, len(fileBytes), err)
					panic(err)
				}
}
ret.ParsedPerfResults[fileName] = res
}
}
ret.SwarmingTaskReader = func(ctx context.Context, pinpointJobID string) ([]*swarmingapi.SwarmingRpcsTaskRequestMetadata, error) {
return ret.ParsedSwarmingTasks, nil
}
	// CASResultReader returns a map of benchmark name to parsed PerfResults.
	ret.CASResultReader = func(ctx context.Context, instance, digest string) (map[string]perfresults.PerfResults, error) {
df := strings.Split(digest, "/")[0]
res, ok := ret.ParsedPerfResults[df]
if !ok {
return nil, fmt.Errorf("couldn't find a CAS blob for %q (%q)", digest, df)
}
return map[string]perfresults.PerfResults{benchmarkName: res}, nil
}
return ret
}
// ToZipFile is the inverse of FromZipFile. Given a filename to record to, and RBE and Swarming API
// clients to intercept, it returns a [*ReplayBackends] instance with SwarmingTaskReader and
// CASResultReader set to functions that record responses from live services.
// Be sure to call [ReplayBackends.Close] when you are done making calls to RBE and Swarming
// in order to complete the recording process and save the replay data to replayZipFile.
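//
// A minimal recording sketch (client construction elided; the output path and
// job ID here are illustrative):
//
//	recorder := replaybackends.ToZipFile("/tmp/replay.zip", rbeClients, swarmingClient)
//	tasks, err := recorder.SwarmingTaskReader(ctx, "pinpoint-job-id")
//	// ... fetch CAS outputs for the returned tasks via recorder.CASResultReader ...
//	if err := recorder.Close(); err != nil {
//		sklog.Fatal(err)
//	}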
func ToZipFile(replayZipFile string,
rbeClients map[string]*rbeclient.Client,
swarmingClient swarming.ApiClient) *ReplayBackends {
	ret := &ReplayBackends{
		replayZipFile: replayZipFile,
		rbeClients:    rbeClients,
	}
ret.SwarmingTaskReader = func(ctx context.Context, pinpointJobID string) ([]*swarmingapi.SwarmingRpcsTaskRequestMetadata, error) {
var start, end time.Time
end = time.Now()
start = time.Now().Add(-time.Hour * 24 * 14) // past two weeks
state := "" // any state
sklog.Infof("getting task metadata from swarming service")
rmd, err := swarmingClient.ListTasks(ctx, start, end, []string{"pinpoint_job_id:" + pinpointJobID}, state)
if err != nil {
sklog.Errorf("getting swarming tasks: %v", err)
return nil, err
}
sklog.Infof("read %d tasks from swarming api", len(rmd))
raw, err := json.Marshal(rmd)
if err != nil {
return nil, err
}
if err := ret.recordData(raw, swarmingTasksFileName); err != nil {
return nil, err
}
return rmd, err
}
ret.CASResultReader = func(ctx context.Context, instance, digest string) (map[string]perfresults.PerfResults, error) {
rbeClient, ok := rbeClients[instance]
if !ok {
return nil, fmt.Errorf("no RBE client for instance %s", instance)
}
res, err := backends.FetchBenchmarkJSONRaw(ctx, rbeClient, digest)
if err != nil {
return nil, err
}
if len(res) > 1 {
return nil, fmt.Errorf("recording doesn't work for outputs with multiple benchmarks")
}
d := strings.Split(digest, "/")[0]
for _, raw := range res {
if err := ret.recordData(raw, casDirName, d); err != nil {
return nil, err
}
}
		// Use names that don't shadow the enclosing ret (*ReplayBackends) or
		// res (the raw blobs fetched above).
		parsed := make(map[string]perfresults.PerfResults)
		for benchmark, blob := range res {
			pr := perfresults.PerfResults{}
			if err := json.Unmarshal(blob, &pr); err != nil {
				sklog.Errorf("unmarshaling benchmark json: %v", err)
				return nil, err
			}
			parsed[benchmark] = pr
		}
		return parsed, nil
}
ret.zipBuf = new(bytes.Buffer)
ret.zipWriter = zip.NewWriter(ret.zipBuf)
return ret
}
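// Close finishes writing the zip archive of recorded responses and saves it to
// the replayZipFile path given to [ToZipFile]. It is only needed, and will
// only succeed, for instances created by [ToZipFile].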
func (r *ReplayBackends) Close() error {
sklog.Infof("closing replay backends")
if r.zipWriter == nil {
return fmt.Errorf("cannot close replay backends without a writer")
}
if err := r.zipWriter.Close(); err != nil {
return err
}
sklog.Infof("writing replay data to %s", r.replayZipFile)
return os.WriteFile(r.replayZipFile, r.zipBuf.Bytes(), 0755)
}
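// recordData writes raw to the in-progress replay zip archive at the entry
// name formed by joining pathElems with forward slashes. It is safe for
// concurrent use.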
func (r *ReplayBackends) recordData(raw []byte, pathElems ...string) error {
	r.muZip.Lock()
	defer r.muZip.Unlock()
	// Zip entry names always use forward slashes regardless of host OS, so use
	// path.Join rather than filepath.Join here.
	name := path.Join(pathElems...)
	sklog.Infof("about to write %d bytes to %s in replay zip", len(raw), name)
	f, err := r.zipWriter.Create(name)
	if err != nil {
		return err
	}
	if _, err := f.Write(raw); err != nil {
		return err
	}
	sklog.Infof("successfully wrote %d bytes to %s in replay zip", len(raw), name)
	return nil
}