upload_dir_contents(): upload multiple files in parallel

BUG=skia:2780
R=borenet@google.com

Review URL: https://codereview.chromium.org/424553002
diff --git a/py/utils/gs_utils.py b/py/utils/gs_utils.py
index f53b4d1..7824c79 100644
--- a/py/utils/gs_utils.py
+++ b/py/utils/gs_utils.py
@@ -19,10 +19,13 @@
 # System-level imports
 import errno
 import hashlib
+import logging
 import os
 import posixpath
+import Queue
 import re
 import sys
+import threading
 
 # Imports from third-party code
 TRUNK_DIRECTORY = os.path.abspath(os.path.join(
@@ -43,6 +46,13 @@
 from boto.s3.connection import SubdomainCallingFormat
 from boto.s3.prefix import Prefix
 
+# How many files to upload at once, by default.
+# TODO(epoger): Is there a way to compute this intelligently?  To some extent
+# it is a function of how many cores are on the machine, and how many other
+# processes it is running; but it's probably more a function of how much time
+# each core sits idle waiting for network I/O to complete.
+DEFAULT_UPLOAD_THREADS = 10
+
 
 class AnonymousGSConnection(GSConnection):
   """GSConnection class that allows anonymous connections.
@@ -116,21 +126,24 @@
     IF_MODIFIED = 3 # if there is an existing file with the same name and
                     # contents, leave it alone
 
-  def __init__(self, boto_file_path=None):
+  def __init__(self, boto_file_path=None, logger=None):
     """Constructor.
 
     Params:
       boto_file_path: full path (local-OS-style) on local disk where .boto
           credentials file can be found.  If None, then the GSUtils object
           created will be able to access only public files in Google Storage.
+      logger: a logging.Logger instance to use for logging output; if None,
+          one will be created with default characteristics
 
     Raises an exception if no file is found at boto_file_path, or if the file
     found there is malformed.
     """
+    self.logger = logger or logging.getLogger(__name__)
     self._gs_access_key_id = None
     self._gs_secret_access_key = None
     if boto_file_path:
-      print 'Reading boto file from %s' % boto_file_path
+      self.logger.info('Reading boto file from %s' % boto_file_path)
       boto_dict = _config_file_as_dict(filepath=boto_file_path)
       self._gs_access_key_id = boto_dict['gs_access_key_id']
       self._gs_secret_access_key = boto_dict['gs_secret_access_key']
@@ -215,8 +228,8 @@
     if upload_if == self.UploadIf.IF_NEW:
       old_key = b.get_key(key_name=dest_path)
       if old_key:
-        print 'Skipping upload of existing file gs://%s/%s' % (
-            b.name, dest_path)
+        self.logger.info('Skipping upload of existing file gs://%s/%s' % (
+            b.name, dest_path))
         return
     elif upload_if == self.UploadIf.IF_MODIFIED:
       old_key = b.get_key(key_name=dest_path)
@@ -224,8 +237,9 @@
         if not local_md5:
           local_md5 = _get_local_md5(path=source_path)
         if ('"%s"' % local_md5) == old_key.etag:
-          print 'Skipping upload of unmodified file gs://%s/%s : %s' % (
-              b.name, dest_path, local_md5)
+          self.logger.info(
+              'Skipping upload of unmodified file gs://%s/%s : %s' % (
+                  b.name, dest_path, local_md5))
           return
     elif upload_if != self.UploadIf.ALWAYS:
       raise Exception('unknown value of upload_if: %s' % upload_if)
@@ -280,6 +294,7 @@
           id_type=id_type, id_value=id_value, permission=permission)
 
   def upload_dir_contents(self, source_dir, dest_bucket, dest_dir,
+                          num_threads=DEFAULT_UPLOAD_THREADS,
                           upload_if=UploadIf.ALWAYS, **kwargs):
     """Recursively upload contents of a local directory to Google Storage.
 
@@ -289,6 +304,7 @@
       dest_bucket: GS bucket to copy the files into
       dest_dir: full path (Posix-style) within that bucket; write the files into
           this directory.  If None, write into the root directory of the bucket.
+      num_threads: how many files to upload at once
       upload_if: one of the UploadIf values, describing in which cases we should
           upload the file
       kwargs: any additional keyword arguments "inherited" from upload_file()
@@ -310,6 +326,7 @@
       relative_dirpath = dirpath[prefix_length:]
       for filename in filenames:
         source_fileset.add(os.path.join(relative_dirpath, filename))
+    num_files_total = len(source_fileset)
 
     # If we are only uploading files conditionally, remove any unnecessary
     # files from source_fileset.
@@ -343,13 +360,42 @@
         raise Exception('unknown value of upload_if: %s' % upload_if)
 
     # Upload any files still in source_fileset.
-    for rel_path in sorted(source_fileset):
-      self.upload_file(
-          source_path=os.path.join(source_dir, rel_path),
-          dest_bucket=b,
-          dest_path=posixpath.join(dest_dir, rel_path),
-          upload_if=self.UploadIf.ALWAYS,
-          **kwargs)
+    num_files_to_upload = len(source_fileset)
+    self.logger.info('Uploading %d files, skipping %d ...' % (
+        num_files_to_upload, num_files_total - num_files_to_upload))
+    if num_files_to_upload == 0:
+      return
+    if num_threads > num_files_to_upload:
+      num_threads = num_files_to_upload
+
+    # Create a work queue with all files that need to be uploaded.
+    q = Queue.Queue(maxsize=num_files_to_upload)
+    for rel_path in source_fileset:
+      q.put(rel_path)
+
+    # Spin up worker threads to read from the task queue.
+    def worker():
+      while True:
+        try:
+          rel_path = q.get(block=False)
+        except Queue.Empty:
+          return  # no more tasks in the queue, so exit
+        self.logger.info(' Uploading file %d/%d: %s' % (
+            num_files_to_upload - q.qsize(), num_files_to_upload, rel_path))
+        self.upload_file(
+            source_path=os.path.join(source_dir, rel_path),
+            dest_bucket=b,
+            dest_path=posixpath.join(dest_dir, rel_path),
+            upload_if=self.UploadIf.ALWAYS,
+            **kwargs)
+        q.task_done()
+    for _ in range(num_threads):
+      t = threading.Thread(target=worker)
+      t.daemon = True
+      t.start()
+
+    # Block until all files have been uploaded and all workers have exited.
+    q.join()
 
   def download_file(self, source_bucket, source_path, dest_path,
                     create_subdirs_if_needed=False):
diff --git a/py/utils/gs_utils_manualtest.py b/py/utils/gs_utils_manualtest.py
index 37621d1..0a77d74 100755
--- a/py/utils/gs_utils_manualtest.py
+++ b/py/utils/gs_utils_manualtest.py
@@ -6,6 +6,7 @@
 """
 
 # System-level imports.
+import logging
 import os
 import posixpath
 import random
@@ -355,6 +356,7 @@
 
 
 if __name__ == '__main__':
+  logging.basicConfig(level=logging.INFO)
   _test_upload_if_multiple_files()
   _test_upload_if_one_file()
   _test_public_read()