[datahopper] Fixes for CD pipeline metrics

Change-Id: Ia6d4bdc18701aeb45b628af4633712eed685a229
Reviewed-on: https://skia-review.googlesource.com/c/buildbot/+/552398
Reviewed-by: Joe Gregorio <jcgregorio@google.com>
Commit-Queue: Eric Boren <borenet@google.com>
diff --git a/datahopper/go/cd_metrics/BUILD.bazel b/datahopper/go/cd_metrics/BUILD.bazel
index bddfa3d..6438dc8 100644
--- a/datahopper/go/cd_metrics/BUILD.bazel
+++ b/datahopper/go/cd_metrics/BUILD.bazel
@@ -6,9 +6,9 @@
     importpath = "go.skia.org/infra/datahopper/go/cd_metrics",
     visibility = ["//visibility:public"],
     deps = [
-        "//go/auth",
         "//go/gcr",
         "//go/git/repograph",
+        "//go/gitstore/bt_gitstore",
         "//go/metrics2",
         "//go/metrics2/events",
         "//go/skerr",
diff --git a/datahopper/go/cd_metrics/cd_metrics.go b/datahopper/go/cd_metrics/cd_metrics.go
index fc6d4c0..04b0654 100644
--- a/datahopper/go/cd_metrics/cd_metrics.go
+++ b/datahopper/go/cd_metrics/cd_metrics.go
@@ -8,9 +8,9 @@
 	"strings"
 	"time"
 
-	"go.skia.org/infra/go/auth"
 	"go.skia.org/infra/go/gcr"
 	"go.skia.org/infra/go/git/repograph"
+	"go.skia.org/infra/go/gitstore/bt_gitstore"
 	"go.skia.org/infra/go/metrics2"
 	"go.skia.org/infra/go/metrics2/events"
 	"go.skia.org/infra/go/skerr"
@@ -43,21 +43,21 @@
 	beginningOfTime = time.Date(2022, time.June, 13, 0, 0, 0, 0, time.UTC)
 
 	timePeriods = []time.Duration{24 * time.Hour, 7 * 24 * time.Hour}
+
+	repoUrls = []string{infraRepoUrl, k8sConfigRepoUrl}
 )
 
 // cycle performs one cycle of metrics ingestion.
-func cycle(ctx context.Context, imageName string, repos repograph.Map, edb events.EventDB, em *events.EventMetrics, lastFinished, now time.Time) ([]metrics2.Int64Metric, error) {
-	sklog.Infof("Cycle.")
+func cycle(ctx context.Context, imageName string, repos repograph.Map, edb events.EventDB, em *events.EventMetrics, lastFinished, now time.Time, ts oauth2.TokenSource) ([]metrics2.Int64Metric, error) {
+	sklog.Infof("CD metrics for %s", imageName)
 	// Setup.
 	infraRepo := repos[infraRepoUrl]
-	//k8sConfigRepo := repos[k8sConfigRepourl]
-	ts := auth.NewGCloudTokenSource(containerRegistryProject)
 	gcrClient := gcr.NewClient(ts, containerRegistryProject, imageName)
 	resp, err := gcrClient.Tags(ctx)
 	if err != nil {
 		return nil, skerr.Wrapf(err, "failed to retrieve Docker image data")
 	}
-	sklog.Infof("Found %d docker images.", len(resp.Manifest))
+	sklog.Infof("  Found %d docker images for %s", len(resp.Manifest), imageName)
 
 	// Create a mapping of shortened hash to commit details for easy retrieval.
 	commits, err := infraRepo.GetCommitsNewerThan(lastFinished.Add(-overlapDuration))
@@ -68,7 +68,7 @@
 	for _, c := range commits {
 		commitMap[c.Hash[:commitHashLength]] = c
 	}
-	sklog.Infof("Found %d commits since %s.", len(commitMap), lastFinished.Add(-overlapDuration))
+	sklog.Infof("  Found %d commits since %s.", len(commitMap), lastFinished.Add(-overlapDuration))
 
 	// Go through the Docker images we've uploaded and map them to the commits
 	// from which they were built.
@@ -125,6 +125,7 @@
 			dockerImageTime = ts
 		}
 
+		// Log the commit-to-docker-image latency for this commit.
 		logStr := fmt.Sprintf("%s: %s", commit.Hash[:7], dockerImageTime.Sub(commit.Timestamp))
 		if ok {
 			logStr += fmt.Sprintf(" (%s)", commitToDockerImageDigest[commit])
@@ -229,9 +230,9 @@
 }
 
 // Start initiates the metrics data generation for Docker images.
-func Start(ctx context.Context, repos repograph.Map, imageNames []string, btProject, btInstance string, ts oauth2.TokenSource) error {
+func Start(ctx context.Context, imageNames []string, btConf *bt_gitstore.BTConfig, ts oauth2.TokenSource) error {
 	// Set up event metrics.
-	edb, err := events.NewBTEventDB(ctx, btProject, btInstance, ts)
+	edb, err := events.NewBTEventDB(ctx, btConf.ProjectID, btConf.InstanceID, ts)
 	if err != nil {
 		return skerr.Wrapf(err, "Failed to create EventDB")
 	}
@@ -239,6 +240,10 @@
 	if err != nil {
 		return skerr.Wrapf(err, "failed to create EventMetrics")
 	}
+	repos, err := bt_gitstore.NewBTGitStoreMap(ctx, repoUrls, btConf)
+	if err != nil {
+		sklog.Fatal(err)
+	}
 
 	// Find the timestamp of the last-ingested commit.
 	lastFinished := time.Now()
@@ -262,17 +267,20 @@
 	lv := metrics2.NewLiveness("last_successful_cd_pipeline_metrics")
 	oldMetrics := []metrics2.Int64Metric{}
 	go util.RepeatCtx(ctx, 10*time.Minute, func(ctx context.Context) {
-		sklog.Infof("Loop start.")
-		// TODO(borenet): Is this handled elsewhere in Datahopper?
+		sklog.Infof("CD metrics loop start.")
+
+		// These repos aren't shared with the rest of Datahopper, so we need to
+		// update them.
 		if err := repos.Update(ctx); err != nil {
 			sklog.Errorf("Failed to update repos: %s", err)
 			return
 		}
+
 		now := time.Now()
 		anyFailed := false
 		newMetrics := []metrics2.Int64Metric{}
 		for _, imageName := range imageNames {
-			m, err := cycle(ctx, imageName, repos, edb, em, lastFinished, now)
+			m, err := cycle(ctx, imageName, repos, edb, em, lastFinished, now, ts)
 			if err != nil {
 				sklog.Errorf("Failed to obtain CD pipeline metrics: %s", err)
 				anyFailed = true
@@ -301,7 +309,7 @@
 			}
 
 		}
-		sklog.Infof("Loop end.")
+		sklog.Infof("CD metrics loop end.")
 	})
 	return nil
 }
diff --git a/datahopper/go/datahopper/main.go b/datahopper/go/datahopper/main.go
index b3095f1..0a71b03 100644
--- a/datahopper/go/datahopper/main.go
+++ b/datahopper/go/datahopper/main.go
@@ -213,8 +213,10 @@
 	}
 
 	// Metrics for the Continuous Deployment pipeline.
-	if err := cd_metrics.Start(ctx, repos, *dockerImageNames, *btProject, *btInstance, ts); err != nil {
-		sklog.Fatal(err)
+	if len(*dockerImageNames) > 0 {
+		if err := cd_metrics.Start(ctx, *dockerImageNames, btConf, ts); err != nil {
+			sklog.Fatal(err)
+		}
 	}
 
 	// Wait while the above goroutines generate data.