// k8s_checker is an application that checks for the following and alerts if necessary:
// * Dirty images checked into K8s config files.
// * Dirty configs running in K8s.
package main

import (
"context"
"encoding/json"
"flag"
"fmt"
"io/ioutil"
"path/filepath"
"strings"
"time"
yaml "gopkg.in/yaml.v2"
"go.skia.org/infra/go/auth"
"go.skia.org/infra/go/common"
"go.skia.org/infra/go/exec"
"go.skia.org/infra/go/git"
"go.skia.org/infra/go/gitauth"
"go.skia.org/infra/go/metrics2"
"go.skia.org/infra/go/sklog"
"go.skia.org/infra/go/util"
)

const (
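// IMAGE_DIRTY_SUFFIX is the suffix that identifies images built from a
// checkout with uncommitted changes, eg. (hypothetical tag)
// gcr.io/some-project/myapp:2019-01-01-abcdef-dirty.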
IMAGE_DIRTY_SUFFIX = "-dirty"
// Metric names.
DIRTY_COMMITTED_IMAGE_METRIC = "dirty_committed_image_metric"
DIRTY_CONFIG_METRIC = "dirty_config_metric"
LIVENESS_DIRTY_CONFIGS_METRIC = "k8s_checker"
LIVENESS_POD_STATUS_METRIC = "k8s_checker_pod_status"
POD_STATUS_METRIC = "k8s_pod_status"
// Possible values for the Phase of a pod. More detail in the docs:
// https://kubernetes.io/docs/concepts/workloads/pods/pod-lifecycle/#pod-phase
POD_PHASE_PENDING = "Pending"
POD_PHASE_RUNNING = "Running"
POD_PHASE_SUCCEEDED = "Succeeded"
POD_PHASE_FAILED = "Failed"
POD_PHASE_UNKNOWN = "Unknown"
)

var (
// Flags.
k8sYamlRepo = flag.String("k8s_yaml_repo", "https://skia.googlesource.com/skia-public-config", "The repository where K8s yaml files are stored (eg: https://skia.googlesource.com/skia-public-config)")
kubeConfig = flag.String("kube_config", "/var/secrets/kube-config/kube_config", "The kube config of the project kubectl will query against.")
workdir = flag.String("workdir", "/tmp/", "Directory to use for scratch work.")
promPort = flag.String("prom_port", ":20000", "Metrics service address (e.g., ':20000')")
serviceAccountKey = flag.String("service_account_key", "", "Should be set when running in K8s.")
dirtyConfigChecksPeriod = flag.Duration("dirty_config_checks_period", 2*time.Minute, "How often to check for dirty configs/images in K8s.")
podStatusMetricsPeriod = flag.Duration("pod_status_metrics_period", time.Minute, "How often to update pod status metrics.")
)
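// containerState mirrors the relevant fields of the Kubernetes ContainerState
// object as returned by "kubectl get pods -o json". Only one of Running,
// Terminated, or Waiting is typically populated for a given state.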
type containerState struct {
Running *struct {
StartedAt time.Time `json:"startedAt"`
} `json:"running"`
Terminated *struct {
ContainerID string `json:"containerID"`
ExitCode int `json:"exitCode"`
FinishedAt time.Time `json:"finishedAt"`
Message string `json:"message"`
Reason string `json:"reason"`
Signal int `json:"signal"`
StartedAt time.Time `json:"startedAt"`
} `json:"terminated"`
Waiting *struct {
Message string `json:"message"`
Reason string `json:"reason"`
} `json:"waiting"`
}
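// K8sPodsJson mirrors the subset of the "kubectl get pods -o json" output that
// this checker uses: pod metadata, the containers in the pod spec, and the
// per-container statuses.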
type K8sPodsJson struct {
Items []struct {
Metadata struct {
Labels struct {
App string `json:"app"`
} `json:"labels"`
Name string `json:"name"`
} `json:"metadata"`
Spec struct {
Containers []struct {
Name string `json:"name"`
Image string `json:"image"`
} `json:"containers"`
} `json:"spec"`
Status struct {
Conditions []struct {
LastProbeTime time.Time `json:"lastProbeTime"`
LastTransitionTime time.Time `json:"lastTransitionTime"`
Status string `json:"status"`
Type string `json:"type"`
} `json:"conditions"`
ContainerStatuses []struct {
ContainerID string `json:"containerID"`
Image string `json:"image"`
ImageID string `json:"imageID"`
LastState *containerState `json:"lastState"`
Name string `json:"name"`
Ready bool `json:"ready"`
RestartCount int `json:"restartCount"`
State *containerState `json:"state"`
} `json:"containerStatuses"`
Message string `json:"message"`
Phase string `json:"phase"`
Reason string `json:"reason"`
StartTime time.Time `json:"startTime"`
} `json:"status"`
} `json:"items"`
}

// getLiveAppContainersToImages returns a map of app label to container name to
// the image currently running in that container.
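// For example, the result might look like (hypothetical values):
//
//   {"myapp": {"myapp-container": "gcr.io/some-project/myapp:2019-01-01-abcdef"}}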
func getLiveAppContainersToImages(ctx context.Context) (map[string]map[string]string, error) {
// Get JSON output of pods running in K8s.
getPodsCommand := fmt.Sprintf("kubectl get pods --kubeconfig=%s -o json --field-selector=status.phase=Running", *kubeConfig)
output, err := exec.RunSimple(ctx, getPodsCommand)
if err != nil {
return nil, fmt.Errorf("Error when running \"%s\": %s", getPodsCommand, err)
}
liveAppContainersToImages := map[string]map[string]string{}
var podsJson K8sPodsJson
if err := json.Unmarshal([]byte(output), &podsJson); err != nil {
return nil, fmt.Errorf("Error when unmarshalling JSON: %s", err)
}
for _, item := range podsJson.Items {
app := item.Metadata.Labels.App
liveAppContainersToImages[app] = map[string]string{}
for _, container := range item.Spec.Containers {
liveAppContainersToImages[app][container.Name] = container.Image
}
}
return liveAppContainersToImages, nil
}
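// K8sConfig mirrors the subset of a Kubernetes workload YAML config (eg. a
// Deployment) that this checker reads: the app label and the container images
// under spec.template.spec. A minimal sketch of the YAML shape it expects
// (hypothetical values):
//
//   spec:
//     template:
//       metadata:
//         labels:
//           app: myapp
//       spec:
//         containers:
//           - name: myapp-container
//             image: gcr.io/some-project/myapp:2019-01-01-abcdef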
type K8sConfig struct {
Spec struct {
Template struct {
Metadata struct {
Labels struct {
App string `yaml:"app"`
} `yaml:"labels"`
} `yaml:"metadata"`
TemplateSpec struct {
Containers []struct {
Name string `yaml:"name"`
Image string `yaml:"image"`
} `yaml:"containers"`
} `yaml:"spec"`
} `yaml:"template"`
} `yaml:"spec"`
}

// checkForDirtyConfigs checks for:
// * Dirty images checked into K8s config files.
// * Dirty configs running in K8s.
// It takes in a map of oldMetrics; any metrics from that map that are not encountered during this
// invocation of the function are deleted. This is done to handle the case when metric tags
// change, eg: liveImage in dirtyConfigMetricTags.
// It returns a map of newMetrics, which contains all the metrics that were used during this
// invocation of the function.
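// For example (hypothetical), a committed image tag such as
// gcr.io/some-project/myapp:2019-01-01-abcdef-dirty would cause
// DIRTY_COMMITTED_IMAGE_METRIC to be set to 1 for that yaml file, and a live
// image that differs from the committed one would cause DIRTY_CONFIG_METRIC to
// be set to 1.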
func checkForDirtyConfigs(ctx context.Context, oldMetrics map[metrics2.Int64Metric]struct{}) (map[metrics2.Int64Metric]struct{}, error) {
sklog.Info("\n\n---------- New round of checking k8s dirty configs ----------\n\n")
// Get mapping from live apps to their containers and images.
liveAppContainerToImages, err := getLiveAppContainersToImages(ctx)
if err != nil {
return nil, fmt.Errorf("Could not get live pods from kubectl: %s", err)
}
// Check out the K8s config repo.
// Use gitiles if this ends up giving us any problems.
g, err := git.NewCheckout(ctx, *k8sYamlRepo, *workdir)
if err != nil {
return nil, fmt.Errorf("Error when checking out %s: %s", *k8sYamlRepo, err)
}
if err := g.Update(ctx); err != nil {
return nil, fmt.Errorf("Error when updating %s: %s", *k8sYamlRepo, err)
}
files, err := ioutil.ReadDir(g.Dir())
if err != nil {
return nil, fmt.Errorf("Error when reading from %s: %s", g.Dir(), err)
}
newMetrics := map[metrics2.Int64Metric]struct{}{}
for _, f := range files {
if filepath.Ext(f.Name()) != ".yaml" {
// Only interested in YAML configs.
continue
}
b, err := ioutil.ReadFile(filepath.Join(g.Dir(), f.Name()))
if err != nil {
sklog.Fatal(err)
}
// There can be multiple YAML documents within a single YAML file.
yamlDocs := strings.Split(string(b), "---")
for _, yamlDoc := range yamlDocs {
var config K8sConfig
if err := yaml.Unmarshal([]byte(yamlDoc), &config); err != nil {
sklog.Fatal(err)
}
app := config.Spec.Template.Metadata.Labels.App
if app == "" {
// This YAML config does not have an app. Continue.
continue
}
for _, c := range config.Spec.Template.TemplateSpec.Containers {
container := c.Name
committedImage := c.Image
// Check if the image in the config is dirty.
dirtyCommittedMetricTags := map[string]string{
"yaml": f.Name(),
"repo": *k8sYamlRepo,
"committedImage": committedImage,
}
dirtyCommittedMetric := metrics2.GetInt64Metric(DIRTY_COMMITTED_IMAGE_METRIC, dirtyCommittedMetricTags)
newMetrics[dirtyCommittedMetric] = struct{}{}
if strings.HasSuffix(committedImage, IMAGE_DIRTY_SUFFIX) {
sklog.Infof("%s has a dirty committed image: %s\n\n", f.Name(), committedImage)
dirtyCommittedMetric.Update(1)
} else {
dirtyCommittedMetric.Update(0)
}
// Check if the image running in k8s matches the checked in image.
if liveContainersToImages, ok := liveAppContainerToImages[app]; ok {
if liveImage, ok := liveContainersToImages[container]; ok {
dirtyConfigMetricTags := map[string]string{
"app": app,
"container": container,
"yaml": f.Name(),
"repo": *k8sYamlRepo,
"committedImage": committedImage,
"liveImage": liveImage,
}
dirtyConfigMetric := metrics2.GetInt64Metric(DIRTY_CONFIG_METRIC, dirtyConfigMetricTags)
newMetrics[dirtyConfigMetric] = struct{}{}
if liveImage != committedImage {
dirtyConfigMetric.Update(1)
sklog.Infof("For app %s and container %s the running image differs from the image in config: %s != %s\n\n", app, container, liveImage, committedImage)
} else {
dirtyConfigMetric.Update(0)
}
} else {
sklog.Infof("There is no running container %s for the config file %s\n\n", container, f.Name())
}
} else {
sklog.Infof("There is no running app %s for the config file %s\n\n", app, f.Name())
}
}
}
}
// Delete unused old metrics.
for m := range oldMetrics {
if _, ok := newMetrics[m]; !ok {
if err := m.Delete(); err != nil {
sklog.Errorf("Failed to delete metric: %s", err)
// Add the metric to newMetrics so that we'll
// have the chance to delete it again on the
// next cycle.
newMetrics[m] = struct{}{}
}
}
}
return newMetrics, nil
}
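// updatePodStatusMetrics reports, for every container of every pod returned by
// kubectl, how long the pod has been in its current status. Like
// checkForDirtyConfigs, it takes in the metrics from the previous invocation,
// deletes any that are no longer used, and returns the set of metrics used
// during this invocation.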
func updatePodStatusMetrics(ctx context.Context, oldMetrics map[metrics2.Int64Metric]struct{}) (map[metrics2.Int64Metric]struct{}, error) {
now := time.Now()
// Get JSON output of pods running in K8s.
getPodsCommand := fmt.Sprintf("kubectl get pods --kubeconfig=%s -o json", *kubeConfig)
output, err := exec.RunSimple(ctx, getPodsCommand)
if err != nil {
return nil, fmt.Errorf("Error when running \"%s\": %s", getPodsCommand, err)
}
var podsJson K8sPodsJson
if err := json.Unmarshal([]byte(output), &podsJson); err != nil {
return nil, fmt.Errorf("Error when unmarshalling JSON: %s", err)
}
newMetrics := make(map[metrics2.Int64Metric]struct{}, len(podsJson.Items))
for _, item := range podsJson.Items {
// Attempt to find the time that the pod transitioned into its
// current state.
var lastTransitionTime time.Time
for _, condition := range item.Status.Conditions {
if condition.LastTransitionTime.After(lastTransitionTime) {
lastTransitionTime = condition.LastTransitionTime
}
}
if util.TimeIsZero(lastTransitionTime) && (item.Status.Phase == POD_PHASE_FAILED || item.Status.Phase == POD_PHASE_SUCCEEDED) {
lastTransitionTime = item.Status.StartTime
}
if util.TimeIsZero(lastTransitionTime) {
b, err := json.MarshalIndent(item, "", " ")
if err != nil {
return nil, err
}
sklog.Errorf("Could not find transition time for pod:\n%s", string(b))
lastTransitionTime = now
}
duration := int64(now.Sub(lastTransitionTime).Seconds())
for _, container := range item.Status.ContainerStatuses {
// Find an appropriate status. The possible values for
// Phase do not provide as much information as we'd
// like, so dig into the State object when possible.
status := item.Status.Phase
if status == POD_PHASE_RUNNING && container.State.Terminated != nil {
status = "Terminating"
}
if status == POD_PHASE_PENDING && container.State.Waiting != nil && container.State.Waiting.Reason != "" {
status = container.State.Waiting.Reason
}
// Update the metric.
tags := map[string]string{
"app": item.Metadata.Labels.App,
"container": container.Name,
"pod": item.Metadata.Name,
"repo": *k8sYamlRepo,
"status": status,
}
m := metrics2.GetInt64Metric(POD_STATUS_METRIC, tags)
newMetrics[m] = struct{}{}
delete(oldMetrics, m)
m.Update(duration)
sklog.Debugf(" %s:\t%s for %ds", item.Metadata.Name, status, duration)
}
}
for m := range oldMetrics {
if err := m.Delete(); err != nil {
sklog.Errorf("Failed to delete metric: %s", err)
// Add the metric to newMetrics so that we'll
// have the chance to delete it again on the
// next cycle.
newMetrics[m] = struct{}{}
}
}
return newMetrics, nil
}

func main() {
common.InitWithMust("k8s_checker", common.PrometheusOpt(promPort))
defer sklog.Flush()
ctx := context.Background()
if *serviceAccountKey != "" {
activationCmd := fmt.Sprintf("gcloud auth activate-service-account --key-file %s", *serviceAccountKey)
if _, err := exec.RunSimple(ctx, activationCmd); err != nil {
sklog.Fatal(err)
}
// Use the gitcookie created by the gitauth package.
ts, err := auth.NewDefaultTokenSource(false, auth.SCOPE_USERINFO_EMAIL, auth.SCOPE_GERRIT)
if err != nil {
sklog.Fatal(err)
}
gitcookiesPath := filepath.Join(*workdir, ".gitcookies")
if _, err := gitauth.New(ts, gitcookiesPath, true, ""); err != nil {
sklog.Fatalf("Failed to create git cookie updater: %s", err)
}
}
livenessDirtyConfigs := metrics2.NewLiveness(LIVENESS_DIRTY_CONFIGS_METRIC)
oldMetricsDirtyConfigs := map[metrics2.Int64Metric]struct{}{}
go util.RepeatCtx(*dirtyConfigChecksPeriod, ctx, func(ctx context.Context) {
newMetrics, err := checkForDirtyConfigs(ctx, oldMetricsDirtyConfigs)
if err != nil {
sklog.Errorf("Error when checking for dirty configs: %s", err)
} else {
livenessDirtyConfigs.Reset()
oldMetricsDirtyConfigs = newMetrics
}
})
livenessPodStatus := metrics2.NewLiveness(LIVENESS_POD_STATUS_METRIC)
oldMetricsPodStatus := map[metrics2.Int64Metric]struct{}{}
go util.RepeatCtx(*podStatusMetricsPeriod, ctx, func(ctx context.Context) {
newMetrics, err := updatePodStatusMetrics(ctx, oldMetricsPodStatus)
if err != nil {
sklog.Errorf("Error when checking pod statuses: %s", err)
} else {
livenessPodStatus.Reset()
oldMetricsPodStatus = newMetrics
}
})
select {}
}