// Package bbstate tracks the state of tryjobs in BuildBucket and loads
// issues from Gerrit to maintain consistent tryjob information in an instance
// of tryjobstore.TryjobStore.
| package bbstate |
| |
| import ( |
| "encoding/json" |
| "fmt" |
| "net/http" |
| "regexp" |
| "strconv" |
| "sync" |
| "time" |
| |
| "github.com/davecgh/go-spew/spew" |
| bb_api "go.chromium.org/luci/common/api/buildbucket/buildbucket/v1" |
| "go.skia.org/infra/go/buildbucket" |
| "go.skia.org/infra/go/gerrit" |
| "go.skia.org/infra/go/sklog" |
| "go.skia.org/infra/golden/go/tryjobstore" |
| "golang.org/x/sync/errgroup" |
| ) |
| |
| // BuildBucketDBSync fetches issue and build information from the relevant services. |
| // This defines the interfaces of BuildBucketState and is used to mock it in tests. |
| type BuildIssueSync interface { |
| // SyncIssueTryjob forces a synchronous fetch of the issue information from |
| // Gerrit and the Tryjob information from the build system. Return values of |
| // (nil, nil, nil) indicate that we received a "not found" (404) return status |
| // when requesting the issue from Gerrit. |
| SyncIssueTryjob(issueID, buildBucketID int64) (*tryjobstore.Issue, *tryjobstore.Tryjob, error) |
| } |
| |
const (
	// DefaultSkiaBucketName is the name of the Skia BuildBucket.
	DefaultSkiaBucketName = "skia.primary"

	// DefaultSkiaBuildBucketURL is the default URL of the BuildBucket service
	// used by Skia.
	DefaultSkiaBuildBucketURL = "https://cr-buildbucket.appspot.com/api/buildbucket/v1/"

	// DefaultTimeWindow is how far back in time we look when querying
	// BuildBucket.
	DefaultTimeWindow = 5 * time.Hour

	// DefaultPollInterval is the interval at which BuildBucket is polled.
	DefaultPollInterval = 5 * time.Minute

	// DefaultTestBuilderRegex is the default regular expression identifying
	// tryjobs that run tests/produce images. This will probably only work for
	// Skia.
	DefaultTestBuilderRegex = `^Test-`

	// maxConcurrentWrites is the capacity of the work-permission channel in
	// startPollers, i.e. the upper bound on how many newly discovered builds
	// are processed concurrently.
	maxConcurrentWrites = 1000

	// buildWatcherPollInterval is the pause between two status polls of a
	// build that is being watched until it completes.
	buildWatcherPollInterval = 10 * time.Second
)
| |
| // Config defines the configuration options for BuildBucketState. |
| type Config struct { |
| // BuildBucketURL is the URL of the BuildBucket instance to poll for tryjobs. |
| BuildBucketURL string |
| |
| // BuildBucketName is the name of the BuildBucket to poll. |
| BuildBucketName string |
| |
| // Client is an authenticated http client used to connect to BuildBucket. |
| Client *http.Client |
| |
| // TryjobStore stores tryjob related data. |
| TryjobStore tryjobstore.TryjobStore |
| |
| // GerritClient is used to query Gerrit for issue details. |
| GerritClient *gerrit.Gerrit |
| |
| // PollInterval is the interval at which to poll BuildBucket. |
| PollInterval time.Duration |
| |
| // TimeWindow is the time delta that defines how far back in time BuildBucket is queried. |
| TimeWindow time.Duration |
| |
| // BuilderRegexp is the regular expression that has to match for a builder to be included. |
| BuilderRegexp string |
| } |
| |
| // BuildBucketState captures all tryjobs that are being run by BuildBucket. |
| // These are the tryjobs that were run within a time windows. |
| // It polls the BuildBucket and retrieves Tryjobs as they arrived. As needed |
| // the corresponding Gerrit issues are loaded. This information is then |
| // stored in a TryjobStore. |
| type BuildBucketState struct { |
| // service client to access buildbucket. |
| service *bb_api.Service |
| bucketName string |
| tryjobStore tryjobstore.TryjobStore |
| gerritAPI *gerrit.Gerrit |
| builderRegExp *regexp.Regexp |
| |
| // currentBuilds keeps track of currently running builds that we are tracking. |
| // mapping: currentBuilds[build_bucket_id] => status |
| currentBuilds map[int64]tryjobstore.TryjobStatus |
| |
| // watchMutex protects currentBuilds |
| watchMutex sync.Mutex |
| } |
| |
| // NewBuildBucketState creates a new instance of BuildBucketState. |
| // bbURL is the URL of the target BuildBucket instance and client is an |
| // authenticated http client. |
| func NewBuildBucketState(config *Config) (BuildIssueSync, error) { |
| service, err := bb_api.New(config.Client) |
| if err != nil { |
| return nil, err |
| } |
| |
| // compile the regular expression to filter builders that should be ingested. |
| builderRegExp, err := regexp.Compile(config.BuilderRegexp) |
| if err != nil { |
| return nil, err |
| } |
| |
| service.BasePath = config.BuildBucketURL |
| ret := &BuildBucketState{ |
| service: service, |
| bucketName: config.BuildBucketName, |
| tryjobStore: config.TryjobStore, |
| gerritAPI: config.GerritClient, |
| builderRegExp: builderRegExp, |
| currentBuilds: map[int64]tryjobstore.TryjobStatus{}, |
| } |
| if err := ret.startPollers(config.PollInterval, config.TimeWindow); err != nil { |
| return nil, err |
| } |
| return ret, nil |
| } |
| |
| // SyncIssueTryjob implements the BuildIssueSync interface. |
| func (b *BuildBucketState) SyncIssueTryjob(issueID, buildBucketID int64) (*tryjobstore.Issue, *tryjobstore.Tryjob, error) { |
| // Fetch the build information from BuildBucket and convert it to a Tryjob. |
| tryjob, err := b.fetchBuild(buildBucketID) |
| if err != nil { |
| return nil, nil, err |
| } |
| |
| // The referenced tryjob doesn't exist. |
| if tryjob == nil { |
| return nil, nil, fmt.Errorf("Tryjob with BuildBucket id %d for issue %d does not exist.", buildBucketID, issueID) |
| } |
| |
| // Update the tryjob information. |
| if err := b.updateTryjobState(tryjob); err != nil { |
| // If we got a 404 from Gerrit we signal this back to caller with a nil result. |
| if err == gerrit.ErrNotFound { |
| return nil, nil, nil |
| } |
| return nil, nil, fmt.Errorf("Error adding build info to tryjob store. \n%s\nError: %s", spew.Sdump(tryjob), err) |
| } |
| |
| if tryjob.IssueID != issueID { |
| return nil, nil, fmt.Errorf("Issue %d is not referenced by tryjob %d.", issueID, buildBucketID) |
| } |
| |
| issue, err := b.tryjobStore.GetIssue(issueID, false) |
| if err != nil { |
| return nil, nil, err |
| } |
| |
| if issue == nil { |
| return nil, nil, fmt.Errorf("Issue %d does not exist.", issueID) |
| } |
| |
| return issue, tryjob, nil |
| } |
| |
| // startPollers watches builds that are assumed running (according to the database) |
| // until they reach a completed state. |
| // It also starts a poller that continuously queries BuildBucket for newly |
| // started builds and then watches them the same way. |
| func (b *BuildBucketState) startPollers(pollInterval, timeWindow time.Duration) error { |
| // Fetch all tryjobs that we know as running and watch them for completion. |
| tryjobs, err := b.tryjobStore.RunningTryjobs() |
| if err != nil { |
| return sklog.FmtErrorf("Error retrieving running tryjobs: %s", err) |
| } |
| |
| // Watch these tryjobs to make sure we catch when they are finished. |
| for _, tryjob := range tryjobs { |
| b.watchBuild(tryjob.BuildBucketID) |
| } |
| |
| // Create the channel that will receive the list of running tryjobs. |
| buildsCh := make(chan *bb_api.ApiCommonBuildMessage) |
| workPermissions := make(chan bool, maxConcurrentWrites) |
| |
| // Process the new builds discovered by the poller. |
| go func() { |
| for build := range buildsCh { |
| // Get work permission. |
| workPermissions <- true |
| |
| go func(build *bb_api.ApiCommonBuildMessage) { |
| // Give up work permission at the end. |
| defer func() { <-workPermissions }() |
| |
| // Only consider builds that have not completed. |
| switch build.Status { |
| case buildbucket.STATUS_SCHEDULED: |
| fallthrough |
| case buildbucket.STATUS_STARTED: |
| b.watchBuild(build.Id) |
| } |
| }(build) |
| } |
| }() |
| |
| // Start the poller. |
| if err := b.startSearchPoller(buildsCh, pollInterval, timeWindow); err != nil { |
| return err |
| } |
| |
| return nil |
| } |
| |
| // GetWatchedBuilds returns the builds that are currently being tracked until |
| // they complete. Return value: map[build_bucket_id]status |
| func (b *BuildBucketState) GetWatchedBuilds() map[int64]tryjobstore.TryjobStatus { |
| b.watchMutex.Lock() |
| defer b.watchMutex.Unlock() |
| ret := make(map[int64]tryjobstore.TryjobStatus, len(b.currentBuilds)) |
| for k, v := range b.currentBuilds { |
| ret[k] = v |
| } |
| return ret |
| } |
| |
| // watchBuild ensures that the build with the given BuildBucket id is being watched. |
| // If the build is not being watched a new go-routine is started that follows its |
| // state until it completes. |
| func (b *BuildBucketState) watchBuild(buildBucketID int64) { |
| // Check if we are already watching this build. |
| b.watchMutex.Lock() |
| defer b.watchMutex.Unlock() |
| if _, ok := b.currentBuilds[buildBucketID]; !ok { |
| b.currentBuilds[buildBucketID] = tryjobstore.TRYJOB_UNKNOWN |
| go b.watchOneBuild(buildBucketID) |
| } |
| } |
| |
| // watchOneBuild tracks the build that corresponds to the given BuildBucket id. |
| // It assumes it is being run in its own go-routine. |
| func (b *BuildBucketState) watchOneBuild(buildBucketID int64) { |
| for { |
| // fetch the build info |
| if tryjob, err := b.fetchBuild(buildBucketID); err != nil { |
| sklog.Errorf("Error fetching build from BuildBucket: %s", err) |
| } else { |
| // If the tryjob is ignored or not available we are done. |
| if tryjob == nil { |
| return |
| } |
| |
| // Get the last known status for this id. |
| b.watchMutex.Lock() |
| lastKnownStatus := b.currentBuilds[buildBucketID] |
| b.watchMutex.Unlock() |
| |
| // If it has changed since then we need to update it. |
| if tryjob.Status != lastKnownStatus { |
| if err := b.updateTryjobState(tryjob); err != nil { |
| sklog.Errorf("Error updating tryjob state: %s", err) |
| } else { |
| // If the tryjob has finished we are done watching it. |
| if tryjob.Status >= tryjobstore.TRYJOB_COMPLETE { |
| b.watchMutex.Lock() |
| delete(b.currentBuilds, buildBucketID) |
| b.watchMutex.Unlock() |
| return |
| } |
| b.watchMutex.Lock() |
| b.currentBuilds[buildBucketID] = tryjob.Status |
| b.watchMutex.Unlock() |
| } |
| } |
| } |
| |
| // Wait for a little bit and poll the status again. |
| time.Sleep(buildWatcherPollInterval) |
| } |
| } |
| |
| // fetchBuild retrieves the build that corresponds to the given BuildBucket id |
| // and extracts the information into an instance of Tryjob. |
| // The first return value being nil, indicates that the build does not exist |
| // of was ignored for some reason. |
| func (b *BuildBucketState) fetchBuild(buildBucketID int64) (*tryjobstore.Tryjob, error) { |
| buildResp, err := b.service.Get(buildBucketID).Do() |
| if err != nil { |
| return nil, err |
| } |
| |
| if buildResp == nil { |
| return nil, fmt.Errorf("buildResp is nil. No result found.") |
| } |
| |
| if buildResp.Build == nil { |
| return nil, fmt.Errorf("Build information is nil. No result found.") |
| } |
| |
| if (buildResp.Error != nil) && (buildResp.Error.Message != "") { |
| return nil, fmt.Errorf("Unable to retrieve build %d. Got %s", buildBucketID, buildResp.Error.Message) |
| } |
| build := buildResp.Build |
| |
| // Parse the parameters encoded in the ParametersJson field. |
| params := &tryjobstore.Parameters{} |
| if err := json.Unmarshal([]byte(build.ParametersJson), params); err != nil { |
| return nil, fmt.Errorf("Error unmarshalling params: %s", err) |
| } |
| |
| // Check if this is a builder we can ignore. |
| if b.ignoreBuild(build, params) { |
| return nil, nil |
| } |
| |
| // Extract the tryjob info. |
| return getTryjobInfo(build, params) |
| } |
| |
| // updateTryjobState adds the provided tryjob information to the TryjobStore. |
| func (b *BuildBucketState) updateTryjobState(tryjob *tryjobstore.Tryjob) error { |
| // Find the existing issue in the tryjob store. |
| issue, err := b.tryjobStore.GetIssue(tryjob.IssueID, false) |
| if err != nil { |
| return err |
| } |
| |
| if !issue.HasPatchset(tryjob.PatchsetID) { |
| // Make sure we have an up to date issue. Note: 'issue' might be nil |
| // if we didn't find it in the issue store. |
| if issue, err = b.syncGerritIssue(tryjob.IssueID, tryjob.PatchsetID, issue); err != nil { |
| // Note: Pass the error directly since it might be gerrit.ErrorNotFound. |
| return err |
| } |
| |
| // If the issue was not found we have a problem. |
| if issue == nil { |
| return fmt.Errorf("Issue %d was not found.", tryjob.IssueID) |
| } |
| |
| // Make sure the patchset detail is present. |
| if !issue.HasPatchset(tryjob.PatchsetID) { |
| return fmt.Errorf("Found issue %d, but could not find patchset detail %d", tryjob.IssueID, tryjob.PatchsetID) |
| } |
| } |
| |
| // Add the Tryjob. |
| if err := b.tryjobStore.UpdateTryjob(0, tryjob, nil); err != nil { |
| return fmt.Errorf("Error updating issue and tryjob (%d, %d). Got error: %s", tryjob.IssueID, tryjob.BuildBucketID, err) |
| } |
| return nil |
| } |
| |
| // syncGerritIssue makes sure the data between Gerrit and the TryjobStore are |
| // consistent. |
| // If the the given 'issue' is nil, it will try and fetch it from Gerrit and |
| // create a new entry in the TryjobStore. |
| // If the issue identified by 'issueID' does not exist in Gerrit it will |
| // return nil. |
| func (b *BuildBucketState) syncGerritIssue(issueID, patchsetID int64, issue *tryjobstore.Issue) (*tryjobstore.Issue, error) { |
| // If 'issue' is nil, we need to see if we can find it in Gerrit. |
| if issue == nil { |
| var err error |
| issue, err = b.updateGerritIssue(issueID, issue) |
| if err != nil { |
| // We didn't find the issue in Gerrit. So pass the error back. |
| if err == gerrit.ErrNotFound { |
| return nil, err |
| } |
| return nil, fmt.Errorf("Error fetching issue %d: %s", issueID, err) |
| } |
| } else { |
| // Check if the issue is up to date. |
| if !issue.HasPatchset(patchsetID) { |
| var err error |
| if issue, err = b.updateGerritIssue(issueID, issue); err != nil { |
| return nil, fmt.Errorf("Error updating issue %d: %s", issueID, err) |
| } |
| } |
| } |
| |
| // Write the update issues to the store. |
| if err := b.tryjobStore.UpdateIssue(issue, nil); err != nil { |
| return nil, err |
| } |
| sklog.Infof("Added information for issue %d", issueID) |
| |
| return issue, nil |
| } |
| |
| // updateGerritIssue fetches issue details from Gerrit and merges them into the |
| // internal representation of the Gerrit issues. If issue is nil a new instance |
| // will be allocated and returned. |
| func (b *BuildBucketState) updateGerritIssue(issueID int64, issue *tryjobstore.Issue) (*tryjobstore.Issue, error) { |
| changeInfo, err := b.gerritAPI.GetIssueProperties(issueID) |
| if err != nil { |
| // Note: Make sure to pass through the error unchanged since it is tested against |
| // gerrit.ErrNotFound |
| return nil, err |
| } |
| |
| if issue == nil { |
| issue = &tryjobstore.Issue{} |
| } |
| |
| commitInfos := make([]*gerrit.CommitInfo, len(changeInfo.Revisions)) |
| var egroup errgroup.Group |
| for idx, rev := range changeInfo.Patchsets { |
| func(idx int, rev *gerrit.Revision) { |
| egroup.Go(func() error { |
| var err error |
| commitInfos[idx], err = b.gerritAPI.GetCommit(issueID, rev.ID) |
| return err |
| }) |
| }(idx, rev) |
| } |
| |
| if err := egroup.Wait(); err != nil { |
| return nil, sklog.FmtErrorf("Error retrieving commit info for issue %d: %s", issueID, err) |
| } |
| |
| b.setIssueDetails(issueID, changeInfo, commitInfos, issue) |
| return issue, nil |
| } |
| |
| // setIssueDetails set the properties of the given IssueDetails from the values |
| // in the Gerrit ChangeInfo. |
| func (b *BuildBucketState) setIssueDetails(issueID int64, changeInfo *gerrit.ChangeInfo, commitInfos []*gerrit.CommitInfo, issue *tryjobstore.Issue) { |
| issue.ID = issueID |
| issue.Subject = changeInfo.Subject |
| issue.Owner = changeInfo.Owner.Email |
| issue.Updated = changeInfo.Updated |
| issue.URL = b.gerritAPI.Url(issueID) |
| issue.Status = changeInfo.Status |
| |
| // extract the patchset detail. |
| psDetails := make([]*tryjobstore.PatchsetDetail, 0, len(changeInfo.Patchsets)) |
| for idx, revision := range changeInfo.Patchsets { |
| psDetails = append(psDetails, &tryjobstore.PatchsetDetail{ |
| ID: revision.Number, |
| Commit: revision.ID, |
| ParentCommit: getParentCommit(commitInfos[idx]), |
| }) |
| } |
| issue.UpdatePatchsets(psDetails) |
| } |
| |
| // getParentCommit robustly returns the parent commit from the given CommitInfo instance. If that's |
| // not possible, the empty string is returned. |
| func getParentCommit(commitInfo *gerrit.CommitInfo) string { |
| if commitInfo == nil || len(commitInfo.Parents) == 0 { |
| return "" |
| } |
| |
| return commitInfo.Parents[0].Commit |
| } |
| |
| // pollBuildBucket queries the BuildBucket instance from (now - timeWindow) to now. |
| func (b *BuildBucketState) searchForNewBuilds(buildsCh chan<- *bb_api.ApiCommonBuildMessage, timeWindow time.Duration) error { |
| // Search over a specific time window. |
| searchCall := b.service.Search() |
| |
| timeWindowStart := time.Now().Add(-timeWindow).UnixNano() / int64(time.Microsecond) |
| searchCall.Bucket(b.bucketName).CreationTsLow(timeWindowStart) |
| |
| if _, err := searchCall.Run(buildsCh, 0, nil); err != nil { |
| return fmt.Errorf("Error querying build bucket: %s", err) |
| } |
| return nil |
| } |
| |
| // ignoreBuild is the central place to determine whether a build from |
| // BuildBucket should be ignored. For example, BuildBucket can contain build jobs |
| // that produce no test output. |
| func (b *BuildBucketState) ignoreBuild(build *bb_api.ApiCommonBuildMessage, params *tryjobstore.Parameters) bool { |
| // If BuildResultDetails are there, then parse them and see if |
| // resultDetails['properties']['skip_test'] exists and is true. |
| // This will only apply to some clients, but there should not be any false positives. |
| if build.ResultDetailsJson != "" { |
| resultDetails := map[string]interface{}{} |
| if err := json.Unmarshal([]byte(build.ResultDetailsJson), &resultDetails); err != nil { |
| sklog.Errorf("Error unmarshalling generic JSON: %s", err) |
| } else if props, ok := resultDetails["properties"].(map[string]interface{}); ok { |
| // If skip_test exists and is a bool with value true we need to ignore this build. |
| if val, ok := props["skip_test"].(bool); ok && val { |
| return true |
| } |
| } |
| } |
| |
| // Check whether the builder is ruled out by a regular expression. |
| return !b.builderRegExp.Match([]byte(params.BuilderName)) |
| } |
| |
| // startSearchPoller polls the BuildBucket immediately and starts a poller at the |
| // given interval with the given time windows. All results are written to buildCh. |
| // If the first poll fails, an error is returned. |
| func (b *BuildBucketState) startSearchPoller(buildsCh chan<- *bb_api.ApiCommonBuildMessage, interval, timeWindow time.Duration) error { |
| if err := b.searchForNewBuilds(buildsCh, timeWindow); err != nil { |
| return err |
| } |
| go func() { |
| for range time.Tick(interval) { |
| if err := b.searchForNewBuilds(buildsCh, timeWindow); err != nil { |
| sklog.Errorf("Error polling BuildBucket: %s", err) |
| } |
| } |
| }() |
| return nil |
| } |
| |
// extractPatchsetRegex matches references of the form
// refs/changes/<digits>/<digits>/<patchset> and captures the trailing
// patchset part. It is used to extract the patchset ID from BuildBucket builds.
var extractPatchsetRegex = regexp.MustCompile(`^refs\/changes\/[0-9]*\/[0-9]*\/(?P<patchset>.+)$`)
| |
| // getTryjobInfo extracts tryjob information from the BuildBucket record. |
| // It translates the status of a BuildBucket build to the status defined for |
| // Tryjob instances in tryjobstore, which is richer in that it also captures |
| // whether a tryjob result has been ingested or not. |
| func getTryjobInfo(build *bb_api.ApiCommonBuildMessage, params *tryjobstore.Parameters) (*tryjobstore.Tryjob, error) { |
| matchedGroups := extractPatchsetRegex.FindStringSubmatch(params.Properties.GerritPatchset) |
| if len(matchedGroups) != 2 { |
| return nil, fmt.Errorf("Unable to extract patchset info from '%s'", params.Properties.GerritPatchset) |
| } |
| |
| patchsetID, err := strconv.ParseInt(matchedGroups[1], 10, 64) |
| if err != nil { |
| return nil, err |
| } |
| |
| issueID := int64(params.Properties.GerritIssue) |
| |
| // Make sure the relevant ids are correct. |
| if (issueID <= 0) || (patchsetID <= 0) { |
| return nil, sklog.FmtErrorf("Invalid issue id (%d) or patchset id (%d).", issueID, patchsetID) |
| } |
| |
| // Translate the two result fields into one for tryjobs. |
| var status tryjobstore.TryjobStatus = tryjobstore.TRYJOB_UNKNOWN |
| switch build.Status { |
| case buildbucket.STATUS_SCHEDULED: |
| status = tryjobstore.TRYJOB_SCHEDULED |
| case buildbucket.STATUS_STARTED: |
| status = tryjobstore.TRYJOB_RUNNING |
| case buildbucket.STATUS_COMPLETED: |
| switch build.Result { |
| case buildbucket.RESULT_CANCELED: |
| fallthrough |
| case buildbucket.RESULT_FAILURE: |
| status = tryjobstore.TRYJOB_FAILED |
| case buildbucket.RESULT_SUCCESS: |
| status = tryjobstore.TRYJOB_COMPLETE |
| } |
| } |
| |
| if status == tryjobstore.TRYJOB_UNKNOWN { |
| return nil, fmt.Errorf("Unknown tryjob state. Got (status, result): (%s, %s)", build.Status, build.Result) |
| } |
| |
| // UpdateTs is in micro seconds. |
| // Note: Multiplying by time.Microsecond results in the correct number of nanoseconds. |
| const microPerSec = int64(time.Second / time.Microsecond) |
| updated := time.Unix(build.UpdatedTs/microPerSec, (build.UpdatedTs%microPerSec)*int64(time.Microsecond)) |
| ret := &tryjobstore.Tryjob{ |
| IssueID: issueID, |
| PatchsetID: patchsetID, |
| Builder: params.BuilderName, |
| BuildBucketID: build.Id, |
| Updated: updated, |
| Status: status, |
| } |
| |
| return ret, nil |
| } |