[task scheduler] Add ModifiedComments

Adds full support for GetModifiedComments, both in-memory and pubsub.
Pass the unified ModifiedTasks+ModifiedJobs+ModifiedComments into the DB
constructors rather than individually.

Bug: skia:
Change-Id: I68386757fd95db81aeaa16d9ac8b34ec928c9b12
Reviewed-on: https://skia-review.googlesource.com/c/177722
Commit-Queue: Eric Boren <borenet@google.com>
Reviewed-by: Ben Wagner <benjaminwagner@google.com>
diff --git a/datahopper/go/datahopper/main.go b/datahopper/go/datahopper/main.go
index 9ea7ef0..b13cf27 100644
--- a/datahopper/go/datahopper/main.go
+++ b/datahopper/go/datahopper/main.go
@@ -7,6 +7,7 @@
 import (
 	"context"
 	"flag"
+	"fmt"
 	"os"
 	"path"
 	"path/filepath"
@@ -42,10 +43,9 @@
 	taskSchedulerDbUrl = flag.String("task_db_url", "http://skia-task-scheduler:8008/db/", "Where the Skia task scheduler database is hosted.")
 	workdir            = flag.String("workdir", ".", "Working directory used by data processors.")
 
-	perfBucket       = flag.String("perf_bucket", "skia-perf", "The GCS bucket that should be used for writing into perf")
-	perfPrefix       = flag.String("perf_duration_prefix", "task-duration", "The folder name in the bucket that task duration metric shoudl be written.")
-	tasksPubsubTopic = flag.String("pubsub_topic_tasks", pubsub.TOPIC_TASKS, "Pubsub topic for tasks.")
-	jobsPubsubTopic  = flag.String("pubsub_topic_jobs", pubsub.TOPIC_JOBS, "Pubsub topic for jobs.")
+	perfBucket     = flag.String("perf_bucket", "skia-perf", "The GCS bucket that should be used for writing into perf")
+	perfPrefix     = flag.String("perf_duration_prefix", "task-duration", "The folder name in the bucket that task duration metric shoudl be written.")
+	pubsubTopicSet = flag.String("pubsub_topic_set", "", fmt.Sprintf("Pubsub topic set; one of: %v", pubsub.VALID_TOPIC_SETS))
 )
 
 var (
@@ -166,20 +166,16 @@
 	var d db.RemoteDB
 	if *firestoreInstance != "" {
 		label := "datahopper"
-		modTasks, err := pubsub.NewModifiedTasks(*tasksPubsubTopic, label, newTs)
+		mod, err := pubsub.NewModifiedData(*pubsubTopicSet, label, newTs)
 		if err != nil {
 			sklog.Fatal(err)
 		}
-		modJobs, err := pubsub.NewModifiedJobs(*jobsPubsubTopic, label, newTs)
-		if err != nil {
-			sklog.Fatal(err)
-		}
-		d, err = firestore.NewDB(ctx, firestore.FIRESTORE_PROJECT, *firestoreInstance, newTs, modTasks, modJobs)
+		d, err = firestore.NewDB(ctx, firestore.FIRESTORE_PROJECT, *firestoreInstance, newTs, mod)
 		if err != nil {
 			sklog.Fatalf("Failed to create Firestore DB client: %s", err)
 		}
 	} else {
-		d, err = remote_db.NewClient(*taskSchedulerDbUrl, *tasksPubsubTopic, *jobsPubsubTopic, "datahopper", newTs)
+		d, err = remote_db.NewClient(*taskSchedulerDbUrl, *pubsubTopicSet, "datahopper", newTs)
 		if err != nil {
 			sklog.Fatal(err)
 		}
diff --git a/go/util/gob.go b/go/util/gob.go
new file mode 100644
index 0000000..207f93e
--- /dev/null
+++ b/go/util/gob.go
@@ -0,0 +1,222 @@
+package util
+
+import (
+	"bytes"
+	"encoding/gob"
+	"sync"
+)
+
+const kNumDecoderGoroutines = 10
+
+// GobEncoder encodes structs into bytes via GOB encoding. Not safe for
+// concurrent use.
+//
+// Here's a template for writing a type-specific encoder:
+//
+// // FooEncoder encodes Foos into bytes via GOB encoding. Not safe for
+// // concurrent use.
+// type FooEncoder {
+// 	util.GobEncoder
+// }
+//
+// // Next returns one of the Foox provided to Process (in arbitrary order) and
+// // its serialized bytes. If any items remain, returns the item, the
+// // serialized bytes, nil. If all items have been returned, returns nil, nil,
+// // nil. If an error is encountered, returns nil, nil, error.
+// func (e *FooEncoder) Next() (*Foo, []byte, error) {
+//	item, serialized, err := e.GobEncoder.Next()
+//	if err != nil {
+//		return nil, nil, err
+//	} else if item == nil {
+//		return nil, nil, nil
+//	}
+//	return item.(*Foo), serialized, nil
+// }
+type GobEncoder struct {
+	err    error
+	items  []interface{}
+	result [][]byte
+}
+
+// Process encodes the item into a byte slice that will be returned from
+// Next() (in arbitrary order). Returns false if Next is certain to return an
+// error. Caller must ensure item does not change until after the first call to
+// Next(). May not be called after calling Next().
+func (e *GobEncoder) Process(item interface{}) bool {
+	if e.err != nil {
+		return false
+	}
+	var buf bytes.Buffer
+	if err := gob.NewEncoder(&buf).Encode(item); err != nil {
+		e.err = err
+		e.items = nil
+		e.result = nil
+		return false
+	}
+	e.items = append(e.items, item)
+	e.result = append(e.result, buf.Bytes())
+	return true
+}
+
+// Next returns one of the items provided to Process (in arbitrary order) and
+// its serialized bytes. If any items remain, returns the item, the serialized
+// bytes, nil. If all items have been returned, returns nil, nil, nil. If an
+// error is encountered, returns nil, nil, error.
+func (e *GobEncoder) Next() (interface{}, []byte, error) {
+	if e.err != nil {
+		return nil, nil, e.err
+	}
+	if len(e.items) == 0 {
+		return nil, nil, nil
+	}
+	c := e.items[0]
+	e.items = e.items[1:]
+	serialized := e.result[0]
+	e.result = e.result[1:]
+	return c, serialized, nil
+}
+
+// GobDecoder decodes bytes into structs via GOB decoding. Not safe for
+// concurrent use.
+//
+// Here's a template for writing a type-specific decoder:
+//
+// FooDecoder decodes bytes into Foos via GOB decoding. Not safe for
+// concurrent use.
+// type FooDecoder struct {
+//	*util.GobDecoder
+// }
+//
+// // NewFooDecoder returns a FooDecoder instance.
+// func NewFooDecoder() *FooDecoder {
+//	return &FooDecoder{
+//		GobDecoder: util.NewGobDecoder(func() interface{} {
+//			return &Foo{}
+//		}, func(ch <-chan interface{}) interface{} {
+//			items := []*Foo{}
+//			for item := range ch {
+//				items = append(items, item.(*Foo))
+//			}
+//			return items
+//		}),
+//	}
+// }
+//
+// // Result returns all decoded Foos provided to Process (in arbitrary order), or
+// // any error encountered.
+// func (d *FooDecoder) Result() ([]*Foo, error) {
+//	res, err := d.GobDecoder.Result()
+//	if err != nil {
+//		return nil, err
+//	}
+//	return res.([]*Foo), nil
+// }
+type GobDecoder struct {
+	// input contains the incoming byte slices. Process() sends on this
+	// channel, decode() receives from it, and Result() closes it.
+	input chan []byte
+	// output contains decoded items. decode() sends on this channel,
+	// collect() receives from it, and run() closes it when all decode()
+	// goroutines have finished.
+	output chan interface{}
+	// result contains the return value of Result(). collect() sends a single
+	// value on this channel and closes it. Result() receives from it.
+	result chan interface{}
+	// errors contains the first error from any goroutine. It's a channel in
+	// case multiple goroutines experience an error at the same time.
+	errors chan error
+
+	newItem     func() interface{}
+	collectImpl func(<-chan interface{}) interface{}
+}
+
+// NewGobDecoder returns a GobDecoder instance. The first argument is a
+// goroutine-safe function which returns a zero-valued instance of the type
+// being decoded, eg.
+//
+// func() interface{} {
+//	return &MyType{}
+// }
+//
+// The second argument is a function which collects decoded instances of that
+// type from a channel and returns a slice, eg.
+//
+// func(ch <-chan interface{}) interface{} {
+//	items := []*MyType{}
+//	for item := range ch {
+//		items = append(items, item.(*MyType))
+//	}
+//	return items
+// }
+func NewGobDecoder(newItem func() interface{}, collect func(<-chan interface{}) interface{}) *GobDecoder {
+	d := &GobDecoder{
+		input:       make(chan []byte, kNumDecoderGoroutines*2),
+		output:      make(chan interface{}, kNumDecoderGoroutines),
+		result:      make(chan interface{}, 1),
+		errors:      make(chan error, kNumDecoderGoroutines),
+		newItem:     newItem,
+		collectImpl: collect,
+	}
+	go d.run()
+	go d.collect()
+	return d
+}
+
+// run starts the decode goroutines and closes d.output when they finish.
+func (d *GobDecoder) run() {
+	// Start decoders.
+	wg := sync.WaitGroup{}
+	for i := 0; i < kNumDecoderGoroutines; i++ {
+		wg.Add(1)
+		go d.decode(&wg)
+	}
+	// Wait for decoders to exit.
+	wg.Wait()
+	// Drain d.input in the case that errors were encountered, to avoid deadlock.
+	for range d.input {
+	}
+	close(d.output)
+}
+
+// decode receives from d.input and sends to d.output until d.input is closed or
+// d.errors is non-empty. Decrements wg when done.
+func (d *GobDecoder) decode(wg *sync.WaitGroup) {
+	for b := range d.input {
+		item := d.newItem()
+		if err := gob.NewDecoder(bytes.NewReader(b)).Decode(item); err != nil {
+			d.errors <- err
+			break
+		}
+		d.output <- item
+		if len(d.errors) > 0 {
+			break
+		}
+	}
+	wg.Done()
+}
+
+// collect receives from d.output until it is closed, then sends on d.result.
+func (d *GobDecoder) collect() {
+	d.result <- d.collectImpl(d.output)
+	close(d.result)
+}
+
+// Process decodes the byte slice and includes it in Result() (in arbitrary
+// order). Returns false if Result is certain to return an error. Caller must
+// ensure b does not change until after Result() returns.
+func (d *GobDecoder) Process(b []byte) bool {
+	d.input <- b
+	return len(d.errors) == 0
+}
+
+// Result returns all decoded items provided to Process (in arbitrary order), or
+// any error encountered.
+func (d *GobDecoder) Result() (interface{}, error) {
+	close(d.input)
+	select {
+	case err := <-d.errors:
+		return nil, err
+	case result := <-d.result:
+		return result, nil
+	}
+}
diff --git a/go/util/gob_test.go b/go/util/gob_test.go
new file mode 100644
index 0000000..ec63312
--- /dev/null
+++ b/go/util/gob_test.go
@@ -0,0 +1,138 @@
+package util
+
+import (
+	"bytes"
+	"encoding/gob"
+	"fmt"
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+	"go.skia.org/infra/go/deepequal"
+	"go.skia.org/infra/go/testutils"
+)
+
+type Item struct {
+	Id    string
+	Map   map[string]string
+	Slice []string
+}
+
+func TestGobEncoder(t *testing.T) {
+	testutils.SmallTest(t)
+	// TODO(benjaminwagner): Is there any way to cause an error?
+	e := GobEncoder{}
+	expectedItems := map[*Item][]byte{}
+	for i := 0; i < 25; i++ {
+		item := &Item{}
+		item.Id = fmt.Sprintf("Id-%d", i)
+		item.Map = map[string]string{"PointA": "PointB"}
+		item.Slice = []string{"bread"}
+		var buf bytes.Buffer
+		err := gob.NewEncoder(&buf).Encode(item)
+		assert.NoError(t, err)
+		expectedItems[item] = buf.Bytes()
+		assert.True(t, e.Process(item))
+	}
+
+	actualItems := map[*Item][]byte{}
+	for item, serialized, err := e.Next(); item != nil; item, serialized, err = e.Next() {
+		assert.NoError(t, err)
+		actualItems[item.(*Item)] = serialized
+	}
+
+	deepequal.AssertDeepEqual(t, expectedItems, actualItems)
+}
+
+func TestGobEncoderNoItems(t *testing.T) {
+	testutils.SmallTest(t)
+	e := GobEncoder{}
+	item, serialized, err := e.Next()
+	assert.NoError(t, err)
+	assert.Nil(t, item)
+	assert.Nil(t, serialized)
+}
+
+func TestGobDecoder(t *testing.T) {
+	testutils.SmallTest(t)
+	d := NewGobDecoder(func() interface{} {
+		return &Item{}
+	}, func(ch <-chan interface{}) interface{} {
+		items := []*Item{}
+		for item := range ch {
+			items = append(items, item.(*Item))
+		}
+		return items
+	})
+	expectedItems := map[string]*Item{}
+	for i := 0; i < 250; i++ {
+		item := &Item{}
+		item.Id = fmt.Sprintf("Id-%d", i)
+		item.Map = map[string]string{"PointA": "PointB"}
+		item.Slice = []string{"bread"}
+		var buf bytes.Buffer
+		err := gob.NewEncoder(&buf).Encode(item)
+		assert.NoError(t, err)
+		expectedItems[item.Id] = item
+		assert.True(t, d.Process(buf.Bytes()))
+	}
+
+	actualItems := map[string]*Item{}
+	iResult, err := d.Result()
+	assert.NoError(t, err)
+	result := iResult.([]*Item)
+	assert.Equal(t, len(expectedItems), len(result))
+	for _, item := range result {
+		actualItems[item.Id] = item
+	}
+	deepequal.AssertDeepEqual(t, expectedItems, actualItems)
+}
+
+func TestGobDecoderNoItems(t *testing.T) {
+	testutils.SmallTest(t)
+	d := NewGobDecoder(func() interface{} {
+		return &Item{}
+	}, func(ch <-chan interface{}) interface{} {
+		items := []*Item{}
+		for item := range ch {
+			items = append(items, item.(*Item))
+		}
+		return items
+	})
+	result, err := d.Result()
+	assert.NoError(t, err)
+	assert.Equal(t, 0, len(result.([]*Item)))
+}
+
+func TestGobDecoderError(t *testing.T) {
+	testutils.SmallTest(t)
+	item := &Item{}
+	item.Id = "Id"
+	var buf bytes.Buffer
+	err := gob.NewEncoder(&buf).Encode(item)
+	assert.NoError(t, err)
+	serialized := buf.Bytes()
+	invalid := append([]byte("Hi Mom!"), serialized...)
+
+	d := NewGobDecoder(func() interface{} {
+		return &Item{}
+	}, func(ch <-chan interface{}) interface{} {
+		items := []*Item{}
+		for item := range ch {
+			items = append(items, item.(*Item))
+		}
+		return items
+	})
+	// Process should return true before it encounters an invalid result.
+	assert.True(t, d.Process(serialized))
+	assert.True(t, d.Process(serialized))
+	// Process may return true or false after encountering an invalid value.
+	_ = d.Process(invalid)
+	for i := 0; i < 250; i++ {
+		_ = d.Process(serialized)
+	}
+
+	// Result should return error.
+	result, err := d.Result()
+	assert.Error(t, err)
+	assert.Nil(t, result)
+}
diff --git a/status/go/incremental/incremental_test.go b/status/go/incremental/incremental_test.go
index ad977cd..2cf6955 100644
--- a/status/go/incremental/incremental_test.go
+++ b/status/go/incremental/incremental_test.go
@@ -21,9 +21,7 @@
 
 func setup(t *testing.T) (context.Context, string, *IncrementalCache, repograph.Map, db.DB, *git_testutils.GitBuilder, func()) {
 	testutils.LargeTest(t)
-	taskDb := memory.NewInMemoryTaskDB(nil)
-	commentDb := &db.CommentBox{}
-	d := db.NewDB(taskDb, nil, commentDb)
+	d := memory.NewInMemoryDB(nil)
 
 	ctx := context.Background()
 	gb := git_testutils.GitInit(t, ctx)
@@ -48,7 +46,7 @@
 			Name: "DummyTask",
 		},
 	}
-	assert.NoError(t, taskDb.PutTask(initialTask))
+	assert.NoError(t, d.PutTask(initialTask))
 
 	w, err := window.New(24*time.Hour, 100, repos)
 	assert.NoError(t, err)
diff --git a/status/go/status/main.go b/status/go/status/main.go
index 7a10288..03f0100 100644
--- a/status/go/status/main.go
+++ b/status/go/status/main.go
@@ -101,8 +101,7 @@
 	testing                     = flag.Bool("testing", false, "Set to true for locally testing rules. No email will be sent.")
 	useMetadata                 = flag.Bool("use_metadata", true, "Load sensitive values from metadata not from flags.")
 	workdir                     = flag.String("workdir", ".", "Directory to use for scratch work.")
-	pubsubTopicTasks            = flag.String("pubsub_topic_tasks", pubsub.TOPIC_TASKS, "Pubsub topic for tasks.")
-	pubsubTopicJobs             = flag.String("pubsub_topic_jobs", pubsub.TOPIC_JOBS, "Pubsub topic for jobs.")
+	pubsubTopicSet              = flag.String("pubsub_topic_set", "", fmt.Sprintf("Pubsub topic set; one of: %v", pubsub.VALID_TOPIC_SETS))
 
 	repos repograph.Map
 )
@@ -729,7 +728,7 @@
 
 	// Create remote Tasks DB.
 	if *testing {
-		taskDb, err = local_db.NewDB("status-testing", path.Join(*workdir, "status-testing.bdb"), nil, nil)
+		taskDb, err = local_db.NewDB("status-testing", path.Join(*workdir, "status-testing.bdb"), nil)
 		if err != nil {
 			sklog.Fatalf("Failed to create local task DB: %s", err)
 		}
@@ -737,22 +736,18 @@
 	} else if *firestoreInstance != "" {
 		sklog.Infof("Creating firestore DB.")
 		label := *host
-		modTasks, err := pubsub.NewModifiedTasks(*pubsubTopicTasks, label, ts)
+		mod, err := pubsub.NewModifiedData(*pubsubTopicSet, label, ts)
 		if err != nil {
 			sklog.Fatal(err)
 		}
-		modJobs, err := pubsub.NewModifiedJobs(*pubsubTopicJobs, label, ts)
-		if err != nil {
-			sklog.Fatal(err)
-		}
-		taskDb, err = firestore.NewDB(ctx, firestore.FIRESTORE_PROJECT, *firestoreInstance, ts, modTasks, modJobs)
+		taskDb, err = firestore.NewDB(ctx, firestore.FIRESTORE_PROJECT, *firestoreInstance, ts, mod)
 		if err != nil {
 			sklog.Fatalf("Failed to create Firestore DB client: %s", err)
 		}
 	} else {
 		sklog.Infof("Creating remote DB.")
 		label := *host
-		taskDb, err = remote_db.NewClient(*taskSchedulerDbUrl, *pubsubTopicTasks, *pubsubTopicJobs, label, ts)
+		taskDb, err = remote_db.NewClient(*taskSchedulerDbUrl, *pubsubTopicSet, label, ts)
 		if err != nil {
 			sklog.Fatalf("Failed to create remote task DB: %s", err)
 		}
diff --git a/status/go/statusk/main.go b/status/go/statusk/main.go
index 780d280..9334ff1 100644
--- a/status/go/statusk/main.go
+++ b/status/go/statusk/main.go
@@ -109,8 +109,7 @@
 	testing                     = flag.Bool("testing", false, "Set to true for locally testing rules. No email will be sent.")
 	useMetadata                 = flag.Bool("use_metadata", true, "Load sensitive values from metadata not from flags.")
 	workdir                     = flag.String("workdir", ".", "Directory to use for scratch work.")
-	pubsubTopicTasks            = flag.String("pubsub_topic_tasks", pubsub.TOPIC_TASKS, "Pubsub topic for tasks.")
-	pubsubTopicJobs             = flag.String("pubsub_topic_jobs", pubsub.TOPIC_JOBS, "Pubsub topic for jobs.")
+	pubsubTopicSet              = flag.String("pubsub_topic_set", "", fmt.Sprintf("Pubsub topic set; one of: %v", pubsub.VALID_TOPIC_SETS))
 
 	repos repograph.Map
 )
