[machine] Add PowerCycle to machine.Description.

Change-Id: Id717aa354ca46b5cba1ac1dc9c1aecc49172eb86
Reviewed-on: https://skia-review.googlesource.com/c/buildbot/+/289887
Reviewed-by: Kevin Lubick <kjlubick@google.com>
Commit-Queue: Joe Gregorio <jcgregorio@google.com>
diff --git a/machine/go/machine/machine.go b/machine/go/machine/machine.go
index 1479a82..3880bc8 100644
--- a/machine/go/machine/machine.go
+++ b/machine/go/machine/machine.go
@@ -56,6 +56,9 @@
 	// the pod should be deleted.
 	ScheduledForDeletion string
 
+	// PowerCycle is true if the machine needs to be power-cycled.
+	PowerCycle bool
+
 	LastUpdated         time.Time
 	Battery             int                // Charge as an integer percent, e.g. 50% = 50.
 	Temperature         map[string]float64 // In Celsius.
diff --git a/machine/go/machine/machine_test.go b/machine/go/machine/machine_test.go
index 9290640..8de666a 100644
--- a/machine/go/machine/machine_test.go
+++ b/machine/go/machine/machine_test.go
@@ -30,6 +30,7 @@
 		Battery:              91,
 		Temperature:          map[string]float64{"cpu": 26.4},
 		RunningSwarmingTask:  true,
+		PowerCycle:           true,
 	}
 	out := in.Copy()
 	require.Equal(t, in, out)
diff --git a/machine/go/machine/store/impl.go b/machine/go/machine/store/impl.go
index de034a9..cad2c25 100644
--- a/machine/go/machine/store/impl.go
+++ b/machine/go/machine/store/impl.go
@@ -40,6 +40,8 @@
 	listIterFailureCounter                      metrics2.Counter
 	watchForDeletablePodsReceiveSnapshotCounter metrics2.Counter
 	watchForDeletablePodsDataToErrorCounter     metrics2.Counter
+	watchForPowerCycleReceiveSnapshotCounter    metrics2.Counter
+	watchForPowerCycleDataToErrorCounter        metrics2.Counter
 }
 
 // storeDescription is how machine.Description is mapped into firestore.
@@ -68,6 +70,9 @@
 	// RunningSwarmingTask is a mirror of MachineDescription.RunningSwarmingTask.
 	RunningSwarmingTask bool
 
+	// PowerCycle is a mirror of MachineDescription.PowerCycle.
+	PowerCycle bool
+
 	// MachineDescription is the full machine.Description. The values that are
 	// mirrored to fields of storeDescription are still fully stored here and
 	// are considered the source of truth.
@@ -96,6 +101,8 @@
 		listIterFailureCounter:                      metrics2.GetCounter("machine_store_list_iter_error"),
 		watchForDeletablePodsReceiveSnapshotCounter: metrics2.GetCounter("machine_store_watch_for_deletable_pods_receive_snapshot"),
 		watchForDeletablePodsDataToErrorCounter:     metrics2.GetCounter("machine_store_watch_for_deletable_pods_datato_error"),
+		watchForPowerCycleReceiveSnapshotCounter:    metrics2.GetCounter("machine_store_watch_for_power_cycle_receive_snapshot"),
+		watchForPowerCycleDataToErrorCounter:        metrics2.GetCounter("machine_store_watch_for_power_cycle_datato_error"),
 	}, nil
 }
 
