blob: 3cb9d0bc172352b6e449d409d3176bf754a034d2 [file] [log] [blame]
package ingestion_processors
import (
"context"
"net/http"
"strings"
"go.skia.org/infra/go/buildbucket"
"go.skia.org/infra/go/firestore"
"go.skia.org/infra/go/gerrit"
"go.skia.org/infra/go/ingestion"
"go.skia.org/infra/go/metrics2"
"go.skia.org/infra/go/sharedconfig"
"go.skia.org/infra/go/skerr"
"go.skia.org/infra/go/sklog"
"go.skia.org/infra/go/vcsinfo"
"go.skia.org/infra/golden/go/clstore"
"go.skia.org/infra/golden/go/clstore/fs_clstore"
"go.skia.org/infra/golden/go/code_review"
"go.skia.org/infra/golden/go/code_review/gerrit_crs"
"go.skia.org/infra/golden/go/continuous_integration"
"go.skia.org/infra/golden/go/continuous_integration/buildbucket_cis"
"go.skia.org/infra/golden/go/jsonio"
"go.skia.org/infra/golden/go/shared"
"go.skia.org/infra/golden/go/tjstore"
"go.skia.org/infra/golden/go/tjstore/fs_tjstore"
)
const (
firestoreTryJobIngester = "gold-tryjob-fs"
firestoreProjectIDParam = "FirestoreProjectID"
firestoreNamespaceParam = "FirestoreNamespace"
codeReviewSystemParam = "CodeReviewSystem"
gerritURLParam = "GerritURL"
continuousIntegrationSystemParam = "ContinuousIntegrationSystem"
gerritCRS = "gerrit"
buildbucketCIS = "buildbucket"
)
// Register the ingestion Processor with the ingestion framework.
func init() {
ingestion.Register(firestoreTryJobIngester, newModularTryjobProcessor)
}
// goldTryjobProcessor implements the ingestion.Processor interface to ingest tryjob results.
type goldTryjobProcessor struct {
reviewClient code_review.Client
integrationClient continuous_integration.Client
changeListStore clstore.Store
tryJobStore tjstore.Store
crsName string
cisName string
}
// newModularTryjobProcessor returns an ingestion.Processor which is modular and can support
// different CodeReviewSystems (e.g. "Gerrit", "GitHub") and different ContinuousIntegrationSystems
// (e.g. "BuildBucket", "CirrusCI"). This particular implementation stores the data in Firestore.
func newModularTryjobProcessor(ctx context.Context, _ vcsinfo.VCS, config *sharedconfig.IngesterConfig, client *http.Client) (ingestion.Processor, error) {
crsName := config.ExtraParams[codeReviewSystemParam]
if strings.TrimSpace(crsName) == "" {
return nil, skerr.Fmt("missing code review system (e.g. 'gerrit')")
}
crs, err := codeReviewSystemFactory(crsName, config, client)
if err != nil {
return nil, skerr.Wrapf(err, "could not create client for CRS %q", crsName)
}
cisName := config.ExtraParams[continuousIntegrationSystemParam]
if strings.TrimSpace(cisName) == "" {
return nil, skerr.Fmt("missing continuous integration system (e.g. 'buildbucket')")
}
cis, err := continuousIntegrationSystemFactory(cisName, config, client)
if err != nil {
return nil, skerr.Wrapf(err, "could not create client for CIS %q", cisName)
}
fsProjectID := config.ExtraParams[firestoreProjectIDParam]
if strings.TrimSpace(fsProjectID) == "" {
return nil, skerr.Fmt("missing firestore project id")
}
fsNamespace := config.ExtraParams[firestoreNamespaceParam]
if strings.TrimSpace(fsNamespace) == "" {
return nil, skerr.Fmt("missing firestore namespace")
}
fsClient, err := firestore.NewClient(ctx, fsProjectID, "gold", fsNamespace, nil)
if err != nil {
return nil, skerr.Wrapf(err, "could not init firestore in project %s, namespace %s", fsProjectID, fsNamespace)
}
return &goldTryjobProcessor{
reviewClient: crs,
integrationClient: cis,
changeListStore: fs_clstore.New(fsClient, crsName),
tryJobStore: fs_tjstore.New(fsClient, cisName),
crsName: crsName,
cisName: cisName,
}, nil
}
func codeReviewSystemFactory(crsName string, config *sharedconfig.IngesterConfig, client *http.Client) (code_review.Client, error) {
if crsName == gerritCRS {
gerritURL := config.ExtraParams[gerritURLParam]
if strings.TrimSpace(gerritURL) == "" {
return nil, skerr.Fmt("missing URL for the Gerrit code review system")
}
gerritClient, err := gerrit.NewGerrit(gerritURL, "", client)
if err != nil {
return nil, skerr.Wrapf(err, "creating gerrit client for %s", gerritURL)
}
return gerrit_crs.New(gerritClient), nil
}
return nil, skerr.Fmt("CodeReviewSystem %q not recognized", crsName)
}
func continuousIntegrationSystemFactory(cisName string, _ *sharedconfig.IngesterConfig, client *http.Client) (continuous_integration.Client, error) {
if cisName == buildbucketCIS {
bbClient := buildbucket.NewClient(client)
return buildbucket_cis.New(bbClient), nil
}
return nil, skerr.Fmt("ContinuousIntegrationSystem %q not recognized", cisName)
}
// Process implements the Processor interface.
func (g *goldTryjobProcessor) Process(ctx context.Context, rf ingestion.ResultFileLocation) error {
defer metrics2.FuncTimer().Stop()
gr, err := processGoldResults(rf)
if err != nil {
sklog.Errorf("Error processing result: %s", err)
return ingestion.IgnoreResultsFileErr
}
clID := ""
psOrder := 0
crs := gr.CodeReviewSystem
if crs == "" || crs == g.crsName {
// Default to Gerrit
crs = gerritCRS
clID = gr.ChangeListID
psOrder = gr.PatchSetOrder
} else {
sklog.Warningf("Result %s said it was for crs %q, but this ingester is configured for %s", rf.Name(), crs, g.crsName)
// We only support one CRS and one CIS at the moment, but if needed, we can have
// multiple configured and pivot to the one we need.
return ingestion.IgnoreResultsFileErr
}
tjID := ""
cis := gr.ContinuousIntegrationSystem
if cis == "" || cis == g.cisName {
// Default to BuildBucket
cis = buildbucketCIS
tjID = gr.TryJobID
} else {
sklog.Warningf("Result %s said it was for cis %q, but this ingester is configured for %s", rf.Name(), cis, g.cisName)
// We only support one CRS and one CIS at the moment, but if needed, we can have
// multiple configured and pivot to the one we need.
return ingestion.IgnoreResultsFileErr
}
// Fetch CL from clstore if we have seen it before, from CRS if we have not.
_, err = g.changeListStore.GetChangeList(ctx, clID)
if err == clstore.ErrNotFound {
cl, err := g.reviewClient.GetChangeList(ctx, clID)
if err == code_review.ErrNotFound {
sklog.Warningf("Unknown %s CL with id %q", crs, clID)
// Try again later - maybe the input was created before the CL?
return ingestion.IgnoreResultsFileErr
} else if err != nil {
return skerr.Wrapf(err, "fetching CL from %s with id %q", crs, clID)
}
err = g.changeListStore.PutChangeList(ctx, cl)
if err != nil {
return skerr.Wrapf(err, "storing CL with id %q to clstore", clID)
}
} else if err != nil {
return skerr.Wrapf(err, "fetching CL from clstore with id %q", clID)
}
// Fetch PS from clstore if we have seen it before, from CRS if we have not.
ps, err := g.changeListStore.GetPatchSetByOrder(ctx, clID, psOrder)
if err == clstore.ErrNotFound {
xps, err := g.reviewClient.GetPatchSets(ctx, clID)
if err != nil {
return skerr.Wrapf(err, "could not get patchsets for %s cl %s", crs, clID)
}
// It should be ok to put any PatchSets we've seen before - they should be immutable.
found := false
for _, p := range xps {
if p.Order == psOrder {
if err := g.changeListStore.PutPatchSet(ctx, p); err != nil {
return skerr.Wrapf(err, "could not store PS %d of CL %q to clstore", psOrder, clID)
}
ps = p
found = true
break
}
}
if !found {
sklog.Warningf("Unknown %s PS with order %d for CL %q", crs, psOrder, clID)
// Try again later - maybe the input was created before the CL uploaded its PS?
return ingestion.IgnoreResultsFileErr
}
} else if err != nil {
return skerr.Wrapf(err, "fetching PS from clstore with order %d for CL %q", psOrder, clID)
}
combinedID := tjstore.CombinedPSID{CL: clID, PS: ps.SystemID, CRS: crs}
_, err = g.tryJobStore.GetTryJob(ctx, tjID)
if err == tjstore.ErrNotFound {
tj, err := g.integrationClient.GetTryJob(ctx, tjID)
if err == tjstore.ErrNotFound {
sklog.Warningf("Unknown %s Tryjob with id %q", cis, tjID)
// Try again later - maybe there's some lag with the Integration System?
return ingestion.IgnoreResultsFileErr
} else if err != nil {
return skerr.Wrapf(err, "fetching tryjob from %s with id %q", cis, tjID)
}
err = g.tryJobStore.PutTryJob(ctx, combinedID, tj)
if err != nil {
return skerr.Wrapf(err, "storing tryjob %q to tryjobstore", tjID)
}
} else if err != nil {
return skerr.Wrapf(err, "fetching TryJob with id %s", tjID)
}
defer shared.NewMetricsTimer("put_tryjobstore_entries").Stop()
// Store the results from the file.
tjr := toTryJobResults(gr)
err = g.tryJobStore.PutResults(ctx, combinedID, tjID, tjr)
if err != nil {
return skerr.Wrapf(err, "putting %d results for CL %s, PS %d, TJ %s, file %s", len(tjr), clID, psOrder, tjID, rf.Name())
}
return nil
}
// toTryJobResults converts the JSON file to a slize of TryJobResult.
func toTryJobResults(j *jsonio.GoldResults) []tjstore.TryJobResult {
var tjr []tjstore.TryJobResult
for _, r := range j.Results {
tjr = append(tjr, tjstore.TryJobResult{
GroupParams: j.Key,
ResultParams: r.Key,
Options: r.Options,
Digest: r.Digest,
})
}
return tjr
}