[task scheduler] Add ModifiedComments
Adds full support for GetModifiedComments, both in-memory and pubsub.
Pass the unified ModifiedTasks+ModifiedJobs+ModifiedComments into the DB
constructors rather than individually.
Bug: skia:
Change-Id: I68386757fd95db81aeaa16d9ac8b34ec928c9b12
Reviewed-on: https://skia-review.googlesource.com/c/177722
Commit-Queue: Eric Boren <borenet@google.com>
Reviewed-by: Ben Wagner <benjaminwagner@google.com>
diff --git a/datahopper/go/datahopper/main.go b/datahopper/go/datahopper/main.go
index 9ea7ef0..b13cf27 100644
--- a/datahopper/go/datahopper/main.go
+++ b/datahopper/go/datahopper/main.go
@@ -7,6 +7,7 @@
import (
"context"
"flag"
+ "fmt"
"os"
"path"
"path/filepath"
@@ -42,10 +43,9 @@
taskSchedulerDbUrl = flag.String("task_db_url", "http://skia-task-scheduler:8008/db/", "Where the Skia task scheduler database is hosted.")
workdir = flag.String("workdir", ".", "Working directory used by data processors.")
- perfBucket = flag.String("perf_bucket", "skia-perf", "The GCS bucket that should be used for writing into perf")
- perfPrefix = flag.String("perf_duration_prefix", "task-duration", "The folder name in the bucket that task duration metric shoudl be written.")
- tasksPubsubTopic = flag.String("pubsub_topic_tasks", pubsub.TOPIC_TASKS, "Pubsub topic for tasks.")
- jobsPubsubTopic = flag.String("pubsub_topic_jobs", pubsub.TOPIC_JOBS, "Pubsub topic for jobs.")
+ perfBucket = flag.String("perf_bucket", "skia-perf", "The GCS bucket that should be used for writing into perf")
+ perfPrefix = flag.String("perf_duration_prefix", "task-duration", "The folder name in the bucket that task duration metric shoudl be written.")
+ pubsubTopicSet = flag.String("pubsub_topic_set", "", fmt.Sprintf("Pubsub topic set; one of: %v", pubsub.VALID_TOPIC_SETS))
)
var (
@@ -166,20 +166,16 @@
var d db.RemoteDB
if *firestoreInstance != "" {
label := "datahopper"
- modTasks, err := pubsub.NewModifiedTasks(*tasksPubsubTopic, label, newTs)
+ mod, err := pubsub.NewModifiedData(*pubsubTopicSet, label, newTs)
if err != nil {
sklog.Fatal(err)
}
- modJobs, err := pubsub.NewModifiedJobs(*jobsPubsubTopic, label, newTs)
- if err != nil {
- sklog.Fatal(err)
- }
- d, err = firestore.NewDB(ctx, firestore.FIRESTORE_PROJECT, *firestoreInstance, newTs, modTasks, modJobs)
+ d, err = firestore.NewDB(ctx, firestore.FIRESTORE_PROJECT, *firestoreInstance, newTs, mod)
if err != nil {
sklog.Fatalf("Failed to create Firestore DB client: %s", err)
}
} else {
- d, err = remote_db.NewClient(*taskSchedulerDbUrl, *tasksPubsubTopic, *jobsPubsubTopic, "datahopper", newTs)
+ d, err = remote_db.NewClient(*taskSchedulerDbUrl, *pubsubTopicSet, "datahopper", newTs)
if err != nil {
sklog.Fatal(err)
}
diff --git a/go/util/gob.go b/go/util/gob.go
new file mode 100644
index 0000000..207f93e
--- /dev/null
+++ b/go/util/gob.go
@@ -0,0 +1,222 @@
+package util
+
+import (
+ "bytes"
+ "encoding/gob"
+ "sync"
+)
+
+const kNumDecoderGoroutines = 10
+
+// GobEncoder encodes structs into bytes via GOB encoding. Not safe for
+// concurrent use.
+//
+// Here's a template for writing a type-specific encoder:
+//
+// // FooEncoder encodes Foos into bytes via GOB encoding. Not safe for
+// // concurrent use.
+// type FooEncoder {
+// util.GobEncoder
+// }
+//
+// // Next returns one of the Foox provided to Process (in arbitrary order) and
+// // its serialized bytes. If any items remain, returns the item, the
+// // serialized bytes, nil. If all items have been returned, returns nil, nil,
+// // nil. If an error is encountered, returns nil, nil, error.
+// func (e *FooEncoder) Next() (*Foo, []byte, error) {
+// item, serialized, err := e.GobEncoder.Next()
+// if err != nil {
+// return nil, nil, err
+// } else if item == nil {
+// return nil, nil, nil
+// }
+// return item.(*Foo), serialized, nil
+// }
+type GobEncoder struct {
+ err error
+ items []interface{}
+ result [][]byte
+}
+
+// Process encodes the item into a byte slice that will be returned from
+// Next() (in arbitrary order). Returns false if Next is certain to return an
+// error. Caller must ensure item does not change until after the first call to
+// Next(). May not be called after calling Next().
+func (e *GobEncoder) Process(item interface{}) bool {
+ if e.err != nil {
+ return false
+ }
+ var buf bytes.Buffer
+ if err := gob.NewEncoder(&buf).Encode(item); err != nil {
+ e.err = err
+ e.items = nil
+ e.result = nil
+ return false
+ }
+ e.items = append(e.items, item)
+ e.result = append(e.result, buf.Bytes())
+ return true
+}
+
+// Next returns one of the items provided to Process (in arbitrary order) and
+// its serialized bytes. If any items remain, returns the item, the serialized
+// bytes, nil. If all items have been returned, returns nil, nil, nil. If an
+// error is encountered, returns nil, nil, error.
+func (e *GobEncoder) Next() (interface{}, []byte, error) {
+ if e.err != nil {
+ return nil, nil, e.err
+ }
+ if len(e.items) == 0 {
+ return nil, nil, nil
+ }
+ c := e.items[0]
+ e.items = e.items[1:]
+ serialized := e.result[0]
+ e.result = e.result[1:]
+ return c, serialized, nil
+}
+
+// GobDecoder decodes bytes into structs via GOB decoding. Not safe for
+// concurrent use.
+//
+// Here's a template for writing a type-specific decoder:
+//
+// FooDecoder decodes bytes into Foos via GOB decoding. Not safe for
+// concurrent use.
+// type FooDecoder struct {
+// *util.GobDecoder
+// }
+//
+// // NewFooDecoder returns a FooDecoder instance.
+// func NewFooDecoder() *FooDecoder {
+// return &FooDecoder{
+// GobDecoder: util.NewGobDecoder(func() interface{} {
+// return &Foo{}
+// }, func(ch <-chan interface{}) interface{} {
+// items := []*Foo{}
+// for item := range ch {
+// items = append(items, item.(*Foo))
+// }
+// return items
+// }),
+// }
+// }
+//
+// // Result returns all decoded Foos provided to Process (in arbitrary order), or
+// // any error encountered.
+// func (d *FooDecoder) Result() ([]*Foo, error) {
+// res, err := d.GobDecoder.Result()
+// if err != nil {
+// return nil, err
+// }
+// return res.([]*Foo), nil
+// }
+type GobDecoder struct {
+ // input contains the incoming byte slices. Process() sends on this
+ // channel, decode() receives from it, and Result() closes it.
+ input chan []byte
+ // output contains decoded items. decode() sends on this channel,
+ // collect() receives from it, and run() closes it when all decode()
+ // goroutines have finished.
+ output chan interface{}
+ // result contains the return value of Result(). collect() sends a single
+ // value on this channel and closes it. Result() receives from it.
+ result chan interface{}
+ // errors contains the first error from any goroutine. It's a channel in
+ // case multiple goroutines experience an error at the same time.
+ errors chan error
+
+ newItem func() interface{}
+ collectImpl func(<-chan interface{}) interface{}
+}
+
+// NewGobDecoder returns a GobDecoder instance. The first argument is a
+// goroutine-safe function which returns a zero-valued instance of the type
+// being decoded, eg.
+//
+// func() interface{} {
+// return &MyType{}
+// }
+//
+// The second argument is a function which collects decoded instances of that
+// type from a channel and returns a slice, eg.
+//
+// func(ch <-chan interface{}) interface{} {
+// items := []*MyType{}
+// for item := range ch {
+// items = append(items, item.(*MyType))
+// }
+// return items
+// }
+func NewGobDecoder(newItem func() interface{}, collect func(<-chan interface{}) interface{}) *GobDecoder {
+ d := &GobDecoder{
+ input: make(chan []byte, kNumDecoderGoroutines*2),
+ output: make(chan interface{}, kNumDecoderGoroutines),
+ result: make(chan interface{}, 1),
+ errors: make(chan error, kNumDecoderGoroutines),
+ newItem: newItem,
+ collectImpl: collect,
+ }
+ go d.run()
+ go d.collect()
+ return d
+}
+
+// run starts the decode goroutines and closes d.output when they finish.
+func (d *GobDecoder) run() {
+ // Start decoders.
+ wg := sync.WaitGroup{}
+ for i := 0; i < kNumDecoderGoroutines; i++ {
+ wg.Add(1)
+ go d.decode(&wg)
+ }
+ // Wait for decoders to exit.
+ wg.Wait()
+ // Drain d.input in the case that errors were encountered, to avoid deadlock.
+ for range d.input {
+ }
+ close(d.output)
+}
+
+// decode receives from d.input and sends to d.output until d.input is closed or
+// d.errors is non-empty. Decrements wg when done.
+func (d *GobDecoder) decode(wg *sync.WaitGroup) {
+ for b := range d.input {
+ item := d.newItem()
+ if err := gob.NewDecoder(bytes.NewReader(b)).Decode(item); err != nil {
+ d.errors <- err
+ break
+ }
+ d.output <- item
+ if len(d.errors) > 0 {
+ break
+ }
+ }
+ wg.Done()
+}
+
+// collect receives from d.output until it is closed, then sends on d.result.
+func (d *GobDecoder) collect() {
+ d.result <- d.collectImpl(d.output)
+ close(d.result)
+}
+
+// Process decodes the byte slice and includes it in Result() (in arbitrary
+// order). Returns false if Result is certain to return an error. Caller must
+// ensure b does not change until after Result() returns.
+func (d *GobDecoder) Process(b []byte) bool {
+ d.input <- b
+ return len(d.errors) == 0
+}
+
+// Result returns all decoded items provided to Process (in arbitrary order), or
+// any error encountered.
+func (d *GobDecoder) Result() (interface{}, error) {
+ close(d.input)
+ select {
+ case err := <-d.errors:
+ return nil, err
+ case result := <-d.result:
+ return result, nil
+ }
+}
diff --git a/go/util/gob_test.go b/go/util/gob_test.go
new file mode 100644
index 0000000..ec63312
--- /dev/null
+++ b/go/util/gob_test.go
@@ -0,0 +1,138 @@
+package util
+
+import (
+ "bytes"
+ "encoding/gob"
+ "fmt"
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+ "go.skia.org/infra/go/deepequal"
+ "go.skia.org/infra/go/testutils"
+)
+
+type Item struct {
+ Id string
+ Map map[string]string
+ Slice []string
+}
+
+func TestGobEncoder(t *testing.T) {
+ testutils.SmallTest(t)
+ // TODO(benjaminwagner): Is there any way to cause an error?
+ e := GobEncoder{}
+ expectedItems := map[*Item][]byte{}
+ for i := 0; i < 25; i++ {
+ item := &Item{}
+ item.Id = fmt.Sprintf("Id-%d", i)
+ item.Map = map[string]string{"PointA": "PointB"}
+ item.Slice = []string{"bread"}
+ var buf bytes.Buffer
+ err := gob.NewEncoder(&buf).Encode(item)
+ assert.NoError(t, err)
+ expectedItems[item] = buf.Bytes()
+ assert.True(t, e.Process(item))
+ }
+
+ actualItems := map[*Item][]byte{}
+ for item, serialized, err := e.Next(); item != nil; item, serialized, err = e.Next() {
+ assert.NoError(t, err)
+ actualItems[item.(*Item)] = serialized
+ }
+
+ deepequal.AssertDeepEqual(t, expectedItems, actualItems)
+}
+
+func TestGobEncoderNoItems(t *testing.T) {
+ testutils.SmallTest(t)
+ e := GobEncoder{}
+ item, serialized, err := e.Next()
+ assert.NoError(t, err)
+ assert.Nil(t, item)
+ assert.Nil(t, serialized)
+}
+
+func TestGobDecoder(t *testing.T) {
+ testutils.SmallTest(t)
+ d := NewGobDecoder(func() interface{} {
+ return &Item{}
+ }, func(ch <-chan interface{}) interface{} {
+ items := []*Item{}
+ for item := range ch {
+ items = append(items, item.(*Item))
+ }
+ return items
+ })
+ expectedItems := map[string]*Item{}
+ for i := 0; i < 250; i++ {
+ item := &Item{}
+ item.Id = fmt.Sprintf("Id-%d", i)
+ item.Map = map[string]string{"PointA": "PointB"}
+ item.Slice = []string{"bread"}
+ var buf bytes.Buffer
+ err := gob.NewEncoder(&buf).Encode(item)
+ assert.NoError(t, err)
+ expectedItems[item.Id] = item
+ assert.True(t, d.Process(buf.Bytes()))
+ }
+
+ actualItems := map[string]*Item{}
+ iResult, err := d.Result()
+ assert.NoError(t, err)
+ result := iResult.([]*Item)
+ assert.Equal(t, len(expectedItems), len(result))
+ for _, item := range result {
+ actualItems[item.Id] = item
+ }
+ deepequal.AssertDeepEqual(t, expectedItems, actualItems)
+}
+
+func TestGobDecoderNoItems(t *testing.T) {
+ testutils.SmallTest(t)
+ d := NewGobDecoder(func() interface{} {
+ return &Item{}
+ }, func(ch <-chan interface{}) interface{} {
+ items := []*Item{}
+ for item := range ch {
+ items = append(items, item.(*Item))
+ }
+ return items
+ })
+ result, err := d.Result()
+ assert.NoError(t, err)
+ assert.Equal(t, 0, len(result.([]*Item)))
+}
+
+func TestGobDecoderError(t *testing.T) {
+ testutils.SmallTest(t)
+ item := &Item{}
+ item.Id = "Id"
+ var buf bytes.Buffer
+ err := gob.NewEncoder(&buf).Encode(item)
+ assert.NoError(t, err)
+ serialized := buf.Bytes()
+ invalid := append([]byte("Hi Mom!"), serialized...)
+
+ d := NewGobDecoder(func() interface{} {
+ return &Item{}
+ }, func(ch <-chan interface{}) interface{} {
+ items := []*Item{}
+ for item := range ch {
+ items = append(items, item.(*Item))
+ }
+ return items
+ })
+ // Process should return true before it encounters an invalid result.
+ assert.True(t, d.Process(serialized))
+ assert.True(t, d.Process(serialized))
+ // Process may return true or false after encountering an invalid value.
+ _ = d.Process(invalid)
+ for i := 0; i < 250; i++ {
+ _ = d.Process(serialized)
+ }
+
+ // Result should return error.
+ result, err := d.Result()
+ assert.Error(t, err)
+ assert.Nil(t, result)
+}
diff --git a/status/go/incremental/incremental_test.go b/status/go/incremental/incremental_test.go
index ad977cd..2cf6955 100644
--- a/status/go/incremental/incremental_test.go
+++ b/status/go/incremental/incremental_test.go
@@ -21,9 +21,7 @@
func setup(t *testing.T) (context.Context, string, *IncrementalCache, repograph.Map, db.DB, *git_testutils.GitBuilder, func()) {
testutils.LargeTest(t)
- taskDb := memory.NewInMemoryTaskDB(nil)
- commentDb := &db.CommentBox{}
- d := db.NewDB(taskDb, nil, commentDb)
+ d := memory.NewInMemoryDB(nil)
ctx := context.Background()
gb := git_testutils.GitInit(t, ctx)
@@ -48,7 +46,7 @@
Name: "DummyTask",
},
}
- assert.NoError(t, taskDb.PutTask(initialTask))
+ assert.NoError(t, d.PutTask(initialTask))
w, err := window.New(24*time.Hour, 100, repos)
assert.NoError(t, err)
diff --git a/status/go/status/main.go b/status/go/status/main.go
index 7a10288..03f0100 100644
--- a/status/go/status/main.go
+++ b/status/go/status/main.go
@@ -101,8 +101,7 @@
testing = flag.Bool("testing", false, "Set to true for locally testing rules. No email will be sent.")
useMetadata = flag.Bool("use_metadata", true, "Load sensitive values from metadata not from flags.")
workdir = flag.String("workdir", ".", "Directory to use for scratch work.")
- pubsubTopicTasks = flag.String("pubsub_topic_tasks", pubsub.TOPIC_TASKS, "Pubsub topic for tasks.")
- pubsubTopicJobs = flag.String("pubsub_topic_jobs", pubsub.TOPIC_JOBS, "Pubsub topic for jobs.")
+ pubsubTopicSet = flag.String("pubsub_topic_set", "", fmt.Sprintf("Pubsub topic set; one of: %v", pubsub.VALID_TOPIC_SETS))
repos repograph.Map
)
@@ -729,7 +728,7 @@
// Create remote Tasks DB.
if *testing {
- taskDb, err = local_db.NewDB("status-testing", path.Join(*workdir, "status-testing.bdb"), nil, nil)
+ taskDb, err = local_db.NewDB("status-testing", path.Join(*workdir, "status-testing.bdb"), nil)
if err != nil {
sklog.Fatalf("Failed to create local task DB: %s", err)
}
@@ -737,22 +736,18 @@
} else if *firestoreInstance != "" {
sklog.Infof("Creating firestore DB.")
label := *host
- modTasks, err := pubsub.NewModifiedTasks(*pubsubTopicTasks, label, ts)
+ mod, err := pubsub.NewModifiedData(*pubsubTopicSet, label, ts)
if err != nil {
sklog.Fatal(err)
}
- modJobs, err := pubsub.NewModifiedJobs(*pubsubTopicJobs, label, ts)
- if err != nil {
- sklog.Fatal(err)
- }
- taskDb, err = firestore.NewDB(ctx, firestore.FIRESTORE_PROJECT, *firestoreInstance, ts, modTasks, modJobs)
+ taskDb, err = firestore.NewDB(ctx, firestore.FIRESTORE_PROJECT, *firestoreInstance, ts, mod)
if err != nil {
sklog.Fatalf("Failed to create Firestore DB client: %s", err)
}
} else {
sklog.Infof("Creating remote DB.")
label := *host
- taskDb, err = remote_db.NewClient(*taskSchedulerDbUrl, *pubsubTopicTasks, *pubsubTopicJobs, label, ts)
+ taskDb, err = remote_db.NewClient(*taskSchedulerDbUrl, *pubsubTopicSet, label, ts)
if err != nil {
sklog.Fatalf("Failed to create remote task DB: %s", err)
}
diff --git a/status/go/statusk/main.go b/status/go/statusk/main.go
index 780d280..9334ff1 100644
--- a/status/go/statusk/main.go
+++ b/status/go/statusk/main.go
@@ -109,8 +109,7 @@
testing = flag.Bool("testing", false, "Set to true for locally testing rules. No email will be sent.")
useMetadata = flag.Bool("use_metadata", true, "Load sensitive values from metadata not from flags.")
workdir = flag.String("workdir", ".", "Directory to use for scratch work.")
- pubsubTopicTasks = flag.String("pubsub_topic_tasks", pubsub.TOPIC_TASKS, "Pubsub topic for tasks.")
- pubsubTopicJobs = flag.String("pubsub_topic_jobs", pubsub.TOPIC_JOBS, "Pubsub topic for jobs.")
+ pubsubTopicSet = flag.String("pubsub_topic_set", "", fmt.Sprintf("Pubsub topic set; one of: %v", pubsub.VALID_TOPIC_SETS))
repos repograph.Map
)
@@ -716,7 +715,7 @@
// Create remote Tasks DB.
if *testing {
- taskDb, err = local_db.NewDB("status-testing", path.Join(*workdir, "status-testing.bdb"), nil, nil)
+ taskDb, err = local_db.NewDB("status-testing", path.Join(*workdir, "status-testing.bdb"), nil)
if err != nil {
sklog.Fatalf("Failed to create local task DB: %s", err)
}
@@ -729,21 +728,17 @@
if *firestoreInstance != "" {
label := *host
- modTasks, err := pubsub.NewModifiedTasks(*pubsubTopicTasks, label, ts)
+ mod, err := pubsub.NewModifiedData(*pubsubTopicSet, label, ts)
if err != nil {
sklog.Fatal(err)
}
- modJobs, err := pubsub.NewModifiedJobs(*pubsubTopicJobs, label, ts)
- if err != nil {
- sklog.Fatal(err)
- }
- taskDb, err = firestore.NewDB(ctx, firestore.FIRESTORE_PROJECT, *firestoreInstance, ts, modTasks, modJobs)
+ taskDb, err = firestore.NewDB(ctx, firestore.FIRESTORE_PROJECT, *firestoreInstance, ts, mod)
if err != nil {
sklog.Fatalf("Failed to create Firestore DB client: %s", err)
}
} else {
label := *host
- taskDb, err = remote_db.NewClient(*taskSchedulerDbUrl, *pubsubTopicTasks, *pubsubTopicJobs, label, ts)
+ taskDb, err = remote_db.NewClient(*taskSchedulerDbUrl, *pubsubTopicSet, label, ts)
if err != nil {
sklog.Fatalf("Failed to create remote task DB: %s", err)
}
diff --git a/status/sys/status-internal.service b/status/sys/status-internal.service
index 91e59fa..571d63a 100644
--- a/status/sys/status-internal.service
+++ b/status/sys/status-internal.service
@@ -12,8 +12,7 @@
--host=status-internal.skia.org \
--resources_dir=/usr/local/share/status \
--capacity_recalculate_interval=30m \
- --pubsub_topic_tasks=task-scheduler-modified-tasks-internal \
- --pubsub_topic_jobs=task-scheduler-modified-jobs-internal \
+ --pubsub_topic_set=internal \
--repo=https://skia.googlesource.com/skia_internal.git \
--repo=https://skia.googlesource.com/internal_test.git \
--swarming_url=https://chrome-swarming.appspot.com \
diff --git a/status/sys/status-staging.service b/status/sys/status-staging.service
index 61077ee..708ac21 100644
--- a/status/sys/status-staging.service
+++ b/status/sys/status-staging.service
@@ -13,8 +13,7 @@
--resources_dir=/usr/local/share/status \
--capacity_recalculate_interval=30m \
--firestore_instance=staging \
- --pubsub_topic_tasks=task-scheduler-modified-tasks-staging \
- --pubsub_topic_jobs=task-scheduler-modified-jobs-staging \
+ --pubsub_topic_set=staging \
--repo=https://skia.googlesource.com/skiabot-test.git \
--swarming_url=https://chromium-swarm-dev.appspot.com \
--task_scheduler_url=https://task-scheduler-staging.skia.org \
diff --git a/status/sys/statusd.service b/status/sys/statusd.service
index 45ec057..8488151 100644
--- a/status/sys/statusd.service
+++ b/status/sys/statusd.service
@@ -12,8 +12,7 @@
--host=status.skia.org \
--resources_dir=/usr/local/share/status \
--capacity_recalculate_interval=30m \
- --pubsub_topic_tasks=task-scheduler-modified-tasks \
- --pubsub_topic_jobs=task-scheduler-modified-jobs \
+ --pubsub_topic_set=prod \
--task_db_url=http://skia-task-scheduler:8008/db/
Restart=always
User=default
diff --git a/task_scheduler/go/db/comments.go b/task_scheduler/go/db/comments.go
index 8123c53..8885f2b 100644
--- a/task_scheduler/go/db/comments.go
+++ b/task_scheduler/go/db/comments.go
@@ -1,20 +1,56 @@
package db
import (
- "fmt"
- "sync"
"time"
- "go.skia.org/infra/go/sklog"
- "go.skia.org/infra/go/util"
"go.skia.org/infra/task_scheduler/go/types"
)
+// ModifiedCommentsReader tracks which comments have been added or deleted and
+// returns results to subscribers based on what has changed since the last call
+// to GetModifiedComments.
+type ModifiedComments interface {
+ // GetModifiedComments returns all comments added or deleted since the
+ // last time GetModifiedComments was run with the given id. The returned
+ // comments are sorted by timestamp. If GetModifiedComments returns an
+ // error, the caller should call StopTrackingModifiedComments and
+ // StartTrackingModifiedComments again, and load all data from scratch
+ // to be sure that no comments were missed.
+ GetModifiedComments(string) ([]*types.TaskComment, []*types.TaskSpecComment, []*types.CommitComment, error)
+
+ // StartTrackingModifiedComments initiates tracking of modified comments
+ // for the current caller. Returns a unique ID which can be used by the
+ // caller to retrieve comments which have been added or deleted since
+ // the last query. The ID expires after a period of inactivity.
+ StartTrackingModifiedComments() (string, error)
+
+ // StopTrackingModifiedComments cancels tracking of modified comments
+ // for the provided ID.
+ StopTrackingModifiedComments(string)
+
+ // TrackModifiedTaskComment indicates the given comment should be
+ // returned from the next call to GetModifiedComments from each
+ // subscriber.
+ TrackModifiedTaskComment(*types.TaskComment)
+
+ // TrackModifiedTaskSpecComment indicates the given comment should be
+ // returned from the next call to GetModifiedComments from each
+ // subscriber.
+ TrackModifiedTaskSpecComment(*types.TaskSpecComment)
+
+ // TrackModifiedCommitComment indicates the given comment should be
+ // returned from the next call to GetModifiedComments from each
+ // subscriber.
+ TrackModifiedCommitComment(*types.CommitComment)
+}
+
// CommentDB stores comments on Tasks, TaskSpecs, and commits.
//
// Clients must be tolerant of comments that refer to nonexistent Tasks,
// TaskSpecs, or commits.
type CommentDB interface {
+ ModifiedComments
+
// GetComments returns all comments for the given repos.
//
// If from is specified, it is a hint that TaskComments and CommitComments
@@ -45,373 +81,3 @@
// Non-ID fields of the argument are ignored.
DeleteCommitComment(*types.CommitComment) error
}
-
-// CommentBox implements CommentDB with in-memory storage.
-//
-// When created via NewCommentBoxWithPersistence, CommentBox will persist the
-// in-memory representation on every change using the provided writer function.
-//
-// CommentBox can be default-initialized if only in-memory storage is desired.
-type CommentBox struct {
- // mtx protects comments.
- mtx sync.RWMutex
- // comments is map[repo_name]*types.RepoComments.
- comments map[string]*types.RepoComments
- // writer is called to persist comments after every change.
- writer func(map[string]*types.RepoComments) error
-}
-
-// NewCommentBoxWithPersistence creates a CommentBox that is initialized with
-// init and sends the updated in-memory representation to writer after each
-// change. The value of init and the argument to writer is
-// map[repo_name]*types.RepoComments. init must not be modified by the caller. writer
-// must not call any methods of CommentBox. writer may return an error to
-// prevent a change from taking effect.
-func NewCommentBoxWithPersistence(init map[string]*types.RepoComments, writer func(map[string]*types.RepoComments) error) *CommentBox {
- return &CommentBox{
- comments: init,
- writer: writer,
- }
-}
-
-// See documentation for CommentDB.GetCommentsForRepos.
-func (b *CommentBox) GetCommentsForRepos(repos []string, from time.Time) ([]*types.RepoComments, error) {
- b.mtx.RLock()
- defer b.mtx.RUnlock()
- rv := make([]*types.RepoComments, len(repos))
- for i, repo := range repos {
- if rc, ok := b.comments[repo]; ok {
- rv[i] = rc.Copy()
- } else {
- rv[i] = &types.RepoComments{Repo: repo}
- }
- }
- return rv, nil
-}
-
-// write calls b.writer with comments if non-null.
-func (b *CommentBox) write() error {
- if b.writer == nil {
- return nil
- }
- return b.writer(b.comments)
-}
-
-// getRepoComments returns the initialized *types.RepoComments for the given repo.
-func (b *CommentBox) getRepoComments(repo string) *types.RepoComments {
- if b.comments == nil {
- b.comments = make(map[string]*types.RepoComments, 1)
- }
- rc, ok := b.comments[repo]
- if !ok {
- rc = &types.RepoComments{
- Repo: repo,
- TaskComments: map[string]map[string][]*types.TaskComment{},
- TaskSpecComments: map[string][]*types.TaskSpecComment{},
- CommitComments: map[string][]*types.CommitComment{},
- }
- b.comments[repo] = rc
- }
- return rc
-}
-
-// putTaskComment validates c and adds c to b.comments, or returns
-// ErrAlreadyExists if a different comment has the same ID fields. Assumes b.mtx
-// is write-locked.
-func (b *CommentBox) putTaskComment(c *types.TaskComment) error {
- if c.Repo == "" || c.Revision == "" || c.Name == "" || util.TimeIsZero(c.Timestamp) {
- return fmt.Errorf("TaskComment missing required fields. %#v", c)
- }
- rc := b.getRepoComments(c.Repo)
- nameMap, ok := rc.TaskComments[c.Revision]
- if !ok {
- nameMap = map[string][]*types.TaskComment{}
- rc.TaskComments[c.Revision] = nameMap
- }
- cSlice := nameMap[c.Name]
- // TODO(benjaminwagner): Would using utilities in the sort package make this
- // cleaner?
- if len(cSlice) > 0 {
- // Assume comments normally inserted at the end.
- insert := 0
- for i := len(cSlice) - 1; i >= 0; i-- {
- if cSlice[i].Timestamp.Equal(c.Timestamp) {
- if *cSlice[i] == *c {
- return nil
- } else {
- return ErrAlreadyExists
- }
- } else if cSlice[i].Timestamp.Before(c.Timestamp) {
- insert = i + 1
- break
- }
- }
- // Ensure capacity for another comment and move any comments after the
- // insertion point.
- cSlice = append(cSlice, nil)
- copy(cSlice[insert+1:], cSlice[insert:])
- cSlice[insert] = c.Copy()
- } else {
- cSlice = []*types.TaskComment{c.Copy()}
- }
- nameMap[c.Name] = cSlice
- return nil
-}
-
-// deleteTaskComment validates c, then finds and removes a comment matching c's
-// ID fields, returning the comment if found. Assumes b.mtx is write-locked.
-func (b *CommentBox) deleteTaskComment(c *types.TaskComment) (*types.TaskComment, error) {
- if c.Repo == "" || c.Revision == "" || c.Name == "" || util.TimeIsZero(c.Timestamp) {
- return nil, fmt.Errorf("TaskComment missing required fields. %#v", c)
- }
- if rc, ok := b.comments[c.Repo]; ok {
- if cSlice, ok := rc.TaskComments[c.Revision][c.Name]; ok {
- // Assume linear search is fast.
- for i, existing := range cSlice {
- if existing.Timestamp.Equal(c.Timestamp) {
- if len(cSlice) > 1 {
- rc.TaskComments[c.Revision][c.Name] = append(cSlice[:i], cSlice[i+1:]...)
- } else {
- delete(rc.TaskComments[c.Revision], c.Name)
- if len(rc.TaskComments[c.Revision]) == 0 {
- delete(rc.TaskComments, c.Revision)
- }
- }
- return existing, nil
- }
- }
- }
- }
- return nil, nil
-}
-
-// See documentation for CommentDB.PutTaskComment.
-func (b *CommentBox) PutTaskComment(c *types.TaskComment) error {
- b.mtx.Lock()
- defer b.mtx.Unlock()
- if err := b.putTaskComment(c); err != nil {
- return err
- }
- if err := b.write(); err != nil {
- // If write returns an error, we must revert to previous.
- if _, delErr := b.deleteTaskComment(c); delErr != nil {
- sklog.Warningf("Unexpected error: %s", delErr)
- }
- return err
- }
- return nil
-}
-
-// See documentation for CommentDB.DeleteTaskComment.
-func (b *CommentBox) DeleteTaskComment(c *types.TaskComment) error {
- b.mtx.Lock()
- defer b.mtx.Unlock()
- existing, err := b.deleteTaskComment(c)
- if err != nil {
- return err
- }
- if existing != nil {
- if err := b.write(); err != nil {
- // If write returns an error, we must revert to previous.
- if putErr := b.putTaskComment(existing); putErr != nil {
- sklog.Warningf("Unexpected error: %s", putErr)
- }
- return err
- }
- }
- return nil
-}
-
-// putTaskSpecComment validates c and adds c to b.comments, or returns
-// ErrAlreadyExists if a different comment has the same ID fields. Assumes b.mtx
-// is write-locked.
-func (b *CommentBox) putTaskSpecComment(c *types.TaskSpecComment) error {
- if c.Repo == "" || c.Name == "" || util.TimeIsZero(c.Timestamp) {
- return fmt.Errorf("TaskSpecComment missing required fields. %#v", c)
- }
- rc := b.getRepoComments(c.Repo)
- cSlice := rc.TaskSpecComments[c.Name]
- if len(cSlice) > 0 {
- // Assume comments normally inserted at the end.
- insert := 0
- for i := len(cSlice) - 1; i >= 0; i-- {
- if cSlice[i].Timestamp.Equal(c.Timestamp) {
- if *cSlice[i] == *c {
- return nil
- } else {
- return ErrAlreadyExists
- }
- } else if cSlice[i].Timestamp.Before(c.Timestamp) {
- insert = i + 1
- break
- }
- }
- // Ensure capacity for another comment and move any comments after the
- // insertion point.
- cSlice = append(cSlice, nil)
- copy(cSlice[insert+1:], cSlice[insert:])
- cSlice[insert] = c.Copy()
- } else {
- cSlice = []*types.TaskSpecComment{c.Copy()}
- }
- rc.TaskSpecComments[c.Name] = cSlice
- return nil
-}
-
-// deleteTaskSpecComment validates c, then finds and removes a comment matching
-// c's ID fields, returning the comment if found. Assumes b.mtx is write-locked.
-func (b *CommentBox) deleteTaskSpecComment(c *types.TaskSpecComment) (*types.TaskSpecComment, error) {
- if c.Repo == "" || c.Name == "" || util.TimeIsZero(c.Timestamp) {
- return nil, fmt.Errorf("TaskSpecComment missing required fields. %#v", c)
- }
- if rc, ok := b.comments[c.Repo]; ok {
- if cSlice, ok := rc.TaskSpecComments[c.Name]; ok {
- // Assume linear search is fast.
- for i, existing := range cSlice {
- if existing.Timestamp.Equal(c.Timestamp) {
- if len(cSlice) > 1 {
- rc.TaskSpecComments[c.Name] = append(cSlice[:i], cSlice[i+1:]...)
- } else {
- delete(rc.TaskSpecComments, c.Name)
- }
- return existing, nil
- }
- }
- }
- }
- return nil, nil
-}
-
-// See documentation for CommentDB.PutTaskSpecComment.
-func (b *CommentBox) PutTaskSpecComment(c *types.TaskSpecComment) error {
- b.mtx.Lock()
- defer b.mtx.Unlock()
- if err := b.putTaskSpecComment(c); err != nil {
- return err
- }
- if err := b.write(); err != nil {
- // If write returns an error, we must revert to previous.
- if _, delErr := b.deleteTaskSpecComment(c); delErr != nil {
- sklog.Warningf("Unexpected error: %s", delErr)
- }
- return err
- }
- return nil
-}
-
-// See documentation for CommentDB.DeleteTaskSpecComment.
-func (b *CommentBox) DeleteTaskSpecComment(c *types.TaskSpecComment) error {
- b.mtx.Lock()
- defer b.mtx.Unlock()
- existing, err := b.deleteTaskSpecComment(c)
- if err != nil {
- return err
- }
- if existing != nil {
- if err := b.write(); err != nil {
- // If write returns an error, we must revert to previous.
- if putErr := b.putTaskSpecComment(existing); putErr != nil {
- sklog.Warningf("Unexpected error: %s", putErr)
- }
- return err
- }
- }
- return nil
-}
-
-// putCommitComment validates c and adds c to b.comments, or returns
-// ErrAlreadyExists if a different comment has the same ID fields. Assumes b.mtx
-// is write-locked.
-func (b *CommentBox) putCommitComment(c *types.CommitComment) error {
- if c.Repo == "" || c.Revision == "" || util.TimeIsZero(c.Timestamp) {
- return fmt.Errorf("CommitComment missing required fields. %#v", c)
- }
- rc := b.getRepoComments(c.Repo)
- cSlice := rc.CommitComments[c.Revision]
- if len(cSlice) > 0 {
- // Assume comments normally inserted at the end.
- insert := 0
- for i := len(cSlice) - 1; i >= 0; i-- {
- if cSlice[i].Timestamp.Equal(c.Timestamp) {
- if *cSlice[i] == *c {
- return nil
- } else {
- return ErrAlreadyExists
- }
- } else if cSlice[i].Timestamp.Before(c.Timestamp) {
- insert = i + 1
- break
- }
- }
- // Ensure capacity for another comment and move any comments after the
- // insertion point.
- cSlice = append(cSlice, nil)
- copy(cSlice[insert+1:], cSlice[insert:])
- cSlice[insert] = c.Copy()
- } else {
- cSlice = []*types.CommitComment{c.Copy()}
- }
- rc.CommitComments[c.Revision] = cSlice
- return nil
-}
-
-// deleteCommitComment validates c, then finds and removes a comment matching
-// c's ID fields, returning the comment if found. Assumes b.mtx is write-locked.
-func (b *CommentBox) deleteCommitComment(c *types.CommitComment) (*types.CommitComment, error) {
- if c.Repo == "" || c.Revision == "" || util.TimeIsZero(c.Timestamp) {
- return nil, fmt.Errorf("CommitComment missing required fields. %#v", c)
- }
- if rc, ok := b.comments[c.Repo]; ok {
- if cSlice, ok := rc.CommitComments[c.Revision]; ok {
- // Assume linear search is fast.
- for i, existing := range cSlice {
- if existing.Timestamp.Equal(c.Timestamp) {
- if len(cSlice) > 1 {
- rc.CommitComments[c.Revision] = append(cSlice[:i], cSlice[i+1:]...)
- } else {
- delete(rc.CommitComments, c.Revision)
- }
- return existing, nil
- }
- }
- }
- }
- return nil, nil
-}
-
-// See documentation for CommentDB.PutCommitComment.
-func (b *CommentBox) PutCommitComment(c *types.CommitComment) error {
- b.mtx.Lock()
- defer b.mtx.Unlock()
- if err := b.putCommitComment(c); err != nil {
- return err
- }
- if err := b.write(); err != nil {
- // If write returns an error, we must revert to previous.
- if _, delErr := b.deleteCommitComment(c); delErr != nil {
- sklog.Warningf("Unexpected error: %s", delErr)
- }
- return err
- }
- return nil
-}
-
-// See documentation for CommentDB.DeleteCommitComment.
-func (b *CommentBox) DeleteCommitComment(c *types.CommitComment) error {
- b.mtx.Lock()
- defer b.mtx.Unlock()
- existing, err := b.deleteCommitComment(c)
- if err != nil {
- return err
- }
- if existing != nil {
- if err := b.write(); err != nil {
- // If write returns an error, we must revert to previous.
- if putErr := b.putCommitComment(existing); putErr != nil {
- sklog.Warningf("Unexpected error: %s", putErr)
- }
- return err
- }
- }
- return nil
-}
diff --git a/task_scheduler/go/db/db.go b/task_scheduler/go/db/db.go
index ce5e259..dd7215f 100644
--- a/task_scheduler/go/db/db.go
+++ b/task_scheduler/go/db/db.go
@@ -265,6 +265,30 @@
PutJobs([]*types.Job) error
}
+// ModifiedData combines ModifiedTasks, ModifiedJobs, and ModifiedComments.
+type ModifiedData interface {
+ ModifiedTasks
+ ModifiedJobs
+ ModifiedComments
+}
+
+// modifiedData implements ModifiedData.
+type modifiedData struct {
+ ModifiedTasks
+ ModifiedJobs
+ ModifiedComments
+}
+
+// NewModifiedData returns a ModifiedData which combines the given
+// ModifiedTasks, ModifiedJobs, and ModifiedComments.
+func NewModifiedData(t ModifiedTasks, j ModifiedJobs, c ModifiedComments) ModifiedData {
+ return &modifiedData{
+ ModifiedTasks: t,
+ ModifiedJobs: j,
+ ModifiedComments: c,
+ }
+}
+
// UpdateJobsWithRetries wraps a call to db.PutJobs with retries. It calls
// db.PutJobs(f()) repeatedly until one of the following happen:
// - f or db.PutJobs returns an error, which is then returned from
diff --git a/task_scheduler/go/db/firestore/comments.go b/task_scheduler/go/db/firestore/comments.go
index 4fc0234..befd147 100644
--- a/task_scheduler/go/db/firestore/comments.go
+++ b/task_scheduler/go/db/firestore/comments.go
@@ -123,14 +123,34 @@
if st, ok := status.FromError(err); ok && st.Code() == codes.AlreadyExists {
return db.ErrAlreadyExists
}
- return err
+ if err != nil {
+ return err
+ }
+ d.TrackModifiedTaskComment(c)
+ return nil
}
// See documentation for db.CommentDB interface.
func (d *firestoreDB) DeleteTaskComment(c *types.TaskComment) error {
id := taskCommentId(c)
- _, err := firestore.Delete(d.taskComments().Doc(id), DEFAULT_ATTEMPTS, PUT_SINGLE_TIMEOUT)
- return err
+ ref := d.taskComments().Doc(id)
+ exists := true
+ if _, err := firestore.Get(ref, DEFAULT_ATTEMPTS, GET_SINGLE_TIMEOUT); err != nil {
+ if st, ok := status.FromError(err); ok && st.Code() == codes.NotFound {
+ exists = false
+ } else {
+ return err
+ }
+ }
+ if exists {
+ if _, err := firestore.Delete(ref, DEFAULT_ATTEMPTS, PUT_SINGLE_TIMEOUT); err != nil {
+ return err
+ }
+ deleted := true
+ c.Deleted = &deleted
+ d.TrackModifiedTaskComment(c)
+ }
+ return nil
}
// taskSpecCommentId returns an ID for the TaskSpecComment.
@@ -146,14 +166,34 @@
if st, ok := status.FromError(err); ok && st.Code() == codes.AlreadyExists {
return db.ErrAlreadyExists
}
- return err
+ if err != nil {
+ return err
+ }
+ d.TrackModifiedTaskSpecComment(c)
+ return nil
}
// See documentation for db.CommentDB interface.
func (d *firestoreDB) DeleteTaskSpecComment(c *types.TaskSpecComment) error {
id := taskSpecCommentId(c)
- _, err := firestore.Delete(d.taskSpecComments().Doc(id), DEFAULT_ATTEMPTS, PUT_SINGLE_TIMEOUT)
- return err
+ ref := d.taskSpecComments().Doc(id)
+ exists := true
+ if _, err := firestore.Get(ref, DEFAULT_ATTEMPTS, GET_SINGLE_TIMEOUT); err != nil {
+ if st, ok := status.FromError(err); ok && st.Code() == codes.NotFound {
+ exists = false
+ } else {
+ return err
+ }
+ }
+ if exists {
+ if _, err := firestore.Delete(ref, DEFAULT_ATTEMPTS, PUT_SINGLE_TIMEOUT); err != nil {
+ return err
+ }
+ deleted := true
+ c.Deleted = &deleted
+ d.TrackModifiedTaskSpecComment(c)
+ }
+ return nil
}
// commitCommentId returns an ID for the CommitComment.
@@ -169,12 +209,32 @@
if st, ok := status.FromError(err); ok && st.Code() == codes.AlreadyExists {
return db.ErrAlreadyExists
}
- return err
+ if err != nil {
+ return err
+ }
+ d.TrackModifiedCommitComment(c)
+ return nil
}
// See documentation for db.CommentDB interface.
func (d *firestoreDB) DeleteCommitComment(c *types.CommitComment) error {
id := commitCommentId(c)
- _, err := firestore.Delete(d.commitComments().Doc(id), DEFAULT_ATTEMPTS, PUT_SINGLE_TIMEOUT)
- return err
+ ref := d.commitComments().Doc(id)
+ exists := true
+ if _, err := firestore.Get(ref, DEFAULT_ATTEMPTS, GET_SINGLE_TIMEOUT); err != nil {
+ if st, ok := status.FromError(err); ok && st.Code() == codes.NotFound {
+ exists = false
+ } else {
+ return err
+ }
+ }
+ if exists {
+ if _, err := firestore.Delete(d.commitComments().Doc(id), DEFAULT_ATTEMPTS, PUT_SINGLE_TIMEOUT); err != nil {
+ return err
+ }
+ deleted := true
+ c.Deleted = &deleted
+ d.TrackModifiedCommitComment(c)
+ }
+ return nil
}
diff --git a/task_scheduler/go/db/firestore/firestore.go b/task_scheduler/go/db/firestore/firestore.go
index ca9f388..5607ca8 100644
--- a/task_scheduler/go/db/firestore/firestore.go
+++ b/task_scheduler/go/db/firestore/firestore.go
@@ -56,31 +56,24 @@
client *firestore.Client
parentDoc string
- // ModifiedTasksImpl and ModifiedJobsImpl are embedded in order to
- // implement db.ModifiedTasksReader and db.ModifiedJobsReader.
- db.ModifiedTasks
- db.ModifiedJobs
+ db.ModifiedData
}
// NewDB returns a db.DB which uses Cloud Firestore for storage. The parentDoc
// parameter is optional and indicates the path of a parent document to which
// all collections within the DB will belong. If it is not supplied, then the
// collections will be at the top level.
-func NewDB(ctx context.Context, project, instance string, ts oauth2.TokenSource, modTasks db.ModifiedTasks, modJobs db.ModifiedJobs) (db.BackupDBCloser, error) {
+func NewDB(ctx context.Context, project, instance string, ts oauth2.TokenSource, mod db.ModifiedData) (db.BackupDBCloser, error) {
client, err := firestore.NewClient(ctx, project, firestore.APP_TASK_SCHEDULER, instance, ts)
if err != nil {
return nil, err
}
- if modTasks == nil {
- modTasks = &modified.ModifiedTasksImpl{}
- }
- if modJobs == nil {
- modJobs = &modified.ModifiedJobsImpl{}
+ if mod == nil {
+ mod = modified.NewModifiedData()
}
return &firestoreDB{
- client: client,
- ModifiedTasks: modTasks,
- ModifiedJobs: modJobs,
+ client: client,
+ ModifiedData: mod,
}, nil
}
diff --git a/task_scheduler/go/db/firestore/firestore_test.go b/task_scheduler/go/db/firestore/firestore_test.go
index be7579e..4002133 100644
--- a/task_scheduler/go/db/firestore/firestore_test.go
+++ b/task_scheduler/go/db/firestore/firestore_test.go
@@ -24,7 +24,7 @@
testutils.MediumTest(t)
testutils.ManualTest(t)
instance := fmt.Sprintf("test-%s", uuid.New())
- d, err := NewDB(context.Background(), "skia-firestore", instance, nil, nil, nil)
+ d, err := NewDB(context.Background(), "skia-firestore", instance, nil, nil)
assert.NoError(t, err)
cleanup := func() {
c := d.(*firestoreDB).client
diff --git a/task_scheduler/go/db/local_db/busywork/main.go b/task_scheduler/go/db/local_db/busywork/main.go
index 01844df..41b083c 100644
--- a/task_scheduler/go/db/local_db/busywork/main.go
+++ b/task_scheduler/go/db/local_db/busywork/main.go
@@ -520,7 +520,7 @@
sklog.Fatal(err)
}
id := fmt.Sprintf("busywork_%s", hostname)
- d, err := firestore.NewDB(context.Background(), "skia-firestore", id, ts, nil, nil)
+ d, err := firestore.NewDB(context.Background(), "skia-firestore", id, ts, nil)
if err != nil {
sklog.Fatal(err)
}
diff --git a/task_scheduler/go/db/local_db/local_db.go b/task_scheduler/go/db/local_db/local_db.go
index 3109a63..689ecfe 100644
--- a/task_scheduler/go/db/local_db/local_db.go
+++ b/task_scheduler/go/db/local_db/local_db.go
@@ -18,6 +18,7 @@
"go.skia.org/infra/go/sklog"
"go.skia.org/infra/go/util"
"go.skia.org/infra/task_scheduler/go/db"
+ "go.skia.org/infra/task_scheduler/go/db/memory"
"go.skia.org/infra/task_scheduler/go/db/modified"
"go.skia.org/infra/task_scheduler/go/types"
)
@@ -230,12 +231,11 @@
// ModifiedTasksImpl and ModifiedJobsImpl are embedded in order to
// implement db.ModifiedTasksReader and db.ModifiedJobsReader.
- db.ModifiedTasks
- db.ModifiedJobs
+ db.ModifiedData
// CommentBox is embedded in order to implement db.CommentDB. CommentBox uses
// this localDB to persist the comments.
- *db.CommentBox
+ *memory.CommentBox
// Close will send on each of these channels to indicate goroutines should
// stop.
@@ -316,16 +316,13 @@
}
// NewDB returns a local DB instance.
-func NewDB(name, filename string, modTasks db.ModifiedTasks, modJobs db.ModifiedJobs) (db.BackupDBCloser, error) {
+func NewDB(name, filename string, mod db.ModifiedData) (db.BackupDBCloser, error) {
boltdb, err := bolt.Open(filename, 0600, nil)
if err != nil {
return nil, err
}
- if modTasks == nil {
- modTasks = &modified.ModifiedTasksImpl{}
- }
- if modJobs == nil {
- modJobs = &modified.ModifiedJobsImpl{}
+ if mod == nil {
+ mod = modified.NewModifiedData()
}
d := &localDB{
name: name,
@@ -384,8 +381,7 @@
"bucket": BUCKET_JOBS,
"count": "rows",
}),
- ModifiedTasks: modTasks,
- ModifiedJobs: modJobs,
+ ModifiedData: mod,
}
stopReportActiveTx := make(chan bool)
@@ -432,7 +428,7 @@
return nil, err
}
- d.CommentBox = db.NewCommentBoxWithPersistence(comments, d.writeCommentsMap)
+ d.CommentBox = memory.NewCommentBoxWithPersistence(mod, comments, d.writeCommentsMap)
if dbMetric, err := boltutil.NewDbMetric(boltdb, []string{BUCKET_TASKS, BUCKET_JOBS, BUCKET_COMMENTS}, map[string]string{"database": name}); err != nil {
return nil, err
@@ -533,7 +529,7 @@
d.metricReadTaskQueries.Inc(1)
min := []byte(start.Add(-MAX_CREATED_TIME_SKEW).UTC().Format(TIMESTAMP_FORMAT))
max := []byte(end.Add(MAX_CREATED_TIME_SKEW).UTC().Format(TIMESTAMP_FORMAT))
- decoder := types.TaskDecoder{}
+ decoder := types.NewTaskDecoder()
if err := d.view("GetTasksFromDateRange", func(tx *bolt.Tx) error {
c := tasksBucket(tx).Cursor()
for k, v := c.Seek(min); k != nil && bytes.Compare(k, max) <= 0; k, v = c.Next() {
@@ -740,7 +736,7 @@
d.metricReadJobQueries.Inc(1)
min := []byte(start.UTC().Format(TIMESTAMP_FORMAT))
max := []byte(end.UTC().Format(TIMESTAMP_FORMAT))
- decoder := types.JobDecoder{}
+ decoder := types.NewJobDecoder()
if err := d.view("GetJobsFromDateRange", func(tx *bolt.Tx) error {
c := jobsBucket(tx).Cursor()
for k, v := c.Seek(min); k != nil && bytes.Compare(k, max) <= 0; k, v = c.Next() {
diff --git a/task_scheduler/go/db/local_db/local_db_test.go b/task_scheduler/go/db/local_db/local_db_test.go
index 22b9649..ee2f1e9 100644
--- a/task_scheduler/go/db/local_db/local_db_test.go
+++ b/task_scheduler/go/db/local_db/local_db_test.go
@@ -150,7 +150,7 @@
func makeDB(t *testing.T, name string) (db.BackupDBCloser, string) {
tmpdir, err := ioutil.TempDir("", name)
assert.NoError(t, err)
- d, err := NewDB(name, filepath.Join(tmpdir, "task.db"), nil, nil)
+ d, err := NewDB(name, filepath.Join(tmpdir, "task.db"), nil)
assert.NoError(t, err)
return d, tmpdir
}
diff --git a/task_scheduler/go/db/local_db/ts_local_db_viewer/main.go b/task_scheduler/go/db/local_db/ts_local_db_viewer/main.go
index 87c44aa..00f7412 100644
--- a/task_scheduler/go/db/local_db/ts_local_db_viewer/main.go
+++ b/task_scheduler/go/db/local_db/ts_local_db_viewer/main.go
@@ -28,7 +28,7 @@
// Global init.
common.Init()
- d, err := local_db.NewDB(local_db.DB_NAME, *dbfile, nil, nil)
+ d, err := local_db.NewDB(local_db.DB_NAME, *dbfile, nil)
if err != nil {
sklog.Fatal(err)
}
diff --git a/task_scheduler/go/db/memory/comments.go b/task_scheduler/go/db/memory/comments.go
new file mode 100644
index 0000000..122f7bc
--- /dev/null
+++ b/task_scheduler/go/db/memory/comments.go
@@ -0,0 +1,399 @@
+package memory
+
+import (
+ "fmt"
+ "sync"
+ "time"
+
+ "go.skia.org/infra/go/sklog"
+ "go.skia.org/infra/go/util"
+ "go.skia.org/infra/task_scheduler/go/db"
+ "go.skia.org/infra/task_scheduler/go/db/modified"
+ "go.skia.org/infra/task_scheduler/go/types"
+)
+
+// CommentBox implements CommentDB with in-memory storage.
+//
+// When created via NewCommentBoxWithPersistence, CommentBox will persist the
+// in-memory representation on every change using the provided writer function.
+type CommentBox struct {
+ db.ModifiedComments
+
+ // mtx protects comments.
+ mtx sync.RWMutex
+ // comments is map[repo_name]*types.RepoComments.
+ comments map[string]*types.RepoComments
+ // writer is called to persist comments after every change.
+ writer func(map[string]*types.RepoComments) error
+}
+
+// NewCommentBoxWithPersistence creates a CommentBox that is initialized with
+// init and sends the updated in-memory representation to writer after each
+// change. The value of init and the argument to writer is
+// map[repo_name]*types.RepoComments. init must not be modified by the caller. writer
+// must not call any methods of CommentBox. writer may return an error to
+// prevent a change from taking effect.
+func NewCommentBoxWithPersistence(modComments db.ModifiedComments, init map[string]*types.RepoComments, writer func(map[string]*types.RepoComments) error) *CommentBox {
+ if modComments == nil {
+ modComments = &modified.ModifiedCommentsImpl{}
+ }
+ return &CommentBox{
+ ModifiedComments: modComments,
+ comments: init,
+ writer: writer,
+ }
+}
+
+// See documentation for CommentDB.GetCommentsForRepos.
+func (b *CommentBox) GetCommentsForRepos(repos []string, from time.Time) ([]*types.RepoComments, error) {
+ b.mtx.RLock()
+ defer b.mtx.RUnlock()
+ rv := make([]*types.RepoComments, len(repos))
+ for i, repo := range repos {
+ if rc, ok := b.comments[repo]; ok {
+ rv[i] = rc.Copy()
+ } else {
+ rv[i] = &types.RepoComments{Repo: repo}
+ }
+ }
+ return rv, nil
+}
+
+// write calls b.writer with comments if non-null.
+func (b *CommentBox) write() error {
+ if b.writer == nil {
+ return nil
+ }
+ return b.writer(b.comments)
+}
+
+// getRepoComments returns the initialized *types.RepoComments for the given repo.
+func (b *CommentBox) getRepoComments(repo string) *types.RepoComments {
+ if b.comments == nil {
+ b.comments = make(map[string]*types.RepoComments, 1)
+ }
+ rc, ok := b.comments[repo]
+ if !ok {
+ rc = &types.RepoComments{
+ Repo: repo,
+ TaskComments: map[string]map[string][]*types.TaskComment{},
+ TaskSpecComments: map[string][]*types.TaskSpecComment{},
+ CommitComments: map[string][]*types.CommitComment{},
+ }
+ b.comments[repo] = rc
+ }
+ return rc
+}
+
+// putTaskComment validates c and adds c to b.comments, or returns
+// db.ErrAlreadyExists if a different comment has the same ID fields. Assumes b.mtx
+// is write-locked.
+func (b *CommentBox) putTaskComment(c *types.TaskComment) error {
+ if c.Repo == "" || c.Revision == "" || c.Name == "" || util.TimeIsZero(c.Timestamp) {
+ return fmt.Errorf("TaskComment missing required fields. %#v", c)
+ }
+ rc := b.getRepoComments(c.Repo)
+ nameMap, ok := rc.TaskComments[c.Revision]
+ if !ok {
+ nameMap = map[string][]*types.TaskComment{}
+ rc.TaskComments[c.Revision] = nameMap
+ }
+ cSlice := nameMap[c.Name]
+ // TODO(benjaminwagner): Would using utilities in the sort package make this
+ // cleaner?
+ if len(cSlice) > 0 {
+ // Assume comments normally inserted at the end.
+ insert := 0
+ for i := len(cSlice) - 1; i >= 0; i-- {
+ if cSlice[i].Timestamp.Equal(c.Timestamp) {
+ if *cSlice[i] == *c {
+ return nil
+ } else {
+ return db.ErrAlreadyExists
+ }
+ } else if cSlice[i].Timestamp.Before(c.Timestamp) {
+ insert = i + 1
+ break
+ }
+ }
+ // Ensure capacity for another comment and move any comments after the
+ // insertion point.
+ cSlice = append(cSlice, nil)
+ copy(cSlice[insert+1:], cSlice[insert:])
+ cSlice[insert] = c.Copy()
+ } else {
+ cSlice = []*types.TaskComment{c.Copy()}
+ }
+ nameMap[c.Name] = cSlice
+ return nil
+}
+
+// deleteTaskComment validates c, then finds and removes a comment matching c's
+// ID fields, returning the comment if found. Assumes b.mtx is write-locked.
+func (b *CommentBox) deleteTaskComment(c *types.TaskComment) (*types.TaskComment, error) {
+ if c.Repo == "" || c.Revision == "" || c.Name == "" || util.TimeIsZero(c.Timestamp) {
+ return nil, fmt.Errorf("TaskComment missing required fields. %#v", c)
+ }
+ if rc, ok := b.comments[c.Repo]; ok {
+ if cSlice, ok := rc.TaskComments[c.Revision][c.Name]; ok {
+ // Assume linear search is fast.
+ for i, existing := range cSlice {
+ if existing.Timestamp.Equal(c.Timestamp) {
+ if len(cSlice) > 1 {
+ rc.TaskComments[c.Revision][c.Name] = append(cSlice[:i], cSlice[i+1:]...)
+ } else {
+ delete(rc.TaskComments[c.Revision], c.Name)
+ if len(rc.TaskComments[c.Revision]) == 0 {
+ delete(rc.TaskComments, c.Revision)
+ }
+ }
+ return existing, nil
+ }
+ }
+ }
+ }
+ return nil, nil
+}
+
+// See documentation for CommentDB.PutTaskComment.
+func (b *CommentBox) PutTaskComment(c *types.TaskComment) error {
+ b.mtx.Lock()
+ defer b.mtx.Unlock()
+ if err := b.putTaskComment(c); err != nil {
+ return err
+ }
+ if err := b.write(); err != nil {
+ // If write returns an error, we must revert to previous.
+ if _, delErr := b.deleteTaskComment(c); delErr != nil {
+ sklog.Warningf("Unexpected error: %s", delErr)
+ }
+ return err
+ }
+ b.TrackModifiedTaskComment(c)
+ return nil
+}
+
+// See documentation for CommentDB.DeleteTaskComment.
+func (b *CommentBox) DeleteTaskComment(c *types.TaskComment) error {
+ b.mtx.Lock()
+ defer b.mtx.Unlock()
+ existing, err := b.deleteTaskComment(c)
+ if err != nil {
+ return err
+ }
+ if existing != nil {
+ if err := b.write(); err != nil {
+ // If write returns an error, we must revert to previous.
+ if putErr := b.putTaskComment(existing); putErr != nil {
+ sklog.Warningf("Unexpected error: %s", putErr)
+ }
+ return err
+ }
+ deleted := true
+ c.Deleted = &deleted
+ b.TrackModifiedTaskComment(c)
+ }
+ return nil
+}
+
+// putTaskSpecComment validates c and adds c to b.comments, or returns
+// db.ErrAlreadyExists if a different comment has the same ID fields. Assumes b.mtx
+// is write-locked.
+func (b *CommentBox) putTaskSpecComment(c *types.TaskSpecComment) error {
+ if c.Repo == "" || c.Name == "" || util.TimeIsZero(c.Timestamp) {
+ return fmt.Errorf("TaskSpecComment missing required fields. %#v", c)
+ }
+ rc := b.getRepoComments(c.Repo)
+ cSlice := rc.TaskSpecComments[c.Name]
+ if len(cSlice) > 0 {
+ // Assume comments normally inserted at the end.
+ insert := 0
+ for i := len(cSlice) - 1; i >= 0; i-- {
+ if cSlice[i].Timestamp.Equal(c.Timestamp) {
+ if *cSlice[i] == *c {
+ return nil
+ } else {
+ return db.ErrAlreadyExists
+ }
+ } else if cSlice[i].Timestamp.Before(c.Timestamp) {
+ insert = i + 1
+ break
+ }
+ }
+ // Ensure capacity for another comment and move any comments after the
+ // insertion point.
+ cSlice = append(cSlice, nil)
+ copy(cSlice[insert+1:], cSlice[insert:])
+ cSlice[insert] = c.Copy()
+ } else {
+ cSlice = []*types.TaskSpecComment{c.Copy()}
+ }
+ rc.TaskSpecComments[c.Name] = cSlice
+ return nil
+}
+
+// deleteTaskSpecComment validates c, then finds and removes a comment matching
+// c's ID fields, returning the comment if found. Assumes b.mtx is write-locked.
+func (b *CommentBox) deleteTaskSpecComment(c *types.TaskSpecComment) (*types.TaskSpecComment, error) {
+ if c.Repo == "" || c.Name == "" || util.TimeIsZero(c.Timestamp) {
+ return nil, fmt.Errorf("TaskSpecComment missing required fields. %#v", c)
+ }
+ if rc, ok := b.comments[c.Repo]; ok {
+ if cSlice, ok := rc.TaskSpecComments[c.Name]; ok {
+ // Assume linear search is fast.
+ for i, existing := range cSlice {
+ if existing.Timestamp.Equal(c.Timestamp) {
+ if len(cSlice) > 1 {
+ rc.TaskSpecComments[c.Name] = append(cSlice[:i], cSlice[i+1:]...)
+ } else {
+ delete(rc.TaskSpecComments, c.Name)
+ }
+ return existing, nil
+ }
+ }
+ }
+ }
+ return nil, nil
+}
+
+// See documentation for CommentDB.PutTaskSpecComment.
+func (b *CommentBox) PutTaskSpecComment(c *types.TaskSpecComment) error {
+ b.mtx.Lock()
+ defer b.mtx.Unlock()
+ if err := b.putTaskSpecComment(c); err != nil {
+ return err
+ }
+ if err := b.write(); err != nil {
+ // If write returns an error, we must revert to previous.
+ if _, delErr := b.deleteTaskSpecComment(c); delErr != nil {
+ sklog.Warningf("Unexpected error: %s", delErr)
+ }
+ return err
+ }
+ b.TrackModifiedTaskSpecComment(c)
+ return nil
+}
+
+// See documentation for CommentDB.DeleteTaskSpecComment.
+func (b *CommentBox) DeleteTaskSpecComment(c *types.TaskSpecComment) error {
+ b.mtx.Lock()
+ defer b.mtx.Unlock()
+ existing, err := b.deleteTaskSpecComment(c)
+ if err != nil {
+ return err
+ }
+ if existing != nil {
+ if err := b.write(); err != nil {
+ // If write returns an error, we must revert to previous.
+ if putErr := b.putTaskSpecComment(existing); putErr != nil {
+ sklog.Warningf("Unexpected error: %s", putErr)
+ }
+ return err
+ }
+ deleted := true
+ c.Deleted = &deleted
+ b.TrackModifiedTaskSpecComment(c)
+ }
+ return nil
+}
+
+// putCommitComment validates c and adds c to b.comments, or returns
+// db.ErrAlreadyExists if a different comment has the same ID fields. Assumes b.mtx
+// is write-locked.
+func (b *CommentBox) putCommitComment(c *types.CommitComment) error {
+ if c.Repo == "" || c.Revision == "" || util.TimeIsZero(c.Timestamp) {
+ return fmt.Errorf("CommitComment missing required fields. %#v", c)
+ }
+ rc := b.getRepoComments(c.Repo)
+ cSlice := rc.CommitComments[c.Revision]
+ if len(cSlice) > 0 {
+ // Assume comments normally inserted at the end.
+ insert := 0
+ for i := len(cSlice) - 1; i >= 0; i-- {
+ if cSlice[i].Timestamp.Equal(c.Timestamp) {
+ if *cSlice[i] == *c {
+ return nil
+ } else {
+ return db.ErrAlreadyExists
+ }
+ } else if cSlice[i].Timestamp.Before(c.Timestamp) {
+ insert = i + 1
+ break
+ }
+ }
+ // Ensure capacity for another comment and move any comments after the
+ // insertion point.
+ cSlice = append(cSlice, nil)
+ copy(cSlice[insert+1:], cSlice[insert:])
+ cSlice[insert] = c.Copy()
+ } else {
+ cSlice = []*types.CommitComment{c.Copy()}
+ }
+ rc.CommitComments[c.Revision] = cSlice
+ return nil
+}
+
+// deleteCommitComment validates c, then finds and removes a comment matching
+// c's ID fields, returning the comment if found. Assumes b.mtx is write-locked.
+func (b *CommentBox) deleteCommitComment(c *types.CommitComment) (*types.CommitComment, error) {
+ if c.Repo == "" || c.Revision == "" || util.TimeIsZero(c.Timestamp) {
+ return nil, fmt.Errorf("CommitComment missing required fields. %#v", c)
+ }
+ if rc, ok := b.comments[c.Repo]; ok {
+ if cSlice, ok := rc.CommitComments[c.Revision]; ok {
+ // Assume linear search is fast.
+ for i, existing := range cSlice {
+ if existing.Timestamp.Equal(c.Timestamp) {
+ if len(cSlice) > 1 {
+ rc.CommitComments[c.Revision] = append(cSlice[:i], cSlice[i+1:]...)
+ } else {
+ delete(rc.CommitComments, c.Revision)
+ }
+ return existing, nil
+ }
+ }
+ }
+ }
+ return nil, nil
+}
+
+// See documentation for CommentDB.PutCommitComment.
+func (b *CommentBox) PutCommitComment(c *types.CommitComment) error {
+ b.mtx.Lock()
+ defer b.mtx.Unlock()
+ if err := b.putCommitComment(c); err != nil {
+ return err
+ }
+ if err := b.write(); err != nil {
+ // If write returns an error, we must revert to previous.
+ if _, delErr := b.deleteCommitComment(c); delErr != nil {
+ sklog.Warningf("Unexpected error: %s", delErr)
+ }
+ return err
+ }
+ b.TrackModifiedCommitComment(c)
+ return nil
+}
+
+// See documentation for CommentDB.DeleteCommitComment.
+func (b *CommentBox) DeleteCommitComment(c *types.CommitComment) error {
+ b.mtx.Lock()
+ defer b.mtx.Unlock()
+ existing, err := b.deleteCommitComment(c)
+ if err != nil {
+ return err
+ }
+ if existing != nil {
+ if err := b.write(); err != nil {
+ // If write returns an error, we must revert to previous.
+ if putErr := b.putCommitComment(existing); putErr != nil {
+ sklog.Warningf("Unexpected error: %s", putErr)
+ }
+ return err
+ }
+ deleted := true
+ c.Deleted = &deleted
+ b.TrackModifiedCommitComment(c)
+ }
+ return nil
+}
diff --git a/task_scheduler/go/db/comments_test.go b/task_scheduler/go/db/memory/comments_test.go
similarity index 63%
rename from task_scheduler/go/db/comments_test.go
rename to task_scheduler/go/db/memory/comments_test.go
index a00e224..54f1914 100644
--- a/task_scheduler/go/db/comments_test.go
+++ b/task_scheduler/go/db/memory/comments_test.go
@@ -1,26 +1,22 @@
-package db
+package memory
import (
"fmt"
- "os"
"testing"
"time"
assert "github.com/stretchr/testify/require"
"go.skia.org/infra/go/deepequal"
"go.skia.org/infra/go/testutils"
+ "go.skia.org/infra/task_scheduler/go/db"
+ "go.skia.org/infra/task_scheduler/go/db/modified"
"go.skia.org/infra/task_scheduler/go/types"
)
-func TestMain(m *testing.M) {
- AssertDeepEqual = deepequal.AssertDeepEqual
- os.Exit(m.Run())
-}
-
// TestCommentBox checks that CommentBox correctly implements CommentDB.
func TestCommentBox(t *testing.T) {
testutils.SmallTest(t)
- TestCommentDB(t, &CommentBox{})
+ db.TestCommentDB(t, &CommentBox{ModifiedComments: &modified.ModifiedCommentsImpl{}})
}
// TestCommentBoxWithPersistence checks that NewCommentBoxWithPersistence can be
@@ -36,7 +32,7 @@
return nil
}
- db := NewCommentBoxWithPersistence(nil, testWriter)
+ d := NewCommentBoxWithPersistence(nil, nil, testWriter)
now := time.Now()
@@ -51,23 +47,23 @@
TaskSpecComments: map[string][]*types.TaskSpecComment{},
CommitComments: map[string][]*types.CommitComment{},
}
- assert.NoError(t, db.PutTaskComment(tc1))
+ assert.NoError(t, d.PutTaskComment(tc1))
tc2 := types.MakeTaskComment(2, 1, 1, 1, now.Add(2*time.Second))
expected[r1].TaskComments["c1"]["n1"] = []*types.TaskComment{tc1, tc2}
- assert.NoError(t, db.PutTaskComment(tc2))
+ assert.NoError(t, d.PutTaskComment(tc2))
tc3 := types.MakeTaskComment(3, 1, 1, 1, now.Add(time.Second))
expected[r1].TaskComments["c1"]["n1"] = []*types.TaskComment{tc1, tc3, tc2}
- assert.NoError(t, db.PutTaskComment(tc3))
+ assert.NoError(t, d.PutTaskComment(tc3))
tc4 := types.MakeTaskComment(4, 1, 1, 2, now)
expected[r1].TaskComments["c2"] = map[string][]*types.TaskComment{"n1": {tc4}}
- assert.NoError(t, db.PutTaskComment(tc4))
+ assert.NoError(t, d.PutTaskComment(tc4))
tc5 := types.MakeTaskComment(5, 1, 2, 2, now)
expected[r1].TaskComments["c2"]["n2"] = []*types.TaskComment{tc5}
- assert.NoError(t, db.PutTaskComment(tc5))
+ assert.NoError(t, d.PutTaskComment(tc5))
tc6 := types.MakeTaskComment(6, 2, 3, 3, now)
r2 := tc6.Repo
@@ -77,21 +73,21 @@
TaskSpecComments: map[string][]*types.TaskSpecComment{},
CommitComments: map[string][]*types.CommitComment{},
}
- assert.NoError(t, db.PutTaskComment(tc6))
+ assert.NoError(t, d.PutTaskComment(tc6))
tc6copy := tc6.Copy() // Adding identical comment should be ignored.
- assert.NoError(t, db.PutTaskComment(tc6copy))
+ assert.NoError(t, d.PutTaskComment(tc6copy))
tc6.Message = "modifying after Put shouldn't affect stored comment"
assert.True(t, callCount >= 6)
sc1 := types.MakeTaskSpecComment(1, 1, 1, now)
expected[r1].TaskSpecComments["n1"] = []*types.TaskSpecComment{sc1}
- assert.NoError(t, db.PutTaskSpecComment(sc1))
+ assert.NoError(t, d.PutTaskSpecComment(sc1))
cc1 := types.MakeCommitComment(1, 1, 1, now)
expected[r1].CommitComments["c1"] = []*types.CommitComment{cc1}
- assert.NoError(t, db.PutCommitComment(cc1))
+ assert.NoError(t, d.PutCommitComment(cc1))
assert.True(t, callCount >= 8)
callCount = 0
@@ -99,13 +95,13 @@
// Check that if there's an error adding, testWriter is not called.
tc1different := tc1.Copy()
tc1different.Message = "not the same"
- assert.True(t, IsAlreadyExists(db.PutTaskComment(tc1different)))
+ assert.True(t, db.IsAlreadyExists(d.PutTaskComment(tc1different)))
sc1different := sc1.Copy()
sc1different.Message = "not the same"
- assert.True(t, IsAlreadyExists(db.PutTaskSpecComment(sc1different)))
+ assert.True(t, db.IsAlreadyExists(d.PutTaskSpecComment(sc1different)))
cc1different := cc1.Copy()
cc1different.Message = "not the same"
- assert.True(t, IsAlreadyExists(db.PutCommitComment(cc1different)))
+ assert.True(t, db.IsAlreadyExists(d.PutCommitComment(cc1different)))
assert.Equal(t, 0, callCount)
@@ -114,10 +110,10 @@
r1: expected[r1].Copy(),
r2: expected[r2].Copy(),
}
- db = NewCommentBoxWithPersistence(init, testWriter)
+ d = NewCommentBoxWithPersistence(nil, init, testWriter)
{
- actual, err := db.GetCommentsForRepos([]string{"r0", r1, r2}, now.Add(-10000*time.Hour))
+ actual, err := d.GetCommentsForRepos([]string{"r0", r1, r2}, now.Add(-10000*time.Hour))
assert.NoError(t, err)
expectedSlice := []*types.RepoComments{
{Repo: "r0"},
@@ -131,28 +127,28 @@
// Delete some comments.
expected[r1].TaskComments["c1"]["n1"] = []*types.TaskComment{tc1, tc2}
- assert.NoError(t, db.DeleteTaskComment(tc3))
+ assert.NoError(t, d.DeleteTaskComment(tc3))
expected[r1].TaskSpecComments = map[string][]*types.TaskSpecComment{}
- assert.NoError(t, db.DeleteTaskSpecComment(sc1))
+ assert.NoError(t, d.DeleteTaskSpecComment(sc1))
expected[r1].CommitComments = map[string][]*types.CommitComment{}
- assert.NoError(t, db.DeleteCommitComment(cc1))
+ assert.NoError(t, d.DeleteCommitComment(cc1))
assert.Equal(t, 3, callCount)
// Delete of nonexistent task should succeed.
- assert.NoError(t, db.DeleteTaskComment(types.MakeTaskComment(99, 1, 1, 1, now.Add(99*time.Second))))
- assert.NoError(t, db.DeleteTaskComment(types.MakeTaskComment(99, 1, 1, 99, now)))
- assert.NoError(t, db.DeleteTaskComment(types.MakeTaskComment(99, 1, 99, 1, now)))
- assert.NoError(t, db.DeleteTaskComment(types.MakeTaskComment(99, 99, 1, 1, now)))
- assert.NoError(t, db.DeleteTaskSpecComment(types.MakeTaskSpecComment(99, 1, 1, now.Add(99*time.Second))))
- assert.NoError(t, db.DeleteTaskSpecComment(types.MakeTaskSpecComment(99, 1, 99, now)))
- assert.NoError(t, db.DeleteTaskSpecComment(types.MakeTaskSpecComment(99, 99, 1, now)))
- assert.NoError(t, db.DeleteCommitComment(types.MakeCommitComment(99, 1, 1, now.Add(99*time.Second))))
- assert.NoError(t, db.DeleteCommitComment(types.MakeCommitComment(99, 1, 99, now)))
- assert.NoError(t, db.DeleteCommitComment(types.MakeCommitComment(99, 99, 1, now)))
+ assert.NoError(t, d.DeleteTaskComment(types.MakeTaskComment(99, 1, 1, 1, now.Add(99*time.Second))))
+ assert.NoError(t, d.DeleteTaskComment(types.MakeTaskComment(99, 1, 1, 99, now)))
+ assert.NoError(t, d.DeleteTaskComment(types.MakeTaskComment(99, 1, 99, 1, now)))
+ assert.NoError(t, d.DeleteTaskComment(types.MakeTaskComment(99, 99, 1, 1, now)))
+ assert.NoError(t, d.DeleteTaskSpecComment(types.MakeTaskSpecComment(99, 1, 1, now.Add(99*time.Second))))
+ assert.NoError(t, d.DeleteTaskSpecComment(types.MakeTaskSpecComment(99, 1, 99, now)))
+ assert.NoError(t, d.DeleteTaskSpecComment(types.MakeTaskSpecComment(99, 99, 1, now)))
+ assert.NoError(t, d.DeleteCommitComment(types.MakeCommitComment(99, 1, 1, now.Add(99*time.Second))))
+ assert.NoError(t, d.DeleteCommitComment(types.MakeCommitComment(99, 1, 99, now)))
+ assert.NoError(t, d.DeleteCommitComment(types.MakeCommitComment(99, 99, 1, now)))
{
- actual, err := db.GetCommentsForRepos([]string{"r0", r1, r2}, now.Add(-10000*time.Hour))
+ actual, err := d.GetCommentsForRepos([]string{"r0", r1, r2}, now.Add(-10000*time.Hour))
assert.NoError(t, err)
expectedSlice := []*types.RepoComments{
{Repo: "r0"},
@@ -167,10 +163,10 @@
r1: expected[r1].Copy(),
r2: expected[r2].Copy(),
}
- db = NewCommentBoxWithPersistence(init, testWriter)
+ d = NewCommentBoxWithPersistence(nil, init, testWriter)
{
- actual, err := db.GetCommentsForRepos([]string{"r0", r1, r2}, now.Add(-10000*time.Hour))
+ actual, err := d.GetCommentsForRepos([]string{"r0", r1, r2}, now.Add(-10000*time.Hour))
assert.NoError(t, err)
expectedSlice := []*types.RepoComments{
{Repo: "r0"},
@@ -193,7 +189,7 @@
return injectedError
}
- db := NewCommentBoxWithPersistence(nil, testWriter)
+ d := NewCommentBoxWithPersistence(nil, nil, testWriter)
now := time.Now()
@@ -205,16 +201,16 @@
tc5 := types.MakeTaskComment(5, 1, 2, 2, now)
tc6 := types.MakeTaskComment(6, 2, 3, 3, now)
for _, c := range []*types.TaskComment{tc1, tc2, tc3, tc4, tc5, tc6} {
- assert.NoError(t, db.PutTaskComment(c))
+ assert.NoError(t, d.PutTaskComment(c))
}
r1 := tc1.Repo
r2 := tc6.Repo
sc1 := types.MakeTaskSpecComment(1, 1, 1, now)
- assert.NoError(t, db.PutTaskSpecComment(sc1))
+ assert.NoError(t, d.PutTaskSpecComment(sc1))
cc1 := types.MakeCommitComment(1, 1, 1, now)
- assert.NoError(t, db.PutCommitComment(cc1))
+ assert.NoError(t, d.PutCommitComment(cc1))
expected := []*types.RepoComments{
{
@@ -248,7 +244,7 @@
}
{
- actual, err := db.GetCommentsForRepos([]string{r1, r2}, now.Add(-10000*time.Hour))
+ actual, err := d.GetCommentsForRepos([]string{r1, r2}, now.Add(-10000*time.Hour))
assert.NoError(t, err)
deepequal.AssertDeepEqual(t, expected, actual)
}
@@ -257,33 +253,33 @@
injectedError = fmt.Errorf("No comments from the peanut gallery.")
- assert.Error(t, db.PutTaskComment(types.MakeTaskComment(99, 1, 1, 1, now.Add(99*time.Second))))
- assert.Error(t, db.PutTaskComment(types.MakeTaskComment(99, 1, 1, 99, now)))
- assert.Error(t, db.PutTaskComment(types.MakeTaskComment(99, 1, 99, 1, now)))
- assert.Error(t, db.PutTaskComment(types.MakeTaskComment(99, 99, 1, 1, now)))
- assert.Error(t, db.PutTaskSpecComment(types.MakeTaskSpecComment(99, 1, 1, now.Add(99*time.Second))))
- assert.Error(t, db.PutTaskSpecComment(types.MakeTaskSpecComment(99, 1, 99, now)))
- assert.Error(t, db.PutTaskSpecComment(types.MakeTaskSpecComment(99, 99, 1, now)))
- assert.Error(t, db.PutCommitComment(types.MakeCommitComment(99, 1, 1, now.Add(99*time.Second))))
- assert.Error(t, db.PutCommitComment(types.MakeCommitComment(99, 1, 99, now)))
- assert.Error(t, db.PutCommitComment(types.MakeCommitComment(99, 99, 1, now)))
+ assert.Error(t, d.PutTaskComment(types.MakeTaskComment(99, 1, 1, 1, now.Add(99*time.Second))))
+ assert.Error(t, d.PutTaskComment(types.MakeTaskComment(99, 1, 1, 99, now)))
+ assert.Error(t, d.PutTaskComment(types.MakeTaskComment(99, 1, 99, 1, now)))
+ assert.Error(t, d.PutTaskComment(types.MakeTaskComment(99, 99, 1, 1, now)))
+ assert.Error(t, d.PutTaskSpecComment(types.MakeTaskSpecComment(99, 1, 1, now.Add(99*time.Second))))
+ assert.Error(t, d.PutTaskSpecComment(types.MakeTaskSpecComment(99, 1, 99, now)))
+ assert.Error(t, d.PutTaskSpecComment(types.MakeTaskSpecComment(99, 99, 1, now)))
+ assert.Error(t, d.PutCommitComment(types.MakeCommitComment(99, 1, 1, now.Add(99*time.Second))))
+ assert.Error(t, d.PutCommitComment(types.MakeCommitComment(99, 1, 99, now)))
+ assert.Error(t, d.PutCommitComment(types.MakeCommitComment(99, 99, 1, now)))
assert.Equal(t, 10, callCount)
// Assert nothing has changed.
{
- actual, err := db.GetCommentsForRepos([]string{r1, r2}, now.Add(-10000*time.Hour))
+ actual, err := d.GetCommentsForRepos([]string{r1, r2}, now.Add(-10000*time.Hour))
assert.NoError(t, err)
deepequal.AssertDeepEqual(t, expected, actual)
}
- assert.Error(t, db.DeleteTaskComment(tc1))
- assert.Error(t, db.DeleteTaskSpecComment(sc1))
- assert.Error(t, db.DeleteCommitComment(cc1))
+ assert.Error(t, d.DeleteTaskComment(tc1))
+ assert.Error(t, d.DeleteTaskSpecComment(sc1))
+ assert.Error(t, d.DeleteCommitComment(cc1))
// Assert nothing has changed.
{
- actual, err := db.GetCommentsForRepos([]string{r1, r2}, now.Add(-10000*time.Hour))
+ actual, err := d.GetCommentsForRepos([]string{r1, r2}, now.Add(-10000*time.Hour))
assert.NoError(t, err)
deepequal.AssertDeepEqual(t, expected, actual)
}
diff --git a/task_scheduler/go/db/memory/memory.go b/task_scheduler/go/db/memory/memory.go
index 0a9700b..7f6b84d 100644
--- a/task_scheduler/go/db/memory/memory.go
+++ b/task_scheduler/go/db/memory/memory.go
@@ -208,6 +208,9 @@
// NewInMemoryDB returns an extremely simple, inefficient, in-memory DB
// implementation.
-func NewInMemoryDB(modTasks db.ModifiedTasks, modJobs db.ModifiedJobs) db.DB {
- return db.NewDB(NewInMemoryTaskDB(modTasks), NewInMemoryJobDB(modJobs), &db.CommentBox{})
+func NewInMemoryDB(mod db.ModifiedData) db.DB {
+ if mod == nil {
+ mod = modified.NewModifiedData()
+ }
+ return db.NewDB(NewInMemoryTaskDB(mod), NewInMemoryJobDB(mod), &CommentBox{ModifiedComments: mod})
}
diff --git a/task_scheduler/go/db/migrate/migrate.go b/task_scheduler/go/db/migrate/migrate.go
index 33b6cd0..7c3651e 100644
--- a/task_scheduler/go/db/migrate/migrate.go
+++ b/task_scheduler/go/db/migrate/migrate.go
@@ -121,7 +121,7 @@
}
ctx := context.Background()
- oldDB, err := local_db.NewDB(local_db.DB_NAME, *boltDB, nil, nil)
+ oldDB, err := local_db.NewDB(local_db.DB_NAME, *boltDB, nil)
if err != nil {
sklog.Fatal(err)
}
@@ -131,7 +131,7 @@
if err != nil {
sklog.Fatal(err)
}
- newDB, err := firestore.NewDB(ctx, firestore.FIRESTORE_PROJECT, *fsInstance, ts, nil, nil)
+ newDB, err := firestore.NewDB(ctx, firestore.FIRESTORE_PROJECT, *fsInstance, ts, nil)
if err != nil {
sklog.Fatal(err)
}
diff --git a/task_scheduler/go/db/modified/modified_data.go b/task_scheduler/go/db/modified/modified_data.go
index 21b3f6f..471355c 100644
--- a/task_scheduler/go/db/modified/modified_data.go
+++ b/task_scheduler/go/db/modified/modified_data.go
@@ -4,6 +4,7 @@
"bytes"
"encoding/gob"
"sort"
+ "strings"
"sync"
"time"
@@ -13,6 +14,12 @@
"go.skia.org/infra/task_scheduler/go/types"
)
+// NewModifiedData returns a db.ModifiedData instance which tracks modifications
+// in memory.
+func NewModifiedData() db.ModifiedData {
+ return db.NewModifiedData(&ModifiedTasksImpl{}, &ModifiedJobsImpl{}, &ModifiedCommentsImpl{})
+}
+
// modifiedData allows subscribers to keep track of DB entries that have been
// modified. It is designed to be used with wrappers in order to store a desired
// type of data.
@@ -125,7 +132,7 @@
if err != nil {
return nil, err
}
- d := types.TaskDecoder{}
+ d := types.NewTaskDecoder()
for _, g := range tasks {
if !d.Process(g) {
break
@@ -178,7 +185,7 @@
if err != nil {
return nil, err
}
- d := types.JobDecoder{}
+ d := types.NewJobDecoder()
for _, g := range jobs {
if !d.Process(g) {
break
@@ -221,5 +228,128 @@
m.m.StopTrackingModifiedEntries(id)
}
+type ModifiedCommentsImpl struct {
+ tasks modifiedData
+ taskSpecs modifiedData
+ commits modifiedData
+}
+
+// See docs for ModifiedComments interface.
+func (m *ModifiedCommentsImpl) GetModifiedComments(id string) ([]*types.TaskComment, []*types.TaskSpecComment, []*types.CommitComment, error) {
+ ids := strings.Split(id, "#")
+ if len(ids) != 3 {
+ return nil, nil, nil, db.ErrUnknownId
+ }
+ tasks, err := m.tasks.GetModifiedEntries(ids[0])
+ if err != nil {
+ return nil, nil, nil, err
+ }
+ d1 := types.NewTaskCommentDecoder()
+ for _, g := range tasks {
+ if !d1.Process(g) {
+ break
+ }
+ }
+ rv1, err := d1.Result()
+ if err != nil {
+ return nil, nil, nil, err
+ }
+ sort.Sort(types.TaskCommentSlice(rv1))
+
+ taskSpecs, err := m.taskSpecs.GetModifiedEntries(ids[1])
+ if err != nil {
+ return nil, nil, nil, err
+ }
+ d2 := types.NewTaskSpecCommentDecoder()
+ for _, g := range taskSpecs {
+ if !d2.Process(g) {
+ break
+ }
+ }
+ rv2, err := d2.Result()
+ if err != nil {
+ return nil, nil, nil, err
+ }
+ sort.Sort(types.TaskSpecCommentSlice(rv2))
+
+ commits, err := m.commits.GetModifiedEntries(ids[2])
+ if err != nil {
+ return nil, nil, nil, err
+ }
+ d3 := types.NewCommitCommentDecoder()
+ for _, g := range commits {
+ if !d3.Process(g) {
+ break
+ }
+ }
+ rv3, err := d3.Result()
+ if err != nil {
+ return nil, nil, nil, err
+ }
+ sort.Sort(types.CommitCommentSlice(rv3))
+
+ return rv1, rv2, rv3, nil
+}
+
+// See docs for ModifiedComments interface.
+func (m *ModifiedCommentsImpl) TrackModifiedTaskComment(c *types.TaskComment) {
+ var buf bytes.Buffer
+ if err := gob.NewEncoder(&buf).Encode(c); err != nil {
+ sklog.Fatal(err)
+ }
+ m.tasks.TrackModifiedEntries(map[string][]byte{c.Id(): buf.Bytes()})
+}
+
+// See docs for ModifiedComments interface.
+func (m *ModifiedCommentsImpl) TrackModifiedTaskSpecComment(c *types.TaskSpecComment) {
+ var buf bytes.Buffer
+ if err := gob.NewEncoder(&buf).Encode(c); err != nil {
+ sklog.Fatal(err)
+ }
+ m.taskSpecs.TrackModifiedEntries(map[string][]byte{c.Id(): buf.Bytes()})
+}
+
+// See docs for ModifiedComments interface.
+func (m *ModifiedCommentsImpl) TrackModifiedCommitComment(c *types.CommitComment) {
+ var buf bytes.Buffer
+ if err := gob.NewEncoder(&buf).Encode(c); err != nil {
+ sklog.Fatal(err)
+ }
+ m.commits.TrackModifiedEntries(map[string][]byte{c.Id(): buf.Bytes()})
+}
+
+// See docs for ModifiedComments interface.
+func (m *ModifiedCommentsImpl) StartTrackingModifiedComments() (string, error) {
+ id1, err := m.tasks.StartTrackingModifiedEntries()
+ if err != nil {
+ return "", err
+ }
+ id2, err := m.taskSpecs.StartTrackingModifiedEntries()
+ if err != nil {
+ m.tasks.StopTrackingModifiedEntries(id1)
+ return "", err
+ }
+ id3, err := m.commits.StartTrackingModifiedEntries()
+ if err != nil {
+ m.tasks.StopTrackingModifiedEntries(id1)
+ m.taskSpecs.StopTrackingModifiedEntries(id2)
+ return "", err
+ }
+ return id1 + "#" + id2 + "#" + id3, nil
+}
+
+// See docs for ModifiedComments interface.
+func (m *ModifiedCommentsImpl) StopTrackingModifiedComments(id string) {
+ ids := strings.Split(id, "#")
+ if len(ids) != 3 {
+ sklog.Errorf("Invalid id %q", id)
+ return
+ }
+ m.tasks.StopTrackingModifiedEntries(ids[0])
+ m.taskSpecs.StopTrackingModifiedEntries(ids[1])
+ m.commits.StopTrackingModifiedEntries(ids[2])
+}
+
var _ db.ModifiedTasks = &ModifiedTasksImpl{}
var _ db.ModifiedJobs = &ModifiedJobsImpl{}
+var _ db.ModifiedComments = &ModifiedCommentsImpl{}
diff --git a/task_scheduler/go/db/modified/modified_data_test.go b/task_scheduler/go/db/modified/modified_data_test.go
index 7d1a222..cb771ce 100644
--- a/task_scheduler/go/db/modified/modified_data_test.go
+++ b/task_scheduler/go/db/modified/modified_data_test.go
@@ -67,3 +67,34 @@
_, err = m.StartTrackingModifiedJobs()
assert.NoError(t, err)
}
+
+func TestDefaultModifiedComments(t *testing.T) {
+ testutils.MediumTest(t)
+ m := &ModifiedCommentsImpl{}
+ db.TestModifiedComments(t, m)
+}
+
+func TestDefaultMultipleCommentModifications(t *testing.T) {
+ testutils.MediumTest(t)
+ m := &ModifiedCommentsImpl{}
+ db.TestMultipleCommentModifications(t, m)
+}
+
+func TestDefaultModifiedCommentsTooManyUsers(t *testing.T) {
+ testutils.MediumTest(t)
+ m := ModifiedCommentsImpl{}
+
+ var oneId string
+ // Max out the number of modified-tasks users; ensure that we error out.
+ for i := 0; i < db.MAX_MODIFIED_DATA_USERS; i++ {
+ id, err := m.StartTrackingModifiedComments()
+ assert.NoError(t, err)
+ oneId = id
+ }
+ _, err := m.StartTrackingModifiedComments()
+ assert.True(t, db.IsTooManyUsers(err))
+
+ m.StopTrackingModifiedComments(oneId)
+ _, err = m.StartTrackingModifiedComments()
+ assert.NoError(t, err)
+}
diff --git a/task_scheduler/go/db/modified/mux.go b/task_scheduler/go/db/modified/mux.go
index 7d25457..d89aa68 100644
--- a/task_scheduler/go/db/modified/mux.go
+++ b/task_scheduler/go/db/modified/mux.go
@@ -7,6 +7,23 @@
"go.skia.org/infra/task_scheduler/go/types"
)
+// NewMuxModifiedData returns a db.ModifiedData implementation which writes to
+// multiple ModifiedData instances but only reads from one.
+func NewMuxModifiedData(readWrite db.ModifiedData, writeOnly ...db.ModifiedData) db.ModifiedData {
+ tWriteOnly := make([]db.ModifiedTasks, 0, len(writeOnly))
+ jWriteOnly := make([]db.ModifiedJobs, 0, len(writeOnly))
+ cWriteOnly := make([]db.ModifiedComments, 0, len(writeOnly))
+ for _, wo := range writeOnly {
+ tWriteOnly = append(tWriteOnly, wo)
+ jWriteOnly = append(jWriteOnly, wo)
+ cWriteOnly = append(cWriteOnly, wo)
+ }
+ t := NewMuxModifiedTasks(readWrite, tWriteOnly...)
+ j := NewMuxModifiedJobs(readWrite, jWriteOnly...)
+ c := NewMuxModifiedComments(readWrite, cWriteOnly...)
+ return db.NewModifiedData(t, j, c)
+}
+
// MuxModifiedTasks is an implementation of db.ModifiedTasks which writes to
// multiple ModifiedTasks instances but only reads from one.
type MuxModifiedTasks struct {
@@ -14,7 +31,7 @@
writeOnly []db.ModifiedTasks
}
-// New MuxModifiedTasks returns an implementation of db.ModifiedTasks which
+// NewMuxModifiedTasks returns an implementation of db.ModifiedTasks which
// writes to multiple ModifiedTasks instances but only reads from one.
func NewMuxModifiedTasks(readWrite db.ModifiedTasks, writeOnly ...db.ModifiedTasks) db.ModifiedTasks {
return &MuxModifiedTasks{
@@ -70,3 +87,43 @@
wo.TrackModifiedJobsGOB(dbModified, gobs)
}
}
+
+// MuxModifiedComments is an implementation of db.ModifiedComments which writes
+// to multiple ModifiedJobs instances but only reads from one.
+type MuxModifiedComments struct {
+ db.ModifiedComments
+ writeOnly []db.ModifiedComments
+}
+
+// New MuxModifiedComments returns an implementation of db.ModifiedComments
+// which writes to multiple ModifiedJobs instances but only reads from one.
+func NewMuxModifiedComments(readWrite db.ModifiedComments, writeOnly ...db.ModifiedComments) db.ModifiedComments {
+ return &MuxModifiedComments{
+ ModifiedComments: readWrite,
+ writeOnly: writeOnly,
+ }
+}
+
+// See documentation for db.ModifiedComments interface.
+func (m *MuxModifiedComments) TrackModifiedTaskComment(c *types.TaskComment) {
+ m.ModifiedComments.TrackModifiedTaskComment(c)
+ for _, wo := range m.writeOnly {
+ wo.TrackModifiedTaskComment(c)
+ }
+}
+
+// See documentation for db.ModifiedComments interface.
+func (m *MuxModifiedComments) TrackModifiedTaskSpecComment(c *types.TaskSpecComment) {
+ m.ModifiedComments.TrackModifiedTaskSpecComment(c)
+ for _, wo := range m.writeOnly {
+ wo.TrackModifiedTaskSpecComment(c)
+ }
+}
+
+// See documentation for db.ModifiedComments interface.
+func (m *MuxModifiedComments) TrackModifiedCommitComment(c *types.CommitComment) {
+ m.ModifiedComments.TrackModifiedCommitComment(c)
+ for _, wo := range m.writeOnly {
+ wo.TrackModifiedCommitComment(c)
+ }
+}
diff --git a/task_scheduler/go/db/modified/mux_test.go b/task_scheduler/go/db/modified/mux_test.go
index fc63641..a09bd10 100644
--- a/task_scheduler/go/db/modified/mux_test.go
+++ b/task_scheduler/go/db/modified/mux_test.go
@@ -104,3 +104,53 @@
m.TrackModifiedJob(t1)
check(t1)
}
+
+func TestMuxModifiedComments(t *testing.T) {
+ testutils.MediumTest(t)
+ m := NewMuxModifiedComments(&ModifiedCommentsImpl{}, &ModifiedCommentsImpl{})
+ db.TestModifiedComments(t, m)
+}
+
+func TestMuxMultipleCommentModifications(t *testing.T) {
+ testutils.MediumTest(t)
+ m := NewMuxModifiedComments(&ModifiedCommentsImpl{}, &ModifiedCommentsImpl{})
+ db.TestMultipleCommentModifications(t, m)
+}
+
+// Simple test to verify that we actually write to the write-only ModifiedComments.
+func TestMuxModifiedCommentsWriteOnly(t *testing.T) {
+ testutils.MediumTest(t)
+ rw := &ModifiedCommentsImpl{}
+ w1 := &ModifiedCommentsImpl{}
+ w2 := &ModifiedCommentsImpl{}
+ w3 := &ModifiedCommentsImpl{}
+ m := NewMuxModifiedComments(rw, w1, w2, w3)
+ rwId, err := m.StartTrackingModifiedComments()
+ assert.NoError(t, err)
+ wo := []db.ModifiedComments{w1, w2, w3}
+ ids := []string{}
+ for _, w := range wo {
+ id, err := w.StartTrackingModifiedComments()
+ assert.NoError(t, err)
+ ids = append(ids, id)
+ }
+
+ check := func(e1 []*types.TaskComment, e2 []*types.TaskSpecComment, e3 []*types.CommitComment) {
+ a1, a2, a3, err := m.GetModifiedComments(rwId)
+ assert.NoError(t, err)
+ deepequal.AssertDeepEqual(t, e1, a1)
+ deepequal.AssertDeepEqual(t, e2, a2)
+ deepequal.AssertDeepEqual(t, e3, a3)
+ for idx, w := range []db.ModifiedComments{w1, w2, w3} {
+ a1, a2, a3, err := w.GetModifiedComments(ids[idx])
+ assert.NoError(t, err)
+ deepequal.AssertDeepEqual(t, e1, a1)
+ deepequal.AssertDeepEqual(t, e2, a2)
+ deepequal.AssertDeepEqual(t, e3, a3)
+ }
+ }
+ check([]*types.TaskComment{}, []*types.TaskSpecComment{}, []*types.CommitComment{})
+ c1 := types.MakeTaskComment(1, 1, 1, 1, time.Now())
+ m.TrackModifiedTaskComment(c1)
+ check([]*types.TaskComment{c1}, []*types.TaskSpecComment{}, []*types.CommitComment{})
+}
diff --git a/task_scheduler/go/db/pubsub/modified.go b/task_scheduler/go/db/pubsub/modified.go
index 442ea85..0d95504 100644
--- a/task_scheduler/go/db/pubsub/modified.go
+++ b/task_scheduler/go/db/pubsub/modified.go
@@ -6,6 +6,7 @@
"encoding/gob"
"fmt"
"sort"
+ "strings"
"sync"
"time"
@@ -18,6 +19,27 @@
"google.golang.org/api/option"
)
+// NewModifiedData returns a db.ModifiedData instance which uses pubsub.
+func NewModifiedData(topicSet, label string, ts oauth2.TokenSource) (db.ModifiedData, error) {
+ topicSetObj, ok := topics[topicSet]
+ if !ok {
+ return nil, fmt.Errorf("Topic must be one of %v, not %q", VALID_TOPIC_SETS, topicSet)
+ }
+ t, err := NewModifiedTasks(topicSetObj.tasks, label, ts)
+ if err != nil {
+ return nil, err
+ }
+ j, err := NewModifiedJobs(topicSetObj.jobs, label, ts)
+ if err != nil {
+ return nil, err
+ }
+ c, err := NewModifiedComments(topicSetObj.taskComments, topicSetObj.taskSpecComments, topicSetObj.commitComments, label, ts)
+ if err != nil {
+ return nil, err
+ }
+ return db.NewModifiedData(t, j, c), nil
+}
+
type entry struct {
ts time.Time
data []byte
@@ -133,7 +155,7 @@
data: m.Data,
}
} else {
- sklog.Debugf("Received duplicate or outdated message for %s", dataId)
+ sklog.Debugf("Received duplicate or outdated message (%s vs %s) for %s", prev.ts, dbModified, dataId)
}
return nil
})
@@ -346,3 +368,164 @@
func (c *jobClient) TrackModifiedJobsGOB(ts time.Time, jobsById map[string][]byte) {
c.publisher.publishGOB(ts, jobsById)
}
+
+// commentClient implements db.ModifiedComments using pubsub.
+type commentClient struct {
+ tasks *modifiedClient
+ taskSpecs *modifiedClient
+ commits *modifiedClient
+}
+
+// NewModifiedComments returns a db.ModifiedComments which uses pubsub. The
+// topics should be one of the sets of TOPIC_* constants defined in this
+// package. The subscriberLabel is included in the subscription ID, along with a
+// timestamp; this should help to debug zombie subscriptions. It should be
+// descriptive and unique to this process, or if the process uses multiple
+// instances of ModifiedJobs, unique to each instance.
+func NewModifiedComments(taskCommentsTopic, taskSpecCommentsTopic, commitCommentsTopic string, label string, ts oauth2.TokenSource) (db.ModifiedComments, error) {
+ c, err := pubsub.NewClient(context.Background(), PROJECT_ID, option.WithTokenSource(ts))
+ if err != nil {
+ return nil, err
+ }
+ tasks, err := newModifiedClient(c, taskCommentsTopic, label)
+ if err != nil {
+ return nil, err
+ }
+ taskSpecs, err := newModifiedClient(c, taskSpecCommentsTopic, label)
+ if err != nil {
+ return nil, err
+ }
+ commits, err := newModifiedClient(c, commitCommentsTopic, label)
+ if err != nil {
+ return nil, err
+ }
+ return &commentClient{
+ tasks: tasks,
+ taskSpecs: taskSpecs,
+ commits: commits,
+ }, nil
+}
+
+// See documentation for db.ModifiedComments interface.
+func (c *commentClient) GetModifiedComments(id string) ([]*types.TaskComment, []*types.TaskSpecComment, []*types.CommitComment, error) {
+ ids := strings.Split(id, "#")
+ if len(ids) != 3 {
+ return nil, nil, nil, db.ErrUnknownId
+ }
+ gobs, err := c.tasks.getModifiedData(ids[0])
+ if err != nil {
+ return nil, nil, nil, err
+ }
+ rv1 := make([]*types.TaskComment, 0, len(gobs))
+ for _, g := range gobs {
+ var c types.TaskComment
+ if err := gob.NewDecoder(bytes.NewReader(g)).Decode(&c); err != nil {
+ // We didn't attempt to decode the blob in the pubsub
+ // message when we received it. Ignore this job.
+ sklog.Errorf("Failed to decode job from pubsub message: %s", err)
+ } else {
+ rv1 = append(rv1, &c)
+ }
+ }
+ sort.Sort(types.TaskCommentSlice(rv1))
+
+ gobs, err = c.taskSpecs.getModifiedData(ids[1])
+ if err != nil {
+ return nil, nil, nil, err
+ }
+ rv2 := make([]*types.TaskSpecComment, 0, len(gobs))
+ for _, g := range gobs {
+ var c types.TaskSpecComment
+ if err := gob.NewDecoder(bytes.NewReader(g)).Decode(&c); err != nil {
+ // We didn't attempt to decode the blob in the pubsub
+ // message when we received it. Ignore this job.
+ sklog.Errorf("Failed to decode job from pubsub message: %s", err)
+ } else {
+ rv2 = append(rv2, &c)
+ }
+ }
+ sort.Sort(types.TaskSpecCommentSlice(rv2))
+
+ gobs, err = c.commits.getModifiedData(ids[2])
+ if err != nil {
+ return nil, nil, nil, err
+ }
+ rv3 := make([]*types.CommitComment, 0, len(gobs))
+ for _, g := range gobs {
+ var c types.CommitComment
+ if err := gob.NewDecoder(bytes.NewReader(g)).Decode(&c); err != nil {
+ // We didn't attempt to decode the blob in the pubsub
+ // message when we received it. Ignore this job.
+ sklog.Errorf("Failed to decode job from pubsub message: %s", err)
+ } else {
+ rv3 = append(rv3, &c)
+ }
+ }
+ sort.Sort(types.CommitCommentSlice(rv3))
+ return rv1, rv2, rv3, nil
+}
+
+// See documentation for db.ModifiedComments interface.
+func (c *commentClient) StartTrackingModifiedComments() (string, error) {
+ id1, err := c.tasks.startTrackingModifiedData()
+ if err != nil {
+ return "", err
+ }
+ id2, err := c.taskSpecs.startTrackingModifiedData()
+ if err != nil {
+ return "", err
+ }
+ id3, err := c.commits.startTrackingModifiedData()
+ if err != nil {
+ return "", err
+ }
+ return id1 + "#" + id2 + "#" + id3, nil
+}
+
+// See documentation for db.ModifiedComments interface.
+func (c *commentClient) StopTrackingModifiedComments(id string) {
+ ids := strings.Split(id, "#")
+ if len(ids) != 3 {
+ sklog.Errorf("Invalid ID %q", id)
+ return
+ }
+ c.tasks.stopTrackingModifiedData(ids[0])
+ c.taskSpecs.stopTrackingModifiedData(ids[1])
+ c.commits.stopTrackingModifiedData(ids[2])
+}
+
+// See documentation for db.ModifiedComments interface.
+func (c *commentClient) TrackModifiedTaskComment(tc *types.TaskComment) {
+ // Hack: since the timestamp is part of the ID, we can't change it. But,
+ // we have to provide a different timestamp from the one we sent when
+ // the comment was created, or else it'll get de-duplicated.
+ ts := tc.Timestamp
+ if tc.Deleted != nil && *tc.Deleted {
+ ts = time.Now()
+ }
+ c.tasks.publisher.publish(tc.Id(), ts, tc)
+}
+
+// See documentation for db.ModifiedComments interface.
+func (c *commentClient) TrackModifiedTaskSpecComment(tc *types.TaskSpecComment) {
+ // Hack: since the timestamp is part of the ID, we can't change it. But,
+ // we have to provide a different timestamp from the one we sent when
+ // the comment was created, or else it'll get de-duplicated.
+ ts := tc.Timestamp
+ if tc.Deleted != nil && *tc.Deleted {
+ ts = time.Now()
+ }
+ c.taskSpecs.publisher.publish(tc.Id(), ts, tc)
+}
+
+// See documentation for db.ModifiedComments interface.
+func (c *commentClient) TrackModifiedCommitComment(cc *types.CommitComment) {
+ // Hack: since the timestamp is part of the ID, we can't change it. But,
+ // we have to provide a different timestamp from the one we sent when
+ // the comment was created, or else it'll get de-duplicated.
+ ts := cc.Timestamp
+ if cc.Deleted != nil && *cc.Deleted {
+ ts = time.Now()
+ }
+ c.commits.publisher.publish(cc.Id(), ts, cc)
+}
diff --git a/task_scheduler/go/db/pubsub/modified_test.go b/task_scheduler/go/db/pubsub/modified_test.go
index 9a5014a..c87ce0f 100644
--- a/task_scheduler/go/db/pubsub/modified_test.go
+++ b/task_scheduler/go/db/pubsub/modified_test.go
@@ -45,3 +45,23 @@
m := setupJobs(t)
db.TestMultipleJobModifications(t, m)
}
+
+func setupComments(t *testing.T) db.ModifiedComments {
+ testutils.LargeTest(t)
+ topic1 := fmt.Sprintf("modified-comments-test-tasks-%s", uuid.New())
+ topic2 := fmt.Sprintf("modified-comments-test-taskspecs-%s", uuid.New())
+ topic3 := fmt.Sprintf("modified-comments-test-commits-%s", uuid.New())
+ m, err := NewModifiedComments(topic1, topic2, topic3, "fake-label", nil)
+ assert.NoError(t, err)
+ return m
+}
+
+func TestPubsubModifiedComments(t *testing.T) {
+ m := setupComments(t)
+ db.TestModifiedComments(t, m)
+}
+
+func TestPubsubMultipleCommentModifications(t *testing.T) {
+ m := setupComments(t)
+ db.TestMultipleCommentModifications(t, m)
+}
diff --git a/task_scheduler/go/db/pubsub/pubsub.go b/task_scheduler/go/db/pubsub/pubsub.go
index e5efb42..18e6ca0 100644
--- a/task_scheduler/go/db/pubsub/pubsub.go
+++ b/task_scheduler/go/db/pubsub/pubsub.go
@@ -24,13 +24,27 @@
// Default project ID.
PROJECT_ID = "skia-public"
+ // Sets of topic, based on scheduler instance.
+ TOPIC_SET_PROD = "prod"
+ TOPIC_SET_INTERNAL = "internal"
+ TOPIC_SET_STAGING = "staging"
+
// Known topic names.
- TOPIC_TASKS = "task-scheduler-modified-tasks"
- TOPIC_TASKS_INTERNAL = "task-scheduler-modified-tasks-internal"
- TOPIC_TASKS_STAGING = "task-scheduler-modified-tasks-staging"
- TOPIC_JOBS = "task-scheduler-modified-jobs"
- TOPIC_JOBS_INTERNAL = "task-scheduler-modified-jobs-internal"
- TOPIC_JOBS_STAGING = "task-scheduler-modified-jobs-staging"
+ TOPIC_TASKS = "task-scheduler-modified-tasks"
+ TOPIC_TASKS_INTERNAL = "task-scheduler-modified-tasks-internal"
+ TOPIC_TASKS_STAGING = "task-scheduler-modified-tasks-staging"
+ TOPIC_JOBS = "task-scheduler-modified-jobs"
+ TOPIC_JOBS_INTERNAL = "task-scheduler-modified-jobs-internal"
+ TOPIC_JOBS_STAGING = "task-scheduler-modified-jobs-staging"
+ TOPIC_TASK_COMMENTS = "task-scheduler-modified-task-comments"
+ TOPIC_TASK_COMMENTS_INTERNAL = "task-scheduler-modified-task-comments-internal"
+ TOPIC_TASK_COMMENTS_STAGING = "task-scheduler-modified-task-comments-staging"
+ TOPIC_TASKSPEC_COMMENTS = "task-scheduler-modified-taskspec-comments"
+ TOPIC_TASKSPEC_COMMENTS_INTERNAL = "task-scheduler-modified-taskspec-comments-internal"
+ TOPIC_TASKSPEC_COMMENTS_STAGING = "task-scheduler-modified-taskspec-comments-staging"
+ TOPIC_COMMIT_COMMENTS = "task-scheduler-modified-commit-comments"
+ TOPIC_COMMIT_COMMENTS_INTERNAL = "task-scheduler-modified-commit-comments-internal"
+ TOPIC_COMMIT_COMMENTS_STAGING = "task-scheduler-modified-commit-comments-staging"
// Attributes sent with all pubsub messages.
@@ -42,6 +56,47 @@
ATTR_SENDER_ID = "sender"
)
+var (
+ VALID_TOPIC_SETS = []string{
+ TOPIC_SET_PROD,
+ TOPIC_SET_INTERNAL,
+ TOPIC_SET_STAGING,
+ }
+
+ topics = map[string]topicSet{
+ TOPIC_SET_PROD: topicSet{
+ tasks: TOPIC_TASKS,
+ jobs: TOPIC_JOBS,
+ taskComments: TOPIC_TASK_COMMENTS,
+ taskSpecComments: TOPIC_TASKSPEC_COMMENTS,
+ commitComments: TOPIC_COMMIT_COMMENTS,
+ },
+ TOPIC_SET_INTERNAL: topicSet{
+ tasks: TOPIC_TASKS_INTERNAL,
+ jobs: TOPIC_JOBS_INTERNAL,
+ taskComments: TOPIC_TASK_COMMENTS_INTERNAL,
+ taskSpecComments: TOPIC_TASKSPEC_COMMENTS_INTERNAL,
+ commitComments: TOPIC_COMMIT_COMMENTS_INTERNAL,
+ },
+ TOPIC_SET_STAGING: topicSet{
+ tasks: TOPIC_TASKS_STAGING,
+ jobs: TOPIC_JOBS_STAGING,
+ taskComments: TOPIC_TASK_COMMENTS_STAGING,
+ taskSpecComments: TOPIC_TASKSPEC_COMMENTS_STAGING,
+ commitComments: TOPIC_COMMIT_COMMENTS_STAGING,
+ },
+ }
+)
+
+// topicSet is used for organizing sets of pubsub topics.
+type topicSet struct {
+ tasks string
+ jobs string
+ taskComments string
+ taskSpecComments string
+ commitComments string
+}
+
// publisher sends pubsub messages for modified tasks and jobs.
type publisher struct {
senderId string
@@ -157,6 +212,78 @@
p.publish(j.Id, j.DbModified, j)
}
+// TaskSpecCommentPublisher sends pubsub messages for comments.
+type TaskSpecCommentPublisher struct {
+ *publisher
+}
+
+// NewTaskSpecCommentPublisher creates a TaskSpecCommentPublisher instance. It
+// creates the given topic if it does not already exist.
+func NewTaskSpecCommentPublisher(topic string, ts oauth2.TokenSource) (*TaskSpecCommentPublisher, error) {
+ c, err := pubsub.NewClient(context.Background(), PROJECT_ID, option.WithTokenSource(ts))
+ if err != nil {
+ return nil, err
+ }
+ pub, err := newPublisher(c, topic)
+ if err != nil {
+ return nil, err
+ }
+ return &TaskSpecCommentPublisher{pub}, nil
+}
+
+// Publish publishes a pubsub message for the given comment.
+func (p *TaskSpecCommentPublisher) Publish(t *types.TaskSpecComment) {
+ p.publish(t.Id(), t.Timestamp, t)
+}
+
+// TaskCommentPublisher sends pubsub messages for comments.
+type TaskCommentPublisher struct {
+ *publisher
+}
+
+// NewTaskCommentPublisher creates a TaskCommentPublisher instance. It creates the given
+// topic if it does not already exist.
+func NewTaskCommentPublisher(topic string, ts oauth2.TokenSource) (*TaskCommentPublisher, error) {
+ c, err := pubsub.NewClient(context.Background(), PROJECT_ID, option.WithTokenSource(ts))
+ if err != nil {
+ return nil, err
+ }
+ pub, err := newPublisher(c, topic)
+ if err != nil {
+ return nil, err
+ }
+ return &TaskCommentPublisher{pub}, nil
+}
+
+// Publish publishes a pubsub message for the given comment.
+func (p *TaskCommentPublisher) Publish(t *types.TaskComment) {
+ p.publish(t.Id(), t.Timestamp, t)
+}
+
+// CommitCommentPublisher sends pubsub messages for comments.
+type CommitCommentPublisher struct {
+ *publisher
+}
+
+// NewCommitCommentPublisher creates a CommitCommentPublisher instance. It creates the given
+// topic if it does not already exist.
+func NewCommitCommentPublisher(topic string, ts oauth2.TokenSource) (*CommitCommentPublisher, error) {
+ c, err := pubsub.NewClient(context.Background(), PROJECT_ID, option.WithTokenSource(ts))
+ if err != nil {
+ return nil, err
+ }
+ pub, err := newPublisher(c, topic)
+ if err != nil {
+ return nil, err
+ }
+ return &CommitCommentPublisher{pub}, nil
+}
+
+// Publish publishes a pubsub message for the given comment.
+func (p *CommitCommentPublisher) Publish(t *types.CommitComment) {
+ p.publish(t.Id(), t.Timestamp, t)
+}
+
// subscriber uses pubsub to watch for modified data.
type subscriber struct {
client *pubsub.Client
@@ -296,3 +423,90 @@
}
return s.start()
}
+
+// NewTaskCommentSubscriber creates a subscriber which calls the given callback
+// function for every pubsub message. The topic should be one of the TOPIC_*
+// constants defined in this package. The subscriberLabel is included in the
+// subscription ID, along with a timestamp; this should help to debug zombie
+// subscriptions. Acknowledgement of the message is done automatically based on
+// the return value of the callback: if the callback returns an error, the
+// message is Nack'd and will be re-sent at a later time, otherwise the message
+// is Ack'd and will not be re-sent. Therefore, if the comment is not valid or
+// otherwise cannot ever be processed, the callback should return nil to prevent
+// the message from being re-sent.
+func NewTaskCommentSubscriber(topic, subscriberLabel string, ts oauth2.TokenSource, callback func(*types.TaskComment) error) (context.CancelFunc, error) {
+ c, err := pubsub.NewClient(context.Background(), PROJECT_ID, option.WithTokenSource(ts))
+ if err != nil {
+ return nil, err
+ }
+ s, err := newSubscriber(c, topic, subscriberLabel, func(m *pubsub.Message) error {
+ var c types.TaskComment
+ if err := gob.NewDecoder(bytes.NewReader(m.Data)).Decode(&c); err != nil {
+ sklog.Errorf("Failed to decode TaskComment from pubsub message: %s", err)
+ return nil // We will never be able to process this message.
+ }
+ return callback(&c)
+ })
+ if err != nil {
+ return nil, err
+ }
+ return s.start()
+}
+
+// NewTaskSpecCommentSubscriber creates a subscriber which calls the given
+// callback function for every pubsub message. The topic should be one of the
+// TOPIC_* constants defined in this package. The subscriberLabel is included in
+// the subscription ID, along with a timestamp; this should help to debug zombie
+// subscriptions. Acknowledgement of the message is done automatically based on
+// the return value of the callback: if the callback returns an error, the
+// message is Nack'd and will be re-sent at a later time, otherwise the message
+// is Ack'd and will not be re-sent. Therefore, if the comment is not valid or
+// otherwise cannot ever be processed, the callback should return nil to prevent
+// the message from being re-sent.
+func NewTaskSpecCommentSubscriber(topic, subscriberLabel string, ts oauth2.TokenSource, callback func(*types.TaskSpecComment) error) (context.CancelFunc, error) {
+ c, err := pubsub.NewClient(context.Background(), PROJECT_ID, option.WithTokenSource(ts))
+ if err != nil {
+ return nil, err
+ }
+ s, err := newSubscriber(c, topic, subscriberLabel, func(m *pubsub.Message) error {
+ var c types.TaskSpecComment
+ if err := gob.NewDecoder(bytes.NewReader(m.Data)).Decode(&c); err != nil {
+ sklog.Errorf("Failed to decode TaskSpecComment from pubsub message: %s", err)
+ return nil // We will never be able to process this message.
+ }
+ return callback(&c)
+ })
+ if err != nil {
+ return nil, err
+ }
+ return s.start()
+}
+
+// NewCommitCommentSubscriber creates a subscriber which calls the given
+// callback function for every pubsub message. The topic should be one of the
+// TOPIC_* constants defined in this package. The subscriberLabel is included in
+// the subscription ID, along with a timestamp; this should help to debug zombie
+// subscriptions. Acknowledgement of the message is done automatically based on
+// the return value of the callback: if the callback returns an error, the
+// message is Nack'd and will be re-sent at a later time, otherwise the message
+// is Ack'd and will not be re-sent. Therefore, if the comment is not valid or
+// otherwise cannot ever be processed, the callback should return nil to prevent
+// the message from being re-sent.
+func NewCommitCommentSubscriber(topic, subscriberLabel string, ts oauth2.TokenSource, callback func(*types.CommitComment) error) (context.CancelFunc, error) {
+ c, err := pubsub.NewClient(context.Background(), PROJECT_ID, option.WithTokenSource(ts))
+ if err != nil {
+ return nil, err
+ }
+ s, err := newSubscriber(c, topic, subscriberLabel, func(m *pubsub.Message) error {
+ var c types.CommitComment
+ if err := gob.NewDecoder(bytes.NewReader(m.Data)).Decode(&c); err != nil {
+ sklog.Errorf("Failed to decode CommitComment from pubsub message: %s", err)
+ return nil // We will never be able to process this message.
+ }
+ return callback(&c)
+ })
+ if err != nil {
+ return nil, err
+ }
+ return s.start()
+}
diff --git a/task_scheduler/go/db/recovery/backups_test.go b/task_scheduler/go/db/recovery/backups_test.go
index 0cd38e4..eda14e3 100644
--- a/task_scheduler/go/db/recovery/backups_test.go
+++ b/task_scheduler/go/db/recovery/backups_test.go
@@ -136,7 +136,7 @@
assert.NoError(t, os.MkdirAll(path.Join(dir, TRIGGER_DIRNAME), os.ModePerm))
db := &testDB{
- DB: memory.NewInMemoryDB(nil, nil),
+ DB: memory.NewInMemoryDB(nil),
content: content,
ts: time.Unix(TEST_DB_TIME, 0),
}
diff --git a/task_scheduler/go/db/remote_db/remote_db.go b/task_scheduler/go/db/remote_db/remote_db.go
index 8b79c3f..7c20be5 100644
--- a/task_scheduler/go/db/remote_db/remote_db.go
+++ b/task_scheduler/go/db/remote_db/remote_db.go
@@ -80,27 +80,21 @@
type client struct {
serverRoot string
client *http.Client
- db.ModifiedTasks
- db.ModifiedJobs
+ db.ModifiedData
}
// NewClient returns a db.RemoteDB that connects to the server created by
// NewServer. serverRoot should end with a slash.
-func NewClient(serverRoot, tasksTopic, jobsTopic, label string, ts oauth2.TokenSource) (db.RemoteDB, error) {
+func NewClient(serverRoot, topicSet, label string, ts oauth2.TokenSource) (db.RemoteDB, error) {
c := httputils.DefaultClientConfig().WithTokenSource(ts).Client()
- modTasks, err := pubsub.NewModifiedTasks(tasksTopic, label, ts)
- if err != nil {
- return nil, err
- }
- modJobs, err := pubsub.NewModifiedJobs(jobsTopic, label, ts)
+ mod, err := pubsub.NewModifiedData(topicSet, label, ts)
if err != nil {
return nil, err
}
return &client{
- serverRoot: serverRoot,
- client: c,
- ModifiedTasks: modTasks,
- ModifiedJobs: modJobs,
+ serverRoot: serverRoot,
+ client: c,
+ ModifiedData: mod,
}, nil
}
diff --git a/task_scheduler/go/db/remote_db/remote_db_test.go b/task_scheduler/go/db/remote_db/remote_db_test.go
index 926ecb8..f5b9b31 100644
--- a/task_scheduler/go/db/remote_db/remote_db_test.go
+++ b/task_scheduler/go/db/remote_db/remote_db_test.go
@@ -97,17 +97,15 @@
// makeDB sets up a client/server pair wrapped in a clientWithBackdoor.
func makeDB(t *testing.T) db.DBCloser {
serverLabel := fmt.Sprintf("remote-db-test-%s", uuid.New())
- modTasks, err := pubsub.NewModifiedTasks(pubsub.TOPIC_TASKS, serverLabel, nil)
+ mod, err := pubsub.NewModifiedData(pubsub.TOPIC_SET_PROD, serverLabel, nil)
assert.NoError(t, err)
- modJobs, err := pubsub.NewModifiedJobs(pubsub.TOPIC_JOBS, serverLabel, nil)
- assert.NoError(t, err)
- baseDB := memory.NewInMemoryDB(modTasks, modJobs)
+ baseDB := memory.NewInMemoryDB(mod)
r := mux.NewRouter()
err = RegisterServer(baseDB, r.PathPrefix("/db").Subrouter())
assert.NoError(t, err)
ts := httptest.NewServer(r)
clientLabel := fmt.Sprintf("remote-db-test-%s", uuid.New())
- dbclient, err := NewClient(ts.URL+"/db/", pubsub.TOPIC_TASKS, pubsub.TOPIC_JOBS, clientLabel, nil)
+ dbclient, err := NewClient(ts.URL+"/db/", pubsub.TOPIC_SET_PROD, clientLabel, nil)
assert.NoError(t, err)
dbclient.(*client).client.Transport = newReqCountingTransport(dbclient.(*client).client.Transport)
return &clientWithBackdoor{
diff --git a/task_scheduler/go/db/testutil.go b/task_scheduler/go/db/testutil.go
index e80d30b..bf650f1 100644
--- a/task_scheduler/go/db/testutil.go
+++ b/task_scheduler/go/db/testutil.go
@@ -72,6 +72,33 @@
}))
}
+func findModifiedComments(t testutils.TestingT, m ModifiedComments, id string, e1 []*types.TaskComment, e2 []*types.TaskSpecComment, e3 []*types.CommitComment) {
+ // Note that the slice only works because we don't call
+ // TrackModifiedJob more than once for any given job, otherwise
+ // we'd have to use a map and compare DbModified.
+ a1 := []*types.TaskComment{}
+ a2 := []*types.TaskSpecComment{}
+ a3 := []*types.CommitComment{}
+ assert.NoError(t, testutils.EventuallyConsistent(10*time.Second, func() error {
+ c1, c2, c3, err := m.GetModifiedComments(id)
+ assert.NoError(t, err)
+ a1 = append(a1, c1...)
+ a2 = append(a2, c2...)
+ a3 = append(a3, c3...)
+ if len(a1) != len(e1) || len(a2) != len(e2) || len(a3) != len(e3) {
+ time.Sleep(100 * time.Millisecond)
+ return testutils.TryAgainErr
+ }
+ sort.Sort(types.TaskCommentSlice(a1))
+ sort.Sort(types.TaskSpecCommentSlice(a2))
+ sort.Sort(types.CommitCommentSlice(a3))
+ deepequal.AssertDeepEqual(t, e1, a1)
+ deepequal.AssertDeepEqual(t, e2, a2)
+ deepequal.AssertDeepEqual(t, e3, a3)
+ return nil
+ }))
+}
+
// TestTaskDB performs basic tests for an implementation of TaskDB.
func TestTaskDB(t testutils.TestingT, db TaskDB) {
_, err := db.GetModifiedTasks("dummy-id")
@@ -1174,7 +1201,19 @@
// TestCommentDB validates that db correctly implements the CommentDB interface.
func TestCommentDB(t testutils.TestingT, db CommentDB) {
- now := time.Now()
+ _, _, _, err := db.GetModifiedComments("dummy-id")
+ assert.True(t, IsUnknownId(err))
+
+ id, err := db.StartTrackingModifiedComments()
+ assert.NoError(t, err)
+
+ c1, c2, c3, err := db.GetModifiedComments(id)
+ assert.NoError(t, err)
+ assert.Equal(t, 0, len(c1))
+ assert.Equal(t, 0, len(c2))
+ assert.Equal(t, 0, len(c3))
+
+ now := time.Now().Truncate(TS_RESOLUTION)
// Empty db.
r0 := fmt.Sprintf("%s%d", common.REPO_SKIA, 0)
@@ -1195,40 +1234,43 @@
}
// Add some comments.
- tc1 := types.MakeTaskComment(1, 1, 1, 1, now)
- tc2 := types.MakeTaskComment(2, 1, 1, 1, now.Add(2*time.Second))
- tc3 := types.MakeTaskComment(3, 1, 1, 1, now.Add(time.Second))
- tc4 := types.MakeTaskComment(4, 1, 1, 2, now)
- tc5 := types.MakeTaskComment(5, 1, 2, 2, now)
- tc6 := types.MakeTaskComment(6, 2, 3, 3, now)
+ tc1 := types.MakeTaskComment(1, 1, 1, 1, now.Add(-2*time.Second))
+ tc2 := types.MakeTaskComment(2, 1, 1, 1, now)
+ tc3 := types.MakeTaskComment(3, 1, 1, 1, now.Add(-time.Second))
+ tc4 := types.MakeTaskComment(4, 1, 1, 2, now.Add(-2*time.Second+time.Millisecond))
+ tc5 := types.MakeTaskComment(5, 1, 2, 2, now.Add(-2*time.Second+2*time.Millisecond))
+ tc6 := types.MakeTaskComment(6, 2, 3, 3, now.Add(-2*time.Second+3*time.Millisecond))
for _, c := range []*types.TaskComment{tc1, tc2, tc3, tc4, tc5, tc6} {
assert.NoError(t, db.PutTaskComment(c))
}
tc6copy := tc6.Copy()
tc6.Message = "modifying after Put shouldn't affect stored comment"
- sc1 := types.MakeTaskSpecComment(1, 1, 1, now)
- sc2 := types.MakeTaskSpecComment(2, 1, 1, now.Add(2*time.Second))
- sc3 := types.MakeTaskSpecComment(3, 1, 1, now.Add(time.Second))
- sc4 := types.MakeTaskSpecComment(4, 1, 2, now)
- sc5 := types.MakeTaskSpecComment(5, 2, 3, now)
+ sc1 := types.MakeTaskSpecComment(1, 1, 1, now.Add(-2*time.Second))
+ sc2 := types.MakeTaskSpecComment(2, 1, 1, now)
+ sc3 := types.MakeTaskSpecComment(3, 1, 1, now.Add(-time.Second))
+ sc4 := types.MakeTaskSpecComment(4, 1, 2, now.Add(-2*time.Second+time.Millisecond))
+ sc5 := types.MakeTaskSpecComment(5, 2, 3, now.Add(-2*time.Second+2*time.Millisecond))
for _, c := range []*types.TaskSpecComment{sc1, sc2, sc3, sc4, sc5} {
assert.NoError(t, db.PutTaskSpecComment(c))
}
sc5copy := sc5.Copy()
sc5.Message = "modifying after Put shouldn't affect stored comment"
- cc1 := types.MakeCommitComment(1, 1, 1, now)
- cc2 := types.MakeCommitComment(2, 1, 1, now.Add(2*time.Second))
- cc3 := types.MakeCommitComment(3, 1, 1, now.Add(time.Second))
- cc4 := types.MakeCommitComment(4, 1, 2, now)
- cc5 := types.MakeCommitComment(5, 2, 3, now)
+ cc1 := types.MakeCommitComment(1, 1, 1, now.Add(-2*time.Second))
+ cc2 := types.MakeCommitComment(2, 1, 1, now)
+ cc3 := types.MakeCommitComment(3, 1, 1, now.Add(-time.Second))
+ cc4 := types.MakeCommitComment(4, 1, 2, now.Add(-2*time.Second+time.Millisecond))
+ cc5 := types.MakeCommitComment(5, 2, 3, now.Add(-2*time.Second+2*time.Millisecond))
for _, c := range []*types.CommitComment{cc1, cc2, cc3, cc4, cc5} {
assert.NoError(t, db.PutCommitComment(c))
}
cc5copy := cc5.Copy()
cc5.Message = "modifying after Put shouldn't affect stored comment"
+ // Ensure that all comments show up in the modified list.
+ findModifiedComments(t, db, id, []*types.TaskComment{tc1, tc4, tc5, tc6copy, tc3, tc2}, []*types.TaskSpecComment{sc1, sc4, sc5copy, sc3, sc2}, []*types.CommitComment{cc1, cc4, cc5copy, cc3, cc2})
+
// Check that adding duplicate non-identical comment gives an error.
tc1different := tc1.Copy()
tc1different.Message = "not the same"
@@ -1285,7 +1327,7 @@
// Specifying a cutoff time shouldn't drop required comments.
{
- actual, err := db.GetCommentsForRepos([]string{r1}, now.Add(time.Second))
+ actual, err := db.GetCommentsForRepos([]string{r1}, now.Add(-time.Second))
assert.NoError(t, err)
assert.Equal(t, 1, len(actual))
{
@@ -1319,6 +1361,9 @@
AssertDeepEqual(t, cc2, ccs[offset+1])
}
}
+ // Clear out modified comments.
+ _, _, _, err = db.GetModifiedComments(id)
+ assert.NoError(t, err)
// Delete some comments.
assert.NoError(t, db.DeleteTaskComment(tc3))
@@ -1347,6 +1392,14 @@
actual, err := db.GetCommentsForRepos([]string{r0, r1, r2}, now.Add(-10000*time.Hour))
assert.NoError(t, err)
AssertDeepEqual(t, expected, actual)
+ deleted := true
+ tc1different.Deleted = &deleted
+ sc1different.Deleted = &deleted
+ cc1different.Deleted = &deleted
+ tc3.Deleted = &deleted
+ sc3.Deleted = &deleted
+ cc3.Deleted = &deleted
+ findModifiedComments(t, db, id, []*types.TaskComment{tc1different, tc3}, []*types.TaskSpecComment{sc1different, sc3}, []*types.CommitComment{cc1different, cc3})
}
// Delete all the comments.
@@ -1664,6 +1717,93 @@
}))
}
+func TestModifiedComments(t testutils.TestingT, m ModifiedComments) {
+ _, _, _, err := m.GetModifiedComments("dummy-id")
+ assert.True(t, IsUnknownId(err))
+
+ id, err := m.StartTrackingModifiedComments()
+ assert.NoError(t, err)
+
+ time.Sleep(time.Second)
+ a1, a2, a3, err := m.GetModifiedComments(id)
+ assert.NoError(t, err)
+ assert.Equal(t, 0, len(a1))
+ assert.Equal(t, 0, len(a2))
+ assert.Equal(t, 0, len(a3))
+
+ c1 := types.MakeTaskComment(1, 1, 1, 1, time.Now())
+
+ // Insert the comment.
+ m.TrackModifiedTaskComment(c1)
+
+ // Ensure that the comment shows up in the modified list.
+ findModifiedComments(t, m, id, []*types.TaskComment{c1}, []*types.TaskSpecComment{}, []*types.CommitComment{})
+
+ // Insert two more comments.
+ c2 := types.MakeTaskComment(2, 2, 2, 2, time.Now())
+ m.TrackModifiedTaskComment(c2)
+ c3 := types.MakeTaskComment(3, 3, 3, 3, time.Now())
+ m.TrackModifiedTaskComment(c3)
+
+ // Ensure that both comments show up in the modified list.
+ findModifiedComments(t, m, id, []*types.TaskComment{c2, c3}, []*types.TaskSpecComment{}, []*types.CommitComment{})
+
+ // Insert two more comments.
+ c4 := types.MakeTaskSpecComment(4, 4, 4, time.Now())
+ m.TrackModifiedTaskSpecComment(c4)
+ c5 := types.MakeCommitComment(5, 5, 5, time.Now())
+ m.TrackModifiedCommitComment(c5)
+
+ // Ensure that both comments show up in the modified list.
+ findModifiedComments(t, m, id, []*types.TaskComment{}, []*types.TaskSpecComment{c4}, []*types.CommitComment{c5})
+
+ // Check StopTrackingModifiedComments.
+ m.StopTrackingModifiedComments(id)
+ err = testutils.EventuallyConsistent(10*time.Second, func() error {
+ _, _, _, err := m.GetModifiedComments(id)
+ if err == nil {
+ return testutils.TryAgainErr
+ }
+ return err
+ })
+ assert.True(t, IsUnknownId(err))
+}
+
+func TestMultipleCommentModifications(t testutils.TestingT, m ModifiedComments) {
+ id, err := m.StartTrackingModifiedComments()
+ assert.NoError(t, err)
+
+ c1 := types.MakeTaskComment(1, 1, 1, 1, time.Now())
+
+ // Insert the comment.
+ m.TrackModifiedTaskComment(c1)
+
+ // Delete the comment.
+ deleted := true
+ c1.Deleted = &deleted
+ m.TrackModifiedTaskComment(c1)
+
+ // Ensure that the comment shows up only once in the modified list and
+ // is the most recent value.
+ var actual *types.TaskComment
+ assert.NoError(t, testutils.EventuallyConsistent(10*time.Second, func() error {
+ a1, _, _, err := m.GetModifiedComments(id)
+ if err != nil {
+ return err
+ }
+ if len(a1) == 1 {
+ if actual == nil || actual.Deleted == nil {
+ actual = a1[0]
+ }
+ }
+ if deepequal.DeepEqual(c1, actual) {
+ return nil
+ }
+ time.Sleep(100 * time.Millisecond)
+ return testutils.TryAgainErr
+ }))
+}
+
func TestUpdateDBFromSwarmingTask(t testutils.TestingT, db TaskDB) {
// Create task, initialize from swarming, and save.
now := time.Now().UTC().Round(time.Microsecond)
diff --git a/task_scheduler/go/scheduling/perftest/perftest.go b/task_scheduler/go/scheduling/perftest/perftest.go
index aa1a1bc..96b710b 100644
--- a/task_scheduler/go/scheduling/perftest/perftest.go
+++ b/task_scheduler/go/scheduling/perftest/perftest.go
@@ -265,7 +265,7 @@
assertNoError(err)
assertDeepEqual([]string{head}, commits)
- d, err := local_db.NewDB("testdb", path.Join(workdir, "tasks.db"), nil, nil)
+ d, err := local_db.NewDB("testdb", path.Join(workdir, "tasks.db"), nil)
assertNoError(err)
w, err := window.New(time.Hour, 0, nil)
assertNoError(err)
diff --git a/task_scheduler/go/scheduling/task_scheduler_test.go b/task_scheduler/go/scheduling/task_scheduler_test.go
index 642945a..6ea2f69 100644
--- a/task_scheduler/go/scheduling/task_scheduler_test.go
+++ b/task_scheduler/go/scheduling/task_scheduler_test.go
@@ -194,7 +194,7 @@
assert.NoError(t, err)
assert.NoError(t, os.Mkdir(path.Join(tmp, periodic_triggers.TRIGGER_DIRNAME), os.ModePerm))
- d := memory.NewInMemoryDB(nil, nil)
+ d := memory.NewInMemoryDB(nil)
isolateClient, err := isolate.NewClient(tmp, isolate.ISOLATE_SERVER_URL_FAKE)
assert.NoError(t, err)
swarmingClient := swarming_testutils.NewTestClient()
@@ -2008,7 +2008,7 @@
gb.Commit(ctx)
// Setup the scheduler.
- d := memory.NewInMemoryDB(nil, nil)
+ d := memory.NewInMemoryDB(nil)
isolateClient, err := isolate.NewClient(workdir, isolate.ISOLATE_SERVER_URL_FAKE)
assert.NoError(t, err)
swarmingClient := swarming_testutils.NewTestClient()
@@ -3383,7 +3383,7 @@
func TestValidateTaskForUpdate(t *testing.T) {
testutils.SmallTest(t)
- d := memory.NewInMemoryDB(nil, nil)
+ d := memory.NewInMemoryDB(nil)
c1 := "abc123"
c2 := "def456"
@@ -3461,7 +3461,7 @@
func TestUpdateTask(t *testing.T) {
testutils.SmallTest(t)
- d := memory.NewInMemoryDB(nil, nil)
+ d := memory.NewInMemoryDB(nil)
c1 := "abc123"
c2 := "def456"
diff --git a/task_scheduler/go/task_scheduler/main.go b/task_scheduler/go/task_scheduler/main.go
index dce39f1..1c195ec 100644
--- a/task_scheduler/go/task_scheduler/main.go
+++ b/task_scheduler/go/task_scheduler/main.go
@@ -87,8 +87,7 @@
firestoreInstance = flag.String("firestore_instance", "", "Firestore instance to use, eg. \"prod\"")
isolateServer = flag.String("isolate_server", isolate.ISOLATE_SERVER_URL, "Which Isolate server to use.")
local = flag.Bool("local", false, "Whether we're running on a dev machine vs in production.")
- pubsubTopicTasks = flag.String("pubsub_topic_tasks", "", "Pubsub topic for tasks.")
- pubsubTopicJobs = flag.String("pubsub_topic_jobs", "", "Pubsub topic for jobs.")
+ pubsubTopicSet = flag.String("pubsub_topic_set", "", fmt.Sprintf("Pubsub topic set; one of: %v", pubsub.VALID_TOPIC_SETS))
repoUrls = common.NewMultiStringFlag("repo", nil, "Repositories for which to schedule tasks.")
recipesCfgFile = flag.String("recipes_cfg", "", "Path to the recipes.cfg file.")
resourcesDir = flag.String("resources_dir", "", "The directory to find templates, JS, and CSS files. If blank, assumes you're running inside a checkout and will attempt to find the resources relative to this source file.")
@@ -656,23 +655,18 @@
// Initialize the database.
label := *host
- modTasks, err := pubsub.NewModifiedTasks(*pubsubTopicTasks, label, tokenSource)
+ mod, err := pubsub.NewModifiedData(*pubsubTopicSet, label, tokenSource)
if err != nil {
sklog.Fatal(err)
}
- modTasks = modified.NewMuxModifiedTasks(&modified.ModifiedTasksImpl{}, modTasks)
- modJobs, err := pubsub.NewModifiedJobs(*pubsubTopicJobs, label, tokenSource)
- if err != nil {
- sklog.Fatal(err)
- }
- modJobs = modified.NewMuxModifiedJobs(&modified.ModifiedJobsImpl{}, modJobs)
+ mod = modified.NewMuxModifiedData(modified.NewModifiedData(), mod)
if *firestoreInstance != "" {
- tsDb, err = firestore.NewDB(ctx, firestore.FIRESTORE_PROJECT, *firestoreInstance, tokenSource, modTasks, modJobs)
+ tsDb, err = firestore.NewDB(ctx, firestore.FIRESTORE_PROJECT, *firestoreInstance, tokenSource, mod)
if err != nil {
sklog.Fatalf("Failed to create Firestore DB client: %s", err)
}
} else {
- tsDb, err = local_db.NewDB(local_db.DB_NAME, path.Join(wdAbs, local_db.DB_FILENAME), modTasks, modJobs)
+ tsDb, err = local_db.NewDB(local_db.DB_NAME, path.Join(wdAbs, local_db.DB_FILENAME), mod)
if err != nil {
sklog.Fatal(err)
}
diff --git a/task_scheduler/go/tryjobs/testutils.go b/task_scheduler/go/tryjobs/testutils.go
index ae3a409..4f29a5f 100644
--- a/task_scheduler/go/tryjobs/testutils.go
+++ b/task_scheduler/go/tryjobs/testutils.go
@@ -111,7 +111,7 @@
assert.NoError(t, err)
taskCfgCache, err := specs.NewTaskCfgCache(ctx, rm, depot_tools_testutils.GetDepotTools(t, ctx), path.Join(tmpDir, "cache"), specs.DEFAULT_NUM_WORKERS)
assert.NoError(t, err)
- d, err := local_db.NewDB("tasks_db", path.Join(tmpDir, "tasks.db"), nil, nil)
+ d, err := local_db.NewDB("tasks_db", path.Join(tmpDir, "tasks.db"), nil)
assert.NoError(t, err)
mock := mockhttpclient.NewURLMock()
projectRepoMapping := map[string]string{
diff --git a/task_scheduler/go/types/comments.go b/task_scheduler/go/types/comments.go
index d0af194..05302c7 100644
--- a/task_scheduler/go/types/comments.go
+++ b/task_scheduler/go/types/comments.go
@@ -6,6 +6,7 @@
"time"
"go.skia.org/infra/go/sklog"
+ "go.skia.org/infra/go/util"
)
// TaskComment contains a comment about a Task. {Repo, Revision, Name,
@@ -18,13 +19,23 @@
// Timestamp is compared ignoring timezone. The timezone reflects User's
// location.
Timestamp time.Time `json:"time"`
- TaskId string `json:"taskId"`
- User string `json:"user"`
- Message string `json:"message"`
+ TaskId string `json:"taskId,omitempty"`
+ User string `json:"user,omitempty"`
+ Message string `json:"message,omitempty"`
+ Deleted *bool `json:"deleted,omitempty"`
}
func (c TaskComment) Copy() *TaskComment {
- return &c
+ rv := &c
+ if c.Deleted != nil {
+ v := *c.Deleted
+ rv.Deleted = &v
+ }
+ return rv
+}
+
+func (c *TaskComment) Id() string {
+ return c.Repo + "#" + c.Revision + "#" + c.Name + "#" + c.Timestamp.Format(util.SAFE_TIMESTAMP_FORMAT)
}
// TaskSpecComment contains a comment about a TaskSpec. {Repo, Name, Timestamp}
@@ -35,14 +46,24 @@
// Timestamp is compared ignoring timezone. The timezone reflects User's
// location.
Timestamp time.Time `json:"time"`
- User string `json:"user"`
+ User string `json:"user,omitempty"`
Flaky bool `json:"flaky"`
IgnoreFailure bool `json:"ignoreFailure"`
- Message string `json:"message"`
+ Message string `json:"message,omitempty"`
+ Deleted *bool `json:"deleted,omitempty"`
}
func (c TaskSpecComment) Copy() *TaskSpecComment {
- return &c
+ rv := &c
+ if c.Deleted != nil {
+ v := *c.Deleted
+ rv.Deleted = &v
+ }
+ return rv
+}
+
+func (c *TaskSpecComment) Id() string {
+ return c.Repo + "#" + c.Name + "#" + c.Timestamp.Format(util.SAFE_TIMESTAMP_FORMAT)
}
// CommitComment contains a comment about a commit. {Repo, Revision, Timestamp}
@@ -53,13 +74,23 @@
// Timestamp is compared ignoring timezone. The timezone reflects User's
// location.
Timestamp time.Time `json:"time"`
- User string `json:"user"`
+ User string `json:"user,omitempty"`
IgnoreFailure bool `json:"ignoreFailure"`
- Message string `json:"message"`
+ Message string `json:"message,omitempty"`
+ Deleted *bool `json:"deleted,omitempty"`
}
func (c CommitComment) Copy() *CommitComment {
- return &c
+ rv := &c
+ if c.Deleted != nil {
+ v := *c.Deleted
+ rv.Deleted = &v
+ }
+ return rv
+}
+
+func (c *CommitComment) Id() string {
+ return c.Repo + "#" + c.Revision + "#" + c.Timestamp.Format(util.SAFE_TIMESTAMP_FORMAT)
}
// RepoComments contains comments that all pertain to the same repository.
@@ -90,3 +121,199 @@
}
return ©
}
+
+// TaskCommentSlice implements sort.Interface. To sort taskComments
+// []*TaskComment, use sort.Sort(TaskCommentSlice(taskComments)).
+type TaskCommentSlice []*TaskComment
+
+func (s TaskCommentSlice) Len() int { return len(s) }
+
+func (s TaskCommentSlice) Less(i, j int) bool {
+ return s[i].Timestamp.Before(s[j].Timestamp)
+}
+
+func (s TaskCommentSlice) Swap(i, j int) {
+ s[i], s[j] = s[j], s[i]
+}
+
+// TaskSpecCommentSlice implements sort.Interface. To sort taskSpecComments
+// []*TaskSpecComment, use sort.Sort(TaskSpecCommentSlice(taskSpecComments)).
+type TaskSpecCommentSlice []*TaskSpecComment
+
+func (s TaskSpecCommentSlice) Len() int { return len(s) }
+
+func (s TaskSpecCommentSlice) Less(i, j int) bool {
+ return s[i].Timestamp.Before(s[j].Timestamp)
+}
+
+func (s TaskSpecCommentSlice) Swap(i, j int) {
+ s[i], s[j] = s[j], s[i]
+}
+
+// CommitCommentSlice implements sort.Interface. To sort commitComments
+// []*CommitComment, use sort.Sort(CommitCommentSlice(commitComments)).
+type CommitCommentSlice []*CommitComment
+
+func (s CommitCommentSlice) Len() int { return len(s) }
+
+func (s CommitCommentSlice) Less(i, j int) bool {
+ return s[i].Timestamp.Before(s[j].Timestamp)
+}
+
+func (s CommitCommentSlice) Swap(i, j int) {
+ s[i], s[j] = s[j], s[i]
+}
+
+// TaskCommentEncoder encodes TaskComments into bytes via GOB encoding. Not
+// safe for concurrent use.
+type TaskCommentEncoder struct {
+ util.GobEncoder
+}
+
+// Next returns one of the TaskComments provided to Process (in arbitrary order)
+// and its serialized bytes. If any comments remain, returns the TaskComment,
+// the serialized bytes, nil. If all comments have been returned, returns nil,
+// nil, nil. If an error is encountered, returns nil, nil, error.
+func (e *TaskCommentEncoder) Next() (*TaskComment, []byte, error) {
+ item, serialized, err := e.GobEncoder.Next()
+ if err != nil {
+ return nil, nil, err
+ } else if item == nil {
+ return nil, nil, nil
+ }
+ return item.(*TaskComment), serialized, err
+}
+
+// TaskCommentDecoder decodes bytes into TaskComments via GOB decoding. Not safe
+// for concurrent use.
+type TaskCommentDecoder struct {
+ *util.GobDecoder
+}
+
+// NewTaskCommentDecoder returns a TaskCommentDecoder instance.
+func NewTaskCommentDecoder() *TaskCommentDecoder {
+ return &TaskCommentDecoder{
+ GobDecoder: util.NewGobDecoder(func() interface{} {
+ return &TaskComment{}
+ }, func(ch <-chan interface{}) interface{} {
+ items := []*TaskComment{}
+ for item := range ch {
+ items = append(items, item.(*TaskComment))
+ }
+ return items
+ }),
+ }
+}
+
+// Result returns all decoded TaskComments provided to Process (in arbitrary
+// order), or any error encountered.
+func (d *TaskCommentDecoder) Result() ([]*TaskComment, error) {
+ res, err := d.GobDecoder.Result()
+ if err != nil {
+ return nil, err
+ }
+ return res.([]*TaskComment), nil
+}
+
+// TaskSpecCommentEncoder encodes TaskSpecComments into bytes via GOB encoding.
+// Not safe for concurrent use.
+type TaskSpecCommentEncoder struct {
+ util.GobEncoder
+}
+
+// Next returns one of the TaskSpecComments provided to Process (in arbitrary
+// order) and its serialized bytes. If any comments remain, returns the
+// TaskSpecComment, the serialized bytes, nil. If all comments have been
+// returned, returns nil, nil, nil. If an error is encountered, returns nil,
+// nil, error.
+func (e *TaskSpecCommentEncoder) Next() (*TaskSpecComment, []byte, error) {
+ item, serialized, err := e.GobEncoder.Next()
+ if err != nil {
+ return nil, nil, err
+ } else if item == nil {
+ return nil, nil, nil
+ }
+ return item.(*TaskSpecComment), serialized, err
+}
+
+// TaskSpecCommentDecoder decodes bytes into TaskSpecComments via GOB decoding.
+// Not safe for concurrent use.
+type TaskSpecCommentDecoder struct {
+ *util.GobDecoder
+}
+
+// NewTaskSpecCommentDecoder returns a TaskSpecCommentDecoder instance.
+func NewTaskSpecCommentDecoder() *TaskSpecCommentDecoder {
+ return &TaskSpecCommentDecoder{
+ GobDecoder: util.NewGobDecoder(func() interface{} {
+ return &TaskSpecComment{}
+ }, func(ch <-chan interface{}) interface{} {
+ items := []*TaskSpecComment{}
+ for item := range ch {
+ items = append(items, item.(*TaskSpecComment))
+ }
+ return items
+ }),
+ }
+}
+
+// Result returns all decoded TaskSpecComments provided to Process (in arbitrary
+// order), or any error encountered.
+func (d *TaskSpecCommentDecoder) Result() ([]*TaskSpecComment, error) {
+ res, err := d.GobDecoder.Result()
+ if err != nil {
+ return nil, err
+ }
+ return res.([]*TaskSpecComment), nil
+}
+
+// CommitCommentEncoder encodes CommitComments into bytes via GOB encoding. Not
+// safe for concurrent use.
+type CommitCommentEncoder struct {
+ util.GobEncoder
+}
+
+// Next returns one of the CommitComments provided to Process (in arbitrary
+// order) and its serialized bytes. If any comments remain, returns the
+// CommitComment, the serialized bytes, nil. If all comments have been returned,
+// returns nil, nil, nil. If an error is encountered, returns nil, nil, error.
+func (e *CommitCommentEncoder) Next() (*CommitComment, []byte, error) {
+ item, serialized, err := e.GobEncoder.Next()
+ if err != nil {
+ return nil, nil, err
+ } else if item == nil {
+ return nil, nil, nil
+ }
+ return item.(*CommitComment), serialized, err
+}
+
+// CommitCommentDecoder decodes bytes into CommitComments via GOB decoding.
+// Not safe for concurrent use.
+type CommitCommentDecoder struct {
+ *util.GobDecoder
+}
+
+// NewCommitCommentDecoder returns a CommitCommentDecoder instance.
+func NewCommitCommentDecoder() *CommitCommentDecoder {
+ return &CommitCommentDecoder{
+ GobDecoder: util.NewGobDecoder(func() interface{} {
+ return &CommitComment{}
+ }, func(ch <-chan interface{}) interface{} {
+ items := []*CommitComment{}
+ for item := range ch {
+ items = append(items, item.(*CommitComment))
+ }
+ return items
+ }),
+ }
+}
+
+// Result returns all decoded CommitComments provided to Process (in arbitrary
+// order), or any error encountered.
+func (d *CommitCommentDecoder) Result() ([]*CommitComment, error) {
+ res, err := d.GobDecoder.Result()
+ if err != nil {
+ return nil, err
+ }
+ return res.([]*CommitComment), nil
+}
diff --git a/task_scheduler/go/types/comments_test.go b/task_scheduler/go/types/comments_test.go
index 19fb7d1..ccf6708 100644
--- a/task_scheduler/go/types/comments_test.go
+++ b/task_scheduler/go/types/comments_test.go
@@ -11,6 +11,8 @@
func TestCopyTaskComment(t *testing.T) {
testutils.SmallTest(t)
v := MakeTaskComment(1, 1, 1, 1, time.Now())
+ deleted := true
+ v.Deleted = &deleted
deepequal.AssertCopy(t, v, v.Copy())
}
@@ -19,6 +21,8 @@
v := MakeTaskSpecComment(1, 1, 1, time.Now())
v.Flaky = true
v.IgnoreFailure = true
+ deleted := true
+ v.Deleted = &deleted
deepequal.AssertCopy(t, v, v.Copy())
}
@@ -26,6 +30,8 @@
testutils.SmallTest(t)
v := MakeCommitComment(1, 1, 1, time.Now())
v.IgnoreFailure = true
+ deleted := true
+ v.Deleted = &deleted
deepequal.AssertCopy(t, v, v.Copy())
}
diff --git a/task_scheduler/go/types/job.go b/task_scheduler/go/types/job.go
index b4bd6ca..1dac7ad 100644
--- a/task_scheduler/go/types/job.go
+++ b/task_scheduler/go/types/job.go
@@ -1,13 +1,11 @@
package types
import (
- "bytes"
- "encoding/gob"
"fmt"
- "sync"
"time"
"go.skia.org/infra/go/sklog"
+ "go.skia.org/infra/go/util"
)
const (
@@ -308,29 +306,7 @@
// concurrent use.
// TODO(benjaminwagner): Encode in parallel.
type JobEncoder struct {
- err error
- jobs []*Job
- result [][]byte
-}
-
-// Process encodes the Job into a byte slice that will be returned from Next()
-// (in arbitrary order). Returns false if Next is certain to return an error.
-// Caller must ensure j does not change until after the first call to Next().
-// May not be called after calling Next().
-func (e *JobEncoder) Process(j *Job) bool {
- if e.err != nil {
- return false
- }
- var buf bytes.Buffer
- if err := gob.NewEncoder(&buf).Encode(j); err != nil {
- e.err = err
- e.jobs = nil
- e.result = nil
- return false
- }
- e.jobs = append(e.jobs, j)
- e.result = append(e.result, buf.Bytes())
- return true
+ util.GobEncoder
}
// Next returns one of the Jobs provided to Process (in arbitrary order) and
@@ -338,113 +314,42 @@
// bytes, nil. If all jobs have been returned, returns nil, nil, nil. If an
// error is encountered, returns nil, nil, error.
func (e *JobEncoder) Next() (*Job, []byte, error) {
- if e.err != nil {
- return nil, nil, e.err
- }
- if len(e.jobs) == 0 {
+ item, serialized, err := e.GobEncoder.Next()
+ if err != nil {
+ return nil, nil, err
+ } else if item == nil {
return nil, nil, nil
}
- j := e.jobs[0]
- e.jobs = e.jobs[1:]
- serialized := e.result[0]
- e.result = e.result[1:]
- return j, serialized, nil
+ return item.(*Job), serialized, nil
}
// JobDecoder decodes bytes into Jobs via GOB decoding. Not safe for
// concurrent use.
type JobDecoder struct {
- // input contains the incoming byte slices. Process() sends on this channel,
- // decode() receives from it, and Result() closes it.
- input chan []byte
- // output contains decoded Jobs. decode() sends on this channel, collect()
- // receives from it, and run() closes it when all decode() goroutines have
- // finished.
- output chan *Job
- // result contains the return value of Result(). collect() sends a single
- // value on this channel and closes it. Result() receives from it.
- result chan []*Job
- // errors contains the first error from any goroutine. It's a channel in case
- // multiple goroutines experience an error at the same time.
- errors chan error
+ *util.GobDecoder
}
-// init initializes d if it has not been initialized. May not be called concurrently.
-func (d *JobDecoder) init() {
- if d.input == nil {
- d.input = make(chan []byte, kNumDecoderGoroutines*2)
- d.output = make(chan *Job, kNumDecoderGoroutines)
- d.result = make(chan []*Job, 1)
- d.errors = make(chan error, kNumDecoderGoroutines)
- go d.run()
- go d.collect()
+// NewJobDecoder returns a JobDecoder instance.
+func NewJobDecoder() *JobDecoder {
+ return &JobDecoder{
+ GobDecoder: util.NewGobDecoder(func() interface{} {
+ return &Job{}
+ }, func(ch <-chan interface{}) interface{} {
+ items := []*Job{}
+ for item := range ch {
+ items = append(items, item.(*Job))
+ }
+ return items
+ }),
}
}
-// run starts the decode goroutines and closes d.output when they finish.
-func (d *JobDecoder) run() {
- // Start decoders.
- wg := sync.WaitGroup{}
- for i := 0; i < kNumDecoderGoroutines; i++ {
- wg.Add(1)
- go d.decode(&wg)
- }
- // Wait for decoders to exit.
- wg.Wait()
- // Drain d.input in the case that errors were encountered, to avoid deadlock.
- for range d.input {
- }
- close(d.output)
-}
-
-// decode receives from d.input and sends to d.output until d.input is closed or
-// d.errors is non-empty. Decrements wg when done.
-func (d *JobDecoder) decode(wg *sync.WaitGroup) {
- for b := range d.input {
- var j Job
- if err := gob.NewDecoder(bytes.NewReader(b)).Decode(&j); err != nil {
- d.errors <- err
- break
- }
- d.output <- &j
- if len(d.errors) > 0 {
- break
- }
- }
- wg.Done()
-}
-
-// collect receives from d.output until it is closed, then sends on d.result.
-func (d *JobDecoder) collect() {
- result := []*Job{}
- for j := range d.output {
- result = append(result, j)
- }
- d.result <- result
- close(d.result)
-}
-
-// Process decodes the byte slice into a Job and includes it in Result() (in
-// arbitrary order). Returns false if Result is certain to return an error.
-// Caller must ensure b does not change until after Result() returns.
-func (d *JobDecoder) Process(b []byte) bool {
- d.init()
- d.input <- b
- return len(d.errors) == 0
-}
-
// Result returns all decoded Jobs provided to Process (in arbitrary order), or
// any error encountered.
func (d *JobDecoder) Result() ([]*Job, error) {
- // Allow JobDecoder to be used without initialization.
- if d.result == nil {
- return []*Job{}, nil
- }
- close(d.input)
- select {
- case err := <-d.errors:
+ res, err := d.GobDecoder.Result()
+ if err != nil {
return nil, err
- case result := <-d.result:
- return result, nil
}
+ return res.([]*Job), nil
}
diff --git a/task_scheduler/go/types/job_test.go b/task_scheduler/go/types/job_test.go
index 22f67e9..b2ecb32 100644
--- a/task_scheduler/go/types/job_test.go
+++ b/task_scheduler/go/types/job_test.go
@@ -82,7 +82,7 @@
func TestJobDecoder(t *testing.T) {
testutils.SmallTest(t)
- d := JobDecoder{}
+ d := NewJobDecoder()
expectedJobs := map[string]*Job{}
for i := 0; i < 250; i++ {
job := &Job{}
@@ -109,7 +109,7 @@
func TestJobDecoderNoJobs(t *testing.T) {
testutils.SmallTest(t)
- d := JobDecoder{}
+ d := NewJobDecoder()
result, err := d.Result()
assert.NoError(t, err)
assert.Equal(t, 0, len(result))
@@ -125,7 +125,7 @@
serialized := buf.Bytes()
invalid := append([]byte("Hi Mom!"), serialized...)
- d := JobDecoder{}
+ d := NewJobDecoder()
// Process should return true before it encounters an invalid result.
assert.True(t, d.Process(serialized))
assert.True(t, d.Process(serialized))
diff --git a/task_scheduler/go/types/task.go b/task_scheduler/go/types/task.go
index e8dfed4..d49e394 100644
--- a/task_scheduler/go/types/task.go
+++ b/task_scheduler/go/types/task.go
@@ -1,14 +1,11 @@
package types
import (
- "bytes"
- "encoding/gob"
"fmt"
"reflect"
"sort"
"strconv"
"strings"
- "sync"
"time"
"unicode/utf8"
@@ -447,31 +444,8 @@
// TaskEncoder encodes Tasks into bytes via GOB encoding. Not safe for
// concurrent use.
-// TODO(benjaminwagner): Encode in parallel.
type TaskEncoder struct {
- err error
- tasks []*Task
- result [][]byte
-}
-
-// Process encodes the Task into a byte slice that will be returned from Next()
-// (in arbitrary order). Returns false if Next is certain to return an error.
-// Caller must ensure t does not change until after the first call to Next().
-// May not be called after calling Next().
-func (e *TaskEncoder) Process(t *Task) bool {
- if e.err != nil {
- return false
- }
- var buf bytes.Buffer
- if err := gob.NewEncoder(&buf).Encode(t); err != nil {
- e.err = err
- e.tasks = nil
- e.result = nil
- return false
- }
- e.tasks = append(e.tasks, t)
- e.result = append(e.result, buf.Bytes())
- return true
+ util.GobEncoder
}
// Next returns one of the Tasks provided to Process (in arbitrary order) and
@@ -479,117 +453,44 @@
// bytes, nil. If all tasks have been returned, returns nil, nil, nil. If an
// error is encountered, returns nil, nil, error.
func (e *TaskEncoder) Next() (*Task, []byte, error) {
- if e.err != nil {
- return nil, nil, e.err
- }
- if len(e.tasks) == 0 {
+ item, serialized, err := e.GobEncoder.Next()
+ if err != nil {
+ return nil, nil, err
+ } else if item == nil {
return nil, nil, nil
}
- t := e.tasks[0]
- e.tasks = e.tasks[1:]
- serialized := e.result[0]
- e.result = e.result[1:]
- return t, serialized, nil
+ return item.(*Task), serialized, nil
}
// TaskDecoder decodes bytes into Tasks via GOB decoding. Not safe for
// concurrent use.
type TaskDecoder struct {
- // input contains the incoming byte slices. Process() sends on this channel,
- // decode() receives from it, and Result() closes it.
- input chan []byte
- // output contains decoded Tasks. decode() sends on this channel, collect()
- // receives from it, and run() closes it when all decode() goroutines have
- // finished.
- output chan *Task
- // result contains the return value of Result(). collect() sends a single
- // value on this channel and closes it. Result() receives from it.
- result chan []*Task
- // errors contains the first error from any goroutine. It's a channel in case
- // multiple goroutines experience an error at the same time.
- errors chan error
+ *util.GobDecoder
}
-const kNumDecoderGoroutines = 10
-
-// init initializes d if it has not been initialized. May not be called concurrently.
-func (d *TaskDecoder) init() {
- if d.input == nil {
- d.input = make(chan []byte, kNumDecoderGoroutines*2)
- d.output = make(chan *Task, kNumDecoderGoroutines)
- d.result = make(chan []*Task, 1)
- d.errors = make(chan error, kNumDecoderGoroutines)
- go d.run()
- go d.collect()
+// NewTaskDecoder returns a TaskDecoder instance.
+func NewTaskDecoder() *TaskDecoder {
+ return &TaskDecoder{
+ GobDecoder: util.NewGobDecoder(func() interface{} {
+ return &Task{}
+ }, func(ch <-chan interface{}) interface{} {
+ items := []*Task{}
+ for item := range ch {
+ items = append(items, item.(*Task))
+ }
+ return items
+ }),
}
}
-// run starts the decode goroutines and closes d.output when they finish.
-func (d *TaskDecoder) run() {
- // Start decoders.
- wg := sync.WaitGroup{}
- for i := 0; i < kNumDecoderGoroutines; i++ {
- wg.Add(1)
- go d.decode(&wg)
- }
- // Wait for decoders to exit.
- wg.Wait()
- // Drain d.input in the case that errors were encountered, to avoid deadlock.
- for range d.input {
- }
- close(d.output)
-}
-
-// decode receives from d.input and sends to d.output until d.input is closed or
-// d.errors is non-empty. Decrements wg when done.
-func (d *TaskDecoder) decode(wg *sync.WaitGroup) {
- for b := range d.input {
- var t Task
- if err := gob.NewDecoder(bytes.NewReader(b)).Decode(&t); err != nil {
- d.errors <- err
- break
- }
- d.output <- &t
- if len(d.errors) > 0 {
- break
- }
- }
- wg.Done()
-}
-
-// collect receives from d.output until it is closed, then sends on d.result.
-func (d *TaskDecoder) collect() {
- result := []*Task{}
- for t := range d.output {
- result = append(result, t)
- }
- d.result <- result
- close(d.result)
-}
-
-// Process decodes the byte slice into a Task and includes it in Result() (in
-// arbitrary order). Returns false if Result is certain to return an error.
-// Caller must ensure b does not change until after Result() returns.
-func (d *TaskDecoder) Process(b []byte) bool {
- d.init()
- d.input <- b
- return len(d.errors) == 0
-}
-
// Result returns all decoded Tasks provided to Process (in arbitrary order), or
// any error encountered.
func (d *TaskDecoder) Result() ([]*Task, error) {
- // Allow TaskDecoder to be used without initialization.
- if d.result == nil {
- return []*Task{}, nil
- }
- close(d.input)
- select {
- case err := <-d.errors:
+ res, err := d.GobDecoder.Result()
+ if err != nil {
return nil, err
- case result := <-d.result:
- return result, nil
}
+ return res.([]*Task), nil
}
// TagsForTask returns the tags which should be set for a Task.
diff --git a/task_scheduler/go/types/task_test.go b/task_scheduler/go/types/task_test.go
index e6bcbe4..c1e53fd 100644
--- a/task_scheduler/go/types/task_test.go
+++ b/task_scheduler/go/types/task_test.go
@@ -590,7 +590,7 @@
func TestTaskDecoder(t *testing.T) {
testutils.SmallTest(t)
- d := TaskDecoder{}
+ d := NewTaskDecoder()
expectedTasks := map[string]*Task{}
for i := 0; i < 250; i++ {
task := &Task{}
@@ -617,7 +617,7 @@
func TestTaskDecoderNoTasks(t *testing.T) {
testutils.SmallTest(t)
- d := TaskDecoder{}
+ d := NewTaskDecoder()
result, err := d.Result()
assert.NoError(t, err)
assert.Equal(t, 0, len(result))
@@ -633,7 +633,7 @@
serialized := buf.Bytes()
invalid := append([]byte("Hi Mom!"), serialized...)
- d := TaskDecoder{}
+ d := NewTaskDecoder()
// Process should return true before it encounters an invalid result.
assert.True(t, d.Process(serialized))
assert.True(t, d.Process(serialized))
diff --git a/task_scheduler/sys/task-scheduler-internal.service b/task_scheduler/sys/task-scheduler-internal.service
index 24724fe..e13856a 100644
--- a/task_scheduler/sys/task-scheduler-internal.service
+++ b/task_scheduler/sys/task-scheduler-internal.service
@@ -11,8 +11,7 @@
--isolate_server=https://chrome-isolated.appspot.com \
--logtostderr \
--pool=SkiaInternal \
- --pubsub_topic_tasks=task-scheduler-modified-tasks-internal \
- --pubsub_topic_jobs=task-scheduler-modified-jobs-internal \
+ --pubsub_topic_set=internal \
--pubsub_topic=swarming-tasks-internal \
--pubsub_subscriber=task-scheduler-internal \
--repo=https://skia.googlesource.com/internal_test.git \
diff --git a/task_scheduler/sys/task-scheduler-staging.service b/task_scheduler/sys/task-scheduler-staging.service
index 51e7b30..a322539 100644
--- a/task_scheduler/sys/task-scheduler-staging.service
+++ b/task_scheduler/sys/task-scheduler-staging.service
@@ -12,8 +12,7 @@
--isolate_server=https://isolateserver-dev.appspot.com \
--logtostderr \
--pool=Skia \
- --pubsub_topic_tasks=task-scheduler-modified-tasks-staging \
- --pubsub_topic_jobs=task-scheduler-modified-jobs-staging \
+ --pubsub_topic_set=staging \
--pubsub_topic=swarming-tasks-staging \
--pubsub_subscriber=task-scheduler-staging \
--repo=https://skia.googlesource.com/skiabot-test.git \
diff --git a/task_scheduler/sys/task-scheduler.service b/task_scheduler/sys/task-scheduler.service
index 4ba663e..782014c 100644
--- a/task_scheduler/sys/task-scheduler.service
+++ b/task_scheduler/sys/task-scheduler.service
@@ -8,8 +8,7 @@
ExecStart=/usr/local/bin/task_scheduler \
--host=task-scheduler.skia.org \
--logtostderr \
- --pubsub_topic_tasks=task-scheduler-modified-tasks \
- --pubsub_topic_jobs=task-scheduler-modified-jobs \
+ --pubsub_topic_set=prod \
--workdir=/mnt/pd0/task_scheduler_workdir \
--resources_dir=/usr/local/share/task-scheduler/
Restart=always