package recovery
import (
"bytes"
"compress/gzip"
"context"
"encoding/gob"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"math/rand"
"mime"
"mime/multipart"
"net/http"
"net/url"
"os"
"path"
"reflect"
"regexp"
"strconv"
"strings"
"testing"
"time"
"cloud.google.com/go/storage"
"github.com/gorilla/mux"
assert "github.com/stretchr/testify/require"
"go.skia.org/infra/go/deepequal"
"go.skia.org/infra/go/exec"
exec_testutils "go.skia.org/infra/go/exec/testutils"
"go.skia.org/infra/go/mockhttpclient"
"go.skia.org/infra/go/sklog"
"go.skia.org/infra/go/testutils"
"go.skia.org/infra/go/util"
"go.skia.org/infra/task_scheduler/go/db"
memory "go.skia.org/infra/task_scheduler/go/db/memory"
"go.skia.org/infra/task_scheduler/go/types"
"google.golang.org/api/option"
)
const (
TEST_BUCKET = "skia-test"
TEST_DB_CONTENT = `
I'm a little database
Short and stout!
Here is my file handle
Here is my timeout.
When I get all locked up,
Hear me shout!
Make a backup
And write it out!
`
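	// TEST_DB_TIME is the Unix timestamp used as the initial incremental
	// backup time in testDB; TEST_DB_CONTENT_SEED seeds the pseudorandom
	// content generated by makeLargeDBContent.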
TEST_DB_TIME = 1477000000
TEST_DB_CONTENT_SEED = 299792
	// If metrics2.Liveness.Get returns a value greater than
	// MAX_TEST_TIME_SECONDS, we know it wasn't reset during the test.
MAX_TEST_TIME_SECONDS = 15 * 60
)
// makeLargeDBContent returns an io.Reader that produces the given number of
// deterministic pseudorandom bytes, seeded with TEST_DB_CONTENT_SEED.
func makeLargeDBContent(size int64) io.Reader {
	// *rand.Rand implements io.Reader, producing an endless pseudorandom
	// stream; LimitedReader caps it at size bytes.
	r := rand.New(rand.NewSource(TEST_DB_CONTENT_SEED))
	return &io.LimitedReader{
		R: r,
		N: size,
	}
}
// testDB implements db.BackupDBCloser.
type testDB struct {
db.DB
content io.Reader
ts time.Time
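	// If non-nil, the corresponding method returns this error instead of
	// performing its normal behavior.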
injectSetTSError error
injectGetTSError error
injectWriteError error
}
// Close closes tdb.content if it implements io.Closer.
func (tdb *testDB) Close() error {
closer, ok := tdb.content.(io.Closer)
if ok {
return closer.Close()
}
return nil
}
// Implements BackupDBCloser.WriteBackup.
func (tdb *testDB) WriteBackup(w io.Writer) error {
defer util.Close(tdb) // close tdb.content
if tdb.injectWriteError != nil {
return tdb.injectWriteError
}
_, err := io.Copy(w, tdb.content)
return err
}
// Implements BackupDBCloser.SetIncrementalBackupTime.
func (tdb *testDB) SetIncrementalBackupTime(ts time.Time) error {
if tdb.injectSetTSError != nil {
return tdb.injectSetTSError
}
tdb.ts = ts
return nil
}
// Implements BackupDBCloser.GetIncrementalBackupTime.
func (tdb *testDB) GetIncrementalBackupTime() (time.Time, error) {
if tdb.injectGetTSError != nil {
return time.Time{}, tdb.injectGetTSError
}
return tdb.ts.UTC(), nil
}
// getMockedDBBackup returns a gsDBBackup that handles GCS requests with mockMux.
// If mockMux is nil, an empty mux.Router is used. WriteBackup will write
// TEST_DB_CONTENT.
func getMockedDBBackup(t *testing.T, mockMux *mux.Router) (*gsDBBackup, context.CancelFunc) {
return getMockedDBBackupWithContent(t, mockMux, bytes.NewReader([]byte(TEST_DB_CONTENT)))
}
// getMockedDBBackupWithContent is like getMockedDBBackup but WriteBackup will
// copy the given content.
func getMockedDBBackupWithContent(t *testing.T, mockMux *mux.Router, content io.Reader) (*gsDBBackup, context.CancelFunc) {
if mockMux == nil {
mockMux = mux.NewRouter()
}
ctx, ctxCancel := context.WithCancel(context.Background())
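	// Route all GCS requests to mockMux rather than the real API.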
gsClient, err := storage.NewClient(ctx, option.WithHTTPClient(mockhttpclient.NewMuxClient(mockMux)))
assert.NoError(t, err)
dir, err := ioutil.TempDir("", "getMockedDBBackupWithContent")
assert.NoError(t, err)
assert.NoError(t, os.MkdirAll(path.Join(dir, TRIGGER_DIRNAME), os.ModePerm))
db := &testDB{
DB: memory.NewInMemoryDB(nil),
content: content,
ts: time.Unix(TEST_DB_TIME, 0),
}
b, err := newGsDbBackupWithClient(ctx, TEST_BUCKET, db, "task_scheduler_db", dir, gsClient)
assert.NoError(t, err)
return b, func() {
ctxCancel()
testutils.RemoveAll(t, dir)
}
}
// object represents a GCS object for makeObjectResponse and makeObjectsResponse.
type object struct {
bucket string
name string
time time.Time
}
// makeObjectResponse generates the JSON representation of a GCS object.
func makeObjectResponse(obj object) string {
timeStr := obj.time.UTC().Format(time.RFC3339)
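	// Aside from id, name, bucket, and the timestamps, all fields are fixed
	// placeholder values.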
return fmt.Sprintf(`{
"kind": "storage#object",
"id": "%s/%s",
"name": "%s",
"bucket": "%s",
"generation": "1",
"metageneration": "1",
"timeCreated": "%s",
"updated": "%s",
"storageClass": "STANDARD",
"size": "15",
"md5Hash": "d8dh5MIGdPoMfh/owveXhA==",
"crc32c": "Oz54cA==",
"etag": "CLD56dvBp8oCEAE="
}`, obj.bucket, obj.name, obj.name, obj.bucket, timeStr, timeStr)
}
// makeObjectsResponse generates the JSON representation of an array of GCS
// objects.
func makeObjectsResponse(objs []object) string {
jsObjs := make([]string, 0, len(objs))
for _, o := range objs {
jsObjs = append(jsObjs, makeObjectResponse(o))
}
return fmt.Sprintf(`{
"kind": "storage#objects",
"items": [
%s
]
}`, strings.Join(jsObjs, ",\n"))
}
// gsRoute returns the mux.Route for the GCS server.
func gsRoute(mockMux *mux.Router) *mux.Route {
return mockMux.Schemes("https").Host("www.googleapis.com")
}
// addListObjectsHandler causes r to respond to a request to list objects in
// TEST_BUCKET/prefix with the given objects, formatted with
// makeObjectsResponse.
func addListObjectsHandler(t *testing.T, r *mux.Router, prefix string, objs []object) {
gsRoute(r).Methods("GET").
Path(fmt.Sprintf("/storage/v1/b/%s/o", TEST_BUCKET)).
Queries("prefix", prefix).
Handler(mockhttpclient.MockGetDialogue([]byte(makeObjectsResponse(objs))))
}
// getBackupMetrics should return zero time and zero count when there are no
// existing backups.
func TestGetBackupMetricsNoFiles(t *testing.T) {
testutils.SmallTest(t)
now := time.Now()
r := mux.NewRouter()
addListObjectsHandler(t, r, DB_BACKUP_DIR, []object{})
b, cancel := getMockedDBBackup(t, r)
defer cancel()
ts, count, err := b.getBackupMetrics(now)
assert.NoError(t, err)
assert.True(t, ts.IsZero())
assert.Equal(t, int64(0), count)
}
// getBackupMetrics should return the time of the latest object when there are
// multiple.
func TestGetBackupMetricsTwoFiles(t *testing.T) {
testutils.SmallTest(t)
now := time.Now().Round(time.Second)
r := mux.NewRouter()
addListObjectsHandler(t, r, DB_BACKUP_DIR, []object{
{TEST_BUCKET, "a", now.Add(-1 * time.Hour).UTC()},
{TEST_BUCKET, "b", now.Add(-2 * time.Hour).UTC()},
})
b, cancel := getMockedDBBackup(t, r)
defer cancel()
ts, count, err := b.getBackupMetrics(now)
assert.NoError(t, err)
assert.True(t, ts.Equal(now.Add(-1*time.Hour)), "Expected %s, got %s", now.Add(-1*time.Hour), ts)
assert.Equal(t, int64(2), count)
}
// getBackupMetrics should not count objects that were not modified recently
// (here, within roughly the last day).
func TestGetBackupMetricsSeveralDays(t *testing.T) {
testutils.SmallTest(t)
now := time.Now().Round(time.Second)
r := mux.NewRouter()
addListObjectsHandler(t, r, DB_BACKUP_DIR, []object{
{TEST_BUCKET, "a", now.Add(-49 * time.Hour).UTC()},
{TEST_BUCKET, "b", now.Add(-25 * time.Hour).UTC()},
{TEST_BUCKET, "c", now.Add(-1 * time.Hour).UTC()},
})
b, cancel := getMockedDBBackup(t, r)
defer cancel()
ts, count, err := b.getBackupMetrics(now)
assert.NoError(t, err)
assert.True(t, ts.Equal(now.Add(-1*time.Hour)))
assert.Equal(t, int64(1), count)
}
// getBackupMetrics should return the latest backup time even if it is far in
// the past.
func TestGetBackupMetricsOld(t *testing.T) {
testutils.SmallTest(t)
now := time.Now().Round(time.Second)
r := mux.NewRouter()
addListObjectsHandler(t, r, DB_BACKUP_DIR, []object{
{TEST_BUCKET, "a", now.Add(-49 * time.Hour).UTC()},
{TEST_BUCKET, "b", now.Add(-128 * time.Hour).UTC()},
{TEST_BUCKET, "c", now.Add(-762 * time.Hour).UTC()},
})
b, cancel := getMockedDBBackup(t, r)
defer cancel()
ts, count, err := b.getBackupMetrics(now)
assert.NoError(t, err)
assert.True(t, ts.Equal(now.Add(-49*time.Hour)))
assert.Equal(t, int64(0), count)
}
// writeDBBackupToFile should produce a file with contents equal to what
// WriteBackup wrote.
func TestWriteDBBackupToFile(t *testing.T) {
testutils.SmallTest(t)
b, cancel := getMockedDBBackup(t, nil)
defer cancel()
tempdir, err := ioutil.TempDir("", "backups_test")
assert.NoError(t, err)
defer testutils.RemoveAll(t, tempdir)
filename := path.Join(tempdir, "foo.bdb")
err = b.writeDBBackupToFile(filename)
assert.NoError(t, err)
actualContents, err := ioutil.ReadFile(filename)
assert.NoError(t, err)
assert.Equal(t, TEST_DB_CONTENT, string(actualContents))
}
// writeDBBackupToFile should succeed even if GetIncrementalBackupTime returns
// an error.
func TestWriteDBBackupToFileGetIncrementalBackupTimeError(t *testing.T) {
testutils.SmallTest(t)
b, cancel := getMockedDBBackup(t, nil)
defer cancel()
tempdir, err := ioutil.TempDir("", "backups_test")
assert.NoError(t, err)
defer testutils.RemoveAll(t, tempdir)
injectedError := fmt.Errorf("Not giving you the time of day!")
// This should not prevent DB from being backed up.
b.db.(*testDB).injectGetTSError = injectedError
filename := path.Join(tempdir, "foo.bdb")
err = b.writeDBBackupToFile(filename)
assert.NoError(t, err)
actualContents, err := ioutil.ReadFile(filename)
assert.NoError(t, err)
assert.Equal(t, TEST_DB_CONTENT, string(actualContents))
}
// writeDBBackupToFile could fail due to disk error.
func TestWriteDBBackupToFileCreateError(t *testing.T) {
testutils.SmallTest(t)
b, cancel := getMockedDBBackup(t, nil)
defer cancel()
tempdir, err := ioutil.TempDir("", "backups_test")
assert.NoError(t, err)
defer testutils.RemoveAll(t, tempdir)
filename := path.Join(tempdir, "nonexistent_dir", "foo.bdb")
err = b.writeDBBackupToFile(filename)
assert.Error(t, err)
assert.Contains(t, err.Error(), "Could not create temp file to write DB backup")
}
// writeDBBackupToFile could fail due to DB error.
func TestWriteDBBackupToFileDBError(t *testing.T) {
testutils.SmallTest(t)
b, cancel := getMockedDBBackup(t, nil)
defer cancel()
tempdir, err := ioutil.TempDir("", "backups_test")
assert.NoError(t, err)
defer testutils.RemoveAll(t, tempdir)
injectedError := fmt.Errorf("Can't back up: unable to shift to reverse.")
b.db.(*testDB).injectWriteError = injectedError
filename := path.Join(tempdir, "foo.bdb")
err = b.writeDBBackupToFile(filename)
assert.Error(t, err)
assert.Contains(t, err.Error(), injectedError.Error())
}
// addMultipartHandler causes r to respond to a request to add an object to
// TEST_BUCKET with a successful response and sets actualBytesGzip[object_name]
// to the uploaded (still gzip-compressed) object contents. Also performs
// assertions on the request.
func addMultipartHandler(t *testing.T, r *mux.Router, actualBytesGzip map[string][]byte) {
gsRoute(r).Methods("POST").Path(fmt.Sprintf("/upload/storage/v1/b/%s/o", TEST_BUCKET)).
Queries("uploadType", "multipart").
HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
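			// A GCS multipart upload request body has two parts: JSON object
			// metadata followed by the raw object data.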
t := mockhttpclient.MuxSafeT(t)
mediaType, params, err := mime.ParseMediaType(r.Header.Get("Content-Type"))
assert.NoError(t, err)
assert.Equal(t, "multipart/related", mediaType)
mr := multipart.NewReader(r.Body, params["boundary"])
jsonPart, err := mr.NextPart()
assert.NoError(t, err)
data := map[string]string{}
assert.NoError(t, json.NewDecoder(jsonPart).Decode(&data))
name := data["name"]
assert.Equal(t, TEST_BUCKET, data["bucket"])
assert.Equal(t, "application/octet-stream", data["contentType"])
assert.Equal(t, "gzip", data["contentEncoding"])
assert.Equal(t, fmt.Sprintf("attachment; filename=\"%s\"", path.Base(name)), data["contentDisposition"])
dataPart, err := mr.NextPart()
assert.NoError(t, err)
actualBytesGzip[name], err = ioutil.ReadAll(dataPart)
assert.NoError(t, err)
_, _ = w.Write([]byte(makeObjectResponse(object{TEST_BUCKET, name, time.Now()})))
})
}
// upload should upload data to GCS.
func TestUpload(t *testing.T) {
testutils.SmallTest(t)
now := time.Now().Round(time.Second)
r := mux.NewRouter()
actualBytesGzip := map[string][]byte{}
addMultipartHandler(t, r, actualBytesGzip)
b, cancel := getMockedDBBackup(t, r)
defer cancel()
name := "path/to/gsfile.txt"
err := upload(b.ctx, strings.NewReader(TEST_DB_CONTENT), b.gsClient.Bucket(b.gsBucket), name, now)
assert.NoError(t, err)
gzR, err := gzip.NewReader(bytes.NewReader(actualBytesGzip[name]))
assert.NoError(t, err)
assert.True(t, now.Equal(gzR.Header.ModTime))
assert.Equal(t, "gsfile.txt", gzR.Header.Name)
actualBytes, err := ioutil.ReadAll(gzR)
assert.NoError(t, err)
assert.NoError(t, gzR.Close())
assert.Equal(t, TEST_DB_CONTENT, string(actualBytes))
}
// upload may fail if the GCS request fails.
func TestUploadError(t *testing.T) {
testutils.SmallTest(t)
now := time.Now()
r := mux.NewRouter()
name := "path/to/gsfile.txt"
gsRoute(r).Methods("POST").
Path(fmt.Sprintf("/upload/storage/v1/b/%s/o", TEST_BUCKET)).
Queries("uploadType", "multipart").
HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
util.Close(r.Body)
http.Error(w, "I don't like your poem.", http.StatusTeapot)
})
b, cancel := getMockedDBBackup(t, r)
defer cancel()
err := upload(b.ctx, strings.NewReader(TEST_DB_CONTENT), b.gsClient.Bucket(b.gsBucket), name, now)
assert.Error(t, err)
assert.Contains(t, err.Error(), "got HTTP response code 418 with body: I don't like your poem.")
}
// uploadFile should upload a file to GCS.
func TestUploadFile(t *testing.T) {
testutils.SmallTest(t)
tempdir, err := ioutil.TempDir("", "backups_test")
assert.NoError(t, err)
defer testutils.RemoveAll(t, tempdir)
filename := path.Join(tempdir, "myfile.txt")
assert.NoError(t, ioutil.WriteFile(filename, []byte(TEST_DB_CONTENT), os.ModePerm))
now := time.Now().Round(time.Second)
r := mux.NewRouter()
actualBytesGzip := map[string][]byte{}
addMultipartHandler(t, r, actualBytesGzip)
b, cancel := getMockedDBBackup(t, r)
defer cancel()
name := "path/to/gsfile.txt"
err = uploadFile(b.ctx, filename, b.gsClient.Bucket(b.gsBucket), name, now)
assert.NoError(t, err)
gzR, err := gzip.NewReader(bytes.NewReader(actualBytesGzip[name]))
assert.NoError(t, err)
assert.True(t, now.Equal(gzR.Header.ModTime))
assert.Equal(t, "gsfile.txt", gzR.Header.Name)
actualBytes, err := ioutil.ReadAll(gzR)
assert.NoError(t, err)
assert.NoError(t, gzR.Close())
assert.Equal(t, TEST_DB_CONTENT, string(actualBytes))
}
// uploadFile may fail if the file doesn't exist.
func TestUploadFileNoFile(t *testing.T) {
testutils.SmallTest(t)
tempdir, err := ioutil.TempDir("", "backups_test")
assert.NoError(t, err)
defer testutils.RemoveAll(t, tempdir)
filename := path.Join(tempdir, "myfile.txt")
b, cancel := getMockedDBBackup(t, nil)
defer cancel()
now := time.Now()
name := "path/to/gsfile.txt"
err = uploadFile(b.ctx, filename, b.gsClient.Bucket(b.gsBucket), name, now)
assert.Error(t, err)
assert.Contains(t, err.Error(), "Unable to read temporary backup file")
}
// backupDB should create a GCS object with the gzipped contents of the DB.
func TestBackupDB(t *testing.T) {
testutils.SmallTest(t)
var expectedBytes []byte
{
// Get expectedBytes from writeDBBackupToFile.
b, cancel := getMockedDBBackup(t, nil)
defer cancel()
tempdir, err := ioutil.TempDir("", "backups_test")
assert.NoError(t, err)
defer testutils.RemoveAll(t, tempdir)
filename := path.Join(tempdir, "expected.bdb")
err = b.writeDBBackupToFile(filename)
assert.NoError(t, err)
expectedBytes, err = ioutil.ReadFile(filename)
assert.NoError(t, err)
}
now := time.Now()
r := mux.NewRouter()
actualBytesGzip := map[string][]byte{}
addMultipartHandler(t, r, actualBytesGzip)
b, cancel := getMockedDBBackup(t, r)
defer cancel()
// Test resetting incrementalBackupResetCount.
b.incrementalBackupResetCount.Inc(1)
err := b.backupDB(now, "task-scheduler")
assert.NoError(t, err)
name := DB_BACKUP_DIR + "/" + now.UTC().Format("2006/01/02") + "/task-scheduler.bdb"
gzR, err := gzip.NewReader(bytes.NewReader(actualBytesGzip[name]))
assert.NoError(t, err)
actualBytes, err := ioutil.ReadAll(gzR)
assert.NoError(t, err)
assert.NoError(t, gzR.Close())
assert.Equal(t, expectedBytes, actualBytes)
// incrementalBackupResetCount should be reset.
assert.Equal(t, int64(0), b.incrementalBackupResetCount.Get())
}
// testBackupDBLarge tests backupDB for DB contents larger than 8MB.
func testBackupDBLarge(t *testing.T, contentSize int64) {
now := time.Now()
r := mux.NewRouter()
name := DB_BACKUP_DIR + "/" + now.UTC().Format("2006/01/02") + "/task-scheduler.bdb"
// https://cloud.google.com/storage/docs/json_api/v1/how-tos/resumable-upload
uploadId := "resume_me_please"
gsRoute(r).Methods("POST").Path(fmt.Sprintf("/upload/storage/v1/b/%s/o", TEST_BUCKET)).
Queries("uploadType", "resumable").
Headers("Content-Type", "application/json").
HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
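			// The initiation request carries the object metadata as JSON; the
			// response's Location header gives the session URI that the client
			// uses for the subsequent chunk uploads.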
t := mockhttpclient.MuxSafeT(t)
data := map[string]string{}
assert.NoError(t, json.NewDecoder(r.Body).Decode(&data))
assert.Equal(t, TEST_BUCKET, data["bucket"])
assert.Equal(t, name, data["name"])
assert.Equal(t, "application/octet-stream", data["contentType"])
assert.Equal(t, "gzip", data["contentEncoding"])
assert.Equal(t, "attachment; filename=\"task-scheduler.bdb\"", data["contentDisposition"])
uploadUrl, err := url.Parse(r.URL.String())
assert.NoError(t, err)
query := uploadUrl.Query()
query.Set("upload_id", uploadId)
uploadUrl.RawQuery = query.Encode()
w.Header().Set("Location", uploadUrl.String())
})
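	// Chunk uploads send a Content-Range header such as "bytes 0-8388607/*"
	// for intermediate chunks and "bytes 8388608-12582912/12582913" for the
	// final chunk.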
rangeRegexp := regexp.MustCompile(`bytes ([0-9]+|\*)-?([0-9]+)?/([0-9]+|\*)`)
var recvBytes int64 = 0
complete := false
// Despite what the documentation says, the Go client uses POST, not PUT.
gsRoute(r).Methods("POST").Path(fmt.Sprintf("/upload/storage/v1/b/%s/o", TEST_BUCKET)).
Queries("uploadType", "resumable", "upload_id", uploadId).
Headers("Content-Type", "application/octet-stream").
HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
t := mockhttpclient.MuxSafeT(t)
byteRange := rangeRegexp.FindStringSubmatch(r.Header.Get("Content-Range"))
assert.Equal(t, 4, len(byteRange), "Unexpected request %v %s", r.Header, r.URL.String())
assert.NotEqual(t, "*", byteRange[1], "Test does not support upload size that is a multiple of 8MB.")
begin, err := strconv.ParseInt(byteRange[1], 10, 64)
assert.NoError(t, err)
assert.Equal(t, recvBytes, begin)
end, err := strconv.ParseInt(byteRange[2], 10, 64)
assert.NoError(t, err)
finalChunk := false
if byteRange[3] != "*" {
size, err := strconv.ParseInt(byteRange[3], 10, 64)
assert.NoError(t, err)
finalChunk = size == end+1
}
recvBytes += end - begin + 1
if finalChunk {
complete = true
_, _ = w.Write([]byte(makeObjectResponse(object{TEST_BUCKET, name, time.Now()})))
} else {
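				// Acknowledge the bytes received so far; the 308 response
				// prompts the client to send the next chunk.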
w.Header().Set("Range", fmt.Sprintf("0-%d", recvBytes-1))
// https://github.com/google/google-api-go-client/commit/612451d2aabbf88084e4f1c48c0781073c0d5583
w.Header().Set("X-HTTP-Status-Code-Override", "308")
w.WriteHeader(200)
}
})
b, cancel := getMockedDBBackupWithContent(t, r, makeLargeDBContent(contentSize))
defer cancel()
// Check available disk space.
output, err := exec.RunCommand(context.Background(), &exec.Command{
Name: "df",
Args: []string{"--block-size=1", "--output=avail", os.TempDir()},
})
assert.NoError(t, err, "df failed: %s", output)
// Output looks like:
// Avail
// 13704458240
availSize, err := strconv.ParseInt(strings.TrimSpace(strings.Split(output, "\n")[1]), 10, 64)
assert.NoError(t, err, "Unable to parse df output: %s", output)
assert.True(t, availSize > contentSize, "Insufficient disk space to run test; need %d bytes, have %d bytes for %s. Please set TMPDIR.", contentSize, availSize, os.TempDir())
err = b.backupDB(now, "task-scheduler")
assert.NoError(t, err)
assert.True(t, complete)
}
// backupDB should work for a large-ish DB.
func TestBackupDBLarge(t *testing.T) {
testutils.LargeTest(t)
// Send 128MB. Add 1 so it's not a multiple of 8MB.
var contentSize int64 = 128*1024*1024 + 1
testBackupDBLarge(t, contentSize)
}
// backupDB should work for a 16GB DB.
func TestBackupDBHuge(t *testing.T) {
t.Skip("TODO(benjaminwagner): change TMPDIR to make this work.")
testutils.LargeTest(t)
// Send 16GB. Add 1 so it's not a multiple of 8MB.
var contentSize int64 = 16*1024*1024*1024 + 1
testBackupDBLarge(t, contentSize)
}
// immediateBackupBasename should return a name based on the time of day.
func TestImmediateBackupBasename(t *testing.T) {
testutils.SmallTest(t)
test := func(expected string, input time.Time) {
assert.Equal(t, expected, immediateBackupBasename(input))
}
test("task-scheduler-00:00:00", time.Date(2016, 1, 1, 0, 0, 0, 0, time.UTC))
test("task-scheduler-01:02:03", time.Date(2016, 2, 29, 1, 2, 3, 0, time.UTC))
test("task-scheduler-13:14:15", time.Date(2016, 10, 27, 13, 14, 15, 16171819, time.UTC))
test("task-scheduler-23:59:59", time.Date(2016, 12, 31, 23, 59, 59, 999999999, time.UTC))
}
// findAndParseTriggerFile should return an error when the directory doesn't
// exist.
func TestFindAndParseTriggerFileNoDir(t *testing.T) {
testutils.SmallTest(t)
b, cancel := getMockedDBBackup(t, nil)
defer cancel()
testutils.RemoveAll(t, b.triggerDir)
_, _, err := b.findAndParseTriggerFile()
assert.Error(t, err)
assert.Contains(t, err.Error(), "Unable to read trigger directory")
}
// findAndParseTriggerFile should return empty for an empty dir.
func TestFindAndParseTriggerFileNoFile(t *testing.T) {
testutils.SmallTest(t)
b, cancel := getMockedDBBackup(t, nil)
defer cancel()
file, attempts, err := b.findAndParseTriggerFile()
assert.NoError(t, err)
assert.Equal(t, "", file)
assert.Equal(t, 0, attempts)
}
// findAndParseTriggerFile should return the filename and indicate no attempts
// for an empty file.
func TestFindAndParseTriggerFileNewFile(t *testing.T) {
testutils.SmallTest(t)
b, cancel := getMockedDBBackup(t, nil)
defer cancel()
exec_testutils.Run(t, context.Background(), b.triggerDir, "touch", "foo")
file, attempts, err := b.findAndParseTriggerFile()
assert.NoError(t, err)
assert.Equal(t, "foo", file)
assert.Equal(t, 0, attempts)
}
// findAndParseTriggerFile should choose one of the files when multiple are
// present.
func TestFindAndParseTriggerFileTwoFiles(t *testing.T) {
testutils.SmallTest(t)
b, cancel := getMockedDBBackup(t, nil)
defer cancel()
ctx := context.Background()
exec_testutils.Run(t, ctx, b.triggerDir, "touch", "foo")
exec_testutils.Run(t, ctx, b.triggerDir, "touch", "bar")
file, attempts, err := b.findAndParseTriggerFile()
assert.NoError(t, err)
assert.True(t, file == "foo" || file == "bar")
assert.Equal(t, 0, attempts)
}
// writeTriggerFile followed by findAndParseTriggerFile should return the same
// values.
func TestWriteFindAndParseTriggerFileWithRetries(t *testing.T) {
testutils.SmallTest(t)
b, cancel := getMockedDBBackup(t, nil)
defer cancel()
for i := 1; i < 3; i++ {
assert.NoError(t, b.writeTriggerFile("foo", i))
file, attempts, err := b.findAndParseTriggerFile()
assert.NoError(t, err)
assert.Equal(t, "foo", file)
assert.Equal(t, i, attempts)
}
}
// writeTriggerFile could fail if permissions are incorrect.
func TestWriteTriggerFileReadOnly(t *testing.T) {
testutils.SmallTest(t)
b, cancel := getMockedDBBackup(t, nil)
defer cancel()
assert.NoError(t, ioutil.WriteFile(path.Join(b.triggerDir, "foo"), []byte{}, 0444))
err := b.writeTriggerFile("foo", 1)
assert.Error(t, err)
assert.Regexp(t, `Unable to write new retry count \(1\) to trigger file .*/foo: .*permission denied`, err.Error())
}
// findAndParseTriggerFile should return an error when the file can't be parsed.
func TestFindAndParseTriggerFileInvalidContents(t *testing.T) {
testutils.SmallTest(t)
b, cancel := getMockedDBBackup(t, nil)
defer cancel()
assert.NoError(t, ioutil.WriteFile(path.Join(b.triggerDir, "foo"), []byte("Hi Mom!"), 0666))
_, _, err := b.findAndParseTriggerFile()
assert.Error(t, err)
assert.Contains(t, err.Error(), "Unable to parse trigger file")
}
// deleteTriggerFile followed by findAndParseTriggerFile should return empty.
func TestDeleteTriggerFile(t *testing.T) {
testutils.SmallTest(t)
b, cancel := getMockedDBBackup(t, nil)
defer cancel()
assert.NoError(t, b.writeTriggerFile("foo", 1))
file, attempts, err := b.findAndParseTriggerFile()
assert.NoError(t, err)
assert.Equal(t, "foo", file)
assert.Equal(t, 1, attempts)
assert.NoError(t, b.deleteTriggerFile("foo"))
file, attempts, err = b.findAndParseTriggerFile()
assert.NoError(t, err)
assert.Equal(t, "", file)
assert.Equal(t, 0, attempts)
files, err := ioutil.ReadDir(b.triggerDir)
assert.NoError(t, err)
assert.Equal(t, 0, len(files))
}
// deleteTriggerFile could fail if the file has already been deleted.
func TestDeleteTriggerFileAlreadyDeleted(t *testing.T) {
testutils.SmallTest(t)
b, cancel := getMockedDBBackup(t, nil)
defer cancel()
err := b.deleteTriggerFile("foo")
assert.Error(t, err)
assert.Regexp(t, "Unable to remove trigger file .*/foo: .*no such file", err.Error())
}
// maybeBackupDB should do nothing if there is no trigger file.
func TestMaybeBackupDBNotYet(t *testing.T) {
testutils.SmallTest(t)
now := time.Now()
r := mux.NewRouter()
called := false
gsRoute(r).HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
called = true
})
b, cancel := getMockedDBBackup(t, r)
defer cancel()
b.maybeBackupDB(now)
assert.False(t, called)
}
// maybeBackupDB should find the trigger file and perform a backup, then delete
// the trigger file if successful.
func TestMaybeBackupDBSuccess(t *testing.T) {
testutils.SmallTest(t)
now := time.Date(2016, 10, 26, 5, 0, 0, 0, time.UTC)
r := mux.NewRouter()
actualBytesGzip := map[string][]byte{}
addMultipartHandler(t, r, actualBytesGzip)
b, cancel := getMockedDBBackup(t, r)
defer cancel()
assert.NoError(t, b.writeTriggerFile("task-scheduler", 0))
b.maybeBackupDB(now)
name := DB_BACKUP_DIR + "/" + now.UTC().Format("2006/01/02") + "/task-scheduler.bdb"
assert.True(t, len(actualBytesGzip[name]) > 0)
file, _, err := b.findAndParseTriggerFile()
assert.NoError(t, err)
assert.Equal(t, "", file)
}
// maybeBackupDB should write the number of attempts to the trigger file if the
// backup fails.
func TestMaybeBackupDBFail(t *testing.T) {
testutils.SmallTest(t)
now := time.Date(2016, 10, 26, 5, 0, 0, 0, time.UTC)
r := mux.NewRouter()
gsRoute(r).Methods("POST").Path(fmt.Sprintf("/upload/storage/v1/b/%s/o", TEST_BUCKET)).
Queries("uploadType", "multipart").
HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
util.Close(r.Body)
http.Error(w, "I don't like your poem.", http.StatusTeapot)
})
b, cancel := getMockedDBBackup(t, r)
defer cancel()
assert.NoError(t, b.writeTriggerFile("task-scheduler", 0))
b.maybeBackupDB(now)
file, attempts, err := b.findAndParseTriggerFile()
assert.NoError(t, err)
assert.Equal(t, "task-scheduler", file)
assert.Equal(t, 1, attempts)
}
// maybeBackupDB should delete the trigger file if retries are exhausted.
func TestMaybeBackupDBRetriesExhausted(t *testing.T) {
testutils.SmallTest(t)
now := time.Date(2016, 10, 26, 5, 0, 0, 0, time.UTC)
r := mux.NewRouter()
gsRoute(r).Methods("POST").Path(fmt.Sprintf("/upload/storage/v1/b/%s/o", TEST_BUCKET)).
Queries("uploadType", "multipart").
HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
util.Close(r.Body)
http.Error(w, "I don't like your poem.", http.StatusTeapot)
})
b, cancel := getMockedDBBackup(t, r)
defer cancel()
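	// Pretend two attempts have already failed; one more failure should
	// exhaust the retries and cause the trigger file to be deleted.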
assert.NoError(t, b.writeTriggerFile("task-scheduler", 2))
b.maybeBackupDB(now)
file, _, err := b.findAndParseTriggerFile()
assert.NoError(t, err)
assert.Equal(t, "", file)
}
func TestFormatJobObjectName(t *testing.T) {
testutils.SmallTest(t)
assert.Equal(t, "job-backup/2016/01/01/police-officer.gob", formatJobObjectName(time.Date(2016, 1, 1, 0, 0, 0, 0, time.UTC), "police-officer"))
assert.Equal(t, "job-backup/2016/02/29/nurse.gob", formatJobObjectName(time.Date(2016, 2, 29, 1, 2, 3, 0, time.UTC), "nurse"))
assert.Equal(t, "job-backup/2008/08/08/scientist.gob", formatJobObjectName(time.Date(2008, 8, 8, 8, 8, 8, 8, time.UTC), "scientist"))
}
func TestParseIdFromJobObjectName(t *testing.T) {
testutils.SmallTest(t)
test := func(id string) {
assert.Equal(t, id, parseIdFromJobObjectName(formatJobObjectName(time.Date(2016, 1, 1, 0, 0, 0, 0, time.UTC), id)))
}
test("police-officer")
test("name.with.internal.dots")
test("20161116T220425.634818978Z_0000000000001e88")
}
// makeJob returns a dummy Job with neither Id nor DbModified set.
func makeJob(now time.Time) *types.Job {
return &types.Job{
Created: now.UTC(),
Dependencies: map[string][]string{},
RepoState: types.RepoState{
Repo: types.DEFAULT_TEST_REPO,
},
Name: "Test-Job",
Tasks: map[string][]*types.TaskSummary{},
}
}
// makeExistingJob returns a dummy Job with Id and DbModified set to the given
// values.
func makeExistingJob(now time.Time, id string) *types.Job {
job := makeJob(now)
job.Id = id
job.DbModified = now.UTC()
return job
}
// backupJob should create a GCS object with the gzipped bytes.
func TestBackupJob(t *testing.T) {
testutils.SmallTest(t)
now := time.Now()
j := makeJob(now)
var buf bytes.Buffer
assert.NoError(t, gob.NewEncoder(&buf).Encode(j))
jobgob := buf.Bytes()
r := mux.NewRouter()
actualBytesGzip := map[string][]byte{}
addMultipartHandler(t, r, actualBytesGzip)
b, cancel := getMockedDBBackup(t, r)
defer cancel()
err := b.backupJob(now, "myjob", jobgob)
assert.NoError(t, err)
name := JOB_BACKUP_DIR + "/" + now.UTC().Format("2006/01/02") + "/myjob.gob"
gzR, err := gzip.NewReader(bytes.NewReader(actualBytesGzip[name]))
assert.NoError(t, err)
actualBytes, err := ioutil.ReadAll(gzR)
assert.NoError(t, err)
assert.NoError(t, gzR.Close())
assert.Equal(t, jobgob, actualBytes)
}
// incrementalBackupStep should just update the incremental backup time when
// there are no jobs.
func TestIncrementalBackupStepNoJobs(t *testing.T) {
testutils.SmallTest(t)
r := mux.NewRouter()
b, cancel := getMockedDBBackup(t, r)
defer cancel()
// Metrics occasionally fail to be deleted, so we might have a leftover from a
// previous test.
beforeCount := b.jobBackupCount.Get()
b.incrementalBackupLiveness.ManualReset(time.Time{})
now := time.Now()
assert.NoError(t, b.incrementalBackupStep(now))
newTs, err := b.db.GetIncrementalBackupTime()
assert.NoError(t, err)
assert.True(t, now.Equal(newTs))
assert.True(t, b.incrementalBackupLiveness.Get() < MAX_TEST_TIME_SECONDS)
assert.Equal(t, beforeCount, b.jobBackupCount.Get())
}
// incrementalBackupStep should back up each added or modified job.
func TestIncrementalBackupStep(t *testing.T) {
testutils.SmallTest(t)
now := time.Now()
namePrefix := JOB_BACKUP_DIR + "/" + now.UTC().Format("2006/01/02") + "/"
r := mux.NewRouter()
actualBytesGzip := map[string][]byte{}
addMultipartHandler(t, r, actualBytesGzip)
b, cancel := getMockedDBBackup(t, r)
defer cancel()
// Metrics occasionally fail to be deleted, so we might have a leftover from a
// previous test.
beforeCount := b.jobBackupCount.Get()
b.incrementalBackupLiveness.ManualReset(time.Time{})
// Add a job.
j1 := makeJob(now)
assert.NoError(t, b.db.PutJob(j1))
name1 := namePrefix + j1.Id + ".gob"
assert.NoError(t, b.incrementalBackupStep(now))
// Check the uploaded data.
{
gzR, err := gzip.NewReader(bytes.NewReader(actualBytesGzip[name1]))
assert.NoError(t, err)
var j1Copy *types.Job
assert.NoError(t, gob.NewDecoder(gzR).Decode(&j1Copy))
assert.NoError(t, gzR.Close())
deepequal.AssertDeepEqual(t, j1, j1Copy)
}
newTs, err := b.db.GetIncrementalBackupTime()
assert.NoError(t, err)
assert.True(t, now.Equal(newTs))
assert.True(t, b.incrementalBackupLiveness.Get() < MAX_TEST_TIME_SECONDS)
assert.Equal(t, beforeCount+1, b.jobBackupCount.Get())
// Modify j1 and add j2.
j1.Status = types.JOB_STATUS_CANCELED
j2 := makeJob(now.Add(time.Second))
assert.NoError(t, b.db.PutJobs([]*types.Job{j1, j2}))
name2 := namePrefix + j2.Id + ".gob"
assert.NoError(t, b.incrementalBackupStep(now))
// Check the uploaded data.
{
gzR, err := gzip.NewReader(bytes.NewReader(actualBytesGzip[name1]))
assert.NoError(t, err)
var j1Copy *types.Job
assert.NoError(t, gob.NewDecoder(gzR).Decode(&j1Copy))
assert.NoError(t, gzR.Close())
deepequal.AssertDeepEqual(t, j1, j1Copy)
}
{
gzR, err := gzip.NewReader(bytes.NewReader(actualBytesGzip[name2]))
assert.NoError(t, err)
var j2Copy *types.Job
assert.NoError(t, gob.NewDecoder(gzR).Decode(&j2Copy))
assert.NoError(t, gzR.Close())
deepequal.AssertDeepEqual(t, j2, j2Copy)
}
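	// Three backups in total: j1 from the first step, then the modified j1
	// and the new j2 from the second step.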
assert.Equal(t, beforeCount+3, b.jobBackupCount.Get())
}
// incrementalBackupStep should continue when one job cannot be uploaded.
func TestIncrementalBackupStepSingleUploadError(t *testing.T) {
testutils.SmallTest(t)
now := time.Now()
r := mux.NewRouter()
b, cancel := getMockedDBBackup(t, r)
defer cancel()
// Metrics occasionally fail to be deleted, so we might have a leftover from a
// previous test.
beforeCount := b.jobBackupCount.Get()
b.incrementalBackupLiveness.ManualReset(time.Time{})
// Add two jobs.
j1 := makeJob(now)
j2 := makeJob(now.Add(time.Second))
assert.NoError(t, b.db.PutJobs([]*types.Job{j1, j2}))
count := 0
gsRoute(r).Methods("POST").Path(fmt.Sprintf("/upload/storage/v1/b/%s/o", TEST_BUCKET)).
Queries("uploadType", "multipart").
HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
count++
if count == 1 {
util.Close(r.Body)
http.Error(w, "No one wants this job.", http.StatusTeapot)
return
}
t := mockhttpclient.MuxSafeT(t)
mediaType, params, err := mime.ParseMediaType(r.Header.Get("Content-Type"))
assert.NoError(t, err)
assert.Equal(t, "multipart/related", mediaType)
mr := multipart.NewReader(r.Body, params["boundary"])
jsonPart, err := mr.NextPart()
assert.NoError(t, err)
_, err = io.Copy(w, jsonPart)
assert.NoError(t, err)
})
oldTs, err := b.db.GetIncrementalBackupTime()
assert.NoError(t, err)
err = b.incrementalBackupStep(now)
assert.Error(t, err)
assert.Contains(t, err.Error(), "got HTTP response code 418 with body: No one wants this job.")
assert.Equal(t, 2, count)
newTs, err := b.db.GetIncrementalBackupTime()
assert.NoError(t, err)
assert.True(t, oldTs.Equal(newTs))
assert.True(t, b.incrementalBackupLiveness.Get() > MAX_TEST_TIME_SECONDS)
assert.Equal(t, beforeCount+1, b.jobBackupCount.Get())
}
// incrementalBackupStep should report multiple errors when they occur.
func TestIncrementalBackupStepMultipleUploadError(t *testing.T) {
testutils.SmallTest(t)
now := time.Now()
r := mux.NewRouter()
b, cancel := getMockedDBBackup(t, r)
defer cancel()
// Metrics occasionally fail to be deleted, so we might have a leftover from a
// previous test.
beforeCount := b.jobBackupCount.Get()
b.incrementalBackupLiveness.ManualReset(time.Time{})
// Add two jobs.
j1 := makeJob(now)
j2 := makeJob(now.Add(time.Second))
assert.NoError(t, b.db.PutJobs([]*types.Job{j1, j2}))
gsRoute(r).Methods("POST").Path(fmt.Sprintf("/upload/storage/v1/b/%s/o", TEST_BUCKET)).
Queries("uploadType", "multipart").
HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
util.Close(r.Body)
http.Error(w, "No one wants this job.", http.StatusTeapot)
})
oldTs, err := b.db.GetIncrementalBackupTime()
assert.NoError(t, err)
err = b.incrementalBackupStep(now)
assert.Error(t, err)
assert.Contains(t, err.Error(), "Multiple errors performing incremental Job backups")
newTs, err := b.db.GetIncrementalBackupTime()
assert.NoError(t, err)
assert.True(t, oldTs.Equal(newTs))
assert.True(t, b.incrementalBackupLiveness.Get() > MAX_TEST_TIME_SECONDS)
assert.Equal(t, beforeCount, b.jobBackupCount.Get())
}
// incrementalBackupStep should restart modified job tracking on ErrUnknownId.
func TestIncrementalBackupStepReset(t *testing.T) {
testutils.SmallTest(t)
r := mux.NewRouter()
actualBytesGzip := map[string][]byte{}
addMultipartHandler(t, r, actualBytesGzip)
b, cancel := getMockedDBBackup(t, r)
defer cancel()
// Metrics occasionally fail to be deleted, so we might have a leftover from a
// previous test.
beforeCount := b.jobBackupCount.Get()
b.incrementalBackupLiveness.ManualReset(time.Time{})
// Invalidate the ID.
b.db.StopTrackingModifiedJobs(b.modifiedJobsId)
oldTs, err := b.db.GetIncrementalBackupTime()
assert.NoError(t, err)
now := time.Now()
err = b.incrementalBackupStep(now)
assert.True(t, db.IsUnknownId(err))
assert.Equal(t, int64(1), b.incrementalBackupResetCount.Get())
newTs, err := b.db.GetIncrementalBackupTime()
assert.NoError(t, err)
assert.True(t, oldTs.Equal(newTs))
assert.True(t, b.incrementalBackupLiveness.Get() > MAX_TEST_TIME_SECONDS)
assert.Equal(t, beforeCount, b.jobBackupCount.Get())
assert.Equal(t, 0, len(actualBytesGzip))
// Ensure next round succeeds.
now = now.Add(10 * time.Second)
j1 := makeJob(now)
assert.NoError(t, b.db.PutJob(j1))
name1 := JOB_BACKUP_DIR + "/" + now.UTC().Format("2006/01/02") + "/" + j1.Id + ".gob"
assert.NoError(t, b.incrementalBackupStep(now))
newTs, err = b.db.GetIncrementalBackupTime()
assert.NoError(t, err)
assert.True(t, now.Equal(newTs))
assert.True(t, b.incrementalBackupLiveness.Get() < MAX_TEST_TIME_SECONDS)
assert.Equal(t, beforeCount+1, b.jobBackupCount.Get())
assert.Equal(t, 1, len(actualBytesGzip))
assert.True(t, len(actualBytesGzip[name1]) > 0)
}
// incrementalBackupStep should return an error if unable to set the incremental
// backup time in the DB.
func TestIncrementalBackupStepSetTSError(t *testing.T) {
testutils.SmallTest(t)
r := mux.NewRouter()
b, cancel := getMockedDBBackup(t, r)
defer cancel()
injectedError := fmt.Errorf("It's too late. Self-destruct sequence has been initiated.")
b.db.(*testDB).injectSetTSError = injectedError
b.incrementalBackupLiveness.ManualReset(time.Time{})
now := time.Now()
err := b.incrementalBackupStep(now)
assert.Equal(t, injectedError, err)
assert.True(t, b.incrementalBackupLiveness.Get() > MAX_TEST_TIME_SECONDS)
}
// addGetObjectHandler causes r to respond to a request for the contents of
// TEST_BUCKET/name with the given contents.
func addGetObjectHandler(t *testing.T, r *mux.Router, name string, contents []byte) {
// Note: the URI requested by the client library does not match the
// documentation at https://cloud.google.com/storage/docs/json_api/v1/objects/get
r.Schemes("https").Host("storage.googleapis.com").Methods("GET").
Path(fmt.Sprintf("/%s/%s", TEST_BUCKET, name)).
Handler(mockhttpclient.MockGetDialogue(contents))
}
// addGetJobGOBHandler causes r to respond to a request for the given job (in
// TEST_BUCKET with name given by formatJobObjectName) with the GOB-encoded Job.
func addGetJobGOBHandler(t *testing.T, r *mux.Router, job *types.Job) {
buf := &bytes.Buffer{}
assert.NoError(t, gob.NewEncoder(buf).Encode(job))
addGetObjectHandler(t, r, formatJobObjectName(job.DbModified, job.Id), buf.Bytes())
}
// downloadGOB should download data from GCS.
func TestDownloadGOB(t *testing.T) {
testutils.SmallTest(t)
now := time.Now()
job := makeExistingJob(now, "j1")
r := mux.NewRouter()
addGetJobGOBHandler(t, r, job)
b, cancel := getMockedDBBackup(t, r)
defer cancel()
var jobCopy types.Job
name := formatJobObjectName(job.DbModified, job.Id)
err := downloadGOB(b.ctx, b.gsClient.Bucket(b.gsBucket), name, &jobCopy)
assert.NoError(t, err)
deepequal.AssertDeepEqual(t, job, &jobCopy)
}
// downloadGOB should return a sensible error if the object doesn't exist.
func TestDownloadGOBNotFound(t *testing.T) {
testutils.SmallTest(t)
r := mux.NewRouter()
name := "foo/bar/baz.gob"
r.Schemes("https").Host("storage.googleapis.com").Methods("GET").
Path(fmt.Sprintf("/%s/%s", TEST_BUCKET, name)).
Handler(mockhttpclient.MockGetError("Not Found", http.StatusNotFound))
b, cancel := getMockedDBBackup(t, r)
defer cancel()
var dummy types.Job
err := downloadGOB(b.ctx, b.gsClient.Bucket(b.gsBucket), name, &dummy)
assert.Error(t, err)
assert.Regexp(t, "object doesn't exist", err.Error())
deepequal.AssertDeepEqual(t, types.Job{}, dummy)
}
// downloadGOB should return an error if the data is not GOB-encoded.
func TestDownloadGOBNotGOB(t *testing.T) {
testutils.SmallTest(t)
r := mux.NewRouter()
name := "poem.txt"
addGetObjectHandler(t, r, name, []byte(TEST_DB_CONTENT))
b, cancel := getMockedDBBackup(t, r)
defer cancel()
var dummy types.Job
err := downloadGOB(b.ctx, b.gsClient.Bucket(b.gsBucket), name, &dummy)
assert.Error(t, err)
assert.Regexp(t, "Error decoding GOB data", err.Error())
deepequal.AssertDeepEqual(t, types.Job{}, dummy)
}
// addGetJobGOBsHandlers causes r to respond to list and get requests for the
// given Jobs. Calls addGetJobGOBHandler for each Job. Calls
// addListObjectsHandler for each dir.
func addGetJobGOBsHandlers(t *testing.T, r *mux.Router, jobsByDir map[string][]*types.Job) {
for dir, jobs := range jobsByDir {
objs := make([]object, len(jobs))
for i, job := range jobs {
name := formatJobObjectName(job.DbModified, job.Id)
addGetJobGOBHandler(t, r, job)
objs[i] = object{TEST_BUCKET, name, job.DbModified}
}
addListObjectsHandler(t, r, dir+"/", objs)
}
}
// assertJobMapsEqual asserts that expected and actual are deep-equal. If not,
// it fails the test via FailNow with a description of the differences.
func assertJobMapsEqual(t *testing.T, expected map[string]*types.Job, actual map[string]*types.Job) {
msg := &bytes.Buffer{}
for id, eJob := range expected {
if aJob, ok := actual[id]; ok {
if !reflect.DeepEqual(eJob, aJob) {
if _, err := fmt.Fprintf(msg, "Job %q differs:\n\tExpected: %v\n\tActual: %v\n", id, eJob, aJob); err != nil {
sklog.Fatal(err)
}
}
} else {
if _, err := fmt.Fprintf(msg, "Missing job %q: %v\n", id, eJob); err != nil {
sklog.Fatal(err)
}
}
}
for id, aJob := range actual {
if _, ok := expected[id]; !ok {
if _, err := fmt.Fprintf(msg, "Extra job %q: %v\n", id, aJob); err != nil {
sklog.Fatal(err)
}
}
}
if msg.Len() > 0 {
assert.FailNow(t, msg.String())
}
}
// RetrieveJobs should download Jobs for the requested period from GCS.
func TestRetrieveJobsSimple(t *testing.T) {
testutils.SmallTest(t)
now := time.Now().Round(time.Second)
since := now.Add(-1 * time.Hour)
expectedJobs := map[string]*types.Job{}
job1 := makeExistingJob(since.Add(-10*time.Minute), "before")
job1dir := path.Dir(formatJobObjectName(job1.DbModified, job1.Id))
job2 := makeExistingJob(since.Add(10*time.Minute), "after")
job2dir := path.Dir(formatJobObjectName(job2.DbModified, job2.Id))
expectedJobs[job2.Id] = job2.Copy()
r := mux.NewRouter()
allJobsByDir := map[string][]*types.Job{}
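	// Jobs are stored under per-day directories keyed on DbModified, so job1
	// and job2 may or may not share a directory depending on the current time.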
if job1dir == job2dir {
allJobsByDir[job1dir] = []*types.Job{job1, job2}
} else {
allJobsByDir[job1dir] = []*types.Job{job1}
allJobsByDir[job2dir] = []*types.Job{job2}
}
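	// RetrieveJobs also lists today's directory; register an empty listing
	// if it isn't already covered by job2's directory.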
nowdir := path.Dir(formatJobObjectName(time.Now(), "dummy"))
if job2dir != nowdir {
allJobsByDir[nowdir] = []*types.Job{}
}
addGetJobGOBsHandlers(t, r, allJobsByDir)
b, cancel := getMockedDBBackup(t, r)
defer cancel()
actualJobs, err := b.RetrieveJobs(since)
assert.NoError(t, err)
assertJobMapsEqual(t, expectedJobs, actualJobs)
}
// RetrieveJobs should download Jobs for the requested period from GCS where the
// Jobs span multiple directories.
func TestRetrieveJobsMultipleDirs(t *testing.T) {
testutils.MediumTest(t) // GOB encoding and decoding takes time.
now := time.Now().Round(time.Second)
since := now.Add(-26 * time.Hour)
allJobsByDir := map[string][]*types.Job{}
expectedJobs := map[string]*types.Job{}
// Add jobs before since. Not expected from RetrieveJobs.
for i := -26 * time.Hour; i < 0; i += time.Hour {
ts := since.Add(i)
job := makeExistingJob(ts, fmt.Sprintf("%s", i))
dir := path.Dir(formatJobObjectName(job.DbModified, job.Id))
allJobsByDir[dir] = append(allJobsByDir[dir], job)
}
// Add jobs at and after since. Expected from RetrieveJobs.
for i := time.Duration(0); i <= 26*time.Hour; i += time.Hour {
ts := since.Add(i)
job := makeExistingJob(ts, fmt.Sprintf("%s", i))
dir := path.Dir(formatJobObjectName(job.DbModified, job.Id))
allJobsByDir[dir] = append(allJobsByDir[dir], job)
expectedJobs[job.Id] = job.Copy()
}
r := mux.NewRouter()
addGetJobGOBsHandlers(t, r, allJobsByDir)
b, cancel := getMockedDBBackup(t, r)
defer cancel()
actualJobs, err := b.RetrieveJobs(since)
assert.NoError(t, err)
assertJobMapsEqual(t, expectedJobs, actualJobs)
}
// RetrieveJobs should download Jobs for the requested period from GCS when there
// are older versions for the same Job.
func TestRetrieveJobsMultipleVersions(t *testing.T) {
testutils.MediumTest(t) // GOB encoding and decoding takes time.
now := time.Now().Round(time.Second)
since := now.Add(-26 * time.Hour)
allJobsByDir := map[string][]*types.Job{}
expectedJobs := map[string]*types.Job{}
// Add and modify jobs before since. Not expected from RetrieveJobs.
for i := -26 * time.Hour; i < -time.Hour; i += time.Hour {
ts := since.Add(i)
origjob := makeExistingJob(ts, fmt.Sprintf("before-mod-before-%s", i))
origdir := path.Dir(formatJobObjectName(origjob.DbModified, origjob.Id))
modjob := origjob.Copy()
modjob.Status = types.JOB_STATUS_CANCELED
modjob.DbModified = ts.Add(time.Hour).UTC()
moddir := path.Dir(formatJobObjectName(modjob.DbModified, modjob.Id))
allJobsByDir[moddir] = append(allJobsByDir[moddir], modjob)
if origdir != moddir {
allJobsByDir[origdir] = append(allJobsByDir[origdir], origjob)
}
}
// Add jobs created before since and modified after since. Expected from
// RetrieveJobs.
for i := time.Hour; i < 26*time.Hour; i += time.Hour {
ts := since.Add(-i)
origjob := makeExistingJob(ts, fmt.Sprintf("before-mod-after-%s", i))
origdir := path.Dir(formatJobObjectName(origjob.DbModified, origjob.Id))
modjob := origjob.Copy()
modjob.Status = types.JOB_STATUS_CANCELED
modjob.DbModified = since.Add(i).UTC()
moddir := path.Dir(formatJobObjectName(modjob.DbModified, modjob.Id))
allJobsByDir[moddir] = append(allJobsByDir[moddir], modjob)
if origdir != moddir {
allJobsByDir[origdir] = append(allJobsByDir[origdir], origjob)
}
expectedJobs[modjob.Id] = modjob.Copy()
}
// Add jobs created and modified after since. Expected from RetrieveJobs.
for i := time.Duration(0); i < 26*time.Hour; i += time.Hour {
ts := since.Add(i)
origjob := makeExistingJob(ts, fmt.Sprintf("after-mod-after-%s", i))
origdir := path.Dir(formatJobObjectName(origjob.DbModified, origjob.Id))
modjob := origjob.Copy()
modjob.Status = types.JOB_STATUS_SUCCESS
modjob.DbModified = ts.Add(time.Hour).UTC()
moddir := path.Dir(formatJobObjectName(modjob.DbModified, modjob.Id))
allJobsByDir[moddir] = append(allJobsByDir[moddir], modjob)
if origdir != moddir {
allJobsByDir[origdir] = append(allJobsByDir[origdir], origjob)
}
expectedJobs[modjob.Id] = modjob.Copy()
}
r := mux.NewRouter()
addGetJobGOBsHandlers(t, r, allJobsByDir)
b, cancel := getMockedDBBackup(t, r)
defer cancel()
actualJobs, err := b.RetrieveJobs(since)
assert.NoError(t, err)
assertJobMapsEqual(t, expectedJobs, actualJobs)
}
// RetrieveJobs should give a sensible error if unable to list Jobs in GCS.
func TestRetrieveJobsErrorListingJobs(t *testing.T) {
testutils.SmallTest(t)
r := mux.NewRouter()
now := time.Now().Round(time.Second)
prefix := JOB_BACKUP_DIR + "/" + now.UTC().Format("2006/01/02") + "/"
gsRoute(r).Methods("GET").
Path(fmt.Sprintf("/storage/v1/b/%s/o", TEST_BUCKET)).
Queries("prefix", prefix).
Handler(mockhttpclient.MockGetError("No jobs today", http.StatusTeapot))
b, cancel := getMockedDBBackup(t, r)
defer cancel()
_, err := b.RetrieveJobs(now)
assert.Error(t, err)
assert.Regexp(t, "Unable to list jobs in "+TEST_BUCKET+"/"+prefix, err)
}
// RetrieveJobs should give a sensible error if unable to download a Job from
// GCS.
func TestRetrieveJobsErrorDownloading(t *testing.T) {
testutils.SmallTest(t)
r := mux.NewRouter()
now := time.Now().Round(time.Second)
prefix := JOB_BACKUP_DIR + "/" + now.UTC().Format("2006/01/02") + "/"
name := prefix + "j1.gob"
addListObjectsHandler(t, r, prefix, []object{
{TEST_BUCKET, name, now.UTC()},
})
addGetObjectHandler(t, r, name, []byte("Hi Mom!"))
b, cancel := getMockedDBBackup(t, r)
defer cancel()
_, err := b.RetrieveJobs(now)
assert.Error(t, err)
assert.Regexp(t, `Unable to read .*/j1\.gob`, err)
}