blob: 8d6455732202383c9cb210301aa1ad00952cee83 [file] [log] [blame]
// Package goldpushk contains the Goldpushk struct, which coordinates all the operations performed
// by goldpushk.
//
// Also included in this package is function ProductionDeployableUnits(), which returns a set with
// all the services goldpushk is able to manage.
//
// Function ProductionDeployableUnits is the source of truth of goldpushk, and should be updated to
// reflect any relevant changes in configuration.
//
// For testing, function TestingDeployableUnits should be used instead, which only contains
// information about testing services that can be deployed the public or corp clusters without
// disrupting any production services.
package goldpushk
import (
"context"
"errors"
"fmt"
"io/ioutil"
"os"
osexec "os/exec"
"path/filepath"
"regexp"
"strings"
"text/tabwriter"
"time"
"go.skia.org/infra/go/exec"
"go.skia.org/infra/go/git"
"go.skia.org/infra/go/skerr"
"go.skia.org/infra/go/sklog"
)
const (
// Paths below are relative to $SKIA_INFRA_ROOT/golden.
k8sConfigTemplatesDir = "k8s-config-templates"
k8sInstancesDir = "k8s-instances"
// kubectl timestamp format as of September 30, 2019.
//
// $ kubectl version --short
// Client Version: v1.16.0
// Server Version: v1.14.6-gke.1
kubectlTimestampLayout = "2006-01-02T15:04:05Z"
// Time to wait between the push and monitoring steps, to give Kubernetes a chance to update the
// status of the affected pods.
delayBetweenPushAndMonitoring = 10 * time.Second
// Kubernetes does not like colons in its strings, so we can't use time.RFC3999 (or any of the
// provided formats) as is. This replaces the colons with underscores.
rfc3999KubernetesSafe = "2006-01-02T15_04_05Z07_00"
)
// cluster represents a Kubernetes cluster on which to deploy DeployableUnits, and contains all the
// information necessary to switch between clusters with the "gcloud" command.
type cluster struct {
name string // e.g. "skia-corp".
projectID string // e.g. "google.com:skia-corp".
}
var (
clusterSkiaPublic = cluster{name: "skia-public", projectID: "skia-public"}
clusterSkiaCorp = cluster{name: "skia-corp", projectID: "google.com:skia-corp"}
)
// Goldpushk contains information about the deployment steps to be carried out.
type Goldpushk struct {
// Input parameters (provided via flags or environment variables).
deployableUnits []DeployableUnit
canariedDeployableUnits []DeployableUnit
goldSrcDir string // Path to the golden directory in the skia-infra checkout.
dryRun bool
noCommit bool
minUptimeSeconds int
uptimePollFrequencySeconds int
// Other constructor parameters.
k8sConfigRepoUrl string
verbose bool
// Checked out Git repository with k8s config files.
k8sConfigCheckout *git.TempCheckout
// The Kubernetes cluster that the kubectl command is currently configured to use.
currentCluster cluster
// Miscellaneous.
unitTest bool // Disables confirmation prompt from unit tests.
disableCopyingConfigsToCheckout bool
// If set, will return this time from .now() instead of the actual time. Used for tests.
fakeNow time.Time
}
// New is the Goldpushk constructor.
func New(deployableUnits []DeployableUnit, canariedDeployableUnits []DeployableUnit, skiaInfraRootPath string, dryRun, noCommit bool, minUptimeSeconds, uptimePollFrequencySeconds int, k8sConfigRepoUrl string, verbose bool) *Goldpushk {
return &Goldpushk{
deployableUnits: deployableUnits,
canariedDeployableUnits: canariedDeployableUnits,
goldSrcDir: filepath.Join(skiaInfraRootPath, "golden"),
dryRun: dryRun,
noCommit: noCommit,
minUptimeSeconds: minUptimeSeconds,
uptimePollFrequencySeconds: uptimePollFrequencySeconds,
k8sConfigRepoUrl: k8sConfigRepoUrl,
verbose: verbose,
}
}
// Run carries out the deployment steps.
func (g *Goldpushk) Run(ctx context.Context) error {
// Print out list of targeted deployable units, and ask for confirmation.
if ok, err := g.printOutInputsAndAskConfirmation(); err != nil {
return skerr.Wrap(err)
} else if !ok {
return nil
}
// Check out k8s-config.
if err := g.checkOutK8sConfigRepo(ctx); err != nil {
return skerr.Wrap(err)
}
defer g.k8sConfigCheckout.Delete()
// Regenerate config files.
if err := g.regenerateConfigFiles(ctx); err != nil {
return skerr.Wrap(err)
}
// Commit config files, giving the user the option to abort.
if ok, err := g.commitConfigFiles(ctx); err != nil {
return skerr.Wrap(err)
} else if !ok {
return nil
}
// Deploy canaries.
if err := g.pushCanaries(ctx); err != nil {
return skerr.Wrap(err)
}
// Monitor canaries.
if err := g.monitorCanaries(ctx); err != nil {
return skerr.Wrap(err)
}
// Deploy remaining DeployableUnits.
if err := g.pushServices(ctx); err != nil {
return skerr.Wrap(err)
}
// Monitor remaining DeployableUnits.
if err := g.monitorServices(ctx); err != nil {
return skerr.Wrap(err)
}
// Give the user a chance to examine the generated files before exiting and cleaning up the Git
// repository.
if g.dryRun {
fmt.Println("\nDry-run finished. Any generated files can be found in the k8s-config Git repository checkout below:")
fmt.Printf(" %s\n", g.k8sConfigCheckout.GitDir)
fmt.Println("Press enter to delete the checkout above and exit.")
if _, err := fmt.Scanln(); err != nil {
return skerr.Wrap(err)
}
}
return nil
}
// printOutInputsAndAskConfirmation prints out a summary of the actions to be
// taken, then asks the user for confirmation.
func (g *Goldpushk) printOutInputsAndAskConfirmation() (bool, error) {
// Skip if running from an unit test.
if g.unitTest {
return true, nil
}
// Print out a summary of the services to deploy.
if len(g.canariedDeployableUnits) != 0 {
fmt.Println("The following services will be canaried:")
for _, d := range g.canariedDeployableUnits {
fmt.Printf(" %s\n", d.CanonicalName())
}
fmt.Println()
}
fmt.Println("The following services will be deployed:")
for _, d := range g.deployableUnits {
fmt.Printf(" %s\n", d.CanonicalName())
}
// Ask for confirmation, ending execution by default.
ok, err := prompt("\nProceed?")
if err != nil {
return false, skerr.Wrap(err)
}
if !ok {
fmt.Println("Aborting.")
return false, nil
}
return true, nil
}
// prompt prints out a question to stdout and scans a y/n answer from stdin.
func prompt(question string) (bool, error) {
fmt.Printf("%s (y/N): ", question)
var input string
if _, err := fmt.Scanln(&input); err != nil {
return false, skerr.Wrapf(err, "unable to read from standard input")
}
if input != "y" {
return false, nil
}
return true, nil
}
// checkOutK8sConfigRepo checks out the k8s-config Git repository.
func (g *Goldpushk) checkOutK8sConfigRepo(ctx context.Context) error {
fmt.Println()
var err error
g.k8sConfigCheckout, err = git.NewTempCheckout(ctx, g.k8sConfigRepoUrl)
if err != nil {
return skerr.Wrapf(err, "failed to check out %s", g.k8sConfigRepoUrl)
}
fmt.Printf("Cloned Git repository %s at %s.\n", g.k8sConfigRepoUrl, string(g.k8sConfigCheckout.GitDir))
return nil
}
// regenerateConfigFiles regenerates the .yaml and .json5 files for each
// instance/service pair that will be deployed. Any generated files will be
// checked into the corresponding Git repository with configuration files.
func (g *Goldpushk) regenerateConfigFiles(ctx context.Context) error {
// We keep track of which instance-specific configuration files have been copied in case
// they are required by two or more DeployableUnits; it suffices to copy said files once.
instanceConfigsCopied := map[Instance]bool{}
// Iterate over all units to deploy (including canaries).
return g.forAllDeployableUnits(func(unit DeployableUnit) error {
// Path to the template file inside $SKIA_INFRA_ROOT/golden.
tPath := unit.getDeploymentFileTemplatePath(g.goldSrcDir)
// Path to the deployment file (.yaml) we will regenerate inside the k8s-config Git repository.
oPath := g.getDeploymentFilePath(unit)
// Regenerate .yaml file.
if err := g.expandTemplate(ctx, unit, tPath, oPath); err != nil {
return skerr.Wrapf(err, "error while regenerating %s", oPath)
}
if !instanceConfigsCopied[unit.Instance] {
instanceConfigsCopied[unit.Instance] = true
// Copy all configuration files from the appropriate instance directory into the
// k8s-config repo so they can be checked in.
instanceConfigDirectory := g.getInstanceSpecificConfigDir(unit.Instance)
checkoutDirectory := g.getGitRepoSubdirPath(unit)
err := g.copyConfigsToCheckout(instanceConfigDirectory, checkoutDirectory)
if err != nil {
return skerr.Wrap(err)
}
}
return nil
})
}
// copyConfigsToCheckout copies all JSON5 configurations from the provided directory
// into the given checkout directory. Upon copying, the files will have the prefix "gold-".
func (g *Goldpushk) copyConfigsToCheckout(configDir, checkoutDir string) error {
if g.disableCopyingConfigsToCheckout {
return nil
}
jsonFiles, err := ioutil.ReadDir(configDir)
if err != nil {
return skerr.Wrap(err)
}
for _, jf := range jsonFiles {
if !strings.HasSuffix(jf.Name(), ".json5") {
continue
}
// Bad things will happen if there are multiple configuration files with the same name,
// as one will overwrite the other. If it becomes a problem, we could try to detect it.
dstFile := filepath.Join(checkoutDir, "gold-"+jf.Name())
srcFile := filepath.Join(configDir, jf.Name())
b, err := ioutil.ReadFile(srcFile)
if err != nil {
return skerr.Wrapf(err, "reading %s", srcFile)
}
if err := ioutil.WriteFile(dstFile, b, 0644); err != nil {
return skerr.Wrapf(err, "writing %s", dstFile)
}
}
return nil
}
// getInstanceSpecificConfigDir returns the path to the JSON5 configuration files for a given
// instance. These are checked in to the infra repo.
func (g *Goldpushk) getInstanceSpecificConfigDir(inst Instance) string {
return filepath.Join(g.goldSrcDir, k8sInstancesDir, string(inst))
}
// getDeploymentFilePath returns the path to the deployment file (.yaml) for the
// given DeployableUnit inside the k8s-config Git repository.
func (g *Goldpushk) getDeploymentFilePath(unit DeployableUnit) string {
return filepath.Join(g.getGitRepoSubdirPath(unit), unit.CanonicalName()+".yaml")
}
// getGitRepoSubdirPath returns the path to the subdirectory inside the k8s-config
// repository checkout in which the config files for the given DeployableUnit
// should be checked in (e.g. /path/to/k8s-config/skia-public-config).
func (g *Goldpushk) getGitRepoSubdirPath(unit DeployableUnit) string {
subdir := clusterSkiaPublic.name
if unit.internal {
subdir = clusterSkiaCorp.name
}
return filepath.Join(string(g.k8sConfigCheckout.GitDir), subdir)
}
// expandTemplate executes the kube-conf-gen command with arguments sufficient to produce the
// templated yaml files that control a Kuberenetes deployment. It makes use of the instance
// specific configuration files.
func (g *Goldpushk) expandTemplate(ctx context.Context, unit DeployableUnit, templatePath, outputPath string) error {
goldCommonJSON5 := filepath.Join(g.goldSrcDir, k8sConfigTemplatesDir, "gold-common.json5")
instanceStr := string(unit.Instance)
instanceJSON5 := fmt.Sprintf("%s.json5", unit.Instance)
instanceJSON5 = filepath.Join(g.getInstanceSpecificConfigDir(unit.Instance), instanceJSON5)
serviceJSON5 := fmt.Sprintf("%s-%s.json5", unit.Instance, unit.Service)
serviceJSON5 = filepath.Join(g.getInstanceSpecificConfigDir(unit.Instance), serviceJSON5)
err := g.execCmd(ctx, "kube-conf-gen", []string{
// Notes on the kube-conf-gen arguments used:
// - Flag "-extra INSTANCE_ID:<instanceStr>" binds template variable
// INSTANCE_ID to instanceStr.
// - Flag "-strict" will make kube-conf-gen fail in the presence of
// unsupported types, missing data, etc.
// - Flag "-parse_conf=false" prevents the values read from the JSON5
// config files provided with -c <json5-file> from being converted to
// strings.
"-c", goldCommonJSON5,
"-c", instanceJSON5,
"-c", serviceJSON5,
"-extra", "INSTANCE_ID:" + instanceStr,
"-extra", "NOW:" + g.now().Format(rfc3999KubernetesSafe),
"-t", templatePath,
"-parse_conf=false", "-strict",
"-o", outputPath,
})
if err != nil {
return skerr.Wrap(err)
}
sklog.Infof("Generated %s", outputPath)
return nil
}
// commitConfigFiles prints out a summary of the changes to be committed to
// k8s-config, asks for confirmation and pushes those changes.
func (g *Goldpushk) commitConfigFiles(ctx context.Context) (bool, error) {
// Print out summary of changes (git status -s).
if err := g.printOutGitStatus(ctx); err != nil {
return false, skerr.Wrap(err)
}
// Skip if --no-commit or --dryrun.
if g.dryRun || g.noCommit {
reason := "dry run"
if g.noCommit {
reason = "no commit"
}
fmt.Printf("\nSkipping commit step (%s).\n", reason)
return true, nil
}
// Ask for confirmation.
ok, err := prompt("\nCommit and push the above changes? Answering no will abort execution.")
if err != nil {
return false, skerr.Wrap(err)
}
if !ok {
return false, nil
}
fmt.Println()
// Skip if the k8s-config checkout has no changes (i.e. if "git status -s" prints out nothing).
stdout, err := g.k8sConfigCheckout.Git(ctx, "status", "-s")
if err != nil {
return false, skerr.Wrap(err)
}
if len(stdout) == 0 {
return true, nil
}
// Add, commit and push changes.
fmt.Printf("Pushing changes to the k8s-config Git repository.\n")
if _, err := g.k8sConfigCheckout.Git(ctx, "add", "."); err != nil {
return false, skerr.Wrap(err)
}
if _, err := g.k8sConfigCheckout.Git(ctx, "commit", "-m", "Push"); err != nil {
return false, skerr.Wrap(err)
}
if _, err := g.k8sConfigCheckout.Git(ctx, "push", git.DefaultRemote, git.DefaultBranch); err != nil {
return false, skerr.Wrap(err)
}
return true, nil
}
// printOutGitStatus runs "git status -s" on the k8s-config checkout and prints its output to stdout.
func (g *Goldpushk) printOutGitStatus(ctx context.Context) error {
stdout, err := g.k8sConfigCheckout.Git(ctx, "status", "-s")
if err != nil {
return skerr.Wrap(err)
}
if len(stdout) == 0 {
fmt.Printf("\nNo changes to be pushed to the k8s-config Git repository.\n")
} else {
fmt.Printf("\nChanges to be pushed to the k8s-config Git repository:\n")
fmt.Print(stdout)
}
return nil
}
// pushCanaries deploys the canaried DeployableUnits.
func (g *Goldpushk) pushCanaries(ctx context.Context) error {
if len(g.canariedDeployableUnits) == 0 {
return nil
}
fmt.Println("\nPushing canaried services.")
if err := g.pushDeployableUnits(ctx, g.canariedDeployableUnits); err != nil {
return skerr.Wrap(err)
}
return nil
}
// monitorCanaries monitors the canaried DeployableUnits after they have been pushed to production.
func (g *Goldpushk) monitorCanaries(ctx context.Context) error {
if len(g.canariedDeployableUnits) == 0 {
return nil
}
if err := g.monitor(ctx, g.canariedDeployableUnits, g.getUptimes, time.Sleep); err != nil {
return skerr.Wrap(err)
}
return nil
}
// pushServices deploys the non-canaried DeployableUnits.
func (g *Goldpushk) pushServices(ctx context.Context) error {
if len(g.canariedDeployableUnits) == 0 {
fmt.Println("\nPushing services.")
} else {
fmt.Println("\nPushing remaining services.")
}
if err := g.pushDeployableUnits(ctx, g.deployableUnits); err != nil {
return skerr.Wrap(err)
}
return nil
}
// monitorServices monitors the non-canaried DeployableUnits after they have been pushed to
// production.
func (g *Goldpushk) monitorServices(ctx context.Context) error {
if err := g.monitor(ctx, g.deployableUnits, g.getUptimes, time.Sleep); err != nil {
return skerr.Wrap(err)
}
return nil
}
// pushDeployableUnits takes a slice of DeployableUnits and pushes them to their corresponding
// clusters.
func (g *Goldpushk) pushDeployableUnits(ctx context.Context, units []DeployableUnit) error {
if g.dryRun {
fmt.Println("\nSkipping push step (dry run).")
return nil
}
// We want to make sure we push configs for an instance only once on a given deploy command.
instanceSpecificConfigMapsPushed := map[Instance]bool{}
for _, unit := range units {
if err := g.pushSingleDeployableUnit(ctx, unit, instanceSpecificConfigMapsPushed); err != nil {
return skerr.Wrap(err)
}
}
return nil
}
// pushSingleDeployableUnit pushes the given DeployableUnit to the corresponding cluster by running
// "kubectl apply -f path/to/config.yaml".
func (g *Goldpushk) pushSingleDeployableUnit(ctx context.Context, unit DeployableUnit, instanceSpecificConfigMapsPushed map[Instance]bool) error {
// Get the cluster corresponding to the given DeployableUnit.
cluster := clusterSkiaPublic
if unit.internal {
cluster = clusterSkiaCorp
}
// Switch clusters.
if err := g.switchClusters(ctx, cluster); err != nil {
return skerr.Wrap(err)
}
if !instanceSpecificConfigMapsPushed[unit.Instance] {
instanceSpecificConfigMapsPushed[unit.Instance] = true
if err := g.pushConfigurationJSON(ctx, unit.Instance); err != nil {
return skerr.Wrap(err)
}
}
// Push DeployableUnit.
path := g.getDeploymentFilePath(unit)
fmt.Printf("%s: applying %s.\n", unit.CanonicalName(), path)
if err := g.execCmd(ctx, "kubectl", []string{"apply", "-f", path}); err != nil {
return skerr.Wrap(err)
}
return nil
}
// pushConfigurationJSON pushes all the configuration files for a given instance. This includes
// all JSON5 files for all services. This is done because all services overlap with some common
// configuration.
func (g *Goldpushk) pushConfigurationJSON(ctx context.Context, instance Instance) error {
configMapName := fmt.Sprintf("gold-%s-config", instance)
instanceConfigDirectory := g.getInstanceSpecificConfigDir(instance)
if err := g.pushConfigMap(ctx, instanceConfigDirectory, configMapName); err != nil {
return skerr.Wrapf(err, "pushing the configuration files at %s", instanceConfigDirectory)
}
return nil
}
// pushConfigMap pushes the file(s) at a given path as a config map with the given name. It deletes
// any pre-existing map before, so as to overwrite it. The path for config maps may be to a single
// file or an entire directory.
func (g *Goldpushk) pushConfigMap(ctx context.Context, path, configMapName string) error {
// Delete existing ConfigMap.
if err := g.execCmd(ctx, "kubectl", []string{"delete", "configmap", configMapName}); err != nil {
// Command "kubectl delete configmap" returns exit code 1 when the ConfigMap does not exist on
// the cluster.
var exitError *osexec.ExitError
if errors.As(skerr.Unwrap(err), &exitError) && exitError.ExitCode() == 1 {
sklog.Infof("Did not delete ConfigMap %s as it does not exist on the cluster.", configMapName)
} else {
return skerr.Wrap(err)
}
}
// Create new ConfigMap.
if err := g.execCmd(ctx, "kubectl", []string{"create", "configmap", configMapName, "--from-file", path}); err != nil {
return skerr.Wrap(err)
}
return nil
}
// switchClusters runs the "gcloud" command necessary to switch kubectl to the given cluster.
func (g *Goldpushk) switchClusters(ctx context.Context, cluster cluster) error {
if g.currentCluster != cluster {
sklog.Infof("Switching to cluster %s\n", cluster.name)
if err := g.execCmd(ctx, "gcloud", []string{"container", "clusters", "get-credentials", cluster.name, "--zone", "us-central1-a", "--project", cluster.projectID}); err != nil {
return skerr.Wrap(err)
}
g.currentCluster = cluster
}
return nil
}
// uptimesFn has the same signature as method Goldpushk.getUptimes(). To facilitate testing, method
// Goldpushk.monitor() takes an uptimesFn instance as a parameter instead of calling
// Goldpushk.getUptimes() directly.
type uptimesFn func(context.Context, []DeployableUnit, time.Time) (map[DeployableUnitID]time.Duration, error)
// sleepFn has the same signature as time.Sleep(). Its purpose is to enable mocking that function
// from tests.
type sleepFn func(time.Duration)
// monitor watches the state of the given DeployableUnits and returns as soon as they seem to be
// up and running on their respective clusters.
//
// It does so by polling the clusters via kubectl every N seconds, and it prints out a status table
// on each iteration.
func (g *Goldpushk) monitor(ctx context.Context, units []DeployableUnit, getUptimes uptimesFn, sleep sleepFn) error {
fmt.Printf("\nMonitoring the following services until they all reach %d seconds of uptime (polling every %d seconds):\n", g.minUptimeSeconds, g.uptimePollFrequencySeconds)
for _, unit := range units {
fmt.Printf(" %s\n", unit.CanonicalName())
}
if g.dryRun {
fmt.Println("\nSkipping monitoring step (dry run).")
return nil
}
fmt.Printf("\nWaiting %d seconds before starting the monitoring step.\n", int(delayBetweenPushAndMonitoring.Seconds()))
sleep(delayBetweenPushAndMonitoring)
// Estimate the width of the status table to print at each monitoring iteration. This table
// consists of three columns: UPTIME, READY and NAME. The first two are both 10 characters wide.
statusTableWidth := 20 // UPTIME + READY.
// We determine the width of column NAME by looking for the longest name among all DeployableUnits
// to monitor (e.g. "gold-skia-diffserver").
longestName := 0
for _, unit := range units {
if len(unit.CanonicalName()) > longestName {
longestName = len(unit.CanonicalName())
}
}
statusTableWidth += longestName
// Print status table header.
w := tabwriter.NewWriter(os.Stdout, 10, 0, 2, ' ', 0)
if _, err := fmt.Fprintln(w, "\nUPTIME\tREADY\tNAME"); err != nil {
return skerr.Wrap(err)
}
if err := w.Flush(); err != nil {
return skerr.Wrap(err)
}
// Monitoring loop.
for {
// Get uptimes.
uptimes, err := getUptimes(ctx, units, time.Now())
if err != nil {
return skerr.Wrap(err)
}
// Print out uptimes for each DeployableUnit.
for _, unit := range units {
// First we assume the DeployableUnit has no uptime. If it's not yet ready, it won't show up
// in the uptimes dictionary.
uptimeStr := "<None>"
ready := "No"
// We now check if it does have an uptime, and update the variables above accordingly if so.
if t, ok := uptimes[unit.DeployableUnitID]; ok {
uptimeStr = fmt.Sprintf("%ds", int64(t.Seconds()))
if int(t.Seconds()) >= g.minUptimeSeconds {
ready = "Yes"
}
}
// Print out a row in the status table for the current DeployableUnit.
if _, err := fmt.Fprintf(w, "%s\t%s\t%s\n", uptimeStr, ready, unit.CanonicalName()); err != nil {
return skerr.Wrap(err)
}
}
// Have all DeployableUnits been in the "ready" state for at least minUptimeSeconds?
done := true
for _, unit := range units {
if t, ok := uptimes[unit.DeployableUnitID]; !ok || int(t.Seconds()) < g.minUptimeSeconds {
done = false
break
}
}
// If so, break out of the monitoring loop.
if done {
if err := w.Flush(); err != nil {
return skerr.Wrap(err)
}
return nil
}
// Otherwise, print a horizontal line to separate the uptimes from this iteration and the next.
for i := 0; i < statusTableWidth; i++ {
if _, err := fmt.Fprintf(w, "-"); err != nil {
return skerr.Wrap(err)
}
}
if _, err := fmt.Fprintf(w, "\n"); err != nil {
return skerr.Wrap(err)
}
if err := w.Flush(); err != nil {
return skerr.Wrap(err)
}
// Wait before polling again.
sleep(time.Duration(g.uptimePollFrequencySeconds) * time.Second)
}
}
// getUptimes groups the given DeployableUnits by cluster, calls getUptimesSingleCluster once per
// cluster, and returns the union of the uptimes returned by both calls to getUptimesSingleCluster.
func (g *Goldpushk) getUptimes(ctx context.Context, units []DeployableUnit, now time.Time) (map[DeployableUnitID]time.Duration, error) {
// Group units by cluster.
publicUnits := []DeployableUnit{}
corpUnits := []DeployableUnit{}
for _, unit := range units {
if unit.internal {
corpUnits = append(corpUnits, unit)
} else {
publicUnits = append(publicUnits, unit)
}
}
// This will hold the uptimes from both clusters.
allUptimes := make(map[DeployableUnitID]time.Duration)
// Once per cluster.
for i := 0; i < 2; i++ {
// Select the right cluster and DeployableUnits.
cluster := clusterSkiaPublic
units := publicUnits
if i == 1 {
cluster = clusterSkiaCorp
units = corpUnits
}
// Skip if no units.
if len(units) == 0 {
continue
}
// Switch to the current cluster.
if err := g.switchClusters(ctx, cluster); err != nil {
return nil, skerr.Wrap(err)
}
// Get the uptimes for the current cluster.
uptimes, err := g.getUptimesSingleCluster(ctx, units, now)
if err != nil {
return nil, skerr.Wrap(err)
}
// Add uptimes to the multicluster map.
for unitID, uptime := range uptimes {
allUptimes[unitID] = uptime
}
}
return allUptimes, nil
}
// getUptimesSingleCluster takes a slice of DeployableUnits and returns a dictionary mapping
// DeployableUnitIDs to the time duration since all the pods corresponding to that unit have entered
// the "Ready" state.
//
// A DeployableUnit will not have a corresponding entry in the returned map if any of its pods are
// not yet ready, or if no matching pods are returned by kubectl.
//
// This method makes the following assumptions:
// - All the given DeployableUnits belong to the same Kubernetes cluster.
// - kubectl is already set up to operate on that cluster.
// - A DeployableUnit may correspond to more than one pod (e.g. ReplicaSets).
func (g *Goldpushk) getUptimesSingleCluster(ctx context.Context, units []DeployableUnit, now time.Time) (map[DeployableUnitID]time.Duration, error) {
// JSONPath expression to be passed to kubectl. Below is a sample fragment of what the output
// looks like:
//
// app:gold-chrome-gpu-baselineserver podName:gold-chrome-gpu-baselineserver-5dfd8b65cb-9krrl ready:True readyLastTransitionTime:2019-10-03T16:45:48Z
// app:gold-chrome-gpu-baselineserver podName:gold-chrome-gpu-baselineserver-5dfd8b65cb-hr86n ready:True readyLastTransitionTime:2019-09-30T13:20:39Z
// app:gold-chrome-gpu-baselineserver podName:gold-chrome-gpu-baselineserver-5dfd8b65cb-l4lt5 ready:True readyLastTransitionTime:2019-10-04T01:59:40Z
// app:gold-chrome-gpu-diffserver podName:gold-chrome-gpu-diffserver-0 ready:True readyLastTransitionTime:2019-10-09T18:43:08Z
// app:gold-chrome-gpu-ingestion-bt podName:gold-chrome-gpu-ingestion-bt-f8b66844f-4969w ready:True readyLastTransitionTime:2019-10-04T01:54:54Z
// app:gold-chrome-gpu-frontend podName:gold-chrome-gpu-frontend-67c547667d-cwt42 ready:True readyLastTransitionTime:2019-10-04T02:01:11Z
//
// The output format should be fairly self explanatory, but to see an example of where those
// are coming from, try running e.g. "kubectl get pod gold-skia-diffserver-0 -o json".
//
// Note: Field podName is not used, and is only included for debugging purposes. It will be
// printed out to stdout if flag --logtostderr is passed.
jsonPathExpr := `
{range .items[*]}
{'app:'}
{.metadata.labels.app}
{' podName:'}
{.metadata.name}
{' ready:'}
{.status.conditions[?(@.type == 'Ready')].status}
{' readyLastTransitionTime:'}
{.status.conditions[?(@.type == 'Ready')].lastTransitionTime}
{'\n'}
{end}`
jsonPathExpr = strings.ReplaceAll(jsonPathExpr, "\n", "")
// Execute kubectl command that will return per-pod uptime.
stdout, err := g.execCmdAndReturnStdout(ctx, "kubectl", []string{"get", "pods", "-o", fmt.Sprintf("jsonpath=%s", jsonPathExpr)})
if err != nil {
return nil, skerr.Wrap(err)
}
// This map will hold the uptimes parsed from the command output.
uptime := make(map[DeployableUnitID]time.Duration)
// If at least one of the pods corresponding to a DeployableUnit is not ready, then it will be
// excluded from the returned dictionary.
//
// Take for example the fictitious "kubectl get pods ..." command output below:
// ...
// app:gold-chrome-gpu-baselineserver podName:gold-chrome-gpu-baselineserver-5dfd8b65cb-9krrl ready:True readyLastTransitionTime:2019-10-03T16:45:48Z
// app:gold-chrome-gpu-baselineserver podName:gold-chrome-gpu-baselineserver-5dfd8b65cb-hr86n ready:False readyLastTransitionTime:2019-09-30T13:20:39Z
// app:gold-chrome-gpu-baselineserver podName:gold-chrome-gpu-baselineserver-5dfd8b65cb-l4lt5 ready:True readyLastTransitionTime:2019-10-04T01:59:40Z
// ...
// In this example, DeployableUnit "gold-chrome-gpu-baselineserver" will be excluded from the
// returned dictionary because one of its pods is not yet ready.
//
// The dictionary below keeps track of which DeployableUnits to exclude from this method's output.
excludeFromOutput := make(map[DeployableUnitID]bool)
// We will parse each output line using this regular expression.
re := regexp.MustCompile(`app:(?P<app>\S*)\s+podName:(?P<podName>\S+)\s+ready:(?P<ready>\S+)\s+readyLastTransitionTime:(?P<readyLastTransitionTime>\S+)`)
// Iterate over all output lines.
for _, line := range strings.Split(stdout, "\n") {
// Skip empty line at the end.
if line == "" {
continue
}
// Parse line, e.g. "app:gold-chrome-gpu-baselineserver podName:gold-chrome-gpu-baselineserver-5dfd8b65cb-9krrl ready:True readyLastTransitionTime:2019-10-03T16:45:48Z"
matches := re.FindStringSubmatch(line)
// If for whatever reason the regular expression does not match, skip to the next line.
if len(matches) < 4 {
continue
}
// Extract values from current line.
app := matches[1] // e.g. "gold-chrome-gpu-baselineserver"
ready := matches[3] // e.g. "True"
readyLastTransitionTime := matches[4] // e.g. "2019-10-03T16:45:48Z"
// Iterate over the given DeployableUnits; see if there is a DeployableUnit that matches the
// current line.
var unitID DeployableUnitID
for _, unit := range units {
if unit.CanonicalName() == app {
unitID = unit.DeployableUnitID
}
}
// If no DeployableUnit matches, skip to the next line. This is OK since "kubectl get pods"
// returns information about all pods running on the cluster, and not just the ones we are
// interested in.
if unitID == (DeployableUnitID{}) {
continue
}
// If the pod is not yet ready, we exclude its corresponding DeployableUnit from the method's
// output.
if ready != "True" {
delete(uptime, unitID) // Delete it from the output if it was previously added.
excludeFromOutput[unitID] = true
continue
}
// If the DeployableUnit has been excluded from the output due to another of its pods not being
// ready, skip to the next line.
if _, ok := excludeFromOutput[unitID]; ok {
continue
}
// Parse the timestamp, e.g. "2019-09-30T13:20:33Z".
t, err := time.Parse(kubectlTimestampLayout, readyLastTransitionTime)
if err != nil {
return nil, skerr.Wrap(err)
}
// Compute the time duration since the pod corresponding to the current line has been ready.
readyFor := now.Sub(t)
// We'll report the uptime of the pod that became ready the most recently.
if currentMin, ok := uptime[unitID]; !ok || (readyFor < currentMin) {
uptime[unitID] = readyFor
}
}
return uptime, nil
}
// forAllDeployableUnits applies all deployable units (including canaried units)
// to the given function.
func (g *Goldpushk) forAllDeployableUnits(f func(unit DeployableUnit) error) error {
for _, unit := range g.deployableUnits {
if err := f(unit); err != nil {
return skerr.Wrap(err)
}
}
for _, unit := range g.canariedDeployableUnits {
if err := f(unit); err != nil {
return skerr.Wrap(err)
}
}
return nil
}
// now returns the current time in UTC or a mocked out time.
func (g *Goldpushk) now() time.Time {
if g.fakeNow.IsZero() {
return time.Now().UTC()
}
return g.fakeNow
}
// execCmd executes a command with the given arguments.
func (g *Goldpushk) execCmd(ctx context.Context, name string, args []string) error {
cmd := makeExecCommand(name, args, g.verbose)
if err := exec.Run(ctx, cmd); err != nil {
return skerr.Wrapf(err, "failed to run %s", cmdToDebugStr(cmd))
}
return nil
}
// execCmdAndReturnStdout executes a command with the given arguments and returns its output.
func (g *Goldpushk) execCmdAndReturnStdout(ctx context.Context, name string, args []string) (string, error) {
cmd := makeExecCommand(name, args, g.verbose)
stdout, err := exec.RunCommand(ctx, cmd)
if err != nil {
return "", skerr.Wrapf(err, "failed to run %s", cmdToDebugStr(cmd))
}
return stdout, nil
}
// makeExecCommand returns an exec.Command for the given command and arguments.
func makeExecCommand(name string, args []string, debug bool) *exec.Command {
cmd := &exec.Command{
Name: name,
Args: args,
InheritPath: true,
LogStderr: true,
LogStdout: true,
}
if debug {
cmd.Verbose = exec.Info
}
return cmd
}
// cmdToDebugStr returns a human-readable string representation of an *exec.Command.
func cmdToDebugStr(cmd *exec.Command) string {
return fmt.Sprintf("%s %s", cmd.Name, strings.Join(cmd.Args, " "))
}