[CT] Turn on autoscaling for all 500 CT GCE instances

Bug: skia:8685
Change-Id: Iea3ff9c02e80a37fdf2ac42b7e005b5a27868378
Reviewed-on: https://skia-review.googlesource.com/c/184184
Reviewed-by: Ben Wagner <benjaminwagner@google.com>
Commit-Queue: Ravi Mistry <rmistry@google.com>
diff --git a/ct/go/ct_autoscaler/ct_autoscaler.go b/ct/go/ct_autoscaler/ct_autoscaler.go
index c112839..4e385e8 100644
--- a/ct/go/ct_autoscaler/ct_autoscaler.go
+++ b/ct/go/ct_autoscaler/ct_autoscaler.go
@@ -17,7 +17,7 @@
 
 const (
 	MIN_CT_INSTANCE_NUM = 1
-	MAX_CT_INSTANCE_NUM = 200
+	MAX_CT_INSTANCE_NUM = 500
 )
 
 // Interface useful for mocking.
@@ -69,11 +69,9 @@
 	if err := a.StopAllInstances(); err != nil {
 		return nil, err
 	}
-	// Uncomment when https://bugs.chromium.org/p/skia/issues/detail?id=7900#c7
-	// is resolved.
-	//if err := s.DeleteBots(a.GetNamesOfManagedInstances()); err != nil {
-	//	return nil, err
-	//}
+	if err := s.DeleteBots(a.GetNamesOfManagedInstances()); err != nil {
+		return nil, err
+	}
 
 	return &CTAutoscaler{a: a, s: s, upGauge: upGauge}, nil
 }
@@ -127,12 +125,9 @@
 			return err
 		}
 
-		// Delete all CT GCE instances from swarming.
-		// Uncomment when https://bugs.chromium.org/p/skia/issues/detail?id=7900#c7
-		// is resolved.
-		//if err := c.s.DeleteBots(c.a.GetNamesOfManagedInstances()); err != nil {
-		//	return err
-		//}
+		if err := c.s.DeleteBots(c.a.GetNamesOfManagedInstances()); err != nil {
+			return err
+		}
 	}
 	return nil
 }
diff --git a/ct/go/ct_autoscaler/ct_autoscaler_test.go b/ct/go/ct_autoscaler/ct_autoscaler_test.go
index 545b3ea..bd841ed 100644
--- a/ct/go/ct_autoscaler/ct_autoscaler_test.go
+++ b/ct/go/ct_autoscaler/ct_autoscaler_test.go
@@ -31,7 +31,7 @@
 	testutils.SmallTest(t)
 	mock := &autoscaler.MockAutoscaler{}
 	s := swarming.NewMockApiClient()
-	//s.On("DeleteBots", autoscaler.TestInstances).Return(nil)
+	s.On("DeleteBots", autoscaler.TestInstances).Return(nil)
 	defer s.AssertExpectations(t)
 	c := CTAutoscaler{a: mock, s: s}
 
@@ -40,19 +40,19 @@
 	assert.Nil(t, c.RegisterGCETask("test-task2"))
 	assert.Equal(t, 2, c.activeGCETasks)
 	assert.Equal(t, 1, mock.StartAllInstancesTimesCalled)
-	//s.AssertNumberOfCalls(t, "DeleteBots", 0)
+	s.AssertNumberOfCalls(t, "DeleteBots", 0)
 
 	// Unregistering the 1st task should not stop all instances.
 	assert.Nil(t, c.UnregisterGCETask("test-task1"))
-	//s.AssertNumberOfCalls(t, "DeleteBots", 0)
+	s.AssertNumberOfCalls(t, "DeleteBots", 0)
 	assert.Equal(t, 1, c.activeGCETasks)
 	assert.Equal(t, 1, mock.StartAllInstancesTimesCalled)
 	assert.Equal(t, 0, mock.StopAllInstancesTimesCalled)
 
 	// Unregistering the 2nd task should stop all instances.
-	//s.On("DeleteBots", autoscaler.TestInstances).Return(nil)
+	s.On("DeleteBots", autoscaler.TestInstances).Return(nil)
 	assert.Nil(t, c.UnregisterGCETask("test-task2"))
