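// This program copies Gold's v3 expectations data (triage records,
// expectation changes, and expectation entries) from one Firestore
// namespace to another. All writes are gated behind the dryrun constant.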
package main
import (
	"context"
	"flag"
	"math"
	"strings"
	"time"

	"cloud.google.com/go/firestore"
	ifirestore "go.skia.org/infra/go/firestore"
	"go.skia.org/infra/go/skerr"
	"go.skia.org/infra/go/sklog"
	"go.skia.org/infra/golden/go/expectations"
	"go.skia.org/infra/golden/go/fs_utils"
	"go.skia.org/infra/golden/go/types"
	"google.golang.org/api/iterator"
)
const (
	maxOperationTime = 2 * time.Minute
	maxRetries       = 5
	// dryrun gates all writes; when true, the tool only logs what it would store.
	dryrun = true
)
func main() {
	var (
		fsProjectID    = flag.String("fs_project_id", "skia-firestore", "The project with the Firestore instance. Datastore and Firestore can't be in the same project.")
		oldFSNamespace = flag.String("old_fs_namespace", "", "Typically the instance id. e.g. 'chrome-gpu', 'skia', etc.")
		newFSNamespace = flag.String("new_fs_namespace", "", "Typically the instance id. e.g. 'chrome'.")
	)
flag.Parse()
	if *oldFSNamespace == "" {
		sklog.Fatalf("You must include old_fs_namespace")
	}
	if *newFSNamespace == "" {
		sklog.Fatalf("You must include new_fs_namespace")
	}
	ctx := context.Background()
	fsClient, err := ifirestore.NewClient(ctx, *fsProjectID, "gold", *oldFSNamespace, nil)
	if err != nil {
		sklog.Fatalf("Unable to configure Firestore for old namespace: %s", err)
	}
	oldNamespace := v3Impl{client: fsClient}
	fsClient, err = ifirestore.NewClient(ctx, *fsProjectID, "gold", *newFSNamespace, nil)
	if err != nil {
		sklog.Fatalf("Unable to configure Firestore for new namespace: %s", err)
	}
	newNamespace := v3Impl{client: fsClient}
// Fetch triage records
records, err := oldNamespace.fetchTriageRecords(ctx)
if err != nil {
sklog.Fatalf("Fetching triage records: %s", err)
}
sklog.Infof("Should migrate %d records", len(records))
if err := newNamespace.storeRecords(ctx, records); err != nil {
sklog.Fatalf("Storing triage records: %s", err)
}
	// Fetch expectation deltas
deltas, err := oldNamespace.fetchExpectationDeltas(ctx)
if err != nil {
sklog.Fatalf("Fetching expectation deltas: %s", err)
}
if err := newNamespace.storeExpectationChanges(ctx, deltas); err != nil {
sklog.Fatalf("Storing triage records: %s", err)
}
// Fetch entries
exp, err := oldNamespace.fetchExpectations(ctx)
if err != nil {
sklog.Fatalf("Fetching expectation entries: %s", err)
}
if err := newNamespace.storeEntries(ctx, exp); err != nil {
sklog.Fatalf("Storing triage records: %s", err)
}
sklog.Infof("Done")
}
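
// Constants describing the Firestore layout of the v3 expectations schema.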
const (
v3Partitions = "expstore_partitions_v3"
v3ExpectationEntries = "entries"
v3RecordEntries = "triage_records"
v3ChangeEntries = "triage_changes"
v3MasterPartition = "master"
v3BeginningOfTime = 0
v3EndOfTime = math.MaxInt32
v3DigestField = "digest"
)
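
// v3Impl reads and writes v3 expectations data for a single Firestore namespace.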
type v3Impl struct {
client *ifirestore.Client
}
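
// v3ExpectationEntry is the Firestore document holding the triage history for a
// single (grouping, digest) pair.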
type v3ExpectationEntry struct {
Grouping types.TestName `firestore:"grouping"`
Digest types.Digest `firestore:"digest"`
Updated time.Time `firestore:"updated"`
LastUsed time.Time `firestore:"last_used"`
Ranges []v3TriageRange `firestore:"ranges"`
NeedsGC bool `firestore:"needs_gc"`
}
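
// id returns the Firestore document ID for this entry.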
func (e *v3ExpectationEntry) id() string {
s := string(e.Grouping) + "|" + string(e.Digest)
	// Firestore doesn't allow '/' in document IDs, so replace them with '-'.
return strings.Replace(s, "/", "-", -1)
}
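
// v3TriageRange is a label applied over a contiguous range of commit indices.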
type v3TriageRange struct {
FirstIndex int `firestore:"first_index"`
LastIndex int `firestore:"last_index"`
Label expectations.LabelInt `firestore:"label"`
}
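
// v3TriageRecord groups the expectation changes made in a single triage action.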
type v3TriageRecord struct {
ID string
UserName string `firestore:"user"`
TS time.Time `firestore:"ts"`
Changes int `firestore:"changes"`
Committed bool `firestore:"committed"`
}
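
// v3ExpectationChange is a single label change belonging to a triage record.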
type v3ExpectationChange struct {
// RecordID refers to a document in the records collection.
RecordID string `firestore:"record_id"`
Grouping types.TestName `firestore:"grouping"`
Digest types.Digest `firestore:"digest"`
AffectedRange v3TriageRange `firestore:"affected_range"`
LabelBefore expectations.LabelInt `firestore:"label_before"`
}
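
// fetchTriageRecords returns all triage records in every partition, keyed by
// partition name. Corrupt records are logged and skipped.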
func (v v3Impl) fetchTriageRecords(ctx context.Context) (map[string][]v3TriageRecord, error) {
// maps partition to records
rv := map[string][]v3TriageRecord{}
partitionIterator := v.client.Collection(v3Partitions).DocumentRefs(ctx)
p, err := partitionIterator.Next()
for ; err == nil; p, err = partitionIterator.Next() {
var records []v3TriageRecord
partition := p.ID
sklog.Infof("Partition %s", partition)
recordIterator := v.client.Collection(v3Partitions).Doc(partition).Collection(v3RecordEntries).Documents(ctx)
docs, err := recordIterator.GetAll()
if err != nil {
return nil, skerr.Wrapf(err, "getting records for %s", partition)
}
		for _, doc := range docs {
			var r v3TriageRecord
			if err := doc.DataTo(&r); err != nil {
				sklog.Warningf("Corrupt triage record with id %s", doc.Ref.ID)
				continue
			}
			r.ID = doc.Ref.ID
			records = append(records, r)
		}
rv[partition] = records
}
if err != iterator.Done {
return nil, skerr.Wrap(err)
}
return rv, nil
}
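
// storeRecords writes the given triage records to this namespace, preserving
// their original document IDs.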
func (v v3Impl) storeRecords(ctx context.Context, recordsByPartition map[string][]v3TriageRecord) error {
if dryrun {
sklog.Infof("Would store %d partition of records", len(recordsByPartition))
return nil
}
for partition, records := range recordsByPartition {
sklog.Infof("Writing %d records for partition %s", len(records), partition)
entryCollection := v.client.Collection(v3Partitions).Doc(partition).Collection(v3RecordEntries)
err := v.client.BatchWrite(ctx, len(records), ifirestore.MAX_TRANSACTION_DOCS, maxOperationTime, nil, func(b *firestore.WriteBatch, i int) error {
record := records[i]
doc := entryCollection.Doc(record.ID)
b.Set(doc, record)
return nil
})
if err != nil {
return skerr.Wrapf(err, "storing to partition %s", partition)
}
}
return nil
}
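
// fetchExpectationDeltas returns all expectation changes in every partition,
// keyed by partition name. Reads are sharded on the digest field for speed.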
func (v v3Impl) fetchExpectationDeltas(ctx context.Context) (map[string][]v3ExpectationChange, error) {
rv := map[string][]v3ExpectationChange{} // Maps partition -> entries
partitionIterator := v.client.Collection(v3Partitions).DocumentRefs(ctx)
p, err := partitionIterator.Next()
for ; err == nil; p, err = partitionIterator.Next() {
partition := p.ID
const numShards = 16
base := v.client.Collection(v3Partitions).Doc(partition).Collection(v3ChangeEntries)
queries := fs_utils.ShardOnDigest(base, v3DigestField, numShards)
shardedEntries := make([][]v3ExpectationChange, numShards)
err := v.client.IterDocsInParallel(ctx, "loadExpectationDeltas", partition, queries, maxRetries, maxOperationTime, func(i int, doc *firestore.DocumentSnapshot) error {
if doc == nil {
return nil
}
entry := v3ExpectationChange{}
if err := doc.DataTo(&entry); err != nil {
id := doc.Ref.ID
return skerr.Wrapf(err, "corrupt data in firestore, could not unmarshal expectationChange with id %s", id)
}
shardedEntries[i] = append(shardedEntries[i], entry)
return nil
})
if err != nil {
return nil, skerr.Wrapf(err, "fetching expectation deltas for partition %s", partition)
}
var combinedEntries []v3ExpectationChange
for _, shard := range shardedEntries {
combinedEntries = append(combinedEntries, shard...)
}
rv[partition] = combinedEntries
}
	if err != iterator.Done {
		return nil, skerr.Wrap(err)
	}
	return rv, nil
}
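
// storeExpectationChanges writes the given expectation changes to this
// namespace under freshly generated document IDs.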
func (v v3Impl) storeExpectationChanges(ctx context.Context, changesByPartition map[string][]v3ExpectationChange) error {
if dryrun {
sklog.Infof("Would store %d partition of changes", len(changesByPartition))
return nil
}
for partition, changes := range changesByPartition {
sklog.Infof("Writing %d changes for partition %s", len(changes), partition)
changesCollection := v.client.Collection(v3Partitions).Doc(partition).Collection(v3ChangeEntries)
err := v.client.BatchWrite(ctx, len(changes), ifirestore.MAX_TRANSACTION_DOCS, maxOperationTime, nil, func(b *firestore.WriteBatch, i int) error {
change := changes[i]
doc := changesCollection.NewDoc()
b.Set(doc, change)
return nil
})
if err != nil {
return skerr.Wrapf(err, "storing to partition %s", partition)
}
}
return nil
}
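
// fetchExpectations returns all expectation entries in every partition, keyed
// by partition name. Entries with no triage ranges are skipped as malformed.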
func (v v3Impl) fetchExpectations(ctx context.Context) (map[string][]v3ExpectationEntry, error) {
rv := map[string][]v3ExpectationEntry{} // Maps partition -> entries
partitionIterator := v.client.Collection(v3Partitions).DocumentRefs(ctx)
p, err := partitionIterator.Next()
for ; err == nil; p, err = partitionIterator.Next() {
partition := p.ID
const numShards = 16
base := v.client.Collection(v3Partitions).Doc(partition).Collection(v3ExpectationEntries)
queries := fs_utils.ShardOnDigest(base, v3DigestField, numShards)
shardedEntries := make([][]v3ExpectationEntry, numShards)
err := v.client.IterDocsInParallel(ctx, "loadExpectations", partition, queries, maxRetries, maxOperationTime, func(i int, doc *firestore.DocumentSnapshot) error {
if doc == nil {
return nil
}
entry := v3ExpectationEntry{}
if err := doc.DataTo(&entry); err != nil {
id := doc.Ref.ID
return skerr.Wrapf(err, "corrupt data in firestore, could not unmarshal expectationEntry with id %s", id)
}
if len(entry.Ranges) == 0 {
// This should never happen, but we'll ignore these malformed entries if they do.
return nil
}
shardedEntries[i] = append(shardedEntries[i], entry)
return nil
})
if err != nil {
return nil, skerr.Wrapf(err, "fetching expectations for partition %s", partition)
}
var combinedEntries []v3ExpectationEntry
for _, shard := range shardedEntries {
combinedEntries = append(combinedEntries, shard...)
}
rv[partition] = combinedEntries
sklog.Infof("Fetched %d entries for partition %s", len(combinedEntries), partition)
}
	if err != iterator.Done {
		return nil, skerr.Wrap(err)
	}
	return rv, nil
}
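
// storeEntries writes the given expectation entries to this namespace, deriving
// each document ID from the entry's grouping and digest.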
func (v v3Impl) storeEntries(ctx context.Context, entriesByPartition map[string][]v3ExpectationEntry) error {
if dryrun {
sklog.Infof("Would store %d partition of entries", len(entriesByPartition))
return nil
}
for partition, entries := range entriesByPartition {
sklog.Infof("Writing %d entries for partition %s", len(entries), partition)
entryCollection := v.client.Collection(v3Partitions).Doc(partition).Collection(v3ExpectationEntries)
err := v.client.BatchWrite(ctx, len(entries), ifirestore.MAX_TRANSACTION_DOCS, maxOperationTime, nil, func(b *firestore.WriteBatch, i int) error {
entry := entries[i]
doc := entryCollection.Doc(entry.id())
b.Set(doc, entry)
return nil
})
if err != nil {
return skerr.Wrapf(err, "storing to partition %s", partition)
}
}
return nil
}