Remove lag when updating a machine's Swarming dimensions after a test finishes.

Bug: skia:13942
Change-Id: I2b7b9a31b6bf02cbb7c422c0d26c916768a03d3a
Reviewed-on: https://skia-review.googlesource.com/c/buildbot/+/609456
Commit-Queue: Joe Gregorio <jcgregorio@google.com>
Reviewed-by: Eric Boren <borenet@google.com>
diff --git a/machine/go/test_machine_monitor/machine/BUILD.bazel b/machine/go/test_machine_monitor/machine/BUILD.bazel
index 0cfa1f7..af796a8 100644
--- a/machine/go/test_machine_monitor/machine/BUILD.bazel
+++ b/machine/go/test_machine_monitor/machine/BUILD.bazel
@@ -47,6 +47,7 @@
         "//machine/go/test_machine_monitor/ios",
         "//machine/go/test_machine_monitor/ssh",
         "@com_github_stretchr_testify//assert",
+        "@com_github_stretchr_testify//mock",
         "@com_github_stretchr_testify//require",
     ],
 )
diff --git a/machine/go/test_machine_monitor/machine/machine.go b/machine/go/test_machine_monitor/machine/machine.go
index 697a10b3..37a3883 100644
--- a/machine/go/test_machine_monitor/machine/machine.go
+++ b/machine/go/test_machine_monitor/machine/machine.go
@@ -121,10 +121,13 @@
 	// descriptionRetrievalCallback is called whenever a new machine state is pulled from
 	// machineserver. It is passed the new state.
 	descriptionRetrievalCallback func(*Machine)
+
+	// This channel emits a value if a round of interrogation must take place immediately.
+	triggerInterrogationCh <-chan bool
 }
 
 // New return an instance of *Machine.
