blob: 1f73a2899a531ffe4a70e71b7e78bc4f34541105 [file] [log] [blame]
// Package machine is for interacting with the machine state server. See //machine.
package machine
import (
"context"
"os"
"regexp"
"strconv"
"strings"
"sync"
"time"
"go.skia.org/infra/go/metrics2"
"go.skia.org/infra/go/skerr"
"go.skia.org/infra/go/sklog"
"go.skia.org/infra/go/timer"
"go.skia.org/infra/go/util"
"go.skia.org/infra/machine/go/machine"
"go.skia.org/infra/machine/go/machine/sink"
"go.skia.org/infra/machine/go/machine/store"
"go.skia.org/infra/machine/go/machineserver/config"
"go.skia.org/infra/machine/go/test_machine_monitor/adb"
"go.skia.org/infra/machine/go/test_machine_monitor/ssh"
"go.skia.org/infra/machine/go/test_machine_monitor/swarming"
)
const (
interrogateDuration = 30 * time.Second
)
// Machine is the interface to the machine state server. See //machine.
type Machine struct {
// store is how we get our dimensions and status updates from the machine state server.
store store.Store
// sink is how we send machine.Events to the the machine state server.
sink sink.Sink
// adb makes calls to the adb server.
adb adb.Adb
// ssh is an abstraction around an ssh executor
ssh ssh.SSH
// MachineID is the swarming id of the machine.
MachineID string
// Hostname is the hostname(), which is the pod name under k8s.
Hostname string
// KubernetesImage is the container image being run.
KubernetesImage string
// Version of test_machine_monitor being run.
Version string
// startTime is the time when this machine started running.
startTime time.Time
// Metrics
interrogateTimer metrics2.Float64SummaryMetric
interrogateAndSendFailures metrics2.Counter
storeWatchArrivalCounter metrics2.Counter
// startSwarming is true if test_machine_monitor was used to launch Swarming.
startSwarming bool
// runningTask is true if the machine is currently running a swarming task.
runningTask bool
// mutex protects the description due to the fact it will be updated asynchronously via
// the firestore snapshot query.
mutex sync.Mutex
// description is provided by the machine state server. This tells us what
// to tell swarming, what our current mode is, etc.
description machine.Description
}
// New return an instance of *Machine.
func New(ctx context.Context, local bool, instanceConfig config.InstanceConfig, startTime time.Time, version string, startSwarming bool) (*Machine, error) {
store, err := store.NewFirestoreImpl(ctx, false, instanceConfig)
if err != nil {
return nil, skerr.Wrapf(err, "Failed to build store instance.")
}
sink, err := sink.New(ctx, local, instanceConfig)
if err != nil {
return nil, skerr.Wrapf(err, "Failed to build sink instance.")
}
kubernetesImage := os.Getenv(swarming.KubernetesImageEnvVar)
hostname, err := os.Hostname()
if err != nil {
return nil, skerr.Wrapf(err, "Could not determine hostname.")
}
machineID := os.Getenv(swarming.SwarmingBotIDEnvVar)
if machineID == "" {
// Fall back to hostname so we can track machines that
// test_machine_monitor is running on that don't also run Swarming.
machineID = hostname
}
return &Machine{
store: store,
sink: sink,
adb: adb.New(),
MachineID: machineID,
Hostname: hostname,
KubernetesImage: kubernetesImage,
Version: version,
startTime: startTime,
startSwarming: startSwarming,
interrogateTimer: metrics2.GetFloat64SummaryMetric("bot_config_machine_interrogate_timer", map[string]string{"machine": machineID}),
interrogateAndSendFailures: metrics2.GetCounter("bot_config_machine_interrogate_and_send_errors", map[string]string{"machine": machineID}),
storeWatchArrivalCounter: metrics2.GetCounter("bot_config_machine_store_watch_arrival", map[string]string{"machine": machineID}),
}, nil
}
// interrogate the machine we are running on for state-related information. It compiles that into
// a machine.Event and returns it.
func (m *Machine) interrogate(ctx context.Context) machine.Event {
defer timer.NewWithSummary("interrogate", m.interrogateTimer).Stop()
ret := machine.NewEvent()
ret.Host.Name = m.MachineID
ret.Host.PodName = m.Hostname
ret.Host.KubernetesImage = m.KubernetesImage
ret.Host.Version = m.Version
ret.Host.StartTime = m.startTime
ret.RunningSwarmingTask = m.runningTask
ret.LaunchedSwarming = m.startSwarming
if ce, ok := m.tryInterrogatingChromeOSDevice(ctx); ok {
sklog.Infof("Successful communication with ChromeOS device: %#v", ce)
ret.ChromeOS = ce
} else if ae, ok := m.tryInterrogatingAndroidDevice(ctx); ok {
sklog.Infof("Successful communication with Android device: %#v", ae)
ret.Android = ae
} else if ie, ok := m.tryInterrogatingIOSDevice(ctx); ok {
sklog.Infof("Successful communication with iOS device: %#v", ie)
ret.IOS = ie
} else {
sklog.Infof("No attached device found")
}
return ret
}
// interrogateAndSend gathers the state for this machine and sends it to the sink. Of note, this
// does not directly determine what dimensions this machine should have. The machine server that
// listens to the events will determine the dimensions based on the reported state and any
// information it has from other sources (e.g. human-supplied details, previously attached devices)
func (m *Machine) interrogateAndSend(ctx context.Context) error {
event := m.interrogate(ctx)
if err := m.sink.Send(ctx, event); err != nil {
return skerr.Wrapf(err, "Failed to send interrogation step.")
}
return nil
}
// Start the background processes that send events to the sink and watch for
// firestore changes.
func (m *Machine) Start(ctx context.Context) error {
if err := m.interrogateAndSend(ctx); err != nil {
return skerr.Wrap(err)
}
// Start a loop that scans for local devices and sends pubsub events with all the
// data every 30s.
go util.RepeatCtx(ctx, interrogateDuration, func(ctx context.Context) {
if err := m.interrogateAndSend(ctx); err != nil {
m.interrogateAndSendFailures.Inc(1)
sklog.Errorf("interrogateAndSend failed: %s", err)
}
})
m.startStoreWatch(ctx)
return nil
}
// startStoreWatch starts a loop that does a firestore onsnapshot watcher
// that gets the dims and state we should be reporting to swarming.
func (m *Machine) startStoreWatch(ctx context.Context) {
go func() {
for desc := range m.store.Watch(ctx, m.MachineID) {
m.storeWatchArrivalCounter.Inc(1)
m.UpdateDescription(desc)
}
}()
}
// UpdateDescription applies any change in behavior based on the new given description. This
// impacts what we tell Swarming, what mode we are in, if we should communicate with a device
// via SSH, etc.
func (m *Machine) UpdateDescription(desc machine.Description) {
m.mutex.Lock()
defer m.mutex.Unlock()
m.description = desc
}
// DimensionsForSwarming returns the dimensions that should be reported to swarming.
func (m *Machine) DimensionsForSwarming() machine.SwarmingDimensions {
m.mutex.Lock()
defer m.mutex.Unlock()
return m.description.Dimensions
}
// GetMaintenanceMode returns true if the machine should report to Swarming that it is
// in maintenance mode. Swarming does not have a "recovery" mode, so we group that in.
func (m *Machine) GetMaintenanceMode() bool {
m.mutex.Lock()
defer m.mutex.Unlock()
return m.description.Mode == machine.ModeRecovery || m.description.Mode == machine.ModeMaintenance
}
// SetIsRunningSwarmingTask records if a swarming task is being run.
func (m *Machine) SetIsRunningSwarmingTask(isRunning bool) {
m.mutex.Lock()
defer m.mutex.Unlock()
m.runningTask = isRunning
}
// IsRunningSwarmingTask returns true is a swarming task is currently running.
func (m *Machine) IsRunningSwarmingTask() bool {
m.mutex.Lock()
defer m.mutex.Unlock()
return m.runningTask
}
// RebootDevice reboots the attached device.
func (m *Machine) RebootDevice(ctx context.Context) error {
m.mutex.Lock()
shouldReboot := len(m.description.Dimensions[machine.DimAndroidDevices]) > 0
m.mutex.Unlock()
if shouldReboot {
return m.adb.Reboot(ctx)
}
sklog.Info("No attached device to reboot.")
return nil
}
// tryInterrogatingAndroidDevice attempts to communicate with an Android device using the
// adb interface. If there is one attached, this function returns true and the information gathered
// (which can be partially filled out). If there is not a device attached, false is returned.
func (m *Machine) tryInterrogatingAndroidDevice(ctx context.Context) (machine.Android, bool) {
var ret machine.Android
if err := m.adb.EnsureOnline(ctx); err != nil {
sklog.Warningf("No Android device is available: %s", err)
return ret, false
}
if uptime, err := m.adb.Uptime(ctx); err != nil {
sklog.Warningf("Failed to read uptime - assuming there is no Android device attached: %s", err)
return ret, false // Assume there is no Android device attached.
} else {
ret.Uptime = uptime
}
if props, err := m.adb.RawProperties(ctx); err != nil {
sklog.Warningf("Failed to read android properties: %s", err)
} else {
ret.GetProp = props
}
if battery, err := m.adb.RawDumpSys(ctx, "battery"); err != nil {
sklog.Warningf("Failed to read android battery status: %s", err)
} else {
ret.DumpsysBattery = battery
}
if thermal, err := m.adb.RawDumpSys(ctx, "thermalservice"); err != nil {
sklog.Warningf("Failed to read android thermal status: %s", err)
} else {
ret.DumpsysThermalService = thermal
}
return ret, true
}
func (m *Machine) tryInterrogatingIOSDevice(_ context.Context) (machine.IOS, bool) {
// TODO(erikrose)
return machine.IOS{}, false
}
var (
chromeOSReleaseRegex = regexp.MustCompile(`CHROMEOS_RELEASE_VERSION=(\S+)`)
chromeOSMilestoneRegex = regexp.MustCompile(`CHROMEOS_RELEASE_CHROME_MILESTONE=(\S+)`)
chromeOSTrackRegex = regexp.MustCompile(`CHROMEOS_RELEASE_TRACK=(\S+)`)
)
func (m *Machine) tryInterrogatingChromeOSDevice(ctx context.Context) (machine.ChromeOS, bool) {
if m.description.SSHUserIP == "" {
return machine.ChromeOS{}, false
}
lsbReleaseContents, err := m.ssh.Run(ctx, m.description.SSHUserIP, "cat", "/etc/lsb-release")
if err != nil {
sklog.Warningf("Failed to read lsb-release - assuming there is no ChromeOS device attached: %s", err)
return machine.ChromeOS{}, false
}
rv := machine.ChromeOS{}
if match := chromeOSReleaseRegex.FindStringSubmatch(lsbReleaseContents); match != nil {
rv.ReleaseVersion = match[1]
}
if match := chromeOSMilestoneRegex.FindStringSubmatch(lsbReleaseContents); match != nil {
rv.Milestone = match[1]
}
if match := chromeOSTrackRegex.FindStringSubmatch(lsbReleaseContents); match != nil {
rv.Channel = match[1]
}
if rv.ReleaseVersion == "" && rv.Milestone == "" && rv.Channel == "" {
sklog.Errorf("Could not find ChromeOS data in /etc/lsb-release. Are we sure this is the right IP?\n%s", lsbReleaseContents)
return machine.ChromeOS{}, false
}
uptime, err := m.ssh.Run(ctx, m.description.SSHUserIP, "cat", "/proc/uptime")
if err != nil {
sklog.Warningf("Could not read ChromeOS uptime %s", err)
} else {
u := strings.Split(uptime, " ")[0]
if f, err := strconv.ParseFloat(u, 64); err != nil {
sklog.Warningf("Invalid /proc/uptime format: %q", uptime)
} else {
rv.Uptime = time.Duration(f * float64(time.Second))
}
}
return rv, true
}