@@ -106,6 +113,7 @@
 	return st.firestoreClient.RunTransaction(ctx, "store", "update", updateRetries, updateTimeout, func(ctx context.Context, tx *gcfirestore.Transaction) error {
 		var storeDescription storeDescription
 		machineDescription := machine.NewDescription()
+		machineDescription.Dimensions[machine.DimID] = []string{machineID}
 		if snap, err := tx.Get(docRef); err == nil {
 			if err := snap.DataTo(&storeDescription); err != nil {
 				st.updateDataToErrorCounter.Inc(1)
@@ -189,6 +197,56 @@
 	return ch
 }
 
+// WatchForPowerCycle implements the Store interface.
+//
+// It watches for machines whose mirrored PowerCycle field is true and whose
+// RunningSwarmingTask field is false, and sends the machine.DimID dimension of
+// each matching machine down the returned channel. The channel is closed when
+// the underlying query snapshot channel is closed or ctx is cancelled.
+func (st *StoreImpl) WatchForPowerCycle(ctx context.Context) <-chan string {
+	q := st.machinesCollection.Where("PowerCycle", "==", true).Where("RunningSwarmingTask", "==", false)
+	ch := make(chan string)
+	go func() {
+		defer close(ch)
+		for qsnap := range firestore.QuerySnapshotChannel(ctx, q) {
+			for {
+				snap, err := qsnap.Documents.Next()
+				if err == iterator.Done {
+					break
+				}
+				if err != nil {
+					// snap is not usable when Next returns an error, so stop
+					// reading this snapshot instead of dereferencing it.
+					sklog.Errorf("Failed to read document snapshot: %s", err)
+					break
+				}
+				var storeDescription storeDescription
+				if err := snap.DataTo(&storeDescription); err != nil {
+					sklog.Errorf("Failed to read data from snapshot: %s", err)
+					st.watchForPowerCycleDataToErrorCounter.Inc(1)
+					continue
+				}
+				machineDescription := storeToMachineDescription(storeDescription)
+				ids := machineDescription.Dimensions[machine.DimID]
+				if len(ids) == 0 {
+					// Indexing an empty dimension would panic and take down
+					// the whole watcher, so skip the malformed description.
+					sklog.Errorf("Machine description is missing the %q dimension.", machine.DimID)
+					continue
+				}
+				st.watchForPowerCycleReceiveSnapshotCounter.Inc(1)
+				select {
+				case ch <- ids[0]:
+				case <-ctx.Done():
+					// Nobody may be reading anymore; exit rather than block forever.
+					return
+				}
+			}
+		}
+	}()
+	return ch
+}
+
 // List implements the Store interface.
 func (st *StoreImpl) List(ctx context.Context) ([]machine.Description, error) {
 	st.listCounter.Inc(1)
@@ -225,6 +263,7 @@
 		LastUpdated:          m.LastUpdated,
 		ScheduledForDeletion: m.ScheduledForDeletion,
 		RunningSwarmingTask:  m.RunningSwarmingTask,
+		PowerCycle:           m.PowerCycle,
 		MachineDescription:   m,
 	}
 }
diff --git a/machine/go/machine/store/impl_test.go b/machine/go/machine/store/impl_test.go
index 0559434..2b067a3 100644
--- a/machine/go/machine/store/impl_test.go
+++ b/machine/go/machine/store/impl_test.go
@@ -43,6 +43,22 @@
 	}, m)
 }
 
