blob: 3d224a559d70ac271eab656cb5d63a236c05fa36 [file] [log] [blame]
// switch-pod-monitor is the main application running in a switch-pod. It
// registers the pod with switchboard and periodically updates the pod's
// LastUpdated value. It also removes the pod from switchboard when Kubernetes
// tries to shut down the pod. See http://go/skia-switchboard for more details.
package main
import (
"context"
"encoding/json"
"flag"
"io"
"os"
"os/signal"
"syscall"
"time"
"go.skia.org/infra/go/metrics2"
"go.skia.org/infra/go/skerr"
"go.skia.org/infra/go/sklog"
"go.skia.org/infra/go/util"
"go.skia.org/infra/machine/go/machineserver/config"
"go.skia.org/infra/machine/go/switchboard"
)
// Command-line flags.
var (
	// configFlag is the path of the JSON-encoded config.InstanceConfig that
	// main decodes at startup.
	configFlag = flag.String("config", "../machine/go/configs/test.json", "The path to the configuration file.")

	// local is handed to switchboard.New; true means the program runs
	// locally rather than in GCE.
	local = flag.Bool("local", false, "Running locally if true, as opposed to running in GCE.")

	// promPort is the address handed to metrics2.InitPrometheus for serving
	// metrics.
	promPort = flag.String("prom_port", ":20000", "Metrics service address (e.g., ':10110')")
)
// main parses flags, starts the Prometheus metrics endpoint, loads the
// instance configuration, creates a Switchboard client and then blocks in
// connectToSwitchboardAndWait. Any setup failure is fatal.
//
// common.Init() is intentionally not used because this program registers its
// own signal handlers.
func main() {
	flag.Parse()
	metrics2.InitPrometheus(*promPort)

	ctx := context.Background()

	// Decode the JSON instance configuration named by --config.
	var instanceConfig config.InstanceConfig
	decodeConfig := func(r io.Reader) error {
		return json.NewDecoder(r).Decode(&instanceConfig)
	}
	if loadErr := util.WithReadFile(*configFlag, decodeConfig); loadErr != nil {
		sklog.Fatalf("Failed to open config file: %q: %s", *configFlag, loadErr)
	}

	sb, err := switchboard.New(ctx, *local, instanceConfig)
	if err != nil {
		sklog.Fatalf("Failed to initialize Switchboard instance: %s", err)
	}

	// The pod registers itself with switchboard under its hostname.
	hostname, err := os.Hostname()
	if err != nil {
		sklog.Fatalf("Failed to load hostname: %s", err)
	}

	err = connectToSwitchboardAndWait(ctx, hostname, sb, switchboard.PodKeepAliveDuration, switchboard.PodMaxConsecutiveKeepAliveErrors)
	if err != nil {
		sklog.Fatal(err)
	}
}
// connectToSwitchboardAndWait registers the pod with switchboard and does not
// return unless the given context is cancelled.
//
// keepAliveDuration is how often to call Switchboard.KeepAlivePod.
//
// maxConsecutiveKeepAliveErrors is the number of time Switchboard.KeepAlivePod
// calls are allowed to fail consecutively before returning an error.
func connectToSwitchboardAndWait(ctx context.Context, hostname string, switchboardInstance switchboard.Switchboard, keepAliveDuration time.Duration, maxConsecutiveKeepAliveErrors int) error {
// Kubernetes will send a SIGTERM when it wants to tell a pod it is going to
// be shutdown. After waiting for the graceful shutdown period, if the pod
// is still running it will be sent a SIGKILL. See
// https://kubernetes.io/docs/concepts/workloads/pods/pod-lifecycle/#pod-termination
// for more details.
//
// On syscall.SIGTERM we call Switchboard.RemovePod so this pod doesn't handle
// any more new switchboard connections. We then use the k8s grace period to wait
// for all the existing leases for machines to expire. At the end of the graceperiod
// the pod is killed.
//
// We also handle syscall.SIGINT so that we can kill the app using Ctrl-C
// when running on the desktop and RemovePod still gets called.
c := make(chan os.Signal, 1)
signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)
if err := switchboardInstance.AddPod(ctx, hostname); err != nil {
return skerr.Wrapf(err, "Failed to add pod: %q", hostname)
}
consecutiveFailures := 0
// Are we in the graceful shutdown phase of the pod lifecycle?
gracefulShutdown := false
for {
select {
case <-ctx.Done():
return nil
case sig := <-c:
sklog.Error("Got signal:", sig)
if err := switchboardInstance.RemovePod(ctx, hostname); err != nil {
sklog.Error(err)
}
gracefulShutdown = true
if sig == syscall.SIGINT {
sklog.Info("Exiting on SIGINT.")
os.Exit(0)
}
// Periodically call KeepAlivePod so we know the pod is still running.
case <-time.Tick(keepAliveDuration):
if gracefulShutdown {
// Check to see if there are any meeting points left in this pod
// that is, if any tasks are using test machines connected to
// this pod. If there are none then exit.
count, err := switchboardInstance.NumMeetingPointsForPod(ctx, hostname)
if err != nil {
sklog.Errorf("switchboard failed to count meeting points: %s", err)
continue
}
sklog.Infof("graceful shutdown mode for pod: %q num meeting points: %s", hostname, count)
if count == 0 {
return nil
}
} else {
err := switchboardInstance.KeepAlivePod(ctx, hostname)
if err != nil {
consecutiveFailures++
sklog.Errorf("Failed to keep pod alive: %s", err)
if consecutiveFailures >= maxConsecutiveKeepAliveErrors {
if err := switchboardInstance.RemovePod(ctx, hostname); err != nil {
sklog.Error(err)
}
return skerr.Wrapf(err, "Exiting: Switchpod.KeepAlivePod failed %d consecutive times", consecutiveFailures)
}
}
}
consecutiveFailures = 0
}
}
}