blob: 060c9491be5d086b2f12dacf6e996bcd59c3a018 [file] [log] [blame]
// Package reconnectingmemcached contains a wrapper around a general memcache client. It provides
// the ability to automatically reconnect after a certain number of failures. While the connection
// is down, its APIs quickly return, allowing clients to fallback to some other mechanism.
// This design decision (instead of, for example, blocking until the connection is restored) is
// because memcached is used where performance is critical, and it is probably faster for clients
// to respond to a memcached outage like they would a cache miss.
package reconnectingmemcached
import (
"math/rand"
"sync"
"time"
"github.com/bradfitz/gomemcache/memcache"
"go.skia.org/infra/go/skerr"
"go.skia.org/infra/go/sklog"
)
// Client is a slightly modified version of the interface on *memcache.Client. Most of the methods
// return a boolean instead of an error. That boolean indicates if the connection is up or down,
// that is, if the return value is valid or if the calling client should use a fallback.
type Client interface {
// ConnectionAvailable returns true if there is an established connection. If false is returned,
// it means the connection is being restored.
ConnectionAvailable() bool
// GetMulti returns a map filled with items that were in the cache. The boolean means "ok"
// and can be false if either there was an error or the connection is currently down.
GetMulti(keys []string) (map[string]*memcache.Item, bool)
// Ping returns an error if there is no connection or if any instance is down.
Ping() error
// Set unconditionally sets the item. It returns false if there was an error or the connection
// is currently down.
Set(i *memcache.Item) bool
}
// memcachedClient is the (partial) interface of memcache.Client, which is used for testing
// purposes.
type memcachedClient interface {
Ping() error
GetMulti(keys []string) (map[string]*memcache.Item, error)
Set(item *memcache.Item) error
}
type Options struct {
// Servers are the addresses of the servers that should be contacted with equal weight.
// See bradfitz/gomemcache/memcache.New() for more.
Servers []string
// Timeout is the socket read/write timeout. The default is 100 milliseconds.
Timeout time.Duration
// MaxIdleConnections is the maximum number of connections. It should be greater than or
// equal to the peek parallel requests. The default is 2.
MaxIdleConnections int
// AllowedFailuresBeforeHealing is the number of connection errors that will be tolerated
// before autohealing starts.
AllowedFailuresBeforeHealing int
}
type healingClientImpl struct {
opts Options
client memcachedClient // if client is nil, that's a signal we are reconnecting.
// clientFactory is used to re-generate the client if it fails. This is due to the fact that
// once a *memcached.Client starts returning errors due to a bad connection, it doesn't
// heal itself and must be recreated.
clientFactory func(Options) memcachedClient
clientMutex sync.RWMutex
numFailures int
recoveryDuration time.Duration
}
// NewClient returns a Client to talk to memcached instance(s) that will heal and re-generate
// itself with the options provided.
func NewClient(opts Options) *healingClientImpl {
if opts.AllowedFailuresBeforeHealing <= 0 {
opts.AllowedFailuresBeforeHealing = 10
}
c := memcachedFactory(opts)
return &healingClientImpl{
opts: opts,
client: c,
clientFactory: memcachedFactory,
recoveryDuration: 10 * time.Second,
}
}
// memcachedFactor returns a "real" implementation of the memcached client.
func memcachedFactory(opts Options) memcachedClient {
c := memcache.New(opts.Servers...)
c.Timeout = opts.Timeout // defaults handled from memcache client code.
c.MaxIdleConns = opts.MaxIdleConnections // defaults handled from memcache client code.
return c
}
// ConnectionAvailable returns true if the client is not nil. nil means it is being healed.
func (h *healingClientImpl) ConnectionAvailable() bool {
h.clientMutex.RLock()
defer h.clientMutex.RUnlock()
return h.client != nil
}
// GetMulti passes a call through to the underlying client (if available). If the connection
// is not available or there is an error, it returns false. Otherwise it returns the value and
// true.
func (h *healingClientImpl) GetMulti(keys []string) (map[string]*memcache.Item, bool) {
h.clientMutex.RLock()
if h.client == nil {
// currently reconnecting
h.clientMutex.RUnlock()
return nil, false
}
m, err := h.client.GetMulti(keys)
h.clientMutex.RUnlock() // need to free up the mutex before calling maybeReload
if err != nil {
sklog.Errorf("Could not get %d keys from memcached: %s", len(keys), err)
h.maybeReload()
return nil, false
}
return m, true
}
// Ping returns an error if the connection is being restored or any error from the
// underlying client.
func (h *healingClientImpl) Ping() error {
h.clientMutex.RLock()
defer h.clientMutex.RUnlock()
if h.client == nil {
return skerr.Fmt("Connection down. Reconnecting.")
}
return skerr.Wrap(h.client.Ping())
}
// Set passes through to the underlying client (if available). It returns true if the set succeeded
// or the passed in item is nil. It returns false if there was an error or the connection is down.
func (h *healingClientImpl) Set(i *memcache.Item) bool {
if i == nil {
return true // trivially true
}
h.clientMutex.RLock()
if h.client == nil {
// currently reconnecting
h.clientMutex.RUnlock()
return false
}
err := h.client.Set(i)
h.clientMutex.RUnlock() // need to free up the mutex before calling maybeReload
if err != nil {
sklog.Errorf("Could not set item with key %s to memcached: %s", i.Key, err)
h.maybeReload()
return false
}
return true
}
// maybeReload will add one to the failure count. If that brings the number of failures over the
// limit, it will remove the connection and try to reconnect after 10-20 seconds.
func (h *healingClientImpl) maybeReload() {
h.clientMutex.Lock()
defer h.clientMutex.Unlock()
h.numFailures++
// We add the h.client == nil check to make it so there's only one goroutine in charge of
// reconnecting
if h.numFailures < h.numFailures || h.client == nil {
return
}
sklog.Infof("Initiating memcached reconnection.")
h.client = nil
go func() { // spin up a background goroutine to heal the connection.
for {
// wait for a random time between recoveryDuration and 2*recoveryDuration
time.Sleep(h.recoveryDuration + time.Duration(float32(h.recoveryDuration)*rand.Float32()))
c := h.clientFactory(h.opts)
if err := c.Ping(); err != nil {
sklog.Warningf("Cannot reconnect to memcached: %s", err)
continue // go back to sleep, try again later
}
h.clientMutex.Lock()
h.client = c
h.numFailures = 0
sklog.Infof("Reconnected to memcached")
h.clientMutex.Unlock()
return
}
}()
}
var _ Client = (*healingClientImpl)(nil)