gs_utils: when uploading a whole dir using IF_NEW, check for existence of multiple files in a single operation


Review URL:
diff --git a/py/utils/ b/py/utils/
index c58bc67..7f3b59c 100644
--- a/py/utils/
+++ b/py/utils/
@@ -104,9 +104,9 @@
   class UploadIf:
     """Cases in which we will upload a file.
-    Beware of performance tradeoffs.  E.g., if the file is small, the extra
-    round trip to check for file existence and/or checksum may take longer than
-    just uploading the file.
+    Beware of performance tradeoffs.  E.g., if you are uploading just one small
+    file, the extra round trip to check for file existence and/or checksum may
+    take longer than just uploading the file.
     See ('gs_utils: when uploading IF_NEW, batch up
     checks for existing files within a single remote directory')
@@ -244,7 +244,8 @@
           id_type=id_type, id_value=id_value, permission=permission)
-  def upload_dir_contents(self, source_dir, dest_bucket, dest_dir, **kwargs):
+  def upload_dir_contents(self, source_dir, dest_bucket, dest_dir,
+                          upload_if=UploadIf.ALWAYS, **kwargs):
     """Recursively upload contents of a local directory to Google Storage.
@@ -253,34 +254,67 @@
       dest_bucket: GS bucket to copy the files into
       dest_dir: full path (Posix-style) within that bucket; write the files into
           this directory.  If None, write into the root directory of the bucket.
+      upload_if: one of the UploadIf values, describing in which cases we should
+          upload the file
       kwargs: any additional keyword arguments "inherited" from upload_file()
     The copy operates as a merge: any files in source_dir will be "overlaid" on
     top of the existing content in dest_dir.  Existing files with the same names
-    may or may not be overwritten, depending on the value of the upload_if kwarg
-    inherited from upload_file().
+    may or may not be overwritten, depending on the value of upload_if.
     TODO(epoger): Upload multiple files simultaneously to reduce latency.
-    TODO(epoger): When upload_if==IF_NEW, batch up checks for existing files
-    within a single remote directory. See
     b = self._connect_to_bucket(bucket=dest_bucket)
