[CT] Turn on autoscaling for all 500 CT GCE instances
Bug: skia:8685
Change-Id: Iea3ff9c02e80a37fdf2ac42b7e005b5a27868378
Reviewed-on: https://skia-review.googlesource.com/c/184184
Reviewed-by: Ben Wagner <benjaminwagner@google.com>
Commit-Queue: Ravi Mistry <rmistry@google.com>
diff --git a/ct/go/ct_autoscaler/ct_autoscaler.go b/ct/go/ct_autoscaler/ct_autoscaler.go
index c112839..4e385e8 100644
--- a/ct/go/ct_autoscaler/ct_autoscaler.go
+++ b/ct/go/ct_autoscaler/ct_autoscaler.go
@@ -17,7 +17,7 @@
const (
MIN_CT_INSTANCE_NUM = 1
- MAX_CT_INSTANCE_NUM = 200
+ MAX_CT_INSTANCE_NUM = 500
)
// Interface useful for mocking.
@@ -69,11 +69,9 @@
if err := a.StopAllInstances(); err != nil {
return nil, err
}
- // Uncomment when https://bugs.chromium.org/p/skia/issues/detail?id=7900#c7
- // is resolved.
- //if err := s.DeleteBots(a.GetNamesOfManagedInstances()); err != nil {
- // return nil, err
- //}
+ if err := s.DeleteBots(a.GetNamesOfManagedInstances()); err != nil {
+ return nil, err
+ }
return &CTAutoscaler{a: a, s: s, upGauge: upGauge}, nil
}
@@ -127,12 +125,9 @@
return err
}
- // Delete all CT GCE instances from swarming.
- // Uncomment when https://bugs.chromium.org/p/skia/issues/detail?id=7900#c7
- // is resolved.
- //if err := c.s.DeleteBots(c.a.GetNamesOfManagedInstances()); err != nil {
- // return err
- //}
+ if err := c.s.DeleteBots(c.a.GetNamesOfManagedInstances()); err != nil {
+ return err
+ }
}
return nil
}
diff --git a/ct/go/ct_autoscaler/ct_autoscaler_test.go b/ct/go/ct_autoscaler/ct_autoscaler_test.go
index 545b3ea..bd841ed 100644
--- a/ct/go/ct_autoscaler/ct_autoscaler_test.go
+++ b/ct/go/ct_autoscaler/ct_autoscaler_test.go
@@ -31,7 +31,7 @@
testutils.SmallTest(t)
mock := &autoscaler.MockAutoscaler{}
s := swarming.NewMockApiClient()
- //s.On("DeleteBots", autoscaler.TestInstances).Return(nil)
+ s.On("DeleteBots", autoscaler.TestInstances).Return(nil)
defer s.AssertExpectations(t)
c := CTAutoscaler{a: mock, s: s}
@@ -40,19 +40,19 @@
assert.Nil(t, c.RegisterGCETask("test-task2"))
assert.Equal(t, 2, c.activeGCETasks)
assert.Equal(t, 1, mock.StartAllInstancesTimesCalled)
- //s.AssertNumberOfCalls(t, "DeleteBots", 0)
+ s.AssertNumberOfCalls(t, "DeleteBots", 0)
// Unregistering the 1st task should not stop all instances.
assert.Nil(t, c.UnregisterGCETask("test-task1"))
- //s.AssertNumberOfCalls(t, "DeleteBots", 0)
+ s.AssertNumberOfCalls(t, "DeleteBots", 0)
assert.Equal(t, 1, c.activeGCETasks)
assert.Equal(t, 1, mock.StartAllInstancesTimesCalled)
assert.Equal(t, 0, mock.StopAllInstancesTimesCalled)
// Unregistering the 2nd task should stop all instances.
- //s.On("DeleteBots", autoscaler.TestInstances).Return(nil)
+ s.On("DeleteBots", autoscaler.TestInstances).Return(nil)
assert.Nil(t, c.UnregisterGCETask("test-task2"))
- //s.AssertNumberOfCalls(t, "DeleteBots", 1)
+ s.AssertNumberOfCalls(t, "DeleteBots", 1)
assert.Equal(t, 0, c.activeGCETasks)
assert.Equal(t, 1, mock.StartAllInstancesTimesCalled)
assert.Equal(t, 1, mock.StopAllInstancesTimesCalled)
@@ -60,13 +60,13 @@
// Registering and then unregistering a 3rd task should start and stop all
// instances.
assert.Nil(t, c.RegisterGCETask("test-task3"))
- //s.AssertNumberOfCalls(t, "DeleteBots", 1)
+ s.AssertNumberOfCalls(t, "DeleteBots", 1)
assert.Equal(t, 1, c.activeGCETasks)
assert.Equal(t, 2, mock.StartAllInstancesTimesCalled)
assert.Equal(t, 1, mock.StopAllInstancesTimesCalled)
assert.Nil(t, c.UnregisterGCETask("test-task3"))
- //s.AssertNumberOfCalls(t, "DeleteBots", 2)
+ s.AssertNumberOfCalls(t, "DeleteBots", 2)
assert.Equal(t, 0, c.activeGCETasks)
assert.Equal(t, 2, mock.StartAllInstancesTimesCalled)
assert.Equal(t, 2, mock.StopAllInstancesTimesCalled)
diff --git a/ct/go/poller/main.go b/ct/go/poller/main.go
index 267af1f..be3f6f7 100644
--- a/ct/go/poller/main.go
+++ b/ct/go/poller/main.go
@@ -21,6 +21,7 @@
"sync"
"time"
+ "go.skia.org/infra/ct/go/ct_autoscaler"
"go.skia.org/infra/ct/go/ctfe/admin_tasks"
"go.skia.org/infra/ct/go/ctfe/capture_skps"
"go.skia.org/infra/ct/go/ctfe/chromium_analysis"
@@ -494,7 +495,7 @@
// go routine. The function returns without waiting for the task to finish and the
// WaitGroup of the goroutine is returned to the caller. The caller can then call
// wg.Wait() if they would like to wait for the task to finish.
-func pollAndExecOnce(ctx context.Context, getPatchFunc GetPatchFunc) *sync.WaitGroup {
+func pollAndExecOnce(ctx context.Context, autoscaler ct_autoscaler.ICTAutoscaler, getPatchFunc GetPatchFunc) *sync.WaitGroup {
pending, err := frontend.GetOldestPendingTaskV2()
var wg sync.WaitGroup
if err != nil {
@@ -517,6 +518,13 @@
pickedUpTasks[taskId] = "1"
tasksMtx.Unlock()
+ if task.RunsOnGCEWorkers() {
+ if err := autoscaler.RegisterGCETask(taskId); err != nil {
+ sklog.Errorf("Error when registering GCE task in CT autoscaler: %s", err)
+ return &wg
+ }
+ }
+
sklog.Infof("Executing task %s", taskId)
// Increment the WaitGroup counter.
wg.Add(1)
@@ -534,6 +542,12 @@
tasksMtx.Lock()
delete(pickedUpTasks, taskId)
tasksMtx.Unlock()
+
+ if task.RunsOnGCEWorkers() {
+ if err := autoscaler.UnregisterGCETask(taskId); err != nil {
+ sklog.Errorf("Error when unregistering GCE task in CT autoscaler: %s", err)
+ }
+ }
}()
// Return the WaitGroup to allow some callers to call wg.Wait()
return &wg
@@ -542,6 +556,10 @@
func main() {
master_common.InitWithMetrics2("ct-poller", promPort)
+ autoscaler, err := ct_autoscaler.NewCTAutoscaler(*master_common.Local)
+ if err != nil {
+ sklog.Fatalf("Could not instantiate the CT autoscaler: %s", err)
+ }
healthyGauge := metrics2.GetInt64Metric("healthy")
// Terminate all tasks which were in running state when the poller was restarted.
@@ -569,10 +587,10 @@
// Run immediately, since pollTick will not fire until after pollInterval.
ctx := context.Background()
- pollAndExecOnce(ctx, ctutil.GetPatchFromStorage)
+ pollAndExecOnce(ctx, autoscaler, ctutil.GetPatchFromStorage)
for range time.Tick(*pollInterval) {
healthyGauge.Update(1)
- pollAndExecOnce(ctx, ctutil.GetPatchFromStorage)
+ pollAndExecOnce(ctx, autoscaler, ctutil.GetPatchFromStorage)
// Sleeping for a second to avoid the small probability of ending up
// with 2 tasks with the same runID. For context see
// https://skia-review.googlesource.com/c/26941/8/ct/go/poller/main.go#96
diff --git a/ct/go/poller/poller_test.go b/ct/go/poller/poller_test.go
index 8b0248d..8c94fdd 100644
--- a/ct/go/poller/poller_test.go
+++ b/ct/go/poller/poller_test.go
@@ -15,6 +15,7 @@
"cloud.google.com/go/datastore"
expect "github.com/stretchr/testify/assert"
assert "github.com/stretchr/testify/require"
+ "go.skia.org/infra/ct/go/ct_autoscaler"
"go.skia.org/infra/ct/go/ctfe/admin_tasks"
"go.skia.org/infra/ct/go/ctfe/capture_skps"
"go.skia.org/infra/ct/go/ctfe/chromium_builds"
@@ -555,6 +556,7 @@
mockExec := exec.CommandCollector{}
ctx := exec.NewContext(context.Background(), mockExec.Run)
task := pendingRecreateWebpageArchivesTask()
+ mockCTAutoscaler := &ct_autoscaler.MockCTAutoscaler{}
mockServer := frontend.MockServer{}
mockServer.SetCurrentTask(&task.RecreateWebpageArchivesDatastoreTask)
defer frontend.CloseTestServer(frontend.InitTestServer(&mockServer))
@@ -564,10 +566,12 @@
return patchId, nil
}
- wg := pollAndExecOnce(ctx, mockGetPatchFromStorageFunc)
+ wg := pollAndExecOnce(ctx, mockCTAutoscaler, mockGetPatchFromStorageFunc)
wg.Wait()
// Expect only one poll.
expect.Equal(t, 1, mockServer.OldestPendingTaskReqCount())
+ expect.Equal(t, 1, mockCTAutoscaler.RegisterGCETaskTimesCalled)
+ expect.Equal(t, 1, mockCTAutoscaler.UnregisterGCETaskTimesCalled)
expect.Equal(t, 0, getPatchCalls)
// Expect one command: capture_archives_on_workers ...
commands := mockExec.Commands()
@@ -583,6 +587,7 @@
mockExec := exec.CommandCollector{}
ctx := exec.NewContext(context.Background(), mockExec.Run)
task1 := pendingRecreateWebpageArchivesTask()
+ mockCTAutoscaler := &ct_autoscaler.MockCTAutoscaler{}
mockServer := frontend.MockServer{}
mockServer.SetCurrentTask(&task1.RecreateWebpageArchivesDatastoreTask)
defer frontend.CloseTestServer(frontend.InitTestServer(&mockServer))
@@ -593,17 +598,19 @@
}
// Poll frontend and execute the first task.
- wg1 := pollAndExecOnce(ctx, mockGetPatchFromStorageFunc)
+ wg1 := pollAndExecOnce(ctx, mockCTAutoscaler, mockGetPatchFromStorageFunc)
wg1.Wait() // Wait for task to return to make asserting commands deterministic.
// Update current task.
task2 := pendingChromiumPerfTask()
mockServer.SetCurrentTask(&task2.DatastoreTask)
// Poll frontend and execute the second task.
- wg2 := pollAndExecOnce(ctx, mockGetPatchFromStorageFunc)
+ wg2 := pollAndExecOnce(ctx, mockCTAutoscaler, mockGetPatchFromStorageFunc)
wg2.Wait() // Wait for task to return to make asserting commands deterministic.
// Expect two pending task requests.
expect.Equal(t, 2, mockServer.OldestPendingTaskReqCount())
+ expect.Equal(t, 1, mockCTAutoscaler.RegisterGCETaskTimesCalled)
+ expect.Equal(t, 1, mockCTAutoscaler.UnregisterGCETaskTimesCalled)
// Expect two commands: capture_archives_on_workers ...; run_chromium_perf_on_workers ...
commands := mockExec.Commands()
assert.Len(t, commands, 2)
@@ -621,6 +628,7 @@
commandCollector.SetDelegateRun(mockRun.Run)
ctx := exec.NewContext(context.Background(), commandCollector.Run)
task := pendingRecreateWebpageArchivesTask()
+ mockCTAutoscaler := &ct_autoscaler.MockCTAutoscaler{}
mockServer := frontend.MockServer{}
mockServer.SetCurrentTask(&task.RecreateWebpageArchivesDatastoreTask)
defer frontend.CloseTestServer(frontend.InitTestServer(&mockServer))
@@ -631,10 +639,12 @@
return patchId, nil
}
- wg := pollAndExecOnce(ctx, mockGetPatchFromStorageFunc)
+ wg := pollAndExecOnce(ctx, mockCTAutoscaler, mockGetPatchFromStorageFunc)
wg.Wait()
// Expect only one poll.
expect.Equal(t, 1, mockServer.OldestPendingTaskReqCount())
+ expect.Equal(t, 1, mockCTAutoscaler.RegisterGCETaskTimesCalled)
+ expect.Equal(t, 1, mockCTAutoscaler.UnregisterGCETaskTimesCalled)
// Expect one command: capture_archives_on_workers ...
commands := commandCollector.Commands()
assert.Len(t, commands, 1)
@@ -655,6 +665,7 @@
func TestPollAndExecOnceNoTasks(t *testing.T) {
testutils.SmallTest(t)
+ mockCTAutoscaler := &ct_autoscaler.MockCTAutoscaler{}
mockServer := frontend.MockServer{}
mockServer.SetCurrentTask(nil)
defer frontend.CloseTestServer(frontend.InitTestServer(&mockServer))
@@ -667,14 +678,16 @@
}
// Poll frontend, no tasks.
- wg1 := pollAndExecOnce(ctx, mockGetPatchFromStorageFunc)
- wg2 := pollAndExecOnce(ctx, mockGetPatchFromStorageFunc)
- wg3 := pollAndExecOnce(ctx, mockGetPatchFromStorageFunc)
+ wg1 := pollAndExecOnce(ctx, mockCTAutoscaler, mockGetPatchFromStorageFunc)
+ wg2 := pollAndExecOnce(ctx, mockCTAutoscaler, mockGetPatchFromStorageFunc)
+ wg3 := pollAndExecOnce(ctx, mockCTAutoscaler, mockGetPatchFromStorageFunc)
// Expect three polls.
wg1.Wait()
wg2.Wait()
wg3.Wait()
expect.Equal(t, 3, mockServer.OldestPendingTaskReqCount())
+ expect.Equal(t, 0, mockCTAutoscaler.RegisterGCETaskTimesCalled)
+ expect.Equal(t, 0, mockCTAutoscaler.UnregisterGCETaskTimesCalled)
expect.Equal(t, 0, getPatchCalls)
// Expect no commands.
expect.Empty(t, mockExec.Commands())