blob: e2a95a1152a160bf4dbfae55cf8683a6e68b2d7e [file] [log] [blame]
package store
import (
"context"
"time"
gcfirestore "cloud.google.com/go/firestore"
"go.skia.org/infra/go/auth"
"go.skia.org/infra/go/firestore"
"go.skia.org/infra/go/metrics2"
"go.skia.org/infra/go/skerr"
"go.skia.org/infra/go/sklog"
"go.skia.org/infra/machine/go/machine"
"go.skia.org/infra/machine/go/machineserver/config"
"google.golang.org/api/iterator"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
const (
machinesCollectionName = "machines"
appName = "machineserver"
updateTimeout = 10 * time.Second
updateRetries = 5
)
// StoreImpl implements the Store interface.
type StoreImpl struct {
firestoreClient *firestore.Client
machinesCollection *gcfirestore.CollectionRef
updateCounter metrics2.Counter
updateDataToErrorCounter metrics2.Counter
watchReceiveSnapshotCounter metrics2.Counter
watchDataToErrorCounter metrics2.Counter
listCounter metrics2.Counter
listIterFailureCounter metrics2.Counter
watchForDeletablePodsReceiveSnapshotCounter metrics2.Counter
watchForDeletablePodsDataToErrorCounter metrics2.Counter
watchForPowerCycleReceiveSnapshotCounter metrics2.Counter
watchForPowerCycleDataToErrorCounter metrics2.Counter
}
// storeDescription is how machine.Description is mapped into firestore.
//
// Some fields from machine.Description are mirrored to top level
// storeDescription fields so we can query on them.
type storeDescription struct {
// OS is a mirror of MachineDescription.Dimensions["os"].
OS []string
// OS is a mirror of MachineDescription.Dimensions["device_type"].
DeviceType []string
// OS is a mirror of MachineDescription.Dimensions["quarantined"].
Quarantined []string
// Mode is a mirror of MachineDescription.Mode.
Mode machine.Mode
// LastUpdated is a mirror of MachineDescription.LastUpdated.
LastUpdated time.Time
// ScheduledForDeletion is a mirror of MachineDescription.ScheduledForDeletion.
ScheduledForDeletion string
// RunningSwarmingTask is a mirror of MachineDescription.RunningSwarmingTask.
RunningSwarmingTask bool
// PowerCycle is a mirror of MachineDescription.PowerCycle.
PowerCycle bool
// MachineDescription is the full machine.Description. The values that are
// mirrored to fields of storeDescription are still fully stored here and
// are considered the source of truth.
MachineDescription machine.Description
}
// New returns a new instance of StoreImpl.
func New(ctx context.Context, local bool, instanceConfig config.InstanceConfig) (*StoreImpl, error) {
ts, err := auth.NewDefaultTokenSource(local, "https://www.googleapis.com/auth/datastore")
if err != nil {
return nil, skerr.Wrapf(err, "Failed to create tokensource.")
}
firestoreClient, err := firestore.NewClient(ctx, instanceConfig.Store.Project, appName, instanceConfig.Store.Instance, ts)
if err != nil {
return nil, skerr.Wrapf(err, "Failed to create firestore client for app: %q instance: %q", appName, instanceConfig.Store.Instance)
}
return &StoreImpl{
firestoreClient: firestoreClient,
machinesCollection: firestoreClient.Collection(machinesCollectionName),
updateCounter: metrics2.GetCounter("machine_store_update"),
updateDataToErrorCounter: metrics2.GetCounter("machine_store_update_datato_error"),
watchReceiveSnapshotCounter: metrics2.GetCounter("machine_store_watch_receive_snapshot"),
watchDataToErrorCounter: metrics2.GetCounter("machine_store_watch_datato_error"),
listCounter: metrics2.GetCounter("machine_store_list"),
listIterFailureCounter: metrics2.GetCounter("machine_store_list_iter_error"),
watchForDeletablePodsReceiveSnapshotCounter: metrics2.GetCounter("machine_store_watch_for_deletable_pods_receive_snapshot"),
watchForDeletablePodsDataToErrorCounter: metrics2.GetCounter("machine_store_watch_for_deletable_pods_datato_error"),
watchForPowerCycleReceiveSnapshotCounter: metrics2.GetCounter("machine_store_watch_for_power_cycle_receive_snapshot"),
watchForPowerCycleDataToErrorCounter: metrics2.GetCounter("machine_store_watch_for_power_cycle_datato_error"),
}, nil
}
// Update implements the Store interface.
func (st *StoreImpl) Update(ctx context.Context, machineID string, txCallback TxCallback) error {
st.updateCounter.Inc(1)
docRef := st.machinesCollection.Doc(machineID)
return st.firestoreClient.RunTransaction(ctx, "store", "update", updateRetries, updateTimeout, func(ctx context.Context, tx *gcfirestore.Transaction) error {
var storeDescription storeDescription
machineDescription := machine.NewDescription()
machineDescription.Dimensions[machine.DimID] = []string{machineID}
if snap, err := tx.Get(docRef); err == nil {
if err := snap.DataTo(&storeDescription); err != nil {
st.updateDataToErrorCounter.Inc(1)
return skerr.Wrapf(err, "Failed to deserialize firestore Get response for %q", machineID)
}
machineDescription = storeToMachineDescription(storeDescription)
} else if st, ok := status.FromError(err); ok && st.Code() != codes.NotFound {
return skerr.Wrapf(err, "Failed querying firestore for %q", machineID)
}
updatedMachineDescription := txCallback(machineDescription)
updatedStoreDescription := machineDescriptionToStoreDescription(updatedMachineDescription)
return tx.Set(docRef, &updatedStoreDescription)
})
}
// Watch implements the Store interface.
func (st *StoreImpl) Watch(ctx context.Context, machineID string) <-chan machine.Description {
iter := st.machinesCollection.Doc(machineID).Snapshots(ctx)
ch := make(chan machine.Description)
go func() {
for {
snap, err := iter.Next()
if err != nil {
if ctx.Err() == context.Canceled {
sklog.Warningf("Context canceled; closing channel: %s", err)
} else if st, ok := status.FromError(err); ok && st.Code() == codes.Canceled {
sklog.Warningf("Context canceled; closing channel: %s", err)
} else {
sklog.Errorf("iter returned error; closing channel: %s", err)
}
iter.Stop()
close(ch)
return
}
if !snap.Exists() {
continue
}
var storeDescription storeDescription
if err := snap.DataTo(&storeDescription); err != nil {
sklog.Errorf("Failed to read data from snapshot: %s", err)
st.watchDataToErrorCounter.Inc(1)
continue
}
machineDescription := storeToMachineDescription(storeDescription)
st.watchReceiveSnapshotCounter.Inc(1)
ch <- machineDescription
}
}()
return ch
}
// WatchForDeletablePods implements the Store interface.
func (st *StoreImpl) WatchForDeletablePods(ctx context.Context) <-chan string {
q := st.machinesCollection.Where("ScheduledForDeletion", ">", "").Where("RunningSwarmingTask", "==", false)
ch := make(chan string)
go func() {
defer close(ch)
for qsnap := range firestore.QuerySnapshotChannel(ctx, q) {
for {
snap, err := qsnap.Documents.Next()
if err == iterator.Done {
break
}
if err != nil {
sklog.Errorf("Failed to read document snapshot: %s", err)
}
var storeDescription storeDescription
if err := snap.DataTo(&storeDescription); err != nil {
sklog.Errorf("Failed to read data from snapshot: %s", err)
st.watchForDeletablePodsDataToErrorCounter.Inc(1)
continue
}
machineDescription := storeToMachineDescription(storeDescription)
st.watchForDeletablePodsReceiveSnapshotCounter.Inc(1)
ch <- machineDescription.PodName
}
}
}()
return ch
}
// WatchForPowerCycle implements the Store interface.
func (st *StoreImpl) WatchForPowerCycle(ctx context.Context) <-chan string {
q := st.machinesCollection.Where("PowerCycle", "==", true).Where("RunningSwarmingTask", "==", false)
ch := make(chan string)
go func() {
defer close(ch)
for qsnap := range firestore.QuerySnapshotChannel(ctx, q) {
for {
snap, err := qsnap.Documents.Next()
if err == iterator.Done {
break
}
if err != nil {
sklog.Errorf("Failed to read document snapshot: %s", err)
}
var storeDescription storeDescription
if err := snap.DataTo(&storeDescription); err != nil {
sklog.Errorf("Failed to read data from snapshot: %s", err)
st.watchForPowerCycleDataToErrorCounter.Inc(1)
continue
}
machineDescription := storeToMachineDescription(storeDescription)
st.watchForPowerCycleReceiveSnapshotCounter.Inc(1)
machineID := machineDescription.Dimensions[machine.DimID][0]
err = st.Update(ctx, machineID, func(previous machine.Description) machine.Description {
ret := previous.Copy()
ret.PowerCycle = false
return ret
})
if err != nil {
sklog.Errorf("Failed to update machine.Description PowerCycle: %s", err)
// Just log the error, still powercycle the machine.
}
ch <- machineID
}
}
}()
return ch
}
// List implements the Store interface.
func (st *StoreImpl) List(ctx context.Context) ([]machine.Description, error) {
st.listCounter.Inc(1)
ret := []machine.Description{}
iter := st.machinesCollection.Documents(ctx)
for {
snap, err := iter.Next()
if err == iterator.Done {
break
}
if err != nil {
st.listIterFailureCounter.Inc(1)
return nil, skerr.Wrapf(err, "List failed to read description.")
}
var storeDescription storeDescription
if err := snap.DataTo(&storeDescription); err != nil {
st.listIterFailureCounter.Inc(1)
sklog.Errorf("Failed to read data from snapshot: %s", err)
continue
}
machineDescription := storeToMachineDescription(storeDescription)
ret = append(ret, machineDescription)
}
return ret, nil
}
func machineDescriptionToStoreDescription(m machine.Description) storeDescription {
return storeDescription{
OS: m.Dimensions[machine.DimOS],
DeviceType: m.Dimensions[machine.DimDeviceType],
Quarantined: m.Dimensions[machine.DimQuarantined],
Mode: m.Mode,
LastUpdated: m.LastUpdated,
ScheduledForDeletion: m.ScheduledForDeletion,
RunningSwarmingTask: m.RunningSwarmingTask,
PowerCycle: m.PowerCycle,
MachineDescription: m,
}
}
func storeToMachineDescription(s storeDescription) machine.Description {
return s.MachineDescription
}
// Affirm that StoreImpl implements the Store interface.
var _ Store = (*StoreImpl)(nil)