[datahopper] Include tryjob info in task duration bench results

Before this change, we are ingesting tryjob results as if they're master
results.

Change-Id: Ia4e5d7c14ed5bab8f778319da3a8c84029c697a9
Reviewed-on: https://skia-review.googlesource.com/c/buildbot/+/270577
Reviewed-by: Kevin Lubick <kjlubick@google.com>
Commit-Queue: Ben Wagner aka dogben <benjaminwagner@google.com>
diff --git a/datahopper/go/swarming_metrics/tasks.go b/datahopper/go/swarming_metrics/tasks.go
index 5213e75..7095330 100644
--- a/datahopper/go/swarming_metrics/tasks.go
+++ b/datahopper/go/swarming_metrics/tasks.go
@@ -10,7 +10,6 @@
 	"encoding/gob"
 	"fmt"
 	"strconv"
-	"strings"
 	"time"
 
 	swarming_api "go.chromium.org/luci/common/api/swarming/swarming/v1"
@@ -114,25 +113,37 @@
 
 	// Pull taskName from tags, because the task name could be changed (e.g. retries)
 	// and that would make ParseTaskName not happy.
-	taskName := ""
-	taskRevision := ""
-	repo := ""
-	for _, tag := range t.Request.Tags {
-		if strings.HasPrefix(tag, "sk_revision") {
-			taskRevision = strings.SplitN(tag, ":", 2)[1]
-		}
-		if strings.HasPrefix(tag, "sk_name") {
-			taskName = strings.SplitN(tag, ":", 2)[1]
-		}
-		if strings.HasPrefix(tag, "sk_repo") {
-			repo = strings.SplitN(tag, ":", 2)[1]
-		}
+	tags, err := swarming.ParseTags(t.Request.Tags)
+	if err != nil {
+		sklog.Errorf("Can not parse tags for task %q: %s", t.TaskId, err)
+		return nil
 	}
+	getTag := func(key string) string {
+		vals := tags[key]
+		if len(vals) > 0 {
+			return vals[0]
+		}
+		return ""
+	}
+	taskName := getTag("sk_name")
+	taskRevision := getTag("sk_revision")
+	taskIssue := getTag("sk_issue")
+	taskPatchSet := getTag("sk_patchset")
+	taskPatchStorage := ""
+	if getTag("sk_issue_server") == "https://skia-review.googlesource.com" {
+		taskPatchStorage = "gerrit"
+	}
+	repo := getTag("sk_repo")
 	if repo != common.REPO_SKIA {
 		// The schema parser only supports the Skia repo, not, for example, the Infra repo
 		// which would also show up here.
 		return nil
 	}
+	if taskName == "" || taskRevision == "" {
+		sklog.Errorf("Task %q has sk_repo tag but not sk_name and sk_revision.", t.TaskId)
+		// If these tags are missing, there is no useful data.
+		return nil
+	}
 	parsed, err := tnp.ParseTaskName(taskName)
 	if err != nil {
 		sklog.Errorf("Could not parse task name of %s: %s", taskName, err)
@@ -162,11 +173,15 @@
 		},
 	}
 	toReport := ingestcommon.BenchData{
-		Hash: taskRevision,
-		Key:  parsed,
+		Hash:     taskRevision,
+		Issue:    taskIssue,
+		PatchSet: taskPatchSet,
+		Source:   "datahopper",
+		Key:      parsed,
 		Results: map[string]ingestcommon.BenchResults{
 			taskName: durations,
 		},
+		PatchStorage: taskPatchStorage,
 	}
 
 	sklog.Debugf("Reporting that %s had these durations: %#v ms", taskName, durations)
diff --git a/datahopper/go/swarming_metrics/tasks_test.go b/datahopper/go/swarming_metrics/tasks_test.go
index 3afd4a5..f8d463a 100644
--- a/datahopper/go/swarming_metrics/tasks_test.go
+++ b/datahopper/go/swarming_metrics/tasks_test.go
@@ -278,8 +278,16 @@
 	t2.TaskResult.State = swarming.TASK_STATE_RUNNING
 	t3 := makeTask("3", "my-task", cr.Add(2*time.Second), st, now.Add(-time.Minute), d, nil, 47*time.Second, 3*time.Second, 34*time.Second)
 	t3.TaskResult.State = swarming.TASK_STATE_BOT_DIED
+	t4 := makeTask("4", "Test-MyOS", cr, st, co, d, map[string]string{
+		"sk_revision":     "firstRevision",
+		"sk_name":         "Test-MyOS",
+		"sk_repo":         common.REPO_SKIA,
+		"sk_issue":        "12345",
+		"sk_patchset":     "6",
+		"sk_issue_server": "https://skia-review.googlesource.com",
+	}, 31*time.Second, 7*time.Second, 3*time.Second)
 
-	swarm.On("ListTasks", lastLoad, now, []string{"pool:Skia"}, "").Return([]*swarming_api.SwarmingRpcsTaskRequestMetadata{t1, t2, t3}, nil)
+	swarm.On("ListTasks", lastLoad, now, []string{"pool:Skia"}, "").Return([]*swarming_api.SwarmingRpcsTaskRequestMetadata{t1, t2, t3, t4}, nil)
 
 	btProject, btInstance, cleanup := bt_testutil.SetupBigTable(t, events.BT_TABLE, events.BT_COLUMN_FAMILY)
 	defer cleanup()
@@ -308,6 +316,29 @@
 				},
 			},
 		},
+		Source: "datahopper",
+	}).Return(nil)
+	pc.On("PushToPerf", now, "Test-MyOS", "task_duration", ingestcommon.BenchData{
+		Hash:     "firstRevision",
+		Issue:    "12345",
+		PatchSet: "6",
+		Key: map[string]string{
+			"os":      "MyOS",
+			"role":    "Test",
+			"failure": "false",
+		},
+		Results: map[string]ingestcommon.BenchResults{
+			"Test-MyOS": {
+				"task_duration": {
+					"total_s":            float64((14*time.Minute + 31*time.Second) / time.Second),
+					"task_step_s":        float64(14 * time.Minute / time.Second),
+					"isolate_overhead_s": 10.0,
+					"all_overhead_s":     31.0,
+				},
+			},
+		},
+		Source:       "datahopper",
+		PatchStorage: "gerrit",
 	}).Return(nil)
 
 	// Load Swarming tasks.
@@ -315,7 +346,7 @@
 	revisit, err = loadSwarmingTasks(swarm, "Skia", edb, pc, mp, lastLoad, now, revisit)
 	require.NoError(t, err)
 
-	pc.AssertNumberOfCalls(t, "PushToPerf", 1)
+	pc.AssertNumberOfCalls(t, "PushToPerf", 2)
 
 	// The second task is finished.
 	t2.TaskResult.State = swarming.TASK_STATE_COMPLETED
@@ -353,12 +384,13 @@
 				},
 			},
 		},
+		Source: "datahopper",
 	}).Return(nil)
 
 	// Load Swarming tasks again.
 
 	revisit, err = loadSwarmingTasks(swarm, "Skia", edb, pc, mp, lastLoad, now, revisit)
 	require.NoError(t, err)
-	pc.AssertNumberOfCalls(t, "PushToPerf", 2)
+	pc.AssertNumberOfCalls(t, "PushToPerf", 3)
 
 }