blob: 7ae4d8ec856330fc52012c20988cdeb69732c663 [file] [log] [blame]
package ingestion_processors
import (
"context"
"errors"
"io/ioutil"
"regexp"
"time"
"go.opencensus.io/trace"
"go.skia.org/infra/go/skerr"
"go.skia.org/infra/go/sklog"
"go.skia.org/infra/go/vcsinfo"
"go.skia.org/infra/golden/go/ingestion"
"go.skia.org/infra/golden/go/jsonio"
"go.skia.org/infra/golden/go/tracestore"
"go.skia.org/infra/golden/go/tracestore/bt_tracestore"
"go.skia.org/infra/golden/go/types"
)
const (
// BigTableTraceStore identifies a primary-branch ingester backed by BigTable.
BigTableTraceStore = "big_table_tracestore"
btProjectConfig = "BTProjectID"
btInstanceConfig = "BTInstance"
btTableConfig = "BTTable"
)
// PrimaryBranchBigTable creates a Processor that uses a BigTable-backed tracestore.
func PrimaryBranchBigTable(ctx context.Context, src ingestion.Source, configParams map[string]string, vcs vcsinfo.VCS) (ingestion.Processor, error) {
btc := bt_tracestore.BTConfig{
ProjectID: configParams[btProjectConfig],
InstanceID: configParams[btInstanceConfig],
TableID: configParams[btTableConfig],
VCS: vcs,
}
bts, err := bt_tracestore.New(ctx, btc, true)
if err != nil {
return nil, skerr.Fmt("could not instantiate BT tracestore: %s", err)
}
return &btProcessor{
ts: bts,
vcs: btc.VCS,
source: src,
}, nil
}
// btProcessor implements the ingestion.Processor interface for gold using
// the BigTable TraceStore
type btProcessor struct {
ts tracestore.TraceStore
vcs vcsinfo.VCS
source ingestion.Source
}
// HandlesFile returns true if the configured source handles this file.
func (b *btProcessor) HandlesFile(name string) bool {
return b.source.HandlesFile(name)
}
// Process implements the ingestion.Processor interface.
func (b *btProcessor) Process(ctx context.Context, fileName string) error {
ctx, span := trace.StartSpan(ctx, "ingestion_BigTableProcess")
defer span.End()
r, err := b.source.GetReader(ctx, fileName)
if err != nil {
return skerr.Wrap(err)
}
gr, err := processGoldResults(ctx, r)
if err != nil {
return skerr.Wrapf(err, "could not process file %s from source %s", fileName, b.source)
}
if len(gr.Results) == 0 {
sklog.Infof("file %s had no results", fileName)
return nil
}
span.AddAttributes(trace.Int64Attribute("num_results", int64(len(gr.Results))))
// If the target commit is not in the primary repository we look it up
// in the secondary that has the primary as a dependency.
var targetHash string
if gr.CommitID != "" && gr.CommitMetadata != "" {
targetHash, err = b.lookupChromeOSMetadata(ctx, gr.CommitMetadata)
if err != nil {
return skerr.Wrapf(err, "Looking up githash from Chromeos metadata: %q", gr.CommitMetadata)
}
} else {
targetHash, err = getCanonicalCommitHash(ctx, b.vcs, gr.GitHash)
if err != nil {
return skerr.Wrapf(err, "could not identify canonical commit from %q", gr.GitHash)
}
if ok, err := b.isOnMaster(ctx, targetHash); err != nil {
return skerr.Wrapf(err, "could not determine branch for %s", targetHash)
} else if !ok {
return skerr.Fmt("Commit %s is not in primary branch", targetHash)
}
}
// Get the entries that should be added to the tracestore.
entries, err := extractTraceStoreEntries(gr, fileName)
if err != nil {
// Probably invalid, don't retry
return skerr.Wrapf(err, "could not create entries")
}
sklog.Debugf("Ingested %d entries to commit %s from file %s", len(entries), targetHash, fileName)
// Write the result to the tracestore.
err = b.ts.Put(ctx, targetHash, entries, time.Now())
if err != nil {
sklog.Errorf("Could not add entries to tracestore for file %s: %s", fileName, err)
return ingestion.ErrRetryable
}
return nil
}
// isOnMaster returns true if the given commit hash is on the master branch.
func (b *btProcessor) isOnMaster(ctx context.Context, hash string) (bool, error) {
// BT_VCS is configured to only look at master, so if we just look up the index of the hash,
// we will know if it is on the master branch.
// We can ignore the error, because it would be a "commit not found" error.
if i, _ := b.vcs.IndexOf(ctx, hash); i >= 0 {
return true, nil
}
if err := b.vcs.Update(ctx, true /*=pull*/, false /*=all branches*/); err != nil {
return false, skerr.Wrapf(err, "could not update VCS")
}
if i, _ := b.vcs.IndexOf(ctx, hash); i >= 0 {
return true, nil
}
return false, nil
}
// extractTraceStoreEntries creates a slice of tracestore.Entry for the given
// file. It will omit any entries that should be ignored. It returns an
// error if there were no un-ignored entries in the file.
func extractTraceStoreEntries(gr *jsonio.GoldResults, name string) ([]*tracestore.Entry, error) {
ret := make([]*tracestore.Entry, 0, len(gr.Results))
for _, result := range gr.Results {
params, options := paramsAndOptions(gr, result)
if err := shouldIngest(params, options); err != nil {
sklog.Infof("Not ingesting %s : %s", name, err)
continue
}
ret = append(ret, &tracestore.Entry{
Params: params,
Options: options,
Digest: result.Digest,
})
}
// If all results were ignored then we return an error.
if len(ret) == 0 {
return nil, skerr.Fmt("no valid results in file")
}
return ret, nil
}
// paramsAndOptions creates the params and options maps from a given file and entry.
func paramsAndOptions(gr *jsonio.GoldResults, r jsonio.Result) (map[string]string, map[string]string) {
params := make(map[string]string, len(gr.Key)+len(r.Key))
for k, v := range gr.Key {
params[k] = v
}
for k, v := range r.Key {
params[k] = v
}
return params, r.Options
}
// shouldIngest returns a descriptive error if we should ignore an entry
// with these params/options.
func shouldIngest(params, options map[string]string) error {
// Ignore anything that is not a png. In the early days (pre-2015), ext was omitted
// but implied to be "png". Thus if ext is not provided, it will be ingested.
// New entries (created by goldctl) will always have ext set.
if ext, ok := options["ext"]; ok && (ext != "png") {
return errors.New("ignoring non-png entry")
}
// Make sure the test name meets basic requirements.
testName := params[types.PrimaryKeyField]
// Ignore results that don't have a test given and log an error since that
// should not happen. But we want to keep other results in the same input file.
if testName == "" {
return errors.New("missing test name")
}
// Make sure the test name does not exceed the allowed length.
if len(testName) > types.MaximumNameLength {
return skerr.Fmt("received test name which is longer than the allowed %d bytes: %s", types.MaximumNameLength, testName)
}
return nil
}
// Example input is gs://chromeos-image-archive/sentry-release/R92-13944.0.0/manifest.xml
var metadataURLRegex = regexp.MustCompile(`gs://(?P<bucket>[a-z\-]+)/(?P<object>.+)`)
// Instead of parsing XML, we use a simple regex to extract the one field we need.
var revisionRegex = regexp.MustCompile(`name="chromiumos/platform/tast-tests" path="src/platform/tast-tests" revision="(?P<githash>[a-f0-9]+)"`)
// lookupChromeOSMetadata extracts the git hash (of the tast-tests repo) from the provided
// manifest.xml file. This manifest file contains the revisions of all ChromeOS repos at the
// time of the build, but we only are tracking the tast-tests one.
func (b *btProcessor) lookupChromeOSMetadata(ctx context.Context, gcsURL string) (string, error) {
gcsClient := b.source.(*ingestion.GCSSource).Client
matches := metadataURLRegex.FindStringSubmatch(gcsURL)
if matches == nil {
return "", skerr.Fmt("Invalid metadata string %q", gcsURL)
}
// match[1] is the bucket, match[2] is the object.
r, err := gcsClient.Bucket(matches[1]).Object(matches[2]).NewReader(ctx)
if err != nil {
return "", skerr.Wrapf(err, "getting reader for %q", gcsURL)
}
xmlBytes, err := ioutil.ReadAll(r)
if err != nil {
return "", skerr.Wrapf(err, "reading from %q", gcsURL)
}
match := revisionRegex.FindStringSubmatch(string(xmlBytes))
if match == nil {
return "", skerr.Fmt("Could not find tast-tests revision in XML from %q", gcsURL)
}
return match[1], nil
}