@@ -716,7 +715,7 @@
 
 	// Create remote Tasks DB.
 	if *testing {
-		taskDb, err = local_db.NewDB("status-testing", path.Join(*workdir, "status-testing.bdb"), nil, nil)
+		taskDb, err = local_db.NewDB("status-testing", path.Join(*workdir, "status-testing.bdb"), nil)
 		if err != nil {
 			sklog.Fatalf("Failed to create local task DB: %s", err)
 		}
@@ -729,21 +728,17 @@
 
 		if *firestoreInstance != "" {
 			label := *host
-			modTasks, err := pubsub.NewModifiedTasks(*pubsubTopicTasks, label, ts)
+			mod, err := pubsub.NewModifiedData(*pubsubTopicSet, label, ts)
 			if err != nil {
 				sklog.Fatal(err)
 			}
-			modJobs, err := pubsub.NewModifiedJobs(*pubsubTopicJobs, label, ts)
-			if err != nil {
-				sklog.Fatal(err)
-			}
-			taskDb, err = firestore.NewDB(ctx, firestore.FIRESTORE_PROJECT, *firestoreInstance, ts, modTasks, modJobs)
+			taskDb, err = firestore.NewDB(ctx, firestore.FIRESTORE_PROJECT, *firestoreInstance, ts, mod)
 			if err != nil {
 				sklog.Fatalf("Failed to create Firestore DB client: %s", err)
 			}
 		} else {
 			label := *host
-			taskDb, err = remote_db.NewClient(*taskSchedulerDbUrl, *pubsubTopicTasks, *pubsubTopicJobs, label, ts)
+			taskDb, err = remote_db.NewClient(*taskSchedulerDbUrl, *pubsubTopicSet, label, ts)
 			if err != nil {
 				sklog.Fatalf("Failed to create remote task DB: %s", err)
 			}
diff --git a/status/sys/status-internal.service b/status/sys/status-internal.service
index 91e59fa..571d63a 100644
--- a/status/sys/status-internal.service
+++ b/status/sys/status-internal.service
@@ -12,8 +12,7 @@
     --host=status-internal.skia.org \
     --resources_dir=/usr/local/share/status \
     --capacity_recalculate_interval=30m \
-    --pubsub_topic_tasks=task-scheduler-modified-tasks-internal \
-    --pubsub_topic_jobs=task-scheduler-modified-jobs-internal \
+    --pubsub_topic_set=internal \
     --repo=https://skia.googlesource.com/skia_internal.git \
     --repo=https://skia.googlesource.com/internal_test.git \
     --swarming_url=https://chrome-swarming.appspot.com \
diff --git a/status/sys/status-staging.service b/status/sys/status-staging.service
index 61077ee..708ac21 100644
--- a/status/sys/status-staging.service
+++ b/status/sys/status-staging.service
@@ -13,8 +13,7 @@
     --resources_dir=/usr/local/share/status \
     --capacity_recalculate_interval=30m \
     --firestore_instance=staging \
-    --pubsub_topic_tasks=task-scheduler-modified-tasks-staging \
-    --pubsub_topic_jobs=task-scheduler-modified-jobs-staging \
+    --pubsub_topic_set=staging \
     --repo=https://skia.googlesource.com/skiabot-test.git \
     --swarming_url=https://chromium-swarm-dev.appspot.com \
     --task_scheduler_url=https://task-scheduler-staging.skia.org \
diff --git a/status/sys/statusd.service b/status/sys/statusd.service
index 45ec057..8488151 100644
--- a/status/sys/statusd.service
+++ b/status/sys/statusd.service
@@ -12,8 +12,7 @@
     --host=status.skia.org \
     --resources_dir=/usr/local/share/status \
     --capacity_recalculate_interval=30m \
-    --pubsub_topic_tasks=task-scheduler-modified-tasks \
-    --pubsub_topic_jobs=task-scheduler-modified-jobs \
+    --pubsub_topic_set=prod \
     --task_db_url=http://skia-task-scheduler:8008/db/
 Restart=always
 User=default
diff --git a/task_scheduler/go/db/comments.go b/task_scheduler/go/db/comments.go
index 8123c53..8885f2b 100644
--- a/task_scheduler/go/db/comments.go
+++ b/task_scheduler/go/db/comments.go
@@ -1,20 +1,56 @@
 package db
 
 import (
-	"fmt"
-	"sync"
 	"time"
 
-	"go.skia.org/infra/go/sklog"
-	"go.skia.org/infra/go/util"
 	"go.skia.org/infra/task_scheduler/go/types"
 )
 
+// ModifiedCommentsReader tracks which comments have been added or deleted and
+// returns results to subscribers based on what has changed since the last call
+// to GetModifiedComments.
+type ModifiedComments interface {
+	// GetModifiedComments returns all comments added or deleted since the
+	// last time GetModifiedComments was run with the given id. The returned
+	// comments are sorted by timestamp. If GetModifiedComments returns an
+	// error, the caller should call StopTrackingModifiedComments and
+	// StartTrackingModifiedComments again, and load all data from scratch
+	// to be sure that no comments were missed.
+	GetModifiedComments(string) ([]*types.TaskComment, []*types.TaskSpecComment, []*types.CommitComment, error)
+
+	// StartTrackingModifiedComments initiates tracking of modified comments
+	// for the current caller. Returns a unique ID which can be used by the
+	// caller to retrieve comments which have been added or deleted since
+	// the last query. The ID expires after a period of inactivity.
+	StartTrackingModifiedComments() (string, error)
+
+	// StopTrackingModifiedComments cancels tracking of modified comments
+	// for the provided ID.
+	StopTrackingModifiedComments(string)
+
+	// TrackModifiedTaskComment indicates the given comment should be
+	// returned from the next call to GetModifiedComments from each
+	// subscriber.
+	TrackModifiedTaskComment(*types.TaskComment)
+
+	// TrackModifiedTaskSpecComment indicates the given comment should be
+	// returned from the next call to GetModifiedComments from each
+	// subscriber.
+	TrackModifiedTaskSpecComment(*types.TaskSpecComment)
+
+	// TrackModifiedCommitComment indicates the given comment should be
+	// returned from the next call to GetModifiedComments from each
+	// subscriber.
+	TrackModifiedCommitComment(*types.CommitComment)
+}
+
 // CommentDB stores comments on Tasks, TaskSpecs, and commits.
 //
 // Clients must be tolerant of comments that refer to nonexistent Tasks,
 // TaskSpecs, or commits.
 type CommentDB interface {
+	ModifiedComments
+
 	// GetComments returns all comments for the given repos.
 	//
 	// If from is specified, it is a hint that TaskComments and CommitComments
@@ -45,373 +81,3 @@
 	// Non-ID fields of the argument are ignored.
 	DeleteCommitComment(*types.CommitComment) error
 }
-
-// CommentBox implements CommentDB with in-memory storage.
-//
-// When created via NewCommentBoxWithPersistence, CommentBox will persist the
-// in-memory representation on every change using the provided writer function.
-//
-// CommentBox can be default-initialized if only in-memory storage is desired.
-type CommentBox struct {
-	// mtx protects comments.
-	mtx sync.RWMutex
-	// comments is map[repo_name]*types.RepoComments.
-	comments map[string]*types.RepoComments
-	// writer is called to persist comments after every change.
-	writer func(map[string]*types.RepoComments) error
-}
-
-// NewCommentBoxWithPersistence creates a CommentBox that is initialized with
-// init and sends the updated in-memory representation to writer after each
-// change. The value of init and the argument to writer is
-// map[repo_name]*types.RepoComments. init must not be modified by the caller. writer
-// must not call any methods of CommentBox. writer may return an error to
-// prevent a change from taking effect.
-func NewCommentBoxWithPersistence(init map[string]*types.RepoComments, writer func(map[string]*types.RepoComments) error) *CommentBox {
-	return &CommentBox{
-		comments: init,
-		writer:   writer,
-	}
-}
-
-// See documentation for CommentDB.GetCommentsForRepos.
-func (b *CommentBox) GetCommentsForRepos(repos []string, from time.Time) ([]*types.RepoComments, error) {
-	b.mtx.RLock()
-	defer b.mtx.RUnlock()
-	rv := make([]*types.RepoComments, len(repos))
-	for i, repo := range repos {
-		if rc, ok := b.comments[repo]; ok {
-			rv[i] = rc.Copy()
-		} else {
-			rv[i] = &types.RepoComments{Repo: repo}
-		}
-	}
-	return rv, nil
-}
-
-// write calls b.writer with comments if non-null.
-func (b *CommentBox) write() error {
-	if b.writer == nil {
-		return nil
-	}
-	return b.writer(b.comments)
-}
-
-// getRepoComments returns the initialized *types.RepoComments for the given repo.
-func (b *CommentBox) getRepoComments(repo string) *types.RepoComments {
-	if b.comments == nil {
-		b.comments = make(map[string]*types.RepoComments, 1)
-	}
-	rc, ok := b.comments[repo]
-	if !ok {
-		rc = &types.RepoComments{
-			Repo:             repo,
-			TaskComments:     map[string]map[string][]*types.TaskComment{},
-			TaskSpecComments: map[string][]*types.TaskSpecComment{},
-			CommitComments:   map[string][]*types.CommitComment{},
-		}
-		b.comments[repo] = rc
-	}
-	return rc
-}
-
-// putTaskComment validates c and adds c to b.comments, or returns
-// ErrAlreadyExists if a different comment has the same ID fields. Assumes b.mtx
-// is write-locked.
-func (b *CommentBox) putTaskComment(c *types.TaskComment) error {
-	if c.Repo == "" || c.Revision == "" || c.Name == "" || util.TimeIsZero(c.Timestamp) {
-		return fmt.Errorf("TaskComment missing required fields. %#v", c)
-	}
-	rc := b.getRepoComments(c.Repo)
-	nameMap, ok := rc.TaskComments[c.Revision]
-	if !ok {
-		nameMap = map[string][]*types.TaskComment{}
-		rc.TaskComments[c.Revision] = nameMap
-	}
-	cSlice := nameMap[c.Name]
-	// TODO(benjaminwagner): Would using utilities in the sort package make this
-	// cleaner?
-	if len(cSlice) > 0 {
-		// Assume comments normally inserted at the end.
-		insert := 0
-		for i := len(cSlice) - 1; i >= 0; i-- {
-			if cSlice[i].Timestamp.Equal(c.Timestamp) {
-				if *cSlice[i] == *c {
-					return nil
-				} else {
-					return ErrAlreadyExists
-				}
-			} else if cSlice[i].Timestamp.Before(c.Timestamp) {
-				insert = i + 1
-				break
-			}
-		}
-		// Ensure capacity for another comment and move any comments after the
-		// insertion point.
-		cSlice = append(cSlice, nil)
-		copy(cSlice[insert+1:], cSlice[insert:])
-		cSlice[insert] = c.Copy()
-	} else {
-		cSlice = []*types.TaskComment{c.Copy()}
-	}
-	nameMap[c.Name] = cSlice
-	return nil
-}
-
-// deleteTaskComment validates c, then finds and removes a comment matching c's
-// ID fields, returning the comment if found. Assumes b.mtx is write-locked.
-func (b *CommentBox) deleteTaskComment(c *types.TaskComment) (*types.TaskComment, error) {
-	if c.Repo == "" || c.Revision == "" || c.Name == "" || util.TimeIsZero(c.Timestamp) {
-		return nil, fmt.Errorf("TaskComment missing required fields. %#v", c)
-	}
-	if rc, ok := b.comments[c.Repo]; ok {
-		if cSlice, ok := rc.TaskComments[c.Revision][c.Name]; ok {
-			// Assume linear search is fast.
-			for i, existing := range cSlice {
-				if existing.Timestamp.Equal(c.Timestamp) {
-					if len(cSlice) > 1 {
-						rc.TaskComments[c.Revision][c.Name] = append(cSlice[:i], cSlice[i+1:]...)
-					} else {
-						delete(rc.TaskComments[c.Revision], c.Name)
-						if len(rc.TaskComments[c.Revision]) == 0 {
-							delete(rc.TaskComments, c.Revision)
-						}
-					}
-					return existing, nil
-				}
-			}
-		}
-	}
-	return nil, nil
-}
-
-// See documentation for CommentDB.PutTaskComment.
-func (b *CommentBox) PutTaskComment(c *types.TaskComment) error {
-	b.mtx.Lock()
-	defer b.mtx.Unlock()
-	if err := b.putTaskComment(c); err != nil {
-		return err
-	}
-	if err := b.write(); err != nil {
-		// If write returns an error, we must revert to previous.
-		if _, delErr := b.deleteTaskComment(c); delErr != nil {
-			sklog.Warningf("Unexpected error: %s", delErr)
-		}
-		return err
-	}
-	return nil
-}
-
-// See documentation for CommentDB.DeleteTaskComment.
-func (b *CommentBox) DeleteTaskComment(c *types.TaskComment) error {
-	b.mtx.Lock()
-	defer b.mtx.Unlock()
-	existing, err := b.deleteTaskComment(c)
-	if err != nil {
-		return err
-	}
-	if existing != nil {
-		if err := b.write(); err != nil {
-			// If write returns an error, we must revert to previous.
-			if putErr := b.putTaskComment(existing); putErr != nil {
-				sklog.Warningf("Unexpected error: %s", putErr)
-			}
-			return err
-		}
-	}
-	return nil
-}
-
-// putTaskSpecComment validates c and adds c to b.comments, or returns
-// ErrAlreadyExists if a different comment has the same ID fields. Assumes b.mtx
-// is write-locked.
-func (b *CommentBox) putTaskSpecComment(c *types.TaskSpecComment) error {
-	if c.Repo == "" || c.Name == "" || util.TimeIsZero(c.Timestamp) {
-		return fmt.Errorf("TaskSpecComment missing required fields. %#v", c)
-	}
-	rc := b.getRepoComments(c.Repo)
-	cSlice := rc.TaskSpecComments[c.Name]
-	if len(cSlice) > 0 {
-		// Assume comments normally inserted at the end.
-		insert := 0
-		for i := len(cSlice) - 1; i >= 0; i-- {
-			if cSlice[i].Timestamp.Equal(c.Timestamp) {
-				if *cSlice[i] == *c {
-					return nil
-				} else {
-					return ErrAlreadyExists
-				}
-			} else if cSlice[i].Timestamp.Before(c.Timestamp) {
-				insert = i + 1
-				break
-			}
-		}
-		// Ensure capacity for another comment and move any comments after the
-		// insertion point.
-		cSlice = append(cSlice, nil)
-		copy(cSlice[insert+1:], cSlice[insert:])
-		cSlice[insert] = c.Copy()
-	} else {
-		cSlice = []*types.TaskSpecComment{c.Copy()}
-	}
-	rc.TaskSpecComments[c.Name] = cSlice
-	return nil
-}
-
-// deleteTaskSpecComment validates c, then finds and removes a comment matching
-// c's ID fields, returning the comment if found. Assumes b.mtx is write-locked.
-func (b *CommentBox) deleteTaskSpecComment(c *types.TaskSpecComment) (*types.TaskSpecComment, error) {
-	if c.Repo == "" || c.Name == "" || util.TimeIsZero(c.Timestamp) {
-		return nil, fmt.Errorf("TaskSpecComment missing required fields. %#v", c)
-	}
-	if rc, ok := b.comments[c.Repo]; ok {
-		if cSlice, ok := rc.TaskSpecComments[c.Name]; ok {
-			// Assume linear search is fast.
-			for i, existing := range cSlice {
-				if existing.Timestamp.Equal(c.Timestamp) {
-					if len(cSlice) > 1 {
-						rc.TaskSpecComments[c.Name] = append(cSlice[:i], cSlice[i+1:]...)
-					} else {
-						delete(rc.TaskSpecComments, c.Name)
-					}
-					return existing, nil
-				}
-			}
-		}
-	}
-	return nil, nil
-}
-
-// See documentation for CommentDB.PutTaskSpecComment.
-func (b *CommentBox) PutTaskSpecComment(c *types.TaskSpecComment) error {
-	b.mtx.Lock()
-	defer b.mtx.Unlock()
-	if err := b.putTaskSpecComment(c); err != nil {
-		return err
-	}
-	if err := b.write(); err != nil {
-		// If write returns an error, we must revert to previous.
-		if _, delErr := b.deleteTaskSpecComment(c); delErr != nil {
-			sklog.Warningf("Unexpected error: %s", delErr)
-		}
-		return err
-	}
-	return nil
-}
-
-// See documentation for CommentDB.DeleteTaskSpecComment.
-func (b *CommentBox) DeleteTaskSpecComment(c *types.TaskSpecComment) error {
-	b.mtx.Lock()
-	defer b.mtx.Unlock()
-	existing, err := b.deleteTaskSpecComment(c)
-	if err != nil {
-		return err
-	}
-	if existing != nil {
-		if err := b.write(); err != nil {
-			// If write returns an error, we must revert to previous.
-			if putErr := b.putTaskSpecComment(existing); putErr != nil {
-				sklog.Warningf("Unexpected error: %s", putErr)
-			}
-			return err
-		}
-	}
-	return nil
-}
-
-// putCommitComment validates c and adds c to b.comments, or returns
-// ErrAlreadyExists if a different comment has the same ID fields. Assumes b.mtx
-// is write-locked.
-func (b *CommentBox) putCommitComment(c *types.CommitComment) error {
-	if c.Repo == "" || c.Revision == "" || util.TimeIsZero(c.Timestamp) {
-		return fmt.Errorf("CommitComment missing required fields. %#v", c)
-	}
-	rc := b.getRepoComments(c.Repo)
-	cSlice := rc.CommitComments[c.Revision]
-	if len(cSlice) > 0 {
-		// Assume comments normally inserted at the end.
-		insert := 0
-		for i := len(cSlice) - 1; i >= 0; i-- {
-			if cSlice[i].Timestamp.Equal(c.Timestamp) {
-				if *cSlice[i] == *c {
-					return nil
-				} else {
-					return ErrAlreadyExists
-				}
-			} else if cSlice[i].Timestamp.Before(c.Timestamp) {
-				insert = i + 1
-				break
-			}
-		}
-		// Ensure capacity for another comment and move any comments after the
-		// insertion point.
-		cSlice = append(cSlice, nil)
-		copy(cSlice[insert+1:], cSlice[insert:])
-		cSlice[insert] = c.Copy()
-	} else {
-		cSlice = []*types.CommitComment{c.Copy()}
-	}
-	rc.CommitComments[c.Revision] = cSlice
-	return nil
-}
-
-// deleteCommitComment validates c, then finds and removes a comment matching
-// c's ID fields, returning the comment if found. Assumes b.mtx is write-locked.
-func (b *CommentBox) deleteCommitComment(c *types.CommitComment) (*types.CommitComment, error) {
-	if c.Repo == "" || c.Revision == "" || util.TimeIsZero(c.Timestamp) {
-		return nil, fmt.Errorf("CommitComment missing required fields. %#v", c)
-	}
-	if rc, ok := b.comments[c.Repo]; ok {
-		if cSlice, ok := rc.CommitComments[c.Revision]; ok {
-			// Assume linear search is fast.
-			for i, existing := range cSlice {
-				if existing.Timestamp.Equal(c.Timestamp) {
-					if len(cSlice) > 1 {
-						rc.CommitComments[c.Revision] = append(cSlice[:i], cSlice[i+1:]...)
-					} else {
-						delete(rc.CommitComments, c.Revision)
-					}
-					return existing, nil
-				}
-			}
-		}
-	}
-	return nil, nil
-}
-
-// See documentation for CommentDB.PutCommitComment.
-func (b *CommentBox) PutCommitComment(c *types.CommitComment) error {
-	b.mtx.Lock()
-	defer b.mtx.Unlock()
-	if err := b.putCommitComment(c); err != nil {
-		return err
-	}
-	if err := b.write(); err != nil {
-		// If write returns an error, we must revert to previous.
-		if _, delErr := b.deleteCommitComment(c); delErr != nil {
-			sklog.Warningf("Unexpected error: %s", delErr)
-		}
-		return err
-	}
-	return nil
-}
-
-// See documentation for CommentDB.DeleteCommitComment.
-func (b *CommentBox) DeleteCommitComment(c *types.CommitComment) error {
-	b.mtx.Lock()
-	defer b.mtx.Unlock()
-	existing, err := b.deleteCommitComment(c)
-	if err != nil {
-		return err
-	}
-	if existing != nil {
-		if err := b.write(); err != nil {
-			// If write returns an error, we must revert to previous.
-			if putErr := b.putCommitComment(existing); putErr != nil {
-				sklog.Warningf("Unexpected error: %s", putErr)
-			}
-			return err
-		}
-	}
-	return nil
-}
diff --git a/task_scheduler/go/db/db.go b/task_scheduler/go/db/db.go
index ce5e259..dd7215f 100644
--- a/task_scheduler/go/db/db.go
+++ b/task_scheduler/go/db/db.go
@@ -265,6 +265,30 @@
 	PutJobs([]*types.Job) error
 }
 
+// ModifiedData combines ModifiedTasks, ModifiedJobs, and ModifiedComments.
+type ModifiedData interface {
+	ModifiedTasks
+	ModifiedJobs
+	ModifiedComments
+}
+
+// modifiedData implements ModifiedData.
+type modifiedData struct {
+	ModifiedTasks
+	ModifiedJobs
+	ModifiedComments
+}
+
+// NewModifiedData returns a ModifiedData which combines the given
+// ModifiedTasks, ModifiedJobs, and ModifiedComments.
+func NewModifiedData(t ModifiedTasks, j ModifiedJobs, c ModifiedComments) ModifiedData {
+	return &modifiedData{
+		ModifiedTasks:    t,
+		ModifiedJobs:     j,
+		ModifiedComments: c,
+	}
+}
+
 // UpdateJobsWithRetries wraps a call to db.PutJobs with retries. It calls
 // db.PutJobs(f()) repeatedly until one of the following happen:
 //  - f or db.PutJobs returns an error, which is then returned from
diff --git a/task_scheduler/go/db/firestore/comments.go b/task_scheduler/go/db/firestore/comments.go
index 4fc0234..befd147 100644
--- a/task_scheduler/go/db/firestore/comments.go
+++ b/task_scheduler/go/db/firestore/comments.go
@@ -123,14 +123,34 @@
 	if st, ok := status.FromError(err); ok && st.Code() == codes.AlreadyExists {
 		return db.ErrAlreadyExists
 	}
-	return err
+	if err != nil {
+		return err
+	}
+	d.TrackModifiedTaskComment(c)
+	return nil
 }
 
 // See documentation for db.CommentDB interface.
 func (d *firestoreDB) DeleteTaskComment(c *types.TaskComment) error {
 	id := taskCommentId(c)
-	_, err := firestore.Delete(d.taskComments().Doc(id), DEFAULT_ATTEMPTS, PUT_SINGLE_TIMEOUT)
-	return err
+	ref := d.taskComments().Doc(id)
+	exists := true
+	if _, err := firestore.Get(ref, DEFAULT_ATTEMPTS, GET_SINGLE_TIMEOUT); err != nil {
+		if st, ok := status.FromError(err); ok && st.Code() == codes.NotFound {
+			exists = false
+		} else {
+			return err
+		}
+	}
+	if exists {
+		if _, err := firestore.Delete(ref, DEFAULT_ATTEMPTS, PUT_SINGLE_TIMEOUT); err != nil {
+			return err
+		}
+		deleted := true
+		c.Deleted = &deleted
+		d.TrackModifiedTaskComment(c)
+	}
+	return nil
 }
 
 // taskSpecCommentId returns an ID for the TaskSpecComment.
@@ -146,14 +166,34 @@
 	if st, ok := status.FromError(err); ok && st.Code() == codes.AlreadyExists {
 		return db.ErrAlreadyExists
 	}
