[task scheduler] Add opt-in support for task deduplication in Swarming

Bug: skia:8032
Change-Id: I446d3f464f43a160a2799a0fb62006aed30e4c56
Reviewed-on: https://skia-review.googlesource.com/c/buildbot/+/220976
Reviewed-by: Ben Wagner aka dogben <benjaminwagner@google.com>
Commit-Queue: Eric Boren <borenet@google.com>
diff --git a/task_scheduler/go/scheduling/task_candidate.go b/task_scheduler/go/scheduling/task_candidate.go
index d90afc5..e4c30b6 100644
--- a/task_scheduler/go/scheduling/task_candidate.go
+++ b/task_scheduler/go/scheduling/task_candidate.go
@@ -293,7 +293,7 @@
 			EnvPrefixes:          envPrefixes,
 			ExecutionTimeoutSecs: executionTimeoutSecs,
 			ExtraArgs:            extraArgs,
-			Idempotent:           false,
+			Idempotent:           c.TaskSpec.Idempotent,
 			InputsRef: &swarming_api.SwarmingRpcsFilesRef{
 				Isolated:       c.IsolatedInput,
 				Isolatedserver: isolateServer,
diff --git a/task_scheduler/go/scheduling/task_scheduler.go b/task_scheduler/go/scheduling/task_scheduler.go
index a927d37..88affae 100644
--- a/task_scheduler/go/scheduling/task_scheduler.go
+++ b/task_scheduler/go/scheduling/task_scheduler.go
@@ -1339,6 +1339,13 @@
 			}
 			t.Created = created
 			t.SwarmingTaskId = resp.TaskId
+			// The task may have been de-duplicated.
+			if resp.TaskResult != nil && resp.TaskResult.State == swarming.TASK_STATE_COMPLETED {
+				if _, err := t.UpdateFromSwarming(resp.TaskResult); err != nil {
+					recordErr("Failed to update de-duplicated task", err)
+					return
+				}
+			}
 			triggered <- t
 		}(candidate)
 	}
diff --git a/task_scheduler/go/scheduling/task_scheduler_test.go b/task_scheduler/go/scheduling/task_scheduler_test.go
index e697211..5ef1919 100644
--- a/task_scheduler/go/scheduling/task_scheduler_test.go
+++ b/task_scheduler/go/scheduling/task_scheduler_test.go
@@ -3836,3 +3836,63 @@
 	// Should be one task that failed
 	assert.Equal(t, 1, failedIsolate)
 }
+
+func TestTriggerTaskDeduped(t *testing.T) {
+	// Verify that we properly handle de-duplicated tasks.
+	ctx, gb, _, s, swarmingClient, commits, _, cleanup := testMultipleCandidatesBackfillingEachOtherSetup(t)
+	defer cleanup()
+
+	// Trigger three tasks. We should attempt to trigger tasks at
+	// commits[0], commits[4], and either commits[2] or commits[6]. Mock
+	// deduplication of the task at commits[4] and ensure that the other
+	// two tasks are not deduped.
+	bot1 := makeBot("bot1", map[string]string{"pool": "Skia"})
+	bot2 := makeBot("bot2", map[string]string{"pool": "Skia"})
+	bot3 := makeBot("bot3", map[string]string{"pool": "Skia"})
+	makeTags := func(commit string) []string {
+		return []string{
+			"luci_project:",
+			"milo_host:https://ci.chromium.org/raw/build/%s",
+			"sk_attempt:0",
+			"sk_dim_pool:Skia",
+			"sk_retry_of:",
+			fmt.Sprintf("source_revision:%s", commit),
+			fmt.Sprintf("source_repo:%s/+/%%s", gb.RepoUrl()),
+			fmt.Sprintf("sk_repo:%s", gb.RepoUrl()),
+			fmt.Sprintf("sk_revision:%s", commit),
+			"sk_forced_job_id:",
+			"sk_name:dummytask",
+		}
+	}
+	swarmingClient.MockBots([]*swarming_api.SwarmingRpcsBotInfo{bot1, bot2, bot3})
+	swarmingClient.MockTriggerTaskDeduped(makeTags(commits[4]))
+	assert.NoError(t, s.MainLoop(ctx))
+	s.testWaitGroup.Wait()
+	assert.NoError(t, s.tCache.Update())
+	assert.Equal(t, 5, len(s.queue))
+	tasks, err := s.tCache.GetTasksForCommits(gb.RepoUrl(), commits)
+	assert.NoError(t, err)
+
+	var t1, t2, t3 *types.Task
+	for _, byName := range tasks {
+		for _, task := range byName {
+			if task.Revision == commits[0] {
+				t1 = task
+			} else if task.Revision == commits[4] {
+				t2 = task
+			} else if task.Revision == commits[2] || task.Revision == commits[6] {
+				t3 = task
+			} else {
+				assert.FailNow(t, fmt.Sprintf("Task has unknown revision: %v", task))
+			}
+		}
+	}
+	assert.NotNil(t, t1)
+	assert.NotNil(t, t2)
+	assert.NotNil(t, t3)
+
+	// Ensure that t2 was correctly deduped, and the others weren't.
+	assert.Equal(t, types.TASK_STATUS_PENDING, t1.Status)
+	assert.Equal(t, types.TASK_STATUS_SUCCESS, t2.Status)
+	assert.Equal(t, types.TASK_STATUS_PENDING, t3.Status)
+}
diff --git a/task_scheduler/go/specs/specs.go b/task_scheduler/go/specs/specs.go
index 43beab1..5bc0669 100644
--- a/task_scheduler/go/specs/specs.go
+++ b/task_scheduler/go/specs/specs.go
@@ -213,6 +213,11 @@
 	// ExtraTags are extra tags to add to the Swarming task.
 	ExtraTags map[string]string `json:"extra_tags,omitempty"`
 
