upload_dir_contents(): upload multiple files in parallel

BUG=skia:2780
R=borenet@google.com

Review URL: https://codereview.chromium.org/424553002
diff --git a/py/utils/gs_utils.py b/py/utils/gs_utils.py
index f53b4d1..7824c79 100644
--- a/py/utils/gs_utils.py
+++ b/py/utils/gs_utils.py
@@ -19,10 +19,13 @@
 # System-level imports
 import errno
 import hashlib
+import logging
 import os
 import posixpath
+import Queue
 import re
 import sys
+import threading
 
 # Imports from third-party code
 TRUNK_DIRECTORY = os.path.abspath(os.path.join(
@@ -43,6 +46,13 @@
 from boto.s3.connection import SubdomainCallingFormat
 from boto.s3.prefix import Prefix
 
+# How many files to upload at once, by default.
+# TODO(epoger): Is there a way to compute this intelligently?  To some extent
+# it is a function of how many cores are on the machine, and how many other
+# processes it is running; but it's probably more a function of how much time
+# each core sits idle waiting for network I/O to complete.
+DEFAULT_UPLOAD_THREADS = 10
+
 
 class AnonymousGSConnection(GSConnection):
   """GSConnection class that allows anonymous connections.
@@ -116,21 +126,24 @@
     IF_MODIFIED = 3  # if there is an existing file with the same name and
                      # contents, leave it alone
 
-  def __init__(self, boto_file_path=None):
+  def __init__(self, boto_file_path=None, logger=None):
     """Constructor.
 
     Params:
       boto_file_path: full path (local-OS-style) on local disk where .boto
           credentials file can be found.  If None, then the GSUtils object
           created will be able to access only public files in Google Storage.
+      logger: a logging.Logger instance to use for logging output; if None,
+          one will be created with default characteristics
 
     Raises an exception if no file is found at boto_file_path, or if the file
     found there is malformed.
     """
+    self.logger = logger or logging.getLogger(__name__)
     self._gs_access_key_id = None
     self._gs_secret_access_key = None
     if boto_file_path:
-      print 'Reading boto file from %s' % boto_file_path
+      self.logger.info('Reading boto file from %s' % boto_file_path)
       boto_dict = _config_file_as_dict(filepath=boto_file_path)
       self._gs_access_key_id = boto_dict['gs_access_key_id']
       self._gs_secret_access_key = boto_dict['gs_secret_access_key']
@@ -215,8 +228,8 @@
     if upload_if == self.UploadIf.IF_NEW:
       old_key = b.get_key(key_name=dest_path)
       if old_key:
-        print 'Skipping upload of existing file gs://%s/%s' % (
-            b.name, dest_path)
+        self.logger.info('Skipping upload of existing file gs://%s/%s' % (
+            b.name, dest_path))
         return
     elif upload_if == self.UploadIf.IF_MODIFIED:
       old_key = b.get_key(key_name=dest_path)
@@ -224,8 +237,9 @@
         if not local_md5:
           local_md5 = _get_local_md5(path=source_path)
         if ('"%s"' % local_md5) == old_key.etag:
-          print 'Skipping upload of unmodified file gs://%s/%s : %s' % (
-              b.name, dest_path, local_md5)
+          self.logger.info(
+              'Skipping upload of unmodified file gs://%s/%s : %s' % (
+                  b.name, dest_path, local_md5))
           return
     elif upload_if != self.UploadIf.ALWAYS:
       raise Exception('unknown value of upload_if: %s' % upload_if)
@@ -280,6 +294,7 @@
         id_type=id_type, id_value=id_value, permission=permission)
 
   def upload_dir_contents(self, source_dir, dest_bucket, dest_dir,
+                          num_threads=DEFAULT_UPLOAD_THREADS,
                           upload_if=UploadIf.ALWAYS, **kwargs):
     """Recursively upload contents of a local directory to Google Storage.
 
@@ -289,6 +304,7 @@
       dest_bucket: GS bucket to copy the files into
       dest_dir: full path (Posix-style) within that bucket; write the files
           into this directory.  If None, write into the root directory of the bucket.
+      num_threads: how many files to upload at once
       upload_if: one of the UploadIf values, describing in which cases we
           should upload the file
       kwargs: any additional keyword arguments "inherited" from upload_file()
@@ -310,6 +326,7 @@
       relative_dirpath = dirpath[prefix_length:]
       for filename in filenames:
         source_fileset.add(os.path.join(relative_dirpath, filename))
+    num_files_total = len(source_fileset)
 
     # If we are only uploading files conditionally, remove any unnecessary
     # files from source_fileset.
@@ -343,13 +360,42 @@
       raise Exception('unknown value of upload_if: %s' % upload_if)
 
     # Upload any files still in source_fileset.
-    for rel_path in sorted(source_fileset):
-      self.upload_file(
-          source_path=os.path.join(source_dir, rel_path),
-          dest_bucket=b,
-          dest_path=posixpath.join(dest_dir, rel_path),
-          upload_if=self.UploadIf.ALWAYS,
-          **kwargs)
+    num_files_to_upload = len(source_fileset)
+    self.logger.info('Uploading %d files, skipping %d ...' % (
+        num_files_to_upload, num_files_total - num_files_to_upload))
+    if num_files_to_upload == 0:
+      return
+    if num_threads > num_files_to_upload:
+      num_threads = num_files_to_upload
+
+    # Create a work queue with all files that need to be uploaded.
+    q = Queue.Queue(maxsize=num_files_to_upload)
+    for rel_path in source_fileset:
+      q.put(rel_path)
+
+    # Spin up worker threads to read from the task queue.
+    def worker():
+      while True:
+        try:
+          rel_path = q.get(block=False)
+        except Queue.Empty:
+          return  # no more tasks in the queue, so exit
+        self.logger.info(' Uploading file %d/%d: %s' % (
+            num_files_to_upload - q.qsize(), num_files_to_upload, rel_path))
+        self.upload_file(
+            source_path=os.path.join(source_dir, rel_path),
+            dest_bucket=b,
+            dest_path=posixpath.join(dest_dir, rel_path),
+            upload_if=self.UploadIf.ALWAYS,
+            **kwargs)
+        q.task_done()
+    for _ in range(num_threads):
+      t = threading.Thread(target=worker)
+      t.daemon = True
+      t.start()
+
+    # Block until all files have been uploaded and all workers have exited.
+    q.join()
 
   def download_file(self, source_bucket, source_path, dest_path,
                     create_subdirs_if_needed=False):
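A caller-side sketch of how the parallelized upload might be invoked. The bucket name, local directory, destination path, .boto path, and thread count below are illustrative placeholders and not part of this change; only GSUtils, upload_dir_contents, num_threads, upload_if, and DEFAULT_UPLOAD_THREADS come from the code above.

import logging
import gs_utils

logging.basicConfig(level=logging.INFO)
# A .boto credentials file is needed for writes; this path is hypothetical.
gs = gs_utils.GSUtils(boto_file_path='/home/user/.boto')
gs.upload_dir_contents(
    source_dir='/tmp/render-output',     # hypothetical local directory
    dest_bucket='example-bucket',        # hypothetical GS bucket
    dest_dir='builds/example',           # hypothetical destination path
    num_threads=4,                       # overrides DEFAULT_UPLOAD_THREADS (10)
    upload_if=gs_utils.GSUtils.UploadIf.IF_MODIFIED)

Because each worker thread spends most of its time waiting on network I/O, a thread count larger than the number of cores can still help; the method also caps num_threads at the number of files actually queued for upload.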
diff --git a/py/utils/gs_utils_manualtest.py b/py/utils/gs_utils_manualtest.py
index 37621d1..0a77d74 100755
--- a/py/utils/gs_utils_manualtest.py
+++ b/py/utils/gs_utils_manualtest.py
@@ -6,6 +6,7 @@
 """
 
 # System-level imports.
+import logging
 import os
 import posixpath
 import random
@@ -355,6 +356,7 @@
 
 
 if __name__ == '__main__':
+  logging.basicConfig(level=logging.INFO)
   _test_upload_if_multiple_files()
   _test_upload_if_one_file()
   _test_public_read()
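Similarly, a sketch of routing GSUtils output through a caller-supplied logger via the new constructor parameter; the logger name, level, and .boto path are arbitrary examples, not part of this change.

import logging
import gs_utils

upload_logger = logging.getLogger('gs_upload')   # arbitrary logger name
upload_logger.setLevel(logging.INFO)
upload_logger.addHandler(logging.StreamHandler())

# boto_file_path here is a hypothetical placeholder.
gs = gs_utils.GSUtils(boto_file_path='/home/user/.boto', logger=upload_logger)

If no logger is passed, the constructor falls back to logging.getLogger(__name__), so callers that only want console output can simply call logging.basicConfig(level=logging.INFO), as the manual test above does.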