blob: 85e6101f9e85fb4e2552e6b8508d5309465587b5 [file] [log] [blame]
package gcs
import (
// This file implements utility functions for accessing data in Google Storage.
var (
	// dirMap maps dataset name to a slice with Google Storage subdirectory and file prefix.
	// Each value holds exactly two entries: {subdirectory, file prefix}.
	dirMap = map[string][]string{
		"skps":  {"pics-json-v2", "bench_"},
		"micro": {"stats-json-v2", "microbench2_"},
	}

	// trybotDataPath matches a path made of an optional lowercase prefix and an
	// optional slash, followed by YYYY/MM/DD/HH/<name>-Trybot/<number>/<number>.
	// The single capture group is the part starting at the four-digit year.
	trybotDataPath = regexp.MustCompile(`^[a-z]*[/]?([0-9]{4}/[0-9]{2}/[0-9]{2}/[0-9]{2}/[0-9a-zA-Z-]+-Trybot/[0-9]+/[0-9]+)$`)
)
// RequestForStorageURL returns an http.Request for a given Cloud Storage URL.
// This is a workaround for a known issue: embedded slashes in URLs require use
// of the URL.Opaque property.
//
// The request's URL.Opaque is set to everything after the scheme separator
// (the first ":" that appears before any query string) and before the first
// "?". If the URL has no ":" ahead of the query, Opaque is the whole URL up to
// the query. An error is returned if http.NewRequest rejects the URL.
func RequestForStorageURL(url string) (*http.Request, error) {
	r, err := http.NewRequest("GET", url, nil)
	if err != nil {
		return nil, fmt.Errorf("HTTP new request error: %s", err)
	}

	queryPos := strings.Index(url, "?")
	if queryPos == -1 {
		queryPos = len(url)
	}
	// Only look for the scheme separator ahead of the query string. A colon
	// that appears solely inside the query must not be mistaken for it;
	// otherwise the start index could exceed queryPos and the slice below
	// would panic.
	schemePos := strings.Index(url[:queryPos], ":")

	// When no colon is found schemePos is -1, so the slice starts at 0.
	r.URL.Opaque = url[schemePos+1 : queryPos]
	return r, nil
}
// FileContentsFromGCS returns the contents of a file in the given bucket or an error.
func FileContentsFromGCS(s *storage.Client, bucketName, fileName string) ([]byte, error) {
response, err := s.Bucket(bucketName).Object(fileName).NewReader(context.Background())
if err != nil {
return nil, err
defer util.Close(response)
return ioutil.ReadAll(response)
// AllFilesInDir synchronously iterates through all the files in a given Google Storage folder.
// The callback function is called on each item in the order it is in the bucket.
// It returns an error if the bucket or folder cannot be accessed.
func AllFilesInDir(s *storage.Client, bucket, folder string, callback func(item *storage.ObjectAttrs)) error {
total := 0
q := &storage.Query{Prefix: folder, Versions: false}
it := s.Bucket(bucket).Objects(context.Background(), q)
for obj, err := it.Next(); err != iterator.Done; obj, err = it.Next() {
if err != nil {
return fmt.Errorf("Problem reading from Google Storage: %v", err)
return nil
// DeleteAllFilesInDir deletes all the files in a given folder. If processes is set to > 1,
// that many go routines will be spun up to delete the file simultaneously. Otherwise, it will
// be done one one process.
func DeleteAllFilesInDir(s *storage.Client, bucket, folder string, processes int) error {
if processes <= 0 {
processes = 1
errCount := int32(0)
var wg sync.WaitGroup
toDelete := make(chan string, 1000)
for i := 0; i < processes; i++ {
go deleteHelper(s, bucket, &wg, toDelete, &errCount)
del := func(item *storage.ObjectAttrs) {
toDelete <- item.Name
if err := AllFilesInDir(s, bucket, folder, del); err != nil {
return err
if errCount > 0 {
return fmt.Errorf("There were one or more problems when deleting files in folder %q", folder)
return nil
// deleteHelper spins and waits for work to come in on the toDelete channel. When it does, it
// uses the storage client to delete the file from the given bucket.
func deleteHelper(s *storage.Client, bucket string, wg *sync.WaitGroup, toDelete <-chan string, errCount *int32) {
defer wg.Done()
for file := range toDelete {
if err := s.Bucket(bucket).Object(file).Delete(context.Background()); err != nil {
// Ignore 404 errors on deleting, as they are already gone.
if !strings.Contains(err.Error(), "statuscode 404") {
sklog.Errorf("Problem deleting gs://%s/%s: %s", bucket, file, err)
atomic.AddInt32(errCount, 1)
// Write the given content to the given object in Google Storage.
func WriteObj(o *storage.ObjectHandle, content []byte) (err error) {
w := o.NewWriter(context.Background())
w.ObjectAttrs.ContentEncoding = "gzip"
if err := util.WithGzipWriter(w, func(w io.Writer) error {
_, err := w.Write(content)
return err
}); err != nil {
_ = w.CloseWithError(err) // Always returns nil, according to docs.
return err
return w.Close()
// SplitGSPath takes a GCS path and splits it into a <bucket,path> pair.
// It assumes the format: {bucket_name}/{path_within_bucket}.
//
// The split happens at the first "/" only, so the returned path may itself
// contain slashes. If the input has no "/", the whole input is returned as the
// bucket and the path is empty.
func SplitGSPath(path string) (string, string) {
	parts := strings.SplitN(path, "/", 2)
	if len(parts) > 1 {
		return parts[0], parts[1]
	}
	return path, ""
}
// WithWriteFile writes to a GCS object using the given function, handling all errors. No
// compression is done on the data. See GCSClient.FileWriter for details on the parameters.
func WithWriteFile(client GCSClient, ctx context.Context, path string, opts FileWriteOptions, fn func(io.Writer) error) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
writer := client.FileWriter(ctx, path, opts)
if err := fn(writer); err != nil {
return err
return writer.Close()
// WithWriteFileGzip writes to a GCS object using the given function, compressing the data with gzip
// and handling all errors. See GCSClient.FileWriter for details on the parameters.
func WithWriteFileGzip(client GCSClient, ctx context.Context, path string, fn func(io.Writer) error) error {
opts := FileWriteOptions{
ContentEncoding: "gzip",
return WithWriteFile(client, ctx, path, opts, func(w io.Writer) error {
return util.WithGzipWriter(w, fn)