blob: 734930bfbb54caed13b68bd013282bde68d5fd80 [file] [log] [blame]
// podwatcher is an application that monitors pods in a k8s cluster running
// swarming tasks.
// store.WatchForDeletablePods returns a channel that will produce the name of a
// k8s pod for every pod that becomes eligible for deletion. Note that since these
// k8s pods are from a daemonset the pod will automatically be restarted, but
// with the latest image. This is because the spec for the rpi-swarming pods is
// spec:
// updateStrategy:
// type: OnDelete
// store.WatchForPowerCycle() returns a channel that will produce the name of a
// machine to powercycle.
package main
import (
var (
// Flags.
configFlag = flag.String("config", "", "The path to the configuration file.")
local = flag.Bool("local", false, "Running locally if true. As opposed to in production.")
powercycleConfigFilename = flag.String("powercycle_config", "", "The name of the config file for powercycle.Controller.")
promPort = flag.String("prom_port", ":20000", "Metrics service address (e.g., ':20000')")
func main() {
common.InitWithMust("podwatcher", common.PrometheusOpt(promPort))
ctx := context.Background()
if *powercycleConfigFilename == "" {
sklog.Fatal("--powercycle_config flag must be supplied.")
var instanceConfig config.InstanceConfig
err := util.WithReadFile(*configFlag, func(r io.Reader) error {
return json.NewDecoder(r).Decode(&instanceConfig)
if err != nil {
sklog.Fatalf("Failed to load config from %q", *configFlag, err)
store, err := store.New(ctx, *local, instanceConfig)
if err != nil {
sklog.Fatalf("Failed to build store instance: %s", err)
deleter, err := deleter.New()
if err != nil {
sklog.Fatalf("Failed to build deleter: %s", err)
sklog.Info("Building powercycle.Controller from %q", powercycleConfigFilename)
connectOnStartup := !*local
powercycleController, err := powercycle.ControllerFromJSON5(ctx, *powercycleConfigFilename, connectOnStartup)
if err != nil {
sklog.Fatalf("Failed to instantiate powercycle.Controller: %s", err)
successfulUpdates := metrics2.GetCounter("podreader_successful_update")
failedUpdates := metrics2.GetCounter("podreader_failed_update")
go func() {
for podname := range store.WatchForDeletablePods(ctx) {
if err := deleter.Delete(ctx, podname); err != nil {
sklog.Errorf("Failed to update pod by deleting it: %s", err)
sklog.Infof("Deleted: %q", podname)
sklog.Info("Exiting WatchForDeletablePods.")
// Start a loop that does a firestore onsnapshot watcher that gets machine names
// that need to be power-cycled.
go func() {
for machineID := range store.WatchForPowerCycle(ctx) {
if err := powercycleController.PowerCycle(ctx, powercycle.DeviceID(machineID), 0); err != nil {
sklog.Errorf("Failed to powercycle %q: %s", machineID, err)
} else {
sklog.Infof("Successfully powercycled: %q", machineID)
sklog.Info("Exiting WatchForPowerCycle")
select {}