[task scheduler] Better handling of Swarming pubsub messages

For tasks which have been triggered on Swarming but not yet inserted
into the DB, keep a map of pending task IDs and NACK pubsub messages for
those tasks. This replaces the previous logic which NACKed the message
if the task was triggered less than two minutes ago.

Bug: skia:
Change-Id: I1cf1436ee93fe84ce8a167e5fc84c6e25c214e5f
Reviewed-on: https://skia-review.googlesource.com/c/179525
Reviewed-by: Ben Wagner <benjaminwagner@google.com>
Commit-Queue: Eric Boren <borenet@google.com>
diff --git a/task_scheduler/go/scheduling/task_scheduler.go b/task_scheduler/go/scheduling/task_scheduler.go
index 3f0aa12..cb379b4 100644
--- a/task_scheduler/go/scheduling/task_scheduler.go
+++ b/task_scheduler/go/scheduling/task_scheduler.go
@@ -28,7 +28,6 @@
 	"go.skia.org/infra/task_scheduler/go/blacklist"
 	"go.skia.org/infra/task_scheduler/go/db"
 	"go.skia.org/infra/task_scheduler/go/db/cache"
-	"go.skia.org/infra/task_scheduler/go/db/local_db"
 	"go.skia.org/infra/task_scheduler/go/specs"
 	"go.skia.org/infra/task_scheduler/go/tryjobs"
 	"go.skia.org/infra/task_scheduler/go/types"
@@ -109,6 +108,9 @@
 	newTasks    map[types.RepoState]util.StringSet
 	newTasksMtx sync.RWMutex
 
+	pendingInsert    map[string]bool
+	pendingInsertMtx sync.RWMutex
+
 	periodicTriggers *periodic_triggers.Triggerer
 	pools            []string
 	pubsubTopic      string
@@ -174,6 +176,7 @@
 		jCache:           jCache,
 		newTasks:         map[types.RepoState]util.StringSet{},
 		newTasksMtx:      sync.RWMutex{},
+		pendingInsert:    map[string]bool{},
 		periodicTriggers: pt,
 		pools:            pools,
 		pubsubTopic:      pubsubTopic,
@@ -1052,12 +1055,18 @@
 				errCh <- fmt.Errorf("Failed to trigger task: %s", err)
 				return
 			}
+			s.pendingInsertMtx.Lock()
+			s.pendingInsert[t.Id] = true
+			s.pendingInsertMtx.Unlock()
 			var resp *swarming_api.SwarmingRpcsTaskRequestMetadata
 			if err := timeout.Run(func() error {
 				var err error
 				resp, err = s.swarming.TriggerTask(req)
 				return err
 			}, time.Minute); err != nil {
+				s.pendingInsertMtx.Lock()
+				delete(s.pendingInsert, t.Id)
+				s.pendingInsertMtx.Unlock()
 				errCh <- fmt.Errorf("Failed to trigger task: %s", err)
 				return
 			}
@@ -1123,6 +1132,17 @@
 		if err := s.AddTasks(ctx, insert); err != nil {
 			errs = append(errs, fmt.Errorf("Triggered tasks but failed to insert into DB: %s", err))
 		} else {
+			// Remove the tasks from the pending map.
+			s.pendingInsertMtx.Lock()
+			for _, byRepo := range insert {
+				for _, byName := range byRepo {
+					for _, t := range byName {
+						delete(s.pendingInsert, t.Id)
+					}
+				}
+			}
+			s.pendingInsertMtx.Unlock()
+
 			// Organize the triggered task by TaskKey.
 			remove := make(map[types.TaskKey]*types.Task, numTriggered)
 			for _, byRepo := range insert {
@@ -1940,12 +1960,16 @@
 			sklog.Errorf("Swarming Pub/Sub: Failed to retrieve task %q by ID: %s", msg.SwarmingTaskId, msg.UserData)
 			return true
 		} else if t == nil {
-			ts, _, err := local_db.ParseId(msg.UserData)
-			if err != nil {
-				sklog.Errorf("Failed to parse userdata as task ID: %s", err)
-				return true
-			} else if time.Now().Sub(ts) < 2*time.Minute {
-				sklog.Infof("Failed to update task %q from pub/sub: no such task ID: %q. Less than two minutes old; try again later.", msg.SwarmingTaskId, msg.UserData)
+			isPending := false
+			func() {
+				s.pendingInsertMtx.RLock()
+				defer s.pendingInsertMtx.RUnlock()
+				if s.pendingInsert[msg.UserData] {
+					isPending = true
+				}
+			}()
+			if isPending {
+				sklog.Debugf("Received pub/sub message for task which hasn't yet been inserted into the db: %s (%s); not ack'ing message; will try again later.", msg.SwarmingTaskId, msg.UserData)
 				return false
 			} else {
 				sklog.Errorf("Failed to update task %q from pub/sub: no such task ID: %q", msg.SwarmingTaskId, msg.UserData)