+	// Idempotent indicates that triggering this task with the same
+	// parameters as previously triggered has no side effect and thus the
+	// task may be de-duplicated.
+	Idempotent bool `json:"idempotent,omitempty"`
+
 	// IoTimeout is the maximum amount of time which the task may take to
 	// communicate with the server.
 	IoTimeout time.Duration `json:"io_timeout_ns,omitempty"`
@@ -308,6 +313,7 @@
 		Expiration:       t.Expiration,
 		ExtraArgs:        extraArgs,
 		ExtraTags:        extraTags,
+		Idempotent:       t.Idempotent,
 		IoTimeout:        t.IoTimeout,
 		Isolate:          t.Isolate,
 		MaxAttempts:      t.MaxAttempts,
diff --git a/task_scheduler/go/specs/specs_test.go b/task_scheduler/go/specs/specs_test.go
index e519bb0..25e01f5 100644
--- a/task_scheduler/go/specs/specs_test.go
+++ b/task_scheduler/go/specs/specs_test.go
@@ -41,6 +41,7 @@
 		ExtraTags: map[string]string{
 			"dummy_tag": "dummy_val",
 		},
+		Idempotent:     true,
 		IoTimeout:      10 * time.Minute,
 		Isolate:        "abc123",
 		MaxAttempts:    5,
diff --git a/task_scheduler/go/testutils/testutils.go b/task_scheduler/go/testutils/testutils.go
index 5658ffa..99c5344 100644
--- a/task_scheduler/go/testutils/testutils.go
+++ b/task_scheduler/go/testutils/testutils.go
@@ -21,6 +21,7 @@
 	taskList    []*swarming_api.SwarmingRpcsTaskRequestMetadata
 	taskListMtx sync.RWMutex
 
+	triggerDedupe  map[string]bool
 	triggerFailure map[string]bool
 	triggerMtx     sync.Mutex
 }
@@ -29,6 +30,7 @@
 	return &TestClient{
 		botList:        []*swarming_api.SwarmingRpcsBotInfo{},
 		taskList:       []*swarming_api.SwarmingRpcsTaskRequestMetadata{},
+		triggerDedupe:  map[string]bool{},
 		triggerFailure: map[string]bool{},
 	}
 }
@@ -195,11 +197,16 @@
 		TaskResult: &swarming_api.SwarmingRpcsTaskResult{
 			CreatedTs: createdTs,
 			Name:      t.Name,
-			State:     "PENDING",
+			State:     swarming.TASK_STATE_PENDING,
 			TaskId:    id,
 			Tags:      t.Tags,
 		},
 	}
+	if c.triggerDedupe[md5] {
+		delete(c.triggerDedupe, md5)
+		rv.TaskResult.State = swarming.TASK_STATE_COMPLETED // No deduplicated state.
+		rv.TaskResult.DedupedFrom = uuid.New().String()
+	}
 	c.taskListMtx.Lock()
 	defer c.taskListMtx.Unlock()
 	c.taskList = append(c.taskList, rv)
@@ -270,3 +277,11 @@
 	defer c.triggerMtx.Unlock()
 	c.triggerFailure[md5Tags(tags)] = true
 }
+
+// MockTriggerTaskDeduped forces the next call to TriggerTask which matches
+// the given tags to result in a deduplicated task.
+func (c *TestClient) MockTriggerTaskDeduped(tags []string) {
+	c.triggerMtx.Lock()
+	defer c.triggerMtx.Unlock()
+	c.triggerDedupe[md5Tags(tags)] = true
+}