blob: 6c46a87026c7d8aca0b95384f8c68d0661d9dbba [file] [log] [blame]
// Package targetconnect initiates and maintains a connection from
// a test machine to a switchboard pod. See https://go/skia-switchboard.
package targetconnect
import (
"context"
"sync"
"time"
"go.skia.org/infra/go/metrics2"
"go.skia.org/infra/go/sklog"
"go.skia.org/infra/machine/go/machine/store"
"go.skia.org/infra/machine/go/switchboard"
_ "k8s.io/client-go/plugin/pkg/client/auth/gcp" // Add support for GCP auth to all kubernetes codepaths.
)
const defaultRetryDelay = time.Second
// RevPortForward is the interface of an object that initiates a reverse
// port-forward into a switchboard pod.
type RevPortForward interface {
Start(ctx context.Context, podName string, port int) error
}
// Connection that can initiate and maintain a connection from a target machine
// into the switchboard cluster.
type Connection struct {
switchboard switchboard.Switchboard
machineStore store.Store
revPortForward RevPortForward
hostname string
username string
meetingPointLiveness metrics2.Liveness
stepsCounter metrics2.Counter
mutex sync.RWMutex // protects runningTest.
runningTest bool
}
// New return a new *connection that can initiate and maintain a connection from
// a target machine into the switchboard cluster.
func New(switchboard switchboard.Switchboard, revportforward RevPortForward, machineStore store.Store, hostname, username string) *Connection {
tags := map[string]string{
"hostname": hostname,
"username": username,
}
return &Connection{
switchboard: switchboard,
machineStore: machineStore,
revPortForward: revportforward,
hostname: hostname,
username: username,
meetingPointLiveness: metrics2.NewLiveness("machine_targetconnect_keep_alive_meeting_point", tags),
stepsCounter: metrics2.GetCounter("machine_targetconnect_steps", tags),
}
}
func (c *Connection) isRunningTest() bool {
c.mutex.RLock()
defer c.mutex.RUnlock()
return c.runningTest
}
// Start a connection to a switchboard pod, while keeping Switchboard up to date
// on the status of the connetion.
//
// Start does not return, unless the passed in Context is cancelled.
func (c *Connection) Start(ctx context.Context) error {
ticker := time.NewTicker(switchboard.MeetingPointKeepAliveDuration)
// Keep track if this machine is running a test.
go func() {
machineCh := c.machineStore.Watch(ctx, c.hostname)
for {
select {
case <-ctx.Done():
return
case desc := <-machineCh:
c.mutex.Lock()
c.runningTest = desc.RunningSwarmingTask
c.mutex.Unlock()
}
}
}()
for {
c.stepsCounter.Inc(1)
c.singleStep(ctx, ticker, defaultRetryDelay)
if err := ctx.Err(); err != nil {
return err
}
}
}
// singleStep launches two Go routines, one that creates the reverse port
// forward, and another that keeps Switchboard up to date. If the reverse port
// forward terminates, or the passed in Context is cancelled, then both Go
// routines terminate and the function returns.
func (c *Connection) singleStep(ctx context.Context, ticker *time.Ticker, sleepDurationOnReserveFailure time.Duration) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
mp, err := c.switchboard.ReserveMeetingPoint(ctx, c.hostname, c.username)
if err != nil {
time.Sleep(sleepDurationOnReserveFailure)
return
}
// We want to wait until both Go routines finish.
var wg sync.WaitGroup
wg.Add(2)
// This Go routine establishes the reverse port forward.
go func() {
defer wg.Done()
// If this Go routine exits, make sure the other Go routine also exits
// by cancelling the Context. Note that we are only cancelling the
// context we created at the top of singleStep, not the one passed into
// Start().
defer cancel()
// Only returns on error or if the Context was cancelled.
err := c.revPortForward.Start(ctx, mp.PodName, mp.Port)
if err != nil {
sklog.Warningf("targetconnect revportforward error: %s", err)
}
// We use a background context because the passed in context might get
// cancelled and if it does we still need to call ClearMeetingPoint with
// an uncancelled Context.
err = c.switchboard.ClearMeetingPoint(context.Background(), mp)
if err != nil {
sklog.Errorf("targetconnect failed to clear meeting point: %s", err)
}
}()
// This Go routine periodically calls KeepAliveMeetingPoint.
go func() {
defer wg.Done()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
if !c.isRunningTest() && !c.switchboard.IsValidPod(ctx, mp.PodName) {
sklog.Infof("pod is no longer valid, exiting for force reconnect: %q", mp.PodName)
cancel()
return
}
err := c.switchboard.KeepAliveMeetingPoint(ctx, mp)
if err != nil {
sklog.Errorf("targetconnect KeepAliveMeetingPoint failed: %s", err)
}
c.meetingPointLiveness.Reset()
}
}
}()
wg.Wait()
}