blob: b81ebd591bd556f02433310b213d4a99451c73a1 [file] [log] [blame]
// The fsmigrator executable migrates various data from firestore to an SQL database.
// It uses port forwarding, as that is the simplest approach and there shouldn't be
// too much data.
package main
import (
"context"
"flag"
"sync"
"time"
"github.com/jackc/pgx/v4/pgxpool"
ifirestore "go.skia.org/infra/go/firestore"
"go.skia.org/infra/go/skerr"
"go.skia.org/infra/go/sklog"
ci "go.skia.org/infra/golden/go/continuous_integration"
"go.skia.org/infra/golden/go/sql"
"go.skia.org/infra/golden/go/tjstore"
"go.skia.org/infra/golden/go/tjstore/fs_tjstore"
"go.skia.org/infra/golden/go/tjstore/sqltjstore"
)
func main() {
var (
fsProjectID = flag.String("fs_project_id", "skia-firestore", "The project with the firestore instance. Datastore and Firestore can't be in the same project.")
oldFSNamespace = flag.String("old_fs_namespace", "", "Typically the instance id. e.g. 'chrome-gpu', 'skia', etc")
newSQLDatabase = flag.String("new_sql_db", "", "Something like the instance id (no dashes)")
)
flag.Parse()
if *oldFSNamespace == "" {
sklog.Fatalf("You must include fs_namespace")
}
if *newSQLDatabase == "" {
sklog.Fatalf("You must include new_sql_db")
}
ctx := context.Background()
fsClient, err := ifirestore.NewClient(ctx, *fsProjectID, "gold", *oldFSNamespace, nil)
if err != nil {
sklog.Fatalf("Unable to configure Firestore: %s", err)
}
oldStore := fs_tjstore.New(fsClient)
u := sql.GetConnectionURL("root@localhost:26234", *newSQLDatabase)
conf, err := pgxpool.ParseConfig(u)
if err != nil {
sklog.Fatalf("error getting postgres config %s: %s", u, err)
}
conf.MaxConns = 16
db, err := pgxpool.ConnectConfig(ctx, conf)
if err != nil {
sklog.Info("You must run\nkubectl port-forward gold-cockroachdb-0 26234:26234")
sklog.Fatalf("error connecting to the database: %s", err)
}
// We've already migrated all the CL and PS data, so we can query the SQL for that instead of
// the Firestore. We picked the first of the year as the cutoff date because it would take too
// long to migrate all the data, and this seemed like a good idea.
rows, err := db.Query(ctx, `
SELECT Patchsets.changelist_id, Patchsets.system, Patchsets.patchset_id, Changelists.last_ingested_data
FROM Changelists JOIN Patchsets ON
Changelists.changelist_id = Patchsets.changelist_id
WHERE last_ingested_data > '2021-01-01'
ORDER BY 1, 2;
`)
if err != nil {
sklog.Fatalf("getting newish CLs from DB %s", err)
}
defer rows.Close()
var toMigrate []tjstore.CombinedPSID
var lastUpdatedTimestamps []time.Time
for rows.Next() {
var cid tjstore.CombinedPSID
var qCLID string
var qPSID string
var ts time.Time
if err := rows.Scan(&qCLID, &cid.CRS, &qPSID, &ts); err != nil {
sklog.Fatalf("Invalid row: %s")
}
cid.CL = sql.Unqualify(qCLID)
cid.PS = sql.Unqualify(qPSID)
toMigrate = append(toMigrate, cid)
lastUpdatedTimestamps = append(lastUpdatedTimestamps, ts.UTC())
}
rows.Close()
sklog.Infof("There are %d PS with Tryjob data that needs migrating", len(toMigrate))
if len(toMigrate) == 0 || len(lastUpdatedTimestamps) == 0 {
sklog.Infof("Trivially done")
return
}
sqlStore := sqltjstore.New(db)
workChan := make(chan int, len(toMigrate))
for i := range toMigrate {
workChan <- i
}
close(workChan)
wg := sync.WaitGroup{}
const numChunks = 4
wg.Add(4)
for i := 0; i < numChunks; i++ {
go func() {
defer wg.Done()
for idx := range workChan {
if err := ctx.Err(); err != nil {
sklog.Errorf("Context error: %s", err)
return
}
cid := toMigrate[idx]
ts := lastUpdatedTimestamps[idx]
sklog.Infof("%v", cid)
tryjobs, err := oldStore.GetTryJobs(ctx, cid)
if err != nil {
sklog.Fatalf("Error fetching tryjobs from FS for %v: %s", cid, err)
}
data, err := oldStore.GetResults(ctx, cid, time.Time{})
if err != nil {
sklog.Fatalf("Error fetching data from FS for %v: %s", cid, err)
}
err = storeTryJobs(ctx, db, cid, tryjobs)
if err != nil {
sklog.Fatalf("Error writing tryjobs (len %d) to SQL for %v: %s", len(tryjobs), cid, err)
}
err = sqlStore.PutResults(ctx, cid, "unknown (migrated from FS)", data, ts)
if err != nil {
sklog.Fatalf("Error writing data (len %d) to SQL for %v: %s", len(data), cid, err)
}
}
}()
}
wg.Wait()
sklog.Infof("Done")
}
func storeTryJobs(ctx context.Context, db *pgxpool.Pool, cID tjstore.CombinedPSID, xtj []ci.TryJob) error {
const statement = `
UPSERT INTO Tryjobs (tryjob_id, system, changelist_id, patchset_id, display_name, last_ingested_data)
VALUES `
const valuesPerRow = 6
placeholders := sql.ValuesPlaceholders(valuesPerRow, len(xtj))
arguments := make([]interface{}, 0, valuesPerRow*len(xtj))
clID := sql.Qualify(cID.CRS, cID.CL)
psID := sql.Qualify(cID.CRS, cID.PS)
for _, tj := range xtj {
tjID := sql.Qualify(tj.System, tj.SystemID)
arguments = append(arguments, tjID, tj.System, clID, psID, tj.DisplayName, tj.Updated)
}
_, err := db.Exec(ctx, statement+placeholders, arguments...)
if err != nil {
return skerr.Wrapf(err, "Storing %d Tryjobs", len(xtj))
}
return nil
}