-	return err
+	if err != nil {
+		return err
+	}
+	d.TrackModifiedTaskSpecComment(c)
+	return nil
 }
 
 // See documentation for db.CommentDB interface.
 func (d *firestoreDB) DeleteTaskSpecComment(c *types.TaskSpecComment) error {
 	id := taskSpecCommentId(c)
-	_, err := firestore.Delete(d.taskSpecComments().Doc(id), DEFAULT_ATTEMPTS, PUT_SINGLE_TIMEOUT)
-	return err
+	ref := d.taskSpecComments().Doc(id)
+	exists := true
+	if _, err := firestore.Get(ref, DEFAULT_ATTEMPTS, GET_SINGLE_TIMEOUT); err != nil {
+		if st, ok := status.FromError(err); ok && st.Code() == codes.NotFound {
+			exists = false
+		} else {
+			return err
+		}
+	}
+	if exists {
+		if _, err := firestore.Delete(ref, DEFAULT_ATTEMPTS, PUT_SINGLE_TIMEOUT); err != nil {
+			return err
+		}
+		deleted := true
+		c.Deleted = &deleted
+		d.TrackModifiedTaskSpecComment(c)
+	}
+	return nil
 }
 
 // commitCommentId returns an ID for the CommitComment.
@@ -169,12 +209,32 @@
 	if st, ok := status.FromError(err); ok && st.Code() == codes.AlreadyExists {
 		return db.ErrAlreadyExists
 	}
-	return err
+	if err != nil {
+		return err
+	}
+	d.TrackModifiedCommitComment(c)
+	return nil
 }
 
 // See documentation for db.CommentDB interface.
 func (d *firestoreDB) DeleteCommitComment(c *types.CommitComment) error {
 	id := commitCommentId(c)
-	_, err := firestore.Delete(d.commitComments().Doc(id), DEFAULT_ATTEMPTS, PUT_SINGLE_TIMEOUT)
-	return err
+	ref := d.commitComments().Doc(id)
+	exists := true
+	if _, err := firestore.Get(ref, DEFAULT_ATTEMPTS, GET_SINGLE_TIMEOUT); err != nil {
+		if st, ok := status.FromError(err); ok && st.Code() == codes.NotFound {
+			exists = false
+		} else {
+			return err
+		}
+	}
+	if exists {
+		if _, err := firestore.Delete(d.commitComments().Doc(id), DEFAULT_ATTEMPTS, PUT_SINGLE_TIMEOUT); err != nil {
+			return err
+		}
+		deleted := true
+		c.Deleted = &deleted
+		d.TrackModifiedCommitComment(c)
+	}
+	return nil
 }
diff --git a/task_scheduler/go/db/firestore/firestore.go b/task_scheduler/go/db/firestore/firestore.go
index ca9f388..5607ca8 100644
--- a/task_scheduler/go/db/firestore/firestore.go
+++ b/task_scheduler/go/db/firestore/firestore.go
@@ -56,31 +56,24 @@
 	client    *firestore.Client
 	parentDoc string
 
-	// ModifiedTasksImpl and ModifiedJobsImpl are embedded in order to
-	// implement db.ModifiedTasksReader and db.ModifiedJobsReader.
-	db.ModifiedTasks
-	db.ModifiedJobs
+	db.ModifiedData
 }
 
 // NewDB returns a db.DB which uses Cloud Firestore for storage. The parentDoc
 // parameter is optional and indicates the path of a parent document to which
 // all collections within the DB will belong. If it is not supplied, then the
 // collections will be at the top level.
-func NewDB(ctx context.Context, project, instance string, ts oauth2.TokenSource, modTasks db.ModifiedTasks, modJobs db.ModifiedJobs) (db.BackupDBCloser, error) {
+func NewDB(ctx context.Context, project, instance string, ts oauth2.TokenSource, mod db.ModifiedData) (db.BackupDBCloser, error) {
 	client, err := firestore.NewClient(ctx, project, firestore.APP_TASK_SCHEDULER, instance, ts)
 	if err != nil {
 		return nil, err
 	}
-	if modTasks == nil {
-		modTasks = &modified.ModifiedTasksImpl{}
-	}
-	if modJobs == nil {
-		modJobs = &modified.ModifiedJobsImpl{}
+	if mod == nil {
+		mod = modified.NewModifiedData()
 	}
 	return &firestoreDB{
-		client:        client,
-		ModifiedTasks: modTasks,
-		ModifiedJobs:  modJobs,
+		client:       client,
+		ModifiedData: mod,
 	}, nil
 }
 
diff --git a/task_scheduler/go/db/firestore/firestore_test.go b/task_scheduler/go/db/firestore/firestore_test.go
index be7579e..4002133 100644
--- a/task_scheduler/go/db/firestore/firestore_test.go
+++ b/task_scheduler/go/db/firestore/firestore_test.go
@@ -24,7 +24,7 @@
 	testutils.MediumTest(t)
 	testutils.ManualTest(t)
 	instance := fmt.Sprintf("test-%s", uuid.New())
-	d, err := NewDB(context.Background(), "skia-firestore", instance, nil, nil, nil)
+	d, err := NewDB(context.Background(), "skia-firestore", instance, nil, nil)
 	assert.NoError(t, err)
 	cleanup := func() {
 		c := d.(*firestoreDB).client
diff --git a/task_scheduler/go/db/local_db/busywork/main.go b/task_scheduler/go/db/local_db/busywork/main.go
index 01844df..41b083c 100644
--- a/task_scheduler/go/db/local_db/busywork/main.go
+++ b/task_scheduler/go/db/local_db/busywork/main.go
@@ -520,7 +520,7 @@
 		sklog.Fatal(err)
 	}
 	id := fmt.Sprintf("busywork_%s", hostname)
-	d, err := firestore.NewDB(context.Background(), "skia-firestore", id, ts, nil, nil)
+	d, err := firestore.NewDB(context.Background(), "skia-firestore", id, ts, nil)
 	if err != nil {
 		sklog.Fatal(err)
 	}
diff --git a/task_scheduler/go/db/local_db/local_db.go b/task_scheduler/go/db/local_db/local_db.go
index 3109a63..689ecfe 100644
--- a/task_scheduler/go/db/local_db/local_db.go
+++ b/task_scheduler/go/db/local_db/local_db.go
@@ -18,6 +18,7 @@
 	"go.skia.org/infra/go/sklog"
 	"go.skia.org/infra/go/util"
 	"go.skia.org/infra/task_scheduler/go/db"
+	"go.skia.org/infra/task_scheduler/go/db/memory"
 	"go.skia.org/infra/task_scheduler/go/db/modified"
 	"go.skia.org/infra/task_scheduler/go/types"
 )
@@ -230,12 +231,11 @@
 
 	// ModifiedTasksImpl and ModifiedJobsImpl are embedded in order to
 	// implement db.ModifiedTasksReader and db.ModifiedJobsReader.
-	db.ModifiedTasks
-	db.ModifiedJobs
+	db.ModifiedData
 
 	// CommentBox is embedded in order to implement db.CommentDB. CommentBox uses
 	// this localDB to persist the comments.
-	*db.CommentBox
+	*memory.CommentBox
 
 	// Close will send on each of these channels to indicate goroutines should
 	// stop.
@@ -316,16 +316,13 @@
 }
 
 // NewDB returns a local DB instance.
-func NewDB(name, filename string, modTasks db.ModifiedTasks, modJobs db.ModifiedJobs) (db.BackupDBCloser, error) {
+func NewDB(name, filename string, mod db.ModifiedData) (db.BackupDBCloser, error) {
 	boltdb, err := bolt.Open(filename, 0600, nil)
 	if err != nil {
 		return nil, err
 	}
-	if modTasks == nil {
-		modTasks = &modified.ModifiedTasksImpl{}
-	}
-	if modJobs == nil {
-		modJobs = &modified.ModifiedJobsImpl{}
+	if mod == nil {
+		mod = modified.NewModifiedData()
 	}
 	d := &localDB{
 		name:     name,
@@ -384,8 +381,7 @@
 			"bucket":   BUCKET_JOBS,
 			"count":    "rows",
 		}),
-		ModifiedTasks: modTasks,
-		ModifiedJobs:  modJobs,
+		ModifiedData: mod,
 	}
 
 	stopReportActiveTx := make(chan bool)
@@ -432,7 +428,7 @@
 		return nil, err
 	}
 
