blob: cc3bccf00793005833a39f9dd431182a3bb8045e [file] [log] [blame]
package util
import "sync"
// CondMonitor implements a monitor that limits the number of threads
// per int64-id that can concurrently enter a critical section.
// This could be used like this:
// ...
// mon := NewCondMonitor(5)
// ...
// defer mon.Enter(id).Release()
// ...
//
// This allows 5 threads for each value of id to enter the critical section
// which starts after the call to 'Enter' and ends when 'Release' is called.
type CondMonitor struct {
	cond        sync.Cond     // guards countMap; broadcast on every Release so waiters re-check
	countMap    map[int64]int // per-id count of threads currently inside the critical section
	nConcurrent int           // maximum concurrent threads allowed per id
}
// NewCondMonitor creates a new monitor with the given number of
// concurrent threads that can enter the critical section for every int64 value
// provided to the 'Enter' call.
func NewCondMonitor(nConcurrent int) *CondMonitor {
	m := &CondMonitor{
		countMap:    make(map[int64]int),
		nConcurrent: nConcurrent,
	}
	// The condition variable needs a locker; a fresh mutex guards countMap.
	m.cond.L = &sync.Mutex{}
	return m
}
// MonitorRelease is returned by the Enter call.
type MonitorRelease func()

// Release is called to identify the end of the critical section.
func (release MonitorRelease) Release() {
	release()
}
// Enter marks the start of a critical section for the given id. It blocks
// until fewer than nConcurrent threads are inside the section for that id,
// then returns a MonitorRelease that the caller must invoke (typically via
// defer) to leave the section.
func (m *CondMonitor) Enter(id int64) MonitorRelease {
	m.cond.L.Lock()
	// Standard condition-variable pattern: re-check the predicate after
	// every wakeup, since Broadcast wakes waiters for all ids.
	for m.countMap[id] >= m.nConcurrent {
		m.cond.Wait()
	}
	m.countMap[id]++
	m.cond.L.Unlock()

	// The returned closure leaves the monitor for this id.
	return func() {
		m.cond.L.Lock()
		m.countMap[id]--
		// Drop the map entry once no thread holds this id, so the map
		// does not grow without bound over distinct ids.
		if m.countMap[id] == 0 {
			delete(m.countMap, id)
		}
		m.cond.L.Unlock()
		// Broadcast (not Signal): waiters may be blocked on different ids,
		// so all must re-examine their own counter. Calling Broadcast
		// without holding the lock is permitted for sync.Cond.
		m.cond.Broadcast()
	}
}