blob: c1eed06613d2b2446c3a7dc763fb4b0f630a64b1 [file] [log] [blame]
// Implementation of backing up a DB to Google Cloud Storage (GCS).
package recovery
import (
const (
// DB_BACKUP_DIR is the prefix of the object name to store DB backups in the
// GCS bucket.
DB_BACKUP_DIR = "db-backup"
// DB_FILE_NAME_EXTENSION is added to the base filename.
// TRIGGER_DIRNAME is the name of the directory containing files indicating
// that an automatic backup should occur. These files are created by the
// systemd task-scheduler-db-backup.service.
TRIGGER_DIRNAME = "trigger-backup"
// RETRY_COUNT is the number of times to attempt a DB backup when failures
// occur.
// JOB_BACKUP_DIR is the prefix of the object name to store incremental Job
// backups in the GCS bucket.
JOB_BACKUP_DIR = "job-backup"
// JOB_FILE_NAME_EXTENSION is added to the base filename.
// DBBackup has methods to trigger periodic and immediate backups.
type DBBackup interface {
// Tick triggers a periodic backup if one is due. This allows callers to
// perform backups when it is less likely to conflict with other actions.
// ImmediateBackup triggers a backup immediately.
ImmediateBackup() error
// RetrieveJobs returns all backed-up Jobs created or modified since the given
// time, as a map[Job.Id]*Job. Only the most recent backup is returned.
RetrieveJobs(since time.Time) (map[string]*db.Job, error)
// gsDBBackup implements DBBackup.
type gsDBBackup struct {
// gsBucket specifies the GCS bucket (for testing).
gsBucket string
// gsClient accesses gsBucket.
gsClient *storage.Client
// db is the DB to back up.
db db.BackupDBCloser
// ctx allows to stop the gsDBBackup.
ctx context.Context
// triggerDir is the directory that task-scheduler-db-backup.service will
// write files to trigger an automatic backup.
triggerDir string
// modifiedJobsId is the return value of StartTrackingModifiedJobs.
modifiedJobsId string
// lastDBBackupLiveness records the modified time of the most recent DB
// backup.
lastDBBackupLiveness metrics2.Liveness
// recentDBBackupCount records the number of DB backups in the last 24 hours.
recentDBBackupCount metrics2.Int64Metric
// maybeBackupDBLiveness records whether maybeBackupDB is being called.
maybeBackupDBLiveness metrics2.Liveness
// jobBackupCount records the number of jobs backed up since the gsDBBackup
// was created.
jobBackupCount metrics2.Counter
// incrementalBackupLiveness tracks whether incrementalBackupStep is running
// successfully.
incrementalBackupLiveness metrics2.Liveness
// incrementalBackupResetCount records the number of times GetModifiedJobsGOB
// returned ErrUnknownId since the last successful DB backup.
incrementalBackupResetCount metrics2.Counter
// NewDBBackup creates a DBBackup.
// - ctx can be used to stop any background processes as well as to interrupt
// Tick or ImmediateBackup.
// - gsBucket is the GCS bucket to store backups.
// - db is the DB to back up.
// - authClient is a client authenticated with auth.SCOPE_READ_WRITE.
func NewDBBackup(ctx context.Context, gsBucket string, db db.BackupDBCloser, name string, workdir string, authClient *http.Client) (DBBackup, error) {
gsClient, err := storage.NewClient(ctx, option.WithHTTPClient(authClient))
if err != nil {
return nil, err
b, err := newGsDbBackupWithClient(ctx, gsBucket, db, name, workdir, gsClient)
if err != nil {
return nil, err
go util.RepeatCtx(10*time.Minute, b.ctx, b.updateMetrics)
go util.RepeatCtx(10*time.Second, b.ctx, func() {
if err := b.incrementalBackupStep(time.Now()); err != nil {
sklog.Errorf("Incremental Job backup failed: %s", err)
return b, nil
// newGsDbBackupWithClient is the same as NewDBBackup but takes a GCS client for
// testing and does not start the metrics goroutine or the incremental backup
// goroutine.
func newGsDbBackupWithClient(ctx context.Context, gsBucket string, db db.BackupDBCloser, name string, workdir string, gsClient *storage.Client) (*gsDBBackup, error) {
modJobsId, err := db.StartTrackingModifiedJobs()
if err != nil {
return nil, err
metricTags := map[string]string{
"database": name,
b := &gsDBBackup{
gsBucket: gsBucket,
gsClient: gsClient,
db: db,
ctx: ctx,
triggerDir: path.Join(workdir, TRIGGER_DIRNAME),
modifiedJobsId: modJobsId,
lastDBBackupLiveness: metrics2.NewLiveness("last_db_backup", metricTags),
recentDBBackupCount: metrics2.GetInt64Metric("recent_db_backup_count", metricTags),
maybeBackupDBLiveness: metrics2.NewLiveness("db_backup_maybe_backup_db", metricTags),
jobBackupCount: metrics2.GetCounter("incremental_job_backup", metricTags),
incrementalBackupLiveness: metrics2.NewLiveness("incremental_backup", metricTags),
incrementalBackupResetCount: metrics2.GetCounter("incremental_backup_reset", metricTags),
// Release resources when done.
go func() {
// TODO(benjaminwagner): Liveness doesn't have a Delete method.
//if err := b.lastDBBackupLiveness.Delete(); err != nil {
// sklog.Error(err)
if err := b.recentDBBackupCount.Delete(); err != nil {
// TODO(benjaminwagner): Liveness doesn't have a Delete method.
//if err := b.maybeBackupDBLiveness.Delete(); err != nil {
// sklog.Error(err)
if err := b.jobBackupCount.Delete(); err != nil {
// TODO(benjaminwagner): Liveness doesn't have a Delete method.
//if err := b.incrementalBackupLiveness.Delete(); err != nil {
// sklog.Error(err)
if err := b.incrementalBackupResetCount.Delete(); err != nil {
return b, nil
// getBackupMetrics returns the Updated time of the most recent DB backup and
// the number of backups in the last 24 hours, or the zero time if no backups
// exist. Does not return an error unless the request could not be completed.
func (b *gsDBBackup) getBackupMetrics(now time.Time) (time.Time, int64, error) {
lastTime := time.Time{}
var count int64 = 0
countAfter := now.Add(-24 * time.Hour)
err := gcs.AllFilesInDir(b.gsClient, b.gsBucket, DB_BACKUP_DIR, func(item *storage.ObjectAttrs) {
if item.Updated.After(lastTime) {
lastTime = item.Updated
if item.Updated.After(countAfter) {
return lastTime, count, err
// updateMetrics updates the metrics for the time since last successful backup
// and number of backups in the last 24 hours.
func (b *gsDBBackup) updateMetrics() {
last, count, err := b.getBackupMetrics(time.Now())
if err != nil {
sklog.Errorf("Failed to get DB backup metrics: %s", err)
sklog.Infof("Last DB backup was %s.", last)
// writeDBBackupToFile creates filename and writes the DB to it. File may be
// written even if an error is returned.
func (b *gsDBBackup) writeDBBackupToFile(filename string) error {
fileW, err := os.Create(filename)
if err != nil {
return fmt.Errorf("Could not create temp file to write DB backup: %s", err)
defer func() {
// We set fileW to nil when we manually close it below.
if fileW != nil {
// TODO(benjaminwagner): Start WriteBackup in a goroutine, close fileW on
// b.ctx.Done().
if err := b.db.WriteBackup(fileW); err != nil {
return err
err, fileW = fileW.Close(), nil
return err
// uploadFile gzips and writes the given file as the given object name to GCS.
func uploadFile(ctx context.Context, filename string, bucket *storage.BucketHandle, objectname string, modTime time.Time) (err error) {
fileR, openErr := os.Open(filename)
if openErr != nil {
return fmt.Errorf("Unable to read temporary backup file: %s", err)
// If we are able to successfully read temp file until EOF, we don't
// care if Close returns an error.
defer util.Close(fileR)
return upload(ctx, fileR, bucket, objectname, modTime)
// upload gzips and writes the given content as the given object name to GCS.
func upload(ctx context.Context, content io.Reader, bucket *storage.BucketHandle, objectname string, modTime time.Time) (err error) {
objW := bucket.Object(objectname).NewWriter(ctx)
basename := path.Base(objectname)
objW.ObjectAttrs.ContentType = "application/octet-stream"
objW.ObjectAttrs.ContentDisposition = fmt.Sprintf("attachment; filename=\"%s\"", basename)
objW.ObjectAttrs.ContentEncoding = "gzip"
if err := util.WithGzipWriter(objW, func(gzW io.Writer) error {
gzW.(*gzip.Writer).Header.Name = basename
gzW.(*gzip.Writer).Header.ModTime = modTime.UTC()
if _, err = io.Copy(gzW, content); err != nil {
return err
return nil
}); err != nil {
_ = objW.CloseWithError(err)
return err
return objW.Close()
// backupDB performs an immediate backup of b.db, using the given name as the
// base filename.
func (b *gsDBBackup) backupDB(now time.Time, basename string) (err error) {
// We expect TMPDIR to be set to a location that can store a large file at
// high throughput.
tempdir, err := ioutil.TempDir("", "dbbackup")
if err != nil {
return err
defer util.RemoveAll(tempdir)
tempfilename := path.Join(tempdir, fmt.Sprintf("%s.%s", basename, DB_FILE_NAME_EXTENSION))
modTime, err := b.db.GetIncrementalBackupTime()
if err != nil {
sklog.Warningf("Error getting DB incremental backup time; using current time instead. %s", err)
modTime = now
if err := b.writeDBBackupToFile(tempfilename); err != nil {
return err
bucket := b.gsClient.Bucket(b.gsBucket)
objectname := fmt.Sprintf("%s/%s/%s.%s", DB_BACKUP_DIR, now.UTC().Format("2006/01/02"), basename, DB_FILE_NAME_EXTENSION)
if err := uploadFile(b.ctx, tempfilename, bucket, objectname, modTime); err != nil {
return err
return nil
// immediateBackupBasename creates a base filename for backupDB that is unlikely
// to conflict with other backups.
func immediateBackupBasename(now time.Time) string {
return "task-scheduler-" + now.UTC().Format("15:04:05")
// See documentation for DBBackup.ImmediateBackup.
func (b *gsDBBackup) ImmediateBackup() error {
sklog.Infof("Beginning manual DB backup.")
now := time.Now()
return b.backupDB(now, immediateBackupBasename(now))
// findAndParseTriggerFile returns the base filename for the first file in
// triggerDir and the number of times backupDB has failed for this trigger. For
// empty files written by the systemd service task-scheduler-db-backup.service,
// returns the filename and 0.
func (b *gsDBBackup) findAndParseTriggerFile() (string, int, error) {
dir, err := os.Open(b.triggerDir)
if err != nil {
return "", 0, fmt.Errorf("Unable to read trigger directory %s: %s", b.triggerDir, err)
defer util.Close(dir)
files, err := dir.Readdirnames(1)
if err == io.EOF {
return "", 0, nil
} else if err != nil {
return "", 0, fmt.Errorf("Unable to list trigger directory %s: %s", b.triggerDir, err)
basename := files[0]
filename := path.Join(b.triggerDir, basename)
content, err := ioutil.ReadFile(filename)
if err != nil {
return "", 0, fmt.Errorf("Unable to read trigger file %s: %s", filename, err)
trimmed := bytes.TrimSpace(content)
retries := 0
if len(trimmed) > 0 {
retries, err = strconv.Atoi(string(trimmed))
if err != nil {
return "", 0, fmt.Errorf("Unable to parse trigger file %s: %s. Full content: %q", filename, err, string(content))
return basename, retries, nil
// writeTriggerFile writes to the given trigger file indicating that the given
// number of backupDB attempts have failed.
func (b *gsDBBackup) writeTriggerFile(basename string, retries int) error {
filename := path.Join(b.triggerDir, basename)
content := []byte(strconv.Itoa(retries))
if err := ioutil.WriteFile(filename, content, 0666); err != nil {
return fmt.Errorf("Unable to write new retry count (%d) to trigger file %s: %s", retries, filename, err)
return nil
// deleteTriggerFile removes the given trigger file indicating that the backup
// succeeded or retries are exhausted.
func (b *gsDBBackup) deleteTriggerFile(basename string) error {
filename := path.Join(b.triggerDir, basename)
if err := os.Remove(filename); err != nil {
return fmt.Errorf("Unable to remove trigger file %s: %s", filename, err)
return nil
// maybeBackupDB calls backupDB if TRIGGER_DIRNAME contains a file.
func (b *gsDBBackup) maybeBackupDB(now time.Time) {
// Look for a trigger file written by task-scheduler-db-backup.service
// or a previous automatic backup attempt.
basename, attemptCount, err := b.findAndParseTriggerFile()
if err != nil {
if basename == "" {
if attemptCount == 1 {
sklog.Infof("Beginning automatic DB backup.")
} else {
sklog.Infof("Retrying automatic DB backup -- attempt %d.", attemptCount)
if err := b.backupDB(now, basename); err != nil {
sklog.Errorf("Automatic DB backup failed: %s", err)
if attemptCount >= RETRY_COUNT {
sklog.Errorf("Automatic DB backup failed after %d attempts. Retries exhausted.", attemptCount)
if err := b.deleteTriggerFile(basename); err != nil {
} else {
if err := b.writeTriggerFile(basename, attemptCount); err != nil {
} else {
sklog.Infof("Completed automatic DB backup.")
if err := b.deleteTriggerFile(basename); err != nil {
// See documentation for DBBackup.Tick.
func (b *gsDBBackup) Tick() {
now := time.Now()
// TODO(benjaminwagner): Tick should return as soon as the DB file is written.
// formatJobObjectName returns the GCS object name for a Job with the given id
// being uploaded at the given time.
func formatJobObjectName(ts time.Time, id string) string {
return fmt.Sprintf("%s/%s/%s.%s", JOB_BACKUP_DIR, ts.UTC().Format("2006/01/02"), id, JOB_FILE_NAME_EXTENSION)
// parseIdFromJobObjectName returns the Job ID from a GCS object name formatted
// with formatJobObjectName.
func parseIdFromJobObjectName(name string) string {
return strings.TrimSuffix(path.Base(name), "."+JOB_FILE_NAME_EXTENSION)
// backupJob writes the given bytes to GCS under the given Job id.
func (b *gsDBBackup) backupJob(now time.Time, id string, jobGob []byte) error {
bucket := b.gsClient.Bucket(b.gsBucket)
return upload(b.ctx, bytes.NewReader(jobGob), bucket, formatJobObjectName(now, id), now)
// incrementalBackupStep writes all recently modified Jobs to GCS.
func (b *gsDBBackup) incrementalBackupStep(now time.Time) error {
jobs, err := b.db.GetModifiedJobsGOB(b.modifiedJobsId)
if db.IsUnknownId(err) {
sklog.Errorf("incrementalBackupStep too slow; GetModifiedJobsGOB expired id: %s", b.modifiedJobsId)
id, startErr := b.db.StartTrackingModifiedJobs()
if startErr != nil {
return startErr
b.modifiedJobsId = id
// Since we just started tracking, there's nothing to do.
// TODO(benjaminwagner): Ideally, we should scan the JobCache for Jobs whose
// DbModified time is after b.db.GetIncrementalBackupTime() and call
// backupJob for each of them.
return err
} else if err != nil {
return err
errs := []error{}
for id, jobGob := range jobs {
// TODO(benjaminwagner): Use goroutines.
if err := b.backupJob(now, id, jobGob); err != nil {
// We still want to process the remaining jobs.
errs = append(errs, err)
if len(errs) == 0 {
if err := b.db.SetIncrementalBackupTime(now); err != nil {
return err
return nil
} else if len(errs) == 1 {
return errs[0]
} else {
errStr := &bytes.Buffer{}
fmt.Fprint(errStr, "Multiple errors performing incremental Job backups:")
for _, err := range errs {
fmt.Fprint(errStr, "\n", err.Error())
return errors.New(errStr.String())
// downloadGOB reads and GOB-decodes the given object from GCS.
func downloadGOB(ctx context.Context, bucket *storage.BucketHandle, objectname string, dst interface{}) error {
objR, err := bucket.Object(objectname).NewReader(ctx)
if err != nil {
return err
// As long as we can decode the object, we don't care if Close returns an
// error.
defer util.Close(objR)
// GCS will transparently decompress gzip unless the client specifies
// "Accept-Encoding: gzip". Unfortunately, the Go GCS client library does not
// provide a way to specify Accept-Encoding, so the file will be decompressed
// on the server side. See
if err := gob.NewDecoder(objR).Decode(dst); err != nil {
return fmt.Errorf("Error decoding GOB data: %s", err)
return nil
// RetrieveJobs implements DBBackup.RetrieveJobs for gsDBBackup.
func RetrieveJobs(ctx context.Context, since time.Time, gsClient *storage.Client, gsBucket string) (map[string]*db.Job, error) {
sinceDir := path.Dir(formatJobObjectName(since, "dummy")) + "/"
bucket := gsClient.Bucket(gsBucket)
rv := map[string]*db.Job{}
// Iterate from today backwards to sinceDir.
for t := time.Now(); ; t = t.Add(-24 * time.Hour) {
curDir := path.Dir(formatJobObjectName(t, "dummy")) + "/"
if curDir < sinceDir {
q := &storage.Query{Prefix: curDir, Versions: false}
it := bucket.Objects(ctx, q)
for obj, err := it.Next(); err != iterator.Done; obj, err = it.Next() {
if err != nil {
return nil, fmt.Errorf("Unable to list jobs in %s/%s: %s", gsBucket, curDir, err)
if obj.Updated.Before(since) {
// If rv already contains this Job, it is newer than this version, so
// skip.
id := parseIdFromJobObjectName(obj.Name)
if _, ok := rv[id]; ok {
// TODO(benjaminwagner): Download and decode in parallel.
var job db.Job
if err := downloadGOB(ctx, bucket, obj.Name, &job); err != nil {
return nil, fmt.Errorf("Unable to read %s/%s: %s", gsBucket, obj.Name, err)
rv[job.Id] = &job
return rv, nil
// See docs for DBBackup interface.
func (b *gsDBBackup) RetrieveJobs(since time.Time) (map[string]*db.Job, error) {
return RetrieveJobs(b.ctx, since, b.gsClient, b.gsBucket)