| package poller |
| |
// This file polls the code review system for dry-run/CQ changes, runs their
// verifiers, and cleans up changes that are no longer being processed.
| |
| import ( |
| "context" |
| "fmt" |
| "net/http" |
| "strconv" |
| "strings" |
| "time" |
| |
| "go.skia.org/infra/go/gerrit" |
| |
| "go.skia.org/infra/go/util" |
| |
| "go.skia.org/infra/go/allowed" |
| "go.skia.org/infra/go/cleanup" |
| "go.skia.org/infra/go/metrics2" |
| "go.skia.org/infra/go/skerr" |
| "go.skia.org/infra/go/sklog" |
| "go.skia.org/infra/skcq/go/caches" |
| "go.skia.org/infra/skcq/go/codereview" |
| "go.skia.org/infra/skcq/go/config" |
| "go.skia.org/infra/skcq/go/db" |
| "go.skia.org/infra/skcq/go/throttler" |
| "go.skia.org/infra/skcq/go/types" |
| "go.skia.org/infra/skcq/go/verifiers" |
| ) |
| |
// LivenessMetric is the name of the liveness metric that Start resets after
// every successful search for changes.
const LivenessMetric = "skcq_be"
| |
| // Start polls Gerrit for matching dry-run/CQ issues, gets their verifiers, |
| // and runs them. |
| func Start(ctx context.Context, pollInterval time.Duration, cr codereview.CodeReview, currentChangesCache caches.CurrentChangesCache, httpClient, criaClient *http.Client, dbClient db.DB, canModifyCfgsOnTheFly *allowed.AllowedFromChromeInfraAuth, publicFEInstanceURL, corpFEInstanceURL string, reposAllowList, reposBlockList []string) error { |
| liveness := metrics2.NewLiveness(LivenessMetric) |
| tm := throttler.NewThrottler() |
| vm := verifiers.NewSkCQVerifiersManager(tm, httpClient, criaClient, cr, canModifyCfgsOnTheFly) |
| cleanup.Repeat(pollInterval, func(ctx context.Context) { |
| sklog.Info("----------------New Poll Iteration--------------") |
| cls, err := cr.Search(ctx) |
| if err != nil { |
| sklog.Errorf("Error when searching for issues: %s", err) |
| return |
| } else { |
| liveness.Reset() |
| } |
| |
| // Store CLs that are being processed in this round for quicker lookup. |
| clsInThisRound := map[string]bool{} |
| |
| // Process the CLs. |
| for _, incompleteCI := range cls { |
| // Get the full issue properties of this change. This is done at the |
| // start of the loop and not in cr.Search because right before we are |
| // about to process a change we want the latest data available. |
| ci, err := cr.GetIssueProperties(ctx, incompleteCI.Issue) |
| if err != nil { |
| sklog.Errorf("[%d] Could not get full issue properties: %s", incompleteCI.Issue, err) |
| continue |
| } |
| |
| // Skip changes with repos not in allow list or in block list. |
| if len(reposAllowList) > 0 && !util.In(ci.Project, reposAllowList) { |
| sklog.Infof("[%d] Ignoring change because the repo %s is not in the repos allowlist: %s.", ci.Issue, ci.Project, reposAllowList) |
| continue |
| } |
| if len(reposBlockList) > 0 && util.In(ci.Project, reposBlockList) { |
| sklog.Infof("[%d] Ignoring change because the repo %s is in the repos blocklist: %s.", ci.Issue, ci.Project, reposBlockList) |
| continue |
| } |
| |
| // Instantiate configReader. |
| // TODO(rmistry): Cache these config readers per repo so that we do not |
| // have to keep creating new ones. |
| configReader, err := config.NewGitilesConfigReader(ctx, httpClient, ci, cr, canModifyCfgsOnTheFly) |
| if err != nil { |
| sklog.Errorf("[%d] Error when instantiating config reader: %s", ci.Issue, err) |
| continue |
| } |
| |
| processCL(ctx, vm, ci, configReader, clsInThisRound, cr, currentChangesCache, httpClient, dbClient, canModifyCfgsOnTheFly, publicFEInstanceURL, corpFEInstanceURL, tm) |
| } |
| |
| // Find CLs that were processed in the last cycle but not the current one. |
| // These CLs might still be running, mark them as abandoned and run cleanup |
| // on their verifiers. |
| for changeEquivalentPatchset, cqRecord := range currentChangesCache.Get() { |
| if _, ok := clsInThisRound[changeEquivalentPatchset]; !ok { |
| |
| ci, err := cr.GetIssueProperties(ctx, cqRecord.ChangeID) |
| if err != nil { |
| sklog.Errorf("[%d] Could not get issue properties during cleanup: %s", cqRecord.ChangeID, err) |
| continue |
| } |
| |
| configReader, err := config.NewGitilesConfigReader(ctx, httpClient, ci, cr, canModifyCfgsOnTheFly) |
| if err != nil { |
| sklog.Errorf("[%d] Could not get config reader during cleanup: %s", ci.Issue, err) |
| continue |
| } |
| |
| if err := cleanupCL(ctx, changeEquivalentPatchset, currentChangesCache, dbClient, cqRecord, ci, configReader, cr, httpClient, vm); err != nil { |
| sklog.Errorf("[%d] Error when cleaning up %s: %s", ci.Issue, changeEquivalentPatchset, err) |
| continue |
| } |
| } |
| } |
| |
| }, nil) |
| |
| return nil |
| } |
| |
// stripNewLinesFromLog returns log with every "\n" turned into a single space.
// Cloud logging reports embedded new lines as errors; see skbug.com/12280.
func stripNewLinesFromLog(log string) string {
	pieces := strings.Split(log, "\n")
	return strings.Join(pieces, " ")
}
| |
| func processCL(ctx context.Context, vm types.VerifiersManager, ci *gerrit.ChangeInfo, configReader config.ConfigReader, clsInThisRound map[string]bool, cr codereview.CodeReview, currentChangesCache caches.CurrentChangesCache, httpClient *http.Client, dbClient db.DB, canModifyCfgsOnTheFly allowed.Allow, publicFEInstanceURL, corpFEInstanceURL string, tm types.ThrottlerManager) { |
| |
| // Make sure the change is still open and has either CQ+1 and CQ+2. |
| if ci.IsClosed() || !(cr.IsDryRun(ctx, ci) || cr.IsCQ(ctx, ci)) { |
| sklog.Infof("[%d] Ignoring change because it is no longer open or does not have the CQ+1/CQ+2 votes.", ci.Issue) |
| return |
| } |
| |
| // Use the equivalent patchset for the changes cache because |
| // if NO_CODE change patches come in then we want to treat it the |
| // same as the earliest equivalent patch. |
| changeEquivalentPatchset := fmt.Sprintf("%d/%d", ci.Issue, cr.GetEarliestEquivalentPatchSetID(ci)) |
| clsInThisRound[changeEquivalentPatchset] = true |
| |
| repoBranch := fmt.Sprintf("%s/%s", ci.Project, ci.Branch) |
| sklog.Infof("[%d] Started processing in repo+branch %s", ci.Issue, repoBranch) |
| |
| // Get the SkCQ cfg that will be used for this change. |
| skCQCfg, err := configReader.GetSkCQCfg(ctx) |
| if err != nil { |
| sklog.Infof("[%d] Error when reading %s: %s", ci.Issue, config.SkCQCfgPath, err) |
| if config.IsNotFound(err) { |
| cr.RemoveFromCQ(ctx, ci, fmt.Sprintf("%s. Removing from CQ.\nPlease add a %s file if this repo+branch requires CQ.", err.Error(), config.SkCQCfgPath), "Repo+Branch is missing SkCQ config file.") |
| return |
| } else if config.IsCannotModifyCfgsOnTheFly(err) { |
| cr.RemoveFromCQ(ctx, ci, fmt.Sprintf("CL owner %s does not have permission to modify %s", ci.Owner.Email, config.SkCQCfgPath), "CL owner cannot modify SkCQ configs.") |
| return |
| } else { |
| sklog.Errorf("[%d] Error reading %s: %s", ci.Issue, config.SkCQCfgPath, err) |
| return |
| } |
| } |
| |
| // Is this a change from an internal repo? |
| internalRepo := skCQCfg.VisibilityType == config.InternalVisibility |
| |
| // Gather all verifiers that will be used and all changes that will be |
| // submitted at the same time as this change. |
| clVerifiers, togetherChanges, err := vm.GetVerifiers(ctx, skCQCfg, ci, false /* isSubmittedTogetherChange */, configReader) |
| if err != nil { |
| sklog.Errorf("[%d] Error when getting verifiers: %s", ci.Issue, err) |
| // Stop processing the change due to the likely transient error. It will be |
| // retried at the next poll iteration. If the error keeps happening then |
| // the infra gardener will see an alert. |
| return |
| } |
| // Log verifiers. |
| verifierNames := []string{} |
| for _, clVerifier := range clVerifiers { |
| verifierNames = append(verifierNames, clVerifier.Name()) |
| } |
| sklog.Infof("[%d] uses verifiers: %s", ci.Issue, strings.Join(verifierNames, ", ")) |
| |
| // Update the cache if it is not already in there. |
| cqStartTime, newCQRun, err := currentChangesCache.Add(ctx, changeEquivalentPatchset, ci.Subject, ci.Owner.Email, ci.Project, ci.Branch, !cr.IsCQ(ctx, ci), internalRepo, ci.Issue, cr.GetLatestPatchSetID(ci)) |
| if err != nil { |
| sklog.Errorf("[%d] could not update the currentChangesCache: %s", ci.Issue, err) |
| } |
| cqEndTime := int64(0) |
| cqSubmittedTime := int64(0) |
| |
| if newCQRun { |
| // If this is a new CQ run then before running the verifiers update the |
| // CL with an auto-generated comment saying we are processing this patch. |
| feURL := publicFEInstanceURL |
| if internalRepo { |
| feURL = corpFEInstanceURL |
| } |
| notify := codereview.NotifyNone |
| comment := "SkCQ is trying the patch." |
| if !cr.IsCQ(ctx, ci) { |
| comment = fmt.Sprintf("Dry run: %s", comment) |
| } else if len(togetherChanges) > 0 { |
| togetherChangesLinks := []string{} |
| for _, t := range togetherChanges { |
| togetherChangesLinks = append(togetherChangesLinks, fmt.Sprintf("https://skia-review.googlesource.com/c/%s", t)) |
| } |
| comment = fmt.Sprintf("%s\nThis change will be submitted with the following changes: %s", comment, strings.Join(togetherChangesLinks, ", ")) |
| // Notify owner and reviewers to let them know that other changes might be submitted as well. |
| notify = codereview.NotifyOwnerReviewersTriggerers |
| } |
| comment = fmt.Sprintf("%s\n\nFollow status at: %s/%d/%d", comment, feURL, ci.Issue, cr.GetLatestPatchSetID(ci)) |
| if err := cr.AddComment(ctx, ci, comment, notify, "Started SkCQ run."); err != nil { |
| sklog.Errorf("[%d] Could not add started processing comment: %s", ci.Issue, err) |
| } |
| } |
| |
| // Now run the verifiers. |
| verifierStatuses := vm.RunVerifiers(ctx, ci, clVerifiers, cqStartTime) |
| rejectMsgsFromVerifiers, waitMsgsFromVerifiers, successMsgsFromVerifiers := verifiers.GetStatusStringsFromVerifierStatuses(verifierStatuses) |
| var attemptOverallState types.VerifierState |
| if len(rejectMsgsFromVerifiers) > 0 { |
| // There were failed verifiers. |
| sklog.Infof("[%d] from %s has failed verifiers: %s", ci.Issue, repoBranch, stripNewLinesFromLog(strings.Join(rejectMsgsFromVerifiers, ", "))) |
| cr.RemoveFromCQ(ctx, ci, fmt.Sprintf("Removing from SkCQ because verifiers have failed:\n\n%s", strings.Join(rejectMsgsFromVerifiers, "\n")), "SkCQ run failed.") |
| if err := currentChangesCache.Remove(ctx, changeEquivalentPatchset); err != nil { |
| sklog.Errorf("[%d] could not update the currentChangesCache: %s", ci.Issue, err) |
| } |
| attemptOverallState = types.VerifierFailureState |
| cqEndTime = time.Now().Unix() |
| } else if len(waitMsgsFromVerifiers) > 0 { |
| // There are verifiers we need to wait for. |
| sklog.Infof("[%d] from %s is waiting for verifiers: %s", ci.Issue, repoBranch, stripNewLinesFromLog(strings.Join(waitMsgsFromVerifiers, ", "))) |
| attemptOverallState = types.VerifierWaitingState |
| } else { |
| // There were no failed verifiers or verifiers that we need to wait for |
| sklog.Infof("[%d] from %s successfully ran verifiers: %s", ci.Issue, repoBranch, stripNewLinesFromLog(strings.Join(successMsgsFromVerifiers, ", "))) |
| if !cr.IsCQ(ctx, ci) { |
| removeFromCQMsg := "Dry run: This CL passed the SkCQ dry run." |
| if ci.WorkInProgress { |
| // If the change is WIP and a reviewer has been added, then |
| // automatically remove the change from WIP by publishing it. |
| for _, r := range ci.Reviewers.Reviewer { |
| // Owner shows up as a reviewer. No idea why Gerrit does this. |
| if r.AccountID != ci.Owner.AccountID { |
| // Publish and break out. |
| if err := cr.SetReadyForReview(ctx, ci); err != nil { |
| sklog.Errorf("[%d] Could not set ready for review: %s", ci.Issue, err) |
| } |
| removeFromCQMsg += "\nAutomatically published the CL because it was WIP with reviewers specified." |
| break |
| } |
| } |
| } |
| // Say everything was succesful and we are done. |
| cr.RemoveFromCQ(ctx, ci, removeFromCQMsg, "SkCQ dry run succeeded.") |
| } else { |
| if err := cr.Submit(ctx, ci); err != nil { |
| if strings.Contains(err.Error(), gerrit.ErrMergeConflict) { |
| sklog.Infof("[%d] Gerrit rejected submission due to merge conflict: %s", ci.Issue, err.Error()) |
| cr.RemoveFromCQ(ctx, ci, fmt.Sprintf("Gerrit rejected submission due to merge conflict.\n\nHint: Rebasing CL in Gerrit UI and re-submitting through SkCQ usually works."), "SkCQ merge conflict") |
| } else { |
| sklog.Errorf("[%d] Error when submitting: %s", ci.Issue, err) |
| return |
| } |
| } else { |
| cqSubmittedTime = time.Now().Unix() |
| tm.UpdateThrottler(repoBranch, time.Now(), skCQCfg.ThrottlerCfg) |
| } |
| } |
| if err := currentChangesCache.Remove(ctx, changeEquivalentPatchset); err != nil { |
| sklog.Errorf("[%d] could not update the currentChangesCache: %s", ci.Issue, err) |
| } |
| cqEndTime = time.Now().Unix() |
| attemptOverallState = types.VerifierSuccessState |
| } |
| |
| if attemptOverallState == types.VerifierFailureState { |
| // Do a pass through of all verifiers and update any states that are in |
| // VerifierWatitingState to VerifierAbortedState because we are no longer |
| // waiting for this verifier. |
| for _, v := range verifierStatuses { |
| if v.State == types.VerifierWaitingState { |
| v.State = types.VerifierAbortedState |
| v.StopTs = time.Now().Unix() |
| } |
| } |
| } |
| |
| // We are done processing this CL for this iteration of the poller's |
| // loop. Persist the CL state for UI display. |
| attempt := &types.ChangeAttempt{ |
| ChangeID: ci.Issue, |
| PatchsetID: cr.GetLatestPatchSetID(ci), |
| DryRun: !cr.IsCQ(ctx, ci), |
| Repo: ci.Project, |
| Branch: ci.Branch, |
| PatchStartTs: cqStartTime, |
| PatchStopTs: cqEndTime, |
| PatchCommittedTs: cqSubmittedTime, |
| SubmittableChanges: togetherChanges, |
| VerifiersStatuses: verifierStatuses, |
| OverallState: attemptOverallState, |
| } |
| if err := dbClient.PutChangeAttempt(ctx, attempt, db.GetChangesCol(internalRepo)); err != nil { |
| sklog.Errorf("[%d] Could not persist change attempt: %s", ci.Issue, err) |
| return |
| } |
| } |
| |
| func cleanupCL(ctx context.Context, changeEquivalentPatchset string, currentChangesCache caches.CurrentChangesCache, dbClient db.DB, cqRecord *types.CurrentlyProcessingChange, ci *gerrit.ChangeInfo, configReader config.ConfigReader, cr codereview.CodeReview, httpClient *http.Client, vm types.VerifiersManager) error { |
| sklog.Infof("[%d] %s is no longer processed by SkCQ. It was processed in the last cycle and was still running. Going to mark it as abandoned and cleanup it's verifiers.", cqRecord.ChangeID, changeEquivalentPatchset) |
| |
| // Remove the change from the changes cache. |
| if err := currentChangesCache.Remove(ctx, changeEquivalentPatchset); err != nil { |
| return skerr.Wrapf(err, "[%d] could not update the currentChangesCache during cleanup", cqRecord.ChangeID) |
| } |
| |
| // Update the attempt as being abandoned so that the UI accurately reflects what happened to |
| // the change. |
| if err := dbClient.UpdateChangeAttemptAsAbandoned(ctx, cqRecord.ChangeID, cqRecord.LatestPatchsetID, db.GetChangesCol(cqRecord.Internal), cqRecord.StartTs); err != nil { |
| return skerr.Wrapf(err, "[%d] Could not mark change %s as abandoned during cleanup", cqRecord.ChangeID, changeEquivalentPatchset) |
| } |
| |
| // Instantiate all objs needed to get verifiers. |
| skCQCfg, err := configReader.GetSkCQCfg(ctx) |
| if err != nil { |
| return skerr.Wrapf(err, "[%d] Could not get %s during cleanup: %s", ci.Issue, config.SkCQCfgPath, err) |
| } |
| // Get all verifiers. |
| verifiers, _, err := vm.GetVerifiers(ctx, skCQCfg, ci, false, configReader) |
| if err != nil { |
| return skerr.Wrapf(err, "[%d] Could not get verifiers to cleanup", ci.Issue) |
| } |
| // Parse out the previous patchset from the changeEquivalentPatchset. |
| tokens := strings.Split(changeEquivalentPatchset, "/") |
| previousPatchsetID, err := strconv.ParseInt(tokens[1], 10, 64) |
| if err != nil { |
| return skerr.Wrapf(err, "[%d] Could not parse patchsetID from %s", ci.Issue, tokens[1]) |
| } |
| // Run cleanup on all verifiers. |
| for _, v := range verifiers { |
| v.Cleanup(ctx, ci, previousPatchsetID) |
| } |
| return nil |
| } |