// The fsmigrator executable migrates various data from Firestore to an SQL database.
// It uses port forwarding, as that is the simplest approach and there shouldn't be
// too much data.
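//
// Example invocation (flag values are illustrative), after port-forwarding the database:
//
//	kubectl port-forward gold-cockroachdb-0 26234:26234
//	fsmigrator --old_fs_namespace=skia --new_sql_db=skia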
package main
import (
	"context"
	"crypto/md5"
	"flag"
	"strings"
	"sync/atomic"
	"time"

	"cloud.google.com/go/firestore"
	"github.com/cockroachdb/cockroach-go/v2/crdb/crdbpgx"
	"github.com/google/uuid"
	"github.com/jackc/pgx/v4"
	"github.com/jackc/pgx/v4/pgxpool"
	"golang.org/x/sync/errgroup"
	"google.golang.org/api/iterator"

	ifirestore "go.skia.org/infra/go/firestore"
	"go.skia.org/infra/go/skerr"
	"go.skia.org/infra/go/sklog"
	"go.skia.org/infra/go/util"
	"go.skia.org/infra/golden/go/expectations"
	"go.skia.org/infra/golden/go/fs_utils"
	"go.skia.org/infra/golden/go/sql"
	"go.skia.org/infra/golden/go/sql/schema"
	"go.skia.org/infra/golden/go/types"
)
func main() {
var (
		fsProjectID    = flag.String("fs_project_id", "skia-firestore", "The project with the Firestore instance. Datastore and Firestore can't be in the same project.")
		oldFSNamespace = flag.String("old_fs_namespace", "", "Typically the instance id, e.g. 'chrome-gpu', 'skia', etc.")
newSQLDatabase = flag.String("new_sql_db", "", "Something like the instance id (no dashes)")
)
flag.Parse()
if *oldFSNamespace == "" {
sklog.Fatalf("You must include fs_namespace")
}
if *newSQLDatabase == "" {
sklog.Fatalf("You must include new_sql_db")
}
ctx := context.Background()
fsClient, err := ifirestore.NewClient(ctx, *fsProjectID, "gold", *oldFSNamespace, nil)
if err != nil {
sklog.Fatalf("Unable to configure Firestore: %s", err)
}
u := sql.GetConnectionURL("root@localhost:26234", *newSQLDatabase)
conf, err := pgxpool.ParseConfig(u)
if err != nil {
sklog.Fatalf("error getting postgres config %s: %s", u, err)
}
conf.MaxConns = 16
db, err := pgxpool.ConnectConfig(ctx, conf)
if err != nil {
sklog.Info("You must run\nkubectl port-forward gold-cockroachdb-0 26234:26234")
sklog.Fatalf("error connecting to the database: %s", err)
}
oldNamespace := v3Impl{client: fsClient}
	// We need to look up the groupings by test name, because the old groupings were just the
	// test name, not combined with the corpus. As such, we fetch all the groupings from the
	// SQL database, which has been populated via ingestion.
nameToGroupings, err := fetchGroupings(ctx, db)
if err != nil {
sklog.Fatalf("Getting groupings from SQL: %s", err)
}
sklog.Infof("Fetched %d groupings from SQL DB", len(nameToGroupings))
records, err := oldNamespace.fetchTriageRecords(ctx)
if err != nil {
sklog.Fatalf("Fetching triage records: %s", err)
}
sklog.Infof("Should migrate %d records", len(records))
if err := storeAndCombineTriageRecords(ctx, db, records); err != nil {
sklog.Fatalf("storing triage records %s", err)
}
sklog.Infof("Fetching deltas")
deltas, err := oldNamespace.fetchExpectationDeltas(ctx)
if err != nil {
sklog.Fatalf("Fetching expectation deltas: %s", err)
}
if err := storeExpectationDeltas(ctx, db, deltas, nameToGroupings); err != nil {
sklog.Fatalf("storing expectation deltas: %s", err)
}
sklog.Infof("Fetching expectations")
exp, err := oldNamespace.fetchExpectations(ctx)
if err != nil {
sklog.Fatalf("fetching expectations %s", err)
}
	// Pass in the deltas so we can link each expectation back to its triage record.
unowned, err := storeExpectations(ctx, db, exp, nameToGroupings, deltas)
if err != nil {
sklog.Fatalf("storing expectation deltas: %s", err)
}
// Write the catchall expectation record. If somehow an expectation exists, but wasn't covered
// by a delta (e.g. broken old data), we assign it to this catchall record.
_, err = db.Exec(ctx, `UPSERT INTO ExpectationRecords
(expectation_record_id, user_name, triage_time, num_changes) VALUES ($1, $2, $3, $4)`,
catchAllUUID, "sql_migrator", time.Now(), unowned)
if err != nil {
sklog.Fatalf("creating catch-all record: %s", err)
}
sklog.Info("done")
}
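// fetchGroupings returns the ids of all groupings already ingested into the SQL database,
// keyed by test name. A test name maps to more than one grouping if it appears in multiple
// corpora.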
func fetchGroupings(ctx context.Context, db *pgxpool.Pool) (map[types.TestName][]schema.GroupingID, error) {
rv := map[types.TestName][]schema.GroupingID{}
rows, err := db.Query(ctx, `SELECT keys -> 'name', grouping_id FROM Groupings`)
if err != nil {
return nil, skerr.Wrap(err)
}
defer rows.Close()
for rows.Next() {
var name types.TestName
var gID schema.GroupingID
if err := rows.Scan(&name, &gID); err != nil {
return nil, skerr.Wrap(err)
}
rv[name] = append(rv[name], gID)
}
count := 0
for _, groupings := range rv {
if len(groupings) > 1 {
count++
}
}
sklog.Infof("%d out of %d test names belonged to multiple corpora", count, len(rv))
return rv, nil
}
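// storeAndCombineTriageRecords writes the given triage records (keyed by partition) to the
// ExpectationRecords table in batches, one goroutine per partition. Records from the primary
// partition are stored with a null branch_name.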
func storeAndCombineTriageRecords(ctx context.Context, db *pgxpool.Pool, toStore map[string][]v3TriageRecord) error {
sklog.Infof("have %d partitions", len(toStore))
const batchSize = 1000
eg, ctx := errgroup.WithContext(ctx)
for b, r := range toStore {
branchName, records := b, r
sklog.Infof("Writing records from partition %s", branchName)
eg.Go(func() error {
return util.ChunkIter(len(records), batchSize, func(startIdx int, endIdx int) error {
if err := ctx.Err(); err != nil {
return skerr.Wrap(err)
}
batch := records[startIdx:endIdx]
statement := `INSERT INTO ExpectationRecords
(expectation_record_id, user_name, triage_time, num_changes, branch_name) VALUES `
const valuesPerRow = 5
arguments := make([]interface{}, 0, valuesPerRow*len(batch))
for _, record := range batch {
					// We can deterministically turn the old IDs into UUIDs by hashing their
					// bytes. This avoids having to track a mapping from old IDs to freshly
					// generated random UUIDs.
newID := uuid.Must(uuid.FromBytes(hash(record.ID)))
arguments = append(arguments, newID, record.UserName, record.TS, record.Changes)
if branchName == v3PrimaryPartition {
arguments = append(arguments, nil)
} else {
arguments = append(arguments, branchName)
}
}
statement += sql.ValuesPlaceholders(valuesPerRow, len(batch))
				statement += ` ON CONFLICT DO NOTHING`
err := crdbpgx.ExecuteTx(ctx, db, pgx.TxOptions{}, func(tx pgx.Tx) error {
_, err := tx.Exec(ctx, statement, arguments...)
return err // don't wrap - might get retried
})
return skerr.Wrap(err)
})
})
}
if err := eg.Wait(); err != nil {
return skerr.Wrapf(err, "writing records to SQL")
}
return nil
}
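// storeExpectationDeltas writes the given expectation changes (keyed by partition) to the
// ExpectationDeltas table in batches, one goroutine per partition. Deltas with an unknown
// grouping or a corrupt digest are skipped; a delta whose test name maps to multiple groupings
// is written once per grouping.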
func storeExpectationDeltas(ctx context.Context, db *pgxpool.Pool, toStore map[string][]v3ExpectationChange, nameToGroupings map[types.TestName][]schema.GroupingID) error {
sklog.Infof("have %d partitions", len(toStore))
const batchSize = 1000
eg, ctx := errgroup.WithContext(ctx)
for b, d := range toStore {
branchName, deltas := b, d
sklog.Infof("Writing deltas from partition %s", branchName)
eg.Go(func() error {
return util.ChunkIter(len(deltas), batchSize, func(startIdx int, endIdx int) error {
if err := ctx.Err(); err != nil {
return skerr.Wrap(err)
}
batch := deltas[startIdx:endIdx]
statement := `INSERT INTO ExpectationDeltas
(expectation_record_id, grouping_id, digest, label_before, label_after) VALUES `
const valuesPerRow = 5
arguments := make([]interface{}, 0, valuesPerRow*len(batch))
for _, delta := range batch {
newID := uuid.Must(uuid.FromBytes(hash(delta.RecordID)))
groupings, ok := nameToGroupings[delta.Grouping]
if !ok {
sklog.Warningf("Unknown grouping for name %s on branch %s", delta.Grouping, branchName)
continue
}
dBytes, err := sql.DigestToBytes(delta.Digest)
if err != nil {
sklog.Warningf("Corrupt digest %q on branch %s", delta.Digest, branchName)
continue
}
					// Some test names exist in multiple corpora, so we need to store the delta
					// once for each matching grouping.
for _, gID := range groupings {
arguments = append(arguments,
newID, gID, dBytes, convertLabel(delta.LabelBefore), convertLabel(delta.AffectedRange.Label))
}
}
if len(arguments) == 0 {
return nil
}
// Need to divide here to account for skipped rows.
statement += sql.ValuesPlaceholders(valuesPerRow, len(arguments)/valuesPerRow)
				statement += ` ON CONFLICT DO NOTHING`
err := crdbpgx.ExecuteTx(ctx, db, pgx.TxOptions{}, func(tx pgx.Tx) error {
_, err := tx.Exec(ctx, statement, arguments...)
return err // don't wrap - might get retried
})
return skerr.Wrap(err)
})
})
}
if err := eg.Wait(); err != nil {
return skerr.Wrapf(err, "writing expectations to SQL")
}
return nil
}
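// catchAllUUID is assigned to any expectation whose originating triage record cannot be found.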
var (
catchAllUUID = uuid.MustParse("00000000-0000-0000-0000-000000000000")
)
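// storeExpectations writes the given expectation entries (keyed by partition) to the SQL
// database, one goroutine per partition. It returns the number of expectations that could not
// be linked to a triage record and were thus assigned to the catch-all record.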
func storeExpectations(ctx context.Context, db *pgxpool.Pool, toStore map[string][]v3ExpectationEntry, nameToGroupings map[types.TestName][]schema.GroupingID, deltas map[string][]v3ExpectationChange) (int, error) {
sklog.Infof("have %d partitions", len(toStore))
extraExpectations := int32(0)
eg, ctx := errgroup.WithContext(ctx)
eg.Go(func() error {
extra, err := storePrimaryBranchExpectations(ctx, db, toStore[v3PrimaryPartition], nameToGroupings, deltas[v3PrimaryPartition])
atomic.AddInt32(&extraExpectations, int32(extra))
return skerr.Wrap(err)
})
for b, e := range toStore {
branchName, exps := b, e
if branchName == v3PrimaryPartition {
continue
}
sklog.Infof("Writing expectations from partition %s", branchName)
eg.Go(func() error {
extra, err := storeSecondaryBranchExpectations(ctx, db, branchName, exps, nameToGroupings, deltas[branchName])
atomic.AddInt32(&extraExpectations, int32(extra))
return skerr.Wrap(err)
})
}
if err := eg.Wait(); err != nil {
		return 0, skerr.Wrapf(err, "writing expectations to SQL")
}
return int(extraExpectations), nil
}
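// storePrimaryBranchExpectations upserts the given entries into the Expectations table in
// batches. It returns how many entries had no matching delta (and thus point at the catch-all
// record).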
func storePrimaryBranchExpectations(ctx context.Context, db *pgxpool.Pool, exps []v3ExpectationEntry, nameToGroupings map[types.TestName][]schema.GroupingID, deltas []v3ExpectationChange) (int, error) {
const batchSize = 1000
extra := 0
	// The callback below mutates extra, so run ChunkIter to completion before returning it.
	err := util.ChunkIter(len(exps), batchSize, func(startIdx int, endIdx int) error {
if err := ctx.Err(); err != nil {
return skerr.Wrap(err)
}
batch := exps[startIdx:endIdx]
statement := `UPSERT INTO Expectations
(grouping_id, digest, label, expectation_record_id) VALUES `
const valuesPerRow = 4
arguments := make([]interface{}, 0, valuesPerRow*len(batch))
for _, exp := range batch {
groupings, ok := nameToGroupings[exp.Grouping]
if !ok {
continue
}
dBytes, err := sql.DigestToBytes(exp.Digest)
if err != nil {
sklog.Warningf("Corrupt digest %q on branch %s", exp.Digest, v3PrimaryPartition)
continue
}
label := exp.Ranges[0].Label
recordID, ok := find(exp.Grouping, exp.Digest, label, deltas)
			// Some test names correspond to more than one grouping. We need to write the
			// expectation for each of them.
for _, gID := range groupings {
if !ok {
extra++
}
arguments = append(arguments, gID, dBytes, convertLabel(label), recordID)
}
}
if len(arguments) == 0 {
return nil
}
// Need to divide here to account for skipped rows.
statement += sql.ValuesPlaceholders(valuesPerRow, len(arguments)/valuesPerRow)
err := crdbpgx.ExecuteTx(ctx, db, pgx.TxOptions{}, func(tx pgx.Tx) error {
_, err := tx.Exec(ctx, statement, arguments...)
return err // don't wrap - might get retried
})
return skerr.Wrap(err)
	})
	return extra, err
}
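// storeSecondaryBranchExpectations upserts the given entries into the
// SecondaryBranchExpectations table in batches. It returns how many entries had no matching
// delta (and thus point at the catch-all record).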
func storeSecondaryBranchExpectations(ctx context.Context, db *pgxpool.Pool, branchName string, exps []v3ExpectationEntry, nameToGroupings map[types.TestName][]schema.GroupingID, deltas []v3ExpectationChange) (int, error) {
const batchSize = 1000
extra := 0
	// As in storePrimaryBranchExpectations, run ChunkIter to completion before returning extra.
	err := util.ChunkIter(len(exps), batchSize, func(startIdx int, endIdx int) error {
if err := ctx.Err(); err != nil {
return skerr.Wrap(err)
}
batch := exps[startIdx:endIdx]
statement := `UPSERT INTO SecondaryBranchExpectations
(branch_name, grouping_id, digest, label, expectation_record_id) VALUES `
const valuesPerRow = 5
arguments := make([]interface{}, 0, valuesPerRow*len(batch))
for _, exp := range batch {
groupings, ok := nameToGroupings[exp.Grouping]
if !ok {
continue
}
dBytes, err := sql.DigestToBytes(exp.Digest)
if err != nil {
sklog.Warningf("Corrupt digest %q on branch %s", exp.Digest, branchName)
continue
}
label := exp.Ranges[0].Label
recordID, ok := find(exp.Grouping, exp.Digest, label, deltas)
// account for test names that belong to multiple corpora.
for _, gID := range groupings {
if !ok {
extra++
}
arguments = append(arguments, branchName, gID, dBytes, convertLabel(label), recordID)
}
}
if len(arguments) == 0 {
return nil
}
// Need to divide here to account for skipped rows.
statement += sql.ValuesPlaceholders(valuesPerRow, len(arguments)/valuesPerRow)
err := crdbpgx.ExecuteTx(ctx, db, pgx.TxOptions{}, func(tx pgx.Tx) error {
_, err := tx.Exec(ctx, statement, arguments...)
return err // don't wrap - might get retried
})
return skerr.Wrap(err)
	})
	return extra, err
}
// find returns the record id of the delta matching the given grouping, digest, and label,
// along with true. If no delta matches, it returns the catch-all UUID and false.
func find(grouping types.TestName, digest types.Digest, label expectations.LabelInt, deltas []v3ExpectationChange) (uuid.UUID, bool) {
for _, delta := range deltas {
if delta.Grouping == grouping && delta.Digest == digest && delta.AffectedRange.Label == label {
return uuid.Must(uuid.FromBytes(hash(delta.RecordID))), true
}
}
return catchAllUUID, false
}
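// convertLabel maps the old Firestore integer label onto the SQL schema's label, defaulting
// to untriaged for unrecognized values.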
func convertLabel(label expectations.LabelInt) schema.ExpectationLabel {
switch label {
case expectations.UntriagedInt:
return schema.LabelUntriaged
case expectations.PositiveInt:
return schema.LabelPositive
case expectations.NegativeInt:
return schema.LabelNegative
}
return schema.LabelUntriaged
}
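// hash returns the MD5 digest of the given id. It is used to deterministically derive a UUID
// from an old Firestore document id.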
func hash(id string) []byte {
h := md5.Sum([]byte(id))
return h[:]
}
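// Firestore collection, document, and field names used by the v3 expectations store, plus
// limits used when reading from it.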
const (
v3Partitions = "expstore_partitions_v3"
v3ExpectationEntries = "entries"
v3RecordEntries = "triage_records"
v3ChangeEntries = "triage_changes"
v3PrimaryPartition = "master"
v3DigestField = "digest"
maxRetries = 10
maxOperationTime = 10 * time.Minute
)
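// v3Impl reads the v3 format of expectations data out of Firestore.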
type v3Impl struct {
client *ifirestore.Client
}
type v3ExpectationEntry struct {
Grouping types.TestName `firestore:"grouping"`
Digest types.Digest `firestore:"digest"`
Updated time.Time `firestore:"updated"`
LastUsed time.Time `firestore:"last_used"`
Ranges []v3TriageRange `firestore:"ranges"`
NeedsGC bool `firestore:"needs_gc"`
}
func (e *v3ExpectationEntry) id() string {
s := string(e.Grouping) + "|" + string(e.Digest)
	// Firestore gets cranky if there are slashes in key names.
return strings.Replace(s, "/", "-", -1)
}
type v3TriageRange struct {
FirstIndex int `firestore:"first_index"`
LastIndex int `firestore:"last_index"`
Label expectations.LabelInt `firestore:"label"`
}
type v3TriageRecord struct {
ID string
UserName string `firestore:"user"`
TS time.Time `firestore:"ts"`
Changes int `firestore:"changes"`
Committed bool `firestore:"committed"`
}
type v3ExpectationChange struct {
// RecordID refers to a document in the records collection.
RecordID string `firestore:"record_id"`
Grouping types.TestName `firestore:"grouping"`
Digest types.Digest `firestore:"digest"`
AffectedRange v3TriageRange `firestore:"affected_range"`
LabelBefore expectations.LabelInt `firestore:"label_before"`
}
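// fetchTriageRecords returns all triage records in Firestore, keyed by partition. Corrupt
// records are logged and skipped.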
func (v v3Impl) fetchTriageRecords(ctx context.Context) (map[string][]v3TriageRecord, error) {
// maps partition to records
rv := map[string][]v3TriageRecord{}
partitionIterator := v.client.Collection(v3Partitions).DocumentRefs(ctx)
p, err := partitionIterator.Next()
for ; err == nil; p, err = partitionIterator.Next() {
var records []v3TriageRecord
partition := p.ID
sklog.Infof("Partition %s", partition)
recordIterator := v.client.Collection(v3Partitions).Doc(partition).Collection(v3RecordEntries).Documents(ctx)
docs, err := recordIterator.GetAll()
if err != nil {
return nil, skerr.Wrapf(err, "getting records for %s", partition)
}
for _, doc := range docs {
var r v3TriageRecord
if err := doc.DataTo(&r); err != nil {
sklog.Warning("Corrupt triage record with id %s", doc.Ref.ID)
continue
}
r.ID = doc.Ref.ID
records = append(records, r)
}
rv[partition] = records
}
if err != iterator.Done {
return nil, skerr.Wrap(err)
}
return rv, nil
}
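// fetchExpectationDeltas returns all expectation changes in Firestore, keyed by partition.
// Each partition is read in parallel using multiple shards.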
func (v v3Impl) fetchExpectationDeltas(ctx context.Context) (map[string][]v3ExpectationChange, error) {
rv := map[string][]v3ExpectationChange{} // Maps partition -> entries
partitionIterator := v.client.Collection(v3Partitions).DocumentRefs(ctx)
p, err := partitionIterator.Next()
for ; err == nil; p, err = partitionIterator.Next() {
partition := p.ID
const numShards = 16
base := v.client.Collection(v3Partitions).Doc(partition).Collection(v3ChangeEntries)
queries := fs_utils.ShardOnDigest(base, v3DigestField, numShards)
shardedEntries := make([][]v3ExpectationChange, numShards)
err := v.client.IterDocsInParallel(ctx, "loadExpectationDeltas", partition, queries, maxRetries, maxOperationTime, func(i int, doc *firestore.DocumentSnapshot) error {
if doc == nil {
return nil
}
entry := v3ExpectationChange{}
if err := doc.DataTo(&entry); err != nil {
id := doc.Ref.ID
return skerr.Wrapf(err, "corrupt data in firestore, could not unmarshal expectationChange with id %s", id)
}
shardedEntries[i] = append(shardedEntries[i], entry)
return nil
})
if err != nil {
return nil, skerr.Wrapf(err, "fetching expectation deltas for partition %s", partition)
}
var combinedEntries []v3ExpectationChange
for _, shard := range shardedEntries {
combinedEntries = append(combinedEntries, shard...)
}
rv[partition] = combinedEntries
}
	if err != iterator.Done {
		return nil, skerr.Wrap(err)
	}
	return rv, nil
}
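// fetchExpectations returns all expectation entries in Firestore, keyed by partition, ignoring
// malformed entries that have no ranges. Each partition is read in parallel using multiple
// shards.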
func (v v3Impl) fetchExpectations(ctx context.Context) (map[string][]v3ExpectationEntry, error) {
rv := map[string][]v3ExpectationEntry{} // Maps partition -> entries
partitionIterator := v.client.Collection(v3Partitions).DocumentRefs(ctx)
p, err := partitionIterator.Next()
for ; err == nil; p, err = partitionIterator.Next() {
partition := p.ID
const numShards = 16
base := v.client.Collection(v3Partitions).Doc(partition).Collection(v3ExpectationEntries)
queries := fs_utils.ShardOnDigest(base, v3DigestField, numShards)
shardedEntries := make([][]v3ExpectationEntry, numShards)
err := v.client.IterDocsInParallel(ctx, "loadExpectations", partition, queries, maxRetries, maxOperationTime, func(i int, doc *firestore.DocumentSnapshot) error {
if doc == nil {
return nil
}
entry := v3ExpectationEntry{}
if err := doc.DataTo(&entry); err != nil {
id := doc.Ref.ID
return skerr.Wrapf(err, "corrupt data in firestore, could not unmarshal expectationEntry with id %s", id)
}
if len(entry.Ranges) == 0 {
// This should never happen, but we'll ignore these malformed entries if they do.
return nil
}
shardedEntries[i] = append(shardedEntries[i], entry)
return nil
})
if err != nil {
return nil, skerr.Wrapf(err, "fetching expectations for partition %s", partition)
}
var combinedEntries []v3ExpectationEntry
for _, shard := range shardedEntries {
combinedEntries = append(combinedEntries, shard...)
}
rv[partition] = combinedEntries
sklog.Infof("Fetched %d entries for partition %s", len(combinedEntries), partition)
}
	if err != iterator.Done {
		return nil, skerr.Wrap(err)
	}
	return rv, nil
}