package rtcache
import (
"container/heap"
"sync"
"time"
lru "github.com/hashicorp/golang-lru"
ttlcache "github.com/robfig/go-cache"
)
// TODO(stephana): Add the ability to purge items from the cache and
// expunge items from the error cache. Remove DEFAULT_CACHESIZE when
// we have a way to expunge items from the cache.
const (
// Duration to cache an error response.
DEFAULT_ERRCACHE_EXPIRATION_TIME = time.Minute * 30
// Interval at which the errcache is cleared of expired entries.
ERRCACHE_CLEANUP_TIME = time.Minute * 5
)
// MemReadThroughCache implements the ReadThroughCache interface.
type MemReadThroughCache struct {
workerFn ReadThroughFunc // worker function to create the items.
cache *lru.Cache // caches the items in RAM.
errCache *ttlcache.Cache // caches errors for a limited time.
pQ *priorityQueue // priority queue to order item generation.
pqItemLookup map[string]*workItem // lookup items by id in pQ.
inProgress map[string]*workItem // items that are currently being generated.
mutex sync.Mutex // protects all members of this instance.
emptyCond *sync.Cond // used to synchronize workers when the queue is empty.
finishedCh chan bool // closing this signals goroutines to shut down.
wg sync.WaitGroup // used to wait for goroutines during shutdown.
activeWorkerCh chan bool // bounds the number of concurrently running workers.
}
// New returns a new instance of ReadThroughCache that is stored in RAM.
// nWorkers defines the number of concurrent workers that call workerFn when
// requested items are not in RAM.
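//
// A minimal usage sketch (loadThumbnail is a hypothetical expensive loader
// supplied by the caller, not part of this package):
//
//	cache, err := New(func(priority int64, id string) (interface{}, error) {
//		return loadThumbnail(id) // expensive computation keyed by id
//	}, 10000, 4)
//	if err != nil {
//		return err
//	}
//	img, err := cache.Get(1, "some-id") // blocks until the value is ready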
func New(workerFn ReadThroughFunc, maxSize int, nWorkers int) (ReadThroughCache, error) {
// If maxSize is <= 0 then we don't want to cache at all, but lru.Cache will
// not limit the cache if the size is 0, so we cache a single element instead.
if maxSize <= 0 {
maxSize = 1
}
lruCache, err := lru.New(maxSize)
if err != nil {
return nil, err
}
ret := &MemReadThroughCache{
workerFn: workerFn,
cache: lruCache,
errCache: ttlcache.New(DEFAULT_ERRCACHE_EXPIRATION_TIME, ERRCACHE_CLEANUP_TIME),
pQ: &priorityQueue{},
inProgress: map[string]*workItem{},
pqItemLookup: map[string]*workItem{},
finishedCh: make(chan bool),
activeWorkerCh: make(chan bool, nWorkers),
}
ret.emptyCond = sync.NewCond(&ret.mutex)
ret.startWorker()
return ret, nil
}
// Get implements the ReadThroughCache interface.
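//
// Concurrent calls to Get for the same id are coalesced: workerFn runs at
// most once and every waiting caller receives the same result (or error).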
func (m *MemReadThroughCache) Get(priority int64, id string) (interface{}, error) {
ret, err, resultCh := m.getOrEnqueue(priority, id)
if (err != nil) || (ret != nil) {
return ret, err
}
// Wait for the result.
ret = <-resultCh
if err, ok := ret.(error); ok {
return nil, err
}
return ret, nil
}
// Keys implements the ReadThroughCache interface.
func (m *MemReadThroughCache) Keys() []string {
m.mutex.Lock()
keys := m.cache.Keys()
m.mutex.Unlock()
// Convert to strings.
ret := make([]string, len(keys))
for idx, key := range keys {
ret[idx] = key.(string)
}
return ret
}
// Remove implements the ReadThroughCache interface.
func (m *MemReadThroughCache) Remove(ids []string) {
m.mutex.Lock()
defer m.mutex.Unlock()
for _, id := range ids {
m.cache.Remove(id)
}
}
// getOrEnqueue retrieves the desired item from the cache or schedules it to
// be calculated. It returns (item, nil, nil) on a cache hit, (nil, err, nil)
// if a recent attempt to generate the item failed, and (nil, nil, ch) if the
// item was enqueued; the channel delivers the result (or an error) once the
// item has been generated.
func (m *MemReadThroughCache) getOrEnqueue(priority int64, id string) (interface{}, error, chan interface{}) {
m.mutex.Lock()
defer m.mutex.Unlock()
// Check if it's in the cache.
if result, ok := m.cache.Get(id); ok {
return result, nil, nil
}
// Check if it's in the error cache.
if err, ok := m.errCache.Get(id); ok {
return nil, err.(error), nil
}
// Check if it's already in progress; if not, add it to the work queue.
resultCh := make(chan interface{})
if wi, ok := m.inProgress[id]; ok {
wi.resultChans = append(wi.resultChans, resultCh)
} else {
m.enqueue(id, priority, resultCh)
}
return nil, nil, resultCh
}
// enqueue adds the given item to the priority queue. This assumes that the
// caller currently holds the mutex.
func (m *MemReadThroughCache) enqueue(id string, priority int64, resultCh chan interface{}) {
// If the item already exists, update it in place.
if found, ok := m.pqItemLookup[id]; ok {
found.resultChans = append(found.resultChans, resultCh)
if found.priority != priority {
found.priority = priority
heap.Fix(m.pQ, found.idx)
}
return
}
item := &workItem{
id: id,
priority: priority,
resultChans: []chan interface{}{resultCh},
}
heap.Push(m.pQ, item)
m.pqItemLookup[id] = item
m.emptyCond.Signal()
}
// dequeue returns the next workItem. It blocks until an item is available.
// The caller must NOT hold the mutex when calling. The found item is moved
// to the inProgress table. If finishedCh has been closed this function
// returns nil.
func (m *MemReadThroughCache) dequeue() *workItem {
m.mutex.Lock()
defer m.mutex.Unlock()
for len(*m.pQ) == 0 {
if m.finished() {
return nil
}
m.emptyCond.Wait()
}
ret := heap.Pop(m.pQ).(*workItem)
delete(m.pqItemLookup, ret.id)
m.inProgress[ret.id] = ret
return ret
}
// saveResult stores the given result in the cache. It also notifies any
// waiting calls to Get(...) that the result is ready.
func (m *MemReadThroughCache) saveResult(wi *workItem, result interface{}, err error) {
m.mutex.Lock()
if err != nil {
m.errCache.Set(wi.id, err, DEFAULT_ERRCACHE_EXPIRATION_TIME)
result = err
} else {
m.cache.Add(wi.id, result)
}
delete(m.inProgress, wi.id)
m.mutex.Unlock()
for _, ch := range wi.resultChans {
ch <- result
}
}
// finished returns true if finishedCh has been closed, indicating that
// all goroutines should shut down.
func (m *MemReadThroughCache) finished() bool {
select {
case <-m.finishedCh:
return true
default:
}
return false
}
// shutdown terminates all goroutines and waits until they finish. Used for testing.
func (m *MemReadThroughCache) shutdown() {
close(m.finishedCh)
m.emptyCond.Broadcast()
m.wg.Wait()
}
// startWorker starts a background process that calculates cache values when
// requested while not exceeding the configured number of workers.
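//
// activeWorkerCh acts as a counting semaphore: a slot is acquired (by
// sending into the buffered channel) before an item is dequeued and
// released when its computation finishes, so at most nWorkers items are
// being computed at any time.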
func (m *MemReadThroughCache) startWorker() {
m.wg.Add(1)
go func() {
defer m.wg.Done()
for {
// Acquire a slot in the active workers channel.
select {
case <-m.finishedCh:
return
case m.activeWorkerCh <- true:
}
wi := m.dequeue()
if wi != nil {
// Start a go-routine to calculate the task.
m.wg.Add(1)
go func(wi *workItem) {
defer m.wg.Done()
ret, err := m.workerFn(wi.priority, wi.id)
m.saveResult(wi, ret, err)
<-m.activeWorkerCh
}(wi)
}
}
}()
}
// Warm implements the ReadThroughCache interface.
func (m *MemReadThroughCache) Warm(priority int64, id string) error {
_, err := m.Get(priority, id)
return err
}
// Contains implements the ReadThroughCache interface.
func (m *MemReadThroughCache) Contains(id string) bool {
m.mutex.Lock()
defer m.mutex.Unlock()
_, ok := m.cache.Get(id)
return ok
}
// workItem is used to control calls to workerFn when an item is not
// in memory. The priority field defines its position in the priority
// queue.
type workItem struct {
id string // id of the item that needs to be retrieved.
idx int // index of the item in the priority queue.
priority int64 // priority of the item.
resultChans []chan interface{} // waiting Get(...) calls that need to be notified.
}
// priorityQueue implements heap.Interface.
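// It is a min-heap: the item with the smallest priority value is popped
// first.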
type priorityQueue []*workItem
// Implement the sort.Interface portion of heap.Interface.
func (p *priorityQueue) Len() int { return len(*p) }
func (p *priorityQueue) Less(i, j int) bool { return (*p)[i].priority < (*p)[j].priority }
func (p *priorityQueue) Swap(i, j int) {
(*p)[i], (*p)[j] = (*p)[j], (*p)[i]
(*p)[i].idx = i
(*p)[j].idx = j
}
// Push implements heap.Interface.
func (p *priorityQueue) Push(x interface{}) {
item := x.(*workItem)
item.idx = len(*p)
*p = append(*p, item)
}
// Pop implements heap.Interface.
func (p *priorityQueue) Pop() interface{} {
n := len(*p)
ret := (*p)[n-1]
*p = (*p)[:n-1]
return ret
}