| // Google Storage utility that contains methods for both CT master and worker |
| // scripts. |
| package util |
| |
| import ( |
| "context" |
| "fmt" |
| "io" |
| "net/http" |
| "os" |
| "path" |
| "path/filepath" |
| "runtime" |
| "strconv" |
| "strings" |
| "sync" |
| "time" |
| |
| "go.skia.org/infra/go/auth" |
| "go.skia.org/infra/go/gcs" |
| "go.skia.org/infra/go/httputils" |
| "go.skia.org/infra/go/sklog" |
| "go.skia.org/infra/go/util" |
| "go.skia.org/infra/go/util/zip" |
| "golang.org/x/oauth2/google" |
| |
| storage "google.golang.org/api/storage/v1" |
| ) |
| |
| const ( |
| DOWNLOAD_UPLOAD_GOROUTINE_POOL_SIZE = 30 |
| // Use a larger pool size for deletions. This is useful when deleting directories |
| // with millions of subdirectories from the master. Google Storage is not overwhelmed |
| // because the workers do not all perform large-scale deletions at the same time. |
| DELETE_GOROUTINE_POOL_SIZE = 1000 |
| ) |
| |
| type GcsUtil struct { |
| // The client used to connect to Google Storage. |
| client *http.Client |
| service *storage.Service |
| } |
| |
| // NewGcsUtil initializes and returns a utility for CT interactions with Google |
| // Storage. If client is nil then a client is created with the default token |
| // source. |
| func NewGcsUtil(client *http.Client) (*GcsUtil, error) { |
| if client == nil { |
| ts, err := google.DefaultTokenSource(context.TODO(), auth.ScopeFullControl) |
| if err != nil { |
| return nil, fmt.Errorf("Problem setting up default token source: %s", err) |
| } |
| clientConfig := httputils.DefaultClientConfig().With2xxOnly().WithTokenSource(ts) |
| client = clientConfig.Client() |
| } |
| client.Timeout = HTTP_CLIENT_TIMEOUT |
| service, err := storage.New(client) |
| if err != nil { |
| return nil, fmt.Errorf("Failed to create interface to Google Storage: %s", err) |
| } |
| return &GcsUtil{client: client, service: service}, nil |
| } |
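| |
| // A minimal usage sketch (hypothetical, for illustration only; assumes default |
| // credentials are available in the environment): |
| // |
| //   gcsUtil, err := NewGcsUtil(nil) // nil => client built from the default token source. |
| //   if err != nil { |
| //       sklog.Fatal(err) |
| //   } |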
| |
| // getRespBody returns the response body of the specified GCS object. It tries up |
| // to MAX_URI_GET_TRIES times if the download is unsuccessful. The caller must |
| // close the response body when finished with it. |
| func getRespBody(res *storage.Object, client *http.Client) (io.ReadCloser, error) { |
| for i := 0; i < MAX_URI_GET_TRIES; i++ { |
| request, err := gcs.RequestForStorageURL(res.MediaLink) |
| if err != nil { |
| sklog.Warningf("Unable to create Storage MediaURI request: %s\n", err) |
| continue |
| } |
| |
| resp, err := client.Do(request) |
| if err != nil { |
| sklog.Warningf("Unable to retrieve Storage MediaURI: %s", err) |
| continue |
| } |
| if resp.StatusCode != 200 { |
| sklog.Warningf("Failed to retrieve: %d %s", resp.StatusCode, resp.Status) |
| util.Close(resp.Body) |
| continue |
| } |
| return resp.Body, nil |
| } |
| return nil, fmt.Errorf("Failed fetching file after %d attempts", MAX_URI_GET_TRIES) |
| } |
| |
| // GetRemoteFileContents returns the response body of the specified GCS file from |
| // the default CT bucket. The caller must close the response body when finished |
| // with it. |
| func (gs *GcsUtil) GetRemoteFileContents(filePath string) (io.ReadCloser, error) { |
| return gs.GetRemoteFileContentsFromBucket(GCSBucketName, filePath) |
| } |
| |
| // GetRemoteFileContentsFromBucket returns the response body of the specified GCS |
| // file. The caller must close the response body when finished with it. |
| func (gs *GcsUtil) GetRemoteFileContentsFromBucket(bucket, filePath string) (io.ReadCloser, error) { |
| res, err := gs.service.Objects.Get(bucket, filePath).Do() |
| if err != nil { |
| return nil, fmt.Errorf("Could not get %s from GCS: %s", filePath, err) |
| } |
| return getRespBody(res, gs.client) |
| } |
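| |
| // Hypothetical usage sketch; the file path below is illustrative: |
| // |
| //   body, err := gcsUtil.GetRemoteFileContents("some/remote/file.json") |
| //   if err != nil { |
| //       return err |
| //   } |
| //   defer util.Close(body) |
| //   contents, err := ioutil.ReadAll(body) // ioutil is from "io/ioutil". |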
| |
| type filePathToStorageObject struct { |
| storageObject *storage.Object |
| filePath string |
| } |
| |
| // downloadRemoteDir downloads the specified Google Storage dir to the specified |
| // local dir. The local dir will be emptied and recreated. Handles multiple levels |
| // of directories. |
| func (gs *GcsUtil) downloadRemoteDir(localDir, bucket, gsDir string) error { |
| // Empty the local dir. |
| util.RemoveAll(localDir) |
| // Create the local dir. |
| MkdirAll(localDir, 0700) |
| // The channel to which the storage objects to be downloaded will be sent. |
| chStorageObjects := make(chan filePathToStorageObject, DOWNLOAD_UPLOAD_GOROUTINE_POOL_SIZE) |
| |
| // Kick off one goroutine to populate the channel. |
| errPopulator := make(chan error, 1) |
| var wgPopulator sync.WaitGroup |
| wgPopulator.Add(1) |
| go func() { |
| defer wgPopulator.Done() |
| defer close(chStorageObjects) |
| req := gs.service.Objects.List(bucket).Prefix(gsDir + "/") |
| for req != nil { |
| resp, err := req.Do() |
| if err != nil { |
| errPopulator <- fmt.Errorf("Error occurred while listing %s: %s", gsDir, err) |
| return |
| } |
| for _, result := range resp.Items { |
| fileName := path.Base(result.Name) |
| // If downloading from a subdir then prepend it to the fileName. |
| fileGsDir := path.Dir(result.Name) |
| subDirs := strings.TrimPrefix(fileGsDir, gsDir) |
| if subDirs != "" { |
| dirTokens := strings.Split(subDirs, "/") |
| for i := range dirTokens { |
| fileName = filepath.Join(dirTokens[len(dirTokens)-i-1], fileName) |
| } |
| // Create the local directory. |
| MkdirAll(filepath.Join(localDir, filepath.Dir(fileName)), 0700) |
| } |
| chStorageObjects <- filePathToStorageObject{storageObject: result, filePath: fileName} |
| } |
| if len(resp.NextPageToken) > 0 { |
| req.PageToken(resp.NextPageToken) |
| } else { |
| req = nil |
| } |
| } |
| }() |
| |
| // Kick off goroutines to download the storage objects. |
| var wgConsumer sync.WaitGroup |
| for i := 0; i < DOWNLOAD_UPLOAD_GOROUTINE_POOL_SIZE; i++ { |
| wgConsumer.Add(1) |
| go func(goroutineNum int) { |
| defer wgConsumer.Done() |
| for obj := range chStorageObjects { |
| if err := downloadStorageObj(obj, gs.client, localDir, goroutineNum); err != nil { |
| sklog.Errorf("Could not download storage object: %s", err) |
| return |
| } |
| // Sleep for a second after downloading file to avoid bombarding Cloud |
| // storage. |
| time.Sleep(time.Second) |
| } |
| }(i + 1) |
| } |
| |
| wgPopulator.Wait() |
| wgConsumer.Wait() |
| // Check if there was an error listing the GCS dir. |
| select { |
| case err, ok := <-errPopulator: |
| if ok { |
| return err |
| } |
| default: |
| } |
| return nil |
| } |
| |
| // downloadStorageObj downloads the given storage object into the specified local |
| // dir. |
| func downloadStorageObj(obj filePathToStorageObject, c *http.Client, localDir string, goroutineNum int) error { |
| result := obj.storageObject |
| filePath := obj.filePath |
| respBody, err := getRespBody(result, c) |
| if err != nil { |
| return fmt.Errorf("Could not fetch %s: %s", result.MediaLink, err) |
| } |
| defer util.Close(respBody) |
| outputFile := filepath.Join(localDir, filePath) |
| out, err := os.Create(outputFile) |
| if err != nil { |
| return fmt.Errorf("Unable to create file %s: %s", outputFile, err) |
| } |
| defer util.Close(out) |
| if _, err = io.Copy(out, respBody); err != nil { |
| return err |
| } |
| return nil |
| } |
| |
| // DownloadChromiumBuild downloads the specified Chromium build from Google |
| // Storage to a local dir. It returns the path to the chrome binary's dir. |
| func (gs *GcsUtil) DownloadChromiumBuild(chromiumBuild, chromiumBinaryName string) (string, error) { |
| localDir := filepath.Join(ChromiumBuildsDir, chromiumBuild) |
| gsDir := path.Join(CHROMIUM_BUILDS_DIR_NAME, chromiumBuild) |
| return gs.DownloadChromiumBuildFromTo(localDir, GCSBucketName, gsDir, CHROMIUM_BUILD_ZIP_NAME, chromiumBinaryName) |
| } |
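| |
| // Hypothetical usage sketch; the build and binary names are illustrative: |
| // |
| //   binaryDir, err := gcsUtil.DownloadChromiumBuild("try-abc123", "chrome") |
| //   if err != nil { |
| //       return err |
| //   } |
| //   sklog.Infof("chrome binary is in %s", binaryDir) |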
| |
| // DownloadChromiumBuildFromTo downloads a chromium build from the specified |
| // gs://${bucket}/${gsDir}/${zipName} to the localDir. It returns the path to the chrome |
| // binary's dir. |
| func (gs *GcsUtil) DownloadChromiumBuildFromTo(localDir, bucket, gsDir, zipName, chromiumBinaryName string) (string, error) { |
| sklog.Infof("Downloading %s from Google Storage to %s", gsDir, localDir) |
| if err := gs.downloadRemoteDir(localDir, bucket, gsDir); err != nil { |
| return "", fmt.Errorf("Error downloading %s into %s: %s", gsDir, localDir, err) |
| } |
| |
| // Unzip the build. |
| zipFilePath := filepath.Join(localDir, zipName) |
| if err := zip.UnZip(localDir, zipFilePath); err != nil { |
| return "", fmt.Errorf("Error when unzipping %s: %s", zipFilePath, err) |
| } |
| |
| // The unzipped directory could be flat or be one level up. |
| pathToBinary := filepath.Join(localDir, chromiumBinaryName) |
| if _, err := os.Stat(pathToBinary); os.IsNotExist(err) { |
| sklog.Infof("Could not find chrome binary %s at %s. Going to try one level up.", chromiumBinaryName, pathToBinary) |
| pathToBinary = filepath.Join(localDir, strings.TrimSuffix(zipName, filepath.Ext(zipName)), chromiumBinaryName) |
| if _, err := os.Stat(pathToBinary); os.IsNotExist(err) { |
| return "", fmt.Errorf("Could not find chrome binary %s at %s", chromiumBinaryName, pathToBinary) |
| } |
| } |
| |
| if runtime.GOOS != "windows" { |
| // The downloaded chrome binary needs to be made executable on non-Windows platforms. |
| util.LogErr(os.Chmod(pathToBinary, 0777)) |
| } |
| |
| return filepath.Dir(pathToBinary), nil |
| } |
| |
| // DeleteRemoteDirLogErr wraps DeleteRemoteDir and logs an error if one is returned. |
| func (gs *GcsUtil) DeleteRemoteDirLogErr(gsDir string) { |
| if err := gs.DeleteRemoteDir(gsDir); err != nil { |
| sklog.Errorf("Failed to DeleteRemoteDir(%s): %s", gsDir, err) |
| } |
| } |
| |
| // DeleteRemoteDir deletes the specified dir from CT's default bucket in Google |
| // Storage. |
| func (gs *GcsUtil) DeleteRemoteDir(gsDir string) error { |
| // The channel to which the GCS filepaths to be deleted will be sent. |
| chFilePaths := make(chan string, DELETE_GOROUTINE_POOL_SIZE) |
| |
| // Kick off one goroutine to populate the channel. |
| errPopulator := make(chan error, 1) |
| var wgPopulator sync.WaitGroup |
| wgPopulator.Add(1) |
| go func() { |
| defer wgPopulator.Done() |
| defer close(chFilePaths) |
| req := gs.service.Objects.List(GCSBucketName).Prefix(gsDir + "/") |
| for req != nil { |
| resp, err := req.Do() |
| if err != nil { |
| errPopulator <- fmt.Errorf("Error occurred while listing %s: %s", gsDir, err) |
| return |
| } |
| for _, result := range resp.Items { |
| chFilePaths <- result.Name |
| } |
| if len(resp.NextPageToken) > 0 { |
| req.PageToken(resp.NextPageToken) |
| } else { |
| req = nil |
| } |
| } |
| }() |
| |
| // Kick off goroutines to delete the file paths. |
| var wgConsumer sync.WaitGroup |
| for i := 0; i < DELETE_GOROUTINE_POOL_SIZE; i++ { |
| wgConsumer.Add(1) |
| go func(goroutineNum int) { |
| defer wgConsumer.Done() |
| for filePath := range chFilePaths { |
| if err := gs.service.Objects.Delete(GCSBucketName, filePath).Do(); err != nil { |
| sklog.Errorf("Goroutine#%d could not delete %s: %s", goroutineNum, filePath, err) |
| return |
| } |
| // Sleep for a second after deleting file to avoid bombarding Cloud |
| // storage. |
| time.Sleep(time.Second) |
| } |
| }(i + 1) |
| } |
| |
| wgPopulator.Wait() |
| wgConsumer.Wait() |
| // Check if there was an error listing the GCS dir. |
| select { |
| case err, ok := <-errPopulator: |
| if ok { |
| return err |
| } |
| default: |
| } |
| return nil |
| } |
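| |
| // Hypothetical usage sketch; the dir name is illustrative: |
| // |
| //   if err := gcsUtil.DeleteRemoteDir("swarming/old-run"); err != nil { |
| //       sklog.Errorf("Could not delete remote dir: %s", err) |
| //   } |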
| |
| // UploadFile calls UploadFileToBucket with CT's default bucket. |
| func (gs *GcsUtil) UploadFile(fileName, localDir, gsDir string) error { |
| return gs.UploadFileToBucket(fileName, localDir, gsDir, GCSBucketName) |
| } |
| |
| // UploadFileToBucket uploads the specified file to the remote dir of the bucket |
| // in Google Storage. It also sets the appropriate ACLs on the uploaded file. |
| func (gs *GcsUtil) UploadFileToBucket(fileName, localDir, gsDir, bucket string) error { |
| localFile := filepath.Join(localDir, fileName) |
| gsFile := path.Join(gsDir, fileName) |
| object := &storage.Object{ |
| Name: gsFile, |
| // All objects uploaded to CT's bucket via this util must be readable by |
| // the google.com domain. This will be fine tuned later if required. |
| Acl: []*storage.ObjectAccessControl{ |
| { |
| Bucket: bucket, |
| Entity: "domain-google.com", |
| Object: gsFile, |
| Role: "READER", |
| }, |
| }, |
| } |
| f, err := os.Open(localFile) |
| if err != nil { |
| return fmt.Errorf("Error opening %s: %s", localFile, err) |
| } |
| defer util.Close(f) |
| // TODO(rmistry): gs api now enables resumable uploads by default. Handle 308 |
| // response codes. |
| if _, err := gs.service.Objects.Insert(bucket, object).Media(f).Do(); err != nil { |
| return fmt.Errorf("Objects.Insert failed: %s", err) |
| } |
| sklog.Infof("Copied %s to %s", localFile, fmt.Sprintf("gs://%s/%s", bucket, gsFile)) |
| return nil |
| } |
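| |
| // Hypothetical usage sketch; the file, dir, and bucket names are illustrative. |
| // This uploads /tmp/out/results.json to gs://my-ct-bucket/runs/123/results.json: |
| // |
| //   err := gcsUtil.UploadFileToBucket("results.json", "/tmp/out", "runs/123", "my-ct-bucket") |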
| |
| // UploadSwarmingArtifacts uploads the specified local artifacts to Google Storage. |
| func (gs *GcsUtil) UploadSwarmingArtifacts(dirName, pagesetType string) error { |
| localDir := filepath.Join(StorageDir, dirName, pagesetType) |
| gsDir := path.Join(SWARMING_DIR_NAME, dirName, pagesetType) |
| |
| return gs.UploadDir(localDir, gsDir, false) |
| } |
| |
| // DownloadSwarmingArtifacts downloads the specified artifacts from Google Storage |
| // to a local dir. The Google Storage directory is assumed to have numerical |
| // subdirs, eg: {1..1000}. This function downloads the contents of those |
| // directories into a local directory without the numerical subdirs. |
| // It returns a map from each downloaded artifact to its ranking/index. |
| func (gs *GcsUtil) DownloadSwarmingArtifacts(localDir, remoteDirName, pagesetType string, startRange, num int) (map[string]int, error) { |
| // Empty the local dir. |
| util.RemoveAll(localDir) |
| // Create the local dir. |
| MkdirAll(localDir, 0700) |
| |
| gsDir := path.Join(SWARMING_DIR_NAME, remoteDirName, pagesetType) |
| endRange := num + startRange - 1 |
| // The channel to which the remote dirs to be downloaded will be sent. |
| chRemoteDirs := make(chan string, num) |
| for i := startRange; i <= endRange; i++ { |
| chRemoteDirs <- path.Join(gsDir, strconv.Itoa(i)) |
| } |
| close(chRemoteDirs) |
| |
| // Map from each downloaded artifact to its rank/index. |
| artifactToIndex := map[string]int{} |
| // Mutex to control access to the above map. |
| var mtx sync.Mutex |
| // Kick off goroutines to download artifacts and populate the artifactToIndex map. |
| var wg sync.WaitGroup |
| for i := 0; i < DOWNLOAD_UPLOAD_GOROUTINE_POOL_SIZE; i++ { |
| wg.Add(1) |
| go func(goroutineNum int) { |
| defer wg.Done() |
| for remoteDir := range chRemoteDirs { |
| if err := gs.downloadFromSwarmingDir(remoteDir, gsDir, localDir, goroutineNum, &mtx, artifactToIndex); err != nil { |
| sklog.Error(err) |
| return |
| } |
| } |
| }(i + 1) |
| } |
| wg.Wait() |
| if len(chRemoteDirs) != 0 { |
| return artifactToIndex, fmt.Errorf("Unable to download all artifacts.") |
| } |
| return artifactToIndex, nil |
| } |
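| |
| // Hypothetical usage sketch; all arguments are illustrative. This downloads |
| // artifacts from the numerical subdirs 1..100 into /tmp/artifacts: |
| // |
| //   ranks, err := gcsUtil.DownloadSwarmingArtifacts("/tmp/artifacts", "pagesets", "10k", 1, 100) |
| //   if err != nil { |
| //       return err |
| //   } |
| //   for artifact, rank := range ranks { |
| //       sklog.Infof("%s has rank %d", artifact, rank) |
| //   } |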
| |
| // GetRemoteDirCount returns the number of objects in the specified dir. |
| func (gs *GcsUtil) GetRemoteDirCount(gsDir string) (int, error) { |
| req := gs.service.Objects.List(GCSBucketName).Prefix(gsDir + "/") |
| count := 0 |
| for req != nil { |
| resp, err := req.Do() |
| if err != nil { |
| return -1, fmt.Errorf("Error occurred while listing %s: %s", gsDir, err) |
| } |
| count += len(resp.Items) |
| if len(resp.NextPageToken) > 0 { |
| req.PageToken(resp.NextPageToken) |
| } else { |
| req = nil |
| } |
| } |
| return count, nil |
| } |
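| |
| // Hypothetical usage sketch; the dir name is illustrative: |
| // |
| //   count, err := gcsUtil.GetRemoteDirCount("swarming/run-42") |
| //   if err != nil { |
| //       return err |
| //   } |
| //   sklog.Infof("The GCS dir contains %d objects", count) |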
| |
| // downloadFromSwarmingDir downloads all artifacts from the specified remote dir |
| // into localDir and records each artifact's rank/index (the numerical subdir it |
| // came from) in artifactToIndex, with access guarded by mtx. |
| func (gs *GcsUtil) downloadFromSwarmingDir(remoteDir, gsDir, localDir string, goroutineNum int, mtx *sync.Mutex, artifactToIndex map[string]int) error { |
| req := gs.service.Objects.List(GCSBucketName).Prefix(remoteDir + "/") |
| for req != nil { |
| resp, err := req.Do() |
| if err != nil { |
| return fmt.Errorf("Error occurred while listing %s: %s", gsDir, err) |
| } |
| for _, result := range resp.Items { |
| fileName := path.Base(result.Name) |
| fileGsDir := path.Dir(result.Name) |
| index, err := strconv.Atoi(path.Base(fileGsDir)) |
| if err != nil { |
| return fmt.Errorf("%s was not in expected format: %s", fileGsDir, err) |
| } |
| respBody, err := getRespBody(result, gs.client) |
| if err != nil { |
| return fmt.Errorf("Could not fetch %s: %s", result.MediaLink, err) |
| } |
| outputFile := filepath.Join(localDir, fileName) |
| out, err := os.Create(outputFile) |
| if err != nil { |
| util.Close(respBody) |
| return fmt.Errorf("Unable to create file %s: %s", outputFile, err) |
| } |
| // Close explicitly instead of deferring; defers inside this loop would keep |
| // file handles open until the entire function returns. |
| _, err = io.Copy(out, respBody) |
| util.Close(respBody) |
| util.Close(out) |
| if err != nil { |
| return err |
| } |
| // Sleep for a second after downloading file to avoid bombarding Cloud |
| // storage. |
| time.Sleep(time.Second) |
| mtx.Lock() |
| artifactToIndex[path.Join(localDir, fileName)] = index |
| mtx.Unlock() |
| } |
| if len(resp.NextPageToken) > 0 { |
| req.PageToken(resp.NextPageToken) |
| } else { |
| req = nil |
| } |
| } |
| return nil |
| } |
| |
| // UploadDir uploads the specified local dir into the specified Google Storage dir. |
| func (gs *GcsUtil) UploadDir(localDir, gsDir string, cleanDir bool) error { |
| if cleanDir { |
| // Empty the remote dir before uploading to it. |
| util.LogErr(gs.DeleteRemoteDir(gsDir)) |
| } |
| |
| // Construct a map of file paths to their file infos. |
| pathsToFileInfos := map[string]os.FileInfo{} |
| visit := func(path string, f os.FileInfo, err error) error { |
| if f.IsDir() { |
| return nil |
| } |
| pathsToFileInfos[path] = f |
| return nil |
| } |
| if err := filepath.Walk(localDir, visit); err != nil { |
| return fmt.Errorf("Unable to read the local dir %s: %s", localDir, err) |
| } |
| |
| // The channel to which the filepaths to be uploaded will be sent. |
| chFilePaths := make(chan string, len(pathsToFileInfos)) |
| // Compute each file's path relative to localDir and send it to the above channel. |
| for localPath, fileInfo := range pathsToFileInfos { |
| fileName := fileInfo.Name() |
| containingDir := strings.TrimSuffix(localPath, fileName) |
| subDirs := strings.TrimPrefix(containingDir, localDir) |
| if subDirs != "" { |
| dirTokens := strings.Split(subDirs, "/") |
| for i := range dirTokens { |
| fileName = filepath.Join(dirTokens[len(dirTokens)-i-1], fileName) |
| } |
| } |
| chFilePaths <- fileName |
| } |
| close(chFilePaths) |
| |
| // Kick off goroutines to upload the file paths. |
| var wg sync.WaitGroup |
| for i := 0; i < DOWNLOAD_UPLOAD_GOROUTINE_POOL_SIZE; i++ { |
| wg.Add(1) |
| go func(goroutineNum int) { |
| defer wg.Done() |
| for filePath := range chFilePaths { |
| if err := gs.UploadFile(filePath, localDir, gsDir); err != nil { |
| sklog.Errorf("Goroutine#%d could not upload %s to %s: %s", goroutineNum, filePath, gsDir, err) |
| } |
| // Sleep for a second after uploading file to avoid bombarding Cloud |
| // storage. |
| time.Sleep(time.Second) |
| } |
| }(i + 1) |
| } |
| wg.Wait() |
| return nil |
| } |
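| |
| // Hypothetical usage sketch; the dir names are illustrative. This mirrors |
| // /tmp/results into gs://<default bucket>/runs/123, emptying the remote dir first: |
| // |
| //   err := gcsUtil.UploadDir("/tmp/results", "runs/123", true) |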
| |
| // DownloadRemoteFile calls DownloadRemoteFileFromBucket with CT's default bucket. |
| func (gs *GcsUtil) DownloadRemoteFile(remotePath, localPath string) error { |
| return gs.DownloadRemoteFileFromBucket(GCSBucketName, remotePath, localPath) |
| } |
| |
| // DownloadRemoteFileFromBucket downloads the specified remote path into the specified |
| // local file. This function has been tested to download very large files (~33GB). |
| // TODO(rmistry): Update all code that downloads remote files to use this or the |
| // DownloadRemoteFile method. |
| func (gs *GcsUtil) DownloadRemoteFileFromBucket(bucket, remotePath, localPath string) error { |
| respBody, err := gs.GetRemoteFileContentsFromBucket(bucket, remotePath) |
| if err != nil { |
| return err |
| } |
| defer util.Close(respBody) |
| out, err := os.Create(localPath) |
| if err != nil { |
| return err |
| } |
| |
| // Copy in 1GB chunks so that a sleep can be inserted between chunks (see below). |
| chunkSize := int64(1024 * 1024 * 1024) |
| for { |
| _, err := io.CopyN(out, respBody, chunkSize) |
| if err == io.EOF { |
| break |
| } else if err != nil { |
| defer util.Close(out) |
| return err |
| } |
| // Sleep for 30 seconds. Bots run out of memory without this. |
| // Eg: https://chromium-swarm.appspot.com/task?id=2fba9fba3d553510 |
| // Maybe this sleep gives Golang time to clear some caches. |
| time.Sleep(30 * time.Second) |
| } |
| if err := out.Close(); err != nil { |
| return err |
| } |
| return nil |
| } |
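| |
| // Hypothetical usage sketch; the bucket and paths are illustrative: |
| // |
| //   err := gcsUtil.DownloadRemoteFileFromBucket("my-ct-bucket", "builds/build.zip", "/tmp/build.zip") |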