-	d.CommentBox = db.NewCommentBoxWithPersistence(comments, d.writeCommentsMap)
+	d.CommentBox = memory.NewCommentBoxWithPersistence(mod, comments, d.writeCommentsMap)
 
 	if dbMetric, err := boltutil.NewDbMetric(boltdb, []string{BUCKET_TASKS, BUCKET_JOBS, BUCKET_COMMENTS}, map[string]string{"database": name}); err != nil {
 		return nil, err
@@ -533,7 +529,7 @@
 	d.metricReadTaskQueries.Inc(1)
 	min := []byte(start.Add(-MAX_CREATED_TIME_SKEW).UTC().Format(TIMESTAMP_FORMAT))
 	max := []byte(end.Add(MAX_CREATED_TIME_SKEW).UTC().Format(TIMESTAMP_FORMAT))
-	decoder := types.TaskDecoder{}
+	decoder := types.NewTaskDecoder()
 	if err := d.view("GetTasksFromDateRange", func(tx *bolt.Tx) error {
 		c := tasksBucket(tx).Cursor()
 		for k, v := c.Seek(min); k != nil && bytes.Compare(k, max) <= 0; k, v = c.Next() {
@@ -740,7 +736,7 @@
 	d.metricReadJobQueries.Inc(1)
 	min := []byte(start.UTC().Format(TIMESTAMP_FORMAT))
 	max := []byte(end.UTC().Format(TIMESTAMP_FORMAT))
-	decoder := types.JobDecoder{}
+	decoder := types.NewJobDecoder()
 	if err := d.view("GetJobsFromDateRange", func(tx *bolt.Tx) error {
 		c := jobsBucket(tx).Cursor()
 		for k, v := c.Seek(min); k != nil && bytes.Compare(k, max) <= 0; k, v = c.Next() {
diff --git a/task_scheduler/go/db/local_db/local_db_test.go b/task_scheduler/go/db/local_db/local_db_test.go
index 22b9649..ee2f1e9 100644
--- a/task_scheduler/go/db/local_db/local_db_test.go
+++ b/task_scheduler/go/db/local_db/local_db_test.go
@@ -150,7 +150,7 @@
 func makeDB(t *testing.T, name string) (db.BackupDBCloser, string) {
 	tmpdir, err := ioutil.TempDir("", name)
 	assert.NoError(t, err)
-	d, err := NewDB(name, filepath.Join(tmpdir, "task.db"), nil, nil)
+	d, err := NewDB(name, filepath.Join(tmpdir, "task.db"), nil)
 	assert.NoError(t, err)
 	return d, tmpdir
 }
diff --git a/task_scheduler/go/db/local_db/ts_local_db_viewer/main.go b/task_scheduler/go/db/local_db/ts_local_db_viewer/main.go
index 87c44aa..00f7412 100644
--- a/task_scheduler/go/db/local_db/ts_local_db_viewer/main.go
+++ b/task_scheduler/go/db/local_db/ts_local_db_viewer/main.go
@@ -28,7 +28,7 @@
 	// Global init.
 	common.Init()
 
-	d, err := local_db.NewDB(local_db.DB_NAME, *dbfile, nil, nil)
+	d, err := local_db.NewDB(local_db.DB_NAME, *dbfile, nil)
 	if err != nil {
 		sklog.Fatal(err)
 	}
diff --git a/task_scheduler/go/db/memory/comments.go b/task_scheduler/go/db/memory/comments.go
new file mode 100644
index 0000000..122f7bc
--- /dev/null
+++ b/task_scheduler/go/db/memory/comments.go
@@ -0,0 +1,399 @@
+package memory
+
+import (
+	"fmt"
+	"sync"
+	"time"
+
+	"go.skia.org/infra/go/sklog"
+	"go.skia.org/infra/go/util"
+	"go.skia.org/infra/task_scheduler/go/db"
+	"go.skia.org/infra/task_scheduler/go/db/modified"
+	"go.skia.org/infra/task_scheduler/go/types"
+)
+
+// CommentBox implements CommentDB with in-memory storage.
+//
+// When created via NewCommentBoxWithPersistence, CommentBox will persist the
+// in-memory representation on every change using the provided writer function.
+type CommentBox struct {
+	db.ModifiedComments
+
+	// mtx protects comments.
+	mtx sync.RWMutex
+	// comments is map[repo_name]*types.RepoComments.
+	comments map[string]*types.RepoComments
+	// writer is called to persist comments after every change.
+	writer func(map[string]*types.RepoComments) error
+}
+
+// NewCommentBoxWithPersistence creates a CommentBox that is initialized with
+// init and sends the updated in-memory representation to writer after each
+// change. The value of init and the argument to writer is
+// map[repo_name]*types.RepoComments. init must not be modified by the caller. writer
+// must not call any methods of CommentBox. writer may return an error to
+// prevent a change from taking effect.
+func NewCommentBoxWithPersistence(modComments db.ModifiedComments, init map[string]*types.RepoComments, writer func(map[string]*types.RepoComments) error) *CommentBox {
+	if modComments == nil {
+		modComments = &modified.ModifiedCommentsImpl{}
+	}
+	return &CommentBox{
+		ModifiedComments: modComments,
+		comments:         init,
+		writer:           writer,
+	}
+}
+
+// See documentation for CommentDB.GetCommentsForRepos.
+func (b *CommentBox) GetCommentsForRepos(repos []string, from time.Time) ([]*types.RepoComments, error) {
+	b.mtx.RLock()
+	defer b.mtx.RUnlock()
+	rv := make([]*types.RepoComments, len(repos))
+	for i, repo := range repos {
+		if rc, ok := b.comments[repo]; ok {
+			rv[i] = rc.Copy()
+		} else {
+			rv[i] = &types.RepoComments{Repo: repo}
+		}
+	}
+	return rv, nil
+}
+
+// write calls b.writer with comments if non-null.
+func (b *CommentBox) write() error {
+	if b.writer == nil {
+		return nil
+	}
+	return b.writer(b.comments)
+}
+
+// getRepoComments returns the initialized *types.RepoComments for the given repo.
+func (b *CommentBox) getRepoComments(repo string) *types.RepoComments {
+	if b.comments == nil {
+		b.comments = make(map[string]*types.RepoComments, 1)
+	}
+	rc, ok := b.comments[repo]
+	if !ok {
+		rc = &types.RepoComments{
+			Repo:             repo,
+			TaskComments:     map[string]map[string][]*types.TaskComment{},
+			TaskSpecComments: map[string][]*types.TaskSpecComment{},
+			CommitComments:   map[string][]*types.CommitComment{},
+		}
+		b.comments[repo] = rc
+	}
+	return rc
+}
+
+// putTaskComment validates c and adds c to b.comments, or returns
+// db.ErrAlreadyExists if a different comment has the same ID fields. Assumes b.mtx
+// is write-locked.
+func (b *CommentBox) putTaskComment(c *types.TaskComment) error {
+	if c.Repo == "" || c.Revision == "" || c.Name == "" || util.TimeIsZero(c.Timestamp) {
+		return fmt.Errorf("TaskComment missing required fields. %#v", c)
+	}
+	rc := b.getRepoComments(c.Repo)
+	nameMap, ok := rc.TaskComments[c.Revision]
+	if !ok {
+		nameMap = map[string][]*types.TaskComment{}
+		rc.TaskComments[c.Revision] = nameMap
+	}
+	cSlice := nameMap[c.Name]
+	// TODO(benjaminwagner): Would using utilities in the sort package make this
+	// cleaner?
+	if len(cSlice) > 0 {
+		// Assume comments normally inserted at the end.
+		insert := 0
+		for i := len(cSlice) - 1; i >= 0; i-- {
+			if cSlice[i].Timestamp.Equal(c.Timestamp) {
+				if *cSlice[i] == *c {
+					return nil
+				} else {
+					return db.ErrAlreadyExists
+				}
+			} else if cSlice[i].Timestamp.Before(c.Timestamp) {
+				insert = i + 1
+				break
+			}
+		}
+		// Ensure capacity for another comment and move any comments after the
+		// insertion point.
+		cSlice = append(cSlice, nil)
+		copy(cSlice[insert+1:], cSlice[insert:])
+		cSlice[insert] = c.Copy()
+	} else {
+		cSlice = []*types.TaskComment{c.Copy()}
+	}
+	nameMap[c.Name] = cSlice
+	return nil
+}
+
+// deleteTaskComment validates c, then finds and removes a comment matching c's
+// ID fields, returning the comment if found. Assumes b.mtx is write-locked.
+func (b *CommentBox) deleteTaskComment(c *types.TaskComment) (*types.TaskComment, error) {
+	if c.Repo == "" || c.Revision == "" || c.Name == "" || util.TimeIsZero(c.Timestamp) {
+		return nil, fmt.Errorf("TaskComment missing required fields. %#v", c)
+	}
+	if rc, ok := b.comments[c.Repo]; ok {
+		if cSlice, ok := rc.TaskComments[c.Revision][c.Name]; ok {
+			// Assume linear search is fast.
+			for i, existing := range cSlice {
+				if existing.Timestamp.Equal(c.Timestamp) {
+					if len(cSlice) > 1 {
+						rc.TaskComments[c.Revision][c.Name] = append(cSlice[:i], cSlice[i+1:]...)
+					} else {
+						delete(rc.TaskComments[c.Revision], c.Name)
+						if len(rc.TaskComments[c.Revision]) == 0 {
+							delete(rc.TaskComments, c.Revision)
+						}
+					}
+					return existing, nil
+				}
+			}
+		}
+	}
+	return nil, nil
+}
+
+// See documentation for CommentDB.PutTaskComment.
+func (b *CommentBox) PutTaskComment(c *types.TaskComment) error {
+	b.mtx.Lock()
+	defer b.mtx.Unlock()
+	if err := b.putTaskComment(c); err != nil {
+		return err
+	}
+	if err := b.write(); err != nil {
+		// If write returns an error, we must revert to previous.
+		if _, delErr := b.deleteTaskComment(c); delErr != nil {
+			sklog.Warningf("Unexpected error: %s", delErr)
+		}
+		return err
+	}
+	b.TrackModifiedTaskComment(c)
+	return nil
+}
+
+// See documentation for CommentDB.DeleteTaskComment.
+func (b *CommentBox) DeleteTaskComment(c *types.TaskComment) error {
+	b.mtx.Lock()
+	defer b.mtx.Unlock()
+	existing, err := b.deleteTaskComment(c)
+	if err != nil {
+		return err
+	}
+	if existing != nil {
+		if err := b.write(); err != nil {
+			// If write returns an error, we must revert to previous.
+			if putErr := b.putTaskComment(existing); putErr != nil {
+				sklog.Warningf("Unexpected error: %s", putErr)
+			}
+			return err
+		}
+		deleted := true
+		c.Deleted = &deleted
+		b.TrackModifiedTaskComment(c)
+	}
+	return nil
+}
+
+// putTaskSpecComment validates c and adds c to b.comments, or returns
+// db.ErrAlreadyExists if a different comment has the same ID fields. Assumes b.mtx
+// is write-locked.
+func (b *CommentBox) putTaskSpecComment(c *types.TaskSpecComment) error {
+	if c.Repo == "" || c.Name == "" || util.TimeIsZero(c.Timestamp) {
+		return fmt.Errorf("TaskSpecComment missing required fields. %#v", c)
+	}
+	rc := b.getRepoComments(c.Repo)
+	cSlice := rc.TaskSpecComments[c.Name]
+	if len(cSlice) > 0 {
+		// Assume comments normally inserted at the end.
+		insert := 0
+		for i := len(cSlice) - 1; i >= 0; i-- {
+			if cSlice[i].Timestamp.Equal(c.Timestamp) {
+				if *cSlice[i] == *c {
+					return nil
+				} else {
+					return db.ErrAlreadyExists
+				}
+			} else if cSlice[i].Timestamp.Before(c.Timestamp) {
+				insert = i + 1
+				break
+			}
+		}
+		// Ensure capacity for another comment and move any comments after the
+		// insertion point.
+		cSlice = append(cSlice, nil)
+		copy(cSlice[insert+1:], cSlice[insert:])
+		cSlice[insert] = c.Copy()
+	} else {
+		cSlice = []*types.TaskSpecComment{c.Copy()}
+	}
+	rc.TaskSpecComments[c.Name] = cSlice
+	return nil
+}
+
+// deleteTaskSpecComment validates c, then finds and removes a comment matching
+// c's ID fields, returning the comment if found. Assumes b.mtx is write-locked.
+func (b *CommentBox) deleteTaskSpecComment(c *types.TaskSpecComment) (*types.TaskSpecComment, error) {
+	if c.Repo == "" || c.Name == "" || util.TimeIsZero(c.Timestamp) {
+		return nil, fmt.Errorf("TaskSpecComment missing required fields. %#v", c)
+	}
+	if rc, ok := b.comments[c.Repo]; ok {
+		if cSlice, ok := rc.TaskSpecComments[c.Name]; ok {
+			// Assume linear search is fast.
+			for i, existing := range cSlice {
+				if existing.Timestamp.Equal(c.Timestamp) {
+					if len(cSlice) > 1 {
+						rc.TaskSpecComments[c.Name] = append(cSlice[:i], cSlice[i+1:]...)
+					} else {
+						delete(rc.TaskSpecComments, c.Name)
+					}
+					return existing, nil
+				}
+			}
+		}
+	}
+	return nil, nil
+}
+
+// See documentation for CommentDB.PutTaskSpecComment.
+func (b *CommentBox) PutTaskSpecComment(c *types.TaskSpecComment) error {
+	b.mtx.Lock()
+	defer b.mtx.Unlock()
+	if err := b.putTaskSpecComment(c); err != nil {
+		return err
+	}
+	if err := b.write(); err != nil {
+		// If write returns an error, we must revert to previous.
+		if _, delErr := b.deleteTaskSpecComment(c); delErr != nil {
+			sklog.Warningf("Unexpected error: %s", delErr)
+		}
+		return err
+	}
+	b.TrackModifiedTaskSpecComment(c)
+	return nil
+}
+
+// See documentation for CommentDB.DeleteTaskSpecComment.
+func (b *CommentBox) DeleteTaskSpecComment(c *types.TaskSpecComment) error {
+	b.mtx.Lock()
+	defer b.mtx.Unlock()
+	existing, err := b.deleteTaskSpecComment(c)
+	if err != nil {
+		return err
+	}
+	if existing != nil {
+		if err := b.write(); err != nil {
+			// If write returns an error, we must revert to previous.
+			if putErr := b.putTaskSpecComment(existing); putErr != nil {
+				sklog.Warningf("Unexpected error: %s", putErr)
+			}
+			return err
+		}
+		deleted := true
+		c.Deleted = &deleted
+		b.TrackModifiedTaskSpecComment(c)
+	}
+	return nil
+}
+
+// putCommitComment validates c and adds c to b.comments, or returns
+// db.ErrAlreadyExists if a different comment has the same ID fields. Assumes b.mtx
+// is write-locked.
+func (b *CommentBox) putCommitComment(c *types.CommitComment) error {
+	if c.Repo == "" || c.Revision == "" || util.TimeIsZero(c.Timestamp) {
+		return fmt.Errorf("CommitComment missing required fields. %#v", c)
+	}
+	rc := b.getRepoComments(c.Repo)
+	cSlice := rc.CommitComments[c.Revision]
+	if len(cSlice) > 0 {
+		// Assume comments normally inserted at the end.
+		insert := 0
+		for i := len(cSlice) - 1; i >= 0; i-- {
+			if cSlice[i].Timestamp.Equal(c.Timestamp) {
+				if *cSlice[i] == *c {
+					return nil
+				} else {
+					return db.ErrAlreadyExists
+				}
+			} else if cSlice[i].Timestamp.Before(c.Timestamp) {
+				insert = i + 1
+				break
+			}
+		}
+		// Ensure capacity for another comment and move any comments after the
+		// insertion point.
+		cSlice = append(cSlice, nil)
+		copy(cSlice[insert+1:], cSlice[insert:])
+		cSlice[insert] = c.Copy()
+	} else {
+		cSlice = []*types.CommitComment{c.Copy()}
+	}
+	rc.CommitComments[c.Revision] = cSlice
+	return nil
+}
+
+// deleteCommitComment validates c, then finds and removes a comment matching
+// c's ID fields, returning the comment if found. Assumes b.mtx is write-locked.
+func (b *CommentBox) deleteCommitComment(c *types.CommitComment) (*types.CommitComment, error) {
+	if c.Repo == "" || c.Revision == "" || util.TimeIsZero(c.Timestamp) {
+		return nil, fmt.Errorf("CommitComment missing required fields. %#v", c)
+	}
+	if rc, ok := b.comments[c.Repo]; ok {
+		if cSlice, ok := rc.CommitComments[c.Revision]; ok {
+			// Assume linear search is fast.
+			for i, existing := range cSlice {
+				if existing.Timestamp.Equal(c.Timestamp) {
+					if len(cSlice) > 1 {
+						rc.CommitComments[c.Revision] = append(cSlice[:i], cSlice[i+1:]...)
+					} else {
+						delete(rc.CommitComments, c.Revision)
+					}
+					return existing, nil
+				}
+			}
+		}
+	}
+	return nil, nil
+}
+
+// See documentation for CommentDB.PutCommitComment.
+func (b *CommentBox) PutCommitComment(c *types.CommitComment) error {
+	b.mtx.Lock()
+	defer b.mtx.Unlock()
+	if err := b.putCommitComment(c); err != nil {
+		return err
+	}
+	if err := b.write(); err != nil {
+		// If write returns an error, we must revert to previous.
+		if _, delErr := b.deleteCommitComment(c); delErr != nil {
+			sklog.Warningf("Unexpected error: %s", delErr)
+		}
+		return err
+	}
+	b.TrackModifiedCommitComment(c)
+	return nil
+}
+
+// See documentation for CommentDB.DeleteCommitComment.
+func (b *CommentBox) DeleteCommitComment(c *types.CommitComment) error {
+	b.mtx.Lock()
+	defer b.mtx.Unlock()
+	existing, err := b.deleteCommitComment(c)
+	if err != nil {
+		return err
+	}
+	if existing != nil {
+		if err := b.write(); err != nil {
+			// If write returns an error, we must revert to previous.
+			if putErr := b.putCommitComment(existing); putErr != nil {
+				sklog.Warningf("Unexpected error: %s", putErr)
+			}
+			return err
+		}
+		deleted := true
+		c.Deleted = &deleted
+		b.TrackModifiedCommitComment(c)
+	}
+	return nil
+}
diff --git a/task_scheduler/go/db/comments_test.go b/task_scheduler/go/db/memory/comments_test.go
similarity index 63%
rename from task_scheduler/go/db/comments_test.go
rename to task_scheduler/go/db/memory/comments_test.go
index a00e224..54f1914 100644
--- a/task_scheduler/go/db/comments_test.go
+++ b/task_scheduler/go/db/memory/comments_test.go
@@ -1,26 +1,22 @@
-package db
+package memory
 
 import (
 	"fmt"
-	"os"
 	"testing"
 	"time"
 
 	assert "github.com/stretchr/testify/require"
 	"go.skia.org/infra/go/deepequal"
 	"go.skia.org/infra/go/testutils"
+	"go.skia.org/infra/task_scheduler/go/db"
+	"go.skia.org/infra/task_scheduler/go/db/modified"
 	"go.skia.org/infra/task_scheduler/go/types"
 )
 
-func TestMain(m *testing.M) {
-	AssertDeepEqual = deepequal.AssertDeepEqual
-	os.Exit(m.Run())
-}
-
 // TestCommentBox checks that CommentBox correctly implements CommentDB.
 func TestCommentBox(t *testing.T) {
 	testutils.SmallTest(t)
-	TestCommentDB(t, &CommentBox{})
+	db.TestCommentDB(t, &CommentBox{ModifiedComments: &modified.ModifiedCommentsImpl{}})
 }
 
 // TestCommentBoxWithPersistence checks that NewCommentBoxWithPersistence can be
@@ -36,7 +32,7 @@
 		return nil
 	}
 
-	db := NewCommentBoxWithPersistence(nil, testWriter)
+	d := NewCommentBoxWithPersistence(nil, nil, testWriter)
 
 	now := time.Now()
 
@@ -51,23 +47,23 @@
 		TaskSpecComments: map[string][]*types.TaskSpecComment{},
 		CommitComments:   map[string][]*types.CommitComment{},
 	}
-	assert.NoError(t, db.PutTaskComment(tc1))
+	assert.NoError(t, d.PutTaskComment(tc1))
 
 	tc2 := types.MakeTaskComment(2, 1, 1, 1, now.Add(2*time.Second))
 	expected[r1].TaskComments["c1"]["n1"] = []*types.TaskComment{tc1, tc2}
-	assert.NoError(t, db.PutTaskComment(tc2))
+	assert.NoError(t, d.PutTaskComment(tc2))
 
 	tc3 := types.MakeTaskComment(3, 1, 1, 1, now.Add(time.Second))
 	expected[r1].TaskComments["c1"]["n1"] = []*types.TaskComment{tc1, tc3, tc2}
-	assert.NoError(t, db.PutTaskComment(tc3))
+	assert.NoError(t, d.PutTaskComment(tc3))
 
 	tc4 := types.MakeTaskComment(4, 1, 1, 2, now)
 	expected[r1].TaskComments["c2"] = map[string][]*types.TaskComment{"n1": {tc4}}
-	assert.NoError(t, db.PutTaskComment(tc4))
+	assert.NoError(t, d.PutTaskComment(tc4))
 
 	tc5 := types.MakeTaskComment(5, 1, 2, 2, now)
 	expected[r1].TaskComments["c2"]["n2"] = []*types.TaskComment{tc5}
-	assert.NoError(t, db.PutTaskComment(tc5))
+	assert.NoError(t, d.PutTaskComment(tc5))
 
 	tc6 := types.MakeTaskComment(6, 2, 3, 3, now)
 	r2 := tc6.Repo
@@ -77,21 +73,21 @@
 		TaskSpecComments: map[string][]*types.TaskSpecComment{},
 		CommitComments:   map[string][]*types.CommitComment{},
 	}
-	assert.NoError(t, db.PutTaskComment(tc6))
+	assert.NoError(t, d.PutTaskComment(tc6))
 
 	tc6copy := tc6.Copy() // Adding identical comment should be ignored.
-	assert.NoError(t, db.PutTaskComment(tc6copy))
+	assert.NoError(t, d.PutTaskComment(tc6copy))
 	tc6.Message = "modifying after Put shouldn't affect stored comment"
 
 	assert.True(t, callCount >= 6)
 
 	sc1 := types.MakeTaskSpecComment(1, 1, 1, now)
 	expected[r1].TaskSpecComments["n1"] = []*types.TaskSpecComment{sc1}
-	assert.NoError(t, db.PutTaskSpecComment(sc1))
+	assert.NoError(t, d.PutTaskSpecComment(sc1))
 
 	cc1 := types.MakeCommitComment(1, 1, 1, now)
 	expected[r1].CommitComments["c1"] = []*types.CommitComment{cc1}
-	assert.NoError(t, db.PutCommitComment(cc1))
+	assert.NoError(t, d.PutCommitComment(cc1))
 
 	assert.True(t, callCount >= 8)
 	callCount = 0
@@ -99,13 +95,13 @@
 	// Check that if there's an error adding, testWriter is not called.
 	tc1different := tc1.Copy()
 	tc1different.Message = "not the same"
-	assert.True(t, IsAlreadyExists(db.PutTaskComment(tc1different)))
+	assert.True(t, db.IsAlreadyExists(d.PutTaskComment(tc1different)))
 	sc1different := sc1.Copy()
 	sc1different.Message = "not the same"
-	assert.True(t, IsAlreadyExists(db.PutTaskSpecComment(sc1different)))
+	assert.True(t, db.IsAlreadyExists(d.PutTaskSpecComment(sc1different)))
 	cc1different := cc1.Copy()
 	cc1different.Message = "not the same"
-	assert.True(t, IsAlreadyExists(db.PutCommitComment(cc1different)))
+	assert.True(t, db.IsAlreadyExists(d.PutCommitComment(cc1different)))
 
 	assert.Equal(t, 0, callCount)
 
@@ -114,10 +110,10 @@
 		r1: expected[r1].Copy(),
 		r2: expected[r2].Copy(),
 	}
-	db = NewCommentBoxWithPersistence(init, testWriter)
+	d = NewCommentBoxWithPersistence(nil, init, testWriter)
 
 	{
-		actual, err := db.GetCommentsForRepos([]string{"r0", r1, r2}, now.Add(-10000*time.Hour))
+		actual, err := d.GetCommentsForRepos([]string{"r0", r1, r2}, now.Add(-10000*time.Hour))
 		assert.NoError(t, err)
 		expectedSlice := []*types.RepoComments{
 			{Repo: "r0"},
@@ -131,28 +127,28 @@
 
 	// Delete some comments.
 	expected[r1].TaskComments["c1"]["n1"] = []*types.TaskComment{tc1, tc2}
-	assert.NoError(t, db.DeleteTaskComment(tc3))
+	assert.NoError(t, d.DeleteTaskComment(tc3))
 	expected[r1].TaskSpecComments = map[string][]*types.TaskSpecComment{}
-	assert.NoError(t, db.DeleteTaskSpecComment(sc1))
+	assert.NoError(t, d.DeleteTaskSpecComment(sc1))
 	expected[r1].CommitComments = map[string][]*types.CommitComment{}
-	assert.NoError(t, db.DeleteCommitComment(cc1))
+	assert.NoError(t, d.DeleteCommitComment(cc1))
 
 	assert.Equal(t, 3, callCount)
 
 	// Delete of nonexistent task should succeed.
-	assert.NoError(t, db.DeleteTaskComment(types.MakeTaskComment(99, 1, 1, 1, now.Add(99*time.Second))))
-	assert.NoError(t, db.DeleteTaskComment(types.MakeTaskComment(99, 1, 1, 99, now)))
-	assert.NoError(t, db.DeleteTaskComment(types.MakeTaskComment(99, 1, 99, 1, now)))
-	assert.NoError(t, db.DeleteTaskComment(types.MakeTaskComment(99, 99, 1, 1, now)))
-	assert.NoError(t, db.DeleteTaskSpecComment(types.MakeTaskSpecComment(99, 1, 1, now.Add(99*time.Second))))
-	assert.NoError(t, db.DeleteTaskSpecComment(types.MakeTaskSpecComment(99, 1, 99, now)))
-	assert.NoError(t, db.DeleteTaskSpecComment(types.MakeTaskSpecComment(99, 99, 1, now)))
-	assert.NoError(t, db.DeleteCommitComment(types.MakeCommitComment(99, 1, 1, now.Add(99*time.Second))))
-	assert.NoError(t, db.DeleteCommitComment(types.MakeCommitComment(99, 1, 99, now)))
-	assert.NoError(t, db.DeleteCommitComment(types.MakeCommitComment(99, 99, 1, now)))
+	assert.NoError(t, d.DeleteTaskComment(types.MakeTaskComment(99, 1, 1, 1, now.Add(99*time.Second))))
+	assert.NoError(t, d.DeleteTaskComment(types.MakeTaskComment(99, 1, 1, 99, now)))
+	assert.NoError(t, d.DeleteTaskComment(types.MakeTaskComment(99, 1, 99, 1, now)))
+	assert.NoError(t, d.DeleteTaskComment(types.MakeTaskComment(99, 99, 1, 1, now)))
+	assert.NoError(t, d.DeleteTaskSpecComment(types.MakeTaskSpecComment(99, 1, 1, now.Add(99*time.Second))))
+	assert.NoError(t, d.DeleteTaskSpecComment(types.MakeTaskSpecComment(99, 1, 99, now)))
+	assert.NoError(t, d.DeleteTaskSpecComment(types.MakeTaskSpecComment(99, 99, 1, now)))
+	assert.NoError(t, d.DeleteCommitComment(types.MakeCommitComment(99, 1, 1, now.Add(99*time.Second))))
+	assert.NoError(t, d.DeleteCommitComment(types.MakeCommitComment(99, 1, 99, now)))
+	assert.NoError(t, d.DeleteCommitComment(types.MakeCommitComment(99, 99, 1, now)))
 
 	{
-		actual, err := db.GetCommentsForRepos([]string{"r0", r1, r2}, now.Add(-10000*time.Hour))
+		actual, err := d.GetCommentsForRepos([]string{"r0", r1, r2}, now.Add(-10000*time.Hour))
 		assert.NoError(t, err)
 		expectedSlice := []*types.RepoComments{
 			{Repo: "r0"},
@@ -167,10 +163,10 @@
 		r1: expected[r1].Copy(),
 		r2: expected[r2].Copy(),
 	}
-	db = NewCommentBoxWithPersistence(init, testWriter)
+	d = NewCommentBoxWithPersistence(nil, init, testWriter)
 
 	{
-		actual, err := db.GetCommentsForRepos([]string{"r0", r1, r2}, now.Add(-10000*time.Hour))
+		actual, err := d.GetCommentsForRepos([]string{"r0", r1, r2}, now.Add(-10000*time.Hour))
 		assert.NoError(t, err)
 		expectedSlice := []*types.RepoComments{
 			{Repo: "r0"},
@@ -193,7 +189,7 @@
 		return injectedError
 	}
 
-	db := NewCommentBoxWithPersistence(nil, testWriter)
+	d := NewCommentBoxWithPersistence(nil, nil, testWriter)
 
 	now := time.Now()
 
@@ -205,16 +201,16 @@
 	tc5 := types.MakeTaskComment(5, 1, 2, 2, now)
 	tc6 := types.MakeTaskComment(6, 2, 3, 3, now)
 	for _, c := range []*types.TaskComment{tc1, tc2, tc3, tc4, tc5, tc6} {
-		assert.NoError(t, db.PutTaskComment(c))
+		assert.NoError(t, d.PutTaskComment(c))
 	}
 	r1 := tc1.Repo
 	r2 := tc6.Repo
 
 	sc1 := types.MakeTaskSpecComment(1, 1, 1, now)
-	assert.NoError(t, db.PutTaskSpecComment(sc1))
+	assert.NoError(t, d.PutTaskSpecComment(sc1))
 
 	cc1 := types.MakeCommitComment(1, 1, 1, now)
-	assert.NoError(t, db.PutCommitComment(cc1))
+	assert.NoError(t, d.PutCommitComment(cc1))
 
 	expected := []*types.RepoComments{
 		{
@@ -248,7 +244,7 @@
 	}
 
 	{
-		actual, err := db.GetCommentsForRepos([]string{r1, r2}, now.Add(-10000*time.Hour))
+		actual, err := d.GetCommentsForRepos([]string{r1, r2}, now.Add(-10000*time.Hour))
 		assert.NoError(t, err)
 		deepequal.AssertDeepEqual(t, expected, actual)
 	}
@@ -257,33 +253,33 @@
 
 	injectedError = fmt.Errorf("No comments from the peanut gallery.")
 
-	assert.Error(t, db.PutTaskComment(types.MakeTaskComment(99, 1, 1, 1, now.Add(99*time.Second))))
-	assert.Error(t, db.PutTaskComment(types.MakeTaskComment(99, 1, 1, 99, now)))
-	assert.Error(t, db.PutTaskComment(types.MakeTaskComment(99, 1, 99, 1, now)))
-	assert.Error(t, db.PutTaskComment(types.MakeTaskComment(99, 99, 1, 1, now)))
-	assert.Error(t, db.PutTaskSpecComment(types.MakeTaskSpecComment(99, 1, 1, now.Add(99*time.Second))))
-	assert.Error(t, db.PutTaskSpecComment(types.MakeTaskSpecComment(99, 1, 99, now)))
-	assert.Error(t, db.PutTaskSpecComment(types.MakeTaskSpecComment(99, 99, 1, now)))
-	assert.Error(t, db.PutCommitComment(types.MakeCommitComment(99, 1, 1, now.Add(99*time.Second))))
-	assert.Error(t, db.PutCommitComment(types.MakeCommitComment(99, 1, 99, now)))
-	assert.Error(t, db.PutCommitComment(types.MakeCommitComment(99, 99, 1, now)))
+	assert.Error(t, d.PutTaskComment(types.MakeTaskComment(99, 1, 1, 1, now.Add(99*time.Second))))
+	assert.Error(t, d.PutTaskComment(types.MakeTaskComment(99, 1, 1, 99, now)))
+	assert.Error(t, d.PutTaskComment(types.MakeTaskComment(99, 1, 99, 1, now)))
+	assert.Error(t, d.PutTaskComment(types.MakeTaskComment(99, 99, 1, 1, now)))
+	assert.Error(t, d.PutTaskSpecComment(types.MakeTaskSpecComment(99, 1, 1, now.Add(99*time.Second))))
+	assert.Error(t, d.PutTaskSpecComment(types.MakeTaskSpecComment(99, 1, 99, now)))
+	assert.Error(t, d.PutTaskSpecComment(types.MakeTaskSpecComment(99, 99, 1, now)))
+	assert.Error(t, d.PutCommitComment(types.MakeCommitComment(99, 1, 1, now.Add(99*time.Second))))
+	assert.Error(t, d.PutCommitComment(types.MakeCommitComment(99, 1, 99, now)))
+	assert.Error(t, d.PutCommitComment(types.MakeCommitComment(99, 99, 1, now)))
 
 	assert.Equal(t, 10, callCount)
 
 	// Assert nothing has changed.
 	{
-		actual, err := db.GetCommentsForRepos([]string{r1, r2}, now.Add(-10000*time.Hour))
+		actual, err := d.GetCommentsForRepos([]string{r1, r2}, now.Add(-10000*time.Hour))
 		assert.NoError(t, err)
 		deepequal.AssertDeepEqual(t, expected, actual)
 	}
 
-	assert.Error(t, db.DeleteTaskComment(tc1))
-	assert.Error(t, db.DeleteTaskSpecComment(sc1))
-	assert.Error(t, db.DeleteCommitComment(cc1))
+	assert.Error(t, d.DeleteTaskComment(tc1))
+	assert.Error(t, d.DeleteTaskSpecComment(sc1))
+	assert.Error(t, d.DeleteCommitComment(cc1))
 
 	// Assert nothing has changed.
 	{
-		actual, err := db.GetCommentsForRepos([]string{r1, r2}, now.Add(-10000*time.Hour))
+		actual, err := d.GetCommentsForRepos([]string{r1, r2}, now.Add(-10000*time.Hour))
 		assert.NoError(t, err)
 		deepequal.AssertDeepEqual(t, expected, actual)
 	}
diff --git a/task_scheduler/go/db/memory/memory.go b/task_scheduler/go/db/memory/memory.go
index 0a9700b..7f6b84d 100644
--- a/task_scheduler/go/db/memory/memory.go
+++ b/task_scheduler/go/db/memory/memory.go
@@ -208,6 +208,9 @@
 
 // NewInMemoryDB returns an extremely simple, inefficient, in-memory DB
 // implementation.
-func NewInMemoryDB(modTasks db.ModifiedTasks, modJobs db.ModifiedJobs) db.DB {
-	return db.NewDB(NewInMemoryTaskDB(modTasks), NewInMemoryJobDB(modJobs), &db.CommentBox{})
+func NewInMemoryDB(mod db.ModifiedData) db.DB {
+	if mod == nil {
+		mod = modified.NewModifiedData()
+	}
+	return db.NewDB(NewInMemoryTaskDB(mod), NewInMemoryJobDB(mod), &CommentBox{ModifiedComments: mod})
 }
diff --git a/task_scheduler/go/db/migrate/migrate.go b/task_scheduler/go/db/migrate/migrate.go
index 33b6cd0..7c3651e 100644
--- a/task_scheduler/go/db/migrate/migrate.go
+++ b/task_scheduler/go/db/migrate/migrate.go
@@ -121,7 +121,7 @@
 	}
 
 	ctx := context.Background()
-	oldDB, err := local_db.NewDB(local_db.DB_NAME, *boltDB, nil, nil)
+	oldDB, err := local_db.NewDB(local_db.DB_NAME, *boltDB, nil)
 	if err != nil {
 		sklog.Fatal(err)
 	}
@@ -131,7 +131,7 @@
 	if err != nil {
 		sklog.Fatal(err)
 	}
-	newDB, err := firestore.NewDB(ctx, firestore.FIRESTORE_PROJECT, *fsInstance, ts, nil, nil)
+	newDB, err := firestore.NewDB(ctx, firestore.FIRESTORE_PROJECT, *fsInstance, ts, nil)
 	if err != nil {
 		sklog.Fatal(err)
 	}
diff --git a/task_scheduler/go/db/modified/modified_data.go b/task_scheduler/go/db/modified/modified_data.go
index 21b3f6f..471355c 100644
--- a/task_scheduler/go/db/modified/modified_data.go
+++ b/task_scheduler/go/db/modified/modified_data.go
@@ -4,6 +4,7 @@
 	"bytes"
 	"encoding/gob"
 	"sort"
+	"strings"
 	"sync"
 	"time"
 
@@ -13,6 +14,12 @@
 	"go.skia.org/infra/task_scheduler/go/types"
 )
 
+// NewModifiedData returns a db.ModifiedData instance which tracks modifications
+// in memory.
+func NewModifiedData() db.ModifiedData {
+	return db.NewModifiedData(&ModifiedTasksImpl{}, &ModifiedJobsImpl{}, &ModifiedCommentsImpl{})
+}
+
 // modifiedData allows subscribers to keep track of DB entries that have been
 // modified. It is designed to be used with wrappers in order to store a desired
 // type of data.
@@ -125,7 +132,7 @@
 	if err != nil {
 		return nil, err
 	}
-	d := types.TaskDecoder{}
+	d := types.NewTaskDecoder()
 	for _, g := range tasks {
 		if !d.Process(g) {
 			break
@@ -178,7 +185,7 @@
 	if err != nil {
 		return nil, err
 	}
-	d := types.JobDecoder{}
+	d := types.NewJobDecoder()
 	for _, g := range jobs {
 		if !d.Process(g) {
 			break
@@ -221,5 +228,128 @@
 	m.m.StopTrackingModifiedEntries(id)
 }
 
+type ModifiedCommentsImpl struct {
+	tasks     modifiedData
+	taskSpecs modifiedData
+	commits   modifiedData
+}
+
+// See docs for ModifiedComments interface.
+func (m *ModifiedCommentsImpl) GetModifiedComments(id string) ([]*types.TaskComment, []*types.TaskSpecComment, []*types.CommitComment, error) {
+	ids := strings.Split(id, "#")
+	if len(ids) != 3 {
+		return nil, nil, nil, db.ErrUnknownId
+	}
+	tasks, err := m.tasks.GetModifiedEntries(ids[0])
+	if err != nil {
+		return nil, nil, nil, err
+	}
+	d1 := types.NewTaskCommentDecoder()
+	for _, g := range tasks {
+		if !d1.Process(g) {
+			break
+		}
+	}
+	rv1, err := d1.Result()
+	if err != nil {
+		return nil, nil, nil, err
+	}
+	sort.Sort(types.TaskCommentSlice(rv1))
+
+	taskSpecs, err := m.taskSpecs.GetModifiedEntries(ids[1])
+	if err != nil {
+		return nil, nil, nil, err
+	}
+	d2 := types.NewTaskSpecCommentDecoder()
+	for _, g := range taskSpecs {
+		if !d2.Process(g) {
+			break
+		}
+	}
+	rv2, err := d2.Result()
+	if err != nil {
+		return nil, nil, nil, err
+	}
+	sort.Sort(types.TaskSpecCommentSlice(rv2))
+
+	commits, err := m.commits.GetModifiedEntries(ids[2])
+	if err != nil {
+		return nil, nil, nil, err
+	}
+	d3 := types.NewCommitCommentDecoder()
+	for _, g := range commits {
+		if !d3.Process(g) {
+			break
+		}
+	}
+	rv3, err := d3.Result()
+	if err != nil {
+		return nil, nil, nil, err
+	}
+	sort.Sort(types.CommitCommentSlice(rv3))
+
+	return rv1, rv2, rv3, nil
+}
+
+// See docs for ModifiedComments interface.
+func (m *ModifiedCommentsImpl) TrackModifiedTaskComment(c *types.TaskComment) {
+	var buf bytes.Buffer
+	if err := gob.NewEncoder(&buf).Encode(c); err != nil {
+		sklog.Fatal(err)
+	}
+	m.tasks.TrackModifiedEntries(map[string][]byte{c.Id(): buf.Bytes()})
+}
+
+// See docs for ModifiedComments interface.
+func (m *ModifiedCommentsImpl) TrackModifiedTaskSpecComment(c *types.TaskSpecComment) {
+	var buf bytes.Buffer
+	if err := gob.NewEncoder(&buf).Encode(c); err != nil {
+		sklog.Fatal(err)
+	}
+	m.taskSpecs.TrackModifiedEntries(map[string][]byte{c.Id(): buf.Bytes()})
+}
+
+// See docs for ModifiedComments interface.
+func (m *ModifiedCommentsImpl) TrackModifiedCommitComment(c *types.CommitComment) {
+	var buf bytes.Buffer
+	if err := gob.NewEncoder(&buf).Encode(c); err != nil {
+		sklog.Fatal(err)
+	}
+	m.commits.TrackModifiedEntries(map[string][]byte{c.Id(): buf.Bytes()})
+}
+
+// See docs for ModifiedComments interface.
+func (m *ModifiedCommentsImpl) StartTrackingModifiedComments() (string, error) {
+	id1, err := m.tasks.StartTrackingModifiedEntries()
+	if err != nil {
+		return "", err
+	}
+	id2, err := m.taskSpecs.StartTrackingModifiedEntries()
+	if err != nil {
+		m.tasks.StopTrackingModifiedEntries(id1)
+		return "", err
+	}
+	id3, err := m.commits.StartTrackingModifiedEntries()
+	if err != nil {
+		m.tasks.StopTrackingModifiedEntries(id1)
+		m.taskSpecs.StopTrackingModifiedEntries(id2)
+		return "", err
+	}
+	return id1 + "#" + id2 + "#" + id3, nil
+}
+
+// See docs for ModifiedComments interface.
+func (m *ModifiedCommentsImpl) StopTrackingModifiedComments(id string) {
+	ids := strings.Split(id, "#")
+	if len(ids) != 3 {
+		sklog.Errorf("Invalid id %q", id)
+		return
+	}
+	m.tasks.StopTrackingModifiedEntries(ids[0])
+	m.taskSpecs.StopTrackingModifiedEntries(ids[1])
+	m.commits.StopTrackingModifiedEntries(ids[2])
+}
+
 var _ db.ModifiedTasks = &ModifiedTasksImpl{}
 var _ db.ModifiedJobs = &ModifiedJobsImpl{}
+var _ db.ModifiedComments = &ModifiedCommentsImpl{}
diff --git a/task_scheduler/go/db/modified/modified_data_test.go b/task_scheduler/go/db/modified/modified_data_test.go
index 7d1a222..cb771ce 100644
--- a/task_scheduler/go/db/modified/modified_data_test.go
+++ b/task_scheduler/go/db/modified/modified_data_test.go
@@ -67,3 +67,34 @@
 	_, err = m.StartTrackingModifiedJobs()
 	assert.NoError(t, err)
 }
+
+func TestDefaultModifiedComments(t *testing.T) {
+	testutils.MediumTest(t)
+	m := &ModifiedCommentsImpl{}
+	db.TestModifiedComments(t, m)
+}
+
+func TestDefaultMultipleCommentModifications(t *testing.T) {
+	testutils.MediumTest(t)
+	m := &ModifiedCommentsImpl{}
+	db.TestMultipleCommentModifications(t, m)
+}
+
+func TestDefaultModifiedCommentsTooManyUsers(t *testing.T) {
+	testutils.MediumTest(t)
+	m := ModifiedCommentsImpl{}
+
+	var oneId string
+	// Max out the number of modified-tasks users; ensure that we error out.
+	for i := 0; i < db.MAX_MODIFIED_DATA_USERS; i++ {
+		id, err := m.StartTrackingModifiedComments()
+		assert.NoError(t, err)
+		oneId = id
+	}
+	_, err := m.StartTrackingModifiedComments()
+	assert.True(t, db.IsTooManyUsers(err))
+
+	m.StopTrackingModifiedComments(oneId)
+	_, err = m.StartTrackingModifiedComments()
+	assert.NoError(t, err)
+}
diff --git a/task_scheduler/go/db/modified/mux.go b/task_scheduler/go/db/modified/mux.go
index 7d25457..d89aa68 100644
--- a/task_scheduler/go/db/modified/mux.go
+++ b/task_scheduler/go/db/modified/mux.go
@@ -7,6 +7,23 @@
 	"go.skia.org/infra/task_scheduler/go/types"
 )
 
+// NewMuxModifiedData returns a db.ModifiedData implementation which writes to
+// multiple ModifiedData instances but only reads from one.
+func NewMuxModifiedData(readWrite db.ModifiedData, writeOnly ...db.ModifiedData) db.ModifiedData {
+	tWriteOnly := make([]db.ModifiedTasks, 0, len(writeOnly))
+	jWriteOnly := make([]db.ModifiedJobs, 0, len(writeOnly))
+	cWriteOnly := make([]db.ModifiedComments, 0, len(writeOnly))
+	for _, wo := range writeOnly {
+		tWriteOnly = append(tWriteOnly, wo)
+		jWriteOnly = append(jWriteOnly, wo)
+		cWriteOnly = append(cWriteOnly, wo)
+	}
+	t := NewMuxModifiedTasks(readWrite, tWriteOnly...)
+	j := NewMuxModifiedJobs(readWrite, jWriteOnly...)
+	c := NewMuxModifiedComments(readWrite, cWriteOnly...)
+	return db.NewModifiedData(t, j, c)
+}
+
 // MuxModifiedTasks is an implementation of db.ModifiedTasks which writes to
 // multiple ModifiedTasks instances but only reads from one.
 type MuxModifiedTasks struct {
@@ -14,7 +31,7 @@
 	writeOnly []db.ModifiedTasks
 }
 
-// New MuxModifiedTasks returns an implementation of db.ModifiedTasks which
+// NewMuxModifiedTasks returns an implementation of db.ModifiedTasks which
 // writes to multiple ModifiedTasks instances but only reads from one.
 func NewMuxModifiedTasks(readWrite db.ModifiedTasks, writeOnly ...db.ModifiedTasks) db.ModifiedTasks {
 	return &MuxModifiedTasks{
@@ -70,3 +87,43 @@
 		wo.TrackModifiedJobsGOB(dbModified, gobs)
 	}
 }
+
+// MuxModifiedComments is an implementation of db.ModifiedComments which writes
+// to multiple ModifiedJobs instances but only reads from one.
+type MuxModifiedComments struct {
+	db.ModifiedComments
+	writeOnly []db.ModifiedComments
+}
+
+// New MuxModifiedComments returns an implementation of db.ModifiedComments
+// which writes to multiple ModifiedJobs instances but only reads from one.
+func NewMuxModifiedComments(readWrite db.ModifiedComments, writeOnly ...db.ModifiedComments) db.ModifiedComments {
+	return &MuxModifiedComments{
+		ModifiedComments: readWrite,
+		writeOnly:        writeOnly,
+	}
+}
+
+// See documentation for db.ModifiedComments interface.
+func (m *MuxModifiedComments) TrackModifiedTaskComment(c *types.TaskComment) {
+	m.ModifiedComments.TrackModifiedTaskComment(c)
+	for _, wo := range m.writeOnly {
+		wo.TrackModifiedTaskComment(c)
+	}
+}
+
+// See documentation for db.ModifiedComments interface.
+func (m *MuxModifiedComments) TrackModifiedTaskSpecComment(c *types.TaskSpecComment) {
+	m.ModifiedComments.TrackModifiedTaskSpecComment(c)
+	for _, wo := range m.writeOnly {
+		wo.TrackModifiedTaskSpecComment(c)
+	}
+}
+
+// See documentation for db.ModifiedComments interface.
+func (m *MuxModifiedComments) TrackModifiedCommitComment(c *types.CommitComment) {
+	m.ModifiedComments.TrackModifiedCommitComment(c)
+	for _, wo := range m.writeOnly {
+		wo.TrackModifiedCommitComment(c)
+	}
+}
diff --git a/task_scheduler/go/db/modified/mux_test.go b/task_scheduler/go/db/modified/mux_test.go
index fc63641..a09bd10 100644
--- a/task_scheduler/go/db/modified/mux_test.go
+++ b/task_scheduler/go/db/modified/mux_test.go
@@ -104,3 +104,53 @@
 	m.TrackModifiedJob(t1)
 	check(t1)
 }
+
+func TestMuxModifiedComments(t *testing.T) {
+	testutils.MediumTest(t)
+	m := NewMuxModifiedComments(&ModifiedCommentsImpl{}, &ModifiedCommentsImpl{})
+	db.TestModifiedComments(t, m)
+}
+
+func TestMuxMultipleCommentModifications(t *testing.T) {
+	testutils.MediumTest(t)
+	m := NewMuxModifiedComments(&ModifiedCommentsImpl{}, &ModifiedCommentsImpl{})
+	db.TestMultipleCommentModifications(t, m)
+}
+
+// Simple test to verify that we actually write to the write-only ModifiedComments.
+func TestMuxModifiedCommentsWriteOnly(t *testing.T) {
+	testutils.MediumTest(t)
+	rw := &ModifiedCommentsImpl{}
+	w1 := &ModifiedCommentsImpl{}
+	w2 := &ModifiedCommentsImpl{}
+	w3 := &ModifiedCommentsImpl{}
+	m := NewMuxModifiedComments(rw, w1, w2, w3)
+	rwId, err := m.StartTrackingModifiedComments()
+	assert.NoError(t, err)
+	wo := []db.ModifiedComments{w1, w2, w3}
+	ids := []string{}
+	for _, w := range wo {
+		id, err := w.StartTrackingModifiedComments()
+		assert.NoError(t, err)
+		ids = append(ids, id)
+	}
+
+	check := func(e1 []*types.TaskComment, e2 []*types.TaskSpecComment, e3 []*types.CommitComment) {
+		a1, a2, a3, err := m.GetModifiedComments(rwId)
+		assert.NoError(t, err)
+		deepequal.AssertDeepEqual(t, e1, a1)
+		deepequal.AssertDeepEqual(t, e2, a2)
+		deepequal.AssertDeepEqual(t, e3, a3)
+		for idx, w := range []db.ModifiedComments{w1, w2, w3} {
+			a1, a2, a3, err := w.GetModifiedComments(ids[idx])
+			assert.NoError(t, err)
+			deepequal.AssertDeepEqual(t, e1, a1)
+			deepequal.AssertDeepEqual(t, e2, a2)
+			deepequal.AssertDeepEqual(t, e3, a3)
+		}
+	}
+	check([]*types.TaskComment{}, []*types.TaskSpecComment{}, []*types.CommitComment{})
+	c1 := types.MakeTaskComment(1, 1, 1, 1, time.Now())
+	m.TrackModifiedTaskComment(c1)
+	check([]*types.TaskComment{c1}, []*types.TaskSpecComment{}, []*types.CommitComment{})
+}
diff --git a/task_scheduler/go/db/pubsub/modified.go b/task_scheduler/go/db/pubsub/modified.go
index 442ea85..0d95504 100644
--- a/task_scheduler/go/db/pubsub/modified.go
+++ b/task_scheduler/go/db/pubsub/modified.go
@@ -6,6 +6,7 @@
 	"encoding/gob"
 	"fmt"
 	"sort"
+	"strings"
 	"sync"
 	"time"
 
@@ -18,6 +19,27 @@
 	"google.golang.org/api/option"
 )
 
+// NewModifiedData returns a db.ModifiedData instance which uses pubsub.
+func NewModifiedData(topicSet, label string, ts oauth2.TokenSource) (db.ModifiedData, error) {
+	topicSetObj, ok := topics[topicSet]
+	if !ok {
+		return nil, fmt.Errorf("Topic must be one of %v, not %q", VALID_TOPIC_SETS, topicSet)
+	}
+	t, err := NewModifiedTasks(topicSetObj.tasks, label, ts)
+	if err != nil {
+		return nil, err
+	}
+	j, err := NewModifiedJobs(topicSetObj.jobs, label, ts)
+	if err != nil {
+		return nil, err
+	}
+	c, err := NewModifiedComments(topicSetObj.taskComments, topicSetObj.taskSpecComments, topicSetObj.commitComments, label, ts)
+	if err != nil {
+		return nil, err
+	}
+	return db.NewModifiedData(t, j, c), nil
+}
+
 type entry struct {
 	ts   time.Time
 	data []byte
@@ -133,7 +155,7 @@
 				data: m.Data,
 			}
 		} else {
-			sklog.Debugf("Received duplicate or outdated message for %s", dataId)
+			sklog.Debugf("Received duplicate or outdated message (%s vs %s) for %s", prev.ts, dbModified, dataId)
 		}
 		return nil
 	})
@@ -346,3 +368,164 @@
 func (c *jobClient) TrackModifiedJobsGOB(ts time.Time, jobsById map[string][]byte) {
 	c.publisher.publishGOB(ts, jobsById)
 }
+
+// commentClient implements db.ModifiedComments using pubsub.
+type commentClient struct {
+	tasks     *modifiedClient
+	taskSpecs *modifiedClient
+	commits   *modifiedClient
+}
+
+// NewModifiedComments returns a db.ModifiedComments which uses pubsub. The
+// topics should be one of the sets of TOPIC_* constants defined in this
+// package. The subscriberLabel is included in the subscription ID, along with a
+// timestamp; this should help to debug zombie subscriptions. It should be
+// descriptive and unique to this process, or if the process uses multiple
+// instances of ModifiedJobs, unique to each instance.
+func NewModifiedComments(taskCommentsTopic, taskSpecCommentsTopic, commitCommentsTopic string, label string, ts oauth2.TokenSource) (db.ModifiedComments, error) {
+	c, err := pubsub.NewClient(context.Background(), PROJECT_ID, option.WithTokenSource(ts))
+	if err != nil {
+		return nil, err
+	}
+	tasks, err := newModifiedClient(c, taskCommentsTopic, label)
+	if err != nil {
+		return nil, err
+	}
+	taskSpecs, err := newModifiedClient(c, taskSpecCommentsTopic, label)
+	if err != nil {
+		return nil, err
+	}
+	commits, err := newModifiedClient(c, commitCommentsTopic, label)
+	if err != nil {
+		return nil, err
+	}
+	return &commentClient{
+		tasks:     tasks,
+		taskSpecs: taskSpecs,
+		commits:   commits,
+	}, nil
+}
+
+// See documentation for db.ModifiedComments interface.
+func (c *commentClient) GetModifiedComments(id string) ([]*types.TaskComment, []*types.TaskSpecComment, []*types.CommitComment, error) {
+	ids := strings.Split(id, "#")
+	if len(ids) != 3 {
+		return nil, nil, nil, db.ErrUnknownId
+	}
+	gobs, err := c.tasks.getModifiedData(ids[0])
+	if err != nil {
+		return nil, nil, nil, err
+	}
+	rv1 := make([]*types.TaskComment, 0, len(gobs))
+	for _, g := range gobs {
+		var c types.TaskComment
+		if err := gob.NewDecoder(bytes.NewReader(g)).Decode(&c); err != nil {
+			// We didn't attempt to decode the blob in the pubsub
+			// message when we received it. Ignore this job.
+			sklog.Errorf("Failed to decode job from pubsub message: %s", err)
+		} else {
+			rv1 = append(rv1, &c)
+		}
+	}
+	sort.Sort(types.TaskCommentSlice(rv1))
+
+	gobs, err = c.taskSpecs.getModifiedData(ids[1])
+	if err != nil {
+		return nil, nil, nil, err
+	}
+	rv2 := make([]*types.TaskSpecComment, 0, len(gobs))
+	for _, g := range gobs {
+		var c types.TaskSpecComment
+		if err := gob.NewDecoder(bytes.NewReader(g)).Decode(&c); err != nil {
+			// We didn't attempt to decode the blob in the pubsub
+			// message when we received it. Ignore this job.
+			sklog.Errorf("Failed to decode job from pubsub message: %s", err)
+		} else {
+			rv2 = append(rv2, &c)
+		}
+	}
+	sort.Sort(types.TaskSpecCommentSlice(rv2))
+
+	gobs, err = c.commits.getModifiedData(ids[2])
+	if err != nil {
+		return nil, nil, nil, err
+	}
+	rv3 := make([]*types.CommitComment, 0, len(gobs))
+	for _, g := range gobs {
+		var c types.CommitComment
+		if err := gob.NewDecoder(bytes.NewReader(g)).Decode(&c); err != nil {
+			// We didn't attempt to decode the blob in the pubsub
+			// message when we received it. Ignore this job.
+			sklog.Errorf("Failed to decode job from pubsub message: %s", err)
+		} else {
+			rv3 = append(rv3, &c)
+		}
+	}
+	sort.Sort(types.CommitCommentSlice(rv3))
+	return rv1, rv2, rv3, nil
+}
+
+// See documentation for db.ModifiedComments interface.
+func (c *commentClient) StartTrackingModifiedComments() (string, error) {
+	id1, err := c.tasks.startTrackingModifiedData()
+	if err != nil {
+		return "", err
+	}
+	id2, err := c.taskSpecs.startTrackingModifiedData()
+	if err != nil {
+		return "", err
+	}
+	id3, err := c.commits.startTrackingModifiedData()
+	if err != nil {
+		return "", err
+	}
+	return id1 + "#" + id2 + "#" + id3, nil
+}
+
+// See documentation for db.ModifiedComments interface.
+func (c *commentClient) StopTrackingModifiedComments(id string) {
+	ids := strings.Split(id, "#")
+	if len(ids) != 3 {
+		sklog.Errorf("Invalid ID %q", id)
+		return
+	}
+	c.tasks.stopTrackingModifiedData(ids[0])
+	c.taskSpecs.stopTrackingModifiedData(ids[1])
+	c.commits.stopTrackingModifiedData(ids[2])
+}
+
+// See documentation for db.ModifiedComments interface.
+func (c *commentClient) TrackModifiedTaskComment(tc *types.TaskComment) {
+	// Hack: since the timestamp is part of the ID, we can't change it. But,
+	// we have to provide a different timestamp from the one we sent when
+	// the comment was created, or else it'll get de-duplicated.
+	ts := tc.Timestamp
+	if tc.Deleted != nil && *tc.Deleted {
+		ts = time.Now()
+	}
+	c.tasks.publisher.publish(tc.Id(), ts, tc)
+}
+
+// See documentation for db.ModifiedComments interface.
+func (c *commentClient) TrackModifiedTaskSpecComment(tc *types.TaskSpecComment) {
+	// Hack: since the timestamp is part of the ID, we can't change it. But,
+	// we have to provide a different timestamp from the one we sent when
+	// the comment was created, or else it'll get de-duplicated.
+	ts := tc.Timestamp
+	if tc.Deleted != nil && *tc.Deleted {
+		ts = time.Now()
+	}
+	c.taskSpecs.publisher.publish(tc.Id(), ts, tc)
+}
+
+// See documentation for db.ModifiedComments interface.
+func (c *commentClient) TrackModifiedCommitComment(cc *types.CommitComment) {
+	// Hack: since the timestamp is part of the ID, we can't change it. But,
+	// we have to provide a different timestamp from the one we sent when
+	// the comment was created, or else it'll get de-duplicated.
+	ts := cc.Timestamp
+	if cc.Deleted != nil && *cc.Deleted {
+		ts = time.Now()
+	}
+	c.commits.publisher.publish(cc.Id(), ts, cc)
+}
diff --git a/task_scheduler/go/db/pubsub/modified_test.go b/task_scheduler/go/db/pubsub/modified_test.go
index 9a5014a..c87ce0f 100644
--- a/task_scheduler/go/db/pubsub/modified_test.go
+++ b/task_scheduler/go/db/pubsub/modified_test.go
@@ -45,3 +45,23 @@
 	m := setupJobs(t)
 	db.TestMultipleJobModifications(t, m)
 }
+
+func setupComments(t *testing.T) db.ModifiedComments {
+	testutils.LargeTest(t)
+	topic1 := fmt.Sprintf("modified-comments-test-tasks-%s", uuid.New())
+	topic2 := fmt.Sprintf("modified-comments-test-taskspecs-%s", uuid.New())
+	topic3 := fmt.Sprintf("modified-comments-test-commits-%s", uuid.New())
+	m, err := NewModifiedComments(topic1, topic2, topic3, "fake-label", nil)
+	assert.NoError(t, err)
+	return m
+}
+
+func TestPubsubModifiedComments(t *testing.T) {
+	m := setupComments(t)
+	db.TestModifiedComments(t, m)
+}
+
+func TestPubsubMultipleCommentModifications(t *testing.T) {
+	m := setupComments(t)
+	db.TestMultipleCommentModifications(t, m)
+}
diff --git a/task_scheduler/go/db/pubsub/pubsub.go b/task_scheduler/go/db/pubsub/pubsub.go
index e5efb42..18e6ca0 100644
--- a/task_scheduler/go/db/pubsub/pubsub.go
+++ b/task_scheduler/go/db/pubsub/pubsub.go
@@ -24,13 +24,27 @@
 	// Default project ID.
 	PROJECT_ID = "skia-public"
 
+	// Sets of topic, based on scheduler instance.
+	TOPIC_SET_PROD     = "prod"
+	TOPIC_SET_INTERNAL = "internal"
+	TOPIC_SET_STAGING  = "staging"
+
 	// Known topic names.
-	TOPIC_TASKS          = "task-scheduler-modified-tasks"
-	TOPIC_TASKS_INTERNAL = "task-scheduler-modified-tasks-internal"
-	TOPIC_TASKS_STAGING  = "task-scheduler-modified-tasks-staging"
-	TOPIC_JOBS           = "task-scheduler-modified-jobs"
-	TOPIC_JOBS_INTERNAL  = "task-scheduler-modified-jobs-internal"
-	TOPIC_JOBS_STAGING   = "task-scheduler-modified-jobs-staging"
+	TOPIC_TASKS                      = "task-scheduler-modified-tasks"
+	TOPIC_TASKS_INTERNAL             = "task-scheduler-modified-tasks-internal"
+	TOPIC_TASKS_STAGING              = "task-scheduler-modified-tasks-staging"
+	TOPIC_JOBS                       = "task-scheduler-modified-jobs"
+	TOPIC_JOBS_INTERNAL              = "task-scheduler-modified-jobs-internal"
+	TOPIC_JOBS_STAGING               = "task-scheduler-modified-jobs-staging"
+	TOPIC_TASK_COMMENTS              = "task-scheduler-modified-task-comments"
+	TOPIC_TASK_COMMENTS_INTERNAL     = "task-scheduler-modified-task-comments-internal"
+	TOPIC_TASK_COMMENTS_STAGING      = "task-scheduler-modified-task-comments-staging"
+	TOPIC_TASKSPEC_COMMENTS          = "task-scheduler-modified-taskspec-comments"
+	TOPIC_TASKSPEC_COMMENTS_INTERNAL = "task-scheduler-modified-taskspec-comments-internal"
+	TOPIC_TASKSPEC_COMMENTS_STAGING  = "task-scheduler-modified-taskspec-comments-staging"
+	TOPIC_COMMIT_COMMENTS            = "task-scheduler-modified-commit-comments"
+	TOPIC_COMMIT_COMMENTS_INTERNAL   = "task-scheduler-modified-commit-comments-internal"
+	TOPIC_COMMIT_COMMENTS_STAGING    = "task-scheduler-modified-commit-comments-staging"
 
 	// Attributes sent with all pubsub messages.
 
@@ -42,6 +56,47 @@
 	ATTR_SENDER_ID = "sender"
 )
 
+var (
+	VALID_TOPIC_SETS = []string{
+		TOPIC_SET_PROD,
+		TOPIC_SET_INTERNAL,
+		TOPIC_SET_STAGING,
+	}
+
+	topics = map[string]topicSet{
+		TOPIC_SET_PROD: topicSet{
+			tasks:            TOPIC_TASKS,
+			jobs:             TOPIC_JOBS,
+			taskComments:     TOPIC_TASK_COMMENTS,
+			taskSpecComments: TOPIC_TASKSPEC_COMMENTS,
+			commitComments:   TOPIC_COMMIT_COMMENTS,
+		},
+		TOPIC_SET_INTERNAL: topicSet{
+			tasks:            TOPIC_TASKS_INTERNAL,
+			jobs:             TOPIC_JOBS_INTERNAL,
+			taskComments:     TOPIC_TASK_COMMENTS_INTERNAL,
+			taskSpecComments: TOPIC_TASKSPEC_COMMENTS_INTERNAL,
+			commitComments:   TOPIC_COMMIT_COMMENTS_INTERNAL,
+		},
+		TOPIC_SET_STAGING: topicSet{
+			tasks:            TOPIC_TASKS_STAGING,
+			jobs:             TOPIC_JOBS_STAGING,
+			taskComments:     TOPIC_TASK_COMMENTS_STAGING,
+			taskSpecComments: TOPIC_TASKSPEC_COMMENTS_STAGING,
+			commitComments:   TOPIC_COMMIT_COMMENTS_STAGING,
+		},
+	}
+)
+
+// topicSet is used for organizing sets of pubsub topics.
+type topicSet struct {
+	tasks            string
+	jobs             string
+	taskComments     string
+	taskSpecComments string
+	commitComments   string
+}
+
 // publisher sends pubsub messages for modified tasks and jobs.
 type publisher struct {
 	senderId string
@@ -157,6 +212,78 @@
 	p.publish(j.Id, j.DbModified, j)
 }
 
+// TaskSpecCommentPublisher sends pubsub messages for comments.
+type TaskSpecCommentPublisher struct {
+	*publisher
+}
+
+// NewTaskSpecCommentPublisher creates a TaskSpecCommentPublisher instance. It
+// creates the given topic if it does not already exist.
+func NewTaskSpecCommentPublisher(topic string, ts oauth2.TokenSource) (*TaskSpecCommentPublisher, error) {
+	c, err := pubsub.NewClient(context.Background(), PROJECT_ID, option.WithTokenSource(ts))
+	if err != nil {
+		return nil, err
+	}
+	pub, err := newPublisher(c, topic)
+	if err != nil {
+		return nil, err
+	}
+	return &TaskSpecCommentPublisher{pub}, nil
+}
+
+// Publish publishes a pubsub message for the given comment.
+func (p *TaskSpecCommentPublisher) Publish(t *types.TaskSpecComment) {
+	p.publish(t.Id(), t.Timestamp, t)
+}
+
+// TaskCommentPublisher sends pubsub messages for comments.
+type TaskCommentPublisher struct {
+	*publisher
+}
+
+// NewTaskCommentPublisher creates a TaskCommentPublisher instance. It creates the given
+// topic if it does not already exist.
+func NewTaskCommentPublisher(topic string, ts oauth2.TokenSource) (*TaskCommentPublisher, error) {
+	c, err := pubsub.NewClient(context.Background(), PROJECT_ID, option.WithTokenSource(ts))
+	if err != nil {
+		return nil, err
+	}
+	pub, err := newPublisher(c, topic)
+	if err != nil {
+		return nil, err
+	}
+	return &TaskCommentPublisher{pub}, nil
+}
+
+// Publish publishes a pubsub message for the given comment.
+func (p *TaskCommentPublisher) Publish(t *types.TaskComment) {
+	p.publish(t.Id(), t.Timestamp, t)
+}
+
+// CommitCommentPublisher sends pubsub messages for comments.
+type CommitCommentPublisher struct {
+	*publisher
+}
+
+// NewCommitCommentPublisher creates a CommitCommentPublisher instance. It creates the given
+// topic if it does not already exist.
+func NewCommitCommentPublisher(topic string, ts oauth2.TokenSource) (*CommitCommentPublisher, error) {
+	c, err := pubsub.NewClient(context.Background(), PROJECT_ID, option.WithTokenSource(ts))
+	if err != nil {
+		return nil, err
+	}
+	pub, err := newPublisher(c, topic)
+	if err != nil {
+		return nil, err
+	}
+	return &CommitCommentPublisher{pub}, nil
+}
+
+// Publish publishes a pubsub message for the given comment.
+func (p *CommitCommentPublisher) Publish(t *types.CommitComment) {
+	p.publish(t.Id(), t.Timestamp, t)
+}
+
 // subscriber uses pubsub to watch for modified data.
 type subscriber struct {
 	client   *pubsub.Client
@@ -296,3 +423,90 @@
 	}
 	return s.start()
 }