-func New(ctx context.Context, local bool, instanceConfig config.InstanceConfig, version string, startSwarming bool, machineServerHost string, startFoundryBot bool, descriptionRetrievalCallback func(*Machine)) (*Machine, error) {
+func New(ctx context.Context, local bool, instanceConfig config.InstanceConfig, version string, startSwarming bool, machineServerHost string, startFoundryBot bool, descriptionRetrievalCallback func(*Machine), triggerInterrogationCh <-chan bool) (*Machine, error) {
 
 	sink, err := eventSink.New(ctx, local, instanceConfig)
 	if err != nil {
@@ -185,6 +188,7 @@
 		startFoundryBot:                startFoundryBot,
 		homeDir:                        homeDir,
 		descriptionRetrievalCallback:   descriptionRetrievalCallback,
+		triggerInterrogationCh:         triggerInterrogationCh,
 	}, nil
 }
 
@@ -271,6 +275,7 @@
 		// for example, if an Android device is missing, and that's a fatal
 		// error.
 		sklog.Errorf("Failed to interrogate: %s", err)
+		m.interrogateAndSendFailures.Inc(1)
 	}
 	if err := m.eventSink.Send(ctx, event); err != nil {
 		return skerr.Wrapf(err, "Failed to send interrogation step.")
@@ -299,12 +304,18 @@
 // Start a loop that scans for local devices and sends pubsub events with all
 // the data every 30s.
 func (m *Machine) startInterrogateLoop(ctx context.Context) {
-	util.RepeatCtx(ctx, interrogateDuration, func(ctx context.Context) {
-		if err := m.interrogateAndSend(ctx); err != nil {
-			m.interrogateAndSendFailures.Inc(1)
-			sklog.Errorf("interrogateAndSend failed: %s", err)
+	timer := time.NewTicker(interrogateDuration)
+	defer timer.Stop()
+	for {
+		select {
+		case <-m.triggerInterrogationCh:
+			_ = m.interrogateAndSend(ctx)
+		case <-timer.C:
+			_ = m.interrogateAndSend(ctx)
+		case <-ctx.Done():
+			return
 		}
-	})
+	}
 }
 
 // retrieveDescription stores and updates the machine Description in m.description.
diff --git a/machine/go/test_machine_monitor/machine/machine_test.go b/machine/go/test_machine_monitor/machine/machine_test.go
index e2e4575..c24802e 100644
--- a/machine/go/test_machine_monitor/machine/machine_test.go
+++ b/machine/go/test_machine_monitor/machine/machine_test.go
@@ -14,6 +14,7 @@
 	"time"
 
 	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/mock"
 	"github.com/stretchr/testify/require"
 	"go.skia.org/infra/go/executil"
 	"go.skia.org/infra/go/httputils"
@@ -572,12 +573,13 @@
 
 	// Create a Machine instance.
 	m := &Machine{
-		eventSink:        eventSink,
-		startTime:        start,
-		description:      rpc.ToFrontendDescription(desc),
-		adb:              adb.New(),
-		interrogateTimer: metrics2.GetFloat64SummaryMetric("bot_config_machine_interrogate_timer", map[string]string{"machine": machineID}),
-		MachineID:        "my-test-bot-001",
+		eventSink:                  eventSink,
+		startTime:                  start,
+		description:                rpc.ToFrontendDescription(desc),
+		adb:                        adb.New(),
+		interrogateTimer:           metrics2.GetFloat64SummaryMetric("bot_config_machine_interrogate_timer", map[string]string{"machine": machineID}),
+		interrogateAndSendFailures: metrics2.GetCounter("test_machine_monitor_interrogate_and_send_errors", map[string]string{"machine": "my-test-bot-001"}),
+		MachineID:                  "my-test-bot-001",
 	}
 
 	ctx = executil.WithFakeTests(ctx,
@@ -590,6 +592,49 @@
 	eventSink.AssertExpectations(t)
 }
 
+func TestStartInterrogation_TriggerInterrogationChannel_InterrogationIsDone(t *testing.T) {
+	ctx := context.Background()
+
+	// Successful calls for a single loop of interrogating an ADB device.
+	ctx = executil.WithFakeTests(ctx,
+		"Test_FakeExe_AdbGetState_Success",
+		"Test_FakeExe_AdbShellGetUptime_Success",
+		"Test_FakeExe_AdbShellGetProp_Success",
+		"Test_FakeExe_RawDumpSys_Success",
+		"Test_FakeExe_RawDumpSys_Success",
+	)
+	cancelCtx, cancel := context.WithCancel(ctx)
+
+	// Other tests confirm the value being sent is valid, in this case we just
+	// want to cancel the context so the startInterrogateLoop exits.
+	eventSink := sinkMocks.NewSink(t)
+	eventSink.On("Send", testutils.AnyContext, mock.Anything).Run(func(args mock.Arguments) {
+		cancel()
+	}).Return(nil)
+
+	desc := machine.NewDescription(ctx)
+	desc.AttachedDevice = machine.AttachedDeviceAdb
+
+	// Create a Machine instance.
+	triggerInterrogationCh := make(chan bool, 1)
+	m := &Machine{
+		eventSink:              eventSink,
+		description:            rpc.ToFrontendDescription(desc),
+		adb:                    adb.New(),
+		interrogateTimer:       metrics2.GetFloat64SummaryMetric("bot_config_machine_interrogate_timer", map[string]string{"machine": machineID}),
+		MachineID:              "my-test-bot-001",
+		triggerInterrogationCh: triggerInterrogationCh,
+	}
+
+	// trigger an interrogation right away.
+	triggerInterrogationCh <- true
+
+	m.startInterrogateLoop(cancelCtx)
+
+	// The test will fail by timeout if startInterrogateLoop fails to exit on
+	// context cancellation.
+}
+
 func Test_FakeExe_AdbShellGetUptime_Success(t *testing.T) {
 	if !executil.IsCallingFakeCommand() {
 		return
diff --git a/machine/go/test_machine_monitor/main.go b/machine/go/test_machine_monitor/main.go
index 782b6ea..ecda287 100644
--- a/machine/go/test_machine_monitor/main.go
+++ b/machine/go/test_machine_monitor/main.go
@@ -19,6 +19,12 @@
 	"go.skia.org/infra/machine/go/test_machine_monitor/swarming"
 )
 
+const (
+	// Make the triggerInterrogation channel buffered so we don't lag responding
+	// to HTTP requests from the Swarming bot.
+	interrogationChannelSize = 10
+)
+
 // flags
 var (
 	configFlag         = flag.String("config", "prod.json", "The name to the configuration file, such as prod.json or test.json, as found in machine/go/configs.")
@@ -71,7 +77,8 @@
 	}
 
 	ctx := context.Background()
-	machineState, err := machine.New(ctx, *local, instanceConfig, Version, *startSwarming, *machineServerHost, *startFoundryBot, reportAvailability)
+	triggerInterrogationCh := make(chan bool, interrogationChannelSize)
+	machineState, err := machine.New(ctx, *local, instanceConfig, Version, *startSwarming, *machineServerHost, *startFoundryBot, reportAvailability, triggerInterrogationCh)
 	if err != nil {
 		sklog.Fatal("Failed to create machine: %s", err)
 	}
@@ -80,7 +87,7 @@
 	}
 
 	sklog.Infof("Starting the server.")
-	machineSwarmingServer, err := server.New(machineState)
+	machineSwarmingServer, err := server.New(machineState, triggerInterrogationCh)
 	if err != nil {
 		sklog.Fatal(err)
 	}
diff --git a/machine/go/test_machine_monitor/server/server.go b/machine/go/test_machine_monitor/server/server.go
index 6d4c5e1..c86a9c2 100644
--- a/machine/go/test_machine_monitor/server/server.go
+++ b/machine/go/test_machine_monitor/server/server.go
@@ -2,7 +2,6 @@
 package server
 
 import (
-	"context"
 	"encoding/json"
 	"net/http"
 	"os"
@@ -22,8 +21,9 @@
 
 // Server is the core functionality of test_machine_monitor.
 type Server struct {
-	r       *mux.Router
-	machine *machine.Machine
+	r                      *mux.Router
+	machine                *machine.Machine
+	triggerInterrogationCh chan<- bool
 
 	getStateRequests             metrics2.Counter
 	getStateRequestsSuccess      metrics2.Counter
@@ -36,11 +36,12 @@
 }
 
 // New returns a new instance of Server.
-func New(m *machine.Machine) (*Server, error) {
+func New(m *machine.Machine, triggerInterrogationCh chan<- bool) (*Server, error) {
 	r := mux.NewRouter()
 	ret := &Server{
-		r:       r,
-		machine: m,
+		r:                      r,
+		machine:                m,
+		triggerInterrogationCh: triggerInterrogationCh,
 
 		getStateRequests:             metrics2.GetCounter("bot_config_server_get_state_requests", map[string]string{"machine": m.MachineID}),
 		getStateRequestsSuccess:      metrics2.GetCounter("bot_config_server_get_state_requests_success", map[string]string{"machine": m.MachineID}),
@@ -181,9 +182,19 @@
 	s.onAfterTaskSuccess.Inc(1)
 	// Don't use r.Context() here as that seems to get cancelled by Swarming
 	// pretty quickly.
-	if err := s.machine.RebootDevice(context.Background()); err != nil {
-		sklog.Warningf("Failed to reboot device: %s", err)
-	}
+
+	// Do this in a Go routine so as we can return from this HTTP handler
+	// quickly.
+	go func() {
+		defer metrics2.FuncTimer().Stop()
+		// Do the reboot first so that the device will be ready
+		// when doing the interrogation.
+		if err := s.machine.RebootDevice(r.Context()); err != nil {
+			sklog.Warningf("Failed to reboot device: %s", err)
+		}
+		s.triggerInterrogationCh <- true
+	}()
+
 }
 
 // Start the http server. This function never returns.
diff --git a/machine/go/test_machine_monitor/server/server_test.go b/machine/go/test_machine_monitor/server/server_test.go
index f586ea0..dc33fa8 100644
--- a/machine/go/test_machine_monitor/server/server_test.go
+++ b/machine/go/test_machine_monitor/server/server_test.go
@@ -18,7 +18,7 @@
 func TestGetSettings_Success(t *testing.T) {
 
 	r := httptest.NewRequest("GET", "/get_settings", nil)
-	s, err := New(&botmachine.Machine{})
+	s, err := New(&botmachine.Machine{}, make(chan bool))
 	require.NoError(t, err)
 	w := httptest.NewRecorder()
 
@@ -40,7 +40,7 @@
 
 	r := httptest.NewRequest("POST", "/get_state", strings.NewReader("{\"foo\":\"bar\"}"))
 
-	s, err := New(&botmachine.Machine{})
+	s, err := New(&botmachine.Machine{}, make(chan bool))
 	require.NoError(t, err)
 	w := httptest.NewRecorder()
 
@@ -61,7 +61,7 @@
 
 	r := httptest.NewRequest("POST", "/get_state", strings.NewReader("This is not valid JSON"))
 
-	s, err := New(&botmachine.Machine{})
+	s, err := New(&botmachine.Machine{}, make(chan bool))
 	require.NoError(t, err)
 	w := httptest.NewRecorder()
 
@@ -75,7 +75,7 @@
 
 	r := httptest.NewRequest("POST", "/get_settings", strings.NewReader("{\"foo\": [\"bar\"]}"))
 
-	s, err := New(&botmachine.Machine{})
+	s, err := New(&botmachine.Machine{}, make(chan bool))
 	require.NoError(t, err)
 	s.machine.UpdateDescription(rpc.FrontendDescription{
 		Dimensions: machine.SwarmingDimensions{"foo": {"baz", "quux"}},
@@ -101,7 +101,7 @@
 
 	r := httptest.NewRequest("GET", "/on_begin_task", nil)
 
-	s, err := New(&botmachine.Machine{})
+	s, err := New(&botmachine.Machine{}, make(chan bool))
 	require.NoError(t, err)
 	require.False(t, s.machine.IsRunningSwarmingTask())
 	s.onBeforeTaskSuccess.Reset()
@@ -122,7 +122,7 @@
 
 	r := httptest.NewRequest("GET", "/on_after_task", nil)
 
-	s, err := New(&botmachine.Machine{})
+	s, err := New(&botmachine.Machine{}, make(chan bool))
 	require.NoError(t, err)
 	s.machine.SetIsRunningSwarmingTask(true)
 	require.True(t, s.machine.IsRunningSwarmingTask())