package cd_metrics
import (
"bytes"
"context"
"encoding/gob"
"fmt"
"strings"
"time"
"go.skia.org/infra/go/auth"
"go.skia.org/infra/go/gcr"
"go.skia.org/infra/go/git/repograph"
"go.skia.org/infra/go/metrics2"
"go.skia.org/infra/go/metrics2/events"
"go.skia.org/infra/go/skerr"
"go.skia.org/infra/go/sklog"
"go.skia.org/infra/go/util"
"go.skia.org/infra/go/vcsinfo"
"golang.org/x/oauth2"
)
const (
// containerRegistryProject is the GCP project whose Container Registry
// hosts the Docker images built by the CD pipeline.
containerRegistryProject = "skia-public"
// louhiUser is the user name embedded in the tags of Docker images built
// by Louhi, our CD system.
louhiUser = "louhi"
// repoStateClean indicates that an image was built from a clean checkout.
repoStateClean = "clean"
// commitHashLength is the number of characters of the commit hash which
// are embedded in image tags.
commitHashLength = 7
infraRepoUrl = "https://skia.googlesource.com/buildbot.git"
k8sConfigRepoUrl = "https://skia.googlesource.com/k8s-config.git"
// measurementImageLatency is the name of the metric which measures the
// latency between a commit landing and its Docker image being built.
measurementImageLatency = "cd_image_build_latency_s"
// overlapDuration indicates how long to extend the time range past the time
// at which we last finished ingesting data. This allows us to revisit
// commits for which the CD pipeline may not have finished. Its value
// should be longer than we ever expect the CD pipeline to take.
overlapDuration = 6 * time.Hour
)
var (
// beginningOfTime is considered to be the earliest time from which we'll
// ingest data. The CD pipeline didn't exist before this point, so there's
// no reason to load earlier data.
beginningOfTime = time.Date(2022, time.June, 13, 0, 0, 0, 0, time.UTC)
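// timePeriods are the time periods over which aggregate metrics are
// computed.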
timePeriods = []time.Duration{24 * time.Hour, 7 * 24 * time.Hour}
)
// cycle performs one cycle of metrics ingestion for the given Docker image.
func cycle(ctx context.Context, imageName string, repos repograph.Map, edb events.EventDB, em *events.EventMetrics, lastFinished, now time.Time) ([]metrics2.Int64Metric, error) {
sklog.Infof("Cycle.")
// Setup.
infraRepo := repos[infraRepoUrl]
if infraRepo == nil {
return nil, skerr.Fmt("no repograph.Graph was provided for %s", infraRepoUrl)
}
// TODO: read the k8s-config repo in order to fill in the K8sConfigHash
// and K8sConfigTime fields on the events below.
//k8sConfigRepo := repos[k8sConfigRepoUrl]
ts := auth.NewGCloudTokenSource(containerRegistryProject)
gcrClient := gcr.NewClient(ts, containerRegistryProject, imageName)
resp, err := gcrClient.Tags(ctx)
if err != nil {
return nil, skerr.Wrapf(err, "failed to retrieve Docker image data")
}
sklog.Infof("Found %d docker images.", len(resp.Manifest))
// Create a mapping of shortened hash to commit details for easy retrieval.
commits, err := infraRepo.GetCommitsNewerThan(lastFinished.Add(-overlapDuration))
if err != nil {
return nil, skerr.Wrapf(err, "failed to retrieve commits")
}
commitMap := make(map[string]*vcsinfo.LongCommit, len(commits))
for _, c := range commits {
commitMap[c.Hash[:commitHashLength]] = c
}
sklog.Infof("Found %d commits since %s.", len(commitMap), lastFinished.Add(-overlapDuration))
// Go through the Docker images we've uploaded and map them to the commits
// from which they were built.
commitToDockerImageDigest := make(map[*vcsinfo.LongCommit]string, len(commitMap))
commitToDockerImageTime := make(map[*vcsinfo.LongCommit]time.Time, len(commitMap))
for digest, manifest := range resp.Manifest {
for _, tag := range manifest.Tags {
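// Tags produced by the CD pipeline encode build metadata. Based on the
// parsing below they look roughly like
// "2022-06-13T15_04_05Z-louhi-abc1234-clean", ie. timestamp, user, short
// commit hash, and repo state; gcr.DockerTagRegex is the authoritative
// definition of the format.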
m := gcr.DockerTagRegex.FindStringSubmatch(tag)
if len(m) != 5 {
continue
}
timeStr := m[1]
user := m[2]
hash := m[3]
state := m[4]
// We only care about clean builds generated by our CD system.
if user != louhiUser || state != repoStateClean {
continue
}
// We only care about commits in the specified time range.
commit, ok := commitMap[hash]
if !ok {
continue
}
// Record the digest and creation time of the Docker image for the
// current commit. Use the timestamp from the tag instead of the
// timestamp on the Docker image itself, because some commits may
// generate the same digest as a previous commit, and in those cases
// using the timestamp of the image would paint a misleading picture
// of the latency between commit time and Docker image creation.
dockerImageTime, err := time.Parse("2006-01-02T15_04_05Z", timeStr)
if err != nil {
sklog.Errorf("Invalid timestamp in tag %q: %s", tag, err)
continue
}
if existingTs, ok := commitToDockerImageTime[commit]; !ok || dockerImageTime.Before(existingTs) {
commitToDockerImageDigest[commit] = digest
commitToDockerImageTime[commit] = dockerImageTime
}
}
}
// Create an EventDB event for each commit. Produce metrics for individual
// commits.
newMetrics := make([]metrics2.Int64Metric, 0, len(commits))
for _, commit := range commits {
// Create the event and insert it into the DB.
// If no Docker image has been built for this commit yet, use the current
// time so that the reported latency keeps growing until the image
// appears.
dockerImageTime, ok := commitToDockerImageTime[commit]
if !ok {
dockerImageTime = time.Now()
}
logStr := fmt.Sprintf("%s: %s", commit.Hash[:commitHashLength], dockerImageTime.Sub(commit.Timestamp))
if ok {
logStr += fmt.Sprintf(" (%s)", commitToDockerImageDigest[commit])
}
sklog.Info(logStr)
data := Event{
CommitHash: commit.Hash,
CommitTime: commit.Timestamp,
DockerImageDigest: commitToDockerImageDigest[commit],
DockerImageTime: commitToDockerImageTime[commit],
K8sConfigHash: "", // TODO
K8sConfigTime: time.Time{}, // TODO
}
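// The EventDB stores event data as opaque bytes, so gob-encode the event.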
var buf bytes.Buffer
if err := gob.NewEncoder(&buf).Encode(&data); err != nil {
return nil, skerr.Wrapf(err, "failed to encode event")
}
ev := &events.Event{
Stream: fmtStream(infraRepoUrl, imageName),
Timestamp: commit.Timestamp,
Data: buf.Bytes(),
}
if err := edb.Insert(ev); err != nil {
return nil, skerr.Wrapf(err, "failed to insert event")
}
// Add other metrics.
{
// Latency between commit landing and docker image built.
tags := map[string]string{
"commit": commit.Hash,
"image": imageName,
}
m := metrics2.GetInt64Metric(measurementImageLatency, tags)
m.Update(int64(dockerImageTime.Sub(data.CommitTime).Seconds()))
newMetrics = append(newMetrics, m)
}
}
return newMetrics, nil
}
// Event is an entry in the EventDB which details the time taken by each
// stage of the CD pipeline for a single commit.
type Event struct {
CommitHash string
CommitTime time.Time
DockerImageDigest string
DockerImageTime time.Time
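// K8sConfigHash and K8sConfigTime are not yet populated; see the TODO in
// cycle.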
K8sConfigHash string
K8sConfigTime time.Time
}
// addAggregateMetrics registers a metric on the given stream which reports
// the mean latency between commits landing and their Docker images being
// built, aggregated over the given time period.
func addAggregateMetrics(s *events.EventStream, imageName string, period time.Duration) error {
if err := s.AggregateMetric(map[string]string{
"image": imageName,
"metric": "image_build_latency",
}, period, func(ev []*events.Event) (float64, error) {
// Guard against division by zero below when there are no events in
// the period.
if len(ev) == 0 {
return 0.0, nil
}
// Note: float64(time.Duration) is a count of nanoseconds, so the mean
// latency returned below is also in nanoseconds.
totalLatency := float64(0)
for _, e := range ev {
var data Event
if err := gob.NewDecoder(bytes.NewReader(e.Data)).Decode(&data); err != nil {
return 0.0, skerr.Wrap(err)
}
totalLatency += float64(data.DockerImageTime.Sub(data.CommitTime))
}
return totalLatency / float64(len(ev)), nil
}); err != nil {
return skerr.Wrap(err)
}
return nil
}
// getLastIngestionTs returns the timestamp of the last commit for which we
// successfully ingested events.
func getLastIngestionTs(edb events.EventDB, imageName string) (time.Time, error) {
timeEnd := time.Now()
window := time.Hour
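// Search backward through time in exponentially-growing windows until we
// find the most recent event or reach beginningOfTime.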
for {
timeStart := timeEnd.Add(-window)
ev, err := edb.Range(fmtStream(infraRepoUrl, imageName), timeStart, timeEnd)
if err != nil {
return beginningOfTime, err
}
// Events are returned in chronological order, so the most recent event
// in the window is the last element.
if len(ev) > 0 {
return ev[len(ev)-1].Timestamp, nil
}
if timeStart.Before(beginningOfTime) {
return beginningOfTime, nil
}
window *= 2
timeEnd = timeStart
}
}
// Start initiates ingestion of CD pipeline metrics for the given Docker
// images. It starts a background goroutine and returns immediately; errors
// encountered during ingestion are logged rather than returned.
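//
// A minimal usage sketch (hypothetical image name and BigTable config):
//
//	if err := cd_metrics.Start(ctx, repos, []string{"my-image"}, "my-bt-project", "my-bt-instance", tokenSource); err != nil {
//		sklog.Fatal(err)
//	}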
func Start(ctx context.Context, repos repograph.Map, imageNames []string, btProject, btInstance string, ts oauth2.TokenSource) error {
// Set up event metrics.
edb, err := events.NewBTEventDB(ctx, btProject, btInstance, ts)
if err != nil {
return skerr.Wrapf(err, "Failed to create EventDB")
}
em, err := events.NewEventMetrics(edb, "cd_pipeline")
if err != nil {
return skerr.Wrapf(err, "failed to create EventMetrics")
}
// Find the timestamp of the last-ingested commit, ie. the earliest of the
// per-image last-ingestion timestamps.
lastFinished := time.Now()
for _, imageName := range imageNames {
s := em.GetEventStream(fmtStream(infraRepoUrl, imageName))
for _, p := range timePeriods {
if err := addAggregateMetrics(s, imageName, p); err != nil {
return skerr.Wrapf(err, "failed to add metric")
}
}
lastFinishedForImage, err := getLastIngestionTs(edb, imageName)
if err != nil {
return skerr.Wrapf(err, "failed to get timestamp of last successful ingestion")
}
if lastFinishedForImage.Before(lastFinished) {
lastFinished = lastFinishedForImage
}
}
// Start ingesting data.
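// The liveness metric is only reset after a fully-successful cycle, so it
// can be used to detect when ingestion stops making progress.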
lv := metrics2.NewLiveness("last_successful_cd_pipeline_metrics")
oldMetrics := []metrics2.Int64Metric{}
go util.RepeatCtx(ctx, 10*time.Minute, func(ctx context.Context) {
sklog.Infof("Loop start.")
// TODO(borenet): Is this handled elsewhere in Datahopper?
if err := repos.Update(ctx); err != nil {
sklog.Errorf("Failed to update repos: %s", err)
return
}
now := time.Now()
anyFailed := false
newMetrics := []metrics2.Int64Metric{}
for _, imageName := range imageNames {
m, err := cycle(ctx, imageName, repos, edb, em, lastFinished, now)
if err != nil {
sklog.Errorf("Failed to obtain CD pipeline metrics: %s", err)
anyFailed = true
}
newMetrics = append(newMetrics, m...)
}
if !anyFailed {
lastFinished = now
lv.Reset()
// Delete any metrics which we haven't generated again.
metricsMap := make(map[metrics2.Int64Metric]struct{}, len(newMetrics))
for _, m := range newMetrics {
metricsMap[m] = struct{}{}
}
for _, m := range oldMetrics {
if _, ok := metricsMap[m]; !ok {
if err := m.Delete(); err != nil {
sklog.Warningf("Failed to delete metric: %s", err)
// If we failed to delete the metric, add it to the
// "new" metrics list, so that we'll carry it over and
// try again on the next cycle.
newMetrics = append(newMetrics, m)
}
}
}
// Remember this cycle's metrics so that any which become stale can be
// deleted on the next cycle.
oldMetrics = newMetrics
}
sklog.Infof("Loop end.")
})
return nil
}
// fmtStream returns the name of an event stream for the given repo URL and
// image name, eg. "cd-commits-buildbot-myimage" for image "myimage" built
// from the buildbot repo.
func fmtStream(repo, imageName string) string {
split := strings.Split(repo, "/")
repoName := strings.TrimSuffix(split[len(split)-1], ".git")
return fmt.Sprintf("cd-commits-%s-%s", repoName, imageName)
}