+
+// NewTaskCommentSubscriber creates a subscriber which calls the given callback
+// function for every pubsub message. The topic should be one of the TOPIC_*
+// constants defined in this package. The subscriberLabel is included in the
+// subscription ID, along with a timestamp; this should help to debug zombie
+// subscriptions. Acknowledgement of the message is done automatically based on
+// the return value of the callback: if the callback returns an error, the
+// message is Nack'd and will be re-sent at a later time, otherwise the message
+// is Ack'd and will not be re-sent. Therefore, if the comment is not valid or
+// otherwise cannot ever be processed, the callback should return nil to prevent
+// the message from being re-sent.
+func NewTaskCommentSubscriber(topic, subscriberLabel string, ts oauth2.TokenSource, callback func(*types.TaskComment) error) (context.CancelFunc, error) {
+	c, err := pubsub.NewClient(context.Background(), PROJECT_ID, option.WithTokenSource(ts))
+	if err != nil {
+		return nil, err
+	}
+	s, err := newSubscriber(c, topic, subscriberLabel, func(m *pubsub.Message) error {
+		var c types.TaskComment
+		if err := gob.NewDecoder(bytes.NewReader(m.Data)).Decode(&c); err != nil {
+			sklog.Errorf("Failed to decode TaskComment from pubsub message: %s", err)
+			return nil // We will never be able to process this message.
+		}
+		return callback(&c)
+	})
+	if err != nil {
+		return nil, err
+	}
+	return s.start()
+}
+
+// NewTaskSpecCommentSubscriber creates a subscriber which calls the given
+// callback function for every pubsub message. The topic should be one of the
+// TOPIC_* constants defined in this package. The subscriberLabel is included in
+// the subscription ID, along with a timestamp; this should help to debug zombie
+// subscriptions. Acknowledgement of the message is done automatically based on
+// the return value of the callback: if the callback returns an error, the
+// message is Nack'd and will be re-sent at a later time, otherwise the message
+// is Ack'd and will not be re-sent. Therefore, if the comment is not valid or
+// otherwise cannot ever be processed, the callback should return nil to prevent
+// the message from being re-sent.
+func NewTaskSpecCommentSubscriber(topic, subscriberLabel string, ts oauth2.TokenSource, callback func(*types.TaskSpecComment) error) (context.CancelFunc, error) {
+	c, err := pubsub.NewClient(context.Background(), PROJECT_ID, option.WithTokenSource(ts))
+	if err != nil {
+		return nil, err
+	}
+	s, err := newSubscriber(c, topic, subscriberLabel, func(m *pubsub.Message) error {
+		var c types.TaskSpecComment
+		if err := gob.NewDecoder(bytes.NewReader(m.Data)).Decode(&c); err != nil {
+			sklog.Errorf("Failed to decode TaskSpecComment from pubsub message: %s", err)
+			return nil // We will never be able to process this message.
+		}
+		return callback(&c)
+	})
+	if err != nil {
+		return nil, err
+	}
+	return s.start()
+}
+
+// NewCommitCommentSubscriber creates a subscriber which calls the given
+// callback function for every pubsub message. The topic should be one of the
+// TOPIC_* constants defined in this package. The subscriberLabel is included in
+// the subscription ID, along with a timestamp; this should help to debug zombie
+// subscriptions. Acknowledgement of the message is done automatically based on
+// the return value of the callback: if the callback returns an error, the
+// message is Nack'd and will be re-sent at a later time, otherwise the message
+// is Ack'd and will not be re-sent. Therefore, if the comment is not valid or
+// otherwise cannot ever be processed, the callback should return nil to prevent
+// the message from being re-sent.
+func NewCommitCommentSubscriber(topic, subscriberLabel string, ts oauth2.TokenSource, callback func(*types.CommitComment) error) (context.CancelFunc, error) {
+	c, err := pubsub.NewClient(context.Background(), PROJECT_ID, option.WithTokenSource(ts))
+	if err != nil {
+		return nil, err
+	}
+	s, err := newSubscriber(c, topic, subscriberLabel, func(m *pubsub.Message) error {
+		var c types.CommitComment
+		if err := gob.NewDecoder(bytes.NewReader(m.Data)).Decode(&c); err != nil {
+			sklog.Errorf("Failed to decode CommitComment from pubsub message: %s", err)
+			return nil // We will never be able to process this message.
+		}
+		return callback(&c)
+	})
+	if err != nil {
+		return nil, err
+	}
+	return s.start()
+}
diff --git a/task_scheduler/go/db/recovery/backups_test.go b/task_scheduler/go/db/recovery/backups_test.go
index 0cd38e4..eda14e3 100644
--- a/task_scheduler/go/db/recovery/backups_test.go
+++ b/task_scheduler/go/db/recovery/backups_test.go
@@ -136,7 +136,7 @@
 	assert.NoError(t, os.MkdirAll(path.Join(dir, TRIGGER_DIRNAME), os.ModePerm))
 
 	db := &testDB{
-		DB:      memory.NewInMemoryDB(nil, nil),
+		DB:      memory.NewInMemoryDB(nil),
 		content: content,
 		ts:      time.Unix(TEST_DB_TIME, 0),
 	}
