package cd_metrics
// Package cd_metrics ingests data about commits and Docker images to produce
// metrics, for example the latency between a commit landing and a Docker image
// being built for it.
import (
v1 ""
// Configuration for CD metrics ingestion.
const (
// containerRegistryProject is the project passed to gcr.NewClient when
// listing the tags of a Docker image.
containerRegistryProject = "skia-public"
// louhiUser and repoStateClean are the user and repo-state components which
// a Docker image tag must carry for cycle to consider the image.
louhiUser = "louhi"
repoStateClean = "clean"
// commitHashLength is the length of the commit hash prefix used to match
// the hash found in a Docker image tag to a commit.
commitHashLength = 7
// infraRepoUrl and k8sConfigRepoUrl are the keys used to look up the two
// repos in a repograph.Map. infraRepoUrl is also used to name the event
// stream (see fmtStream).
// NOTE(review): both values are empty here, which makes the two keys
// identical - confirm the intended repo URLs.
infraRepoUrl = ""
k8sConfigRepoUrl = ""
// measurementImageLatency and measurementK8sConfigLatency are the names of
// the per-commit gauges created by cycle.
measurementImageLatency = "cd_image_build_latency_s"
measurementK8sConfigLatency = "cd_k8s_config_latency_s"
// overlapDuration indicates how long to extend the time range past the time
// at which we last finished ingesting data. This allows us to revisit
// commits for which the CD pipeline may not have finished. Its value
// should be longer than we ever expect the CD pipeline to take.
overlapDuration = 6 * time.Hour
// Package-level values shared by the ingestion functions.
var (
// beginningOfTime is considered to be the earliest time from which we'll
// ingest data. The CD pipeline didn't exist before this point, so there's
// no reason to load earlier data.
beginningOfTime = time.Date(2022, time.June, 13, 0, 0, 0, 0, time.UTC)
// timePeriods lists the periods (one day and one week) for which Start
// registers aggregate metrics on each image's event stream.
timePeriods = []time.Duration{24 * time.Hour, 7 * 24 * time.Hour}
// repoUrls lists the repos which Start loads via
// bt_gitstore.NewBTGitStoreMap.
repoUrls = []string{infraRepoUrl, k8sConfigRepoUrl}
// clusters lists the path prefixes which updateK8sConfigCache accepts; files
// whose path does not begin with one of these are not read.
clusters = []string{"skia-infra-corp", "skia-infra-public", "skia-infra-public-dev"}
// cycle performs one cycle of metrics ingestion.
func cycle(ctx context.Context, imageName string, repos repograph.Map, edb events.EventDB, em *events.EventMetrics, lastFinished, now time.Time, ts oauth2.TokenSource, k8sConfigMap map[key]*vcsinfo.LongCommit) ([]metrics2.Int64Metric, error) {
sklog.Infof("CD metrics for %s", imageName)
// Setup.
infraRepo := repos[infraRepoUrl]
gcrClient := gcr.NewClient(ts, containerRegistryProject, imageName)
resp, err := gcrClient.Tags(ctx)
if err != nil {
return nil, skerr.Wrapf(err, "failed to retrieve Docker image data")
sklog.Infof(" Found %d docker images for %s", len(resp.Manifest), imageName)
// Create a mapping of shortened hash to commit details for easy retrieval.
timeWindowStart := lastFinished.Add(-overlapDuration)
commits, err := infraRepo.GetCommitsNewerThan(timeWindowStart)
if err != nil {
return nil, skerr.Wrapf(err, "failed to retrieve commits")
commitMap := make(map[string]*vcsinfo.LongCommit, len(commits))
for _, c := range commits {
commitMap[c.Hash[:commitHashLength]] = c
sklog.Infof(" Found %d commits since %s.", len(commitMap), timeWindowStart)
// Go through the Docker images we've uploaded and map them to the commits
// from which they were built.
commitToDockerImageDigest := make(map[*vcsinfo.LongCommit]string, len(commitMap))
commitToDockerImageTime := make(map[*vcsinfo.LongCommit]time.Time, len(commitMap))
for digest, manifest := range resp.Manifest {
for _, tag := range manifest.Tags {
m := gcr.DockerTagRegex.FindStringSubmatch(tag)
if len(m) != 5 {
timeStr := m[1]
user := m[2]
hash := m[3]
state := m[4]
// We only care about clean builds generated by our CD system.
if user != louhiUser || state != repoStateClean {
// We only care about commits in the specified time range.
commit, ok := commitMap[hash]
if !ok {
// Record the digest and creation time of the Docker image for the
// current commit. Use the timestamp from the tag instead of the
// timestamp on the Docker image itself, because some commits may
// generate the same digest as previous a commit, and in those cases
// using the timestamp of the image would paint a misleading picture
// of the latency between commit time and Docker image creation.
dockerImageTime, err := time.Parse("2006-01-02T15_04_05Z", timeStr)
if err != nil {
sklog.Errorf("Invalid timestamp in tag %q: %s", tag, err)
if existingTs, ok := commitToDockerImageTime[commit]; !ok || dockerImageTime.Before(existingTs) {
commitToDockerImageDigest[commit] = digest
commitToDockerImageTime[commit] = dockerImageTime
// Create an EventDB event for each commit. Produce metrics for individual
// commits.
newMetrics := make([]metrics2.Int64Metric, 0, len(commits))
for _, commit := range commits {
digest := commitToDockerImageDigest[commit]
k8sConfigHash := ""
k8sConfigTime := time.Time{}
if digest != "" {
k := key{
imageName: imageName,
digest: digest,
commit, ok := k8sConfigMap[k]
if ok {
k8sConfigHash = commit.Hash
k8sConfigTime = commit.Timestamp
data := Event{
CommitHash: commit.Hash,
CommitTime: commit.Timestamp,
DockerImageDigest: digest,
DockerImageTime: commitToDockerImageTime[commit],
K8sConfigHash: k8sConfigHash,
K8sConfigTime: k8sConfigTime,
var buf bytes.Buffer
if err := gob.NewEncoder(&buf).Encode(&data); err != nil {
return nil, skerr.Wrapf(err, "failed to encode event")
ev := &events.Event{
Stream: fmtStream(infraRepoUrl, imageName),
Timestamp: commit.Timestamp,
Data: buf.Bytes(),
if err := edb.Insert(ev); err != nil {
return nil, skerr.Wrapf(err, "failed to insert event")
// Add other metrics.
// Latency between commit landing and docker image built. We have the
// EventDB to give us aggregate metrics, so we'll just use this gauge
// for alerts in the case where the latency for a given commit is too
// high. Therefore, we don't need this if we've already built the Docker
// image.
if digest == "" {
tags := map[string]string{
"commit": commit.Hash,
"image": imageName,
m := metrics2.GetInt64Metric(measurementImageLatency, tags)
newMetrics = append(newMetrics, m)
// Latency between commit landing and k8s-config commit landing with the
// new digest. We have the EventDB to give us aggregate metrics, so
// we'll just use this gauge for alerts in the case where the latency
// for a given commit is too high. Therefore, we don't need this if we
// already have a k8sConfigHash.
if k8sConfigHash == "" {
tags := map[string]string{
"commit": commit.Hash,
"image": imageName,
m := metrics2.GetInt64Metric(measurementK8sConfigLatency, tags)
newMetrics = append(newMetrics, m)
return newMetrics, nil
// Event is the payload of an entry in the EventDB. cycle gob-encodes one Event
// per commit, and the aggregate metrics decode it to compute the time taken
// between a commit landing, its Docker image being built, and the k8s-config
// commit which uses that image.
type Event struct {
// CommitHash and CommitTime are the hash and timestamp of the commit.
CommitHash string
CommitTime time.Time
// DockerImageDigest and DockerImageTime describe the Docker image matched to
// the commit. The digest is empty and the time is zero if cycle found no
// matching image.
DockerImageDigest string
DockerImageTime time.Time
// K8sConfigHash and K8sConfigTime are the hash and timestamp of the
// k8s-config commit associated with the image digest. The hash is empty and
// the time is zero if none was found.
K8sConfigHash string
K8sConfigTime time.Time
// addAggregateMetrics adds aggregate metrics to the stream.
func addAggregateMetrics(s *events.EventStream, imageName string, period time.Duration) error {
// Docker image build latency.
if err := s.AggregateMetric(map[string]string{
"image": imageName,
"metric": "image_build_latency",
}, period, func(ev []*events.Event) (float64, error) {
totalLatency := float64(0)
totalEvents := 0
for _, e := range ev {
var data Event
if err := gob.NewDecoder(bytes.NewReader(e.Data)).Decode(&data); err != nil {
return 0.0, skerr.Wrap(err)
if data.DockerImageDigest != "" {
totalLatency += float64(data.DockerImageTime.Sub(data.CommitTime))
return totalLatency / float64(totalEvents), nil
}); err != nil {
return skerr.Wrap(err)
// k8s-config commit with updated Docker image latency.
if err := s.AggregateMetric(map[string]string{
"image": imageName,
"metric": "k8s_config_latency",
}, period, func(ev []*events.Event) (float64, error) {
totalLatency := float64(0)
totalEvents := 0
for _, e := range ev {
var data Event
if err := gob.NewDecoder(bytes.NewReader(e.Data)).Decode(&data); err != nil {
return 0.0, skerr.Wrap(err)
if data.K8sConfigHash != "" {
totalLatency += float64(data.K8sConfigTime.Sub(data.CommitTime))
return totalLatency / float64(totalEvents), nil
}); err != nil {
return skerr.Wrap(err)
return nil
// getLastIngestionTs returns the timestamp of the last commit for which we
// successfully ingested events.
func getLastIngestionTs(edb events.EventDB, imageName string) (time.Time, error) {
timeEnd := time.Now()
window := time.Hour
for {
timeStart := timeEnd.Add(-window)
var latest time.Time
ev, err := edb.Range(fmtStream(infraRepoUrl, imageName), timeStart, timeEnd)
if err != nil {
return beginningOfTime, err
if len(ev) > 0 {
ts := ev[len(ev)-1].Timestamp
if ts.After(latest) {
latest = ts
if !util.TimeIsZero(latest) {
return latest, nil
if timeStart.Before(beginningOfTime) {
return beginningOfTime, nil
window *= 2
timeEnd = timeStart
// updateK8sConfigCache determines which digests of which images were used by
// which services at each commit in the given time range and updates the cache.
func updateK8sConfigCache(ctx context.Context, repos repograph.Map, gitilesRepo gitiles.GitilesRepo, timeWindowStart time.Time, oldCache map[string][]*key) (map[string][]*key, error) {
defer metrics2.FuncTimer().Stop()
k8sConfigRepo := repos[k8sConfigRepoUrl]
k8sConfigCommits, err := k8sConfigRepo.GetCommitsNewerThan(timeWindowStart)
if err != nil {
return nil, skerr.Wrapf(err, "failed to retrieve k8s-config commits")
sklog.Infof("Loading k8s-config data for %d commits", len(k8sConfigCommits))
eg, ctx := errgroup.WithContext(ctx)
commitsMap := make(map[string]bool, len(k8sConfigCommits))
var mtx sync.Mutex
newCache := map[string][]*key{}
for _, commit := range k8sConfigCommits {
commitsMap[commit.Hash] = true
// If we already have cached data for this commit, use that.
if cachedValue, ok := oldCache[commit.Hash]; ok {
newCache[commit.Hash] = cachedValue
newCache[commit.Hash] = []*key{}
files, err := gitilesRepo.ListFilesRecursiveAtRef(ctx, ".", commit.Hash)
if err != nil {
return nil, skerr.Wrapf(err, "failed to retrieve files at %s", commit.Hash)
for _, file := range files {
inCluster := false
for _, cluster := range clusters {
if strings.HasPrefix(file, cluster) {
inCluster = true
if !inCluster {
if !(strings.HasSuffix(file, ".yml") || strings.HasSuffix(file, ".yaml")) {
file := file //
eg.Go(func() error {
// Read and parse the config file.
contents, err := gitilesRepo.ReadFileAtRef(ctx, file, commit.Hash)
if err != nil {
return skerr.Wrapf(err, "failed to read %s at %s", file, commit.Hash)
deployments, statefulSets, cronJobs, err := k8s_config.ParseK8sConfigFile(contents)
if err != nil {
return skerr.Wrapf(err, "failed to parse %s", file)
// Gather all containers across all deployments, statefulsets,
// and cron jobs.
containers := []v1.Container{}
containers = append(containers)
for _, config := range deployments {
containers = append(containers, config.Spec.Template.Spec.Containers...)
for _, config := range statefulSets {
containers = append(containers, config.Spec.Template.Spec.Containers...)
for _, config := range cronJobs {
containers = append(containers, config.Spec.JobTemplate.Spec.Template.Spec.Containers...)
// Find the images used by each container.
for _, container := range containers {
imageSplit := strings.Split(container.Image, "@")
if len(imageSplit) != 2 {
// Image is probably not specified with sha256 digest;
// skip it and move on.
imagePathSplit := strings.Split(imageSplit[0], "/")
// Note: this doesn't take service name into account.
// The effect on metrics will be that we'll only care
// about the first time a given digest appears in the
// k8s-config repo, regardless of which service it's
// associated with. This means we won't be able to
// alert if a given service doesn't have its digest
// updated for whatever reason. It's unlikely that this
// will be a problem; our CD pipeline is set up to
// update all instances of a given image in a single CL,
// so if we're unable to update a single service, we'll
// likely fail to update all of them, and the alert will
// trigger.
k := &key{
imageName: imagePathSplit[len(imagePathSplit)-1],
digest: imageSplit[1],
newCache[commit.Hash] = append(newCache[commit.Hash], k)
return nil
sklog.Info("Waiting for goroutines...")
if err := eg.Wait(); err != nil {
return nil, skerr.Wrap(err)
return newCache, nil
// key identifies a Docker image by name and digest. It is used by value as a
// map key when looking up the k8s-config commit associated with an image.
type key struct {
// imageName is the name of the image; when derived from a k8s config it is
// the last "/"-separated component of the image path preceding the "@".
imageName string
// digest is the image digest; when derived from a k8s config it is the part
// of the image string following the "@".
digest string
func getK8sConfigMapping(ctx context.Context, repos repograph.Map, gitilesRepo gitiles.GitilesRepo, timeWindowStart time.Time, cache map[string][]*key) map[key]*vcsinfo.LongCommit {
k8sConfigRepo := repos[k8sConfigRepoUrl]
// Use the cached data to create the return value, which is in a more
// convenient format for metrics.
// map[key]<earliest commit using digest>
rv := map[key]*vcsinfo.LongCommit{}
for commitHash, keys := range cache {
commit := k8sConfigRepo.Get(commitHash).LongCommit
for _, k := range keys {
if prevCommit, ok := rv[*k]; !ok || prevCommit.Timestamp.Before(commit.Timestamp) {
rv[*k] = commit
return rv
// Start initiates the metrics data generation for Docker images.
func Start(ctx context.Context, imageNames []string, btConf *bt_gitstore.BTConfig, ts oauth2.TokenSource) error {
// Set up event metrics.
edb, err := events.NewBTEventDB(ctx, btConf.ProjectID, btConf.InstanceID, ts)
if err != nil {
return skerr.Wrapf(err, "Failed to create EventDB")
em, err := events.NewEventMetrics(edb, "cd_pipeline")
if err != nil {
return skerr.Wrapf(err, "failed to create EventMetrics")
repos, err := bt_gitstore.NewBTGitStoreMap(ctx, repoUrls, btConf)
if err != nil {
httpClient := httputils.DefaultClientConfig().WithTokenSource(ts).Client()
k8sConfigGitiles := gitiles.NewRepo(k8sConfigRepoUrl, httpClient)
// Find the timestamp of the last-ingested commit.
lastFinished := time.Now()
for _, imageName := range imageNames {
s := em.GetEventStream(fmtStream(infraRepoUrl, imageName))
for _, p := range timePeriods {
if err := addAggregateMetrics(s, imageName, p); err != nil {
return skerr.Wrapf(err, "failed to add metric")
lastFinishedForImage, err := getLastIngestionTs(edb, imageName)
if err != nil {
return skerr.Wrapf(err, "failed to get timestamp of last successful ingestion")
if lastFinishedForImage.Before(lastFinished) {
lastFinished = lastFinishedForImage
// Start ingesting data.
lv := metrics2.NewLiveness("last_successful_cd_pipeline_metrics")
oldMetrics := map[metrics2.Int64Metric]struct{}{}
k8sConfigCache := map[string][]*key{}
k8sConfigCacheMtx := sync.RWMutex{}
// This goroutine updates the k8sConfigCache.
go util.RepeatCtx(ctx, 2*time.Minute, func(ctx context.Context) {
sklog.Infof("Updating k8s-config cache.")
now := time.Now()
timeWindowStart := now.Add(-overlapDuration)
newK8sConfigCache, err := updateK8sConfigCache(ctx, repos, k8sConfigGitiles, timeWindowStart, k8sConfigCache)
if err != nil {
sklog.Errorf("Failed to update k8s-config cache.")
} else {
k8sConfigCache = newK8sConfigCache
sklog.Infof("Done updating k8s-config cache.")
// This goroutine updates metrics.
go util.RepeatCtx(ctx, 2*time.Minute, func(ctx context.Context) {
sklog.Infof("CD metrics loop start.")
// These repos aren't shared with the rest of Datahopper, so we need to
// update them.
if err := repos.Update(ctx); err != nil {
sklog.Errorf("Failed to update repos: %s", err)
now := time.Now()
timeWindowStart := now.Add(-overlapDuration)
k8sConfigMap := getK8sConfigMapping(ctx, repos, k8sConfigGitiles, timeWindowStart, k8sConfigCache)
anyFailed := false
newMetrics := map[metrics2.Int64Metric]struct{}{}
for _, imageName := range imageNames {
m, err := cycle(ctx, imageName, repos, edb, em, lastFinished, now, ts, k8sConfigMap)
if err != nil {
sklog.Errorf("Failed to obtain CD pipeline metrics: %s", err)
anyFailed = true
for _, metric := range m {
newMetrics[metric] = struct{}{}
if !anyFailed {
lastFinished = now
// Delete any metrics which we haven't generated again.
for m := range oldMetrics {
if _, ok := newMetrics[m]; !ok {
if err := m.Delete(); err != nil {
sklog.Warningf("Failed to delete metric: %s", err)
// If we failed to delete the metric, add it to the
// "new" metrics list, so that we'll carry it over and
// try again on the next cycle.
newMetrics[m] = struct{}{}
oldMetrics = newMetrics
sklog.Infof("CD metrics loop end.")
return nil
// fmtStream returns the name of an event stream given a repo URL and image
// name. The repo name is whatever follows the final "/" in the URL, with any
// ".git" suffix removed.
func fmtStream(repo, imageName string) string {
	// LastIndex returns -1 when there is no "/", in which case the whole
	// string is taken as the last segment.
	lastSegment := repo[strings.LastIndex(repo, "/")+1:]
	return fmt.Sprintf("cd-commits-%s-%s", strings.TrimSuffix(lastSegment, ".git"), imageName)
}