blob: 76b190cbd72b321055e6ca467941f8918611cd29 [file] [log] [blame]
// powercycle_server_ansible is an application that watches the machine server
// and powercycles test machines that need powercycling.
package main
import (
"bytes"
"context"
"encoding/json"
"flag"
"io/fs"
"net/http"
"net/url"
"regexp"
"time"
"go.skia.org/infra/go/common"
"go.skia.org/infra/go/httputils"
"go.skia.org/infra/go/metrics2"
"go.skia.org/infra/go/skerr"
"go.skia.org/infra/go/sklog"
"go.skia.org/infra/go/util"
"go.skia.org/infra/machine/go/configs"
"go.skia.org/infra/machine/go/machineserver/config"
"go.skia.org/infra/machine/go/machineserver/rpc"
"go.skia.org/infra/skolo/go/powercycle"
"go.skia.org/infra/skolo/sys"
"golang.org/x/oauth2/google"
)
// Version identifies the build. It defaults to "development" and can be
// changed via -ldflags.
var Version = "development"
// Command line flags.
var (
	// configFlag names the instance configuration file; main requires it
	// to be non-empty.
	configFlag = flag.String(
		"config",
		"",
		"The name of the configuration file.",
	)

	// local is true when running on a developer machine rather than in
	// production.
	local = flag.Bool(
		"local",
		false,
		"Running locally if true. As opposed to in production.",
	)

	// powercycleConfigFilename names the powercycle.Controller config file;
	// main requires it to be non-empty.
	powercycleConfigFilename = flag.String(
		"powercycle_config",
		"",
		"The name of the config file for powercycle.Controller.",
	)

	// promPort is the address the metrics service listens on.
	promPort = flag.String(
		"prom_port",
		":20000",
		"Metrics service address (e.g., ':20000')",
	)

	// machineServerHost is the scheme and host of the machine server API.
	machineServerHost = flag.String(
		"machine_server",
		"https://machines.skia.org",
		"A URL with the scheme and domain name of the machine hosting the machine server API.",
	)
)
// urlExpansionRegex matches the span from the first "{" to the last "}" in a
// string. It is used to replace a gorilla mux URL variable with a concrete
// value.
var urlExpansionRegex = regexp.MustCompile("{.*}")
// main validates the required flags, loads the instance and powercycle
// configuration files, builds an authenticated HTTP client and a
// powercycle.Controller, and then hands control to watchForPowerCycle.
func main() {
	common.InitWithMust(
		"powercycle_server_ansible",
		common.PrometheusOpt(promPort),
		common.CloudLogging(local, "skia-public"),
	)
	sklog.Infof("Version: %s", Version)

	// Both configuration flags are mandatory.
	if *powercycleConfigFilename == "" {
		sklog.Fatal("--powercycle_config flag must be supplied.")
	}
	if *configFlag == "" {
		sklog.Fatal("--config flag must be supplied.")
	}

	// Read and decode the instance config from the embedded configs
	// filesystem. The decoded value is not used any further in this
	// function, so this step only checks that the file exists and parses.
	instanceConfigBytes, err := fs.ReadFile(configs.Configs, *configFlag)
	if err != nil {
		sklog.Fatalf("Failed to read config file %q: %s", *configFlag, err)
	}
	var instanceConfig config.InstanceConfig
	if err := json.Unmarshal(instanceConfigBytes, &instanceConfig); err != nil {
		sklog.Fatal(err)
	}

	// Read the raw powercycle controller config.
	powerCycleConfigBytes, err := fs.ReadFile(sys.Sys, *powercycleConfigFilename)
	if err != nil {
		sklog.Fatalf("Failed to read config file %q: %s", *powercycleConfigFilename, err)
	}

	// Build the HTTP client used for every request to the machine server.
	ctx := context.Background()
	tokenSource, err := google.DefaultTokenSource(ctx, "email")
	if err != nil {
		sklog.Fatalf("Failed to create tokensource: %s", err)
	}
	clientConfig := httputils.DefaultClientConfig().WithTokenSource(tokenSource)
	httpClient := clientConfig.With2xxOnly().WithoutRetries().Client()

	// Construct the powercycle controller, wiring in the callback that
	// reports power cycle state to the machine server.
	sklog.Infof("Building powercycle.Controller from %q", *powercycleConfigFilename)
	initCallback, err := buildPowerCycleControllerCallback(httpClient, *machineServerHost)
	if err != nil {
		sklog.Fatalf("Failed to create powercycle.Controller callback: %s", err)
	}
	controller, err := powercycle.ControllerFromJSON5Bytes(ctx, powerCycleConfigBytes, true, initCallback)
	if err != nil {
		sklog.Fatalf("Failed to instantiate powercycle.Controller: %s", err)
	}

	watchForPowerCycle(ctx, httpClient, *machineServerHost, controller)

	// watchForPowerCycle is documented as never returning; block forever
	// should it ever do so.
	select {}
}
// buildPowerCycleControllerCallback returns a callback function that sends the
// power cycle state for each machine attached to a controller to the machine
// server.
//
// machineServer is a URL carrying the scheme and host of the machine server
// (e.g. "https://machines.skia.org"). Its path is replaced with
// rpc.PowerCycleStateUpdateURL to form the endpoint the callback POSTs to. An
// error is returned if machineServer cannot be parsed as a URL.
//
// The returned callback JSON-encodes the update it is given and POSTs it
// using client. It returns an error if encoding or sending fails.
func buildPowerCycleControllerCallback(client *http.Client, machineServer string) (powercycle.ControllerInitCB, error) {
	// Create the absolute URL we are POSTing data to. This is done once,
	// outside the callback, since it does not change between calls.
	u, err := url.Parse(machineServer)
	if err != nil {
		return nil, skerr.Wrapf(err, "Failed to parse machineserver flag: %s", machineServer)
	}
	u.Path = rpc.PowerCycleStateUpdateURL
	absolutePowerCycleStateUpdateURL := u.String()

	return func(update rpc.UpdatePowerCycleStateRequest) error {
		b, err := json.Marshal(update)
		if err != nil {
			return skerr.Wrapf(err, "Failed to encode rpc.UpdatePowerCycleStateRequest")
		}
		resp, err := client.Post(absolutePowerCycleStateUpdateURL, "application/json", bytes.NewReader(b))
		if err != nil {
			return skerr.Wrapf(err, "Failed to send rpc.UpdatePowerCycleStateRequest to machineserver")
		}
		// The response payload is not needed, but the body must still be
		// closed or the underlying connection is leaked. A failure to
		// close is not actionable here: the update has already been sent.
		_ = resp.Body.Close()
		return nil
	}, nil
}
// watchForPowerCycle runs singleStep once every five seconds, forever. Each
// step's outcome is recorded in a success or failure counter, and failures
// are logged. This function does not return.
func watchForPowerCycle(ctx context.Context, httpClient *http.Client, machineServer string, powercycleController powercycle.Controller) {
	failures := metrics2.GetCounter("powercycle_server_ansible_step_failure")
	successes := metrics2.GetCounter("powercycle_server_ansible_step_success")

	// The set of devices is read once, up front, and reused for every step.
	ids := powercycleController.DeviceIDs()

	ticks := time.Tick(5 * time.Second)
	for {
		<-ticks
		err := singleStep(ctx, httpClient, machineServer, ids, powercycleController)
		if err == nil {
			successes.Inc(1)
			continue
		}
		sklog.Errorf("Failed a singleStep of powercycle: %s", err)
		failures.Inc(1)
	}
}
// in reports whether machineID matches any entry of deviceIDs. It returns
// false for an empty or nil list.
func in(machineID string, deviceIDs []powercycle.DeviceID) bool {
	for i := range deviceIDs {
		if machineID == string(deviceIDs[i]) {
			return true
		}
	}
	return false
}
// singleStep does a single round of requesting the list of devices to
// powercycle and attempts to powercycle each machine id in the
// powercycleController's purview.
//
// machineServer is a URL carrying the scheme and host of the machine server;
// its path is replaced with the relevant rpc endpoint for each request.
// deviceIDs is the set of devices this controller is responsible for; machine
// ids returned by the server that are not in deviceIDs are skipped.
//
// For every machine that is successfully powercycled, the machine server is
// notified by a POST to rpc.PowerCycleCompleteURL expanded with the machine
// id.
//
// An error is returned immediately if the list of machines cannot be fetched
// or decoded. Failures for individual machines do not stop the remaining
// machines from being processed: the first such failure is returned and any
// further ones are logged.
func singleStep(ctx context.Context, httpClient *http.Client, machineServer string, deviceIDs []powercycle.DeviceID, powercycleController powercycle.Controller) error {
	u, err := url.Parse(machineServer)
	if err != nil {
		return skerr.Wrapf(err, "Failed to parse machineserver flag: %s", machineServer)
	}

	// Fetch the list of machines that need powercycling.
	u.Path = rpc.PowerCycleListURL
	resp, err := httpClient.Get(u.String())
	if err != nil {
		return skerr.Wrapf(err, "Failed to retrieve list of devices needing powercycling.")
	}
	defer util.Close(resp.Body)

	var list rpc.ListPowerCycleResponse
	if err := json.NewDecoder(resp.Body).Decode(&list); err != nil {
		return skerr.Wrapf(err, "Failed to decode list of devices needing powercycling.")
	}

	var errs []error
	for _, machineID := range list {
		if !in(machineID, deviceIDs) {
			continue
		}
		if err := powercycleController.PowerCycle(ctx, powercycle.DeviceID(machineID), 0); err != nil {
			errs = append(errs, skerr.Wrapf(err, "Failed to powercycle %q", machineID))
			continue
		}
		sklog.Infof("Successfully powercycled: %q", machineID)

		// We expand rpc.PowerCycleCompleteURL with machineID.
		u.Path = urlExpansionRegex.ReplaceAllLiteralString(rpc.PowerCycleCompleteURL, machineID)
		completeResp, err := httpClient.Post(u.String(), "text/plain", nil)
		if err != nil {
			errs = append(errs, skerr.Wrapf(err, "Failed to update machines after powercycling: %q", machineID))
			continue
		}
		// The response payload is not needed, but the body must still be
		// closed or one connection is leaked per powercycled machine.
		util.Close(completeResp.Body)
	}

	if len(errs) == 0 {
		return nil
	}
	// Only one error can be returned, so log the rest rather than silently
	// dropping them.
	for _, extra := range errs[1:] {
		sklog.Errorf("Additional failure in singleStep of powercycle: %s", extra)
	}
	return errs[0]
}