| package cd_metrics |
| |
| /* |
| Package cd_metrics ingests data about commits and Docker images to produce |
| metrics, for example the latency between a commit landing and a Docker image |
| being built for it. |
| */ |
| |
| import ( |
| "bytes" |
| "context" |
| "encoding/gob" |
| "fmt" |
| "strings" |
| "sync" |
| "time" |
| |
| "go.skia.org/infra/go/gcr" |
| "go.skia.org/infra/go/gerrit" |
| "go.skia.org/infra/go/git/repograph" |
| "go.skia.org/infra/go/gitiles" |
| "go.skia.org/infra/go/gitstore/bt_gitstore" |
| "go.skia.org/infra/go/httputils" |
| "go.skia.org/infra/go/louhi" |
| "go.skia.org/infra/go/louhi/firestore" |
| "go.skia.org/infra/go/louhi/pubsub" |
| "go.skia.org/infra/go/metrics2" |
| "go.skia.org/infra/go/metrics2/events" |
| "go.skia.org/infra/go/skerr" |
| "go.skia.org/infra/go/sklog" |
| "go.skia.org/infra/go/util" |
| "go.skia.org/infra/go/vcsinfo" |
| "go.skia.org/infra/k8s-checker/go/k8s_config" |
| "golang.org/x/oauth2" |
| "golang.org/x/sync/errgroup" |
| v1 "k8s.io/api/core/v1" |
| ) |
| |
// Git repositories consulted by the CD metrics.
const (
	infraRepoUrl     = "https://skia.googlesource.com/buildbot.git"
	k8sConfigRepoUrl = "https://skia.googlesource.com/k8s-config.git"
)

// Docker image bookkeeping.
const (
	// containerRegistryProject is the project passed to the container
	// registry client when listing image tags.
	containerRegistryProject = "skia-public"

	// louhiUser and repoStateClean are the "user" and "state" components an
	// image tag must carry for the image to be considered.
	louhiUser      = "louhi"
	repoStateClean = "clean"

	// commitHashLength is the number of leading characters of a commit hash
	// used as the lookup key when matching image tags to commits.
	commitHashLength = 7
)

// Metric names.
const (
	louhiFlowSuccessMetric      = "louhi_flow_success"
	measurementImageLatency     = "cd_image_build_latency_s"
	measurementK8sConfigLatency = "cd_k8s_config_latency_s"
)