diff --git a/task_scheduler/go/db/remote_db/remote_db.go b/task_scheduler/go/db/remote_db/remote_db.go
index 8b79c3f..7c20be5 100644
--- a/task_scheduler/go/db/remote_db/remote_db.go
+++ b/task_scheduler/go/db/remote_db/remote_db.go
@@ -80,27 +80,21 @@
 type client struct {
 	serverRoot string
 	client     *http.Client
-	db.ModifiedTasks
-	db.ModifiedJobs
+	db.ModifiedData
 }
 
 // NewClient returns a db.RemoteDB that connects to the server created by
 // NewServer. serverRoot should end with a slash.
-func NewClient(serverRoot, tasksTopic, jobsTopic, label string, ts oauth2.TokenSource) (db.RemoteDB, error) {
+func NewClient(serverRoot, topicSet, label string, ts oauth2.TokenSource) (db.RemoteDB, error) {
 	c := httputils.DefaultClientConfig().WithTokenSource(ts).Client()
-	modTasks, err := pubsub.NewModifiedTasks(tasksTopic, label, ts)
-	if err != nil {
-		return nil, err
-	}
-	modJobs, err := pubsub.NewModifiedJobs(jobsTopic, label, ts)
+	mod, err := pubsub.NewModifiedData(topicSet, label, ts)
 	if err != nil {
 		return nil, err
 	}
 	return &client{
-		serverRoot:    serverRoot,
-		client:        c,
-		ModifiedTasks: modTasks,
-		ModifiedJobs:  modJobs,
+		serverRoot:   serverRoot,
+		client:       c,
+		ModifiedData: mod,
 	}, nil
 }
 
diff --git a/task_scheduler/go/db/remote_db/remote_db_test.go b/task_scheduler/go/db/remote_db/remote_db_test.go
index 926ecb8..f5b9b31 100644
--- a/task_scheduler/go/db/remote_db/remote_db_test.go
+++ b/task_scheduler/go/db/remote_db/remote_db_test.go
@@ -97,17 +97,15 @@
 // makeDB sets up a client/server pair wrapped in a clientWithBackdoor.
 func makeDB(t *testing.T) db.DBCloser {
 	serverLabel := fmt.Sprintf("remote-db-test-%s", uuid.New())
-	modTasks, err := pubsub.NewModifiedTasks(pubsub.TOPIC_TASKS, serverLabel, nil)
+	mod, err := pubsub.NewModifiedData(pubsub.TOPIC_SET_PROD, serverLabel, nil)
 	assert.NoError(t, err)
-	modJobs, err := pubsub.NewModifiedJobs(pubsub.TOPIC_JOBS, serverLabel, nil)
-	assert.NoError(t, err)
-	baseDB := memory.NewInMemoryDB(modTasks, modJobs)
+	baseDB := memory.NewInMemoryDB(mod)
 	r := mux.NewRouter()
 	err = RegisterServer(baseDB, r.PathPrefix("/db").Subrouter())
 	assert.NoError(t, err)
 	ts := httptest.NewServer(r)
 	clientLabel := fmt.Sprintf("remote-db-test-%s", uuid.New())
-	dbclient, err := NewClient(ts.URL+"/db/", pubsub.TOPIC_TASKS, pubsub.TOPIC_JOBS, clientLabel, nil)
+	dbclient, err := NewClient(ts.URL+"/db/", pubsub.TOPIC_SET_PROD, clientLabel, nil)
 	assert.NoError(t, err)
 	dbclient.(*client).client.Transport = newReqCountingTransport(dbclient.(*client).client.Transport)
 	return &clientWithBackdoor{
diff --git a/task_scheduler/go/db/testutil.go b/task_scheduler/go/db/testutil.go
index e80d30b..bf650f1 100644
--- a/task_scheduler/go/db/testutil.go
+++ b/task_scheduler/go/db/testutil.go
@@ -72,6 +72,33 @@
 	}))
 }
 
