blob: 8d84838a713107727ca7a258d9f0e3015066a282 [file] [log] [blame]
// Package machine is for interacting with the machine state server. See //machine.
package machine
import (
"context"
"encoding/json"
"io"
"net/http"
"net/url"
"os"
"regexp"
"runtime"
"strconv"
"strings"
"sync"
"time"
"go.skia.org/infra/go/httputils"
"go.skia.org/infra/go/metrics2"
"go.skia.org/infra/go/now"
"go.skia.org/infra/go/skerr"
"go.skia.org/infra/go/sklog"
"go.skia.org/infra/go/timer"
"go.skia.org/infra/go/util"
"go.skia.org/infra/machine/go/machine"
changeSource "go.skia.org/infra/machine/go/machine/change/source"
eventSink "go.skia.org/infra/machine/go/machine/event/sink"
"go.skia.org/infra/machine/go/machineserver/config"
"go.skia.org/infra/machine/go/machineserver/rpc"
"go.skia.org/infra/machine/go/test_machine_monitor/adb"
"go.skia.org/infra/machine/go/test_machine_monitor/ios"
"go.skia.org/infra/machine/go/test_machine_monitor/ssh"
"go.skia.org/infra/machine/go/test_machine_monitor/standalone"
"go.skia.org/infra/machine/go/test_machine_monitor/swarming"
"golang.org/x/oauth2/google"
)
const (
// interrogateDuration is the period between successive interrogations of the machine by
// startInterrogateLoop.
interrogateDuration = 30 * time.Second
// Recipes require some way to know the user and IP address of the device they are
// talking to. The existing (and easiest) way is to write a file that they know to read.
// That file is /tmp/ssh_machine.json. The file must be valid JSON and have a key called
// user_ip that is a string (see //infra/bots/recipe_modules/flavor/ssh.py in the skia repo).
defaultSSHMachineFileLocation = "/tmp/ssh_machine.json"
// descriptionPollDuration is how often we should poll machines.skia.org for an updated
// Description, independent of any change notifications.
descriptionPollDuration = time.Minute
)
var (
// urlExpansionRegex is used to replace gorilla mux URL variables with
// values. The match is greedy: it spans from the first '{' to the last '}'
// in the input, so it is only suitable for patterns with a single variable.
urlExpansionRegex = regexp.MustCompile("{.*}")
)
// Machine is the interface to the machine state server. See //machine.
//
// A Machine periodically interrogates the host (and any attached device), sends
// the results to the machine state server as events, and keeps a local copy of
// the Description the server has for this machine.
type Machine struct {
// An authenticated http client that can talk to the machines.skia.org frontend.
client *http.Client
// An absolute URL used to retrieve this machine's Description.
machineDescriptionURL string
// eventSink is how we send machine.Events to the machine state server.
eventSink eventSink.Sink
// changeSource emits events when the machine Description has changed on the
// server.
changeSource changeSource.Source
// adb makes calls to the adb server.
adb adb.Adb
// ios is an interface through which we talk to iOS devices.
ios ios.IOS
// ssh is an abstraction around an ssh executor.
ssh ssh.SSH
// MachineID is the swarming id of the machine. New falls back to the
// hostname when the swarming bot id environment variable is not set.
MachineID string
// Version of test_machine_monitor being run.
Version string
// startTime is the time when this machine started running.
startTime time.Time
// Metrics
interrogateTimer metrics2.Float64SummaryMetric
interrogateAndSendFailures metrics2.Counter
descriptionWatchArrivalCounter metrics2.Counter
// startSwarming is true if test_machine_monitor was used to launch Swarming.
startSwarming bool
// runningTask is true if the machine is currently running a swarming task.
// The accessor methods guard it with mutex.
runningTask bool
// mutex protects description (which is updated asynchronously by the
// description watch loop) and runningTask.
mutex sync.Mutex
// description is provided by the machine state server. This tells us what
// to tell swarming, what our current mode is, etc.
description rpc.FrontendDescription
// sshMachineLocation is the name and path of the file to write the JSON data that specifies
// to recipes how to communicate with the device under test.
sshMachineLocation string
}
// New returns an instance of *Machine.
//
// It builds the event sink and the description change source from instanceConfig, derives the
// machine id from the Swarming bot id environment variable (falling back to the hostname), and
// computes the absolute URL, rooted at machineServerHost, from which this machine's Description
// is retrieved. An error is returned if any of those steps fail.
func New(ctx context.Context, local bool, instanceConfig config.InstanceConfig, version string, startSwarming bool, machineServerHost string) (*Machine, error) {
	sink, err := eventSink.New(ctx, local, instanceConfig)
	if err != nil {
		return nil, skerr.Wrapf(err, "Failed to build sink instance.")
	}

	host, err := os.Hostname()
	if err != nil {
		return nil, skerr.Wrapf(err, "Could not determine hostname.")
	}

	// Prefer the Swarming bot id. The hostname is the fallback so that machines
	// running test_machine_monitor without Swarming can still be tracked.
	id := os.Getenv(swarming.SwarmingBotIDEnvVar)
	if id == "" {
		id = host
	}

	// Build the URL needed to retrieve this machine's Description.
	descriptionURL, err := url.Parse(machineServerHost)
	if err != nil {
		return nil, skerr.Wrapf(err, "Failed to parse machineserver flag: %s", machineServerHost)
	}
	descriptionURL.Path = urlExpansionRegex.ReplaceAllLiteralString(rpc.MachineDescriptionURL, id)

	tokenSource, err := google.DefaultTokenSource(ctx, "email")
	if err != nil {
		return nil, skerr.Wrapf(err, "Failed to create tokensource.")
	}
	authedClient := httputils.DefaultClientConfig().WithTokenSource(tokenSource).With2xxOnly().WithoutRetries().Client()

	descriptionChanges, err := changeSource.New(ctx, local, instanceConfig.DescriptionChangeSource, id)
	if err != nil {
		return nil, skerr.Wrapf(err, "Failed to create changeSource.")
	}

	// Each metric gets its own freshly allocated tag map.
	machineTags := func() map[string]string {
		return map[string]string{"machine": id}
	}

	return &Machine{
		client:                         authedClient,
		machineDescriptionURL:          descriptionURL.String(),
		eventSink:                      sink,
		changeSource:                   descriptionChanges,
		adb:                            adb.New(),
		ios:                            ios.New(),
		ssh:                            ssh.ExeImpl{},
		sshMachineLocation:             defaultSSHMachineFileLocation,
		MachineID:                      id,
		Version:                        version,
		startTime:                      now.Now(ctx),
		startSwarming:                  startSwarming,
		interrogateTimer:               metrics2.GetFloat64SummaryMetric("test_machine_monitor_interrogate_timer", machineTags()),
		interrogateAndSendFailures:     metrics2.GetCounter("test_machine_monitor_interrogate_and_send_errors", machineTags()),
		descriptionWatchArrivalCounter: metrics2.GetCounter("test_machine_monitor_description_watch_arrival", machineTags()),
	}, nil
}
// interrogate the machine we are running on for state-related information. Compile that into a
// machine.Event and return it.
//
// Which kind of interrogation is performed depends on the AttachedDevice value of the current
// Description. The returned Event always has its Host fields populated, even when a non-nil
// error is returned, so the caller may still send it.
func (m *Machine) interrogate(ctx context.Context) (machine.Event, error) {
	defer timer.NewWithSummary("interrogate", m.interrogateTimer).Stop()

	// runningTask and description are written under the mutex from other
	// goroutines (see SetIsRunningSwarmingTask and UpdateDescription), so take
	// a consistent snapshot here. The lock is released before talking to any
	// device because those calls can be slow.
	m.mutex.Lock()
	runningTask := m.runningTask
	attachedDevice := m.description.AttachedDevice
	m.mutex.Unlock()

	ret := machine.NewEvent()
	ret.Host.Name = m.MachineID
	ret.Host.Version = m.Version
	ret.Host.StartTime = m.startTime
	ret.RunningSwarmingTask = runningTask
	ret.LaunchedSwarming = m.startSwarming

	var err error
	switch attachedDevice {
	case machine.AttachedDeviceSSH:
		var ce machine.ChromeOS
		if ce, err = m.tryInterrogatingChromeOSDevice(ctx); err == nil {
			sklog.Infof("Successful communication with ChromeOS device: %#v", ce)
			ret.ChromeOS = ce
		}
	case machine.AttachedDeviceAdb:
		var ae machine.Android
		if ae, err = m.tryInterrogatingAndroidDevice(ctx); err == nil {
			sklog.Infof("Successful communication with adb device: %#v", ae)
			ret.Android = ae
		}
	case machine.AttachedDeviceIOS:
		var ie machine.IOS
		if ie, err = m.tryInterrogatingIOSDevice(ctx); err == nil {
			sklog.Infof("Successful communication with iOS device: %#v", ie)
			ret.IOS = ie
		}
	case machine.AttachedDeviceNone:
		sklog.Infof("No attached device set. Getting dimensions of host...")
		// Interrogating the host does not return an error; it only logs
		// warnings about whatever it could not determine.
		standaloneEvent := m.tryInterrogatingStandaloneHost(ctx)
		sklog.Infof("Successful interrogation of host: %#v", standaloneEvent)
		ret.Standalone = standaloneEvent
	default:
		sklog.Errorf("Unhandled type of machine.AttachedDevice: %s", attachedDevice)
	}
	return ret, skerr.Wrap(err)
}
// interrogateAndSend collects the current state of this machine and forwards it
// to the event sink. Note that this does not decide what dimensions the machine
// should have: the machine server that receives the events derives the
// dimensions from the reported state combined with whatever else it knows
// (e.g. human-supplied details, previously attached devices).
//
// A failed interrogation is only logged; the event is sent regardless. An error
// is returned only when sending the event fails.
func (m *Machine) interrogateAndSend(ctx context.Context) error {
	event, interrogateErr := m.interrogate(ctx)
	if interrogateErr != nil {
		// Deliberately not returned: otherwise Start() would fail whenever,
		// for example, an Android device is missing, and a failure in Start()
		// is fatal.
		sklog.Errorf("Failed to interrogate: %s", interrogateErr)
	}

	sendErr := m.eventSink.Send(ctx, event)
	if sendErr == nil {
		return nil
	}
	return skerr.Wrapf(sendErr, "Failed to send interrogation step.")
}
// Start launches the background goroutines that send events to the sink and
// watch for changes to the Description.
//
// Before launching them, one interrogate-and-send step is run synchronously so
// that a broken sink is reported to the caller as an error. The Description is
// not fetched up front because this may be a new machine, for which
// retrieveDescription could fail.
func (m *Machine) Start(ctx context.Context) error {
	err := m.interrogateAndSend(ctx)
	if err != nil {
		return skerr.Wrap(err)
	}

	go m.startInterrogateLoop(ctx)
	go m.startDescriptionWatch(ctx)
	return nil
}
// startInterrogateLoop repeatedly scans for local devices and sends an event
// with all the gathered data, once per interrogateDuration. Failures are
// counted in the interrogateAndSendFailures metric and logged.
func (m *Machine) startInterrogateLoop(ctx context.Context) {
	step := func(ctx context.Context) {
		err := m.interrogateAndSend(ctx)
		if err == nil {
			return
		}
		m.interrogateAndSendFailures.Inc(1)
		sklog.Errorf("interrogateAndSend failed: %s", err)
	}
	util.RepeatCtx(ctx, interrogateDuration, step)
}
// retrieveDescription fetches the machine Description from the machine server
// and stores it in m.description via UpdateDescription. On success the
// descriptionWatchArrivalCounter metric is incremented.
//
// An error is returned if the request cannot be built or sent, or if the
// response body cannot be decoded as JSON.
func (m *Machine) retrieveDescription(ctx context.Context) error {
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, m.machineDescriptionURL, nil)
	if err != nil {
		return skerr.Wrapf(err, "Failed to create HTTP request")
	}
	resp, err := m.client.Do(req)
	if err != nil {
		return skerr.Wrapf(err, "Failed to retrieve description from %q", m.machineDescriptionURL)
	}
	// The body must always be closed, otherwise the underlying connection is
	// leaked each time this is called.
	defer func() {
		if closeErr := resp.Body.Close(); closeErr != nil {
			sklog.Warningf("Failed to close response body from %q: %s", m.machineDescriptionURL, closeErr)
		}
	}()

	var desc rpc.FrontendDescription
	if err := json.NewDecoder(resp.Body).Decode(&desc); err != nil {
		return skerr.Wrapf(err, "Failed to decode description from %q", m.machineDescriptionURL)
	}
	m.UpdateDescription(desc)
	m.descriptionWatchArrivalCounter.Inc(1)
	return nil
}
// startDescriptionWatch starts a loop that continually looks for updates to the
// machine Description. The Description is re-fetched whenever the change source
// signals a change and, independently, once per descriptionPollDuration. Fetch
// failures are logged and the loop keeps running.
//
// This function does not return unless the context is cancelled.
func (m *Machine) startDescriptionWatch(ctx context.Context) {
	changeCh := m.changeSource.Start(ctx)

	// Stop the ticker on exit so that it is released once the context is
	// cancelled.
	ticker := time.NewTicker(descriptionPollDuration)
	defer ticker.Stop()

	for {
		select {
		case <-changeCh:
			if err := m.retrieveDescription(ctx); err != nil {
				sklog.Errorf("Event driven retrieveDescription failed: %s", err)
			}
		case <-ticker.C:
			if err := m.retrieveDescription(ctx); err != nil {
				sklog.Errorf("Timer driven retrieveDescription failed: %s", err)
			}
		case <-ctx.Done():
			sklog.Errorf("context cancelled")
			return
		}
	}
}
// UpdateDescription replaces the stored Description with desc while holding the
// mutex. The Description drives what we tell Swarming, what mode we are in,
// whether we should communicate with a device via SSH, etc.
func (m *Machine) UpdateDescription(desc rpc.FrontendDescription) {
	m.mutex.Lock()
	m.description = desc
	m.mutex.Unlock()
}
// DimensionsForSwarming returns the dimensions that should be reported to
// swarming, as read from the current Description while holding the mutex.
func (m *Machine) DimensionsForSwarming() machine.SwarmingDimensions {
	m.mutex.Lock()
	dims := m.description.Dimensions
	m.mutex.Unlock()
	return dims
}
// GetMaintenanceMode reports whether the machine should tell Swarming that it is
// in maintenance mode. Swarming has no "recovery" mode, so recovery is reported
// as maintenance as well.
//
// The second return value is the message to display for Maintenance Mode: the
// Description's note message when it is non-empty, otherwise "Maintenance Mode".
// The message is returned regardless of the boolean.
func (m *Machine) GetMaintenanceMode() (bool, string) {
	m.mutex.Lock()
	defer m.mutex.Unlock()

	message := m.description.Note.Message
	if message == "" {
		message = "Maintenance Mode"
	}

	switch m.description.Mode {
	case machine.ModeRecovery, machine.ModeMaintenance:
		return true, message
	}
	return false, message
}
// SetIsRunningSwarmingTask stores, under the mutex, whether a swarming task is
// currently being run.
func (m *Machine) SetIsRunningSwarmingTask(isRunning bool) {
	m.mutex.Lock()
	m.runningTask = isRunning
	m.mutex.Unlock()
}
// IsRunningSwarmingTask returns true if a swarming task is currently running,
// as last recorded by SetIsRunningSwarmingTask.
func (m *Machine) IsRunningSwarmingTask() bool {
	m.mutex.Lock()
	running := m.runningTask
	m.mutex.Unlock()
	return running
}
// RebootDevice reboots the attached device.
//
// The kind of device is derived from the current Description, checked in this
// order: an Android device (the Android devices dimension is non-empty), an iOS
// device (the OS dimension contains "iOS"), then a ChromeOS device (an SSH
// user/IP is set). If none of those apply nothing is rebooted and nil is
// returned.
func (m *Machine) RebootDevice(ctx context.Context) error {
	// Read everything needed from the description up front so that the mutex is
	// not held while the (potentially slow) reboot command runs.
	m.mutex.Lock()
	dims := m.description.Dimensions
	hasAndroid := len(dims[machine.DimAndroidDevices]) > 0
	hasIOS := util.In("iOS", dims[machine.DimOS])
	userIP := m.description.SSHUserIP
	m.mutex.Unlock()

	switch {
	case hasAndroid:
		return m.adb.Reboot(ctx)
	case hasIOS:
		return m.ios.Reboot(ctx)
	case userIP != "":
		return m.rebootChromeOS(ctx, userIP)
	}

	sklog.Info("No attached device to reboot.")
	return nil
}
// tryInterrogatingAndroidDevice attempts to communicate with an Android device using the
// adb interface. If there is one attached, this function returns a nil error and the information
// gathered (which can be partially filled out). If there is not a device attached, returns an
// error.
//
// Failing to bring the device online, read its uptime, or read its properties is treated as an
// error. Failing to read the battery or thermal status is only logged as a warning.
func (m *Machine) tryInterrogatingAndroidDevice(ctx context.Context) (machine.Android, error) {
	metrics2.GetCounter("test_machine_monitor_interrogate_device_type", map[string]string{
		"machine": m.MachineID,
		"type":    "android",
	}).Inc(1)
	sklog.Info("tryInterrogatingAndroidDevice")

	var ret machine.Android
	if err := m.adb.EnsureOnline(ctx); err != nil {
		sklog.Warningf("No Android device is available: %s", err)
		return ret, skerr.Wrapf(err, "No Android device is available")
	}

	uptime, err := m.adb.Uptime(ctx)
	if err != nil {
		return ret, skerr.Wrapf(err, "Failed to read uptime - assuming there is no Android device attached.")
	}
	ret.Uptime = uptime

	props, err := m.adb.RawProperties(ctx)
	if err != nil {
		return ret, skerr.Wrapf(err, "Failed to read android properties.")
	}
	ret.GetProp = props

	// Battery and thermal data are best effort: a failure leaves the field at
	// its zero value and is only warned about.
	if battery, err := m.adb.RawDumpSys(ctx, "battery"); err == nil {
		ret.DumpsysBattery = battery
	} else {
		sklog.Warningf("Failed to read android battery status: %s", err)
	}
	if thermal, err := m.adb.RawDumpSys(ctx, "thermalservice"); err == nil {
		ret.DumpsysThermalService = thermal
	} else {
		sklog.Warningf("Failed to read android thermal status: %s", err)
	}
	return ret, nil
}
// tryInterrogatingIOSDevice attempts to communicate with an attached iOS device.
//
// Reading the device type must succeed; if it does not, the error is returned
// along with a zero-valued machine.IOS. Once the device type is known, failures
// to read the OS version or the battery level are only logged as warnings and a
// nil error is returned with whatever information was gathered.
func (m *Machine) tryInterrogatingIOSDevice(ctx context.Context) (machine.IOS, error) {
	metrics2.GetCounter("test_machine_monitor_interrogate_device_type", map[string]string{
		"machine": m.MachineID,
		"type":    "ios",
	}).Inc(1)
	sklog.Info("tryInterrogatingIOSDevice")

	var info machine.IOS

	kind, err := m.ios.DeviceType(ctx)
	if err != nil {
		return info, skerr.Wrap(err)
	}
	info.DeviceType = kind

	version, versionErr := m.ios.OSVersion(ctx)
	if versionErr == nil {
		info.OSVersion = version
	} else {
		sklog.Warningf("Failed to read iOS version, though we managed to read the device type: %s", versionErr)
	}

	// The battery value is stored even when reading it reported an error.
	level, batteryErr := m.ios.BatteryLevel(ctx)
	if batteryErr != nil {
		sklog.Warningf("Failed to read iOS device battery level, though we managed to read its device type: %s", batteryErr)
	}
	info.Battery = level

	return info, nil
}
// tryInterrogatingStandaloneHost collects information about the test machine itself (as opposed
// to an attached device). The returned Standalone struct may be only partially filled out; a
// warning is logged for each piece of information that could not be gathered.
func (m *Machine) tryInterrogatingStandaloneHost(ctx context.Context) (ret machine.Standalone) {
	metrics2.GetCounter("test_machine_monitor_interrogate_device_type", map[string]string{
		"machine": m.MachineID,
		"type":    "standalone",
	}).Inc(1)
	sklog.Info("tryInterrogatingStandaloneHost")

	// On Mac and Win, Swarming returns the number of cores on the whole machine, so we might
	// differ in principle (though not in practice) there. On Linux, Swarming returns the number of
	// cores usable by its process, which lines up perfectly with the NumCPU() semantics.
	ret.Cores = runtime.NumCPU()

	osVersions, osErr := standalone.OSVersions(ctx)
	if osErr != nil {
		sklog.Warningf("Failed to read OS version of host: %s", osErr)
	}
	ret.OSVersions = osVersions

	cpus, cpuErr := standalone.CPUs(ctx)
	if cpuErr != nil {
		sklog.Warningf("Failed to get CPU type of host: %s", cpuErr)
	}
	ret.CPUs = cpus

	// TODO(erikrose): ret.GPUs, err = standalone.GPUs(ctx)
	return ret
}
// Regular expressions used by tryInterrogatingChromeOSDevice to pull individual values out of
// the contents of /etc/lsb-release. Each captures the run of non-whitespace characters that
// follows the corresponding KEY= prefix.
var (
chromeOSReleaseRegex = regexp.MustCompile(`CHROMEOS_RELEASE_VERSION=(\S+)`)
chromeOSMilestoneRegex = regexp.MustCompile(`CHROMEOS_RELEASE_CHROME_MILESTONE=(\S+)`)
chromeOSTrackRegex = regexp.MustCompile(`CHROMEOS_RELEASE_TRACK=(\S+)`)
)
// tryInterrogatingChromeOSDevice attempts to communicate over SSH with the ChromeOS device named
// by the Description's SSHUserIP.
//
// It reads the device's uptime from /proc/uptime and its release version, milestone and channel
// from /etc/lsb-release. If that succeeds it also writes the SSH user/IP as JSON to
// m.sshMachineLocation so that recipes can reach the same device.
//
// An error is returned if no SSHUserIP is set, if either file cannot be read, if the uptime
// cannot be parsed, if none of the expected keys are found in /etc/lsb-release, or if the SSH
// info file cannot be written. The returned struct may be partially filled out on error.
func (m *Machine) tryInterrogatingChromeOSDevice(ctx context.Context) (machine.ChromeOS, error) {
	metrics2.GetCounter("test_machine_monitor_interrogate_device_type", map[string]string{
		"machine": m.MachineID,
		"type":    "chromeos",
	}).Inc(1)
	sklog.Info("tryInterrogatingChromeOSDevice")

	// The description can be replaced concurrently by UpdateDescription, so
	// read the address once under the mutex and use that same value for every
	// step below.
	m.mutex.Lock()
	sshUserIP := m.description.SSHUserIP
	m.mutex.Unlock()

	var ret machine.ChromeOS
	if sshUserIP == "" {
		return ret, skerr.Fmt("no machine.SSHUserIP supplied")
	}

	uptime, err := m.ssh.Run(ctx, sshUserIP, "cat", "/proc/uptime")
	if err != nil {
		return ret, skerr.Wrapf(err, "Could not read ChromeOS uptime - assuming there is no ChromeOS device attached")
	}
	// Only the first space-separated field of /proc/uptime is used; it is
	// parsed as a floating point number of seconds.
	seconds, err := strconv.ParseFloat(strings.Split(uptime, " ")[0], 64)
	if err != nil {
		return ret, skerr.Wrapf(err, "Invalid /proc/uptime format: %q", uptime)
	}
	ret.Uptime = time.Duration(seconds * float64(time.Second))

	lsbReleaseContents, err := m.ssh.Run(ctx, sshUserIP, "cat", "/etc/lsb-release")
	if err != nil {
		return ret, skerr.Wrapf(err, "Failed to read lsb-release - assuming there is no ChromeOS device attached")
	}
	if match := chromeOSReleaseRegex.FindStringSubmatch(lsbReleaseContents); match != nil {
		ret.ReleaseVersion = match[1]
	}
	if match := chromeOSMilestoneRegex.FindStringSubmatch(lsbReleaseContents); match != nil {
		ret.Milestone = match[1]
	}
	if match := chromeOSTrackRegex.FindStringSubmatch(lsbReleaseContents); match != nil {
		ret.Channel = match[1]
	}
	if ret.ReleaseVersion == "" && ret.Milestone == "" && ret.Channel == "" {
		// There is no underlying error to wrap here, so create a new one.
		return ret, skerr.Fmt("Could not find ChromeOS data in /etc/lsb-release. Are we sure this is the right IP?\n%s", lsbReleaseContents)
	}

	// Now that we know we can connect to the SSH machine, make sure recipes can as well.
	err = util.WithWriteFile(m.sshMachineLocation, func(w io.Writer) error {
		type sshMachineInfo struct {
			Comment string
			UserIP  string `json:"user_ip"`
		}
		toWrite := sshMachineInfo{
			Comment: "This file is written to by test_machine_monitor. Do not edit by hand.",
			UserIP:  sshUserIP,
		}
		e := json.NewEncoder(w)
		e.SetIndent("", " ")
		return e.Encode(toWrite)
	})
	if err != nil {
		return ret, skerr.Wrapf(err, "Could not write SSH info to %s", m.sshMachineLocation)
	}
	return ret, nil
}
// rebootChromeOS reboots the ChromeOS device attached via SSH by running "reboot" on userIP.
// If the command fails, its output is logged as a warning and the wrapped error is returned.
func (m *Machine) rebootChromeOS(ctx context.Context, userIP string) error {
	out, err := m.ssh.Run(ctx, userIP, "reboot")
	if err != nil {
		// Log the address the command was actually sent to, rather than
		// re-reading the description, which may have changed since and is not
		// safe to read without holding the mutex.
		sklog.Warningf("Could not reboot ChromeOS device %s: %s", userIP, out)
		return skerr.Wrap(err)
	}
	return nil
}