blob: 27887eb04b8562cec1c7863502a146d25eda1ba8 [file] [log] [blame]
package goldingestion
import (
"context"
"fmt"
"net/http"
"go.skia.org/infra/go/eventbus"
"go.skia.org/infra/go/ingestion"
"go.skia.org/infra/go/sharedconfig"
"go.skia.org/infra/go/skerr"
"go.skia.org/infra/go/sklog"
tracedb "go.skia.org/infra/go/trace/db"
"go.skia.org/infra/go/vcsinfo"
"go.skia.org/infra/golden/go/config"
"go.skia.org/infra/golden/go/types"
)
const (
// Configuration option that identifies the address of the traceDB service.
CONFIG_TRACESERVICE = "TraceService"
// Configuration option for the secondary repository.
CONFIG_SECONDARY_REPO = "SecondaryRepoURL"
// Configuration option to define the regular expression to extract the
// commit from the secondary repo. The provided regular expression must
// contain exactly one group which maps to the commit in the DEPS file.
CONFIG_SECONDARY_REG_EX = "SecondaryRegEx"
)
// Register the processor with the ingestion framework.
func init() {
ingestion.Register(config.CONSTRUCTOR_GOLD, newGoldProcessor)
}
// goldProcessor implements the ingestion.Processor interface for gold.
type goldProcessor struct {
traceDB tracedb.DB
vcs vcsinfo.VCS
}
// implements the ingestion.Constructor signature.
func newGoldProcessor(vcs vcsinfo.VCS, config *sharedconfig.IngesterConfig, client *http.Client, eventBus eventbus.EventBus) (ingestion.Processor, error) {
traceDB, err := tracedb.NewTraceServiceDBFromAddress(config.ExtraParams[CONFIG_TRACESERVICE], types.GoldenTraceBuilder)
if err != nil {
return nil, err
}
ret := &goldProcessor{
traceDB: traceDB,
vcs: vcs,
}
return ret, nil
}
// See ingestion.Processor interface.
func (g *goldProcessor) Process(ctx context.Context, resultsFile ingestion.ResultFileLocation) error {
dmResults, err := processDMResults(resultsFile)
if err != nil {
return skerr.Fmt("could not process results file: %s", err)
}
if len(dmResults.Results) == 0 {
sklog.Infof("ignoring file %s because it has no results", resultsFile.Name())
return ingestion.IgnoreResultsFileErr
}
var commit *vcsinfo.LongCommit = nil
// If the target commit is not in the primary repository we look it up
// in the secondary that has the primary as a dependency.
targetHash, err := g.getCanonicalCommitHash(ctx, dmResults.GitHash)
if err != nil {
if err == ingestion.IgnoreResultsFileErr {
return ingestion.IgnoreResultsFileErr
}
return skerr.Fmt("could not identify canonical commit from %q: %s", dmResults.GitHash, err)
}
commit, err = g.vcs.Details(ctx, targetHash, true)
if err != nil {
return skerr.Fmt("could not get details for git commit %q: %s", targetHash, err)
}
if !commit.Branches["master"] {
sklog.Warningf("Commit %s is not in master branch. Got branches: %v", commit.Hash, commit.Branches)
return ingestion.IgnoreResultsFileErr
}
// Add the column to the trace db.
cid, err := g.getCommitID(commit)
if err != nil {
return skerr.Fmt("could not get trace db id: %s", err)
}
// Get the entries that should be added to the tracedb.
entries, err := extractTraceDBEntries(dmResults)
if err != nil {
return skerr.Fmt("could not create entries for results: %s", err)
}
// Write the result to the tracedb.
err = g.traceDB.Add(cid, entries)
if err != nil {
return skerr.Fmt("could not add to tracedb: %s", err)
}
return nil
}
// See ingestion.Processor interface.
func (g *goldProcessor) BatchFinished() error { return nil }
// getCommitID extracts the commitID from the given commit.
func (g *goldProcessor) getCommitID(commit *vcsinfo.LongCommit) (*tracedb.CommitID, error) {
return &tracedb.CommitID{
Timestamp: commit.Timestamp.Unix(),
ID: commit.Hash,
Source: "master",
}, nil
}
// getCanonicalCommitHash returns the commit hash in the primary repository. If the given
// target hash is not in the primary repository it will try and find it in the secondary
// repository which has the primary as a dependency.
func (g *goldProcessor) getCanonicalCommitHash(ctx context.Context, targetHash string) (string, error) {
// If it is not in the primary repo.
if !isCommit(ctx, g.vcs, targetHash) {
// Extract the commit.
foundCommit, err := g.vcs.ResolveCommit(ctx, targetHash)
if err != nil && err != vcsinfo.NoSecondaryRepo {
return "", fmt.Errorf("Unable to resolve commit %s in primary or secondary repo. Got err: %s", targetHash, err)
}
if foundCommit == "" {
if err == vcsinfo.NoSecondaryRepo {
sklog.Warningf("Unable to find commit %s in primary or secondary repo.", targetHash)
} else {
sklog.Warningf("Unable to find commit %s in primary repo and no secondary configured.", targetHash)
}
return "", ingestion.IgnoreResultsFileErr
}
// Check if the found commit is actually in the primary repository. This could indicate misconfiguration
// of the secondary repo.
if !isCommit(ctx, g.vcs, foundCommit) {
return "", fmt.Errorf("Found invalid commit %s in secondary repo at commit %s. Not contained in primary repo.", foundCommit, targetHash)
}
sklog.Infof("Commit translation: %s -> %s", targetHash, foundCommit)
targetHash = foundCommit
}
return targetHash, nil
}
// isCommit returns true if the given commit is in vcs.
func isCommit(ctx context.Context, vcs vcsinfo.VCS, commitHash string) bool {
ret, err := vcs.Details(ctx, commitHash, false)
return (err == nil) && (ret != nil)
}