blob: 2fd0430a6312b6da4380662b59a425fd328c9be7 [file] [log] [blame]
package recentschannel
import (
"fmt"
"sync"
)
// recentschannel.Ch is a buffered channel on which sending never blocks (though receiving still
// may). A Send() to a full Ch evicts the oldest item in the buffer, leaving the most recent ones.
//
// recentschannel.Ch is useful for receiving nondeterministic heartbeats, such as results of network
// polls, which cannot be replaced with Tickers without incurring latency. Unlike with normal
// channels, there is no risk that a heartbeat sender will block on a full channel until the
// listener completes some possibly lengthy work. The advantage over ordinary nonblocking sends is
// that the receiver sees one of the newest values (THE newest if channel capacity is 1) rather than
// the oldest.
type Ch[T any] struct {
ch chan T
mutex sync.Mutex
}
func New[T any](size int) *Ch[T] {
if size < 1 {
panic(fmt.Sprintf("recentschannel.Ch size must be at least 1; got %d.", size))
// Otherwise, we would necessarily block on sends.
}
return &Ch[T]{
ch: make(chan T, size),
}
}
func (t *Ch[T]) Send(value T) {
t.mutex.Lock()
defer t.mutex.Unlock()
if len(t.ch) == cap(t.ch) {
// Channel full. Dump 1 entry on the floor to make room for new one. Don't block if somebody
// else beat us to it.
select {
case <-t.ch:
default:
}
}
t.ch <- value
}
func (t *Ch[T]) Recv() <-chan T {
return t.ch
}