// Google Storage utility that contains methods for both CT master and worker
// scripts.
package util
import (
"context"
"fmt"
"io"
"net/http"
"os"
"path"
"path/filepath"
"runtime"
"strconv"
"strings"
"sync"
"time"
"go.skia.org/infra/go/auth"
"go.skia.org/infra/go/gcs"
"go.skia.org/infra/go/httputils"
"go.skia.org/infra/go/sklog"
"go.skia.org/infra/go/util"
"go.skia.org/infra/go/util/zip"
"golang.org/x/oauth2/google"
storage "google.golang.org/api/storage/v1"
)
const (
DOWNLOAD_UPLOAD_GOROUTINE_POOL_SIZE = 30
	// Use a larger pool size for deletions. This is useful when deleting directories
	// with 1M/1B subdirectories from the master. Google Storage will not be overwhelmed
	// because the workers do not all perform large-scale deletions at the same time.
DELETE_GOROUTINE_POOL_SIZE = 1000
)
type GcsUtil struct {
// The client used to connect to Google Storage.
client *http.Client
service *storage.Service
}
// NewGcsUtil initializes and returns a utility for CT interactions with Google
// Storage. If client is nil then a client is created with the default token
// source.
func NewGcsUtil(client *http.Client) (*GcsUtil, error) {
if client == nil {
ts, err := google.DefaultTokenSource(context.TODO(), auth.ScopeFullControl)
if err != nil {
return nil, fmt.Errorf("Problem setting up default token source: %s", err)
}
clientConfig := httputils.DefaultClientConfig().With2xxOnly().WithTokenSource(ts)
client = clientConfig.Client()
}
client.Timeout = HTTP_CLIENT_TIMEOUT
service, err := storage.New(client)
if err != nil {
return nil, fmt.Errorf("Failed to create interface to Google Storage: %s", err)
}
return &GcsUtil{client: client, service: service}, nil
}
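// A minimal usage sketch for NewGcsUtil (illustrative only, not part of the
// original file):
//
//	// Passing nil makes NewGcsUtil construct a client with the default token source.
//	gcsUtil, err := NewGcsUtil(nil)
//	if err != nil {
//		sklog.Fatal(err)
//	}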
// getRespBody returns the response body of the specified GCS object. It retries
// up to MAX_URI_GET_TRIES times if the download is unsuccessful. The caller must
// close the response body when finished with it.
func getRespBody(res *storage.Object, client *http.Client) (io.ReadCloser, error) {
for i := 0; i < MAX_URI_GET_TRIES; i++ {
request, err := gcs.RequestForStorageURL(res.MediaLink)
if err != nil {
sklog.Warningf("Unable to create Storage MediaURI request: %s\n", err)
continue
}
resp, err := client.Do(request)
if err != nil {
sklog.Warningf("Unable to retrieve Storage MediaURI: %s", err)
continue
}
if resp.StatusCode != 200 {
sklog.Warningf("Failed to retrieve: %d %s", resp.StatusCode, resp.Status)
util.Close(resp.Body)
continue
}
return resp.Body, nil
}
return nil, fmt.Errorf("Failed fetching file after %d attempts", MAX_URI_GET_TRIES)
}
// GetRemoteFileContents returns the response body of the specified GCS file from
// the default CT bucket. The caller must close the response body when finished
// with it.
func (gs *GcsUtil) GetRemoteFileContents(filePath string) (io.ReadCloser, error) {
return gs.GetRemoteFileContentsFromBucket(GCSBucketName, filePath)
}
// GetRemoteFileContentsFromBucket returns the response body of the specified GCS
// file. The caller must close the response body when finished with it.
func (gs *GcsUtil) GetRemoteFileContentsFromBucket(bucket, filePath string) (io.ReadCloser, error) {
res, err := gs.service.Objects.Get(bucket, filePath).Do()
if err != nil {
return nil, fmt.Errorf("Could not get %s from GCS: %s", filePath, err)
}
return getRespBody(res, gs.client)
}
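// A usage sketch for the getters above (illustrative only; the bucket and file
// path are hypothetical):
//
//	body, err := gcsUtil.GetRemoteFileContentsFromBucket("my-bucket", "runs/123/log.txt")
//	if err != nil {
//		sklog.Fatal(err)
//	}
//	defer util.Close(body)
//	out, err := os.Create("/tmp/log.txt")
//	if err != nil {
//		sklog.Fatal(err)
//	}
//	defer util.Close(out)
//	if _, err := io.Copy(out, body); err != nil {
//		sklog.Fatal(err)
//	}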
// filePathToStorageObject pairs a GCS storage object with the relative local
// file path it will be downloaded to.
type filePathToStorageObject struct {
storageObject *storage.Object
filePath string
}
// downloadRemoteDir downloads the specified Google Storage dir to the specified
// local dir. The local dir will be emptied and recreated. Handles multiple levels
// of directories.
func (gs *GcsUtil) downloadRemoteDir(localDir, bucket, gsDir string) error {
// Empty the local dir.
util.RemoveAll(localDir)
// Create the local dir.
MkdirAll(localDir, 0700)
// The channel where the storage objects to be downloaded will be sent to.
chStorageObjects := make(chan filePathToStorageObject, DOWNLOAD_UPLOAD_GOROUTINE_POOL_SIZE)
// Kick off one goroutine to populate the channel.
errPopulator := make(chan error, 1)
var wgPopulator sync.WaitGroup
wgPopulator.Add(1)
go func() {
defer wgPopulator.Done()
defer close(chStorageObjects)
req := gs.service.Objects.List(bucket).Prefix(gsDir + "/")
for req != nil {
resp, err := req.Do()
if err != nil {
				errPopulator <- fmt.Errorf("Error occurred while listing %s: %s", gsDir, err)
return
}
for _, result := range resp.Items {
fileName := path.Base(result.Name)
				// If downloading from a subdir then prepend it to the fileName.
fileGsDir := path.Dir(result.Name)
subDirs := strings.TrimPrefix(fileGsDir, gsDir)
if subDirs != "" {
dirTokens := strings.Split(subDirs, "/")
for i := range dirTokens {
fileName = filepath.Join(dirTokens[len(dirTokens)-i-1], fileName)
}
// Create the local directory.
MkdirAll(filepath.Join(localDir, filepath.Dir(fileName)), 0700)
}
chStorageObjects <- filePathToStorageObject{storageObject: result, filePath: fileName}
}
if len(resp.NextPageToken) > 0 {
req.PageToken(resp.NextPageToken)
} else {
req = nil
}
}
}()
// Kick off goroutines to download the storage objects.
var wgConsumer sync.WaitGroup
for i := 0; i < DOWNLOAD_UPLOAD_GOROUTINE_POOL_SIZE; i++ {
wgConsumer.Add(1)
go func(goroutineNum int) {
defer wgConsumer.Done()
for obj := range chStorageObjects {
if err := downloadStorageObj(obj, gs.client, localDir, goroutineNum); err != nil {
sklog.Errorf("Could not download storage object: %s", err)
return
}
				// Sleep for a second after downloading the file to avoid bombarding Cloud
				// Storage.
time.Sleep(time.Second)
}
}(i + 1)
}
wgPopulator.Wait()
wgConsumer.Wait()
// Check if there was an error listing the GCS dir.
select {
case err, ok := <-errPopulator:
if ok {
return err
}
default:
}
return nil
}
// downloadStorageObj downloads the given storage object to its relative file
// path under localDir.
func downloadStorageObj(obj filePathToStorageObject, c *http.Client, localDir string, goroutineNum int) error {
result := obj.storageObject
filePath := obj.filePath
respBody, err := getRespBody(result, c)
if err != nil {
return fmt.Errorf("Could not fetch %s: %s", result.MediaLink, err)
}
defer util.Close(respBody)
outputFile := filepath.Join(localDir, filePath)
out, err := os.Create(outputFile)
if err != nil {
return fmt.Errorf("Unable to create file %s: %s", outputFile, err)
}
defer util.Close(out)
if _, err = io.Copy(out, respBody); err != nil {
return err
}
return nil
}
// DownloadChromiumBuild downloads the specified Chromium build from Google
// Storage to a local dir. It returns the path to the chrome binary's dir.
func (gs *GcsUtil) DownloadChromiumBuild(chromiumBuild, chromiumBinaryName string) (string, error) {
localDir := filepath.Join(ChromiumBuildsDir, chromiumBuild)
gsDir := path.Join(CHROMIUM_BUILDS_DIR_NAME, chromiumBuild)
return gs.DownloadChromiumBuildFromTo(localDir, GCSBucketName, gsDir, CHROMIUM_BUILD_ZIP_NAME, chromiumBinaryName)
}
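// A usage sketch for DownloadChromiumBuild (illustrative only; the build and
// binary names are hypothetical):
//
//	chromeDir, err := gcsUtil.DownloadChromiumBuild("try-abc123-def456", "chrome")
//	if err != nil {
//		sklog.Fatal(err)
//	}
//	// The chrome binary can now be run from chromeDir.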
// DownloadChromiumBuildFromTo downloads a chromium build from the specified
// gs://${bucket}/${gsDir}/${zipName} to the localDir. It returns the path to the chrome
// binary's dir.
func (gs *GcsUtil) DownloadChromiumBuildFromTo(localDir, bucket, gsDir, zipName, chromiumBinaryName string) (string, error) {
sklog.Infof("Downloading %s from Google Storage to %s", gsDir, localDir)
if err := gs.downloadRemoteDir(localDir, bucket, gsDir); err != nil {
return "", fmt.Errorf("Error downloading %s into %s: %s", gsDir, localDir, err)
}
// Unzip the build.
zipFilePath := filepath.Join(localDir, zipName)
if err := zip.UnZip(localDir, zipFilePath); err != nil {
return "", fmt.Errorf("Error when unzipping %s: %s", zipFilePath, err)
}
	// The chrome binary is either directly in localDir or one level down, in a
	// subdirectory named after the zip.
	pathToBinary := filepath.Join(localDir, chromiumBinaryName)
	if _, err := os.Stat(pathToBinary); os.IsNotExist(err) {
		sklog.Infof("Could not find chrome binary %s at %s. Going to look one level down.", chromiumBinaryName, pathToBinary)
		pathToBinary = filepath.Join(localDir, strings.TrimSuffix(zipName, filepath.Ext(zipName)), chromiumBinaryName)
if _, err := os.Stat(pathToBinary); os.IsNotExist(err) {
return "", fmt.Errorf("Could not find chrome binary %s at %s", chromiumBinaryName, pathToBinary)
}
}
if runtime.GOOS != "windows" {
		// The downloaded chrome binary needs to be marked executable on Linux.
util.LogErr(os.Chmod(pathToBinary, 0777))
}
return filepath.Dir(pathToBinary), nil
}
// DeleteRemoteDirLogErr wraps DeleteRemoteDir and logs an error if one is returned.
func (gs *GcsUtil) DeleteRemoteDirLogErr(gsDir string) {
if err := gs.DeleteRemoteDir(gsDir); err != nil {
sklog.Errorf("Failed to DeleteRemoteDir(%s): %s", gsDir, err)
}
}
// DeleteRemoteDir deletes the specified dir from CT's default bucket using a
// pool of goroutines.
func (gs *GcsUtil) DeleteRemoteDir(gsDir string) error {
// The channel where the GCS filepaths to be deleted will be sent to.
chFilePaths := make(chan string, DELETE_GOROUTINE_POOL_SIZE)
// Kick off one goroutine to populate the channel.
errPopulator := make(chan error, 1)
var wgPopulator sync.WaitGroup
wgPopulator.Add(1)
go func() {
defer wgPopulator.Done()
defer close(chFilePaths)
req := gs.service.Objects.List(GCSBucketName).Prefix(gsDir + "/")
for req != nil {
resp, err := req.Do()
if err != nil {
				errPopulator <- fmt.Errorf("Error occurred while listing %s: %s", gsDir, err)
return
}
for _, result := range resp.Items {
chFilePaths <- result.Name
}
if len(resp.NextPageToken) > 0 {
req.PageToken(resp.NextPageToken)
} else {
req = nil
}
}
}()
// Kick off goroutines to delete the file paths.
var wgConsumer sync.WaitGroup
for i := 0; i < DELETE_GOROUTINE_POOL_SIZE; i++ {
wgConsumer.Add(1)
go func(goroutineNum int) {
defer wgConsumer.Done()
for filePath := range chFilePaths {
if err := gs.service.Objects.Delete(GCSBucketName, filePath).Do(); err != nil {
sklog.Errorf("Goroutine#%d could not delete %s: %s", goroutineNum, filePath, err)
return
}
				// Sleep for a second after deleting the file to avoid bombarding Cloud
				// Storage.
time.Sleep(time.Second)
}
}(i + 1)
}
wgPopulator.Wait()
wgConsumer.Wait()
// Check if there was an error listing the GCS dir.
select {
case err, ok := <-errPopulator:
if ok {
return err
}
default:
}
return nil
}
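// A usage sketch for DeleteRemoteDir (illustrative only; the dir is
// hypothetical):
//
//	// Recursively deletes gs://<GCSBucketName>/swarming/pagesets/10k.
//	if err := gcsUtil.DeleteRemoteDir("swarming/pagesets/10k"); err != nil {
//		sklog.Error(err)
//	}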
// UploadFile calls UploadFileToBucket with CT's default bucket.
func (gs *GcsUtil) UploadFile(fileName, localDir, gsDir string) error {
return gs.UploadFileToBucket(fileName, localDir, gsDir, GCSBucketName)
}
// UploadFileToBucket uploads the specified file to the remote dir of the bucket
// in Google Storage. It also sets the appropriate ACLs on the uploaded file.
func (gs *GcsUtil) UploadFileToBucket(fileName, localDir, gsDir, bucket string) error {
localFile := filepath.Join(localDir, fileName)
gsFile := path.Join(gsDir, fileName)
object := &storage.Object{
Name: gsFile,
// All objects uploaded to CT's bucket via this util must be readable by
		// the google.com domain. This will be fine-tuned later if required.
Acl: []*storage.ObjectAccessControl{
{
Bucket: bucket,
Entity: "domain-google.com",
Object: gsFile,
Role: "READER",
},
},
}
f, err := os.Open(localFile)
if err != nil {
return fmt.Errorf("Error opening %s: %s", localFile, err)
}
defer util.Close(f)
// TODO(rmistry): gs api now enables resumable uploads by default. Handle 308
// response codes.
if _, err := gs.service.Objects.Insert(bucket, object).Media(f).Do(); err != nil {
return fmt.Errorf("Objects.Insert failed: %s", err)
}
sklog.Infof("Copied %s to %s", localFile, fmt.Sprintf("gs://%s/%s", bucket, gsFile))
return nil
}
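// A usage sketch for UploadFileToBucket (illustrative only; the paths and
// bucket are hypothetical):
//
//	// Copies /tmp/outputs/results.json to gs://my-bucket/runs/123/results.json
//	// with the domain-google.com READER ACL set on it.
//	if err := gcsUtil.UploadFileToBucket("results.json", "/tmp/outputs", "runs/123", "my-bucket"); err != nil {
//		sklog.Error(err)
//	}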
// UploadSwarmingArtifacts uploads the specified local artifacts to Google Storage.
func (gs *GcsUtil) UploadSwarmingArtifacts(dirName, pagesetType string) error {
	localDir := filepath.Join(StorageDir, dirName, pagesetType)
gsDir := path.Join(SWARMING_DIR_NAME, dirName, pagesetType)
return gs.UploadDir(localDir, gsDir, false)
}
// DownloadSwarmingArtifacts downloads the specified artifacts from Google Storage to a local dir.
// The Google Storage directory is assumed to have numerical subdirs, e.g. {1..1000}. This function
// downloads the contents of those directories into a local directory without the numerical
// subdirs.
// Returns a map from each downloaded artifact's local path to its ranking/index.
func (gs *GcsUtil) DownloadSwarmingArtifacts(localDir, remoteDirName, pagesetType string, startRange, num int) (map[string]int, error) {
// Empty the local dir.
util.RemoveAll(localDir)
// Create the local dir.
MkdirAll(localDir, 0700)
gsDir := path.Join(SWARMING_DIR_NAME, remoteDirName, pagesetType)
endRange := num + startRange - 1
	// The channel where the remote dirs to be downloaded will be sent to.
chRemoteDirs := make(chan string, num)
for i := startRange; i <= endRange; i++ {
chRemoteDirs <- path.Join(gsDir, strconv.Itoa(i))
}
close(chRemoteDirs)
	// Dictionary mapping each artifact to its rank/index.
artifactToIndex := map[string]int{}
// Mutex to control access to the above dictionary.
var mtx sync.Mutex
// Kick off goroutines to download artifacts and populate the artifactToIndex dictionary.
var wg sync.WaitGroup
for i := 0; i < DOWNLOAD_UPLOAD_GOROUTINE_POOL_SIZE; i++ {
wg.Add(1)
go func(goroutineNum int) {
defer wg.Done()
for remoteDir := range chRemoteDirs {
if err := gs.downloadFromSwarmingDir(remoteDir, gsDir, localDir, goroutineNum, &mtx, artifactToIndex); err != nil {
sklog.Error(err)
return
}
}
}(i + 1)
}
wg.Wait()
if len(chRemoteDirs) != 0 {
		return artifactToIndex, fmt.Errorf("Unable to download all artifacts")
}
return artifactToIndex, nil
}
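// A usage sketch for DownloadSwarmingArtifacts (illustrative only; the names
// and ranges are hypothetical):
//
//	// Downloads gs://<GCSBucketName>/swarming/pagesets/10k/{1..100}/* into
//	// /tmp/pagesets and returns each artifact's numerical rank.
//	artifactToIndex, err := gcsUtil.DownloadSwarmingArtifacts("/tmp/pagesets", "pagesets", "10k", 1, 100)
//	if err != nil {
//		sklog.Fatal(err)
//	}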
// GetRemoteDirCount returns the number of objects in the specified dir.
func (gs *GcsUtil) GetRemoteDirCount(gsDir string) (int, error) {
req := gs.service.Objects.List(GCSBucketName).Prefix(gsDir + "/")
count := 0
for req != nil {
resp, err := req.Do()
if err != nil {
return -1, fmt.Errorf("Error occured while listing %s: %s", gsDir, err)
}
count += len(resp.Items)
if len(resp.NextPageToken) > 0 {
req.PageToken(resp.NextPageToken)
} else {
req = nil
}
}
return count, nil
}
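// A usage sketch for GetRemoteDirCount (illustrative only; the dir is
// hypothetical):
//
//	count, err := gcsUtil.GetRemoteDirCount("swarming/pagesets/10k")
//	if err != nil {
//		sklog.Fatal(err)
//	}
//	sklog.Infof("The dir contains %d objects", count)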
// downloadFromSwarmingDir downloads all objects under remoteDir into localDir
// and records each artifact's numerical index in artifactToIndex.
func (gs *GcsUtil) downloadFromSwarmingDir(remoteDir, gsDir, localDir string, goroutineNum int, mtx *sync.Mutex, artifactToIndex map[string]int) error {
req := gs.service.Objects.List(GCSBucketName).Prefix(remoteDir + "/")
for req != nil {
resp, err := req.Do()
if err != nil {
			return fmt.Errorf("Error occurred while listing %s: %s", gsDir, err)
}
for _, result := range resp.Items {
fileName := path.Base(result.Name)
fileGsDir := path.Dir(result.Name)
index, err := strconv.Atoi(path.Base(fileGsDir))
if err != nil {
return fmt.Errorf("%s was not in expected format: %s", fileGsDir, err)
}
			respBody, err := getRespBody(result, gs.client)
			if err != nil {
				return fmt.Errorf("Could not fetch %s: %s", result.MediaLink, err)
			}
			outputFile := filepath.Join(localDir, fileName)
			out, err := os.Create(outputFile)
			if err != nil {
				util.Close(respBody)
				return fmt.Errorf("Unable to create file %s: %s", outputFile, err)
			}
			_, err = io.Copy(out, respBody)
			// Close both handles now instead of deferring; a defer inside this loop
			// would keep every downloaded file open until the function returns.
			util.Close(out)
			util.Close(respBody)
			if err != nil {
				return err
			}
			// Sleep for a second after downloading the file to avoid bombarding Cloud
			// Storage.
time.Sleep(time.Second)
mtx.Lock()
artifactToIndex[path.Join(localDir, fileName)] = index
mtx.Unlock()
}
if len(resp.NextPageToken) > 0 {
req.PageToken(resp.NextPageToken)
} else {
req = nil
}
}
return nil
}
// UploadDir uploads the specified local dir into the specified Google Storage dir.
func (gs *GcsUtil) UploadDir(localDir, gsDir string, cleanDir bool) error {
if cleanDir {
// Empty the remote dir before uploading to it.
util.LogErr(gs.DeleteRemoteDir(gsDir))
}
// Construct a dictionary of file paths to their file infos.
pathsToFileInfos := map[string]os.FileInfo{}
	visit := func(path string, f os.FileInfo, err error) error {
		if err != nil {
			return err
		}
		if f.IsDir() {
			return nil
		}
		pathsToFileInfos[path] = f
		return nil
	}
if err := filepath.Walk(localDir, visit); err != nil {
return fmt.Errorf("Unable to read the local dir %s: %s", localDir, err)
}
// The channel where the filepaths to be uploaded will be sent to.
chFilePaths := make(chan string, len(pathsToFileInfos))
	// Compute each file's path relative to localDir and send it to the above channel.
for path, fileInfo := range pathsToFileInfos {
fileName := fileInfo.Name()
containingDir := strings.TrimSuffix(path, fileName)
subDirs := strings.TrimPrefix(containingDir, localDir)
if subDirs != "" {
dirTokens := strings.Split(subDirs, "/")
for i := range dirTokens {
fileName = filepath.Join(dirTokens[len(dirTokens)-i-1], fileName)
}
}
chFilePaths <- fileName
}
close(chFilePaths)
// Kick off goroutines to upload the file paths.
var wg sync.WaitGroup
for i := 0; i < DOWNLOAD_UPLOAD_GOROUTINE_POOL_SIZE; i++ {
wg.Add(1)
go func(goroutineNum int) {
defer wg.Done()
for filePath := range chFilePaths {
if err := gs.UploadFile(filePath, localDir, gsDir); err != nil {
					sklog.Errorf("Goroutine#%d could not upload %s to %s: %s", goroutineNum, filePath, gsDir, err)
}
				// Sleep for a second after uploading the file to avoid bombarding Cloud
				// Storage.
time.Sleep(time.Second)
}
}(i + 1)
}
wg.Wait()
return nil
}
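// A usage sketch for UploadDir (illustrative only; the paths are hypothetical):
//
//	// Mirrors /tmp/outputs into gs://<GCSBucketName>/runs/123, emptying the
//	// remote dir first.
//	if err := gcsUtil.UploadDir("/tmp/outputs", "runs/123", true); err != nil {
//		sklog.Error(err)
//	}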
// DownloadRemoteFile calls DownloadRemoteFileFromBucket with CT's default bucket.
func (gs *GcsUtil) DownloadRemoteFile(remotePath, localPath string) error {
return gs.DownloadRemoteFileFromBucket(GCSBucketName, remotePath, localPath)
}
// DownloadRemoteFileFromBucket downloads the specified remote path into the specified
// local file. This function has been tested to download very large files (~33GB).
// TODO(rmistry): Update all code that downloads remote files to use this or the
// DownloadRemoteFile method.
func (gs *GcsUtil) DownloadRemoteFileFromBucket(bucket, remotePath, localPath string) error {
respBody, err := gs.GetRemoteFileContentsFromBucket(bucket, remotePath)
if err != nil {
return err
}
defer util.Close(respBody)
out, err := os.Create(localPath)
if err != nil {
return err
}
	// Copy in 1 GiB chunks so that we can sleep between chunks (see below).
	bufferSize := int64(1024 * 1024 * 1024)
for {
_, err := io.CopyN(out, respBody, bufferSize)
if err == io.EOF {
break
} else if err != nil {
defer util.Close(out)
return err
}
		// Sleep for 30 seconds. Bots run out of memory without this.
		// E.g.: https://chromium-swarm.appspot.com/task?id=2fba9fba3d553510
		// Maybe this sleep gives Go time to clear some caches.
time.Sleep(30 * time.Second)
}
if err := out.Close(); err != nil {
return err
}
return nil
}
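// A usage sketch for DownloadRemoteFile (illustrative only; the paths are
// hypothetical):
//
//	// Streams gs://<GCSBucketName>/chromium_builds/build.zip to /tmp/build.zip
//	// in 1 GiB chunks.
//	if err := gcsUtil.DownloadRemoteFile("chromium_builds/build.zip", "/tmp/build.zip"); err != nil {
//		sklog.Fatal(err)
//	}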