[task scheduler] Fixes for Firestore
- Add missing scope
- Fix comment DB
Bug: skia:
Change-Id: I1b925a4a5fba9d5e8b79fe1ebe14c5171fbf786c
Reviewed-on: https://skia-review.googlesource.com/c/177321
Reviewed-by: Ben Wagner <benjaminwagner@google.com>
Commit-Queue: Eric Boren <borenet@google.com>
diff --git a/status/go/status/main.go b/status/go/status/main.go
index 9aa41eb..7a10288 100644
--- a/status/go/status/main.go
+++ b/status/go/status/main.go
@@ -22,6 +22,7 @@
"unicode"
"cloud.google.com/go/bigtable"
+ "cloud.google.com/go/datastore"
"github.com/gorilla/mux"
"go.skia.org/infra/go/auth"
"go.skia.org/infra/go/common"
@@ -713,7 +714,7 @@
}
ctx := context.Background()
- ts, err := auth.NewDefaultTokenSource(*testing, auth.SCOPE_USERINFO_EMAIL, auth.SCOPE_GERRIT, bigtable.ReadonlyScope, pubsub.AUTH_SCOPE)
+ ts, err := auth.NewDefaultTokenSource(*testing, auth.SCOPE_USERINFO_EMAIL, auth.SCOPE_GERRIT, bigtable.ReadonlyScope, pubsub.AUTH_SCOPE, datastore.ScopeDatastore)
if err != nil {
sklog.Fatal(err)
}
@@ -734,6 +735,7 @@
}
defer util.Close(taskDb.(db.DBCloser))
} else if *firestoreInstance != "" {
+ sklog.Infof("Creating firestore DB.")
label := *host
modTasks, err := pubsub.NewModifiedTasks(*pubsubTopicTasks, label, ts)
if err != nil {
@@ -748,6 +750,7 @@
sklog.Fatalf("Failed to create Firestore DB client: %s", err)
}
} else {
+ sklog.Infof("Creating remote DB.")
label := *host
taskDb, err = remote_db.NewClient(*taskSchedulerDbUrl, *pubsubTopicTasks, *pubsubTopicJobs, label, ts)
if err != nil {
@@ -775,7 +778,7 @@
sklog.Info("Checkout complete")
// Cache for buildProgressHandler.
- tasksPerCommit, err = newTasksPerCommitCache(ctx, *workdir, []string{common.REPO_SKIA, common.REPO_SKIA_INFRA}, 14*24*time.Hour)
+ tasksPerCommit, err = newTasksPerCommitCache(ctx, *workdir, *repoUrls, 14*24*time.Hour)
if err != nil {
sklog.Fatalf("Failed to create tasksPerCommitCache: %s", err)
}
diff --git a/status/vm.go b/status/vm.go
index 01f6602..8938af6 100644
--- a/status/vm.go
+++ b/status/vm.go
@@ -4,6 +4,8 @@
"path"
"runtime"
+ "cloud.google.com/go/bigtable"
+ "cloud.google.com/go/datastore"
"go.skia.org/infra/go/gce"
"go.skia.org/infra/go/gce/server"
)
@@ -14,6 +16,10 @@
vm.DataDisks[0].Type = gce.DISK_TYPE_PERSISTENT_STANDARD
vm.Metadata["owner_primary"] = "borenet"
vm.Metadata["owner_secondary"] = "kjlubick"
+ vm.Scopes = append(vm.Scopes,
+ bigtable.ReadonlyScope,
+ datastore.ScopeDatastore,
+ )
_, filename, _, _ := runtime.Caller(0)
dir := path.Dir(filename)
diff --git a/task_scheduler/go/db/comments_test.go b/task_scheduler/go/db/comments_test.go
index cdf3c84..a00e224 100644
--- a/task_scheduler/go/db/comments_test.go
+++ b/task_scheduler/go/db/comments_test.go
@@ -44,8 +44,9 @@
// Add some comments.
tc1 := types.MakeTaskComment(1, 1, 1, 1, now)
- expected["r1"] = &types.RepoComments{
- Repo: "r1",
+ r1 := tc1.Repo
+ expected[r1] = &types.RepoComments{
+ Repo: r1,
TaskComments: map[string]map[string][]*types.TaskComment{"c1": {"n1": {tc1}}},
TaskSpecComments: map[string][]*types.TaskSpecComment{},
CommitComments: map[string][]*types.CommitComment{},
@@ -53,24 +54,25 @@
assert.NoError(t, db.PutTaskComment(tc1))
tc2 := types.MakeTaskComment(2, 1, 1, 1, now.Add(2*time.Second))
- expected["r1"].TaskComments["c1"]["n1"] = []*types.TaskComment{tc1, tc2}
+ expected[r1].TaskComments["c1"]["n1"] = []*types.TaskComment{tc1, tc2}
assert.NoError(t, db.PutTaskComment(tc2))
tc3 := types.MakeTaskComment(3, 1, 1, 1, now.Add(time.Second))
- expected["r1"].TaskComments["c1"]["n1"] = []*types.TaskComment{tc1, tc3, tc2}
+ expected[r1].TaskComments["c1"]["n1"] = []*types.TaskComment{tc1, tc3, tc2}
assert.NoError(t, db.PutTaskComment(tc3))
tc4 := types.MakeTaskComment(4, 1, 1, 2, now)
- expected["r1"].TaskComments["c2"] = map[string][]*types.TaskComment{"n1": {tc4}}
+ expected[r1].TaskComments["c2"] = map[string][]*types.TaskComment{"n1": {tc4}}
assert.NoError(t, db.PutTaskComment(tc4))
tc5 := types.MakeTaskComment(5, 1, 2, 2, now)
- expected["r1"].TaskComments["c2"]["n2"] = []*types.TaskComment{tc5}
+ expected[r1].TaskComments["c2"]["n2"] = []*types.TaskComment{tc5}
assert.NoError(t, db.PutTaskComment(tc5))
tc6 := types.MakeTaskComment(6, 2, 3, 3, now)
- expected["r2"] = &types.RepoComments{
- Repo: "r2",
+ r2 := tc6.Repo
+ expected[r2] = &types.RepoComments{
+ Repo: r2,
TaskComments: map[string]map[string][]*types.TaskComment{"c3": {"n3": {tc6.Copy()}}},
TaskSpecComments: map[string][]*types.TaskSpecComment{},
CommitComments: map[string][]*types.CommitComment{},
@@ -84,11 +86,11 @@
assert.True(t, callCount >= 6)
sc1 := types.MakeTaskSpecComment(1, 1, 1, now)
- expected["r1"].TaskSpecComments["n1"] = []*types.TaskSpecComment{sc1}
+ expected[r1].TaskSpecComments["n1"] = []*types.TaskSpecComment{sc1}
assert.NoError(t, db.PutTaskSpecComment(sc1))
cc1 := types.MakeCommitComment(1, 1, 1, now)
- expected["r1"].CommitComments["c1"] = []*types.CommitComment{cc1}
+ expected[r1].CommitComments["c1"] = []*types.CommitComment{cc1}
assert.NoError(t, db.PutCommitComment(cc1))
assert.True(t, callCount >= 8)
@@ -109,18 +111,18 @@
// Reload DB from persistent.
init := map[string]*types.RepoComments{
- "r1": expected["r1"].Copy(),
- "r2": expected["r2"].Copy(),
+ r1: expected[r1].Copy(),
+ r2: expected[r2].Copy(),
}
db = NewCommentBoxWithPersistence(init, testWriter)
{
- actual, err := db.GetCommentsForRepos([]string{"r0", "r1", "r2"}, now.Add(-10000*time.Hour))
+ actual, err := db.GetCommentsForRepos([]string{"r0", r1, r2}, now.Add(-10000*time.Hour))
assert.NoError(t, err)
expectedSlice := []*types.RepoComments{
{Repo: "r0"},
- expected["r1"],
- expected["r2"],
+ expected[r1],
+ expected[r2],
}
deepequal.AssertDeepEqual(t, expectedSlice, actual)
}
@@ -128,11 +130,11 @@
assert.Equal(t, 0, callCount)
// Delete some comments.
- expected["r1"].TaskComments["c1"]["n1"] = []*types.TaskComment{tc1, tc2}
+ expected[r1].TaskComments["c1"]["n1"] = []*types.TaskComment{tc1, tc2}
assert.NoError(t, db.DeleteTaskComment(tc3))
- expected["r1"].TaskSpecComments = map[string][]*types.TaskSpecComment{}
+ expected[r1].TaskSpecComments = map[string][]*types.TaskSpecComment{}
assert.NoError(t, db.DeleteTaskSpecComment(sc1))
- expected["r1"].CommitComments = map[string][]*types.CommitComment{}
+ expected[r1].CommitComments = map[string][]*types.CommitComment{}
assert.NoError(t, db.DeleteCommitComment(cc1))
assert.Equal(t, 3, callCount)
@@ -150,30 +152,30 @@
assert.NoError(t, db.DeleteCommitComment(types.MakeCommitComment(99, 99, 1, now)))
{
- actual, err := db.GetCommentsForRepos([]string{"r0", "r1", "r2"}, now.Add(-10000*time.Hour))
+ actual, err := db.GetCommentsForRepos([]string{"r0", r1, r2}, now.Add(-10000*time.Hour))
assert.NoError(t, err)
expectedSlice := []*types.RepoComments{
{Repo: "r0"},
- expected["r1"],
- expected["r2"],
+ expected[r1],
+ expected[r2],
}
deepequal.AssertDeepEqual(t, expectedSlice, actual)
}
// Reload DB from persistent again.
init = map[string]*types.RepoComments{
- "r1": expected["r1"].Copy(),
- "r2": expected["r2"].Copy(),
+ r1: expected[r1].Copy(),
+ r2: expected[r2].Copy(),
}
db = NewCommentBoxWithPersistence(init, testWriter)
{
- actual, err := db.GetCommentsForRepos([]string{"r0", "r1", "r2"}, now.Add(-10000*time.Hour))
+ actual, err := db.GetCommentsForRepos([]string{"r0", r1, r2}, now.Add(-10000*time.Hour))
assert.NoError(t, err)
expectedSlice := []*types.RepoComments{
{Repo: "r0"},
- expected["r1"],
- expected["r2"],
+ expected[r1],
+ expected[r2],
}
deepequal.AssertDeepEqual(t, expectedSlice, actual)
}
@@ -205,6 +207,8 @@
for _, c := range []*types.TaskComment{tc1, tc2, tc3, tc4, tc5, tc6} {
assert.NoError(t, db.PutTaskComment(c))
}
+ r1 := tc1.Repo
+ r2 := tc6.Repo
sc1 := types.MakeTaskSpecComment(1, 1, 1, now)
assert.NoError(t, db.PutTaskSpecComment(sc1))
@@ -214,7 +218,7 @@
expected := []*types.RepoComments{
{
- Repo: "r1",
+ Repo: r1,
TaskComments: map[string]map[string][]*types.TaskComment{
"c1": {
"n1": {tc1, tc3, tc2},
@@ -232,7 +236,7 @@
},
},
{
- Repo: "r2",
+ Repo: r2,
TaskComments: map[string]map[string][]*types.TaskComment{
"c3": {
"n3": {tc6},
@@ -244,7 +248,7 @@
}
{
- actual, err := db.GetCommentsForRepos([]string{"r1", "r2"}, now.Add(-10000*time.Hour))
+ actual, err := db.GetCommentsForRepos([]string{r1, r2}, now.Add(-10000*time.Hour))
assert.NoError(t, err)
deepequal.AssertDeepEqual(t, expected, actual)
}
@@ -268,7 +272,7 @@
// Assert nothing has changed.
{
- actual, err := db.GetCommentsForRepos([]string{"r1", "r2"}, now.Add(-10000*time.Hour))
+ actual, err := db.GetCommentsForRepos([]string{r1, r2}, now.Add(-10000*time.Hour))
assert.NoError(t, err)
deepequal.AssertDeepEqual(t, expected, actual)
}
@@ -279,7 +283,7 @@
// Assert nothing has changed.
{
- actual, err := db.GetCommentsForRepos([]string{"r1", "r2"}, now.Add(-10000*time.Hour))
+ actual, err := db.GetCommentsForRepos([]string{r1, r2}, now.Add(-10000*time.Hour))
assert.NoError(t, err)
deepequal.AssertDeepEqual(t, expected, actual)
}
diff --git a/task_scheduler/go/db/firestore/comments.go b/task_scheduler/go/db/firestore/comments.go
index c1f91ea..4fc0234 100644
--- a/task_scheduler/go/db/firestore/comments.go
+++ b/task_scheduler/go/db/firestore/comments.go
@@ -2,12 +2,13 @@
import (
"fmt"
+ "net/url"
"time"
fs "cloud.google.com/go/firestore"
"go.skia.org/infra/go/firestore"
+ "go.skia.org/infra/go/util"
"go.skia.org/infra/task_scheduler/go/db"
- "go.skia.org/infra/task_scheduler/go/db/local_db"
"go.skia.org/infra/task_scheduler/go/types"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
@@ -111,7 +112,7 @@
// taskCommentId returns an ID for the TaskComment.
func taskCommentId(c *types.TaskComment) string {
- return fmt.Sprintf("%s#%s#%s#%s", c.Repo, c.Revision, c.Name, fixTimestamp(c.Timestamp).Format(local_db.TIMESTAMP_FORMAT))
+ return fmt.Sprintf("%s#%s#%s#%s", url.QueryEscape(c.Repo), c.Revision, c.Name, fixTimestamp(c.Timestamp).Format(util.SAFE_TIMESTAMP_FORMAT))
}
// See documentation for db.CommentDB interface.
@@ -134,7 +135,7 @@
// taskSpecCommentId returns an ID for the TaskSpecComment.
func taskSpecCommentId(c *types.TaskSpecComment) string {
- return fmt.Sprintf("%s#%s#%s", c.Repo, c.Name, c.Timestamp.Format(local_db.TIMESTAMP_FORMAT))
+ return fmt.Sprintf("%s#%s#%s", url.QueryEscape(c.Repo), c.Name, c.Timestamp.Format(util.SAFE_TIMESTAMP_FORMAT))
}
// See documentation for db.CommentDB interface.
@@ -157,7 +158,7 @@
// commitCommentId returns an ID for the CommitComment.
func commitCommentId(c *types.CommitComment) string {
- return fmt.Sprintf("%s#%s#%s", c.Repo, c.Revision, c.Timestamp.Format(local_db.TIMESTAMP_FORMAT))
+ return fmt.Sprintf("%s#%s#%s", url.QueryEscape(c.Repo), c.Revision, c.Timestamp.Format(util.SAFE_TIMESTAMP_FORMAT))
}
// See documentation for db.CommentDB interface.
diff --git a/task_scheduler/go/db/testutil.go b/task_scheduler/go/db/testutil.go
index 371a76e..e80d30b 100644
--- a/task_scheduler/go/db/testutil.go
+++ b/task_scheduler/go/db/testutil.go
@@ -1177,13 +1177,16 @@
now := time.Now()
// Empty db.
+ r0 := fmt.Sprintf("%s%d", common.REPO_SKIA, 0)
+ r1 := fmt.Sprintf("%s%d", common.REPO_SKIA, 1)
+ r2 := fmt.Sprintf("%s%d", common.REPO_SKIA, 2)
{
- actual, err := db.GetCommentsForRepos([]string{"r0", "r1", "r2"}, now.Add(-10000*time.Hour))
+ actual, err := db.GetCommentsForRepos([]string{r0, r1, r2}, now.Add(-10000*time.Hour))
assert.NoError(t, err)
assert.Equal(t, 3, len(actual))
- assert.Equal(t, "r0", actual[0].Repo)
- assert.Equal(t, "r1", actual[1].Repo)
- assert.Equal(t, "r2", actual[2].Repo)
+ assert.Equal(t, r0, actual[0].Repo)
+ assert.Equal(t, r1, actual[1].Repo)
+ assert.Equal(t, r2, actual[2].Repo)
for _, rc := range actual {
assert.Equal(t, 0, len(rc.TaskComments))
assert.Equal(t, 0, len(rc.TaskSpecComments))
@@ -1238,9 +1241,9 @@
assert.True(t, IsAlreadyExists(db.PutCommitComment(cc1different)))
expected := []*types.RepoComments{
- {Repo: "r0"},
+ {Repo: r0},
{
- Repo: "r1",
+ Repo: r1,
TaskComments: map[string]map[string][]*types.TaskComment{
"c1": {
"n1": {tc1, tc3, tc2},
@@ -1260,7 +1263,7 @@
},
},
{
- Repo: "r2",
+ Repo: r2,
TaskComments: map[string]map[string][]*types.TaskComment{
"c3": {
"n3": {tc6copy},
@@ -1275,14 +1278,14 @@
},
}
{
- actual, err := db.GetCommentsForRepos([]string{"r0", "r1", "r2"}, now.Add(-10000*time.Hour))
+ actual, err := db.GetCommentsForRepos([]string{r0, r1, r2}, now.Add(-10000*time.Hour))
assert.NoError(t, err)
AssertDeepEqual(t, expected, actual)
}
// Specifying a cutoff time shouldn't drop required comments.
{
- actual, err := db.GetCommentsForRepos([]string{"r1"}, now.Add(time.Second))
+ actual, err := db.GetCommentsForRepos([]string{r1}, now.Add(time.Second))
assert.NoError(t, err)
assert.Equal(t, 1, len(actual))
{
@@ -1341,7 +1344,7 @@
expected[1].TaskSpecComments["n1"] = []*types.TaskSpecComment{sc2}
expected[1].CommitComments["c1"] = []*types.CommitComment{cc2}
{
- actual, err := db.GetCommentsForRepos([]string{"r0", "r1", "r2"}, now.Add(-10000*time.Hour))
+ actual, err := db.GetCommentsForRepos([]string{r0, r1, r2}, now.Add(-10000*time.Hour))
assert.NoError(t, err)
AssertDeepEqual(t, expected, actual)
}
@@ -1357,12 +1360,12 @@
assert.NoError(t, db.DeleteCommitComment(c))
}
{
- actual, err := db.GetCommentsForRepos([]string{"r0", "r1", "r2"}, now.Add(-10000*time.Hour))
+ actual, err := db.GetCommentsForRepos([]string{r0, r1, r2}, now.Add(-10000*time.Hour))
assert.NoError(t, err)
assert.Equal(t, 3, len(actual))
- assert.Equal(t, "r0", actual[0].Repo)
- assert.Equal(t, "r1", actual[1].Repo)
- assert.Equal(t, "r2", actual[2].Repo)
+ assert.Equal(t, r0, actual[0].Repo)
+ assert.Equal(t, r1, actual[1].Repo)
+ assert.Equal(t, r2, actual[2].Repo)
for _, rc := range actual {
assert.Equal(t, 0, len(rc.TaskComments))
assert.Equal(t, 0, len(rc.TaskSpecComments))
diff --git a/task_scheduler/go/task_scheduler/main.go b/task_scheduler/go/task_scheduler/main.go
index 37640e1..dce39f1 100644
--- a/task_scheduler/go/task_scheduler/main.go
+++ b/task_scheduler/go/task_scheduler/main.go
@@ -14,6 +14,7 @@
"runtime"
"time"
+ "cloud.google.com/go/datastore"
"github.com/gorilla/mux"
"go.skia.org/infra/go/allowed"
"go.skia.org/infra/go/auth"
@@ -622,7 +623,7 @@
// Authenticated HTTP client.
oauthCacheFile := path.Join(wdAbs, "google_storage_token.data")
- tokenSource, err := auth.NewLegacyTokenSource(*local, oauthCacheFile, "", auth.SCOPE_READ_WRITE, pubsub.AUTH_SCOPE)
+ tokenSource, err := auth.NewLegacyTokenSource(*local, oauthCacheFile, "", auth.SCOPE_READ_WRITE, pubsub.AUTH_SCOPE, datastore.ScopeDatastore)
if err != nil {
sklog.Fatal(err)
}
diff --git a/task_scheduler/go/types/testutil.go b/task_scheduler/go/types/testutil.go
index e1f5f2d..f9d2cdc 100644
--- a/task_scheduler/go/types/testutil.go
+++ b/task_scheduler/go/types/testutil.go
@@ -3,6 +3,8 @@
import (
"fmt"
"time"
+
+ "go.skia.org/infra/go/common"
)
const DEFAULT_TEST_REPO = "go-on-now.git"
@@ -65,7 +67,7 @@
// name, commit, and ts, and other fields based on n.
func MakeTaskComment(n int, repo int, name int, commit int, ts time.Time) *TaskComment {
return &TaskComment{
- Repo: fmt.Sprintf("r%d", repo),
+ Repo: fmt.Sprintf("%s%d", common.REPO_SKIA, repo),
Revision: fmt.Sprintf("c%d", commit),
Name: fmt.Sprintf("n%d", name),
Timestamp: ts,
@@ -79,7 +81,7 @@
// repo, name, and ts, and other fields based on n.
func MakeTaskSpecComment(n int, repo int, name int, ts time.Time) *TaskSpecComment {
return &TaskSpecComment{
- Repo: fmt.Sprintf("r%d", repo),
+ Repo: fmt.Sprintf("%s%d", common.REPO_SKIA, repo),
Name: fmt.Sprintf("n%d", name),
Timestamp: ts,
User: fmt.Sprintf("u%d", n),
@@ -93,7 +95,7 @@
// repo, commit, and ts, and other fields based on n.
func MakeCommitComment(n int, repo int, commit int, ts time.Time) *CommitComment {
return &CommitComment{
- Repo: fmt.Sprintf("r%d", repo),
+ Repo: fmt.Sprintf("%s%d", common.REPO_SKIA, repo),
Revision: fmt.Sprintf("c%d", commit),
Timestamp: ts,
User: fmt.Sprintf("u%d", n),
diff --git a/task_scheduler/vm.go b/task_scheduler/vm.go
index 9820189..6211a6e 100644
--- a/task_scheduler/vm.go
+++ b/task_scheduler/vm.go
@@ -4,6 +4,7 @@
"path"
"runtime"
+ "cloud.google.com/go/datastore"
"go.skia.org/infra/go/auth"
"go.skia.org/infra/go/gce"
"go.skia.org/infra/go/gce/server"
@@ -18,6 +19,7 @@
vm.Metadata["owner_secondary"] = "benjaminwagner"
vm.Scopes = append(vm.Scopes,
auth.SCOPE_GERRIT,
+ datastore.ScopeDatastore,
)
_, filename, _, _ := runtime.Caller(0)