-    for filename in sorted(os.listdir(source_dir)):
-      local_path = os.path.join(source_dir, filename)
-      if dest_dir:
-        remote_path = posixpath.join(dest_dir, filename)
-      else:
-        remote_path = filename
+    if not dest_dir:
+      dest_dir = ''
-      if os.path.isdir(local_path):
-        self.upload_dir_contents(  # recurse
-            source_dir=local_path, dest_bucket=b, dest_dir=remote_path,
-            **kwargs)
+    # Create a set of all files within source_dir.
+    source_fileset = set()
+    prefix_length = len(source_dir)+1
+    for dirpath, _, filenames in os.walk(source_dir):
+      relative_dirpath = dirpath[prefix_length:]
+      for filename in filenames:
+        source_fileset.add(os.path.join(relative_dirpath, filename))
+    # If we are only uploading files conditionally, remove any unnecessary
+    # files from source_fileset.
+    if upload_if == self.UploadIf.ALWAYS:
+      pass  # there are no shortcuts... upload them all
+    else:
+      # Create a mapping of filename to Key for existing files within dest_dir
+      existing_dest_filemap = {}
+      prefix = dest_dir
+      if prefix and not prefix.endswith('/'):
+        prefix += '/'
+      prefix_length = len(prefix)
+      items = BucketListResultSet(bucket=b, prefix=prefix)
+      for item in items:
+        if type(item) is Key:
+          existing_dest_filemap[[prefix_length:]] = item
+      # Now, depending on upload_if, trim files we should skip uploading.
+      files_in_common = source_fileset.intersection(
+          existing_dest_filemap.keys())
+      if upload_if == self.UploadIf.IF_NEW:
+        source_fileset -= files_in_common
+      elif upload_if == self.UploadIf.IF_MODIFIED:
+        for rel_path in files_in_common:
+          local_md5 = '"%s"' % _get_local_md5(path=os.path.join(
+              source_dir, rel_path))
+          key = existing_dest_filemap[rel_path]
+          if local_md5 == key.etag:
+            source_fileset.remove(rel_path)
-        self.upload_file(
-            source_path=local_path, dest_bucket=b, dest_path=remote_path,
-            **kwargs)
+        raise Exception('unknown value of upload_if: %s' % upload_if)
+    # Upload any files still in source_fileset.
+    for rel_path in sorted(source_fileset):
+      self.upload_file(
+          source_path=os.path.join(source_dir, rel_path),
+          dest_bucket=b,
+          dest_path=posixpath.join(dest_dir, rel_path),
+          upload_if=self.UploadIf.ALWAYS,
+          **kwargs)
   def download_file(self, source_bucket, source_path, dest_path,
diff --git a/py/utils/ b/py/utils/
index b4f70a6..37621d1 100755
--- a/py/utils/
+++ b/py/utils/
@@ -45,7 +45,7 @@
   gs.list_bucket_contents(bucket=TEST_BUCKET, subdir=None)
-def _test_upload_if():
+def _test_upload_if_one_file():
   """Test upload_if param within upload_file()."""
   gs = _get_authenticated_gs_handle()
   filename = 'filename'
@@ -113,6 +113,86 @@
     # Clean up the local dir.
+def _test_upload_if_multiple_files():
+  """Test upload_if param within upload_dir_contents()."""
+  gs = _get_authenticated_gs_handle()
+  subdir = 'subdir'
+  filenames = ['file1', 'file2']
+  local_dir = tempfile.mkdtemp()
+  remote_dir = _get_unique_posix_dir()
+  sample_file_local_path = os.path.join(local_dir, subdir, filenames[0])
+  sample_file_remote_path = posixpath.join(remote_dir, subdir, filenames[0])
+  try:
+    # Create files on local disk, and upload them for the first time.
+    os.mkdir(os.path.join(local_dir, subdir))
+    for filename in filenames:
+      with open(os.path.join(local_dir, subdir, filename), 'w') as f:
+        f.write('original contents of %s' % filename)
+    gs.upload_dir_contents(
+        source_dir=local_dir, dest_bucket=TEST_BUCKET,
+        dest_dir=remote_dir, upload_if=gs.UploadIf.IF_NEW)
+    try:
+      # Re-upload the same files, with upload_if=gs.UploadIf.ALWAYS;
+      # the timestamps should change.
+      old_timestamp = gs.get_last_modified_time(
+          bucket=TEST_BUCKET, path=sample_file_remote_path)
+      time.sleep(2)
+      gs.upload_dir_contents(
+          source_dir=local_dir, dest_bucket=TEST_BUCKET,
+          dest_dir=remote_dir, upload_if=gs.UploadIf.ALWAYS)
+      new_timestamp = gs.get_last_modified_time(
+          bucket=TEST_BUCKET, path=sample_file_remote_path)
+      assert old_timestamp != new_timestamp, '%s != %s' % (
+          old_timestamp, new_timestamp)
+      # Re-upload the same files, with upload_if=gs.UploadIf.IF_MODIFIED;
+      # the timestamps should NOT change.
+      old_timestamp = new_timestamp
+      time.sleep(2)
+      gs.upload_dir_contents(
+          source_dir=local_dir, dest_bucket=TEST_BUCKET,
+          dest_dir=remote_dir, upload_if=gs.UploadIf.IF_MODIFIED)
+      new_timestamp = gs.get_last_modified_time(
+          bucket=TEST_BUCKET, path=sample_file_remote_path)
+      assert old_timestamp == new_timestamp, '%s == %s' % (
+          old_timestamp, new_timestamp)
+      # Modify and re-upload the files, with upload_if=gs.UploadIf.IF_NEW;
+      # the timestamps should still not change.
+      old_timestamp = new_timestamp
+      with open(sample_file_local_path, 'w') as f:
+        f.write('modified contents of sample file')
+      time.sleep(2)
+      gs.upload_dir_contents(
+          source_dir=local_dir, dest_bucket=TEST_BUCKET,
+          dest_dir=remote_dir, upload_if=gs.UploadIf.IF_NEW)
+      new_timestamp = gs.get_last_modified_time(
+          bucket=TEST_BUCKET, path=sample_file_remote_path)
+      assert old_timestamp == new_timestamp, '%s == %s' % (
+          old_timestamp, new_timestamp)
+      # Re-upload the modified file, with upload_if=gs.UploadIf.IF_MODIFIED;
+      # now the timestamp SHOULD change.
+      old_timestamp = new_timestamp
+      time.sleep(2)
+      gs.upload_dir_contents(
+          source_dir=local_dir, dest_bucket=TEST_BUCKET,
+          dest_dir=remote_dir, upload_if=gs.UploadIf.IF_MODIFIED)
+      new_timestamp = gs.get_last_modified_time(
+          bucket=TEST_BUCKET, path=sample_file_remote_path)
+      assert old_timestamp != new_timestamp, '%s != %s' % (
+          old_timestamp, new_timestamp)
+    finally:
+      # Delete all the files we uploaded to Google Storage.
+      for filename in filenames:
+        gs.delete_file(bucket=TEST_BUCKET,
+                       path=posixpath.join(remote_dir, subdir, filename))
+  finally:
+    # Clean up the local dir.
+    shutil.rmtree(local_dir)
 def _test_authenticated_round_trip():
   gs = _get_authenticated_gs_handle()
   remote_dir = _get_unique_posix_dir()
@@ -275,7 +355,8 @@
 if __name__ == '__main__':
-  _test_upload_if()
+  _test_upload_if_multiple_files()
+  _test_upload_if_one_file()