[datahopper] Add metrics for Louhi CD flow
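
Datahopper now watches a configurable set of Docker images for tags
produced by the Louhi CD pipeline and records, for each commit, the
latency between the commit landing and the corresponding image being
built, both as per-commit metrics and as aggregate event metrics. The
images to watch are passed via the new repeatable --docker_image flag,
e.g. (image names below are illustrative only):

  datahopper ... --docker_image=autoroll-be --docker_image=datahopper

To support this, gcr.Client.Tags() now returns the full tags/manifest
response and follows Link-header pagination.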

Change-Id: Id44b145b016b9667ae3f66c1fc0dc33ad442902a
Reviewed-on: https://skia-review.googlesource.com/c/buildbot/+/549571
Reviewed-by: Joe Gregorio <jcgregorio@google.com>
Commit-Queue: Eric Boren <borenet@google.com>
diff --git a/autoroll/go/autoroll-pusher/main.go b/autoroll/go/autoroll-pusher/main.go
index 082cb88..96757d4 100644
--- a/autoroll/go/autoroll-pusher/main.go
+++ b/autoroll/go/autoroll-pusher/main.go
@@ -173,10 +173,11 @@
 	if err != nil {
 		return "", skerr.Wrapf(err, "Failed to get latest image for %s; failed to get token source", image)
 	}
-	imageTags, err := gcr.NewClient(ts, GCR_PROJECT, image).Tags()
+	tagsResp, err := gcr.NewClient(ts, GCR_PROJECT, image).Tags(ctx)
 	if err != nil {
 		return "", skerr.Wrapf(err, "Failed to get latest image for %s; failed to get tags", image)
 	}
+	imageTags := tagsResp.Tags
 	sort.Strings(imageTags)
 	if len(imageTags) == 0 {
 		return "", skerr.Fmt("No image tags returned for %s", image)
diff --git a/datahopper/go/cd_metrics/BUILD.bazel b/datahopper/go/cd_metrics/BUILD.bazel
new file mode 100644
index 0000000..bddfa3d
--- /dev/null
+++ b/datahopper/go/cd_metrics/BUILD.bazel
@@ -0,0 +1,20 @@
+load("@io_bazel_rules_go//go:def.bzl", "go_library")
+
+go_library(
+    name = "cd_metrics",
+    srcs = ["cd_metrics.go"],
+    importpath = "go.skia.org/infra/datahopper/go/cd_metrics",
+    visibility = ["//visibility:public"],
+    deps = [
+        "//go/auth",
+        "//go/gcr",
+        "//go/git/repograph",
+        "//go/metrics2",
+        "//go/metrics2/events",
+        "//go/skerr",
+        "//go/sklog",
+        "//go/util",
+        "//go/vcsinfo",
+        "@org_golang_x_oauth2//:oauth2",
+    ],
+)
diff --git a/datahopper/go/cd_metrics/cd_metrics.go b/datahopper/go/cd_metrics/cd_metrics.go
new file mode 100644
index 0000000..fc6d4c0
--- /dev/null
+++ b/datahopper/go/cd_metrics/cd_metrics.go
@@ -0,0 +1,314 @@
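+// Package cd_metrics generates metrics which track the performance of the
+// Continuous Deployment (CD) pipeline, in particular the latency between a
+// commit landing and the corresponding Docker image being built.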
+package cd_metrics
+
+import (
+	"bytes"
+	"context"
+	"encoding/gob"
+	"fmt"
+	"strings"
+	"time"
+
+	"go.skia.org/infra/go/auth"
+	"go.skia.org/infra/go/gcr"
+	"go.skia.org/infra/go/git/repograph"
+	"go.skia.org/infra/go/metrics2"
+	"go.skia.org/infra/go/metrics2/events"
+	"go.skia.org/infra/go/skerr"
+	"go.skia.org/infra/go/sklog"
+	"go.skia.org/infra/go/util"
+	"go.skia.org/infra/go/vcsinfo"
+	"golang.org/x/oauth2"
+)
+
+const (
+	containerRegistryProject = "skia-public"
+	louhiUser                = "louhi"
+	repoStateClean           = "clean"
+	commitHashLength         = 7
+	infraRepoUrl             = "https://skia.googlesource.com/buildbot.git"
+	k8sConfigRepoUrl         = "https://skia.googlesource.com/k8s-config.git"
+	measurementImageLatency  = "cd_image_build_latency_s"
+
+	// overlapDuration indicates how long to extend the time range past the time
+	// at which we last finished ingesting data.  This allows us to revisit
+	// commits for which the CD pipeline may not have finished.  Its value
+	// should be longer than we ever expect the CD pipeline to take.
+	overlapDuration = 6 * time.Hour
+)
+
+var (
+	// beginningOfTime is considered to be the earliest time from which we'll
+	// ingest data. The CD pipeline didn't exist before this point, so there's
+	// no reason to load earlier data.
+	beginningOfTime = time.Date(2022, time.June, 13, 0, 0, 0, 0, time.UTC)
+
+	timePeriods = []time.Duration{24 * time.Hour, 7 * 24 * time.Hour}
+)
+
+// cycle performs one cycle of metrics ingestion.
+func cycle(ctx context.Context, imageName string, repos repograph.Map, edb events.EventDB, em *events.EventMetrics, lastFinished, now time.Time) ([]metrics2.Int64Metric, error) {
+	sklog.Infof("Cycle.")
+	// Setup.
+	infraRepo := repos[infraRepoUrl]
+	// TODO: k8sConfigRepo := repos[k8sConfigRepoUrl]
+	ts := auth.NewGCloudTokenSource(containerRegistryProject)
+	gcrClient := gcr.NewClient(ts, containerRegistryProject, imageName)
+	resp, err := gcrClient.Tags(ctx)
+	if err != nil {
+		return nil, skerr.Wrapf(err, "failed to retrieve Docker image data")
+	}
+	sklog.Infof("Found %d docker images.", len(resp.Manifest))
+
+	// Create a mapping of shortened hash to commit details for easy retrieval.
+	commits, err := infraRepo.GetCommitsNewerThan(lastFinished.Add(-overlapDuration))
+	if err != nil {
+		return nil, skerr.Wrapf(err, "failed to retrieve commits")
+	}
+	commitMap := make(map[string]*vcsinfo.LongCommit, len(commits))
+	for _, c := range commits {
+		commitMap[c.Hash[:commitHashLength]] = c
+	}
+	sklog.Infof("Found %d commits since %s.", len(commitMap), lastFinished.Add(-overlapDuration))
+
+	// Go through the Docker images we've uploaded and map them to the commits
+	// from which they were built.
+	commitToDockerImageDigest := make(map[*vcsinfo.LongCommit]string, len(commitMap))
+	commitToDockerImageTime := make(map[*vcsinfo.LongCommit]time.Time, len(commitMap))
+	for digest, manifest := range resp.Manifest {
+		for _, tag := range manifest.Tags {
+			m := gcr.DockerTagRegex.FindStringSubmatch(tag)
+			if len(m) != 5 {
+				continue
+			}
+			timeStr := m[1]
+			user := m[2]
+			hash := m[3]
+			state := m[4]
+
+			// We only care about clean builds generated by our CD system.
+			if user != louhiUser || state != repoStateClean {
+				continue
+			}
+
+			// We only care about commits in the specified time range.
+			commit, ok := commitMap[hash]
+			if !ok {
+				continue
+			}
+
+			// Record the digest and creation time of the Docker image for the
+			// current commit.  Use the timestamp from the tag instead of the
+			// timestamp on the Docker image itself, because some commits may
+			// generate the same digest as a previous commit, and in those cases
+			// using the timestamp of the image would paint a misleading picture
+			// of the latency between commit time and Docker image creation.
+			dockerImageTime, err := time.Parse("2006-01-02T15_04_05Z", timeStr)
+			if err != nil {
+				sklog.Errorf("Invalid timestamp in tag %q: %s", tag, err)
+				continue
+			}
+			if existingTs, ok := commitToDockerImageTime[commit]; !ok || dockerImageTime.Before(existingTs) {
+				commitToDockerImageDigest[commit] = digest
+				commitToDockerImageTime[commit] = dockerImageTime
+			}
+		}
+	}
+
+	// Create an EventDB event for each commit. Produce metrics for individual
+	// commits.
+	newMetrics := make([]metrics2.Int64Metric, 0, len(commits))
+	for _, commit := range commits {
+		// Create the event and insert it into the DB.
+		// If the Docker image hasn't been built yet, measure the latency up to
+		// the present time, so that it continues to grow until the image
+		// appears.
+		dockerImageTime, ok := commitToDockerImageTime[commit]
+		if !ok {
+			dockerImageTime = time.Now()
+		}
+
+		logStr := fmt.Sprintf("%s: %s", commit.Hash[:commitHashLength], dockerImageTime.Sub(commit.Timestamp))
+		if ok {
+			logStr += fmt.Sprintf(" (%s)", commitToDockerImageDigest[commit])
+		}
+		sklog.Info(logStr)
+
+		data := Event{
+			CommitHash:        commit.Hash,
+			CommitTime:        commit.Timestamp,
+			DockerImageDigest: commitToDockerImageDigest[commit],
+			DockerImageTime:   commitToDockerImageTime[commit],
+			K8sConfigHash:     "",          // TODO
+			K8sConfigTime:     time.Time{}, // TODO
+		}
+		var buf bytes.Buffer
+		if err := gob.NewEncoder(&buf).Encode(&data); err != nil {
+			return nil, skerr.Wrapf(err, "failed to encode event")
+		}
+		ev := &events.Event{
+			Stream:    fmtStream(infraRepoUrl, imageName),
+			Timestamp: commit.Timestamp,
+			Data:      buf.Bytes(),
+		}
+		if err := edb.Insert(ev); err != nil {
+			return nil, skerr.Wrapf(err, "failed to insert event")
+		}
+
+		// Add other metrics.
+		{
+			// Latency between commit landing and docker image built.
+			tags := map[string]string{
+				"commit": commit.Hash,
+				"image":  imageName,
+			}
+			m := metrics2.GetInt64Metric(measurementImageLatency, tags)
+			m.Update(int64(dockerImageTime.Sub(data.CommitTime).Seconds()))
+			newMetrics = append(newMetrics, m)
+		}
+	}
+
+	return newMetrics, nil
+}
+
+// Event is an entry in the EventDB which details how long it took for a given
+// commit to progress through the CD pipeline.
+type Event struct {
+	CommitHash        string
+	CommitTime        time.Time
+	DockerImageDigest string
+	DockerImageTime   time.Time
+	K8sConfigHash     string
+	K8sConfigTime     time.Time
+}
+
+// addAggregateMetrics adds an aggregate metric to the given stream which
+// reports the mean commit-to-image-build latency over the given period.
+func addAggregateMetrics(s *events.EventStream, imageName string, period time.Duration) error {
+	if err := s.AggregateMetric(map[string]string{
+		"image":  imageName,
+		"metric": "image_build_latency",
+	}, period, func(ev []*events.Event) (float64, error) {
+		if len(ev) == 0 {
+			return 0.0, nil
+		}
+		totalLatency := float64(0)
+		for _, e := range ev {
+			var data Event
+			if err := gob.NewDecoder(bytes.NewReader(e.Data)).Decode(&data); err != nil {
+				return 0.0, skerr.Wrap(err)
+			}
+			totalLatency += float64(data.DockerImageTime.Sub(data.CommitTime))
+		}
+		return totalLatency / float64(len(ev)), nil
+	}); err != nil {
+		return skerr.Wrap(err)
+	}
+	return nil
+}
+
+// getLastIngestionTs returns the timestamp of the last commit for which we
+// successfully ingested events.
+func getLastIngestionTs(edb events.EventDB, imageName string) (time.Time, error) {
+	timeEnd := time.Now()
+	window := time.Hour
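+	// Search backwards through the event stream in exponentially-growing
+	// windows until we find the most recent event or reach beginningOfTime.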
+	for {
+		timeStart := timeEnd.Add(-window)
+		var latest time.Time
+		ev, err := edb.Range(fmtStream(infraRepoUrl, imageName), timeStart, timeEnd)
+		if err != nil {
+			return beginningOfTime, err
+		}
+		if len(ev) > 0 {
+			ts := ev[len(ev)-1].Timestamp
+			if ts.After(latest) {
+				latest = ts
+			}
+		}
+		if !util.TimeIsZero(latest) {
+			return latest, nil
+		}
+		if timeStart.Before(beginningOfTime) {
+			return beginningOfTime, nil
+		}
+		window *= 2
+		timeEnd = timeStart
+	}
+}
+
+// Start initiates a background goroutine which periodically ingests CD
+// pipeline data for the given Docker images and generates metrics.
+func Start(ctx context.Context, repos repograph.Map, imageNames []string, btProject, btInstance string, ts oauth2.TokenSource) error {
+	// Set up event metrics.
+	edb, err := events.NewBTEventDB(ctx, btProject, btInstance, ts)
+	if err != nil {
+		return skerr.Wrapf(err, "Failed to create EventDB")
+	}
+	em, err := events.NewEventMetrics(edb, "cd_pipeline")
+	if err != nil {
+		return skerr.Wrapf(err, "failed to create EventMetrics")
+	}
+
+	// Find the timestamp of the last-ingested commit.
+	lastFinished := time.Now()
+	for _, imageName := range imageNames {
+		s := em.GetEventStream(fmtStream(infraRepoUrl, imageName))
+		for _, p := range timePeriods {
+			if err := addAggregateMetrics(s, imageName, p); err != nil {
+				return skerr.Wrapf(err, "failed to add metric")
+			}
+		}
+		lastFinishedForImage, err := getLastIngestionTs(edb, imageName)
+		if err != nil {
+			return skerr.Wrapf(err, "failed to get timestamp of last successful ingestion")
+		}
+		if lastFinishedForImage.Before(lastFinished) {
+			lastFinished = lastFinishedForImage
+		}
+	}
+
+	// Start ingesting data.
+	lv := metrics2.NewLiveness("last_successful_cd_pipeline_metrics")
+	oldMetrics := []metrics2.Int64Metric{}
+	go util.RepeatCtx(ctx, 10*time.Minute, func(ctx context.Context) {
+		sklog.Infof("Loop start.")
+		// TODO(borenet): Is this handled elsewhere in Datahopper?
+		if err := repos.Update(ctx); err != nil {
+			sklog.Errorf("Failed to update repos: %s", err)
+			return
+		}
+		now := time.Now()
+		anyFailed := false
+		newMetrics := []metrics2.Int64Metric{}
+		for _, imageName := range imageNames {
+			m, err := cycle(ctx, imageName, repos, edb, em, lastFinished, now)
+			if err != nil {
+				sklog.Errorf("Failed to obtain CD pipeline metrics: %s", err)
+				anyFailed = true
+			}
+			newMetrics = append(newMetrics, m...)
+		}
+		if !anyFailed {
+			lastFinished = now
+			lv.Reset()
+
+			// Delete any metrics which we haven't generated again.
+			metricsMap := make(map[metrics2.Int64Metric]struct{}, len(newMetrics))
+			for _, m := range newMetrics {
+				metricsMap[m] = struct{}{}
+			}
+			for _, m := range oldMetrics {
+				if _, ok := metricsMap[m]; !ok {
+					if err := m.Delete(); err != nil {
+						sklog.Warningf("Failed to delete metric: %s", err)
+						// If we failed to delete the metric, add it to the
+						// "new" metrics list, so that we'll carry it over and
+						// try again on the next cycle.
+						newMetrics = append(newMetrics, m)
+					}
+				}
+			}
+
+			// Remember this cycle's metrics so that any which become stale can
+			// be deleted on the next cycle.
+			oldMetrics = newMetrics
+		}
+		sklog.Infof("Loop end.")
+	})
+	return nil
+}
+
+// fmtStream returns the name of an event stream given a repo URL and image name.
+func fmtStream(repo, imageName string) string {
+	split := strings.Split(repo, "/")
+	repoName := strings.TrimSuffix(split[len(split)-1], ".git")
+	return fmt.Sprintf("cd-commits-%s-%s", repoName, imageName)
+}
diff --git a/datahopper/go/datahopper/BUILD.bazel b/datahopper/go/datahopper/BUILD.bazel
index b7e31a4..0fc81fd 100644
--- a/datahopper/go/datahopper/BUILD.bazel
+++ b/datahopper/go/datahopper/BUILD.bazel
@@ -14,6 +14,7 @@
     visibility = ["//visibility:private"],
     deps = [
         "//datahopper/go/bot_metrics",
+        "//datahopper/go/cd_metrics",
         "//datahopper/go/gcloud_metrics",
         "//datahopper/go/supported_branches",
         "//datahopper/go/swarming_metrics",
diff --git a/datahopper/go/datahopper/main.go b/datahopper/go/datahopper/main.go
index eb83bd4..b3095f1 100644
--- a/datahopper/go/datahopper/main.go
+++ b/datahopper/go/datahopper/main.go
@@ -15,6 +15,7 @@
 	"cloud.google.com/go/pubsub"
 	"cloud.google.com/go/storage"
 	"go.skia.org/infra/datahopper/go/bot_metrics"
+	"go.skia.org/infra/datahopper/go/cd_metrics"
 	"go.skia.org/infra/datahopper/go/gcloud_metrics"
 	"go.skia.org/infra/datahopper/go/supported_branches"
 	"go.skia.org/infra/datahopper/go/swarming_metrics"
@@ -45,6 +46,7 @@
 	// TODO(borenet): Combine btInstance and firestoreInstance.
 	btInstance        = flag.String("bigtable_instance", "", "BigTable instance to use.")
 	btProject         = flag.String("bigtable_project", "", "GCE project to use for BigTable.")
+	dockerImageNames  = common.NewMultiStringFlag("docker_image", nil, "Docker images to watch for Continuous Deployment metrics.")
 	firestoreInstance = flag.String("firestore_instance", "", "Firestore instance to use, eg. \"production\"")
 	gcloudProjects    = common.NewMultiStringFlag("gcloud_project", nil, "GCloud projects from which to ingest data")
 	gitstoreTable     = flag.String("gitstore_bt_table", "git-repos2", "BigTable table used for GitStore.")
@@ -210,6 +212,11 @@
 		sklog.Fatal(err)
 	}
 
+	// Metrics for the Continuous Deployment pipeline.
+	if err := cd_metrics.Start(ctx, repos, *dockerImageNames, *btProject, *btInstance, ts); err != nil {
+		sklog.Fatal(err)
+	}
+
 	// Wait while the above goroutines generate data.
 	httputils.RunHealthCheckServer(*port)
 }
diff --git a/go/gcr/BUILD.bazel b/go/gcr/BUILD.bazel
index 0827302..6f4902c 100644
--- a/go/gcr/BUILD.bazel
+++ b/go/gcr/BUILD.bazel
@@ -8,6 +8,7 @@
     visibility = ["//visibility:public"],
     deps = [
         "//go/httputils",
+        "//go/skerr",
         "//go/util",
         "@org_golang_x_oauth2//:oauth2",
     ],
diff --git a/go/gcr/gcr.go b/go/gcr/gcr.go
index 374177a..d2b50d8 100644
--- a/go/gcr/gcr.go
+++ b/go/gcr/gcr.go
@@ -15,18 +15,39 @@
 package gcr
 
 import (
+	"context"
 	"encoding/json"
 	"fmt"
 	"net/http"
+	"regexp"
+	"strings"
 	"time"
 
 	"go.skia.org/infra/go/httputils"
+	"go.skia.org/infra/go/skerr"
 	"go.skia.org/infra/go/util"
 	"golang.org/x/oauth2"
 )
 
 const (
-	SERVER = "gcr.io"
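+	// Server is the hostname of the Google Container Registry.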
+	Server = "gcr.io"
+)
+
+var (
+	// DockerTagRegex is used to parse a Docker image tag as set by our
+	// infrastructure, which uses the following format:
+	//
+	// ${datetime}-${user}-${git_hash:0:7}-${repo_state}
+	//
+	// Where datetime is a UTC timestamp following the format:
+	//
+	// +%Y-%m-%dT%H_%M_%SZ
+	//
+	// User is the username of the person who built the image, git_hash is the
+	// abbreviated Git commit hash at which the image was built, and repo_state
+	// is either "clean" or "dirty", depending on whether there were local
+	// changes to the checkout at the time when the image was built.
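+	//
+	// For example, the tag "2022-06-16T13_04_05Z-louhi-abc1234-clean" parses
+	// into the timestamp "2022-06-16T13_04_05Z", the user "louhi", the
+	// abbreviated commit hash "abc1234", and the repo state "clean".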
+	DockerTagRegex = regexp.MustCompile(`(\d{4}-\d{2}-\d{2}T\d{2}_\d{2}_\d{2}Z)-(\w+)-([a-f0-9]+)-(\w+)`)
 )
 
-// gcrTokenSource it an oauth2.TokenSource that works with the Google Container Registry API.
+// gcrTokenSource is an oauth2.TokenSource that works with the Google Container Registry API.
@@ -43,23 +64,23 @@
 
 func (g *gcrTokenSource) Token() (*oauth2.Token, error) {
 	// Use the authorized client to get a gcr.io specific oauth token.
-	resp, err := g.client.Get(fmt.Sprintf("https://%s/v2/token?scope=repository:%s/%s:pull", SERVER, g.projectId, g.imageName))
+	resp, err := g.client.Get(fmt.Sprintf("https://%s/v2/token?scope=repository:%s/%s:pull", Server, g.projectId, g.imageName))
 	if err != nil {
 		return nil, err
 	}
 	defer util.Close(resp.Body)
 	if resp.StatusCode != 200 {
-		return nil, fmt.Errorf("Got unexpected status: %s", resp.Status)
+		return nil, skerr.Fmt("Got unexpected status: %s", resp.Status)
 	}
 	var res struct {
 		AccessToken  string `json:"token"`
 		ExpiresInSec int    `json:"expires_in"`
 	}
 	if err := json.NewDecoder(resp.Body).Decode(&res); err != nil {
-		return nil, fmt.Errorf("Invalid token JSON from metadata: %v", err)
+		return nil, skerr.Wrapf(err, "Invalid token JSON from metadata: %v", err)
 	}
 	if res.ExpiresInSec == 0 || res.AccessToken == "" {
-		return nil, fmt.Errorf("Incomplete token received from metadata: %#v", res)
+		return nil, skerr.Fmt("Incomplete token received from metadata: %#v", res)
 	}
 	return &oauth2.Token{
 		AccessToken: res.AccessToken,
@@ -97,23 +118,57 @@
 	}
 }
 
+// ManifestEntry describes a single image manifest in a TagsResponse.
+type ManifestEntry struct {
+	ImageSizeBytes string   `json:"imageSizeBytes"`
+	LayerID        string   `json:"layerId"`
+	Tags           []string `json:"tag"`
+	TimeCreatedMs  string   `json:"timeCreatedMs"`
+	TimeUploadedMs string   `json:"timeUploadedMs"`
+}
+
+// TagsResponse is the response returned by Tags().
+type TagsResponse struct {
+	Manifest map[string]ManifestEntry `json:"manifest"`
+	Name     string                   `json:"name"`
+	Tags     []string                 `json:"tags"`
+}
+
-// Tags returns all of the tags for all versions of the image.
+// Tags returns the tags and manifests for all versions of the image, following
+// the registry's Link-header pagination to retrieve all pages of results.
-func (c *Client) Tags() ([]string, error) {
-	// TODO(jcgregorio) Look for link rel=next header to do pagination. https://docs.docker.com/registry/spec/api/#listing-image-tags
-	resp, err := c.client.Get(fmt.Sprintf("https://%s/v2/%s/%s/tags/list", SERVER, c.projectId, c.imageName))
-	if err != nil {
-		return nil, fmt.Errorf("Failed to request tags: %s", err)
+func (c *Client) Tags(ctx context.Context) (*TagsResponse, error) {
+	var rv *TagsResponse
+	const batchSize = 100
+	url := fmt.Sprintf("https://%s/v2/%s/%s/tags/list?n=%d", Server, c.projectId, c.imageName, batchSize)
+	for {
+		req, err := http.NewRequest("GET", url, nil)
+		if err != nil {
+			return nil, skerr.Wrapf(err, "failed to create HTTP request")
+		}
+		req = req.WithContext(ctx)
+		req.Header.Add("Accept", "*/*")
+		resp, err := c.client.Do(req)
+		if err != nil {
+			return nil, skerr.Wrapf(err, "failed to request tags")
+		}
+		if resp.StatusCode != 200 {
+			util.Close(resp.Body)
+			return nil, skerr.Fmt("Got unexpected response: %s", resp.Status)
+		}
+		response := new(TagsResponse)
+		decodeErr := json.NewDecoder(resp.Body).Decode(response)
+		// Close the body explicitly rather than deferring, since deferred
+		// closes would pile up until the function returns as we page through
+		// results.
+		util.Close(resp.Body)
+		if decodeErr != nil {
+			return nil, skerr.Wrapf(decodeErr, "could not decode response")
+		}
+		if rv == nil {
+			rv = response
+		} else {
+			rv.Tags = append(rv.Tags, response.Tags...)
+			for k, v := range response.Manifest {
+				rv.Manifest[k] = v
+			}
+		}
+
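+		// The registry indicates that more results are available via a Link
+		// header; the portion of its value before the first ";" is used as the
+		// URL of the next page.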
+		nextUrl, ok := resp.Header["Link"]
+		if !ok {
+			break
+		}
+		url = strings.Split(nextUrl[0], ";")[0]
 	}
-	defer util.Close(resp.Body)
-	if resp.StatusCode != 200 {
-		return nil, fmt.Errorf("Got unexpected response: %s", resp.Status)
-	}
-	type Response struct {
-		Tags []string `json:"tags"`
-	}
-	var response Response
-	if err := json.NewDecoder(resp.Body).Decode(&response); err != nil {
-		return nil, fmt.Errorf("Could not decode response: %s", err)
-	}
-	return response.Tags, nil
+	return rv, nil
 }
diff --git a/go/gcr/gcr_test.go b/go/gcr/gcr_test.go
index 3aa1337..4324ba8 100644
--- a/go/gcr/gcr_test.go
+++ b/go/gcr/gcr_test.go
@@ -1,6 +1,7 @@
 package gcr
 
 import (
+	"context"
 	"fmt"
 	"testing"
 
@@ -11,27 +12,129 @@
 
 func TestTags(t *testing.T) {
 	unittest.SmallTest(t)
-	url := fmt.Sprintf("https://%s/v2/skia-public/docserver/tags/list", SERVER)
+	ctx := context.Background()
+	url := fmt.Sprintf("https://%s/v2/skia-public/docserver/tags/list?n=100", Server)
 	m := mockhttpclient.NewURLMock()
-	m.Mock(url, mockhttpclient.MockGetDialogue([]byte(`{"tags": ["foo", "bar"]}`)))
+	m.Mock(url, mockhttpclient.MockGetDialogue([]byte(`{
+		"name": "docserver",
+		"manifest": {"sha256:abc123": {
+			"imageSizeBytes": "32",
+			"layerId": "0",
+			"tag": ["foo", "bar"],
+			"timeCreatedMs": "1655387750000",
+			"timeUploadedMs": "1655387810000"
+		}},
+		"tags": ["foo", "bar"]
+	}`)))
 	c := &Client{
 		client:    m.Client(),
 		projectId: "skia-public",
 		imageName: "docserver",
 	}
-	tags, err := c.Tags()
+	tagsResp, err := c.Tags(ctx)
 	assert.NoError(t, err)
-	assert.Equal(t, []string{"foo", "bar"}, tags)
+	assert.Equal(t, &TagsResponse{
+		Name: "docserver",
+		Manifest: map[string]ManifestEntry{
+			"sha256:abc123": {
+				ImageSizeBytes: "32",
+				LayerID:        "0",
+				Tags:           []string{"foo", "bar"},
+				TimeCreatedMs:  "1655387750000",
+				TimeUploadedMs: "1655387810000",
+			},
+		},
+		Tags: []string{"foo", "bar"},
+	}, tagsResp)
 
 	c.imageName = "unknown"
-	tags, err = c.Tags()
+	_, err = c.Tags(ctx)
+	assert.Error(t, err)
+}
+
+func TestTags_Pagination(t *testing.T) {
+	unittest.SmallTest(t)
+
+	ctx := context.Background()
+	url := fmt.Sprintf("https://%s/v2/skia-public/docserver/tags/list?n=100", Server)
+	m := mockhttpclient.NewURLMock()
+	nextUrl := fmt.Sprintf("https://%s/v2/skia-public/docserver/tags/list?n=100&last=bar", Server)
+	m.Mock(url, mockhttpclient.MockGetDialogueWithResponseHeaders(
+		[]byte(`{
+			"name": "docserver",
+			"manifest": {"sha256:abc123": {
+				"imageSizeBytes": "32",
+				"layerId": "0",
+				"tag": ["foo", "bar"],
+				"timeCreatedMs": "1655387750000",
+				"timeUploadedMs": "1655387810000"
+			}},
+			"tags": ["foo", "bar"]
+		}`),
+		map[string][]string{
+			"Link": {nextUrl + "; rel=\"next\""},
+		},
+	))
+	m.Mock(nextUrl, mockhttpclient.MockGetDialogue([]byte(`{
+		"name": "docserver",
+		"manifest": {"sha256:def456": {
+			"imageSizeBytes": "64",
+			"layerId": "0",
+			"tag": ["baz"],
+			"timeCreatedMs": "1655387750000",
+			"timeUploadedMs": "1655387810000"
+		}},
+		"tags": ["baz"]
+	}`)))
+	c := &Client{
+		client:    m.Client(),
+		projectId: "skia-public",
+		imageName: "docserver",
+	}
+	tagsResp, err := c.Tags(ctx)
+	assert.NoError(t, err)
+	assert.Equal(t, &TagsResponse{
+		Name: "docserver",
+		Manifest: map[string]ManifestEntry{
+			"sha256:abc123": {
+				ImageSizeBytes: "32",
+				LayerID:        "0",
+				Tags:           []string{"foo", "bar"},
+				TimeCreatedMs:  "1655387750000",
+				TimeUploadedMs: "1655387810000",
+			},
+			"sha256:def456": {
+				ImageSizeBytes: "64",
+				LayerID:        "0",
+				Tags:           []string{"baz"},
+				TimeCreatedMs:  "1655387750000",
+				TimeUploadedMs: "1655387810000",
+			},
+		},
+		Tags: []string{"foo", "bar", "baz"},
+	}, tagsResp)
+
+	c.imageName = "unknown"
+	_, err = c.Tags(ctx)
 	assert.Error(t, err)
 }
 
 func TestGcrTokenSource(t *testing.T) {
 	unittest.SmallTest(t)
 	m := mockhttpclient.NewURLMock()
-	url := fmt.Sprintf("https://%s/v2/token?scope=repository:skia-public/docserver:pull", SERVER)
+	url := fmt.Sprintf("https://%s/v2/token?scope=repository:skia-public/docserver:pull", Server)
 	m.Mock(url, mockhttpclient.MockGetDialogue([]byte(`{"token": "foo", "expires_in": 3600}`)))
 
 	ts := &gcrTokenSource{
diff --git a/go/mockhttpclient/urlmock.go b/go/mockhttpclient/urlmock.go
index 00992e0..071a867 100644
--- a/go/mockhttpclient/urlmock.go
+++ b/go/mockhttpclient/urlmock.go
@@ -158,6 +158,18 @@
 	}
 }
 
+// MockGetDialogueWithResponseHeaders returns a MockDialogue which responds to
+// a GET request with the given response body and headers.
+func MockGetDialogueWithResponseHeaders(responseBody []byte, responseHeaders map[string][]string) MockDialogue {
+	return MockDialogue{
+		requestMethod:   "GET",
+		requestType:     "",
+		requestPayload:  nil,
+		responseStatus:  "OK",
+		responseCode:    http.StatusOK,
+		responsePayload: responseBody,
+		responseHeaders: responseHeaders,
+	}
+}
+
 func MockPostDialogue(requestType string, requestBody, responseBody []byte) MockDialogue {
 	return MockDialogue{
 		requestMethod:  "POST",
diff --git a/kube/go/pushk/main.go b/kube/go/pushk/main.go
index 2a37920..6bee191 100644
--- a/kube/go/pushk/main.go
+++ b/kube/go/pushk/main.go
@@ -174,7 +174,7 @@
 	}
 
 	// The full docker image name and tag of the image we want to deploy.
-	return fmt.Sprintf("%s/%s/%s:%s", gcr.SERVER, containerRegistryProject, imageName, tag), nil
+	return fmt.Sprintf("%s/%s/%s:%s", gcr.Server, containerRegistryProject, imageName, tag), nil
 }
 
 // byClusterFromChanged returns a map from cluster name to the list of modified
@@ -257,7 +257,11 @@
 	sklog.Infof("Pushing the following images: %q", imageNames)
 
 	gcrTagProvider := func(imageName string) ([]string, error) {
-		return gcr.NewClient(tokenSource, containerRegistryProject, imageName).Tags()
+		tagsResp, err := gcr.NewClient(tokenSource, containerRegistryProject, imageName).Tags(ctx)
+		if err != nil {
+			return nil, err
+		}
+		return tagsResp.Tags, nil
 	}
 
 	// Search through the yaml files looking for those that use the provided image names.