blob: 6356e88c83e0449c41b6bf902bca82a798d2c92d [file] [log] [blame]
package firestore
import (
"context"
"time"
fs "cloud.google.com/go/firestore"
"github.com/cenkalti/backoff"
"go.skia.org/infra/go/firestore"
"go.skia.org/infra/go/now"
"go.skia.org/infra/go/sklog"
"go.skia.org/infra/task_scheduler/go/types"
)
// The DbModified field is set before the document is actually inserted into the
// DB. We have to account for the lag between the timestamp being set and the
// actual modification in the DB when we query for modifications, or we may miss
// modifications.
const dbModifiedLag = DEFAULT_ATTEMPTS * (PUT_MULTI_TIMEOUT + firestore.BACKOFF_WAIT*time.Duration(2^DEFAULT_ATTEMPTS))
// modifiedCh is a helper function used by Modified* which runs
// firestore.QuerySnapshotChannel with a query by DbModified time. Starts a
// goroutine that runs until the given context is cancelled, at which point the
// returned channel is closed.
func modifiedCh(ctx context.Context, coll *fs.CollectionRef, field string) <-chan *fs.QuerySnapshot {
// Note: we could specify the wait intervals for the exponential
// backoff, but the defaults seem like a reasonable place to start.
backoffWait := backoff.NewExponentialBackOff()
outCh := make(chan *fs.QuerySnapshot)
go func() {
defer close(outCh)
lastSnapTime := now.Now(ctx)
// Loop and resume watching if the input channel is closed.
for {
// It's important that we recreate the query inside the
// loop, otherwise we may restart iteration with an old
// timestamp and generate a bunch of old results.
q := coll.Query.Where(field, ">=", lastSnapTime.Add(-dbModifiedLag))
for qsnap := range firestore.QuerySnapshotChannel(ctx, q) {
lastSnapTime = qsnap.ReadTime
backoffWait.Reset()
outCh <- qsnap
}
// Respect context cancellation.
if err := ctx.Err(); err != nil {
sklog.Warningf("%s while watching query.", err)
return
}
// If we reached this point, the QuerySnapshotIterator
// has stopped for some reason. Restart it after a brief
// wait in case it's repeatedly failing.
time.Sleep(backoffWait.NextBackOff())
}
}()
return outCh
}
// ModifiedTasksCh passes slices of Tasks along the returned channel as they are
// modified in the DB.
func (d *firestoreDB) ModifiedTasksCh(ctx context.Context) <-chan []*types.Task {
outCh := make(chan []*types.Task)
go func() {
defer close(outCh)
for snap := range modifiedCh(ctx, d.tasks(), KEY_DB_MODIFIED) {
tasks := make([]*types.Task, 0, len(snap.Changes))
for _, ch := range snap.Changes {
// We don't support deletion of tasks, but this
// is where we'd handle it if we did.
if ch.Kind != fs.DocumentRemoved {
var t types.Task
if err := ch.Doc.DataTo(&t); err != nil {
sklog.Errorf("Failed to decode Task: %+v", ch.Doc.Data())
continue
}
tasks = append(tasks, &t)
}
}
outCh <- tasks
}
}()
return outCh
}
// ModifiedJobsCh passes slices of Jobs along the returned channel as they are
// modified in the DB.
func (d *firestoreDB) ModifiedJobsCh(ctx context.Context) <-chan []*types.Job {
outCh := make(chan []*types.Job)
go func() {
defer close(outCh)
for snap := range modifiedCh(ctx, d.jobs(), KEY_DB_MODIFIED) {
jobs := make([]*types.Job, 0, len(snap.Changes))
for _, ch := range snap.Changes {
// We don't support deletion of jobs, but this
// is where we'd handle it if we did.
if ch.Kind != fs.DocumentRemoved {
var j types.Job
if err := ch.Doc.DataTo(&j); err != nil {
sklog.Errorf("Failed to decode Job: %+v", ch.Doc.Data())
continue
}
jobs = append(jobs, &j)
}
}
outCh <- jobs
}
}()
return outCh
}
// ModifiedTaskCommentsCh passes slices of TaskComments along the returned channel
// as they are modified in the DB.
func (d *firestoreDB) ModifiedTaskCommentsCh(ctx context.Context) <-chan []*types.TaskComment {
outCh := make(chan []*types.TaskComment)
go func() {
defer close(outCh)
for snap := range modifiedCh(ctx, d.taskComments(), KEY_TIMESTAMP) {
cs := make([]*types.TaskComment, 0, len(snap.Changes))
for _, ch := range snap.Changes {
var c types.TaskComment
if err := ch.Doc.DataTo(&c); err != nil {
sklog.Errorf("Failed to decode TaskComment: %+v", ch.Doc.Data())
continue
}
// If the comment was removed, set the Deleted
// field.
if ch.Kind == fs.DocumentRemoved {
deleted := true
c.Deleted = &deleted
}
cs = append(cs, &c)
}
outCh <- cs
}
}()
return outCh
}
// ModifiedTaskSpecCommentsCh passes slices of TaskSpecComments along the returned
// channel as they are modified in the DB.
func (d *firestoreDB) ModifiedTaskSpecCommentsCh(ctx context.Context) <-chan []*types.TaskSpecComment {
outCh := make(chan []*types.TaskSpecComment)
go func() {
defer close(outCh)
for snap := range modifiedCh(ctx, d.taskSpecComments(), KEY_TIMESTAMP) {
cs := make([]*types.TaskSpecComment, 0, len(snap.Changes))
for _, ch := range snap.Changes {
var c types.TaskSpecComment
if err := ch.Doc.DataTo(&c); err != nil {
sklog.Errorf("Failed to decode TaskSpecComment: %+v", ch.Doc.Data())
continue
}
// If the comment was removed, set the Deleted
// field.
if ch.Kind == fs.DocumentRemoved {
deleted := true
c.Deleted = &deleted
}
cs = append(cs, &c)
}
outCh <- cs
}
}()
return outCh
}
// ModifiedCommitCommentsCh passes slices of CommitComments along the returned
// channel as they are modified in the DB.
func (d *firestoreDB) ModifiedCommitCommentsCh(ctx context.Context) <-chan []*types.CommitComment {
outCh := make(chan []*types.CommitComment)
go func() {
defer close(outCh)
for snap := range modifiedCh(ctx, d.commitComments(), KEY_TIMESTAMP) {
cs := make([]*types.CommitComment, 0, len(snap.Changes))
for _, ch := range snap.Changes {
var c types.CommitComment
if err := ch.Doc.DataTo(&c); err != nil {
sklog.Errorf("Failed to decode CommitComment: %+v", ch.Doc.Data())
continue
}
// If the comment was removed, set the Deleted
// field.
if ch.Kind == fs.DocumentRemoved {
deleted := true
c.Deleted = &deleted
}
cs = append(cs, &c)
}
outCh <- cs
}
}()
return outCh
}