#!/usr/bin/python

"""Tests for gs_utils.py.

TODO(epoger): How should we exercise these self-tests? See http://skbug.com/2751
"""

# System-level imports.
import os
import posixpath
import random
import shutil
import sys
import tempfile
import time

# Local imports.
import gs_utils
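
# Bucket used by all of these tests. Your ~/.boto credentials must be able to
# write into it, and its contents are assumed to be publicly readable (which
# _test_public_read() below relies on).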
TEST_BUCKET = 'chromium-skia-testing'


def _get_authenticated_gs_handle():
  """Returns an instance of GSUtils using ~/.boto for authentication."""
  try:
    return gs_utils.GSUtils(
        boto_file_path=os.path.expanduser(os.path.join('~', '.boto')))
  except:
    print """
Failed to instantiate GSUtils object with default .boto file path.
Do you have a ~/.boto file that provides the credentials needed to write
into gs://%s?
""" % TEST_BUCKET
    raise
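
# For reference, a minimal ~/.boto file looks roughly like this (placeholder
# values, not real credentials; the exact fields depend on how gsutil was
# configured):
#
#   [Credentials]
#   gs_access_key_id = YOUR_ACCESS_KEY_ID
#   gs_secret_access_key = YOUR_SECRET_ACCESS_KEY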


def _get_unique_posix_dir():
  """Returns a unique directory name suitable for use in Google Storage."""
  return 'gs_utils_manualtest/%d' % random.randint(0, sys.maxint)


def _test_public_read():
  """Make sure we can read from public files without .boto file credentials."""
  gs = gs_utils.GSUtils()
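  # This GSUtils handle was created without a boto_file_path, so it carries no
  # credentials; the listing below should still succeed because the test
  # bucket's contents are public. list_bucket_contents() returns a
  # (dirs, files) pair, which we ignore here.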
  gs.list_bucket_contents(bucket=TEST_BUCKET, subdir=None)


def _test_upload_if_one_file():
  """Test upload_if param within upload_file()."""
  gs = _get_authenticated_gs_handle()
  filename = 'filename'
  remote_dir = _get_unique_posix_dir()
  dest_path = posixpath.join(remote_dir, filename)
  local_dir = tempfile.mkdtemp()
  try:
    # Create a file on local disk, and upload it for the first time.
    local_path = os.path.join(local_dir, filename)
    with open(local_path, 'w') as f:
      f.write('original contents')
    gs.upload_file(source_path=local_path, dest_bucket=TEST_BUCKET,
                   dest_path=dest_path, upload_if=gs.UploadIf.IF_NEW)

    try:
      # Re-upload the same file, with upload_if=gs.UploadIf.ALWAYS;
      # the timestamp should change.
      old_timestamp = gs.get_last_modified_time(
          bucket=TEST_BUCKET, path=dest_path)
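      # Remote timestamps appear to have coarse (about one-second)
      # granularity, so sleep between uploads to make any timestamp change
      # observable.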
      time.sleep(2)
      gs.upload_file(source_path=local_path, dest_bucket=TEST_BUCKET,
                     dest_path=dest_path, upload_if=gs.UploadIf.ALWAYS)
      new_timestamp = gs.get_last_modified_time(
          bucket=TEST_BUCKET, path=dest_path)
      assert old_timestamp != new_timestamp, '%s != %s' % (
          old_timestamp, new_timestamp)

      # Re-upload the same file, with upload_if=gs.UploadIf.IF_MODIFIED;
      # the timestamp should NOT change.
      old_timestamp = new_timestamp
      time.sleep(2)
      gs.upload_file(source_path=local_path, dest_bucket=TEST_BUCKET,
                     dest_path=dest_path, upload_if=gs.UploadIf.IF_MODIFIED)
      new_timestamp = gs.get_last_modified_time(
          bucket=TEST_BUCKET, path=dest_path)
      assert old_timestamp == new_timestamp, '%s == %s' % (
          old_timestamp, new_timestamp)

      # Modify and re-upload the file, with upload_if=gs.UploadIf.IF_NEW;
      # the timestamp should still not change.
      old_timestamp = new_timestamp
      with open(local_path, 'w') as f:
        f.write('modified contents')
      time.sleep(2)
      gs.upload_file(source_path=local_path, dest_bucket=TEST_BUCKET,
                     dest_path=dest_path, upload_if=gs.UploadIf.IF_NEW)
      new_timestamp = gs.get_last_modified_time(
          bucket=TEST_BUCKET, path=dest_path)
      assert old_timestamp == new_timestamp, '%s == %s' % (
          old_timestamp, new_timestamp)

      # Re-upload the modified file, with upload_if=gs.UploadIf.IF_MODIFIED;
      # now the timestamp SHOULD change.
      old_timestamp = new_timestamp
      time.sleep(2)
      gs.upload_file(source_path=local_path, dest_bucket=TEST_BUCKET,
                     dest_path=dest_path, upload_if=gs.UploadIf.IF_MODIFIED)
      new_timestamp = gs.get_last_modified_time(
          bucket=TEST_BUCKET, path=dest_path)
      assert old_timestamp != new_timestamp, '%s != %s' % (
          old_timestamp, new_timestamp)
    finally:
      # Clean up the remote_dir.
      gs.delete_file(bucket=TEST_BUCKET, path=dest_path)
  finally:
    # Clean up the local dir.
    shutil.rmtree(local_dir)


def _test_upload_if_multiple_files():
  """Test upload_if param within upload_dir_contents()."""
  gs = _get_authenticated_gs_handle()
  subdir = 'subdir'
  filenames = ['file1', 'file2']
  local_dir = tempfile.mkdtemp()
  remote_dir = _get_unique_posix_dir()
  sample_file_local_path = os.path.join(local_dir, subdir, filenames[0])
  sample_file_remote_path = posixpath.join(remote_dir, subdir, filenames[0])
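  # The timestamp assertions below spot-check only this one sample file; the
  # other uploaded file is assumed to behave identically.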
  try:
    # Create files on local disk, and upload them for the first time.
    os.mkdir(os.path.join(local_dir, subdir))
    for filename in filenames:
      with open(os.path.join(local_dir, subdir, filename), 'w') as f:
        f.write('original contents of %s' % filename)
    gs.upload_dir_contents(
        source_dir=local_dir, dest_bucket=TEST_BUCKET,
        dest_dir=remote_dir, upload_if=gs.UploadIf.IF_NEW)

    try:
      # Re-upload the same files, with upload_if=gs.UploadIf.ALWAYS;
      # the timestamps should change.
      old_timestamp = gs.get_last_modified_time(
          bucket=TEST_BUCKET, path=sample_file_remote_path)
      time.sleep(2)
      gs.upload_dir_contents(
          source_dir=local_dir, dest_bucket=TEST_BUCKET,
          dest_dir=remote_dir, upload_if=gs.UploadIf.ALWAYS)
      new_timestamp = gs.get_last_modified_time(
          bucket=TEST_BUCKET, path=sample_file_remote_path)
      assert old_timestamp != new_timestamp, '%s != %s' % (
          old_timestamp, new_timestamp)

      # Re-upload the same files, with upload_if=gs.UploadIf.IF_MODIFIED;
      # the timestamps should NOT change.
      old_timestamp = new_timestamp
      time.sleep(2)
      gs.upload_dir_contents(
          source_dir=local_dir, dest_bucket=TEST_BUCKET,
          dest_dir=remote_dir, upload_if=gs.UploadIf.IF_MODIFIED)
      new_timestamp = gs.get_last_modified_time(
          bucket=TEST_BUCKET, path=sample_file_remote_path)
      assert old_timestamp == new_timestamp, '%s == %s' % (
          old_timestamp, new_timestamp)

      # Modify and re-upload the files, with upload_if=gs.UploadIf.IF_NEW;
      # the timestamps should still not change.
      old_timestamp = new_timestamp
      with open(sample_file_local_path, 'w') as f:
        f.write('modified contents of sample file')
      time.sleep(2)
      gs.upload_dir_contents(
          source_dir=local_dir, dest_bucket=TEST_BUCKET,
          dest_dir=remote_dir, upload_if=gs.UploadIf.IF_NEW)
      new_timestamp = gs.get_last_modified_time(
          bucket=TEST_BUCKET, path=sample_file_remote_path)
      assert old_timestamp == new_timestamp, '%s == %s' % (
          old_timestamp, new_timestamp)

      # Re-upload the modified file, with upload_if=gs.UploadIf.IF_MODIFIED;
      # now the timestamp SHOULD change.
      old_timestamp = new_timestamp
      time.sleep(2)
      gs.upload_dir_contents(
          source_dir=local_dir, dest_bucket=TEST_BUCKET,
          dest_dir=remote_dir, upload_if=gs.UploadIf.IF_MODIFIED)
      new_timestamp = gs.get_last_modified_time(
          bucket=TEST_BUCKET, path=sample_file_remote_path)
      assert old_timestamp != new_timestamp, '%s != %s' % (
          old_timestamp, new_timestamp)
    finally:
      # Delete all the files we uploaded to Google Storage.
      for filename in filenames:
        gs.delete_file(bucket=TEST_BUCKET,
                       path=posixpath.join(remote_dir, subdir, filename))
  finally:
    # Clean up the local dir.
    shutil.rmtree(local_dir)


def _test_authenticated_round_trip():
  """Test authenticated upload, fine-grained ACLs, download, and deletion."""
  gs = _get_authenticated_gs_handle()
  remote_dir = _get_unique_posix_dir()
  subdir = 'subdir'
  filenames_to_upload = ['file1', 'file2']

  # Upload test files to Google Storage, checking that their fine-grained
  # ACLs were set correctly.
  id_type = gs.IdType.GROUP_BY_DOMAIN
  id_value = 'chromium.org'
  set_permission = gs.Permission.READ
  local_src_dir = tempfile.mkdtemp()
  os.mkdir(os.path.join(local_src_dir, subdir))
  try:
    for filename in filenames_to_upload:
      with open(os.path.join(local_src_dir, subdir, filename), 'w') as f:
        f.write('contents of %s\n' % filename)
      dest_path = posixpath.join(remote_dir, subdir, filename)
      gs.upload_file(
          source_path=os.path.join(local_src_dir, subdir, filename),
          dest_bucket=TEST_BUCKET, dest_path=dest_path,
          fine_grained_acl_list=[(id_type, id_value, set_permission)])
      got_permission = gs.get_acl(bucket=TEST_BUCKET, path=dest_path,
                                  id_type=id_type, id_value=id_value)
      assert got_permission == set_permission, '%s == %s' % (
          got_permission, set_permission)
  finally:
    shutil.rmtree(local_src_dir)

  # Get a list of the files we uploaded to Google Storage.
  (dirs, files) = gs.list_bucket_contents(
      bucket=TEST_BUCKET, subdir=remote_dir)
  assert dirs == [subdir], '%s == [%s]' % (dirs, subdir)
  assert files == [], '%s == []' % files
  (dirs, files) = gs.list_bucket_contents(
      bucket=TEST_BUCKET, subdir=posixpath.join(remote_dir, subdir))
  assert dirs == [], '%s == []' % dirs
  assert files == filenames_to_upload, '%s == %s' % (
      files, filenames_to_upload)
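  # Note that list_bucket_contents() behaves like a one-level ls: immediate
  # subdirectories come back in dirs, and only files directly under the
  # queried subdir come back in files.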

  # Manipulate ACLs on one of those files, and verify them.
  # TODO(epoger): Test IdTypes other than GROUP_BY_DOMAIN?
  # TODO(epoger): Test setting multiple ACLs on the same file?
  id_type = gs.IdType.GROUP_BY_DOMAIN
  id_value = 'google.com'
  fullpath = posixpath.join(remote_dir, subdir, filenames_to_upload[0])
  # Make sure ACL is empty to start with ...
  gs.set_acl(bucket=TEST_BUCKET, path=fullpath,
             id_type=id_type, id_value=id_value, permission=gs.Permission.EMPTY)
  permission = gs.get_acl(bucket=TEST_BUCKET, path=fullpath,
                          id_type=id_type, id_value=id_value)
  assert permission == gs.Permission.EMPTY, '%s == %s' % (
      permission, gs.Permission.EMPTY)
  # ... set it to OWNER ...
  gs.set_acl(bucket=TEST_BUCKET, path=fullpath,
             id_type=id_type, id_value=id_value, permission=gs.Permission.OWNER)
  permission = gs.get_acl(bucket=TEST_BUCKET, path=fullpath,
                          id_type=id_type, id_value=id_value)
  assert permission == gs.Permission.OWNER, '%s == %s' % (
      permission, gs.Permission.OWNER)
  # ... now set it to READ ...
  gs.set_acl(bucket=TEST_BUCKET, path=fullpath,
             id_type=id_type, id_value=id_value, permission=gs.Permission.READ)
  permission = gs.get_acl(bucket=TEST_BUCKET, path=fullpath,
                          id_type=id_type, id_value=id_value)
  assert permission == gs.Permission.READ, '%s == %s' % (
      permission, gs.Permission.READ)
  # ... and clear it again to finish.
  gs.set_acl(bucket=TEST_BUCKET, path=fullpath,
             id_type=id_type, id_value=id_value, permission=gs.Permission.EMPTY)
  permission = gs.get_acl(bucket=TEST_BUCKET, path=fullpath,
                          id_type=id_type, id_value=id_value)
  assert permission == gs.Permission.EMPTY, '%s == %s' % (
      permission, gs.Permission.EMPTY)

  # Download the files we uploaded to Google Storage, and validate contents.
  local_dest_dir = tempfile.mkdtemp()
  try:
    for filename in filenames_to_upload:
      gs.download_file(source_bucket=TEST_BUCKET,
                       source_path=posixpath.join(remote_dir, subdir, filename),
                       dest_path=os.path.join(local_dest_dir, subdir, filename),
                       create_subdirs_if_needed=True)
      with open(os.path.join(local_dest_dir, subdir, filename)) as f:
        file_contents = f.read()
      assert file_contents == 'contents of %s\n' % filename, (
          '%s == "contents of %s\n"' % (file_contents, filename))
  finally:
    shutil.rmtree(local_dest_dir)

  # Delete all the files we uploaded to Google Storage.
  for filename in filenames_to_upload:
    gs.delete_file(bucket=TEST_BUCKET,
                   path=posixpath.join(remote_dir, subdir, filename))

  # Confirm that we deleted all the files we uploaded to Google Storage.
  (dirs, files) = gs.list_bucket_contents(
      bucket=TEST_BUCKET, subdir=posixpath.join(remote_dir, subdir))
  assert dirs == [], '%s == []' % dirs
  assert files == [], '%s == []' % files


def _test_dir_upload_and_download():
  """Test upload_dir_contents() and download_dir_contents()."""
  gs = _get_authenticated_gs_handle()
  remote_dir = _get_unique_posix_dir()
  subdir = 'subdir'
  filenames = ['file1', 'file2']

  # Create directory tree on local disk and upload it.
  id_type = gs.IdType.GROUP_BY_DOMAIN
  id_value = 'chromium.org'
  set_permission = gs.Permission.READ
  local_src_dir = tempfile.mkdtemp()
  os.mkdir(os.path.join(local_src_dir, subdir))
  try:
    for filename in filenames:
      with open(os.path.join(local_src_dir, subdir, filename), 'w') as f:
        f.write('contents of %s\n' % filename)
    gs.upload_dir_contents(
        source_dir=local_src_dir, dest_bucket=TEST_BUCKET, dest_dir=remote_dir,
        predefined_acl=gs.PredefinedACL.PRIVATE,
        fine_grained_acl_list=[(id_type, id_value, set_permission)])
  finally:
    shutil.rmtree(local_src_dir)
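  # upload_dir_contents() was given both a predefined (canned) ACL and a
  # fine-grained ACL for each file; only the fine-grained grant is verified
  # per file below.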

  # Validate the list of the files we uploaded to Google Storage.
  (dirs, files) = gs.list_bucket_contents(
      bucket=TEST_BUCKET, subdir=remote_dir)
  assert dirs == [subdir], '%s == [%s]' % (dirs, subdir)
  assert files == [], '%s == []' % files
  (dirs, files) = gs.list_bucket_contents(
      bucket=TEST_BUCKET, subdir=posixpath.join(remote_dir, subdir))
  assert dirs == [], '%s == []' % dirs
  assert files == filenames, '%s == %s' % (files, filenames)

  # Check the fine-grained ACLs we set in Google Storage.
  for filename in filenames:
    got_permission = gs.get_acl(
        bucket=TEST_BUCKET, path=posixpath.join(remote_dir, subdir, filename),
        id_type=id_type, id_value=id_value)
    assert got_permission == set_permission, '%s == %s' % (
        got_permission, set_permission)

  # Download the directory tree we just uploaded, make sure its contents
  # are what we expect, and then delete the tree in Google Storage.
  local_dest_dir = tempfile.mkdtemp()
  try:
    gs.download_dir_contents(source_bucket=TEST_BUCKET, source_dir=remote_dir,
                             dest_dir=local_dest_dir)
    for filename in filenames:
      with open(os.path.join(local_dest_dir, subdir, filename)) as f:
        file_contents = f.read()
      assert file_contents == 'contents of %s\n' % filename, (
          '%s == "contents of %s\n"' % (file_contents, filename))
  finally:
    shutil.rmtree(local_dest_dir)
  for filename in filenames:
    gs.delete_file(bucket=TEST_BUCKET,
                   path=posixpath.join(remote_dir, subdir, filename))


if __name__ == '__main__':
  _test_upload_if_multiple_files()
  _test_upload_if_one_file()
  _test_public_read()
  _test_authenticated_round_trip()
  _test_dir_upload_and_download()

  # TODO(epoger): Add _test_unauthenticated_access() to make sure we raise
  # an exception when we try to access without needed credentials.
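  # A rough sketch of what that test might look like, assuming GSUtils raises
  # some exception on an unauthorized write (the exact exception type is not
  # pinned down here):
  #
  #   def _test_unauthenticated_access():
  #     """Make sure writes without credentials raise an exception."""
  #     gs = gs_utils.GSUtils()  # no boto_file_path, so no credentials
  #     try:
  #       gs.delete_file(bucket=TEST_BUCKET, path='should-not-be-writable')
  #     except Exception:
  #       pass  # expected: the unauthenticated write was rejected
  #     else:
  #       raise AssertionError('unauthenticated delete unexpectedly succeeded')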