blob: 88583e4e64f6dd38789761c69620bc6f890b93c3 [file] [log] [blame]
package db
import (
"bytes"
"encoding/gob"
"sort"
"sync"
"time"
"github.com/satori/go.uuid"
"go.skia.org/infra/go/sklog"
)
// modifiedData allows subscribers to keep track of DB entries that have been
// modified. It is designed to be used with wrappers in order to store a desired
// type of data.
type modifiedData struct {
// map[subscriber_id][entry_id]gob
data map[string]map[string][]byte
// After the expiration time, subscribers are automatically removed.
expiration map[string]time.Time
// Protects data and expiration.
mtx sync.RWMutex
}
func (m *modifiedData) GetModifiedEntries(id string) (map[string][]byte, error) {
m.mtx.Lock()
defer m.mtx.Unlock()
if _, ok := m.expiration[id]; !ok {
return nil, ErrUnknownId
}
rv := m.data[id]
m.expiration[id] = time.Now().Add(MODIFIED_DATA_TIMEOUT)
delete(m.data, id)
return rv, nil
}
// clearExpiredSubscribers periodically deletes data about any subscribers that
// haven't been seen within MODIFIED_TASKS_TIMEOUT. Must be called as a
// goroutine. Returns when there are no remaining subscribers.
func (m *modifiedData) clearExpiredSubscribers() {
ticker := time.NewTicker(time.Minute)
for range ticker.C {
m.mtx.Lock()
for id, t := range m.expiration {
if time.Now().After(t) {
sklog.Warningf("Deleting expired subscriber with id %s; expiration time %s.", id, t)
delete(m.data, id)
delete(m.expiration, id)
}
}
anyLeft := len(m.expiration) > 0
if !anyLeft {
m.data = nil
m.expiration = nil
}
m.mtx.Unlock()
if !anyLeft {
break
}
}
ticker.Stop()
}
// TrackModifiedEntry indicates the given data should be returned from the next
// call to GetModifiedEntries from each subscriber.
func (m *modifiedData) TrackModifiedEntry(id string, d []byte) {
m.TrackModifiedEntries(map[string][]byte{id: d})
}
// TrackModifiedEntries is a batch version of TrackModifiedEntry. Values of
// gobs must not be modified after this call.
func (m *modifiedData) TrackModifiedEntries(gobs map[string][]byte) {
m.mtx.Lock()
defer m.mtx.Unlock()
for subId := range m.expiration {
sub, ok := m.data[subId]
if !ok {
sub = make(map[string][]byte, len(gobs))
m.data[subId] = sub
}
for entryId, gob := range gobs {
sub[entryId] = gob
}
}
}
// See docs for TaskReader.StartTrackingModifiedTasks or
// JobReader.StartTrackingModifiedJobs.
func (m *modifiedData) StartTrackingModifiedEntries() (string, error) {
m.mtx.Lock()
defer m.mtx.Unlock()
if m.expiration == nil {
// Initialize the data structure and start expiration goroutine.
m.data = map[string]map[string][]byte{}
m.expiration = map[string]time.Time{}
go m.clearExpiredSubscribers()
} else if len(m.expiration) >= MAX_MODIFIED_DATA_USERS {
return "", ErrTooManyUsers
}
id := uuid.NewV5(uuid.NewV1(), uuid.NewV4().String()).String()
m.expiration[id] = time.Now().Add(MODIFIED_DATA_TIMEOUT)
return id, nil
}
// See docs for TaskReader.StopTrackingModifiedTasks or
// JobReader.StopTrackingModifiedJobs.
func (m *modifiedData) StopTrackingModifiedEntries(id string) {
m.mtx.Lock()
defer m.mtx.Unlock()
delete(m.data, id)
delete(m.expiration, id)
}
type ModifiedTasks struct {
m modifiedData
}
// See docs for TaskReader interface.
func (m *ModifiedTasks) GetModifiedTasks(id string) ([]*Task, error) {
tasks, err := m.m.GetModifiedEntries(id)
if err != nil {
return nil, err
}
d := TaskDecoder{}
for _, g := range tasks {
if !d.Process(g) {
break
}
}
rv, err := d.Result()
if err != nil {
return nil, err
}
sort.Sort(TaskSlice(rv))
return rv, nil
}
// See docs for TaskReader interface.
func (m *ModifiedTasks) GetModifiedTasksGOB(id string) (map[string][]byte, error) {
return m.m.GetModifiedEntries(id)
}
// TrackModifiedTask indicates the given Task should be returned from the next
// call to GetModifiedTasks from each subscriber.
func (m *ModifiedTasks) TrackModifiedTask(t *Task) {
var buf bytes.Buffer
if err := gob.NewEncoder(&buf).Encode(t); err != nil {
sklog.Fatal(err)
}
m.m.TrackModifiedEntries(map[string][]byte{t.Id: buf.Bytes()})
}
// TrackModifiedTasksGOB is a batch, GOB version of TrackModifiedTask. Given a
// map from Task.Id to GOB-encoded task, it is equivalent to GOB-decoding each
// value of gobs as a Task and calling TrackModifiedTask on each one. Values of
// gobs must not be modified after this call.
func (m *ModifiedTasks) TrackModifiedTasksGOB(gobs map[string][]byte) {
m.m.TrackModifiedEntries(gobs)
}
// See docs for TaskReader interface.
func (m *ModifiedTasks) StartTrackingModifiedTasks() (string, error) {
return m.m.StartTrackingModifiedEntries()
}
// See docs for TaskReader interface.
func (m *ModifiedTasks) StopTrackingModifiedTasks(id string) {
m.m.StopTrackingModifiedEntries(id)
}
type ModifiedJobs struct {
m modifiedData
}
// See docs for JobReader interface.
func (m *ModifiedJobs) GetModifiedJobs(id string) ([]*Job, error) {
jobs, err := m.m.GetModifiedEntries(id)
if err != nil {
return nil, err
}
d := JobDecoder{}
for _, g := range jobs {
if !d.Process(g) {
break
}
}
rv, err := d.Result()
if err != nil {
return nil, err
}
sort.Sort(JobSlice(rv))
return rv, nil
}
// See docs for JobReader interface.
func (m *ModifiedJobs) GetModifiedJobsGOB(id string) (map[string][]byte, error) {
return m.m.GetModifiedEntries(id)
}
// TrackModifiedJob indicates the given Job should be returned from the next
// call to GetModifiedJobs from each subscriber.
func (m *ModifiedJobs) TrackModifiedJob(j *Job) {
var buf bytes.Buffer
if err := gob.NewEncoder(&buf).Encode(j); err != nil {
sklog.Fatal(err)
}
m.m.TrackModifiedEntries(map[string][]byte{j.Id: buf.Bytes()})
}
// TrackModifiedJobsGOB is a batch, GOB version of TrackModifiedJob. Given a
// map from Job.Id to GOB-encoded task, it is equivalent to GOB-decoding each
// value of gobs as a Job and calling TrackModifiedJob on each one. Values of
// gobs must not be modified after this call.
func (m *ModifiedJobs) TrackModifiedJobsGOB(gobs map[string][]byte) {
m.m.TrackModifiedEntries(gobs)
}
// See docs for JobReader interface.
func (m *ModifiedJobs) StartTrackingModifiedJobs() (string, error) {
return m.m.StartTrackingModifiedEntries()
}
// See docs for JobReader interface.
func (m *ModifiedJobs) StopTrackingModifiedJobs(id string) {
m.m.StopTrackingModifiedEntries(id)
}