Remove lag when updating a machine's Swarming dimensions after a test finishes. Bug: skia:13942 Change-Id: I2b7b9a31b6bf02cbb7c422c0d26c916768a03d3a Reviewed-on: https://skia-review.googlesource.com/c/buildbot/+/609456 Commit-Queue: Joe Gregorio <jcgregorio@google.com> Reviewed-by: Eric Boren <borenet@google.com>
diff --git a/machine/go/test_machine_monitor/machine/BUILD.bazel b/machine/go/test_machine_monitor/machine/BUILD.bazel index 0cfa1f7..af796a8 100644 --- a/machine/go/test_machine_monitor/machine/BUILD.bazel +++ b/machine/go/test_machine_monitor/machine/BUILD.bazel
@@ -47,6 +47,7 @@ "//machine/go/test_machine_monitor/ios", "//machine/go/test_machine_monitor/ssh", "@com_github_stretchr_testify//assert", + "@com_github_stretchr_testify//mock", "@com_github_stretchr_testify//require", ], )
diff --git a/machine/go/test_machine_monitor/machine/machine.go b/machine/go/test_machine_monitor/machine/machine.go index 697a10b3..37a3883 100644 --- a/machine/go/test_machine_monitor/machine/machine.go +++ b/machine/go/test_machine_monitor/machine/machine.go
@@ -121,10 +121,13 @@ // descriptionRetrievalCallback is called whenever a new machine state is pulled from // machineserver. It is passed the new state. descriptionRetrievalCallback func(*Machine) + + // This channel emits a value if a round of interrogation must take place immediately. + triggerInterrogationCh <-chan bool } // New return an instance of *Machine. -func New(ctx context.Context, local bool, instanceConfig config.InstanceConfig, version string, startSwarming bool, machineServerHost string, startFoundryBot bool, descriptionRetrievalCallback func(*Machine)) (*Machine, error) { +func New(ctx context.Context, local bool, instanceConfig config.InstanceConfig, version string, startSwarming bool, machineServerHost string, startFoundryBot bool, descriptionRetrievalCallback func(*Machine), triggerInterrogationCh <-chan bool) (*Machine, error) { sink, err := eventSink.New(ctx, local, instanceConfig) if err != nil { @@ -185,6 +188,7 @@ startFoundryBot: startFoundryBot, homeDir: homeDir, descriptionRetrievalCallback: descriptionRetrievalCallback, + triggerInterrogationCh: triggerInterrogationCh, }, nil } @@ -271,6 +275,7 @@ // for example, if an Android device is missing, and that's a fatal // error. sklog.Errorf("Failed to interrogate: %s", err) + m.interrogateAndSendFailures.Inc(1) } if err := m.eventSink.Send(ctx, event); err != nil { return skerr.Wrapf(err, "Failed to send interrogation step.") @@ -299,12 +304,18 @@ // Start a loop that scans for local devices and sends pubsub events with all // the data every 30s. 
func (m *Machine) startInterrogateLoop(ctx context.Context) { - util.RepeatCtx(ctx, interrogateDuration, func(ctx context.Context) { - if err := m.interrogateAndSend(ctx); err != nil { - m.interrogateAndSendFailures.Inc(1) - sklog.Errorf("interrogateAndSend failed: %s", err) + timer := time.NewTicker(interrogateDuration) + defer timer.Stop() + for { + select { + case <-m.triggerInterrogationCh: + _ = m.interrogateAndSend(ctx) + case <-timer.C: + _ = m.interrogateAndSend(ctx) + case <-ctx.Done(): + return } - }) + } } // retrieveDescription stores and updates the machine Description in m.description.
diff --git a/machine/go/test_machine_monitor/machine/machine_test.go b/machine/go/test_machine_monitor/machine/machine_test.go index e2e4575..c24802e 100644 --- a/machine/go/test_machine_monitor/machine/machine_test.go +++ b/machine/go/test_machine_monitor/machine/machine_test.go
@@ -14,6 +14,7 @@ "time" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" "go.skia.org/infra/go/executil" "go.skia.org/infra/go/httputils" @@ -572,12 +573,13 @@ // Create a Machine instance. m := &Machine{ - eventSink: eventSink, - startTime: start, - description: rpc.ToFrontendDescription(desc), - adb: adb.New(), - interrogateTimer: metrics2.GetFloat64SummaryMetric("bot_config_machine_interrogate_timer", map[string]string{"machine": machineID}), - MachineID: "my-test-bot-001", + eventSink: eventSink, + startTime: start, + description: rpc.ToFrontendDescription(desc), + adb: adb.New(), + interrogateTimer: metrics2.GetFloat64SummaryMetric("bot_config_machine_interrogate_timer", map[string]string{"machine": machineID}), + interrogateAndSendFailures: metrics2.GetCounter("test_machine_monitor_interrogate_and_send_errors", map[string]string{"machine": "my-test-bot-001"}), + MachineID: "my-test-bot-001", } ctx = executil.WithFakeTests(ctx, @@ -590,6 +592,49 @@ eventSink.AssertExpectations(t) } +func TestStartInterrogation_TriggerInterrogationChannel_InterrogationIsDone(t *testing.T) { + ctx := context.Background() + + // Successful calls for a single loop of interrogating an ADB device. + ctx = executil.WithFakeTests(ctx, + "Test_FakeExe_AdbGetState_Success", + "Test_FakeExe_AdbShellGetUptime_Success", + "Test_FakeExe_AdbShellGetProp_Success", + "Test_FakeExe_RawDumpSys_Success", + "Test_FakeExe_RawDumpSys_Success", + ) + cancelCtx, cancel := context.WithCancel(ctx) + + // Other tests confirm the value being sent is valid, in this case we just + // want to cancel the context so the startInterrogateLoop exits. + eventSink := sinkMocks.NewSink(t) + eventSink.On("Send", testutils.AnyContext, mock.Anything).Run(func(args mock.Arguments) { + cancel() + }).Return(nil) + + desc := machine.NewDescription(ctx) + desc.AttachedDevice = machine.AttachedDeviceAdb + + // Create a Machine instance. 
+ triggerInterrogationCh := make(chan bool, 1) + m := &Machine{ + eventSink: eventSink, + description: rpc.ToFrontendDescription(desc), + adb: adb.New(), + interrogateTimer: metrics2.GetFloat64SummaryMetric("bot_config_machine_interrogate_timer", map[string]string{"machine": machineID}), + MachineID: "my-test-bot-001", + triggerInterrogationCh: triggerInterrogationCh, + } + + // trigger an interrogation right away. + triggerInterrogationCh <- true + + m.startInterrogateLoop(cancelCtx) + + // The test will fail by timeout if startInterrogateLoop fails to exit on + // context cancellation. +} + func Test_FakeExe_AdbShellGetUptime_Success(t *testing.T) { if !executil.IsCallingFakeCommand() { return
diff --git a/machine/go/test_machine_monitor/main.go b/machine/go/test_machine_monitor/main.go index 782b6ea..ecda287 100644 --- a/machine/go/test_machine_monitor/main.go +++ b/machine/go/test_machine_monitor/main.go
@@ -19,6 +19,12 @@ "go.skia.org/infra/machine/go/test_machine_monitor/swarming" ) +const ( + // Make the triggerInterrogation channel buffered so we don't lag responding + // to HTTP requests from the Swarming bot. + interrogationChannelSize = 10 +) + // flags var ( configFlag = flag.String("config", "prod.json", "The name to the configuration file, such as prod.json or test.json, as found in machine/go/configs.") @@ -71,7 +77,8 @@ } ctx := context.Background() - machineState, err := machine.New(ctx, *local, instanceConfig, Version, *startSwarming, *machineServerHost, *startFoundryBot, reportAvailability) + triggerInterrogationCh := make(chan bool, interrogationChannelSize) + machineState, err := machine.New(ctx, *local, instanceConfig, Version, *startSwarming, *machineServerHost, *startFoundryBot, reportAvailability, triggerInterrogationCh) if err != nil { sklog.Fatal("Failed to create machine: %s", err) } @@ -80,7 +87,7 @@ } sklog.Infof("Starting the server.") - machineSwarmingServer, err := server.New(machineState) + machineSwarmingServer, err := server.New(machineState, triggerInterrogationCh) if err != nil { sklog.Fatal(err) }
diff --git a/machine/go/test_machine_monitor/server/server.go b/machine/go/test_machine_monitor/server/server.go index 6d4c5e1..c86a9c2 100644 --- a/machine/go/test_machine_monitor/server/server.go +++ b/machine/go/test_machine_monitor/server/server.go
@@ -2,7 +2,6 @@ package server import ( - "context" "encoding/json" "net/http" "os" @@ -22,8 +21,9 @@ // Server is the core functionality of test_machine_monitor. type Server struct { - r *mux.Router - machine *machine.Machine + r *mux.Router + machine *machine.Machine + triggerInterrogationCh chan<- bool getStateRequests metrics2.Counter getStateRequestsSuccess metrics2.Counter @@ -36,11 +36,12 @@ } // New returns a new instance of Server. -func New(m *machine.Machine) (*Server, error) { +func New(m *machine.Machine, triggerInterrogationCh chan<- bool) (*Server, error) { r := mux.NewRouter() ret := &Server{ - r: r, - machine: m, + r: r, + machine: m, + triggerInterrogationCh: triggerInterrogationCh, getStateRequests: metrics2.GetCounter("bot_config_server_get_state_requests", map[string]string{"machine": m.MachineID}), getStateRequestsSuccess: metrics2.GetCounter("bot_config_server_get_state_requests_success", map[string]string{"machine": m.MachineID}), @@ -181,9 +182,19 @@ s.onAfterTaskSuccess.Inc(1) // Don't use r.Context() here as that seems to get cancelled by Swarming // pretty quickly. - if err := s.machine.RebootDevice(context.Background()); err != nil { - sklog.Warningf("Failed to reboot device: %s", err) - } + + // Do this in a goroutine so that we can return from this HTTP handler + // quickly. + go func() { + defer metrics2.FuncTimer().Stop() + // Do the reboot first so that the device will be ready + // when doing the interrogation. + if err := s.machine.RebootDevice(r.Context()); err != nil { + sklog.Warningf("Failed to reboot device: %s", err) + } + s.triggerInterrogationCh <- true + }() + } // Start the http server. This function never returns.
diff --git a/machine/go/test_machine_monitor/server/server_test.go b/machine/go/test_machine_monitor/server/server_test.go index f586ea0..dc33fa8 100644 --- a/machine/go/test_machine_monitor/server/server_test.go +++ b/machine/go/test_machine_monitor/server/server_test.go
@@ -18,7 +18,7 @@ func TestGetSettings_Success(t *testing.T) { r := httptest.NewRequest("GET", "/get_settings", nil) - s, err := New(&botmachine.Machine{}) + s, err := New(&botmachine.Machine{}, make(chan bool)) require.NoError(t, err) w := httptest.NewRecorder() @@ -40,7 +40,7 @@ r := httptest.NewRequest("POST", "/get_state", strings.NewReader("{\"foo\":\"bar\"}")) - s, err := New(&botmachine.Machine{}) + s, err := New(&botmachine.Machine{}, make(chan bool)) require.NoError(t, err) w := httptest.NewRecorder() @@ -61,7 +61,7 @@ r := httptest.NewRequest("POST", "/get_state", strings.NewReader("This is not valid JSON")) - s, err := New(&botmachine.Machine{}) + s, err := New(&botmachine.Machine{}, make(chan bool)) require.NoError(t, err) w := httptest.NewRecorder() @@ -75,7 +75,7 @@ r := httptest.NewRequest("POST", "/get_settings", strings.NewReader("{\"foo\": [\"bar\"]}")) - s, err := New(&botmachine.Machine{}) + s, err := New(&botmachine.Machine{}, make(chan bool)) require.NoError(t, err) s.machine.UpdateDescription(rpc.FrontendDescription{ Dimensions: machine.SwarmingDimensions{"foo": {"baz", "quux"}}, @@ -101,7 +101,7 @@ r := httptest.NewRequest("GET", "/on_begin_task", nil) - s, err := New(&botmachine.Machine{}) + s, err := New(&botmachine.Machine{}, make(chan bool)) require.NoError(t, err) require.False(t, s.machine.IsRunningSwarmingTask()) s.onBeforeTaskSuccess.Reset() @@ -122,7 +122,7 @@ r := httptest.NewRequest("GET", "/on_after_task", nil) - s, err := New(&botmachine.Machine{}) + s, err := New(&botmachine.Machine{}, make(chan bool)) require.NoError(t, err) s.machine.SetIsRunningSwarmingTask(true) require.True(t, s.machine.IsRunningSwarmingTask())