-	//s.AssertNumberOfCalls(t, "DeleteBots", 1)
+	s.AssertNumberOfCalls(t, "DeleteBots", 1)
 	assert.Equal(t, 0, c.activeGCETasks)
 	assert.Equal(t, 1, mock.StartAllInstancesTimesCalled)
 	assert.Equal(t, 1, mock.StopAllInstancesTimesCalled)
@@ -60,13 +60,13 @@
 	// Registering and then unregistering a 3rd task should start and stop all
 	// instances.
 	assert.Nil(t, c.RegisterGCETask("test-task3"))
-	//s.AssertNumberOfCalls(t, "DeleteBots", 1)
+	s.AssertNumberOfCalls(t, "DeleteBots", 1)
 	assert.Equal(t, 1, c.activeGCETasks)
 	assert.Equal(t, 2, mock.StartAllInstancesTimesCalled)
 	assert.Equal(t, 1, mock.StopAllInstancesTimesCalled)
 
 	assert.Nil(t, c.UnregisterGCETask("test-task3"))
-	//s.AssertNumberOfCalls(t, "DeleteBots", 2)
+	s.AssertNumberOfCalls(t, "DeleteBots", 2)
 	assert.Equal(t, 0, c.activeGCETasks)
 	assert.Equal(t, 2, mock.StartAllInstancesTimesCalled)
 	assert.Equal(t, 2, mock.StopAllInstancesTimesCalled)
diff --git a/ct/go/poller/main.go b/ct/go/poller/main.go
index 267af1f..be3f6f7 100644
--- a/ct/go/poller/main.go
+++ b/ct/go/poller/main.go
@@ -21,6 +21,7 @@
 	"sync"
 	"time"
 
+	"go.skia.org/infra/ct/go/ct_autoscaler"
 	"go.skia.org/infra/ct/go/ctfe/admin_tasks"
 	"go.skia.org/infra/ct/go/ctfe/capture_skps"
 	"go.skia.org/infra/ct/go/ctfe/chromium_analysis"
@@ -494,7 +495,7 @@
 // go routine. The function returns without waiting for the task to finish and the
 // WaitGroup of the goroutine is returned to the caller. The caller can then call
 // wg.Wait() if they would like to wait for the task to finish.
