blob: 928ef33bf58e4f6bc523ca01932934ea577bcbf4 [file] [log] [blame]
package main
// This program migrates the data for Gold from SQL to Cloud Datastore.
import (
"flag"
"math"
"math/rand"
"sync"
"sync/atomic"
"time"
"cloud.google.com/go/datastore"
"go.skia.org/infra/go/common"
"go.skia.org/infra/go/database"
"go.skia.org/infra/go/deepequal"
"go.skia.org/infra/go/ds"
"go.skia.org/infra/go/skiaversion"
"go.skia.org/infra/go/sklog"
"go.skia.org/infra/go/util"
"go.skia.org/infra/golden/go/db"
"go.skia.org/infra/golden/go/expstorage"
"go.skia.org/infra/golden/go/ignore"
"go.skia.org/infra/golden/go/types"
)
// Command line flags
var (
dsNamespace = flag.String("ds_namespace", "", "Cloud datastore namespace to be used by this instance.")
projectID = flag.String("project_id", common.PROJECT_ID, "GCP project ID.")
promptPassword = flag.Bool("password", false, "Prompt for root password.")
)
// List of entities we are importing
var targetKinds = []ds.Kind{
ds.MASTER_EXP_CHANGE,
ds.IGNORE_RULE,
ds.EXPECTATIONS_BLOB,
ds.EXPECTATIONS_BLOB_ROOT,
}
func main() {
// Configure the MySQL database
dbConf := database.ConfigFromFlags(db.PROD_DB_HOST, db.PROD_DB_PORT, database.USER_ROOT, db.PROD_DB_NAME, db.MigrationSteps())
// Global init to initialize logging and parse arguments.
common.Init()
skiaversion.MustLogVersion()
// Set up the SQL based expectations store
vdb := setupMySQL(dbConf, *promptPassword)
// Set up the cloud data store based expectations store
// Needed to use TimeSortableKey(...) which relies on an RNG. See docs there.
rand.Seed(time.Now().UnixNano())
if err := ds.InitWithOpt(*projectID, *dsNamespace); err != nil {
sklog.Fatalf("Unable to configure cloud datastore: %s", err)
}
dsClient := ds.DS
// Remove entities from previous runs.
removeExistingEntities(dsClient, targetKinds)
// Migrate the expectation and ignore stores
migrateExpectationStore(vdb, dsClient)
migrateIgnoreStore(vdb, dsClient)
sklog.Infoln("Database migration finished.")
}
func removeExistingEntities(dsClient *datastore.Client, targetKinds []ds.Kind) {
// Remove all instances that might be there from a previous migration run.
sklog.Infof("Removing old entries")
var wg sync.WaitGroup
for _, kind := range targetKinds {
wg.Add(1)
go func(kind ds.Kind) {
defer wg.Done()
removeCount, err := ds.DeleteAll(dsClient, kind, true)
if err != nil {
sklog.Fatalf("Error deleting entities of kind %s: %s", kind, err)
}
sklog.Infof("Removed %d %s entities", removeCount, kind)
}(kind)
}
wg.Wait()
sklog.Infof("Done removing old entries")
}
func migrateExpectationStore(vdb *database.VersionedDB, dsClient *datastore.Client) {
sqlExpStore := expstorage.NewSQLExpectationStore(vdb)
newExpStore, _, err := expstorage.NewCloudExpectationsStore(dsClient, nil)
if err != nil {
sklog.Fatalf("Unable to create cloud expectations store: %s", err)
}
// Get the cloud datastore directly to sideload the data via functions that
// are not part of the ExpectationsStore interface.
cloudExpStore := newExpStore.(*expstorage.CloudExpStore)
// Get the total number of expectation changes and divide them into pages.
_, total, err := sqlExpStore.QueryLog(0, 1, false)
pageSize := 1000
changeRecCount := 0
totalChangeCount := int32(0)
lastTS := int64(-1)
nPages := int(math.Ceil(float64(total) / float64(pageSize)))
sklog.Infof("Found %d change records. Fetching %d pages", total, nPages)
importChanges := make([]types.TestExp, 0, total)
importKeys := make([]*datastore.Key, 0, total)
// Iterate over the expectation changes in the sql expectations store
for p := 0; p < nPages; p++ {
first := p * pageSize
logEntries, _, err := sqlExpStore.QueryLog(first, util.MinInt(pageSize, total), true)
if err != nil {
sklog.Fatalf("Error retrieving expectation changes: %s", err)
}
var wg sync.WaitGroup
startTime := time.Now()
newEntries := make([]types.TestExp, len(logEntries))
newKeys := make([]*datastore.Key, len(logEntries))
for i := 0; i < len(logEntries); i++ {
entry := logEntries[i]
if lastTS == -1 {
lastTS = entry.TS
}
if entry.TS > lastTS {
sklog.Fatalf("TS does not decrease monotonically. Change %s has time stamp %d following %d", entry.ID, entry.TS, lastTS)
}
lastTS = entry.TS
wg.Add(1)
go func(idx int, entry *expstorage.TriageLogEntry) {
defer wg.Done()
// Write the changes directly to the cloud datastore and keep the key.
changes := entry.GetChanges()
newKey, err := cloudExpStore.ImportChange(changes, entry.Name, entry.TS)
if err != nil {
sklog.Fatalf("Error adding expectation change: %s", err)
}
newEntries[idx] = changes
newKeys[idx] = newKey
atomic.AddInt32(&totalChangeCount, int32(len(entry.Details)))
}(i, entry)
}
wg.Wait()
importChanges = append(importChanges, newEntries...)
importKeys = append(importKeys, newKeys...)
avgTime := float64(time.Now().Sub(startTime)/time.Millisecond) / float64(len(logEntries))
changeRecCount += len(logEntries)
sklog.Infof("Migrated %d/%d records. %.2f ms average", changeRecCount, total, avgTime)
}
// Accumulate the expecations from what we have loaded from the SQL store.
localExps := types.NewExpectations(nil)
for i := len(importChanges) - 1; i >= 0; i-- {
localExps.AddTestExp(importChanges[i])
}
// Get the expectations of the SQL store.
sqlExpectations, err := sqlExpStore.Get()
if err != nil {
sklog.Fatalf("Unable to retrieve sql expectations: %s", err)
}
// Compare the sql expectations to the locally computed expectations
sklog.Infof("Doing by test comparison")
lt := localExps.TestExp()
testFailures := 0
digestFailures := 0
for testName, digests := range sqlExpectations.TestExp() {
found, ok := lt[testName]
sklog.Infof("%s %v", testName, ok)
if !ok {
testFailures++
continue
}
failCount := 0
for d, l := range digests {
if l != found[d] {
sklog.Infof(" fail %s %s %s", d, l, found[d])
failCount++
}
}
digestFailures += failCount
}
sklog.Infof("Test failures : %5d", testFailures)
sklog.Infof("Digest failures : %5d", digestFailures)
// Due to an issue with the SQL in SqlExpectationStore we are willing to
// accept a small number of discrepancies. The number 4 was chosen based on
// differences inspected in a very recent snapshot of the database.
// If it goes up this will fail and further investigation is necessary.
if testFailures > 0 || digestFailures > 4 {
sklog.Fatalf("Got more errors than expected. Testfailures: %d Digest failures: %d", testFailures, digestFailures)
}
// Calculate the expectations from the changes we imported into the CloudExpecationsStore
calcExps, err := cloudExpStore.CalcExpectations(importKeys)
if err != nil {
sklog.Fatalf("Error calculating expectations: %s", err)
}
// Make sure they are the same as the locally calculated expectations.
if !deepequal.DeepEqual(localExps, calcExps) {
sklog.Fatalf("Local expectations and calculated expectations do not match")
}
// Store the calculated expectations in the cloud datastore.
if err := cloudExpStore.PutExpectations(calcExps.TestExp()); err != nil {
sklog.Fatalf("Error writing expectations: %s", err)
}
// Read them back and make sure they match what we have written earlier.
foundExp, err := cloudExpStore.Get()
if err != nil {
sklog.Fatalf("Error retrieving cloud expectations: %s", err)
}
if !deepequal.DeepEqual(calcExps, foundExp) {
sklog.Fatalf("Found expectations and expectations from SQL do not match.")
}
sklog.Infof("Summary: migrated %d expectation changes with %d expectation values changes", total, totalChangeCount)
}
func migrateIgnoreStore(vdb *database.VersionedDB, dsClient *datastore.Client) {
sqlIgnoreStore := ignore.NewSQLIgnoreStore(vdb, nil, nil)
cloudIgnoreStore, err := ignore.NewCloudIgnoreStore(dsClient, nil, nil)
if err != nil {
sklog.Fatalf("Error creating CloudIgnoreStore: %s", err)
}
ignoreRules, err := sqlIgnoreStore.List(false)
if err != nil {
sklog.Fatalf("Error retrieving ignore rules: %s", err)
}
for _, rule := range ignoreRules {
if err := cloudIgnoreStore.Create(rule); err != nil {
sklog.Fatalf("Error creating new ignore rule: %s", err)
}
}
cloudIgnoreRules, err := cloudIgnoreStore.List(false)
if err != nil {
sklog.Fatalf("Error retrieving ignore rules: %s", err)
}
if !deepequal.DeepEqual(ignoreRules, cloudIgnoreRules) {
sklog.Fatalf("Ignore rules do not match !")
}
sklog.Infof("Migrated %d ignore rules", len(ignoreRules))
}
// Initialize the MySQL wrapper
func setupMySQL(dbConf *database.DatabaseConfig, promptPassword bool) *database.VersionedDB {
if promptPassword {
if err := dbConf.PromptForPassword(); err != nil {
sklog.Fatal(err)
}
}
vdb, err := dbConf.NewVersionedDB()
if err != nil {
sklog.Fatal(err)
}
return vdb
}