+// TestMachineToStoreDescription_WithPowerCycle checks PowerCycle is mirrored.
+func TestMachineToStoreDescription_WithPowerCycle(t *testing.T) {
+	unittest.SmallTest(t)
+	desc := machine.NewDescription()
+	desc.PowerCycle = true
+	desc.Dimensions[machine.DimOS] = []string{"Android"}
+	got := machineDescriptionToStoreDescription(desc)
+	assert.Equal(t, storeDescription{
+		OS:                 []string{"Android"},
+		Mode:               desc.Mode,
+		LastUpdated:        desc.LastUpdated,
+		MachineDescription: desc,
+		PowerCycle:         true,
+	}, got)
+}
+
 func setupForTest(t *testing.T) (context.Context, config.InstanceConfig) {
 	require.NotEmpty(t, os.Getenv("FIRESTORE_EMULATOR_HOST"), "This test requires the firestore emulator.")
 	cfg := config.InstanceConfig{
@@ -171,7 +187,10 @@
 	// Wait for first description.
 	m := <-ch
 	assert.Equal(t, machine.ModeMaintenance, m.Mode)
-	assert.Equal(t, machine.SwarmingDimensions{machine.DimOS: {"Android"}}, m.Dimensions)
+	assert.Equal(t, machine.SwarmingDimensions{
+		machine.DimID: {"skia-rpi2-rack2-shelf1-001"},
+		machine.DimOS: {"Android"},
+	}, m.Dimensions)
 	assert.Equal(t, "Hello World!", m.Annotation.Message)
 	assert.NoError(t, store.firestoreClient.Close())
 }
@@ -228,7 +247,10 @@
 	descriptions, err = store.List(ctx)
 	require.NoError(t, err)
 	assert.Len(t, descriptions, 1)
-	assert.Equal(t, machine.SwarmingDimensions{"foo": {"bar", "baz"}}, descriptions[0].Dimensions)
+	assert.Equal(t, machine.SwarmingDimensions{
+		"foo":         {"bar", "baz"},
+		machine.DimID: {"skia-rpi2-rack2-shelf1-001"},
+	}, descriptions[0].Dimensions)
 
 	// Add a second description.
 	err = store.Update(ctx, "skia-rpi2-rack2-shelf1-002", func(previous machine.Description) machine.Description {
@@ -374,3 +396,126 @@
 	}
 	assert.NoError(t, store.firestoreClient.Close())
 }
+
+// TestWatchForPowerCycle_Success checks that a machine flagged for power-cycle
+// that is not running a task has its name delivered on the watch channel.
+func TestWatchForPowerCycle_Success(t *testing.T) {
+	unittest.LargeTest(t)
+	const machineName = "skia-rpi2-rack2-shelf1-001"
+
+	ctx, cfg := setupForTest(t)
+	store, err := New(ctx, true, cfg)
+	require.NoError(t, err)
+
+	ctx, cancel := context.WithCancel(ctx)
+	defer cancel()
+	defer store.watchForPowerCycleDataToErrorCounter.Reset()
+	defer store.watchForPowerCycleReceiveSnapshotCounter.Reset()
+
+	// Start watching before the document is written.
+	names := store.WatchForPowerCycle(ctx)
+
+	// Write a document that satisfies the query.
+	needsPowerCycle := func(previous machine.Description) machine.Description {
+		updated := previous.Copy()
+		updated.Mode = machine.ModeMaintenance
+		updated.RunningSwarmingTask = false
+		updated.PowerCycle = true
+		return updated
+	}
+	require.NoError(t, store.Update(ctx, machineName, needsPowerCycle))
+
+	// Block until the watcher reports a machine name.
+	assert.Equal(t, machineName, <-names)
+
+	assert.Equal(t, int64(1), store.watchForPowerCycleReceiveSnapshotCounter.Get())
+	assert.Equal(t, int64(0), store.watchForPowerCycleDataToErrorCounter.Get())
+	assert.NoError(t, store.firestoreClient.Close())
+}
+
+// TestWatchForPowerCycle_OnlyMatchesTheRightMachines checks that the watch
+// only reports a machine that has PowerCycle set and is not running a
+// Swarming task.
+func TestWatchForPowerCycle_OnlyMatchesTheRightMachines(t *testing.T) {
+	unittest.LargeTest(t)
+	ctx, cfg := setupForTest(t)
+	store, err := New(ctx, true, cfg)
+	require.NoError(t, err)
+
+	ctx, cancel := context.WithCancel(ctx)
+	defer cancel()
+
+	defer store.watchForPowerCycleDataToErrorCounter.Reset()
+	defer store.watchForPowerCycleReceiveSnapshotCounter.Reset()
+
+	// Start watching before any document is written.
+	names := store.WatchForPowerCycle(ctx)
+
+	// Neither of these matches the query: the first is still running a task
+	// and the second does not need to be power-cycled.
+	require.NoError(t, store.Update(ctx, "skia-rpi2-rack4-shelf2-013", func(previous machine.Description) machine.Description {
+		updated := previous.Copy()
+		updated.Mode = machine.ModeMaintenance
+		updated.RunningSwarmingTask = true
+		updated.PowerCycle = true
+		return updated
+	}))
+
+	require.NoError(t, store.Update(ctx, "skia-rpi2-rack4-shelf1-021", func(previous machine.Description) machine.Description {
+		updated := previous.Copy()
+		updated.Mode = machine.ModeMaintenance
+		updated.RunningSwarmingTask = false
+		updated.PowerCycle = false
+		return updated
+	}))
+
+	// This machine satisfies both conditions of the query.
+	const machineName = "skia-rpi2-rack2-shelf1-001"
+
+	require.NoError(t, store.Update(ctx, machineName, func(previous machine.Description) machine.Description {
+		updated := previous.Copy()
+		updated.Mode = machine.ModeMaintenance
+		updated.RunningSwarmingTask = false
+		updated.PowerCycle = true
+		return updated
+	}))
+
+	// Block until the watcher reports a machine name.
+	got := <-names
+	assert.Equal(t, machineName, got)
+
+	assert.Equal(t, int64(1), store.watchForPowerCycleReceiveSnapshotCounter.Get())
+	assert.Equal(t, int64(0), store.watchForPowerCycleDataToErrorCounter.Get())
+	assert.NoError(t, store.firestoreClient.Close())
+}
+
+// TestWatchForPowerCycle_ISCancellable checks that cancelling the context
+// closes the channel returned by WatchForPowerCycle.
+func TestWatchForPowerCycle_ISCancellable(t *testing.T) {
+	unittest.LargeTest(t)
+	ctx, cfg := setupForTest(t)
+	store, err := New(ctx, true, cfg)
+	require.NoError(t, err)
+
+	ctx, cancel := context.WithCancel(ctx)
+	// Also deferred so the watch is stopped if a require below aborts the test.
+	defer cancel()
+
+	// Start watching before the document is written.
+	ch := store.WatchForPowerCycle(ctx)
+
+	const machineName = "skia-rpi2-rack2-shelf1-001"
+	require.NoError(t, store.Update(ctx, machineName, func(previous machine.Description) machine.Description {
+		ret := previous.Copy()
+		ret.Mode = machine.ModeMaintenance
+		ret.RunningSwarmingTask = false
+		ret.PowerCycle = true
+		return ret
+	}))
+
+	// Cancelling must close the channel; the test passes once the loop ends.
+	cancel()
+	for range ch {
+	}
+	assert.NoError(t, store.firestoreClient.Close())
+}
diff --git a/machine/go/machine/store/store.go b/machine/go/machine/store/store.go
index 753406d..df4d8cb 100644
--- a/machine/go/machine/store/store.go
+++ b/machine/go/machine/store/store.go
@@ -32,6 +32,10 @@
 	// restarted, but with the latest image.
 	WatchForDeletablePods(ctx context.Context) <-chan string
 
+	// WatchForPowerCycle returns a channel that will produce the name of a
+	// machine that needs to be power-cycled.
+	WatchForPowerCycle(ctx context.Context) <-chan string
+
 	// List returns a slice containing all known machines.
 	List(ctx context.Context) ([]machine.Description, error)
 }