+func findModifiedComments(t testutils.TestingT, m ModifiedComments, id string, e1 []*types.TaskComment, e2 []*types.TaskSpecComment, e3 []*types.CommitComment) {
+	// Note that the slice only works because we don't call
+	// TrackModifiedJob more than once for any given job, otherwise
+	// we'd have to use a map and compare DbModified.
+	a1 := []*types.TaskComment{}
+	a2 := []*types.TaskSpecComment{}
+	a3 := []*types.CommitComment{}
+	assert.NoError(t, testutils.EventuallyConsistent(10*time.Second, func() error {
+		c1, c2, c3, err := m.GetModifiedComments(id)
+		assert.NoError(t, err)
+		a1 = append(a1, c1...)
+		a2 = append(a2, c2...)
+		a3 = append(a3, c3...)
+		if len(a1) != len(e1) || len(a2) != len(e2) || len(a3) != len(e3) {
+			time.Sleep(100 * time.Millisecond)
+			return testutils.TryAgainErr
+		}
+		sort.Sort(types.TaskCommentSlice(a1))
+		sort.Sort(types.TaskSpecCommentSlice(a2))
+		sort.Sort(types.CommitCommentSlice(a3))
+		deepequal.AssertDeepEqual(t, e1, a1)
+		deepequal.AssertDeepEqual(t, e2, a2)
+		deepequal.AssertDeepEqual(t, e3, a3)
+		return nil
+	}))
+}
+
 // TestTaskDB performs basic tests for an implementation of TaskDB.
 func TestTaskDB(t testutils.TestingT, db TaskDB) {
 	_, err := db.GetModifiedTasks("dummy-id")
@@ -1174,7 +1201,19 @@
 
 // TestCommentDB validates that db correctly implements the CommentDB interface.
 func TestCommentDB(t testutils.TestingT, db CommentDB) {
-	now := time.Now()
+	_, _, _, err := db.GetModifiedComments("dummy-id")
+	assert.True(t, IsUnknownId(err))
+
+	id, err := db.StartTrackingModifiedComments()
+	assert.NoError(t, err)
+
+	c1, c2, c3, err := db.GetModifiedComments(id)
+	assert.NoError(t, err)
+	assert.Equal(t, 0, len(c1))
+	assert.Equal(t, 0, len(c2))
+	assert.Equal(t, 0, len(c3))
+
+	now := time.Now().Truncate(TS_RESOLUTION)
 
 	// Empty db.
 	r0 := fmt.Sprintf("%s%d", common.REPO_SKIA, 0)
@@ -1195,40 +1234,43 @@
 	}
 
 	// Add some comments.
-	tc1 := types.MakeTaskComment(1, 1, 1, 1, now)
-	tc2 := types.MakeTaskComment(2, 1, 1, 1, now.Add(2*time.Second))
-	tc3 := types.MakeTaskComment(3, 1, 1, 1, now.Add(time.Second))
-	tc4 := types.MakeTaskComment(4, 1, 1, 2, now)
-	tc5 := types.MakeTaskComment(5, 1, 2, 2, now)
-	tc6 := types.MakeTaskComment(6, 2, 3, 3, now)
+	tc1 := types.MakeTaskComment(1, 1, 1, 1, now.Add(-2*time.Second))
+	tc2 := types.MakeTaskComment(2, 1, 1, 1, now)
+	tc3 := types.MakeTaskComment(3, 1, 1, 1, now.Add(-time.Second))
+	tc4 := types.MakeTaskComment(4, 1, 1, 2, now.Add(-2*time.Second+time.Millisecond))
+	tc5 := types.MakeTaskComment(5, 1, 2, 2, now.Add(-2*time.Second+2*time.Millisecond))
+	tc6 := types.MakeTaskComment(6, 2, 3, 3, now.Add(-2*time.Second+3*time.Millisecond))
 	for _, c := range []*types.TaskComment{tc1, tc2, tc3, tc4, tc5, tc6} {
 		assert.NoError(t, db.PutTaskComment(c))
 	}
 	tc6copy := tc6.Copy()
 	tc6.Message = "modifying after Put shouldn't affect stored comment"
 
-	sc1 := types.MakeTaskSpecComment(1, 1, 1, now)
-	sc2 := types.MakeTaskSpecComment(2, 1, 1, now.Add(2*time.Second))
-	sc3 := types.MakeTaskSpecComment(3, 1, 1, now.Add(time.Second))
-	sc4 := types.MakeTaskSpecComment(4, 1, 2, now)
-	sc5 := types.MakeTaskSpecComment(5, 2, 3, now)
+	sc1 := types.MakeTaskSpecComment(1, 1, 1, now.Add(-2*time.Second))
+	sc2 := types.MakeTaskSpecComment(2, 1, 1, now)
+	sc3 := types.MakeTaskSpecComment(3, 1, 1, now.Add(-time.Second))
+	sc4 := types.MakeTaskSpecComment(4, 1, 2, now.Add(-2*time.Second+time.Millisecond))
+	sc5 := types.MakeTaskSpecComment(5, 2, 3, now.Add(-2*time.Second+2*time.Millisecond))
 	for _, c := range []*types.TaskSpecComment{sc1, sc2, sc3, sc4, sc5} {
 		assert.NoError(t, db.PutTaskSpecComment(c))
 	}
 	sc5copy := sc5.Copy()
 	sc5.Message = "modifying after Put shouldn't affect stored comment"
 
-	cc1 := types.MakeCommitComment(1, 1, 1, now)
-	cc2 := types.MakeCommitComment(2, 1, 1, now.Add(2*time.Second))
-	cc3 := types.MakeCommitComment(3, 1, 1, now.Add(time.Second))
-	cc4 := types.MakeCommitComment(4, 1, 2, now)
-	cc5 := types.MakeCommitComment(5, 2, 3, now)
+	cc1 := types.MakeCommitComment(1, 1, 1, now.Add(-2*time.Second))
+	cc2 := types.MakeCommitComment(2, 1, 1, now)
+	cc3 := types.MakeCommitComment(3, 1, 1, now.Add(-time.Second))
+	cc4 := types.MakeCommitComment(4, 1, 2, now.Add(-2*time.Second+time.Millisecond))
+	cc5 := types.MakeCommitComment(5, 2, 3, now.Add(-2*time.Second+2*time.Millisecond))
 	for _, c := range []*types.CommitComment{cc1, cc2, cc3, cc4, cc5} {
 		assert.NoError(t, db.PutCommitComment(c))
 	}
 	cc5copy := cc5.Copy()
 	cc5.Message = "modifying after Put shouldn't affect stored comment"
 
+	// Ensure that all comments show up in the modified list.
+	findModifiedComments(t, db, id, []*types.TaskComment{tc1, tc4, tc5, tc6copy, tc3, tc2}, []*types.TaskSpecComment{sc1, sc4, sc5copy, sc3, sc2}, []*types.CommitComment{cc1, cc4, cc5copy, cc3, cc2})
+
 	// Check that adding duplicate non-identical comment gives an error.
 	tc1different := tc1.Copy()
 	tc1different.Message = "not the same"
@@ -1285,7 +1327,7 @@
 
 	// Specifying a cutoff time shouldn't drop required comments.
 	{
-		actual, err := db.GetCommentsForRepos([]string{r1}, now.Add(time.Second))
+		actual, err := db.GetCommentsForRepos([]string{r1}, now.Add(-time.Second))
 		assert.NoError(t, err)
 		assert.Equal(t, 1, len(actual))
 		{
@@ -1319,6 +1361,9 @@
 			AssertDeepEqual(t, cc2, ccs[offset+1])
 		}
 	}
+	// Clear out modified comments.
+	_, _, _, err = db.GetModifiedComments(id)
+	assert.NoError(t, err)
 
 	// Delete some comments.
 	assert.NoError(t, db.DeleteTaskComment(tc3))
@@ -1347,6 +1392,14 @@
 		actual, err := db.GetCommentsForRepos([]string{r0, r1, r2}, now.Add(-10000*time.Hour))
 		assert.NoError(t, err)
 		AssertDeepEqual(t, expected, actual)
+		deleted := true
+		tc1different.Deleted = &deleted
+		sc1different.Deleted = &deleted
+		cc1different.Deleted = &deleted
+		tc3.Deleted = &deleted
+		sc3.Deleted = &deleted
+		cc3.Deleted = &deleted
+		findModifiedComments(t, db, id, []*types.TaskComment{tc1different, tc3}, []*types.TaskSpecComment{sc1different, sc3}, []*types.CommitComment{cc1different, cc3})
 	}
 
 	// Delete all the comments.
@@ -1664,6 +1717,93 @@
 	}))
 }
 
+func TestModifiedComments(t testutils.TestingT, m ModifiedComments) {
+	_, _, _, err := m.GetModifiedComments("dummy-id")
+	assert.True(t, IsUnknownId(err))
+
+	id, err := m.StartTrackingModifiedComments()
+	assert.NoError(t, err)
+
+	time.Sleep(time.Second)
+	a1, a2, a3, err := m.GetModifiedComments(id)
+	assert.NoError(t, err)
+	assert.Equal(t, 0, len(a1))
+	assert.Equal(t, 0, len(a2))
+	assert.Equal(t, 0, len(a3))
+
+	c1 := types.MakeTaskComment(1, 1, 1, 1, time.Now())
+
+	// Insert the comment.
+	m.TrackModifiedTaskComment(c1)
+
+	// Ensure that the comment shows up in the modified list.
+	findModifiedComments(t, m, id, []*types.TaskComment{c1}, []*types.TaskSpecComment{}, []*types.CommitComment{})
+
+	// Insert two more comments.
+	c2 := types.MakeTaskComment(2, 2, 2, 2, time.Now())
+	m.TrackModifiedTaskComment(c2)
+	c3 := types.MakeTaskComment(3, 3, 3, 3, time.Now())
+	m.TrackModifiedTaskComment(c3)
+
+	// Ensure that both comments show up in the modified list.
+	findModifiedComments(t, m, id, []*types.TaskComment{c2, c3}, []*types.TaskSpecComment{}, []*types.CommitComment{})
+
+	// Insert two more comments.
+	c4 := types.MakeTaskSpecComment(4, 4, 4, time.Now())
+	m.TrackModifiedTaskSpecComment(c4)
+	c5 := types.MakeCommitComment(5, 5, 5, time.Now())
+	m.TrackModifiedCommitComment(c5)
+
+	// Ensure that both comments show up in the modified list.
+	findModifiedComments(t, m, id, []*types.TaskComment{}, []*types.TaskSpecComment{c4}, []*types.CommitComment{c5})
+
+	// Check StopTrackingModifiedComments.
+	m.StopTrackingModifiedComments(id)
+	err = testutils.EventuallyConsistent(10*time.Second, func() error {
+		_, _, _, err := m.GetModifiedComments(id)
+		if err == nil {
+			return testutils.TryAgainErr
+		}
+		return err
+	})
+	assert.True(t, IsUnknownId(err))
+}
+
+func TestMultipleCommentModifications(t testutils.TestingT, m ModifiedComments) {
+	id, err := m.StartTrackingModifiedComments()
+	assert.NoError(t, err)
+
+	c1 := types.MakeTaskComment(1, 1, 1, 1, time.Now())
+
+	// Insert the comment.
+	m.TrackModifiedTaskComment(c1)
+
+	// Delete the comment.
+	deleted := true
+	c1.Deleted = &deleted
+	m.TrackModifiedTaskComment(c1)
+
+	// Ensure that the comment shows up only once in the modified list and
+	// is the most recent value.
+	var actual *types.TaskComment
+	assert.NoError(t, testutils.EventuallyConsistent(10*time.Second, func() error {
+		a1, _, _, err := m.GetModifiedComments(id)
+		if err != nil {
+			return err
+		}
+		if len(a1) == 1 {
+			if actual == nil || actual.Deleted == nil {
+				actual = a1[0]
+			}
+		}
+		if deepequal.DeepEqual(c1, actual) {
+			return nil
+		}
+		time.Sleep(100 * time.Millisecond)
+		return testutils.TryAgainErr
+	}))
+}
+
 func TestUpdateDBFromSwarmingTask(t testutils.TestingT, db TaskDB) {
 	// Create task, initialize from swarming, and save.
 	now := time.Now().UTC().Round(time.Microsecond)
diff --git a/task_scheduler/go/scheduling/perftest/perftest.go b/task_scheduler/go/scheduling/perftest/perftest.go
index aa1a1bc..96b710b 100644
--- a/task_scheduler/go/scheduling/perftest/perftest.go
+++ b/task_scheduler/go/scheduling/perftest/perftest.go
@@ -265,7 +265,7 @@
 	assertNoError(err)
 	assertDeepEqual([]string{head}, commits)
 
-	d, err := local_db.NewDB("testdb", path.Join(workdir, "tasks.db"), nil, nil)
+	d, err := local_db.NewDB("testdb", path.Join(workdir, "tasks.db"), nil)
 	assertNoError(err)
 	w, err := window.New(time.Hour, 0, nil)
 	assertNoError(err)
diff --git a/task_scheduler/go/scheduling/task_scheduler_test.go b/task_scheduler/go/scheduling/task_scheduler_test.go
index 642945a..6ea2f69 100644
--- a/task_scheduler/go/scheduling/task_scheduler_test.go
+++ b/task_scheduler/go/scheduling/task_scheduler_test.go
@@ -194,7 +194,7 @@
 	assert.NoError(t, err)
 
 	assert.NoError(t, os.Mkdir(path.Join(tmp, periodic_triggers.TRIGGER_DIRNAME), os.ModePerm))
-	d := memory.NewInMemoryDB(nil, nil)
+	d := memory.NewInMemoryDB(nil)
 	isolateClient, err := isolate.NewClient(tmp, isolate.ISOLATE_SERVER_URL_FAKE)
 	assert.NoError(t, err)
 	swarmingClient := swarming_testutils.NewTestClient()
@@ -2008,7 +2008,7 @@
 	gb.Commit(ctx)
 
 	// Setup the scheduler.
-	d := memory.NewInMemoryDB(nil, nil)
+	d := memory.NewInMemoryDB(nil)
 	isolateClient, err := isolate.NewClient(workdir, isolate.ISOLATE_SERVER_URL_FAKE)
 	assert.NoError(t, err)
 	swarmingClient := swarming_testutils.NewTestClient()
@@ -3383,7 +3383,7 @@
 func TestValidateTaskForUpdate(t *testing.T) {
 	testutils.SmallTest(t)
 
-	d := memory.NewInMemoryDB(nil, nil)
+	d := memory.NewInMemoryDB(nil)
 
 	c1 := "abc123"
 	c2 := "def456"
@@ -3461,7 +3461,7 @@
 func TestUpdateTask(t *testing.T) {
 	testutils.SmallTest(t)
 
-	d := memory.NewInMemoryDB(nil, nil)
+	d := memory.NewInMemoryDB(nil)
 
 	c1 := "abc123"
 	c2 := "def456"
diff --git a/task_scheduler/go/task_scheduler/main.go b/task_scheduler/go/task_scheduler/main.go
index dce39f1..1c195ec 100644
--- a/task_scheduler/go/task_scheduler/main.go
+++ b/task_scheduler/go/task_scheduler/main.go
@@ -87,8 +87,7 @@
 	firestoreInstance = flag.String("firestore_instance", "", "Firestore instance to use, eg. \"prod\"")
 	isolateServer     = flag.String("isolate_server", isolate.ISOLATE_SERVER_URL, "Which Isolate server to use.")
 	local             = flag.Bool("local", false, "Whether we're running on a dev machine vs in production.")
-	pubsubTopicTasks  = flag.String("pubsub_topic_tasks", "", "Pubsub topic for tasks.")
-	pubsubTopicJobs   = flag.String("pubsub_topic_jobs", "", "Pubsub topic for jobs.")
+	pubsubTopicSet    = flag.String("pubsub_topic_set", "", fmt.Sprintf("Pubsub topic set; one of: %v", pubsub.VALID_TOPIC_SETS))
 	repoUrls          = common.NewMultiStringFlag("repo", nil, "Repositories for which to schedule tasks.")
 	recipesCfgFile    = flag.String("recipes_cfg", "", "Path to the recipes.cfg file.")
 	resourcesDir      = flag.String("resources_dir", "", "The directory to find templates, JS, and CSS files. If blank, assumes you're running inside a checkout and will attempt to find the resources relative to this source file.")
@@ -656,23 +655,18 @@
 
 	// Initialize the database.
 	label := *host
-	modTasks, err := pubsub.NewModifiedTasks(*pubsubTopicTasks, label, tokenSource)
+	mod, err := pubsub.NewModifiedData(*pubsubTopicSet, label, tokenSource)
 	if err != nil {
 		sklog.Fatal(err)
 	}
-	modTasks = modified.NewMuxModifiedTasks(&modified.ModifiedTasksImpl{}, modTasks)
-	modJobs, err := pubsub.NewModifiedJobs(*pubsubTopicJobs, label, tokenSource)
-	if err != nil {
-		sklog.Fatal(err)
-	}
-	modJobs = modified.NewMuxModifiedJobs(&modified.ModifiedJobsImpl{}, modJobs)
+	mod = modified.NewMuxModifiedData(modified.NewModifiedData(), mod)
 	if *firestoreInstance != "" {
-		tsDb, err = firestore.NewDB(ctx, firestore.FIRESTORE_PROJECT, *firestoreInstance, tokenSource, modTasks, modJobs)
+		tsDb, err = firestore.NewDB(ctx, firestore.FIRESTORE_PROJECT, *firestoreInstance, tokenSource, mod)
 		if err != nil {
 			sklog.Fatalf("Failed to create Firestore DB client: %s", err)
 		}
 	} else {
-		tsDb, err = local_db.NewDB(local_db.DB_NAME, path.Join(wdAbs, local_db.DB_FILENAME), modTasks, modJobs)
+		tsDb, err = local_db.NewDB(local_db.DB_NAME, path.Join(wdAbs, local_db.DB_FILENAME), mod)
 		if err != nil {
 			sklog.Fatal(err)
 		}
diff --git a/task_scheduler/go/tryjobs/testutils.go b/task_scheduler/go/tryjobs/testutils.go
index ae3a409..4f29a5f 100644
--- a/task_scheduler/go/tryjobs/testutils.go
+++ b/task_scheduler/go/tryjobs/testutils.go
@@ -111,7 +111,7 @@
 	assert.NoError(t, err)
 	taskCfgCache, err := specs.NewTaskCfgCache(ctx, rm, depot_tools_testutils.GetDepotTools(t, ctx), path.Join(tmpDir, "cache"), specs.DEFAULT_NUM_WORKERS)
 	assert.NoError(t, err)
-	d, err := local_db.NewDB("tasks_db", path.Join(tmpDir, "tasks.db"), nil, nil)
+	d, err := local_db.NewDB("tasks_db", path.Join(tmpDir, "tasks.db"), nil)
 	assert.NoError(t, err)
 	mock := mockhttpclient.NewURLMock()
 	projectRepoMapping := map[string]string{
diff --git a/task_scheduler/go/types/comments.go b/task_scheduler/go/types/comments.go
index d0af194..05302c7 100644
--- a/task_scheduler/go/types/comments.go
+++ b/task_scheduler/go/types/comments.go
@@ -6,6 +6,7 @@
 	"time"
 
 	"go.skia.org/infra/go/sklog"
+	"go.skia.org/infra/go/util"
 )
 
 // TaskComment contains a comment about a Task. {Repo, Revision, Name,
@@ -18,13 +19,23 @@
 	// Timestamp is compared ignoring timezone. The timezone reflects User's
 	// location.
 	Timestamp time.Time `json:"time"`
-	TaskId    string    `json:"taskId"`
-	User      string    `json:"user"`
-	Message   string    `json:"message"`
+	TaskId    string    `json:"taskId,omitempty"`
+	User      string    `json:"user,omitempty"`
+	Message   string    `json:"message,omitempty"`
+	Deleted   *bool     `json:"deleted,omitempty"`
 }
 
 func (c TaskComment) Copy() *TaskComment {
-	return &c
+	rv := &c
+	if c.Deleted != nil {
+		v := *c.Deleted
+		rv.Deleted = &v
+	}
+	return rv
+}
+
+func (c *TaskComment) Id() string {
+	return c.Repo + "#" + c.Revision + "#" + c.Name + "#" + c.Timestamp.Format(util.SAFE_TIMESTAMP_FORMAT)
 }
 
 // TaskSpecComment contains a comment about a TaskSpec. {Repo, Name, Timestamp}
@@ -35,14 +46,24 @@
 	// Timestamp is compared ignoring timezone. The timezone reflects User's
 	// location.
 	Timestamp     time.Time `json:"time"`
-	User          string    `json:"user"`
+	User          string    `json:"user,omitempty"`
 	Flaky         bool      `json:"flaky"`
 	IgnoreFailure bool      `json:"ignoreFailure"`
-	Message       string    `json:"message"`
+	Message       string    `json:"message,omitempty"`
+	Deleted       *bool     `json:"deleted,omitempty"`
 }
 
 func (c TaskSpecComment) Copy() *TaskSpecComment {
-	return &c
+	rv := &c
+	if c.Deleted != nil {
+		v := *c.Deleted
+		rv.Deleted = &v
+	}
+	return rv
+}
+
+func (c *TaskSpecComment) Id() string {
+	return c.Repo + "#" + c.Name + "#" + c.Timestamp.Format(util.SAFE_TIMESTAMP_FORMAT)
 }
 
 // CommitComment contains a comment about a commit. {Repo, Revision, Timestamp}
@@ -53,13 +74,23 @@
 	// Timestamp is compared ignoring timezone. The timezone reflects User's
 	// location.
 	Timestamp     time.Time `json:"time"`
-	User          string    `json:"user"`
+	User          string    `json:"user,omitempty"`
 	IgnoreFailure bool      `json:"ignoreFailure"`
-	Message       string    `json:"message"`
+	Message       string    `json:"message,omitempty"`
+	Deleted       *bool     `json:"deleted,omitempty"`
 }
 
 func (c CommitComment) Copy() *CommitComment {
-	return &c
+	rv := &c
+	if c.Deleted != nil {
+		v := *c.Deleted
+		rv.Deleted = &v
+	}
+	return rv
+}
+
+func (c *CommitComment) Id() string {
+	return c.Repo + "#" + c.Revision + "#" + c.Timestamp.Format(util.SAFE_TIMESTAMP_FORMAT)
 }
 
 // RepoComments contains comments that all pertain to the same repository.
@@ -90,3 +121,199 @@
 	}
 	return &copy
 }
