blob: 7c3651eb111c1d29f7928ff9a2f018e50516c307 [file] [log] [blame]
package main
import (
"context"
"flag"
"time"
"go.skia.org/infra/go/auth"
"go.skia.org/infra/go/common"
fs "go.skia.org/infra/go/firestore"
"go.skia.org/infra/go/sklog"
"go.skia.org/infra/go/util"
"go.skia.org/infra/task_scheduler/go/db"
"go.skia.org/infra/task_scheduler/go/db/firestore"
"go.skia.org/infra/task_scheduler/go/db/local_db"
)
const (
TIME_CHUNK = 24 * time.Hour
)
var (
local = flag.Bool("local", false, "True if running locally.")
boltDB = flag.String("bolt_db", "", "Bolt DB to migrate from.")
fsInstance = flag.String("firestore_instance", "", "Firestore instance to migrate to.")
repoUrls = common.NewMultiStringFlag("repo", nil, "Repositories for which to schedule tasks.")
BEGINNING_OF_TIME = time.Date(2016, time.September, 1, 0, 0, 0, 0, time.UTC)
NOW = time.Now()
)
func migrateTasks(oldDB, newDB db.DB) error {
return util.IterTimeChunks(BEGINNING_OF_TIME, NOW, TIME_CHUNK, func(start, end time.Time) error {
sklog.Infof("Migrating tasks in %s - %s", start, end)
tasks, err := oldDB.GetTasksFromDateRange(start, end, "")
if err != nil {
return err
}
for _, t := range tasks {
t.DbModified = time.Time{}
}
return util.ChunkIter(len(tasks), fs.MAX_TRANSACTION_DOCS, func(start, end int) error {
return newDB.PutTasks(tasks[start:end])
})
})
}
func migrateJobs(oldDB, newDB db.DB) error {
return util.IterTimeChunks(BEGINNING_OF_TIME, NOW, TIME_CHUNK, func(start, end time.Time) error {
sklog.Infof("Migrating jobs in %s - %s", start, end)
jobs, err := oldDB.GetJobsFromDateRange(start, end)
if err != nil {
return err
}
for _, j := range jobs {
j.DbModified = time.Time{}
}
return util.ChunkIter(len(jobs), fs.MAX_TRANSACTION_DOCS, func(start, end int) error {
return newDB.PutJobs(jobs[start:end])
})
})
}
func migrateComments(oldDB, newDB db.DB) error {
comments, err := oldDB.GetCommentsForRepos(*repoUrls, BEGINNING_OF_TIME)
if err != nil {
return err
}
for _, c := range comments {
for _, commitComments := range c.CommitComments {
for _, cc := range commitComments {
if err := newDB.PutCommitComment(cc); err != nil {
return err
}
}
}
for _, taskCommentsByCommit := range c.TaskComments {
for _, taskCommentsByName := range taskCommentsByCommit {
for _, tc := range taskCommentsByName {
if err := newDB.PutTaskComment(tc); err != nil {
return err
}
}
}
}
for _, taskSpecComments := range c.TaskSpecComments {
for _, tsc := range taskSpecComments {
if err := newDB.PutTaskSpecComment(tsc); err != nil {
return err
}
}
}
}
return nil
}
func migrate(oldDB, newDB db.DB) error {
if err := migrateTasks(oldDB, newDB); err != nil {
return err
}
if err := migrateJobs(oldDB, newDB); err != nil {
return err
}
if err := migrateComments(oldDB, newDB); err != nil {
return err
}
return nil
}
func main() {
common.Init()
if *boltDB == "" {
sklog.Fatal("--bolt_db is required.")
}
if *fsInstance == "" {
sklog.Fatal("--firestore_instance is required.")
}
if *repoUrls == nil {
*repoUrls = common.PUBLIC_REPOS
}
ctx := context.Background()
oldDB, err := local_db.NewDB(local_db.DB_NAME, *boltDB, nil)
if err != nil {
sklog.Fatal(err)
}
defer util.Close(oldDB)
ts, err := auth.NewDefaultTokenSource(*local)
if err != nil {
sklog.Fatal(err)
}
newDB, err := firestore.NewDB(ctx, firestore.FIRESTORE_PROJECT, *fsInstance, ts, nil)
if err != nil {
sklog.Fatal(err)
}
defer util.Close(newDB)
if err := migrate(oldDB, newDB); err != nil {
sklog.Fatal(err)
}
}