add is_gs_url(), split_gs_url() static methods to gs_utils.py

This will be helpful for dealing with user input in the gs://path format.

R=rmistry@google.com

Review URL: https://codereview.chromium.org/427893002
diff --git a/py/utils/gs_utils.py b/py/utils/gs_utils.py
index 15ff912..d5c5b95 100644
--- a/py/utils/gs_utils.py
+++ b/py/utils/gs_utils.py
@@ -52,6 +52,8 @@
 # each core sits idle waiting for network I/O to complete.
 DEFAULT_UPLOAD_THREADS = 10
 
+_GS_PREFIX = 'gs://'  # URL prefix; is_gs_url() lowercases input before matching
+
 
 class AnonymousGSConnection(GSConnection):
   """GSConnection class that allows anonymous connections.
@@ -580,6 +582,34 @@
         dirs.append(item.name[prefix_length:-1])
     return (dirs, files)
 
+  @staticmethod
+  def is_gs_url(url):
+    """Returns True if url is a legal Google Storage URL ("gs://bucket/file"):
+    the gs:// prefix (any case) followed by an alphanumeric; else False."""
+    bucket_start = len(_GS_PREFIX)
+    try:
+      if not url.lower().startswith(_GS_PREFIX) or len(url) <= bucket_start:
+        return False
+      return url[bucket_start].isalnum()
+    except AttributeError:
+      return False
+
+  @staticmethod
+  def split_gs_url(url):
+    """Splits a legal Google Storage URL into a (bucket, filepath) tuple.
+
+    Slashes at either end of filepath are stripped; with no path, it is ''.
+
+    Raises AttributeError if the input URL is not a legal Google Storage URL.
+    """
+    if not GSUtils.is_gs_url(url):
+      raise AttributeError('"%s" is not a legal Google Storage URL' % url)
+    # Everything up to the first '/' after the prefix is the bucket name;
+    # partition() leaves filepath empty when there is no '/' at all.
+    remainder = url[len(_GS_PREFIX):]
+    bucket, _, filepath = remainder.partition('/')
+    return (bucket, filepath.strip('/'))
+
   def _connect_to_bucket(self, bucket):
     """Returns a Bucket object we can use to access a particular bucket in GS.
 
diff --git a/py/utils/gs_utils_manualtest.py b/py/utils/gs_utils_manualtest.py
index 37621d1..3365055 100755
--- a/py/utils/gs_utils_manualtest.py
+++ b/py/utils/gs_utils_manualtest.py
@@ -39,6 +39,40 @@
   return 'gs_utils_manualtest/%d' % random.randint(0, sys.maxint)
 
 
+def _test_static_methods():
+  """Checks GSUtils.is_gs_url() and split_gs_url() against a table of URLs."""
+  gs = gs_utils.GSUtils
+
+  # (input url,  expected bucket, expected path); bucket None => illegal URL
+  testcases = [
+      (None,  None, None),
+      (5,  None, None),
+      ('',  None, None),
+      ('/one/two',  None, None),
+      ('http://one/two',  None, None),
+      ('gs:',  None, None),
+      ('gs://',  None, None),
+      ('gs:///',  None, None),
+      ('gs://???',  None, None),
+      ('gs:///bucket',  None, None),
+      ('gs://bucket',  'bucket', ''),
+      ('GS://bucket/',  'bucket', ''),
+      ('gs://bucket//',  'bucket', ''),
+      ('gs://bucket/path1',  'bucket', 'path1'),
+      ('gs://bucket/path1/path2',  'bucket', 'path1/path2'),
+      ('gs://bucket/path1/path2/',  'bucket', 'path1/path2'),
+      ('gs://bucket///path1/path2///',  'bucket', 'path1/path2'),
+      ('gs://bucket///path1//path2///',  'bucket', 'path1//path2'),
+  ]
+  for (url, bucket, path) in testcases:
+    is_legal_url = (bucket is not None)
+    assert gs.is_gs_url(url) == is_legal_url, 'gs.is_gs_url("%s") == %s' % (
+        url, is_legal_url)
+    if is_legal_url:
+      assert gs.split_gs_url(url) == (bucket, path), (
+          'gs.split_gs_url("%s") == ("%s", "%s")' % (url, bucket, path))
+
+
 def _test_public_read():
   """Make sure we can read from public files without .boto file credentials."""
   gs = gs_utils.GSUtils()
@@ -355,6 +389,7 @@
 
 
 if __name__ == '__main__':
+  _test_static_methods()
   _test_upload_if_multiple_files()
   _test_upload_if_one_file()
   _test_public_read()