blob: 122f7bcbec56b095f071306764599b4329f7b9eb [file] [log] [blame]
package memory
import (
"fmt"
"sync"
"time"
"go.skia.org/infra/go/sklog"
"go.skia.org/infra/go/util"
"go.skia.org/infra/task_scheduler/go/db"
"go.skia.org/infra/task_scheduler/go/db/modified"
"go.skia.org/infra/task_scheduler/go/types"
)
// CommentBox implements CommentDB with in-memory storage.
//
// When created via NewCommentBoxWithPersistence, CommentBox will persist the
// in-memory representation on every change using the provided writer function.
type CommentBox struct {
db.ModifiedComments
// mtx protects comments.
mtx sync.RWMutex
// comments is map[repo_name]*types.RepoComments.
comments map[string]*types.RepoComments
// writer is called to persist comments after every change.
writer func(map[string]*types.RepoComments) error
}
// NewCommentBoxWithPersistence creates a CommentBox that is initialized with
// init and sends the updated in-memory representation to writer after each
// change. The value of init and the argument to writer is
// map[repo_name]*types.RepoComments. init must not be modified by the caller. writer
// must not call any methods of CommentBox. writer may return an error to
// prevent a change from taking effect.
func NewCommentBoxWithPersistence(modComments db.ModifiedComments, init map[string]*types.RepoComments, writer func(map[string]*types.RepoComments) error) *CommentBox {
if modComments == nil {
modComments = &modified.ModifiedCommentsImpl{}
}
return &CommentBox{
ModifiedComments: modComments,
comments: init,
writer: writer,
}
}
// See documentation for CommentDB.GetCommentsForRepos.
func (b *CommentBox) GetCommentsForRepos(repos []string, from time.Time) ([]*types.RepoComments, error) {
b.mtx.RLock()
defer b.mtx.RUnlock()
rv := make([]*types.RepoComments, len(repos))
for i, repo := range repos {
if rc, ok := b.comments[repo]; ok {
rv[i] = rc.Copy()
} else {
rv[i] = &types.RepoComments{Repo: repo}
}
}
return rv, nil
}
// write calls b.writer with comments if non-null.
func (b *CommentBox) write() error {
if b.writer == nil {
return nil
}
return b.writer(b.comments)
}
// getRepoComments returns the initialized *types.RepoComments for the given repo.
func (b *CommentBox) getRepoComments(repo string) *types.RepoComments {
if b.comments == nil {
b.comments = make(map[string]*types.RepoComments, 1)
}
rc, ok := b.comments[repo]
if !ok {
rc = &types.RepoComments{
Repo: repo,
TaskComments: map[string]map[string][]*types.TaskComment{},
TaskSpecComments: map[string][]*types.TaskSpecComment{},
CommitComments: map[string][]*types.CommitComment{},
}
b.comments[repo] = rc
}
return rc
}
// putTaskComment validates c and adds c to b.comments, or returns
// db.ErrAlreadyExists if a different comment has the same ID fields. Assumes b.mtx
// is write-locked.
func (b *CommentBox) putTaskComment(c *types.TaskComment) error {
if c.Repo == "" || c.Revision == "" || c.Name == "" || util.TimeIsZero(c.Timestamp) {
return fmt.Errorf("TaskComment missing required fields. %#v", c)
}
rc := b.getRepoComments(c.Repo)
nameMap, ok := rc.TaskComments[c.Revision]
if !ok {
nameMap = map[string][]*types.TaskComment{}
rc.TaskComments[c.Revision] = nameMap
}
cSlice := nameMap[c.Name]
// TODO(benjaminwagner): Would using utilities in the sort package make this
// cleaner?
if len(cSlice) > 0 {
// Assume comments normally inserted at the end.
insert := 0
for i := len(cSlice) - 1; i >= 0; i-- {
if cSlice[i].Timestamp.Equal(c.Timestamp) {
if *cSlice[i] == *c {
return nil
} else {
return db.ErrAlreadyExists
}
} else if cSlice[i].Timestamp.Before(c.Timestamp) {
insert = i + 1
break
}
}
// Ensure capacity for another comment and move any comments after the
// insertion point.
cSlice = append(cSlice, nil)
copy(cSlice[insert+1:], cSlice[insert:])
cSlice[insert] = c.Copy()
} else {
cSlice = []*types.TaskComment{c.Copy()}
}
nameMap[c.Name] = cSlice
return nil
}
// deleteTaskComment validates c, then finds and removes a comment matching c's
// ID fields, returning the comment if found. Assumes b.mtx is write-locked.
func (b *CommentBox) deleteTaskComment(c *types.TaskComment) (*types.TaskComment, error) {
if c.Repo == "" || c.Revision == "" || c.Name == "" || util.TimeIsZero(c.Timestamp) {
return nil, fmt.Errorf("TaskComment missing required fields. %#v", c)
}
if rc, ok := b.comments[c.Repo]; ok {
if cSlice, ok := rc.TaskComments[c.Revision][c.Name]; ok {
// Assume linear search is fast.
for i, existing := range cSlice {
if existing.Timestamp.Equal(c.Timestamp) {
if len(cSlice) > 1 {
rc.TaskComments[c.Revision][c.Name] = append(cSlice[:i], cSlice[i+1:]...)
} else {
delete(rc.TaskComments[c.Revision], c.Name)
if len(rc.TaskComments[c.Revision]) == 0 {
delete(rc.TaskComments, c.Revision)
}
}
return existing, nil
}
}
}
}
return nil, nil
}
// See documentation for CommentDB.PutTaskComment.
func (b *CommentBox) PutTaskComment(c *types.TaskComment) error {
b.mtx.Lock()
defer b.mtx.Unlock()
if err := b.putTaskComment(c); err != nil {
return err
}
if err := b.write(); err != nil {
// If write returns an error, we must revert to previous.
if _, delErr := b.deleteTaskComment(c); delErr != nil {
sklog.Warningf("Unexpected error: %s", delErr)
}
return err
}
b.TrackModifiedTaskComment(c)
return nil
}
// See documentation for CommentDB.DeleteTaskComment.
func (b *CommentBox) DeleteTaskComment(c *types.TaskComment) error {
b.mtx.Lock()
defer b.mtx.Unlock()
existing, err := b.deleteTaskComment(c)
if err != nil {
return err
}
if existing != nil {
if err := b.write(); err != nil {
// If write returns an error, we must revert to previous.
if putErr := b.putTaskComment(existing); putErr != nil {
sklog.Warningf("Unexpected error: %s", putErr)
}
return err
}
deleted := true
c.Deleted = &deleted
b.TrackModifiedTaskComment(c)
}
return nil
}
// putTaskSpecComment validates c and adds c to b.comments, or returns
// db.ErrAlreadyExists if a different comment has the same ID fields. Assumes b.mtx
// is write-locked.
func (b *CommentBox) putTaskSpecComment(c *types.TaskSpecComment) error {
if c.Repo == "" || c.Name == "" || util.TimeIsZero(c.Timestamp) {
return fmt.Errorf("TaskSpecComment missing required fields. %#v", c)
}
rc := b.getRepoComments(c.Repo)
cSlice := rc.TaskSpecComments[c.Name]
if len(cSlice) > 0 {
// Assume comments normally inserted at the end.
insert := 0
for i := len(cSlice) - 1; i >= 0; i-- {
if cSlice[i].Timestamp.Equal(c.Timestamp) {
if *cSlice[i] == *c {
return nil
} else {
return db.ErrAlreadyExists
}
} else if cSlice[i].Timestamp.Before(c.Timestamp) {
insert = i + 1
break
}
}
// Ensure capacity for another comment and move any comments after the
// insertion point.
cSlice = append(cSlice, nil)
copy(cSlice[insert+1:], cSlice[insert:])
cSlice[insert] = c.Copy()
} else {
cSlice = []*types.TaskSpecComment{c.Copy()}
}
rc.TaskSpecComments[c.Name] = cSlice
return nil
}
// deleteTaskSpecComment validates c, then finds and removes a comment matching
// c's ID fields, returning the comment if found. Assumes b.mtx is write-locked.
func (b *CommentBox) deleteTaskSpecComment(c *types.TaskSpecComment) (*types.TaskSpecComment, error) {
if c.Repo == "" || c.Name == "" || util.TimeIsZero(c.Timestamp) {
return nil, fmt.Errorf("TaskSpecComment missing required fields. %#v", c)
}
if rc, ok := b.comments[c.Repo]; ok {
if cSlice, ok := rc.TaskSpecComments[c.Name]; ok {
// Assume linear search is fast.
for i, existing := range cSlice {
if existing.Timestamp.Equal(c.Timestamp) {
if len(cSlice) > 1 {
rc.TaskSpecComments[c.Name] = append(cSlice[:i], cSlice[i+1:]...)
} else {
delete(rc.TaskSpecComments, c.Name)
}
return existing, nil
}
}
}
}
return nil, nil
}
// See documentation for CommentDB.PutTaskSpecComment.
func (b *CommentBox) PutTaskSpecComment(c *types.TaskSpecComment) error {
b.mtx.Lock()
defer b.mtx.Unlock()
if err := b.putTaskSpecComment(c); err != nil {
return err
}
if err := b.write(); err != nil {
// If write returns an error, we must revert to previous.
if _, delErr := b.deleteTaskSpecComment(c); delErr != nil {
sklog.Warningf("Unexpected error: %s", delErr)
}
return err
}
b.TrackModifiedTaskSpecComment(c)
return nil
}
// See documentation for CommentDB.DeleteTaskSpecComment.
func (b *CommentBox) DeleteTaskSpecComment(c *types.TaskSpecComment) error {
b.mtx.Lock()
defer b.mtx.Unlock()
existing, err := b.deleteTaskSpecComment(c)
if err != nil {
return err
}
if existing != nil {
if err := b.write(); err != nil {
// If write returns an error, we must revert to previous.
if putErr := b.putTaskSpecComment(existing); putErr != nil {
sklog.Warningf("Unexpected error: %s", putErr)
}
return err
}
deleted := true
c.Deleted = &deleted
b.TrackModifiedTaskSpecComment(c)
}
return nil
}
// putCommitComment validates c and adds c to b.comments, or returns
// db.ErrAlreadyExists if a different comment has the same ID fields. Assumes b.mtx
// is write-locked.
func (b *CommentBox) putCommitComment(c *types.CommitComment) error {
if c.Repo == "" || c.Revision == "" || util.TimeIsZero(c.Timestamp) {
return fmt.Errorf("CommitComment missing required fields. %#v", c)
}
rc := b.getRepoComments(c.Repo)
cSlice := rc.CommitComments[c.Revision]
if len(cSlice) > 0 {
// Assume comments normally inserted at the end.
insert := 0
for i := len(cSlice) - 1; i >= 0; i-- {
if cSlice[i].Timestamp.Equal(c.Timestamp) {
if *cSlice[i] == *c {
return nil
} else {
return db.ErrAlreadyExists
}
} else if cSlice[i].Timestamp.Before(c.Timestamp) {
insert = i + 1
break
}
}
// Ensure capacity for another comment and move any comments after the
// insertion point.
cSlice = append(cSlice, nil)
copy(cSlice[insert+1:], cSlice[insert:])
cSlice[insert] = c.Copy()
} else {
cSlice = []*types.CommitComment{c.Copy()}
}
rc.CommitComments[c.Revision] = cSlice
return nil
}
// deleteCommitComment validates c, then finds and removes a comment matching
// c's ID fields, returning the comment if found. Assumes b.mtx is write-locked.
func (b *CommentBox) deleteCommitComment(c *types.CommitComment) (*types.CommitComment, error) {
if c.Repo == "" || c.Revision == "" || util.TimeIsZero(c.Timestamp) {
return nil, fmt.Errorf("CommitComment missing required fields. %#v", c)
}
if rc, ok := b.comments[c.Repo]; ok {
if cSlice, ok := rc.CommitComments[c.Revision]; ok {
// Assume linear search is fast.
for i, existing := range cSlice {
if existing.Timestamp.Equal(c.Timestamp) {
if len(cSlice) > 1 {
rc.CommitComments[c.Revision] = append(cSlice[:i], cSlice[i+1:]...)
} else {
delete(rc.CommitComments, c.Revision)
}
return existing, nil
}
}
}
}
return nil, nil
}
// See documentation for CommentDB.PutCommitComment.
func (b *CommentBox) PutCommitComment(c *types.CommitComment) error {
b.mtx.Lock()
defer b.mtx.Unlock()
if err := b.putCommitComment(c); err != nil {
return err
}
if err := b.write(); err != nil {
// If write returns an error, we must revert to previous.
if _, delErr := b.deleteCommitComment(c); delErr != nil {
sklog.Warningf("Unexpected error: %s", delErr)
}
return err
}
b.TrackModifiedCommitComment(c)
return nil
}
// See documentation for CommentDB.DeleteCommitComment.
func (b *CommentBox) DeleteCommitComment(c *types.CommitComment) error {
b.mtx.Lock()
defer b.mtx.Unlock()
existing, err := b.deleteCommitComment(c)
if err != nil {
return err
}
if existing != nil {
if err := b.write(); err != nil {
// If write returns an error, we must revert to previous.
if putErr := b.putCommitComment(existing); putErr != nil {
sklog.Warningf("Unexpected error: %s", putErr)
}
return err
}
deleted := true
c.Deleted = &deleted
b.TrackModifiedCommitComment(c)
}
return nil
}