[task scheduler] Add opt-in support for task deduplication in Swarming
Bug: skia:8032
Change-Id: I446d3f464f43a160a2799a0fb62006aed30e4c56
Reviewed-on: https://skia-review.googlesource.com/c/buildbot/+/220976
Reviewed-by: Ben Wagner aka dogben <benjaminwagner@google.com>
Commit-Queue: Eric Boren <borenet@google.com>
diff --git a/task_scheduler/go/scheduling/task_candidate.go b/task_scheduler/go/scheduling/task_candidate.go
index d90afc5..e4c30b6 100644
--- a/task_scheduler/go/scheduling/task_candidate.go
+++ b/task_scheduler/go/scheduling/task_candidate.go
@@ -293,7 +293,7 @@
EnvPrefixes: envPrefixes,
ExecutionTimeoutSecs: executionTimeoutSecs,
ExtraArgs: extraArgs,
- Idempotent: false,
+ Idempotent: c.TaskSpec.Idempotent,
InputsRef: &swarming_api.SwarmingRpcsFilesRef{
Isolated: c.IsolatedInput,
Isolatedserver: isolateServer,
diff --git a/task_scheduler/go/scheduling/task_scheduler.go b/task_scheduler/go/scheduling/task_scheduler.go
index a927d37..88affae 100644
--- a/task_scheduler/go/scheduling/task_scheduler.go
+++ b/task_scheduler/go/scheduling/task_scheduler.go
@@ -1339,6 +1339,13 @@
}
t.Created = created
t.SwarmingTaskId = resp.TaskId
+ // The task may have been de-duplicated.
+ if resp.TaskResult != nil && resp.TaskResult.State == swarming.TASK_STATE_COMPLETED {
+ if _, err := t.UpdateFromSwarming(resp.TaskResult); err != nil {
+ recordErr("Failed to update de-duplicated task", err)
+ return
+ }
+ }
triggered <- t
}(candidate)
}
diff --git a/task_scheduler/go/scheduling/task_scheduler_test.go b/task_scheduler/go/scheduling/task_scheduler_test.go
index e697211..5ef1919 100644
--- a/task_scheduler/go/scheduling/task_scheduler_test.go
+++ b/task_scheduler/go/scheduling/task_scheduler_test.go
@@ -3836,3 +3836,63 @@
// Should be one task that failed
assert.Equal(t, 1, failedIsolate)
}
+
+func TestTriggerTaskDeduped(t *testing.T) {
+ // Verify that we properly handle de-duplicated tasks.
+ ctx, gb, _, s, swarmingClient, commits, _, cleanup := testMultipleCandidatesBackfillingEachOtherSetup(t)
+ defer cleanup()
+
+ // Trigger three tasks. We should attempt to trigger tasks at
+ // commits[0], commits[4], and either commits[2] or commits[6]. Mock
+ // deduplication of the task at commits[4] and ensure that the other
+ // two tasks are not deduped.
+ bot1 := makeBot("bot1", map[string]string{"pool": "Skia"})
+ bot2 := makeBot("bot2", map[string]string{"pool": "Skia"})
+ bot3 := makeBot("bot3", map[string]string{"pool": "Skia"})
+ makeTags := func(commit string) []string {
+ return []string{
+ "luci_project:",
+ "milo_host:https://ci.chromium.org/raw/build/%s",
+ "sk_attempt:0",
+ "sk_dim_pool:Skia",
+ "sk_retry_of:",
+ fmt.Sprintf("source_revision:%s", commit),
+ fmt.Sprintf("source_repo:%s/+/%%s", gb.RepoUrl()),
+ fmt.Sprintf("sk_repo:%s", gb.RepoUrl()),
+ fmt.Sprintf("sk_revision:%s", commit),
+ "sk_forced_job_id:",
+ "sk_name:dummytask",
+ }
+ }
+ swarmingClient.MockBots([]*swarming_api.SwarmingRpcsBotInfo{bot1, bot2, bot3})
+ swarmingClient.MockTriggerTaskDeduped(makeTags(commits[4]))
+ assert.NoError(t, s.MainLoop(ctx))
+ s.testWaitGroup.Wait()
+ assert.NoError(t, s.tCache.Update())
+ assert.Equal(t, 5, len(s.queue))
+ tasks, err := s.tCache.GetTasksForCommits(gb.RepoUrl(), commits)
+ assert.NoError(t, err)
+
+ var t1, t2, t3 *types.Task
+ for _, byName := range tasks {
+ for _, task := range byName {
+ if task.Revision == commits[0] {
+ t1 = task
+ } else if task.Revision == commits[4] {
+ t2 = task
+ } else if task.Revision == commits[2] || task.Revision == commits[6] {
+ t3 = task
+ } else {
+ assert.FailNow(t, fmt.Sprintf("Task has unknown revision: %v", task))
+ }
+ }
+ }
+ assert.NotNil(t, t1)
+ assert.NotNil(t, t2)
+ assert.NotNil(t, t3)
+
+ // Ensure that t2 was correctly deduped, and the others weren't.
+ assert.Equal(t, types.TASK_STATUS_PENDING, t1.Status)
+ assert.Equal(t, types.TASK_STATUS_SUCCESS, t2.Status)
+ assert.Equal(t, types.TASK_STATUS_PENDING, t3.Status)
+}
diff --git a/task_scheduler/go/specs/specs.go b/task_scheduler/go/specs/specs.go
index 43beab1..5bc0669 100644
--- a/task_scheduler/go/specs/specs.go
+++ b/task_scheduler/go/specs/specs.go
@@ -213,6 +213,11 @@
// ExtraTags are extra tags to add to the Swarming task.
ExtraTags map[string]string `json:"extra_tags,omitempty"`
+ // Idempotent indicates that triggering this task with the same
+ // parameters as previously triggered has no side effect and thus the
+ // task may be de-duplicated.
+ Idempotent bool `json:"idempotent,omitempty"`
+
// IoTimeout is the maximum amount of time which the task may take to
// communicate with the server.
IoTimeout time.Duration `json:"io_timeout_ns,omitempty"`
@@ -308,6 +313,7 @@
Expiration: t.Expiration,
ExtraArgs: extraArgs,
ExtraTags: extraTags,
+ Idempotent: t.Idempotent,
IoTimeout: t.IoTimeout,
Isolate: t.Isolate,
MaxAttempts: t.MaxAttempts,
diff --git a/task_scheduler/go/specs/specs_test.go b/task_scheduler/go/specs/specs_test.go
index e519bb0..25e01f5 100644
--- a/task_scheduler/go/specs/specs_test.go
+++ b/task_scheduler/go/specs/specs_test.go
@@ -41,6 +41,7 @@
ExtraTags: map[string]string{
"dummy_tag": "dummy_val",
},
+ Idempotent: true,
IoTimeout: 10 * time.Minute,
Isolate: "abc123",
MaxAttempts: 5,
diff --git a/task_scheduler/go/testutils/testutils.go b/task_scheduler/go/testutils/testutils.go
index 5658ffa..99c5344 100644
--- a/task_scheduler/go/testutils/testutils.go
+++ b/task_scheduler/go/testutils/testutils.go
@@ -21,6 +21,7 @@
taskList []*swarming_api.SwarmingRpcsTaskRequestMetadata
taskListMtx sync.RWMutex
+ triggerDedupe map[string]bool
triggerFailure map[string]bool
triggerMtx sync.Mutex
}
@@ -29,6 +30,7 @@
return &TestClient{
botList: []*swarming_api.SwarmingRpcsBotInfo{},
taskList: []*swarming_api.SwarmingRpcsTaskRequestMetadata{},
+ triggerDedupe: map[string]bool{},
triggerFailure: map[string]bool{},
}
}
@@ -195,11 +197,16 @@
TaskResult: &swarming_api.SwarmingRpcsTaskResult{
CreatedTs: createdTs,
Name: t.Name,
- State: "PENDING",
+ State: swarming.TASK_STATE_PENDING,
TaskId: id,
Tags: t.Tags,
},
}
+ if c.triggerDedupe[md5] {
+ delete(c.triggerDedupe, md5)
+ rv.TaskResult.State = swarming.TASK_STATE_COMPLETED // No deduplicated state.
+ rv.TaskResult.DedupedFrom = uuid.New().String()
+ }
c.taskListMtx.Lock()
defer c.taskListMtx.Unlock()
c.taskList = append(c.taskList, rv)
@@ -270,3 +277,11 @@
defer c.triggerMtx.Unlock()
c.triggerFailure[md5Tags(tags)] = true
}
+
+// MockTriggerTaskDeduped forces the next call to TriggerTask which matches
+// the given tags to result in a deduplicated task.
+func (c *TestClient) MockTriggerTaskDeduped(tags []string) {
+ c.triggerMtx.Lock()
+ defer c.triggerMtx.Unlock()
+ c.triggerDedupe[md5Tags(tags)] = true
+}