+
+// TaskCommentSlice implements sort.Interface. To sort taskComments
+// []*TaskComment, use sort.Sort(TaskCommentSlice(taskComments)).
+type TaskCommentSlice []*TaskComment
+
+func (s TaskCommentSlice) Len() int { return len(s) }
+
+func (s TaskCommentSlice) Less(i, j int) bool {
+	return s[i].Timestamp.Before(s[j].Timestamp)
+}
+
+func (s TaskCommentSlice) Swap(i, j int) {
+	s[i], s[j] = s[j], s[i]
+}
+
+// TaskSpecCommentSlice implements sort.Interface. To sort taskSpecComments
+// []*TaskSpecComment, use sort.Sort(TaskSpecCommentSlice(taskSpecComments)).
+type TaskSpecCommentSlice []*TaskSpecComment
+
+func (s TaskSpecCommentSlice) Len() int { return len(s) }
+
+func (s TaskSpecCommentSlice) Less(i, j int) bool {
+	return s[i].Timestamp.Before(s[j].Timestamp)
+}
+
+func (s TaskSpecCommentSlice) Swap(i, j int) {
+	s[i], s[j] = s[j], s[i]
+}
+
+// CommitCommentSlice implements sort.Interface. To sort commitComments
+// []*CommitComment, use sort.Sort(CommitCommentSlice(commitComments)).
+type CommitCommentSlice []*CommitComment
+
+func (s CommitCommentSlice) Len() int { return len(s) }
+
+func (s CommitCommentSlice) Less(i, j int) bool {
+	return s[i].Timestamp.Before(s[j].Timestamp)
+}
+
+func (s CommitCommentSlice) Swap(i, j int) {
+	s[i], s[j] = s[j], s[i]
+}
+
+// TaskCommentEncoder encodes TaskComments into bytes via GOB encoding. Not
+// safe for concurrent use.
+type TaskCommentEncoder struct {
+	util.GobEncoder
+}
+
+// Next returns one of the TaskComments provided to Process (in arbitrary order)
+// and its serialized bytes. If any comments remain, returns the TaskComment,
+// the serialized bytes, nil. If all comments have been returned, returns nil,
+// nil, nil. If an error is encountered, returns nil, nil, error.
+func (e *TaskCommentEncoder) Next() (*TaskComment, []byte, error) {
+	item, serialized, err := e.GobEncoder.Next()
+	if err != nil {
+		return nil, nil, err
+	} else if item == nil {
+		return nil, nil, nil
+	}
+	return item.(*TaskComment), serialized, err
+}
+
+// TaskCommentDecoder decodes bytes into TaskComments via GOB decoding. Not safe
+// for concurrent use.
+type TaskCommentDecoder struct {
+	*util.GobDecoder
+}
+
+// NewTaskCommentDecoder returns a TaskCommentDecoder instance.
+func NewTaskCommentDecoder() *TaskCommentDecoder {
+	return &TaskCommentDecoder{
+		GobDecoder: util.NewGobDecoder(func() interface{} {
+			return &TaskComment{}
+		}, func(ch <-chan interface{}) interface{} {
+			items := []*TaskComment{}
+			for item := range ch {
+				items = append(items, item.(*TaskComment))
+			}
+			return items
+		}),
+	}
+}
+
+// Result returns all decoded TaskComments provided to Process (in arbitrary
+// order), or any error encountered.
+func (d *TaskCommentDecoder) Result() ([]*TaskComment, error) {
+	res, err := d.GobDecoder.Result()
+	if err != nil {
+		return nil, err
+	}
+	return res.([]*TaskComment), nil
+}
+
+// TaskSpecCommentEncoder encodes TaskSpecComments into bytes via GOB encoding.
+// Not safe for concurrent use.
+type TaskSpecCommentEncoder struct {
+	util.GobEncoder
+}
+
+// Next returns one of the TaskSpecComments provided to Process (in arbitrary
+// order) and its serialized bytes. If any comments remain, returns the
+// TaskSpecComment, the serialized bytes, nil. If all comments have been
+// returned, returns nil, nil, nil. If an error is encountered, returns nil,
+// nil, error.
+func (e *TaskSpecCommentEncoder) Next() (*TaskSpecComment, []byte, error) {
+	item, serialized, err := e.GobEncoder.Next()
+	if err != nil {
+		return nil, nil, err
+	} else if item == nil {
+		return nil, nil, nil
+	}
+	return item.(*TaskSpecComment), serialized, err
+}
+
+// TaskSpecCommentDecoder decodes bytes into TaskSpecComments via GOB decoding.
+// Not safe for concurrent use.
+type TaskSpecCommentDecoder struct {
+	*util.GobDecoder
+}
+
+// NewTaskSpecCommentDecoder returns a TaskSpecCommentDecoder instance.
+func NewTaskSpecCommentDecoder() *TaskSpecCommentDecoder {
+	return &TaskSpecCommentDecoder{
+		GobDecoder: util.NewGobDecoder(func() interface{} {
+			return &TaskSpecComment{}
+		}, func(ch <-chan interface{}) interface{} {
+			items := []*TaskSpecComment{}
+			for item := range ch {
+				items = append(items, item.(*TaskSpecComment))
+			}
+			return items
+		}),
+	}
+}
+
+// Result returns all decoded TaskSpecComments provided to Process (in arbitrary
+// order), or any error encountered.
+func (d *TaskSpecCommentDecoder) Result() ([]*TaskSpecComment, error) {
+	res, err := d.GobDecoder.Result()
+	if err != nil {
+		return nil, err
+	}
+	return res.([]*TaskSpecComment), nil
+}
+
+// CommitCommentEncoder encodes CommitComments into bytes via GOB encoding. Not
+// safe for concurrent use.
+type CommitCommentEncoder struct {
+	util.GobEncoder
+}
+
+// Next returns one of the CommitComments provided to Process (in arbitrary
+// order) and its serialized bytes. If any comments remain, returns the
+// CommitComment, the serialized bytes, nil. If all comments have been returned,
+// returns nil, nil, nil. If an error is encountered, returns nil, nil, error.
+func (e *CommitCommentEncoder) Next() (*CommitComment, []byte, error) {
+	item, serialized, err := e.GobEncoder.Next()
+	if err != nil {
+		return nil, nil, err
+	} else if item == nil {
+		return nil, nil, nil
+	}
+	return item.(*CommitComment), serialized, err
+}
+
+// CommitCommentDecoder decodes bytes into CommitComments via GOB decoding.
+// Not safe for concurrent use.
+type CommitCommentDecoder struct {
+	*util.GobDecoder
+}
+
+// NewCommitCommentDecoder returns a CommitCommentDecoder instance.
+func NewCommitCommentDecoder() *CommitCommentDecoder {
+	return &CommitCommentDecoder{
+		GobDecoder: util.NewGobDecoder(func() interface{} {
+			return &CommitComment{}
+		}, func(ch <-chan interface{}) interface{} {
+			items := []*CommitComment{}
+			for item := range ch {
+				items = append(items, item.(*CommitComment))
+			}
+			return items
+		}),
+	}
+}
+
+// Result returns all decoded CommitComments provided to Process (in arbitrary
+// order), or any error encountered.
+func (d *CommitCommentDecoder) Result() ([]*CommitComment, error) {
+	res, err := d.GobDecoder.Result()
+	if err != nil {
+		return nil, err
+	}
+	return res.([]*CommitComment), nil
+}
diff --git a/task_scheduler/go/types/comments_test.go b/task_scheduler/go/types/comments_test.go
index 19fb7d1..ccf6708 100644
--- a/task_scheduler/go/types/comments_test.go
+++ b/task_scheduler/go/types/comments_test.go
@@ -11,6 +11,8 @@
 func TestCopyTaskComment(t *testing.T) {
 	testutils.SmallTest(t)
 	v := MakeTaskComment(1, 1, 1, 1, time.Now())
+	deleted := true
+	v.Deleted = &deleted
 	deepequal.AssertCopy(t, v, v.Copy())
 }
 
@@ -19,6 +21,8 @@
 	v := MakeTaskSpecComment(1, 1, 1, time.Now())
 	v.Flaky = true
 	v.IgnoreFailure = true
+	deleted := true
+	v.Deleted = &deleted
 	deepequal.AssertCopy(t, v, v.Copy())
 }
 
@@ -26,6 +30,8 @@
 	testutils.SmallTest(t)
 	v := MakeCommitComment(1, 1, 1, time.Now())
 	v.IgnoreFailure = true
+	deleted := true
+	v.Deleted = &deleted
 	deepequal.AssertCopy(t, v, v.Copy())
 }
 
diff --git a/task_scheduler/go/types/job.go b/task_scheduler/go/types/job.go
index b4bd6ca..1dac7ad 100644
--- a/task_scheduler/go/types/job.go
+++ b/task_scheduler/go/types/job.go
@@ -1,13 +1,11 @@
 package types
 
 import (
-	"bytes"
-	"encoding/gob"
 	"fmt"
-	"sync"
 	"time"
 
 	"go.skia.org/infra/go/sklog"
+	"go.skia.org/infra/go/util"
 )
 
 const (
@@ -308,29 +306,7 @@
 // concurrent use.
 // TODO(benjaminwagner): Encode in parallel.
 type JobEncoder struct {
-	err    error
-	jobs   []*Job
-	result [][]byte
-}
-
-// Process encodes the Job into a byte slice that will be returned from Next()
-// (in arbitrary order). Returns false if Next is certain to return an error.
-// Caller must ensure j does not change until after the first call to Next().
-// May not be called after calling Next().
-func (e *JobEncoder) Process(j *Job) bool {
-	if e.err != nil {
-		return false
-	}
-	var buf bytes.Buffer
-	if err := gob.NewEncoder(&buf).Encode(j); err != nil {
-		e.err = err
-		e.jobs = nil
-		e.result = nil
-		return false
-	}
-	e.jobs = append(e.jobs, j)
-	e.result = append(e.result, buf.Bytes())
-	return true
+	util.GobEncoder
 }
 
 // Next returns one of the Jobs provided to Process (in arbitrary order) and
@@ -338,113 +314,42 @@
 // bytes, nil. If all jobs have been returned, returns nil, nil, nil. If an
 // error is encountered, returns nil, nil, error.
 func (e *JobEncoder) Next() (*Job, []byte, error) {
-	if e.err != nil {
-		return nil, nil, e.err
-	}
-	if len(e.jobs) == 0 {
+	item, serialized, err := e.GobEncoder.Next()
+	if err != nil {
+		return nil, nil, err
+	} else if item == nil {
 		return nil, nil, nil
 	}
-	j := e.jobs[0]
-	e.jobs = e.jobs[1:]
-	serialized := e.result[0]
-	e.result = e.result[1:]
-	return j, serialized, nil
+	return item.(*Job), serialized, nil
 }
 
 // JobDecoder decodes bytes into Jobs via GOB decoding. Not safe for
 // concurrent use.
 type JobDecoder struct {
-	// input contains the incoming byte slices. Process() sends on this channel,
-	// decode() receives from it, and Result() closes it.
-	input chan []byte
-	// output contains decoded Jobs. decode() sends on this channel, collect()
-	// receives from it, and run() closes it when all decode() goroutines have
-	// finished.
-	output chan *Job
-	// result contains the return value of Result(). collect() sends a single
-	// value on this channel and closes it. Result() receives from it.
-	result chan []*Job
-	// errors contains the first error from any goroutine. It's a channel in case
-	// multiple goroutines experience an error at the same time.
-	errors chan error
+	*util.GobDecoder
 }
 
-// init initializes d if it has not been initialized. May not be called concurrently.
-func (d *JobDecoder) init() {
-	if d.input == nil {
-		d.input = make(chan []byte, kNumDecoderGoroutines*2)
-		d.output = make(chan *Job, kNumDecoderGoroutines)
-		d.result = make(chan []*Job, 1)
-		d.errors = make(chan error, kNumDecoderGoroutines)
-		go d.run()
-		go d.collect()
+// NewJobDecoder returns a JobDecoder instance.
+func NewJobDecoder() *JobDecoder {
+	return &JobDecoder{
+		GobDecoder: util.NewGobDecoder(func() interface{} {
+			return &Job{}
+		}, func(ch <-chan interface{}) interface{} {
+			items := []*Job{}
+			for item := range ch {
+				items = append(items, item.(*Job))
+			}
+			return items
+		}),
 	}
 }
 
-// run starts the decode goroutines and closes d.output when they finish.
-func (d *JobDecoder) run() {
-	// Start decoders.
-	wg := sync.WaitGroup{}
-	for i := 0; i < kNumDecoderGoroutines; i++ {
-		wg.Add(1)
-		go d.decode(&wg)
-	}
-	// Wait for decoders to exit.
-	wg.Wait()
-	// Drain d.input in the case that errors were encountered, to avoid deadlock.
-	for range d.input {
-	}
-	close(d.output)
-}
-
-// decode receives from d.input and sends to d.output until d.input is closed or
-// d.errors is non-empty. Decrements wg when done.
-func (d *JobDecoder) decode(wg *sync.WaitGroup) {
-	for b := range d.input {
-		var j Job
-		if err := gob.NewDecoder(bytes.NewReader(b)).Decode(&j); err != nil {
-			d.errors <- err
-			break
-		}
-		d.output <- &j
-		if len(d.errors) > 0 {
-			break
-		}
-	}
-	wg.Done()
-}
-
-// collect receives from d.output until it is closed, then sends on d.result.
-func (d *JobDecoder) collect() {
-	result := []*Job{}
-	for j := range d.output {
-		result = append(result, j)
-	}
-	d.result <- result
-	close(d.result)
-}
-
-// Process decodes the byte slice into a Job and includes it in Result() (in
-// arbitrary order). Returns false if Result is certain to return an error.
-// Caller must ensure b does not change until after Result() returns.
-func (d *JobDecoder) Process(b []byte) bool {
-	d.init()
-	d.input <- b
-	return len(d.errors) == 0
-}
-
 // Result returns all decoded Jobs provided to Process (in arbitrary order), or
 // any error encountered.
 func (d *JobDecoder) Result() ([]*Job, error) {
-	// Allow JobDecoder to be used without initialization.
-	if d.result == nil {
-		return []*Job{}, nil
-	}
-	close(d.input)
-	select {
-	case err := <-d.errors:
+	res, err := d.GobDecoder.Result()
+	if err != nil {
 		return nil, err
-	case result := <-d.result:
-		return result, nil
 	}
+	return res.([]*Job), nil
 }
diff --git a/task_scheduler/go/types/job_test.go b/task_scheduler/go/types/job_test.go
index 22f67e9..b2ecb32 100644
--- a/task_scheduler/go/types/job_test.go
+++ b/task_scheduler/go/types/job_test.go
@@ -82,7 +82,7 @@
 
 func TestJobDecoder(t *testing.T) {
 	testutils.SmallTest(t)
-	d := JobDecoder{}
+	d := NewJobDecoder()
 	expectedJobs := map[string]*Job{}
 	for i := 0; i < 250; i++ {
 		job := &Job{}
@@ -109,7 +109,7 @@
 
 func TestJobDecoderNoJobs(t *testing.T) {
 	testutils.SmallTest(t)
-	d := JobDecoder{}
+	d := NewJobDecoder()
 	result, err := d.Result()
 	assert.NoError(t, err)
 	assert.Equal(t, 0, len(result))
@@ -125,7 +125,7 @@
 	serialized := buf.Bytes()
 	invalid := append([]byte("Hi Mom!"), serialized...)
 
-	d := JobDecoder{}
+	d := NewJobDecoder()
 	// Process should return true before it encounters an invalid result.
 	assert.True(t, d.Process(serialized))
 	assert.True(t, d.Process(serialized))
diff --git a/task_scheduler/go/types/task.go b/task_scheduler/go/types/task.go
index e8dfed4..d49e394 100644
--- a/task_scheduler/go/types/task.go
+++ b/task_scheduler/go/types/task.go
@@ -1,14 +1,11 @@
 package types
 
 import (
-	"bytes"
-	"encoding/gob"
 	"fmt"
 	"reflect"
 	"sort"
 	"strconv"
 	"strings"
-	"sync"
 	"time"
 	"unicode/utf8"
 
@@ -447,31 +444,8 @@
 
 // TaskEncoder encodes Tasks into bytes via GOB encoding. Not safe for
 // concurrent use.
-// TODO(benjaminwagner): Encode in parallel.
 type TaskEncoder struct {
-	err    error
-	tasks  []*Task
-	result [][]byte
-}
-
-// Process encodes the Task into a byte slice that will be returned from Next()
-// (in arbitrary order). Returns false if Next is certain to return an error.
-// Caller must ensure t does not change until after the first call to Next().
-// May not be called after calling Next().
-func (e *TaskEncoder) Process(t *Task) bool {
-	if e.err != nil {
-		return false
-	}
-	var buf bytes.Buffer
-	if err := gob.NewEncoder(&buf).Encode(t); err != nil {
-		e.err = err
-		e.tasks = nil
-		e.result = nil
-		return false
-	}
-	e.tasks = append(e.tasks, t)
-	e.result = append(e.result, buf.Bytes())
-	return true
+	util.GobEncoder
 }
 
 // Next returns one of the Tasks provided to Process (in arbitrary order) and
@@ -479,117 +453,44 @@
 // bytes, nil. If all tasks have been returned, returns nil, nil, nil. If an
 // error is encountered, returns nil, nil, error.
 func (e *TaskEncoder) Next() (*Task, []byte, error) {
-	if e.err != nil {
-		return nil, nil, e.err
-	}
-	if len(e.tasks) == 0 {
+	item, serialized, err := e.GobEncoder.Next()
+	if err != nil {
+		return nil, nil, err
+	} else if item == nil {
 		return nil, nil, nil
 	}
-	t := e.tasks[0]
-	e.tasks = e.tasks[1:]
-	serialized := e.result[0]
-	e.result = e.result[1:]
-	return t, serialized, nil
+	return item.(*Task), serialized, nil
 }
 
 // TaskDecoder decodes bytes into Tasks via GOB decoding. Not safe for
 // concurrent use.
 type TaskDecoder struct {
-	// input contains the incoming byte slices. Process() sends on this channel,
-	// decode() receives from it, and Result() closes it.
-	input chan []byte
-	// output contains decoded Tasks. decode() sends on this channel, collect()
-	// receives from it, and run() closes it when all decode() goroutines have
-	// finished.
-	output chan *Task
-	// result contains the return value of Result(). collect() sends a single
-	// value on this channel and closes it. Result() receives from it.
-	result chan []*Task
-	// errors contains the first error from any goroutine. It's a channel in case
-	// multiple goroutines experience an error at the same time.
-	errors chan error
+	*util.GobDecoder
 }
 
-const kNumDecoderGoroutines = 10
-
-// init initializes d if it has not been initialized. May not be called concurrently.
-func (d *TaskDecoder) init() {
-	if d.input == nil {
-		d.input = make(chan []byte, kNumDecoderGoroutines*2)
-		d.output = make(chan *Task, kNumDecoderGoroutines)
-		d.result = make(chan []*Task, 1)
-		d.errors = make(chan error, kNumDecoderGoroutines)
-		go d.run()
-		go d.collect()
+// NewTaskDecoder returns a TaskDecoder instance.
+func NewTaskDecoder() *TaskDecoder {
+	return &TaskDecoder{
+		GobDecoder: util.NewGobDecoder(func() interface{} {
+			return &Task{}
+		}, func(ch <-chan interface{}) interface{} {
+			items := []*Task{}
+			for item := range ch {
+				items = append(items, item.(*Task))
+			}
+			return items
+		}),
 	}
 }
 
-// run starts the decode goroutines and closes d.output when they finish.
-func (d *TaskDecoder) run() {
-	// Start decoders.
-	wg := sync.WaitGroup{}
-	for i := 0; i < kNumDecoderGoroutines; i++ {
-		wg.Add(1)
-		go d.decode(&wg)
-	}
-	// Wait for decoders to exit.
-	wg.Wait()
-	// Drain d.input in the case that errors were encountered, to avoid deadlock.
-	for range d.input {
-	}
-	close(d.output)
-}
-
-// decode receives from d.input and sends to d.output until d.input is closed or
-// d.errors is non-empty. Decrements wg when done.
-func (d *TaskDecoder) decode(wg *sync.WaitGroup) {
-	for b := range d.input {
-		var t Task
-		if err := gob.NewDecoder(bytes.NewReader(b)).Decode(&t); err != nil {
-			d.errors <- err
-			break
-		}
-		d.output <- &t
-		if len(d.errors) > 0 {
-			break
-		}
-	}
-	wg.Done()
-}
-
-// collect receives from d.output until it is closed, then sends on d.result.
-func (d *TaskDecoder) collect() {
-	result := []*Task{}
-	for t := range d.output {
-		result = append(result, t)
-	}
-	d.result <- result
-	close(d.result)
-}
-
-// Process decodes the byte slice into a Task and includes it in Result() (in
-// arbitrary order). Returns false if Result is certain to return an error.
-// Caller must ensure b does not change until after Result() returns.
-func (d *TaskDecoder) Process(b []byte) bool {
-	d.init()
-	d.input <- b
-	return len(d.errors) == 0
-}
-
 // Result returns all decoded Tasks provided to Process (in arbitrary order), or
 // any error encountered.
 func (d *TaskDecoder) Result() ([]*Task, error) {
-	// Allow TaskDecoder to be used without initialization.
-	if d.result == nil {
-		return []*Task{}, nil
-	}
-	close(d.input)
-	select {
-	case err := <-d.errors:
+	res, err := d.GobDecoder.Result()
+	if err != nil {
 		return nil, err
-	case result := <-d.result:
-		return result, nil
 	}
+	return res.([]*Task), nil
 }
 
 // TagsForTask returns the tags which should be set for a Task.
diff --git a/task_scheduler/go/types/task_test.go b/task_scheduler/go/types/task_test.go
index e6bcbe4..c1e53fd 100644
--- a/task_scheduler/go/types/task_test.go
+++ b/task_scheduler/go/types/task_test.go
@@ -590,7 +590,7 @@
 
 func TestTaskDecoder(t *testing.T) {
 	testutils.SmallTest(t)
-	d := TaskDecoder{}
+	d := NewTaskDecoder()
 	expectedTasks := map[string]*Task{}
 	for i := 0; i < 250; i++ {
 		task := &Task{}
@@ -617,7 +617,7 @@
 
 func TestTaskDecoderNoTasks(t *testing.T) {
 	testutils.SmallTest(t)
-	d := TaskDecoder{}
+	d := NewTaskDecoder()
 	result, err := d.Result()
 	assert.NoError(t, err)
 	assert.Equal(t, 0, len(result))
@@ -633,7 +633,7 @@
 	serialized := buf.Bytes()
 	invalid := append([]byte("Hi Mom!"), serialized...)
 
-	d := TaskDecoder{}
+	d := NewTaskDecoder()
 	// Process should return true before it encounters an invalid result.
 	assert.True(t, d.Process(serialized))
 	assert.True(t, d.Process(serialized))
diff --git a/task_scheduler/sys/task-scheduler-internal.service b/task_scheduler/sys/task-scheduler-internal.service
index 24724fe..e13856a 100644
--- a/task_scheduler/sys/task-scheduler-internal.service
+++ b/task_scheduler/sys/task-scheduler-internal.service
@@ -11,8 +11,7 @@
     --isolate_server=https://chrome-isolated.appspot.com \
     --logtostderr \
     --pool=SkiaInternal \
-    --pubsub_topic_tasks=task-scheduler-modified-tasks-internal \
-    --pubsub_topic_jobs=task-scheduler-modified-jobs-internal \
+    --pubsub_topic_set=internal \
     --pubsub_topic=swarming-tasks-internal \
     --pubsub_subscriber=task-scheduler-internal \
     --repo=https://skia.googlesource.com/internal_test.git \
diff --git a/task_scheduler/sys/task-scheduler-staging.service b/task_scheduler/sys/task-scheduler-staging.service
index 51e7b30..a322539 100644
--- a/task_scheduler/sys/task-scheduler-staging.service
+++ b/task_scheduler/sys/task-scheduler-staging.service
@@ -12,8 +12,7 @@
     --isolate_server=https://isolateserver-dev.appspot.com \
     --logtostderr \
     --pool=Skia \
-    --pubsub_topic_tasks=task-scheduler-modified-tasks-staging \
-    --pubsub_topic_jobs=task-scheduler-modified-jobs-staging \
+    --pubsub_topic_set=staging \
     --pubsub_topic=swarming-tasks-staging \
     --pubsub_subscriber=task-scheduler-staging \
     --repo=https://skia.googlesource.com/skiabot-test.git \
diff --git a/task_scheduler/sys/task-scheduler.service b/task_scheduler/sys/task-scheduler.service
index 4ba663e..782014c 100644
--- a/task_scheduler/sys/task-scheduler.service
+++ b/task_scheduler/sys/task-scheduler.service
@@ -8,8 +8,7 @@
 ExecStart=/usr/local/bin/task_scheduler \
     --host=task-scheduler.skia.org \
     --logtostderr \
-    --pubsub_topic_tasks=task-scheduler-modified-tasks \
-    --pubsub_topic_jobs=task-scheduler-modified-jobs \
+    --pubsub_topic_set=prod \
     --workdir=/mnt/pd0/task_scheduler_workdir \
     --resources_dir=/usr/local/share/task-scheduler/
 Restart=always