| package ingester |
| |
| import ( |
| "context" |
| "sort" |
| "time" |
| |
| "go.skia.org/infra/go/gerrit" |
| "go.skia.org/infra/go/gitiles" |
| "go.skia.org/infra/go/louhi" |
| "go.skia.org/infra/go/skerr" |
| "go.skia.org/infra/go/util" |
| "go.skia.org/infra/go/vcsinfo" |
| ) |
| |
| // NotificationToFlowExecution converts a Notification to a FlowExecution. Note |
| // that, since not all Notifications contain all of the information about the |
| // flow, the returned FlowExecution may not be complete. |
| func NotificationToFlowExecution(ctx context.Context, n *louhi.Notification, ts time.Time) *louhi.FlowExecution { |
| var result louhi.FlowResult |
| var finishedAt time.Time |
| if n.EventAction == louhi.EventAction_FAILED { |
| result = louhi.FlowResultFailure |
| finishedAt = ts |
| } else if n.EventAction == louhi.EventAction_FINISHED { |
| // Note: from casual observation it seems to be the case that we do not |
| // receive both a FINISHED and a FAILED notification when a flow fails. |
| // If we determine that to be incorrect, we'll need to change some logic |
| // around to ensure that we get the correct result for the flow. |
| result = louhi.FlowResultSuccess |
| finishedAt = ts |
| } |
| return &louhi.FlowExecution{ |
| Artifacts: n.ArtifactLink, |
| CreatedAt: ts, |
| FinishedAt: finishedAt, |
| FlowID: n.FlowUniqueKey, |
| FlowName: n.FlowName, |
| GeneratedCLs: n.GeneratedCls, |
| GitBranch: n.Branch, |
| GitCommit: n.RefSha, |
| ID: n.PipelineExecutionId, |
| Link: n.Link, |
| ModifiedAt: ts, |
| ProjectID: n.ProjectId, |
| Result: result, |
| StartedBy: n.StartedBy, |
| TriggerType: louhi.TriggerType(n.TriggerType), |
| } |
| } |
| |
| // Ingester is used for ingesting Louhi Notifications into a DB. |
| type Ingester struct { |
| db louhi.DB |
| gerrit gerrit.GerritInterface |
| repos []gitiles.GitilesRepo |
| } |
| |
| // NewIngester returns an Ingester instance. |
| func NewIngester(db louhi.DB, g gerrit.GerritInterface, repos []gitiles.GitilesRepo) *Ingester { |
| return &Ingester{ |
| db: db, |
| gerrit: g, |
| repos: repos, |
| } |
| } |
| |
| // UpdateFlowFromNotification retrieves the FlowExecution from the DB, updates |
| // it from the Notifaction, and updates it into the DB. |
| func (i *Ingester) UpdateFlowFromNotification(ctx context.Context, n *louhi.Notification, ts time.Time) error { |
| newFlow := NotificationToFlowExecution(ctx, n, ts) |
| oldFlow, err := i.db.GetFlowExecution(ctx, newFlow.ID) |
| if err != nil { |
| return skerr.Wrapf(err, "failed to retrieve flow %q from DB", newFlow.ID) |
| } |
| |
| // This might be the first time we've seen this flow. |
| if oldFlow == nil { |
| oldFlow = newFlow |
| } |
| if len(newFlow.Artifacts) > 0 { |
| oldFlow.Artifacts = util.NewStringSet(oldFlow.Artifacts, newFlow.Artifacts).Keys() |
| sort.Strings(oldFlow.Artifacts) |
| } |
| if util.TimeIsZero(oldFlow.CreatedAt) || (!util.TimeIsZero(newFlow.CreatedAt) && newFlow.CreatedAt.Before(oldFlow.CreatedAt)) { |
| oldFlow.CreatedAt = newFlow.CreatedAt |
| } |
| if oldFlow.FlowName == "" { |
| oldFlow.FlowName = newFlow.FlowName |
| } |
| if oldFlow.FlowID == "" { |
| oldFlow.FlowID = newFlow.FlowID |
| } |
| if len(newFlow.GeneratedCLs) > 0 { |
| oldFlow.GeneratedCLs = util.NewStringSet(oldFlow.GeneratedCLs, newFlow.GeneratedCLs).Keys() |
| sort.Strings(oldFlow.GeneratedCLs) |
| } |
| if oldFlow.GitBranch == "" { |
| oldFlow.GitBranch = newFlow.GitBranch |
| } |
| if oldFlow.GitCommit == "" { |
| oldFlow.GitCommit = newFlow.GitCommit |
| } |
| // Note: this should never happen, since we use PipelineExecutionId as the |
| // database ID, so it'll be populated if it made it into the DB. |
| if oldFlow.ID == "" { |
| oldFlow.ID = newFlow.ID |
| } |
| if oldFlow.Link == "" { |
| oldFlow.Link = newFlow.Link |
| } |
| if oldFlow.ProjectID == "" { |
| oldFlow.ProjectID = newFlow.ProjectID |
| } |
| if newFlow.Result != louhi.FlowResultUnknown { |
| oldFlow.Result = newFlow.Result |
| oldFlow.FinishedAt = newFlow.FinishedAt |
| } |
| if oldFlow.StartedBy == "" { |
| oldFlow.StartedBy = newFlow.StartedBy |
| } |
| if oldFlow.TriggerType == "" { |
| oldFlow.TriggerType = newFlow.TriggerType |
| } |
| if newFlow.ModifiedAt.After(oldFlow.ModifiedAt) { |
| oldFlow.ModifiedAt = newFlow.ModifiedAt |
| } |
| |
| // Retrieve the CL information for the flow, but only if we haven't done so |
| // yet. |
| if oldFlow.SourceCL == "" && oldFlow.GitCommit != "" { |
| // Retrieve the commit details. Unfortunately, the Louhi notification |
| // doesn't include the repo URL, so we have to scan through our list. |
| var clDetails *vcsinfo.LongCommit |
| for _, repo := range i.repos { |
| details, err := repo.Details(ctx, oldFlow.GitCommit) |
| if err == nil { |
| clDetails = details |
| break |
| } |
| } |
| if clDetails == nil { |
| return skerr.Fmt("failed to retrieve CL details for commit %s", oldFlow.GitCommit) |
| } |
| issue, err := i.gerrit.ExtractIssueFromCommit(clDetails.Body) |
| if err != nil { |
| return skerr.Wrapf(err, "failed to extract issue number from commit body: %s", clDetails.Body) |
| } |
| oldFlow.SourceCL = i.gerrit.Url(issue) |
| } |
| |
| if err := i.db.PutFlowExecution(ctx, oldFlow); err != nil { |
| return skerr.Wrapf(err, "failed to update flow %q in DB", n.PipelineExecutionId) |
| } |
| return nil |
| } |