gs_utils: when uploading a whole dir using IF_NEW, check for existence of multiple files in a single operation
BUG=skia:2778
R=borenet@google.com
Review URL: https://codereview.chromium.org/420553002
diff --git a/py/utils/gs_utils.py b/py/utils/gs_utils.py
index c58bc67..7f3b59c 100644
--- a/py/utils/gs_utils.py
+++ b/py/utils/gs_utils.py
@@ -104,9 +104,9 @@
class UploadIf:
"""Cases in which we will upload a file.
- Beware of performance tradeoffs. E.g., if the file is small, the extra
- round trip to check for file existence and/or checksum may take longer than
- just uploading the file.
+ Beware of performance tradeoffs. E.g., if you are uploading just one small
+ file, the extra round trip to check for file existence and/or checksum may
+ take longer than just uploading the file.
See http://skbug.com/2778 ('gs_utils: when uploading IF_NEW, batch up
checks for existing files within a single remote directory')
"""
@@ -244,7 +244,8 @@
bucket=b, path=key.name,
id_type=id_type, id_value=id_value, permission=permission)
- def upload_dir_contents(self, source_dir, dest_bucket, dest_dir, **kwargs):
+ def upload_dir_contents(self, source_dir, dest_bucket, dest_dir,
+ upload_if=UploadIf.ALWAYS, **kwargs):
"""Recursively upload contents of a local directory to Google Storage.
params:
@@ -253,34 +254,67 @@
dest_bucket: GS bucket to copy the files into
dest_dir: full path (Posix-style) within that bucket; write the files into
this directory. If None, write into the root directory of the bucket.
+ upload_if: one of the UploadIf values, describing in which cases we should
+ upload the file
kwargs: any additional keyword arguments "inherited" from upload_file()
The copy operates as a merge: any files in source_dir will be "overlaid" on
top of the existing content in dest_dir. Existing files with the same names
- may or may not be overwritten, depending on the value of the upload_if kwarg
- inherited from upload_file().
+ may or may not be overwritten, depending on the value of upload_if.
TODO(epoger): Upload multiple files simultaneously to reduce latency.
-
- TODO(epoger): When upload_if==IF_NEW, batch up checks for existing files
- within a single remote directory. See http://skbug.com/2778
"""
b = self._connect_to_bucket(bucket=dest_bucket)
- for filename in sorted(os.listdir(source_dir)):
- local_path = os.path.join(source_dir, filename)
- if dest_dir:
- remote_path = posixpath.join(dest_dir, filename)
- else:
- remote_path = filename
+ if not dest_dir:
+ dest_dir = ''
- if os.path.isdir(local_path):
- self.upload_dir_contents( # recurse
- source_dir=local_path, dest_bucket=b, dest_dir=remote_path,
- **kwargs)
+ # Create a set of all files within source_dir.
+ source_fileset = set()
+ prefix_length = len(source_dir)+1
+ for dirpath, _, filenames in os.walk(source_dir):
+ relative_dirpath = dirpath[prefix_length:]
+ for filename in filenames:
+ source_fileset.add(os.path.join(relative_dirpath, filename))
+
+ # If we are only uploading files conditionally, remove any unnecessary
+ # files from source_fileset.
+ if upload_if == self.UploadIf.ALWAYS:
+ pass # there are no shortcuts... upload them all
+ else:
+ # Create a mapping of filename to Key for existing files within dest_dir
+ existing_dest_filemap = {}
+ prefix = dest_dir
+ if prefix and not prefix.endswith('/'):
+ prefix += '/'
+ prefix_length = len(prefix)
+ items = BucketListResultSet(bucket=b, prefix=prefix)
+ for item in items:
+ if type(item) is Key:
+ existing_dest_filemap[item.name[prefix_length:]] = item
+
+ # Now, depending on upload_if, trim files we should skip uploading.
+ files_in_common = source_fileset.intersection(
+ existing_dest_filemap.keys())
+ if upload_if == self.UploadIf.IF_NEW:
+ source_fileset -= files_in_common
+ elif upload_if == self.UploadIf.IF_MODIFIED:
+ for rel_path in files_in_common:
+ local_md5 = '"%s"' % _get_local_md5(path=os.path.join(
+ source_dir, rel_path))
+ key = existing_dest_filemap[rel_path]
+ if local_md5 == key.etag:
+ source_fileset.remove(rel_path)
else:
- self.upload_file(
- source_path=local_path, dest_bucket=b, dest_path=remote_path,
- **kwargs)
+ raise Exception('unknown value of upload_if: %s' % upload_if)
+
+ # Upload any files still in source_fileset.
+ for rel_path in sorted(source_fileset):
+ self.upload_file(
+ source_path=os.path.join(source_dir, rel_path),
+ dest_bucket=b,
+ dest_path=posixpath.join(dest_dir, rel_path),
+ upload_if=self.UploadIf.ALWAYS,
+ **kwargs)
def download_file(self, source_bucket, source_path, dest_path,
create_subdirs_if_needed=False):
diff --git a/py/utils/gs_utils_manualtest.py b/py/utils/gs_utils_manualtest.py
index b4f70a6..37621d1 100755
--- a/py/utils/gs_utils_manualtest.py
+++ b/py/utils/gs_utils_manualtest.py
@@ -45,7 +45,7 @@
gs.list_bucket_contents(bucket=TEST_BUCKET, subdir=None)
-def _test_upload_if():
+def _test_upload_if_one_file():
"""Test upload_if param within upload_file()."""
gs = _get_authenticated_gs_handle()
filename = 'filename'
@@ -113,6 +113,86 @@
# Clean up the local dir.
shutil.rmtree(local_dir)
+
+def _test_upload_if_multiple_files():
+ """Test upload_if param within upload_dir_contents()."""
+ gs = _get_authenticated_gs_handle()
+ subdir = 'subdir'
+ filenames = ['file1', 'file2']
+ local_dir = tempfile.mkdtemp()
+ remote_dir = _get_unique_posix_dir()
+ sample_file_local_path = os.path.join(local_dir, subdir, filenames[0])
+ sample_file_remote_path = posixpath.join(remote_dir, subdir, filenames[0])
+ try:
+ # Create files on local disk, and upload them for the first time.
+ os.mkdir(os.path.join(local_dir, subdir))
+ for filename in filenames:
+ with open(os.path.join(local_dir, subdir, filename), 'w') as f:
+ f.write('original contents of %s' % filename)
+ gs.upload_dir_contents(
+ source_dir=local_dir, dest_bucket=TEST_BUCKET,
+ dest_dir=remote_dir, upload_if=gs.UploadIf.IF_NEW)
+ try:
+ # Re-upload the same files, with upload_if=gs.UploadIf.ALWAYS;
+ # the timestamps should change.
+ old_timestamp = gs.get_last_modified_time(
+ bucket=TEST_BUCKET, path=sample_file_remote_path)
+ time.sleep(2)
+ gs.upload_dir_contents(
+ source_dir=local_dir, dest_bucket=TEST_BUCKET,
+ dest_dir=remote_dir, upload_if=gs.UploadIf.ALWAYS)
+ new_timestamp = gs.get_last_modified_time(
+ bucket=TEST_BUCKET, path=sample_file_remote_path)
+ assert old_timestamp != new_timestamp, '%s != %s' % (
+ old_timestamp, new_timestamp)
+
+ # Re-upload the same files, with upload_if=gs.UploadIf.IF_MODIFIED;
+ # the timestamps should NOT change.
+ old_timestamp = new_timestamp
+ time.sleep(2)
+ gs.upload_dir_contents(
+ source_dir=local_dir, dest_bucket=TEST_BUCKET,
+ dest_dir=remote_dir, upload_if=gs.UploadIf.IF_MODIFIED)
+ new_timestamp = gs.get_last_modified_time(
+ bucket=TEST_BUCKET, path=sample_file_remote_path)
+ assert old_timestamp == new_timestamp, '%s == %s' % (
+ old_timestamp, new_timestamp)
+
+ # Modify and re-upload the files, with upload_if=gs.UploadIf.IF_NEW;
+ # the timestamps should still not change.
+ old_timestamp = new_timestamp
+ with open(sample_file_local_path, 'w') as f:
+ f.write('modified contents of sample file')
+ time.sleep(2)
+ gs.upload_dir_contents(
+ source_dir=local_dir, dest_bucket=TEST_BUCKET,
+ dest_dir=remote_dir, upload_if=gs.UploadIf.IF_NEW)
+ new_timestamp = gs.get_last_modified_time(
+ bucket=TEST_BUCKET, path=sample_file_remote_path)
+ assert old_timestamp == new_timestamp, '%s == %s' % (
+ old_timestamp, new_timestamp)
+
+ # Re-upload the modified file, with upload_if=gs.UploadIf.IF_MODIFIED;
+ # now the timestamp SHOULD change.
+ old_timestamp = new_timestamp
+ time.sleep(2)
+ gs.upload_dir_contents(
+ source_dir=local_dir, dest_bucket=TEST_BUCKET,
+ dest_dir=remote_dir, upload_if=gs.UploadIf.IF_MODIFIED)
+ new_timestamp = gs.get_last_modified_time(
+ bucket=TEST_BUCKET, path=sample_file_remote_path)
+ assert old_timestamp != new_timestamp, '%s != %s' % (
+ old_timestamp, new_timestamp)
+ finally:
+ # Delete all the files we uploaded to Google Storage.
+ for filename in filenames:
+ gs.delete_file(bucket=TEST_BUCKET,
+ path=posixpath.join(remote_dir, subdir, filename))
+ finally:
+ # Clean up the local dir.
+ shutil.rmtree(local_dir)
+
+
def _test_authenticated_round_trip():
gs = _get_authenticated_gs_handle()
remote_dir = _get_unique_posix_dir()
@@ -275,7 +355,8 @@
if __name__ == '__main__':
- _test_upload_if()
+ _test_upload_if_multiple_files()
+ _test_upload_if_one_file()
_test_public_read()
_test_authenticated_round_trip()
_test_dir_upload_and_download()