| package verifiers |
| |
| import ( |
| "context" |
| "fmt" |
| "net/http" |
| "net/url" |
| "regexp" |
| "strconv" |
| "strings" |
| "time" |
| |
| buildbucketpb "go.chromium.org/luci/buildbucket/proto" |
| |
| "go.skia.org/infra/go/buildbucket" |
| "go.skia.org/infra/go/gerrit" |
| "go.skia.org/infra/go/git" |
| "go.skia.org/infra/go/skerr" |
| "go.skia.org/infra/go/sklog" |
| "go.skia.org/infra/skcq/go/codereview" |
| "go.skia.org/infra/skcq/go/config" |
| "go.skia.org/infra/skcq/go/footers" |
| "go.skia.org/infra/skcq/go/types" |
| "go.skia.org/infra/task_scheduler/go/specs" |
| ) |
| |
| const ( |
| // Time to wait before re-running old jobs for fresh results. |
| TryJobStaleTimeoutSecs = 24 * 60 * 60 |
| |
| BuildBucketDefaultSkiaProject = "skia" |
| BuildBucketInternalSkiaProject = "skia-internal" |
| |
| BuildBucketDefaultSkiaBucket = "skia.primary" |
| BuildBucketInternalSkiaBucket = "skia.internal" |
| BuildBucketStagingSkiaBucket = "skia.testing" |
| |
| CancelBuildsMsg = "SkCQ is cleaning up try jobs from older patchsets" |
| |
| // The maximum number of different try jobs that will be retried before the |
| // verifier returns failure. |
| MaxFailedTryJobsToRetry = 2 |
| |
| // When SkCQ retries try jobs it uses the following tag name. |
| RetryTagName = "skcq_retry" |
| ) |
| |
| // timeNowFunc allows tests to mock out time.Now() for testing. |
| var timeNowFunc = time.Now |
| |
| // NewTryJobsVerifier returns an instance of TryJobsVerifier. |
| func NewTryJobsVerifier(httpClient *http.Client, cr codereview.CodeReview, tasksCfg *specs.TasksCfg, footersMap map[string]string, visibilityType config.VisibilityType) (types.Verifier, error) { |
| // Find gerritURL (eg: skia-review.googlesource.com). |
| issueURL := cr.Url(0) |
| u, err := url.Parse(issueURL) |
| if err != nil { |
| return nil, skerr.Wrapf(err, "Could not url.Parse %s", issueURL) |
| } |
| |
| return &TryJobsVerifier{ |
| bb2: buildbucket.NewClient(httpClient), |
| cr: cr, |
| gerritURL: u.Host, |
| tasksCfg: tasksCfg, |
| footersMap: footersMap, |
| visibilityType: visibilityType, |
| }, nil |
| } |
| |
| // TryJobsVerifier implements the types.Verifier interface. |
| type TryJobsVerifier struct { |
| bb2 buildbucket.BuildBucketInterface |
| cr codereview.CodeReview |
| tasksCfg *specs.TasksCfg |
| gerritURL string |
| footersMap map[string]string |
| visibilityType config.VisibilityType |
| } |
| |
| // Name implements the types.Verifier interface. |
| func (tv *TryJobsVerifier) Name() string { |
| return "TryJobsVerifier" |
| } |
| |
| // Verify implements the types.Verifier interface. |
| func (tv *TryJobsVerifier) Verify(ctx context.Context, ci *gerrit.ChangeInfo, startTime int64) (state types.VerifierState, reason string, err error) { |
| // If CQ tryjobs list is empty then return success. No bots to run. |
| if tv.tasksCfg == nil || tv.tasksCfg.CommitQueue == nil || len(tv.tasksCfg.CommitQueue) == 0 { |
| return types.VerifierSuccessState, "This repo+branch has no CQ try jobs", nil |
| } |
| |
| // If "No-Try: true" has been specified then immediately return success. |
| noTry := git.GetBoolFooterVal(tv.footersMap, footers.NoTryFooter, ci.Issue) |
| if noTry { |
| return types.VerifierSuccessState, fmt.Sprintf("Try jobs check is skipped because \"%s: %t\" has been specified", footers.NoTryFooter, noTry), nil |
| } |
| |
| // Figure out which BB bucket should be used for this change. |
| var bbProject string |
| var bbBucket string |
| switch tv.visibilityType { |
| case config.InternalVisibility: |
| bbProject = BuildBucketInternalSkiaProject |
| bbBucket = BuildBucketInternalSkiaBucket |
| case config.StagingVisibility: |
| bbProject = BuildBucketDefaultSkiaProject |
| bbBucket = BuildBucketStagingSkiaBucket |
| default: |
| bbProject = BuildBucketDefaultSkiaProject |
| bbBucket = BuildBucketDefaultSkiaBucket |
| } |
| |
| // Keeps track of how many failed try jobs should be retried. |
| retryQuota := MaxFailedTryJobsToRetry |
| alreadyRetriedTryJobIDs := map[int64]bool{} |
| |
| // Search for all try jobs that have been triggered on all equivalent |
| // patchsets. We do this because if PS5 and PS4 are NO_CHANGE patchsets |
| // and PS3 is a CODE_CHANGE patchset, then we want to consider the try jobs |
| // on PS5 + PS4 + PS3. |
| latestPatchSetID := tv.cr.GetLatestPatchSetID(ci) |
| equivalentPatchSetIDS := tv.cr.GetEquivalentPatchSetIDs(ci, latestPatchSetID) |
| tryJobsOnChange := []*buildbucketpb.Build{} |
| for _, p := range equivalentPatchSetIDS { |
| tryJobs, err := tv.bb2.GetTrybotsForCL(ctx, ci.Issue, p, "https://"+tv.gerritURL, nil) |
| if err != nil { |
| return "", "", skerr.Wrapf(err, "Could not get tryjobs for %d", ci.Issue) |
| } |
| tryJobsOnChange = append(tryJobsOnChange, tryJobs...) |
| } |
| // Create map of builder name to buildbucketpb.Build to easily reference |
| // which builds are already on the change. |
| nameToBuilderOnChange := map[string]*buildbucketpb.Build{} |
| for _, b := range tryJobsOnChange { |
| if existingTryJob, ok := nameToBuilderOnChange[b.GetBuilder().GetBuilder()]; ok { |
| // If existing try job is older then replace it. |
| if existingTryJob.GetCreateTime().Seconds < b.GetCreateTime().Seconds { |
| nameToBuilderOnChange[b.GetBuilder().GetBuilder()] = b |
| } |
| } else { |
| nameToBuilderOnChange[b.GetBuilder().GetBuilder()] = b |
| } |
| // Check to see if this try job has been retried in the current CQ attempt. |
| tags := b.GetTags() |
| for _, t := range tags { |
| if t.GetKey() == RetryTagName && t.GetValue() == strconv.FormatInt(startTime, 10) { |
| alreadyRetriedTryJobIDs[b.Id] = true |
| retryQuota -= 1 |
| break |
| } |
| } |
| } |
| |
| // Get all CQ try jobs defined in tasksCfg for this change. |
| cqTryjobsToConfigs := tv.tasksCfg.CommitQueue |
| |
| // Check, parse, and add the try jobs in IncludeTryjobsFooter if specified. |
| includeTryJobs, err := tv.getIncludeFooterTryJobs(ci.Issue, bbProject, bbBucket) |
| if err != nil { |
| return types.VerifierFailureState, err.Error(), nil |
| } |
| for _, t := range includeTryJobs { |
| // Only add to the cqTryjobsToConfigs map if it is not already there. |
| if _, ok := cqTryjobsToConfigs[t]; !ok { |
| sklog.Infof("[%d] Added tryjob %s because it was specified in %s", ci.Issue, t, footers.IncludeTryjobsFooter) |
| // Add a default CommitQueueJobConfig config for try jobs listed in |
| // IncludeTryjobsFooter. |
| cqTryjobsToConfigs[t] = &specs.CommitQueueJobConfig{} |
| } |
| } |
| |
| // See if successful try jobs should be retriggered. |
| rerunTryJobs := git.GetBoolFooterVal(tv.footersMap, footers.RerunTryjobsFooter, ci.Issue) |
| if rerunTryJobs { |
| sklog.Infof("[%d] \"%s: %t\" has been specified. All successful try jobs that completed before this cq attempt will not be reused.", ci.Issue, footers.RerunTryjobsFooter, rerunTryJobs) |
| } |
| |
| // Loop through the CQ try jobs and populate these slices. |
| extraInfoForUIMsgs := []string{} |
| jobsToExperimental := map[string]bool{} |
| jobsToRetry := map[string]bool{} |
| skippedTryJobs := []string{} |
| staleTryJobs := []string{} |
| reuseSuccessTryJobs := []string{} |
| reuseRunningTryJobs := []string{} |
| notFoundTryJobs := []string{} |
| for cqJobName, cqCfg := range cqTryjobsToConfigs { |
| // Store the experimental status of this try job. |
| jobsToExperimental[cqJobName] = cqCfg.Experimental |
| |
| // Make sure the location regex (if specified) matches before we consider this job. |
| if len(cqCfg.LocationRegexes) > 0 { |
| matched, locationRegexMatch, err := tv.doesLocationRegexMatch(ctx, ci, latestPatchSetID, cqCfg.LocationRegexes) |
| if err != nil { |
| return "", "", skerr.Wrap(err) |
| } |
| if matched { |
| // Process this try job. |
| addedReason := fmt.Sprintf("%s added because it matched the location regex: %s", cqJobName, locationRegexMatch) |
| sklog.Infof("[%d] %s", ci.Issue, addedReason) |
| extraInfoForUIMsgs = append(extraInfoForUIMsgs, addedReason) |
| } else { |
| // Ignore this CQ job. |
| skippedReason := fmt.Sprintf("%s skipped because it did not match any of the location regexes: %s", cqJobName, strings.Join(cqCfg.LocationRegexes, ",")) |
| sklog.Infof("[%d] %s", ci.Issue, skippedReason) |
| extraInfoForUIMsgs = append(extraInfoForUIMsgs, skippedReason) |
| skippedTryJobs = append(skippedTryJobs, cqJobName) |
| continue |
| } |
| } |
| |
| // Check to see if this try job already exists on the current change. |
| if build, ok := nameToBuilderOnChange[cqJobName]; ok { |
| |
| if timeNowFunc().Unix()-build.GetCreateTime().GetSeconds() >= TryJobStaleTimeoutSecs { |
| // If a job is stale then it needs to be retriggered regardless of it's state. |
| staleTryJobs = append(staleTryJobs, cqJobName) |
| |
| } else if build.GetStatus() == buildbucketpb.Status_SUCCESS { |
| if rerunTryJobs && build.GetEndTime().GetSeconds() < startTime { |
| // Do not consider these successful jobs if rerunTryJobs is true. |
| notFoundTryJobs = append(notFoundTryJobs, cqJobName) |
| } else { |
| // If a job is successful then reuse it regardless of if a user triggered it or the CQ triggered it. |
| reuseSuccessTryJobs = append(reuseSuccessTryJobs, cqJobName) |
| } |
| |
| } else if build.GetStatus() == buildbucketpb.Status_STARTED || build.GetStatus() == buildbucketpb.Status_SCHEDULED { |
| if exp, ok := jobsToExperimental[cqJobName]; ok && exp { |
| sklog.Infof("[%d] The experimental bot %s is still running. Going to consider it successful", ci.Issue, cqJobName) |
| reuseSuccessTryJobs = append(reuseSuccessTryJobs, cqJobName) |
| } else { |
| // If a job is running then consider it part of the current attempt regardless of who triggered it. |
| reuseRunningTryJobs = append(reuseRunningTryJobs, cqJobName) |
| } |
| |
| } else if build.GetStatus() == buildbucketpb.Status_CANCELED || build.GetStatus() == buildbucketpb.Status_FAILURE || build.GetStatus() == buildbucketpb.Status_INFRA_FAILURE { |
| if build.GetEndTime().GetSeconds() < startTime { |
| // If a job failed before the current cq attempt then it needs to be retriggered because it was |
| // not part of the current CQ attempt. |
| sklog.Infof("[%d] %s failed before the current CQ attempt of %d. Ignoring it and it will be retriggered.", ci.Issue, cqJobName, startTime) |
| notFoundTryJobs = append(notFoundTryJobs, cqJobName) |
| } else if exp, ok := jobsToExperimental[cqJobName]; ok && exp { |
| // This is an experimental bot. Consider it successful. |
| sklog.Infof("[%d] The experimental bot %s failed. Going to consider it successful", ci.Issue, cqJobName) |
| reuseSuccessTryJobs = append(reuseSuccessTryJobs, cqJobName) |
| } else { |
| // If a job failed after the current cq attempt started then the try job has failed in the |
| // current CQ attempt. |
| |
| // Check to see if we have already retried this try job in this CQ attempt. |
| if retry, ok := alreadyRetriedTryJobIDs[build.Id]; ok && retry { |
| sklog.Infof("[%d] The failed try job %s has already been retried once. Returning failure.", ci.Issue, cqJobName) |
| return types.VerifierFailureState, fmt.Sprintf("%s has failed twice in a row", cqJobName), nil |
| } |
| |
| // Check retry quota to see if we should retry this try job. |
| if retryQuota > 0 { |
| sklog.Infof("[%d] The try job %s has failed but retry quota %d>0 so retrying it", ci.Issue, cqJobName, retryQuota) |
| jobsToRetry[cqJobName] = true |
| notFoundTryJobs = append(notFoundTryJobs, cqJobName) |
| retryQuota -= 1 |
| } else { |
| sklog.Infof("[%d] The try job %s has failed and retry quota==0. Returning failure.", ci.Issue, cqJobName) |
| return types.VerifierFailureState, fmt.Sprintf("%s has failed", cqJobName), nil |
| } |
| } |
| |
| } else { |
| // Not sure what state this is in. Log an error. |
| sklog.Errorf("[%d] Unknown state %s for try job %s", ci.Issue, build.GetStatus(), cqJobName) |
| // Returning an error for now. |
| return "", "", skerr.Fmt("Unknown state %s for try job %s", build.GetStatus(), cqJobName) |
| |
| } |
| } else { |
| // The try job has not been triggered on the change yet. |
| notFoundTryJobs = append(notFoundTryJobs, cqJobName) |
| } |
| } |
| |
| sklog.Infof("[%d] For CQ try jobs- Skipped %d try jobs. Found %d stale try jobs. %d successful reusable try jobs. %d running reusable try jobs. %d try jobs were not found. %d Total CQ try jobs", ci.Issue, len(skippedTryJobs), len(staleTryJobs), len(reuseSuccessTryJobs), len(reuseRunningTryJobs), len(notFoundTryJobs), len(tv.tasksCfg.CommitQueue)) |
| |
| // Trigger all stale and not found try jobs |
| triggerTryJobs := append(staleTryJobs, notFoundTryJobs...) |
| if len(triggerTryJobs) > 0 { |
| sklog.Infof("[%d] Triggering %d try jobs", ci.Issue, len(triggerTryJobs)) |
| botsToTags := map[string]map[string]string{} |
| for _, t := range triggerTryJobs { |
| tags := map[string]string{ |
| "triggered_by": "skcq", |
| } |
| if experimental, ok := jobsToExperimental[t]; ok { |
| tags["cq_experimental"] = strconv.FormatBool(experimental) |
| } |
| if retry, ok := jobsToRetry[t]; ok && retry { |
| tags[RetryTagName] = strconv.FormatInt(startTime, 10) |
| } |
| botsToTags[t] = tags |
| } |
| respBuilds, err := tv.bb2.ScheduleBuilds(ctx, triggerTryJobs, botsToTags, ci.Issue, latestPatchSetID, tv.gerritURL, ci.Project, bbProject, bbBucket) |
| if err != nil { |
| return "", "", skerr.Wrapf(err, "[%d] Could not trigger %+v tryjobs", ci.Issue, triggerTryJobs) |
| } |
| // Make sure the try jobs were succesfully triggered. This step should not be necessary but if we |
| // specify a repo/bucket that does not exist the ScheduleBuilds silently succeeds. |
| newTryJobsOnChange, err := tv.bb2.GetTrybotsForCL(ctx, ci.Issue, latestPatchSetID, "https://"+tv.gerritURL, map[string]string(nil)) |
| if err != nil { |
| return "", "", skerr.Wrapf(err, "[%d] Could not get tryjobs", ci.Issue) |
| } |
| for _, b := range respBuilds { |
| found := false |
| // Make sure this build is in the new tryjobs on change. |
| for _, n := range newTryJobsOnChange { |
| if b.GetId() == n.GetId() { |
| found = true |
| } |
| } |
| if !found { |
| return "", "", skerr.Fmt("[%d] %s with id %d was scheduled but did not show up on buildbucket", ci.Issue, b.GetBuilder().GetBuilder(), b.GetId()) |
| } |
| } |
| |
| } |
| |
| extraInfoForUIMsg := "" |
| if len(extraInfoForUIMsgs) > 0 { |
| extraInfoForUIMsg = fmt.Sprintf("\n%s", strings.Join(extraInfoForUIMsgs, "\n")) |
| } |
| waitingTryJobs := append(reuseRunningTryJobs, notFoundTryJobs...) |
| if len(waitingTryJobs) > 0 { |
| return types.VerifierWaitingState, fmt.Sprintf("Waiting for %d try jobs to complete.%s", len(waitingTryJobs), extraInfoForUIMsg), nil |
| } else { |
| if len(reuseSuccessTryJobs) != len(tv.tasksCfg.CommitQueue)-len(skippedTryJobs) { |
| // This *should* not happen. |
| return "", "", skerr.Fmt("[%d] %d successful try jobs does not match the %d total try jobs - %d skipped try jobs", ci.Issue, len(reuseSuccessTryJobs), len(tv.tasksCfg.CommitQueue), len(skippedTryJobs)) |
| } |
| // If we are not waiting on anything and there were no failures then they were all successful. |
| return types.VerifierSuccessState, fmt.Sprintf("CQ Try jobs were successful.%s", extraInfoForUIMsg), nil |
| } |
| } |
| |
| // Cleanup implements the types.Verifier interface. |
| func (tv *TryJobsVerifier) Cleanup(ctx context.Context, ci *gerrit.ChangeInfo, cleanupPatchsetID int64) { |
| // If "Cq-Do-Not-Cancel-Tryjobs: true" has been specified then immediately return success. |
| noCancelTryJobs := git.GetBoolFooterVal(tv.footersMap, footers.DoNotCancelTryjobsFooter, ci.Issue) |
| if noCancelTryJobs { |
| sklog.Infof("[%d] Not checking for and not cancelling try jobs for %d/%d because %s id specified in footers", ci.Issue, ci.Issue, cleanupPatchsetID, footers.DoNotCancelTryjobsFooter) |
| return |
| } |
| |
| //Refresh the change to get the latest patchset ID. |
| refreshedChange, err := tv.cr.GetIssueProperties(ctx, ci.Issue) |
| if err != nil { |
| sklog.Errorf("[%d] Could not get refreshed change in cleanup of %s", ci.Issue, tv.Name()) |
| return |
| } |
| refreshedPSID := tv.cr.GetEarliestEquivalentPatchSetID(refreshedChange) |
| |
| if cleanupPatchsetID != refreshedPSID { |
| // Find all the builds triggered by CQ and then cancel them. |
| builds, err := tv.bb2.GetTrybotsForCL(ctx, refreshedChange.Issue, cleanupPatchsetID, "https://"+tv.gerritURL, map[string]string{"triggered_by": "skcq"}) |
| if err != nil { |
| sklog.Errorf("[%d] Could not search for trybots in cleanup of patchset %d: %s", ci.Issue, cleanupPatchsetID, err) |
| return |
| } |
| for _, b := range builds { |
| buildIDsToCancel := []int64{} |
| if b.GetStatus() == buildbucketpb.Status_STARTED || b.GetStatus() == buildbucketpb.Status_SCHEDULED { |
| sklog.Infof("[%d] old patchset %d has a still running try job: %s. It will be canceled.", refreshedChange.Issue, cleanupPatchsetID, b.GetBuilder().Builder) |
| buildIDsToCancel = append(buildIDsToCancel, b.GetId()) |
| } |
| if len(buildIDsToCancel) > 0 { |
| if _, err := tv.bb2.CancelBuilds(ctx, buildIDsToCancel, CancelBuildsMsg); err != nil { |
| sklog.Errorf("[%d] Could not cleanup buildbucket builds of IDs %+v: %s", refreshedChange.Issue, buildIDsToCancel, err) |
| return |
| } |
| } |
| } |
| } |
| |
| return |
| } |
| |
| // getIncludeFooterTryJobs parses footers for the footers.IncludeTryjobsFooter |
| // and returns try jobs from it. If the specified project or bucket does not |
| // match it is expected, then an error is returned. |
| func (tv *TryJobsVerifier) getIncludeFooterTryJobs(issue int64, bbProject, bbBucket string) ([]string, error) { |
| // Check, parse, and get the try jobs in IncludeTryjobsFooter if specified. |
| includeTryjobsFooter := git.GetStringFooterVal(tv.footersMap, footers.IncludeTryjobsFooter) |
| if includeTryjobsFooter == "" { |
| return []string{}, nil |
| } |
| includeTryJobsMap, err := footers.ParseIncludeTryjobsFooter(includeTryjobsFooter) |
| if err != nil { |
| sklog.Errorf("[%d] Could not parse %s: %s", issue, includeTryjobsFooter, err) |
| return []string{}, nil |
| } |
| retTryJobs := []string{} |
| for bucket, tryJobs := range includeTryJobsMap { |
| projectAndBucket := strings.Split(bucket, "/") |
| var p, b string |
| if len(projectAndBucket) == 2 { |
| // This format is supported. eg: "skia/skia.primary". |
| p = projectAndBucket[0] |
| b = projectAndBucket[1] |
| } else { |
| // Another supported format is "luci.skia.skia.primary" try that one out now. |
| projectAndBucket = strings.Split(bucket, ".") |
| if len(projectAndBucket) != 4 { |
| return nil, skerr.Fmt("Unsupported bucket value of \"%s\" in %+v", bucket, includeTryJobsMap) |
| } |
| p = projectAndBucket[1] |
| b = fmt.Sprintf("%s.%s", projectAndBucket[2], projectAndBucket[3]) |
| } |
| if p != bbProject { |
| return nil, skerr.Fmt("Could not recognize bb project \"%s\" in %+v", p, includeTryJobsMap) |
| } |
| if b != bbBucket { |
| return nil, skerr.Fmt("Specified bucket \"%s\" is different than expected bucket %s in %+v", b, bbBucket, includeTryJobsMap) |
| } |
| retTryJobs = append(retTryJobs, tryJobs...) |
| } |
| return retTryJobs, nil |
| } |
| |
| // doesLocationRegexMatch looks at if the file patches modify by the change |
| // match any of the provided location regexes. If any match then the matching |
| // regex is returned (for logging purposes). |
| func (tv *TryJobsVerifier) doesLocationRegexMatch(ctx context.Context, ci *gerrit.ChangeInfo, patchsetID int64, locationRegexes []string) (bool, string, error) { |
| changedFiles, err := tv.cr.GetFileNames(ctx, ci) |
| if err != nil { |
| return false, "", skerr.Wrapf(err, "Could not get file names from %d/%d", ci.Issue, patchsetID) |
| } |
| for _, locationRegex := range locationRegexes { |
| r, err := regexp.Compile(locationRegex) |
| if err != nil { |
| return false, "", skerr.Wrapf(err, "%s location regex does not compile", locationRegex) |
| } |
| // Run regex on all changed files. |
| for _, cf := range changedFiles { |
| if r.MatchString(cf) { |
| return true, locationRegex, nil |
| } |
| } |
| } |
| return false, "", nil |
| } |