[switchboard] Implement on Firestore.
Bug: skia:12063
Change-Id: I0a39bf176764b74325d2a13ee6499852be39ed60
Reviewed-on: https://skia-review.googlesource.com/c/buildbot/+/417010
Reviewed-by: Ravi Mistry <rmistry@google.com>
Commit-Queue: Joe Gregorio <jcgregorio@google.com>
diff --git a/go/now/now.go b/go/now/now.go
index 79d8248..1f7c722 100644
--- a/go/now/now.go
+++ b/go/now/now.go
@@ -21,7 +21,7 @@
// The value set can also be a function that returns a time.Time.
//
// var monotonicTime int64 = 0
-// var mockTimeProvider = func() {
+// var mockTimeProvider = func() time.Time {
// monotonicTime += 1
// return time.Unix(monotonicTime, 0).UTC()
// }
diff --git a/machine/go/machine/store/impl_test.go b/machine/go/machine/store/impl_test.go
index ad2020f..92d076e 100644
--- a/machine/go/machine/store/impl_test.go
+++ b/machine/go/machine/store/impl_test.go
@@ -271,6 +271,7 @@
ret.Dimensions["foo"] = []string{"quux"}
return ret
})
+ require.NoError(t, err)
// Confirm they both show up in the list.
descriptions, err = store.List(ctx)
diff --git a/machine/go/switchboard/BUILD.bazel b/machine/go/switchboard/BUILD.bazel
index 05d9155..19bcdca 100644
--- a/machine/go/switchboard/BUILD.bazel
+++ b/machine/go/switchboard/BUILD.bazel
@@ -1,8 +1,36 @@
+load("//bazel/go:go_test.bzl", "go_test")
load("@io_bazel_rules_go//go:def.bzl", "go_library")
go_library(
name = "switchboard",
- srcs = ["switchboard.go"],
+ srcs = [
+ "impl.go",
+ "switchboard.go",
+ ],
importpath = "go.skia.org/infra/machine/go/switchboard",
visibility = ["//visibility:public"],
+ deps = [
+ "//go/auth",
+ "//go/firestore",
+ "//go/metrics2",
+ "//go/now",
+ "//go/skerr",
+ "//go/sklog",
+ "//machine/go/machineserver/config",
+ "@com_google_cloud_go_firestore//:firestore",
+ "@org_golang_google_api//iterator",
+ ],
+)
+
+go_test(
+ name = "switchboard_test",
+ srcs = ["impl_test.go"],
+ embed = [":switchboard"],
+ deps = [
+ "//go/now",
+ "//go/testutils/unittest",
+ "//machine/go/machineserver/config",
+ "@com_github_google_uuid//:uuid",
+ "@com_github_stretchr_testify//require",
+ ],
)
diff --git a/machine/go/switchboard/impl.go b/machine/go/switchboard/impl.go
new file mode 100644
index 0000000..faa401a
--- /dev/null
+++ b/machine/go/switchboard/impl.go
@@ -0,0 +1,329 @@
+package switchboard
+
+import (
+ "context"
+ "fmt"
+ "math/rand"
+ "time"
+
+ gcfirestore "cloud.google.com/go/firestore"
+ "go.skia.org/infra/go/auth"
+ "go.skia.org/infra/go/firestore"
+ "go.skia.org/infra/go/metrics2"
+ "go.skia.org/infra/go/now"
+ "go.skia.org/infra/go/skerr"
+ "go.skia.org/infra/go/sklog"
+ "go.skia.org/infra/machine/go/machineserver/config"
+ "google.golang.org/api/iterator"
+)
+
+const (
+ meetingPointsCollectionName = "meetingpoints"
+
+ podsCollectionName = "pods"
+
+ appName = "machineserver"
+
+ updateTimeout = 10 * time.Second
+
+ updateRetries = 5
+
+ reserveRetries = 20
+
+	// We use a range of ports from 10000-20000 when picking ports on
+	// skia-switchboard pods, which is a large range to avoid collisions.
+ portRangeBegin = 10_000
+
+ portRangeEnd = 20_000
+)
+
+type metricName string
+
+const (
+ switchboardReserveMeetingpoint = "switchboard_reserve_meetingpoint"
+ switchboardReserveMeetingpointErrors = "switchboard_reserve_meetingpoint_errors"
+ switchboardClearMeetingpoint = "switchboard_clear_meetingpoint"
+ switchboardGetMeetingpoint = "switchboard_get_meetingpoint"
+ switchboardGetMeetingpointErrors = "switchboard_get_meetingpoint_errors"
+ switchboardKeepAliveMeetingpoint = "switchboard_keepalive_meetingpoint"
+ switchboardKeepAliveMeetingpointErrors = "switchboard_keepalive_meetingpoint_errors"
+ switchboardListMeetingpoint = "switchboard_list_meetingpoint"
+ switchboardAddPod = "switchboard_add_pod"
+ switchboardRemovePod = "switchboard_remove_pod"
+ switchboardKeepAlivePod = "switchboard_keepalive_pod"
+ switchboardKeepAlivePodErrors = "switchboard_keepalive_pod_errors"
+ switchboardListPod = "switchboard_list_pod"
+ switchboardListPodErrors = "switchboard_list_pod_errors"
+)
+
+var (
+ allMetricNames = []metricName{
+ switchboardReserveMeetingpoint,
+ switchboardReserveMeetingpointErrors,
+ switchboardClearMeetingpoint,
+ switchboardGetMeetingpoint,
+ switchboardGetMeetingpointErrors,
+ switchboardKeepAliveMeetingpoint,
+ switchboardKeepAliveMeetingpointErrors,
+ switchboardListMeetingpoint,
+ switchboardAddPod,
+ switchboardRemovePod,
+ switchboardKeepAlivePod,
+ switchboardKeepAlivePodErrors,
+ switchboardListPod,
+ switchboardListPodErrors,
+ }
+)
+
+// For both Pods and MeetingPoints we will create parallel structs
+// podDescription and meetingPointDescription that are used to store their values
+// in the datastore. This will make it easier to have them diverge in the future
+// if that's needed.
+
+// podDescription is the struct that gets stored in the pods Collection.
+type podDescription struct {
+ LastUpdated time.Time
+}
+
+func descriptionToPod(d podDescription, name string) Pod {
+ return Pod{
+ Name: name,
+ LastUpdated: d.LastUpdated,
+ }
+}
+
+// meetingPointDescription is the struct that gets stored in the meetingpoints Collection.
+type meetingPointDescription struct {
+ MeetingPoint
+}
+
+// The unique id for each MeetingPoint in the datastore is the PodName and the
+// Port.
+func (m meetingPointDescription) id() string {
+ return fmt.Sprintf("%s:%d", m.PodName, m.Port)
+}
+
+func descriptionToMeetingPoint(m meetingPointDescription) MeetingPoint {
+ return m.MeetingPoint
+}
+
+func meetingPointToDescription(m MeetingPoint) meetingPointDescription {
+ return meetingPointDescription{m}
+}
+
+// switchboardImpl implements Switchboard using Cloud Firestore as a backend.
+type switchboardImpl struct {
+ firestoreClient *firestore.Client
+ meetingPointsCollection *gcfirestore.CollectionRef
+ podsCollection *gcfirestore.CollectionRef
+ counters map[metricName]metrics2.Counter
+}
+
+// New returns a new instance of switchboardImpl.
+func New(ctx context.Context, local bool, instanceConfig config.InstanceConfig) (*switchboardImpl, error) {
+ ts, err := auth.NewDefaultTokenSource(local, "https://www.googleapis.com/auth/datastore")
+ if err != nil {
+ return nil, skerr.Wrapf(err, "Failed to create tokensource.")
+ }
+
+ firestoreClient, err := firestore.NewClient(ctx, instanceConfig.Store.Project, appName, instanceConfig.Store.Instance, ts)
+ if err != nil {
+ return nil, skerr.Wrapf(err, "Failed to create firestore client for app: %q instance: %q", appName, instanceConfig.Store.Instance)
+ }
+
+ counters := map[metricName]metrics2.Counter{}
+ for _, metricName := range allMetricNames {
+ counters[metricName] = metrics2.GetCounter(string(metricName))
+ }
+
+ return &switchboardImpl{
+ firestoreClient: firestoreClient,
+ meetingPointsCollection: firestoreClient.Collection(meetingPointsCollectionName),
+ podsCollection: firestoreClient.Collection(podsCollectionName),
+ counters: counters,
+ }, nil
+}
+
+// ReserveMeetingPoint implements Switchboard
+func (s *switchboardImpl) ReserveMeetingPoint(ctx context.Context, machineID string, username string) (MeetingPoint, error) {
+ s.counters[switchboardReserveMeetingpoint].Inc(1)
+ ret := MeetingPoint{
+ LastUpdated: now.Now(ctx),
+ Username: username,
+ MachineID: machineID,
+ }
+
+ // Loop until we find an open spot for a reservation.
+ //
+ // We use a range of 10,000 ports per pod, and go through this loop 20
+ // times, so even if we attach 100 machines to a single pod (which is a
+	// lot), we have a 100/10,000 = 1/100 chance of a collision per loop, so the
+	// chance of eventually finding an open spot is:
+	//    1.0 - (1/100)^20
+	//  = 1.0 - (.01)^20
+	//  = 1 - 1e-40
+	//  = 99.99[35 more nines go here]% chance of success.
+ for i := 0; i < reserveRetries; i++ {
+ pods, err := s.ListPods(ctx)
+ if err != nil {
+ s.counters[switchboardReserveMeetingpointErrors].Inc(1)
+ sklog.Errorf("Reserve failed to load the available pods: %s", err)
+ continue
+ }
+
+ if len(pods) == 0 {
+ sklog.Error("No pods available")
+ s.counters[switchboardReserveMeetingpointErrors].Inc(1)
+ continue
+ }
+
+ // Choose a pod at random.
+ pod := pods[rand.Intn(len(pods))]
+ // Choose a port at random.
+ port := rand.Intn(portRangeEnd-portRangeBegin) + portRangeBegin
+ ret.PodName = pod.Name
+ ret.Port = port
+
+ desc := meetingPointToDescription(ret)
+ docRef := s.meetingPointsCollection.Doc(desc.id())
+
+ _, err = s.firestoreClient.Create(ctx, docRef, &desc, updateRetries, updateTimeout)
+ if err != nil {
+ sklog.Errorf("Failed to create: %s", err)
+ s.counters[switchboardReserveMeetingpointErrors].Inc(1)
+ continue
+ }
+ return ret, nil
+ }
+
+ return ret, ErrNoPodsFound
+}
+
+// ClearMeetingPoint implements Switchboard
+func (s *switchboardImpl) ClearMeetingPoint(ctx context.Context, meetingPoint MeetingPoint) error {
+ s.counters[switchboardClearMeetingpoint].Inc(1)
+ _, err := s.meetingPointsCollection.Doc(meetingPointToDescription(meetingPoint).id()).Delete(ctx)
+ return err
+}
+
+// GetMeetingPoint implements Switchboard
+func (s *switchboardImpl) GetMeetingPoint(ctx context.Context, machineID string) (MeetingPoint, error) {
+ s.counters[switchboardGetMeetingpoint].Inc(1)
+ iter := s.meetingPointsCollection.Where("MachineID", "==", machineID).OrderBy("LastUpdated", gcfirestore.Desc).Documents(ctx)
+
+ var ret MeetingPoint
+ snap, err := iter.Next()
+ if err == iterator.Done {
+ s.counters[switchboardGetMeetingpointErrors].Inc(1)
+ return ret, ErrMachineNotFound
+ }
+ if err != nil {
+ s.counters[switchboardGetMeetingpointErrors].Inc(1)
+ return ret, skerr.Wrapf(err, "List failed to read description.")
+ }
+ var desc meetingPointDescription
+ if err := snap.DataTo(&desc); err != nil {
+ s.counters[switchboardGetMeetingpointErrors].Inc(1)
+ sklog.Errorf("Failed to read data from snapshot: %s", err)
+ return ret, skerr.Wrapf(err, "List failed to load description.")
+ }
+ ret = descriptionToMeetingPoint(desc)
+
+ return ret, nil
+}
+
+// KeepAliveMeetingPoint implements Switchboard
+func (s *switchboardImpl) KeepAliveMeetingPoint(ctx context.Context, meetingPoint MeetingPoint) error {
+ s.counters[switchboardKeepAliveMeetingpoint].Inc(1)
+ desc := meetingPointToDescription(meetingPoint)
+ docRef := s.meetingPointsCollection.Doc(desc.id())
+ return s.firestoreClient.RunTransaction(ctx, "meetingpoints", "keepalive", updateRetries, updateTimeout, func(ctx context.Context, tx *gcfirestore.Transaction) error {
+ snap, err := tx.Get(docRef)
+ if err != nil {
+ s.counters[switchboardKeepAliveMeetingpointErrors].Inc(1)
+ return skerr.Wrapf(err, "Failed querying firestore for %q", desc.id())
+ }
+ if err := snap.DataTo(&desc); err != nil {
+ s.counters[switchboardKeepAliveMeetingpointErrors].Inc(1)
+ return skerr.Wrapf(err, "Failed to deserialize firestore Get response for %q", desc.id())
+ }
+ desc.LastUpdated = now.Now(ctx)
+
+ return tx.Set(docRef, &desc)
+ })
+}
+
+// AddPod implements Switchboard
+func (s *switchboardImpl) AddPod(ctx context.Context, podName string) error {
+ s.counters[switchboardAddPod].Inc(1)
+ docRef := s.podsCollection.Doc(podName)
+ return s.firestoreClient.RunTransaction(ctx, "pods", "add", updateRetries, updateTimeout, func(ctx context.Context, tx *gcfirestore.Transaction) error {
+ podDescription := podDescription{
+ LastUpdated: now.Now(ctx),
+ }
+ return tx.Create(docRef, &podDescription)
+ })
+}
+
+// KeepAlivePod implements Switchboard
+func (s *switchboardImpl) KeepAlivePod(ctx context.Context, podName string) error {
+ s.counters[switchboardKeepAlivePod].Inc(1)
+ docRef := s.podsCollection.Doc(podName)
+ return s.firestoreClient.RunTransaction(ctx, "pods", "keepalive", updateRetries, updateTimeout, func(ctx context.Context, tx *gcfirestore.Transaction) error {
+ var podDescription podDescription
+ snap, err := tx.Get(docRef)
+ if err != nil {
+ s.counters[switchboardKeepAlivePodErrors].Inc(1)
+ return skerr.Wrapf(err, "Failed querying firestore for %q", podName)
+ }
+ if err := snap.DataTo(&podDescription); err != nil {
+ s.counters[switchboardKeepAlivePodErrors].Inc(1)
+ return skerr.Wrapf(err, "Failed to deserialize firestore Get response for %q", podName)
+ }
+ podDescription.LastUpdated = now.Now(ctx)
+
+ return tx.Set(docRef, &podDescription)
+ })
+}
+
+// RemovePod implements Switchboard
+func (s *switchboardImpl) RemovePod(ctx context.Context, podName string) error {
+ s.counters[switchboardRemovePod].Inc(1)
+ _, err := s.podsCollection.Doc(podName).Delete(ctx)
+ return err
+}
+
+// ListPods implements Switchboard
+func (s *switchboardImpl) ListPods(ctx context.Context) ([]Pod, error) {
+ s.counters[switchboardListPod].Inc(1)
+
+ ret := []Pod{}
+ iter := s.podsCollection.Documents(ctx)
+ for {
+ snap, err := iter.Next()
+ if err == iterator.Done {
+ break
+ }
+ if err != nil {
+ s.counters[switchboardListPodErrors].Inc(1)
+ return nil, skerr.Wrapf(err, "List failed to read description.")
+ }
+ var podDescription podDescription
+ if err := snap.DataTo(&podDescription); err != nil {
+ s.counters[switchboardListPodErrors].Inc(1)
+ sklog.Errorf("Failed to read data from snapshot: %s", err)
+ continue
+ }
+
+ ret = append(ret, descriptionToPod(podDescription, snap.Ref.ID))
+ }
+ return ret, nil
+}
+
+// ListMeetingPoints implements Switchboard
+func (s *switchboardImpl) ListMeetingPoints(ctx context.Context) ([]MeetingPoint, error) {
+ panic("unimplemented")
+}
+
+// Assert that switchboardImpl implements the Switchboard interface.
+var _ Switchboard = (*switchboardImpl)(nil)
diff --git a/machine/go/switchboard/impl_test.go b/machine/go/switchboard/impl_test.go
new file mode 100644
index 0000000..ff91b14
--- /dev/null
+++ b/machine/go/switchboard/impl_test.go
@@ -0,0 +1,350 @@
+package switchboard
+
+import (
+ "context"
+ "fmt"
+ "math/rand"
+ "testing"
+ "time"
+
+ "github.com/google/uuid"
+ "github.com/stretchr/testify/require"
+ "go.skia.org/infra/go/now"
+ "go.skia.org/infra/go/testutils/unittest"
+ "go.skia.org/infra/machine/go/machineserver/config"
+)
+
+// Constants and variables used by all tests.
+
+const podName = "switch-pod-0"
+
+var (
+ // time to use by now.Now() by default.
+ mockTime = time.Unix(12, 0).UTC()
+
+ machineID = "skia-rpi2-rack4-shelf1-003"
+
+ userName = "chrome-bot"
+)
+
+func setupForTest(t *testing.T) (context.Context, *switchboardImpl) {
+ unittest.RequiresFirestoreEmulator(t)
+ cfg := config.InstanceConfig{
+ Store: config.Store{
+ Project: "test-project",
+ Instance: fmt.Sprintf("test-%s", uuid.New()),
+ },
+ }
+ ctx := context.Background()
+ ctx = context.WithValue(ctx, now.ContextKey, mockTime)
+ s, err := New(ctx, true, cfg)
+ for _, c := range s.counters {
+ c.Reset()
+ }
+ require.NoError(t, err)
+ return ctx, s
+}
+
+func TestAddPod_Success(t *testing.T) {
+ unittest.LargeTest(t)
+ ctx, s := setupForTest(t)
+
+ // Add a pod.
+ err := s.AddPod(ctx, podName)
+ require.NoError(t, err)
+
+ // Confirm it was added correctly.
+ pods, err := s.ListPods(ctx)
+ require.NoError(t, err)
+ require.Len(t, pods, 1)
+ require.Equal(t, Pod{
+ Name: podName,
+ LastUpdated: mockTime,
+ }, pods[0])
+ require.Equal(t, int64(1), s.counters[switchboardAddPod].Get())
+ require.Equal(t, int64(1), s.counters[switchboardListPod].Get())
+ require.Equal(t, int64(0), s.counters[switchboardListPodErrors].Get())
+}
+
+func TestAddPod_AddWhenAlreadyExisting_ReturnsError(t *testing.T) {
+ unittest.LargeTest(t)
+ ctx, s := setupForTest(t)
+
+ // Add a pod.
+ err := s.AddPod(ctx, podName)
+ require.NoError(t, err)
+
+ // Adding the same pod should fail.
+ err = s.AddPod(ctx, podName)
+ require.Error(t, err)
+
+ require.Equal(t, int64(2), s.counters[switchboardAddPod].Get())
+}
+
+func TestRemovePod_PodExists_Success(t *testing.T) {
+ unittest.LargeTest(t)
+ ctx, s := setupForTest(t)
+
+ // Add a pod.
+ err := s.AddPod(ctx, podName)
+ require.NoError(t, err)
+
+ // Now remove it.
+ err = s.RemovePod(ctx, podName)
+ require.NoError(t, err)
+
+ // Confirm it's gone.
+ pods, err := s.ListPods(ctx)
+ require.NoError(t, err)
+ require.Len(t, pods, 0)
+ require.Equal(t, int64(1), s.counters[switchboardAddPod].Get())
+ require.Equal(t, int64(1), s.counters[switchboardRemovePod].Get())
+ require.Equal(t, int64(1), s.counters[switchboardListPod].Get())
+ require.Equal(t, int64(0), s.counters[switchboardListPodErrors].Get())
+}
+
+func TestRemovePod_PodDoesNotExist_Success(t *testing.T) {
+ unittest.LargeTest(t)
+ ctx, s := setupForTest(t)
+
+ // Note we never added a pod.
+ err := s.RemovePod(ctx, podName)
+ require.NoError(t, err)
+ require.Equal(t, int64(1), s.counters[switchboardRemovePod].Get())
+}
+
+func TestKeepAlivePod_Success(t *testing.T) {
+ unittest.LargeTest(t)
+ ctx, s := setupForTest(t)
+
+ // Add a pod.
+ err := s.AddPod(ctx, podName)
+ require.NoError(t, err)
+
+ newMockTime := mockTime.Add(time.Hour)
+ ctx = context.WithValue(ctx, now.ContextKey, newMockTime)
+
+ // Call KeepAlivePod so the time gets updated.
+ err = s.KeepAlivePod(ctx, podName)
+ require.NoError(t, err)
+
+ // Confirm the time has been updated.
+ pods, err := s.ListPods(ctx)
+ require.NoError(t, err)
+ require.Len(t, pods, 1)
+ require.Equal(t, Pod{
+ Name: podName,
+ LastUpdated: newMockTime,
+ }, pods[0])
+
+ require.Equal(t, int64(1), s.counters[switchboardAddPod].Get())
+ require.Equal(t, int64(1), s.counters[switchboardKeepAlivePod].Get())
+ require.Equal(t, int64(0), s.counters[switchboardKeepAlivePodErrors].Get())
+ require.Equal(t, int64(1), s.counters[switchboardListPod].Get())
+ require.Equal(t, int64(0), s.counters[switchboardListPodErrors].Get())
+}
+
+func TestKeepAlivePod_PodDoesNotExist_ReturnsError(t *testing.T) {
+ unittest.LargeTest(t)
+ ctx, s := setupForTest(t)
+
+ // Note we never added a pod.
+ err := s.KeepAlivePod(ctx, podName)
+ require.Error(t, err)
+
+ require.Equal(t, int64(1), s.counters[switchboardKeepAlivePod].Get())
+ require.Equal(t, int64(1), s.counters[switchboardKeepAlivePodErrors].Get())
+}
+
+func TestReserveMeetingPoint_Success(t *testing.T) {
+ unittest.LargeTest(t)
+ ctx, s := setupForTest(t)
+
+ // Add a pod.
+ err := s.AddPod(ctx, podName)
+ require.NoError(t, err)
+
+ // Reserve a MeetingPoint.
+ meetingPoint, err := s.ReserveMeetingPoint(ctx, machineID, userName)
+ require.NoError(t, err)
+
+ // Confirm the MeetingPoint is in the correct pod.
+ require.GreaterOrEqual(t, meetingPoint.Port, portRangeBegin)
+ require.GreaterOrEqual(t, portRangeEnd, meetingPoint.Port)
+ require.Equal(t, mockTime, meetingPoint.LastUpdated)
+ require.Equal(t, meetingPoint.PodName, podName)
+ require.Equal(t, meetingPoint.Username, userName)
+
+ require.Equal(t, int64(1), s.counters[switchboardReserveMeetingpoint].Get())
+ require.Equal(t, int64(0), s.counters[switchboardReserveMeetingpointErrors].Get())
+}
+
+func TestReserveMeetingPoint_TriesAgainOnCollision_Success(t *testing.T) {
+ unittest.LargeTest(t)
+ ctx, s := setupForTest(t)
+
+ // Add a pod.
+ err := s.AddPod(ctx, podName)
+ require.NoError(t, err)
+
+ // Get a reservation for machineID.
+ rand.Seed(1)
+ meetingPoint, err := s.ReserveMeetingPoint(ctx, machineID, userName)
+ require.NoError(t, err)
+
+ // Reset rand so we guess the same port the first time, which should result
+ // in a single collision, which means it tries again and then succeeds.
+ rand.Seed(1)
+ meetingPoint, err = s.ReserveMeetingPoint(ctx, machineID, userName)
+ require.NoError(t, err)
+
+ require.GreaterOrEqual(t, meetingPoint.Port, portRangeBegin)
+ require.GreaterOrEqual(t, portRangeEnd, meetingPoint.Port)
+ require.Equal(t, mockTime, meetingPoint.LastUpdated)
+ require.Equal(t, meetingPoint.PodName, podName)
+ require.Equal(t, meetingPoint.Username, userName)
+
+ require.Equal(t, int64(2), s.counters[switchboardReserveMeetingpoint].Get())
+ require.Equal(t, int64(1), s.counters[switchboardReserveMeetingpointErrors].Get()) // Confirm we failed at least once.
+}
+
+func TestReserveMeetingPoint_NoPodsAvailable_ReturnsError(t *testing.T) {
+ unittest.LargeTest(t)
+ ctx, s := setupForTest(t)
+
+ // We haven't added any pods, so this should fail after 'reserveRetries' retries.
+ _, err := s.ReserveMeetingPoint(ctx, machineID, userName)
+ require.Error(t, err)
+ require.Contains(t, err.Error(), ErrNoPodsFound.Error())
+
+ require.Equal(t, int64(1), s.counters[switchboardReserveMeetingpoint].Get())
+ require.Equal(t, int64(reserveRetries), s.counters[switchboardReserveMeetingpointErrors].Get())
+}
+
+func TestGetMeetingPoint_Success(t *testing.T) {
+ unittest.LargeTest(t)
+ ctx, s := setupForTest(t)
+
+ // Add a pod.
+ err := s.AddPod(ctx, podName)
+ require.NoError(t, err)
+
+ meetingPoint, err := s.ReserveMeetingPoint(ctx, machineID, userName)
+ require.NoError(t, err)
+
+ meetingPoint, err = s.GetMeetingPoint(ctx, machineID)
+ require.NoError(t, err)
+
+ require.GreaterOrEqual(t, meetingPoint.Port, portRangeBegin)
+ require.GreaterOrEqual(t, portRangeEnd, meetingPoint.Port)
+ require.Equal(t, mockTime, meetingPoint.LastUpdated)
+ require.Equal(t, meetingPoint.PodName, podName)
+ require.Equal(t, meetingPoint.Username, userName)
+
+ require.Equal(t, int64(1), s.counters[switchboardReserveMeetingpoint].Get())
+ require.Equal(t, int64(0), s.counters[switchboardReserveMeetingpointErrors].Get())
+ require.Equal(t, int64(1), s.counters[switchboardGetMeetingpoint].Get())
+ require.Equal(t, int64(0), s.counters[switchboardGetMeetingpointErrors].Get())
+}
+
+func TestGetMeetingPoint_NoSuchMachine_ReturnsError(t *testing.T) {
+ unittest.LargeTest(t)
+ ctx, s := setupForTest(t)
+
+ // Add a pod.
+ err := s.AddPod(ctx, podName)
+ require.NoError(t, err)
+
+ // Note that we don't call ReserveMeetingPoint before calling GetMeetingPoint.
+ _, err = s.GetMeetingPoint(ctx, machineID)
+ require.Error(t, err)
+ require.Contains(t, err.Error(), ErrMachineNotFound.Error())
+
+ require.Equal(t, int64(1), s.counters[switchboardGetMeetingpoint].Get())
+ require.Equal(t, int64(1), s.counters[switchboardGetMeetingpointErrors].Get())
+}
+
+func TestKeepAliveMeetingPoint_Success(t *testing.T) {
+ unittest.LargeTest(t)
+ ctx, s := setupForTest(t)
+
+ // Add a pod.
+ err := s.AddPod(ctx, podName)
+ require.NoError(t, err)
+
+ meetingPoint, err := s.ReserveMeetingPoint(ctx, machineID, userName)
+ require.NoError(t, err)
+
+ // Advance the mock time.
+ newMockTime := mockTime.Add(time.Hour)
+ ctx = context.WithValue(ctx, now.ContextKey, newMockTime)
+
+ // newMockTime should be used for LastUpdated.
+ err = s.KeepAliveMeetingPoint(ctx, meetingPoint)
+ require.NoError(t, err)
+
+ meetingPoint, err = s.GetMeetingPoint(ctx, machineID)
+ require.NoError(t, err)
+
+ require.GreaterOrEqual(t, meetingPoint.Port, portRangeBegin)
+ require.GreaterOrEqual(t, portRangeEnd, meetingPoint.Port)
+ require.Equal(t, newMockTime, meetingPoint.LastUpdated) // LastUpdated has changed.
+ require.Equal(t, meetingPoint.PodName, podName)
+ require.Equal(t, meetingPoint.Username, userName)
+
+ require.Equal(t, int64(1), s.counters[switchboardReserveMeetingpoint].Get())
+ require.Equal(t, int64(0), s.counters[switchboardReserveMeetingpointErrors].Get())
+ require.Equal(t, int64(1), s.counters[switchboardKeepAliveMeetingpoint].Get())
+ require.Equal(t, int64(0), s.counters[switchboardKeepAliveMeetingpointErrors].Get())
+}
+
+func TestKeepAliveMeetingPoint_MeetingPointDoesNotExist_ReturnsError(t *testing.T) {
+ unittest.LargeTest(t)
+ ctx, s := setupForTest(t)
+
+ err := s.KeepAliveMeetingPoint(ctx, MeetingPoint{
+ PodName: podName,
+ Port: portRangeBegin,
+ })
+ require.Error(t, err)
+
+ require.Equal(t, int64(1), s.counters[switchboardKeepAliveMeetingpoint].Get())
+ require.Equal(t, int64(1), s.counters[switchboardKeepAliveMeetingpointErrors].Get())
+}
+
+func TestClearMeetingPoint_Success(t *testing.T) {
+ unittest.LargeTest(t)
+ ctx, s := setupForTest(t)
+
+ // Add a pod.
+ err := s.AddPod(ctx, podName)
+ require.NoError(t, err)
+
+ meetingPoint, err := s.ReserveMeetingPoint(ctx, machineID, userName)
+ require.NoError(t, err)
+
+ meetingPoint, err = s.GetMeetingPoint(ctx, machineID)
+ require.NoError(t, err)
+
+ err = s.ClearMeetingPoint(ctx, meetingPoint)
+ require.NoError(t, err)
+
+ meetingPoint, err = s.GetMeetingPoint(ctx, machineID)
+ require.Error(t, err)
+ require.Contains(t, err.Error(), ErrMachineNotFound.Error())
+
+ require.Equal(t, int64(1), s.counters[switchboardClearMeetingpoint].Get())
+ require.Equal(t, int64(2), s.counters[switchboardGetMeetingpoint].Get())
+ require.Equal(t, int64(1), s.counters[switchboardGetMeetingpointErrors].Get())
+}
+
+func TestClearMeetingPoint_NoSuchMeetingPoint_Success(t *testing.T) {
+ unittest.LargeTest(t)
+ ctx, s := setupForTest(t)
+
+ err := s.ClearMeetingPoint(ctx, MeetingPoint{
+ PodName: podName,
+ Port: portRangeBegin,
+ })
+ require.NoError(t, err)
+}
diff --git a/machine/go/switchboard/mocks/Switchboard.go b/machine/go/switchboard/mocks/Switchboard.go
index 3943afa..1337396 100644
--- a/machine/go/switchboard/mocks/Switchboard.go
+++ b/machine/go/switchboard/mocks/Switchboard.go
@@ -14,13 +14,13 @@
mock.Mock
}
-// AddPod provides a mock function with given fields: ctx, PodName
-func (_m *Switchboard) AddPod(ctx context.Context, PodName string) error {
- ret := _m.Called(ctx, PodName)
+// AddPod provides a mock function with given fields: ctx, podName
+func (_m *Switchboard) AddPod(ctx context.Context, podName string) error {
+ ret := _m.Called(ctx, podName)
var r0 error
if rf, ok := ret.Get(0).(func(context.Context, string) error); ok {
- r0 = rf(ctx, PodName)
+ r0 = rf(ctx, podName)
} else {
r0 = ret.Error(0)
}
@@ -77,6 +77,20 @@
return r0
}
+// KeepAlivePod provides a mock function with given fields: ctx, podName
+func (_m *Switchboard) KeepAlivePod(ctx context.Context, podName string) error {
+ ret := _m.Called(ctx, podName)
+
+ var r0 error
+ if rf, ok := ret.Get(0).(func(context.Context, string) error); ok {
+ r0 = rf(ctx, podName)
+ } else {
+ r0 = ret.Error(0)
+ }
+
+ return r0
+}
+
// ListMeetingPoints provides a mock function with given fields: ctx
func (_m *Switchboard) ListMeetingPoints(ctx context.Context) ([]switchboard.MeetingPoint, error) {
ret := _m.Called(ctx)
@@ -101,15 +115,15 @@
}
// ListPods provides a mock function with given fields: ctx
-func (_m *Switchboard) ListPods(ctx context.Context) ([]string, error) {
+func (_m *Switchboard) ListPods(ctx context.Context) ([]switchboard.Pod, error) {
ret := _m.Called(ctx)
- var r0 []string
- if rf, ok := ret.Get(0).(func(context.Context) []string); ok {
+ var r0 []switchboard.Pod
+ if rf, ok := ret.Get(0).(func(context.Context) []switchboard.Pod); ok {
r0 = rf(ctx)
} else {
if ret.Get(0) != nil {
- r0 = ret.Get(0).([]string)
+ r0 = ret.Get(0).([]switchboard.Pod)
}
}
@@ -123,13 +137,13 @@
return r0, r1
}
-// RemovePod provides a mock function with given fields: ctx, PodName
-func (_m *Switchboard) RemovePod(ctx context.Context, PodName string) error {
- ret := _m.Called(ctx, PodName)
+// RemovePod provides a mock function with given fields: ctx, podName
+func (_m *Switchboard) RemovePod(ctx context.Context, podName string) error {
+ ret := _m.Called(ctx, podName)
var r0 error
if rf, ok := ret.Get(0).(func(context.Context, string) error); ok {
- r0 = rf(ctx, PodName)
+ r0 = rf(ctx, podName)
} else {
r0 = ret.Error(0)
}
@@ -137,20 +151,20 @@
return r0
}
-// ReserveMeetingPoint provides a mock function with given fields: ctx, machineID, Username
-func (_m *Switchboard) ReserveMeetingPoint(ctx context.Context, machineID string, Username string) (switchboard.MeetingPoint, error) {
- ret := _m.Called(ctx, machineID, Username)
+// ReserveMeetingPoint provides a mock function with given fields: ctx, machineID, username
+func (_m *Switchboard) ReserveMeetingPoint(ctx context.Context, machineID string, username string) (switchboard.MeetingPoint, error) {
+ ret := _m.Called(ctx, machineID, username)
var r0 switchboard.MeetingPoint
if rf, ok := ret.Get(0).(func(context.Context, string, string) switchboard.MeetingPoint); ok {
- r0 = rf(ctx, machineID, Username)
+ r0 = rf(ctx, machineID, username)
} else {
r0 = ret.Get(0).(switchboard.MeetingPoint)
}
var r1 error
if rf, ok := ret.Get(1).(func(context.Context, string, string) error); ok {
- r1 = rf(ctx, machineID, Username)
+ r1 = rf(ctx, machineID, username)
} else {
r1 = ret.Error(1)
}
diff --git a/machine/go/switchboard/switchboard.go b/machine/go/switchboard/switchboard.go
index b922d46..28568ef 100644
--- a/machine/go/switchboard/switchboard.go
+++ b/machine/go/switchboard/switchboard.go
@@ -1,5 +1,5 @@
// Package switchboard is for keeping track of connections from machines to the
-// switchboard pods.
+// switchboard pods. See the design doc: http://go/skia-switchboard.
package switchboard
import (
@@ -8,8 +8,25 @@
"time"
)
+// ErrMachineNotFound is returned when a given machineID is not found.
var ErrMachineNotFound = errors.New("no such machine")
+// ErrNoPodsFound is returned when no pods have been registered.
+var ErrNoPodsFound = errors.New("no pods found")
+
+// Pod describes a single pod in the skia-switchboard cluster.
+type Pod struct {
+ // Name is the pod name in the kubernetes cluster.
+ Name string
+
+ // LastUpdated is updated every time Switchboard.KeepAlivePod is
+ // called, which will be done by switch-pod-monitor.
+ //
+ // The machines server will have a background process that monitors for
+ // expired Pods and removes them.
+ LastUpdated time.Time
+}
+
// MeetingPoint has a machine ID and all of the information needed on how to
// connect to that machine via SSH. This is, if a client ran the equivalent of:
//
@@ -35,7 +52,7 @@
// RPis on rack4.
Username string
- // The domain name of the machine, e.g. 'skia-rpi-001'.
+ // MachineID is the domain name of the machine, e.g. 'skia-rpi-001'.
MachineID string
// LastUpdated is updated every time Switchboard.KeepAliveMeetingPoint is
@@ -60,7 +77,7 @@
// rack4 RPIs run as root now so they can get access to the USB port from
// within the k8s container, so at least during the transition we will need
// to specify the target account name.
- ReserveMeetingPoint(ctx context.Context, machineID string, Username string) (MeetingPoint, error)
+ ReserveMeetingPoint(ctx context.Context, machineID string, username string) (MeetingPoint, error)
// ClearMeetingPoint is called by bot_config if it failed to connect to the
// switchboard or if the machine is shutting down, i.e. bot_config
@@ -80,15 +97,19 @@
// AddPod adds a new k8s pod to the list of available pods running in the
// switchboard cluster. It is called by the programming that runs on startup
// in each switchboard pod.
- AddPod(ctx context.Context, PodName string) error
+ AddPod(ctx context.Context, podName string) error
+
+ // KeepAlivePod is called by a pod periodically to indicate it
+ // is still a valid connection.
+ KeepAlivePod(ctx context.Context, podName string) error
// RemovePod removes a k8s pod from the list of available pods. It is called
// from each switchboard pod as it shuts down.
- RemovePod(ctx context.Context, PodName string) error
+ RemovePod(ctx context.Context, podName string) error
// ListPods returns a list of all the pods availble to accept connections.
// This will be used in the machines UI.
- ListPods(ctx context.Context) ([]string, error)
+ ListPods(ctx context.Context) ([]Pod, error)
// ListMeetingPoints returns all the active MeetingPoints. This will be used
// in the machines UI.