Remove lag when updating a machine's Swarming dimensions after a test finishes.
Bug: skia:13942
Change-Id: I2b7b9a31b6bf02cbb7c422c0d26c916768a03d3a
Reviewed-on: https://skia-review.googlesource.com/c/buildbot/+/609456
Commit-Queue: Joe Gregorio <jcgregorio@google.com>
Reviewed-by: Eric Boren <borenet@google.com>
diff --git a/machine/go/test_machine_monitor/machine/BUILD.bazel b/machine/go/test_machine_monitor/machine/BUILD.bazel
index 0cfa1f7..af796a8 100644
--- a/machine/go/test_machine_monitor/machine/BUILD.bazel
+++ b/machine/go/test_machine_monitor/machine/BUILD.bazel
@@ -47,6 +47,7 @@
"//machine/go/test_machine_monitor/ios",
"//machine/go/test_machine_monitor/ssh",
"@com_github_stretchr_testify//assert",
+ "@com_github_stretchr_testify//mock",
"@com_github_stretchr_testify//require",
],
)
diff --git a/machine/go/test_machine_monitor/machine/machine.go b/machine/go/test_machine_monitor/machine/machine.go
index 697a10b3..37a3883 100644
--- a/machine/go/test_machine_monitor/machine/machine.go
+++ b/machine/go/test_machine_monitor/machine/machine.go
@@ -121,10 +121,13 @@
// descriptionRetrievalCallback is called whenever a new machine state is pulled from
// machineserver. It is passed the new state.
descriptionRetrievalCallback func(*Machine)
+
+ // This channel emits a value if a round of interrogation must take place immediately.
+ triggerInterrogationCh <-chan bool
}
// New return an instance of *Machine.
-func New(ctx context.Context, local bool, instanceConfig config.InstanceConfig, version string, startSwarming bool, machineServerHost string, startFoundryBot bool, descriptionRetrievalCallback func(*Machine)) (*Machine, error) {
+func New(ctx context.Context, local bool, instanceConfig config.InstanceConfig, version string, startSwarming bool, machineServerHost string, startFoundryBot bool, descriptionRetrievalCallback func(*Machine), triggerInterrogationCh <-chan bool) (*Machine, error) {
sink, err := eventSink.New(ctx, local, instanceConfig)
if err != nil {
@@ -185,6 +188,7 @@
startFoundryBot: startFoundryBot,
homeDir: homeDir,
descriptionRetrievalCallback: descriptionRetrievalCallback,
+ triggerInterrogationCh: triggerInterrogationCh,
}, nil
}
@@ -271,6 +275,7 @@
// for example, if an Android device is missing, and that's a fatal
// error.
sklog.Errorf("Failed to interrogate: %s", err)
+ m.interrogateAndSendFailures.Inc(1)
}
if err := m.eventSink.Send(ctx, event); err != nil {
return skerr.Wrapf(err, "Failed to send interrogation step.")
@@ -299,12 +304,18 @@
// Start a loop that scans for local devices and sends pubsub events with all
// the data every 30s.
func (m *Machine) startInterrogateLoop(ctx context.Context) {
- util.RepeatCtx(ctx, interrogateDuration, func(ctx context.Context) {
- if err := m.interrogateAndSend(ctx); err != nil {
- m.interrogateAndSendFailures.Inc(1)
- sklog.Errorf("interrogateAndSend failed: %s", err)
+ timer := time.NewTicker(interrogateDuration)
+ defer timer.Stop()
+ for {
+ select {
+ case <-m.triggerInterrogationCh:
+ _ = m.interrogateAndSend(ctx)
+ case <-timer.C:
+ _ = m.interrogateAndSend(ctx)
+ case <-ctx.Done():
+ return
}
- })
+ }
}
// retrieveDescription stores and updates the machine Description in m.description.
diff --git a/machine/go/test_machine_monitor/machine/machine_test.go b/machine/go/test_machine_monitor/machine/machine_test.go
index e2e4575..c24802e 100644
--- a/machine/go/test_machine_monitor/machine/machine_test.go
+++ b/machine/go/test_machine_monitor/machine/machine_test.go
@@ -14,6 +14,7 @@
"time"
"github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"go.skia.org/infra/go/executil"
"go.skia.org/infra/go/httputils"
@@ -572,12 +573,13 @@
// Create a Machine instance.
m := &Machine{
- eventSink: eventSink,
- startTime: start,
- description: rpc.ToFrontendDescription(desc),
- adb: adb.New(),
- interrogateTimer: metrics2.GetFloat64SummaryMetric("bot_config_machine_interrogate_timer", map[string]string{"machine": machineID}),
- MachineID: "my-test-bot-001",
+ eventSink: eventSink,
+ startTime: start,
+ description: rpc.ToFrontendDescription(desc),
+ adb: adb.New(),
+ interrogateTimer: metrics2.GetFloat64SummaryMetric("bot_config_machine_interrogate_timer", map[string]string{"machine": machineID}),
+ interrogateAndSendFailures: metrics2.GetCounter("test_machine_monitor_interrogate_and_send_errors", map[string]string{"machine": "my-test-bot-001"}),
+ MachineID: "my-test-bot-001",
}
ctx = executil.WithFakeTests(ctx,
@@ -590,6 +592,49 @@
eventSink.AssertExpectations(t)
}
+func TestStartInterrogation_TriggerInterrogationChannel_InterrogationIsDone(t *testing.T) {
+ ctx := context.Background()
+
+ // Successful calls for a single loop of interrogating an ADB device.
+ ctx = executil.WithFakeTests(ctx,
+ "Test_FakeExe_AdbGetState_Success",
+ "Test_FakeExe_AdbShellGetUptime_Success",
+ "Test_FakeExe_AdbShellGetProp_Success",
+ "Test_FakeExe_RawDumpSys_Success",
+ "Test_FakeExe_RawDumpSys_Success",
+ )
+ cancelCtx, cancel := context.WithCancel(ctx)
+
+ // Other tests confirm the value being sent is valid, in this case we just
+ // want to cancel the context so the startInterrogateLoop exits.
+ eventSink := sinkMocks.NewSink(t)
+ eventSink.On("Send", testutils.AnyContext, mock.Anything).Run(func(args mock.Arguments) {
+ cancel()
+ }).Return(nil)
+
+ desc := machine.NewDescription(ctx)
+ desc.AttachedDevice = machine.AttachedDeviceAdb
+
+ // Create a Machine instance.
+ triggerInterrogationCh := make(chan bool, 1)
+ m := &Machine{
+ eventSink: eventSink,
+ description: rpc.ToFrontendDescription(desc),
+ adb: adb.New(),
+ interrogateTimer: metrics2.GetFloat64SummaryMetric("bot_config_machine_interrogate_timer", map[string]string{"machine": machineID}),
+ MachineID: "my-test-bot-001",
+ triggerInterrogationCh: triggerInterrogationCh,
+ }
+
+ // trigger an interrogation right away.
+ triggerInterrogationCh <- true
+
+ m.startInterrogateLoop(cancelCtx)
+
+ // The test will fail by timeout if startInterrogateLoop fails to exit on
+ // context cancellation.
+}
+
func Test_FakeExe_AdbShellGetUptime_Success(t *testing.T) {
if !executil.IsCallingFakeCommand() {
return
diff --git a/machine/go/test_machine_monitor/main.go b/machine/go/test_machine_monitor/main.go
index 782b6ea..ecda287 100644
--- a/machine/go/test_machine_monitor/main.go
+++ b/machine/go/test_machine_monitor/main.go
@@ -19,6 +19,12 @@
"go.skia.org/infra/machine/go/test_machine_monitor/swarming"
)
+const (
+ // Make the triggerInterrogation channel buffered so we don't lag responding
+ // to HTTP requests from the Swarming bot.
+ interrogationChannelSize = 10
+)
+
// flags
var (
configFlag = flag.String("config", "prod.json", "The name to the configuration file, such as prod.json or test.json, as found in machine/go/configs.")
@@ -71,7 +77,8 @@
}
ctx := context.Background()
- machineState, err := machine.New(ctx, *local, instanceConfig, Version, *startSwarming, *machineServerHost, *startFoundryBot, reportAvailability)
+ triggerInterrogationCh := make(chan bool, interrogationChannelSize)
+ machineState, err := machine.New(ctx, *local, instanceConfig, Version, *startSwarming, *machineServerHost, *startFoundryBot, reportAvailability, triggerInterrogationCh)
if err != nil {
sklog.Fatal("Failed to create machine: %s", err)
}
@@ -80,7 +87,7 @@
}
sklog.Infof("Starting the server.")
- machineSwarmingServer, err := server.New(machineState)
+ machineSwarmingServer, err := server.New(machineState, triggerInterrogationCh)
if err != nil {
sklog.Fatal(err)
}
diff --git a/machine/go/test_machine_monitor/server/server.go b/machine/go/test_machine_monitor/server/server.go
index 6d4c5e1..c86a9c2 100644
--- a/machine/go/test_machine_monitor/server/server.go
+++ b/machine/go/test_machine_monitor/server/server.go
@@ -2,7 +2,6 @@
package server
import (
- "context"
"encoding/json"
"net/http"
"os"
@@ -22,8 +21,9 @@
// Server is the core functionality of test_machine_monitor.
type Server struct {
- r *mux.Router
- machine *machine.Machine
+ r *mux.Router
+ machine *machine.Machine
+ triggerInterrogationCh chan<- bool
getStateRequests metrics2.Counter
getStateRequestsSuccess metrics2.Counter
@@ -36,11 +36,12 @@
}
// New returns a new instance of Server.
-func New(m *machine.Machine) (*Server, error) {
+func New(m *machine.Machine, triggerInterrogationCh chan<- bool) (*Server, error) {
r := mux.NewRouter()
ret := &Server{
- r: r,
- machine: m,
+ r: r,
+ machine: m,
+ triggerInterrogationCh: triggerInterrogationCh,
getStateRequests: metrics2.GetCounter("bot_config_server_get_state_requests", map[string]string{"machine": m.MachineID}),
getStateRequestsSuccess: metrics2.GetCounter("bot_config_server_get_state_requests_success", map[string]string{"machine": m.MachineID}),
@@ -181,9 +182,19 @@
s.onAfterTaskSuccess.Inc(1)
// Don't use r.Context() here as that seems to get cancelled by Swarming
// pretty quickly.
- if err := s.machine.RebootDevice(context.Background()); err != nil {
- sklog.Warningf("Failed to reboot device: %s", err)
- }
+
+ // Do this in a Go routine so as we can return from this HTTP handler
+ // quickly.
+ go func() {
+ defer metrics2.FuncTimer().Stop()
+ // Do the reboot first so that the device will be ready
+ // when doing the interrogation.
+ if err := s.machine.RebootDevice(r.Context()); err != nil {
+ sklog.Warningf("Failed to reboot device: %s", err)
+ }
+ s.triggerInterrogationCh <- true
+ }()
+
}
// Start the http server. This function never returns.
diff --git a/machine/go/test_machine_monitor/server/server_test.go b/machine/go/test_machine_monitor/server/server_test.go
index f586ea0..dc33fa8 100644
--- a/machine/go/test_machine_monitor/server/server_test.go
+++ b/machine/go/test_machine_monitor/server/server_test.go
@@ -18,7 +18,7 @@
func TestGetSettings_Success(t *testing.T) {
r := httptest.NewRequest("GET", "/get_settings", nil)
- s, err := New(&botmachine.Machine{})
+ s, err := New(&botmachine.Machine{}, make(chan bool))
require.NoError(t, err)
w := httptest.NewRecorder()
@@ -40,7 +40,7 @@
r := httptest.NewRequest("POST", "/get_state", strings.NewReader("{\"foo\":\"bar\"}"))
- s, err := New(&botmachine.Machine{})
+ s, err := New(&botmachine.Machine{}, make(chan bool))
require.NoError(t, err)
w := httptest.NewRecorder()
@@ -61,7 +61,7 @@
r := httptest.NewRequest("POST", "/get_state", strings.NewReader("This is not valid JSON"))
- s, err := New(&botmachine.Machine{})
+ s, err := New(&botmachine.Machine{}, make(chan bool))
require.NoError(t, err)
w := httptest.NewRecorder()
@@ -75,7 +75,7 @@
r := httptest.NewRequest("POST", "/get_settings", strings.NewReader("{\"foo\": [\"bar\"]}"))
- s, err := New(&botmachine.Machine{})
+ s, err := New(&botmachine.Machine{}, make(chan bool))
require.NoError(t, err)
s.machine.UpdateDescription(rpc.FrontendDescription{
Dimensions: machine.SwarmingDimensions{"foo": {"baz", "quux"}},
@@ -101,7 +101,7 @@
r := httptest.NewRequest("GET", "/on_begin_task", nil)
- s, err := New(&botmachine.Machine{})
+ s, err := New(&botmachine.Machine{}, make(chan bool))
require.NoError(t, err)
require.False(t, s.machine.IsRunningSwarmingTask())
s.onBeforeTaskSuccess.Reset()
@@ -122,7 +122,7 @@
r := httptest.NewRequest("GET", "/on_after_task", nil)
- s, err := New(&botmachine.Machine{})
+ s, err := New(&botmachine.Machine{}, make(chan bool))
require.NoError(t, err)
s.machine.SetIsRunningSwarmingTask(true)
require.True(t, s.machine.IsRunningSwarmingTask())