blob: 85dbd4de73d61d89621bcc647b3def32d55aab1f [file] [log] [blame]
package goldingestion
import (
"context"
"encoding/json"
"fmt"
"net/http"
"regexp"
"strings"
"time"
"github.com/golang/glog"
"go.skia.org/infra/go/auth"
"go.skia.org/infra/go/ds"
"go.skia.org/infra/go/eventbus"
"go.skia.org/infra/go/gerrit"
"go.skia.org/infra/go/httputils"
"go.skia.org/infra/go/human"
"go.skia.org/infra/go/ingestion"
"go.skia.org/infra/go/paramtools"
"go.skia.org/infra/go/sharedconfig"
"go.skia.org/infra/go/skerr"
"go.skia.org/infra/go/sklog"
"go.skia.org/infra/go/util"
"go.skia.org/infra/go/vcsinfo"
"go.skia.org/infra/golden/go/bbstate"
"go.skia.org/infra/golden/go/config"
"go.skia.org/infra/golden/go/tryjobstore"
"go.skia.org/infra/golden/go/types"
gstorage "google.golang.org/api/storage/v1"
)
// Define configuration options to be used in the config file under
// ExtraParams.
const (
CONFIG_GERRIT_CODE_REVIEW_URL = "GerritCodeReviewURL"
CONFIG_SERVICE_ACCOUNT_FILE = "ServiceAccountFile"
CONFIG_BUILD_BUCKET_URL = "BuildBucketURL"
CONFIG_BUILD_BUCKET_NAME = "BuildBucketName"
CONFIG_BUILD_BUCKET_POLL_INTERVAL = "BuildBucketPollInterval"
CONFIG_BUILD_BUCKET_TIME_WINDOW = "BuildBucketTimeWindow"
CONFIG_BUILDER_REGEX = "BuilderRegEx"
CONFIG_JOB_CFG_FILE = "JobConfigFile"
)
// Register the ingestion Processor with the ingestion framework.
func init() {
ingestion.Register(config.CONSTRUCTOR_GOLD_TRYJOB, newGoldTryjobProcessor)
}
// goldTryjobProcessor implements the ingestion.Processor interface to ingest
// tryjob results.
type goldTryjobProcessor struct {
buildIssueSync bbstate.BuildIssueSync
tryjobStore tryjobstore.TryjobStore
vcs vcsinfo.VCS
cfgFile string
syncMonitor *util.CondMonitor
}
// newGoldTryjobProcessor implements the ingestion.Constructor function.
func newGoldTryjobProcessor(vcs vcsinfo.VCS, config *sharedconfig.IngesterConfig, ignoreClient *http.Client, eventBus eventbus.EventBus) (ingestion.Processor, error) {
gerritURL := config.ExtraParams[CONFIG_GERRIT_CODE_REVIEW_URL]
if strings.TrimSpace(gerritURL) == "" {
return nil, fmt.Errorf("Missing URL for the Gerrit code review systems. Got value: '%s'", gerritURL)
}
// Get the config options.
svcAccountFile := config.ExtraParams[CONFIG_SERVICE_ACCOUNT_FILE]
sklog.Infof("Got service account file '%s'", svcAccountFile)
pollInterval, err := parseDuration(config.ExtraParams[CONFIG_BUILD_BUCKET_POLL_INTERVAL], bbstate.DefaultPollInterval)
if err != nil {
return nil, err
}
timeWindow, err := parseDuration(config.ExtraParams[CONFIG_BUILD_BUCKET_TIME_WINDOW], bbstate.DefaultTimeWindow)
if err != nil {
return nil, err
}
buildBucketURL := config.ExtraParams[CONFIG_BUILD_BUCKET_URL]
buildBucketName := config.ExtraParams[CONFIG_BUILD_BUCKET_NAME]
if (buildBucketURL == "") || (buildBucketName == "") {
return nil, fmt.Errorf("BuildBucketName and BuildBucketURL must not be empty.")
}
builderRegExp := config.ExtraParams[CONFIG_BUILDER_REGEX]
if builderRegExp == "" {
builderRegExp = bbstate.DefaultTestBuilderRegex
}
// Get the config file in the repo that should be parsed to determine whether a
// bot uploads results. Currently only applies to the Skia repo.
cfgFile := config.ExtraParams[CONFIG_JOB_CFG_FILE]
// Create the cloud tryjob store.
tryjobStore, err := tryjobstore.NewCloudTryjobStore(ds.DS, eventBus)
if err != nil {
return nil, fmt.Errorf("Error creating tryjob store: %s", err)
}
sklog.Infof("Cloud Tryjobstore Store created")
// Instantiate the Gerrit API client.
ts, err := auth.NewJWTServiceAccountTokenSource("", svcAccountFile, gstorage.CloudPlatformScope, "https://www.googleapis.com/auth/userinfo.email")
if err != nil {
return nil, fmt.Errorf("Failed to authenticate service account: %s", err)
}
client := httputils.DefaultClientConfig().WithTokenSource(ts).With2xxOnly().Client()
sklog.Infof("HTTP client instantiated")
gerritReview, err := gerrit.NewGerrit(gerritURL, "", client)
if err != nil {
return nil, err
}
sklog.Infof("Gerrit client instantiated")
bbConf := &bbstate.Config{
BuildBucketURL: buildBucketURL,
BuildBucketName: buildBucketName,
Client: client,
TryjobStore: tryjobStore,
GerritClient: gerritReview,
PollInterval: pollInterval,
TimeWindow: timeWindow,
BuilderRegexp: builderRegExp,
}
bbGerritClient, err := bbstate.NewBuildBucketState(bbConf)
if err != nil {
return nil, err
}
sklog.Infof("BuildBucketState created")
ret := &goldTryjobProcessor{
buildIssueSync: bbGerritClient,
tryjobStore: tryjobStore,
vcs: vcs,
cfgFile: cfgFile,
// The argument to NewCondMonitor is 1 because we always want exactly one go-routine per unique
// issue ID to enter the critical section. See the syncIssueAndTryjob function.
syncMonitor: util.NewCondMonitor(1),
}
eventBus.SubscribeAsync(tryjobstore.EV_TRYJOB_UPDATED, ret.tryjobUpdatedHandler)
return ret, nil
}
// See ingestion.Processor interface.
func (g *goldTryjobProcessor) Process(ctx context.Context, resultsFile ingestion.ResultFileLocation) error {
dmResults, err := processDMResults(resultsFile)
if err != nil {
sklog.Errorf("Error processing result: %s", err)
return ingestion.IgnoreResultsFileErr
}
// Make sure we have an issue, patchset and a buildbucket id.
if (dmResults.Issue <= 0) || (dmResults.Patchset <= 0) || (dmResults.BuildBucketID <= 0) {
sklog.Errorf("Invalid data. issue, patchset and buildbucket id must be > 0. Got (%d, %d, %d).", dmResults.Issue, dmResults.Patchset, dmResults.BuildBucketID)
return ingestion.IgnoreResultsFileErr
}
entries, err := extractTraceDBEntries(dmResults)
if err != nil {
sklog.Errorf("Error getting tracedb entries: %s", err)
return ingestion.IgnoreResultsFileErr
}
// Save the results to the trybot store.
issueID := dmResults.Issue
tryjob, err := g.tryjobStore.GetTryjob(issueID, dmResults.BuildBucketID)
if err != nil {
sklog.Errorf("Error retrieving tryjob: %s", err)
return ingestion.IgnoreResultsFileErr
}
tryjob, err = g.syncIssueAndTryjob(issueID, tryjob, dmResults, resultsFile.Name())
if err != nil {
return err
}
// Add the Githash of the underlying result.
if tryjob.MasterCommit == "" {
tryjob.MasterCommit = dmResults.GitHash
} else if tryjob.MasterCommit != dmResults.GitHash {
sklog.Errorf("Master commit in tryjob and ingested results do not match.")
return ingestion.IgnoreResultsFileErr
}
// Convert to a trybotstore.TryjobResult slice by aggregating parameters for each test/digest pair.
resultsMap := make(map[string]*tryjobstore.TryjobResult, len(entries))
for _, entry := range entries {
testName := types.TestName(entry.Params[types.PRIMARY_KEY_FIELD])
key := string(testName) + string(entry.Value)
if found, ok := resultsMap[key]; ok {
found.Params.AddParams(entry.Params)
} else {
resultsMap[key] = &tryjobstore.TryjobResult{
BuildBucketID: tryjob.BuildBucketID,
TestName: testName,
Digest: types.Digest(entry.Value),
Params: paramtools.NewParamSet(entry.Params),
}
}
}
tjResults := make([]*tryjobstore.TryjobResult, 0, len(resultsMap))
for _, result := range resultsMap {
tjResults = append(tjResults, result)
}
// Update the database with the results.
if err := g.tryjobStore.UpdateTryjobResult(tjResults); err != nil {
return skerr.Fmt("Error updating tryjob results: %s", err)
}
tryjob.Status = tryjobstore.TRYJOB_INGESTED
if err := g.tryjobStore.UpdateTryjob(0, tryjob, nil); err != nil {
return err
}
sklog.Infof("Ingested: %s", resultsFile.Name())
return nil
}
func (g *goldTryjobProcessor) syncIssueAndTryjob(issueID int64, tryjob *tryjobstore.Tryjob, dmResults *DMResults, resultFileName string) (*tryjobstore.Tryjob, error) {
// Only let one thread in at time for each issueID. In most cases they will follow the fast
// path of finding the issue and having an non-nil tryjob.
defer g.syncMonitor.Enter(issueID).Release()
// Fetch the issue and check if the trybot is contained.
issue, err := g.tryjobStore.GetIssue(issueID, false)
if err != nil {
return nil, skerr.Fmt("Unable to retrieve issue %d to process file %s. Got error: %s", issueID, resultFileName, err)
}
// If we haven't loaded the tryjob then see if we can fetch it from
// Gerrit and Buildbucket. This should be the exception since tryjobs should
// be picket up by BuildBucketState as they are added.
if (tryjob == nil) || (issue == nil) || !issue.HasPatchset(tryjob.PatchsetID) {
var err error
if issue, tryjob, err = g.buildIssueSync.SyncIssueTryjob(issueID, dmResults.BuildBucketID); err != nil {
sklog.Errorf("Error fetching the issue and tryjob information: %s", err)
return nil, ingestion.IgnoreResultsFileErr
}
// If the issue is nil, that means it could not be found (404) and we don't want to ingest this
// file again (and avoid repeated errors).
if issue == nil {
sklog.Errorf("Unable to find issue %d. Received 404 from Gerrit.", issueID)
return nil, ingestion.IgnoreResultsFileErr
}
}
return tryjob, nil
}
// setTryjobToState is a utility function that updates the status of a tryjob
// to the new status if the new status is a logical successor to the current status.
// If tryjob is nil, issueID and tryjobID will be used to fetch the tryjob record.
func (g *goldTryjobProcessor) setTryjobToStatus(tryjob *tryjobstore.Tryjob, minStatus tryjobstore.TryjobStatus, newStatus tryjobstore.TryjobStatus) error {
return g.tryjobStore.UpdateTryjob(tryjob.BuildBucketID, nil, func(curr interface{}) interface{} {
tryjob := curr.(*tryjobstore.Tryjob)
// Only update if it is at the minimum status level and the status increases.
if (tryjob.Status < minStatus) || (tryjob.Status >= newStatus) {
return nil
}
tryjob.Status = newStatus
return tryjob
})
}
// See ingestion.Processor interface.
func (g *goldTryjobProcessor) BatchFinished() error { return nil }
// parseDuration parses the given duration. If strVal is empty the default value
// will be returned. If the given duration is invalid an error will be returned.
func parseDuration(strVal string, defaultVal time.Duration) (time.Duration, error) {
if strVal == "" {
return defaultVal, nil
}
return human.ParseDuration(strVal)
}
// tryjobUpdateHandler is the event handler for when a tryjob is updated in the
// underlying tryjob store.
func (g *goldTryjobProcessor) tryjobUpdatedHandler(evData interface{}) {
// Make a shallow copy of the tryjob.
tryjob := *(evData.(*tryjobstore.Tryjob))
// Check if this is a no-upload bot. If that's the case mark the bot as ingested.
if g.noUpload(tryjob.Builder, tryjob.MasterCommit) {
// Mark as ingested if it has completed.
if err := g.setTryjobToStatus(&tryjob, tryjobstore.TRYJOB_COMPLETE, tryjobstore.TRYJOB_INGESTED); err != nil {
sklog.Errorf("Unable to set tryjob (%d, %d) to status 'ingested': %s", tryjob.IssueID, tryjob.BuildBucketID, err)
}
sklog.Infof("Job %d for issue %d marked as ingested.", tryjob.BuildBucketID, tryjob.IssueID)
}
}
// TODO(stephana): Make the noUpload code use the same code as gen_tasks.go in the
// skia repo. This is essentially a copy of the code, but uses the same source of
// information (the cfg.json file from skia).
// noUpload returns true if this builder does not upload results and we should
// therefore not wait for results to appear.
func (g *goldTryjobProcessor) noUpload(builder, commit string) bool {
if g.cfgFile == "" {
return false
}
ctx := context.Background()
cfgContent, err := g.vcs.GetFile(ctx, g.cfgFile, commit)
if err != nil {
sklog.Errorf("Error retrieving %s: %s", g.cfgFile, err)
}
// Parse the config file used to generate the tasks.
config := struct {
NoUpload []string `json:"no_upload"`
}{}
if err := json.Unmarshal([]byte(cfgContent), &config); err != nil {
sklog.Errorf("Unable to parse %s. Got error: %s", g.cfgFile, err)
}
// See if we match the builders that should not be uploaded.
for _, s := range config.NoUpload {
m, err := regexp.MatchString(s, builder)
if err != nil {
glog.Errorf("Error matching regex: %s", err)
continue
}
if m {
return true
}
}
return false
}