blob: 459915eda4197f563d5785b5a4f8dbb7d87dca43 [file] [log] [blame]
package ingestion_processors
import (
"context"
"fmt"
"net/http"
"sort"
"strings"
"go.skia.org/infra/go/eventbus"
"go.skia.org/infra/go/ingestion"
"go.skia.org/infra/go/sharedconfig"
"go.skia.org/infra/go/skerr"
"go.skia.org/infra/go/sklog"
"go.skia.org/infra/go/tiling"
tracedb "go.skia.org/infra/go/trace/db"
"go.skia.org/infra/go/vcsinfo"
"go.skia.org/infra/golden/go/config"
"go.skia.org/infra/golden/go/jsonio"
"go.skia.org/infra/golden/go/types"
)
// tracedbServiceConfig is the key in IngesterConfig.ExtraParams whose value is
// the network address of the traceDB service.
const tracedbServiceConfig = "TraceService"
// init registers newDeprecatedTraceDBProcessor with the ingestion framework
// under the config.CONSTRUCTOR_GOLD key.
// NOTE(review): presumably ingesters whose config names CONSTRUCTOR_GOLD are
// built through this constructor — confirm against ingestion.Register.
func init() {
	ingestion.Register(config.CONSTRUCTOR_GOLD, newDeprecatedTraceDBProcessor)
}
// traceDBProcessor implements the ingestion.Processor interface for Gold.
// For each results file it resolves the reported git hash to a commit on the
// master branch and writes the file's digests into a traceDB column for that
// commit.
type traceDBProcessor struct {
	// traceDB is the store that Process adds entries to.
	traceDB tracedb.DB
	// vcs resolves the git hash in a results file to commit details.
	vcs vcsinfo.VCS
}
// newDeprecatedTraceDBProcessor creates a traceDBProcessor that writes results
// to the traceDB service whose address is stored under the "TraceService" key
// of cfg.ExtraParams. It implements the ingestion.Constructor signature; the
// HTTP client and event bus arguments are not used.
//
// It returns an error if cfg is nil, if the traceDB address is missing, or if
// the traceDB client cannot be created.
func newDeprecatedTraceDBProcessor(vcs vcsinfo.VCS, cfg *sharedconfig.IngesterConfig, _ *http.Client, _ eventbus.EventBus) (ingestion.Processor, error) {
	// The parameter is named cfg (not config) so it does not shadow the
	// imported config package.
	if cfg == nil {
		return nil, skerr.Fmt("ingester config must not be nil")
	}
	addr := cfg.ExtraParams[tracedbServiceConfig]
	if addr == "" {
		// Fail early with a clear message instead of handing an empty address
		// to the traceDB client.
		return nil, skerr.Fmt("missing %q in ExtraParams of ingester config", tracedbServiceConfig)
	}
	traceDB, err := tracedb.NewTraceServiceDBFromAddress(addr, types.GoldenTraceBuilder)
	if err != nil {
		return nil, skerr.Fmt("could not create traceDB client for %q: %s", addr, err)
	}
	return &traceDBProcessor{
		traceDB: traceDB,
		vcs:     vcs,
	}, nil
}
// Process ingests a single DM results file into the trace DB.
// See the ingestion.Processor interface.
//
// It returns ingestion.IgnoreResultsFileErr when:
//   - the file contains no results,
//   - getCanonicalCommitHash reports the file should be ignored, or
//   - the resolved commit is not on the master branch.
//
// All other failures are returned as errors with context.
func (g *traceDBProcessor) Process(ctx context.Context, resultsFile ingestion.ResultFileLocation) error {
	// Named dm (not dmResults) so the local does not shadow the dmResults type.
	dm, err := processDMResults(resultsFile)
	if err != nil {
		return skerr.Fmt("could not process results file: %s", err)
	}
	if len(dm.Results) == 0 {
		sklog.Infof("ignoring file %s because it has no results", resultsFile.Name())
		return ingestion.IgnoreResultsFileErr
	}

	// Map the reported hash to the commit the results belong to. Per the
	// original author: if the target commit is not in the primary repository
	// it is looked up in the secondary repository that has the primary as a
	// dependency.
	targetHash, err := getCanonicalCommitHash(ctx, g.vcs, dm.GitHash)
	if err != nil {
		if err == ingestion.IgnoreResultsFileErr {
			return ingestion.IgnoreResultsFileErr
		}
		return skerr.Fmt("could not identify canonical commit from %q: %s", dm.GitHash, err)
	}
	commit, err := g.vcs.Details(ctx, targetHash, true)
	if err != nil {
		return skerr.Fmt("could not get details for git commit %q: %s", targetHash, err)
	}
	// Only results for commits on master are stored; others are skipped.
	if !commit.Branches["master"] {
		sklog.Warningf("Commit %s is not in master branch. Got branches: %v", commit.Hash, commit.Branches)
		return ingestion.IgnoreResultsFileErr
	}

	// Identify the trace DB column for this commit.
	cid, err := g.getCommitID(commit)
	if err != nil {
		return skerr.Fmt("could not get trace db id: %s", err)
	}

	// Build the entries that should be added to the trace DB.
	entries, err := extractTraceDBEntries(dm)
	if err != nil {
		return skerr.Fmt("could not create entries for results: %s", err)
	}

	// Write the results to the trace DB.
	if err := g.traceDB.Add(cid, entries); err != nil {
		return skerr.Fmt("could not add to tracedb: %s", err)
	}
	return nil
}
// BatchFinished is part of the ingestion.Processor interface. It does nothing:
// Process writes each file's entries to the trace DB directly, and this
// processor keeps no per-batch state.
func (g *traceDBProcessor) BatchFinished() error {
	return nil
}
// getCommitID builds the trace DB column identifier for commit from the
// commit's hash and its Unix timestamp. The source is always "master".
// The returned error is currently always nil.
func (g *traceDBProcessor) getCommitID(commit *vcsinfo.LongCommit) (*tracedb.CommitID, error) {
	id := tracedb.CommitID{
		ID:        commit.Hash,
		Source:    "master",
		Timestamp: commit.Timestamp.Unix(),
	}
	return &id, nil
}
// extractTraceDBEntries converts the results in dm into trace DB entries keyed
// by trace ID. Results rejected by ignoreResult are skipped. Each entry
// stores the result's full params and uses the result's digest as its value.
//
// If several results produce the same trace ID, the one that appears last in
// dm.Results wins.
//
// It returns an error if no result survives filtering.
func extractTraceDBEntries(dm *dmResults) (map[tiling.TraceId]*tracedb.Entry, error) {
	ret := make(map[tiling.TraceId]*tracedb.Entry, len(dm.Results))
	for _, result := range dm.Results {
		traceID, params := idAndParams(dm, result)
		if ignoreResult(dm, params) {
			continue
		}
		ret[traceID] = &tracedb.Entry{
			Params: params,
			Value:  []byte(result.Digest),
		}
	}
	// A file whose results were all ignored is treated as an error.
	if len(ret) == 0 {
		// Lowercase, unpunctuated error string: callers wrap it mid-sentence.
		return nil, fmt.Errorf("no valid results in file %s", dm.name)
	}
	return ret, nil
}
// idAndParams constructs the trace ID and the trace params for result r in dm.
//
// The trace ID is built as follows:
//   - merge the file-level keys (dm.Key) with the result-level keys (r.Key),
//     with result keys overriding file keys of the same name;
//   - take the values of the merged keys in alphabetical key order;
//   - join those values with ":".
//
// Options (r.Options) are deliberately omitted from the trace ID.
//
// The returned params map contains the merged keys plus all options. Options
// are applied last, so an option that shares a name with a key replaces that
// key's value in params, though not in the trace ID.
func idAndParams(dm *dmResults, r *jsonio.Result) (tiling.TraceId, map[string]string) {
	combinedLen := len(dm.Key) + len(r.Key)
	traceIDParts := make(map[string]string, combinedLen)
	params := make(map[string]string, combinedLen+len(r.Options))
	for k, v := range dm.Key {
		traceIDParts[k] = v
		params[k] = v
	}
	for k, v := range r.Key {
		traceIDParts[k] = v
		params[k] = v
	}
	for k, v := range r.Options {
		params[k] = v
	}

	// Sizes are known up front; this runs once per result, so avoid regrowing.
	keys := make([]string, 0, len(traceIDParts))
	for k := range traceIDParts {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	values := make([]string, 0, len(keys))
	for _, k := range keys {
		values = append(values, traceIDParts[k])
	}
	return tiling.TraceId(strings.Join(values, ":")), params
}
// ignoreResult reports whether a result with the given params must be skipped.
// A result is skipped when any of the following holds:
//   - it declares an "ext" other than "png";
//   - it has an empty test name (logged as an error);
//   - its test name is longer than types.MAXIMUM_NAME_LENGTH bytes (logged as
//     an error).
//
// Results without an "ext" key are kept. Results written before 2015 omitted
// "ext" and were implicitly PNGs, while goldctl always sets it.
func ignoreResult(dm *dmResults, params map[string]string) bool {
	if ext, present := params["ext"]; present && ext != "png" {
		return true
	}

	// A missing test name should never happen, but it only disqualifies this
	// result, not the rest of the file.
	name := params[types.PRIMARY_KEY_FIELD]
	switch {
	case name == "":
		sklog.Errorf("Missing test name in %s", dm.name)
		return true
	case len(name) > types.MAXIMUM_NAME_LENGTH:
		sklog.Errorf("Received test name which is longer than the allowed %d bytes: %s", types.MAXIMUM_NAME_LENGTH, name)
		return true
	default:
		return false
	}
}