blob: faa401a1b197fdfdf869953c7155b68dc1dccf3a [file] [log] [blame]
package switchboard
import (
"context"
"fmt"
"math/rand"
"time"
gcfirestore "cloud.google.com/go/firestore"
"go.skia.org/infra/go/auth"
"go.skia.org/infra/go/firestore"
"go.skia.org/infra/go/metrics2"
"go.skia.org/infra/go/now"
"go.skia.org/infra/go/skerr"
"go.skia.org/infra/go/sklog"
"go.skia.org/infra/machine/go/machineserver/config"
"google.golang.org/api/iterator"
)
// Collection names, retry/timeout settings and the port range used by the
// Firestore-backed Switchboard implementation in this file.
const (
// meetingPointsCollectionName is the name of the collection that stores
// meeting points.
meetingPointsCollectionName = "meetingpoints"
// podsCollectionName is the name of the collection that stores pods.
podsCollectionName = "pods"
// appName is the application name passed to firestore.NewClient.
appName = "machineserver"
// updateTimeout and updateRetries are passed to every firestoreClient.Create
// and firestoreClient.RunTransaction call in this file.
updateTimeout = 10 * time.Second
updateRetries = 5
// reserveRetries is the maximum number of attempts ReserveMeetingPoint makes
// to find an open pod/port combination.
reserveRetries = 20
// We use a range of ports from 10000-20000 when picking ports on
// skia-switchboard pods, which is a large range to avoid collisions.
// ReserveMeetingPoint picks ports in [portRangeBegin, portRangeEnd).
portRangeBegin = 10_000
portRangeEnd = 20_000
)
// metricName is the name of a counter. It is the key type of
// switchboardImpl.counters and is converted to a string when the counter is
// requested from metrics2.
type metricName string
// Names of the counters kept by switchboardImpl. Every name here is also
// listed in allMetricNames so that New creates a counter for it.
const (
switchboardReserveMeetingpoint = "switchboard_reserve_meetingpoint"
switchboardReserveMeetingpointErrors = "switchboard_reserve_meetingpoint_errors"
switchboardClearMeetingpoint = "switchboard_clear_meetingpoint"
switchboardGetMeetingpoint = "switchboard_get_meetingpoint"
switchboardGetMeetingpointErrors = "switchboard_get_meetingpoint_errors"
switchboardKeepAliveMeetingpoint = "switchboard_keepalive_meetingpoint"
switchboardKeepAliveMeetingpointErrors = "switchboard_keepalive_meetingpoint_errors"
switchboardListMeetingpoint = "switchboard_list_meetingpoint"
switchboardAddPod = "switchboard_add_pod"
switchboardRemovePod = "switchboard_remove_pod"
switchboardKeepAlivePod = "switchboard_keepalive_pod"
switchboardKeepAlivePodErrors = "switchboard_keepalive_pod_errors"
switchboardListPod = "switchboard_list_pod"
switchboardListPodErrors = "switchboard_list_pod_errors"
)
var (
// allMetricNames lists the name of every counter; New iterates over it
// to populate switchboardImpl.counters. A name missing from this list has
// no entry in that map.
allMetricNames = []metricName{
switchboardReserveMeetingpoint,
switchboardReserveMeetingpointErrors,
switchboardClearMeetingpoint,
switchboardGetMeetingpoint,
switchboardGetMeetingpointErrors,
switchboardKeepAliveMeetingpoint,
switchboardKeepAliveMeetingpointErrors,
switchboardListMeetingpoint,
switchboardAddPod,
switchboardRemovePod,
switchboardKeepAlivePod,
switchboardKeepAlivePodErrors,
switchboardListPod,
switchboardListPodErrors,
}
)
// For both Pods and MeetingPoints we will create parallel structs
// podDescription and meetingPointDescription that are used to store their values
// in the datastore. This will make it easier to have them diverge in the future
// if that's needed.
// podDescription is the struct that gets stored in the pods Collection.
//
// The pod's name is not a field here; it is the ID of the document the
// description is stored under.
type podDescription struct {
// LastUpdated is set when the pod is added and refreshed by KeepAlivePod.
LastUpdated time.Time
}
// descriptionToPod builds a Pod from a stored podDescription. The name is
// supplied separately because podDescription carries only LastUpdated.
func descriptionToPod(d podDescription, name string) Pod {
	var pod Pod
	pod.Name = name
	pod.LastUpdated = d.LastUpdated
	return pod
}
// meetingPointDescription is the struct that gets stored in the meetingpoints Collection.
//
// It currently just embeds MeetingPoint, so the stored fields are exactly the
// MeetingPoint fields.
type meetingPointDescription struct {
MeetingPoint
}
// id returns the document ID for this MeetingPoint in the datastore, which is
// the PodName and the Port joined by a colon.
func (m meetingPointDescription) id() string {
	const idFormat = "%s:%d"
	podName, port := m.PodName, m.Port
	return fmt.Sprintf(idFormat, podName, port)
}
// descriptionToMeetingPoint extracts the MeetingPoint embedded in a stored
// meetingPointDescription.
func descriptionToMeetingPoint(m meetingPointDescription) MeetingPoint {
	meetingPoint := m.MeetingPoint
	return meetingPoint
}
// meetingPointToDescription wraps a MeetingPoint in the struct that is written
// to the datastore.
func meetingPointToDescription(m MeetingPoint) meetingPointDescription {
	return meetingPointDescription{MeetingPoint: m}
}
// switchboardImpl implements Switchboard using Cloud Firestore as a backend.
type switchboardImpl struct {
// firestoreClient is used for Create and RunTransaction calls.
firestoreClient *firestore.Client
// meetingPointsCollection holds one document per meeting point, keyed by
// meetingPointDescription.id().
meetingPointsCollection *gcfirestore.CollectionRef
// podsCollection holds one document per pod, keyed by pod name.
podsCollection *gcfirestore.CollectionRef
// counters has one entry for each name in allMetricNames.
counters map[metricName]metrics2.Counter
}
// New returns a new instance of switchboardImpl.
//
// It creates a token source with the datastore scope, opens a Firestore client
// for the project and instance named in instanceConfig.Store, and creates one
// counter for every entry in allMetricNames. An error is returned if either the
// token source or the Firestore client cannot be created.
func New(ctx context.Context, local bool, instanceConfig config.InstanceConfig) (*switchboardImpl, error) {
	tokenSource, err := auth.NewDefaultTokenSource(local, "https://www.googleapis.com/auth/datastore")
	if err != nil {
		return nil, skerr.Wrapf(err, "Failed to create tokensource.")
	}

	client, err := firestore.NewClient(ctx, instanceConfig.Store.Project, appName, instanceConfig.Store.Instance, tokenSource)
	if err != nil {
		return nil, skerr.Wrapf(err, "Failed to create firestore client for app: %q instance: %q", appName, instanceConfig.Store.Instance)
	}

	counters := make(map[metricName]metrics2.Counter, len(allMetricNames))
	for _, name := range allMetricNames {
		counters[name] = metrics2.GetCounter(string(name))
	}

	sb := &switchboardImpl{
		firestoreClient:         client,
		meetingPointsCollection: client.Collection(meetingPointsCollectionName),
		podsCollection:          client.Collection(podsCollectionName),
		counters:                counters,
	}
	return sb, nil
}
// ReserveMeetingPoint implements Switchboard.
//
// It picks a random pod and a random port in [portRangeBegin, portRangeEnd)
// and tries to create the corresponding document in the meetingpoints
// collection, retrying up to reserveRetries times. Every failed attempt (pods
// could not be listed, no pods exist, or the document could not be created) is
// logged and counted in the errors counter. If no attempt succeeds,
// ErrNoPodsFound is returned.
func (s *switchboardImpl) ReserveMeetingPoint(ctx context.Context, machineID string, username string) (MeetingPoint, error) {
	s.counters[switchboardReserveMeetingpoint].Inc(1)
	errCounter := s.counters[switchboardReserveMeetingpointErrors]

	ret := MeetingPoint{
		LastUpdated: now.Now(ctx),
		Username:    username,
		MachineID:   machineID,
	}

	// Keep trying until an unused pod:port combination is found.
	//
	// Ports are drawn from a range of 10,000 per pod and up to 20 attempts are
	// made. Even with 100 machines attached to a single pod (which is a lot)
	// the chance of a collision on any one attempt is 100/10,000 = 1/100, so
	// the chance that all 20 attempts collide is:
	//     (1/100)^20 = 1e-40
	// which leaves a 1 - 1e-40 chance of success.
	for attempt := 0; attempt < reserveRetries; attempt++ {
		pods, err := s.ListPods(ctx)
		switch {
		case err != nil:
			errCounter.Inc(1)
			sklog.Errorf("Reserve failed to load the available pods: %s", err)
			continue
		case len(pods) == 0:
			sklog.Error("No pods available")
			errCounter.Inc(1)
			continue
		}

		// Pick the pod first and the port second, both at random.
		ret.PodName = pods[rand.Intn(len(pods))].Name
		ret.Port = rand.Intn(portRangeEnd-portRangeBegin) + portRangeBegin

		desc := meetingPointToDescription(ret)
		docRef := s.meetingPointsCollection.Doc(desc.id())
		if _, err := s.firestoreClient.Create(ctx, docRef, &desc, updateRetries, updateTimeout); err != nil {
			sklog.Errorf("Failed to create: %s", err)
			errCounter.Inc(1)
			continue
		}
		return ret, nil
	}
	return ret, ErrNoPodsFound
}
// ClearMeetingPoint implements Switchboard.
//
// It deletes the meetingpoints document whose ID is derived from the given
// MeetingPoint's PodName and Port, returning any error from the delete as-is.
func (s *switchboardImpl) ClearMeetingPoint(ctx context.Context, meetingPoint MeetingPoint) error {
	s.counters[switchboardClearMeetingpoint].Inc(1)
	docID := meetingPointToDescription(meetingPoint).id()
	docRef := s.meetingPointsCollection.Doc(docID)
	if _, err := docRef.Delete(ctx); err != nil {
		return err
	}
	return nil
}
// GetMeetingPoint implements Switchboard.
//
// It queries the meetingpoints collection for documents whose MachineID equals
// machineID, ordered by LastUpdated descending, and returns the first result,
// i.e. the most recently updated MeetingPoint for that machine.
//
// ErrMachineNotFound is returned if the query yields no documents. Any other
// query or deserialization failure is returned wrapped. Every failure,
// including ErrMachineNotFound, increments the errors counter.
func (s *switchboardImpl) GetMeetingPoint(ctx context.Context, machineID string) (MeetingPoint, error) {
	s.counters[switchboardGetMeetingpoint].Inc(1)
	iter := s.meetingPointsCollection.Where("MachineID", "==", machineID).OrderBy("LastUpdated", gcfirestore.Desc).Documents(ctx)
	// Only the first document is consumed, so the iterator is never drained.
	// It must be stopped explicitly to release the resources it holds.
	defer iter.Stop()

	var ret MeetingPoint
	snap, err := iter.Next()
	if err == iterator.Done {
		s.counters[switchboardGetMeetingpointErrors].Inc(1)
		return ret, ErrMachineNotFound
	}
	if err != nil {
		s.counters[switchboardGetMeetingpointErrors].Inc(1)
		return ret, skerr.Wrapf(err, "List failed to read description.")
	}
	var desc meetingPointDescription
	if err := snap.DataTo(&desc); err != nil {
		s.counters[switchboardGetMeetingpointErrors].Inc(1)
		sklog.Errorf("Failed to read data from snapshot: %s", err)
		return ret, skerr.Wrapf(err, "List failed to load description.")
	}
	ret = descriptionToMeetingPoint(desc)
	return ret, nil
}
// KeepAliveMeetingPoint implements Switchboard.
//
// Inside a transaction it re-reads the stored document for the given
// MeetingPoint, sets its LastUpdated to the current time and writes it back.
// A failure to read or deserialize the stored document increments the errors
// counter and is returned wrapped.
func (s *switchboardImpl) KeepAliveMeetingPoint(ctx context.Context, meetingPoint MeetingPoint) error {
	s.counters[switchboardKeepAliveMeetingpoint].Inc(1)
	desc := meetingPointToDescription(meetingPoint)
	docRef := s.meetingPointsCollection.Doc(desc.id())

	refresh := func(ctx context.Context, tx *gcfirestore.Transaction) error {
		snap, err := tx.Get(docRef)
		if err != nil {
			s.counters[switchboardKeepAliveMeetingpointErrors].Inc(1)
			return skerr.Wrapf(err, "Failed querying firestore for %q", desc.id())
		}
		// The stored values replace whatever the caller passed in, so only
		// LastUpdated changes in the datastore.
		if err = snap.DataTo(&desc); err != nil {
			s.counters[switchboardKeepAliveMeetingpointErrors].Inc(1)
			return skerr.Wrapf(err, "Failed to deserialize firestore Get response for %q", desc.id())
		}
		desc.LastUpdated = now.Now(ctx)
		return tx.Set(docRef, &desc)
	}
	return s.firestoreClient.RunTransaction(ctx, "meetingpoints", "keepalive", updateRetries, updateTimeout, refresh)
}
// AddPod implements Switchboard.
//
// It creates, inside a transaction, a document named podName in the pods
// collection with LastUpdated set to the current time.
func (s *switchboardImpl) AddPod(ctx context.Context, podName string) error {
	s.counters[switchboardAddPod].Inc(1)
	docRef := s.podsCollection.Doc(podName)

	create := func(ctx context.Context, tx *gcfirestore.Transaction) error {
		desc := podDescription{LastUpdated: now.Now(ctx)}
		return tx.Create(docRef, &desc)
	}
	return s.firestoreClient.RunTransaction(ctx, "pods", "add", updateRetries, updateTimeout, create)
}
// KeepAlivePod implements Switchboard.
//
// Inside a transaction it reads the stored description for podName, sets its
// LastUpdated to the current time and writes it back. A failure to read or
// deserialize the stored document increments the errors counter and is
// returned wrapped.
func (s *switchboardImpl) KeepAlivePod(ctx context.Context, podName string) error {
	s.counters[switchboardKeepAlivePod].Inc(1)
	docRef := s.podsCollection.Doc(podName)

	refresh := func(ctx context.Context, tx *gcfirestore.Transaction) error {
		snap, err := tx.Get(docRef)
		if err != nil {
			s.counters[switchboardKeepAlivePodErrors].Inc(1)
			return skerr.Wrapf(err, "Failed querying firestore for %q", podName)
		}
		var desc podDescription
		if err = snap.DataTo(&desc); err != nil {
			s.counters[switchboardKeepAlivePodErrors].Inc(1)
			return skerr.Wrapf(err, "Failed to deserialize firestore Get response for %q", podName)
		}
		desc.LastUpdated = now.Now(ctx)
		return tx.Set(docRef, &desc)
	}
	return s.firestoreClient.RunTransaction(ctx, "pods", "keepalive", updateRetries, updateTimeout, refresh)
}
// RemovePod implements Switchboard.
//
// It deletes the document named podName from the pods collection, returning
// any error from the delete as-is.
func (s *switchboardImpl) RemovePod(ctx context.Context, podName string) error {
	s.counters[switchboardRemovePod].Inc(1)
	docRef := s.podsCollection.Doc(podName)
	if _, err := docRef.Delete(ctx); err != nil {
		return err
	}
	return nil
}
// ListPods implements Switchboard.
//
// It reads every document in the pods collection and returns one Pod per
// document, using the document ID as the Pod's name. A document that fails to
// deserialize is logged, counted in the errors counter and skipped. An error
// from the iterator itself aborts the listing and is returned wrapped.
//
// The returned slice is non-nil (but possibly empty) on success.
func (s *switchboardImpl) ListPods(ctx context.Context) ([]Pod, error) {
	s.counters[switchboardListPod].Inc(1)
	ret := []Pod{}
	iter := s.podsCollection.Documents(ctx)
	// Release the iterator's resources on every exit path, including the
	// early error return inside the loop.
	defer iter.Stop()
	for {
		snap, err := iter.Next()
		if err == iterator.Done {
			break
		}
		if err != nil {
			s.counters[switchboardListPodErrors].Inc(1)
			return nil, skerr.Wrapf(err, "List failed to read description.")
		}
		var desc podDescription
		if err := snap.DataTo(&desc); err != nil {
			s.counters[switchboardListPodErrors].Inc(1)
			sklog.Errorf("Failed to read data from snapshot: %s", err)
			continue
		}
		ret = append(ret, descriptionToPod(desc, snap.Ref.ID))
	}
	return ret, nil
}
// ListMeetingPoints implements Switchboard.
//
// NOTE: this method is not implemented yet. Calling it always panics with
// "unimplemented"; it never returns a value or an error.
func (s *switchboardImpl) ListMeetingPoints(ctx context.Context) ([]MeetingPoint, error) {
panic("unimplemented")
}
// Assert that switchboardImpl implements the Switchboard interface.
var _ Switchboard = (*switchboardImpl)(nil)