-func pollAndExecOnce(ctx context.Context, getPatchFunc GetPatchFunc) *sync.WaitGroup {
+func pollAndExecOnce(ctx context.Context, autoscaler ct_autoscaler.ICTAutoscaler, getPatchFunc GetPatchFunc) *sync.WaitGroup {
 	pending, err := frontend.GetOldestPendingTaskV2()
 	var wg sync.WaitGroup
 	if err != nil {
@@ -517,6 +518,13 @@
 	pickedUpTasks[taskId] = "1"
 	tasksMtx.Unlock()
 
+	if task.RunsOnGCEWorkers() {
+		if err := autoscaler.RegisterGCETask(taskId); err != nil {
+			sklog.Errorf("Error when registering GCE task in CT autoscaler: %s", err)
+			return &wg
+		}
+	}
+
 	sklog.Infof("Executing task %s", taskId)
 	// Increment the WaitGroup counter.
 	wg.Add(1)
@@ -534,6 +542,12 @@
 		tasksMtx.Lock()
 		delete(pickedUpTasks, taskId)
 		tasksMtx.Unlock()
+
+		if task.RunsOnGCEWorkers() {
+			if err := autoscaler.UnregisterGCETask(taskId); err != nil {
+				sklog.Errorf("Error when unregistering GCE task in CT autoscaler: %s", err)
+			}
+		}
 	}()
 	// Return the WaitGroup to allow some callers to call wg.Wait()
 	return &wg
@@ -542,6 +556,10 @@
 func main() {
 	master_common.InitWithMetrics2("ct-poller", promPort)
 
+	autoscaler, err := ct_autoscaler.NewCTAutoscaler(*master_common.Local)
+	if err != nil {
+		sklog.Fatalf("Could not instantiate the CT autoscaler: %s", err)
+	}
 	healthyGauge := metrics2.GetInt64Metric("healthy")
 
 	// Terminate all tasks which were in running state when the poller was restarted.
@@ -569,10 +587,10 @@
 
 	// Run immediately, since pollTick will not fire until after pollInterval.
 	ctx := context.Background()
-	pollAndExecOnce(ctx, ctutil.GetPatchFromStorage)
+	pollAndExecOnce(ctx, autoscaler, ctutil.GetPatchFromStorage)
 	for range time.Tick(*pollInterval) {
 		healthyGauge.Update(1)
-		pollAndExecOnce(ctx, ctutil.GetPatchFromStorage)
+		pollAndExecOnce(ctx, autoscaler, ctutil.GetPatchFromStorage)
 		// Sleeping for a second to avoid the small probability of ending up
 		// with 2 tasks with the same runID. For context see
 		// https://skia-review.googlesource.com/c/26941/8/ct/go/poller/main.go#96
diff --git a/ct/go/poller/poller_test.go b/ct/go/poller/poller_test.go
index 8b0248d..8c94fdd 100644
--- a/ct/go/poller/poller_test.go
+++ b/ct/go/poller/poller_test.go
@@ -15,6 +15,7 @@
 	"cloud.google.com/go/datastore"
 	expect "github.com/stretchr/testify/assert"
 	assert "github.com/stretchr/testify/require"
+	"go.skia.org/infra/ct/go/ct_autoscaler"
 	"go.skia.org/infra/ct/go/ctfe/admin_tasks"
 	"go.skia.org/infra/ct/go/ctfe/capture_skps"
 	"go.skia.org/infra/ct/go/ctfe/chromium_builds"
@@ -555,6 +556,7 @@
 	mockExec := exec.CommandCollector{}
 	ctx := exec.NewContext(context.Background(), mockExec.Run)
 	task := pendingRecreateWebpageArchivesTask()
+	mockCTAutoscaler := &ct_autoscaler.MockCTAutoscaler{}
 	mockServer := frontend.MockServer{}
 	mockServer.SetCurrentTask(&task.RecreateWebpageArchivesDatastoreTask)
 	defer frontend.CloseTestServer(frontend.InitTestServer(&mockServer))
@@ -564,10 +566,12 @@
 		return patchId, nil
 	}
 
-	wg := pollAndExecOnce(ctx, mockGetPatchFromStorageFunc)
+	wg := pollAndExecOnce(ctx, mockCTAutoscaler, mockGetPatchFromStorageFunc)
 	wg.Wait()
 	// Expect only one poll.
 	expect.Equal(t, 1, mockServer.OldestPendingTaskReqCount())
+	expect.Equal(t, 1, mockCTAutoscaler.RegisterGCETaskTimesCalled)
+	expect.Equal(t, 1, mockCTAutoscaler.UnregisterGCETaskTimesCalled)
 	expect.Equal(t, 0, getPatchCalls)
 	// Expect one command: capture_archives_on_workers ...
 	commands := mockExec.Commands()
@@ -583,6 +587,7 @@
 	mockExec := exec.CommandCollector{}
 	ctx := exec.NewContext(context.Background(), mockExec.Run)
 	task1 := pendingRecreateWebpageArchivesTask()
+	mockCTAutoscaler := &ct_autoscaler.MockCTAutoscaler{}
 	mockServer := frontend.MockServer{}
 	mockServer.SetCurrentTask(&task1.RecreateWebpageArchivesDatastoreTask)
 	defer frontend.CloseTestServer(frontend.InitTestServer(&mockServer))
@@ -593,17 +598,19 @@
 	}
 
 	// Poll frontend and execute the first task.
-	wg1 := pollAndExecOnce(ctx, mockGetPatchFromStorageFunc)
+	wg1 := pollAndExecOnce(ctx, mockCTAutoscaler, mockGetPatchFromStorageFunc)
 	wg1.Wait() // Wait for task to return to make asserting commands deterministic.
 	// Update current task.
 	task2 := pendingChromiumPerfTask()
 	mockServer.SetCurrentTask(&task2.DatastoreTask)
 	// Poll frontend and execute the second task.
-	wg2 := pollAndExecOnce(ctx, mockGetPatchFromStorageFunc)
+	wg2 := pollAndExecOnce(ctx, mockCTAutoscaler, mockGetPatchFromStorageFunc)
 	wg2.Wait() // Wait for task to return to make asserting commands deterministic.
 
 	// Expect two pending task requests.
 	expect.Equal(t, 2, mockServer.OldestPendingTaskReqCount())
+	expect.Equal(t, 1, mockCTAutoscaler.RegisterGCETaskTimesCalled)
+	expect.Equal(t, 1, mockCTAutoscaler.UnregisterGCETaskTimesCalled)
 	// Expect two commands: capture_archives_on_workers ...; run_chromium_perf_on_workers ...
 	commands := mockExec.Commands()
 	assert.Len(t, commands, 2)
@@ -621,6 +628,7 @@
 	commandCollector.SetDelegateRun(mockRun.Run)
 	ctx := exec.NewContext(context.Background(), commandCollector.Run)
 	task := pendingRecreateWebpageArchivesTask()
+	mockCTAutoscaler := &ct_autoscaler.MockCTAutoscaler{}
 	mockServer := frontend.MockServer{}
 	mockServer.SetCurrentTask(&task.RecreateWebpageArchivesDatastoreTask)
 	defer frontend.CloseTestServer(frontend.InitTestServer(&mockServer))
@@ -631,10 +639,12 @@
 		return patchId, nil
 	}
 
-	wg := pollAndExecOnce(ctx, mockGetPatchFromStorageFunc)
+	wg := pollAndExecOnce(ctx, mockCTAutoscaler, mockGetPatchFromStorageFunc)
 	wg.Wait()
 	// Expect only one poll.
 	expect.Equal(t, 1, mockServer.OldestPendingTaskReqCount())
+	expect.Equal(t, 1, mockCTAutoscaler.RegisterGCETaskTimesCalled)
+	expect.Equal(t, 1, mockCTAutoscaler.UnregisterGCETaskTimesCalled)
 	// Expect one command: capture_archives_on_workers ...
 	commands := commandCollector.Commands()
 	assert.Len(t, commands, 1)
@@ -655,6 +665,7 @@
 
 func TestPollAndExecOnceNoTasks(t *testing.T) {
 	testutils.SmallTest(t)
+	mockCTAutoscaler := &ct_autoscaler.MockCTAutoscaler{}
 	mockServer := frontend.MockServer{}
 	mockServer.SetCurrentTask(nil)
 	defer frontend.CloseTestServer(frontend.InitTestServer(&mockServer))
@@ -667,14 +678,16 @@
 	}
 
 	// Poll frontend, no tasks.
-	wg1 := pollAndExecOnce(ctx, mockGetPatchFromStorageFunc)
-	wg2 := pollAndExecOnce(ctx, mockGetPatchFromStorageFunc)
-	wg3 := pollAndExecOnce(ctx, mockGetPatchFromStorageFunc)
+	wg1 := pollAndExecOnce(ctx, mockCTAutoscaler, mockGetPatchFromStorageFunc)
+	wg2 := pollAndExecOnce(ctx, mockCTAutoscaler, mockGetPatchFromStorageFunc)
+	wg3 := pollAndExecOnce(ctx, mockCTAutoscaler, mockGetPatchFromStorageFunc)
 	// Expect three polls.
 	wg1.Wait()
 	wg2.Wait()
 	wg3.Wait()
 	expect.Equal(t, 3, mockServer.OldestPendingTaskReqCount())
+	expect.Equal(t, 0, mockCTAutoscaler.RegisterGCETaskTimesCalled)
+	expect.Equal(t, 0, mockCTAutoscaler.UnregisterGCETaskTimesCalled)
 	expect.Equal(t, 0, getPatchCalls)
 	// Expect no commands.
 	expect.Empty(t, mockExec.Commands())