// overlapDuration indicates how far back, before the time at which we last
// finished ingesting data, each ingestion window reaches. This allows us to
// revisit commits for which the CD pipeline may not have finished. Its value
// should be longer than we ever expect the CD pipeline to take.
const overlapDuration = 6 * time.Hour
| |
| var ( |
| // beginningOfTime is considered to be the earliest time from which we'll |
| // ingest data. The CD pipeline didn't exist before this point, so there's |
| // no reason to load earlier data. |
| beginningOfTime = time.Date(2022, time.June, 13, 0, 0, 0, 0, time.UTC) |
| |
| timePeriods = []time.Duration{24 * time.Hour, 7 * 24 * time.Hour} |
| |
| repoUrls = []string{infraRepoUrl, k8sConfigRepoUrl} |
| clusters = []string{"skia-infra-corp", "skia-infra-public", "skia-infra-public-dev"} |
| ) |
| |
| // cycle performs one cycle of metrics ingestion. |
| func cycle(ctx context.Context, imageName string, repos repograph.Map, edb events.EventDB, em *events.EventMetrics, lastFinished, now time.Time, ts oauth2.TokenSource, k8sConfigMap map[key]*vcsinfo.LongCommit) ([]metrics2.Int64Metric, error) { |
| sklog.Infof("CD metrics for %s", imageName) |
| // Setup. |
| infraRepo := repos[infraRepoUrl] |
| gcrClient := gcr.NewClient(ts, containerRegistryProject, imageName) |
| resp, err := gcrClient.Tags(ctx) |
| if err != nil { |
| return nil, skerr.Wrapf(err, "failed to retrieve Docker image data") |
| } |
| sklog.Infof(" Found %d docker images for %s", len(resp.Manifest), imageName) |
| |
| // Create a mapping of shortened hash to commit details for easy retrieval. |
| timeWindowStart := lastFinished.Add(-overlapDuration) |
| commits, err := infraRepo.GetCommitsNewerThan(timeWindowStart) |
| if err != nil { |
| return nil, skerr.Wrapf(err, "failed to retrieve commits") |
| } |
| commitMap := make(map[string]*vcsinfo.LongCommit, len(commits)) |
| for _, c := range commits { |
| commitMap[c.Hash[:commitHashLength]] = c |
| } |
| sklog.Infof(" Found %d commits since %s.", len(commitMap), timeWindowStart) |
| |
| // Go through the Docker images we've uploaded and map them to the commits |
| // from which they were built. |
| commitToDockerImageDigest := make(map[*vcsinfo.LongCommit]string, len(commitMap)) |
| commitToDockerImageTime := make(map[*vcsinfo.LongCommit]time.Time, len(commitMap)) |
| for digest, manifest := range resp.Manifest { |
| for _, tag := range manifest.Tags { |
| m := gcr.DockerTagRegex.FindStringSubmatch(tag) |
| if len(m) != 5 { |
| continue |
| } |
| timeStr := m[1] |
| user := m[2] |
| hash := m[3] |
| state := m[4] |
| |
| // We only care about clean builds generated by our CD system. |
| if user != louhiUser || state != repoStateClean { |
| continue |
| } |
| |
| // We only care about commits in the specified time range. |
| commit, ok := commitMap[hash] |
| if !ok { |
| continue |
| } |
| |
| // Record the digest and creation time of the Docker image for the |
| // current commit. Use the timestamp from the tag instead of the |
| // timestamp on the Docker image itself, because some commits may |
| // generate the same digest as previous a commit, and in those cases |
| // using the timestamp of the image would paint a misleading picture |
| // of the latency between commit time and Docker image creation. |
| dockerImageTime, err := time.Parse("2006-01-02T15_04_05Z", timeStr) |
| if err != nil { |
| sklog.Errorf("Invalid timestamp in tag %q: %s", tag, err) |
| continue |
| } |
| if existingTs, ok := commitToDockerImageTime[commit]; !ok || dockerImageTime.Before(existingTs) { |
| commitToDockerImageDigest[commit] = digest |
| commitToDockerImageTime[commit] = dockerImageTime |
| } |
| } |
| } |
| |
| // Create an EventDB event for each commit. Produce metrics for individual |
| // commits. |
| newMetrics := make([]metrics2.Int64Metric, 0, len(commits)) |
| for _, commit := range commits { |
| digest := commitToDockerImageDigest[commit] |
| k8sConfigHash := "" |
| k8sConfigTime := time.Time{} |
| if digest != "" { |
| k := key{ |
| imageName: imageName, |
| digest: digest, |
| } |
| commit, ok := k8sConfigMap[k] |
| if ok { |
| k8sConfigHash = commit.Hash |
| k8sConfigTime = commit.Timestamp |
| } |
| } |
| |
| data := Event{ |
| CommitHash: commit.Hash, |
| CommitTime: commit.Timestamp, |
| DockerImageDigest: digest, |
| DockerImageTime: commitToDockerImageTime[commit], |
| K8sConfigHash: k8sConfigHash, |
| K8sConfigTime: k8sConfigTime, |
| } |
| var buf bytes.Buffer |
| if err := gob.NewEncoder(&buf).Encode(&data); err != nil { |
| return nil, skerr.Wrapf(err, "failed to encode event") |
| } |
| ev := &events.Event{ |
| Stream: fmtStream(infraRepoUrl, imageName), |
| Timestamp: commit.Timestamp, |
| Data: buf.Bytes(), |
| } |
| if err := edb.Insert(ev); err != nil { |
| return nil, skerr.Wrapf(err, "failed to insert event") |
| } |
| |
| // Add other metrics. |
| |
| // Latency between commit landing and docker image built. We have the |
| // EventDB to give us aggregate metrics, so we'll just use this gauge |
| // for alerts in the case where the latency for a given commit is too |
| // high. Therefore, we don't need this if we've already built the Docker |
| // image. |
| if digest == "" { |
| tags := map[string]string{ |
| "commit": commit.Hash, |
| "image": imageName, |
| } |
| m := metrics2.GetInt64Metric(measurementImageLatency, tags) |
| m.Update(int64(now.Sub(data.CommitTime).Seconds())) |
| newMetrics = append(newMetrics, m) |
| } |
| |
| // Latency between commit landing and k8s-config commit landing with the |
| // new digest. We have the EventDB to give us aggregate metrics, so |
| // we'll just use this gauge for alerts in the case where the latency |
| // for a given commit is too high. Therefore, we don't need this if we |
| // already have a k8sConfigHash. |
| if k8sConfigHash == "" { |
| tags := map[string]string{ |
| "commit": commit.Hash, |
| "image": imageName, |
| } |
| m := metrics2.GetInt64Metric(measurementK8sConfigLatency, tags) |
| m.Update(int64(now.Sub(data.CommitTime).Seconds())) |
| newMetrics = append(newMetrics, m) |
| } |
| } |
| |
| return newMetrics, nil |
| } |
| |
// Event is the gob-encoded payload of an entry in the EventDB. One Event is
// recorded per commit and captures the timestamps needed to compute how long
// each stage of the CD pipeline took for that commit.
type Event struct {
	// CommitHash and CommitTime identify the commit and when it landed.
	CommitHash string
	CommitTime time.Time

	// DockerImageDigest and DockerImageTime describe the Docker image built
	// from the commit. The digest is empty and the time is zero if no image
	// has been found.
	DockerImageDigest string
	DockerImageTime   time.Time

	// K8sConfigHash and K8sConfigTime describe the k8s-config commit
	// associated with the Docker image digest. The hash is empty and the time
	// is zero if no such commit is known.
	K8sConfigHash string
	K8sConfigTime time.Time
}
| |
| // addAggregateMetrics adds aggregate metrics to the stream. |
| func addAggregateMetrics(s *events.EventStream, imageName string, period time.Duration) error { |
| // Docker image build latency. |
| if err := s.AggregateMetric(map[string]string{ |
| "image": imageName, |
| "metric": "image_build_latency", |
| }, period, func(ev []*events.Event) (float64, error) { |
| totalLatency := float64(0) |
| totalEvents := 0 |
| for _, e := range ev { |
| var data Event |
| if err := gob.NewDecoder(bytes.NewReader(e.Data)).Decode(&data); err != nil { |
| return 0.0, skerr.Wrap(err) |
| } |
| if data.DockerImageDigest != "" { |
| totalLatency += float64(data.DockerImageTime.Sub(data.CommitTime)) |
| totalEvents++ |
| } |
| } |
| return totalLatency / float64(totalEvents), nil |
| }); err != nil { |
| return skerr.Wrap(err) |
| } |
| |
| // k8s-config commit with updated Docker image latency. |
| if err := s.AggregateMetric(map[string]string{ |
| "image": imageName, |
| "metric": "k8s_config_latency", |
| }, period, func(ev []*events.Event) (float64, error) { |
| totalLatency := float64(0) |
| totalEvents := 0 |
| for _, e := range ev { |
| var data Event |
| if err := gob.NewDecoder(bytes.NewReader(e.Data)).Decode(&data); err != nil { |
| return 0.0, skerr.Wrap(err) |
| } |
| if data.K8sConfigHash != "" { |
| totalLatency += float64(data.K8sConfigTime.Sub(data.CommitTime)) |
| totalEvents++ |
| } |
| } |
| return totalLatency / float64(totalEvents), nil |
| }); err != nil { |
| return skerr.Wrap(err) |
| } |
| |
| return nil |
| } |
| |
| // getLastIngestionTs returns the timestamp of the last commit for which we |
| // successfully ingested events. |
| func getLastIngestionTs(edb events.EventDB, imageName string) (time.Time, error) { |
| timeEnd := time.Now() |
| window := time.Hour |
| for { |
| timeStart := timeEnd.Add(-window) |
| var latest time.Time |
| ev, err := edb.Range(fmtStream(infraRepoUrl, imageName), timeStart, timeEnd) |
| if err != nil { |
| return beginningOfTime, err |
| } |
| if len(ev) > 0 { |
| ts := ev[len(ev)-1].Timestamp |
| if ts.After(latest) { |
| latest = ts |
| } |
| } |
| if !util.TimeIsZero(latest) { |
| return latest, nil |
| } |
| if timeStart.Before(beginningOfTime) { |
| return beginningOfTime, nil |
| } |
| window *= 2 |
| timeEnd = timeStart |
| } |
| } |
| |
| // updateK8sConfigCache determines which digests of which images were used by |
| // which services at each commit in the given time range and updates the cache. |
| func updateK8sConfigCache(ctx context.Context, repos repograph.Map, gitilesRepo gitiles.GitilesRepo, timeWindowStart time.Time, oldCache map[string][]*key) (map[string][]*key, error) { |
| defer metrics2.FuncTimer().Stop() |
| |
| k8sConfigRepo := repos[k8sConfigRepoUrl] |
| k8sConfigCommits, err := k8sConfigRepo.GetCommitsNewerThan(timeWindowStart) |
| if err != nil { |
| return nil, skerr.Wrapf(err, "failed to retrieve k8s-config commits") |
| } |
| |
| sklog.Infof("Loading k8s-config data for %d commits", len(k8sConfigCommits)) |
| |
| eg, ctx := errgroup.WithContext(ctx) |
| commitsMap := make(map[string]bool, len(k8sConfigCommits)) |
| var mtx sync.Mutex |
| newCache := map[string][]*key{} |
| for _, commit := range k8sConfigCommits { |
| commit := commit // https://golang.org/doc/faq#closures_and_goroutines |
| commitsMap[commit.Hash] = true |
| |
| // If we already have cached data for this commit, use that. |
| if cachedValue, ok := oldCache[commit.Hash]; ok { |
| newCache[commit.Hash] = cachedValue |
| continue |
| } |
| |
| newCache[commit.Hash] = []*key{} |
| |
| files, err := gitilesRepo.ListFilesRecursiveAtRef(ctx, ".", commit.Hash) |
| if err != nil { |
| return nil, skerr.Wrapf(err, "failed to retrieve files at %s", commit.Hash) |
| } |
| for _, file := range files { |
| inCluster := false |
| for _, cluster := range clusters { |
| if strings.HasPrefix(file, cluster) { |
| inCluster = true |
| } |
| } |
| if !inCluster { |
| continue |
| } |
| if !(strings.HasSuffix(file, ".yml") || strings.HasSuffix(file, ".yaml")) { |
| continue |
| } |
| |
| file := file // https://golang.org/doc/faq#closures_and_goroutines |
| eg.Go(func() error { |
| // Read and parse the config file. |
| contents, err := gitilesRepo.ReadFileAtRef(ctx, file, commit.Hash) |
| if err != nil { |
| return skerr.Wrapf(err, "failed to read %s at %s", file, commit.Hash) |
| } |
| k8sConfigs, _, err := k8s_config.ParseK8sConfigFile(contents) |
| if err != nil { |
| return skerr.Wrapf(err, "failed to parse %s", file) |
| } |
| |
| // Gather all containers across all deployments, statefulsets, |
| // and cron jobs. |
| containers := []v1.Container{} |
| containers = append(containers) |
| for _, config := range k8sConfigs.Deployment { |
| containers = append(containers, config.Spec.Template.Spec.Containers...) |
| } |
| for _, config := range k8sConfigs.StatefulSet { |
| containers = append(containers, config.Spec.Template.Spec.Containers...) |
| } |
| for _, config := range k8sConfigs.CronJob { |
| containers = append(containers, config.Spec.JobTemplate.Spec.Template.Spec.Containers...) |
| } |
| for _, config := range k8sConfigs.DaemonSet { |
| containers = append(containers, config.Spec.Template.Spec.Containers...) |
| } |
| |
| // Find the images used by each container. |
| for _, container := range containers { |
| imageSplit := strings.Split(container.Image, "@") |
| if len(imageSplit) != 2 { |
| // Image is probably not specified with sha256 digest; |
| // skip it and move on. |
| continue |
| } |
| imagePathSplit := strings.Split(imageSplit[0], "/") |
| |
| // Note: this doesn't take service name into account. |
| // The effect on metrics will be that we'll only care |
| // about the first time a given digest appears in the |
| // k8s-config repo, regardless of which service it's |
| // associated with. This means we won't be able to |
| // alert if a given service doesn't have its digest |
| // updated for whatever reason. It's unlikely that this |
| // will be a problem; our CD pipeline is set up to |
| // update all instances of a given image in a single CL, |
| // so if we're unable to update a single service, we'll |
| // likely fail to update all of them, and the alert will |
| // trigger. |
| k := &key{ |
| imageName: imagePathSplit[len(imagePathSplit)-1], |
| digest: imageSplit[1], |
| } |
| mtx.Lock() |
| newCache[commit.Hash] = append(newCache[commit.Hash], k) |
| mtx.Unlock() |
| } |
| return nil |
| }) |
| } |
| } |
| sklog.Info("Waiting for goroutines...") |
| if err := eg.Wait(); err != nil { |
| return nil, skerr.Wrap(err) |
| } |
| |
| return newCache, nil |
| } |
| |
// key identifies one build of a Docker image by image name and digest. It is
// a comparable value type and is used as a map key.
type key struct {
	// imageName is the name of the image.
	imageName string
	// digest is the digest of the image.
	digest string
}
| |
| func getK8sConfigMapping(ctx context.Context, repos repograph.Map, gitilesRepo gitiles.GitilesRepo, timeWindowStart time.Time, cache map[string][]*key) map[key]*vcsinfo.LongCommit { |
| k8sConfigRepo := repos[k8sConfigRepoUrl] |
| |
| // Use the cached data to create the return value, which is in a more |
| // convenient format for metrics. |
| // map[key]<earliest commit using digest> |
| rv := map[key]*vcsinfo.LongCommit{} |
| for commitHash, keys := range cache { |
| commit := k8sConfigRepo.Get(commitHash).LongCommit |
| for _, k := range keys { |
| if prevCommit, ok := rv[*k]; !ok || prevCommit.Timestamp.Before(commit.Timestamp) { |
| rv[*k] = commit |
| } |
| } |
| } |
| |
| return rv |
| } |
| |
| // Start initiates the metrics data generation for Docker images. |
| func Start(ctx context.Context, imageNames []string, btConf *bt_gitstore.BTConfig, ts oauth2.TokenSource, fsProject, fsInstance, pubsubProject string, local bool) error { |
| // Set up event metrics. |
| edb, err := events.NewBTEventDB(ctx, btConf.ProjectID, btConf.InstanceID, ts) |
| if err != nil { |
| return skerr.Wrapf(err, "Failed to create EventDB") |
| } |
| em, err := events.NewEventMetrics(edb, "cd_pipeline") |
| if err != nil { |
| return skerr.Wrapf(err, "failed to create EventMetrics") |
| } |
| repos, err := bt_gitstore.NewBTGitStoreMap(ctx, repoUrls, btConf) |
| if err != nil { |
| sklog.Fatal(err) |
| } |
| |
| httpClient := httputils.DefaultClientConfig().WithTokenSource(ts).Client() |
| k8sConfigGitiles := gitiles.NewRepo(k8sConfigRepoUrl, httpClient) |
| |
| // Find the timestamp of the last-ingested commit. |
| lastFinished := time.Now() |
| for _, imageName := range imageNames { |
| s := em.GetEventStream(fmtStream(infraRepoUrl, imageName)) |
| for _, p := range timePeriods { |
| if err := addAggregateMetrics(s, imageName, p); err != nil { |
| return skerr.Wrapf(err, "failed to add metric") |
| } |
| } |
| lastFinishedForImage, err := getLastIngestionTs(edb, imageName) |
| if err != nil { |
| return skerr.Wrapf(err, "failed to get timestamp of last successful ingestion") |
| } |
| if lastFinishedForImage.Before(lastFinished) { |
| lastFinished = lastFinishedForImage |
| } |
| } |
| |
| // Start ingesting data. |
| lv := metrics2.NewLiveness("last_successful_cd_pipeline_metrics") |
| oldMetrics := map[metrics2.Int64Metric]struct{}{} |
| |
| k8sConfigCache := map[string][]*key{} |
| k8sConfigCacheMtx := sync.RWMutex{} |
| |
| // This goroutine updates the k8sConfigCache. |
| go util.RepeatCtx(ctx, 2*time.Minute, func(ctx context.Context) { |
| sklog.Infof("Updating k8s-config cache.") |
| |
| now := time.Now() |
| timeWindowStart := now.Add(-overlapDuration) |
| |
| k8sConfigCacheMtx.RLock() |
| newK8sConfigCache, err := updateK8sConfigCache(ctx, repos, k8sConfigGitiles, timeWindowStart, k8sConfigCache) |
| k8sConfigCacheMtx.RUnlock() |
| if err != nil { |
| sklog.Errorf("Failed to update k8s-config cache: %s", err) |
| return |
| } |
| k8sConfigCacheMtx.Lock() |
| defer k8sConfigCacheMtx.Unlock() |
| k8sConfigCache = newK8sConfigCache |
| sklog.Infof("Done updating k8s-config cache.") |
| }) |
| |
| // This goroutine updates metrics. |
| go util.RepeatCtx(ctx, 2*time.Minute, func(ctx context.Context) { |
| sklog.Infof("CD metrics loop start.") |
| |
| // These repos aren't shared with the rest of Datahopper, so we need to |
| // update them. |
| if err := repos.Update(ctx); err != nil { |
| sklog.Errorf("Failed to update repos: %s", err) |
| return |
| } |
| |
| now := time.Now() |
| timeWindowStart := now.Add(-overlapDuration) |
| k8sConfigCacheMtx.RLock() |
| k8sConfigMap := getK8sConfigMapping(ctx, repos, k8sConfigGitiles, timeWindowStart, k8sConfigCache) |
| k8sConfigCacheMtx.RUnlock() |
| |
| anyFailed := false |
| newMetrics := map[metrics2.Int64Metric]struct{}{} |
| for _, imageName := range imageNames { |
| m, err := cycle(ctx, imageName, repos, edb, em, lastFinished, now, ts, k8sConfigMap) |
| if err != nil { |
| sklog.Errorf("Failed to obtain CD pipeline metrics: %s", err) |
| anyFailed = true |
| } |
| for _, metric := range m { |
| newMetrics[metric] = struct{}{} |
| } |
| } |
| if !anyFailed { |
| lastFinished = now |
| lv.Reset() |
| |
| // Delete any metrics which we haven't generated again. |
| for m := range oldMetrics { |
| if _, ok := newMetrics[m]; !ok { |
| if err := m.Delete(); err != nil { |
| sklog.Warningf("Failed to delete metric: %s", err) |
| // If we failed to delete the metric, add it to the |
| // "new" metrics list, so that we'll carry it over and |
| // try again on the next cycle. |
| newMetrics[m] = struct{}{} |
| } |
| } |
| } |
| oldMetrics = newMetrics |
| } |
| sklog.Infof("CD metrics loop end.") |
| }) |
| em.Start(ctx) |
| |
| // Start ingestion and metrics for last Louhi flow result for each flow. |
| db, err := firestore.NewDB(ctx, fsProject, "datahopper", fsInstance) |
| if err != nil { |
| return skerr.Wrapf(err, "failed to create Louhi DB") |
| } |
| louhiHttpClient := httputils.DefaultClientConfig().WithTokenSource(ts).Client() |
| g, err := gerrit.NewGerrit("https://skia-review.googlesource.com", louhiHttpClient) |
| if err != nil { |
| return skerr.Wrapf(err, "failed to create Gerrit client") |
| } |
| louhiRepos := []gitiles.GitilesRepo{ |
| gitiles.NewRepo("https://skia.googlesource.com/buildbot.git", louhiHttpClient), |
| gitiles.NewRepo("https://skia.googlesource.com/k8s-config.git", louhiHttpClient), |
| gitiles.NewRepo("https://skia.googlesource.com/skia.git", louhiHttpClient), |
| gitiles.NewRepo("https://skia.googlesource.com/skia-autoroll-internal-config.git", louhiHttpClient), |
| } |
| if err := pubsub.ListenPubSub(ctx, db, local, pubsubProject, g, louhiRepos); err != nil { |
| return skerr.Wrapf(err, "failed to initiate Louhi pub/sub listener") |
| } |
| lvFlowResults := metrics2.NewLiveness("last_successful_louhi_flow_metrics") |
| go util.RepeatCtx(ctx, time.Minute, func(ctx context.Context) { |
| latestFlowExecs, err := db.GetLatestFlowExecutions(ctx) |
| if err != nil { |
| sklog.Errorf("Failed to get latest flow executions: %s", err) |
| return |
| } |
| for flowName, flow := range latestFlowExecs { |
| result := int64(1) |
| if flow.Result == louhi.FlowResultFailure { |
| result = 0 |
| } |
| sklog.Infof("Flow %q has success == %d at %s (flows/%s/executions/%s)", flowName, result, flow.CreatedAt, flow.FlowID, flow.ID) |
| metrics2.GetInt64Metric(louhiFlowSuccessMetric, map[string]string{ |
| "flow_name": flowName, |
| "flow_id": flow.FlowID, |
| "louhi_project": flow.ProjectID, |
| }).Update(result) |
| } |
| lvFlowResults.Reset() |
| }) |
| |
| return nil |
| } |
| |
// fmtStream returns the name of an event stream given a repo URL and image
// name. The name has the form "cd-commits-<repo>-<image>", where <repo> is the
// part of the URL after the final "/" with any ".git" suffix removed.
func fmtStream(repo, imageName string) string {
	// LastIndex returns -1 when there is no "/", in which case the whole
	// string is used.
	base := repo[strings.LastIndex(repo, "/")+1:]
	return fmt.Sprintf("cd-commits-%s-%s", strings.TrimSuffix(base, ".git"), imageName)
}