// The fsmigrator executable migrates various data from Firestore to an SQL database.
// It uses port forwarding, as that is the simplest approach and there shouldn't be
// too much data.
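//
// Typical usage (a sketch based on the flags and port-forward command below):
//
// kubectl port-forward gold-cockroachdb-0 26234:26234
// fsmigrator --old_fs_namespace=skia --new_sql_db=skia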
package main
import (
"context"
"crypto/md5"
"flag"
"strings"
"time"
"cloud.google.com/go/firestore"
"github.com/cockroachdb/cockroach-go/v2/crdb/crdbpgx"
"github.com/google/uuid"
"github.com/jackc/pgx/v4"
"github.com/jackc/pgx/v4/pgxpool"
"google.golang.org/api/iterator"
ifirestore "go.skia.org/infra/go/firestore"
"go.skia.org/infra/go/skerr"
"go.skia.org/infra/go/sklog"
"go.skia.org/infra/go/util"
"go.skia.org/infra/golden/go/fs_utils"
"go.skia.org/infra/golden/go/sql"
"go.skia.org/infra/golden/go/sql/schema"
"go.skia.org/infra/golden/go/types"
)
func main() {
var (
fsProjectID = flag.String("fs_project_id", "skia-firestore", "The project with the firestore instance. Datastore and Firestore can't be in the same project.")
oldFSNamespace = flag.String("old_fs_namespace", "", "Typically the instance id. e.g. 'chrome-gpu', 'skia', etc")
newSQLDatabase = flag.String("new_sql_db", "", "Something like the instance id (no dashes)")
)
flag.Parse()
if *oldFSNamespace == "" {
sklog.Fatalf("You must include fs_namespace")
}
if *newSQLDatabase == "" {
sklog.Fatalf("You must include new_sql_db")
}
ctx := context.Background()
fsClient, err := ifirestore.NewClient(ctx, *fsProjectID, "gold", *oldFSNamespace, nil)
if err != nil {
sklog.Fatalf("Unable to configure Firestore: %s", err)
}
u := sql.GetConnectionURL("root@localhost:26234", *newSQLDatabase)
conf, err := pgxpool.ParseConfig(u)
if err != nil {
sklog.Fatalf("error getting postgres config %s: %s", u, err)
}
conf.MaxConns = 16
db, err := pgxpool.ConnectConfig(ctx, conf)
if err != nil {
sklog.Info("You must run\nkubectl port-forward gold-cockroachdb-0 26234:26234")
sklog.Fatalf("error connecting to the database: %s", err)
}
oldNamespace := v3Impl{client: fsClient}
// We need to look up groupings by test name because the old groupings were just the
// test name, not combined with the corpus. We fetch all the groupings from the SQL
// database, which has already been populated via ingestion.
nameToGroupings, err := fetchGroupings(ctx, db)
if err != nil {
sklog.Fatalf("Getting groupings from SQL: %s", err)
}
sklog.Infof("Fetched %d groupings from SQL DB", len(nameToGroupings))
sklog.Infof("Fetching expectations")
exp, err := oldNamespace.fetchExpectations(ctx)
if err != nil {
sklog.Fatalf("fetching expectations %s", err)
}
// Store the expectations; nameToGroupings maps the old test-name-only groupings to the new grouping ids.
err = storeExpectations(ctx, db, exp, nameToGroupings)
if err != nil {
sklog.Fatalf("storing expectation deltas: %s", err)
}
// // Write the catchall expectation record. If somehow an expectation exists, but wasn't covered
// // by a delta (e.g. broken old data), we assign it to this catchall record.
// _, err = db.Exec(ctx, `UPSERT INTO ExpectationRecords
//(expectation_record_id, user_name, triage_time, num_changes) VALUES ($1, $2, $3, $4)`,
// catchAllUUID, "sql_migrator", time.Now(), unowned)
// if err != nil {
// sklog.Fatalf("creating catch-all record: %s", err)
// }
sklog.Info("done")
}
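// fetchGroupings returns a map from test name to the grouping ids containing that test,
// as read from the already-ingested Groupings table.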
func fetchGroupings(ctx context.Context, db *pgxpool.Pool) (map[types.TestName][]schema.GroupingID, error) {
rv := map[types.TestName][]schema.GroupingID{}
rows, err := db.Query(ctx, `SELECT keys ->> 'name', grouping_id FROM Groupings`)
if err != nil {
return nil, skerr.Wrap(err)
}
defer rows.Close()
for rows.Next() {
var name types.TestName
var gID schema.GroupingID
if err := rows.Scan(&name, &gID); err != nil {
return nil, skerr.Wrap(err)
}
rv[name] = append(rv[name], gID)
}
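// Count the test names that belong to more than one grouping (i.e. multiple corpora).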
count := 0
for _, groupings := range rv {
if len(groupings) > 1 {
count++
}
}
sklog.Infof("%d out of %d test names belonged to multiple corpora", count, len(rv))
return rv, nil
}
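// catchAllUUID is the all-zero UUID used as the expectation record id for every migrated
// expectation.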
var (
catchAllUUID = uuid.MustParse("00000000-0000-0000-0000-000000000000")
)
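// storeExpectations writes the expectations for the primary partition to the SQL database.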
func storeExpectations(ctx context.Context, db *pgxpool.Pool, toStore map[string][]v3ExpectationEntry, nameToGroupings map[types.TestName][]schema.GroupingID) error {
sklog.Infof("have %d partitions", len(toStore))
return skerr.Wrap(storePrimaryBranchExpectations(ctx, db, toStore[v3PrimaryPartition], nameToGroupings))
}
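// storePrimaryBranchExpectations writes the given entries to the Expectations table in
// batches of 1000 rows, mapping each old test-name grouping to the new grouping id(s).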
func storePrimaryBranchExpectations(ctx context.Context, db *pgxpool.Pool, exps []v3ExpectationEntry, nameToGroupings map[types.TestName][]schema.GroupingID) error {
const batchSize = 1000
return util.ChunkIter(len(exps), batchSize, func(startIdx int, endIdx int) error {
if err := ctx.Err(); err != nil {
return skerr.Wrap(err)
}
batch := exps[startIdx:endIdx]
statement := `INSERT INTO Expectations
(grouping_id, digest, label, expectation_record_id) VALUES `
const valuesPerRow = 4
arguments := make([]interface{}, 0, valuesPerRow*len(batch))
for _, exp := range batch {
groupings, ok := nameToGroupings[exp.Grouping]
if !ok {
continue
}
dBytes, err := sql.DigestToBytes(exp.Digest)
if err != nil {
sklog.Warningf("Corrupt digest %q on branch %s", exp.Digest, v3PrimaryPartition)
continue
}
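// Ranges is guaranteed to be non-empty because fetchExpectations skips entries without
// any ranges.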
label := exp.Ranges[0].Label
// Some test names correspond to more than one grouping. We need to write the
// expectations for each of those groupings.
for _, gID := range groupings {
arguments = append(arguments, gID, dBytes, convertLabel(label), catchAllUUID)
}
}
if len(arguments) == 0 {
return nil
}
// Divide by valuesPerRow to get the number of rows actually being written, since some
// entries may have been skipped above.
statement += sql.ValuesPlaceholders(valuesPerRow, len(arguments)/valuesPerRow)
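// Only overwrite an existing row if its label is untriaged and the incoming label is not,
// so anything already triaged in SQL takes precedence over the migrated data.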
statement += `
ON CONFLICT (grouping_id, digest)
DO UPDATE SET (label, expectation_record_id) =
(excluded.label, excluded.expectation_record_id)
WHERE Expectations.label = 'u' AND excluded.label != 'u'
`
err := crdbpgx.ExecuteTx(ctx, db, pgx.TxOptions{}, func(tx pgx.Tx) error {
_, err := tx.Exec(ctx, statement, arguments...)
return err // don't wrap - might get retried
})
return skerr.Wrap(err)
})
}
// labelInt is the integer version of Label, used as the storage format in Firestore.
type labelInt int
const (
// untriagedInt represents a previously unseen digest.
untriagedInt labelInt = iota // == 0
// positiveInt represents a known good digest.
positiveInt
// negativeInt represents a known bad digest.
negativeInt
)
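// convertLabel converts the Firestore integer label into its SQL schema equivalent,
// defaulting to untriaged for unknown values.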
func convertLabel(label labelInt) schema.ExpectationLabel {
switch label {
case untriagedInt:
return schema.LabelUntriaged
case positiveInt:
return schema.LabelPositive
case negativeInt:
return schema.LabelNegative
}
return schema.LabelUntriaged
}
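// hash returns the MD5 sum of the given id.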
func hash(id string) []byte {
h := md5.Sum([]byte(id))
return h[:]
}
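// Firestore collection, document, and field names (plus retry settings) used by the v3
// expectation store.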
const (
v3Partitions = "expstore_partitions_v3"
v3ExpectationEntries = "entries"
v3RecordEntries = "triage_records"
v3ChangeEntries = "triage_changes"
v3PrimaryPartition = "master"
v3DigestField = "digest"
maxRetries = 10
maxOperationTime = 10 * time.Minute
)
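// v3Impl reads the v3 layout of the expectation store out of Firestore.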
type v3Impl struct {
client *ifirestore.Client
}
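// v3ExpectationEntry is a single grouping+digest expectation as stored in Firestore.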
type v3ExpectationEntry struct {
Grouping types.TestName `firestore:"grouping"`
Digest types.Digest `firestore:"digest"`
Updated time.Time `firestore:"updated"`
LastUsed time.Time `firestore:"last_used"`
Ranges []v3TriageRange `firestore:"ranges"`
NeedsGC bool `firestore:"needs_gc"`
}
func (e *v3ExpectationEntry) id() string {
s := string(e.Grouping) + "|" + string(e.Digest)
// Firestore does not allow "/" in document IDs, so replace them.
return strings.ReplaceAll(s, "/", "-")
}
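// v3TriageRange is the label that applies to a range of commit indices.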
type v3TriageRange struct {
FirstIndex int `firestore:"first_index"`
LastIndex int `firestore:"last_index"`
Label labelInt `firestore:"label"`
}
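// v3TriageRecord is a single triage action taken by a user.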
type v3TriageRecord struct {
ID string
UserName string `firestore:"user"`
TS time.Time `firestore:"ts"`
Changes int `firestore:"changes"`
Committed bool `firestore:"committed"`
}
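// v3ExpectationChange is one digest's label change belonging to a triage record.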
type v3ExpectationChange struct {
// RecordID refers to a document in the records collection.
RecordID string `firestore:"record_id"`
Grouping types.TestName `firestore:"grouping"`
Digest types.Digest `firestore:"digest"`
AffectedRange v3TriageRange `firestore:"affected_range"`
LabelBefore labelInt `firestore:"label_before"`
}
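// fetchTriageRecords returns all triage records from Firestore, keyed by partition.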
func (v v3Impl) fetchTriageRecords(ctx context.Context) (map[string][]v3TriageRecord, error) {
// maps partition to records
rv := map[string][]v3TriageRecord{}
partitionIterator := v.client.Collection(v3Partitions).DocumentRefs(ctx)
p, err := partitionIterator.Next()
for ; err == nil; p, err = partitionIterator.Next() {
var records []v3TriageRecord
partition := p.ID
sklog.Infof("Partition %s", partition)
recordIterator := v.client.Collection(v3Partitions).Doc(partition).Collection(v3RecordEntries).Documents(ctx)
docs, err := recordIterator.GetAll()
if err != nil {
return nil, skerr.Wrapf(err, "getting records for %s", partition)
}
for _, doc := range docs {
var r v3TriageRecord
if err := doc.DataTo(&r); err != nil {
sklog.Warningf("Corrupt triage record with id %s", doc.Ref.ID)
continue
}
r.ID = doc.Ref.ID
records = append(records, r)
}
rv[partition] = records
}
if err != iterator.Done {
return nil, skerr.Wrap(err)
}
return rv, nil
}
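// fetchExpectationDeltas returns all expectation changes from Firestore, keyed by
// partition. Each partition's entries are fetched in parallel shards.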
func (v v3Impl) fetchExpectationDeltas(ctx context.Context) (map[string][]v3ExpectationChange, error) {
rv := map[string][]v3ExpectationChange{} // Maps partition -> entries
partitionIterator := v.client.Collection(v3Partitions).DocumentRefs(ctx)
p, err := partitionIterator.Next()
for ; err == nil; p, err = partitionIterator.Next() {
partition := p.ID
const numShards = 16
base := v.client.Collection(v3Partitions).Doc(partition).Collection(v3ChangeEntries)
queries := fs_utils.ShardOnDigest(base, v3DigestField, numShards)
shardedEntries := make([][]v3ExpectationChange, numShards)
err := v.client.IterDocsInParallel(ctx, "loadExpectationDeltas", partition, queries, maxRetries, maxOperationTime, func(i int, doc *firestore.DocumentSnapshot) error {
if doc == nil {
return nil
}
entry := v3ExpectationChange{}
if err := doc.DataTo(&entry); err != nil {
id := doc.Ref.ID
return skerr.Wrapf(err, "corrupt data in firestore, could not unmarshal expectationChange with id %s", id)
}
shardedEntries[i] = append(shardedEntries[i], entry)
return nil
})
if err != nil {
return nil, skerr.Wrapf(err, "fetching expectation deltas for partition %s", partition)
}
var combinedEntries []v3ExpectationChange
for _, shard := range shardedEntries {
combinedEntries = append(combinedEntries, shard...)
}
rv[partition] = combinedEntries
}
if err != iterator.Done {
return nil, skerr.Wrap(err)
}
return rv, nil
}
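// fetchExpectations returns the expectation entries for the primary partition, fetched in
// parallel shards. All other partitions are skipped.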
func (v v3Impl) fetchExpectations(ctx context.Context) (map[string][]v3ExpectationEntry, error) {
rv := map[string][]v3ExpectationEntry{} // Maps partition -> entries
partitionIterator := v.client.Collection(v3Partitions).DocumentRefs(ctx)
p, err := partitionIterator.Next()
for ; err == nil; p, err = partitionIterator.Next() {
partition := p.ID
if partition != v3PrimaryPartition {
continue
}
const numShards = 16
base := v.client.Collection(v3Partitions).Doc(partition).Collection(v3ExpectationEntries)
queries := fs_utils.ShardOnDigest(base, v3DigestField, numShards)
shardedEntries := make([][]v3ExpectationEntry, numShards)
err := v.client.IterDocsInParallel(ctx, "loadExpectations", partition, queries, maxRetries, maxOperationTime, func(i int, doc *firestore.DocumentSnapshot) error {
if doc == nil {
return nil
}
entry := v3ExpectationEntry{}
if err := doc.DataTo(&entry); err != nil {
id := doc.Ref.ID
return skerr.Wrapf(err, "corrupt data in firestore, could not unmarshal expectationEntry with id %s", id)
}
if len(entry.Ranges) == 0 {
// This should never happen, but we'll ignore these malformed entries if they do.
return nil
}
shardedEntries[i] = append(shardedEntries[i], entry)
return nil
})
if err != nil {
return nil, skerr.Wrapf(err, "fetching expectations for partition %s", partition)
}
var combinedEntries []v3ExpectationEntry
for _, shard := range shardedEntries {
combinedEntries = append(combinedEntries, shard...)
}
rv[partition] = combinedEntries
sklog.Infof("Fetched %d entries for partition %s", len(combinedEntries), partition)
}
if err != iterator.Done {
return nil, skerr.Wrap(err)
}
return rv, nil
}